Compare commits
3 Commits
| Author | SHA1 | Date |
|---|---|---|
|  | 85d7075d3b |  |
|  | c648b72ace |  |
|  | a449698dfe |  |
@@ -2950,6 +2950,58 @@ func (r *ClickHouseReader) FetchTemporality(ctx context.Context, metricNames []s
 	return metricNameToTemporality, nil
 }
 
+func (r *ClickHouseReader) GetTemporalitySwitchPoints(ctx context.Context, metricName string, startTime int64, endTime int64) ([]v3.TemporalityChangePoint, error) {
+	// Initialize slice to store temporality switch points
+	var temporalitySwitches []v3.TemporalityChangePoint
+
+	query := fmt.Sprintf(`
+		SELECT
+			temporality,
+			unix_milli,
+			lag_temporality
+		FROM (
+			SELECT
+				metric_name,
+				temporality,
+				unix_milli,
+				lagInFrame(temporality, 1, '') OVER (
+					PARTITION BY metric_name ORDER BY unix_milli
+				) AS lag_temporality
+			FROM %s.%s
+			WHERE unix_milli >= %d
+				AND unix_milli <= %d
+				AND metric_name = '%s'
+		) AS subquery
+		WHERE lag_temporality != temporality
+			AND lag_temporality != ''
+		ORDER BY unix_milli ASC;
+	`, signozMetricDBName, signozTSLocalTableNameV4, startTime, endTime, metricName)
+
+	rows, err := r.db.Query(ctx, query)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+
+	for rows.Next() {
+		var temporality string
+		var timestamp int64
+		var lagTemporality string
+		err := rows.Scan(&temporality, &timestamp, &lagTemporality)
+		if err != nil {
+			return nil, err
+		}
+		// Store each temporality switch point with both temporalities
+		temporalitySwitches = append(temporalitySwitches, v3.TemporalityChangePoint{
+			Timestamp:       timestamp,
+			FromTemporality: v3.Temporality(lagTemporality),
+			ToTemporality:   v3.Temporality(temporality),
+		})
+	}
+
+	return temporalitySwitches, nil
+}
+
 func (r *ClickHouseReader) GetTimeSeriesInfo(ctx context.Context) (map[string]interface{}, error) {
 
 	queryStr := fmt.Sprintf("SELECT countDistinct(fingerprint) as count from %s.%s where metric_name not like 'signoz_%%' group by metric_name order by count desc;", signozMetricDBName, signozTSTableNameV41Day)
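For orientation, a minimal caller sketch (not part of the diff): it shows how the new reader method might be exercised against any implementation of the extended `Reader` interface. The import path, the local `temporalitySource` interface, and the six-hour window are assumptions.

```go
package sketch

import (
	"context"
	"fmt"
	"time"

	v3 "go.signoz.io/signoz/pkg/query-service/model/v3" // assumed import path
)

// temporalitySource is a local stand-in for the Reader interface extended in
// this change; only the new method matters for the sketch.
type temporalitySource interface {
	GetTemporalitySwitchPoints(ctx context.Context, metricName string, startTime int64, endTime int64) ([]v3.TemporalityChangePoint, error)
}

// printSwitchPoints lists where a metric flipped temporality over the last
// six hours. Metric name and window are illustrative.
func printSwitchPoints(ctx context.Context, src temporalitySource, metricName string) error {
	end := time.Now().UnixMilli()
	start := time.Now().Add(-6 * time.Hour).UnixMilli()

	switches, err := src.GetTemporalitySwitchPoints(ctx, metricName, start, end)
	if err != nil {
		return err
	}
	for _, sp := range switches {
		fmt.Printf("at %d: %s -> %s\n", sp.Timestamp, sp.FromTemporality, sp.ToTemporality)
	}
	return nil
}
```

Timestamps are unix milliseconds, matching the `unix_milli` column the query above filters on.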
@@ -655,6 +655,9 @@ func (aH *APIHandler) PopulateTemporality(ctx context.Context, qp *v3.QueryRange
 			} else {
 				query.Temporality = v3.Unspecified
 			}
+			if len(aH.temporalityMap[query.AggregateAttribute.Key]) > 1 {
+				query.MultipleTemporalities = true
+			}
 		}
 		// we don't have temporality for this metric
 		if query.DataSource == v3.DataSourceMetrics && query.Temporality == "" {
@@ -682,6 +685,9 @@ func (aH *APIHandler) PopulateTemporality(ctx context.Context, qp *v3.QueryRange
 			} else {
 				query.Temporality = v3.Unspecified
 			}
+			if len(nameToTemporality[query.AggregateAttribute.Key]) > 1 {
+				query.MultipleTemporalities = true
+			}
 			aH.temporalityMap[query.AggregateAttribute.Key] = nameToTemporality[query.AggregateAttribute.Key]
 		}
 	}
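The two checks added above flag a query when its metric reported more than one temporality in the window. A hedged illustration of the per-metric map they consult, assuming the `map[string]map[v3.Temporality]bool` shape implied by the `len(...) > 1` check; the metric names, the `v3.Delta`/`v3.Cumulative` constants, and the import path are illustrative assumptions:

```go
package main

import (
	"fmt"

	v3 "go.signoz.io/signoz/pkg/query-service/model/v3" // assumed import path
)

func main() {
	// Hypothetical contents of nameToTemporality / aH.temporalityMap.
	nameToTemporality := map[string]map[v3.Temporality]bool{
		"http_requests_total": {v3.Delta: true, v3.Cumulative: true}, // switched at some point
		"cpu_usage_seconds":   {v3.Cumulative: true},                 // single temporality
	}
	for name, ts := range nameToTemporality {
		// Mirrors the len(...) > 1 condition that sets MultipleTemporalities.
		fmt.Printf("%s has multiple temporalities: %v\n", name, len(ts) > 1)
	}
}
```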
@@ -177,7 +177,23 @@ func (q *querier) runBuilderQueries(ctx context.Context, params *v3.QueryRangePa
 	for queryName, builderQuery := range params.CompositeQuery.BuilderQueries {
 		if queryName == builderQuery.Expression {
 			wg.Add(1)
-			go q.runBuilderQuery(ctx, builderQuery, params, cacheKeys, ch, &wg)
+			if builderQuery.MultipleTemporalities {
+				go func() {
+
+					temporalitySwitches, err := q.reader.GetTemporalitySwitchPoints(ctx, builderQuery.AggregateAttribute.Key, params.Start, params.End)
+					if err != nil {
+						ch <- channelResult{Err: err, Name: queryName}
+						return
+					}
+					if len(temporalitySwitches) == 0 {
+						q.runBuilderQuery(ctx, builderQuery, params, cacheKeys, ch, &wg)
+					} else {
+						q.handleTemporalitySwitches(ctx, temporalitySwitches, &wg, builderQuery, params, cacheKeys, ch, queryName)
+					}
+				}()
+			} else {
+				go q.runBuilderQuery(ctx, builderQuery, params, cacheKeys, ch, &wg)
+			}
 		}
 	}
 
@@ -209,6 +225,58 @@ func (q *querier) runBuilderQueries(ctx context.Context, params *v3.QueryRangePa
 	return results, errQueriesByName, err
 }
 
+func (q *querier) handleTemporalitySwitches(ctx context.Context, temporalitySwitches []v3.TemporalityChangePoint, wg *sync.WaitGroup, builderQuery *v3.BuilderQuery, params *v3.QueryRangeParamsV3, cacheKeys map[string]string, ch chan channelResult, queryName string) {
+	defer wg.Done()
+
+	tempCh := make(chan channelResult, len(temporalitySwitches)+1)
+
+	var tempWg sync.WaitGroup
+	// Handle each segment between switch points
+	for i := 0; i <= len(temporalitySwitches); i++ {
+		tempWg.Add(1)
+		go func(idx int) {
+			queryWithTemporality := *builderQuery
+			queryParams := *params
+			if idx == 0 {
+				queryParams.End = temporalitySwitches[idx].Timestamp
+				queryWithTemporality.Temporality = temporalitySwitches[idx].FromTemporality
+			} else if idx < len(temporalitySwitches) {
+				queryParams.Start = temporalitySwitches[idx-1].Timestamp
+				queryParams.End = temporalitySwitches[idx].Timestamp
+				queryWithTemporality.Temporality = temporalitySwitches[idx].FromTemporality
+				queryWithTemporality.ShiftBy = 0
+			} else if idx == len(temporalitySwitches) {
+				queryParams.Start = temporalitySwitches[idx-1].Timestamp
+				queryParams.End = params.End
+				queryWithTemporality.Temporality = temporalitySwitches[idx-1].ToTemporality
+			}
+
+			q.runBuilderQuery(ctx, &queryWithTemporality, &queryParams, cacheKeys, tempCh, &tempWg)
+		}(i)
+	}
+	// Wait for all temporal queries to complete
+	tempWg.Wait()
+	close(tempCh)
+
+	// Combine results from all temporal queries
+	var combinedSeries []*v3.Series
+	var lastErr error
+
+	for result := range tempCh {
+		if result.Err != nil {
+			lastErr = result.Err
+			continue
+		}
+		combinedSeries = append(combinedSeries, result.Series...)
+	}
+
+	ch <- channelResult{
+		Series: combinedSeries,
+		Err:    lastErr,
+		Name:   queryName,
+	}
+}
+
 func (q *querier) runPromQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Result, map[string]error, error) {
 	channelResults := make(chan channelResult, len(params.CompositeQuery.PromQueries))
 	var wg sync.WaitGroup
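To make the fan-out above easier to follow: with N switch points the helper issues N+1 sub-queries, one per stretch of constant temporality, cutting the window at each change point. A small self-contained sketch of just that windowing (plain int64/string stand-ins for the v3 types; it assumes at least one switch point, which the caller guarantees):

```go
package main

import "fmt"

// segment illustrates one sub-query window produced by the fan-out:
// a time range plus the temporality in effect during it.
type segment struct {
	start, end  int64
	temporality string
}

// splitAtSwitches mirrors the branch logic of handleTemporalitySwitches for
// illustration only; it assumes len(switchTs) >= 1, as the caller does.
func splitAtSwitches(start, end int64, switchTs []int64, from, to []string) []segment {
	var segs []segment
	n := len(switchTs)
	for idx := 0; idx <= n; idx++ {
		switch {
		case idx == 0:
			// from the query start up to the first change point
			segs = append(segs, segment{start, switchTs[idx], from[idx]})
		case idx < n:
			// between two consecutive change points
			segs = append(segs, segment{switchTs[idx-1], switchTs[idx], from[idx]})
		default:
			// from the last change point to the query end
			segs = append(segs, segment{switchTs[idx-1], end, to[idx-1]})
		}
	}
	return segs
}

func main() {
	// Two change points: Delta -> Cumulative at t=1000, Cumulative -> Delta at t=2000.
	segs := splitAtSwitches(0, 3000, []int64{1000, 2000},
		[]string{"Delta", "Cumulative"}, []string{"Cumulative", "Delta"})
	for _, s := range segs {
		fmt.Printf("[%d, %d] -> %s\n", s.start, s.end, s.temporality)
	}
	// Prints:
	// [0, 1000] -> Delta
	// [1000, 2000] -> Cumulative
	// [2000, 3000] -> Delta
}
```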
@@ -115,6 +115,8 @@ type Reader interface {
 	//trace
 	GetTraceFields(ctx context.Context) (*model.GetFieldsResponse, *model.ApiError)
 	UpdateTraceField(ctx context.Context, field *model.UpdateField) *model.ApiError
+
+	GetTemporalitySwitchPoints(ctx context.Context, metricName string, startTime int64, endTime int64) ([]v3.TemporalityChangePoint, error)
 }
 
 type Querier interface {
@@ -807,33 +807,34 @@ func (m *MetricValueFilter) Clone() *MetricValueFilter {
 }
 
 type BuilderQuery struct {
 	QueryName             string               `json:"queryName"`
 	StepInterval          int64                `json:"stepInterval"`
 	DataSource            DataSource           `json:"dataSource"`
 	AggregateOperator     AggregateOperator    `json:"aggregateOperator"`
 	AggregateAttribute    AttributeKey         `json:"aggregateAttribute,omitempty"`
 	Temporality           Temporality          `json:"temporality,omitempty"`
 	Filters               *FilterSet           `json:"filters,omitempty"`
 	GroupBy               []AttributeKey       `json:"groupBy,omitempty"`
 	Expression            string               `json:"expression"`
 	Disabled              bool                 `json:"disabled"`
 	Having                []Having             `json:"having,omitempty"`
 	Legend                string               `json:"legend,omitempty"`
 	Limit                 uint64               `json:"limit"`
 	Offset                uint64               `json:"offset"`
 	PageSize              uint64               `json:"pageSize"`
 	OrderBy               []OrderBy            `json:"orderBy,omitempty"`
 	ReduceTo              ReduceToOperator     `json:"reduceTo,omitempty"`
 	SelectColumns         []AttributeKey       `json:"selectColumns,omitempty"`
 	TimeAggregation       TimeAggregation      `json:"timeAggregation,omitempty"`
 	SpaceAggregation      SpaceAggregation     `json:"spaceAggregation,omitempty"`
 	SecondaryAggregation  SecondaryAggregation `json:"seriesAggregation,omitempty"`
 	Functions             []Function           `json:"functions,omitempty"`
 	ShiftBy               int64
 	IsAnomaly             bool
 	QueriesUsedInFormula  []string
 	MetricTableHints      *MetricTableHints  `json:"-"`
 	MetricValueFilter     *MetricValueFilter `json:"-"`
+	MultipleTemporalities bool
 }
 
 func (b *BuilderQuery) SetShiftByFromFunc() {
@@ -1406,3 +1407,9 @@ type QBOptions struct {
 	IsLivetailQuery bool
 	PreferRPM       bool
 }
+
+type TemporalityChangePoint struct {
+	Timestamp       int64       `json:"timestamp"`
+	FromTemporality Temporality `json:"from_temporality"`
+	ToTemporality   Temporality `json:"to_temporality"`
+}
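A hedged look at how the new struct serializes with the json tags above; the timestamp, the `v3.Delta`/`v3.Cumulative` values, and the import path are assumptions for illustration:

```go
package main

import (
	"encoding/json"
	"fmt"

	v3 "go.signoz.io/signoz/pkg/query-service/model/v3" // assumed import path
)

func main() {
	// Invented change point: the metric switched from Delta to Cumulative
	// at the given unix-millisecond timestamp.
	cp := v3.TemporalityChangePoint{
		Timestamp:       1718000000000,
		FromTemporality: v3.Delta,
		ToTemporality:   v3.Cumulative,
	}
	out, err := json.Marshal(cp)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
	// Expected shape (exact temporality strings depend on the constants):
	// {"timestamp":1718000000000,"from_temporality":"Delta","to_temporality":"Cumulative"}
}
```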