Compare commits
main...feat/exclu (65 commits)

| Author | SHA1 | Date |
|---|---|---|
|  | 9e5ea4de9c |  |
|  | 81e0df09b8 |  |
|  | a522f39b9b |  |
|  | affb6eee05 |  |
|  | 13a5e9dd24 |  |
|  | f620767876 |  |
|  | 9fb8b2bb1b |  |
|  | 30494c9196 |  |
|  | cae4cf0777 |  |
|  | c9538b0604 |  |
|  | 204cc4e5c5 |  |
|  | 6dd2ffcb64 |  |
|  | 13c15249c5 |  |
|  | 8419ca7982 |  |
|  | 6b189b14c6 |  |
|  | 550c49fab0 |  |
|  | 5b6ff92648 |  |
|  | 45954b38fa |  |
|  | ceade6c7d7 |  |
|  | f15c88836c |  |
|  | 9af45643a9 |  |
|  | d15e974e9f |  |
|  | 71e752a015 |  |
|  | 3407760585 |  |
|  | 58a0e36869 |  |
|  | 5d688eb919 |  |
|  | c0f237a7c4 |  |
|  | 8ce8bc940a |  |
|  | abce05b289 |  |
|  | ccd25c3b67 |  |
|  | ddb98da217 |  |
|  | 18d63d2e66 |  |
|  | 67c108f021 |  |
|  | 02939cafa4 |  |
|  | e62b070c1e |  |
|  | be0a7d8fd4 |  |
|  | 419044dc9e |  |
|  | 223465d6d5 |  |
|  | cec99674fa |  |
|  | 0ccf58ac7a |  |
|  | b08d636d6a |  |
|  | f6141bc6c5 |  |
|  | bfe49f0f1b |  |
|  | 8e8064c5c1 |  |
|  | 4392341467 |  |
|  | 521d8e4f4d |  |
|  | b6103f371f |  |
|  | 43283506db |  |
|  | 694d9958db |  |
|  | addee4c0a5 |  |
|  | f10cf7ac04 |  |
|  | b336678639 |  |
|  | c438b3444e |  |
|  | b624414507 |  |
|  | bde7963444 |  |
|  | 2df93ff217 |  |
|  | f496a6ecde |  |
|  | 599e230a72 |  |
|  | 9a0e32ff3b |  |
|  | 5fe2732698 |  |
|  | 4993a44ecc |  |
|  | ebd575a16b |  |
|  | 666582337e |  |
|  | 23512ab05c |  |
|  | 1423749529 |  |
```diff
@@ -10,6 +10,7 @@ import (
 	"slices"

 	"github.com/SigNoz/signoz/pkg/cache/memorycache"
+	"github.com/SigNoz/signoz/pkg/queryparser"
 	"github.com/SigNoz/signoz/pkg/ruler/rulestore/sqlrulestore"
 	"go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux"
 	"go.opentelemetry.io/otel/propagation"
@@ -107,6 +108,7 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz) (*Server, error) {
 		signoz.Modules.OrgGetter,
 		signoz.Querier,
 		signoz.Instrumentation.Logger(),
+		signoz.QueryParser,
 	)

 	if err != nil {
@@ -348,7 +350,7 @@ func (s *Server) Stop(ctx context.Context) error {
 	return nil
 }

-func makeRulesManager(ch baseint.Reader, cache cache.Cache, alertmanager alertmanager.Alertmanager, sqlstore sqlstore.SQLStore, telemetryStore telemetrystore.TelemetryStore, prometheus prometheus.Prometheus, orgGetter organization.Getter, querier querier.Querier, logger *slog.Logger) (*baserules.Manager, error) {
+func makeRulesManager(ch baseint.Reader, cache cache.Cache, alertmanager alertmanager.Alertmanager, sqlstore sqlstore.SQLStore, telemetryStore telemetrystore.TelemetryStore, prometheus prometheus.Prometheus, orgGetter organization.Getter, querier querier.Querier, logger *slog.Logger, queryParser queryparser.QueryParser) (*baserules.Manager, error) {
 	ruleStore := sqlrulestore.NewRuleStore(sqlstore)
 	maintenanceStore := sqlrulestore.NewMaintenanceStore(sqlstore)
 	// create manager opts
@@ -369,6 +371,7 @@ func makeRulesManager(ch baseint.Reader, cache cache.Cache, alertmanager alertma
 		RuleStore:        ruleStore,
 		MaintenanceStore: maintenanceStore,
 		SqlStore:         sqlstore,
+		QueryParser:      queryParser,
 	}

 	// create Manager
```
```diff
@@ -329,6 +329,17 @@ func (r *AnomalyRule) Eval(ctx context.Context, ts time.Time) (interface{}, erro
 		return nil, err
 	}

+	// Filter out new series if newGroupEvalDelay is configured
+	if r.ShouldSkipNewGroups() {
+		collection := ruletypes.NewVectorLabelledCollection(res)
+		filteredCollection, _, filterErr := r.BaseRule.FilterNewSeries(ctx, ts, collection)
+		if filterErr != nil {
+			r.logger.ErrorContext(ctx, "Error filtering new series, ", "error", filterErr, "rule_name", r.Name())
+			return nil, filterErr
+		}
+		res = filteredCollection.(*ruletypes.VectorLabelledCollection).Vector()
+	}
+
 	r.mtx.Lock()
 	defer r.mtx.Unlock()
```
```diff
@@ -37,6 +37,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
 			opts.SLogger,
 			baserules.WithEvalDelay(opts.ManagerOpts.EvalDelay),
 			baserules.WithSQLStore(opts.SQLStore),
+			baserules.WithQueryParser(opts.ManagerOpts.QueryParser),
 		)

 		if err != nil {
@@ -59,6 +60,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
 			opts.Reader,
 			opts.ManagerOpts.Prometheus,
 			baserules.WithSQLStore(opts.SQLStore),
+			baserules.WithQueryParser(opts.ManagerOpts.QueryParser),
 		)

 		if err != nil {
@@ -82,6 +84,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
 			opts.Cache,
 			baserules.WithEvalDelay(opts.ManagerOpts.EvalDelay),
 			baserules.WithSQLStore(opts.SQLStore),
+			baserules.WithQueryParser(opts.ManagerOpts.QueryParser),
 		)
 		if err != nil {
 			return task, err
@@ -140,6 +143,7 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, *basemodel.Ap
 			baserules.WithSendAlways(),
 			baserules.WithSendUnmatched(),
 			baserules.WithSQLStore(opts.SQLStore),
+			baserules.WithQueryParser(opts.ManagerOpts.QueryParser),
 		)

 		if err != nil {
@@ -160,6 +164,7 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, *basemodel.Ap
 			baserules.WithSendAlways(),
 			baserules.WithSendUnmatched(),
 			baserules.WithSQLStore(opts.SQLStore),
+			baserules.WithQueryParser(opts.ManagerOpts.QueryParser),
 		)

 		if err != nil {
@@ -179,6 +184,7 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, *basemodel.Ap
 			baserules.WithSendAlways(),
 			baserules.WithSendUnmatched(),
 			baserules.WithSQLStore(opts.SQLStore),
+			baserules.WithQueryParser(opts.ManagerOpts.QueryParser),
 		)
 		if err != nil {
 			zap.L().Error("failed to prepare a new anomaly rule for test", zap.String("name", alertname), zap.Error(err))
```
```diff
@@ -6439,6 +6439,73 @@ func (r *ClickHouseReader) GetUpdatedMetricsMetadata(ctx context.Context, orgID
 	return cachedMetadata, nil
 }

+// GetFirstSeenFromMetricMetadata queries the metadata table to get the first_seen timestamp
+// for each metric-attribute-value combination.
+// Returns a map where key is `model.MetricMetadataLookupKey` and value is first_seen in milliseconds.
+func (r *ClickHouseReader) GetFirstSeenFromMetricMetadata(ctx context.Context, lookupKeys []model.MetricMetadataLookupKey) (map[model.MetricMetadataLookupKey]int64, error) {
+	// Chunk the lookup keys to avoid overly large queries (max 300 tuples per query)
+	const chunkSize = 300
+	result := make(map[model.MetricMetadataLookupKey]int64)
+
+	for i := 0; i < len(lookupKeys); i += chunkSize {
+		end := i + chunkSize
+		if end > len(lookupKeys) {
+			end = len(lookupKeys)
+		}
+		chunk := lookupKeys[i:end]
+
+		// Build the IN clause values - ClickHouse uses tuple syntax with placeholders
+		var valueStrings []string
+		var args []interface{}
+
+		for _, key := range chunk {
+			valueStrings = append(valueStrings, "(?, ?, ?)")
+			args = append(args, key.MetricName, key.AttributeName, key.AttributeValue)
+		}
+
+		query := fmt.Sprintf(`
+			SELECT
+				m.metric_name,
+				m.attr_name,
+				m.attr_string_value,
+				min(m.last_reported_unix_milli) AS first_seen
+			FROM %s.%s AS m
+			WHERE (m.metric_name, m.attr_name, m.attr_string_value) IN (%s)
+			GROUP BY m.metric_name, m.attr_name, m.attr_string_value
+			ORDER BY first_seen`,
+			signozMetricDBName, r.metadataTable, strings.Join(valueStrings, ", "))
+
+		valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
+		rows, err := r.db.Query(valueCtx, query, args...)
+		if err != nil {
+			zap.L().Error("Error querying metadata for first_seen", zap.Error(err))
+			return nil, &model.ApiError{Typ: "ClickhouseErr", Err: fmt.Errorf("error querying metadata for first_seen: %v", err)}
+		}
+
+		for rows.Next() {
+			var metricName, attrName, attrValue string
+			var firstSeen int64
+			if err := rows.Scan(&metricName, &attrName, &attrValue, &firstSeen); err != nil {
+				rows.Close()
+				return nil, &model.ApiError{Typ: "ClickhouseErr", Err: fmt.Errorf("error scanning metadata first_seen result: %v", err)}
+			}
+			result[model.MetricMetadataLookupKey{
+				MetricName:     metricName,
+				AttributeName:  attrName,
+				AttributeValue: attrValue,
+			}] = firstSeen
+		}
+
+		if err := rows.Err(); err != nil {
+			rows.Close()
+			return nil, &model.ApiError{Typ: "ClickhouseErr", Err: fmt.Errorf("error iterating metadata first_seen results: %v", err)}
+		}
+		rows.Close()
+	}
+
+	return result, nil
+}
+
 func (r *ClickHouseReader) SearchTraces(ctx context.Context, params *model.SearchTracesParams) (*[]model.SearchSpansResult, error) {
 	searchSpansResult := []model.SearchSpansResult{
 		{
```
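The chunking loop in `GetFirstSeenFromMetricMetadata` windows the lookup keys so that no generated query carries more than 300 IN-clause tuples. A minimal standalone sketch of the same windowing arithmetic (the 650-key count and the `int` stand-in slice are made up for illustration):

```go
package main

import "fmt"

func main() {
	// Mirrors the chunking above: 650 lookup keys with a chunk size
	// of 300 produce batches of 300, 300 and 50 tuples.
	const chunkSize = 300
	keys := make([]int, 650) // stand-in for []model.MetricMetadataLookupKey

	for i := 0; i < len(keys); i += chunkSize {
		end := i + chunkSize
		if end > len(keys) {
			end = len(keys)
		}
		fmt.Printf("batch [%d:%d] -> %d tuples\n", i, end, end-i)
	}
}
```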
```diff
@@ -108,6 +108,7 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz) (*Server, error) {
 		signoz.Modules.OrgGetter,
 		signoz.Querier,
 		signoz.Instrumentation.Logger(),
+		signoz.QueryParser,
 	)
 	if err != nil {
 		return nil, err
@@ -335,6 +336,7 @@ func makeRulesManager(
 	orgGetter organization.Getter,
 	querier querier.Querier,
 	logger *slog.Logger,
+	queryParser queryparser.QueryParser,
 ) (*rules.Manager, error) {
 	ruleStore := sqlrulestore.NewRuleStore(sqlstore)
 	maintenanceStore := sqlrulestore.NewMaintenanceStore(sqlstore)
@@ -354,6 +356,7 @@ func makeRulesManager(
 		RuleStore:        ruleStore,
 		MaintenanceStore: maintenanceStore,
 		SqlStore:         sqlstore,
+		QueryParser:      queryParser,
 	}

 	// create Manager
```
```diff
@@ -81,6 +81,7 @@ type Reader interface {
 	CheckClickHouse(ctx context.Context) error

 	GetMetricMetadata(context.Context, valuer.UUID, string, string) (*v3.MetricMetadataResponse, error)
+	GetFirstSeenFromMetricMetadata(ctx context.Context, lookupKeys []model.MetricMetadataLookupKey) (map[model.MetricMetadataLookupKey]int64, error)

 	AddRuleStateHistory(ctx context.Context, ruleStateHistory []model.RuleStateHistory) error
 	GetOverallStateTransitions(ctx context.Context, ruleID string, params *model.QueryRuleStateHistory) ([]model.ReleStateItem, error)
```
```diff
@@ -516,3 +516,9 @@ type LogsAggregateParams struct {
 	Function    string `json:"function"`
 	StepSeconds int    `json:"step"`
 }
+
+type MetricMetadataLookupKey struct {
+	MetricName     string
+	AttributeName  string
+	AttributeValue string
+}
```
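Because all three fields are comparable strings, the struct works directly as a Go map key, which is exactly how `GetFirstSeenFromMetricMetadata` keys its results. A minimal sketch (the key values are invented; the local type mirrors the struct added above):

```go
package main

import "fmt"

// Mirrors model.MetricMetadataLookupKey: all fields are comparable,
// so the struct itself is a valid map key.
type MetricMetadataLookupKey struct {
	MetricName     string
	AttributeName  string
	AttributeValue string
}

func main() {
	firstSeen := map[MetricMetadataLookupKey]int64{
		{MetricName: "request_total", AttributeName: "service_name", AttributeValue: "checkout"}: 1_700_000_000_000,
	}
	key := MetricMetadataLookupKey{"request_total", "service_name", "checkout"}
	fmt.Println(firstSeen[key]) // 1700000000000
}
```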
```diff
@@ -13,6 +13,7 @@ import (
 	"github.com/SigNoz/signoz/pkg/query-service/model"
 	v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
 	qslabels "github.com/SigNoz/signoz/pkg/query-service/utils/labels"
+	"github.com/SigNoz/signoz/pkg/queryparser"
 	"github.com/SigNoz/signoz/pkg/sqlstore"
 	ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
 	"github.com/SigNoz/signoz/pkg/valuer"
@@ -88,6 +89,11 @@ type BaseRule struct {
 	sqlstore sqlstore.SQLStore

 	evaluation ruletypes.Evaluation

+	// newGroupEvalDelay is the grace period for new alert groups
+	newGroupEvalDelay *time.Duration
+
+	queryParser queryparser.QueryParser
 }

 type RuleOption func(*BaseRule)
@@ -122,6 +128,12 @@ func WithSQLStore(sqlstore sqlstore.SQLStore) RuleOption {
 	}
 }

+func WithQueryParser(queryParser queryparser.QueryParser) RuleOption {
+	return func(r *BaseRule) {
+		r.queryParser = queryParser
+	}
+}
+
 func NewBaseRule(id string, orgID valuer.UUID, p *ruletypes.PostableRule, reader interfaces.Reader, opts ...RuleOption) (*BaseRule, error) {
 	if p.RuleCondition == nil || !p.RuleCondition.IsValid() {
 		return nil, fmt.Errorf("invalid rule condition")
@@ -154,6 +166,12 @@ func NewBaseRule(id string, orgID valuer.UUID, p *ruletypes.PostableRule, reader
 		evaluation: evaluation,
 	}

+	// Store newGroupEvalDelay and groupBy keys from NotificationSettings
+	if p.NotificationSettings != nil && p.NotificationSettings.NewGroupEvalDelay != nil {
+		newGroupEvalDelay := time.Duration(*p.NotificationSettings.NewGroupEvalDelay)
+		baseRule.newGroupEvalDelay = &newGroupEvalDelay
+	}
+
 	if baseRule.evalWindow == 0 {
 		baseRule.evalWindow = 5 * time.Minute
 	}
@@ -528,3 +546,136 @@ func (r *BaseRule) PopulateTemporality(ctx context.Context, orgID valuer.UUID, q
 	}
 	return nil
 }

+// ShouldSkipNewGroups returns true if new group filtering should be applied
+func (r *BaseRule) ShouldSkipNewGroups() bool {
+	return r.newGroupEvalDelay != nil && *r.newGroupEvalDelay > 0
+}
+
+// extractMetricAndGroupBys extracts metric names and groupBy keys from the rule's query.
+// TODO: implement caching for query parsing results to avoid re-parsing the query + cache invalidation
+func (r *BaseRule) extractMetricAndGroupBys(ctx context.Context) ([]string, []string, error) {
+	var metricNames []string
+	var groupedFields []string
+
+	result, err := r.queryParser.AnalyzeCompositeQuery(ctx, r.ruleCondition.CompositeQuery)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	metricNames = result.MetricNames
+	for _, col := range result.GroupByColumns {
+		groupedFields = append(groupedFields, col.OriginField)
+	}
+
+	return metricNames, groupedFields, nil
+}
+
+// FilterNewSeries filters out items that are too new based on metadata first_seen timestamps.
+// Returns the filtered series and the number of series that were skipped.
+func (r *BaseRule) FilterNewSeries(ctx context.Context, ts time.Time, series ruletypes.LabelledCollection) (ruletypes.LabelledCollection, int, error) {
+	// Extract metric names and groupBy keys
+	metricNames, groupedFields, err := r.extractMetricAndGroupBys(ctx)
+	if err != nil {
+		return nil, 0, err
+	}
+
+	if len(metricNames) == 0 || len(groupedFields) == 0 {
+		// No metrics or groupBy keys, nothing to filter (non-ideal case, return early)
+		return series, 0, nil
+	}
+
+	// Build lookup keys from series
+	lookupKeys := make([]model.MetricMetadataLookupKey, 0)
+	seriesIdxToLookupKeys := make(map[int][]model.MetricMetadataLookupKey) // series index -> lookup keys
+
+	for i := 0; i < series.Len(); i++ {
+		labels := series.GetItem(i).GetLabels()
+		metricLabelMap := make(map[string]string)
+		for _, lbl := range labels {
+			metricLabelMap[lbl.Name] = lbl.Value
+		}
+
+		// Collect groupBy attribute-value pairs for this series
+		seriesKeys := make([]model.MetricMetadataLookupKey, 0)
+
+		for _, metricName := range metricNames {
+			for _, groupByKey := range groupedFields {
+				if attrValue, ok := metricLabelMap[groupByKey]; ok {
+					lookupKey := model.MetricMetadataLookupKey{
+						MetricName:     metricName,
+						AttributeName:  groupByKey,
+						AttributeValue: attrValue,
+					}
+					lookupKeys = append(lookupKeys, lookupKey)
+					seriesKeys = append(seriesKeys, lookupKey)
+				}
+			}
+		}
+
+		if len(seriesKeys) > 0 {
+			seriesIdxToLookupKeys[i] = seriesKeys
+		}
+	}
+
+	if len(lookupKeys) == 0 {
+		// No lookup keys to query, return original series
+		return series, 0, nil
+	}
+
+	// Query metadata for first_seen timestamps
+	firstSeenMap, err := r.reader.GetFirstSeenFromMetricMetadata(ctx, lookupKeys)
+	if err != nil {
+		return nil, 0, err
+	}
+
+	// Filter series based on first_seen + delay
+	preservedIndices := make([]int, 0)
+	skippedCount := 0
+	evalTimeMs := ts.UnixMilli()
+	newGroupEvalDelayMs := r.newGroupEvalDelay.Milliseconds()
+
+	for i := 0; i < series.Len(); i++ {
+		seriesKeys, ok := seriesIdxToLookupKeys[i]
+		if !ok {
+			// No matching labels used in groupBy from this series, include it
+			preservedIndices = append(preservedIndices, i)
+			continue
+		}
+
+		// Find the maximum first_seen across all groupBy attributes for this series:
+		// if the latest is old enough we're good; if the latest is new we need to skip it
+		maxFirstSeen := int64(0)
+		foundAny := false
+		for _, lookupKey := range seriesKeys {
+			if firstSeen, exists := firstSeenMap[lookupKey]; exists {
+				foundAny = true
+				if firstSeen > maxFirstSeen {
+					maxFirstSeen = firstSeen
+				}
+			}
+		}
+
+		if !foundAny {
+			// No metadata found - treat as new, skip it
+			skippedCount++
+			continue
+		}
+
+		// Check if first_seen + delay has passed
+		if maxFirstSeen+newGroupEvalDelayMs > evalTimeMs {
+			// Still within grace period, skip this series
+			skippedCount++
+			continue
+		}
+
+		// Old enough, include it
+		preservedIndices = append(preservedIndices, i)
+	}
+
+	if r.logger != nil && skippedCount > 0 {
+		r.logger.InfoContext(ctx, "Filtered new series", "rule_name", r.Name(), "skipped_count", skippedCount, "total_count", series.Len(), "delay_ms", newGroupEvalDelayMs)
+	}
+
+	return series.Filter(preservedIndices), skippedCount, nil
+}
```
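To make the grace-period arithmetic in `FilterNewSeries` concrete: with a 30-minute `newGroupEvalDelay`, a series whose newest groupBy value was first seen 5 minutes before the evaluation timestamp is still inside the grace period and is skipped, while one first seen 2 hours earlier is kept. A self-contained sketch of the same check (timestamps are arbitrary):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	evalTime := time.Unix(1_700_000_000, 0)
	delay := 30 * time.Minute

	evalTimeMs := evalTime.UnixMilli()
	delayMs := delay.Milliseconds()

	// first_seen 5 minutes ago: maxFirstSeen + delay > evalTime,
	// so the series would be skipped.
	recent := evalTime.Add(-5 * time.Minute).UnixMilli()
	fmt.Println("skip recent:", recent+delayMs > evalTimeMs) // true

	// first_seen 2 hours ago: old enough, so the series is kept.
	old := evalTime.Add(-2 * time.Hour).UnixMilli()
	fmt.Println("skip old:", old+delayMs > evalTimeMs) // false
}
```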
```diff
@@ -1,12 +1,20 @@
 package rules

 import (
 	"context"
 	"testing"
 	"time"

 	"github.com/stretchr/testify/require"

 	"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
 	"github.com/SigNoz/signoz/pkg/query-service/interfaces"
 	"github.com/SigNoz/signoz/pkg/query-service/model"
 	v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
 	"github.com/SigNoz/signoz/pkg/query-service/utils/labels"
 	qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
 	ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
 	"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
 )

 func TestBaseRule_RequireMinPoints(t *testing.T) {
@@ -81,3 +89,80 @@ func TestBaseRule_RequireMinPoints(t *testing.T) {
 		})
 	}
 }
+
+func TestBaseRule_FilterNewSeries(t *testing.T) {
+	logger := instrumentationtest.New().Logger()
+	ctx := context.Background()
+	delay := 30 * time.Minute
+	evalTime := time.Unix(1_700_000_000, 0)
+
+	builderSpec := qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
+		Name:   "A",
+		Signal: telemetrytypes.SignalMetrics,
+		GroupBy: []qbtypes.GroupByKey{
+			{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service_name"}},
+			{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "env"}},
+		},
+		Aggregations: []qbtypes.MetricAggregation{
+			{MetricName: "request_total"},
+		},
+	}
+
+	firstSeen := map[model.MetricMetadataLookupKey]int64{
+		{MetricName: "request_total", AttributeName: "service_name", AttributeValue: "svc-old"}: evalTime.Add(-2 * delay).UnixMilli(),
+		{MetricName: "request_total", AttributeName: "env", AttributeValue: "prod"}:             evalTime.Add(-2 * delay).UnixMilli(),
+		{MetricName: "request_total", AttributeName: "service_name", AttributeValue: "svc-new"}: evalTime.Add(-5 * time.Minute).UnixMilli(),
+	}
+
+	reader := &mockReader{response: firstSeen}
+	baseRule := &BaseRule{
+		ruleCondition: &ruletypes.RuleCondition{
+			CompositeQuery: &v3.CompositeQuery{
+				QueryType: v3.QueryTypeBuilder,
+				Queries: []qbtypes.QueryEnvelope{{
+					Type: qbtypes.QueryTypeBuilder,
+					Spec: builderSpec,
+				}},
+			},
+		},
+		newGroupEvalDelay: &delay,
+		reader:            reader,
+		logger:            logger,
+	}
+
+	vector := ruletypes.Vector{
+		{Metric: labels.Labels{{Name: labels.MetricNameLabel, Value: "request_total"}, {Name: "service_name", Value: "svc-old"}, {Name: "env", Value: "prod"}}},
+		{Metric: labels.Labels{{Name: labels.MetricNameLabel, Value: "request_total"}, {Name: "service_name", Value: "svc-new"}, {Name: "env", Value: "prod"}}},
+		{Metric: labels.Labels{{Name: labels.MetricNameLabel, Value: "request_total"}, {Name: "service_name", Value: "svc-missing"}, {Name: "env", Value: "stage"}}},
+		{Metric: labels.Labels{{Name: labels.MetricNameLabel, Value: "request_total"}, {Name: "status", Value: "200"}}},
+	}
+
+	collection := ruletypes.NewVectorLabelledCollection(vector)
+	filtered, skipped, err := baseRule.FilterNewSeries(ctx, evalTime, collection)
+	require.NoError(t, err)
+	require.Equal(t, 2, skipped)
+
+	filteredVector := filtered.(*ruletypes.VectorLabelledCollection).Vector()
+	require.Len(t, filteredVector, 2)
+	services := make([]string, 0, len(filteredVector))
+	for _, sample := range filteredVector {
+		services = append(services, sample.Metric.Get("service_name"))
+	}
+	require.ElementsMatch(t, []string{"svc-old", ""}, services)
+}
+
+type mockReader struct {
+	interfaces.Reader
+	response map[model.MetricMetadataLookupKey]int64
+	err      error
+	calls    [][]model.MetricMetadataLookupKey
+}
+
+func (m *mockReader) GetFirstSeenFromMetricMetadata(_ context.Context, lookupKeys []model.MetricMetadataLookupKey) (map[model.MetricMetadataLookupKey]int64, error) {
+	keysCopy := append([]model.MetricMetadataLookupKey(nil), lookupKeys...)
+	m.calls = append(m.calls, keysCopy)
+	if m.err != nil {
+		return nil, m.err
+	}
+	return m.response, nil
+}
```
```diff
@@ -11,6 +11,7 @@ import (
 	"time"

 	"github.com/SigNoz/signoz/pkg/query-service/utils/labels"
+	"github.com/SigNoz/signoz/pkg/queryparser"

 	"go.uber.org/zap"

@@ -103,6 +104,7 @@ type ManagerOptions struct {
 	RuleStore        ruletypes.RuleStore
 	MaintenanceStore ruletypes.MaintenanceStore
 	SqlStore         sqlstore.SQLStore
+	QueryParser      queryparser.QueryParser
 }

 // The Manager manages recording and alerting rules.
@@ -125,6 +127,8 @@ type Manager struct {
 	alertmanager alertmanager.Alertmanager
 	sqlstore     sqlstore.SQLStore
 	orgGetter    organization.Getter
+	// queryParser is used for parsing queries for rules
+	queryParser queryparser.QueryParser
 }

 func defaultOptions(o *ManagerOptions) *ManagerOptions {
@@ -166,6 +170,7 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
 			opts.SLogger,
 			WithEvalDelay(opts.ManagerOpts.EvalDelay),
 			WithSQLStore(opts.SQLStore),
+			WithQueryParser(opts.ManagerOpts.QueryParser),
 		)

 		if err != nil {
@@ -188,6 +193,7 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
 			opts.Reader,
 			opts.ManagerOpts.Prometheus,
 			WithSQLStore(opts.SQLStore),
+			WithQueryParser(opts.ManagerOpts.QueryParser),
 		)

 		if err != nil {
@@ -226,6 +232,7 @@ func NewManager(o *ManagerOptions) (*Manager, error) {
 		alertmanager: o.Alertmanager,
 		orgGetter:    o.OrgGetter,
 		sqlstore:     o.SqlStore,
+		queryParser:  o.QueryParser,
 	}

 	zap.L().Debug("Manager created successfully with NotificationGroup")
```
```diff
@@ -135,6 +135,17 @@ func (r *PromRule) buildAndRunQuery(ctx context.Context, ts time.Time) (ruletype
 		return nil, err
 	}

+	// Filter out new series if newGroupEvalDelay is configured
+	if r.ShouldSkipNewGroups() {
+		collection := ruletypes.NewPromMatrixLabelledCollection(res)
+		filteredCollection, _, filterErr := r.BaseRule.FilterNewSeries(ctx, ts, collection)
+		if filterErr != nil {
+			r.logger.ErrorContext(ctx, "Error filtering new series, ", "error", filterErr, "rule_name", r.Name())
+			return nil, filterErr
+		}
+		res = filteredCollection.(*ruletypes.PromMatrixLabelledCollection).Matrix()
+	}
+
 	var resultVector ruletypes.Vector
 	for _, series := range res {
 		resultSeries, err := r.Threshold.Eval(toCommonSeries(series), r.Unit(), ruletypes.EvalData{
```
```diff
@@ -600,6 +600,17 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (interface{}, er
 		return nil, err
 	}

+	// Filter out new series if newGroupEvalDelay is configured
+	if r.ShouldSkipNewGroups() {
+		collection := ruletypes.NewVectorLabelledCollection(res)
+		filteredCollection, _, filterErr := r.BaseRule.FilterNewSeries(ctx, ts, collection)
+		if filterErr != nil {
+			r.logger.ErrorContext(ctx, "Error filtering new series, ", "error", filterErr, "rule_name", r.Name())
+			return nil, filterErr
+		}
+		res = filteredCollection.(*ruletypes.VectorLabelledCollection).Vector()
+	}
+
 	r.mtx.Lock()
 	defer r.mtx.Unlock()
```
```diff
@@ -3,6 +3,7 @@ package queryparser
 import (
 	"context"

+	v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
 	"github.com/SigNoz/signoz/pkg/queryparser/queryfilterextractor"
 	"github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
 )
@@ -11,4 +12,6 @@ import (
 type QueryParser interface {
 	// AnalyzeQueryFilter extracts filter conditions from a given query string.
 	AnalyzeQueryFilter(ctx context.Context, queryType querybuildertypesv5.QueryType, query string) (*queryfilterextractor.FilterResult, error)
+	// AnalyzeCompositeQuery extracts filter conditions from a composite query.
+	AnalyzeCompositeQuery(ctx context.Context, compositeQuery *v3.CompositeQuery) (*queryfilterextractor.FilterResult, error)
 }
```
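For orientation, a sketch of how a caller might exercise this interface; it reuses the `instrumentationtest` wiring seen in the tests later in this diff, and the PromQL string and the expected output noted in the comment are illustrative assumptions:

```go
package main

import (
	"context"
	"fmt"

	"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
	"github.com/SigNoz/signoz/pkg/queryparser"
	qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
)

func main() {
	// Construct the parser the same way the tests below do.
	parser := queryparser.New(instrumentationtest.New().ToProviderSettings())

	res, err := parser.AnalyzeQueryFilter(
		context.Background(),
		qbtypes.QueryTypePromQL,
		`sum by (pod, region) (rate(http_requests_total[5m]))`,
	)
	if err != nil {
		panic(err)
	}
	// Per the extractor behavior exercised in the tests, this should
	// yield MetricNames=[http_requests_total] and GroupByColumns pod, region.
	fmt.Println(res.MetricNames, res.GroupByColumns)
}
```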
```diff
@@ -1,40 +0,0 @@
-package queryparser
-
-import (
-	"context"
-
-	"github.com/SigNoz/signoz/pkg/errors"
-	"github.com/SigNoz/signoz/pkg/factory"
-	"github.com/SigNoz/signoz/pkg/queryparser/queryfilterextractor"
-	"github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
-)
-
-type queryParserImpl struct {
-	settings factory.ProviderSettings
-}
-
-// New creates a new implementation of the QueryParser service.
-func New(settings factory.ProviderSettings) QueryParser {
-	return &queryParserImpl{
-		settings: settings,
-	}
-}
-
-func (p *queryParserImpl) AnalyzeQueryFilter(ctx context.Context, queryType querybuildertypesv5.QueryType, query string) (*queryfilterextractor.FilterResult, error) {
-	var extractorType queryfilterextractor.ExtractorType
-	switch queryType {
-	case querybuildertypesv5.QueryTypePromQL:
-		extractorType = queryfilterextractor.ExtractorTypePromQL
-	case querybuildertypesv5.QueryTypeClickHouseSQL:
-		extractorType = queryfilterextractor.ExtractorTypeClickHouseSQL
-	default:
-		return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported queryType: %s. Supported values are '%s' and '%s'", queryType, querybuildertypesv5.QueryTypePromQL, querybuildertypesv5.QueryTypeClickHouseSQL)
-	}
-
-	// Create extractor
-	extractor, err := queryfilterextractor.NewExtractor(extractorType)
-	if err != nil {
-		return nil, err
-	}
-	return extractor.Extract(query)
-}
```
pkg/queryparser/queryparserimpl.go (new file, 101 lines)

```diff
@@ -0,0 +1,101 @@
+package queryparser
+
+import (
+	"context"
+
+	"github.com/SigNoz/signoz/pkg/errors"
+	"github.com/SigNoz/signoz/pkg/factory"
+	v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
+	"github.com/SigNoz/signoz/pkg/queryparser/queryfilterextractor"
+	qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
+)
+
+type queryParserImpl struct {
+	settings factory.ProviderSettings
+}
+
+// New creates a new implementation of the QueryParser service.
+func New(settings factory.ProviderSettings) QueryParser {
+	return &queryParserImpl{
+		settings: settings,
+	}
+}
+
+func (p *queryParserImpl) AnalyzeQueryFilter(ctx context.Context, queryType qbtypes.QueryType, query string) (*queryfilterextractor.FilterResult, error) {
+	var extractorType queryfilterextractor.ExtractorType
+	switch queryType {
+	case qbtypes.QueryTypePromQL:
+		extractorType = queryfilterextractor.ExtractorTypePromQL
+	case qbtypes.QueryTypeClickHouseSQL:
+		extractorType = queryfilterextractor.ExtractorTypeClickHouseSQL
+	default:
+		return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported queryType: %s. Supported values are '%s' and '%s'", queryType, qbtypes.QueryTypePromQL, qbtypes.QueryTypeClickHouseSQL)
+	}
+
+	// Create extractor
+	extractor, err := queryfilterextractor.NewExtractor(extractorType)
+	if err != nil {
+		return nil, err
+	}
+	return extractor.Extract(query)
+}
+
+func (p *queryParserImpl) AnalyzeCompositeQuery(ctx context.Context, compositeQuery *v3.CompositeQuery) (*queryfilterextractor.FilterResult, error) {
+	var result = &queryfilterextractor.FilterResult{
+		MetricNames:    []string{},
+		GroupByColumns: []queryfilterextractor.ColumnInfo{},
+	}
+
+	for _, query := range compositeQuery.Queries {
+		switch query.Type {
+		case qbtypes.QueryTypeBuilder:
+			switch spec := query.Spec.(type) {
+			case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
+				// extract group by fields
+				for _, groupBy := range spec.GroupBy {
+					if groupBy.Name != "" {
+						result.GroupByColumns = append(result.GroupByColumns, queryfilterextractor.ColumnInfo{Name: groupBy.Name, OriginExpr: groupBy.Name, OriginField: groupBy.Name})
+					}
+				}
+				// extract metric names
+				for _, aggregation := range spec.Aggregations {
+					if aggregation.MetricName != "" {
+						result.MetricNames = append(result.MetricNames, aggregation.MetricName)
+					}
+				}
+			default:
+				// TODO: add support for Traces and Logs Aggregation types
+				if p.settings.Logger != nil {
+					p.settings.Logger.WarnContext(ctx, "unsupported QueryBuilderQuery type: %T", spec)
+				}
+				continue
+			}
+		case qbtypes.QueryTypePromQL:
+			spec, ok := query.Spec.(qbtypes.PromQuery)
+			if !ok || spec.Query == "" {
+				continue
+			}
+			res, err := p.AnalyzeQueryFilter(ctx, qbtypes.QueryTypePromQL, spec.Query)
+			if err != nil {
+				return nil, err
+			}
+			result.MetricNames = append(result.MetricNames, res.MetricNames...)
+			result.GroupByColumns = append(result.GroupByColumns, res.GroupByColumns...)
+		case qbtypes.QueryTypeClickHouseSQL:
+			spec, ok := query.Spec.(qbtypes.ClickHouseQuery)
+			if !ok || spec.Query == "" {
+				continue
+			}
+			res, err := p.AnalyzeQueryFilter(ctx, qbtypes.QueryTypeClickHouseSQL, spec.Query)
+			if err != nil {
+				return nil, err
+			}
+			result.MetricNames = append(result.MetricNames, res.MetricNames...)
+			result.GroupByColumns = append(result.GroupByColumns, res.GroupByColumns...)
+		default:
+			return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported query type: %s", query.Type)
+		}
+	}
+
+	return result, nil
+}
```
pkg/queryparser/queryparserimpl_test.go (new file, 112 lines)

```diff
@@ -0,0 +1,112 @@
+package queryparser
+
+import (
+	"context"
+	"encoding/json"
+	"testing"
+
+	"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
+	v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
+	"github.com/SigNoz/signoz/pkg/queryparser/queryfilterextractor"
+	"github.com/stretchr/testify/require"
+)
+
+func TestBaseRule_ExtractMetricAndGroupBys(t *testing.T) {
+	ctx := context.Background()
+
+	tests := []struct {
+		name        string
+		payload     string
+		wantMetrics []string
+		wantGroupBy []queryfilterextractor.ColumnInfo
+	}{
+		{
+			name:        "builder multiple grouping",
+			payload:     builderQueryWithGrouping,
+			wantMetrics: []string{"test_metric_cardinality", "cpu_usage_total"},
+			wantGroupBy: []queryfilterextractor.ColumnInfo{
+				{Name: "service_name", Alias: "", OriginExpr: "service_name", OriginField: "service_name"},
+				{Name: "env", Alias: "", OriginExpr: "env", OriginField: "env"},
+			},
+		},
+		{
+			name:        "builder single grouping",
+			payload:     builderQuerySingleGrouping,
+			wantMetrics: []string{"latency_p50"},
+			wantGroupBy: []queryfilterextractor.ColumnInfo{
+				{Name: "namespace", Alias: "", OriginExpr: "namespace", OriginField: "namespace"},
+			},
+		},
+		{
+			name:        "builder no grouping",
+			payload:     builderQueryNoGrouping,
+			wantMetrics: []string{"disk_usage_total"},
+			wantGroupBy: []queryfilterextractor.ColumnInfo{},
+		},
+		{
+			name:        "promql multiple grouping",
+			payload:     promQueryWithGrouping,
+			wantMetrics: []string{"http_requests_total"},
+			wantGroupBy: []queryfilterextractor.ColumnInfo{
+				{Name: "pod", Alias: "", OriginExpr: "pod", OriginField: "pod"},
+				{Name: "region", Alias: "", OriginExpr: "region", OriginField: "region"},
+			},
+		},
+		{
+			name:        "promql single grouping",
+			payload:     promQuerySingleGrouping,
+			wantMetrics: []string{"cpu_usage_seconds_total"},
+			wantGroupBy: []queryfilterextractor.ColumnInfo{
+				{Name: "env", Alias: "", OriginExpr: "env", OriginField: "env"},
+			},
+		},
+		{
+			name:        "promql no grouping",
+			payload:     promQueryNoGrouping,
+			wantMetrics: []string{"node_cpu_seconds_total"},
+			wantGroupBy: []queryfilterextractor.ColumnInfo{},
+		},
+		{
+			name:        "clickhouse multiple grouping",
+			payload:     clickHouseQueryWithGrouping,
+			wantMetrics: []string{"cpu"},
+			wantGroupBy: []queryfilterextractor.ColumnInfo{
+				{Name: "region", Alias: "r", OriginExpr: "region", OriginField: "region"},
+				{Name: "zone", Alias: "", OriginExpr: "zone", OriginField: "zone"},
+			},
+		},
+		{
+			name:        "clickhouse single grouping",
+			payload:     clickHouseQuerySingleGrouping,
+			wantMetrics: []string{"cpu_usage"},
+			wantGroupBy: []queryfilterextractor.ColumnInfo{
+				{Name: "region", Alias: "r", OriginExpr: "region", OriginField: "region"},
+			},
+		},
+		{
+			name:        "clickhouse no grouping",
+			payload:     clickHouseQueryNoGrouping,
+			wantMetrics: []string{"memory_usage"},
+			wantGroupBy: []queryfilterextractor.ColumnInfo{},
+		},
+	}
+
+	queryParser := New(instrumentationtest.New().ToProviderSettings())
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			cq := mustCompositeQuery(t, tt.payload)
+			res, err := queryParser.AnalyzeCompositeQuery(ctx, cq)
+			require.NoError(t, err)
+			require.ElementsMatch(t, tt.wantMetrics, res.MetricNames)
+			require.ElementsMatch(t, tt.wantGroupBy, res.GroupByColumns)
+		})
+	}
+}
+
+func mustCompositeQuery(t *testing.T, payload string) *v3.CompositeQuery {
+	t.Helper()
+	var compositeQuery v3.CompositeQuery
+	require.NoError(t, json.Unmarshal([]byte(payload), &compositeQuery))
+	return &compositeQuery
+}
```
pkg/queryparser/queryparserimpl_testdata.go (new file, 184 lines)

```diff
@@ -0,0 +1,184 @@
+package queryparser
+
+var (
+	builderQueryWithGrouping = `
+	{
+		"queryType":"builder",
+		"panelType":"graph",
+		"queries":[
+			{
+				"type":"builder_query",
+				"spec":{
+					"name":"A",
+					"signal":"metrics",
+					"stepInterval":null,
+					"disabled":false,
+					"filter":{"expression":""},
+					"groupBy":[
+						{"name":"service_name","fieldDataType":"","fieldContext":""},
+						{"name":"env","fieldDataType":"","fieldContext":""}
+					],
+					"aggregations":[
+						{"metricName":"test_metric_cardinality","timeAggregation":"count","spaceAggregation":"sum"},
+						{"metricName":"cpu_usage_total","timeAggregation":"avg","spaceAggregation":"avg"}
+					]
+				}
+			}
+		]
+	}
+	`
+
+	builderQuerySingleGrouping = `
+	{
+		"queryType":"builder",
+		"panelType":"graph",
+		"queries":[
+			{
+				"type":"builder_query",
+				"spec":{
+					"name":"B",
+					"signal":"metrics",
+					"stepInterval":null,
+					"disabled":false,
+					"groupBy":[
+						{"name":"namespace","fieldDataType":"","fieldContext":""}
+					],
+					"aggregations":[
+						{"metricName":"latency_p50","timeAggregation":"avg","spaceAggregation":"max"}
+					]
+				}
+			}
+		]
+	}
+	`
+
+	builderQueryNoGrouping = `
+	{
+		"queryType":"builder",
+		"panelType":"graph",
+		"queries":[
+			{
+				"type":"builder_query",
+				"spec":{
+					"name":"C",
+					"signal":"metrics",
+					"stepInterval":null,
+					"disabled":false,
+					"groupBy":[],
+					"aggregations":[
+						{"metricName":"disk_usage_total","timeAggregation":"sum","spaceAggregation":"sum"}
+					]
+				}
+			}
+		]
+	}
+	`
+
+	promQueryWithGrouping = `
+	{
+		"queries":[
+			{
+				"type":"promql",
+				"spec":{
+					"name":"P1",
+					"query":"sum by (pod,region) (rate(http_requests_total[5m]))",
+					"disabled":false,
+					"step":0,
+					"stats":false
+				}
+			}
+		],
+		"panelType":"graph",
+		"queryType":"promql"
+	}
+	`
+
+	promQuerySingleGrouping = `
+	{
+		"queries":[
+			{
+				"type":"promql",
+				"spec":{
+					"name":"P2",
+					"query":"sum by (env)(rate(cpu_usage_seconds_total{job=\"api\"}[5m]))",
+					"disabled":false,
+					"step":0,
+					"stats":false
+				}
+			}
+		],
+		"panelType":"graph",
+		"queryType":"promql"
+	}
+	`
+
+	promQueryNoGrouping = `
+	{
+		"queries":[
+			{
+				"type":"promql",
+				"spec":{
+					"name":"P3",
+					"query":"rate(node_cpu_seconds_total[1m])",
+					"disabled":false,
+					"step":0,
+					"stats":false
+				}
+			}
+		],
+		"panelType":"graph",
+		"queryType":"promql"
+	}
+	`
+
+	clickHouseQueryWithGrouping = `
+	{
+		"queryType":"clickhouse_sql",
+		"panelType":"graph",
+		"queries":[
+			{
+				"type":"clickhouse_sql",
+				"spec":{
+					"name":"CH1",
+					"query":"SELECT region as r, zone FROM metrics WHERE metric_name='cpu' GROUP BY region, zone",
+					"disabled":false
+				}
+			}
+		]
+	}
+	`
+
+	clickHouseQuerySingleGrouping = `
+	{
+		"queryType":"clickhouse_sql",
+		"panelType":"graph",
+		"queries":[
+			{
+				"type":"clickhouse_sql",
+				"spec":{
+					"name":"CH2",
+					"query":"SELECT region as r FROM metrics WHERE metric_name='cpu_usage' GROUP BY region",
+					"disabled":false
+				}
+			}
+		]
+	}
+	`
+
+	clickHouseQueryNoGrouping = `
+	{
+		"queryType":"clickhouse_sql",
+		"panelType":"graph",
+		"queries":[
+			{
+				"type":"clickhouse_sql",
+				"spec":{
+					"name":"CH3",
+					"query":"SELECT * FROM metrics WHERE metric_name = 'memory_usage'",
+					"disabled":false
+				}
+			}
+		]
+	}
+	`
+)
```
```diff
@@ -70,6 +70,8 @@ type NotificationSettings struct {
 	GroupBy   []string `json:"groupBy,omitempty"`
 	Renotify  Renotify `json:"renotify,omitempty"`
 	UsePolicy bool     `json:"usePolicy,omitempty"`
+	// NewGroupEvalDelay is the grace period for new series to be excluded from alerts evaluation
+	NewGroupEvalDelay *Duration `json:"newGroupEvalDelay,omitempty"`
 }

 type Renotify struct {
```
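A hypothetical sketch of enabling the grace period from Go; it assumes `ruletypes.Duration` is convertible to and from `time.Duration`, as the conversion in `NewBaseRule` above implies (treat the exact type definition as an assumption):

```go
package main

import (
	"fmt"
	"time"

	ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
)

func main() {
	// Assumption: ruletypes.Duration wraps time.Duration, as implied by
	// time.Duration(*p.NotificationSettings.NewGroupEvalDelay) in NewBaseRule.
	delay := ruletypes.Duration(30 * time.Minute)
	settings := &ruletypes.NotificationSettings{
		GroupBy:           []string{"service_name", "env"},
		NewGroupEvalDelay: &delay,
	}
	fmt.Println(time.Duration(*settings.NewGroupEvalDelay)) // 30m0s
}
```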
pkg/types/ruletypes/labelled_collection.go (new file, 120 lines)

```diff
@@ -0,0 +1,120 @@
+package ruletypes
+
+import (
+	qslabels "github.com/SigNoz/signoz/pkg/query-service/utils/labels"
+	"github.com/prometheus/prometheus/promql"
+)
+
+// LabelledItem represents an entity that can provide labels
+type LabelledItem interface {
+	// GetLabels returns the labels for this entity
+	GetLabels() qslabels.Labels
+}
+
+// LabelledCollection represents a collection of labelled items.
+// It is used to abstract the underlying collection type
+// and provide a unified interface for working with items which have labels.
+type LabelledCollection interface {
+	// Len returns the number of entities in the collection
+	Len() int
+
+	// GetItem returns the entity at the given index
+	GetItem(index int) LabelledItem
+
+	// Filter returns a new filtered collection containing only items at the given indices
+	Filter(preservedIndices []int) LabelledCollection
+}
+
+// VectorLabelledCollection wraps ruletypes.Vector to implement LabelledCollection
+type VectorLabelledCollection struct {
+	vector Vector
+}
+
+// NewVectorLabelledCollection creates a new VectorLabelledCollection from a ruletypes.Vector
+func NewVectorLabelledCollection(vector Vector) *VectorLabelledCollection {
+	return &VectorLabelledCollection{vector: vector}
+}
+
+// Len returns the number of entities in the vector
+func (vc *VectorLabelledCollection) Len() int {
+	return len(vc.vector)
+}
+
+// GetItem returns the entity at the given index
+func (vc *VectorLabelledCollection) GetItem(index int) LabelledItem {
+	return &VectorLabelledItem{sample: vc.vector[index]}
+}
+
+// Filter returns a new VectorLabelledCollection containing only items at the given indices
+func (vc *VectorLabelledCollection) Filter(indices []int) LabelledCollection {
+	filtered := make(Vector, 0, len(indices))
+	for _, idx := range indices {
+		if idx >= 0 && idx < len(vc.vector) {
+			filtered = append(filtered, vc.vector[idx])
+		}
+	}
+	return NewVectorLabelledCollection(filtered)
+}
+
+func (vc *VectorLabelledCollection) Vector() Vector {
+	return vc.vector
+}
+
+// VectorLabelledItem wraps ruletypes.Sample to implement LabelledItem
+type VectorLabelledItem struct {
+	sample Sample
+}
+
+// GetLabels returns the labels from the sample
+func (vi *VectorLabelledItem) GetLabels() qslabels.Labels {
+	return vi.sample.Metric
+}
+
+// PromMatrixLabelledCollection wraps promql.Matrix to implement LabelledCollection
+type PromMatrixLabelledCollection struct {
+	matrix promql.Matrix
+}
+
+// NewPromMatrixLabelledCollection creates a new PromMatrixLabelledCollection from a promql.Matrix
+func NewPromMatrixLabelledCollection(matrix promql.Matrix) *PromMatrixLabelledCollection {
+	return &PromMatrixLabelledCollection{matrix: matrix}
+}
+
+// Len returns the number of entities in the matrix
+func (pmc *PromMatrixLabelledCollection) Len() int {
+	return len(pmc.matrix)
+}
+
+// GetItem returns the entity at the given index
+func (pmc *PromMatrixLabelledCollection) GetItem(index int) LabelledItem {
+	return &PromSeriesLabelledItem{series: pmc.matrix[index]}
+}
+
+// Filter returns a new PromMatrixLabelledCollection containing only items at the given indices
+func (pmc *PromMatrixLabelledCollection) Filter(indices []int) LabelledCollection {
+	filtered := make(promql.Matrix, 0, len(indices))
+	for _, idx := range indices {
+		if idx >= 0 && idx < len(pmc.matrix) {
+			filtered = append(filtered, pmc.matrix[idx])
+		}
+	}
+	return NewPromMatrixLabelledCollection(filtered)
+}
+
+func (pmc *PromMatrixLabelledCollection) Matrix() promql.Matrix {
+	return pmc.matrix
+}
+
+// PromSeriesLabelledItem wraps promql.Series to implement LabelledItem
+type PromSeriesLabelledItem struct {
+	series promql.Series
+}
+
+// GetLabels returns the labels from the prometheus series
+func (psi *PromSeriesLabelledItem) GetLabels() qslabels.Labels {
+	metricLabels := make(qslabels.Labels, 0, len(psi.series.Metric))
+	for _, lbl := range psi.series.Metric {
+		metricLabels = append(metricLabels, qslabels.Label{Name: lbl.Name, Value: lbl.Value})
+	}
+	return metricLabels
+}
```
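Both wrappers above follow the same adapter shape. As a sketch (not part of this PR), a hypothetical third adapter over a plain slice of label sets would need only the same three methods, which is what keeps `FilterNewSeries` agnostic to the result type:

```go
// Hypothetical adapter, for illustration only: any slice whose elements
// can yield qslabels.Labels can satisfy LabelledCollection the same way.
type labelSetCollection struct {
	sets []qslabels.Labels
}

type labelSetItem struct{ labels qslabels.Labels }

// GetLabels returns the wrapped label set.
func (it *labelSetItem) GetLabels() qslabels.Labels { return it.labels }

// Len returns the number of label sets in the collection.
func (c *labelSetCollection) Len() int { return len(c.sets) }

// GetItem wraps the label set at the given index as a LabelledItem.
func (c *labelSetCollection) GetItem(index int) LabelledItem {
	return &labelSetItem{labels: c.sets[index]}
}

// Filter keeps only the label sets at the given indices, mirroring the
// bounds-checked loops in the two real adapters above.
func (c *labelSetCollection) Filter(indices []int) LabelledCollection {
	filtered := make([]qslabels.Labels, 0, len(indices))
	for _, idx := range indices {
		if idx >= 0 && idx < len(c.sets) {
			filtered = append(filtered, c.sets[idx])
		}
	}
	return &labelSetCollection{sets: filtered}
}
```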