Compare commits

...

37 Commits

Author SHA1 Message Date
Srikanth Chekuri
43e1fe6863 chore: anomaly fixes 2024-11-16 19:51:38 +05:30
nityanandagohain
a7fa0bb4e4 fix: update pagination logic 2024-11-15 16:33:10 +05:30
nityanandagohain
252e0b698e fix: update pagination logic 2024-11-14 17:40:56 +05:30
nityanandagohain
f64285b89d feat: minor fixes 2024-11-14 16:22:21 +05:30
nityanandagohain
93849ea850 Merge remote-tracking branch 'origin/develop' into feat/trace-v3-poc 2024-11-14 10:15:19 +05:30
nityanandagohain
471bd684c8 fix: add tests 2024-11-13 20:06:34 +05:30
nityanandagohain
16d538e1ba fix: update qb 2024-11-13 18:00:15 +05:30
nityanandagohain
549485bbe9 fix: update pagination logic and few ch column names 2024-11-10 17:18:49 +05:30
nityanandagohain
b843661097 fix: trigger builde 2024-11-08 14:13:31 +05:30
nityanandagohain
80eda3c805 fix: add subquery 2024-11-08 14:02:44 +05:30
nityanandagohain
bb6f027b21 fix: group by with filters 2024-11-08 13:13:18 +05:30
nityanandagohain
0418bfff0e fix: tests 2024-11-08 13:01:01 +05:30
nityanandagohain
aee3ca4fb1 fix: test file added 2024-11-06 09:53:47 +05:30
nityanandagohain
09ff359610 Merge remote-tracking branch 'origin/feat/trace-v3-poc' into feat/trace-v3-poc 2024-11-05 21:45:45 +05:30
nityanandagohain
c5c648748e Merge remote-tracking branch 'origin/develop' into feat/trace-v3-poc 2024-11-05 21:45:02 +05:30
nityanandagohain
f410355088 fix: enrichment using alias 2024-11-05 21:44:39 +05:30
nityanandagohain
4bd531ce08 Merge remote-tracking branch 'origin/develop' into feat/trace-v3-poc 2024-11-04 15:31:22 +05:30
Srikanth Chekuri
895856fa04 Merge branch 'develop' into feat/trace-v3-poc 2024-11-02 11:57:11 +05:30
nityanandagohain
753eb0847e fix: issues in group by 2024-11-01 17:31:23 +05:30
nityanandagohain
25020edfb6 fix: attribute enrichment updated and issue in group by 2024-11-01 15:04:48 +05:30
nityanandagohain
6335d5eb22 Merge remote-tracking branch 'origin/develop' into feat/trace-v3-poc 2024-11-01 12:22:08 +05:30
nityanandagohain
e5d425f06e fix: searchTraces 2024-10-25 18:06:33 +05:30
nityanandagohain
aeeb77bbc1 feat: support for faster trace detail 2024-10-25 16:13:44 +05:30
nityanandagohain
fa6fda0497 Merge remote-tracking branch 'origin/feat/trace-v3-poc' into feat/trace-v3-poc 2024-10-25 13:10:59 +05:30
nityanandagohain
bb41435a20 fix: add support for window based pagination 2024-10-25 13:06:54 +05:30
Srikanth Chekuri
dd23e4ebf7 Merge branch 'develop' into feat/trace-v3-poc 2024-10-24 12:45:03 +05:30
nityanandagohain
16a7717598 fix: minor fixes to use the new table in api's and querier 2024-10-24 12:29:29 +05:30
nityanandagohain
4749ec18bc fix: services page 2024-10-23 23:59:20 +05:30
nityanandagohain
1487820750 fix: use correct prepQUery 2024-10-23 18:06:25 +05:30
nityanandagohain
fd09f57f76 fix: tests 2024-10-23 17:44:18 +05:30
nityanandagohain
9bc7c8708a fix: add servicename resource filter 2024-10-23 10:49:35 +05:30
nityanandagohain
2115093876 fix: get trace by id api updated 2024-10-22 22:50:57 +05:30
nityanandagohain
33f4d8306d Merge remote-tracking branch 'origin/develop' into feat/trace-v3-poc 2024-10-22 17:14:03 +05:30
nityanandagohain
dbf5f8b77a fix: integrate with querier 2024-10-22 14:06:55 +05:30
nityanandagohain
bfc46790bb Merge remote-tracking branch 'origin/develop' into feat/trace-v3-poc 2024-10-22 10:21:03 +05:30
nityanandagohain
7a011f3460 fix: add remaining files 2024-10-22 10:20:10 +05:30
nityanandagohain
2c30e1493f feat: trace v4 inital commit 2024-10-21 18:20:48 +05:30
34 changed files with 1690 additions and 274 deletions

View File

@@ -38,9 +38,10 @@ type APIHandlerOptions struct {
 	Cache             cache.Cache
 	Gateway           *httputil.ReverseProxy
 	// Querier Influx Interval
 	FluxInterval      time.Duration
 	UseLogsNewSchema  bool
+	UseTraceNewSchema bool
 	UseLicensesV3     bool
 }

 type APIHandler struct {
@@ -66,6 +67,7 @@ func NewAPIHandler(opts APIHandlerOptions) (*APIHandler, error) {
 		Cache:             opts.Cache,
 		FluxInterval:      opts.FluxInterval,
 		UseLogsNewSchema:  opts.UseLogsNewSchema,
+		UseTraceNewSchema: opts.UseTraceNewSchema,
 		UseLicensesV3:     opts.UseLicensesV3,
 	})

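The diffs above and below all repeat one plumbing pattern: a per-migration boolean travels from the CLI entrypoint into each component's options struct, exactly as UseLogsNewSchema already did. A minimal sketch of that pattern, with illustrative (not actual) struct names:

	package main

	import (
		"flag"
		"fmt"
	)

	// Options mirrors the role of ServerOptions/APIHandlerOptions in this PR:
	// every schema migration gets its own opt-in boolean that defaults to the
	// old read path.
	type Options struct {
		UseLogsNewSchema  bool
		UseTraceNewSchema bool
	}

	func main() {
		opts := Options{}
		flag.BoolVar(&opts.UseTraceNewSchema, "use-trace-new-schema", false, "use new schema for traces")
		flag.Parse()
		// The options value is then handed down unchanged:
		// server -> ClickHouse reader -> querier -> rules manager -> API handler.
		fmt.Printf("%+v\n", opts)
	}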
View File

@@ -2,32 +2,31 @@ package api

 import (
 	"net/http"
-
-	"go.signoz.io/signoz/ee/query-service/app/db"
-	"go.signoz.io/signoz/ee/query-service/model"
-	baseapp "go.signoz.io/signoz/pkg/query-service/app"
-	basemodel "go.signoz.io/signoz/pkg/query-service/model"
-	"go.uber.org/zap"
 )

 func (ah *APIHandler) searchTraces(w http.ResponseWriter, r *http.Request) {
-	if !ah.CheckFeature(basemodel.SmartTraceDetail) {
-		zap.L().Info("SmartTraceDetail feature is not enabled in this plan")
-		ah.APIHandler.SearchTraces(w, r)
-		return
-	}
-	searchTracesParams, err := baseapp.ParseSearchTracesParams(r)
-	if err != nil {
-		RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, "Error reading params")
-		return
-	}
-	result, err := ah.opts.DataConnector.SearchTraces(r.Context(), searchTracesParams, db.SmartTraceAlgorithm)
-	if ah.HandleError(w, err, http.StatusBadRequest) {
-		return
-	}
-	ah.WriteJSON(w, r, result)
+	ah.APIHandler.SearchTraces(w, r)
+	return
+
+	// This is commented since this will be taken care by new trace API
+	// if !ah.CheckFeature(basemodel.SmartTraceDetail) {
+	// 	zap.L().Info("SmartTraceDetail feature is not enabled in this plan")
+	// 	ah.APIHandler.SearchTraces(w, r)
+	// 	return
+	// }
+	// searchTracesParams, err := baseapp.ParseSearchTracesParams(r)
+	// if err != nil {
+	// 	RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, "Error reading params")
+	// 	return
+	// }
+	// result, err := ah.opts.DataConnector.SearchTraces(r.Context(), searchTracesParams, db.SmartTraceAlgorithm)
+	// if ah.HandleError(w, err, http.StatusBadRequest) {
+	// 	return
+	// }
+	// ah.WriteJSON(w, r, result)
 }

View File

@@ -26,8 +26,9 @@ func NewDataConnector(
 	dialTimeout time.Duration,
 	cluster string,
 	useLogsNewSchema bool,
+	useTraceNewSchema bool,
 ) *ClickhouseReader {
-	ch := basechr.NewReader(localDB, promConfigPath, lm, maxIdleConns, maxOpenConns, dialTimeout, cluster, useLogsNewSchema)
+	ch := basechr.NewReader(localDB, promConfigPath, lm, maxIdleConns, maxOpenConns, dialTimeout, cluster, useLogsNewSchema, useTraceNewSchema)
 	return &ClickhouseReader{
 		conn:  ch.GetConn(),
 		appdb: localDB,

View File

@@ -77,6 +77,7 @@ type ServerOptions struct {
 	Cluster           string
 	GatewayUrl        string
 	UseLogsNewSchema  bool
+	UseTraceNewSchema bool
 	UseLicensesV3     bool
 }
@@ -156,6 +157,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
 		serverOptions.DialTimeout,
 		serverOptions.Cluster,
 		serverOptions.UseLogsNewSchema,
+		serverOptions.UseTraceNewSchema,
 	)
 	go qb.Start(readerReady)
 	reader = qb
@@ -189,6 +191,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
 		serverOptions.DisableRules,
 		lm,
 		serverOptions.UseLogsNewSchema,
+		serverOptions.UseTraceNewSchema,
 	)
 	if err != nil {
@@ -270,6 +273,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
 		FluxInterval:      fluxInterval,
 		Gateway:           gatewayProxy,
 		UseLogsNewSchema:  serverOptions.UseLogsNewSchema,
+		UseTraceNewSchema: serverOptions.UseTraceNewSchema,
 		UseLicensesV3:     serverOptions.UseLicensesV3,
 	}
@@ -737,7 +741,8 @@ func makeRulesManager(
 	cache cache.Cache,
 	disableRules bool,
 	fm baseint.FeatureLookup,
-	useLogsNewSchema bool) (*baserules.Manager, error) {
+	useLogsNewSchema bool,
+	useTraceNewSchema bool) (*baserules.Manager, error) {
 	// create engine
 	pqle, err := pqle.FromConfigPath(promConfigPath)
@@ -767,8 +772,9 @@ func makeRulesManager(
 		EvalDelay:           baseconst.GetEvalDelay(),
 		PrepareTaskFunc:     rules.PrepareTaskFunc,
-		PrepareTestRuleFunc: rules.TestNotification,
 		UseLogsNewSchema:    useLogsNewSchema,
+		UseTraceNewSchema:   useTraceNewSchema,
+		PrepareTestRuleFunc: rules.TestNotification,
 	}
 	// create Manager

View File

@@ -94,6 +94,7 @@ func main() {
 	var cluster string
 	var useLogsNewSchema bool
+	var useTraceNewSchema bool
 	var useLicensesV3 bool
 	var cacheConfigPath, fluxInterval string
 	var enableQueryServiceLogOTLPExport bool
@@ -105,6 +106,7 @@ func main() {
 	var gatewayUrl string
 	flag.BoolVar(&useLogsNewSchema, "use-logs-new-schema", false, "use logs_v2 schema for logs")
+	flag.BoolVar(&useTraceNewSchema, "use-trace-new-schema", false, "use new schema for traces")
 	flag.BoolVar(&useLicensesV3, "use-licenses-v3", false, "use licenses_v3 schema for licenses")
 	flag.StringVar(&promConfigPath, "config", "./config/prometheus.yml", "(prometheus config to read metrics)")
 	flag.StringVar(&skipTopLvlOpsPath, "skip-top-level-ops", "", "(config file to skip top level operations)")
@@ -145,6 +147,7 @@ func main() {
 		Cluster:           cluster,
 		GatewayUrl:        gatewayUrl,
 		UseLogsNewSchema:  useLogsNewSchema,
+		UseTraceNewSchema: useTraceNewSchema,
 		UseLicensesV3:     useLicensesV3,
 	}

View File

@@ -61,6 +61,11 @@ func NewAnomalyRule(
 	zap.L().Info("creating new AnomalyRule", zap.String("id", id), zap.Any("opts", opts))

+	if p.RuleCondition.CompareOp == baserules.ValueIsBelow {
+		target := -1 * *p.RuleCondition.Target
+		p.RuleCondition.Target = &target
+	}
+
 	baseRule, err := baserules.NewBaseRule(id, p, reader, opts...)
 	if err != nil {
 		return nil, err

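The ValueIsBelow branch added above works because the check value < target is equivalent to -value > -target, so negating the target lets one above-threshold code path serve both directions (presumably with the compared value negated correspondingly on the evaluation side). A minimal Go sketch of the identity, with a hypothetical helper name:

	package main

	import "fmt"

	// isBreached mimics a single "value above target" comparison path.
	func isBreached(value, target float64) bool { return value > target }

	func main() {
		target := 10.0
		value := 7.0

		// Below-threshold check, expressed directly.
		direct := value < target

		// Same check routed through the above-threshold path by negating
		// both sides, which is what flipping the rule's target sign enables.
		viaNegation := isBreached(-value, -target)

		fmt.Println(direct, viaNegation) // true true
	}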
View File

@@ -26,6 +26,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
 		opts.FF,
 		opts.Reader,
 		opts.UseLogsNewSchema,
+		opts.UseTraceNewSchema,
 		baserules.WithEvalDelay(opts.ManagerOpts.EvalDelay),
 	)
@@ -122,6 +123,7 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, *basemodel.Ap
 		opts.FF,
 		opts.Reader,
 		opts.UseLogsNewSchema,
+		opts.UseTraceNewSchema,
 		baserules.WithSendAlways(),
 		baserules.WithSendUnmatched(),
 	)

View File

@@ -102,9 +102,9 @@ function RuleOptions({
 					<Select.Option value="4">{t('option_notequal')}</Select.Option>
 				</>
 			)}
+			{/* the value 5 and 6 are reserved for above or equal and below or equal */}
 			{ruleType === 'anomaly_rule' && (
-				<Select.Option value="5">{t('option_above_below')}</Select.Option>
+				<Select.Option value="7">{t('option_above_below')}</Select.Option>
 			)}
 		</InlineSelect>
 	);

View File

@@ -22,6 +22,7 @@ const (
 	defaultTraceDB             string = "signoz_traces"
 	defaultOperationsTable     string = "distributed_signoz_operations"
 	defaultIndexTable          string = "distributed_signoz_index_v2"
+	defaultLocalIndexTable     string = "signoz_index_v2"
 	defaultErrorTable          string = "distributed_signoz_error_index_v2"
 	defaultDurationTable       string = "distributed_durationSort"
 	defaultUsageExplorerTable  string = "distributed_usage_explorer"
@@ -45,6 +46,11 @@ const (
 	defaultLogsTableV2              string = "distributed_logs_v2"
 	defaultLogsResourceLocalTableV2 string = "logs_v2_resource"
 	defaultLogsResourceTableV2      string = "distributed_logs_v2_resource"
+	defaultTraceIndexTableV3        string = "distributed_signoz_index_v3"
+	defaultTraceLocalTableName      string = "signoz_index_v3"
+	defaultTraceResourceTableV3     string = "distributed_traces_v3_resource"
+	defaultTraceSummaryTable        string = "distributed_trace_summary"
 )

 // NamespaceConfig is Clickhouse's internal configuration data
@@ -58,6 +64,7 @@ type namespaceConfig struct {
 	TraceDB            string
 	OperationsTable    string
 	IndexTable         string
+	LocalIndexTable    string
 	DurationTable      string
 	UsageExplorerTable string
 	SpansTable         string
@@ -82,6 +89,11 @@ type namespaceConfig struct {
 	LogsTableV2              string
 	LogsResourceLocalTableV2 string
 	LogsResourceTableV2      string
+	TraceIndexTableV3        string
+	TraceLocalTableNameV3    string
+	TraceResourceTableV3     string
+	TraceSummaryTable        string
 }

 // Connecto defines how to connect to the database
@@ -150,6 +162,7 @@ func NewOptions(
 		TraceDB:            defaultTraceDB,
 		OperationsTable:    defaultOperationsTable,
 		IndexTable:         defaultIndexTable,
+		LocalIndexTable:    defaultLocalIndexTable,
 		ErrorTable:         defaultErrorTable,
 		DurationTable:      defaultDurationTable,
 		UsageExplorerTable: defaultUsageExplorerTable,
@@ -174,6 +187,11 @@ func NewOptions(
 		LogsLocalTableV2:         defaultLogsLocalTableV2,
 		LogsResourceTableV2:      defaultLogsResourceTableV2,
 		LogsResourceLocalTableV2: defaultLogsResourceLocalTableV2,
+		TraceIndexTableV3:        defaultTraceIndexTableV3,
+		TraceLocalTableNameV3:    defaultTraceLocalTableName,
+		TraceResourceTableV3:     defaultTraceResourceTableV3,
+		TraceSummaryTable:        defaultTraceSummaryTable,
 	},
 	others: make(map[string]*namespaceConfig, len(otherNamespaces)),
 }

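In ClickHouse deployments like this one, a "distributed_X" table is conventionally a Distributed-engine table that fans reads out to the node-local table "X" on every shard, which is why the constants above come in pairs; which name a query uses decides whether it sees the whole cluster or one node. A small illustrative pairing of the trace index tables, with the pairing (not the names) being the assumption:

	package main

	import "fmt"

	var tracePairs = map[string]string{
		"distributed_signoz_index_v3": "signoz_index_v3", // new spans index
		"distributed_signoz_index_v2": "signoz_index_v2", // old spans index
	}

	func main() {
		for distributed, local := range tracePairs {
			fmt.Printf("%s -> %s\n", distributed, local)
		}
	}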
View File

@@ -145,9 +145,16 @@ type ClickHouseReader struct {
 	liveTailRefreshSeconds int
 	cluster                string

 	useLogsNewSchema  bool
+	useTraceNewSchema bool

 	logsTableName      string
 	logsLocalTableName string
+
+	traceTableName       string
+	traceLocalTableName  string
+	traceResourceTableV3 string
+	traceSummaryTable    string
 }

 // NewTraceReader returns a TraceReader for the database
@@ -160,6 +167,7 @@ func NewReader(
 	dialTimeout time.Duration,
 	cluster string,
 	useLogsNewSchema bool,
+	useTraceNewSchema bool,
 ) *ClickHouseReader {
 	datasource := os.Getenv("ClickHouseUrl")
@@ -170,7 +178,7 @@ func NewReader(
 		zap.L().Fatal("failed to initialize ClickHouse", zap.Error(err))
 	}

-	return NewReaderFromClickhouseConnection(db, options, localDB, configFile, featureFlag, cluster, useLogsNewSchema)
+	return NewReaderFromClickhouseConnection(db, options, localDB, configFile, featureFlag, cluster, useLogsNewSchema, useTraceNewSchema)
 }

 func NewReaderFromClickhouseConnection(
@@ -181,6 +189,7 @@ func NewReaderFromClickhouseConnection(
 	featureFlag interfaces.FeatureLookup,
 	cluster string,
 	useLogsNewSchema bool,
+	useTraceNewSchema bool,
 ) *ClickHouseReader {
 	alertManager, err := am.New()
 	if err != nil {
@@ -218,6 +227,13 @@ func NewReaderFromClickhouseConnection(
 		logsLocalTableName = options.primary.LogsLocalTableV2
 	}

+	traceTableName := options.primary.IndexTable
+	traceLocalTableName := options.primary.LocalIndexTable
+	if useTraceNewSchema {
+		traceTableName = options.primary.TraceIndexTableV3
+		traceLocalTableName = options.primary.TraceLocalTableNameV3
+	}
+
 	return &ClickHouseReader{
 		db:      wrap,
 		localDB: localDB,
@@ -245,7 +261,8 @@ func NewReaderFromClickhouseConnection(
 		cluster:              cluster,
 		queryProgressTracker: queryprogress.NewQueryProgressTracker(),

-		useLogsNewSchema: useLogsNewSchema,
+		useLogsNewSchema:  useLogsNewSchema,
+		useTraceNewSchema: useTraceNewSchema,

 		logsTableV2:      options.primary.LogsTableV2,
 		logsLocalTableV2: options.primary.LogsLocalTableV2,
@@ -253,6 +270,11 @@ func NewReaderFromClickhouseConnection(
 		logsResourceLocalTableV2: options.primary.LogsResourceLocalTableV2,
 		logsTableName:            logsTableName,
 		logsLocalTableName:       logsLocalTableName,
+		traceLocalTableName:      traceLocalTableName,
+		traceTableName:           traceTableName,
+		traceResourceTableV3:     options.primary.TraceResourceTableV3,
+		traceSummaryTable:        options.primary.TraceSummaryTable,
 	}
 }
@@ -463,9 +485,8 @@ func (r *ClickHouseReader) GetQueryRangeResult(ctx context.Context, query *model
 }

 func (r *ClickHouseReader) GetServicesList(ctx context.Context) (*[]string, error) {
 	services := []string{}
-	query := fmt.Sprintf(`SELECT DISTINCT serviceName FROM %s.%s WHERE toDate(timestamp) > now() - INTERVAL 1 DAY`, r.TraceDB, r.indexTable)
+	query := fmt.Sprintf(`SELECT DISTINCT serviceName FROM %s.%s WHERE ts_bucket_start > (toUnixTimestamp(now()) - 1800) AND toDate(timestamp) > now() - INTERVAL 1 DAY`, r.TraceDB, r.traceLocalTableName)

 	rows, err := r.db.Query(ctx, query)
@@ -574,14 +595,14 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G
 			count(*) as numCalls
 		FROM %s.%s
 		WHERE serviceName = @serviceName AND name In @names AND timestamp>= @start AND timestamp<= @end`,
-		r.TraceDB, r.indexTable,
+		r.TraceDB, r.traceTableName,
 	)
 	errorQuery := fmt.Sprintf(
 		`SELECT
 			count(*) as numErrors
 		FROM %s.%s
 		WHERE serviceName = @serviceName AND name In @names AND timestamp>= @start AND timestamp<= @end AND statusCode=2`,
-		r.TraceDB, r.indexTable,
+		r.TraceDB, r.traceTableName,
 	)

 	args := []interface{}{}
@@ -591,6 +612,17 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G
 		clickhouse.Named("serviceName", svc),
 		clickhouse.Named("names", ops),
 	)
+	if r.useTraceNewSchema {
+		bFilter := " AND ts_bucket_start >= @start_bucket AND ts_bucket_start <= @end_bucket"
+		query += bFilter
+		errorQuery += bFilter
+		args = append(args,
+			clickhouse.Named("start_bucket", strconv.FormatInt(queryParams.Start.Unix()-1800, 10)),
+			clickhouse.Named("end_bucket", strconv.FormatInt(queryParams.End.Unix(), 10)),
+		)
+	}
+
 	// create TagQuery from TagQueryParams
 	tags := createTagQueryFromTagQueryParams(queryParams.Tags)
 	subQuery, argsSubQuery, errStatus := buildQueryWithTagParams(ctx, tags)
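A note on the ts_bucket_start filter added above: the new-schema tables are bucketed by span start time, and the lower bound is widened by 1800 seconds, which implies 30-minute buckets, so that spans whose bucket opened just before the query window still get scanned. A minimal sketch of that pattern, assuming the clickhouse-go v2 driver used elsewhere in this file:

	package main

	import (
		"fmt"
		"strconv"

		"github.com/ClickHouse/clickhouse-go/v2"
	)

	// withBucketBounds appends bucket bounds alongside the timestamp bounds so
	// ClickHouse can prune parts. The lower bound is widened by one bucket
	// width, matching the "- 1800" in the diff.
	func withBucketBounds(query string, args []interface{}, startUnix, endUnix int64) (string, []interface{}) {
		const bucketWidth = 1800 // seconds; presumably 30-minute buckets
		query += " AND ts_bucket_start >= @start_bucket AND ts_bucket_start <= @end_bucket"
		args = append(args,
			clickhouse.Named("start_bucket", strconv.FormatInt(startUnix-bucketWidth, 10)),
			clickhouse.Named("end_bucket", strconv.FormatInt(endUnix, 10)),
		)
		return query, args
	}

	func main() {
		q, args := withBucketBounds("SELECT count() FROM signoz_traces.distributed_signoz_index_v3 WHERE serviceName = @serviceName", nil, 1731648600, 1731652200)
		fmt.Println(q, len(args))
	}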
@@ -673,7 +705,7 @@ func (r *ClickHouseReader) GetServiceOverview(ctx context.Context, queryParams *
 			count(*) as numCalls
 		FROM %s.%s
 		WHERE serviceName = @serviceName AND name In @names AND timestamp>= @start AND timestamp<= @end`,
-		r.TraceDB, r.indexTable,
+		r.TraceDB, r.traceTableName,
 	)
 	args := []interface{}{}
 	args = append(args, namedArgs...)
@@ -704,7 +736,7 @@ func (r *ClickHouseReader) GetServiceOverview(ctx context.Context, queryParams *
 			count(*) as numErrors
 		FROM %s.%s
 		WHERE serviceName = @serviceName AND name In @names AND timestamp>= @start AND timestamp<= @end AND statusCode=2`,
-		r.TraceDB, r.indexTable,
+		r.TraceDB, r.traceTableName,
 	)
 	args = []interface{}{}
 	args = append(args, namedArgs...)
@@ -841,7 +873,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode
 		case constants.TraceID:
 			continue
 		case constants.ServiceName:
-			finalQuery := fmt.Sprintf("SELECT serviceName, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable)
+			finalQuery := fmt.Sprintf("SELECT serviceName, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.traceTableName)
 			finalQuery += query
 			finalQuery += " GROUP BY serviceName"
 			var dBResponse []model.DBResponseServiceName
@@ -858,7 +890,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode
 				}
 			}
 		case constants.HttpRoute:
-			finalQuery := fmt.Sprintf("SELECT httpRoute, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable)
+			finalQuery := fmt.Sprintf("SELECT httpRoute, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.traceTableName)
 			finalQuery += query
 			finalQuery += " GROUP BY httpRoute"
 			var dBResponse []model.DBResponseHttpRoute
@@ -875,7 +907,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode
 				}
 			}
 		case constants.HttpUrl:
-			finalQuery := fmt.Sprintf("SELECT httpUrl, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable)
+			finalQuery := fmt.Sprintf("SELECT httpUrl, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.traceTableName)
 			finalQuery += query
 			finalQuery += " GROUP BY httpUrl"
 			var dBResponse []model.DBResponseHttpUrl
@@ -892,7 +924,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode
 				}
 			}
 		case constants.HttpMethod:
-			finalQuery := fmt.Sprintf("SELECT httpMethod, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable)
+			finalQuery := fmt.Sprintf("SELECT httpMethod, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.traceTableName)
 			finalQuery += query
 			finalQuery += " GROUP BY httpMethod"
 			var dBResponse []model.DBResponseHttpMethod
@@ -909,7 +941,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode
 				}
 			}
 		case constants.HttpHost:
-			finalQuery := fmt.Sprintf("SELECT httpHost, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable)
+			finalQuery := fmt.Sprintf("SELECT httpHost, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.traceTableName)
 			finalQuery += query
 			finalQuery += " GROUP BY httpHost"
 			var dBResponse []model.DBResponseHttpHost
@@ -926,7 +958,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode
 				}
 			}
 		case constants.OperationRequest:
-			finalQuery := fmt.Sprintf("SELECT name, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable)
+			finalQuery := fmt.Sprintf("SELECT name, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.traceTableName)
 			finalQuery += query
 			finalQuery += " GROUP BY name"
 			var dBResponse []model.DBResponseOperation
@@ -943,7 +975,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode
 				}
 			}
 		case constants.Status:
-			finalQuery := fmt.Sprintf("SELECT COUNT(*) as numTotal FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU AND hasError = true", r.TraceDB, r.indexTable)
+			finalQuery := fmt.Sprintf("SELECT COUNT(*) as numTotal FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU AND hasError = true", r.TraceDB, r.traceTableName)
 			finalQuery += query
 			var dBResponse []model.DBResponseTotal
 			err := r.db.Select(ctx, &dBResponse, finalQuery, args...)
@@ -954,7 +986,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode
 				return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
 			}
-			finalQuery2 := fmt.Sprintf("SELECT COUNT(*) as numTotal FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU AND hasError = false", r.TraceDB, r.indexTable)
+			finalQuery2 := fmt.Sprintf("SELECT COUNT(*) as numTotal FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU AND hasError = false", r.TraceDB, r.traceTableName)
 			finalQuery2 += query
 			var dBResponse2 []model.DBResponseTotal
 			err = r.db.Select(ctx, &dBResponse2, finalQuery2, args...)
@@ -979,7 +1011,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode
 			finalQuery := ""
 			if !durationSortEnabled {
 				// if duration sort is not enabled, we need to get the min and max duration from the index table
-				finalQuery = fmt.Sprintf("SELECT min(durationNano) as min, max(durationNano) as max FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable)
+				finalQuery = fmt.Sprintf("SELECT min(durationNano) as min, max(durationNano) as max FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.traceTableName)
 				finalQuery += query
 				var dBResponse []model.DBResponseMinMax
 				err = r.db.Select(ctx, &dBResponse, finalQuery, args...)
@@ -1024,7 +1056,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode
 				}
 			}
 		case constants.RPCMethod:
-			finalQuery := fmt.Sprintf("SELECT rpcMethod, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable)
+			finalQuery := fmt.Sprintf("SELECT rpcMethod, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.traceTableName)
 			finalQuery += query
 			finalQuery += " GROUP BY rpcMethod"
 			var dBResponse []model.DBResponseRPCMethod
@@ -1042,7 +1074,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode
 			}

 		case constants.ResponseStatusCode:
-			finalQuery := fmt.Sprintf("SELECT responseStatusCode, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable)
+			finalQuery := fmt.Sprintf("SELECT responseStatusCode, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.traceTableName)
 			finalQuery += query
 			finalQuery += " GROUP BY responseStatusCode"
 			var dBResponse []model.DBResponseStatusCodeMethod
@@ -1090,7 +1122,7 @@ func getStatusFilters(query string, statusParams []string, excludeMap map[string

 func (r *ClickHouseReader) GetFilteredSpans(ctx context.Context, queryParams *model.GetFilteredSpansParams) (*model.GetFilterSpansResponse, *model.ApiError) {

-	queryTable := fmt.Sprintf("%s.%s", r.TraceDB, r.indexTable)
+	queryTable := fmt.Sprintf("%s.%s", r.TraceDB, r.traceTableName)

 	excludeMap := make(map[string]struct{})
 	for _, e := range queryParams.Exclude {
@@ -1436,8 +1468,8 @@ func (r *ClickHouseReader) GetTagFilters(ctx context.Context, queryParams *model
 	tagFilters := []model.TagFilters{}

-	// Alternative finalQuery := fmt.Sprintf(`SELECT DISTINCT arrayJoin(tagMap.keys) as tagKeys FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU`, r.TraceDB, r.indexTable)
-	finalQuery := fmt.Sprintf(`SELECT groupUniqArrayArray(mapKeys(stringTagMap)) as stringTagKeys, groupUniqArrayArray(mapKeys(numberTagMap)) as numberTagKeys, groupUniqArrayArray(mapKeys(boolTagMap)) as boolTagKeys FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU`, r.TraceDB, r.indexTable)
+	// Alternative finalQuery := fmt.Sprintf(`SELECT DISTINCT arrayJoin(tagMap.keys) as tagKeys FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU`, r.TraceDB, indexTable)
+	finalQuery := fmt.Sprintf(`SELECT groupUniqArrayArray(mapKeys(stringTagMap)) as stringTagKeys, groupUniqArrayArray(mapKeys(numberTagMap)) as numberTagKeys, groupUniqArrayArray(mapKeys(boolTagMap)) as boolTagKeys FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU`, r.TraceDB, r.traceTableName)
 	finalQuery += query

 	err := r.db.Select(ctx, &tagFilters, finalQuery, args...)
@@ -1548,7 +1580,7 @@ func (r *ClickHouseReader) GetTagValues(ctx context.Context, queryParams *model.
 	tagValues := []model.TagValues{}

-	finalQuery := fmt.Sprintf(`SELECT groupArray(DISTINCT stringTagMap[@key]) as stringTagValues FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU`, r.TraceDB, r.indexTable)
+	finalQuery := fmt.Sprintf(`SELECT groupArray(DISTINCT attributes_string[@key]) as stringTagValues FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU`, r.TraceDB, r.traceTableName)
 	finalQuery += query
 	finalQuery += " LIMIT @limit"
@@ -1599,7 +1631,7 @@ func (r *ClickHouseReader) GetTopOperations(ctx context.Context, queryParams *mo
 			name
 		FROM %s.%s
 		WHERE serviceName = @serviceName AND timestamp>= @start AND timestamp<= @end`,
-		r.TraceDB, r.indexTable,
+		r.TraceDB, r.traceTableName,
 	)
 	args := []interface{}{}
 	args = append(args, namedArgs...)
@@ -1666,10 +1698,137 @@ func (r *ClickHouseReader) GetUsage(ctx context.Context, queryParams *model.GetU
 	return &usageItems, nil
 }

+func (r *ClickHouseReader) SearchTracesV2(ctx context.Context, params *model.SearchTracesParams) (*[]model.SearchSpansResult, error) {
+	searchSpansResult := []model.SearchSpansResult{
+		{
+			Columns:   []string{"__time", "SpanId", "TraceId", "ServiceName", "Name", "Kind", "DurationNano", "TagsKeys", "TagsValues", "References", "Events", "HasError", "StatusMessage", "StatusCodeString", "SpanKind"},
+			IsSubTree: false,
+			Events:    make([][]interface{}, 0),
+		},
+	}
+
+	var traceSummary model.TraceSummary
+	summaryQuery := fmt.Sprintf("SELECT * from %s.%s WHERE traceID=$1", r.TraceDB, r.traceSummaryTable)
+	err := r.db.QueryRow(ctx, summaryQuery, params.TraceID).Scan(&traceSummary.TraceID, &traceSummary.Start, &traceSummary.End, &traceSummary.NumSpans)
+	if err != nil {
+		if err == sql.ErrNoRows {
+			return &searchSpansResult, nil
+		}
+		zap.L().Error("Error in processing sql query", zap.Error(err))
+		return nil, fmt.Errorf("error in processing sql query")
+	}
+
+	if traceSummary.NumSpans > uint64(params.MaxSpansInTrace) {
+		zap.L().Error("Max spans allowed in a trace limit reached", zap.Int("MaxSpansInTrace", params.MaxSpansInTrace),
+			zap.Uint64("Count", traceSummary.NumSpans))
+		userEmail, err := auth.GetEmailFromJwt(ctx)
+		if err == nil {
+			data := map[string]interface{}{
+				"traceSize":            traceSummary.NumSpans,
+				"maxSpansInTraceLimit": params.MaxSpansInTrace,
+			}
+			telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_MAX_SPANS_ALLOWED_LIMIT_REACHED, data, userEmail, true, false)
+		}
+		return nil, fmt.Errorf("max spans allowed in trace limit reached, please contact support for more details")
+	}
+
+	userEmail, err := auth.GetEmailFromJwt(ctx)
+	if err == nil {
+		data := map[string]interface{}{
+			"traceSize": traceSummary.NumSpans,
+		}
+		telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_TRACE_DETAIL_API, data, userEmail, true, false)
+	}
+
+	var startTime, endTime, durationNano uint64
+	var searchScanResponses []model.SearchSpanResponseItemV2
+
+	query := fmt.Sprintf("SELECT timestamp, durationNano, spanID, traceID, hasError, kind, serviceName, name, references, attributes_string, events, statusMessage, statusCodeString, spanKind FROM %s.%s WHERE traceID=$1 and ts_bucket_start>=$2 and ts_bucket_start<=$3", r.TraceDB, r.traceTableName)
+
+	start := time.Now()
+	err = r.db.Select(ctx, &searchScanResponses, query, params.TraceID, strconv.FormatInt(traceSummary.Start.Unix()-1800, 10), strconv.FormatInt(traceSummary.End.Unix(), 10))
+	zap.L().Info(query)
+	if err != nil {
+		zap.L().Error("Error in processing sql query", zap.Error(err))
+		return nil, fmt.Errorf("error in processing sql query")
+	}
+	end := time.Now()
+	zap.L().Debug("getTraceSQLQuery took: ", zap.Duration("duration", end.Sub(start)))
+
+	searchSpansResult[0].Events = make([][]interface{}, len(searchScanResponses))
+
+	searchSpanResponses := []model.SearchSpanResponseItem{}
+	start = time.Now()
+	for _, item := range searchScanResponses {
+		ref := []model.OtelSpanRef{}
+		err := json.Unmarshal([]byte(item.References), &ref)
+		if err != nil {
+			zap.L().Error("Error unmarshalling references", zap.Error(err))
+			return nil, err
+		}
+
+		// merge attributes_number and attributes_bool to attributes_string
+		for k, v := range item.Attributes_bool {
+			item.Attributes_string[k] = fmt.Sprintf("%v", v)
+		}
+		for k, v := range item.Attributes_number {
+			item.Attributes_string[k] = fmt.Sprintf("%v", v)
+		}
+
+		jsonItem := model.SearchSpanResponseItem{
+			SpanID:           item.SpanID,
+			TraceID:          item.TraceID,
+			ServiceName:      item.ServiceName,
+			Name:             item.Name,
+			Kind:             int32(item.Kind),
+			DurationNano:     int64(item.DurationNano),
+			HasError:         item.HasError,
+			StatusMessage:    item.StatusMessage,
+			StatusCodeString: item.StatusCodeString,
+			SpanKind:         item.SpanKind,
+			References:       ref,
+			Events:           item.Events,
+			TagMap:           item.Attributes_string,
+		}
+		jsonItem.TimeUnixNano = uint64(item.TimeUnixNano.UnixNano() / 1000000)
+
+		searchSpanResponses = append(searchSpanResponses, jsonItem)
+		if startTime == 0 || jsonItem.TimeUnixNano < startTime {
+			startTime = jsonItem.TimeUnixNano
+		}
+		if endTime == 0 || jsonItem.TimeUnixNano > endTime {
+			endTime = jsonItem.TimeUnixNano
+		}
+		if durationNano == 0 || uint64(jsonItem.DurationNano) > durationNano {
+			durationNano = uint64(jsonItem.DurationNano)
+		}
+	}
+	end = time.Now()
+	zap.L().Debug("getTraceSQLQuery unmarshal took: ", zap.Duration("duration", end.Sub(start)))
+
+	for i, item := range searchSpanResponses {
+		spanEvents := item.GetValues()
+		searchSpansResult[0].Events[i] = spanEvents
+	}
+
+	searchSpansResult[0].StartTimestampMillis = startTime - (durationNano / 1000000)
+	searchSpansResult[0].EndTimestampMillis = endTime + (durationNano / 1000000)
+
+	return &searchSpansResult, nil
+}
+
 func (r *ClickHouseReader) SearchTraces(ctx context.Context, params *model.SearchTracesParams,
 	smartTraceAlgorithm func(payload []model.SearchSpanResponseItem, targetSpanId string,
 		levelUp int, levelDown int, spanLimit int) ([]model.SearchSpansResult, error)) (*[]model.SearchSpansResult, error) {

+	if r.useTraceNewSchema {
+		return r.SearchTracesV2(ctx, params)
+	}
+
 	var countSpans uint64
 	countQuery := fmt.Sprintf("SELECT count() as count from %s.%s WHERE traceID=$1", r.TraceDB, r.SpansTable)
 	err := r.db.QueryRow(ctx, countQuery, params.TraceID).Scan(&countSpans)
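SearchTracesV2 above avoids counting spans by scanning the index table: a one-row lookup in the trace summary table yields the trace's start, end, and span count, which both enforces the span limit up front and bounds the follow-up query to the relevant buckets. The summary row's shape is not part of this diff; a sketch inferred from the Scan call and the Start.Unix()/End.Unix() usage, so the field names and types are assumptions:

	package model

	import "time"

	// TraceSummary is one row per trace in distributed_trace_summary
	// (shape inferred, not confirmed by the diff).
	type TraceSummary struct {
		TraceID  string
		Start    time.Time // timestamp of the earliest span in the trace
		End      time.Time // timestamp of the latest span in the trace
		NumSpans uint64
	}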
@@ -1746,6 +1905,7 @@ func (r *ClickHouseReader) SearchTraces(ctx context.Context, params *model.Searc
 	err = r.featureFlags.CheckFeature(model.SmartTraceDetail)
 	smartAlgoEnabled := err == nil
+	// TODO(nitya): this will never run remove it
 	if len(searchScanResponses) > params.SpansRenderLimit && smartAlgoEnabled {
 		start = time.Now()
 		searchSpansResult, err = smartTraceAlgorithm(searchSpanResponses, params.SpanID, params.LevelUp, params.LevelDown, params.SpansRenderLimit)
@@ -1824,7 +1984,6 @@ func (r *ClickHouseReader) GetDependencyGraph(ctx context.Context, queryParams *
 }

 func (r *ClickHouseReader) GetFilteredSpansAggregates(ctx context.Context, queryParams *model.GetFilteredSpanAggregatesParams) (*model.GetFilteredSpansAggregatesResponse, *model.ApiError) {
-
 	excludeMap := make(map[string]struct{})
 	for _, e := range queryParams.Exclude {
 		if e == constants.OperationRequest {
@@ -1870,7 +2029,7 @@ func (r *ClickHouseReader) GetFilteredSpansAggregates(ctx context.Context, query
 	// Using %s for groupBy params as it can be a custom column and custom columns are not supported by clickhouse-go yet:
 	// issue link: https://github.com/ClickHouse/clickhouse-go/issues/870
 	if queryParams.GroupBy != "" && columnExists {
-		query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, %s as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, queryParams.GroupBy, aggregation_query, r.TraceDB, r.indexTable)
+		query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, %s as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, queryParams.GroupBy, aggregation_query, r.TraceDB, r.traceTableName)
 		args = append(args, clickhouse.Named("groupByVar", queryParams.GroupBy))
 	} else if queryParams.GroupBy != "" {
 		customStr = strings.Split(queryParams.GroupBy, ".(")
@@ -1878,17 +2037,17 @@ func (r *ClickHouseReader) GetFilteredSpansAggregates(ctx context.Context, query
 			return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("GroupBy: %s not supported", queryParams.GroupBy)}
 		}
 		if customStr[1] == string(model.TagTypeString)+")" {
-			query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, stringTagMap['%s'] as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, customStr[0], aggregation_query, r.TraceDB, r.indexTable)
+			query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, stringTagMap['%s'] as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, customStr[0], aggregation_query, r.TraceDB, r.traceTableName)
 		} else if customStr[1] == string(model.TagTypeNumber)+")" {
-			query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, toString(numberTagMap['%s']) as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, customStr[0], aggregation_query, r.TraceDB, r.indexTable)
+			query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, toString(numberTagMap['%s']) as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, customStr[0], aggregation_query, r.TraceDB, r.traceTableName)
 		} else if customStr[1] == string(model.TagTypeBool)+")" {
-			query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, toString(boolTagMap['%s']) as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, customStr[0], aggregation_query, r.TraceDB, r.indexTable)
+			query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, toString(boolTagMap['%s']) as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, customStr[0], aggregation_query, r.TraceDB, r.traceTableName)
 		} else {
 			// return error for unsupported group by
 			return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("GroupBy: %s not supported", queryParams.GroupBy)}
 		}
 	} else {
-		query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable)
+		query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.traceTableName)
 	}

 	if len(queryParams.TraceID) > 0 {
@@ -3056,11 +3215,10 @@ func (r *ClickHouseReader) GetLogsInfoInLastHeartBeatInterval(ctx context.Contex
 }

 func (r *ClickHouseReader) GetTagsInfoInLastHeartBeatInterval(ctx context.Context, interval time.Duration) (*model.TagsInfo, error) {
-
 	queryStr := fmt.Sprintf(`select serviceName, stringTagMap['deployment.environment'] as env,
 	stringTagMap['telemetry.sdk.language'] as language from %s.%s
 	where timestamp > toUnixTimestamp(now()-toIntervalMinute(%d))
-	group by serviceName, env, language;`, r.TraceDB, r.indexTable, int(interval.Minutes()))
+	group by serviceName, env, language;`, r.TraceDB, r.traceTableName, int(interval.Minutes()))

 	tagTelemetryDataList := []model.TagTelemetryData{}
 	err := r.db.Select(ctx, &tagTelemetryDataList, queryStr)
@@ -4575,8 +4733,6 @@ func (r *ClickHouseReader) GetTraceAggregateAttributes(ctx context.Context, req
 		if err := rows.Scan(&tagKey, &tagType, &dataType, &isColumn); err != nil {
 			return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
 		}
-		// TODO: Remove this once the column name are updated in the table
-		tagKey = tempHandleFixedColumns(tagKey)
 		key := v3.AttributeKey{
 			Key:      tagKey,
 			DataType: v3.AttributeKeyDataType(dataType),
@@ -4616,8 +4772,6 @@ func (r *ClickHouseReader) GetTraceAttributeKeys(ctx context.Context, req *v3.Fi
 		if err := rows.Scan(&tagKey, &tagType, &dataType, &isColumn); err != nil {
 			return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
 		}
-		// TODO: Remove this once the column name are updated in the table
-		tagKey = tempHandleFixedColumns(tagKey)
 		key := v3.AttributeKey{
 			Key:      tagKey,
 			DataType: v3.AttributeKeyDataType(dataType),
@@ -4629,19 +4783,6 @@ func (r *ClickHouseReader) GetTraceAttributeKeys(ctx context.Context, req *v3.Fi
 	return &response, nil
 }

-// tempHandleFixedColumns is a temporary function to handle the fixed columns whose name has been changed in AttributeKeys Table
-func tempHandleFixedColumns(tagKey string) string {
-	switch {
-	case tagKey == "traceId":
-		tagKey = "traceID"
-	case tagKey == "spanId":
-		tagKey = "spanID"
-	case tagKey == "parentSpanId":
-		tagKey = "parentSpanID"
-	}
-	return tagKey
-}
-
 func (r *ClickHouseReader) GetTraceAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) {
 	var query string
@@ -4702,31 +4843,38 @@ func (r *ClickHouseReader) GetSpanAttributeKeys(ctx context.Context) (map[string
 	var rows driver.Rows
 	response := map[string]v3.AttributeKey{}

-	query = fmt.Sprintf("SELECT DISTINCT(tagKey), tagType, dataType, isColumn FROM %s.%s", r.TraceDB, r.spanAttributesKeysTable)
+	query = fmt.Sprintf("SELECT DISTINCT(tagKey), tagType, dataType FROM %s.%s", r.TraceDB, r.spanAttributesKeysTable)

 	rows, err = r.db.Query(ctx, query)
 	if err != nil {
 		zap.L().Error("Error while executing query", zap.Error(err))
 		return nil, fmt.Errorf("error while executing query: %s", err.Error())
 	}
 	defer rows.Close()

+	statements := []model.ShowCreateTableStatement{}
+	query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.TraceDB, r.traceTableName)
+	err = r.db.Select(ctx, &statements, query)
+	if err != nil {
+		return nil, fmt.Errorf("error while fetching logs schema: %s", err.Error())
+	}
+
 	var tagKey string
 	var dataType string
 	var tagType string
-	var isColumn bool
 	for rows.Next() {
-		if err := rows.Scan(&tagKey, &tagType, &dataType, &isColumn); err != nil {
+		if err := rows.Scan(&tagKey, &tagType, &dataType); err != nil {
 			return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
 		}
 		key := v3.AttributeKey{
 			Key:      tagKey,
 			DataType: v3.AttributeKeyDataType(dataType),
 			Type:     v3.AttributeKeyType(tagType),
-			IsColumn: isColumn,
+			IsColumn: isColumn(r.useLogsNewSchema, statements[0].Statement, tagType, tagKey, dataType),
 		}
-		response[tagKey] = key
+
+		name := tagKey + "##" + tagType + "##" + strings.ToLower(dataType)
+		response[name] = key
 	}
 	return response, nil
 }

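The composite map key introduced above exists because two span attributes can share a name while differing in type or data type; keying the response by name alone would let one variant overwrite the other. A minimal sketch of the collision the "key##type##datatype" scheme avoids, with hypothetical attribute values:

	package main

	import (
		"fmt"
		"strings"
	)

	// compositeKey mirrors the name construction in the diff above.
	func compositeKey(name, tagType, dataType string) string {
		return name + "##" + tagType + "##" + strings.ToLower(dataType)
	}

	func main() {
		response := map[string]string{}
		// Same attribute name, two data types: both survive under composite keys,
		// whereas response["http.status_code"] = ... would keep only one.
		response[compositeKey("http.status_code", "tag", "String")] = "string variant"
		response[compositeKey("http.status_code", "tag", "Float64")] = "number variant"
		fmt.Println(len(response)) // 2
	}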
View File

@@ -39,6 +39,7 @@ import (
 	querierV2 "go.signoz.io/signoz/pkg/query-service/app/querier/v2"
 	"go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
 	tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
+	tracesV4 "go.signoz.io/signoz/pkg/query-service/app/traces/v4"
 	"go.signoz.io/signoz/pkg/query-service/auth"
 	"go.signoz.io/signoz/pkg/query-service/cache"
 	"go.signoz.io/signoz/pkg/query-service/common"
@@ -110,8 +111,9 @@ type APIHandler struct {
 	// Websocket connection upgrader
 	Upgrader *websocket.Upgrader

 	UseLogsNewSchema  bool
+	UseTraceNewSchema bool
 	UseLicensesV3     bool

 	hostsRepo     *inframetrics.HostsRepo
 	processesRepo *inframetrics.ProcessesRepo
@@ -163,6 +165,7 @@ type APIHandlerOpts struct {
 	// Use Logs New schema
 	UseLogsNewSchema bool
+	UseTraceNewSchema bool
 	// Use Licenses V3 structure
 	UseLicensesV3 bool
 }
@@ -176,21 +179,23 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
 	}

 	querierOpts := querier.QuerierOptions{
 		Reader:            opts.Reader,
 		Cache:             opts.Cache,
 		KeyGenerator:      queryBuilder.NewKeyGenerator(),
 		FluxInterval:      opts.FluxInterval,
 		FeatureLookup:     opts.FeatureFlags,
 		UseLogsNewSchema:  opts.UseLogsNewSchema,
+		UseTraceNewSchema: opts.UseTraceNewSchema,
 	}

 	querierOptsV2 := querierV2.QuerierOptions{
 		Reader:            opts.Reader,
 		Cache:             opts.Cache,
 		KeyGenerator:      queryBuilder.NewKeyGenerator(),
 		FluxInterval:      opts.FluxInterval,
 		FeatureLookup:     opts.FeatureFlags,
 		UseLogsNewSchema:  opts.UseLogsNewSchema,
+		UseTraceNewSchema: opts.UseTraceNewSchema,
 	}

 	querier := querier.NewQuerier(querierOpts)
@@ -224,6 +229,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
 		querier:           querier,
 		querierV2:         querierv2,
 		UseLogsNewSchema:  opts.UseLogsNewSchema,
+		UseTraceNewSchema: opts.UseTraceNewSchema,
 		UseLicensesV3:     opts.UseLicensesV3,
 		hostsRepo:         hostsRepo,
 		processesRepo:     processesRepo,
@@ -242,9 +248,14 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
 		logsQueryBuilder = logsv4.PrepareLogsQuery
 	}

+	tracesQueryBuilder := tracesV3.PrepareTracesQuery
+	if opts.UseTraceNewSchema {
+		tracesQueryBuilder = tracesV4.PrepareTracesQuery
+	}
+
 	builderOpts := queryBuilder.QueryBuilderOptions{
 		BuildMetricQuery: metricsv3.PrepareMetricQuery,
-		BuildTraceQuery:  tracesV3.PrepareTracesQuery,
+		BuildTraceQuery:  tracesQueryBuilder,
 		BuildLogQuery:    logsQueryBuilder,
 	}
 	aH.queryBuilder = queryBuilder.NewQueryBuilder(builderOpts, aH.featureFlags)
@@ -4433,7 +4444,12 @@ func (aH *APIHandler) queryRangeV3(ctx context.Context, queryRangeParams *v3.Que
 			RespondError(w, apiErrObj, errQuriesByName)
 			return
 		}
-		tracesV3.Enrich(queryRangeParams, spanKeys)
+		if aH.UseTraceNewSchema {
+			tracesV4.Enrich(queryRangeParams, spanKeys)
+		} else {
+			tracesV3.Enrich(queryRangeParams, spanKeys)
+		}
 	}

 	// WARN: Only works for AND operator in traces query

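The tracesQueryBuilder swap above works because the v3 and v4 packages export PrepareTracesQuery functions with the same signature, so the flag can select a function value once at construction time and every later call site stays branch-free. A minimal sketch of the pattern; the signature below is illustrative, not the real PrepareTracesQuery signature:

	package app

	// prepareTracesQueryFunc stands in for the shared builder signature.
	type prepareTracesQueryFunc func(start, end int64) (string, error)

	// chooseTraceQueryBuilder picks an implementation once, the way the
	// handler and querier constructors do in the diffs above and below.
	func chooseTraceQueryBuilder(useTraceNewSchema bool, v3Builder, v4Builder prepareTracesQueryFunc) prepareTracesQueryFunc {
		builder := v3Builder
		if useTraceNewSchema {
			builder = v4Builder
		}
		return builder
	}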
View File

@@ -142,7 +142,7 @@ func enrichFieldWithMetadata(field v3.AttributeKey, fields map[string]v3.Attribu
 	}

 	// check if the field is present in the fields map
-	for _, key := range utils.GenerateLogEnrichmentKeys(field) {
+	for _, key := range utils.GenerateEnrichmentKeys(field) {
 		if val, ok := fields[key]; ok {
 			return val
 		}
View File

@@ -10,6 +10,7 @@ import (
logsV4 "go.signoz.io/signoz/pkg/query-service/app/logs/v4" logsV4 "go.signoz.io/signoz/pkg/query-service/app/logs/v4"
metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3" metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3"
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3" tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
tracesV4 "go.signoz.io/signoz/pkg/query-service/app/traces/v4"
"go.signoz.io/signoz/pkg/query-service/common" "go.signoz.io/signoz/pkg/query-service/common"
"go.signoz.io/signoz/pkg/query-service/constants" "go.signoz.io/signoz/pkg/query-service/constants"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3" v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
@@ -158,11 +159,16 @@ func (q *querier) runBuilderQuery(
 	if builderQuery.DataSource == v3.DataSourceTraces {
+		tracesQueryBuilder := tracesV3.PrepareTracesQuery
+		if q.UseTraceNewSchema {
+			tracesQueryBuilder = tracesV4.PrepareTracesQuery
+		}
 		var query string
 		var err error
 		// for ts query with group by and limit form two queries
 		if params.CompositeQuery.PanelType == v3.PanelTypeGraph && builderQuery.Limit > 0 && len(builderQuery.GroupBy) > 0 {
-			limitQuery, err := tracesV3.PrepareTracesQuery(
+			limitQuery, err := tracesQueryBuilder(
 				start,
 				end,
 				params.CompositeQuery.PanelType,
@@ -173,7 +179,7 @@ func (q *querier) runBuilderQuery(
 				ch <- channelResult{Err: err, Name: queryName, Query: limitQuery, Series: nil}
 				return
 			}
-			placeholderQuery, err := tracesV3.PrepareTracesQuery(
+			placeholderQuery, err := tracesQueryBuilder(
 				start,
 				end,
 				params.CompositeQuery.PanelType,
@@ -186,7 +192,7 @@ func (q *querier) runBuilderQuery(
 			}
 			query = fmt.Sprintf(placeholderQuery, limitQuery)
 		} else {
-			query, err = tracesV3.PrepareTracesQuery(
+			query, err = tracesQueryBuilder(
 				start,
 				end,
 				params.CompositeQuery.PanelType,

View File

@@ -11,6 +11,7 @@ import (
metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3" metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3"
"go.signoz.io/signoz/pkg/query-service/app/queryBuilder" "go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3" tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
tracesV4 "go.signoz.io/signoz/pkg/query-service/app/traces/v4"
"go.signoz.io/signoz/pkg/query-service/common" "go.signoz.io/signoz/pkg/query-service/common"
chErrors "go.signoz.io/signoz/pkg/query-service/errors" chErrors "go.signoz.io/signoz/pkg/query-service/errors"
"go.signoz.io/signoz/pkg/query-service/querycache" "go.signoz.io/signoz/pkg/query-service/querycache"
@@ -52,7 +53,8 @@ type querier struct {
 	returnedSeries []*v3.Series
 	returnedErr    error
 	UseLogsNewSchema  bool
+	UseTraceNewSchema bool
 }

 type QuerierOptions struct {
@@ -63,10 +65,11 @@ type QuerierOptions struct {
 	FeatureLookup interfaces.FeatureLookup
 	// used for testing
 	TestingMode    bool
 	ReturnedSeries []*v3.Series
 	ReturnedErr    error
 	UseLogsNewSchema  bool
+	UseTraceNewSchema bool
 }

 func NewQuerier(opts QuerierOptions) interfaces.Querier {
@@ -74,6 +77,10 @@ func NewQuerier(opts QuerierOptions) interfaces.Querier {
 	if opts.UseLogsNewSchema {
 		logsQueryBuilder = logsV4.PrepareLogsQuery
 	}
+	tracesQueryBuilder := tracesV3.PrepareTracesQuery
+	if opts.UseTraceNewSchema {
+		tracesQueryBuilder = tracesV4.PrepareTracesQuery
+	}
 	qc := querycache.NewQueryCache(querycache.WithCache(opts.Cache), querycache.WithFluxInterval(opts.FluxInterval))
@@ -85,16 +92,17 @@ func NewQuerier(opts QuerierOptions) interfaces.Querier {
 		fluxInterval: opts.FluxInterval,
 		builder: queryBuilder.NewQueryBuilder(queryBuilder.QueryBuilderOptions{
-			BuildTraceQuery:  tracesV3.PrepareTracesQuery,
+			BuildTraceQuery:  tracesQueryBuilder,
 			BuildLogQuery:    logsQueryBuilder,
 			BuildMetricQuery: metricsV3.PrepareMetricQuery,
 		}, opts.FeatureLookup),
 		featureLookUp:  opts.FeatureLookup,
 		testingMode:    opts.TestingMode,
 		returnedSeries: opts.ReturnedSeries,
 		returnedErr:    opts.ReturnedErr,
 		UseLogsNewSchema:  opts.UseLogsNewSchema,
+		UseTraceNewSchema: opts.UseTraceNewSchema,
 	}
 }
@@ -308,56 +316,115 @@ func (q *querier) runClickHouseQueries(ctx context.Context, params *v3.QueryRang
 	return results, errQueriesByName, err
 }

-func (q *querier) runLogsListQuery(ctx context.Context, params *v3.QueryRangeParamsV3, tsRanges []utils.LogsListTsRange) ([]*v3.Result, map[string]error, error) {
+func (q *querier) runWindowBasedListQuery(ctx context.Context, params *v3.QueryRangeParamsV3, tsRanges []utils.LogsListTsRange) ([]*v3.Result, map[string]error, error) {
 	res := make([]*v3.Result, 0)
 	qName := ""
 	pageSize := uint64(0)
+	limit := uint64(0)
+	offset := uint64(0)
 	// here we are considering only one query
 	for name, v := range params.CompositeQuery.BuilderQueries {
 		qName = name
 		pageSize = v.PageSize
+		// for traces specifically
+		limit = v.Limit
+		offset = v.Offset
 	}
 	data := []*v3.Row{}
+	tracesLimit := limit + offset
 	for _, v := range tsRanges {
 		params.Start = v.Start
 		params.End = v.End
-		params.CompositeQuery.BuilderQueries[qName].PageSize = pageSize - uint64(len(data))
-		queries, err := q.builder.PrepareQueries(params)
-		if err != nil {
-			return nil, nil, err
-		}
-		// this will run only once
-		for name, query := range queries {
-			rowList, err := q.reader.GetListResultV3(ctx, query)
-			if err != nil {
-				errs := []error{err}
-				errQuriesByName := map[string]error{
-					name: err,
-				}
-				return nil, errQuriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...))
-			}
-			data = append(data, rowList...)
-		}
-		// append a filter to the params
-		if len(data) > 0 {
-			params.CompositeQuery.BuilderQueries[qName].Filters.Items = append(params.CompositeQuery.BuilderQueries[qName].Filters.Items, v3.FilterItem{
-				Key: v3.AttributeKey{
-					Key:      "id",
-					IsColumn: true,
-					DataType: "string",
-				},
-				Operator: v3.FilterOperatorLessThan,
-				Value:    data[len(data)-1].Data["id"],
-			})
-		}
-		if uint64(len(data)) >= pageSize {
-			break
-		}
+		length := uint64(0)
+		// this will run only once
+		// appending the filter to get the next set of data
+		if params.CompositeQuery.BuilderQueries[qName].DataSource == v3.DataSourceLogs {
+			params.CompositeQuery.BuilderQueries[qName].PageSize = pageSize - uint64(len(data))
+			queries, err := q.builder.PrepareQueries(params)
+			if err != nil {
+				return nil, nil, err
+			}
+			for name, query := range queries {
+				rowList, err := q.reader.GetListResultV3(ctx, query)
+				if err != nil {
+					errs := []error{err}
+					errQuriesByName := map[string]error{
+						name: err,
+					}
+					return nil, errQuriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...))
+				}
+				length += uint64(len(rowList))
+				data = append(data, rowList...)
+			}
+			if length > 0 {
+				params.CompositeQuery.BuilderQueries[qName].Filters.Items = append(params.CompositeQuery.BuilderQueries[qName].Filters.Items, v3.FilterItem{
+					Key: v3.AttributeKey{
+						Key:      "id",
+						IsColumn: true,
+						DataType: "string",
+					},
+					Operator: v3.FilterOperatorLessThan,
+					Value:    data[len(data)-1].Data["id"],
+				})
+			}
+			if uint64(len(data)) >= pageSize {
+				break
+			}
+		} else {
+			// TRACE
+			// we are updating the offset and limit based on the number of traces we have found in the current timerange
+			// e.g.
+			// 1) offset = 0, limit = 100, tsRanges = [t1, t10], [t10, t20], [t20, t30]
+			//
+			//    if 100 traces are there in [t1, t10] then 100 will be returned immediately.
+			//    if 10 traces are there in [t1, t10] then we get 10, set offset to 0 and limit to 90, and search in the next timerange [t10, t20]
+			//    if we don't find any trace in [t1, t10], then we search in [t10, t20] with offset=0, limit=100
+			//
+			// 2) offset = 50, limit = 100, tsRanges = [t1, t10], [t10, t20], [t20, t30]
+			//
+			//    if we find 150 traces with limit=150 and offset=0 in [t1, t10] then we return 100 traces immediately
+			//    if we find 50 in [t1, t10] with limit=150 and offset=0 then we set limit = 100 and offset = 0 and search in the next timerange [t10, t20]
+			//    if we don't find any trace in [t1, t10], then we search in [t10, t20] with limit=150 and offset=0
+			params.CompositeQuery.BuilderQueries[qName].Offset = 0
+			params.CompositeQuery.BuilderQueries[qName].Limit = tracesLimit
+			queries, err := q.builder.PrepareQueries(params)
+			if err != nil {
+				return nil, nil, err
+			}
+			for name, query := range queries {
+				rowList, err := q.reader.GetListResultV3(ctx, query)
+				if err != nil {
+					errs := []error{err}
+					errQuriesByName := map[string]error{
+						name: err,
+					}
+					return nil, errQuriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...))
+				}
+				length += uint64(len(rowList))
+				// skip the traces unless offset is 0
+				for _, row := range rowList {
+					if offset == 0 {
+						data = append(data, row)
+					} else {
+						offset--
+					}
+				}
+			}
+			tracesLimit = tracesLimit - length
+			if uint64(len(data)) >= limit {
+				break
+			}
+		}
 	}
 	res = append(res, &v3.Result{
@@ -368,15 +435,25 @@ func (q *querier) runLogsListQuery(ctx context.Context, params *v3.QueryRangePar
 }

 func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Result, map[string]error, error) {
 	// List query has support for only one query
-	if q.UseLogsNewSchema && params.CompositeQuery != nil && len(params.CompositeQuery.BuilderQueries) == 1 {
+	// we are skipping for PanelTypeTrace as it has a custom order by regardless of what's in the payload
+	if params.CompositeQuery != nil &&
+		len(params.CompositeQuery.BuilderQueries) == 1 &&
+		params.CompositeQuery.PanelType != v3.PanelTypeTrace {
 		for _, v := range params.CompositeQuery.BuilderQueries {
+			if (v.DataSource == v3.DataSourceLogs && !q.UseLogsNewSchema) ||
+				(v.DataSource == v3.DataSourceTraces && !q.UseTraceNewSchema) {
+				break
+			}
 			// only allow logs queries with timestamp ordering desc
-			if v.DataSource == v3.DataSourceLogs && len(v.OrderBy) == 1 && v.OrderBy[0].ColumnName == "timestamp" && v.OrderBy[0].Order == "desc" {
-				startEndArr := utils.GetLogsListTsRanges(params.Start, params.End)
-				if len(startEndArr) > 0 {
-					return q.runLogsListQuery(ctx, params, startEndArr)
-				}
+			// TODO(nitya): allow for timestamp asc
+			if (v.DataSource == v3.DataSourceLogs || v.DataSource == v3.DataSourceTraces) &&
+				len(v.OrderBy) == 1 &&
+				v.OrderBy[0].ColumnName == "timestamp" &&
+				v.OrderBy[0].Order == "desc" {
+				startEndArr := utils.GetListTsRanges(params.Start, params.End)
+				return q.runWindowBasedListQuery(ctx, params, startEndArr)
 			}
 		}
 	}

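Note: the TRACE branch of runWindowBasedListQuery above never pushes OFFSET into the windowed SQL; every window is queried with offset 0 and LIMIT limit+offset, rows are discarded client-side until the caller's offset is consumed, and the remaining budget shrinks with each window. A minimal standalone sketch of that bookkeeping; the function name and sample windows are illustrative, not part of this commit:

package main

import "fmt"

// windowedTracePagination mirrors the trace branch's arithmetic: tracesLimit
// starts at limit+offset, each window is truncated to the current tracesLimit
// (as "LIMIT tracesLimit" would do server-side), rows are skipped until the
// offset is used up, and tracesLimit shrinks by the rows the window returned.
func windowedTracePagination(windows [][]int, limit, offset uint64) []int {
	data := []int{}
	tracesLimit := limit + offset
	for _, rows := range windows {
		if uint64(len(rows)) > tracesLimit {
			rows = rows[:tracesLimit]
		}
		length := uint64(len(rows))
		for _, row := range rows {
			if offset == 0 {
				data = append(data, row)
			} else {
				offset--
			}
		}
		tracesLimit -= length
		if uint64(len(data)) >= limit {
			break
		}
	}
	return data
}

func main() {
	// limit=4, offset=3 across three windows, mirroring the "query with
	// offset" test case: the first window is fully consumed by the offset
	windows := [][]int{{1, 2, 3}, {4, 5, 6}, {7}}
	fmt.Println(windowedTracePagination(windows, 4, 3)) // [4 5 6 7]
}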
View File

@@ -5,15 +5,21 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"math" "math"
"regexp"
"strings" "strings"
"testing" "testing"
"time" "time"
cmock "github.com/srikanthccv/ClickHouse-go-mock"
"github.com/stretchr/testify/require"
"go.signoz.io/signoz/pkg/query-service/app/clickhouseReader"
"go.signoz.io/signoz/pkg/query-service/app/queryBuilder" "go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3" tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
"go.signoz.io/signoz/pkg/query-service/cache/inmemory" "go.signoz.io/signoz/pkg/query-service/cache/inmemory"
"go.signoz.io/signoz/pkg/query-service/featureManager"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3" v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/querycache" "go.signoz.io/signoz/pkg/query-service/querycache"
"go.signoz.io/signoz/pkg/query-service/utils"
) )
func minTimestamp(series []*v3.Series) int64 { func minTimestamp(series []*v3.Series) int64 {
@@ -1124,3 +1130,288 @@ func TestQueryRangeValueTypePromQL(t *testing.T) {
 		}
 	}
 }
type regexMatcher struct{}
func (m *regexMatcher) Match(expectedSQL, actualSQL string) error {
re, err := regexp.Compile(expectedSQL)
if err != nil {
return err
}
if !re.MatchString(actualSQL) {
return fmt.Errorf("expected query to contain %s, got %s", expectedSQL, actualSQL)
}
return nil
}
func Test_querier_runWindowBasedListQuery(t *testing.T) {
params := &v3.QueryRangeParamsV3{
Start: 1722171576000000000, // July 28, 2024 6:29:36 PM
End: 1722262800000000000, // July 29, 2024 7:50:00 PM
CompositeQuery: &v3.CompositeQuery{
PanelType: v3.PanelTypeList,
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
Expression: "A",
DataSource: v3.DataSourceTraces,
PageSize: 10,
Limit: 100,
StepInterval: 60,
AggregateOperator: v3.AggregateOperatorNoOp,
SelectColumns: []v3.AttributeKey{{Key: "serviceName"}},
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
},
},
},
}
tsRanges := []utils.LogsListTsRange{
{
Start: 1722259200000000000, // July 29, 2024 6:50:00 PM
End: 1722262800000000000, // July 29, 2024 7:50:00 PM
},
{
Start: 1722252000000000000, // July 29, 2024 4:50:00 PM
End: 1722259200000000000, // July 29, 2024 6:50:00 PM
},
{
Start: 1722237600000000000, // July 29, 2024 12:50:00 PM
End: 1722252000000000000, // July 29, 2024 4:50:00 PM
},
{
Start: 1722208800000000000, // July 29, 2024 4:50:00 AM
End: 1722237600000000000, // July 29, 2024 12:50:00 PM
},
{
Start: 1722171576000000000, // July 28, 2024 6:29:36 PM
End: 1722208800000000000, // July 29, 2024 4:50:00 AM
},
}
type queryParams struct {
start int64
end int64
limit uint64
offset uint64
}
type queryResponse struct {
expectedQuery string
timestamps []uint64
}
// create test struct with mock data, i.e., array of timestamps, limit, offset, and expected results
testCases := []struct {
name string
queryResponses []queryResponse
queryParams queryParams
expectedTimestamps []int64
}{
{
name: "should return correct timestamps when querying within time window",
queryResponses: []queryResponse{
{
expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 2",
timestamps: []uint64{1722259300000000000, 1722259400000000000},
},
},
queryParams: queryParams{
start: 1722171576000000000,
end: 1722262800000000000,
limit: 2,
offset: 0,
},
expectedTimestamps: []int64{1722259300000000000, 1722259400000000000},
},
{
name: "all data not in first windows",
queryResponses: []queryResponse{
{
expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 3",
timestamps: []uint64{1722259300000000000, 1722259400000000000},
},
{
expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 1",
timestamps: []uint64{1722253000000000000},
},
},
queryParams: queryParams{
start: 1722171576000000000,
end: 1722262800000000000,
limit: 3,
offset: 0,
},
expectedTimestamps: []int64{1722259300000000000, 1722259400000000000, 1722253000000000000},
},
{
name: "data in multiple windows",
queryResponses: []queryResponse{
{
expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 5",
timestamps: []uint64{1722259300000000000, 1722259400000000000},
},
{
expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 3",
timestamps: []uint64{1722253000000000000},
},
{
expectedQuery: ".*(timestamp >= '1722237600000000000' AND timestamp <= '1722252000000000000').* DESC LIMIT 2",
timestamps: []uint64{1722237700000000000},
},
{
expectedQuery: ".*(timestamp >= '1722208800000000000' AND timestamp <= '1722237600000000000').* DESC LIMIT 1",
timestamps: []uint64{},
},
{
expectedQuery: ".*(timestamp >= '1722171576000000000' AND timestamp <= '1722208800000000000').* DESC LIMIT 1",
timestamps: []uint64{},
},
},
queryParams: queryParams{
start: 1722171576000000000,
end: 1722262800000000000,
limit: 5,
offset: 0,
},
expectedTimestamps: []int64{1722259300000000000, 1722259400000000000, 1722253000000000000, 1722237700000000000},
},
{
name: "query with offset",
queryResponses: []queryResponse{
{
expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 7",
timestamps: []uint64{1722259210000000000, 1722259220000000000, 1722259230000000000},
},
{
expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 4",
timestamps: []uint64{1722253000000000000, 1722254000000000000, 1722255000000000000},
},
{
expectedQuery: ".*(timestamp >= '1722237600000000000' AND timestamp <= '1722252000000000000').* DESC LIMIT 1",
timestamps: []uint64{1722237700000000000},
},
},
queryParams: queryParams{
start: 1722171576000000000,
end: 1722262800000000000,
limit: 4,
offset: 3,
},
expectedTimestamps: []int64{1722253000000000000, 1722254000000000000, 1722255000000000000, 1722237700000000000},
},
{
name: "query with offset and limit- data spread across multiple windows",
queryResponses: []queryResponse{
{
expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 11",
timestamps: []uint64{},
},
{
expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 11",
timestamps: []uint64{1722253000000000000, 1722254000000000000, 1722255000000000000},
},
{
expectedQuery: ".*(timestamp >= '1722237600000000000' AND timestamp <= '1722252000000000000').* DESC LIMIT 8",
timestamps: []uint64{1722237700000000000, 1722237800000000000, 1722237900000000000, 1722237910000000000, 1722237920000000000},
},
{
expectedQuery: ".*(timestamp >= '1722208800000000000' AND timestamp <= '1722237600000000000').* DESC LIMIT 3",
timestamps: []uint64{1722208810000000000, 1722208820000000000, 1722208830000000000},
},
},
queryParams: queryParams{
start: 1722171576000000000,
end: 1722262800000000000,
limit: 5,
offset: 6,
},
expectedTimestamps: []int64{1722237910000000000, 1722237920000000000, 1722208810000000000, 1722208820000000000, 1722208830000000000},
},
}
cols := []cmock.ColumnType{
{Name: "timestamp", Type: "UInt64"},
{Name: "name", Type: "String"},
}
testName := "name"
options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace")
// iterate over test data, create reader and run test
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Setup mock
mock, err := cmock.NewClickHouseWithQueryMatcher(nil, &regexMatcher{})
require.NoError(t, err, "Failed to create ClickHouse mock")
// Configure mock responses
for _, response := range tc.queryResponses {
values := make([][]any, 0, len(response.timestamps))
for _, ts := range response.timestamps {
values = append(values, []any{&ts, &testName})
}
// if len(values) > 0 {
mock.ExpectQuery(response.expectedQuery).WillReturnRows(
cmock.NewRows(cols, values),
)
// }
}
// Create reader and querier
reader := clickhouseReader.NewReaderFromClickhouseConnection(
mock,
options,
nil,
"",
featureManager.StartManager(),
"",
true,
true,
)
q := &querier{
reader: reader,
builder: queryBuilder.NewQueryBuilder(
queryBuilder.QueryBuilderOptions{
BuildTraceQuery: tracesV3.PrepareTracesQuery,
},
featureManager.StartManager(),
),
}
// Update query parameters
params.Start = tc.queryParams.start
params.End = tc.queryParams.end
params.CompositeQuery.BuilderQueries["A"].Limit = tc.queryParams.limit
params.CompositeQuery.BuilderQueries["A"].Offset = tc.queryParams.offset
// Execute query
results, errMap, err := q.runWindowBasedListQuery(context.Background(), params, tsRanges)
// Assertions
require.NoError(t, err, "Query execution failed")
require.Nil(t, errMap, "Unexpected error map in results")
require.Len(t, results, 1, "Expected exactly one result set")
result := results[0]
require.Equal(t, "A", result.QueryName, "Incorrect query name in results")
require.Len(t, result.List, len(tc.expectedTimestamps),
"Result count mismatch: got %d results, expected %d",
len(result.List), len(tc.expectedTimestamps))
for i, expected := range tc.expectedTimestamps {
require.Equal(t, expected, result.List[i].Timestamp.UnixNano(),
"Timestamp mismatch at index %d: got %d, expected %d",
i, result.List[i].Timestamp.UnixNano(), expected)
}
// Verify mock expectations
err = mock.ExpectationsWereMet()
require.NoError(t, err, "Mock expectations were not met")
})
}
}

View File

@@ -11,6 +11,7 @@ import (
metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3" metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3"
metricsV4 "go.signoz.io/signoz/pkg/query-service/app/metrics/v4" metricsV4 "go.signoz.io/signoz/pkg/query-service/app/metrics/v4"
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3" tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
tracesV4 "go.signoz.io/signoz/pkg/query-service/app/traces/v4"
"go.signoz.io/signoz/pkg/query-service/common" "go.signoz.io/signoz/pkg/query-service/common"
"go.signoz.io/signoz/pkg/query-service/constants" "go.signoz.io/signoz/pkg/query-service/constants"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3" v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
@@ -158,11 +159,16 @@ func (q *querier) runBuilderQuery(
 	if builderQuery.DataSource == v3.DataSourceTraces {
+		tracesQueryBuilder := tracesV3.PrepareTracesQuery
+		if q.UseTraceNewSchema {
+			tracesQueryBuilder = tracesV4.PrepareTracesQuery
+		}
 		var query string
 		var err error
 		// for ts query with group by and limit form two queries
 		if params.CompositeQuery.PanelType == v3.PanelTypeGraph && builderQuery.Limit > 0 && len(builderQuery.GroupBy) > 0 {
-			limitQuery, err := tracesV3.PrepareTracesQuery(
+			limitQuery, err := tracesQueryBuilder(
 				start,
 				end,
 				params.CompositeQuery.PanelType,
@@ -173,7 +179,7 @@ func (q *querier) runBuilderQuery(
 				ch <- channelResult{Err: err, Name: queryName, Query: limitQuery, Series: nil}
 				return
 			}
-			placeholderQuery, err := tracesV3.PrepareTracesQuery(
+			placeholderQuery, err := tracesQueryBuilder(
 				start,
 				end,
 				params.CompositeQuery.PanelType,
@@ -186,7 +192,7 @@ func (q *querier) runBuilderQuery(
 			}
 			query = fmt.Sprintf(placeholderQuery, limitQuery)
 		} else {
-			query, err = tracesV3.PrepareTracesQuery(
+			query, err = tracesQueryBuilder(
 				start,
 				end,
 				params.CompositeQuery.PanelType,

View File

@@ -11,6 +11,7 @@ import (
metricsV4 "go.signoz.io/signoz/pkg/query-service/app/metrics/v4" metricsV4 "go.signoz.io/signoz/pkg/query-service/app/metrics/v4"
"go.signoz.io/signoz/pkg/query-service/app/queryBuilder" "go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3" tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
tracesV4 "go.signoz.io/signoz/pkg/query-service/app/traces/v4"
"go.signoz.io/signoz/pkg/query-service/common" "go.signoz.io/signoz/pkg/query-service/common"
chErrors "go.signoz.io/signoz/pkg/query-service/errors" chErrors "go.signoz.io/signoz/pkg/query-service/errors"
"go.signoz.io/signoz/pkg/query-service/querycache" "go.signoz.io/signoz/pkg/query-service/querycache"
@@ -48,10 +49,11 @@ type querier struct {
 	testingMode     bool
 	queriesExecuted []string
 	// tuple of start and end time in milliseconds
 	timeRanges     [][]int
 	returnedSeries []*v3.Series
 	returnedErr    error
 	UseLogsNewSchema  bool
+	UseTraceNewSchema bool
 }

 type QuerierOptions struct {
@@ -62,10 +64,11 @@ type QuerierOptions struct {
 	FeatureLookup interfaces.FeatureLookup
 	// used for testing
 	TestingMode    bool
 	ReturnedSeries []*v3.Series
 	ReturnedErr    error
 	UseLogsNewSchema  bool
+	UseTraceNewSchema bool
 }

 func NewQuerier(opts QuerierOptions) interfaces.Querier {
@@ -74,6 +77,11 @@ func NewQuerier(opts QuerierOptions) interfaces.Querier {
 		logsQueryBuilder = logsV4.PrepareLogsQuery
 	}
+	tracesQueryBuilder := tracesV3.PrepareTracesQuery
+	if opts.UseTraceNewSchema {
+		tracesQueryBuilder = tracesV4.PrepareTracesQuery
+	}
 	qc := querycache.NewQueryCache(querycache.WithCache(opts.Cache), querycache.WithFluxInterval(opts.FluxInterval))

 	return &querier{
@@ -84,16 +92,17 @@ func NewQuerier(opts QuerierOptions) interfaces.Querier {
 		fluxInterval: opts.FluxInterval,
 		builder: queryBuilder.NewQueryBuilder(queryBuilder.QueryBuilderOptions{
-			BuildTraceQuery:  tracesV3.PrepareTracesQuery,
+			BuildTraceQuery:  tracesQueryBuilder,
 			BuildLogQuery:    logsQueryBuilder,
 			BuildMetricQuery: metricsV4.PrepareMetricQuery,
 		}, opts.FeatureLookup),
 		featureLookUp:  opts.FeatureLookup,
 		testingMode:    opts.TestingMode,
 		returnedSeries: opts.ReturnedSeries,
 		returnedErr:    opts.ReturnedErr,
 		UseLogsNewSchema:  opts.UseLogsNewSchema,
+		UseTraceNewSchema: opts.UseTraceNewSchema,
 	}
 }
@@ -308,56 +317,115 @@ func (q *querier) runClickHouseQueries(ctx context.Context, params *v3.QueryRang
 	return results, errQueriesByName, err
 }

-func (q *querier) runLogsListQuery(ctx context.Context, params *v3.QueryRangeParamsV3, tsRanges []utils.LogsListTsRange) ([]*v3.Result, map[string]error, error) {
+func (q *querier) runWindowBasedListQuery(ctx context.Context, params *v3.QueryRangeParamsV3, tsRanges []utils.LogsListTsRange) ([]*v3.Result, map[string]error, error) {
 	res := make([]*v3.Result, 0)
 	qName := ""
 	pageSize := uint64(0)
+	limit := uint64(0)
+	offset := uint64(0)
 	// here we are considering only one query
 	for name, v := range params.CompositeQuery.BuilderQueries {
 		qName = name
 		pageSize = v.PageSize
+		// for traces specifically
+		limit = v.Limit
+		offset = v.Offset
 	}
 	data := []*v3.Row{}
+	tracesLimit := limit + offset
 	for _, v := range tsRanges {
 		params.Start = v.Start
 		params.End = v.End
-		params.CompositeQuery.BuilderQueries[qName].PageSize = pageSize - uint64(len(data))
-		queries, err := q.builder.PrepareQueries(params)
-		if err != nil {
-			return nil, nil, err
-		}
-		// this will run only once
-		for name, query := range queries {
-			rowList, err := q.reader.GetListResultV3(ctx, query)
-			if err != nil {
-				errs := []error{err}
-				errQuriesByName := map[string]error{
-					name: err,
-				}
-				return nil, errQuriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...))
-			}
-			data = append(data, rowList...)
-		}
-		// append a filter to the params
-		if len(data) > 0 {
-			params.CompositeQuery.BuilderQueries[qName].Filters.Items = append(params.CompositeQuery.BuilderQueries[qName].Filters.Items, v3.FilterItem{
-				Key: v3.AttributeKey{
-					Key:      "id",
-					IsColumn: true,
-					DataType: "string",
-				},
-				Operator: v3.FilterOperatorLessThan,
-				Value:    data[len(data)-1].Data["id"],
-			})
-		}
-		if uint64(len(data)) >= pageSize {
-			break
-		}
+		length := uint64(0)
+		// this will run only once
+		// appending the filter to get the next set of data
+		if params.CompositeQuery.BuilderQueries[qName].DataSource == v3.DataSourceLogs {
+			params.CompositeQuery.BuilderQueries[qName].PageSize = pageSize - uint64(len(data))
+			queries, err := q.builder.PrepareQueries(params)
+			if err != nil {
+				return nil, nil, err
+			}
+			for name, query := range queries {
+				rowList, err := q.reader.GetListResultV3(ctx, query)
+				if err != nil {
+					errs := []error{err}
+					errQuriesByName := map[string]error{
+						name: err,
+					}
+					return nil, errQuriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...))
+				}
+				length += uint64(len(rowList))
+				data = append(data, rowList...)
+			}
+			if length > 0 {
+				params.CompositeQuery.BuilderQueries[qName].Filters.Items = append(params.CompositeQuery.BuilderQueries[qName].Filters.Items, v3.FilterItem{
+					Key: v3.AttributeKey{
+						Key:      "id",
+						IsColumn: true,
+						DataType: "string",
+					},
+					Operator: v3.FilterOperatorLessThan,
+					Value:    data[len(data)-1].Data["id"],
+				})
+			}
+			if uint64(len(data)) >= pageSize {
+				break
+			}
+		} else {
+			// TRACE
+			// we are updating the offset and limit based on the number of traces we have found in the current timerange
+			// e.g.
+			// 1) offset = 0, limit = 100, tsRanges = [t1, t10], [t10, t20], [t20, t30]
+			//
+			//    if 100 traces are there in [t1, t10] then 100 will be returned immediately.
+			//    if 10 traces are there in [t1, t10] then we get 10, set offset to 0 and limit to 90, and search in the next timerange [t10, t20]
+			//    if we don't find any trace in [t1, t10], then we search in [t10, t20] with offset=0, limit=100
+			//
+			// 2) offset = 50, limit = 100, tsRanges = [t1, t10], [t10, t20], [t20, t30]
+			//
+			//    if we find 150 traces with limit=150 and offset=0 in [t1, t10] then we return 100 traces immediately
+			//    if we find 50 in [t1, t10] with limit=150 and offset=0 then we set limit = 100 and offset = 0 and search in the next timerange [t10, t20]
+			//    if we don't find any trace in [t1, t10], then we search in [t10, t20] with limit=150 and offset=0
+			params.CompositeQuery.BuilderQueries[qName].Offset = 0
+			params.CompositeQuery.BuilderQueries[qName].Limit = tracesLimit
+			queries, err := q.builder.PrepareQueries(params)
+			if err != nil {
+				return nil, nil, err
+			}
+			for name, query := range queries {
+				rowList, err := q.reader.GetListResultV3(ctx, query)
+				if err != nil {
+					errs := []error{err}
+					errQuriesByName := map[string]error{
+						name: err,
+					}
+					return nil, errQuriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...))
+				}
+				length += uint64(len(rowList))
+				// skip the traces unless offset is 0
+				for _, row := range rowList {
+					if offset == 0 {
+						data = append(data, row)
+					} else {
+						offset--
+					}
+				}
+			}
+			tracesLimit = tracesLimit - length
+			if uint64(len(data)) >= limit {
+				break
+			}
+		}
 	}
 	res = append(res, &v3.Result{
@@ -369,14 +437,24 @@ func (q *querier) runLogsListQuery(ctx context.Context, params *v3.QueryRangePar
 func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Result, map[string]error, error) {
 	// List query has support for only one query.
-	if q.UseLogsNewSchema && params.CompositeQuery != nil && len(params.CompositeQuery.BuilderQueries) == 1 {
+	// we are skipping for PanelTypeTrace as it has a custom order by regardless of what's in the payload
+	if params.CompositeQuery != nil &&
+		len(params.CompositeQuery.BuilderQueries) == 1 &&
+		params.CompositeQuery.PanelType != v3.PanelTypeTrace {
 		for _, v := range params.CompositeQuery.BuilderQueries {
+			if (v.DataSource == v3.DataSourceLogs && !q.UseLogsNewSchema) ||
+				(v.DataSource == v3.DataSourceTraces && !q.UseTraceNewSchema) {
+				break
+			}
 			// only allow logs queries with timestamp ordering desc
-			if v.DataSource == v3.DataSourceLogs && len(v.OrderBy) == 1 && v.OrderBy[0].ColumnName == "timestamp" && v.OrderBy[0].Order == "desc" {
-				startEndArr := utils.GetLogsListTsRanges(params.Start, params.End)
-				if len(startEndArr) > 0 {
-					return q.runLogsListQuery(ctx, params, startEndArr)
-				}
+			// TODO(nitya): allow for timestamp asc
+			if (v.DataSource == v3.DataSourceLogs || v.DataSource == v3.DataSourceTraces) &&
+				len(v.OrderBy) == 1 &&
+				v.OrderBy[0].ColumnName == "timestamp" &&
+				v.OrderBy[0].Order == "desc" {
+				startEndArr := utils.GetListTsRanges(params.Start, params.End)
+				return q.runWindowBasedListQuery(ctx, params, startEndArr)
 			}
 		}
 	}

View File

@@ -5,15 +5,21 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"math" "math"
"regexp"
"strings" "strings"
"testing" "testing"
"time" "time"
cmock "github.com/srikanthccv/ClickHouse-go-mock"
"github.com/stretchr/testify/require"
"go.signoz.io/signoz/pkg/query-service/app/clickhouseReader"
"go.signoz.io/signoz/pkg/query-service/app/queryBuilder" "go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3" tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
"go.signoz.io/signoz/pkg/query-service/cache/inmemory" "go.signoz.io/signoz/pkg/query-service/cache/inmemory"
"go.signoz.io/signoz/pkg/query-service/featureManager"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3" v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/querycache" "go.signoz.io/signoz/pkg/query-service/querycache"
"go.signoz.io/signoz/pkg/query-service/utils"
) )
func minTimestamp(series []*v3.Series) int64 { func minTimestamp(series []*v3.Series) int64 {
@@ -798,8 +804,8 @@ func TestV2QueryRangeValueType(t *testing.T) {
 	}
 	q := NewQuerier(opts)
 	expectedTimeRangeInQueryString := []string{
 		fmt.Sprintf("unix_milli >= %d AND unix_milli < %d", 1675115520000, 1675115580000+120*60*1000),                          // 31st Jan, 03:23:00 to 31st Jan, 05:23:00
 		fmt.Sprintf("unix_milli >= %d AND unix_milli < %d", 1675115580000+120*60*1000, 1675115580000+180*60*1000),              // 31st Jan, 05:23:00 to 31st Jan, 06:23:00
 		fmt.Sprintf("timestamp >= '%d' AND timestamp <= '%d'", (1675119196722)*int64(1000000), (1675126396722)*int64(1000000)), // 31st Jan, 05:23:00 to 31st Jan, 06:23:00
 	}
@@ -1178,3 +1184,288 @@ func TestV2QueryRangeValueTypePromQL(t *testing.T) {
 		}
 	}
 }
type regexMatcher struct{}
func (m *regexMatcher) Match(expectedSQL, actualSQL string) error {
re, err := regexp.Compile(expectedSQL)
if err != nil {
return err
}
if !re.MatchString(actualSQL) {
return fmt.Errorf("expected query to contain %s, got %s", expectedSQL, actualSQL)
}
return nil
}
func Test_querier_runWindowBasedListQuery(t *testing.T) {
params := &v3.QueryRangeParamsV3{
Start: 1722171576000000000, // July 28, 2024 6:29:36 PM
End: 1722262800000000000, // July 29, 2024 7:50:00 PM
CompositeQuery: &v3.CompositeQuery{
PanelType: v3.PanelTypeList,
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
Expression: "A",
DataSource: v3.DataSourceTraces,
PageSize: 10,
Limit: 100,
StepInterval: 60,
AggregateOperator: v3.AggregateOperatorNoOp,
SelectColumns: []v3.AttributeKey{{Key: "serviceName"}},
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
},
},
},
}
tsRanges := []utils.LogsListTsRange{
{
Start: 1722259200000000000, // July 29, 2024 6:50:00 PM
End: 1722262800000000000, // July 29, 2024 7:50:00 PM
},
{
Start: 1722252000000000000, // July 29, 2024 4:50:00 PM
End: 1722259200000000000, // July 29, 2024 6:50:00 PM
},
{
Start: 1722237600000000000, // July 29, 2024 12:50:00 PM
End: 1722252000000000000, // July 29, 2024 4:50:00 PM
},
{
Start: 1722208800000000000, // July 29, 2024 4:50:00 AM
End: 1722237600000000000, // July 29, 2024 12:50:00 PM
},
{
Start: 1722171576000000000, // July 28, 2024 6:29:36 PM
End: 1722208800000000000, // July 29, 2024 4:50:00 AM
},
}
type queryParams struct {
start int64
end int64
limit uint64
offset uint64
}
type queryResponse struct {
expectedQuery string
timestamps []uint64
}
// create test struct with mock data, i.e., array of timestamps, limit, offset, and expected results
testCases := []struct {
name string
queryResponses []queryResponse
queryParams queryParams
expectedTimestamps []int64
}{
{
name: "should return correct timestamps when querying within time window",
queryResponses: []queryResponse{
{
expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 2",
timestamps: []uint64{1722259300000000000, 1722259400000000000},
},
},
queryParams: queryParams{
start: 1722171576000000000,
end: 1722262800000000000,
limit: 2,
offset: 0,
},
expectedTimestamps: []int64{1722259300000000000, 1722259400000000000},
},
{
name: "all data not in first windows",
queryResponses: []queryResponse{
{
expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 3",
timestamps: []uint64{1722259300000000000, 1722259400000000000},
},
{
expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 1",
timestamps: []uint64{1722253000000000000},
},
},
queryParams: queryParams{
start: 1722171576000000000,
end: 1722262800000000000,
limit: 3,
offset: 0,
},
expectedTimestamps: []int64{1722259300000000000, 1722259400000000000, 1722253000000000000},
},
{
name: "data in multiple windows",
queryResponses: []queryResponse{
{
expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 5",
timestamps: []uint64{1722259300000000000, 1722259400000000000},
},
{
expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 3",
timestamps: []uint64{1722253000000000000},
},
{
expectedQuery: ".*(timestamp >= '1722237600000000000' AND timestamp <= '1722252000000000000').* DESC LIMIT 2",
timestamps: []uint64{1722237700000000000},
},
{
expectedQuery: ".*(timestamp >= '1722208800000000000' AND timestamp <= '1722237600000000000').* DESC LIMIT 1",
timestamps: []uint64{},
},
{
expectedQuery: ".*(timestamp >= '1722171576000000000' AND timestamp <= '1722208800000000000').* DESC LIMIT 1",
timestamps: []uint64{},
},
},
queryParams: queryParams{
start: 1722171576000000000,
end: 1722262800000000000,
limit: 5,
offset: 0,
},
expectedTimestamps: []int64{1722259300000000000, 1722259400000000000, 1722253000000000000, 1722237700000000000},
},
{
name: "query with offset",
queryResponses: []queryResponse{
{
expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 7",
timestamps: []uint64{1722259210000000000, 1722259220000000000, 1722259230000000000},
},
{
expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 4",
timestamps: []uint64{1722253000000000000, 1722254000000000000, 1722255000000000000},
},
{
expectedQuery: ".*(timestamp >= '1722237600000000000' AND timestamp <= '1722252000000000000').* DESC LIMIT 1",
timestamps: []uint64{1722237700000000000},
},
},
queryParams: queryParams{
start: 1722171576000000000,
end: 1722262800000000000,
limit: 4,
offset: 3,
},
expectedTimestamps: []int64{1722253000000000000, 1722254000000000000, 1722255000000000000, 1722237700000000000},
},
{
name: "query with offset and limit- data spread across multiple windows",
queryResponses: []queryResponse{
{
expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 11",
timestamps: []uint64{},
},
{
expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 11",
timestamps: []uint64{1722253000000000000, 1722254000000000000, 1722255000000000000},
},
{
expectedQuery: ".*(timestamp >= '1722237600000000000' AND timestamp <= '1722252000000000000').* DESC LIMIT 8",
timestamps: []uint64{1722237700000000000, 1722237800000000000, 1722237900000000000, 1722237910000000000, 1722237920000000000},
},
{
expectedQuery: ".*(timestamp >= '1722208800000000000' AND timestamp <= '1722237600000000000').* DESC LIMIT 3",
timestamps: []uint64{1722208810000000000, 1722208820000000000, 1722208830000000000},
},
},
queryParams: queryParams{
start: 1722171576000000000,
end: 1722262800000000000,
limit: 5,
offset: 6,
},
expectedTimestamps: []int64{1722237910000000000, 1722237920000000000, 1722208810000000000, 1722208820000000000, 1722208830000000000},
},
}
cols := []cmock.ColumnType{
{Name: "timestamp", Type: "UInt64"},
{Name: "name", Type: "String"},
}
testName := "name"
options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace")
// iterate over test data, create reader and run test
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Setup mock
mock, err := cmock.NewClickHouseWithQueryMatcher(nil, &regexMatcher{})
require.NoError(t, err, "Failed to create ClickHouse mock")
// Configure mock responses
for _, response := range tc.queryResponses {
values := make([][]any, 0, len(response.timestamps))
for _, ts := range response.timestamps {
values = append(values, []any{&ts, &testName})
}
// if len(values) > 0 {
mock.ExpectQuery(response.expectedQuery).WillReturnRows(
cmock.NewRows(cols, values),
)
// }
}
// Create reader and querier
reader := clickhouseReader.NewReaderFromClickhouseConnection(
mock,
options,
nil,
"",
featureManager.StartManager(),
"",
true,
true,
)
q := &querier{
reader: reader,
builder: queryBuilder.NewQueryBuilder(
queryBuilder.QueryBuilderOptions{
BuildTraceQuery: tracesV3.PrepareTracesQuery,
},
featureManager.StartManager(),
),
}
// Update query parameters
params.Start = tc.queryParams.start
params.End = tc.queryParams.end
params.CompositeQuery.BuilderQueries["A"].Limit = tc.queryParams.limit
params.CompositeQuery.BuilderQueries["A"].Offset = tc.queryParams.offset
// Execute query
results, errMap, err := q.runWindowBasedListQuery(context.Background(), params, tsRanges)
// Assertions
require.NoError(t, err, "Query execution failed")
require.Nil(t, errMap, "Unexpected error map in results")
require.Len(t, results, 1, "Expected exactly one result set")
result := results[0]
require.Equal(t, "A", result.QueryName, "Incorrect query name in results")
require.Len(t, result.List, len(tc.expectedTimestamps),
"Result count mismatch: got %d results, expected %d",
len(result.List), len(tc.expectedTimestamps))
for i, expected := range tc.expectedTimestamps {
require.Equal(t, expected, result.List[i].Timestamp.UnixNano(),
"Timestamp mismatch at index %d: got %d, expected %d",
i, result.List[i].Timestamp.UnixNano(), expected)
}
// Verify mock expectations
err = mock.ExpectationsWereMet()
require.NoError(t, err, "Mock expectations were not met")
})
}
}

View File

@@ -67,6 +67,7 @@ type ServerOptions struct {
 	FluxInterval string
 	Cluster      string
 	UseLogsNewSchema  bool
+	UseTraceNewSchema bool
 }

 // Server runs HTTP, Mux and a grpc server
@@ -130,6 +131,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
 		serverOptions.DialTimeout,
 		serverOptions.Cluster,
 		serverOptions.UseLogsNewSchema,
+		serverOptions.UseTraceNewSchema,
 	)
 	go clickhouseReader.Start(readerReady)
 	reader = clickhouseReader
@@ -157,7 +159,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
 	rm, err := makeRulesManager(
 		serverOptions.PromConfigPath,
 		constants.GetAlertManagerApiPrefix(),
-		serverOptions.RuleRepoURL, localDB, reader, c, serverOptions.DisableRules, fm, serverOptions.UseLogsNewSchema)
+		serverOptions.RuleRepoURL, localDB, reader, c, serverOptions.DisableRules, fm, serverOptions.UseLogsNewSchema, serverOptions.UseTraceNewSchema)
 	if err != nil {
 		return nil, err
 	}
@@ -202,6 +204,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
 		Cache:        c,
 		FluxInterval: fluxInterval,
 		UseLogsNewSchema:  serverOptions.UseLogsNewSchema,
+		UseTraceNewSchema: serverOptions.UseTraceNewSchema,
 	})
 	if err != nil {
 		return nil, err
@@ -721,7 +724,8 @@ func makeRulesManager(
 	cache cache.Cache,
 	disableRules bool,
 	fm interfaces.FeatureLookup,
-	useLogsNewSchema bool) (*rules.Manager, error) {
+	useLogsNewSchema bool,
+	useTraceNewSchema bool) (*rules.Manager, error) {
 	// create engine
 	pqle, err := pqle.FromReader(ch)
@@ -738,18 +742,19 @@ func makeRulesManager(
 	// create manager opts
 	managerOpts := &rules.ManagerOptions{
 		NotifierOpts: notifierOpts,
 		PqlEngine:    pqle,
 		RepoURL:      ruleRepoURL,
 		DBConn:       db,
 		Context:      context.Background(),
 		Logger:       zap.L(),
 		DisableRules: disableRules,
 		FeatureFlags: fm,
 		Reader:       ch,
 		Cache:        cache,
 		EvalDelay:    constants.GetEvalDelay(),
 		UseLogsNewSchema:  useLogsNewSchema,
+		UseTraceNewSchema: useTraceNewSchema,
 	}

 	// create Manager

View File

@@ -0,0 +1,104 @@
package v4
import (
"go.signoz.io/signoz/pkg/query-service/constants"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/utils"
)
func isEnriched(field v3.AttributeKey) bool {
// if it is timestamp/id, don't check
if field.Key == "timestamp" || field.Key == "id" || field.Key == constants.SigNozOrderByValue {
return true
}
// no need to enrich the static fields as they will always be used as a column
if _, ok := constants.StaticFieldsTraces[field.Key]; ok && field.IsColumn {
return true
}
return false
}
func enrichKeyWithMetadata(key v3.AttributeKey, keys map[string]v3.AttributeKey) v3.AttributeKey {
if isEnriched(key) {
return key
}
if v, ok := constants.StaticFieldsTraces[key.Key]; ok {
return v
}
for _, key := range utils.GenerateEnrichmentKeys(key) {
if val, ok := keys[key]; ok {
return val
}
}
// enrich with default values if metadata is not found
if key.Type == "" {
key.Type = v3.AttributeKeyTypeTag
}
if key.DataType == "" {
key.DataType = v3.AttributeKeyDataTypeString
}
return key
}
func Enrich(params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey) {
if params.CompositeQuery.QueryType == v3.QueryTypeBuilder {
for _, query := range params.CompositeQuery.BuilderQueries {
if query.DataSource == v3.DataSourceTraces {
EnrichTracesQuery(query, keys)
}
}
}
}
func EnrichTracesQuery(query *v3.BuilderQuery, keys map[string]v3.AttributeKey) {
// enrich aggregate attribute
query.AggregateAttribute = enrichKeyWithMetadata(query.AggregateAttribute, keys)
// enrich filter items
if query.Filters != nil && len(query.Filters.Items) > 0 {
for idx, filter := range query.Filters.Items {
query.Filters.Items[idx].Key = enrichKeyWithMetadata(filter.Key, keys)
// if the serviceName column is used, use the corresponding resource attribute as well during filtering
if filter.Key.Key == "serviceName" && filter.Key.IsColumn {
query.Filters.Items[idx].Key = v3.AttributeKey{
Key: "service.name",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource,
IsColumn: false,
}
}
}
}
// enrich group by
for idx, groupBy := range query.GroupBy {
query.GroupBy[idx] = enrichKeyWithMetadata(groupBy, keys)
}
// enrich order by
query.OrderBy = enrichOrderBy(query.OrderBy, keys)
// enrich select columns
for idx, selectColumn := range query.SelectColumns {
query.SelectColumns[idx] = enrichKeyWithMetadata(selectColumn, keys)
}
}
func enrichOrderBy(items []v3.OrderBy, keys map[string]v3.AttributeKey) []v3.OrderBy {
enrichedItems := []v3.OrderBy{}
for i := 0; i < len(items); i++ {
attributeKey := enrichKeyWithMetadata(v3.AttributeKey{
Key: items[i].ColumnName,
}, keys)
enrichedItems = append(enrichedItems, v3.OrderBy{
ColumnName: items[i].ColumnName,
Order: items[i].Order,
Key: attributeKey.Key,
DataType: attributeKey.DataType,
Type: attributeKey.Type,
IsColumn: attributeKey.IsColumn,
})
}
return enrichedItems
}

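Note: enrichKeyWithMetadata above resolves partially specified keys against metadata indexed as "<key>##<type>##<datatype>" (the test file below registers "bytes##tag##int64"). A self-contained sketch of that lookup; candidateKeys is a simplified stand-in for utils.GenerateEnrichmentKeys, and the enumeration order of datatypes is an assumption here:

package main

import "fmt"

type attributeKey struct {
	Key, Type, DataType string
}

// candidateKeys builds "<key>##<type>##<datatype>" lookup keys; when the
// datatype is unknown it tries each known datatype in turn (assumed order).
func candidateKeys(k attributeKey) []string {
	if k.DataType != "" {
		return []string{k.Key + "##" + k.Type + "##" + k.DataType}
	}
	out := []string{}
	for _, dt := range []string{"string", "int64", "float64", "bool"} {
		out = append(out, k.Key+"##"+k.Type+"##"+dt)
	}
	return out
}

func main() {
	metadata := map[string]attributeKey{
		"bytes##tag##int64": {Key: "bytes", Type: "tag", DataType: "int64"},
	}
	// a filter key arriving without a datatype, as in the first test case below
	probe := attributeKey{Key: "bytes", Type: "tag"}
	for _, c := range candidateKeys(probe) {
		if v, ok := metadata[c]; ok {
			fmt.Printf("enriched %q -> %+v\n", probe.Key, v)
			break
		}
	}
}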
View File

@@ -0,0 +1,97 @@
package v4
import (
"reflect"
"testing"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)
func TestEnrichTracesQuery(t *testing.T) {
type args struct {
query *v3.BuilderQuery
keys map[string]v3.AttributeKey
want *v3.BuilderQuery
}
tests := []struct {
name string
args args
}{
{
name: "test 1",
args: args{
query: &v3.BuilderQuery{
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "bytes", Type: v3.AttributeKeyTypeTag}, Value: 100, Operator: ">"},
},
},
},
keys: map[string]v3.AttributeKey{
"bytes##tag##int64": {Key: "bytes", DataType: v3.AttributeKeyDataTypeInt64, Type: v3.AttributeKeyTypeTag},
},
want: &v3.BuilderQuery{
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "bytes", Type: v3.AttributeKeyTypeTag, DataType: v3.AttributeKeyDataTypeInt64}, Value: 100, Operator: ">"},
},
},
},
},
},
{
name: "test service name",
args: args{
query: &v3.BuilderQuery{
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "serviceName", DataType: v3.AttributeKeyDataTypeString, IsColumn: true}, Value: "myservice", Operator: "="},
},
},
},
keys: map[string]v3.AttributeKey{},
want: &v3.BuilderQuery{
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "service.name", Type: v3.AttributeKeyTypeResource, DataType: v3.AttributeKeyDataTypeString}, Value: "myservice", Operator: "="},
},
},
},
},
},
{
name: "test mat attrs",
args: args{
query: &v3.BuilderQuery{
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "http.route", DataType: v3.AttributeKeyDataTypeString, IsColumn: true}, Value: "/api", Operator: "="},
},
},
},
keys: map[string]v3.AttributeKey{},
want: &v3.BuilderQuery{
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "http.route", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}, Value: "/api", Operator: "="},
},
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
EnrichTracesQuery(tt.args.query, tt.args.keys)
if !reflect.DeepEqual(tt.args.query.Filters.Items[0].Key, tt.args.want.Filters.Items[0].Key) {
t.Errorf("EnrichTracesQuery() = %v, want %v", tt.args.query, tt.args.want)
}
})
}
}

View File

@@ -0,0 +1,216 @@
package v4
import (
"strconv"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/utils"
"go.uber.org/zap"
)
var TracesListViewDefaultSelectedColumns = []v3.AttributeKey{
{
Key: "serviceName",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
IsColumn: true,
},
{
Key: "name",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
IsColumn: true,
},
{
Key: "durationNano",
DataType: v3.AttributeKeyDataTypeArrayFloat64,
Type: v3.AttributeKeyTypeTag,
IsColumn: true,
},
{
Key: "httpMethod",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
IsColumn: true,
},
{
Key: "responseStatusCode",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
IsColumn: true,
},
}
// check if traceId filter is used in traces query and return the list of traceIds
func TraceIdFilterUsedWithEqual(params *v3.QueryRangeParamsV3) (bool, []string) {
compositeQuery := params.CompositeQuery
if compositeQuery == nil {
return false, []string{}
}
var traceIds []string
var traceIdFilterUsed bool
// Build queries for each builder query
for queryName, query := range compositeQuery.BuilderQueries {
if query.Expression != queryName && query.DataSource != v3.DataSourceTraces {
continue
}
// check filter attribute
if query.Filters != nil && len(query.Filters.Items) != 0 {
for _, item := range query.Filters.Items {
if item.Key.Key == "traceID" && (item.Operator == v3.FilterOperatorIn ||
item.Operator == v3.FilterOperatorEqual) {
traceIdFilterUsed = true
// validate value
var err error
val := item.Value
val, err = utils.ValidateAndCastValue(val, item.Key.DataType)
if err != nil {
zap.L().Error("invalid value for key", zap.String("key", item.Key.Key), zap.Error(err))
return false, []string{}
}
if val != nil {
fmtVal := extractFormattedStringValues(val)
traceIds = append(traceIds, fmtVal...)
}
}
}
}
}
zap.L().Debug("traceIds", zap.Any("traceIds", traceIds))
return traceIdFilterUsed, traceIds
}
func extractFormattedStringValues(v interface{}) []string {
// if it's pointer convert it to a value
v = getPointerValue(v)
switch x := v.(type) {
case string:
return []string{x}
case []interface{}:
if len(x) == 0 {
return []string{}
}
switch x[0].(type) {
case string:
values := []string{}
for _, val := range x {
values = append(values, val.(string))
}
return values
default:
return []string{}
}
default:
return []string{}
}
}
func getPointerValue(v interface{}) interface{} {
switch x := v.(type) {
case *uint8:
return *x
case *uint16:
return *x
case *uint32:
return *x
case *uint64:
return *x
case *int:
return *x
case *int8:
return *x
case *int16:
return *x
case *int32:
return *x
case *int64:
return *x
case *float32:
return *x
case *float64:
return *x
case *string:
return *x
case *bool:
return *x
case []interface{}:
values := []interface{}{}
for _, val := range x {
values = append(values, getPointerValue(val))
}
return values
default:
return v
}
}
func AddTimestampFilters(minTime int64, maxTime int64, params *v3.QueryRangeParamsV3) {
if minTime == 0 && maxTime == 0 {
return
}
compositeQuery := params.CompositeQuery
if compositeQuery == nil {
return
}
// Build queries for each builder query and apply timestamp filter only if TraceID is present
for queryName, query := range compositeQuery.BuilderQueries {
if query.Expression != queryName && query.DataSource != v3.DataSourceTraces {
continue
}
addTimeStampFilter := false
// check filter attribute
if query.Filters != nil && len(query.Filters.Items) != 0 {
for _, item := range query.Filters.Items {
if item.Key.Key == "traceID" && (item.Operator == v3.FilterOperatorIn ||
item.Operator == v3.FilterOperatorEqual) {
addTimeStampFilter = true
}
}
}
// add timestamp filter to the query only if a traceID filter with an equal/in operator is used
if addTimeStampFilter {
timeFilters := []v3.FilterItem{
{
Key: v3.AttributeKey{
Key: "timestamp",
Type: v3.AttributeKeyTypeTag,
DataType: v3.AttributeKeyDataTypeString,
IsColumn: true,
},
Value: strconv.FormatUint(uint64(minTime), 10),
Operator: v3.FilterOperatorGreaterThanOrEq,
},
{
Key: v3.AttributeKey{
Key: "timestamp",
Type: v3.AttributeKeyTypeTag,
DataType: v3.AttributeKeyDataTypeString,
IsColumn: true,
},
Value: strconv.FormatUint(uint64(maxTime), 10),
Operator: v3.FilterOperatorLessThanOrEq,
},
}
// add new timestamp filter to query
if query.Filters == nil {
query.Filters = &v3.FilterSet{
Items: timeFilters,
}
} else {
query.Filters.Items = append(query.Filters.Items, timeFilters...)
}
}
}
}

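Note: the two helpers above are meant to be used together: detect a traceID equality/IN filter with TraceIdFilterUsedWithEqual, look up the trace's time bounds elsewhere (e.g. from a trace-summary lookup), then clamp the query window with AddTimestampFilters so ClickHouse can prune partitions. A minimal sketch of that flow; the trace id and the hard-coded nanosecond bounds are illustrative stand-ins:

package main

import (
	"fmt"

	tracesV4 "go.signoz.io/signoz/pkg/query-service/app/traces/v4"
	v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)

func main() {
	params := &v3.QueryRangeParamsV3{
		CompositeQuery: &v3.CompositeQuery{
			BuilderQueries: map[string]*v3.BuilderQuery{
				"A": {
					QueryName:  "A",
					Expression: "A",
					DataSource: v3.DataSourceTraces,
					Filters: &v3.FilterSet{
						Operator: "AND",
						Items: []v3.FilterItem{{
							Key:      v3.AttributeKey{Key: "traceID", DataType: v3.AttributeKeyDataTypeString},
							Operator: v3.FilterOperatorEqual,
							Value:    "f4e5a6b7c8d9e0f1a2b3c4d5e6f7a8b9", // illustrative trace id
						}},
					},
				},
			},
		},
	}

	if used, traceIDs := tracesV4.TraceIdFilterUsedWithEqual(params); used {
		// min/max would normally come from the trace's span timestamps;
		// hard-coded here only to show the call
		tracesV4.AddTimestampFilters(1722259200000000000, 1722262800000000000, params)
		fmt.Println("narrowed query window for traces:", traceIDs)
	}
}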
View File

@@ -238,8 +238,8 @@ const (
 	SIGNOZ_EXP_HISTOGRAM_TABLENAME = "distributed_exp_hist"
 	SIGNOZ_TRACE_DBNAME = "signoz_traces"
 	SIGNOZ_SPAN_INDEX_TABLENAME = "distributed_signoz_index_v2"
+	SIGNOZ_SPAN_INDEX_LOCAL_TABLENAME = "signoz_index_v2"
 	SIGNOZ_SPAN_INDEX_V3 = "distributed_signoz_index_v3"
-	SIGNOZ_SPAN_INDEX_LOCAL_TABLENAME = "signoz_index_v2"
 	SIGNOZ_SPAN_INDEX_V3_LOCAL_TABLENAME = "signoz_index_v3"
 	SIGNOZ_TIMESERIES_v4_LOCAL_TABLENAME = "time_series_v4"
 	SIGNOZ_TIMESERIES_v4_6HRS_LOCAL_TABLENAME = "time_series_v4_6hrs"
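For orientation, a sketch (not part of this diff) of how these constants are typically combined on the read path; queries go to the distributed_* tables, while the *_LOCAL_* names identify the per-shard tables used in DDL and migrations:

import (
	"fmt"

	"go.signoz.io/signoz/pkg/query-service/constants"
)

func spanIndexV3Table() string {
	// fully qualified name of the distributed span index, e.g. "signoz_traces.distributed_signoz_index_v3"
	return fmt.Sprintf("%s.%s", constants.SIGNOZ_TRACE_DBNAME, constants.SIGNOZ_SPAN_INDEX_V3)
}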

View File

@@ -183,7 +183,7 @@ func PrepareFilters(labels map[string]string, whereClauseItems []v3.FilterItem,
 			var attrFound bool

 			// as of now this logic will only apply for logs
-			for _, tKey := range utils.GenerateLogEnrichmentKeys(v3.AttributeKey{Key: key}) {
+			for _, tKey := range utils.GenerateEnrichmentKeys(v3.AttributeKey{Key: key}) {
 				if val, ok := keys[tKey]; ok {
 					attributeKey = val
 					attrFound = true

View File

@@ -39,6 +39,7 @@ func main() {
 	var disableRules bool
 	var useLogsNewSchema bool
+	var useTraceNewSchema bool
 	// the url used to build link in the alert messages in slack and other systems
 	var ruleRepoURL, cacheConfigPath, fluxInterval string
 	var cluster string
@@ -50,6 +51,7 @@ func main() {
 	var dialTimeout time.Duration
 	flag.BoolVar(&useLogsNewSchema, "use-logs-new-schema", false, "use logs_v2 schema for logs")
+	flag.BoolVar(&useTraceNewSchema, "use-trace-new-schema", false, "use new schema for traces")
 	flag.StringVar(&promConfigPath, "config", "./config/prometheus.yml", "(prometheus config to read metrics)")
 	flag.StringVar(&skipTopLvlOpsPath, "skip-top-level-ops", "", "(config file to skip top level operations)")
 	flag.BoolVar(&disableRules, "rules.disable", false, "(disable rule evaluation)")
@@ -87,6 +89,7 @@ func main() {
 		FluxInterval:     fluxInterval,
 		Cluster:          cluster,
 		UseLogsNewSchema: useLogsNewSchema,
+		UseTraceNewSchema: useTraceNewSchema,
 	}

 	// Read the jwt secret key

View File

@@ -269,6 +269,32 @@ type SearchSpanResponseItem struct {
 	SpanKind         string            `json:"spanKind"`
 }

+type SearchSpanResponseItemV2 struct {
+	TimeUnixNano      time.Time          `json:"timestamp" ch:"timestamp"`
+	DurationNano      uint64             `json:"durationNano" ch:"durationNano"`
+	SpanID            string             `json:"spanId" ch:"spanID"`
+	TraceID           string             `json:"traceId" ch:"traceID"`
+	HasError          bool               `json:"hasError" ch:"hasError"`
+	Kind              int8               `json:"kind" ch:"kind"`
+	ServiceName       string             `json:"serviceName" ch:"serviceName"`
+	Name              string             `json:"name" ch:"name"`
+	References        string             `json:"references,omitempty" ch:"references"`
+	Attributes_string map[string]string  `json:"attributes_string" ch:"attributes_string"`
+	Attributes_number map[string]float64 `json:"attributes_number" ch:"attributes_number"`
+	Attributes_bool   map[string]bool    `json:"attributes_bool" ch:"attributes_bool"`
+	Events            []string           `json:"event" ch:"events"`
+	StatusMessage     string             `json:"statusMessage" ch:"statusMessage"`
+	StatusCodeString  string             `json:"statusCodeString" ch:"statusCodeString"`
+	SpanKind          string             `json:"spanKind" ch:"spanKind"`
+}
+
+type TraceSummary struct {
+	TraceID  string    `json:"traceId" ch:"trace_id"`
+	Start    time.Time `json:"start" ch:"start"`
+	End      time.Time `json:"end" ch:"end"`
+	NumSpans uint64    `json:"numSpans" ch:"num_spans"`
+}
+
 type OtelSpanRef struct {
 	TraceId string `json:"traceId,omitempty"`
 	SpanId  string `json:"spanId,omitempty"`
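The ch tags on the new structs are what clickhouse-go (v2) uses to map result columns onto fields when scanning. A hedged usage sketch; the summary table name and query text are assumptions, not taken from this diff:

// sketch: scanning a trace summary row straight into the ch-tagged struct
var summary model.TraceSummary
err := conn.QueryRow(ctx,
	"SELECT trace_id, start, end, num_spans FROM signoz_traces.trace_summary WHERE trace_id = ?",
	traceID,
).ScanStruct(&summary)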

View File

@@ -463,9 +463,9 @@ func (r *BaseRule) ShouldAlert(series v3.Series) (Sample, bool) {
 			}
 		} else if r.compareOp() == ValueOutsideBounds {
 			for _, smpl := range series.Points {
-				if math.Abs(smpl.Value) >= r.targetVal() {
+				if math.Abs(smpl.Value) < r.targetVal() {
 					alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls}
-					shouldAlert = true
+					shouldAlert = false
 					break
 				}
 			}
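Read in isolation, this hunk inverts the ValueOutsideBounds check: the old loop fired as soon as any point had |value| >= target, while the fixed loop clears the alert as soon as any point falls back inside the bounds, i.e. an all-points-outside test (assuming shouldAlert is initialized to true before this loop, which the hunk does not show). A standalone sketch of the difference, with made-up values:

package main

import (
	"fmt"
	"math"
)

func main() {
	target := 10.0
	points := []float64{12, 9, 15} // 9 is back inside the bounds

	// old: fire if ANY point is outside the bounds -> would alert on 12
	// new: assume alert, clear it on the first in-bounds point -> no alert
	shouldAlert := true
	for _, v := range points {
		if math.Abs(v) < target {
			shouldAlert = false
			break
		}
	}
	fmt.Println(shouldAlert) // false
}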

View File

@@ -35,7 +35,8 @@ type PrepareTaskOptions struct {
 	ManagerOpts *ManagerOptions
 	NotifyFunc  NotifyFunc
 	UseLogsNewSchema bool
+	UseTraceNewSchema bool
 }
@@ -48,7 +49,8 @@ type PrepareTestRuleOptions struct {
 	ManagerOpts *ManagerOptions
 	NotifyFunc  NotifyFunc
 	UseLogsNewSchema bool
+	UseTraceNewSchema bool
 }

 const taskNamesuffix = "webAppEditor"
@@ -91,9 +93,9 @@ type ManagerOptions struct {
 	PrepareTaskFunc func(opts PrepareTaskOptions) (Task, error)
+	UseLogsNewSchema  bool
+	UseTraceNewSchema bool
 	PrepareTestRuleFunc func(opts PrepareTestRuleOptions) (int, *model.ApiError)
-	UseLogsNewSchema bool
 }

 // The Manager manages recording and alerting rules.
@@ -117,7 +119,8 @@ type Manager struct {
 	prepareTaskFunc     func(opts PrepareTaskOptions) (Task, error)
 	prepareTestRuleFunc func(opts PrepareTestRuleOptions) (int, *model.ApiError)
 	UseLogsNewSchema    bool
+	UseTraceNewSchema   bool
 }

 func defaultOptions(o *ManagerOptions) *ManagerOptions {
@@ -156,6 +159,7 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
 			opts.FF,
 			opts.Reader,
 			opts.UseLogsNewSchema,
+			opts.UseTraceNewSchema,
 			WithEvalDelay(opts.ManagerOpts.EvalDelay),
 		)
@@ -368,7 +372,8 @@ func (m *Manager) editTask(rule *PostableRule, taskName string) error {
 		ManagerOpts:      m.opts,
 		NotifyFunc:       m.prepareNotifyFunc(),
 		UseLogsNewSchema: m.opts.UseLogsNewSchema,
+		UseTraceNewSchema: m.opts.UseTraceNewSchema,
 	})

 	if err != nil {
@@ -490,7 +495,8 @@ func (m *Manager) addTask(rule *PostableRule, taskName string) error {
 		ManagerOpts:      m.opts,
 		NotifyFunc:       m.prepareNotifyFunc(),
 		UseLogsNewSchema: m.opts.UseLogsNewSchema,
+		UseTraceNewSchema: m.opts.UseTraceNewSchema,
 	})

 	if err != nil {
@@ -809,15 +815,16 @@ func (m *Manager) TestNotification(ctx context.Context, ruleStr string) (int, *m
 	}

 	alertCount, apiErr := m.prepareTestRuleFunc(PrepareTestRuleOptions{
 		Rule:             parsedRule,
 		RuleDB:           m.ruleDB,
 		Logger:           m.logger,
 		Reader:           m.reader,
 		Cache:            m.cache,
 		FF:               m.featureFlags,
 		ManagerOpts:      m.opts,
 		NotifyFunc:       m.prepareNotifyFunc(),
 		UseLogsNewSchema: m.opts.UseLogsNewSchema,
+		UseTraceNewSchema: m.opts.UseTraceNewSchema,
 	})

 	return alertCount, apiErr

View File

@@ -49,6 +49,7 @@ func defaultTestNotification(opts PrepareTestRuleOptions) (int, *model.ApiError)
 		opts.FF,
 		opts.Reader,
 		opts.UseLogsNewSchema,
+		opts.UseTraceNewSchema,
 		WithSendAlways(),
 		WithSendUnmatched(),
 	)

View File

@@ -58,6 +58,7 @@ func NewThresholdRule(
 	featureFlags interfaces.FeatureLookup,
 	reader interfaces.Reader,
 	useLogsNewSchema bool,
+	useTraceNewSchema bool,
 	opts ...RuleOption,
 ) (*ThresholdRule, error) {
@@ -74,19 +75,21 @@ func NewThresholdRule(
 	}

 	querierOption := querier.QuerierOptions{
 		Reader:           reader,
 		Cache:            nil,
 		KeyGenerator:     queryBuilder.NewKeyGenerator(),
 		FeatureLookup:    featureFlags,
 		UseLogsNewSchema: useLogsNewSchema,
+		UseTraceNewSchema: useTraceNewSchema,
 	}

 	querierOptsV2 := querierV2.QuerierOptions{
 		Reader:           reader,
 		Cache:            nil,
 		KeyGenerator:     queryBuilder.NewKeyGenerator(),
 		FeatureLookup:    featureFlags,
 		UseLogsNewSchema: useLogsNewSchema,
+		UseTraceNewSchema: useTraceNewSchema,
 	}

 	t.querier = querier.NewQuerier(querierOption)

View File

@@ -791,7 +791,7 @@ func TestThresholdRuleShouldAlert(t *testing.T) {
 		postableRule.RuleCondition.MatchType = MatchType(c.matchType)
 		postableRule.RuleCondition.Target = &c.target
-		rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, WithEvalDelay(2*time.Minute))
+		rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, true, WithEvalDelay(2*time.Minute))
 		if err != nil {
 			assert.NoError(t, err)
 		}
@@ -880,7 +880,7 @@ func TestPrepareLinksToLogs(t *testing.T) {
 	}
 	fm := featureManager.StartManager()
-	rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, WithEvalDelay(2*time.Minute))
+	rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, true, WithEvalDelay(2*time.Minute))
 	if err != nil {
 		assert.NoError(t, err)
 	}
@@ -922,7 +922,7 @@ func TestPrepareLinksToTraces(t *testing.T) {
 	}
 	fm := featureManager.StartManager()
-	rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, WithEvalDelay(2*time.Minute))
+	rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, true, WithEvalDelay(2*time.Minute))
 	if err != nil {
 		assert.NoError(t, err)
 	}
@@ -998,7 +998,7 @@ func TestThresholdRuleLabelNormalization(t *testing.T) {
 		postableRule.RuleCondition.MatchType = MatchType(c.matchType)
 		postableRule.RuleCondition.Target = &c.target
-		rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, WithEvalDelay(2*time.Minute))
+		rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, true, WithEvalDelay(2*time.Minute))
 		if err != nil {
 			assert.NoError(t, err)
 		}
@@ -1051,7 +1051,7 @@ func TestThresholdRuleEvalDelay(t *testing.T) {
 	fm := featureManager.StartManager()
 	for idx, c := range cases {
-		rule, err := NewThresholdRule("69", &postableRule, fm, nil, true) // no eval delay
+		rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, true) // no eval delay
 		if err != nil {
 			assert.NoError(t, err)
 		}
@@ -1100,7 +1100,7 @@ func TestThresholdRuleClickHouseTmpl(t *testing.T) {
 	fm := featureManager.StartManager()
 	for idx, c := range cases {
-		rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, WithEvalDelay(2*time.Minute))
+		rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, true, WithEvalDelay(2*time.Minute))
 		if err != nil {
 			assert.NoError(t, err)
 		}
@@ -1241,9 +1241,9 @@ func TestThresholdRuleUnitCombinations(t *testing.T) {
 	}
 	options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace")
-	reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true)
+	reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true)
-	rule, err := NewThresholdRule("69", &postableRule, fm, reader, true)
+	rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
 	rule.TemporalityMap = map[string]map[v3.Temporality]bool{
 		"signoz_calls_total": {
 			v3.Delta: true,
@@ -1340,9 +1340,9 @@ func TestThresholdRuleNoData(t *testing.T) {
 	}
 	options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace")
-	reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true)
+	reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true)
-	rule, err := NewThresholdRule("69", &postableRule, fm, reader, true)
+	rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
 	rule.TemporalityMap = map[string]map[v3.Temporality]bool{
 		"signoz_calls_total": {
 			v3.Delta: true,
@@ -1445,9 +1445,9 @@ func TestThresholdRuleTracesLink(t *testing.T) {
 	}
 	options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace")
-	reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true)
+	reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true)
-	rule, err := NewThresholdRule("69", &postableRule, fm, reader, true)
+	rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
 	rule.TemporalityMap = map[string]map[v3.Temporality]bool{
 		"signoz_calls_total": {
 			v3.Delta: true,
@@ -1570,9 +1570,9 @@ func TestThresholdRuleLogsLink(t *testing.T) {
 	}
 	options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace")
-	reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true)
+	reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true)
-	rule, err := NewThresholdRule("69", &postableRule, fm, reader, true)
+	rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
 	rule.TemporalityMap = map[string]map[v3.Temporality]bool{
 		"signoz_calls_total": {
 			v3.Delta: true,
@@ -1648,7 +1648,7 @@ func TestThresholdRuleShiftBy(t *testing.T) {
 		},
 	}
-	rule, err := NewThresholdRule("69", &postableRule, nil, nil, true)
+	rule, err := NewThresholdRule("69", &postableRule, nil, nil, true, true)
 	if err != nil {
 		assert.NoError(t, err)
 	}

View File

@@ -46,6 +46,7 @@ func NewMockClickhouseReader(
 		featureFlags,
 		"",
 		true,
+		true,
 	)

 	return reader, mockDB

View File

@@ -9,7 +9,7 @@ type LogsListTsRange struct {
 	End   int64
 }

-func GetLogsListTsRanges(start, end int64) []LogsListTsRange {
+func GetListTsRanges(start, end int64) []LogsListTsRange {
 	startNano := GetEpochNanoSecs(start)
 	endNano := GetEpochNanoSecs(end)
 	result := []LogsListTsRange{}
@@ -35,13 +35,15 @@ func GetLogsListTsRanges(start, end int64) []LogsListTsRange {
 				tStartNano = startNano
 			}
 		}
+	} else {
+		result = append(result, LogsListTsRange{Start: startNano, End: endNano})
 	}
 	return result
 }

 // This tries to see all possible fields that it can fall back to if some meta is missing
-// check Test_GenerateLogEnrichmentKeys for example
-func GenerateLogEnrichmentKeys(field v3.AttributeKey) []string {
+// check Test_GenerateEnrichmentKeys for example
+func GenerateEnrichmentKeys(field v3.AttributeKey) []string {
 	names := []string{}
 	if field.Type != v3.AttributeKeyTypeUnspecified && field.DataType != v3.AttributeKeyDataTypeUnspecified {
 		names = append(names, field.Key+"##"+field.Type.String()+"##"+field.DataType.String())
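A minimal sketch exercising the two renamed helpers (the v3 import path appears in the test file below; the utils path is inferred; expected outputs follow the updated test expectations and the code above):

package main

import (
	"fmt"

	v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
	"go.signoz.io/signoz/pkg/query-service/utils"
)

func main() {
	// a sub-hour range now comes back as a single window instead of an empty slice
	ranges := utils.GetListTsRanges(1722262800000000000, 1722263800000000000)
	fmt.Println(ranges) // [{1722262800000000000 1722263800000000000}]

	// a fully specified key yields its most specific enrichment candidate first
	keys := utils.GenerateEnrichmentKeys(v3.AttributeKey{
		Key:      "user_id",
		Type:     v3.AttributeKeyTypeTag,
		DataType: v3.AttributeKeyDataTypeString,
	})
	fmt.Println(keys) // most specific candidate first: "user_id##tag##string"
}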

View File

@@ -7,7 +7,7 @@ import (
 	v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
 )

-func TestLogsListTsRange(t *testing.T) {
+func TestListTsRange(t *testing.T) {
 	startEndData := []struct {
 		name  string
 		start int64
@@ -18,7 +18,9 @@ func TestLogsListTsRange(t *testing.T) {
 			name:  "testing for less then one hour",
 			start: 1722262800000000000, // July 29, 2024 7:50:00 PM
 			end:   1722263800000000000, // July 29, 2024 8:06:40 PM
-			res:   []LogsListTsRange{},
+			res: []LogsListTsRange{
+				{1722262800000000000, 1722263800000000000},
+			},
 		},
 		{
 			name: "testing for more than one hour",
@@ -44,7 +46,7 @@ func TestLogsListTsRange(t *testing.T) {
 	}

 	for _, test := range startEndData {
-		res := GetLogsListTsRanges(test.start, test.end)
+		res := GetListTsRanges(test.start, test.end)
 		for i, v := range res {
 			if test.res[i].Start != v.Start || test.res[i].End != v.End {
 				t.Errorf("expected range was %v - %v, got %v - %v", v.Start, v.End, test.res[i].Start, test.res[i].End)
@@ -53,7 +55,7 @@ func TestLogsListTsRange(t *testing.T) {
 		}
 	}
 }

-func Test_GenerateLogEnrichmentKeys(t *testing.T) {
+func Test_GenerateEnrichmentKeys(t *testing.T) {
 	type args struct {
 		field v3.AttributeKey
 	}
@@ -96,8 +98,8 @@ func Test_GenerateLogEnrichmentKeys(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			if got := GenerateLogEnrichmentKeys(tt.args.field); !reflect.DeepEqual(got, tt.want) {
-				t.Errorf("generateLogEnrichmentKeys() = %v, want %v", got, tt.want)
+			if got := GenerateEnrichmentKeys(tt.args.field); !reflect.DeepEqual(got, tt.want) {
+				t.Errorf("generateEnrichmentKeys() = %v, want %v", got, tt.want)
 			}
 		})
 	}