Compare commits

...

19 Commits

Author SHA1 Message Date
nityanandagohain
8be9a79d56 fix: use name from evolutions 2025-12-26 23:33:34 +05:30
nityanandagohain
471ad88971 fix: address changes and add tests 2025-12-26 23:25:43 +05:30
nityanandagohain
a5c46beeec Merge remote-tracking branch 'origin/main' into issue_3017 2025-12-23 18:21:40 +05:30
nityanandagohain
41f720950d fix: minor changes 2025-12-09 10:46:09 +07:00
nityanandagohain
d9bce4a3c6 fix: lint issues 2025-12-08 22:06:38 +07:00
nityanandagohain
a5ac40c33c fix: test 2025-12-08 21:40:30 +07:00
nityanandagohain
86b1366d4a fix: comments 2025-12-08 21:31:26 +07:00
nityanandagohain
eddb43a901 fix: aggregation 2025-12-08 21:30:07 +07:00
nityanandagohain
505cfe2314 fix: use orgId properly 2025-12-08 20:57:17 +07:00
nityanandagohain
6e54ee822a fix: use proper cache 2025-12-08 20:46:56 +07:00
nityanandagohain
d88cb8aba4 fix: minor changes 2025-12-08 20:18:34 +07:00
nityanandagohain
b823b2a1e1 fix: minor changes 2025-12-08 20:14:01 +07:00
nityanandagohain
7cfb7118a3 fix: update tests 2025-12-08 19:54:01 +07:00
nityanandagohain
59dfe7c0ed fix: remove goroutine 2025-12-08 19:11:22 +07:00
nityanandagohain
96b68b91c9 fix: update comment 2025-12-06 08:29:25 +05:30
nityanandagohain
be6ce8d4f1 fix: update fetch code 2025-12-06 08:27:53 +05:30
nityanandagohain
1fc58695c6 fix: tests 2025-12-06 06:47:29 +05:30
nityanandagohain
43450a187e Merge remote-tracking branch 'origin/main' into issue_3017 2025-12-06 05:15:26 +05:30
nityanandagohain
f4666d9c97 feat: time aware dynamic field mapper 2025-11-25 09:59:12 +05:30
35 changed files with 786 additions and 127 deletions

View File

@@ -100,8 +100,10 @@ func newProvider(
traceAggExprRewriter,
)
logsKeyEvolutionMetadata := telemetrylogs.NewKeyEvolutionMetadata(telemetryStore, cache, settings.Logger)
// Create log statement builder
logFieldMapper := telemetrylogs.NewFieldMapper()
logFieldMapper := telemetrylogs.NewFieldMapper(logsKeyEvolutionMetadata)
logConditionBuilder := telemetrylogs.NewConditionBuilder(logFieldMapper)
logResourceFilterStmtBuilder := resourcefilter.NewLogResourceFilterStatementBuilder(
settings,

View File

@@ -51,6 +51,8 @@ func NewAggExprRewriter(
// and the args if the parametric aggregation function is used.
func (r *aggExprRewriter) Rewrite(
ctx context.Context,
startNs uint64,
endNs uint64,
expr string,
rateInterval uint64,
keys map[string][]*telemetrytypes.TelemetryFieldKey,
@@ -77,7 +79,12 @@ func (r *aggExprRewriter) Rewrite(
return "", nil, errors.NewInternalf(errors.CodeInternal, "no SELECT items for %q", expr)
}
visitor := newExprVisitor(r.logger, keys,
visitor := newExprVisitor(
ctx,
startNs,
endNs,
r.logger,
keys,
r.fullTextColumn,
r.fieldMapper,
r.conditionBuilder,
@@ -98,6 +105,8 @@ func (r *aggExprRewriter) Rewrite(
// RewriteMulti rewrites a slice of expressions.
func (r *aggExprRewriter) RewriteMulti(
ctx context.Context,
startNs uint64,
endNs uint64,
exprs []string,
rateInterval uint64,
keys map[string][]*telemetrytypes.TelemetryFieldKey,
@@ -106,7 +115,7 @@ func (r *aggExprRewriter) RewriteMulti(
var errs []error
var chArgsList [][]any
for i, e := range exprs {
w, chArgs, err := r.Rewrite(ctx, e, rateInterval, keys)
w, chArgs, err := r.Rewrite(ctx, startNs, endNs, e, rateInterval, keys)
if err != nil {
errs = append(errs, err)
out[i] = e
@@ -123,6 +132,9 @@ func (r *aggExprRewriter) RewriteMulti(
// exprVisitor walks FunctionExpr nodes and applies the mappers.
type exprVisitor struct {
ctx context.Context
startNs uint64
endNs uint64
chparser.DefaultASTVisitor
logger *slog.Logger
fieldKeys map[string][]*telemetrytypes.TelemetryFieldKey
@@ -137,6 +149,9 @@ type exprVisitor struct {
}
func newExprVisitor(
ctx context.Context,
startNs uint64,
endNs uint64,
logger *slog.Logger,
fieldKeys map[string][]*telemetrytypes.TelemetryFieldKey,
fullTextColumn *telemetrytypes.TelemetryFieldKey,
@@ -146,6 +161,9 @@ func newExprVisitor(
jsonKeyToKey qbtypes.JsonKeyToFieldFunc,
) *exprVisitor {
return &exprVisitor{
ctx: ctx,
startNs: startNs,
endNs: endNs,
logger: logger,
fieldKeys: fieldKeys,
fullTextColumn: fullTextColumn,
@@ -190,7 +208,7 @@ func (v *exprVisitor) VisitFunctionExpr(fn *chparser.FunctionExpr) error {
if aggFunc.FuncCombinator {
// Map the predicate (last argument)
origPred := args[len(args)-1].String()
whereClause, err := PrepareWhereClause(
whereClause, err := PrepareWhereClause(
origPred,
FilterExprVisitorOpts{
Logger: v.logger,
@@ -199,7 +217,7 @@ func (v *exprVisitor) VisitFunctionExpr(fn *chparser.FunctionExpr) error {
ConditionBuilder: v.conditionBuilder,
FullTextColumn: v.fullTextColumn,
JsonKeyToKey: v.jsonKeyToKey,
}, 0, 0,
}, v.startNs, v.endNs,
)
if err != nil {
return err
@@ -219,7 +237,7 @@ func (v *exprVisitor) VisitFunctionExpr(fn *chparser.FunctionExpr) error {
for i := 0; i < len(args)-1; i++ {
origVal := args[i].String()
fieldKey := telemetrytypes.GetFieldKeyFromKeyText(origVal)
expr, exprArgs, err := CollisionHandledFinalExpr(context.Background(), &fieldKey, v.fieldMapper, v.conditionBuilder, v.fieldKeys, dataType, v.jsonBodyPrefix, v.jsonKeyToKey)
expr, exprArgs, err := CollisionHandledFinalExpr(v.ctx, v.startNs, v.endNs, &fieldKey, v.fieldMapper, v.conditionBuilder, v.fieldKeys, dataType, v.jsonBodyPrefix, v.jsonKeyToKey)
if err != nil {
return errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "failed to get table field name for %q", origVal)
}
@@ -237,7 +255,7 @@ func (v *exprVisitor) VisitFunctionExpr(fn *chparser.FunctionExpr) error {
for i, arg := range args {
orig := arg.String()
fieldKey := telemetrytypes.GetFieldKeyFromKeyText(orig)
expr, exprArgs, err := CollisionHandledFinalExpr(context.Background(), &fieldKey, v.fieldMapper, v.conditionBuilder, v.fieldKeys, dataType, v.jsonBodyPrefix, v.jsonKeyToKey)
expr, exprArgs, err := CollisionHandledFinalExpr(v.ctx, v.startNs, v.endNs, &fieldKey, v.fieldMapper, v.conditionBuilder, v.fieldKeys, dataType, v.jsonBodyPrefix, v.jsonKeyToKey)
if err != nil {
return err
}

View File

@@ -19,6 +19,8 @@ import (
func CollisionHandledFinalExpr(
ctx context.Context,
startNs uint64,
endNs uint64,
field *telemetrytypes.TelemetryFieldKey,
fm qbtypes.FieldMapper,
cb qbtypes.ConditionBuilder,
@@ -45,7 +47,7 @@ func CollisionHandledFinalExpr(
addCondition := func(key *telemetrytypes.TelemetryFieldKey) error {
sb := sqlbuilder.NewSelectBuilder()
condition, err := cb.ConditionFor(ctx, key, qbtypes.FilterOperatorExists, nil, sb, 0, 0)
condition, err := cb.ConditionFor(ctx, key, qbtypes.FilterOperatorExists, nil, sb, startNs, endNs)
if err != nil {
return err
}
@@ -58,7 +60,7 @@ func CollisionHandledFinalExpr(
return nil
}
colName, err := fm.FieldFor(ctx, field)
colName, err := fm.FieldFor(ctx, startNs, endNs, field)
if errors.Is(err, qbtypes.ErrColumnNotFound) {
// the key didn't have the right context to be added to the query
// we try to use the context we know of
@@ -93,7 +95,7 @@ func CollisionHandledFinalExpr(
if err != nil {
return "", nil, err
}
colName, _ = fm.FieldFor(ctx, key)
colName, _ = fm.FieldFor(ctx, startNs, endNs, key)
colName, _ = DataTypeCollisionHandledFieldName(key, dummyValue, colName, qbtypes.FilterOperatorUnknown)
stmts = append(stmts, colName)
}

View File

@@ -48,8 +48,8 @@ func (b *defaultConditionBuilder) ConditionFor(
op qbtypes.FilterOperator,
value any,
sb *sqlbuilder.SelectBuilder,
_ uint64,
_ uint64,
startNs uint64,
endNs uint64,
) (string, error) {
if key.FieldContext != telemetrytypes.FieldContextResource {
@@ -68,7 +68,7 @@ func (b *defaultConditionBuilder) ConditionFor(
keyIdxFilter := sb.Like(column.Name, keyIndexFilter(key))
valueForIndexFilter := valueForIndexFilter(op, key, value)
fieldName, err := b.fm.FieldFor(ctx, key)
fieldName, err := b.fm.FieldFor(ctx, startNs, endNs, key)
if err != nil {
return "", err
}

View File

@@ -47,6 +47,7 @@ func (m *defaultFieldMapper) ColumnFor(
func (m *defaultFieldMapper) FieldFor(
ctx context.Context,
tsStart, tsEnd uint64,
key *telemetrytypes.TelemetryFieldKey,
) (string, error) {
column, err := m.getColumn(ctx, key)
@@ -61,10 +62,11 @@ func (m *defaultFieldMapper) FieldFor(
func (m *defaultFieldMapper) ColumnExpressionFor(
ctx context.Context,
tsStart, tsEnd uint64,
key *telemetrytypes.TelemetryFieldKey,
_ map[string][]*telemetrytypes.TelemetryFieldKey,
) (string, error) {
colName, err := m.FieldFor(ctx, key)
colName, err := m.FieldFor(ctx, tsStart, tsEnd, key)
if err != nil {
return "", err
}

View File

@@ -25,6 +25,7 @@ func NewConditionBuilder(fm qbtypes.FieldMapper) *conditionBuilder {
func (c *conditionBuilder) conditionFor(
ctx context.Context,
startNs, endNs uint64,
key *telemetrytypes.TelemetryFieldKey,
operator qbtypes.FilterOperator,
value any,
@@ -46,7 +47,7 @@ func (c *conditionBuilder) conditionFor(
return "", err
}
tblFieldName, err := c.fm.FieldFor(ctx, key)
tblFieldName, err := c.fm.FieldFor(ctx, startNs, endNs, key)
if err != nil {
return "", err
}
@@ -239,10 +240,10 @@ func (c *conditionBuilder) ConditionFor(
operator qbtypes.FilterOperator,
value any,
sb *sqlbuilder.SelectBuilder,
_ uint64,
_ uint64,
startNs uint64,
endNs uint64,
) (string, error) {
condition, err := c.conditionFor(ctx, key, operator, value, sb)
condition, err := c.conditionFor(ctx, startNs, endNs, key, operator, value, sb)
if err != nil {
return "", err
}
@@ -250,12 +251,12 @@ func (c *conditionBuilder) ConditionFor(
if operator.AddDefaultExistsFilter() {
// skip adding exists filter for intrinsic fields
// with an exception for body json search
field, _ := c.fm.FieldFor(ctx, key)
field, _ := c.fm.FieldFor(ctx, startNs, endNs, key)
if slices.Contains(maps.Keys(IntrinsicFields), field) && key.FieldContext != telemetrytypes.FieldContextBody {
return condition, nil
}
existsCondition, err := c.conditionFor(ctx, key, qbtypes.FilterOperatorExists, nil, sb)
existsCondition, err := c.conditionFor(ctx, startNs, endNs, key, qbtypes.FilterOperatorExists, nil, sb)
if err != nil {
return "", err
}

View File

@@ -6,6 +6,7 @@ import (
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes/telemetrytypestest"
"github.com/huandu/go-sqlbuilder"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -372,7 +373,8 @@ func TestConditionFor(t *testing.T) {
},
}
fm := NewFieldMapper()
storeWithMetadata := telemetrytypestest.NewMockKeyEvolutionMetadataStore(nil)
fm := NewFieldMapper(storeWithMetadata)
conditionBuilder := NewConditionBuilder(fm)
for _, tc := range testCases {
@@ -425,7 +427,8 @@ func TestConditionForMultipleKeys(t *testing.T) {
},
}
fm := NewFieldMapper()
storeWithMetadata := telemetrytypestest.NewMockKeyEvolutionMetadataStore(nil)
fm := NewFieldMapper(storeWithMetadata)
conditionBuilder := NewConditionBuilder(fm)
for _, tc := range testCases {
@@ -684,7 +687,8 @@ func TestConditionForJSONBodySearch(t *testing.T) {
},
}
fm := NewFieldMapper()
storeWithMetadata := telemetrytypestest.NewMockKeyEvolutionMetadataStore(nil)
fm := NewFieldMapper(storeWithMetadata)
conditionBuilder := NewConditionBuilder(fm)
for _, tc := range testCases {

View File

@@ -4,11 +4,14 @@ import (
"context"
"fmt"
"strings"
"time"
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/types/authtypes"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/huandu/go-sqlbuilder"
"golang.org/x/exp/maps"
@@ -55,10 +58,13 @@ var (
)
type fieldMapper struct {
evolutionMetadataStore telemetrytypes.KeyEvolutionMetadataStore
}
func NewFieldMapper() qbtypes.FieldMapper {
return &fieldMapper{}
func NewFieldMapper(evolutionMetadataStore telemetrytypes.KeyEvolutionMetadataStore) qbtypes.FieldMapper {
return &fieldMapper{
evolutionMetadataStore: evolutionMetadataStore,
}
}
func (m *fieldMapper) getColumn(_ context.Context, key *telemetrytypes.TelemetryFieldKey) (*schema.Column, error) {
@@ -100,7 +106,7 @@ func (m *fieldMapper) getColumn(_ context.Context, key *telemetrytypes.Telemetry
return nil, qbtypes.ErrColumnNotFound
}
func (m *fieldMapper) FieldFor(ctx context.Context, key *telemetrytypes.TelemetryFieldKey) (string, error) {
func (m *fieldMapper) FieldFor(ctx context.Context, tsStart, tsEnd uint64, key *telemetrytypes.TelemetryFieldKey) (string, error) {
column, err := m.getColumn(ctx, key)
if err != nil {
return "", err
@@ -112,17 +118,34 @@ func (m *fieldMapper) FieldFor(ctx context.Context, key *telemetrytypes.Telemetr
if key.FieldContext != telemetrytypes.FieldContextResource {
return "", errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "only resource context fields are supported for json columns, got %s", key.FieldContext.String)
}
oldColumn := logsV2Columns["resources_string"]
oldKeyName := fmt.Sprintf("%s['%s']", oldColumn.Name, key.Name)
// have to add ::string as clickHouse throws an error :- data types Variant/Dynamic are not allowed in GROUP BY
// once clickHouse dependency is updated, we need to check if we can remove it.
baseColumn := logsV2Columns["resources_string"]
tsStartTime := time.Unix(0, int64(tsStart))
// Extract orgId from context
var orgID valuer.UUID
if claims, err := authtypes.ClaimsFromContext(ctx); err == nil {
orgID, err = valuer.NewUUID(claims.OrgID)
if err != nil {
return "", errors.Wrapf(err, errors.TypeInvalidInput, errors.CodeInvalidInput, "invalid orgId %s", claims.OrgID)
}
}
// get all evolution for the column
evolutions := m.evolutionMetadataStore.Get(ctx, orgID, baseColumn.Name)
// restricting now to just one entry where we know we changed from map to json
if len(evolutions) > 0 && evolutions[0].ReleaseTime.Before(tsStartTime) {
return fmt.Sprintf("%s.`%s`::String", evolutions[0].NewColumn, key.Name), nil
}
if key.Materialized {
oldKeyName = telemetrytypes.FieldKeyToMaterializedColumnName(key)
oldKeyName := telemetrytypes.FieldKeyToMaterializedColumnName(key)
oldKeyNameExists := telemetrytypes.FieldKeyToMaterializedColumnNameForExists(key)
return fmt.Sprintf("multiIf(%s.`%s` IS NOT NULL, %s.`%s`::String, %s==true, %s, NULL)", column.Name, key.Name, column.Name, key.Name, oldKeyNameExists, oldKeyName), nil
} else {
return fmt.Sprintf("multiIf(%s.`%s` IS NOT NULL, %s.`%s`::String, mapContains(%s, '%s'), %s, NULL)", column.Name, key.Name, column.Name, key.Name, oldColumn.Name, key.Name, oldKeyName), nil
attrVal := fmt.Sprintf("%s['%s']", baseColumn.Name, key.Name)
return fmt.Sprintf("multiIf(%s.`%s` IS NOT NULL, %s.`%s`::String, mapContains(%s, '%s'), %s, NULL)", column.Name, key.Name, column.Name, key.Name, baseColumn.Name, key.Name, attrVal), nil
}
case schema.ColumnTypeEnumLowCardinality:
switch elementType := column.Type.(schema.LowCardinalityColumnType).ElementType; elementType.GetType() {
@@ -161,11 +184,12 @@ func (m *fieldMapper) ColumnFor(ctx context.Context, key *telemetrytypes.Telemet
func (m *fieldMapper) ColumnExpressionFor(
ctx context.Context,
tsStart, tsEnd uint64,
field *telemetrytypes.TelemetryFieldKey,
keys map[string][]*telemetrytypes.TelemetryFieldKey,
) (string, error) {
colName, err := m.FieldFor(ctx, field)
colName, err := m.FieldFor(ctx, tsStart, tsEnd, field)
if errors.Is(err, qbtypes.ErrColumnNotFound) {
// the key didn't have the right context to be added to the query
// we try to use the context we know of
@@ -175,7 +199,7 @@ func (m *fieldMapper) ColumnExpressionFor(
if _, ok := logsV2Columns[field.Name]; ok {
// if it is, attach the column name directly
field.FieldContext = telemetrytypes.FieldContextLog
colName, _ = m.FieldFor(ctx, field)
colName, _ = m.FieldFor(ctx, tsStart, tsEnd, field)
} else {
// - the context is not provided
// - there are not keys for the field
@@ -193,12 +217,12 @@ func (m *fieldMapper) ColumnExpressionFor(
}
} else if len(keysForField) == 1 {
// we have a single key for the field, use it
colName, _ = m.FieldFor(ctx, keysForField[0])
colName, _ = m.FieldFor(ctx, tsStart, tsEnd, keysForField[0])
} else {
// select any non-empty value from the keys
args := []string{}
for _, key := range keysForField {
colName, _ = m.FieldFor(ctx, key)
colName, _ = m.FieldFor(ctx, tsStart, tsEnd, key)
args = append(args, fmt.Sprintf("toString(%s) != '', toString(%s)", colName, colName))
}
colName = fmt.Sprintf("multiIf(%s, NULL)", strings.Join(args, ", "))

View File

@@ -3,10 +3,14 @@ package telemetrylogs
import (
"context"
"testing"
"time"
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
"github.com/SigNoz/signoz/pkg/types/authtypes"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes/telemetrytypestest"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@@ -164,7 +168,8 @@ func TestGetColumn(t *testing.T) {
},
}
fm := NewFieldMapper()
mockStore := telemetrytypestest.NewMockKeyEvolutionMetadataStore(nil)
fm := NewFieldMapper(mockStore)
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
@@ -229,7 +234,7 @@ func TestGetFieldKeyName(t *testing.T) {
expectedError: nil,
},
{
name: "Map column type - resource attribute",
name: "Map column type - resource attribute - json",
key: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
FieldContext: telemetrytypes.FieldContextResource,
@@ -238,7 +243,7 @@ func TestGetFieldKeyName(t *testing.T) {
expectedError: nil,
},
{
name: "Map column type - resource attribute - Materialized",
name: "Map column type - resource attribute - Materialized - json",
key: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
FieldContext: telemetrytypes.FieldContextResource,
@@ -261,8 +266,96 @@ func TestGetFieldKeyName(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
fm := NewFieldMapper()
result, err := fm.FieldFor(ctx, &tc.key)
mockStore := telemetrytypestest.NewMockKeyEvolutionMetadataStore(nil)
fm := NewFieldMapper(mockStore)
result, err := fm.FieldFor(ctx, 0, 0, &tc.key)
if tc.expectedError != nil {
assert.Equal(t, tc.expectedError, err)
} else {
require.NoError(t, err)
assert.Equal(t, tc.expectedResult, result)
}
})
}
}
func TestFieldForWithEvolutionMetadata(t *testing.T) {
ctx := context.Background()
orgId := valuer.GenerateUUID()
ctx = authtypes.NewContextWithClaims(ctx, authtypes.Claims{
OrgID: orgId.String(),
})
// Create a test release time
releaseTime := time.Date(2024, 1, 15, 10, 0, 0, 0, time.UTC)
releaseTimeNano := uint64(releaseTime.UnixNano())
// Common test key
serviceNameKey := telemetrytypes.TelemetryFieldKey{
Name: "service.name",
FieldContext: telemetrytypes.FieldContextResource,
}
// Common expected results
jsonOnlyResult := "resource.`service.name`::String"
fallbackResult := "multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL)"
// Set up stores once
storeWithMetadata := telemetrytypestest.NewMockKeyEvolutionMetadataStore(mockKeyEvolutionMetadata(ctx, orgId, releaseTime))
storeWithoutMetadata := telemetrytypestest.NewMockKeyEvolutionMetadataStore(nil)
testCases := []struct {
name string
tsStart uint64
tsEnd uint64
key telemetrytypes.TelemetryFieldKey
mockStore *telemetrytypestest.MockKeyEvolutionMetadataStore
expectedResult string
expectedError error
}{
{
name: "Resource attribute - tsStart before release time (use fallback with multiIf)",
tsStart: releaseTimeNano - uint64(24*time.Hour.Nanoseconds()),
tsEnd: releaseTimeNano + uint64(24*time.Hour.Nanoseconds()),
key: serviceNameKey,
mockStore: storeWithMetadata,
expectedResult: fallbackResult,
expectedError: nil,
},
{
name: "Resource attribute - tsStart after release time (use new json column)",
tsStart: releaseTimeNano + uint64(24*time.Hour.Nanoseconds()),
tsEnd: releaseTimeNano + uint64(48*time.Hour.Nanoseconds()),
key: serviceNameKey,
mockStore: storeWithMetadata,
expectedResult: jsonOnlyResult,
expectedError: nil,
},
{
name: "Resource attribute - no evolution metadata (use fallback with multiIf)",
tsStart: releaseTimeNano,
tsEnd: releaseTimeNano + uint64(24*time.Hour.Nanoseconds()),
key: serviceNameKey,
mockStore: storeWithoutMetadata,
expectedResult: fallbackResult,
expectedError: nil,
},
{
name: "Resource attribute - tsStart exactly at release time (use fallback with multiIf)",
tsStart: releaseTimeNano,
tsEnd: releaseTimeNano + uint64(24*time.Hour.Nanoseconds()),
key: serviceNameKey,
mockStore: storeWithMetadata,
expectedResult: fallbackResult,
expectedError: nil,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
fm := NewFieldMapper(tc.mockStore)
result, err := fm.FieldFor(ctx, tc.tsStart, tc.tsEnd, &tc.key)
if tc.expectedError != nil {
assert.Equal(t, tc.expectedError, err)

View File

@@ -5,12 +5,14 @@ import (
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes/telemetrytypestest"
"github.com/stretchr/testify/require"
)
// TestLikeAndILikeWithoutWildcards_Warns Tests that LIKE/ILIKE without wildcards add warnings and include docs URL
func TestLikeAndILikeWithoutWildcards_Warns(t *testing.T) {
fm := NewFieldMapper()
storeWithMetadata := telemetrytypestest.NewMockKeyEvolutionMetadataStore(nil)
fm := NewFieldMapper(storeWithMetadata)
cb := NewConditionBuilder(fm)
keys := buildCompleteFieldKeyMap()
@@ -33,7 +35,7 @@ func TestLikeAndILikeWithoutWildcards_Warns(t *testing.T) {
for _, expr := range tests {
t.Run(expr, func(t *testing.T) {
clause, err := querybuilder.PrepareWhereClause(expr, opts, 0, 0)
clause, err := querybuilder.PrepareWhereClause(expr, opts, 0, 0)
require.NoError(t, err)
require.NotNil(t, clause)
@@ -46,7 +48,8 @@ func TestLikeAndILikeWithoutWildcards_Warns(t *testing.T) {
// TestLikeAndILikeWithWildcards_NoWarn Tests that LIKE/ILIKE with wildcards do not add warnings
func TestLikeAndILikeWithWildcards_NoWarn(t *testing.T) {
fm := NewFieldMapper()
storeWithMetadata := telemetrytypestest.NewMockKeyEvolutionMetadataStore(nil)
fm := NewFieldMapper(storeWithMetadata)
cb := NewConditionBuilder(fm)
keys := buildCompleteFieldKeyMap()
@@ -69,7 +72,7 @@ func TestLikeAndILikeWithWildcards_NoWarn(t *testing.T) {
for _, expr := range tests {
t.Run(expr, func(t *testing.T) {
clause, err := querybuilder.PrepareWhereClause(expr, opts, 0, 0)
clause, err := querybuilder.PrepareWhereClause(expr, opts, 0, 0)
require.NoError(t, err)
require.NotNil(t, clause)

View File

@@ -7,13 +7,15 @@ import (
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes/telemetrytypestest"
"github.com/huandu/go-sqlbuilder"
"github.com/stretchr/testify/require"
)
// TestFilterExprLogsBodyJSON tests a comprehensive set of query patterns for body JSON search
func TestFilterExprLogsBodyJSON(t *testing.T) {
fm := NewFieldMapper()
storeWithMetadata := telemetrytypestest.NewMockKeyEvolutionMetadataStore(nil)
fm := NewFieldMapper(storeWithMetadata)
cb := NewConditionBuilder(fm)
// Define a comprehensive set of field keys to support all test cases

View File

@@ -9,13 +9,15 @@ import (
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes/telemetrytypestest"
"github.com/huandu/go-sqlbuilder"
"github.com/stretchr/testify/require"
)
// TestFilterExprLogs tests a comprehensive set of query patterns for logs search
func TestFilterExprLogs(t *testing.T) {
fm := NewFieldMapper()
storeWithMetadata := telemetrytypestest.NewMockKeyEvolutionMetadataStore(nil)
fm := NewFieldMapper(storeWithMetadata)
cb := NewConditionBuilder(fm)
// Define a comprehensive set of field keys to support all test cases
@@ -2422,7 +2424,8 @@ func TestFilterExprLogs(t *testing.T) {
// TestFilterExprLogs tests a comprehensive set of query patterns for logs search
func TestFilterExprLogsConflictNegation(t *testing.T) {
fm := NewFieldMapper()
storeWithMetadata := telemetrytypestest.NewMockKeyEvolutionMetadataStore(nil)
fm := NewFieldMapper(storeWithMetadata)
cb := NewConditionBuilder(fm)
// Define a comprehensive set of field keys to support all test cases

View File

@@ -0,0 +1,158 @@
package telemetrylogs
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"time"
"github.com/SigNoz/signoz/pkg/cache"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/types/cachetypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/huandu/go-sqlbuilder"
)
const (
// KeyEvolutionMetadataTableName is the table name for key evolution metadata
KeyEvolutionMetadataTableName = "distributed_column_key_evolution_metadata"
// KeyEvolutionMetadataDBName is the database name for key evolution metadata
KeyEvolutionMetadataDBName = "signoz_logs"
// KeyEvolutionMetadataCacheKeyPrefix is the prefix for cache keys
KeyEvolutionMetadataCacheKeyPrefix = "key_evolution_metadata:"
base_column = "base_column"
base_column_type = "base_column_type"
new_column = "new_column"
new_column_type = "new_column_type"
release_time = "release_time"
)
// CachedKeyEvolutionMetadata is a cacheable type for storing key evolution metadata
type CachedKeyEvolutionMetadata struct {
Keys []*telemetrytypes.KeyEvolutionMetadataKey `json:"keys"`
}
var _ cachetypes.Cacheable = (*CachedKeyEvolutionMetadata)(nil)
func (c *CachedKeyEvolutionMetadata) MarshalBinary() ([]byte, error) {
return json.Marshal(c)
}
func (c *CachedKeyEvolutionMetadata) UnmarshalBinary(data []byte) error {
return json.Unmarshal(data, c)
}
// Each key can have multiple evolution entries, allowing for multiple column transitions over time.
// The cache is organized by orgId, then by key name.
type KeyEvolutionMetadata struct {
cache cache.Cache
telemetryStore telemetrystore.TelemetryStore
logger *slog.Logger
}
var _ telemetrytypes.KeyEvolutionMetadataStore = (*KeyEvolutionMetadata)(nil)
func NewKeyEvolutionMetadata(telemetryStore telemetrystore.TelemetryStore, cache cache.Cache, logger *slog.Logger) *KeyEvolutionMetadata {
return &KeyEvolutionMetadata{
cache: cache,
telemetryStore: telemetryStore,
logger: logger,
}
}
func (k *KeyEvolutionMetadata) fetchFromClickHouse(ctx context.Context, orgID valuer.UUID, key string) []*telemetrytypes.KeyEvolutionMetadataKey {
store := k.telemetryStore
logger := k.logger
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
// Build query to fetch all key evolution metadata
sb := sqlbuilder.NewSelectBuilder()
sb.Select(
base_column,
base_column_type,
new_column,
new_column_type,
release_time,
)
sb.From(fmt.Sprintf("%s.%s", KeyEvolutionMetadataDBName, KeyEvolutionMetadataTableName))
sb.OrderBy(base_column, release_time)
sb.Where(sb.E(base_column, key))
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
rows, err := store.ClickhouseDB().Query(ctx, query, args...)
if err != nil {
logger.WarnContext(ctx, "Failed to fetch key evolution metadata from ClickHouse", "error", err)
return nil
}
defer rows.Close()
// Group metadata by base_column
metadataByKey := make(map[string][]*telemetrytypes.KeyEvolutionMetadataKey)
for rows.Next() {
var (
baseColumn string
baseColumnType string
newColumn string
newColumnType string
releaseTime uint64
)
if err := rows.Scan(&baseColumn, &baseColumnType, &newColumn, &newColumnType, &releaseTime); err != nil {
logger.WarnContext(ctx, "Failed to scan key evolution metadata row", "error", err)
continue
}
key := &telemetrytypes.KeyEvolutionMetadataKey{
BaseColumn: baseColumn,
BaseColumnType: baseColumnType,
NewColumn: newColumn,
NewColumnType: newColumnType,
ReleaseTime: time.Unix(0, int64(releaseTime)),
}
metadataByKey[baseColumn] = append(metadataByKey[baseColumn], key)
}
if err := rows.Err(); err != nil {
logger.WarnContext(ctx, "Error iterating key evolution metadata rows", "error", err)
return nil
}
return metadataByKey[key]
}
// Get retrieves all metadata keys for the given key name and orgId from cache.
// Returns an empty slice if the key is not found in cache.
func (k *KeyEvolutionMetadata) Get(ctx context.Context, orgId valuer.UUID, keyName string) []*telemetrytypes.KeyEvolutionMetadataKey {
cacheKey := KeyEvolutionMetadataCacheKeyPrefix + keyName
cachedData := &CachedKeyEvolutionMetadata{}
if err := k.cache.Get(ctx, orgId, cacheKey, cachedData); err != nil {
if !errors.Ast(err, errors.TypeNotFound) {
k.logger.ErrorContext(ctx, "Failed to get key evolution metadata from cache", "error", err)
return nil
}
// Cache miss - fetch from ClickHouse and try again
metadata := k.fetchFromClickHouse(ctx, orgId, keyName)
if metadata != nil {
cacheKey := KeyEvolutionMetadataCacheKeyPrefix + keyName
cachedData = &CachedKeyEvolutionMetadata{Keys: metadata}
if err := k.cache.Set(ctx, orgId, cacheKey, cachedData, 24*time.Hour); err != nil {
k.logger.WarnContext(ctx, "Failed to set key evolution metadata in cache", "key", keyName, "error", err)
}
}
}
return cachedData.Keys
}

View File

@@ -0,0 +1,213 @@
package telemetrylogs
import (
"context"
"log/slog"
"testing"
"time"
"github.com/DATA-DOG/go-sqlmock"
"github.com/SigNoz/signoz/pkg/cache"
"github.com/SigNoz/signoz/pkg/cache/cachetest"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
cmock "github.com/srikanthccv/ClickHouse-go-mock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
var (
clickHouseQueryPattern = "SELECT.*base_column.*FROM.*distributed_column_key_evolution_metadata.*WHERE.*base_column.*=.*"
clickHouseColumns = []cmock.ColumnType{
{Name: base_column, Type: "String"},
{Name: base_column_type, Type: "String"},
{Name: new_column, Type: "String"},
{Name: new_column_type, Type: "String"},
{Name: release_time, Type: "UInt64"},
}
)
func newTestCache(t *testing.T) cache.Cache {
t.Helper()
testCache, err := cachetest.New(cache.Config{
Provider: "memory",
Memory: cache.Memory{
NumCounters: 10 * 1000,
MaxCost: 1 << 26,
},
})
require.NoError(t, err)
return testCache
}
func newTestTelemetryStore() *telemetrystoretest.Provider {
return telemetrystoretest.New(telemetrystore.Config{Provider: "clickhouse"}, sqlmock.QueryMatcherRegexp)
}
func newKeyEvolutionMetadata(telemetryStore telemetrystore.TelemetryStore, cache cache.Cache) *KeyEvolutionMetadata {
return NewKeyEvolutionMetadata(telemetryStore, cache, slog.Default())
}
func createMockRows(values [][]any) *cmock.Rows {
return cmock.NewRows(clickHouseColumns, values)
}
func assertMetadataEqual(t *testing.T, expected, actual *telemetrytypes.KeyEvolutionMetadataKey) {
t.Helper()
assert.Equal(t, expected.BaseColumn, actual.BaseColumn)
assert.Equal(t, expected.BaseColumnType, actual.BaseColumnType)
assert.Equal(t, expected.NewColumn, actual.NewColumn)
assert.Equal(t, expected.NewColumnType, actual.NewColumnType)
assert.Equal(t, expected.ReleaseTime, actual.ReleaseTime)
}
func TestKeyEvolutionMetadata_Get_CacheHit(t *testing.T) {
ctx := context.Background()
orgId := valuer.GenerateUUID()
keyName := "service.name"
testCache := newTestCache(t)
telemetryStore := newTestTelemetryStore()
kem := newKeyEvolutionMetadata(telemetryStore, testCache)
releaseTime := time.Date(2024, 1, 15, 10, 0, 0, 0, time.UTC)
expectedMetadata := []*telemetrytypes.KeyEvolutionMetadataKey{
{
BaseColumn: "resources_string",
BaseColumnType: "Map(LowCardinality(String), String)",
NewColumn: "resource",
NewColumnType: "JSON(max_dynamic_paths=100)",
ReleaseTime: releaseTime,
},
}
cacheKey := KeyEvolutionMetadataCacheKeyPrefix + keyName
cachedData := &CachedKeyEvolutionMetadata{Keys: expectedMetadata}
err := testCache.Set(ctx, orgId, cacheKey, cachedData, 24*time.Hour)
require.NoError(t, err)
result := kem.Get(ctx, orgId, keyName)
require.Len(t, result, 1)
assertMetadataEqual(t, expectedMetadata[0], result[0])
}
func TestKeyEvolutionMetadata_Get_CacheMiss_FetchFromClickHouse(t *testing.T) {
ctx := context.Background()
orgId := valuer.GenerateUUID()
keyName := "resources_string"
testCache := newTestCache(t)
telemetryStore := newTestTelemetryStore()
releaseTime := time.Date(2024, 1, 15, 10, 0, 0, 0, time.UTC)
values := [][]any{
{
"resources_string",
"Map(LowCardinality(String), String)",
"resource",
"JSON(max_dynamic_paths=100)",
uint64(releaseTime.UnixNano()),
},
}
rows := createMockRows(values)
telemetryStore.Mock().ExpectQuery(clickHouseQueryPattern).WithArgs(keyName).WillReturnRows(rows)
kem := newKeyEvolutionMetadata(telemetryStore, testCache)
result := kem.Get(ctx, orgId, keyName)
require.Len(t, result, 1)
assert.Equal(t, "resources_string", result[0].BaseColumn)
assert.Equal(t, "Map(LowCardinality(String), String)", result[0].BaseColumnType)
assert.Equal(t, "resource", result[0].NewColumn)
assert.Equal(t, "JSON(max_dynamic_paths=100)", result[0].NewColumnType)
assert.Equal(t, releaseTime.UnixNano(), result[0].ReleaseTime.UnixNano())
// Verify data was cached
var cachedData CachedKeyEvolutionMetadata
cacheKey := KeyEvolutionMetadataCacheKeyPrefix + keyName
err := testCache.Get(ctx, orgId, cacheKey, &cachedData)
require.NoError(t, err)
require.Len(t, cachedData.Keys, 1)
assert.Equal(t, result[0].BaseColumn, cachedData.Keys[0].BaseColumn)
}
func TestKeyEvolutionMetadata_Get_MultipleMetadataEntries(t *testing.T) {
ctx := context.Background()
orgId := valuer.GenerateUUID()
keyName := "resources_string"
testCache := newTestCache(t)
telemetryStore := newTestTelemetryStore()
releaseTime1 := time.Date(2024, 1, 15, 10, 0, 0, 0, time.UTC)
releaseTime2 := time.Date(2024, 2, 15, 10, 0, 0, 0, time.UTC)
values := [][]any{
{
"resources_string",
"Map(LowCardinality(String), String)",
"resource",
"JSON(max_dynamic_paths=100)",
uint64(releaseTime1.UnixNano()),
},
{
"resources_string",
"Map(LowCardinality(String), String)",
"resource_v2",
"JSON(max_dynamic_paths=100, max_dynamic_path_depth=10)",
uint64(releaseTime2.UnixNano()),
},
}
rows := createMockRows(values)
telemetryStore.Mock().ExpectQuery(clickHouseQueryPattern).WithArgs(keyName).WillReturnRows(rows)
kem := newKeyEvolutionMetadata(telemetryStore, testCache)
result := kem.Get(ctx, orgId, keyName)
require.Len(t, result, 2)
assert.Equal(t, "resources_string", result[0].BaseColumn)
assert.Equal(t, "resource", result[0].NewColumn)
assert.Equal(t, "JSON(max_dynamic_paths=100)", result[0].NewColumnType)
assert.Equal(t, releaseTime1.UnixNano(), result[0].ReleaseTime.UnixNano())
assert.Equal(t, "resource_v2", result[1].NewColumn)
assert.Equal(t, "JSON(max_dynamic_paths=100, max_dynamic_path_depth=10)", result[1].NewColumnType)
assert.Equal(t, releaseTime2.UnixNano(), result[1].ReleaseTime.UnixNano())
}
func TestKeyEvolutionMetadata_Get_EmptyResultFromClickHouse(t *testing.T) {
ctx := context.Background()
orgId := valuer.GenerateUUID()
keyName := "resources_string"
testCache := newTestCache(t)
telemetryStore := newTestTelemetryStore()
rows := createMockRows([][]any{})
telemetryStore.Mock().ExpectQuery(clickHouseQueryPattern).WithArgs(keyName).WillReturnRows(rows)
kem := newKeyEvolutionMetadata(telemetryStore, testCache)
result := kem.Get(ctx, orgId, keyName)
assert.Empty(t, result)
}
func TestKeyEvolutionMetadata_Get_ClickHouseQueryError(t *testing.T) {
ctx := context.Background()
orgId := valuer.GenerateUUID()
keyName := "resources_string"
testCache := newTestCache(t)
telemetryStore := newTestTelemetryStore()
telemetryStore.Mock().ExpectQuery(clickHouseQueryPattern).WithArgs(keyName).WillReturnError(assert.AnError)
kem := newKeyEvolutionMetadata(telemetryStore, testCache)
result := kem.Get(ctx, orgId, keyName)
assert.Empty(t, result)
}

View File

@@ -247,7 +247,7 @@ func (b *logQueryStatementBuilder) buildListQuery(
continue
}
// get column expression for the field - use array index directly to avoid pointer to loop variable
colExpr, err := b.fm.ColumnExpressionFor(ctx, &query.SelectFields[index], keys)
colExpr, err := b.fm.ColumnExpressionFor(ctx, start, end, &query.SelectFields[index], keys)
if err != nil {
return nil, err
}
@@ -267,7 +267,7 @@ func (b *logQueryStatementBuilder) buildListQuery(
// Add order by
for _, orderBy := range query.Order {
colExpr, err := b.fm.ColumnExpressionFor(ctx, &orderBy.Key.TelemetryFieldKey, keys)
colExpr, err := b.fm.ColumnExpressionFor(ctx, start, end, &orderBy.Key.TelemetryFieldKey, keys)
if err != nil {
return nil, err
}
@@ -333,7 +333,7 @@ func (b *logQueryStatementBuilder) buildTimeSeriesQuery(
// Keep original column expressions so we can build the tuple
fieldNames := make([]string, 0, len(query.GroupBy))
for _, gb := range query.GroupBy {
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, b.jsonBodyPrefix, b.jsonKeyToKey)
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, start, end, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, b.jsonBodyPrefix, b.jsonKeyToKey)
if err != nil {
return nil, err
}
@@ -347,7 +347,7 @@ func (b *logQueryStatementBuilder) buildTimeSeriesQuery(
allAggChArgs := make([]any, 0)
for i, agg := range query.Aggregations {
rewritten, chArgs, err := b.aggExprRewriter.Rewrite(
ctx, agg.Expression,
ctx, start, end, agg.Expression,
uint64(query.StepInterval.Seconds()),
keys,
)
@@ -479,7 +479,7 @@ func (b *logQueryStatementBuilder) buildScalarQuery(
var allGroupByArgs []any
for _, gb := range query.GroupBy {
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, b.jsonBodyPrefix, b.jsonKeyToKey)
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, start, end, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, b.jsonBodyPrefix, b.jsonKeyToKey)
if err != nil {
return nil, err
}
@@ -496,7 +496,7 @@ func (b *logQueryStatementBuilder) buildScalarQuery(
for idx := range query.Aggregations {
aggExpr := query.Aggregations[idx]
rewritten, chArgs, err := b.aggExprRewriter.Rewrite(
ctx, aggExpr.Expression,
ctx, start, end, aggExpr.Expression,
rateInterval,
keys,
)

View File

@@ -8,9 +8,11 @@ import (
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/querybuilder/resourcefilter"
"github.com/SigNoz/signoz/pkg/types/authtypes"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes/telemetrytypestest"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/stretchr/testify/require"
)
@@ -38,7 +40,14 @@ func resourceFilterStmtBuilder() qbtypes.StatementBuilder[qbtypes.LogAggregation
}
func TestStatementBuilderTimeSeries(t *testing.T) {
// Create a test release time
releaseTime := time.Date(2024, 1, 15, 10, 0, 0, 0, time.UTC)
releaseTimeNano := uint64(releaseTime.UnixNano())
cases := []struct {
startTs uint64
endTs uint64
name string
requestType qbtypes.RequestType
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]
@@ -46,14 +55,16 @@ func TestStatementBuilderTimeSeries(t *testing.T) {
expectedErr error
}{
{
name: "Time series with limit",
startTs: releaseTimeNano + uint64(24*time.Hour.Nanoseconds()),
endTs: releaseTimeNano + uint64(48*time.Hour.Nanoseconds()),
name: "Time series with limit and count distinct on service.name",
requestType: qbtypes.RequestTypeTimeSeries,
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
Signal: telemetrytypes.SignalLogs,
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
Aggregations: []qbtypes.LogAggregation{
{
Expression: "count()",
Expression: "count_distinct(service.name)",
},
},
Filter: &qbtypes.Filter{
@@ -69,20 +80,22 @@ func TestStatementBuilderTimeSeries(t *testing.T) {
},
},
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), __limit_cte AS (SELECT toString(multiIf(multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL), NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND true AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? GROUP BY `service.name` ORDER BY __result_0 DESC LIMIT ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 30 SECOND) AS ts, toString(multiIf(multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL), NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND true AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? AND (`service.name`) GLOBAL IN (SELECT `service.name` FROM __limit_cte) GROUP BY ts, `service.name`",
Args: []any{"cartservice", "%service.name%", "%service.name\":\"cartservice%", uint64(1747945619), uint64(1747983448), "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10, "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448)},
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), __limit_cte AS (SELECT toString(multiIf(resource.`service.name`::String IS NOT NULL, resource.`service.name`::String, NULL)) AS `service.name`, countDistinct(multiIf(resource.`service.name`::String IS NOT NULL, resource.`service.name`::String, NULL)) AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND true AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? GROUP BY `service.name` ORDER BY __result_0 DESC LIMIT ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 30 SECOND) AS ts, toString(multiIf(resource.`service.name`::String IS NOT NULL, resource.`service.name`::String, NULL)) AS `service.name`, countDistinct(multiIf(resource.`service.name`::String IS NOT NULL, resource.`service.name`::String, NULL)) AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND true AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? AND (`service.name`) GLOBAL IN (SELECT `service.name` FROM __limit_cte) GROUP BY ts, `service.name`",
Args: []any{"cartservice", "%service.name%", "%service.name\":\"cartservice%", uint64(1705397400), uint64(1705485600), "1705399200000000000", uint64(1705397400), "1705485600000000000", uint64(1705485600), 10, "1705399200000000000", uint64(1705397400), "1705485600000000000", uint64(1705485600)},
},
expectedErr: nil,
},
{
name: "Time series with OR b/w resource attr and attribute filter",
startTs: releaseTimeNano - uint64(24*time.Hour.Nanoseconds()),
endTs: releaseTimeNano + uint64(48*time.Hour.Nanoseconds()),
name: "Time series with OR b/w resource attr and attribute filter and count distinct on service.name",
requestType: qbtypes.RequestTypeTimeSeries,
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
Signal: telemetrytypes.SignalTraces,
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
Aggregations: []qbtypes.LogAggregation{
{
Expression: "count()",
Expression: "count_distinct(service.name)",
},
},
Filter: &qbtypes.Filter{
@@ -98,12 +111,14 @@ func TestStatementBuilderTimeSeries(t *testing.T) {
},
},
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE ((simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) OR true) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), __limit_cte AS (SELECT toString(multiIf(multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL), NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND ((multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) = ? AND multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL) OR (attributes_string['http.method'] = ? AND mapContains(attributes_string, 'http.method') = ?)) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? GROUP BY `service.name` ORDER BY __result_0 DESC LIMIT ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 30 SECOND) AS ts, toString(multiIf(multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL), NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND ((multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) = ? AND multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL) OR (attributes_string['http.method'] = ? AND mapContains(attributes_string, 'http.method') = ?)) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? AND (`service.name`) GLOBAL IN (SELECT `service.name` FROM __limit_cte) GROUP BY ts, `service.name`",
Args: []any{"redis-manual", "%service.name%", "%service.name\":\"redis-manual%", uint64(1747945619), uint64(1747983448), "redis-manual", "GET", true, "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10, "redis-manual", "GET", true, "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448)},
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE ((simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) OR true) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), __limit_cte AS (SELECT toString(multiIf(multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL), NULL)) AS `service.name`, countDistinct(multiIf(multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL), NULL)) AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND ((multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) = ? AND multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL) OR (attributes_string['http.method'] = ? AND mapContains(attributes_string, 'http.method') = ?)) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? GROUP BY `service.name` ORDER BY __result_0 DESC LIMIT ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 30 SECOND) AS ts, toString(multiIf(multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL), NULL)) AS `service.name`, countDistinct(multiIf(multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL), NULL)) AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND ((multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) = ? AND multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL) OR (attributes_string['http.method'] = ? AND mapContains(attributes_string, 'http.method') = ?)) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? AND (`service.name`) GLOBAL IN (SELECT `service.name` FROM __limit_cte) GROUP BY ts, `service.name`",
Args: []any{"redis-manual", "%service.name%", "%service.name\":\"redis-manual%", uint64(1705224600), uint64(1705485600), "redis-manual", "GET", true, "1705226400000000000", uint64(1705224600), "1705485600000000000", uint64(1705485600), 10, "redis-manual", "GET", true, "1705226400000000000", uint64(1705224600), "1705485600000000000", uint64(1705485600)},
},
expectedErr: nil,
},
{
startTs: releaseTimeNano + uint64(24*time.Hour.Nanoseconds()),
endTs: releaseTimeNano + uint64(48*time.Hour.Nanoseconds()),
name: "Time series with limit + custom order by",
requestType: qbtypes.RequestTypeTimeSeries,
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
@@ -137,12 +152,14 @@ func TestStatementBuilderTimeSeries(t *testing.T) {
},
},
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), __limit_cte AS (SELECT toString(multiIf(multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL), NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND true AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? GROUP BY `service.name` ORDER BY `service.name` desc LIMIT ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 30 SECOND) AS ts, toString(multiIf(multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL), NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND true AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? AND (`service.name`) GLOBAL IN (SELECT `service.name` FROM __limit_cte) GROUP BY ts, `service.name` ORDER BY `service.name` desc, ts desc",
Args: []any{"cartservice", "%service.name%", "%service.name\":\"cartservice%", uint64(1747945619), uint64(1747983448), "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10, "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448)},
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), __limit_cte AS (SELECT toString(multiIf(resource.`service.name`::String IS NOT NULL, resource.`service.name`::String, NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND true AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? GROUP BY `service.name` ORDER BY `service.name` desc LIMIT ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 30 SECOND) AS ts, toString(multiIf(resource.`service.name`::String IS NOT NULL, resource.`service.name`::String, NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND true AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? AND (`service.name`) GLOBAL IN (SELECT `service.name` FROM __limit_cte) GROUP BY ts, `service.name` ORDER BY `service.name` desc, ts desc",
Args: []any{"cartservice", "%service.name%", "%service.name\":\"cartservice%", uint64(1705397400), uint64(1705485600), "1705399200000000000", uint64(1705397400), "1705485600000000000", uint64(1705485600), 10, "1705399200000000000", uint64(1705397400), "1705485600000000000", uint64(1705485600)},
},
expectedErr: nil,
},
{
startTs: releaseTimeNano + uint64(24*time.Hour.Nanoseconds()),
endTs: releaseTimeNano + uint64(48*time.Hour.Nanoseconds()),
name: "Time series with group by on materialized column",
requestType: qbtypes.RequestTypeTimeSeries,
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
@@ -169,10 +186,12 @@ func TestStatementBuilderTimeSeries(t *testing.T) {
},
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), __limit_cte AS (SELECT toString(multiIf(`attribute_string_materialized$$key$$name_exists` = ?, `attribute_string_materialized$$key$$name`, NULL)) AS `materialized.key.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND true AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? GROUP BY `materialized.key.name` ORDER BY __result_0 DESC LIMIT ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 30 SECOND) AS ts, toString(multiIf(`attribute_string_materialized$$key$$name_exists` = ?, `attribute_string_materialized$$key$$name`, NULL)) AS `materialized.key.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND true AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? AND (`materialized.key.name`) GLOBAL IN (SELECT `materialized.key.name` FROM __limit_cte) GROUP BY ts, `materialized.key.name`",
Args: []any{"cartservice", "%service.name%", "%service.name\":\"cartservice%", uint64(1747945619), uint64(1747983448), true, "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10, true, "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448)},
Args: []any{"cartservice", "%service.name%", "%service.name\":\"cartservice%", uint64(1705397400), uint64(1705485600), true, "1705399200000000000", uint64(1705397400), "1705485600000000000", uint64(1705485600), 10, true, "1705399200000000000", uint64(1705397400), "1705485600000000000", uint64(1705485600)},
},
},
{
startTs: releaseTimeNano + uint64(24*time.Hour.Nanoseconds()),
endTs: releaseTimeNano + uint64(48*time.Hour.Nanoseconds()),
name: "Time series with materialised column using or with regex operator",
requestType: qbtypes.RequestTypeTimeSeries,
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
@@ -190,13 +209,20 @@ func TestStatementBuilderTimeSeries(t *testing.T) {
},
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE (true OR true) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 30 SECOND) AS ts, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND ((match(`attribute_string_materialized$$key$$name`, ?) AND `attribute_string_materialized$$key$$name_exists` = ?) OR (`attribute_string_materialized$$key$$name` = ? AND `attribute_string_materialized$$key$$name_exists` = ?)) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? GROUP BY ts",
Args: []any{uint64(1747945619), uint64(1747983448), "redis.*", true, "memcached", true, "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448)},
Args: []any{uint64(1705397400), uint64(1705485600), "redis.*", true, "memcached", true, "1705399200000000000", uint64(1705397400), "1705485600000000000", uint64(1705485600)},
},
expectedErr: nil,
},
}
fm := NewFieldMapper()
ctx := context.Background()
orgId := valuer.GenerateUUID()
ctx = authtypes.NewContextWithClaims(ctx, authtypes.Claims{
OrgID: orgId.String(),
})
storeWithMetadata := telemetrytypestest.NewMockKeyEvolutionMetadataStore(mockKeyEvolutionMetadata(ctx, orgId, releaseTime))
fm := NewFieldMapper(storeWithMetadata)
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
@@ -220,7 +246,7 @@ func TestStatementBuilderTimeSeries(t *testing.T) {
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
q, err := statementBuilder.Build(context.Background(), 1747947419000, 1747983448000, c.requestType, c.query, nil)
q, err := statementBuilder.Build(ctx, c.startTs, c.endTs, c.requestType, c.query, nil)
if c.expectedErr != nil {
require.Error(t, err)
@@ -317,7 +343,8 @@ func TestStatementBuilderListQuery(t *testing.T) {
},
}
fm := NewFieldMapper()
storeWithMetadata := telemetrytypestest.NewMockKeyEvolutionMetadataStore(nil)
fm := NewFieldMapper(storeWithMetadata)
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
@@ -426,7 +453,8 @@ func TestStatementBuilderListQueryResourceTests(t *testing.T) {
},
}
fm := NewFieldMapper()
storeWithMetadata := telemetrytypestest.NewMockKeyEvolutionMetadataStore(nil)
fm := NewFieldMapper(storeWithMetadata)
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
@@ -500,7 +528,8 @@ func TestStatementBuilderTimeSeriesBodyGroupBy(t *testing.T) {
},
}
fm := NewFieldMapper()
storeWithMetadata := telemetrytypestest.NewMockKeyEvolutionMetadataStore(nil)
fm := NewFieldMapper(storeWithMetadata)
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
@@ -596,7 +625,8 @@ func TestStatementBuilderListQueryServiceCollision(t *testing.T) {
},
}
fm := NewFieldMapper()
storeWithMetadata := telemetrytypestest.NewMockKeyEvolutionMetadataStore(nil)
fm := NewFieldMapper(storeWithMetadata)
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMapCollision()

View File

@@ -1,9 +1,12 @@
package telemetrylogs
import (
"context"
"strings"
"time"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
// Helper function to limit string length for display
@@ -951,3 +954,23 @@ func buildCompleteFieldKeyMapCollision() map[string][]*telemetrytypes.TelemetryF
}
return keysMap
}
// mockKeyEvolutionMetadata builds a mock org-scoped key evolution metadata map for a column evolution.
func mockKeyEvolutionMetadata(ctx context.Context, orgId valuer.UUID, releaseTime time.Time) map[string]map[string][]*telemetrytypes.KeyEvolutionMetadataKey {
metadata := make(map[string]map[string][]*telemetrytypes.KeyEvolutionMetadataKey)
// Make sure to initialize the inner map before assignment to prevent 'assignment to entry in nil map'
orgIdStr := orgId.String()
if _, exists := metadata[orgIdStr]; !exists {
metadata[orgIdStr] = make(map[string][]*telemetrytypes.KeyEvolutionMetadataKey)
}
newKeyEvolutionMetadataEntry := telemetrytypes.KeyEvolutionMetadataKey{
BaseColumn: "resources_string",
BaseColumnType: "Map(LowCardinality(String), String)",
NewColumn: "resource",
NewColumnType: "JSON(max_dynamic_paths=100)",
ReleaseTime: releaseTime,
}
metadata[orgIdStr]["resources_string"] = []*telemetrytypes.KeyEvolutionMetadataKey{&newKeyEvolutionMetadataEntry}
return metadata
}

View File

@@ -25,8 +25,7 @@ func (c *conditionBuilder) ConditionFor(
operator qbtypes.FilterOperator,
value any,
sb *sqlbuilder.SelectBuilder,
_ uint64,
_ uint64,
tsStart, tsEnd uint64,
) (string, error) {
switch operator {
@@ -45,7 +44,7 @@ func (c *conditionBuilder) ConditionFor(
return "", nil
}
tblFieldName, err := c.fm.FieldFor(ctx, key)
tblFieldName, err := c.fm.FieldFor(ctx, tsStart, tsEnd, key)
if err != nil {
// if we don't have a table field name, we can't build a condition for related values
return "", nil

View File

@@ -53,7 +53,7 @@ func TestConditionFor(t *testing.T) {
for _, tc := range testCases {
sb := sqlbuilder.NewSelectBuilder()
t.Run(tc.name, func(t *testing.T) {
cond, err := conditionBuilder.ConditionFor(ctx, &tc.key, tc.operator, tc.value, sb, 0, 0)
cond, err := conditionBuilder.ConditionFor(ctx, &tc.key, tc.operator, tc.value, sb, 0, 0)
sb.Where(cond)
if tc.expectedError != nil {

View File

@@ -51,7 +51,7 @@ func (m *fieldMapper) ColumnFor(ctx context.Context, key *telemetrytypes.Telemet
return column, nil
}
func (m *fieldMapper) FieldFor(ctx context.Context, key *telemetrytypes.TelemetryFieldKey) (string, error) {
func (m *fieldMapper) FieldFor(ctx context.Context, startNs, endNs uint64, key *telemetrytypes.TelemetryFieldKey) (string, error) {
column, err := m.getColumn(ctx, key)
if err != nil {
return "", err
@@ -69,11 +69,12 @@ func (m *fieldMapper) FieldFor(ctx context.Context, key *telemetrytypes.Telemetr
func (m *fieldMapper) ColumnExpressionFor(
ctx context.Context,
startNs, endNs uint64,
field *telemetrytypes.TelemetryFieldKey,
keys map[string][]*telemetrytypes.TelemetryFieldKey,
) (string, error) {
colName, err := m.FieldFor(ctx, field)
colName, err := m.FieldFor(ctx, startNs, endNs, field)
if errors.Is(err, qbtypes.ErrColumnNotFound) {
// the key didn't have the right context to be added to the query
// we try to use the context we know of
@@ -83,7 +84,7 @@ func (m *fieldMapper) ColumnExpressionFor(
if _, ok := attributeMetadataColumns[field.Name]; ok {
// if it is, attach the column name directly
field.FieldContext = telemetrytypes.FieldContextSpan
colName, _ = m.FieldFor(ctx, field)
colName, _ = m.FieldFor(ctx, startNs, endNs, field)
} else {
// - the context is not provided
// - there are not keys for the field
@@ -101,12 +102,12 @@ func (m *fieldMapper) ColumnExpressionFor(
}
} else if len(keysForField) == 1 {
// we have a single key for the field, use it
colName, _ = m.FieldFor(ctx, keysForField[0])
colName, _ = m.FieldFor(ctx, startNs, endNs, keysForField[0])
} else {
// select any non-empty value from the keys
args := []string{}
for _, key := range keysForField {
colName, _ = m.FieldFor(ctx, key)
colName, _ = m.FieldFor(ctx, startNs, endNs, key)
args = append(args, fmt.Sprintf("toString(%s) != '', toString(%s)", colName, colName))
}
colName = fmt.Sprintf("multiIf(%s, NULL)", strings.Join(args, ", "))

View File

@@ -145,6 +145,8 @@ func TestGetFieldKeyName(t *testing.T) {
testCases := []struct {
name string
tsStart uint64
tsEnd uint64
key telemetrytypes.TelemetryFieldKey
expectedResult string
expectedError error
@@ -203,7 +205,7 @@ func TestGetFieldKeyName(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
result, err := fm.FieldFor(ctx, &tc.key)
result, err := fm.FieldFor(ctx, tc.tsStart, tc.tsEnd, &tc.key)
if tc.expectedError != nil {
assert.Equal(t, tc.expectedError, err)

View File

@@ -942,18 +942,18 @@ func (t *telemetryMetaStore) getRelatedValues(ctx context.Context, fieldValueSel
FieldDataType: fieldValueSelector.FieldDataType,
}
selectColumn, err := t.fm.FieldFor(ctx, key)
selectColumn, err := t.fm.FieldFor(ctx, 0, 0, key)
if err != nil {
// we don't have a explicit column to select from the related metadata table
// so we will select either from resource_attributes or attributes table
// in that order
resourceColumn, _ := t.fm.FieldFor(ctx, &telemetrytypes.TelemetryFieldKey{
resourceColumn, _ := t.fm.FieldFor(ctx, 0, 0, &telemetrytypes.TelemetryFieldKey{
Name: key.Name,
FieldContext: telemetrytypes.FieldContextResource,
FieldDataType: telemetrytypes.FieldDataTypeString,
})
attributeColumn, _ := t.fm.FieldFor(ctx, &telemetrytypes.TelemetryFieldKey{
attributeColumn, _ := t.fm.FieldFor(ctx, 0, 0, &telemetrytypes.TelemetryFieldKey{
Name: key.Name,
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeString,
@@ -978,7 +978,7 @@ func (t *telemetryMetaStore) getRelatedValues(ctx context.Context, fieldValueSel
FieldMapper: t.fm,
ConditionBuilder: t.conditionBuilder,
FieldKeys: keys,
}, 0, 0)
}, 0, 0)
if err == nil {
sb.AddWhereClause(whereClause.WhereClause)
} else {
@@ -1002,20 +1002,20 @@ func (t *telemetryMetaStore) getRelatedValues(ctx context.Context, fieldValueSel
// search on attributes
key.FieldContext = telemetrytypes.FieldContextAttribute
cond, err := t.conditionBuilder.ConditionFor(ctx, key, qbtypes.FilterOperatorContains, fieldValueSelector.Value, sb, 0, 0)
cond, err := t.conditionBuilder.ConditionFor(ctx, key, qbtypes.FilterOperatorContains, fieldValueSelector.Value, sb, 0, 0)
if err == nil {
conds = append(conds, cond)
}
// search on resource
key.FieldContext = telemetrytypes.FieldContextResource
cond, err = t.conditionBuilder.ConditionFor(ctx, key, qbtypes.FilterOperatorContains, fieldValueSelector.Value, sb, 0, 0)
cond, err = t.conditionBuilder.ConditionFor(ctx, key, qbtypes.FilterOperatorContains, fieldValueSelector.Value, sb, 0, 0)
if err == nil {
conds = append(conds, cond)
}
key.FieldContext = origContext
} else {
cond, err := t.conditionBuilder.ConditionFor(ctx, key, qbtypes.FilterOperatorContains, fieldValueSelector.Value, sb, 0, 0)
cond, err := t.conditionBuilder.ConditionFor(ctx, key, qbtypes.FilterOperatorContains, fieldValueSelector.Value, sb, 0, 0)
if err == nil {
conds = append(conds, cond)
}

View File

@@ -120,7 +120,7 @@ func (b *meterQueryStatementBuilder) buildTemporalAggDeltaFastPath(
stepSec,
))
for _, g := range query.GroupBy {
col, err := b.fm.ColumnExpressionFor(ctx, &g.TelemetryFieldKey, keys)
col, err := b.fm.ColumnExpressionFor(ctx, start, end, &g.TelemetryFieldKey, keys)
if err != nil {
return "", []any{}, err
}
@@ -148,7 +148,7 @@ func (b *meterQueryStatementBuilder) buildTemporalAggDeltaFastPath(
FieldKeys: keys,
FullTextColumn: &telemetrytypes.TelemetryFieldKey{Name: "labels"},
Variables: variables,
}, start, end)
}, start, end)
if err != nil {
return "", []any{}, err
}
@@ -200,7 +200,7 @@ func (b *meterQueryStatementBuilder) buildTemporalAggDelta(
))
for _, g := range query.GroupBy {
col, err := b.fm.ColumnExpressionFor(ctx, &g.TelemetryFieldKey, keys)
col, err := b.fm.ColumnExpressionFor(ctx, start, end, &g.TelemetryFieldKey, keys)
if err != nil {
return "", nil, err
}
@@ -231,7 +231,7 @@ func (b *meterQueryStatementBuilder) buildTemporalAggDelta(
FieldKeys: keys,
FullTextColumn: &telemetrytypes.TelemetryFieldKey{Name: "labels"},
Variables: variables,
}, start, end)
}, start, end)
if err != nil {
return "", nil, err
}
@@ -270,7 +270,7 @@ func (b *meterQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
stepSec,
))
for _, g := range query.GroupBy {
col, err := b.fm.ColumnExpressionFor(ctx, &g.TelemetryFieldKey, keys)
col, err := b.fm.ColumnExpressionFor(ctx, start, end, &g.TelemetryFieldKey, keys)
if err != nil {
return "", nil, err
}
@@ -295,7 +295,7 @@ func (b *meterQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
FieldKeys: keys,
FullTextColumn: &telemetrytypes.TelemetryFieldKey{Name: "labels"},
Variables: variables,
}, start, end)
}, start, end)
if err != nil {
return "", nil, err
}

View File

@@ -23,6 +23,8 @@ func NewConditionBuilder(fm qbtypes.FieldMapper) *conditionBuilder {
func (c *conditionBuilder) conditionFor(
ctx context.Context,
startNs uint64,
endNs uint64,
key *telemetrytypes.TelemetryFieldKey,
operator qbtypes.FilterOperator,
value any,
@@ -39,7 +41,7 @@ func (c *conditionBuilder) conditionFor(
value = querybuilder.FormatValueForContains(value)
}
tblFieldName, err := c.fm.FieldFor(ctx, key)
tblFieldName, err := c.fm.FieldFor(ctx, startNs, endNs, key)
if err != nil {
return "", err
}
@@ -139,10 +141,10 @@ func (c *conditionBuilder) ConditionFor(
operator qbtypes.FilterOperator,
value any,
sb *sqlbuilder.SelectBuilder,
_ uint64,
_ uint64,
startNs uint64,
endNs uint64,
) (string, error) {
condition, err := c.conditionFor(ctx, key, operator, value, sb)
condition, err := c.conditionFor(ctx, startNs, endNs, key, operator, value, sb)
if err != nil {
return "", err
}

View File

@@ -65,7 +65,7 @@ func (m *fieldMapper) getColumn(_ context.Context, key *telemetrytypes.Telemetry
return nil, qbtypes.ErrColumnNotFound
}
func (m *fieldMapper) FieldFor(ctx context.Context, key *telemetrytypes.TelemetryFieldKey) (string, error) {
func (m *fieldMapper) FieldFor(ctx context.Context, startNs, endNs uint64, key *telemetrytypes.TelemetryFieldKey) (string, error) {
column, err := m.getColumn(ctx, key)
if err != nil {
return "", err
@@ -92,11 +92,12 @@ func (m *fieldMapper) ColumnFor(ctx context.Context, key *telemetrytypes.Telemet
func (m *fieldMapper) ColumnExpressionFor(
ctx context.Context,
startNs, endNs uint64,
field *telemetrytypes.TelemetryFieldKey,
keys map[string][]*telemetrytypes.TelemetryFieldKey,
) (string, error) {
colName, err := m.FieldFor(ctx, field)
colName, err := m.FieldFor(ctx, startNs, endNs, field)
if err != nil {
return "", err
}

View File

@@ -207,7 +207,7 @@ func TestGetFieldKeyName(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
result, err := fm.FieldFor(ctx, &tc.key)
result, err := fm.FieldFor(ctx, 0, 0, &tc.key)
if tc.expectedError != nil {
assert.Equal(t, tc.expectedError, err)

View File

@@ -359,7 +359,7 @@ func (b *MetricQueryStatementBuilder) buildTimeSeriesCTE(
sb.Select("fingerprint")
for _, g := range query.GroupBy {
col, err := b.fm.ColumnExpressionFor(ctx, &g.TelemetryFieldKey, keys)
col, err := b.fm.ColumnExpressionFor(ctx, start, end, &g.TelemetryFieldKey, keys)
if err != nil {
return "", nil, err
}

View File

@@ -29,6 +29,8 @@ func NewConditionBuilder(fm qbtypes.FieldMapper) *conditionBuilder {
func (c *conditionBuilder) conditionFor(
ctx context.Context,
startNs uint64,
endNs uint64,
key *telemetrytypes.TelemetryFieldKey,
operator qbtypes.FilterOperator,
value any,
@@ -52,7 +54,7 @@ func (c *conditionBuilder) conditionFor(
}
// then ask the mapper for the actual SQL reference
tblFieldName, err := c.fm.FieldFor(ctx, key)
tblFieldName, err := c.fm.FieldFor(ctx, startNs, endNs, key)
if err != nil {
return "", err
}
@@ -239,20 +241,20 @@ func (c *conditionBuilder) ConditionFor(
value any,
sb *sqlbuilder.SelectBuilder,
startNs uint64,
_ uint64,
endNs uint64,
) (string, error) {
if c.isSpanScopeField(key.Name) {
return c.buildSpanScopeCondition(key, operator, value, startNs)
}
condition, err := c.conditionFor(ctx, key, operator, value, sb)
condition, err := c.conditionFor(ctx, startNs, endNs, key, operator, value, sb)
if err != nil {
return "", err
}
if operator.AddDefaultExistsFilter() {
// skip adding exists filter for intrinsic fields
field, _ := c.fm.FieldFor(ctx, key)
field, _ := c.fm.FieldFor(ctx, startNs, endNs, key)
if slices.Contains(maps.Keys(IntrinsicFields), field) ||
slices.Contains(maps.Keys(IntrinsicFieldsDeprecated), field) ||
slices.Contains(maps.Keys(CalculatedFields), field) ||
@@ -260,7 +262,7 @@ func (c *conditionBuilder) ConditionFor(
return condition, nil
}
existsCondition, err := c.conditionFor(ctx, key, qbtypes.FilterOperatorExists, nil, sb)
existsCondition, err := c.conditionFor(ctx, startNs, endNs, key, qbtypes.FilterOperatorExists, nil, sb)
if err != nil {
return "", err
}

View File

@@ -225,6 +225,7 @@ func (m *defaultFieldMapper) ColumnFor(
// otherwise it returns qbtypes.ErrColumnNotFound
func (m *defaultFieldMapper) FieldFor(
ctx context.Context,
startNs, endNs uint64,
key *telemetrytypes.TelemetryFieldKey,
) (string, error) {
// Special handling for span scope fields
@@ -297,11 +298,12 @@ func (m *defaultFieldMapper) FieldFor(
// if it exists otherwise it returns qbtypes.ErrColumnNotFound
func (m *defaultFieldMapper) ColumnExpressionFor(
ctx context.Context,
startNs, endNs uint64,
field *telemetrytypes.TelemetryFieldKey,
keys map[string][]*telemetrytypes.TelemetryFieldKey,
) (string, error) {
colName, err := m.FieldFor(ctx, field)
colName, err := m.FieldFor(ctx, startNs, endNs, field)
if errors.Is(err, qbtypes.ErrColumnNotFound) {
// the key didn't have the right context to be added to the query
// we try to use the context we know of
@@ -311,7 +313,7 @@ func (m *defaultFieldMapper) ColumnExpressionFor(
if _, ok := indexV3Columns[field.Name]; ok {
// if it is, attach the column name directly
field.FieldContext = telemetrytypes.FieldContextSpan
colName, _ = m.FieldFor(ctx, field)
colName, _ = m.FieldFor(ctx, startNs, endNs, field)
} else {
// - the context is not provided
// - there are not keys for the field
@@ -329,12 +331,12 @@ func (m *defaultFieldMapper) ColumnExpressionFor(
}
} else if len(keysForField) == 1 {
// we have a single key for the field, use it
colName, _ = m.FieldFor(ctx, keysForField[0])
colName, _ = m.FieldFor(ctx, startNs, endNs, keysForField[0])
} else {
// select any non-empty value from the keys
args := []string{}
for _, key := range keysForField {
colName, _ = m.FieldFor(ctx, key)
colName, _ = m.FieldFor(ctx, startNs, endNs, key)
args = append(args, fmt.Sprintf("toString(%s) != '', toString(%s)", colName, colName))
}
colName = fmt.Sprintf("multiIf(%s, NULL)", strings.Join(args, ", "))

View File

@@ -92,7 +92,7 @@ func TestGetFieldKeyName(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
fm := NewFieldMapper()
result, err := fm.FieldFor(ctx, &tc.key)
result, err := fm.FieldFor(ctx, 0, 0, &tc.key)
if tc.expectedError != nil {
assert.Equal(t, tc.expectedError, err)

View File

@@ -293,7 +293,7 @@ func (b *traceQueryStatementBuilder) buildListQuery(
// TODO: should we deprecate `SelectFields` and return everything from a span like we do for logs?
for _, field := range selectedFields {
colExpr, err := b.fm.ColumnExpressionFor(ctx, &field, keys)
colExpr, err := b.fm.ColumnExpressionFor(ctx, start, end, &field, keys)
if err != nil {
return nil, err
}
@@ -311,7 +311,7 @@ func (b *traceQueryStatementBuilder) buildListQuery(
// Add order by
for _, orderBy := range query.Order {
colExpr, err := b.fm.ColumnExpressionFor(ctx, &orderBy.Key.TelemetryFieldKey, keys)
colExpr, err := b.fm.ColumnExpressionFor(ctx, start, end, &orderBy.Key.TelemetryFieldKey, keys)
if err != nil {
return nil, err
}
@@ -495,7 +495,7 @@ func (b *traceQueryStatementBuilder) buildTimeSeriesQuery(
// Keep original column expressions so we can build the tuple
fieldNames := make([]string, 0, len(query.GroupBy))
for _, gb := range query.GroupBy {
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, "", nil)
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, start, end, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, "", nil)
if err != nil {
return nil, err
}
@@ -509,7 +509,7 @@ func (b *traceQueryStatementBuilder) buildTimeSeriesQuery(
allAggChArgs := make([]any, 0)
for i, agg := range query.Aggregations {
rewritten, chArgs, err := b.aggExprRewriter.Rewrite(
ctx, agg.Expression,
ctx, start, end, agg.Expression,
uint64(query.StepInterval.Seconds()),
keys,
)
@@ -637,7 +637,7 @@ func (b *traceQueryStatementBuilder) buildScalarQuery(
var allGroupByArgs []any
for _, gb := range query.GroupBy {
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, "", nil)
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, start, end, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, "", nil)
if err != nil {
return nil, err
}
@@ -654,7 +654,7 @@ func (b *traceQueryStatementBuilder) buildScalarQuery(
for idx := range query.Aggregations {
aggExpr := query.Aggregations[idx]
rewritten, chArgs, err := b.aggExprRewriter.Rewrite(
ctx, aggExpr.Expression,
ctx, start, end, aggExpr.Expression,
rateInterval,
keys,
)
@@ -746,7 +746,7 @@ func (b *traceQueryStatementBuilder) addFilterCondition(
FieldKeys: keys,
SkipResourceFilter: true,
Variables: variables,
}, start, end)
}, start, end)
if err != nil {
return nil, err

View File

@@ -237,7 +237,7 @@ func (b *traceOperatorCTEBuilder) buildQueryCTE(ctx context.Context, queryName s
ConditionBuilder: b.stmtBuilder.cb,
FieldKeys: keys,
SkipResourceFilter: true,
}, b.start, b.end,
}, b.start, b.end,
)
if err != nil {
b.stmtBuilder.logger.ErrorContext(ctx, "Failed to prepare where clause", "error", err, "filter", query.Filter.Expression)
@@ -450,7 +450,7 @@ func (b *traceOperatorCTEBuilder) buildListQuery(ctx context.Context, selectFrom
if selectedFields[field.Name] {
continue
}
colExpr, err := b.stmtBuilder.fm.ColumnExpressionFor(ctx, &field, keys)
colExpr, err := b.stmtBuilder.fm.ColumnExpressionFor(ctx, b.start, b.end, &field, keys)
if err != nil {
b.stmtBuilder.logger.WarnContext(ctx, "failed to map select field",
"field", field.Name, "error", err)
@@ -465,7 +465,7 @@ func (b *traceOperatorCTEBuilder) buildListQuery(ctx context.Context, selectFrom
// Add order by support using ColumnExpressionFor
orderApplied := false
for _, orderBy := range b.operator.Order {
colExpr, err := b.stmtBuilder.fm.ColumnExpressionFor(ctx, &orderBy.Key.TelemetryFieldKey, keys)
colExpr, err := b.stmtBuilder.fm.ColumnExpressionFor(ctx, b.start, b.end, &orderBy.Key.TelemetryFieldKey, keys)
if err != nil {
return nil, err
}
@@ -547,6 +547,8 @@ func (b *traceOperatorCTEBuilder) buildTimeSeriesQuery(ctx context.Context, sele
for _, gb := range b.operator.GroupBy {
expr, args, err := querybuilder.CollisionHandledFinalExpr(
ctx,
b.start,
b.end,
&gb.TelemetryFieldKey,
b.stmtBuilder.fm,
b.stmtBuilder.cb,
@@ -572,6 +574,8 @@ func (b *traceOperatorCTEBuilder) buildTimeSeriesQuery(ctx context.Context, sele
for i, agg := range b.operator.Aggregations {
rewritten, chArgs, err := b.stmtBuilder.aggExprRewriter.Rewrite(
ctx,
b.start,
b.end,
agg.Expression,
uint64(b.operator.StepInterval.Seconds()),
keys,
@@ -657,6 +661,8 @@ func (b *traceOperatorCTEBuilder) buildTraceQuery(ctx context.Context, selectFro
for _, gb := range b.operator.GroupBy {
expr, args, err := querybuilder.CollisionHandledFinalExpr(
ctx,
b.start,
b.end,
&gb.TelemetryFieldKey,
b.stmtBuilder.fm,
b.stmtBuilder.cb,
@@ -684,6 +690,8 @@ func (b *traceOperatorCTEBuilder) buildTraceQuery(ctx context.Context, selectFro
for i, agg := range b.operator.Aggregations {
rewritten, chArgs, err := b.stmtBuilder.aggExprRewriter.Rewrite(
ctx,
b.start,
b.end,
agg.Expression,
rateInterval,
keys,
@@ -797,6 +805,8 @@ func (b *traceOperatorCTEBuilder) buildScalarQuery(ctx context.Context, selectFr
for _, gb := range b.operator.GroupBy {
expr, args, err := querybuilder.CollisionHandledFinalExpr(
ctx,
b.start,
b.end,
&gb.TelemetryFieldKey,
b.stmtBuilder.fm,
b.stmtBuilder.cb,
@@ -822,6 +832,8 @@ func (b *traceOperatorCTEBuilder) buildScalarQuery(ctx context.Context, selectFr
for i, agg := range b.operator.Aggregations {
rewritten, chArgs, err := b.stmtBuilder.aggExprRewriter.Rewrite(
ctx,
b.start,
b.end,
agg.Expression,
uint64((b.end-b.start)/querybuilder.NsToSeconds),
keys,

View File

@@ -21,11 +21,11 @@ type JsonKeyToFieldFunc func(context.Context, *telemetrytypes.TelemetryFieldKey,
// FieldMapper maps the telemetry field key to the table field name.
type FieldMapper interface {
// FieldFor returns the field name for the given key.
FieldFor(ctx context.Context, key *telemetrytypes.TelemetryFieldKey) (string, error)
FieldFor(ctx context.Context, startNs, endNs uint64, key *telemetrytypes.TelemetryFieldKey) (string, error)
// ColumnFor returns the column for the given key.
ColumnFor(ctx context.Context, key *telemetrytypes.TelemetryFieldKey) (*schema.Column, error)
// ColumnExpressionFor returns the column expression for the given key.
ColumnExpressionFor(ctx context.Context, key *telemetrytypes.TelemetryFieldKey, keys map[string][]*telemetrytypes.TelemetryFieldKey) (string, error)
ColumnExpressionFor(ctx context.Context, startNs, endNs uint64, key *telemetrytypes.TelemetryFieldKey, keys map[string][]*telemetrytypes.TelemetryFieldKey) (string, error)
}
// ConditionBuilder builds the condition for the filter.
@@ -37,8 +37,8 @@ type ConditionBuilder interface {
type AggExprRewriter interface {
// Rewrite rewrites the aggregation expression to be used in the query.
Rewrite(ctx context.Context, expr string, rateInterval uint64, keys map[string][]*telemetrytypes.TelemetryFieldKey) (string, []any, error)
RewriteMulti(ctx context.Context, exprs []string, rateInterval uint64, keys map[string][]*telemetrytypes.TelemetryFieldKey) ([]string, [][]any, error)
Rewrite(ctx context.Context, startNs, endNs uint64, expr string, rateInterval uint64, keys map[string][]*telemetrytypes.TelemetryFieldKey) (string, []any, error)
RewriteMulti(ctx context.Context, startNs, endNs uint64, exprs []string, rateInterval uint64, keys map[string][]*telemetrytypes.TelemetryFieldKey) ([]string, [][]any, error)
}
type Statement struct {

View File

@@ -0,0 +1,20 @@
package telemetrytypes
import (
"context"
"time"
"github.com/SigNoz/signoz/pkg/valuer"
)
type KeyEvolutionMetadataKey struct {
BaseColumn string
BaseColumnType string
NewColumn string
NewColumnType string
ReleaseTime time.Time
}
type KeyEvolutionMetadataStore interface {
Get(ctx context.Context, orgId valuer.UUID, keyName string) []*KeyEvolutionMetadataKey
}

View File

@@ -0,0 +1,40 @@
package telemetrytypestest
import (
"context"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
// MockKeyEvolutionMetadataStore implements the KeyEvolutionMetadataStore interface for testing purposes
type MockKeyEvolutionMetadataStore struct {
metadata map[string]map[string][]*telemetrytypes.KeyEvolutionMetadataKey // orgId -> keyName -> metadata
}
// NewMockKeyEvolutionMetadataStore creates a new instance of MockKeyEvolutionMetadataStore with initialized maps
func NewMockKeyEvolutionMetadataStore(metadata map[string]map[string][]*telemetrytypes.KeyEvolutionMetadataKey) *MockKeyEvolutionMetadataStore {
return &MockKeyEvolutionMetadataStore{
metadata: metadata,
}
}
// Get retrieves all metadata keys for the given key name and orgId.
// Returns an empty slice if the key is not found.
func (m *MockKeyEvolutionMetadataStore) Get(ctx context.Context, orgId valuer.UUID, keyName string) []*telemetrytypes.KeyEvolutionMetadataKey {
if m.metadata == nil {
return nil
}
orgMetadata, orgExists := m.metadata[orgId.String()]
if !orgExists {
return nil
}
keys, exists := orgMetadata[keyName]
if !exists {
return nil
}
// Return a copy to prevent external modification
result := make([]*telemetrytypes.KeyEvolutionMetadataKey, len(keys))
copy(result, keys)
return result
}