fix: deadlock in prom rule (#9741)

This commit is contained in:
Abhishek Kumar Singh
2025-12-02 12:27:08 +05:30
committed by GitHub
parent cde99ba1a0
commit c8608c18ae
7 changed files with 751 additions and 148 deletions

3
go.mod
View File

@@ -114,7 +114,6 @@ require (
github.com/armon/go-metrics v0.4.1 // indirect
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect
github.com/aws/aws-sdk-go v1.55.7 // indirect
github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3 // indirect
github.com/beevik/etree v1.1.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
@@ -158,7 +157,6 @@ require (
github.com/golang/snappy v1.0.0 // indirect
github.com/google/btree v1.1.3 // indirect
github.com/google/cel-go v0.26.1 // indirect
github.com/google/go-cmp v0.7.0 // indirect
github.com/google/s2a-go v0.1.9 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.6 // indirect
github.com/googleapis/gax-go/v2 v2.14.2 // indirect
@@ -324,7 +322,6 @@ require (
go.opentelemetry.io/otel/sdk/metric v1.38.0
go.opentelemetry.io/proto/otlp v1.8.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/goleak v1.3.0 // indirect
go.uber.org/mock v0.6.0 // indirect
go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/mod v0.27.0 // indirect

View File

@@ -1,55 +1,42 @@
package prometheustest
import (
"log/slog"
"os"
"time"
"context"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/prometheus"
"github.com/SigNoz/signoz/pkg/prometheus/clickhouseprometheus"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/storage/remote"
)
var _ prometheus.Prometheus = (*Provider)(nil)
type Provider struct {
db *tsdb.DB
dir string
engine *prometheus.Engine
queryable storage.SampleAndChunkQueryable
engine *prometheus.Engine
}
func New(logger *slog.Logger, cfg prometheus.Config, outOfOrderTimeWindow ...int64) *Provider {
dir, err := os.MkdirTemp("", "test_storage")
if err != nil {
panic(err)
}
var stCallback = func() (int64, error) {
return int64(model.Latest), nil
}
// Tests just load data for a series sequentially. Thus we
// need a long appendable window.
opts := tsdb.DefaultOptions()
opts.MinBlockDuration = int64(24 * time.Hour / time.Millisecond)
opts.MaxBlockDuration = int64(24 * time.Hour / time.Millisecond)
opts.RetentionDuration = 0
opts.EnableNativeHistograms = true
func New(ctx context.Context, providerSettings factory.ProviderSettings, config prometheus.Config, telemetryStore telemetrystore.TelemetryStore) *Provider {
// Set OutOfOrderTimeWindow if provided, otherwise use default (0)
if len(outOfOrderTimeWindow) > 0 {
opts.OutOfOrderTimeWindow = outOfOrderTimeWindow[0]
} else {
opts.OutOfOrderTimeWindow = 0 // Default value is zero
}
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/prometheus/prometheustest")
db, err := tsdb.Open(dir, nil, nil, opts, tsdb.NewDBStats())
if err != nil {
panic(err)
}
engine := prometheus.NewEngine(settings.Logger(), config)
engine := prometheus.NewEngine(logger, cfg)
readClient := clickhouseprometheus.NewReadClient(settings, telemetryStore)
queryable := remote.NewSampleAndChunkQueryableClient(readClient, labels.EmptyLabels(), []*labels.Matcher{}, false, stCallback)
return &Provider{
db: db,
dir: dir,
engine: engine,
engine: engine,
queryable: queryable,
}
}
@@ -58,12 +45,12 @@ func (provider *Provider) Engine() *prometheus.Engine {
}
func (provider *Provider) Storage() storage.Queryable {
return provider.db
return provider.queryable
}
func (provider *Provider) Close() error {
if err := provider.db.Close(); err != nil {
return err
if provider.engine != nil {
provider.engine.Close()
}
return os.RemoveAll(provider.dir)
return nil
}

View File

@@ -1405,7 +1405,7 @@ func Test_querier_Traces_runWindowBasedListQueryDesc(t *testing.T) {
reader := clickhouseReader.NewReader(
nil,
telemetryStore,
prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}),
prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore),
"",
time.Duration(time.Second),
nil,
@@ -1630,7 +1630,7 @@ func Test_querier_Traces_runWindowBasedListQueryAsc(t *testing.T) {
reader := clickhouseReader.NewReader(
nil,
telemetryStore,
prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}),
prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore),
"",
time.Duration(time.Second),
nil,
@@ -1930,7 +1930,7 @@ func Test_querier_Logs_runWindowBasedListQueryDesc(t *testing.T) {
reader := clickhouseReader.NewReader(
nil,
telemetryStore,
prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}),
prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore),
"",
time.Duration(time.Second),
nil,
@@ -2157,7 +2157,7 @@ func Test_querier_Logs_runWindowBasedListQueryAsc(t *testing.T) {
reader := clickhouseReader.NewReader(
nil,
telemetryStore,
prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}),
prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore),
"",
time.Duration(time.Second),
nil,

View File

@@ -1457,7 +1457,7 @@ func Test_querier_Traces_runWindowBasedListQueryDesc(t *testing.T) {
reader := clickhouseReader.NewReader(
nil,
telemetryStore,
prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}),
prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore),
"",
time.Duration(time.Second),
nil,
@@ -1682,7 +1682,7 @@ func Test_querier_Traces_runWindowBasedListQueryAsc(t *testing.T) {
reader := clickhouseReader.NewReader(
nil,
telemetryStore,
prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}),
prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore),
"",
time.Duration(time.Second),
nil,
@@ -1981,7 +1981,7 @@ func Test_querier_Logs_runWindowBasedListQueryDesc(t *testing.T) {
reader := clickhouseReader.NewReader(
nil,
telemetryStore,
prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}),
prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore),
"",
time.Duration(time.Second),
nil,
@@ -2208,7 +2208,7 @@ func Test_querier_Logs_runWindowBasedListQueryAsc(t *testing.T) {
reader := clickhouseReader.NewReader(
nil,
telemetryStore,
prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}),
prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore),
"",
time.Duration(time.Second),
nil,

View File

@@ -119,15 +119,10 @@ func (r *PromRule) getPqlQuery() (string, error) {
return "", fmt.Errorf("invalid promql rule query")
}
func (r *PromRule) Eval(ctx context.Context, ts time.Time) (interface{}, error) {
prevState := r.State()
func (r *PromRule) buildAndRunQuery(ctx context.Context, ts time.Time) (ruletypes.Vector, error) {
start, end := r.Timestamps(ts)
interval := 60 * time.Second // TODO(srikanthccv): this should be configurable
valueFormatter := formatter.FromUnit(r.Unit())
q, err := r.getPqlQuery()
if err != nil {
return nil, err
@@ -140,12 +135,35 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time) (interface{}, error)
return nil, err
}
var resultVector ruletypes.Vector
for _, series := range res {
resultSeries, err := r.Threshold.Eval(toCommonSeries(series), r.Unit(), ruletypes.EvalData{
ActiveAlerts: r.ActiveAlertsLabelFP(),
})
if err != nil {
return nil, err
}
resultVector = append(resultVector, resultSeries...)
}
return resultVector, nil
}
func (r *PromRule) Eval(ctx context.Context, ts time.Time) (interface{}, error) {
prevState := r.State()
valueFormatter := formatter.FromUnit(r.Unit())
// prepare query, run query get data and filter the data based on the threshold
results, err := r.buildAndRunQuery(ctx, ts)
if err != nil {
return nil, err
}
r.mtx.Lock()
defer r.mtx.Unlock()
resultFPs := map[uint64]struct{}{}
var alerts = make(map[uint64]*ruletypes.Alert, len(res))
var alerts = make(map[uint64]*ruletypes.Alert, len(results))
ruleReceivers := r.Threshold.GetRuleReceivers()
ruleReceiverMap := make(map[string][]string)
@@ -153,90 +171,76 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time) (interface{}, error)
ruleReceiverMap[value.Name] = value.Channels
}
for _, series := range res {
for _, result := range results {
l := make(map[string]string, len(result.Metric))
for _, lbl := range result.Metric {
l[lbl.Name] = lbl.Value
}
r.logger.DebugContext(ctx, "alerting for series", "rule_name", r.Name(), "series", result)
if len(series.Floats) == 0 {
continue
threshold := valueFormatter.Format(result.Target, result.TargetUnit)
tmplData := ruletypes.AlertTemplateData(l, valueFormatter.Format(result.V, r.Unit()), threshold)
// Inject some convenience variables that are easier to remember for users
// who are not used to Go's templating system.
defs := "{{$labels := .Labels}}{{$value := .Value}}{{$threshold := .Threshold}}"
expand := func(text string) string {
tmpl := ruletypes.NewTemplateExpander(
ctx,
defs+text,
"__alert_"+r.Name(),
tmplData,
times.Time(timestamp.FromTime(ts)),
nil,
)
result, err := tmpl.Expand()
if err != nil {
result = fmt.Sprintf("<error expanding template: %s>", err)
r.logger.WarnContext(ctx, "Expanding alert template failed", "rule_name", r.Name(), "error", err, "data", tmplData)
}
return result
}
results, err := r.Threshold.Eval(toCommonSeries(series), r.Unit(), ruletypes.EvalData{
ActiveAlerts: r.ActiveAlertsLabelFP(),
})
if err != nil {
lb := qslabels.NewBuilder(result.Metric).Del(qslabels.MetricNameLabel)
resultLabels := qslabels.NewBuilder(result.Metric).Del(qslabels.MetricNameLabel).Labels()
for name, value := range r.labels.Map() {
lb.Set(name, expand(value))
}
lb.Set(qslabels.AlertNameLabel, r.Name())
lb.Set(qslabels.AlertRuleIdLabel, r.ID())
lb.Set(qslabels.RuleSourceLabel, r.GeneratorURL())
annotations := make(qslabels.Labels, 0, len(r.annotations.Map()))
for name, value := range r.annotations.Map() {
annotations = append(annotations, qslabels.Label{Name: name, Value: expand(value)})
}
lbs := lb.Labels()
h := lbs.Hash()
resultFPs[h] = struct{}{}
if _, ok := alerts[h]; ok {
err = fmt.Errorf("vector contains metrics with the same labelset after applying alert labels")
// We have already acquired the lock above hence using SetHealth and
// SetLastError will deadlock.
r.health = ruletypes.HealthBad
r.lastError = err
return nil, err
}
for _, result := range results {
l := make(map[string]string, len(series.Metric))
for _, lbl := range series.Metric {
l[lbl.Name] = lbl.Value
}
r.logger.DebugContext(ctx, "alerting for series", "rule_name", r.Name(), "series", series)
threshold := valueFormatter.Format(result.Target, result.TargetUnit)
tmplData := ruletypes.AlertTemplateData(l, valueFormatter.Format(result.V, r.Unit()), threshold)
// Inject some convenience variables that are easier to remember for users
// who are not used to Go's templating system.
defs := "{{$labels := .Labels}}{{$value := .Value}}{{$threshold := .Threshold}}"
expand := func(text string) string {
tmpl := ruletypes.NewTemplateExpander(
ctx,
defs+text,
"__alert_"+r.Name(),
tmplData,
times.Time(timestamp.FromTime(ts)),
nil,
)
result, err := tmpl.Expand()
if err != nil {
result = fmt.Sprintf("<error expanding template: %s>", err)
r.logger.WarnContext(ctx, "Expanding alert template failed", "rule_name", r.Name(), "error", err, "data", tmplData)
}
return result
}
lb := qslabels.NewBuilder(result.Metric).Del(qslabels.MetricNameLabel)
resultLabels := qslabels.NewBuilder(result.Metric).Del(qslabels.MetricNameLabel).Labels()
for name, value := range r.labels.Map() {
lb.Set(name, expand(value))
}
lb.Set(qslabels.AlertNameLabel, r.Name())
lb.Set(qslabels.AlertRuleIdLabel, r.ID())
lb.Set(qslabels.RuleSourceLabel, r.GeneratorURL())
annotations := make(qslabels.Labels, 0, len(r.annotations.Map()))
for name, value := range r.annotations.Map() {
annotations = append(annotations, qslabels.Label{Name: name, Value: expand(value)})
}
lbs := lb.Labels()
h := lbs.Hash()
resultFPs[h] = struct{}{}
if _, ok := alerts[h]; ok {
err = fmt.Errorf("vector contains metrics with the same labelset after applying alert labels")
// We have already acquired the lock above hence using SetHealth and
// SetLastError will deadlock.
r.health = ruletypes.HealthBad
r.lastError = err
return nil, err
}
alerts[h] = &ruletypes.Alert{
Labels: lbs,
QueryResultLables: resultLabels,
Annotations: annotations,
ActiveAt: ts,
State: model.StatePending,
Value: result.V,
GeneratorURL: r.GeneratorURL(),
Receivers: ruleReceiverMap[lbs.Map()[ruletypes.LabelThresholdName]],
IsRecovering: result.IsRecovering,
}
alerts[h] = &ruletypes.Alert{
Labels: lbs,
QueryResultLables: resultLabels,
Annotations: annotations,
ActiveAt: ts,
State: model.StatePending,
Value: result.V,
GeneratorURL: r.GeneratorURL(),
Receivers: ruleReceiverMap[lbs.Map()[ruletypes.LabelThresholdName]],
IsRecovering: result.IsRecovering,
}
}

View File

@@ -1,14 +1,23 @@
package rules
import (
"context"
"strings"
"testing"
"time"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/prometheus"
"github.com/SigNoz/signoz/pkg/prometheus/prometheustest"
"github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
qslabels "github.com/SigNoz/signoz/pkg/query-service/utils/labels"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/SigNoz/signoz/pkg/valuer"
pql "github.com/prometheus/prometheus/promql"
cmock "github.com/srikanthccv/ClickHouse-go-mock"
"github.com/stretchr/testify/assert"
)
@@ -723,6 +732,612 @@ func TestPromRuleEval(t *testing.T) {
assert.Empty(t, resultVectors, "Expected no alert but got result vectors for case %d", idx)
}
}
}
}
// TestPromRuleUnitCombinations evaluates a PromQL rule against mocked
// ClickHouse rows for several (y-axis unit, target unit) pairs. For each pair
// it checks the alert count returned by Eval and, when alerts are expected,
// that every active alert carries one of the accepted summary annotations.
//
// Each case runs as its own subtest so a failure is attributed to the unit
// pair that caused it, and the Prometheus provider is released through
// t.Cleanup instead of being closed by hand on every exit path.
func TestPromRuleUnitCombinations(t *testing.T) {
	// sample is one mocked row of the samples table for the test series.
	type sample struct {
		timestamp time.Time
		value     float64
	}

	// fixed base time for deterministic tests
	baseTime := time.Unix(1700000000, 0)
	evalTime := baseTime.Add(5 * time.Minute)

	postableRule := ruletypes.PostableRule{
		AlertName: "Units test",
		AlertType: ruletypes.AlertTypeMetric,
		RuleType:  ruletypes.RuleTypeProm,
		Evaluation: &ruletypes.EvaluationEnvelope{Kind: ruletypes.RollingEvaluation, Spec: ruletypes.RollingWindow{
			EvalWindow: ruletypes.Duration(5 * time.Minute),
			Frequency:  ruletypes.Duration(1 * time.Minute),
		}},
		RuleCondition: &ruletypes.RuleCondition{
			CompositeQuery: &v3.CompositeQuery{
				QueryType: v3.QueryTypePromQL,
				PromQueries: map[string]*v3.PromQuery{
					"A": {
						Query: "test_metric",
					},
				},
			},
		},
	}

	// time_series_v4 cols of interest
	fingerprintCols := []cmock.ColumnType{
		{Name: "fingerprint", Type: "UInt64"},
		{Name: "any(labels)", Type: "String"},
	}

	// samples_v4 columns
	samplesCols := []cmock.ColumnType{
		{Name: "metric_name", Type: "String"},
		{Name: "fingerprint", Type: "UInt64"},
		{Name: "unix_milli", Type: "Int64"},
		{Name: "value", Type: "Float64"},
		{Name: "flags", Type: "UInt32"},
	}

	// see Timestamps on base_rule
	evalWindowMs := int64(5 * 60 * 1000) // 5 minutes in ms
	evalTimeMs := evalTime.UnixMilli()
	queryStart := ((evalTimeMs-2*evalWindowMs)/60000)*60000 + 1 // truncate to minute + 1ms
	queryEnd := (evalTimeMs / 60000) * 60000                    // truncate to minute

	cases := []struct {
		targetUnit   string
		yAxisUnit    string
		values       []sample
		expectAlerts int
		compareOp    string
		matchType    string
		target       float64
		summaryAny   []string
	}{
		{
			targetUnit: "s",
			yAxisUnit:  "ns",
			values: []sample{
				{baseTime, 572588400},                              // 0.57 seconds
				{baseTime.Add(1 * time.Minute), 572386400},         // 0.57 seconds
				{baseTime.Add(2 * time.Minute), 300947400},         // 0.3 seconds
				{baseTime.Add(3 * time.Minute), 299316000},         // 0.3 seconds
				{baseTime.Add(4 * time.Minute), 66640400.00000001}, // 0.06 seconds
			},
			expectAlerts: 0,
			compareOp:    "1", // Above
			matchType:    "1", // Once
			target:       1,   // 1 second
		},
		{
			targetUnit: "ms",
			yAxisUnit:  "ns",
			values: []sample{
				{baseTime, 572588400},                              // 572.58 ms
				{baseTime.Add(1 * time.Minute), 572386400},         // 572.38 ms
				{baseTime.Add(2 * time.Minute), 300947400},         // 300.94 ms
				{baseTime.Add(3 * time.Minute), 299316000},         // 299.31 ms
				{baseTime.Add(4 * time.Minute), 66640400.00000001}, // 66.64 ms
			},
			expectAlerts: 1,
			compareOp:    "1", // Above
			matchType:    "1", // Once
			target:       200, // 200 ms
			summaryAny: []string{
				"observed metric value is 299 ms",
				"the observed metric value is 573 ms",
				"the observed metric value is 572 ms",
				"the observed metric value is 301 ms",
			},
		},
		{
			targetUnit: "decgbytes",
			yAxisUnit:  "bytes",
			values: []sample{
				{baseTime, 2863284053},                             // 2.86 GB
				{baseTime.Add(1 * time.Minute), 2863388842},        // 2.86 GB
				{baseTime.Add(2 * time.Minute), 300947400},         // 0.3 GB
				{baseTime.Add(3 * time.Minute), 299316000},         // 0.3 GB
				{baseTime.Add(4 * time.Minute), 66640400.00000001}, // 66.64 MB
			},
			expectAlerts: 0,
			compareOp:    "1", // Above
			matchType:    "1", // Once
			target:       200, // 200 GB
		},
		{
			targetUnit: "decgbytes",
			yAxisUnit:  "By",
			values: []sample{
				{baseTime, 2863284053},                             // 2.86 GB
				{baseTime.Add(1 * time.Minute), 2863388842},        // 2.86 GB
				{baseTime.Add(2 * time.Minute), 300947400},         // 0.3 GB
				{baseTime.Add(3 * time.Minute), 299316000},         // 0.3 GB
				{baseTime.Add(4 * time.Minute), 66640400.00000001}, // 66.64 MB
			},
			expectAlerts: 0,
			compareOp:    "1", // Above
			matchType:    "1", // Once
			target:       200, // 200 GB
		},
		{
			targetUnit: "h",
			yAxisUnit:  "min",
			values: []sample{
				{baseTime, 55},                      // 55 minutes
				{baseTime.Add(1 * time.Minute), 57}, // 57 minutes
				{baseTime.Add(2 * time.Minute), 30}, // 30 minutes
				{baseTime.Add(3 * time.Minute), 29}, // 29 minutes
			},
			expectAlerts: 0,
			compareOp:    "1", // Above
			matchType:    "1", // Once
			target:       1,   // 1 hour
		},
	}

	logger := instrumentationtest.New().Logger()

	for idx, c := range cases {
		// Subtests run synchronously, so sharing postableRule and taking the
		// address of c.target inside the closure is safe.
		t.Run(c.yAxisUnit+"_vs_"+c.targetUnit, func(t *testing.T) {
			telemetryStore := telemetrystoretest.New(telemetrystore.Config{}, &queryMatcherAny{})

			// single fingerprint with labels JSON
			fingerprint := uint64(12345)
			labelsJSON := `{"__name__":"test_metric"}`
			fingerprintRows := cmock.NewRows(fingerprintCols, [][]any{
				{fingerprint, labelsJSON},
			})

			// create samples data from test case values
			samplesData := make([][]any, 0, len(c.values))
			for _, v := range c.values {
				samplesData = append(samplesData, []any{
					"test_metric",
					fingerprint,
					v.timestamp.UnixMilli(),
					v.value,
					uint32(0), // flags - 0 means normal value, 1 means stale, we are not doing staleness tests
				})
			}
			samplesRows := cmock.NewRows(samplesCols, samplesData)

			// args: $1=metric_name, $2=label_name, $3=label_value
			telemetryStore.Mock().
				ExpectQuery("SELECT fingerprint, any").
				WithArgs("test_metric", "__name__", "test_metric").
				WillReturnRows(fingerprintRows)

			// args: $1=metric_name (outer), $2=metric_name (subquery), $3=label_name, $4=label_value, $5=start, $6=end
			telemetryStore.Mock().
				ExpectQuery("SELECT metric_name, fingerprint, unix_milli").
				WithArgs(
					"test_metric",
					"test_metric",
					"__name__",
					"test_metric",
					queryStart,
					queryEnd,
				).
				WillReturnRows(samplesRows)

			promProvider := prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore)
			// Release the provider on every exit path, including early returns.
			t.Cleanup(func() {
				assert.NoError(t, promProvider.Close())
			})

			postableRule.RuleCondition.CompareOp = ruletypes.CompareOp(c.compareOp)
			postableRule.RuleCondition.MatchType = ruletypes.MatchType(c.matchType)
			postableRule.RuleCondition.Target = &c.target
			postableRule.RuleCondition.CompositeQuery.Unit = c.yAxisUnit
			postableRule.RuleCondition.TargetUnit = c.targetUnit
			postableRule.RuleCondition.Thresholds = &ruletypes.RuleThresholdData{
				Kind: ruletypes.BasicThresholdKind,
				Spec: ruletypes.BasicRuleThresholds{
					{
						Name:        postableRule.AlertName,
						TargetValue: &c.target,
						TargetUnit:  c.targetUnit,
						MatchType:   ruletypes.MatchType(c.matchType),
						CompareOp:   ruletypes.CompareOp(c.compareOp),
					},
				},
			}
			postableRule.Annotations = map[string]string{
				"description": "This alert is fired when the defined metric (current value: {{$value}}) crosses the threshold ({{$threshold}})",
				"summary":     "The rule threshold is set to {{$threshold}}, and the observed metric value is {{$value}}",
			}

			options := clickhouseReader.NewOptions("", "", "archiveNamespace")
			reader := clickhouseReader.NewReader(nil, telemetryStore, promProvider, "", time.Duration(time.Second), nil, nil, options)

			rule, err := NewPromRule("69", valuer.GenerateUUID(), &postableRule, logger, reader, promProvider)
			if !assert.NoError(t, err, "case %d", idx) {
				return
			}

			retVal, err := rule.Eval(context.Background(), evalTime)
			if !assert.NoError(t, err, "case %d", idx) {
				return
			}

			// Checked assertion: an unexpected return type fails the subtest
			// instead of panicking the whole test binary.
			alertCount, ok := retVal.(int)
			if !assert.True(t, ok, "case %d: Eval returned %T, expected int", idx, retVal) {
				return
			}
			assert.Equal(t, c.expectAlerts, alertCount, "case %d", idx)

			if c.expectAlerts == 0 {
				return
			}

			// Count active alerts whose summary matches any accepted rendering.
			foundCount := 0
			for _, item := range rule.Active {
				for _, summary := range c.summaryAny {
					if strings.Contains(item.Annotations.Get("summary"), summary) {
						foundCount++
						break
					}
				}
			}
			assert.Equal(t, c.expectAlerts, foundCount, "case %d", idx)
		})
	}
}
// TODO(abhishekhugetech): enable this test by renaming it to TestPromRuleNoData.
//
// The leading underscore keeps `go test` from discovering the function; per
// its name it is parked until the fix for issue 9146 is merged.
//
// The test builds a PromQL rule with AlertOnAbsent set, mocks a fingerprint
// lookup that returns no rows, and expects Eval to report exactly one alert
// whose alert-name label contains "[No data]".
func _Enable_this_after_9146_issue_fix_is_merged_TestPromRuleNoData(t *testing.T) {
// fixed base time so the evaluation timestamp is deterministic
baseTime := time.Unix(1700000000, 0)
evalTime := baseTime.Add(5 * time.Minute)
postableRule := ruletypes.PostableRule{
AlertName: "No data test",
AlertType: ruletypes.AlertTypeMetric,
RuleType: ruletypes.RuleTypeProm,
Evaluation: &ruletypes.EvaluationEnvelope{Kind: ruletypes.RollingEvaluation, Spec: ruletypes.RollingWindow{
EvalWindow: ruletypes.Duration(5 * time.Minute),
Frequency: ruletypes.Duration(1 * time.Minute),
}},
RuleCondition: &ruletypes.RuleCondition{
CompositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypePromQL,
PromQueries: map[string]*v3.PromQuery{
"A": {
Query: "test_metric",
},
},
},
AlertOnAbsent: true,
},
}
// time_series_v4 cols of interest
fingerprintCols := []cmock.ColumnType{
{Name: "fingerprint", Type: "UInt64"},
{Name: "any(labels)", Type: "String"},
}
// NOTE(review): the `values` field is declared per case but never read in the
// loop below; only `expectNoData` drives the assertions.
cases := []struct {
values []struct {
timestamp time.Time
value float64
}
expectNoData bool
}{
{
values: []struct {
timestamp time.Time
value float64
}{},
expectNoData: true,
},
}
logger := instrumentationtest.New().Logger()
for idx, c := range cases {
telemetryStore := telemetrystoretest.New(telemetrystore.Config{}, &queryMatcherAny{})
// no data
fingerprintData := [][]interface{}{}
fingerprintRows := cmock.NewRows(fingerprintCols, fingerprintData)
// no rows == no data
// Only the fingerprint lookup is mocked; no samples query is expected here.
telemetryStore.Mock().
ExpectQuery("SELECT fingerprint, any").
WithArgs("test_metric", "__name__", "test_metric").
WillReturnRows(fingerprintRows)
promProvider := prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore)
var target float64 = 0
postableRule.RuleCondition.Thresholds = &ruletypes.RuleThresholdData{
Kind: ruletypes.BasicThresholdKind,
Spec: ruletypes.BasicRuleThresholds{
{
Name: postableRule.AlertName,
TargetValue: &target,
MatchType: ruletypes.AtleastOnce,
CompareOp: ruletypes.ValueIsEq,
},
},
}
postableRule.Annotations = map[string]string{
"description": "This alert is fired when the defined metric (current value: {{$value}}) crosses the threshold ({{$threshold}})",
"summary": "The rule threshold is set to {{$threshold}}, and the observed metric value is {{$value}}",
}
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
reader := clickhouseReader.NewReader(nil, telemetryStore, promProvider, "", time.Duration(time.Second), nil, nil, options)
rule, err := NewPromRule("69", valuer.GenerateUUID(), &postableRule, logger, reader, promProvider)
// On failure: record the error, release the provider, move to the next case.
if err != nil {
assert.NoError(t, err)
promProvider.Close()
continue
}
retVal, err := rule.Eval(context.Background(), evalTime)
if err != nil {
assert.NoError(t, err)
promProvider.Close()
continue
}
// Exactly one alert is expected from the evaluation.
assert.Equal(t, 1, retVal.(int), "case %d", idx)
// The "[No data]" marker is looked up in the alert-name label of each active alert.
for _, item := range rule.Active {
if c.expectNoData {
assert.True(t, strings.Contains(item.Labels.Get(qslabels.AlertNameLabel), "[No data]"), "case %d", idx)
} else {
assert.False(t, strings.Contains(item.Labels.Get(qslabels.AlertNameLabel), "[No data]"), "case %d", idx)
}
}
promProvider.Close()
}
}
// TestMultipleThresholdPromRule evaluates a PromQL rule configured with two
// thresholds ("first_threshold" and "second_threshold") against mocked
// ClickHouse rows. For each case it checks the alert count returned by Eval
// and that every active alert carries one of the accepted summary annotations.
//
// Each case runs as its own subtest so a failure is attributed to the unit
// pair that caused it, and the Prometheus provider is released through
// t.Cleanup instead of being closed by hand on every exit path.
func TestMultipleThresholdPromRule(t *testing.T) {
	// sample is one mocked row of the samples table for the test series.
	type sample struct {
		timestamp time.Time
		value     float64
	}

	// fixed base time for deterministic tests
	baseTime := time.Unix(1700000000, 0)
	evalTime := baseTime.Add(5 * time.Minute)

	postableRule := ruletypes.PostableRule{
		AlertName: "Multiple threshold test",
		AlertType: ruletypes.AlertTypeMetric,
		RuleType:  ruletypes.RuleTypeProm,
		Evaluation: &ruletypes.EvaluationEnvelope{Kind: ruletypes.RollingEvaluation, Spec: ruletypes.RollingWindow{
			EvalWindow: ruletypes.Duration(5 * time.Minute),
			Frequency:  ruletypes.Duration(1 * time.Minute),
		}},
		RuleCondition: &ruletypes.RuleCondition{
			CompositeQuery: &v3.CompositeQuery{
				QueryType: v3.QueryTypePromQL,
				PromQueries: map[string]*v3.PromQuery{
					"A": {
						Query: "test_metric",
					},
				},
			},
		},
	}

	fingerprintCols := []cmock.ColumnType{
		{Name: "fingerprint", Type: "UInt64"},
		{Name: "any(labels)", Type: "String"},
	}

	samplesCols := []cmock.ColumnType{
		{Name: "metric_name", Type: "String"},
		{Name: "fingerprint", Type: "UInt64"},
		{Name: "unix_milli", Type: "Int64"},
		{Name: "value", Type: "Float64"},
		{Name: "flags", Type: "UInt32"},
	}

	// see .Timestamps of base rule
	evalWindowMs := int64(5 * 60 * 1000)
	evalTimeMs := evalTime.UnixMilli()
	queryStart := ((evalTimeMs-2*evalWindowMs)/60000)*60000 + 1
	queryEnd := (evalTimeMs / 60000) * 60000

	cases := []struct {
		targetUnit   string
		yAxisUnit    string
		values       []sample
		expectAlerts int
		compareOp    string
		matchType    string
		target       float64
		secondTarget float64
		summaryAny   []string
	}{
		{
			targetUnit: "s",
			yAxisUnit:  "ns",
			values: []sample{
				{baseTime, 572588400},                              // 0.57 seconds
				{baseTime.Add(1 * time.Minute), 572386400},         // 0.57 seconds
				{baseTime.Add(2 * time.Minute), 300947400},         // 0.3 seconds
				{baseTime.Add(3 * time.Minute), 299316000},         // 0.3 seconds
				{baseTime.Add(4 * time.Minute), 66640400.00000001}, // 0.06 seconds
			},
			expectAlerts: 1,
			compareOp:    "1", // Above
			matchType:    "1", // Once
			target:       1,   // 1 second
			secondTarget: .5,
			summaryAny: []string{
				"observed metric value is 573 ms",
				"observed metric value is 572 ms",
			},
		},
		{
			targetUnit: "ms",
			yAxisUnit:  "ns",
			values: []sample{
				{baseTime, 572588400},                              // 572.58 ms
				{baseTime.Add(1 * time.Minute), 572386400},         // 572.38 ms
				{baseTime.Add(2 * time.Minute), 300947400},         // 300.94 ms
				{baseTime.Add(3 * time.Minute), 299316000},         // 299.31 ms
				{baseTime.Add(4 * time.Minute), 66640400.00000001}, // 66.64 ms
			},
			expectAlerts: 2,   // One alert per threshold that fires
			compareOp:    "1", // Above
			matchType:    "1", // Once
			target:       200, // 200 ms
			secondTarget: 500,
			summaryAny: []string{
				"observed metric value is 299 ms",
				"the observed metric value is 573 ms",
				"the observed metric value is 572 ms",
				"the observed metric value is 301 ms",
			},
		},
		{
			targetUnit: "decgbytes",
			yAxisUnit:  "bytes",
			values: []sample{
				{baseTime, 2863284053},                             // 2.86 GB
				{baseTime.Add(1 * time.Minute), 2863388842},        // 2.86 GB
				{baseTime.Add(2 * time.Minute), 300947400},         // 0.3 GB
				{baseTime.Add(3 * time.Minute), 299316000},         // 0.3 GB
				{baseTime.Add(4 * time.Minute), 66640400.00000001}, // 66.64 MB
			},
			expectAlerts: 1,
			compareOp:    "1", // Above
			matchType:    "1", // Once
			target:       200, // 200 GB
			secondTarget: 2,   // 2GB
			summaryAny: []string{
				"observed metric value is 2.7 GiB",
				"the observed metric value is 0.3 GB",
			},
		},
	}

	logger := instrumentationtest.New().Logger()

	for idx, c := range cases {
		// Subtests run synchronously, so sharing postableRule and taking the
		// address of c.target / c.secondTarget inside the closure is safe.
		t.Run(c.yAxisUnit+"_vs_"+c.targetUnit, func(t *testing.T) {
			telemetryStore := telemetrystoretest.New(telemetrystore.Config{}, &queryMatcherAny{})

			// single fingerprint with labels JSON
			fingerprint := uint64(12345)
			labelsJSON := `{"__name__":"test_metric"}`
			fingerprintRows := cmock.NewRows(fingerprintCols, [][]any{
				{fingerprint, labelsJSON},
			})

			// create samples data from test case values
			samplesData := make([][]any, 0, len(c.values))
			for _, v := range c.values {
				samplesData = append(samplesData, []any{
					"test_metric",
					fingerprint,
					v.timestamp.UnixMilli(),
					v.value,
					uint32(0),
				})
			}
			samplesRows := cmock.NewRows(samplesCols, samplesData)

			telemetryStore.Mock().
				ExpectQuery("SELECT fingerprint, any").
				WithArgs("test_metric", "__name__", "test_metric").
				WillReturnRows(fingerprintRows)

			telemetryStore.Mock().
				ExpectQuery("SELECT metric_name, fingerprint, unix_milli").
				WithArgs(
					"test_metric",
					"test_metric",
					"__name__",
					"test_metric",
					queryStart,
					queryEnd,
				).
				WillReturnRows(samplesRows)

			promProvider := prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore)
			// Release the provider on every exit path, including early returns.
			t.Cleanup(func() {
				assert.NoError(t, promProvider.Close())
			})

			postableRule.RuleCondition.CompareOp = ruletypes.CompareOp(c.compareOp)
			postableRule.RuleCondition.MatchType = ruletypes.MatchType(c.matchType)
			postableRule.RuleCondition.Target = &c.target
			postableRule.RuleCondition.CompositeQuery.Unit = c.yAxisUnit
			postableRule.RuleCondition.TargetUnit = c.targetUnit
			postableRule.RuleCondition.Thresholds = &ruletypes.RuleThresholdData{
				Kind: ruletypes.BasicThresholdKind,
				Spec: ruletypes.BasicRuleThresholds{
					{
						Name:        "first_threshold",
						TargetValue: &c.target,
						TargetUnit:  c.targetUnit,
						MatchType:   ruletypes.MatchType(c.matchType),
						CompareOp:   ruletypes.CompareOp(c.compareOp),
					},
					{
						Name:        "second_threshold",
						TargetValue: &c.secondTarget,
						TargetUnit:  c.targetUnit,
						MatchType:   ruletypes.MatchType(c.matchType),
						CompareOp:   ruletypes.CompareOp(c.compareOp),
					},
				},
			}
			postableRule.Annotations = map[string]string{
				"description": "This alert is fired when the defined metric (current value: {{$value}}) crosses the threshold ({{$threshold}})",
				"summary":     "The rule threshold is set to {{$threshold}}, and the observed metric value is {{$value}}",
			}

			options := clickhouseReader.NewOptions("", "", "archiveNamespace")
			reader := clickhouseReader.NewReader(nil, telemetryStore, promProvider, "", time.Duration(time.Second), nil, nil, options)

			rule, err := NewPromRule("69", valuer.GenerateUUID(), &postableRule, logger, reader, promProvider)
			if !assert.NoError(t, err, "case %d", idx) {
				return
			}

			retVal, err := rule.Eval(context.Background(), evalTime)
			if !assert.NoError(t, err, "case %d", idx) {
				return
			}

			// Checked assertion: an unexpected return type fails the subtest
			// instead of panicking the whole test binary.
			alertCount, ok := retVal.(int)
			if !assert.True(t, ok, "case %d: Eval returned %T, expected int", idx, retVal) {
				return
			}
			assert.Equal(t, c.expectAlerts, alertCount, "case %d", idx)

			if c.expectAlerts == 0 {
				return
			}

			// Count active alerts whose summary matches any accepted rendering.
			foundCount := 0
			for _, item := range rule.Active {
				for _, summary := range c.summaryAny {
					if strings.Contains(item.Annotations.Get("summary"), summary) {
						foundCount++
						break
					}
				}
			}
			assert.Equal(t, c.expectAlerts, foundCount, "case %d", idx)
		})
	}
}

View File

@@ -488,7 +488,7 @@ func TestThresholdRuleEvalDelay(t *testing.T) {
AlertName: "Test Eval Delay",
AlertType: ruletypes.AlertTypeMetric,
RuleType: ruletypes.RuleTypeThreshold,
Evaluation: &ruletypes.EvaluationEnvelope{ruletypes.RollingEvaluation, ruletypes.RollingWindow{
Evaluation: &ruletypes.EvaluationEnvelope{Kind: ruletypes.RollingEvaluation, Spec: ruletypes.RollingWindow{
EvalWindow: ruletypes.Duration(5 * time.Minute),
Frequency: ruletypes.Duration(1 * time.Minute),
}},
@@ -551,7 +551,7 @@ func TestThresholdRuleClickHouseTmpl(t *testing.T) {
AlertName: "Tricky Condition Tests",
AlertType: ruletypes.AlertTypeMetric,
RuleType: ruletypes.RuleTypeThreshold,
Evaluation: &ruletypes.EvaluationEnvelope{ruletypes.RollingEvaluation, ruletypes.RollingWindow{
Evaluation: &ruletypes.EvaluationEnvelope{Kind: ruletypes.RollingEvaluation, Spec: ruletypes.RollingWindow{
EvalWindow: ruletypes.Duration(5 * time.Minute),
Frequency: ruletypes.Duration(1 * time.Minute),
}},
@@ -620,7 +620,7 @@ func TestThresholdRuleUnitCombinations(t *testing.T) {
AlertName: "Units test",
AlertType: ruletypes.AlertTypeMetric,
RuleType: ruletypes.RuleTypeThreshold,
Evaluation: &ruletypes.EvaluationEnvelope{ruletypes.RollingEvaluation, ruletypes.RollingWindow{
Evaluation: &ruletypes.EvaluationEnvelope{Kind: ruletypes.RollingEvaluation, Spec: ruletypes.RollingWindow{
EvalWindow: ruletypes.Duration(5 * time.Minute),
Frequency: ruletypes.Duration(1 * time.Minute),
}},
@@ -784,7 +784,7 @@ func TestThresholdRuleUnitCombinations(t *testing.T) {
},
)
require.NoError(t, err)
reader := clickhouseReader.NewReader(nil, telemetryStore, prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), "", time.Duration(time.Second), nil, readerCache, options)
reader := clickhouseReader.NewReader(nil, telemetryStore, prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore), "", time.Duration(time.Second), nil, readerCache, options)
rule, err := NewThresholdRule("69", valuer.GenerateUUID(), &postableRule, reader, nil, logger)
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
"signoz_calls_total": {
@@ -821,7 +821,7 @@ func TestThresholdRuleNoData(t *testing.T) {
AlertName: "No data test",
AlertType: ruletypes.AlertTypeMetric,
RuleType: ruletypes.RuleTypeThreshold,
Evaluation: &ruletypes.EvaluationEnvelope{ruletypes.RollingEvaluation, ruletypes.RollingWindow{
Evaluation: &ruletypes.EvaluationEnvelope{Kind: ruletypes.RollingEvaluation, Spec: ruletypes.RollingWindow{
EvalWindow: ruletypes.Duration(5 * time.Minute),
Frequency: ruletypes.Duration(1 * time.Minute),
}},
@@ -899,7 +899,7 @@ func TestThresholdRuleNoData(t *testing.T) {
)
assert.NoError(t, err)
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
reader := clickhouseReader.NewReader(nil, telemetryStore, prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), "", time.Duration(time.Second), nil, readerCache, options)
reader := clickhouseReader.NewReader(nil, telemetryStore, prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore), "", time.Duration(time.Second), nil, readerCache, options)
rule, err := NewThresholdRule("69", valuer.GenerateUUID(), &postableRule, reader, nil, logger)
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
@@ -932,7 +932,7 @@ func TestThresholdRuleTracesLink(t *testing.T) {
AlertName: "Traces link test",
AlertType: ruletypes.AlertTypeTraces,
RuleType: ruletypes.RuleTypeThreshold,
Evaluation: &ruletypes.EvaluationEnvelope{ruletypes.RollingEvaluation, ruletypes.RollingWindow{
Evaluation: &ruletypes.EvaluationEnvelope{Kind: ruletypes.RollingEvaluation, Spec: ruletypes.RollingWindow{
EvalWindow: ruletypes.Duration(5 * time.Minute),
Frequency: ruletypes.Duration(1 * time.Minute),
}},
@@ -1019,7 +1019,7 @@ func TestThresholdRuleTracesLink(t *testing.T) {
}
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
reader := clickhouseReader.NewReader(nil, telemetryStore, prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), "", time.Duration(time.Second), nil, nil, options)
reader := clickhouseReader.NewReader(nil, telemetryStore, prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore), "", time.Duration(time.Second), nil, nil, options)
rule, err := NewThresholdRule("69", valuer.GenerateUUID(), &postableRule, reader, nil, logger)
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
@@ -1057,7 +1057,7 @@ func TestThresholdRuleLogsLink(t *testing.T) {
AlertName: "Logs link test",
AlertType: ruletypes.AlertTypeLogs,
RuleType: ruletypes.RuleTypeThreshold,
Evaluation: &ruletypes.EvaluationEnvelope{ruletypes.RollingEvaluation, ruletypes.RollingWindow{
Evaluation: &ruletypes.EvaluationEnvelope{Kind: ruletypes.RollingEvaluation, Spec: ruletypes.RollingWindow{
EvalWindow: ruletypes.Duration(5 * time.Minute),
Frequency: ruletypes.Duration(1 * time.Minute),
}},
@@ -1156,7 +1156,7 @@ func TestThresholdRuleLogsLink(t *testing.T) {
}
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
reader := clickhouseReader.NewReader(nil, telemetryStore, prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), "", time.Duration(time.Second), nil, nil, options)
reader := clickhouseReader.NewReader(nil, telemetryStore, prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore), "", time.Duration(time.Second), nil, nil, options)
rule, err := NewThresholdRule("69", valuer.GenerateUUID(), &postableRule, reader, nil, logger)
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
@@ -1195,7 +1195,7 @@ func TestThresholdRuleShiftBy(t *testing.T) {
AlertName: "Logs link test",
AlertType: ruletypes.AlertTypeLogs,
RuleType: ruletypes.RuleTypeThreshold,
Evaluation: &ruletypes.EvaluationEnvelope{ruletypes.RollingEvaluation, ruletypes.RollingWindow{
Evaluation: &ruletypes.EvaluationEnvelope{Kind: ruletypes.RollingEvaluation, Spec: ruletypes.RollingWindow{
EvalWindow: ruletypes.Duration(5 * time.Minute),
Frequency: ruletypes.Duration(1 * time.Minute),
}},
@@ -1269,7 +1269,7 @@ func TestMultipleThresholdRule(t *testing.T) {
AlertName: "Mulitple threshold test",
AlertType: ruletypes.AlertTypeMetric,
RuleType: ruletypes.RuleTypeThreshold,
Evaluation: &ruletypes.EvaluationEnvelope{ruletypes.RollingEvaluation, ruletypes.RollingWindow{
Evaluation: &ruletypes.EvaluationEnvelope{Kind: ruletypes.RollingEvaluation, Spec: ruletypes.RollingWindow{
EvalWindow: ruletypes.Duration(5 * time.Minute),
Frequency: ruletypes.Duration(1 * time.Minute),
}},
@@ -1423,7 +1423,7 @@ func TestMultipleThresholdRule(t *testing.T) {
},
)
require.NoError(t, err)
reader := clickhouseReader.NewReader(nil, telemetryStore, prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), "", time.Duration(time.Second), nil, readerCache, options)
reader := clickhouseReader.NewReader(nil, telemetryStore, prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore), "", time.Duration(time.Second), nil, readerCache, options)
rule, err := NewThresholdRule("69", valuer.GenerateUUID(), &postableRule, reader, nil, logger)
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
"signoz_calls_total": {