Compare commits

...

4 Commits

Author SHA1 Message Date
Srikanth Chekuri
a705e7320f chore: make step multiple of aggregation interval of table 2024-08-13 00:08:11 +05:30
Srikanth Chekuri
3297934a90 Merge branch 'develop' into agg-tables 2024-08-12 22:00:21 +05:30
Srikanth Chekuri
a56b32328d chore: update the step interval 2024-08-12 19:58:07 +05:30
Srikanth Chekuri
424c08805e chore: update query builder to use 5min/30min aggregation tables 2024-08-12 19:24:45 +05:30
10 changed files with 183 additions and 64 deletions

View File

@@ -114,12 +114,14 @@ func prepareTimeAggregationSubQuery(start, end, step int64, mq *v3.BuilderQuery)
samplesTableFilter := fmt.Sprintf("metric_name = %s AND unix_milli >= %d AND unix_milli < %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end)
tableName := helpers.WhichSamplesTableToUse(start, end, mq)
// Select the aggregate value for interval
queryTmpl :=
"SELECT fingerprint, %s" +
" toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL %d SECOND) as ts," +
" %s as per_series_value" +
" FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_V4_TABLENAME +
" FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + tableName +
" INNER JOIN" +
" (%s) as filtered_time_series" +
" USING fingerprint" +
@@ -130,37 +132,30 @@ func prepareTimeAggregationSubQuery(start, end, step int64, mq *v3.BuilderQuery)
selectLabelsAny := helpers.SelectLabelsAny(mq.GroupBy)
selectLabels := helpers.SelectLabels(mq.GroupBy)
op := helpers.AggregationColumnForSamplesTable(start, end, mq)
switch mq.TimeAggregation {
case v3.TimeAggregationAvg:
op := "avg(value)"
subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
case v3.TimeAggregationSum:
op := "sum(value)"
subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
case v3.TimeAggregationMin:
op := "min(value)"
subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
case v3.TimeAggregationMax:
op := "max(value)"
subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
case v3.TimeAggregationCount:
op := "count(value)"
subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
case v3.TimeAggregationCountDistinct:
op := "count(distinct(value))"
subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
case v3.TimeAggregationAnyLast:
op := "anyLast(value)"
subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
case v3.TimeAggregationRate:
op := "max(value)"
innerSubQuery := fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
rateQueryTmpl :=
"SELECT %s ts, " + rateWithoutNegative +
" as per_series_value FROM (%s) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)"
subQuery = fmt.Sprintf(rateQueryTmpl, selectLabels, innerSubQuery)
case v3.TimeAggregationIncrease:
op := "max(value)"
innerSubQuery := fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
rateQueryTmpl :=
"SELECT %s ts, " + increaseWithoutNegative +

View File

@@ -62,11 +62,11 @@ func TestPrepareTimeAggregationSubQuery(t *testing.T) {
}},
Expression: "A",
Disabled: false,
TimeAggregation: v3.TimeAggregationAvg,
TimeAggregation: v3.TimeAggregationIncrease,
},
start: 1701794980000,
end: 1701796780000,
expectedQueryContains: "SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'http_requests' AND temporality = 'Cumulative' AND unix_milli >= 1701792000000 AND unix_milli < 1701796780000 AND JSONExtractString(labels, 'service_name') != 'payment_service' AND JSONExtractString(labels, 'endpoint') IN ['/paycallback','/payme','/paypal']) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND unix_milli >= 1701794980000 AND unix_milli < 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts",
expectedQueryContains: "SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'http_requests' AND temporality = 'Cumulative' AND unix_milli >= 1701792000000 AND unix_milli < 1701796780000 AND JSONExtractString(labels, 'service_name') != 'payment_service' AND JSONExtractString(labels, 'endpoint') IN ['/paycallback','/payme','/paypal']) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND unix_milli >= 1701794980000 AND unix_milli < 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts",
},
{
name: "test time aggregation = rate, temporality = cumulative",

View File

@@ -62,11 +62,11 @@ func TestPrepareTimeAggregationSubQuery(t *testing.T) {
}},
Expression: "A",
Disabled: false,
TimeAggregation: v3.TimeAggregationAvg,
TimeAggregation: v3.TimeAggregationIncrease,
},
start: 1701794980000,
end: 1701796780000,
expectedQueryContains: "SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'http_requests' AND temporality = 'Delta' AND unix_milli >= 1701792000000 AND unix_milli < 1701796780000 AND JSONExtractString(labels, 'service_name') != 'payment_service' AND JSONExtractString(labels, 'endpoint') IN ['/paycallback','/payme','/paypal']) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND unix_milli >= 1701794980000 AND unix_milli < 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts",
expectedQueryContains: "SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'http_requests' AND temporality = 'Delta' AND unix_milli >= 1701792000000 AND unix_milli < 1701796780000 AND JSONExtractString(labels, 'service_name') != 'payment_service' AND JSONExtractString(labels, 'endpoint') IN ['/paycallback','/payme','/paypal']) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND unix_milli >= 1701794980000 AND unix_milli < 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts",
},
{
name: "test time aggregation = rate, temporality = delta",

View File

@@ -26,12 +26,14 @@ func prepareTimeAggregationSubQuery(start, end, step int64, mq *v3.BuilderQuery)
samplesTableFilter := fmt.Sprintf("metric_name = %s AND unix_milli >= %d AND unix_milli < %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end)
tableName := helpers.WhichSamplesTableToUse(start, end, mq)
// Select the aggregate value for interval
queryTmpl :=
"SELECT fingerprint, %s" +
" toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL %d SECOND) as ts," +
" %s as per_series_value" +
" FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_V4_TABLENAME +
" FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + tableName +
" INNER JOIN" +
" (%s) as filtered_time_series" +
" USING fingerprint" +
@@ -41,33 +43,27 @@ func prepareTimeAggregationSubQuery(start, end, step int64, mq *v3.BuilderQuery)
selectLabelsAny := helpers.SelectLabelsAny(mq.GroupBy)
op := helpers.AggregationColumnForSamplesTable(start, end, mq)
switch mq.TimeAggregation {
case v3.TimeAggregationAvg:
op := "avg(value)"
subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
case v3.TimeAggregationSum:
op := "sum(value)"
subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
case v3.TimeAggregationMin:
op := "min(value)"
subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
case v3.TimeAggregationMax:
op := "max(value)"
subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
case v3.TimeAggregationCount:
op := "count(value)"
subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
case v3.TimeAggregationCountDistinct:
op := "count(distinct(value))"
subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
case v3.TimeAggregationAnyLast:
op := "anyLast(value)"
subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
case v3.TimeAggregationRate:
op := fmt.Sprintf("sum(value)/%d", step)
op := fmt.Sprintf("%s/%d", op, step)
subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
case v3.TimeAggregationIncrease:
op := "sum(value)"
subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
}
return subQuery, nil
@@ -89,10 +85,8 @@ func prepareQueryOptimized(start, end, step int64, mq *v3.BuilderQuery) (string,
samplesTableFilter := fmt.Sprintf("metric_name = %s AND unix_milli >= %d AND unix_milli < %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end)
var tableName string = constants.SIGNOZ_SAMPLES_V4_TABLENAME
if mq.AggregateAttribute.Type == v3.AttributeKeyType(v3.MetricTypeExponentialHistogram) {
tableName = "distributed_exp_hist"
}
tableName := helpers.WhichSamplesTableToUse(start, end, mq)
// Select the aggregate value for interval
queryTmpl :=
"SELECT %s" +
@@ -108,16 +102,16 @@ func prepareQueryOptimized(start, end, step int64, mq *v3.BuilderQuery) (string,
switch mq.SpaceAggregation {
case v3.SpaceAggregationSum:
op := "sum(value)"
op := helpers.AggregationColumnForSamplesTable(start, end, mq)
if mq.TimeAggregation == v3.TimeAggregationRate {
op = "sum(value)/" + fmt.Sprintf("%d", step)
op = fmt.Sprintf("%s/%d", op, step)
}
query = fmt.Sprintf(queryTmpl, selectLabels, step, op, timeSeriesSubQuery, groupBy, orderBy)
case v3.SpaceAggregationMin:
op := "min(value)"
op := helpers.AggregationColumnForSamplesTable(start, end, mq)
query = fmt.Sprintf(queryTmpl, selectLabels, step, op, timeSeriesSubQuery, groupBy, orderBy)
case v3.SpaceAggregationMax:
op := "max(value)"
op := helpers.AggregationColumnForSamplesTable(start, end, mq)
query = fmt.Sprintf(queryTmpl, selectLabels, step, op, timeSeriesSubQuery, groupBy, orderBy)
case v3.SpaceAggregationPercentile50,
v3.SpaceAggregationPercentile75,

View File

@@ -13,31 +13,129 @@ import (
var (
sixHoursInMilliseconds = time.Hour.Milliseconds() * 6
oneDayInMilliseconds = time.Hour.Milliseconds() * 24
oneWeekInMilliseconds = oneDayInMilliseconds * 7
)
// start and end are in milliseconds
func which(start, end int64) (int64, int64, string) {
func whichTSTableToUse(start, end int64) (int64, int64, string) {
// If time range is less than 6 hours, we need to use the `time_series_v4` table
// else if time range is less than 1 day and greater than 6 hours, we need to use the `time_series_v4_6hrs` table
// else we need to use the `time_series_v4_1day` table
// else if time range is less than 1 week and greater than 1 day, we need to use the `time_series_v4_1day` table
// else we need to use the `time_series_v4_1week` table
var tableName string
if end-start <= sixHoursInMilliseconds {
if end-start < sixHoursInMilliseconds {
// adjust the start time to nearest 1 hour
start = start - (start % (time.Hour.Milliseconds() * 1))
tableName = constants.SIGNOZ_TIMESERIES_v4_LOCAL_TABLENAME
} else if end-start <= oneDayInMilliseconds {
} else if end-start < oneDayInMilliseconds {
// adjust the start time to nearest 6 hours
start = start - (start % (time.Hour.Milliseconds() * 6))
tableName = constants.SIGNOZ_TIMESERIES_v4_6HRS_LOCAL_TABLENAME
} else {
} else if end-start < oneWeekInMilliseconds {
// adjust the start time to nearest 1 day
start = start - (start % (time.Hour.Milliseconds() * 24))
tableName = constants.SIGNOZ_TIMESERIES_v4_1DAY_LOCAL_TABLENAME
} else {
// adjust the start time to nearest 1 week
start = start - (start % (time.Hour.Milliseconds() * 24 * 7))
tableName = constants.SIGNOZ_TIMESERIES_v4_1WEEK_LOCAL_TABLENAME
}
return start, end, tableName
}
// start and end are in milliseconds
// we have three tables for samples
// 1. distributed_samples_v4
// 2. distributed_samples_v4_agg_5m - for queries with time range above or equal to 1 day and less than 1 week
// 3. distributed_samples_v4_agg_30m - for queries with time range above or equal to 1 week
// if the `timeAggregation` is `count_distinct` we can't use the aggregated tables because they don't support it
func WhichSamplesTableToUse(start, end int64, mq *v3.BuilderQuery) string {
// we don't have any aggregated table for sketches (yet)
if mq.AggregateAttribute.Type == v3.AttributeKeyType(v3.MetricTypeExponentialHistogram) {
return constants.SIGNOZ_EXP_HISTOGRAM_TABLENAME
}
// if the time aggregation is count_distinct, we need to use the distributed_samples_v4 table
// because the aggregated tables don't support count_distinct
if mq.TimeAggregation == v3.TimeAggregationCountDistinct {
return constants.SIGNOZ_SAMPLES_V4_TABLENAME
}
if end-start < oneDayInMilliseconds {
return constants.SIGNOZ_SAMPLES_V4_TABLENAME
} else if end-start < oneWeekInMilliseconds {
return constants.SIGNOZ_SAMPLES_V4_AGG_5M_TABLENAME
} else {
return constants.SIGNOZ_SAMPLES_V4_AGG_30M_TABLENAME
}
}
func AggregationColumnForSamplesTable(start, end int64, mq *v3.BuilderQuery) string {
tableName := WhichSamplesTableToUse(start, end, mq)
var aggregationColumn string
switch mq.Temporality {
case v3.Delta:
// for delta metrics, we only support `RATE`/`INCREASE` both of which are sum
switch tableName {
case constants.SIGNOZ_SAMPLES_V4_TABLENAME:
aggregationColumn = "sum(value)"
case constants.SIGNOZ_SAMPLES_V4_AGG_5M_TABLENAME, constants.SIGNOZ_SAMPLES_V4_AGG_30M_TABLENAME:
aggregationColumn = "sum(sum)"
}
case v3.Cumulative:
// for cumulative metrics, we only support `RATE`/`INCREASE`. The max value in window is
// used to calculate the sum which is then divided by the window size to get the rate
switch tableName {
case constants.SIGNOZ_SAMPLES_V4_TABLENAME:
aggregationColumn = "max(value)"
case constants.SIGNOZ_SAMPLES_V4_AGG_5M_TABLENAME, constants.SIGNOZ_SAMPLES_V4_AGG_30M_TABLENAME:
aggregationColumn = "max(max)"
}
case v3.Unspecified:
switch tableName {
case constants.SIGNOZ_SAMPLES_V4_TABLENAME:
switch mq.TimeAggregation {
case v3.TimeAggregationAnyLast:
aggregationColumn = "anyLast(value)"
case v3.TimeAggregationSum:
aggregationColumn = "sum(value)"
case v3.TimeAggregationAvg:
aggregationColumn = "avg(value)"
case v3.TimeAggregationMin:
aggregationColumn = "min(value)"
case v3.TimeAggregationMax:
aggregationColumn = "max(value)"
case v3.TimeAggregationCount:
aggregationColumn = "count(value)"
case v3.TimeAggregationCountDistinct:
aggregationColumn = "countDistinct(value)"
case v3.TimeAggregationRate, v3.TimeAggregationIncrease: // ideally, this should never happen
aggregationColumn = "sum(value)"
}
case constants.SIGNOZ_SAMPLES_V4_AGG_5M_TABLENAME, constants.SIGNOZ_SAMPLES_V4_AGG_30M_TABLENAME:
switch mq.TimeAggregation {
case v3.TimeAggregationAnyLast:
aggregationColumn = "anyLast(last)"
case v3.TimeAggregationSum:
aggregationColumn = "sum(sum)"
case v3.TimeAggregationAvg:
aggregationColumn = "sum(sum) / sum(count)"
case v3.TimeAggregationMin:
aggregationColumn = "min(min)"
case v3.TimeAggregationMax:
aggregationColumn = "max(max)"
case v3.TimeAggregationCount:
aggregationColumn = "sum(count)"
case v3.TimeAggregationRate, v3.TimeAggregationIncrease: // ideally, this should never happen
aggregationColumn = "sum(sum)"
}
}
}
return aggregationColumn
}
// PrepareTimeseriesFilterQuery builds the sub-query to be used for filtering timeseries based on the search criteria
func PrepareTimeseriesFilterQuery(start, end int64, mq *v3.BuilderQuery) (string, error) {
var conditions []string
@@ -47,7 +145,7 @@ func PrepareTimeseriesFilterQuery(start, end int64, mq *v3.BuilderQuery) (string
conditions = append(conditions, fmt.Sprintf("metric_name = %s", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key)))
conditions = append(conditions, fmt.Sprintf("temporality = '%s'", mq.Temporality))
start, end, tableName := which(start, end)
start, end, tableName := whichTSTableToUse(start, end)
conditions = append(conditions, fmt.Sprintf("unix_milli >= %d AND unix_milli < %d", start, end))
@@ -127,7 +225,7 @@ func PrepareTimeseriesFilterQueryV3(start, end int64, mq *v3.BuilderQuery) (stri
conditions = append(conditions, fmt.Sprintf("metric_name = %s", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key)))
conditions = append(conditions, fmt.Sprintf("temporality = '%s'", mq.Temporality))
start, end, tableName := which(start, end)
start, end, tableName := whichTSTableToUse(start, end)
conditions = append(conditions, fmt.Sprintf("unix_milli >= %d AND unix_milli < %d", start, end))

View File

@@ -193,7 +193,7 @@ func TestPrepareMetricQueryCumulativeRate(t *testing.T) {
TimeAggregation: v3.TimeAggregationRate,
SpaceAggregation: v3.SpaceAggregationSum,
},
expectedQueryContains: "SELECT service_name, ts, sum(per_series_value) as value FROM (SELECT service_name, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_calls_total' AND temporality = 'Cumulative' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000 AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND unix_milli >= 1650991920000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY service_name, ts ORDER BY service_name ASC, ts ASC",
expectedQueryContains: "SELECT service_name, ts, sum(per_series_value) as value FROM (SELECT service_name, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(max) as per_series_value FROM signoz_metrics.distributed_samples_v4_agg_5m INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_calls_total' AND temporality = 'Cumulative' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000 AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND unix_milli >= 1650991920000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY service_name, ts ORDER BY service_name ASC, ts ASC",
},
{
name: "test time aggregation = rate, space aggregation = sum, temporality = cumulative, multiple group by",
@@ -226,7 +226,7 @@ func TestPrepareMetricQueryCumulativeRate(t *testing.T) {
TimeAggregation: v3.TimeAggregationRate,
SpaceAggregation: v3.SpaceAggregationSum,
},
expectedQueryContains: "SELECT service_name, endpoint, ts, sum(per_series_value) as value FROM (SELECT service_name, endpoint, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, any(endpoint) as endpoint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'endpoint') as endpoint, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_calls_total' AND temporality = 'Cumulative' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND unix_milli >= 1650991920000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY service_name, endpoint, ts ORDER BY service_name ASC, endpoint ASC, ts ASC",
expectedQueryContains: "SELECT service_name, endpoint, ts, sum(per_series_value) as value FROM (SELECT service_name, endpoint, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, any(endpoint) as endpoint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(max) as per_series_value FROM signoz_metrics.distributed_samples_v4_agg_5m INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'endpoint') as endpoint, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_calls_total' AND temporality = 'Cumulative' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND unix_milli >= 1650991920000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY service_name, endpoint, ts ORDER BY service_name ASC, endpoint ASC, ts ASC",
},
}
@@ -266,7 +266,7 @@ func TestPrepareMetricQueryDeltaRate(t *testing.T) {
TimeAggregation: v3.TimeAggregationRate,
SpaceAggregation: v3.SpaceAggregationSum,
},
expectedQueryContains: "SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'signoz_calls_total' AND temporality = 'Delta' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY ts ORDER BY ts ASC",
expectedQueryContains: "SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(sum)/60 as value FROM signoz_metrics.distributed_samples_v4_agg_5m INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_calls_total' AND temporality = 'Delta' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY ts ORDER BY ts ASC",
},
{
name: "test time aggregation = rate, space aggregation = sum, temporality = delta, group by service_name",
@@ -292,12 +292,14 @@ func TestPrepareMetricQueryDeltaRate(t *testing.T) {
TimeAggregation: v3.TimeAggregationRate,
SpaceAggregation: v3.SpaceAggregationSum,
},
expectedQueryContains: "SELECT service_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'signoz_calls_total' AND temporality = 'Delta' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY service_name, ts ORDER BY service_name ASC, ts ASC",
expectedQueryContains: "SELECT service_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(sum)/60 as value FROM signoz_metrics.distributed_samples_v4_agg_5m INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_calls_total' AND temporality = 'Delta' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY service_name, ts ORDER BY service_name ASC, ts ASC",
},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
// 1650991982000 - April 26, 2022 10:23:02 PM
// 1651078382000 - April 27, 2022 10:23:02 PM
query, err := PrepareMetricQuery(1650991982000, 1651078382000, v3.QueryTypeBuilder, v3.PanelTypeGraph, testCase.builderQuery, metricsV3.Options{})
assert.Nil(t, err)
assert.Contains(t, query, testCase.expectedQueryContains)
@@ -344,7 +346,7 @@ func TestPrepreMetricQueryCumulativeQuantile(t *testing.T) {
Disabled: false,
SpaceAggregation: v3.SpaceAggregationPercentile99,
},
expectedQueryContains: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name, le, ts, sum(per_series_value) as value FROM (SELECT service_name, le, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, any(le) as le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Cumulative' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000 AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY service_name, le, ts ORDER BY service_name ASC, le ASC, ts ASC) GROUP BY service_name, ts ORDER BY service_name ASC, ts ASC",
expectedQueryContains: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name, le, ts, sum(per_series_value) as value FROM (SELECT service_name, le, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, any(le) as le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(max) as per_series_value FROM signoz_metrics.distributed_samples_v4_agg_5m INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Cumulative' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000 AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY service_name, le, ts ORDER BY service_name ASC, le ASC, ts ASC) GROUP BY service_name, ts ORDER BY service_name ASC, ts ASC",
},
{
name: "test temporality = cumulative, quantile = 0.99 without group by",
@@ -374,12 +376,14 @@ func TestPrepreMetricQueryCumulativeQuantile(t *testing.T) {
Disabled: false,
SpaceAggregation: v3.SpaceAggregationPercentile99,
},
expectedQueryContains: "SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT le, ts, sum(per_series_value) as value FROM (SELECT le, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(le) as le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Cumulative' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000 AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY le, ts ORDER BY le ASC, ts ASC) GROUP BY ts ORDER BY ts ASC",
expectedQueryContains: "SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT le, ts, sum(per_series_value) as value FROM (SELECT le, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(le) as le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(max) as per_series_value FROM signoz_metrics.distributed_samples_v4_agg_5m INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Cumulative' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000 AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY le, ts ORDER BY le ASC, ts ASC) GROUP BY ts ORDER BY ts ASC",
},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
// 1650991982000 - April 26, 2022 10:23:02 PM
// 1651078382000 - April 27, 2022 10:23:02 PM
query, err := PrepareMetricQuery(1650991982000, 1651078382000, v3.QueryTypeBuilder, v3.PanelTypeGraph, testCase.builderQuery, metricsV3.Options{})
assert.Nil(t, err)
assert.Contains(t, query, testCase.expectedQueryContains)
@@ -426,7 +430,7 @@ func TestPrepreMetricQueryDeltaQuantile(t *testing.T) {
Disabled: false,
SpaceAggregation: v3.SpaceAggregationPercentile99,
},
expectedQueryContains: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name, le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000 AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY service_name, le, ts ORDER BY service_name ASC, le ASC, ts ASC) GROUP BY service_name, ts ORDER BY service_name ASC, ts ASC",
expectedQueryContains: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name, le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(sum)/60 as value FROM signoz_metrics.distributed_samples_v4_agg_5m INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000 AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY service_name, le, ts ORDER BY service_name ASC, le ASC, ts ASC) GROUP BY service_name, ts ORDER BY service_name ASC, ts ASC",
},
{
name: "test temporality = delta, quantile = 0.99 no group by",
@@ -456,12 +460,14 @@ func TestPrepreMetricQueryDeltaQuantile(t *testing.T) {
Disabled: false,
SpaceAggregation: v3.SpaceAggregationPercentile99,
},
expectedQueryContains: "SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000 AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY le, ts ORDER BY le ASC, ts ASC) GROUP BY ts ORDER BY ts ASC",
expectedQueryContains: "SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(sum)/60 as value FROM signoz_metrics.distributed_samples_v4_agg_5m INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000 AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY le, ts ORDER BY le ASC, ts ASC) GROUP BY ts ORDER BY ts ASC",
},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
// 1650991982000 - April 26, 2022 10:23:02 PM
// 1651078382000 - April 27, 2022 10:23:02 PM
query, err := PrepareMetricQuery(1650991982000, 1651078382000, v3.QueryTypeBuilder, v3.PanelTypeGraph, testCase.builderQuery, metricsV3.Options{})
assert.Nil(t, err)
assert.Contains(t, query, testCase.expectedQueryContains)
@@ -494,7 +500,7 @@ func TestPrepareMetricQueryGauge(t *testing.T) {
SpaceAggregation: v3.SpaceAggregationSum,
Disabled: false,
},
expectedQueryContains: "SELECT ts, sum(per_series_value) as value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'system_cpu_usage' AND temporality = 'Unspecified' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'system_cpu_usage' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY ts ORDER BY ts ASC",
expectedQueryContains: "SELECT ts, sum(per_series_value) as value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(sum) / sum(count) as per_series_value FROM signoz_metrics.distributed_samples_v4_agg_5m INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'system_cpu_usage' AND temporality = 'Unspecified' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'system_cpu_usage' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY ts ORDER BY ts ASC",
},
{
name: "test gauge query with group by host_name",
@@ -520,12 +526,14 @@ func TestPrepareMetricQueryGauge(t *testing.T) {
Expression: "A",
Disabled: false,
},
expectedQueryContains: "SELECT host_name, ts, sum(per_series_value) as value FROM (SELECT fingerprint, any(host_name) as host_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'host_name') as host_name, fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'system_cpu_usage' AND temporality = 'Unspecified' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'system_cpu_usage' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY host_name, ts ORDER BY host_name ASC, ts ASC",
expectedQueryContains: "SELECT host_name, ts, sum(per_series_value) as value FROM (SELECT fingerprint, any(host_name) as host_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(sum) / sum(count) as per_series_value FROM signoz_metrics.distributed_samples_v4_agg_5m INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'host_name') as host_name, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'system_cpu_usage' AND temporality = 'Unspecified' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'system_cpu_usage' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY host_name, ts ORDER BY host_name ASC, ts ASC",
},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
// 1650991982000 - April 26, 2022 10:23:02 PM
// 1651078382000 - April 27, 2022 10:23:02 PM
query, err := PrepareMetricQuery(1650991982000, 1651078382000, v3.QueryTypeBuilder, v3.PanelTypeGraph, testCase.builderQuery, metricsV3.Options{})
assert.Nil(t, err)
assert.Contains(t, query, testCase.expectedQueryContains)

View File

@@ -1068,6 +1068,17 @@ func ParseQueryRangeParams(r *http.Request) (*v3.QueryRangeParamsV3, *model.ApiE
query.StepInterval = minStep
}
if query.DataSource == v3.DataSourceMetrics {
// if the time range is greater than 1 day, and less than 1 week set the step interval to be multiple of 5 minutes
// if the time range is greater than 1 week, set the step interval to be multiple of 30 mins
start, end := queryRangeParams.Start, queryRangeParams.End
if end-start >= 24*time.Hour.Milliseconds() && end-start < 7*24*time.Hour.Milliseconds() {
query.StepInterval = int64(math.Round(float64(query.StepInterval)/300)) * 300
} else if end-start >= 7*24*time.Hour.Milliseconds() {
query.StepInterval = int64(math.Round(float64(query.StepInterval)/1800)) * 1800
}
}
// Remove the time shift function from the list of functions and set the shift by value
var timeShiftBy int64
if len(query.Functions) > 0 {

View File

@@ -227,8 +227,8 @@ func TestDeltaQueryBuilder(t *testing.T) {
{
name: "TestQueryWithName - Request rate",
query: &v3.QueryRangeParamsV3{
Start: 1650991982000,
End: 1651078382000,
Start: 1650991982000, // 2022-04-25 10:53:02
End: 1651078382000, // 2022-04-26 10:53:02
CompositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
PanelType: v3.PanelTypeGraph,
@@ -260,13 +260,13 @@ func TestDeltaQueryBuilder(t *testing.T) {
},
},
queryToTest: "A",
expected: "SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000 AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND unix_milli >= 1650991980000 AND unix_milli <= 1651078380000 GROUP BY ts ORDER BY ts",
expected: "SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000 AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND unix_milli >= 1650991980000 AND unix_milli <= 1651078380000 GROUP BY ts ORDER BY ts",
},
{
name: "TestQueryWithExpression - Error rate",
query: &v3.QueryRangeParamsV3{
Start: 1650991982000,
End: 1651078382000,
Start: 1650991982000, // 2022-04-25 10:53:02
End: 1651078382000, // 2022-04-26 10:53:02
CompositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
PanelType: v3.PanelTypeGraph,
@@ -330,7 +330,7 @@ func TestDeltaQueryBuilder(t *testing.T) {
},
},
queryToTest: "C",
expected: "SELECT A.`ts` as `ts`, A.value * 100 / B.value as value FROM (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000 AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, 'status_code') IN ['STATUS_CODE_ERROR'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND unix_milli >= 1650991980000 AND unix_milli <= 1651078380000 GROUP BY ts ORDER BY ts) as A INNER JOIN (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000 AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND unix_milli >= 1650991980000 AND unix_milli <= 1651078380000 GROUP BY ts ORDER BY ts) as B ON A.`ts` = B.`ts`",
expected: "SELECT A.`ts` as `ts`, A.value * 100 / B.value as value FROM (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000 AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, 'status_code') IN ['STATUS_CODE_ERROR'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND unix_milli >= 1650991980000 AND unix_milli <= 1651078380000 GROUP BY ts ORDER BY ts) as A INNER JOIN (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000 AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND unix_milli >= 1650991980000 AND unix_milli <= 1651078380000 GROUP BY ts ORDER BY ts) as B ON A.`ts` = B.`ts`",
},
{
name: "TestQuery - Quantile",
@@ -358,7 +358,7 @@ func TestDeltaQueryBuilder(t *testing.T) {
},
},
queryToTest: "A",
expected: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) as value FROM (SELECT service_name,le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1650991980000 AND unix_milli <= 1651078380000 GROUP BY service_name,le,ts ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts",
expected: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) as value FROM (SELECT service_name,le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1650991980000 AND unix_milli <= 1651078380000 GROUP BY service_name,le,ts ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts",
},
}

View File

@@ -215,14 +215,18 @@ var GroupByColMap = map[string]struct{}{
}
const (
SIGNOZ_METRIC_DBNAME = "signoz_metrics"
SIGNOZ_SAMPLES_V4_TABLENAME = "distributed_samples_v4"
SIGNOZ_TRACE_DBNAME = "signoz_traces"
SIGNOZ_SPAN_INDEX_TABLENAME = "distributed_signoz_index_v2"
SIGNOZ_TIMESERIES_v4_LOCAL_TABLENAME = "time_series_v4"
SIGNOZ_TIMESERIES_v4_6HRS_LOCAL_TABLENAME = "time_series_v4_6hrs"
SIGNOZ_TIMESERIES_v4_1DAY_LOCAL_TABLENAME = "time_series_v4_1day"
SIGNOZ_TIMESERIES_v4_1DAY_TABLENAME = "distributed_time_series_v4_1day"
SIGNOZ_METRIC_DBNAME = "signoz_metrics"
SIGNOZ_SAMPLES_V4_TABLENAME = "distributed_samples_v4"
SIGNOZ_SAMPLES_V4_AGG_5M_TABLENAME = "distributed_samples_v4_agg_5m"
SIGNOZ_SAMPLES_V4_AGG_30M_TABLENAME = "distributed_samples_v4_agg_30m"
SIGNOZ_EXP_HISTOGRAM_TABLENAME = "distributed_exp_hist"
SIGNOZ_TRACE_DBNAME = "signoz_traces"
SIGNOZ_SPAN_INDEX_TABLENAME = "distributed_signoz_index_v2"
SIGNOZ_TIMESERIES_v4_LOCAL_TABLENAME = "time_series_v4"
SIGNOZ_TIMESERIES_v4_6HRS_LOCAL_TABLENAME = "time_series_v4_6hrs"
SIGNOZ_TIMESERIES_v4_1DAY_LOCAL_TABLENAME = "time_series_v4_1day"
SIGNOZ_TIMESERIES_v4_1WEEK_LOCAL_TABLENAME = "time_series_v4_1week"
SIGNOZ_TIMESERIES_v4_1DAY_TABLENAME = "distributed_time_series_v4_1day"
)
var TimeoutExcludedRoutes = map[string]bool{

View File

@@ -531,6 +531,15 @@ func (r *ThresholdRule) prepareQueryRange(ts time.Time) *v3.QueryRangeParamsV3 {
if minStep := common.MinAllowedStepInterval(start, end); q.StepInterval < minStep {
q.StepInterval = minStep
}
if q.DataSource == v3.DataSourceMetrics {
// if the time range is greater than 1 day, and less than 1 week set the step interval to be multiple of 5 minutes
// if the time range is greater than 1 week, set the step interval to be multiple of 30 mins
if end-start >= 24*time.Hour.Milliseconds() && end-start < 7*24*time.Hour.Milliseconds() {
q.StepInterval = int64(math.Round(float64(q.StepInterval)/300)) * 300
} else if end-start >= 7*24*time.Hour.Milliseconds() {
q.StepInterval = int64(math.Round(float64(q.StepInterval)/1800)) * 1800
}
}
}
}