Compare commits

...

9 Commits

Author SHA1 Message Date
nityanandagohain
1487820750 fix: use correct prepQUery 2024-10-23 18:06:25 +05:30
nityanandagohain
fd09f57f76 fix: tests 2024-10-23 17:44:18 +05:30
nityanandagohain
9bc7c8708a fix: add servicename resource filter 2024-10-23 10:49:35 +05:30
nityanandagohain
2115093876 fix: get trace by id api updated 2024-10-22 22:50:57 +05:30
nityanandagohain
33f4d8306d Merge remote-tracking branch 'origin/develop' into feat/trace-v3-poc 2024-10-22 17:14:03 +05:30
nityanandagohain
dbf5f8b77a fix: integrate with querier 2024-10-22 14:06:55 +05:30
nityanandagohain
bfc46790bb Merge remote-tracking branch 'origin/develop' into feat/trace-v3-poc 2024-10-22 10:21:03 +05:30
nityanandagohain
7a011f3460 fix: add remaining files 2024-10-22 10:20:10 +05:30
nityanandagohain
2c30e1493f feat: trace v4 inital commit 2024-10-21 18:20:48 +05:30
27 changed files with 1680 additions and 136 deletions

View File

@@ -40,6 +40,7 @@ type APIHandlerOptions struct {
// Querier Influx Interval // Querier Influx Interval
FluxInterval time.Duration FluxInterval time.Duration
UseLogsNewSchema bool UseLogsNewSchema bool
UseTraceNewSchema bool
} }
type APIHandler struct { type APIHandler struct {
@@ -65,6 +66,7 @@ func NewAPIHandler(opts APIHandlerOptions) (*APIHandler, error) {
Cache: opts.Cache, Cache: opts.Cache,
FluxInterval: opts.FluxInterval, FluxInterval: opts.FluxInterval,
UseLogsNewSchema: opts.UseLogsNewSchema, UseLogsNewSchema: opts.UseLogsNewSchema,
UseTraceNewSchema: opts.UseTraceNewSchema,
}) })
if err != nil { if err != nil {

View File

@@ -2,32 +2,31 @@ package api
import ( import (
"net/http" "net/http"
"go.signoz.io/signoz/ee/query-service/app/db"
"go.signoz.io/signoz/ee/query-service/model"
baseapp "go.signoz.io/signoz/pkg/query-service/app"
basemodel "go.signoz.io/signoz/pkg/query-service/model"
"go.uber.org/zap"
) )
func (ah *APIHandler) searchTraces(w http.ResponseWriter, r *http.Request) { func (ah *APIHandler) searchTraces(w http.ResponseWriter, r *http.Request) {
if !ah.CheckFeature(basemodel.SmartTraceDetail) {
zap.L().Info("SmartTraceDetail feature is not enabled in this plan")
ah.APIHandler.SearchTraces(w, r) ah.APIHandler.SearchTraces(w, r)
return return
}
searchTracesParams, err := baseapp.ParseSearchTracesParams(r)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, "Error reading params")
return
}
result, err := ah.opts.DataConnector.SearchTraces(r.Context(), searchTracesParams, db.SmartTraceAlgorithm) // This is commented since this will be taken care by new trace API
if ah.HandleError(w, err, http.StatusBadRequest) {
return
}
ah.WriteJSON(w, r, result) // if !ah.CheckFeature(basemodel.SmartTraceDetail) {
// zap.L().Info("SmartTraceDetail feature is not enabled in this plan")
// ah.APIHandler.SearchTraces(w, r)
// return
// }
// searchTracesParams, err := baseapp.ParseSearchTracesParams(r)
// if err != nil {
// RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, "Error reading params")
// return
// }
// result, err := ah.opts.DataConnector.SearchTraces(r.Context(), searchTracesParams, db.SmartTraceAlgorithm)
// if ah.HandleError(w, err, http.StatusBadRequest) {
// return
// }
// ah.WriteJSON(w, r, result)
} }

View File

@@ -26,8 +26,9 @@ func NewDataConnector(
dialTimeout time.Duration, dialTimeout time.Duration,
cluster string, cluster string,
useLogsNewSchema bool, useLogsNewSchema bool,
useTraceNewSchema bool,
) *ClickhouseReader { ) *ClickhouseReader {
ch := basechr.NewReader(localDB, promConfigPath, lm, maxIdleConns, maxOpenConns, dialTimeout, cluster, useLogsNewSchema) ch := basechr.NewReader(localDB, promConfigPath, lm, maxIdleConns, maxOpenConns, dialTimeout, cluster, useLogsNewSchema, useTraceNewSchema)
return &ClickhouseReader{ return &ClickhouseReader{
conn: ch.GetConn(), conn: ch.GetConn(),
appdb: localDB, appdb: localDB,

View File

@@ -78,6 +78,7 @@ type ServerOptions struct {
Cluster string Cluster string
GatewayUrl string GatewayUrl string
UseLogsNewSchema bool UseLogsNewSchema bool
UseTraceNewSchema bool
} }
// Server runs HTTP api service // Server runs HTTP api service
@@ -156,6 +157,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
serverOptions.DialTimeout, serverOptions.DialTimeout,
serverOptions.Cluster, serverOptions.Cluster,
serverOptions.UseLogsNewSchema, serverOptions.UseLogsNewSchema,
serverOptions.UseTraceNewSchema,
) )
go qb.Start(readerReady) go qb.Start(readerReady)
reader = qb reader = qb
@@ -189,6 +191,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
serverOptions.DisableRules, serverOptions.DisableRules,
lm, lm,
serverOptions.UseLogsNewSchema, serverOptions.UseLogsNewSchema,
serverOptions.UseTraceNewSchema,
) )
if err != nil { if err != nil {
@@ -270,6 +273,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
FluxInterval: fluxInterval, FluxInterval: fluxInterval,
Gateway: gatewayProxy, Gateway: gatewayProxy,
UseLogsNewSchema: serverOptions.UseLogsNewSchema, UseLogsNewSchema: serverOptions.UseLogsNewSchema,
UseTraceNewSchema: serverOptions.UseTraceNewSchema,
} }
apiHandler, err := api.NewAPIHandler(apiOpts) apiHandler, err := api.NewAPIHandler(apiOpts)
@@ -736,7 +740,8 @@ func makeRulesManager(
cache cache.Cache, cache cache.Cache,
disableRules bool, disableRules bool,
fm baseint.FeatureLookup, fm baseint.FeatureLookup,
useLogsNewSchema bool) (*baserules.Manager, error) { useLogsNewSchema bool,
useTraceNewSchema bool) (*baserules.Manager, error) {
// create engine // create engine
pqle, err := pqle.FromConfigPath(promConfigPath) pqle, err := pqle.FromConfigPath(promConfigPath)
@@ -767,6 +772,7 @@ func makeRulesManager(
PrepareTaskFunc: rules.PrepareTaskFunc, PrepareTaskFunc: rules.PrepareTaskFunc,
UseLogsNewSchema: useLogsNewSchema, UseLogsNewSchema: useLogsNewSchema,
UseTraceNewSchema: useTraceNewSchema,
} }
// create Manager // create Manager

View File

@@ -94,6 +94,7 @@ func main() {
var cluster string var cluster string
var useLogsNewSchema bool var useLogsNewSchema bool
var useTraceNewSchema bool
var cacheConfigPath, fluxInterval string var cacheConfigPath, fluxInterval string
var enableQueryServiceLogOTLPExport bool var enableQueryServiceLogOTLPExport bool
var preferSpanMetrics bool var preferSpanMetrics bool
@@ -104,6 +105,7 @@ func main() {
var gatewayUrl string var gatewayUrl string
flag.BoolVar(&useLogsNewSchema, "use-logs-new-schema", false, "use logs_v2 schema for logs") flag.BoolVar(&useLogsNewSchema, "use-logs-new-schema", false, "use logs_v2 schema for logs")
flag.BoolVar(&useTraceNewSchema, "use-trace-new-schema", false, "use new schema for traces")
flag.StringVar(&promConfigPath, "config", "./config/prometheus.yml", "(prometheus config to read metrics)") flag.StringVar(&promConfigPath, "config", "./config/prometheus.yml", "(prometheus config to read metrics)")
flag.StringVar(&skipTopLvlOpsPath, "skip-top-level-ops", "", "(config file to skip top level operations)") flag.StringVar(&skipTopLvlOpsPath, "skip-top-level-ops", "", "(config file to skip top level operations)")
flag.BoolVar(&disableRules, "rules.disable", false, "(disable rule evaluation)") flag.BoolVar(&disableRules, "rules.disable", false, "(disable rule evaluation)")
@@ -143,6 +145,7 @@ func main() {
Cluster: cluster, Cluster: cluster,
GatewayUrl: gatewayUrl, GatewayUrl: gatewayUrl,
UseLogsNewSchema: useLogsNewSchema, UseLogsNewSchema: useLogsNewSchema,
UseTraceNewSchema: useTraceNewSchema,
} }
// Read the jwt secret key // Read the jwt secret key

View File

@@ -21,6 +21,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
opts.FF, opts.FF,
opts.Reader, opts.Reader,
opts.UseLogsNewSchema, opts.UseLogsNewSchema,
opts.UseTraceNewSchema,
baserules.WithEvalDelay(opts.ManagerOpts.EvalDelay), baserules.WithEvalDelay(opts.ManagerOpts.EvalDelay),
) )

View File

@@ -45,6 +45,9 @@ const (
defaultLogsTableV2 string = "distributed_logs_v2" defaultLogsTableV2 string = "distributed_logs_v2"
defaultLogsResourceLocalTableV2 string = "logs_v2_resource" defaultLogsResourceLocalTableV2 string = "logs_v2_resource"
defaultLogsResourceTableV2 string = "distributed_logs_v2_resource" defaultLogsResourceTableV2 string = "distributed_logs_v2_resource"
defaultTraceIndexTableV3 string = "distributed_signoz_index_v3"
defaultTraceResourceTableV3 string = "distributed_traces_v3_resource"
) )
// NamespaceConfig is Clickhouse's internal configuration data // NamespaceConfig is Clickhouse's internal configuration data
@@ -82,6 +85,9 @@ type namespaceConfig struct {
LogsTableV2 string LogsTableV2 string
LogsResourceLocalTableV2 string LogsResourceLocalTableV2 string
LogsResourceTableV2 string LogsResourceTableV2 string
TraceIndexTableV3 string
TraceResourceTableV3 string
} }
// Connecto defines how to connect to the database // Connecto defines how to connect to the database
@@ -174,6 +180,9 @@ func NewOptions(
LogsLocalTableV2: defaultLogsLocalTableV2, LogsLocalTableV2: defaultLogsLocalTableV2,
LogsResourceTableV2: defaultLogsResourceTableV2, LogsResourceTableV2: defaultLogsResourceTableV2,
LogsResourceLocalTableV2: defaultLogsResourceLocalTableV2, LogsResourceLocalTableV2: defaultLogsResourceLocalTableV2,
TraceIndexTableV3: defaultTraceIndexTableV3,
TraceResourceTableV3: defaultTraceResourceTableV3,
}, },
others: make(map[string]*namespaceConfig, len(otherNamespaces)), others: make(map[string]*namespaceConfig, len(otherNamespaces)),
} }

View File

@@ -146,8 +146,12 @@ type ClickHouseReader struct {
cluster string cluster string
useLogsNewSchema bool useLogsNewSchema bool
useTraceNewSchema bool
logsTableName string logsTableName string
logsLocalTableName string logsLocalTableName string
traceIndexTableV3 string
traceResourceTableV3 string
} }
// NewTraceReader returns a TraceReader for the database // NewTraceReader returns a TraceReader for the database
@@ -160,6 +164,7 @@ func NewReader(
dialTimeout time.Duration, dialTimeout time.Duration,
cluster string, cluster string,
useLogsNewSchema bool, useLogsNewSchema bool,
useTraceNewSchema bool,
) *ClickHouseReader { ) *ClickHouseReader {
datasource := os.Getenv("ClickHouseUrl") datasource := os.Getenv("ClickHouseUrl")
@@ -170,7 +175,7 @@ func NewReader(
zap.L().Fatal("failed to initialize ClickHouse", zap.Error(err)) zap.L().Fatal("failed to initialize ClickHouse", zap.Error(err))
} }
return NewReaderFromClickhouseConnection(db, options, localDB, configFile, featureFlag, cluster, useLogsNewSchema) return NewReaderFromClickhouseConnection(db, options, localDB, configFile, featureFlag, cluster, useLogsNewSchema, useTraceNewSchema)
} }
func NewReaderFromClickhouseConnection( func NewReaderFromClickhouseConnection(
@@ -181,6 +186,7 @@ func NewReaderFromClickhouseConnection(
featureFlag interfaces.FeatureLookup, featureFlag interfaces.FeatureLookup,
cluster string, cluster string,
useLogsNewSchema bool, useLogsNewSchema bool,
useTraceNewSchema bool,
) *ClickHouseReader { ) *ClickHouseReader {
alertManager, err := am.New() alertManager, err := am.New()
if err != nil { if err != nil {
@@ -246,6 +252,7 @@ func NewReaderFromClickhouseConnection(
queryProgressTracker: queryprogress.NewQueryProgressTracker(), queryProgressTracker: queryprogress.NewQueryProgressTracker(),
useLogsNewSchema: useLogsNewSchema, useLogsNewSchema: useLogsNewSchema,
useTraceNewSchema: useTraceNewSchema,
logsTableV2: options.primary.LogsTableV2, logsTableV2: options.primary.LogsTableV2,
logsLocalTableV2: options.primary.LogsLocalTableV2, logsLocalTableV2: options.primary.LogsLocalTableV2,
@@ -253,6 +260,9 @@ func NewReaderFromClickhouseConnection(
logsResourceLocalTableV2: options.primary.LogsResourceLocalTableV2, logsResourceLocalTableV2: options.primary.LogsResourceLocalTableV2,
logsTableName: logsTableName, logsTableName: logsTableName,
logsLocalTableName: logsLocalTableName, logsLocalTableName: logsLocalTableName,
traceIndexTableV3: options.primary.TraceIndexTableV3,
traceResourceTableV3: options.primary.TraceResourceTableV3,
} }
} }
@@ -1666,10 +1676,125 @@ func (r *ClickHouseReader) GetUsage(ctx context.Context, queryParams *model.GetU
return &usageItems, nil return &usageItems, nil
} }
func (r *ClickHouseReader) SearchTracesV2(ctx context.Context, params *model.SearchTracesParams) (*[]model.SearchSpansResult, error) {
var countSpans uint64
// TODO(nitya): check if we can use timestamp filter here
countQuery := fmt.Sprintf("SELECT count() as count from %s.%s WHERE traceID=$1", r.TraceDB, r.traceIndexTableV3)
err := r.db.QueryRow(ctx, countQuery, params.TraceID).Scan(&countSpans)
if err != nil {
zap.L().Error("Error in processing sql query", zap.Error(err))
return nil, fmt.Errorf("error in processing sql query")
}
if countSpans > uint64(params.MaxSpansInTrace) {
zap.L().Error("Max spans allowed in a trace limit reached", zap.Int("MaxSpansInTrace", params.MaxSpansInTrace),
zap.Uint64("Count", countSpans))
userEmail, err := auth.GetEmailFromJwt(ctx)
if err == nil {
data := map[string]interface{}{
"traceSize": countSpans,
"maxSpansInTraceLimit": params.MaxSpansInTrace,
}
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_MAX_SPANS_ALLOWED_LIMIT_REACHED, data, userEmail, true, false)
}
return nil, fmt.Errorf("max spans allowed in trace limit reached, please contact support for more details")
}
userEmail, err := auth.GetEmailFromJwt(ctx)
if err == nil {
data := map[string]interface{}{
"traceSize": countSpans,
}
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_TRACE_DETAIL_API, data, userEmail, true, false)
}
var startTime, endTime, durationNano uint64
var searchScanResponses []model.SearchSpanResponseItemV2
query := fmt.Sprintf("SELECT timestamp, durationNano, spanID, traceID, hasError, kind, serviceName, name, references, attributes_string, events, statusMessage, statusCodeString, spanKind FROM %s.%s WHERE traceID=$1", r.TraceDB, r.traceIndexTableV3)
start := time.Now()
err = r.db.Select(ctx, &searchScanResponses, query, params.TraceID)
zap.L().Info(query)
if err != nil {
zap.L().Error("Error in processing sql query", zap.Error(err))
return nil, fmt.Errorf("error in processing sql query")
}
end := time.Now()
zap.L().Debug("getTraceSQLQuery took: ", zap.Duration("duration", end.Sub(start)))
searchSpansResult := []model.SearchSpansResult{
{
Columns: []string{"__time", "SpanId", "TraceId", "ServiceName", "Name", "Kind", "DurationNano", "TagsKeys", "TagsValues", "References", "Events", "HasError", "StatusMessage", "StatusCodeString", "SpanKind"},
Events: make([][]interface{}, len(searchScanResponses)),
IsSubTree: false,
},
}
searchSpanResponses := []model.SearchSpanResponseItem{}
start = time.Now()
for _, item := range searchScanResponses {
ref := []model.OtelSpanRef{}
err := json.Unmarshal([]byte(item.References), &ref)
if err != nil {
zap.L().Error("Error unmarshalling references", zap.Error(err))
return nil, err
}
// TODO(nitya): add attributes_number and attributes_bool and resources_string to this
jsonItem := model.SearchSpanResponseItem{
SpanID: item.SpanID,
TraceID: item.TraceID,
ServiceName: item.ServiceName,
Name: item.Name,
Kind: int32(item.Kind),
DurationNano: int64(item.DurationNano),
HasError: item.HasError,
StatusMessage: item.StatusMessage,
StatusCodeString: item.StatusCodeString,
SpanKind: item.SpanKind,
References: ref,
Events: item.Events,
TagMap: item.TagMap,
}
jsonItem.TimeUnixNano = uint64(item.TimeUnixNano.UnixNano() / 1000000)
searchSpanResponses = append(searchSpanResponses, jsonItem)
if startTime == 0 || jsonItem.TimeUnixNano < startTime {
startTime = jsonItem.TimeUnixNano
}
if endTime == 0 || jsonItem.TimeUnixNano > endTime {
endTime = jsonItem.TimeUnixNano
}
if durationNano == 0 || uint64(jsonItem.DurationNano) > durationNano {
durationNano = uint64(jsonItem.DurationNano)
}
}
end = time.Now()
zap.L().Debug("getTraceSQLQuery unmarshal took: ", zap.Duration("duration", end.Sub(start)))
for i, item := range searchSpanResponses {
spanEvents := item.GetValues()
searchSpansResult[0].Events[i] = spanEvents
}
searchSpansResult[0].StartTimestampMillis = startTime - (durationNano / 1000000)
searchSpansResult[0].EndTimestampMillis = endTime + (durationNano / 1000000)
return &searchSpansResult, nil
}
func (r *ClickHouseReader) SearchTraces(ctx context.Context, params *model.SearchTracesParams, func (r *ClickHouseReader) SearchTraces(ctx context.Context, params *model.SearchTracesParams,
smartTraceAlgorithm func(payload []model.SearchSpanResponseItem, targetSpanId string, smartTraceAlgorithm func(payload []model.SearchSpanResponseItem, targetSpanId string,
levelUp int, levelDown int, spanLimit int) ([]model.SearchSpansResult, error)) (*[]model.SearchSpansResult, error) { levelUp int, levelDown int, spanLimit int) ([]model.SearchSpansResult, error)) (*[]model.SearchSpansResult, error) {
if r.useTraceNewSchema {
return r.SearchTracesV2(ctx, params)
}
var countSpans uint64 var countSpans uint64
countQuery := fmt.Sprintf("SELECT count() as count from %s.%s WHERE traceID=$1", r.TraceDB, r.SpansTable) countQuery := fmt.Sprintf("SELECT count() as count from %s.%s WHERE traceID=$1", r.TraceDB, r.SpansTable)
err := r.db.QueryRow(ctx, countQuery, params.TraceID).Scan(&countSpans) err := r.db.QueryRow(ctx, countQuery, params.TraceID).Scan(&countSpans)
@@ -1746,6 +1871,7 @@ func (r *ClickHouseReader) SearchTraces(ctx context.Context, params *model.Searc
err = r.featureFlags.CheckFeature(model.SmartTraceDetail) err = r.featureFlags.CheckFeature(model.SmartTraceDetail)
smartAlgoEnabled := err == nil smartAlgoEnabled := err == nil
// TODO(nitya): this will never run remove it
if len(searchScanResponses) > params.SpansRenderLimit && smartAlgoEnabled { if len(searchScanResponses) > params.SpansRenderLimit && smartAlgoEnabled {
start = time.Now() start = time.Now()
searchSpansResult, err = smartTraceAlgorithm(searchSpanResponses, params.SpanID, params.LevelUp, params.LevelDown, params.SpansRenderLimit) searchSpansResult, err = smartTraceAlgorithm(searchSpanResponses, params.SpanID, params.LevelUp, params.LevelDown, params.SpansRenderLimit)

View File

@@ -39,6 +39,7 @@ import (
querierV2 "go.signoz.io/signoz/pkg/query-service/app/querier/v2" querierV2 "go.signoz.io/signoz/pkg/query-service/app/querier/v2"
"go.signoz.io/signoz/pkg/query-service/app/queryBuilder" "go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3" tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
tracesV4 "go.signoz.io/signoz/pkg/query-service/app/traces/v4"
"go.signoz.io/signoz/pkg/query-service/auth" "go.signoz.io/signoz/pkg/query-service/auth"
"go.signoz.io/signoz/pkg/query-service/cache" "go.signoz.io/signoz/pkg/query-service/cache"
"go.signoz.io/signoz/pkg/query-service/common" "go.signoz.io/signoz/pkg/query-service/common"
@@ -111,6 +112,7 @@ type APIHandler struct {
Upgrader *websocket.Upgrader Upgrader *websocket.Upgrader
UseLogsNewSchema bool UseLogsNewSchema bool
UseTraceNewSchema bool
hostsRepo *inframetrics.HostsRepo hostsRepo *inframetrics.HostsRepo
processesRepo *inframetrics.ProcessesRepo processesRepo *inframetrics.ProcessesRepo
@@ -152,6 +154,8 @@ type APIHandlerOpts struct {
// Use Logs New schema // Use Logs New schema
UseLogsNewSchema bool UseLogsNewSchema bool
UseTraceNewSchema bool
} }
// NewAPIHandler returns an APIHandler // NewAPIHandler returns an APIHandler
@@ -169,6 +173,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
FluxInterval: opts.FluxInterval, FluxInterval: opts.FluxInterval,
FeatureLookup: opts.FeatureFlags, FeatureLookup: opts.FeatureFlags,
UseLogsNewSchema: opts.UseLogsNewSchema, UseLogsNewSchema: opts.UseLogsNewSchema,
UseTraceNewSchema: opts.UseTraceNewSchema,
} }
querierOptsV2 := querierV2.QuerierOptions{ querierOptsV2 := querierV2.QuerierOptions{
@@ -178,6 +183,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
FluxInterval: opts.FluxInterval, FluxInterval: opts.FluxInterval,
FeatureLookup: opts.FeatureFlags, FeatureLookup: opts.FeatureFlags,
UseLogsNewSchema: opts.UseLogsNewSchema, UseLogsNewSchema: opts.UseLogsNewSchema,
UseTraceNewSchema: opts.UseTraceNewSchema,
} }
querier := querier.NewQuerier(querierOpts) querier := querier.NewQuerier(querierOpts)
@@ -203,6 +209,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
querier: querier, querier: querier,
querierV2: querierv2, querierV2: querierv2,
UseLogsNewSchema: opts.UseLogsNewSchema, UseLogsNewSchema: opts.UseLogsNewSchema,
UseTraceNewSchema: opts.UseTraceNewSchema,
hostsRepo: hostsRepo, hostsRepo: hostsRepo,
processesRepo: processesRepo, processesRepo: processesRepo,
} }
@@ -212,9 +219,14 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
logsQueryBuilder = logsv4.PrepareLogsQuery logsQueryBuilder = logsv4.PrepareLogsQuery
} }
tracesQueryBuilder := tracesV3.PrepareTracesQuery
if opts.UseTraceNewSchema {
tracesQueryBuilder = tracesV4.PrepareTracesQuery
}
builderOpts := queryBuilder.QueryBuilderOptions{ builderOpts := queryBuilder.QueryBuilderOptions{
BuildMetricQuery: metricsv3.PrepareMetricQuery, BuildMetricQuery: metricsv3.PrepareMetricQuery,
BuildTraceQuery: tracesV3.PrepareTracesQuery, BuildTraceQuery: tracesQueryBuilder,
BuildLogQuery: logsQueryBuilder, BuildLogQuery: logsQueryBuilder,
} }
aH.queryBuilder = queryBuilder.NewQueryBuilder(builderOpts, aH.featureFlags) aH.queryBuilder = queryBuilder.NewQueryBuilder(builderOpts, aH.featureFlags)
@@ -4360,9 +4372,14 @@ func (aH *APIHandler) queryRangeV3(ctx context.Context, queryRangeParams *v3.Que
RespondError(w, apiErrObj, errQuriesByName) RespondError(w, apiErrObj, errQuriesByName)
return return
} }
if aH.UseTraceNewSchema {
tracesV4.Enrich(queryRangeParams, spanKeys)
} else {
tracesV3.Enrich(queryRangeParams, spanKeys) tracesV3.Enrich(queryRangeParams, spanKeys)
} }
}
// WARN: Only works for AND operator in traces query // WARN: Only works for AND operator in traces query
if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypeBuilder { if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypeBuilder {
// check if traceID is used as filter (with equal/similar operator) in traces query if yes add timestamp filter to queryRange params // check if traceID is used as filter (with equal/similar operator) in traces query if yes add timestamp filter to queryRange params

View File

@@ -10,6 +10,7 @@ import (
logsV4 "go.signoz.io/signoz/pkg/query-service/app/logs/v4" logsV4 "go.signoz.io/signoz/pkg/query-service/app/logs/v4"
metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3" metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3"
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3" tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
tracesV4 "go.signoz.io/signoz/pkg/query-service/app/traces/v4"
"go.signoz.io/signoz/pkg/query-service/common" "go.signoz.io/signoz/pkg/query-service/common"
"go.signoz.io/signoz/pkg/query-service/constants" "go.signoz.io/signoz/pkg/query-service/constants"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3" v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
@@ -158,11 +159,16 @@ func (q *querier) runBuilderQuery(
if builderQuery.DataSource == v3.DataSourceTraces { if builderQuery.DataSource == v3.DataSourceTraces {
tracesQueryBuilder := tracesV3.PrepareTracesQuery
if q.UseTraceNewSchema {
tracesQueryBuilder = tracesV4.PrepareTracesQuery
}
var query string var query string
var err error var err error
// for ts query with group by and limit form two queries // for ts query with group by and limit form two queries
if params.CompositeQuery.PanelType == v3.PanelTypeGraph && builderQuery.Limit > 0 && len(builderQuery.GroupBy) > 0 { if params.CompositeQuery.PanelType == v3.PanelTypeGraph && builderQuery.Limit > 0 && len(builderQuery.GroupBy) > 0 {
limitQuery, err := tracesV3.PrepareTracesQuery( limitQuery, err := tracesQueryBuilder(
start, start,
end, end,
params.CompositeQuery.PanelType, params.CompositeQuery.PanelType,
@@ -173,7 +179,7 @@ func (q *querier) runBuilderQuery(
ch <- channelResult{Err: err, Name: queryName, Query: limitQuery, Series: nil} ch <- channelResult{Err: err, Name: queryName, Query: limitQuery, Series: nil}
return return
} }
placeholderQuery, err := tracesV3.PrepareTracesQuery( placeholderQuery, err := tracesQueryBuilder(
start, start,
end, end,
params.CompositeQuery.PanelType, params.CompositeQuery.PanelType,
@@ -186,7 +192,7 @@ func (q *querier) runBuilderQuery(
} }
query = fmt.Sprintf(placeholderQuery, limitQuery) query = fmt.Sprintf(placeholderQuery, limitQuery)
} else { } else {
query, err = tracesV3.PrepareTracesQuery( query, err = tracesQueryBuilder(
start, start,
end, end,
params.CompositeQuery.PanelType, params.CompositeQuery.PanelType,

View File

@@ -11,6 +11,8 @@ import (
metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3" metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3"
"go.signoz.io/signoz/pkg/query-service/app/queryBuilder" "go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3" tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
tracesV4 "go.signoz.io/signoz/pkg/query-service/app/traces/v4"
"go.signoz.io/signoz/pkg/query-service/common" "go.signoz.io/signoz/pkg/query-service/common"
chErrors "go.signoz.io/signoz/pkg/query-service/errors" chErrors "go.signoz.io/signoz/pkg/query-service/errors"
"go.signoz.io/signoz/pkg/query-service/querycache" "go.signoz.io/signoz/pkg/query-service/querycache"
@@ -53,6 +55,7 @@ type querier struct {
returnedErr error returnedErr error
UseLogsNewSchema bool UseLogsNewSchema bool
UseTraceNewSchema bool
} }
type QuerierOptions struct { type QuerierOptions struct {
@@ -67,6 +70,7 @@ type QuerierOptions struct {
ReturnedSeries []*v3.Series ReturnedSeries []*v3.Series
ReturnedErr error ReturnedErr error
UseLogsNewSchema bool UseLogsNewSchema bool
UseTraceNewSchema bool
} }
func NewQuerier(opts QuerierOptions) interfaces.Querier { func NewQuerier(opts QuerierOptions) interfaces.Querier {
@@ -74,6 +78,10 @@ func NewQuerier(opts QuerierOptions) interfaces.Querier {
if opts.UseLogsNewSchema { if opts.UseLogsNewSchema {
logsQueryBuilder = logsV4.PrepareLogsQuery logsQueryBuilder = logsV4.PrepareLogsQuery
} }
tracesQueryBuilder := tracesV3.PrepareTracesQuery
if opts.UseTraceNewSchema {
tracesQueryBuilder = tracesV4.PrepareTracesQuery
}
qc := querycache.NewQueryCache(querycache.WithCache(opts.Cache), querycache.WithFluxInterval(opts.FluxInterval)) qc := querycache.NewQueryCache(querycache.WithCache(opts.Cache), querycache.WithFluxInterval(opts.FluxInterval))
@@ -85,7 +93,7 @@ func NewQuerier(opts QuerierOptions) interfaces.Querier {
fluxInterval: opts.FluxInterval, fluxInterval: opts.FluxInterval,
builder: queryBuilder.NewQueryBuilder(queryBuilder.QueryBuilderOptions{ builder: queryBuilder.NewQueryBuilder(queryBuilder.QueryBuilderOptions{
BuildTraceQuery: tracesV3.PrepareTracesQuery, BuildTraceQuery: tracesQueryBuilder,
BuildLogQuery: logsQueryBuilder, BuildLogQuery: logsQueryBuilder,
BuildMetricQuery: metricsV3.PrepareMetricQuery, BuildMetricQuery: metricsV3.PrepareMetricQuery,
}, opts.FeatureLookup), }, opts.FeatureLookup),
@@ -95,6 +103,7 @@ func NewQuerier(opts QuerierOptions) interfaces.Querier {
returnedSeries: opts.ReturnedSeries, returnedSeries: opts.ReturnedSeries,
returnedErr: opts.ReturnedErr, returnedErr: opts.ReturnedErr,
UseLogsNewSchema: opts.UseLogsNewSchema, UseLogsNewSchema: opts.UseLogsNewSchema,
UseTraceNewSchema: opts.UseTraceNewSchema,
} }
} }

View File

@@ -11,6 +11,7 @@ import (
metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3" metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3"
metricsV4 "go.signoz.io/signoz/pkg/query-service/app/metrics/v4" metricsV4 "go.signoz.io/signoz/pkg/query-service/app/metrics/v4"
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3" tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
tracesV4 "go.signoz.io/signoz/pkg/query-service/app/traces/v4"
"go.signoz.io/signoz/pkg/query-service/common" "go.signoz.io/signoz/pkg/query-service/common"
"go.signoz.io/signoz/pkg/query-service/constants" "go.signoz.io/signoz/pkg/query-service/constants"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3" v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
@@ -158,11 +159,16 @@ func (q *querier) runBuilderQuery(
if builderQuery.DataSource == v3.DataSourceTraces { if builderQuery.DataSource == v3.DataSourceTraces {
tracesQueryBuilder := tracesV3.PrepareTracesQuery
if q.UseTraceNewSchema {
tracesQueryBuilder = tracesV4.PrepareTracesQuery
}
var query string var query string
var err error var err error
// for ts query with group by and limit form two queries // for ts query with group by and limit form two queries
if params.CompositeQuery.PanelType == v3.PanelTypeGraph && builderQuery.Limit > 0 && len(builderQuery.GroupBy) > 0 { if params.CompositeQuery.PanelType == v3.PanelTypeGraph && builderQuery.Limit > 0 && len(builderQuery.GroupBy) > 0 {
limitQuery, err := tracesV3.PrepareTracesQuery( limitQuery, err := tracesQueryBuilder(
start, start,
end, end,
params.CompositeQuery.PanelType, params.CompositeQuery.PanelType,
@@ -173,7 +179,7 @@ func (q *querier) runBuilderQuery(
ch <- channelResult{Err: err, Name: queryName, Query: limitQuery, Series: nil} ch <- channelResult{Err: err, Name: queryName, Query: limitQuery, Series: nil}
return return
} }
placeholderQuery, err := tracesV3.PrepareTracesQuery( placeholderQuery, err := tracesQueryBuilder(
start, start,
end, end,
params.CompositeQuery.PanelType, params.CompositeQuery.PanelType,
@@ -186,7 +192,7 @@ func (q *querier) runBuilderQuery(
} }
query = fmt.Sprintf(placeholderQuery, limitQuery) query = fmt.Sprintf(placeholderQuery, limitQuery)
} else { } else {
query, err = tracesV3.PrepareTracesQuery( query, err = tracesQueryBuilder(
start, start,
end, end,
params.CompositeQuery.PanelType, params.CompositeQuery.PanelType,

View File

@@ -11,6 +11,7 @@ import (
metricsV4 "go.signoz.io/signoz/pkg/query-service/app/metrics/v4" metricsV4 "go.signoz.io/signoz/pkg/query-service/app/metrics/v4"
"go.signoz.io/signoz/pkg/query-service/app/queryBuilder" "go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3" tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
tracesV4 "go.signoz.io/signoz/pkg/query-service/app/traces/v4"
"go.signoz.io/signoz/pkg/query-service/common" "go.signoz.io/signoz/pkg/query-service/common"
chErrors "go.signoz.io/signoz/pkg/query-service/errors" chErrors "go.signoz.io/signoz/pkg/query-service/errors"
"go.signoz.io/signoz/pkg/query-service/querycache" "go.signoz.io/signoz/pkg/query-service/querycache"
@@ -52,6 +53,7 @@ type querier struct {
returnedSeries []*v3.Series returnedSeries []*v3.Series
returnedErr error returnedErr error
UseLogsNewSchema bool UseLogsNewSchema bool
UseTraceNewSchema bool
} }
type QuerierOptions struct { type QuerierOptions struct {
@@ -66,6 +68,7 @@ type QuerierOptions struct {
ReturnedSeries []*v3.Series ReturnedSeries []*v3.Series
ReturnedErr error ReturnedErr error
UseLogsNewSchema bool UseLogsNewSchema bool
UseTraceNewSchema bool
} }
func NewQuerier(opts QuerierOptions) interfaces.Querier { func NewQuerier(opts QuerierOptions) interfaces.Querier {
@@ -74,6 +77,11 @@ func NewQuerier(opts QuerierOptions) interfaces.Querier {
logsQueryBuilder = logsV4.PrepareLogsQuery logsQueryBuilder = logsV4.PrepareLogsQuery
} }
tracesQueryBuilder := tracesV3.PrepareTracesQuery
if opts.UseTraceNewSchema {
tracesQueryBuilder = tracesV4.PrepareTracesQuery
}
qc := querycache.NewQueryCache(querycache.WithCache(opts.Cache), querycache.WithFluxInterval(opts.FluxInterval)) qc := querycache.NewQueryCache(querycache.WithCache(opts.Cache), querycache.WithFluxInterval(opts.FluxInterval))
return &querier{ return &querier{
@@ -84,7 +92,7 @@ func NewQuerier(opts QuerierOptions) interfaces.Querier {
fluxInterval: opts.FluxInterval, fluxInterval: opts.FluxInterval,
builder: queryBuilder.NewQueryBuilder(queryBuilder.QueryBuilderOptions{ builder: queryBuilder.NewQueryBuilder(queryBuilder.QueryBuilderOptions{
BuildTraceQuery: tracesV3.PrepareTracesQuery, BuildTraceQuery: tracesQueryBuilder,
BuildLogQuery: logsQueryBuilder, BuildLogQuery: logsQueryBuilder,
BuildMetricQuery: metricsV4.PrepareMetricQuery, BuildMetricQuery: metricsV4.PrepareMetricQuery,
}, opts.FeatureLookup), }, opts.FeatureLookup),

View File

@@ -67,6 +67,7 @@ type ServerOptions struct {
FluxInterval string FluxInterval string
Cluster string Cluster string
UseLogsNewSchema bool UseLogsNewSchema bool
UseTraceNewSchema bool
} }
// Server runs HTTP, Mux and a grpc server // Server runs HTTP, Mux and a grpc server
@@ -130,6 +131,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
serverOptions.DialTimeout, serverOptions.DialTimeout,
serverOptions.Cluster, serverOptions.Cluster,
serverOptions.UseLogsNewSchema, serverOptions.UseLogsNewSchema,
serverOptions.UseTraceNewSchema,
) )
go clickhouseReader.Start(readerReady) go clickhouseReader.Start(readerReady)
reader = clickhouseReader reader = clickhouseReader
@@ -157,7 +159,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
rm, err := makeRulesManager( rm, err := makeRulesManager(
serverOptions.PromConfigPath, serverOptions.PromConfigPath,
constants.GetAlertManagerApiPrefix(), constants.GetAlertManagerApiPrefix(),
serverOptions.RuleRepoURL, localDB, reader, c, serverOptions.DisableRules, fm, serverOptions.UseLogsNewSchema) serverOptions.RuleRepoURL, localDB, reader, c, serverOptions.DisableRules, fm, serverOptions.UseLogsNewSchema, serverOptions.UseTraceNewSchema)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -202,6 +204,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
Cache: c, Cache: c,
FluxInterval: fluxInterval, FluxInterval: fluxInterval,
UseLogsNewSchema: serverOptions.UseLogsNewSchema, UseLogsNewSchema: serverOptions.UseLogsNewSchema,
UseTraceNewSchema: serverOptions.UseTraceNewSchema,
}) })
if err != nil { if err != nil {
return nil, err return nil, err
@@ -721,7 +724,8 @@ func makeRulesManager(
cache cache.Cache, cache cache.Cache,
disableRules bool, disableRules bool,
fm interfaces.FeatureLookup, fm interfaces.FeatureLookup,
useLogsNewSchema bool) (*rules.Manager, error) { useLogsNewSchema bool,
useTraceNewSchema bool) (*rules.Manager, error) {
// create engine // create engine
pqle, err := pqle.FromReader(ch) pqle, err := pqle.FromReader(ch)
@@ -750,6 +754,7 @@ func makeRulesManager(
Cache: cache, Cache: cache,
EvalDelay: constants.GetEvalDelay(), EvalDelay: constants.GetEvalDelay(),
UseLogsNewSchema: useLogsNewSchema, UseLogsNewSchema: useLogsNewSchema,
UseTraceNewSchema: useTraceNewSchema,
} }
// create Manager // create Manager

View File

@@ -10,7 +10,7 @@ import (
"go.signoz.io/signoz/pkg/query-service/utils" "go.signoz.io/signoz/pkg/query-service/utils"
) )
var aggregateOperatorToPercentile = map[v3.AggregateOperator]float64{ var AggregateOperatorToPercentile = map[v3.AggregateOperator]float64{
v3.AggregateOperatorP05: 0.05, v3.AggregateOperatorP05: 0.05,
v3.AggregateOperatorP10: 0.10, v3.AggregateOperatorP10: 0.10,
v3.AggregateOperatorP20: 0.20, v3.AggregateOperatorP20: 0.20,
@@ -22,7 +22,7 @@ var aggregateOperatorToPercentile = map[v3.AggregateOperator]float64{
v3.AggregateOperatorP99: 0.99, v3.AggregateOperatorP99: 0.99,
} }
var aggregateOperatorToSQLFunc = map[v3.AggregateOperator]string{ var AggregateOperatorToSQLFunc = map[v3.AggregateOperator]string{
v3.AggregateOperatorAvg: "avg", v3.AggregateOperatorAvg: "avg",
v3.AggregateOperatorMax: "max", v3.AggregateOperatorMax: "max",
v3.AggregateOperatorMin: "min", v3.AggregateOperatorMin: "min",
@@ -109,7 +109,7 @@ func getSelectLabels(aggregatorOperator v3.AggregateOperator, groupBy []v3.Attri
return selectLabels return selectLabels
} }
func getSelectKeys(aggregatorOperator v3.AggregateOperator, groupBy []v3.AttributeKey) string { func GetSelectKeys(aggregatorOperator v3.AggregateOperator, groupBy []v3.AttributeKey) string {
var selectLabels []string var selectLabels []string
if aggregatorOperator == v3.AggregateOperatorNoOp { if aggregatorOperator == v3.AggregateOperatorNoOp {
return "" return ""
@@ -173,7 +173,7 @@ func buildTracesFilterQuery(fs *v3.FilterSet) (string, error) {
conditions = append(conditions, fmt.Sprintf(operator, columnName, fmtVal)) conditions = append(conditions, fmt.Sprintf(operator, columnName, fmtVal))
case v3.FilterOperatorExists, v3.FilterOperatorNotExists: case v3.FilterOperatorExists, v3.FilterOperatorNotExists:
if item.Key.IsColumn { if item.Key.IsColumn {
subQuery, err := existsSubQueryForFixedColumn(item.Key, item.Operator) subQuery, err := ExistsSubQueryForFixedColumn(item.Key, item.Operator)
if err != nil { if err != nil {
return "", err return "", err
} }
@@ -199,7 +199,7 @@ func buildTracesFilterQuery(fs *v3.FilterSet) (string, error) {
return queryString, nil return queryString, nil
} }
func existsSubQueryForFixedColumn(key v3.AttributeKey, op v3.FilterOperator) (string, error) { func ExistsSubQueryForFixedColumn(key v3.AttributeKey, op v3.FilterOperator) (string, error) {
if key.DataType == v3.AttributeKeyDataTypeString { if key.DataType == v3.AttributeKeyDataTypeString {
if op == v3.FilterOperatorExists { if op == v3.FilterOperatorExists {
return fmt.Sprintf("%s %s ''", key.Key, tracesOperatorMappingV3[v3.FilterOperatorNotEqual]), nil return fmt.Sprintf("%s %s ''", key.Key, tracesOperatorMappingV3[v3.FilterOperatorNotEqual]), nil
@@ -244,7 +244,7 @@ func buildTracesQuery(start, end, step int64, mq *v3.BuilderQuery, _ string, pan
selectLabels := getSelectLabels(mq.AggregateOperator, mq.GroupBy) selectLabels := getSelectLabels(mq.AggregateOperator, mq.GroupBy)
having := having(mq.Having) having := Having(mq.Having)
if having != "" { if having != "" {
having = " having " + having having = " having " + having
} }
@@ -272,7 +272,7 @@ func buildTracesQuery(start, end, step int64, mq *v3.BuilderQuery, _ string, pan
// we don't need value for first query // we don't need value for first query
if options.GraphLimitQtype == constants.FirstQueryGraphLimit { if options.GraphLimitQtype == constants.FirstQueryGraphLimit {
queryTmpl = "SELECT " + getSelectKeys(mq.AggregateOperator, mq.GroupBy) + " from (" + queryTmpl + ")" queryTmpl = "SELECT " + GetSelectKeys(mq.AggregateOperator, mq.GroupBy) + " from (" + queryTmpl + ")"
} }
emptyValuesInGroupByFilter, err := handleEmptyValuesInGroupBy(mq.GroupBy) emptyValuesInGroupByFilter, err := handleEmptyValuesInGroupBy(mq.GroupBy)
@@ -281,7 +281,7 @@ func buildTracesQuery(start, end, step int64, mq *v3.BuilderQuery, _ string, pan
} }
filterSubQuery += emptyValuesInGroupByFilter filterSubQuery += emptyValuesInGroupByFilter
groupBy := groupByAttributeKeyTags(panelType, options.GraphLimitQtype, mq.GroupBy...) groupBy := GroupByAttributeKeyTags(panelType, options.GraphLimitQtype, mq.GroupBy...)
if groupBy != "" { if groupBy != "" {
groupBy = " group by " + groupBy groupBy = " group by " + groupBy
} }
@@ -291,7 +291,7 @@ func buildTracesQuery(start, end, step int64, mq *v3.BuilderQuery, _ string, pan
} }
if options.GraphLimitQtype == constants.SecondQueryGraphLimit { if options.GraphLimitQtype == constants.SecondQueryGraphLimit {
filterSubQuery = filterSubQuery + " AND " + fmt.Sprintf("(%s) GLOBAL IN (", getSelectKeys(mq.AggregateOperator, mq.GroupBy)) + "%s)" filterSubQuery = filterSubQuery + " AND " + fmt.Sprintf("(%s) GLOBAL IN (", GetSelectKeys(mq.AggregateOperator, mq.GroupBy)) + "%s)"
} }
aggregationKey := "" aggregationKey := ""
@@ -311,7 +311,7 @@ func buildTracesQuery(start, end, step int64, mq *v3.BuilderQuery, _ string, pan
rate = rate / 60.0 rate = rate / 60.0
} }
op := fmt.Sprintf("%s(%s)/%f", aggregateOperatorToSQLFunc[mq.AggregateOperator], aggregationKey, rate) op := fmt.Sprintf("%s(%s)/%f", AggregateOperatorToSQLFunc[mq.AggregateOperator], aggregationKey, rate)
query := fmt.Sprintf(queryTmpl, op, filterSubQuery, groupBy, having, orderBy) query := fmt.Sprintf(queryTmpl, op, filterSubQuery, groupBy, having, orderBy)
return query, nil return query, nil
case case
@@ -324,17 +324,17 @@ func buildTracesQuery(start, end, step int64, mq *v3.BuilderQuery, _ string, pan
v3.AggregateOperatorP90, v3.AggregateOperatorP90,
v3.AggregateOperatorP95, v3.AggregateOperatorP95,
v3.AggregateOperatorP99: v3.AggregateOperatorP99:
op := fmt.Sprintf("quantile(%v)(%s)", aggregateOperatorToPercentile[mq.AggregateOperator], aggregationKey) op := fmt.Sprintf("quantile(%v)(%s)", AggregateOperatorToPercentile[mq.AggregateOperator], aggregationKey)
query := fmt.Sprintf(queryTmpl, op, filterSubQuery, groupBy, having, orderBy) query := fmt.Sprintf(queryTmpl, op, filterSubQuery, groupBy, having, orderBy)
return query, nil return query, nil
case v3.AggregateOperatorAvg, v3.AggregateOperatorSum, v3.AggregateOperatorMin, v3.AggregateOperatorMax: case v3.AggregateOperatorAvg, v3.AggregateOperatorSum, v3.AggregateOperatorMin, v3.AggregateOperatorMax:
op := fmt.Sprintf("%s(%s)", aggregateOperatorToSQLFunc[mq.AggregateOperator], aggregationKey) op := fmt.Sprintf("%s(%s)", AggregateOperatorToSQLFunc[mq.AggregateOperator], aggregationKey)
query := fmt.Sprintf(queryTmpl, op, filterSubQuery, groupBy, having, orderBy) query := fmt.Sprintf(queryTmpl, op, filterSubQuery, groupBy, having, orderBy)
return query, nil return query, nil
case v3.AggregateOperatorCount: case v3.AggregateOperatorCount:
if mq.AggregateAttribute.Key != "" { if mq.AggregateAttribute.Key != "" {
if mq.AggregateAttribute.IsColumn { if mq.AggregateAttribute.IsColumn {
subQuery, err := existsSubQueryForFixedColumn(mq.AggregateAttribute, v3.FilterOperatorExists) subQuery, err := ExistsSubQueryForFixedColumn(mq.AggregateAttribute, v3.FilterOperatorExists)
if err == nil { if err == nil {
filterSubQuery = fmt.Sprintf("%s AND %s", filterSubQuery, subQuery) filterSubQuery = fmt.Sprintf("%s AND %s", filterSubQuery, subQuery)
} }
@@ -354,9 +354,9 @@ func buildTracesQuery(start, end, step int64, mq *v3.BuilderQuery, _ string, pan
var query string var query string
if panelType == v3.PanelTypeTrace { if panelType == v3.PanelTypeTrace {
withSubQuery := fmt.Sprintf(constants.TracesExplorerViewSQLSelectWithSubQuery, constants.SIGNOZ_TRACE_DBNAME, constants.SIGNOZ_SPAN_INDEX_LOCAL_TABLENAME, spanIndexTableTimeFilter, filterSubQuery) withSubQuery := fmt.Sprintf(constants.TracesExplorerViewSQLSelectWithSubQuery, constants.SIGNOZ_TRACE_DBNAME, constants.SIGNOZ_SPAN_INDEX_LOCAL_TABLENAME, spanIndexTableTimeFilter, filterSubQuery)
withSubQuery = addLimitToQuery(withSubQuery, mq.Limit) withSubQuery = AddLimitToQuery(withSubQuery, mq.Limit)
if mq.Offset != 0 { if mq.Offset != 0 {
withSubQuery = addOffsetToQuery(withSubQuery, mq.Offset) withSubQuery = AddOffsetToQuery(withSubQuery, mq.Offset)
} }
// query = withSubQuery + ") " + fmt.Sprintf(constants.TracesExplorerViewSQLSelectQuery, constants.SIGNOZ_TRACE_DBNAME, constants.SIGNOZ_SPAN_INDEX_TABLENAME, constants.SIGNOZ_SPAN_INDEX_TABLENAME) // query = withSubQuery + ") " + fmt.Sprintf(constants.TracesExplorerViewSQLSelectQuery, constants.SIGNOZ_TRACE_DBNAME, constants.SIGNOZ_SPAN_INDEX_TABLENAME, constants.SIGNOZ_SPAN_INDEX_TABLENAME)
query = fmt.Sprintf(constants.TracesExplorerViewSQLSelectBeforeSubQuery, constants.SIGNOZ_TRACE_DBNAME, constants.SIGNOZ_SPAN_INDEX_TABLENAME) + withSubQuery + ") " + fmt.Sprintf(constants.TracesExplorerViewSQLSelectAfterSubQuery, constants.SIGNOZ_TRACE_DBNAME, constants.SIGNOZ_SPAN_INDEX_TABLENAME, spanIndexTableTimeFilter) query = fmt.Sprintf(constants.TracesExplorerViewSQLSelectBeforeSubQuery, constants.SIGNOZ_TRACE_DBNAME, constants.SIGNOZ_SPAN_INDEX_TABLENAME) + withSubQuery + ") " + fmt.Sprintf(constants.TracesExplorerViewSQLSelectAfterSubQuery, constants.SIGNOZ_TRACE_DBNAME, constants.SIGNOZ_SPAN_INDEX_TABLENAME, spanIndexTableTimeFilter)
@@ -403,7 +403,7 @@ func groupBy(panelType v3.PanelType, graphLimitQtype string, tags ...string) str
return strings.Join(tags, ",") return strings.Join(tags, ",")
} }
func groupByAttributeKeyTags(panelType v3.PanelType, graphLimitQtype string, tags ...v3.AttributeKey) string { func GroupByAttributeKeyTags(panelType v3.PanelType, graphLimitQtype string, tags ...v3.AttributeKey) string {
groupTags := []string{} groupTags := []string{}
for _, tag := range tags { for _, tag := range tags {
groupTags = append(groupTags, fmt.Sprintf("`%s`", tag.Key)) groupTags = append(groupTags, fmt.Sprintf("`%s`", tag.Key))
@@ -456,7 +456,7 @@ func orderByAttributeKeyTags(panelType v3.PanelType, items []v3.OrderBy, tags []
return str return str
} }
func having(items []v3.Having) string { func Having(items []v3.Having) string {
// aggregate something and filter on that aggregate // aggregate something and filter on that aggregate
var having []string var having []string
for _, item := range items { for _, item := range items {
@@ -465,7 +465,7 @@ func having(items []v3.Having) string {
return strings.Join(having, " AND ") return strings.Join(having, " AND ")
} }
func reduceToQuery(query string, reduceTo v3.ReduceToOperator, _ v3.AggregateOperator) (string, error) { func ReduceToQuery(query string, reduceTo v3.ReduceToOperator, _ v3.AggregateOperator) (string, error) {
var groupBy string var groupBy string
switch reduceTo { switch reduceTo {
@@ -485,14 +485,14 @@ func reduceToQuery(query string, reduceTo v3.ReduceToOperator, _ v3.AggregateOpe
return query, nil return query, nil
} }
func addLimitToQuery(query string, limit uint64) string { func AddLimitToQuery(query string, limit uint64) string {
if limit == 0 { if limit == 0 {
limit = 100 limit = 100
} }
return fmt.Sprintf("%s LIMIT %d", query, limit) return fmt.Sprintf("%s LIMIT %d", query, limit)
} }
func addOffsetToQuery(query string, offset uint64) string { func AddOffsetToQuery(query string, offset uint64) string {
return fmt.Sprintf("%s OFFSET %d", query, offset) return fmt.Sprintf("%s OFFSET %d", query, offset)
} }
@@ -509,7 +509,7 @@ func PrepareTracesQuery(start, end int64, panelType v3.PanelType, mq *v3.Builder
if err != nil { if err != nil {
return "", err return "", err
} }
query = addLimitToQuery(query, mq.Limit) query = AddLimitToQuery(query, mq.Limit)
return query, nil return query, nil
} else if options.GraphLimitQtype == constants.SecondQueryGraphLimit { } else if options.GraphLimitQtype == constants.SecondQueryGraphLimit {
@@ -525,13 +525,13 @@ func PrepareTracesQuery(start, end int64, panelType v3.PanelType, mq *v3.Builder
return "", err return "", err
} }
if panelType == v3.PanelTypeValue { if panelType == v3.PanelTypeValue {
query, err = reduceToQuery(query, mq.ReduceTo, mq.AggregateOperator) query, err = ReduceToQuery(query, mq.ReduceTo, mq.AggregateOperator)
} }
if panelType == v3.PanelTypeList || panelType == v3.PanelTypeTable { if panelType == v3.PanelTypeList || panelType == v3.PanelTypeTable {
query = addLimitToQuery(query, mq.Limit) query = AddLimitToQuery(query, mq.Limit)
if mq.Offset != 0 { if mq.Offset != 0 {
query = addOffsetToQuery(query, mq.Offset) query = AddOffsetToQuery(query, mq.Offset)
} }
} }
return query, err return query, err

View File

@@ -0,0 +1,84 @@
package v4
import v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
func enrichKeyWithMetadata(key v3.AttributeKey, keys map[string]v3.AttributeKey) v3.AttributeKey {
// TODO(nitya) : update logic similar to logs
if key.Type == "" || key.DataType == "" {
// check if the key is present in the keys map
if existingKey, ok := keys[key.Key]; ok {
key.IsColumn = existingKey.IsColumn
key.Type = existingKey.Type
key.DataType = existingKey.DataType
} else { // if not present then set the default values
key.Type = v3.AttributeKeyTypeTag
key.DataType = v3.AttributeKeyDataTypeString
key.IsColumn = false
return key
}
}
return key
}
func Enrich(params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey) {
if params.CompositeQuery.QueryType == v3.QueryTypeBuilder {
for _, query := range params.CompositeQuery.BuilderQueries {
if query.DataSource == v3.DataSourceTraces {
EnrichTracesQuery(query, keys)
}
}
}
}
func EnrichTracesQuery(query *v3.BuilderQuery, keys map[string]v3.AttributeKey) {
// enrich aggregate attribute
query.AggregateAttribute = enrichKeyWithMetadata(query.AggregateAttribute, keys)
// enrich filter items
if query.Filters != nil && len(query.Filters.Items) > 0 {
for idx, filter := range query.Filters.Items {
query.Filters.Items[idx].Key = enrichKeyWithMetadata(filter.Key, keys)
// if the serviceName column is used, use the corresponding resource attribute as well during filtering
if filter.Key.Key == "serviceName" && filter.Key.IsColumn {
query.Filters.Items = append(query.Filters.Items, v3.FilterItem{
Key: v3.AttributeKey{
Key: "service.name",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource,
IsColumn: false,
},
Operator: filter.Operator,
Value: filter.Value,
})
}
}
}
// enrich group by
for idx, groupBy := range query.GroupBy {
query.GroupBy[idx] = enrichKeyWithMetadata(groupBy, keys)
}
// enrich order by
query.OrderBy = enrichOrderBy(query.OrderBy, keys)
// enrich select columns
for idx, selectColumn := range query.SelectColumns {
query.SelectColumns[idx] = enrichKeyWithMetadata(selectColumn, keys)
}
}
func enrichOrderBy(items []v3.OrderBy, keys map[string]v3.AttributeKey) []v3.OrderBy {
enrichedItems := []v3.OrderBy{}
for i := 0; i < len(items); i++ {
attributeKey := enrichKeyWithMetadata(v3.AttributeKey{
Key: items[i].ColumnName,
}, keys)
enrichedItems = append(enrichedItems, v3.OrderBy{
ColumnName: items[i].ColumnName,
Order: items[i].Order,
Key: attributeKey.Key,
DataType: attributeKey.DataType,
Type: attributeKey.Type,
IsColumn: attributeKey.IsColumn,
})
}
return enrichedItems
}

View File

@@ -0,0 +1,57 @@
package v4
import (
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)
var testColumnName = []struct {
Name string
AttributeKey v3.AttributeKey
ExpectedColumn string
}{
{
Name: "resource",
AttributeKey: v3.AttributeKey{Key: "collector_id", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeResource, IsColumn: false},
ExpectedColumn: "resourceTagsMap['collector_id']",
},
{
Name: "stringAttribute",
AttributeKey: v3.AttributeKey{Key: "customer_id", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: false},
ExpectedColumn: "stringTagMap['customer_id']",
},
{
Name: "boolAttribute",
AttributeKey: v3.AttributeKey{Key: "has_error", DataType: v3.AttributeKeyDataTypeBool, Type: v3.AttributeKeyTypeTag, IsColumn: false},
ExpectedColumn: "boolTagMap['has_error']",
},
{
Name: "float64Attribute",
AttributeKey: v3.AttributeKey{Key: "count", DataType: v3.AttributeKeyDataTypeFloat64, Type: v3.AttributeKeyTypeTag, IsColumn: false},
ExpectedColumn: "numberTagMap['count']",
},
{
Name: "int64Attribute",
AttributeKey: v3.AttributeKey{Key: "count", DataType: v3.AttributeKeyDataTypeInt64, Type: v3.AttributeKeyTypeTag, IsColumn: false},
ExpectedColumn: "numberTagMap['count']",
},
{
Name: "column",
AttributeKey: v3.AttributeKey{Key: "name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true},
ExpectedColumn: "name",
},
{
Name: "missing key",
AttributeKey: v3.AttributeKey{Key: "xyz"},
ExpectedColumn: "stringTagMap['xyz']",
},
}
// func TestColumnName(t *testing.T) {
// for _, tt := range testColumnName {
// tt.AttributeKey = enrichKeyWithMetadata(tt.AttributeKey, map[string]v3.AttributeKey{})
// Convey("testColumnName", t, func() {
// Column := getColumnName(tt.AttributeKey)
// So(Column, ShouldEqual, tt.ExpectedColumn)
// })
// }
// }

View File

@@ -0,0 +1,417 @@
package v4
import (
"fmt"
"strings"
"go.signoz.io/signoz/pkg/query-service/app/resource"
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
"go.signoz.io/signoz/pkg/query-service/constants"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/utils"
)
type Options struct {
GraphLimitQtype string
PreferRPM bool
}
var tracesOperatorMappingV3 = map[v3.FilterOperator]string{
v3.FilterOperatorIn: "IN",
v3.FilterOperatorNotIn: "NOT IN",
v3.FilterOperatorEqual: "=",
v3.FilterOperatorNotEqual: "!=",
v3.FilterOperatorLessThan: "<",
v3.FilterOperatorLessThanOrEq: "<=",
v3.FilterOperatorGreaterThan: ">",
v3.FilterOperatorGreaterThanOrEq: ">=",
v3.FilterOperatorLike: "ILIKE",
v3.FilterOperatorNotLike: "NOT ILIKE",
v3.FilterOperatorRegex: "match(%s, %s)",
v3.FilterOperatorNotRegex: "NOT match(%s, %s)",
v3.FilterOperatorContains: "ILIKE",
v3.FilterOperatorNotContains: "NOT ILIKE",
v3.FilterOperatorExists: "mapContains(%s, '%s')",
v3.FilterOperatorNotExists: "NOT has(%s%s, '%s')",
}
func getClickHouseTracesColumnType(columnType v3.AttributeKeyType) string {
if columnType == v3.AttributeKeyTypeResource {
return "resources"
}
return "attributes"
}
func getClickHouseTracesColumnDataType(columnDataType v3.AttributeKeyDataType) string {
if columnDataType == v3.AttributeKeyDataTypeFloat64 || columnDataType == v3.AttributeKeyDataTypeInt64 {
return "number"
}
if columnDataType == v3.AttributeKeyDataTypeBool {
return "bool"
}
return "string"
}
func getColumnName(key v3.AttributeKey) string {
// TODO (nitya):
// consider routing things like serviceName col to service.name resource attribute for filtering
// consider using static details for some columns
if key.IsColumn {
return "`" + key.Key + "`"
}
keyType := getClickHouseTracesColumnType(key.Type)
keyDType := getClickHouseTracesColumnDataType(key.DataType)
return fmt.Sprintf("%s_%s['%s']", keyType, keyDType, key.Key)
}
// getSelectLabels returns the select labels for the query based on groupBy and aggregateOperator
func getSelectLabels(groupBy []v3.AttributeKey) string {
var labels []string
for _, tag := range groupBy {
name := getColumnName(tag)
labels = append(labels, fmt.Sprintf(" %s as `%s`", name, tag.Key))
}
return strings.Join(labels, ",")
}
func buildTracesFilterQuery(fs *v3.FilterSet) (string, error) {
var conditions []string
if fs != nil && len(fs.Items) != 0 {
for _, item := range fs.Items {
// skip if it's a resource attribute
if item.Key.Type == v3.AttributeKeyTypeResource {
continue
}
val := item.Value
// generate the key
columnName := getColumnName(item.Key)
var fmtVal string
item.Operator = v3.FilterOperator(strings.ToLower(strings.TrimSpace(string(item.Operator))))
if item.Operator != v3.FilterOperatorExists && item.Operator != v3.FilterOperatorNotExists {
var err error
val, err = utils.ValidateAndCastValue(val, item.Key.DataType)
if err != nil {
return "", fmt.Errorf("invalid value for key %s: %v", item.Key.Key, err)
}
}
if val != nil {
fmtVal = utils.ClickHouseFormattedValue(val)
}
if operator, ok := tracesOperatorMappingV3[item.Operator]; ok {
switch item.Operator {
case v3.FilterOperatorContains, v3.FilterOperatorNotContains:
val = utils.QuoteEscapedString(fmt.Sprintf("%v", item.Value))
conditions = append(conditions, fmt.Sprintf("%s %s '%%%s%%'", columnName, operator, val))
case v3.FilterOperatorRegex, v3.FilterOperatorNotRegex:
conditions = append(conditions, fmt.Sprintf(operator, columnName, fmtVal))
case v3.FilterOperatorExists, v3.FilterOperatorNotExists:
if item.Key.IsColumn {
subQuery, err := tracesV3.ExistsSubQueryForFixedColumn(item.Key, item.Operator)
if err != nil {
return "", err
}
conditions = append(conditions, subQuery)
} else {
cType := getClickHouseTracesColumnType(item.Key.Type)
cDataType := getClickHouseTracesColumnDataType(item.Key.DataType)
col := fmt.Sprintf("%s_%s", cType, cDataType)
conditions = append(conditions, fmt.Sprintf(operator, col, item.Key.Key))
}
default:
conditions = append(conditions, fmt.Sprintf("%s %s %s", columnName, operator, fmtVal))
}
} else {
return "", fmt.Errorf("unsupported operator %s", item.Operator)
}
}
}
queryString := strings.Join(conditions, " AND ")
return queryString, nil
}
func handleEmptyValuesInGroupBy(groupBy []v3.AttributeKey) (string, error) {
filterItems := []v3.FilterItem{}
if len(groupBy) != 0 {
for _, item := range groupBy {
if !item.IsColumn {
filterItems = append(filterItems, v3.FilterItem{
Key: item,
Operator: v3.FilterOperatorExists,
})
}
}
}
if len(filterItems) != 0 {
filterSet := v3.FilterSet{
Operator: "AND",
Items: filterItems,
}
return buildTracesFilterQuery(&filterSet)
}
return "", nil
}
const NANOSECOND = 1000000000
// orderBy returns a string of comma separated tags for order by clause
// if there are remaining items which are not present in tags they are also added
// if the order is not specified, it defaults to ASC
func orderBy(panelType v3.PanelType, items []v3.OrderBy, tagLookup map[string]struct{}) []string {
var orderBy []string
for _, item := range items {
if item.ColumnName == constants.SigNozOrderByValue {
orderBy = append(orderBy, fmt.Sprintf("value %s", item.Order))
} else if _, ok := tagLookup[item.ColumnName]; ok {
orderBy = append(orderBy, fmt.Sprintf("`%s` %s", item.ColumnName, item.Order))
} else if panelType == v3.PanelTypeList {
attr := v3.AttributeKey{Key: item.ColumnName, DataType: item.DataType, Type: item.Type, IsColumn: item.IsColumn}
name := getColumnName(attr)
if item.IsColumn {
orderBy = append(orderBy, fmt.Sprintf("%s %s", name, item.Order))
} else {
orderBy = append(orderBy, fmt.Sprintf("%s %s", name, item.Order))
}
}
}
return orderBy
}
func orderByAttributeKeyTags(panelType v3.PanelType, items []v3.OrderBy, tags []v3.AttributeKey) string {
tagLookup := map[string]struct{}{}
for _, v := range tags {
tagLookup[v.Key] = struct{}{}
}
orderByArray := orderBy(panelType, items, tagLookup)
// TODO: check this with logs
if len(orderByArray) == 0 {
if panelType == v3.PanelTypeList {
orderByArray = append(orderByArray, constants.TIMESTAMP+" DESC")
} else if panelType == v3.PanelTypeGraph {
orderByArray = append(orderByArray, "value DESC")
}
}
str := strings.Join(orderByArray, ",")
return str
}
func buildTracesQuery(start, end, step int64, mq *v3.BuilderQuery, panelType v3.PanelType, options v3.QBOptions) (string, error) {
tracesStart := utils.GetEpochNanoSecs(start)
tracesEnd := utils.GetEpochNanoSecs(end)
// -1800 this is added so that the bucket start considers all the fingerprints.
bucketStart := tracesStart/NANOSECOND - 1800
bucketEnd := tracesEnd / NANOSECOND
timeFilter := fmt.Sprintf("(timestamp >= '%d' AND timestamp <= '%d') AND (ts_bucket_start >= %d AND ts_bucket_start <= %d)", tracesStart, tracesEnd, bucketStart, bucketEnd)
filterSubQuery, err := buildTracesFilterQuery(mq.Filters)
if err != nil {
return "", err
}
if filterSubQuery != "" {
filterSubQuery = " AND " + filterSubQuery
}
resourceSubQuery, err := resource.BuildResourceSubQuery("signoz_traces", "distributed_traces_v3_resource", bucketStart, bucketEnd, mq.Filters, mq.GroupBy, mq.AggregateAttribute, false)
if err != nil {
return "", err
}
// join both the filter clauses
if resourceSubQuery != "" {
filterSubQuery = filterSubQuery + " AND (resource_fingerprint GLOBAL IN " + resourceSubQuery + ")"
}
// timerange will be sent in epoch millisecond
selectLabels := getSelectLabels(mq.GroupBy)
orderBy := orderByAttributeKeyTags(panelType, mq.OrderBy, mq.GroupBy)
if orderBy != "" {
orderBy = " order by " + orderBy
}
if mq.AggregateOperator == v3.AggregateOperatorNoOp {
var query string
if panelType == v3.PanelTypeTrace {
withSubQuery := fmt.Sprintf(constants.TracesExplorerViewSQLSelectWithSubQuery, constants.SIGNOZ_TRACE_DBNAME, constants.SIGNOZ_SPAN_INDEX_V3_LOCAL_TABLENAME, timeFilter, filterSubQuery)
withSubQuery = tracesV3.AddLimitToQuery(withSubQuery, mq.Limit)
if mq.Offset != 0 {
withSubQuery = tracesV3.AddOffsetToQuery(withSubQuery, mq.Offset)
}
query = fmt.Sprintf(constants.TracesExplorerViewSQLSelectBeforeSubQuery, constants.SIGNOZ_TRACE_DBNAME, constants.SIGNOZ_SPAN_INDEX_V3) + withSubQuery + ") " + fmt.Sprintf(constants.TracesExplorerViewSQLSelectAfterSubQuery, constants.SIGNOZ_TRACE_DBNAME, constants.SIGNOZ_SPAN_INDEX_V3, timeFilter)
} else if panelType == v3.PanelTypeList {
if len(mq.SelectColumns) == 0 {
return "", fmt.Errorf("select columns cannot be empty for panelType %s", panelType)
}
selectLabels = getSelectLabels(mq.SelectColumns)
queryNoOpTmpl := fmt.Sprintf("SELECT timestamp as timestamp_datetime, spanID, traceID,%s ", selectLabels) + "from " + constants.SIGNOZ_TRACE_DBNAME + "." + constants.SIGNOZ_SPAN_INDEX_V3 + " where %s %s" + "%s"
query = fmt.Sprintf(queryNoOpTmpl, timeFilter, filterSubQuery, orderBy)
} else {
return "", fmt.Errorf("unsupported aggregate operator %s for panelType %s", mq.AggregateOperator, panelType)
}
return query, nil
// ---- NOOP ends here ----
}
having := tracesV3.Having(mq.Having)
if having != "" {
having = " having " + having
}
groupBy := tracesV3.GroupByAttributeKeyTags(panelType, options.GraphLimitQtype, mq.GroupBy...)
if groupBy != "" {
groupBy = " group by " + groupBy
}
aggregationKey := ""
if mq.AggregateAttribute.Key != "" {
aggregationKey = getColumnName(mq.AggregateAttribute)
}
var queryTmpl string
if options.GraphLimitQtype == constants.FirstQueryGraphLimit {
queryTmpl = "SELECT"
} else if panelType == v3.PanelTypeTable {
queryTmpl =
"SELECT "
// step or aggregate interval is whole time period in case of table panel
step = (tracesEnd - tracesStart) / 1000000000
} else if panelType == v3.PanelTypeGraph || panelType == v3.PanelTypeValue {
// Select the aggregate value for interval
queryTmpl =
fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d SECOND) AS ts,", step)
}
queryTmpl = queryTmpl + selectLabels +
" %s as value " +
"from " + constants.SIGNOZ_TRACE_DBNAME + "." + constants.SIGNOZ_SPAN_INDEX_V3 +
" where " + timeFilter + "%s" +
"%s%s" +
"%s"
// we don't need value for first query
if options.GraphLimitQtype == constants.FirstQueryGraphLimit {
queryTmpl = "SELECT " + tracesV3.GetSelectKeys(mq.AggregateOperator, mq.GroupBy) + " from (" + queryTmpl + ")"
}
emptyValuesInGroupByFilter, err := handleEmptyValuesInGroupBy(mq.GroupBy)
if err != nil {
return "", err
}
filterSubQuery += emptyValuesInGroupByFilter
if options.GraphLimitQtype == constants.SecondQueryGraphLimit {
filterSubQuery = filterSubQuery + " AND " + fmt.Sprintf("(%s) GLOBAL IN (", tracesV3.GetSelectKeys(mq.AggregateOperator, mq.GroupBy)) + "%s)"
}
switch mq.AggregateOperator {
case v3.AggregateOperatorRateSum,
v3.AggregateOperatorRateMax,
v3.AggregateOperatorRateAvg,
v3.AggregateOperatorRateMin,
v3.AggregateOperatorRate:
rate := float64(step)
if options.PreferRPM {
rate = rate / 60.0
}
op := fmt.Sprintf("%s(%s)/%f", tracesV3.AggregateOperatorToSQLFunc[mq.AggregateOperator], aggregationKey, rate)
query := fmt.Sprintf(queryTmpl, op, filterSubQuery, groupBy, having, orderBy)
return query, nil
case
v3.AggregateOperatorP05,
v3.AggregateOperatorP10,
v3.AggregateOperatorP20,
v3.AggregateOperatorP25,
v3.AggregateOperatorP50,
v3.AggregateOperatorP75,
v3.AggregateOperatorP90,
v3.AggregateOperatorP95,
v3.AggregateOperatorP99:
op := fmt.Sprintf("quantile(%v)(%s)", tracesV3.AggregateOperatorToPercentile[mq.AggregateOperator], aggregationKey)
query := fmt.Sprintf(queryTmpl, op, filterSubQuery, groupBy, having, orderBy)
return query, nil
case v3.AggregateOperatorAvg, v3.AggregateOperatorSum, v3.AggregateOperatorMin, v3.AggregateOperatorMax:
op := fmt.Sprintf("%s(%s)", tracesV3.AggregateOperatorToSQLFunc[mq.AggregateOperator], aggregationKey)
query := fmt.Sprintf(queryTmpl, op, filterSubQuery, groupBy, having, orderBy)
return query, nil
case v3.AggregateOperatorCount:
if mq.AggregateAttribute.Key != "" {
if mq.AggregateAttribute.IsColumn {
subQuery, err := tracesV3.ExistsSubQueryForFixedColumn(mq.AggregateAttribute, v3.FilterOperatorExists)
if err == nil {
filterSubQuery = fmt.Sprintf("%s AND %s", filterSubQuery, subQuery)
}
} else {
// columnType, columnDataType := getClickhouseTracesColumnDataTypeAndType(mq.AggregateAttribute)
column := getColumnName(mq.AggregateAttribute)
filterSubQuery = fmt.Sprintf("%s AND has(%s, '%s')", filterSubQuery, column, mq.AggregateAttribute.Key)
}
}
op := "toFloat64(count())"
query := fmt.Sprintf(queryTmpl, op, filterSubQuery, groupBy, having, orderBy)
return query, nil
case v3.AggregateOperatorCountDistinct:
op := fmt.Sprintf("toFloat64(count(distinct(%s)))", aggregationKey)
query := fmt.Sprintf(queryTmpl, op, filterSubQuery, groupBy, having, orderBy)
return query, nil
// case v3.AggregateOperatorNoOp:
// return query, nil
default:
return "", fmt.Errorf("unsupported aggregate operator %s", mq.AggregateOperator)
}
}
// PrepareTracesQuery returns the query string for traces
// start and end are in epoch millisecond
// step is in seconds
func PrepareTracesQuery(start, end int64, panelType v3.PanelType, mq *v3.BuilderQuery, options v3.QBOptions) (string, error) {
// adjust the start and end time to the step interval
start = start - (start % (mq.StepInterval * 1000))
end = end - (end % (mq.StepInterval * 1000))
if options.GraphLimitQtype == constants.FirstQueryGraphLimit {
// give me just the group by names
query, err := buildTracesQuery(start, end, mq.StepInterval, mq, panelType, options)
if err != nil {
return "", err
}
query = tracesV3.AddLimitToQuery(query, mq.Limit)
return query, nil
} else if options.GraphLimitQtype == constants.SecondQueryGraphLimit {
query, err := buildTracesQuery(start, end, mq.StepInterval, mq, panelType, options)
if err != nil {
return "", err
}
return query, nil
}
query, err := buildTracesQuery(start, end, mq.StepInterval, mq, panelType, options)
if err != nil {
return "", err
}
if panelType == v3.PanelTypeValue {
query, err = tracesV3.ReduceToQuery(query, mq.ReduceTo, mq.AggregateOperator)
}
if panelType == v3.PanelTypeList || panelType == v3.PanelTypeTable {
query = tracesV3.AddLimitToQuery(query, mq.Limit)
if mq.Offset != 0 {
query = tracesV3.AddOffsetToQuery(query, mq.Offset)
}
}
return query, err
}

View File

@@ -0,0 +1,539 @@
package v4
import (
"testing"
"go.signoz.io/signoz/pkg/query-service/constants"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)
func Test_getClickHouseTracesColumnType(t *testing.T) {
type args struct {
columnType v3.AttributeKeyType
}
tests := []struct {
name string
args args
want string
}{
{
name: "tag",
args: args{
columnType: v3.AttributeKeyTypeTag,
},
want: "attributes",
},
{
name: "resource",
args: args{
columnType: v3.AttributeKeyTypeResource,
},
want: "resources",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := getClickHouseTracesColumnType(tt.args.columnType); got != tt.want {
t.Errorf("GetClickhouseTracesColumnType() = %v, want %v", got, tt.want)
}
})
}
}
func Test_getClickHouseTracesColumnDataType(t *testing.T) {
type args struct {
columnDataType v3.AttributeKeyDataType
}
tests := []struct {
name string
args args
want string
}{
{
name: "string",
args: args{
columnDataType: v3.AttributeKeyDataTypeString,
},
want: "string",
},
{
name: "float64",
args: args{
columnDataType: v3.AttributeKeyDataTypeFloat64,
},
want: "number",
},
{
name: "int64",
args: args{
columnDataType: v3.AttributeKeyDataTypeInt64,
},
want: "number",
},
{
name: "bool",
args: args{
columnDataType: v3.AttributeKeyDataTypeBool,
},
want: "bool",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := getClickHouseTracesColumnDataType(tt.args.columnDataType); got != tt.want {
t.Errorf("getClickhouseTracesColumnDataType() = %v, want %v", got, tt.want)
}
})
}
}
func Test_getColumnName(t *testing.T) {
type args struct {
key v3.AttributeKey
}
tests := []struct {
name string
args args
want string
}{
{
name: "tag",
args: args{
key: v3.AttributeKey{Key: "data", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag},
},
want: "attributes_string['data']",
},
{
name: "column",
args: args{
key: v3.AttributeKey{Key: "data", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true},
},
want: "`data`",
},
{
name: "missing meta",
args: args{
key: v3.AttributeKey{Key: "xyz"},
},
want: "attributes_string['xyz']",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := getColumnName(tt.args.key); got != tt.want {
t.Errorf("getColumnName() = %v, want %v", got, tt.want)
}
})
}
}
func Test_getSelectLabels(t *testing.T) {
type args struct {
groupBy []v3.AttributeKey
}
tests := []struct {
name string
args args
want string
}{
{
name: "count",
args: args{
groupBy: []v3.AttributeKey{{Key: "user_name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}},
},
want: " attributes_string['user_name'] as `user_name`",
},
{
name: "multiple group by",
args: args{
groupBy: []v3.AttributeKey{
{Key: "user_name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag},
{Key: "service_name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeResource, IsColumn: true},
},
},
want: " attributes_string['user_name'] as `user_name`, `service_name` as `service_name`",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := getSelectLabels(tt.args.groupBy); got != tt.want {
t.Errorf("getSelectLabels() = %v, want %v", got, tt.want)
}
})
}
}
func Test_buildTracesFilterQuery(t *testing.T) {
type args struct {
fs *v3.FilterSet
}
tests := []struct {
name string
args args
want string
wantErr bool
}{
{
name: "Test buildTracesFilterQuery in, nin",
args: args{
fs: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "method", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, Value: []interface{}{"GET", "POST"}, Operator: v3.FilterOperatorIn},
{Key: v3.AttributeKey{Key: "method", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, Value: []interface{}{"PUT"}, Operator: v3.FilterOperatorNotIn},
{Key: v3.AttributeKey{Key: "host", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeResource}, Value: []interface{}{"server"}, Operator: v3.FilterOperatorNotIn},
}},
},
want: "attributes_string['method'] IN ['GET','POST'] AND attributes_string['method'] NOT IN ['PUT']",
wantErr: false,
},
{
name: "Test buildTracesFilterQuery not eq, neq, gt, lt, gte, lte",
args: args{
fs: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "duration", DataType: v3.AttributeKeyDataTypeInt64, Type: v3.AttributeKeyTypeTag}, Value: 102, Operator: v3.FilterOperatorEqual},
{Key: v3.AttributeKey{Key: "duration", DataType: v3.AttributeKeyDataTypeInt64, Type: v3.AttributeKeyTypeTag}, Value: 100, Operator: v3.FilterOperatorNotEqual},
{Key: v3.AttributeKey{Key: "duration", DataType: v3.AttributeKeyDataTypeInt64, Type: v3.AttributeKeyTypeTag}, Value: 10, Operator: v3.FilterOperatorGreaterThan},
{Key: v3.AttributeKey{Key: "duration", DataType: v3.AttributeKeyDataTypeInt64, Type: v3.AttributeKeyTypeTag}, Value: 200, Operator: v3.FilterOperatorLessThan},
{Key: v3.AttributeKey{Key: "duration", DataType: v3.AttributeKeyDataTypeInt64, Type: v3.AttributeKeyTypeTag}, Value: 10, Operator: v3.FilterOperatorGreaterThanOrEq},
{Key: v3.AttributeKey{Key: "duration", DataType: v3.AttributeKeyDataTypeInt64, Type: v3.AttributeKeyTypeTag}, Value: 200, Operator: v3.FilterOperatorLessThanOrEq},
}},
},
want: "attributes_number['duration'] = 102 AND attributes_number['duration'] != 100 AND attributes_number['duration'] > 10 AND attributes_number['duration'] < 200" +
" AND attributes_number['duration'] >= 10 AND attributes_number['duration'] <= 200",
wantErr: false,
},
{
name: "Test contains, ncontains, like, nlike, regex, nregex",
args: args{
fs: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "host", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, Value: "102.", Operator: v3.FilterOperatorContains},
{Key: v3.AttributeKey{Key: "host", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, Value: "103", Operator: v3.FilterOperatorNotContains},
{Key: v3.AttributeKey{Key: "host", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, Value: "102.", Operator: v3.FilterOperatorLike},
{Key: v3.AttributeKey{Key: "host", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, Value: "102", Operator: v3.FilterOperatorNotLike},
{Key: v3.AttributeKey{Key: "path", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}, Value: "/mypath", Operator: v3.FilterOperatorRegex},
{Key: v3.AttributeKey{Key: "path", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}, Value: "/health.*", Operator: v3.FilterOperatorNotRegex},
}},
},
want: "attributes_string['host'] ILIKE '%102.%' AND attributes_string['host'] NOT ILIKE '%103%' AND attributes_string['host'] ILIKE '102.' AND attributes_string['host'] NOT ILIKE '102' AND " +
"match(`path`, '/mypath') AND NOT match(`path`, '/health.*')",
},
{
name: "Test exists, nexists",
args: args{
fs: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "host", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, Operator: v3.FilterOperatorExists},
{Key: v3.AttributeKey{Key: "path", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}, Operator: v3.FilterOperatorNotExists},
}},
},
want: "mapContains(attributes_string, 'host') AND path = ''",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := buildTracesFilterQuery(tt.args.fs)
if (err != nil) != tt.wantErr {
t.Errorf("buildTracesFilterQuery() error = %v, wantErr %v", err, tt.wantErr)
return
}
if got != tt.want {
t.Errorf("buildTracesFilterQuery() = %v, want %v", got, tt.want)
}
})
}
}
var handleEmptyValuesInGroupByData = []struct {
Name string
GroupBy []v3.AttributeKey
ExpectedFilter string
}{
{
Name: "String type key",
GroupBy: []v3.AttributeKey{{Key: "bytes", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}},
ExpectedFilter: " AND has(stringTagMap, 'bytes')",
},
{
Name: "fixed column type key",
GroupBy: []v3.AttributeKey{{Key: "bytes", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}},
ExpectedFilter: "",
},
{
Name: "String, float64 and fixed column type key",
GroupBy: []v3.AttributeKey{
{Key: "bytes", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag},
{Key: "count", DataType: v3.AttributeKeyDataTypeFloat64, Type: v3.AttributeKeyTypeTag},
{Key: "name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true},
},
ExpectedFilter: " AND has(stringTagMap, 'bytes') AND has(numberTagMap, 'count')",
},
}
func Test_handleEmptyValuesInGroupBy(t *testing.T) {
type args struct {
groupBy []v3.AttributeKey
}
tests := []struct {
name string
args args
want string
wantErr bool
}{
{
name: "Test handleEmptyValuesInGroupBy",
args: args{
groupBy: []v3.AttributeKey{{Key: "bytes", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}},
},
want: "mapContains(attributes_string, 'bytes')",
wantErr: false,
},
{
name: "Test handleEmptyValuesInGroupBy",
args: args{
groupBy: []v3.AttributeKey{{Key: "bytes", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}},
},
want: "",
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := handleEmptyValuesInGroupBy(tt.args.groupBy)
if (err != nil) != tt.wantErr {
t.Errorf("handleEmptyValuesInGroupBy() error = %v, wantErr %v", err, tt.wantErr)
return
}
if got != tt.want {
t.Errorf("handleEmptyValuesInGroupBy() = %v, want %v", got, tt.want)
}
})
}
}
func Test_buildTracesQuery(t *testing.T) {
type args struct {
start int64
end int64
step int64
mq *v3.BuilderQuery
panelType v3.PanelType
options v3.QBOptions
}
tests := []struct {
name string
args args
want string
wantErr bool
}{
{
name: "Test buildTracesQuery",
args: args{
panelType: v3.PanelTypeTable,
start: 1680066360726210000,
end: 1680066458000000000,
step: 1000,
mq: &v3.BuilderQuery{
AggregateOperator: v3.AggregateOperatorCount,
Filters: &v3.FilterSet{
Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "bytes", Type: v3.AttributeKeyTypeTag, DataType: v3.AttributeKeyDataTypeInt64}, Value: 100, Operator: ">"},
{Key: v3.AttributeKey{Key: "service.name", Type: v3.AttributeKeyTypeResource, DataType: v3.AttributeKeyDataTypeString}, Value: "myService", Operator: "="},
},
},
GroupBy: []v3.AttributeKey{{Key: "host", DataType: v3.AttributeKeyDataTypeInt64, Type: v3.AttributeKeyTypeResource}},
OrderBy: []v3.OrderBy{
{ColumnName: "host", Order: "ASC"}},
},
},
want: "SELECT resources_number['host'] as `host` toFloat64(count()) as value from signoz_traces.distributed_signoz_index_v3 where (timestamp >= '1680066360726210000' AND timestamp <= '1680066458000000000') " +
"AND (ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) AND attributes_number['bytes'] > 100 AND " +
"(resource_fingerprint GLOBAL IN (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (seen_at_ts_bucket_start >= 1680064560) AND " +
"(seen_at_ts_bucket_start <= 1680066458) AND simpleJSONExtractString(labels, 'service.name') = 'myService' AND labels like '%service.name%myService%' AND " +
"( (simpleJSONHas(labels, 'host') AND labels like '%host%') ))) " +
"group by `host` order by `host` ASC",
},
{
name: "test noop list view",
args: args{
panelType: v3.PanelTypeList,
start: 1680066360726210000,
end: 1680066458000000000,
mq: &v3.BuilderQuery{
AggregateOperator: v3.AggregateOperatorNoOp,
Filters: &v3.FilterSet{},
SelectColumns: []v3.AttributeKey{{Key: "name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}},
},
},
want: "SELECT timestamp as timestamp_datetime, spanID, traceID, `name` as `name` from signoz_traces.distributed_signoz_index_v3 where (timestamp >= '1680066360726210000' AND timestamp <= '1680066458000000000') " +
"AND (ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) order by timestamp DESC",
},
{
name: "test noop trace view",
args: args{
panelType: v3.PanelTypeTrace,
start: 1680066360726210000,
end: 1680066458000000000,
mq: &v3.BuilderQuery{
AggregateOperator: v3.AggregateOperatorNoOp,
Filters: &v3.FilterSet{
Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "method", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, Value: "GET", Operator: "="},
{Key: v3.AttributeKey{Key: "service.name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeResource}, Value: "myService", Operator: "="},
},
},
},
},
want: "SELECT subQuery.serviceName, subQuery.name, count() AS span_count, subQuery.durationNano, subQuery.traceID AS traceID FROM signoz_traces.distributed_signoz_index_v3 INNER JOIN " +
"( SELECT * FROM (SELECT traceID, durationNano, serviceName, name FROM signoz_traces.signoz_index_v3 WHERE parentSpanID = '' AND (timestamp >= '1680066360726210000' AND timestamp <= '1680066458000000000') AND " +
"(ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) AND attributes_string['method'] = 'GET' AND (resource_fingerprint GLOBAL IN (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource " +
"WHERE (seen_at_ts_bucket_start >= 1680064560) AND (seen_at_ts_bucket_start <= 1680066458) AND simpleJSONExtractString(labels, 'service.name') = 'myService' AND labels like '%service.name%myService%')) " +
"ORDER BY durationNano DESC LIMIT 1 BY traceID LIMIT 100) AS inner_subquery ) AS subQuery ON signoz_traces.distributed_signoz_index_v3.traceID = subQuery.traceID WHERE (timestamp >= '1680066360726210000' AND " +
"timestamp <= '1680066458000000000') AND (ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) GROUP BY subQuery.traceID, subQuery.durationNano, subQuery.name, subQuery.serviceName ORDER BY " +
"subQuery.durationNano desc LIMIT 1 BY subQuery.traceID;",
},
{
name: "Test order by value with having",
args: args{
panelType: v3.PanelTypeTable,
start: 1680066360726210000,
end: 1680066458000000000,
mq: &v3.BuilderQuery{
AggregateOperator: v3.AggregateOperatorCountDistinct,
Filters: &v3.FilterSet{},
AggregateAttribute: v3.AttributeKey{Key: "name", IsColumn: true, DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag},
OrderBy: []v3.OrderBy{{ColumnName: "#SIGNOZ_VALUE", Order: "ASC"}},
Having: []v3.Having{
{
ColumnName: "name",
Operator: ">",
Value: 10,
},
},
},
},
want: "SELECT toFloat64(count(distinct(`name`))) as value from signoz_traces.distributed_signoz_index_v3 where (timestamp >= '1680066360726210000' AND timestamp <= '1680066458000000000') AND " +
"(ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) having value > 10 order by value ASC",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := buildTracesQuery(tt.args.start, tt.args.end, tt.args.step, tt.args.mq, tt.args.panelType, tt.args.options)
if (err != nil) != tt.wantErr {
t.Errorf("buildTracesQuery() error = %v, wantErr %v", err, tt.wantErr)
return
}
if got != tt.want {
t.Errorf("buildTracesQuery() = %v, want %v", got, tt.want)
}
})
}
}
func Test_orderByAttributeKeyTags(t *testing.T) {
type args struct {
panelType v3.PanelType
items []v3.OrderBy
tags []v3.AttributeKey
}
tests := []struct {
name string
args args
want string
}{
{
name: "test",
args: args{
panelType: v3.PanelTypeTrace,
items: []v3.OrderBy{{ColumnName: "name", Order: "ASC"}},
tags: []v3.AttributeKey{{Key: "name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}},
},
want: "`name` ASC",
},
{
name: "test",
args: args{
panelType: v3.PanelTypeList,
items: []v3.OrderBy{{ColumnName: "name", Order: "DESC"}},
tags: []v3.AttributeKey{{Key: "name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}},
},
want: "`name` DESC",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := orderByAttributeKeyTags(tt.args.panelType, tt.args.items, tt.args.tags); got != tt.want {
t.Errorf("orderByAttributeKeyTags() = %v, want %v", got, tt.want)
}
})
}
}
func TestPrepareTracesQuery(t *testing.T) {
type args struct {
start int64
end int64
panelType v3.PanelType
mq *v3.BuilderQuery
options v3.QBOptions
}
tests := []struct {
name string
args args
want string
wantErr bool
}{
{
name: "test with limit - first",
args: args{
start: 1680066360726210000,
end: 1680066458000000000,
panelType: v3.PanelTypeTable,
mq: &v3.BuilderQuery{
StepInterval: 60,
AggregateOperator: v3.AggregateOperatorCountDistinct,
Filters: &v3.FilterSet{},
AggregateAttribute: v3.AttributeKey{Key: "name", IsColumn: true, DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag},
GroupBy: []v3.AttributeKey{{Key: "serviceName", IsColumn: true, DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}},
Limit: 10,
},
options: v3.QBOptions{
GraphLimitQtype: constants.FirstQueryGraphLimit,
},
},
want: "SELECT `serviceName` from (SELECT `serviceName` as `serviceName` toFloat64(count(distinct(`name`))) as value from signoz_traces.distributed_signoz_index_v3 " +
"where (timestamp >= '1680066360726180000' AND timestamp <= '1680066457999980000') AND (ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066457) group by `serviceName`) LIMIT 10",
},
{
name: "test with limit - second",
args: args{
start: 1680066360726210000,
end: 1680066458000000000,
panelType: v3.PanelTypeTable,
mq: &v3.BuilderQuery{
StepInterval: 60,
AggregateOperator: v3.AggregateOperatorCountDistinct,
Filters: &v3.FilterSet{},
AggregateAttribute: v3.AttributeKey{Key: "name", IsColumn: true, DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag},
GroupBy: []v3.AttributeKey{{Key: "serviceName", IsColumn: true, DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}},
Limit: 10,
},
options: v3.QBOptions{
GraphLimitQtype: constants.SecondQueryGraphLimit,
},
},
want: "SELECT `serviceName` as `serviceName` toFloat64(count(distinct(`name`))) as value from signoz_traces.distributed_signoz_index_v3 where " +
"(timestamp >= '1680066360726180000' AND timestamp <= '1680066457999980000') AND (ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066457) AND (`serviceName`) GLOBAL IN (%s) group by `serviceName`",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := PrepareTracesQuery(tt.args.start, tt.args.end, tt.args.panelType, tt.args.mq, tt.args.options)
if (err != nil) != tt.wantErr {
t.Errorf("PrepareTracesQuery() error = %v, wantErr %v", err, tt.wantErr)
return
}
if got != tt.want {
t.Errorf("PrepareTracesQuery() = %v, want %v", got, tt.want)
}
})
}
}

View File

@@ -0,0 +1,216 @@
package v4
import (
"strconv"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/utils"
"go.uber.org/zap"
)
var TracesListViewDefaultSelectedColumns = []v3.AttributeKey{
{
Key: "serviceName",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
IsColumn: true,
},
{
Key: "name",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
IsColumn: true,
},
{
Key: "durationNano",
DataType: v3.AttributeKeyDataTypeArrayFloat64,
Type: v3.AttributeKeyTypeTag,
IsColumn: true,
},
{
Key: "httpMethod",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
IsColumn: true,
},
{
Key: "responseStatusCode",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
IsColumn: true,
},
}
// check if traceId filter is used in traces query and return the list of traceIds
func TraceIdFilterUsedWithEqual(params *v3.QueryRangeParamsV3) (bool, []string) {
compositeQuery := params.CompositeQuery
if compositeQuery == nil {
return false, []string{}
}
var traceIds []string
var traceIdFilterUsed bool
// Build queries for each builder query
for queryName, query := range compositeQuery.BuilderQueries {
if query.Expression != queryName && query.DataSource != v3.DataSourceTraces {
continue
}
// check filter attribute
if query.Filters != nil && len(query.Filters.Items) != 0 {
for _, item := range query.Filters.Items {
if item.Key.Key == "traceID" && (item.Operator == v3.FilterOperatorIn ||
item.Operator == v3.FilterOperatorEqual) {
traceIdFilterUsed = true
// validate value
var err error
val := item.Value
val, err = utils.ValidateAndCastValue(val, item.Key.DataType)
if err != nil {
zap.L().Error("invalid value for key", zap.String("key", item.Key.Key), zap.Error(err))
return false, []string{}
}
if val != nil {
fmtVal := extractFormattedStringValues(val)
traceIds = append(traceIds, fmtVal...)
}
}
}
}
}
zap.L().Debug("traceIds", zap.Any("traceIds", traceIds))
return traceIdFilterUsed, traceIds
}
func extractFormattedStringValues(v interface{}) []string {
// if it's pointer convert it to a value
v = getPointerValue(v)
switch x := v.(type) {
case string:
return []string{x}
case []interface{}:
if len(x) == 0 {
return []string{}
}
switch x[0].(type) {
case string:
values := []string{}
for _, val := range x {
values = append(values, val.(string))
}
return values
default:
return []string{}
}
default:
return []string{}
}
}
func getPointerValue(v interface{}) interface{} {
switch x := v.(type) {
case *uint8:
return *x
case *uint16:
return *x
case *uint32:
return *x
case *uint64:
return *x
case *int:
return *x
case *int8:
return *x
case *int16:
return *x
case *int32:
return *x
case *int64:
return *x
case *float32:
return *x
case *float64:
return *x
case *string:
return *x
case *bool:
return *x
case []interface{}:
values := []interface{}{}
for _, val := range x {
values = append(values, getPointerValue(val))
}
return values
default:
return v
}
}
func AddTimestampFilters(minTime int64, maxTime int64, params *v3.QueryRangeParamsV3) {
if minTime == 0 && maxTime == 0 {
return
}
compositeQuery := params.CompositeQuery
if compositeQuery == nil {
return
}
// Build queries for each builder query and apply timestamp filter only if TraceID is present
for queryName, query := range compositeQuery.BuilderQueries {
if query.Expression != queryName && query.DataSource != v3.DataSourceTraces {
continue
}
addTimeStampFilter := false
// check filter attribute
if query.Filters != nil && len(query.Filters.Items) != 0 {
for _, item := range query.Filters.Items {
if item.Key.Key == "traceID" && (item.Operator == v3.FilterOperatorIn ||
item.Operator == v3.FilterOperatorEqual) {
addTimeStampFilter = true
}
}
}
// add timestamp filter to query only if traceID filter along with equal/similar operator is used
if addTimeStampFilter {
timeFilters := []v3.FilterItem{
{
Key: v3.AttributeKey{
Key: "timestamp",
Type: v3.AttributeKeyTypeTag,
DataType: v3.AttributeKeyDataTypeString,
IsColumn: true,
},
Value: strconv.FormatUint(uint64(minTime), 10),
Operator: v3.FilterOperatorGreaterThanOrEq,
},
{
Key: v3.AttributeKey{
Key: "timestamp",
Type: v3.AttributeKeyTypeTag,
DataType: v3.AttributeKeyDataTypeString,
IsColumn: true,
},
Value: strconv.FormatUint(uint64(maxTime), 10),
Operator: v3.FilterOperatorLessThanOrEq,
},
}
// add new timestamp filter to query
if query.Filters == nil {
query.Filters = &v3.FilterSet{
Items: timeFilters,
}
} else {
query.Filters.Items = append(query.Filters.Items, timeFilters...)
}
}
}
}

View File

@@ -234,7 +234,9 @@ const (
SIGNOZ_EXP_HISTOGRAM_TABLENAME = "distributed_exp_hist" SIGNOZ_EXP_HISTOGRAM_TABLENAME = "distributed_exp_hist"
SIGNOZ_TRACE_DBNAME = "signoz_traces" SIGNOZ_TRACE_DBNAME = "signoz_traces"
SIGNOZ_SPAN_INDEX_TABLENAME = "distributed_signoz_index_v2" SIGNOZ_SPAN_INDEX_TABLENAME = "distributed_signoz_index_v2"
SIGNOZ_SPAN_INDEX_V3 = "distributed_signoz_index_v3"
SIGNOZ_SPAN_INDEX_LOCAL_TABLENAME = "signoz_index_v2" SIGNOZ_SPAN_INDEX_LOCAL_TABLENAME = "signoz_index_v2"
SIGNOZ_SPAN_INDEX_V3_LOCAL_TABLENAME = "signoz_index_v3"
SIGNOZ_TIMESERIES_v4_LOCAL_TABLENAME = "time_series_v4" SIGNOZ_TIMESERIES_v4_LOCAL_TABLENAME = "time_series_v4"
SIGNOZ_TIMESERIES_v4_6HRS_LOCAL_TABLENAME = "time_series_v4_6hrs" SIGNOZ_TIMESERIES_v4_6HRS_LOCAL_TABLENAME = "time_series_v4_6hrs"
SIGNOZ_TIMESERIES_v4_1DAY_LOCAL_TABLENAME = "time_series_v4_1day" SIGNOZ_TIMESERIES_v4_1DAY_LOCAL_TABLENAME = "time_series_v4_1day"

View File

@@ -39,6 +39,7 @@ func main() {
var disableRules bool var disableRules bool
var useLogsNewSchema bool var useLogsNewSchema bool
var useTraceNewSchema bool
// the url used to build link in the alert messages in slack and other systems // the url used to build link in the alert messages in slack and other systems
var ruleRepoURL, cacheConfigPath, fluxInterval string var ruleRepoURL, cacheConfigPath, fluxInterval string
var cluster string var cluster string
@@ -50,6 +51,7 @@ func main() {
var dialTimeout time.Duration var dialTimeout time.Duration
flag.BoolVar(&useLogsNewSchema, "use-logs-new-schema", false, "use logs_v2 schema for logs") flag.BoolVar(&useLogsNewSchema, "use-logs-new-schema", false, "use logs_v2 schema for logs")
flag.BoolVar(&useTraceNewSchema, "use-trace-new-schema", false, "use new schema for traces")
flag.StringVar(&promConfigPath, "config", "./config/prometheus.yml", "(prometheus config to read metrics)") flag.StringVar(&promConfigPath, "config", "./config/prometheus.yml", "(prometheus config to read metrics)")
flag.StringVar(&skipTopLvlOpsPath, "skip-top-level-ops", "", "(config file to skip top level operations)") flag.StringVar(&skipTopLvlOpsPath, "skip-top-level-ops", "", "(config file to skip top level operations)")
flag.BoolVar(&disableRules, "rules.disable", false, "(disable rule evaluation)") flag.BoolVar(&disableRules, "rules.disable", false, "(disable rule evaluation)")
@@ -87,6 +89,7 @@ func main() {
FluxInterval: fluxInterval, FluxInterval: fluxInterval,
Cluster: cluster, Cluster: cluster,
UseLogsNewSchema: useLogsNewSchema, UseLogsNewSchema: useLogsNewSchema,
UseTraceNewSchema: useTraceNewSchema,
} }
// Read the jwt secret key // Read the jwt secret key

View File

@@ -269,6 +269,23 @@ type SearchSpanResponseItem struct {
SpanKind string `json:"spanKind"` SpanKind string `json:"spanKind"`
} }
type SearchSpanResponseItemV2 struct {
TimeUnixNano time.Time `json:"timestamp" ch:"timestamp"`
DurationNano uint64 `json:"durationNano" ch:"durationNano"`
SpanID string `json:"spanId" ch:"spanID"`
TraceID string `json:"traceId" ch:"traceID"`
HasError bool `json:"hasError" ch:"hasError"`
Kind int8 `json:"kind" ch:"kind"`
ServiceName string `json:"serviceName" ch:"serviceName"`
Name string `json:"name" ch:"name"`
References string `json:"references,omitempty" ch:"references"`
TagMap map[string]string `json:"tagMap" ch:"attributes_string"`
Events []string `json:"event" ch:"events"`
StatusMessage string `json:"statusMessage" ch:"statusMessage"`
StatusCodeString string `json:"statusCodeString" ch:"statusCodeString"`
SpanKind string `json:"spanKind" ch:"spanKind"`
}
type OtelSpanRef struct { type OtelSpanRef struct {
TraceId string `json:"traceId,omitempty"` TraceId string `json:"traceId,omitempty"`
SpanId string `json:"spanId,omitempty"` SpanId string `json:"spanId,omitempty"`

View File

@@ -39,6 +39,7 @@ type PrepareTaskOptions struct {
NotifyFunc NotifyFunc NotifyFunc NotifyFunc
UseLogsNewSchema bool UseLogsNewSchema bool
UseTraceNewSchema bool
} }
const taskNamesuffix = "webAppEditor" const taskNamesuffix = "webAppEditor"
@@ -82,6 +83,7 @@ type ManagerOptions struct {
PrepareTaskFunc func(opts PrepareTaskOptions) (Task, error) PrepareTaskFunc func(opts PrepareTaskOptions) (Task, error)
UseLogsNewSchema bool UseLogsNewSchema bool
UseTraceNewSchema bool
} }
// The Manager manages recording and alerting rules. // The Manager manages recording and alerting rules.
@@ -105,6 +107,7 @@ type Manager struct {
prepareTaskFunc func(opts PrepareTaskOptions) (Task, error) prepareTaskFunc func(opts PrepareTaskOptions) (Task, error)
UseLogsNewSchema bool UseLogsNewSchema bool
UseTraceNewSchema bool
} }
func defaultOptions(o *ManagerOptions) *ManagerOptions { func defaultOptions(o *ManagerOptions) *ManagerOptions {
@@ -140,6 +143,7 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
opts.FF, opts.FF,
opts.Reader, opts.Reader,
opts.UseLogsNewSchema, opts.UseLogsNewSchema,
opts.UseTraceNewSchema,
WithEvalDelay(opts.ManagerOpts.EvalDelay), WithEvalDelay(opts.ManagerOpts.EvalDelay),
) )
@@ -352,6 +356,7 @@ func (m *Manager) editTask(rule *PostableRule, taskName string) error {
NotifyFunc: m.prepareNotifyFunc(), NotifyFunc: m.prepareNotifyFunc(),
UseLogsNewSchema: m.opts.UseLogsNewSchema, UseLogsNewSchema: m.opts.UseLogsNewSchema,
UseTraceNewSchema: m.opts.UseTraceNewSchema,
}) })
if err != nil { if err != nil {
@@ -474,6 +479,7 @@ func (m *Manager) addTask(rule *PostableRule, taskName string) error {
NotifyFunc: m.prepareNotifyFunc(), NotifyFunc: m.prepareNotifyFunc(),
UseLogsNewSchema: m.opts.UseLogsNewSchema, UseLogsNewSchema: m.opts.UseLogsNewSchema,
UseTraceNewSchema: m.opts.UseTraceNewSchema,
}) })
if err != nil { if err != nil {
@@ -817,6 +823,7 @@ func (m *Manager) TestNotification(ctx context.Context, ruleStr string) (int, *m
m.featureFlags, m.featureFlags,
m.reader, m.reader,
m.opts.UseLogsNewSchema, m.opts.UseLogsNewSchema,
m.opts.UseTraceNewSchema,
WithSendAlways(), WithSendAlways(),
WithSendUnmatched(), WithSendUnmatched(),
) )

View File

@@ -58,6 +58,7 @@ func NewThresholdRule(
featureFlags interfaces.FeatureLookup, featureFlags interfaces.FeatureLookup,
reader interfaces.Reader, reader interfaces.Reader,
useLogsNewSchema bool, useLogsNewSchema bool,
useTraceNewSchema bool,
opts ...RuleOption, opts ...RuleOption,
) (*ThresholdRule, error) { ) (*ThresholdRule, error) {
@@ -79,6 +80,7 @@ func NewThresholdRule(
KeyGenerator: queryBuilder.NewKeyGenerator(), KeyGenerator: queryBuilder.NewKeyGenerator(),
FeatureLookup: featureFlags, FeatureLookup: featureFlags,
UseLogsNewSchema: useLogsNewSchema, UseLogsNewSchema: useLogsNewSchema,
UseTraceNewSchema: useTraceNewSchema,
} }
querierOptsV2 := querierV2.QuerierOptions{ querierOptsV2 := querierV2.QuerierOptions{
@@ -87,6 +89,7 @@ func NewThresholdRule(
KeyGenerator: queryBuilder.NewKeyGenerator(), KeyGenerator: queryBuilder.NewKeyGenerator(),
FeatureLookup: featureFlags, FeatureLookup: featureFlags,
UseLogsNewSchema: useLogsNewSchema, UseLogsNewSchema: useLogsNewSchema,
UseTraceNewSchema: useTraceNewSchema,
} }
t.querier = querier.NewQuerier(querierOption) t.querier = querier.NewQuerier(querierOption)

View File

@@ -791,7 +791,7 @@ func TestThresholdRuleShouldAlert(t *testing.T) {
postableRule.RuleCondition.MatchType = MatchType(c.matchType) postableRule.RuleCondition.MatchType = MatchType(c.matchType)
postableRule.RuleCondition.Target = &c.target postableRule.RuleCondition.Target = &c.target
rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, WithEvalDelay(2*time.Minute)) rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, true, WithEvalDelay(2*time.Minute))
if err != nil { if err != nil {
assert.NoError(t, err) assert.NoError(t, err)
} }
@@ -880,7 +880,7 @@ func TestPrepareLinksToLogs(t *testing.T) {
} }
fm := featureManager.StartManager() fm := featureManager.StartManager()
rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, WithEvalDelay(2*time.Minute)) rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, true, WithEvalDelay(2*time.Minute))
if err != nil { if err != nil {
assert.NoError(t, err) assert.NoError(t, err)
} }
@@ -922,7 +922,7 @@ func TestPrepareLinksToTraces(t *testing.T) {
} }
fm := featureManager.StartManager() fm := featureManager.StartManager()
rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, WithEvalDelay(2*time.Minute)) rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, true, WithEvalDelay(2*time.Minute))
if err != nil { if err != nil {
assert.NoError(t, err) assert.NoError(t, err)
} }
@@ -998,7 +998,7 @@ func TestThresholdRuleLabelNormalization(t *testing.T) {
postableRule.RuleCondition.MatchType = MatchType(c.matchType) postableRule.RuleCondition.MatchType = MatchType(c.matchType)
postableRule.RuleCondition.Target = &c.target postableRule.RuleCondition.Target = &c.target
rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, WithEvalDelay(2*time.Minute)) rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, true, WithEvalDelay(2*time.Minute))
if err != nil { if err != nil {
assert.NoError(t, err) assert.NoError(t, err)
} }
@@ -1051,7 +1051,7 @@ func TestThresholdRuleEvalDelay(t *testing.T) {
fm := featureManager.StartManager() fm := featureManager.StartManager()
for idx, c := range cases { for idx, c := range cases {
rule, err := NewThresholdRule("69", &postableRule, fm, nil, true) // no eval delay rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, true) // no eval delay
if err != nil { if err != nil {
assert.NoError(t, err) assert.NoError(t, err)
} }
@@ -1100,7 +1100,7 @@ func TestThresholdRuleClickHouseTmpl(t *testing.T) {
fm := featureManager.StartManager() fm := featureManager.StartManager()
for idx, c := range cases { for idx, c := range cases {
rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, WithEvalDelay(2*time.Minute)) rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, true, WithEvalDelay(2*time.Minute))
if err != nil { if err != nil {
assert.NoError(t, err) assert.NoError(t, err)
} }
@@ -1241,9 +1241,9 @@ func TestThresholdRuleUnitCombinations(t *testing.T) {
} }
options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace") options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace")
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true) reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true)
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true) rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
rule.TemporalityMap = map[string]map[v3.Temporality]bool{ rule.TemporalityMap = map[string]map[v3.Temporality]bool{
"signoz_calls_total": { "signoz_calls_total": {
v3.Delta: true, v3.Delta: true,
@@ -1340,9 +1340,9 @@ func TestThresholdRuleNoData(t *testing.T) {
} }
options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace") options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace")
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true) reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true)
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true) rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
rule.TemporalityMap = map[string]map[v3.Temporality]bool{ rule.TemporalityMap = map[string]map[v3.Temporality]bool{
"signoz_calls_total": { "signoz_calls_total": {
v3.Delta: true, v3.Delta: true,
@@ -1445,9 +1445,9 @@ func TestThresholdRuleTracesLink(t *testing.T) {
} }
options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace") options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace")
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true) reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true)
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true) rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
rule.TemporalityMap = map[string]map[v3.Temporality]bool{ rule.TemporalityMap = map[string]map[v3.Temporality]bool{
"signoz_calls_total": { "signoz_calls_total": {
v3.Delta: true, v3.Delta: true,
@@ -1570,9 +1570,9 @@ func TestThresholdRuleLogsLink(t *testing.T) {
} }
options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace") options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace")
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true) reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true)
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true) rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
rule.TemporalityMap = map[string]map[v3.Temporality]bool{ rule.TemporalityMap = map[string]map[v3.Temporality]bool{
"signoz_calls_total": { "signoz_calls_total": {
v3.Delta: true, v3.Delta: true,

View File

@@ -46,6 +46,7 @@ func NewMockClickhouseReader(
featureFlags, featureFlags,
"", "",
true, true,
true,
) )
return reader, mockDB return reader, mockDB