Compare commits

...

87 Commits

Author SHA1 Message Date
Abhishek Kumar Singh
75e67a7e35 feat: apply templates on Clickhouse query before parsing 2025-12-26 18:22:53 +05:30
Abhishek Kumar Singh
8c67f6ff7a chore: fixed tests for ValidateCompositeQuery 2025-12-26 13:21:05 +05:30
Abhishek Kumar Singh
d62ed6f003 feat: added validation for QueryBuilderFormula 2025-12-26 13:07:34 +05:30
Abhishek Kumar Singh
ef4ef47634 feat: added new error type for Query parsing, added validation for QueryBuilderJoin 2025-12-26 12:52:30 +05:30
Abhishek Kumar Singh
0a42c77ca7 test: added test for ValidateCompositeQuery 2025-12-24 19:16:20 +05:30
Abhishek Kumar Singh
a89bb71f2c feat: added ValidateCompositeQuery in queryparser 2025-12-24 19:14:39 +05:30
Abhishek Kumar Singh
521e5d92e7 test: fixed breaking tests post PostableRule validations 2025-12-24 17:27:09 +05:30
Abhishek Kumar Singh
09b7360513 feat: added struct based validation for PostableRule and it's child structs 2025-12-24 17:20:57 +05:30
Abhishek Kumar Singh
0fd926b8a1 feat: exclude filtering new series in Logs and Traces queries with corresponding tests 2025-12-23 13:21:30 +05:30
Abhishek Kumar Singh
e4214309f4 refactor: moved series filteration logic to new function for prom_rule 2025-12-22 14:30:50 +05:30
Abhishek Kumar Singh
297383ddca feat: don't skip series with missing metadata as we can't decide in this case if series is old/new 2025-12-22 13:51:08 +05:30
Abhishek Kumar Singh
6871eccd28 feat: return series early on no skipped index 2025-12-22 12:01:12 +05:30
Abhishek Kumar Singh
0a272b5b43 Merge branch 'main' into feat/exclude_new_series_from_alert 2025-12-22 11:47:41 +05:30
Abhishek Kumar Singh
4c4387b6d2 test: added test for FilterNewSeries 2025-12-19 23:32:26 +05:30
Abhishek Kumar Singh
cb242e2d4c refactor: removed un-used check from filterNewSeries + fix CH metadata table name 2025-12-19 18:16:16 +05:30
Abhishek Kumar Singh
c98cdc174b refactor: code reuse 2025-12-19 17:00:29 +05:30
Abhishek Kumar Singh
6fc38bac79 refactor: updated FilterNewSeries to use v3.Series a common collection to filter new series 2025-12-19 16:42:03 +05:30
Abhishek Kumar Singh
ddba7e71b7 refactor: improve type assertion for filtered collections in anomaly, prom, and threshold rules 2025-12-19 14:18:21 +05:30
Abhishek Kumar Singh
23f9ff50a7 chore: added helpful comments for FilterNewSeries 2025-12-19 13:30:54 +05:30
Abhishek Kumar Singh
55e5c871fe refactor: user real QP for FilterNewSeries test 2025-12-19 12:53:22 +05:30
Abhishek Kumar Singh
511bb176dd Merge branch 'main' into feat/exclude_new_series_from_alert 2025-12-19 11:50:55 +05:30
Abhishek Kumar Singh
4e0c0319d0 Merge branch 'main' into feat/exclude_new_series_from_alert 2025-12-18 14:49:28 +05:30
Srikanth Chekuri
9e5ea4de9c Merge branch 'main' into feat/exclude_new_series_from_alert 2025-12-15 08:27:00 +05:30
Abhishek Kumar Singh
81e0df09b8 refactor: merge conflict fix 2025-12-03 19:06:18 +05:30
Abhishek Kumar Singh
a522f39b9b Merge branch 'main' into feat/exclude_new_series_from_alert 2025-12-03 19:04:18 +05:30
Abhishek Kumar Singh
affb6eee05 feat: added function in query parser to parse composite query and get filter result 2025-12-02 18:56:19 +05:30
Abhishek Kumar Singh
13a5e9dd24 Merge branch 'feat/groups_in_ch_and_promql_queries' into feat/exclude_new_series_from_alert 2025-12-02 18:02:40 +05:30
Abhishek Kumar Singh
f620767876 refactor: used binding.JSON.BindBody to parse body 2025-12-02 17:58:39 +05:30
Srikanth Chekuri
9fb8b2bb1b Merge branch 'main' into feat/groups_in_ch_and_promql_queries 2025-12-02 17:26:54 +05:30
Abhishek Kumar Singh
30494c9196 chore: updated comments 2025-12-02 16:58:06 +05:30
Abhishek Kumar Singh
cae4cf0777 refactor: use query parser in baseRule 2025-12-02 16:55:58 +05:30
Abhishek Kumar Singh
c9538b0604 Merge branch 'feat/groups_in_ch_and_promql_queries' into feat/exclude_new_series_from_alert 2025-12-02 15:57:15 +05:30
Abhishek Kumar Singh
204cc4e5c5 Merge branch 'main' into feat/groups_in_ch_and_promql_queries 2025-12-02 15:47:30 +05:30
Abhishek Kumar Singh
6dd2ffcb64 refactor: update API handler to use new queryparser package for query parsing 2025-11-27 20:11:16 +05:30
Abhishek Kumar Singh
13c15249c5 Merge branch 'feat/groups_in_ch_and_promql_queries' into feat/exclude_new_series_from_alert 2025-11-27 19:51:27 +05:30
Abhishek Kumar Singh
8419ca7982 Merge branch 'main' into feat/exclude_new_series_from_alert 2025-11-27 19:49:57 +05:30
Abhishek Kumar Singh
6b189b14c6 chore: updated series collection to labelledcollection 2025-11-27 19:45:01 +05:30
Abhishek Kumar Singh
550c49fab0 feat: created queryparser package with APIs 2025-11-27 19:37:41 +05:30
Abhishek Kumar Singh
5b6ff92648 refactor: moved package queryfilterextractor in pkg/queryparser 2025-11-27 19:14:55 +05:30
Abhishek Kumar Singh
45954b38fa Merge branch 'main' into feat/exclude_new_series_from_alert 2025-11-25 12:27:00 +05:30
Abhishek Kumar Singh
ceade6c7d7 refactor: moved query parser APIs to queryfilterextractor package 2025-11-24 14:15:44 +05:30
Srikanth Chekuri
f15c88836c Merge branch 'main' into feat/groups_in_ch_and_promql_queries 2025-11-21 15:26:44 +05:30
Abhishek Kumar Singh
9af45643a9 refactor: created valuer enum for extractor types + exposed alias along with column name for groups 2025-11-21 15:16:03 +05:30
Srikanth Chekuri
d15e974e9f Merge branch 'main' into feat/groups_in_ch_and_promql_queries 2025-11-20 22:59:29 +05:30
Abhishek Kumar Singh
71e752a015 refactor: moved api request and response to types 2025-11-20 20:00:51 +05:30
Abhishek Kumar Singh
3407760585 Merge branch 'feat/queryfilterextractor' into feat/groups_in_ch_and_promql_queries 2025-11-20 19:02:11 +05:30
Abhishek Kumar Singh
58a0e36869 refactor: return relevant errors when parsing query 2025-11-20 19:01:40 +05:30
Abhishek Kumar Singh
5d688eb919 test: updated test case to include CH query error case 2025-11-20 18:55:45 +05:30
Abhishek Kumar Singh
c0f237a7c4 refactor: use render.Error instead of RespondError 2025-11-20 18:12:47 +05:30
Abhishek Kumar Singh
8ce8bc940a Merge branch 'feat/queryfilterextractor' into feat/groups_in_ch_and_promql_queries 2025-11-20 17:42:46 +05:30
Abhishek Kumar Singh
abce05b289 feat: exposed group name from column info 2025-11-20 17:38:00 +05:30
Abhishek Kumar Singh
ccd25c3b67 refactor: removed redundant checks 2025-11-20 17:24:36 +05:30
Abhishek Kumar Singh
ddb98da217 refactor: change function visibility for extractMetricAndGroupBys 2025-11-20 15:56:45 +05:30
Abhishek Kumar Singh
18d63d2e66 fix: close CH rows on reading each chunk 2025-11-20 15:44:07 +05:30
Abhishek Kumar Singh
67c108f021 feat: added new series filter in anomaly rule as well 2025-11-20 15:26:14 +05:30
Abhishek Kumar Singh
02939cafa4 refactor: added GetFirstSeenFromMetricMetadata in clickhouse reader, removed caching for query result 2025-11-20 15:24:14 +05:30
Abhishek Kumar Singh
e62b070c1e fix: created interface for standard series filtration from both prom and threshold rule 2025-11-20 14:35:25 +05:30
Srikanth Chekuri
be0a7d8fd4 Merge branch 'main' into feat/queryfilterextractor 2025-11-20 02:48:43 +05:30
Srikanth Chekuri
419044dc9e Merge branch 'main' into feat/queryfilterextractor 2025-11-19 22:42:12 +05:30
Abhishek Kumar Singh
223465d6d5 Merge branch 'feat/queryfilterextractor' into feat/exclude_new_series_from_alert 2025-11-19 20:56:01 +05:30
Abhishek Kumar Singh
cec99674fa feat: exclude new samples from alert evals 2025-11-19 20:52:59 +05:30
Abhishek Kumar Singh
0ccf58ac7a fix: common behaviour across CH and PromQL originField and originExpr 2025-11-19 20:36:00 +05:30
Abhishek Kumar Singh
b08d636d6a refactor: updated query type check with constants 2025-11-19 20:12:20 +05:30
Abhishek Kumar Singh
f6141bc6c5 feat: added API to extract metric names and groups from CH or PromQL query 2025-11-19 16:48:03 +05:30
Srikanth Chekuri
bfe49f0f1b Merge branch 'main' into feat/queryfilterextractor 2025-11-19 14:21:35 +05:30
Abhishek Kumar Singh
8e8064c5c1 fix: ci lint issues 2025-11-19 13:28:04 +05:30
Abhishek Kumar Singh
4392341467 improv: added comments for CH originparser + some code improv 2025-11-19 12:58:34 +05:30
Abhishek Kumar Singh
521d8e4f4d improv: added more tests for promql and added comments 2025-11-19 12:15:20 +05:30
Abhishek Kumar Singh
b6103f371f Merge branch 'main' into feat/queryfilterextractor 2025-11-18 21:09:45 +05:30
Abhishek Kumar Singh
43283506db Merge branch 'feat/queryfilterextractor_complex' into feat/queryfilterextractor 2025-11-18 15:22:24 +05:30
Abhishek Kumar Singh
694d9958db improv: integrated origin field extraction and updated tests to check for origin fields 2025-11-18 15:03:24 +05:30
Abhishek Kumar Singh
addee4c0a5 feat: added origin field extractor for ch query 2025-11-18 14:36:03 +05:30
Abhishek Kumar Singh
f10cf7ac04 refactor: code organisation 2025-11-17 16:27:17 +05:30
Abhishek Kumar Singh
b336678639 fix: CH test cases 2025-11-17 15:01:32 +05:30
Abhishek Kumar Singh
c438b3444e refactor: removed GroupBy from FilterResult 2025-11-17 14:34:46 +05:30
Abhishek Kumar Singh
b624414507 feat: extract column origin from subquery and join before searching directly 2025-11-17 13:42:47 +05:30
Abhishek Kumar Singh
bde7963444 feat: implemented extractOriginFromSelectItem which will find the given columnName till the very end to return the origin column with given name 2025-11-17 09:00:18 +05:30
Abhishek Kumar Singh
2df93ff217 feat: extract column origin from query and add in column info 2025-11-16 10:20:38 +05:30
Abhishek Kumar Singh
f496a6ecde improv: updated result for queryfilterextractor to return column with alias 2025-11-16 08:58:33 +05:30
Abhishek Kumar Singh
599e230a72 feat: added NewExtractor function for creating extractor 2025-11-13 13:52:32 +05:30
Abhishek Kumar Singh
9a0e32ff3b refactor: removed redundant non nil checks 2025-11-13 13:41:51 +05:30
Abhishek Kumar Singh
5fe2732698 refactor: removed unused extractFromAnyFunction 2025-11-13 13:20:59 +05:30
Abhishek Kumar Singh
4993a44ecc refactor: removed unused cases + added comments 2025-11-13 12:59:35 +05:30
Abhishek Kumar Singh
ebd575a16b chore: comments + remove usage of seen map in extractGroupFromGroupByClause 2025-11-12 19:26:44 +05:30
Abhishek Kumar Singh
666582337e feat: support for CTE in clickhouse queryfilterextractor 2025-11-12 18:58:30 +05:30
Abhishek Kumar Singh
23512ab05c feat: added support for promql in queryfilterextractor 2025-11-10 20:50:42 +05:30
Abhishek Kumar Singh
1423749529 feat: added filter extractor interface and clickhouse impl with tests 2025-11-10 20:05:39 +05:30
33 changed files with 2747 additions and 77 deletions

View File

@@ -376,6 +376,7 @@ func makeRulesManager(ch baseint.Reader, cache cache.Cache, alertmanager alertma
RuleStore: ruleStore,
MaintenanceStore: maintenanceStore,
SqlStore: sqlstore,
QueryParser: queryParser,
}
// create Manager

View File

@@ -207,6 +207,42 @@ func (r *AnomalyRule) GetSelectedQuery() string {
return r.Condition().GetSelectedQueryName()
}
// filterNewSeries filters out new series based on the first_seen timestamp.
func (r *AnomalyRule) filterNewSeries(ctx context.Context, ts time.Time, series []*v3.Series) ([]*v3.Series, error) {
// Convert []*v3.Series to []v3.Series for filtering
v3Series := make([]v3.Series, 0, len(series))
for _, s := range series {
v3Series = append(v3Series, *s)
}
// Get indexes to skip
skipIndexes, filterErr := r.BaseRule.FilterNewSeries(ctx, ts, v3Series)
if filterErr != nil {
r.logger.ErrorContext(ctx, "Error filtering new series, ", "error", filterErr, "rule_name", r.Name())
return nil, filterErr
}
// if no series are skipped, return the original series
if len(skipIndexes) == 0 {
return series, nil
}
// Create a map of skip indexes for efficient lookup
skippedIdxMap := make(map[int]struct{}, len(skipIndexes))
for _, idx := range skipIndexes {
skippedIdxMap[idx] = struct{}{}
}
// Filter out skipped series
oldSeries := make([]*v3.Series, 0, len(series)-len(skipIndexes))
for i, s := range series {
if _, shouldSkip := skippedIdxMap[i]; !shouldSkip {
oldSeries = append(oldSeries, s)
}
}
return oldSeries, nil
}
func (r *AnomalyRule) buildAndRunQuery(ctx context.Context, orgID valuer.UUID, ts time.Time) (ruletypes.Vector, error) {
params, err := r.prepareQueryRange(ctx, ts)
@@ -239,7 +275,18 @@ func (r *AnomalyRule) buildAndRunQuery(ctx context.Context, orgID valuer.UUID, t
scoresJSON, _ := json.Marshal(queryResult.AnomalyScores)
r.logger.InfoContext(ctx, "anomaly scores", "scores", string(scoresJSON))
for _, series := range queryResult.AnomalyScores {
// Filter out new series if newGroupEvalDelay is configured
seriesToProcess := queryResult.AnomalyScores
if r.ShouldSkipNewGroups() {
filteredSeries, filterErr := r.filterNewSeries(ctx, ts, seriesToProcess)
if filterErr != nil {
r.logger.ErrorContext(ctx, "Error filtering new series, ", "error", filterErr, "rule_name", r.Name())
return nil, filterErr
}
seriesToProcess = filteredSeries
}
for _, series := range seriesToProcess {
if r.Condition() != nil && r.Condition().RequireMinPoints {
if len(series.Points) < r.Condition().RequiredNumPoints {
r.logger.InfoContext(ctx, "not enough data points to evaluate series, skipping", "ruleid", r.ID(), "numPoints", len(series.Points), "requiredPoints", r.Condition().RequiredNumPoints)
@@ -291,7 +338,18 @@ func (r *AnomalyRule) buildAndRunQueryV5(ctx context.Context, orgID valuer.UUID,
scoresJSON, _ := json.Marshal(queryResult.AnomalyScores)
r.logger.InfoContext(ctx, "anomaly scores", "scores", string(scoresJSON))
for _, series := range queryResult.AnomalyScores {
// Filter out new series if newGroupEvalDelay is configured
seriesToProcess := queryResult.AnomalyScores
if r.ShouldSkipNewGroups() {
filteredSeries, filterErr := r.filterNewSeries(ctx, ts, seriesToProcess)
if filterErr != nil {
r.logger.ErrorContext(ctx, "Error filtering new series, ", "error", filterErr, "rule_name", r.Name())
return nil, filterErr
}
seriesToProcess = filteredSeries
}
for _, series := range seriesToProcess {
if r.Condition().RequireMinPoints {
if len(series.Points) < r.Condition().RequiredNumPoints {
r.logger.InfoContext(ctx, "not enough data points to evaluate series, skipping", "ruleid", r.ID(), "numPoints", len(series.Points), "requiredPoints", r.Condition().RequiredNumPoints)

View File

@@ -37,6 +37,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
opts.SLogger,
baserules.WithEvalDelay(opts.ManagerOpts.EvalDelay),
baserules.WithSQLStore(opts.SQLStore),
baserules.WithQueryParser(opts.ManagerOpts.QueryParser),
)
if err != nil {
@@ -59,6 +60,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
opts.Reader,
opts.ManagerOpts.Prometheus,
baserules.WithSQLStore(opts.SQLStore),
baserules.WithQueryParser(opts.ManagerOpts.QueryParser),
)
if err != nil {
@@ -82,6 +84,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
opts.Cache,
baserules.WithEvalDelay(opts.ManagerOpts.EvalDelay),
baserules.WithSQLStore(opts.SQLStore),
baserules.WithQueryParser(opts.ManagerOpts.QueryParser),
)
if err != nil {
return task, err
@@ -140,6 +143,7 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, *basemodel.Ap
baserules.WithSendAlways(),
baserules.WithSendUnmatched(),
baserules.WithSQLStore(opts.SQLStore),
baserules.WithQueryParser(opts.ManagerOpts.QueryParser),
)
if err != nil {
@@ -160,6 +164,7 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, *basemodel.Ap
baserules.WithSendAlways(),
baserules.WithSendUnmatched(),
baserules.WithSQLStore(opts.SQLStore),
baserules.WithQueryParser(opts.ManagerOpts.QueryParser),
)
if err != nil {
@@ -179,6 +184,7 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, *basemodel.Ap
baserules.WithSendAlways(),
baserules.WithSendUnmatched(),
baserules.WithSQLStore(opts.SQLStore),
baserules.WithQueryParser(opts.ManagerOpts.QueryParser),
)
if err != nil {
zap.L().Error("failed to prepare a new anomaly rule for test", zap.String("name", alertname), zap.Error(err))

View File

@@ -95,6 +95,7 @@ const (
signozLocalTableAttributesMetadata = "attributes_metadata"
signozUpdatedMetricsMetadataLocalTable = "updated_metadata"
signozMetricsMetadataLocalTable = "metadata"
signozUpdatedMetricsMetadataTable = "distributed_updated_metadata"
minTimespanForProgressiveSearch = time.Hour
minTimespanForProgressiveSearchMargin = time.Minute
@@ -6439,6 +6440,73 @@ func (r *ClickHouseReader) GetUpdatedMetricsMetadata(ctx context.Context, orgID
return cachedMetadata, nil
}
// GetFirstSeenFromMetricMetadata queries the metadata table to get the first_seen timestamp
// for each metric-attribute-value combination.
// Returns a map where key is `model.MetricMetadataLookupKey` and value is first_seen in milliseconds.
func (r *ClickHouseReader) GetFirstSeenFromMetricMetadata(ctx context.Context, lookupKeys []model.MetricMetadataLookupKey) (map[model.MetricMetadataLookupKey]int64, error) {
// Chunk the lookup keys to avoid overly large queries (max 300 tuples per query)
const chunkSize = 300
result := make(map[model.MetricMetadataLookupKey]int64)
for i := 0; i < len(lookupKeys); i += chunkSize {
end := i + chunkSize
if end > len(lookupKeys) {
end = len(lookupKeys)
}
chunk := lookupKeys[i:end]
// Build the IN clause values - ClickHouse uses tuple syntax with placeholders
var valueStrings []string
var args []interface{}
for _, key := range chunk {
valueStrings = append(valueStrings, "(?, ?, ?)")
args = append(args, key.MetricName, key.AttributeName, key.AttributeValue)
}
query := fmt.Sprintf(`
SELECT
m.metric_name,
m.attr_name,
m.attr_string_value,
min(m.last_reported_unix_milli) AS first_seen
FROM %s.%s AS m
WHERE (m.metric_name, m.attr_name, m.attr_string_value) IN (%s)
GROUP BY m.metric_name, m.attr_name, m.attr_string_value
ORDER BY first_seen`,
signozMetricDBName, signozMetricsMetadataLocalTable, strings.Join(valueStrings, ", "))
valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
rows, err := r.db.Query(valueCtx, query, args...)
if err != nil {
zap.L().Error("Error querying metadata for first_seen", zap.Error(err))
return nil, &model.ApiError{Typ: "ClickhouseErr", Err: fmt.Errorf("error querying metadata for first_seen: %v", err)}
}
for rows.Next() {
var metricName, attrName, attrValue string
var firstSeen uint64
if err := rows.Scan(&metricName, &attrName, &attrValue, &firstSeen); err != nil {
rows.Close()
return nil, &model.ApiError{Typ: "ClickhouseErr", Err: fmt.Errorf("error scanning metadata first_seen result: %v", err)}
}
result[model.MetricMetadataLookupKey{
MetricName: metricName,
AttributeName: attrName,
AttributeValue: attrValue,
}] = int64(firstSeen)
}
if err := rows.Err(); err != nil {
rows.Close()
return nil, &model.ApiError{Typ: "ClickhouseErr", Err: fmt.Errorf("error iterating metadata first_seen results: %v", err)}
}
rows.Close()
}
return result, nil
}
func (r *ClickHouseReader) SearchTraces(ctx context.Context, params *model.SearchTracesParams) (*[]model.SearchSpansResult, error) {
searchSpansResult := []model.SearchSpansResult{
{

View File

@@ -361,6 +361,7 @@ func makeRulesManager(
RuleStore: ruleStore,
MaintenanceStore: maintenanceStore,
SqlStore: sqlstore,
QueryParser: queryParser,
}
// create Manager

View File

@@ -1,8 +1,17 @@
package converter
import "github.com/SigNoz/signoz/pkg/errors"
// Unit represents a unit of measurement
type Unit string
func (u Unit) Validate() error {
if !IsValidUnit(u) {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid unit: %s", u)
}
return nil
}
// Value represents a value with a unit of measurement
type Value struct {
F float64
@@ -60,6 +69,27 @@ func FromUnit(u Unit) Converter {
}
}
// IsValidUnit returns true if the given unit is valid
func IsValidUnit(u Unit) bool {
switch u {
// Duration unit
case "ns", "us", "µs", "ms", "s", "m", "h", "d", "min",
// Data unit
"bytes", "decbytes", "bits", "decbits", "kbytes", "decKbytes", "deckbytes", "mbytes", "decMbytes", "decmbytes", "gbytes", "decGbytes", "decgbytes", "tbytes", "decTbytes", "dectbytes", "pbytes", "decPbytes", "decpbytes", "By", "kBy", "MBy", "GBy", "TBy", "PBy",
// Data rate unit
"binBps", "Bps", "binbps", "bps", "KiBs", "Kibits", "KBs", "Kbits", "MiBs", "Mibits", "MBs", "Mbits", "GiBs", "Gibits", "GBs", "Gbits", "TiBs", "Tibits", "TBs", "Tbits", "PiBs", "Pibits", "PBs", "Pbits", "By/s", "kBy/s", "MBy/s", "GBy/s", "TBy/s", "PBy/s", "bit/s", "kbit/s", "Mbit/s", "Gbit/s", "Tbit/s", "Pbit/s",
// Percent unit
"percent", "percentunit", "%",
// Bool unit
"bool", "bool_yes_no", "bool_true_false", "bool_1_0",
// Throughput unit
"cps", "ops", "reqps", "rps", "wps", "iops", "cpm", "opm", "rpm", "wpm", "{count}/s", "{ops}/s", "{req}/s", "{read}/s", "{write}/s", "{iops}/s", "{count}/min", "{ops}/min", "{read}/min", "{write}/min":
return true
default:
return false
}
}
func UnitToName(u string) string {
switch u {
case "ns":

View File

@@ -81,6 +81,7 @@ type Reader interface {
CheckClickHouse(ctx context.Context) error
GetMetricMetadata(context.Context, valuer.UUID, string, string) (*v3.MetricMetadataResponse, error)
GetFirstSeenFromMetricMetadata(ctx context.Context, lookupKeys []model.MetricMetadataLookupKey) (map[model.MetricMetadataLookupKey]int64, error)
AddRuleStateHistory(ctx context.Context, ruleStateHistory []model.RuleStateHistory) error
GetOverallStateTransitions(ctx context.Context, ruleID string, params *model.QueryRuleStateHistory) ([]model.ReleStateItem, error)

View File

@@ -516,3 +516,9 @@ type LogsAggregateParams struct {
Function string `json:"function"`
StepSeconds int `json:"step"`
}
type MetricMetadataLookupKey struct {
MetricName string
AttributeName string
AttributeValue string
}

View File

@@ -9,6 +9,7 @@ import (
"strings"
"time"
"github.com/SigNoz/signoz/pkg/query-service/converter"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/pkg/errors"
"go.uber.org/zap"
@@ -640,6 +641,13 @@ func (c *CompositeQuery) Validate() error {
return fmt.Errorf("query type is invalid: %w", err)
}
// Validate Unit - if provided (non-empty), it should be a valid unit string
if c.Unit != "" {
if err := converter.Unit(c.Unit).Validate(); err != nil {
return err
}
}
return nil
}

View File

@@ -13,7 +13,9 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/model"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
qslabels "github.com/SigNoz/signoz/pkg/query-service/utils/labels"
"github.com/SigNoz/signoz/pkg/queryparser"
"github.com/SigNoz/signoz/pkg/sqlstore"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/SigNoz/signoz/pkg/valuer"
"go.uber.org/zap"
@@ -88,6 +90,11 @@ type BaseRule struct {
sqlstore sqlstore.SQLStore
evaluation ruletypes.Evaluation
// newGroupEvalDelay is the grace period for new alert groups
newGroupEvalDelay *time.Duration
queryParser queryparser.QueryParser
}
type RuleOption func(*BaseRule)
@@ -122,6 +129,12 @@ func WithSQLStore(sqlstore sqlstore.SQLStore) RuleOption {
}
}
func WithQueryParser(queryParser queryparser.QueryParser) RuleOption {
return func(r *BaseRule) {
r.queryParser = queryParser
}
}
func NewBaseRule(id string, orgID valuer.UUID, p *ruletypes.PostableRule, reader interfaces.Reader, opts ...RuleOption) (*BaseRule, error) {
if p.RuleCondition == nil || !p.RuleCondition.IsValid() {
return nil, fmt.Errorf("invalid rule condition")
@@ -154,6 +167,12 @@ func NewBaseRule(id string, orgID valuer.UUID, p *ruletypes.PostableRule, reader
evaluation: evaluation,
}
// Store newGroupEvalDelay and groupBy keys from NotificationSettings
if p.NotificationSettings != nil && p.NotificationSettings.NewGroupEvalDelay != nil {
newGroupEvalDelay := time.Duration(*p.NotificationSettings.NewGroupEvalDelay)
baseRule.newGroupEvalDelay = &newGroupEvalDelay
}
if baseRule.evalWindow == 0 {
baseRule.evalWindow = 5 * time.Minute
}
@@ -528,3 +547,166 @@ func (r *BaseRule) PopulateTemporality(ctx context.Context, orgID valuer.UUID, q
}
return nil
}
// ShouldSkipNewGroups returns true if new group filtering should be applied
func (r *BaseRule) ShouldSkipNewGroups() bool {
return r.newGroupEvalDelay != nil && *r.newGroupEvalDelay > 0
}
// isFilterNewSeriesSupported checks if the query is supported for new series filtering
func (r *BaseRule) isFilterNewSeriesSupported() bool {
if r.ruleCondition.CompositeQuery.QueryType == v3.QueryTypeBuilder {
for _, query := range r.ruleCondition.CompositeQuery.Queries {
if query.Type != qbtypes.QueryTypeBuilder {
continue
}
switch query.Spec.(type) {
// query spec is for Logs or Traces, return with blank metric names and group by fields
case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation], qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]:
return false
}
}
}
return true
}
// extractMetricAndGroupBys extracts metric names and groupBy keys from the rule's query.
// TODO: implement caching for query parsing results to avoid re-parsing the query + cache invalidation
func (r *BaseRule) extractMetricAndGroupBys(ctx context.Context) ([]string, []string, error) {
var metricNames []string
var groupedFields []string
// check to avoid processing the query for Logs and Traces
// as excluding new series is not supported for Logs and Traces for now
if !r.isFilterNewSeriesSupported() {
return metricNames, groupedFields, nil
}
result, err := r.queryParser.AnalyzeCompositeQuery(ctx, r.ruleCondition.CompositeQuery)
if err != nil {
return nil, nil, err
}
metricNames = result.MetricNames
for _, col := range result.GroupByColumns {
groupedFields = append(groupedFields, col.OriginField)
}
return metricNames, groupedFields, nil
}
// FilterNewSeriesIndexes filters out items that are too new based on metadata first_seen timestamps.
// Returns the indexes that should be skipped (not included in the result).
func (r *BaseRule) FilterNewSeries(ctx context.Context, ts time.Time, series []v3.Series) ([]int, error) {
// Extract metric names and groupBy keys
metricNames, groupedFields, err := r.extractMetricAndGroupBys(ctx)
if err != nil {
return nil, err
}
if len(metricNames) == 0 || len(groupedFields) == 0 {
// No metrics or groupBy keys, nothing to filter (non-ideal case, return early)
return []int{}, nil
}
// Build lookup keys from series which will be used to query metadata from CH
lookupKeys := make([]model.MetricMetadataLookupKey, 0)
seriesIdxToLookupKeys := make(map[int][]model.MetricMetadataLookupKey) // series index -> lookup keys
for i := 0; i < len(series); i++ {
metricLabelMap := series[i].Labels
// Collect groupBy attribute-value pairs for this series
seriesKeys := make([]model.MetricMetadataLookupKey, 0)
for _, metricName := range metricNames {
for _, groupByKey := range groupedFields {
if attrValue, ok := metricLabelMap[groupByKey]; ok {
lookupKey := model.MetricMetadataLookupKey{
MetricName: metricName,
AttributeName: groupByKey,
AttributeValue: attrValue,
}
lookupKeys = append(lookupKeys, lookupKey)
seriesKeys = append(seriesKeys, lookupKey)
}
}
}
if len(seriesKeys) > 0 {
seriesIdxToLookupKeys[i] = seriesKeys
}
}
if len(lookupKeys) == 0 {
// No lookup keys to query, return empty skip list
// this can happen when the series has no labels at all
// in this case, we include all series as we don't know if it is new or old series
return []int{}, nil
}
// unique lookup keys
uniqueLookupKeysMap := make(map[model.MetricMetadataLookupKey]struct{})
uniqueLookupKeys := make([]model.MetricMetadataLookupKey, 0)
for _, key := range lookupKeys {
if _, ok := uniqueLookupKeysMap[key]; !ok {
uniqueLookupKeysMap[key] = struct{}{}
uniqueLookupKeys = append(uniqueLookupKeys, key)
}
}
// Query metadata for first_seen timestamps
firstSeenMap, err := r.reader.GetFirstSeenFromMetricMetadata(ctx, uniqueLookupKeys)
if err != nil {
return nil, err
}
// Filter series based on first_seen + delay
skipIndexes := make([]int, 0)
evalTimeMs := ts.UnixMilli()
newGroupEvalDelayMs := r.newGroupEvalDelay.Milliseconds()
for i := 0; i < len(series); i++ {
seriesKeys, ok := seriesIdxToLookupKeys[i]
if !ok {
// No matching labels used in groupBy from this series, don't exclude it
// as we can't decide if it is new or old series
continue
}
// Find the maximum first_seen across all groupBy attributes for this series
// if the latest is old enough we're good, if latest is new we need to skip it
maxFirstSeen := int64(0)
// metadataFound tracks if we have metadata for any of the lookup keys
metadataFound := false
for _, lookupKey := range seriesKeys {
if firstSeen, exists := firstSeenMap[lookupKey]; exists {
metadataFound = true
if firstSeen > maxFirstSeen {
maxFirstSeen = firstSeen
}
}
}
// if we don't have metadata for any of the lookup keys, we can't decide if it is new or old series
// in that case, we don't add it to the skip indexes
if !metadataFound {
continue
}
// Check if first_seen + delay has passed
if maxFirstSeen+newGroupEvalDelayMs > evalTimeMs {
// Still within grace period, skip this series
skipIndexes = append(skipIndexes, i)
continue
}
// Old enough, don't skip this series
}
if r.logger != nil && len(skipIndexes) > 0 {
r.logger.InfoContext(ctx, "Filtered new series", "rule_name", r.Name(), "skipped_count", len(skipIndexes), "total_count", len(series), "delay_ms", newGroupEvalDelayMs)
}
return skipIndexes, nil
}

View File

@@ -1,12 +1,31 @@
package rules
import (
"context"
"fmt"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/SigNoz/signoz/pkg/cache"
"github.com/SigNoz/signoz/pkg/cache/cachetest"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/prometheus"
"github.com/SigNoz/signoz/pkg/prometheus/prometheustest"
"github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader"
"github.com/SigNoz/signoz/pkg/query-service/model"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/queryparser"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
cmock "github.com/srikanthccv/ClickHouse-go-mock"
)
func TestBaseRule_RequireMinPoints(t *testing.T) {
@@ -81,3 +100,704 @@ func TestBaseRule_RequireMinPoints(t *testing.T) {
})
}
}
// createTestSeries creates a v3.Series with the given labels and optional points
// so we don't exactly need the points in the series because the labels are used to determine if the series is new or old
// we use the labels to create a lookup key for the series and then check the first_seen timestamp for the series in the metadata table
func createTestSeries(labels map[string]string, points []v3.Point) v3.Series {
if points == nil {
points = []v3.Point{}
}
return v3.Series{
Labels: labels,
Points: points,
}
}
// calculateFirstSeen calculates first_seen timestamp based on evalTime, delay, and isOld flag
func calculateFirstSeen(evalTime time.Time, delay time.Duration, isOld bool) int64 {
if isOld {
// Old: evalTime - (2 * delay)
return evalTime.Add(-2 * delay).UnixMilli()
}
// New: evalTime - (delay / 2)
return evalTime.Add(-delay / 2).UnixMilli()
}
// createFirstSeenMap creates a first_seen map for a series with given attributes
// metricName: the metric name
// groupByFields: list of groupBy field names
// evalTime: evaluation time
// delay: newGroupEvalDelay
// isOld: whether the series is old (true) or new (false)
// attributeValues: values for each groupBy field in order
func createFirstSeenMap(metricName string, groupByFields []string, evalTime time.Time, delay time.Duration, isOld bool, attributeValues ...string) map[model.MetricMetadataLookupKey]int64 {
result := make(map[model.MetricMetadataLookupKey]int64)
firstSeen := calculateFirstSeen(evalTime, delay, isOld)
for i, field := range groupByFields {
if i < len(attributeValues) {
key := model.MetricMetadataLookupKey{
MetricName: metricName,
AttributeName: field,
AttributeValue: attributeValues[i],
}
result[key] = firstSeen
}
}
return result
}
// mergeFirstSeenMaps merges multiple first_seen maps into one
// When the same key exists in multiple maps, it keeps the lowest value
// which simulatest the behavior of the ClickHouse query
// finding the minimum first_seen timestamp across all groupBy attributes for a single series
func mergeFirstSeenMaps(maps ...map[model.MetricMetadataLookupKey]int64) map[model.MetricMetadataLookupKey]int64 {
result := make(map[model.MetricMetadataLookupKey]int64)
for _, m := range maps {
for k, v := range m {
if existingValue, exists := result[k]; exists {
// Keep the lowest value
if v < existingValue {
result[k] = v
}
} else {
result[k] = v
}
}
}
return result
}
// createPostableRule creates a PostableRule with the given CompositeQuery
func createPostableRule(compositeQuery *v3.CompositeQuery) ruletypes.PostableRule {
return ruletypes.PostableRule{
AlertName: "Test Rule",
AlertType: ruletypes.AlertTypeMetric,
RuleType: ruletypes.RuleTypeThreshold,
Evaluation: &ruletypes.EvaluationEnvelope{
Kind: ruletypes.RollingEvaluation,
Spec: ruletypes.RollingWindow{
EvalWindow: ruletypes.Duration(5 * time.Minute),
Frequency: ruletypes.Duration(1 * time.Minute),
},
},
RuleCondition: &ruletypes.RuleCondition{
CompositeQuery: compositeQuery,
Thresholds: &ruletypes.RuleThresholdData{
Kind: ruletypes.BasicThresholdKind,
Spec: ruletypes.BasicRuleThresholds{
{
Name: "test-threshold",
TargetValue: func() *float64 { v := 1.0; return &v }(),
CompareOp: ruletypes.ValueIsAbove,
MatchType: ruletypes.AtleastOnce,
},
},
},
},
}
}
// setupMetadataQueryMock sets up the ClickHouse mock for GetFirstSeenFromMetricMetadata query
func setupMetadataQueryMock(telemetryStore *telemetrystoretest.Provider, metricNames []string, groupedFields []string, series []v3.Series, firstSeenMap map[model.MetricMetadataLookupKey]int64) {
if len(firstSeenMap) == 0 || len(series) == 0 {
return
}
// Build args from series the same way we build lookup keys in FilterNewSeries
var args []any
uniqueArgsMap := make(map[string]struct{})
for _, s := range series {
labelMap := s.Labels
for _, metricName := range metricNames {
for _, groupByKey := range groupedFields {
if attrValue, ok := labelMap[groupByKey]; ok {
argKey := fmt.Sprintf("%s,%s,%s", metricName, groupByKey, attrValue)
if _, ok := uniqueArgsMap[argKey]; ok {
continue
}
uniqueArgsMap[argKey] = struct{}{}
args = append(args, metricName, groupByKey, attrValue)
}
}
}
}
// Build the query pattern - it uses IN clause with tuples
// We'll match any query that contains the metadata table pattern
metadataCols := []cmock.ColumnType{
{Name: "metric_name", Type: "String"},
{Name: "attr_name", Type: "String"},
{Name: "attr_string_value", Type: "String"},
{Name: "first_seen", Type: "UInt64"},
}
var values [][]interface{}
for key, firstSeen := range firstSeenMap {
values = append(values, []interface{}{
key.MetricName,
key.AttributeName,
key.AttributeValue,
uint64(firstSeen),
})
}
rows := cmock.NewRows(metadataCols, values)
telemetryStore.Mock().
ExpectQuery("SELECT any").
WithArgs(args...).
WillReturnRows(rows)
}
// filterNewSeriesTestCase represents a test case for FilterNewSeries
type filterNewSeriesTestCase struct {
name string
compositeQuery *v3.CompositeQuery
series []v3.Series
firstSeenMap map[model.MetricMetadataLookupKey]int64
newGroupEvalDelay *time.Duration
evalTime time.Time
expectedSkipIndexes []int
expectError bool
}
func TestBaseRule_FilterNewSeries(t *testing.T) {
defaultEvalTime := time.Unix(1700000000, 0)
defaultDelay := 2 * time.Minute
defaultGroupByFields := []string{"service_name", "env"}
logger := instrumentationtest.New().Logger()
settings := instrumentationtest.New().ToProviderSettings()
tests := []filterNewSeriesTestCase{
{
name: "mixed old and new series - Builder query",
compositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "A",
StepInterval: qbtypes.Step{Duration: 60 * time.Second},
Signal: telemetrytypes.SignalMetrics,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "request_total",
TimeAggregation: metrictypes.TimeAggregationCount,
SpaceAggregation: metrictypes.SpaceAggregationSum,
},
},
GroupBy: []qbtypes.GroupByKey{
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service_name"}},
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "env"}},
},
},
},
},
},
series: []v3.Series{
createTestSeries(map[string]string{"service_name": "svc-old", "env": "prod"}, nil),
createTestSeries(map[string]string{"service_name": "svc-new", "env": "prod"}, nil),
createTestSeries(map[string]string{"service_name": "svc-missing", "env": "stage"}, nil),
},
firstSeenMap: mergeFirstSeenMaps(
createFirstSeenMap("request_total", defaultGroupByFields, defaultEvalTime, defaultDelay, true, "svc-old", "prod"),
createFirstSeenMap("request_total", defaultGroupByFields, defaultEvalTime, defaultDelay, false, "svc-new", "prod"),
// svc-missing has no metadata, so it will be skipped
),
newGroupEvalDelay: &defaultDelay,
evalTime: defaultEvalTime,
expectedSkipIndexes: []int{1}, // svc-missing should be skipped as we can't decide if it is new or old series
},
{
name: "all new series - PromQL query",
compositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypePromQL,
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypePromQL,
Spec: qbtypes.PromQuery{
Name: "P1",
Query: "sum by (service_name,env) (rate(request_total[5m]))",
Disabled: false,
Step: qbtypes.Step{Duration: 0},
Stats: false,
},
},
},
},
series: []v3.Series{
createTestSeries(map[string]string{"service_name": "svc-new1", "env": "prod"}, nil),
createTestSeries(map[string]string{"service_name": "svc-new2", "env": "stage"}, nil),
},
firstSeenMap: mergeFirstSeenMaps(
createFirstSeenMap("request_total", defaultGroupByFields, defaultEvalTime, defaultDelay, false, "svc-new1", "prod"),
createFirstSeenMap("request_total", defaultGroupByFields, defaultEvalTime, defaultDelay, false, "svc-new2", "stage"),
),
newGroupEvalDelay: &defaultDelay,
evalTime: defaultEvalTime,
expectedSkipIndexes: []int{0, 1}, // all should be skipped
},
{
name: "all old series - ClickHouse query",
compositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeClickHouseSQL,
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeClickHouseSQL,
Spec: qbtypes.ClickHouseQuery{
Name: "CH1",
Query: "SELECT service_name, env FROM metrics WHERE metric_name='request_total' GROUP BY service_name, env",
Disabled: false,
},
},
},
},
series: []v3.Series{
createTestSeries(map[string]string{"service_name": "svc-old1", "env": "prod"}, nil),
createTestSeries(map[string]string{"service_name": "svc-old2", "env": "stage"}, nil),
},
firstSeenMap: mergeFirstSeenMaps(
createFirstSeenMap("request_total", defaultGroupByFields, defaultEvalTime, defaultDelay, true, "svc-old1", "prod"),
createFirstSeenMap("request_total", defaultGroupByFields, defaultEvalTime, defaultDelay, true, "svc-old2", "stage"),
),
newGroupEvalDelay: &defaultDelay,
evalTime: defaultEvalTime,
expectedSkipIndexes: []int{}, // none should be skipped
},
{
name: "no grouping in query - Builder",
compositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "A",
StepInterval: qbtypes.Step{Duration: 60 * time.Second},
Signal: telemetrytypes.SignalMetrics,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "request_total",
TimeAggregation: metrictypes.TimeAggregationCount,
SpaceAggregation: metrictypes.SpaceAggregationSum,
},
},
GroupBy: []qbtypes.GroupByKey{},
},
},
},
},
series: []v3.Series{
createTestSeries(map[string]string{"service_name": "svc1", "env": "prod"}, nil),
},
firstSeenMap: make(map[model.MetricMetadataLookupKey]int64),
newGroupEvalDelay: &defaultDelay,
evalTime: defaultEvalTime,
expectedSkipIndexes: []int{}, // early return, no filtering
},
{
name: "no metric names - Builder",
compositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "A",
StepInterval: qbtypes.Step{Duration: 60 * time.Second},
Signal: telemetrytypes.SignalMetrics,
Aggregations: []qbtypes.MetricAggregation{},
GroupBy: []qbtypes.GroupByKey{
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service_name"}},
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "env"}},
},
},
},
},
},
series: []v3.Series{
createTestSeries(map[string]string{"service_name": "svc1", "env": "prod"}, nil),
},
firstSeenMap: make(map[model.MetricMetadataLookupKey]int64),
newGroupEvalDelay: &defaultDelay,
evalTime: defaultEvalTime,
expectedSkipIndexes: []int{}, // early return, no filtering
},
{
name: "series with no matching labels - Builder",
compositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "A",
StepInterval: qbtypes.Step{Duration: 60 * time.Second},
Signal: telemetrytypes.SignalMetrics,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "request_total",
TimeAggregation: metrictypes.TimeAggregationCount,
SpaceAggregation: metrictypes.SpaceAggregationSum,
},
},
GroupBy: []qbtypes.GroupByKey{
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service_name"}},
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "env"}},
},
},
},
},
},
series: []v3.Series{
createTestSeries(map[string]string{"status": "200"}, nil), // no service_name or env
},
firstSeenMap: make(map[model.MetricMetadataLookupKey]int64),
newGroupEvalDelay: &defaultDelay,
evalTime: defaultEvalTime,
expectedSkipIndexes: []int{}, // series included as we can't decide if it's new or old
},
{
name: "series with missing metadata - PromQL",
compositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypePromQL,
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypePromQL,
Spec: qbtypes.PromQuery{
Name: "P1",
Query: "sum by (service_name,env) (rate(request_total[5m]))",
Disabled: false,
Step: qbtypes.Step{Duration: 0},
Stats: false,
},
},
},
},
series: []v3.Series{
createTestSeries(map[string]string{"service_name": "svc-old", "env": "prod"}, nil),
createTestSeries(map[string]string{"service_name": "svc-no-metadata", "env": "prod"}, nil),
},
firstSeenMap: createFirstSeenMap("request_total", defaultGroupByFields, defaultEvalTime, defaultDelay, true, "svc-old", "prod"),
// svc-no-metadata has no entry in firstSeenMap
newGroupEvalDelay: &defaultDelay,
evalTime: defaultEvalTime,
expectedSkipIndexes: []int{}, // svc-no-metadata should not be skipped as we can't decide if it is new or old series
},
{
name: "series with partial metadata - ClickHouse",
compositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeClickHouseSQL,
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeClickHouseSQL,
Spec: qbtypes.ClickHouseQuery{
Name: "CH1",
Query: "SELECT service_name, env FROM metrics WHERE metric_name='request_total' GROUP BY service_name, env",
Disabled: false,
},
},
},
},
series: []v3.Series{
createTestSeries(map[string]string{"service_name": "svc-partial", "env": "prod"}, nil),
},
// Only provide metadata for service_name, not env
firstSeenMap: map[model.MetricMetadataLookupKey]int64{
{MetricName: "request_total", AttributeName: "service_name", AttributeValue: "svc-partial"}: calculateFirstSeen(defaultEvalTime, defaultDelay, true),
// env metadata is missing
},
newGroupEvalDelay: &defaultDelay,
evalTime: defaultEvalTime,
expectedSkipIndexes: []int{}, // has some metadata, uses max first_seen which is old
},
{
name: "empty series array - Builder",
compositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "A",
StepInterval: qbtypes.Step{Duration: 60 * time.Second},
Signal: telemetrytypes.SignalMetrics,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "request_total",
TimeAggregation: metrictypes.TimeAggregationCount,
SpaceAggregation: metrictypes.SpaceAggregationSum,
},
},
GroupBy: []qbtypes.GroupByKey{
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service_name"}},
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "env"}},
},
},
},
},
},
series: []v3.Series{},
firstSeenMap: make(map[model.MetricMetadataLookupKey]int64),
newGroupEvalDelay: &defaultDelay,
evalTime: defaultEvalTime,
expectedSkipIndexes: []int{},
},
{
name: "zero delay - Builder",
compositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "A",
StepInterval: qbtypes.Step{Duration: 60 * time.Second},
Signal: telemetrytypes.SignalMetrics,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "request_total",
TimeAggregation: metrictypes.TimeAggregationCount,
SpaceAggregation: metrictypes.SpaceAggregationSum,
},
},
GroupBy: []qbtypes.GroupByKey{
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service_name"}},
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "env"}},
},
},
},
},
},
series: []v3.Series{
createTestSeries(map[string]string{"service_name": "svc1", "env": "prod"}, nil),
},
firstSeenMap: createFirstSeenMap("request_total", defaultGroupByFields, defaultEvalTime, defaultDelay, true, "svc1", "prod"),
newGroupEvalDelay: func() *time.Duration { d := time.Duration(0); return &d }(), // zero delay
evalTime: defaultEvalTime,
expectedSkipIndexes: []int{}, // with zero delay, all series pass
},
{
name: "multiple metrics with same groupBy keys - Builder",
compositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "A",
StepInterval: qbtypes.Step{Duration: 60 * time.Second},
Signal: telemetrytypes.SignalMetrics,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "request_total",
TimeAggregation: metrictypes.TimeAggregationCount,
SpaceAggregation: metrictypes.SpaceAggregationSum,
},
{
MetricName: "error_total",
TimeAggregation: metrictypes.TimeAggregationCount,
SpaceAggregation: metrictypes.SpaceAggregationSum,
},
},
GroupBy: []qbtypes.GroupByKey{
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service_name"}},
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "env"}},
},
},
},
},
},
series: []v3.Series{
createTestSeries(map[string]string{"service_name": "svc1", "env": "prod"}, nil),
},
firstSeenMap: mergeFirstSeenMaps(
createFirstSeenMap("request_total", defaultGroupByFields, defaultEvalTime, defaultDelay, true, "svc1", "prod"),
createFirstSeenMap("error_total", defaultGroupByFields, defaultEvalTime, defaultDelay, true, "svc1", "prod"),
),
newGroupEvalDelay: &defaultDelay,
evalTime: defaultEvalTime,
expectedSkipIndexes: []int{},
},
{
name: "series with multiple groupBy attributes where one is new and one is old - Builder",
compositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "A",
StepInterval: qbtypes.Step{Duration: 60 * time.Second},
Signal: telemetrytypes.SignalMetrics,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "request_total",
TimeAggregation: metrictypes.TimeAggregationCount,
SpaceAggregation: metrictypes.SpaceAggregationSum,
},
},
GroupBy: []qbtypes.GroupByKey{
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service_name"}},
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "env"}},
},
},
},
},
},
series: []v3.Series{
createTestSeries(map[string]string{"service_name": "svc1", "env": "prod"}, nil),
},
// service_name is old, env is new - should use max (new)
firstSeenMap: mergeFirstSeenMaps(
createFirstSeenMap("request_total", []string{"service_name"}, defaultEvalTime, defaultDelay, true, "svc1"),
createFirstSeenMap("request_total", []string{"env"}, defaultEvalTime, defaultDelay, false, "prod"),
),
newGroupEvalDelay: &defaultDelay,
evalTime: defaultEvalTime,
expectedSkipIndexes: []int{0}, // max first_seen is new, so should skip
},
{
name: "Logs query - should skip filtering and return empty skip indexes",
compositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
Name: "A",
StepInterval: qbtypes.Step{Duration: 60 * time.Second},
Signal: telemetrytypes.SignalLogs,
Aggregations: []qbtypes.LogAggregation{
{
Expression: "count()",
},
},
GroupBy: []qbtypes.GroupByKey{
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service_name"}},
},
},
},
},
},
series: []v3.Series{
createTestSeries(map[string]string{"service_name": "svc1"}, nil),
createTestSeries(map[string]string{"service_name": "svc2"}, nil),
},
firstSeenMap: make(map[model.MetricMetadataLookupKey]int64),
newGroupEvalDelay: &defaultDelay,
evalTime: defaultEvalTime,
expectedSkipIndexes: []int{}, // Logs queries should return early, no filtering
},
{
name: "Traces query - should skip filtering and return empty skip indexes",
compositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
Name: "A",
StepInterval: qbtypes.Step{Duration: 60 * time.Second},
Signal: telemetrytypes.SignalTraces,
Aggregations: []qbtypes.TraceAggregation{
{
Expression: "count()",
},
},
GroupBy: []qbtypes.GroupByKey{
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service_name"}},
},
},
},
},
},
series: []v3.Series{
createTestSeries(map[string]string{"service_name": "svc1"}, nil),
createTestSeries(map[string]string{"service_name": "svc2"}, nil),
},
firstSeenMap: make(map[model.MetricMetadataLookupKey]int64),
newGroupEvalDelay: &defaultDelay,
evalTime: defaultEvalTime,
expectedSkipIndexes: []int{}, // Traces queries should return early, no filtering
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Create postableRule from compositeQuery
postableRule := createPostableRule(tt.compositeQuery)
// Setup telemetry store mock
telemetryStore := telemetrystoretest.New(telemetrystore.Config{}, &queryMatcherAny{})
// Create query parser
queryParser := queryparser.New(settings)
// Use query parser to extract metric names and groupBy fields
analyzeResult, err := queryParser.AnalyzeCompositeQuery(context.Background(), tt.compositeQuery)
require.NoError(t, err)
metricNames := analyzeResult.MetricNames
groupedFields := []string{}
for _, col := range analyzeResult.GroupByColumns {
groupedFields = append(groupedFields, col.OriginField)
}
// Setup metadata query mock
setupMetadataQueryMock(telemetryStore, metricNames, groupedFields, tt.series, tt.firstSeenMap)
// Create reader with mocked telemetry store
readerCache, err := cachetest.New(
cache.Config{
Provider: "memory",
Memory: cache.Memory{
NumCounters: 10 * 1000,
MaxCost: 1 << 26,
},
},
)
require.NoError(t, err)
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
reader := clickhouseReader.NewReader(
nil,
telemetryStore,
prometheustest.New(context.Background(), settings, prometheus.Config{}, telemetryStore),
"",
time.Duration(time.Second),
nil,
readerCache,
options,
)
// Set newGroupEvalDelay in NotificationSettings if provided
if tt.newGroupEvalDelay != nil {
postableRule.NotificationSettings = &ruletypes.NotificationSettings{
NewGroupEvalDelay: func() *ruletypes.Duration {
d := ruletypes.Duration(*tt.newGroupEvalDelay)
return &d
}(),
}
}
// Create BaseRule using NewBaseRule
rule, err := NewBaseRule("test-rule", valuer.GenerateUUID(), &postableRule, reader, WithQueryParser(queryParser), WithLogger(logger))
require.NoError(t, err)
skipIndexes, err := rule.FilterNewSeries(context.Background(), tt.evalTime, tt.series)
if tt.expectError {
require.Error(t, err)
return
}
require.NoError(t, err)
require.ElementsMatch(t, tt.expectedSkipIndexes, skipIndexes, "skip indexes should match")
})
}
}

View File

@@ -11,6 +11,7 @@ import (
"time"
"github.com/SigNoz/signoz/pkg/query-service/utils/labels"
"github.com/SigNoz/signoz/pkg/queryparser"
"go.uber.org/zap"
@@ -103,6 +104,7 @@ type ManagerOptions struct {
RuleStore ruletypes.RuleStore
MaintenanceStore ruletypes.MaintenanceStore
SqlStore sqlstore.SQLStore
QueryParser queryparser.QueryParser
}
// The Manager manages recording and alerting rules.
@@ -125,6 +127,8 @@ type Manager struct {
alertmanager alertmanager.Alertmanager
sqlstore sqlstore.SQLStore
orgGetter organization.Getter
// queryParser is used for parsing queries for rules
queryParser queryparser.QueryParser
}
func defaultOptions(o *ManagerOptions) *ManagerOptions {
@@ -166,6 +170,7 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
opts.SLogger,
WithEvalDelay(opts.ManagerOpts.EvalDelay),
WithSQLStore(opts.SQLStore),
WithQueryParser(opts.ManagerOpts.QueryParser),
)
if err != nil {
@@ -188,6 +193,7 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
opts.Reader,
opts.ManagerOpts.Prometheus,
WithSQLStore(opts.SQLStore),
WithQueryParser(opts.ManagerOpts.QueryParser),
)
if err != nil {
@@ -226,6 +232,7 @@ func NewManager(o *ManagerOptions) (*Manager, error) {
alertmanager: o.Alertmanager,
orgGetter: o.OrgGetter,
sqlstore: o.SqlStore,
queryParser: o.QueryParser,
}
zap.L().Debug("Manager created successfully with NotificationGroup")

View File

@@ -119,6 +119,42 @@ func (r *PromRule) getPqlQuery() (string, error) {
return "", fmt.Errorf("invalid promql rule query")
}
// filterNewSeries filters out new series based on the first_seen timestamp.
func (r *PromRule) filterNewSeries(ctx context.Context, ts time.Time, res promql.Matrix) (promql.Matrix, error) {
// Convert promql.Matrix to []v3.Series
v3Series := make([]v3.Series, 0, len(res))
for _, series := range res {
v3Series = append(v3Series, toCommonSeries(series))
}
// Get indexes to skip
skipIndexes, filterErr := r.BaseRule.FilterNewSeries(ctx, ts, v3Series)
if filterErr != nil {
r.logger.ErrorContext(ctx, "Error filtering new series, ", "error", filterErr, "rule_name", r.Name())
return nil, filterErr
}
// if no series are skipped, return the original matrix
if len(skipIndexes) == 0 {
return res, nil
}
// Create a map of skip indexes for efficient lookup
skippedIdxMap := make(map[int]struct{}, len(skipIndexes))
for _, idx := range skipIndexes {
skippedIdxMap[idx] = struct{}{}
}
// Filter out skipped series from promql.Matrix
filteredMatrix := make(promql.Matrix, 0, len(res)-len(skipIndexes))
for i, series := range res {
if _, shouldSkip := skippedIdxMap[i]; !shouldSkip {
filteredMatrix = append(filteredMatrix, series)
}
}
return filteredMatrix, nil
}
func (r *PromRule) buildAndRunQuery(ctx context.Context, ts time.Time) (ruletypes.Vector, error) {
start, end := r.Timestamps(ts)
interval := 60 * time.Second // TODO(srikanthccv): this should be configurable
@@ -135,8 +171,19 @@ func (r *PromRule) buildAndRunQuery(ctx context.Context, ts time.Time) (ruletype
return nil, err
}
matrixToProcess := res
// Filter out new series if newGroupEvalDelay is configured
if r.ShouldSkipNewGroups() {
filteredSeries, filterErr := r.filterNewSeries(ctx, ts, matrixToProcess)
if filterErr != nil {
r.logger.ErrorContext(ctx, "Error filtering new series, ", "error", filterErr, "rule_name", r.Name())
return nil, filterErr
}
matrixToProcess = filteredSeries
}
var resultVector ruletypes.Vector
for _, series := range res {
for _, series := range matrixToProcess {
resultSeries, err := r.Threshold.Eval(toCommonSeries(series), r.Unit(), ruletypes.EvalData{
ActiveAlerts: r.ActiveAlertsLabelFP(),
})

View File

@@ -52,6 +52,7 @@ func defaultTestNotification(opts PrepareTestRuleOptions) (int, *model.ApiError)
WithSendAlways(),
WithSendUnmatched(),
WithSQLStore(opts.SQLStore),
WithQueryParser(opts.ManagerOpts.QueryParser),
)
if err != nil {
@@ -72,6 +73,7 @@ func defaultTestNotification(opts PrepareTestRuleOptions) (int, *model.ApiError)
WithSendAlways(),
WithSendUnmatched(),
WithSQLStore(opts.SQLStore),
WithQueryParser(opts.ManagerOpts.QueryParser),
)
if err != nil {

View File

@@ -378,6 +378,42 @@ func (r *ThresholdRule) GetSelectedQuery() string {
return r.ruleCondition.GetSelectedQueryName()
}
// filterNewSeries filters out new series based on the first_seen timestamp.
func (r *ThresholdRule) filterNewSeries(ctx context.Context, ts time.Time, series []*v3.Series) ([]*v3.Series, error) {
// Convert []*v3.Series to []v3.Series for filtering
v3Series := make([]v3.Series, 0, len(series))
for _, s := range series {
v3Series = append(v3Series, *s)
}
// Get indexes to skip
skipIndexes, filterErr := r.BaseRule.FilterNewSeries(ctx, ts, v3Series)
if filterErr != nil {
r.logger.ErrorContext(ctx, "Error filtering new series, ", "error", filterErr, "rule_name", r.Name())
return nil, filterErr
}
// if no series are skipped, return the original series
if len(skipIndexes) == 0 {
return series, nil
}
// Create a map of skip indexes for efficient lookup
skippedIdxMap := make(map[int]struct{}, len(skipIndexes))
for _, idx := range skipIndexes {
skippedIdxMap[idx] = struct{}{}
}
// Filter out skipped series
oldSeries := make([]*v3.Series, 0, len(series)-len(skipIndexes))
for i, s := range series {
if _, shouldSkip := skippedIdxMap[i]; !shouldSkip {
oldSeries = append(oldSeries, s)
}
}
return oldSeries, nil
}
func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, orgID valuer.UUID, ts time.Time) (ruletypes.Vector, error) {
params, err := r.prepareQueryRange(ctx, ts)
@@ -481,7 +517,18 @@ func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, orgID valuer.UUID,
return resultVector, nil
}
for _, series := range queryResult.Series {
// Filter out new series if newGroupEvalDelay is configured
seriesToProcess := queryResult.Series
if r.ShouldSkipNewGroups() {
filteredSeries, filterErr := r.filterNewSeries(ctx, ts, seriesToProcess)
if filterErr != nil {
r.logger.ErrorContext(ctx, "Error filtering new series, ", "error", filterErr, "rule_name", r.Name())
return nil, filterErr
}
seriesToProcess = filteredSeries
}
for _, series := range seriesToProcess {
if r.Condition() != nil && r.Condition().RequireMinPoints {
if len(series.Points) < r.ruleCondition.RequiredNumPoints {
r.logger.InfoContext(ctx, "not enough data points to evaluate series, skipping", "ruleid", r.ID(), "numPoints", len(series.Points), "requiredPoints", r.Condition().RequiredNumPoints)
@@ -560,7 +607,17 @@ func (r *ThresholdRule) buildAndRunQueryV5(ctx context.Context, orgID valuer.UUI
return resultVector, nil
}
for _, series := range queryResult.Series {
// Filter out new series if newGroupEvalDelay is configured
seriesToProcess := queryResult.Series
if r.ShouldSkipNewGroups() {
filteredSeries, filterErr := r.filterNewSeries(ctx, ts, seriesToProcess)
if filterErr != nil {
r.logger.ErrorContext(ctx, "Error filtering new series, ", "error", filterErr, "rule_name", r.Name())
return nil, filterErr
}
seriesToProcess = filteredSeries
}
for _, series := range seriesToProcess {
if r.Condition() != nil && r.Condition().RequireMinPoints {
if len(series.Points) < r.Condition().RequiredNumPoints {
r.logger.InfoContext(ctx, "not enough data points to evaluate series, skipping", "ruleid", r.ID(), "numPoints", len(series.Points), "requiredPoints", r.Condition().RequiredNumPoints)

View File

@@ -9,19 +9,22 @@ import (
// AssignReservedVars assigns values for go template vars. assumes that
// model.QueryRangeParamsV3.Start and End are Unix Nano timestamps
func AssignReservedVarsV3(queryRangeParams *v3.QueryRangeParamsV3) {
queryRangeParams.Variables["start_timestamp"] = queryRangeParams.Start / 1000
queryRangeParams.Variables["end_timestamp"] = queryRangeParams.End / 1000
queryRangeParams.Variables["start_timestamp_ms"] = queryRangeParams.Start
queryRangeParams.Variables["end_timestamp_ms"] = queryRangeParams.End
queryRangeParams.Variables["SIGNOZ_START_TIME"] = queryRangeParams.Start
queryRangeParams.Variables["SIGNOZ_END_TIME"] = queryRangeParams.End
queryRangeParams.Variables["start_timestamp_nano"] = queryRangeParams.Start * 1e6
queryRangeParams.Variables["end_timestamp_nano"] = queryRangeParams.End * 1e6
queryRangeParams.Variables["start_datetime"] = fmt.Sprintf("toDateTime(%d)", queryRangeParams.Start/1000)
queryRangeParams.Variables["end_datetime"] = fmt.Sprintf("toDateTime(%d)", queryRangeParams.End/1000)
AssignReservedVars(queryRangeParams.Variables, queryRangeParams.Start, queryRangeParams.End)
}
func AssignReservedVars(variables map[string]interface{}, start int64, end int64) {
variables["start_timestamp"] = start / 1000
variables["end_timestamp"] = end / 1000
variables["start_timestamp_ms"] = start
variables["end_timestamp_ms"] = end
variables["SIGNOZ_START_TIME"] = start
variables["SIGNOZ_END_TIME"] = end
variables["start_timestamp_nano"] = start * 1e6
variables["end_timestamp_nano"] = end * 1e6
variables["start_datetime"] = fmt.Sprintf("toDateTime(%d)", start/1000)
variables["end_datetime"] = fmt.Sprintf("toDateTime(%d)", end/1000)
}

View File

@@ -2,6 +2,7 @@ package queryfilterextractor
import (
"fmt"
"sort"
"strings"
clickhouse "github.com/AfterShip/clickhouse-sql-parser/parser"
@@ -87,6 +88,12 @@ func (e *ClickHouseFilterExtractor) Extract(query string) (*FilterResult, error)
result.GroupByColumns = append(result.GroupByColumns, colInfo)
}
// Sort the metric names and group by columns to return deterministic results
sort.Strings(result.MetricNames)
sort.Slice(result.GroupByColumns, func(i, j int) bool {
return result.GroupByColumns[i].Name < result.GroupByColumns[j].Name
})
return result, nil
}

View File

@@ -1,6 +1,8 @@
package queryfilterextractor
import (
"sort"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql/parser"
@@ -45,6 +47,12 @@ func (e *PromQLFilterExtractor) Extract(query string) (*FilterResult, error) {
result.GroupByColumns = append(result.GroupByColumns, ColumnInfo{Name: groupKey, OriginExpr: groupKey, OriginField: groupKey})
}
// Sort the metric names and group by columns to return deterministic results
sort.Strings(result.MetricNames)
sort.Slice(result.GroupByColumns, func(i, j int) bool {
return result.GroupByColumns[i].Name < result.GroupByColumns[j].Name
})
return result, nil
}

View File

@@ -2,7 +2,9 @@ package queryparser
import (
"context"
"fmt"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/queryparser/queryfilterextractor"
"github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
)
@@ -11,4 +13,22 @@ import (
type QueryParser interface {
// AnalyzeQueryFilter extracts filter conditions from a given query string.
AnalyzeQueryFilter(ctx context.Context, queryType querybuildertypesv5.QueryType, query string) (*queryfilterextractor.FilterResult, error)
// AnalyzeCompositeQuery extracts filter conditions from a composite query.
AnalyzeCompositeQuery(ctx context.Context, compositeQuery *v3.CompositeQuery) (*queryfilterextractor.FilterResult, error)
// ValidateCompositeQuery validates a composite query and returns an error if validation fails.
ValidateCompositeQuery(ctx context.Context, compositeQuery *v3.CompositeQuery) error
}
type QueryParseError struct {
StartPosition *int
EndPosition *int
ErrorMessage string
Query string
}
func (e *QueryParseError) Error() string {
if e.StartPosition != nil && e.EndPosition != nil {
return fmt.Sprintf("query parse error: %s at position %d:%d", e.ErrorMessage, *e.StartPosition, *e.EndPosition)
}
return fmt.Sprintf("query parse error: %s", e.ErrorMessage)
}

View File

@@ -1,40 +0,0 @@
package queryparser
import (
"context"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/queryparser/queryfilterextractor"
"github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
)
type queryParserImpl struct {
settings factory.ProviderSettings
}
// New creates a new implementation of the QueryParser service.
func New(settings factory.ProviderSettings) QueryParser {
return &queryParserImpl{
settings: settings,
}
}
func (p *queryParserImpl) AnalyzeQueryFilter(ctx context.Context, queryType querybuildertypesv5.QueryType, query string) (*queryfilterextractor.FilterResult, error) {
var extractorType queryfilterextractor.ExtractorType
switch queryType {
case querybuildertypesv5.QueryTypePromQL:
extractorType = queryfilterextractor.ExtractorTypePromQL
case querybuildertypesv5.QueryTypeClickHouseSQL:
extractorType = queryfilterextractor.ExtractorTypeClickHouseSQL
default:
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported queryType: %s. Supported values are '%s' and '%s'", queryType, querybuildertypesv5.QueryTypePromQL, querybuildertypesv5.QueryTypeClickHouseSQL)
}
// Create extractor
extractor, err := queryfilterextractor.NewExtractor(extractorType)
if err != nil {
return nil, err
}
return extractor.Extract(query)
}

View File

@@ -0,0 +1,256 @@
package queryparser
import (
"context"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/queryparser/queryfilterextractor"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
)
type queryParserImpl struct {
settings factory.ProviderSettings
}
// New creates a new implementation of the QueryParser service.
func New(settings factory.ProviderSettings) QueryParser {
return &queryParserImpl{
settings: settings,
}
}
func (p *queryParserImpl) AnalyzeQueryFilter(ctx context.Context, queryType qbtypes.QueryType, query string) (*queryfilterextractor.FilterResult, error) {
var extractorType queryfilterextractor.ExtractorType
switch queryType {
case qbtypes.QueryTypePromQL:
extractorType = queryfilterextractor.ExtractorTypePromQL
case qbtypes.QueryTypeClickHouseSQL:
extractorType = queryfilterextractor.ExtractorTypeClickHouseSQL
default:
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported queryType: %s. Supported values are '%s' and '%s'", queryType, qbtypes.QueryTypePromQL, qbtypes.QueryTypeClickHouseSQL)
}
// Create extractor
extractor, err := queryfilterextractor.NewExtractor(extractorType)
if err != nil {
return nil, err
}
return extractor.Extract(query)
}
func (p *queryParserImpl) AnalyzeCompositeQuery(ctx context.Context, compositeQuery *v3.CompositeQuery) (*queryfilterextractor.FilterResult, error) {
var result = &queryfilterextractor.FilterResult{
MetricNames: []string{},
GroupByColumns: []queryfilterextractor.ColumnInfo{},
}
for _, query := range compositeQuery.Queries {
switch query.Type {
case qbtypes.QueryTypeBuilder:
switch spec := query.Spec.(type) {
case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
// extract group by fields
for _, groupBy := range spec.GroupBy {
if groupBy.Name != "" {
result.GroupByColumns = append(result.GroupByColumns, queryfilterextractor.ColumnInfo{Name: groupBy.Name, OriginExpr: groupBy.Name, OriginField: groupBy.Name})
}
}
// extract metric names
for _, aggregation := range spec.Aggregations {
if aggregation.MetricName != "" {
result.MetricNames = append(result.MetricNames, aggregation.MetricName)
}
}
default:
// TODO: add support for Traces and Logs Aggregation types
if p.settings.Logger != nil {
p.settings.Logger.WarnContext(ctx, "unsupported QueryBuilderQuery type: %T", spec)
}
continue
}
case qbtypes.QueryTypePromQL:
spec, ok := query.Spec.(qbtypes.PromQuery)
if !ok || spec.Query == "" {
continue
}
res, err := p.AnalyzeQueryFilter(ctx, qbtypes.QueryTypePromQL, spec.Query)
if err != nil {
return nil, err
}
result.MetricNames = append(result.MetricNames, res.MetricNames...)
result.GroupByColumns = append(result.GroupByColumns, res.GroupByColumns...)
case qbtypes.QueryTypeClickHouseSQL:
spec, ok := query.Spec.(qbtypes.ClickHouseQuery)
if !ok || spec.Query == "" {
continue
}
res, err := p.AnalyzeQueryFilter(ctx, qbtypes.QueryTypeClickHouseSQL, spec.Query)
if err != nil {
return nil, err
}
result.MetricNames = append(result.MetricNames, res.MetricNames...)
result.GroupByColumns = append(result.GroupByColumns, res.GroupByColumns...)
default:
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported query type: %s", query.Type)
}
}
return result, nil
}
// ValidateCompositeQuery validates a composite query by checking all queries in the queries array
func (p *queryParserImpl) ValidateCompositeQuery(ctx context.Context, compositeQuery *v3.CompositeQuery) error {
if compositeQuery == nil {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"composite query is required",
)
}
if len(compositeQuery.Queries) == 0 {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"at least one query is required",
)
}
// Validate each query
for i, envelope := range compositeQuery.Queries {
queryId := qbtypes.GetQueryIdentifier(envelope, i)
switch envelope.Type {
case qbtypes.QueryTypeBuilder, qbtypes.QueryTypeSubQuery:
switch spec := envelope.Spec.(type) {
case qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]:
if err := spec.Validate(qbtypes.RequestTypeTimeSeries); err != nil {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"invalid %s: %s",
queryId,
err.Error(),
)
}
case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
if err := spec.Validate(qbtypes.RequestTypeTimeSeries); err != nil {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"invalid %s: %s",
queryId,
err.Error(),
)
}
case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
if err := spec.Validate(qbtypes.RequestTypeTimeSeries); err != nil {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"invalid %s: %s",
queryId,
err.Error(),
)
}
default:
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"unknown query spec type for %s",
queryId,
)
}
case qbtypes.QueryTypePromQL:
spec, ok := envelope.Spec.(qbtypes.PromQuery)
if !ok {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"invalid spec for %s",
queryId,
)
}
if spec.Query == "" {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"query expression is required for %s",
queryId,
)
}
if err := validatePromQLQuery(spec.Query); err != nil {
return err
}
case qbtypes.QueryTypeClickHouseSQL:
spec, ok := envelope.Spec.(qbtypes.ClickHouseQuery)
if !ok {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"invalid spec for %s",
queryId,
)
}
if spec.Query == "" {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"query expression is required for %s",
queryId,
)
}
if err := validateClickHouseQuery(spec.Query); err != nil {
return err
}
case qbtypes.QueryTypeFormula:
spec, ok := envelope.Spec.(qbtypes.QueryBuilderFormula)
if !ok {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"invalid spec for %s",
queryId,
)
}
if err := spec.Validate(); err != nil {
return err
}
case qbtypes.QueryTypeJoin:
spec, ok := envelope.Spec.(qbtypes.QueryBuilderJoin)
if !ok {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"invalid spec for %s",
queryId,
)
}
if err := spec.Validate(); err != nil {
return err
}
case qbtypes.QueryTypeTraceOperator:
spec, ok := envelope.Spec.(qbtypes.QueryBuilderTraceOperator)
if !ok {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"invalid spec for %s",
queryId,
)
}
err := spec.ValidateTraceOperator(compositeQuery.Queries)
if err != nil {
return err
}
default:
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"unknown query type '%s' for %s",
envelope.Type,
queryId,
).WithAdditional(
"Valid query types are: builder_query, builder_sub_query, builder_formula, builder_join, promql, clickhouse_sql, trace_operator",
)
}
}
// Check if all queries are disabled
if allDisabled := checkQueriesDisabled(compositeQuery); allDisabled {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"all queries are disabled - at least one query must be enabled",
)
}
return nil
}

View File

@@ -0,0 +1,112 @@
package queryparser
import (
"context"
"encoding/json"
"testing"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/queryparser/queryfilterextractor"
"github.com/stretchr/testify/require"
)
func TestBaseRule_ExtractMetricAndGroupBys(t *testing.T) {
ctx := context.Background()
tests := []struct {
name string
payload string
wantMetrics []string
wantGroupBy []queryfilterextractor.ColumnInfo
}{
{
name: "builder multiple grouping",
payload: builderQueryWithGrouping,
wantMetrics: []string{"test_metric_cardinality", "cpu_usage_total"},
wantGroupBy: []queryfilterextractor.ColumnInfo{
{Name: "service_name", Alias: "", OriginExpr: "service_name", OriginField: "service_name"},
{Name: "env", Alias: "", OriginExpr: "env", OriginField: "env"},
},
},
{
name: "builder single grouping",
payload: builderQuerySingleGrouping,
wantMetrics: []string{"latency_p50"},
wantGroupBy: []queryfilterextractor.ColumnInfo{
{Name: "namespace", Alias: "", OriginExpr: "namespace", OriginField: "namespace"},
},
},
{
name: "builder no grouping",
payload: builderQueryNoGrouping,
wantMetrics: []string{"disk_usage_total"},
wantGroupBy: []queryfilterextractor.ColumnInfo{},
},
{
name: "promql multiple grouping",
payload: promQueryWithGrouping,
wantMetrics: []string{"http_requests_total"},
wantGroupBy: []queryfilterextractor.ColumnInfo{
{Name: "pod", Alias: "", OriginExpr: "pod", OriginField: "pod"},
{Name: "region", Alias: "", OriginExpr: "region", OriginField: "region"},
},
},
{
name: "promql single grouping",
payload: promQuerySingleGrouping,
wantMetrics: []string{"cpu_usage_seconds_total"},
wantGroupBy: []queryfilterextractor.ColumnInfo{
{Name: "env", Alias: "", OriginExpr: "env", OriginField: "env"},
},
},
{
name: "promql no grouping",
payload: promQueryNoGrouping,
wantMetrics: []string{"node_cpu_seconds_total"},
wantGroupBy: []queryfilterextractor.ColumnInfo{},
},
{
name: "clickhouse multiple grouping",
payload: clickHouseQueryWithGrouping,
wantMetrics: []string{"cpu"},
wantGroupBy: []queryfilterextractor.ColumnInfo{
{Name: "region", Alias: "r", OriginExpr: "region", OriginField: "region"},
{Name: "zone", Alias: "", OriginExpr: "zone", OriginField: "zone"},
},
},
{
name: "clickhouse single grouping",
payload: clickHouseQuerySingleGrouping,
wantMetrics: []string{"cpu_usage"},
wantGroupBy: []queryfilterextractor.ColumnInfo{
{Name: "region", Alias: "r", OriginExpr: "region", OriginField: "region"},
},
},
{
name: "clickhouse no grouping",
payload: clickHouseQueryNoGrouping,
wantMetrics: []string{"memory_usage"},
wantGroupBy: []queryfilterextractor.ColumnInfo{},
},
}
queryParser := New(instrumentationtest.New().ToProviderSettings())
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cq := mustCompositeQuery(t, tt.payload)
res, err := queryParser.AnalyzeCompositeQuery(ctx, cq)
require.NoError(t, err)
require.ElementsMatch(t, tt.wantMetrics, res.MetricNames)
require.ElementsMatch(t, tt.wantGroupBy, res.GroupByColumns)
})
}
}
func mustCompositeQuery(t *testing.T, payload string) *v3.CompositeQuery {
t.Helper()
var compositeQuery v3.CompositeQuery
require.NoError(t, json.Unmarshal([]byte(payload), &compositeQuery))
return &compositeQuery
}

View File

@@ -0,0 +1,184 @@
package queryparser
var (
builderQueryWithGrouping = `
{
"queryType":"builder",
"panelType":"graph",
"queries":[
{
"type":"builder_query",
"spec":{
"name":"A",
"signal":"metrics",
"stepInterval":null,
"disabled":false,
"filter":{"expression":""},
"groupBy":[
{"name":"service_name","fieldDataType":"","fieldContext":""},
{"name":"env","fieldDataType":"","fieldContext":""}
],
"aggregations":[
{"metricName":"test_metric_cardinality","timeAggregation":"count","spaceAggregation":"sum"},
{"metricName":"cpu_usage_total","timeAggregation":"avg","spaceAggregation":"avg"}
]
}
}
]
}
`
builderQuerySingleGrouping = `
{
"queryType":"builder",
"panelType":"graph",
"queries":[
{
"type":"builder_query",
"spec":{
"name":"B",
"signal":"metrics",
"stepInterval":null,
"disabled":false,
"groupBy":[
{"name":"namespace","fieldDataType":"","fieldContext":""}
],
"aggregations":[
{"metricName":"latency_p50","timeAggregation":"avg","spaceAggregation":"max"}
]
}
}
]
}
`
builderQueryNoGrouping = `
{
"queryType":"builder",
"panelType":"graph",
"queries":[
{
"type":"builder_query",
"spec":{
"name":"C",
"signal":"metrics",
"stepInterval":null,
"disabled":false,
"groupBy":[],
"aggregations":[
{"metricName":"disk_usage_total","timeAggregation":"sum","spaceAggregation":"sum"}
]
}
}
]
}
`
promQueryWithGrouping = `
{
"queries":[
{
"type":"promql",
"spec":{
"name":"P1",
"query":"sum by (pod,region) (rate(http_requests_total[5m]))",
"disabled":false,
"step":0,
"stats":false
}
}
],
"panelType":"graph",
"queryType":"promql"
}
`
promQuerySingleGrouping = `
{
"queries":[
{
"type":"promql",
"spec":{
"name":"P2",
"query":"sum by (env)(rate(cpu_usage_seconds_total{job=\"api\"}[5m]))",
"disabled":false,
"step":0,
"stats":false
}
}
],
"panelType":"graph",
"queryType":"promql"
}
`
promQueryNoGrouping = `
{
"queries":[
{
"type":"promql",
"spec":{
"name":"P3",
"query":"rate(node_cpu_seconds_total[1m])",
"disabled":false,
"step":0,
"stats":false
}
}
],
"panelType":"graph",
"queryType":"promql"
}
`
clickHouseQueryWithGrouping = `
{
"queryType":"clickhouse_sql",
"panelType":"graph",
"queries":[
{
"type":"clickhouse_sql",
"spec":{
"name":"CH1",
"query":"SELECT region as r, zone FROM metrics WHERE metric_name='cpu' GROUP BY region, zone",
"disabled":false
}
}
]
}
`
clickHouseQuerySingleGrouping = `
{
"queryType":"clickhouse_sql",
"panelType":"graph",
"queries":[
{
"type":"clickhouse_sql",
"spec":{
"name":"CH2",
"query":"SELECT region as r FROM metrics WHERE metric_name='cpu_usage' GROUP BY region",
"disabled":false
}
}
]
}
`
clickHouseQueryNoGrouping = `
{
"queryType":"clickhouse_sql",
"panelType":"graph",
"queries":[
{
"type":"clickhouse_sql",
"spec":{
"name":"CH3",
"query":"SELECT * FROM metrics WHERE metric_name = 'memory_usage'",
"disabled":false
}
}
]
}
`
)

View File

@@ -0,0 +1,466 @@
package queryparser
import (
"context"
"testing"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/stretchr/testify/require"
)
func TestValidateCompositeQuery(t *testing.T) {
ctx := context.Background()
queryParser := New(instrumentationtest.New().ToProviderSettings())
tests := []struct {
name string
compositeQuery *v3.CompositeQuery
wantErr bool
errContains string
}{
{
name: "nil composite query should return error",
compositeQuery: nil,
wantErr: true,
errContains: "composite query is required",
},
{
name: "empty queries array should return error",
compositeQuery: &v3.CompositeQuery{
Queries: []qbtypes.QueryEnvelope{},
},
wantErr: true,
errContains: "at least one query is required",
},
{
name: "valid metric builder query should pass",
compositeQuery: &v3.CompositeQuery{
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "metric_query",
Signal: telemetrytypes.SignalMetrics,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "cpu_usage",
},
},
},
},
},
},
wantErr: false,
},
{
name: "valid log builder query should pass",
compositeQuery: &v3.CompositeQuery{
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
Name: "log_query",
Signal: telemetrytypes.SignalLogs,
Aggregations: []qbtypes.LogAggregation{
{
Expression: "count()",
},
},
},
},
},
},
wantErr: false,
},
{
name: "valid trace builder query should pass",
compositeQuery: &v3.CompositeQuery{
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
Name: "trace_query",
Signal: telemetrytypes.SignalTraces,
Aggregations: []qbtypes.TraceAggregation{
{
Expression: "count()",
},
},
},
},
},
},
wantErr: false,
},
{
name: "valid PromQL query should pass",
compositeQuery: &v3.CompositeQuery{
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypePromQL,
Spec: qbtypes.PromQuery{
Name: "prom_query",
Query: "rate(http_requests_total[5m])",
},
},
},
},
wantErr: false,
},
{
name: "valid ClickHouse query should pass",
compositeQuery: &v3.CompositeQuery{
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeClickHouseSQL,
Spec: qbtypes.ClickHouseQuery{
Name: "ch_query",
Query: "SELECT count(*) FROM metrics WHERE metric_name = 'cpu_usage'",
},
},
},
},
wantErr: false,
},
{
name: "valid formula query should pass",
compositeQuery: &v3.CompositeQuery{
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeFormula,
Spec: qbtypes.QueryBuilderFormula{
Name: "formula_query",
Expression: "A + B",
},
},
},
},
wantErr: false,
},
{
name: "valid join query should pass",
compositeQuery: &v3.CompositeQuery{
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeJoin,
Spec: qbtypes.QueryBuilderJoin{
Name: "join_query",
Left: qbtypes.QueryRef{Name: "A"},
Right: qbtypes.QueryRef{Name: "B"},
Type: qbtypes.JoinTypeInner,
On: "service_name",
},
},
},
},
wantErr: false,
},
{
name: "valid trace operator query should pass",
compositeQuery: &v3.CompositeQuery{
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
Name: "A",
Signal: telemetrytypes.SignalTraces,
Aggregations: []qbtypes.TraceAggregation{
{
Expression: "count()",
},
},
},
},
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
Name: "B",
Signal: telemetrytypes.SignalTraces,
Aggregations: []qbtypes.TraceAggregation{
{
Expression: "count()",
},
},
},
},
{
Type: qbtypes.QueryTypeTraceOperator,
Spec: qbtypes.QueryBuilderTraceOperator{
Name: "trace_operator",
Expression: "A && B",
},
},
},
},
wantErr: false,
},
{
name: "invalid metric builder query - missing aggregation should return error",
compositeQuery: &v3.CompositeQuery{
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "metric_query",
Signal: telemetrytypes.SignalMetrics,
Aggregations: []qbtypes.MetricAggregation{},
},
},
},
},
wantErr: true,
errContains: "invalid",
},
{
name: "invalid PromQL query - empty query should return error",
compositeQuery: &v3.CompositeQuery{
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypePromQL,
Spec: qbtypes.PromQuery{
Name: "prom_query",
Query: "",
},
},
},
},
wantErr: true,
errContains: "query expression is required",
},
{
name: "invalid PromQL query - syntax error should return error",
compositeQuery: &v3.CompositeQuery{
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypePromQL,
Spec: qbtypes.PromQuery{
Name: "prom_query",
Query: "rate(http_requests_total[5m",
},
},
},
},
wantErr: true,
errContains: "unclosed left parenthesis",
},
{
name: "invalid ClickHouse query - empty query should return error",
compositeQuery: &v3.CompositeQuery{
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeClickHouseSQL,
Spec: qbtypes.ClickHouseQuery{
Name: "ch_query",
Query: "",
},
},
},
},
wantErr: true,
errContains: "query expression is required",
},
{
name: "invalid ClickHouse query - syntax error should return error",
compositeQuery: &v3.CompositeQuery{
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeClickHouseSQL,
Spec: qbtypes.ClickHouseQuery{
Name: "ch_query",
Query: "SELECT * FROM metrics WHERE",
},
},
},
},
wantErr: true,
errContains: "query parse error",
},
{
name: "invalid formula query - empty expression should return error",
compositeQuery: &v3.CompositeQuery{
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeFormula,
Spec: qbtypes.QueryBuilderFormula{
Name: "formula_query",
Expression: "",
},
},
},
},
wantErr: true,
errContains: "formula expression cannot be blank",
},
{
name: "invalid trace operator query - empty expression should return error",
compositeQuery: &v3.CompositeQuery{
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeTraceOperator,
Spec: qbtypes.QueryBuilderTraceOperator{
Name: "trace_operator",
Expression: "",
},
},
},
},
wantErr: true,
errContains: "expression cannot be empty",
},
{
name: "all queries disabled should return error",
compositeQuery: &v3.CompositeQuery{
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "metric_query",
Disabled: true,
Signal: telemetrytypes.SignalMetrics,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "cpu_usage",
},
},
},
},
{
Type: qbtypes.QueryTypePromQL,
Spec: qbtypes.PromQuery{
Name: "prom_query",
Query: "rate(http_requests_total[5m])",
Disabled: true,
},
},
},
},
wantErr: true,
errContains: "all queries are disabled",
},
{
name: "mixed disabled and enabled queries should pass",
compositeQuery: &v3.CompositeQuery{
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "metric_query",
Disabled: true,
Signal: telemetrytypes.SignalMetrics,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "cpu_usage",
},
},
},
},
{
Type: qbtypes.QueryTypePromQL,
Spec: qbtypes.PromQuery{
Name: "prom_query",
Query: "rate(http_requests_total[5m])",
Disabled: false,
},
},
},
},
wantErr: false,
},
{
name: "multiple valid queries should pass",
compositeQuery: &v3.CompositeQuery{
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "metric_query",
Signal: telemetrytypes.SignalMetrics,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "cpu_usage",
},
},
},
},
{
Type: qbtypes.QueryTypePromQL,
Spec: qbtypes.PromQuery{
Name: "prom_query",
Query: "rate(http_requests_total[5m])",
},
},
{
Type: qbtypes.QueryTypeClickHouseSQL,
Spec: qbtypes.ClickHouseQuery{
Name: "ch_query",
Query: "SELECT count(*) FROM metrics WHERE metric_name = 'cpu_usage'",
},
},
},
},
wantErr: false,
},
{
name: "invalid query in multiple queries should return error",
compositeQuery: &v3.CompositeQuery{
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "metric_query",
Signal: telemetrytypes.SignalMetrics,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "cpu_usage",
},
},
},
},
{
Type: qbtypes.QueryTypePromQL,
Spec: qbtypes.PromQuery{
Name: "prom_query",
Query: "invalid promql syntax [",
},
},
},
},
wantErr: true,
errContains: "query parse error",
},
{
name: "unknown query type should return error",
compositeQuery: &v3.CompositeQuery{
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryType{String: valuer.NewString("invalid_query_type")},
Spec: qbtypes.PromQuery{
Name: "prom_query",
Query: "rate(http_requests_total[5m])",
},
},
},
},
wantErr: true,
errContains: "unknown query type",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := queryParser.ValidateCompositeQuery(ctx, tt.compositeQuery)
if tt.wantErr {
require.Error(t, err)
if tt.errContains != "" {
require.Contains(t, err.Error(), tt.errContains)
}
} else {
require.NoError(t, err)
}
})
}
}

View File

@@ -0,0 +1,123 @@
package queryparser
import (
"bytes"
"text/template"
"time"
clickhouse "github.com/AfterShip/clickhouse-sql-parser/parser"
"github.com/SigNoz/signoz/pkg/errors"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
querytemplate "github.com/SigNoz/signoz/pkg/query-service/utils/queryTemplate"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/prometheus/prometheus/promql/parser"
)
// validatePromQLQuery validates a PromQL query syntax using the Prometheus parser
func validatePromQLQuery(query string) error {
_, err := parser.ParseExpr(query)
if err != nil {
if syntaxErrs, ok := err.(parser.ParseErrors); ok {
syntaxErr := syntaxErrs[0]
startPosition := int(syntaxErr.PositionRange.Start)
endPosition := int(syntaxErr.PositionRange.End)
return &QueryParseError{
StartPosition: &startPosition,
EndPosition: &endPosition,
ErrorMessage: syntaxErr.Error(),
Query: query,
}
}
}
return err
}
// validateClickHouseQuery validates a ClickHouse SQL query syntax using the ClickHouse parser
func validateClickHouseQuery(query string) error {
// Assign the default template variables with dummy values
variables := make(map[string]interface{})
start := time.Now().UnixMilli()
end := start + 1000
querytemplate.AssignReservedVars(variables, start, end)
// Apply the values for default template variables before parsing the query
tmpl := template.New("clickhouse-query")
tmpl, err := tmpl.Parse(query)
if err != nil {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"failed to parse clickhouse query: %s",
err.Error(),
)
}
var queryBuffer bytes.Buffer
err = tmpl.Execute(&queryBuffer, variables)
if err != nil {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"failed to execute clickhouse query template: %s",
err.Error(),
)
}
// Parse the ClickHouse query with the default template variables applied
p := clickhouse.NewParser(queryBuffer.String())
_, err = p.ParseStmts()
if err != nil {
// TODO: errors returned here is errors.errorString, rather than using regex to parser the error
// we should think on using some other library that parses the CH query in more accurate manner,
// current CH parser only does very minimal checks.
// Sample Error: "line 0:36 expected table name or subquery, got ;\nSELECT department, avg(salary) FROM ;\n ^\n"
return &QueryParseError{
ErrorMessage: err.Error(),
Query: query,
}
}
return nil
}
// checkQueriesDisabled checks if all queries are disabled. Returns true if all queries are disabled, false otherwise.
func checkQueriesDisabled(compositeQuery *v3.CompositeQuery) bool {
for _, envelope := range compositeQuery.Queries {
switch envelope.Type {
case qbtypes.QueryTypeBuilder, qbtypes.QueryTypeSubQuery:
switch spec := envelope.Spec.(type) {
case qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]:
if !spec.Disabled {
return false
}
case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
if !spec.Disabled {
return false
}
case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
if !spec.Disabled {
return false
}
}
case qbtypes.QueryTypeFormula:
if spec, ok := envelope.Spec.(qbtypes.QueryBuilderFormula); ok && !spec.Disabled {
return false
}
case qbtypes.QueryTypeTraceOperator:
if spec, ok := envelope.Spec.(qbtypes.QueryBuilderTraceOperator); ok && !spec.Disabled {
return false
}
case qbtypes.QueryTypeJoin:
if spec, ok := envelope.Spec.(qbtypes.QueryBuilderJoin); ok && !spec.Disabled {
return false
}
case qbtypes.QueryTypePromQL:
if spec, ok := envelope.Spec.(qbtypes.PromQuery); ok && !spec.Disabled {
return false
}
case qbtypes.QueryTypeClickHouseSQL:
if spec, ok := envelope.Spec.(qbtypes.ClickHouseQuery); ok && !spec.Disabled {
return false
}
}
}
// If we reach here, all queries are disabled
return true
}

View File

@@ -539,6 +539,11 @@ func (f Function) Copy() Function {
return c
}
// Validate validates the Function by calling Validate on its Name
func (f Function) Validate() error {
return f.Name.Validate()
}
type LimitBy struct {
// keys to limit by
Keys []string `json:"keys"`

View File

@@ -73,6 +73,53 @@ func (f *QueryBuilderFormula) UnmarshalJSON(data []byte) error {
return nil
}
// Validate validates the QueryBuilderFormula
func (f QueryBuilderFormula) Validate() error {
// Validate name is not blank
if strings.TrimSpace(f.Name) == "" {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"formula name cannot be blank",
)
}
// Validate expression is not blank
if strings.TrimSpace(f.Expression) == "" {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"formula expression cannot be blank",
)
}
// If having is not null, validate that expression is not blank
if f.Having != nil {
if strings.TrimSpace(f.Having.Expression) == "" {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"having expression cannot be blank when having clause is present",
)
}
}
// Validate functions if present
for i, fn := range f.Functions {
if err := fn.Validate(); err != nil {
fnId := fmt.Sprintf("function #%d", i+1)
if f.Name != "" {
fnId = fmt.Sprintf("function #%d in formula '%s'", i+1, f.Name)
}
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"invalid %s: %s",
fnId,
err.Error(),
)
}
}
return nil
}
// small container to store the query name and index or alias reference
// for a variable in the formula expression
// read below for more details on aggregation references

View File

@@ -5,6 +5,7 @@ import (
"slices"
"strconv"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/valuer"
)
@@ -33,6 +34,37 @@ var (
FunctionNameFillZero = FunctionName{valuer.NewString("fillZero")}
)
// Validate validates that the FunctionName is one of the known types
func (fn FunctionName) Validate() error {
switch fn {
case FunctionNameCutOffMin,
FunctionNameCutOffMax,
FunctionNameClampMin,
FunctionNameClampMax,
FunctionNameAbsolute,
FunctionNameRunningDiff,
FunctionNameLog2,
FunctionNameLog10,
FunctionNameCumulativeSum,
FunctionNameEWMA3,
FunctionNameEWMA5,
FunctionNameEWMA7,
FunctionNameMedian3,
FunctionNameMedian5,
FunctionNameMedian7,
FunctionNameTimeShift,
FunctionNameAnomaly,
FunctionNameFillZero:
return nil
default:
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"invalid function name: %s",
fn.StringValue(),
)
}
}
// ApplyFunction applies the given function to the result data
func ApplyFunction(fn Function, result *TimeSeries) *TimeSeries {
// Extract the function name and arguments

View File

@@ -1,6 +1,9 @@
package querybuildertypesv5
import (
"strings"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
@@ -16,6 +19,15 @@ var (
JoinTypeCross = JoinType{valuer.NewString("cross")}
)
func (j JoinType) Validate() error {
switch j {
case JoinTypeInner, JoinTypeLeft, JoinTypeRight, JoinTypeFull, JoinTypeCross:
return nil
default:
return errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid join type: %s", j.StringValue())
}
}
type QueryRef struct {
Name string `json:"name"`
}
@@ -53,6 +65,25 @@ type QueryBuilderJoin struct {
Functions []Function `json:"functions,omitempty"`
}
func (q *QueryBuilderJoin) Validate() error {
if strings.TrimSpace(q.Name) == "" {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "name is required")
}
if strings.TrimSpace(q.Left.Name) == "" {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "left name is required")
}
if strings.TrimSpace(q.Right.Name) == "" {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "right name is required")
}
if err := q.Type.Validate(); err != nil {
return err
}
if strings.TrimSpace(q.On) == "" {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "on is required")
}
return nil
}
// Copy creates a deep copy of QueryBuilderJoin
func (q QueryBuilderJoin) Copy() QueryBuilderJoin {
c := q

View File

@@ -10,8 +10,8 @@ import (
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)
// getQueryIdentifier returns a friendly identifier for a query based on its type and name/content
func getQueryIdentifier(envelope QueryEnvelope, index int) string {
// GetQueryIdentifier returns a friendly identifier for a query based on its type and name/content
func GetQueryIdentifier(envelope QueryEnvelope, index int) string {
switch envelope.Type {
case QueryTypeBuilder, QueryTypeSubQuery:
switch spec := envelope.Spec.(type) {
@@ -567,7 +567,7 @@ func (r *QueryRangeRequest) validateCompositeQuery() error {
switch spec := envelope.Spec.(type) {
case QueryBuilderQuery[TraceAggregation]:
if err := spec.Validate(r.RequestType); err != nil {
queryId := getQueryIdentifier(envelope, i)
queryId := GetQueryIdentifier(envelope, i)
return wrapValidationError(err, queryId, "invalid %s: %s")
}
// Check name uniqueness for non-formula context
@@ -583,7 +583,7 @@ func (r *QueryRangeRequest) validateCompositeQuery() error {
}
case QueryBuilderQuery[LogAggregation]:
if err := spec.Validate(r.RequestType); err != nil {
queryId := getQueryIdentifier(envelope, i)
queryId := GetQueryIdentifier(envelope, i)
return wrapValidationError(err, queryId, "invalid %s: %s")
}
// Check name uniqueness for non-formula context
@@ -599,7 +599,7 @@ func (r *QueryRangeRequest) validateCompositeQuery() error {
}
case QueryBuilderQuery[MetricAggregation]:
if err := spec.Validate(r.RequestType); err != nil {
queryId := getQueryIdentifier(envelope, i)
queryId := GetQueryIdentifier(envelope, i)
return wrapValidationError(err, queryId, "invalid %s: %s")
}
// Check name uniqueness for non-formula context
@@ -614,7 +614,7 @@ func (r *QueryRangeRequest) validateCompositeQuery() error {
queryNames[spec.Name] = true
}
default:
queryId := getQueryIdentifier(envelope, i)
queryId := GetQueryIdentifier(envelope, i)
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"unknown spec type for %s",
@@ -625,7 +625,7 @@ func (r *QueryRangeRequest) validateCompositeQuery() error {
// Formula validation is handled separately
spec, ok := envelope.Spec.(QueryBuilderFormula)
if !ok {
queryId := getQueryIdentifier(envelope, i)
queryId := GetQueryIdentifier(envelope, i)
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"invalid spec for %s",
@@ -633,7 +633,7 @@ func (r *QueryRangeRequest) validateCompositeQuery() error {
)
}
if spec.Expression == "" {
queryId := getQueryIdentifier(envelope, i)
queryId := GetQueryIdentifier(envelope, i)
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"expression is required for %s",
@@ -644,7 +644,7 @@ func (r *QueryRangeRequest) validateCompositeQuery() error {
// Join validation is handled separately
_, ok := envelope.Spec.(QueryBuilderJoin)
if !ok {
queryId := getQueryIdentifier(envelope, i)
queryId := GetQueryIdentifier(envelope, i)
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"invalid spec for %s",
@@ -654,7 +654,7 @@ func (r *QueryRangeRequest) validateCompositeQuery() error {
case QueryTypeTraceOperator:
spec, ok := envelope.Spec.(QueryBuilderTraceOperator)
if !ok {
queryId := getQueryIdentifier(envelope, i)
queryId := GetQueryIdentifier(envelope, i)
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"invalid spec for %s",
@@ -662,7 +662,7 @@ func (r *QueryRangeRequest) validateCompositeQuery() error {
)
}
if spec.Expression == "" {
queryId := getQueryIdentifier(envelope, i)
queryId := GetQueryIdentifier(envelope, i)
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"expression is required for %s",
@@ -673,7 +673,7 @@ func (r *QueryRangeRequest) validateCompositeQuery() error {
// PromQL validation is handled separately
spec, ok := envelope.Spec.(PromQuery)
if !ok {
queryId := getQueryIdentifier(envelope, i)
queryId := GetQueryIdentifier(envelope, i)
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"invalid spec for %s",
@@ -681,7 +681,7 @@ func (r *QueryRangeRequest) validateCompositeQuery() error {
)
}
if spec.Query == "" {
queryId := getQueryIdentifier(envelope, i)
queryId := GetQueryIdentifier(envelope, i)
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"query expression is required for %s",
@@ -692,7 +692,7 @@ func (r *QueryRangeRequest) validateCompositeQuery() error {
// ClickHouse SQL validation is handled separately
spec, ok := envelope.Spec.(ClickHouseQuery)
if !ok {
queryId := getQueryIdentifier(envelope, i)
queryId := GetQueryIdentifier(envelope, i)
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"invalid spec for %s",
@@ -700,7 +700,7 @@ func (r *QueryRangeRequest) validateCompositeQuery() error {
)
}
if spec.Query == "" {
queryId := getQueryIdentifier(envelope, i)
queryId := GetQueryIdentifier(envelope, i)
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"query expression is required for %s",
@@ -708,7 +708,7 @@ func (r *QueryRangeRequest) validateCompositeQuery() error {
)
}
default:
queryId := getQueryIdentifier(envelope, i)
queryId := GetQueryIdentifier(envelope, i)
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"unknown query type '%s' for %s",
@@ -735,7 +735,7 @@ func (c *CompositeQuery) Validate(requestType RequestType) error {
// Validate each query
for i, envelope := range c.Queries {
if err := validateQueryEnvelope(envelope, requestType); err != nil {
queryId := getQueryIdentifier(envelope, i)
queryId := GetQueryIdentifier(envelope, i)
return wrapValidationError(err, queryId, "invalid %s: %s")
}
}

View File

@@ -8,6 +8,7 @@ import (
"strings"
"time"
signozError "github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/query-service/model"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/utils/labels"
@@ -121,6 +122,111 @@ type RuleCondition struct {
Thresholds *RuleThresholdData `json:"thresholds,omitempty"`
}
func (rc *RuleCondition) UnmarshalJSON(data []byte) error {
type Alias RuleCondition
aux := (*Alias)(rc)
if err := json.Unmarshal(data, aux); err != nil {
return signozError.NewInvalidInputf(signozError.CodeInvalidInput, "failed to parse rule condition json: %v", err)
}
var errs []error
// Validate CompositeQuery - must be non-nil and pass validation
if rc.CompositeQuery == nil {
errs = append(errs, signozError.NewInvalidInputf(signozError.CodeInvalidInput, "composite query is required"))
} else {
if err := rc.CompositeQuery.Validate(); err != nil {
errs = append(errs, signozError.NewInvalidInputf(signozError.CodeInvalidInput, "composite query validation failed: %v", err))
}
}
// Validate AlertOnAbsent + AbsentFor - if AlertOnAbsent is true, AbsentFor must be > 0
if rc.AlertOnAbsent && rc.AbsentFor == 0 {
errs = append(errs, signozError.NewInvalidInputf(signozError.CodeInvalidInput, "absentFor must be greater than 0 when alertOnAbsent is true"))
}
// Validate Seasonality - must be one of the allowed values when provided
if !isValidSeasonality(rc.Seasonality) {
errs = append(errs, signozError.NewInvalidInputf(signozError.CodeInvalidInput, "invalid seasonality: %s", rc.Seasonality))
}
// Validate SelectedQueryName - must match one of the query names from CompositeQuery
if rc.SelectedQuery != "" && rc.CompositeQuery != nil {
queryNames := getAllQueryNames(rc.CompositeQuery)
if _, exists := queryNames[rc.SelectedQuery]; !exists {
errs = append(errs, signozError.NewInvalidInputf(signozError.CodeInvalidInput, "selected query name '%s' does not match any query in composite query", rc.SelectedQuery))
}
}
// Validate RequireMinPoints + RequiredNumPoints - if RequireMinPoints is true, RequiredNumPoints must be > 0
if rc.RequireMinPoints && rc.RequiredNumPoints <= 0 {
errs = append(errs, signozError.NewInvalidInputf(signozError.CodeInvalidInput, "requiredNumPoints must be greater than 0 when requireMinPoints is true"))
}
if len(errs) > 0 {
return signozError.Join(errs...)
}
return nil
}
// getAllQueryNames extracts all query names from CompositeQuery across all query types
// Returns a map of query names for quick lookup
func getAllQueryNames(compositeQuery *v3.CompositeQuery) map[string]struct{} {
queryNames := make(map[string]struct{})
// Extract names from Queries (v5 envelopes)
if compositeQuery != nil && compositeQuery.Queries != nil {
for _, query := range compositeQuery.Queries {
switch spec := query.Spec.(type) {
case qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]:
if spec.Name != "" {
queryNames[spec.Name] = struct{}{}
}
case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
if spec.Name != "" {
queryNames[spec.Name] = struct{}{}
}
case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
if spec.Name != "" {
queryNames[spec.Name] = struct{}{}
}
case qbtypes.QueryBuilderFormula:
if spec.Name != "" {
queryNames[spec.Name] = struct{}{}
}
case qbtypes.QueryBuilderTraceOperator:
if spec.Name != "" {
queryNames[spec.Name] = struct{}{}
}
case qbtypes.PromQuery:
if spec.Name != "" {
queryNames[spec.Name] = struct{}{}
}
case qbtypes.ClickHouseQuery:
if spec.Name != "" {
queryNames[spec.Name] = struct{}{}
}
}
}
}
return queryNames
}
// isValidSeasonality validates that Seasonality is one of the allowed values
func isValidSeasonality(seasonality string) bool {
if seasonality == "" {
return true // empty seasonality is allowed (optional field)
}
switch seasonality {
case "hourly", "daily", "weekly":
return true
default:
return false
}
}
func (rc *RuleCondition) GetSelectedQueryName() string {
if rc != nil {
if rc.SelectedQuery != "" {

View File

@@ -70,6 +70,8 @@ type NotificationSettings struct {
GroupBy []string `json:"groupBy,omitempty"`
Renotify Renotify `json:"renotify,omitempty"`
UsePolicy bool `json:"usePolicy,omitempty"`
// NewGroupEvalDelay is the grace period for new series to be excluded from alerts evaluation
NewGroupEvalDelay *Duration `json:"newGroupEvalDelay,omitempty"`
}
type Renotify struct {
@@ -302,6 +304,39 @@ func isValidLabelValue(v string) bool {
return utf8.ValidString(v)
}
// isValidAlertType validates that the AlertType is one of the allowed enum values
func isValidAlertType(alertType AlertType) bool {
switch alertType {
case AlertTypeMetric, AlertTypeTraces, AlertTypeLogs, AlertTypeExceptions:
return true
default:
return false
}
}
// isValidRuleType validates that the RuleType is one of the allowed enum values
func isValidRuleType(ruleType RuleType) bool {
switch ruleType {
case RuleTypeThreshold, RuleTypeProm, RuleTypeAnomaly:
return true
default:
return false
}
}
// isValidVersion validates that the version is one of the supported versions
func isValidVersion(version string) bool {
if version == "" {
return true // empty version is allowed (optional field)
}
switch version {
case "v3", "v4", "v5":
return true
default:
return false
}
}
func isAllQueriesDisabled(compositeQuery *v3.CompositeQuery) bool {
if compositeQuery == nil {
return false
@@ -357,6 +392,26 @@ func (r *PostableRule) validate() error {
errs = append(errs, signozError.NewInvalidInputf(signozError.CodeInvalidInput, "all queries are disabled in rule condition"))
}
// Validate AlertName - required field
if r.AlertName == "" {
errs = append(errs, signozError.NewInvalidInputf(signozError.CodeInvalidInput, "alert name is required"))
}
// Validate AlertType - must be one of the allowed enum values
if !isValidAlertType(r.AlertType) {
errs = append(errs, signozError.NewInvalidInputf(signozError.CodeInvalidInput, "invalid alert type: %s, must be one of: METRIC_BASED_ALERT, TRACES_BASED_ALERT, LOGS_BASED_ALERT, EXCEPTIONS_BASED_ALERT", r.AlertType))
}
// Validate RuleType - must be one of the allowed enum values
if !isValidRuleType(r.RuleType) {
errs = append(errs, signozError.NewInvalidInputf(signozError.CodeInvalidInput, "invalid rule type: %s, must be one of: threshold_rule, promql_rule, anomaly_rule", r.RuleType))
}
// Validate Version - must be one of the supported versions if provided
if !isValidVersion(r.Version) {
errs = append(errs, signozError.NewInvalidInputf(signozError.CodeInvalidInput, "invalid version: %s, must be one of: v3, v4, v5", r.Version))
}
for k, v := range r.Labels {
if !isValidLabelName(k) {
errs = append(errs, signozError.NewInvalidInputf(signozError.CodeInvalidInput, "invalid label name: %s", k))

View File

@@ -111,9 +111,12 @@ func TestParseIntoRule(t *testing.T) {
"condition": {
"compositeQuery": {
"queryType": "builder",
"panelType": "graph",
"builderQueries": {
"A": {
"queryName": "A",
"expression": "A",
"dataSource": "metrics",
"disabled": false,
"aggregateAttribute": {
"key": "test_metric"
@@ -149,12 +152,17 @@ func TestParseIntoRule(t *testing.T) {
initRule: PostableRule{},
content: []byte(`{
"alert": "DefaultsRule",
"alertType": "METRIC_BASED_ALERT",
"ruleType": "threshold_rule",
"condition": {
"compositeQuery": {
"queryType": "builder",
"panelType": "graph",
"builderQueries": {
"A": {
"queryName": "A",
"expression": "A",
"dataSource": "metrics",
"disabled": false,
"aggregateAttribute": {
"key": "test_metric"
@@ -187,9 +195,11 @@ func TestParseIntoRule(t *testing.T) {
initRule: PostableRule{},
content: []byte(`{
"alert": "PromQLRule",
"alertType": "METRIC_BASED_ALERT",
"condition": {
"compositeQuery": {
"queryType": "promql",
"panelType": "graph",
"promQueries": {
"A": {
"query": "rate(http_requests_total[5m])",
@@ -255,12 +265,17 @@ func TestParseIntoRuleSchemaVersioning(t *testing.T) {
initRule: PostableRule{},
content: []byte(`{
"alert": "SeverityLabelTest",
"alertType": "METRIC_BASED_ALERT",
"schemaVersion": "v1",
"condition": {
"compositeQuery": {
"queryType": "builder",
"panelType": "graph",
"builderQueries": {
"A": {
"queryName": "A",
"expression": "A",
"dataSource": "metrics",
"aggregateAttribute": {
"key": "cpu_usage"
}
@@ -343,12 +358,17 @@ func TestParseIntoRuleSchemaVersioning(t *testing.T) {
initRule: PostableRule{},
content: []byte(`{
"alert": "NoLabelsTest",
"alertType": "METRIC_BASED_ALERT",
"schemaVersion": "v1",
"condition": {
"compositeQuery": {
"queryType": "builder",
"panelType": "graph",
"builderQueries": {
"A": {
"queryName": "A",
"expression": "A",
"dataSource": "metrics",
"aggregateAttribute": {
"key": "memory_usage"
}
@@ -383,12 +403,17 @@ func TestParseIntoRuleSchemaVersioning(t *testing.T) {
initRule: PostableRule{},
content: []byte(`{
"alert": "OverwriteTest",
"alertType": "METRIC_BASED_ALERT",
"schemaVersion": "v1",
"condition": {
"compositeQuery": {
"queryType": "builder",
"panelType": "graph",
"builderQueries": {
"A": {
"queryName": "A",
"expression": "A",
"dataSource": "metrics",
"aggregateAttribute": {
"key": "cpu_usage"
}
@@ -473,12 +498,17 @@ func TestParseIntoRuleSchemaVersioning(t *testing.T) {
initRule: PostableRule{},
content: []byte(`{
"alert": "V2Test",
"alertType": "METRIC_BASED_ALERT",
"schemaVersion": "v2",
"condition": {
"compositeQuery": {
"queryType": "builder",
"panelType": "graph",
"builderQueries": {
"A": {
"queryName": "A",
"expression": "A",
"dataSource": "metrics",
"aggregateAttribute": {
"key": "test_metric"
}
@@ -517,11 +547,16 @@ func TestParseIntoRuleSchemaVersioning(t *testing.T) {
initRule: PostableRule{},
content: []byte(`{
"alert": "DefaultSchemaTest",
"alertType": "METRIC_BASED_ALERT",
"condition": {
"compositeQuery": {
"queryType": "builder",
"panelType": "graph",
"builderQueries": {
"A": {
"queryName": "A",
"expression": "A",
"dataSource": "metrics",
"aggregateAttribute": {
"key": "test_metric"
}
@@ -569,12 +604,16 @@ func TestParseIntoRuleSchemaVersioning(t *testing.T) {
func TestParseIntoRuleThresholdGeneration(t *testing.T) {
content := []byte(`{
"alert": "TestThresholds",
"alertType": "METRIC_BASED_ALERT",
"condition": {
"compositeQuery": {
"queryType": "builder",
"panelType": "graph",
"builderQueries": {
"A": {
"queryName": "A",
"expression": "A",
"dataSource": "metrics",
"disabled": false,
"aggregateAttribute": {
"key": "response_time"
@@ -638,14 +677,18 @@ func TestParseIntoRuleMultipleThresholds(t *testing.T) {
content := []byte(`{
"schemaVersion": "v2",
"alert": "MultiThresholdAlert",
"alertType": "METRIC_BASED_ALERT",
"ruleType": "threshold_rule",
"condition": {
"compositeQuery": {
"queryType": "builder",
"panelType": "graph",
"unit": "%",
"builderQueries": {
"A": {
"queryName": "A",
"expression": "A",
"dataSource": "metrics",
"disabled": false,
"aggregateAttribute": {
"key": "cpu_usage"
@@ -731,10 +774,12 @@ func TestAnomalyNegationEval(t *testing.T) {
name: "anomaly rule with ValueIsBelow - should alert",
ruleJSON: []byte(`{
"alert": "AnomalyBelowTest",
"alertType": "METRIC_BASED_ALERT",
"ruleType": "anomaly_rule",
"condition": {
"compositeQuery": {
"queryType": "builder",
"panelType": "graph",
"queries": [{
"type": "builder_query",
"spec": {
@@ -765,10 +810,12 @@ func TestAnomalyNegationEval(t *testing.T) {
name: "anomaly rule with ValueIsBelow; should not alert",
ruleJSON: []byte(`{
"alert": "AnomalyBelowTest",
"alertType": "METRIC_BASED_ALERT",
"ruleType": "anomaly_rule",
"condition": {
"compositeQuery": {
"queryType": "builder",
"panelType": "graph",
"queries": [{
"type": "builder_query",
"spec": {
@@ -798,10 +845,12 @@ func TestAnomalyNegationEval(t *testing.T) {
name: "anomaly rule with ValueIsAbove; should alert",
ruleJSON: []byte(`{
"alert": "AnomalyAboveTest",
"alertType": "METRIC_BASED_ALERT",
"ruleType": "anomaly_rule",
"condition": {
"compositeQuery": {
"queryType": "builder",
"panelType": "graph",
"queries": [{
"type": "builder_query",
"spec": {
@@ -832,10 +881,12 @@ func TestAnomalyNegationEval(t *testing.T) {
name: "anomaly rule with ValueIsAbove; should not alert",
ruleJSON: []byte(`{
"alert": "AnomalyAboveTest",
"alertType": "METRIC_BASED_ALERT",
"ruleType": "anomaly_rule",
"condition": {
"compositeQuery": {
"queryType": "builder",
"panelType": "graph",
"queries": [{
"type": "builder_query",
"spec": {
@@ -865,10 +916,12 @@ func TestAnomalyNegationEval(t *testing.T) {
name: "anomaly rule with ValueIsBelow and AllTheTimes; should alert",
ruleJSON: []byte(`{
"alert": "AnomalyBelowAllTest",
"alertType": "METRIC_BASED_ALERT",
"ruleType": "anomaly_rule",
"condition": {
"compositeQuery": {
"queryType": "builder",
"panelType": "graph",
"queries": [{
"type": "builder_query",
"spec": {
@@ -900,10 +953,12 @@ func TestAnomalyNegationEval(t *testing.T) {
name: "anomaly rule with ValueIsBelow and AllTheTimes; should not alert",
ruleJSON: []byte(`{
"alert": "AnomalyBelowAllTest",
"alertType": "METRIC_BASED_ALERT",
"ruleType": "anomaly_rule",
"condition": {
"compositeQuery": {
"queryType": "builder",
"panelType": "graph",
"queries": [{
"type": "builder_query",
"spec": {
@@ -934,10 +989,12 @@ func TestAnomalyNegationEval(t *testing.T) {
name: "anomaly rule with ValueOutsideBounds; should alert",
ruleJSON: []byte(`{
"alert": "AnomalyOutOfBoundsTest",
"alertType": "METRIC_BASED_ALERT",
"ruleType": "anomaly_rule",
"condition": {
"compositeQuery": {
"queryType": "builder",
"panelType": "graph",
"queries": [{
"type": "builder_query",
"spec": {
@@ -968,10 +1025,12 @@ func TestAnomalyNegationEval(t *testing.T) {
name: "non-anomaly threshold rule with ValueIsBelow; should alert",
ruleJSON: []byte(`{
"alert": "ThresholdTest",
"alertType": "METRIC_BASED_ALERT",
"ruleType": "threshold_rule",
"condition": {
"compositeQuery": {
"queryType": "builder",
"panelType": "graph",
"queries": [{
"type": "builder_query",
"spec": {
@@ -1002,10 +1061,12 @@ func TestAnomalyNegationEval(t *testing.T) {
name: "non-anomaly rule with ValueIsBelow - should not alert",
ruleJSON: []byte(`{
"alert": "ThresholdTest",
"alertType": "METRIC_BASED_ALERT",
"ruleType": "threshold_rule",
"condition": {
"compositeQuery": {
"queryType": "builder",
"panelType": "graph",
"queries": [{
"type": "builder_query",
"spec": {