Compare commits

...

19 Commits

Author SHA1 Message Date
nikhilmantri0902
c66ad5738e chore: use span metrics 2025-12-28 10:56:20 +05:30
nikhilmantri0902
634e8923c7 chore: add call rate fix 2025-12-27 13:39:02 +05:30
nikhilmantri0902
69855a1136 chore: rate->increase in top ops and entry point ops 2025-12-26 16:09:05 +05:30
nikhilmantri0902
cc1513c7e3 chore: top Ops and entry point calculation fix 2025-12-26 15:33:33 +05:30
nikhilmantri0902
ba60d9ee50 chore: removed method 2025-12-26 12:38:04 +05:30
nikhilmantri0902
261622616f chore: removed log 2025-12-26 12:36:26 +05:30
nikhilmantri0902
592f5a7e11 chore: errorFilterExpr fix 2025-12-26 12:20:51 +05:30
nikhilmantri0902
dc390c813d chore: rate fix 2025-12-26 11:58:03 +05:30
nikhilmantri0902
ba6690cddb chore: lint fix 2025-12-26 11:20:02 +05:30
nikhilmantri0902
3a45ae12d1 chore: frontend change 2025-12-25 19:03:46 +05:30
nikhilmantri0902
5aef65fc11 chore: backend code modification for top_operations and entrypoint_operations 2025-12-25 18:53:41 +05:30
nikhilmantri0902
efd2d961d0 chore: redesign 2025-12-25 17:52:14 +05:30
nikhilmantri0902
ec618a00ce chore: redesign 2025-12-25 17:51:46 +05:30
nikhilmantri0902
c896753d3a chore: latency to nano seconds 2025-12-25 17:43:10 +05:30
nikhilmantri0902
cc616602eb chore: fix 2025-12-25 11:25:03 +05:30
nikhilmantri0902
255847ac61 chore: added mapping functions to handle span metrics 2025-12-24 19:33:09 +05:30
nikhilmantri0902
a1e4461865 chore: isMetricScopeField method + buildMetricScopeCondition method 2025-12-24 14:48:20 +05:30
nikhilmantri0902
146fd9892b chore: isMetricScopeField method + buildMetricScopeCondition method 2025-12-24 14:43:59 +05:30
nikhilmantri0902
b9ecdcf210 chore: added selector for isTopLevelOperation 2025-12-24 14:26:49 +05:30
10 changed files with 945 additions and 334 deletions

View File

@@ -1,155 +0,0 @@
import { OPERATORS } from 'constants/queryBuilder';
import {
BaseAutocompleteData,
DataTypes,
} from 'types/api/queryBuilder/queryAutocompleteResponse';
import { TagFilterItem } from 'types/api/queryBuilder/queryBuilderData';
import {
DataSource,
MetricAggregateOperator,
QueryBuilderData,
} from 'types/common/queryBuilder';
import {
GraphTitle,
KeyOperationTableHeader,
MetricsType,
WidgetKeys,
} from '../constant';
import { TopOperationQueryFactoryProps } from '../Tabs/types';
import { getQueryBuilderQuerieswithFormula } from './MetricsPageQueriesFactory';
export const topOperationQueries = ({
servicename,
dotMetricsEnabled,
}: TopOperationQueryFactoryProps): QueryBuilderData => {
const latencyAutoCompleteData: BaseAutocompleteData = {
key: dotMetricsEnabled
? WidgetKeys.Signoz_latency_bucket
: WidgetKeys.Signoz_latency_bucket_norm,
dataType: DataTypes.Float64,
type: '',
};
const errorRateAutoCompleteData: BaseAutocompleteData = {
key: WidgetKeys.SignozCallsTotal,
dataType: DataTypes.Float64,
type: '',
};
const numOfCallAutoCompleteData: BaseAutocompleteData = {
key: dotMetricsEnabled
? WidgetKeys.SignozLatencyCount
: WidgetKeys.SignozLatencyCountNorm,
dataType: DataTypes.Float64,
type: '',
};
const latencyAndNumberOfCallAdditionalItems: TagFilterItem[] = [
{
id: '',
key: {
key: dotMetricsEnabled
? WidgetKeys.Service_name
: WidgetKeys.Service_name_norm,
dataType: DataTypes.String,
type: MetricsType.Resource,
},
value: [servicename],
op: OPERATORS.IN,
},
];
const errorRateAdditionalItemsA: TagFilterItem[] = [
{
id: '',
key: {
dataType: DataTypes.String,
key: dotMetricsEnabled
? WidgetKeys.Service_name
: WidgetKeys.Service_name_norm,
type: MetricsType.Resource,
},
op: OPERATORS.IN,
value: [servicename],
},
{
id: '',
key: {
dataType: DataTypes.Int64,
key: dotMetricsEnabled ? WidgetKeys.StatusCode : WidgetKeys.StatusCodeNorm,
type: MetricsType.Tag,
},
op: OPERATORS.IN,
value: ['STATUS_CODE_ERROR'],
},
];
const errorRateAdditionalItemsB = latencyAndNumberOfCallAdditionalItems;
const groupBy: BaseAutocompleteData[] = [
{
dataType: DataTypes.String,
key: WidgetKeys.Operation,
type: MetricsType.Tag,
},
];
const autocompleteData = [
latencyAutoCompleteData,
latencyAutoCompleteData,
latencyAutoCompleteData,
errorRateAutoCompleteData,
errorRateAutoCompleteData,
numOfCallAutoCompleteData,
];
const additionalItems = [
latencyAndNumberOfCallAdditionalItems,
latencyAndNumberOfCallAdditionalItems,
latencyAndNumberOfCallAdditionalItems,
errorRateAdditionalItemsA,
errorRateAdditionalItemsB,
latencyAndNumberOfCallAdditionalItems,
];
const disabled = [false, false, false, true, true, false];
const legends = [
KeyOperationTableHeader.P50,
KeyOperationTableHeader.P90,
KeyOperationTableHeader.P99,
KeyOperationTableHeader.ERROR_RATE,
KeyOperationTableHeader.ERROR_RATE,
KeyOperationTableHeader.NUM_OF_CALLS,
];
const timeAggregateOperators = [
MetricAggregateOperator.EMPTY,
MetricAggregateOperator.EMPTY,
MetricAggregateOperator.EMPTY,
MetricAggregateOperator.RATE,
MetricAggregateOperator.RATE,
MetricAggregateOperator.RATE,
];
const spaceAggregateOperators = [
MetricAggregateOperator.P50,
MetricAggregateOperator.P90,
MetricAggregateOperator.P99,
MetricAggregateOperator.SUM,
MetricAggregateOperator.SUM,
MetricAggregateOperator.SUM,
];
const expressions = ['D*100/E'];
const legendFormulas = [GraphTitle.ERROR_PERCENTAGE];
const dataSource = DataSource.METRICS;
return getQueryBuilderQuerieswithFormula({
autocompleteData,
additionalItems,
disabled,
legends,
timeAggregateOperators,
spaceAggregateOperators,
expressions,
legendFormulas,
dataSource,
groupBy,
});
};

View File

@@ -46,7 +46,6 @@ import GraphControlsPanel from './Overview/GraphControlsPanel/GraphControlsPanel
import ServiceOverview from './Overview/ServiceOverview';
import TopLevelOperation from './Overview/TopLevelOperations';
import TopOperation from './Overview/TopOperation';
import TopOperationMetrics from './Overview/TopOperationMetrics';
import { Button, Card } from './styles';
import { IServiceName } from './types';
import {
@@ -72,10 +71,6 @@ function Application(): JSX.Element {
const urlQuery = useUrlQuery();
const { featureFlags } = useAppContext();
const isSpanMetricEnabled =
featureFlags?.find((flag) => flag.name === FeatureKeys.USE_SPAN_METRICS)
?.active || false;
const handleSetTimeStamp = useCallback((selectTime: number) => {
setSelectedTimeStamp(selectTime);
}, []);
@@ -396,7 +391,7 @@ function Application(): JSX.Element {
<Col span={12}>
<Card>
{isSpanMetricEnabled ? <TopOperationMetrics /> : <TopOperation />}{' '}
<TopOperation />
</Card>
</Col>
</Row>

View File

@@ -1,130 +0,0 @@
import { ENTITY_VERSION_V4 } from 'constants/app';
import { PANEL_TYPES } from 'constants/queryBuilder';
import { topOperationMetricsDownloadOptions } from 'container/MetricsApplication/constant';
import { getWidgetQueryBuilder } from 'container/MetricsApplication/MetricsApplication.factory';
import { topOperationQueries } from 'container/MetricsApplication/MetricsPageQueries/TopOperationQueries';
import { QueryTable } from 'container/QueryTable';
import { useGetQueryRange } from 'hooks/queryBuilder/useGetQueryRange';
import { updateStepInterval } from 'hooks/queryBuilder/useStepInterval';
import { useNotifications } from 'hooks/useNotifications';
import useResourceAttribute from 'hooks/useResourceAttribute';
import { convertRawQueriesToTraceSelectedTags } from 'hooks/useResourceAttribute/utils';
import { RowData } from 'lib/query/createTableColumnsFromQuery';
import { ReactNode, useMemo } from 'react';
import { useSelector } from 'react-redux';
import { useParams } from 'react-router-dom';
import { AppState } from 'store/reducers';
import { EQueryType } from 'types/common/dashboard';
import { GlobalReducer } from 'types/reducer/globalTime';
import { v4 as uuid } from 'uuid';
import { FeatureKeys } from '../../../../constants/features';
import { useAppContext } from '../../../../providers/App/App';
import { IServiceName } from '../types';
import { title } from './config';
import ColumnWithLink from './TableRenderer/ColumnWithLink';
import { getTableColumnRenderer } from './TableRenderer/TableColumnRenderer';
function TopOperationMetrics(): JSX.Element {
const { servicename: encodedServiceName } = useParams<IServiceName>();
const servicename = decodeURIComponent(encodedServiceName);
const { notifications } = useNotifications();
const { minTime, maxTime, selectedTime: globalSelectedInterval } = useSelector<
AppState,
GlobalReducer
>((state) => state.globalTime);
const { queries } = useResourceAttribute();
const selectedTraceTags = JSON.stringify(
convertRawQueriesToTraceSelectedTags(queries) || [],
);
const { featureFlags } = useAppContext();
const dotMetricsEnabled =
featureFlags?.find((flag) => flag.name === FeatureKeys.DOT_METRICS_ENABLED)
?.active || false;
const keyOperationWidget = useMemo(
() =>
getWidgetQueryBuilder({
query: {
queryType: EQueryType.QUERY_BUILDER,
promql: [],
builder: topOperationQueries({
servicename,
dotMetricsEnabled,
}),
clickhouse_sql: [],
id: uuid(),
},
panelTypes: PANEL_TYPES.TABLE,
}),
[servicename, dotMetricsEnabled],
);
const updatedQuery = updateStepInterval(keyOperationWidget.query);
const isEmptyWidget = keyOperationWidget.id === PANEL_TYPES.EMPTY_WIDGET;
const { data, isLoading } = useGetQueryRange(
{
selectedTime: keyOperationWidget?.timePreferance,
graphType: keyOperationWidget?.panelTypes,
query: updatedQuery,
globalSelectedInterval,
variables: {},
},
ENTITY_VERSION_V4,
{
queryKey: [
`GetMetricsQueryRange-${keyOperationWidget?.timePreferance}-${globalSelectedInterval}-${keyOperationWidget?.id}`,
keyOperationWidget,
maxTime,
minTime,
globalSelectedInterval,
],
keepPreviousData: true,
enabled: !isEmptyWidget,
refetchOnMount: false,
onError: (error) => {
notifications.error({ message: error.message });
},
},
);
const queryTableData = data?.payload?.data?.newResult?.data?.result || [];
const renderColumnCell = useMemo(
() =>
getTableColumnRenderer({
columnName: 'operation',
renderFunction: (record: RowData): ReactNode => (
<ColumnWithLink
servicename={servicename}
minTime={minTime}
maxTime={maxTime}
selectedTraceTags={selectedTraceTags}
record={record}
/>
),
}),
[servicename, minTime, maxTime, selectedTraceTags],
);
return (
<QueryTable
title={title}
query={updatedQuery}
queryTableData={queryTableData}
loading={isLoading}
renderColumnCell={renderColumnCell}
downloadOption={topOperationMetricsDownloadOptions}
sticky
/>
);
}
export default TopOperationMetrics;

View File

@@ -1,6 +1,5 @@
/* eslint-disable sonarjs/no-duplicate-string */
import { DownloadOptions } from 'container/Download/Download.types';
import { MenuItemKeys } from 'container/GridCardLayout/WidgetHeader/contants';
import {
MetricAggregateOperator,
@@ -107,11 +106,6 @@ export enum WidgetKeys {
Db_system_norm = 'db_system',
}
export const topOperationMetricsDownloadOptions: DownloadOptions = {
isDownloadEnabled: true,
fileName: 'top-operation',
} as const;
export const SERVICE_CHART_ID = {
latency: 'SERVICE_OVERVIEW_LATENCY',
error: 'SERVICE_OVERVIEW_ERROR',

View File

@@ -1,22 +1,14 @@
import * as Sentry from '@sentry/react';
import { FeatureKeys } from 'constants/features';
import ErrorBoundaryFallback from 'pages/ErrorBoundaryFallback/ErrorBoundaryFallback';
import { useAppContext } from 'providers/App/App';
import ServiceMetrics from './ServiceMetrics';
import ServiceTraces from './ServiceTraces';
import { Container } from './styles';
function Services(): JSX.Element {
const { featureFlags } = useAppContext();
const isSpanMetricEnabled =
featureFlags?.find((flag) => flag.name === FeatureKeys.USE_SPAN_METRICS)
?.active || false;
return (
<Sentry.ErrorBoundary fallback={<ErrorBoundaryFallback />}>
<Container style={{ marginTop: 0 }}>
{isSpanMetricEnabled ? <ServiceMetrics /> : <ServiceTraces />}
<ServiceTraces />
</Container>
</Sentry.ErrorBoundary>
);

View File

@@ -3,16 +3,19 @@ package implservices
import (
"context"
"fmt"
"time"
"math"
"sort"
"strconv"
"time"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/modules/services"
"github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/pkg/query-service/constants"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/telemetrytraces"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/servicetypes/servicetypesv1"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
@@ -74,10 +77,25 @@ func (m *module) Get(ctx context.Context, orgUUID valuer.UUID, req *servicetypes
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "request is nil")
}
// Prepare phase
queryRangeReq, startMs, endMs, err := m.buildQueryRangeRequest(req)
if err != nil {
return nil, err
var (
startMs uint64
endMs uint64
err error
queryRangeReq *qbtypes.QueryRangeRequest
)
// Prefer span metrics path when enabled via flag or explicit override
// TODO(nikhilmantri0902): the following constant should be read from an env variable in this module itself.
useSpanMetrics := constants.PreferSpanMetrics
if useSpanMetrics {
queryRangeReq, startMs, endMs, err = m.buildSpanMetricsQueryRangeRequest(req)
if err != nil {
return nil, err
}
} else {
queryRangeReq, startMs, endMs, err = m.buildQueryRangeRequest(req)
if err != nil {
return nil, err
}
}
// Fetch phase
@@ -87,7 +105,14 @@ func (m *module) Get(ctx context.Context, orgUUID valuer.UUID, req *servicetypes
}
// Process phase
items, serviceNames := m.mapQueryRangeRespToServices(resp, startMs, endMs)
var items []*servicetypesv1.ResponseItem
var serviceNames []string
if useSpanMetrics {
items, serviceNames = m.mapSpanMetricsRespToServices(resp, startMs, endMs)
} else {
items, serviceNames = m.mapQueryRangeRespToServices(resp, startMs, endMs)
}
if len(items) == 0 {
return []*servicetypesv1.ResponseItem{}, nil
}
@@ -108,9 +133,22 @@ func (m *module) GetTopOperations(ctx context.Context, orgUUID valuer.UUID, req
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "request is nil")
}
qr, err := m.buildTopOpsQueryRangeRequest(req)
if err != nil {
return nil, err
var (
qr *qbtypes.QueryRangeRequest
err error
)
// Prefer span metrics path when enabled via flag
useSpanMetrics := constants.PreferSpanMetrics
if useSpanMetrics {
qr, err = m.buildSpanMetricsTopOpsQueryRangeRequest(req)
if err != nil {
return nil, err
}
} else {
qr, err = m.buildTopOpsQueryRangeRequest(req)
if err != nil {
return nil, err
}
}
resp, err := m.executeQuery(ctx, orgUUID, qr)
@@ -118,7 +156,17 @@ func (m *module) GetTopOperations(ctx context.Context, orgUUID valuer.UUID, req
return nil, err
}
items := m.mapTopOpsQueryRangeResp(resp)
var items []servicetypesv1.OperationItem
if useSpanMetrics {
items = m.mapSpanMetricsTopOpsResp(resp)
// Apply limit after merging multiple queries
if req.Limit > 0 && len(items) > req.Limit {
items = items[:req.Limit]
}
} else {
items = m.mapTopOpsQueryRangeResp(resp)
}
return items, nil
}
@@ -128,9 +176,22 @@ func (m *module) GetEntryPointOperations(ctx context.Context, orgUUID valuer.UUI
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "request is nil")
}
qr, err := m.buildEntryPointOpsQueryRangeRequest(req)
if err != nil {
return nil, err
var (
qr *qbtypes.QueryRangeRequest
err error
)
// Prefer span metrics path when enabled via flag
useSpanMetrics := constants.PreferSpanMetrics
if useSpanMetrics {
qr, err = m.buildSpanMetricsEntryPointOpsQueryRangeRequest(req)
if err != nil {
return nil, err
}
} else {
qr, err = m.buildEntryPointOpsQueryRangeRequest(req)
if err != nil {
return nil, err
}
}
resp, err := m.executeQuery(ctx, orgUUID, qr)
@@ -138,7 +199,17 @@ func (m *module) GetEntryPointOperations(ctx context.Context, orgUUID valuer.UUI
return nil, err
}
items := m.mapEntryPointOpsQueryRangeResp(resp)
var items []servicetypesv1.OperationItem
if useSpanMetrics {
items = m.mapSpanMetricsEntryPointOpsResp(resp)
// Apply limit after merging multiple queries
if req.Limit > 0 && len(items) > req.Limit {
items = items[:req.Limit]
}
} else {
items = m.mapEntryPointOpsQueryRangeResp(resp)
}
return items, nil
}
@@ -211,6 +282,162 @@ func (m *module) buildQueryRangeRequest(req *servicetypesv1.Request) (*qbtypes.Q
return &reqV5, startMs, endMs, nil
}
// buildSpanMetricsQueryRangeRequest constructs span-metrics queries for services.
func (m *module) buildSpanMetricsQueryRangeRequest(req *servicetypesv1.Request) (*qbtypes.QueryRangeRequest, uint64, uint64, error) {
// base filters from request
// Parse start/end (nanoseconds) from strings and convert to milliseconds for QBv5
startNs, err := strconv.ParseUint(req.Start, 10, 64)
if err != nil {
return nil, 0, 0, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid start time: %v", err)
}
endNs, err := strconv.ParseUint(req.End, 10, 64)
if err != nil {
return nil, 0, 0, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid end time: %v", err)
}
if startNs >= endNs {
return nil, 0, 0, errors.NewInvalidInputf(errors.CodeInvalidInput, "start must be before end")
}
if err := validateTagFilterItems(req.Tags); err != nil {
return nil, 0, 0, err
}
startMs := startNs / 1_000_000
endMs := endNs / 1_000_000
filterExpr, variables := buildFilterExpression(req.Tags)
// enforce top-level scope via synthetic field
scopeExpr := "isTopLevelOperation = 'true'"
if filterExpr != "" {
filterExpr = "(" + filterExpr + ") AND (" + scopeExpr + ")"
} else {
filterExpr = scopeExpr
}
// Build error filter for num_errors query
var errorFilterExpr string
if filterExpr != "" {
errorFilterExpr = "(" + filterExpr + ") AND (status.code = 'STATUS_CODE_ERROR')"
} else {
errorFilterExpr = "status.code = 'STATUS_CODE_ERROR'"
}
// common groupBy on service.name
groupByService := []qbtypes.GroupByKey{
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
FieldContext: telemetrytypes.FieldContextAttribute, // aligns with working payload
FieldDataType: telemetrytypes.FieldDataTypeString,
Materialized: true,
}},
}
queries := []qbtypes.QueryEnvelope{
{Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "p99_latency",
Signal: telemetrytypes.SignalMetrics,
Filter: &qbtypes.Filter{Expression: filterExpr},
GroupBy: groupByService,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "signoz_latency.bucket",
Temporality: metrictypes.Delta,
TimeAggregation: metrictypes.TimeAggregationRate,
SpaceAggregation: metrictypes.SpaceAggregationPercentile99,
ReduceTo: qbtypes.ReduceToAvg,
},
},
},
},
{Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "avg_latency",
Signal: telemetrytypes.SignalMetrics,
Filter: &qbtypes.Filter{Expression: filterExpr},
GroupBy: groupByService,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "signoz_latency.sum",
Temporality: metrictypes.Delta,
TimeAggregation: metrictypes.TimeAggregationAvg,
SpaceAggregation: metrictypes.SpaceAggregationAvg,
ReduceTo: qbtypes.ReduceToAvg,
},
},
},
},
{Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "num_calls",
Signal: telemetrytypes.SignalMetrics,
Filter: &qbtypes.Filter{Expression: filterExpr},
GroupBy: groupByService,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "signoz_calls_total",
Temporality: metrictypes.Delta,
TimeAggregation: metrictypes.TimeAggregationRate,
SpaceAggregation: metrictypes.SpaceAggregationSum,
ReduceTo: qbtypes.ReduceToAvg,
},
},
},
},
{Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "num_errors",
Signal: telemetrytypes.SignalMetrics,
Filter: &qbtypes.Filter{Expression: errorFilterExpr},
GroupBy: groupByService,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "signoz_calls_total",
Temporality: metrictypes.Delta,
TimeAggregation: metrictypes.TimeAggregationRate,
SpaceAggregation: metrictypes.SpaceAggregationSum,
ReduceTo: qbtypes.ReduceToAvg,
},
},
},
},
{Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "num_4xx",
Signal: telemetrytypes.SignalMetrics,
// TODO: fix this; the filter below should target 4xx HTTP status codes instead of reusing the error filter
Filter: &qbtypes.Filter{Expression: errorFilterExpr},
GroupBy: groupByService,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "signoz_calls_total",
Temporality: metrictypes.Delta,
TimeAggregation: metrictypes.TimeAggregationRate,
SpaceAggregation: metrictypes.SpaceAggregationSum,
ReduceTo: qbtypes.ReduceToAvg,
},
},
},
},
}
reqV5 := qbtypes.QueryRangeRequest{
Start: startMs,
End: endMs,
RequestType: qbtypes.RequestTypeScalar,
Variables: variables,
CompositeQuery: qbtypes.CompositeQuery{
Queries: queries,
},
FormatOptions: &qbtypes.FormatOptions{
FormatTableResultForUI: true,
FillGaps: false,
},
}
return &reqV5, startMs, endMs, nil
}
// executeQuery calls the underlying Querier with the provided request.
func (m *module) executeQuery(ctx context.Context, orgUUID valuer.UUID, qr *qbtypes.QueryRangeRequest) (*qbtypes.QueryRangeResponse, error) {
return m.Querier.QueryRange(ctx, orgUUID, qr)
@@ -227,6 +454,7 @@ func (m *module) mapQueryRangeRespToServices(resp *qbtypes.QueryRangeResponse, s
return []*servicetypesv1.ResponseItem{}, []string{}
}
// traces path (original behavior)
// this stores the index at which the service name appears in the response
serviceNameRespIndex := -1
aggIndexMappings := map[int]int{}
@@ -285,6 +513,97 @@ func (m *module) mapQueryRangeRespToServices(resp *qbtypes.QueryRangeResponse, s
return out, serviceNames
}
// TODO(nikhilmantri0902): add test cases for the functions in this PR
// mapSpanMetricsRespToServices merges span-metrics scalar results keyed by service.name using queryName for aggregation mapping.
func (m *module) mapSpanMetricsRespToServices(resp *qbtypes.QueryRangeResponse, startMs, endMs uint64) ([]*servicetypesv1.ResponseItem, []string) {
// TODO(nikhilmantri0902): in case of a nil response, should we return nil directly from here for both values?
if resp == nil || len(resp.Data.Results) == 0 {
return []*servicetypesv1.ResponseItem{}, []string{}
}
sd, ok := resp.Data.Results[0].(*qbtypes.ScalarData)
if !ok || sd == nil {
return []*servicetypesv1.ResponseItem{}, []string{}
}
// locate service.name column and aggregation columns by queryName
serviceNameRespIndex := -1
aggCols := make(map[string]int)
for i, c := range sd.Columns {
switch c.Type {
case qbtypes.ColumnTypeGroup:
if c.Name == "service.name" {
serviceNameRespIndex = i
}
case qbtypes.ColumnTypeAggregation:
if c.QueryName != "" {
aggCols[c.QueryName] = i
}
}
}
if serviceNameRespIndex == -1 {
return []*servicetypesv1.ResponseItem{}, []string{}
}
type agg struct {
p99 float64
avg float64
callRate float64
errorRate float64
fourxxRate float64
}
perSvc := make(map[string]*agg)
for _, row := range sd.Data {
svcName := fmt.Sprintf("%v", row[serviceNameRespIndex])
a := perSvc[svcName]
if a == nil {
a = &agg{}
perSvc[svcName] = a
}
for qn, idx := range aggCols {
val := toFloat(row, idx)
switch qn {
case "p99_latency":
a.p99 = val * math.Pow(10, 6)
case "avg_latency":
a.avg = val * math.Pow(10, 6)
case "num_calls":
a.callRate = val
case "num_errors":
a.errorRate = val
case "num_4xx":
a.fourxxRate = val
}
}
}
out := make([]*servicetypesv1.ResponseItem, 0, len(perSvc))
serviceNames := make([]string, 0, len(perSvc))
for svcName, a := range perSvc {
// a.callRate is already a rate (calls/second) from TimeAggregationRate, so there is no need to divide by the period in seconds
errorRate := 0.0
if a.callRate > 0 {
errorRate = a.errorRate * 100 / a.callRate
}
fourXXRate := 0.0
if a.callRate > 0 {
fourXXRate = a.fourxxRate * 100 / a.callRate
}
out = append(out, &servicetypesv1.ResponseItem{
ServiceName: svcName,
Percentile99: a.p99,
AvgDuration: a.avg,
CallRate: a.callRate,
ErrorRate: errorRate,
FourXXRate: fourXXRate,
DataWarning: servicetypesv1.DataWarning{TopLevelOps: []string{}},
})
serviceNames = append(serviceNames, svcName)
}
return out, serviceNames
}
// attachTopLevelOps fetches top-level ops from TelemetryStore and attaches them to items.
func (m *module) attachTopLevelOps(ctx context.Context, serviceNames []string, startMs uint64, items []*servicetypesv1.ResponseItem) error {
startTime := time.UnixMilli(int64(startMs)).UTC()
@@ -404,6 +723,546 @@ func (m *module) mapTopOpsQueryRangeResp(resp *qbtypes.QueryRangeResponse) []ser
return out
}
// buildSpanMetricsTopOpsQueryRangeRequest constructs span-metrics queries for top operations.
func (m *module) buildSpanMetricsTopOpsQueryRangeRequest(req *servicetypesv1.OperationsRequest) (*qbtypes.QueryRangeRequest, error) {
if req.Service == "" {
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "service is required")
}
startNs, err := strconv.ParseUint(req.Start, 10, 64)
if err != nil {
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid start time: %v", err)
}
endNs, err := strconv.ParseUint(req.End, 10, 64)
if err != nil {
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid end time: %v", err)
}
if startNs >= endNs {
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "start must be before end")
}
if req.Limit < 1 || req.Limit > 5000 {
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "limit must be between 1 and 5000")
}
if err := validateTagFilterItems(req.Tags); err != nil {
return nil, err
}
startMs := startNs / 1_000_000
endMs := endNs / 1_000_000
// Build service filter
serviceTag := servicetypesv1.TagFilterItem{
Key: "service.name",
Operator: "in",
StringValues: []string{req.Service},
}
tags := append([]servicetypesv1.TagFilterItem{serviceTag}, req.Tags...)
filterExpr, variables := buildFilterExpression(tags)
// Build error filter for num_errors query
var errorFilterExpr string
if filterExpr != "" {
errorFilterExpr = "(" + filterExpr + ") AND (status.code = 'STATUS_CODE_ERROR')"
} else {
errorFilterExpr = "status.code = 'STATUS_CODE_ERROR'"
}
// Common groupBy on operation
groupByOperation := []qbtypes.GroupByKey{
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "operation",
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeString,
Materialized: true,
}},
}
queries := []qbtypes.QueryEnvelope{
{Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "p50_latency",
Signal: telemetrytypes.SignalMetrics,
Filter: &qbtypes.Filter{Expression: filterExpr},
GroupBy: groupByOperation,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "signoz_latency.bucket",
Temporality: metrictypes.Delta,
TimeAggregation: metrictypes.TimeAggregationRate,
SpaceAggregation: metrictypes.SpaceAggregationPercentile50,
ReduceTo: qbtypes.ReduceToAvg,
},
},
},
},
{Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "p95_latency",
Signal: telemetrytypes.SignalMetrics,
Filter: &qbtypes.Filter{Expression: filterExpr},
GroupBy: groupByOperation,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "signoz_latency.bucket",
Temporality: metrictypes.Delta,
TimeAggregation: metrictypes.TimeAggregationRate,
SpaceAggregation: metrictypes.SpaceAggregationPercentile95,
ReduceTo: qbtypes.ReduceToAvg,
},
},
},
},
{Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "p99_latency",
Signal: telemetrytypes.SignalMetrics,
Filter: &qbtypes.Filter{Expression: filterExpr},
GroupBy: groupByOperation,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "signoz_latency.bucket",
Temporality: metrictypes.Delta,
TimeAggregation: metrictypes.TimeAggregationRate,
SpaceAggregation: metrictypes.SpaceAggregationPercentile99,
ReduceTo: qbtypes.ReduceToAvg,
},
},
},
},
{Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "num_calls",
Signal: telemetrytypes.SignalMetrics,
Filter: &qbtypes.Filter{Expression: filterExpr},
GroupBy: groupByOperation,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "signoz_calls_total",
Temporality: metrictypes.Delta,
TimeAggregation: metrictypes.TimeAggregationIncrease,
SpaceAggregation: metrictypes.SpaceAggregationSum,
ReduceTo: qbtypes.ReduceToAvg,
},
},
},
},
{Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "num_errors",
Signal: telemetrytypes.SignalMetrics,
Filter: &qbtypes.Filter{Expression: errorFilterExpr},
GroupBy: groupByOperation,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "signoz_calls_total",
Temporality: metrictypes.Delta,
TimeAggregation: metrictypes.TimeAggregationIncrease,
SpaceAggregation: metrictypes.SpaceAggregationSum,
ReduceTo: qbtypes.ReduceToAvg,
},
},
},
},
}
reqV5 := qbtypes.QueryRangeRequest{
Start: startMs,
End: endMs,
RequestType: qbtypes.RequestTypeScalar,
Variables: variables,
CompositeQuery: qbtypes.CompositeQuery{
Queries: queries,
},
}
return &reqV5, nil
}
// mapSpanMetricsTopOpsResp maps span-metrics scalar results to OperationItem array using queryName for aggregation mapping.
func (m *module) mapSpanMetricsTopOpsResp(resp *qbtypes.QueryRangeResponse) []servicetypesv1.OperationItem {
if resp == nil || len(resp.Data.Results) == 0 {
return []servicetypesv1.OperationItem{}
}
// Group data by operation name and merge aggregations from all results
type agg struct {
p50 float64
p95 float64
p99 float64
calls uint64
errors uint64
}
perOp := make(map[string]*agg)
// Iterate through all results (each query returns a separate ScalarData)
for _, result := range resp.Data.Results {
sd, ok := result.(*qbtypes.ScalarData)
if !ok || sd == nil {
continue
}
// Skip empty results
if len(sd.Columns) == 0 || len(sd.Data) == 0 {
continue
}
// Find operation column index (should be consistent across all results)
operationIdx := -1
for i, c := range sd.Columns {
if c.Type == qbtypes.ColumnTypeGroup && c.Name == "operation" {
operationIdx = i
break
}
}
if operationIdx == -1 {
continue
}
// Find aggregation column index
aggIdx := -1
for i, c := range sd.Columns {
if c.Type == qbtypes.ColumnTypeAggregation {
aggIdx = i
break
}
}
if aggIdx == -1 {
continue
}
// Process each row in this result and merge by operation name
queryName := sd.QueryName
for _, row := range sd.Data {
if len(row) <= operationIdx || len(row) <= aggIdx {
continue
}
opName := fmt.Sprintf("%v", row[operationIdx])
val := toFloat(row, aggIdx)
a := perOp[opName]
if a == nil {
a = &agg{}
perOp[opName] = a
}
// Map values based on queryName
switch queryName {
case "p50_latency":
a.p50 = val * math.Pow(10, 6) // convert to nanoseconds (the 1e6 factor assumes millisecond inputs)
case "p95_latency":
a.p95 = val * math.Pow(10, 6)
case "p99_latency":
a.p99 = val * math.Pow(10, 6)
case "num_calls":
a.calls = uint64(val)
case "num_errors":
a.errors = uint64(val)
}
}
}
if len(perOp) == 0 {
return []servicetypesv1.OperationItem{}
}
// Convert to OperationItem array and sort by P99 desc
out := make([]servicetypesv1.OperationItem, 0, len(perOp))
for opName, a := range perOp {
out = append(out, servicetypesv1.OperationItem{
Name: opName,
P50: a.p50,
P95: a.p95,
P99: a.p99,
NumCalls: a.calls,
ErrorCount: a.errors,
})
}
// Sort by P99 descending (matching traces behavior)
sort.Slice(out, func(i, j int) bool {
return out[i].P99 > out[j].P99
})
return out
}
// mapSpanMetricsEntryPointOpsResp maps span-metrics scalar results to OperationItem array for entry point operations.
// Uses queryName for aggregation mapping.
func (m *module) mapSpanMetricsEntryPointOpsResp(resp *qbtypes.QueryRangeResponse) []servicetypesv1.OperationItem {
if resp == nil || len(resp.Data.Results) == 0 {
return []servicetypesv1.OperationItem{}
}
// Group data by operation name and merge aggregations from all results
type agg struct {
p50 float64
p95 float64
p99 float64
calls uint64
errors uint64
}
perOp := make(map[string]*agg)
// Iterate through all results (each query returns a separate ScalarData)
for _, result := range resp.Data.Results {
sd, ok := result.(*qbtypes.ScalarData)
if !ok || sd == nil {
continue
}
// Skip empty results
if len(sd.Columns) == 0 || len(sd.Data) == 0 {
continue
}
// Find operation column index (should be consistent across all results)
operationIdx := -1
for i, c := range sd.Columns {
if c.Type == qbtypes.ColumnTypeGroup && c.Name == "operation" {
operationIdx = i
break
}
}
if operationIdx == -1 {
continue
}
// Find aggregation column index
aggIdx := -1
for i, c := range sd.Columns {
if c.Type == qbtypes.ColumnTypeAggregation {
aggIdx = i
break
}
}
if aggIdx == -1 {
continue
}
// Process each row in this result and merge by operation name
queryName := sd.QueryName
for _, row := range sd.Data {
if len(row) <= operationIdx || len(row) <= aggIdx {
continue
}
opName := fmt.Sprintf("%v", row[operationIdx])
val := toFloat(row, aggIdx)
a := perOp[opName]
if a == nil {
a = &agg{}
perOp[opName] = a
}
// Map values based on queryName
switch queryName {
case "p50_latency":
a.p50 = val * math.Pow(10, 6) // convert to nanoseconds (the 1e6 factor assumes millisecond inputs)
case "p95_latency":
a.p95 = val * math.Pow(10, 6)
case "p99_latency":
a.p99 = val * math.Pow(10, 6)
case "num_calls":
a.calls = uint64(val)
case "num_errors":
a.errors = uint64(val)
}
}
}
if len(perOp) == 0 {
return []servicetypesv1.OperationItem{}
}
// Convert to OperationItem array and sort by P99 desc
out := make([]servicetypesv1.OperationItem, 0, len(perOp))
for opName, a := range perOp {
out = append(out, servicetypesv1.OperationItem{
Name: opName,
P50: a.p50,
P95: a.p95,
P99: a.p99,
NumCalls: a.calls,
ErrorCount: a.errors,
})
}
// Sort by P99 descending (matching traces behavior)
sort.Slice(out, func(i, j int) bool {
return out[i].P99 > out[j].P99
})
return out
}
// buildSpanMetricsEntryPointOpsQueryRangeRequest constructs span-metrics queries for entry point operations.
// Similar to buildSpanMetricsTopOpsQueryRangeRequest but includes isTopLevelOperation filter.
func (m *module) buildSpanMetricsEntryPointOpsQueryRangeRequest(req *servicetypesv1.OperationsRequest) (*qbtypes.QueryRangeRequest, error) {
if req.Service == "" {
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "service is required")
}
startNs, err := strconv.ParseUint(req.Start, 10, 64)
if err != nil {
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid start time: %v", err)
}
endNs, err := strconv.ParseUint(req.End, 10, 64)
if err != nil {
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid end time: %v", err)
}
if startNs >= endNs {
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "start must be before end")
}
if req.Limit < 1 || req.Limit > 5000 {
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "limit must be between 1 and 5000")
}
if err := validateTagFilterItems(req.Tags); err != nil {
return nil, err
}
startMs := startNs / 1_000_000
endMs := endNs / 1_000_000
// Build service filter
serviceTag := servicetypesv1.TagFilterItem{
Key: "service.name",
Operator: "in",
StringValues: []string{req.Service},
}
tags := append([]servicetypesv1.TagFilterItem{serviceTag}, req.Tags...)
filterExpr, variables := buildFilterExpression(tags)
// Enforce top-level scope via synthetic field (entry point operations only)
scopeExpr := "isTopLevelOperation = 'true'"
if filterExpr != "" {
filterExpr = "(" + filterExpr + ") AND (" + scopeExpr + ")"
} else {
filterExpr = scopeExpr
}
// Build error filter for num_errors query
var errorFilterExpr string
if filterExpr != "" {
errorFilterExpr = "(" + filterExpr + ") AND (status.code = 'STATUS_CODE_ERROR')"
} else {
errorFilterExpr = "status.code = 'STATUS_CODE_ERROR'"
}
// Common groupBy on operation
groupByOperation := []qbtypes.GroupByKey{
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "operation",
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeString,
Materialized: true,
}},
}
queries := []qbtypes.QueryEnvelope{
{Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "p50_latency",
Signal: telemetrytypes.SignalMetrics,
Filter: &qbtypes.Filter{Expression: filterExpr},
GroupBy: groupByOperation,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "signoz_latency.bucket",
Temporality: metrictypes.Delta,
TimeAggregation: metrictypes.TimeAggregationRate,
SpaceAggregation: metrictypes.SpaceAggregationPercentile50,
ReduceTo: qbtypes.ReduceToAvg,
},
},
},
},
{Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "p95_latency",
Signal: telemetrytypes.SignalMetrics,
Filter: &qbtypes.Filter{Expression: filterExpr},
GroupBy: groupByOperation,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "signoz_latency.bucket",
Temporality: metrictypes.Delta,
TimeAggregation: metrictypes.TimeAggregationRate,
SpaceAggregation: metrictypes.SpaceAggregationPercentile95,
ReduceTo: qbtypes.ReduceToAvg,
},
},
},
},
{Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "p99_latency",
Signal: telemetrytypes.SignalMetrics,
Filter: &qbtypes.Filter{Expression: filterExpr},
GroupBy: groupByOperation,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "signoz_latency.bucket",
Temporality: metrictypes.Delta,
TimeAggregation: metrictypes.TimeAggregationRate,
SpaceAggregation: metrictypes.SpaceAggregationPercentile99,
ReduceTo: qbtypes.ReduceToAvg,
},
},
},
},
{Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "num_calls",
Signal: telemetrytypes.SignalMetrics,
Filter: &qbtypes.Filter{Expression: filterExpr},
GroupBy: groupByOperation,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "signoz_calls_total",
Temporality: metrictypes.Delta,
TimeAggregation: metrictypes.TimeAggregationIncrease,
SpaceAggregation: metrictypes.SpaceAggregationSum,
ReduceTo: qbtypes.ReduceToAvg,
},
},
},
},
{Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "num_errors",
Signal: telemetrytypes.SignalMetrics,
Filter: &qbtypes.Filter{Expression: errorFilterExpr},
GroupBy: groupByOperation,
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "signoz_calls_total",
Temporality: metrictypes.Delta,
TimeAggregation: metrictypes.TimeAggregationIncrease,
SpaceAggregation: metrictypes.SpaceAggregationSum,
ReduceTo: qbtypes.ReduceToAvg,
},
},
},
},
}
reqV5 := qbtypes.QueryRangeRequest{
Start: startMs,
End: endMs,
RequestType: qbtypes.RequestTypeScalar,
Variables: variables,
CompositeQuery: qbtypes.CompositeQuery{
Queries: queries,
},
}
return &reqV5, nil
}
func (m *module) buildEntryPointOpsQueryRangeRequest(req *servicetypesv1.OperationsRequest) (*qbtypes.QueryRangeRequest, error) {
if req.Service == "" {
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "service is required")

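The span-metrics branch above is gated on constants.PreferSpanMetrics; the TODO notes it should eventually be resolved from an environment variable inside this module. A minimal sketch of that idea, assuming a hypothetical SIGNOZ_PREFER_SPAN_METRICS variable (the variable name and helper are illustrative, not part of this change):

package implservices

import (
	"os"
	"strconv"
)

// preferSpanMetrics reports whether the span-metrics query path should be used.
// It reads the hypothetical SIGNOZ_PREFER_SPAN_METRICS environment variable and
// falls back to false when the variable is unset or not a parsable boolean.
func preferSpanMetrics() bool {
	raw, ok := os.LookupEnv("SIGNOZ_PREFER_SPAN_METRICS")
	if !ok {
		return false
	}
	enabled, err := strconv.ParseBool(raw)
	if err != nil {
		return false
	}
	return enabled
}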
View File

@@ -689,6 +689,19 @@ func (t *telemetryMetaStore) getMetricsKeys(ctx context.Context, fieldKeySelecto
// hit the limit?
complete := rowCount <= limit
// Add synthetic metrics-only key isTopLevelOperation so filters can be parsed even if not present in metrics tables
for _, selector := range fieldKeySelectors {
if selector.Name == "isTopLevelOperation" {
keys = append(keys, &telemetrytypes.TelemetryFieldKey{
Name: "isTopLevelOperation",
Signal: telemetrytypes.SignalMetrics,
FieldContext: telemetrytypes.FieldContextUnspecified,
FieldDataType: telemetrytypes.FieldDataTypeBool,
})
break
}
}
return keys, complete, nil
}
@@ -978,7 +991,7 @@ func (t *telemetryMetaStore) getRelatedValues(ctx context.Context, fieldValueSel
FieldMapper: t.fm,
ConditionBuilder: t.conditionBuilder,
FieldKeys: keys,
}, 0, 0)
if err == nil {
sb.AddWhereClause(whereClause.WhereClause)
} else {
@@ -1002,20 +1015,20 @@ func (t *telemetryMetaStore) getRelatedValues(ctx context.Context, fieldValueSel
// search on attributes
key.FieldContext = telemetrytypes.FieldContextAttribute
cond, err := t.conditionBuilder.ConditionFor(ctx, key, qbtypes.FilterOperatorContains, fieldValueSelector.Value, sb, 0, 0)
if err == nil {
conds = append(conds, cond)
}
// search on resource
key.FieldContext = telemetrytypes.FieldContextResource
cond, err = t.conditionBuilder.ConditionFor(ctx, key, qbtypes.FilterOperatorContains, fieldValueSelector.Value, sb, 0, 0)
if err == nil {
conds = append(conds, cond)
}
key.FieldContext = origContext
} else {
cond, err := t.conditionBuilder.ConditionFor(ctx, key, qbtypes.FilterOperatorContains, fieldValueSelector.Value, sb, 0, 0)
if err == nil {
conds = append(conds, cond)
}

View File

@@ -4,9 +4,11 @@ import (
"context"
"fmt"
"slices"
"strings"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/telemetrytraces"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
@@ -139,9 +141,13 @@ func (c *conditionBuilder) ConditionFor(
operator qbtypes.FilterOperator,
value any,
sb *sqlbuilder.SelectBuilder,
_ uint64,
start uint64,
_ uint64,
) (string, error) {
if c.isMetricScopeField(key.Name) {
return c.buildMetricScopeCondition(operator, value, start)
}
condition, err := c.conditionFor(ctx, key, operator, value, sb)
if err != nil {
return "", err
@@ -149,3 +155,37 @@ func (c *conditionBuilder) ConditionFor(
return condition, nil
}
func (c *conditionBuilder) isMetricScopeField(keyName string) bool {
return keyName == MetricScopeFieldIsTopLevelOperation
}
// buildMetricScopeCondition handles synthetic field isTopLevelOperation for metrics signal.
func (c *conditionBuilder) buildMetricScopeCondition(operator qbtypes.FilterOperator, value any, start uint64) (string, error) {
if operator != qbtypes.FilterOperatorEqual {
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "%s only supports '=' operator", MetricScopeFieldIsTopLevelOperation)
}
// Accept true in bool or string form; anything else is invalid
isTrue := false
switch v := value.(type) {
case bool:
isTrue = v
case string:
isTrue = strings.ToLower(v) == "true"
default:
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "%s expects boolean value, got %T", MetricScopeFieldIsTopLevelOperation, value)
}
if !isTrue {
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "%s can only be filtered with value 'true'", MetricScopeFieldIsTopLevelOperation)
}
startSec := int64(start / 1000)
// Note: Escape $$ to $$$$ to avoid sqlbuilder interpreting materialized $ signs
return fmt.Sprintf(
"((JSONExtractString(labels, 'operation'), JSONExtractString(labels, 'service.name')) GLOBAL IN (SELECT DISTINCT name, serviceName FROM %s.%s WHERE time >= toDateTime(%d)))",
telemetrytraces.DBName,
telemetrytraces.LocalTopLevelOperationsTableName,
startSec,
), nil
}
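The condition built above restricts (operation, service.name) label pairs to rows recorded in the traces top-level-operations table since the window start. A standalone sketch that renders the same fragment for a sample start timestamp, assuming start is in milliseconds as implied by the start/1000 conversion; the database and table names come from the telemetrytraces constants (signoz_traces, top_level_operations):

package main

import "fmt"

func main() {
	// Sample window start in milliseconds since epoch; the builder converts it
	// to seconds for ClickHouse's toDateTime.
	var start uint64 = 1_735_000_000_000
	startSec := int64(start / 1000)
	cond := fmt.Sprintf(
		"((JSONExtractString(labels, 'operation'), JSONExtractString(labels, 'service.name')) GLOBAL IN (SELECT DISTINCT name, serviceName FROM %s.%s WHERE time >= toDateTime(%d)))",
		"signoz_traces", "top_level_operations", startSec,
	)
	fmt.Println(cond)
}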

View File

@@ -7,3 +7,5 @@ var IntrinsicFields = []string{
"type",
"is_monotonic",
}
const MetricScopeFieldIsTopLevelOperation = "isTopLevelOperation"
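This synthetic field name is what the services module splices into its span-metrics filters to scope results to top-level operations. A compact sketch of that composition, mirroring the expression built in buildSpanMetricsQueryRangeRequest (the helper name is illustrative):

package main

import "fmt"

// withTopLevelScope wraps an optional user filter with the synthetic
// isTopLevelOperation = 'true' scope, the way the services module does
// for its span-metrics queries.
func withTopLevelScope(filterExpr string) string {
	scopeExpr := "isTopLevelOperation = 'true'"
	if filterExpr == "" {
		return scopeExpr
	}
	return "(" + filterExpr + ") AND (" + scopeExpr + ")"
}

func main() {
	fmt.Println(withTopLevelScope("service.name IN ('frontend')"))
	// Output: (service.name IN ('frontend')) AND (isTopLevelOperation = 'true')
}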

View File

@@ -1,12 +1,13 @@
package telemetrytraces
const (
DBName = "signoz_traces"
SpanIndexV3TableName = "distributed_signoz_index_v3"
SpanIndexV3LocalTableName = "signoz_index_v3"
TagAttributesV2TableName = "distributed_tag_attributes_v2"
TagAttributesV2LocalTableName = "tag_attributes_v2"
TopLevelOperationsTableName = "distributed_top_level_operations"
TraceSummaryTableName = "distributed_trace_summary"
SpanAttributesKeysTblName = "distributed_span_attributes_keys"
DBName = "signoz_traces"
SpanIndexV3TableName = "distributed_signoz_index_v3"
SpanIndexV3LocalTableName = "signoz_index_v3"
TagAttributesV2TableName = "distributed_tag_attributes_v2"
TagAttributesV2LocalTableName = "tag_attributes_v2"
TopLevelOperationsTableName = "distributed_top_level_operations"
LocalTopLevelOperationsTableName = "top_level_operations"
TraceSummaryTableName = "distributed_trace_summary"
SpanAttributesKeysTblName = "distributed_span_attributes_keys"
)