Compare commits

...

1 Commits

Author SHA1 Message Date
nikhilmantri0902
824ca62c61 fix: changed the querying mechanism from using @parameterization to $ parameterization 2025-10-07 12:45:54 +05:30

View File

@@ -323,6 +323,7 @@ func (r *ClickHouseReader) GetTopLevelOperations(ctx context.Context, start, end
}
operations[serviceName] = append(operations[serviceName], name)
}
fmt.Printf("===> operations :%+v\n", operations)
return &operations, nil
}
@@ -432,23 +433,23 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G
avg(duration_nano) as avgDuration,
count(*) as numCalls
FROM %s.%s
WHERE resource_string_service$$name = @serviceName AND name In @names AND timestamp>= @start AND timestamp<= @end`,
WHERE resource_string_service$$name = $1 AND name In $2 AND timestamp>= $3 AND timestamp<= $4`,
r.TraceDB, r.traceTableName,
)
errorQuery := fmt.Sprintf(
`SELECT
count(*) as numErrors
FROM %s.%s
WHERE resource_string_service$$name = @serviceName AND name In @names AND timestamp>= @start AND timestamp<= @end AND statusCode=2`,
WHERE resource_string_service$$name = $1 AND name In $2 AND timestamp>= $3 AND timestamp<= $4 AND statusCode=2`,
r.TraceDB, r.traceTableName,
)
args := []interface{}{}
args = append(args,
clickhouse.Named("start", strconv.FormatInt(queryParams.Start.UnixNano(), 10)),
clickhouse.Named("end", strconv.FormatInt(queryParams.End.UnixNano(), 10)),
clickhouse.Named("serviceName", svc),
clickhouse.Named("names", ops),
svc,
ops,
strconv.FormatInt(queryParams.Start.UnixNano(), 10),
strconv.FormatInt(queryParams.End.UnixNano(), 10),
)
resourceSubQuery, err := r.buildResourceSubQuery(queryParams.Tags, svc, *queryParams.Start, *queryParams.End)
@@ -456,43 +457,72 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G
zap.L().Error("Error in processing sql query", zap.Error(err))
return
}
query += `
AND (
resource_fingerprint GLOBAL IN ` +
resourceSubQuery +
`) AND ts_bucket_start >= @start_bucket AND ts_bucket_start <= @end_bucket`
args = append(args,
clickhouse.Named("start_bucket", strconv.FormatInt(queryParams.Start.Unix()-1800, 10)),
clickhouse.Named("end_bucket", strconv.FormatInt(queryParams.End.Unix(), 10)),
)
err = r.db.QueryRow(
ctx,
query,
args...,
).ScanStruct(&serviceItem)
if serviceItem.NumCalls == 0 {
return
}
if err != nil {
zap.L().Error("Error in processing sql query", zap.Error(err))
return
}
`) AND ts_bucket_start >= $5 AND ts_bucket_start <= $6`
errorQuery += `
AND (
resource_fingerprint GLOBAL IN ` +
resourceSubQuery +
`) AND ts_bucket_start >= @start_bucket AND ts_bucket_start <= @end_bucket`
`) AND ts_bucket_start >= $5 AND ts_bucket_start <= $6`
err = r.db.QueryRow(ctx, errorQuery, args...).Scan(&numErrors)
args = append(args,
strconv.FormatInt(queryParams.Start.Unix()-1800, 10),
strconv.FormatInt(queryParams.End.Unix(), 10),
)
fmt.Printf("\n===> resourceSubQuery :%+v\n", resourceSubQuery)
fmt.Printf("\n===> query :%+v\n", query)
fmt.Printf("\n===> errorQuery :%+v\n", errorQuery)
fmt.Printf("\n===> args :%+v\n", args)
rows, err := r.db.Query(ctx, query, args...)
if err != nil {
zap.L().Error("Error in processing sql query", zap.Error(err))
return
}
defer rows.Close()
found := false
for rows.Next() {
err := rows.ScanStruct(&serviceItem)
if err != nil {
zap.L().Error("Error scanning row", zap.Error(err))
return
}
found = true
}
if err := rows.Err(); err != nil {
zap.L().Error("Error iterating rows", zap.Error(err))
return
}
if !found || serviceItem.NumCalls == 0 {
return
}
errorRows, err := r.db.Query(ctx, errorQuery, args...)
if err != nil {
zap.L().Error("Error in processing sql query", zap.Error(err))
return
}
defer errorRows.Close()
for errorRows.Next() {
err := errorRows.Scan(&numErrors)
if err != nil {
zap.L().Error("Error scanning error row", zap.Error(err))
return
}
}
if err := errorRows.Err(); err != nil {
zap.L().Error("Error iterating error rows", zap.Error(err))
return
}
serviceItem.ServiceName = svc
serviceItem.NumErrors = numErrors