Compare commits

...

3 Commits

Author SHA1 Message Date
Srikanth Chekuri
85d7075d3b Merge branch 'main' into fix/6175 2025-01-30 13:17:44 +05:30
Aniket
c648b72ace feat: added simultaneous temporality changes | 6175 2025-01-24 11:22:11 +05:30
Aniket
a449698dfe feat: added simultaneous temporality changes | 6175 2025-01-24 11:16:28 +05:30
5 changed files with 162 additions and 27 deletions

View File

@@ -2950,6 +2950,58 @@ func (r *ClickHouseReader) FetchTemporality(ctx context.Context, metricNames []s
return metricNameToTemporality, nil
}
func (r *ClickHouseReader) GetTemporalitySwitchPoints(ctx context.Context, metricName string, startTime int64, endTime int64) ([]v3.TemporalityChangePoint, error) {
// Initialize slice to store temporality switch points
var temporalitySwitches []v3.TemporalityChangePoint
query := fmt.Sprintf(`
SELECT
temporality,
unix_milli,
lag_temporality
FROM (
SELECT
metric_name,
temporality,
unix_milli,
lagInFrame(temporality, 1, '') OVER (
PARTITION BY metric_name ORDER BY unix_milli
) AS lag_temporality
FROM %s.%s
WHERE unix_milli >= %d
AND unix_milli <= %d
AND metric_name = '%s'
) AS subquery
WHERE lag_temporality != temporality
AND lag_temporality != ''
ORDER BY unix_milli ASC;
`, signozMetricDBName, signozTSLocalTableNameV4, startTime, endTime, metricName)
rows, err := r.db.Query(ctx, query)
if err != nil {
return nil, err
}
defer rows.Close()
for rows.Next() {
var temporality string
var timestamp int64
var lagTemporality string
err := rows.Scan(&temporality, &timestamp, &lagTemporality)
if err != nil {
return nil, err
}
// Store each temporality switch point with both temporalities
temporalitySwitches = append(temporalitySwitches, v3.TemporalityChangePoint{
Timestamp: timestamp,
FromTemporality: v3.Temporality(lagTemporality),
ToTemporality: v3.Temporality(temporality),
})
}
return temporalitySwitches, nil
}
func (r *ClickHouseReader) GetTimeSeriesInfo(ctx context.Context) (map[string]interface{}, error) {
queryStr := fmt.Sprintf("SELECT countDistinct(fingerprint) as count from %s.%s where metric_name not like 'signoz_%%' group by metric_name order by count desc;", signozMetricDBName, signozTSTableNameV41Day)

View File

@@ -655,6 +655,9 @@ func (aH *APIHandler) PopulateTemporality(ctx context.Context, qp *v3.QueryRange
} else {
query.Temporality = v3.Unspecified
}
if len(aH.temporalityMap[query.AggregateAttribute.Key]) > 1 {
query.MultipleTemporalities = true
}
}
// we don't have temporality for this metric
if query.DataSource == v3.DataSourceMetrics && query.Temporality == "" {
@@ -682,6 +685,9 @@ func (aH *APIHandler) PopulateTemporality(ctx context.Context, qp *v3.QueryRange
} else {
query.Temporality = v3.Unspecified
}
if len(nameToTemporality[query.AggregateAttribute.Key]) > 1 {
query.MultipleTemporalities = true
}
aH.temporalityMap[query.AggregateAttribute.Key] = nameToTemporality[query.AggregateAttribute.Key]
}
}

View File

@@ -177,7 +177,23 @@ func (q *querier) runBuilderQueries(ctx context.Context, params *v3.QueryRangePa
for queryName, builderQuery := range params.CompositeQuery.BuilderQueries {
if queryName == builderQuery.Expression {
wg.Add(1)
go q.runBuilderQuery(ctx, builderQuery, params, cacheKeys, ch, &wg)
if builderQuery.MultipleTemporalities == true {
go func() {
temporalitySwitches, err := q.reader.GetTemporalitySwitchPoints(ctx, builderQuery.AggregateAttribute.Key, params.Start, params.End)
if err != nil {
ch <- channelResult{Err: err, Name: queryName}
return
}
if len(temporalitySwitches) == 0 {
q.runBuilderQuery(ctx, builderQuery, params, cacheKeys, ch, &wg)
} else {
q.handleTemporalitySwitches(ctx, temporalitySwitches, &wg, builderQuery, params, cacheKeys, ch, queryName)
}
}()
} else {
go q.runBuilderQuery(ctx, builderQuery, params, cacheKeys, ch, &wg)
}
}
}
@@ -209,6 +225,58 @@ func (q *querier) runBuilderQueries(ctx context.Context, params *v3.QueryRangePa
return results, errQueriesByName, err
}
func (q *querier) handleTemporalitySwitches(ctx context.Context, temporalitySwitches []v3.TemporalityChangePoint, wg *sync.WaitGroup, builderQuery *v3.BuilderQuery, params *v3.QueryRangeParamsV3, cacheKeys map[string]string, ch chan channelResult, queryName string) {
defer wg.Done()
tempCh := make(chan channelResult, len(temporalitySwitches)+1)
var tempWg sync.WaitGroup
// Handle each segment between switch points
for i := 0; i <= len(temporalitySwitches); i++ {
tempWg.Add(1)
go func(idx int) {
queryWithTemporality := *builderQuery
queryParams := *params
if i == 0 {
queryParams.End = temporalitySwitches[idx].Timestamp
queryWithTemporality.Temporality = temporalitySwitches[idx].FromTemporality
} else if idx < len(temporalitySwitches) {
queryParams.Start = temporalitySwitches[idx-1].Timestamp
queryParams.End = temporalitySwitches[idx].Timestamp
queryWithTemporality.Temporality = temporalitySwitches[idx].FromTemporality
queryWithTemporality.ShiftBy = 0
} else if idx == len(temporalitySwitches) {
queryParams.Start = temporalitySwitches[idx-1].Timestamp
queryParams.End = params.End
queryWithTemporality.Temporality = temporalitySwitches[idx-1].ToTemporality
}
q.runBuilderQuery(ctx, &queryWithTemporality, &queryParams, cacheKeys, tempCh, &tempWg)
}(i)
}
// Wait for all temporal queries to complete
tempWg.Wait()
close(tempCh)
// Combine results from all temporal queries
var combinedSeries []*v3.Series
var lastErr error
for result := range tempCh {
if result.Err != nil {
lastErr = result.Err
continue
}
combinedSeries = append(combinedSeries, result.Series...)
}
ch <- channelResult{
Series: combinedSeries,
Err: lastErr,
Name: queryName,
}
}
func (q *querier) runPromQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Result, map[string]error, error) {
channelResults := make(chan channelResult, len(params.CompositeQuery.PromQueries))
var wg sync.WaitGroup

View File

@@ -115,6 +115,8 @@ type Reader interface {
//trace
GetTraceFields(ctx context.Context) (*model.GetFieldsResponse, *model.ApiError)
UpdateTraceField(ctx context.Context, field *model.UpdateField) *model.ApiError
GetTemporalitySwitchPoints(ctx context.Context, metricName string, startTime int64, endTime int64) ([]v3.TemporalityChangePoint, error)
}
type Querier interface {

View File

@@ -807,33 +807,34 @@ func (m *MetricValueFilter) Clone() *MetricValueFilter {
}
type BuilderQuery struct {
QueryName string `json:"queryName"`
StepInterval int64 `json:"stepInterval"`
DataSource DataSource `json:"dataSource"`
AggregateOperator AggregateOperator `json:"aggregateOperator"`
AggregateAttribute AttributeKey `json:"aggregateAttribute,omitempty"`
Temporality Temporality `json:"temporality,omitempty"`
Filters *FilterSet `json:"filters,omitempty"`
GroupBy []AttributeKey `json:"groupBy,omitempty"`
Expression string `json:"expression"`
Disabled bool `json:"disabled"`
Having []Having `json:"having,omitempty"`
Legend string `json:"legend,omitempty"`
Limit uint64 `json:"limit"`
Offset uint64 `json:"offset"`
PageSize uint64 `json:"pageSize"`
OrderBy []OrderBy `json:"orderBy,omitempty"`
ReduceTo ReduceToOperator `json:"reduceTo,omitempty"`
SelectColumns []AttributeKey `json:"selectColumns,omitempty"`
TimeAggregation TimeAggregation `json:"timeAggregation,omitempty"`
SpaceAggregation SpaceAggregation `json:"spaceAggregation,omitempty"`
QueryName string `json:"queryName"`
StepInterval int64 `json:"stepInterval"`
DataSource DataSource `json:"dataSource"`
AggregateOperator AggregateOperator `json:"aggregateOperator"`
AggregateAttribute AttributeKey `json:"aggregateAttribute,omitempty"`
Temporality Temporality `json:"temporality,omitempty"`
Filters *FilterSet `json:"filters,omitempty"`
GroupBy []AttributeKey `json:"groupBy,omitempty"`
Expression string `json:"expression"`
Disabled bool `json:"disabled"`
Having []Having `json:"having,omitempty"`
Legend string `json:"legend,omitempty"`
Limit uint64 `json:"limit"`
Offset uint64 `json:"offset"`
PageSize uint64 `json:"pageSize"`
OrderBy []OrderBy `json:"orderBy,omitempty"`
ReduceTo ReduceToOperator `json:"reduceTo,omitempty"`
SelectColumns []AttributeKey `json:"selectColumns,omitempty"`
TimeAggregation TimeAggregation `json:"timeAggregation,omitempty"`
SpaceAggregation SpaceAggregation `json:"spaceAggregation,omitempty"`
SecondaryAggregation SecondaryAggregation `json:"seriesAggregation,omitempty"`
Functions []Function `json:"functions,omitempty"`
ShiftBy int64
IsAnomaly bool
QueriesUsedInFormula []string
MetricTableHints *MetricTableHints `json:"-"`
MetricValueFilter *MetricValueFilter `json:"-"`
Functions []Function `json:"functions,omitempty"`
ShiftBy int64
IsAnomaly bool
QueriesUsedInFormula []string
MetricTableHints *MetricTableHints `json:"-"`
MetricValueFilter *MetricValueFilter `json:"-"`
MultipleTemporalities bool
}
func (b *BuilderQuery) SetShiftByFromFunc() {
@@ -1406,3 +1407,9 @@ type QBOptions struct {
IsLivetailQuery bool
PreferRPM bool
}
type TemporalityChangePoint struct {
Timestamp int64 `json:"timestamp"`
FromTemporality Temporality `json:"from_temporality"`
ToTemporality Temporality `json:"to_temporality"`
}