Compare commits

...

65 Commits

Author SHA1 Message Date
Srikanth Chekuri
9e5ea4de9c Merge branch 'main' into feat/exclude_new_series_from_alert 2025-12-15 08:27:00 +05:30
Abhishek Kumar Singh
81e0df09b8 refactor: merge conflict fix 2025-12-03 19:06:18 +05:30
Abhishek Kumar Singh
a522f39b9b Merge branch 'main' into feat/exclude_new_series_from_alert 2025-12-03 19:04:18 +05:30
Abhishek Kumar Singh
affb6eee05 feat: added function in query parser to parse composite query and get filter result 2025-12-02 18:56:19 +05:30
Abhishek Kumar Singh
13a5e9dd24 Merge branch 'feat/groups_in_ch_and_promql_queries' into feat/exclude_new_series_from_alert 2025-12-02 18:02:40 +05:30
Abhishek Kumar Singh
f620767876 refactor: used binding.JSON.BindBody to parse body 2025-12-02 17:58:39 +05:30
Srikanth Chekuri
9fb8b2bb1b Merge branch 'main' into feat/groups_in_ch_and_promql_queries 2025-12-02 17:26:54 +05:30
Abhishek Kumar Singh
30494c9196 chore: updated comments 2025-12-02 16:58:06 +05:30
Abhishek Kumar Singh
cae4cf0777 refactor: use query parser in baseRule 2025-12-02 16:55:58 +05:30
Abhishek Kumar Singh
c9538b0604 Merge branch 'feat/groups_in_ch_and_promql_queries' into feat/exclude_new_series_from_alert 2025-12-02 15:57:15 +05:30
Abhishek Kumar Singh
204cc4e5c5 Merge branch 'main' into feat/groups_in_ch_and_promql_queries 2025-12-02 15:47:30 +05:30
Abhishek Kumar Singh
6dd2ffcb64 refactor: update API handler to use new queryparser package for query parsing 2025-11-27 20:11:16 +05:30
Abhishek Kumar Singh
13c15249c5 Merge branch 'feat/groups_in_ch_and_promql_queries' into feat/exclude_new_series_from_alert 2025-11-27 19:51:27 +05:30
Abhishek Kumar Singh
8419ca7982 Merge branch 'main' into feat/exclude_new_series_from_alert 2025-11-27 19:49:57 +05:30
Abhishek Kumar Singh
6b189b14c6 chore: updated series collection to labelledcollection 2025-11-27 19:45:01 +05:30
Abhishek Kumar Singh
550c49fab0 feat: created queryparser package with APIs 2025-11-27 19:37:41 +05:30
Abhishek Kumar Singh
5b6ff92648 refactor: moved package queryfilterextractor in pkg/queryparser 2025-11-27 19:14:55 +05:30
Abhishek Kumar Singh
45954b38fa Merge branch 'main' into feat/exclude_new_series_from_alert 2025-11-25 12:27:00 +05:30
Abhishek Kumar Singh
ceade6c7d7 refactor: moved query parser APIs to queryfilterextractor package 2025-11-24 14:15:44 +05:30
Srikanth Chekuri
f15c88836c Merge branch 'main' into feat/groups_in_ch_and_promql_queries 2025-11-21 15:26:44 +05:30
Abhishek Kumar Singh
9af45643a9 refactor: created valuer enum for extractor types + exposed alias along with column name for groups 2025-11-21 15:16:03 +05:30
Srikanth Chekuri
d15e974e9f Merge branch 'main' into feat/groups_in_ch_and_promql_queries 2025-11-20 22:59:29 +05:30
Abhishek Kumar Singh
71e752a015 refactor: moved api request and response to types 2025-11-20 20:00:51 +05:30
Abhishek Kumar Singh
3407760585 Merge branch 'feat/queryfilterextractor' into feat/groups_in_ch_and_promql_queries 2025-11-20 19:02:11 +05:30
Abhishek Kumar Singh
58a0e36869 refactor: return relevant errors when parsing query 2025-11-20 19:01:40 +05:30
Abhishek Kumar Singh
5d688eb919 test: updated test case to include CH query error case 2025-11-20 18:55:45 +05:30
Abhishek Kumar Singh
c0f237a7c4 refactor: use render.Error instead of RespondError 2025-11-20 18:12:47 +05:30
Abhishek Kumar Singh
8ce8bc940a Merge branch 'feat/queryfilterextractor' into feat/groups_in_ch_and_promql_queries 2025-11-20 17:42:46 +05:30
Abhishek Kumar Singh
abce05b289 feat: exposed group name from column info 2025-11-20 17:38:00 +05:30
Abhishek Kumar Singh
ccd25c3b67 refactor: removed redundant checks 2025-11-20 17:24:36 +05:30
Abhishek Kumar Singh
ddb98da217 refactor: change function visibility for extractMetricAndGroupBys 2025-11-20 15:56:45 +05:30
Abhishek Kumar Singh
18d63d2e66 fix: close CH rows on reading each chunk 2025-11-20 15:44:07 +05:30
Abhishek Kumar Singh
67c108f021 feat: added new series filter in anomaly rule as well 2025-11-20 15:26:14 +05:30
Abhishek Kumar Singh
02939cafa4 refactor: added GetFirstSeenFromMetricMetadata in clickhouse reader, removed caching for query result 2025-11-20 15:24:14 +05:30
Abhishek Kumar Singh
e62b070c1e fix: created interface for standard series filtration from both prom and threshold rule 2025-11-20 14:35:25 +05:30
Srikanth Chekuri
be0a7d8fd4 Merge branch 'main' into feat/queryfilterextractor 2025-11-20 02:48:43 +05:30
Srikanth Chekuri
419044dc9e Merge branch 'main' into feat/queryfilterextractor 2025-11-19 22:42:12 +05:30
Abhishek Kumar Singh
223465d6d5 Merge branch 'feat/queryfilterextractor' into feat/exclude_new_series_from_alert 2025-11-19 20:56:01 +05:30
Abhishek Kumar Singh
cec99674fa feat: exclude new samples from alert evals 2025-11-19 20:52:59 +05:30
Abhishek Kumar Singh
0ccf58ac7a fix: common behaviour across CH and PromQL originField and originExpr 2025-11-19 20:36:00 +05:30
Abhishek Kumar Singh
b08d636d6a refactor: updated query type check with constants 2025-11-19 20:12:20 +05:30
Abhishek Kumar Singh
f6141bc6c5 feat: added API to extract metric names and groups from CH or PromQL query 2025-11-19 16:48:03 +05:30
Srikanth Chekuri
bfe49f0f1b Merge branch 'main' into feat/queryfilterextractor 2025-11-19 14:21:35 +05:30
Abhishek Kumar Singh
8e8064c5c1 fix: ci lint issues 2025-11-19 13:28:04 +05:30
Abhishek Kumar Singh
4392341467 improv: added comments for CH originparser + some code improv 2025-11-19 12:58:34 +05:30
Abhishek Kumar Singh
521d8e4f4d improv: added more tests for promql and added comments 2025-11-19 12:15:20 +05:30
Abhishek Kumar Singh
b6103f371f Merge branch 'main' into feat/queryfilterextractor 2025-11-18 21:09:45 +05:30
Abhishek Kumar Singh
43283506db Merge branch 'feat/queryfilterextractor_complex' into feat/queryfilterextractor 2025-11-18 15:22:24 +05:30
Abhishek Kumar Singh
694d9958db improv: integrated origin field extraction and updated tests to check for origin fields 2025-11-18 15:03:24 +05:30
Abhishek Kumar Singh
addee4c0a5 feat: added origin field extractor for ch query 2025-11-18 14:36:03 +05:30
Abhishek Kumar Singh
f10cf7ac04 refactor: code organisation 2025-11-17 16:27:17 +05:30
Abhishek Kumar Singh
b336678639 fix: CH test cases 2025-11-17 15:01:32 +05:30
Abhishek Kumar Singh
c438b3444e refactor: removed GroupBy from FilterResult 2025-11-17 14:34:46 +05:30
Abhishek Kumar Singh
b624414507 feat: extract column origin from subquery and join before searching directly 2025-11-17 13:42:47 +05:30
Abhishek Kumar Singh
bde7963444 feat: implemented extractOriginFromSelectItem which will find the given columnName till the very end to return the origin column with given name 2025-11-17 09:00:18 +05:30
Abhishek Kumar Singh
2df93ff217 feat: extract column origin from query and add in column info 2025-11-16 10:20:38 +05:30
Abhishek Kumar Singh
f496a6ecde improv: updated result for queryfilterextractor to return column with alias 2025-11-16 08:58:33 +05:30
Abhishek Kumar Singh
599e230a72 feat: added NewExtractor function for creating extractor 2025-11-13 13:52:32 +05:30
Abhishek Kumar Singh
9a0e32ff3b refactor: removed redundant non nil checks 2025-11-13 13:41:51 +05:30
Abhishek Kumar Singh
5fe2732698 refactor: removed unused extractFromAnyFunction 2025-11-13 13:20:59 +05:30
Abhishek Kumar Singh
4993a44ecc refactor: removed unused cases + added comments 2025-11-13 12:59:35 +05:30
Abhishek Kumar Singh
ebd575a16b chore: comments + remove usage of seen map in extractGroupFromGroupByClause 2025-11-12 19:26:44 +05:30
Abhishek Kumar Singh
666582337e feat: support for CTE in clickhouse queryfilterextractor 2025-11-12 18:58:30 +05:30
Abhishek Kumar Singh
23512ab05c feat: added support for promql in queryfilterextractor 2025-11-10 20:50:42 +05:30
Abhishek Kumar Singh
1423749529 feat: added filter extractor interface and clickhouse impl with tests 2025-11-10 20:05:39 +05:30
19 changed files with 885 additions and 41 deletions

View File

@@ -10,6 +10,7 @@ import (
"slices"
"github.com/SigNoz/signoz/pkg/cache/memorycache"
"github.com/SigNoz/signoz/pkg/queryparser"
"github.com/SigNoz/signoz/pkg/ruler/rulestore/sqlrulestore"
"go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux"
"go.opentelemetry.io/otel/propagation"
@@ -107,6 +108,7 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz) (*Server, error) {
signoz.Modules.OrgGetter,
signoz.Querier,
signoz.Instrumentation.Logger(),
signoz.QueryParser,
)
if err != nil {
@@ -348,7 +350,7 @@ func (s *Server) Stop(ctx context.Context) error {
return nil
}
func makeRulesManager(ch baseint.Reader, cache cache.Cache, alertmanager alertmanager.Alertmanager, sqlstore sqlstore.SQLStore, telemetryStore telemetrystore.TelemetryStore, prometheus prometheus.Prometheus, orgGetter organization.Getter, querier querier.Querier, logger *slog.Logger) (*baserules.Manager, error) {
func makeRulesManager(ch baseint.Reader, cache cache.Cache, alertmanager alertmanager.Alertmanager, sqlstore sqlstore.SQLStore, telemetryStore telemetrystore.TelemetryStore, prometheus prometheus.Prometheus, orgGetter organization.Getter, querier querier.Querier, logger *slog.Logger, queryParser queryparser.QueryParser) (*baserules.Manager, error) {
ruleStore := sqlrulestore.NewRuleStore(sqlstore)
maintenanceStore := sqlrulestore.NewMaintenanceStore(sqlstore)
// create manager opts
@@ -369,6 +371,7 @@ func makeRulesManager(ch baseint.Reader, cache cache.Cache, alertmanager alertma
RuleStore: ruleStore,
MaintenanceStore: maintenanceStore,
SqlStore: sqlstore,
QueryParser: queryParser,
}
// create Manager

View File

@@ -329,6 +329,17 @@ func (r *AnomalyRule) Eval(ctx context.Context, ts time.Time) (interface{}, erro
return nil, err
}
// Filter out new series if newGroupEvalDelay is configured
if r.ShouldSkipNewGroups() {
collection := ruletypes.NewVectorLabelledCollection(res)
filteredCollection, _, filterErr := r.BaseRule.FilterNewSeries(ctx, ts, collection)
if filterErr != nil {
r.logger.ErrorContext(ctx, "Error filtering new series", "error", filterErr, "rule_name", r.Name())
return nil, filterErr
}
res = filteredCollection.(*ruletypes.VectorLabelledCollection).Vector()
}
r.mtx.Lock()
defer r.mtx.Unlock()

View File

@@ -37,6 +37,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
opts.SLogger,
baserules.WithEvalDelay(opts.ManagerOpts.EvalDelay),
baserules.WithSQLStore(opts.SQLStore),
baserules.WithQueryParser(opts.ManagerOpts.QueryParser),
)
if err != nil {
@@ -59,6 +60,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
opts.Reader,
opts.ManagerOpts.Prometheus,
baserules.WithSQLStore(opts.SQLStore),
baserules.WithQueryParser(opts.ManagerOpts.QueryParser),
)
if err != nil {
@@ -82,6 +84,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
opts.Cache,
baserules.WithEvalDelay(opts.ManagerOpts.EvalDelay),
baserules.WithSQLStore(opts.SQLStore),
baserules.WithQueryParser(opts.ManagerOpts.QueryParser),
)
if err != nil {
return task, err
@@ -140,6 +143,7 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, *basemodel.Ap
baserules.WithSendAlways(),
baserules.WithSendUnmatched(),
baserules.WithSQLStore(opts.SQLStore),
baserules.WithQueryParser(opts.ManagerOpts.QueryParser),
)
if err != nil {
@@ -160,6 +164,7 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, *basemodel.Ap
baserules.WithSendAlways(),
baserules.WithSendUnmatched(),
baserules.WithSQLStore(opts.SQLStore),
baserules.WithQueryParser(opts.ManagerOpts.QueryParser),
)
if err != nil {
@@ -179,6 +184,7 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, *basemodel.Ap
baserules.WithSendAlways(),
baserules.WithSendUnmatched(),
baserules.WithSQLStore(opts.SQLStore),
baserules.WithQueryParser(opts.ManagerOpts.QueryParser),
)
if err != nil {
zap.L().Error("failed to prepare a new anomaly rule for test", zap.String("name", alertname), zap.Error(err))

View File

@@ -6439,6 +6439,73 @@ func (r *ClickHouseReader) GetUpdatedMetricsMetadata(ctx context.Context, orgID
return cachedMetadata, nil
}
// GetFirstSeenFromMetricMetadata queries the metadata table to get the first_seen timestamp
// for each metric-attribute-value combination.
// Returns a map where key is `model.MetricMetadataLookupKey` and value is first_seen in milliseconds.
func (r *ClickHouseReader) GetFirstSeenFromMetricMetadata(ctx context.Context, lookupKeys []model.MetricMetadataLookupKey) (map[model.MetricMetadataLookupKey]int64, error) {
// Chunk the lookup keys to avoid overly large queries (max 300 tuples per query)
const chunkSize = 300
result := make(map[model.MetricMetadataLookupKey]int64)
for i := 0; i < len(lookupKeys); i += chunkSize {
end := i + chunkSize
if end > len(lookupKeys) {
end = len(lookupKeys)
}
chunk := lookupKeys[i:end]
// Build the IN clause values - ClickHouse uses tuple syntax with placeholders
var valueStrings []string
var args []interface{}
for _, key := range chunk {
valueStrings = append(valueStrings, "(?, ?, ?)")
args = append(args, key.MetricName, key.AttributeName, key.AttributeValue)
}
query := fmt.Sprintf(`
SELECT
m.metric_name,
m.attr_name,
m.attr_string_value,
min(m.last_reported_unix_milli) AS first_seen
FROM %s.%s AS m
WHERE (m.metric_name, m.attr_name, m.attr_string_value) IN (%s)
GROUP BY m.metric_name, m.attr_name, m.attr_string_value
ORDER BY first_seen`,
signozMetricDBName, r.metadataTable, strings.Join(valueStrings, ", "))
valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
rows, err := r.db.Query(valueCtx, query, args...)
if err != nil {
zap.L().Error("Error querying metadata for first_seen", zap.Error(err))
return nil, &model.ApiError{Typ: "ClickhouseErr", Err: fmt.Errorf("error querying metadata for first_seen: %v", err)}
}
for rows.Next() {
var metricName, attrName, attrValue string
var firstSeen int64
if err := rows.Scan(&metricName, &attrName, &attrValue, &firstSeen); err != nil {
rows.Close()
return nil, &model.ApiError{Typ: "ClickhouseErr", Err: fmt.Errorf("error scanning metadata first_seen result: %v", err)}
}
result[model.MetricMetadataLookupKey{
MetricName: metricName,
AttributeName: attrName,
AttributeValue: attrValue,
}] = firstSeen
}
if err := rows.Err(); err != nil {
rows.Close()
return nil, &model.ApiError{Typ: "ClickhouseErr", Err: fmt.Errorf("error iterating metadata first_seen results: %v", err)}
}
rows.Close()
}
return result, nil
}
func (r *ClickHouseReader) SearchTraces(ctx context.Context, params *model.SearchTracesParams) (*[]model.SearchSpansResult, error) {
searchSpansResult := []model.SearchSpansResult{
{

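A rough usage sketch of the new reader method (a hypothetical helper, not part of this diff; imports elided). The returned map is keyed by the same model.MetricMetadataLookupKey values that were passed in:

// isOldEnough is a hypothetical caller of GetFirstSeenFromMetricMetadata:
// it checks whether a metric/attribute/value combination is older than a
// given grace period at evaluation time.
func isOldEnough(ctx context.Context, r interfaces.Reader, evalTime time.Time, delay time.Duration) (bool, error) {
	key := model.MetricMetadataLookupKey{
		MetricName:     "request_total", // hypothetical values
		AttributeName:  "service_name",
		AttributeValue: "svc-a",
	}
	firstSeen, err := r.GetFirstSeenFromMetricMetadata(ctx, []model.MetricMetadataLookupKey{key})
	if err != nil {
		return false, err
	}
	seenMs, ok := firstSeen[key]
	if !ok {
		// No metadata yet: treat the combination as brand new.
		return false, nil
	}
	// Old enough once first_seen + delay has elapsed at evaluation time.
	return seenMs+delay.Milliseconds() <= evalTime.UnixMilli(), nil
}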
View File

@@ -108,6 +108,7 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz) (*Server, error) {
signoz.Modules.OrgGetter,
signoz.Querier,
signoz.Instrumentation.Logger(),
signoz.QueryParser,
)
if err != nil {
return nil, err
@@ -335,6 +336,7 @@ func makeRulesManager(
orgGetter organization.Getter,
querier querier.Querier,
logger *slog.Logger,
queryParser queryparser.QueryParser,
) (*rules.Manager, error) {
ruleStore := sqlrulestore.NewRuleStore(sqlstore)
maintenanceStore := sqlrulestore.NewMaintenanceStore(sqlstore)
@@ -354,6 +356,7 @@ func makeRulesManager(
RuleStore: ruleStore,
MaintenanceStore: maintenanceStore,
SqlStore: sqlstore,
QueryParser: queryParser,
}
// create Manager

View File

@@ -81,6 +81,7 @@ type Reader interface {
CheckClickHouse(ctx context.Context) error
GetMetricMetadata(context.Context, valuer.UUID, string, string) (*v3.MetricMetadataResponse, error)
GetFirstSeenFromMetricMetadata(ctx context.Context, lookupKeys []model.MetricMetadataLookupKey) (map[model.MetricMetadataLookupKey]int64, error)
AddRuleStateHistory(ctx context.Context, ruleStateHistory []model.RuleStateHistory) error
GetOverallStateTransitions(ctx context.Context, ruleID string, params *model.QueryRuleStateHistory) ([]model.ReleStateItem, error)

View File

@@ -516,3 +516,9 @@ type LogsAggregateParams struct {
Function string `json:"function"`
StepSeconds int `json:"step"`
}
type MetricMetadataLookupKey struct {
MetricName string
AttributeName string
AttributeValue string
}

View File

@@ -13,6 +13,7 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/model"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
qslabels "github.com/SigNoz/signoz/pkg/query-service/utils/labels"
"github.com/SigNoz/signoz/pkg/queryparser"
"github.com/SigNoz/signoz/pkg/sqlstore"
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/SigNoz/signoz/pkg/valuer"
@@ -88,6 +89,11 @@ type BaseRule struct {
sqlstore sqlstore.SQLStore
evaluation ruletypes.Evaluation
// newGroupEvalDelay is the grace period for new alert groups
newGroupEvalDelay *time.Duration
queryParser queryparser.QueryParser
}
type RuleOption func(*BaseRule)
@@ -122,6 +128,12 @@ func WithSQLStore(sqlstore sqlstore.SQLStore) RuleOption {
}
}
func WithQueryParser(queryParser queryparser.QueryParser) RuleOption {
return func(r *BaseRule) {
r.queryParser = queryParser
}
}
func NewBaseRule(id string, orgID valuer.UUID, p *ruletypes.PostableRule, reader interfaces.Reader, opts ...RuleOption) (*BaseRule, error) {
if p.RuleCondition == nil || !p.RuleCondition.IsValid() {
return nil, fmt.Errorf("invalid rule condition")
@@ -154,6 +166,12 @@ func NewBaseRule(id string, orgID valuer.UUID, p *ruletypes.PostableRule, reader
evaluation: evaluation,
}
// Store newGroupEvalDelay from NotificationSettings
if p.NotificationSettings != nil && p.NotificationSettings.NewGroupEvalDelay != nil {
newGroupEvalDelay := time.Duration(*p.NotificationSettings.NewGroupEvalDelay)
baseRule.newGroupEvalDelay = &newGroupEvalDelay
}
if baseRule.evalWindow == 0 {
baseRule.evalWindow = 5 * time.Minute
}
@@ -528,3 +546,136 @@ func (r *BaseRule) PopulateTemporality(ctx context.Context, orgID valuer.UUID, q
}
return nil
}
// ShouldSkipNewGroups returns true if new group filtering should be applied
func (r *BaseRule) ShouldSkipNewGroups() bool {
return r.newGroupEvalDelay != nil && *r.newGroupEvalDelay > 0
}
// extractMetricAndGroupBys extracts metric names and groupBy keys from the rule's query.
// TODO: implement caching for query parsing results to avoid re-parsing the query + cache invalidation
func (r *BaseRule) extractMetricAndGroupBys(ctx context.Context) ([]string, []string, error) {
var metricNames []string
var groupedFields []string
result, err := r.queryParser.AnalyzeCompositeQuery(ctx, r.ruleCondition.CompositeQuery)
if err != nil {
return nil, nil, err
}
metricNames = result.MetricNames
for _, col := range result.GroupByColumns {
groupedFields = append(groupedFields, col.OriginField)
}
return metricNames, groupedFields, nil
}
// FilterNewSeries filters out items that are too new based on metadata first_seen timestamps.
// Returns the filtered series and the number of series that were skipped.
func (r *BaseRule) FilterNewSeries(ctx context.Context, ts time.Time, series ruletypes.LabelledCollection) (ruletypes.LabelledCollection, int, error) {
// Extract metric names and groupBy keys
metricNames, groupedFields, err := r.extractMetricAndGroupBys(ctx)
if err != nil {
return nil, 0, err
}
if len(metricNames) == 0 || len(groupedFields) == 0 {
// No metrics or groupBy keys to match against, so nothing can be filtered; return early
return series, 0, nil
}
// Build lookup keys from series
lookupKeys := make([]model.MetricMetadataLookupKey, 0)
seriesIdxToLookupKeys := make(map[int][]model.MetricMetadataLookupKey) // series index -> lookup keys
for i := 0; i < series.Len(); i++ {
labels := series.GetItem(i).GetLabels()
metricLabelMap := make(map[string]string)
for _, lbl := range labels {
metricLabelMap[lbl.Name] = lbl.Value
}
// Collect groupBy attribute-value pairs for this series
seriesKeys := make([]model.MetricMetadataLookupKey, 0)
for _, metricName := range metricNames {
for _, groupByKey := range groupedFields {
if attrValue, ok := metricLabelMap[groupByKey]; ok {
lookupKey := model.MetricMetadataLookupKey{
MetricName: metricName,
AttributeName: groupByKey,
AttributeValue: attrValue,
}
lookupKeys = append(lookupKeys, lookupKey)
seriesKeys = append(seriesKeys, lookupKey)
}
}
}
if len(seriesKeys) > 0 {
seriesIdxToLookupKeys[i] = seriesKeys
}
}
if len(lookupKeys) == 0 {
// No lookup keys to query, return original series
return series, 0, nil
}
// Query metadata for first_seen timestamps
firstSeenMap, err := r.reader.GetFirstSeenFromMetricMetadata(ctx, lookupKeys)
if err != nil {
return nil, 0, err
}
// Filter series based on first_seen + delay
preservedIndices := make([]int, 0)
skippedCount := 0
evalTimeMs := ts.UnixMilli()
newGroupEvalDelayMs := r.newGroupEvalDelay.Milliseconds()
for i := 0; i < series.Len(); i++ {
seriesKeys, ok := seriesIdxToLookupKeys[i]
if !ok {
// No labels from this series are used in groupBy, include it
preservedIndices = append(preservedIndices, i)
continue
}
// Find the maximum first_seen across all groupBy attributes for this series.
// If the latest is old enough we're good; if it is new, the series must be skipped.
maxFirstSeen := int64(0)
foundAny := false
for _, lookupKey := range seriesKeys {
if firstSeen, exists := firstSeenMap[lookupKey]; exists {
foundAny = true
if firstSeen > maxFirstSeen {
maxFirstSeen = firstSeen
}
}
}
if !foundAny {
// No metadata found - treat as new, skip it
skippedCount++
continue
}
// Check if first_seen + delay has passed
if maxFirstSeen+newGroupEvalDelayMs > evalTimeMs {
// Still within grace period, skip this series
skippedCount++
continue
}
// Old enough, include it
preservedIndices = append(preservedIndices, i)
}
if r.logger != nil && skippedCount > 0 {
r.logger.InfoContext(ctx, "Filtered new series", "rule_name", r.Name(), "skipped_count", skippedCount, "total_count", series.Len(), "delay_ms", newGroupEvalDelayMs)
}
return series.Filter(preservedIndices), skippedCount, nil
}

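The skip decision in FilterNewSeries reduces to one comparison; a minimal sketch of the boundary (hypothetical helper, not part of this diff):

// withinGracePeriod mirrors the check above: a series stays skipped while
// evalTime < first_seen + newGroupEvalDelay. For example, with first_seen
// at 12:00 and a 30m delay, evaluations before 12:30 skip the series and
// the 12:30 evaluation includes it again.
func withinGracePeriod(firstSeenMs int64, delay time.Duration, evalTime time.Time) bool {
	return firstSeenMs+delay.Milliseconds() > evalTime.UnixMilli()
}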
View File

@@ -1,12 +1,20 @@
package rules
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
"github.com/SigNoz/signoz/pkg/query-service/model"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/utils/labels"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)
func TestBaseRule_RequireMinPoints(t *testing.T) {
@@ -81,3 +89,80 @@ func TestBaseRule_RequireMinPoints(t *testing.T) {
})
}
}
func TestBaseRule_FilterNewSeries(t *testing.T) {
logger := instrumentationtest.New().Logger()
ctx := context.Background()
delay := 30 * time.Minute
evalTime := time.Unix(1_700_000_000, 0)
builderSpec := qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "A",
Signal: telemetrytypes.SignalMetrics,
GroupBy: []qbtypes.GroupByKey{
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service_name"}},
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "env"}},
},
Aggregations: []qbtypes.MetricAggregation{
{MetricName: "request_total"},
},
}
firstSeen := map[model.MetricMetadataLookupKey]int64{
{MetricName: "request_total", AttributeName: "service_name", AttributeValue: "svc-old"}: evalTime.Add(-2 * delay).UnixMilli(),
{MetricName: "request_total", AttributeName: "env", AttributeValue: "prod"}: evalTime.Add(-2 * delay).UnixMilli(),
{MetricName: "request_total", AttributeName: "service_name", AttributeValue: "svc-new"}: evalTime.Add(-5 * time.Minute).UnixMilli(),
}
reader := &mockReader{response: firstSeen}
baseRule := &BaseRule{
ruleCondition: &ruletypes.RuleCondition{
CompositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
Queries: []qbtypes.QueryEnvelope{{
Type: qbtypes.QueryTypeBuilder,
Spec: builderSpec,
}},
},
},
newGroupEvalDelay: &delay,
reader: reader,
logger: logger,
}
vector := ruletypes.Vector{
{Metric: labels.Labels{{Name: labels.MetricNameLabel, Value: "request_total"}, {Name: "service_name", Value: "svc-old"}, {Name: "env", Value: "prod"}}},
{Metric: labels.Labels{{Name: labels.MetricNameLabel, Value: "request_total"}, {Name: "service_name", Value: "svc-new"}, {Name: "env", Value: "prod"}}},
{Metric: labels.Labels{{Name: labels.MetricNameLabel, Value: "request_total"}, {Name: "service_name", Value: "svc-missing"}, {Name: "env", Value: "stage"}}},
{Metric: labels.Labels{{Name: labels.MetricNameLabel, Value: "request_total"}, {Name: "status", Value: "200"}}},
}
collection := ruletypes.NewVectorLabelledCollection(vector)
filtered, skipped, err := baseRule.FilterNewSeries(ctx, evalTime, collection)
require.NoError(t, err)
require.Equal(t, 2, skipped)
filteredVector := filtered.(*ruletypes.VectorLabelledCollection).Vector()
require.Len(t, filteredVector, 2)
services := make([]string, 0, len(filteredVector))
for _, sample := range filteredVector {
services = append(services, sample.Metric.Get("service_name"))
}
require.ElementsMatch(t, []string{"svc-old", ""}, services)
}
type mockReader struct {
interfaces.Reader
response map[model.MetricMetadataLookupKey]int64
err error
calls [][]model.MetricMetadataLookupKey
}
func (m *mockReader) GetFirstSeenFromMetricMetadata(_ context.Context, lookupKeys []model.MetricMetadataLookupKey) (map[model.MetricMetadataLookupKey]int64, error) {
keysCopy := append([]model.MetricMetadataLookupKey(nil), lookupKeys...)
m.calls = append(m.calls, keysCopy)
if m.err != nil {
return nil, m.err
}
return m.response, nil
}

View File

@@ -11,6 +11,7 @@ import (
"time"
"github.com/SigNoz/signoz/pkg/query-service/utils/labels"
"github.com/SigNoz/signoz/pkg/queryparser"
"go.uber.org/zap"
@@ -103,6 +104,7 @@ type ManagerOptions struct {
RuleStore ruletypes.RuleStore
MaintenanceStore ruletypes.MaintenanceStore
SqlStore sqlstore.SQLStore
QueryParser queryparser.QueryParser
}
// The Manager manages recording and alerting rules.
@@ -125,6 +127,8 @@ type Manager struct {
alertmanager alertmanager.Alertmanager
sqlstore sqlstore.SQLStore
orgGetter organization.Getter
// queryParser is used for parsing queries for rules
queryParser queryparser.QueryParser
}
func defaultOptions(o *ManagerOptions) *ManagerOptions {
@@ -166,6 +170,7 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
opts.SLogger,
WithEvalDelay(opts.ManagerOpts.EvalDelay),
WithSQLStore(opts.SQLStore),
WithQueryParser(opts.ManagerOpts.QueryParser),
)
if err != nil {
@@ -188,6 +193,7 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
opts.Reader,
opts.ManagerOpts.Prometheus,
WithSQLStore(opts.SQLStore),
WithQueryParser(opts.ManagerOpts.QueryParser),
)
if err != nil {
@@ -226,6 +232,7 @@ func NewManager(o *ManagerOptions) (*Manager, error) {
alertmanager: o.Alertmanager,
orgGetter: o.OrgGetter,
sqlstore: o.SqlStore,
queryParser: o.QueryParser,
}
zap.L().Debug("Manager created successfully with NotificationGroup")

View File

@@ -135,6 +135,17 @@ func (r *PromRule) buildAndRunQuery(ctx context.Context, ts time.Time) (ruletype
return nil, err
}
// Filter out new series if newGroupEvalDelay is configured
if r.ShouldSkipNewGroups() {
collection := ruletypes.NewPromMatrixLabelledCollection(res)
filteredCollection, _, filterErr := r.BaseRule.FilterNewSeries(ctx, ts, collection)
if filterErr != nil {
r.logger.ErrorContext(ctx, "Error filtering new series", "error", filterErr, "rule_name", r.Name())
return nil, filterErr
}
res = filteredCollection.(*ruletypes.PromMatrixLabelledCollection).Matrix()
}
var resultVector ruletypes.Vector
for _, series := range res {
resultSeries, err := r.Threshold.Eval(toCommonSeries(series), r.Unit(), ruletypes.EvalData{

View File

@@ -600,6 +600,17 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (interface{}, er
return nil, err
}
// Filter out new series if newGroupEvalDelay is configured
if r.ShouldSkipNewGroups() {
collection := ruletypes.NewVectorLabelledCollection(res)
filteredCollection, _, filterErr := r.BaseRule.FilterNewSeries(ctx, ts, collection)
if filterErr != nil {
r.logger.ErrorContext(ctx, "Error filtering new series", "error", filterErr, "rule_name", r.Name())
return nil, filterErr
}
res = filteredCollection.(*ruletypes.VectorLabelledCollection).Vector()
}
r.mtx.Lock()
defer r.mtx.Unlock()

View File

@@ -3,6 +3,7 @@ package queryparser
import (
"context"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/queryparser/queryfilterextractor"
"github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
)
@@ -11,4 +12,6 @@ import (
type QueryParser interface {
// AnalyzeQueryFilter extracts filter conditions from a given query string.
AnalyzeQueryFilter(ctx context.Context, queryType querybuildertypesv5.QueryType, query string) (*queryfilterextractor.FilterResult, error)
// AnalyzeCompositeQuery extracts filter conditions from a composite query.
AnalyzeCompositeQuery(ctx context.Context, compositeQuery *v3.CompositeQuery) (*queryfilterextractor.FilterResult, error)
}

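A minimal caller sketch for the interface (hypothetical; the constructor and the expected outputs follow the implementation and tests later in this diff, and the fmt import is assumed):

// exampleAnalyze extracts metric names and group-by columns from a raw
// PromQL string via the QueryParser service.
func exampleAnalyze(ctx context.Context, parser QueryParser) error {
	res, err := parser.AnalyzeQueryFilter(ctx, querybuildertypesv5.QueryTypePromQL,
		`sum by (service_name) (rate(request_total[5m]))`)
	if err != nil {
		return err
	}
	fmt.Println(res.MetricNames) // expected: [request_total]
	for _, col := range res.GroupByColumns {
		fmt.Println(col.OriginField) // expected: service_name
	}
	return nil
}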
View File

@@ -1,40 +0,0 @@
package queryparser
import (
"context"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/queryparser/queryfilterextractor"
"github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
)
type queryParserImpl struct {
settings factory.ProviderSettings
}
// New creates a new implementation of the QueryParser service.
func New(settings factory.ProviderSettings) QueryParser {
return &queryParserImpl{
settings: settings,
}
}
func (p *queryParserImpl) AnalyzeQueryFilter(ctx context.Context, queryType querybuildertypesv5.QueryType, query string) (*queryfilterextractor.FilterResult, error) {
var extractorType queryfilterextractor.ExtractorType
switch queryType {
case querybuildertypesv5.QueryTypePromQL:
extractorType = queryfilterextractor.ExtractorTypePromQL
case querybuildertypesv5.QueryTypeClickHouseSQL:
extractorType = queryfilterextractor.ExtractorTypeClickHouseSQL
default:
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported queryType: %s. Supported values are '%s' and '%s'", queryType, querybuildertypesv5.QueryTypePromQL, querybuildertypesv5.QueryTypeClickHouseSQL)
}
// Create extractor
extractor, err := queryfilterextractor.NewExtractor(extractorType)
if err != nil {
return nil, err
}
return extractor.Extract(query)
}

View File

@@ -0,0 +1,101 @@
package queryparser
import (
"context"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/queryparser/queryfilterextractor"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
)
type queryParserImpl struct {
settings factory.ProviderSettings
}
// New creates a new implementation of the QueryParser service.
func New(settings factory.ProviderSettings) QueryParser {
return &queryParserImpl{
settings: settings,
}
}
func (p *queryParserImpl) AnalyzeQueryFilter(ctx context.Context, queryType qbtypes.QueryType, query string) (*queryfilterextractor.FilterResult, error) {
var extractorType queryfilterextractor.ExtractorType
switch queryType {
case qbtypes.QueryTypePromQL:
extractorType = queryfilterextractor.ExtractorTypePromQL
case qbtypes.QueryTypeClickHouseSQL:
extractorType = queryfilterextractor.ExtractorTypeClickHouseSQL
default:
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported queryType: %s. Supported values are '%s' and '%s'", queryType, qbtypes.QueryTypePromQL, qbtypes.QueryTypeClickHouseSQL)
}
// Create extractor
extractor, err := queryfilterextractor.NewExtractor(extractorType)
if err != nil {
return nil, err
}
return extractor.Extract(query)
}
func (p *queryParserImpl) AnalyzeCompositeQuery(ctx context.Context, compositeQuery *v3.CompositeQuery) (*queryfilterextractor.FilterResult, error) {
var result = &queryfilterextractor.FilterResult{
MetricNames: []string{},
GroupByColumns: []queryfilterextractor.ColumnInfo{},
}
for _, query := range compositeQuery.Queries {
switch query.Type {
case qbtypes.QueryTypeBuilder:
switch spec := query.Spec.(type) {
case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
// extract group by fields
for _, groupBy := range spec.GroupBy {
if groupBy.Name != "" {
result.GroupByColumns = append(result.GroupByColumns, queryfilterextractor.ColumnInfo{Name: groupBy.Name, OriginExpr: groupBy.Name, OriginField: groupBy.Name})
}
}
// extract metric names
for _, aggregation := range spec.Aggregations {
if aggregation.MetricName != "" {
result.MetricNames = append(result.MetricNames, aggregation.MetricName)
}
}
default:
// TODO: add support for Traces and Logs Aggregation types
if p.settings.Logger != nil {
p.settings.Logger.WarnContext(ctx, "unsupported QueryBuilderQuery spec type", "type", fmt.Sprintf("%T", spec))
}
continue
}
case qbtypes.QueryTypePromQL:
spec, ok := query.Spec.(qbtypes.PromQuery)
if !ok || spec.Query == "" {
continue
}
res, err := p.AnalyzeQueryFilter(ctx, qbtypes.QueryTypePromQL, spec.Query)
if err != nil {
return nil, err
}
result.MetricNames = append(result.MetricNames, res.MetricNames...)
result.GroupByColumns = append(result.GroupByColumns, res.GroupByColumns...)
case qbtypes.QueryTypeClickHouseSQL:
spec, ok := query.Spec.(qbtypes.ClickHouseQuery)
if !ok || spec.Query == "" {
continue
}
res, err := p.AnalyzeQueryFilter(ctx, qbtypes.QueryTypeClickHouseSQL, spec.Query)
if err != nil {
return nil, err
}
result.MetricNames = append(result.MetricNames, res.MetricNames...)
result.GroupByColumns = append(result.GroupByColumns, res.GroupByColumns...)
default:
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported query type: %s", query.Type)
}
}
return result, nil
}

View File

@@ -0,0 +1,112 @@
package queryparser
import (
"context"
"encoding/json"
"testing"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/queryparser/queryfilterextractor"
"github.com/stretchr/testify/require"
)
func TestQueryParser_AnalyzeCompositeQuery(t *testing.T) {
ctx := context.Background()
tests := []struct {
name string
payload string
wantMetrics []string
wantGroupBy []queryfilterextractor.ColumnInfo
}{
{
name: "builder multiple grouping",
payload: builderQueryWithGrouping,
wantMetrics: []string{"test_metric_cardinality", "cpu_usage_total"},
wantGroupBy: []queryfilterextractor.ColumnInfo{
{Name: "service_name", Alias: "", OriginExpr: "service_name", OriginField: "service_name"},
{Name: "env", Alias: "", OriginExpr: "env", OriginField: "env"},
},
},
{
name: "builder single grouping",
payload: builderQuerySingleGrouping,
wantMetrics: []string{"latency_p50"},
wantGroupBy: []queryfilterextractor.ColumnInfo{
{Name: "namespace", Alias: "", OriginExpr: "namespace", OriginField: "namespace"},
},
},
{
name: "builder no grouping",
payload: builderQueryNoGrouping,
wantMetrics: []string{"disk_usage_total"},
wantGroupBy: []queryfilterextractor.ColumnInfo{},
},
{
name: "promql multiple grouping",
payload: promQueryWithGrouping,
wantMetrics: []string{"http_requests_total"},
wantGroupBy: []queryfilterextractor.ColumnInfo{
{Name: "pod", Alias: "", OriginExpr: "pod", OriginField: "pod"},
{Name: "region", Alias: "", OriginExpr: "region", OriginField: "region"},
},
},
{
name: "promql single grouping",
payload: promQuerySingleGrouping,
wantMetrics: []string{"cpu_usage_seconds_total"},
wantGroupBy: []queryfilterextractor.ColumnInfo{
{Name: "env", Alias: "", OriginExpr: "env", OriginField: "env"},
},
},
{
name: "promql no grouping",
payload: promQueryNoGrouping,
wantMetrics: []string{"node_cpu_seconds_total"},
wantGroupBy: []queryfilterextractor.ColumnInfo{},
},
{
name: "clickhouse multiple grouping",
payload: clickHouseQueryWithGrouping,
wantMetrics: []string{"cpu"},
wantGroupBy: []queryfilterextractor.ColumnInfo{
{Name: "region", Alias: "r", OriginExpr: "region", OriginField: "region"},
{Name: "zone", Alias: "", OriginExpr: "zone", OriginField: "zone"},
},
},
{
name: "clickhouse single grouping",
payload: clickHouseQuerySingleGrouping,
wantMetrics: []string{"cpu_usage"},
wantGroupBy: []queryfilterextractor.ColumnInfo{
{Name: "region", Alias: "r", OriginExpr: "region", OriginField: "region"},
},
},
{
name: "clickhouse no grouping",
payload: clickHouseQueryNoGrouping,
wantMetrics: []string{"memory_usage"},
wantGroupBy: []queryfilterextractor.ColumnInfo{},
},
}
queryParser := New(instrumentationtest.New().ToProviderSettings())
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cq := mustCompositeQuery(t, tt.payload)
res, err := queryParser.AnalyzeCompositeQuery(ctx, cq)
require.NoError(t, err)
require.ElementsMatch(t, tt.wantMetrics, res.MetricNames)
require.ElementsMatch(t, tt.wantGroupBy, res.GroupByColumns)
})
}
}
func mustCompositeQuery(t *testing.T, payload string) *v3.CompositeQuery {
t.Helper()
var compositeQuery v3.CompositeQuery
require.NoError(t, json.Unmarshal([]byte(payload), &compositeQuery))
return &compositeQuery
}

View File

@@ -0,0 +1,184 @@
package queryparser
var (
builderQueryWithGrouping = `
{
"queryType":"builder",
"panelType":"graph",
"queries":[
{
"type":"builder_query",
"spec":{
"name":"A",
"signal":"metrics",
"stepInterval":null,
"disabled":false,
"filter":{"expression":""},
"groupBy":[
{"name":"service_name","fieldDataType":"","fieldContext":""},
{"name":"env","fieldDataType":"","fieldContext":""}
],
"aggregations":[
{"metricName":"test_metric_cardinality","timeAggregation":"count","spaceAggregation":"sum"},
{"metricName":"cpu_usage_total","timeAggregation":"avg","spaceAggregation":"avg"}
]
}
}
]
}
`
builderQuerySingleGrouping = `
{
"queryType":"builder",
"panelType":"graph",
"queries":[
{
"type":"builder_query",
"spec":{
"name":"B",
"signal":"metrics",
"stepInterval":null,
"disabled":false,
"groupBy":[
{"name":"namespace","fieldDataType":"","fieldContext":""}
],
"aggregations":[
{"metricName":"latency_p50","timeAggregation":"avg","spaceAggregation":"max"}
]
}
}
]
}
`
builderQueryNoGrouping = `
{
"queryType":"builder",
"panelType":"graph",
"queries":[
{
"type":"builder_query",
"spec":{
"name":"C",
"signal":"metrics",
"stepInterval":null,
"disabled":false,
"groupBy":[],
"aggregations":[
{"metricName":"disk_usage_total","timeAggregation":"sum","spaceAggregation":"sum"}
]
}
}
]
}
`
promQueryWithGrouping = `
{
"queries":[
{
"type":"promql",
"spec":{
"name":"P1",
"query":"sum by (pod,region) (rate(http_requests_total[5m]))",
"disabled":false,
"step":0,
"stats":false
}
}
],
"panelType":"graph",
"queryType":"promql"
}
`
promQuerySingleGrouping = `
{
"queries":[
{
"type":"promql",
"spec":{
"name":"P2",
"query":"sum by (env)(rate(cpu_usage_seconds_total{job=\"api\"}[5m]))",
"disabled":false,
"step":0,
"stats":false
}
}
],
"panelType":"graph",
"queryType":"promql"
}
`
promQueryNoGrouping = `
{
"queries":[
{
"type":"promql",
"spec":{
"name":"P3",
"query":"rate(node_cpu_seconds_total[1m])",
"disabled":false,
"step":0,
"stats":false
}
}
],
"panelType":"graph",
"queryType":"promql"
}
`
clickHouseQueryWithGrouping = `
{
"queryType":"clickhouse_sql",
"panelType":"graph",
"queries":[
{
"type":"clickhouse_sql",
"spec":{
"name":"CH1",
"query":"SELECT region as r, zone FROM metrics WHERE metric_name='cpu' GROUP BY region, zone",
"disabled":false
}
}
]
}
`
clickHouseQuerySingleGrouping = `
{
"queryType":"clickhouse_sql",
"panelType":"graph",
"queries":[
{
"type":"clickhouse_sql",
"spec":{
"name":"CH2",
"query":"SELECT region as r FROM metrics WHERE metric_name='cpu_usage' GROUP BY region",
"disabled":false
}
}
]
}
`
clickHouseQueryNoGrouping = `
{
"queryType":"clickhouse_sql",
"panelType":"graph",
"queries":[
{
"type":"clickhouse_sql",
"spec":{
"name":"CH3",
"query":"SELECT * FROM metrics WHERE metric_name = 'memory_usage'",
"disabled":false
}
}
]
}
`
)

View File

@@ -70,6 +70,8 @@ type NotificationSettings struct {
GroupBy []string `json:"groupBy,omitempty"`
Renotify Renotify `json:"renotify,omitempty"`
UsePolicy bool `json:"usePolicy,omitempty"`
// NewGroupEvalDelay is the grace period during which new series are excluded from alert evaluation
NewGroupEvalDelay *Duration `json:"newGroupEvalDelay,omitempty"`
}
type Renotify struct {

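For illustration, a hypothetical construction of the new field (it assumes Duration converts from time.Duration, consistent with the time.Duration(...) conversion in NewBaseRule above):

// exampleSettings enables a 30-minute grace period for new alert groups.
func exampleSettings() NotificationSettings {
	delay := Duration(30 * time.Minute) // assumption: Duration wraps a duration value
	return NotificationSettings{
		GroupBy:           []string{"service_name"},
		NewGroupEvalDelay: &delay,
	}
}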
View File

@@ -0,0 +1,120 @@
package ruletypes
import (
qslabels "github.com/SigNoz/signoz/pkg/query-service/utils/labels"
"github.com/prometheus/prometheus/promql"
)
// LabelledItem represents an entity that can provide labels
type LabelledItem interface {
// GetLabels returns the labels for this entity
GetLabels() qslabels.Labels
}
// LabelledCollection represents a collection of labelled items
// This abstracts the underlying collection type and provides a unified
// interface for working with items that have labels.
type LabelledCollection interface {
// Len returns the number of entities in the collection
Len() int
// GetItem returns the entity at the given index
GetItem(index int) LabelledItem
// Filter returns a new filtered collection containing only items at the given indices
Filter(preservedIndices []int) LabelledCollection
}
// VectorLabelledCollection wraps ruletypes.Vector to implement LabelledCollection
type VectorLabelledCollection struct {
vector Vector
}
// NewVectorLabelledCollection creates a new VectorLabelledCollection from a ruletypes.Vector
func NewVectorLabelledCollection(vector Vector) *VectorLabelledCollection {
return &VectorLabelledCollection{vector: vector}
}
// Len returns the number of entities in the vector
func (vc *VectorLabelledCollection) Len() int {
return len(vc.vector)
}
// GetItem returns the entity at the given index
func (vc *VectorLabelledCollection) GetItem(index int) LabelledItem {
return &VectorLabelledItem{sample: vc.vector[index]}
}
// Filter returns a new VectorLabelledCollection containing only items at the given indices
func (vc *VectorLabelledCollection) Filter(indices []int) LabelledCollection {
filtered := make(Vector, 0, len(indices))
for _, idx := range indices {
if idx >= 0 && idx < len(vc.vector) {
filtered = append(filtered, vc.vector[idx])
}
}
return NewVectorLabelledCollection(filtered)
}
func (vc *VectorLabelledCollection) Vector() Vector {
return vc.vector
}
// VectorLabelledItem wraps ruletypes.Sample to implement LabelledItem
type VectorLabelledItem struct {
sample Sample
}
// GetLabels returns the labels from the sample
func (vi *VectorLabelledItem) GetLabels() qslabels.Labels {
return vi.sample.Metric
}
// PromMatrixLabelledCollection wraps promql.Matrix to implement LabelledCollection
type PromMatrixLabelledCollection struct {
matrix promql.Matrix
}
// NewPromMatrixLabelledCollection creates a new PromMatrixLabelledCollection from a promql.Matrix
func NewPromMatrixLabelledCollection(matrix promql.Matrix) *PromMatrixLabelledCollection {
return &PromMatrixLabelledCollection{matrix: matrix}
}
// Len returns the number of entities in the matrix
func (pmc *PromMatrixLabelledCollection) Len() int {
return len(pmc.matrix)
}
// GetItem returns the entity at the given index
func (pmc *PromMatrixLabelledCollection) GetItem(index int) LabelledItem {
return &PromSeriesLabelledItem{series: pmc.matrix[index]}
}
// Filter returns a new PromMatrixLabelledCollection containing only items at the given indices
func (pmc *PromMatrixLabelledCollection) Filter(indices []int) LabelledCollection {
filtered := make(promql.Matrix, 0, len(indices))
for _, idx := range indices {
if idx >= 0 && idx < len(pmc.matrix) {
filtered = append(filtered, pmc.matrix[idx])
}
}
return NewPromMatrixLabelledCollection(filtered)
}
func (pmc *PromMatrixLabelledCollection) Matrix() promql.Matrix {
return pmc.matrix
}
// PromSeriesLabelledItem wraps promql.Series to implement LabelledItem
type PromSeriesLabelledItem struct {
series promql.Series
}
// GetLabels returns the labels from the prometheus series
func (psi *PromSeriesLabelledItem) GetLabels() qslabels.Labels {
metricLabels := make(qslabels.Labels, 0, len(psi.series.Metric))
for _, lbl := range psi.series.Metric {
metricLabels = append(metricLabels, qslabels.Label{Name: lbl.Name, Value: lbl.Value})
}
return metricLabels
}
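The rule evaluation paths earlier in this diff use these wrappers in a wrap, filter, unwrap round trip; a condensed sketch (hypothetical helper):

// keepIndices wraps a Vector, preserves only the given indices, and
// unwraps the result, exactly as the Eval paths above do.
func keepIndices(vec Vector, indices []int) Vector {
	collection := NewVectorLabelledCollection(vec)
	filtered := collection.Filter(indices)
	return filtered.(*VectorLabelledCollection).Vector()
}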