feat: add bounded cache for the opaque tokenizer's last observed at cache only (#9581)

Replace the unbounded cache behind the `lastObservedAt` stat, which was powered by BigCache, with Ristretto, a bounded in-memory cache (https://github.com/dgraph-io/ristretto).

This PR is the first step towards moving away from unbounded caches in the system; more PRs will follow.
This commit is contained in:
Karan Balani
2025-11-14 21:23:57 +05:30
committed by GitHub
parent 1a193015a7
commit 54b67d9cfd
3 changed files with 68 additions and 72 deletions

2
go.mod
View File

@@ -9,11 +9,11 @@ require (
github.com/DATA-DOG/go-sqlmock v1.5.2
github.com/SigNoz/govaluate v0.0.0-20240203125216-988004ccc7fd
github.com/SigNoz/signoz-otel-collector v0.129.4
github.com/allegro/bigcache/v3 v3.1.0
github.com/antlr4-go/antlr/v4 v4.13.1
github.com/antonmedv/expr v1.15.3
github.com/cespare/xxhash/v2 v2.3.0
github.com/coreos/go-oidc/v3 v3.14.1
github.com/dgraph-io/ristretto/v2 v2.3.0
github.com/dustin/go-humanize v1.0.1
github.com/go-co-op/gocron v1.30.1
github.com/go-openapi/runtime v0.28.0

6
go.sum
View File

@@ -118,8 +118,6 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/alecthomas/units v0.0.0-20240927000941-0f3dac36c52b h1:mimo19zliBX/vSQ6PWWSL9lK8qwHozUj03+zLoEB8O0=
github.com/alecthomas/units v0.0.0-20240927000941-0f3dac36c52b/go.mod h1:fvzegU4vN3H1qMT+8wDmzjAcDONcgo2/SZ/TyfdUOFs=
github.com/allegro/bigcache/v3 v3.1.0 h1:H2Vp8VOvxcrB91o86fUSVJFqeuz8kpyyB02eH3bSzwk=
github.com/allegro/bigcache/v3 v3.1.0/go.mod h1:aPyh7jEvrog9zAwx5N7+JUQX5dZTSGpxF1LAR4dr35I=
github.com/andybalholm/brotli v1.2.0 h1:ukwgCxwYrmACq68yiUqwIWnGY0cTPox/M94sVwToPjQ=
github.com/andybalholm/brotli v1.2.0/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
@@ -211,6 +209,10 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dennwc/varint v1.0.0 h1:kGNFFSSw8ToIy3obO/kKr8U9GZYUAxQEVuix4zfDWzE=
github.com/dennwc/varint v1.0.0/go.mod h1:hnItb35rvZvJrbTALZtY/iQfDs48JKRG1RPpgziApxA=
github.com/dgraph-io/ristretto/v2 v2.3.0 h1:qTQ38m7oIyd4GAed/QkUZyPFNMnvVWyazGXRwvOt5zk=
github.com/dgraph-io/ristretto/v2 v2.3.0/go.mod h1:gpoRV3VzrEY1a9dWAYV6T1U7YzfgttXdd/ZzL1s9OZM=
github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da h1:aIftn67I1fkbMa512G+w+Pxci9hJPB8oMnkcP3iZF38=
github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/digitalocean/godo v1.144.0 h1:rDCsmpwcDe5egFQ3Ae45HTde685/GzX037mWRMPufW0=

View File

@@ -3,7 +3,6 @@ package opaquetokenizer
import (
"context"
"slices"
"strings"
"time"
"github.com/SigNoz/signoz/pkg/cache"
@@ -11,10 +10,11 @@ import (
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/tokenizer"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/types/cachetypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/allegro/bigcache/v3"
"github.com/dgraph-io/ristretto/v2"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
@@ -23,6 +23,10 @@ var (
emptyOrgID valuer.UUID = valuer.UUID{}
)
const (
// expectedLastObservedAtCacheEntries is the estimated number of entries the
// last observed at cache is expected to hold. New uses it to size the
// Ristretto cache counters (NumCounters is set to 10x this value).
expectedLastObservedAtCacheEntries int64 = 5000 // 1000 users * Max 5 tokens per user
)
type provider struct {
config tokenizer.Config
settings factory.ScopedProviderSettings
@@ -30,7 +34,7 @@ type provider struct {
tokenStore authtypes.TokenStore
orgGetter organization.Getter
stopC chan struct{}
lastObservedAtCache *bigcache.BigCache
lastObservedAtCache *ristretto.Cache[string, time.Time]
}
func NewFactory(cache cache.Cache, tokenStore authtypes.TokenStore, orgGetter organization.Getter) factory.ProviderFactory[tokenizer.Tokenizer, tokenizer.Config] {
@@ -42,11 +46,12 @@ func NewFactory(cache cache.Cache, tokenStore authtypes.TokenStore, orgGetter or
func New(ctx context.Context, providerSettings factory.ProviderSettings, config tokenizer.Config, cache cache.Cache, tokenStore authtypes.TokenStore, orgGetter organization.Getter) (tokenizer.Tokenizer, error) {
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/tokenizer/opaquetokenizer")
lastObservedAtCache, err := bigcache.New(ctx, bigcache.Config{
Shards: 1024,
LifeWindow: config.Lifetime.Max,
CleanWindow: config.Opaque.GC.Interval,
StatsEnabled: false,
// * move these hardcoded values to a config based value when needed
lastObservedAtCache, err := ristretto.NewCache(&ristretto.Config[string, time.Time]{
NumCounters: 10 * expectedLastObservedAtCacheEntries, // 10x of expected entries
MaxCost: 1 << 19, // ~ 512 KB
BufferItems: 64,
Metrics: false,
})
if err != nil {
return nil, err
@@ -74,14 +79,23 @@ func (provider *provider) Start(ctx context.Context) error {
case <-ticker.C:
ctx, span := provider.settings.Tracer().Start(ctx, "tokenizer.GC", trace.WithAttributes(attribute.String("tokenizer.provider", provider.config.Provider)))
if err := provider.gc(ctx); err != nil {
span.RecordError(err)
provider.settings.Logger().ErrorContext(ctx, "failed to garbage collect tokens", "error", err)
orgs, err := provider.orgGetter.ListByOwnedKeyRange(ctx)
if err != nil {
provider.settings.Logger().ErrorContext(ctx, "failed to get orgs data", "error", err)
span.End()
continue
}
if err := provider.flushLastObservedAt(ctx); err != nil {
span.RecordError(err)
provider.settings.Logger().ErrorContext(ctx, "failed to flush tokens", "error", err)
for _, org := range orgs {
if err := provider.gc(ctx, org); err != nil {
span.RecordError(err)
provider.settings.Logger().ErrorContext(ctx, "failed to garbage collect tokens", "error", err, "org_id", org.ID)
}
if err := provider.flushLastObservedAt(ctx, org); err != nil {
span.RecordError(err)
provider.settings.Logger().ErrorContext(ctx, "failed to flush tokens", "error", err, "org_id", org.ID)
}
}
span.End()
@@ -208,15 +222,23 @@ func (provider *provider) DeleteIdentity(ctx context.Context, userID valuer.UUID
func (provider *provider) Stop(ctx context.Context) error {
close(provider.stopC)
// garbage collect tokens on stop
if err := provider.gc(ctx); err != nil {
provider.settings.Logger().ErrorContext(ctx, "failed to garbage collect tokens", "error", err)
orgs, err := provider.orgGetter.ListByOwnedKeyRange(ctx)
if err != nil {
return err
}
// flush tokens on stop
if err := provider.flushLastObservedAt(ctx); err != nil {
provider.settings.Logger().ErrorContext(ctx, "failed to flush tokens", "error", err)
for _, org := range orgs {
// garbage collect tokens on stop
if err := provider.gc(ctx, org); err != nil {
provider.settings.Logger().ErrorContext(ctx, "failed to garbage collect tokens", "error", err, "org_id", org.ID)
}
// flush tokens on stop
if err := provider.flushLastObservedAt(ctx, org); err != nil {
provider.settings.Logger().ErrorContext(ctx, "failed to flush tokens", "error", err, "org_id", org.ID)
}
}
return nil
}
@@ -231,8 +253,8 @@ func (provider *provider) SetLastObservedAt(ctx context.Context, accessToken str
return nil
}
if err := provider.lastObservedAtCache.Set(lastObservedAtCacheKey(accessToken, token.UserID), []byte(lastObservedAt.Format(time.RFC3339))); err != nil {
return err
if ok := provider.lastObservedAtCache.Set(lastObservedAtCacheKey(accessToken, token.UserID), lastObservedAt, 24); !ok {
provider.settings.Logger().ErrorContext(ctx, "error caching last observed at timestamp", "user_id", token.UserID)
}
err = provider.cache.Set(ctx, emptyOrgID, accessTokenCacheKey(accessToken), token, provider.config.Lifetime.Max)
@@ -256,7 +278,7 @@ func (provider *provider) Collect(ctx context.Context, orgID valuer.UUID) (map[s
stats := make(map[string]any)
stats["auth_token.count"] = len(tokens)
accessTokenToLastObservedAt, err := provider.listLastObservedAtDesc()
accessTokenToLastObservedAt, err := provider.listLastObservedAtDesc(ctx, orgID)
if err != nil {
return nil, err
}
@@ -278,7 +300,7 @@ func (provider *provider) Collect(ctx context.Context, orgID valuer.UUID) (map[s
}
func (provider *provider) ListMaxLastObservedAtByOrgID(ctx context.Context, orgID valuer.UUID) (map[valuer.UUID]time.Time, error) {
accessTokenToLastObservedAts, err := provider.listLastObservedAtDesc()
accessTokenToLastObservedAts, err := provider.listLastObservedAtDesc(ctx, orgID)
if err != nil {
return nil, err
}
@@ -314,18 +336,8 @@ func (provider *provider) ListMaxLastObservedAtByOrgID(ctx context.Context, orgI
}
func (provider *provider) gc(ctx context.Context) error {
orgs, err := provider.orgGetter.ListByOwnedKeyRange(ctx)
if err != nil {
return err
}
orgIDs := make([]valuer.UUID, 0, len(orgs))
for _, org := range orgs {
orgIDs = append(orgIDs, org.ID)
}
tokens, err := provider.tokenStore.ListByOrgIDs(ctx, orgIDs)
func (provider *provider) gc(ctx context.Context, org *types.Organization) error {
tokens, err := provider.tokenStore.ListByOrgID(ctx, org.ID)
if err != nil {
return err
}
@@ -347,8 +359,8 @@ func (provider *provider) gc(ctx context.Context) error {
return nil
}
func (provider *provider) flushLastObservedAt(ctx context.Context) error {
accessTokenToLastObservedAt, err := provider.listLastObservedAtDesc()
func (provider *provider) flushLastObservedAt(ctx context.Context, org *types.Organization) error {
accessTokenToLastObservedAt, err := provider.listLastObservedAtDesc(ctx, org.ID)
if err != nil {
return err
}
@@ -430,32 +442,23 @@ func (provider *provider) getOrGetSetIdentity(ctx context.Context, userID valuer
return identity, nil
}
func (provider *provider) listLastObservedAtDesc() ([]map[string]any, error) {
iterator := provider.lastObservedAtCache.Iterator()
func (provider *provider) listLastObservedAtDesc(ctx context.Context, orgID valuer.UUID) ([]map[string]any, error) {
tokens, err := provider.tokenStore.ListByOrgID(ctx, orgID)
if err != nil {
return nil, err
}
var accessTokenToLastObservedAt []map[string]any
for iterator.SetNext() {
value, err := iterator.Value()
if err != nil {
return nil, err
for _, token := range tokens {
tokenCachedLastObservedAt, ok := provider.lastObservedAtCache.Get(lastObservedAtCacheKey(token.AccessToken, token.UserID))
if ok {
accessTokenToLastObservedAt = append(accessTokenToLastObservedAt, map[string]any{
"user_id": token.UserID,
"access_token": token.AccessToken,
"last_observed_at": tokenCachedLastObservedAt,
})
}
accessToken, userID, err := accessTokenAndUserIDFromLastObservedAtCacheKey(value.Key())
if err != nil {
return nil, err
}
lastObservedAt, err := time.Parse(time.RFC3339, string(value.Value()))
if err != nil {
return nil, err
}
accessTokenToLastObservedAt = append(accessTokenToLastObservedAt, map[string]any{
"user_id": userID,
"access_token": accessToken,
"last_observed_at": lastObservedAt,
})
}
// sort by descending order of last_observed_at
@@ -477,12 +480,3 @@ func identityCacheKey(userID valuer.UUID) string {
// lastObservedAtCacheKey builds the key under which the last observed at
// timestamp of an access token is cached. The key has the form
// "access_token::<accessToken>::<userID>".
func lastObservedAtCacheKey(accessToken string, userID valuer.UUID) string {
	const (
		prefix    = "access_token::"
		separator = "::"
	)

	owner := userID.String()
	return prefix + accessToken + separator + owner
}
// accessTokenAndUserIDFromLastObservedAtCacheKey splits a key of the form
// "<prefix>::<accessToken>::<userID>" back into its access token and user ID.
//
// An invalid-input error is returned when the key does not consist of exactly
// three "::"-separated parts or when the last part is not a valid UUID.
func accessTokenAndUserIDFromLastObservedAtCacheKey(key string) (string, valuer.UUID, error) {
	parts := strings.Split(key, "::")
	if len(parts) != 3 {
		return "", valuer.UUID{}, errors.New(errors.TypeInvalidInput, errors.CodeInvalidInput, "invalid last observed at cache key")
	}

	// Parse with the error-returning constructor: this function already
	// reports malformed keys through its error result, so a bad UUID segment
	// must not panic the way MustNewUUID would.
	userID, err := valuer.NewUUID(parts[2])
	if err != nil {
		return "", valuer.UUID{}, errors.New(errors.TypeInvalidInput, errors.CodeInvalidInput, "invalid last observed at cache key")
	}

	return parts[1], userID, nil
}