Compare commits
2 Commits: json-plan ... issue-8964

| Author | SHA1 | Date |
|---|---|---|
|  | 569a7fa4a8 |  |
|  | a4699b1fed |  |
@@ -57,7 +57,7 @@ func NewAPIHandler(opts APIHandlerOptions, signoz *signoz.SigNoz) (*APIHandler,
		FluxInterval:    opts.FluxInterval,
		AlertmanagerAPI: alertmanager.NewAPI(signoz.Alertmanager),
		LicensingAPI:    httplicensing.NewLicensingAPI(signoz.Licensing),
		FieldsAPI:       fields.NewAPI(signoz.Instrumentation.ToProviderSettings(), signoz.TelemetryStore),
		FieldsAPI:       fields.NewAPI(signoz.Instrumentation.ToProviderSettings(), signoz.TelemetryStore, signoz.Cache),
		Signoz:          signoz,
		QuerierAPI:      querierAPI.NewAPI(signoz.Instrumentation.ToProviderSettings(), signoz.Querier, signoz.Analytics),
	})

@@ -5,6 +5,7 @@ import (
	"io"
	"net/http"

	"github.com/SigNoz/signoz/pkg/cache"
	"github.com/SigNoz/signoz/pkg/factory"
	"github.com/SigNoz/signoz/pkg/http/render"
	"github.com/SigNoz/signoz/pkg/telemetrylogs"

@@ -25,10 +26,12 @@ type API struct {
func NewAPI(
	settings factory.ProviderSettings,
	telemetryStore telemetrystore.TelemetryStore,
	cache cache.Cache,
) *API {
	telemetryMetadataStore := telemetrymetadata.NewTelemetryMetaStore(
		settings,
		telemetryStore,
		cache,
		telemetrytraces.DBName,
		telemetrytraces.TagAttributesV2TableName,
		telemetrytraces.SpanAttributesKeysTblName,

@@ -230,7 +230,7 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
	var metricTemporality map[string]metrictypes.Temporality
	if len(metricNames) > 0 {
		var err error
		metricTemporality, err = q.metadataStore.FetchTemporalityMulti(ctx, metricNames...)
		metricTemporality, err = q.metadataStore.FetchTemporalityMulti(ctx, orgID, metricNames...)
		if err != nil {
			q.logger.WarnContext(ctx, "failed to fetch metric temporality", "error", err, "metrics", metricNames)
			// Continue without temporality - statement builder will handle unspecified

@@ -48,6 +48,7 @@ func newProvider(
	telemetryMetadataStore := telemetrymetadata.NewTelemetryMetaStore(
		settings,
		telemetryStore,
		cache,
		telemetrytraces.DBName,
		telemetrytraces.TagAttributesV2TableName,
		telemetrytraces.SpanAttributesKeysTblName,

@@ -117,7 +117,7 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz, jwt *authtypes.JWT)
		FluxInterval:    config.Querier.FluxInterval,
		AlertmanagerAPI: alertmanager.NewAPI(signoz.Alertmanager),
		LicensingAPI:    nooplicensing.NewLicenseAPI(),
		FieldsAPI:       fields.NewAPI(signoz.Instrumentation.ToProviderSettings(), signoz.TelemetryStore),
		FieldsAPI:       fields.NewAPI(signoz.Instrumentation.ToProviderSettings(), signoz.TelemetryStore, signoz.Cache),
		Signoz:          signoz,
		QuerierAPI:      querierAPI.NewAPI(signoz.Instrumentation.ToProviderSettings(), signoz.Querier, signoz.Analytics),
	})

@@ -164,6 +164,7 @@ func (b *resourceFilterStatementBuilder[T]) addConditions(
		FullTextColumn:     b.fullTextColumn,
		JsonBodyPrefix:     b.jsonBodyPrefix,
		JsonKeyToKey:       b.jsonKeyToKey,
		OnlyResourceFilter: true, // Only process resource terms
		SkipFullTextFilter: true,
		SkipFunctionCalls:  true,
		// there is no need for "key" not found error for resource filtering

@@ -18,6 +18,199 @@ import (
var searchTroubleshootingGuideURL = "https://signoz.io/docs/userguide/search-troubleshooting/"

// BooleanExpression represents a boolean expression with proper evaluation context
type BooleanExpression struct {
	SQL     string
	IsTrue  bool
	IsEmpty bool
}

// NewBooleanExpression creates a BooleanExpression from SQL
func NewBooleanExpression(sql string) BooleanExpression {
	return BooleanExpression{
		SQL:     sql,
		IsTrue:  sql == "true",
		IsEmpty: sql == "",
	}
}

// booleanEvaluatingVisitor is a specialized visitor for resource filter context
// that properly applies boolean algebra during tree traversal
type booleanEvaluatingVisitor struct {
	*filterExpressionVisitor
}

func newBooleanEvaluatingVisitor(opts FilterExprVisitorOpts) *booleanEvaluatingVisitor {
	return &booleanEvaluatingVisitor{
		filterExpressionVisitor: newFilterExpressionVisitor(opts),
	}
}

// Visit dispatches to boolean-aware visit methods
func (v *booleanEvaluatingVisitor) Visit(tree antlr.ParseTree) any {
	if tree == nil {
		return NewBooleanExpression("")
	}

	switch t := tree.(type) {
	case *grammar.QueryContext:
		return v.VisitQuery(t)
	case *grammar.ExpressionContext:
		return v.VisitExpression(t)
	case *grammar.OrExpressionContext:
		return v.VisitOrExpression(t)
	case *grammar.AndExpressionContext:
		return v.VisitAndExpression(t)
	case *grammar.UnaryExpressionContext:
		return v.VisitUnaryExpression(t)
	case *grammar.PrimaryContext:
		return v.VisitPrimary(t)
	default:
		// For leaf nodes, delegate to original visitor and wrap result
		result := v.filterExpressionVisitor.Visit(tree)
		if sql, ok := result.(string); ok {
			return NewBooleanExpression(sql)
		}
		return NewBooleanExpression("")
	}
}

func (v *booleanEvaluatingVisitor) VisitQuery(ctx *grammar.QueryContext) any {
	return v.Visit(ctx.Expression())
}

func (v *booleanEvaluatingVisitor) VisitExpression(ctx *grammar.ExpressionContext) any {
	return v.Visit(ctx.OrExpression())
}

func (v *booleanEvaluatingVisitor) VisitOrExpression(ctx *grammar.OrExpressionContext) any {
	andExpressions := ctx.AllAndExpression()

	var result BooleanExpression
	hasTrue := false
	hasEmpty := false

	for i, expr := range andExpressions {
		exprResult := v.Visit(expr).(BooleanExpression)
		if exprResult.IsTrue {
			hasTrue = true
		}
		if exprResult.IsEmpty {
			hasEmpty = true
		}

		if i == 0 {
			result = exprResult
		} else {
			if result.IsEmpty {
				result = exprResult
			} else if !exprResult.IsEmpty {
				sql := v.builder.Or(result.SQL, exprResult.SQL)
				result = NewBooleanExpression(sql)
			}
		}
	}

	// In resource filter context, if any operand is empty (meaning "include all resources"),
	// the entire OR should be empty (include all resources)
	if hasEmpty && v.onlyResourceFilter {
		result.IsEmpty = true
		result.IsTrue = true
		result.SQL = ""
	} else if hasTrue {
		// Mark as always true if any operand is true, but preserve the SQL structure
		result.IsTrue = true
	}

	return result
}

func (v *booleanEvaluatingVisitor) VisitAndExpression(ctx *grammar.AndExpressionContext) any {
	unaryExpressions := ctx.AllUnaryExpression()

	var result BooleanExpression
	allTrue := true

	for i, expr := range unaryExpressions {
		exprResult := v.Visit(expr).(BooleanExpression)
		if !exprResult.IsTrue && !exprResult.IsEmpty {
			allTrue = false
		}

		if i == 0 {
			result = exprResult
		} else {
			// Apply boolean AND logic
			if exprResult.IsTrue {
				// A AND true = A, continue with result
				continue
			}
			if result.IsTrue {
				result = exprResult
			} else if result.IsEmpty {
				result = exprResult
			} else if !exprResult.IsEmpty {
				sql := v.builder.And(result.SQL, exprResult.SQL)
				result = NewBooleanExpression(sql)
			}
		}
	}

	// If all terms were "true", mark the result as always true
	if allTrue && len(unaryExpressions) > 0 {
		result.IsTrue = true
		if result.SQL == "" {
			result.SQL = "true"
		}
	}

	return result
}

func (v *booleanEvaluatingVisitor) VisitUnaryExpression(ctx *grammar.UnaryExpressionContext) any {
	result := v.Visit(ctx.Primary()).(BooleanExpression)

	if ctx.NOT() != nil {
		// Apply NOT logic with resource filter context awareness
		if v.onlyResourceFilter {
			if result.IsTrue {
				return NewBooleanExpression("") // NOT(true) = include all resources
			}
			if result.IsEmpty {
				return NewBooleanExpression("") // NOT(empty) = include all resources
			}
		}

		sql := fmt.Sprintf("NOT (%s)", result.SQL)
		return NewBooleanExpression(sql)
	}

	return result
}

func (v *booleanEvaluatingVisitor) VisitPrimary(ctx *grammar.PrimaryContext) any {
	if ctx.OrExpression() != nil {
		result := v.Visit(ctx.OrExpression()).(BooleanExpression)
		// If no boolean simplification happened, preserve original parentheses structure
		if !result.IsEmpty && !result.IsTrue {
			// Use original visitor to get proper parentheses structure
			originalSQL := v.filterExpressionVisitor.Visit(ctx)
			if sql, ok := originalSQL.(string); ok && sql != "" {
				return NewBooleanExpression(sql)
			}
			result.SQL = fmt.Sprintf("(%s)", result.SQL)
		}
		return result
	}

	// For other cases, delegate to original visitor
	sqlResult := v.filterExpressionVisitor.Visit(ctx)
	if sql, ok := sqlResult.(string); ok {
		return NewBooleanExpression(sql)
	}
	return NewBooleanExpression("")
}

// filterExpressionVisitor implements the FilterQueryVisitor interface
// to convert the parsed filter expressions into ClickHouse WHERE clause
type filterExpressionVisitor struct {

@@ -34,6 +227,7 @@ type filterExpressionVisitor struct {
	jsonBodyPrefix     string
	jsonKeyToKey       qbtypes.JsonKeyToFieldFunc
	skipResourceFilter bool
	onlyResourceFilter bool
	skipFullTextFilter bool
	skipFunctionCalls  bool
	ignoreNotFoundKeys bool

@@ -52,6 +246,7 @@ type FilterExprVisitorOpts struct {
	JsonBodyPrefix     string
	JsonKeyToKey       qbtypes.JsonKeyToFieldFunc
	SkipResourceFilter bool
	OnlyResourceFilter bool // Only process resource terms, skip non-resource terms
	SkipFullTextFilter bool
	SkipFunctionCalls  bool
	IgnoreNotFoundKeys bool

@@ -70,6 +265,7 @@ func newFilterExpressionVisitor(opts FilterExprVisitorOpts) *filterExpressionVis
		jsonBodyPrefix:     opts.JsonBodyPrefix,
		jsonKeyToKey:       opts.JsonKeyToKey,
		skipResourceFilter: opts.SkipResourceFilter,
		onlyResourceFilter: opts.OnlyResourceFilter,
		skipFullTextFilter: opts.SkipFullTextFilter,
		skipFunctionCalls:  opts.SkipFunctionCalls,
		ignoreNotFoundKeys: opts.IgnoreNotFoundKeys,

@@ -160,6 +356,31 @@ func PrepareWhereClause(query string, opts FilterExprVisitorOpts) (*PreparedWher
		cond = "true"
	}

	// In resource filter context, apply robust boolean evaluation only when needed
	if opts.OnlyResourceFilter {
		// Check if the condition contains patterns that need boolean simplification
		// We need boolean evaluation when:
		// 1. Expression contains " true" (indicating simplified non-resource terms)
		// 2. Expression is exactly "true"
		// 3. Expression contains "NOT" with true values that need simplification
		needsBooleanEval := strings.Contains(cond, " true") ||
			cond == "true" ||
			(strings.Contains(cond, "NOT") && strings.Contains(cond, "true"))

		if needsBooleanEval {
			// Re-parse and evaluate with boolean algebra
			boolVisitor := newBooleanEvaluatingVisitor(opts)
			boolResult := boolVisitor.Visit(tree)
			if boolExpr, ok := boolResult.(BooleanExpression); ok {
				if boolExpr.IsEmpty {
					cond = "true" // Empty means include all resources
				} else {
					cond = boolExpr.SQL
				}
			}
		}
	}

	whereClause := sqlbuilder.NewWhereClause().AddWhereExpr(visitor.builder.Args, cond)

	return &PreparedWhereClause{whereClause, visitor.warnings, visitor.mainWarnURL}, nil

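The resource-filter pass above rewrites non-resource comparisons to "true" and then simplifies the resulting boolean expression so that the resource CTE only keeps constraints that can actually narrow the resource set. Below is a standalone, illustrative sketch of that algebra; the term, and, and or helpers are hypothetical names used only for this example, and both "true" and "" are treated here as "no resource constraint", which is the intent stated in the comments of the diff above.

package main

import "fmt"

// term models a simplified predicate in resource-filter-only mode:
// a non-resource comparison collapses to "true", a resource comparison
// keeps its SQL, and "" means "no filter / include all resources".
type term struct{ sql string }

func and(a, b term) term {
	switch {
	case b.sql == "true" || b.sql == "": // X AND true = X
		return a
	case a.sql == "true" || a.sql == "":
		return b
	default:
		return term{fmt.Sprintf("(%s AND %s)", a.sql, b.sql)}
	}
}

func or(a, b term) term {
	// If either side places no constraint on resources, the OR as a whole
	// cannot restrict the resource set, so it becomes "include all".
	if a.sql == "true" || a.sql == "" || b.sql == "true" || b.sql == "" {
		return term{""}
	}
	return term{fmt.Sprintf("(%s OR %s)", a.sql, b.sql)}
}

func main() {
	resource := term{"resources_string['service.name'] = 'redis'"}
	nonResource := term{"true"} // e.g. http.method = 'GET' in resource-filter context

	fmt.Println(and(resource, nonResource).sql) // resource condition survives
	fmt.Println(or(resource, nonResource).sql)  // "" -> WHERE true (all resources)
}
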
@@ -226,22 +447,23 @@ func (v *filterExpressionVisitor) VisitExpression(ctx *grammar.ExpressionContext
func (v *filterExpressionVisitor) VisitOrExpression(ctx *grammar.OrExpressionContext) any {
	andExpressions := ctx.AllAndExpression()

	andExpressionConditions := make([]string, len(andExpressions))
	for i, expr := range andExpressions {
	validConditions := []string{}

	for _, expr := range andExpressions {
		if condExpr, ok := v.Visit(expr).(string); ok && condExpr != "" {
			andExpressionConditions[i] = condExpr
			validConditions = append(validConditions, condExpr)
		}
	}

	if len(andExpressionConditions) == 0 {
	if len(validConditions) == 0 {
		return ""
	}

	if len(andExpressionConditions) == 1 {
		return andExpressionConditions[0]
	if len(validConditions) == 1 {
		return validConditions[0]
	}

	return v.builder.Or(andExpressionConditions...)
	return v.builder.Or(validConditions...)
}

// VisitAndExpression handles AND expressions

@@ -272,6 +494,17 @@ func (v *filterExpressionVisitor) VisitUnaryExpression(ctx *grammar.UnaryExpress
	// Check if this is a NOT expression
	if ctx.NOT() != nil {
		// In resource filter context, handle NOT specially
		if v.onlyResourceFilter {
			// NOT(true) means NOT(all non-resource terms) which means "include all resources"
			if result == "true" {
				return "" // No filtering = include all resources
			}
			// NOT(empty) should return empty (no filtering)
			if result == "" {
				return ""
			}
		}
		return fmt.Sprintf("NOT (%s)", result)
	}

@@ -283,7 +516,7 @@ func (v *filterExpressionVisitor) VisitPrimary(ctx *grammar.PrimaryContext) any
	if ctx.OrExpression() != nil {
		// This is a parenthesized expression
		if condExpr, ok := v.Visit(ctx.OrExpression()).(string); ok && condExpr != "" {
		return fmt.Sprintf("(%s)", v.Visit(ctx.OrExpression()).(string))
			return fmt.Sprintf("(%s)", condExpr)
		}
		return ""
	} else if ctx.Comparison() != nil {

@@ -365,6 +598,22 @@ func (v *filterExpressionVisitor) VisitComparison(ctx *grammar.ComparisonContext
		}
	}

	// this is used to only process resource terms in resource filter context
	if v.onlyResourceFilter {
		filteredKeys := []*telemetrytypes.TelemetryFieldKey{}
		for _, key := range keys {
			if key.FieldContext == telemetrytypes.FieldContextResource {
				filteredKeys = append(filteredKeys, key)
			}
		}
		keys = filteredKeys
		if len(keys) == 0 {
			// For non-resource terms in resource filter context, return "true"
			// This ensures OR expressions work correctly (e.g., resource OR non-resource)
			return "true"
		}
	}

	// Handle EXISTS specially
	if ctx.EXISTS() != nil {
		op := qbtypes.FilterOperatorExists

@@ -142,6 +142,111 @@ func TestStatementBuilderTimeSeries(t *testing.T) {
			},
			expectedErr: nil,
		},
		{
			name:        "Time series with NOT predicate containing only non-resource terms",
			requestType: qbtypes.RequestTypeTimeSeries,
			query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
				Signal:       telemetrytypes.SignalLogs,
				StepInterval: qbtypes.Step{Duration: 60 * time.Second},
				Aggregations: []qbtypes.LogAggregation{
					{
						Expression: "count()",
					},
				},
				Filter: &qbtypes.Filter{
					Expression: "NOT (message CONTAINS 'foo' AND hasToken(body, 'bar'))",
				},
			},
			expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND NOT ((((LOWER(attributes_string['message']) LIKE LOWER(?) AND mapContains(attributes_string, 'message') = ?) AND hasToken(LOWER(body), LOWER(?))))) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? GROUP BY ts",
Args: []any{uint64(1747945619), uint64(1747983448), "%foo%", true, "bar", "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448)},
			},
			expectedErr: nil,
		},
		{
			name:        "Time series with NOT OR mixed resource/non-resource terms",
			requestType: qbtypes.RequestTypeTimeSeries,
			query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
				Signal:       telemetrytypes.SignalLogs,
				StepInterval: qbtypes.Step{Duration: 60 * time.Second},
				Aggregations: []qbtypes.LogAggregation{
					{
						Expression: "count()",
					},
				},
				Filter: &qbtypes.Filter{
					Expression: "NOT (service.name = 'redis-manual' OR http.method = 'GET')",
				},
			},
			expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND NOT ((((resources_string['service.name'] = ? AND mapContains(resources_string, 'service.name') = ?) OR (attributes_string['http.method'] = ? AND mapContains(attributes_string, 'http.method') = ?)))) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? GROUP BY ts",
Args: []any{uint64(1747945619), uint64(1747983448), "redis-manual", true, "GET", true, "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448)},
			},
			expectedErr: nil,
		},
		{
			name:        "Time series with complex NOT expression and nested OR conditions",
			requestType: qbtypes.RequestTypeTimeSeries,
			query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
				Signal:       telemetrytypes.SignalLogs,
				StepInterval: qbtypes.Step{Duration: 60 * time.Second},
				Aggregations: []qbtypes.LogAggregation{
					{
						Expression: "count()",
					},
				},
				Filter: &qbtypes.Filter{
Expression: "service.name IN 'redis' AND request.type = 'External' AND http.status_code < 500 AND http.status_code >= 400 AND NOT ((http.request.header.tenant_id = '[\"tenant-1\"]' AND http.status_code = 401) OR (http.request.header.tenant_id = '[\"tenant-2\"]' AND http.status_code = 404 AND http.route = '/tenants/{tenant_id}'))",
				},
			},
			expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE ((simpleJSONExtractString(labels, 'service.name') = ?) AND labels LIKE ? AND (labels LIKE ?)) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND (((resources_string['service.name'] = ?) AND mapContains(resources_string, 'service.name') = ?) AND (attributes_string['request.type'] = ? AND mapContains(attributes_string, 'request.type') = ?) AND (toFloat64(attributes_number['http.status_code']) < ? AND mapContains(attributes_number, 'http.status_code') = ?) AND (toFloat64(attributes_number['http.status_code']) >= ? AND mapContains(attributes_number, 'http.status_code') = ?) AND NOT ((((((attributes_string['http.request.header.tenant_id'] = ? AND mapContains(attributes_string, 'http.request.header.tenant_id') = ?) AND (toFloat64(attributes_number['http.status_code']) = ? AND mapContains(attributes_number, 'http.status_code') = ?))) OR (((attributes_string['http.request.header.tenant_id'] = ? AND mapContains(attributes_string, 'http.request.header.tenant_id') = ?) AND (toFloat64(attributes_number['http.status_code']) = ? AND mapContains(attributes_number, 'http.status_code') = ?) AND (attributes_string['http.route'] = ? AND mapContains(attributes_string, 'http.route') = ?))))))) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? GROUP BY ts",
Args: []any{"redis", "%service.name%", "%service.name\":\"redis%", uint64(1747945619), uint64(1747983448), "redis", true, "External", true, float64(500), true, float64(400), true, "[\"tenant-1\"]", true, float64(401), true, "[\"tenant-2\"]", true, float64(404), true, "/tenants/{tenant_id}", true, "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448)},
			},
			expectedErr: nil,
		},
		{
			name:        "Time series with complex OR expression containing NOT with nested conditions",
			requestType: qbtypes.RequestTypeTimeSeries,
			query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
				Signal:       telemetrytypes.SignalLogs,
				StepInterval: qbtypes.Step{Duration: 60 * time.Second},
				Aggregations: []qbtypes.LogAggregation{
					{
						Expression: "count()",
					},
				},
				Filter: &qbtypes.Filter{
Expression: "(service.name IN 'redis' AND request.type = 'External' AND http.status_code < 500 AND http.status_code >= 400 OR NOT ((http.request.header.tenant_id = '[\"tenant-1\"]' AND http.status_code = 401) OR (http.request.header.tenant_id = '[\"tenant-2\"]' AND http.status_code = 404 AND http.route = '/tenants/{tenant_id}')))",
				},
			},
			expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND (((((resources_string['service.name'] = ?) AND mapContains(resources_string, 'service.name') = ?) AND (attributes_string['request.type'] = ? AND mapContains(attributes_string, 'request.type') = ?) AND (toFloat64(attributes_number['http.status_code']) < ? AND mapContains(attributes_number, 'http.status_code') = ?) AND (toFloat64(attributes_number['http.status_code']) >= ? AND mapContains(attributes_number, 'http.status_code') = ?)) OR NOT ((((((attributes_string['http.request.header.tenant_id'] = ? AND mapContains(attributes_string, 'http.request.header.tenant_id') = ?) AND (toFloat64(attributes_number['http.status_code']) = ? AND mapContains(attributes_number, 'http.status_code') = ?))) OR (((attributes_string['http.request.header.tenant_id'] = ? AND mapContains(attributes_string, 'http.request.header.tenant_id') = ?) AND (toFloat64(attributes_number['http.status_code']) = ? AND mapContains(attributes_number, 'http.status_code') = ?) AND (attributes_string['http.route'] = ? AND mapContains(attributes_string, 'http.route') = ?)))))))) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? GROUP BY ts",
Args: []any{uint64(1747945619), uint64(1747983448), "redis", true, "External", true, float64(500), true, float64(400), true, "[\"tenant-1\"]", true, float64(401), true, "[\"tenant-2\"]", true, float64(404), true, "/tenants/{tenant_id}", true, "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448)},
			},
			expectedErr: nil,
		},
		{
			name:        "Time series with OR between multiple resource conditions",
			requestType: qbtypes.RequestTypeTimeSeries,
			query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
				Signal:       telemetrytypes.SignalLogs,
				StepInterval: qbtypes.Step{Duration: 60 * time.Second},
				Aggregations: []qbtypes.LogAggregation{
					{
						Expression: "count()",
					},
				},
				Filter: &qbtypes.Filter{
					Expression: "service.name = 'redis' OR service.name = 'driver'",
				},
			},
			expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE ((simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) OR (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?)) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND ((resources_string['service.name'] = ? AND mapContains(resources_string, 'service.name') = ?) OR (resources_string['service.name'] = ? AND mapContains(resources_string, 'service.name') = ?)) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? GROUP BY ts",
Args: []any{"redis", "%service.name%", "%service.name\":\"redis%", "driver", "%service.name%", "%service.name\":\"driver%", uint64(1747945619), uint64(1747983448), "redis", true, "driver", true, "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448)},
			},
			expectedErr: nil,
		},
	}

	fm := NewFieldMapper()

@@ -862,6 +862,27 @@ func buildCompleteFieldKeyMap() map[string][]*telemetrytypes.TelemetryFieldKey {
			Materialized:  true,
		},
	},
	"request.type": {
		{
			Name:          "request.type",
			FieldContext:  telemetrytypes.FieldContextAttribute,
			FieldDataType: telemetrytypes.FieldDataTypeString,
		},
	},
	"http.request.header.tenant_id": {
		{
			Name:          "http.request.header.tenant_id",
			FieldContext:  telemetrytypes.FieldContextAttribute,
			FieldDataType: telemetrytypes.FieldDataTypeString,
		},
	},
	"http.route": {
		{
			Name:          "http.route",
			FieldContext:  telemetrytypes.FieldContextAttribute,
			FieldDataType: telemetrytypes.FieldDataTypeString,
		},
	},
}

for _, keys := range keysMap {

@@ -4,9 +4,11 @@ import (
	"context"
	"fmt"
	"log/slog"
	"os"
	"slices"
	"strings"

	"github.com/SigNoz/signoz/pkg/cache"
	"github.com/SigNoz/signoz/pkg/errors"
	"github.com/SigNoz/signoz/pkg/factory"
	"github.com/SigNoz/signoz/pkg/querybuilder"

@@ -16,6 +18,7 @@ import (
	"github.com/SigNoz/signoz/pkg/types/metrictypes"
	qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
	"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
	"github.com/SigNoz/signoz/pkg/valuer"
	"github.com/huandu/go-sqlbuilder"
	"golang.org/x/exp/maps"
)

@@ -51,6 +54,7 @@ type telemetryMetaStore struct {
	fm               qbtypes.FieldMapper
	conditionBuilder qbtypes.ConditionBuilder
	temporalityCache *TemporalityCache
}

func escapeForLike(s string) string {

@@ -60,6 +64,7 @@ func escapeForLike(s string) string {
func NewTelemetryMetaStore(
	settings factory.ProviderSettings,
	telemetrystore telemetrystore.TelemetryStore,
	cache cache.Cache,
	tracesDBName string,
	tracesFieldsTblName string,
	spanAttributesKeysTblName string,

@@ -104,6 +109,34 @@ func NewTelemetryMetaStore(
	t.fm = fm
	t.conditionBuilder = conditionBuilder

	if cache != nil &&
		strings.ToLower(os.Getenv("TEMPORALITY_CACHE_ENABLED")) != "false" {
		// TODO(srikanthccv): is there a ever a case to make this configurable?
		temporalityCacheConfig := DefaultTemporalityCacheConfig()
		t.temporalityCache = NewTemporalityCache(
			settings,
			cache,
			temporalityCacheConfig,
			func(ctx context.Context, orgID valuer.UUID, metricName string) (metrictypes.Temporality, error) {
				temporalityMap, err := t.fetchTemporalityMultiDirect(ctx, metricName)
				if err != nil {
					return metrictypes.Unknown, err
				}
				temporality, ok := temporalityMap[metricName]
				if !ok {
					return metrictypes.Unknown, nil
				}
				return temporality, nil
			},
		)

		t.temporalityCache.SetRefreshMultiCallback(func(ctx context.Context, orgID valuer.UUID, metricNames []string) (map[string]metrictypes.Temporality, error) {
			return t.fetchTemporalityMultiDirect(ctx, metricNames...)
		})
	} else {
		t.logger.Info("skipping temporality cache")
	}

	return t
}

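A minimal standalone sketch of the opt-out gate added above, assuming only the TEMPORALITY_CACHE_ENABLED environment variable from the diff; the temporalityCacheEnabled helper is named here purely for illustration and is not part of the changeset.

package main

import (
	"fmt"
	"os"
	"strings"
)

// temporalityCacheEnabled mirrors the gate in NewTelemetryMetaStore: the cache
// is used only when a cache implementation is provided and the env var is not
// explicitly set to "false".
func temporalityCacheEnabled(haveCache bool) bool {
	return haveCache && strings.ToLower(os.Getenv("TEMPORALITY_CACHE_ENABLED")) != "false"
}

func main() {
	os.Setenv("TEMPORALITY_CACHE_ENABLED", "false")
	fmt.Println(temporalityCacheEnabled(true)) // false: opted out via env var
	os.Unsetenv("TEMPORALITY_CACHE_ENABLED")
	fmt.Println(temporalityCacheEnabled(true)) // true: enabled by default
}
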
@@ -1429,12 +1462,20 @@ func (t *telemetryMetaStore) GetAllValues(ctx context.Context, fieldValueSelecto
	return values, complete, nil
}

func (t *telemetryMetaStore) FetchTemporality(ctx context.Context, metricName string) (metrictypes.Temporality, error) {
func (t *telemetryMetaStore) FetchTemporality(ctx context.Context, orgID valuer.UUID, metricName string) (metrictypes.Temporality, error) {
	if metricName == "" {
		return metrictypes.Unknown, errors.Newf(errors.TypeInternal, errors.CodeInternal, "metric name cannot be empty")
	}

	temporalityMap, err := t.FetchTemporalityMulti(ctx, metricName)
	if t.temporalityCache != nil {
		return t.temporalityCache.Get(ctx, orgID, metricName)
	}

	return t.fetchTemporalityDirect(ctx, metricName)
}

func (t *telemetryMetaStore) fetchTemporalityDirect(ctx context.Context, metricName string) (metrictypes.Temporality, error) {
	temporalityMap, err := t.fetchTemporalityMultiDirect(ctx, metricName)
	if err != nil {
		return metrictypes.Unknown, err
	}

@@ -1447,11 +1488,20 @@ func (t *telemetryMetaStore) FetchTemporality(ctx context.Context, metricName st
	return temporality, nil
}

func (t *telemetryMetaStore) FetchTemporalityMulti(ctx context.Context, metricNames ...string) (map[string]metrictypes.Temporality, error) {
func (t *telemetryMetaStore) FetchTemporalityMulti(ctx context.Context, orgID valuer.UUID, metricNames ...string) (map[string]metrictypes.Temporality, error) {
	if len(metricNames) == 0 {
		return make(map[string]metrictypes.Temporality), nil
	}

	if t.temporalityCache != nil {
		return t.temporalityCache.GetMulti(ctx, orgID, metricNames)
	}

	return t.fetchTemporalityMultiDirect(ctx, metricNames...)
}

// fetchTemporalityMultiDirect fetches temporalities directly from database without cache
func (t *telemetryMetaStore) fetchTemporalityMultiDirect(ctx context.Context, metricNames ...string) (map[string]metrictypes.Temporality, error) {
	result := make(map[string]metrictypes.Temporality)
	metricsTemporality, err := t.fetchMetricsTemporality(ctx, metricNames...)
	if err != nil {

pkg/telemetrymetadata/metadata_temporality_test.go (new file, 24 lines)
@@ -0,0 +1,24 @@
package telemetrymetadata

import (
	"testing"

	"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

func TestMetadataStore_TemporalityCacheIntegration(t *testing.T) {
	// Test that metadata store works without cache
	metaStore := NewTelemetryMetaStore(
		instrumentationtest.New().ToProviderSettings(),
		nil, // telemetrystore
		"", "", "", "", "", "", "", "", "", "", "", "", "", "", "",
		nil, // No cache
	)

	store, ok := metaStore.(*telemetryMetaStore)
	require.True(t, ok)
	assert.Nil(t, store.temporalityCache, "Should not have cache when none provided")
}

@@ -53,6 +53,7 @@ func TestGetKeys(t *testing.T) {
		telemetrylogs.LogResourceKeysTblName,
		DBName,
		AttributesMetadataLocalTableName,
		nil, // No cache for tests
	)

	rows := cmock.NewRows([]cmock.ColumnType{

pkg/telemetrymetadata/temporality_cache.go (new file, 281 lines)
@@ -0,0 +1,281 @@
package telemetrymetadata

import (
	"context"
	"encoding/json"
	"fmt"
	"log/slog"
	"math/rand"
	"sync"
	"time"

	"github.com/SigNoz/signoz/pkg/cache"
	"github.com/SigNoz/signoz/pkg/errors"
	"github.com/SigNoz/signoz/pkg/factory"
	"github.com/SigNoz/signoz/pkg/types/metrictypes"
	"github.com/SigNoz/signoz/pkg/valuer"
)

// TemporalityCacheEntry represents a cached temporality entry
type TemporalityCacheEntry struct {
	Temporality metrictypes.Temporality `json:"temporality"`
	CachedAt    time.Time               `json:"cached_at"`
	SoftTTL     time.Duration           `json:"soft_ttl"`
	HardTTL     time.Duration           `json:"hard_ttl"`
}

func (e *TemporalityCacheEntry) MarshalBinary() ([]byte, error) {
	return json.Marshal(e)
}

func (e *TemporalityCacheEntry) UnmarshalBinary(data []byte) error {
	return json.Unmarshal(data, e)
}

type TemporalityCache struct {
	cache                cache.Cache
	logger               *slog.Logger
	softTTL              time.Duration
	hardTTL              time.Duration
	jitterPercent        int
	refreshing           sync.Map // map[string]bool to track ongoing refreshes
	refreshCallback      func(ctx context.Context, orgID valuer.UUID, metricName string) (metrictypes.Temporality, error)
	refreshMultiCallback func(ctx context.Context, orgID valuer.UUID, metricNames []string) (map[string]metrictypes.Temporality, error)
}

// TemporalityCacheConfig holds configuration for the temporality cache
type TemporalityCacheConfig struct {
	SoftTTL       time.Duration
	HardTTL       time.Duration
	JitterPercent int // Percentage of TTL to use as jitter range (e.g., 10 for ±10%)
}

// DefaultTemporalityCacheConfig returns default cache configuration
func DefaultTemporalityCacheConfig() TemporalityCacheConfig {
	return TemporalityCacheConfig{
		SoftTTL:       30 * time.Minute,  // Fresh data threshold
		HardTTL:       240 * time.Minute, // Maximum cache lifetime
		JitterPercent: 20,                // 20% jitter
	}
}

// NewTemporalityCache creates a new temporality cache
func NewTemporalityCache(
	settings factory.ProviderSettings,
	cache cache.Cache,
	config TemporalityCacheConfig,
	refreshCallback func(ctx context.Context, orgID valuer.UUID, metricName string) (metrictypes.Temporality, error),
) *TemporalityCache {
	cacheSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/telemetrymetadata/temporality_cache")

	return &TemporalityCache{
		cache:           cache,
		logger:          cacheSettings.Logger(),
		softTTL:         config.SoftTTL,
		hardTTL:         config.HardTTL,
		jitterPercent:   config.JitterPercent,
		refreshCallback: refreshCallback,
	}
}

// SetRefreshMultiCallback sets the batch refresh callback
func (tc *TemporalityCache) SetRefreshMultiCallback(callback func(ctx context.Context, orgID valuer.UUID, metricNames []string) (map[string]metrictypes.Temporality, error)) {
	tc.refreshMultiCallback = callback
}

func (tc *TemporalityCache) generateCacheKey(metricName string) string {
	return fmt.Sprintf("temporality:metric:%s", metricName)
}

func (tc *TemporalityCache) applyJitter(duration time.Duration) time.Duration {
	if tc.jitterPercent <= 0 {
		return duration
	}

	jitterRange := int64(float64(duration.Nanoseconds()) * float64(tc.jitterPercent) / 100.0)
	jitter := rand.Int63n(2*jitterRange) - jitterRange

	result := duration + time.Duration(jitter)
	if result < 0 {
		return 0
	}
	return result
}

func (tc *TemporalityCache) Get(ctx context.Context, orgID valuer.UUID, metricName string) (metrictypes.Temporality, error) {
	cacheKey := tc.generateCacheKey(metricName)

	var entry TemporalityCacheEntry
	err := tc.cache.Get(ctx, orgID, cacheKey, &entry, false)

	if err != nil {
		if !errors.Ast(err, errors.TypeNotFound) {
			tc.logger.ErrorContext(ctx, "error getting cached temporality", "metric", metricName, "error", err)
		}

		temporality, err := tc.refreshCallback(ctx, orgID, metricName)
		if err != nil {
			return metrictypes.Unknown, err
		}

		tc.put(ctx, orgID, metricName, temporality)
		return temporality, nil
	}

	age := time.Since(entry.CachedAt)

	if age < entry.SoftTTL {
		tc.logger.DebugContext(ctx, "returning fresh cached temporality",
			"metric", metricName,
			"age", age,
			"soft_ttl", entry.SoftTTL)
		return entry.Temporality, nil
	}

	if age < entry.HardTTL {
		tc.logger.DebugContext(ctx, "returning stale cached temporality and triggering refresh",
			"metric", metricName,
			"age", age,
			"soft_ttl", entry.SoftTTL,
			"hard_ttl", entry.HardTTL)

		tc.triggerBackgroundRefresh(ctx, orgID, metricName)

		return entry.Temporality, nil
	}

	tc.logger.DebugContext(ctx, "cached temporality exceeded hard TTL, fetching fresh",
		"metric", metricName,
		"age", age,
		"hard_ttl", entry.HardTTL)

	temporality, err := tc.refreshCallback(ctx, orgID, metricName)
	if err != nil {
		// when refresh fails and we have stale data, return it as fallback
		if entry.Temporality != metrictypes.Unknown {
			tc.logger.WarnContext(ctx, "failed to refresh temporality, returning stale data",
				"metric", metricName,
				"error", err)
			return entry.Temporality, nil
		}
		return metrictypes.Unknown, err
	}

	tc.put(ctx, orgID, metricName, temporality)
	return temporality, nil
}

func (tc *TemporalityCache) GetMulti(ctx context.Context, orgID valuer.UUID, metricNames []string) (map[string]metrictypes.Temporality, error) {
	result := make(map[string]metrictypes.Temporality)
	var missedMetrics []string
	staleMetrics := make(map[string]metrictypes.Temporality)

	// for each metric, check if it fresh, stale + valid, or invalid/missing
	// then trigger fetch for all missing metrics
	for _, metricName := range metricNames {
		cacheKey := tc.generateCacheKey(metricName)

		var entry TemporalityCacheEntry
		err := tc.cache.Get(ctx, orgID, cacheKey, &entry, false)

		if err != nil {
			missedMetrics = append(missedMetrics, metricName)
			continue
		}

		age := time.Since(entry.CachedAt)

		if age < entry.SoftTTL {
			result[metricName] = entry.Temporality
		} else if age < entry.HardTTL {
			result[metricName] = entry.Temporality
			staleMetrics[metricName] = entry.Temporality
		} else {
			missedMetrics = append(missedMetrics, metricName)
		}
	}

	for metricName := range staleMetrics {
		tc.triggerBackgroundRefresh(ctx, orgID, metricName)
	}

	if len(missedMetrics) > 0 {
		temporalities, err := tc.refreshMulti(ctx, orgID, missedMetrics)
		if err != nil {
			return result, err
		}

		for metricName, temporality := range temporalities {
			tc.put(ctx, orgID, metricName, temporality)
			result[metricName] = temporality
		}
	}

	return result, nil
}

func (tc *TemporalityCache) put(ctx context.Context, orgID valuer.UUID, metricName string, temporality metrictypes.Temporality) {
	entry := TemporalityCacheEntry{
		Temporality: temporality,
		CachedAt:    time.Now(),
		SoftTTL:     tc.applyJitter(tc.softTTL),
		HardTTL:     tc.applyJitter(tc.hardTTL),
	}

	cacheKey := tc.generateCacheKey(metricName)

	if err := tc.cache.Set(ctx, orgID, cacheKey, &entry, entry.HardTTL); err != nil {
		tc.logger.ErrorContext(ctx, "failed to cache temporality",
			"metric", metricName,
			"error", err)
	}
}

func (tc *TemporalityCache) triggerBackgroundRefresh(_ context.Context, orgID valuer.UUID, metricName string) {
	if _, loading := tc.refreshing.LoadOrStore(metricName, true); loading {
		return
	}

	go func() {
		defer tc.refreshing.Delete(metricName)

		refreshCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
		defer cancel()

		tc.logger.DebugContext(refreshCtx, "starting background refresh", "metric", metricName)

		temporality, err := tc.refreshCallback(refreshCtx, orgID, metricName)
		if err != nil {
			tc.logger.ErrorContext(refreshCtx, "background refresh failed",
				"metric", metricName,
				"error", err)
			return
		}

		tc.put(refreshCtx, orgID, metricName, temporality)

		tc.logger.DebugContext(refreshCtx, "background refresh completed", "metric", metricName)
	}()
}

func (tc *TemporalityCache) refreshMulti(ctx context.Context, orgID valuer.UUID, metricNames []string) (map[string]metrictypes.Temporality, error) {
	if tc.refreshMultiCallback != nil {
		return tc.refreshMultiCallback(ctx, orgID, metricNames)
	}

	result := make(map[string]metrictypes.Temporality)

	for _, metricName := range metricNames {
		temporality, err := tc.refreshCallback(ctx, orgID, metricName)
		if err != nil {
			tc.logger.ErrorContext(ctx, "failed to refresh temporality",
				"metric", metricName,
				"error", err)
			result[metricName] = metrictypes.Unknown
			continue
		}
		result[metricName] = temporality
	}

	return result, nil
}

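A small, self-contained sketch of the freshness decision implemented in TemporalityCache.Get, using the default soft/hard TTLs shown above (30 and 240 minutes) and ignoring jitter; the ttlBranch helper exists only for this illustration and is not part of the changeset.

package main

import (
	"fmt"
	"time"
)

// ttlBranch reproduces the three-way decision in TemporalityCache.Get for the
// default configuration (soft TTL 30m, hard TTL 240m), with jitter left out.
func ttlBranch(age time.Duration) string {
	const (
		softTTL = 30 * time.Minute
		hardTTL = 240 * time.Minute
	)
	switch {
	case age < softTTL:
		return "fresh: serve from cache"
	case age < hardTTL:
		return "stale: serve from cache, refresh in background"
	default:
		return "expired: fetch synchronously, fall back to stale value on error"
	}
}

func main() {
	for _, age := range []time.Duration{10 * time.Minute, 2 * time.Hour, 5 * time.Hour} {
		fmt.Printf("%s -> %s\n", age, ttlBranch(age))
	}
}
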
pkg/telemetrymetadata/temporality_cache_test.go (new file, 67 lines)
@@ -0,0 +1,67 @@
package telemetrymetadata

import (
	"testing"
	"time"

	"github.com/SigNoz/signoz/pkg/types/metrictypes"
	"github.com/stretchr/testify/assert"
)

func TestTemporalityCache_Jitter(t *testing.T) {
	config := TemporalityCacheConfig{
		SoftTTL:       1 * time.Minute,
		HardTTL:       5 * time.Minute,
		JitterPercent: 20,
	}

	tc := &TemporalityCache{
		jitterPercent: config.JitterPercent,
	}

	// Test jitter produces different values
	ttlValues := make(map[time.Duration]bool)
	for i := 0; i < 10; i++ {
		jittered := tc.applyJitter(config.SoftTTL)
		ttlValues[jittered] = true

		// Check jitter is within expected range
		minTTL := time.Duration(float64(config.SoftTTL) * 0.8)
		maxTTL := time.Duration(float64(config.SoftTTL) * 1.2)
		assert.GreaterOrEqual(t, jittered, minTTL)
		assert.LessOrEqual(t, jittered, maxTTL)
	}

	// Should have multiple different values due to jitter
	assert.Greater(t, len(ttlValues), 1, "Jitter should produce different TTL values")
}

func TestTemporalityCacheConfig_Default(t *testing.T) {
	config := DefaultTemporalityCacheConfig()

	assert.Equal(t, 30*time.Minute, config.SoftTTL)
	assert.Equal(t, 240*time.Minute, config.HardTTL)
	assert.Equal(t, 20, config.JitterPercent)
}

func TestTemporalityCacheEntry_Serialization(t *testing.T) {
	entry := TemporalityCacheEntry{
		Temporality: metrictypes.Delta,
		CachedAt:    time.Now(),
		SoftTTL:     5 * time.Minute,
		HardTTL:     30 * time.Minute,
	}

	// Test marshaling
	data, err := entry.MarshalBinary()
	assert.NoError(t, err)
	assert.NotEmpty(t, data)

	// Test unmarshaling
	var decoded TemporalityCacheEntry
	err = decoded.UnmarshalBinary(data)
	assert.NoError(t, err)
	assert.Equal(t, entry.Temporality, decoded.Temporality)
	assert.Equal(t, entry.SoftTTL, decoded.SoftTTL)
	assert.Equal(t, entry.HardTTL, decoded.HardTTL)
}

@@ -4,6 +4,7 @@ import (
	"context"

	"github.com/SigNoz/signoz/pkg/types/metrictypes"
	"github.com/SigNoz/signoz/pkg/valuer"
)

// MetadataStore is the interface for the telemetry metadata store.

@@ -26,8 +27,8 @@ type MetadataStore interface {
	GetAllValues(ctx context.Context, fieldValueSelector *FieldValueSelector) (*TelemetryFieldValues, bool, error)

	// FetchTemporality fetches the temporality for metric
	FetchTemporality(ctx context.Context, metricName string) (metrictypes.Temporality, error)
	FetchTemporality(ctx context.Context, orgID valuer.UUID, metricName string) (metrictypes.Temporality, error)

	// FetchTemporalityMulti fetches the temporality for multiple metrics
	FetchTemporalityMulti(ctx context.Context, metricNames ...string) (map[string]metrictypes.Temporality, error)
	FetchTemporalityMulti(ctx context.Context, orgID valuer.UUID, metricNames ...string) (map[string]metrictypes.Temporality, error)
}

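A hedged sketch of a caller updated for the new interface, assuming MetadataStore is the interface shown above in pkg/types/telemetrytypes and that the org ID and metric names come from the caller; lookupTemporalities is a hypothetical helper written only for this example, not part of the diff.

package example

import (
	"context"

	"github.com/SigNoz/signoz/pkg/types/metrictypes"
	"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
	"github.com/SigNoz/signoz/pkg/valuer"
)

// lookupTemporalities shows the new call shape: both FetchTemporality and
// FetchTemporalityMulti now take the org UUID in addition to the metric names.
func lookupTemporalities(ctx context.Context, store telemetrytypes.MetadataStore, orgID valuer.UUID, names []string) (map[string]metrictypes.Temporality, error) {
	if len(names) == 1 {
		t, err := store.FetchTemporality(ctx, orgID, names[0])
		if err != nil {
			return nil, err
		}
		return map[string]metrictypes.Temporality{names[0]: t}, nil
	}
	return store.FetchTemporalityMulti(ctx, orgID, names...)
}
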
@@ -6,6 +6,7 @@ import (
	"github.com/SigNoz/signoz/pkg/types/metrictypes"
	"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
	"github.com/SigNoz/signoz/pkg/valuer"
)

// MockMetadataStore implements the MetadataStore interface for testing purposes

@@ -258,7 +259,7 @@ func (m *MockMetadataStore) SetAllValues(lookupKey string, values *telemetrytype
}

// FetchTemporality fetches the temporality for a metric
func (m *MockMetadataStore) FetchTemporality(ctx context.Context, metricName string) (metrictypes.Temporality, error) {
func (m *MockMetadataStore) FetchTemporality(ctx context.Context, orgID valuer.UUID, metricName string) (metrictypes.Temporality, error) {
	if temporality, exists := m.TemporalityMap[metricName]; exists {
		return temporality, nil
	}

@@ -266,7 +267,7 @@ func (m *MockMetadataStore) FetchTemporality(ctx context.Context, metricName str
}

// FetchTemporalityMulti fetches the temporality for multiple metrics
func (m *MockMetadataStore) FetchTemporalityMulti(ctx context.Context, metricNames ...string) (map[string]metrictypes.Temporality, error) {
func (m *MockMetadataStore) FetchTemporalityMulti(ctx context.Context, orgID valuer.UUID, metricNames ...string) (map[string]metrictypes.Temporality, error) {
	result := make(map[string]metrictypes.Temporality)

	for _, metricName := range metricNames {