mirror of
https://github.com/SigNoz/signoz.git
synced 2025-12-27 09:22:12 +00:00
Compare commits
18 Commits
perf/log-v
...
feat/send_
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b3398041e6 | ||
|
|
e7f75c4896 | ||
|
|
c9b9ecd96d | ||
|
|
1e4a18e932 | ||
|
|
79e57dee13 | ||
|
|
9c63901385 | ||
|
|
b01797b7a9 | ||
|
|
4dc8b6cea8 | ||
|
|
d940344a81 | ||
|
|
568f51ae27 | ||
|
|
d5a7ff5c96 | ||
|
|
d899480a27 | ||
|
|
0d47ea0bd8 | ||
|
|
6bdeb54bd6 | ||
|
|
f865442d5b | ||
|
|
fa50bd7564 | ||
|
|
cad62c4be5 | ||
|
|
ff7bc3017f |
10
.mockery.yml
Normal file
10
.mockery.yml
Normal file
@@ -0,0 +1,10 @@
|
||||
# Link to template variables: https://pkg.go.dev/github.com/vektra/mockery/v3/config#TemplateData
|
||||
template: testify
|
||||
packages:
|
||||
github.com/SigNoz/signoz/pkg/alertmanager:
|
||||
config:
|
||||
all: true
|
||||
dir: '{{.InterfaceDir}}/mocks'
|
||||
filename: "mocks.go"
|
||||
structname: 'Mock{{.InterfaceName}}'
|
||||
pkgname: '{{.SrcPackageName}}mock'
|
||||
@@ -247,7 +247,8 @@ func (r *AnomalyRule) buildAndRunQuery(ctx context.Context, orgID valuer.UUID, t
|
||||
}
|
||||
}
|
||||
results, err := r.Threshold.Eval(*series, r.Unit(), ruletypes.EvalData{
|
||||
ActiveAlerts: r.ActiveAlertsLabelFP(),
|
||||
ActiveAlerts: r.ActiveAlertsLabelFP(),
|
||||
SendUnmatched: r.ShouldSendUnmatched(),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -299,7 +300,8 @@ func (r *AnomalyRule) buildAndRunQueryV5(ctx context.Context, orgID valuer.UUID,
|
||||
}
|
||||
}
|
||||
results, err := r.Threshold.Eval(*series, r.Unit(), ruletypes.EvalData{
|
||||
ActiveAlerts: r.ActiveAlertsLabelFP(),
|
||||
ActiveAlerts: r.ActiveAlertsLabelFP(),
|
||||
SendUnmatched: r.ShouldSendUnmatched(),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
589
ee/query-service/rules/manager_test.go
Normal file
589
ee/query-service/rules/manager_test.go
Normal file
@@ -0,0 +1,589 @@
|
||||
package rules
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"math"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
alertmanagermock "github.com/SigNoz/signoz/pkg/alertmanager/mocks"
|
||||
"github.com/SigNoz/signoz/pkg/cache"
|
||||
"github.com/SigNoz/signoz/pkg/cache/cachetest"
|
||||
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
|
||||
"github.com/SigNoz/signoz/pkg/prometheus"
|
||||
"github.com/SigNoz/signoz/pkg/prometheus/prometheustest"
|
||||
"github.com/SigNoz/signoz/pkg/querier"
|
||||
"github.com/SigNoz/signoz/pkg/querier/signozquerier"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader"
|
||||
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
|
||||
qsRules "github.com/SigNoz/signoz/pkg/query-service/rules"
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore/sqlstoretest"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
|
||||
"github.com/SigNoz/signoz/pkg/types/alertmanagertypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/metrictypes"
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap"
|
||||
|
||||
cmock "github.com/srikanthccv/ClickHouse-go-mock"
|
||||
)
|
||||
|
||||
// queryMatcherAny is a query matcher for the mocked stores used in these
// tests. It treats every pair of query strings as a match, so expectations
// registered on the mock are not compared against the query text.
type queryMatcherAny struct{}

// Match always returns nil, i.e. it reports a successful match for any pair
// of strings. Both arguments are ignored.
func (m *queryMatcherAny) Match(x string, y string) error {
	return nil
}
|
||||
|
||||
func TestManager_TestNotification_SendUnmatched_ThresholdRule(t *testing.T) {
|
||||
target := 10.0
|
||||
recovery := 5.0
|
||||
|
||||
buildRule := func() ruletypes.PostableRule {
|
||||
return ruletypes.PostableRule{
|
||||
AlertName: "test-alert",
|
||||
AlertType: ruletypes.AlertTypeMetric,
|
||||
RuleType: ruletypes.RuleTypeThreshold,
|
||||
Evaluation: &ruletypes.EvaluationEnvelope{Kind: ruletypes.RollingEvaluation, Spec: ruletypes.RollingWindow{
|
||||
EvalWindow: ruletypes.Duration(5 * time.Minute),
|
||||
Frequency: ruletypes.Duration(1 * time.Minute),
|
||||
}},
|
||||
Labels: map[string]string{
|
||||
"service.name": "frontend",
|
||||
},
|
||||
Annotations: map[string]string{
|
||||
"value": "{{$value}}",
|
||||
},
|
||||
Version: "v5",
|
||||
RuleCondition: &ruletypes.RuleCondition{
|
||||
MatchType: ruletypes.AtleastOnce,
|
||||
CompareOp: ruletypes.ValueIsAbove,
|
||||
Target: &target,
|
||||
CompositeQuery: &v3.CompositeQuery{
|
||||
QueryType: v3.QueryTypeBuilder,
|
||||
Queries: []qbtypes.QueryEnvelope{
|
||||
{
|
||||
Type: qbtypes.QueryTypeBuilder,
|
||||
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
|
||||
Name: "A",
|
||||
StepInterval: qbtypes.Step{Duration: 60 * time.Second},
|
||||
Signal: telemetrytypes.SignalMetrics,
|
||||
|
||||
Aggregations: []qbtypes.MetricAggregation{
|
||||
{
|
||||
MetricName: "probe_success",
|
||||
TimeAggregation: metrictypes.TimeAggregationAvg,
|
||||
SpaceAggregation: metrictypes.SpaceAggregationAvg,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Thresholds: &ruletypes.RuleThresholdData{
|
||||
Kind: ruletypes.BasicThresholdKind,
|
||||
Spec: ruletypes.BasicRuleThresholds{
|
||||
{
|
||||
Name: "primary",
|
||||
TargetValue: &target,
|
||||
RecoveryTarget: &recovery,
|
||||
MatchType: ruletypes.AtleastOnce,
|
||||
CompareOp: ruletypes.ValueIsAbove,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
NotificationSettings: &ruletypes.NotificationSettings{},
|
||||
}
|
||||
}
|
||||
|
||||
type testCase struct {
|
||||
name string
|
||||
values [][]interface{}
|
||||
expectAlerts int
|
||||
expectValue float64
|
||||
}
|
||||
|
||||
cases := []testCase{
|
||||
{
|
||||
name: "return first valid point in case of test notification",
|
||||
values: [][]interface{}{
|
||||
{float64(3), "attr", time.Now()},
|
||||
{float64(4), "attr", time.Now().Add(1 * time.Minute)},
|
||||
},
|
||||
expectAlerts: 1,
|
||||
expectValue: 3,
|
||||
},
|
||||
{
|
||||
name: "No data in DB so no alerts fired",
|
||||
values: [][]interface{}{},
|
||||
expectAlerts: 0,
|
||||
},
|
||||
{
|
||||
name: "return first valid point in case of test notification skips NaN and Inf",
|
||||
values: [][]interface{}{
|
||||
{math.NaN(), "attr", time.Now()},
|
||||
{math.Inf(1), "attr", time.Now().Add(1 * time.Minute)},
|
||||
{float64(7), "attr", time.Now().Add(2 * time.Minute)},
|
||||
},
|
||||
expectAlerts: 1,
|
||||
expectValue: 7,
|
||||
},
|
||||
{
|
||||
name: "If found matching alert with given target value, return the alerting value rather than first valid point",
|
||||
values: [][]interface{}{
|
||||
{float64(1), "attr", time.Now()},
|
||||
{float64(2), "attr", time.Now().Add(1 * time.Minute)},
|
||||
{float64(3), "attr", time.Now().Add(2 * time.Minute)},
|
||||
{float64(12), "attr", time.Now().Add(3 * time.Minute)},
|
||||
},
|
||||
expectAlerts: 1,
|
||||
expectValue: 12,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
rule := buildRule()
|
||||
|
||||
// Marshal rule to JSON as TestNotification expects
|
||||
ruleBytes, err := json.Marshal(rule)
|
||||
require.NoError(t, err)
|
||||
|
||||
// mocking the alertmanager + capturing the triggered test alerts
|
||||
fAlert := alertmanagermock.NewMockAlertmanager(t)
|
||||
// mock set notification config
|
||||
fAlert.On("SetNotificationConfig", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
||||
// for saving temp alerts that are triggered via TestNotification
|
||||
triggeredTestAlerts := []map[*alertmanagertypes.PostableAlert][]string{}
|
||||
if tc.expectAlerts > 0 {
|
||||
fAlert.On("TestAlert", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
|
||||
triggeredTestAlerts = append(triggeredTestAlerts, args.Get(3).(map[*alertmanagertypes.PostableAlert][]string))
|
||||
}).Return(nil).Times(tc.expectAlerts)
|
||||
}
|
||||
|
||||
cacheObj, err := cachetest.New(cache.Config{
|
||||
Provider: "memory",
|
||||
Memory: cache.Memory{
|
||||
NumCounters: 1000,
|
||||
MaxCost: 1 << 20,
|
||||
},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
orgID := valuer.GenerateUUID()
|
||||
|
||||
// Create SQLStore mock for SendAlerts function which queries organizations table
|
||||
sqlStore := sqlstoretest.New(sqlstore.Config{Provider: "sqlite"}, sqlmock.QueryMatcherRegexp)
|
||||
// Mock the organizations query that SendAlerts makes
|
||||
// Bun generates: SELECT id FROM organizations LIMIT 1 (or SELECT "id" FROM "organizations" LIMIT 1)
|
||||
orgRows := sqlStore.Mock().NewRows([]string{"id"}).AddRow(orgID.StringValue())
|
||||
// Match bun's generated query pattern - bun may quote identifiers
|
||||
sqlStore.Mock().ExpectQuery("SELECT (.+) FROM (.+)organizations(.+) LIMIT (.+)").WillReturnRows(orgRows)
|
||||
|
||||
telemetryStore := telemetrystoretest.New(telemetrystore.Config{}, &queryMatcherAny{})
|
||||
|
||||
// Set up mock data for telemetry store
|
||||
cols := make([]cmock.ColumnType, 0)
|
||||
cols = append(cols, cmock.ColumnType{Name: "value", Type: "Float64"})
|
||||
cols = append(cols, cmock.ColumnType{Name: "attr", Type: "String"})
|
||||
cols = append(cols, cmock.ColumnType{Name: "ts", Type: "DateTime"})
|
||||
|
||||
alertDataRows := cmock.NewRows(cols, tc.values)
|
||||
|
||||
mock := telemetryStore.Mock()
|
||||
|
||||
// Generate query arguments for the metric query
|
||||
evalTime := time.Now().UTC()
|
||||
evalWindow := 5 * time.Minute
|
||||
evalDelay := time.Duration(0)
|
||||
queryArgs := qsRules.GenerateMetricQueryCHArgs(
|
||||
evalTime,
|
||||
evalWindow,
|
||||
evalDelay,
|
||||
"probe_success",
|
||||
metrictypes.Unspecified,
|
||||
)
|
||||
|
||||
mock.ExpectQuery("*WITH __temporal_aggregation_cte*").
|
||||
WithArgs(queryArgs...).
|
||||
WillReturnRows(alertDataRows)
|
||||
|
||||
// Create reader with mocked telemetry store
|
||||
readerCache, err := cachetest.New(cache.Config{
|
||||
Provider: "memory",
|
||||
Memory: cache.Memory{
|
||||
NumCounters: 10 * 1000,
|
||||
MaxCost: 1 << 26,
|
||||
},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
|
||||
providerSettings := instrumentationtest.New().ToProviderSettings()
|
||||
prometheus := prometheustest.New(context.Background(), providerSettings, prometheus.Config{}, telemetryStore)
|
||||
reader := clickhouseReader.NewReader(
|
||||
nil,
|
||||
telemetryStore,
|
||||
prometheus,
|
||||
"",
|
||||
time.Duration(time.Second),
|
||||
nil,
|
||||
readerCache,
|
||||
options,
|
||||
)
|
||||
|
||||
// Create mock querierV5 with test values
|
||||
providerFactory := signozquerier.NewFactory(telemetryStore, prometheus, readerCache)
|
||||
mockQuerier, err := providerFactory.New(context.Background(), providerSettings, querier.Config{})
|
||||
require.NoError(t, err)
|
||||
|
||||
mgrOpts := &qsRules.ManagerOptions{
|
||||
Logger: zap.NewNop(),
|
||||
SLogger: instrumentationtest.New().Logger(),
|
||||
Cache: cacheObj,
|
||||
Alertmanager: fAlert,
|
||||
Querier: mockQuerier,
|
||||
TelemetryStore: telemetryStore,
|
||||
Reader: reader,
|
||||
SqlStore: sqlStore, // SQLStore needed for SendAlerts to query organizations
|
||||
// Custom Test Notification function
|
||||
PrepareTestRuleFunc: TestNotification,
|
||||
}
|
||||
|
||||
mgr, err := qsRules.NewManager(mgrOpts)
|
||||
require.NoError(t, err)
|
||||
|
||||
count, apiErr := mgr.TestNotification(context.Background(), orgID, string(ruleBytes))
|
||||
if apiErr != nil {
|
||||
t.Logf("TestNotification error: %v, type: %s", apiErr.Err, apiErr.Typ)
|
||||
}
|
||||
require.Nil(t, apiErr)
|
||||
assert.Equal(t, tc.expectAlerts, count)
|
||||
|
||||
if tc.expectAlerts > 0 {
|
||||
// check if the alert has been triggered
|
||||
require.Len(t, triggeredTestAlerts, 1)
|
||||
var gotAlerts []*alertmanagertypes.PostableAlert
|
||||
for a := range triggeredTestAlerts[0] {
|
||||
gotAlerts = append(gotAlerts, a)
|
||||
}
|
||||
require.Len(t, gotAlerts, tc.expectAlerts)
|
||||
// check if the alert has triggered with correct threshold value
|
||||
if tc.expectValue != 0 {
|
||||
assert.Equal(t, strconv.FormatFloat(tc.expectValue, 'f', -1, 64), gotAlerts[0].Annotations["value"])
|
||||
}
|
||||
} else {
|
||||
// check if no alerts have been triggered
|
||||
assert.Empty(t, triggeredTestAlerts)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestManager_TestNotification_SendUnmatched_PromRule(t *testing.T) {
|
||||
target := 10.0
|
||||
|
||||
buildRule := func() ruletypes.PostableRule {
|
||||
return ruletypes.PostableRule{
|
||||
AlertName: "test-prom-alert",
|
||||
AlertType: ruletypes.AlertTypeMetric,
|
||||
RuleType: ruletypes.RuleTypeProm,
|
||||
Evaluation: &ruletypes.EvaluationEnvelope{Kind: ruletypes.RollingEvaluation, Spec: ruletypes.RollingWindow{
|
||||
EvalWindow: ruletypes.Duration(5 * time.Minute),
|
||||
Frequency: ruletypes.Duration(1 * time.Minute),
|
||||
}},
|
||||
Labels: map[string]string{
|
||||
"service.name": "frontend",
|
||||
},
|
||||
Annotations: map[string]string{
|
||||
"value": "{{$value}}",
|
||||
},
|
||||
Version: "v5",
|
||||
RuleCondition: &ruletypes.RuleCondition{
|
||||
MatchType: ruletypes.AtleastOnce,
|
||||
SelectedQuery: "A",
|
||||
CompareOp: ruletypes.ValueIsAbove,
|
||||
Target: &target,
|
||||
CompositeQuery: &v3.CompositeQuery{
|
||||
QueryType: v3.QueryTypePromQL,
|
||||
PanelType: v3.PanelTypeGraph,
|
||||
Queries: []qbtypes.QueryEnvelope{
|
||||
{
|
||||
Type: qbtypes.QueryTypePromQL,
|
||||
Spec: qbtypes.PromQuery{
|
||||
Name: "A",
|
||||
Query: "{\"test_metric\"}",
|
||||
Disabled: false,
|
||||
Stats: false,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Thresholds: &ruletypes.RuleThresholdData{
|
||||
Kind: ruletypes.BasicThresholdKind,
|
||||
Spec: ruletypes.BasicRuleThresholds{
|
||||
{
|
||||
Name: "primary",
|
||||
TargetValue: &target,
|
||||
MatchType: ruletypes.AtleastOnce,
|
||||
CompareOp: ruletypes.ValueIsAbove,
|
||||
Channels: []string{"slack"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
NotificationSettings: &ruletypes.NotificationSettings{},
|
||||
}
|
||||
}
|
||||
|
||||
type testCase struct {
|
||||
name string
|
||||
values []struct {
|
||||
offset time.Duration // offset from baseTime (negative = in the past)
|
||||
value float64
|
||||
}
|
||||
expectAlerts int
|
||||
expectValue float64
|
||||
}
|
||||
|
||||
cases := []testCase{
|
||||
{
|
||||
name: "return first valid point in case of test notification",
|
||||
values: []struct {
|
||||
offset time.Duration
|
||||
value float64
|
||||
}{
|
||||
{-4 * time.Minute, 3},
|
||||
{-3 * time.Minute, 4},
|
||||
},
|
||||
expectAlerts: 1,
|
||||
expectValue: 3,
|
||||
},
|
||||
{
|
||||
name: "No data in DB so no alerts fired",
|
||||
values: []struct {
|
||||
offset time.Duration
|
||||
value float64
|
||||
}{},
|
||||
expectAlerts: 0,
|
||||
},
|
||||
{
|
||||
name: "return first valid point in case of test notification skips NaN and Inf",
|
||||
values: []struct {
|
||||
offset time.Duration
|
||||
value float64
|
||||
}{
|
||||
{-4 * time.Minute, math.NaN()},
|
||||
{-3 * time.Minute, math.Inf(1)},
|
||||
{-2 * time.Minute, 7},
|
||||
},
|
||||
expectAlerts: 1,
|
||||
expectValue: 7,
|
||||
},
|
||||
{
|
||||
name: "If found matching alert with given target value, return the alerting value rather than first valid point",
|
||||
values: []struct {
|
||||
offset time.Duration
|
||||
value float64
|
||||
}{
|
||||
{-4 * time.Minute, 1},
|
||||
{-3 * time.Minute, 2},
|
||||
{-2 * time.Minute, 3},
|
||||
{-1 * time.Minute, 12},
|
||||
},
|
||||
expectAlerts: 1,
|
||||
expectValue: 12,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
// Capture base time once per test case to ensure consistent timestamps
|
||||
baseTime := time.Now().UTC()
|
||||
|
||||
rule := buildRule()
|
||||
|
||||
// Marshal rule to JSON as TestNotification expects
|
||||
ruleBytes, err := json.Marshal(rule)
|
||||
require.NoError(t, err)
|
||||
|
||||
// mocking the alertmanager + capturing the triggered test alerts
|
||||
fAlert := alertmanagermock.NewMockAlertmanager(t)
|
||||
// mock set notification config
|
||||
fAlert.On("SetNotificationConfig", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
||||
// for saving temp alerts that are triggered via TestNotification
|
||||
triggeredTestAlerts := []map[*alertmanagertypes.PostableAlert][]string{}
|
||||
if tc.expectAlerts > 0 {
|
||||
fAlert.On("TestAlert", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
|
||||
triggeredTestAlerts = append(triggeredTestAlerts, args.Get(3).(map[*alertmanagertypes.PostableAlert][]string))
|
||||
}).Return(nil).Times(tc.expectAlerts)
|
||||
}
|
||||
|
||||
cacheObj, err := cachetest.New(cache.Config{
|
||||
Provider: "memory",
|
||||
Memory: cache.Memory{
|
||||
NumCounters: 1000,
|
||||
MaxCost: 1 << 20,
|
||||
},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
orgID := valuer.GenerateUUID()
|
||||
|
||||
// Create SQLStore mock for SendAlerts function which queries organizations table
|
||||
sqlStore := sqlstoretest.New(sqlstore.Config{Provider: "sqlite"}, sqlmock.QueryMatcherRegexp)
|
||||
// Mock the organizations query that SendAlerts makes
|
||||
orgRows := sqlStore.Mock().NewRows([]string{"id"}).AddRow(orgID.StringValue())
|
||||
sqlStore.Mock().ExpectQuery("SELECT (.+) FROM (.+)organizations(.+) LIMIT (.+)").WillReturnRows(orgRows)
|
||||
|
||||
telemetryStore := telemetrystoretest.New(telemetrystore.Config{}, &queryMatcherAny{})
|
||||
|
||||
// Set up Prometheus-specific mock data
|
||||
// Fingerprint columns for Prometheus queries
|
||||
fingerprintCols := []cmock.ColumnType{
|
||||
{Name: "fingerprint", Type: "UInt64"},
|
||||
{Name: "any(labels)", Type: "String"},
|
||||
}
|
||||
|
||||
// Samples columns for Prometheus queries
|
||||
samplesCols := []cmock.ColumnType{
|
||||
{Name: "metric_name", Type: "String"},
|
||||
{Name: "fingerprint", Type: "UInt64"},
|
||||
{Name: "unix_milli", Type: "Int64"},
|
||||
{Name: "value", Type: "Float64"},
|
||||
{Name: "flags", Type: "UInt32"},
|
||||
}
|
||||
|
||||
// Calculate query time range similar to Prometheus rule tests
|
||||
// TestNotification uses time.Now().UTC() for evaluation
|
||||
// We calculate the query window based on current time to match what the actual evaluation will use
|
||||
evalTime := baseTime
|
||||
evalWindowMs := int64(5 * 60 * 1000) // 5 minutes in ms
|
||||
evalTimeMs := evalTime.UnixMilli()
|
||||
queryStart := ((evalTimeMs-2*evalWindowMs)/60000)*60000 + 1 // truncate to minute + 1ms
|
||||
queryEnd := (evalTimeMs / 60000) * 60000 // truncate to minute
|
||||
|
||||
// Create fingerprint data
|
||||
fingerprint := uint64(12345)
|
||||
labelsJSON := `{"__name__":"test_metric"}`
|
||||
fingerprintData := [][]interface{}{
|
||||
{fingerprint, labelsJSON},
|
||||
}
|
||||
fingerprintRows := cmock.NewRows(fingerprintCols, fingerprintData)
|
||||
|
||||
// Create samples data from test case values, calculating timestamps relative to baseTime
|
||||
validSamplesData := make([][]interface{}, 0)
|
||||
for _, v := range tc.values {
|
||||
// Skip NaN and Inf values in the samples data
|
||||
if math.IsNaN(v.value) || math.IsInf(v.value, 0) {
|
||||
continue
|
||||
}
|
||||
// Calculate timestamp relative to baseTime
|
||||
sampleTimestamp := baseTime.Add(v.offset).UnixMilli()
|
||||
validSamplesData = append(validSamplesData, []interface{}{
|
||||
"test_metric",
|
||||
fingerprint,
|
||||
sampleTimestamp,
|
||||
v.value,
|
||||
uint32(0), // flags - 0 means normal value
|
||||
})
|
||||
}
|
||||
samplesRows := cmock.NewRows(samplesCols, validSamplesData)
|
||||
|
||||
mock := telemetryStore.Mock()
|
||||
|
||||
// Mock the fingerprint query (for Prometheus label matching)
|
||||
mock.ExpectQuery("SELECT fingerprint, any").
|
||||
WithArgs("test_metric", "__name__", "test_metric").
|
||||
WillReturnRows(fingerprintRows)
|
||||
|
||||
// Mock the samples query (for Prometheus metric data)
|
||||
mock.ExpectQuery("SELECT metric_name, fingerprint, unix_milli").
|
||||
WithArgs(
|
||||
"test_metric",
|
||||
"test_metric",
|
||||
"__name__",
|
||||
"test_metric",
|
||||
queryStart,
|
||||
queryEnd,
|
||||
).
|
||||
WillReturnRows(samplesRows)
|
||||
|
||||
// Create reader with mocked telemetry store
|
||||
readerCache, err := cachetest.New(cache.Config{
|
||||
Provider: "memory",
|
||||
Memory: cache.Memory{
|
||||
NumCounters: 10 * 1000,
|
||||
MaxCost: 1 << 26,
|
||||
},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
|
||||
promProvider := prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore)
|
||||
reader := clickhouseReader.NewReader(
|
||||
nil,
|
||||
telemetryStore,
|
||||
promProvider,
|
||||
"",
|
||||
time.Duration(time.Second),
|
||||
nil,
|
||||
readerCache,
|
||||
options,
|
||||
)
|
||||
|
||||
mgrOpts := &qsRules.ManagerOptions{
|
||||
Logger: zap.NewNop(),
|
||||
SLogger: instrumentationtest.New().Logger(),
|
||||
Cache: cacheObj,
|
||||
Alertmanager: fAlert,
|
||||
TelemetryStore: telemetryStore,
|
||||
Reader: reader,
|
||||
SqlStore: sqlStore, // SQLStore needed for SendAlerts to query organizations
|
||||
Prometheus: promProvider,
|
||||
// Custom Test Notification function
|
||||
PrepareTestRuleFunc: TestNotification,
|
||||
}
|
||||
|
||||
mgr, err := qsRules.NewManager(mgrOpts)
|
||||
require.NoError(t, err)
|
||||
|
||||
count, apiErr := mgr.TestNotification(context.Background(), orgID, string(ruleBytes))
|
||||
if apiErr != nil {
|
||||
t.Logf("TestNotification error: %v, type: %s", apiErr.Err, apiErr.Typ)
|
||||
}
|
||||
require.Nil(t, apiErr)
|
||||
assert.Equal(t, tc.expectAlerts, count)
|
||||
|
||||
if tc.expectAlerts > 0 {
|
||||
// check if the alert has been triggered
|
||||
require.Len(t, triggeredTestAlerts, 1)
|
||||
var gotAlerts []*alertmanagertypes.PostableAlert
|
||||
for a := range triggeredTestAlerts[0] {
|
||||
gotAlerts = append(gotAlerts, a)
|
||||
}
|
||||
require.Len(t, gotAlerts, tc.expectAlerts)
|
||||
// check if the alert has triggered with correct threshold value
|
||||
if tc.expectValue != 0 && !math.IsNaN(tc.expectValue) && !math.IsInf(tc.expectValue, 0) {
|
||||
assert.Equal(t, strconv.FormatFloat(tc.expectValue, 'f', -1, 64), gotAlerts[0].Annotations["value"])
|
||||
}
|
||||
} else {
|
||||
// check if no alerts have been triggered
|
||||
assert.Empty(t, triggeredTestAlerts)
|
||||
}
|
||||
|
||||
promProvider.Close()
|
||||
})
|
||||
}
|
||||
}
|
||||
2
go.mod
2
go.mod
@@ -53,7 +53,7 @@ require (
|
||||
github.com/smartystreets/goconvey v1.8.1
|
||||
github.com/soheilhy/cmux v0.1.5
|
||||
github.com/spf13/cobra v1.10.1
|
||||
github.com/srikanthccv/ClickHouse-go-mock v0.12.0
|
||||
github.com/srikanthccv/ClickHouse-go-mock v0.13.0
|
||||
github.com/stretchr/testify v1.11.1
|
||||
github.com/swaggest/jsonschema-go v0.3.78
|
||||
github.com/swaggest/rest v0.2.75
|
||||
|
||||
4
go.sum
4
go.sum
@@ -962,8 +962,8 @@ github.com/spf13/pflag v1.0.10/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3A
|
||||
github.com/spf13/viper v1.13.0/go.mod h1:Icm2xNL3/8uyh/wFuB1jI7TiTNKp8632Nwegu+zgdYw=
|
||||
github.com/spf13/viper v1.20.1 h1:ZMi+z/lvLyPSCoNtFCpqjy0S4kPbirhpTMwl8BkW9X4=
|
||||
github.com/spf13/viper v1.20.1/go.mod h1:P9Mdzt1zoHIG8m2eZQinpiBjo6kCmZSKBClNNqjJvu4=
|
||||
github.com/srikanthccv/ClickHouse-go-mock v0.12.0 h1:KUzaWTwuqMc2uf5FylM/oAcTFdE2DdZjvISm9V0/NAA=
|
||||
github.com/srikanthccv/ClickHouse-go-mock v0.12.0/go.mod h1:1oUmLtXEXOyS0EEWVKlKEfLfv9y02agCMAvD3tVnhlo=
|
||||
github.com/srikanthccv/ClickHouse-go-mock v0.13.0 h1:/b7DQphGkh29ocNtLh4DGmQxQYA0CfHz65Wy2zAH2GM=
|
||||
github.com/srikanthccv/ClickHouse-go-mock v0.13.0/go.mod h1:LiiyBUdXNwB/1DE9rgK/8q9qjVYsTzg6WXQ/3mU3TeY=
|
||||
github.com/stoewer/go-strcase v1.3.0 h1:g0eASXYtp+yvN9fK8sH94oCIk0fau9uV1/ZdJ0AVEzs=
|
||||
github.com/stoewer/go-strcase v1.3.0/go.mod h1:fAH5hQ5pehh+j3nZfvwdk2RgEgQjAoM8wodgtPmh1xo=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
|
||||
1836
pkg/alertmanager/mocks/mocks.go
Normal file
1836
pkg/alertmanager/mocks/mocks.go
Normal file
File diff suppressed because it is too large
Load Diff
@@ -191,6 +191,13 @@ func (r *BaseRule) currentAlerts() []*ruletypes.Alert {
|
||||
return alerts
|
||||
}
|
||||
|
||||
// ShouldSendUnmatched returns true if the rule should send unmatched samples
|
||||
// during alert evaluation, even if they don't match the rule condition.
|
||||
// This is useful in testing the rule.
|
||||
func (r *BaseRule) ShouldSendUnmatched() bool {
|
||||
return r.sendUnmatched
|
||||
}
|
||||
|
||||
// ActiveAlertsLabelFP returns a map of active alert labels fingerprint and
|
||||
// the fingerprint is computed using the QueryResultLables.Hash() method.
|
||||
// We use the QueryResultLables instead of labels as these labels are raw labels
|
||||
|
||||
577
pkg/query-service/rules/manager_test.go
Normal file
577
pkg/query-service/rules/manager_test.go
Normal file
@@ -0,0 +1,577 @@
|
||||
package rules
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"math"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
alertmanagermock "github.com/SigNoz/signoz/pkg/alertmanager/mocks"
|
||||
"github.com/SigNoz/signoz/pkg/cache"
|
||||
"github.com/SigNoz/signoz/pkg/cache/cachetest"
|
||||
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
|
||||
"github.com/SigNoz/signoz/pkg/prometheus"
|
||||
"github.com/SigNoz/signoz/pkg/prometheus/prometheustest"
|
||||
"github.com/SigNoz/signoz/pkg/querier"
|
||||
"github.com/SigNoz/signoz/pkg/querier/signozquerier"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader"
|
||||
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore/sqlstoretest"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
|
||||
"github.com/SigNoz/signoz/pkg/types/alertmanagertypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/metrictypes"
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap"
|
||||
|
||||
cmock "github.com/srikanthccv/ClickHouse-go-mock"
|
||||
)
|
||||
|
||||
func TestManager_TestNotification_SendUnmatched_ThresholdRule(t *testing.T) {
|
||||
target := 10.0
|
||||
recovery := 5.0
|
||||
|
||||
buildRule := func() ruletypes.PostableRule {
|
||||
return ruletypes.PostableRule{
|
||||
AlertName: "test-alert",
|
||||
AlertType: ruletypes.AlertTypeMetric,
|
||||
RuleType: ruletypes.RuleTypeThreshold,
|
||||
Evaluation: &ruletypes.EvaluationEnvelope{Kind: ruletypes.RollingEvaluation, Spec: ruletypes.RollingWindow{
|
||||
EvalWindow: ruletypes.Duration(5 * time.Minute),
|
||||
Frequency: ruletypes.Duration(1 * time.Minute),
|
||||
}},
|
||||
Labels: map[string]string{
|
||||
"service.name": "frontend",
|
||||
},
|
||||
Annotations: map[string]string{
|
||||
"value": "{{$value}}",
|
||||
},
|
||||
Version: "v5",
|
||||
RuleCondition: &ruletypes.RuleCondition{
|
||||
MatchType: ruletypes.AtleastOnce,
|
||||
CompareOp: ruletypes.ValueIsAbove,
|
||||
Target: &target,
|
||||
CompositeQuery: &v3.CompositeQuery{
|
||||
QueryType: v3.QueryTypeBuilder,
|
||||
Queries: []qbtypes.QueryEnvelope{
|
||||
{
|
||||
Type: qbtypes.QueryTypeBuilder,
|
||||
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
|
||||
Name: "A",
|
||||
StepInterval: qbtypes.Step{Duration: 60 * time.Second},
|
||||
Signal: telemetrytypes.SignalMetrics,
|
||||
|
||||
Aggregations: []qbtypes.MetricAggregation{
|
||||
{
|
||||
MetricName: "probe_success",
|
||||
TimeAggregation: metrictypes.TimeAggregationAvg,
|
||||
SpaceAggregation: metrictypes.SpaceAggregationAvg,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Thresholds: &ruletypes.RuleThresholdData{
|
||||
Kind: ruletypes.BasicThresholdKind,
|
||||
Spec: ruletypes.BasicRuleThresholds{
|
||||
{
|
||||
Name: "primary",
|
||||
TargetValue: &target,
|
||||
RecoveryTarget: &recovery,
|
||||
MatchType: ruletypes.AtleastOnce,
|
||||
CompareOp: ruletypes.ValueIsAbove,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
NotificationSettings: &ruletypes.NotificationSettings{},
|
||||
}
|
||||
}
|
||||
|
||||
type testCase struct {
|
||||
name string
|
||||
values [][]interface{}
|
||||
expectAlerts int
|
||||
expectValue float64
|
||||
}
|
||||
|
||||
cases := []testCase{
|
||||
{
|
||||
name: "return first valid point in case of test notification",
|
||||
values: [][]interface{}{
|
||||
{float64(3), "attr", time.Now()},
|
||||
{float64(4), "attr", time.Now().Add(1 * time.Minute)},
|
||||
},
|
||||
expectAlerts: 1,
|
||||
expectValue: 3,
|
||||
},
|
||||
{
|
||||
name: "No data in DB so no alerts fired",
|
||||
values: [][]interface{}{},
|
||||
expectAlerts: 0,
|
||||
},
|
||||
{
|
||||
name: "return first valid point in case of test notification skips NaN and Inf",
|
||||
values: [][]interface{}{
|
||||
{math.NaN(), "attr", time.Now()},
|
||||
{math.Inf(1), "attr", time.Now().Add(1 * time.Minute)},
|
||||
{float64(7), "attr", time.Now().Add(2 * time.Minute)},
|
||||
},
|
||||
expectAlerts: 1,
|
||||
expectValue: 7,
|
||||
},
|
||||
{
|
||||
name: "If found matching alert with given target value, return the alerting value rather than first valid point",
|
||||
values: [][]interface{}{
|
||||
{float64(1), "attr", time.Now()},
|
||||
{float64(2), "attr", time.Now().Add(1 * time.Minute)},
|
||||
{float64(3), "attr", time.Now().Add(2 * time.Minute)},
|
||||
{float64(12), "attr", time.Now().Add(3 * time.Minute)},
|
||||
},
|
||||
expectAlerts: 1,
|
||||
expectValue: 12,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
rule := buildRule()
|
||||
|
||||
// Marshal rule to JSON as TestNotification expects
|
||||
ruleBytes, err := json.Marshal(rule)
|
||||
require.NoError(t, err)
|
||||
|
||||
// mocking the alertmanager + capturing the triggered test alerts
|
||||
fAlert := alertmanagermock.NewMockAlertmanager(t)
|
||||
// mock set notification config
|
||||
fAlert.On("SetNotificationConfig", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
||||
// for saving temp alerts that are triggered via TestNotification
|
||||
triggeredTestAlerts := []map[*alertmanagertypes.PostableAlert][]string{}
|
||||
if tc.expectAlerts > 0 {
|
||||
fAlert.On("TestAlert", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
|
||||
triggeredTestAlerts = append(triggeredTestAlerts, args.Get(3).(map[*alertmanagertypes.PostableAlert][]string))
|
||||
}).Return(nil).Times(tc.expectAlerts)
|
||||
}
|
||||
|
||||
cacheObj, err := cachetest.New(cache.Config{
|
||||
Provider: "memory",
|
||||
Memory: cache.Memory{
|
||||
NumCounters: 1000,
|
||||
MaxCost: 1 << 20,
|
||||
},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
orgID := valuer.GenerateUUID()
|
||||
|
||||
// Create SQLStore mock for SendAlerts function which queries organizations table
|
||||
sqlStore := sqlstoretest.New(sqlstore.Config{Provider: "sqlite"}, sqlmock.QueryMatcherRegexp)
|
||||
// Mock the organizations query that SendAlerts makes
|
||||
// Bun generates: SELECT id FROM organizations LIMIT 1 (or SELECT "id" FROM "organizations" LIMIT 1)
|
||||
orgRows := sqlStore.Mock().NewRows([]string{"id"}).AddRow(orgID.StringValue())
|
||||
// Match bun's generated query pattern - bun may quote identifiers
|
||||
sqlStore.Mock().ExpectQuery("SELECT (.+) FROM (.+)organizations(.+) LIMIT (.+)").WillReturnRows(orgRows)
|
||||
|
||||
telemetryStore := telemetrystoretest.New(telemetrystore.Config{}, &queryMatcherAny{})
|
||||
|
||||
// Set up mock data for telemetry store
|
||||
cols := make([]cmock.ColumnType, 0)
|
||||
cols = append(cols, cmock.ColumnType{Name: "value", Type: "Float64"})
|
||||
cols = append(cols, cmock.ColumnType{Name: "attr", Type: "String"})
|
||||
cols = append(cols, cmock.ColumnType{Name: "ts", Type: "DateTime"})
|
||||
|
||||
alertDataRows := cmock.NewRows(cols, tc.values)
|
||||
|
||||
mock := telemetryStore.Mock()
|
||||
|
||||
// Generate query arguments for the metric query
|
||||
evalTime := time.Now().UTC()
|
||||
evalWindow := 5 * time.Minute
|
||||
evalDelay := time.Duration(0)
|
||||
queryArgs := GenerateMetricQueryCHArgs(
|
||||
evalTime,
|
||||
evalWindow,
|
||||
evalDelay,
|
||||
"probe_success",
|
||||
metrictypes.Unspecified,
|
||||
)
|
||||
|
||||
mock.ExpectQuery("*WITH __temporal_aggregation_cte*").
|
||||
WithArgs(queryArgs...).
|
||||
WillReturnRows(alertDataRows)
|
||||
|
||||
// Create reader with mocked telemetry store
|
||||
readerCache, err := cachetest.New(cache.Config{
|
||||
Provider: "memory",
|
||||
Memory: cache.Memory{
|
||||
NumCounters: 10 * 1000,
|
||||
MaxCost: 1 << 26,
|
||||
},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
|
||||
providerSettings := instrumentationtest.New().ToProviderSettings()
|
||||
prometheus := prometheustest.New(context.Background(), providerSettings, prometheus.Config{}, telemetryStore)
|
||||
reader := clickhouseReader.NewReader(
|
||||
nil,
|
||||
telemetryStore,
|
||||
prometheus,
|
||||
"",
|
||||
time.Duration(time.Second),
|
||||
nil,
|
||||
readerCache,
|
||||
options,
|
||||
)
|
||||
|
||||
// Create mock querierV5 with test values
|
||||
providerFactory := signozquerier.NewFactory(telemetryStore, prometheus, readerCache)
|
||||
mockQuerier, err := providerFactory.New(context.Background(), providerSettings, querier.Config{})
|
||||
require.NoError(t, err)
|
||||
|
||||
mgrOpts := &ManagerOptions{
|
||||
Logger: zap.NewNop(),
|
||||
SLogger: instrumentationtest.New().Logger(),
|
||||
Cache: cacheObj,
|
||||
Alertmanager: fAlert,
|
||||
Querier: mockQuerier,
|
||||
TelemetryStore: telemetryStore,
|
||||
Reader: reader,
|
||||
SqlStore: sqlStore, // SQLStore needed for SendAlerts to query organizations
|
||||
}
|
||||
|
||||
mgr, err := NewManager(mgrOpts)
|
||||
require.NoError(t, err)
|
||||
|
||||
count, apiErr := mgr.TestNotification(context.Background(), orgID, string(ruleBytes))
|
||||
if apiErr != nil {
|
||||
t.Logf("TestNotification error: %v, type: %s", apiErr.Err, apiErr.Typ)
|
||||
}
|
||||
require.Nil(t, apiErr)
|
||||
assert.Equal(t, tc.expectAlerts, count)
|
||||
|
||||
if tc.expectAlerts > 0 {
|
||||
// check if the alert has been triggered
|
||||
require.Len(t, triggeredTestAlerts, 1)
|
||||
var gotAlerts []*alertmanagertypes.PostableAlert
|
||||
for a := range triggeredTestAlerts[0] {
|
||||
gotAlerts = append(gotAlerts, a)
|
||||
}
|
||||
require.Len(t, gotAlerts, tc.expectAlerts)
|
||||
// check if the alert has triggered with correct threshold value
|
||||
if tc.expectValue != 0 {
|
||||
assert.Equal(t, strconv.FormatFloat(tc.expectValue, 'f', -1, 64), gotAlerts[0].Annotations["value"])
|
||||
}
|
||||
} else {
|
||||
// check if no alerts have been triggered
|
||||
assert.Empty(t, triggeredTestAlerts)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestManager_TestNotification_SendUnmatched_PromRule(t *testing.T) {
|
||||
target := 10.0
|
||||
|
||||
buildRule := func() ruletypes.PostableRule {
|
||||
return ruletypes.PostableRule{
|
||||
AlertName: "test-prom-alert",
|
||||
AlertType: ruletypes.AlertTypeMetric,
|
||||
RuleType: ruletypes.RuleTypeProm,
|
||||
Evaluation: &ruletypes.EvaluationEnvelope{Kind: ruletypes.RollingEvaluation, Spec: ruletypes.RollingWindow{
|
||||
EvalWindow: ruletypes.Duration(5 * time.Minute),
|
||||
Frequency: ruletypes.Duration(1 * time.Minute),
|
||||
}},
|
||||
Labels: map[string]string{
|
||||
"service.name": "frontend",
|
||||
},
|
||||
Annotations: map[string]string{
|
||||
"value": "{{$value}}",
|
||||
},
|
||||
Version: "v5",
|
||||
RuleCondition: &ruletypes.RuleCondition{
|
||||
MatchType: ruletypes.AtleastOnce,
|
||||
SelectedQuery: "A",
|
||||
CompareOp: ruletypes.ValueIsAbove,
|
||||
Target: &target,
|
||||
CompositeQuery: &v3.CompositeQuery{
|
||||
QueryType: v3.QueryTypePromQL,
|
||||
PanelType: v3.PanelTypeGraph,
|
||||
Queries: []qbtypes.QueryEnvelope{
|
||||
{
|
||||
Type: qbtypes.QueryTypePromQL,
|
||||
Spec: qbtypes.PromQuery{
|
||||
Name: "A",
|
||||
Query: "{\"test_metric\"}",
|
||||
Disabled: false,
|
||||
Stats: false,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Thresholds: &ruletypes.RuleThresholdData{
|
||||
Kind: ruletypes.BasicThresholdKind,
|
||||
Spec: ruletypes.BasicRuleThresholds{
|
||||
{
|
||||
Name: "primary",
|
||||
TargetValue: &target,
|
||||
MatchType: ruletypes.AtleastOnce,
|
||||
CompareOp: ruletypes.ValueIsAbove,
|
||||
Channels: []string{"slack"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
NotificationSettings: &ruletypes.NotificationSettings{},
|
||||
}
|
||||
}
|
||||
|
||||
type testCase struct {
|
||||
name string
|
||||
values []struct {
|
||||
offset time.Duration // offset from baseTime (negative = in the past)
|
||||
value float64
|
||||
}
|
||||
expectAlerts int
|
||||
expectValue float64
|
||||
}
|
||||
|
||||
cases := []testCase{
|
||||
{
|
||||
name: "return first valid point in case of test notification",
|
||||
values: []struct {
|
||||
offset time.Duration
|
||||
value float64
|
||||
}{
|
||||
{-4 * time.Minute, 3},
|
||||
{-3 * time.Minute, 4},
|
||||
},
|
||||
expectAlerts: 1,
|
||||
expectValue: 3,
|
||||
},
|
||||
{
|
||||
name: "No data in DB so no alerts fired",
|
||||
values: []struct {
|
||||
offset time.Duration
|
||||
value float64
|
||||
}{},
|
||||
expectAlerts: 0,
|
||||
},
|
||||
{
|
||||
name: "return first valid point in case of test notification skips NaN and Inf",
|
||||
values: []struct {
|
||||
offset time.Duration
|
||||
value float64
|
||||
}{
|
||||
{-4 * time.Minute, math.NaN()},
|
||||
{-3 * time.Minute, math.Inf(1)},
|
||||
{-2 * time.Minute, 7},
|
||||
},
|
||||
expectAlerts: 1,
|
||||
expectValue: 7,
|
||||
},
|
||||
{
|
||||
name: "If found matching alert with given target value, return the alerting value rather than first valid point",
|
||||
values: []struct {
|
||||
offset time.Duration
|
||||
value float64
|
||||
}{
|
||||
{-4 * time.Minute, 1},
|
||||
{-3 * time.Minute, 2},
|
||||
{-2 * time.Minute, 3},
|
||||
{-1 * time.Minute, 12},
|
||||
},
|
||||
expectAlerts: 1,
|
||||
expectValue: 12,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
// Capture base time once per test case to ensure consistent timestamps
|
||||
baseTime := time.Now().UTC()
|
||||
|
||||
rule := buildRule()
|
||||
|
||||
// Marshal rule to JSON as TestNotification expects
|
||||
ruleBytes, err := json.Marshal(rule)
|
||||
require.NoError(t, err)
|
||||
|
||||
// mocking the alertmanager + capturing the triggered test alerts
|
||||
fAlert := alertmanagermock.NewMockAlertmanager(t)
|
||||
// mock set notification config
|
||||
fAlert.On("SetNotificationConfig", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
||||
// for saving temp alerts that are triggered via TestNotification
|
||||
triggeredTestAlerts := []map[*alertmanagertypes.PostableAlert][]string{}
|
||||
if tc.expectAlerts > 0 {
|
||||
fAlert.On("TestAlert", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
|
||||
triggeredTestAlerts = append(triggeredTestAlerts, args.Get(3).(map[*alertmanagertypes.PostableAlert][]string))
|
||||
}).Return(nil).Times(tc.expectAlerts)
|
||||
}
|
||||
|
||||
cacheObj, err := cachetest.New(cache.Config{
|
||||
Provider: "memory",
|
||||
Memory: cache.Memory{
|
||||
NumCounters: 1000,
|
||||
MaxCost: 1 << 20,
|
||||
},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
orgID := valuer.GenerateUUID()
|
||||
|
||||
// Create SQLStore mock for SendAlerts function which queries organizations table
|
||||
sqlStore := sqlstoretest.New(sqlstore.Config{Provider: "sqlite"}, sqlmock.QueryMatcherRegexp)
|
||||
// Mock the organizations query that SendAlerts makes
|
||||
orgRows := sqlStore.Mock().NewRows([]string{"id"}).AddRow(orgID.StringValue())
|
||||
sqlStore.Mock().ExpectQuery("SELECT (.+) FROM (.+)organizations(.+) LIMIT (.+)").WillReturnRows(orgRows)
|
||||
|
||||
telemetryStore := telemetrystoretest.New(telemetrystore.Config{}, &queryMatcherAny{})
|
||||
|
||||
// Set up Prometheus-specific mock data
|
||||
// Fingerprint columns for Prometheus queries
|
||||
fingerprintCols := []cmock.ColumnType{
|
||||
{Name: "fingerprint", Type: "UInt64"},
|
||||
{Name: "any(labels)", Type: "String"},
|
||||
}
|
||||
|
||||
// Samples columns for Prometheus queries
|
||||
samplesCols := []cmock.ColumnType{
|
||||
{Name: "metric_name", Type: "String"},
|
||||
{Name: "fingerprint", Type: "UInt64"},
|
||||
{Name: "unix_milli", Type: "Int64"},
|
||||
{Name: "value", Type: "Float64"},
|
||||
{Name: "flags", Type: "UInt32"},
|
||||
}
|
||||
|
||||
// Calculate query time range similar to Prometheus rule tests
|
||||
// TestNotification uses time.Now().UTC() for evaluation
|
||||
// We calculate the query window based on current time to match what the actual evaluation will use
|
||||
evalTime := baseTime
|
||||
evalWindowMs := int64(5 * 60 * 1000) // 5 minutes in ms
|
||||
evalTimeMs := evalTime.UnixMilli()
|
||||
queryStart := ((evalTimeMs-2*evalWindowMs)/60000)*60000 + 1 // truncate to minute + 1ms
|
||||
queryEnd := (evalTimeMs / 60000) * 60000 // truncate to minute
|
||||
|
||||
// Create fingerprint data
|
||||
fingerprint := uint64(12345)
|
||||
labelsJSON := `{"__name__":"test_metric"}`
|
||||
fingerprintData := [][]interface{}{
|
||||
{fingerprint, labelsJSON},
|
||||
}
|
||||
fingerprintRows := cmock.NewRows(fingerprintCols, fingerprintData)
|
||||
|
||||
// Create samples data from test case values, calculating timestamps relative to baseTime
|
||||
validSamplesData := make([][]interface{}, 0)
|
||||
for _, v := range tc.values {
|
||||
// Skip NaN and Inf values in the samples data
|
||||
if math.IsNaN(v.value) || math.IsInf(v.value, 0) {
|
||||
continue
|
||||
}
|
||||
// Calculate timestamp relative to baseTime
|
||||
sampleTimestamp := baseTime.Add(v.offset).UnixMilli()
|
||||
validSamplesData = append(validSamplesData, []interface{}{
|
||||
"test_metric",
|
||||
fingerprint,
|
||||
sampleTimestamp,
|
||||
v.value,
|
||||
uint32(0), // flags - 0 means normal value
|
||||
})
|
||||
}
|
||||
samplesRows := cmock.NewRows(samplesCols, validSamplesData)
|
||||
|
||||
mock := telemetryStore.Mock()
|
||||
|
||||
// Mock the fingerprint query (for Prometheus label matching)
|
||||
mock.ExpectQuery("SELECT fingerprint, any").
|
||||
WithArgs("test_metric", "__name__", "test_metric").
|
||||
WillReturnRows(fingerprintRows)
|
||||
|
||||
// Mock the samples query (for Prometheus metric data)
|
||||
mock.ExpectQuery("SELECT metric_name, fingerprint, unix_milli").
|
||||
WithArgs(
|
||||
"test_metric",
|
||||
"test_metric",
|
||||
"__name__",
|
||||
"test_metric",
|
||||
queryStart,
|
||||
queryEnd,
|
||||
).
|
||||
WillReturnRows(samplesRows)
|
||||
|
||||
// Create reader with mocked telemetry store
|
||||
readerCache, err := cachetest.New(cache.Config{
|
||||
Provider: "memory",
|
||||
Memory: cache.Memory{
|
||||
NumCounters: 10 * 1000,
|
||||
MaxCost: 1 << 26,
|
||||
},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
|
||||
promProvider := prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore)
|
||||
reader := clickhouseReader.NewReader(
|
||||
nil,
|
||||
telemetryStore,
|
||||
promProvider,
|
||||
"",
|
||||
time.Duration(time.Second),
|
||||
nil,
|
||||
readerCache,
|
||||
options,
|
||||
)
|
||||
|
||||
mgrOpts := &ManagerOptions{
|
||||
Logger: zap.NewNop(),
|
||||
SLogger: instrumentationtest.New().Logger(),
|
||||
Cache: cacheObj,
|
||||
Alertmanager: fAlert,
|
||||
TelemetryStore: telemetryStore,
|
||||
Reader: reader,
|
||||
SqlStore: sqlStore, // SQLStore needed for SendAlerts to query organizations
|
||||
Prometheus: promProvider,
|
||||
}
|
||||
|
||||
mgr, err := NewManager(mgrOpts)
|
||||
require.NoError(t, err)
|
||||
|
||||
count, apiErr := mgr.TestNotification(context.Background(), orgID, string(ruleBytes))
|
||||
if apiErr != nil {
|
||||
t.Logf("TestNotification error: %v, type: %s", apiErr.Err, apiErr.Typ)
|
||||
}
|
||||
require.Nil(t, apiErr)
|
||||
assert.Equal(t, tc.expectAlerts, count)
|
||||
|
||||
if tc.expectAlerts > 0 {
|
||||
// check if the alert has been triggered
|
||||
require.Len(t, triggeredTestAlerts, 1)
|
||||
var gotAlerts []*alertmanagertypes.PostableAlert
|
||||
for a := range triggeredTestAlerts[0] {
|
||||
gotAlerts = append(gotAlerts, a)
|
||||
}
|
||||
require.Len(t, gotAlerts, tc.expectAlerts)
|
||||
// check if the alert has triggered with correct threshold value
|
||||
if tc.expectValue != 0 && !math.IsNaN(tc.expectValue) && !math.IsInf(tc.expectValue, 0) {
|
||||
assert.Equal(t, strconv.FormatFloat(tc.expectValue, 'f', -1, 64), gotAlerts[0].Annotations["value"])
|
||||
}
|
||||
} else {
|
||||
// check if no alerts have been triggered
|
||||
assert.Empty(t, triggeredTestAlerts)
|
||||
}
|
||||
|
||||
promProvider.Close()
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -138,7 +138,8 @@ func (r *PromRule) buildAndRunQuery(ctx context.Context, ts time.Time) (ruletype
|
||||
var resultVector ruletypes.Vector
|
||||
for _, series := range res {
|
||||
resultSeries, err := r.Threshold.Eval(toCommonSeries(series), r.Unit(), ruletypes.EvalData{
|
||||
ActiveAlerts: r.ActiveAlertsLabelFP(),
|
||||
ActiveAlerts: r.ActiveAlertsLabelFP(),
|
||||
SendUnmatched: r.ShouldSendUnmatched(),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -489,7 +489,8 @@ func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, orgID valuer.UUID,
|
||||
}
|
||||
}
|
||||
resultSeries, err := r.Threshold.Eval(*series, r.Unit(), ruletypes.EvalData{
|
||||
ActiveAlerts: r.ActiveAlertsLabelFP(),
|
||||
ActiveAlerts: r.ActiveAlertsLabelFP(),
|
||||
SendUnmatched: r.ShouldSendUnmatched(),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -568,7 +569,8 @@ func (r *ThresholdRule) buildAndRunQueryV5(ctx context.Context, orgID valuer.UUI
|
||||
}
|
||||
}
|
||||
resultSeries, err := r.Threshold.Eval(*series, r.Unit(), ruletypes.EvalData{
|
||||
ActiveAlerts: r.ActiveAlertsLabelFP(),
|
||||
ActiveAlerts: r.ActiveAlertsLabelFP(),
|
||||
SendUnmatched: r.ShouldSendUnmatched(),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -2,6 +2,7 @@ package rules
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -1519,6 +1520,283 @@ func TestThresholdRuleEval_MatchPlusCompareOps(t *testing.T) {
|
||||
|
||||
}
|
||||
|
||||
// TestThresholdRuleEval_SendUnmatchedBypassesRecovery tests the case where the sendUnmatched is true and the recovery target is met.
|
||||
// In this case, the rule should return the first sample as sendUnmatched is supposed to be used in tests and in case of tests
|
||||
// recovery target is expected to be present. This test makes sure this behavior works as expected.
|
||||
func TestThresholdRuleEval_SendUnmatchedBypassesRecovery(t *testing.T) {
|
||||
target := 10.0
|
||||
recovery := 4.0
|
||||
|
||||
postableRule := ruletypes.PostableRule{
|
||||
AlertName: "Send unmatched bypass recovery",
|
||||
AlertType: ruletypes.AlertTypeMetric,
|
||||
RuleType: ruletypes.RuleTypeThreshold,
|
||||
Evaluation: &ruletypes.EvaluationEnvelope{Kind: ruletypes.RollingEvaluation, Spec: ruletypes.RollingWindow{
|
||||
EvalWindow: ruletypes.Duration(5 * time.Minute),
|
||||
Frequency: ruletypes.Duration(1 * time.Minute),
|
||||
}},
|
||||
RuleCondition: &ruletypes.RuleCondition{
|
||||
CompositeQuery: &v3.CompositeQuery{
|
||||
QueryType: v3.QueryTypeBuilder,
|
||||
BuilderQueries: map[string]*v3.BuilderQuery{
|
||||
"A": {
|
||||
QueryName: "A",
|
||||
StepInterval: 60,
|
||||
AggregateAttribute: v3.AttributeKey{
|
||||
Key: "probe_success",
|
||||
},
|
||||
AggregateOperator: v3.AggregateOperatorNoOp,
|
||||
DataSource: v3.DataSourceMetrics,
|
||||
Expression: "A",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
postableRule.RuleCondition.Thresholds = &ruletypes.RuleThresholdData{
|
||||
Kind: ruletypes.BasicThresholdKind,
|
||||
Spec: ruletypes.BasicRuleThresholds{
|
||||
{
|
||||
Name: "primary",
|
||||
TargetValue: &target,
|
||||
RecoveryTarget: &recovery,
|
||||
MatchType: ruletypes.AtleastOnce,
|
||||
CompareOp: ruletypes.ValueIsAbove,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
logger := instrumentationtest.New().Logger()
|
||||
rule, err := NewThresholdRule("69", valuer.GenerateUUID(), &postableRule, nil, nil, logger, WithEvalDelay(2*time.Minute))
|
||||
require.NoError(t, err)
|
||||
|
||||
now := time.Now()
|
||||
series := v3.Series{
|
||||
Points: []v3.Point{
|
||||
{Timestamp: now.UnixMilli(), Value: 3},
|
||||
{Timestamp: now.Add(time.Minute).UnixMilli(), Value: 4},
|
||||
{Timestamp: now.Add(2 * time.Minute).UnixMilli(), Value: 5},
|
||||
},
|
||||
Labels: map[string]string{
|
||||
"service.name": "frontend",
|
||||
},
|
||||
LabelsArray: []map[string]string{
|
||||
{
|
||||
"service.name": "frontend",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
alertLabels := ruletypes.PrepareSampleLabelsForRule(series.Labels, "primary")
|
||||
activeAlerts := map[uint64]struct{}{alertLabels.Hash(): {}}
|
||||
|
||||
resultVectors, err := rule.Threshold.Eval(series, rule.Unit(), ruletypes.EvalData{
|
||||
ActiveAlerts: activeAlerts,
|
||||
SendUnmatched: true,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Len(t, resultVectors, 1, "expected unmatched sample to be returned")
|
||||
|
||||
smpl := resultVectors[0]
|
||||
assert.Equal(t, float64(3), smpl.V)
|
||||
assert.False(t, smpl.IsRecovering, "unmatched path should not mark sample as recovering")
|
||||
assert.Equal(t, float64(4), *smpl.RecoveryTarget, "unmatched path should set recovery target")
|
||||
assert.InDelta(t, target, smpl.Target, 0.01)
|
||||
assert.Equal(t, "primary", smpl.Metric.Get(ruletypes.LabelThresholdName))
|
||||
}
|
||||
|
||||
// intPtr returns a pointer to a freshly allocated int holding v.
func intPtr(v int) *int {
	p := new(int)
	*p = v
	return p
}
|
||||
|
||||
// TestThresholdRuleEval_SendUnmatchedVariants tests the different variants of sendUnmatched behavior.
|
||||
// It covers both the case where sendUnmatched is true and the case where it is false.
|
||||
func TestThresholdRuleEval_SendUnmatchedVariants(t *testing.T) {
|
||||
target := 10.0
|
||||
recovery := 5.0
|
||||
postableRule := ruletypes.PostableRule{
|
||||
AlertName: "Send unmatched variants",
|
||||
AlertType: ruletypes.AlertTypeMetric,
|
||||
RuleType: ruletypes.RuleTypeThreshold,
|
||||
Evaluation: &ruletypes.EvaluationEnvelope{Kind: ruletypes.RollingEvaluation, Spec: ruletypes.RollingWindow{
|
||||
EvalWindow: ruletypes.Duration(5 * time.Minute),
|
||||
Frequency: ruletypes.Duration(1 * time.Minute),
|
||||
}},
|
||||
RuleCondition: &ruletypes.RuleCondition{
|
||||
CompositeQuery: &v3.CompositeQuery{
|
||||
QueryType: v3.QueryTypeBuilder,
|
||||
BuilderQueries: map[string]*v3.BuilderQuery{
|
||||
"A": {
|
||||
QueryName: "A",
|
||||
StepInterval: 60,
|
||||
AggregateAttribute: v3.AttributeKey{
|
||||
Key: "probe_success",
|
||||
},
|
||||
AggregateOperator: v3.AggregateOperatorNoOp,
|
||||
DataSource: v3.DataSourceMetrics,
|
||||
Expression: "A",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
|
||||
tests := []recoveryTestCase{
|
||||
{
|
||||
description: "sendUnmatched returns first valid point",
|
||||
values: v3.Series{
|
||||
Points: []v3.Point{
|
||||
{Timestamp: now.UnixMilli(), Value: 3},
|
||||
{Timestamp: now.Add(time.Minute).UnixMilli(), Value: 4},
|
||||
},
|
||||
Labels: map[string]string{
|
||||
"service.name": "frontend",
|
||||
},
|
||||
LabelsArray: []map[string]string{
|
||||
{
|
||||
"service.name": "frontend",
|
||||
},
|
||||
},
|
||||
},
|
||||
compareOp: string(ruletypes.ValueIsAbove),
|
||||
matchType: string(ruletypes.AtleastOnce),
|
||||
target: target,
|
||||
recoveryTarget: &recovery,
|
||||
thresholdName: "primary",
|
||||
// Since sendUnmatched is true, the rule should return the first valid point
|
||||
// even if it doesn't match the rule condition with current target value of 10.0
|
||||
sendUnmatched: true,
|
||||
expectSamples: intPtr(1),
|
||||
expectedSampleValue: 3,
|
||||
},
|
||||
{
|
||||
description: "sendUnmatched false suppresses unmatched",
|
||||
values: v3.Series{
|
||||
Points: []v3.Point{
|
||||
{Timestamp: now.UnixMilli(), Value: 3},
|
||||
{Timestamp: now.Add(time.Minute).UnixMilli(), Value: 4},
|
||||
},
|
||||
Labels: map[string]string{
|
||||
"service.name": "frontend",
|
||||
},
|
||||
LabelsArray: []map[string]string{
|
||||
{
|
||||
"service.name": "frontend",
|
||||
},
|
||||
},
|
||||
},
|
||||
compareOp: string(ruletypes.ValueIsAbove),
|
||||
matchType: string(ruletypes.AtleastOnce),
|
||||
target: target,
|
||||
recoveryTarget: &recovery,
|
||||
thresholdName: "primary",
|
||||
// Since sendUnmatched is false, the rule should not return any samples
|
||||
sendUnmatched: false,
|
||||
expectSamples: intPtr(0),
|
||||
},
|
||||
{
|
||||
description: "sendUnmatched skips NaN and uses next point",
|
||||
values: v3.Series{
|
||||
Points: []v3.Point{
|
||||
{Timestamp: now.UnixMilli(), Value: math.NaN()},
|
||||
{Timestamp: now.Add(time.Minute).UnixMilli(), Value: math.Inf(1)},
|
||||
{Timestamp: now.Add(2 * time.Minute).UnixMilli(), Value: 7},
|
||||
},
|
||||
Labels: map[string]string{
|
||||
"service.name": "frontend",
|
||||
},
|
||||
LabelsArray: []map[string]string{
|
||||
{
|
||||
"service.name": "frontend",
|
||||
},
|
||||
},
|
||||
},
|
||||
compareOp: string(ruletypes.ValueIsAbove),
|
||||
matchType: string(ruletypes.AtleastOnce),
|
||||
target: target,
|
||||
recoveryTarget: &recovery,
|
||||
thresholdName: "primary",
|
||||
// Since sendUnmatched is true, the rule should return the first valid point
|
||||
// even if it doesn't match the rule condition with current target value of 10.0
|
||||
sendUnmatched: true,
|
||||
expectSamples: intPtr(1),
|
||||
expectedSampleValue: 7,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
runEvalTests(t, postableRule, []recoveryTestCase{tc})
|
||||
}
|
||||
}
|
||||
|
||||
// TestThresholdRuleEval_RecoveryNotMetSendUnmatchedFalse tests the case where the recovery target is not met and sendUnmatched is false.
|
||||
// In this case, the rule should not return any samples as no alert is active plus the recovery target is not met.
|
||||
func TestThresholdRuleEval_RecoveryNotMetSendUnmatchedFalse(t *testing.T) {
|
||||
target := 10.0
|
||||
recovery := 5.0
|
||||
|
||||
now := time.Now()
|
||||
postableRule := ruletypes.PostableRule{
|
||||
AlertName: "Recovery not met sendUnmatched false",
|
||||
AlertType: ruletypes.AlertTypeMetric,
|
||||
RuleType: ruletypes.RuleTypeThreshold,
|
||||
Evaluation: &ruletypes.EvaluationEnvelope{Kind: ruletypes.RollingEvaluation, Spec: ruletypes.RollingWindow{
|
||||
EvalWindow: ruletypes.Duration(5 * time.Minute),
|
||||
Frequency: ruletypes.Duration(1 * time.Minute),
|
||||
}},
|
||||
RuleCondition: &ruletypes.RuleCondition{
|
||||
CompositeQuery: &v3.CompositeQuery{
|
||||
QueryType: v3.QueryTypeBuilder,
|
||||
BuilderQueries: map[string]*v3.BuilderQuery{
|
||||
"A": {
|
||||
QueryName: "A",
|
||||
StepInterval: 60,
|
||||
AggregateAttribute: v3.AttributeKey{
|
||||
Key: "probe_success",
|
||||
},
|
||||
AggregateOperator: v3.AggregateOperatorNoOp,
|
||||
DataSource: v3.DataSourceMetrics,
|
||||
Expression: "A",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
tc := recoveryTestCase{
|
||||
description: "recovery target present but not met, sendUnmatched false",
|
||||
values: v3.Series{
|
||||
Points: []v3.Point{
|
||||
{Timestamp: now.UnixMilli(), Value: 3},
|
||||
{Timestamp: now.Add(time.Minute).UnixMilli(), Value: 4},
|
||||
},
|
||||
Labels: map[string]string{
|
||||
"service.name": "frontend",
|
||||
},
|
||||
LabelsArray: []map[string]string{
|
||||
{
|
||||
"service.name": "frontend",
|
||||
},
|
||||
},
|
||||
},
|
||||
compareOp: string(ruletypes.ValueIsAbove),
|
||||
matchType: string(ruletypes.AtleastOnce),
|
||||
target: target,
|
||||
recoveryTarget: &recovery,
|
||||
thresholdName: "primary",
|
||||
sendUnmatched: false,
|
||||
expectSamples: intPtr(0),
|
||||
activeAlerts: nil, // will auto-calc
|
||||
expectedTarget: target,
|
||||
expectedRecoveryTarget: recovery,
|
||||
}
|
||||
|
||||
runEvalTests(t, postableRule, []recoveryTestCase{tc})
|
||||
}
|
||||
|
||||
func runEvalTests(t *testing.T, postableRule ruletypes.PostableRule, testCases []recoveryTestCase) {
|
||||
logger := instrumentationtest.New().Logger()
|
||||
for _, c := range testCases {
|
||||
@@ -1577,12 +1855,21 @@ func runEvalTests(t *testing.T, postableRule ruletypes.PostableRule, testCases [
|
||||
}
|
||||
|
||||
evalData := ruletypes.EvalData{
|
||||
ActiveAlerts: activeAlerts,
|
||||
ActiveAlerts: activeAlerts,
|
||||
SendUnmatched: c.sendUnmatched,
|
||||
}
|
||||
|
||||
resultVectors, err := rule.Threshold.Eval(values, rule.Unit(), evalData)
|
||||
assert.NoError(t, err)
|
||||
|
||||
if c.expectSamples != nil {
|
||||
assert.Equal(t, *c.expectSamples, len(resultVectors), "sample count mismatch")
|
||||
if *c.expectSamples > 0 {
|
||||
assert.InDelta(t, c.expectedSampleValue, resultVectors[0].V, 0.01, "sample value mismatch")
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Verify results
|
||||
if c.expectAlert || c.expectRecovery {
|
||||
// Either a new alert fires or recovery happens - both return result vectors
|
||||
|
||||
@@ -27,6 +27,10 @@ type recoveryTestCase struct {
|
||||
expectedTarget float64
|
||||
expectedRecoveryTarget float64
|
||||
thresholdName string // for hash calculation
|
||||
// Optional fields for SendUnmatched scenarios
|
||||
sendUnmatched bool // whether to set EvalData.SendUnmatched
|
||||
expectSamples *int // if set, assert exact sample count
|
||||
expectedSampleValue float64 // used when expectSamples is set
|
||||
}
|
||||
|
||||
// thresholdExpectation defines expected behavior for a single threshold in multi-threshold tests
|
||||
|
||||
79
pkg/query-service/rules/utils.go
Normal file
79
pkg/query-service/rules/utils.go
Normal file
@@ -0,0 +1,79 @@
|
||||
package rules
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/types/metrictypes"
|
||||
)
|
||||
|
||||
// GenerateMetricQueryCHArgs generates query arguments for metric queries used in tests.
|
||||
// It calculates the time range, builds time series CTE args, temporal aggregation args,
|
||||
// and spatial aggregation args to match the actual query builder behavior.
|
||||
func GenerateMetricQueryCHArgs(
|
||||
evalTime time.Time,
|
||||
evalWindow time.Duration,
|
||||
evalDelay time.Duration,
|
||||
metricName string,
|
||||
temporality metrictypes.Temporality,
|
||||
) []interface{} {
|
||||
// Calculate time range
|
||||
startTime := evalTime.Add(-evalWindow)
|
||||
endTime := evalTime
|
||||
|
||||
startMs := startTime.UnixMilli()
|
||||
endMs := endTime.UnixMilli()
|
||||
|
||||
// Apply eval delay if present
|
||||
if evalDelay > 0 {
|
||||
startMs = startMs - int64(evalDelay.Milliseconds())
|
||||
endMs = endMs - int64(evalDelay.Milliseconds())
|
||||
}
|
||||
|
||||
// Round to nearest minute
|
||||
startMs = startMs - (startMs % (60 * 1000))
|
||||
endMs = endMs - (endMs % (60 * 1000))
|
||||
|
||||
start := uint64(startMs)
|
||||
end := uint64(endMs)
|
||||
|
||||
// Step1: Build time series CTE args
|
||||
|
||||
// Adjust start time to nearest hour
|
||||
oneHourInMilliseconds := uint64(time.Hour.Milliseconds())
|
||||
// start time for filtering signoz_metrics.time_series_v4 with start time
|
||||
timeSeriesCTEStartTime := start - (start % oneHourInMilliseconds)
|
||||
|
||||
queryArgs := []interface{}{
|
||||
metricName,
|
||||
timeSeriesCTEStartTime,
|
||||
end,
|
||||
}
|
||||
|
||||
// Add temporality if specified
|
||||
if temporality == metrictypes.Unknown {
|
||||
temporality = metrictypes.Unspecified
|
||||
}
|
||||
if temporality != metrictypes.Unknown {
|
||||
queryArgs = append(queryArgs, temporality.StringValue())
|
||||
}
|
||||
|
||||
// Add normalized flag
|
||||
queryArgs = append(queryArgs, false)
|
||||
|
||||
// Step2: Add temporal aggregation args
|
||||
// build args for filtering signoz_metrics.distributed_samples_v4 table
|
||||
temporalAggArgs := []interface{}{
|
||||
metricName,
|
||||
start,
|
||||
end,
|
||||
}
|
||||
queryArgs = append(queryArgs, temporalAggArgs...)
|
||||
|
||||
// Add spatial aggregation args
|
||||
spatialAggArgs := []interface{}{
|
||||
0, // isNaN check
|
||||
}
|
||||
queryArgs = append(queryArgs, spatialAggArgs...)
|
||||
|
||||
return queryArgs
|
||||
}
|
||||
@@ -63,6 +63,11 @@ type EvalData struct {
|
||||
// used to check if a sample is part of an active alert
|
||||
// when evaluating the recovery threshold.
|
||||
ActiveAlerts map[uint64]struct{}
|
||||
|
||||
// SendUnmatched is a flag to return samples
|
||||
// even if they don't match the rule condition.
|
||||
// This is useful in testing the rule.
|
||||
SendUnmatched bool
|
||||
}
|
||||
|
||||
// HasActiveAlert checks if the given sample fingerprint is active
|
||||
@@ -131,6 +136,24 @@ func (r BasicRuleThresholds) Eval(series v3.Series, unit string, evalData EvalDa
|
||||
smpl.TargetUnit = threshold.TargetUnit
|
||||
resultVector = append(resultVector, smpl)
|
||||
continue
|
||||
} else if evalData.SendUnmatched {
|
||||
// Sanitise the series points to remove any NaN or Inf values
|
||||
series.Points = removeGroupinSetPoints(series)
|
||||
if len(series.Points) == 0 {
|
||||
continue
|
||||
}
|
||||
// prepare the sample with the first point of the series
|
||||
smpl := Sample{
|
||||
Point: Point{T: series.Points[0].Timestamp, V: series.Points[0].Value},
|
||||
Metric: PrepareSampleLabelsForRule(series.Labels, threshold.Name),
|
||||
Target: *threshold.TargetValue,
|
||||
TargetUnit: threshold.TargetUnit,
|
||||
}
|
||||
if threshold.RecoveryTarget != nil {
|
||||
smpl.RecoveryTarget = threshold.RecoveryTarget
|
||||
}
|
||||
resultVector = append(resultVector, smpl)
|
||||
continue
|
||||
}
|
||||
|
||||
// Prepare alert hash from series labels and threshold name if recovery target option was provided
|
||||
|
||||
Reference in New Issue
Block a user