mirror of
https://github.com/SigNoz/signoz.git
synced 2025-12-27 18:54:27 +00:00
Compare commits
3 Commits
SIG-9522
...
chore/remo
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e10c1b9723 | ||
|
|
06ae762070 | ||
|
|
3cc34baf74 |
@@ -26,6 +26,10 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
|
||||
if err != nil {
|
||||
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "evaluation is invalid: %v", err)
|
||||
}
|
||||
|
||||
// calculate eval delay based on rule config
|
||||
evalDelay := baserules.CalculateEvalDelay(opts.Rule, opts.ManagerOpts.EvalDelay)
|
||||
|
||||
if opts.Rule.RuleType == ruletypes.RuleTypeThreshold {
|
||||
// create a threshold rule
|
||||
tr, err := baserules.NewThresholdRule(
|
||||
@@ -35,7 +39,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
|
||||
opts.Reader,
|
||||
opts.Querier,
|
||||
opts.SLogger,
|
||||
baserules.WithEvalDelay(opts.ManagerOpts.EvalDelay),
|
||||
baserules.WithEvalDelay(evalDelay),
|
||||
baserules.WithSQLStore(opts.SQLStore),
|
||||
)
|
||||
|
||||
@@ -80,7 +84,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
|
||||
opts.Querier,
|
||||
opts.SLogger,
|
||||
opts.Cache,
|
||||
baserules.WithEvalDelay(opts.ManagerOpts.EvalDelay),
|
||||
baserules.WithEvalDelay(evalDelay),
|
||||
baserules.WithSQLStore(opts.SQLStore),
|
||||
)
|
||||
if err != nil {
|
||||
|
||||
224
pkg/query-service/rules/eval_delay.go
Normal file
224
pkg/query-service/rules/eval_delay.go
Normal file
@@ -0,0 +1,224 @@
|
||||
package rules
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/types/metrictypes"
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
|
||||
)
|
||||
|
||||
// CalculateEvalDelay determines if the default evaluation delay can be removed (set to 0)
|
||||
// based on the rule's match type, compare operator, and aggregation type.
|
||||
// If the combination ensures that new data will not invalidate the alert condition
|
||||
// (e.g. values only increase for a "Greater Than" check), the delay is removed.
|
||||
//
|
||||
// A combination is considered "safe" if new data arriving late cannot invalidate
|
||||
// a previously triggered alert condition. This happens when:
|
||||
// - The aggregation function is monotonic (only increases or only decreases)
|
||||
// - The comparison operator aligns with the monotonic direction
|
||||
// - The match type allows the safety property to hold
|
||||
//
|
||||
// Safe combinations include:
|
||||
// - Min aggregation + Below/BelowOrEq operators (Min can only decrease)
|
||||
// - Max aggregation + Above/AboveOrEq operators (Max can only increase)
|
||||
// - Count/CountDistinct + Above/AboveOrEq operators (Count can only increase)
|
||||
//
|
||||
// Returns 0 if all queries are safe, otherwise returns defaultDelay.
|
||||
func CalculateEvalDelay(rule *ruletypes.PostableRule, defaultDelay time.Duration) time.Duration {
|
||||
// Phase 1: Validate rule condition
|
||||
if !isRuleConditionValid(rule) {
|
||||
return defaultDelay
|
||||
}
|
||||
|
||||
// Phase 2: Get match type and compare operator from thresholds
|
||||
matchType, compareOp, ok := getThresholdMatchTypeAndCompareOp(rule)
|
||||
if !ok {
|
||||
return defaultDelay
|
||||
}
|
||||
|
||||
// Phase 3: Check if all queries are safe
|
||||
for _, query := range rule.RuleCondition.CompositeQuery.Queries {
|
||||
if !isQuerySafe(query, matchType, compareOp) {
|
||||
return defaultDelay
|
||||
}
|
||||
}
|
||||
|
||||
// Phase 4: All queries are safe, delay can be removed
|
||||
return 0
|
||||
}
|
||||
|
||||
// isRuleConditionValid checks if the rule condition is valid for delay calculation.
|
||||
// Returns false if the rule condition is nil, has no queries, or has invalid thresholds.
|
||||
func isRuleConditionValid(rule *ruletypes.PostableRule) bool {
|
||||
if rule.RuleCondition == nil || rule.RuleCondition.CompositeQuery == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
// BuilderQueries, PromQL Queries, ClickHouse SQL Queries attributes of CompositeQuery
|
||||
// are not supported for now, only Queries attribute is supported
|
||||
if len(rule.RuleCondition.CompositeQuery.Queries) == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
// Validate that thresholds exist and contain valid match type and compare operator
|
||||
matchType, compareOp, ok := getThresholdMatchTypeAndCompareOp(rule)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
|
||||
if matchType == ruletypes.MatchTypeNone || compareOp == ruletypes.CompareOpNone {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// getThresholdMatchTypeAndCompareOp extracts match type and compare operator from the rule's thresholds.
|
||||
// Returns the match type, compare operator, and a boolean indicating success.
|
||||
// All thresholds share the same match type and compare operator, so we use the first threshold's values.
|
||||
func getThresholdMatchTypeAndCompareOp(rule *ruletypes.PostableRule) (ruletypes.MatchType, ruletypes.CompareOp, bool) {
|
||||
if rule.RuleCondition == nil || rule.RuleCondition.Thresholds == nil {
|
||||
return ruletypes.MatchTypeNone, ruletypes.CompareOpNone, false
|
||||
}
|
||||
|
||||
// Get the threshold interface
|
||||
threshold, err := rule.RuleCondition.Thresholds.GetRuleThreshold()
|
||||
if err != nil {
|
||||
return ruletypes.MatchTypeNone, ruletypes.CompareOpNone, false
|
||||
}
|
||||
|
||||
// Cast to BasicRuleThresholds (only supported kind)
|
||||
basicThresholds, ok := threshold.(ruletypes.BasicRuleThresholds)
|
||||
if !ok || len(basicThresholds) == 0 {
|
||||
return ruletypes.MatchTypeNone, ruletypes.CompareOpNone, false
|
||||
}
|
||||
|
||||
// Use first threshold's MatchType and CompareOp (all thresholds share the same values)
|
||||
matchType := basicThresholds[0].MatchType
|
||||
compareOp := basicThresholds[0].CompareOp
|
||||
|
||||
return matchType, compareOp, true
|
||||
}
|
||||
|
||||
// aggregationExpressionToTimeAggregation converts the aggregation expression to the corresponding time aggregation
|
||||
// based on the expression
|
||||
// if the expression is not a valid aggregation expression, it returns the unspecified time aggregation
|
||||
// Note: Longer/more specific prefixes (e.g., "count_distinct") must be checked before shorter ones (e.g., "count")
|
||||
func aggregationExpressionToTimeAggregation(expression string) metrictypes.TimeAggregation {
|
||||
expression = strings.TrimSpace(strings.ToLower(expression))
|
||||
switch {
|
||||
case strings.HasPrefix(expression, "count_distinct"):
|
||||
return metrictypes.TimeAggregationCountDistinct
|
||||
case strings.HasPrefix(expression, "count"):
|
||||
return metrictypes.TimeAggregationCount
|
||||
case strings.HasPrefix(expression, "min"):
|
||||
return metrictypes.TimeAggregationMin
|
||||
case strings.HasPrefix(expression, "max"):
|
||||
return metrictypes.TimeAggregationMax
|
||||
case strings.HasPrefix(expression, "avg"):
|
||||
return metrictypes.TimeAggregationAvg
|
||||
case strings.HasPrefix(expression, "sum"):
|
||||
return metrictypes.TimeAggregationSum
|
||||
case strings.HasPrefix(expression, "rate"):
|
||||
return metrictypes.TimeAggregationRate
|
||||
case strings.HasPrefix(expression, "increase"):
|
||||
return metrictypes.TimeAggregationIncrease
|
||||
case strings.HasPrefix(expression, "latest"):
|
||||
return metrictypes.TimeAggregationLatest
|
||||
default:
|
||||
return metrictypes.TimeAggregationUnspecified
|
||||
}
|
||||
}
|
||||
|
||||
// extractTimeAggFromQuerySpec extracts the time aggregation from the query spec of the QueryEnvelope
|
||||
func extractTimeAggFromQuerySpec(spec any) []metrictypes.TimeAggregation {
|
||||
timeAggs := []metrictypes.TimeAggregation{}
|
||||
|
||||
// Extract the time aggregation from the query spec
|
||||
// based on different types of query spec
|
||||
switch spec := spec.(type) {
|
||||
case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
|
||||
for _, agg := range spec.Aggregations {
|
||||
timeAggs = append(timeAggs, agg.TimeAggregation)
|
||||
}
|
||||
// the log and trace aggregations don't store the time aggregation directly but expression for the aggregation
|
||||
// so we need to convert the expression to the corresponding time aggregation
|
||||
case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
|
||||
for _, agg := range spec.Aggregations {
|
||||
timeAggs = append(timeAggs, aggregationExpressionToTimeAggregation(agg.Expression))
|
||||
}
|
||||
case qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]:
|
||||
for _, agg := range spec.Aggregations {
|
||||
timeAggs = append(timeAggs, aggregationExpressionToTimeAggregation(agg.Expression))
|
||||
}
|
||||
}
|
||||
return timeAggs
|
||||
}
|
||||
|
||||
// isQuerySafe determines if a single query is safe to remove the eval delay.
|
||||
// A query is safe only if it's a Builder query (with MetricAggregation, LogAggregation, or TraceAggregation type)
|
||||
// and all its aggregations are safe.
|
||||
func isQuerySafe(query qbtypes.QueryEnvelope, matchType ruletypes.MatchType, compareOp ruletypes.CompareOp) bool {
|
||||
// We only handle Builder Queries for now
|
||||
if query.Type != qbtypes.QueryTypeBuilder {
|
||||
return false
|
||||
}
|
||||
|
||||
// extract time aggregations from the query spec
|
||||
timeAggs := extractTimeAggFromQuerySpec(query.Spec)
|
||||
|
||||
// A query must have at least one aggregation
|
||||
if len(timeAggs) == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
// All aggregations in the query must be safe
|
||||
for _, timeAgg := range timeAggs {
|
||||
if !isAggregationSafe(timeAgg, matchType, compareOp) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// isAggregationSafe checks if the aggregation is safe to remove the eval delay
|
||||
func isAggregationSafe(timeAgg metrictypes.TimeAggregation, matchType ruletypes.MatchType, compareOp ruletypes.CompareOp) bool {
|
||||
switch timeAgg {
|
||||
case metrictypes.TimeAggregationMin:
|
||||
// Group: Min, MinIf
|
||||
// Value can only go down or remain same.
|
||||
|
||||
if matchType == ruletypes.AtleastOnce || matchType == ruletypes.AllTheTimes {
|
||||
if compareOp == ruletypes.ValueIsBelow || compareOp == ruletypes.ValueBelowOrEq {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
case metrictypes.TimeAggregationMax:
|
||||
// Group: Max, MaxIf
|
||||
// Value can only go up or remain same.
|
||||
|
||||
if matchType == ruletypes.AtleastOnce || matchType == ruletypes.AllTheTimes {
|
||||
if compareOp == ruletypes.ValueIsAbove || compareOp == ruletypes.ValueAboveOrEq {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
case metrictypes.TimeAggregationCount, metrictypes.TimeAggregationCountDistinct:
|
||||
// Group: Count
|
||||
// Value can only go up or remain same.
|
||||
|
||||
if matchType == ruletypes.AtleastOnce || matchType == ruletypes.AllTheTimes {
|
||||
if compareOp == ruletypes.ValueIsAbove || compareOp == ruletypes.ValueAboveOrEq {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
// Other aggregations (Sum, Avg, Rate, etc.) are not safe.
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
1370
pkg/query-service/rules/eval_delay_test.go
Normal file
1370
pkg/query-service/rules/eval_delay_test.go
Normal file
File diff suppressed because it is too large
Load Diff
@@ -154,6 +154,8 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
|
||||
if err != nil {
|
||||
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "evaluation is invalid: %v", err)
|
||||
}
|
||||
// calculate eval delay based on rule config
|
||||
evalDelay := CalculateEvalDelay(opts.Rule, opts.ManagerOpts.EvalDelay)
|
||||
|
||||
if opts.Rule.RuleType == ruletypes.RuleTypeThreshold {
|
||||
// create a threshold rule
|
||||
@@ -164,7 +166,7 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
|
||||
opts.Reader,
|
||||
opts.Querier,
|
||||
opts.SLogger,
|
||||
WithEvalDelay(opts.ManagerOpts.EvalDelay),
|
||||
WithEvalDelay(evalDelay),
|
||||
WithSQLStore(opts.SQLStore),
|
||||
)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user