Compare commits
37 Commits
refactor/q
...
v0.56.0-43
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
43e1fe6863 | ||
|
|
a7fa0bb4e4 | ||
|
|
252e0b698e | ||
|
|
f64285b89d | ||
|
|
93849ea850 | ||
|
|
471bd684c8 | ||
|
|
16d538e1ba | ||
|
|
549485bbe9 | ||
|
|
b843661097 | ||
|
|
80eda3c805 | ||
|
|
bb6f027b21 | ||
|
|
0418bfff0e | ||
|
|
aee3ca4fb1 | ||
|
|
09ff359610 | ||
|
|
c5c648748e | ||
|
|
f410355088 | ||
|
|
4bd531ce08 | ||
|
|
895856fa04 | ||
|
|
753eb0847e | ||
|
|
25020edfb6 | ||
|
|
6335d5eb22 | ||
|
|
e5d425f06e | ||
|
|
aeeb77bbc1 | ||
|
|
fa6fda0497 | ||
|
|
bb41435a20 | ||
|
|
dd23e4ebf7 | ||
|
|
16a7717598 | ||
|
|
4749ec18bc | ||
|
|
1487820750 | ||
|
|
fd09f57f76 | ||
|
|
9bc7c8708a | ||
|
|
2115093876 | ||
|
|
33f4d8306d | ||
|
|
dbf5f8b77a | ||
|
|
bfc46790bb | ||
|
|
7a011f3460 | ||
|
|
2c30e1493f |
@@ -38,9 +38,10 @@ type APIHandlerOptions struct {
|
|||||||
Cache cache.Cache
|
Cache cache.Cache
|
||||||
Gateway *httputil.ReverseProxy
|
Gateway *httputil.ReverseProxy
|
||||||
// Querier Influx Interval
|
// Querier Influx Interval
|
||||||
FluxInterval time.Duration
|
FluxInterval time.Duration
|
||||||
UseLogsNewSchema bool
|
UseLogsNewSchema bool
|
||||||
UseLicensesV3 bool
|
UseTraceNewSchema bool
|
||||||
|
UseLicensesV3 bool
|
||||||
}
|
}
|
||||||
|
|
||||||
type APIHandler struct {
|
type APIHandler struct {
|
||||||
@@ -66,6 +67,7 @@ func NewAPIHandler(opts APIHandlerOptions) (*APIHandler, error) {
|
|||||||
Cache: opts.Cache,
|
Cache: opts.Cache,
|
||||||
FluxInterval: opts.FluxInterval,
|
FluxInterval: opts.FluxInterval,
|
||||||
UseLogsNewSchema: opts.UseLogsNewSchema,
|
UseLogsNewSchema: opts.UseLogsNewSchema,
|
||||||
|
UseTraceNewSchema: opts.UseTraceNewSchema,
|
||||||
UseLicensesV3: opts.UseLicensesV3,
|
UseLicensesV3: opts.UseLicensesV3,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|||||||
@@ -2,32 +2,31 @@ package api
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
"go.signoz.io/signoz/ee/query-service/app/db"
|
|
||||||
"go.signoz.io/signoz/ee/query-service/model"
|
|
||||||
baseapp "go.signoz.io/signoz/pkg/query-service/app"
|
|
||||||
basemodel "go.signoz.io/signoz/pkg/query-service/model"
|
|
||||||
"go.uber.org/zap"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func (ah *APIHandler) searchTraces(w http.ResponseWriter, r *http.Request) {
|
func (ah *APIHandler) searchTraces(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|
||||||
if !ah.CheckFeature(basemodel.SmartTraceDetail) {
|
ah.APIHandler.SearchTraces(w, r)
|
||||||
zap.L().Info("SmartTraceDetail feature is not enabled in this plan")
|
return
|
||||||
ah.APIHandler.SearchTraces(w, r)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
searchTracesParams, err := baseapp.ParseSearchTracesParams(r)
|
|
||||||
if err != nil {
|
|
||||||
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, "Error reading params")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
result, err := ah.opts.DataConnector.SearchTraces(r.Context(), searchTracesParams, db.SmartTraceAlgorithm)
|
// This is commented since this will be taken care by new trace API
|
||||||
if ah.HandleError(w, err, http.StatusBadRequest) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
ah.WriteJSON(w, r, result)
|
// if !ah.CheckFeature(basemodel.SmartTraceDetail) {
|
||||||
|
// zap.L().Info("SmartTraceDetail feature is not enabled in this plan")
|
||||||
|
// ah.APIHandler.SearchTraces(w, r)
|
||||||
|
// return
|
||||||
|
// }
|
||||||
|
// searchTracesParams, err := baseapp.ParseSearchTracesParams(r)
|
||||||
|
// if err != nil {
|
||||||
|
// RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, "Error reading params")
|
||||||
|
// return
|
||||||
|
// }
|
||||||
|
|
||||||
|
// result, err := ah.opts.DataConnector.SearchTraces(r.Context(), searchTracesParams, db.SmartTraceAlgorithm)
|
||||||
|
// if ah.HandleError(w, err, http.StatusBadRequest) {
|
||||||
|
// return
|
||||||
|
// }
|
||||||
|
|
||||||
|
// ah.WriteJSON(w, r, result)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -26,8 +26,9 @@ func NewDataConnector(
|
|||||||
dialTimeout time.Duration,
|
dialTimeout time.Duration,
|
||||||
cluster string,
|
cluster string,
|
||||||
useLogsNewSchema bool,
|
useLogsNewSchema bool,
|
||||||
|
useTraceNewSchema bool,
|
||||||
) *ClickhouseReader {
|
) *ClickhouseReader {
|
||||||
ch := basechr.NewReader(localDB, promConfigPath, lm, maxIdleConns, maxOpenConns, dialTimeout, cluster, useLogsNewSchema)
|
ch := basechr.NewReader(localDB, promConfigPath, lm, maxIdleConns, maxOpenConns, dialTimeout, cluster, useLogsNewSchema, useTraceNewSchema)
|
||||||
return &ClickhouseReader{
|
return &ClickhouseReader{
|
||||||
conn: ch.GetConn(),
|
conn: ch.GetConn(),
|
||||||
appdb: localDB,
|
appdb: localDB,
|
||||||
|
|||||||
@@ -77,6 +77,7 @@ type ServerOptions struct {
|
|||||||
Cluster string
|
Cluster string
|
||||||
GatewayUrl string
|
GatewayUrl string
|
||||||
UseLogsNewSchema bool
|
UseLogsNewSchema bool
|
||||||
|
UseTraceNewSchema bool
|
||||||
UseLicensesV3 bool
|
UseLicensesV3 bool
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -156,6 +157,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
|||||||
serverOptions.DialTimeout,
|
serverOptions.DialTimeout,
|
||||||
serverOptions.Cluster,
|
serverOptions.Cluster,
|
||||||
serverOptions.UseLogsNewSchema,
|
serverOptions.UseLogsNewSchema,
|
||||||
|
serverOptions.UseTraceNewSchema,
|
||||||
)
|
)
|
||||||
go qb.Start(readerReady)
|
go qb.Start(readerReady)
|
||||||
reader = qb
|
reader = qb
|
||||||
@@ -189,6 +191,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
|||||||
serverOptions.DisableRules,
|
serverOptions.DisableRules,
|
||||||
lm,
|
lm,
|
||||||
serverOptions.UseLogsNewSchema,
|
serverOptions.UseLogsNewSchema,
|
||||||
|
serverOptions.UseTraceNewSchema,
|
||||||
)
|
)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -270,6 +273,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
|||||||
FluxInterval: fluxInterval,
|
FluxInterval: fluxInterval,
|
||||||
Gateway: gatewayProxy,
|
Gateway: gatewayProxy,
|
||||||
UseLogsNewSchema: serverOptions.UseLogsNewSchema,
|
UseLogsNewSchema: serverOptions.UseLogsNewSchema,
|
||||||
|
UseTraceNewSchema: serverOptions.UseTraceNewSchema,
|
||||||
UseLicensesV3: serverOptions.UseLicensesV3,
|
UseLicensesV3: serverOptions.UseLicensesV3,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -737,7 +741,8 @@ func makeRulesManager(
|
|||||||
cache cache.Cache,
|
cache cache.Cache,
|
||||||
disableRules bool,
|
disableRules bool,
|
||||||
fm baseint.FeatureLookup,
|
fm baseint.FeatureLookup,
|
||||||
useLogsNewSchema bool) (*baserules.Manager, error) {
|
useLogsNewSchema bool,
|
||||||
|
useTraceNewSchema bool) (*baserules.Manager, error) {
|
||||||
|
|
||||||
// create engine
|
// create engine
|
||||||
pqle, err := pqle.FromConfigPath(promConfigPath)
|
pqle, err := pqle.FromConfigPath(promConfigPath)
|
||||||
@@ -767,8 +772,9 @@ func makeRulesManager(
|
|||||||
EvalDelay: baseconst.GetEvalDelay(),
|
EvalDelay: baseconst.GetEvalDelay(),
|
||||||
|
|
||||||
PrepareTaskFunc: rules.PrepareTaskFunc,
|
PrepareTaskFunc: rules.PrepareTaskFunc,
|
||||||
PrepareTestRuleFunc: rules.TestNotification,
|
|
||||||
UseLogsNewSchema: useLogsNewSchema,
|
UseLogsNewSchema: useLogsNewSchema,
|
||||||
|
UseTraceNewSchema: useTraceNewSchema,
|
||||||
|
PrepareTestRuleFunc: rules.TestNotification,
|
||||||
}
|
}
|
||||||
|
|
||||||
// create Manager
|
// create Manager
|
||||||
|
|||||||
@@ -94,6 +94,7 @@ func main() {
|
|||||||
var cluster string
|
var cluster string
|
||||||
|
|
||||||
var useLogsNewSchema bool
|
var useLogsNewSchema bool
|
||||||
|
var useTraceNewSchema bool
|
||||||
var useLicensesV3 bool
|
var useLicensesV3 bool
|
||||||
var cacheConfigPath, fluxInterval string
|
var cacheConfigPath, fluxInterval string
|
||||||
var enableQueryServiceLogOTLPExport bool
|
var enableQueryServiceLogOTLPExport bool
|
||||||
@@ -105,6 +106,7 @@ func main() {
|
|||||||
var gatewayUrl string
|
var gatewayUrl string
|
||||||
|
|
||||||
flag.BoolVar(&useLogsNewSchema, "use-logs-new-schema", false, "use logs_v2 schema for logs")
|
flag.BoolVar(&useLogsNewSchema, "use-logs-new-schema", false, "use logs_v2 schema for logs")
|
||||||
|
flag.BoolVar(&useTraceNewSchema, "use-trace-new-schema", false, "use new schema for traces")
|
||||||
flag.BoolVar(&useLicensesV3, "use-licenses-v3", false, "use licenses_v3 schema for licenses")
|
flag.BoolVar(&useLicensesV3, "use-licenses-v3", false, "use licenses_v3 schema for licenses")
|
||||||
flag.StringVar(&promConfigPath, "config", "./config/prometheus.yml", "(prometheus config to read metrics)")
|
flag.StringVar(&promConfigPath, "config", "./config/prometheus.yml", "(prometheus config to read metrics)")
|
||||||
flag.StringVar(&skipTopLvlOpsPath, "skip-top-level-ops", "", "(config file to skip top level operations)")
|
flag.StringVar(&skipTopLvlOpsPath, "skip-top-level-ops", "", "(config file to skip top level operations)")
|
||||||
@@ -145,6 +147,7 @@ func main() {
|
|||||||
Cluster: cluster,
|
Cluster: cluster,
|
||||||
GatewayUrl: gatewayUrl,
|
GatewayUrl: gatewayUrl,
|
||||||
UseLogsNewSchema: useLogsNewSchema,
|
UseLogsNewSchema: useLogsNewSchema,
|
||||||
|
UseTraceNewSchema: useTraceNewSchema,
|
||||||
UseLicensesV3: useLicensesV3,
|
UseLicensesV3: useLicensesV3,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -61,6 +61,11 @@ func NewAnomalyRule(
|
|||||||
|
|
||||||
zap.L().Info("creating new AnomalyRule", zap.String("id", id), zap.Any("opts", opts))
|
zap.L().Info("creating new AnomalyRule", zap.String("id", id), zap.Any("opts", opts))
|
||||||
|
|
||||||
|
if p.RuleCondition.CompareOp == baserules.ValueIsBelow {
|
||||||
|
target := -1 * *p.RuleCondition.Target
|
||||||
|
p.RuleCondition.Target = &target
|
||||||
|
}
|
||||||
|
|
||||||
baseRule, err := baserules.NewBaseRule(id, p, reader, opts...)
|
baseRule, err := baserules.NewBaseRule(id, p, reader, opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|||||||
@@ -26,6 +26,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
|
|||||||
opts.FF,
|
opts.FF,
|
||||||
opts.Reader,
|
opts.Reader,
|
||||||
opts.UseLogsNewSchema,
|
opts.UseLogsNewSchema,
|
||||||
|
opts.UseTraceNewSchema,
|
||||||
baserules.WithEvalDelay(opts.ManagerOpts.EvalDelay),
|
baserules.WithEvalDelay(opts.ManagerOpts.EvalDelay),
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -122,6 +123,7 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, *basemodel.Ap
|
|||||||
opts.FF,
|
opts.FF,
|
||||||
opts.Reader,
|
opts.Reader,
|
||||||
opts.UseLogsNewSchema,
|
opts.UseLogsNewSchema,
|
||||||
|
opts.UseTraceNewSchema,
|
||||||
baserules.WithSendAlways(),
|
baserules.WithSendAlways(),
|
||||||
baserules.WithSendUnmatched(),
|
baserules.WithSendUnmatched(),
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -102,9 +102,9 @@ function RuleOptions({
|
|||||||
<Select.Option value="4">{t('option_notequal')}</Select.Option>
|
<Select.Option value="4">{t('option_notequal')}</Select.Option>
|
||||||
</>
|
</>
|
||||||
)}
|
)}
|
||||||
|
{/* the value 5 and 6 are reserved for above or equal and below or equal */}
|
||||||
{ruleType === 'anomaly_rule' && (
|
{ruleType === 'anomaly_rule' && (
|
||||||
<Select.Option value="5">{t('option_above_below')}</Select.Option>
|
<Select.Option value="7">{t('option_above_below')}</Select.Option>
|
||||||
)}
|
)}
|
||||||
</InlineSelect>
|
</InlineSelect>
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -22,6 +22,7 @@ const (
|
|||||||
defaultTraceDB string = "signoz_traces"
|
defaultTraceDB string = "signoz_traces"
|
||||||
defaultOperationsTable string = "distributed_signoz_operations"
|
defaultOperationsTable string = "distributed_signoz_operations"
|
||||||
defaultIndexTable string = "distributed_signoz_index_v2"
|
defaultIndexTable string = "distributed_signoz_index_v2"
|
||||||
|
defaultLocalIndexTable string = "signoz_index_v2"
|
||||||
defaultErrorTable string = "distributed_signoz_error_index_v2"
|
defaultErrorTable string = "distributed_signoz_error_index_v2"
|
||||||
defaultDurationTable string = "distributed_durationSort"
|
defaultDurationTable string = "distributed_durationSort"
|
||||||
defaultUsageExplorerTable string = "distributed_usage_explorer"
|
defaultUsageExplorerTable string = "distributed_usage_explorer"
|
||||||
@@ -45,6 +46,11 @@ const (
|
|||||||
defaultLogsTableV2 string = "distributed_logs_v2"
|
defaultLogsTableV2 string = "distributed_logs_v2"
|
||||||
defaultLogsResourceLocalTableV2 string = "logs_v2_resource"
|
defaultLogsResourceLocalTableV2 string = "logs_v2_resource"
|
||||||
defaultLogsResourceTableV2 string = "distributed_logs_v2_resource"
|
defaultLogsResourceTableV2 string = "distributed_logs_v2_resource"
|
||||||
|
|
||||||
|
defaultTraceIndexTableV3 string = "distributed_signoz_index_v3"
|
||||||
|
defaultTraceLocalTableName string = "signoz_index_v3"
|
||||||
|
defaultTraceResourceTableV3 string = "distributed_traces_v3_resource"
|
||||||
|
defaultTraceSummaryTable string = "distributed_trace_summary"
|
||||||
)
|
)
|
||||||
|
|
||||||
// NamespaceConfig is Clickhouse's internal configuration data
|
// NamespaceConfig is Clickhouse's internal configuration data
|
||||||
@@ -58,6 +64,7 @@ type namespaceConfig struct {
|
|||||||
TraceDB string
|
TraceDB string
|
||||||
OperationsTable string
|
OperationsTable string
|
||||||
IndexTable string
|
IndexTable string
|
||||||
|
LocalIndexTable string
|
||||||
DurationTable string
|
DurationTable string
|
||||||
UsageExplorerTable string
|
UsageExplorerTable string
|
||||||
SpansTable string
|
SpansTable string
|
||||||
@@ -82,6 +89,11 @@ type namespaceConfig struct {
|
|||||||
LogsTableV2 string
|
LogsTableV2 string
|
||||||
LogsResourceLocalTableV2 string
|
LogsResourceLocalTableV2 string
|
||||||
LogsResourceTableV2 string
|
LogsResourceTableV2 string
|
||||||
|
|
||||||
|
TraceIndexTableV3 string
|
||||||
|
TraceLocalTableNameV3 string
|
||||||
|
TraceResourceTableV3 string
|
||||||
|
TraceSummaryTable string
|
||||||
}
|
}
|
||||||
|
|
||||||
// Connecto defines how to connect to the database
|
// Connecto defines how to connect to the database
|
||||||
@@ -150,6 +162,7 @@ func NewOptions(
|
|||||||
TraceDB: defaultTraceDB,
|
TraceDB: defaultTraceDB,
|
||||||
OperationsTable: defaultOperationsTable,
|
OperationsTable: defaultOperationsTable,
|
||||||
IndexTable: defaultIndexTable,
|
IndexTable: defaultIndexTable,
|
||||||
|
LocalIndexTable: defaultLocalIndexTable,
|
||||||
ErrorTable: defaultErrorTable,
|
ErrorTable: defaultErrorTable,
|
||||||
DurationTable: defaultDurationTable,
|
DurationTable: defaultDurationTable,
|
||||||
UsageExplorerTable: defaultUsageExplorerTable,
|
UsageExplorerTable: defaultUsageExplorerTable,
|
||||||
@@ -174,6 +187,11 @@ func NewOptions(
|
|||||||
LogsLocalTableV2: defaultLogsLocalTableV2,
|
LogsLocalTableV2: defaultLogsLocalTableV2,
|
||||||
LogsResourceTableV2: defaultLogsResourceTableV2,
|
LogsResourceTableV2: defaultLogsResourceTableV2,
|
||||||
LogsResourceLocalTableV2: defaultLogsResourceLocalTableV2,
|
LogsResourceLocalTableV2: defaultLogsResourceLocalTableV2,
|
||||||
|
|
||||||
|
TraceIndexTableV3: defaultTraceIndexTableV3,
|
||||||
|
TraceLocalTableNameV3: defaultTraceLocalTableName,
|
||||||
|
TraceResourceTableV3: defaultTraceResourceTableV3,
|
||||||
|
TraceSummaryTable: defaultTraceSummaryTable,
|
||||||
},
|
},
|
||||||
others: make(map[string]*namespaceConfig, len(otherNamespaces)),
|
others: make(map[string]*namespaceConfig, len(otherNamespaces)),
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -145,9 +145,16 @@ type ClickHouseReader struct {
|
|||||||
liveTailRefreshSeconds int
|
liveTailRefreshSeconds int
|
||||||
cluster string
|
cluster string
|
||||||
|
|
||||||
useLogsNewSchema bool
|
useLogsNewSchema bool
|
||||||
|
useTraceNewSchema bool
|
||||||
|
|
||||||
logsTableName string
|
logsTableName string
|
||||||
logsLocalTableName string
|
logsLocalTableName string
|
||||||
|
|
||||||
|
traceTableName string
|
||||||
|
traceLocalTableName string
|
||||||
|
traceResourceTableV3 string
|
||||||
|
traceSummaryTable string
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewTraceReader returns a TraceReader for the database
|
// NewTraceReader returns a TraceReader for the database
|
||||||
@@ -160,6 +167,7 @@ func NewReader(
|
|||||||
dialTimeout time.Duration,
|
dialTimeout time.Duration,
|
||||||
cluster string,
|
cluster string,
|
||||||
useLogsNewSchema bool,
|
useLogsNewSchema bool,
|
||||||
|
useTraceNewSchema bool,
|
||||||
) *ClickHouseReader {
|
) *ClickHouseReader {
|
||||||
|
|
||||||
datasource := os.Getenv("ClickHouseUrl")
|
datasource := os.Getenv("ClickHouseUrl")
|
||||||
@@ -170,7 +178,7 @@ func NewReader(
|
|||||||
zap.L().Fatal("failed to initialize ClickHouse", zap.Error(err))
|
zap.L().Fatal("failed to initialize ClickHouse", zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
return NewReaderFromClickhouseConnection(db, options, localDB, configFile, featureFlag, cluster, useLogsNewSchema)
|
return NewReaderFromClickhouseConnection(db, options, localDB, configFile, featureFlag, cluster, useLogsNewSchema, useTraceNewSchema)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewReaderFromClickhouseConnection(
|
func NewReaderFromClickhouseConnection(
|
||||||
@@ -181,6 +189,7 @@ func NewReaderFromClickhouseConnection(
|
|||||||
featureFlag interfaces.FeatureLookup,
|
featureFlag interfaces.FeatureLookup,
|
||||||
cluster string,
|
cluster string,
|
||||||
useLogsNewSchema bool,
|
useLogsNewSchema bool,
|
||||||
|
useTraceNewSchema bool,
|
||||||
) *ClickHouseReader {
|
) *ClickHouseReader {
|
||||||
alertManager, err := am.New()
|
alertManager, err := am.New()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -218,6 +227,13 @@ func NewReaderFromClickhouseConnection(
|
|||||||
logsLocalTableName = options.primary.LogsLocalTableV2
|
logsLocalTableName = options.primary.LogsLocalTableV2
|
||||||
}
|
}
|
||||||
|
|
||||||
|
traceTableName := options.primary.IndexTable
|
||||||
|
traceLocalTableName := options.primary.LocalIndexTable
|
||||||
|
if useTraceNewSchema {
|
||||||
|
traceTableName = options.primary.TraceIndexTableV3
|
||||||
|
traceLocalTableName = options.primary.TraceLocalTableNameV3
|
||||||
|
}
|
||||||
|
|
||||||
return &ClickHouseReader{
|
return &ClickHouseReader{
|
||||||
db: wrap,
|
db: wrap,
|
||||||
localDB: localDB,
|
localDB: localDB,
|
||||||
@@ -245,7 +261,8 @@ func NewReaderFromClickhouseConnection(
|
|||||||
cluster: cluster,
|
cluster: cluster,
|
||||||
queryProgressTracker: queryprogress.NewQueryProgressTracker(),
|
queryProgressTracker: queryprogress.NewQueryProgressTracker(),
|
||||||
|
|
||||||
useLogsNewSchema: useLogsNewSchema,
|
useLogsNewSchema: useLogsNewSchema,
|
||||||
|
useTraceNewSchema: useTraceNewSchema,
|
||||||
|
|
||||||
logsTableV2: options.primary.LogsTableV2,
|
logsTableV2: options.primary.LogsTableV2,
|
||||||
logsLocalTableV2: options.primary.LogsLocalTableV2,
|
logsLocalTableV2: options.primary.LogsLocalTableV2,
|
||||||
@@ -253,6 +270,11 @@ func NewReaderFromClickhouseConnection(
|
|||||||
logsResourceLocalTableV2: options.primary.LogsResourceLocalTableV2,
|
logsResourceLocalTableV2: options.primary.LogsResourceLocalTableV2,
|
||||||
logsTableName: logsTableName,
|
logsTableName: logsTableName,
|
||||||
logsLocalTableName: logsLocalTableName,
|
logsLocalTableName: logsLocalTableName,
|
||||||
|
|
||||||
|
traceLocalTableName: traceLocalTableName,
|
||||||
|
traceTableName: traceTableName,
|
||||||
|
traceResourceTableV3: options.primary.TraceResourceTableV3,
|
||||||
|
traceSummaryTable: options.primary.TraceSummaryTable,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -463,9 +485,8 @@ func (r *ClickHouseReader) GetQueryRangeResult(ctx context.Context, query *model
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (r *ClickHouseReader) GetServicesList(ctx context.Context) (*[]string, error) {
|
func (r *ClickHouseReader) GetServicesList(ctx context.Context) (*[]string, error) {
|
||||||
|
|
||||||
services := []string{}
|
services := []string{}
|
||||||
query := fmt.Sprintf(`SELECT DISTINCT serviceName FROM %s.%s WHERE toDate(timestamp) > now() - INTERVAL 1 DAY`, r.TraceDB, r.indexTable)
|
query := fmt.Sprintf(`SELECT DISTINCT serviceName FROM %s.%s WHERE ts_bucket_start > (toUnixTimestamp(now()) - 1800) AND toDate(timestamp) > now() - INTERVAL 1 DAY`, r.TraceDB, r.traceLocalTableName)
|
||||||
|
|
||||||
rows, err := r.db.Query(ctx, query)
|
rows, err := r.db.Query(ctx, query)
|
||||||
|
|
||||||
@@ -574,14 +595,14 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G
|
|||||||
count(*) as numCalls
|
count(*) as numCalls
|
||||||
FROM %s.%s
|
FROM %s.%s
|
||||||
WHERE serviceName = @serviceName AND name In @names AND timestamp>= @start AND timestamp<= @end`,
|
WHERE serviceName = @serviceName AND name In @names AND timestamp>= @start AND timestamp<= @end`,
|
||||||
r.TraceDB, r.indexTable,
|
r.TraceDB, r.traceTableName,
|
||||||
)
|
)
|
||||||
errorQuery := fmt.Sprintf(
|
errorQuery := fmt.Sprintf(
|
||||||
`SELECT
|
`SELECT
|
||||||
count(*) as numErrors
|
count(*) as numErrors
|
||||||
FROM %s.%s
|
FROM %s.%s
|
||||||
WHERE serviceName = @serviceName AND name In @names AND timestamp>= @start AND timestamp<= @end AND statusCode=2`,
|
WHERE serviceName = @serviceName AND name In @names AND timestamp>= @start AND timestamp<= @end AND statusCode=2`,
|
||||||
r.TraceDB, r.indexTable,
|
r.TraceDB, r.traceTableName,
|
||||||
)
|
)
|
||||||
|
|
||||||
args := []interface{}{}
|
args := []interface{}{}
|
||||||
@@ -591,6 +612,17 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G
|
|||||||
clickhouse.Named("serviceName", svc),
|
clickhouse.Named("serviceName", svc),
|
||||||
clickhouse.Named("names", ops),
|
clickhouse.Named("names", ops),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if r.useTraceNewSchema {
|
||||||
|
bFilter := " AND ts_bucket_start >= @start_bucket AND ts_bucket_start <= @end_bucket"
|
||||||
|
query += bFilter
|
||||||
|
errorQuery += bFilter
|
||||||
|
args = append(args,
|
||||||
|
clickhouse.Named("start_bucket", strconv.FormatInt(queryParams.Start.Unix()-1800, 10)),
|
||||||
|
clickhouse.Named("end_bucket", strconv.FormatInt(queryParams.End.Unix(), 10)),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
// create TagQuery from TagQueryParams
|
// create TagQuery from TagQueryParams
|
||||||
tags := createTagQueryFromTagQueryParams(queryParams.Tags)
|
tags := createTagQueryFromTagQueryParams(queryParams.Tags)
|
||||||
subQuery, argsSubQuery, errStatus := buildQueryWithTagParams(ctx, tags)
|
subQuery, argsSubQuery, errStatus := buildQueryWithTagParams(ctx, tags)
|
||||||
@@ -673,7 +705,7 @@ func (r *ClickHouseReader) GetServiceOverview(ctx context.Context, queryParams *
|
|||||||
count(*) as numCalls
|
count(*) as numCalls
|
||||||
FROM %s.%s
|
FROM %s.%s
|
||||||
WHERE serviceName = @serviceName AND name In @names AND timestamp>= @start AND timestamp<= @end`,
|
WHERE serviceName = @serviceName AND name In @names AND timestamp>= @start AND timestamp<= @end`,
|
||||||
r.TraceDB, r.indexTable,
|
r.TraceDB, r.traceTableName,
|
||||||
)
|
)
|
||||||
args := []interface{}{}
|
args := []interface{}{}
|
||||||
args = append(args, namedArgs...)
|
args = append(args, namedArgs...)
|
||||||
@@ -704,7 +736,7 @@ func (r *ClickHouseReader) GetServiceOverview(ctx context.Context, queryParams *
|
|||||||
count(*) as numErrors
|
count(*) as numErrors
|
||||||
FROM %s.%s
|
FROM %s.%s
|
||||||
WHERE serviceName = @serviceName AND name In @names AND timestamp>= @start AND timestamp<= @end AND statusCode=2`,
|
WHERE serviceName = @serviceName AND name In @names AND timestamp>= @start AND timestamp<= @end AND statusCode=2`,
|
||||||
r.TraceDB, r.indexTable,
|
r.TraceDB, r.traceTableName,
|
||||||
)
|
)
|
||||||
args = []interface{}{}
|
args = []interface{}{}
|
||||||
args = append(args, namedArgs...)
|
args = append(args, namedArgs...)
|
||||||
@@ -841,7 +873,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode
|
|||||||
case constants.TraceID:
|
case constants.TraceID:
|
||||||
continue
|
continue
|
||||||
case constants.ServiceName:
|
case constants.ServiceName:
|
||||||
finalQuery := fmt.Sprintf("SELECT serviceName, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable)
|
finalQuery := fmt.Sprintf("SELECT serviceName, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.traceTableName)
|
||||||
finalQuery += query
|
finalQuery += query
|
||||||
finalQuery += " GROUP BY serviceName"
|
finalQuery += " GROUP BY serviceName"
|
||||||
var dBResponse []model.DBResponseServiceName
|
var dBResponse []model.DBResponseServiceName
|
||||||
@@ -858,7 +890,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
case constants.HttpRoute:
|
case constants.HttpRoute:
|
||||||
finalQuery := fmt.Sprintf("SELECT httpRoute, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable)
|
finalQuery := fmt.Sprintf("SELECT httpRoute, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.traceTableName)
|
||||||
finalQuery += query
|
finalQuery += query
|
||||||
finalQuery += " GROUP BY httpRoute"
|
finalQuery += " GROUP BY httpRoute"
|
||||||
var dBResponse []model.DBResponseHttpRoute
|
var dBResponse []model.DBResponseHttpRoute
|
||||||
@@ -875,7 +907,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
case constants.HttpUrl:
|
case constants.HttpUrl:
|
||||||
finalQuery := fmt.Sprintf("SELECT httpUrl, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable)
|
finalQuery := fmt.Sprintf("SELECT httpUrl, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.traceTableName)
|
||||||
finalQuery += query
|
finalQuery += query
|
||||||
finalQuery += " GROUP BY httpUrl"
|
finalQuery += " GROUP BY httpUrl"
|
||||||
var dBResponse []model.DBResponseHttpUrl
|
var dBResponse []model.DBResponseHttpUrl
|
||||||
@@ -892,7 +924,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
case constants.HttpMethod:
|
case constants.HttpMethod:
|
||||||
finalQuery := fmt.Sprintf("SELECT httpMethod, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable)
|
finalQuery := fmt.Sprintf("SELECT httpMethod, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.traceTableName)
|
||||||
finalQuery += query
|
finalQuery += query
|
||||||
finalQuery += " GROUP BY httpMethod"
|
finalQuery += " GROUP BY httpMethod"
|
||||||
var dBResponse []model.DBResponseHttpMethod
|
var dBResponse []model.DBResponseHttpMethod
|
||||||
@@ -909,7 +941,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
case constants.HttpHost:
|
case constants.HttpHost:
|
||||||
finalQuery := fmt.Sprintf("SELECT httpHost, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable)
|
finalQuery := fmt.Sprintf("SELECT httpHost, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.traceTableName)
|
||||||
finalQuery += query
|
finalQuery += query
|
||||||
finalQuery += " GROUP BY httpHost"
|
finalQuery += " GROUP BY httpHost"
|
||||||
var dBResponse []model.DBResponseHttpHost
|
var dBResponse []model.DBResponseHttpHost
|
||||||
@@ -926,7 +958,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
case constants.OperationRequest:
|
case constants.OperationRequest:
|
||||||
finalQuery := fmt.Sprintf("SELECT name, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable)
|
finalQuery := fmt.Sprintf("SELECT name, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.traceTableName)
|
||||||
finalQuery += query
|
finalQuery += query
|
||||||
finalQuery += " GROUP BY name"
|
finalQuery += " GROUP BY name"
|
||||||
var dBResponse []model.DBResponseOperation
|
var dBResponse []model.DBResponseOperation
|
||||||
@@ -943,7 +975,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
case constants.Status:
|
case constants.Status:
|
||||||
finalQuery := fmt.Sprintf("SELECT COUNT(*) as numTotal FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU AND hasError = true", r.TraceDB, r.indexTable)
|
finalQuery := fmt.Sprintf("SELECT COUNT(*) as numTotal FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU AND hasError = true", r.TraceDB, r.traceTableName)
|
||||||
finalQuery += query
|
finalQuery += query
|
||||||
var dBResponse []model.DBResponseTotal
|
var dBResponse []model.DBResponseTotal
|
||||||
err := r.db.Select(ctx, &dBResponse, finalQuery, args...)
|
err := r.db.Select(ctx, &dBResponse, finalQuery, args...)
|
||||||
@@ -954,7 +986,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode
|
|||||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
|
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
|
||||||
}
|
}
|
||||||
|
|
||||||
finalQuery2 := fmt.Sprintf("SELECT COUNT(*) as numTotal FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU AND hasError = false", r.TraceDB, r.indexTable)
|
finalQuery2 := fmt.Sprintf("SELECT COUNT(*) as numTotal FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU AND hasError = false", r.TraceDB, r.traceTableName)
|
||||||
finalQuery2 += query
|
finalQuery2 += query
|
||||||
var dBResponse2 []model.DBResponseTotal
|
var dBResponse2 []model.DBResponseTotal
|
||||||
err = r.db.Select(ctx, &dBResponse2, finalQuery2, args...)
|
err = r.db.Select(ctx, &dBResponse2, finalQuery2, args...)
|
||||||
@@ -979,7 +1011,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode
|
|||||||
finalQuery := ""
|
finalQuery := ""
|
||||||
if !durationSortEnabled {
|
if !durationSortEnabled {
|
||||||
// if duration sort is not enabled, we need to get the min and max duration from the index table
|
// if duration sort is not enabled, we need to get the min and max duration from the index table
|
||||||
finalQuery = fmt.Sprintf("SELECT min(durationNano) as min, max(durationNano) as max FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable)
|
finalQuery = fmt.Sprintf("SELECT min(durationNano) as min, max(durationNano) as max FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.traceTableName)
|
||||||
finalQuery += query
|
finalQuery += query
|
||||||
var dBResponse []model.DBResponseMinMax
|
var dBResponse []model.DBResponseMinMax
|
||||||
err = r.db.Select(ctx, &dBResponse, finalQuery, args...)
|
err = r.db.Select(ctx, &dBResponse, finalQuery, args...)
|
||||||
@@ -1024,7 +1056,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
case constants.RPCMethod:
|
case constants.RPCMethod:
|
||||||
finalQuery := fmt.Sprintf("SELECT rpcMethod, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable)
|
finalQuery := fmt.Sprintf("SELECT rpcMethod, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.traceTableName)
|
||||||
finalQuery += query
|
finalQuery += query
|
||||||
finalQuery += " GROUP BY rpcMethod"
|
finalQuery += " GROUP BY rpcMethod"
|
||||||
var dBResponse []model.DBResponseRPCMethod
|
var dBResponse []model.DBResponseRPCMethod
|
||||||
@@ -1042,7 +1074,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode
|
|||||||
}
|
}
|
||||||
|
|
||||||
case constants.ResponseStatusCode:
|
case constants.ResponseStatusCode:
|
||||||
finalQuery := fmt.Sprintf("SELECT responseStatusCode, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable)
|
finalQuery := fmt.Sprintf("SELECT responseStatusCode, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.traceTableName)
|
||||||
finalQuery += query
|
finalQuery += query
|
||||||
finalQuery += " GROUP BY responseStatusCode"
|
finalQuery += " GROUP BY responseStatusCode"
|
||||||
var dBResponse []model.DBResponseStatusCodeMethod
|
var dBResponse []model.DBResponseStatusCodeMethod
|
||||||
@@ -1090,7 +1122,7 @@ func getStatusFilters(query string, statusParams []string, excludeMap map[string
|
|||||||
|
|
||||||
func (r *ClickHouseReader) GetFilteredSpans(ctx context.Context, queryParams *model.GetFilteredSpansParams) (*model.GetFilterSpansResponse, *model.ApiError) {
|
func (r *ClickHouseReader) GetFilteredSpans(ctx context.Context, queryParams *model.GetFilteredSpansParams) (*model.GetFilterSpansResponse, *model.ApiError) {
|
||||||
|
|
||||||
queryTable := fmt.Sprintf("%s.%s", r.TraceDB, r.indexTable)
|
queryTable := fmt.Sprintf("%s.%s", r.TraceDB, r.traceTableName)
|
||||||
|
|
||||||
excludeMap := make(map[string]struct{})
|
excludeMap := make(map[string]struct{})
|
||||||
for _, e := range queryParams.Exclude {
|
for _, e := range queryParams.Exclude {
|
||||||
@@ -1436,8 +1468,8 @@ func (r *ClickHouseReader) GetTagFilters(ctx context.Context, queryParams *model
|
|||||||
|
|
||||||
tagFilters := []model.TagFilters{}
|
tagFilters := []model.TagFilters{}
|
||||||
|
|
||||||
// Alternative finalQuery := fmt.Sprintf(`SELECT DISTINCT arrayJoin(tagMap.keys) as tagKeys FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU`, r.TraceDB, r.indexTable)
|
// Alternative finalQuery := fmt.Sprintf(`SELECT DISTINCT arrayJoin(tagMap.keys) as tagKeys FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU`, r.TraceDB, indexTable)
|
||||||
finalQuery := fmt.Sprintf(`SELECT groupUniqArrayArray(mapKeys(stringTagMap)) as stringTagKeys, groupUniqArrayArray(mapKeys(numberTagMap)) as numberTagKeys, groupUniqArrayArray(mapKeys(boolTagMap)) as boolTagKeys FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU`, r.TraceDB, r.indexTable)
|
finalQuery := fmt.Sprintf(`SELECT groupUniqArrayArray(mapKeys(stringTagMap)) as stringTagKeys, groupUniqArrayArray(mapKeys(numberTagMap)) as numberTagKeys, groupUniqArrayArray(mapKeys(boolTagMap)) as boolTagKeys FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU`, r.TraceDB, r.traceTableName)
|
||||||
finalQuery += query
|
finalQuery += query
|
||||||
err := r.db.Select(ctx, &tagFilters, finalQuery, args...)
|
err := r.db.Select(ctx, &tagFilters, finalQuery, args...)
|
||||||
|
|
||||||
@@ -1548,7 +1580,7 @@ func (r *ClickHouseReader) GetTagValues(ctx context.Context, queryParams *model.
|
|||||||
|
|
||||||
tagValues := []model.TagValues{}
|
tagValues := []model.TagValues{}
|
||||||
|
|
||||||
finalQuery := fmt.Sprintf(`SELECT groupArray(DISTINCT stringTagMap[@key]) as stringTagValues FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU`, r.TraceDB, r.indexTable)
|
finalQuery := fmt.Sprintf(`SELECT groupArray(DISTINCT attributes_string[@key]) as stringTagValues FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU`, r.TraceDB, r.traceTableName)
|
||||||
finalQuery += query
|
finalQuery += query
|
||||||
finalQuery += " LIMIT @limit"
|
finalQuery += " LIMIT @limit"
|
||||||
|
|
||||||
@@ -1599,7 +1631,7 @@ func (r *ClickHouseReader) GetTopOperations(ctx context.Context, queryParams *mo
|
|||||||
name
|
name
|
||||||
FROM %s.%s
|
FROM %s.%s
|
||||||
WHERE serviceName = @serviceName AND timestamp>= @start AND timestamp<= @end`,
|
WHERE serviceName = @serviceName AND timestamp>= @start AND timestamp<= @end`,
|
||||||
r.TraceDB, r.indexTable,
|
r.TraceDB, r.traceTableName,
|
||||||
)
|
)
|
||||||
args := []interface{}{}
|
args := []interface{}{}
|
||||||
args = append(args, namedArgs...)
|
args = append(args, namedArgs...)
|
||||||
@@ -1666,10 +1698,137 @@ func (r *ClickHouseReader) GetUsage(ctx context.Context, queryParams *model.GetU
|
|||||||
return &usageItems, nil
|
return &usageItems, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *ClickHouseReader) SearchTracesV2(ctx context.Context, params *model.SearchTracesParams) (*[]model.SearchSpansResult, error) {
|
||||||
|
searchSpansResult := []model.SearchSpansResult{
|
||||||
|
{
|
||||||
|
Columns: []string{"__time", "SpanId", "TraceId", "ServiceName", "Name", "Kind", "DurationNano", "TagsKeys", "TagsValues", "References", "Events", "HasError", "StatusMessage", "StatusCodeString", "SpanKind"},
|
||||||
|
IsSubTree: false,
|
||||||
|
Events: make([][]interface{}, 0),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
var traceSummary model.TraceSummary
|
||||||
|
summaryQuery := fmt.Sprintf("SELECT * from %s.%s WHERE traceID=$1", r.TraceDB, r.traceSummaryTable)
|
||||||
|
err := r.db.QueryRow(ctx, summaryQuery, params.TraceID).Scan(&traceSummary.TraceID, &traceSummary.Start, &traceSummary.End, &traceSummary.NumSpans)
|
||||||
|
if err != nil {
|
||||||
|
if err == sql.ErrNoRows {
|
||||||
|
return &searchSpansResult, nil
|
||||||
|
}
|
||||||
|
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||||
|
return nil, fmt.Errorf("error in processing sql query")
|
||||||
|
}
|
||||||
|
|
||||||
|
if traceSummary.NumSpans > uint64(params.MaxSpansInTrace) {
|
||||||
|
zap.L().Error("Max spans allowed in a trace limit reached", zap.Int("MaxSpansInTrace", params.MaxSpansInTrace),
|
||||||
|
zap.Uint64("Count", traceSummary.NumSpans))
|
||||||
|
userEmail, err := auth.GetEmailFromJwt(ctx)
|
||||||
|
if err == nil {
|
||||||
|
data := map[string]interface{}{
|
||||||
|
"traceSize": traceSummary.NumSpans,
|
||||||
|
"maxSpansInTraceLimit": params.MaxSpansInTrace,
|
||||||
|
}
|
||||||
|
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_MAX_SPANS_ALLOWED_LIMIT_REACHED, data, userEmail, true, false)
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("max spans allowed in trace limit reached, please contact support for more details")
|
||||||
|
}
|
||||||
|
|
||||||
|
userEmail, err := auth.GetEmailFromJwt(ctx)
|
||||||
|
if err == nil {
|
||||||
|
data := map[string]interface{}{
|
||||||
|
"traceSize": traceSummary.NumSpans,
|
||||||
|
}
|
||||||
|
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_TRACE_DETAIL_API, data, userEmail, true, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
var startTime, endTime, durationNano uint64
|
||||||
|
var searchScanResponses []model.SearchSpanResponseItemV2
|
||||||
|
|
||||||
|
query := fmt.Sprintf("SELECT timestamp, durationNano, spanID, traceID, hasError, kind, serviceName, name, references, attributes_string, events, statusMessage, statusCodeString, spanKind FROM %s.%s WHERE traceID=$1 and ts_bucket_start>=$2 and ts_bucket_start<=$3", r.TraceDB, r.traceTableName)
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
|
|
||||||
|
err = r.db.Select(ctx, &searchScanResponses, query, params.TraceID, strconv.FormatInt(traceSummary.Start.Unix()-1800, 10), strconv.FormatInt(traceSummary.End.Unix(), 10))
|
||||||
|
|
||||||
|
zap.L().Info(query)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||||
|
return nil, fmt.Errorf("error in processing sql query")
|
||||||
|
}
|
||||||
|
end := time.Now()
|
||||||
|
zap.L().Debug("getTraceSQLQuery took: ", zap.Duration("duration", end.Sub(start)))
|
||||||
|
|
||||||
|
searchSpansResult[0].Events = make([][]interface{}, len(searchScanResponses))
|
||||||
|
|
||||||
|
searchSpanResponses := []model.SearchSpanResponseItem{}
|
||||||
|
start = time.Now()
|
||||||
|
for _, item := range searchScanResponses {
|
||||||
|
ref := []model.OtelSpanRef{}
|
||||||
|
err := json.Unmarshal([]byte(item.References), &ref)
|
||||||
|
if err != nil {
|
||||||
|
zap.L().Error("Error unmarshalling references", zap.Error(err))
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// merge attributes_number and attributes_bool to attributes_string
|
||||||
|
for k, v := range item.Attributes_bool {
|
||||||
|
item.Attributes_string[k] = fmt.Sprintf("%v", v)
|
||||||
|
}
|
||||||
|
for k, v := range item.Attributes_number {
|
||||||
|
item.Attributes_string[k] = fmt.Sprintf("%v", v)
|
||||||
|
}
|
||||||
|
|
||||||
|
jsonItem := model.SearchSpanResponseItem{
|
||||||
|
SpanID: item.SpanID,
|
||||||
|
TraceID: item.TraceID,
|
||||||
|
ServiceName: item.ServiceName,
|
||||||
|
Name: item.Name,
|
||||||
|
Kind: int32(item.Kind),
|
||||||
|
DurationNano: int64(item.DurationNano),
|
||||||
|
HasError: item.HasError,
|
||||||
|
StatusMessage: item.StatusMessage,
|
||||||
|
StatusCodeString: item.StatusCodeString,
|
||||||
|
SpanKind: item.SpanKind,
|
||||||
|
References: ref,
|
||||||
|
Events: item.Events,
|
||||||
|
TagMap: item.Attributes_string,
|
||||||
|
}
|
||||||
|
|
||||||
|
jsonItem.TimeUnixNano = uint64(item.TimeUnixNano.UnixNano() / 1000000)
|
||||||
|
|
||||||
|
searchSpanResponses = append(searchSpanResponses, jsonItem)
|
||||||
|
if startTime == 0 || jsonItem.TimeUnixNano < startTime {
|
||||||
|
startTime = jsonItem.TimeUnixNano
|
||||||
|
}
|
||||||
|
if endTime == 0 || jsonItem.TimeUnixNano > endTime {
|
||||||
|
endTime = jsonItem.TimeUnixNano
|
||||||
|
}
|
||||||
|
if durationNano == 0 || uint64(jsonItem.DurationNano) > durationNano {
|
||||||
|
durationNano = uint64(jsonItem.DurationNano)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
end = time.Now()
|
||||||
|
zap.L().Debug("getTraceSQLQuery unmarshal took: ", zap.Duration("duration", end.Sub(start)))
|
||||||
|
|
||||||
|
for i, item := range searchSpanResponses {
|
||||||
|
spanEvents := item.GetValues()
|
||||||
|
searchSpansResult[0].Events[i] = spanEvents
|
||||||
|
}
|
||||||
|
|
||||||
|
searchSpansResult[0].StartTimestampMillis = startTime - (durationNano / 1000000)
|
||||||
|
searchSpansResult[0].EndTimestampMillis = endTime + (durationNano / 1000000)
|
||||||
|
|
||||||
|
return &searchSpansResult, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (r *ClickHouseReader) SearchTraces(ctx context.Context, params *model.SearchTracesParams,
|
func (r *ClickHouseReader) SearchTraces(ctx context.Context, params *model.SearchTracesParams,
|
||||||
smartTraceAlgorithm func(payload []model.SearchSpanResponseItem, targetSpanId string,
|
smartTraceAlgorithm func(payload []model.SearchSpanResponseItem, targetSpanId string,
|
||||||
levelUp int, levelDown int, spanLimit int) ([]model.SearchSpansResult, error)) (*[]model.SearchSpansResult, error) {
|
levelUp int, levelDown int, spanLimit int) ([]model.SearchSpansResult, error)) (*[]model.SearchSpansResult, error) {
|
||||||
|
|
||||||
|
if r.useTraceNewSchema {
|
||||||
|
return r.SearchTracesV2(ctx, params)
|
||||||
|
}
|
||||||
|
|
||||||
var countSpans uint64
|
var countSpans uint64
|
||||||
countQuery := fmt.Sprintf("SELECT count() as count from %s.%s WHERE traceID=$1", r.TraceDB, r.SpansTable)
|
countQuery := fmt.Sprintf("SELECT count() as count from %s.%s WHERE traceID=$1", r.TraceDB, r.SpansTable)
|
||||||
err := r.db.QueryRow(ctx, countQuery, params.TraceID).Scan(&countSpans)
|
err := r.db.QueryRow(ctx, countQuery, params.TraceID).Scan(&countSpans)
|
||||||
@@ -1746,6 +1905,7 @@ func (r *ClickHouseReader) SearchTraces(ctx context.Context, params *model.Searc
|
|||||||
|
|
||||||
err = r.featureFlags.CheckFeature(model.SmartTraceDetail)
|
err = r.featureFlags.CheckFeature(model.SmartTraceDetail)
|
||||||
smartAlgoEnabled := err == nil
|
smartAlgoEnabled := err == nil
|
||||||
|
// TODO(nitya): this will never run remove it
|
||||||
if len(searchScanResponses) > params.SpansRenderLimit && smartAlgoEnabled {
|
if len(searchScanResponses) > params.SpansRenderLimit && smartAlgoEnabled {
|
||||||
start = time.Now()
|
start = time.Now()
|
||||||
searchSpansResult, err = smartTraceAlgorithm(searchSpanResponses, params.SpanID, params.LevelUp, params.LevelDown, params.SpansRenderLimit)
|
searchSpansResult, err = smartTraceAlgorithm(searchSpanResponses, params.SpanID, params.LevelUp, params.LevelDown, params.SpansRenderLimit)
|
||||||
@@ -1824,7 +1984,6 @@ func (r *ClickHouseReader) GetDependencyGraph(ctx context.Context, queryParams *
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (r *ClickHouseReader) GetFilteredSpansAggregates(ctx context.Context, queryParams *model.GetFilteredSpanAggregatesParams) (*model.GetFilteredSpansAggregatesResponse, *model.ApiError) {
|
func (r *ClickHouseReader) GetFilteredSpansAggregates(ctx context.Context, queryParams *model.GetFilteredSpanAggregatesParams) (*model.GetFilteredSpansAggregatesResponse, *model.ApiError) {
|
||||||
|
|
||||||
excludeMap := make(map[string]struct{})
|
excludeMap := make(map[string]struct{})
|
||||||
for _, e := range queryParams.Exclude {
|
for _, e := range queryParams.Exclude {
|
||||||
if e == constants.OperationRequest {
|
if e == constants.OperationRequest {
|
||||||
@@ -1870,7 +2029,7 @@ func (r *ClickHouseReader) GetFilteredSpansAggregates(ctx context.Context, query
|
|||||||
// Using %s for groupBy params as it can be a custom column and custom columns are not supported by clickhouse-go yet:
|
// Using %s for groupBy params as it can be a custom column and custom columns are not supported by clickhouse-go yet:
|
||||||
// issue link: https://github.com/ClickHouse/clickhouse-go/issues/870
|
// issue link: https://github.com/ClickHouse/clickhouse-go/issues/870
|
||||||
if queryParams.GroupBy != "" && columnExists {
|
if queryParams.GroupBy != "" && columnExists {
|
||||||
query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, %s as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, queryParams.GroupBy, aggregation_query, r.TraceDB, r.indexTable)
|
query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, %s as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, queryParams.GroupBy, aggregation_query, r.TraceDB, r.traceTableName)
|
||||||
args = append(args, clickhouse.Named("groupByVar", queryParams.GroupBy))
|
args = append(args, clickhouse.Named("groupByVar", queryParams.GroupBy))
|
||||||
} else if queryParams.GroupBy != "" {
|
} else if queryParams.GroupBy != "" {
|
||||||
customStr = strings.Split(queryParams.GroupBy, ".(")
|
customStr = strings.Split(queryParams.GroupBy, ".(")
|
||||||
@@ -1878,17 +2037,17 @@ func (r *ClickHouseReader) GetFilteredSpansAggregates(ctx context.Context, query
|
|||||||
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("GroupBy: %s not supported", queryParams.GroupBy)}
|
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("GroupBy: %s not supported", queryParams.GroupBy)}
|
||||||
}
|
}
|
||||||
if customStr[1] == string(model.TagTypeString)+")" {
|
if customStr[1] == string(model.TagTypeString)+")" {
|
||||||
query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, stringTagMap['%s'] as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, customStr[0], aggregation_query, r.TraceDB, r.indexTable)
|
query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, stringTagMap['%s'] as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, customStr[0], aggregation_query, r.TraceDB, r.traceTableName)
|
||||||
} else if customStr[1] == string(model.TagTypeNumber)+")" {
|
} else if customStr[1] == string(model.TagTypeNumber)+")" {
|
||||||
query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, toString(numberTagMap['%s']) as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, customStr[0], aggregation_query, r.TraceDB, r.indexTable)
|
query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, toString(numberTagMap['%s']) as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, customStr[0], aggregation_query, r.TraceDB, r.traceTableName)
|
||||||
} else if customStr[1] == string(model.TagTypeBool)+")" {
|
} else if customStr[1] == string(model.TagTypeBool)+")" {
|
||||||
query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, toString(boolTagMap['%s']) as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, customStr[0], aggregation_query, r.TraceDB, r.indexTable)
|
query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, toString(boolTagMap['%s']) as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, customStr[0], aggregation_query, r.TraceDB, r.traceTableName)
|
||||||
} else {
|
} else {
|
||||||
// return error for unsupported group by
|
// return error for unsupported group by
|
||||||
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("GroupBy: %s not supported", queryParams.GroupBy)}
|
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("GroupBy: %s not supported", queryParams.GroupBy)}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable)
|
query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.traceTableName)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(queryParams.TraceID) > 0 {
|
if len(queryParams.TraceID) > 0 {
|
||||||
@@ -3056,11 +3215,10 @@ func (r *ClickHouseReader) GetLogsInfoInLastHeartBeatInterval(ctx context.Contex
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (r *ClickHouseReader) GetTagsInfoInLastHeartBeatInterval(ctx context.Context, interval time.Duration) (*model.TagsInfo, error) {
|
func (r *ClickHouseReader) GetTagsInfoInLastHeartBeatInterval(ctx context.Context, interval time.Duration) (*model.TagsInfo, error) {
|
||||||
|
|
||||||
queryStr := fmt.Sprintf(`select serviceName, stringTagMap['deployment.environment'] as env,
|
queryStr := fmt.Sprintf(`select serviceName, stringTagMap['deployment.environment'] as env,
|
||||||
stringTagMap['telemetry.sdk.language'] as language from %s.%s
|
stringTagMap['telemetry.sdk.language'] as language from %s.%s
|
||||||
where timestamp > toUnixTimestamp(now()-toIntervalMinute(%d))
|
where timestamp > toUnixTimestamp(now()-toIntervalMinute(%d))
|
||||||
group by serviceName, env, language;`, r.TraceDB, r.indexTable, int(interval.Minutes()))
|
group by serviceName, env, language;`, r.TraceDB, r.traceTableName, int(interval.Minutes()))
|
||||||
|
|
||||||
tagTelemetryDataList := []model.TagTelemetryData{}
|
tagTelemetryDataList := []model.TagTelemetryData{}
|
||||||
err := r.db.Select(ctx, &tagTelemetryDataList, queryStr)
|
err := r.db.Select(ctx, &tagTelemetryDataList, queryStr)
|
||||||
@@ -4575,8 +4733,6 @@ func (r *ClickHouseReader) GetTraceAggregateAttributes(ctx context.Context, req
|
|||||||
if err := rows.Scan(&tagKey, &tagType, &dataType, &isColumn); err != nil {
|
if err := rows.Scan(&tagKey, &tagType, &dataType, &isColumn); err != nil {
|
||||||
return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
|
return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
|
||||||
}
|
}
|
||||||
// TODO: Remove this once the column name are updated in the table
|
|
||||||
tagKey = tempHandleFixedColumns(tagKey)
|
|
||||||
key := v3.AttributeKey{
|
key := v3.AttributeKey{
|
||||||
Key: tagKey,
|
Key: tagKey,
|
||||||
DataType: v3.AttributeKeyDataType(dataType),
|
DataType: v3.AttributeKeyDataType(dataType),
|
||||||
@@ -4616,8 +4772,6 @@ func (r *ClickHouseReader) GetTraceAttributeKeys(ctx context.Context, req *v3.Fi
|
|||||||
if err := rows.Scan(&tagKey, &tagType, &dataType, &isColumn); err != nil {
|
if err := rows.Scan(&tagKey, &tagType, &dataType, &isColumn); err != nil {
|
||||||
return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
|
return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
|
||||||
}
|
}
|
||||||
// TODO: Remove this once the column name are updated in the table
|
|
||||||
tagKey = tempHandleFixedColumns(tagKey)
|
|
||||||
key := v3.AttributeKey{
|
key := v3.AttributeKey{
|
||||||
Key: tagKey,
|
Key: tagKey,
|
||||||
DataType: v3.AttributeKeyDataType(dataType),
|
DataType: v3.AttributeKeyDataType(dataType),
|
||||||
@@ -4629,19 +4783,6 @@ func (r *ClickHouseReader) GetTraceAttributeKeys(ctx context.Context, req *v3.Fi
|
|||||||
return &response, nil
|
return &response, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// tempHandleFixedColumns is a temporary function to handle the fixed columns whose name has been changed in AttributeKeys Table
|
|
||||||
func tempHandleFixedColumns(tagKey string) string {
|
|
||||||
switch {
|
|
||||||
case tagKey == "traceId":
|
|
||||||
tagKey = "traceID"
|
|
||||||
case tagKey == "spanId":
|
|
||||||
tagKey = "spanID"
|
|
||||||
case tagKey == "parentSpanId":
|
|
||||||
tagKey = "parentSpanID"
|
|
||||||
}
|
|
||||||
return tagKey
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *ClickHouseReader) GetTraceAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) {
|
func (r *ClickHouseReader) GetTraceAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) {
|
||||||
|
|
||||||
var query string
|
var query string
|
||||||
@@ -4702,31 +4843,38 @@ func (r *ClickHouseReader) GetSpanAttributeKeys(ctx context.Context) (map[string
|
|||||||
var rows driver.Rows
|
var rows driver.Rows
|
||||||
response := map[string]v3.AttributeKey{}
|
response := map[string]v3.AttributeKey{}
|
||||||
|
|
||||||
query = fmt.Sprintf("SELECT DISTINCT(tagKey), tagType, dataType, isColumn FROM %s.%s", r.TraceDB, r.spanAttributesKeysTable)
|
query = fmt.Sprintf("SELECT DISTINCT(tagKey), tagType, dataType FROM %s.%s", r.TraceDB, r.spanAttributesKeysTable)
|
||||||
|
|
||||||
rows, err = r.db.Query(ctx, query)
|
rows, err = r.db.Query(ctx, query)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
zap.L().Error("Error while executing query", zap.Error(err))
|
zap.L().Error("Error while executing query", zap.Error(err))
|
||||||
return nil, fmt.Errorf("error while executing query: %s", err.Error())
|
return nil, fmt.Errorf("error while executing query: %s", err.Error())
|
||||||
}
|
}
|
||||||
defer rows.Close()
|
defer rows.Close()
|
||||||
|
|
||||||
|
statements := []model.ShowCreateTableStatement{}
|
||||||
|
query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.TraceDB, r.traceTableName)
|
||||||
|
err = r.db.Select(ctx, &statements, query)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("error while fetching logs schema: %s", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
var tagKey string
|
var tagKey string
|
||||||
var dataType string
|
var dataType string
|
||||||
var tagType string
|
var tagType string
|
||||||
var isColumn bool
|
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
if err := rows.Scan(&tagKey, &tagType, &dataType, &isColumn); err != nil {
|
if err := rows.Scan(&tagKey, &tagType, &dataType); err != nil {
|
||||||
return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
|
return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
|
||||||
}
|
}
|
||||||
key := v3.AttributeKey{
|
key := v3.AttributeKey{
|
||||||
Key: tagKey,
|
Key: tagKey,
|
||||||
DataType: v3.AttributeKeyDataType(dataType),
|
DataType: v3.AttributeKeyDataType(dataType),
|
||||||
Type: v3.AttributeKeyType(tagType),
|
Type: v3.AttributeKeyType(tagType),
|
||||||
IsColumn: isColumn,
|
IsColumn: isColumn(r.useLogsNewSchema, statements[0].Statement, tagType, tagKey, dataType),
|
||||||
}
|
}
|
||||||
response[tagKey] = key
|
|
||||||
|
name := tagKey + "##" + tagType + "##" + strings.ToLower(dataType)
|
||||||
|
response[name] = key
|
||||||
}
|
}
|
||||||
return response, nil
|
return response, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -39,6 +39,7 @@ import (
|
|||||||
querierV2 "go.signoz.io/signoz/pkg/query-service/app/querier/v2"
|
querierV2 "go.signoz.io/signoz/pkg/query-service/app/querier/v2"
|
||||||
"go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
|
"go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
|
||||||
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
|
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
|
||||||
|
tracesV4 "go.signoz.io/signoz/pkg/query-service/app/traces/v4"
|
||||||
"go.signoz.io/signoz/pkg/query-service/auth"
|
"go.signoz.io/signoz/pkg/query-service/auth"
|
||||||
"go.signoz.io/signoz/pkg/query-service/cache"
|
"go.signoz.io/signoz/pkg/query-service/cache"
|
||||||
"go.signoz.io/signoz/pkg/query-service/common"
|
"go.signoz.io/signoz/pkg/query-service/common"
|
||||||
@@ -110,8 +111,9 @@ type APIHandler struct {
|
|||||||
// Websocket connection upgrader
|
// Websocket connection upgrader
|
||||||
Upgrader *websocket.Upgrader
|
Upgrader *websocket.Upgrader
|
||||||
|
|
||||||
UseLogsNewSchema bool
|
UseLogsNewSchema bool
|
||||||
UseLicensesV3 bool
|
UseTraceNewSchema bool
|
||||||
|
UseLicensesV3 bool
|
||||||
|
|
||||||
hostsRepo *inframetrics.HostsRepo
|
hostsRepo *inframetrics.HostsRepo
|
||||||
processesRepo *inframetrics.ProcessesRepo
|
processesRepo *inframetrics.ProcessesRepo
|
||||||
@@ -163,6 +165,7 @@ type APIHandlerOpts struct {
|
|||||||
// Use Logs New schema
|
// Use Logs New schema
|
||||||
UseLogsNewSchema bool
|
UseLogsNewSchema bool
|
||||||
|
|
||||||
|
UseTraceNewSchema bool
|
||||||
// Use Licenses V3 structure
|
// Use Licenses V3 structure
|
||||||
UseLicensesV3 bool
|
UseLicensesV3 bool
|
||||||
}
|
}
|
||||||
@@ -176,21 +179,23 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
querierOpts := querier.QuerierOptions{
|
querierOpts := querier.QuerierOptions{
|
||||||
Reader: opts.Reader,
|
Reader: opts.Reader,
|
||||||
Cache: opts.Cache,
|
Cache: opts.Cache,
|
||||||
KeyGenerator: queryBuilder.NewKeyGenerator(),
|
KeyGenerator: queryBuilder.NewKeyGenerator(),
|
||||||
FluxInterval: opts.FluxInterval,
|
FluxInterval: opts.FluxInterval,
|
||||||
FeatureLookup: opts.FeatureFlags,
|
FeatureLookup: opts.FeatureFlags,
|
||||||
UseLogsNewSchema: opts.UseLogsNewSchema,
|
UseLogsNewSchema: opts.UseLogsNewSchema,
|
||||||
|
UseTraceNewSchema: opts.UseTraceNewSchema,
|
||||||
}
|
}
|
||||||
|
|
||||||
querierOptsV2 := querierV2.QuerierOptions{
|
querierOptsV2 := querierV2.QuerierOptions{
|
||||||
Reader: opts.Reader,
|
Reader: opts.Reader,
|
||||||
Cache: opts.Cache,
|
Cache: opts.Cache,
|
||||||
KeyGenerator: queryBuilder.NewKeyGenerator(),
|
KeyGenerator: queryBuilder.NewKeyGenerator(),
|
||||||
FluxInterval: opts.FluxInterval,
|
FluxInterval: opts.FluxInterval,
|
||||||
FeatureLookup: opts.FeatureFlags,
|
FeatureLookup: opts.FeatureFlags,
|
||||||
UseLogsNewSchema: opts.UseLogsNewSchema,
|
UseLogsNewSchema: opts.UseLogsNewSchema,
|
||||||
|
UseTraceNewSchema: opts.UseTraceNewSchema,
|
||||||
}
|
}
|
||||||
|
|
||||||
querier := querier.NewQuerier(querierOpts)
|
querier := querier.NewQuerier(querierOpts)
|
||||||
@@ -224,6 +229,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
|
|||||||
querier: querier,
|
querier: querier,
|
||||||
querierV2: querierv2,
|
querierV2: querierv2,
|
||||||
UseLogsNewSchema: opts.UseLogsNewSchema,
|
UseLogsNewSchema: opts.UseLogsNewSchema,
|
||||||
|
UseTraceNewSchema: opts.UseTraceNewSchema,
|
||||||
UseLicensesV3: opts.UseLicensesV3,
|
UseLicensesV3: opts.UseLicensesV3,
|
||||||
hostsRepo: hostsRepo,
|
hostsRepo: hostsRepo,
|
||||||
processesRepo: processesRepo,
|
processesRepo: processesRepo,
|
||||||
@@ -242,9 +248,14 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
|
|||||||
logsQueryBuilder = logsv4.PrepareLogsQuery
|
logsQueryBuilder = logsv4.PrepareLogsQuery
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tracesQueryBuilder := tracesV3.PrepareTracesQuery
|
||||||
|
if opts.UseTraceNewSchema {
|
||||||
|
tracesQueryBuilder = tracesV4.PrepareTracesQuery
|
||||||
|
}
|
||||||
|
|
||||||
builderOpts := queryBuilder.QueryBuilderOptions{
|
builderOpts := queryBuilder.QueryBuilderOptions{
|
||||||
BuildMetricQuery: metricsv3.PrepareMetricQuery,
|
BuildMetricQuery: metricsv3.PrepareMetricQuery,
|
||||||
BuildTraceQuery: tracesV3.PrepareTracesQuery,
|
BuildTraceQuery: tracesQueryBuilder,
|
||||||
BuildLogQuery: logsQueryBuilder,
|
BuildLogQuery: logsQueryBuilder,
|
||||||
}
|
}
|
||||||
aH.queryBuilder = queryBuilder.NewQueryBuilder(builderOpts, aH.featureFlags)
|
aH.queryBuilder = queryBuilder.NewQueryBuilder(builderOpts, aH.featureFlags)
|
||||||
@@ -4433,7 +4444,12 @@ func (aH *APIHandler) queryRangeV3(ctx context.Context, queryRangeParams *v3.Que
|
|||||||
RespondError(w, apiErrObj, errQuriesByName)
|
RespondError(w, apiErrObj, errQuriesByName)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
tracesV3.Enrich(queryRangeParams, spanKeys)
|
if aH.UseTraceNewSchema {
|
||||||
|
tracesV4.Enrich(queryRangeParams, spanKeys)
|
||||||
|
} else {
|
||||||
|
tracesV3.Enrich(queryRangeParams, spanKeys)
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// WARN: Only works for AND operator in traces query
|
// WARN: Only works for AND operator in traces query
|
||||||
|
|||||||
@@ -142,7 +142,7 @@ func enrichFieldWithMetadata(field v3.AttributeKey, fields map[string]v3.Attribu
|
|||||||
}
|
}
|
||||||
|
|
||||||
// check if the field is present in the fields map
|
// check if the field is present in the fields map
|
||||||
for _, key := range utils.GenerateLogEnrichmentKeys(field) {
|
for _, key := range utils.GenerateEnrichmentKeys(field) {
|
||||||
if val, ok := fields[key]; ok {
|
if val, ok := fields[key]; ok {
|
||||||
return val
|
return val
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ import (
|
|||||||
logsV4 "go.signoz.io/signoz/pkg/query-service/app/logs/v4"
|
logsV4 "go.signoz.io/signoz/pkg/query-service/app/logs/v4"
|
||||||
metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3"
|
metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3"
|
||||||
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
|
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
|
||||||
|
tracesV4 "go.signoz.io/signoz/pkg/query-service/app/traces/v4"
|
||||||
"go.signoz.io/signoz/pkg/query-service/common"
|
"go.signoz.io/signoz/pkg/query-service/common"
|
||||||
"go.signoz.io/signoz/pkg/query-service/constants"
|
"go.signoz.io/signoz/pkg/query-service/constants"
|
||||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||||
@@ -158,11 +159,16 @@ func (q *querier) runBuilderQuery(
|
|||||||
|
|
||||||
if builderQuery.DataSource == v3.DataSourceTraces {
|
if builderQuery.DataSource == v3.DataSourceTraces {
|
||||||
|
|
||||||
|
tracesQueryBuilder := tracesV3.PrepareTracesQuery
|
||||||
|
if q.UseTraceNewSchema {
|
||||||
|
tracesQueryBuilder = tracesV4.PrepareTracesQuery
|
||||||
|
}
|
||||||
|
|
||||||
var query string
|
var query string
|
||||||
var err error
|
var err error
|
||||||
// for ts query with group by and limit form two queries
|
// for ts query with group by and limit form two queries
|
||||||
if params.CompositeQuery.PanelType == v3.PanelTypeGraph && builderQuery.Limit > 0 && len(builderQuery.GroupBy) > 0 {
|
if params.CompositeQuery.PanelType == v3.PanelTypeGraph && builderQuery.Limit > 0 && len(builderQuery.GroupBy) > 0 {
|
||||||
limitQuery, err := tracesV3.PrepareTracesQuery(
|
limitQuery, err := tracesQueryBuilder(
|
||||||
start,
|
start,
|
||||||
end,
|
end,
|
||||||
params.CompositeQuery.PanelType,
|
params.CompositeQuery.PanelType,
|
||||||
@@ -173,7 +179,7 @@ func (q *querier) runBuilderQuery(
|
|||||||
ch <- channelResult{Err: err, Name: queryName, Query: limitQuery, Series: nil}
|
ch <- channelResult{Err: err, Name: queryName, Query: limitQuery, Series: nil}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
placeholderQuery, err := tracesV3.PrepareTracesQuery(
|
placeholderQuery, err := tracesQueryBuilder(
|
||||||
start,
|
start,
|
||||||
end,
|
end,
|
||||||
params.CompositeQuery.PanelType,
|
params.CompositeQuery.PanelType,
|
||||||
@@ -186,7 +192,7 @@ func (q *querier) runBuilderQuery(
|
|||||||
}
|
}
|
||||||
query = fmt.Sprintf(placeholderQuery, limitQuery)
|
query = fmt.Sprintf(placeholderQuery, limitQuery)
|
||||||
} else {
|
} else {
|
||||||
query, err = tracesV3.PrepareTracesQuery(
|
query, err = tracesQueryBuilder(
|
||||||
start,
|
start,
|
||||||
end,
|
end,
|
||||||
params.CompositeQuery.PanelType,
|
params.CompositeQuery.PanelType,
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ import (
|
|||||||
metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3"
|
metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3"
|
||||||
"go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
|
"go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
|
||||||
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
|
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
|
||||||
|
tracesV4 "go.signoz.io/signoz/pkg/query-service/app/traces/v4"
|
||||||
"go.signoz.io/signoz/pkg/query-service/common"
|
"go.signoz.io/signoz/pkg/query-service/common"
|
||||||
chErrors "go.signoz.io/signoz/pkg/query-service/errors"
|
chErrors "go.signoz.io/signoz/pkg/query-service/errors"
|
||||||
"go.signoz.io/signoz/pkg/query-service/querycache"
|
"go.signoz.io/signoz/pkg/query-service/querycache"
|
||||||
@@ -52,7 +53,8 @@ type querier struct {
|
|||||||
returnedSeries []*v3.Series
|
returnedSeries []*v3.Series
|
||||||
returnedErr error
|
returnedErr error
|
||||||
|
|
||||||
UseLogsNewSchema bool
|
UseLogsNewSchema bool
|
||||||
|
UseTraceNewSchema bool
|
||||||
}
|
}
|
||||||
|
|
||||||
type QuerierOptions struct {
|
type QuerierOptions struct {
|
||||||
@@ -63,10 +65,11 @@ type QuerierOptions struct {
|
|||||||
FeatureLookup interfaces.FeatureLookup
|
FeatureLookup interfaces.FeatureLookup
|
||||||
|
|
||||||
// used for testing
|
// used for testing
|
||||||
TestingMode bool
|
TestingMode bool
|
||||||
ReturnedSeries []*v3.Series
|
ReturnedSeries []*v3.Series
|
||||||
ReturnedErr error
|
ReturnedErr error
|
||||||
UseLogsNewSchema bool
|
UseLogsNewSchema bool
|
||||||
|
UseTraceNewSchema bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewQuerier(opts QuerierOptions) interfaces.Querier {
|
func NewQuerier(opts QuerierOptions) interfaces.Querier {
|
||||||
@@ -74,6 +77,10 @@ func NewQuerier(opts QuerierOptions) interfaces.Querier {
|
|||||||
if opts.UseLogsNewSchema {
|
if opts.UseLogsNewSchema {
|
||||||
logsQueryBuilder = logsV4.PrepareLogsQuery
|
logsQueryBuilder = logsV4.PrepareLogsQuery
|
||||||
}
|
}
|
||||||
|
tracesQueryBuilder := tracesV3.PrepareTracesQuery
|
||||||
|
if opts.UseTraceNewSchema {
|
||||||
|
tracesQueryBuilder = tracesV4.PrepareTracesQuery
|
||||||
|
}
|
||||||
|
|
||||||
qc := querycache.NewQueryCache(querycache.WithCache(opts.Cache), querycache.WithFluxInterval(opts.FluxInterval))
|
qc := querycache.NewQueryCache(querycache.WithCache(opts.Cache), querycache.WithFluxInterval(opts.FluxInterval))
|
||||||
|
|
||||||
@@ -85,16 +92,17 @@ func NewQuerier(opts QuerierOptions) interfaces.Querier {
|
|||||||
fluxInterval: opts.FluxInterval,
|
fluxInterval: opts.FluxInterval,
|
||||||
|
|
||||||
builder: queryBuilder.NewQueryBuilder(queryBuilder.QueryBuilderOptions{
|
builder: queryBuilder.NewQueryBuilder(queryBuilder.QueryBuilderOptions{
|
||||||
BuildTraceQuery: tracesV3.PrepareTracesQuery,
|
BuildTraceQuery: tracesQueryBuilder,
|
||||||
BuildLogQuery: logsQueryBuilder,
|
BuildLogQuery: logsQueryBuilder,
|
||||||
BuildMetricQuery: metricsV3.PrepareMetricQuery,
|
BuildMetricQuery: metricsV3.PrepareMetricQuery,
|
||||||
}, opts.FeatureLookup),
|
}, opts.FeatureLookup),
|
||||||
featureLookUp: opts.FeatureLookup,
|
featureLookUp: opts.FeatureLookup,
|
||||||
|
|
||||||
testingMode: opts.TestingMode,
|
testingMode: opts.TestingMode,
|
||||||
returnedSeries: opts.ReturnedSeries,
|
returnedSeries: opts.ReturnedSeries,
|
||||||
returnedErr: opts.ReturnedErr,
|
returnedErr: opts.ReturnedErr,
|
||||||
UseLogsNewSchema: opts.UseLogsNewSchema,
|
UseLogsNewSchema: opts.UseLogsNewSchema,
|
||||||
|
UseTraceNewSchema: opts.UseTraceNewSchema,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -308,56 +316,115 @@ func (q *querier) runClickHouseQueries(ctx context.Context, params *v3.QueryRang
|
|||||||
return results, errQueriesByName, err
|
return results, errQueriesByName, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *querier) runLogsListQuery(ctx context.Context, params *v3.QueryRangeParamsV3, tsRanges []utils.LogsListTsRange) ([]*v3.Result, map[string]error, error) {
|
func (q *querier) runWindowBasedListQuery(ctx context.Context, params *v3.QueryRangeParamsV3, tsRanges []utils.LogsListTsRange) ([]*v3.Result, map[string]error, error) {
|
||||||
res := make([]*v3.Result, 0)
|
res := make([]*v3.Result, 0)
|
||||||
qName := ""
|
qName := ""
|
||||||
pageSize := uint64(0)
|
pageSize := uint64(0)
|
||||||
|
limit := uint64(0)
|
||||||
|
offset := uint64(0)
|
||||||
|
|
||||||
// se we are considering only one query
|
// se we are considering only one query
|
||||||
for name, v := range params.CompositeQuery.BuilderQueries {
|
for name, v := range params.CompositeQuery.BuilderQueries {
|
||||||
qName = name
|
qName = name
|
||||||
pageSize = v.PageSize
|
pageSize = v.PageSize
|
||||||
|
|
||||||
|
// for traces specifically
|
||||||
|
limit = v.Limit
|
||||||
|
offset = v.Offset
|
||||||
}
|
}
|
||||||
data := []*v3.Row{}
|
data := []*v3.Row{}
|
||||||
|
|
||||||
|
tracesLimit := limit + offset
|
||||||
|
|
||||||
for _, v := range tsRanges {
|
for _, v := range tsRanges {
|
||||||
params.Start = v.Start
|
params.Start = v.Start
|
||||||
params.End = v.End
|
params.End = v.End
|
||||||
|
|
||||||
params.CompositeQuery.BuilderQueries[qName].PageSize = pageSize - uint64(len(data))
|
length := uint64(0)
|
||||||
queries, err := q.builder.PrepareQueries(params)
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// this will to run only once
|
// this will to run only once
|
||||||
for name, query := range queries {
|
|
||||||
rowList, err := q.reader.GetListResultV3(ctx, query)
|
// appending the filter to get the next set of data
|
||||||
|
if params.CompositeQuery.BuilderQueries[qName].DataSource == v3.DataSourceLogs {
|
||||||
|
params.CompositeQuery.BuilderQueries[qName].PageSize = pageSize - uint64(len(data))
|
||||||
|
queries, err := q.builder.PrepareQueries(params)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errs := []error{err}
|
return nil, nil, err
|
||||||
errQuriesByName := map[string]error{
|
}
|
||||||
name: err,
|
for name, query := range queries {
|
||||||
}
|
rowList, err := q.reader.GetListResultV3(ctx, query)
|
||||||
return nil, errQuriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...))
|
if err != nil {
|
||||||
|
errs := []error{err}
|
||||||
|
errQuriesByName := map[string]error{
|
||||||
|
name: err,
|
||||||
|
}
|
||||||
|
return nil, errQuriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...))
|
||||||
|
}
|
||||||
|
length += uint64(len(rowList))
|
||||||
|
data = append(data, rowList...)
|
||||||
}
|
}
|
||||||
data = append(data, rowList...)
|
|
||||||
}
|
|
||||||
|
|
||||||
// append a filter to the params
|
if length > 0 {
|
||||||
if len(data) > 0 {
|
params.CompositeQuery.BuilderQueries[qName].Filters.Items = append(params.CompositeQuery.BuilderQueries[qName].Filters.Items, v3.FilterItem{
|
||||||
params.CompositeQuery.BuilderQueries[qName].Filters.Items = append(params.CompositeQuery.BuilderQueries[qName].Filters.Items, v3.FilterItem{
|
Key: v3.AttributeKey{
|
||||||
Key: v3.AttributeKey{
|
Key: "id",
|
||||||
Key: "id",
|
IsColumn: true,
|
||||||
IsColumn: true,
|
DataType: "string",
|
||||||
DataType: "string",
|
},
|
||||||
},
|
Operator: v3.FilterOperatorLessThan,
|
||||||
Operator: v3.FilterOperatorLessThan,
|
Value: data[len(data)-1].Data["id"],
|
||||||
Value: data[len(data)-1].Data["id"],
|
})
|
||||||
})
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if uint64(len(data)) >= pageSize {
|
if uint64(len(data)) >= pageSize {
|
||||||
break
|
break
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// TRACE
|
||||||
|
// we are updating the offset and limit based on the number of traces we have found in the current timerange
|
||||||
|
// eg -
|
||||||
|
// 1)offset = 0, limit = 100, tsRanges = [t1, t10], [t10, 20], [t20, t30]
|
||||||
|
//
|
||||||
|
// if 100 traces are there in [t1, t10] then 100 will return immediately.
|
||||||
|
// if 10 traces are there in [t1, t10] then we get 10, set offset to 0 and limit to 90, search in the next timerange of [t10, 20]
|
||||||
|
// if we don't find any trace in [t1, t10], then we search in [t10, 20] with offset=0, limit=100
|
||||||
|
|
||||||
|
//
|
||||||
|
// 2) offset = 50, limit = 100, tsRanges = [t1, t10], [t10, 20], [t20, t30]
|
||||||
|
//
|
||||||
|
// If we find 150 traces with limit=150 and offset=0 in [t1, t10] then we return immediately 100 traces
|
||||||
|
// If we find 50 in [t1, t10] with limit=150 and offset=0 then it will set limit = 100 and offset=0 and search in the next timerange of [t10, 20]
|
||||||
|
// if we don't find any trace in [t1, t10], then we search in [t10, 20] with limit=150 and offset=0
|
||||||
|
params.CompositeQuery.BuilderQueries[qName].Offset = 0
|
||||||
|
params.CompositeQuery.BuilderQueries[qName].Limit = tracesLimit
|
||||||
|
queries, err := q.builder.PrepareQueries(params)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
for name, query := range queries {
|
||||||
|
rowList, err := q.reader.GetListResultV3(ctx, query)
|
||||||
|
if err != nil {
|
||||||
|
errs := []error{err}
|
||||||
|
errQuriesByName := map[string]error{
|
||||||
|
name: err,
|
||||||
|
}
|
||||||
|
return nil, errQuriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...))
|
||||||
|
}
|
||||||
|
length += uint64(len(rowList))
|
||||||
|
|
||||||
|
// skip the traces unless offset is 0
|
||||||
|
for _, row := range rowList {
|
||||||
|
if offset == 0 {
|
||||||
|
data = append(data, row)
|
||||||
|
} else {
|
||||||
|
offset--
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tracesLimit = tracesLimit - length
|
||||||
|
|
||||||
|
if uint64(len(data)) >= limit {
|
||||||
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
res = append(res, &v3.Result{
|
res = append(res, &v3.Result{
|
||||||
@@ -368,15 +435,25 @@ func (q *querier) runLogsListQuery(ctx context.Context, params *v3.QueryRangePar
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Result, map[string]error, error) {
|
func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Result, map[string]error, error) {
|
||||||
// List query has support for only one query.
|
// List query has support for only one query
|
||||||
if q.UseLogsNewSchema && params.CompositeQuery != nil && len(params.CompositeQuery.BuilderQueries) == 1 {
|
// we are skipping for PanelTypeTrace as it has a custom order by regardless of what's in the payload
|
||||||
|
if params.CompositeQuery != nil &&
|
||||||
|
len(params.CompositeQuery.BuilderQueries) == 1 &&
|
||||||
|
params.CompositeQuery.PanelType != v3.PanelTypeTrace {
|
||||||
for _, v := range params.CompositeQuery.BuilderQueries {
|
for _, v := range params.CompositeQuery.BuilderQueries {
|
||||||
|
if (v.DataSource == v3.DataSourceLogs && !q.UseLogsNewSchema) ||
|
||||||
|
(v.DataSource == v3.DataSourceTraces && !q.UseTraceNewSchema) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
// only allow of logs queries with timestamp ordering desc
|
// only allow of logs queries with timestamp ordering desc
|
||||||
if v.DataSource == v3.DataSourceLogs && len(v.OrderBy) == 1 && v.OrderBy[0].ColumnName == "timestamp" && v.OrderBy[0].Order == "desc" {
|
// TODO(nitya): allow for timestamp asc
|
||||||
startEndArr := utils.GetLogsListTsRanges(params.Start, params.End)
|
if (v.DataSource == v3.DataSourceLogs || v.DataSource == v3.DataSourceTraces) &&
|
||||||
if len(startEndArr) > 0 {
|
len(v.OrderBy) == 1 &&
|
||||||
return q.runLogsListQuery(ctx, params, startEndArr)
|
v.OrderBy[0].ColumnName == "timestamp" &&
|
||||||
}
|
v.OrderBy[0].Order == "desc" {
|
||||||
|
startEndArr := utils.GetListTsRanges(params.Start, params.End)
|
||||||
|
return q.runWindowBasedListQuery(ctx, params, startEndArr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,15 +5,21 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
|
"regexp"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
cmock "github.com/srikanthccv/ClickHouse-go-mock"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"go.signoz.io/signoz/pkg/query-service/app/clickhouseReader"
|
||||||
"go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
|
"go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
|
||||||
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
|
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
|
||||||
"go.signoz.io/signoz/pkg/query-service/cache/inmemory"
|
"go.signoz.io/signoz/pkg/query-service/cache/inmemory"
|
||||||
|
"go.signoz.io/signoz/pkg/query-service/featureManager"
|
||||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||||
"go.signoz.io/signoz/pkg/query-service/querycache"
|
"go.signoz.io/signoz/pkg/query-service/querycache"
|
||||||
|
"go.signoz.io/signoz/pkg/query-service/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
func minTimestamp(series []*v3.Series) int64 {
|
func minTimestamp(series []*v3.Series) int64 {
|
||||||
@@ -1124,3 +1130,288 @@ func TestQueryRangeValueTypePromQL(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type regexMatcher struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *regexMatcher) Match(expectedSQL, actualSQL string) error {
|
||||||
|
re, err := regexp.Compile(expectedSQL)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if !re.MatchString(actualSQL) {
|
||||||
|
return fmt.Errorf("expected query to contain %s, got %s", expectedSQL, actualSQL)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func Test_querier_runWindowBasedListQuery(t *testing.T) {
|
||||||
|
params := &v3.QueryRangeParamsV3{
|
||||||
|
Start: 1722171576000000000, // July 28, 2024 6:29:36 PM
|
||||||
|
End: 1722262800000000000, // July 29, 2024 7:50:00 PM
|
||||||
|
CompositeQuery: &v3.CompositeQuery{
|
||||||
|
PanelType: v3.PanelTypeList,
|
||||||
|
BuilderQueries: map[string]*v3.BuilderQuery{
|
||||||
|
"A": {
|
||||||
|
QueryName: "A",
|
||||||
|
Expression: "A",
|
||||||
|
DataSource: v3.DataSourceTraces,
|
||||||
|
PageSize: 10,
|
||||||
|
Limit: 100,
|
||||||
|
StepInterval: 60,
|
||||||
|
AggregateOperator: v3.AggregateOperatorNoOp,
|
||||||
|
SelectColumns: []v3.AttributeKey{{Key: "serviceName"}},
|
||||||
|
Filters: &v3.FilterSet{
|
||||||
|
Operator: "AND",
|
||||||
|
Items: []v3.FilterItem{},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
tsRanges := []utils.LogsListTsRange{
|
||||||
|
{
|
||||||
|
Start: 1722259200000000000, // July 29, 2024 6:50:00 PM
|
||||||
|
End: 1722262800000000000, // July 29, 2024 7:50:00 PM
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Start: 1722252000000000000, // July 29, 2024 4:50:00 PM
|
||||||
|
End: 1722259200000000000, // July 29, 2024 6:50:00 PM
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Start: 1722237600000000000, // July 29, 2024 12:50:00 PM
|
||||||
|
End: 1722252000000000000, // July 29, 2024 4:50:00 PM
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Start: 1722208800000000000, // July 29, 2024 4:50:00 AM
|
||||||
|
End: 1722237600000000000, // July 29, 2024 12:50:00 PM
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Start: 1722171576000000000, // July 28, 2024 6:29:36 PM
|
||||||
|
End: 1722208800000000000, // July 29, 2024 4:50:00 AM
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
type queryParams struct {
|
||||||
|
start int64
|
||||||
|
end int64
|
||||||
|
limit uint64
|
||||||
|
offset uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
type queryResponse struct {
|
||||||
|
expectedQuery string
|
||||||
|
timestamps []uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
// create test struct with moc data i.e array of timestamps, limit, offset and expected results
|
||||||
|
testCases := []struct {
|
||||||
|
name string
|
||||||
|
queryResponses []queryResponse
|
||||||
|
queryParams queryParams
|
||||||
|
expectedTimestamps []int64
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "should return correct timestamps when querying within time window",
|
||||||
|
queryResponses: []queryResponse{
|
||||||
|
{
|
||||||
|
expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 2",
|
||||||
|
timestamps: []uint64{1722259300000000000, 1722259400000000000},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
queryParams: queryParams{
|
||||||
|
start: 1722171576000000000,
|
||||||
|
end: 1722262800000000000,
|
||||||
|
limit: 2,
|
||||||
|
offset: 0,
|
||||||
|
},
|
||||||
|
expectedTimestamps: []int64{1722259300000000000, 1722259400000000000},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "all data not in first windows",
|
||||||
|
queryResponses: []queryResponse{
|
||||||
|
{
|
||||||
|
expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 3",
|
||||||
|
timestamps: []uint64{1722259300000000000, 1722259400000000000},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 1",
|
||||||
|
timestamps: []uint64{1722253000000000000},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
queryParams: queryParams{
|
||||||
|
start: 1722171576000000000,
|
||||||
|
end: 1722262800000000000,
|
||||||
|
limit: 3,
|
||||||
|
offset: 0,
|
||||||
|
},
|
||||||
|
expectedTimestamps: []int64{1722259300000000000, 1722259400000000000, 1722253000000000000},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "data in multiple windows",
|
||||||
|
queryResponses: []queryResponse{
|
||||||
|
{
|
||||||
|
expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 5",
|
||||||
|
timestamps: []uint64{1722259300000000000, 1722259400000000000},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 3",
|
||||||
|
timestamps: []uint64{1722253000000000000},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
expectedQuery: ".*(timestamp >= '1722237600000000000' AND timestamp <= '1722252000000000000').* DESC LIMIT 2",
|
||||||
|
timestamps: []uint64{1722237700000000000},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
expectedQuery: ".*(timestamp >= '1722208800000000000' AND timestamp <= '1722237600000000000').* DESC LIMIT 1",
|
||||||
|
timestamps: []uint64{},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
expectedQuery: ".*(timestamp >= '1722171576000000000' AND timestamp <= '1722208800000000000').* DESC LIMIT 1",
|
||||||
|
timestamps: []uint64{},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
queryParams: queryParams{
|
||||||
|
start: 1722171576000000000,
|
||||||
|
end: 1722262800000000000,
|
||||||
|
limit: 5,
|
||||||
|
offset: 0,
|
||||||
|
},
|
||||||
|
expectedTimestamps: []int64{1722259300000000000, 1722259400000000000, 1722253000000000000, 1722237700000000000},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "query with offset",
|
||||||
|
queryResponses: []queryResponse{
|
||||||
|
{
|
||||||
|
expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 7",
|
||||||
|
timestamps: []uint64{1722259210000000000, 1722259220000000000, 1722259230000000000},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 4",
|
||||||
|
timestamps: []uint64{1722253000000000000, 1722254000000000000, 1722255000000000000},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
expectedQuery: ".*(timestamp >= '1722237600000000000' AND timestamp <= '1722252000000000000').* DESC LIMIT 1",
|
||||||
|
timestamps: []uint64{1722237700000000000},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
queryParams: queryParams{
|
||||||
|
start: 1722171576000000000,
|
||||||
|
end: 1722262800000000000,
|
||||||
|
limit: 4,
|
||||||
|
offset: 3,
|
||||||
|
},
|
||||||
|
expectedTimestamps: []int64{1722253000000000000, 1722254000000000000, 1722255000000000000, 1722237700000000000},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "query with offset and limit- data spread across multiple windows",
|
||||||
|
queryResponses: []queryResponse{
|
||||||
|
{
|
||||||
|
expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 11",
|
||||||
|
timestamps: []uint64{},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 11",
|
||||||
|
timestamps: []uint64{1722253000000000000, 1722254000000000000, 1722255000000000000},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
expectedQuery: ".*(timestamp >= '1722237600000000000' AND timestamp <= '1722252000000000000').* DESC LIMIT 8",
|
||||||
|
timestamps: []uint64{1722237700000000000, 1722237800000000000, 1722237900000000000, 1722237910000000000, 1722237920000000000},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
expectedQuery: ".*(timestamp >= '1722208800000000000' AND timestamp <= '1722237600000000000').* DESC LIMIT 3",
|
||||||
|
timestamps: []uint64{1722208810000000000, 1722208820000000000, 1722208830000000000},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
queryParams: queryParams{
|
||||||
|
start: 1722171576000000000,
|
||||||
|
end: 1722262800000000000,
|
||||||
|
limit: 5,
|
||||||
|
offset: 6,
|
||||||
|
},
|
||||||
|
expectedTimestamps: []int64{1722237910000000000, 1722237920000000000, 1722208810000000000, 1722208820000000000, 1722208830000000000},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
cols := []cmock.ColumnType{
|
||||||
|
{Name: "timestamp", Type: "UInt64"},
|
||||||
|
{Name: "name", Type: "String"},
|
||||||
|
}
|
||||||
|
testName := "name"
|
||||||
|
|
||||||
|
options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace")
|
||||||
|
|
||||||
|
// iterate over test data, create reader and run test
|
||||||
|
for _, tc := range testCases {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
// Setup mock
|
||||||
|
mock, err := cmock.NewClickHouseWithQueryMatcher(nil, ®exMatcher{})
|
||||||
|
require.NoError(t, err, "Failed to create ClickHouse mock")
|
||||||
|
|
||||||
|
// Configure mock responses
|
||||||
|
for _, response := range tc.queryResponses {
|
||||||
|
values := make([][]any, 0, len(response.timestamps))
|
||||||
|
for _, ts := range response.timestamps {
|
||||||
|
values = append(values, []any{&ts, &testName})
|
||||||
|
}
|
||||||
|
// if len(values) > 0 {
|
||||||
|
mock.ExpectQuery(response.expectedQuery).WillReturnRows(
|
||||||
|
cmock.NewRows(cols, values),
|
||||||
|
)
|
||||||
|
// }
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create reader and querier
|
||||||
|
reader := clickhouseReader.NewReaderFromClickhouseConnection(
|
||||||
|
mock,
|
||||||
|
options,
|
||||||
|
nil,
|
||||||
|
"",
|
||||||
|
featureManager.StartManager(),
|
||||||
|
"",
|
||||||
|
true,
|
||||||
|
true,
|
||||||
|
)
|
||||||
|
|
||||||
|
q := &querier{
|
||||||
|
reader: reader,
|
||||||
|
builder: queryBuilder.NewQueryBuilder(
|
||||||
|
queryBuilder.QueryBuilderOptions{
|
||||||
|
BuildTraceQuery: tracesV3.PrepareTracesQuery,
|
||||||
|
},
|
||||||
|
featureManager.StartManager(),
|
||||||
|
),
|
||||||
|
}
|
||||||
|
// Update query parameters
|
||||||
|
params.Start = tc.queryParams.start
|
||||||
|
params.End = tc.queryParams.end
|
||||||
|
params.CompositeQuery.BuilderQueries["A"].Limit = tc.queryParams.limit
|
||||||
|
params.CompositeQuery.BuilderQueries["A"].Offset = tc.queryParams.offset
|
||||||
|
|
||||||
|
// Execute query
|
||||||
|
results, errMap, err := q.runWindowBasedListQuery(context.Background(), params, tsRanges)
|
||||||
|
|
||||||
|
// Assertions
|
||||||
|
require.NoError(t, err, "Query execution failed")
|
||||||
|
require.Nil(t, errMap, "Unexpected error map in results")
|
||||||
|
require.Len(t, results, 1, "Expected exactly one result set")
|
||||||
|
|
||||||
|
result := results[0]
|
||||||
|
require.Equal(t, "A", result.QueryName, "Incorrect query name in results")
|
||||||
|
require.Len(t, result.List, len(tc.expectedTimestamps),
|
||||||
|
"Result count mismatch: got %d results, expected %d",
|
||||||
|
len(result.List), len(tc.expectedTimestamps))
|
||||||
|
|
||||||
|
for i, expected := range tc.expectedTimestamps {
|
||||||
|
require.Equal(t, expected, result.List[i].Timestamp.UnixNano(),
|
||||||
|
"Timestamp mismatch at index %d: got %d, expected %d",
|
||||||
|
i, result.List[i].Timestamp.UnixNano(), expected)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify mock expectations
|
||||||
|
err = mock.ExpectationsWereMet()
|
||||||
|
require.NoError(t, err, "Mock expectations were not met")
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ import (
|
|||||||
metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3"
|
metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3"
|
||||||
metricsV4 "go.signoz.io/signoz/pkg/query-service/app/metrics/v4"
|
metricsV4 "go.signoz.io/signoz/pkg/query-service/app/metrics/v4"
|
||||||
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
|
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
|
||||||
|
tracesV4 "go.signoz.io/signoz/pkg/query-service/app/traces/v4"
|
||||||
"go.signoz.io/signoz/pkg/query-service/common"
|
"go.signoz.io/signoz/pkg/query-service/common"
|
||||||
"go.signoz.io/signoz/pkg/query-service/constants"
|
"go.signoz.io/signoz/pkg/query-service/constants"
|
||||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||||
@@ -158,11 +159,16 @@ func (q *querier) runBuilderQuery(
|
|||||||
|
|
||||||
if builderQuery.DataSource == v3.DataSourceTraces {
|
if builderQuery.DataSource == v3.DataSourceTraces {
|
||||||
|
|
||||||
|
tracesQueryBuilder := tracesV3.PrepareTracesQuery
|
||||||
|
if q.UseTraceNewSchema {
|
||||||
|
tracesQueryBuilder = tracesV4.PrepareTracesQuery
|
||||||
|
}
|
||||||
|
|
||||||
var query string
|
var query string
|
||||||
var err error
|
var err error
|
||||||
// for ts query with group by and limit form two queries
|
// for ts query with group by and limit form two queries
|
||||||
if params.CompositeQuery.PanelType == v3.PanelTypeGraph && builderQuery.Limit > 0 && len(builderQuery.GroupBy) > 0 {
|
if params.CompositeQuery.PanelType == v3.PanelTypeGraph && builderQuery.Limit > 0 && len(builderQuery.GroupBy) > 0 {
|
||||||
limitQuery, err := tracesV3.PrepareTracesQuery(
|
limitQuery, err := tracesQueryBuilder(
|
||||||
start,
|
start,
|
||||||
end,
|
end,
|
||||||
params.CompositeQuery.PanelType,
|
params.CompositeQuery.PanelType,
|
||||||
@@ -173,7 +179,7 @@ func (q *querier) runBuilderQuery(
|
|||||||
ch <- channelResult{Err: err, Name: queryName, Query: limitQuery, Series: nil}
|
ch <- channelResult{Err: err, Name: queryName, Query: limitQuery, Series: nil}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
placeholderQuery, err := tracesV3.PrepareTracesQuery(
|
placeholderQuery, err := tracesQueryBuilder(
|
||||||
start,
|
start,
|
||||||
end,
|
end,
|
||||||
params.CompositeQuery.PanelType,
|
params.CompositeQuery.PanelType,
|
||||||
@@ -186,7 +192,7 @@ func (q *querier) runBuilderQuery(
|
|||||||
}
|
}
|
||||||
query = fmt.Sprintf(placeholderQuery, limitQuery)
|
query = fmt.Sprintf(placeholderQuery, limitQuery)
|
||||||
} else {
|
} else {
|
||||||
query, err = tracesV3.PrepareTracesQuery(
|
query, err = tracesQueryBuilder(
|
||||||
start,
|
start,
|
||||||
end,
|
end,
|
||||||
params.CompositeQuery.PanelType,
|
params.CompositeQuery.PanelType,
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ import (
|
|||||||
metricsV4 "go.signoz.io/signoz/pkg/query-service/app/metrics/v4"
|
metricsV4 "go.signoz.io/signoz/pkg/query-service/app/metrics/v4"
|
||||||
"go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
|
"go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
|
||||||
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
|
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
|
||||||
|
tracesV4 "go.signoz.io/signoz/pkg/query-service/app/traces/v4"
|
||||||
"go.signoz.io/signoz/pkg/query-service/common"
|
"go.signoz.io/signoz/pkg/query-service/common"
|
||||||
chErrors "go.signoz.io/signoz/pkg/query-service/errors"
|
chErrors "go.signoz.io/signoz/pkg/query-service/errors"
|
||||||
"go.signoz.io/signoz/pkg/query-service/querycache"
|
"go.signoz.io/signoz/pkg/query-service/querycache"
|
||||||
@@ -48,10 +49,11 @@ type querier struct {
|
|||||||
testingMode bool
|
testingMode bool
|
||||||
queriesExecuted []string
|
queriesExecuted []string
|
||||||
// tuple of start and end time in milliseconds
|
// tuple of start and end time in milliseconds
|
||||||
timeRanges [][]int
|
timeRanges [][]int
|
||||||
returnedSeries []*v3.Series
|
returnedSeries []*v3.Series
|
||||||
returnedErr error
|
returnedErr error
|
||||||
UseLogsNewSchema bool
|
UseLogsNewSchema bool
|
||||||
|
UseTraceNewSchema bool
|
||||||
}
|
}
|
||||||
|
|
||||||
type QuerierOptions struct {
|
type QuerierOptions struct {
|
||||||
@@ -62,10 +64,11 @@ type QuerierOptions struct {
|
|||||||
FeatureLookup interfaces.FeatureLookup
|
FeatureLookup interfaces.FeatureLookup
|
||||||
|
|
||||||
// used for testing
|
// used for testing
|
||||||
TestingMode bool
|
TestingMode bool
|
||||||
ReturnedSeries []*v3.Series
|
ReturnedSeries []*v3.Series
|
||||||
ReturnedErr error
|
ReturnedErr error
|
||||||
UseLogsNewSchema bool
|
UseLogsNewSchema bool
|
||||||
|
UseTraceNewSchema bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewQuerier(opts QuerierOptions) interfaces.Querier {
|
func NewQuerier(opts QuerierOptions) interfaces.Querier {
|
||||||
@@ -74,6 +77,11 @@ func NewQuerier(opts QuerierOptions) interfaces.Querier {
|
|||||||
logsQueryBuilder = logsV4.PrepareLogsQuery
|
logsQueryBuilder = logsV4.PrepareLogsQuery
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tracesQueryBuilder := tracesV3.PrepareTracesQuery
|
||||||
|
if opts.UseTraceNewSchema {
|
||||||
|
tracesQueryBuilder = tracesV4.PrepareTracesQuery
|
||||||
|
}
|
||||||
|
|
||||||
qc := querycache.NewQueryCache(querycache.WithCache(opts.Cache), querycache.WithFluxInterval(opts.FluxInterval))
|
qc := querycache.NewQueryCache(querycache.WithCache(opts.Cache), querycache.WithFluxInterval(opts.FluxInterval))
|
||||||
|
|
||||||
return &querier{
|
return &querier{
|
||||||
@@ -84,16 +92,17 @@ func NewQuerier(opts QuerierOptions) interfaces.Querier {
|
|||||||
fluxInterval: opts.FluxInterval,
|
fluxInterval: opts.FluxInterval,
|
||||||
|
|
||||||
builder: queryBuilder.NewQueryBuilder(queryBuilder.QueryBuilderOptions{
|
builder: queryBuilder.NewQueryBuilder(queryBuilder.QueryBuilderOptions{
|
||||||
BuildTraceQuery: tracesV3.PrepareTracesQuery,
|
BuildTraceQuery: tracesQueryBuilder,
|
||||||
BuildLogQuery: logsQueryBuilder,
|
BuildLogQuery: logsQueryBuilder,
|
||||||
BuildMetricQuery: metricsV4.PrepareMetricQuery,
|
BuildMetricQuery: metricsV4.PrepareMetricQuery,
|
||||||
}, opts.FeatureLookup),
|
}, opts.FeatureLookup),
|
||||||
featureLookUp: opts.FeatureLookup,
|
featureLookUp: opts.FeatureLookup,
|
||||||
|
|
||||||
testingMode: opts.TestingMode,
|
testingMode: opts.TestingMode,
|
||||||
returnedSeries: opts.ReturnedSeries,
|
returnedSeries: opts.ReturnedSeries,
|
||||||
returnedErr: opts.ReturnedErr,
|
returnedErr: opts.ReturnedErr,
|
||||||
UseLogsNewSchema: opts.UseLogsNewSchema,
|
UseLogsNewSchema: opts.UseLogsNewSchema,
|
||||||
|
UseTraceNewSchema: opts.UseTraceNewSchema,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -308,56 +317,115 @@ func (q *querier) runClickHouseQueries(ctx context.Context, params *v3.QueryRang
|
|||||||
return results, errQueriesByName, err
|
return results, errQueriesByName, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *querier) runLogsListQuery(ctx context.Context, params *v3.QueryRangeParamsV3, tsRanges []utils.LogsListTsRange) ([]*v3.Result, map[string]error, error) {
|
func (q *querier) runWindowBasedListQuery(ctx context.Context, params *v3.QueryRangeParamsV3, tsRanges []utils.LogsListTsRange) ([]*v3.Result, map[string]error, error) {
|
||||||
res := make([]*v3.Result, 0)
|
res := make([]*v3.Result, 0)
|
||||||
qName := ""
|
qName := ""
|
||||||
pageSize := uint64(0)
|
pageSize := uint64(0)
|
||||||
|
limit := uint64(0)
|
||||||
|
offset := uint64(0)
|
||||||
|
|
||||||
// se we are considering only one query
|
// se we are considering only one query
|
||||||
for name, v := range params.CompositeQuery.BuilderQueries {
|
for name, v := range params.CompositeQuery.BuilderQueries {
|
||||||
qName = name
|
qName = name
|
||||||
pageSize = v.PageSize
|
pageSize = v.PageSize
|
||||||
|
|
||||||
|
// for traces specifically
|
||||||
|
limit = v.Limit
|
||||||
|
offset = v.Offset
|
||||||
}
|
}
|
||||||
data := []*v3.Row{}
|
data := []*v3.Row{}
|
||||||
|
|
||||||
|
tracesLimit := limit + offset
|
||||||
|
|
||||||
for _, v := range tsRanges {
|
for _, v := range tsRanges {
|
||||||
params.Start = v.Start
|
params.Start = v.Start
|
||||||
params.End = v.End
|
params.End = v.End
|
||||||
|
|
||||||
params.CompositeQuery.BuilderQueries[qName].PageSize = pageSize - uint64(len(data))
|
length := uint64(0)
|
||||||
queries, err := q.builder.PrepareQueries(params)
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// this will to run only once
|
// this will to run only once
|
||||||
for name, query := range queries {
|
|
||||||
rowList, err := q.reader.GetListResultV3(ctx, query)
|
// appending the filter to get the next set of data
|
||||||
|
if params.CompositeQuery.BuilderQueries[qName].DataSource == v3.DataSourceLogs {
|
||||||
|
params.CompositeQuery.BuilderQueries[qName].PageSize = pageSize - uint64(len(data))
|
||||||
|
queries, err := q.builder.PrepareQueries(params)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errs := []error{err}
|
return nil, nil, err
|
||||||
errQuriesByName := map[string]error{
|
}
|
||||||
name: err,
|
for name, query := range queries {
|
||||||
}
|
rowList, err := q.reader.GetListResultV3(ctx, query)
|
||||||
return nil, errQuriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...))
|
if err != nil {
|
||||||
|
errs := []error{err}
|
||||||
|
errQuriesByName := map[string]error{
|
||||||
|
name: err,
|
||||||
|
}
|
||||||
|
return nil, errQuriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...))
|
||||||
|
}
|
||||||
|
length += uint64(len(rowList))
|
||||||
|
data = append(data, rowList...)
|
||||||
}
|
}
|
||||||
data = append(data, rowList...)
|
|
||||||
}
|
|
||||||
|
|
||||||
// append a filter to the params
|
if length > 0 {
|
||||||
if len(data) > 0 {
|
params.CompositeQuery.BuilderQueries[qName].Filters.Items = append(params.CompositeQuery.BuilderQueries[qName].Filters.Items, v3.FilterItem{
|
||||||
params.CompositeQuery.BuilderQueries[qName].Filters.Items = append(params.CompositeQuery.BuilderQueries[qName].Filters.Items, v3.FilterItem{
|
Key: v3.AttributeKey{
|
||||||
Key: v3.AttributeKey{
|
Key: "id",
|
||||||
Key: "id",
|
IsColumn: true,
|
||||||
IsColumn: true,
|
DataType: "string",
|
||||||
DataType: "string",
|
},
|
||||||
},
|
Operator: v3.FilterOperatorLessThan,
|
||||||
Operator: v3.FilterOperatorLessThan,
|
Value: data[len(data)-1].Data["id"],
|
||||||
Value: data[len(data)-1].Data["id"],
|
})
|
||||||
})
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if uint64(len(data)) >= pageSize {
|
if uint64(len(data)) >= pageSize {
|
||||||
break
|
break
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// TRACE
|
||||||
|
// we are updating the offset and limit based on the number of traces we have found in the current timerange
|
||||||
|
// eg -
|
||||||
|
// 1)offset = 0, limit = 100, tsRanges = [t1, t10], [t10, 20], [t20, t30]
|
||||||
|
//
|
||||||
|
// if 100 traces are there in [t1, t10] then 100 will return immediately.
|
||||||
|
// if 10 traces are there in [t1, t10] then we get 10, set offset to 0 and limit to 90, search in the next timerange of [t10, 20]
|
||||||
|
// if we don't find any trace in [t1, t10], then we search in [t10, 20] with offset=0, limit=100
|
||||||
|
|
||||||
|
//
|
||||||
|
// 2) offset = 50, limit = 100, tsRanges = [t1, t10], [t10, 20], [t20, t30]
|
||||||
|
//
|
||||||
|
// If we find 150 traces with limit=150 and offset=0 in [t1, t10] then we return immediately 100 traces
|
||||||
|
// If we find 50 in [t1, t10] with limit=150 and offset=0 then it will set limit = 100 and offset=0 and search in the next timerange of [t10, 20]
|
||||||
|
// if we don't find any trace in [t1, t10], then we search in [t10, 20] with limit=150 and offset=0
|
||||||
|
params.CompositeQuery.BuilderQueries[qName].Offset = 0
|
||||||
|
params.CompositeQuery.BuilderQueries[qName].Limit = tracesLimit
|
||||||
|
queries, err := q.builder.PrepareQueries(params)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
for name, query := range queries {
|
||||||
|
rowList, err := q.reader.GetListResultV3(ctx, query)
|
||||||
|
if err != nil {
|
||||||
|
errs := []error{err}
|
||||||
|
errQuriesByName := map[string]error{
|
||||||
|
name: err,
|
||||||
|
}
|
||||||
|
return nil, errQuriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...))
|
||||||
|
}
|
||||||
|
length += uint64(len(rowList))
|
||||||
|
|
||||||
|
// skip the traces unless offset is 0
|
||||||
|
for _, row := range rowList {
|
||||||
|
if offset == 0 {
|
||||||
|
data = append(data, row)
|
||||||
|
} else {
|
||||||
|
offset--
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tracesLimit = tracesLimit - length
|
||||||
|
|
||||||
|
if uint64(len(data)) >= limit {
|
||||||
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
res = append(res, &v3.Result{
|
res = append(res, &v3.Result{
|
||||||
@@ -369,14 +437,24 @@ func (q *querier) runLogsListQuery(ctx context.Context, params *v3.QueryRangePar
|
|||||||
|
|
||||||
func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Result, map[string]error, error) {
|
func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Result, map[string]error, error) {
|
||||||
// List query has support for only one query.
|
// List query has support for only one query.
|
||||||
if q.UseLogsNewSchema && params.CompositeQuery != nil && len(params.CompositeQuery.BuilderQueries) == 1 {
|
// we are skipping for PanelTypeTrace as it has a custom order by regardless of what's in the payload
|
||||||
|
if params.CompositeQuery != nil &&
|
||||||
|
len(params.CompositeQuery.BuilderQueries) == 1 &&
|
||||||
|
params.CompositeQuery.PanelType != v3.PanelTypeTrace {
|
||||||
for _, v := range params.CompositeQuery.BuilderQueries {
|
for _, v := range params.CompositeQuery.BuilderQueries {
|
||||||
|
if (v.DataSource == v3.DataSourceLogs && !q.UseLogsNewSchema) ||
|
||||||
|
(v.DataSource == v3.DataSourceTraces && !q.UseTraceNewSchema) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
// only allow of logs queries with timestamp ordering desc
|
// only allow of logs queries with timestamp ordering desc
|
||||||
if v.DataSource == v3.DataSourceLogs && len(v.OrderBy) == 1 && v.OrderBy[0].ColumnName == "timestamp" && v.OrderBy[0].Order == "desc" {
|
// TODO(nitya): allow for timestamp asc
|
||||||
startEndArr := utils.GetLogsListTsRanges(params.Start, params.End)
|
if (v.DataSource == v3.DataSourceLogs || v.DataSource == v3.DataSourceTraces) &&
|
||||||
if len(startEndArr) > 0 {
|
len(v.OrderBy) == 1 &&
|
||||||
return q.runLogsListQuery(ctx, params, startEndArr)
|
v.OrderBy[0].ColumnName == "timestamp" &&
|
||||||
}
|
v.OrderBy[0].Order == "desc" {
|
||||||
|
startEndArr := utils.GetListTsRanges(params.Start, params.End)
|
||||||
|
return q.runWindowBasedListQuery(ctx, params, startEndArr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,15 +5,21 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
|
"regexp"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
cmock "github.com/srikanthccv/ClickHouse-go-mock"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"go.signoz.io/signoz/pkg/query-service/app/clickhouseReader"
|
||||||
"go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
|
"go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
|
||||||
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
|
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
|
||||||
"go.signoz.io/signoz/pkg/query-service/cache/inmemory"
|
"go.signoz.io/signoz/pkg/query-service/cache/inmemory"
|
||||||
|
"go.signoz.io/signoz/pkg/query-service/featureManager"
|
||||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||||
"go.signoz.io/signoz/pkg/query-service/querycache"
|
"go.signoz.io/signoz/pkg/query-service/querycache"
|
||||||
|
"go.signoz.io/signoz/pkg/query-service/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
func minTimestamp(series []*v3.Series) int64 {
|
func minTimestamp(series []*v3.Series) int64 {
|
||||||
@@ -798,8 +804,8 @@ func TestV2QueryRangeValueType(t *testing.T) {
|
|||||||
}
|
}
|
||||||
q := NewQuerier(opts)
|
q := NewQuerier(opts)
|
||||||
expectedTimeRangeInQueryString := []string{
|
expectedTimeRangeInQueryString := []string{
|
||||||
fmt.Sprintf("unix_milli >= %d AND unix_milli < %d", 1675115520000, 1675115580000+120*60*1000), // 31st Jan, 03:23:00 to 31st Jan, 05:23:00
|
fmt.Sprintf("unix_milli >= %d AND unix_milli < %d", 1675115520000, 1675115580000+120*60*1000), // 31st Jan, 03:23:00 to 31st Jan, 05:23:00
|
||||||
fmt.Sprintf("unix_milli >= %d AND unix_milli < %d", 1675115580000+120*60*1000, 1675115580000+180*60*1000), // 31st Jan, 05:23:00 to 31st Jan, 06:23:00
|
fmt.Sprintf("unix_milli >= %d AND unix_milli < %d", 1675115580000+120*60*1000, 1675115580000+180*60*1000), // 31st Jan, 05:23:00 to 31st Jan, 06:23:00
|
||||||
fmt.Sprintf("timestamp >= '%d' AND timestamp <= '%d'", (1675119196722)*int64(1000000), (1675126396722)*int64(1000000)), // 31st Jan, 05:23:00 to 31st Jan, 06:23:00
|
fmt.Sprintf("timestamp >= '%d' AND timestamp <= '%d'", (1675119196722)*int64(1000000), (1675126396722)*int64(1000000)), // 31st Jan, 05:23:00 to 31st Jan, 06:23:00
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1178,3 +1184,288 @@ func TestV2QueryRangeValueTypePromQL(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type regexMatcher struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *regexMatcher) Match(expectedSQL, actualSQL string) error {
|
||||||
|
re, err := regexp.Compile(expectedSQL)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if !re.MatchString(actualSQL) {
|
||||||
|
return fmt.Errorf("expected query to contain %s, got %s", expectedSQL, actualSQL)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func Test_querier_runWindowBasedListQuery(t *testing.T) {
|
||||||
|
params := &v3.QueryRangeParamsV3{
|
||||||
|
Start: 1722171576000000000, // July 28, 2024 6:29:36 PM
|
||||||
|
End: 1722262800000000000, // July 29, 2024 7:50:00 PM
|
||||||
|
CompositeQuery: &v3.CompositeQuery{
|
||||||
|
PanelType: v3.PanelTypeList,
|
||||||
|
BuilderQueries: map[string]*v3.BuilderQuery{
|
||||||
|
"A": {
|
||||||
|
QueryName: "A",
|
||||||
|
Expression: "A",
|
||||||
|
DataSource: v3.DataSourceTraces,
|
||||||
|
PageSize: 10,
|
||||||
|
Limit: 100,
|
||||||
|
StepInterval: 60,
|
||||||
|
AggregateOperator: v3.AggregateOperatorNoOp,
|
||||||
|
SelectColumns: []v3.AttributeKey{{Key: "serviceName"}},
|
||||||
|
Filters: &v3.FilterSet{
|
||||||
|
Operator: "AND",
|
||||||
|
Items: []v3.FilterItem{},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
tsRanges := []utils.LogsListTsRange{
|
||||||
|
{
|
||||||
|
Start: 1722259200000000000, // July 29, 2024 6:50:00 PM
|
||||||
|
End: 1722262800000000000, // July 29, 2024 7:50:00 PM
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Start: 1722252000000000000, // July 29, 2024 4:50:00 PM
|
||||||
|
End: 1722259200000000000, // July 29, 2024 6:50:00 PM
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Start: 1722237600000000000, // July 29, 2024 12:50:00 PM
|
||||||
|
End: 1722252000000000000, // July 29, 2024 4:50:00 PM
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Start: 1722208800000000000, // July 29, 2024 4:50:00 AM
|
||||||
|
End: 1722237600000000000, // July 29, 2024 12:50:00 PM
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Start: 1722171576000000000, // July 28, 2024 6:29:36 PM
|
||||||
|
End: 1722208800000000000, // July 29, 2024 4:50:00 AM
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
type queryParams struct {
|
||||||
|
start int64
|
||||||
|
end int64
|
||||||
|
limit uint64
|
||||||
|
offset uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
type queryResponse struct {
|
||||||
|
expectedQuery string
|
||||||
|
timestamps []uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
// create test struct with moc data i.e array of timestamps, limit, offset and expected results
|
||||||
|
testCases := []struct {
|
||||||
|
name string
|
||||||
|
queryResponses []queryResponse
|
||||||
|
queryParams queryParams
|
||||||
|
expectedTimestamps []int64
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "should return correct timestamps when querying within time window",
|
||||||
|
queryResponses: []queryResponse{
|
||||||
|
{
|
||||||
|
expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 2",
|
||||||
|
timestamps: []uint64{1722259300000000000, 1722259400000000000},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
queryParams: queryParams{
|
||||||
|
start: 1722171576000000000,
|
||||||
|
end: 1722262800000000000,
|
||||||
|
limit: 2,
|
||||||
|
offset: 0,
|
||||||
|
},
|
||||||
|
expectedTimestamps: []int64{1722259300000000000, 1722259400000000000},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "all data not in first windows",
|
||||||
|
queryResponses: []queryResponse{
|
||||||
|
{
|
||||||
|
expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 3",
|
||||||
|
timestamps: []uint64{1722259300000000000, 1722259400000000000},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 1",
|
||||||
|
timestamps: []uint64{1722253000000000000},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
queryParams: queryParams{
|
||||||
|
start: 1722171576000000000,
|
||||||
|
end: 1722262800000000000,
|
||||||
|
limit: 3,
|
||||||
|
offset: 0,
|
||||||
|
},
|
||||||
|
expectedTimestamps: []int64{1722259300000000000, 1722259400000000000, 1722253000000000000},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "data in multiple windows",
|
||||||
|
queryResponses: []queryResponse{
|
||||||
|
{
|
||||||
|
expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 5",
|
||||||
|
timestamps: []uint64{1722259300000000000, 1722259400000000000},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 3",
|
||||||
|
timestamps: []uint64{1722253000000000000},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
expectedQuery: ".*(timestamp >= '1722237600000000000' AND timestamp <= '1722252000000000000').* DESC LIMIT 2",
|
||||||
|
timestamps: []uint64{1722237700000000000},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
expectedQuery: ".*(timestamp >= '1722208800000000000' AND timestamp <= '1722237600000000000').* DESC LIMIT 1",
|
||||||
|
timestamps: []uint64{},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
expectedQuery: ".*(timestamp >= '1722171576000000000' AND timestamp <= '1722208800000000000').* DESC LIMIT 1",
|
||||||
|
timestamps: []uint64{},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
queryParams: queryParams{
|
||||||
|
start: 1722171576000000000,
|
||||||
|
end: 1722262800000000000,
|
||||||
|
limit: 5,
|
||||||
|
offset: 0,
|
||||||
|
},
|
||||||
|
expectedTimestamps: []int64{1722259300000000000, 1722259400000000000, 1722253000000000000, 1722237700000000000},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "query with offset",
|
||||||
|
queryResponses: []queryResponse{
|
||||||
|
{
|
||||||
|
expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 7",
|
||||||
|
timestamps: []uint64{1722259210000000000, 1722259220000000000, 1722259230000000000},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 4",
|
||||||
|
timestamps: []uint64{1722253000000000000, 1722254000000000000, 1722255000000000000},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
expectedQuery: ".*(timestamp >= '1722237600000000000' AND timestamp <= '1722252000000000000').* DESC LIMIT 1",
|
||||||
|
timestamps: []uint64{1722237700000000000},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
queryParams: queryParams{
|
||||||
|
start: 1722171576000000000,
|
||||||
|
end: 1722262800000000000,
|
||||||
|
limit: 4,
|
||||||
|
offset: 3,
|
||||||
|
},
|
||||||
|
expectedTimestamps: []int64{1722253000000000000, 1722254000000000000, 1722255000000000000, 1722237700000000000},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "query with offset and limit- data spread across multiple windows",
|
||||||
|
queryResponses: []queryResponse{
|
||||||
|
{
|
||||||
|
expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 11",
|
||||||
|
timestamps: []uint64{},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 11",
|
||||||
|
timestamps: []uint64{1722253000000000000, 1722254000000000000, 1722255000000000000},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
expectedQuery: ".*(timestamp >= '1722237600000000000' AND timestamp <= '1722252000000000000').* DESC LIMIT 8",
|
||||||
|
timestamps: []uint64{1722237700000000000, 1722237800000000000, 1722237900000000000, 1722237910000000000, 1722237920000000000},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
expectedQuery: ".*(timestamp >= '1722208800000000000' AND timestamp <= '1722237600000000000').* DESC LIMIT 3",
|
||||||
|
timestamps: []uint64{1722208810000000000, 1722208820000000000, 1722208830000000000},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
queryParams: queryParams{
|
||||||
|
start: 1722171576000000000,
|
||||||
|
end: 1722262800000000000,
|
||||||
|
limit: 5,
|
||||||
|
offset: 6,
|
||||||
|
},
|
||||||
|
expectedTimestamps: []int64{1722237910000000000, 1722237920000000000, 1722208810000000000, 1722208820000000000, 1722208830000000000},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
cols := []cmock.ColumnType{
|
||||||
|
{Name: "timestamp", Type: "UInt64"},
|
||||||
|
{Name: "name", Type: "String"},
|
||||||
|
}
|
||||||
|
testName := "name"
|
||||||
|
|
||||||
|
options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace")
|
||||||
|
|
||||||
|
// iterate over test data, create reader and run test
|
||||||
|
for _, tc := range testCases {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
// Setup mock
|
||||||
|
mock, err := cmock.NewClickHouseWithQueryMatcher(nil, ®exMatcher{})
|
||||||
|
require.NoError(t, err, "Failed to create ClickHouse mock")
|
||||||
|
|
||||||
|
// Configure mock responses
|
||||||
|
for _, response := range tc.queryResponses {
|
||||||
|
values := make([][]any, 0, len(response.timestamps))
|
||||||
|
for _, ts := range response.timestamps {
|
||||||
|
values = append(values, []any{&ts, &testName})
|
||||||
|
}
|
||||||
|
// if len(values) > 0 {
|
||||||
|
mock.ExpectQuery(response.expectedQuery).WillReturnRows(
|
||||||
|
cmock.NewRows(cols, values),
|
||||||
|
)
|
||||||
|
// }
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create reader and querier
|
||||||
|
reader := clickhouseReader.NewReaderFromClickhouseConnection(
|
||||||
|
mock,
|
||||||
|
options,
|
||||||
|
nil,
|
||||||
|
"",
|
||||||
|
featureManager.StartManager(),
|
||||||
|
"",
|
||||||
|
true,
|
||||||
|
true,
|
||||||
|
)
|
||||||
|
|
||||||
|
q := &querier{
|
||||||
|
reader: reader,
|
||||||
|
builder: queryBuilder.NewQueryBuilder(
|
||||||
|
queryBuilder.QueryBuilderOptions{
|
||||||
|
BuildTraceQuery: tracesV3.PrepareTracesQuery,
|
||||||
|
},
|
||||||
|
featureManager.StartManager(),
|
||||||
|
),
|
||||||
|
}
|
||||||
|
// Update query parameters
|
||||||
|
params.Start = tc.queryParams.start
|
||||||
|
params.End = tc.queryParams.end
|
||||||
|
params.CompositeQuery.BuilderQueries["A"].Limit = tc.queryParams.limit
|
||||||
|
params.CompositeQuery.BuilderQueries["A"].Offset = tc.queryParams.offset
|
||||||
|
|
||||||
|
// Execute query
|
||||||
|
results, errMap, err := q.runWindowBasedListQuery(context.Background(), params, tsRanges)
|
||||||
|
|
||||||
|
// Assertions
|
||||||
|
require.NoError(t, err, "Query execution failed")
|
||||||
|
require.Nil(t, errMap, "Unexpected error map in results")
|
||||||
|
require.Len(t, results, 1, "Expected exactly one result set")
|
||||||
|
|
||||||
|
result := results[0]
|
||||||
|
require.Equal(t, "A", result.QueryName, "Incorrect query name in results")
|
||||||
|
require.Len(t, result.List, len(tc.expectedTimestamps),
|
||||||
|
"Result count mismatch: got %d results, expected %d",
|
||||||
|
len(result.List), len(tc.expectedTimestamps))
|
||||||
|
|
||||||
|
for i, expected := range tc.expectedTimestamps {
|
||||||
|
require.Equal(t, expected, result.List[i].Timestamp.UnixNano(),
|
||||||
|
"Timestamp mismatch at index %d: got %d, expected %d",
|
||||||
|
i, result.List[i].Timestamp.UnixNano(), expected)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify mock expectations
|
||||||
|
err = mock.ExpectationsWereMet()
|
||||||
|
require.NoError(t, err, "Mock expectations were not met")
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -67,6 +67,7 @@ type ServerOptions struct {
|
|||||||
FluxInterval string
|
FluxInterval string
|
||||||
Cluster string
|
Cluster string
|
||||||
UseLogsNewSchema bool
|
UseLogsNewSchema bool
|
||||||
|
UseTraceNewSchema bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// Server runs HTTP, Mux and a grpc server
|
// Server runs HTTP, Mux and a grpc server
|
||||||
@@ -130,6 +131,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
|||||||
serverOptions.DialTimeout,
|
serverOptions.DialTimeout,
|
||||||
serverOptions.Cluster,
|
serverOptions.Cluster,
|
||||||
serverOptions.UseLogsNewSchema,
|
serverOptions.UseLogsNewSchema,
|
||||||
|
serverOptions.UseTraceNewSchema,
|
||||||
)
|
)
|
||||||
go clickhouseReader.Start(readerReady)
|
go clickhouseReader.Start(readerReady)
|
||||||
reader = clickhouseReader
|
reader = clickhouseReader
|
||||||
@@ -157,7 +159,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
|||||||
rm, err := makeRulesManager(
|
rm, err := makeRulesManager(
|
||||||
serverOptions.PromConfigPath,
|
serverOptions.PromConfigPath,
|
||||||
constants.GetAlertManagerApiPrefix(),
|
constants.GetAlertManagerApiPrefix(),
|
||||||
serverOptions.RuleRepoURL, localDB, reader, c, serverOptions.DisableRules, fm, serverOptions.UseLogsNewSchema)
|
serverOptions.RuleRepoURL, localDB, reader, c, serverOptions.DisableRules, fm, serverOptions.UseLogsNewSchema, serverOptions.UseTraceNewSchema)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -202,6 +204,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
|||||||
Cache: c,
|
Cache: c,
|
||||||
FluxInterval: fluxInterval,
|
FluxInterval: fluxInterval,
|
||||||
UseLogsNewSchema: serverOptions.UseLogsNewSchema,
|
UseLogsNewSchema: serverOptions.UseLogsNewSchema,
|
||||||
|
UseTraceNewSchema: serverOptions.UseTraceNewSchema,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -721,7 +724,8 @@ func makeRulesManager(
|
|||||||
cache cache.Cache,
|
cache cache.Cache,
|
||||||
disableRules bool,
|
disableRules bool,
|
||||||
fm interfaces.FeatureLookup,
|
fm interfaces.FeatureLookup,
|
||||||
useLogsNewSchema bool) (*rules.Manager, error) {
|
useLogsNewSchema bool,
|
||||||
|
useTraceNewSchema bool) (*rules.Manager, error) {
|
||||||
|
|
||||||
// create engine
|
// create engine
|
||||||
pqle, err := pqle.FromReader(ch)
|
pqle, err := pqle.FromReader(ch)
|
||||||
@@ -738,18 +742,19 @@ func makeRulesManager(
|
|||||||
|
|
||||||
// create manager opts
|
// create manager opts
|
||||||
managerOpts := &rules.ManagerOptions{
|
managerOpts := &rules.ManagerOptions{
|
||||||
NotifierOpts: notifierOpts,
|
NotifierOpts: notifierOpts,
|
||||||
PqlEngine: pqle,
|
PqlEngine: pqle,
|
||||||
RepoURL: ruleRepoURL,
|
RepoURL: ruleRepoURL,
|
||||||
DBConn: db,
|
DBConn: db,
|
||||||
Context: context.Background(),
|
Context: context.Background(),
|
||||||
Logger: zap.L(),
|
Logger: zap.L(),
|
||||||
DisableRules: disableRules,
|
DisableRules: disableRules,
|
||||||
FeatureFlags: fm,
|
FeatureFlags: fm,
|
||||||
Reader: ch,
|
Reader: ch,
|
||||||
Cache: cache,
|
Cache: cache,
|
||||||
EvalDelay: constants.GetEvalDelay(),
|
EvalDelay: constants.GetEvalDelay(),
|
||||||
UseLogsNewSchema: useLogsNewSchema,
|
UseLogsNewSchema: useLogsNewSchema,
|
||||||
|
UseTraceNewSchema: useTraceNewSchema,
|
||||||
}
|
}
|
||||||
|
|
||||||
// create Manager
|
// create Manager
|
||||||
|
|||||||
104
pkg/query-service/app/traces/v4/enrich.go
Normal file
104
pkg/query-service/app/traces/v4/enrich.go
Normal file
@@ -0,0 +1,104 @@
|
|||||||
|
package v4
|
||||||
|
|
||||||
|
import (
|
||||||
|
"go.signoz.io/signoz/pkg/query-service/constants"
|
||||||
|
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||||
|
"go.signoz.io/signoz/pkg/query-service/utils"
|
||||||
|
)
|
||||||
|
|
||||||
|
func isEnriched(field v3.AttributeKey) bool {
|
||||||
|
// if it is timestamp/id dont check
|
||||||
|
if field.Key == "timestamp" || field.Key == "id" || field.Key == constants.SigNozOrderByValue {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// don't need to enrich the static fields as they will be always used a column
|
||||||
|
if _, ok := constants.StaticFieldsTraces[field.Key]; ok && field.IsColumn {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func enrichKeyWithMetadata(key v3.AttributeKey, keys map[string]v3.AttributeKey) v3.AttributeKey {
|
||||||
|
if isEnriched(key) {
|
||||||
|
return key
|
||||||
|
}
|
||||||
|
|
||||||
|
if v, ok := constants.StaticFieldsTraces[key.Key]; ok {
|
||||||
|
return v
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, key := range utils.GenerateEnrichmentKeys(key) {
|
||||||
|
if val, ok := keys[key]; ok {
|
||||||
|
return val
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// enrich with default values if metadata is not found
|
||||||
|
if key.Type == "" {
|
||||||
|
key.Type = v3.AttributeKeyTypeTag
|
||||||
|
}
|
||||||
|
if key.DataType == "" {
|
||||||
|
key.DataType = v3.AttributeKeyDataTypeString
|
||||||
|
}
|
||||||
|
return key
|
||||||
|
}
|
||||||
|
|
||||||
|
func Enrich(params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey) {
|
||||||
|
if params.CompositeQuery.QueryType == v3.QueryTypeBuilder {
|
||||||
|
for _, query := range params.CompositeQuery.BuilderQueries {
|
||||||
|
if query.DataSource == v3.DataSourceTraces {
|
||||||
|
EnrichTracesQuery(query, keys)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func EnrichTracesQuery(query *v3.BuilderQuery, keys map[string]v3.AttributeKey) {
|
||||||
|
// enrich aggregate attribute
|
||||||
|
query.AggregateAttribute = enrichKeyWithMetadata(query.AggregateAttribute, keys)
|
||||||
|
// enrich filter items
|
||||||
|
if query.Filters != nil && len(query.Filters.Items) > 0 {
|
||||||
|
for idx, filter := range query.Filters.Items {
|
||||||
|
query.Filters.Items[idx].Key = enrichKeyWithMetadata(filter.Key, keys)
|
||||||
|
// if the serviceName column is used, use the corresponding resource attribute as well during filtering
|
||||||
|
if filter.Key.Key == "serviceName" && filter.Key.IsColumn {
|
||||||
|
query.Filters.Items[idx].Key = v3.AttributeKey{
|
||||||
|
Key: "service.name",
|
||||||
|
DataType: v3.AttributeKeyDataTypeString,
|
||||||
|
Type: v3.AttributeKeyTypeResource,
|
||||||
|
IsColumn: false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// enrich group by
|
||||||
|
for idx, groupBy := range query.GroupBy {
|
||||||
|
query.GroupBy[idx] = enrichKeyWithMetadata(groupBy, keys)
|
||||||
|
}
|
||||||
|
// enrich order by
|
||||||
|
query.OrderBy = enrichOrderBy(query.OrderBy, keys)
|
||||||
|
// enrich select columns
|
||||||
|
for idx, selectColumn := range query.SelectColumns {
|
||||||
|
query.SelectColumns[idx] = enrichKeyWithMetadata(selectColumn, keys)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func enrichOrderBy(items []v3.OrderBy, keys map[string]v3.AttributeKey) []v3.OrderBy {
|
||||||
|
enrichedItems := []v3.OrderBy{}
|
||||||
|
for i := 0; i < len(items); i++ {
|
||||||
|
attributeKey := enrichKeyWithMetadata(v3.AttributeKey{
|
||||||
|
Key: items[i].ColumnName,
|
||||||
|
}, keys)
|
||||||
|
enrichedItems = append(enrichedItems, v3.OrderBy{
|
||||||
|
ColumnName: items[i].ColumnName,
|
||||||
|
Order: items[i].Order,
|
||||||
|
Key: attributeKey.Key,
|
||||||
|
DataType: attributeKey.DataType,
|
||||||
|
Type: attributeKey.Type,
|
||||||
|
IsColumn: attributeKey.IsColumn,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return enrichedItems
|
||||||
|
}
|
||||||
97
pkg/query-service/app/traces/v4/enrich_test.go
Normal file
97
pkg/query-service/app/traces/v4/enrich_test.go
Normal file
@@ -0,0 +1,97 @@
|
|||||||
|
package v4
|
||||||
|
|
||||||
|
import (
|
||||||
|
"reflect"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestEnrichTracesQuery(t *testing.T) {
|
||||||
|
type args struct {
|
||||||
|
query *v3.BuilderQuery
|
||||||
|
keys map[string]v3.AttributeKey
|
||||||
|
want *v3.BuilderQuery
|
||||||
|
}
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
args args
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "test 1",
|
||||||
|
args: args{
|
||||||
|
query: &v3.BuilderQuery{
|
||||||
|
Filters: &v3.FilterSet{
|
||||||
|
Operator: "AND",
|
||||||
|
Items: []v3.FilterItem{
|
||||||
|
{Key: v3.AttributeKey{Key: "bytes", Type: v3.AttributeKeyTypeTag}, Value: 100, Operator: ">"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
keys: map[string]v3.AttributeKey{
|
||||||
|
"bytes##tag##int64": {Key: "bytes", DataType: v3.AttributeKeyDataTypeInt64, Type: v3.AttributeKeyTypeTag},
|
||||||
|
},
|
||||||
|
want: &v3.BuilderQuery{
|
||||||
|
Filters: &v3.FilterSet{
|
||||||
|
Operator: "AND",
|
||||||
|
Items: []v3.FilterItem{
|
||||||
|
{Key: v3.AttributeKey{Key: "bytes", Type: v3.AttributeKeyTypeTag, DataType: v3.AttributeKeyDataTypeInt64}, Value: 100, Operator: ">"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "test service name",
|
||||||
|
args: args{
|
||||||
|
query: &v3.BuilderQuery{
|
||||||
|
Filters: &v3.FilterSet{
|
||||||
|
Operator: "AND",
|
||||||
|
Items: []v3.FilterItem{
|
||||||
|
{Key: v3.AttributeKey{Key: "serviceName", DataType: v3.AttributeKeyDataTypeString, IsColumn: true}, Value: "myservice", Operator: "="},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
keys: map[string]v3.AttributeKey{},
|
||||||
|
want: &v3.BuilderQuery{
|
||||||
|
Filters: &v3.FilterSet{
|
||||||
|
Operator: "AND",
|
||||||
|
Items: []v3.FilterItem{
|
||||||
|
{Key: v3.AttributeKey{Key: "service.name", Type: v3.AttributeKeyTypeResource, DataType: v3.AttributeKeyDataTypeString}, Value: "myservice", Operator: "="},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "test mat attrs",
|
||||||
|
args: args{
|
||||||
|
query: &v3.BuilderQuery{
|
||||||
|
Filters: &v3.FilterSet{
|
||||||
|
Operator: "AND",
|
||||||
|
Items: []v3.FilterItem{
|
||||||
|
{Key: v3.AttributeKey{Key: "http.route", DataType: v3.AttributeKeyDataTypeString, IsColumn: true}, Value: "/api", Operator: "="},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
keys: map[string]v3.AttributeKey{},
|
||||||
|
want: &v3.BuilderQuery{
|
||||||
|
Filters: &v3.FilterSet{
|
||||||
|
Operator: "AND",
|
||||||
|
Items: []v3.FilterItem{
|
||||||
|
{Key: v3.AttributeKey{Key: "http.route", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}, Value: "/api", Operator: "="},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
EnrichTracesQuery(tt.args.query, tt.args.keys)
|
||||||
|
if !reflect.DeepEqual(tt.args.query.Filters.Items[0].Key, tt.args.want.Filters.Items[0].Key) {
|
||||||
|
t.Errorf("EnrichTracesQuery() = %v, want %v", tt.args.query, tt.args.want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
216
pkg/query-service/app/traces/v4/utils.go
Normal file
216
pkg/query-service/app/traces/v4/utils.go
Normal file
@@ -0,0 +1,216 @@
|
|||||||
|
package v4
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strconv"
|
||||||
|
|
||||||
|
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||||
|
"go.signoz.io/signoz/pkg/query-service/utils"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
var TracesListViewDefaultSelectedColumns = []v3.AttributeKey{
|
||||||
|
{
|
||||||
|
Key: "serviceName",
|
||||||
|
DataType: v3.AttributeKeyDataTypeString,
|
||||||
|
Type: v3.AttributeKeyTypeTag,
|
||||||
|
IsColumn: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: "name",
|
||||||
|
DataType: v3.AttributeKeyDataTypeString,
|
||||||
|
Type: v3.AttributeKeyTypeTag,
|
||||||
|
IsColumn: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: "durationNano",
|
||||||
|
DataType: v3.AttributeKeyDataTypeArrayFloat64,
|
||||||
|
Type: v3.AttributeKeyTypeTag,
|
||||||
|
IsColumn: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: "httpMethod",
|
||||||
|
DataType: v3.AttributeKeyDataTypeString,
|
||||||
|
Type: v3.AttributeKeyTypeTag,
|
||||||
|
IsColumn: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: "responseStatusCode",
|
||||||
|
DataType: v3.AttributeKeyDataTypeString,
|
||||||
|
Type: v3.AttributeKeyTypeTag,
|
||||||
|
IsColumn: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// check if traceId filter is used in traces query and return the list of traceIds
|
||||||
|
func TraceIdFilterUsedWithEqual(params *v3.QueryRangeParamsV3) (bool, []string) {
|
||||||
|
compositeQuery := params.CompositeQuery
|
||||||
|
if compositeQuery == nil {
|
||||||
|
return false, []string{}
|
||||||
|
}
|
||||||
|
var traceIds []string
|
||||||
|
var traceIdFilterUsed bool
|
||||||
|
|
||||||
|
// Build queries for each builder query
|
||||||
|
for queryName, query := range compositeQuery.BuilderQueries {
|
||||||
|
if query.Expression != queryName && query.DataSource != v3.DataSourceTraces {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// check filter attribute
|
||||||
|
if query.Filters != nil && len(query.Filters.Items) != 0 {
|
||||||
|
for _, item := range query.Filters.Items {
|
||||||
|
|
||||||
|
if item.Key.Key == "traceID" && (item.Operator == v3.FilterOperatorIn ||
|
||||||
|
item.Operator == v3.FilterOperatorEqual) {
|
||||||
|
traceIdFilterUsed = true
|
||||||
|
// validate value
|
||||||
|
var err error
|
||||||
|
val := item.Value
|
||||||
|
val, err = utils.ValidateAndCastValue(val, item.Key.DataType)
|
||||||
|
if err != nil {
|
||||||
|
zap.L().Error("invalid value for key", zap.String("key", item.Key.Key), zap.Error(err))
|
||||||
|
return false, []string{}
|
||||||
|
}
|
||||||
|
if val != nil {
|
||||||
|
fmtVal := extractFormattedStringValues(val)
|
||||||
|
traceIds = append(traceIds, fmtVal...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
zap.L().Debug("traceIds", zap.Any("traceIds", traceIds))
|
||||||
|
return traceIdFilterUsed, traceIds
|
||||||
|
}
|
||||||
|
|
||||||
|
func extractFormattedStringValues(v interface{}) []string {
|
||||||
|
// if it's pointer convert it to a value
|
||||||
|
v = getPointerValue(v)
|
||||||
|
|
||||||
|
switch x := v.(type) {
|
||||||
|
case string:
|
||||||
|
return []string{x}
|
||||||
|
|
||||||
|
case []interface{}:
|
||||||
|
if len(x) == 0 {
|
||||||
|
return []string{}
|
||||||
|
}
|
||||||
|
switch x[0].(type) {
|
||||||
|
case string:
|
||||||
|
values := []string{}
|
||||||
|
for _, val := range x {
|
||||||
|
values = append(values, val.(string))
|
||||||
|
}
|
||||||
|
return values
|
||||||
|
default:
|
||||||
|
return []string{}
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
return []string{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func getPointerValue(v interface{}) interface{} {
|
||||||
|
switch x := v.(type) {
|
||||||
|
case *uint8:
|
||||||
|
return *x
|
||||||
|
case *uint16:
|
||||||
|
return *x
|
||||||
|
case *uint32:
|
||||||
|
return *x
|
||||||
|
case *uint64:
|
||||||
|
return *x
|
||||||
|
case *int:
|
||||||
|
return *x
|
||||||
|
case *int8:
|
||||||
|
return *x
|
||||||
|
case *int16:
|
||||||
|
return *x
|
||||||
|
case *int32:
|
||||||
|
return *x
|
||||||
|
case *int64:
|
||||||
|
return *x
|
||||||
|
case *float32:
|
||||||
|
return *x
|
||||||
|
case *float64:
|
||||||
|
return *x
|
||||||
|
case *string:
|
||||||
|
return *x
|
||||||
|
case *bool:
|
||||||
|
return *x
|
||||||
|
case []interface{}:
|
||||||
|
values := []interface{}{}
|
||||||
|
for _, val := range x {
|
||||||
|
values = append(values, getPointerValue(val))
|
||||||
|
}
|
||||||
|
return values
|
||||||
|
default:
|
||||||
|
return v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func AddTimestampFilters(minTime int64, maxTime int64, params *v3.QueryRangeParamsV3) {
|
||||||
|
if minTime == 0 && maxTime == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
compositeQuery := params.CompositeQuery
|
||||||
|
if compositeQuery == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Build queries for each builder query and apply timestamp filter only if TraceID is present
|
||||||
|
for queryName, query := range compositeQuery.BuilderQueries {
|
||||||
|
if query.Expression != queryName && query.DataSource != v3.DataSourceTraces {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
addTimeStampFilter := false
|
||||||
|
|
||||||
|
// check filter attribute
|
||||||
|
if query.Filters != nil && len(query.Filters.Items) != 0 {
|
||||||
|
for _, item := range query.Filters.Items {
|
||||||
|
if item.Key.Key == "traceID" && (item.Operator == v3.FilterOperatorIn ||
|
||||||
|
item.Operator == v3.FilterOperatorEqual) {
|
||||||
|
addTimeStampFilter = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// add timestamp filter to query only if traceID filter along with equal/similar operator is used
|
||||||
|
if addTimeStampFilter {
|
||||||
|
timeFilters := []v3.FilterItem{
|
||||||
|
{
|
||||||
|
Key: v3.AttributeKey{
|
||||||
|
Key: "timestamp",
|
||||||
|
Type: v3.AttributeKeyTypeTag,
|
||||||
|
DataType: v3.AttributeKeyDataTypeString,
|
||||||
|
IsColumn: true,
|
||||||
|
},
|
||||||
|
Value: strconv.FormatUint(uint64(minTime), 10),
|
||||||
|
Operator: v3.FilterOperatorGreaterThanOrEq,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: v3.AttributeKey{
|
||||||
|
Key: "timestamp",
|
||||||
|
Type: v3.AttributeKeyTypeTag,
|
||||||
|
DataType: v3.AttributeKeyDataTypeString,
|
||||||
|
IsColumn: true,
|
||||||
|
},
|
||||||
|
Value: strconv.FormatUint(uint64(maxTime), 10),
|
||||||
|
Operator: v3.FilterOperatorLessThanOrEq,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// add new timestamp filter to query
|
||||||
|
if query.Filters == nil {
|
||||||
|
query.Filters = &v3.FilterSet{
|
||||||
|
Items: timeFilters,
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
query.Filters.Items = append(query.Filters.Items, timeFilters...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -238,8 +238,8 @@ const (
|
|||||||
SIGNOZ_EXP_HISTOGRAM_TABLENAME = "distributed_exp_hist"
|
SIGNOZ_EXP_HISTOGRAM_TABLENAME = "distributed_exp_hist"
|
||||||
SIGNOZ_TRACE_DBNAME = "signoz_traces"
|
SIGNOZ_TRACE_DBNAME = "signoz_traces"
|
||||||
SIGNOZ_SPAN_INDEX_TABLENAME = "distributed_signoz_index_v2"
|
SIGNOZ_SPAN_INDEX_TABLENAME = "distributed_signoz_index_v2"
|
||||||
SIGNOZ_SPAN_INDEX_LOCAL_TABLENAME = "signoz_index_v2"
|
|
||||||
SIGNOZ_SPAN_INDEX_V3 = "distributed_signoz_index_v3"
|
SIGNOZ_SPAN_INDEX_V3 = "distributed_signoz_index_v3"
|
||||||
|
SIGNOZ_SPAN_INDEX_LOCAL_TABLENAME = "signoz_index_v2"
|
||||||
SIGNOZ_SPAN_INDEX_V3_LOCAL_TABLENAME = "signoz_index_v3"
|
SIGNOZ_SPAN_INDEX_V3_LOCAL_TABLENAME = "signoz_index_v3"
|
||||||
SIGNOZ_TIMESERIES_v4_LOCAL_TABLENAME = "time_series_v4"
|
SIGNOZ_TIMESERIES_v4_LOCAL_TABLENAME = "time_series_v4"
|
||||||
SIGNOZ_TIMESERIES_v4_6HRS_LOCAL_TABLENAME = "time_series_v4_6hrs"
|
SIGNOZ_TIMESERIES_v4_6HRS_LOCAL_TABLENAME = "time_series_v4_6hrs"
|
||||||
|
|||||||
@@ -183,7 +183,7 @@ func PrepareFilters(labels map[string]string, whereClauseItems []v3.FilterItem,
|
|||||||
var attrFound bool
|
var attrFound bool
|
||||||
|
|
||||||
// as of now this logic will only apply for logs
|
// as of now this logic will only apply for logs
|
||||||
for _, tKey := range utils.GenerateLogEnrichmentKeys(v3.AttributeKey{Key: key}) {
|
for _, tKey := range utils.GenerateEnrichmentKeys(v3.AttributeKey{Key: key}) {
|
||||||
if val, ok := keys[tKey]; ok {
|
if val, ok := keys[tKey]; ok {
|
||||||
attributeKey = val
|
attributeKey = val
|
||||||
attrFound = true
|
attrFound = true
|
||||||
|
|||||||
@@ -39,6 +39,7 @@ func main() {
|
|||||||
var disableRules bool
|
var disableRules bool
|
||||||
|
|
||||||
var useLogsNewSchema bool
|
var useLogsNewSchema bool
|
||||||
|
var useTraceNewSchema bool
|
||||||
// the url used to build link in the alert messages in slack and other systems
|
// the url used to build link in the alert messages in slack and other systems
|
||||||
var ruleRepoURL, cacheConfigPath, fluxInterval string
|
var ruleRepoURL, cacheConfigPath, fluxInterval string
|
||||||
var cluster string
|
var cluster string
|
||||||
@@ -50,6 +51,7 @@ func main() {
|
|||||||
var dialTimeout time.Duration
|
var dialTimeout time.Duration
|
||||||
|
|
||||||
flag.BoolVar(&useLogsNewSchema, "use-logs-new-schema", false, "use logs_v2 schema for logs")
|
flag.BoolVar(&useLogsNewSchema, "use-logs-new-schema", false, "use logs_v2 schema for logs")
|
||||||
|
flag.BoolVar(&useTraceNewSchema, "use-trace-new-schema", false, "use new schema for traces")
|
||||||
flag.StringVar(&promConfigPath, "config", "./config/prometheus.yml", "(prometheus config to read metrics)")
|
flag.StringVar(&promConfigPath, "config", "./config/prometheus.yml", "(prometheus config to read metrics)")
|
||||||
flag.StringVar(&skipTopLvlOpsPath, "skip-top-level-ops", "", "(config file to skip top level operations)")
|
flag.StringVar(&skipTopLvlOpsPath, "skip-top-level-ops", "", "(config file to skip top level operations)")
|
||||||
flag.BoolVar(&disableRules, "rules.disable", false, "(disable rule evaluation)")
|
flag.BoolVar(&disableRules, "rules.disable", false, "(disable rule evaluation)")
|
||||||
@@ -87,6 +89,7 @@ func main() {
|
|||||||
FluxInterval: fluxInterval,
|
FluxInterval: fluxInterval,
|
||||||
Cluster: cluster,
|
Cluster: cluster,
|
||||||
UseLogsNewSchema: useLogsNewSchema,
|
UseLogsNewSchema: useLogsNewSchema,
|
||||||
|
UseTraceNewSchema: useTraceNewSchema,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read the jwt secret key
|
// Read the jwt secret key
|
||||||
|
|||||||
@@ -269,6 +269,32 @@ type SearchSpanResponseItem struct {
|
|||||||
SpanKind string `json:"spanKind"`
|
SpanKind string `json:"spanKind"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type SearchSpanResponseItemV2 struct {
|
||||||
|
TimeUnixNano time.Time `json:"timestamp" ch:"timestamp"`
|
||||||
|
DurationNano uint64 `json:"durationNano" ch:"durationNano"`
|
||||||
|
SpanID string `json:"spanId" ch:"spanID"`
|
||||||
|
TraceID string `json:"traceId" ch:"traceID"`
|
||||||
|
HasError bool `json:"hasError" ch:"hasError"`
|
||||||
|
Kind int8 `json:"kind" ch:"kind"`
|
||||||
|
ServiceName string `json:"serviceName" ch:"serviceName"`
|
||||||
|
Name string `json:"name" ch:"name"`
|
||||||
|
References string `json:"references,omitempty" ch:"references"`
|
||||||
|
Attributes_string map[string]string `json:"attributes_string" ch:"attributes_string"`
|
||||||
|
Attributes_number map[string]float64 `json:"attributes_number" ch:"attributes_number"`
|
||||||
|
Attributes_bool map[string]bool `json:"attributes_bool" ch:"attributes_bool"`
|
||||||
|
Events []string `json:"event" ch:"events"`
|
||||||
|
StatusMessage string `json:"statusMessage" ch:"statusMessage"`
|
||||||
|
StatusCodeString string `json:"statusCodeString" ch:"statusCodeString"`
|
||||||
|
SpanKind string `json:"spanKind" ch:"spanKind"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type TraceSummary struct {
|
||||||
|
TraceID string `json:"traceId" ch:"trace_id"`
|
||||||
|
Start time.Time `json:"start" ch:"start"`
|
||||||
|
End time.Time `json:"end" ch:"end"`
|
||||||
|
NumSpans uint64 `json:"numSpans" ch:"num_spans"`
|
||||||
|
}
|
||||||
|
|
||||||
type OtelSpanRef struct {
|
type OtelSpanRef struct {
|
||||||
TraceId string `json:"traceId,omitempty"`
|
TraceId string `json:"traceId,omitempty"`
|
||||||
SpanId string `json:"spanId,omitempty"`
|
SpanId string `json:"spanId,omitempty"`
|
||||||
|
|||||||
@@ -463,9 +463,9 @@ func (r *BaseRule) ShouldAlert(series v3.Series) (Sample, bool) {
|
|||||||
}
|
}
|
||||||
} else if r.compareOp() == ValueOutsideBounds {
|
} else if r.compareOp() == ValueOutsideBounds {
|
||||||
for _, smpl := range series.Points {
|
for _, smpl := range series.Points {
|
||||||
if math.Abs(smpl.Value) >= r.targetVal() {
|
if math.Abs(smpl.Value) < r.targetVal() {
|
||||||
alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls}
|
alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls}
|
||||||
shouldAlert = true
|
shouldAlert = false
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -35,7 +35,8 @@ type PrepareTaskOptions struct {
|
|||||||
ManagerOpts *ManagerOptions
|
ManagerOpts *ManagerOptions
|
||||||
NotifyFunc NotifyFunc
|
NotifyFunc NotifyFunc
|
||||||
|
|
||||||
UseLogsNewSchema bool
|
UseLogsNewSchema bool
|
||||||
|
UseTraceNewSchema bool
|
||||||
}
|
}
|
||||||
|
|
||||||
type PrepareTestRuleOptions struct {
|
type PrepareTestRuleOptions struct {
|
||||||
@@ -48,7 +49,8 @@ type PrepareTestRuleOptions struct {
|
|||||||
ManagerOpts *ManagerOptions
|
ManagerOpts *ManagerOptions
|
||||||
NotifyFunc NotifyFunc
|
NotifyFunc NotifyFunc
|
||||||
|
|
||||||
UseLogsNewSchema bool
|
UseLogsNewSchema bool
|
||||||
|
UseTraceNewSchema bool
|
||||||
}
|
}
|
||||||
|
|
||||||
const taskNamesuffix = "webAppEditor"
|
const taskNamesuffix = "webAppEditor"
|
||||||
@@ -91,9 +93,9 @@ type ManagerOptions struct {
|
|||||||
|
|
||||||
PrepareTaskFunc func(opts PrepareTaskOptions) (Task, error)
|
PrepareTaskFunc func(opts PrepareTaskOptions) (Task, error)
|
||||||
|
|
||||||
|
UseLogsNewSchema bool
|
||||||
|
UseTraceNewSchema bool
|
||||||
PrepareTestRuleFunc func(opts PrepareTestRuleOptions) (int, *model.ApiError)
|
PrepareTestRuleFunc func(opts PrepareTestRuleOptions) (int, *model.ApiError)
|
||||||
|
|
||||||
UseLogsNewSchema bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// The Manager manages recording and alerting rules.
|
// The Manager manages recording and alerting rules.
|
||||||
@@ -117,7 +119,8 @@ type Manager struct {
|
|||||||
prepareTaskFunc func(opts PrepareTaskOptions) (Task, error)
|
prepareTaskFunc func(opts PrepareTaskOptions) (Task, error)
|
||||||
prepareTestRuleFunc func(opts PrepareTestRuleOptions) (int, *model.ApiError)
|
prepareTestRuleFunc func(opts PrepareTestRuleOptions) (int, *model.ApiError)
|
||||||
|
|
||||||
UseLogsNewSchema bool
|
UseLogsNewSchema bool
|
||||||
|
UseTraceNewSchema bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func defaultOptions(o *ManagerOptions) *ManagerOptions {
|
func defaultOptions(o *ManagerOptions) *ManagerOptions {
|
||||||
@@ -156,6 +159,7 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
|
|||||||
opts.FF,
|
opts.FF,
|
||||||
opts.Reader,
|
opts.Reader,
|
||||||
opts.UseLogsNewSchema,
|
opts.UseLogsNewSchema,
|
||||||
|
opts.UseTraceNewSchema,
|
||||||
WithEvalDelay(opts.ManagerOpts.EvalDelay),
|
WithEvalDelay(opts.ManagerOpts.EvalDelay),
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -368,7 +372,8 @@ func (m *Manager) editTask(rule *PostableRule, taskName string) error {
|
|||||||
ManagerOpts: m.opts,
|
ManagerOpts: m.opts,
|
||||||
NotifyFunc: m.prepareNotifyFunc(),
|
NotifyFunc: m.prepareNotifyFunc(),
|
||||||
|
|
||||||
UseLogsNewSchema: m.opts.UseLogsNewSchema,
|
UseLogsNewSchema: m.opts.UseLogsNewSchema,
|
||||||
|
UseTraceNewSchema: m.opts.UseTraceNewSchema,
|
||||||
})
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -490,7 +495,8 @@ func (m *Manager) addTask(rule *PostableRule, taskName string) error {
|
|||||||
ManagerOpts: m.opts,
|
ManagerOpts: m.opts,
|
||||||
NotifyFunc: m.prepareNotifyFunc(),
|
NotifyFunc: m.prepareNotifyFunc(),
|
||||||
|
|
||||||
UseLogsNewSchema: m.opts.UseLogsNewSchema,
|
UseLogsNewSchema: m.opts.UseLogsNewSchema,
|
||||||
|
UseTraceNewSchema: m.opts.UseTraceNewSchema,
|
||||||
})
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -809,15 +815,16 @@ func (m *Manager) TestNotification(ctx context.Context, ruleStr string) (int, *m
|
|||||||
}
|
}
|
||||||
|
|
||||||
alertCount, apiErr := m.prepareTestRuleFunc(PrepareTestRuleOptions{
|
alertCount, apiErr := m.prepareTestRuleFunc(PrepareTestRuleOptions{
|
||||||
Rule: parsedRule,
|
Rule: parsedRule,
|
||||||
RuleDB: m.ruleDB,
|
RuleDB: m.ruleDB,
|
||||||
Logger: m.logger,
|
Logger: m.logger,
|
||||||
Reader: m.reader,
|
Reader: m.reader,
|
||||||
Cache: m.cache,
|
Cache: m.cache,
|
||||||
FF: m.featureFlags,
|
FF: m.featureFlags,
|
||||||
ManagerOpts: m.opts,
|
ManagerOpts: m.opts,
|
||||||
NotifyFunc: m.prepareNotifyFunc(),
|
NotifyFunc: m.prepareNotifyFunc(),
|
||||||
UseLogsNewSchema: m.opts.UseLogsNewSchema,
|
UseLogsNewSchema: m.opts.UseLogsNewSchema,
|
||||||
|
UseTraceNewSchema: m.opts.UseTraceNewSchema,
|
||||||
})
|
})
|
||||||
|
|
||||||
return alertCount, apiErr
|
return alertCount, apiErr
|
||||||
|
|||||||
@@ -49,6 +49,7 @@ func defaultTestNotification(opts PrepareTestRuleOptions) (int, *model.ApiError)
|
|||||||
opts.FF,
|
opts.FF,
|
||||||
opts.Reader,
|
opts.Reader,
|
||||||
opts.UseLogsNewSchema,
|
opts.UseLogsNewSchema,
|
||||||
|
opts.UseTraceNewSchema,
|
||||||
WithSendAlways(),
|
WithSendAlways(),
|
||||||
WithSendUnmatched(),
|
WithSendUnmatched(),
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -58,6 +58,7 @@ func NewThresholdRule(
|
|||||||
featureFlags interfaces.FeatureLookup,
|
featureFlags interfaces.FeatureLookup,
|
||||||
reader interfaces.Reader,
|
reader interfaces.Reader,
|
||||||
useLogsNewSchema bool,
|
useLogsNewSchema bool,
|
||||||
|
useTraceNewSchema bool,
|
||||||
opts ...RuleOption,
|
opts ...RuleOption,
|
||||||
) (*ThresholdRule, error) {
|
) (*ThresholdRule, error) {
|
||||||
|
|
||||||
@@ -74,19 +75,21 @@ func NewThresholdRule(
|
|||||||
}
|
}
|
||||||
|
|
||||||
querierOption := querier.QuerierOptions{
|
querierOption := querier.QuerierOptions{
|
||||||
Reader: reader,
|
Reader: reader,
|
||||||
Cache: nil,
|
Cache: nil,
|
||||||
KeyGenerator: queryBuilder.NewKeyGenerator(),
|
KeyGenerator: queryBuilder.NewKeyGenerator(),
|
||||||
FeatureLookup: featureFlags,
|
FeatureLookup: featureFlags,
|
||||||
UseLogsNewSchema: useLogsNewSchema,
|
UseLogsNewSchema: useLogsNewSchema,
|
||||||
|
UseTraceNewSchema: useTraceNewSchema,
|
||||||
}
|
}
|
||||||
|
|
||||||
querierOptsV2 := querierV2.QuerierOptions{
|
querierOptsV2 := querierV2.QuerierOptions{
|
||||||
Reader: reader,
|
Reader: reader,
|
||||||
Cache: nil,
|
Cache: nil,
|
||||||
KeyGenerator: queryBuilder.NewKeyGenerator(),
|
KeyGenerator: queryBuilder.NewKeyGenerator(),
|
||||||
FeatureLookup: featureFlags,
|
FeatureLookup: featureFlags,
|
||||||
UseLogsNewSchema: useLogsNewSchema,
|
UseLogsNewSchema: useLogsNewSchema,
|
||||||
|
UseTraceNewSchema: useTraceNewSchema,
|
||||||
}
|
}
|
||||||
|
|
||||||
t.querier = querier.NewQuerier(querierOption)
|
t.querier = querier.NewQuerier(querierOption)
|
||||||
|
|||||||
@@ -791,7 +791,7 @@ func TestThresholdRuleShouldAlert(t *testing.T) {
|
|||||||
postableRule.RuleCondition.MatchType = MatchType(c.matchType)
|
postableRule.RuleCondition.MatchType = MatchType(c.matchType)
|
||||||
postableRule.RuleCondition.Target = &c.target
|
postableRule.RuleCondition.Target = &c.target
|
||||||
|
|
||||||
rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, WithEvalDelay(2*time.Minute))
|
rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, true, WithEvalDelay(2*time.Minute))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
}
|
}
|
||||||
@@ -880,7 +880,7 @@ func TestPrepareLinksToLogs(t *testing.T) {
|
|||||||
}
|
}
|
||||||
fm := featureManager.StartManager()
|
fm := featureManager.StartManager()
|
||||||
|
|
||||||
rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, WithEvalDelay(2*time.Minute))
|
rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, true, WithEvalDelay(2*time.Minute))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
}
|
}
|
||||||
@@ -922,7 +922,7 @@ func TestPrepareLinksToTraces(t *testing.T) {
|
|||||||
}
|
}
|
||||||
fm := featureManager.StartManager()
|
fm := featureManager.StartManager()
|
||||||
|
|
||||||
rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, WithEvalDelay(2*time.Minute))
|
rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, true, WithEvalDelay(2*time.Minute))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
}
|
}
|
||||||
@@ -998,7 +998,7 @@ func TestThresholdRuleLabelNormalization(t *testing.T) {
|
|||||||
postableRule.RuleCondition.MatchType = MatchType(c.matchType)
|
postableRule.RuleCondition.MatchType = MatchType(c.matchType)
|
||||||
postableRule.RuleCondition.Target = &c.target
|
postableRule.RuleCondition.Target = &c.target
|
||||||
|
|
||||||
rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, WithEvalDelay(2*time.Minute))
|
rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, true, WithEvalDelay(2*time.Minute))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
}
|
}
|
||||||
@@ -1051,7 +1051,7 @@ func TestThresholdRuleEvalDelay(t *testing.T) {
|
|||||||
|
|
||||||
fm := featureManager.StartManager()
|
fm := featureManager.StartManager()
|
||||||
for idx, c := range cases {
|
for idx, c := range cases {
|
||||||
rule, err := NewThresholdRule("69", &postableRule, fm, nil, true) // no eval delay
|
rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, true) // no eval delay
|
||||||
if err != nil {
|
if err != nil {
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
}
|
}
|
||||||
@@ -1100,7 +1100,7 @@ func TestThresholdRuleClickHouseTmpl(t *testing.T) {
|
|||||||
|
|
||||||
fm := featureManager.StartManager()
|
fm := featureManager.StartManager()
|
||||||
for idx, c := range cases {
|
for idx, c := range cases {
|
||||||
rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, WithEvalDelay(2*time.Minute))
|
rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, true, WithEvalDelay(2*time.Minute))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
}
|
}
|
||||||
@@ -1241,9 +1241,9 @@ func TestThresholdRuleUnitCombinations(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace")
|
options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace")
|
||||||
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true)
|
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true)
|
||||||
|
|
||||||
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true)
|
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
|
||||||
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
|
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
|
||||||
"signoz_calls_total": {
|
"signoz_calls_total": {
|
||||||
v3.Delta: true,
|
v3.Delta: true,
|
||||||
@@ -1340,9 +1340,9 @@ func TestThresholdRuleNoData(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace")
|
options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace")
|
||||||
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true)
|
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true)
|
||||||
|
|
||||||
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true)
|
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
|
||||||
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
|
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
|
||||||
"signoz_calls_total": {
|
"signoz_calls_total": {
|
||||||
v3.Delta: true,
|
v3.Delta: true,
|
||||||
@@ -1445,9 +1445,9 @@ func TestThresholdRuleTracesLink(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace")
|
options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace")
|
||||||
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true)
|
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true)
|
||||||
|
|
||||||
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true)
|
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
|
||||||
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
|
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
|
||||||
"signoz_calls_total": {
|
"signoz_calls_total": {
|
||||||
v3.Delta: true,
|
v3.Delta: true,
|
||||||
@@ -1570,9 +1570,9 @@ func TestThresholdRuleLogsLink(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace")
|
options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace")
|
||||||
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true)
|
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true)
|
||||||
|
|
||||||
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true)
|
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
|
||||||
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
|
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
|
||||||
"signoz_calls_total": {
|
"signoz_calls_total": {
|
||||||
v3.Delta: true,
|
v3.Delta: true,
|
||||||
@@ -1648,7 +1648,7 @@ func TestThresholdRuleShiftBy(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
rule, err := NewThresholdRule("69", &postableRule, nil, nil, true)
|
rule, err := NewThresholdRule("69", &postableRule, nil, nil, true, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -46,6 +46,7 @@ func NewMockClickhouseReader(
|
|||||||
featureFlags,
|
featureFlags,
|
||||||
"",
|
"",
|
||||||
true,
|
true,
|
||||||
|
true,
|
||||||
)
|
)
|
||||||
|
|
||||||
return reader, mockDB
|
return reader, mockDB
|
||||||
|
|||||||
@@ -9,7 +9,7 @@ type LogsListTsRange struct {
|
|||||||
End int64
|
End int64
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetLogsListTsRanges(start, end int64) []LogsListTsRange {
|
func GetListTsRanges(start, end int64) []LogsListTsRange {
|
||||||
startNano := GetEpochNanoSecs(start)
|
startNano := GetEpochNanoSecs(start)
|
||||||
endNano := GetEpochNanoSecs(end)
|
endNano := GetEpochNanoSecs(end)
|
||||||
result := []LogsListTsRange{}
|
result := []LogsListTsRange{}
|
||||||
@@ -35,13 +35,15 @@ func GetLogsListTsRanges(start, end int64) []LogsListTsRange {
|
|||||||
tStartNano = startNano
|
tStartNano = startNano
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
result = append(result, LogsListTsRange{Start: startNano, End: endNano})
|
||||||
}
|
}
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
// This tries to see all possible fields that it can fall back to if some meta is missing
|
// This tries to see all possible fields that it can fall back to if some meta is missing
|
||||||
// check Test_GenerateLogEnrichmentKeys for example
|
// check Test_GenerateEnrichmentKeys for example
|
||||||
func GenerateLogEnrichmentKeys(field v3.AttributeKey) []string {
|
func GenerateEnrichmentKeys(field v3.AttributeKey) []string {
|
||||||
names := []string{}
|
names := []string{}
|
||||||
if field.Type != v3.AttributeKeyTypeUnspecified && field.DataType != v3.AttributeKeyDataTypeUnspecified {
|
if field.Type != v3.AttributeKeyTypeUnspecified && field.DataType != v3.AttributeKeyDataTypeUnspecified {
|
||||||
names = append(names, field.Key+"##"+field.Type.String()+"##"+field.DataType.String())
|
names = append(names, field.Key+"##"+field.Type.String()+"##"+field.DataType.String())
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ import (
|
|||||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestLogsListTsRange(t *testing.T) {
|
func TestListTsRange(t *testing.T) {
|
||||||
startEndData := []struct {
|
startEndData := []struct {
|
||||||
name string
|
name string
|
||||||
start int64
|
start int64
|
||||||
@@ -18,7 +18,9 @@ func TestLogsListTsRange(t *testing.T) {
|
|||||||
name: "testing for less then one hour",
|
name: "testing for less then one hour",
|
||||||
start: 1722262800000000000, // July 29, 2024 7:50:00 PM
|
start: 1722262800000000000, // July 29, 2024 7:50:00 PM
|
||||||
end: 1722263800000000000, // July 29, 2024 8:06:40 PM
|
end: 1722263800000000000, // July 29, 2024 8:06:40 PM
|
||||||
res: []LogsListTsRange{},
|
res: []LogsListTsRange{
|
||||||
|
{1722262800000000000, 1722263800000000000},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "testing for more than one hour",
|
name: "testing for more than one hour",
|
||||||
@@ -44,7 +46,7 @@ func TestLogsListTsRange(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, test := range startEndData {
|
for _, test := range startEndData {
|
||||||
res := GetLogsListTsRanges(test.start, test.end)
|
res := GetListTsRanges(test.start, test.end)
|
||||||
for i, v := range res {
|
for i, v := range res {
|
||||||
if test.res[i].Start != v.Start || test.res[i].End != v.End {
|
if test.res[i].Start != v.Start || test.res[i].End != v.End {
|
||||||
t.Errorf("expected range was %v - %v, got %v - %v", v.Start, v.End, test.res[i].Start, test.res[i].End)
|
t.Errorf("expected range was %v - %v, got %v - %v", v.Start, v.End, test.res[i].Start, test.res[i].End)
|
||||||
@@ -53,7 +55,7 @@ func TestLogsListTsRange(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func Test_GenerateLogEnrichmentKeys(t *testing.T) {
|
func Test_GenerateEnrichmentKeys(t *testing.T) {
|
||||||
type args struct {
|
type args struct {
|
||||||
field v3.AttributeKey
|
field v3.AttributeKey
|
||||||
}
|
}
|
||||||
@@ -96,8 +98,8 @@ func Test_GenerateLogEnrichmentKeys(t *testing.T) {
|
|||||||
}
|
}
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
if got := GenerateLogEnrichmentKeys(tt.args.field); !reflect.DeepEqual(got, tt.want) {
|
if got := GenerateEnrichmentKeys(tt.args.field); !reflect.DeepEqual(got, tt.want) {
|
||||||
t.Errorf("generateLogEnrichmentKeys() = %v, want %v", got, tt.want)
|
t.Errorf("generateEnrichmentKeys() = %v, want %v", got, tt.want)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user