Compare commits

...

9 Commits

Author SHA1 Message Date
nityanandagohain
1487820750 fix: use correct prepQUery 2024-10-23 18:06:25 +05:30
nityanandagohain
fd09f57f76 fix: tests 2024-10-23 17:44:18 +05:30
nityanandagohain
9bc7c8708a fix: add servicename resource filter 2024-10-23 10:49:35 +05:30
nityanandagohain
2115093876 fix: get trace by id api updated 2024-10-22 22:50:57 +05:30
nityanandagohain
33f4d8306d Merge remote-tracking branch 'origin/develop' into feat/trace-v3-poc 2024-10-22 17:14:03 +05:30
nityanandagohain
dbf5f8b77a fix: integrate with querier 2024-10-22 14:06:55 +05:30
nityanandagohain
bfc46790bb Merge remote-tracking branch 'origin/develop' into feat/trace-v3-poc 2024-10-22 10:21:03 +05:30
nityanandagohain
7a011f3460 fix: add remaining files 2024-10-22 10:20:10 +05:30
nityanandagohain
2c30e1493f feat: trace v4 inital commit 2024-10-21 18:20:48 +05:30
27 changed files with 1680 additions and 136 deletions

View File

@@ -38,8 +38,9 @@ type APIHandlerOptions struct {
Cache cache.Cache
Gateway *httputil.ReverseProxy
// Querier Influx Interval
FluxInterval time.Duration
UseLogsNewSchema bool
FluxInterval time.Duration
UseLogsNewSchema bool
UseTraceNewSchema bool
}
type APIHandler struct {
@@ -65,6 +66,7 @@ func NewAPIHandler(opts APIHandlerOptions) (*APIHandler, error) {
Cache: opts.Cache,
FluxInterval: opts.FluxInterval,
UseLogsNewSchema: opts.UseLogsNewSchema,
UseTraceNewSchema: opts.UseTraceNewSchema,
})
if err != nil {

View File

@@ -2,32 +2,31 @@ package api
import (
"net/http"
"go.signoz.io/signoz/ee/query-service/app/db"
"go.signoz.io/signoz/ee/query-service/model"
baseapp "go.signoz.io/signoz/pkg/query-service/app"
basemodel "go.signoz.io/signoz/pkg/query-service/model"
"go.uber.org/zap"
)
func (ah *APIHandler) searchTraces(w http.ResponseWriter, r *http.Request) {
if !ah.CheckFeature(basemodel.SmartTraceDetail) {
zap.L().Info("SmartTraceDetail feature is not enabled in this plan")
ah.APIHandler.SearchTraces(w, r)
return
}
searchTracesParams, err := baseapp.ParseSearchTracesParams(r)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, "Error reading params")
return
}
ah.APIHandler.SearchTraces(w, r)
return
result, err := ah.opts.DataConnector.SearchTraces(r.Context(), searchTracesParams, db.SmartTraceAlgorithm)
if ah.HandleError(w, err, http.StatusBadRequest) {
return
}
// This is commented since this will be taken care by new trace API
ah.WriteJSON(w, r, result)
// if !ah.CheckFeature(basemodel.SmartTraceDetail) {
// zap.L().Info("SmartTraceDetail feature is not enabled in this plan")
// ah.APIHandler.SearchTraces(w, r)
// return
// }
// searchTracesParams, err := baseapp.ParseSearchTracesParams(r)
// if err != nil {
// RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, "Error reading params")
// return
// }
// result, err := ah.opts.DataConnector.SearchTraces(r.Context(), searchTracesParams, db.SmartTraceAlgorithm)
// if ah.HandleError(w, err, http.StatusBadRequest) {
// return
// }
// ah.WriteJSON(w, r, result)
}

View File

@@ -26,8 +26,9 @@ func NewDataConnector(
dialTimeout time.Duration,
cluster string,
useLogsNewSchema bool,
useTraceNewSchema bool,
) *ClickhouseReader {
ch := basechr.NewReader(localDB, promConfigPath, lm, maxIdleConns, maxOpenConns, dialTimeout, cluster, useLogsNewSchema)
ch := basechr.NewReader(localDB, promConfigPath, lm, maxIdleConns, maxOpenConns, dialTimeout, cluster, useLogsNewSchema, useTraceNewSchema)
return &ClickhouseReader{
conn: ch.GetConn(),
appdb: localDB,

View File

@@ -78,6 +78,7 @@ type ServerOptions struct {
Cluster string
GatewayUrl string
UseLogsNewSchema bool
UseTraceNewSchema bool
}
// Server runs HTTP api service
@@ -156,6 +157,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
serverOptions.DialTimeout,
serverOptions.Cluster,
serverOptions.UseLogsNewSchema,
serverOptions.UseTraceNewSchema,
)
go qb.Start(readerReady)
reader = qb
@@ -189,6 +191,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
serverOptions.DisableRules,
lm,
serverOptions.UseLogsNewSchema,
serverOptions.UseTraceNewSchema,
)
if err != nil {
@@ -270,6 +273,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
FluxInterval: fluxInterval,
Gateway: gatewayProxy,
UseLogsNewSchema: serverOptions.UseLogsNewSchema,
UseTraceNewSchema: serverOptions.UseTraceNewSchema,
}
apiHandler, err := api.NewAPIHandler(apiOpts)
@@ -736,7 +740,8 @@ func makeRulesManager(
cache cache.Cache,
disableRules bool,
fm baseint.FeatureLookup,
useLogsNewSchema bool) (*baserules.Manager, error) {
useLogsNewSchema bool,
useTraceNewSchema bool) (*baserules.Manager, error) {
// create engine
pqle, err := pqle.FromConfigPath(promConfigPath)
@@ -765,8 +770,9 @@ func makeRulesManager(
Cache: cache,
EvalDelay: baseconst.GetEvalDelay(),
PrepareTaskFunc: rules.PrepareTaskFunc,
UseLogsNewSchema: useLogsNewSchema,
PrepareTaskFunc: rules.PrepareTaskFunc,
UseLogsNewSchema: useLogsNewSchema,
UseTraceNewSchema: useTraceNewSchema,
}
// create Manager

View File

@@ -94,6 +94,7 @@ func main() {
var cluster string
var useLogsNewSchema bool
var useTraceNewSchema bool
var cacheConfigPath, fluxInterval string
var enableQueryServiceLogOTLPExport bool
var preferSpanMetrics bool
@@ -104,6 +105,7 @@ func main() {
var gatewayUrl string
flag.BoolVar(&useLogsNewSchema, "use-logs-new-schema", false, "use logs_v2 schema for logs")
flag.BoolVar(&useTraceNewSchema, "use-trace-new-schema", false, "use new schema for traces")
flag.StringVar(&promConfigPath, "config", "./config/prometheus.yml", "(prometheus config to read metrics)")
flag.StringVar(&skipTopLvlOpsPath, "skip-top-level-ops", "", "(config file to skip top level operations)")
flag.BoolVar(&disableRules, "rules.disable", false, "(disable rule evaluation)")
@@ -143,6 +145,7 @@ func main() {
Cluster: cluster,
GatewayUrl: gatewayUrl,
UseLogsNewSchema: useLogsNewSchema,
UseTraceNewSchema: useTraceNewSchema,
}
// Read the jwt secret key

View File

@@ -21,6 +21,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
opts.FF,
opts.Reader,
opts.UseLogsNewSchema,
opts.UseTraceNewSchema,
baserules.WithEvalDelay(opts.ManagerOpts.EvalDelay),
)

View File

@@ -45,6 +45,9 @@ const (
defaultLogsTableV2 string = "distributed_logs_v2"
defaultLogsResourceLocalTableV2 string = "logs_v2_resource"
defaultLogsResourceTableV2 string = "distributed_logs_v2_resource"
defaultTraceIndexTableV3 string = "distributed_signoz_index_v3"
defaultTraceResourceTableV3 string = "distributed_traces_v3_resource"
)
// NamespaceConfig is Clickhouse's internal configuration data
@@ -82,6 +85,9 @@ type namespaceConfig struct {
LogsTableV2 string
LogsResourceLocalTableV2 string
LogsResourceTableV2 string
TraceIndexTableV3 string
TraceResourceTableV3 string
}
// Connecto defines how to connect to the database
@@ -174,6 +180,9 @@ func NewOptions(
LogsLocalTableV2: defaultLogsLocalTableV2,
LogsResourceTableV2: defaultLogsResourceTableV2,
LogsResourceLocalTableV2: defaultLogsResourceLocalTableV2,
TraceIndexTableV3: defaultTraceIndexTableV3,
TraceResourceTableV3: defaultTraceResourceTableV3,
},
others: make(map[string]*namespaceConfig, len(otherNamespaces)),
}

View File

@@ -146,8 +146,12 @@ type ClickHouseReader struct {
cluster string
useLogsNewSchema bool
useTraceNewSchema bool
logsTableName string
logsLocalTableName string
traceIndexTableV3 string
traceResourceTableV3 string
}
// NewTraceReader returns a TraceReader for the database
@@ -160,6 +164,7 @@ func NewReader(
dialTimeout time.Duration,
cluster string,
useLogsNewSchema bool,
useTraceNewSchema bool,
) *ClickHouseReader {
datasource := os.Getenv("ClickHouseUrl")
@@ -170,7 +175,7 @@ func NewReader(
zap.L().Fatal("failed to initialize ClickHouse", zap.Error(err))
}
return NewReaderFromClickhouseConnection(db, options, localDB, configFile, featureFlag, cluster, useLogsNewSchema)
return NewReaderFromClickhouseConnection(db, options, localDB, configFile, featureFlag, cluster, useLogsNewSchema, useTraceNewSchema)
}
func NewReaderFromClickhouseConnection(
@@ -181,6 +186,7 @@ func NewReaderFromClickhouseConnection(
featureFlag interfaces.FeatureLookup,
cluster string,
useLogsNewSchema bool,
useTraceNewSchema bool,
) *ClickHouseReader {
alertManager, err := am.New()
if err != nil {
@@ -245,7 +251,8 @@ func NewReaderFromClickhouseConnection(
cluster: cluster,
queryProgressTracker: queryprogress.NewQueryProgressTracker(),
useLogsNewSchema: useLogsNewSchema,
useLogsNewSchema: useLogsNewSchema,
useTraceNewSchema: useTraceNewSchema,
logsTableV2: options.primary.LogsTableV2,
logsLocalTableV2: options.primary.LogsLocalTableV2,
@@ -253,6 +260,9 @@ func NewReaderFromClickhouseConnection(
logsResourceLocalTableV2: options.primary.LogsResourceLocalTableV2,
logsTableName: logsTableName,
logsLocalTableName: logsLocalTableName,
traceIndexTableV3: options.primary.TraceIndexTableV3,
traceResourceTableV3: options.primary.TraceResourceTableV3,
}
}
@@ -1666,10 +1676,125 @@ func (r *ClickHouseReader) GetUsage(ctx context.Context, queryParams *model.GetU
return &usageItems, nil
}
func (r *ClickHouseReader) SearchTracesV2(ctx context.Context, params *model.SearchTracesParams) (*[]model.SearchSpansResult, error) {
var countSpans uint64
// TODO(nitya): check if we can use timestamp filter here
countQuery := fmt.Sprintf("SELECT count() as count from %s.%s WHERE traceID=$1", r.TraceDB, r.traceIndexTableV3)
err := r.db.QueryRow(ctx, countQuery, params.TraceID).Scan(&countSpans)
if err != nil {
zap.L().Error("Error in processing sql query", zap.Error(err))
return nil, fmt.Errorf("error in processing sql query")
}
if countSpans > uint64(params.MaxSpansInTrace) {
zap.L().Error("Max spans allowed in a trace limit reached", zap.Int("MaxSpansInTrace", params.MaxSpansInTrace),
zap.Uint64("Count", countSpans))
userEmail, err := auth.GetEmailFromJwt(ctx)
if err == nil {
data := map[string]interface{}{
"traceSize": countSpans,
"maxSpansInTraceLimit": params.MaxSpansInTrace,
}
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_MAX_SPANS_ALLOWED_LIMIT_REACHED, data, userEmail, true, false)
}
return nil, fmt.Errorf("max spans allowed in trace limit reached, please contact support for more details")
}
userEmail, err := auth.GetEmailFromJwt(ctx)
if err == nil {
data := map[string]interface{}{
"traceSize": countSpans,
}
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_TRACE_DETAIL_API, data, userEmail, true, false)
}
var startTime, endTime, durationNano uint64
var searchScanResponses []model.SearchSpanResponseItemV2
query := fmt.Sprintf("SELECT timestamp, durationNano, spanID, traceID, hasError, kind, serviceName, name, references, attributes_string, events, statusMessage, statusCodeString, spanKind FROM %s.%s WHERE traceID=$1", r.TraceDB, r.traceIndexTableV3)
start := time.Now()
err = r.db.Select(ctx, &searchScanResponses, query, params.TraceID)
zap.L().Info(query)
if err != nil {
zap.L().Error("Error in processing sql query", zap.Error(err))
return nil, fmt.Errorf("error in processing sql query")
}
end := time.Now()
zap.L().Debug("getTraceSQLQuery took: ", zap.Duration("duration", end.Sub(start)))
searchSpansResult := []model.SearchSpansResult{
{
Columns: []string{"__time", "SpanId", "TraceId", "ServiceName", "Name", "Kind", "DurationNano", "TagsKeys", "TagsValues", "References", "Events", "HasError", "StatusMessage", "StatusCodeString", "SpanKind"},
Events: make([][]interface{}, len(searchScanResponses)),
IsSubTree: false,
},
}
searchSpanResponses := []model.SearchSpanResponseItem{}
start = time.Now()
for _, item := range searchScanResponses {
ref := []model.OtelSpanRef{}
err := json.Unmarshal([]byte(item.References), &ref)
if err != nil {
zap.L().Error("Error unmarshalling references", zap.Error(err))
return nil, err
}
// TODO(nitya): add attributes_number and attributes_bool and resources_string to this
jsonItem := model.SearchSpanResponseItem{
SpanID: item.SpanID,
TraceID: item.TraceID,
ServiceName: item.ServiceName,
Name: item.Name,
Kind: int32(item.Kind),
DurationNano: int64(item.DurationNano),
HasError: item.HasError,
StatusMessage: item.StatusMessage,
StatusCodeString: item.StatusCodeString,
SpanKind: item.SpanKind,
References: ref,
Events: item.Events,
TagMap: item.TagMap,
}
jsonItem.TimeUnixNano = uint64(item.TimeUnixNano.UnixNano() / 1000000)
searchSpanResponses = append(searchSpanResponses, jsonItem)
if startTime == 0 || jsonItem.TimeUnixNano < startTime {
startTime = jsonItem.TimeUnixNano
}
if endTime == 0 || jsonItem.TimeUnixNano > endTime {
endTime = jsonItem.TimeUnixNano
}
if durationNano == 0 || uint64(jsonItem.DurationNano) > durationNano {
durationNano = uint64(jsonItem.DurationNano)
}
}
end = time.Now()
zap.L().Debug("getTraceSQLQuery unmarshal took: ", zap.Duration("duration", end.Sub(start)))
for i, item := range searchSpanResponses {
spanEvents := item.GetValues()
searchSpansResult[0].Events[i] = spanEvents
}
searchSpansResult[0].StartTimestampMillis = startTime - (durationNano / 1000000)
searchSpansResult[0].EndTimestampMillis = endTime + (durationNano / 1000000)
return &searchSpansResult, nil
}
func (r *ClickHouseReader) SearchTraces(ctx context.Context, params *model.SearchTracesParams,
smartTraceAlgorithm func(payload []model.SearchSpanResponseItem, targetSpanId string,
levelUp int, levelDown int, spanLimit int) ([]model.SearchSpansResult, error)) (*[]model.SearchSpansResult, error) {
if r.useTraceNewSchema {
return r.SearchTracesV2(ctx, params)
}
var countSpans uint64
countQuery := fmt.Sprintf("SELECT count() as count from %s.%s WHERE traceID=$1", r.TraceDB, r.SpansTable)
err := r.db.QueryRow(ctx, countQuery, params.TraceID).Scan(&countSpans)
@@ -1746,6 +1871,7 @@ func (r *ClickHouseReader) SearchTraces(ctx context.Context, params *model.Searc
err = r.featureFlags.CheckFeature(model.SmartTraceDetail)
smartAlgoEnabled := err == nil
// TODO(nitya): this will never run remove it
if len(searchScanResponses) > params.SpansRenderLimit && smartAlgoEnabled {
start = time.Now()
searchSpansResult, err = smartTraceAlgorithm(searchSpanResponses, params.SpanID, params.LevelUp, params.LevelDown, params.SpansRenderLimit)

View File

@@ -39,6 +39,7 @@ import (
querierV2 "go.signoz.io/signoz/pkg/query-service/app/querier/v2"
"go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
tracesV4 "go.signoz.io/signoz/pkg/query-service/app/traces/v4"
"go.signoz.io/signoz/pkg/query-service/auth"
"go.signoz.io/signoz/pkg/query-service/cache"
"go.signoz.io/signoz/pkg/query-service/common"
@@ -110,7 +111,8 @@ type APIHandler struct {
// Websocket connection upgrader
Upgrader *websocket.Upgrader
UseLogsNewSchema bool
UseLogsNewSchema bool
UseTraceNewSchema bool
hostsRepo *inframetrics.HostsRepo
processesRepo *inframetrics.ProcessesRepo
@@ -152,6 +154,8 @@ type APIHandlerOpts struct {
// Use Logs New schema
UseLogsNewSchema bool
UseTraceNewSchema bool
}
// NewAPIHandler returns an APIHandler
@@ -163,21 +167,23 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
}
querierOpts := querier.QuerierOptions{
Reader: opts.Reader,
Cache: opts.Cache,
KeyGenerator: queryBuilder.NewKeyGenerator(),
FluxInterval: opts.FluxInterval,
FeatureLookup: opts.FeatureFlags,
UseLogsNewSchema: opts.UseLogsNewSchema,
Reader: opts.Reader,
Cache: opts.Cache,
KeyGenerator: queryBuilder.NewKeyGenerator(),
FluxInterval: opts.FluxInterval,
FeatureLookup: opts.FeatureFlags,
UseLogsNewSchema: opts.UseLogsNewSchema,
UseTraceNewSchema: opts.UseTraceNewSchema,
}
querierOptsV2 := querierV2.QuerierOptions{
Reader: opts.Reader,
Cache: opts.Cache,
KeyGenerator: queryBuilder.NewKeyGenerator(),
FluxInterval: opts.FluxInterval,
FeatureLookup: opts.FeatureFlags,
UseLogsNewSchema: opts.UseLogsNewSchema,
Reader: opts.Reader,
Cache: opts.Cache,
KeyGenerator: queryBuilder.NewKeyGenerator(),
FluxInterval: opts.FluxInterval,
FeatureLookup: opts.FeatureFlags,
UseLogsNewSchema: opts.UseLogsNewSchema,
UseTraceNewSchema: opts.UseTraceNewSchema,
}
querier := querier.NewQuerier(querierOpts)
@@ -203,6 +209,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
querier: querier,
querierV2: querierv2,
UseLogsNewSchema: opts.UseLogsNewSchema,
UseTraceNewSchema: opts.UseTraceNewSchema,
hostsRepo: hostsRepo,
processesRepo: processesRepo,
}
@@ -212,9 +219,14 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
logsQueryBuilder = logsv4.PrepareLogsQuery
}
tracesQueryBuilder := tracesV3.PrepareTracesQuery
if opts.UseTraceNewSchema {
tracesQueryBuilder = tracesV4.PrepareTracesQuery
}
builderOpts := queryBuilder.QueryBuilderOptions{
BuildMetricQuery: metricsv3.PrepareMetricQuery,
BuildTraceQuery: tracesV3.PrepareTracesQuery,
BuildTraceQuery: tracesQueryBuilder,
BuildLogQuery: logsQueryBuilder,
}
aH.queryBuilder = queryBuilder.NewQueryBuilder(builderOpts, aH.featureFlags)
@@ -4360,7 +4372,12 @@ func (aH *APIHandler) queryRangeV3(ctx context.Context, queryRangeParams *v3.Que
RespondError(w, apiErrObj, errQuriesByName)
return
}
tracesV3.Enrich(queryRangeParams, spanKeys)
if aH.UseTraceNewSchema {
tracesV4.Enrich(queryRangeParams, spanKeys)
} else {
tracesV3.Enrich(queryRangeParams, spanKeys)
}
}
// WARN: Only works for AND operator in traces query

View File

@@ -10,6 +10,7 @@ import (
logsV4 "go.signoz.io/signoz/pkg/query-service/app/logs/v4"
metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3"
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
tracesV4 "go.signoz.io/signoz/pkg/query-service/app/traces/v4"
"go.signoz.io/signoz/pkg/query-service/common"
"go.signoz.io/signoz/pkg/query-service/constants"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
@@ -158,11 +159,16 @@ func (q *querier) runBuilderQuery(
if builderQuery.DataSource == v3.DataSourceTraces {
tracesQueryBuilder := tracesV3.PrepareTracesQuery
if q.UseTraceNewSchema {
tracesQueryBuilder = tracesV4.PrepareTracesQuery
}
var query string
var err error
// for ts query with group by and limit form two queries
if params.CompositeQuery.PanelType == v3.PanelTypeGraph && builderQuery.Limit > 0 && len(builderQuery.GroupBy) > 0 {
limitQuery, err := tracesV3.PrepareTracesQuery(
limitQuery, err := tracesQueryBuilder(
start,
end,
params.CompositeQuery.PanelType,
@@ -173,7 +179,7 @@ func (q *querier) runBuilderQuery(
ch <- channelResult{Err: err, Name: queryName, Query: limitQuery, Series: nil}
return
}
placeholderQuery, err := tracesV3.PrepareTracesQuery(
placeholderQuery, err := tracesQueryBuilder(
start,
end,
params.CompositeQuery.PanelType,
@@ -186,7 +192,7 @@ func (q *querier) runBuilderQuery(
}
query = fmt.Sprintf(placeholderQuery, limitQuery)
} else {
query, err = tracesV3.PrepareTracesQuery(
query, err = tracesQueryBuilder(
start,
end,
params.CompositeQuery.PanelType,

View File

@@ -11,6 +11,8 @@ import (
metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3"
"go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
tracesV4 "go.signoz.io/signoz/pkg/query-service/app/traces/v4"
"go.signoz.io/signoz/pkg/query-service/common"
chErrors "go.signoz.io/signoz/pkg/query-service/errors"
"go.signoz.io/signoz/pkg/query-service/querycache"
@@ -52,7 +54,8 @@ type querier struct {
returnedSeries []*v3.Series
returnedErr error
UseLogsNewSchema bool
UseLogsNewSchema bool
UseTraceNewSchema bool
}
type QuerierOptions struct {
@@ -63,10 +66,11 @@ type QuerierOptions struct {
FeatureLookup interfaces.FeatureLookup
// used for testing
TestingMode bool
ReturnedSeries []*v3.Series
ReturnedErr error
UseLogsNewSchema bool
TestingMode bool
ReturnedSeries []*v3.Series
ReturnedErr error
UseLogsNewSchema bool
UseTraceNewSchema bool
}
func NewQuerier(opts QuerierOptions) interfaces.Querier {
@@ -74,6 +78,10 @@ func NewQuerier(opts QuerierOptions) interfaces.Querier {
if opts.UseLogsNewSchema {
logsQueryBuilder = logsV4.PrepareLogsQuery
}
tracesQueryBuilder := tracesV3.PrepareTracesQuery
if opts.UseTraceNewSchema {
tracesQueryBuilder = tracesV4.PrepareTracesQuery
}
qc := querycache.NewQueryCache(querycache.WithCache(opts.Cache), querycache.WithFluxInterval(opts.FluxInterval))
@@ -85,16 +93,17 @@ func NewQuerier(opts QuerierOptions) interfaces.Querier {
fluxInterval: opts.FluxInterval,
builder: queryBuilder.NewQueryBuilder(queryBuilder.QueryBuilderOptions{
BuildTraceQuery: tracesV3.PrepareTracesQuery,
BuildTraceQuery: tracesQueryBuilder,
BuildLogQuery: logsQueryBuilder,
BuildMetricQuery: metricsV3.PrepareMetricQuery,
}, opts.FeatureLookup),
featureLookUp: opts.FeatureLookup,
testingMode: opts.TestingMode,
returnedSeries: opts.ReturnedSeries,
returnedErr: opts.ReturnedErr,
UseLogsNewSchema: opts.UseLogsNewSchema,
testingMode: opts.TestingMode,
returnedSeries: opts.ReturnedSeries,
returnedErr: opts.ReturnedErr,
UseLogsNewSchema: opts.UseLogsNewSchema,
UseTraceNewSchema: opts.UseTraceNewSchema,
}
}

View File

@@ -11,6 +11,7 @@ import (
metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3"
metricsV4 "go.signoz.io/signoz/pkg/query-service/app/metrics/v4"
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
tracesV4 "go.signoz.io/signoz/pkg/query-service/app/traces/v4"
"go.signoz.io/signoz/pkg/query-service/common"
"go.signoz.io/signoz/pkg/query-service/constants"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
@@ -158,11 +159,16 @@ func (q *querier) runBuilderQuery(
if builderQuery.DataSource == v3.DataSourceTraces {
tracesQueryBuilder := tracesV3.PrepareTracesQuery
if q.UseTraceNewSchema {
tracesQueryBuilder = tracesV4.PrepareTracesQuery
}
var query string
var err error
// for ts query with group by and limit form two queries
if params.CompositeQuery.PanelType == v3.PanelTypeGraph && builderQuery.Limit > 0 && len(builderQuery.GroupBy) > 0 {
limitQuery, err := tracesV3.PrepareTracesQuery(
limitQuery, err := tracesQueryBuilder(
start,
end,
params.CompositeQuery.PanelType,
@@ -173,7 +179,7 @@ func (q *querier) runBuilderQuery(
ch <- channelResult{Err: err, Name: queryName, Query: limitQuery, Series: nil}
return
}
placeholderQuery, err := tracesV3.PrepareTracesQuery(
placeholderQuery, err := tracesQueryBuilder(
start,
end,
params.CompositeQuery.PanelType,
@@ -186,7 +192,7 @@ func (q *querier) runBuilderQuery(
}
query = fmt.Sprintf(placeholderQuery, limitQuery)
} else {
query, err = tracesV3.PrepareTracesQuery(
query, err = tracesQueryBuilder(
start,
end,
params.CompositeQuery.PanelType,

View File

@@ -11,6 +11,7 @@ import (
metricsV4 "go.signoz.io/signoz/pkg/query-service/app/metrics/v4"
"go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
tracesV4 "go.signoz.io/signoz/pkg/query-service/app/traces/v4"
"go.signoz.io/signoz/pkg/query-service/common"
chErrors "go.signoz.io/signoz/pkg/query-service/errors"
"go.signoz.io/signoz/pkg/query-service/querycache"
@@ -48,10 +49,11 @@ type querier struct {
testingMode bool
queriesExecuted []string
// tuple of start and end time in milliseconds
timeRanges [][]int
returnedSeries []*v3.Series
returnedErr error
UseLogsNewSchema bool
timeRanges [][]int
returnedSeries []*v3.Series
returnedErr error
UseLogsNewSchema bool
UseTraceNewSchema bool
}
type QuerierOptions struct {
@@ -62,10 +64,11 @@ type QuerierOptions struct {
FeatureLookup interfaces.FeatureLookup
// used for testing
TestingMode bool
ReturnedSeries []*v3.Series
ReturnedErr error
UseLogsNewSchema bool
TestingMode bool
ReturnedSeries []*v3.Series
ReturnedErr error
UseLogsNewSchema bool
UseTraceNewSchema bool
}
func NewQuerier(opts QuerierOptions) interfaces.Querier {
@@ -74,6 +77,11 @@ func NewQuerier(opts QuerierOptions) interfaces.Querier {
logsQueryBuilder = logsV4.PrepareLogsQuery
}
tracesQueryBuilder := tracesV3.PrepareTracesQuery
if opts.UseTraceNewSchema {
tracesQueryBuilder = tracesV4.PrepareTracesQuery
}
qc := querycache.NewQueryCache(querycache.WithCache(opts.Cache), querycache.WithFluxInterval(opts.FluxInterval))
return &querier{
@@ -84,7 +92,7 @@ func NewQuerier(opts QuerierOptions) interfaces.Querier {
fluxInterval: opts.FluxInterval,
builder: queryBuilder.NewQueryBuilder(queryBuilder.QueryBuilderOptions{
BuildTraceQuery: tracesV3.PrepareTracesQuery,
BuildTraceQuery: tracesQueryBuilder,
BuildLogQuery: logsQueryBuilder,
BuildMetricQuery: metricsV4.PrepareMetricQuery,
}, opts.FeatureLookup),

View File

@@ -67,6 +67,7 @@ type ServerOptions struct {
FluxInterval string
Cluster string
UseLogsNewSchema bool
UseTraceNewSchema bool
}
// Server runs HTTP, Mux and a grpc server
@@ -130,6 +131,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
serverOptions.DialTimeout,
serverOptions.Cluster,
serverOptions.UseLogsNewSchema,
serverOptions.UseTraceNewSchema,
)
go clickhouseReader.Start(readerReady)
reader = clickhouseReader
@@ -157,7 +159,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
rm, err := makeRulesManager(
serverOptions.PromConfigPath,
constants.GetAlertManagerApiPrefix(),
serverOptions.RuleRepoURL, localDB, reader, c, serverOptions.DisableRules, fm, serverOptions.UseLogsNewSchema)
serverOptions.RuleRepoURL, localDB, reader, c, serverOptions.DisableRules, fm, serverOptions.UseLogsNewSchema, serverOptions.UseTraceNewSchema)
if err != nil {
return nil, err
}
@@ -202,6 +204,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
Cache: c,
FluxInterval: fluxInterval,
UseLogsNewSchema: serverOptions.UseLogsNewSchema,
UseTraceNewSchema: serverOptions.UseTraceNewSchema,
})
if err != nil {
return nil, err
@@ -721,7 +724,8 @@ func makeRulesManager(
cache cache.Cache,
disableRules bool,
fm interfaces.FeatureLookup,
useLogsNewSchema bool) (*rules.Manager, error) {
useLogsNewSchema bool,
useTraceNewSchema bool) (*rules.Manager, error) {
// create engine
pqle, err := pqle.FromReader(ch)
@@ -738,18 +742,19 @@ func makeRulesManager(
// create manager opts
managerOpts := &rules.ManagerOptions{
NotifierOpts: notifierOpts,
PqlEngine: pqle,
RepoURL: ruleRepoURL,
DBConn: db,
Context: context.Background(),
Logger: zap.L(),
DisableRules: disableRules,
FeatureFlags: fm,
Reader: ch,
Cache: cache,
EvalDelay: constants.GetEvalDelay(),
UseLogsNewSchema: useLogsNewSchema,
NotifierOpts: notifierOpts,
PqlEngine: pqle,
RepoURL: ruleRepoURL,
DBConn: db,
Context: context.Background(),
Logger: zap.L(),
DisableRules: disableRules,
FeatureFlags: fm,
Reader: ch,
Cache: cache,
EvalDelay: constants.GetEvalDelay(),
UseLogsNewSchema: useLogsNewSchema,
UseTraceNewSchema: useTraceNewSchema,
}
// create Manager

View File

@@ -10,7 +10,7 @@ import (
"go.signoz.io/signoz/pkg/query-service/utils"
)
var aggregateOperatorToPercentile = map[v3.AggregateOperator]float64{
var AggregateOperatorToPercentile = map[v3.AggregateOperator]float64{
v3.AggregateOperatorP05: 0.05,
v3.AggregateOperatorP10: 0.10,
v3.AggregateOperatorP20: 0.20,
@@ -22,7 +22,7 @@ var aggregateOperatorToPercentile = map[v3.AggregateOperator]float64{
v3.AggregateOperatorP99: 0.99,
}
var aggregateOperatorToSQLFunc = map[v3.AggregateOperator]string{
var AggregateOperatorToSQLFunc = map[v3.AggregateOperator]string{
v3.AggregateOperatorAvg: "avg",
v3.AggregateOperatorMax: "max",
v3.AggregateOperatorMin: "min",
@@ -109,7 +109,7 @@ func getSelectLabels(aggregatorOperator v3.AggregateOperator, groupBy []v3.Attri
return selectLabels
}
func getSelectKeys(aggregatorOperator v3.AggregateOperator, groupBy []v3.AttributeKey) string {
func GetSelectKeys(aggregatorOperator v3.AggregateOperator, groupBy []v3.AttributeKey) string {
var selectLabels []string
if aggregatorOperator == v3.AggregateOperatorNoOp {
return ""
@@ -173,7 +173,7 @@ func buildTracesFilterQuery(fs *v3.FilterSet) (string, error) {
conditions = append(conditions, fmt.Sprintf(operator, columnName, fmtVal))
case v3.FilterOperatorExists, v3.FilterOperatorNotExists:
if item.Key.IsColumn {
subQuery, err := existsSubQueryForFixedColumn(item.Key, item.Operator)
subQuery, err := ExistsSubQueryForFixedColumn(item.Key, item.Operator)
if err != nil {
return "", err
}
@@ -199,7 +199,7 @@ func buildTracesFilterQuery(fs *v3.FilterSet) (string, error) {
return queryString, nil
}
func existsSubQueryForFixedColumn(key v3.AttributeKey, op v3.FilterOperator) (string, error) {
func ExistsSubQueryForFixedColumn(key v3.AttributeKey, op v3.FilterOperator) (string, error) {
if key.DataType == v3.AttributeKeyDataTypeString {
if op == v3.FilterOperatorExists {
return fmt.Sprintf("%s %s ''", key.Key, tracesOperatorMappingV3[v3.FilterOperatorNotEqual]), nil
@@ -244,7 +244,7 @@ func buildTracesQuery(start, end, step int64, mq *v3.BuilderQuery, _ string, pan
selectLabels := getSelectLabels(mq.AggregateOperator, mq.GroupBy)
having := having(mq.Having)
having := Having(mq.Having)
if having != "" {
having = " having " + having
}
@@ -272,7 +272,7 @@ func buildTracesQuery(start, end, step int64, mq *v3.BuilderQuery, _ string, pan
// we don't need value for first query
if options.GraphLimitQtype == constants.FirstQueryGraphLimit {
queryTmpl = "SELECT " + getSelectKeys(mq.AggregateOperator, mq.GroupBy) + " from (" + queryTmpl + ")"
queryTmpl = "SELECT " + GetSelectKeys(mq.AggregateOperator, mq.GroupBy) + " from (" + queryTmpl + ")"
}
emptyValuesInGroupByFilter, err := handleEmptyValuesInGroupBy(mq.GroupBy)
@@ -281,7 +281,7 @@ func buildTracesQuery(start, end, step int64, mq *v3.BuilderQuery, _ string, pan
}
filterSubQuery += emptyValuesInGroupByFilter
groupBy := groupByAttributeKeyTags(panelType, options.GraphLimitQtype, mq.GroupBy...)
groupBy := GroupByAttributeKeyTags(panelType, options.GraphLimitQtype, mq.GroupBy...)
if groupBy != "" {
groupBy = " group by " + groupBy
}
@@ -291,7 +291,7 @@ func buildTracesQuery(start, end, step int64, mq *v3.BuilderQuery, _ string, pan
}
if options.GraphLimitQtype == constants.SecondQueryGraphLimit {
filterSubQuery = filterSubQuery + " AND " + fmt.Sprintf("(%s) GLOBAL IN (", getSelectKeys(mq.AggregateOperator, mq.GroupBy)) + "%s)"
filterSubQuery = filterSubQuery + " AND " + fmt.Sprintf("(%s) GLOBAL IN (", GetSelectKeys(mq.AggregateOperator, mq.GroupBy)) + "%s)"
}
aggregationKey := ""
@@ -311,7 +311,7 @@ func buildTracesQuery(start, end, step int64, mq *v3.BuilderQuery, _ string, pan
rate = rate / 60.0
}
op := fmt.Sprintf("%s(%s)/%f", aggregateOperatorToSQLFunc[mq.AggregateOperator], aggregationKey, rate)
op := fmt.Sprintf("%s(%s)/%f", AggregateOperatorToSQLFunc[mq.AggregateOperator], aggregationKey, rate)
query := fmt.Sprintf(queryTmpl, op, filterSubQuery, groupBy, having, orderBy)
return query, nil
case
@@ -324,17 +324,17 @@ func buildTracesQuery(start, end, step int64, mq *v3.BuilderQuery, _ string, pan
v3.AggregateOperatorP90,
v3.AggregateOperatorP95,
v3.AggregateOperatorP99:
op := fmt.Sprintf("quantile(%v)(%s)", aggregateOperatorToPercentile[mq.AggregateOperator], aggregationKey)
op := fmt.Sprintf("quantile(%v)(%s)", AggregateOperatorToPercentile[mq.AggregateOperator], aggregationKey)
query := fmt.Sprintf(queryTmpl, op, filterSubQuery, groupBy, having, orderBy)
return query, nil
case v3.AggregateOperatorAvg, v3.AggregateOperatorSum, v3.AggregateOperatorMin, v3.AggregateOperatorMax:
op := fmt.Sprintf("%s(%s)", aggregateOperatorToSQLFunc[mq.AggregateOperator], aggregationKey)
op := fmt.Sprintf("%s(%s)", AggregateOperatorToSQLFunc[mq.AggregateOperator], aggregationKey)
query := fmt.Sprintf(queryTmpl, op, filterSubQuery, groupBy, having, orderBy)
return query, nil
case v3.AggregateOperatorCount:
if mq.AggregateAttribute.Key != "" {
if mq.AggregateAttribute.IsColumn {
subQuery, err := existsSubQueryForFixedColumn(mq.AggregateAttribute, v3.FilterOperatorExists)
subQuery, err := ExistsSubQueryForFixedColumn(mq.AggregateAttribute, v3.FilterOperatorExists)
if err == nil {
filterSubQuery = fmt.Sprintf("%s AND %s", filterSubQuery, subQuery)
}
@@ -354,9 +354,9 @@ func buildTracesQuery(start, end, step int64, mq *v3.BuilderQuery, _ string, pan
var query string
if panelType == v3.PanelTypeTrace {
withSubQuery := fmt.Sprintf(constants.TracesExplorerViewSQLSelectWithSubQuery, constants.SIGNOZ_TRACE_DBNAME, constants.SIGNOZ_SPAN_INDEX_LOCAL_TABLENAME, spanIndexTableTimeFilter, filterSubQuery)
withSubQuery = addLimitToQuery(withSubQuery, mq.Limit)
withSubQuery = AddLimitToQuery(withSubQuery, mq.Limit)
if mq.Offset != 0 {
withSubQuery = addOffsetToQuery(withSubQuery, mq.Offset)
withSubQuery = AddOffsetToQuery(withSubQuery, mq.Offset)
}
// query = withSubQuery + ") " + fmt.Sprintf(constants.TracesExplorerViewSQLSelectQuery, constants.SIGNOZ_TRACE_DBNAME, constants.SIGNOZ_SPAN_INDEX_TABLENAME, constants.SIGNOZ_SPAN_INDEX_TABLENAME)
query = fmt.Sprintf(constants.TracesExplorerViewSQLSelectBeforeSubQuery, constants.SIGNOZ_TRACE_DBNAME, constants.SIGNOZ_SPAN_INDEX_TABLENAME) + withSubQuery + ") " + fmt.Sprintf(constants.TracesExplorerViewSQLSelectAfterSubQuery, constants.SIGNOZ_TRACE_DBNAME, constants.SIGNOZ_SPAN_INDEX_TABLENAME, spanIndexTableTimeFilter)
@@ -403,7 +403,7 @@ func groupBy(panelType v3.PanelType, graphLimitQtype string, tags ...string) str
return strings.Join(tags, ",")
}
func groupByAttributeKeyTags(panelType v3.PanelType, graphLimitQtype string, tags ...v3.AttributeKey) string {
func GroupByAttributeKeyTags(panelType v3.PanelType, graphLimitQtype string, tags ...v3.AttributeKey) string {
groupTags := []string{}
for _, tag := range tags {
groupTags = append(groupTags, fmt.Sprintf("`%s`", tag.Key))
@@ -456,7 +456,7 @@ func orderByAttributeKeyTags(panelType v3.PanelType, items []v3.OrderBy, tags []
return str
}
func having(items []v3.Having) string {
func Having(items []v3.Having) string {
// aggregate something and filter on that aggregate
var having []string
for _, item := range items {
@@ -465,7 +465,7 @@ func having(items []v3.Having) string {
return strings.Join(having, " AND ")
}
func reduceToQuery(query string, reduceTo v3.ReduceToOperator, _ v3.AggregateOperator) (string, error) {
func ReduceToQuery(query string, reduceTo v3.ReduceToOperator, _ v3.AggregateOperator) (string, error) {
var groupBy string
switch reduceTo {
@@ -485,14 +485,14 @@ func reduceToQuery(query string, reduceTo v3.ReduceToOperator, _ v3.AggregateOpe
return query, nil
}
func addLimitToQuery(query string, limit uint64) string {
func AddLimitToQuery(query string, limit uint64) string {
if limit == 0 {
limit = 100
}
return fmt.Sprintf("%s LIMIT %d", query, limit)
}
func addOffsetToQuery(query string, offset uint64) string {
func AddOffsetToQuery(query string, offset uint64) string {
return fmt.Sprintf("%s OFFSET %d", query, offset)
}
@@ -509,7 +509,7 @@ func PrepareTracesQuery(start, end int64, panelType v3.PanelType, mq *v3.Builder
if err != nil {
return "", err
}
query = addLimitToQuery(query, mq.Limit)
query = AddLimitToQuery(query, mq.Limit)
return query, nil
} else if options.GraphLimitQtype == constants.SecondQueryGraphLimit {
@@ -525,13 +525,13 @@ func PrepareTracesQuery(start, end int64, panelType v3.PanelType, mq *v3.Builder
return "", err
}
if panelType == v3.PanelTypeValue {
query, err = reduceToQuery(query, mq.ReduceTo, mq.AggregateOperator)
query, err = ReduceToQuery(query, mq.ReduceTo, mq.AggregateOperator)
}
if panelType == v3.PanelTypeList || panelType == v3.PanelTypeTable {
query = addLimitToQuery(query, mq.Limit)
query = AddLimitToQuery(query, mq.Limit)
if mq.Offset != 0 {
query = addOffsetToQuery(query, mq.Offset)
query = AddOffsetToQuery(query, mq.Offset)
}
}
return query, err

View File

@@ -0,0 +1,84 @@
package v4
import v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
func enrichKeyWithMetadata(key v3.AttributeKey, keys map[string]v3.AttributeKey) v3.AttributeKey {
// TODO(nitya) : update logic similar to logs
if key.Type == "" || key.DataType == "" {
// check if the key is present in the keys map
if existingKey, ok := keys[key.Key]; ok {
key.IsColumn = existingKey.IsColumn
key.Type = existingKey.Type
key.DataType = existingKey.DataType
} else { // if not present then set the default values
key.Type = v3.AttributeKeyTypeTag
key.DataType = v3.AttributeKeyDataTypeString
key.IsColumn = false
return key
}
}
return key
}
func Enrich(params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey) {
if params.CompositeQuery.QueryType == v3.QueryTypeBuilder {
for _, query := range params.CompositeQuery.BuilderQueries {
if query.DataSource == v3.DataSourceTraces {
EnrichTracesQuery(query, keys)
}
}
}
}
func EnrichTracesQuery(query *v3.BuilderQuery, keys map[string]v3.AttributeKey) {
// enrich aggregate attribute
query.AggregateAttribute = enrichKeyWithMetadata(query.AggregateAttribute, keys)
// enrich filter items
if query.Filters != nil && len(query.Filters.Items) > 0 {
for idx, filter := range query.Filters.Items {
query.Filters.Items[idx].Key = enrichKeyWithMetadata(filter.Key, keys)
// if the serviceName column is used, use the corresponding resource attribute as well during filtering
if filter.Key.Key == "serviceName" && filter.Key.IsColumn {
query.Filters.Items = append(query.Filters.Items, v3.FilterItem{
Key: v3.AttributeKey{
Key: "service.name",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource,
IsColumn: false,
},
Operator: filter.Operator,
Value: filter.Value,
})
}
}
}
// enrich group by
for idx, groupBy := range query.GroupBy {
query.GroupBy[idx] = enrichKeyWithMetadata(groupBy, keys)
}
// enrich order by
query.OrderBy = enrichOrderBy(query.OrderBy, keys)
// enrich select columns
for idx, selectColumn := range query.SelectColumns {
query.SelectColumns[idx] = enrichKeyWithMetadata(selectColumn, keys)
}
}
func enrichOrderBy(items []v3.OrderBy, keys map[string]v3.AttributeKey) []v3.OrderBy {
enrichedItems := []v3.OrderBy{}
for i := 0; i < len(items); i++ {
attributeKey := enrichKeyWithMetadata(v3.AttributeKey{
Key: items[i].ColumnName,
}, keys)
enrichedItems = append(enrichedItems, v3.OrderBy{
ColumnName: items[i].ColumnName,
Order: items[i].Order,
Key: attributeKey.Key,
DataType: attributeKey.DataType,
Type: attributeKey.Type,
IsColumn: attributeKey.IsColumn,
})
}
return enrichedItems
}

View File

@@ -0,0 +1,57 @@
package v4
import (
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)
var testColumnName = []struct {
Name string
AttributeKey v3.AttributeKey
ExpectedColumn string
}{
{
Name: "resource",
AttributeKey: v3.AttributeKey{Key: "collector_id", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeResource, IsColumn: false},
ExpectedColumn: "resourceTagsMap['collector_id']",
},
{
Name: "stringAttribute",
AttributeKey: v3.AttributeKey{Key: "customer_id", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: false},
ExpectedColumn: "stringTagMap['customer_id']",
},
{
Name: "boolAttribute",
AttributeKey: v3.AttributeKey{Key: "has_error", DataType: v3.AttributeKeyDataTypeBool, Type: v3.AttributeKeyTypeTag, IsColumn: false},
ExpectedColumn: "boolTagMap['has_error']",
},
{
Name: "float64Attribute",
AttributeKey: v3.AttributeKey{Key: "count", DataType: v3.AttributeKeyDataTypeFloat64, Type: v3.AttributeKeyTypeTag, IsColumn: false},
ExpectedColumn: "numberTagMap['count']",
},
{
Name: "int64Attribute",
AttributeKey: v3.AttributeKey{Key: "count", DataType: v3.AttributeKeyDataTypeInt64, Type: v3.AttributeKeyTypeTag, IsColumn: false},
ExpectedColumn: "numberTagMap['count']",
},
{
Name: "column",
AttributeKey: v3.AttributeKey{Key: "name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true},
ExpectedColumn: "name",
},
{
Name: "missing key",
AttributeKey: v3.AttributeKey{Key: "xyz"},
ExpectedColumn: "stringTagMap['xyz']",
},
}
// func TestColumnName(t *testing.T) {
// for _, tt := range testColumnName {
// tt.AttributeKey = enrichKeyWithMetadata(tt.AttributeKey, map[string]v3.AttributeKey{})
// Convey("testColumnName", t, func() {
// Column := getColumnName(tt.AttributeKey)
// So(Column, ShouldEqual, tt.ExpectedColumn)
// })
// }
// }

View File

@@ -0,0 +1,417 @@
package v4
import (
"fmt"
"strings"
"go.signoz.io/signoz/pkg/query-service/app/resource"
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
"go.signoz.io/signoz/pkg/query-service/constants"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/utils"
)
type Options struct {
GraphLimitQtype string
PreferRPM bool
}
var tracesOperatorMappingV3 = map[v3.FilterOperator]string{
v3.FilterOperatorIn: "IN",
v3.FilterOperatorNotIn: "NOT IN",
v3.FilterOperatorEqual: "=",
v3.FilterOperatorNotEqual: "!=",
v3.FilterOperatorLessThan: "<",
v3.FilterOperatorLessThanOrEq: "<=",
v3.FilterOperatorGreaterThan: ">",
v3.FilterOperatorGreaterThanOrEq: ">=",
v3.FilterOperatorLike: "ILIKE",
v3.FilterOperatorNotLike: "NOT ILIKE",
v3.FilterOperatorRegex: "match(%s, %s)",
v3.FilterOperatorNotRegex: "NOT match(%s, %s)",
v3.FilterOperatorContains: "ILIKE",
v3.FilterOperatorNotContains: "NOT ILIKE",
v3.FilterOperatorExists: "mapContains(%s, '%s')",
v3.FilterOperatorNotExists: "NOT has(%s%s, '%s')",
}
func getClickHouseTracesColumnType(columnType v3.AttributeKeyType) string {
if columnType == v3.AttributeKeyTypeResource {
return "resources"
}
return "attributes"
}
func getClickHouseTracesColumnDataType(columnDataType v3.AttributeKeyDataType) string {
if columnDataType == v3.AttributeKeyDataTypeFloat64 || columnDataType == v3.AttributeKeyDataTypeInt64 {
return "number"
}
if columnDataType == v3.AttributeKeyDataTypeBool {
return "bool"
}
return "string"
}
func getColumnName(key v3.AttributeKey) string {
// TODO (nitya):
// consider routing things like serviceName col to service.name resource attribute for filtering
// consider using static details for some columns
if key.IsColumn {
return "`" + key.Key + "`"
}
keyType := getClickHouseTracesColumnType(key.Type)
keyDType := getClickHouseTracesColumnDataType(key.DataType)
return fmt.Sprintf("%s_%s['%s']", keyType, keyDType, key.Key)
}
// getSelectLabels returns the select labels for the query based on groupBy and aggregateOperator
func getSelectLabels(groupBy []v3.AttributeKey) string {
var labels []string
for _, tag := range groupBy {
name := getColumnName(tag)
labels = append(labels, fmt.Sprintf(" %s as `%s`", name, tag.Key))
}
return strings.Join(labels, ",")
}
func buildTracesFilterQuery(fs *v3.FilterSet) (string, error) {
var conditions []string
if fs != nil && len(fs.Items) != 0 {
for _, item := range fs.Items {
// skip if it's a resource attribute
if item.Key.Type == v3.AttributeKeyTypeResource {
continue
}
val := item.Value
// generate the key
columnName := getColumnName(item.Key)
var fmtVal string
item.Operator = v3.FilterOperator(strings.ToLower(strings.TrimSpace(string(item.Operator))))
if item.Operator != v3.FilterOperatorExists && item.Operator != v3.FilterOperatorNotExists {
var err error
val, err = utils.ValidateAndCastValue(val, item.Key.DataType)
if err != nil {
return "", fmt.Errorf("invalid value for key %s: %v", item.Key.Key, err)
}
}
if val != nil {
fmtVal = utils.ClickHouseFormattedValue(val)
}
if operator, ok := tracesOperatorMappingV3[item.Operator]; ok {
switch item.Operator {
case v3.FilterOperatorContains, v3.FilterOperatorNotContains:
val = utils.QuoteEscapedString(fmt.Sprintf("%v", item.Value))
conditions = append(conditions, fmt.Sprintf("%s %s '%%%s%%'", columnName, operator, val))
case v3.FilterOperatorRegex, v3.FilterOperatorNotRegex:
conditions = append(conditions, fmt.Sprintf(operator, columnName, fmtVal))
case v3.FilterOperatorExists, v3.FilterOperatorNotExists:
if item.Key.IsColumn {
subQuery, err := tracesV3.ExistsSubQueryForFixedColumn(item.Key, item.Operator)
if err != nil {
return "", err
}
conditions = append(conditions, subQuery)
} else {
cType := getClickHouseTracesColumnType(item.Key.Type)
cDataType := getClickHouseTracesColumnDataType(item.Key.DataType)
col := fmt.Sprintf("%s_%s", cType, cDataType)
conditions = append(conditions, fmt.Sprintf(operator, col, item.Key.Key))
}
default:
conditions = append(conditions, fmt.Sprintf("%s %s %s", columnName, operator, fmtVal))
}
} else {
return "", fmt.Errorf("unsupported operator %s", item.Operator)
}
}
}
queryString := strings.Join(conditions, " AND ")
return queryString, nil
}
func handleEmptyValuesInGroupBy(groupBy []v3.AttributeKey) (string, error) {
filterItems := []v3.FilterItem{}
if len(groupBy) != 0 {
for _, item := range groupBy {
if !item.IsColumn {
filterItems = append(filterItems, v3.FilterItem{
Key: item,
Operator: v3.FilterOperatorExists,
})
}
}
}
if len(filterItems) != 0 {
filterSet := v3.FilterSet{
Operator: "AND",
Items: filterItems,
}
return buildTracesFilterQuery(&filterSet)
}
return "", nil
}
const NANOSECOND = 1000000000
// orderBy returns a string of comma separated tags for order by clause
// if there are remaining items which are not present in tags they are also added
// if the order is not specified, it defaults to ASC
func orderBy(panelType v3.PanelType, items []v3.OrderBy, tagLookup map[string]struct{}) []string {
var orderBy []string
for _, item := range items {
if item.ColumnName == constants.SigNozOrderByValue {
orderBy = append(orderBy, fmt.Sprintf("value %s", item.Order))
} else if _, ok := tagLookup[item.ColumnName]; ok {
orderBy = append(orderBy, fmt.Sprintf("`%s` %s", item.ColumnName, item.Order))
} else if panelType == v3.PanelTypeList {
attr := v3.AttributeKey{Key: item.ColumnName, DataType: item.DataType, Type: item.Type, IsColumn: item.IsColumn}
name := getColumnName(attr)
if item.IsColumn {
orderBy = append(orderBy, fmt.Sprintf("%s %s", name, item.Order))
} else {
orderBy = append(orderBy, fmt.Sprintf("%s %s", name, item.Order))
}
}
}
return orderBy
}
func orderByAttributeKeyTags(panelType v3.PanelType, items []v3.OrderBy, tags []v3.AttributeKey) string {
tagLookup := map[string]struct{}{}
for _, v := range tags {
tagLookup[v.Key] = struct{}{}
}
orderByArray := orderBy(panelType, items, tagLookup)
// TODO: check this with logs
if len(orderByArray) == 0 {
if panelType == v3.PanelTypeList {
orderByArray = append(orderByArray, constants.TIMESTAMP+" DESC")
} else if panelType == v3.PanelTypeGraph {
orderByArray = append(orderByArray, "value DESC")
}
}
str := strings.Join(orderByArray, ",")
return str
}
func buildTracesQuery(start, end, step int64, mq *v3.BuilderQuery, panelType v3.PanelType, options v3.QBOptions) (string, error) {
tracesStart := utils.GetEpochNanoSecs(start)
tracesEnd := utils.GetEpochNanoSecs(end)
// -1800 this is added so that the bucket start considers all the fingerprints.
bucketStart := tracesStart/NANOSECOND - 1800
bucketEnd := tracesEnd / NANOSECOND
timeFilter := fmt.Sprintf("(timestamp >= '%d' AND timestamp <= '%d') AND (ts_bucket_start >= %d AND ts_bucket_start <= %d)", tracesStart, tracesEnd, bucketStart, bucketEnd)
filterSubQuery, err := buildTracesFilterQuery(mq.Filters)
if err != nil {
return "", err
}
if filterSubQuery != "" {
filterSubQuery = " AND " + filterSubQuery
}
resourceSubQuery, err := resource.BuildResourceSubQuery("signoz_traces", "distributed_traces_v3_resource", bucketStart, bucketEnd, mq.Filters, mq.GroupBy, mq.AggregateAttribute, false)
if err != nil {
return "", err
}
// join both the filter clauses
if resourceSubQuery != "" {
filterSubQuery = filterSubQuery + " AND (resource_fingerprint GLOBAL IN " + resourceSubQuery + ")"
}
// timerange will be sent in epoch millisecond
selectLabels := getSelectLabels(mq.GroupBy)
orderBy := orderByAttributeKeyTags(panelType, mq.OrderBy, mq.GroupBy)
if orderBy != "" {
orderBy = " order by " + orderBy
}
if mq.AggregateOperator == v3.AggregateOperatorNoOp {
var query string
if panelType == v3.PanelTypeTrace {
withSubQuery := fmt.Sprintf(constants.TracesExplorerViewSQLSelectWithSubQuery, constants.SIGNOZ_TRACE_DBNAME, constants.SIGNOZ_SPAN_INDEX_V3_LOCAL_TABLENAME, timeFilter, filterSubQuery)
withSubQuery = tracesV3.AddLimitToQuery(withSubQuery, mq.Limit)
if mq.Offset != 0 {
withSubQuery = tracesV3.AddOffsetToQuery(withSubQuery, mq.Offset)
}
query = fmt.Sprintf(constants.TracesExplorerViewSQLSelectBeforeSubQuery, constants.SIGNOZ_TRACE_DBNAME, constants.SIGNOZ_SPAN_INDEX_V3) + withSubQuery + ") " + fmt.Sprintf(constants.TracesExplorerViewSQLSelectAfterSubQuery, constants.SIGNOZ_TRACE_DBNAME, constants.SIGNOZ_SPAN_INDEX_V3, timeFilter)
} else if panelType == v3.PanelTypeList {
if len(mq.SelectColumns) == 0 {
return "", fmt.Errorf("select columns cannot be empty for panelType %s", panelType)
}
selectLabels = getSelectLabels(mq.SelectColumns)
queryNoOpTmpl := fmt.Sprintf("SELECT timestamp as timestamp_datetime, spanID, traceID,%s ", selectLabels) + "from " + constants.SIGNOZ_TRACE_DBNAME + "." + constants.SIGNOZ_SPAN_INDEX_V3 + " where %s %s" + "%s"
query = fmt.Sprintf(queryNoOpTmpl, timeFilter, filterSubQuery, orderBy)
} else {
return "", fmt.Errorf("unsupported aggregate operator %s for panelType %s", mq.AggregateOperator, panelType)
}
return query, nil
// ---- NOOP ends here ----
}
having := tracesV3.Having(mq.Having)
if having != "" {
having = " having " + having
}
groupBy := tracesV3.GroupByAttributeKeyTags(panelType, options.GraphLimitQtype, mq.GroupBy...)
if groupBy != "" {
groupBy = " group by " + groupBy
}
aggregationKey := ""
if mq.AggregateAttribute.Key != "" {
aggregationKey = getColumnName(mq.AggregateAttribute)
}
var queryTmpl string
if options.GraphLimitQtype == constants.FirstQueryGraphLimit {
queryTmpl = "SELECT"
} else if panelType == v3.PanelTypeTable {
queryTmpl =
"SELECT "
// step or aggregate interval is whole time period in case of table panel
step = (tracesEnd - tracesStart) / 1000000000
} else if panelType == v3.PanelTypeGraph || panelType == v3.PanelTypeValue {
// Select the aggregate value for interval
queryTmpl =
fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d SECOND) AS ts,", step)
}
queryTmpl = queryTmpl + selectLabels +
" %s as value " +
"from " + constants.SIGNOZ_TRACE_DBNAME + "." + constants.SIGNOZ_SPAN_INDEX_V3 +
" where " + timeFilter + "%s" +
"%s%s" +
"%s"
// we don't need value for first query
if options.GraphLimitQtype == constants.FirstQueryGraphLimit {
queryTmpl = "SELECT " + tracesV3.GetSelectKeys(mq.AggregateOperator, mq.GroupBy) + " from (" + queryTmpl + ")"
}
emptyValuesInGroupByFilter, err := handleEmptyValuesInGroupBy(mq.GroupBy)
if err != nil {
return "", err
}
filterSubQuery += emptyValuesInGroupByFilter
if options.GraphLimitQtype == constants.SecondQueryGraphLimit {
filterSubQuery = filterSubQuery + " AND " + fmt.Sprintf("(%s) GLOBAL IN (", tracesV3.GetSelectKeys(mq.AggregateOperator, mq.GroupBy)) + "%s)"
}
switch mq.AggregateOperator {
case v3.AggregateOperatorRateSum,
v3.AggregateOperatorRateMax,
v3.AggregateOperatorRateAvg,
v3.AggregateOperatorRateMin,
v3.AggregateOperatorRate:
rate := float64(step)
if options.PreferRPM {
rate = rate / 60.0
}
op := fmt.Sprintf("%s(%s)/%f", tracesV3.AggregateOperatorToSQLFunc[mq.AggregateOperator], aggregationKey, rate)
query := fmt.Sprintf(queryTmpl, op, filterSubQuery, groupBy, having, orderBy)
return query, nil
case
v3.AggregateOperatorP05,
v3.AggregateOperatorP10,
v3.AggregateOperatorP20,
v3.AggregateOperatorP25,
v3.AggregateOperatorP50,
v3.AggregateOperatorP75,
v3.AggregateOperatorP90,
v3.AggregateOperatorP95,
v3.AggregateOperatorP99:
op := fmt.Sprintf("quantile(%v)(%s)", tracesV3.AggregateOperatorToPercentile[mq.AggregateOperator], aggregationKey)
query := fmt.Sprintf(queryTmpl, op, filterSubQuery, groupBy, having, orderBy)
return query, nil
case v3.AggregateOperatorAvg, v3.AggregateOperatorSum, v3.AggregateOperatorMin, v3.AggregateOperatorMax:
op := fmt.Sprintf("%s(%s)", tracesV3.AggregateOperatorToSQLFunc[mq.AggregateOperator], aggregationKey)
query := fmt.Sprintf(queryTmpl, op, filterSubQuery, groupBy, having, orderBy)
return query, nil
case v3.AggregateOperatorCount:
if mq.AggregateAttribute.Key != "" {
if mq.AggregateAttribute.IsColumn {
subQuery, err := tracesV3.ExistsSubQueryForFixedColumn(mq.AggregateAttribute, v3.FilterOperatorExists)
if err == nil {
filterSubQuery = fmt.Sprintf("%s AND %s", filterSubQuery, subQuery)
}
} else {
// columnType, columnDataType := getClickhouseTracesColumnDataTypeAndType(mq.AggregateAttribute)
column := getColumnName(mq.AggregateAttribute)
filterSubQuery = fmt.Sprintf("%s AND has(%s, '%s')", filterSubQuery, column, mq.AggregateAttribute.Key)
}
}
op := "toFloat64(count())"
query := fmt.Sprintf(queryTmpl, op, filterSubQuery, groupBy, having, orderBy)
return query, nil
case v3.AggregateOperatorCountDistinct:
op := fmt.Sprintf("toFloat64(count(distinct(%s)))", aggregationKey)
query := fmt.Sprintf(queryTmpl, op, filterSubQuery, groupBy, having, orderBy)
return query, nil
// case v3.AggregateOperatorNoOp:
// return query, nil
default:
return "", fmt.Errorf("unsupported aggregate operator %s", mq.AggregateOperator)
}
}
// PrepareTracesQuery returns the query string for traces
// start and end are in epoch millisecond
// step is in seconds
func PrepareTracesQuery(start, end int64, panelType v3.PanelType, mq *v3.BuilderQuery, options v3.QBOptions) (string, error) {
// adjust the start and end time to the step interval
start = start - (start % (mq.StepInterval * 1000))
end = end - (end % (mq.StepInterval * 1000))
if options.GraphLimitQtype == constants.FirstQueryGraphLimit {
// give me just the group by names
query, err := buildTracesQuery(start, end, mq.StepInterval, mq, panelType, options)
if err != nil {
return "", err
}
query = tracesV3.AddLimitToQuery(query, mq.Limit)
return query, nil
} else if options.GraphLimitQtype == constants.SecondQueryGraphLimit {
query, err := buildTracesQuery(start, end, mq.StepInterval, mq, panelType, options)
if err != nil {
return "", err
}
return query, nil
}
query, err := buildTracesQuery(start, end, mq.StepInterval, mq, panelType, options)
if err != nil {
return "", err
}
if panelType == v3.PanelTypeValue {
query, err = tracesV3.ReduceToQuery(query, mq.ReduceTo, mq.AggregateOperator)
}
if panelType == v3.PanelTypeList || panelType == v3.PanelTypeTable {
query = tracesV3.AddLimitToQuery(query, mq.Limit)
if mq.Offset != 0 {
query = tracesV3.AddOffsetToQuery(query, mq.Offset)
}
}
return query, err
}

View File

@@ -0,0 +1,539 @@
package v4
import (
"testing"
"go.signoz.io/signoz/pkg/query-service/constants"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)
func Test_getClickHouseTracesColumnType(t *testing.T) {
type args struct {
columnType v3.AttributeKeyType
}
tests := []struct {
name string
args args
want string
}{
{
name: "tag",
args: args{
columnType: v3.AttributeKeyTypeTag,
},
want: "attributes",
},
{
name: "resource",
args: args{
columnType: v3.AttributeKeyTypeResource,
},
want: "resources",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := getClickHouseTracesColumnType(tt.args.columnType); got != tt.want {
t.Errorf("GetClickhouseTracesColumnType() = %v, want %v", got, tt.want)
}
})
}
}
func Test_getClickHouseTracesColumnDataType(t *testing.T) {
type args struct {
columnDataType v3.AttributeKeyDataType
}
tests := []struct {
name string
args args
want string
}{
{
name: "string",
args: args{
columnDataType: v3.AttributeKeyDataTypeString,
},
want: "string",
},
{
name: "float64",
args: args{
columnDataType: v3.AttributeKeyDataTypeFloat64,
},
want: "number",
},
{
name: "int64",
args: args{
columnDataType: v3.AttributeKeyDataTypeInt64,
},
want: "number",
},
{
name: "bool",
args: args{
columnDataType: v3.AttributeKeyDataTypeBool,
},
want: "bool",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := getClickHouseTracesColumnDataType(tt.args.columnDataType); got != tt.want {
t.Errorf("getClickhouseTracesColumnDataType() = %v, want %v", got, tt.want)
}
})
}
}
func Test_getColumnName(t *testing.T) {
type args struct {
key v3.AttributeKey
}
tests := []struct {
name string
args args
want string
}{
{
name: "tag",
args: args{
key: v3.AttributeKey{Key: "data", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag},
},
want: "attributes_string['data']",
},
{
name: "column",
args: args{
key: v3.AttributeKey{Key: "data", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true},
},
want: "`data`",
},
{
name: "missing meta",
args: args{
key: v3.AttributeKey{Key: "xyz"},
},
want: "attributes_string['xyz']",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := getColumnName(tt.args.key); got != tt.want {
t.Errorf("getColumnName() = %v, want %v", got, tt.want)
}
})
}
}
func Test_getSelectLabels(t *testing.T) {
type args struct {
groupBy []v3.AttributeKey
}
tests := []struct {
name string
args args
want string
}{
{
name: "count",
args: args{
groupBy: []v3.AttributeKey{{Key: "user_name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}},
},
want: " attributes_string['user_name'] as `user_name`",
},
{
name: "multiple group by",
args: args{
groupBy: []v3.AttributeKey{
{Key: "user_name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag},
{Key: "service_name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeResource, IsColumn: true},
},
},
want: " attributes_string['user_name'] as `user_name`, `service_name` as `service_name`",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := getSelectLabels(tt.args.groupBy); got != tt.want {
t.Errorf("getSelectLabels() = %v, want %v", got, tt.want)
}
})
}
}
func Test_buildTracesFilterQuery(t *testing.T) {
type args struct {
fs *v3.FilterSet
}
tests := []struct {
name string
args args
want string
wantErr bool
}{
{
name: "Test buildTracesFilterQuery in, nin",
args: args{
fs: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "method", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, Value: []interface{}{"GET", "POST"}, Operator: v3.FilterOperatorIn},
{Key: v3.AttributeKey{Key: "method", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, Value: []interface{}{"PUT"}, Operator: v3.FilterOperatorNotIn},
{Key: v3.AttributeKey{Key: "host", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeResource}, Value: []interface{}{"server"}, Operator: v3.FilterOperatorNotIn},
}},
},
want: "attributes_string['method'] IN ['GET','POST'] AND attributes_string['method'] NOT IN ['PUT']",
wantErr: false,
},
{
name: "Test buildTracesFilterQuery not eq, neq, gt, lt, gte, lte",
args: args{
fs: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "duration", DataType: v3.AttributeKeyDataTypeInt64, Type: v3.AttributeKeyTypeTag}, Value: 102, Operator: v3.FilterOperatorEqual},
{Key: v3.AttributeKey{Key: "duration", DataType: v3.AttributeKeyDataTypeInt64, Type: v3.AttributeKeyTypeTag}, Value: 100, Operator: v3.FilterOperatorNotEqual},
{Key: v3.AttributeKey{Key: "duration", DataType: v3.AttributeKeyDataTypeInt64, Type: v3.AttributeKeyTypeTag}, Value: 10, Operator: v3.FilterOperatorGreaterThan},
{Key: v3.AttributeKey{Key: "duration", DataType: v3.AttributeKeyDataTypeInt64, Type: v3.AttributeKeyTypeTag}, Value: 200, Operator: v3.FilterOperatorLessThan},
{Key: v3.AttributeKey{Key: "duration", DataType: v3.AttributeKeyDataTypeInt64, Type: v3.AttributeKeyTypeTag}, Value: 10, Operator: v3.FilterOperatorGreaterThanOrEq},
{Key: v3.AttributeKey{Key: "duration", DataType: v3.AttributeKeyDataTypeInt64, Type: v3.AttributeKeyTypeTag}, Value: 200, Operator: v3.FilterOperatorLessThanOrEq},
}},
},
want: "attributes_number['duration'] = 102 AND attributes_number['duration'] != 100 AND attributes_number['duration'] > 10 AND attributes_number['duration'] < 200" +
" AND attributes_number['duration'] >= 10 AND attributes_number['duration'] <= 200",
wantErr: false,
},
{
name: "Test contains, ncontains, like, nlike, regex, nregex",
args: args{
fs: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "host", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, Value: "102.", Operator: v3.FilterOperatorContains},
{Key: v3.AttributeKey{Key: "host", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, Value: "103", Operator: v3.FilterOperatorNotContains},
{Key: v3.AttributeKey{Key: "host", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, Value: "102.", Operator: v3.FilterOperatorLike},
{Key: v3.AttributeKey{Key: "host", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, Value: "102", Operator: v3.FilterOperatorNotLike},
{Key: v3.AttributeKey{Key: "path", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}, Value: "/mypath", Operator: v3.FilterOperatorRegex},
{Key: v3.AttributeKey{Key: "path", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}, Value: "/health.*", Operator: v3.FilterOperatorNotRegex},
}},
},
want: "attributes_string['host'] ILIKE '%102.%' AND attributes_string['host'] NOT ILIKE '%103%' AND attributes_string['host'] ILIKE '102.' AND attributes_string['host'] NOT ILIKE '102' AND " +
"match(`path`, '/mypath') AND NOT match(`path`, '/health.*')",
},
{
name: "Test exists, nexists",
args: args{
fs: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "host", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, Operator: v3.FilterOperatorExists},
{Key: v3.AttributeKey{Key: "path", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}, Operator: v3.FilterOperatorNotExists},
}},
},
want: "mapContains(attributes_string, 'host') AND path = ''",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := buildTracesFilterQuery(tt.args.fs)
if (err != nil) != tt.wantErr {
t.Errorf("buildTracesFilterQuery() error = %v, wantErr %v", err, tt.wantErr)
return
}
if got != tt.want {
t.Errorf("buildTracesFilterQuery() = %v, want %v", got, tt.want)
}
})
}
}
var handleEmptyValuesInGroupByData = []struct {
Name string
GroupBy []v3.AttributeKey
ExpectedFilter string
}{
{
Name: "String type key",
GroupBy: []v3.AttributeKey{{Key: "bytes", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}},
ExpectedFilter: " AND has(stringTagMap, 'bytes')",
},
{
Name: "fixed column type key",
GroupBy: []v3.AttributeKey{{Key: "bytes", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}},
ExpectedFilter: "",
},
{
Name: "String, float64 and fixed column type key",
GroupBy: []v3.AttributeKey{
{Key: "bytes", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag},
{Key: "count", DataType: v3.AttributeKeyDataTypeFloat64, Type: v3.AttributeKeyTypeTag},
{Key: "name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true},
},
ExpectedFilter: " AND has(stringTagMap, 'bytes') AND has(numberTagMap, 'count')",
},
}
func Test_handleEmptyValuesInGroupBy(t *testing.T) {
type args struct {
groupBy []v3.AttributeKey
}
tests := []struct {
name string
args args
want string
wantErr bool
}{
{
name: "Test handleEmptyValuesInGroupBy",
args: args{
groupBy: []v3.AttributeKey{{Key: "bytes", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}},
},
want: "mapContains(attributes_string, 'bytes')",
wantErr: false,
},
{
name: "Test handleEmptyValuesInGroupBy",
args: args{
groupBy: []v3.AttributeKey{{Key: "bytes", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}},
},
want: "",
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := handleEmptyValuesInGroupBy(tt.args.groupBy)
if (err != nil) != tt.wantErr {
t.Errorf("handleEmptyValuesInGroupBy() error = %v, wantErr %v", err, tt.wantErr)
return
}
if got != tt.want {
t.Errorf("handleEmptyValuesInGroupBy() = %v, want %v", got, tt.want)
}
})
}
}
func Test_buildTracesQuery(t *testing.T) {
type args struct {
start int64
end int64
step int64
mq *v3.BuilderQuery
panelType v3.PanelType
options v3.QBOptions
}
tests := []struct {
name string
args args
want string
wantErr bool
}{
{
name: "Test buildTracesQuery",
args: args{
panelType: v3.PanelTypeTable,
start: 1680066360726210000,
end: 1680066458000000000,
step: 1000,
mq: &v3.BuilderQuery{
AggregateOperator: v3.AggregateOperatorCount,
Filters: &v3.FilterSet{
Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "bytes", Type: v3.AttributeKeyTypeTag, DataType: v3.AttributeKeyDataTypeInt64}, Value: 100, Operator: ">"},
{Key: v3.AttributeKey{Key: "service.name", Type: v3.AttributeKeyTypeResource, DataType: v3.AttributeKeyDataTypeString}, Value: "myService", Operator: "="},
},
},
GroupBy: []v3.AttributeKey{{Key: "host", DataType: v3.AttributeKeyDataTypeInt64, Type: v3.AttributeKeyTypeResource}},
OrderBy: []v3.OrderBy{
{ColumnName: "host", Order: "ASC"}},
},
},
want: "SELECT resources_number['host'] as `host` toFloat64(count()) as value from signoz_traces.distributed_signoz_index_v3 where (timestamp >= '1680066360726210000' AND timestamp <= '1680066458000000000') " +
"AND (ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) AND attributes_number['bytes'] > 100 AND " +
"(resource_fingerprint GLOBAL IN (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (seen_at_ts_bucket_start >= 1680064560) AND " +
"(seen_at_ts_bucket_start <= 1680066458) AND simpleJSONExtractString(labels, 'service.name') = 'myService' AND labels like '%service.name%myService%' AND " +
"( (simpleJSONHas(labels, 'host') AND labels like '%host%') ))) " +
"group by `host` order by `host` ASC",
},
{
name: "test noop list view",
args: args{
panelType: v3.PanelTypeList,
start: 1680066360726210000,
end: 1680066458000000000,
mq: &v3.BuilderQuery{
AggregateOperator: v3.AggregateOperatorNoOp,
Filters: &v3.FilterSet{},
SelectColumns: []v3.AttributeKey{{Key: "name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}},
},
},
want: "SELECT timestamp as timestamp_datetime, spanID, traceID, `name` as `name` from signoz_traces.distributed_signoz_index_v3 where (timestamp >= '1680066360726210000' AND timestamp <= '1680066458000000000') " +
"AND (ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) order by timestamp DESC",
},
{
name: "test noop trace view",
args: args{
panelType: v3.PanelTypeTrace,
start: 1680066360726210000,
end: 1680066458000000000,
mq: &v3.BuilderQuery{
AggregateOperator: v3.AggregateOperatorNoOp,
Filters: &v3.FilterSet{
Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "method", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, Value: "GET", Operator: "="},
{Key: v3.AttributeKey{Key: "service.name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeResource}, Value: "myService", Operator: "="},
},
},
},
},
want: "SELECT subQuery.serviceName, subQuery.name, count() AS span_count, subQuery.durationNano, subQuery.traceID AS traceID FROM signoz_traces.distributed_signoz_index_v3 INNER JOIN " +
"( SELECT * FROM (SELECT traceID, durationNano, serviceName, name FROM signoz_traces.signoz_index_v3 WHERE parentSpanID = '' AND (timestamp >= '1680066360726210000' AND timestamp <= '1680066458000000000') AND " +
"(ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) AND attributes_string['method'] = 'GET' AND (resource_fingerprint GLOBAL IN (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource " +
"WHERE (seen_at_ts_bucket_start >= 1680064560) AND (seen_at_ts_bucket_start <= 1680066458) AND simpleJSONExtractString(labels, 'service.name') = 'myService' AND labels like '%service.name%myService%')) " +
"ORDER BY durationNano DESC LIMIT 1 BY traceID LIMIT 100) AS inner_subquery ) AS subQuery ON signoz_traces.distributed_signoz_index_v3.traceID = subQuery.traceID WHERE (timestamp >= '1680066360726210000' AND " +
"timestamp <= '1680066458000000000') AND (ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) GROUP BY subQuery.traceID, subQuery.durationNano, subQuery.name, subQuery.serviceName ORDER BY " +
"subQuery.durationNano desc LIMIT 1 BY subQuery.traceID;",
},
{
name: "Test order by value with having",
args: args{
panelType: v3.PanelTypeTable,
start: 1680066360726210000,
end: 1680066458000000000,
mq: &v3.BuilderQuery{
AggregateOperator: v3.AggregateOperatorCountDistinct,
Filters: &v3.FilterSet{},
AggregateAttribute: v3.AttributeKey{Key: "name", IsColumn: true, DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag},
OrderBy: []v3.OrderBy{{ColumnName: "#SIGNOZ_VALUE", Order: "ASC"}},
Having: []v3.Having{
{
ColumnName: "name",
Operator: ">",
Value: 10,
},
},
},
},
want: "SELECT toFloat64(count(distinct(`name`))) as value from signoz_traces.distributed_signoz_index_v3 where (timestamp >= '1680066360726210000' AND timestamp <= '1680066458000000000') AND " +
"(ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) having value > 10 order by value ASC",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := buildTracesQuery(tt.args.start, tt.args.end, tt.args.step, tt.args.mq, tt.args.panelType, tt.args.options)
if (err != nil) != tt.wantErr {
t.Errorf("buildTracesQuery() error = %v, wantErr %v", err, tt.wantErr)
return
}
if got != tt.want {
t.Errorf("buildTracesQuery() = %v, want %v", got, tt.want)
}
})
}
}
func Test_orderByAttributeKeyTags(t *testing.T) {
type args struct {
panelType v3.PanelType
items []v3.OrderBy
tags []v3.AttributeKey
}
tests := []struct {
name string
args args
want string
}{
{
name: "test",
args: args{
panelType: v3.PanelTypeTrace,
items: []v3.OrderBy{{ColumnName: "name", Order: "ASC"}},
tags: []v3.AttributeKey{{Key: "name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}},
},
want: "`name` ASC",
},
{
name: "test",
args: args{
panelType: v3.PanelTypeList,
items: []v3.OrderBy{{ColumnName: "name", Order: "DESC"}},
tags: []v3.AttributeKey{{Key: "name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}},
},
want: "`name` DESC",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := orderByAttributeKeyTags(tt.args.panelType, tt.args.items, tt.args.tags); got != tt.want {
t.Errorf("orderByAttributeKeyTags() = %v, want %v", got, tt.want)
}
})
}
}
func TestPrepareTracesQuery(t *testing.T) {
type args struct {
start int64
end int64
panelType v3.PanelType
mq *v3.BuilderQuery
options v3.QBOptions
}
tests := []struct {
name string
args args
want string
wantErr bool
}{
{
name: "test with limit - first",
args: args{
start: 1680066360726210000,
end: 1680066458000000000,
panelType: v3.PanelTypeTable,
mq: &v3.BuilderQuery{
StepInterval: 60,
AggregateOperator: v3.AggregateOperatorCountDistinct,
Filters: &v3.FilterSet{},
AggregateAttribute: v3.AttributeKey{Key: "name", IsColumn: true, DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag},
GroupBy: []v3.AttributeKey{{Key: "serviceName", IsColumn: true, DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}},
Limit: 10,
},
options: v3.QBOptions{
GraphLimitQtype: constants.FirstQueryGraphLimit,
},
},
want: "SELECT `serviceName` from (SELECT `serviceName` as `serviceName` toFloat64(count(distinct(`name`))) as value from signoz_traces.distributed_signoz_index_v3 " +
"where (timestamp >= '1680066360726180000' AND timestamp <= '1680066457999980000') AND (ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066457) group by `serviceName`) LIMIT 10",
},
{
name: "test with limit - second",
args: args{
start: 1680066360726210000,
end: 1680066458000000000,
panelType: v3.PanelTypeTable,
mq: &v3.BuilderQuery{
StepInterval: 60,
AggregateOperator: v3.AggregateOperatorCountDistinct,
Filters: &v3.FilterSet{},
AggregateAttribute: v3.AttributeKey{Key: "name", IsColumn: true, DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag},
GroupBy: []v3.AttributeKey{{Key: "serviceName", IsColumn: true, DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}},
Limit: 10,
},
options: v3.QBOptions{
GraphLimitQtype: constants.SecondQueryGraphLimit,
},
},
want: "SELECT `serviceName` as `serviceName` toFloat64(count(distinct(`name`))) as value from signoz_traces.distributed_signoz_index_v3 where " +
"(timestamp >= '1680066360726180000' AND timestamp <= '1680066457999980000') AND (ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066457) AND (`serviceName`) GLOBAL IN (%s) group by `serviceName`",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := PrepareTracesQuery(tt.args.start, tt.args.end, tt.args.panelType, tt.args.mq, tt.args.options)
if (err != nil) != tt.wantErr {
t.Errorf("PrepareTracesQuery() error = %v, wantErr %v", err, tt.wantErr)
return
}
if got != tt.want {
t.Errorf("PrepareTracesQuery() = %v, want %v", got, tt.want)
}
})
}
}

View File

@@ -0,0 +1,216 @@
package v4
import (
"strconv"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/utils"
"go.uber.org/zap"
)
var TracesListViewDefaultSelectedColumns = []v3.AttributeKey{
{
Key: "serviceName",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
IsColumn: true,
},
{
Key: "name",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
IsColumn: true,
},
{
Key: "durationNano",
DataType: v3.AttributeKeyDataTypeArrayFloat64,
Type: v3.AttributeKeyTypeTag,
IsColumn: true,
},
{
Key: "httpMethod",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
IsColumn: true,
},
{
Key: "responseStatusCode",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
IsColumn: true,
},
}
// check if traceId filter is used in traces query and return the list of traceIds
func TraceIdFilterUsedWithEqual(params *v3.QueryRangeParamsV3) (bool, []string) {
compositeQuery := params.CompositeQuery
if compositeQuery == nil {
return false, []string{}
}
var traceIds []string
var traceIdFilterUsed bool
// Build queries for each builder query
for queryName, query := range compositeQuery.BuilderQueries {
if query.Expression != queryName && query.DataSource != v3.DataSourceTraces {
continue
}
// check filter attribute
if query.Filters != nil && len(query.Filters.Items) != 0 {
for _, item := range query.Filters.Items {
if item.Key.Key == "traceID" && (item.Operator == v3.FilterOperatorIn ||
item.Operator == v3.FilterOperatorEqual) {
traceIdFilterUsed = true
// validate value
var err error
val := item.Value
val, err = utils.ValidateAndCastValue(val, item.Key.DataType)
if err != nil {
zap.L().Error("invalid value for key", zap.String("key", item.Key.Key), zap.Error(err))
return false, []string{}
}
if val != nil {
fmtVal := extractFormattedStringValues(val)
traceIds = append(traceIds, fmtVal...)
}
}
}
}
}
zap.L().Debug("traceIds", zap.Any("traceIds", traceIds))
return traceIdFilterUsed, traceIds
}
func extractFormattedStringValues(v interface{}) []string {
// if it's pointer convert it to a value
v = getPointerValue(v)
switch x := v.(type) {
case string:
return []string{x}
case []interface{}:
if len(x) == 0 {
return []string{}
}
switch x[0].(type) {
case string:
values := []string{}
for _, val := range x {
values = append(values, val.(string))
}
return values
default:
return []string{}
}
default:
return []string{}
}
}
func getPointerValue(v interface{}) interface{} {
switch x := v.(type) {
case *uint8:
return *x
case *uint16:
return *x
case *uint32:
return *x
case *uint64:
return *x
case *int:
return *x
case *int8:
return *x
case *int16:
return *x
case *int32:
return *x
case *int64:
return *x
case *float32:
return *x
case *float64:
return *x
case *string:
return *x
case *bool:
return *x
case []interface{}:
values := []interface{}{}
for _, val := range x {
values = append(values, getPointerValue(val))
}
return values
default:
return v
}
}
func AddTimestampFilters(minTime int64, maxTime int64, params *v3.QueryRangeParamsV3) {
if minTime == 0 && maxTime == 0 {
return
}
compositeQuery := params.CompositeQuery
if compositeQuery == nil {
return
}
// Build queries for each builder query and apply timestamp filter only if TraceID is present
for queryName, query := range compositeQuery.BuilderQueries {
if query.Expression != queryName && query.DataSource != v3.DataSourceTraces {
continue
}
addTimeStampFilter := false
// check filter attribute
if query.Filters != nil && len(query.Filters.Items) != 0 {
for _, item := range query.Filters.Items {
if item.Key.Key == "traceID" && (item.Operator == v3.FilterOperatorIn ||
item.Operator == v3.FilterOperatorEqual) {
addTimeStampFilter = true
}
}
}
// add timestamp filter to query only if traceID filter along with equal/similar operator is used
if addTimeStampFilter {
timeFilters := []v3.FilterItem{
{
Key: v3.AttributeKey{
Key: "timestamp",
Type: v3.AttributeKeyTypeTag,
DataType: v3.AttributeKeyDataTypeString,
IsColumn: true,
},
Value: strconv.FormatUint(uint64(minTime), 10),
Operator: v3.FilterOperatorGreaterThanOrEq,
},
{
Key: v3.AttributeKey{
Key: "timestamp",
Type: v3.AttributeKeyTypeTag,
DataType: v3.AttributeKeyDataTypeString,
IsColumn: true,
},
Value: strconv.FormatUint(uint64(maxTime), 10),
Operator: v3.FilterOperatorLessThanOrEq,
},
}
// add new timestamp filter to query
if query.Filters == nil {
query.Filters = &v3.FilterSet{
Items: timeFilters,
}
} else {
query.Filters.Items = append(query.Filters.Items, timeFilters...)
}
}
}
}

View File

@@ -234,7 +234,9 @@ const (
SIGNOZ_EXP_HISTOGRAM_TABLENAME = "distributed_exp_hist"
SIGNOZ_TRACE_DBNAME = "signoz_traces"
SIGNOZ_SPAN_INDEX_TABLENAME = "distributed_signoz_index_v2"
SIGNOZ_SPAN_INDEX_V3 = "distributed_signoz_index_v3"
SIGNOZ_SPAN_INDEX_LOCAL_TABLENAME = "signoz_index_v2"
SIGNOZ_SPAN_INDEX_V3_LOCAL_TABLENAME = "signoz_index_v3"
SIGNOZ_TIMESERIES_v4_LOCAL_TABLENAME = "time_series_v4"
SIGNOZ_TIMESERIES_v4_6HRS_LOCAL_TABLENAME = "time_series_v4_6hrs"
SIGNOZ_TIMESERIES_v4_1DAY_LOCAL_TABLENAME = "time_series_v4_1day"

View File

@@ -39,6 +39,7 @@ func main() {
var disableRules bool
var useLogsNewSchema bool
var useTraceNewSchema bool
// the url used to build link in the alert messages in slack and other systems
var ruleRepoURL, cacheConfigPath, fluxInterval string
var cluster string
@@ -50,6 +51,7 @@ func main() {
var dialTimeout time.Duration
flag.BoolVar(&useLogsNewSchema, "use-logs-new-schema", false, "use logs_v2 schema for logs")
flag.BoolVar(&useTraceNewSchema, "use-trace-new-schema", false, "use new schema for traces")
flag.StringVar(&promConfigPath, "config", "./config/prometheus.yml", "(prometheus config to read metrics)")
flag.StringVar(&skipTopLvlOpsPath, "skip-top-level-ops", "", "(config file to skip top level operations)")
flag.BoolVar(&disableRules, "rules.disable", false, "(disable rule evaluation)")
@@ -87,6 +89,7 @@ func main() {
FluxInterval: fluxInterval,
Cluster: cluster,
UseLogsNewSchema: useLogsNewSchema,
UseTraceNewSchema: useTraceNewSchema,
}
// Read the jwt secret key

View File

@@ -269,6 +269,23 @@ type SearchSpanResponseItem struct {
SpanKind string `json:"spanKind"`
}
type SearchSpanResponseItemV2 struct {
TimeUnixNano time.Time `json:"timestamp" ch:"timestamp"`
DurationNano uint64 `json:"durationNano" ch:"durationNano"`
SpanID string `json:"spanId" ch:"spanID"`
TraceID string `json:"traceId" ch:"traceID"`
HasError bool `json:"hasError" ch:"hasError"`
Kind int8 `json:"kind" ch:"kind"`
ServiceName string `json:"serviceName" ch:"serviceName"`
Name string `json:"name" ch:"name"`
References string `json:"references,omitempty" ch:"references"`
TagMap map[string]string `json:"tagMap" ch:"attributes_string"`
Events []string `json:"event" ch:"events"`
StatusMessage string `json:"statusMessage" ch:"statusMessage"`
StatusCodeString string `json:"statusCodeString" ch:"statusCodeString"`
SpanKind string `json:"spanKind" ch:"spanKind"`
}
type OtelSpanRef struct {
TraceId string `json:"traceId,omitempty"`
SpanId string `json:"spanId,omitempty"`

View File

@@ -38,7 +38,8 @@ type PrepareTaskOptions struct {
ManagerOpts *ManagerOptions
NotifyFunc NotifyFunc
UseLogsNewSchema bool
UseLogsNewSchema bool
UseTraceNewSchema bool
}
const taskNamesuffix = "webAppEditor"
@@ -81,7 +82,8 @@ type ManagerOptions struct {
PrepareTaskFunc func(opts PrepareTaskOptions) (Task, error)
UseLogsNewSchema bool
UseLogsNewSchema bool
UseTraceNewSchema bool
}
// The Manager manages recording and alerting rules.
@@ -104,7 +106,8 @@ type Manager struct {
cache cache.Cache
prepareTaskFunc func(opts PrepareTaskOptions) (Task, error)
UseLogsNewSchema bool
UseLogsNewSchema bool
UseTraceNewSchema bool
}
func defaultOptions(o *ManagerOptions) *ManagerOptions {
@@ -140,6 +143,7 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
opts.FF,
opts.Reader,
opts.UseLogsNewSchema,
opts.UseTraceNewSchema,
WithEvalDelay(opts.ManagerOpts.EvalDelay),
)
@@ -351,7 +355,8 @@ func (m *Manager) editTask(rule *PostableRule, taskName string) error {
ManagerOpts: m.opts,
NotifyFunc: m.prepareNotifyFunc(),
UseLogsNewSchema: m.opts.UseLogsNewSchema,
UseLogsNewSchema: m.opts.UseLogsNewSchema,
UseTraceNewSchema: m.opts.UseTraceNewSchema,
})
if err != nil {
@@ -473,7 +478,8 @@ func (m *Manager) addTask(rule *PostableRule, taskName string) error {
ManagerOpts: m.opts,
NotifyFunc: m.prepareNotifyFunc(),
UseLogsNewSchema: m.opts.UseLogsNewSchema,
UseLogsNewSchema: m.opts.UseLogsNewSchema,
UseTraceNewSchema: m.opts.UseTraceNewSchema,
})
if err != nil {
@@ -817,6 +823,7 @@ func (m *Manager) TestNotification(ctx context.Context, ruleStr string) (int, *m
m.featureFlags,
m.reader,
m.opts.UseLogsNewSchema,
m.opts.UseTraceNewSchema,
WithSendAlways(),
WithSendUnmatched(),
)

View File

@@ -58,6 +58,7 @@ func NewThresholdRule(
featureFlags interfaces.FeatureLookup,
reader interfaces.Reader,
useLogsNewSchema bool,
useTraceNewSchema bool,
opts ...RuleOption,
) (*ThresholdRule, error) {
@@ -74,19 +75,21 @@ func NewThresholdRule(
}
querierOption := querier.QuerierOptions{
Reader: reader,
Cache: nil,
KeyGenerator: queryBuilder.NewKeyGenerator(),
FeatureLookup: featureFlags,
UseLogsNewSchema: useLogsNewSchema,
Reader: reader,
Cache: nil,
KeyGenerator: queryBuilder.NewKeyGenerator(),
FeatureLookup: featureFlags,
UseLogsNewSchema: useLogsNewSchema,
UseTraceNewSchema: useTraceNewSchema,
}
querierOptsV2 := querierV2.QuerierOptions{
Reader: reader,
Cache: nil,
KeyGenerator: queryBuilder.NewKeyGenerator(),
FeatureLookup: featureFlags,
UseLogsNewSchema: useLogsNewSchema,
Reader: reader,
Cache: nil,
KeyGenerator: queryBuilder.NewKeyGenerator(),
FeatureLookup: featureFlags,
UseLogsNewSchema: useLogsNewSchema,
UseTraceNewSchema: useTraceNewSchema,
}
t.querier = querier.NewQuerier(querierOption)

View File

@@ -791,7 +791,7 @@ func TestThresholdRuleShouldAlert(t *testing.T) {
postableRule.RuleCondition.MatchType = MatchType(c.matchType)
postableRule.RuleCondition.Target = &c.target
rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, WithEvalDelay(2*time.Minute))
rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, true, WithEvalDelay(2*time.Minute))
if err != nil {
assert.NoError(t, err)
}
@@ -880,7 +880,7 @@ func TestPrepareLinksToLogs(t *testing.T) {
}
fm := featureManager.StartManager()
rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, WithEvalDelay(2*time.Minute))
rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, true, WithEvalDelay(2*time.Minute))
if err != nil {
assert.NoError(t, err)
}
@@ -922,7 +922,7 @@ func TestPrepareLinksToTraces(t *testing.T) {
}
fm := featureManager.StartManager()
rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, WithEvalDelay(2*time.Minute))
rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, true, WithEvalDelay(2*time.Minute))
if err != nil {
assert.NoError(t, err)
}
@@ -998,7 +998,7 @@ func TestThresholdRuleLabelNormalization(t *testing.T) {
postableRule.RuleCondition.MatchType = MatchType(c.matchType)
postableRule.RuleCondition.Target = &c.target
rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, WithEvalDelay(2*time.Minute))
rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, true, WithEvalDelay(2*time.Minute))
if err != nil {
assert.NoError(t, err)
}
@@ -1051,7 +1051,7 @@ func TestThresholdRuleEvalDelay(t *testing.T) {
fm := featureManager.StartManager()
for idx, c := range cases {
rule, err := NewThresholdRule("69", &postableRule, fm, nil, true) // no eval delay
rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, true) // no eval delay
if err != nil {
assert.NoError(t, err)
}
@@ -1100,7 +1100,7 @@ func TestThresholdRuleClickHouseTmpl(t *testing.T) {
fm := featureManager.StartManager()
for idx, c := range cases {
rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, WithEvalDelay(2*time.Minute))
rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, true, WithEvalDelay(2*time.Minute))
if err != nil {
assert.NoError(t, err)
}
@@ -1241,9 +1241,9 @@ func TestThresholdRuleUnitCombinations(t *testing.T) {
}
options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace")
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true)
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true)
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true)
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
"signoz_calls_total": {
v3.Delta: true,
@@ -1340,9 +1340,9 @@ func TestThresholdRuleNoData(t *testing.T) {
}
options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace")
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true)
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true)
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true)
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
"signoz_calls_total": {
v3.Delta: true,
@@ -1445,9 +1445,9 @@ func TestThresholdRuleTracesLink(t *testing.T) {
}
options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace")
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true)
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true)
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true)
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
"signoz_calls_total": {
v3.Delta: true,
@@ -1570,9 +1570,9 @@ func TestThresholdRuleLogsLink(t *testing.T) {
}
options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace")
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true)
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true)
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true)
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
"signoz_calls_total": {
v3.Delta: true,

View File

@@ -46,6 +46,7 @@ func NewMockClickhouseReader(
featureFlags,
"",
true,
true,
)
return reader, mockDB