Compare commits

...

8 Commits

Author SHA1 Message Date
Srikanth Chekuri
e6ad741835 Merge branch 'main' into qbv5-final 2025-07-28 18:02:26 +05:30
srikanthccv
890e811a07 chore: some updates 2025-07-28 18:02:08 +05:30
srikanthccv
6ca6d8633d chore: fix tests 2025-07-26 03:01:29 +05:30
srikanthccv
02c320a6fa chore: fix tests 2025-07-26 02:10:43 +05:30
srikanthccv
8e6c38de15 Merge branch 'qbv5-final' of github.com:SigNoz/signoz into qbv5-final 2025-07-26 01:44:06 +05:30
srikanthccv
2b617e01e8 chore: add events, links and trace view 2025-07-26 01:43:32 +05:30
Srikanth Chekuri
39261439b3 Merge branch 'main' into qbv5-final 2025-07-25 15:46:03 +05:30
srikanthccv
35c2667caa chore: qbv5 migration 2025-07-25 15:17:05 +05:30
63 changed files with 4530 additions and 1436 deletions

34
ee/anomaly/daily.go Normal file
View File

@@ -0,0 +1,34 @@
package anomaly
import (
"context"
"github.com/SigNoz/signoz/pkg/valuer"
)
type DailyProvider struct {
BaseSeasonalProvider
}
var _ BaseProvider = (*DailyProvider)(nil)
func (dp *DailyProvider) GetBaseSeasonalProvider() *BaseSeasonalProvider {
return &dp.BaseSeasonalProvider
}
func NewDailyProvider(opts ...GenericProviderOption[*DailyProvider]) *DailyProvider {
dp := &DailyProvider{
BaseSeasonalProvider: BaseSeasonalProvider{},
}
for _, opt := range opts {
opt(dp)
}
return dp
}
func (p *DailyProvider) GetAnomalies(ctx context.Context, orgID valuer.UUID, req *AnomaliesRequest) (*AnomaliesResponse, error) {
req.Seasonality = SeasonalityDaily
return p.getAnomalies(ctx, orgID, req)
}

35
ee/anomaly/hourly.go Normal file
View File

@@ -0,0 +1,35 @@
package anomaly
import (
"context"
"github.com/SigNoz/signoz/pkg/valuer"
)
type HourlyProvider struct {
BaseSeasonalProvider
}
var _ BaseProvider = (*HourlyProvider)(nil)
func (hp *HourlyProvider) GetBaseSeasonalProvider() *BaseSeasonalProvider {
return &hp.BaseSeasonalProvider
}
// NewHourlyProvider now uses the generic option type
func NewHourlyProvider(opts ...GenericProviderOption[*HourlyProvider]) *HourlyProvider {
hp := &HourlyProvider{
BaseSeasonalProvider: BaseSeasonalProvider{},
}
for _, opt := range opts {
opt(hp)
}
return hp
}
func (p *HourlyProvider) GetAnomalies(ctx context.Context, orgID valuer.UUID, req *AnomaliesRequest) (*AnomaliesResponse, error) {
req.Seasonality = SeasonalityHourly
return p.getAnomalies(ctx, orgID, req)
}

223
ee/anomaly/params.go Normal file
View File

@@ -0,0 +1,223 @@
package anomaly
import (
"time"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/valuer"
)
type Seasonality struct{ valuer.String }
var (
SeasonalityHourly = Seasonality{valuer.NewString("hourly")}
SeasonalityDaily = Seasonality{valuer.NewString("daily")}
SeasonalityWeekly = Seasonality{valuer.NewString("weekly")}
)
var (
oneWeekOffset = uint64(24 * 7 * time.Hour.Milliseconds())
oneDayOffset = uint64(24 * time.Hour.Milliseconds())
oneHourOffset = uint64(time.Hour.Milliseconds())
fiveMinOffset = uint64(5 * time.Minute.Milliseconds())
)
func (s Seasonality) IsValid() bool {
switch s {
case SeasonalityHourly, SeasonalityDaily, SeasonalityWeekly:
return true
default:
return false
}
}
type AnomaliesRequest struct {
Params qbtypes.QueryRangeRequest
Seasonality Seasonality
}
type AnomaliesResponse struct {
Results []*qbtypes.TimeSeriesData
}
// anomalyParams is the params for anomaly detection
// prediction = avg(past_period_query) + avg(current_season_query) - mean(past_season_query, past2_season_query, past3_season_query)
//
// ^ ^
// | |
// (rounded value for past peiod) + (seasonal growth)
//
// score = abs(value - prediction) / stddev (current_season_query)
type anomalyQueryParams struct {
// CurrentPeriodQuery is the query range params for period user is looking at or eval window
// Example: (now-5m, now), (now-30m, now), (now-1h, now)
// The results obtained from this query are used to compare with predicted values
// and to detect anomalies
CurrentPeriodQuery qbtypes.QueryRangeRequest
// PastPeriodQuery is the query range params for past period of seasonality
// Example: For weekly seasonality, (now-1w-5m, now-1w)
// : For daily seasonality, (now-1d-5m, now-1d)
// : For hourly seasonality, (now-1h-5m, now-1h)
PastPeriodQuery qbtypes.QueryRangeRequest
// CurrentSeasonQuery is the query range params for current period (seasonal)
// Example: For weekly seasonality, this is the query range params for the (now-1w-5m, now)
// : For daily seasonality, this is the query range params for the (now-1d-5m, now)
// : For hourly seasonality, this is the query range params for the (now-1h-5m, now)
CurrentSeasonQuery qbtypes.QueryRangeRequest
// PastSeasonQuery is the query range params for past seasonal period to the current season
// Example: For weekly seasonality, this is the query range params for the (now-2w-5m, now-1w)
// : For daily seasonality, this is the query range params for the (now-2d-5m, now-1d)
// : For hourly seasonality, this is the query range params for the (now-2h-5m, now-1h)
PastSeasonQuery qbtypes.QueryRangeRequest
// Past2SeasonQuery is the query range params for past 2 seasonal period to the current season
// Example: For weekly seasonality, this is the query range params for the (now-3w-5m, now-2w)
// : For daily seasonality, this is the query range params for the (now-3d-5m, now-2d)
// : For hourly seasonality, this is the query range params for the (now-3h-5m, now-2h)
Past2SeasonQuery qbtypes.QueryRangeRequest
// Past3SeasonQuery is the query range params for past 3 seasonal period to the current season
// Example: For weekly seasonality, this is the query range params for the (now-4w-5m, now-3w)
// : For daily seasonality, this is the query range params for the (now-4d-5m, now-3d)
// : For hourly seasonality, this is the query range params for the (now-4h-5m, now-3h)
Past3SeasonQuery qbtypes.QueryRangeRequest
}
func prepareAnomalyQueryParams(req qbtypes.QueryRangeRequest, seasonality Seasonality) *anomalyQueryParams {
start := req.Start
end := req.End
currentPeriodQuery := qbtypes.QueryRangeRequest{
Start: start,
End: end,
RequestType: qbtypes.RequestTypeTimeSeries,
CompositeQuery: req.CompositeQuery,
NoCache: false,
}
var pastPeriodStart, pastPeriodEnd uint64
switch seasonality {
// for one week period, we fetch the data from the past week with 5 min offset
case SeasonalityWeekly:
pastPeriodStart = start - oneWeekOffset - fiveMinOffset
pastPeriodEnd = end - oneWeekOffset
// for one day period, we fetch the data from the past day with 5 min offset
case SeasonalityDaily:
pastPeriodStart = start - oneDayOffset - fiveMinOffset
pastPeriodEnd = end - oneDayOffset
// for one hour period, we fetch the data from the past hour with 5 min offset
case SeasonalityHourly:
pastPeriodStart = start - oneHourOffset - fiveMinOffset
pastPeriodEnd = end - oneHourOffset
}
pastPeriodQuery := qbtypes.QueryRangeRequest{
Start: pastPeriodStart,
End: pastPeriodEnd,
RequestType: qbtypes.RequestTypeTimeSeries,
CompositeQuery: req.CompositeQuery,
NoCache: false,
}
// seasonality growth trend
var currentGrowthPeriodStart, currentGrowthPeriodEnd uint64
switch seasonality {
case SeasonalityWeekly:
currentGrowthPeriodStart = start - oneWeekOffset
currentGrowthPeriodEnd = start
case SeasonalityDaily:
currentGrowthPeriodStart = start - oneDayOffset
currentGrowthPeriodEnd = start
case SeasonalityHourly:
currentGrowthPeriodStart = start - oneHourOffset
currentGrowthPeriodEnd = start
}
currentGrowthQuery := qbtypes.QueryRangeRequest{
Start: currentGrowthPeriodStart,
End: currentGrowthPeriodEnd,
RequestType: qbtypes.RequestTypeTimeSeries,
CompositeQuery: req.CompositeQuery,
NoCache: false,
}
var pastGrowthPeriodStart, pastGrowthPeriodEnd uint64
switch seasonality {
case SeasonalityWeekly:
pastGrowthPeriodStart = start - 2*oneWeekOffset
pastGrowthPeriodEnd = start - 1*oneWeekOffset
case SeasonalityDaily:
pastGrowthPeriodStart = start - 2*oneDayOffset
pastGrowthPeriodEnd = start - 1*oneDayOffset
case SeasonalityHourly:
pastGrowthPeriodStart = start - 2*oneHourOffset
pastGrowthPeriodEnd = start - 1*oneHourOffset
}
pastGrowthQuery := qbtypes.QueryRangeRequest{
Start: pastGrowthPeriodStart,
End: pastGrowthPeriodEnd,
RequestType: qbtypes.RequestTypeTimeSeries,
CompositeQuery: req.CompositeQuery,
NoCache: false,
}
var past2GrowthPeriodStart, past2GrowthPeriodEnd uint64
switch seasonality {
case SeasonalityWeekly:
past2GrowthPeriodStart = start - 3*oneWeekOffset
past2GrowthPeriodEnd = start - 2*oneWeekOffset
case SeasonalityDaily:
past2GrowthPeriodStart = start - 3*oneDayOffset
past2GrowthPeriodEnd = start - 2*oneDayOffset
case SeasonalityHourly:
past2GrowthPeriodStart = start - 3*oneHourOffset
past2GrowthPeriodEnd = start - 2*oneHourOffset
}
past2GrowthQuery := qbtypes.QueryRangeRequest{
Start: past2GrowthPeriodStart,
End: past2GrowthPeriodEnd,
RequestType: qbtypes.RequestTypeTimeSeries,
CompositeQuery: req.CompositeQuery,
NoCache: false,
}
var past3GrowthPeriodStart, past3GrowthPeriodEnd uint64
switch seasonality {
case SeasonalityWeekly:
past3GrowthPeriodStart = start - 4*oneWeekOffset
past3GrowthPeriodEnd = start - 3*oneWeekOffset
case SeasonalityDaily:
past3GrowthPeriodStart = start - 4*oneDayOffset
past3GrowthPeriodEnd = start - 3*oneDayOffset
case SeasonalityHourly:
past3GrowthPeriodStart = start - 4*oneHourOffset
past3GrowthPeriodEnd = start - 3*oneHourOffset
}
past3GrowthQuery := qbtypes.QueryRangeRequest{
Start: past3GrowthPeriodStart,
End: past3GrowthPeriodEnd,
RequestType: qbtypes.RequestTypeTimeSeries,
CompositeQuery: req.CompositeQuery,
NoCache: false,
}
return &anomalyQueryParams{
CurrentPeriodQuery: currentPeriodQuery,
PastPeriodQuery: pastPeriodQuery,
CurrentSeasonQuery: currentGrowthQuery,
PastSeasonQuery: pastGrowthQuery,
Past2SeasonQuery: past2GrowthQuery,
Past3SeasonQuery: past3GrowthQuery,
}
}
type anomalyQueryResults struct {
CurrentPeriodResults []*qbtypes.TimeSeriesData
PastPeriodResults []*qbtypes.TimeSeriesData
CurrentSeasonResults []*qbtypes.TimeSeriesData
PastSeasonResults []*qbtypes.TimeSeriesData
Past2SeasonResults []*qbtypes.TimeSeriesData
Past3SeasonResults []*qbtypes.TimeSeriesData
}

11
ee/anomaly/provider.go Normal file
View File

@@ -0,0 +1,11 @@
package anomaly
import (
"context"
"github.com/SigNoz/signoz/pkg/valuer"
)
type Provider interface {
GetAnomalies(ctx context.Context, orgID valuer.UUID, req *AnomaliesRequest) (*AnomaliesResponse, error)
}

463
ee/anomaly/seasonal.go Normal file
View File

@@ -0,0 +1,463 @@
package anomaly
import (
"context"
"log/slog"
"math"
"github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/pkg/valuer"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
)
var (
// TODO(srikanthccv): make this configurable?
movingAvgWindowSize = 7
)
// BaseProvider is an interface that includes common methods for all provider types
type BaseProvider interface {
GetBaseSeasonalProvider() *BaseSeasonalProvider
}
// GenericProviderOption is a generic type for provider options
type GenericProviderOption[T BaseProvider] func(T)
func WithQuerier[T BaseProvider](querier querier.Querier) GenericProviderOption[T] {
return func(p T) {
p.GetBaseSeasonalProvider().querier = querier
}
}
func WithLogger[T BaseProvider](logger *slog.Logger) GenericProviderOption[T] {
return func(p T) {
p.GetBaseSeasonalProvider().logger = logger
}
}
type BaseSeasonalProvider struct {
querier querier.Querier
logger *slog.Logger
}
func (p *BaseSeasonalProvider) getQueryParams(req *AnomaliesRequest) *anomalyQueryParams {
if !req.Seasonality.IsValid() {
req.Seasonality = SeasonalityDaily
}
return prepareAnomalyQueryParams(req.Params, req.Seasonality)
}
func (p *BaseSeasonalProvider) toTSResults(ctx context.Context, resp *qbtypes.QueryRangeResponse) []*qbtypes.TimeSeriesData {
if resp == nil || resp.Data == nil {
p.logger.InfoContext(ctx, "nil response from query range")
}
data, ok := resp.Data.(struct {
Results []any `json:"results"`
Warnings []string `json:"warnings"`
})
if !ok {
return nil
}
tsData := []*qbtypes.TimeSeriesData{}
for _, item := range data.Results {
if resultData, ok := item.(*qbtypes.TimeSeriesData); ok {
tsData = append(tsData, resultData)
}
}
return tsData
}
func (p *BaseSeasonalProvider) getResults(ctx context.Context, orgID valuer.UUID, params *anomalyQueryParams) (*anomalyQueryResults, error) {
// TODO(srikanthccv): parallelize this?
p.logger.InfoContext(ctx, "fetching results for current period", "anomaly.current_period_query", params.CurrentPeriodQuery)
currentPeriodResults, err := p.querier.QueryRange(ctx, orgID, &params.CurrentPeriodQuery)
if err != nil {
return nil, err
}
p.logger.InfoContext(ctx, "fetching results for past period", "anomaly.past_period_query", params.PastPeriodQuery)
pastPeriodResults, err := p.querier.QueryRange(ctx, orgID, &params.PastPeriodQuery)
if err != nil {
return nil, err
}
p.logger.InfoContext(ctx, "fetching results for current season", "anomaly.current_season_query", params.CurrentSeasonQuery)
currentSeasonResults, err := p.querier.QueryRange(ctx, orgID, &params.CurrentSeasonQuery)
if err != nil {
return nil, err
}
p.logger.InfoContext(ctx, "fetching results for past season", "anomaly.past_season_query", params.PastSeasonQuery)
pastSeasonResults, err := p.querier.QueryRange(ctx, orgID, &params.PastSeasonQuery)
if err != nil {
return nil, err
}
p.logger.InfoContext(ctx, "fetching results for past 2 season", "anomaly.past_2season_query", params.Past2SeasonQuery)
past2SeasonResults, err := p.querier.QueryRange(ctx, orgID, &params.Past2SeasonQuery)
if err != nil {
return nil, err
}
p.logger.InfoContext(ctx, "fetching results for past 3 season", "anomaly.past_3season_query", params.Past3SeasonQuery)
past3SeasonResults, err := p.querier.QueryRange(ctx, orgID, &params.Past3SeasonQuery)
if err != nil {
return nil, err
}
return &anomalyQueryResults{
CurrentPeriodResults: p.toTSResults(ctx, currentPeriodResults),
PastPeriodResults: p.toTSResults(ctx, pastPeriodResults),
CurrentSeasonResults: p.toTSResults(ctx, currentSeasonResults),
PastSeasonResults: p.toTSResults(ctx, pastSeasonResults),
Past2SeasonResults: p.toTSResults(ctx, past2SeasonResults),
Past3SeasonResults: p.toTSResults(ctx, past3SeasonResults),
}, nil
}
// getMatchingSeries gets the matching series from the query result
// for the given series
func (p *BaseSeasonalProvider) getMatchingSeries(_ context.Context, queryResult *qbtypes.TimeSeriesData, series *qbtypes.TimeSeries) *qbtypes.TimeSeries {
if queryResult == nil || len(queryResult.Aggregations) == 0 || len(queryResult.Aggregations[0].Series) == 0 {
return nil
}
for _, curr := range queryResult.Aggregations[0].Series {
currLabelsKey := qbtypes.GetUniqueSeriesKey(curr.Labels)
seriesLabelsKey := qbtypes.GetUniqueSeriesKey(series.Labels)
if currLabelsKey == seriesLabelsKey {
return curr
}
}
return nil
}
func (p *BaseSeasonalProvider) getAvg(series *qbtypes.TimeSeries) float64 {
if series == nil || len(series.Values) == 0 {
return 0
}
var sum float64
for _, smpl := range series.Values {
sum += smpl.Value
}
return sum / float64(len(series.Values))
}
func (p *BaseSeasonalProvider) getStdDev(series *qbtypes.TimeSeries) float64 {
if series == nil || len(series.Values) == 0 {
return 0
}
avg := p.getAvg(series)
var sum float64
for _, smpl := range series.Values {
sum += math.Pow(smpl.Value-avg, 2)
}
return math.Sqrt(sum / float64(len(series.Values)))
}
// getMovingAvg gets the moving average for the given series
// for the given window size and start index
func (p *BaseSeasonalProvider) getMovingAvg(series *qbtypes.TimeSeries, movingAvgWindowSize, startIdx int) float64 {
if series == nil || len(series.Values) == 0 {
return 0
}
if startIdx >= len(series.Values)-movingAvgWindowSize {
startIdx = int(math.Max(0, float64(len(series.Values)-movingAvgWindowSize)))
}
var sum float64
points := series.Values[startIdx:]
windowSize := int(math.Min(float64(movingAvgWindowSize), float64(len(points))))
for i := 0; i < windowSize; i++ {
sum += points[i].Value
}
avg := sum / float64(windowSize)
return avg
}
func (p *BaseSeasonalProvider) getMean(floats ...float64) float64 {
if len(floats) == 0 {
return 0
}
var sum float64
for _, f := range floats {
sum += f
}
return sum / float64(len(floats))
}
func (p *BaseSeasonalProvider) getPredictedSeries(
ctx context.Context,
series, prevSeries, currentSeasonSeries, pastSeasonSeries, past2SeasonSeries, past3SeasonSeries *qbtypes.TimeSeries,
) *qbtypes.TimeSeries {
predictedSeries := &qbtypes.TimeSeries{
Labels: series.Labels,
Values: make([]*qbtypes.TimeSeriesValue, 0),
}
// for each point in the series, get the predicted value
// the predicted value is the moving average (with window size = 7) of the previous period series
// plus the average of the current season series
// minus the mean of the past season series, past2 season series and past3 season series
for idx, curr := range series.Values {
movingAvg := p.getMovingAvg(prevSeries, movingAvgWindowSize, idx)
avg := p.getAvg(currentSeasonSeries)
mean := p.getMean(p.getAvg(pastSeasonSeries), p.getAvg(past2SeasonSeries), p.getAvg(past3SeasonSeries))
predictedValue := movingAvg + avg - mean
if predictedValue < 0 {
// this should not happen (except when the data has extreme outliers)
// we will use the moving avg of the previous period series in this case
p.logger.WarnContext(ctx, "predicted value is less than 0 for series", "anomaly.predicted_value", predictedValue, "anomaly.labels", series.Labels)
predictedValue = p.getMovingAvg(prevSeries, movingAvgWindowSize, idx)
}
p.logger.DebugContext(ctx, "predicted value for series",
"anomaly.moving_avg", movingAvg,
"anomaly.avg", avg,
"anomaly.mean", mean,
"anomaly.labels", series.Labels,
"anomaly.predicted_value", predictedValue,
"anomaly.curr", curr.Value,
)
predictedSeries.Values = append(predictedSeries.Values, &qbtypes.TimeSeriesValue{
Timestamp: curr.Timestamp,
Value: predictedValue,
})
}
return predictedSeries
}
// getBounds gets the upper and lower bounds for the given series
// for the given z score threshold
// moving avg of the previous period series + z score threshold * std dev of the series
// moving avg of the previous period series - z score threshold * std dev of the series
func (p *BaseSeasonalProvider) getBounds(
series, predictedSeries *qbtypes.TimeSeries,
zScoreThreshold float64,
) (*qbtypes.TimeSeries, *qbtypes.TimeSeries) {
upperBoundSeries := &qbtypes.TimeSeries{
Labels: series.Labels,
Values: make([]*qbtypes.TimeSeriesValue, 0),
}
lowerBoundSeries := &qbtypes.TimeSeries{
Labels: series.Labels,
Values: make([]*qbtypes.TimeSeriesValue, 0),
}
for idx, curr := range series.Values {
upperBound := p.getMovingAvg(predictedSeries, movingAvgWindowSize, idx) + zScoreThreshold*p.getStdDev(series)
lowerBound := p.getMovingAvg(predictedSeries, movingAvgWindowSize, idx) - zScoreThreshold*p.getStdDev(series)
upperBoundSeries.Values = append(upperBoundSeries.Values, &qbtypes.TimeSeriesValue{
Timestamp: curr.Timestamp,
Value: upperBound,
})
lowerBoundSeries.Values = append(lowerBoundSeries.Values, &qbtypes.TimeSeriesValue{
Timestamp: curr.Timestamp,
Value: math.Max(lowerBound, 0),
})
}
return upperBoundSeries, lowerBoundSeries
}
// getExpectedValue gets the expected value for the given series
// for the given index
// prevSeriesAvg + currentSeasonSeriesAvg - mean of past season series, past2 season series and past3 season series
func (p *BaseSeasonalProvider) getExpectedValue(
_, prevSeries, currentSeasonSeries, pastSeasonSeries, past2SeasonSeries, past3SeasonSeries *qbtypes.TimeSeries, idx int,
) float64 {
prevSeriesAvg := p.getMovingAvg(prevSeries, movingAvgWindowSize, idx)
currentSeasonSeriesAvg := p.getAvg(currentSeasonSeries)
pastSeasonSeriesAvg := p.getAvg(pastSeasonSeries)
past2SeasonSeriesAvg := p.getAvg(past2SeasonSeries)
past3SeasonSeriesAvg := p.getAvg(past3SeasonSeries)
return prevSeriesAvg + currentSeasonSeriesAvg - p.getMean(pastSeasonSeriesAvg, past2SeasonSeriesAvg, past3SeasonSeriesAvg)
}
// getScore gets the anomaly score for the given series
// for the given index
// (value - expectedValue) / std dev of the series
func (p *BaseSeasonalProvider) getScore(
series, prevSeries, weekSeries, weekPrevSeries, past2SeasonSeries, past3SeasonSeries *qbtypes.TimeSeries, value float64, idx int,
) float64 {
expectedValue := p.getExpectedValue(series, prevSeries, weekSeries, weekPrevSeries, past2SeasonSeries, past3SeasonSeries, idx)
if expectedValue < 0 {
expectedValue = p.getMovingAvg(prevSeries, movingAvgWindowSize, idx)
}
return (value - expectedValue) / p.getStdDev(weekSeries)
}
// getAnomalyScores gets the anomaly scores for the given series
// for the given index
// (value - expectedValue) / std dev of the series
func (p *BaseSeasonalProvider) getAnomalyScores(
series, prevSeries, currentSeasonSeries, pastSeasonSeries, past2SeasonSeries, past3SeasonSeries *qbtypes.TimeSeries,
) *qbtypes.TimeSeries {
anomalyScoreSeries := &qbtypes.TimeSeries{
Labels: series.Labels,
Values: make([]*qbtypes.TimeSeriesValue, 0),
}
for idx, curr := range series.Values {
anomalyScore := p.getScore(series, prevSeries, currentSeasonSeries, pastSeasonSeries, past2SeasonSeries, past3SeasonSeries, curr.Value, idx)
anomalyScoreSeries.Values = append(anomalyScoreSeries.Values, &qbtypes.TimeSeriesValue{
Timestamp: curr.Timestamp,
Value: anomalyScore,
})
}
return anomalyScoreSeries
}
func (p *BaseSeasonalProvider) getAnomalies(ctx context.Context, orgID valuer.UUID, req *AnomaliesRequest) (*AnomaliesResponse, error) {
anomalyParams := p.getQueryParams(req)
anomalyQueryResults, err := p.getResults(ctx, orgID, anomalyParams)
if err != nil {
return nil, err
}
currentPeriodResults := make(map[string]*qbtypes.TimeSeriesData)
for _, result := range anomalyQueryResults.CurrentPeriodResults {
currentPeriodResults[result.QueryName] = result
}
pastPeriodResults := make(map[string]*qbtypes.TimeSeriesData)
for _, result := range anomalyQueryResults.PastPeriodResults {
pastPeriodResults[result.QueryName] = result
}
currentSeasonResults := make(map[string]*qbtypes.TimeSeriesData)
for _, result := range anomalyQueryResults.CurrentSeasonResults {
currentSeasonResults[result.QueryName] = result
}
pastSeasonResults := make(map[string]*qbtypes.TimeSeriesData)
for _, result := range anomalyQueryResults.PastSeasonResults {
pastSeasonResults[result.QueryName] = result
}
past2SeasonResults := make(map[string]*qbtypes.TimeSeriesData)
for _, result := range anomalyQueryResults.Past2SeasonResults {
past2SeasonResults[result.QueryName] = result
}
past3SeasonResults := make(map[string]*qbtypes.TimeSeriesData)
for _, result := range anomalyQueryResults.Past3SeasonResults {
past3SeasonResults[result.QueryName] = result
}
for _, result := range currentPeriodResults {
funcs := req.Params.FuncsForQuery(result.QueryName)
var zScoreThreshold float64
for _, f := range funcs {
if f.Name == qbtypes.FunctionNameAnomaly {
for _, arg := range f.Args {
if arg.Name != "z_score_threshold" {
continue
}
value, ok := arg.Value.(float64)
if ok {
zScoreThreshold = value
} else {
p.logger.InfoContext(ctx, "z_score_threshold not provided, defaulting")
zScoreThreshold = 3
}
break
}
}
}
pastPeriodResult, ok := pastPeriodResults[result.QueryName]
if !ok {
continue
}
currentSeasonResult, ok := currentSeasonResults[result.QueryName]
if !ok {
continue
}
pastSeasonResult, ok := pastSeasonResults[result.QueryName]
if !ok {
continue
}
past2SeasonResult, ok := past2SeasonResults[result.QueryName]
if !ok {
continue
}
past3SeasonResult, ok := past3SeasonResults[result.QueryName]
if !ok {
continue
}
aggOfInterest := result.Aggregations[0]
for _, series := range aggOfInterest.Series {
stdDev := p.getStdDev(series)
p.logger.InfoContext(ctx, "calculated standard deviation for series", "anomaly.std_dev", stdDev, "anomaly.labels", series.Labels)
pastPeriodSeries := p.getMatchingSeries(ctx, pastPeriodResult, series)
currentSeasonSeries := p.getMatchingSeries(ctx, currentSeasonResult, series)
pastSeasonSeries := p.getMatchingSeries(ctx, pastSeasonResult, series)
past2SeasonSeries := p.getMatchingSeries(ctx, past2SeasonResult, series)
past3SeasonSeries := p.getMatchingSeries(ctx, past3SeasonResult, series)
prevSeriesAvg := p.getAvg(pastPeriodSeries)
currentSeasonSeriesAvg := p.getAvg(currentSeasonSeries)
pastSeasonSeriesAvg := p.getAvg(pastSeasonSeries)
past2SeasonSeriesAvg := p.getAvg(past2SeasonSeries)
past3SeasonSeriesAvg := p.getAvg(past3SeasonSeries)
p.logger.InfoContext(ctx, "calculated mean for series",
"anomaly.prev_series_gvg", prevSeriesAvg,
"anomaly.current_season_series_avg", currentSeasonSeriesAvg,
"anomaly.past_season_series_avg", pastSeasonSeriesAvg,
"anomaly.past_2season_series_avg", past2SeasonSeriesAvg,
"anomaly.past_3season_series_avg", past3SeasonSeriesAvg,
"anomaly.labels", series.Labels,
)
predictedSeries := p.getPredictedSeries(
ctx,
series,
pastPeriodSeries,
currentSeasonSeries,
pastSeasonSeries,
past2SeasonSeries,
past3SeasonSeries,
)
aggOfInterest.PredictedSeries = append(aggOfInterest.PredictedSeries, predictedSeries)
upperBoundSeries, lowerBoundSeries := p.getBounds(
series,
predictedSeries,
zScoreThreshold,
)
aggOfInterest.UpperBoundSeries = append(aggOfInterest.UpperBoundSeries, upperBoundSeries)
aggOfInterest.LowerBoundSeries = append(aggOfInterest.LowerBoundSeries, lowerBoundSeries)
anomalyScoreSeries := p.getAnomalyScores(
series,
pastPeriodSeries,
currentSeasonSeries,
pastSeasonSeries,
past2SeasonSeries,
past3SeasonSeries,
)
aggOfInterest.AnomalyScores = append(aggOfInterest.AnomalyScores, anomalyScoreSeries)
}
}
results := make([]*qbtypes.TimeSeriesData, 0, len(currentPeriodResults))
for _, result := range currentPeriodResults {
results = append(results, result)
}
return &AnomaliesResponse{
Results: results,
}, nil
}

34
ee/anomaly/weekly.go Normal file
View File

@@ -0,0 +1,34 @@
package anomaly
import (
"context"
"github.com/SigNoz/signoz/pkg/valuer"
)
type WeeklyProvider struct {
BaseSeasonalProvider
}
var _ BaseProvider = (*WeeklyProvider)(nil)
func (wp *WeeklyProvider) GetBaseSeasonalProvider() *BaseSeasonalProvider {
return &wp.BaseSeasonalProvider
}
func NewWeeklyProvider(opts ...GenericProviderOption[*WeeklyProvider]) *WeeklyProvider {
wp := &WeeklyProvider{
BaseSeasonalProvider: BaseSeasonalProvider{},
}
for _, opt := range opts {
opt(wp)
}
return wp
}
func (p *WeeklyProvider) GetAnomalies(ctx context.Context, orgID valuer.UUID, req *AnomaliesRequest) (*AnomaliesResponse, error) {
req.Seasonality = SeasonalityWeekly
return p.getAnomalies(ctx, orgID, req)
}

View File

@@ -59,7 +59,7 @@ func NewAPIHandler(opts APIHandlerOptions, signoz *signoz.SigNoz) (*APIHandler,
LicensingAPI: httplicensing.NewLicensingAPI(signoz.Licensing),
FieldsAPI: fields.NewAPI(signoz.Instrumentation.ToProviderSettings(), signoz.TelemetryStore),
Signoz: signoz,
QuerierAPI: querierAPI.NewAPI(signoz.Instrumentation.ToProviderSettings(), signoz.Querier),
QuerierAPI: querierAPI.NewAPI(signoz.Instrumentation.ToProviderSettings(), signoz.Querier, signoz.Analytics),
})
if err != nil {
@@ -110,6 +110,9 @@ func (ah *APIHandler) RegisterRoutes(router *mux.Router, am *middleware.AuthZ) {
// v4
router.HandleFunc("/api/v4/query_range", am.ViewAccess(ah.queryRangeV4)).Methods(http.MethodPost)
// v5
router.HandleFunc("/api/v5/query_range", am.ViewAccess(ah.queryRangeV5)).Methods(http.MethodPost)
// Gateway
router.PathPrefix(gateway.RoutePrefix).HandlerFunc(am.EditAccess(ah.ServeGatewayHTTP))

View File

@@ -2,11 +2,16 @@ package api
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"runtime/debug"
anomalyV2 "github.com/SigNoz/signoz/ee/anomaly"
"github.com/SigNoz/signoz/ee/query-service/anomaly"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/http/render"
baseapp "github.com/SigNoz/signoz/pkg/query-service/app"
"github.com/SigNoz/signoz/pkg/query-service/app/queryBuilder"
@@ -15,6 +20,8 @@ import (
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/valuer"
"go.uber.org/zap"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
)
func (aH *APIHandler) queryRangeV4(w http.ResponseWriter, r *http.Request) {
@@ -136,3 +143,141 @@ func (aH *APIHandler) queryRangeV4(w http.ResponseWriter, r *http.Request) {
aH.QueryRangeV4(w, r)
}
}
func extractSeasonality(anomalyQuery *qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]) anomalyV2.Seasonality {
for _, fn := range anomalyQuery.Functions {
if fn.Name == qbtypes.FunctionNameAnomaly {
for _, arg := range fn.Args {
if arg.Name == "seasonality" {
if seasonalityStr, ok := arg.Value.(string); ok {
switch seasonalityStr {
case "weekly":
return anomalyV2.SeasonalityWeekly
case "hourly":
return anomalyV2.SeasonalityHourly
}
}
}
}
}
}
return anomalyV2.SeasonalityDaily // default
}
func createAnomalyProvider(aH *APIHandler, seasonality anomalyV2.Seasonality) anomalyV2.Provider {
switch seasonality {
case anomalyV2.SeasonalityWeekly:
return anomalyV2.NewWeeklyProvider(
anomalyV2.WithQuerier[*anomalyV2.WeeklyProvider](aH.Signoz.Querier),
anomalyV2.WithLogger[*anomalyV2.WeeklyProvider](aH.Signoz.Instrumentation.Logger()),
)
case anomalyV2.SeasonalityHourly:
return anomalyV2.NewHourlyProvider(
anomalyV2.WithQuerier[*anomalyV2.HourlyProvider](aH.Signoz.Querier),
anomalyV2.WithLogger[*anomalyV2.HourlyProvider](aH.Signoz.Instrumentation.Logger()),
)
default:
return anomalyV2.NewDailyProvider(
anomalyV2.WithQuerier[*anomalyV2.DailyProvider](aH.Signoz.Querier),
anomalyV2.WithLogger[*anomalyV2.DailyProvider](aH.Signoz.Instrumentation.Logger()),
)
}
}
func (aH *APIHandler) handleAnomalyQuery(ctx context.Context, orgID valuer.UUID, anomalyQuery *qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation], queryRangeRequest qbtypes.QueryRangeRequest) (*anomalyV2.AnomaliesResponse, error) {
seasonality := extractSeasonality(anomalyQuery)
provider := createAnomalyProvider(aH, seasonality)
return provider.GetAnomalies(ctx, orgID, &anomalyV2.AnomaliesRequest{Params: queryRangeRequest})
}
func (aH *APIHandler) queryRangeV5(rw http.ResponseWriter, req *http.Request) {
bodyBytes, err := io.ReadAll(req.Body)
if err != nil {
render.Error(rw, errors.NewInvalidInputf(errors.CodeInvalidInput, "failed to read request body: %v", err))
return
}
req.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
ctx := req.Context()
claims, err := authtypes.ClaimsFromContext(ctx)
if err != nil {
render.Error(rw, err)
return
}
var queryRangeRequest qbtypes.QueryRangeRequest
if err := json.NewDecoder(req.Body).Decode(&queryRangeRequest); err != nil {
render.Error(rw, errors.NewInvalidInputf(errors.CodeInvalidInput, "failed to decode request body: %v", err))
return
}
defer func() {
if r := recover(); r != nil {
stackTrace := string(debug.Stack())
queryJSON, _ := json.Marshal(queryRangeRequest)
aH.Signoz.Instrumentation.Logger().ErrorContext(ctx, "panic in QueryRange",
"error", r,
"user", claims.UserID,
"payload", string(queryJSON),
"stacktrace", stackTrace,
)
render.Error(rw, errors.NewInternalf(
errors.CodeInternal,
"Something went wrong on our end. It's not you, it's us. Our team is notified about it. Reach out to support if issue persists.",
))
}
}()
if err := queryRangeRequest.Validate(); err != nil {
render.Error(rw, err)
return
}
orgID, err := valuer.NewUUID(claims.OrgID)
if err != nil {
render.Error(rw, err)
return
}
if anomalyQuery, ok := queryRangeRequest.IsAnomalyRequest(); ok {
anomalies, err := aH.handleAnomalyQuery(ctx, orgID, anomalyQuery, queryRangeRequest)
if err != nil {
render.Error(rw, errors.NewInternalf(errors.CodeInternal, "failed to get anomalies: %v", err))
return
}
results := []any{}
for _, item := range anomalies.Results {
results = append(results, item)
}
finalResp := &qbtypes.QueryRangeResponse{
Type: queryRangeRequest.RequestType,
Data: struct {
Results []any `json:"results"`
Warnings []string `json:"warnings"`
}{
Results: results,
Warnings: make([]string, 0), // TODO(srikanthccv): will there be any warnings here?
},
Meta: struct {
RowsScanned uint64 `json:"rowsScanned"`
BytesScanned uint64 `json:"bytesScanned"`
DurationMS uint64 `json:"durationMs"`
}{},
}
render.Success(rw, http.StatusOK, finalResp)
return
} else {
// regular query range request, let the querier handle it
req.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
aH.QuerierAPI.QueryRange(rw, req)
}
}

View File

@@ -3,6 +3,7 @@ package app
import (
"context"
"fmt"
"log/slog"
"net"
"net/http"
_ "net/http/pprof" // http profiler
@@ -18,6 +19,7 @@ import (
"github.com/SigNoz/signoz/pkg/http/middleware"
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/prometheus"
"github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/pkg/signoz"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/telemetrystore"
@@ -104,6 +106,8 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz, jwt *authtypes.JWT)
signoz.TelemetryStore,
signoz.Prometheus,
signoz.Modules.OrgGetter,
signoz.Querier,
signoz.Instrumentation.Logger(),
)
if err != nil {
@@ -421,6 +425,8 @@ func makeRulesManager(
telemetryStore telemetrystore.TelemetryStore,
prometheus prometheus.Prometheus,
orgGetter organization.Getter,
querier querier.Querier,
logger *slog.Logger,
) (*baserules.Manager, error) {
// create manager opts
managerOpts := &baserules.ManagerOptions{
@@ -429,6 +435,8 @@ func makeRulesManager(
Context: context.Background(),
Logger: zap.L(),
Reader: ch,
Querier: querier,
SLogger: logger,
Cache: cache,
EvalDelay: baseconst.GetEvalDelay(),
PrepareTaskFunc: rules.PrepareTaskFunc,

View File

@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"log/slog"
"math"
"strings"
"sync"
@@ -15,6 +16,7 @@ import (
"github.com/SigNoz/signoz/pkg/cache"
"github.com/SigNoz/signoz/pkg/query-service/common"
"github.com/SigNoz/signoz/pkg/query-service/model"
"github.com/SigNoz/signoz/pkg/transition"
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/SigNoz/signoz/pkg/valuer"
@@ -30,6 +32,11 @@ import (
baserules "github.com/SigNoz/signoz/pkg/query-service/rules"
querierV5 "github.com/SigNoz/signoz/pkg/querier"
anomalyV2 "github.com/SigNoz/signoz/ee/anomaly"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
yaml "gopkg.in/yaml.v2"
)
@@ -47,7 +54,13 @@ type AnomalyRule struct {
// querierV2 is used for alerts created after the introduction of new metrics query builder
querierV2 interfaces.Querier
provider anomaly.Provider
// querierV5 is used for alerts migrated after the introduction of new query builder
querierV5 querierV5.Querier
provider anomaly.Provider
providerV2 anomalyV2.Provider
version string
seasonality anomaly.Seasonality
}
@@ -57,6 +70,8 @@ func NewAnomalyRule(
orgID valuer.UUID,
p *ruletypes.PostableRule,
reader interfaces.Reader,
querierV5 querierV5.Querier,
logger *slog.Logger,
cache cache.Cache,
opts ...baserules.RuleOption,
) (*AnomalyRule, error) {
@@ -117,6 +132,26 @@ func NewAnomalyRule(
anomaly.WithReader[*anomaly.WeeklyProvider](reader),
)
}
if t.seasonality == anomaly.SeasonalityHourly {
t.providerV2 = anomalyV2.NewHourlyProvider(
anomalyV2.WithQuerier[*anomalyV2.HourlyProvider](querierV5),
anomalyV2.WithLogger[*anomalyV2.HourlyProvider](logger),
)
} else if t.seasonality == anomaly.SeasonalityDaily {
t.providerV2 = anomalyV2.NewDailyProvider(
anomalyV2.WithQuerier[*anomalyV2.DailyProvider](querierV5),
anomalyV2.WithLogger[*anomalyV2.DailyProvider](logger),
)
} else if t.seasonality == anomaly.SeasonalityWeekly {
t.providerV2 = anomalyV2.NewWeeklyProvider(
anomalyV2.WithQuerier[*anomalyV2.WeeklyProvider](querierV5),
anomalyV2.WithLogger[*anomalyV2.WeeklyProvider](logger),
)
}
t.querierV5 = querierV5
t.version = p.Version
return &t, nil
}
@@ -156,6 +191,26 @@ func (r *AnomalyRule) prepareQueryRange(ts time.Time) (*v3.QueryRangeParamsV3, e
}, nil
}
func (r *AnomalyRule) prepareQueryRangeV5(ts time.Time) (*qbtypes.QueryRangeRequest, error) {
zap.L().Info("prepareQueryRangeV5", zap.Int64("ts", ts.UnixMilli()), zap.Int64("evalWindow", r.EvalWindow().Milliseconds()), zap.Int64("evalDelay", r.EvalDelay().Milliseconds()))
startTs, endTs := r.Timestamps(ts)
start, end := startTs.UnixMilli(), endTs.UnixMilli()
req := &qbtypes.QueryRangeRequest{
Start: uint64(start),
End: uint64(end),
RequestType: qbtypes.RequestTypeTimeSeries,
CompositeQuery: qbtypes.CompositeQuery{
Queries: make([]qbtypes.QueryEnvelope, 0),
},
NoCache: true,
}
copy(r.Condition().CompositeQuery.Queries, req.CompositeQuery.Queries)
return req, nil
}
func (r *AnomalyRule) GetSelectedQuery() string {
return r.Condition().GetSelectedQueryName()
}
@@ -201,13 +256,61 @@ func (r *AnomalyRule) buildAndRunQuery(ctx context.Context, orgID valuer.UUID, t
return resultVector, nil
}
func (r *AnomalyRule) buildAndRunQueryV5(ctx context.Context, orgID valuer.UUID, ts time.Time) (ruletypes.Vector, error) {
params, err := r.prepareQueryRangeV5(ts)
if err != nil {
return nil, err
}
anomalies, err := r.providerV2.GetAnomalies(ctx, orgID, &anomalyV2.AnomaliesRequest{
Params: *params,
Seasonality: anomalyV2.Seasonality{String: valuer.NewString(r.seasonality.String())},
})
if err != nil {
return nil, err
}
var qbResult *qbtypes.TimeSeriesData
for _, result := range anomalies.Results {
if result.QueryName == r.GetSelectedQuery() {
qbResult = result
break
}
}
queryResult := transition.ConvertV5TimeSeriesDataToV4Result(qbResult)
var resultVector ruletypes.Vector
scoresJSON, _ := json.Marshal(queryResult.AnomalyScores)
zap.L().Info("anomaly scores", zap.String("scores", string(scoresJSON)))
for _, series := range queryResult.AnomalyScores {
smpl, shouldAlert := r.ShouldAlert(*series)
if shouldAlert {
resultVector = append(resultVector, smpl)
}
}
return resultVector, nil
}
func (r *AnomalyRule) Eval(ctx context.Context, ts time.Time) (interface{}, error) {
prevState := r.State()
valueFormatter := formatter.FromUnit(r.Unit())
res, err := r.buildAndRunQuery(ctx, r.OrgID(), ts)
var res ruletypes.Vector
var err error
if r.version == "v5" {
zap.L().Info("running v5 query")
res, err = r.buildAndRunQueryV5(ctx, r.OrgID(), ts)
} else {
zap.L().Info("running v4 query")
res, err = r.buildAndRunQuery(ctx, r.OrgID(), ts)
}
if err != nil {
return nil, err
}

View File

@@ -27,6 +27,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
opts.OrgID,
opts.Rule,
opts.Reader,
opts.Querier,
baserules.WithEvalDelay(opts.ManagerOpts.EvalDelay),
baserules.WithSQLStore(opts.SQLStore),
)
@@ -69,6 +70,8 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
opts.OrgID,
opts.Rule,
opts.Reader,
opts.Querier,
opts.SLogger,
opts.Cache,
baserules.WithEvalDelay(opts.ManagerOpts.EvalDelay),
baserules.WithSQLStore(opts.SQLStore),
@@ -126,6 +129,7 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, *basemodel.Ap
opts.OrgID,
parsedRule,
opts.Reader,
opts.Querier,
baserules.WithSendAlways(),
baserules.WithSendUnmatched(),
baserules.WithSQLStore(opts.SQLStore),
@@ -162,6 +166,8 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, *basemodel.Ap
opts.OrgID,
parsedRule,
opts.Reader,
opts.Querier,
opts.SLogger,
opts.Cache,
baserules.WithSendAlways(),
baserules.WithSendUnmatched(),

2
go.mod
View File

@@ -70,6 +70,7 @@ require (
go.uber.org/zap v1.27.0
golang.org/x/crypto v0.39.0
golang.org/x/exp v0.0.0-20250210185358-939b2ce775ac
golang.org/x/net v0.41.0
golang.org/x/oauth2 v0.30.0
golang.org/x/sync v0.15.0
golang.org/x/text v0.26.0
@@ -283,7 +284,6 @@ require (
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/goleak v1.3.0 // indirect
golang.org/x/mod v0.25.0 // indirect
golang.org/x/net v0.41.0 // indirect
golang.org/x/sys v0.33.0 // indirect
golang.org/x/time v0.11.0 // indirect
golang.org/x/tools v0.33.0 // indirect

View File

@@ -0,0 +1,448 @@
package contextlinks
import (
"fmt"
"slices"
"strings"
parser "github.com/SigNoz/signoz/pkg/parser/grammar"
"github.com/antlr4-go/antlr/v4"
"golang.org/x/exp/maps"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
)
type WhereClauseRewriter struct {
parser.BaseFilterQueryVisitor
labels map[string]string
groupByItems []qbtypes.GroupByKey
groupBySet map[string]struct{}
keysSeen map[string]struct{}
rewritten strings.Builder
}
// PrepareFilterExpression prepares the where clause for the query
// `labels` contains the key value pairs of the labels from the result of the query
// We "visit" the where clause and make necessary changes to existing query
// There are two cases:
// 1. The label is present in the where clause
// 2. The label is not present in the where clause
//
// Example for case 2:
// Latency by service.name without any filter
// In this case, for each service with latency > threshold we send a notification
// The expectation is that clicking on the related traces for service A, will
// take us to the traces page with the filter service.name=A
// So for all the missing labels in the where clause, we add them as key = value
//
// Example for case 1:
// Severity text IN (WARN, ERROR)
// In this case, the Severity text will appear in the `labels` if it were part of the group
// by clause, in which case we replace it with the actual value for the notification
// i.e Severity text = WARN
// If the Severity text is not part of the group by clause, then we add it as it is
func PrepareFilterExpression(labels map[string]string, whereClause string, groupByItems []qbtypes.GroupByKey) string {
if whereClause == "" && len(labels) == 0 {
return ""
}
groupBySet := make(map[string]struct{})
for _, item := range groupByItems {
groupBySet[item.Name] = struct{}{}
}
input := antlr.NewInputStream(whereClause)
lexer := parser.NewFilterQueryLexer(input)
stream := antlr.NewCommonTokenStream(lexer, 0)
parser := parser.NewFilterQueryParser(stream)
tree := parser.Query()
rewriter := &WhereClauseRewriter{
labels: labels,
groupByItems: groupByItems,
groupBySet: groupBySet,
keysSeen: map[string]struct{}{},
}
// visit the tree to rewrite the where clause
rewriter.Visit(tree)
rewrittenClause := strings.TrimSpace(rewriter.rewritten.String())
// sorted key for deterministic order
sortedKeys := maps.Keys(labels)
slices.Sort(sortedKeys)
// case 2: add missing labels from the labels map
missingLabels := []string{}
for _, key := range sortedKeys {
if !rewriter.isKeyInWhereClause(key) {
// escape the value if it contains special characters or spaces
escapedValue := escapeValueIfNeeded(labels[key])
missingLabels = append(missingLabels, fmt.Sprintf("%s=%s", key, escapedValue))
}
}
// combine
if len(missingLabels) > 0 {
if rewrittenClause != "" {
rewrittenClause = fmt.Sprintf("(%s) AND %s", rewrittenClause, strings.Join(missingLabels, " AND "))
} else {
rewrittenClause = strings.Join(missingLabels, " AND ")
}
}
return rewrittenClause
}
// Visit implements the visitor for the query rule
func (r *WhereClauseRewriter) Visit(tree antlr.ParseTree) interface{} {
return tree.Accept(r)
}
// VisitQuery visits the query node
func (r *WhereClauseRewriter) VisitQuery(ctx *parser.QueryContext) interface{} {
if ctx.Expression() != nil {
ctx.Expression().Accept(r)
}
return nil
}
// VisitExpression visits the expression node
func (r *WhereClauseRewriter) VisitExpression(ctx *parser.ExpressionContext) interface{} {
if ctx.OrExpression() != nil {
ctx.OrExpression().Accept(r)
}
return nil
}
// VisitOrExpression visits OR expressions
func (r *WhereClauseRewriter) VisitOrExpression(ctx *parser.OrExpressionContext) interface{} {
for i, andExpr := range ctx.AllAndExpression() {
if i > 0 {
r.rewritten.WriteString(" OR ")
}
andExpr.Accept(r)
}
return nil
}
// VisitAndExpression visits AND expressions
func (r *WhereClauseRewriter) VisitAndExpression(ctx *parser.AndExpressionContext) interface{} {
unaryExprs := ctx.AllUnaryExpression()
for i, unaryExpr := range unaryExprs {
if i > 0 {
// Check if there's an explicit AND
if i-1 < len(ctx.AllAND()) && ctx.AND(i-1) != nil {
r.rewritten.WriteString(" AND ")
} else {
// implicit
r.rewritten.WriteString(" AND ")
}
}
unaryExpr.Accept(r)
}
return nil
}
// VisitUnaryExpression visits unary expressions (with optional NOT)
func (r *WhereClauseRewriter) VisitUnaryExpression(ctx *parser.UnaryExpressionContext) interface{} {
if ctx.NOT() != nil {
r.rewritten.WriteString("NOT ")
}
if ctx.Primary() != nil {
ctx.Primary().Accept(r)
}
return nil
}
// VisitPrimary visits primary expressions
func (r *WhereClauseRewriter) VisitPrimary(ctx *parser.PrimaryContext) interface{} {
if ctx.LPAREN() != nil && ctx.RPAREN() != nil {
r.rewritten.WriteString("(")
if ctx.OrExpression() != nil {
ctx.OrExpression().Accept(r)
}
r.rewritten.WriteString(")")
} else if ctx.Comparison() != nil {
ctx.Comparison().Accept(r)
} else if ctx.FunctionCall() != nil {
ctx.FunctionCall().Accept(r)
} else if ctx.FullText() != nil {
ctx.FullText().Accept(r)
} else if ctx.Key() != nil {
ctx.Key().Accept(r)
} else if ctx.Value() != nil {
ctx.Value().Accept(r)
}
return nil
}
// VisitComparison visits comparison expressions
func (r *WhereClauseRewriter) VisitComparison(ctx *parser.ComparisonContext) interface{} {
if ctx.Key() == nil {
return nil
}
key := ctx.Key().GetText()
r.keysSeen[key] = struct{}{}
// Check if this key is in the labels and was part of group by
if value, exists := r.labels[key]; exists {
if _, partOfGroup := r.groupBySet[key]; partOfGroup {
// Case 1: Replace with actual value
escapedValue := escapeValueIfNeeded(value)
r.rewritten.WriteString(fmt.Sprintf("%s=%s", key, escapedValue))
return nil
}
}
// Otherwise, keep the original comparison
r.rewritten.WriteString(key)
if ctx.EQUALS() != nil {
r.rewritten.WriteString("=")
if ctx.Value(0) != nil {
r.rewritten.WriteString(ctx.Value(0).GetText())
}
} else if ctx.NOT_EQUALS() != nil || ctx.NEQ() != nil {
if ctx.NOT_EQUALS() != nil {
r.rewritten.WriteString("!=")
} else {
r.rewritten.WriteString("<>")
}
if ctx.Value(0) != nil {
r.rewritten.WriteString(ctx.Value(0).GetText())
}
} else if ctx.LT() != nil {
r.rewritten.WriteString("<")
if ctx.Value(0) != nil {
r.rewritten.WriteString(ctx.Value(0).GetText())
}
} else if ctx.LE() != nil {
r.rewritten.WriteString("<=")
if ctx.Value(0) != nil {
r.rewritten.WriteString(ctx.Value(0).GetText())
}
} else if ctx.GT() != nil {
r.rewritten.WriteString(">")
if ctx.Value(0) != nil {
r.rewritten.WriteString(ctx.Value(0).GetText())
}
} else if ctx.GE() != nil {
r.rewritten.WriteString(">=")
if ctx.Value(0) != nil {
r.rewritten.WriteString(ctx.Value(0).GetText())
}
} else if ctx.LIKE() != nil || ctx.ILIKE() != nil {
if ctx.LIKE() != nil {
r.rewritten.WriteString(" LIKE ")
} else {
r.rewritten.WriteString(" ILIKE ")
}
if ctx.Value(0) != nil {
r.rewritten.WriteString(ctx.Value(0).GetText())
}
} else if ctx.NOT_LIKE() != nil || ctx.NOT_ILIKE() != nil {
if ctx.NOT_LIKE() != nil {
r.rewritten.WriteString(" NOT LIKE ")
} else {
r.rewritten.WriteString(" NOT ILIKE ")
}
if ctx.Value(0) != nil {
r.rewritten.WriteString(ctx.Value(0).GetText())
}
} else if ctx.BETWEEN() != nil {
if ctx.NOT() != nil {
r.rewritten.WriteString(" NOT BETWEEN ")
} else {
r.rewritten.WriteString(" BETWEEN ")
}
if len(ctx.AllValue()) >= 2 {
r.rewritten.WriteString(ctx.Value(0).GetText())
r.rewritten.WriteString(" AND ")
r.rewritten.WriteString(ctx.Value(1).GetText())
}
} else if ctx.InClause() != nil {
r.rewritten.WriteString(" ")
ctx.InClause().Accept(r)
} else if ctx.NotInClause() != nil {
r.rewritten.WriteString(" ")
ctx.NotInClause().Accept(r)
} else if ctx.EXISTS() != nil {
if ctx.NOT() != nil {
r.rewritten.WriteString(" NOT EXISTS")
} else {
r.rewritten.WriteString(" EXISTS")
}
} else if ctx.REGEXP() != nil {
if ctx.NOT() != nil {
r.rewritten.WriteString(" NOT REGEXP ")
} else {
r.rewritten.WriteString(" REGEXP ")
}
if ctx.Value(0) != nil {
r.rewritten.WriteString(ctx.Value(0).GetText())
}
} else if ctx.CONTAINS() != nil {
if ctx.NOT() != nil {
r.rewritten.WriteString(" NOT CONTAINS ")
} else {
r.rewritten.WriteString(" CONTAINS ")
}
if ctx.Value(0) != nil {
r.rewritten.WriteString(ctx.Value(0).GetText())
}
}
return nil
}
// VisitInClause visits IN clauses
func (r *WhereClauseRewriter) VisitInClause(ctx *parser.InClauseContext) interface{} {
r.rewritten.WriteString("IN ")
if ctx.LPAREN() != nil {
r.rewritten.WriteString("(")
if ctx.ValueList() != nil {
ctx.ValueList().Accept(r)
}
r.rewritten.WriteString(")")
} else if ctx.LBRACK() != nil {
r.rewritten.WriteString("[")
if ctx.ValueList() != nil {
ctx.ValueList().Accept(r)
}
r.rewritten.WriteString("]")
} else if ctx.Value() != nil {
r.rewritten.WriteString(ctx.Value().GetText())
}
return nil
}
// VisitNotInClause visits NOT IN clauses
func (r *WhereClauseRewriter) VisitNotInClause(ctx *parser.NotInClauseContext) interface{} {
r.rewritten.WriteString("NOT IN ")
if ctx.LPAREN() != nil {
r.rewritten.WriteString("(")
if ctx.ValueList() != nil {
ctx.ValueList().Accept(r)
}
r.rewritten.WriteString(")")
} else if ctx.LBRACK() != nil {
r.rewritten.WriteString("[")
if ctx.ValueList() != nil {
ctx.ValueList().Accept(r)
}
r.rewritten.WriteString("]")
} else if ctx.Value() != nil {
r.rewritten.WriteString(ctx.Value().GetText())
}
return nil
}
// VisitValueList visits value lists
func (r *WhereClauseRewriter) VisitValueList(ctx *parser.ValueListContext) interface{} {
values := ctx.AllValue()
for i, val := range values {
if i > 0 {
r.rewritten.WriteString(", ")
}
r.rewritten.WriteString(val.GetText())
}
return nil
}
// VisitFullText visits full text expressions
func (r *WhereClauseRewriter) VisitFullText(ctx *parser.FullTextContext) interface{} {
r.rewritten.WriteString(ctx.GetText())
return nil
}
// VisitFunctionCall visits function calls
func (r *WhereClauseRewriter) VisitFunctionCall(ctx *parser.FunctionCallContext) interface{} {
// Write function name
if ctx.HAS() != nil {
r.rewritten.WriteString("has")
} else if ctx.HASANY() != nil {
r.rewritten.WriteString("hasany")
} else if ctx.HASALL() != nil {
r.rewritten.WriteString("hasall")
}
r.rewritten.WriteString("(")
if ctx.FunctionParamList() != nil {
ctx.FunctionParamList().Accept(r)
}
r.rewritten.WriteString(")")
return nil
}
// VisitFunctionParamList visits function parameter lists
func (r *WhereClauseRewriter) VisitFunctionParamList(ctx *parser.FunctionParamListContext) interface{} {
params := ctx.AllFunctionParam()
for i, param := range params {
if i > 0 {
r.rewritten.WriteString(", ")
}
param.Accept(r)
}
return nil
}
// VisitFunctionParam visits function parameters
func (r *WhereClauseRewriter) VisitFunctionParam(ctx *parser.FunctionParamContext) interface{} {
if ctx.Key() != nil {
ctx.Key().Accept(r)
} else if ctx.Value() != nil {
ctx.Value().Accept(r)
} else if ctx.Array() != nil {
ctx.Array().Accept(r)
}
return nil
}
// VisitArray visits array expressions
func (r *WhereClauseRewriter) VisitArray(ctx *parser.ArrayContext) interface{} {
r.rewritten.WriteString("[")
if ctx.ValueList() != nil {
ctx.ValueList().Accept(r)
}
r.rewritten.WriteString("]")
return nil
}
// VisitValue visits value expressions
func (r *WhereClauseRewriter) VisitValue(ctx *parser.ValueContext) interface{} {
r.rewritten.WriteString(ctx.GetText())
return nil
}
// VisitKey visits key expressions
func (r *WhereClauseRewriter) VisitKey(ctx *parser.KeyContext) interface{} {
r.keysSeen[ctx.GetText()] = struct{}{}
r.rewritten.WriteString(ctx.GetText())
return nil
}
func (r *WhereClauseRewriter) isKeyInWhereClause(key string) bool {
_, ok := r.keysSeen[key]
return ok
}
// escapeValueIfNeeded adds single quotes to string values and escapes single quotes within them
// Numeric and boolean values are returned as-is
func escapeValueIfNeeded(value string) string {
// Check if it's a number
if _, err := fmt.Sscanf(value, "%f", new(float64)); err == nil {
return value
}
// Check if it's a boolean
if strings.ToLower(value) == "true" || strings.ToLower(value) == "false" {
return value
}
// For all other values (strings), escape single quotes and wrap in single quotes
escaped := strings.ReplaceAll(value, "'", "\\'")
return fmt.Sprintf("'%s'", escaped)
}

View File

@@ -0,0 +1,260 @@
package contextlinks
import (
"testing"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/stretchr/testify/assert"
)
func TestPrepareFiltersV5(t *testing.T) {
tests := []struct {
name string
labels map[string]string
whereClause string
groupByItems []qbtypes.GroupByKey
expected string
description string
}{
{
name: "empty_inputs",
labels: map[string]string{},
whereClause: "",
groupByItems: []qbtypes.GroupByKey{},
expected: "",
description: "Should return empty string for empty inputs",
},
{
name: "no_label_replacement",
labels: map[string]string{},
whereClause: "service.name = 'serviceB'",
groupByItems: []qbtypes.GroupByKey{
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service.name"}},
},
expected: "service.name='serviceB'",
description: "No change",
},
{
name: "in_clause_replacement",
labels: map[string]string{
"severity_text": "WARN",
},
whereClause: "severity_text IN ('WARN', 'ERROR')",
groupByItems: []qbtypes.GroupByKey{
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "severity_text"}},
},
expected: "severity_text='WARN'",
description: "Should replace IN clause with actual value when key is in group by",
},
{
name: "missing_label_addition", // case 2
labels: map[string]string{
"service.name": "serviceA",
},
whereClause: "status_code > 400",
groupByItems: []qbtypes.GroupByKey{
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service.name"}},
},
expected: "(status_code>400) AND service.name='serviceA'",
description: "Should add missing labels from labels map",
},
{
name: "multiple_missing_labels",
labels: map[string]string{
"service.name": "serviceA",
"region": "us-east-1",
},
whereClause: "status_code > 400",
groupByItems: []qbtypes.GroupByKey{
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service.name"}},
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "region"}},
},
expected: "(status_code>400) AND region='us-east-1' AND service.name='serviceA'",
description: "Should add all missing labels",
},
{
name: "complex_where_clause",
labels: map[string]string{
"service.name": "serviceA",
},
whereClause: "(status_code > 400 OR status_code < 200) AND method = 'GET'",
groupByItems: []qbtypes.GroupByKey{
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service.name"}},
},
expected: "((status_code>400 OR status_code<200) AND method='GET') AND service.name='serviceA'",
description: "Should preserve complex boolean logic and add missing labels",
},
{
name: "label_not_in_group_by",
labels: map[string]string{
"service.name": "serviceA",
},
whereClause: "service.name = 'serviceB'",
groupByItems: []qbtypes.GroupByKey{
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "region"}}, // service.name not in group by
},
expected: "service.name='serviceB'",
description: "Should not replace label if not in group by items",
},
{
name: "special_characters_in_values",
labels: map[string]string{
"message": "Error: Connection failed",
"path": "/api/v1/users",
},
whereClause: "",
groupByItems: []qbtypes.GroupByKey{
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "message"}},
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "path"}},
},
expected: "message='Error: Connection failed' AND path='/api/v1/users'",
description: "Should quote values with special characters",
},
{
name: "numeric_and_boolean_values",
labels: map[string]string{
"count": "42",
"isEnabled": "true",
},
whereClause: "",
groupByItems: []qbtypes.GroupByKey{
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "count"}},
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "isEnabled"}},
},
expected: "count=42 AND isEnabled=true",
description: "Should not quote numeric and boolean values",
},
{
name: "like_operator",
labels: map[string]string{
"path": "/api/users",
},
whereClause: "path LIKE '/api%'",
groupByItems: []qbtypes.GroupByKey{
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "path"}},
},
expected: "path='/api/users'",
description: "Should replace LIKE comparisons when key is in group by",
},
{
name: "not_operators",
labels: map[string]string{
"status": "active",
},
whereClause: "status NOT IN ('deleted', 'archived')",
groupByItems: []qbtypes.GroupByKey{
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "status"}},
},
expected: "status='active'",
description: "Should replace NOT IN clause when key is in group by",
},
{
name: "between_operator",
labels: map[string]string{
"response_time": "250",
},
whereClause: "response_time BETWEEN 100 AND 500",
groupByItems: []qbtypes.GroupByKey{
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "response_time"}},
},
expected: "response_time=250",
description: "Should replace BETWEEN clause when key is in group by",
},
{
name: "function_calls",
labels: map[string]string{
"service.name": "serviceA",
},
whereClause: "has(tags, 'production')",
groupByItems: []qbtypes.GroupByKey{
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service.name"}},
},
expected: "(has(tags, 'production')) AND service.name='serviceA'",
description: "Should preserve function calls and add missing labels",
},
{
name: "already_quoted_values",
labels: map[string]string{
"message": "\"Error message\"",
"tag": "'production'",
},
whereClause: "",
groupByItems: []qbtypes.GroupByKey{
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "message"}},
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "tag"}},
},
expected: "message='\"Error message\"' AND tag='\\'production\\''",
description: "Should not double-quote already quoted values",
},
{
name: "mixed_replacement_and_addition",
labels: map[string]string{
"service.name": "serviceA",
"severity_text": "ERROR",
"region": "us-west-2",
},
whereClause: "severity_text IN ('WARN', 'ERROR') AND status_code > 400",
groupByItems: []qbtypes.GroupByKey{
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service.name"}},
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "severity_text"}},
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "region"}},
},
expected: "(severity_text='ERROR' AND status_code>400) AND region='us-west-2' AND service.name='serviceA'",
description: "Should both replace existing labels and add missing ones",
},
{
name: "implicit_and_handling",
labels: map[string]string{
"env": "production",
},
whereClause: "status_code=200 method='GET'", // implicit AND
groupByItems: []qbtypes.GroupByKey{
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "env"}},
},
expected: "(status_code=200 AND method='GET') AND env='production'",
description: "Should handle implicit AND between expressions",
},
{
name: "exists_operator",
labels: map[string]string{
"service.name": "serviceA",
},
whereClause: "error_details EXISTS",
groupByItems: []qbtypes.GroupByKey{
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service.name"}},
},
expected: "(error_details EXISTS) AND service.name='serviceA'",
description: "Should preserve EXISTS operator",
},
{
name: "empty_where_clause_with_labels",
labels: map[string]string{
"service.name": "serviceA",
"region": "us-east-1",
},
whereClause: "",
groupByItems: []qbtypes.GroupByKey{
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service.name"}},
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "region"}},
},
expected: "region='us-east-1' AND service.name='serviceA'",
description: "Should create where clause from labels when original is empty",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := PrepareFilterExpression(tt.labels, tt.whereClause, tt.groupByItems)
assert.Equal(t, tt.expected, result, tt.description)
})
}
}

View File

@@ -14,13 +14,13 @@ import (
func PrepareLinksToTraces(start, end time.Time, filterItems []v3.FilterItem) string {
// Traces list view expects time in nanoseconds
tr := v3.URLShareableTimeRange{
tr := URLShareableTimeRange{
Start: start.UnixNano(),
End: end.UnixNano(),
PageSize: 100,
}
options := v3.URLShareableOptions{
options := URLShareableOptions{
MaxLines: 2,
Format: "list",
SelectColumns: tracesV3.TracesListViewDefaultSelectedColumns,
@@ -29,32 +29,34 @@ func PrepareLinksToTraces(start, end time.Time, filterItems []v3.FilterItem) str
period, _ := json.Marshal(tr)
urlEncodedTimeRange := url.QueryEscape(string(period))
builderQuery := v3.BuilderQuery{
DataSource: v3.DataSourceTraces,
QueryName: "A",
AggregateOperator: v3.AggregateOperatorNoOp,
AggregateAttribute: v3.AttributeKey{},
Filters: &v3.FilterSet{
Items: filterItems,
Operator: "AND",
},
Expression: "A",
Disabled: false,
Having: []v3.Having{},
StepInterval: 60,
OrderBy: []v3.OrderBy{
{
ColumnName: "timestamp",
Order: "desc",
linkQuery := LinkQuery{
BuilderQuery: v3.BuilderQuery{
DataSource: v3.DataSourceTraces,
QueryName: "A",
AggregateOperator: v3.AggregateOperatorNoOp,
AggregateAttribute: v3.AttributeKey{},
Filters: &v3.FilterSet{
Items: filterItems,
Operator: "AND",
},
Expression: "A",
Disabled: false,
Having: []v3.Having{},
StepInterval: 60,
OrderBy: []v3.OrderBy{
{
ColumnName: "timestamp",
Order: "desc",
},
},
},
}
urlData := v3.URLShareableCompositeQuery{
urlData := URLShareableCompositeQuery{
QueryType: string(v3.QueryTypeBuilder),
Builder: v3.URLShareableBuilderQuery{
QueryData: []v3.BuilderQuery{
builderQuery,
Builder: URLShareableBuilderQuery{
QueryData: []LinkQuery{
linkQuery,
},
QueryFormulas: make([]string, 0),
},
@@ -72,13 +74,13 @@ func PrepareLinksToTraces(start, end time.Time, filterItems []v3.FilterItem) str
func PrepareLinksToLogs(start, end time.Time, filterItems []v3.FilterItem) string {
// Logs list view expects time in milliseconds
tr := v3.URLShareableTimeRange{
tr := URLShareableTimeRange{
Start: start.UnixMilli(),
End: end.UnixMilli(),
PageSize: 100,
}
options := v3.URLShareableOptions{
options := URLShareableOptions{
MaxLines: 2,
Format: "list",
SelectColumns: []v3.AttributeKey{},
@@ -87,32 +89,34 @@ func PrepareLinksToLogs(start, end time.Time, filterItems []v3.FilterItem) strin
period, _ := json.Marshal(tr)
urlEncodedTimeRange := url.QueryEscape(string(period))
builderQuery := v3.BuilderQuery{
DataSource: v3.DataSourceLogs,
QueryName: "A",
AggregateOperator: v3.AggregateOperatorNoOp,
AggregateAttribute: v3.AttributeKey{},
Filters: &v3.FilterSet{
Items: filterItems,
Operator: "AND",
},
Expression: "A",
Disabled: false,
Having: []v3.Having{},
StepInterval: 60,
OrderBy: []v3.OrderBy{
{
ColumnName: "timestamp",
Order: "desc",
linkQuery := LinkQuery{
BuilderQuery: v3.BuilderQuery{
DataSource: v3.DataSourceLogs,
QueryName: "A",
AggregateOperator: v3.AggregateOperatorNoOp,
AggregateAttribute: v3.AttributeKey{},
Filters: &v3.FilterSet{
Items: filterItems,
Operator: "AND",
},
Expression: "A",
Disabled: false,
Having: []v3.Having{},
StepInterval: 60,
OrderBy: []v3.OrderBy{
{
ColumnName: "timestamp",
Order: "desc",
},
},
},
}
urlData := v3.URLShareableCompositeQuery{
urlData := URLShareableCompositeQuery{
QueryType: string(v3.QueryTypeBuilder),
Builder: v3.URLShareableBuilderQuery{
QueryData: []v3.BuilderQuery{
builderQuery,
Builder: URLShareableBuilderQuery{
QueryData: []LinkQuery{
linkQuery,
},
QueryFormulas: make([]string, 0),
},
@@ -220,3 +224,97 @@ func PrepareFilters(labels map[string]string, whereClauseItems []v3.FilterItem,
return filterItems
}
func PrepareLinksToTracesV5(start, end time.Time, whereClause string) string {
// Traces list view expects time in nanoseconds
tr := URLShareableTimeRange{
Start: start.UnixNano(),
End: end.UnixNano(),
PageSize: 100,
}
options := URLShareableOptions{}
period, _ := json.Marshal(tr)
urlEncodedTimeRange := url.QueryEscape(string(period))
linkQuery := LinkQuery{
BuilderQuery: v3.BuilderQuery{
DataSource: v3.DataSourceTraces,
QueryName: "A",
AggregateOperator: v3.AggregateOperatorNoOp,
AggregateAttribute: v3.AttributeKey{},
Expression: "A",
Disabled: false,
Having: []v3.Having{},
StepInterval: 60,
},
Filter: &FilterExpression{Expression: whereClause},
}
urlData := URLShareableCompositeQuery{
QueryType: string(v3.QueryTypeBuilder),
Builder: URLShareableBuilderQuery{
QueryData: []LinkQuery{
linkQuery,
},
QueryFormulas: make([]string, 0),
},
}
data, _ := json.Marshal(urlData)
compositeQuery := url.QueryEscape(url.QueryEscape(string(data)))
optionsData, _ := json.Marshal(options)
urlEncodedOptions := url.QueryEscape(string(optionsData))
return fmt.Sprintf("compositeQuery=%s&timeRange=%s&startTime=%d&endTime=%d&options=%s", compositeQuery, urlEncodedTimeRange, tr.Start, tr.End, urlEncodedOptions)
}
func PrepareLinksToLogsV5(start, end time.Time, whereClause string) string {
// Logs list view expects time in milliseconds
tr := URLShareableTimeRange{
Start: start.UnixMilli(),
End: end.UnixMilli(),
PageSize: 100,
}
options := URLShareableOptions{}
period, _ := json.Marshal(tr)
urlEncodedTimeRange := url.QueryEscape(string(period))
linkQuery := LinkQuery{
BuilderQuery: v3.BuilderQuery{
DataSource: v3.DataSourceLogs,
QueryName: "A",
AggregateOperator: v3.AggregateOperatorNoOp,
AggregateAttribute: v3.AttributeKey{},
Expression: "A",
Disabled: false,
Having: []v3.Having{},
StepInterval: 60,
},
Filter: &FilterExpression{Expression: whereClause},
}
urlData := URLShareableCompositeQuery{
QueryType: string(v3.QueryTypeBuilder),
Builder: URLShareableBuilderQuery{
QueryData: []LinkQuery{
linkQuery,
},
QueryFormulas: make([]string, 0),
},
}
data, _ := json.Marshal(urlData)
compositeQuery := url.QueryEscape(url.QueryEscape(string(data)))
optionsData, _ := json.Marshal(options)
urlEncodedOptions := url.QueryEscape(string(optionsData))
return fmt.Sprintf("compositeQuery=%s&timeRange=%s&startTime=%d&endTime=%d&options=%s", compositeQuery, urlEncodedTimeRange, tr.Start, tr.End, urlEncodedOptions)
}

40
pkg/contextlinks/types.go Normal file
View File

@@ -0,0 +1,40 @@
package contextlinks
import v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
// TODO(srikanthccv): Fix the URL management
type URLShareableTimeRange struct {
Start int64 `json:"start"`
End int64 `json:"end"`
PageSize int64 `json:"pageSize"`
}
type FilterExpression struct {
Expression string `json:"expression,omitempty"`
}
type Aggregation struct {
Expression string `json:"expression,omitempty"`
}
type LinkQuery struct {
v3.BuilderQuery
Filter *FilterExpression `json:"filter,omitempty"`
Aggregations []*Aggregation `json:"aggregations,omitempty"`
}
type URLShareableBuilderQuery struct {
QueryData []LinkQuery `json:"queryData"`
QueryFormulas []string `json:"queryFormulas"`
}
type URLShareableCompositeQuery struct {
QueryType string `json:"queryType"`
Builder URLShareableBuilderQuery `json:"builder"`
}
type URLShareableOptions struct {
MaxLines int `json:"maxLines"`
Format string `json:"format"`
SelectColumns []v3.AttributeKey `json:"selectColumns"`
}

View File

@@ -7,8 +7,11 @@ import (
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/http/render"
"github.com/SigNoz/signoz/pkg/modules/dashboard"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/transition"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/types/dashboardtypes"
"github.com/SigNoz/signoz/pkg/valuer"
@@ -16,11 +19,12 @@ import (
)
type handler struct {
module dashboard.Module
module dashboard.Module
providerSettings factory.ProviderSettings
}
func NewHandler(module dashboard.Module) dashboard.Handler {
return &handler{module: module}
func NewHandler(module dashboard.Module, providerSettings factory.ProviderSettings) dashboard.Handler {
return &handler{module: module, providerSettings: providerSettings}
}
func (handler *handler) Create(rw http.ResponseWriter, r *http.Request) {
@@ -46,6 +50,13 @@ func (handler *handler) Create(rw http.ResponseWriter, r *http.Request) {
return
}
if querybuilder.QBV5Enabled {
dashboardMigrator := transition.NewDashboardMigrateV5(handler.providerSettings.Logger, nil, nil)
if req["version"] != "v5" {
dashboardMigrator.Migrate(ctx, req)
}
}
dashboard, err := handler.module.Create(ctx, orgID, claims.Email, valuer.MustNewUUID(claims.UserID), req)
if err != nil {
render.Error(rw, err)

View File

@@ -1,10 +1,13 @@
package querier
import (
"context"
"encoding/json"
"net/http"
"regexp"
"runtime/debug"
"github.com/SigNoz/signoz/pkg/analytics"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/http/render"
@@ -14,12 +17,13 @@ import (
)
type API struct {
set factory.ProviderSettings
querier Querier
set factory.ProviderSettings
analytics analytics.Analytics
querier Querier
}
func NewAPI(set factory.ProviderSettings, querier Querier) *API {
return &API{set: set, querier: querier}
func NewAPI(set factory.ProviderSettings, querier Querier, analytics analytics.Analytics) *API {
return &API{set: set, querier: querier, analytics: analytics}
}
func (a *API) QueryRange(rw http.ResponseWriter, req *http.Request) {
@@ -76,5 +80,91 @@ func (a *API) QueryRange(rw http.ResponseWriter, req *http.Request) {
return
}
a.logEvent(req.Context(), req.Header.Get("Referer"), queryRangeResponse.QBEvent)
render.Success(rw, http.StatusOK, queryRangeResponse)
}
func (a *API) logEvent(ctx context.Context, referrer string, event *qbtypes.QBEvent) {
claims, err := authtypes.ClaimsFromContext(ctx)
if err != nil {
a.set.Logger.DebugContext(ctx, "couldn't get claims from context")
return
}
if !(event.LogsUsed || event.MetricsUsed || event.TracesUsed) {
a.set.Logger.DebugContext(ctx, "no data source in request, dubious?")
return
}
properties := map[string]any{
"version": event.Version,
"logs_used": event.LogsUsed,
"traces_used": event.TracesUsed,
"metrics_used": event.MetricsUsed,
"filter_applied": event.FilterApplied,
"group_by_applied": event.GroupByApplied,
"query_type": event.QueryType,
"panel_type": event.PanelType,
"number_of_queries": event.NumberOfQueries,
}
if referrer == "" {
a.set.Logger.DebugContext(ctx, "no referrer, we don't ball non-UI requests")
return
}
properties["referrer"] = referrer
logsExplorerMatched, _ := regexp.MatchString(`/logs/logs-explorer(?:\?.*)?$`, referrer)
traceExplorerMatched, _ := regexp.MatchString(`/traces-explorer(?:\?.*)?$`, referrer)
metricsExplorerMatched, _ := regexp.MatchString(`/metrics-explorer/explorer(?:\?.*)?$`, referrer)
dashboardMatched, _ := regexp.MatchString(`/dashboard/[a-zA-Z0-9\-]+/(new|edit)(?:\?.*)?$`, referrer)
alertMatched, _ := regexp.MatchString(`/alerts/(new|edit)(?:\?.*)?$`, referrer)
switch {
case dashboardMatched:
properties["module_name"] = "dashboard"
case alertMatched:
properties["module_name"] = "rule"
case metricsExplorerMatched:
properties["module_name"] = "metrics-explorer"
case logsExplorerMatched:
properties["module_name"] = "logs-explorer"
case traceExplorerMatched:
properties["module_name"] = "traces-explorer"
default:
a.set.Logger.DebugContext(ctx, "nothing matches referrer", "referrer", referrer)
return
}
if dashboardMatched {
if dashboardIDRegex, err := regexp.Compile(`/dashboard/([a-f0-9\-]+)/`); err == nil {
if matches := dashboardIDRegex.FindStringSubmatch(referrer); len(matches) > 1 {
properties["dashboard_id"] = matches[1]
}
}
if widgetIDRegex, err := regexp.Compile(`widgetId=([a-f0-9\-]+)`); err == nil {
if matches := widgetIDRegex.FindStringSubmatch(referrer); len(matches) > 1 {
properties["widget_id"] = matches[1]
}
}
}
if alertMatched {
if alertIDRegex, err := regexp.Compile(`ruleId=(\d+)`); err == nil {
if matches := alertIDRegex.FindStringSubmatch(referrer); len(matches) > 1 {
properties["rule_id"] = matches[1]
}
}
}
a.set.Logger.DebugContext(ctx, "sending analytics events", "analytics.event.properties", properties)
if !event.HasData {
a.analytics.TrackUser(ctx, claims.OrgID, claims.UserID, "Telemetry Query Returned Empty", properties)
return
}
a.analytics.TrackUser(ctx, claims.OrgID, claims.UserID, "Telemetry Query Returned Results", properties)
}

View File

@@ -629,7 +629,7 @@ func (bc *bucketCache) isEmptyResult(result *qbtypes.Result) (isEmpty bool, isFi
return !hasValues, !hasValues && totalSeries > 0
}
case qbtypes.RequestTypeRaw, qbtypes.RequestTypeScalar:
case qbtypes.RequestTypeRaw, qbtypes.RequestTypeScalar, qbtypes.RequestTypeTrace:
// Raw and scalar data are not cached
return true, false
}
@@ -775,7 +775,7 @@ func (bc *bucketCache) trimResultToFluxBoundary(result *qbtypes.Result, fluxBoun
trimmedResult.Value = trimmedData
}
case qbtypes.RequestTypeRaw, qbtypes.RequestTypeScalar:
case qbtypes.RequestTypeRaw, qbtypes.RequestTypeScalar, qbtypes.RequestTypeTrace:
// Don't cache raw or scalar data
return nil
}

View File

@@ -42,7 +42,7 @@ func consume(rows driver.Rows, kind qbtypes.RequestType, queryWindow *qbtypes.Ti
payload, err = readAsTimeSeries(rows, queryWindow, step, queryName)
case qbtypes.RequestTypeScalar:
payload, err = readAsScalar(rows, queryName)
case qbtypes.RequestTypeRaw:
case qbtypes.RequestTypeRaw, qbtypes.RequestTypeTrace:
payload, err = readAsRaw(rows, queryName)
// TODO: add support for other request types
}

View File

@@ -9,7 +9,6 @@ import (
"strings"
"github.com/SigNoz/govaluate"
"github.com/SigNoz/signoz/pkg/querybuilder"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)
@@ -45,73 +44,6 @@ func getQueryName(spec any) string {
return getqueryInfo(spec).Name
}
func StepIntervalForQuery(req *qbtypes.QueryRangeRequest, name string) int64 {
stepsMap := make(map[string]int64)
for _, query := range req.CompositeQuery.Queries {
switch spec := query.Spec.(type) {
case qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]:
stepsMap[spec.Name] = int64(spec.StepInterval.Seconds())
case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
stepsMap[spec.Name] = int64(spec.StepInterval.Seconds())
case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
stepsMap[spec.Name] = int64(spec.StepInterval.Seconds())
case qbtypes.PromQuery:
stepsMap[spec.Name] = int64(spec.Step.Seconds())
}
}
if step, ok := stepsMap[name]; ok {
return step
}
exprStr := ""
for _, query := range req.CompositeQuery.Queries {
switch spec := query.Spec.(type) {
case qbtypes.QueryBuilderFormula:
if spec.Name == name {
exprStr = spec.Expression
}
}
}
expression, _ := govaluate.NewEvaluableExpressionWithFunctions(exprStr, qbtypes.EvalFuncs())
steps := []int64{}
for _, v := range expression.Vars() {
steps = append(steps, stepsMap[v])
}
return querybuilder.LCMList(steps)
}
func NumAggregationForQuery(req *qbtypes.QueryRangeRequest, name string) int64 {
numAgg := 0
for _, query := range req.CompositeQuery.Queries {
switch spec := query.Spec.(type) {
case qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]:
if spec.Name == name {
numAgg += 1
}
case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
if spec.Name == name {
numAgg += 1
}
case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
if spec.Name == name {
numAgg += 1
}
case qbtypes.QueryBuilderFormula:
if spec.Name == name {
numAgg += 1
}
}
}
return int64(numAgg)
}
func (q *querier) postProcessResults(ctx context.Context, results map[string]any, req *qbtypes.QueryRangeRequest) (map[string]any, error) {
// Convert results to typed format for processing
typedResults := make(map[string]*qbtypes.Result)
@@ -179,7 +111,7 @@ func (q *querier) postProcessResults(ctx context.Context, results map[string]any
if req.RequestType == qbtypes.RequestTypeTimeSeries && req.FormatOptions != nil && req.FormatOptions.FillGaps {
for name := range typedResults {
funcs := []qbtypes.Function{{Name: qbtypes.FunctionNameFillZero}}
funcs = q.prepareFillZeroArgsWithStep(funcs, req, StepIntervalForQuery(req, name))
funcs = q.prepareFillZeroArgsWithStep(funcs, req, req.StepIntervalForQuery(name))
// empty time series if it doesn't exist
tsData, ok := typedResults[name].Value.(*qbtypes.TimeSeriesData)
if !ok {
@@ -187,7 +119,7 @@ func (q *querier) postProcessResults(ctx context.Context, results map[string]any
}
if len(tsData.Aggregations) == 0 {
numAgg := NumAggregationForQuery(req, name)
numAgg := req.NumAggregationForQuery(name)
tsData.Aggregations = make([]*qbtypes.AggregationBucket, numAgg)
for idx := range numAgg {
tsData.Aggregations[idx] = &qbtypes.AggregationBucket{

View File

@@ -6,6 +6,7 @@ import (
"log/slog"
"slices"
"strconv"
"strings"
"sync"
"time"
@@ -111,10 +112,16 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
if tmplVars == nil {
tmplVars = make(map[string]qbtypes.VariableItem)
}
event := &qbtypes.QBEvent{
Version: "v5",
NumberOfQueries: len(req.CompositeQuery.Queries),
PanelType: req.RequestType.StringValue(),
}
// First pass: collect all metric names that need temporality
metricNames := make([]string, 0)
for idx, query := range req.CompositeQuery.Queries {
event.QueryType = query.Type.StringValue()
if query.Type == qbtypes.QueryTypeBuilder {
if spec, ok := query.Spec.(qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]); ok {
for _, agg := range spec.Aggregations {
@@ -128,6 +135,9 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
// allowed, we override it.
switch spec := query.Spec.(type) {
case qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]:
event.TracesUsed = true
event.FilterApplied = spec.Filter != nil && spec.Filter.Expression != ""
event.GroupByApplied = len(spec.GroupBy) > 0
if spec.StepInterval.Seconds() == 0 {
spec.StepInterval = qbtypes.Step{
Duration: time.Second * time.Duration(querybuilder.RecommendedStepInterval(req.Start, req.End)),
@@ -140,6 +150,9 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
}
req.CompositeQuery.Queries[idx].Spec = spec
case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
event.LogsUsed = true
event.FilterApplied = spec.Filter != nil && spec.Filter.Expression != ""
event.GroupByApplied = len(spec.GroupBy) > 0
if spec.StepInterval.Seconds() == 0 {
spec.StepInterval = qbtypes.Step{
Duration: time.Second * time.Duration(querybuilder.RecommendedStepInterval(req.Start, req.End)),
@@ -152,6 +165,9 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
}
req.CompositeQuery.Queries[idx].Spec = spec
case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
event.MetricsUsed = true
event.FilterApplied = spec.Filter != nil && spec.Filter.Expression != ""
event.GroupByApplied = len(spec.GroupBy) > 0
if spec.StepInterval.Seconds() == 0 {
spec.StepInterval = qbtypes.Step{
Duration: time.Second * time.Duration(querybuilder.RecommendedStepIntervalForMetric(req.Start, req.End)),
@@ -166,6 +182,7 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
req.CompositeQuery.Queries[idx].Spec = spec
}
} else if query.Type == qbtypes.QueryTypePromQL {
event.MetricsUsed = true
switch spec := query.Spec.(type) {
case qbtypes.PromQuery:
if spec.Step.Seconds() == 0 {
@@ -175,6 +192,15 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
}
req.CompositeQuery.Queries[idx].Spec = spec
}
} else if query.Type == qbtypes.QueryTypeClickHouseSQL {
switch spec := query.Spec.(type) {
case qbtypes.ClickHouseQuery:
if strings.TrimSpace(spec.Query) != "" {
event.MetricsUsed = strings.Contains(spec.Query, "signoz_metrics")
event.LogsUsed = strings.Contains(spec.Query, "signoz_logs")
event.TracesUsed = strings.Contains(spec.Query, "signoz_traces")
}
}
}
}
@@ -247,14 +273,56 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
}
}
}
return q.run(ctx, orgID, queries, req, steps)
qbResp, qbErr := q.run(ctx, orgID, queries, req, steps, event)
if qbResp != nil {
qbResp.QBEvent = event
}
return qbResp, qbErr
}
func (q *querier) run(ctx context.Context, orgID valuer.UUID, qs map[string]qbtypes.Query, req *qbtypes.QueryRangeRequest, steps map[string]qbtypes.Step) (*qbtypes.QueryRangeResponse, error) {
func (q *querier) run(
ctx context.Context,
orgID valuer.UUID,
qs map[string]qbtypes.Query,
req *qbtypes.QueryRangeRequest,
steps map[string]qbtypes.Step,
qbEvent *qbtypes.QBEvent,
) (*qbtypes.QueryRangeResponse, error) {
results := make(map[string]any)
warnings := make([]string, 0)
stats := qbtypes.ExecStats{}
hasData := func(result *qbtypes.Result) bool {
if result == nil || result.Value == nil {
return false
}
switch result.Type {
case qbtypes.RequestTypeScalar:
if val, ok := result.Value.(*qbtypes.ScalarData); ok && val != nil {
return len(val.Data) != 0
}
case qbtypes.RequestTypeRaw:
if val, ok := result.Value.(*qbtypes.RawData); ok && val != nil {
return len(val.Rows) != 0
}
case qbtypes.RequestTypeTimeSeries:
if val, ok := result.Value.(*qbtypes.TimeSeriesData); ok && val != nil {
if len(val.Aggregations) != 0 {
anyNonEmpty := false
for _, aggBucket := range val.Aggregations {
if len(aggBucket.Series) != 0 {
anyNonEmpty = true
break
}
}
return anyNonEmpty
}
return false
}
}
return false
}
for name, query := range qs {
// Skip cache if NoCache is set, or if cache is not available
if req.NoCache || q.bucketCache == nil || query.Fingerprint() == "" {
@@ -264,6 +332,7 @@ func (q *querier) run(ctx context.Context, orgID valuer.UUID, qs map[string]qbty
q.logger.InfoContext(ctx, "no bucket cache or fingerprint, executing query", "fingerprint", query.Fingerprint())
}
result, err := query.Execute(ctx)
qbEvent.HasData = qbEvent.HasData || hasData(result)
if err != nil {
return nil, err
}
@@ -274,6 +343,7 @@ func (q *querier) run(ctx context.Context, orgID valuer.UUID, qs map[string]qbty
stats.DurationMS += result.Stats.DurationMS
} else {
result, err := q.executeWithCache(ctx, orgID, query, steps[name], req.NoCache)
qbEvent.HasData = qbEvent.HasData || hasData(result)
if err != nil {
return nil, err
}

View File

@@ -9,7 +9,6 @@ import (
"fmt"
"io"
"math"
"math/rand/v2"
"net/http"
"net/url"
"regexp"
@@ -30,7 +29,6 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations/services"
"github.com/SigNoz/signoz/pkg/query-service/app/integrations"
"github.com/SigNoz/signoz/pkg/query-service/app/metricsexplorer"
"github.com/SigNoz/signoz/pkg/query-service/transition"
"github.com/SigNoz/signoz/pkg/signoz"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/prometheus/prometheus/promql"
@@ -40,6 +38,7 @@ import (
jsoniter "github.com/json-iterator/go"
_ "github.com/mattn/go-sqlite3"
"github.com/SigNoz/signoz/pkg/contextlinks"
traceFunnelsModule "github.com/SigNoz/signoz/pkg/modules/tracefunnel"
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations"
@@ -57,7 +56,6 @@ import (
tracesV3 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v3"
tracesV4 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v4"
"github.com/SigNoz/signoz/pkg/query-service/constants"
"github.com/SigNoz/signoz/pkg/query-service/contextlinks"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/postprocess"
"github.com/SigNoz/signoz/pkg/types"
@@ -66,7 +64,6 @@ import (
"github.com/SigNoz/signoz/pkg/types/licensetypes"
"github.com/SigNoz/signoz/pkg/types/opamptypes"
"github.com/SigNoz/signoz/pkg/types/pipelinetypes"
"github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
traceFunnels "github.com/SigNoz/signoz/pkg/types/tracefunneltypes"
@@ -1076,6 +1073,7 @@ func (aH *APIHandler) getRuleStateHistoryTopContributors(w http.ResponseWriter,
if err != nil {
continue
}
// TODO(srikanthccv): fix the links here after first QB milestone
filterItems, groupBy, keys := aH.metaForLinks(r.Context(), rule)
newFilters := contextlinks.PrepareFilters(lbls, filterItems, groupBy, keys)
end := time.Unix(params.End/1000, 0)
@@ -4912,45 +4910,6 @@ func (aH *APIHandler) queryRangeV4(ctx context.Context, queryRangeParams *v3.Que
Result: result,
}
if rand.Float64() < (1.0/30.0) &&
queryRangeParams.CompositeQuery.PanelType != v3.PanelTypeList &&
queryRangeParams.CompositeQuery.PanelType != v3.PanelTypeTrace {
v4JSON, _ := json.Marshal(queryRangeParams)
func() {
defer func() {
if rr := recover(); rr != nil {
zap.L().Warn(
"unexpected panic while converting to v5",
zap.Any("panic", rr),
zap.String("v4_payload", string(v4JSON)),
)
}
}()
v5Req, err := transition.ConvertV3ToV5(queryRangeParams)
if err != nil {
zap.L().Warn("unable to convert to v5 request payload", zap.Error(err), zap.String("v4_payload", string(v4JSON)))
return
}
v5ReqJSON, _ := json.Marshal(v5Req)
v3Resp := v3.QueryRangeResponse{
Result: result,
}
v5Resp, err := transition.ConvertV3ResponseToV5(&v3Resp, querybuildertypesv5.RequestTypeTimeSeries)
if err != nil {
zap.L().Warn("unable to convert to v5 response payload", zap.Error(err))
return
}
v5RespJSON, _ := json.Marshal(v5Resp)
zap.L().Info("v5 request and expected response",
zap.String("request_payload", string(v5ReqJSON)),
zap.String("response_payload", string(v5RespJSON)),
)
}()
}
aH.Respond(w, resp)
}

View File

@@ -3,6 +3,7 @@ package app
import (
"context"
"fmt"
"log/slog"
"net"
"net/http"
_ "net/http/pprof" // http profiler
@@ -15,6 +16,7 @@ import (
"github.com/SigNoz/signoz/pkg/licensing/nooplicensing"
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/prometheus"
"github.com/SigNoz/signoz/pkg/querier"
querierAPI "github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
"github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader"
@@ -91,6 +93,8 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz, jwt *authtypes.JWT)
signoz.TelemetryStore,
signoz.Prometheus,
signoz.Modules.OrgGetter,
signoz.Querier,
signoz.Instrumentation.Logger(),
)
if err != nil {
return nil, err
@@ -115,7 +119,7 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz, jwt *authtypes.JWT)
LicensingAPI: nooplicensing.NewLicenseAPI(),
FieldsAPI: fields.NewAPI(signoz.Instrumentation.ToProviderSettings(), signoz.TelemetryStore),
Signoz: signoz,
QuerierAPI: querierAPI.NewAPI(signoz.Instrumentation.ToProviderSettings(), signoz.Querier),
QuerierAPI: querierAPI.NewAPI(signoz.Instrumentation.ToProviderSettings(), signoz.Querier, signoz.Analytics),
})
if err != nil {
return nil, err
@@ -383,6 +387,8 @@ func makeRulesManager(
telemetryStore telemetrystore.TelemetryStore,
prometheus prometheus.Prometheus,
orgGetter organization.Getter,
querier querier.Querier,
logger *slog.Logger,
) (*rules.Manager, error) {
// create manager opts
managerOpts := &rules.ManagerOptions{
@@ -391,6 +397,8 @@ func makeRulesManager(
Context: context.Background(),
Logger: zap.L(),
Reader: ch,
Querier: querier,
SLogger: logger,
Cache: cache,
EvalDelay: constants.GetEvalDelay(),
SQLStore: sqlstore,

View File

@@ -12,6 +12,8 @@ import (
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/pkg/errors"
"go.uber.org/zap"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
)
type DataSource string
@@ -510,8 +512,11 @@ type CompositeQuery struct {
BuilderQueries map[string]*BuilderQuery `json:"builderQueries,omitempty"`
ClickHouseQueries map[string]*ClickHouseQuery `json:"chQueries,omitempty"`
PromQueries map[string]*PromQuery `json:"promQueries,omitempty"`
PanelType PanelType `json:"panelType"`
QueryType QueryType `json:"queryType"`
Queries []qbtypes.QueryEnvelope `json:"queries,omitempty"`
PanelType PanelType `json:"panelType"`
QueryType QueryType `json:"queryType"`
// Unit for the time series data shown in the graph
// This is used in alerts to format the value and threshold
Unit string `json:"unit,omitempty"`
@@ -1454,28 +1459,6 @@ type MetricMetadataResponse struct {
Temporality string `json:"temporality"`
}
type URLShareableTimeRange struct {
Start int64 `json:"start"`
End int64 `json:"end"`
PageSize int64 `json:"pageSize"`
}
type URLShareableBuilderQuery struct {
QueryData []BuilderQuery `json:"queryData"`
QueryFormulas []string `json:"queryFormulas"`
}
type URLShareableCompositeQuery struct {
QueryType string `json:"queryType"`
Builder URLShareableBuilderQuery `json:"builder"`
}
type URLShareableOptions struct {
MaxLines int `json:"maxLines"`
Format string `json:"format"`
SelectColumns []AttributeKey `json:"selectColumns"`
}
type QBOptions struct {
GraphLimitQtype string
IsLivetailQuery bool

View File

@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"log/slog"
"sort"
"strings"
"sync"
@@ -19,6 +20,7 @@ import (
"github.com/SigNoz/signoz/pkg/cache"
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/prometheus"
querierV5 "github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
"github.com/SigNoz/signoz/pkg/query-service/model"
"github.com/SigNoz/signoz/pkg/ruler/rulestore/sqlrulestore"
@@ -38,6 +40,8 @@ type PrepareTaskOptions struct {
MaintenanceStore ruletypes.MaintenanceStore
Logger *zap.Logger
Reader interfaces.Reader
Querier querierV5.Querier
SLogger *slog.Logger
Cache cache.Cache
ManagerOpts *ManagerOptions
NotifyFunc NotifyFunc
@@ -51,6 +55,8 @@ type PrepareTestRuleOptions struct {
MaintenanceStore ruletypes.MaintenanceStore
Logger *zap.Logger
Reader interfaces.Reader
Querier querierV5.Querier
SLogger *slog.Logger
Cache cache.Cache
ManagerOpts *ManagerOptions
NotifyFunc NotifyFunc
@@ -84,6 +90,8 @@ type ManagerOptions struct {
Logger *zap.Logger
ResendDelay time.Duration
Reader interfaces.Reader
Querier querierV5.Querier
SLogger *slog.Logger
Cache cache.Cache
EvalDelay time.Duration
@@ -146,6 +154,7 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
opts.OrgID,
opts.Rule,
opts.Reader,
opts.Querier,
WithEvalDelay(opts.ManagerOpts.EvalDelay),
WithSQLStore(opts.SQLStore),
)
@@ -392,6 +401,8 @@ func (m *Manager) editTask(_ context.Context, orgID valuer.UUID, rule *ruletypes
MaintenanceStore: m.maintenanceStore,
Logger: m.logger,
Reader: m.reader,
Querier: m.opts.Querier,
SLogger: m.opts.SLogger,
Cache: m.cache,
ManagerOpts: m.opts,
NotifyFunc: m.prepareNotifyFunc(),
@@ -583,6 +594,8 @@ func (m *Manager) addTask(_ context.Context, orgID valuer.UUID, rule *ruletypes.
MaintenanceStore: m.maintenanceStore,
Logger: m.logger,
Reader: m.reader,
Querier: m.opts.Querier,
SLogger: m.opts.SLogger,
Cache: m.cache,
ManagerOpts: m.opts,
NotifyFunc: m.prepareNotifyFunc(),

View File

@@ -8,6 +8,7 @@ import (
"go.uber.org/zap"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/prometheus"
"github.com/SigNoz/signoz/pkg/query-service/formatter"
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
@@ -20,10 +21,13 @@ import (
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/prometheus/prometheus/promql"
yaml "gopkg.in/yaml.v2"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
)
type PromRule struct {
*BaseRule
version string
prometheus prometheus.Prometheus
}
@@ -44,6 +48,7 @@ func NewPromRule(
p := PromRule{
BaseRule: baseRule,
version: postableRule.Version,
prometheus: prometheus,
}
p.logger = logger
@@ -80,6 +85,25 @@ func (r *PromRule) GetSelectedQuery() string {
func (r *PromRule) getPqlQuery() (string, error) {
if r.version == "v5" {
if len(r.ruleCondition.CompositeQuery.Queries) > 0 {
selectedQuery := r.GetSelectedQuery()
for _, item := range r.ruleCondition.CompositeQuery.Queries {
switch item.Type {
case qbtypes.QueryTypePromQL:
promQuery, ok := item.Spec.(qbtypes.PromQuery)
if !ok {
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid promql query spec %T", item.Spec)
}
if promQuery.Name == selectedQuery {
return promQuery.Query, nil
}
}
}
}
return "", fmt.Errorf("invalid promql rule setup")
}
if r.ruleCondition.CompositeQuery.QueryType == v3.QueryTypePromQL {
if len(r.ruleCondition.CompositeQuery.PromQueries) > 0 {
selectedQuery := r.GetSelectedQuery()

View File

@@ -48,6 +48,7 @@ func defaultTestNotification(opts PrepareTestRuleOptions) (int, *model.ApiError)
opts.OrgID,
parsedRule,
opts.Reader,
opts.Querier,
WithSendAlways(),
WithSendUnmatched(),
WithSQLStore(opts.SQLStore),

View File

@@ -6,19 +6,19 @@ import (
"encoding/json"
"fmt"
"math"
"math/rand/v2"
"reflect"
"text/template"
"time"
"go.uber.org/zap"
"github.com/SigNoz/signoz/pkg/contextlinks"
"github.com/SigNoz/signoz/pkg/query-service/common"
"github.com/SigNoz/signoz/pkg/query-service/contextlinks"
"github.com/SigNoz/signoz/pkg/query-service/model"
"github.com/SigNoz/signoz/pkg/query-service/postprocess"
"github.com/SigNoz/signoz/pkg/query-service/transition"
"github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/transition"
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/SigNoz/signoz/pkg/query-service/app/querier"
@@ -36,6 +36,10 @@ import (
tracesV4 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v4"
"github.com/SigNoz/signoz/pkg/query-service/formatter"
querierV5 "github.com/SigNoz/signoz/pkg/querier"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
yaml "gopkg.in/yaml.v2"
)
@@ -52,6 +56,9 @@ type ThresholdRule struct {
// querierV2 is used for alerts created after the introduction of new metrics query builder
querierV2 interfaces.Querier
// querierV5 is used for alerts migrated after the introduction of new query builder
querierV5 querierV5.Querier
// used for attribute metadata enrichment for logs and traces
logsKeys map[string]v3.AttributeKey
spansKeys map[string]v3.AttributeKey
@@ -65,6 +72,7 @@ func NewThresholdRule(
orgID valuer.UUID,
p *ruletypes.PostableRule,
reader interfaces.Reader,
querierV5 querierV5.Querier,
opts ...RuleOption,
) (*ThresholdRule, error) {
@@ -94,6 +102,7 @@ func NewThresholdRule(
t.querier = querier.NewQuerier(querierOption)
t.querierV2 = querierV2.NewQuerier(querierOptsV2)
t.querierV5 = querierV5
t.reader = reader
return &t, nil
}
@@ -185,7 +194,32 @@ func (r *ThresholdRule) prepareQueryRange(ts time.Time) (*v3.QueryRangeParamsV3,
}, nil
}
func (r *ThresholdRule) prepareQueryRangeV5(ts time.Time) (*qbtypes.QueryRangeRequest, error) {
zap.L().Info("prepareQueryRangeV5", zap.Int64("ts", ts.UnixMilli()), zap.Int64("evalWindow", r.evalWindow.Milliseconds()), zap.Int64("evalDelay", r.evalDelay.Milliseconds()))
startTs, endTs := r.Timestamps(ts)
start, end := startTs.UnixMilli(), endTs.UnixMilli()
req := &qbtypes.QueryRangeRequest{
Start: uint64(start),
End: uint64(end),
RequestType: qbtypes.RequestTypeTimeSeries,
CompositeQuery: qbtypes.CompositeQuery{
Queries: make([]qbtypes.QueryEnvelope, 0),
},
NoCache: true,
}
copy(r.Condition().CompositeQuery.Queries, req.CompositeQuery.Queries)
return req, nil
}
func (r *ThresholdRule) prepareLinksToLogs(ts time.Time, lbls labels.Labels) string {
if r.version == "v5" {
return r.prepareLinksToLogsV5(ts, lbls)
}
selectedQuery := r.GetSelectedQuery()
qr, err := r.prepareQueryRange(ts)
@@ -220,6 +254,11 @@ func (r *ThresholdRule) prepareLinksToLogs(ts time.Time, lbls labels.Labels) str
}
func (r *ThresholdRule) prepareLinksToTraces(ts time.Time, lbls labels.Labels) string {
if r.version == "v5" {
return r.prepareLinksToTracesV5(ts, lbls)
}
selectedQuery := r.GetSelectedQuery()
qr, err := r.prepareQueryRange(ts)
@@ -253,6 +292,86 @@ func (r *ThresholdRule) prepareLinksToTraces(ts time.Time, lbls labels.Labels) s
return contextlinks.PrepareLinksToTraces(start, end, filterItems)
}
func (r *ThresholdRule) prepareLinksToLogsV5(ts time.Time, lbls labels.Labels) string {
selectedQuery := r.GetSelectedQuery()
qr, err := r.prepareQueryRangeV5(ts)
if err != nil {
return ""
}
start := time.UnixMilli(int64(qr.Start))
end := time.UnixMilli(int64(qr.End))
// TODO(srikanthccv): handle formula queries
if selectedQuery < "A" || selectedQuery > "Z" {
return ""
}
var q qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]
for _, query := range r.ruleCondition.CompositeQuery.Queries {
if query.Type == qbtypes.QueryTypeBuilder {
switch spec := query.Spec.(type) {
case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
q = spec
}
}
}
if q.Signal != telemetrytypes.SignalLogs {
return ""
}
filterExpr := ""
if q.Filter != nil && q.Filter.Expression != "" {
filterExpr = q.Filter.Expression
}
whereClause := contextlinks.PrepareFilterExpression(lbls.Map(), filterExpr, q.GroupBy)
return contextlinks.PrepareLinksToLogsV5(start, end, whereClause)
}
func (r *ThresholdRule) prepareLinksToTracesV5(ts time.Time, lbls labels.Labels) string {
selectedQuery := r.GetSelectedQuery()
qr, err := r.prepareQueryRangeV5(ts)
if err != nil {
return ""
}
start := time.UnixMilli(int64(qr.Start))
end := time.UnixMilli(int64(qr.End))
// TODO(srikanthccv): handle formula queries
if selectedQuery < "A" || selectedQuery > "Z" {
return ""
}
var q qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]
for _, query := range r.ruleCondition.CompositeQuery.Queries {
if query.Type == qbtypes.QueryTypeBuilder {
switch spec := query.Spec.(type) {
case qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]:
q = spec
}
}
}
if q.Signal != telemetrytypes.SignalTraces {
return ""
}
filterExpr := ""
if q.Filter != nil && q.Filter.Expression != "" {
filterExpr = q.Filter.Expression
}
whereClause := contextlinks.PrepareFilterExpression(lbls.Map(), filterExpr, q.GroupBy)
return contextlinks.PrepareLinksToTracesV5(start, end, whereClause)
}
func (r *ThresholdRule) GetSelectedQuery() string {
return r.ruleCondition.GetSelectedQueryName()
}
@@ -355,51 +474,85 @@ func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, orgID valuer.UUID,
return resultVector, nil
}
shouldLog := false
for _, series := range queryResult.Series {
smpl, shouldAlert := r.ShouldAlert(*series)
if shouldAlert {
shouldLog = true
resultVector = append(resultVector, smpl)
}
}
if (shouldLog && r.triggerCnt < 100) || rand.Float64() < (1.0/30.0) {
func(ts time.Time) {
r.triggerCnt++
defer func() {
if rr := recover(); rr != nil {
zap.L().Warn("unexpected panic while converting to v5",
zap.Any("panic", rr),
zap.String("ruleid", r.ID()),
)
}
}()
v5Req, err := transition.ConvertV3ToV5(params)
if err != nil {
zap.L().Warn("unable to convert to v5 request payload", zap.Error(err), zap.String("ruleid", r.ID()))
return
}
v5ReqJSON, _ := json.Marshal(v5Req)
return resultVector, nil
}
v3Resp := v3.QueryRangeResponse{
Result: results,
}
func (r *ThresholdRule) buildAndRunQueryV5(ctx context.Context, orgID valuer.UUID, ts time.Time) (ruletypes.Vector, error) {
v5Resp, err := transition.ConvertV3ResponseToV5(&v3Resp, querybuildertypesv5.RequestTypeTimeSeries)
if err != nil {
zap.L().Warn("unable to convert to v5 response payload", zap.Error(err), zap.String("ruleid", r.ID()))
return
}
params, err := r.prepareQueryRangeV5(ts)
if err != nil {
return nil, err
}
v5RespJSON, _ := json.Marshal(v5Resp)
zap.L().Info("v5 request and expected response for triggered alert",
zap.String("request_payload", string(v5ReqJSON)),
zap.String("response_payload", string(v5RespJSON)),
zap.String("ruleid", r.ID()),
)
}(ts)
var results []*v3.Result
v5Result, err := r.querierV5.QueryRange(ctx, orgID, params)
if err != nil {
zap.L().Error("failed to get alert query result", zap.String("rule", r.Name()), zap.Error(err), zap.Error(err))
return nil, fmt.Errorf("internal error while querying")
}
data, ok := v5Result.Data.(struct {
Results []any `json:"results"`
Warnings []string `json:"warnings"`
})
if !ok {
return nil, fmt.Errorf("upexpected result from v5 querier")
}
for _, item := range data.Results {
if tsData, ok := item.(*qbtypes.TimeSeriesData); ok {
results = append(results, transition.ConvertV5TimeSeriesDataToV4Result(tsData))
} else {
// NOTE: should not happen but just to ensure we don't miss it if it happens for some reason
zap.L().Warn("expected qbtypes.TimeSeriesData but got", zap.Any("item_type", reflect.TypeOf(item)))
}
}
selectedQuery := r.GetSelectedQuery()
var queryResult *v3.Result
for _, res := range results {
if res.QueryName == selectedQuery {
queryResult = res
break
}
}
if queryResult != nil && len(queryResult.Series) > 0 {
r.lastTimestampWithDatapoints = time.Now()
}
var resultVector ruletypes.Vector
// if the data is missing for `For` duration then we should send alert
if r.ruleCondition.AlertOnAbsent && r.lastTimestampWithDatapoints.Add(time.Duration(r.Condition().AbsentFor)*time.Minute).Before(time.Now()) {
zap.L().Info("no data found for rule condition", zap.String("ruleid", r.ID()))
lbls := labels.NewBuilder(labels.Labels{})
if !r.lastTimestampWithDatapoints.IsZero() {
lbls.Set("lastSeen", r.lastTimestampWithDatapoints.Format(constants.AlertTimeFormat))
}
resultVector = append(resultVector, ruletypes.Sample{
Metric: lbls.Labels(),
IsMissing: true,
})
return resultVector, nil
}
for _, series := range queryResult.Series {
smpl, shouldAlert := r.ShouldAlert(*series)
if shouldAlert {
resultVector = append(resultVector, smpl)
}
}
return resultVector, nil
@@ -410,7 +563,17 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (interface{}, er
prevState := r.State()
valueFormatter := formatter.FromUnit(r.Unit())
res, err := r.buildAndRunQuery(ctx, r.orgID, ts)
var res ruletypes.Vector
var err error
if r.version == "v5" {
zap.L().Info("running v5 query")
res, err = r.buildAndRunQueryV5(ctx, r.orgID, ts)
} else {
zap.L().Info("running v4 query")
res, err = r.buildAndRunQuery(ctx, r.orgID, ts)
}
if err != nil {
return nil, err

View File

@@ -14,6 +14,7 @@ import (
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader"
@@ -24,6 +25,8 @@ import (
"github.com/stretchr/testify/require"
cmock "github.com/srikanthccv/ClickHouse-go-mock"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
)
func TestThresholdRuleShouldAlert(t *testing.T) {
@@ -800,7 +803,7 @@ func TestThresholdRuleShouldAlert(t *testing.T) {
postableRule.RuleCondition.MatchType = ruletypes.MatchType(c.matchType)
postableRule.RuleCondition.Target = &c.target
rule, err := NewThresholdRule("69", valuer.GenerateUUID(), &postableRule, nil, WithEvalDelay(2*time.Minute))
rule, err := NewThresholdRule("69", valuer.GenerateUUID(), &postableRule, nil, nil, WithEvalDelay(2*time.Minute))
if err != nil {
assert.NoError(t, err)
}
@@ -888,7 +891,7 @@ func TestPrepareLinksToLogs(t *testing.T) {
},
}
rule, err := NewThresholdRule("69", valuer.GenerateUUID(), &postableRule, nil, WithEvalDelay(2*time.Minute))
rule, err := NewThresholdRule("69", valuer.GenerateUUID(), &postableRule, nil, nil, WithEvalDelay(2*time.Minute))
if err != nil {
assert.NoError(t, err)
}
@@ -899,6 +902,102 @@ func TestPrepareLinksToLogs(t *testing.T) {
assert.Contains(t, link, "&timeRange=%7B%22start%22%3A1705468620000%2C%22end%22%3A1705468920000%2C%22pageSize%22%3A100%7D&startTime=1705468620000&endTime=1705468920000")
}
func TestPrepareLinksToLogsV5(t *testing.T) {
postableRule := ruletypes.PostableRule{
AlertName: "Tricky Condition Tests",
AlertType: ruletypes.AlertTypeLogs,
RuleType: ruletypes.RuleTypeThreshold,
EvalWindow: ruletypes.Duration(5 * time.Minute),
Frequency: ruletypes.Duration(1 * time.Minute),
RuleCondition: &ruletypes.RuleCondition{
CompositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
Name: "A",
StepInterval: qbtypes.Step{Duration: 1 * time.Minute},
Aggregations: []qbtypes.LogAggregation{
{
Expression: "count()",
},
},
Filter: &qbtypes.Filter{
Expression: "service.name EXISTS",
},
Signal: telemetrytypes.SignalLogs,
},
},
},
},
CompareOp: "4", // Not Equals
MatchType: "1", // Once
Target: &[]float64{0.0}[0],
SelectedQuery: "A",
},
Version: "v5",
}
rule, err := NewThresholdRule("69", valuer.GenerateUUID(), &postableRule, nil, nil, WithEvalDelay(2*time.Minute))
if err != nil {
assert.NoError(t, err)
}
ts := time.UnixMilli(1753527163000)
link := rule.prepareLinksToLogs(ts, labels.Labels{})
assert.Contains(t, link, "compositeQuery=%257B%2522queryType%2522%253A%2522builder%2522%252C%2522builder%2522%253A%257B%2522queryData%2522%253A%255B%257B%2522queryName%2522%253A%2522A%2522%252C%2522stepInterval%2522%253A60%252C%2522dataSource%2522%253A%2522logs%2522%252C%2522aggregateOperator%2522%253A%2522noop%2522%252C%2522aggregateAttribute%2522%253A%257B%2522key%2522%253A%2522%2522%252C%2522dataType%2522%253A%2522%2522%252C%2522type%2522%253A%2522%2522%252C%2522isColumn%2522%253Afalse%252C%2522isJSON%2522%253Afalse%257D%252C%2522expression%2522%253A%2522A%2522%252C%2522disabled%2522%253Afalse%252C%2522limit%2522%253A0%252C%2522offset%2522%253A0%252C%2522pageSize%2522%253A0%252C%2522ShiftBy%2522%253A0%252C%2522IsAnomaly%2522%253Afalse%252C%2522QueriesUsedInFormula%2522%253Anull%252C%2522filter%2522%253A%257B%2522expression%2522%253A%2522service.name%2BEXISTS%2522%257D%257D%255D%252C%2522queryFormulas%2522%253A%255B%255D%257D%257D&timeRange=%7B%22start%22%3A1753526700000%2C%22end%22%3A1753527000000%2C%22pageSize%22%3A100%7D&startTime=1753526700000&endTime=1753527000000&options=%7B%22maxLines%22%3A0%2C%22format%22%3A%22%22%2C%22selectColumns%22%3Anull%7D")
}
func TestPrepareLinksToTracesV5(t *testing.T) {
postableRule := ruletypes.PostableRule{
AlertName: "Tricky Condition Tests",
AlertType: ruletypes.AlertTypeTraces,
RuleType: ruletypes.RuleTypeThreshold,
EvalWindow: ruletypes.Duration(5 * time.Minute),
Frequency: ruletypes.Duration(1 * time.Minute),
RuleCondition: &ruletypes.RuleCondition{
CompositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
Name: "A",
StepInterval: qbtypes.Step{Duration: 1 * time.Minute},
Aggregations: []qbtypes.TraceAggregation{
{
Expression: "count()",
},
},
Filter: &qbtypes.Filter{
Expression: "service.name EXISTS",
},
Signal: telemetrytypes.SignalTraces,
},
},
},
},
CompareOp: "4", // Not Equals
MatchType: "1", // Once
Target: &[]float64{0.0}[0],
SelectedQuery: "A",
},
Version: "v5",
}
rule, err := NewThresholdRule("69", valuer.GenerateUUID(), &postableRule, nil, nil, WithEvalDelay(2*time.Minute))
if err != nil {
assert.NoError(t, err)
}
ts := time.UnixMilli(1753527163000)
link := rule.prepareLinksToTraces(ts, labels.Labels{})
assert.Contains(t, link, "compositeQuery=%257B%2522queryType%2522%253A%2522builder%2522%252C%2522builder%2522%253A%257B%2522queryData%2522%253A%255B%257B%2522queryName%2522%253A%2522A%2522%252C%2522stepInterval%2522%253A60%252C%2522dataSource%2522%253A%2522traces%2522%252C%2522aggregateOperator%2522%253A%2522noop%2522%252C%2522aggregateAttribute%2522%253A%257B%2522key%2522%253A%2522%2522%252C%2522dataType%2522%253A%2522%2522%252C%2522type%2522%253A%2522%2522%252C%2522isColumn%2522%253Afalse%252C%2522isJSON%2522%253Afalse%257D%252C%2522expression%2522%253A%2522A%2522%252C%2522disabled%2522%253Afalse%252C%2522limit%2522%253A0%252C%2522offset%2522%253A0%252C%2522pageSize%2522%253A0%252C%2522ShiftBy%2522%253A0%252C%2522IsAnomaly%2522%253Afalse%252C%2522QueriesUsedInFormula%2522%253Anull%252C%2522filter%2522%253A%257B%2522expression%2522%253A%2522service.name%2BEXISTS%2522%257D%257D%255D%252C%2522queryFormulas%2522%253A%255B%255D%257D%257D&timeRange=%7B%22start%22%3A1753526700000000000%2C%22end%22%3A1753527000000000000%2C%22pageSize%22%3A100%7D&startTime=1753526700000000000&endTime=1753527000000000000&options=%7B%22maxLines%22%3A0%2C%22format%22%3A%22%22%2C%22selectColumns%22%3Anull%7D")
}
func TestPrepareLinksToTraces(t *testing.T) {
postableRule := ruletypes.PostableRule{
AlertName: "Links to traces test",
@@ -929,7 +1028,7 @@ func TestPrepareLinksToTraces(t *testing.T) {
},
}
rule, err := NewThresholdRule("69", valuer.GenerateUUID(), &postableRule, nil, WithEvalDelay(2*time.Minute))
rule, err := NewThresholdRule("69", valuer.GenerateUUID(), &postableRule, nil, nil, WithEvalDelay(2*time.Minute))
if err != nil {
assert.NoError(t, err)
}
@@ -1004,7 +1103,7 @@ func TestThresholdRuleLabelNormalization(t *testing.T) {
postableRule.RuleCondition.MatchType = ruletypes.MatchType(c.matchType)
postableRule.RuleCondition.Target = &c.target
rule, err := NewThresholdRule("69", valuer.GenerateUUID(), &postableRule, nil, WithEvalDelay(2*time.Minute))
rule, err := NewThresholdRule("69", valuer.GenerateUUID(), &postableRule, nil, nil, WithEvalDelay(2*time.Minute))
if err != nil {
assert.NoError(t, err)
}
@@ -1056,7 +1155,7 @@ func TestThresholdRuleEvalDelay(t *testing.T) {
}
for idx, c := range cases {
rule, err := NewThresholdRule("69", valuer.GenerateUUID(), &postableRule, nil) // no eval delay
rule, err := NewThresholdRule("69", valuer.GenerateUUID(), &postableRule, nil, nil) // no eval delay
if err != nil {
assert.NoError(t, err)
}
@@ -1104,7 +1203,7 @@ func TestThresholdRuleClickHouseTmpl(t *testing.T) {
}
for idx, c := range cases {
rule, err := NewThresholdRule("69", valuer.GenerateUUID(), &postableRule, nil, WithEvalDelay(2*time.Minute))
rule, err := NewThresholdRule("69", valuer.GenerateUUID(), &postableRule, nil, nil, WithEvalDelay(2*time.Minute))
if err != nil {
assert.NoError(t, err)
}
@@ -1243,7 +1342,7 @@ func TestThresholdRuleUnitCombinations(t *testing.T) {
readerCache, err := cachetest.New(cache.Config{Provider: "memory", Memory: cache.Memory{TTL: DefaultFrequency}})
require.NoError(t, err)
reader := clickhouseReader.NewReaderFromClickhouseConnection(options, nil, telemetryStore, prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), "", time.Duration(time.Second), readerCache)
rule, err := NewThresholdRule("69", valuer.GenerateUUID(), &postableRule, reader)
rule, err := NewThresholdRule("69", valuer.GenerateUUID(), &postableRule, reader, nil)
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
"signoz_calls_total": {
v3.Delta: true,
@@ -1339,7 +1438,7 @@ func TestThresholdRuleNoData(t *testing.T) {
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
reader := clickhouseReader.NewReaderFromClickhouseConnection(options, nil, telemetryStore, prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), "", time.Duration(time.Second), readerCache)
rule, err := NewThresholdRule("69", valuer.GenerateUUID(), &postableRule, reader)
rule, err := NewThresholdRule("69", valuer.GenerateUUID(), &postableRule, reader, nil)
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
"signoz_calls_total": {
v3.Delta: true,
@@ -1443,7 +1542,7 @@ func TestThresholdRuleTracesLink(t *testing.T) {
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
reader := clickhouseReader.NewReaderFromClickhouseConnection(options, nil, telemetryStore, prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), "", time.Duration(time.Second), nil)
rule, err := NewThresholdRule("69", valuer.GenerateUUID(), &postableRule, reader)
rule, err := NewThresholdRule("69", valuer.GenerateUUID(), &postableRule, reader, nil)
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
"signoz_calls_total": {
v3.Delta: true,
@@ -1564,7 +1663,7 @@ func TestThresholdRuleLogsLink(t *testing.T) {
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
reader := clickhouseReader.NewReaderFromClickhouseConnection(options, nil, telemetryStore, prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), "", time.Duration(time.Second), nil)
rule, err := NewThresholdRule("69", valuer.GenerateUUID(), &postableRule, reader)
rule, err := NewThresholdRule("69", valuer.GenerateUUID(), &postableRule, reader, nil)
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
"signoz_calls_total": {
v3.Delta: true,
@@ -1640,7 +1739,7 @@ func TestThresholdRuleShiftBy(t *testing.T) {
},
}
rule, err := NewThresholdRule("69", valuer.GenerateUUID(), &postableRule, nil)
rule, err := NewThresholdRule("69", valuer.GenerateUUID(), &postableRule, nil, nil)
if err != nil {
assert.NoError(t, err)
}

View File

@@ -318,7 +318,7 @@ func NewFilterSuggestionsTestBed(t *testing.T) *FilterSuggestionsTestBed {
emailing := emailingtest.New()
analytics := analyticstest.New()
modules := signoz.NewModules(testDB, jwt, emailing, providerSettings, orgGetter, alertmanager, analytics)
handlers := signoz.NewHandlers(modules)
handlers := signoz.NewHandlers(modules, providerSettings)
apiHandler, err := app.NewAPIHandler(app.APIHandlerOpts{
Reader: reader,

View File

@@ -498,7 +498,7 @@ func NewTestbedWithoutOpamp(t *testing.T, sqlStore sqlstore.SQLStore) *LogPipeli
emailing := emailingtest.New()
analytics := analyticstest.New()
modules := signoz.NewModules(sqlStore, jwt, emailing, providerSettings, orgGetter, alertmanager, analytics)
handlers := signoz.NewHandlers(modules)
handlers := signoz.NewHandlers(modules, providerSettings)
apiHandler, err := app.NewAPIHandler(app.APIHandlerOpts{
LogsParsingPipelineController: controller,

View File

@@ -379,7 +379,7 @@ func NewCloudIntegrationsTestBed(t *testing.T, testDB sqlstore.SQLStore) *CloudI
emailing := emailingtest.New()
analytics := analyticstest.New()
modules := signoz.NewModules(testDB, jwt, emailing, providerSettings, orgGetter, alertmanager, analytics)
handlers := signoz.NewHandlers(modules)
handlers := signoz.NewHandlers(modules, providerSettings)
apiHandler, err := app.NewAPIHandler(app.APIHandlerOpts{
Reader: reader,

View File

@@ -594,7 +594,7 @@ func NewIntegrationsTestBed(t *testing.T, testDB sqlstore.SQLStore) *Integration
emailing := emailingtest.New()
analytics := analyticstest.New()
modules := signoz.NewModules(testDB, jwt, emailing, providerSettings, orgGetter, alertmanager, analytics)
handlers := signoz.NewHandlers(modules)
handlers := signoz.NewHandlers(modules, providerSettings)
apiHandler, err := app.NewAPIHandler(app.APIHandlerOpts{
Reader: reader,

View File

@@ -1,683 +0,0 @@
package transition
import (
"fmt"
"strings"
"time"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/query-service/constants"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/utils"
v5 "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
)
func ConvertV3ToV5(params *v3.QueryRangeParamsV3) (*v5.QueryRangeRequest, error) {
v3Params := params.Clone()
if v3Params == nil || v3Params.CompositeQuery == nil {
return nil, fmt.Errorf("v3 params or composite query is nil")
}
varItems := map[string]v5.VariableItem{}
for name, value := range v3Params.Variables {
varItems[name] = v5.VariableItem{
Type: v5.QueryVariableType, // doesn't matter at the moment
Value: value,
}
}
v5Request := &v5.QueryRangeRequest{
SchemaVersion: "v5",
Start: uint64(v3Params.Start),
End: uint64(v3Params.End),
RequestType: convertPanelTypeToRequestType(v3Params.CompositeQuery.PanelType),
Variables: varItems,
CompositeQuery: v5.CompositeQuery{
Queries: []v5.QueryEnvelope{},
},
FormatOptions: &v5.FormatOptions{
FormatTableResultForUI: v3Params.FormatForWeb,
FillGaps: v3Params.CompositeQuery.FillGaps,
},
}
// Convert based on query type
switch v3Params.CompositeQuery.QueryType {
case v3.QueryTypeBuilder:
if err := convertBuilderQueries(v3Params.CompositeQuery.BuilderQueries, &v5Request.CompositeQuery); err != nil {
return nil, err
}
case v3.QueryTypeClickHouseSQL:
if err := convertClickHouseQueries(v3Params.CompositeQuery.ClickHouseQueries, &v5Request.CompositeQuery); err != nil {
return nil, err
}
case v3.QueryTypePromQL:
if err := convertPromQueries(v3Params.CompositeQuery.PromQueries, v3Params.Step, &v5Request.CompositeQuery); err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("unsupported query type: %s", v3Params.CompositeQuery.QueryType)
}
return v5Request, nil
}
func convertPanelTypeToRequestType(panelType v3.PanelType) v5.RequestType {
switch panelType {
case v3.PanelTypeValue, v3.PanelTypeTable:
return v5.RequestTypeScalar
case v3.PanelTypeGraph:
return v5.RequestTypeTimeSeries
case v3.PanelTypeList, v3.PanelTypeTrace:
return v5.RequestTypeRaw
default:
return v5.RequestTypeUnknown
}
}
func convertBuilderQueries(v3Queries map[string]*v3.BuilderQuery, v5Composite *v5.CompositeQuery) error {
for name, query := range v3Queries {
if query == nil {
continue
}
// Handle formula queries
if query.Expression != "" && query.Expression != name {
v5Envelope := v5.QueryEnvelope{
Type: v5.QueryTypeFormula,
Spec: v5.QueryBuilderFormula{
Name: name,
Expression: query.Expression,
Disabled: query.Disabled,
Order: convertOrderBy(query.OrderBy, query),
Limit: int(query.Limit),
Having: convertHaving(query.Having, query),
Functions: convertFunctions(query.Functions),
},
}
v5Composite.Queries = append(v5Composite.Queries, v5Envelope)
continue
}
// Regular builder query
envelope, err := convertSingleBuilderQuery(name, query)
if err != nil {
return err
}
v5Composite.Queries = append(v5Composite.Queries, envelope)
}
return nil
}
func convertSingleBuilderQuery(name string, v3Query *v3.BuilderQuery) (v5.QueryEnvelope, error) {
v5Envelope := v5.QueryEnvelope{
Type: v5.QueryTypeBuilder,
}
switch v3Query.DataSource {
case v3.DataSourceTraces:
v5Query := v5.QueryBuilderQuery[v5.TraceAggregation]{
Name: name,
Signal: telemetrytypes.SignalTraces,
Disabled: v3Query.Disabled,
StepInterval: v5.Step{Duration: time.Duration(v3Query.StepInterval) * time.Second},
Filter: convertFilter(v3Query.Filters),
GroupBy: convertGroupBy(v3Query.GroupBy),
Order: convertOrderBy(v3Query.OrderBy, v3Query),
Limit: int(v3Query.Limit),
Offset: int(v3Query.Offset),
Having: convertHaving(v3Query.Having, v3Query),
Functions: convertFunctions(v3Query.Functions),
SelectFields: convertSelectColumns(v3Query.SelectColumns),
}
// Convert trace aggregations
if v3Query.AggregateOperator != v3.AggregateOperatorNoOp {
v5Query.Aggregations = []v5.TraceAggregation{
{
Expression: buildTraceAggregationExpression(v3Query),
Alias: "",
},
}
}
v5Envelope.Spec = v5Query
case v3.DataSourceLogs:
v5Query := v5.QueryBuilderQuery[v5.LogAggregation]{
Name: name,
Signal: telemetrytypes.SignalLogs,
Disabled: v3Query.Disabled,
StepInterval: v5.Step{Duration: time.Duration(v3Query.StepInterval) * time.Second},
Filter: convertFilter(v3Query.Filters),
GroupBy: convertGroupBy(v3Query.GroupBy),
Order: convertOrderBy(v3Query.OrderBy, v3Query),
Limit: int(v3Query.PageSize),
Offset: int(v3Query.Offset),
Having: convertHaving(v3Query.Having, v3Query),
Functions: convertFunctions(v3Query.Functions),
}
// Convert log aggregations
if v3Query.AggregateOperator != v3.AggregateOperatorNoOp {
v5Query.Aggregations = []v5.LogAggregation{
{
Expression: buildLogAggregationExpression(v3Query),
Alias: "",
},
}
}
v5Envelope.Spec = v5Query
case v3.DataSourceMetrics:
v5Query := v5.QueryBuilderQuery[v5.MetricAggregation]{
Name: name,
Signal: telemetrytypes.SignalMetrics,
Disabled: v3Query.Disabled,
StepInterval: v5.Step{Duration: time.Duration(v3Query.StepInterval) * time.Second},
Filter: convertFilter(v3Query.Filters),
GroupBy: convertGroupBy(v3Query.GroupBy),
Order: convertOrderBy(v3Query.OrderBy, v3Query),
Limit: int(v3Query.Limit),
Offset: int(v3Query.Offset),
Having: convertHaving(v3Query.Having, v3Query),
Functions: convertFunctions(v3Query.Functions),
}
if v3Query.AggregateAttribute.Key != "" {
v5Query.Aggregations = []v5.MetricAggregation{
{
MetricName: v3Query.AggregateAttribute.Key,
Temporality: convertTemporality(v3Query.Temporality),
TimeAggregation: convertTimeAggregation(v3Query.TimeAggregation),
SpaceAggregation: convertSpaceAggregation(v3Query.SpaceAggregation),
},
}
}
v5Envelope.Spec = v5Query
default:
return v5Envelope, fmt.Errorf("unsupported data source: %s", v3Query.DataSource)
}
return v5Envelope, nil
}
func buildTraceAggregationExpression(v3Query *v3.BuilderQuery) string {
switch v3Query.AggregateOperator {
case v3.AggregateOperatorCount:
if v3Query.AggregateAttribute.Key != "" {
return fmt.Sprintf("count(%s)", v3Query.AggregateAttribute.Key)
}
return "count()"
case v3.AggregateOperatorCountDistinct:
if v3Query.AggregateAttribute.Key != "" {
return fmt.Sprintf("countDistinct(%s)", v3Query.AggregateAttribute.Key)
}
return "countDistinct()"
case v3.AggregateOperatorSum:
return fmt.Sprintf("sum(%s)", v3Query.AggregateAttribute.Key)
case v3.AggregateOperatorAvg:
return fmt.Sprintf("avg(%s)", v3Query.AggregateAttribute.Key)
case v3.AggregateOperatorMin:
return fmt.Sprintf("min(%s)", v3Query.AggregateAttribute.Key)
case v3.AggregateOperatorMax:
return fmt.Sprintf("max(%s)", v3Query.AggregateAttribute.Key)
case v3.AggregateOperatorP05:
return fmt.Sprintf("p05(%s)", v3Query.AggregateAttribute.Key)
case v3.AggregateOperatorP10:
return fmt.Sprintf("p10(%s)", v3Query.AggregateAttribute.Key)
case v3.AggregateOperatorP20:
return fmt.Sprintf("p20(%s)", v3Query.AggregateAttribute.Key)
case v3.AggregateOperatorP25:
return fmt.Sprintf("p25(%s)", v3Query.AggregateAttribute.Key)
case v3.AggregateOperatorP50:
return fmt.Sprintf("p50(%s)", v3Query.AggregateAttribute.Key)
case v3.AggregateOperatorP75:
return fmt.Sprintf("p75(%s)", v3Query.AggregateAttribute.Key)
case v3.AggregateOperatorP90:
return fmt.Sprintf("p90(%s)", v3Query.AggregateAttribute.Key)
case v3.AggregateOperatorP95:
return fmt.Sprintf("p95(%s)", v3Query.AggregateAttribute.Key)
case v3.AggregateOperatorP99:
return fmt.Sprintf("p99(%s)", v3Query.AggregateAttribute.Key)
case v3.AggregateOperatorRate:
return "rate()"
case v3.AggregateOperatorRateSum:
return fmt.Sprintf("rate_sum(%s)", v3Query.AggregateAttribute.Key)
case v3.AggregateOperatorRateAvg:
return fmt.Sprintf("rate_avg(%s)", v3Query.AggregateAttribute.Key)
case v3.AggregateOperatorRateMin:
return fmt.Sprintf("rate_min(%s)", v3Query.AggregateAttribute.Key)
case v3.AggregateOperatorRateMax:
return fmt.Sprintf("rate_max(%s)", v3Query.AggregateAttribute.Key)
default:
return "count()"
}
}
func buildLogAggregationExpression(v3Query *v3.BuilderQuery) string {
// Similar to traces
return buildTraceAggregationExpression(v3Query)
}
func convertFilter(v3Filter *v3.FilterSet) *v5.Filter {
if v3Filter == nil || len(v3Filter.Items) == 0 {
return nil
}
expressions := []string{}
for _, item := range v3Filter.Items {
expr := buildFilterExpression(item)
if expr != "" {
expressions = append(expressions, expr)
}
}
if len(expressions) == 0 {
return nil
}
operator := "AND"
if v3Filter.Operator == "OR" {
operator = "OR"
}
return &v5.Filter{
Expression: strings.Join(expressions, fmt.Sprintf(" %s ", operator)),
}
}
func buildFilterExpression(item v3.FilterItem) string {
key := item.Key.Key
value := item.Value
switch item.Operator {
case v3.FilterOperatorEqual:
return fmt.Sprintf("%s = %s", key, formatValue(value))
case v3.FilterOperatorNotEqual:
return fmt.Sprintf("%s != %s", key, formatValue(value))
case v3.FilterOperatorGreaterThan:
return fmt.Sprintf("%s > %s", key, formatValue(value))
case v3.FilterOperatorGreaterThanOrEq:
return fmt.Sprintf("%s >= %s", key, formatValue(value))
case v3.FilterOperatorLessThan:
return fmt.Sprintf("%s < %s", key, formatValue(value))
case v3.FilterOperatorLessThanOrEq:
return fmt.Sprintf("%s <= %s", key, formatValue(value))
case v3.FilterOperatorIn:
return fmt.Sprintf("%s IN %s", key, formatValue(value))
case v3.FilterOperatorNotIn:
return fmt.Sprintf("%s NOT IN %s", key, formatValue(value))
case v3.FilterOperatorContains:
return fmt.Sprintf("%s LIKE '%%%v%%'", key, value)
case v3.FilterOperatorNotContains:
return fmt.Sprintf("%s NOT LIKE '%%%v%%'", key, value)
case v3.FilterOperatorRegex:
return fmt.Sprintf("%s REGEXP %s", key, formatValue(value))
case v3.FilterOperatorNotRegex:
return fmt.Sprintf("%s NOT REGEXP %s", key, formatValue(value))
case v3.FilterOperatorExists:
return fmt.Sprintf("%s EXISTS", key)
case v3.FilterOperatorNotExists:
return fmt.Sprintf("%s NOT EXISTS", key)
default:
return ""
}
}
func formatValue(value interface{}) string {
return utils.ClickHouseFormattedValue(value)
}
func convertGroupBy(v3GroupBy []v3.AttributeKey) []v5.GroupByKey {
v5GroupBy := []v5.GroupByKey{}
for _, key := range v3GroupBy {
v5GroupBy = append(v5GroupBy, v5.GroupByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: key.Key,
FieldDataType: convertDataType(key.DataType),
FieldContext: convertAttributeType(key.Type),
Materialized: key.IsColumn,
},
})
}
return v5GroupBy
}
func convertOrderBy(v3OrderBy []v3.OrderBy, v3Query *v3.BuilderQuery) []v5.OrderBy {
v5OrderBy := []v5.OrderBy{}
for _, order := range v3OrderBy {
direction := v5.OrderDirectionAsc
if order.Order == v3.DirectionDesc {
direction = v5.OrderDirectionDesc
}
var orderByName string
if order.ColumnName == "#SIGNOZ_VALUE" {
if v3Query.DataSource == v3.DataSourceLogs || v3Query.DataSource == v3.DataSourceTraces {
orderByName = buildTraceAggregationExpression(v3Query)
} else {
if v3Query.Expression != v3Query.QueryName {
orderByName = v3Query.Expression
} else {
orderByName = fmt.Sprintf("%s(%s)", v3Query.SpaceAggregation, v3Query.AggregateAttribute.Key)
}
}
} else {
orderByName = order.ColumnName
}
v5OrderBy = append(v5OrderBy, v5.OrderBy{
Key: v5.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: orderByName,
Materialized: order.IsColumn,
},
},
Direction: direction,
})
}
return v5OrderBy
}
func convertHaving(v3Having []v3.Having, v3Query *v3.BuilderQuery) *v5.Having {
if len(v3Having) == 0 {
return nil
}
expressions := []string{}
for _, h := range v3Having {
var expr string
if v3Query.DataSource == v3.DataSourceLogs || v3Query.DataSource == v3.DataSourceTraces {
h.ColumnName = buildTraceAggregationExpression(v3Query)
} else {
if v3Query.Expression != v3Query.QueryName {
h.ColumnName = v3Query.Expression
} else {
h.ColumnName = fmt.Sprintf("%s(%s)", v3Query.SpaceAggregation, v3Query.AggregateAttribute.Key)
}
}
expr = buildHavingExpression(h)
if expr != "" {
expressions = append(expressions, expr)
}
}
if len(expressions) == 0 {
return nil
}
return &v5.Having{
Expression: strings.Join(expressions, " AND "),
}
}
func buildHavingExpression(having v3.Having) string {
switch having.Operator {
case v3.HavingOperatorEqual:
return fmt.Sprintf("%s = %s", having.ColumnName, formatValue(having.Value))
case v3.HavingOperatorNotEqual:
return fmt.Sprintf("%s != %s", having.ColumnName, formatValue(having.Value))
case v3.HavingOperatorGreaterThan:
return fmt.Sprintf("%s > %s", having.ColumnName, formatValue(having.Value))
case v3.HavingOperatorGreaterThanOrEq:
return fmt.Sprintf("%s >= %s", having.ColumnName, formatValue(having.Value))
case v3.HavingOperatorLessThan:
return fmt.Sprintf("%s < %s", having.ColumnName, formatValue(having.Value))
case v3.HavingOperatorLessThanOrEq:
return fmt.Sprintf("%s <= %s", having.ColumnName, formatValue(having.Value))
case v3.HavingOperatorIn:
return fmt.Sprintf("%s IN %s", having.ColumnName, formatValue(having.Value))
case v3.HavingOperatorNotIn:
return fmt.Sprintf("%s NOT IN %s", having.ColumnName, formatValue(having.Value))
default:
return ""
}
}
func convertFunctions(v3Functions []v3.Function) []v5.Function {
v5Functions := []v5.Function{}
for _, fn := range v3Functions {
v5Fn := v5.Function{
Name: convertFunctionName(fn.Name),
Args: []v5.FunctionArg{},
}
for _, arg := range fn.Args {
v5Fn.Args = append(v5Fn.Args, v5.FunctionArg{
Value: arg,
})
}
for name, value := range fn.NamedArgs {
v5Fn.Args = append(v5Fn.Args, v5.FunctionArg{
Name: name,
Value: value,
})
}
v5Functions = append(v5Functions, v5Fn)
}
return v5Functions
}
func convertFunctionName(v3Name v3.FunctionName) v5.FunctionName {
switch v3Name {
case v3.FunctionNameCutOffMin:
return v5.FunctionNameCutOffMin
case v3.FunctionNameCutOffMax:
return v5.FunctionNameCutOffMax
case v3.FunctionNameClampMin:
return v5.FunctionNameClampMin
case v3.FunctionNameClampMax:
return v5.FunctionNameClampMax
case v3.FunctionNameAbsolute:
return v5.FunctionNameAbsolute
case v3.FunctionNameRunningDiff:
return v5.FunctionNameRunningDiff
case v3.FunctionNameLog2:
return v5.FunctionNameLog2
case v3.FunctionNameLog10:
return v5.FunctionNameLog10
case v3.FunctionNameCumSum:
return v5.FunctionNameCumulativeSum
case v3.FunctionNameEWMA3:
return v5.FunctionNameEWMA3
case v3.FunctionNameEWMA5:
return v5.FunctionNameEWMA5
case v3.FunctionNameEWMA7:
return v5.FunctionNameEWMA7
case v3.FunctionNameMedian3:
return v5.FunctionNameMedian3
case v3.FunctionNameMedian5:
return v5.FunctionNameMedian5
case v3.FunctionNameMedian7:
return v5.FunctionNameMedian7
case v3.FunctionNameTimeShift:
return v5.FunctionNameTimeShift
case v3.FunctionNameAnomaly:
return v5.FunctionNameAnomaly
default:
return v5.FunctionName{}
}
}
func convertSelectColumns(cols []v3.AttributeKey) []telemetrytypes.TelemetryFieldKey {
fields := []telemetrytypes.TelemetryFieldKey{}
for _, key := range cols {
newKey := telemetrytypes.TelemetryFieldKey{
Name: key.Key,
}
if _, exists := constants.NewStaticFieldsTraces[key.Key]; exists {
fields = append(fields, newKey)
continue
}
if _, exists := constants.DeprecatedStaticFieldsTraces[key.Key]; exists {
fields = append(fields, newKey)
continue
}
if _, exists := constants.StaticFieldsLogsV3[key.Key]; exists {
fields = append(fields, newKey)
continue
}
newKey.FieldDataType = convertDataType(key.DataType)
newKey.FieldContext = convertAttributeType(key.Type)
newKey.Materialized = key.IsColumn
}
return fields
}
func convertDataType(v3Type v3.AttributeKeyDataType) telemetrytypes.FieldDataType {
switch v3Type {
case v3.AttributeKeyDataTypeString:
return telemetrytypes.FieldDataTypeString
case v3.AttributeKeyDataTypeInt64:
return telemetrytypes.FieldDataTypeInt64
case v3.AttributeKeyDataTypeFloat64:
return telemetrytypes.FieldDataTypeFloat64
case v3.AttributeKeyDataTypeBool:
return telemetrytypes.FieldDataTypeBool
case v3.AttributeKeyDataTypeArrayString:
return telemetrytypes.FieldDataTypeArrayString
case v3.AttributeKeyDataTypeArrayInt64:
return telemetrytypes.FieldDataTypeArrayInt64
case v3.AttributeKeyDataTypeArrayFloat64:
return telemetrytypes.FieldDataTypeArrayFloat64
case v3.AttributeKeyDataTypeArrayBool:
return telemetrytypes.FieldDataTypeArrayBool
default:
return telemetrytypes.FieldDataTypeUnspecified
}
}
func convertAttributeType(v3Type v3.AttributeKeyType) telemetrytypes.FieldContext {
switch v3Type {
case v3.AttributeKeyTypeTag:
return telemetrytypes.FieldContextAttribute
case v3.AttributeKeyTypeResource:
return telemetrytypes.FieldContextResource
case v3.AttributeKeyTypeInstrumentationScope:
return telemetrytypes.FieldContextScope
default:
return telemetrytypes.FieldContextUnspecified
}
}
func convertTemporality(v3Temp v3.Temporality) metrictypes.Temporality {
switch v3Temp {
case v3.Delta:
return metrictypes.Delta
case v3.Cumulative:
return metrictypes.Cumulative
default:
return metrictypes.Unspecified
}
}
func convertTimeAggregation(v3TimeAgg v3.TimeAggregation) metrictypes.TimeAggregation {
switch v3TimeAgg {
case v3.TimeAggregationAnyLast:
return metrictypes.TimeAggregationLatest
case v3.TimeAggregationSum:
return metrictypes.TimeAggregationSum
case v3.TimeAggregationAvg:
return metrictypes.TimeAggregationAvg
case v3.TimeAggregationMin:
return metrictypes.TimeAggregationMin
case v3.TimeAggregationMax:
return metrictypes.TimeAggregationMax
case v3.TimeAggregationCount:
return metrictypes.TimeAggregationCount
case v3.TimeAggregationCountDistinct:
return metrictypes.TimeAggregationCountDistinct
case v3.TimeAggregationRate:
return metrictypes.TimeAggregationRate
case v3.TimeAggregationIncrease:
return metrictypes.TimeAggregationIncrease
default:
return metrictypes.TimeAggregationUnspecified
}
}
func convertSpaceAggregation(v3SpaceAgg v3.SpaceAggregation) metrictypes.SpaceAggregation {
switch v3SpaceAgg {
case v3.SpaceAggregationSum:
return metrictypes.SpaceAggregationSum
case v3.SpaceAggregationAvg:
return metrictypes.SpaceAggregationAvg
case v3.SpaceAggregationMin:
return metrictypes.SpaceAggregationMin
case v3.SpaceAggregationMax:
return metrictypes.SpaceAggregationMax
case v3.SpaceAggregationCount:
return metrictypes.SpaceAggregationCount
case v3.SpaceAggregationPercentile50:
return metrictypes.SpaceAggregationPercentile50
case v3.SpaceAggregationPercentile75:
return metrictypes.SpaceAggregationPercentile75
case v3.SpaceAggregationPercentile90:
return metrictypes.SpaceAggregationPercentile90
case v3.SpaceAggregationPercentile95:
return metrictypes.SpaceAggregationPercentile95
case v3.SpaceAggregationPercentile99:
return metrictypes.SpaceAggregationPercentile99
default:
return metrictypes.SpaceAggregationUnspecified
}
}
func convertClickHouseQueries(v3Queries map[string]*v3.ClickHouseQuery, v5Composite *v5.CompositeQuery) error {
for name, query := range v3Queries {
if query == nil {
continue
}
v5Envelope := v5.QueryEnvelope{
Type: v5.QueryTypeClickHouseSQL,
Spec: v5.ClickHouseQuery{
Name: name,
Query: query.Query,
Disabled: query.Disabled,
},
}
v5Composite.Queries = append(v5Composite.Queries, v5Envelope)
}
return nil
}
func convertPromQueries(v3Queries map[string]*v3.PromQuery, step int64, v5Composite *v5.CompositeQuery) error {
for name, query := range v3Queries {
if query == nil {
continue
}
v5Envelope := v5.QueryEnvelope{
Type: v5.QueryTypePromQL,
Spec: v5.PromQuery{
Name: name,
Query: query.Query,
Disabled: query.Disabled,
Step: v5.Step{Duration: time.Duration(step) * time.Second},
Stats: query.Stats != "",
},
}
v5Composite.Queries = append(v5Composite.Queries, v5Envelope)
}
return nil
}

View File

@@ -1,442 +0,0 @@
package transition
import (
"encoding/json"
"fmt"
"sort"
"strings"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
v5 "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)
func ConvertV3ResponseToV5(v3Response *v3.QueryRangeResponse, requestType v5.RequestType) (*v5.QueryRangeResponse, error) {
if v3Response == nil {
return nil, fmt.Errorf("v3 response is nil")
}
v5Response := &v5.QueryRangeResponse{
Type: requestType,
}
switch requestType {
case v5.RequestTypeTimeSeries:
data, err := convertToTimeSeriesData(v3Response.Result)
if err != nil {
return nil, err
}
v5Response.Data = data
case v5.RequestTypeScalar:
data, err := convertToScalarData(v3Response.Result)
if err != nil {
return nil, err
}
v5Response.Data = data
case v5.RequestTypeRaw:
data, err := convertToRawData(v3Response.Result)
if err != nil {
return nil, err
}
v5Response.Data = data
default:
return nil, fmt.Errorf("unsupported request type: %v", requestType)
}
return v5Response, nil
}
func convertToTimeSeriesData(v3Results []*v3.Result) ([]*v5.TimeSeriesData, error) {
v5Data := []*v5.TimeSeriesData{}
for _, result := range v3Results {
if result == nil {
continue
}
tsData := &v5.TimeSeriesData{
QueryName: result.QueryName,
Aggregations: []*v5.AggregationBucket{},
}
if len(result.Series) > 0 {
bucket := &v5.AggregationBucket{
Index: 0,
Alias: "",
Series: convertSeries(result.Series),
}
tsData.Aggregations = append(tsData.Aggregations, bucket)
}
v5Data = append(v5Data, tsData)
}
return v5Data, nil
}
func convertSeries(v3Series []*v3.Series) []*v5.TimeSeries {
v5Series := []*v5.TimeSeries{}
for _, series := range v3Series {
if series == nil {
continue
}
v5TimeSeries := &v5.TimeSeries{
Labels: convertLabels(series.Labels),
Values: convertPoints(series.Points),
}
v5Series = append(v5Series, v5TimeSeries)
}
return v5Series
}
func convertLabels(v3Labels map[string]string) []*v5.Label {
v5Labels := []*v5.Label{}
keys := make([]string, 0, len(v3Labels))
for k := range v3Labels {
keys = append(keys, k)
}
sort.Strings(keys)
for _, key := range keys {
v5Labels = append(v5Labels, &v5.Label{
Key: telemetrytypes.TelemetryFieldKey{
Name: key,
},
Value: v3Labels[key],
})
}
return v5Labels
}
func convertPoints(v3Points []v3.Point) []*v5.TimeSeriesValue {
v5Values := []*v5.TimeSeriesValue{}
for _, point := range v3Points {
v5Values = append(v5Values, &v5.TimeSeriesValue{
Timestamp: point.Timestamp,
Value: point.Value,
})
}
return v5Values
}
func convertToScalarData(v3Results []*v3.Result) (*v5.ScalarData, error) {
scalarData := &v5.ScalarData{
Columns: []*v5.ColumnDescriptor{},
Data: [][]any{},
}
for _, result := range v3Results {
if result == nil || result.Table == nil {
continue
}
for _, col := range result.Table.Columns {
columnType := v5.ColumnTypeGroup
if col.IsValueColumn {
columnType = v5.ColumnTypeAggregation
}
scalarData.Columns = append(scalarData.Columns, &v5.ColumnDescriptor{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: col.Name,
},
QueryName: col.QueryName,
AggregationIndex: 0,
Type: columnType,
})
}
for _, row := range result.Table.Rows {
rowData := []any{}
for _, col := range result.Table.Columns {
if val, ok := row.Data[col.Name]; ok {
rowData = append(rowData, val)
} else {
rowData = append(rowData, nil)
}
}
scalarData.Data = append(scalarData.Data, rowData)
}
}
return scalarData, nil
}
func convertToRawData(v3Results []*v3.Result) ([]*v5.RawData, error) {
v5Data := []*v5.RawData{}
for _, result := range v3Results {
if result == nil {
continue
}
rawData := &v5.RawData{
QueryName: result.QueryName,
NextCursor: "",
Rows: []*v5.RawRow{},
}
for _, row := range result.List {
if row == nil {
continue
}
dataMap := make(map[string]*any)
for k, v := range row.Data {
val := v
dataMap[k] = &val
}
rawData.Rows = append(rawData.Rows, &v5.RawRow{
Timestamp: row.Timestamp,
Data: dataMap,
})
}
v5Data = append(v5Data, rawData)
}
return v5Data, nil
}
func LogV5Response(response *v5.QueryRangeResponse, logger func(string)) {
if response == nil {
logger("Response: nil")
return
}
logger(fmt.Sprintf("[%s] Meta{rows:%d bytes:%d ms:%d}",
response.Type, response.Meta.RowsScanned, response.Meta.BytesScanned, response.Meta.DurationMS))
switch response.Type {
case v5.RequestTypeTimeSeries:
logTimeSeriesDataCompact(response.Data, logger)
case v5.RequestTypeScalar:
logScalarDataCompact(response.Data, logger)
case v5.RequestTypeRaw:
logRawDataCompact(response.Data, logger)
default:
logger(fmt.Sprintf("Unknown response type: %v", response.Type))
}
}
func logTimeSeriesDataCompact(data any, logger func(string)) {
tsData, ok := data.([]*v5.TimeSeriesData)
if !ok {
logger("ERROR: Failed to cast data to TimeSeriesData")
return
}
sort.Slice(tsData, func(i, j int) bool {
return tsData[i].QueryName < tsData[j].QueryName
})
for _, ts := range tsData {
allSeries := flattenSeries(ts.Aggregations)
sort.Slice(allSeries, func(i, j int) bool {
return createLabelSignature(allSeries[i].Labels) < createLabelSignature(allSeries[j].Labels)
})
for _, series := range allSeries {
labels := []string{}
for _, label := range series.Labels {
labels = append(labels, fmt.Sprintf("%s:%v", label.Key.Name, label.Value))
}
labelStr := strings.Join(labels, ",")
values := make([]*v5.TimeSeriesValue, len(series.Values))
copy(values, series.Values)
sort.Slice(values, func(i, j int) bool {
return values[i].Timestamp < values[j].Timestamp
})
valueStrs := []string{}
for _, val := range values {
relTime := val.Timestamp
if len(values) > 0 && values[0].Timestamp > 0 {
relTime = (val.Timestamp - values[0].Timestamp) / 1000 // Convert to seconds
}
valueStrs = append(valueStrs, fmt.Sprintf("%d:%.2f", relTime, val.Value))
}
logger(fmt.Sprintf("%s {%s} [%s]", ts.QueryName, labelStr, strings.Join(valueStrs, " ")))
}
}
}
func createLabelSignature(labels []*v5.Label) string {
parts := []string{}
for _, label := range labels {
parts = append(parts, fmt.Sprintf("%s=%v", label.Key.Name, label.Value))
}
sort.Strings(parts)
return strings.Join(parts, ",")
}
func logScalarDataCompact(data any, logger func(string)) {
scalar, ok := data.(*v5.ScalarData)
if !ok {
logger("ERROR: Failed to cast data to ScalarData")
return
}
colNames := []string{}
for _, col := range scalar.Columns {
colNames = append(colNames, col.Name)
}
logger(fmt.Sprintf("SCALAR [%s]", strings.Join(colNames, "|")))
for i, row := range scalar.Data {
rowVals := []string{}
for _, val := range row {
rowVals = append(rowVals, fmt.Sprintf("%v", val))
}
logger(fmt.Sprintf(" %d: [%s]", i, strings.Join(rowVals, "|")))
}
}
func flattenSeries(buckets []*v5.AggregationBucket) []*v5.TimeSeries {
var allSeries []*v5.TimeSeries
for _, bucket := range buckets {
allSeries = append(allSeries, bucket.Series...)
}
return allSeries
}
func logRawDataCompact(data any, logger func(string)) {
rawData, ok := data.([]*v5.RawData)
if !ok {
logger("ERROR: Failed to cast data to RawData")
return
}
sort.Slice(rawData, func(i, j int) bool {
return rawData[i].QueryName < rawData[j].QueryName
})
for _, rd := range rawData {
logger(fmt.Sprintf("RAW %s (rows:%d cursor:%s)", rd.QueryName, len(rd.Rows), rd.NextCursor))
rows := make([]*v5.RawRow, len(rd.Rows))
copy(rows, rd.Rows)
sort.Slice(rows, func(i, j int) bool {
return rows[i].Timestamp.Before(rows[j].Timestamp)
})
allFields := make(map[string]bool)
for _, row := range rows {
for k := range row.Data {
allFields[k] = true
}
}
fieldNames := []string{}
for k := range allFields {
fieldNames = append(fieldNames, k)
}
sort.Strings(fieldNames)
logger(fmt.Sprintf(" Fields: [%s]", strings.Join(fieldNames, "|")))
for i, row := range rows {
vals := []string{}
for _, field := range fieldNames {
if val, exists := row.Data[field]; exists && val != nil {
vals = append(vals, fmt.Sprintf("%v", *val))
} else {
vals = append(vals, "-")
}
}
tsStr := row.Timestamp.Format("15:04:05")
logger(fmt.Sprintf(" %d@%s: [%s]", i, tsStr, strings.Join(vals, "|")))
}
}
}
func LogV5ResponseJSON(response *v5.QueryRangeResponse, logger func(string)) {
sortedResponse := sortV5ResponseForLogging(response)
jsonBytes, err := json.MarshalIndent(sortedResponse, "", " ")
if err != nil {
logger(fmt.Sprintf("ERROR: Failed to marshal response: %v", err))
return
}
logger(string(jsonBytes))
}
func sortV5ResponseForLogging(response *v5.QueryRangeResponse) *v5.QueryRangeResponse {
if response == nil {
return nil
}
responseCopy := &v5.QueryRangeResponse{
Type: response.Type,
Meta: response.Meta,
}
switch response.Type {
case v5.RequestTypeTimeSeries:
if tsData, ok := response.Data.([]*v5.TimeSeriesData); ok {
sortedData := make([]*v5.TimeSeriesData, len(tsData))
for i, ts := range tsData {
sortedData[i] = &v5.TimeSeriesData{
QueryName: ts.QueryName,
Aggregations: make([]*v5.AggregationBucket, len(ts.Aggregations)),
}
for j, bucket := range ts.Aggregations {
sortedBucket := &v5.AggregationBucket{
Index: bucket.Index,
Alias: bucket.Alias,
Series: make([]*v5.TimeSeries, len(bucket.Series)),
}
for k, series := range bucket.Series {
sortedSeries := &v5.TimeSeries{
Labels: series.Labels,
Values: make([]*v5.TimeSeriesValue, len(series.Values)),
}
copy(sortedSeries.Values, series.Values)
sort.Slice(sortedSeries.Values, func(i, j int) bool {
return sortedSeries.Values[i].Timestamp < sortedSeries.Values[j].Timestamp
})
sortedBucket.Series[k] = sortedSeries
}
sort.Slice(sortedBucket.Series, func(i, j int) bool {
return createLabelSignature(sortedBucket.Series[i].Labels) <
createLabelSignature(sortedBucket.Series[j].Labels)
})
sortedData[i].Aggregations[j] = sortedBucket
}
}
sort.Slice(sortedData, func(i, j int) bool {
return sortedData[i].QueryName < sortedData[j].QueryName
})
responseCopy.Data = sortedData
}
default:
responseCopy.Data = response.Data
}
return responseCopy
}

15
pkg/querybuilder/init.go Normal file
View File

@@ -0,0 +1,15 @@
package querybuilder
import (
"os"
"strings"
)
var QBV5Enabled = false
func init() {
v := os.Getenv("ENABLE_QB_V5")
if strings.ToLower(v) == "true" || strings.ToLower(v) == "1" {
QBV5Enabled = true
}
}

View File

@@ -33,6 +33,8 @@ func ToNanoSecs(epoch uint64) uint64 {
return temp * uint64(math.Pow(10, float64(19-count)))
}
// TODO(srikanthccv): should these be rounded to nearest multiple of 60 instead of 5 if step > 60?
// That would make graph look nice but "nice"ness should be less important than the usefulness
func RecommendedStepInterval(start, end uint64) uint64 {
start = ToNanoSecs(start)
end = ToNanoSecs(end)
@@ -134,29 +136,6 @@ func AdjustedMetricTimeRange(start, end, step uint64, mq qbtypes.QueryBuilderQue
return start, end
}
func GCD(a, b int64) int64 {
for b != 0 {
a, b = b, a%b
}
return a
}
func LCM(a, b int64) int64 {
return (a * b) / GCD(a, b)
}
// LCMList computes the LCM of a list of int64 numbers.
func LCMList(nums []int64) int64 {
if len(nums) == 0 {
return 1
}
result := nums[0]
for _, num := range nums[1:] {
result = LCM(result, num)
}
return result
}
func AssignReservedVars(vars map[string]any, start, end uint64) {
start = ToNanoSecs(start)
end = ToNanoSecs(end)

View File

@@ -1,6 +1,7 @@
package signoz
import (
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/modules/apdex"
"github.com/SigNoz/signoz/pkg/modules/apdex/implapdex"
"github.com/SigNoz/signoz/pkg/modules/dashboard"
@@ -30,14 +31,14 @@ type Handlers struct {
TraceFunnel tracefunnel.Handler
}
func NewHandlers(modules Modules) Handlers {
func NewHandlers(modules Modules, providerSettings factory.ProviderSettings) Handlers {
return Handlers{
Organization: implorganization.NewHandler(modules.OrgGetter, modules.OrgSetter),
Preference: implpreference.NewHandler(modules.Preference),
User: impluser.NewHandler(modules.User),
SavedView: implsavedview.NewHandler(modules.SavedView),
Apdex: implapdex.NewHandler(modules.Apdex),
Dashboard: impldashboard.NewHandler(modules.Dashboard),
Dashboard: impldashboard.NewHandler(modules.Dashboard, providerSettings),
QuickFilter: implquickfilter.NewHandler(modules.QuickFilter),
TraceFunnel: impltracefunnel.NewHandler(modules.TraceFunnel),
}

View File

@@ -35,7 +35,7 @@ func TestNewHandlers(t *testing.T) {
emailing := emailingtest.New()
modules := NewModules(sqlstore, jwt, emailing, providerSettings, orgGetter, alertmanager, nil)
handlers := NewHandlers(modules)
handlers := NewHandlers(modules, providerSettings)
reflectVal := reflect.ValueOf(handlers)
for i := 0; i < reflectVal.NumField(); i++ {

View File

@@ -77,7 +77,12 @@ func NewSQLSchemaProviderFactories(sqlstore sqlstore.SQLStore) factory.NamedMap[
)
}
func NewSQLMigrationProviderFactories(sqlstore sqlstore.SQLStore, sqlschema sqlschema.SQLSchema) factory.NamedMap[factory.ProviderFactory[sqlmigration.SQLMigration, sqlmigration.Config]] {
func NewSQLMigrationProviderFactories(
sqlstore sqlstore.SQLStore,
sqlschema sqlschema.SQLSchema,
telemetryStore telemetrystore.TelemetryStore,
providerSettings factory.ProviderSettings,
) factory.NamedMap[factory.ProviderFactory[sqlmigration.SQLMigration, sqlmigration.Config]] {
return factory.MustNewNamedMap(
sqlmigration.NewAddDataMigrationsFactory(),
sqlmigration.NewAddOrganizationFactory(),

View File

@@ -40,7 +40,12 @@ func TestNewProviderFactories(t *testing.T) {
})
assert.NotPanics(t, func() {
NewSQLMigrationProviderFactories(sqlstoretest.New(sqlstore.Config{Provider: "sqlite"}, sqlmock.QueryMatcherEqual), sqlschematest.New(map[string]*sqlschema.Table{}, map[string][]*sqlschema.UniqueConstraint{}, map[string]sqlschema.Index{}))
NewSQLMigrationProviderFactories(
sqlstoretest.New(sqlstore.Config{Provider: "sqlite"}, sqlmock.QueryMatcherEqual),
sqlschematest.New(map[string]*sqlschema.Table{}, map[string][]*sqlschema.UniqueConstraint{}, map[string]sqlschema.Index{}),
telemetrystoretest.New(telemetrystore.Config{Provider: "clickhouse"}, sqlmock.QueryMatcherEqual),
instrumentationtest.New().ToProviderSettings(),
)
})
assert.NotPanics(t, func() {

View File

@@ -201,7 +201,7 @@ func New(
ctx,
providerSettings,
config.SQLMigration,
NewSQLMigrationProviderFactories(sqlstore, sqlschema),
NewSQLMigrationProviderFactories(sqlstore, sqlschema, telemetrystore, providerSettings),
)
if err != nil {
return nil, err
@@ -268,7 +268,7 @@ func New(
modules := NewModules(sqlstore, jwt, emailing, providerSettings, orgGetter, alertmanager, analytics)
// Initialize all handlers for the modules
handlers := NewHandlers(modules)
handlers := NewHandlers(modules, providerSettings)
// Create a list of all stats collectors
statsCollectors := []statsreporter.StatsCollector{

View File

@@ -0,0 +1,299 @@
package sqlmigration
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"log/slog"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/transition"
"github.com/SigNoz/signoz/pkg/types"
"github.com/uptrace/bun"
"github.com/uptrace/bun/migrate"
)
type queryBuilderV5Migration struct {
store sqlstore.SQLStore
telemetryStore telemetrystore.TelemetryStore
logger *slog.Logger
}
func NewQueryBuilderV5MigrationFactory(
store sqlstore.SQLStore,
telemetryStore telemetrystore.TelemetryStore,
) factory.ProviderFactory[SQLMigration, Config] {
return factory.NewProviderFactory(
factory.MustNewName("query_builder_v5_migration"),
func(ctx context.Context, ps factory.ProviderSettings, c Config) (SQLMigration, error) {
return newQueryBuilderV5Migration(ctx, c, store, telemetryStore, ps.Logger)
})
}
func newQueryBuilderV5Migration(
_ context.Context,
_ Config, store sqlstore.SQLStore,
telemetryStore telemetrystore.TelemetryStore,
logger *slog.Logger,
) (SQLMigration, error) {
return &queryBuilderV5Migration{store: store, telemetryStore: telemetryStore, logger: logger}, nil
}
func (migration *queryBuilderV5Migration) Register(migrations *migrate.Migrations) error {
if err := migrations.Register(migration.Up, migration.Down); err != nil {
return err
}
return nil
}
func (migration *queryBuilderV5Migration) getTraceDuplicateKeys(ctx context.Context) ([]string, error) {
query := `
SELECT tagKey
FROM signoz_traces.distributed_span_attributes_keys
WHERE tagType IN ('tag', 'resource')
GROUP BY tagKey
HAVING COUNT(DISTINCT tagType) > 1
ORDER BY tagKey
`
rows, err := migration.telemetryStore.ClickhouseDB().Query(ctx, query)
if err != nil {
return nil, fmt.Errorf("failed to query trace duplicate keys: %w", err)
}
defer rows.Close()
var keys []string
for rows.Next() {
var key string
if err := rows.Scan(&key); err != nil {
return nil, fmt.Errorf("failed to scan trace duplicate key: %w", err)
}
keys = append(keys, key)
}
return keys, nil
}
func (migration *queryBuilderV5Migration) getLogDuplicateKeys(ctx context.Context) ([]string, error) {
query := `
SELECT name
FROM (
SELECT DISTINCT name FROM signoz_logs.distributed_logs_attribute_keys
INTERSECT
SELECT DISTINCT name FROM signoz_logs.distributed_logs_resource_keys
)
ORDER BY name
`
rows, err := migration.telemetryStore.ClickhouseDB().Query(ctx, query)
if err != nil {
return nil, fmt.Errorf("failed to query log duplicate keys: %w", err)
}
defer rows.Close()
var keys []string
for rows.Next() {
var key string
if err := rows.Scan(&key); err != nil {
return nil, fmt.Errorf("failed to scan log duplicate key: %w", err)
}
keys = append(keys, key)
}
return keys, nil
}
func (migration *queryBuilderV5Migration) Up(ctx context.Context, db *bun.DB) error {
// fetch keys that have both attribute and resource attribute types
logsKeys, err := migration.getLogDuplicateKeys(ctx)
if err != nil {
return err
}
tracesKeys, err := migration.getTraceDuplicateKeys(ctx)
if err != nil {
return err
}
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer func() {
_ = tx.Rollback()
}()
if err := migration.migrateDashboards(ctx, tx, logsKeys, tracesKeys); err != nil {
return err
}
if err := migration.migrateSavedViews(ctx, tx, logsKeys, tracesKeys); err != nil {
return err
}
if err := migration.migrateRules(ctx, tx, logsKeys, tracesKeys); err != nil {
return err
}
return tx.Commit()
}
func (migration *queryBuilderV5Migration) Down(ctx context.Context, db *bun.DB) error {
// this migration is not reversible as we're transforming the structure
return nil
}
func (migration *queryBuilderV5Migration) migrateDashboards(
ctx context.Context,
tx bun.Tx,
logsKeys []string,
tracesKeys []string,
) error {
var dashboards []struct {
ID string `bun:"id"`
Data map[string]any `bun:"data"`
}
err := tx.NewSelect().
Table("dashboard").
Column("id", "data").
Scan(ctx, &dashboards)
if err != nil {
if err == sql.ErrNoRows {
return nil
}
return err
}
dashboardMigrator := transition.NewDashboardMigrateV5(migration.logger, logsKeys, tracesKeys)
for _, dashboard := range dashboards {
updated := dashboardMigrator.Migrate(ctx, dashboard.Data)
if updated {
dashboard.Data["version"] = "v5"
dataJSON, err := json.Marshal(dashboard.Data)
if err != nil {
return err
}
_, err = tx.NewUpdate().
Table("dashboard").
Set("data = ?", string(dataJSON)).
Where("id = ?", dashboard.ID).
Exec(ctx)
if err != nil {
return err
}
}
}
return nil
}
func (migration *queryBuilderV5Migration) migrateSavedViews(
ctx context.Context,
tx bun.Tx,
logsKeys []string,
tracesKeys []string,
) error {
var savedViews []*types.SavedView
err := tx.NewSelect().
Model(&savedViews).
Scan(ctx)
if err != nil {
if err == sql.ErrNoRows {
return nil
}
return err
}
savedViewsMigrator := transition.NewSavedViewMigrateV5(migration.logger, logsKeys, tracesKeys)
for _, savedView := range savedViews {
var data map[string]any
if err := json.Unmarshal([]byte(savedView.Data), &data); err != nil {
continue // invalid JSON
}
updated := savedViewsMigrator.Migrate(ctx, data)
if updated {
dataJSON, err := json.Marshal(data)
if err != nil {
return err
}
_, err = tx.NewUpdate().
Model((*types.SavedView)(nil)).
Set("data = ?", string(dataJSON)).
Where("id = ?", savedView.ID).
Exec(ctx)
if err != nil {
return err
}
}
}
return nil
}
func (migration *queryBuilderV5Migration) migrateRules(
ctx context.Context,
tx bun.Tx,
logsKeys []string,
tracesKeys []string,
) error {
// Fetch all rules
var rules []struct {
ID string `bun:"id"`
Data map[string]any `bun:"data"`
}
err := tx.NewSelect().
Table("rule").
Column("id", "data").
Scan(ctx, &rules)
if err != nil {
if err == sql.ErrNoRows {
return nil
}
return err
}
alertsMigrator := transition.NewAlertMigrateV5(migration.logger, logsKeys, tracesKeys)
for _, rule := range rules {
migration.logger.InfoContext(ctx, "migrating rule", "rule.id", rule.ID)
updated := alertsMigrator.Migrate(ctx, rule.Data)
if updated {
rule.Data["version"] = "v5"
fmt.Println("updated rule", rule.ID)
dataJSON, err := json.Marshal(rule.Data)
if err != nil {
return err
}
_, err = tx.NewUpdate().
Table("rule").
Set("data = ?", string(dataJSON)).
Where("id = ?", rule.ID).
Exec(ctx)
if err != nil {
return err
}
}
}
return nil
}

View File

@@ -355,6 +355,17 @@ func (b *logQueryStatementBuilder) buildTimeSeriesQuery(
sb.Having(rewrittenExpr)
}
if len(query.Order) != 0 {
// Add order by
for _, orderBy := range query.Order {
_, ok := aggOrderBy(orderBy, query)
if !ok {
sb.OrderBy(fmt.Sprintf("`%s` %s", orderBy.Key.Name, orderBy.Direction.StringValue()))
}
}
sb.OrderBy("ts desc")
}
combinedArgs := append(allGroupByArgs, allAggChArgs...)
mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, combinedArgs...)
@@ -372,6 +383,16 @@ func (b *logQueryStatementBuilder) buildTimeSeriesQuery(
sb.Having(rewrittenExpr)
}
if len(query.Order) != 0 {
// Add order by
for _, orderBy := range query.Order {
_, ok := aggOrderBy(orderBy, query)
if !ok {
sb.OrderBy(fmt.Sprintf("`%s` %s", orderBy.Key.Name, orderBy.Direction.StringValue()))
}
}
sb.OrderBy("ts desc")
}
combinedArgs := append(allGroupByArgs, allAggChArgs...)
mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, combinedArgs...)

View File

@@ -107,7 +107,7 @@ func TestStatementBuilderTimeSeries(t *testing.T) {
},
},
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), __limit_cte AS (SELECT toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND true AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? GROUP BY `service.name` ORDER BY `service.name` desc LIMIT ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 30 SECOND) AS ts, toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND true AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND (`service.name`) GLOBAL IN (SELECT `service.name` FROM __limit_cte) GROUP BY ts, `service.name`",
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), __limit_cte AS (SELECT toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND true AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? GROUP BY `service.name` ORDER BY `service.name` desc LIMIT ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 30 SECOND) AS ts, toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND true AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND (`service.name`) GLOBAL IN (SELECT `service.name` FROM __limit_cte) GROUP BY ts, `service.name` ORDER BY `service.name` desc, ts desc",
Args: []any{"cartservice", "%service.name%", "%service.name\":\"cartservice%", uint64(1747945619), uint64(1747983448), true, "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10, true, "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448)},
},
expectedErr: nil,

View File

@@ -103,6 +103,8 @@ func (b *traceQueryStatementBuilder) Build(
return b.buildTimeSeriesQuery(ctx, q, query, start, end, keys, variables)
case qbtypes.RequestTypeScalar:
return b.buildScalarQuery(ctx, q, query, start, end, keys, variables, false, false)
case qbtypes.RequestTypeTrace:
return b.buildTraceExplorerQuery(ctx, q, query, start, end, keys, variables)
}
return nil, fmt.Errorf("unsupported request type: %s", requestType)
@@ -338,6 +340,114 @@ func (b *traceQueryStatementBuilder) buildListQuery(
}, nil
}
func (b *traceQueryStatementBuilder) buildTraceExplorerQuery(
ctx context.Context,
_ *sqlbuilder.SelectBuilder,
query qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation],
start, end uint64,
keys map[string][]*telemetrytypes.TelemetryFieldKey,
variables map[string]qbtypes.VariableItem,
) (*qbtypes.Statement, error) {
startBucket := start/querybuilder.NsToSeconds - querybuilder.BucketAdjustment
endBucket := end / querybuilder.NsToSeconds
distSB := sqlbuilder.NewSelectBuilder()
distSB.Select("trace_id")
distSB.From(fmt.Sprintf("%s.%s", DBName, SpanIndexV3TableName))
var (
cteFragments []string
cteArgs [][]any
)
if frag, args, err := b.maybeAttachResourceFilter(ctx, distSB, query, start, end, variables); err != nil {
return nil, err
} else if frag != "" {
cteFragments = append(cteFragments, frag)
cteArgs = append(cteArgs, args)
}
// Add filter conditions
warnings, err := b.addFilterCondition(ctx, distSB, start, end, query, keys, variables)
if err != nil {
return nil, err
}
distSQL, distArgs := distSB.BuildWithFlavor(sqlbuilder.ClickHouse)
cteFragments = append(cteFragments, fmt.Sprintf("__toe AS (%s)", distSQL))
cteArgs = append(cteArgs, distArgs)
// Build the inner subquery for root spans
innerSB := sqlbuilder.NewSelectBuilder()
innerSB.Select("trace_id", "duration_nano", sqlbuilder.Escape("resource_string_service$$name as `service.name`"), "name")
innerSB.From(fmt.Sprintf("%s.%s", DBName, SpanIndexV3TableName))
innerSB.Where("parent_span_id = ''")
// Add time filter to inner query
innerSB.Where(
innerSB.GE("timestamp", fmt.Sprintf("%d", start)),
innerSB.L("timestamp", fmt.Sprintf("%d", end)),
innerSB.GE("ts_bucket_start", startBucket),
innerSB.LE("ts_bucket_start", endBucket))
// order by duration and limit 1 per trace
innerSB.OrderBy("duration_nano DESC")
innerSB.SQL("LIMIT 1 BY trace_id")
innerSQL, innerArgs := innerSB.BuildWithFlavor(sqlbuilder.ClickHouse)
cteFragments = append(cteFragments, fmt.Sprintf("__toe_duration_sorted AS (%s)", innerSQL))
cteArgs = append(cteArgs, innerArgs)
// main query that joins everything
mainSB := sqlbuilder.NewSelectBuilder()
mainSB.Select(
"__toe_duration_sorted.`service.name` AS `service.name`",
"__toe_duration_sorted.name AS `name`",
"count() AS span_count",
"__toe_duration_sorted.duration_nano AS `duration_nano`",
"__toe_duration_sorted.trace_id AS `trace_id`",
)
// Join the distributed table with the inner subquery
mainSB.SQL("FROM __toe")
mainSB.SQL("INNER JOIN __toe_duration_sorted")
mainSB.SQL("ON __toe.trace_id = __toe_duration_sorted.trace_id")
// Group by trace-level fields
mainSB.GroupBy("trace_id", "duration_nano", "name", "`service.name`")
// order by duration only supported for now
mainSB.OrderBy("duration_nano DESC")
// Limit by trace_id to ensure one row per trace
mainSB.SQL("LIMIT 1 BY trace_id")
if query.Limit > 0 {
mainSB.Limit(query.Limit)
} else {
mainSB.Limit(100)
}
if query.Offset > 0 {
mainSB.Offset(query.Offset)
}
mainSQL, mainArgs := mainSB.BuildWithFlavor(sqlbuilder.ClickHouse)
// combine it all together: WITH … SELECT …
finalSQL := querybuilder.CombineCTEs(cteFragments) + mainSQL
finalArgs := querybuilder.PrependArgs(cteArgs, mainArgs)
return &qbtypes.Statement{
Query: finalSQL,
Args: finalArgs,
Warnings: warnings,
}, nil
}
func (b *traceQueryStatementBuilder) buildTimeSeriesQuery(
ctx context.Context,
sb *sqlbuilder.SelectBuilder,
@@ -427,6 +537,17 @@ func (b *traceQueryStatementBuilder) buildTimeSeriesQuery(
sb.Having(rewrittenExpr)
}
if len(query.Order) != 0 {
// Add order by
for _, orderBy := range query.Order {
_, ok := aggOrderBy(orderBy, query)
if !ok {
sb.OrderBy(fmt.Sprintf("`%s` %s", orderBy.Key.Name, orderBy.Direction.StringValue()))
}
}
sb.OrderBy("ts desc")
}
combinedArgs := append(allGroupByArgs, allAggChArgs...)
mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, combinedArgs...)
@@ -443,6 +564,17 @@ func (b *traceQueryStatementBuilder) buildTimeSeriesQuery(
sb.Having(rewrittenExpr)
}
if len(query.Order) != 0 {
// Add order by
for _, orderBy := range query.Order {
_, ok := aggOrderBy(orderBy, query)
if !ok {
sb.OrderBy(fmt.Sprintf("`%s` %s", orderBy.Key.Name, orderBy.Direction.StringValue()))
}
}
sb.OrderBy("ts desc")
}
combinedArgs := append(allGroupByArgs, allAggChArgs...)
mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, combinedArgs...)

View File

@@ -458,3 +458,65 @@ func TestStatementBuilderListQuery(t *testing.T) {
})
}
}
func TestStatementBuilderTraceQuery(t *testing.T) {
cases := []struct {
name string
requestType qbtypes.RequestType
query qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]
expected qbtypes.Statement
expectedErr error
}{
{
name: "List query with mat selected fields",
requestType: qbtypes.RequestTypeTrace,
query: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
Signal: telemetrytypes.SignalTraces,
Filter: &qbtypes.Filter{
Expression: "service.name = 'redis-manual'",
},
Limit: 10,
},
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), __toe AS (SELECT trace_id FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND true AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __toe_duration_sorted AS (SELECT trace_id, duration_nano, resource_string_service$$name as `service.name`, name FROM signoz_traces.distributed_signoz_index_v3 WHERE parent_span_id = '' AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? ORDER BY duration_nano DESC LIMIT 1 BY trace_id) SELECT __toe_duration_sorted.`service.name` AS `service.name`, __toe_duration_sorted.name AS `name`, count() AS span_count, __toe_duration_sorted.duration_nano AS `duration_nano`, __toe_duration_sorted.trace_id AS `trace_id` FROM __toe INNER JOIN __toe_duration_sorted ON __toe.trace_id = __toe_duration_sorted.trace_id GROUP BY trace_id, duration_nano, name, `service.name` ORDER BY duration_nano DESC LIMIT 1 BY trace_id LIMIT ?",
Args: []any{"redis-manual", "%service.name%", "%service.name\":\"redis-manual%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10},
},
expectedErr: nil,
},
}
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
aggExprRewriter := querybuilder.NewAggExprRewriter(nil, fm, cb, "", nil)
resourceFilterStmtBuilder := resourceFilterStmtBuilder()
statementBuilder := NewTraceQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
mockMetadataStore,
fm,
cb,
resourceFilterStmtBuilder,
aggExprRewriter,
nil,
)
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
q, err := statementBuilder.Build(context.Background(), 1747947419000, 1747983448000, c.requestType, c.query, nil)
if c.expectedErr != nil {
require.Error(t, err)
require.Contains(t, err.Error(), c.expectedErr.Error())
} else {
require.NoError(t, err)
require.Equal(t, c.expected.Query, q.Query)
require.Equal(t, c.expected.Args, q.Args)
require.Equal(t, c.expected.Warnings, q.Warnings)
}
})
}
}

View File

@@ -0,0 +1,123 @@
package transition
import (
"log/slog"
"golang.org/x/net/context"
)
type alertMigrateV5 struct {
migrateCommon
logger *slog.Logger
}
func NewAlertMigrateV5(logger *slog.Logger, logsDuplicateKeys []string, tracesDuplicateKeys []string) *alertMigrateV5 {
ambiguity := map[string][]string{
"logs": logsDuplicateKeys,
"traces": tracesDuplicateKeys,
}
return &alertMigrateV5{
migrateCommon: migrateCommon{
ambiguity: ambiguity,
logger: logger,
},
logger: logger,
}
}
func (m *alertMigrateV5) Migrate(ctx context.Context, ruleData map[string]any) bool {
updated := false
ruleCondition, ok := ruleData["condition"].(map[string]any)
if !ok {
m.logger.InfoContext(ctx, "didn't find condition")
return updated
}
compositeQuery, ok := ruleCondition["compositeQuery"].(map[string]any)
if !ok {
m.logger.InfoContext(ctx, "didn't find composite query")
return updated
}
if compositeQuery["queries"] == nil {
compositeQuery["queries"] = []any{}
}
m.logger.InfoContext(ctx, "setup empty list")
queryType := compositeQuery["queryType"]
// Migrate builder queries
if builderQueries, ok := compositeQuery["builderQueries"].(map[string]any); ok && len(builderQueries) > 0 && queryType == "builder" {
m.logger.InfoContext(ctx, "found builderQueries")
queryType, _ := compositeQuery["queryType"].(string)
if queryType == "builder" {
for name, query := range builderQueries {
if queryMap, ok := query.(map[string]any); ok {
m.logger.InfoContext(ctx, "mapping builder query")
var panelType string
if pt, ok := compositeQuery["panelType"].(string); ok {
panelType = pt
}
if m.updateQueryData(ctx, queryMap, "v4", panelType) {
updated = true
}
m.logger.InfoContext(ctx, "migrated querymap")
// wrap it in the v5 envelope
envelope := m.wrapInV5Envelope(name, queryMap, "builder_query")
m.logger.InfoContext(ctx, "envelope after wrap", "envelope", envelope)
compositeQuery["queries"] = append(compositeQuery["queries"].([]any), envelope)
}
}
// Clear old field after migration
delete(compositeQuery, "builderQueries")
}
}
// Migrate prom queries
if promQueries, ok := compositeQuery["promQueries"].(map[string]any); ok && len(promQueries) > 0 && queryType == "promql" {
for name, query := range promQueries {
if queryMap, ok := query.(map[string]any); ok {
envelope := map[string]any{
"type": "promql",
"spec": map[string]any{
"name": name,
"query": queryMap["query"],
"disabled": queryMap["disabled"],
"legend": queryMap["legend"],
},
}
compositeQuery["queries"] = append(compositeQuery["queries"].([]any), envelope)
updated = true
}
}
// Clear old field after migration
delete(compositeQuery, "promQueries")
}
// Migrate clickhouse queries
if chQueries, ok := compositeQuery["chQueries"].(map[string]any); ok && len(chQueries) > 0 && queryType == "clickhouse_sql" {
for name, query := range chQueries {
if queryMap, ok := query.(map[string]any); ok {
envelope := map[string]any{
"type": "clickhouse_sql",
"spec": map[string]any{
"name": name,
"query": queryMap["query"],
"disabled": queryMap["disabled"],
"legend": queryMap["legend"],
},
}
compositeQuery["queries"] = append(compositeQuery["queries"].([]any), envelope)
updated = true
}
}
// Clear old field after migration
delete(compositeQuery, "chQueries")
}
return updated
}

View File

@@ -0,0 +1,931 @@
package transition
import (
"context"
"fmt"
"log/slog"
"regexp"
"slices"
"strings"
"github.com/SigNoz/signoz/pkg/telemetrytraces"
)
type migrateCommon struct {
ambiguity map[string][]string
logger *slog.Logger
}
func (migration *migrateCommon) wrapInV5Envelope(name string, queryMap map[string]any, queryType string) map[string]any {
// Create a properly structured v5 query
v5Query := map[string]any{
"name": name,
"disabled": queryMap["disabled"],
"legend": queryMap["legend"],
}
if name != queryMap["expression"] {
// formula
queryType = "builder_formula"
v5Query["expression"] = queryMap["expression"]
if functions, ok := queryMap["functions"]; ok {
v5Query["functions"] = functions
}
return map[string]any{
"type": queryType,
"spec": v5Query,
}
}
// Add signal based on data source
if dataSource, ok := queryMap["dataSource"].(string); ok {
switch dataSource {
case "traces":
v5Query["signal"] = "traces"
case "logs":
v5Query["signal"] = "logs"
case "metrics":
v5Query["signal"] = "metrics"
}
}
if stepInterval, ok := queryMap["stepInterval"]; ok {
v5Query["stepInterval"] = stepInterval
}
if aggregations, ok := queryMap["aggregations"]; ok {
v5Query["aggregations"] = aggregations
}
if filter, ok := queryMap["filter"]; ok {
v5Query["filter"] = filter
}
// Copy groupBy with proper structure
if groupBy, ok := queryMap["groupBy"].([]any); ok {
v5GroupBy := make([]any, len(groupBy))
for i, gb := range groupBy {
if gbMap, ok := gb.(map[string]any); ok {
v5GroupBy[i] = map[string]any{
"name": gbMap["key"],
"fieldDataType": gbMap["dataType"],
"fieldContext": gbMap["type"],
}
}
}
v5Query["groupBy"] = v5GroupBy
}
// Copy orderBy with proper structure
if orderBy, ok := queryMap["orderBy"].([]any); ok {
v5OrderBy := make([]any, len(orderBy))
for i, ob := range orderBy {
if obMap, ok := ob.(map[string]any); ok {
v5OrderBy[i] = map[string]any{
"key": map[string]any{
"name": obMap["columnName"],
"fieldDataType": obMap["dataType"],
"fieldContext": obMap["type"],
},
"direction": obMap["order"],
}
}
}
v5Query["order"] = v5OrderBy
}
// Copy selectColumns as selectFields
if selectColumns, ok := queryMap["selectColumns"].([]any); ok {
v5SelectFields := make([]any, len(selectColumns))
for i, col := range selectColumns {
if colMap, ok := col.(map[string]any); ok {
v5SelectFields[i] = map[string]any{
"name": colMap["key"],
"fieldDataType": colMap["dataType"],
"fieldContext": colMap["type"],
}
}
}
v5Query["selectFields"] = v5SelectFields
}
// Copy limit and offset
if limit, ok := queryMap["limit"]; ok {
v5Query["limit"] = limit
}
if offset, ok := queryMap["offset"]; ok {
v5Query["offset"] = offset
}
if having, ok := queryMap["having"]; ok {
v5Query["having"] = having
}
if functions, ok := queryMap["functions"]; ok {
v5Query["functions"] = functions
}
return map[string]any{
"type": queryType,
"spec": v5Query,
}
}
func (mc *migrateCommon) updateQueryData(ctx context.Context, queryData map[string]any, version, widgetType string) bool {
updated := false
aggregateOp, _ := queryData["aggregateOperator"].(string)
hasAggregation := aggregateOp != "" && aggregateOp != "noop"
if mc.createAggregations(ctx, queryData, version, widgetType) {
updated = true
}
if mc.createFilterExpression(ctx, queryData) {
updated = true
}
if mc.fixGroupBy(queryData) {
updated = true
}
if mc.createHavingExpression(ctx, queryData) {
updated = true
}
if hasAggregation {
if orderBy, ok := queryData["orderBy"].([]any); ok {
newOrderBy := make([]any, 0)
for _, order := range orderBy {
if orderMap, ok := order.(map[string]any); ok {
columnName, _ := orderMap["columnName"].(string)
// skip timestamp, id (logs, traces), samples(metrics) ordering for aggregation queries
if columnName != "timestamp" && columnName != "samples" && columnName != "id" {
if columnName == "#SIGNOZ_VALUE" {
if expr, has := mc.orderByExpr(queryData); has {
orderMap["columnName"] = expr
}
} else {
// if the order by key is not part of the group by keys, remove it
present := false
groupBy, ok := queryData["groupBy"].([]any)
if !ok {
return false
}
for idx := range groupBy {
item, ok := groupBy[idx].(map[string]any)
if !ok {
continue
}
key, ok := item["key"].(string)
if !ok {
continue
}
if key == columnName {
present = true
}
}
if !present {
mc.logger.InfoContext(ctx, "found a order by without group by, skipping", "order.col_name", columnName)
continue
}
}
newOrderBy = append(newOrderBy, orderMap)
}
}
}
queryData["orderBy"] = newOrderBy
updated = true
}
} else {
dataSource, _ := queryData["dataSource"].(string)
if orderBy, ok := queryData["orderBy"].([]any); ok {
newOrderBy := make([]any, 0)
for _, order := range orderBy {
if orderMap, ok := order.(map[string]any); ok {
columnName, _ := orderMap["columnName"].(string)
// skip id for (traces)
if columnName == "id" && dataSource == "traces" {
mc.logger.InfoContext(ctx, "skipping `id` order by for traces")
continue
}
newOrderBy = append(newOrderBy, orderMap)
}
}
queryData["orderBy"] = newOrderBy
updated = true
}
}
if functions, ok := queryData["functions"].([]any); ok {
v5Functions := make([]any, len(functions))
for i, fn := range functions {
if fnMap, ok := fn.(map[string]any); ok {
v5Function := map[string]any{
"name": fnMap["name"],
}
// Convert args from v4 format to v5 FunctionArg format
if args, ok := fnMap["args"].([]any); ok {
v5Args := make([]any, len(args))
for j, arg := range args {
// In v4, args were just values. In v5, they are FunctionArg objects
v5Args[j] = map[string]any{
"name": "", // v4 didn't have named args
"value": arg,
}
}
v5Function["args"] = v5Args
}
// Handle namedArgs if present (some functions might have used this)
if namedArgs, ok := fnMap["namedArgs"].(map[string]any); ok {
// Convert named args to the new format
existingArgs, _ := v5Function["args"].([]any)
if existingArgs == nil {
existingArgs = []any{}
}
for name, value := range namedArgs {
existingArgs = append(existingArgs, map[string]any{
"name": name,
"value": value,
})
}
v5Function["args"] = existingArgs
}
v5Functions[i] = v5Function
}
}
queryData["functions"] = v5Functions
updated = true
}
delete(queryData, "aggregateOperator")
delete(queryData, "aggregateAttribute")
delete(queryData, "temporality")
delete(queryData, "timeAggregation")
delete(queryData, "spaceAggregation")
delete(queryData, "reduceTo")
delete(queryData, "filters")
delete(queryData, "ShiftBy")
delete(queryData, "IsAnomaly")
delete(queryData, "QueriesUsedInFormula")
delete(queryData, "seriesAggregation")
return updated
}
func (mc *migrateCommon) orderByExpr(queryData map[string]any) (string, bool) {
aggregateOp, hasOp := queryData["aggregateOperator"].(string)
aggregateAttr, hasAttr := queryData["aggregateAttribute"].(map[string]any)
dataSource, _ := queryData["dataSource"].(string)
if aggregateOp == "noop" {
return "", false
}
if !hasOp || !hasAttr {
return "", false
}
var expr string
var has bool
switch dataSource {
case "metrics":
aggs, ok := queryData["aggregations"].([]any)
if !ok {
return "", false
}
if len(aggs) == 0 {
return "", false
}
agg, ok := aggs[0].(map[string]any)
if !ok {
return "", false
}
spaceAgg, ok := agg["spaceAggregation"].(string)
if !ok {
return "", false
}
expr = fmt.Sprintf("%s(%s)", spaceAgg, aggregateAttr["key"])
has = true
case "logs":
expr = mc.buildAggregationExpression(aggregateOp, aggregateAttr)
has = true
case "traces":
expr = mc.buildAggregationExpression(aggregateOp, aggregateAttr)
has = true
default:
has = false
}
return expr, has
}
func (mc *migrateCommon) createAggregations(ctx context.Context, queryData map[string]any, version, widgetType string) bool {
aggregateOp, hasOp := queryData["aggregateOperator"].(string)
aggregateAttr, hasAttr := queryData["aggregateAttribute"].(map[string]any)
dataSource, _ := queryData["dataSource"].(string)
if aggregateOp == "noop" {
return false
}
if !hasOp || !hasAttr {
return false
}
var aggregation map[string]any
switch dataSource {
case "metrics":
if version == "v4" {
if _, ok := queryData["spaceAggregation"]; !ok {
queryData["spaceAggregation"] = aggregateOp
}
aggregation = map[string]any{
"metricName": aggregateAttr["key"],
"temporality": queryData["temporality"],
"timeAggregation": aggregateOp,
"spaceAggregation": queryData["spaceAggregation"],
}
if reduceTo, ok := queryData["reduceTo"].(string); ok {
aggregation["reduceTo"] = reduceTo
}
} else {
var timeAgg, spaceAgg, reduceTo string
switch aggregateOp {
case "sum_rate", "rate_sum":
timeAgg = "rate"
spaceAgg = "sum"
reduceTo = "sum"
case "avg_rate", "rate_avg":
timeAgg = "rate"
spaceAgg = "avg"
reduceTo = "avg"
case "min_rate", "rate_min":
timeAgg = "rate"
spaceAgg = "min"
reduceTo = "min"
case "max_rate", "rate_max":
timeAgg = "rate"
spaceAgg = "max"
reduceTo = "max"
case "hist_quantile_50":
timeAgg = ""
spaceAgg = "p50"
reduceTo = "avg"
case "hist_quantile_75":
timeAgg = ""
spaceAgg = "p75"
reduceTo = "avg"
case "hist_quantile_90":
timeAgg = ""
spaceAgg = "p90"
reduceTo = "avg"
case "hist_quantile_95":
timeAgg = ""
spaceAgg = "p95"
reduceTo = "avg"
case "hist_quantile_99":
timeAgg = ""
spaceAgg = "p99"
reduceTo = "avg"
case "rate":
timeAgg = "rate"
spaceAgg = "sum"
reduceTo = "sum"
case "p99", "p90", "p75", "p50", "p25", "p20", "p10", "p05":
mc.logger.InfoContext(ctx, "found invalid config")
timeAgg = "avg"
spaceAgg = "avg"
reduceTo = "avg"
case "min":
timeAgg = "min"
spaceAgg = "min"
reduceTo = "min"
case "max":
timeAgg = "max"
spaceAgg = "max"
reduceTo = "max"
case "avg":
timeAgg = "avg"
spaceAgg = "avg"
reduceTo = "avg"
case "sum":
timeAgg = "sum"
spaceAgg = "sum"
reduceTo = "sum"
case "count":
timeAgg = "count"
spaceAgg = "sum"
reduceTo = "sum"
case "count_distinct":
timeAgg = "count_distinct"
spaceAgg = "sum"
reduceTo = "sum"
case "noop":
mc.logger.InfoContext(ctx, "noop found in the data")
timeAgg = "max"
spaceAgg = "max"
reduceTo = "max"
}
aggregation = map[string]any{
"metricName": aggregateAttr["key"],
"temporality": queryData["temporality"],
"timeAggregation": timeAgg,
"spaceAggregation": spaceAgg,
}
if widgetType == "table" {
aggregation["reduceTo"] = reduceTo
} else {
if reduceTo, ok := queryData["reduceTo"].(string); ok {
aggregation["reduceTo"] = reduceTo
}
}
}
case "logs":
expression := mc.buildAggregationExpression(aggregateOp, aggregateAttr)
aggregation = map[string]any{
"expression": expression,
}
case "traces":
expression := mc.buildAggregationExpression(aggregateOp, aggregateAttr)
aggregation = map[string]any{
"expression": expression,
}
default:
return false
}
queryData["aggregations"] = []any{aggregation}
return true
}
func (mc *migrateCommon) createFilterExpression(ctx context.Context, queryData map[string]any) bool {
filters, ok := queryData["filters"].(map[string]any)
if !ok {
return false
}
items, ok := filters["items"].([]any)
if !ok || len(items) == 0 {
return false
}
op, ok := filters["op"].(string)
if !ok {
op = "AND"
}
dataSource, _ := queryData["dataSource"].(string)
expression := mc.buildExpression(ctx, items, op, dataSource)
if expression != "" {
if groupByExists := mc.groupByExistsExpr(queryData); groupByExists != "" {
mc.logger.InfoContext(ctx, "adding default exists for old qb", "group_by_exists", groupByExists)
expression += groupByExists
}
queryData["filter"] = map[string]any{
"expression": expression,
}
delete(queryData, "filters")
return true
}
return false
}
func (mc *migrateCommon) groupByExistsExpr(queryData map[string]any) string {
expr := []string{}
groupBy, ok := queryData["groupBy"].([]any)
if !ok {
return strings.Join(expr, " AND ")
}
for idx := range groupBy {
item, ok := groupBy[idx].(map[string]any)
if !ok {
continue
}
key, ok := item["key"].(string)
if !ok {
continue
}
expr = append(expr, fmt.Sprintf("%s EXISTS", key))
if _, ok := telemetrytraces.IntrinsicFields[key]; ok {
delete(item, "type")
}
if _, ok := telemetrytraces.CalculatedFields[key]; ok {
delete(item, "type")
}
if _, ok := telemetrytraces.IntrinsicFieldsDeprecated[key]; ok {
delete(item, "type")
}
if _, ok := telemetrytraces.CalculatedFieldsDeprecated[key]; ok {
delete(item, "type")
}
}
return strings.Join(expr, " AND ")
}
func (mc *migrateCommon) fixGroupBy(queryData map[string]any) bool {
groupBy, ok := queryData["groupBy"].([]any)
if !ok {
return false
}
for idx := range groupBy {
item, ok := groupBy[idx].(map[string]any)
if !ok {
continue
}
key, ok := item["key"].(string)
if !ok {
continue
}
if _, ok := telemetrytraces.IntrinsicFields[key]; ok {
delete(item, "type")
}
if _, ok := telemetrytraces.CalculatedFields[key]; ok {
delete(item, "type")
}
if _, ok := telemetrytraces.IntrinsicFieldsDeprecated[key]; ok {
delete(item, "type")
}
if _, ok := telemetrytraces.CalculatedFieldsDeprecated[key]; ok {
delete(item, "type")
}
}
return false
}
func (mc *migrateCommon) createHavingExpression(ctx context.Context, queryData map[string]any) bool {
having, ok := queryData["having"].([]any)
if !ok || len(having) == 0 {
queryData["having"] = map[string]any{
"expression": "",
}
return true
}
dataSource, _ := queryData["dataSource"].(string)
for idx := range having {
if havingItem, ok := having[idx].(map[string]any); ok {
havingCol, has := mc.orderByExpr(queryData)
if has {
havingItem["columnName"] = havingCol
havingItem["key"] = map[string]any{"key": havingCol}
}
having[idx] = havingItem
}
}
mc.logger.InfoContext(ctx, "having before expression", "having", having)
expression := mc.buildExpression(ctx, having, "AND", dataSource)
mc.logger.InfoContext(ctx, "having expression after building", "expression", expression, "having", having)
queryData["having"] = map[string]any{
"expression": expression,
}
return true
}
func (mc *migrateCommon) buildExpression(ctx context.Context, items []any, op, dataSource string) string {
if len(items) == 0 {
return ""
}
var conditions []string
for _, item := range items {
itemMap, ok := item.(map[string]any)
if !ok {
continue
}
key, keyOk := itemMap["key"].(map[string]any)
operator, opOk := itemMap["op"].(string)
value, valueOk := itemMap["value"]
if !keyOk || !opOk || !valueOk {
mc.logger.InfoContext(ctx, "didn't find either key, op, or value; continuing")
continue
}
keyStr, ok := key["key"].(string)
if !ok {
continue
}
if slices.Contains(mc.ambiguity[dataSource], keyStr) {
mc.logger.InfoContext(ctx, "ambiguity found for a key", "ambiguity.key", keyStr)
typeStr, ok := key["type"].(string)
if ok {
if typeStr == "tag" {
typeStr = "attribute"
} else {
typeStr = "resource"
}
keyStr = typeStr + "." + keyStr
}
}
condition := mc.buildCondition(ctx, keyStr, operator, value, key)
if condition != "" {
conditions = append(conditions, condition)
}
}
if len(conditions) == 0 {
return ""
}
if len(conditions) == 1 {
return conditions[0]
}
return "(" + strings.Join(conditions, " "+op+" ") + ")"
}
func (mc *migrateCommon) buildCondition(ctx context.Context, key, operator string, value any, keyMetadata map[string]any) string {
dataType, _ := keyMetadata["dataType"].(string)
formattedValue := mc.formatValue(ctx, value, dataType)
switch operator {
case "=":
return fmt.Sprintf("%s = %s", key, formattedValue)
case "!=":
return fmt.Sprintf("%s != %s", key, formattedValue)
case ">":
return fmt.Sprintf("%s > %s", key, formattedValue)
case ">=":
return fmt.Sprintf("%s >= %s", key, formattedValue)
case "<":
return fmt.Sprintf("%s < %s", key, formattedValue)
case "<=":
return fmt.Sprintf("%s <= %s", key, formattedValue)
case "in", "IN":
return fmt.Sprintf("%s IN %s", key, formattedValue)
case "nin", "NOT IN":
return fmt.Sprintf("%s NOT IN %s", key, formattedValue)
case "like", "LIKE":
return fmt.Sprintf("%s LIKE %s", key, formattedValue)
case "nlike", "NOT LIKE":
return fmt.Sprintf("%s NOT LIKE %s", key, formattedValue)
case "contains":
return fmt.Sprintf("%s CONTAINS %s", key, formattedValue)
case "ncontains":
return fmt.Sprintf("%s NOT CONTAINS %s", key, formattedValue)
case "regex":
return fmt.Sprintf("%s REGEXP %s", key, formattedValue)
case "nregex":
return fmt.Sprintf("%s NOT REGEXP %s", key, formattedValue)
case "exists":
return fmt.Sprintf("%s EXISTS", key)
case "nexists":
return fmt.Sprintf("%s NOT EXISTS", key)
case "has":
return fmt.Sprintf("has(%s, %s)", key, formattedValue)
case "nhas":
return fmt.Sprintf("NOT has(%s, %s)", key, formattedValue)
default:
return fmt.Sprintf("%s %s %s", key, operator, formattedValue)
}
}
func (mc *migrateCommon) buildAggregationExpression(operator string, attribute map[string]any) string {
key, _ := attribute["key"].(string)
switch operator {
case "count":
return "count()"
case "sum":
if key != "" {
return fmt.Sprintf("sum(%s)", key)
}
return "sum()"
case "avg":
if key != "" {
return fmt.Sprintf("avg(%s)", key)
}
return "avg()"
case "min":
if key != "" {
return fmt.Sprintf("min(%s)", key)
}
return "min()"
case "max":
if key != "" {
return fmt.Sprintf("max(%s)", key)
}
return "max()"
case "p05":
if key != "" {
return fmt.Sprintf("p05(%s)", key)
}
return "p05()"
case "p10":
if key != "" {
return fmt.Sprintf("p10(%s)", key)
}
return "p10()"
case "p20":
if key != "" {
return fmt.Sprintf("p20(%s)", key)
}
return "p20()"
case "p25":
if key != "" {
return fmt.Sprintf("p25(%s)", key)
}
return "p25()"
case "p50":
if key != "" {
return fmt.Sprintf("p50(%s)", key)
}
return "p50()"
case "p90":
if key != "" {
return fmt.Sprintf("p90(%s)", key)
}
return "p90()"
case "p95":
if key != "" {
return fmt.Sprintf("p95(%s)", key)
}
return "p95()"
case "p99":
if key != "" {
return fmt.Sprintf("p99(%s)", key)
}
return "p99()"
case "rate":
if key != "" {
return fmt.Sprintf("rate(%s)", key)
}
return "rate()"
case "rate_sum":
if key != "" {
return fmt.Sprintf("rate_sum(%s)", key)
}
return "rate_sum()"
case "rate_avg":
if key != "" {
return fmt.Sprintf("rate_avg(%s)", key)
}
return "rate_avg()"
case "rate_min":
if key != "" {
return fmt.Sprintf("rate_min(%s)", key)
}
return "rate_min()"
case "rate_max":
if key != "" {
return fmt.Sprintf("rate_max(%s)", key)
}
return "rate_max()"
case "sum_rate":
if key != "" {
return fmt.Sprintf("sum(rate(%s))", key)
}
return "sum(rate())"
case "count_distinct":
if key != "" {
return fmt.Sprintf("count_distinct(%s)", key)
}
return "count_distinct()"
default:
// For unknown operators, try to use them as-is
if key != "" {
return fmt.Sprintf("%s(%s)", operator, key)
}
return fmt.Sprintf("%s()", operator)
}
}
func (mc *migrateCommon) formatValue(ctx context.Context, value any, dataType string) string {
switch v := value.(type) {
case string:
if mc.isVariable(v) {
mc.logger.InfoContext(ctx, "found a variable", "dashboard.variable", v)
return mc.normalizeVariable(ctx, v)
}
if mc.isNumericType(dataType) {
if _, err := fmt.Sscanf(v, "%f", new(float64)); err == nil {
return v // Return the numeric string without quotes
}
}
// Otherwise, it's a string literal - escape single quotes and wrap in quotes
escaped := strings.ReplaceAll(v, "'", "\\'")
return fmt.Sprintf("'%s'", escaped)
case float64:
return fmt.Sprintf("%v", v)
case int:
return fmt.Sprintf("%d", v)
case bool:
return fmt.Sprintf("%t", v)
case []any:
if len(v) == 1 {
return mc.formatValue(ctx, v[0], dataType)
}
var values []string
for _, item := range v {
values = append(values, mc.formatValue(ctx, item, dataType))
}
return "[" + strings.Join(values, ", ") + "]"
default:
return fmt.Sprintf("%v", v)
}
}
func (mc *migrateCommon) isNumericType(dataType string) bool {
switch dataType {
case "int", "int8", "int16", "int32", "int64",
"uint", "uint8", "uint16", "uint32", "uint64",
"float", "float32", "float64",
"number", "numeric", "integer":
return true
default:
return false
}
}
func (mc *migrateCommon) isVariable(s string) bool {
s = strings.TrimSpace(s)
patterns := []string{
`^\{\{.*\}\}$`, // {{var}} or {{.var}}
`^\$.*$`, // $var or $service.name
`^\[\[.*\]\]$`, // [[var]] or [[.var]]
`^\$\{\{.*\}\}$`, // ${{env}} or ${{.var}}
}
for _, pattern := range patterns {
matched, _ := regexp.MatchString(pattern, s)
if matched {
return true
}
}
return false
}
func (mc *migrateCommon) normalizeVariable(ctx context.Context, s string) string {
s = strings.TrimSpace(s)
var varName string
// {{var}} or {{.var}}
if strings.HasPrefix(s, "{{") && strings.HasSuffix(s, "}}") {
varName = strings.TrimPrefix(strings.TrimSuffix(s, "}}"), "{{")
varName = strings.TrimPrefix(varName, ".")
// this is probably going to be problem if user has $ as start of key
varName = strings.TrimPrefix(varName, "$")
} else if strings.HasPrefix(s, "[[") && strings.HasSuffix(s, "]]") {
// [[var]] or [[.var]]
varName = strings.TrimPrefix(strings.TrimSuffix(s, "]]"), "[[")
varName = strings.TrimPrefix(varName, ".")
} else if strings.HasPrefix(s, "${{") && strings.HasSuffix(s, "}}") {
varName = strings.TrimPrefix(strings.TrimSuffix(s, "}}"), "${{")
varName = strings.TrimPrefix(varName, ".")
varName = strings.TrimPrefix(varName, "$")
} else if strings.HasPrefix(s, "$") {
// $var
return s
} else {
return s
}
if strings.Contains(varName, " ") {
mc.logger.InfoContext(ctx, "found white space in var name, replacing it", "dashboard.var_name", varName)
varName = strings.ReplaceAll(varName, " ", "")
}
return "$" + varName
}

View File

@@ -0,0 +1,95 @@
package transition
import (
"context"
"log/slog"
"strings"
)
type dashboardMigrateV5 struct {
migrateCommon
logger *slog.Logger
}
func NewDashboardMigrateV5(logger *slog.Logger, logsDuplicateKeys []string, tracesDuplicateKeys []string) *dashboardMigrateV5 {
ambiguity := map[string][]string{
"logs": logsDuplicateKeys,
"traces": tracesDuplicateKeys,
}
return &dashboardMigrateV5{
migrateCommon: migrateCommon{
ambiguity: ambiguity,
logger: logger,
},
logger: logger,
}
}
func (m *dashboardMigrateV5) Migrate(ctx context.Context, dashboardData map[string]any) bool {
updated := false
var version string
if _, ok := dashboardData["version"].(string); ok {
version = dashboardData["version"].(string)
}
// if there is a white space in variable, replace it
if variables, ok := dashboardData["variables"].(map[string]any); ok {
for _, variable := range variables {
if varMap, ok := variable.(map[string]any); ok {
name, ok := varMap["name"].(string)
if ok {
if strings.Contains(name, " ") {
m.logger.InfoContext(ctx, "found a variable with space in map, replacing it", "name", name)
name = strings.ReplaceAll(name, " ", "")
updated = true
varMap["name"] = name
}
}
}
}
}
if widgets, ok := dashboardData["widgets"].([]any); ok {
for _, widget := range widgets {
if widgetMap, ok := widget.(map[string]any); ok {
if m.updateWidget(ctx, widgetMap, version) {
updated = true
}
}
}
}
return updated
}
func (migration *dashboardMigrateV5) updateWidget(ctx context.Context, widget map[string]any, version string) bool {
query, ok := widget["query"].(map[string]any)
if !ok {
return false
}
builder, ok := query["builder"].(map[string]any)
if !ok {
return false
}
queryData, ok := builder["queryData"].([]any)
if !ok {
return false
}
widgetType := widget["panelTypes"].(string)
updated := false
for _, qd := range queryData {
if queryDataMap, ok := qd.(map[string]any); ok {
if migration.updateQueryData(ctx, queryDataMap, version, widgetType) {
updated = true
}
}
}
return updated
}

View File

@@ -0,0 +1,44 @@
package transition
import (
"log/slog"
"golang.org/x/net/context"
)
type savedViewMigrateV5 struct {
migrateCommon
logger *slog.Logger
}
func NewSavedViewMigrateV5(logger *slog.Logger, logsDuplicateKeys []string, tracesDuplicateKeys []string) *savedViewMigrateV5 {
return &savedViewMigrateV5{
logger: logger,
}
}
func (m *savedViewMigrateV5) Migrate(ctx context.Context, data map[string]any) bool {
updated := false
if builderQueries, ok := data["builderQueries"].(map[string]any); ok {
for name, query := range builderQueries {
if queryMap, ok := query.(map[string]any); ok {
var panelType string
if _, ok := data["panelType"].(string); ok {
panelType = data["panelType"].(string)
}
if m.updateQueryData(ctx, queryMap, "v4", panelType) {
updated = true
}
m.logger.InfoContext(ctx, "migrated querymap")
// wrap it in the v5 envelope
envelope := m.wrapInV5Envelope(name, queryMap, "builder_query")
m.logger.InfoContext(ctx, "envelope after wrap", "envelope", envelope)
data["queries"] = append(data["queries"].([]any), envelope)
}
}
}
return updated
}

102
pkg/transition/v5_to_v4.go Normal file
View File

@@ -0,0 +1,102 @@
package transition
import (
"fmt"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
)
// ConvertV5TimeSeriesDataToV4Result converts v5 TimeSeriesData to v4 Result
func ConvertV5TimeSeriesDataToV4Result(v5Data *qbtypes.TimeSeriesData) *v3.Result {
if v5Data == nil {
return nil
}
result := &v3.Result{
QueryName: v5Data.QueryName,
Series: make([]*v3.Series, 0),
}
toV4Series := func(ts *qbtypes.TimeSeries) *v3.Series {
series := &v3.Series{
Labels: make(map[string]string),
LabelsArray: make([]map[string]string, 0),
Points: make([]v3.Point, 0, len(ts.Values)),
}
for _, label := range ts.Labels {
valueStr := fmt.Sprintf("%v", label.Value)
series.Labels[label.Key.Name] = valueStr
}
if len(series.Labels) > 0 {
series.LabelsArray = append(series.LabelsArray, series.Labels)
}
for _, tsValue := range ts.Values {
if tsValue.Partial {
continue
}
point := v3.Point{
Timestamp: tsValue.Timestamp,
Value: tsValue.Value,
}
series.Points = append(series.Points, point)
}
return series
}
for _, aggBucket := range v5Data.Aggregations {
for _, ts := range aggBucket.Series {
result.Series = append(result.Series, toV4Series(ts))
}
if len(aggBucket.AnomalyScores) != 0 {
result.AnomalyScores = make([]*v3.Series, 0)
for _, ts := range aggBucket.AnomalyScores {
result.AnomalyScores = append(result.AnomalyScores, toV4Series(ts))
}
}
if len(aggBucket.PredictedSeries) != 0 {
result.PredictedSeries = make([]*v3.Series, 0)
for _, ts := range aggBucket.PredictedSeries {
result.PredictedSeries = append(result.PredictedSeries, toV4Series(ts))
}
}
if len(aggBucket.LowerBoundSeries) != 0 {
result.LowerBoundSeries = make([]*v3.Series, 0)
for _, ts := range aggBucket.LowerBoundSeries {
result.LowerBoundSeries = append(result.LowerBoundSeries, toV4Series(ts))
}
}
if len(aggBucket.UpperBoundSeries) != 0 {
result.UpperBoundSeries = make([]*v3.Series, 0)
for _, ts := range aggBucket.UpperBoundSeries {
result.UpperBoundSeries = append(result.UpperBoundSeries, toV4Series(ts))
}
}
}
return result
}
// ConvertV5TimeSeriesDataSliceToV4Results converts a slice of v5 TimeSeriesData to v4 QueryRangeResponse
func ConvertV5TimeSeriesDataSliceToV4Results(v5DataSlice []*qbtypes.TimeSeriesData) *v3.QueryRangeResponse {
response := &v3.QueryRangeResponse{
ResultType: "matrix", // Time series data is typically "matrix" type
Result: make([]*v3.Result, 0, len(v5DataSlice)),
}
for _, v5Data := range v5DataSlice {
if result := ConvertV5TimeSeriesDataToV4Result(v5Data); result != nil {
response.Result = append(response.Result, result)
}
}
return response
}

View File

@@ -56,6 +56,8 @@ type QueryBuilderQuery[T any] struct {
// functions to apply to the query
Functions []Function `json:"functions,omitempty"`
Legend string `json:"legend,omitempty"`
// ShiftBy is extracted from timeShift function for internal use
// This field is not serialized to JSON
ShiftBy int64 `json:"-"`

View File

@@ -7,4 +7,6 @@ type ClickHouseQuery struct {
Query string `json:"query"`
// disabled if true, the query will not be executed
Disabled bool `json:"disabled"`
Legend string `json:"legend,omitempty"`
}

View File

@@ -33,6 +33,8 @@ type QueryBuilderFormula struct {
// functions to apply to the formula result
Functions []Function `json:"functions,omitempty"`
Legend string `json:"legend,omitempty"`
}
// UnmarshalJSON implements custom JSON unmarshaling to disallow unknown fields

View File

@@ -0,0 +1,24 @@
package querybuildertypesv5
func GCD(a, b int64) int64 {
for b != 0 {
a, b = b, a%b
}
return a
}
func LCM(a, b int64) int64 {
return (a * b) / GCD(a, b)
}
// LCMList computes the LCM of a list of int64 numbers.
func LCMList(nums []int64) int64 {
if len(nums) == 0 {
return 1
}
result := nums[0]
for _, num := range nums[1:] {
result = LCM(result, num)
}
return result
}

View File

@@ -11,4 +11,6 @@ type PromQuery struct {
Step Step `json:"step"`
// stats if true, the query will return stats
Stats bool `json:"stats"`
Legend string `json:"legend,omitempty"`
}

View File

@@ -4,6 +4,7 @@ import (
"encoding/json"
"strings"
"github.com/SigNoz/govaluate"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
@@ -211,6 +212,113 @@ type QueryRangeRequest struct {
FormatOptions *FormatOptions `json:"formatOptions,omitempty"`
}
func (r *QueryRangeRequest) StepIntervalForQuery(name string) int64 {
stepsMap := make(map[string]int64)
for _, query := range r.CompositeQuery.Queries {
switch spec := query.Spec.(type) {
case QueryBuilderQuery[TraceAggregation]:
stepsMap[spec.Name] = int64(spec.StepInterval.Seconds())
case QueryBuilderQuery[LogAggregation]:
stepsMap[spec.Name] = int64(spec.StepInterval.Seconds())
case QueryBuilderQuery[MetricAggregation]:
stepsMap[spec.Name] = int64(spec.StepInterval.Seconds())
case PromQuery:
stepsMap[spec.Name] = int64(spec.Step.Seconds())
}
}
if step, ok := stepsMap[name]; ok {
return step
}
exprStr := ""
for _, query := range r.CompositeQuery.Queries {
switch spec := query.Spec.(type) {
case QueryBuilderFormula:
if spec.Name == name {
exprStr = spec.Expression
}
}
}
expression, _ := govaluate.NewEvaluableExpressionWithFunctions(exprStr, EvalFuncs())
steps := []int64{}
for _, v := range expression.Vars() {
steps = append(steps, stepsMap[v])
}
return LCMList(steps)
}
func (r *QueryRangeRequest) NumAggregationForQuery(name string) int64 {
numAgg := 0
for _, query := range r.CompositeQuery.Queries {
switch spec := query.Spec.(type) {
case QueryBuilderQuery[TraceAggregation]:
if spec.Name == name {
numAgg += 1
}
case QueryBuilderQuery[LogAggregation]:
if spec.Name == name {
numAgg += 1
}
case QueryBuilderQuery[MetricAggregation]:
if spec.Name == name {
numAgg += 1
}
case QueryBuilderFormula:
if spec.Name == name {
numAgg += 1
}
}
}
return int64(numAgg)
}
func (r *QueryRangeRequest) FuncsForQuery(name string) []Function {
funcs := []Function{}
for _, query := range r.CompositeQuery.Queries {
switch spec := query.Spec.(type) {
case QueryBuilderQuery[TraceAggregation]:
if spec.Name == name {
funcs = spec.Functions
}
case QueryBuilderQuery[LogAggregation]:
if spec.Name == name {
funcs = spec.Functions
}
case QueryBuilderQuery[MetricAggregation]:
if spec.Name == name {
funcs = spec.Functions
}
case QueryBuilderFormula:
if spec.Name == name {
funcs = spec.Functions
}
}
}
return funcs
}
func (r *QueryRangeRequest) IsAnomalyRequest() (*QueryBuilderQuery[MetricAggregation], bool) {
hasAnomaly := false
var q QueryBuilderQuery[MetricAggregation]
for _, query := range r.CompositeQuery.Queries {
switch spec := query.Spec.(type) {
// only metrics support anomaly right now
case QueryBuilderQuery[MetricAggregation]:
for _, f := range spec.Functions {
if f.Name == FunctionNameAnomaly {
hasAnomaly = true
q = spec
}
}
}
}
return &q, hasAnomaly
}
// UnmarshalJSON implements custom JSON unmarshaling to disallow unknown fields
func (r *QueryRangeRequest) UnmarshalJSON(data []byte) error {
// Define a type alias to avoid infinite recursion

View File

@@ -14,6 +14,8 @@ var (
RequestTypeTimeSeries = RequestType{valuer.NewString("time_series")}
// [][]any, SQL result set, but paginated, example: list view
RequestTypeRaw = RequestType{valuer.NewString("raw")}
// [][]any, Specialized SQL result set, paginated
RequestTypeTrace = RequestType{valuer.NewString("trace")}
// []Bucket (struct{Lower,Upper,Count float64}), example: histogram
RequestTypeDistribution = RequestType{valuer.NewString("distribution")}
)

View File

@@ -13,10 +13,25 @@ import (
"github.com/SigNoz/signoz/pkg/valuer"
)
type QBEvent struct {
Version string `json:"version"`
LogsUsed bool `json:"logs_used,omitempty"`
MetricsUsed bool `json:"metrics_used,omitempty"`
TracesUsed bool `json:"traces_used,omitempty"`
FilterApplied bool `json:"filter_applied,omitempty"`
GroupByApplied bool `json:"group_by_applied,omitempty"`
QueryType string `json:"query_type,omitempty"`
PanelType string `json:"panel_type,omitempty"`
NumberOfQueries int `json:"number_of_queries,omitempty"`
HasData bool `json:"-"`
}
type QueryRangeResponse struct {
Type RequestType `json:"type"`
Data any `json:"data"`
Meta ExecStats `json:"meta"`
QBEvent *QBEvent `json:"-"`
}
type TimeSeriesData struct {
@@ -31,6 +46,11 @@ type AggregationBucket struct {
Unit string `json:"unit,omitempty"`
} `json:"meta,omitempty"`
Series []*TimeSeries `json:"series"` // no extra nesting
PredictedSeries []*TimeSeries `json:"predictedSeries,omitempty"`
UpperBoundSeries []*TimeSeries `json:"upperBoundSeries,omitempty"`
LowerBoundSeries []*TimeSeries `json:"lowerBoundSeries,omitempty"`
AnomalyScores []*TimeSeries `json:"anomalyScores,omitempty"`
}
type TimeSeries struct {

View File

@@ -108,7 +108,7 @@ func (q *QueryBuilderQuery[T]) Validate(requestType RequestType) error {
}
// Validate aggregations only for non-raw request types
if requestType != RequestTypeRaw {
if requestType != RequestTypeRaw && requestType != RequestTypeTrace {
if err := q.validateAggregations(); err != nil {
return err
}
@@ -129,7 +129,7 @@ func (q *QueryBuilderQuery[T]) Validate(requestType RequestType) error {
return err
}
if requestType != RequestTypeRaw && len(q.Aggregations) > 0 {
if requestType != RequestTypeRaw && requestType != RequestTypeTrace && len(q.Aggregations) > 0 {
if err := q.validateOrderByForAggregation(); err != nil {
return err
}
@@ -139,7 +139,7 @@ func (q *QueryBuilderQuery[T]) Validate(requestType RequestType) error {
}
}
if requestType != RequestTypeRaw {
if requestType != RequestTypeRaw && requestType != RequestTypeTrace {
if err := q.validateHaving(); err != nil {
return err
}
@@ -440,7 +440,7 @@ func (r *QueryRangeRequest) Validate() error {
// Validate request type
switch r.RequestType {
case RequestTypeRaw, RequestTypeTimeSeries, RequestTypeScalar:
case RequestTypeRaw, RequestTypeTimeSeries, RequestTypeScalar, RequestTypeTrace:
// Valid request types
default:
return errors.NewInvalidInputf(

View File

@@ -1,21 +0,0 @@
package telemetrytypes
import (
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/uptrace/bun"
)
type VirtualField struct {
bun.BaseModel `bun:"table:virtual_field"`
types.Identifiable
types.TimeAuditable
types.UserAuditable
Name string `bun:"name,type:text,notnull" json:"name"`
Expression string `bun:"expression,type:text,notnull" json:"expression"`
Description string `bun:"description,type:text" json:"description"`
Signal Signal `bun:"signal,type:text,notnull" json:"signal"`
OrgID valuer.UUID `bun:"org_id,type:text,notnull" json:"orgId"`
}