Compare commits

...

16 Commits

Author SHA1 Message Date
nikhilmantri0902
1948cdfaa4 chore: added for comparison across group by and merges debugging quantile exact 2025-08-19 01:16:24 +05:30
nikhilmantri0902
2fb7fe49ef chore: added for comparison across group by and merges debugging quantile exact 2025-08-19 00:56:43 +05:30
nikhilmantri0902
c4762045a6 chore: added debug functionality 2025-08-18 15:34:20 +05:30
nikhilmantri0902
1541734542 feat: match query building semantics for tags of 0 length 2025-08-18 15:26:12 +05:30
nikhilmantri0902
46e5b407f7 fix: added necessary 0 numCalls handling 2025-08-18 13:53:59 +05:30
nikhilmantri0902
f2c3946101 fix: added necessary 0 numCalls handling 2025-08-18 13:52:55 +05:30
nikhilmantri0902
4dca46de40 chore: added debugging funcs 2025-08-18 13:24:28 +05:30
nikhilmantri0902
6f420abe27 chore: removed comparison block 2025-08-18 13:09:12 +05:30
nikhilmantri0902
1d9b457af6 chore: removed comparison block 2025-08-18 12:40:51 +05:30
nikhilmantri0902
d437998750 chore: tuple issue fixed 2025-08-18 12:13:08 +05:30
nikhilmantri0902
e02d0cdd98 chore: tuple issue fixed 2025-08-18 11:44:45 +05:30
nikhilmantri0902
1ad4a6699a chore: added debug logs 2025-08-18 11:24:29 +05:30
nikhilmantri0902
00ae45022b fix: added filtering based on both name and serviceName pairs 2025-08-18 11:20:08 +05:30
nikhilmantri0902
6f4a965c6d fix: added filtering based on both name and serviceName pairs 2025-08-18 11:19:01 +05:30
nikhilmantri0902
4c29b03577 chore: added logs for debugging 2025-08-15 14:15:20 +05:30
nikhilmantri0902
ea1409bc4f fix: added query optimization 2025-08-14 20:12:48 +05:30

View File

@@ -385,7 +385,7 @@ func (r *ClickHouseReader) buildResourceSubQuery(tags []model.TagQueryParam, svc
return resourceSubQuery, nil
}
func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.GetServicesParams) (*[]model.ServiceItem, *model.ApiError) {
func (r *ClickHouseReader) GetServicesOG(ctx context.Context, queryParams *model.GetServicesParams) (*[]model.ServiceItem, *model.ApiError) {
if r.indexTable == "" {
return nil, &model.ApiError{Typ: model.ErrorExec, Err: ErrNoIndexTable}
@@ -428,7 +428,7 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G
query := fmt.Sprintf(
`SELECT
quantile(0.99)(duration_nano) as p99,
toFloat64(quantileExact(0.99)(duration_nano)) as p99,
avg(duration_nano) as avgDuration,
count(*) as numCalls
FROM %s.%s
@@ -510,6 +510,274 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G
return &serviceItems, nil
}
// GetServices returns aggregate statistics (p99 latency, average duration,
// call count, error count, derived call/error rates) for every service that
// has top-level operations in the requested time range. All services are
// aggregated in a single ClickHouse query: the allowed (operation, service)
// pairs are passed as two parallel array parameters and matched with a
// tuple-IN over arrayZip, instead of issuing one query per service.
func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.GetServicesParams) (*[]model.ServiceItem, *model.ApiError) {
	if r.indexTable == "" {
		return nil, &model.ApiError{Typ: model.ErrorExec, Err: ErrNoIndexTable}
	}
	topLevelOps, apiErr := r.GetTopLevelOperations(ctx, *queryParams.Start, *queryParams.End, nil)
	if apiErr != nil {
		return nil, apiErr
	}
	// Build parallel arrays of operations and their owning services; the two
	// slices are zipped into (name, serviceName) tuples in the WHERE clause.
	var ops []string
	var svcs []string
	serviceOperationsMap := make(map[string][]string)
	for svc, opsList := range *topLevelOps {
		// Cap operations to 1500 per service (same as original logic).
		cappedOps := opsList[:int(math.Min(1500, float64(len(opsList))))]
		serviceOperationsMap[svc] = cappedOps
		for _, op := range cappedOps {
			ops = append(ops, op)
			svcs = append(svcs, svc)
		}
	}
	// Restrict the resource-fingerprint subquery to exactly the services we
	// are going to report on.
	targetServices := make([]string, 0, len(*topLevelOps))
	for svc := range *topLevelOps {
		targetServices = append(targetServices, svc)
	}
	resourceSubQuery, err := r.buildResourceSubQueryForServices(queryParams.Tags, targetServices, *queryParams.Start, *queryParams.End)
	if err != nil {
		zap.L().Error("Error building resource subquery", zap.Error(err))
		return nil, &model.ApiError{Typ: model.ErrorExec, Err: err}
	}
	// Single optimized query; arrayZip(@ops, @svcs) builds the set of allowed
	// (name, serviceName) tuples from the two named array parameters.
	query := fmt.Sprintf(`
		SELECT
			resource_string_service$$name AS serviceName,
			toFloat64(quantileExact(0.99)(duration_nano)) AS p99,
			avg(duration_nano) AS avgDuration,
			count(*) AS numCalls,
			countIf(statusCode = 2) AS numErrors
		FROM %s.%s
		WHERE (name, resource_string_service$$name) IN arrayZip(@ops, @svcs)
			AND timestamp >= @start
			AND timestamp <= @end
			AND ts_bucket_start >= @start_bucket
			AND ts_bucket_start <= @end_bucket
			AND (resource_fingerprint GLOBAL IN %s)
		GROUP BY serviceName
		ORDER BY numCalls DESC`,
		r.TraceDB, r.traceTableName, resourceSubQuery,
	)
	args := []interface{}{
		clickhouse.Named("start", strconv.FormatInt(queryParams.Start.UnixNano(), 10)),
		clickhouse.Named("end", strconv.FormatInt(queryParams.End.UnixNano(), 10)),
		// Bucket bounds are in seconds; the start is padded by 30 minutes to
		// pick up spans written into the previous time bucket.
		clickhouse.Named("start_bucket", strconv.FormatInt(queryParams.Start.Unix()-1800, 10)),
		clickhouse.Named("end_bucket", strconv.FormatInt(queryParams.End.Unix(), 10)),
		clickhouse.Named("ops", ops),
		clickhouse.Named("svcs", svcs),
	}
	rows, err := r.db.Query(ctx, query, args...)
	if err != nil {
		zap.L().Error("Error executing optimized services query", zap.Error(err))
		return nil, &model.ApiError{Typ: model.ErrorExec, Err: err}
	}
	defer rows.Close()
	serviceItems := []model.ServiceItem{}
	for rows.Next() {
		var serviceItem model.ServiceItem
		if err := rows.ScanStruct(&serviceItem); err != nil {
			zap.L().Error("Error scanning service item", zap.Error(err))
			continue
		}
		// Skip services with zero calls (match original behavior).
		if serviceItem.NumCalls == 0 {
			continue
		}
		// Attach the (possibly capped) top-level operations as a data warning.
		if topOps, exists := serviceOperationsMap[serviceItem.ServiceName]; exists {
			serviceItem.DataWarning = model.DataWarning{
				TopLevelOps: topOps,
			}
		}
		// Derived fields: calls per period unit, error percentage.
		serviceItem.CallRate = float64(serviceItem.NumCalls) / float64(queryParams.Period)
		if serviceItem.NumCalls > 0 {
			serviceItem.ErrorRate = float64(serviceItem.NumErrors) * 100 / float64(serviceItem.NumCalls)
		}
		serviceItems = append(serviceItems, serviceItem)
	}
	if err = rows.Err(); err != nil {
		zap.L().Error("Error iterating over service results", zap.Error(err))
		return nil, &model.ApiError{Typ: model.ErrorExec, Err: err}
	}
	return &serviceItems, nil
}
// buildResourceSubQueryForServices builds a resource-fingerprint subquery
// scoped to the given target services, additionally applying the
// user-supplied resource tag filters. It returns an error when no target
// services are provided or when a tag uses an unsupported operator.
func (r *ClickHouseReader) buildResourceSubQueryForServices(tags []model.TagQueryParam, targetServices []string, start, end time.Time) (string, error) {
	if len(targetServices) == 0 {
		return "", fmt.Errorf("no target services provided")
	}
	// Convert tags to filter items. With zero tags this loop is a no-op, so
	// the filter set carries only the service.name restriction below — the
	// same result the previous dedicated len(tags)==0 branch produced.
	filterSet := v3.FilterSet{}
	for _, tag := range tags {
		// Skip the collector id as we don't add it to traces.
		if tag.Key == "signoz.collector.id" {
			continue
		}
		var it v3.FilterItem
		it.Key = v3.AttributeKey{
			Key:      tag.Key,
			DataType: v3.AttributeKeyDataTypeString,
			Type:     v3.AttributeKeyTypeResource,
		}
		switch tag.Operator {
		case model.NotInOperator:
			it.Operator = v3.FilterOperatorNotIn
			it.Value = tag.StringValues
		case model.InOperator:
			it.Operator = v3.FilterOperatorIn
			it.Value = tag.StringValues
		default:
			return "", fmt.Errorf("operator %s not supported", tag.Operator)
		}
		filterSet.Items = append(filterSet.Items, it)
	}
	// Always limit the subquery to the target services.
	filterSet.Items = append(filterSet.Items, v3.FilterItem{
		Key: v3.AttributeKey{
			Key:      "service.name",
			DataType: v3.AttributeKeyDataTypeString,
			Type:     v3.AttributeKeyTypeResource,
		},
		Operator: v3.FilterOperatorIn,
		Value:    targetServices,
	})
	// Build the resource subquery; the start is padded by 30 minutes to
	// match the ts_bucket_start padding used by the callers.
	resourceSubQuery, err := resource.BuildResourceSubQuery(
		r.TraceDB,
		r.traceResourceTableV3,
		start.Unix()-1800,
		end.Unix(),
		&filterSet,
		[]v3.AttributeKey{},
		v3.AttributeKey{},
		false)
	if err != nil {
		zap.L().Error("Error building resource subquery for services", zap.Error(err))
		return "", err
	}
	return resourceSubQuery, nil
}
// buildServiceInClause renders the service names as a comma-separated list
// of single-quoted SQL string literals, suitable for splicing into an
// IN (...) clause. Backslashes are escaped before single quotes so that a
// name ending in a backslash (or containing \') cannot neutralize the
// closing quote and break out of the literal.
func (r *ClickHouseReader) buildServiceInClause(services []string) string {
	quotedServices := make([]string, 0, len(services))
	for _, svc := range services {
		// Order matters: escape backslashes first, then single quotes.
		escapedSvc := strings.ReplaceAll(svc, `\`, `\\`)
		escapedSvc = strings.ReplaceAll(escapedSvc, "'", `\'`)
		quotedServices = append(quotedServices, "'"+escapedSvc+"'")
	}
	return strings.Join(quotedServices, ", ")
}
func getStatusFilters(query string, statusParams []string, excludeMap map[string]struct{}) string {
// there are only two possible status values, and selecting both is equivalent to selecting none
if _, ok := excludeMap["status"]; ok {
@@ -529,7 +797,6 @@ func getStatusFilters(query string, statusParams []string, excludeMap map[string
}
return query
}
func createTagQueryFromTagQueryParams(queryParams []model.TagQueryParam) []model.TagQuery {
tags := []model.TagQuery{}
for _, tag := range queryParams {
@@ -686,7 +953,6 @@ func addExistsOperator(item model.TagQuery, tagMapType string, not bool) (string
}
return fmt.Sprintf(" AND %s (%s)", notStr, strings.Join(tagOperatorPair, " OR ")), args
}
func (r *ClickHouseReader) GetEntryPointOperations(ctx context.Context, queryParams *model.GetTopOperationsParams) (*[]model.TopOperationsItem, error) {
// Step 1: Get top operations for the given service
topOps, err := r.GetTopOperations(ctx, queryParams)
@@ -755,9 +1021,9 @@ func (r *ClickHouseReader) GetTopOperations(ctx context.Context, queryParams *mo
query := fmt.Sprintf(`
SELECT
quantile(0.5)(durationNano) as p50,
quantile(0.95)(durationNano) as p95,
quantile(0.99)(durationNano) as p99,
toFloat64(quantileExact(0.5)(durationNano)) as p50,
toFloat64(quantileExact(0.95)(durationNano)) as p95,
toFloat64(quantileExact(0.99)(durationNano)) as p99,
COUNT(*) as numCalls,
countIf(status_code=2) as errorCount,
name
@@ -1239,11 +1505,11 @@ func (r *ClickHouseReader) GetDependencyGraph(ctx context.Context, queryParams *
SELECT
src as parent,
dest as child,
result[1] AS p50,
result[2] AS p75,
result[3] AS p90,
result[4] AS p95,
result[5] AS p99,
toFloat64(result[1]) AS p50,
toFloat64(result[2]) AS p75,
toFloat64(result[3]) AS p90,
toFloat64(result[4]) AS p95,
toFloat64(result[5]) AS p99,
sum(total_count) as callCount,
sum(total_count)/ @duration AS callRate,
sum(error_count)/sum(total_count) * 100 as errorRate
@@ -1275,7 +1541,6 @@ func getLocalTableName(tableName string) string {
return tableNameSplit[0] + "." + strings.Split(tableNameSplit[1], "distributed_")[1]
}
func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
// uuid is used as transaction id
uuidWithHyphen := uuid.New()
@@ -1416,7 +1681,6 @@ func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params
}(ttlPayload)
return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil
}
func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
// uuid is used as transaction id
uuidWithHyphen := uuid.New()
@@ -2057,7 +2321,6 @@ func (r *ClickHouseReader) ListErrors(ctx context.Context, queryParams *model.Li
return &getErrorResponses, nil
}
func (r *ClickHouseReader) CountErrors(ctx context.Context, queryParams *model.CountErrorsParams) (uint64, *model.ApiError) {
var errorCount uint64
@@ -2169,7 +2432,6 @@ func (r *ClickHouseReader) GetNextPrevErrorIDs(ctx context.Context, queryParams
return &getNextPrevErrorIDsResponse, nil
}
func (r *ClickHouseReader) getNextErrorID(ctx context.Context, queryParams *model.GetErrorParams) (string, time.Time, *model.ApiError) {
var getNextErrorIDReponse []model.NextPrevErrorIDsDBResponse
@@ -2830,7 +3092,6 @@ func (r *ClickHouseReader) GetMetricAttributeKeys(ctx context.Context, req *v3.F
return &response, nil
}
func (r *ClickHouseReader) GetMeterAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) {
var query string
var err error
@@ -2905,7 +3166,6 @@ func (r *ClickHouseReader) GetMetricAttributeValues(ctx context.Context, req *v3
return &attributeValues, nil
}
func (r *ClickHouseReader) GetMetricMetadata(ctx context.Context, orgID valuer.UUID, metricName, serviceName string) (*v3.MetricMetadataResponse, error) {
unixMilli := common.PastDayRoundOff()
@@ -3577,7 +3837,6 @@ func readRow(vars []interface{}, columnNames []string, countOfNumberCols int) ([
}
return groupBy, groupAttributes, groupAttributesArray, nil
}
func readRowsForTimeSeriesResult(rows driver.Rows, vars []interface{}, columnNames []string, countOfNumberCols int) ([]*v3.Series, error) {
// when groupBy is applied, each combination of cartesian product
// of attribute values is a separate series. Each item in seriesToPoints
@@ -4373,7 +4632,6 @@ func (r *ClickHouseReader) ReadRuleStateHistoryByRuleID(
return timeline, nil
}
func (r *ClickHouseReader) ReadRuleStateHistoryTopContributorsByRuleID(
ctx context.Context, ruleID string, params *model.QueryRuleStateHistory) ([]model.RuleStateHistoryContributor, error) {
query := fmt.Sprintf(`SELECT
@@ -4962,7 +5220,6 @@ func (r *ClickHouseReader) GetActiveTimeSeriesForMetricName(ctx context.Context,
}
return timeSeries, nil
}
func (r *ClickHouseReader) ListSummaryMetrics(ctx context.Context, orgID valuer.UUID, req *metrics_explorer.SummaryListMetricsRequest) (*metrics_explorer.SummaryListMetricsResponse, *model.ApiError) {
var args []interface{}
@@ -5180,7 +5437,6 @@ func (r *ClickHouseReader) ListSummaryMetrics(ctx context.Context, orgID valuer.
return &response, nil
}
func (r *ClickHouseReader) GetMetricsTimeSeriesPercentage(ctx context.Context, req *metrics_explorer.TreeMapMetricsRequest) (*[]metrics_explorer.TreeMapResponseItem, *model.ApiError) {
var args []interface{}
@@ -5760,7 +6016,6 @@ func (r *ClickHouseReader) GetInspectMetrics(ctx context.Context, req *metrics_e
Series: &seriesList,
}, nil
}
func (r *ClickHouseReader) GetInspectMetricsFingerprints(ctx context.Context, attributes []string, req *metrics_explorer.InspectMetricsRequest) ([]string, *model.ApiError) {
// Build dynamic key selections and JSON extracts
var jsonExtracts []string
@@ -5933,7 +6188,6 @@ func (r *ClickHouseReader) CheckForLabelsInMetric(ctx context.Context, metricNam
}
return hasLE, nil
}
func (r *ClickHouseReader) GetUpdatedMetricsMetadata(ctx context.Context, orgID valuer.UUID, metricNames ...string) (map[string]*model.UpdateMetricsMetadata, *model.ApiError) {
cachedMetadata := make(map[string]*model.UpdateMetricsMetadata)
var missingMetrics []string