Compare commits

...

1 Commits

Author SHA1 Message Date
srikanthccv
a4699b1fed chore: add metric temporality cache 2025-08-31 19:46:29 +05:30
12 changed files with 439 additions and 10 deletions

View File

@@ -57,7 +57,7 @@ func NewAPIHandler(opts APIHandlerOptions, signoz *signoz.SigNoz) (*APIHandler,
FluxInterval: opts.FluxInterval,
AlertmanagerAPI: alertmanager.NewAPI(signoz.Alertmanager),
LicensingAPI: httplicensing.NewLicensingAPI(signoz.Licensing),
FieldsAPI: fields.NewAPI(signoz.Instrumentation.ToProviderSettings(), signoz.TelemetryStore),
FieldsAPI: fields.NewAPI(signoz.Instrumentation.ToProviderSettings(), signoz.TelemetryStore, signoz.Cache),
Signoz: signoz,
QuerierAPI: querierAPI.NewAPI(signoz.Instrumentation.ToProviderSettings(), signoz.Querier, signoz.Analytics),
})

View File

@@ -5,6 +5,7 @@ import (
"io"
"net/http"
"github.com/SigNoz/signoz/pkg/cache"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/http/render"
"github.com/SigNoz/signoz/pkg/telemetrylogs"
@@ -25,10 +26,12 @@ type API struct {
func NewAPI(
settings factory.ProviderSettings,
telemetryStore telemetrystore.TelemetryStore,
cache cache.Cache,
) *API {
telemetryMetadataStore := telemetrymetadata.NewTelemetryMetaStore(
settings,
telemetryStore,
cache,
telemetrytraces.DBName,
telemetrytraces.TagAttributesV2TableName,
telemetrytraces.SpanAttributesKeysTblName,

View File

@@ -230,7 +230,7 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
var metricTemporality map[string]metrictypes.Temporality
if len(metricNames) > 0 {
var err error
metricTemporality, err = q.metadataStore.FetchTemporalityMulti(ctx, metricNames...)
metricTemporality, err = q.metadataStore.FetchTemporalityMulti(ctx, orgID, metricNames...)
if err != nil {
q.logger.WarnContext(ctx, "failed to fetch metric temporality", "error", err, "metrics", metricNames)
// Continue without temporality - statement builder will handle unspecified

View File

@@ -48,6 +48,7 @@ func newProvider(
telemetryMetadataStore := telemetrymetadata.NewTelemetryMetaStore(
settings,
telemetryStore,
cache,
telemetrytraces.DBName,
telemetrytraces.TagAttributesV2TableName,
telemetrytraces.SpanAttributesKeysTblName,

View File

@@ -117,7 +117,7 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz, jwt *authtypes.JWT)
FluxInterval: config.Querier.FluxInterval,
AlertmanagerAPI: alertmanager.NewAPI(signoz.Alertmanager),
LicensingAPI: nooplicensing.NewLicenseAPI(),
FieldsAPI: fields.NewAPI(signoz.Instrumentation.ToProviderSettings(), signoz.TelemetryStore),
FieldsAPI: fields.NewAPI(signoz.Instrumentation.ToProviderSettings(), signoz.TelemetryStore, signoz.Cache),
Signoz: signoz,
QuerierAPI: querierAPI.NewAPI(signoz.Instrumentation.ToProviderSettings(), signoz.Querier, signoz.Analytics),
})

View File

@@ -4,9 +4,11 @@ import (
"context"
"fmt"
"log/slog"
"os"
"slices"
"strings"
"github.com/SigNoz/signoz/pkg/cache"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/querybuilder"
@@ -16,6 +18,7 @@ import (
"github.com/SigNoz/signoz/pkg/types/metrictypes"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/huandu/go-sqlbuilder"
"golang.org/x/exp/maps"
)
@@ -51,6 +54,7 @@ type telemetryMetaStore struct {
fm qbtypes.FieldMapper
conditionBuilder qbtypes.ConditionBuilder
temporalityCache *TemporalityCache
}
func escapeForLike(s string) string {
@@ -60,6 +64,7 @@ func escapeForLike(s string) string {
func NewTelemetryMetaStore(
settings factory.ProviderSettings,
telemetrystore telemetrystore.TelemetryStore,
cache cache.Cache,
tracesDBName string,
tracesFieldsTblName string,
spanAttributesKeysTblName string,
@@ -104,6 +109,34 @@ func NewTelemetryMetaStore(
t.fm = fm
t.conditionBuilder = conditionBuilder
if cache != nil &&
strings.ToLower(os.Getenv("TEMPORALITY_CACHE_ENABLED")) != "false" {
// TODO(srikanthccv): is there a ever a case to make this configurable?
temporalityCacheConfig := DefaultTemporalityCacheConfig()
t.temporalityCache = NewTemporalityCache(
settings,
cache,
temporalityCacheConfig,
func(ctx context.Context, orgID valuer.UUID, metricName string) (metrictypes.Temporality, error) {
temporalityMap, err := t.fetchTemporalityMultiDirect(ctx, metricName)
if err != nil {
return metrictypes.Unknown, err
}
temporality, ok := temporalityMap[metricName]
if !ok {
return metrictypes.Unknown, nil
}
return temporality, nil
},
)
t.temporalityCache.SetRefreshMultiCallback(func(ctx context.Context, orgID valuer.UUID, metricNames []string) (map[string]metrictypes.Temporality, error) {
return t.fetchTemporalityMultiDirect(ctx, metricNames...)
})
} else {
t.logger.Info("skipping temporality cache")
}
return t
}
@@ -1429,12 +1462,20 @@ func (t *telemetryMetaStore) GetAllValues(ctx context.Context, fieldValueSelecto
return values, complete, nil
}
func (t *telemetryMetaStore) FetchTemporality(ctx context.Context, metricName string) (metrictypes.Temporality, error) {
func (t *telemetryMetaStore) FetchTemporality(ctx context.Context, orgID valuer.UUID, metricName string) (metrictypes.Temporality, error) {
if metricName == "" {
return metrictypes.Unknown, errors.Newf(errors.TypeInternal, errors.CodeInternal, "metric name cannot be empty")
}
temporalityMap, err := t.FetchTemporalityMulti(ctx, metricName)
if t.temporalityCache != nil {
return t.temporalityCache.Get(ctx, orgID, metricName)
}
return t.fetchTemporalityDirect(ctx, metricName)
}
func (t *telemetryMetaStore) fetchTemporalityDirect(ctx context.Context, metricName string) (metrictypes.Temporality, error) {
temporalityMap, err := t.fetchTemporalityMultiDirect(ctx, metricName)
if err != nil {
return metrictypes.Unknown, err
}
@@ -1447,11 +1488,20 @@ func (t *telemetryMetaStore) FetchTemporality(ctx context.Context, metricName st
return temporality, nil
}
func (t *telemetryMetaStore) FetchTemporalityMulti(ctx context.Context, metricNames ...string) (map[string]metrictypes.Temporality, error) {
func (t *telemetryMetaStore) FetchTemporalityMulti(ctx context.Context, orgID valuer.UUID, metricNames ...string) (map[string]metrictypes.Temporality, error) {
if len(metricNames) == 0 {
return make(map[string]metrictypes.Temporality), nil
}
if t.temporalityCache != nil {
return t.temporalityCache.GetMulti(ctx, orgID, metricNames)
}
return t.fetchTemporalityMultiDirect(ctx, metricNames...)
}
// fetchTemporalityMultiDirect fetches temporalities directly from database without cache
func (t *telemetryMetaStore) fetchTemporalityMultiDirect(ctx context.Context, metricNames ...string) (map[string]metrictypes.Temporality, error) {
result := make(map[string]metrictypes.Temporality)
metricsTemporality, err := t.fetchMetricsTemporality(ctx, metricNames...)
if err != nil {

View File

@@ -0,0 +1,24 @@
package telemetrymetadata
import (
"testing"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestMetadataStore_TemporalityCacheIntegration(t *testing.T) {
// Test that metadata store works without cache
metaStore := NewTelemetryMetaStore(
instrumentationtest.New().ToProviderSettings(),
nil, // telemetrystore
"", "", "", "", "", "", "", "", "", "", "", "", "", "", "",
nil, // No cache
)
store, ok := metaStore.(*telemetryMetaStore)
require.True(t, ok)
assert.Nil(t, store.temporalityCache, "Should not have cache when none provided")
}

View File

@@ -53,6 +53,7 @@ func TestGetKeys(t *testing.T) {
telemetrylogs.LogResourceKeysTblName,
DBName,
AttributesMetadataLocalTableName,
nil, // No cache for tests
)
rows := cmock.NewRows([]cmock.ColumnType{

View File

@@ -0,0 +1,281 @@
package telemetrymetadata
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"math/rand"
"sync"
"time"
"github.com/SigNoz/signoz/pkg/cache"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
// TemporalityCacheEntry represents a cached temporality entry
type TemporalityCacheEntry struct {
Temporality metrictypes.Temporality `json:"temporality"`
CachedAt time.Time `json:"cached_at"`
SoftTTL time.Duration `json:"soft_ttl"`
HardTTL time.Duration `json:"hard_ttl"`
}
func (e *TemporalityCacheEntry) MarshalBinary() ([]byte, error) {
return json.Marshal(e)
}
func (e *TemporalityCacheEntry) UnmarshalBinary(data []byte) error {
return json.Unmarshal(data, e)
}
type TemporalityCache struct {
cache cache.Cache
logger *slog.Logger
softTTL time.Duration
hardTTL time.Duration
jitterPercent int
refreshing sync.Map // map[string]bool to track ongoing refreshes
refreshCallback func(ctx context.Context, orgID valuer.UUID, metricName string) (metrictypes.Temporality, error)
refreshMultiCallback func(ctx context.Context, orgID valuer.UUID, metricNames []string) (map[string]metrictypes.Temporality, error)
}
// TemporalityCacheConfig holds configuration for the temporality cache
type TemporalityCacheConfig struct {
SoftTTL time.Duration
HardTTL time.Duration
JitterPercent int // Percentage of TTL to use as jitter range (e.g., 10 for ±10%)
}
// DefaultTemporalityCacheConfig returns default cache configuration
func DefaultTemporalityCacheConfig() TemporalityCacheConfig {
return TemporalityCacheConfig{
SoftTTL: 30 * time.Minute, // Fresh data threshold
HardTTL: 240 * time.Minute, // Maximum cache lifetime
JitterPercent: 20, // 20% jitter
}
}
// NewTemporalityCache creates a new temporality cache
func NewTemporalityCache(
settings factory.ProviderSettings,
cache cache.Cache,
config TemporalityCacheConfig,
refreshCallback func(ctx context.Context, orgID valuer.UUID, metricName string) (metrictypes.Temporality, error),
) *TemporalityCache {
cacheSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/telemetrymetadata/temporality_cache")
return &TemporalityCache{
cache: cache,
logger: cacheSettings.Logger(),
softTTL: config.SoftTTL,
hardTTL: config.HardTTL,
jitterPercent: config.JitterPercent,
refreshCallback: refreshCallback,
}
}
// SetRefreshMultiCallback sets the batch refresh callback
func (tc *TemporalityCache) SetRefreshMultiCallback(callback func(ctx context.Context, orgID valuer.UUID, metricNames []string) (map[string]metrictypes.Temporality, error)) {
tc.refreshMultiCallback = callback
}
func (tc *TemporalityCache) generateCacheKey(metricName string) string {
return fmt.Sprintf("temporality:metric:%s", metricName)
}
func (tc *TemporalityCache) applyJitter(duration time.Duration) time.Duration {
if tc.jitterPercent <= 0 {
return duration
}
jitterRange := int64(float64(duration.Nanoseconds()) * float64(tc.jitterPercent) / 100.0)
jitter := rand.Int63n(2*jitterRange) - jitterRange
result := duration + time.Duration(jitter)
if result < 0 {
return 0
}
return result
}
func (tc *TemporalityCache) Get(ctx context.Context, orgID valuer.UUID, metricName string) (metrictypes.Temporality, error) {
cacheKey := tc.generateCacheKey(metricName)
var entry TemporalityCacheEntry
err := tc.cache.Get(ctx, orgID, cacheKey, &entry, false)
if err != nil {
if !errors.Ast(err, errors.TypeNotFound) {
tc.logger.ErrorContext(ctx, "error getting cached temporality", "metric", metricName, "error", err)
}
temporality, err := tc.refreshCallback(ctx, orgID, metricName)
if err != nil {
return metrictypes.Unknown, err
}
tc.put(ctx, orgID, metricName, temporality)
return temporality, nil
}
age := time.Since(entry.CachedAt)
if age < entry.SoftTTL {
tc.logger.DebugContext(ctx, "returning fresh cached temporality",
"metric", metricName,
"age", age,
"soft_ttl", entry.SoftTTL)
return entry.Temporality, nil
}
if age < entry.HardTTL {
tc.logger.DebugContext(ctx, "returning stale cached temporality and triggering refresh",
"metric", metricName,
"age", age,
"soft_ttl", entry.SoftTTL,
"hard_ttl", entry.HardTTL)
tc.triggerBackgroundRefresh(ctx, orgID, metricName)
return entry.Temporality, nil
}
tc.logger.DebugContext(ctx, "cached temporality exceeded hard TTL, fetching fresh",
"metric", metricName,
"age", age,
"hard_ttl", entry.HardTTL)
temporality, err := tc.refreshCallback(ctx, orgID, metricName)
if err != nil {
// when refresh fails and we have stale data, return it as fallback
if entry.Temporality != metrictypes.Unknown {
tc.logger.WarnContext(ctx, "failed to refresh temporality, returning stale data",
"metric", metricName,
"error", err)
return entry.Temporality, nil
}
return metrictypes.Unknown, err
}
tc.put(ctx, orgID, metricName, temporality)
return temporality, nil
}
func (tc *TemporalityCache) GetMulti(ctx context.Context, orgID valuer.UUID, metricNames []string) (map[string]metrictypes.Temporality, error) {
result := make(map[string]metrictypes.Temporality)
var missedMetrics []string
staleMetrics := make(map[string]metrictypes.Temporality)
// for each metric, check if it fresh, stale + valid, or invalid/missing
// then trigger fetch for all missing metrics
for _, metricName := range metricNames {
cacheKey := tc.generateCacheKey(metricName)
var entry TemporalityCacheEntry
err := tc.cache.Get(ctx, orgID, cacheKey, &entry, false)
if err != nil {
missedMetrics = append(missedMetrics, metricName)
continue
}
age := time.Since(entry.CachedAt)
if age < entry.SoftTTL {
result[metricName] = entry.Temporality
} else if age < entry.HardTTL {
result[metricName] = entry.Temporality
staleMetrics[metricName] = entry.Temporality
} else {
missedMetrics = append(missedMetrics, metricName)
}
}
for metricName := range staleMetrics {
tc.triggerBackgroundRefresh(ctx, orgID, metricName)
}
if len(missedMetrics) > 0 {
temporalities, err := tc.refreshMulti(ctx, orgID, missedMetrics)
if err != nil {
return result, err
}
for metricName, temporality := range temporalities {
tc.put(ctx, orgID, metricName, temporality)
result[metricName] = temporality
}
}
return result, nil
}
func (tc *TemporalityCache) put(ctx context.Context, orgID valuer.UUID, metricName string, temporality metrictypes.Temporality) {
entry := TemporalityCacheEntry{
Temporality: temporality,
CachedAt: time.Now(),
SoftTTL: tc.applyJitter(tc.softTTL),
HardTTL: tc.applyJitter(tc.hardTTL),
}
cacheKey := tc.generateCacheKey(metricName)
if err := tc.cache.Set(ctx, orgID, cacheKey, &entry, entry.HardTTL); err != nil {
tc.logger.ErrorContext(ctx, "failed to cache temporality",
"metric", metricName,
"error", err)
}
}
func (tc *TemporalityCache) triggerBackgroundRefresh(_ context.Context, orgID valuer.UUID, metricName string) {
if _, loading := tc.refreshing.LoadOrStore(metricName, true); loading {
return
}
go func() {
defer tc.refreshing.Delete(metricName)
refreshCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
tc.logger.DebugContext(refreshCtx, "starting background refresh", "metric", metricName)
temporality, err := tc.refreshCallback(refreshCtx, orgID, metricName)
if err != nil {
tc.logger.ErrorContext(refreshCtx, "background refresh failed",
"metric", metricName,
"error", err)
return
}
tc.put(refreshCtx, orgID, metricName, temporality)
tc.logger.DebugContext(refreshCtx, "background refresh completed", "metric", metricName)
}()
}
func (tc *TemporalityCache) refreshMulti(ctx context.Context, orgID valuer.UUID, metricNames []string) (map[string]metrictypes.Temporality, error) {
if tc.refreshMultiCallback != nil {
return tc.refreshMultiCallback(ctx, orgID, metricNames)
}
result := make(map[string]metrictypes.Temporality)
for _, metricName := range metricNames {
temporality, err := tc.refreshCallback(ctx, orgID, metricName)
if err != nil {
tc.logger.ErrorContext(ctx, "failed to refresh temporality",
"metric", metricName,
"error", err)
result[metricName] = metrictypes.Unknown
continue
}
result[metricName] = temporality
}
return result, nil
}

View File

@@ -0,0 +1,67 @@
package telemetrymetadata
import (
"testing"
"time"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
"github.com/stretchr/testify/assert"
)
func TestTemporalityCache_Jitter(t *testing.T) {
config := TemporalityCacheConfig{
SoftTTL: 1 * time.Minute,
HardTTL: 5 * time.Minute,
JitterPercent: 20,
}
tc := &TemporalityCache{
jitterPercent: config.JitterPercent,
}
// Test jitter produces different values
ttlValues := make(map[time.Duration]bool)
for i := 0; i < 10; i++ {
jittered := tc.applyJitter(config.SoftTTL)
ttlValues[jittered] = true
// Check jitter is within expected range
minTTL := time.Duration(float64(config.SoftTTL) * 0.8)
maxTTL := time.Duration(float64(config.SoftTTL) * 1.2)
assert.GreaterOrEqual(t, jittered, minTTL)
assert.LessOrEqual(t, jittered, maxTTL)
}
// Should have multiple different values due to jitter
assert.Greater(t, len(ttlValues), 1, "Jitter should produce different TTL values")
}
func TestTemporalityCacheConfig_Default(t *testing.T) {
config := DefaultTemporalityCacheConfig()
assert.Equal(t, 5*time.Minute, config.SoftTTL)
assert.Equal(t, 30*time.Minute, config.HardTTL)
assert.Equal(t, 10, config.JitterPercent)
}
func TestTemporalityCacheEntry_Serialization(t *testing.T) {
entry := TemporalityCacheEntry{
Temporality: metrictypes.Delta,
CachedAt: time.Now(),
SoftTTL: 5 * time.Minute,
HardTTL: 30 * time.Minute,
}
// Test marshaling
data, err := entry.MarshalBinary()
assert.NoError(t, err)
assert.NotEmpty(t, data)
// Test unmarshaling
var decoded TemporalityCacheEntry
err = decoded.UnmarshalBinary(data)
assert.NoError(t, err)
assert.Equal(t, entry.Temporality, decoded.Temporality)
assert.Equal(t, entry.SoftTTL, decoded.SoftTTL)
assert.Equal(t, entry.HardTTL, decoded.HardTTL)
}

View File

@@ -4,6 +4,7 @@ import (
"context"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
// MetadataStore is the interface for the telemetry metadata store.
@@ -26,8 +27,8 @@ type MetadataStore interface {
GetAllValues(ctx context.Context, fieldValueSelector *FieldValueSelector) (*TelemetryFieldValues, bool, error)
// FetchTemporality fetches the temporality for metric
FetchTemporality(ctx context.Context, metricName string) (metrictypes.Temporality, error)
FetchTemporality(ctx context.Context, orgID valuer.UUID, metricName string) (metrictypes.Temporality, error)
// FetchTemporalityMulti fetches the temporality for multiple metrics
FetchTemporalityMulti(ctx context.Context, metricNames ...string) (map[string]metrictypes.Temporality, error)
FetchTemporalityMulti(ctx context.Context, orgID valuer.UUID, metricNames ...string) (map[string]metrictypes.Temporality, error)
}

View File

@@ -6,6 +6,7 @@ import (
"github.com/SigNoz/signoz/pkg/types/metrictypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
// MockMetadataStore implements the MetadataStore interface for testing purposes
@@ -258,7 +259,7 @@ func (m *MockMetadataStore) SetAllValues(lookupKey string, values *telemetrytype
}
// FetchTemporality fetches the temporality for a metric
func (m *MockMetadataStore) FetchTemporality(ctx context.Context, metricName string) (metrictypes.Temporality, error) {
func (m *MockMetadataStore) FetchTemporality(ctx context.Context, orgID valuer.UUID, metricName string) (metrictypes.Temporality, error) {
if temporality, exists := m.TemporalityMap[metricName]; exists {
return temporality, nil
}
@@ -266,7 +267,7 @@ func (m *MockMetadataStore) FetchTemporality(ctx context.Context, metricName str
}
// FetchTemporalityMulti fetches the temporality for multiple metrics
func (m *MockMetadataStore) FetchTemporalityMulti(ctx context.Context, metricNames ...string) (map[string]metrictypes.Temporality, error) {
func (m *MockMetadataStore) FetchTemporalityMulti(ctx context.Context, orgID valuer.UUID, metricNames ...string) (map[string]metrictypes.Temporality, error) {
result := make(map[string]metrictypes.Temporality)
for _, metricName := range metricNames {