Compare commits
93 Commits
main
...
feat/cron-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ee739c4cde | ||
|
|
1a575459f5 | ||
|
|
397116c6e2 | ||
|
|
be25a42007 | ||
|
|
5400700b57 | ||
|
|
294b41c79f | ||
|
|
7d9aaf2d49 | ||
|
|
1e098fa08b | ||
|
|
b50df67895 | ||
|
|
3de1888c13 | ||
|
|
3f5f63d3ac | ||
|
|
48f5defbcb | ||
|
|
efaee2030e | ||
|
|
c4fe00ecfc | ||
|
|
eec2c43011 | ||
|
|
c963064293 | ||
|
|
d26308590f | ||
|
|
12ffb02a94 | ||
|
|
d00e2c7590 | ||
|
|
7d72c03b95 | ||
|
|
29ba0b0ae9 | ||
|
|
ad8a8c6502 | ||
|
|
79fbf48e89 | ||
|
|
bb0ee343c9 | ||
|
|
1d656b7759 | ||
|
|
8f10ed417b | ||
|
|
6a86cd205a | ||
|
|
47a71db8b8 | ||
|
|
ded8b34623 | ||
|
|
64a14dfe2e | ||
|
|
776889e77d | ||
|
|
ac8daf68c2 | ||
|
|
7f56931840 | ||
|
|
a83e5e87ed | ||
|
|
a640168d26 | ||
|
|
c36eb64a5e | ||
|
|
bc2c307dbd | ||
|
|
a49a8cab9b | ||
|
|
53a7f2dbca | ||
|
|
ab7f590794 | ||
|
|
b62a19c9d7 | ||
|
|
c48622ff30 | ||
|
|
93e8045c55 | ||
|
|
47083e2de1 | ||
|
|
cdc3f2c5e8 | ||
|
|
dce653686b | ||
|
|
890743ce33 | ||
|
|
c74f976163 | ||
|
|
872323f798 | ||
|
|
d866a298c2 | ||
|
|
c4ad1f05ad | ||
|
|
88db8b3ce7 | ||
|
|
4fb227a0de | ||
|
|
56649f7747 | ||
|
|
acc5c76339 | ||
|
|
cb8ce22bfd | ||
|
|
ee3c682f1e | ||
|
|
62f61ff3c4 | ||
|
|
fb997cff04 | ||
|
|
593657a66d | ||
|
|
c1d801828f | ||
|
|
34e29da919 | ||
|
|
e8fb0e622e | ||
|
|
4d5a34fc18 | ||
|
|
2dd712a441 | ||
|
|
22113df7dd | ||
|
|
2ef6bfd012 | ||
|
|
6b923d6e16 | ||
|
|
2f317257f6 | ||
|
|
6107552af9 | ||
|
|
4ff172d129 | ||
|
|
a4540323b7 | ||
|
|
d90e4ba995 | ||
|
|
a6efeb3cbe | ||
|
|
4ac34a8ef2 | ||
|
|
b4780c96f5 | ||
|
|
5bf60674be | ||
|
|
a7bcbfea9b | ||
|
|
d309975c18 | ||
|
|
d6a39a377f | ||
|
|
284f3814b7 | ||
|
|
d7b83da3c6 | ||
|
|
2f0636f7e7 | ||
|
|
2d8803a2e8 | ||
|
|
cb7990b265 | ||
|
|
a258c24918 | ||
|
|
05bb5166b8 | ||
|
|
9695ecd57d | ||
|
|
cc33362bf2 | ||
|
|
a4eb94f141 | ||
|
|
73a6bed1af | ||
|
|
3467c2a6c8 | ||
|
|
a011389fe0 |
@@ -47,7 +47,10 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
|
||||
|
||||
// create ch rule task for evalution
|
||||
task = newTask(baserules.TaskTypeCh, opts.TaskName, time.Duration(evaluation.GetFrequency()), rules, opts.ManagerOpts, opts.NotifyFunc, opts.MaintenanceStore, opts.OrgID)
|
||||
|
||||
if tr.IsScheduled() {
|
||||
schedule, startTime, timezone := tr.GetSchedule()
|
||||
task.SetSchedule(schedule, startTime, timezone)
|
||||
}
|
||||
} else if opts.Rule.RuleType == ruletypes.RuleTypeProm {
|
||||
|
||||
// create promql rule
|
||||
@@ -69,7 +72,10 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
|
||||
|
||||
// create promql rule task for evalution
|
||||
task = newTask(baserules.TaskTypeProm, opts.TaskName, time.Duration(evaluation.GetFrequency()), rules, opts.ManagerOpts, opts.NotifyFunc, opts.MaintenanceStore, opts.OrgID)
|
||||
|
||||
if pr.IsScheduled() {
|
||||
schedule, startTime, timezone := pr.GetSchedule()
|
||||
task.SetSchedule(schedule, startTime, timezone)
|
||||
}
|
||||
} else if opts.Rule.RuleType == ruletypes.RuleTypeAnomaly {
|
||||
// create anomaly rule
|
||||
ar, err := NewAnomalyRule(
|
||||
@@ -91,7 +97,10 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
|
||||
|
||||
// create anomaly rule task for evalution
|
||||
task = newTask(baserules.TaskTypeCh, opts.TaskName, time.Duration(evaluation.GetFrequency()), rules, opts.ManagerOpts, opts.NotifyFunc, opts.MaintenanceStore, opts.OrgID)
|
||||
|
||||
if ar.IsScheduled() {
|
||||
schedule, startTime, timezone := ar.GetSchedule()
|
||||
task.SetSchedule(schedule, startTime, timezone)
|
||||
}
|
||||
} else {
|
||||
return nil, fmt.Errorf("unsupported rule type %s. Supported types: %s, %s", opts.Rule.RuleType, ruletypes.RuleTypeProm, ruletypes.RuleTypeThreshold)
|
||||
}
|
||||
|
||||
1
go.mod
1
go.mod
@@ -55,6 +55,7 @@ require (
|
||||
github.com/spf13/cobra v1.9.1
|
||||
github.com/srikanthccv/ClickHouse-go-mock v0.12.0
|
||||
github.com/stretchr/testify v1.10.0
|
||||
github.com/teambition/rrule-go v1.8.2
|
||||
github.com/tidwall/gjson v1.18.0
|
||||
github.com/uptrace/bun v1.2.9
|
||||
github.com/uptrace/bun/dialect/pgdialect v1.2.9
|
||||
|
||||
2
go.sum
2
go.sum
@@ -980,6 +980,8 @@ github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf
|
||||
github.com/subosito/gotenv v1.4.1/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0=
|
||||
github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8=
|
||||
github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU=
|
||||
github.com/teambition/rrule-go v1.8.2 h1:lIjpjvWTj9fFUZCmuoVDrKVOtdiyzbzc93qTmRVe/J8=
|
||||
github.com/teambition/rrule-go v1.8.2/go.mod h1:Ieq5AbrKGciP1V//Wq8ktsTXwSwJHDD5mD/wLBGl3p4=
|
||||
github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY=
|
||||
github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
|
||||
github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
|
||||
|
||||
@@ -90,6 +90,10 @@ type BaseRule struct {
|
||||
sqlstore sqlstore.SQLStore
|
||||
|
||||
evaluation ruletypes.Evaluation
|
||||
|
||||
schedule string
|
||||
scheduleStartsAt time.Time
|
||||
timezone string
|
||||
}
|
||||
|
||||
type RuleOption func(*BaseRule)
|
||||
@@ -154,6 +158,9 @@ func NewBaseRule(id string, orgID valuer.UUID, p *ruletypes.PostableRule, reader
|
||||
TemporalityMap: make(map[string]map[v3.Temporality]bool),
|
||||
Threshold: threshold,
|
||||
evaluation: evaluation,
|
||||
schedule: p.Schedule,
|
||||
scheduleStartsAt: time.UnixMilli(p.ScheduleStartsAt),
|
||||
timezone: p.ScheduleTimezone,
|
||||
}
|
||||
|
||||
if baseRule.evalWindow == 0 {
|
||||
@@ -756,3 +763,11 @@ func (r *BaseRule) PopulateTemporality(ctx context.Context, orgID valuer.UUID, q
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *BaseRule) IsScheduled() bool {
|
||||
return r.schedule != ""
|
||||
}
|
||||
|
||||
func (r *BaseRule) GetSchedule() (string, time.Time, string) {
|
||||
return r.schedule, r.scheduleStartsAt, r.timezone
|
||||
}
|
||||
|
||||
@@ -173,7 +173,10 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
|
||||
|
||||
// create ch rule task for evalution
|
||||
task = newTask(TaskTypeCh, opts.TaskName, taskNamesuffix, time.Duration(evaluation.GetFrequency()), rules, opts.ManagerOpts, opts.NotifyFunc, opts.MaintenanceStore, opts.OrgID)
|
||||
|
||||
if tr.IsScheduled() {
|
||||
schedule, startTime, timezone := tr.GetSchedule()
|
||||
task.SetSchedule(schedule, startTime, timezone)
|
||||
}
|
||||
} else if opts.Rule.RuleType == ruletypes.RuleTypeProm {
|
||||
|
||||
// create promql rule
|
||||
@@ -195,7 +198,10 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
|
||||
|
||||
// create promql rule task for evalution
|
||||
task = newTask(TaskTypeProm, opts.TaskName, taskNamesuffix, time.Duration(evaluation.GetFrequency()), rules, opts.ManagerOpts, opts.NotifyFunc, opts.MaintenanceStore, opts.OrgID)
|
||||
|
||||
if pr.IsScheduled() {
|
||||
schedule, startTime, timezone := pr.GetSchedule()
|
||||
task.SetSchedule(schedule, startTime, timezone)
|
||||
}
|
||||
} else {
|
||||
return nil, fmt.Errorf("unsupported rule type %s. Supported types: %s, %s", opts.Rule.RuleType, ruletypes.RuleTypeProm, ruletypes.RuleTypeThreshold)
|
||||
}
|
||||
|
||||
@@ -39,6 +39,9 @@ type PromRuleTask struct {
|
||||
|
||||
maintenanceStore ruletypes.MaintenanceStore
|
||||
orgID valuer.UUID
|
||||
schedule string
|
||||
scheduleStartsAt time.Time
|
||||
timezone string
|
||||
}
|
||||
|
||||
// newPromRuleTask holds rules that have promql condition
|
||||
@@ -75,6 +78,10 @@ func (g *PromRuleTask) Key() string {
|
||||
return g.name + ";" + g.file
|
||||
}
|
||||
|
||||
func (g *PromRuleTask) IsCronSchedule() bool {
|
||||
return g.schedule != ""
|
||||
}
|
||||
|
||||
func (g *PromRuleTask) Type() TaskType { return TaskTypeProm }
|
||||
|
||||
// Rules returns the group's rules.
|
||||
@@ -92,36 +99,6 @@ func (g *PromRuleTask) Pause(b bool) {
|
||||
func (g *PromRuleTask) Run(ctx context.Context) {
|
||||
defer close(g.terminated)
|
||||
|
||||
// Wait an initial amount to have consistently slotted intervals.
|
||||
evalTimestamp := g.EvalTimestamp(time.Now().UnixNano()).Add(g.frequency)
|
||||
select {
|
||||
case <-time.After(time.Until(evalTimestamp)):
|
||||
case <-g.done:
|
||||
return
|
||||
}
|
||||
|
||||
ctx = NewQueryOriginContext(ctx, map[string]interface{}{
|
||||
"ruleGroup": map[string]string{
|
||||
"name": g.Name(),
|
||||
},
|
||||
})
|
||||
|
||||
iter := func() {
|
||||
|
||||
start := time.Now()
|
||||
g.Eval(ctx, evalTimestamp)
|
||||
timeSinceStart := time.Since(start)
|
||||
|
||||
g.setEvaluationTime(timeSinceStart)
|
||||
g.setLastEvaluation(start)
|
||||
}
|
||||
|
||||
// The assumption here is that since the ticker was started after having
|
||||
// waited for `evalTimestamp` to pass, the ticks will trigger soon
|
||||
// after each `evalTimestamp + N * g.frequency` occurrence.
|
||||
tick := time.NewTicker(g.frequency)
|
||||
defer tick.Stop()
|
||||
|
||||
// defer cleanup
|
||||
defer func() {
|
||||
if !g.markStale {
|
||||
@@ -140,23 +117,36 @@ func (g *PromRuleTask) Run(ctx context.Context) {
|
||||
|
||||
}()
|
||||
|
||||
iter()
|
||||
ctx = NewQueryOriginContext(ctx, map[string]interface{}{
|
||||
"ruleGroup": map[string]string{
|
||||
"name": g.Name(),
|
||||
},
|
||||
})
|
||||
|
||||
// let the group iterate and run
|
||||
for {
|
||||
select {
|
||||
case <-g.done:
|
||||
return
|
||||
default:
|
||||
select {
|
||||
case <-g.done:
|
||||
return
|
||||
case <-tick.C:
|
||||
missed := (time.Since(evalTimestamp) / g.frequency) - 1
|
||||
evalTimestamp = evalTimestamp.Add((missed + 1) * g.frequency)
|
||||
iter()
|
||||
}
|
||||
if g.IsCronSchedule() {
|
||||
evalFunc := createCronEvalFunction(&g.pause, g.Eval, g.setEvaluationTime, g.setLastEvaluation, ctx)
|
||||
err := runCronScheduledTask(g.schedule, g.scheduleStartsAt, g.timezone, g.done, evalFunc)
|
||||
if err != nil {
|
||||
zap.L().Error("cron scheduler failed", zap.String("rule", g.Name()), zap.Error(err))
|
||||
}
|
||||
} else {
|
||||
evalTimestamp := g.EvalTimestamp(time.Now().UnixNano()).Add(g.frequency)
|
||||
|
||||
evalFunc := createIntervalEvalFunction(
|
||||
&g.pause,
|
||||
g.Eval,
|
||||
g.setEvaluationTime,
|
||||
g.setLastEvaluation,
|
||||
ctx,
|
||||
func() time.Time { return evalTimestamp },
|
||||
)
|
||||
|
||||
runIntervalScheduledTask(
|
||||
g.frequency,
|
||||
g.done,
|
||||
func() time.Time { return evalTimestamp },
|
||||
evalFunc,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -245,6 +235,14 @@ func (g *PromRuleTask) setLastEvaluation(ts time.Time) {
|
||||
g.lastEvaluation = ts
|
||||
}
|
||||
|
||||
func (g *PromRuleTask) SetSchedule(schedule string, t time.Time, timezone string) {
|
||||
g.mtx.Lock()
|
||||
defer g.mtx.Unlock()
|
||||
g.schedule = schedule
|
||||
g.scheduleStartsAt = t
|
||||
g.timezone = timezone
|
||||
}
|
||||
|
||||
// EvalTimestamp returns the immediately preceding consistently slotted evaluation time.
|
||||
func (g *PromRuleTask) EvalTimestamp(startTime int64) time.Time {
|
||||
var (
|
||||
|
||||
@@ -36,6 +36,11 @@ type RuleTask struct {
|
||||
|
||||
maintenanceStore ruletypes.MaintenanceStore
|
||||
orgID valuer.UUID
|
||||
|
||||
// New field for rrule-based scheduling
|
||||
schedule string
|
||||
scheduleStartsAt time.Time
|
||||
timezone string
|
||||
}
|
||||
|
||||
const DefaultFrequency = 1 * time.Minute
|
||||
@@ -71,6 +76,10 @@ func (g *RuleTask) Key() string {
|
||||
return g.name + ";" + g.file
|
||||
}
|
||||
|
||||
func (g *RuleTask) IsCronSchedule() bool {
|
||||
return g.schedule != ""
|
||||
}
|
||||
|
||||
// Name returns the group name.
|
||||
func (g *RuleTask) Type() TaskType { return TaskTypeCh }
|
||||
|
||||
@@ -95,58 +104,37 @@ func NewQueryOriginContext(ctx context.Context, data map[string]interface{}) con
|
||||
func (g *RuleTask) Run(ctx context.Context) {
|
||||
defer close(g.terminated)
|
||||
|
||||
// Wait an initial amount to have consistently slotted intervals.
|
||||
evalTimestamp := g.EvalTimestamp(time.Now().UnixNano()).Add(g.frequency)
|
||||
zap.L().Debug("group run to begin at", zap.Time("evalTimestamp", evalTimestamp))
|
||||
select {
|
||||
case <-time.After(time.Until(evalTimestamp)):
|
||||
case <-g.done:
|
||||
return
|
||||
}
|
||||
|
||||
ctx = NewQueryOriginContext(ctx, map[string]interface{}{
|
||||
"ruleRuleTask": map[string]string{
|
||||
"name": g.Name(),
|
||||
},
|
||||
})
|
||||
|
||||
iter := func() {
|
||||
if g.pause {
|
||||
// todo(amol): remove in memory active alerts
|
||||
// and last series state
|
||||
return
|
||||
if g.IsCronSchedule() {
|
||||
evalFunc := createCronEvalFunction(&g.pause, g.Eval, g.setEvaluationTime, g.setLastEvaluation, ctx)
|
||||
err := runCronScheduledTask(g.schedule, g.scheduleStartsAt, g.timezone, g.done, evalFunc)
|
||||
if err != nil {
|
||||
zap.L().Error("cron scheduler failed", zap.String("rule", g.Name()), zap.Error(err))
|
||||
}
|
||||
start := time.Now()
|
||||
g.Eval(ctx, evalTimestamp)
|
||||
timeSinceStart := time.Since(start)
|
||||
} else {
|
||||
evalTimestamp := g.EvalTimestamp(time.Now().UnixNano()).Add(g.frequency)
|
||||
zap.L().Debug("group run to begin at", zap.Time("evalTimestamp", evalTimestamp))
|
||||
|
||||
g.setEvaluationTime(timeSinceStart)
|
||||
g.setLastEvaluation(start)
|
||||
}
|
||||
evalFunc := createIntervalEvalFunction(
|
||||
&g.pause,
|
||||
g.Eval,
|
||||
g.setEvaluationTime,
|
||||
g.setLastEvaluation,
|
||||
ctx,
|
||||
func() time.Time { return evalTimestamp },
|
||||
)
|
||||
|
||||
// The assumption here is that since the ticker was started after having
|
||||
// waited for `evalTimestamp` to pass, the ticks will trigger soon
|
||||
// after each `evalTimestamp + N * g.frequency` occurrence.
|
||||
tick := time.NewTicker(g.frequency)
|
||||
defer tick.Stop()
|
||||
|
||||
iter()
|
||||
|
||||
// let the group iterate and run
|
||||
for {
|
||||
select {
|
||||
case <-g.done:
|
||||
return
|
||||
default:
|
||||
select {
|
||||
case <-g.done:
|
||||
return
|
||||
case <-tick.C:
|
||||
missed := (time.Since(evalTimestamp) / g.frequency) - 1
|
||||
evalTimestamp = evalTimestamp.Add((missed + 1) * g.frequency)
|
||||
iter()
|
||||
}
|
||||
}
|
||||
runIntervalScheduledTask(
|
||||
g.frequency,
|
||||
g.done,
|
||||
func() time.Time { return evalTimestamp },
|
||||
evalFunc,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -298,6 +286,14 @@ func (g *RuleTask) CopyState(fromTask Task) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (g *RuleTask) SetSchedule(schedule string, t time.Time, timezone string) {
|
||||
g.mtx.Lock()
|
||||
defer g.mtx.Unlock()
|
||||
g.schedule = schedule
|
||||
g.scheduleStartsAt = t
|
||||
g.timezone = timezone
|
||||
}
|
||||
|
||||
// Eval runs a single evaluation cycle in which all rules are evaluated sequentially.
|
||||
func (g *RuleTask) Eval(ctx context.Context, ts time.Time) {
|
||||
|
||||
|
||||
@@ -28,6 +28,8 @@ type Task interface {
|
||||
Rules() []Rule
|
||||
Stop()
|
||||
Pause(b bool)
|
||||
IsCronSchedule() bool
|
||||
SetSchedule(string, time.Time, string)
|
||||
}
|
||||
|
||||
// newTask returns an appropriate group for
|
||||
|
||||
172
pkg/query-service/rules/task_common.go
Normal file
172
pkg/query-service/rules/task_common.go
Normal file
@@ -0,0 +1,172 @@
|
||||
package rules
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"time"
|
||||
|
||||
"github.com/teambition/rrule-go"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// runCronScheduledTask handles cron-based scheduling with timezone support
|
||||
func runCronScheduledTask(
|
||||
schedule string,
|
||||
scheduleStartsAt time.Time,
|
||||
timezone string,
|
||||
done chan struct{},
|
||||
evalFunc func(),
|
||||
) error {
|
||||
// Load the timezone
|
||||
loc, err := time.LoadLocation(timezone)
|
||||
if err != nil {
|
||||
zap.L().Error("failed to load timezone", zap.String("timezone", timezone), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
// Convert start time to the specified timezone
|
||||
startTimeInTZ := scheduleStartsAt.In(loc)
|
||||
|
||||
// Format DTSTART with timezone info
|
||||
var rruleStr string
|
||||
if loc == time.UTC {
|
||||
rruleStr = "DTSTART:" + startTimeInTZ.Format("20060102T150405Z") + "\n" + schedule
|
||||
} else {
|
||||
// For non-UTC timezones, include timezone info
|
||||
rruleStr = "DTSTART;TZID=" + timezone + ":" + startTimeInTZ.Format("20060102T150405") + "\n" + schedule
|
||||
}
|
||||
|
||||
parsedSchedule, err := rrule.StrToRRule(rruleStr)
|
||||
if err != nil {
|
||||
zap.L().Error("failed to parse rrule expression", zap.String("rrule", schedule), zap.String("timezone", timezone), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
// Use timezone-aware current time
|
||||
now := time.Now().In(loc)
|
||||
nextRun := parsedSchedule.After(now, false)
|
||||
|
||||
if nextRun.IsZero() {
|
||||
zap.L().Error("no future runs found for schedule", zap.String("schedule", schedule), zap.String("timezone", timezone))
|
||||
return errors.New(errors.TypeCanceled, errors.CodeCanceled, "no future runs found for schedule")
|
||||
}
|
||||
|
||||
zap.L().Debug("cron scheduler starting", zap.String("schedule", schedule), zap.String("timezone", timezone), zap.Time("nextRun", nextRun))
|
||||
|
||||
select {
|
||||
case <-time.After(time.Until(nextRun)):
|
||||
case <-done:
|
||||
return nil
|
||||
}
|
||||
|
||||
evalFunc()
|
||||
currentRun := nextRun
|
||||
|
||||
for {
|
||||
nextRun = parsedSchedule.After(currentRun, false)
|
||||
|
||||
if nextRun.IsZero() {
|
||||
zap.L().Info("no more scheduled runs", zap.String("schedule", schedule))
|
||||
return nil
|
||||
}
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
return nil
|
||||
default:
|
||||
select {
|
||||
case <-done:
|
||||
return nil
|
||||
case <-time.After(time.Until(nextRun)):
|
||||
now := time.Now().In(loc)
|
||||
if now.After(nextRun.Add(time.Minute)) {
|
||||
zap.L().Warn("missed scheduled run",
|
||||
zap.String("schedule", schedule),
|
||||
zap.String("timezone", timezone),
|
||||
zap.Time("scheduled", nextRun),
|
||||
zap.Time("actual", now))
|
||||
}
|
||||
currentRun = nextRun
|
||||
evalFunc()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// runIntervalScheduledTask handles interval-based scheduling
|
||||
func runIntervalScheduledTask(
|
||||
frequency time.Duration,
|
||||
done chan struct{},
|
||||
evalTimestampFunc func() time.Time,
|
||||
evalFunc func(),
|
||||
) {
|
||||
evalTimestamp := evalTimestampFunc()
|
||||
|
||||
select {
|
||||
case <-time.After(time.Until(evalTimestamp)):
|
||||
case <-done:
|
||||
return
|
||||
}
|
||||
|
||||
tick := time.NewTicker(frequency)
|
||||
defer tick.Stop()
|
||||
|
||||
evalFunc()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-done:
|
||||
return
|
||||
default:
|
||||
select {
|
||||
case <-done:
|
||||
return
|
||||
case <-tick.C:
|
||||
missed := (time.Since(evalTimestamp) / frequency) - 1
|
||||
evalTimestamp = evalTimestamp.Add((missed + 1) * frequency)
|
||||
evalFunc()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// createCronEvalFunction creates evaluation function for cron scheduling
|
||||
func createCronEvalFunction(
|
||||
pause *bool,
|
||||
evalFunc func(ctx context.Context, ts time.Time),
|
||||
setEvaluationTime func(time.Duration),
|
||||
setLastEvaluation func(time.Time),
|
||||
ctx context.Context,
|
||||
) func() {
|
||||
return func() {
|
||||
if *pause {
|
||||
return
|
||||
}
|
||||
start := time.Now()
|
||||
evalFunc(ctx, start)
|
||||
timeSinceStart := time.Since(start)
|
||||
setEvaluationTime(timeSinceStart)
|
||||
setLastEvaluation(start)
|
||||
}
|
||||
}
|
||||
|
||||
// createIntervalEvalFunction creates evaluation function for interval scheduling
|
||||
func createIntervalEvalFunction(
|
||||
pause *bool,
|
||||
evalFunc func(ctx context.Context, ts time.Time),
|
||||
setEvaluationTime func(time.Duration),
|
||||
setLastEvaluation func(time.Time),
|
||||
ctx context.Context,
|
||||
getEvalTimestamp func() time.Time,
|
||||
) func() {
|
||||
return func() {
|
||||
if *pause {
|
||||
return
|
||||
}
|
||||
start := time.Now()
|
||||
evalFunc(ctx, getEvalTimestamp())
|
||||
timeSinceStart := time.Since(start)
|
||||
setEvaluationTime(timeSinceStart)
|
||||
setLastEvaluation(start)
|
||||
}
|
||||
}
|
||||
@@ -609,7 +609,6 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (interface{}, er
|
||||
}
|
||||
|
||||
value := valueFormatter.Format(smpl.V, r.Unit())
|
||||
//todo(aniket): handle different threshold
|
||||
threshold := valueFormatter.Format(r.targetVal(), r.Unit())
|
||||
r.logger.DebugContext(ctx, "Alert template data for rule", "rule_name", r.Name(), "formatter", valueFormatter.Name(), "value", value, "threshold", threshold)
|
||||
|
||||
|
||||
@@ -6,10 +6,11 @@ import (
|
||||
"time"
|
||||
"unicode/utf8"
|
||||
|
||||
"github.com/teambition/rrule-go"
|
||||
|
||||
signozError "github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/model"
|
||||
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/query-service/utils/times"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/utils/timestamp"
|
||||
)
|
||||
@@ -51,7 +52,10 @@ type PostableRule struct {
|
||||
|
||||
Version string `json:"version,omitempty"`
|
||||
|
||||
Evaluation *EvaluationEnvelope `yaml:"evaluation,omitempty" json:"evaluation,omitempty"`
|
||||
Evaluation *EvaluationEnvelope `json:"evaluation,omitempty"`
|
||||
ScheduleStartsAt int64 `json:"scheduleStartsAt,omitempty"`
|
||||
Schedule string `json:"schedule,omitempty"`
|
||||
ScheduleTimezone string `json:"scheduleTimezone,omitempty"`
|
||||
}
|
||||
|
||||
func (r *PostableRule) processRuleDefaults() error {
|
||||
@@ -104,6 +108,16 @@ func (r *PostableRule) processRuleDefaults() error {
|
||||
r.Evaluation = &EvaluationEnvelope{RollingEvaluation, RollingWindow{EvalWindow: r.EvalWindow, Frequency: r.Frequency}}
|
||||
}
|
||||
|
||||
// Validate rrule schedule if present
|
||||
if r.Schedule != "" {
|
||||
if r.ScheduleTimezone == "" {
|
||||
r.ScheduleTimezone = "UTC"
|
||||
}
|
||||
if err := validateRRule(r.Schedule, r.ScheduleTimezone, r.ScheduleStartsAt); err != nil {
|
||||
return signozError.NewInvalidInputf(signozError.CodeInvalidInput, "invalid rrule or timezone: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
return r.Validate()
|
||||
}
|
||||
|
||||
@@ -263,3 +277,32 @@ type GettableRule struct {
|
||||
UpdatedAt *time.Time `json:"updateAt"`
|
||||
UpdatedBy *string `json:"updateBy"`
|
||||
}
|
||||
|
||||
// validateRRule validates the rrule schedule format with timezone support
|
||||
func validateRRule(schedule, timezone string, at int64) error {
|
||||
if schedule == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
loc, err := time.LoadLocation(timezone)
|
||||
if err != nil {
|
||||
return signozError.NewInvalidInputf(signozError.CodeInvalidInput, "invalid timezone: %s", err)
|
||||
}
|
||||
|
||||
startsAt := time.UnixMilli(at)
|
||||
|
||||
rruleStr := "DTSTART;TZID=" + timezone + ":" + startsAt.Format("20060102T150405") + "\n" + schedule
|
||||
|
||||
parsedRule, err := rrule.StrToRRule(rruleStr)
|
||||
if err != nil {
|
||||
return signozError.NewInvalidInputf(signozError.CodeInvalidInput, "rrule parsing error: %s", err)
|
||||
}
|
||||
|
||||
now := time.Now().In(loc)
|
||||
nextOccurrence := parsedRule.After(now, false)
|
||||
if nextOccurrence.IsZero() {
|
||||
return signozError.NewInvalidInputf(signozError.CodeInvalidInput, "schedule will never occur in future")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -2,10 +2,11 @@ package ruletypes
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
|
||||
)
|
||||
|
||||
|
||||
@@ -2,13 +2,14 @@ package ruletypes
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/converter"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/model/v3"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/utils/labels"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"math"
|
||||
"sort"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/converter"
|
||||
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/utils/labels"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
type ThresholdKind struct {
|
||||
|
||||
Reference in New Issue
Block a user