Compare commits

...

9 Commits

Author SHA1 Message Date
vikrantgupta25
a451044938 feat: temporary changes for inmemory cache 2024-12-27 14:23:56 +05:30
vikrantgupta25
f9c8850642 feat: use range changed to handle loads better 2024-12-26 23:54:59 +05:30
vikrantgupta25
e9f5e77175 feat: added code comments and TODO 2024-12-26 19:59:48 +05:30
vikrantgupta25
d7e6b412a2 feat: add caching for tree at query service 2024-12-26 19:46:56 +05:30
vikrantgupta25
8d16ec37fe feat: handle the scroll to top and bottom 2024-12-26 17:13:20 +05:30
vikrantgupta25
ef971484dc feat: setup the success state for the spans waterfall model 2024-12-26 16:25:11 +05:30
vikrantgupta25
5da2b39716 feat: structure the code in sync with api lifecycle 2024-12-26 14:15:31 +05:30
vikrantgupta25
6f33842e4f feat: frontend base setup and api integration for trace v2 2024-12-24 23:37:39 +05:30
vikrantgupta25
6e1371b6a0 feat: enable waterfall tree construction at query service 2024-12-24 17:30:30 +05:30
28 changed files with 890 additions and 23 deletions

View File

@@ -8,6 +8,7 @@ import (
"github.com/jmoiron/sqlx"
basechr "go.signoz.io/signoz/pkg/query-service/app/clickhouseReader"
"go.signoz.io/signoz/pkg/query-service/cache"
"go.signoz.io/signoz/pkg/query-service/interfaces"
)
@@ -27,8 +28,9 @@ func NewDataConnector(
cluster string,
useLogsNewSchema bool,
useTraceNewSchema bool,
cache cache.Cache,
) *ClickhouseReader {
ch := basechr.NewReader(localDB, promConfigPath, lm, maxIdleConns, maxOpenConns, dialTimeout, cluster, useLogsNewSchema, useTraceNewSchema)
ch := basechr.NewReader(localDB, promConfigPath, lm, maxIdleConns, maxOpenConns, dialTimeout, cluster, useLogsNewSchema, useTraceNewSchema, cache)
return &ClickhouseReader{
conn: ch.GetConn(),
appdb: localDB,

View File

@@ -139,6 +139,15 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
return nil, err
}
var c cache.Cache
if serverOptions.CacheConfigPath != "" {
cacheOpts, err := cache.LoadFromYAMLCacheConfigFile(serverOptions.CacheConfigPath)
if err != nil {
return nil, err
}
c = cache.NewCache(cacheOpts)
}
// set license manager as feature flag provider in dao
modelDao.SetFlagProvider(lm)
readerReady := make(chan bool)
@@ -157,6 +166,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
serverOptions.Cluster,
serverOptions.UseLogsNewSchema,
serverOptions.UseTraceNewSchema,
c,
)
go qb.Start(readerReady)
reader = qb
@@ -171,14 +181,6 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
return nil, err
}
}
var c cache.Cache
if serverOptions.CacheConfigPath != "" {
cacheOpts, err := cache.LoadFromYAMLCacheConfigFile(serverOptions.CacheConfigPath)
if err != nil {
return nil, err
}
c = cache.NewCache(cacheOpts)
}
<-readerReady
rm, err := makeRulesManager(serverOptions.PromConfigPath,

View File

@@ -43,7 +43,7 @@ export const TraceFilter = Loadable(
);
export const TraceDetail = Loadable(
() => import(/* webpackChunkName: "TraceDetail Page" */ 'pages/TraceDetail'),
() => import(/* webpackChunkName: "TraceDetail Page" */ 'pages/TraceDetailV2'),
);
export const UsageExplorerPage = Loadable(

View File

@@ -0,0 +1,31 @@
import { ApiV2Instance as axios } from 'api';
import { ErrorResponseHandler } from 'api/ErrorResponseHandler';
import { AxiosError } from 'axios';
import { omit } from 'lodash-es';
import { ErrorResponse, SuccessResponse } from 'types/api';
import {
GetTraceV2PayloadProps,
GetTraceV2SuccessResponse,
} from 'types/api/trace/getTraceV2';
/**
 * Fetches the trace waterfall data (v2) for a single trace.
 *
 * The trace id travels in the URL path while the remaining props
 * (interestedSpanId, uncollapsedNodes) are posted as the request body.
 * Axios failures are normalised through ErrorResponseHandler.
 */
const getTraceV2 = async (
  props: GetTraceV2PayloadProps,
): Promise<SuccessResponse<GetTraceV2SuccessResponse> | ErrorResponse> => {
  try {
    // everything except the trace id goes into the POST body
    const requestBody = omit(props, 'traceId');
    const { data } = await axios.post<GetTraceV2SuccessResponse>(
      `/traces/${props.traceId}`,
      requestBody,
    );
    return {
      statusCode: 200,
      error: null,
      message: 'Success',
      payload: data,
    };
  } catch (error) {
    return ErrorResponseHandler(error as AxiosError);
  }
};

export default getTraceV2;

View File

@@ -21,4 +21,5 @@ export const REACT_QUERY_KEY = {
GET_HOST_LIST: 'GET_HOST_LIST',
UPDATE_ALERT_RULE: 'UPDATE_ALERT_RULE',
GET_ACTIVE_LICENSE_V3: 'GET_ACTIVE_LICENSE_V3',
GET_TRACE_V2: 'GET_TRACE_V2',
};

View File

@@ -0,0 +1,3 @@
// Scroll container for the trace waterfall; the fixed viewport-relative height
// gives the inner Virtuoso list (styled height: 100%) a concrete box to
// virtualise within.
.trace-waterfall {
  height: 70vh;
}

View File

@@ -0,0 +1,105 @@
import './TraceWaterfall.styles.scss';
import { AxiosError } from 'axios';
import Spinner from 'components/Spinner';
import useGetTraceV2 from 'hooks/trace/useGetTraceV2';
import { useMemo, useState } from 'react';
import { useParams } from 'react-router-dom';
import { TraceDetailV2URLProps } from 'types/api/trace/getTraceV2';
import { TraceWaterfallStates } from './constants';
import Error from './TraceWaterfallStates/Error/Error';
import NoData from './TraceWaterfallStates/NoData/NoData';
import Success from './TraceWaterfallStates/Success/Success';
/**
 * Render a virtuoso list with the spans received from the trace details API call.
 * Trigger an API call when the top or bottom of the list is reached: set the interestedSpanId and use it as part of the query key along with the uncollapsed nodes.
 * Render the tree structure based on hasChildren and level; the left spacing depends on the level, with a window pane providing horizontal scroll.
 * A minimum width is to be set, with scrolling beyond it driven by content.
 */
/**
 * Drives the trace waterfall: fetches the span window for the trace in the URL,
 * derives the current lifecycle state of the request and renders the matching
 * view (spinner / error / empty / span list).
 */
function TraceWaterfall(): JSX.Element {
  const { id: traceId } = useParams<TraceDetailV2URLProps>();
  const [interestedSpanId, setInterestedSpanId] = useState<string | null>();
  const [uncollapsedNodes, setUncollapsedNodes] = useState<string[]>([]);

  const {
    data: traceData,
    isFetching: isFetchingTraceData,
    error: errorFetchingTraceData,
  } = useGetTraceV2({
    traceId,
    interestedSpanId: interestedSpanId ?? '',
    uncollapsedNodes,
  });

  // Derive the waterfall lifecycle state from the query status.
  const traceWaterfallState = useMemo(() => {
    const spanCount = traceData?.payload?.spans?.length;
    if (isFetchingTraceData) {
      // a refetch with old spans still on screen is a distinct state so the
      // list stays visible while paging
      return spanCount && spanCount > 0
        ? TraceWaterfallStates.FETCHING_WITH_OLD_DATA_PRESENT
        : TraceWaterfallStates.LOADING;
    }
    if (errorFetchingTraceData) {
      return TraceWaterfallStates.ERROR;
    }
    if (spanCount === 0) {
      return TraceWaterfallStates.NO_DATA;
    }
    return TraceWaterfallStates.SUCCESS;
  }, [errorFetchingTraceData, isFetchingTraceData, traceData]);

  // No manipulation is required on the spans, so a memoized pass-through is enough.
  const spans = useMemo(() => traceData?.payload?.spans ?? [], [
    traceData?.payload?.spans,
  ]);

  // Select the view for the current lifecycle state.
  const getContent = useMemo(() => {
    if (traceWaterfallState === TraceWaterfallStates.ERROR) {
      return <Error error={errorFetchingTraceData as AxiosError} />;
    }
    if (traceWaterfallState === TraceWaterfallStates.NO_DATA) {
      return <NoData id={traceId} />;
    }
    if (
      traceWaterfallState === TraceWaterfallStates.SUCCESS ||
      traceWaterfallState === TraceWaterfallStates.FETCHING_WITH_OLD_DATA_PRESENT
    ) {
      return (
        <Success
          spans={spans}
          traceWaterfallState={traceWaterfallState}
          interestedSpanId={interestedSpanId ?? ''}
          uncollapsedNodes={uncollapsedNodes}
          setInterestedSpanId={setInterestedSpanId}
          setUncollapsedNodes={setUncollapsedNodes}
        />
      );
    }
    // LOADING and any unknown state fall back to the spinner.
    return <Spinner tip="Fetching the trace!" />;
  }, [
    errorFetchingTraceData,
    interestedSpanId,
    spans,
    traceId,
    traceWaterfallState,
    uncollapsedNodes,
  ]);

  return <div className="trace-waterfall">{getContent}</div>;
}

export default TraceWaterfall;

View File

@@ -0,0 +1,19 @@
import { Typography } from 'antd';
import { AxiosError } from 'axios';
interface IErrorProps {
error: AxiosError;
}
/**
 * Error state for the trace waterfall — shown when the trace details request
 * fails; surfaces the message from the axios error.
 */
function Error({ error }: IErrorProps): JSX.Element {
  const { message } = error;
  return (
    <>
      <Typography.Text>Error fetching trace</Typography.Text>
      <Typography.Text>{message}</Typography.Text>
    </>
  );
}

export default Error;

View File

@@ -0,0 +1,12 @@
import { Typography } from 'antd';
interface INoDataProps {
id: string;
}
/**
 * Empty state for the trace waterfall — rendered when the API returns zero
 * spans for the requested trace id.
 */
function NoData({ id }: INoDataProps): JSX.Element {
  return <Typography.Text>No Trace found with the id: {id} </Typography.Text>;
}

export default NoData;

View File

@@ -0,0 +1,39 @@
// Styles for the Success state of the trace waterfall.
// NOTE(review): "success-cotent" is misspelled, but it must stay in sync with
// the className used in Success.tsx — rename both together if ever fixed.
.success-cotent {
  width: 100%;
  height: 100%;

  // one row of the span tree; width grows with content so deep trees can
  // overflow horizontally instead of being cropped
  .span-item {
    display: flex;
    flex-direction: column;
    width: fit-content;

    // left border drawn on every non-root span to suggest the tree edge
    &.vertical-connector {
      border-left: 1px solid #d9d9d9;
    }

    // short elbow connecting a child row to its parent's vertical line
    .horizontal-connector {
      border: 1px solid #d9d9d9;
    }

    // span name plus the collapse/expand (or leaf) control
    .first-row {
      display: flex;
      align-items: center;

      .collapse-uncollapse-button {
        display: flex;
        align-items: center;
        justify-content: center;
        height: 22px;
        width: 30px;
        padding: 0px;
        margin-right: 10px;
      }
    }

    // service name, indented under the connector line
    .second-row {
      display: flex;
      padding-left: 5px;
      border-left: 1px solid #d9d9d9;
    }
  }
}

View File

@@ -0,0 +1,159 @@
import './Success.styles.scss';
import { Button, Typography } from 'antd';
import cx from 'classnames';
import {
FIXED_LEFT_PADDING_BASE,
TraceWaterfallStates,
} from 'container/TraceWaterfall/constants';
import { isEmpty } from 'lodash-es';
import { ChevronDown, ChevronRight, Leaf } from 'lucide-react';
import {
Dispatch,
SetStateAction,
useCallback,
useEffect,
useRef,
} from 'react';
import { ListRange, Virtuoso, VirtuosoHandle } from 'react-virtuoso';
import { Span } from 'types/api/trace/getTraceV2';
interface ISuccessProps {
spans: Span[];
traceWaterfallState: TraceWaterfallStates;
interestedSpanId: string;
uncollapsedNodes: string[];
setInterestedSpanId: Dispatch<SetStateAction<string | null | undefined>>;
setUncollapsedNodes: Dispatch<SetStateAction<string[]>>;
}
/**
 * Success renders the loaded trace as a virtualised (react-virtuoso) list of
 * spans with expand/collapse controls and tree connector lines. Reaching the
 * top or bottom of the list sets the interested span id, which the parent uses
 * to page in more spans; "Fetching Spans...." banners are shown at whichever
 * edge is being paged.
 * NOTE(review): the wrapper class "success-cotent" is misspelled but matches
 * the selector in Success.styles.scss — rename both together if fixed.
 */
function Success(props: ISuccessProps): JSX.Element {
  const {
    spans,
    traceWaterfallState,
    interestedSpanId,
    uncollapsedNodes,
    setInterestedSpanId,
    setUncollapsedNodes,
  } = props;
  // imperative handle used to restore the scroll position after new spans load
  const ref = useRef<VirtuosoHandle>(null);

  // Fired by Virtuoso whenever the rendered window changes; drives paging.
  const handleRangeChanged = (range: ListRange): void => {
    const { startIndex, endIndex } = range;
    // if the start is reached and it's not the root span then fetch!
    if (startIndex === 0 && spans[0].parentSpanId !== '') {
      setInterestedSpanId(spans[0].spanId);
    }
    // if the end is reached
    if (endIndex === spans.length - 1) {
      // and the startIndex is also present in the same range then it essentially means the trace ends in a single screen
      if (startIndex === 0) {
        return;
      }
      // else fetch more spans
      setInterestedSpanId(spans[spans.length - 1].spanId);
    }
  };

  // when scrolling to the bottom we fetch the spans before and after the current span. hence we need to maintain the scroll position to the interested span
  useEffect(() => {
    if (ref.current && interestedSpanId && !isEmpty(interestedSpanId)) {
      const index = spans.findIndex((node) => node.spanId === interestedSpanId);
      if (index !== -1) {
        ref.current.scrollToIndex({
          index,
          behavior: 'auto',
          align: 'start',
        });
      }
    }
  }, [interestedSpanId, spans]);

  // Row renderer: indentation is derived from span.level, connectors mimic a tree.
  const getItemContent = useCallback(
    (_: number, span: Span): JSX.Element => {
      const isRootSpan = span.parentSpanId === '';
      // left offset before the horizontal connector; root (level 0) gets none
      const leftMarginBeforeTheHorizontalConnector =
        span.level > 0
          ? `${(span.level - 1) * (FIXED_LEFT_PADDING_BASE + 1)}px`
          : '0px';
      const isUnCollapsed = uncollapsedNodes.includes(span.spanId);
      return (
        // do not crop the service names and let the window overflow.
        // the pane height changes needs to be addressed by resizable columns
        <div
          className={cx('span-item', !isRootSpan ? 'vertical-connector' : '')}
          style={{ marginLeft: leftMarginBeforeTheHorizontalConnector }}
        >
          <div className="first-row">
            {!isRootSpan && (
              <div
                className="horizontal-connector"
                style={{ width: `${FIXED_LEFT_PADDING_BASE}px` }}
              />
            )}
            {span.hasChildren ? (
              // toggles the collapse state and anchors the next fetch on this span
              <Button
                icon={
                  isUnCollapsed ? <ChevronDown size={14} /> : <ChevronRight size={14} />
                }
                onClick={(): void => {
                  setInterestedSpanId(span.spanId);
                  if (isUnCollapsed) {
                    setUncollapsedNodes((prev) =>
                      prev.filter((id) => id !== span.spanId),
                    );
                  } else {
                    setUncollapsedNodes((prev) => [...prev, span.spanId]);
                  }
                }}
                className="collapse-uncollapse-button"
              />
            ) : (
              // leaf spans get a non-interactive leaf icon placeholder
              <Button
                icon={<Leaf size={14} />}
                className="collapse-uncollapse-button"
              />
            )}
            <Typography.Text>{span.name}</Typography.Text>
          </div>
          <div
            className="second-row"
            style={{
              marginLeft: !isRootSpan ? `${FIXED_LEFT_PADDING_BASE}px` : '0px',
            }}
          >
            <Typography.Text>{span.serviceName}</Typography.Text>
          </div>
        </div>
      );
    },
    [setInterestedSpanId, setUncollapsedNodes, uncollapsedNodes],
  );

  return (
    <div className="success-cotent">
      {traceWaterfallState ===
        TraceWaterfallStates.FETCHING_WITH_OLD_DATA_PRESENT &&
        interestedSpanId === spans[0].spanId && (
          <Typography.Text>Fetching Spans....</Typography.Text>
        )}
      <Virtuoso
        ref={ref}
        style={{ height: '100%' }}
        data={spans}
        rangeChanged={handleRangeChanged}
        itemContent={getItemContent}
      />
      {traceWaterfallState ===
        TraceWaterfallStates.FETCHING_WITH_OLD_DATA_PRESENT &&
        interestedSpanId === spans[spans.length - 1].spanId && (
          <Typography.Text>Fetching Spans....</Typography.Text>
        )}
    </div>
  );
}

export default Success;

View File

@@ -0,0 +1,9 @@
/**
 * Lifecycle states of the trace waterfall, derived from the trace details
 * query status in TraceWaterfall.tsx.
 * Fix: the SUCCESS and FETCHING_WITH_OLD_DATA_PRESENT string values were
 * misspelled ('SUCCSS', 'FETCHING_WTIH_OLD_DATA_PRESENT'); corrected so logs
 * and debugging output read sensibly. Only the enum members are compared in
 * code, so the value change is safe.
 */
export enum TraceWaterfallStates {
  LOADING = 'LOADING',
  SUCCESS = 'SUCCESS',
  NO_DATA = 'NO_DATA',
  ERROR = 'ERROR',
  // a refetch is in flight but the previous span list is still rendered
  FETCHING_WITH_OLD_DATA_PRESENT = 'FETCHING_WITH_OLD_DATA_PRESENT',
}

// Base indent (px) applied per tree level in the waterfall rows.
export const FIXED_LEFT_PADDING_BASE = 10;

View File

@@ -0,0 +1,29 @@
import getTraceV2 from 'api/trace/getTraceV2';
import { REACT_QUERY_KEY } from 'constants/reactQueryKeys';
import { useQuery, UseQueryResult } from 'react-query';
import { ErrorResponse, SuccessResponse } from 'types/api';
import {
GetTraceV2PayloadProps,
GetTraceV2SuccessResponse,
} from 'types/api/trace/getTraceV2';
// Result shape of the trace details v2 query.
// Fix: the alias was named `UseLicense` (copy-paste artifact from the license
// hook) and declared after use; renamed and moved above the hook.
type UseGetTraceV2Result = UseQueryResult<
  SuccessResponse<GetTraceV2SuccessResponse> | ErrorResponse,
  unknown
>;

/**
 * React-query hook for the trace details v2 API.
 *
 * Every prop participates in the query key: changing the trace id, the
 * interested span id or the set of uncollapsed nodes makes the cached data
 * obsolete and triggers a refetch. `keepPreviousData` keeps the old span list
 * on screen while the next window loads, and the query is disabled until a
 * trace id is available.
 */
const useGetTraceV2 = (props: GetTraceV2PayloadProps): UseGetTraceV2Result =>
  useQuery({
    queryFn: () => getTraceV2(props),
    // if any of the props changes then we need to trigger an API call as the older data will be obsolete
    queryKey: [
      REACT_QUERY_KEY.GET_TRACE_V2,
      props.traceId,
      props.interestedSpanId,
      props.uncollapsedNodes,
    ],
    enabled: !!props.traceId,
    keepPreviousData: true,
  });

export default useGetTraceV2;

View File

@@ -0,0 +1,13 @@
.trace-layout {
display: flex;
flex-direction: column;
gap: 25px;
.flame-graph {
display: flex;
align-items: center;
justify-content: center;
height: 40vh;
border: 1px solid #d9d9d9;
}
}

View File

@@ -0,0 +1,16 @@
import './TraceDetailV2.styles.scss';
import { Typography } from 'antd';
import TraceWaterfall from 'container/TraceWaterfall/TraceWaterfall';
function TraceDetailsV2(): JSX.Element {
return (
<div className="trace-layout">
<Typography.Text>Trace Details V2 Layout</Typography.Text>
<div className="flame-graph">FlameGraph comes here!</div>
<TraceWaterfall />
</div>
);
}
export default TraceDetailsV2;

View File

@@ -0,0 +1,35 @@
// Route params for the trace details v2 page (/trace/:id).
export interface TraceDetailV2URLProps {
  id: string;
}

// Path param + POST body for the trace details v2 API call; traceId is moved
// into the URL by getTraceV2, the rest is sent as the body.
export interface GetTraceV2PayloadProps {
  traceId: string;
  // span id the backend centres the returned window of spans around
  interestedSpanId: string;
  // span ids the user has expanded; the backend only descends into these
  uncollapsedNodes: string[];
}

// One span row as returned by /api/v2/traces/{traceId}; field names mirror the
// backend model.Span JSON tags.
export interface Span {
  timestamp: number;
  durationNano: number;
  spanId: string;
  rootSpanId: string;
  parentSpanId: string;
  traceId: string;
  hasError: boolean;
  kind: number;
  serviceName: string;
  name: string;
  // TODO(review): untyped — the backend serialises an array of span references
  // ([]OtelSpanRef); model it instead of `any`
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  references: any;
  tagMap: Record<string, string>;
  // note: singular "event" matches the backend JSON tag for the events list
  event: string[];
  rootName: string;
  statusMessage: string;
  statusCodeString: string;
  spanKind: string;
  // whether the node has children in the full tree (drives the expand control)
  hasChildren: boolean;
  // depth in the span tree; drives indentation in the waterfall
  level: number;
}

export interface GetTraceV2SuccessResponse {
  spans: Span[];
}

View File

@@ -42,6 +42,7 @@ import (
"go.signoz.io/signoz/pkg/query-service/app/resource"
"go.signoz.io/signoz/pkg/query-service/app/services"
"go.signoz.io/signoz/pkg/query-service/auth"
"go.signoz.io/signoz/pkg/query-service/cache"
"go.signoz.io/signoz/pkg/query-service/common"
"go.signoz.io/signoz/pkg/query-service/constants"
chErrors "go.signoz.io/signoz/pkg/query-service/errors"
@@ -156,6 +157,8 @@ type ClickHouseReader struct {
traceLocalTableName string
traceResourceTableV3 string
traceSummaryTable string
Cache cache.Cache
}
// NewTraceReader returns a TraceReader for the database
@@ -169,6 +172,7 @@ func NewReader(
cluster string,
useLogsNewSchema bool,
useTraceNewSchema bool,
cache cache.Cache,
) *ClickHouseReader {
datasource := os.Getenv("ClickHouseUrl")
@@ -179,7 +183,7 @@ func NewReader(
zap.L().Fatal("failed to initialize ClickHouse", zap.Error(err))
}
return NewReaderFromClickhouseConnection(db, options, localDB, configFile, featureFlag, cluster, useLogsNewSchema, useTraceNewSchema)
return NewReaderFromClickhouseConnection(db, options, localDB, configFile, featureFlag, cluster, useLogsNewSchema, useTraceNewSchema, cache)
}
func NewReaderFromClickhouseConnection(
@@ -191,6 +195,7 @@ func NewReaderFromClickhouseConnection(
cluster string,
useLogsNewSchema bool,
useTraceNewSchema bool,
cache cache.Cache,
) *ClickHouseReader {
alertManager, err := am.New()
if err != nil {
@@ -276,6 +281,8 @@ func NewReaderFromClickhouseConnection(
traceTableName: traceTableName,
traceResourceTableV3: options.primary.TraceResourceTableV3,
traceSummaryTable: options.primary.TraceSummaryTable,
Cache: cache,
}
}
@@ -1441,6 +1448,311 @@ func (r *ClickHouseReader) SearchTraces(ctx context.Context, params *model.Searc
return &searchSpansResult, nil
}
// contains reports whether item is present in slice.
func contains(slice []string, item string) bool {
	for i := 0; i < len(slice); i++ {
		if slice[i] == item {
			return true
		}
	}
	return false
}
// temp_cache is a process-local, package-level cache of fully built trace
// trees, keyed by "trace-detail-<traceID>". It is a temporary stand-in for
// r.Cache (see the commented-out code in SearchTracesV3).
// NOTE(review): this is a plain map with no synchronization — concurrent HTTP
// requests reading and writing it is a data race; guard it with a sync.RWMutex
// or restore the r.Cache path before this ships.
var temp_cache map[string]model.SearchTracesV3Cache = make(map[string]model.SearchTracesV3Cache)
// SearchTracesV3 builds (or loads from the temporary in-memory cache) the full
// span tree for traceID, then returns a pre-order window of spans around
// req.InterestedSpanID, descending only into nodes the client has uncollapsed
// (plus the path from the root to the interested span).
func (r *ClickHouseReader) SearchTracesV3(ctx context.Context, traceID string, req *model.SearchTracesV3Params) (*model.SearchTracesV3Response, *model.ApiError) {
	trace := new(model.SearchTracesV3Response)
	var startTime, endTime, durationNano uint64
	var spanIdToSpanNodeMap = map[string]*model.Span{}
	var traceRoots []string
	var useCache bool = true
	// get the trace tree from cache!
	// cachedTraceData, cacheStatus, err := r.Cache.Retrieve(fmt.Sprintf("trace-detail-%v", traceID), false)
	// if err != nil {
	// 	// if there is error in retrieving the cache, log the same and move with ch queries.
	// 	zap.L().Debug("error in retrieving cached trace data", zap.Error(err))
	// 	useCache = false
	// }
	// NOTE(review): temp_cache is an unsynchronized package-level map —
	// concurrent requests race on this read; guard it or restore r.Cache above.
	val, exist := temp_cache[fmt.Sprintf("trace-detail-%v", traceID)]
	if !exist {
		zap.L().Debug("error in retrieving cached trace data")
		useCache = false
	} else {
		zap.L().Info("cache is successfully hit, applying cache for trace details", zap.String("traceID", traceID))
		startTime = val.StartTime
		endTime = val.EndTime
		durationNano = val.DurationNano
		spanIdToSpanNodeMap = val.SpanIdToSpanNodeMap
		traceRoots = val.TraceRoots
	}
	// if there is no error and there has been a perfect hit for cache then get the data
	// if err == nil && cacheStatus == status.RetrieveStatusHit {
	// 	startBeforeCacheUnmarshelling := time.Now()
	// 	var cachedTraceResponse = new(model.SearchTracesV3Cache)
	// 	err = json.Unmarshal(cachedTraceData, cachedTraceResponse)
	// 	endAfterCacheUnmarshelling := time.Now()
	// 	zap.L().Info("cache unmarshelling took", zap.Duration("time", endAfterCacheUnmarshelling.Sub(startBeforeCacheUnmarshelling)))
	// 	if err != nil {
	// 		// log the error and move ahead with clickhouse queries
	// 		zap.L().Debug("error in unmarshalling the cached data", zap.Error(err))
	// 		useCache = false
	// 	}
	// 	if err == nil {
	// 		// cache hit is successful, retrieve the required data
	// 		zap.L().Info("cache is successfully hit, applying cache for trace details", zap.String("traceID", traceID))
	// 		startTime = cachedTraceResponse.StartTime
	// 		endTime = cachedTraceResponse.EndTime
	// 		durationNano = cachedTraceResponse.DurationNano
	// 		spanIdToSpanNodeMap = cachedTraceResponse.SpanIdToSpanNodeMap
	// 		traceRoots = cachedTraceResponse.TraceRoots
	// 	}
	// }
	if !useCache {
		zap.L().Info("cache miss for trace details", zap.String("traceID", traceID))
		// fetch the start, end and number of spans from the summary table, start and end are required for the trace query
		var traceSummary model.TraceSummary
		summaryQuery := fmt.Sprintf("SELECT * from %s.%s WHERE trace_id=$1", r.TraceDB, r.traceSummaryTable)
		err := r.db.QueryRow(ctx, summaryQuery, traceID).Scan(&traceSummary.TraceID, &traceSummary.Start, &traceSummary.End, &traceSummary.NumSpans)
		if err != nil {
			// no summary row means the trace does not exist; return the empty response
			if err == sql.ErrNoRows {
				return trace, nil
			}
			zap.L().Error("Error in processing sql query", zap.Error(err))
			return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %w", err)}
		}
		// fetch all the spans belonging to the trace from the main table
		var searchScanResponses []model.SpanItemV2
		query := fmt.Sprintf("SELECT timestamp, duration_nano, span_id, trace_id, has_error, kind, resource_string_service$$name, name, references, attributes_string, attributes_number, attributes_bool, resources_string, events, status_message, status_code_string, kind_string , parent_span_id FROM %s.%s WHERE trace_id=$1 and ts_bucket_start>=$2 and ts_bucket_start<=$3 ORDER BY timestamp ASC, name ASC", r.TraceDB, r.traceTableName)
		start := time.Now()
		// the lower bucket bound is widened by 1800s — presumably to cover spans
		// whose ts_bucket started before the trace start; TODO confirm bucket width
		err = r.db.Select(ctx, &searchScanResponses, query, traceID, strconv.FormatInt(traceSummary.Start.Unix()-1800, 10), strconv.FormatInt(traceSummary.End.Unix(), 10))
		// NOTE(review): logs the full SQL at Info level on every cache miss — consider Debug
		zap.L().Info(query)
		if err != nil {
			zap.L().Error("Error in processing sql query", zap.Error(err))
			return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %w", err)}
		}
		end := time.Now()
		zap.L().Debug("searchTracesV3SQLQuery took: ", zap.Duration("duration", end.Sub(start)))
		// create the trace tree based on the spans fetched above
		// create a map of [spanId]: spanNode
		for _, item := range searchScanResponses {
			// get the span refs in the OTELSpanRef model
			ref := []model.OtelSpanRef{}
			err := json.Unmarshal([]byte(item.References), &ref)
			if err != nil {
				zap.L().Error("Error unmarshalling references", zap.Error(err))
				return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("error in unmarshalling references: %w", err)}
			}
			// merge attributes_number and attributes_bool to attributes_string
			for k, v := range item.Attributes_bool {
				item.Attributes_string[k] = fmt.Sprintf("%v", v)
			}
			for k, v := range item.Attributes_number {
				item.Attributes_string[k] = fmt.Sprintf("%v", v)
			}
			for k, v := range item.Resources_string {
				item.Attributes_string[k] = v
			}
			// create the span node
			jsonItem := model.Span{
				SpanID:           item.SpanID,
				TraceID:          item.TraceID,
				ServiceName:      item.ServiceName,
				Name:             item.Name,
				Kind:             int32(item.Kind),
				DurationNano:     int64(item.DurationNano),
				HasError:         item.HasError,
				StatusMessage:    item.StatusMessage,
				StatusCodeString: item.StatusCodeString,
				SpanKind:         item.SpanKind,
				References:       ref,
				Events:           item.Events,
				TagMap:           item.Attributes_string,
				ParentSpanId:     item.ParentSpanId,
				Children:         make([]*model.Span, 0),
			}
			// store the span timestamp as epoch milliseconds
			jsonItem.TimeUnixNano = uint64(item.TimeUnixNano.UnixNano() / 1000000)
			// assign the span node to the span map
			spanIdToSpanNodeMap[jsonItem.SpanID] = &jsonItem
			// metadata calculation
			if startTime == 0 || jsonItem.TimeUnixNano < startTime {
				startTime = jsonItem.TimeUnixNano
			}
			if endTime == 0 || jsonItem.TimeUnixNano > endTime {
				endTime = jsonItem.TimeUnixNano
			}
			if durationNano == 0 || uint64(jsonItem.DurationNano) > durationNano {
				durationNano = uint64(jsonItem.DurationNano)
			}
		}
		// traverse through the map and append each node to the children array of the parent node
		// capture the root nodes as well
		for _, spanNode := range spanIdToSpanNodeMap {
			if spanNode.ParentSpanId != "" {
				if parentNode, exists := spanIdToSpanNodeMap[spanNode.ParentSpanId]; exists {
					parentNode.Children = append(parentNode.Children, spanNode)
				} else {
					// insert the missing spans
					missingSpan := model.Span{
						SpanID:           spanNode.ParentSpanId,
						TraceID:          spanNode.TraceID,
						ServiceName:      "",
						Name:             "Missing Span",
						Kind:             0,
						DurationNano:     0,
						HasError:         false,
						StatusMessage:    "",
						StatusCodeString: "",
						SpanKind:         "",
						Children:         make([]*model.Span, 0),
					}
					missingSpan.Children = append(missingSpan.Children, spanNode)
					spanIdToSpanNodeMap[missingSpan.SpanID] = &missingSpan
				}
			} else {
				traceRoots = append(traceRoots, spanNode.SpanID)
			}
		}
		traceCache := model.SearchTracesV3Cache{
			StartTime:           startTime,
			EndTime:             endTime,
			DurationNano:        durationNano,
			SpanIdToSpanNodeMap: spanIdToSpanNodeMap,
			TraceRoots:          traceRoots,
		}
		// NOTE(review): unsynchronized write to the shared temp_cache map (see above)
		temp_cache[fmt.Sprintf("trace-detail-%v", traceID)] = traceCache
		// tracheCacheByte, err := json.Marshal(traceCache)
		// if err != nil {
		// 	zap.L().Debug("error in marshalling trace cached data, skipping the data to be cached", zap.Error(err))
		// } else {
		// 	r.Cache.Store(fmt.Sprintf("trace-detail-%v", traceID), tracheCacheByte, time.Minute*30)
		// }
	}
	// deterministic sort for the children based on timestamp and span name
	for _, spanNode := range spanIdToSpanNodeMap {
		sort.Slice(spanNode.Children, func(i, j int) bool {
			if spanNode.Children[i].TimeUnixNano == spanNode.Children[j].TimeUnixNano {
				return spanNode.Children[i].Name < spanNode.Children[j].Name
			}
			return spanNode.Children[i].TimeUnixNano < spanNode.Children[j].TimeUnixNano
		})
	}
	// if there are multiple roots that means we have missing spans. how to handle this case ??
	// NOTE(review): returning (nil, nil) here makes the handler serialise JSON
	// null with a 200 status — confirm this is the intended contract
	if len(traceRoots) > 1 {
		zap.L().Info(fmt.Sprintf("the trace %v has missing spans", traceID))
		return nil, nil
	}
	// now traverse through the tree and create a preorder traversal for the tree
	// then select the range of spans based on the interested span id
	var preOrderTraversal []*model.Span
	var traverse func(node *model.Span, level int64)
	var rootToInterestedNodePath func(node *model.Span) bool
	uncollapsedNodes := req.UncollapsedNodes
	// mark the current path from root to the interested node as uncollapsed
	// Important - do not mark the interested node as uncollapsed in the above exercise to handle node collapses
	rootToInterestedNodePath = func(node *model.Span) bool {
		if node.SpanID == req.InterestedSpanID {
			return true
		}
		isPresentInSubtreeForTheNode := false
		for _, child := range node.Children {
			isPresentInThisSubtree := rootToInterestedNodePath(child)
			// if the interested node is present in the given subtree then add the span node to uncollapsed node list
			if isPresentInThisSubtree {
				uncollapsedNodes = append(uncollapsedNodes, node.SpanID)
				isPresentInSubtreeForTheNode = true
			}
		}
		return isPresentInSubtreeForTheNode
	}
	// traverse appends a childless copy of node (annotated with level and
	// hasChildren) and recurses into children only when the node is uncollapsed
	traverse = func(node *model.Span, level int64) {
		nodeWithoutChildren := model.Span{
			SpanID:           node.SpanID,
			TraceID:          node.TraceID,
			ServiceName:      node.ServiceName,
			TimeUnixNano:     node.TimeUnixNano,
			Name:             node.Name,
			Kind:             int32(node.Kind),
			DurationNano:     int64(node.DurationNano),
			HasError:         node.HasError,
			StatusMessage:    node.StatusMessage,
			StatusCodeString: node.StatusCodeString,
			SpanKind:         node.SpanKind,
			References:       node.References,
			Events:           node.Events,
			TagMap:           node.TagMap,
			ParentSpanId:     node.ParentSpanId,
			Children:         make([]*model.Span, 0),
			HasChildren:      len(node.Children) > 0,
			Level:            level,
		}
		preOrderTraversal = append(preOrderTraversal, &nodeWithoutChildren)
		// traverse the child if the current node is uncollapsed
		if contains(uncollapsedNodes, node.SpanID) {
			for _, child := range node.Children {
				traverse(child, level+1)
			}
		}
	}
	for _, rootSpanID := range traceRoots {
		if rootNode, exists := spanIdToSpanNodeMap[rootSpanID]; exists {
			_ = rootToInterestedNodePath(rootNode)
			traverse(rootNode, 0)
		}
	}
	interestedSpanIndex := -1
	// get the index for the interested span id
	if req.InterestedSpanID != "" {
		for i, span := range preOrderTraversal {
			if span.SpanID == req.InterestedSpanID {
				interestedSpanIndex = i
				break
			}
		}
	}
	// the index of the interested span id shouldn't be -1 as the span should exist
	if interestedSpanIndex == -1 && req.InterestedSpanID != "" {
		return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("interested span ID not found in the traversal")}
	}
	// get the 0.4*[span limit] before the interested span index
	startIndex := interestedSpanIndex - 200
	// get the 0.6*[span limit] after the interested span index
	endIndex := interestedSpanIndex + 300
	// adjust the sliding window according to the available left and right spaces.
	if startIndex < 0 {
		endIndex = endIndex - startIndex
		startIndex = 0
	}
	if endIndex > len(preOrderTraversal) {
		endIndex = len(preOrderTraversal)
	}
	selectedSpans := preOrderTraversal[startIndex:endIndex]
	// generate the response [ spans , metadata ]
	trace.Spans = selectedSpans
	return trace, nil
}
func (r *ClickHouseReader) GetDependencyGraph(ctx context.Context, queryParams *model.GetServicesParams) (*[]model.ServiceMapDependencyResponseItem, error) {
response := []model.ServiceMapDependencyResponseItem{}

View File

@@ -538,6 +538,7 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router, am *AuthMiddleware) {
router.HandleFunc("/api/v2/traces/fields", am.ViewAccess(aH.traceFields)).Methods(http.MethodGet)
router.HandleFunc("/api/v2/traces/fields", am.EditAccess(aH.updateTraceField)).Methods(http.MethodPost)
router.HandleFunc("/api/v2/traces/{traceId}", am.ViewAccess(aH.SearchTracesV3)).Methods(http.MethodPost)
router.HandleFunc("/api/v1/version", am.OpenAccess(aH.getVersion)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/featureFlags", am.OpenAccess(aH.getFeatureFlags)).Methods(http.MethodGet)
@@ -1769,6 +1770,29 @@ func (aH *APIHandler) SearchTraces(w http.ResponseWriter, r *http.Request) {
}
// SearchTracesV3 is the HTTP handler for POST /api/v2/traces/{traceId}.
// It validates the path parameter, decodes the paging/collapse options from
// the request body and delegates trace-tree construction to the reader.
func (aH *APIHandler) SearchTracesV3(w http.ResponseWriter, r *http.Request) {
	traceID := mux.Vars(r)["traceId"]
	if traceID == "" {
		RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: errors.New("traceID is required")}, nil)
		return
	}

	params := new(model.SearchTracesV3Params)
	if err := json.NewDecoder(r.Body).Decode(&params); err != nil {
		RespondError(w, model.BadRequest(err), nil)
		return
	}

	result, apiErr := aH.reader.SearchTracesV3(r.Context(), traceID, params)
	if apiErr != nil {
		RespondError(w, apiErr, nil)
		return
	}
	aH.WriteJSON(w, r, result)
}
func (aH *APIHandler) listErrors(w http.ResponseWriter, r *http.Request) {
query, err := parseListErrorsRequest(r)

View File

@@ -1384,6 +1384,7 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
"",
true,
true,
nil,
)
q := &querier{

View File

@@ -1438,6 +1438,7 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
"",
true,
true,
nil,
)
q := &querier{

View File

@@ -118,6 +118,15 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
readerReady := make(chan bool)
var c cache.Cache
if serverOptions.CacheConfigPath != "" {
cacheOpts, err := cache.LoadFromYAMLCacheConfigFile(serverOptions.CacheConfigPath)
if err != nil {
return nil, err
}
c = cache.NewCache(cacheOpts)
}
var reader interfaces.Reader
storage := os.Getenv("STORAGE")
if storage == "clickhouse" {
@@ -132,6 +141,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
serverOptions.Cluster,
serverOptions.UseLogsNewSchema,
serverOptions.UseTraceNewSchema,
c,
)
go clickhouseReader.Start(readerReady)
reader = clickhouseReader
@@ -146,14 +156,6 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
return nil, err
}
}
var c cache.Cache
if serverOptions.CacheConfigPath != "" {
cacheOpts, err := cache.LoadFromYAMLCacheConfigFile(serverOptions.CacheConfigPath)
if err != nil {
return nil, err
}
c = cache.NewCache(cacheOpts)
}
<-readerReady
rm, err := makeRulesManager(

View File

@@ -41,6 +41,7 @@ type Reader interface {
// Search Interfaces
SearchTraces(ctx context.Context, params *model.SearchTracesParams, smartTraceAlgorithm func(payload []model.SearchSpanResponseItem, targetSpanId string, levelUp int, levelDown int, spanLimit int) ([]model.SearchSpansResult, error)) (*[]model.SearchSpansResult, error)
SearchTracesV3(ctx context.Context, traceID string, req *model.SearchTracesV3Params) (*model.SearchTracesV3Response, *model.ApiError)
// Setter Interfaces
SetTTL(ctx context.Context, ttlParams *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError)

View File

@@ -315,6 +315,11 @@ type SearchTracesParams struct {
MaxSpansInTrace int `json:"maxSpansInTrace"`
}
// SearchTracesV3Params is the POST body for /api/v2/traces/{traceId}.
type SearchTracesV3Params struct {
	// InterestedSpanID anchors the sliding window of spans returned by
	// SearchTracesV3; empty means "start from the beginning of the trace".
	InterestedSpanID string `json:"interestedSpanId"`
	// UncollapsedNodes lists span IDs whose children the client has expanded;
	// the traversal only descends into these nodes.
	UncollapsedNodes []string `json:"uncollapsedNodes"`
}
type SpanFilterParams struct {
TraceID []string `json:"traceID"`
Status []string `json:"status"`

View File

@@ -269,6 +269,50 @@ type SearchSpanResponseItem struct {
SpanKind string `json:"spanKind"`
}
// Span is the API representation of a single node in the trace tree returned
// by SearchTracesV3; the JSON tags are consumed by the frontend waterfall.
type Span struct {
	TimeUnixNano     uint64            `json:"timestamp"`
	DurationNano     int64             `json:"durationNano"`
	SpanID           string            `json:"spanId"`
	RootSpanID       string            `json:"rootSpanId"`
	ParentSpanId     string            `json:"parentSpanId"`
	TraceID          string            `json:"traceId"`
	HasError         bool              `json:"hasError"`
	Kind             int32             `json:"kind"`
	ServiceName      string            `json:"serviceName"`
	Name             string            `json:"name"`
	References       []OtelSpanRef     `json:"references,omitempty"`
	TagMap           map[string]string `json:"tagMap"`
	Events           []string          `json:"event"`
	RootName         string            `json:"rootName"`
	StatusMessage    string            `json:"statusMessage"`
	StatusCodeString string            `json:"statusCodeString"`
	SpanKind         string            `json:"spanKind"`
	Children         []*Span           `json:"children"`
	// the below two fields are for frontend to render the spans
	HasChildren bool  `json:"hasChildren"`
	Level       int64 `json:"level"`
}

// SearchTracesV3Cache is the fully built trace tree plus its time metadata,
// stored in the cache under the key "trace-detail-<traceID>".
type SearchTracesV3Cache struct {
	StartTime           uint64           `json:"startTime"`
	EndTime             uint64           `json:"endTime"`
	DurationNano        uint64           `json:"durationNano"`
	SpanIdToSpanNodeMap map[string]*Span `json:"spanIdToSpanNodeMap"`
	TraceRoots          []string         `json:"traceRoots"`
}

// SearchTracesV3Response is the payload for POST /api/v2/traces/{traceId}.
// NOTE(review): the TotalErrorSpansCount json tag is PascalCase while every
// other tag is camelCase — confirm before clients start depending on it.
type SearchTracesV3Response struct {
	StartTimestampMillis  uint64  `json:"startTimestampMillis"`
	EndTimestampMillis    uint64  `json:"endTimestampMillis"`
	DurationNano          uint64  `json:"durationNano"`
	RootServiceName       string  `json:"rootServiceName"`
	RootServiceEntryPoint string  `json:"rootServiceEntryPoint"`
	TotalSpansCount       uint64  `json:"totalSpansCount"`
	TotalErrorSpansCount  uint64  `json:"TotalErrorSpansCount"`
	Spans                 []*Span `json:"spans"`
}
type OtelSpanRef struct {
TraceId string `json:"traceId,omitempty"`
SpanId string `json:"spanId,omitempty"`

View File

@@ -20,6 +20,7 @@ type SpanItemV2 struct {
StatusMessage string `ch:"status_message"`
StatusCodeString string `ch:"status_code_string"`
SpanKind string `ch:"kind_string"`
ParentSpanId string `ch:"parent_span_id"`
}
type TraceSummary struct {

View File

@@ -1241,7 +1241,7 @@ func TestThresholdRuleUnitCombinations(t *testing.T) {
}
options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace")
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true)
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, nil)
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
@@ -1340,7 +1340,7 @@ func TestThresholdRuleNoData(t *testing.T) {
}
options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace")
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true)
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, nil)
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
@@ -1448,7 +1448,7 @@ func TestThresholdRuleTracesLink(t *testing.T) {
}
options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace")
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true)
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, nil)
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
@@ -1573,7 +1573,7 @@ func TestThresholdRuleLogsLink(t *testing.T) {
}
options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace")
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true)
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, nil)
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
rule.TemporalityMap = map[string]map[v3.Temporality]bool{

View File

@@ -47,6 +47,7 @@ func NewMockClickhouseReader(
"",
true,
true,
nil,
)
return reader, mockDB