Compare commits

...

3 Commits

Author SHA1 Message Date
Abhishek Kumar Singh
e10c1b9723 feat: added handling for log and traces based aggregation for QB in CalculateEvalDelay 2025-12-18 13:43:25 +05:30
Abhishek Kumar Singh
06ae762070 fix: use matchType and compare operator from threshold rather than ruleCondition 2025-12-17 21:03:54 +05:30
Abhishek Kumar Singh
3cc34baf74 chore: remove eval delay for safe eval rules 2025-12-17 20:39:32 +05:30
4 changed files with 1603 additions and 3 deletions

View File

@@ -26,6 +26,10 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
if err != nil {
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "evaluation is invalid: %v", err)
}
// calculate eval delay based on rule config
evalDelay := baserules.CalculateEvalDelay(opts.Rule, opts.ManagerOpts.EvalDelay)
if opts.Rule.RuleType == ruletypes.RuleTypeThreshold {
// create a threshold rule
tr, err := baserules.NewThresholdRule(
@@ -35,7 +39,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
opts.Reader,
opts.Querier,
opts.SLogger,
baserules.WithEvalDelay(opts.ManagerOpts.EvalDelay),
baserules.WithEvalDelay(evalDelay),
baserules.WithSQLStore(opts.SQLStore),
)
@@ -80,7 +84,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
opts.Querier,
opts.SLogger,
opts.Cache,
baserules.WithEvalDelay(opts.ManagerOpts.EvalDelay),
baserules.WithEvalDelay(evalDelay),
baserules.WithSQLStore(opts.SQLStore),
)
if err != nil {

View File

@@ -0,0 +1,224 @@
package rules
import (
"strings"
"time"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
)
// CalculateEvalDelay determines if the default evaluation delay can be removed (set to 0)
// based on the rule's match type, compare operator, and aggregation type.
// If the combination ensures that new data will not invalidate the alert condition
// (e.g. values only increase for a "Greater Than" check), the delay is removed.
//
// A combination is considered "safe" if new data arriving late cannot invalidate
// a previously triggered alert condition. This happens when:
// - The aggregation function is monotonic (only increases or only decreases)
// - The comparison operator aligns with the monotonic direction
// - The match type allows the safety property to hold
//
// Safe combinations include:
// - Min aggregation + Below/BelowOrEq operators (Min can only decrease)
// - Max aggregation + Above/AboveOrEq operators (Max can only increase)
// - Count/CountDistinct + Above/AboveOrEq operators (Count can only increase)
//
// Returns 0 if all queries are safe, otherwise returns defaultDelay.
func CalculateEvalDelay(rule *ruletypes.PostableRule, defaultDelay time.Duration) time.Duration {
// Phase 1: Validate rule condition
if !isRuleConditionValid(rule) {
return defaultDelay
}
// Phase 2: Get match type and compare operator from thresholds
matchType, compareOp, ok := getThresholdMatchTypeAndCompareOp(rule)
if !ok {
return defaultDelay
}
// Phase 3: Check if all queries are safe
for _, query := range rule.RuleCondition.CompositeQuery.Queries {
if !isQuerySafe(query, matchType, compareOp) {
return defaultDelay
}
}
// Phase 4: All queries are safe, delay can be removed
return 0
}
// isRuleConditionValid checks if the rule condition is valid for delay calculation.
// Returns false if the rule condition is nil, has no queries, or has invalid thresholds.
func isRuleConditionValid(rule *ruletypes.PostableRule) bool {
if rule.RuleCondition == nil || rule.RuleCondition.CompositeQuery == nil {
return false
}
// BuilderQueries, PromQL Queries, ClickHouse SQL Queries attributes of CompositeQuery
// are not supported for now, only Queries attribute is supported
if len(rule.RuleCondition.CompositeQuery.Queries) == 0 {
return false
}
// Validate that thresholds exist and contain valid match type and compare operator
matchType, compareOp, ok := getThresholdMatchTypeAndCompareOp(rule)
if !ok {
return false
}
if matchType == ruletypes.MatchTypeNone || compareOp == ruletypes.CompareOpNone {
return false
}
return true
}
// getThresholdMatchTypeAndCompareOp extracts match type and compare operator from the rule's thresholds.
// Returns the match type, compare operator, and a boolean indicating success.
// All thresholds share the same match type and compare operator, so we use the first threshold's values.
func getThresholdMatchTypeAndCompareOp(rule *ruletypes.PostableRule) (ruletypes.MatchType, ruletypes.CompareOp, bool) {
if rule.RuleCondition == nil || rule.RuleCondition.Thresholds == nil {
return ruletypes.MatchTypeNone, ruletypes.CompareOpNone, false
}
// Get the threshold interface
threshold, err := rule.RuleCondition.Thresholds.GetRuleThreshold()
if err != nil {
return ruletypes.MatchTypeNone, ruletypes.CompareOpNone, false
}
// Cast to BasicRuleThresholds (only supported kind)
basicThresholds, ok := threshold.(ruletypes.BasicRuleThresholds)
if !ok || len(basicThresholds) == 0 {
return ruletypes.MatchTypeNone, ruletypes.CompareOpNone, false
}
// Use first threshold's MatchType and CompareOp (all thresholds share the same values)
matchType := basicThresholds[0].MatchType
compareOp := basicThresholds[0].CompareOp
return matchType, compareOp, true
}
// aggregationExpressionToTimeAggregation converts the aggregation expression to the corresponding time aggregation
// based on the expression
// if the expression is not a valid aggregation expression, it returns the unspecified time aggregation
// Note: Longer/more specific prefixes (e.g., "count_distinct") must be checked before shorter ones (e.g., "count")
func aggregationExpressionToTimeAggregation(expression string) metrictypes.TimeAggregation {
expression = strings.TrimSpace(strings.ToLower(expression))
switch {
case strings.HasPrefix(expression, "count_distinct"):
return metrictypes.TimeAggregationCountDistinct
case strings.HasPrefix(expression, "count"):
return metrictypes.TimeAggregationCount
case strings.HasPrefix(expression, "min"):
return metrictypes.TimeAggregationMin
case strings.HasPrefix(expression, "max"):
return metrictypes.TimeAggregationMax
case strings.HasPrefix(expression, "avg"):
return metrictypes.TimeAggregationAvg
case strings.HasPrefix(expression, "sum"):
return metrictypes.TimeAggregationSum
case strings.HasPrefix(expression, "rate"):
return metrictypes.TimeAggregationRate
case strings.HasPrefix(expression, "increase"):
return metrictypes.TimeAggregationIncrease
case strings.HasPrefix(expression, "latest"):
return metrictypes.TimeAggregationLatest
default:
return metrictypes.TimeAggregationUnspecified
}
}
// extractTimeAggFromQuerySpec extracts the time aggregation from the query spec of the QueryEnvelope
func extractTimeAggFromQuerySpec(spec any) []metrictypes.TimeAggregation {
timeAggs := []metrictypes.TimeAggregation{}
// Extract the time aggregation from the query spec
// based on different types of query spec
switch spec := spec.(type) {
case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
for _, agg := range spec.Aggregations {
timeAggs = append(timeAggs, agg.TimeAggregation)
}
// the log and trace aggregations don't store the time aggregation directly but expression for the aggregation
// so we need to convert the expression to the corresponding time aggregation
case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
for _, agg := range spec.Aggregations {
timeAggs = append(timeAggs, aggregationExpressionToTimeAggregation(agg.Expression))
}
case qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]:
for _, agg := range spec.Aggregations {
timeAggs = append(timeAggs, aggregationExpressionToTimeAggregation(agg.Expression))
}
}
return timeAggs
}
// isQuerySafe determines if a single query is safe to remove the eval delay.
// A query is safe only if it's a Builder query (with MetricAggregation, LogAggregation, or TraceAggregation type)
// and all its aggregations are safe.
func isQuerySafe(query qbtypes.QueryEnvelope, matchType ruletypes.MatchType, compareOp ruletypes.CompareOp) bool {
// We only handle Builder Queries for now
if query.Type != qbtypes.QueryTypeBuilder {
return false
}
// extract time aggregations from the query spec
timeAggs := extractTimeAggFromQuerySpec(query.Spec)
// A query must have at least one aggregation
if len(timeAggs) == 0 {
return false
}
// All aggregations in the query must be safe
for _, timeAgg := range timeAggs {
if !isAggregationSafe(timeAgg, matchType, compareOp) {
return false
}
}
return true
}
// isAggregationSafe checks if the aggregation is safe to remove the eval delay
func isAggregationSafe(timeAgg metrictypes.TimeAggregation, matchType ruletypes.MatchType, compareOp ruletypes.CompareOp) bool {
switch timeAgg {
case metrictypes.TimeAggregationMin:
// Group: Min, MinIf
// Value can only go down or remain same.
if matchType == ruletypes.AtleastOnce || matchType == ruletypes.AllTheTimes {
if compareOp == ruletypes.ValueIsBelow || compareOp == ruletypes.ValueBelowOrEq {
return true
}
}
case metrictypes.TimeAggregationMax:
// Group: Max, MaxIf
// Value can only go up or remain same.
if matchType == ruletypes.AtleastOnce || matchType == ruletypes.AllTheTimes {
if compareOp == ruletypes.ValueIsAbove || compareOp == ruletypes.ValueAboveOrEq {
return true
}
}
case metrictypes.TimeAggregationCount, metrictypes.TimeAggregationCountDistinct:
// Group: Count
// Value can only go up or remain same.
if matchType == ruletypes.AtleastOnce || matchType == ruletypes.AllTheTimes {
if compareOp == ruletypes.ValueIsAbove || compareOp == ruletypes.ValueAboveOrEq {
return true
}
}
// Other aggregations (Sum, Avg, Rate, etc.) are not safe.
}
return false
}

File diff suppressed because it is too large Load Diff

View File

@@ -154,6 +154,8 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
if err != nil {
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "evaluation is invalid: %v", err)
}
// calculate eval delay based on rule config
evalDelay := CalculateEvalDelay(opts.Rule, opts.ManagerOpts.EvalDelay)
if opts.Rule.RuleType == ruletypes.RuleTypeThreshold {
// create a threshold rule
@@ -164,7 +166,7 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
opts.Reader,
opts.Querier,
opts.SLogger,
WithEvalDelay(opts.ManagerOpts.EvalDelay),
WithEvalDelay(evalDelay),
WithSQLStore(opts.SQLStore),
)