Compare commits

..

5 Commits

Author SHA1 Message Date
aniket
85d4b88391 chore(2493): removed old metrics exporter 2025-06-23 13:27:40 +05:30
aniketio-ctrl
c488a24d09 fix(prom-aggr): fix prom aggregation queries using utf-8 charset (#8262)
* fix(prom-aggr): added fix for prom aggregation

* fix(prom-aggr): added fix for prom aggregation
2025-06-16 19:42:17 +05:30
Vikrant Gupta
9091cf61fd chore(github): fix codeowners file (#8261) 2025-06-16 11:37:07 +00:00
Ekansh Gupta
eeb2ab3212 feat: added support for trace_operators in query range v5 (#8165)
* feat: added support for trace_operators in query range v5

* feat: added support for trace_operators in query range v5

* feat: added support for trace_operators in query range v5

* feat: added support for trace_operators in query range v5\n

* feat: added support for trace_operators in query range v5\n

* feat: added support for trace_operators in query range v5

* feat: added support for trace_operators in query range v5

* feat: added support for trace_operators in query range v5

* feat: added support for trace_operators in query range v5

---------

Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com>
2025-06-16 16:43:51 +05:30
Nageshbansal
3f128f0f1d fix: configs in multi-node docker-swarm cluster (#8239) 2025-06-16 11:42:02 +05:30
17 changed files with 1413 additions and 1802 deletions

7
.github/CODEOWNERS vendored
View File

@@ -12,4 +12,9 @@
/pkg/factory/ @grandwizard28
/pkg/types/ @grandwizard28
.golangci.yml @grandwizard28
**/(zeus|licensing|sqlmigration)/ @vikrantgupta25
/pkg/zeus/ @vikrantgupta25
/pkg/licensing/ @vikrantgupta25
/pkg/sqlmigration/ @vikrantgupta25
/ee/zeus/ @vikrantgupta25
/ee/licensing/ @vikrantgupta25
/ee/sqlmigration/ @vikrantgupta25

View File

@@ -100,12 +100,18 @@ services:
# - "9000:9000"
# - "8123:8123"
# - "9181:9181"
configs:
- source: clickhouse-config
target: /etc/clickhouse-server/config.xml
- source: clickhouse-users
target: /etc/clickhouse-server/users.xml
- source: clickhouse-custom-function
target: /etc/clickhouse-server/custom-function.xml
- source: clickhouse-cluster
target: /etc/clickhouse-server/config.d/cluster.xml
volumes:
- ../common/clickhouse/config.xml:/etc/clickhouse-server/config.xml
- ../common/clickhouse/users.xml:/etc/clickhouse-server/users.xml
- ../common/clickhouse/custom-function.xml:/etc/clickhouse-server/custom-function.xml
- ../common/clickhouse/user_scripts:/var/lib/clickhouse/user_scripts/
- ../common/clickhouse/cluster.xml:/etc/clickhouse-server/config.d/cluster.xml
- clickhouse:/var/lib/clickhouse/
# - ../common/clickhouse/storage.xml:/etc/clickhouse-server/config.d/storage.xml
signoz:
@@ -117,9 +123,10 @@ services:
- "8080:8080" # signoz port
# - "6060:6060" # pprof port
volumes:
- ../common/signoz/prometheus.yml:/root/config/prometheus.yml
- ../common/dashboards:/root/config/dashboards
- sqlite:/var/lib/signoz/
configs:
- source: signoz-prometheus-config
target: /root/config/prometheus.yml
environment:
- SIGNOZ_ALERTMANAGER_PROVIDER=signoz
- SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_DSN=tcp://clickhouse:9000
@@ -147,9 +154,11 @@ services:
- --manager-config=/etc/manager-config.yaml
- --copy-path=/var/tmp/collector-config.yaml
- --feature-gates=-pkg.translator.prometheus.NormalizeName
volumes:
- ./otel-collector-config.yaml:/etc/otel-collector-config.yaml
- ../common/signoz/otel-collector-opamp-config.yaml:/etc/manager-config.yaml
configs:
- source: otel-collector-config
target: /etc/otel-collector-config.yaml
- source: otel-manager-config
target: /etc/manager-config.yaml
environment:
- OTEL_RESOURCE_ATTRIBUTES=host.name={{.Node.Hostname}},os.type={{.Node.Platform.OS}}
- LOW_CARDINAL_EXCEPTION_GROUPING=false
@@ -186,3 +195,26 @@ volumes:
name: signoz-sqlite
zookeeper-1:
name: signoz-zookeeper-1
configs:
clickhouse-config:
file: ../common/clickhouse/config.xml
clickhouse-users:
file: ../common/clickhouse/users.xml
clickhouse-custom-function:
file: ../common/clickhouse/custom-function.xml
clickhouse-cluster:
file: ../common/clickhouse/cluster.xml
signoz-prometheus-config:
file: ../common/signoz/prometheus.yml
# If you have multiple dashboard files, you can list them individually:
# dashboard-foo:
# file: ../common/dashboards/foo.json
# dashboard-bar:
# file: ../common/dashboards/bar.json
otel-collector-config:
file: ./otel-collector-config.yaml
otel-manager-config:
file: ../common/signoz/otel-collector-opamp-config.yaml

View File

@@ -26,7 +26,7 @@ processors:
detectors: [env, system]
timeout: 2s
signozspanmetrics/delta:
metrics_exporter: clickhousemetricswrite, signozclickhousemetrics
metrics_exporter: signozclickhousemetrics
metrics_flush_interval: 60s
latency_histogram_buckets: [100us, 1ms, 2ms, 6ms, 10ms, 50ms, 100ms, 250ms, 500ms, 1000ms, 1400ms, 2000ms, 5s, 10s, 20s, 40s, 60s ]
dimensions_cache_size: 100000
@@ -92,11 +92,11 @@ service:
metrics:
receivers: [otlp]
processors: [batch]
exporters: [clickhousemetricswrite, signozclickhousemetrics]
exporters: [signozclickhousemetrics]
metrics/prometheus:
receivers: [prometheus]
processors: [batch]
exporters: [clickhousemetricswrite/prometheus, signozclickhousemetrics]
exporters: [signozclickhousemetrics]
logs:
receivers: [otlp]
processors: [batch]

View File

@@ -26,7 +26,7 @@ processors:
detectors: [env, system]
timeout: 2s
signozspanmetrics/delta:
metrics_exporter: clickhousemetricswrite, signozclickhousemetrics
metrics_exporter: signozclickhousemetrics
metrics_flush_interval: 60s
latency_histogram_buckets: [100us, 1ms, 2ms, 6ms, 10ms, 50ms, 100ms, 250ms, 500ms, 1000ms, 1400ms, 2000ms, 5s, 10s, 20s, 40s, 60s ]
dimensions_cache_size: 100000
@@ -92,11 +92,11 @@ service:
metrics:
receivers: [otlp]
processors: [batch]
exporters: [clickhousemetricswrite, signozclickhousemetrics]
exporters: [signozclickhousemetrics]
metrics/prometheus:
receivers: [prometheus]
processors: [batch]
exporters: [clickhousemetricswrite/prometheus, signozclickhousemetrics]
exporters: [signozclickhousemetrics]
logs:
receivers: [otlp]
processors: [batch]

View File

@@ -149,28 +149,30 @@ function SpanOverview({
<Typography.Text className="service-name">
{span.serviceName}
</Typography.Text>
{!!span.serviceName && !!span.name && (
<div className="add-funnel-button">
<span className="add-funnel-button__separator">·</span>
<Button
type="text"
size="small"
className="add-funnel-button__button"
onClick={(e): void => {
e.preventDefault();
e.stopPropagation();
handleAddSpanToFunnel(span);
}}
icon={
<img
className="add-funnel-button__icon"
src="/Icons/funnel-add.svg"
alt="funnel-icon"
/>
}
/>
</div>
)}
{!!span.serviceName &&
!!span.name &&
process.env.NODE_ENV === 'development' && (
<div className="add-funnel-button">
<span className="add-funnel-button__separator">·</span>
<Button
type="text"
size="small"
className="add-funnel-button__button"
onClick={(e): void => {
e.preventDefault();
e.stopPropagation();
handleAddSpanToFunnel(span);
}}
icon={
<img
className="add-funnel-button__icon"
src="/Icons/funnel-add.svg"
alt="funnel-icon"
/>
}
/>
</div>
)}
</section>
</div>
</div>
@@ -473,7 +475,7 @@ function Success(props: ISuccessProps): JSX.Element {
virtualiserRef={virtualizerRef}
setColumnWidths={setTraceFlamegraphStatsWidth}
/>
{selectedSpanToAddToFunnel && (
{selectedSpanToAddToFunnel && process.env.NODE_ENV === 'development' && (
<AddSpanToFunnelModal
span={selectedSpanToAddToFunnel}
isOpen={isAddSpanToFunnelModalOpen}

View File

@@ -67,15 +67,19 @@ export default function TraceDetailsPage(): JSX.Element {
key: 'trace-details',
children: <TraceDetailsV2 />,
},
{
label: (
<div className="tab-item">
<Cone className="funnel-icon" size={16} /> Funnels
</div>
),
key: 'funnels',
children: <div />,
},
...(process.env.NODE_ENV === 'development'
? [
{
label: (
<div className="tab-item">
<Cone className="funnel-icon" size={16} /> Funnels
</div>
),
key: 'funnels',
children: <div />,
},
]
: []),
{
label: (
<div className="tab-item">

View File

@@ -14,7 +14,8 @@ function TracesModulePage(): JSX.Element {
const routes: TabRoutes[] = [
tracesExplorer,
tracesFunnel(pathname),
// TODO(shaheer): remove this check after everything is ready
process.env.NODE_ENV === 'development' ? tracesFunnel(pathname) : null,
tracesSaveView,
].filter(Boolean) as TabRoutes[];

View File

@@ -1,790 +0,0 @@
package tracefunnel
import (
"fmt"
)
func BuildTwoStepFunnelValidationQuery(
containsErrorT1 int,
containsErrorT2 int,
startTs int64,
endTs int64,
serviceNameT1 string,
spanNameT1 string,
serviceNameT2 string,
spanNameT2 string,
clauseStep1 string,
clauseStep2 string,
) string {
queryTemplate := `
WITH
%[1]d AS contains_error_t1,
%[2]d AS contains_error_t2,
toDateTime64(%[3]d/1e9, 9) AS start_ts,
toDateTime64(%[4]d/1e9, 9) AS end_ts,
('%[5]s','%[6]s') AS step1,
('%[7]s','%[8]s') AS step2
SELECT
trace_id
FROM (
SELECT
trace_id,
minIf(timestamp, serviceName = step1.1 AND name = step1.2) AS t1_time,
minIf(timestamp, serviceName = step2.1 AND name = step2.2) AS t2_time
FROM signoz_traces.signoz_index_v3
WHERE
timestamp BETWEEN start_ts AND end_ts
AND (
(serviceName = step1.1 AND name = step1.2 AND (contains_error_t1 = 0 OR has_error = true) %[9]s)
OR
(serviceName = step2.1 AND name = step2.2 AND (contains_error_t2 = 0 OR has_error = true) %[10]s)
)
GROUP BY trace_id
HAVING t1_time > 0 AND t2_time > t1_time
)
ORDER BY t1_time
LIMIT 5;`
return fmt.Sprintf(queryTemplate,
containsErrorT1,
containsErrorT2,
startTs,
endTs,
serviceNameT1,
spanNameT1,
serviceNameT2,
spanNameT2,
clauseStep1,
clauseStep2,
)
}
func BuildThreeStepFunnelValidationQuery(
containsErrorT1 int,
containsErrorT2 int,
containsErrorT3 int,
startTs int64,
endTs int64,
serviceNameT1 string,
spanNameT1 string,
serviceNameT2 string,
spanNameT2 string,
serviceNameT3 string,
spanNameT3 string,
clauseStep1 string,
clauseStep2 string,
clauseStep3 string,
) string {
queryTemplate := `
WITH
%[1]d AS contains_error_t1,
%[2]d AS contains_error_t2,
%[3]d AS contains_error_t3,
toDateTime64(%[4]d/1e9, 9) AS start_ts,
toDateTime64(%[5]d/1e9, 9) AS end_ts,
('%[6]s','%[7]s') AS step1,
('%[8]s','%[9]s') AS step2,
('%[10]s','%[11]s') AS step3
SELECT
trace_id
FROM (
SELECT
trace_id,
minIf(timestamp, serviceName = step1.1 AND name = step1.2) AS t1_time,
minIf(timestamp, serviceName = step2.1 AND name = step2.2) AS t2_time,
minIf(timestamp, serviceName = step3.1 AND name = step3.2) AS t3_time
FROM signoz_traces.signoz_index_v3
WHERE
timestamp BETWEEN start_ts AND end_ts
AND (
(serviceName = step1.1 AND name = step1.2 AND (contains_error_t1 = 0 OR has_error = true) %[12]s)
OR (serviceName = step2.1 AND name = step2.2 AND (contains_error_t2 = 0 OR has_error = true) %[13]s)
OR (serviceName = step3.1 AND name = step3.2 AND (contains_error_t3 = 0 OR has_error = true) %[14]s)
)
GROUP BY trace_id
HAVING t1_time > 0 AND t2_time > t1_time AND t3_time > t2_time
)
ORDER BY t1_time
LIMIT 5;`
return fmt.Sprintf(queryTemplate,
containsErrorT1,
containsErrorT2,
containsErrorT3,
startTs,
endTs,
serviceNameT1,
spanNameT1,
serviceNameT2,
spanNameT2,
serviceNameT3,
spanNameT3,
clauseStep1,
clauseStep2,
clauseStep3,
)
}
func BuildTwoStepFunnelOverviewQuery(
containsErrorT1 int,
containsErrorT2 int,
latencyPointerT1 string,
latencyPointerT2 string,
startTs int64,
endTs int64,
serviceNameT1 string,
spanNameT1 string,
serviceNameT2 string,
spanNameT2 string,
clauseStep1 string,
clauseStep2 string,
) string {
queryTemplate := `
WITH
%[1]d AS contains_error_t1,
%[2]d AS contains_error_t2,
'%[3]s' AS latency_pointer_t1,
'%[4]s' AS latency_pointer_t2,
toDateTime64(%[5]d/1e9, 9) AS start_ts,
toDateTime64(%[6]d/1e9, 9) AS end_ts,
(%[6]d - %[5]d)/1e9 AS time_window_sec,
('%[7]s','%[8]s') AS step1,
('%[9]s','%[10]s') AS step2
, funnel AS (
SELECT
trace_id,
minIf(timestamp, serviceName = step1.1 AND name = step1.2) AS t1_time,
minIf(timestamp, serviceName = step2.1 AND name = step2.2) AS t2_time,
toUInt8(anyIf(has_error, serviceName = step1.1 AND name = step1.2)) AS s1_error,
toUInt8(anyIf(has_error, serviceName = step2.1 AND name = step2.2)) AS s2_error
FROM signoz_traces.signoz_index_v3
WHERE
timestamp BETWEEN start_ts AND end_ts
AND (
(serviceName = step1.1 AND name = step1.2 AND (contains_error_t1 = 0 OR has_error = true) %[11]s)
OR
(serviceName = step2.1 AND name = step2.2 AND (contains_error_t2 = 0 OR has_error = true) %[12]s)
)
GROUP BY trace_id
HAVING t1_time > 0 AND t2_time > t1_time
)
, totals AS (
SELECT
count(DISTINCT trace_id) AS total_s1_spans,
count(DISTINCT CASE WHEN t2_time > t1_time THEN trace_id END) AS total_s2_spans,
count(DISTINCT CASE WHEN s1_error = 1 THEN trace_id END) AS sum_s1_error,
count(DISTINCT CASE WHEN s2_error = 1 THEN trace_id END) AS sum_s2_error,
avg((toUnixTimestamp64Nano(t2_time) - toUnixTimestamp64Nano(t1_time)) / 1e6) AS avg_duration,
quantile(0.99)((toUnixTimestamp64Nano(t2_time) - toUnixTimestamp64Nano(t1_time)) / 1e6) AS latency
FROM funnel
)
SELECT
round(if(total_s1_spans > 0, total_s2_spans * 100.0 / total_s1_spans, 0), 2) AS conversion_rate,
total_s2_spans / time_window_sec AS avg_rate,
greatest(sum_s1_error, sum_s2_error) AS errors,
avg_duration,
latency
FROM totals;
`
return fmt.Sprintf(queryTemplate,
containsErrorT1,
containsErrorT2,
latencyPointerT1,
latencyPointerT2,
startTs,
endTs,
serviceNameT1,
spanNameT1,
serviceNameT2,
spanNameT2,
clauseStep1,
clauseStep2,
)
}
func BuildThreeStepFunnelOverviewQuery(
containsErrorT1 int,
containsErrorT2 int,
containsErrorT3 int,
latencyPointerT1 string,
latencyPointerT2 string,
latencyPointerT3 string,
startTs int64,
endTs int64,
serviceNameT1 string,
spanNameT1 string,
serviceNameT2 string,
spanNameT2 string,
serviceNameT3 string,
spanNameT3 string,
clauseStep1 string,
clauseStep2 string,
clauseStep3 string,
) string {
queryTemplate := `
WITH
%[1]d AS contains_error_t1,
%[2]d AS contains_error_t2,
%[3]d AS contains_error_t3,
'%[4]s' AS latency_pointer_t1,
'%[5]s' AS latency_pointer_t2,
'%[6]s' AS latency_pointer_t3,
toDateTime64(%[7]d/1e9, 9) AS start_ts,
toDateTime64(%[8]d/1e9, 9) AS end_ts,
(%[8]d - %[7]d)/1e9 AS time_window_sec,
('%[9]s','%[10]s') AS step1,
('%[11]s','%[12]s') AS step2,
('%[13]s','%[14]s') AS step3
, funnel AS (
SELECT
trace_id,
minIf(timestamp, serviceName = step1.1 AND name = step1.2) AS t1_time,
minIf(timestamp, serviceName = step2.1 AND name = step2.2) AS t2_time,
minIf(timestamp, serviceName = step3.1 AND name = step3.2) AS t3_time,
toUInt8(anyIf(has_error, serviceName = step1.1 AND name = step1.2)) AS s1_error,
toUInt8(anyIf(has_error, serviceName = step2.1 AND name = step2.2)) AS s2_error,
toUInt8(anyIf(has_error, serviceName = step3.1 AND name = step3.2)) AS s3_error
FROM signoz_traces.signoz_index_v3
WHERE
timestamp BETWEEN start_ts AND end_ts
AND (
(serviceName = step1.1 AND name = step1.2 AND (contains_error_t1 = 0 OR has_error = true) %[15]s)
OR (serviceName = step2.1 AND name = step2.2 AND (contains_error_t2 = 0 OR has_error = true) %[16]s)
OR (serviceName = step3.1 AND name = step3.2 AND (contains_error_t3 = 0 OR has_error = true) %[17]s)
)
GROUP BY trace_id
)
, totals AS (
SELECT
count(DISTINCT trace_id) AS total_s1_spans,
count(DISTINCT CASE WHEN t2_time > t1_time THEN trace_id END) AS total_s2_spans,
count(DISTINCT CASE WHEN t3_time > t2_time THEN trace_id END) AS total_s3_spans,
count(DISTINCT CASE WHEN s1_error = 1 THEN trace_id END) AS sum_s1_error,
count(DISTINCT CASE WHEN s2_error = 1 THEN trace_id END) AS sum_s2_error,
count(DISTINCT CASE WHEN s3_error = 1 THEN trace_id END) AS sum_s3_error,
avgIf((toUnixTimestamp64Nano(t2_time) - toUnixTimestamp64Nano(t1_time))/1e6, t1_time > 0 AND t2_time > t1_time) AS avg_duration_12,
quantileIf(0.99)((toUnixTimestamp64Nano(t2_time) - toUnixTimestamp64Nano(t1_time))/1e6, t1_time > 0 AND t2_time > t1_time) AS latency_12,
avgIf((toUnixTimestamp64Nano(t3_time) - toUnixTimestamp64Nano(t2_time))/1e6, t2_time > 0 AND t3_time > t2_time) AS avg_duration_23,
quantileIf(0.99)((toUnixTimestamp64Nano(t3_time) - toUnixTimestamp64Nano(t2_time))/1e6, t2_time > 0 AND t3_time > t2_time) AS latency_23
FROM funnel
)
SELECT
round(if(total_s1_spans > 0, total_s3_spans * 100.0 / total_s1_spans, 0), 2) AS conversion_rate,
total_s3_spans / nullIf(time_window_sec, 0) AS avg_rate,
greatest(sum_s1_error, sum_s2_error, sum_s3_error) AS errors,
avg_duration_23 AS avg_duration,
latency_23 AS latency
FROM totals;
`
return fmt.Sprintf(
queryTemplate,
containsErrorT1,
containsErrorT2,
containsErrorT3,
latencyPointerT1,
latencyPointerT2,
latencyPointerT3,
startTs,
endTs,
serviceNameT1,
spanNameT1,
serviceNameT2,
spanNameT2,
serviceNameT3,
spanNameT3,
clauseStep1,
clauseStep2,
clauseStep3,
)
}
func BuildTwoStepFunnelCountQuery(
containsErrorT1 int,
containsErrorT2 int,
startTs int64,
endTs int64,
serviceNameT1 string,
spanNameT1 string,
serviceNameT2 string,
spanNameT2 string,
clauseStep1 string,
clauseStep2 string,
) string {
queryTemplate := `
WITH
%[1]d AS contains_error_t1,
%[2]d AS contains_error_t2,
toDateTime64(%[3]d/1e9,9) AS start_ts,
toDateTime64(%[4]d/1e9,9) AS end_ts,
('%[5]s','%[6]s') AS step1,
('%[7]s','%[8]s') AS step2
SELECT
count(DISTINCT trace_id) AS total_s1_spans,
count(DISTINCT CASE WHEN t1_error = 1 THEN trace_id END) AS total_s1_errored_spans,
count(DISTINCT CASE WHEN t2_time > t1_time THEN trace_id END) AS total_s2_spans,
count(DISTINCT CASE WHEN t2_time > t1_time AND t2_error = 1 THEN trace_id END) AS total_s2_errored_spans
FROM (
SELECT
trace_id,
minIf(timestamp, serviceName = step1.1 AND name = step1.2) AS t1_time,
minIf(timestamp, serviceName = step2.1 AND name = step2.2) AS t2_time,
toUInt8(anyIf(has_error, serviceName = step1.1 AND name = step1.2)) AS t1_error,
toUInt8(anyIf(has_error, serviceName = step2.1 AND name = step2.2)) AS t2_error
FROM signoz_traces.signoz_index_v3
WHERE
timestamp BETWEEN start_ts AND end_ts
AND (
(serviceName = step1.1 AND name = step1.2 AND (contains_error_t1 = 0 OR has_error = true) %[9]s)
OR
(serviceName = step2.1 AND name = step2.2 AND (contains_error_t2 = 0 OR has_error = true) %[10]s)
)
GROUP BY trace_id
HAVING t1_time > 0 AND t2_time > t1_time
) AS funnel;
`
return fmt.Sprintf(queryTemplate,
containsErrorT1,
containsErrorT2,
startTs,
endTs,
serviceNameT1,
spanNameT1,
serviceNameT2,
spanNameT2,
clauseStep1,
clauseStep2,
)
}
func BuildThreeStepFunnelCountQuery(
containsErrorT1 int,
containsErrorT2 int,
containsErrorT3 int,
startTs int64,
endTs int64,
serviceNameT1 string,
spanNameT1 string,
serviceNameT2 string,
spanNameT2 string,
serviceNameT3 string,
spanNameT3 string,
clauseStep1 string,
clauseStep2 string,
clauseStep3 string,
) string {
queryTemplate := `
WITH
%[1]d AS contains_error_t1,
%[2]d AS contains_error_t2,
%[3]d AS contains_error_t3,
toDateTime64(%[4]d/1e9,9) AS start_ts,
toDateTime64(%[5]d/1e9,9) AS end_ts,
('%[6]s','%[7]s') AS step1,
('%[8]s','%[9]s') AS step2,
('%[10]s','%[11]s') AS step3
SELECT
count(DISTINCT trace_id) AS total_s1_spans,
count(DISTINCT CASE WHEN t1_error = 1 THEN trace_id END) AS total_s1_errored_spans,
count(DISTINCT CASE WHEN t2_time > t1_time THEN trace_id END) AS total_s2_spans,
count(DISTINCT CASE WHEN t2_time > t1_time AND t2_error = 1 THEN trace_id END) AS total_s2_errored_spans,
count(DISTINCT CASE WHEN t2_time > t1_time AND t3_time > t2_time THEN trace_id END) AS total_s3_spans,
count(DISTINCT CASE WHEN t2_time > t1_time AND t3_time > t2_time AND t3_error = 1 THEN trace_id END) AS total_s3_errored_spans
FROM (
SELECT
trace_id,
minIf(timestamp, serviceName = step1.1 AND name = step1.2) AS t1_time,
minIf(timestamp, serviceName = step2.1 AND name = step2.2) AS t2_time,
minIf(timestamp, serviceName = step3.1 AND name = step3.2) AS t3_time,
toUInt8(anyIf(has_error, serviceName = step1.1 AND name = step1.2)) AS t1_error,
toUInt8(anyIf(has_error, serviceName = step2.1 AND name = step2.2)) AS t2_error,
toUInt8(anyIf(has_error, serviceName = step3.1 AND name = step3.2)) AS t3_error
FROM signoz_traces.signoz_index_v3
WHERE
timestamp BETWEEN start_ts AND end_ts
AND (
(serviceName = step1.1 AND name = step1.2 AND (contains_error_t1 = 0 OR has_error = true) %[12]s)
OR (serviceName = step2.1 AND name = step2.2 AND (contains_error_t2 = 0 OR has_error = true) %[13]s)
OR (serviceName = step3.1 AND name = step3.2 AND (contains_error_t3 = 0 OR has_error = true) %[14]s)
)
GROUP BY trace_id
HAVING t1_time > 0 AND t2_time > t1_time AND t3_time > t2_time
) AS funnel;
`
return fmt.Sprintf(queryTemplate,
containsErrorT1,
containsErrorT2,
containsErrorT3,
startTs,
endTs,
serviceNameT1,
spanNameT1,
serviceNameT2,
spanNameT2,
serviceNameT3,
spanNameT3,
clauseStep1,
clauseStep2,
clauseStep3,
)
}
func BuildTwoStepFunnelTopSlowTracesQuery(
containsErrorT1 int,
containsErrorT2 int,
startTs int64,
endTs int64,
serviceNameT1 string,
spanNameT1 string,
serviceNameT2 string,
spanNameT2 string,
clauseStep1 string,
clauseStep2 string,
) string {
queryTemplate := `
WITH
%[1]d AS contains_error_t1,
%[2]d AS contains_error_t2,
toDateTime64(%[3]d/1e9, 9) AS start_ts,
toDateTime64(%[4]d/1e9, 9) AS end_ts,
('%[5]s','%[6]s') AS step1,
('%[7]s','%[8]s') AS step2
SELECT
trace_id,
(toUnixTimestamp64Nano(t2_time) - toUnixTimestamp64Nano(t1_time)) / 1e6 AS duration_ms,
span_count
FROM (
SELECT
trace_id,
minIf(timestamp, serviceName = step1.1 AND name = step1.2) AS t1_time,
minIf(timestamp, serviceName = step2.1 AND name = step2.2) AS t2_time,
count() AS span_count
FROM signoz_traces.signoz_index_v3
WHERE
timestamp BETWEEN start_ts AND end_ts
AND (
(serviceName = step1.1 AND name = step1.2 AND (contains_error_t1 = 0 OR has_error = true) %[9]s)
OR
(serviceName = step2.1 AND name = step2.2 AND (contains_error_t2 = 0 OR has_error = true) %[10]s)
)
GROUP BY trace_id
HAVING t1_time > 0 AND t2_time > t1_time
) AS funnel
ORDER BY duration_ms DESC
LIMIT 5;
`
return fmt.Sprintf(queryTemplate,
containsErrorT1,
containsErrorT2,
startTs,
endTs,
serviceNameT1,
spanNameT1,
serviceNameT2,
spanNameT2,
clauseStep1,
clauseStep2,
)
}
func BuildTwoStepFunnelTopSlowErrorTracesQuery(
containsErrorT1 int,
containsErrorT2 int,
startTs int64,
endTs int64,
serviceNameT1 string,
spanNameT1 string,
serviceNameT2 string,
spanNameT2 string,
clauseStep1 string,
clauseStep2 string,
) string {
queryTemplate := `
WITH
%[1]d AS contains_error_t1,
%[2]d AS contains_error_t2,
toDateTime64(%[3]d/1e9, 9) AS start_ts,
toDateTime64(%[4]d/1e9, 9) AS end_ts,
('%[5]s','%[6]s') AS step1,
('%[7]s','%[8]s') AS step2
SELECT
trace_id,
(toUnixTimestamp64Nano(t2_time) - toUnixTimestamp64Nano(t1_time)) / 1e6 AS duration_ms,
span_count
FROM (
SELECT
trace_id,
minIf(timestamp, serviceName = step1.1 AND name = step1.2) AS t1_time,
minIf(timestamp, serviceName = step2.1 AND name = step2.2) AS t2_time,
toUInt8(anyIf(has_error, serviceName = step1.1 AND name = step1.2)) AS t1_error,
toUInt8(anyIf(has_error, serviceName = step2.1 AND name = step2.2)) AS t2_error,
count() AS span_count
FROM signoz_traces.signoz_index_v3
WHERE
timestamp BETWEEN start_ts AND end_ts
AND (
(serviceName = step1.1 AND name = step1.2 AND (contains_error_t1 = 0 OR has_error = true) %[9]s)
OR
(serviceName = step2.1 AND name = step2.2 AND (contains_error_t2 = 0 OR has_error = true) %[10]s)
)
GROUP BY trace_id
HAVING t1_time > 0 AND t2_time > t1_time
) AS funnel
WHERE
(t1_error = 1 OR t2_error = 1)
ORDER BY duration_ms DESC
LIMIT 5;
`
return fmt.Sprintf(queryTemplate,
containsErrorT1,
containsErrorT2,
startTs,
endTs,
serviceNameT1,
spanNameT1,
serviceNameT2,
spanNameT2,
clauseStep1,
clauseStep2,
)
}
func BuildTwoStepFunnelStepOverviewQuery(
containsErrorT1 int,
containsErrorT2 int,
latencyPointerT1 string,
latencyPointerT2 string,
startTs int64,
endTs int64,
serviceNameT1 string,
spanNameT1 string,
serviceNameT2 string,
spanNameT2 string,
clauseStep1 string,
clauseStep2 string,
latencyTypeT2 string,
) string {
const tpl = `
WITH
toDateTime64(%[5]d / 1e9, 9) AS start_ts,
toDateTime64(%[6]d / 1e9, 9) AS end_ts,
(%[6]d - %[5]d) / 1e9 AS time_window_sec,
('%[7]s', '%[8]s') AS step1,
('%[9]s', '%[10]s') AS step2,
%[1]d AS contains_error_t1,
%[2]d AS contains_error_t2
SELECT
round(total_s2_spans * 100.0 / total_s1_spans, 2) AS conversion_rate,
total_s2_spans / time_window_sec AS avg_rate,
greatest(sum_s1_error, sum_s2_error) AS errors,
avg_duration,
latency
FROM (
SELECT
count(DISTINCT trace_id) AS total_s1_spans,
count(DISTINCT CASE WHEN t2_time > t1_time THEN trace_id END) AS total_s2_spans,
count(DISTINCT CASE WHEN s1_error = 1 THEN trace_id END) AS sum_s1_error,
count(DISTINCT CASE WHEN s2_error = 1 THEN trace_id END) AS sum_s2_error,
avgIf(
(toUnixTimestamp64Nano(t2_time) - toUnixTimestamp64Nano(t1_time)) / 1e6,
t1_time > 0 AND t2_time > t1_time
) AS avg_duration,
quantileIf(%[13]s)(
(toUnixTimestamp64Nano(t2_time) - toUnixTimestamp64Nano(t1_time)) / 1e6,
t1_time > 0 AND t2_time > t1_time
) AS latency
FROM (
SELECT
trace_id,
minIf(timestamp, serviceName = step1.1 AND name = step1.2) AS t1_time,
minIf(timestamp, serviceName = step2.1 AND name = step2.2) AS t2_time,
toUInt8(anyIf(has_error, serviceName = step1.1 AND name = step1.2)) AS s1_error,
toUInt8(anyIf(has_error, serviceName = step2.1 AND name = step2.2)) AS s2_error
FROM signoz_traces.signoz_index_v3
WHERE
timestamp BETWEEN start_ts AND end_ts
AND (
(serviceName = step1.1 AND name = step1.2 AND (contains_error_t1 = 0 OR has_error = true) %[11]s)
OR
(serviceName = step2.1 AND name = step2.2 AND (contains_error_t2 = 0 OR has_error = true) %[12]s)
)
GROUP BY trace_id
) AS funnel
) AS totals;
`
return fmt.Sprintf(tpl,
containsErrorT1,
containsErrorT2,
latencyPointerT1,
latencyPointerT2,
startTs,
endTs,
serviceNameT1,
spanNameT1,
serviceNameT2,
spanNameT2,
clauseStep1,
clauseStep2,
latencyTypeT2,
)
}
func BuildThreeStepFunnelStepOverviewQuery(
containsErrorT1 int,
containsErrorT2 int,
containsErrorT3 int,
latencyPointerT1 string,
latencyPointerT2 string,
latencyPointerT3 string,
startTs int64,
endTs int64,
serviceNameT1 string,
spanNameT1 string,
serviceNameT2 string,
spanNameT2 string,
serviceNameT3 string,
spanNameT3 string,
clauseStep1 string,
clauseStep2 string,
clauseStep3 string,
stepStart int64,
stepEnd int64,
latencyTypeT2 string,
latencyTypeT3 string,
) string {
const baseWithAndFunnel = `
WITH
toDateTime64(%[7]d/1e9, 9) AS start_ts,
toDateTime64(%[8]d/1e9, 9) AS end_ts,
(%[8]d - %[7]d) / 1e9 AS time_window_sec,
('%[9]s','%[10]s') AS step1,
('%[11]s','%[12]s') AS step2,
('%[13]s','%[14]s') AS step3,
%[1]d AS contains_error_t1,
%[2]d AS contains_error_t2,
%[3]d AS contains_error_t3,
funnel AS (
SELECT
trace_id,
minIf(timestamp, serviceName = step1.1 AND name = step1.2) AS t1_time,
minIf(timestamp, serviceName = step2.1 AND name = step2.2) AS t2_time,
minIf(timestamp, serviceName = step3.1 AND name = step3.2) AS t3_time,
toUInt8(anyIf(has_error, serviceName = step1.1 AND name = step1.2)) AS s1_error,
toUInt8(anyIf(has_error, serviceName = step2.1 AND name = step2.2)) AS s2_error,
toUInt8(anyIf(has_error, serviceName = step3.1 AND name = step3.2)) AS s3_error
FROM signoz_traces.signoz_index_v3
WHERE
timestamp BETWEEN start_ts AND end_ts
AND (
(serviceName = step1.1 AND name = step1.2 AND (contains_error_t1 = 0 OR has_error = true) %[15]s)
OR (serviceName = step2.1 AND name = step2.2 AND (contains_error_t2 = 0 OR has_error = true) %[16]s)
OR (serviceName = step3.1 AND name = step3.2 AND (contains_error_t3 = 0 OR has_error = true) %[17]s)
)
GROUP BY trace_id
)
`
const totals12 = `
SELECT
round(if(total_s1_spans > 0, total_s2_spans * 100.0 / total_s1_spans, 0), 2) AS conversion_rate,
total_s2_spans / time_window_sec AS avg_rate,
greatest(sum_s1_error, sum_s2_error) AS errors,
avg_duration_12 AS avg_duration,
latency_12 AS latency
FROM (
SELECT
count(DISTINCT CASE WHEN t2_time > t1_time THEN trace_id END) AS total_s2_spans,
count(DISTINCT trace_id) AS total_s1_spans,
count(DISTINCT CASE WHEN s1_error = 1 THEN trace_id END) AS sum_s1_error,
count(DISTINCT CASE WHEN s2_error = 1 THEN trace_id END) AS sum_s2_error,
avgIf((toUnixTimestamp64Nano(t2_time) - toUnixTimestamp64Nano(t1_time)) / 1e6, t1_time > 0 AND t2_time > t1_time) AS avg_duration_12,
quantileIf(%[18]s)((toUnixTimestamp64Nano(t2_time) - toUnixTimestamp64Nano(t1_time)) / 1e6, t1_time > 0 AND t2_time > t1_time) AS latency_12
FROM funnel
) AS totals;
`
const totals23 = `
SELECT
round(if(total_s2_spans > 0, total_s3_spans * 100.0 / total_s2_spans, 0), 2) AS conversion_rate,
total_s3_spans / time_window_sec AS avg_rate,
greatest(sum_s2_error, sum_s3_error) AS errors,
avg_duration_23 AS avg_duration,
latency_23 AS latency
FROM (
SELECT
count(DISTINCT CASE WHEN t2_time > 0 AND t3_time > t2_time THEN trace_id END) AS total_s3_spans,
count(DISTINCT CASE WHEN t2_time > 0 THEN trace_id END) AS total_s2_spans,
count(DISTINCT CASE WHEN s2_error = 1 THEN trace_id END) AS sum_s2_error,
count(DISTINCT CASE WHEN s3_error = 1 THEN trace_id END) AS sum_s3_error,
avgIf((toUnixTimestamp64Nano(t3_time) - toUnixTimestamp64Nano(t2_time)) / 1e6, t2_time > 0 AND t3_time > t2_time) AS avg_duration_23,
quantileIf(%[19]s)((toUnixTimestamp64Nano(t3_time) - toUnixTimestamp64Nano(t2_time)) / 1e6, t2_time > 0 AND t3_time > t2_time) AS latency_23
FROM funnel
) AS totals;
`
const fallback = `
SELECT 0 AS conversion_rate, 0 AS avg_rate, 0 AS errors, 0 AS avg_duration, 0 AS latency;
`
var totalsTpl string
switch {
case stepStart == 1 && stepEnd == 2:
totalsTpl = totals12
case stepStart == 2 && stepEnd == 3:
totalsTpl = totals23
default:
totalsTpl = fallback
}
return fmt.Sprintf(
baseWithAndFunnel+totalsTpl,
containsErrorT1,
containsErrorT2,
containsErrorT3,
latencyPointerT1,
latencyPointerT2,
latencyPointerT3,
startTs,
endTs,
serviceNameT1,
spanNameT1,
serviceNameT2,
spanNameT2,
serviceNameT3,
spanNameT3,
clauseStep1,
clauseStep2,
clauseStep3,
latencyTypeT2,
latencyTypeT3,
)
}

View File

@@ -1,475 +0,0 @@
package tracefunnel
import (
"fmt"
"strings"
tracev4 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v4"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/types/tracefunneltypes"
)
// sanitizeClause adds AND prefix to non-empty clauses if not already present
func sanitizeClause(clause string) string {
if clause == "" {
return ""
}
// Check if clause already starts with AND
if strings.HasPrefix(strings.TrimSpace(clause), "AND") {
return clause
}
return "AND " + clause
}
func ValidateTraces(funnel *tracefunneltypes.StorableFunnel, timeRange tracefunneltypes.TimeRange) (*v3.ClickHouseQuery, error) {
var query string
var err error
funnelSteps := funnel.Steps
containsErrorT1 := 0
containsErrorT2 := 0
containsErrorT3 := 0
if funnelSteps[0].HasErrors {
containsErrorT1 = 1
}
if funnelSteps[1].HasErrors {
containsErrorT2 = 1
}
if len(funnel.Steps) > 2 && funnelSteps[2].HasErrors {
containsErrorT3 = 1
}
// Build filter clauses for each step
clauseStep1, err := tracev4.BuildTracesFilter(funnelSteps[0].Filters)
if err != nil {
return nil, err
}
clauseStep2, err := tracev4.BuildTracesFilter(funnelSteps[1].Filters)
if err != nil {
return nil, err
}
clauseStep3 := ""
if len(funnel.Steps) > 2 {
clauseStep3, err = tracev4.BuildTracesFilter(funnelSteps[2].Filters)
if err != nil {
return nil, err
}
}
// Sanitize clauses
clauseStep1 = sanitizeClause(clauseStep1)
clauseStep2 = sanitizeClause(clauseStep2)
clauseStep3 = sanitizeClause(clauseStep3)
if len(funnel.Steps) > 2 {
query = BuildThreeStepFunnelValidationQuery(
containsErrorT1,
containsErrorT2,
containsErrorT3,
timeRange.StartTime,
timeRange.EndTime,
funnelSteps[0].ServiceName,
funnelSteps[0].SpanName,
funnelSteps[1].ServiceName,
funnelSteps[1].SpanName,
funnelSteps[2].ServiceName,
funnelSteps[2].SpanName,
clauseStep1,
clauseStep2,
clauseStep3,
)
} else {
query = BuildTwoStepFunnelValidationQuery(
containsErrorT1,
containsErrorT2,
timeRange.StartTime,
timeRange.EndTime,
funnelSteps[0].ServiceName,
funnelSteps[0].SpanName,
funnelSteps[1].ServiceName,
funnelSteps[1].SpanName,
clauseStep1,
clauseStep2,
)
}
return &v3.ClickHouseQuery{
Query: query,
}, nil
}
func GetFunnelAnalytics(funnel *tracefunneltypes.StorableFunnel, timeRange tracefunneltypes.TimeRange) (*v3.ClickHouseQuery, error) {
var query string
var err error
funnelSteps := funnel.Steps
containsErrorT1 := 0
containsErrorT2 := 0
containsErrorT3 := 0
latencyPointerT1 := funnelSteps[0].LatencyPointer
latencyPointerT2 := funnelSteps[1].LatencyPointer
latencyPointerT3 := "start"
if len(funnel.Steps) > 2 {
latencyPointerT3 = funnelSteps[2].LatencyPointer
}
if funnelSteps[0].HasErrors {
containsErrorT1 = 1
}
if funnelSteps[1].HasErrors {
containsErrorT2 = 1
}
if len(funnel.Steps) > 2 && funnelSteps[2].HasErrors {
containsErrorT3 = 1
}
// Build filter clauses for each step
clauseStep1, err := tracev4.BuildTracesFilter(funnelSteps[0].Filters)
if err != nil {
return nil, err
}
clauseStep2, err := tracev4.BuildTracesFilter(funnelSteps[1].Filters)
if err != nil {
return nil, err
}
clauseStep3 := ""
if len(funnel.Steps) > 2 {
clauseStep3, err = tracev4.BuildTracesFilter(funnelSteps[2].Filters)
if err != nil {
return nil, err
}
}
// Sanitize clauses
clauseStep1 = sanitizeClause(clauseStep1)
clauseStep2 = sanitizeClause(clauseStep2)
clauseStep3 = sanitizeClause(clauseStep3)
if len(funnel.Steps) > 2 {
query = BuildThreeStepFunnelOverviewQuery(
containsErrorT1,
containsErrorT2,
containsErrorT3,
latencyPointerT1,
latencyPointerT2,
latencyPointerT3,
timeRange.StartTime,
timeRange.EndTime,
funnelSteps[0].ServiceName,
funnelSteps[0].SpanName,
funnelSteps[1].ServiceName,
funnelSteps[1].SpanName,
funnelSteps[2].ServiceName,
funnelSteps[2].SpanName,
clauseStep1,
clauseStep2,
clauseStep3,
)
} else {
query = BuildTwoStepFunnelOverviewQuery(
containsErrorT1,
containsErrorT2,
latencyPointerT1,
latencyPointerT2,
timeRange.StartTime,
timeRange.EndTime,
funnelSteps[0].ServiceName,
funnelSteps[0].SpanName,
funnelSteps[1].ServiceName,
funnelSteps[1].SpanName,
clauseStep1,
clauseStep2,
)
}
return &v3.ClickHouseQuery{Query: query}, nil
}
func GetFunnelStepAnalytics(funnel *tracefunneltypes.StorableFunnel, timeRange tracefunneltypes.TimeRange, stepStart, stepEnd int64) (*v3.ClickHouseQuery, error) {
var query string
var err error
funnelSteps := funnel.Steps
containsErrorT1 := 0
containsErrorT2 := 0
containsErrorT3 := 0
latencyPointerT1 := funnelSteps[0].LatencyPointer
latencyPointerT2 := funnelSteps[1].LatencyPointer
latencyPointerT3 := "start"
if len(funnel.Steps) > 2 {
latencyPointerT3 = funnelSteps[2].LatencyPointer
}
latencyTypeT2 := "0.99"
latencyTypeT3 := "0.99"
if stepStart == stepEnd {
return nil, fmt.Errorf("step start and end cannot be the same for /step/overview")
}
if funnelSteps[0].HasErrors {
containsErrorT1 = 1
}
if funnelSteps[1].HasErrors {
containsErrorT2 = 1
}
if len(funnel.Steps) > 2 && funnelSteps[2].HasErrors {
containsErrorT3 = 1
}
if funnelSteps[1].LatencyType != "" {
latency := strings.ToLower(funnelSteps[1].LatencyType)
if latency == "p90" {
latencyTypeT2 = "0.90"
} else if latency == "p95" {
latencyTypeT2 = "0.95"
} else {
latencyTypeT2 = "0.99"
}
}
if len(funnel.Steps) > 2 && funnelSteps[2].LatencyType != "" {
latency := strings.ToLower(funnelSteps[2].LatencyType)
if latency == "p90" {
latencyTypeT3 = "0.90"
} else if latency == "p95" {
latencyTypeT3 = "0.95"
} else {
latencyTypeT3 = "0.99"
}
}
// Build filter clauses for each step
clauseStep1, err := tracev4.BuildTracesFilter(funnelSteps[0].Filters)
if err != nil {
return nil, err
}
clauseStep2, err := tracev4.BuildTracesFilter(funnelSteps[1].Filters)
if err != nil {
return nil, err
}
clauseStep3 := ""
if len(funnel.Steps) > 2 {
clauseStep3, err = tracev4.BuildTracesFilter(funnelSteps[2].Filters)
if err != nil {
return nil, err
}
}
// Sanitize clauses
clauseStep1 = sanitizeClause(clauseStep1)
clauseStep2 = sanitizeClause(clauseStep2)
clauseStep3 = sanitizeClause(clauseStep3)
if len(funnel.Steps) > 2 {
query = BuildThreeStepFunnelStepOverviewQuery(
containsErrorT1,
containsErrorT2,
containsErrorT3,
latencyPointerT1,
latencyPointerT2,
latencyPointerT3,
timeRange.StartTime,
timeRange.EndTime,
funnelSteps[0].ServiceName,
funnelSteps[0].SpanName,
funnelSteps[1].ServiceName,
funnelSteps[1].SpanName,
funnelSteps[2].ServiceName,
funnelSteps[2].SpanName,
clauseStep1,
clauseStep2,
clauseStep3,
stepStart,
stepEnd,
latencyTypeT2,
latencyTypeT3,
)
} else {
query = BuildTwoStepFunnelStepOverviewQuery(
containsErrorT1,
containsErrorT2,
latencyPointerT1,
latencyPointerT2,
timeRange.StartTime,
timeRange.EndTime,
funnelSteps[0].ServiceName,
funnelSteps[0].SpanName,
funnelSteps[1].ServiceName,
funnelSteps[1].SpanName,
clauseStep1,
clauseStep2,
latencyTypeT2,
)
}
return &v3.ClickHouseQuery{Query: query}, nil
}
func GetStepAnalytics(funnel *tracefunneltypes.StorableFunnel, timeRange tracefunneltypes.TimeRange) (*v3.ClickHouseQuery, error) {
var query string
funnelSteps := funnel.Steps
containsErrorT1 := 0
containsErrorT2 := 0
containsErrorT3 := 0
if funnelSteps[0].HasErrors {
containsErrorT1 = 1
}
if funnelSteps[1].HasErrors {
containsErrorT2 = 1
}
if len(funnel.Steps) > 2 && funnelSteps[2].HasErrors {
containsErrorT3 = 1
}
// Build filter clauses for each step
clauseStep1, err := tracev4.BuildTracesFilter(funnelSteps[0].Filters)
if err != nil {
return nil, err
}
clauseStep2, err := tracev4.BuildTracesFilter(funnelSteps[1].Filters)
if err != nil {
return nil, err
}
clauseStep3 := ""
if len(funnel.Steps) > 2 {
clauseStep3, err = tracev4.BuildTracesFilter(funnelSteps[2].Filters)
if err != nil {
return nil, err
}
}
// Sanitize clauses
clauseStep1 = sanitizeClause(clauseStep1)
clauseStep2 = sanitizeClause(clauseStep2)
clauseStep3 = sanitizeClause(clauseStep3)
if len(funnel.Steps) > 2 {
query = BuildThreeStepFunnelCountQuery(
containsErrorT1,
containsErrorT2,
containsErrorT3,
timeRange.StartTime,
timeRange.EndTime,
funnelSteps[0].ServiceName,
funnelSteps[0].SpanName,
funnelSteps[1].ServiceName,
funnelSteps[1].SpanName,
funnelSteps[2].ServiceName,
funnelSteps[2].SpanName,
clauseStep1,
clauseStep2,
clauseStep3,
)
} else {
query = BuildTwoStepFunnelCountQuery(
containsErrorT1,
containsErrorT2,
timeRange.StartTime,
timeRange.EndTime,
funnelSteps[0].ServiceName,
funnelSteps[0].SpanName,
funnelSteps[1].ServiceName,
funnelSteps[1].SpanName,
clauseStep1,
clauseStep2,
)
}
return &v3.ClickHouseQuery{
Query: query,
}, nil
}
func GetSlowestTraces(funnel *tracefunneltypes.StorableFunnel, timeRange tracefunneltypes.TimeRange, stepStart, stepEnd int64) (*v3.ClickHouseQuery, error) {
funnelSteps := funnel.Steps
containsErrorT1 := 0
containsErrorT2 := 0
stepStartOrder := 0
stepEndOrder := 1
if stepStart != stepEnd {
stepStartOrder = int(stepStart) - 1
stepEndOrder = int(stepEnd) - 1
if funnelSteps[stepStartOrder].HasErrors {
containsErrorT1 = 1
}
if funnelSteps[stepEndOrder].HasErrors {
containsErrorT2 = 1
}
}
// Build filter clauses for the steps
clauseStep1, err := tracev4.BuildTracesFilter(funnelSteps[stepStartOrder].Filters)
if err != nil {
return nil, err
}
clauseStep2, err := tracev4.BuildTracesFilter(funnelSteps[stepEndOrder].Filters)
if err != nil {
return nil, err
}
// Sanitize clauses
clauseStep1 = sanitizeClause(clauseStep1)
clauseStep2 = sanitizeClause(clauseStep2)
query := BuildTwoStepFunnelTopSlowTracesQuery(
containsErrorT1,
containsErrorT2,
timeRange.StartTime,
timeRange.EndTime,
funnelSteps[stepStartOrder].ServiceName,
funnelSteps[stepStartOrder].SpanName,
funnelSteps[stepEndOrder].ServiceName,
funnelSteps[stepEndOrder].SpanName,
clauseStep1,
clauseStep2,
)
return &v3.ClickHouseQuery{Query: query}, nil
}
func GetErroredTraces(funnel *tracefunneltypes.StorableFunnel, timeRange tracefunneltypes.TimeRange, stepStart, stepEnd int64) (*v3.ClickHouseQuery, error) {
funnelSteps := funnel.Steps
containsErrorT1 := 0
containsErrorT2 := 0
stepStartOrder := 0
stepEndOrder := 1
if stepStart != stepEnd {
stepStartOrder = int(stepStart) - 1
stepEndOrder = int(stepEnd) - 1
if funnelSteps[stepStartOrder].HasErrors {
containsErrorT1 = 1
}
if funnelSteps[stepEndOrder].HasErrors {
containsErrorT2 = 1
}
}
// Build filter clauses for the steps
clauseStep1, err := tracev4.BuildTracesFilter(funnelSteps[stepStartOrder].Filters)
if err != nil {
return nil, err
}
clauseStep2, err := tracev4.BuildTracesFilter(funnelSteps[stepEndOrder].Filters)
if err != nil {
return nil, err
}
// Sanitize clauses
clauseStep1 = sanitizeClause(clauseStep1)
clauseStep2 = sanitizeClause(clauseStep2)
query := BuildTwoStepFunnelTopSlowErrorTracesQuery(
containsErrorT1,
containsErrorT2,
timeRange.StartTime,
timeRange.EndTime,
funnelSteps[stepStartOrder].ServiceName,
funnelSteps[stepStartOrder].SpanName,
funnelSteps[stepEndOrder].ServiceName,
funnelSteps[stepEndOrder].SpanName,
clauseStep1,
clauseStep2,
)
return &v3.ClickHouseQuery{Query: query}, nil
}

View File

@@ -2,7 +2,7 @@ package clickhouseprometheus
import (
"encoding/json"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
)
@@ -17,7 +17,12 @@ func unmarshalLabels(s string) ([]prompb.Label, string, error) {
for n, v := range m {
if n == "__name__" {
metricName = v
} else {
if !model.IsValidLegacyMetricName(n) {
n = `"` + n + `"`
}
}
res = append(res, prompb.Label{
Name: n,
Value: v,

View File

@@ -20,6 +20,8 @@ import (
"text/template"
"time"
"github.com/SigNoz/signoz/pkg/query-service/constants"
"github.com/SigNoz/signoz/pkg/alertmanager"
"github.com/SigNoz/signoz/pkg/apis/fields"
errorsV2 "github.com/SigNoz/signoz/pkg/errors"
@@ -39,7 +41,6 @@ import (
_ "github.com/mattn/go-sqlite3"
"github.com/SigNoz/signoz/pkg/cache"
traceFunnelsModule "github.com/SigNoz/signoz/pkg/modules/tracefunnel"
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations"
"github.com/SigNoz/signoz/pkg/query-service/app/inframetrics"
@@ -55,7 +56,6 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/app/queryBuilder"
tracesV3 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v3"
tracesV4 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v4"
"github.com/SigNoz/signoz/pkg/query-service/constants"
"github.com/SigNoz/signoz/pkg/query-service/contextlinks"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/postprocess"
@@ -65,7 +65,6 @@ import (
"github.com/SigNoz/signoz/pkg/types/licensetypes"
"github.com/SigNoz/signoz/pkg/types/pipelinetypes"
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
traceFunnels "github.com/SigNoz/signoz/pkg/types/tracefunneltypes"
"go.uber.org/zap"
@@ -5217,421 +5216,4 @@ func (aH *APIHandler) RegisterTraceFunnelsRoutes(router *mux.Router, am *middlew
traceFunnelsRouter.HandleFunc("/{funnel_id}",
am.EditAccess(aH.Signoz.Handlers.TraceFunnel.UpdateFunnel)).
Methods(http.MethodPut)
// Analytics endpoints
traceFunnelsRouter.HandleFunc("/{funnel_id}/analytics/validate", aH.handleValidateTraces).Methods("POST")
traceFunnelsRouter.HandleFunc("/{funnel_id}/analytics/overview", aH.handleFunnelAnalytics).Methods("POST")
traceFunnelsRouter.HandleFunc("/{funnel_id}/analytics/steps", aH.handleStepAnalytics).Methods("POST")
traceFunnelsRouter.HandleFunc("/{funnel_id}/analytics/steps/overview", aH.handleFunnelStepAnalytics).Methods("POST")
traceFunnelsRouter.HandleFunc("/{funnel_id}/analytics/slow-traces", aH.handleFunnelSlowTraces).Methods("POST")
traceFunnelsRouter.HandleFunc("/{funnel_id}/analytics/error-traces", aH.handleFunnelErrorTraces).Methods("POST")
// Analytics endpoints
traceFunnelsRouter.HandleFunc("/analytics/validate", aH.handleValidateTracesWithPayload).Methods("POST")
traceFunnelsRouter.HandleFunc("/analytics/overview", aH.handleFunnelAnalyticsWithPayload).Methods("POST")
traceFunnelsRouter.HandleFunc("/analytics/steps", aH.handleStepAnalyticsWithPayload).Methods("POST")
traceFunnelsRouter.HandleFunc("/analytics/steps/overview", aH.handleFunnelStepAnalyticsWithPayload).Methods("POST")
traceFunnelsRouter.HandleFunc("/analytics/slow-traces", aH.handleFunnelSlowTracesWithPayload).Methods("POST")
traceFunnelsRouter.HandleFunc("/analytics/error-traces", aH.handleFunnelErrorTracesWithPayload).Methods("POST")
}
func (aH *APIHandler) handleValidateTraces(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
funnelID := vars["funnel_id"]
claims, err := authtypes.ClaimsFromContext(r.Context())
if err != nil {
render.Error(w, err)
return
}
funnel, err := aH.Signoz.Modules.TraceFunnel.Get(r.Context(), valuer.MustNewUUID(funnelID), valuer.MustNewUUID(claims.OrgID))
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorNotFound, Err: fmt.Errorf("funnel not found: %v", err)}, nil)
return
}
var timeRange traceFunnels.TimeRange
if err := json.NewDecoder(r.Body).Decode(&timeRange); err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("error decoding time range: %v", err)}, nil)
return
}
if len(funnel.Steps) < 2 {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("funnel must have at least 2 steps")}, nil)
return
}
chq, err := traceFunnelsModule.ValidateTraces(funnel, timeRange)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error building clickhouse query: %v", err)}, nil)
return
}
results, err := aH.reader.GetListResultV3(r.Context(), chq.Query)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error converting clickhouse results to list: %v", err)}, nil)
return
}
aH.Respond(w, results)
}
func (aH *APIHandler) handleFunnelAnalytics(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
funnelID := vars["funnel_id"]
claims, err := authtypes.ClaimsFromContext(r.Context())
if err != nil {
render.Error(w, err)
return
}
funnel, err := aH.Signoz.Modules.TraceFunnel.Get(r.Context(), valuer.MustNewUUID(funnelID), valuer.MustNewUUID(claims.OrgID))
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorNotFound, Err: fmt.Errorf("funnel not found: %v", err)}, nil)
return
}
var stepTransition traceFunnels.StepTransitionRequest
if err := json.NewDecoder(r.Body).Decode(&stepTransition); err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("error decoding time range: %v", err)}, nil)
return
}
chq, err := traceFunnelsModule.GetFunnelAnalytics(funnel, stepTransition.TimeRange)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error building clickhouse query: %v", err)}, nil)
return
}
results, err := aH.reader.GetListResultV3(r.Context(), chq.Query)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error converting clickhouse results to list: %v", err)}, nil)
return
}
aH.Respond(w, results)
}
func (aH *APIHandler) handleFunnelStepAnalytics(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
funnelID := vars["funnel_id"]
claims, err := authtypes.ClaimsFromContext(r.Context())
if err != nil {
render.Error(w, err)
return
}
funnel, err := aH.Signoz.Modules.TraceFunnel.Get(r.Context(), valuer.MustNewUUID(funnelID), valuer.MustNewUUID(claims.OrgID))
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorNotFound, Err: fmt.Errorf("funnel not found: %v", err)}, nil)
return
}
var stepTransition traceFunnels.StepTransitionRequest
if err := json.NewDecoder(r.Body).Decode(&stepTransition); err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("error decoding time range: %v", err)}, nil)
return
}
chq, err := traceFunnelsModule.GetFunnelStepAnalytics(funnel, stepTransition.TimeRange, stepTransition.StepStart, stepTransition.StepEnd)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error building clickhouse query: %v", err)}, nil)
return
}
results, err := aH.reader.GetListResultV3(r.Context(), chq.Query)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error converting clickhouse results to list: %v", err)}, nil)
return
}
aH.Respond(w, results)
}
func (aH *APIHandler) handleStepAnalytics(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
funnelID := vars["funnel_id"]
claims, err := authtypes.ClaimsFromContext(r.Context())
if err != nil {
render.Error(w, err)
return
}
funnel, err := aH.Signoz.Modules.TraceFunnel.Get(r.Context(), valuer.MustNewUUID(funnelID), valuer.MustNewUUID(claims.OrgID))
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorNotFound, Err: fmt.Errorf("funnel not found: %v", err)}, nil)
return
}
var timeRange traceFunnels.TimeRange
if err := json.NewDecoder(r.Body).Decode(&timeRange); err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("error decoding time range: %v", err)}, nil)
return
}
chq, err := traceFunnelsModule.GetStepAnalytics(funnel, timeRange)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error building clickhouse query: %v", err)}, nil)
return
}
results, err := aH.reader.GetListResultV3(r.Context(), chq.Query)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error converting clickhouse results to list: %v", err)}, nil)
return
}
aH.Respond(w, results)
}
func (aH *APIHandler) handleFunnelSlowTraces(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
funnelID := vars["funnel_id"]
claims, err := authtypes.ClaimsFromContext(r.Context())
if err != nil {
render.Error(w, err)
return
}
funnel, err := aH.Signoz.Modules.TraceFunnel.Get(r.Context(), valuer.MustNewUUID(funnelID), valuer.MustNewUUID(claims.OrgID))
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorNotFound, Err: fmt.Errorf("funnel not found: %v", err)}, nil)
return
}
var req traceFunnels.StepTransitionRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("invalid request body: %v", err)}, nil)
return
}
chq, err := traceFunnelsModule.GetSlowestTraces(funnel, req.TimeRange, req.StepStart, req.StepEnd)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error building clickhouse query: %v", err)}, nil)
return
}
results, err := aH.reader.GetListResultV3(r.Context(), chq.Query)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error converting clickhouse results to list: %v", err)}, nil)
return
}
aH.Respond(w, results)
}
func (aH *APIHandler) handleFunnelErrorTraces(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
funnelID := vars["funnel_id"]
claims, err := authtypes.ClaimsFromContext(r.Context())
if err != nil {
render.Error(w, err)
return
}
funnel, err := aH.Signoz.Modules.TraceFunnel.Get(r.Context(), valuer.MustNewUUID(funnelID), valuer.MustNewUUID(claims.OrgID))
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorNotFound, Err: fmt.Errorf("funnel not found: %v", err)}, nil)
return
}
var req traceFunnels.StepTransitionRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("invalid request body: %v", err)}, nil)
return
}
chq, err := traceFunnelsModule.GetErroredTraces(funnel, req.TimeRange, req.StepStart, req.StepEnd)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error building clickhouse query: %v", err)}, nil)
return
}
results, err := aH.reader.GetListResultV3(r.Context(), chq.Query)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error converting clickhouse results to list: %v", err)}, nil)
return
}
aH.Respond(w, results)
}
func (aH *APIHandler) handleValidateTracesWithPayload(w http.ResponseWriter, r *http.Request) {
var req traceFunnels.PostableFunnel
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("error decoding request: %v", err)}, nil)
return
}
if len(req.Steps) < 2 {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("funnel must have at least 2 steps")}, nil)
return
}
// Create a StorableFunnel from the request
funnel := &traceFunnels.StorableFunnel{
Steps: req.Steps,
}
chq, err := traceFunnelsModule.ValidateTraces(funnel, traceFunnels.TimeRange{
StartTime: req.StartTime,
EndTime: req.EndTime,
})
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error building clickhouse query: %v", err)}, nil)
return
}
results, err := aH.reader.GetListResultV3(r.Context(), chq.Query)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error converting clickhouse results to list: %v", err)}, nil)
return
}
aH.Respond(w, results)
}
func (aH *APIHandler) handleFunnelAnalyticsWithPayload(w http.ResponseWriter, r *http.Request) {
var req traceFunnels.PostableFunnel
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("error decoding request: %v", err)}, nil)
return
}
funnel := &traceFunnels.StorableFunnel{
Steps: req.Steps,
}
chq, err := traceFunnelsModule.GetFunnelAnalytics(funnel, traceFunnels.TimeRange{
StartTime: req.StartTime,
EndTime: req.EndTime,
})
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error building clickhouse query: %v", err)}, nil)
return
}
results, err := aH.reader.GetListResultV3(r.Context(), chq.Query)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error converting clickhouse results to list: %v", err)}, nil)
return
}
aH.Respond(w, results)
}
func (aH *APIHandler) handleStepAnalyticsWithPayload(w http.ResponseWriter, r *http.Request) {
var req traceFunnels.PostableFunnel
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("error decoding request: %v", err)}, nil)
return
}
funnel := &traceFunnels.StorableFunnel{
Steps: req.Steps,
}
chq, err := traceFunnelsModule.GetStepAnalytics(funnel, traceFunnels.TimeRange{
StartTime: req.StartTime,
EndTime: req.EndTime,
})
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error building clickhouse query: %v", err)}, nil)
return
}
results, err := aH.reader.GetListResultV3(r.Context(), chq.Query)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error converting clickhouse results to list: %v", err)}, nil)
return
}
aH.Respond(w, results)
}
func (aH *APIHandler) handleFunnelStepAnalyticsWithPayload(w http.ResponseWriter, r *http.Request) {
var req traceFunnels.PostableFunnel
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("error decoding request: %v", err)}, nil)
return
}
funnel := &traceFunnels.StorableFunnel{
Steps: req.Steps,
}
chq, err := traceFunnelsModule.GetFunnelStepAnalytics(funnel, traceFunnels.TimeRange{
StartTime: req.StartTime,
EndTime: req.EndTime,
}, req.StepStart, req.StepEnd)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error building clickhouse query: %v", err)}, nil)
return
}
results, err := aH.reader.GetListResultV3(r.Context(), chq.Query)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error converting clickhouse results to list: %v", err)}, nil)
return
}
aH.Respond(w, results)
}
func (aH *APIHandler) handleFunnelSlowTracesWithPayload(w http.ResponseWriter, r *http.Request) {
var req traceFunnels.PostableFunnel
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("error decoding request: %v", err)}, nil)
return
}
funnel := &traceFunnels.StorableFunnel{
Steps: req.Steps,
}
chq, err := traceFunnelsModule.GetSlowestTraces(funnel, traceFunnels.TimeRange{
StartTime: req.StartTime,
EndTime: req.EndTime,
}, req.StepStart, req.StepEnd)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error building clickhouse query: %v", err)}, nil)
return
}
results, err := aH.reader.GetListResultV3(r.Context(), chq.Query)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error converting clickhouse results to list: %v", err)}, nil)
return
}
aH.Respond(w, results)
}
func (aH *APIHandler) handleFunnelErrorTracesWithPayload(w http.ResponseWriter, r *http.Request) {
var req traceFunnels.PostableFunnel
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("error decoding request: %v", err)}, nil)
return
}
funnel := &traceFunnels.StorableFunnel{
Steps: req.Steps,
}
chq, err := traceFunnelsModule.GetErroredTraces(funnel, traceFunnels.TimeRange{
StartTime: req.StartTime,
EndTime: req.EndTime,
}, req.StepStart, req.StepEnd)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error building clickhouse query: %v", err)}, nil)
return
}
results, err := aH.reader.GetListResultV3(r.Context(), chq.Query)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error converting clickhouse results to list: %v", err)}, nil)
return
}
aH.Respond(w, results)
}

View File

@@ -148,59 +148,6 @@ func BuildTracesFilterQuery(fs *v3.FilterSet) (string, error) {
return queryString, nil
}
func BuildTracesFilter(fs *v3.FilterSet) (string, error) {
var conditions []string
if fs != nil && len(fs.Items) != 0 {
for _, item := range fs.Items {
val := item.Value
// generate the key
columnName := getColumnName(item.Key)
var fmtVal string
item.Operator = v3.FilterOperator(strings.ToLower(strings.TrimSpace(string(item.Operator))))
if item.Operator != v3.FilterOperatorExists && item.Operator != v3.FilterOperatorNotExists {
var err error
val, err = utils.ValidateAndCastValue(val, item.Key.DataType)
if err != nil {
return "", fmt.Errorf("invalid value for key %s: %v", item.Key.Key, err)
}
}
if val != nil {
fmtVal = utils.ClickHouseFormattedValue(val)
}
if operator, ok := tracesOperatorMappingV3[item.Operator]; ok {
switch item.Operator {
case v3.FilterOperatorContains, v3.FilterOperatorNotContains:
// we also want to treat %, _ as literals for contains
val := utils.QuoteEscapedStringForContains(fmt.Sprintf("%s", item.Value), false)
conditions = append(conditions, fmt.Sprintf("%s %s '%%%s%%'", columnName, operator, val))
case v3.FilterOperatorRegex, v3.FilterOperatorNotRegex:
conditions = append(conditions, fmt.Sprintf(operator, columnName, fmtVal))
case v3.FilterOperatorExists, v3.FilterOperatorNotExists:
if item.Key.IsColumn {
subQuery, err := existsSubQueryForFixedColumn(item.Key, item.Operator)
if err != nil {
return "", err
}
conditions = append(conditions, subQuery)
} else {
cType := getClickHouseTracesColumnType(item.Key.Type)
cDataType := getClickHouseTracesColumnDataType(item.Key.DataType)
col := fmt.Sprintf("%s_%s", cType, cDataType)
conditions = append(conditions, fmt.Sprintf(operator, col, item.Key.Key))
}
default:
conditions = append(conditions, fmt.Sprintf("%s %s %s", columnName, operator, fmtVal))
}
} else {
return "", fmt.Errorf("unsupported operator %s", item.Operator)
}
}
}
return strings.Join(conditions, " AND "), nil
}
func handleEmptyValuesInGroupBy(groupBy []v3.AttributeKey) (string, error) {
// TODO(nitya): in future when we support user based mat column handle them
// skipping now as we don't support creating them

View File

@@ -12,6 +12,7 @@ var (
QueryTypeFormula = QueryType{valuer.NewString("builder_formula")}
QueryTypeSubQuery = QueryType{valuer.NewString("builder_sub_query")}
QueryTypeJoin = QueryType{valuer.NewString("builder_join")}
QueryTypeTraceOperator = QueryType{valuer.NewString("builder_trace_operator")}
QueryTypeClickHouseSQL = QueryType{valuer.NewString("clickhouse_sql")}
QueryTypePromQL = QueryType{valuer.NewString("promql")}
)

View File

@@ -74,6 +74,13 @@ func (q *QueryEnvelope) UnmarshalJSON(data []byte) error {
}
q.Spec = spec
case QueryTypeTraceOperator:
var spec QueryBuilderTraceOperator
if err := json.Unmarshal(shadow.Spec, &spec); err != nil {
return errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "invalid trace operator spec")
}
q.Spec = spec
case QueryTypePromQL:
var spec PromQuery
if err := json.Unmarshal(shadow.Spec, &spec); err != nil {

View File

@@ -102,6 +102,341 @@ func TestQueryRangeRequest_UnmarshalJSON(t *testing.T) {
},
wantErr: false,
},
{
name: "valid trace operator query with simple expression",
jsonData: `{
"schemaVersion": "v1",
"start": 1640995200000,
"end": 1640998800000,
"requestType": "time_series",
"compositeQuery": {
"queries": [
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "traces",
"filter": {
"expression": "service.name = 'checkoutservice'"
}
}
},
{
"type": "builder_trace_operator",
"spec": {
"name": "trace_flow_analysis",
"expression": "A => B",
"filter": {
"expression": "trace_duration > 200ms AND span_count >= 5"
},
"orderBy": [{
"key": {
"name": "trace_duration"
},
"direction": "desc"
}],
"limit": 100,
"cursor": "eyJsYXN0X3RyYWNlX2lkIjoiYWJjZGVmIn0="
}
}
]
},
"variables": {
"service": "frontend"
}
}`,
expected: QueryRangeRequest{
SchemaVersion: "v1",
Start: 1640995200000,
End: 1640998800000,
RequestType: RequestTypeTimeSeries,
CompositeQuery: CompositeQuery{
Queries: []QueryEnvelope{
{
Type: QueryTypeBuilder,
Spec: QueryBuilderQuery[TraceAggregation]{
Name: "A",
Signal: telemetrytypes.SignalTraces,
Filter: &Filter{
Expression: "service.name = 'checkoutservice'",
},
},
},
{
Type: QueryTypeTraceOperator,
Spec: QueryBuilderTraceOperator{
Name: "trace_flow_analysis",
Expression: "A => B",
Filter: &Filter{
Expression: "trace_duration > 200ms AND span_count >= 5",
},
Order: []OrderBy{{
Key: OrderByKey{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "trace_duration"}},
Direction: OrderDirectionDesc,
}},
Limit: 100,
Cursor: "eyJsYXN0X3RyYWNlX2lkIjoiYWJjZGVmIn0=",
},
},
},
},
Variables: map[string]any{
"service": "frontend",
},
},
wantErr: false,
},
{
name: "valid trace operator with complex expression and span_count ordering",
jsonData: `{
"schemaVersion": "v1",
"start": 1640995200000,
"end": 1640998800000,
"requestType": "time_series",
"compositeQuery": {
"queries": [
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "traces",
"filter": { "expression": "service.name = 'frontend'" }
}
},
{
"type": "builder_query",
"spec": {
"name": "B",
"signal": "traces",
"filter": { "expression": "hasError = true" }
}
},
{
"type": "builder_query",
"spec": {
"name": "C",
"signal": "traces",
"filter": { "expression": "response_status_code = '200'" }
}
},
{
"type": "builder_trace_operator",
"spec": {
"name": "complex_trace_analysis",
"expression": "A => (B && NOT C)",
"filter": { "expression": "trace_duration BETWEEN 100ms AND 5s AND span_count IN (5, 10, 15)" },
"orderBy": [{
"key": { "name": "span_count" },
"direction": "asc"
}],
"limit": 50,
"functions": [{ "name": "absolute", "args": [] }]
}
}
]
}
}`,
expected: QueryRangeRequest{
SchemaVersion: "v1",
Start: 1640995200000,
End: 1640998800000,
RequestType: RequestTypeTimeSeries,
CompositeQuery: CompositeQuery{Queries: []QueryEnvelope{
{
Type: QueryTypeBuilder,
Spec: QueryBuilderQuery[TraceAggregation]{
Name: "A",
Signal: telemetrytypes.SignalTraces,
Filter: &Filter{Expression: "service.name = 'frontend'"},
},
},
{
Type: QueryTypeBuilder,
Spec: QueryBuilderQuery[TraceAggregation]{
Name: "B",
Signal: telemetrytypes.SignalTraces,
Filter: &Filter{Expression: "hasError = true"},
},
},
{
Type: QueryTypeBuilder,
Spec: QueryBuilderQuery[TraceAggregation]{
Name: "C",
Signal: telemetrytypes.SignalTraces,
Filter: &Filter{Expression: "response_status_code = '200'"},
},
},
{
Type: QueryTypeTraceOperator,
Spec: QueryBuilderTraceOperator{
Name: "complex_trace_analysis",
Expression: "A => (B && NOT C)",
Filter: &Filter{Expression: "trace_duration BETWEEN 100ms AND 5s AND span_count IN (5, 10, 15)"},
Order: []OrderBy{{
Key: OrderByKey{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: OrderBySpanCount.StringValue()}},
Direction: OrderDirectionAsc,
}},
Limit: 50,
Functions: []Function{{Name: FunctionNameAbsolute, Args: []FunctionArg{}}},
},
},
}},
},
wantErr: false,
},
{
name: "valid trace operator with NOT expression",
jsonData: `{
"schemaVersion": "v1",
"start": 1640995200000,
"end": 1640998800000,
"requestType": "time_series",
"compositeQuery": {
"queries": [
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "traces",
"filter": {
"expression": "service.name = 'frontend'"
}
}
},
{
"type": "builder_trace_operator",
"spec": {
"name": "not_trace_analysis",
"expression": "NOT A",
"filter": {
"expression": "trace_duration < 1s"
},
"disabled": false
}
}
]
}
}`,
expected: QueryRangeRequest{
SchemaVersion: "v1",
Start: 1640995200000,
End: 1640998800000,
RequestType: RequestTypeTimeSeries,
CompositeQuery: CompositeQuery{
Queries: []QueryEnvelope{
{
Type: QueryTypeBuilder,
Spec: QueryBuilderQuery[TraceAggregation]{
Name: "A",
Signal: telemetrytypes.SignalTraces,
Filter: &Filter{
Expression: "service.name = 'frontend'",
},
},
},
{
Type: QueryTypeTraceOperator,
Spec: QueryBuilderTraceOperator{
Name: "not_trace_analysis",
Expression: "NOT A",
Filter: &Filter{
Expression: "trace_duration < 1s",
},
Disabled: false,
},
},
},
},
},
wantErr: false,
},
{
name: "trace operator with binary NOT (exclusion)",
jsonData: `{
"schemaVersion": "v1",
"start": 1640995200000,
"end": 1640998800000,
"requestType": "time_series",
"compositeQuery": {
"queries": [
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "traces",
"filter": {
"expression": "service.name = 'frontend'"
}
}
},
{
"type": "builder_query",
"spec": {
"name": "B",
"signal": "traces",
"filter": {
"expression": "hasError = true"
}
}
},
{
"type": "builder_trace_operator",
"spec": {
"name": "exclusion_analysis",
"expression": "A NOT B",
"filter": {
"expression": "span_count > 3"
},
"limit": 75
}
}
]
}
}`,
expected: QueryRangeRequest{
SchemaVersion: "v1",
Start: 1640995200000,
End: 1640998800000,
RequestType: RequestTypeTimeSeries,
CompositeQuery: CompositeQuery{
Queries: []QueryEnvelope{
{
Type: QueryTypeBuilder,
Spec: QueryBuilderQuery[TraceAggregation]{
Name: "A",
Signal: telemetrytypes.SignalTraces,
Filter: &Filter{
Expression: "service.name = 'frontend'",
},
},
},
{
Type: QueryTypeBuilder,
Spec: QueryBuilderQuery[TraceAggregation]{
Name: "B",
Signal: telemetrytypes.SignalTraces,
Filter: &Filter{
Expression: "hasError = true",
},
},
},
{
Type: QueryTypeTraceOperator,
Spec: QueryBuilderTraceOperator{
Name: "exclusion_analysis",
Expression: "A NOT B",
Filter: &Filter{
Expression: "span_count > 3",
},
Limit: 75,
},
},
},
},
},
wantErr: false,
},
{
name: "valid log builder query",
jsonData: `{
@@ -120,8 +455,8 @@ func TestQueryRangeRequest_UnmarshalJSON(t *testing.T) {
"expression": "severity_text = 'ERROR'"
},
"selectFields": [{
"key": "body",
"type": "log"
"name": "body",
"fieldContext": "log"
}],
"limit": 50,
"offset": 10
@@ -177,8 +512,8 @@ func TestQueryRangeRequest_UnmarshalJSON(t *testing.T) {
}],
"stepInterval": 120,
"groupBy": [{
"key": "method",
"type": "tag"
"name": "method",
"fieldContext": "attribute"
}]
}
}]
@@ -270,7 +605,7 @@ func TestQueryRangeRequest_UnmarshalJSON(t *testing.T) {
"name": "error_rate",
"expression": "A / B * 100",
"functions": [{
"name": "cut_off_min",
"name": "cutOffMin",
"args": [{
"value": "0.3"
}]
@@ -436,7 +771,6 @@ func TestQueryRangeRequest_UnmarshalJSON(t *testing.T) {
}
},
{
"name": "B",
"type": "builder_formula",
"spec": {
"name": "rate",
@@ -526,7 +860,6 @@ func TestQueryRangeRequest_UnmarshalJSON(t *testing.T) {
"requestType": "time_series",
"compositeQuery": {
"queries": [{
"name": "A",
"type": "unknown_type",
"spec": {}
}]
@@ -543,9 +876,9 @@ func TestQueryRangeRequest_UnmarshalJSON(t *testing.T) {
"requestType": "time_series",
"compositeQuery": {
"queries": [{
"name": "A",
"type": "builder_query",
"spec": {
"name": "A",
"signal": "unknown_signal",
"aggregations": []
}
@@ -563,9 +896,9 @@ func TestQueryRangeRequest_UnmarshalJSON(t *testing.T) {
"requestType": "time_series",
"compositeQuery": {
"queries": [{
"name": "A",
"type": "builder_query",
"spec": {
"name": "A",
"signal": "traces",
"aggregations": [],
"stepInterval": "invalid_duration"
@@ -650,6 +983,21 @@ func TestQueryRangeRequest_UnmarshalJSON(t *testing.T) {
assert.Equal(t, expectedSpec.Right.Name, actualSpec.Right.Name)
assert.Equal(t, expectedSpec.Type, actualSpec.Type)
assert.Equal(t, expectedSpec.On, actualSpec.On)
case QueryTypeTraceOperator:
expectedSpec := expectedQuery.Spec.(QueryBuilderTraceOperator)
actualSpec, ok := actualQuery.Spec.(QueryBuilderTraceOperator)
require.True(t, ok, "Expected QueryBuilderTraceOperator but got %T", actualQuery.Spec)
assert.Equal(t, expectedSpec.Name, actualSpec.Name)
assert.Equal(t, expectedSpec.Expression, actualSpec.Expression)
assert.Equal(t, expectedSpec.Limit, actualSpec.Limit)
assert.Equal(t, expectedSpec.Cursor, actualSpec.Cursor)
assert.Equal(t, len(expectedSpec.Order), len(actualSpec.Order))
for i, expectedOrder := range expectedSpec.Order {
if i < len(actualSpec.Order) {
assert.Equal(t, expectedOrder.Key.Name, actualSpec.Order[i].Key.Name)
assert.Equal(t, expectedOrder.Direction, actualSpec.Order[i].Direction)
}
}
case QueryTypePromQL:
expectedSpec := expectedQuery.Spec.(PromQuery)
actualSpec, ok := actualQuery.Spec.(PromQuery)
@@ -673,3 +1021,507 @@ func TestQueryRangeRequest_UnmarshalJSON(t *testing.T) {
})
}
}
func TestParseTraceExpression(t *testing.T) {
tests := []struct {
name string
expression string
expectError bool
checkResult func(t *testing.T, result *TraceOperand)
}{
{
name: "simple query reference",
expression: "A",
expectError: false,
checkResult: func(t *testing.T, result *TraceOperand) {
assert.NotNil(t, result.QueryRef)
assert.Equal(t, "A", result.QueryRef.Name)
assert.Nil(t, result.Operator)
},
},
{
name: "simple implication",
expression: "A => B",
expectError: false,
checkResult: func(t *testing.T, result *TraceOperand) {
assert.NotNil(t, result.Operator)
assert.Equal(t, TraceOperatorDirectDescendant, *result.Operator)
assert.NotNil(t, result.Left)
assert.NotNil(t, result.Right)
assert.Equal(t, "A", result.Left.QueryRef.Name)
assert.Equal(t, "B", result.Right.QueryRef.Name)
},
},
{
name: "and operation",
expression: "A && B",
expectError: false,
checkResult: func(t *testing.T, result *TraceOperand) {
assert.NotNil(t, result.Operator)
assert.Equal(t, TraceOperatorAnd, *result.Operator)
assert.Equal(t, "A", result.Left.QueryRef.Name)
assert.Equal(t, "B", result.Right.QueryRef.Name)
},
},
{
name: "or operation",
expression: "A || B",
expectError: false,
checkResult: func(t *testing.T, result *TraceOperand) {
assert.NotNil(t, result.Operator)
assert.Equal(t, TraceOperatorOr, *result.Operator)
assert.Equal(t, "A", result.Left.QueryRef.Name)
assert.Equal(t, "B", result.Right.QueryRef.Name)
},
},
{
name: "unary NOT operation",
expression: "NOT A",
expectError: false,
checkResult: func(t *testing.T, result *TraceOperand) {
assert.NotNil(t, result.Operator)
assert.Equal(t, TraceOperatorNot, *result.Operator)
assert.NotNil(t, result.Left)
assert.Nil(t, result.Right)
assert.Equal(t, "A", result.Left.QueryRef.Name)
},
},
{
name: "binary NOT operation",
expression: "A NOT B",
expectError: false,
checkResult: func(t *testing.T, result *TraceOperand) {
assert.NotNil(t, result.Operator)
assert.Equal(t, TraceOperatorExclude, *result.Operator)
assert.NotNil(t, result.Left)
assert.NotNil(t, result.Right)
assert.Equal(t, "A", result.Left.QueryRef.Name)
assert.Equal(t, "B", result.Right.QueryRef.Name)
},
},
{
name: "complex expression with precedence",
expression: "A => B && C || D",
expectError: false,
checkResult: func(t *testing.T, result *TraceOperand) {
// Should parse as: A => (B && (C || D)) due to precedence: NOT > || > && > =>
// The parsing finds operators from lowest precedence first
assert.NotNil(t, result.Operator)
assert.Equal(t, TraceOperatorDirectDescendant, *result.Operator)
assert.Equal(t, "A", result.Left.QueryRef.Name)
// Right side should be an AND operation (next lowest precedence after =>)
assert.NotNil(t, result.Right.Operator)
assert.Equal(t, TraceOperatorAnd, *result.Right.Operator)
},
},
{
name: "simple parentheses",
expression: "(A)",
expectError: false,
checkResult: func(t *testing.T, result *TraceOperand) {
assert.NotNil(t, result.QueryRef)
assert.Equal(t, "A", result.QueryRef.Name)
assert.Nil(t, result.Operator)
},
},
{
name: "parentheses expression",
expression: "A => (B || C)",
expectError: false,
checkResult: func(t *testing.T, result *TraceOperand) {
assert.NotNil(t, result.Operator)
assert.Equal(t, TraceOperatorDirectDescendant, *result.Operator)
assert.Equal(t, "A", result.Left.QueryRef.Name)
// Right side should be an OR operation
assert.NotNil(t, result.Right.Operator)
assert.Equal(t, TraceOperatorOr, *result.Right.Operator)
assert.Equal(t, "B", result.Right.Left.QueryRef.Name)
assert.Equal(t, "C", result.Right.Right.QueryRef.Name)
},
},
{
name: "nested NOT with parentheses",
expression: "NOT (A && B)",
expectError: false,
checkResult: func(t *testing.T, result *TraceOperand) {
assert.NotNil(t, result.Operator)
assert.Equal(t, TraceOperatorNot, *result.Operator)
assert.Nil(t, result.Right) // Unary operator
// Left side should be an AND operation
assert.NotNil(t, result.Left.Operator)
assert.Equal(t, TraceOperatorAnd, *result.Left.Operator)
},
},
{
name: "invalid query reference with numbers",
expression: "123",
expectError: true,
},
{
name: "invalid query reference with special chars",
expression: "A-B",
expectError: true,
},
{
name: "empty expression",
expression: "",
expectError: true,
},
{
name: "expression with extra whitespace",
expression: " A => B ",
expectError: false,
checkResult: func(t *testing.T, result *TraceOperand) {
assert.NotNil(t, result.Operator)
assert.Equal(t, TraceOperatorDirectDescendant, *result.Operator)
assert.Equal(t, "A", result.Left.QueryRef.Name)
assert.Equal(t, "B", result.Right.QueryRef.Name)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result, err := parseTraceExpression(tt.expression)
if tt.expectError {
assert.Error(t, err)
assert.Nil(t, result)
return
}
require.NoError(t, err)
require.NotNil(t, result)
if tt.checkResult != nil {
tt.checkResult(t, result)
}
})
}
}
func TestQueryBuilderTraceOperator_ValidateTraceOperator(t *testing.T) {
tests := []struct {
name string
traceOperator QueryBuilderTraceOperator
queries []QueryEnvelope
expectError bool
errorContains string
}{
{
name: "valid trace operator with trace queries",
traceOperator: QueryBuilderTraceOperator{
Name: "test_operator",
Expression: "A => B",
Filter: &Filter{
Expression: "trace_duration > 200ms",
},
Order: []OrderBy{{
Key: OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: OrderByTraceDuration.StringValue(),
FieldContext: telemetrytypes.FieldContextSpan,
},
},
Direction: OrderDirectionDesc,
}},
Limit: 100,
},
queries: []QueryEnvelope{
{
Type: QueryTypeBuilder,
Spec: QueryBuilderQuery[TraceAggregation]{
Name: "A",
Signal: telemetrytypes.SignalTraces,
},
},
{
Type: QueryTypeBuilder,
Spec: QueryBuilderQuery[TraceAggregation]{
Name: "B",
Signal: telemetrytypes.SignalTraces,
},
},
},
expectError: false,
},
{
name: "empty expression",
traceOperator: QueryBuilderTraceOperator{
Name: "test_operator",
Expression: "",
},
queries: []QueryEnvelope{},
expectError: true,
errorContains: "expression cannot be empty",
},
{
name: "referenced query does not exist",
traceOperator: QueryBuilderTraceOperator{
Name: "test_operator",
Expression: "A => B",
},
queries: []QueryEnvelope{
{
Type: QueryTypeBuilder,
Spec: QueryBuilderQuery[TraceAggregation]{
Name: "A",
Signal: telemetrytypes.SignalTraces,
},
},
},
expectError: true,
errorContains: "query 'B' referenced in trace operator expression does not exist or is not a trace query",
},
{
name: "referenced query is not trace signal",
traceOperator: QueryBuilderTraceOperator{
Name: "test_operator",
Expression: "A => B",
},
queries: []QueryEnvelope{
{
Type: QueryTypeBuilder,
Spec: QueryBuilderQuery[TraceAggregation]{
Name: "A",
Signal: telemetrytypes.SignalTraces,
},
},
{
Type: QueryTypeBuilder,
Spec: QueryBuilderQuery[LogAggregation]{
Name: "B",
Signal: telemetrytypes.SignalLogs,
},
},
},
expectError: true,
errorContains: "query 'B' referenced in trace operator expression does not exist or is not a trace query",
},
{
name: "invalid orderBy field",
traceOperator: QueryBuilderTraceOperator{
Name: "test_operator",
Expression: "A",
Order: []OrderBy{{
Key: OrderByKey{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "invalid_string"}},
Direction: OrderDirectionDesc,
}},
},
queries: []QueryEnvelope{{
Type: QueryTypeBuilder,
Spec: QueryBuilderQuery[TraceAggregation]{Name: "A", Signal: telemetrytypes.SignalTraces},
}},
expectError: true,
errorContains: "orderBy[0] field must be either 'span_count' or 'trace_duration'",
},
{
name: "invalid pagination limit",
traceOperator: QueryBuilderTraceOperator{
Name: "test_operator",
Expression: "A",
Limit: -1,
},
queries: []QueryEnvelope{
{
Type: QueryTypeBuilder,
Spec: QueryBuilderQuery[TraceAggregation]{
Name: "A",
Signal: telemetrytypes.SignalTraces,
},
},
},
expectError: true,
errorContains: "limit must be non-negative",
},
{
name: "limit exceeds maximum",
traceOperator: QueryBuilderTraceOperator{
Name: "test_operator",
Expression: "A",
Limit: 15000,
},
queries: []QueryEnvelope{
{
Type: QueryTypeBuilder,
Spec: QueryBuilderQuery[TraceAggregation]{
Name: "A",
Signal: telemetrytypes.SignalTraces,
},
},
},
expectError: true,
errorContains: "limit cannot exceed 10000",
},
{
name: "valid returnSpansFrom",
traceOperator: QueryBuilderTraceOperator{
Name: "test_operator",
Expression: "A => B",
ReturnSpansFrom: "A",
},
queries: []QueryEnvelope{
{
Type: QueryTypeBuilder,
Spec: QueryBuilderQuery[TraceAggregation]{
Name: "A",
Signal: telemetrytypes.SignalTraces,
},
},
{
Type: QueryTypeBuilder,
Spec: QueryBuilderQuery[TraceAggregation]{
Name: "B",
Signal: telemetrytypes.SignalTraces,
},
},
},
expectError: false,
},
{
name: "returnSpansFrom references non-existent query",
traceOperator: QueryBuilderTraceOperator{
Name: "test_operator",
Expression: "A => B",
ReturnSpansFrom: "C",
},
queries: []QueryEnvelope{
{
Type: QueryTypeBuilder,
Spec: QueryBuilderQuery[TraceAggregation]{
Name: "A",
Signal: telemetrytypes.SignalTraces,
},
},
{
Type: QueryTypeBuilder,
Spec: QueryBuilderQuery[TraceAggregation]{
Name: "B",
Signal: telemetrytypes.SignalTraces,
},
},
},
expectError: true,
errorContains: "returnSpansFrom references query 'C' which does not exist or is not a trace query",
},
{
name: "returnSpansFrom references query not in expression",
traceOperator: QueryBuilderTraceOperator{
Name: "test_operator",
Expression: "A => B",
ReturnSpansFrom: "C",
},
queries: []QueryEnvelope{
{
Type: QueryTypeBuilder,
Spec: QueryBuilderQuery[TraceAggregation]{
Name: "A",
Signal: telemetrytypes.SignalTraces,
},
},
{
Type: QueryTypeBuilder,
Spec: QueryBuilderQuery[TraceAggregation]{
Name: "B",
Signal: telemetrytypes.SignalTraces,
},
},
{
Type: QueryTypeBuilder,
Spec: QueryBuilderQuery[TraceAggregation]{
Name: "C",
Signal: telemetrytypes.SignalTraces,
},
},
},
expectError: true,
errorContains: "returnSpansFrom references query 'C' which is not used in the expression",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.traceOperator.ValidateTraceOperator(tt.queries)
if tt.expectError {
assert.Error(t, err)
if tt.errorContains != "" {
assert.Contains(t, err.Error(), tt.errorContains)
}
} else {
assert.NoError(t, err)
}
})
}
}
func TestValidateUniqueTraceOperator(t *testing.T) {
tests := []struct {
name string
queries []QueryEnvelope
expectError bool
errorContains string
}{
{
name: "no trace operators",
queries: []QueryEnvelope{
{Type: QueryTypeBuilder},
{Type: QueryTypeFormula},
},
expectError: false,
},
{
name: "single trace operator",
queries: []QueryEnvelope{
{Type: QueryTypeBuilder},
{
Type: QueryTypeTraceOperator,
Spec: QueryBuilderTraceOperator{
Name: "T1",
},
},
{Type: QueryTypeFormula},
},
expectError: false,
},
{
name: "multiple trace operators",
queries: []QueryEnvelope{
{Type: QueryTypeBuilder},
{
Type: QueryTypeTraceOperator,
Spec: QueryBuilderTraceOperator{
Name: "T1",
},
},
{
Type: QueryTypeTraceOperator,
Spec: QueryBuilderTraceOperator{
Name: "T2",
},
},
{Type: QueryTypeFormula},
},
expectError: true,
errorContains: "only one trace operator is allowed per request, found 2 trace operators: [T1 T2]",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := ValidateUniqueTraceOperator(tt.queries)
if tt.expectError {
assert.Error(t, err)
if tt.errorContains != "" {
assert.Contains(t, err.Error(), tt.errorContains)
}
} else {
assert.NoError(t, err)
}
})
}
}

View File

@@ -0,0 +1,438 @@
package querybuildertypesv5
import (
"regexp"
"strings"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
type TraceOperatorType struct{ valuer.String }
var (
TraceOperatorDirectDescendant = TraceOperatorType{valuer.NewString("=>")}
TraceOperatorIndirectDescendant = TraceOperatorType{valuer.NewString("->")}
TraceOperatorAnd = TraceOperatorType{valuer.NewString("&&")}
TraceOperatorOr = TraceOperatorType{valuer.NewString("||")}
TraceOperatorNot = TraceOperatorType{valuer.NewString("NOT")}
TraceOperatorExclude = TraceOperatorType{valuer.NewString("NOT")}
)
type TraceOrderBy struct {
valuer.String
}
var (
OrderBySpanCount = TraceOrderBy{valuer.NewString("span_count")}
OrderByTraceDuration = TraceOrderBy{valuer.NewString("trace_duration")}
)
type QueryBuilderTraceOperator struct {
Name string `json:"name"`
Disabled bool `json:"disabled,omitempty"`
Expression string `json:"expression"`
Filter *Filter `json:"filter,omitempty"`
// User-configurable span return strategy - which query's spans to return
ReturnSpansFrom string `json:"returnSpansFrom,omitempty"`
// Trace-specific ordering (only span_count and trace_duration allowed)
Order []OrderBy `json:"orderBy,omitempty"`
Aggregations []TraceAggregation `json:"aggregations,omitempty"`
StepInterval Step `json:"stepInterval,omitempty"`
GroupBy []GroupByKey `json:"groupBy,omitempty"`
Limit int `json:"limit,omitempty"`
Cursor string `json:"cursor,omitempty"`
// Other post-processing options
SelectFields []telemetrytypes.TelemetryFieldKey `json:"selectFields,omitempty"`
Functions []Function `json:"functions,omitempty"`
// Internal parsed representation (not exposed in JSON)
ParsedExpression *TraceOperand `json:"-"`
}
// TraceOperand represents the internal parsed tree structure
type TraceOperand struct {
// For leaf nodes - reference to a query
QueryRef *TraceOperatorQueryRef `json:"-"`
// For nested operations
Operator *TraceOperatorType `json:"-"`
Left *TraceOperand `json:"-"`
Right *TraceOperand `json:"-"`
}
// TraceOperatorQueryRef represents a reference to another query
type TraceOperatorQueryRef struct {
Name string `json:"name"`
}
// ParseExpression parses the expression string into a tree structure
func (q *QueryBuilderTraceOperator) ParseExpression() error {
if q.Expression == "" {
return errors.WrapInvalidInputf(
nil,
errors.CodeInvalidInput,
"expression cannot be empty",
)
}
parsed, err := parseTraceExpression(q.Expression)
if err != nil {
return errors.WrapInvalidInputf(
err,
errors.CodeInvalidInput,
"failed to parse expression '%s'",
q.Expression,
)
}
q.ParsedExpression = parsed
return nil
}
// ValidateTraceOperator validates that all referenced queries exist and are trace queries
func (q *QueryBuilderTraceOperator) ValidateTraceOperator(queries []QueryEnvelope) error {
// Parse the expression
if err := q.ParseExpression(); err != nil {
return err
}
// Validate orderBy field if present
if err := q.ValidateOrderBy(); err != nil {
return err
}
// Validate pagination parameters
if err := q.ValidatePagination(); err != nil {
return err
}
// Create a map of query names to track if they exist and their signal type
availableQueries := make(map[string]telemetrytypes.Signal)
// Only collect trace queries
for _, query := range queries {
if query.Type == QueryTypeBuilder {
switch spec := query.Spec.(type) {
case QueryBuilderQuery[TraceAggregation]:
if spec.Signal == telemetrytypes.SignalTraces {
availableQueries[spec.Name] = spec.Signal
}
}
}
}
// Get all query names referenced in the expression
referencedQueries := q.collectReferencedQueries(q.ParsedExpression)
// Validate that all referenced queries exist and are trace queries
for _, queryName := range referencedQueries {
signal, exists := availableQueries[queryName]
if !exists {
return errors.WrapInvalidInputf(
nil,
errors.CodeInvalidInput,
"query '%s' referenced in trace operator expression does not exist or is not a trace query",
queryName,
)
}
// This check is redundant since we only add trace queries to availableQueries, but keeping for clarity
if signal != telemetrytypes.SignalTraces {
return errors.WrapInvalidInputf(
nil,
errors.CodeInvalidInput,
"query '%s' must be a trace query, but found signal '%s'",
queryName,
signal,
)
}
}
// Validate ReturnSpansFrom if specified
if q.ReturnSpansFrom != "" {
if _, exists := availableQueries[q.ReturnSpansFrom]; !exists {
return errors.WrapInvalidInputf(
nil,
errors.CodeInvalidInput,
"returnSpansFrom references query '%s' which does not exist or is not a trace query",
q.ReturnSpansFrom,
)
}
// Ensure the query is referenced in the expression
found := false
for _, queryName := range referencedQueries {
if queryName == q.ReturnSpansFrom {
found = true
break
}
}
if !found {
return errors.WrapInvalidInputf(
nil,
errors.CodeInvalidInput,
"returnSpansFrom references query '%s' which is not used in the expression '%s'",
q.ReturnSpansFrom,
q.Expression,
)
}
}
return nil
}
// ValidateOrderBy validates the orderBy field
func (q *QueryBuilderTraceOperator) ValidateOrderBy() error {
if len(q.Order) == 0 {
return nil
}
for i, orderBy := range q.Order {
// Validate field is one of the allowed values
fieldName := orderBy.Key.Name
if fieldName != OrderBySpanCount.StringValue() && fieldName != OrderByTraceDuration.StringValue() {
return errors.WrapInvalidInputf(
nil,
errors.CodeInvalidInput,
"orderBy[%d] field must be either '%s' or '%s', got '%s'",
i, OrderBySpanCount.StringValue(), OrderByTraceDuration.StringValue(), fieldName,
)
}
// Validate direction
if orderBy.Direction != OrderDirectionAsc && orderBy.Direction != OrderDirectionDesc {
return errors.WrapInvalidInputf(
nil,
errors.CodeInvalidInput,
"orderBy[%d] direction must be either 'asc' or 'desc', got '%s'",
i, orderBy.Direction,
)
}
}
return nil
}
// ValidatePagination validates pagination parameters (AIP-158 compliance)
func (q *QueryBuilderTraceOperator) ValidatePagination() error {
if q.Limit < 0 {
return errors.WrapInvalidInputf(
nil,
errors.CodeInvalidInput,
"limit must be non-negative, got %d",
q.Limit,
)
}
// For production use, you might want to enforce maximum limits
if q.Limit > 10000 {
return errors.WrapInvalidInputf(
nil,
errors.CodeInvalidInput,
"limit cannot exceed 10000, got %d",
q.Limit,
)
}
return nil
}
// collectReferencedQueries collects all query names referenced in the expression tree
func (q *QueryBuilderTraceOperator) collectReferencedQueries(operand *TraceOperand) []string {
if operand == nil {
return nil
}
var queries []string
if operand.QueryRef != nil {
queries = append(queries, operand.QueryRef.Name)
}
// Recursively collect from children
queries = append(queries, q.collectReferencedQueries(operand.Left)...)
queries = append(queries, q.collectReferencedQueries(operand.Right)...)
// Remove duplicates
seen := make(map[string]bool)
unique := []string{}
for _, q := range queries {
if !seen[q] {
seen[q] = true
unique = append(unique, q)
}
}
return unique
}
// ValidateUniqueTraceOperator ensures only one trace operator exists in queries
func ValidateUniqueTraceOperator(queries []QueryEnvelope) error {
traceOperatorCount := 0
var traceOperatorNames []string
for _, query := range queries {
if query.Type == QueryTypeTraceOperator {
// Extract the name from the trace operator spec
if spec, ok := query.Spec.(QueryBuilderTraceOperator); ok {
traceOperatorCount++
traceOperatorNames = append(traceOperatorNames, spec.Name)
}
}
}
if traceOperatorCount > 1 {
return errors.WrapInvalidInputf(
nil,
errors.CodeInvalidInput,
"only one trace operator is allowed per request, found %d trace operators: %v",
traceOperatorCount,
traceOperatorNames,
)
}
return nil
}
// parseTraceExpression parses an expression string into a tree structure
// Handles precedence: NOT (highest) > || > && > => (lowest)
func parseTraceExpression(expr string) (*TraceOperand, error) {
expr = strings.TrimSpace(expr)
// Handle parentheses
if strings.HasPrefix(expr, "(") && strings.HasSuffix(expr, ")") {
// Check if parentheses are balanced
if isBalancedParentheses(expr[1 : len(expr)-1]) {
return parseTraceExpression(expr[1 : len(expr)-1])
}
}
// Handle unary NOT operator (prefix)
if strings.HasPrefix(expr, "NOT ") {
operand, err := parseTraceExpression(expr[4:])
if err != nil {
return nil, err
}
notOp := TraceOperatorNot
return &TraceOperand{
Operator: &notOp,
Left: operand,
}, nil
}
// Find binary operators with lowest precedence first (=> has lowest precedence)
// Order: => (lowest) < && < || < NOT (highest)
operators := []string{"=>", "&&", "||", " NOT "}
for _, op := range operators {
if pos := findOperatorPosition(expr, op); pos != -1 {
leftExpr := strings.TrimSpace(expr[:pos])
rightExpr := strings.TrimSpace(expr[pos+len(op):])
left, err := parseTraceExpression(leftExpr)
if err != nil {
return nil, err
}
right, err := parseTraceExpression(rightExpr)
if err != nil {
return nil, err
}
var opType TraceOperatorType
switch strings.TrimSpace(op) {
case "=>":
opType = TraceOperatorDirectDescendant
case "&&":
opType = TraceOperatorAnd
case "||":
opType = TraceOperatorOr
case "NOT":
opType = TraceOperatorExclude // Binary NOT (A NOT B)
}
return &TraceOperand{
Operator: &opType,
Left: left,
Right: right,
}, nil
}
}
// If no operators found, this should be a query reference
if matched, _ := regexp.MatchString(`^[A-Za-z][A-Za-z0-9_]*$`, expr); !matched {
return nil, errors.WrapInvalidInputf(
nil,
errors.CodeInvalidInput,
"invalid query reference '%s'",
expr,
)
}
return &TraceOperand{
QueryRef: &TraceOperatorQueryRef{Name: expr},
}, nil
}
// isBalancedParentheses checks if parentheses are balanced in the expression
func isBalancedParentheses(expr string) bool {
depth := 0
for _, char := range expr {
if char == '(' {
depth++
} else if char == ')' {
depth--
if depth < 0 {
return false
}
}
}
return depth == 0
}
// findOperatorPosition finds the position of an operator, respecting parentheses
func findOperatorPosition(expr, op string) int {
depth := 0
opLen := len(op)
// Scan from right to left to find the rightmost operator at depth 0
for i := len(expr) - 1; i >= 0; i-- {
char := expr[i]
// Update depth based on parentheses (scanning right to left)
if char == ')' {
depth++
} else if char == '(' {
depth--
}
// Only check for operators when we're at depth 0 (outside parentheses)
// and make sure we have enough characters for the operator
if depth == 0 && i+opLen <= len(expr) {
// Check if the substring matches our operator
if expr[i:i+opLen] == op {
// For " NOT " (binary), ensure proper spacing
if op == " NOT " {
// Make sure it's properly space-padded
if i > 0 && i+opLen < len(expr) {
return i
}
} else {
// For other operators (=>, &&, ||), return immediately
return i
}
}
}
}
return -1
}

View File

@@ -49,10 +49,10 @@ type PostableFunnel struct {
UserID string `json:"user_id,omitempty"`
// Analytics specific fields
StartTime int64 `json:"start_time,omitempty"`
EndTime int64 `json:"end_time,omitempty"`
StepStart int64 `json:"step_start,omitempty"`
StepEnd int64 `json:"step_end,omitempty"`
StartTime int64 `json:"start_time,omitempty"`
EndTime int64 `json:"end_time,omitempty"`
StepAOrder int64 `json:"step_a_order,omitempty"`
StepBOrder int64 `json:"step_b_order,omitempty"`
}
// GettableFunnel represents all possible funnel-related responses