diff --git a/conf/example.yaml b/conf/example.yaml
index 225019eb01..0aba8b8773 100644
--- a/conf/example.yaml
+++ b/conf/example.yaml
@@ -47,10 +47,10 @@ cache:
   provider: memory
   # memory: Uses in-memory caching.
   memory:
-    # Time-to-live for cache entries in memory. Specify the duration in ns
-    ttl: 60000000000
-    # The interval at which the cache will be cleaned up
-    cleanup_interval: 1m
+    # Number of keys to track frequency of (roughly 10x the expected number of entries)
+    num_counters: 100000
+    # Total cost, in bytes, allocated to the bounded cache
+    max_cost: 67108864
   # redis: Uses Redis as the caching backend.
   redis:
     # The hostname or IP address of the Redis server.
diff --git a/go.mod b/go.mod
index 322eb39e17..4e1f65ed8e 100644
--- a/go.mod
+++ b/go.mod
@@ -37,7 +37,6 @@ require (
 	github.com/openfga/api/proto v0.0.0-20250909172242-b4b2a12f5c67
 	github.com/openfga/language/pkg/go v0.2.0-beta.2.0.20250428093642-7aeebe78bbfe
 	github.com/opentracing/opentracing-go v1.2.0
-	github.com/patrickmn/go-cache v2.1.0+incompatible
 	github.com/pkg/errors v0.9.1
 	github.com/prometheus/alertmanager v0.28.1
 	github.com/prometheus/client_golang v1.23.2
diff --git a/go.sum b/go.sum
index a4198ab1e6..740cca558f 100644
--- a/go.sum
+++ b/go.sum
@@ -786,8 +786,6 @@ github.com/ovh/go-ovh v1.7.0/go.mod h1:cTVDnl94z4tl8pP1uZ/8jlVxntjSIf09bNcQ5TJSC
 github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
 github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY=
 github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
-github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
-github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
 github.com/paulmach/orb v0.11.1 h1:3koVegMC4X/WeiXYz9iswopaTwMem53NzTJuTF20JzU=
 github.com/paulmach/orb v0.11.1/go.mod h1:5mULz1xQfs3bmQm63QEJA6lNGujuRafwA5S/EnuLaLU=
 github.com/paulmach/protoscan v0.2.1/go.mod h1:SpcSwydNLrxUGSDvXvO0P7g7AuhJ7lcKfDlhJCDw2gY=
diff --git a/pkg/cache/config.go b/pkg/cache/config.go
index fc558a5611..1c9a044b09 100644
--- a/pkg/cache/config.go
+++ b/pkg/cache/config.go
@@ -1,14 +1,12 @@
 package cache
 
 import (
-	"time"
-
 	"github.com/SigNoz/signoz/pkg/factory"
 )
 
 type Memory struct {
-	TTL             time.Duration `mapstructure:"ttl"`
-	CleanupInterval time.Duration `mapstructure:"cleanup_interval"`
+	NumCounters int64 `mapstructure:"num_counters"`
+	MaxCost     int64 `mapstructure:"max_cost"`
 }
 
 type Redis struct {
@@ -32,8 +30,8 @@ func newConfig() factory.Config {
 	return &Config{
 		Provider: "memory",
 		Memory: Memory{
-			TTL:             time.Hour * 168,
-			CleanupInterval: 10 * time.Minute,
+			NumCounters: 10 * 10000, // 10k cache entries * 10x, as recommended by ristretto
+			MaxCost:     1 << 27,    // 128 MB
 		},
 		Redis: Redis{
 			Host: "localhost",
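For orientation, a minimal sketch (not part of the change) of how the two new knobs map onto a ristretto cache: `num_counters` sizes the admission/frequency counters at roughly 10x the expected number of live entries, while `max_cost` bounds the total cost the cache may hold (bytes, in this configuration). The key and value below are illustrative.

```go
package main

import (
	"fmt"
	"time"

	"github.com/dgraph-io/ristretto/v2"
)

func main() {
	cache, err := ristretto.NewCache(&ristretto.Config[string, []byte]{
		NumCounters: 100000,   // matches conf/example.yaml num_counters (~10x expected entries)
		MaxCost:     67108864, // matches conf/example.yaml max_cost (64 MiB)
		BufferItems: 64,
	})
	if err != nil {
		panic(err)
	}

	value := []byte("hello")
	// The cost passed here is charged against MaxCost; writes are applied
	// asynchronously, so Wait() is needed before the value is visible to Get.
	cache.SetWithTTL("org::key", value, int64(len(value)), time.Minute)
	cache.Wait()

	if v, ok := cache.Get("org::key"); ok {
		fmt.Println(string(v))
	}
}
```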
diff --git a/pkg/cache/memorycache/provider.go b/pkg/cache/memorycache/provider.go
index cf2eca69fc..b5338cf9a0 100644
--- a/pkg/cache/memorycache/provider.go
+++ b/pkg/cache/memorycache/provider.go
@@ -11,14 +11,15 @@ import (
 	"github.com/SigNoz/signoz/pkg/factory"
 	"github.com/SigNoz/signoz/pkg/types/cachetypes"
 	"github.com/SigNoz/signoz/pkg/valuer"
-	gocache "github.com/patrickmn/go-cache"
+	"github.com/dgraph-io/ristretto/v2"
 	semconv "go.opentelemetry.io/collector/semconv/v1.6.1"
 	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/metric"
 	"go.opentelemetry.io/otel/trace"
 )
 
 type provider struct {
-	cc       *gocache.Cache
+	cc       *ristretto.Cache[string, any]
 	config   cache.Config
 	settings factory.ScopedProviderSettings
 }
@@ -30,8 +31,62 @@ func NewFactory() factory.ProviderFactory[cache.Cache, cache.Config] {
 
 func New(ctx context.Context, settings factory.ProviderSettings, config cache.Config) (cache.Cache, error) {
 	scopedProviderSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/cache/memorycache")
+	cc, err := ristretto.NewCache(&ristretto.Config[string, any]{
+		NumCounters: config.Memory.NumCounters,
+		MaxCost:     config.Memory.MaxCost,
+		BufferItems: 64,
+		Metrics:     true,
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	meter := scopedProviderSettings.Meter()
+	telemetry, err := newMetrics(meter)
+	if err != nil {
+		return nil, err
+	}
+
+	_, err = meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error {
+		metrics := cc.Metrics
+		attributes := []attribute.KeyValue{
+			attribute.String("provider", "memorycache"),
+		}
+		o.ObserveFloat64(telemetry.cacheRatio, metrics.Ratio(), metric.WithAttributes(attributes...))
+		o.ObserveInt64(telemetry.cacheHits, int64(metrics.Hits()), metric.WithAttributes(attributes...))
+		o.ObserveInt64(telemetry.cacheMisses, int64(metrics.Misses()), metric.WithAttributes(attributes...))
+		o.ObserveInt64(telemetry.costAdded, int64(metrics.CostAdded()), metric.WithAttributes(attributes...))
+		o.ObserveInt64(telemetry.costEvicted, int64(metrics.CostEvicted()), metric.WithAttributes(attributes...))
+		o.ObserveInt64(telemetry.keysAdded, int64(metrics.KeysAdded()), metric.WithAttributes(attributes...))
+		o.ObserveInt64(telemetry.keysEvicted, int64(metrics.KeysEvicted()), metric.WithAttributes(attributes...))
+		o.ObserveInt64(telemetry.keysUpdated, int64(metrics.KeysUpdated()), metric.WithAttributes(attributes...))
+		o.ObserveInt64(telemetry.setsDropped, int64(metrics.SetsDropped()), metric.WithAttributes(attributes...))
+		o.ObserveInt64(telemetry.setsRejected, int64(metrics.SetsRejected()), metric.WithAttributes(attributes...))
+		o.ObserveInt64(telemetry.getsDropped, int64(metrics.GetsDropped()), metric.WithAttributes(attributes...))
+		o.ObserveInt64(telemetry.getsKept, int64(metrics.GetsKept()), metric.WithAttributes(attributes...))
+		o.ObserveInt64(telemetry.totalCost, int64(cc.MaxCost()), metric.WithAttributes(attributes...))
+		return nil
+	},
+		telemetry.cacheRatio,
+		telemetry.cacheHits,
+		telemetry.cacheMisses,
+		telemetry.costAdded,
+		telemetry.costEvicted,
+		telemetry.keysAdded,
+		telemetry.keysEvicted,
+		telemetry.keysUpdated,
+		telemetry.setsDropped,
+		telemetry.setsRejected,
+		telemetry.getsDropped,
+		telemetry.getsKept,
+		telemetry.totalCost,
+	)
+	if err != nil {
+		return nil, err
+	}
+
 	return &provider{
-		cc:       gocache.New(config.Memory.TTL, config.Memory.CleanupInterval),
+		cc:       cc,
 		settings: scopedProviderSettings,
 		config:   config,
 	}, nil
@@ -51,19 +106,32 @@ func (provider *provider) Set(ctx context.Context, orgID valuer.UUID, cacheKey s
 	}
 
 	if cloneable, ok := data.(cachetypes.Cloneable); ok {
-		span.SetAttributes(attribute.Bool("db.cloneable", true))
+		span.SetAttributes(attribute.Bool("memory.cloneable", true))
+		span.SetAttributes(attribute.Int64("memory.cost", 1))
 		toCache := cloneable.Clone()
-		provider.cc.Set(strings.Join([]string{orgID.StringValue(), cacheKey}, "::"), toCache, ttl)
+		// Under contention we prefer to evict cloneable entries first, hence the cost is pinned to 1.
+		if ok := provider.cc.SetWithTTL(strings.Join([]string{orgID.StringValue(), cacheKey}, "::"), toCache, 1, ttl); !ok {
+			return errors.New(errors.TypeInternal, errors.CodeInternal, "error writing to cache")
+		}
+
+		provider.cc.Wait()
 		return nil
 	}
 
-	span.SetAttributes(attribute.Bool("db.cloneable", false))
 	toCache, err := data.MarshalBinary()
 	if err != nil {
 		return err
 	}
+	cost := int64(len(toCache))
 
-	provider.cc.Set(strings.Join([]string{orgID.StringValue(), cacheKey}, "::"), toCache, ttl)
+	span.SetAttributes(attribute.Bool("memory.cloneable", false))
+	span.SetAttributes(attribute.Int64("memory.cost", cost))
+
+	if ok := provider.cc.SetWithTTL(strings.Join([]string{orgID.StringValue(), cacheKey}, "::"), toCache, cost, ttl); !ok {
+		return errors.New(errors.TypeInternal, errors.CodeInternal, "error writing to cache")
+	}
+
+	provider.cc.Wait()
 	return nil
 }
@@ -86,7 +154,7 @@ func (provider *provider) Get(ctx context.Context, orgID valuer.UUID, cacheKey s
 	}
 
 	if cloneable, ok := cachedData.(cachetypes.Cloneable); ok {
-		span.SetAttributes(attribute.Bool("db.cloneable", true))
+		span.SetAttributes(attribute.Bool("memory.cloneable", true))
 		// check if the destination value is settable
 		dstv := reflect.ValueOf(dest)
 		if !dstv.Elem().CanSet() {
@@ -107,7 +175,7 @@ func (provider *provider) Get(ctx context.Context, orgID valuer.UUID, cacheKey s
 	}
 
 	if fromCache, ok := cachedData.([]byte); ok {
-		span.SetAttributes(attribute.Bool("db.cloneable", false))
+		span.SetAttributes(attribute.Bool("memory.cloneable", false))
 		if err = dest.UnmarshalBinary(fromCache); err != nil {
 			return err
 		}
@@ -126,11 +194,11 @@ func (provider *provider) Delete(ctx context.Context, orgID valuer.UUID, cacheKe
 	))
 	defer span.End()
 
-	provider.cc.Delete(strings.Join([]string{orgID.StringValue(), cacheKey}, "::"))
+	provider.cc.Del(strings.Join([]string{orgID.StringValue(), cacheKey}, "::"))
 }
 
 func (provider *provider) DeleteMany(_ context.Context, orgID valuer.UUID, cacheKeys []string) {
 	for _, cacheKey := range cacheKeys {
-		provider.cc.Delete(strings.Join([]string{orgID.StringValue(), cacheKey}, "::"))
+		provider.cc.Del(strings.Join([]string{orgID.StringValue(), cacheKey}, "::"))
 	}
 }
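A rough sketch (not part of the change) of the non-cloneable Set path the provider now follows: the value is marshalled to bytes, charged its encoded size against `max_cost`, and `Wait()` is called because ristretto applies writes asynchronously. The payload type and keys here are illustrative.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"

	"github.com/dgraph-io/ristretto/v2"
)

// A value stored through the binary path: it is marshalled and charged its encoded size.
type seriesPayload struct {
	Name   string    `json:"name"`
	Points []float64 `json:"points"`
}

func (p *seriesPayload) MarshalBinary() ([]byte, error) { return json.Marshal(p) }
func (p *seriesPayload) UnmarshalBinary(b []byte) error { return json.Unmarshal(b, p) }

func main() {
	cc, err := ristretto.NewCache(&ristretto.Config[string, any]{
		NumCounters: 10 * 1000,
		MaxCost:     1 << 26,
		BufferItems: 64,
	})
	if err != nil {
		panic(err)
	}

	payload := &seriesPayload{Name: "signoz_calls_total", Points: []float64{1, 2, 3}}
	encoded, _ := payload.MarshalBinary()

	// Mirror of the provider's binary path: cost = len(encoded) bytes, and
	// SetWithTTL may return false if the write is dropped.
	if ok := cc.SetWithTTL("org::query::1", encoded, int64(len(encoded)), 5*time.Minute); !ok {
		fmt.Println("write dropped")
	}
	// Writes are applied asynchronously; Wait() makes them visible to Get.
	cc.Wait()

	if cached, found := cc.Get("org::query::1"); found {
		restored := &seriesPayload{}
		_ = restored.UnmarshalBinary(cached.([]byte))
		fmt.Printf("%+v (cost %d bytes)\n", restored, len(encoded))
	}
}
```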
diff --git a/pkg/cache/memorycache/provider_test.go b/pkg/cache/memorycache/provider_test.go
index 74de8c8847..0b2c988f4a 100644
--- a/pkg/cache/memorycache/provider_test.go
+++ b/pkg/cache/memorycache/provider_test.go
@@ -55,8 +55,8 @@ func (cacheable *CacheableB) UnmarshalBinary(data []byte) error {
 
 func TestCloneableSetWithNilPointer(t *testing.T) {
 	cache, err := New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: cache.Memory{
-		TTL:             10 * time.Second,
-		CleanupInterval: 10 * time.Second,
+		NumCounters: 10 * 1000,
+		MaxCost:     1 << 26,
 	}})
 	require.NoError(t, err)
@@ -66,8 +66,8 @@ func TestCacheableSetWithNilPointer(t *testing.T) {
 	cache, err := New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: cache.Memory{
-		TTL:             10 * time.Second,
-		CleanupInterval: 10 * time.Second,
+		NumCounters: 10 * 1000,
+		MaxCost:     1 << 26,
 	}})
 	require.NoError(t, err)
@@ -77,8 +77,8 @@ func TestCloneableSetGet(t *testing.T) {
 	cache, err := New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: cache.Memory{
-		TTL:             10 * time.Second,
-		CleanupInterval: 10 * time.Second,
+		NumCounters: 10 * 1000,
+		MaxCost:     1 << 26,
 	}})
 	require.NoError(t, err)
@@ -106,8 +106,8 @@ func TestCacheableSetGet(t *testing.T) {
 	cache, err := New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: cache.Memory{
-		TTL:             10 * time.Second,
-		CleanupInterval: 10 * time.Second,
+		NumCounters: 10 * 1000,
+		MaxCost:     1 << 26,
 	}})
 	require.NoError(t, err)
@@ -135,8 +135,8 @@ func TestGetWithNilPointer(t *testing.T) {
 	cache, err := New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: cache.Memory{
-		TTL:             10 * time.Second,
-		CleanupInterval: 10 * time.Second,
+		NumCounters: 10 * 1000,
+		MaxCost:     1 << 26,
 	}})
 	require.NoError(t, err)
@@ -146,8 +146,8 @@ func TestSetGetWithDifferentTypes(t *testing.T) {
 	cache, err := New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: cache.Memory{
-		TTL:             10 * time.Second,
-		CleanupInterval: 10 * time.Second,
+		NumCounters: 10 * 1000,
+		MaxCost:     1 << 26,
 	}})
 	require.NoError(t, err)
@@ -167,8 +167,8 @@ func TestCloneableConcurrentSetGet(t *testing.T) {
 	cache, err := New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: cache.Memory{
-		TTL:             10 * time.Second,
-		CleanupInterval: 10 * time.Second,
+		NumCounters: 10 * 1000,
+		MaxCost:     1 << 26,
 	}})
 	require.NoError(t, err)
diff --git a/pkg/cache/memorycache/telemetry.go b/pkg/cache/memorycache/telemetry.go
new file mode 100644
index 0000000000..811acf3828
--- /dev/null
+++ b/pkg/cache/memorycache/telemetry.go
@@ -0,0 +1,110 @@
+package memorycache
+
+import (
+	"github.com/SigNoz/signoz/pkg/errors"
+	"go.opentelemetry.io/otel/metric"
+)
+
+type telemetry struct {
+	cacheRatio   metric.Float64ObservableGauge
+	cacheHits    metric.Int64ObservableGauge
+	cacheMisses  metric.Int64ObservableGauge
+	costAdded    metric.Int64ObservableGauge
+	costEvicted  metric.Int64ObservableGauge
+	keysAdded    metric.Int64ObservableGauge
+	keysEvicted  metric.Int64ObservableGauge
+	keysUpdated  metric.Int64ObservableGauge
+	setsDropped  metric.Int64ObservableGauge
+	setsRejected metric.Int64ObservableGauge
+	getsDropped  metric.Int64ObservableGauge
+	getsKept     metric.Int64ObservableGauge
+	totalCost    metric.Int64ObservableGauge
+}
+
+func newMetrics(meter metric.Meter) (*telemetry, error) {
+	var errs error
+	cacheRatio, err := meter.Float64ObservableGauge("signoz.cache.ratio", metric.WithDescription("Ratio is the number of Hits over all accesses (Hits + Misses). This is the percentage of successful Get calls."), metric.WithUnit("1"))
+	if err != nil {
+		errs = errors.Join(errs, err)
+	}
+
+	cacheHits, err := meter.Int64ObservableGauge("signoz.cache.hits", metric.WithDescription("Hits is the number of Get calls where a value was found for the corresponding key."))
+	if err != nil {
+		errs = errors.Join(errs, err)
+	}
+
+	cacheMisses, err := meter.Int64ObservableGauge("signoz.cache.misses", metric.WithDescription("Misses is the number of Get calls where a value was not found for the corresponding key"))
+	if err != nil {
+		errs = errors.Join(errs, err)
+	}
+
+	costAdded, err := meter.Int64ObservableGauge("signoz.cache.cost.added", metric.WithDescription("CostAdded is the sum of costs that have been added (successful Set calls)"))
+	if err != nil {
+		errs = errors.Join(errs, err)
+	}
+
+	costEvicted, err := meter.Int64ObservableGauge("signoz.cache.cost.evicted", metric.WithDescription("CostEvicted is the sum of all costs that have been evicted"))
+	if err != nil {
+		errs = errors.Join(errs, err)
+	}
+
+	keysAdded, err := meter.Int64ObservableGauge("signoz.cache.keys.added", metric.WithDescription("KeysAdded is the total number of Set calls where a new key-value item was added"))
+	if err != nil {
+		errs = errors.Join(errs, err)
+	}
+
+	keysEvicted, err := meter.Int64ObservableGauge("signoz.cache.keys.evicted", metric.WithDescription("KeysEvicted is the total number of keys evicted"))
+	if err != nil {
+		errs = errors.Join(errs, err)
+	}
+
+	keysUpdated, err := meter.Int64ObservableGauge("signoz.cache.keys.updated", metric.WithDescription("KeysUpdated is the total number of Set calls where the value was updated"))
+	if err != nil {
+		errs = errors.Join(errs, err)
+	}
+
+	setsDropped, err := meter.Int64ObservableGauge("signoz.cache.sets.dropped", metric.WithDescription("SetsDropped is the number of Set calls that don't make it into internal buffers (due to contention or some other reason)"))
+	if err != nil {
+		errs = errors.Join(errs, err)
+	}
+
+	setsRejected, err := meter.Int64ObservableGauge("signoz.cache.sets.rejected", metric.WithDescription("SetsRejected is the number of Set calls rejected by the policy (TinyLFU)"))
+	if err != nil {
+		errs = errors.Join(errs, err)
+	}
+
+	getsDropped, err := meter.Int64ObservableGauge("signoz.cache.gets.dropped", metric.WithDescription("GetsDropped is the number of Get calls that don't make it into internal buffers (due to contention or some other reason)"))
+	if err != nil {
+		errs = errors.Join(errs, err)
+	}
+
+	getsKept, err := meter.Int64ObservableGauge("signoz.cache.gets.kept", metric.WithDescription("GetsKept is the number of Get calls that make it into internal buffers"))
+	if err != nil {
+		errs = errors.Join(errs, err)
+	}
+
+	totalCost, err := meter.Int64ObservableGauge("signoz.cache.total.cost", metric.WithDescription("TotalCost is the available cost configured for the cache"))
+	if err != nil {
+		errs = errors.Join(errs, err)
+	}
+
+	if errs != nil {
+		return nil, errs
+	}
+
+	return &telemetry{
+		cacheRatio:   cacheRatio,
+		cacheHits:    cacheHits,
+		cacheMisses:  cacheMisses,
+		costAdded:    costAdded,
+		costEvicted:  costEvicted,
+		keysAdded:    keysAdded,
+		keysEvicted:  keysEvicted,
+		keysUpdated:  keysUpdated,
+		setsDropped:  setsDropped,
+		setsRejected: setsRejected,
+		getsDropped:  getsDropped,
+		getsKept:     getsKept,
+		totalCost:    totalCost,
+	}, nil
+}
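To sanity-check that these observable gauges report values, one could wire a meter to an in-memory reader and collect on demand. This is a verification sketch only, not part of the change; it registers a single gauge the same way newMetrics does and observes a dummy value.

```go
package main

import (
	"context"
	"fmt"

	"go.opentelemetry.io/otel/metric"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

func main() {
	ctx := context.Background()

	// A manual reader lets us collect the observable gauges on demand.
	reader := sdkmetric.NewManualReader()
	provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
	meter := provider.Meter("github.com/SigNoz/signoz/pkg/cache/memorycache")

	hits, _ := meter.Int64ObservableGauge("signoz.cache.hits")
	_, _ = meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error {
		o.ObserveInt64(hits, 42) // dummy value standing in for cc.Metrics.Hits()
		return nil
	}, hits)

	var rm metricdata.ResourceMetrics
	_ = reader.Collect(ctx, &rm)
	for _, sm := range rm.ScopeMetrics {
		for _, m := range sm.Metrics {
			fmt.Println(m.Name)
		}
	}
}
```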
diff --git a/pkg/querier/bucket_cache_bench_test.go b/pkg/querier/bucket_cache_bench_test.go
index 8be802d841..2070b9b452 100644
--- a/pkg/querier/bucket_cache_bench_test.go
+++ b/pkg/querier/bucket_cache_bench_test.go
@@ -339,8 +339,8 @@ func createBenchmarkBucketCache(tb testing.TB) BucketCache {
 	config := cache.Config{
 		Provider: "memory",
 		Memory: cache.Memory{
-			TTL:             time.Hour * 168,
-			CleanupInterval: 10 * time.Minute,
+			NumCounters: 10 * 1000,
+			MaxCost:     1 << 26,
 		},
 	}
 	memCache, err := cachetest.New(config)
diff --git a/pkg/querier/bucket_cache_test.go b/pkg/querier/bucket_cache_test.go
index c52fdfb2e4..e86ba96eb8 100644
--- a/pkg/querier/bucket_cache_test.go
+++ b/pkg/querier/bucket_cache_test.go
@@ -26,8 +26,8 @@ func createTestCache(t *testing.T) cache.Cache {
 	config := cache.Config{
 		Provider: "memory",
 		Memory: cache.Memory{
-			TTL:             time.Hour * 168,
-			CleanupInterval: 10 * time.Minute,
+			NumCounters: 10 * 1000,
+			MaxCost:     1 << 26,
 		},
 	}
 	memCache, err := cachetest.New(config)
diff --git a/pkg/query-service/app/querier/querier_test.go b/pkg/query-service/app/querier/querier_test.go
index c86a0ece41..f3f1dd07e6 100644
--- a/pkg/query-service/app/querier/querier_test.go
+++ b/pkg/query-service/app/querier/querier_test.go
@@ -238,8 +238,8 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) {
 	}
 
 	opts := cache.Memory{
-		TTL:             5 * time.Minute,
-		CleanupInterval: 10 * time.Minute,
+		NumCounters: 10 * 1000,
+		MaxCost:     1 << 26,
 	}
 	c, err := cachetest.New(cache.Config{Provider: "memory", Memory: opts})
 	require.NoError(t, err)
@@ -458,8 +458,8 @@ func TestFindMissingTimeRangesWithFluxInterval(t *testing.T) {
 	}
 
 	opts := cache.Memory{
-		TTL:             5 * time.Minute,
-		CleanupInterval: 10 * time.Minute,
+		NumCounters: 10 * 1000,
+		MaxCost:     1 << 26,
 	}
 	c, err := cachetest.New(cache.Config{Provider: "memory", Memory: opts})
 	require.NoError(t, err)
@@ -631,8 +631,8 @@ func TestQueryRange(t *testing.T) {
 		},
 	}
 	cacheOpts := cache.Memory{
-		TTL:             5 * time.Minute,
-		CleanupInterval: 10 * time.Minute,
+		NumCounters: 10 * 1000,
+		MaxCost:     1 << 26,
 	}
 	c, err := cachetest.New(cache.Config{Provider: "memory", Memory: cacheOpts})
 	require.NoError(t, err)
@@ -748,8 +748,8 @@ func TestQueryRangeValueType(t *testing.T) {
 		},
 	}
 	cacheOpts := cache.Memory{
-		TTL:             5 * time.Minute,
-		CleanupInterval: 10 * time.Minute,
+		NumCounters: 10 * 1000,
+		MaxCost:     1 << 26,
 	}
 	c, err := cachetest.New(cache.Config{Provider: "memory", Memory: cacheOpts})
 	require.NoError(t, err)
@@ -911,8 +911,8 @@ func TestQueryRangeTimeShiftWithCache(t *testing.T) {
 		},
 	}
 	cacheOpts := cache.Memory{
-		TTL:             5 * time.Minute,
-		CleanupInterval: 10 * time.Minute,
+		NumCounters: 10 * 1000,
+		MaxCost:     1 << 26,
 	}
 	c, err := cachetest.New(cache.Config{Provider: "memory", Memory: cacheOpts})
 	require.NoError(t, err)
@@ -1017,8 +1017,8 @@ func TestQueryRangeTimeShiftWithLimitAndCache(t *testing.T) {
 		},
 	}
 	cacheOpts := cache.Memory{
-		TTL:             5 * time.Minute,
-		CleanupInterval: 10 * time.Minute,
+		NumCounters: 10 * 1000,
+		MaxCost:     1 << 26,
 	}
 	c, err := cachetest.New(cache.Config{Provider: "memory", Memory: cacheOpts})
 	require.NoError(t, err)
@@ -1094,8 +1094,8 @@ func TestQueryRangeValueTypePromQL(t *testing.T) {
 		},
 	}
 	cacheOpts := cache.Memory{
-		TTL:             5 * time.Minute,
-		CleanupInterval: 10 * time.Minute,
+		NumCounters: 10 * 1000,
+		MaxCost:     1 << 26,
 	}
 	c, err := cachetest.New(cache.Config{Provider: "memory", Memory: cacheOpts})
 	require.NoError(t, err)
diff --git a/pkg/query-service/app/querier/v2/querier_test.go b/pkg/query-service/app/querier/v2/querier_test.go
index c0cc7670bc..f40ac06183 100644
--- a/pkg/query-service/app/querier/v2/querier_test.go
+++ b/pkg/query-service/app/querier/v2/querier_test.go
@@ -238,8 +238,8 @@ func TestV2FindMissingTimeRangesZeroFreshNess(t *testing.T) {
 	}
 
 	opts := cache.Memory{
-		TTL:             5 * time.Minute,
-		CleanupInterval: 10 * time.Minute,
+		NumCounters: 10 * 1000,
+		MaxCost:     1 << 26,
 	}
 	c, err := cachetest.New(cache.Config{Provider: "memory", Memory: opts})
 	require.NoError(t, err)
@@ -458,8 +458,8 @@ func TestV2FindMissingTimeRangesWithFluxInterval(t *testing.T) {
 	}
 
 	opts := cache.Memory{
-		TTL:             5 * time.Minute,
-		CleanupInterval: 10 * time.Minute,
+		NumCounters: 10 * 1000,
+		MaxCost:     1 << 26,
 	}
 	c, err := cachetest.New(cache.Config{Provider: "memory", Memory: opts})
 	require.NoError(t, err)
@@ -638,8 +638,8 @@ func TestV2QueryRangePanelGraph(t *testing.T) {
 		},
 	}
 	cacheOpts := cache.Memory{
-		TTL:             5 * time.Minute,
-		CleanupInterval: 10 * time.Minute,
+		NumCounters: 10 * 1000,
+		MaxCost:     1 << 26,
 	}
 	c, err := cachetest.New(cache.Config{Provider: "memory", Memory: cacheOpts})
 	require.NoError(t, err)
@@ -793,8 +793,8 @@ func TestV2QueryRangeValueType(t *testing.T) {
 		},
 	}
 	cacheOpts := cache.Memory{
-		TTL:             5 * time.Minute,
-		CleanupInterval: 10 * time.Minute,
+		NumCounters: 10 * 1000,
+		MaxCost:     1 << 26,
 	}
 	c, err := cachetest.New(cache.Config{Provider: "memory", Memory: cacheOpts})
 	require.NoError(t, err)
@@ -960,8 +960,8 @@ func TestV2QueryRangeTimeShiftWithCache(t *testing.T) {
 		},
 	}
 	cacheOpts := cache.Memory{
-		TTL:             5 * time.Minute,
-		CleanupInterval: 10 * time.Minute,
+		NumCounters: 10 * 1000,
+		MaxCost:     1 << 26,
 	}
 	c, err := cachetest.New(cache.Config{Provider: "memory", Memory: cacheOpts})
 	require.NoError(t, err)
@@ -1068,8 +1068,8 @@ func TestV2QueryRangeTimeShiftWithLimitAndCache(t *testing.T) {
 		},
 	}
 	cacheOpts := cache.Memory{
-		TTL:             5 * time.Minute,
-		CleanupInterval: 10 * time.Minute,
+		NumCounters: 10 * 1000,
+		MaxCost:     1 << 26,
 	}
 	c, err := cachetest.New(cache.Config{Provider: "memory", Memory: cacheOpts})
 	require.NoError(t, err)
@@ -1147,8 +1147,8 @@ func TestV2QueryRangeValueTypePromQL(t *testing.T) {
 		},
 	}
 	cacheOpts := cache.Memory{
-		TTL:             5 * time.Minute,
-		CleanupInterval: 10 * time.Minute,
+		NumCounters: 10 * 1000,
+		MaxCost:     1 << 26,
 	}
 	c, err := cachetest.New(cache.Config{Provider: "memory", Memory: cacheOpts})
 	require.NoError(t, err)
diff --git a/pkg/query-service/querycache/query_range_cache_test.go b/pkg/query-service/querycache/query_range_cache_test.go
index 0b15fd6d68..c9decc000e 100644
--- a/pkg/query-service/querycache/query_range_cache_test.go
+++ b/pkg/query-service/querycache/query_range_cache_test.go
@@ -3,7 +3,6 @@ package querycache_test
 import (
 	"context"
 	"testing"
-	"time"
 
 	"github.com/SigNoz/signoz/pkg/cache"
 	"github.com/SigNoz/signoz/pkg/cache/cachetest"
@@ -17,8 +16,8 @@ import (
 func TestFindMissingTimeRanges(t *testing.T) {
 	// Initialize the mock cache
 	opts := cache.Memory{
-		TTL:             5 * time.Minute,
-		CleanupInterval: 10 * time.Minute,
+		NumCounters: 10 * 1000,
+		MaxCost:     1 << 26,
 	}
 	c, err := cachetest.New(cache.Config{Provider: "memory", Memory: opts})
 	require.NoError(t, err)
@@ -243,8 +242,8 @@ func TestFindMissingTimeRanges(t *testing.T) {
 func TestFindMissingTimeRangesV2(t *testing.T) {
 	// Initialize the mock cache
 	opts := cache.Memory{
-		TTL:             5 * time.Minute,
-		CleanupInterval: 10 * time.Minute,
+		NumCounters: 10 * 1000,
+		MaxCost:     1 << 26,
 	}
 	c, err := cachetest.New(cache.Config{Provider: "memory", Memory: opts})
 	require.NoError(t, err)
@@ -590,8 +589,8 @@ func TestFindMissingTimeRangesV2(t *testing.T) {
 func TestMergeWithCachedSeriesData(t *testing.T) {
 	// Initialize the mock cache
 	opts := cache.Memory{
-		TTL:             5 * time.Minute,
-		CleanupInterval: 10 * time.Minute,
+		NumCounters: 10 * 1000,
+		MaxCost:     1 << 26,
 	}
 	c, err := cachetest.New(cache.Config{Provider: "memory", Memory: opts})
 	require.NoError(t, err)
diff --git a/pkg/query-service/rules/threshold_rule_test.go b/pkg/query-service/rules/threshold_rule_test.go
index c0689a312d..db23c4ed7d 100644
--- a/pkg/query-service/rules/threshold_rule_test.go
+++ b/pkg/query-service/rules/threshold_rule_test.go
@@ -774,7 +774,15 @@ func TestThresholdRuleUnitCombinations(t *testing.T) {
 	}
 
 	options := clickhouseReader.NewOptions("", "", "archiveNamespace")
-	readerCache, err := cachetest.New(cache.Config{Provider: "memory", Memory: cache.Memory{TTL: DefaultFrequency}})
+	readerCache, err := cachetest.New(
+		cache.Config{
+			Provider: "memory",
+			Memory: cache.Memory{
+				NumCounters: 10 * 1000,
+				MaxCost:     1 << 26,
+			},
+		},
+	)
 	require.NoError(t, err)
 	reader := clickhouseReader.NewReaderFromClickhouseConnection(options, nil, telemetryStore, prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), "", time.Duration(time.Second), readerCache)
 	rule, err := NewThresholdRule("69", valuer.GenerateUUID(), &postableRule, reader, nil, logger)
@@ -880,7 +888,15 @@ func TestThresholdRuleNoData(t *testing.T) {
 		"description": "This alert is fired when the defined metric (current value: {{$value}}) crosses the threshold ({{$threshold}})",
 		"summary":     "The rule threshold is set to {{$threshold}}, and the observed metric value is {{$value}}",
 	}
-	readerCache, err := cachetest.New(cache.Config{Provider: "memory", Memory: cache.Memory{TTL: DefaultFrequency}})
+	readerCache, err := cachetest.New(
+		cache.Config{
+			Provider: "memory",
+			Memory: cache.Memory{
+				NumCounters: 10 * 1000,
+				MaxCost:     1 << 26,
+			},
+		},
+	)
 	assert.NoError(t, err)
 	options := clickhouseReader.NewOptions("", "", "archiveNamespace")
 	reader := clickhouseReader.NewReaderFromClickhouseConnection(options, nil, telemetryStore, prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), "", time.Duration(time.Second), readerCache)
@@ -1397,7 +1413,15 @@ func TestMultipleThresholdRule(t *testing.T) {
 	}
 
 	options := clickhouseReader.NewOptions("", "", "archiveNamespace")
-	readerCache, err := cachetest.New(cache.Config{Provider: "memory", Memory: cache.Memory{TTL: DefaultFrequency}})
+	readerCache, err := cachetest.New(
+		cache.Config{
+			Provider: "memory",
+			Memory: cache.Memory{
+				NumCounters: 10 * 1000,
+				MaxCost:     1 << 26,
+			},
+		},
+	)
 	require.NoError(t, err)
 	reader := clickhouseReader.NewReaderFromClickhouseConnection(options, nil, telemetryStore, prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), "", time.Duration(time.Second), readerCache)
 	rule, err := NewThresholdRule("69", valuer.GenerateUUID(), &postableRule, reader, nil, logger)
diff --git a/pkg/signoz/provider.go b/pkg/signoz/provider.go
index 279009d3d3..2ff973812e 100644
--- a/pkg/signoz/provider.go
+++ b/pkg/signoz/provider.go
@@ -155,6 +155,9 @@ func NewTelemetryStoreProviderFactories() factory.NamedMap[factory.ProviderFacto
 		telemetrystore.TelemetryStoreHookFactoryFunc(func(s string) factory.ProviderFactory[telemetrystore.TelemetryStoreHook, telemetrystore.Config] {
 			return telemetrystorehook.NewLoggingFactory()
 		}),
+		telemetrystore.TelemetryStoreHookFactoryFunc(func(s string) factory.ProviderFactory[telemetrystore.TelemetryStoreHook, telemetrystore.Config] {
+			return telemetrystorehook.NewInstrumentationFactory(s)
+		}),
 	),
 )
 }
diff --git a/pkg/telemetrystore/event.go b/pkg/telemetrystore/event.go
index 59d649b392..72525ba66d 100644
--- a/pkg/telemetrystore/event.go
+++ b/pkg/telemetrystore/event.go
@@ -1,13 +1,16 @@
 package telemetrystore
 
 import (
+	"strings"
 	"time"
+	"unicode"
 )
 
 type QueryEvent struct {
 	Query     string
 	QueryArgs []any
 	StartTime time.Time
+	Operation string
 	Err       error
 }
 
@@ -16,5 +19,18 @@ func NewQueryEvent(query string, args []any) *QueryEvent {
 		Query:     query,
 		QueryArgs: args,
 		StartTime: time.Now(),
+		Operation: queryOperation(query),
 	}
 }
+
+func queryOperation(query string) string {
+	queryOp := strings.TrimLeftFunc(query, unicode.IsSpace)
+
+	if idx := strings.IndexByte(queryOp, ' '); idx > 0 {
+		queryOp = queryOp[:idx]
+	}
+	if len(queryOp) > 16 {
+		queryOp = queryOp[:16]
+	}
+	return queryOp
+}
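The new Operation field is derived by queryOperation, which takes the first space-delimited token of the query, after trimming leading whitespace, and caps it at 16 characters. A quick standalone sketch of that behaviour (the sample queries are illustrative):

```go
package main

import (
	"fmt"
	"strings"
	"unicode"
)

// Same logic as telemetrystore.queryOperation: first space-delimited token of
// the query, trimmed of leading whitespace and capped at 16 characters.
func queryOperation(query string) string {
	queryOp := strings.TrimLeftFunc(query, unicode.IsSpace)
	if idx := strings.IndexByte(queryOp, ' '); idx > 0 {
		queryOp = queryOp[:idx]
	}
	if len(queryOp) > 16 {
		queryOp = queryOp[:16]
	}
	return queryOp
}

func main() {
	fmt.Println(queryOperation("  SELECT count() FROM signoz_traces.signoz_index_v3")) // SELECT
	fmt.Println(queryOperation("INSERT INTO signoz_metrics.samples_v4 VALUES"))        // INSERT
	fmt.Println(queryOperation("OPTIMIZETABLEFINALDEDUPLICATE"))                       // OPTIMIZETABLEFIN (capped at 16 chars)
}
```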
diff --git a/pkg/telemetrystore/telemetrystorehook/instrumentation.go b/pkg/telemetrystore/telemetrystorehook/instrumentation.go
new file mode 100644
index 0000000000..8d335f69c9
--- /dev/null
+++ b/pkg/telemetrystore/telemetrystorehook/instrumentation.go
@@ -0,0 +1,69 @@
+package telemetrystorehook
+
+import (
+	"context"
+
+	"github.com/SigNoz/signoz/pkg/factory"
+	"github.com/SigNoz/signoz/pkg/telemetrystore"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/codes"
+	"go.opentelemetry.io/otel/metric"
+	semconv "go.opentelemetry.io/otel/semconv/v1.12.0"
+	"go.opentelemetry.io/otel/trace"
+)
+
+type instrumentation struct {
+	clickhouseVersion string
+	clickhouseCluster string
+	tracer            trace.Tracer
+	meter             metric.Meter
+}
+
+func NewInstrumentationFactory(version string) factory.ProviderFactory[telemetrystore.TelemetryStoreHook, telemetrystore.Config] {
+	return factory.NewProviderFactory(factory.MustNewName("instrumentation"), func(ctx context.Context, ps factory.ProviderSettings, c telemetrystore.Config) (telemetrystore.TelemetryStoreHook, error) {
+		return NewInstrumentation(ctx, ps, c, version)
+	})
+}
+
+func NewInstrumentation(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config, version string) (telemetrystore.TelemetryStoreHook, error) {
+	meter := providerSettings.MeterProvider.Meter("github.com/SigNoz/signoz/pkg/telemetrystore")
+
+	return &instrumentation{
+		clickhouseVersion: version,
+		clickhouseCluster: config.Clickhouse.Cluster,
+		tracer:            providerSettings.TracerProvider.Tracer("github.com/SigNoz/signoz/pkg/telemetrystore"),
+		meter:             meter,
+	}, nil
+}
+
+func (hook *instrumentation) BeforeQuery(ctx context.Context, event *telemetrystore.QueryEvent) context.Context {
+	ctx, _ = hook.tracer.Start(ctx, "", trace.WithSpanKind(trace.SpanKindClient))
+	return ctx
+}
+
+func (hook *instrumentation) AfterQuery(ctx context.Context, event *telemetrystore.QueryEvent) {
+	span := trace.SpanFromContext(ctx)
+	if !span.IsRecording() {
+		return
+	}
+
+	span.SetName(event.Operation)
+	defer span.End()
+
+	var attrs []attribute.KeyValue
+	attrs = append(
+		attrs,
+		semconv.DBStatementKey.String(event.Query),
+		attribute.String("db.version", hook.clickhouseVersion),
+		semconv.DBSystemKey.String("clickhouse"),
+		semconv.DBOperationKey.String(event.Operation),
+		attribute.String("clickhouse.cluster", hook.clickhouseCluster),
+	)
+
+	if event.Err != nil {
+		span.RecordError(event.Err)
+		span.SetStatus(codes.Error, event.Err.Error())
+	}
+
+	span.SetAttributes(attrs...)
+}
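For context, a rough sketch of the span lifecycle this hook produces around a ClickHouse query: a client span is started before the query with an empty name, then named after the parsed operation and decorated with db.* attributes once the QueryEvent is complete. This uses only the OpenTelemetry API and is a pattern illustration, not the SigNoz wiring; the query text, cluster name, and error are illustrative.

```go
package main

import (
	"context"
	"errors"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/codes"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	semconv "go.opentelemetry.io/otel/semconv/v1.12.0"
	"go.opentelemetry.io/otel/trace"
)

func main() {
	// A recording TracerProvider so the spans are not no-ops.
	tp := sdktrace.NewTracerProvider()
	defer func() { _ = tp.Shutdown(context.Background()) }()
	tracer := tp.Tracer("github.com/SigNoz/signoz/pkg/telemetrystore")

	// BeforeQuery: start a client span with an empty name; the real name is
	// only known once the QueryEvent carries the parsed operation.
	ctx, _ := tracer.Start(context.Background(), "", trace.WithSpanKind(trace.SpanKindClient))

	// ... the ClickHouse query would run here ...
	queryErr := errors.New("code: 60, message: table does not exist") // illustrative failure

	// AfterQuery: name the span after the operation and attach db.* attributes.
	span := trace.SpanFromContext(ctx)
	span.SetName("SELECT")
	span.SetAttributes(
		semconv.DBSystemKey.String("clickhouse"),
		semconv.DBOperationKey.String("SELECT"),
		semconv.DBStatementKey.String("SELECT count() FROM signoz_traces.signoz_index_v3"),
		attribute.String("clickhouse.cluster", "cluster"),
	)
	if queryErr != nil {
		span.RecordError(queryErr)
		span.SetStatus(codes.Error, queryErr.Error())
	}
	span.End()
}
```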