Compare commits: main...feat/alert (12 commits)

Commit SHAs:
96bbeb0f6e, 05a0ce64d3, 500c5ecd8c, 2cd9f3b6a8, 43eea4d2a0, a1bace9b14, 473488a91a, 5ebbafcb30, 735f9e8105, 92caca2507, c15f91529c, 355863fed9
@@ -167,16 +167,9 @@ func (r *AnomalyRule) prepareQueryRange(ctx context.Context, ts time.Time) (*v3.
 		ctx, "prepare query range request v4", "ts", ts.UnixMilli(), "eval_window", r.EvalWindow().Milliseconds(), "eval_delay", r.EvalDelay().Milliseconds(),
 	)
 
-	start := ts.Add(-time.Duration(r.EvalWindow())).UnixMilli()
-	end := ts.UnixMilli()
-	if r.EvalDelay() > 0 {
-		start = start - int64(r.EvalDelay().Milliseconds())
-		end = end - int64(r.EvalDelay().Milliseconds())
-	}
-	// round to minute otherwise we could potentially miss data
-	start = start - (start % (60 * 1000))
-	end = end - (end % (60 * 1000))
+	st, en := r.Timestamps(ts)
+	start := st.UnixMilli()
+	end := en.UnixMilli()
 
 	compositeQuery := r.Condition().CompositeQuery
 
@@ -253,9 +246,11 @@ func (r *AnomalyRule) buildAndRunQuery(ctx context.Context, orgID valuer.UUID, t
 	r.logger.InfoContext(ctx, "anomaly scores", "scores", string(scoresJSON))
 
 	for _, series := range queryResult.AnomalyScores {
-		smpl, shouldAlert := r.ShouldAlert(*series)
-		if shouldAlert {
-			resultVector = append(resultVector, smpl)
+		for _, threshold := range r.Thresholds() {
+			smpl, shouldAlert := threshold.ShouldAlert(*series)
+			if shouldAlert {
+				resultVector = append(resultVector, smpl)
+			}
 		}
 	}
 	return resultVector, nil
@@ -296,9 +291,11 @@ func (r *AnomalyRule) buildAndRunQueryV5(ctx context.Context, orgID valuer.UUID,
 	r.logger.InfoContext(ctx, "anomaly scores", "scores", string(scoresJSON))
 
 	for _, series := range queryResult.AnomalyScores {
-		smpl, shouldAlert := r.ShouldAlert(*series)
-		if shouldAlert {
-			resultVector = append(resultVector, smpl)
+		for _, threshold := range r.Thresholds() {
+			smpl, shouldAlert := threshold.ShouldAlert(*series)
+			if shouldAlert {
+				resultVector = append(resultVector, smpl)
+			}
 		}
 	}
 	return resultVector, nil
@@ -41,7 +41,9 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
 
 		// create ch rule task for evalution
 		task = newTask(baserules.TaskTypeCh, opts.TaskName, time.Duration(opts.Rule.Frequency), rules, opts.ManagerOpts, opts.NotifyFunc, opts.MaintenanceStore, opts.OrgID)
+		if tr.IsScheduled() {
+			task.SetSchedule(tr.GetSchedule())
+		}
 	} else if opts.Rule.RuleType == ruletypes.RuleTypeProm {
 
 		// create promql rule
@@ -63,7 +65,9 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
 
 		// create promql rule task for evalution
 		task = newTask(baserules.TaskTypeProm, opts.TaskName, time.Duration(opts.Rule.Frequency), rules, opts.ManagerOpts, opts.NotifyFunc, opts.MaintenanceStore, opts.OrgID)
+		if pr.IsScheduled() {
+			task.SetSchedule(pr.GetSchedule())
+		}
 	} else if opts.Rule.RuleType == ruletypes.RuleTypeAnomaly {
 		// create anomaly rule
 		ar, err := NewAnomalyRule(
@@ -85,7 +89,9 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
 
 		// create anomaly rule task for evalution
 		task = newTask(baserules.TaskTypeCh, opts.TaskName, time.Duration(opts.Rule.Frequency), rules, opts.ManagerOpts, opts.NotifyFunc, opts.MaintenanceStore, opts.OrgID)
+		if ar.IsScheduled() {
+			task.SetSchedule(ar.GetSchedule())
+		}
 	} else {
 		return nil, fmt.Errorf("unsupported rule type %s. Supported types: %s, %s", opts.Rule.RuleType, ruletypes.RuleTypeProm, ruletypes.RuleTypeThreshold)
 	}
go.mod (1 change)

@@ -53,6 +53,7 @@ require (
 	github.com/spf13/cobra v1.9.1
 	github.com/srikanthccv/ClickHouse-go-mock v0.12.0
 	github.com/stretchr/testify v1.10.0
+	github.com/teambition/rrule-go v1.8.2
 	github.com/tidwall/gjson v1.18.0
 	github.com/uptrace/bun v1.2.9
 	github.com/uptrace/bun/dialect/pgdialect v1.2.9
go.sum (2 changes)

@@ -918,6 +918,8 @@ github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8
 github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
 github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
 github.com/subosito/gotenv v1.4.1/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0=
+github.com/teambition/rrule-go v1.8.2 h1:lIjpjvWTj9fFUZCmuoVDrKVOtdiyzbzc93qTmRVe/J8=
+github.com/teambition/rrule-go v1.8.2/go.mod h1:Ieq5AbrKGciP1V//Wq8ktsTXwSwJHDD5mD/wLBGl3p4=
 github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY=
 github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
 github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
@@ -85,6 +85,11 @@ type BaseRule struct {
 	TemporalityMap map[string]map[v3.Temporality]bool
 
 	sqlstore sqlstore.SQLStore
+
+	evaluation ruletypes.Evaluation
+
+	schedule         string
+	scheduleStartsAt time.Time
 }
 
 type RuleOption func(*BaseRule)
@@ -139,6 +144,8 @@ func NewBaseRule(id string, orgID valuer.UUID, p *ruletypes.PostableRule, reader
 		Active: map[uint64]*ruletypes.Alert{},
 		reader: reader,
 		TemporalityMap: make(map[string]map[v3.Temporality]bool),
+		evaluation: p.Evaluation,
+		schedule: p.Schedule,
 	}
 
 	if baseRule.evalWindow == 0 {
@@ -210,6 +217,18 @@ func (r *BaseRule) TargetVal() float64 {
 	return r.targetVal()
 }
 
+func (r *BaseRule) Thresholds() []ruletypes.RuleThreshold {
+	return r.ruleCondition.Thresholds
+}
+
+func (r *BaseRule) IsScheduled() bool {
+	return r.schedule != ""
+}
+
+func (r *BaseRule) GetSchedule() (string, time.Time) {
+	return r.schedule, r.scheduleStartsAt
+}
+
 func (r *ThresholdRule) hostFromSource() string {
 	parsedUrl, err := url.Parse(r.source)
 	if err != nil {
@@ -241,8 +260,10 @@ func (r *BaseRule) Unit() string {
 }
 
 func (r *BaseRule) Timestamps(ts time.Time) (time.Time, time.Time) {
-	start := ts.Add(-time.Duration(r.evalWindow)).UnixMilli()
-	end := ts.UnixMilli()
+
+	st, en := r.evaluation.EvaluationTime(ts)
+	start := st.UnixMilli()
+	end := en.UnixMilli()
 
 	if r.evalDelay > 0 {
 		start = start - int64(r.evalDelay.Milliseconds())
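Note on the hunk above: the inline evalWindow arithmetic is replaced by `r.evaluation.EvaluationTime(ts)`, but the `ruletypes.Evaluation` / `RollingWindow` implementation is not part of this diff. The sketch below is only an assumption of the (start, end) contract that the new `Timestamps` helper appears to rely on; the field names come from the test fixtures (`EvalWindow`, `Frequency`), everything else is illustrative.

```go
// Sketch only: not the SigNoz ruletypes implementation. It illustrates a rolling
// window whose EvaluationTime returns [ts - EvalWindow, ts], matching the removed
// inline computation start := ts.Add(-evalWindow), end := ts.
package main

import (
	"fmt"
	"time"
)

type Duration time.Duration

type RollingWindow struct {
	EvalWindow Duration
	Frequency  Duration
}

// EvaluationTime returns the evaluation window ending at ts.
func (w RollingWindow) EvaluationTime(ts time.Time) (time.Time, time.Time) {
	return ts.Add(-time.Duration(w.EvalWindow)), ts
}

func main() {
	w := RollingWindow{EvalWindow: Duration(5 * time.Minute), Frequency: Duration(time.Minute)}
	st, en := w.EvaluationTime(time.Now())
	fmt.Println("start:", st.UnixMilli(), "end:", en.UnixMilli())
}
```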
@@ -168,7 +168,9 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
 
 		// create ch rule task for evalution
 		task = newTask(TaskTypeCh, opts.TaskName, taskNamesuffix, time.Duration(opts.Rule.Frequency), rules, opts.ManagerOpts, opts.NotifyFunc, opts.MaintenanceStore, opts.OrgID)
+		if tr.IsScheduled() {
+			task.SetSchedule(tr.GetSchedule())
+		}
 	} else if opts.Rule.RuleType == ruletypes.RuleTypeProm {
 
 		// create promql rule
@@ -190,7 +192,9 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
 
 		// create promql rule task for evalution
 		task = newTask(TaskTypeProm, opts.TaskName, taskNamesuffix, time.Duration(opts.Rule.Frequency), rules, opts.ManagerOpts, opts.NotifyFunc, opts.MaintenanceStore, opts.OrgID)
+		if pr.IsScheduled() {
+			task.SetSchedule(pr.GetSchedule())
+		}
 	} else {
 		return nil, fmt.Errorf("unsupported rule type %s. Supported types: %s, %s", opts.Rule.RuleType, ruletypes.RuleTypeProm, ruletypes.RuleTypeThreshold)
 	}
@@ -125,8 +125,7 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time) (interface{}, error)
 
 	prevState := r.State()
 
-	start := ts.Add(-r.evalWindow)
-	end := ts
+	start, end := r.Timestamps(ts)
 	interval := 60 * time.Second // TODO(srikanthccv): this should be configurable
 
 	valueFormatter := formatter.FromUnit(r.Unit())
@@ -151,84 +150,86 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time) (interface{}, error)
 	var alerts = make(map[uint64]*ruletypes.Alert, len(res))
 
 	for _, series := range res {
-		l := make(map[string]string, len(series.Metric))
-		for _, lbl := range series.Metric {
-			l[lbl.Name] = lbl.Value
-		}
-
-		if len(series.Floats) == 0 {
-			continue
-		}
-
-		alertSmpl, shouldAlert := r.ShouldAlert(toCommonSeries(series))
-		if !shouldAlert {
-			continue
-		}
-		r.logger.DebugContext(ctx, "alerting for series", "rule_name", r.Name(), "series", series)
-
-		threshold := valueFormatter.Format(r.targetVal(), r.Unit())
-
-		tmplData := ruletypes.AlertTemplateData(l, valueFormatter.Format(alertSmpl.V, r.Unit()), threshold)
-		// Inject some convenience variables that are easier to remember for users
-		// who are not used to Go's templating system.
-		defs := "{{$labels := .Labels}}{{$value := .Value}}{{$threshold := .Threshold}}"
-
-		expand := func(text string) string {
-
-			tmpl := ruletypes.NewTemplateExpander(
-				ctx,
-				defs+text,
-				"__alert_"+r.Name(),
-				tmplData,
-				times.Time(timestamp.FromTime(ts)),
-				nil,
-			)
-			result, err := tmpl.Expand()
-			if err != nil {
-				result = fmt.Sprintf("<error expanding template: %s>", err)
-				r.logger.WarnContext(ctx, "Expanding alert template failed", "rule_name", r.Name(), "error", err, "data", tmplData)
-			}
-			return result
-		}
-
-		lb := qslabels.NewBuilder(alertSmpl.Metric).Del(qslabels.MetricNameLabel)
-		resultLabels := qslabels.NewBuilder(alertSmpl.Metric).Del(qslabels.MetricNameLabel).Labels()
-
-		for name, value := range r.labels.Map() {
-			lb.Set(name, expand(value))
-		}
-
-		lb.Set(qslabels.AlertNameLabel, r.Name())
-		lb.Set(qslabels.AlertRuleIdLabel, r.ID())
-		lb.Set(qslabels.RuleSourceLabel, r.GeneratorURL())
-
-		annotations := make(qslabels.Labels, 0, len(r.annotations.Map()))
-		for name, value := range r.annotations.Map() {
-			annotations = append(annotations, qslabels.Label{Name: name, Value: expand(value)})
-		}
-
-		lbs := lb.Labels()
-		h := lbs.Hash()
-		resultFPs[h] = struct{}{}
-
-		if _, ok := alerts[h]; ok {
-			err = fmt.Errorf("vector contains metrics with the same labelset after applying alert labels")
-			// We have already acquired the lock above hence using SetHealth and
-			// SetLastError will deadlock.
-			r.health = ruletypes.HealthBad
-			r.lastError = err
-			return nil, err
-		}
-
-		alerts[h] = &ruletypes.Alert{
-			Labels: lbs,
-			QueryResultLables: resultLabels,
-			Annotations: annotations,
-			ActiveAt: ts,
-			State: model.StatePending,
-			Value: alertSmpl.V,
-			GeneratorURL: r.GeneratorURL(),
-			Receivers: r.preferredChannels,
+		for _, ruleThreshold := range r.Thresholds() {
+			l := make(map[string]string, len(series.Metric))
+			for _, lbl := range series.Metric {
+				l[lbl.Name] = lbl.Value
+			}
+
+			if len(series.Floats) == 0 {
+				continue
+			}
+
+			alertSmpl, shouldAlert := ruleThreshold.ShouldAlert(toCommonSeries(series))
+			if !shouldAlert {
+				continue
+			}
+			r.logger.DebugContext(ctx, "alerting for series", "rule_name", r.Name(), "series", series)
+
+			threshold := valueFormatter.Format(r.targetVal(), r.Unit())
+
+			tmplData := ruletypes.AlertTemplateData(l, valueFormatter.Format(alertSmpl.V, r.Unit()), threshold)
+			// Inject some convenience variables that are easier to remember for users
+			// who are not used to Go's templating system.
+			defs := "{{$labels := .Labels}}{{$value := .Value}}{{$threshold := .Threshold}}"
+
+			expand := func(text string) string {
+
+				tmpl := ruletypes.NewTemplateExpander(
+					ctx,
+					defs+text,
+					"__alert_"+r.Name(),
+					tmplData,
+					times.Time(timestamp.FromTime(ts)),
+					nil,
+				)
+				result, err := tmpl.Expand()
+				if err != nil {
+					result = fmt.Sprintf("<error expanding template: %s>", err)
+					r.logger.WarnContext(ctx, "Expanding alert template failed", "rule_name", r.Name(), "error", err, "data", tmplData)
+				}
+				return result
+			}
+
+			lb := qslabels.NewBuilder(alertSmpl.Metric).Del(qslabels.MetricNameLabel)
+			resultLabels := qslabels.NewBuilder(alertSmpl.Metric).Del(qslabels.MetricNameLabel).Labels()
+
+			for name, value := range r.labels.Map() {
+				lb.Set(name, expand(value))
+			}
+
+			lb.Set(qslabels.AlertNameLabel, r.Name())
+			lb.Set(qslabels.AlertRuleIdLabel, r.ID())
+			lb.Set(qslabels.RuleSourceLabel, r.GeneratorURL())
+
+			annotations := make(qslabels.Labels, 0, len(r.annotations.Map()))
+			for name, value := range r.annotations.Map() {
+				annotations = append(annotations, qslabels.Label{Name: name, Value: expand(value)})
+			}
+
+			lbs := lb.Labels()
+			h := lbs.Hash()
+			resultFPs[h] = struct{}{}
+
+			if _, ok := alerts[h]; ok {
+				err = fmt.Errorf("vector contains metrics with the same labelset after applying alert labels")
+				// We have already acquired the lock above hence using SetHealth and
+				// SetLastError will deadlock.
+				r.health = ruletypes.HealthBad
+				r.lastError = err
+				return nil, err
+			}
+
+			alerts[h] = &ruletypes.Alert{
+				Labels: lbs,
+				QueryResultLables: resultLabels,
+				Annotations: annotations,
+				ActiveAt: ts,
+				State: model.StatePending,
+				Value: alertSmpl.V,
+				GeneratorURL: r.GeneratorURL(),
+				Receivers: r.preferredChannels,
+			}
 		}
 	}
 
@@ -3,6 +3,7 @@ package rules
 import (
 	"context"
 	"fmt"
+	"github.com/teambition/rrule-go"
 	"sort"
 	"sync"
 	"time"
@@ -39,6 +40,8 @@ type PromRuleTask struct {
 
 	maintenanceStore ruletypes.MaintenanceStore
 	orgID valuer.UUID
+	schedule string
+	scheduleStartsAt time.Time
 }
 
 // newPromRuleTask holds rules that have promql condition
@@ -75,6 +78,10 @@ func (g *PromRuleTask) Key() string {
 	return g.name + ";" + g.file
 }
 
+func (g *PromRuleTask) IsCronSchedule() bool {
+	return g.schedule != ""
+}
+
 func (g *PromRuleTask) Type() TaskType { return TaskTypeProm }
 
 // Rules returns the group's rules.
@@ -91,38 +98,6 @@ func (g *PromRuleTask) Pause(b bool) {
 
 func (g *PromRuleTask) Run(ctx context.Context) {
 	defer close(g.terminated)
-
-	// Wait an initial amount to have consistently slotted intervals.
-	evalTimestamp := g.EvalTimestamp(time.Now().UnixNano()).Add(g.frequency)
-	select {
-	case <-time.After(time.Until(evalTimestamp)):
-	case <-g.done:
-		return
-	}
-
-	ctx = NewQueryOriginContext(ctx, map[string]interface{}{
-		"ruleGroup": map[string]string{
-			"name": g.Name(),
-		},
-	})
-
-	iter := func() {
-
-		start := time.Now()
-		g.Eval(ctx, evalTimestamp)
-		timeSinceStart := time.Since(start)
-
-		g.setEvaluationTime(timeSinceStart)
-		g.setLastEvaluation(start)
-	}
-
-	// The assumption here is that since the ticker was started after having
-	// waited for `evalTimestamp` to pass, the ticks will trigger soon
-	// after each `evalTimestamp + N * g.frequency` occurrence.
-	tick := time.NewTicker(g.frequency)
-	defer tick.Stop()
-
-	// defer cleanup
 	defer func() {
 		if !g.markStale {
 			return
@@ -139,22 +114,114 @@ func (g *PromRuleTask) Run(ctx context.Context) {
 		}(time.Now())
 
 	}()
-
-	iter()
-
-	// let the group iterate and run
-	for {
-		select {
-		case <-g.done:
-			return
-		default:
-			select {
-			case <-g.done:
-				return
-			case <-tick.C:
-				missed := (time.Since(evalTimestamp) / g.frequency) - 1
-				evalTimestamp = evalTimestamp.Add((missed + 1) * g.frequency)
-				iter()
-			}
+	if g.IsCronSchedule() {
+		schedule, err := rrule.StrToRRule("DTSTART=" + g.scheduleStartsAt.UTC().Format("20060102T150405Z") + "\nRRULE:" + g.schedule) // assuming g.cronExpr contains the cron expression
+		if err != nil {
+			zap.L().Error("failed to parse rrule expression", zap.String("rrule", g.schedule), zap.Error(err))
+			return
+		}
+		now := time.Now()
+		nextRun := schedule.After(now, false)
+
+		select {
+		case <-time.After(time.Until(nextRun)):
+		case <-g.done:
+			return
+		}
+
+		ctx = NewQueryOriginContext(ctx, map[string]interface{}{
+			"ruleRuleTask": map[string]string{
+				"name": g.Name(),
+			},
+		})
+
+		iter := func() {
+			if g.pause {
+				return
+			}
+			start := time.Now()
+			g.Eval(ctx, start) // using current time instead of evalTimestamp
+			timeSinceStart := time.Since(start)
+
+			g.setEvaluationTime(timeSinceStart)
+			g.setLastEvaluation(start)
+		}
+
+		iter()
+		currentRun := nextRun
+
+		for {
+			// Calculate the next run time
+			nextRun = schedule.After(currentRun, false)
+
+			select {
+			case <-g.done:
+				return
+			default:
+				select {
+				case <-g.done:
+					return
+				case <-time.After(time.Until(nextRun)):
+					// Check if we missed any scheduled runs
+					now := time.Now()
+					if now.After(nextRun.Add(time.Minute)) { // Allow 1 minute tolerance
+						zap.L().Warn("missed scheduled run",
+							zap.Time("scheduled", nextRun),
+							zap.Time("actual", now))
+					}
+
+					currentRun = nextRun
+					iter()
+				}
+			}
+		}
+	} else {
+		// Wait an initial amount to have consistently slotted intervals.
+		evalTimestamp := g.EvalTimestamp(time.Now().UnixNano()).Add(g.frequency)
+		select {
+		case <-time.After(time.Until(evalTimestamp)):
+		case <-g.done:
+			return
+		}
+
+		ctx = NewQueryOriginContext(ctx, map[string]interface{}{
+			"ruleGroup": map[string]string{
+				"name": g.Name(),
+			},
+		})
+
+		iter := func() {
+
+			start := time.Now()
+			g.Eval(ctx, evalTimestamp)
+			timeSinceStart := time.Since(start)
+
+			g.setEvaluationTime(timeSinceStart)
+			g.setLastEvaluation(start)
+		}
+
+		// The assumption here is that since the ticker was started after having
+		// waited for `evalTimestamp` to pass, the ticks will trigger soon
+		// after each `evalTimestamp + N * g.frequency` occurrence.
+		tick := time.NewTicker(g.frequency)
+		defer tick.Stop()
+
+		iter()
+
+		// let the group iterate and run
+		for {
+			select {
+			case <-g.done:
+				return
+			default:
+				select {
+				case <-g.done:
+					return
+				case <-tick.C:
+					missed := (time.Since(evalTimestamp) / g.frequency) - 1
+					evalTimestamp = evalTimestamp.Add((missed + 1) * g.frequency)
+					iter()
+				}
+			}
 		}
 	}
 }
@@ -245,6 +312,13 @@ func (g *PromRuleTask) setLastEvaluation(ts time.Time) {
 	g.lastEvaluation = ts
 }
 
+func (g *PromRuleTask) SetSchedule(schedule string, t time.Time) {
+	g.mtx.Lock()
+	defer g.mtx.Unlock()
+	g.schedule = schedule
+	g.scheduleStartsAt = t
+}
+
 // EvalTimestamp returns the immediately preceding consistently slotted evaluation time.
 func (g *PromRuleTask) EvalTimestamp(startTime int64) time.Time {
 	var (
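The two Run loops above derive the next evaluation instant from an rrule built out of DTSTART plus the stored RRULE expression (via `rrule.StrToRRule` in the diff). The sketch below shows the same "compute next run, sleep until it, advance" idea with the `github.com/teambition/rrule-go` dependency added in go.mod, but it builds the rule from `rrule.ROption` directly so it stays self-contained; the 5-minute recurrence is an arbitrary example, not a value from this PR.

```go
// Standalone sketch of rrule-driven scheduling; the recurrence is invented for
// illustration and no SigNoz types are used.
package main

import (
	"fmt"
	"time"

	"github.com/teambition/rrule-go"
)

func main() {
	scheduleStartsAt := time.Now().UTC()

	// Every 5 minutes, starting now (example values only).
	schedule, err := rrule.NewRRule(rrule.ROption{
		Freq:     rrule.MINUTELY,
		Interval: 5,
		Dtstart:  scheduleStartsAt,
	})
	if err != nil {
		fmt.Println("failed to build rrule:", err)
		return
	}

	// After(t, false) returns the first occurrence strictly after t. The new Run
	// loops sleep until nextRun, evaluate, then advance currentRun = nextRun.
	currentRun := time.Now()
	for i := 0; i < 3; i++ {
		nextRun := schedule.After(currentRun, false)
		fmt.Println("next evaluation at:", nextRun)
		currentRun = nextRun
	}
}
```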
@@ -12,6 +12,7 @@ import (
 	ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
 	"github.com/SigNoz/signoz/pkg/valuer"
 	opentracing "github.com/opentracing/opentracing-go"
+	"github.com/teambition/rrule-go"
 	"go.uber.org/zap"
 )
 
@@ -36,6 +37,10 @@ type RuleTask struct {
 
 	maintenanceStore ruletypes.MaintenanceStore
 	orgID valuer.UUID
+
+	// New field for rrule-based scheduling
+	schedule string
+	scheduleStartsAt time.Time
 }
 
 const DefaultFrequency = 1 * time.Minute
@@ -71,6 +76,10 @@ func (g *RuleTask) Key() string {
 	return g.name + ";" + g.file
 }
 
+func (g *RuleTask) IsCronSchedule() bool {
+	return g.schedule != ""
+}
+
 // Name returns the group name.
 func (g *RuleTask) Type() TaskType { return TaskTypeCh }
 
@@ -95,56 +104,119 @@ func NewQueryOriginContext(ctx context.Context, data map[string]interface{}) con
 func (g *RuleTask) Run(ctx context.Context) {
 	defer close(g.terminated)
 
-	// Wait an initial amount to have consistently slotted intervals.
-	evalTimestamp := g.EvalTimestamp(time.Now().UnixNano()).Add(g.frequency)
-	zap.L().Debug("group run to begin at", zap.Time("evalTimestamp", evalTimestamp))
-	select {
-	case <-time.After(time.Until(evalTimestamp)):
-	case <-g.done:
-		return
-	}
-
-	ctx = NewQueryOriginContext(ctx, map[string]interface{}{
-		"ruleRuleTask": map[string]string{
-			"name": g.Name(),
-		},
-	})
-
-	iter := func() {
-		if g.pause {
-			// todo(amol): remove in memory active alerts
-			// and last series state
-			return
-		}
-		start := time.Now()
-		g.Eval(ctx, evalTimestamp)
-		timeSinceStart := time.Since(start)
-
-		g.setEvaluationTime(timeSinceStart)
-		g.setLastEvaluation(start)
-	}
-
-	// The assumption here is that since the ticker was started after having
-	// waited for `evalTimestamp` to pass, the ticks will trigger soon
-	// after each `evalTimestamp + N * g.frequency` occurrence.
-	tick := time.NewTicker(g.frequency)
-	defer tick.Stop()
-
-	iter()
-
-	// let the group iterate and run
-	for {
-		select {
-		case <-g.done:
-			return
-		default:
-			select {
-			case <-g.done:
-				return
-			case <-tick.C:
-				missed := (time.Since(evalTimestamp) / g.frequency) - 1
-				evalTimestamp = evalTimestamp.Add((missed + 1) * g.frequency)
-				iter()
-			}
+	if g.IsCronSchedule() {
+		schedule, err := rrule.StrToRRule("DTSTART=" + g.scheduleStartsAt.UTC().Format("20060102T150405Z") + "\nRRULE:" + g.schedule) // assuming g.cronExpr contains the cron expression
+		if err != nil {
+			zap.L().Error("failed to parse rrule expression", zap.String("rrule", g.schedule), zap.Error(err))
+			return
+		}
+		now := time.Now()
+		nextRun := schedule.After(now, false)
+
+		select {
+		case <-time.After(time.Until(nextRun)):
+		case <-g.done:
+			return
+		}
+
+		ctx = NewQueryOriginContext(ctx, map[string]interface{}{
+			"ruleRuleTask": map[string]string{
+				"name": g.Name(),
+			},
+		})
+
+		iter := func() {
+			if g.pause {
+				return
+			}
+			start := time.Now()
+			g.Eval(ctx, start) // using current time instead of evalTimestamp
+			timeSinceStart := time.Since(start)
+
+			g.setEvaluationTime(timeSinceStart)
+			g.setLastEvaluation(start)
+		}
+
+		iter()
+		currentRun := nextRun
+
+		for {
+			// Calculate the next run time
+			nextRun = schedule.After(currentRun, false)
+
+			select {
+			case <-g.done:
+				return
+			default:
+				select {
+				case <-g.done:
+					return
+				case <-time.After(time.Until(nextRun)):
+					// Check if we missed any scheduled runs
+					now := time.Now()
+					if now.After(nextRun.Add(time.Minute)) { // Allow 1 minute tolerance
+						zap.L().Warn("missed scheduled run",
+							zap.Time("scheduled", nextRun),
+							zap.Time("actual", now))
+					}
+
+					currentRun = nextRun
+					iter()
+				}
+			}
+		}
+	} else {
+		// Wait an initial amount to have consistently slotted intervals.
+		evalTimestamp := g.EvalTimestamp(time.Now().UnixNano()).Add(g.frequency)
+		zap.L().Debug("group run to begin at", zap.Time("evalTimestamp", evalTimestamp))
+		select {
+		case <-time.After(time.Until(evalTimestamp)):
+		case <-g.done:
+			return
+		}
+
+		ctx = NewQueryOriginContext(ctx, map[string]interface{}{
+			"ruleRuleTask": map[string]string{
+				"name": g.Name(),
+			},
+		})
+
+		iter := func() {
+			if g.pause {
+				// todo(amol): remove in memory active alerts
+				// and last series state
+				return
+			}
+			start := time.Now()
+			g.Eval(ctx, evalTimestamp)
+			timeSinceStart := time.Since(start)
+
+			g.setEvaluationTime(timeSinceStart)
+			g.setLastEvaluation(start)
+		}
+
+		// The assumption here is that since the ticker was started after having
+		// waited for `evalTimestamp` to pass, the ticks will trigger soon
+		// after each `evalTimestamp + N * g.frequency` occurrence.
+		tick := time.NewTicker(g.frequency)
+		defer tick.Stop()
+
+		iter()
+
+		// let the group iterate and run
+		for {
+			select {
+			case <-g.done:
+				return
+			default:
+				select {
+				case <-g.done:
+					return
+				case <-tick.C:
+					missed := (time.Since(evalTimestamp) / g.frequency) - 1
+					evalTimestamp = evalTimestamp.Add((missed + 1) * g.frequency)
+					iter()
+				}
+			}
 		}
 	}
 }
@@ -298,6 +370,13 @@ func (g *RuleTask) CopyState(fromTask Task) error {
 	return nil
 }
 
+func (g *RuleTask) SetSchedule(schedule string, t time.Time) {
+	g.mtx.Lock()
+	defer g.mtx.Unlock()
+	g.schedule = schedule
+	g.scheduleStartsAt = t
+}
+
 // Eval runs a single evaluation cycle in which all rules are evaluated sequentially.
 func (g *RuleTask) Eval(ctx context.Context, ts time.Time) {
 
@@ -379,3 +458,41 @@ func (g *RuleTask) Eval(ctx context.Context, ts time.Time) {
 		}(i, rule)
 	}
 }
+
+// Helper to convert ruletypes.Schedule/Recurrence to rrule.ROption
+func recurrenceToROption(s *ruletypes.Schedule) rrule.ROption {
+	// Only basic mapping for daily/weekly/monthly, can be extended
+	opt := rrule.ROption{
+		Dtstart: s.Recurrence.StartTime,
+	}
+	switch s.Recurrence.RepeatType {
+	case ruletypes.RepeatTypeDaily:
+		opt.Freq = rrule.DAILY
+	case ruletypes.RepeatTypeWeekly:
+		opt.Freq = rrule.WEEKLY
+		for _, day := range s.Recurrence.RepeatOn {
+			switch day {
+			case ruletypes.RepeatOnSunday:
+				opt.Byweekday = append(opt.Byweekday, rrule.SU)
+			case ruletypes.RepeatOnMonday:
+				opt.Byweekday = append(opt.Byweekday, rrule.MO)
+			case ruletypes.RepeatOnTuesday:
+				opt.Byweekday = append(opt.Byweekday, rrule.TU)
+			case ruletypes.RepeatOnWednesday:
+				opt.Byweekday = append(opt.Byweekday, rrule.WE)
+			case ruletypes.RepeatOnThursday:
+				opt.Byweekday = append(opt.Byweekday, rrule.TH)
+			case ruletypes.RepeatOnFriday:
+				opt.Byweekday = append(opt.Byweekday, rrule.FR)
+			case ruletypes.RepeatOnSaturday:
+				opt.Byweekday = append(opt.Byweekday, rrule.SA)
+			}
+		}
+	case ruletypes.RepeatTypeMonthly:
+		opt.Freq = rrule.MONTHLY
+	}
+	if s.Recurrence.EndTime != nil {
+		opt.Until = *s.Recurrence.EndTime
+	}
+	return opt
+}
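`recurrenceToROption` above only fills in an `rrule.ROption`; turning that into concrete occurrences is left to the rrule package. A hedged sketch of that step follows — the weekly Monday/Friday recurrence is invented for illustration, and the helper itself is not reproduced here.

```go
// Sketch: turning an rrule.ROption (as produced by a helper like recurrenceToROption)
// into a concrete RRule and listing its occurrences. Values are illustrative only.
package main

import (
	"fmt"
	"time"

	"github.com/teambition/rrule-go"
)

func main() {
	start := time.Date(2025, 1, 1, 9, 0, 0, 0, time.UTC)

	// Weekly on Monday and Friday, mirroring the RepeatOn -> Byweekday mapping.
	opt := rrule.ROption{
		Freq:      rrule.WEEKLY,
		Dtstart:   start,
		Byweekday: []rrule.Weekday{rrule.MO, rrule.FR},
	}

	r, err := rrule.NewRRule(opt)
	if err != nil {
		fmt.Println("invalid recurrence:", err)
		return
	}

	// Occurrences over the first two weeks of January 2025.
	for _, ts := range r.Between(start, start.AddDate(0, 0, 14), true) {
		fmt.Println(ts.Weekday(), ts)
	}
}
```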
@@ -28,6 +28,8 @@ type Task interface {
 	Rules() []Rule
 	Stop()
 	Pause(b bool)
+	IsCronSchedule() bool
+	SetSchedule(string, time.Time)
 }
 
 // newTask returns an appropriate group for
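The `Task` interface gains `IsCronSchedule()` and `SetSchedule(string, time.Time)`, which `RuleTask` and `PromRuleTask` satisfy with the mutex-guarded setters added earlier in this diff. A minimal illustration of that contract, using a stand-in type rather than the real tasks:

```go
// Minimal illustration of the extended Task contract; scheduledTask is a stand-in,
// not a SigNoz type. Real implementations guard SetSchedule with the task mutex.
package main

import (
	"fmt"
	"sync"
	"time"
)

type scheduledTask struct {
	mtx              sync.Mutex
	schedule         string
	scheduleStartsAt time.Time
}

func (t *scheduledTask) IsCronSchedule() bool { return t.schedule != "" }

func (t *scheduledTask) SetSchedule(schedule string, startsAt time.Time) {
	t.mtx.Lock()
	defer t.mtx.Unlock()
	t.schedule = schedule
	t.scheduleStartsAt = startsAt
}

func main() {
	task := &scheduledTask{}
	fmt.Println("cron scheduled?", task.IsCronSchedule()) // false -> ticker-based loop
	task.SetSchedule("FREQ=DAILY", time.Now().UTC())      // example RRULE, not from the PR
	fmt.Println("cron scheduled?", task.IsCronSchedule()) // true -> rrule-based loop
}
```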
@@ -479,9 +479,11 @@ func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, orgID valuer.UUID,
 	}
 
 	for _, series := range queryResult.Series {
-		smpl, shouldAlert := r.ShouldAlert(*series)
-		if shouldAlert {
-			resultVector = append(resultVector, smpl)
+		for _, threshold := range r.Thresholds() {
+			smpl, shouldAlert := threshold.ShouldAlert(*series)
+			if shouldAlert {
+				resultVector = append(resultVector, smpl)
+			}
 		}
 	}
 
@@ -549,9 +551,11 @@ func (r *ThresholdRule) buildAndRunQueryV5(ctx context.Context, orgID valuer.UUI
 	}
 
 	for _, series := range queryResult.Series {
-		smpl, shouldAlert := r.ShouldAlert(*series)
-		if shouldAlert {
-			resultVector = append(resultVector, smpl)
+		for _, threshold := range r.Thresholds() {
+			smpl, shouldAlert := threshold.ShouldAlert(*series)
+			if shouldAlert {
+				resultVector = append(resultVector, smpl)
+			}
 		}
 	}
 
@@ -592,6 +596,7 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (interface{}, er
 		}
 
 		value := valueFormatter.Format(smpl.V, r.Unit())
+		//todo(aniket): handle different threshold
 		threshold := valueFormatter.Format(r.targetVal(), r.Unit())
 		r.logger.DebugContext(ctx, "Alert template data for rule", "rule_name", r.Name(), "formatter", valueFormatter.Name(), "value", value, "threshold", threshold)
 
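Threshold evaluation is now fanned out per `RuleThreshold` (see the `ruletypes` hunk at the end of this diff) instead of going through a single rule-level `ShouldAlert`. The sketch below reproduces only the shape of that loop with stand-in `Series`/`Sample` types and a toy "above/once" threshold; it is not the SigNoz implementation.

```go
// Simplified sketch of the per-threshold fan-out now used by buildAndRunQuery.
// Series, Sample, and basicThreshold are stand-ins, not SigNoz types.
package main

import "fmt"

type Series struct {
	Points []float64
}

type Sample struct {
	Value     float64
	Threshold string
}

// RuleThreshold mirrors the ShouldAlert contract from the new ruletypes interface.
type RuleThreshold interface {
	Name() string
	ShouldAlert(s Series) (Sample, bool)
}

type basicThreshold struct {
	name   string
	target float64
}

func (t basicThreshold) Name() string { return t.name }

// ShouldAlert fires if any point crosses the target (rough "Above"/"Once" semantics).
func (t basicThreshold) ShouldAlert(s Series) (Sample, bool) {
	for _, v := range s.Points {
		if v > t.target {
			return Sample{Value: v, Threshold: t.name}, true
		}
	}
	return Sample{}, false
}

func main() {
	series := []Series{{Points: []float64{0.2, 0.7}}, {Points: []float64{0.1}}}
	thresholds := []RuleThreshold{
		basicThreshold{name: "first_threshold", target: 0.5},
		basicThreshold{name: "second_threshold", target: 0.05},
	}

	// Same shape as the diff: every series is checked against every threshold,
	// and each firing (series, threshold) pair contributes one sample.
	var resultVector []Sample
	for _, s := range series {
		for _, th := range thresholds {
			if smpl, shouldAlert := th.ShouldAlert(s); shouldAlert {
				resultVector = append(resultVector, smpl)
			}
		}
	}
	fmt.Printf("%d alerting samples: %+v\n", len(resultVector), resultVector)
}
```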
@@ -870,6 +870,10 @@ func TestPrepareLinksToLogs(t *testing.T) {
 		RuleType: ruletypes.RuleTypeThreshold,
 		EvalWindow: ruletypes.Duration(5 * time.Minute),
 		Frequency: ruletypes.Duration(1 * time.Minute),
+		Evaluation: &ruletypes.RollingWindow{
+			EvalWindow: ruletypes.Duration(5 * time.Minute),
+			Frequency: ruletypes.Duration(1 * time.Minute),
+		},
 		RuleCondition: &ruletypes.RuleCondition{
 			CompositeQuery: &v3.CompositeQuery{
 				QueryType: v3.QueryTypeBuilder,
@@ -913,6 +917,10 @@ func TestPrepareLinksToLogsV5(t *testing.T) {
 		RuleType: ruletypes.RuleTypeThreshold,
 		EvalWindow: ruletypes.Duration(5 * time.Minute),
 		Frequency: ruletypes.Duration(1 * time.Minute),
+		Evaluation: &ruletypes.RollingWindow{
+			EvalWindow: ruletypes.Duration(5 * time.Minute),
+			Frequency: ruletypes.Duration(1 * time.Minute),
+		},
 		RuleCondition: &ruletypes.RuleCondition{
 			CompositeQuery: &v3.CompositeQuery{
 				QueryType: v3.QueryTypeBuilder,
@@ -963,6 +971,10 @@ func TestPrepareLinksToTracesV5(t *testing.T) {
 		RuleType: ruletypes.RuleTypeThreshold,
 		EvalWindow: ruletypes.Duration(5 * time.Minute),
 		Frequency: ruletypes.Duration(1 * time.Minute),
+		Evaluation: &ruletypes.RollingWindow{
+			EvalWindow: ruletypes.Duration(5 * time.Minute),
+			Frequency: ruletypes.Duration(1 * time.Minute),
+		},
 		RuleCondition: &ruletypes.RuleCondition{
 			CompositeQuery: &v3.CompositeQuery{
 				QueryType: v3.QueryTypeBuilder,
@@ -1013,6 +1025,10 @@ func TestPrepareLinksToTraces(t *testing.T) {
 		RuleType: ruletypes.RuleTypeThreshold,
 		EvalWindow: ruletypes.Duration(5 * time.Minute),
 		Frequency: ruletypes.Duration(1 * time.Minute),
+		Evaluation: &ruletypes.RollingWindow{
+			EvalWindow: ruletypes.Duration(5 * time.Minute),
+			Frequency: ruletypes.Duration(1 * time.Minute),
+		},
 		RuleCondition: &ruletypes.RuleCondition{
 			CompositeQuery: &v3.CompositeQuery{
 				QueryType: v3.QueryTypeBuilder,
@@ -1141,6 +1157,10 @@ func TestThresholdRuleEvalDelay(t *testing.T) {
 		RuleType: ruletypes.RuleTypeThreshold,
 		EvalWindow: ruletypes.Duration(5 * time.Minute),
 		Frequency: ruletypes.Duration(1 * time.Minute),
+		Evaluation: &ruletypes.RollingWindow{
+			EvalWindow: ruletypes.Duration(5 * time.Minute),
+			Frequency: ruletypes.Duration(1 * time.Minute),
+		},
 		RuleCondition: &ruletypes.RuleCondition{
 			CompositeQuery: &v3.CompositeQuery{
 				QueryType: v3.QueryTypeClickHouseSQL,
@@ -1191,6 +1211,10 @@ func TestThresholdRuleClickHouseTmpl(t *testing.T) {
 		RuleType: ruletypes.RuleTypeThreshold,
 		EvalWindow: ruletypes.Duration(5 * time.Minute),
 		Frequency: ruletypes.Duration(1 * time.Minute),
+		Evaluation: &ruletypes.RollingWindow{
+			EvalWindow: ruletypes.Duration(5 * time.Minute),
+			Frequency: ruletypes.Duration(1 * time.Minute),
+		},
 		RuleCondition: &ruletypes.RuleCondition{
 			CompositeQuery: &v3.CompositeQuery{
 				QueryType: v3.QueryTypeClickHouseSQL,
@@ -1248,6 +1272,10 @@ func TestThresholdRuleUnitCombinations(t *testing.T) {
 		RuleType: ruletypes.RuleTypeThreshold,
 		EvalWindow: ruletypes.Duration(5 * time.Minute),
 		Frequency: ruletypes.Duration(1 * time.Minute),
+		Evaluation: &ruletypes.RollingWindow{
+			EvalWindow: ruletypes.Duration(5 * time.Minute),
+			Frequency: ruletypes.Duration(1 * time.Minute),
+		},
 		RuleCondition: &ruletypes.RuleCondition{
 			CompositeQuery: &v3.CompositeQuery{
 				QueryType: v3.QueryTypeBuilder,
@@ -1351,6 +1379,7 @@ func TestThresholdRuleUnitCombinations(t *testing.T) {
 		postableRule.RuleCondition.Target = &c.target
 		postableRule.RuleCondition.CompositeQuery.Unit = c.yAxisUnit
 		postableRule.RuleCondition.TargetUnit = c.targetUnit
+		postableRule.RuleCondition.Thresholds = []ruletypes.RuleThreshold{ruletypes.NewBasicRuleThreshold(postableRule.AlertName, &c.target, nil, ruletypes.MatchType(c.matchType), ruletypes.CompareOp(c.compareOp), postableRule.RuleCondition.SelectedQuery, c.targetUnit, postableRule.RuleCondition.CompositeQuery.Unit)}
 		postableRule.Annotations = map[string]string{
 			"description": "This alert is fired when the defined metric (current value: {{$value}}) crosses the threshold ({{$threshold}})",
 			"summary": "The rule threshold is set to {{$threshold}}, and the observed metric value is {{$value}}",
@@ -1398,6 +1427,10 @@ func TestThresholdRuleNoData(t *testing.T) {
 		RuleType: ruletypes.RuleTypeThreshold,
 		EvalWindow: ruletypes.Duration(5 * time.Minute),
 		Frequency: ruletypes.Duration(1 * time.Minute),
+		Evaluation: &ruletypes.RollingWindow{
+			EvalWindow: ruletypes.Duration(5 * time.Minute),
+			Frequency: ruletypes.Duration(1 * time.Minute),
+		},
 		RuleCondition: &ruletypes.RuleCondition{
 			CompositeQuery: &v3.CompositeQuery{
 				QueryType: v3.QueryTypeBuilder,
@@ -1491,6 +1524,10 @@ func TestThresholdRuleTracesLink(t *testing.T) {
 		RuleType: ruletypes.RuleTypeThreshold,
 		EvalWindow: ruletypes.Duration(5 * time.Minute),
 		Frequency: ruletypes.Duration(1 * time.Minute),
+		Evaluation: &ruletypes.RollingWindow{
+			EvalWindow: ruletypes.Duration(5 * time.Minute),
+			Frequency: ruletypes.Duration(1 * time.Minute),
+		},
 		RuleCondition: &ruletypes.RuleCondition{
 			CompositeQuery: &v3.CompositeQuery{
 				QueryType: v3.QueryTypeBuilder,
@@ -1556,6 +1593,7 @@ func TestThresholdRuleTracesLink(t *testing.T) {
 		postableRule.RuleCondition.Target = &c.target
 		postableRule.RuleCondition.CompositeQuery.Unit = c.yAxisUnit
 		postableRule.RuleCondition.TargetUnit = c.targetUnit
+		postableRule.RuleCondition.Thresholds = []ruletypes.RuleThreshold{ruletypes.NewBasicRuleThreshold(postableRule.AlertName, &c.target, nil, ruletypes.MatchType(c.matchType), ruletypes.CompareOp(c.compareOp), postableRule.RuleCondition.SelectedQuery, c.targetUnit, postableRule.RuleCondition.CompositeQuery.Unit)}
 		postableRule.Annotations = map[string]string{
 			"description": "This alert is fired when the defined metric (current value: {{$value}}) crosses the threshold ({{$threshold}})",
 			"summary": "The rule threshold is set to {{$threshold}}, and the observed metric value is {{$value}}",
@@ -1602,6 +1640,10 @@ func TestThresholdRuleLogsLink(t *testing.T) {
 		RuleType: ruletypes.RuleTypeThreshold,
 		EvalWindow: ruletypes.Duration(5 * time.Minute),
 		Frequency: ruletypes.Duration(1 * time.Minute),
+		Evaluation: &ruletypes.RollingWindow{
+			EvalWindow: ruletypes.Duration(5 * time.Minute),
+			Frequency: ruletypes.Duration(1 * time.Minute),
+		},
 		RuleCondition: &ruletypes.RuleCondition{
 			CompositeQuery: &v3.CompositeQuery{
 				QueryType: v3.QueryTypeBuilder,
@@ -1679,6 +1721,7 @@ func TestThresholdRuleLogsLink(t *testing.T) {
 		postableRule.RuleCondition.Target = &c.target
 		postableRule.RuleCondition.CompositeQuery.Unit = c.yAxisUnit
 		postableRule.RuleCondition.TargetUnit = c.targetUnit
+		postableRule.RuleCondition.Thresholds = []ruletypes.RuleThreshold{ruletypes.NewBasicRuleThreshold(postableRule.AlertName, &c.target, nil, ruletypes.MatchType(c.matchType), ruletypes.CompareOp(c.compareOp), postableRule.RuleCondition.SelectedQuery, c.targetUnit, postableRule.RuleCondition.CompositeQuery.Unit)}
 		postableRule.Annotations = map[string]string{
 			"description": "This alert is fired when the defined metric (current value: {{$value}}) crosses the threshold ({{$threshold}})",
 			"summary": "The rule threshold is set to {{$threshold}}, and the observed metric value is {{$value}}",
@@ -1726,6 +1769,10 @@ func TestThresholdRuleShiftBy(t *testing.T) {
 		RuleType: ruletypes.RuleTypeThreshold,
 		EvalWindow: ruletypes.Duration(5 * time.Minute),
 		Frequency: ruletypes.Duration(1 * time.Minute),
+		Evaluation: &ruletypes.RollingWindow{
+			EvalWindow: ruletypes.Duration(5 * time.Minute),
+			Frequency: ruletypes.Duration(1 * time.Minute),
+		},
 		RuleCondition: &ruletypes.RuleCondition{
 			CompositeQuery: &v3.CompositeQuery{
 				QueryType: v3.QueryTypeBuilder,
@@ -1782,3 +1829,172 @@ func TestThresholdRuleShiftBy(t *testing.T) {
|
|||||||
|
|
||||||
assert.Equal(t, int64(10), params.CompositeQuery.BuilderQueries["A"].ShiftBy)
|
assert.Equal(t, int64(10), params.CompositeQuery.BuilderQueries["A"].ShiftBy)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMultipleThresholdRule(t *testing.T) {
|
||||||
|
postableRule := ruletypes.PostableRule{
|
||||||
|
AlertName: "Mulitple threshold test",
|
||||||
|
AlertType: ruletypes.AlertTypeMetric,
|
||||||
|
RuleType: ruletypes.RuleTypeThreshold,
|
||||||
|
EvalWindow: ruletypes.Duration(5 * time.Minute),
|
||||||
|
Frequency: ruletypes.Duration(1 * time.Minute),
|
||||||
|
Evaluation: &ruletypes.RollingWindow{
|
||||||
|
EvalWindow: ruletypes.Duration(5 * time.Minute),
|
||||||
|
Frequency: ruletypes.Duration(1 * time.Minute),
|
||||||
|
},
|
||||||
|
RuleCondition: &ruletypes.RuleCondition{
|
||||||
|
CompositeQuery: &v3.CompositeQuery{
|
||||||
|
QueryType: v3.QueryTypeBuilder,
|
||||||
|
BuilderQueries: map[string]*v3.BuilderQuery{
|
||||||
|
"A": {
|
||||||
|
QueryName: "A",
|
||||||
|
StepInterval: 60,
|
||||||
|
AggregateAttribute: v3.AttributeKey{
|
||||||
|
Key: "signoz_calls_total",
|
||||||
|
},
|
||||||
|
AggregateOperator: v3.AggregateOperatorSumRate,
|
||||||
|
DataSource: v3.DataSourceMetrics,
|
||||||
|
Expression: "A",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
telemetryStore := telemetrystoretest.New(telemetrystore.Config{}, &queryMatcherAny{})
|
||||||
|
|
||||||
|
cols := make([]cmock.ColumnType, 0)
|
||||||
|
cols = append(cols, cmock.ColumnType{Name: "value", Type: "Float64"})
|
||||||
|
cols = append(cols, cmock.ColumnType{Name: "attr", Type: "String"})
|
||||||
|
cols = append(cols, cmock.ColumnType{Name: "timestamp", Type: "String"})
|
||||||
|
|
||||||
|
cases := []struct {
|
||||||
|
targetUnit string
|
||||||
|
yAxisUnit string
|
||||||
|
values [][]interface{}
|
||||||
|
expectAlerts int
|
||||||
|
compareOp string
|
||||||
|
matchType string
|
||||||
|
target float64
|
||||||
|
secondTarget float64
|
||||||
|
summaryAny []string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
targetUnit: "s",
|
||||||
|
yAxisUnit: "ns",
|
||||||
|
values: [][]interface{}{
|
||||||
|
{float64(572588400), "attr", time.Now()}, // 0.57 seconds
|
||||||
|
{float64(572386400), "attr", time.Now().Add(1 * time.Second)}, // 0.57 seconds
|
||||||
|
{float64(300947400), "attr", time.Now().Add(2 * time.Second)}, // 0.3 seconds
|
||||||
|
{float64(299316000), "attr", time.Now().Add(3 * time.Second)}, // 0.3 seconds
|
||||||
|
{float64(66640400.00000001), "attr", time.Now().Add(4 * time.Second)}, // 0.06 seconds
|
||||||
|
},
|
||||||
|
expectAlerts: 2,
|
||||||
|
compareOp: "1", // Above
|
||||||
|
matchType: "1", // Once
|
||||||
|
target: 1, // 1 second
|
||||||
|
secondTarget: .5,
|
||||||
|
summaryAny: []string{
|
||||||
|
"observed metric value is 573 ms",
|
||||||
|
"observed metric value is 572 ms",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
targetUnit: "ms",
|
||||||
|
yAxisUnit: "ns",
|
||||||
|
values: [][]interface{}{
|
||||||
|
{float64(572588400), "attr", time.Now()}, // 572.58 ms
|
||||||
|
{float64(572386400), "attr", time.Now().Add(1 * time.Second)}, // 572.38 ms
|
||||||
|
{float64(300947400), "attr", time.Now().Add(2 * time.Second)}, // 300.94 ms
|
||||||
|
{float64(299316000), "attr", time.Now().Add(3 * time.Second)}, // 299.31 ms
|
||||||
|
{float64(66640400.00000001), "attr", time.Now().Add(4 * time.Second)}, // 66.64 ms
|
||||||
|
},
|
||||||
|
expectAlerts: 6,
|
||||||
|
compareOp: "1", // Above
|
||||||
|
matchType: "1", // Once
|
||||||
|
target: 200, // 200 ms
|
||||||
|
secondTarget: 500,
|
||||||
|
summaryAny: []string{
|
||||||
|
"observed metric value is 299 ms",
|
||||||
|
"the observed metric value is 573 ms",
|
||||||
|
"the observed metric value is 572 ms",
|
||||||
|
"the observed metric value is 301 ms",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
targetUnit: "decgbytes",
|
||||||
|
yAxisUnit: "bytes",
|
||||||
|
values: [][]interface{}{
|
||||||
|
{float64(2863284053), "attr", time.Now()}, // 2.86 GB
|
||||||
|
{float64(2863388842), "attr", time.Now().Add(1 * time.Second)}, // 2.86 GB
|
||||||
|
{float64(300947400), "attr", time.Now().Add(2 * time.Second)}, // 0.3 GB
|
||||||
|
{float64(299316000), "attr", time.Now().Add(3 * time.Second)}, // 0.3 GB
|
||||||
|
{float64(66640400.00000001), "attr", time.Now().Add(4 * time.Second)}, // 66.64 MB
|
||||||
|
},
|
||||||
|
expectAlerts: 2,
|
||||||
|
compareOp: "1", // Above
|
||||||
|
matchType: "1", // Once
|
||||||
|
target: 200, // 200 GB
|
||||||
|
secondTarget: 2, // 2GB
|
||||||
|
summaryAny: []string{
|
||||||
|
"observed metric value is 2.7 GiB",
|
||||||
|
"the observed metric value is 0.3 GB",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
	logger := instrumentationtest.New().Logger()

	for idx, c := range cases {
		rows := cmock.NewRows(cols, c.values)
		// We are testing the eval logic after the query is run,
		// so we don't care about the query string here.
		queryString := "SELECT any"
		telemetryStore.Mock().
			ExpectQuery(queryString).
			WillReturnRows(rows)
		postableRule.RuleCondition.CompareOp = ruletypes.CompareOp(c.compareOp)
		postableRule.RuleCondition.MatchType = ruletypes.MatchType(c.matchType)
		postableRule.RuleCondition.Target = &c.target
		postableRule.RuleCondition.CompositeQuery.Unit = c.yAxisUnit
		postableRule.RuleCondition.TargetUnit = c.targetUnit
		postableRule.RuleCondition.Thresholds = []ruletypes.RuleThreshold{
			ruletypes.NewBasicRuleThreshold("first_threshold", &c.target, nil, ruletypes.MatchType(c.matchType), ruletypes.CompareOp(c.compareOp), postableRule.RuleCondition.SelectedQuery, c.targetUnit, postableRule.RuleCondition.CompositeQuery.Unit),
			ruletypes.NewBasicRuleThreshold("second_threshold", &c.secondTarget, nil, ruletypes.MatchType(c.matchType), ruletypes.CompareOp(c.compareOp), postableRule.RuleCondition.SelectedQuery, c.targetUnit, postableRule.RuleCondition.CompositeQuery.Unit),
		}
		postableRule.Annotations = map[string]string{
			"description": "This alert is fired when the defined metric (current value: {{$value}}) crosses the threshold ({{$threshold}})",
			"summary":     "The rule threshold is set to {{$threshold}}, and the observed metric value is {{$value}}",
		}

		options := clickhouseReader.NewOptions("", "", "archiveNamespace")
		readerCache, err := cachetest.New(cache.Config{Provider: "memory", Memory: cache.Memory{TTL: DefaultFrequency}})
		require.NoError(t, err)
		reader := clickhouseReader.NewReaderFromClickhouseConnection(options, nil, telemetryStore, prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), "", time.Duration(time.Second), readerCache)
		rule, err := NewThresholdRule("69", valuer.GenerateUUID(), &postableRule, reader, nil, logger)
		if err != nil {
			assert.NoError(t, err)
		}
		rule.TemporalityMap = map[string]map[v3.Temporality]bool{
			"signoz_calls_total": {
				v3.Delta: true,
			},
		}

		retVal, err := rule.Eval(context.Background(), time.Now())
		if err != nil {
			assert.NoError(t, err)
		}

		assert.Equal(t, c.expectAlerts, retVal.(int), "case %d", idx)
		if c.expectAlerts != 0 {
			foundCount := 0
			for _, item := range rule.Active {
				for _, summary := range c.summaryAny {
					if strings.Contains(item.Annotations.Get("summary"), summary) {
						foundCount++
						break
					}
				}
			}
			assert.Equal(t, c.expectAlerts, foundCount, "case %d", idx)
		}
	}
}
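Note: the expected alert counts and summaries above depend on converting between the threshold's target unit and the query's y-axis unit. The following is a minimal sketch of that conversion, not part of this changeset, assuming the converter package behaves as it is used in BasicRuleThreshold.Target() further down in this diff; the printed numbers are only illustrative.

package main

import (
	"fmt"

	"github.com/SigNoz/signoz/pkg/query-service/converter"
)

func main() {
	// Illustrative sketch, not part of this changeset.
	// A 200 ms target compared against samples reported in ns must first be
	// converted into ns (200 ms -> 2e8 ns), which is what Target() does.
	ms := converter.FromUnit(converter.Unit("ms"))
	inNs := ms.Convert(converter.Value{F: 200, U: converter.Unit("ms")}, converter.Unit("ns"))
	fmt.Println(inNs.F) // expected: 2e8

	// Likewise a 2 decimal-gigabyte target against samples in bytes becomes 2e9 bytes.
	gb := converter.FromUnit(converter.Unit("decgbytes"))
	inBytes := gb.Convert(converter.Value{F: 2, U: converter.Unit("decgbytes")}, converter.Unit("bytes"))
	fmt.Println(inBytes.F) // expected: 2e9
}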
@@ -3,6 +3,8 @@ package ruletypes

import (
	"encoding/json"
	"fmt"
	"github.com/SigNoz/signoz/pkg/query-service/converter"
	"math"
	"net/url"
	"sort"
	"strings"

@@ -11,6 +13,7 @@ import (
	"github.com/SigNoz/signoz/pkg/query-service/model"
	v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
	"github.com/SigNoz/signoz/pkg/query-service/utils/labels"
	qslabels "github.com/SigNoz/signoz/pkg/query-service/utils/labels"
)

// this file contains common structs and methods used by

@@ -103,6 +106,294 @@ const (
	Last MatchType = "5"
)

type RuleThreshold interface {
	Name() string
	Target() float64
	RecoveryTarget() float64

	MatchType() MatchType
	CompareOp() CompareOp

	SelectedQuery() string
	ShouldAlert(series v3.Series) (Sample, bool)
}

type BasicRuleThreshold struct {
	name           string
	target         *float64
	targetUnit     string
	ruleUnit       string
	recoveryTarget *float64
	matchType      MatchType
	compareOp      CompareOp
	selectedQuery  string
}

func NewBasicRuleThreshold(name string, target *float64, recoveryTarget *float64, matchType MatchType, op CompareOp, selectedQuery string, targetUnit string, ruleUnit string) *BasicRuleThreshold {
	return &BasicRuleThreshold{name: name, target: target, recoveryTarget: recoveryTarget, matchType: matchType, selectedQuery: selectedQuery, compareOp: op, targetUnit: targetUnit, ruleUnit: ruleUnit}
}

func (b BasicRuleThreshold) Name() string {
	return b.name
}

func (b BasicRuleThreshold) Target() float64 {
	unitConverter := converter.FromUnit(converter.Unit(b.targetUnit))
	// convert the target value to the y-axis unit
	value := unitConverter.Convert(converter.Value{
		F: *b.target,
		U: converter.Unit(b.targetUnit),
	}, converter.Unit(b.ruleUnit))
	return value.F
}

func (b BasicRuleThreshold) RecoveryTarget() float64 {
	return *b.recoveryTarget
}

func (b BasicRuleThreshold) MatchType() MatchType {
	return b.matchType
}

func (b BasicRuleThreshold) CompareOp() CompareOp {
	return b.compareOp
}

func (b BasicRuleThreshold) SelectedQuery() string {
	return b.selectedQuery
}

func removeGroupinSetPoints(series v3.Series) []v3.Point {
	var result []v3.Point
	for _, s := range series.Points {
		if s.Timestamp >= 0 && !math.IsNaN(s.Value) && !math.IsInf(s.Value, 0) {
			result = append(result, s)
		}
	}
	return result
}

func (b BasicRuleThreshold) ShouldAlert(series v3.Series) (Sample, bool) {
	var shouldAlert bool
	var alertSmpl Sample
	var lbls qslabels.Labels

	for name, value := range series.Labels {
		lbls = append(lbls, qslabels.Label{Name: name, Value: value})
	}

	lbls = append(lbls, qslabels.Label{Name: "threshold", Value: b.name})

	series.Points = removeGroupinSetPoints(series)

	// nothing to evaluate
	if len(series.Points) == 0 {
		return alertSmpl, false
	}

	switch b.MatchType() {
	case AtleastOnce:
		// If any sample matches the condition, the rule is firing.
		if b.CompareOp() == ValueIsAbove {
			for _, smpl := range series.Points {
				if smpl.Value > b.Target() {
					alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls}
					shouldAlert = true
					break
				}
			}
		} else if b.CompareOp() == ValueIsBelow {
			for _, smpl := range series.Points {
				if smpl.Value < b.Target() {
					alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls}
					shouldAlert = true
					break
				}
			}
		} else if b.CompareOp() == ValueIsEq {
			for _, smpl := range series.Points {
				if smpl.Value == b.Target() {
					alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls}
					shouldAlert = true
					break
				}
			}
		} else if b.CompareOp() == ValueIsNotEq {
			for _, smpl := range series.Points {
				if smpl.Value != b.Target() {
					alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls}
					shouldAlert = true
					break
				}
			}
		} else if b.CompareOp() == ValueOutsideBounds {
			for _, smpl := range series.Points {
				if math.Abs(smpl.Value) >= b.Target() {
					alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls}
					shouldAlert = true
					break
				}
			}
		}
	case AllTheTimes:
		// If all samples match the condition, the rule is firing.
		shouldAlert = true
		alertSmpl = Sample{Point: Point{V: b.Target()}, Metric: lbls}
		if b.CompareOp() == ValueIsAbove {
			for _, smpl := range series.Points {
				if smpl.Value <= b.Target() {
					shouldAlert = false
					break
				}
			}
			// use min value from the series
			if shouldAlert {
				var minValue float64 = math.Inf(1)
				for _, smpl := range series.Points {
					if smpl.Value < minValue {
						minValue = smpl.Value
					}
				}
				alertSmpl = Sample{Point: Point{V: minValue}, Metric: lbls}
			}
		} else if b.CompareOp() == ValueIsBelow {
			for _, smpl := range series.Points {
				if smpl.Value >= b.Target() {
					shouldAlert = false
					break
				}
			}
			// use max value from the series
			if shouldAlert {
				var maxValue float64 = math.Inf(-1)
				for _, smpl := range series.Points {
					if smpl.Value > maxValue {
						maxValue = smpl.Value
					}
				}
				alertSmpl = Sample{Point: Point{V: maxValue}, Metric: lbls}
			}
		} else if b.CompareOp() == ValueIsEq {
			for _, smpl := range series.Points {
				if smpl.Value != b.Target() {
					shouldAlert = false
					break
				}
			}
		} else if b.CompareOp() == ValueIsNotEq {
			for _, smpl := range series.Points {
				if smpl.Value == b.Target() {
					shouldAlert = false
					break
				}
			}
			// use any non-inf or nan value from the series
			if shouldAlert {
				for _, smpl := range series.Points {
					if !math.IsInf(smpl.Value, 0) && !math.IsNaN(smpl.Value) {
						alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls}
						break
					}
				}
			}
		} else if b.CompareOp() == ValueOutsideBounds {
			for _, smpl := range series.Points {
				if math.Abs(smpl.Value) < b.Target() {
					alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls}
					shouldAlert = false
					break
				}
			}
		}
	case OnAverage:
		// If the average of all samples matches the condition, the rule is firing.
		var sum, count float64
		for _, smpl := range series.Points {
			if math.IsNaN(smpl.Value) || math.IsInf(smpl.Value, 0) {
				continue
			}
			sum += smpl.Value
			count++
		}
		avg := sum / count
		alertSmpl = Sample{Point: Point{V: avg}, Metric: lbls}
		if b.CompareOp() == ValueIsAbove {
			if avg > b.Target() {
				shouldAlert = true
			}
		} else if b.CompareOp() == ValueIsBelow {
			if avg < b.Target() {
				shouldAlert = true
			}
		} else if b.CompareOp() == ValueIsEq {
			if avg == b.Target() {
				shouldAlert = true
			}
		} else if b.CompareOp() == ValueIsNotEq {
			if avg != b.Target() {
				shouldAlert = true
			}
		} else if b.CompareOp() == ValueOutsideBounds {
			if math.Abs(avg) >= b.Target() {
				shouldAlert = true
			}
		}
	case InTotal:
		// If the sum of all samples matches the condition, the rule is firing.
		var sum float64

		for _, smpl := range series.Points {
			if math.IsNaN(smpl.Value) || math.IsInf(smpl.Value, 0) {
				continue
			}
			sum += smpl.Value
		}
		alertSmpl = Sample{Point: Point{V: sum}, Metric: lbls}
		if b.CompareOp() == ValueIsAbove {
			if sum > b.Target() {
				shouldAlert = true
			}
		} else if b.CompareOp() == ValueIsBelow {
			if sum < b.Target() {
				shouldAlert = true
			}
		} else if b.CompareOp() == ValueIsEq {
			if sum == b.Target() {
				shouldAlert = true
			}
		} else if b.CompareOp() == ValueIsNotEq {
			if sum != b.Target() {
				shouldAlert = true
			}
		} else if b.CompareOp() == ValueOutsideBounds {
			if math.Abs(sum) >= b.Target() {
				shouldAlert = true
			}
		}
	case Last:
		// If the last sample matches the condition, the rule is firing.
		shouldAlert = false
		alertSmpl = Sample{Point: Point{V: series.Points[len(series.Points)-1].Value}, Metric: lbls}
		if b.CompareOp() == ValueIsAbove {
			if series.Points[len(series.Points)-1].Value > b.Target() {
				shouldAlert = true
			}
		} else if b.CompareOp() == ValueIsBelow {
			if series.Points[len(series.Points)-1].Value < b.Target() {
				shouldAlert = true
			}
		} else if b.CompareOp() == ValueIsEq {
			if series.Points[len(series.Points)-1].Value == b.Target() {
				shouldAlert = true
			}
		} else if b.CompareOp() == ValueIsNotEq {
			if series.Points[len(series.Points)-1].Value != b.Target() {
				shouldAlert = true
			}
		}
	}
	return alertSmpl, shouldAlert
}

type RuleCondition struct {
	CompositeQuery *v3.CompositeQuery `json:"compositeQuery,omitempty" yaml:"compositeQuery,omitempty"`
	CompareOp      CompareOp          `yaml:"op,omitempty" json:"op,omitempty"`

@@ -116,6 +407,7 @@ type RuleCondition struct {
	SelectedQuery     string          `json:"selectedQueryName,omitempty"`
	RequireMinPoints  bool            `yaml:"requireMinPoints,omitempty" json:"requireMinPoints,omitempty"`
	RequiredNumPoints int             `yaml:"requiredNumPoints,omitempty" json:"requiredNumPoints,omitempty"`
	Thresholds        []RuleThreshold `yaml:"thresholds,omitempty" json:"thresholds,omitempty"`
}

func (rc *RuleCondition) GetSelectedQueryName() string {

@@ -65,8 +65,12 @@ type PostableRule struct {
	Version string `json:"version,omitempty"`

	// legacy
	Expr    string `yaml:"expr,omitempty" json:"expr,omitempty"`
	OldYaml string `json:"yaml,omitempty"`
	EvalType   string     `yaml:"evalType,omitempty" json:"evalType,omitempty"`
	Evaluation Evaluation `yaml:"evaluation,omitempty" json:"evaluation,omitempty"`
	StartsAt   int64      `yaml:"startsAt,omitempty" json:"startsAt,omitempty"`
	Schedule   string     `json:"schedule,omitempty"`
}

func ParsePostableRule(content []byte) (*PostableRule, error) {

@@ -140,6 +144,15 @@ func ParseIntoRule(initRule PostableRule, content []byte, kind RuleDataKind) (*P
		return nil, err
	}

	// added alerts v2 fields
	rule.RuleCondition.Thresholds = append(rule.RuleCondition.Thresholds,
		NewBasicRuleThreshold(rule.AlertName, rule.RuleCondition.Target, nil, rule.RuleCondition.MatchType, rule.RuleCondition.CompareOp, rule.RuleCondition.SelectedQuery, rule.RuleCondition.TargetUnit, rule.RuleCondition.CompositeQuery.Unit))
	if rule.EvalType == "" || rule.EvalType == "rolling" {
		rule.EvalType = "rolling"
		rule.Evaluation = NewEvaluation(rule.EvalType, RollingWindow{EvalWindow: rule.EvalWindow, Frequency: rule.Frequency, RequiredNumPoints: rule.RuleCondition.RequiredNumPoints, RequireMinPoints: rule.RuleCondition.RequireMinPoints})
	} else if rule.EvalType == "cumulative" {
		rule.Evaluation = NewEvaluation(rule.EvalType, CumulativeWindow{EvalWindow: rule.EvalWindow, StartsAt: time.UnixMilli(rule.StartsAt), RequiredNumPoints: rule.RuleCondition.RequiredNumPoints, RequireMinPoints: rule.RuleCondition.RequireMinPoints})
	}
	return rule, nil
}

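Note: once ParseIntoRule has populated RuleCondition.Thresholds, each threshold can be evaluated independently against a result series via ShouldAlert above. The following is a minimal sketch, not part of this changeset, assuming the package import path pkg/types/ruletypes from the new file below; the query name, labels, and sample values are made up.

package main

import (
	"fmt"

	v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
	"github.com/SigNoz/signoz/pkg/types/ruletypes"
)

func main() {
	// Illustrative sketch, not part of this changeset.
	critical, warning := 500.0, 200.0 // thresholds expressed in ms

	// Samples arrive in ns (the query's y-axis unit); targets are given in ms.
	thresholds := []ruletypes.RuleThreshold{
		ruletypes.NewBasicRuleThreshold("critical", &critical, nil, ruletypes.AtleastOnce, ruletypes.ValueIsAbove, "A", "ms", "ns"),
		ruletypes.NewBasicRuleThreshold("warning", &warning, nil, ruletypes.AtleastOnce, ruletypes.ValueIsAbove, "A", "ms", "ns"),
	}

	series := v3.Series{
		Labels: map[string]string{"service_name": "checkout"}, // hypothetical labels
		Points: []v3.Point{
			{Timestamp: 1_000, Value: 572588400}, // ~572 ms
			{Timestamp: 2_000, Value: 66640400},  // ~66 ms
		},
	}

	// Mirrors how a rule loops over its thresholds: each one that matches
	// contributes a sample labelled with the threshold name.
	for _, th := range thresholds {
		if smpl, fire := th.ShouldAlert(series); fire {
			fmt.Printf("threshold %q fired with value %v\n", th.Name(), smpl.Point.V)
		}
	}
}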
pkg/types/ruletypes/evaluation.go (new file, 67 lines)
@@ -0,0 +1,67 @@
package ruletypes

import (
	"time"

	v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
)

type Evaluation interface {
	EvaluationTime(curr time.Time) (time.Time, time.Time)
}

type RollingWindow struct {
	EvalWindow           Duration          `json:"evalWindow"`
	Frequency            Duration          `json:"frequency"`
	RequireMinPoints     bool              `json:"requireMinPoints"`
	RequiredNumPoints    int               `json:"requiredNumPoints"`
	SkipEvalForNewGroups []v3.AttributeKey `json:"skipEvalForNewGroups"`
}

func (rollingWindow *RollingWindow) EvaluationTime(curr time.Time) (time.Time, time.Time) {
	return curr.Add(time.Duration(-rollingWindow.EvalWindow)), curr
}

type CumulativeWindow struct {
	StartsAt             time.Time         `json:"startsAt"`
	EvalWindow           Duration          `json:"evalWindow"`
	RequireMinPoints     bool              `json:"requireMinPoints"`
	RequiredNumPoints    int               `json:"requiredNumPoints"`
	SkipEvalForNewGroups []v3.AttributeKey `json:"skipEvalForNewGroups"`
}

func (cumulativeWindow *CumulativeWindow) EvaluationTime(curr time.Time) (time.Time, time.Time) {
	if curr.Before(cumulativeWindow.StartsAt) {
		return curr, curr
	}

	dur := time.Duration(cumulativeWindow.EvalWindow)
	if dur <= 0 {
		return curr, curr
	}

	// Calculate the number of complete windows since StartsAt
	elapsed := curr.Sub(cumulativeWindow.StartsAt)
	windows := int64(elapsed / dur)
	windowStart := cumulativeWindow.StartsAt.Add(time.Duration(windows) * dur)
	return windowStart, curr
}

func NewEvaluation(evalType string, params interface{}) Evaluation {
	switch evalType {
	case "rolling":
		p, ok := params.(RollingWindow)
		if !ok {
			return nil
		}
		return &p
	case "cumulative":
		p, ok := params.(CumulativeWindow)
		if !ok {
			return nil
		}
		return &p
	default:
		return nil
	}
}
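Note: the two Evaluation implementations above differ only in how they pick the window start. The following is a small sketch, not part of this changeset, assuming Duration is the ruletypes wrapper around time.Duration used elsewhere in the package; timestamps are arbitrary.

package main

import (
	"fmt"
	"time"

	"github.com/SigNoz/signoz/pkg/types/ruletypes"
)

func main() {
	// Illustrative sketch, not part of this changeset.
	now := time.Date(2024, 1, 1, 10, 17, 0, 0, time.UTC)

	// Rolling: the window always ends at "now" and starts EvalWindow earlier.
	rolling := ruletypes.NewEvaluation("rolling", ruletypes.RollingWindow{
		EvalWindow: ruletypes.Duration(5 * time.Minute), // assumes Duration wraps time.Duration
		Frequency:  ruletypes.Duration(time.Minute),
	})
	start, end := rolling.EvaluationTime(now)
	fmt.Println(start, end) // 10:12:00 -> 10:17:00

	// Cumulative: the window starts at the most recent whole multiple of
	// EvalWindow after StartsAt and keeps growing until the next boundary.
	cumulative := ruletypes.NewEvaluation("cumulative", ruletypes.CumulativeWindow{
		StartsAt:   time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC),
		EvalWindow: ruletypes.Duration(time.Hour),
	})
	start, end = cumulative.EvaluationTime(now)
	fmt.Println(start, end) // 10:00:00 -> 10:17:00
}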