Compare commits

...

2 Commits

Author SHA1 Message Date
Srikanth Chekuri
51838cd0a9 Merge branch 'main' into feat/1196 2025-01-14 23:14:14 +05:30
Aniket
e67a024262 feat: added max_time_series and max_samples in prom queries 2025-01-12 15:31:54 +05:30
3 changed files with 29 additions and 6 deletions

View File

@@ -219,6 +219,19 @@ func (q *querier) runPromQueries(ctx context.Context, params *v3.QueryRangeParam
continue
}
wg.Add(1)
// Create the modified context before launching goroutine for each query
queryCtx := ctx
if promQuery.MaxTimeSeries != 0 || promQuery.MaxSamples != 0 {
queryCtx = context.WithValue(ctx, constants.ResultOverflowMode, "throw")
if promQuery.MaxTimeSeries != 0 {
queryCtx = context.WithValue(queryCtx, constants.MaxRowsToGroupBy, promQuery.MaxTimeSeries)
}
if promQuery.MaxSamples != 0 {
queryCtx = context.WithValue(queryCtx, constants.MaxResultRows, promQuery.MaxSamples)
}
}
go func(queryName string, promQuery *v3.PromQuery) {
defer wg.Done()
cacheKey, ok := cacheKeys[queryName]
@@ -226,16 +239,17 @@ func (q *querier) runPromQueries(ctx context.Context, params *v3.QueryRangeParam
if !ok || params.NoCache {
zap.L().Info("skipping cache for metrics prom query", zap.String("queryName", queryName), zap.Int64("start", params.Start), zap.Int64("end", params.End), zap.Int64("step", params.Step), zap.Bool("noCache", params.NoCache), zap.String("cacheKey", cacheKeys[queryName]))
query := metricsV4.BuildPromQuery(promQuery, params.Step, params.Start, params.End)
series, err := q.execPromQuery(ctx, query)
series, err := q.execPromQuery(queryCtx, query)
channelResults <- channelResult{Err: err, Name: queryName, Query: query.Query, Series: series}
return
}
misses := q.queryCache.FindMissingTimeRanges(params.Start, params.End, params.Step, cacheKey)
zap.L().Info("cache misses for metrics prom query", zap.Any("misses", misses))
missedSeries := make([]querycache.CachedSeriesData, 0)
for _, miss := range misses {
query := metricsV4.BuildPromQuery(promQuery, params.Step, miss.Start, miss.End)
series, err := q.execPromQuery(ctx, query)
series, err := q.execPromQuery(queryCtx, query)
if err != nil {
channelResults <- channelResult{Err: err, Name: queryName, Query: query.Query, Series: nil}
return

View File

@@ -734,3 +734,10 @@ func init() {
}
// TRACE_V4_MAX_PAGINATION_LIMIT is the maximum pagination limit for trace v4
// queries (from the name; its call sites are not visible in this chunk).
const TRACE_V4_MAX_PAGINATION_LIMIT = 10000
// ClickHouse context settings: these are context.WithValue keys used to carry
// per-query ClickHouse settings from the prom-query planner down to the
// query executor (see the MaxTimeSeries/MaxSamples handling in runPromQueries).
const (
// ResultOverflowMode corresponds to ClickHouse's result_overflow_mode
// setting; set to "throw" so the server errors out when a result limit
// below is exceeded instead of silently truncating.
ResultOverflowMode = "result_overflow_mode"
// MaxRowsToGroupBy corresponds to ClickHouse's max_rows_to_group_by
// setting; used here to cap the number of time series (GROUP BY rows).
MaxRowsToGroupBy = "max_rows_to_group_by"
// MaxResultRows corresponds to ClickHouse's max_result_rows setting;
// used here to cap the total number of samples returned.
MaxResultRows = "max_result_rows"
)

View File

@@ -398,10 +398,12 @@ func (q *QueryRangeParamsV3) Clone() *QueryRangeParamsV3 {
}
type PromQuery struct {
Query string `json:"query"`
Stats string `json:"stats,omitempty"`
Disabled bool `json:"disabled"`
Legend string `json:"legend,omitempty"`
Query string `json:"query"`
Stats string `json:"stats,omitempty"`
Disabled bool `json:"disabled"`
Legend string `json:"legend,omitempty"`
MaxTimeSeries int `json:"maxTimeSeries,omitempty"`
MaxSamples int `json:"maxSamples,omitempty"`
}
func (p *PromQuery) Clone() *PromQuery {