Compare commits

...

2 Commits

Author SHA1 Message Date
Srikanth Chekuri
0760c2165c Merge branch 'develop' into fix/handle-large-traces 2024-04-24 23:13:54 +05:30
makeavish
6319c53772 fix: handle-large-traces 2024-04-23 23:51:07 +05:30
8 changed files with 85 additions and 40 deletions

View File

@@ -2,10 +2,8 @@ package api
import (
"net/http"
"strconv"
"go.signoz.io/signoz/ee/query-service/app/db"
"go.signoz.io/signoz/ee/query-service/constants"
"go.signoz.io/signoz/ee/query-service/model"
baseapp "go.signoz.io/signoz/pkg/query-service/app"
basemodel "go.signoz.io/signoz/pkg/query-service/model"
@@ -19,17 +17,13 @@ func (ah *APIHandler) searchTraces(w http.ResponseWriter, r *http.Request) {
ah.APIHandler.SearchTraces(w, r)
return
}
traceId, spanId, levelUpInt, levelDownInt, err := baseapp.ParseSearchTracesParams(r)
searchTracesParams, err := baseapp.ParseSearchTracesParams(r)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, "Error reading params")
return
}
spanLimit, err := strconv.Atoi(constants.SpanLimitStr)
if err != nil {
zap.L().Error("Error during strconv.Atoi() on SPAN_LIMIT env variable", zap.Error(err))
return
}
result, err := ah.opts.DataConnector.SearchTraces(r.Context(), traceId, spanId, levelUpInt, levelDownInt, spanLimit, db.SmartTraceAlgorithm)
result, err := ah.opts.DataConnector.SearchTraces(r.Context(), searchTracesParams, db.SmartTraceAlgorithm)
if ah.HandleError(w, err, http.StatusBadRequest) {
return
}

View File

@@ -13,6 +13,11 @@ import (
func SmartTraceAlgorithm(payload []basemodel.SearchSpanResponseItem, targetSpanId string, levelUp int, levelDown int, spanLimit int) ([]basemodel.SearchSpansResult, error) {
var spans []*model.SpanForTraceDetails
// if targetSpanId is null or not present then randomly select a span as targetSpanId
if (targetSpanId == "" || targetSpanId == "null") && len(payload) > 0 {
targetSpanId = payload[0].SpanID
}
// Build a slice of spans from the payload
for _, spanItem := range payload {
var parentID string

View File

@@ -11,7 +11,8 @@ const (
var LicenseSignozIo = "https://license.signoz.io/api/v1"
var LicenseAPIKey = GetOrDefaultEnv("SIGNOZ_LICENSE_API_KEY", "")
var SaasSegmentKey = GetOrDefaultEnv("SIGNOZ_SAAS_SEGMENT_KEY", "")
var SpanLimitStr = GetOrDefaultEnv("SPAN_LIMIT", "5000")
var SpanRenderLimitStr = GetOrDefaultEnv("SPAN_RENDER_LIMIT", "2000")
var MaxSpansInTraceStr = GetOrDefaultEnv("MAX_SPANS_IN_TRACE", "50000")
func GetOrDefaultEnv(key string, fallback string) string {
v := os.Getenv(key)

View File

@@ -1995,7 +1995,23 @@ func (r *ClickHouseReader) GetUsage(ctx context.Context, queryParams *model.GetU
return &usageItems, nil
}
func (r *ClickHouseReader) SearchTraces(ctx context.Context, traceId string, spanId string, levelUp int, levelDown int, spanLimit int, smartTraceAlgorithm func(payload []model.SearchSpanResponseItem, targetSpanId string, levelUp int, levelDown int, spanLimit int) ([]model.SearchSpansResult, error)) (*[]model.SearchSpansResult, error) {
func (r *ClickHouseReader) SearchTraces(ctx context.Context, params *model.SearchTracesParams,
smartTraceAlgorithm func(payload []model.SearchSpanResponseItem, targetSpanId string,
levelUp int, levelDown int, spanLimit int) ([]model.SearchSpansResult, error)) (*[]model.SearchSpansResult, error) {
var countSpans uint64
countQuery := fmt.Sprintf("SELECT count() as count from %s.%s WHERE traceID=$1", r.TraceDB, r.SpansTable)
err := r.db.QueryRow(ctx, countQuery, params.TraceID).Scan(&countSpans)
if err != nil {
zap.L().Error("Error in processing sql query", zap.Error(err))
return nil, fmt.Errorf("error in processing sql query")
}
if countSpans > uint64(params.MaxSpansInTrace) {
zap.L().Error("Max spans allowed in trace limit reached", zap.Int("MaxSpansInTrace", params.MaxSpansInTrace),
zap.Uint64("Count", countSpans))
return nil, fmt.Errorf("Max spans allowed in trace limit reached")
}
var searchScanResponses []model.SearchSpanDBResponseItem
@@ -2003,7 +2019,7 @@ func (r *ClickHouseReader) SearchTraces(ctx context.Context, traceId string, spa
start := time.Now()
err := r.db.Select(ctx, &searchScanResponses, query, traceId)
err = r.db.Select(ctx, &searchScanResponses, query, params.TraceID)
zap.L().Info(query)
@@ -2032,9 +2048,9 @@ func (r *ClickHouseReader) SearchTraces(ctx context.Context, traceId string, spa
err = r.featureFlags.CheckFeature(model.SmartTraceDetail)
smartAlgoEnabled := err == nil
if len(searchScanResponses) > spanLimit && spanId != "" && smartAlgoEnabled {
if len(searchScanResponses) > params.SpansRenderLimit && smartAlgoEnabled {
start = time.Now()
searchSpansResult, err = smartTraceAlgorithm(searchSpanResponses, spanId, levelUp, levelDown, spanLimit)
searchSpansResult, err = smartTraceAlgorithm(searchSpanResponses, params.SpanID, params.LevelUp, params.LevelDown, params.SpansRenderLimit)
if err != nil {
return nil, err
}

View File

@@ -1363,13 +1363,13 @@ func (aH *APIHandler) getServicesList(w http.ResponseWriter, r *http.Request) {
func (aH *APIHandler) SearchTraces(w http.ResponseWriter, r *http.Request) {
traceId, spanId, levelUpInt, levelDownInt, err := ParseSearchTracesParams(r)
params, err := ParseSearchTracesParams(r)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, "Error reading params")
return
}
result, err := aH.reader.SearchTraces(r.Context(), traceId, spanId, levelUpInt, levelDownInt, 0, nil)
result, err := aH.reader.SearchTraces(r.Context(), params, nil)
if aH.HandleError(w, err, http.StatusBadRequest) {
return
}

View File

@@ -17,10 +17,12 @@ import (
promModel "github.com/prometheus/common/model"
"go.uber.org/multierr"
"go.signoz.io/signoz/ee/query-service/constants"
"go.signoz.io/signoz/pkg/query-service/app/metrics"
"go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
"go.signoz.io/signoz/pkg/query-service/auth"
"go.signoz.io/signoz/pkg/query-service/constants"
baseconstants "go.signoz.io/signoz/pkg/query-service/constants"
"go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/utils"
@@ -248,28 +250,46 @@ func parseGetServicesRequest(r *http.Request) (*model.GetServicesParams, error)
return postData, nil
}
func ParseSearchTracesParams(r *http.Request) (string, string, int, int, error) {
func ParseSearchTracesParams(r *http.Request) (*model.SearchTracesParams, error) {
vars := mux.Vars(r)
traceId := vars["traceId"]
spanId := r.URL.Query().Get("spanId")
levelUp := r.URL.Query().Get("levelUp")
levelDown := r.URL.Query().Get("levelDown")
if levelUp == "" || levelUp == "null" {
levelUp = "0"
params := &model.SearchTracesParams{}
params.TraceID = vars["traceId"]
params.SpanID = r.URL.Query().Get("spanId")
levelUpStr := r.URL.Query().Get("levelUp")
levelDownStr := r.URL.Query().Get("levelDown")
SpanRenderLimitStr := r.URL.Query().Get("spanRenderLimit")
if levelUpStr == "" || levelUpStr == "null" {
levelUpStr = "0"
}
if levelDown == "" || levelDown == "null" {
levelDown = "0"
if levelDownStr == "" || levelDownStr == "null" {
levelDownStr = "0"
}
if SpanRenderLimitStr == "" || SpanRenderLimitStr == "null" {
SpanRenderLimitStr = constants.SpanRenderLimitStr
}
levelUpInt, err := strconv.Atoi(levelUp)
levelUpInt, err := strconv.Atoi(levelUpStr)
if err != nil {
return "", "", 0, 0, err
return nil, err
}
levelDownInt, err := strconv.Atoi(levelDown)
levelDownInt, err := strconv.Atoi(levelDownStr)
if err != nil {
return "", "", 0, 0, err
return nil, err
}
return traceId, spanId, levelUpInt, levelDownInt, nil
SpanRenderLimitInt, err := strconv.Atoi(SpanRenderLimitStr)
if err != nil {
return nil, err
}
MaxSpansInTraceInt, err := strconv.Atoi(constants.MaxSpansInTraceStr)
if err != nil {
return nil, err
}
params.LevelUp = levelUpInt
params.LevelDown = levelDownInt
params.SpansRenderLimit = SpanRenderLimitInt
params.MaxSpansInTrace = MaxSpansInTraceInt
return params, nil
}
func DoesExistInSlice(item string, list []string) bool {
@@ -325,16 +345,16 @@ func parseFilteredSpansRequest(r *http.Request, aH *APIHandler) (*model.GetFilte
}
if len(postData.Order) != 0 {
if postData.Order != constants.Ascending && postData.Order != constants.Descending {
if postData.Order != baseconstants.Ascending && postData.Order != baseconstants.Descending {
return nil, errors.New("order param is not in correct format")
}
if postData.OrderParam != constants.Duration && postData.OrderParam != constants.Timestamp {
if postData.OrderParam != baseconstants.Duration && postData.OrderParam != baseconstants.Timestamp {
return nil, errors.New("order param is not in correct format")
}
if postData.OrderParam == constants.Duration && !aH.CheckFeature(constants.DurationSort) {
return nil, model.ErrFeatureUnavailable{Key: constants.DurationSort}
} else if postData.OrderParam == constants.Timestamp && !aH.CheckFeature(constants.TimestampSort) {
return nil, model.ErrFeatureUnavailable{Key: constants.TimestampSort}
if postData.OrderParam == baseconstants.Duration && !aH.CheckFeature(baseconstants.DurationSort) {
return nil, model.ErrFeatureUnavailable{Key: baseconstants.DurationSort}
} else if postData.OrderParam == baseconstants.Timestamp && !aH.CheckFeature(baseconstants.TimestampSort) {
return nil, model.ErrFeatureUnavailable{Key: baseconstants.TimestampSort}
}
}
tags, err := extractTagKeys(postData.Tags)
@@ -674,7 +694,7 @@ func parseTTLParams(r *http.Request) (*model.TTLParams, error) {
}
// Validate the type parameter
if typeTTL != constants.TraceTTL && typeTTL != constants.MetricsTTL && typeTTL != constants.LogsTTL {
if typeTTL != baseconstants.TraceTTL && typeTTL != baseconstants.MetricsTTL && typeTTL != baseconstants.LogsTTL {
return nil, fmt.Errorf("type param should be metrics|traces|logs, got %v", typeTTL)
}
@@ -713,7 +733,7 @@ func parseGetTTL(r *http.Request) (*model.GetTTLParams, error) {
return nil, fmt.Errorf("type param cannot be empty from the query")
} else {
// Validate the type parameter
if typeTTL != constants.TraceTTL && typeTTL != constants.MetricsTTL && typeTTL != constants.LogsTTL {
if typeTTL != baseconstants.TraceTTL && typeTTL != baseconstants.MetricsTTL && typeTTL != baseconstants.LogsTTL {
return nil, fmt.Errorf("type param should be metrics|traces|logs, got %v", typeTTL)
}
}

View File

@@ -52,7 +52,7 @@ type Reader interface {
GetNextPrevErrorIDs(ctx context.Context, params *model.GetErrorParams) (*model.NextPrevErrorIDs, *model.ApiError)
// Search Interfaces
SearchTraces(ctx context.Context, traceID string, spanId string, levelUp int, levelDown int, spanLimit int, smartTraceAlgorithm func(payload []model.SearchSpanResponseItem, targetSpanId string, levelUp int, levelDown int, spanLimit int) ([]model.SearchSpansResult, error)) (*[]model.SearchSpansResult, error)
SearchTraces(ctx context.Context, params *model.SearchTracesParams, smartTraceAlgorithm func(payload []model.SearchSpanResponseItem, targetSpanId string, levelUp int, levelDown int, spanLimit int) ([]model.SearchSpansResult, error)) (*[]model.SearchSpansResult, error)
// Setter Interfaces
SetTTL(ctx context.Context, ttlParams *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError)

View File

@@ -424,6 +424,15 @@ type GetFilteredSpanAggregatesParams struct {
End *time.Time
}
type SearchTracesParams struct {
TraceID string `json:"traceId"`
LevelUp int `json:"levelUp"`
LevelDown int `json:"levelDown"`
SpanID string `json:"spanId"`
SpansRenderLimit int `json:"spansRenderLimit"`
MaxSpansInTrace int `json:"maxSpansInTrace"`
}
type SpanFilterParams struct {
TraceID []string `json:"traceID"`
Status []string `json:"status"`