Compare commits

...

93 Commits

Author SHA1 Message Date
Srikanth Chekuri
ee739c4cde Merge branch 'main' into feat/cron-alert 2025-09-15 21:24:44 +05:30
aniket
1a575459f5 chore(cron-alerts): sorted imports 2025-09-15 19:39:23 +05:30
aniket
397116c6e2 chore(cron-alerts): added timezone for cron handling 2025-09-15 19:30:30 +05:30
aniket
be25a42007 Merge branch 'main' of github.com:SigNoz/signoz into feat/cron-alert 2025-09-15 19:01:32 +05:30
aniket
5400700b57 chore: added rrue evaluator logic 2025-09-15 02:56:32 +05:30
aniket
294b41c79f Merge branch 'feat/cummulative-window' of github.com:SigNoz/signoz into feat/cron-alert 2025-09-15 02:45:06 +05:30
aniket
7d9aaf2d49 chore(cumulative-window): updated schedule enum 2025-09-15 02:42:07 +05:30
Srikanth Chekuri
1e098fa08b Merge branch 'main' into feat/cummulative-window 2025-09-14 23:27:27 +05:30
Srikanth Chekuri
b50df67895 Merge branch 'main' into feat/cummulative-window 2025-09-14 20:42:43 +05:30
Srikanth Chekuri
3de1888c13 Merge branch 'main' into feat/cummulative-window 2025-09-14 19:49:24 +05:30
aniket
3f5f63d3ac chore(cumulative-window): updated api structure for cumulative window 2025-09-13 17:12:10 +05:30
aniketio-ctrl
48f5defbcb Merge branch 'main' into feat/cummulative-window 2025-09-12 20:20:37 +05:30
aniket
efaee2030e chore(cumulative-window): added validation for eval window 2025-09-12 19:12:04 +05:30
aniket
c4fe00ecfc chore(cumulative-window): added case for timezone 2025-09-12 18:38:36 +05:30
aniket
eec2c43011 chore(cumulative-window): removed error from next window for 2025-09-12 16:43:39 +05:30
aniket
c963064293 chore(cumulative-window): removed error from next window for 2025-09-12 16:33:09 +05:30
aniket
d26308590f chore(cumulative-window): sorted imports 2025-09-12 15:52:08 +05:30
aniket
12ffb02a94 chore(cumulative-window): sorted imports 2025-09-12 15:51:06 +05:30
aniket
d00e2c7590 chore(cumulative-window): sorted imports 2025-09-12 15:47:46 +05:30
aniket
7d72c03b95 chore(cumulative-window): added reset boundary condition tests 2025-09-12 15:40:47 +05:30
aniket
29ba0b0ae9 chore(cumulative-window): added reset boundary condition tests 2025-09-12 15:38:37 +05:30
aniket
ad8a8c6502 chore(cumulative-window): removed naked errors 2025-09-12 13:23:45 +05:30
aniket
79fbf48e89 Merge branch 'main' of github.com:SigNoz/signoz into feat/cummulative-window 2025-09-12 13:17:24 +05:30
aniket
bb0ee343c9 Merge branch 'feat/multi-threshold' of github.com:SigNoz/signoz into feat/cummulative-window 2025-09-12 12:20:48 +05:30
aniket
1d656b7759 chore(cumulative-window): renamed funcitons 2025-09-12 12:18:33 +05:30
aniketio-ctrl
8f10ed417b Update pkg/types/ruletypes/evaluation.go
Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com>
2025-09-12 12:10:33 +05:30
aniketio-ctrl
6a86cd205a Update pkg/types/ruletypes/evaluation.go
Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com>
2025-09-12 12:10:23 +05:30
aniketio-ctrl
47a71db8b8 Merge branch 'main' into feat/multi-threshold 2025-09-12 12:08:38 +05:30
aniket
ded8b34623 feat(multi-threhsold): removed yaml support for alerts 2025-09-11 18:14:52 +05:30
aniketio-ctrl
64a14dfe2e Update pkg/types/ruletypes/evaluation.go
Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com>
2025-09-11 14:57:41 +05:30
aniket
776889e77d feat(cumulative-window): added validation check 2025-09-11 14:43:33 +05:30
aniket
ac8daf68c2 Merge branch 'feat/multi-threshold' of github.com:SigNoz/signoz into feat/cummulative-window 2025-09-11 14:35:34 +05:30
aniket
7f56931840 Merge branch 'feat/multi-threshold' of github.com:SigNoz/signoz into feat/multi-threshold 2025-09-11 14:26:59 +05:30
aniket
a83e5e87ed feat(multi-threshold): added umnarshaller for postable rule 2025-09-11 14:26:36 +05:30
aniketio-ctrl
a640168d26 Merge branch 'main' into feat/multi-threshold 2025-09-11 14:08:02 +05:30
aniket
c36eb64a5e feat(multi-threshold): added umnarshaller for postable rule 2025-09-11 13:53:16 +05:30
aniket
bc2c307dbd feat(multi-threshold): added json parser for gettable rule 2025-09-11 01:16:09 +05:30
aniket
a49a8cab9b feat(multi-threshold): added json parser for gettable rule 2025-09-11 01:02:05 +05:30
aniket
53a7f2dbca Merge branch 'feat/multi-threshold' of github.com:SigNoz/signoz into feat/multi-threshold 2025-09-11 00:56:21 +05:30
aniket
ab7f590794 feat(multi-threshold): added json parser for gettable rule 2025-09-11 00:55:38 +05:30
Srikanth Chekuri
b62a19c9d7 Merge branch 'main' into feat/multi-threshold 2025-09-11 00:34:40 +05:30
aniket
c48622ff30 feat(multi-notification): removed pre defined labels from links of log and traces 2025-09-11 00:19:43 +05:30
aniket
93e8045c55 feat(multi-notification): removed pre defined labels from links of log and traces 2025-09-11 00:08:31 +05:30
aniket
47083e2de1 feat(multi-threshold): added validation and error propagation 2025-09-10 17:28:43 +05:30
aniket
cdc3f2c5e8 Merge branch 'main' of github.com:SigNoz/signoz into feat/multi-threshold 2025-09-10 15:32:24 +05:30
aniketio-ctrl
dce653686b Merge branch 'main' into feat/multi-threshold 2025-09-10 13:08:00 +05:30
aniket
890743ce33 Merge branch 'feat/cummulative-window' of github.com:SigNoz/signoz into feat/cummulative-window 2025-09-10 13:06:10 +05:30
aniket
c74f976163 feat(cumulative-window): segregated json marshalling and evaluation logic 2025-09-10 13:05:46 +05:30
aniket
872323f798 Merge branch 'feat/multi-threshold' of github.com:SigNoz/signoz into feat/cummulative-window 2025-09-10 12:59:59 +05:30
aniketio-ctrl
d866a298c2 Update pkg/types/ruletypes/threshold.go
Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com>
2025-09-10 12:59:02 +05:30
aniket
c4ad1f05ad feat(multi-threhsold): added error wrapper 2025-09-10 12:58:16 +05:30
aniket
88db8b3ce7 feat(cumulative-window): segregated json marshalling and evaluation logic 2025-09-10 12:55:18 +05:30
aniket
4fb227a0de Merge branch 'feat/multi-threshold' of github.com:SigNoz/signoz into feat/cummulative-window 2025-09-10 12:50:57 +05:30
aniket
56649f7747 feat(multi-threhsold): added error wrapper 2025-09-10 12:50:12 +05:30
aniket
acc5c76339 feat(multi-threhsold): added error wrapper 2025-09-10 12:48:53 +05:30
aniketio-ctrl
cb8ce22bfd Merge branch 'feat/multi-threshold' into feat/cummulative-window 2025-09-10 12:36:18 +05:30
aniket
ee3c682f1e feat(cumulative-window): segregated json marshalling and evaluation logic 2025-09-10 03:48:23 +05:30
aniket
62f61ff3c4 feat(multi-threshold): added segregation on json marshalling and actual threhsold logic 2025-09-10 03:22:16 +05:30
aniket
fb997cff04 feat(cumulative-window): segregated json marshalling and evaluation logic 2025-09-10 03:20:12 +05:30
aniket
593657a66d Merge branch 'feat/multi-threshold' of github.com:SigNoz/signoz into feat/cummulative-window 2025-09-10 02:45:22 +05:30
aniket
c1d801828f feat(multi-threshold): added segregation on json marshalling and actual threhsold logic 2025-09-10 02:41:01 +05:30
aniket
34e29da919 feat(multi-threshold): added segregation on json marshalling and actual threhsold logic 2025-09-10 02:31:46 +05:30
aniket
e8fb0e622e feat(cumulative-window): segregated json marshalling and evaluation logic 2025-09-09 18:38:34 +05:30
aniket
4d5a34fc18 feat(cumulative-window): segregated json marshalling and evaluation logic 2025-09-09 18:26:13 +05:30
aniket
2dd712a441 feat(multi-threshold): corrected the test cases 2025-09-09 15:42:28 +05:30
aniket
22113df7dd feat(cumulative-window): segregated json marshalling with evaluation logic 2025-09-09 15:34:29 +05:30
aniket
2ef6bfd012 feat(multi-threshold): removed break to send multi threshold alerts 2025-09-09 15:13:37 +05:30
aniket
6b923d6e16 feat(multi-threshold): removed break to send multi threshold alerts 2025-09-09 15:11:47 +05:30
Srikanth Chekuri
2f317257f6 Merge branch 'main' into feat/multi-threshold 2025-09-08 19:41:44 +05:30
aniket
6107552af9 feat(cron-alerts): added rrule validation 2025-09-08 18:33:09 +05:30
aniket
4ff172d129 feat(cron-alerts): added cron alerts for alerts v2 2025-09-08 18:30:00 +05:30
aniket
a4540323b7 Revert "feat(rulesV2): added new api componenets"
This reverts commit 2d8803a2e8.
2025-09-08 17:41:25 +05:30
aniket
d90e4ba995 Merge branch 'feat/cummulative-window' of github.com:SigNoz/signoz into feat/cron-alert 2025-09-08 17:38:25 +05:30
aniket
a6efeb3cbe feat(cumulative-window): added cumulative window for alerts v2 2025-09-08 16:33:31 +05:30
aniket
4ac34a8ef2 Merge branch 'feat/multi-threshold' of github.com:SigNoz/signoz into feat/cummulative-window 2025-09-08 15:18:03 +05:30
Srikanth Chekuri
b4780c96f5 Merge branch 'main' into feat/multi-threshold 2025-08-31 10:12:11 +05:30
aniket
5bf60674be Merge branch 'feat/multi-threshold' of github.com:SigNoz/signoz into feat/multi-threshold 2025-08-30 18:05:08 +05:30
aniket
a7bcbfea9b feat(multi-threshold): added severity as threshold name 2025-08-30 18:04:34 +05:30
aniketio-ctrl
d309975c18 Merge branch 'main' into feat/multi-threshold 2025-08-30 17:12:40 +05:30
aniket
d6a39a377f feat(multi-threshold): fixed log lines 2025-08-30 14:24:26 +05:30
aniketio-ctrl
284f3814b7 Update pkg/query-service/rules/threshold_rule.go
Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com>
2025-08-30 14:11:29 +05:30
aniket
d7b83da3c6 feat(multi-threshold): added recovery min points 2025-08-30 14:09:12 +05:30
aniket
2f0636f7e7 Merge branch 'main' of github.com:SigNoz/signoz into feat/multi-threshold 2025-08-27 21:41:45 +05:30
aniket
2d8803a2e8 feat(rulesV2): added new api componenets 2025-08-22 02:30:58 +05:30
aniket
cb7990b265 feat(custom-schedule): added cron alerts 2025-08-18 18:38:05 +05:30
aniket
a258c24918 feat(cron-alerts): added cron alerts 2025-08-18 18:31:22 +05:30
aniket
05bb5166b8 feat(cumulative-window): added cumulative window 2025-08-18 17:37:30 +05:30
aniket
9695ecd57d Merge branch 'feat/multi-threshold' of github.com:SigNoz/signoz into feat/multi-threshold 2025-08-17 13:16:19 +05:30
aniket
cc33362bf2 feat(multiple-threshold): added multiple thresholds 2025-08-17 12:59:56 +05:30
aniketio-ctrl
a4eb94f141 Update pkg/types/ruletypes/alerting.go
Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com>
2025-08-17 12:58:56 +05:30
aniket
73a6bed1af feat(multiple-threshold): added multiple thresholds 2025-08-16 17:09:36 +05:30
aniketio-ctrl
3467c2a6c8 Update pkg/types/ruletypes/api_params.go
Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com>
2025-08-16 14:03:43 +05:30
aniket
a011389fe0 feat(multi-threshold): added multi threshold 2025-08-16 14:03:06 +05:30
13 changed files with 347 additions and 102 deletions

View File

@@ -47,7 +47,10 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
// create ch rule task for evalution
task = newTask(baserules.TaskTypeCh, opts.TaskName, time.Duration(evaluation.GetFrequency()), rules, opts.ManagerOpts, opts.NotifyFunc, opts.MaintenanceStore, opts.OrgID)
if tr.IsScheduled() {
schedule, startTime, timezone := tr.GetSchedule()
task.SetSchedule(schedule, startTime, timezone)
}
} else if opts.Rule.RuleType == ruletypes.RuleTypeProm {
// create promql rule
@@ -69,7 +72,10 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
// create promql rule task for evalution
task = newTask(baserules.TaskTypeProm, opts.TaskName, time.Duration(evaluation.GetFrequency()), rules, opts.ManagerOpts, opts.NotifyFunc, opts.MaintenanceStore, opts.OrgID)
if pr.IsScheduled() {
schedule, startTime, timezone := pr.GetSchedule()
task.SetSchedule(schedule, startTime, timezone)
}
} else if opts.Rule.RuleType == ruletypes.RuleTypeAnomaly {
// create anomaly rule
ar, err := NewAnomalyRule(
@@ -91,7 +97,10 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
// create anomaly rule task for evalution
task = newTask(baserules.TaskTypeCh, opts.TaskName, time.Duration(evaluation.GetFrequency()), rules, opts.ManagerOpts, opts.NotifyFunc, opts.MaintenanceStore, opts.OrgID)
if ar.IsScheduled() {
schedule, startTime, timezone := ar.GetSchedule()
task.SetSchedule(schedule, startTime, timezone)
}
} else {
return nil, fmt.Errorf("unsupported rule type %s. Supported types: %s, %s", opts.Rule.RuleType, ruletypes.RuleTypeProm, ruletypes.RuleTypeThreshold)
}

1
go.mod
View File

@@ -55,6 +55,7 @@ require (
github.com/spf13/cobra v1.9.1
github.com/srikanthccv/ClickHouse-go-mock v0.12.0
github.com/stretchr/testify v1.10.0
github.com/teambition/rrule-go v1.8.2
github.com/tidwall/gjson v1.18.0
github.com/uptrace/bun v1.2.9
github.com/uptrace/bun/dialect/pgdialect v1.2.9

2
go.sum
View File

@@ -980,6 +980,8 @@ github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf
github.com/subosito/gotenv v1.4.1/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0=
github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8=
github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU=
github.com/teambition/rrule-go v1.8.2 h1:lIjpjvWTj9fFUZCmuoVDrKVOtdiyzbzc93qTmRVe/J8=
github.com/teambition/rrule-go v1.8.2/go.mod h1:Ieq5AbrKGciP1V//Wq8ktsTXwSwJHDD5mD/wLBGl3p4=
github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY=
github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=

View File

@@ -90,6 +90,10 @@ type BaseRule struct {
sqlstore sqlstore.SQLStore
evaluation ruletypes.Evaluation
schedule string
scheduleStartsAt time.Time
timezone string
}
type RuleOption func(*BaseRule)
@@ -154,6 +158,9 @@ func NewBaseRule(id string, orgID valuer.UUID, p *ruletypes.PostableRule, reader
TemporalityMap: make(map[string]map[v3.Temporality]bool),
Threshold: threshold,
evaluation: evaluation,
schedule: p.Schedule,
scheduleStartsAt: time.UnixMilli(p.ScheduleStartsAt),
timezone: p.ScheduleTimezone,
}
if baseRule.evalWindow == 0 {
@@ -756,3 +763,11 @@ func (r *BaseRule) PopulateTemporality(ctx context.Context, orgID valuer.UUID, q
}
return nil
}
func (r *BaseRule) IsScheduled() bool {
return r.schedule != ""
}
func (r *BaseRule) GetSchedule() (string, time.Time, string) {
return r.schedule, r.scheduleStartsAt, r.timezone
}

View File

@@ -173,7 +173,10 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
// create ch rule task for evalution
task = newTask(TaskTypeCh, opts.TaskName, taskNamesuffix, time.Duration(evaluation.GetFrequency()), rules, opts.ManagerOpts, opts.NotifyFunc, opts.MaintenanceStore, opts.OrgID)
if tr.IsScheduled() {
schedule, startTime, timezone := tr.GetSchedule()
task.SetSchedule(schedule, startTime, timezone)
}
} else if opts.Rule.RuleType == ruletypes.RuleTypeProm {
// create promql rule
@@ -195,7 +198,10 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
// create promql rule task for evalution
task = newTask(TaskTypeProm, opts.TaskName, taskNamesuffix, time.Duration(evaluation.GetFrequency()), rules, opts.ManagerOpts, opts.NotifyFunc, opts.MaintenanceStore, opts.OrgID)
if pr.IsScheduled() {
schedule, startTime, timezone := pr.GetSchedule()
task.SetSchedule(schedule, startTime, timezone)
}
} else {
return nil, fmt.Errorf("unsupported rule type %s. Supported types: %s, %s", opts.Rule.RuleType, ruletypes.RuleTypeProm, ruletypes.RuleTypeThreshold)
}

View File

@@ -39,6 +39,9 @@ type PromRuleTask struct {
maintenanceStore ruletypes.MaintenanceStore
orgID valuer.UUID
schedule string
scheduleStartsAt time.Time
timezone string
}
// newPromRuleTask holds rules that have promql condition
@@ -75,6 +78,10 @@ func (g *PromRuleTask) Key() string {
return g.name + ";" + g.file
}
func (g *PromRuleTask) IsCronSchedule() bool {
return g.schedule != ""
}
func (g *PromRuleTask) Type() TaskType { return TaskTypeProm }
// Rules returns the group's rules.
@@ -92,36 +99,6 @@ func (g *PromRuleTask) Pause(b bool) {
func (g *PromRuleTask) Run(ctx context.Context) {
defer close(g.terminated)
// Wait an initial amount to have consistently slotted intervals.
evalTimestamp := g.EvalTimestamp(time.Now().UnixNano()).Add(g.frequency)
select {
case <-time.After(time.Until(evalTimestamp)):
case <-g.done:
return
}
ctx = NewQueryOriginContext(ctx, map[string]interface{}{
"ruleGroup": map[string]string{
"name": g.Name(),
},
})
iter := func() {
start := time.Now()
g.Eval(ctx, evalTimestamp)
timeSinceStart := time.Since(start)
g.setEvaluationTime(timeSinceStart)
g.setLastEvaluation(start)
}
// The assumption here is that since the ticker was started after having
// waited for `evalTimestamp` to pass, the ticks will trigger soon
// after each `evalTimestamp + N * g.frequency` occurrence.
tick := time.NewTicker(g.frequency)
defer tick.Stop()
// defer cleanup
defer func() {
if !g.markStale {
@@ -140,23 +117,36 @@ func (g *PromRuleTask) Run(ctx context.Context) {
}()
iter()
ctx = NewQueryOriginContext(ctx, map[string]interface{}{
"ruleGroup": map[string]string{
"name": g.Name(),
},
})
// let the group iterate and run
for {
select {
case <-g.done:
return
default:
select {
case <-g.done:
return
case <-tick.C:
missed := (time.Since(evalTimestamp) / g.frequency) - 1
evalTimestamp = evalTimestamp.Add((missed + 1) * g.frequency)
iter()
}
if g.IsCronSchedule() {
evalFunc := createCronEvalFunction(&g.pause, g.Eval, g.setEvaluationTime, g.setLastEvaluation, ctx)
err := runCronScheduledTask(g.schedule, g.scheduleStartsAt, g.timezone, g.done, evalFunc)
if err != nil {
zap.L().Error("cron scheduler failed", zap.String("rule", g.Name()), zap.Error(err))
}
} else {
evalTimestamp := g.EvalTimestamp(time.Now().UnixNano()).Add(g.frequency)
evalFunc := createIntervalEvalFunction(
&g.pause,
g.Eval,
g.setEvaluationTime,
g.setLastEvaluation,
ctx,
func() time.Time { return evalTimestamp },
)
runIntervalScheduledTask(
g.frequency,
g.done,
func() time.Time { return evalTimestamp },
evalFunc,
)
}
}
@@ -245,6 +235,14 @@ func (g *PromRuleTask) setLastEvaluation(ts time.Time) {
g.lastEvaluation = ts
}
func (g *PromRuleTask) SetSchedule(schedule string, t time.Time, timezone string) {
g.mtx.Lock()
defer g.mtx.Unlock()
g.schedule = schedule
g.scheduleStartsAt = t
g.timezone = timezone
}
// EvalTimestamp returns the immediately preceding consistently slotted evaluation time.
func (g *PromRuleTask) EvalTimestamp(startTime int64) time.Time {
var (

View File

@@ -36,6 +36,11 @@ type RuleTask struct {
maintenanceStore ruletypes.MaintenanceStore
orgID valuer.UUID
// New field for rrule-based scheduling
schedule string
scheduleStartsAt time.Time
timezone string
}
const DefaultFrequency = 1 * time.Minute
@@ -71,6 +76,10 @@ func (g *RuleTask) Key() string {
return g.name + ";" + g.file
}
func (g *RuleTask) IsCronSchedule() bool {
return g.schedule != ""
}
// Name returns the group name.
func (g *RuleTask) Type() TaskType { return TaskTypeCh }
@@ -95,58 +104,37 @@ func NewQueryOriginContext(ctx context.Context, data map[string]interface{}) con
func (g *RuleTask) Run(ctx context.Context) {
defer close(g.terminated)
// Wait an initial amount to have consistently slotted intervals.
evalTimestamp := g.EvalTimestamp(time.Now().UnixNano()).Add(g.frequency)
zap.L().Debug("group run to begin at", zap.Time("evalTimestamp", evalTimestamp))
select {
case <-time.After(time.Until(evalTimestamp)):
case <-g.done:
return
}
ctx = NewQueryOriginContext(ctx, map[string]interface{}{
"ruleRuleTask": map[string]string{
"name": g.Name(),
},
})
iter := func() {
if g.pause {
// todo(amol): remove in memory active alerts
// and last series state
return
if g.IsCronSchedule() {
evalFunc := createCronEvalFunction(&g.pause, g.Eval, g.setEvaluationTime, g.setLastEvaluation, ctx)
err := runCronScheduledTask(g.schedule, g.scheduleStartsAt, g.timezone, g.done, evalFunc)
if err != nil {
zap.L().Error("cron scheduler failed", zap.String("rule", g.Name()), zap.Error(err))
}
start := time.Now()
g.Eval(ctx, evalTimestamp)
timeSinceStart := time.Since(start)
} else {
evalTimestamp := g.EvalTimestamp(time.Now().UnixNano()).Add(g.frequency)
zap.L().Debug("group run to begin at", zap.Time("evalTimestamp", evalTimestamp))
g.setEvaluationTime(timeSinceStart)
g.setLastEvaluation(start)
}
evalFunc := createIntervalEvalFunction(
&g.pause,
g.Eval,
g.setEvaluationTime,
g.setLastEvaluation,
ctx,
func() time.Time { return evalTimestamp },
)
// The assumption here is that since the ticker was started after having
// waited for `evalTimestamp` to pass, the ticks will trigger soon
// after each `evalTimestamp + N * g.frequency` occurrence.
tick := time.NewTicker(g.frequency)
defer tick.Stop()
iter()
// let the group iterate and run
for {
select {
case <-g.done:
return
default:
select {
case <-g.done:
return
case <-tick.C:
missed := (time.Since(evalTimestamp) / g.frequency) - 1
evalTimestamp = evalTimestamp.Add((missed + 1) * g.frequency)
iter()
}
}
runIntervalScheduledTask(
g.frequency,
g.done,
func() time.Time { return evalTimestamp },
evalFunc,
)
}
}
@@ -298,6 +286,14 @@ func (g *RuleTask) CopyState(fromTask Task) error {
return nil
}
func (g *RuleTask) SetSchedule(schedule string, t time.Time, timezone string) {
g.mtx.Lock()
defer g.mtx.Unlock()
g.schedule = schedule
g.scheduleStartsAt = t
g.timezone = timezone
}
// Eval runs a single evaluation cycle in which all rules are evaluated sequentially.
func (g *RuleTask) Eval(ctx context.Context, ts time.Time) {

View File

@@ -28,6 +28,8 @@ type Task interface {
Rules() []Rule
Stop()
Pause(b bool)
IsCronSchedule() bool
SetSchedule(string, time.Time, string)
}
// newTask returns an appropriate group for

View File

@@ -0,0 +1,172 @@
package rules
import (
"context"
"github.com/SigNoz/signoz/pkg/errors"
"time"
"github.com/teambition/rrule-go"
"go.uber.org/zap"
)
// runCronScheduledTask handles cron-based scheduling with timezone support
func runCronScheduledTask(
schedule string,
scheduleStartsAt time.Time,
timezone string,
done chan struct{},
evalFunc func(),
) error {
// Load the timezone
loc, err := time.LoadLocation(timezone)
if err != nil {
zap.L().Error("failed to load timezone", zap.String("timezone", timezone), zap.Error(err))
return err
}
// Convert start time to the specified timezone
startTimeInTZ := scheduleStartsAt.In(loc)
// Format DTSTART with timezone info
var rruleStr string
if loc == time.UTC {
rruleStr = "DTSTART:" + startTimeInTZ.Format("20060102T150405Z") + "\n" + schedule
} else {
// For non-UTC timezones, include timezone info
rruleStr = "DTSTART;TZID=" + timezone + ":" + startTimeInTZ.Format("20060102T150405") + "\n" + schedule
}
parsedSchedule, err := rrule.StrToRRule(rruleStr)
if err != nil {
zap.L().Error("failed to parse rrule expression", zap.String("rrule", schedule), zap.String("timezone", timezone), zap.Error(err))
return err
}
// Use timezone-aware current time
now := time.Now().In(loc)
nextRun := parsedSchedule.After(now, false)
if nextRun.IsZero() {
zap.L().Error("no future runs found for schedule", zap.String("schedule", schedule), zap.String("timezone", timezone))
return errors.New(errors.TypeCanceled, errors.CodeCanceled, "no future runs found for schedule")
}
zap.L().Debug("cron scheduler starting", zap.String("schedule", schedule), zap.String("timezone", timezone), zap.Time("nextRun", nextRun))
select {
case <-time.After(time.Until(nextRun)):
case <-done:
return nil
}
evalFunc()
currentRun := nextRun
for {
nextRun = parsedSchedule.After(currentRun, false)
if nextRun.IsZero() {
zap.L().Info("no more scheduled runs", zap.String("schedule", schedule))
return nil
}
select {
case <-done:
return nil
default:
select {
case <-done:
return nil
case <-time.After(time.Until(nextRun)):
now := time.Now().In(loc)
if now.After(nextRun.Add(time.Minute)) {
zap.L().Warn("missed scheduled run",
zap.String("schedule", schedule),
zap.String("timezone", timezone),
zap.Time("scheduled", nextRun),
zap.Time("actual", now))
}
currentRun = nextRun
evalFunc()
}
}
}
}
// runIntervalScheduledTask handles interval-based scheduling
func runIntervalScheduledTask(
frequency time.Duration,
done chan struct{},
evalTimestampFunc func() time.Time,
evalFunc func(),
) {
evalTimestamp := evalTimestampFunc()
select {
case <-time.After(time.Until(evalTimestamp)):
case <-done:
return
}
tick := time.NewTicker(frequency)
defer tick.Stop()
evalFunc()
for {
select {
case <-done:
return
default:
select {
case <-done:
return
case <-tick.C:
missed := (time.Since(evalTimestamp) / frequency) - 1
evalTimestamp = evalTimestamp.Add((missed + 1) * frequency)
evalFunc()
}
}
}
}
// createCronEvalFunction creates evaluation function for cron scheduling
func createCronEvalFunction(
pause *bool,
evalFunc func(ctx context.Context, ts time.Time),
setEvaluationTime func(time.Duration),
setLastEvaluation func(time.Time),
ctx context.Context,
) func() {
return func() {
if *pause {
return
}
start := time.Now()
evalFunc(ctx, start)
timeSinceStart := time.Since(start)
setEvaluationTime(timeSinceStart)
setLastEvaluation(start)
}
}
// createIntervalEvalFunction creates evaluation function for interval scheduling
func createIntervalEvalFunction(
pause *bool,
evalFunc func(ctx context.Context, ts time.Time),
setEvaluationTime func(time.Duration),
setLastEvaluation func(time.Time),
ctx context.Context,
getEvalTimestamp func() time.Time,
) func() {
return func() {
if *pause {
return
}
start := time.Now()
evalFunc(ctx, getEvalTimestamp())
timeSinceStart := time.Since(start)
setEvaluationTime(timeSinceStart)
setLastEvaluation(start)
}
}

View File

@@ -609,7 +609,6 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (interface{}, er
}
value := valueFormatter.Format(smpl.V, r.Unit())
//todo(aniket): handle different threshold
threshold := valueFormatter.Format(r.targetVal(), r.Unit())
r.logger.DebugContext(ctx, "Alert template data for rule", "rule_name", r.Name(), "formatter", valueFormatter.Name(), "value", value, "threshold", threshold)

View File

@@ -6,10 +6,11 @@ import (
"time"
"unicode/utf8"
"github.com/teambition/rrule-go"
signozError "github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/query-service/model"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/utils/times"
"github.com/SigNoz/signoz/pkg/query-service/utils/timestamp"
)
@@ -51,7 +52,10 @@ type PostableRule struct {
Version string `json:"version,omitempty"`
Evaluation *EvaluationEnvelope `yaml:"evaluation,omitempty" json:"evaluation,omitempty"`
Evaluation *EvaluationEnvelope `json:"evaluation,omitempty"`
ScheduleStartsAt int64 `json:"scheduleStartsAt,omitempty"`
Schedule string `json:"schedule,omitempty"`
ScheduleTimezone string `json:"scheduleTimezone,omitempty"`
}
func (r *PostableRule) processRuleDefaults() error {
@@ -104,6 +108,16 @@ func (r *PostableRule) processRuleDefaults() error {
r.Evaluation = &EvaluationEnvelope{RollingEvaluation, RollingWindow{EvalWindow: r.EvalWindow, Frequency: r.Frequency}}
}
// Validate rrule schedule if present
if r.Schedule != "" {
if r.ScheduleTimezone == "" {
r.ScheduleTimezone = "UTC"
}
if err := validateRRule(r.Schedule, r.ScheduleTimezone, r.ScheduleStartsAt); err != nil {
return signozError.NewInvalidInputf(signozError.CodeInvalidInput, "invalid rrule or timezone: %v", err)
}
}
return r.Validate()
}
@@ -263,3 +277,32 @@ type GettableRule struct {
UpdatedAt *time.Time `json:"updateAt"`
UpdatedBy *string `json:"updateBy"`
}
// validateRRule validates the rrule schedule format with timezone support
func validateRRule(schedule, timezone string, at int64) error {
if schedule == "" {
return nil
}
loc, err := time.LoadLocation(timezone)
if err != nil {
return signozError.NewInvalidInputf(signozError.CodeInvalidInput, "invalid timezone: %s", err)
}
startsAt := time.UnixMilli(at)
rruleStr := "DTSTART;TZID=" + timezone + ":" + startsAt.Format("20060102T150405") + "\n" + schedule
parsedRule, err := rrule.StrToRRule(rruleStr)
if err != nil {
return signozError.NewInvalidInputf(signozError.CodeInvalidInput, "rrule parsing error: %s", err)
}
now := time.Now().In(loc)
nextOccurrence := parsedRule.After(now, false)
if nextOccurrence.IsZero() {
return signozError.NewInvalidInputf(signozError.CodeInvalidInput, "schedule will never occur in future")
}
return nil
}

View File

@@ -2,10 +2,11 @@ package ruletypes
import (
"encoding/json"
"github.com/stretchr/testify/assert"
"testing"
"time"
"github.com/stretchr/testify/assert"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
)

View File

@@ -2,13 +2,14 @@ package ruletypes
import (
"encoding/json"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/query-service/converter"
"github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/utils/labels"
"github.com/SigNoz/signoz/pkg/valuer"
"math"
"sort"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/query-service/converter"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/utils/labels"
"github.com/SigNoz/signoz/pkg/valuer"
)
type ThresholdKind struct {