Compare commits

...

5 Commits

Author SHA1 Message Date
vikrantgupta25
3fe6aa9fdf feat(cache): add deprecated flag 2025-05-01 12:39:17 +05:30
vikrantgupta25
caaab625cc feat(cache): cleanuo the interface pt1 2025-05-01 01:25:59 +05:30
vikrantgupta25
8160e1a499 feat(cache): add query_cache tests 2025-05-01 00:08:20 +05:30
Vikrant Gupta
fcf633b397 Merge branch 'main' into feat/multi-tenant-cache 2025-04-30 23:39:24 +05:30
vikrantgupta25
ef36f1e84a feat(cache): remove the old cache implementation and usages 2025-04-30 23:38:31 +05:30
37 changed files with 488 additions and 1108 deletions

View File

@@ -5,7 +5,7 @@ import (
"math"
"time"
"github.com/SigNoz/signoz/pkg/query-service/cache"
"github.com/SigNoz/signoz/pkg/cache"
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/postprocess"

View File

@@ -17,7 +17,6 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations"
"github.com/SigNoz/signoz/pkg/query-service/app/integrations"
"github.com/SigNoz/signoz/pkg/query-service/app/logparsingpipeline"
"github.com/SigNoz/signoz/pkg/query-service/cache"
baseint "github.com/SigNoz/signoz/pkg/query-service/interfaces"
basemodel "github.com/SigNoz/signoz/pkg/query-service/model"
rules "github.com/SigNoz/signoz/pkg/query-service/rules"
@@ -38,7 +37,6 @@ type APIHandlerOptions struct {
IntegrationsController *integrations.Controller
CloudIntegrationsController *cloudintegrations.Controller
LogsParsingPipelineController *logparsingpipeline.LogParsingPipelineController
Cache cache.Cache
Gateway *httputil.ReverseProxy
GatewayUrl string
// Querier Influx Interval
@@ -64,7 +62,6 @@ func NewAPIHandler(opts APIHandlerOptions, signoz *signoz.SigNoz) (*APIHandler,
IntegrationsController: opts.IntegrationsController,
CloudIntegrationsController: opts.CloudIntegrationsController,
LogsParsingPipelineController: opts.LogsParsingPipelineController,
Cache: opts.Cache,
FluxInterval: opts.FluxInterval,
AlertmanagerAPI: alertmanager.NewAPI(signoz.Alertmanager),
FieldsAPI: fields.NewAPI(signoz.TelemetryStore),

View File

@@ -85,25 +85,25 @@ func (aH *APIHandler) queryRangeV4(w http.ResponseWriter, r *http.Request) {
switch seasonality {
case anomaly.SeasonalityWeekly:
provider = anomaly.NewWeeklyProvider(
anomaly.WithCache[*anomaly.WeeklyProvider](aH.opts.Cache),
anomaly.WithCache[*anomaly.WeeklyProvider](aH.Signoz.Cache),
anomaly.WithKeyGenerator[*anomaly.WeeklyProvider](queryBuilder.NewKeyGenerator()),
anomaly.WithReader[*anomaly.WeeklyProvider](aH.opts.DataConnector),
)
case anomaly.SeasonalityDaily:
provider = anomaly.NewDailyProvider(
anomaly.WithCache[*anomaly.DailyProvider](aH.opts.Cache),
anomaly.WithCache[*anomaly.DailyProvider](aH.Signoz.Cache),
anomaly.WithKeyGenerator[*anomaly.DailyProvider](queryBuilder.NewKeyGenerator()),
anomaly.WithReader[*anomaly.DailyProvider](aH.opts.DataConnector),
)
case anomaly.SeasonalityHourly:
provider = anomaly.NewHourlyProvider(
anomaly.WithCache[*anomaly.HourlyProvider](aH.opts.Cache),
anomaly.WithCache[*anomaly.HourlyProvider](aH.Signoz.Cache),
anomaly.WithKeyGenerator[*anomaly.HourlyProvider](queryBuilder.NewKeyGenerator()),
anomaly.WithReader[*anomaly.HourlyProvider](aH.opts.DataConnector),
)
default:
provider = anomaly.NewDailyProvider(
anomaly.WithCache[*anomaly.DailyProvider](aH.opts.Cache),
anomaly.WithCache[*anomaly.DailyProvider](aH.Signoz.Cache),
anomaly.WithKeyGenerator[*anomaly.DailyProvider](queryBuilder.NewKeyGenerator()),
anomaly.WithReader[*anomaly.DailyProvider](aH.opts.DataConnector),
)

View File

@@ -19,6 +19,7 @@ import (
"github.com/SigNoz/signoz/ee/query-service/integrations/gateway"
"github.com/SigNoz/signoz/ee/query-service/rules"
"github.com/SigNoz/signoz/pkg/alertmanager"
"github.com/SigNoz/signoz/pkg/cache"
"github.com/SigNoz/signoz/pkg/http/middleware"
"github.com/SigNoz/signoz/pkg/prometheus"
"github.com/SigNoz/signoz/pkg/signoz"
@@ -41,7 +42,6 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/app/logparsingpipeline"
"github.com/SigNoz/signoz/pkg/query-service/app/opamp"
opAmpModel "github.com/SigNoz/signoz/pkg/query-service/app/opamp/model"
"github.com/SigNoz/signoz/pkg/query-service/cache"
baseconst "github.com/SigNoz/signoz/pkg/query-service/constants"
"github.com/SigNoz/signoz/pkg/query-service/healthcheck"
baseint "github.com/SigNoz/signoz/pkg/query-service/interfaces"
@@ -57,7 +57,6 @@ type ServerOptions struct {
HTTPHostPort string
PrivateHostPort string
PreferSpanMetrics bool
CacheConfigPath string
FluxInterval string
FluxIntervalForTraceDetail string
Cluster string
@@ -134,19 +133,10 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
serverOptions.SigNoz.Cache,
)
var c cache.Cache
if serverOptions.CacheConfigPath != "" {
cacheOpts, err := cache.LoadFromYAMLCacheConfigFile(serverOptions.CacheConfigPath)
if err != nil {
return nil, err
}
c = cache.NewCache(cacheOpts)
}
rm, err := makeRulesManager(
serverOptions.SigNoz.SQLStore.SQLxDB(),
reader,
c,
serverOptions.SigNoz.Cache,
serverOptions.SigNoz.Alertmanager,
serverOptions.SigNoz.SQLStore,
serverOptions.SigNoz.TelemetryStore,
@@ -223,7 +213,6 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
IntegrationsController: integrationsController,
CloudIntegrationsController: cloudIntegrationsController,
LogsParsingPipelineController: logParsingPipelineController,
Cache: c,
FluxInterval: fluxInterval,
Gateway: gatewayProxy,
GatewayUrl: serverOptions.GatewayUrl,

View File

@@ -138,7 +138,6 @@ func main() {
HTTPHostPort: baseconst.HTTPHostPort,
PreferSpanMetrics: preferSpanMetrics,
PrivateHostPort: baseconst.PrivateHostPort,
CacheConfigPath: cacheConfigPath,
FluxInterval: fluxInterval,
FluxIntervalForTraceDetail: fluxIntervalForTraceDetail,
Cluster: cluster,

View File

@@ -12,7 +12,7 @@ import (
"go.uber.org/zap"
"github.com/SigNoz/signoz/ee/query-service/anomaly"
"github.com/SigNoz/signoz/pkg/query-service/cache"
"github.com/SigNoz/signoz/pkg/cache"
"github.com/SigNoz/signoz/pkg/query-service/common"
"github.com/SigNoz/signoz/pkg/query-service/model"
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"

19
pkg/cache/cache.go vendored
View File

@@ -6,6 +6,8 @@ import (
"fmt"
"reflect"
"time"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
)
// cacheable entity
@@ -61,11 +63,14 @@ func (s RetrieveStatus) String() string {
// cache interface
type Cache interface {
Connect(ctx context.Context) error
Store(ctx context.Context, cacheKey string, data CacheableEntity, ttl time.Duration) error
Retrieve(ctx context.Context, cacheKey string, dest CacheableEntity, allowExpired bool) (RetrieveStatus, error)
SetTTL(ctx context.Context, cacheKey string, ttl time.Duration)
Remove(ctx context.Context, cacheKey string)
BulkRemove(ctx context.Context, cacheKeys []string)
Close(ctx context.Context) error
Set(ctx context.Context, orgID string, cacheKey string, data CacheableEntity, ttl time.Duration) error
Get(ctx context.Context, orgID string, cacheKey string, dest CacheableEntity, allowExpired bool) (RetrieveStatus, error)
Delete(ctx context.Context, orgID string, cacheKey string)
DeleteMany(ctx context.Context, orgID string, cacheKeys []string)
}
type KeyGenerator interface {
// GenerateKeys generates the cache keys for the given query range params
// The keys are returned as a map where the key is the query name and the value is the cache key
GenerateKeys(*v3.QueryRangeParamsV3) map[string]string
}

View File

@@ -23,25 +23,20 @@ func New(ctx context.Context, settings factory.ProviderSettings, config cache.Co
return &provider{cc: go_cache.New(config.Memory.TTL, config.Memory.CleanupInterval)}, nil
}
// Connect does nothing
func (c *provider) Connect(_ context.Context) error {
return nil
}
// Store stores the data in the cache
func (c *provider) Store(_ context.Context, cacheKey string, data cache.CacheableEntity, ttl time.Duration) error {
func (c *provider) Set(_ context.Context, orgID string, cacheKey string, data cache.CacheableEntity, ttl time.Duration) error {
// check if the data being passed is a pointer and is not nil
rv := reflect.ValueOf(data)
if rv.Kind() != reflect.Pointer || rv.IsNil() {
return cache.WrapCacheableEntityErrors(reflect.TypeOf(data), "inmemory")
}
c.cc.Set(cacheKey, data, ttl)
c.cc.Set(fmt.Sprintf("%s::%s", orgID, cacheKey), data, ttl)
return nil
}
// Retrieve retrieves the data from the cache
func (c *provider) Retrieve(_ context.Context, cacheKey string, dest cache.CacheableEntity, allowExpired bool) (cache.RetrieveStatus, error) {
func (c *provider) Get(_ context.Context, orgID string, cacheKey string, dest cache.CacheableEntity, allowExpired bool) (cache.RetrieveStatus, error) {
// check if the destination being passed is a pointer and is not nil
dstv := reflect.ValueOf(dest)
if dstv.Kind() != reflect.Pointer || dstv.IsNil() {
@@ -53,7 +48,7 @@ func (c *provider) Retrieve(_ context.Context, cacheKey string, dest cache.Cache
return cache.RetrieveStatusError, fmt.Errorf("destination value is not settable, %s", dstv.Elem())
}
data, found := c.cc.Get(cacheKey)
data, found := c.cc.Get(fmt.Sprintf("%s::%s", orgID, cacheKey))
if !found {
return cache.RetrieveStatusKeyMiss, nil
}
@@ -69,33 +64,14 @@ func (c *provider) Retrieve(_ context.Context, cacheKey string, dest cache.Cache
return cache.RetrieveStatusHit, nil
}
// SetTTL sets the TTL for the cache entry
func (c *provider) SetTTL(_ context.Context, cacheKey string, ttl time.Duration) {
item, found := c.cc.Get(cacheKey)
if !found {
return
}
_ = c.cc.Replace(cacheKey, item, ttl)
}
// Remove removes the cache entry
func (c *provider) Remove(_ context.Context, cacheKey string) {
c.cc.Delete(cacheKey)
func (c *provider) Delete(_ context.Context, orgID string, cacheKey string) {
c.cc.Delete(fmt.Sprintf("%s::%s", orgID, cacheKey))
}
// BulkRemove removes the cache entries
func (c *provider) BulkRemove(_ context.Context, cacheKeys []string) {
func (c *provider) DeleteMany(_ context.Context, orgID string, cacheKeys []string) {
for _, cacheKey := range cacheKeys {
c.cc.Delete(cacheKey)
c.cc.Delete(fmt.Sprintf("%s::%s", orgID, cacheKey))
}
}
// Close does nothing
func (c *provider) Close(_ context.Context) error {
return nil
}
// Configuration returns the cache configuration
func (c *provider) Configuration() *cache.Memory {
return nil
}

View File

@@ -14,7 +14,6 @@ import (
type provider struct {
client *redis.Client
opts cache.Redis
}
func NewFactory() factory.ProviderFactory[cache.Cache, cache.Config] {
@@ -22,7 +21,14 @@ func NewFactory() factory.ProviderFactory[cache.Cache, cache.Config] {
}
func New(ctx context.Context, settings factory.ProviderSettings, config cache.Config) (cache.Cache, error) {
return &provider{opts: config.Redis}, nil
provider := new(provider)
redisClient := redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%s:%d", config.Redis.Host, config.Redis.Port),
Password: config.Redis.Password,
DB: config.Redis.DB,
})
provider.client = redisClient
return provider, nil
}
// WithClient creates a new cache with the given client
@@ -30,24 +36,14 @@ func WithClient(client *redis.Client) *provider {
return &provider{client: client}
}
// Connect connects to the redis server
func (c *provider) Connect(_ context.Context) error {
c.client = redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%s:%d", c.opts.Host, c.opts.Port),
Password: c.opts.Password,
DB: c.opts.DB,
})
return nil
}
// Store stores the data in the cache
func (c *provider) Store(ctx context.Context, cacheKey string, data cache.CacheableEntity, ttl time.Duration) error {
return c.client.Set(ctx, cacheKey, data, ttl).Err()
func (c *provider) Set(ctx context.Context, orgID string, cacheKey string, data cache.CacheableEntity, ttl time.Duration) error {
return c.client.Set(ctx, fmt.Sprintf("%s::%s", orgID, cacheKey), data, ttl).Err()
}
// Retrieve retrieves the data from the cache
func (c *provider) Retrieve(ctx context.Context, cacheKey string, dest cache.CacheableEntity, allowExpired bool) (cache.RetrieveStatus, error) {
err := c.client.Get(ctx, cacheKey).Scan(dest)
func (c *provider) Get(ctx context.Context, orgID string, cacheKey string, dest cache.CacheableEntity, allowExpired bool) (cache.RetrieveStatus, error) {
err := c.client.Get(ctx, fmt.Sprintf("%s::%s", orgID, cacheKey)).Scan(dest)
if err != nil {
if errors.Is(err, redis.Nil) {
return cache.RetrieveStatusKeyMiss, nil
@@ -57,23 +53,19 @@ func (c *provider) Retrieve(ctx context.Context, cacheKey string, dest cache.Cac
return cache.RetrieveStatusHit, nil
}
// SetTTL sets the TTL for the cache entry
func (c *provider) SetTTL(ctx context.Context, cacheKey string, ttl time.Duration) {
err := c.client.Expire(ctx, cacheKey, ttl).Err()
if err != nil {
zap.L().Error("error setting TTL for cache key", zap.String("cacheKey", cacheKey), zap.Duration("ttl", ttl), zap.Error(err))
}
}
// Remove removes the cache entry
func (c *provider) Remove(ctx context.Context, cacheKey string) {
c.BulkRemove(ctx, []string{cacheKey})
func (c *provider) Delete(ctx context.Context, orgID string, cacheKey string) {
c.DeleteMany(ctx, orgID, []string{cacheKey})
}
// BulkRemove removes the cache entries
func (c *provider) BulkRemove(ctx context.Context, cacheKeys []string) {
func (c *provider) DeleteMany(ctx context.Context, orgID string, cacheKeys []string) {
updatedCacheKeys := []string{}
for _, cacheKey := range cacheKeys {
updatedCacheKeys = append(updatedCacheKeys, fmt.Sprintf("%s::%s", orgID, cacheKey))
}
if err := c.client.Del(ctx, cacheKeys...).Err(); err != nil {
zap.L().Error("error deleting cache keys", zap.Strings("cacheKeys", cacheKeys), zap.Error(err))
zap.L().Error("error deleting cache keys", zap.Strings("cacheKeys", updatedCacheKeys), zap.Error(err))
}
}
@@ -81,40 +73,3 @@ func (c *provider) BulkRemove(ctx context.Context, cacheKeys []string) {
func (c *provider) Close(_ context.Context) error {
return c.client.Close()
}
// Ping pings the redis server
func (c *provider) Ping(ctx context.Context) error {
return c.client.Ping(ctx).Err()
}
// GetClient returns the redis client
func (c *provider) GetClient() *redis.Client {
return c.client
}
// GetTTL returns the TTL for the cache entry
func (c *provider) GetTTL(ctx context.Context, cacheKey string) time.Duration {
ttl, err := c.client.TTL(ctx, cacheKey).Result()
if err != nil {
zap.L().Error("error getting TTL for cache key", zap.String("cacheKey", cacheKey), zap.Error(err))
}
return ttl
}
// GetKeys returns the keys matching the pattern
func (c *provider) GetKeys(ctx context.Context, pattern string) ([]string, error) {
return c.client.Keys(ctx, pattern).Result()
}
// GetKeysWithTTL returns the keys matching the pattern with their TTL
func (c *provider) GetKeysWithTTL(ctx context.Context, pattern string) (map[string]time.Duration, error) {
keys, err := c.GetKeys(ctx, pattern)
if err != nil {
return nil, err
}
result := make(map[string]time.Duration)
for _, key := range keys {
result[key] = c.GetTTL(ctx, key)
}
return result, nil
}

View File

@@ -795,9 +795,9 @@ func (r *ClickHouseReader) GetSpansForTrace(ctx context.Context, traceID string,
return searchScanResponses, nil
}
func (r *ClickHouseReader) GetWaterfallSpansForTraceWithMetadataCache(ctx context.Context, traceID string) (*model.GetWaterfallSpansForTraceWithMetadataCache, error) {
func (r *ClickHouseReader) GetWaterfallSpansForTraceWithMetadataCache(ctx context.Context, orgID string, traceID string) (*model.GetWaterfallSpansForTraceWithMetadataCache, error) {
cachedTraceData := new(model.GetWaterfallSpansForTraceWithMetadataCache)
cacheStatus, err := r.cache.Retrieve(ctx, fmt.Sprintf("getWaterfallSpansForTraceWithMetadata-%v", traceID), cachedTraceData, false)
cacheStatus, err := r.cache.Get(ctx, orgID, fmt.Sprintf("getWaterfallSpansForTraceWithMetadata-%v", traceID), cachedTraceData, false)
if err != nil {
zap.L().Debug("error in retrieving getWaterfallSpansForTraceWithMetadata cache", zap.Error(err), zap.String("traceID", traceID))
return nil, err
@@ -816,7 +816,7 @@ func (r *ClickHouseReader) GetWaterfallSpansForTraceWithMetadataCache(ctx contex
return cachedTraceData, nil
}
func (r *ClickHouseReader) GetWaterfallSpansForTraceWithMetadata(ctx context.Context, traceID string, req *model.GetWaterfallSpansForTraceWithMetadataParams) (*model.GetWaterfallSpansForTraceWithMetadataResponse, *model.ApiError) {
func (r *ClickHouseReader) GetWaterfallSpansForTraceWithMetadata(ctx context.Context, orgID string, traceID string, req *model.GetWaterfallSpansForTraceWithMetadataParams) (*model.GetWaterfallSpansForTraceWithMetadataResponse, *model.ApiError) {
response := new(model.GetWaterfallSpansForTraceWithMetadataResponse)
var startTime, endTime, durationNano, totalErrorSpans, totalSpans uint64
var spanIdToSpanNodeMap = map[string]*model.Span{}
@@ -826,7 +826,7 @@ func (r *ClickHouseReader) GetWaterfallSpansForTraceWithMetadata(ctx context.Con
var hasMissingSpans bool
claims, errv2 := authtypes.ClaimsFromContext(ctx)
cachedTraceData, err := r.GetWaterfallSpansForTraceWithMetadataCache(ctx, traceID)
cachedTraceData, err := r.GetWaterfallSpansForTraceWithMetadataCache(ctx, orgID, traceID)
if err == nil {
startTime = cachedTraceData.StartTime
endTime = cachedTraceData.EndTime
@@ -984,7 +984,7 @@ func (r *ClickHouseReader) GetWaterfallSpansForTraceWithMetadata(ctx context.Con
}
zap.L().Info("getWaterfallSpansForTraceWithMetadata: processing pre cache", zap.Duration("duration", time.Since(processingBeforeCache)), zap.String("traceID", traceID))
cacheErr := r.cache.Store(ctx, fmt.Sprintf("getWaterfallSpansForTraceWithMetadata-%v", traceID), &traceCache, time.Minute*5)
cacheErr := r.cache.Set(ctx, orgID, fmt.Sprintf("getWaterfallSpansForTraceWithMetadata-%v", traceID), &traceCache, time.Minute*5)
if cacheErr != nil {
zap.L().Debug("failed to store cache for getWaterfallSpansForTraceWithMetadata", zap.String("traceID", traceID), zap.Error(err))
}
@@ -1007,9 +1007,9 @@ func (r *ClickHouseReader) GetWaterfallSpansForTraceWithMetadata(ctx context.Con
return response, nil
}
func (r *ClickHouseReader) GetFlamegraphSpansForTraceCache(ctx context.Context, traceID string) (*model.GetFlamegraphSpansForTraceCache, error) {
func (r *ClickHouseReader) GetFlamegraphSpansForTraceCache(ctx context.Context, orgID string, traceID string) (*model.GetFlamegraphSpansForTraceCache, error) {
cachedTraceData := new(model.GetFlamegraphSpansForTraceCache)
cacheStatus, err := r.cache.Retrieve(ctx, fmt.Sprintf("getFlamegraphSpansForTrace-%v", traceID), cachedTraceData, false)
cacheStatus, err := r.cache.Get(ctx, orgID, fmt.Sprintf("getFlamegraphSpansForTrace-%v", traceID), cachedTraceData, false)
if err != nil {
zap.L().Debug("error in retrieving getFlamegraphSpansForTrace cache", zap.Error(err), zap.String("traceID", traceID))
return nil, err
@@ -1028,7 +1028,7 @@ func (r *ClickHouseReader) GetFlamegraphSpansForTraceCache(ctx context.Context,
return cachedTraceData, nil
}
func (r *ClickHouseReader) GetFlamegraphSpansForTrace(ctx context.Context, traceID string, req *model.GetFlamegraphSpansForTraceParams) (*model.GetFlamegraphSpansForTraceResponse, *model.ApiError) {
func (r *ClickHouseReader) GetFlamegraphSpansForTrace(ctx context.Context, orgID string, traceID string, req *model.GetFlamegraphSpansForTraceParams) (*model.GetFlamegraphSpansForTraceResponse, *model.ApiError) {
trace := new(model.GetFlamegraphSpansForTraceResponse)
var startTime, endTime, durationNano uint64
var spanIdToSpanNodeMap = map[string]*model.FlamegraphSpan{}
@@ -1037,7 +1037,7 @@ func (r *ClickHouseReader) GetFlamegraphSpansForTrace(ctx context.Context, trace
var traceRoots []*model.FlamegraphSpan
// get the trace tree from cache!
cachedTraceData, err := r.GetFlamegraphSpansForTraceCache(ctx, traceID)
cachedTraceData, err := r.GetFlamegraphSpansForTraceCache(ctx, orgID, traceID)
if err == nil {
startTime = cachedTraceData.StartTime
@@ -1136,7 +1136,7 @@ func (r *ClickHouseReader) GetFlamegraphSpansForTrace(ctx context.Context, trace
}
zap.L().Info("getFlamegraphSpansForTrace: processing pre cache", zap.Duration("duration", time.Since(processingBeforeCache)), zap.String("traceID", traceID))
cacheErr := r.cache.Store(ctx, fmt.Sprintf("getFlamegraphSpansForTrace-%v", traceID), &traceCache, time.Minute*5)
cacheErr := r.cache.Set(ctx, orgID, fmt.Sprintf("getFlamegraphSpansForTrace-%v", traceID), &traceCache, time.Minute*5)
if cacheErr != nil {
zap.L().Debug("failed to store cache for getFlamegraphSpansForTrace", zap.String("traceID", traceID), zap.Error(err))
}
@@ -5187,7 +5187,7 @@ func (r *ClickHouseReader) GetActiveTimeSeriesForMetricName(ctx context.Context,
return timeSeries, nil
}
func (r *ClickHouseReader) ListSummaryMetrics(ctx context.Context, req *metrics_explorer.SummaryListMetricsRequest) (*metrics_explorer.SummaryListMetricsResponse, *model.ApiError) {
func (r *ClickHouseReader) ListSummaryMetrics(ctx context.Context, orgID string, req *metrics_explorer.SummaryListMetricsRequest) (*metrics_explorer.SummaryListMetricsResponse, *model.ApiError) {
var args []interface{}
// Build filter conditions (if any)
@@ -5365,7 +5365,7 @@ func (r *ClickHouseReader) ListSummaryMetrics(ctx context.Context, req *metrics_
}
//get updated metrics data
batch, apiError := r.GetUpdatedMetricsMetadata(ctx, metricNames...)
batch, apiError := r.GetUpdatedMetricsMetadata(ctx, orgID, metricNames...)
if apiError != nil {
zap.L().Error("Error in getting metrics cached metadata", zap.Error(apiError))
}
@@ -6022,18 +6022,18 @@ LIMIT 40`, // added rand to get diff value every time we run this query
return fingerprints, nil
}
func (r *ClickHouseReader) DeleteMetricsMetadata(ctx context.Context, metricName string) *model.ApiError {
func (r *ClickHouseReader) DeleteMetricsMetadata(ctx context.Context, orgID string, metricName string) *model.ApiError {
delQuery := fmt.Sprintf(`ALTER TABLE %s.%s DELETE WHERE metric_name = ?;`, signozMetricDBName, signozUpdatedMetricsMetadataLocalTable)
valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
err := r.db.Exec(valueCtx, delQuery, metricName)
if err != nil {
return &model.ApiError{Typ: "ClickHouseError", Err: err}
}
r.cache.Remove(ctx, constants.UpdatedMetricsMetadataCachePrefix+metricName)
r.cache.Delete(ctx, orgID, constants.UpdatedMetricsMetadataCachePrefix+metricName)
return nil
}
func (r *ClickHouseReader) UpdateMetricsMetadata(ctx context.Context, req *model.UpdateMetricsMetadata) *model.ApiError {
func (r *ClickHouseReader) UpdateMetricsMetadata(ctx context.Context, orgID string, req *model.UpdateMetricsMetadata) *model.ApiError {
if req.MetricType == v3.MetricTypeHistogram {
labels := []string{"le"}
hasLabels, apiError := r.CheckForLabelsInMetric(ctx, req.MetricName, labels)
@@ -6062,7 +6062,7 @@ func (r *ClickHouseReader) UpdateMetricsMetadata(ctx context.Context, req *model
}
}
apiErr := r.DeleteMetricsMetadata(ctx, req.MetricName)
apiErr := r.DeleteMetricsMetadata(ctx, orgID, req.MetricName)
if apiErr != nil {
return apiErr
}
@@ -6073,7 +6073,7 @@ VALUES ( ?, ?, ?, ?, ?, ?, ?);`, signozMetricDBName, signozUpdatedMetricsMetadat
if err != nil {
return &model.ApiError{Typ: "ClickHouseError", Err: err}
}
err = r.cache.Store(ctx, constants.UpdatedMetricsMetadataCachePrefix+req.MetricName, req, -1)
err = r.cache.Set(ctx, orgID, constants.UpdatedMetricsMetadataCachePrefix+req.MetricName, req, -1)
if err != nil {
return &model.ApiError{Typ: "CachingErr", Err: err}
}
@@ -6114,7 +6114,7 @@ func (r *ClickHouseReader) CheckForLabelsInMetric(ctx context.Context, metricNam
return hasLE, nil
}
func (r *ClickHouseReader) PreloadMetricsMetadata(ctx context.Context) []error {
func (r *ClickHouseReader) PreloadMetricsMetadata(ctx context.Context, orgID string) []error {
var allMetricsMetadata []model.UpdateMetricsMetadata
var errorList []error
// Fetch all rows from ClickHouse
@@ -6127,7 +6127,7 @@ func (r *ClickHouseReader) PreloadMetricsMetadata(ctx context.Context) []error {
return errorList
}
for _, m := range allMetricsMetadata {
err := r.cache.Store(ctx, constants.UpdatedMetricsMetadataCachePrefix+m.MetricName, &m, -1)
err := r.cache.Set(ctx, orgID, constants.UpdatedMetricsMetadataCachePrefix+m.MetricName, &m, -1)
if err != nil {
errorList = append(errorList, err)
}
@@ -6136,7 +6136,7 @@ func (r *ClickHouseReader) PreloadMetricsMetadata(ctx context.Context) []error {
return errorList
}
func (r *ClickHouseReader) GetUpdatedMetricsMetadata(ctx context.Context, metricNames ...string) (map[string]*model.UpdateMetricsMetadata, *model.ApiError) {
func (r *ClickHouseReader) GetUpdatedMetricsMetadata(ctx context.Context, orgID string, metricNames ...string) (map[string]*model.UpdateMetricsMetadata, *model.ApiError) {
cachedMetadata := make(map[string]*model.UpdateMetricsMetadata)
var missingMetrics []string
@@ -6144,7 +6144,7 @@ func (r *ClickHouseReader) GetUpdatedMetricsMetadata(ctx context.Context, metric
for _, metricName := range metricNames {
metadata := new(model.UpdateMetricsMetadata)
cacheKey := constants.UpdatedMetricsMetadataCachePrefix + metricName
retrieveStatus, err := r.cache.Retrieve(ctx, cacheKey, metadata, true)
retrieveStatus, err := r.cache.Get(ctx, orgID, cacheKey, metadata, true)
if err == nil && retrieveStatus == cache.RetrieveStatusHit {
cachedMetadata[metricName] = metadata
} else {
@@ -6185,7 +6185,7 @@ func (r *ClickHouseReader) GetUpdatedMetricsMetadata(ctx context.Context, metric
// Cache the result for future requests.
cacheKey := constants.UpdatedMetricsMetadataCachePrefix + metadata.MetricName
if cacheErr := r.cache.Store(ctx, cacheKey, metadata, -1); cacheErr != nil {
if cacheErr := r.cache.Set(ctx, orgID, cacheKey, metadata, -1); cacheErr != nil {
zap.L().Error("Failed to store metrics metadata in cache", zap.String("metric_name", metadata.MetricName), zap.Error(cacheErr))
}
cachedMetadata[metadata.MetricName] = metadata

View File

@@ -52,7 +52,6 @@ import (
tracesV3 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v3"
tracesV4 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v4"
"github.com/SigNoz/signoz/pkg/query-service/auth"
"github.com/SigNoz/signoz/pkg/query-service/cache"
"github.com/SigNoz/signoz/pkg/query-service/contextlinks"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/postprocess"
@@ -168,9 +167,6 @@ type APIHandlerOpts struct {
// Log parsing pipelines
LogsParsingPipelineController *logparsingpipeline.LogParsingPipelineController
// cache
Cache cache.Cache
// Querier Influx Interval
FluxInterval time.Duration
@@ -187,14 +183,14 @@ type APIHandlerOpts struct {
func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
querierOpts := querier.QuerierOptions{
Reader: opts.Reader,
Cache: opts.Cache,
Cache: opts.Signoz.Cache,
KeyGenerator: queryBuilder.NewKeyGenerator(),
FluxInterval: opts.FluxInterval,
}
querierOptsV2 := querierV2.QuerierOptions{
Reader: opts.Reader,
Cache: opts.Cache,
Cache: opts.Signoz.Cache,
KeyGenerator: queryBuilder.NewKeyGenerator(),
FluxInterval: opts.FluxInterval,
}

View File

@@ -13,7 +13,7 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/constants"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/postprocess"
"github.com/SigNoz/signoz/pkg/query-service/querycache"
"github.com/SigNoz/signoz/pkg/types/querybuildertypes"
"go.uber.org/zap"
)
@@ -106,10 +106,10 @@ func (q *querier) runBuilderQuery(
return
}
misses := q.queryCache.FindMissingTimeRanges(start, end, builderQuery.StepInterval, cacheKeys[queryName])
misses := q.queryCache.FindMissingTimeRanges(ctx, start, end, builderQuery.StepInterval, cacheKeys[queryName])
zap.L().Info("cache misses for logs query", zap.Any("misses", misses))
missedSeries := make([]querycache.CachedSeriesData, 0)
filteredMissedSeries := make([]querycache.CachedSeriesData, 0)
missedSeries := make([]*querybuildertypes.SeriesData, 0)
filteredMissedSeries := make([]*querybuildertypes.SeriesData, 0)
for _, miss := range misses {
query, err = prepareLogsQuery(ctx, miss.Start, miss.End, builderQuery, params)
if err != nil {
@@ -131,7 +131,7 @@ func (q *querier) runBuilderQuery(
// making sure that empty range doesn't doesn't enter the cache
// empty results from filteredSeries means data was filtered out, but empty series means actual empty data
if len(filteredSeries) > 0 || len(series) == 0 {
filteredMissedSeries = append(filteredMissedSeries, querycache.CachedSeriesData{
filteredMissedSeries = append(filteredMissedSeries, &querybuildertypes.SeriesData{
Data: filteredSeries,
Start: startTime,
End: endTime,
@@ -139,17 +139,17 @@ func (q *querier) runBuilderQuery(
}
// for the actual response
missedSeries = append(missedSeries, querycache.CachedSeriesData{
missedSeries = append(missedSeries, &querybuildertypes.SeriesData{
Data: series,
Start: miss.Start,
End: miss.End,
})
}
filteredMergedSeries := q.queryCache.MergeWithCachedSeriesDataV2(cacheKeys[queryName], filteredMissedSeries)
q.queryCache.StoreSeriesInCache(cacheKeys[queryName], filteredMergedSeries)
filteredMergedSeries := q.queryCache.MergeWithCachedSeriesDataV2(ctx, cacheKeys[queryName], filteredMissedSeries)
q.queryCache.StoreSeriesInCache(ctx, cacheKeys[queryName], filteredMergedSeries)
mergedSeries := q.queryCache.MergeWithCachedSeriesDataV2(cacheKeys[queryName], missedSeries)
mergedSeries := q.queryCache.MergeWithCachedSeriesDataV2(ctx, cacheKeys[queryName], missedSeries)
resultSeries := common.GetSeriesFromCachedDataV2(mergedSeries, start, end, builderQuery.StepInterval)
@@ -238,9 +238,9 @@ func (q *querier) runBuilderQuery(
}
cacheKey := cacheKeys[queryName]
misses := q.queryCache.FindMissingTimeRanges(start, end, builderQuery.StepInterval, cacheKey)
misses := q.queryCache.FindMissingTimeRanges(ctx, start, end, builderQuery.StepInterval, cacheKey)
zap.L().Info("cache misses for metrics query", zap.Any("misses", misses))
missedSeries := make([]querycache.CachedSeriesData, 0)
missedSeries := make([]*querybuildertypes.SeriesData, 0)
for _, miss := range misses {
query, err := metricsV3.PrepareMetricQuery(
miss.Start,
@@ -269,13 +269,13 @@ func (q *querier) runBuilderQuery(
}
return
}
missedSeries = append(missedSeries, querycache.CachedSeriesData{
missedSeries = append(missedSeries, &querybuildertypes.SeriesData{
Start: miss.Start,
End: miss.End,
Data: series,
})
}
mergedSeries := q.queryCache.MergeWithCachedSeriesData(cacheKey, missedSeries)
mergedSeries := q.queryCache.MergeWithCachedSeriesData(ctx, cacheKey, missedSeries)
resultSeries := common.GetSeriesFromCachedData(mergedSeries, start, end)
@@ -314,9 +314,9 @@ func (q *querier) runBuilderExpression(
cacheKey := cacheKeys[queryName]
step := postprocess.StepIntervalForFunction(params, queryName)
misses := q.queryCache.FindMissingTimeRanges(params.Start, params.End, step, cacheKey)
misses := q.queryCache.FindMissingTimeRanges(ctx, params.Start, params.End, step, cacheKey)
zap.L().Info("cache misses for expression query", zap.Any("misses", misses))
missedSeries := make([]querycache.CachedSeriesData, 0)
missedSeries := make([]*querybuildertypes.SeriesData, 0)
for _, miss := range misses {
missQueries, _ := q.builder.PrepareQueries(&v3.QueryRangeParamsV3{
Start: miss.Start,
@@ -332,13 +332,13 @@ func (q *querier) runBuilderExpression(
ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil}
return
}
missedSeries = append(missedSeries, querycache.CachedSeriesData{
missedSeries = append(missedSeries, &querybuildertypes.SeriesData{
Start: miss.Start,
End: miss.End,
Data: series,
})
}
mergedSeries := q.queryCache.MergeWithCachedSeriesData(cacheKey, missedSeries)
mergedSeries := q.queryCache.MergeWithCachedSeriesData(ctx, cacheKey, missedSeries)
resultSeries := common.GetSeriesFromCachedData(mergedSeries, params.Start, params.End)

View File

@@ -6,6 +6,7 @@ import (
"sync"
"time"
"github.com/SigNoz/signoz/pkg/cache"
logsV4 "github.com/SigNoz/signoz/pkg/query-service/app/logs/v4"
metricsV3 "github.com/SigNoz/signoz/pkg/query-service/app/metrics/v3"
"github.com/SigNoz/signoz/pkg/query-service/app/queryBuilder"
@@ -13,10 +14,9 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/common"
"github.com/SigNoz/signoz/pkg/query-service/constants"
chErrors "github.com/SigNoz/signoz/pkg/query-service/errors"
"github.com/SigNoz/signoz/pkg/query-service/querycache"
"github.com/SigNoz/signoz/pkg/query-service/utils"
"github.com/SigNoz/signoz/pkg/types/querybuildertypes"
"github.com/SigNoz/signoz/pkg/query-service/cache"
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
"github.com/SigNoz/signoz/pkg/query-service/model"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
@@ -68,7 +68,7 @@ func NewQuerier(opts QuerierOptions) interfaces.Querier {
logsQueryBuilder := logsV4.PrepareLogsQuery
tracesQueryBuilder := tracesV4.PrepareTracesQuery
qc := querycache.NewQueryCache(querycache.WithCache(opts.Cache), querycache.WithFluxInterval(opts.FluxInterval))
qc := querybuildertypes.NewQueryCache(querybuildertypes.WithCache(opts.Cache), querybuildertypes.WithFluxInterval(opts.FluxInterval))
return &querier{
cache: opts.Cache,
@@ -210,9 +210,9 @@ func (q *querier) runPromQueries(ctx context.Context, params *v3.QueryRangeParam
channelResults <- channelResult{Err: err, Name: queryName, Query: query.Query, Series: series}
return
}
misses := q.queryCache.FindMissingTimeRanges(params.Start, params.End, params.Step, cacheKey)
misses := q.queryCache.FindMissingTimeRanges(ctx, params.Start, params.End, params.Step, cacheKey)
zap.L().Info("cache misses for metrics prom query", zap.Any("misses", misses))
missedSeries := make([]querycache.CachedSeriesData, 0)
missedSeries := make([]*querybuildertypes.SeriesData, 0)
for _, miss := range misses {
query := metricsV3.BuildPromQuery(promQuery, params.Step, miss.Start, miss.End)
series, err := q.execPromQuery(ctx, query)
@@ -220,13 +220,13 @@ func (q *querier) runPromQueries(ctx context.Context, params *v3.QueryRangeParam
channelResults <- channelResult{Err: err, Name: queryName, Query: query.Query, Series: nil}
return
}
missedSeries = append(missedSeries, querycache.CachedSeriesData{
missedSeries = append(missedSeries, &querybuildertypes.SeriesData{
Data: series,
Start: miss.Start,
End: miss.End,
})
}
mergedSeries := q.queryCache.MergeWithCachedSeriesData(cacheKey, missedSeries)
mergedSeries := q.queryCache.MergeWithCachedSeriesData(ctx, cacheKey, missedSeries)
resultSeries := common.GetSeriesFromCachedData(mergedSeries, params.Start, params.End)
channelResults <- channelResult{Err: nil, Name: queryName, Query: promQuery.Query, Series: resultSeries}

View File

@@ -2,7 +2,6 @@ package querier
import (
"context"
"encoding/json"
"fmt"
"math"
"strings"
@@ -10,18 +9,20 @@ import (
"time"
"github.com/DATA-DOG/go-sqlmock"
"github.com/SigNoz/signoz/pkg/cache"
"github.com/SigNoz/signoz/pkg/cache/memorycache"
"github.com/SigNoz/signoz/pkg/factory/factorytest"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/prometheus"
"github.com/SigNoz/signoz/pkg/prometheus/prometheustest"
"github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader"
"github.com/SigNoz/signoz/pkg/query-service/app/queryBuilder"
tracesV3 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v3"
"github.com/SigNoz/signoz/pkg/query-service/cache/inmemory"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/querycache"
"github.com/SigNoz/signoz/pkg/query-service/utils"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
"github.com/SigNoz/signoz/pkg/types/querybuildertypes"
cmock "github.com/srikanthccv/ClickHouse-go-mock"
"github.com/stretchr/testify/require"
)
@@ -63,7 +64,7 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) {
requestedEnd int64 // in milliseconds
requestedStep int64 // in seconds
cachedSeries []*v3.Series
expectedMiss []querycache.MissInterval
expectedMiss []*querybuildertypes.MissInterval
replaceCachedData bool
}{
{
@@ -88,7 +89,7 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) {
},
},
},
expectedMiss: []querycache.MissInterval{
expectedMiss: []*querybuildertypes.MissInterval{
{
Start: 1675115596722,
End: 1675115596722 + 60*60*1000,
@@ -129,7 +130,7 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) {
},
},
},
expectedMiss: []querycache.MissInterval{},
expectedMiss: []*querybuildertypes.MissInterval{},
},
{
name: "cached time range is a left overlap of the requested time range",
@@ -157,7 +158,7 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) {
},
},
},
expectedMiss: []querycache.MissInterval{
expectedMiss: []*querybuildertypes.MissInterval{
{
Start: 1675115596722 + 120*60*1000,
End: 1675115596722 + 180*60*1000,
@@ -190,7 +191,7 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) {
},
},
},
expectedMiss: []querycache.MissInterval{
expectedMiss: []*querybuildertypes.MissInterval{
{
Start: 1675115596722,
End: 1675115596722 + 60*60*1000,
@@ -223,7 +224,7 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) {
},
},
},
expectedMiss: []querycache.MissInterval{
expectedMiss: []*querybuildertypes.MissInterval{
{
Start: 1675115596722,
End: 1675115596722 + 180*60*1000,
@@ -233,28 +234,34 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) {
},
}
c := inmemory.New(&inmemory.Options{TTL: 5 * time.Minute, CleanupInterval: 10 * time.Minute})
qc := querycache.NewQueryCache(querycache.WithCache(c))
opts := cache.Memory{
TTL: 5 * time.Minute,
CleanupInterval: 10 * time.Minute,
}
c, err := memorycache.New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: opts})
if err != nil {
t.Errorf("error initialising cache: %v", err)
}
qc := querybuildertypes.NewQueryCache(querybuildertypes.WithCache(c))
for idx, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
cacheKey := fmt.Sprintf("test-cache-key-%d", idx)
cachedData := &querycache.CachedSeriesData{
cachedData := &querybuildertypes.SeriesData{
Start: minTimestamp(tc.cachedSeries),
End: maxTimestamp(tc.cachedSeries),
Data: tc.cachedSeries,
}
jsonData, err := json.Marshal([]*querycache.CachedSeriesData{cachedData})
data := querybuildertypes.CachedSeriesData{Series: []*querybuildertypes.SeriesData{cachedData}}
if err != nil {
t.Errorf("error marshalling cached data: %v", err)
}
err = c.Store(cacheKey, jsonData, 5*time.Minute)
err = c.Store(context.Background(), cacheKey, &data, 5*time.Minute)
if err != nil {
t.Errorf("error storing cached data: %v", err)
}
misses := qc.FindMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.requestedStep, cacheKey)
misses := qc.FindMissingTimeRanges(context.Background(), tc.requestedStart, tc.requestedEnd, tc.requestedStep, cacheKey)
if len(misses) != len(tc.expectedMiss) {
t.Errorf("expected %d misses, got %d", len(tc.expectedMiss), len(misses))
}
@@ -280,7 +287,7 @@ func TestFindMissingTimeRangesWithFluxInterval(t *testing.T) {
requestedStep int64
cachedSeries []*v3.Series
fluxInterval time.Duration
expectedMiss []querycache.MissInterval
expectedMiss []*querybuildertypes.MissInterval
}{
{
name: "cached time range is a subset of the requested time range",
@@ -305,7 +312,7 @@ func TestFindMissingTimeRangesWithFluxInterval(t *testing.T) {
},
},
fluxInterval: 5 * time.Minute,
expectedMiss: []querycache.MissInterval{
expectedMiss: []*querybuildertypes.MissInterval{
{
Start: 1675115596722,
End: 1675115596722 + 60*60*1000,
@@ -347,7 +354,7 @@ func TestFindMissingTimeRangesWithFluxInterval(t *testing.T) {
},
},
fluxInterval: 5 * time.Minute,
expectedMiss: []querycache.MissInterval{},
expectedMiss: []*querybuildertypes.MissInterval{},
},
{
name: "cache time range is a left overlap of the requested time range",
@@ -376,7 +383,7 @@ func TestFindMissingTimeRangesWithFluxInterval(t *testing.T) {
},
},
fluxInterval: 5 * time.Minute,
expectedMiss: []querycache.MissInterval{
expectedMiss: []*querybuildertypes.MissInterval{
{
Start: 1675115596722 + 120*60*1000,
End: 1675115596722 + 180*60*1000,
@@ -410,7 +417,7 @@ func TestFindMissingTimeRangesWithFluxInterval(t *testing.T) {
},
},
fluxInterval: 5 * time.Minute,
expectedMiss: []querycache.MissInterval{
expectedMiss: []*querybuildertypes.MissInterval{
{
Start: 1675115596722,
End: 1675115596722 + 60*60*1000,
@@ -444,7 +451,7 @@ func TestFindMissingTimeRangesWithFluxInterval(t *testing.T) {
},
},
fluxInterval: 5 * time.Minute,
expectedMiss: []querycache.MissInterval{
expectedMiss: []*querybuildertypes.MissInterval{
{
Start: 1675115596722,
End: 1675115596722 + 180*60*1000,
@@ -453,27 +460,30 @@ func TestFindMissingTimeRangesWithFluxInterval(t *testing.T) {
},
}
c := inmemory.New(&inmemory.Options{TTL: 5 * time.Minute, CleanupInterval: 10 * time.Minute})
qc := querycache.NewQueryCache(querycache.WithCache(c))
opts := cache.Memory{
TTL: 5 * time.Minute,
CleanupInterval: 10 * time.Minute,
}
c, err := memorycache.New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: opts})
if err != nil {
t.Errorf("error initialising cache: %v", err)
}
qc := querybuildertypes.NewQueryCache(querybuildertypes.WithCache(c))
for idx, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
cacheKey := fmt.Sprintf("test-cache-key-%d", idx)
cachedData := &querycache.CachedSeriesData{
cachedData := &querybuildertypes.SeriesData{
Start: minTimestamp(tc.cachedSeries),
End: maxTimestamp(tc.cachedSeries),
Data: tc.cachedSeries,
}
jsonData, err := json.Marshal([]*querycache.CachedSeriesData{cachedData})
if err != nil {
t.Errorf("error marshalling cached data: %v", err)
}
err = c.Store(cacheKey, jsonData, 5*time.Minute)
data := querybuildertypes.CachedSeriesData{Series: []*querybuildertypes.SeriesData{cachedData}}
err = c.Store(context.Background(), cacheKey, &data, 5*time.Minute)
if err != nil {
t.Errorf("error storing cached data: %v", err)
}
misses := qc.FindMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.requestedStep, cacheKey)
misses := qc.FindMissingTimeRanges(context.Background(), tc.requestedStart, tc.requestedEnd, tc.requestedStep, cacheKey)
if len(misses) != len(tc.expectedMiss) {
t.Errorf("expected %d misses, got %d", len(tc.expectedMiss), len(misses))
}
@@ -625,9 +635,16 @@ func TestQueryRange(t *testing.T) {
},
},
}
cache := inmemory.New(&inmemory.Options{TTL: 5 * time.Minute, CleanupInterval: 10 * time.Minute})
cacheOpts := cache.Memory{
TTL: 5 * time.Minute,
CleanupInterval: 10 * time.Minute,
}
c, err := memorycache.New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: cacheOpts})
if err != nil {
t.Errorf("error initialising cache: %v", err)
}
opts := QuerierOptions{
Cache: cache,
Cache: c,
Reader: nil,
FluxInterval: 5 * time.Minute,
KeyGenerator: queryBuilder.NewKeyGenerator(),
@@ -736,9 +753,16 @@ func TestQueryRangeValueType(t *testing.T) {
},
},
}
cache := inmemory.New(&inmemory.Options{TTL: 60 * time.Minute, CleanupInterval: 10 * time.Minute})
cacheOpts := cache.Memory{
TTL: 5 * time.Minute,
CleanupInterval: 10 * time.Minute,
}
c, err := memorycache.New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: cacheOpts})
if err != nil {
t.Errorf("error initialising cache: %v", err)
}
opts := QuerierOptions{
Cache: cache,
Cache: c,
Reader: nil,
FluxInterval: 5 * time.Minute,
KeyGenerator: queryBuilder.NewKeyGenerator(),
@@ -894,9 +918,16 @@ func TestQueryRangeTimeShiftWithCache(t *testing.T) {
},
},
}
cache := inmemory.New(&inmemory.Options{TTL: 60 * time.Minute, CleanupInterval: 10 * time.Minute})
cacheOpts := cache.Memory{
TTL: 5 * time.Minute,
CleanupInterval: 10 * time.Minute,
}
c, err := memorycache.New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: cacheOpts})
if err != nil {
t.Errorf("error initialising cache: %v", err)
}
opts := QuerierOptions{
Cache: cache,
Cache: c,
Reader: nil,
FluxInterval: 5 * time.Minute,
KeyGenerator: queryBuilder.NewKeyGenerator(),
@@ -995,9 +1026,16 @@ func TestQueryRangeTimeShiftWithLimitAndCache(t *testing.T) {
},
},
}
cache := inmemory.New(&inmemory.Options{TTL: 60 * time.Minute, CleanupInterval: 10 * time.Minute})
cacheOpts := cache.Memory{
TTL: 5 * time.Minute,
CleanupInterval: 10 * time.Minute,
}
c, err := memorycache.New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: cacheOpts})
if err != nil {
t.Errorf("error initialising cache: %v", err)
}
opts := QuerierOptions{
Cache: cache,
Cache: c,
Reader: nil,
FluxInterval: 5 * time.Minute,
KeyGenerator: queryBuilder.NewKeyGenerator(),
@@ -1067,9 +1105,16 @@ func TestQueryRangeValueTypePromQL(t *testing.T) {
},
},
}
cache := inmemory.New(&inmemory.Options{TTL: 60 * time.Minute, CleanupInterval: 10 * time.Minute})
cacheOpts := cache.Memory{
TTL: 5 * time.Minute,
CleanupInterval: 10 * time.Minute,
}
c, err := memorycache.New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: cacheOpts})
if err != nil {
t.Errorf("error initialising cache: %v", err)
}
opts := QuerierOptions{
Cache: cache,
Cache: c,
Reader: nil,
FluxInterval: 5 * time.Minute,
KeyGenerator: queryBuilder.NewKeyGenerator(),
@@ -1094,17 +1139,17 @@ func TestQueryRangeValueTypePromQL(t *testing.T) {
expectedQueryAndTimeRanges := []struct {
query string
ranges []querycache.MissInterval
ranges []*querybuildertypes.MissInterval
}{
{
query: "signoz_calls_total",
ranges: []querycache.MissInterval{
ranges: []*querybuildertypes.MissInterval{
{Start: 1675115596722, End: 1675115596722 + 120*60*1000},
},
},
{
query: "signoz_latency_bucket",
ranges: []querycache.MissInterval{
ranges: []*querybuildertypes.MissInterval{
{Start: 1675115596722 + 60*60*1000, End: 1675115596722 + 180*60*1000},
},
},

View File

@@ -13,7 +13,7 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/common"
"github.com/SigNoz/signoz/pkg/query-service/constants"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/querycache"
"github.com/SigNoz/signoz/pkg/types/querybuildertypes"
"go.uber.org/zap"
)
@@ -106,10 +106,10 @@ func (q *querier) runBuilderQuery(
ch <- channelResult{Err: err, Name: queryName, Query: query, Series: series}
return
}
misses := q.queryCache.FindMissingTimeRangesV2(start, end, builderQuery.StepInterval, cacheKeys[queryName])
misses := q.queryCache.FindMissingTimeRangesV2(ctx, start, end, builderQuery.StepInterval, cacheKeys[queryName])
zap.L().Info("cache misses for logs query", zap.Any("misses", misses))
missedSeries := make([]querycache.CachedSeriesData, 0)
filteredMissedSeries := make([]querycache.CachedSeriesData, 0)
missedSeries := make([]*querybuildertypes.SeriesData, 0)
filteredMissedSeries := make([]*querybuildertypes.SeriesData, 0)
for _, miss := range misses {
query, err = prepareLogsQuery(ctx, miss.Start, miss.End, builderQuery, params)
if err != nil {
@@ -132,7 +132,7 @@ func (q *querier) runBuilderQuery(
// making sure that empty range doesn't doesn't enter the cache
// empty results from filteredSeries means data was filtered out, but empty series means actual empty data
if len(filteredSeries) > 0 || len(series) == 0 {
filteredMissedSeries = append(filteredMissedSeries, querycache.CachedSeriesData{
filteredMissedSeries = append(filteredMissedSeries, &querybuildertypes.SeriesData{
Data: filteredSeries,
Start: startTime,
End: endTime,
@@ -140,17 +140,17 @@ func (q *querier) runBuilderQuery(
}
// for the actual response
missedSeries = append(missedSeries, querycache.CachedSeriesData{
missedSeries = append(missedSeries, &querybuildertypes.SeriesData{
Data: series,
Start: miss.Start,
End: miss.End,
})
}
filteredMergedSeries := q.queryCache.MergeWithCachedSeriesDataV2(cacheKeys[queryName], filteredMissedSeries)
q.queryCache.StoreSeriesInCache(cacheKeys[queryName], filteredMergedSeries)
filteredMergedSeries := q.queryCache.MergeWithCachedSeriesDataV2(ctx, cacheKeys[queryName], filteredMissedSeries)
q.queryCache.StoreSeriesInCache(ctx, cacheKeys[queryName], filteredMergedSeries)
mergedSeries := q.queryCache.MergeWithCachedSeriesDataV2(cacheKeys[queryName], missedSeries)
mergedSeries := q.queryCache.MergeWithCachedSeriesDataV2(ctx, cacheKeys[queryName], missedSeries)
resultSeries := common.GetSeriesFromCachedDataV2(mergedSeries, start, end, builderQuery.StepInterval)
@@ -238,9 +238,9 @@ func (q *querier) runBuilderQuery(
return
}
misses := q.queryCache.FindMissingTimeRanges(start, end, builderQuery.StepInterval, cacheKeys[queryName])
misses := q.queryCache.FindMissingTimeRanges(ctx, start, end, builderQuery.StepInterval, cacheKeys[queryName])
zap.L().Info("cache misses for metrics query", zap.Any("misses", misses))
missedSeries := make([]querycache.CachedSeriesData, 0)
missedSeries := make([]*querybuildertypes.SeriesData, 0)
for _, miss := range misses {
query, err := metricsV4.PrepareMetricQuery(
miss.Start,
@@ -269,13 +269,13 @@ func (q *querier) runBuilderQuery(
}
return
}
missedSeries = append(missedSeries, querycache.CachedSeriesData{
missedSeries = append(missedSeries, &querybuildertypes.SeriesData{
Data: series,
Start: miss.Start,
End: miss.End,
})
}
mergedSeries := q.queryCache.MergeWithCachedSeriesData(cacheKeys[queryName], missedSeries)
mergedSeries := q.queryCache.MergeWithCachedSeriesData(ctx, cacheKeys[queryName], missedSeries)
resultSeries := common.GetSeriesFromCachedData(mergedSeries, start, end)

View File

@@ -6,6 +6,7 @@ import (
"sync"
"time"
"github.com/SigNoz/signoz/pkg/cache"
logsV4 "github.com/SigNoz/signoz/pkg/query-service/app/logs/v4"
metricsV4 "github.com/SigNoz/signoz/pkg/query-service/app/metrics/v4"
"github.com/SigNoz/signoz/pkg/query-service/app/queryBuilder"
@@ -13,10 +14,9 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/common"
"github.com/SigNoz/signoz/pkg/query-service/constants"
chErrors "github.com/SigNoz/signoz/pkg/query-service/errors"
"github.com/SigNoz/signoz/pkg/query-service/querycache"
"github.com/SigNoz/signoz/pkg/query-service/utils"
"github.com/SigNoz/signoz/pkg/types/querybuildertypes"
"github.com/SigNoz/signoz/pkg/query-service/cache"
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
"github.com/SigNoz/signoz/pkg/query-service/model"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
@@ -68,7 +68,7 @@ func NewQuerier(opts QuerierOptions) interfaces.Querier {
logsQueryBuilder := logsV4.PrepareLogsQuery
tracesQueryBuilder := tracesV4.PrepareTracesQuery
qc := querycache.NewQueryCache(querycache.WithCache(opts.Cache), querycache.WithFluxInterval(opts.FluxInterval))
qc := querybuildertypes.NewQueryCache(querybuildertypes.WithCache(opts.Cache), querybuildertypes.WithFluxInterval(opts.FluxInterval))
return &querier{
cache: opts.Cache,
@@ -212,9 +212,9 @@ func (q *querier) runPromQueries(ctx context.Context, params *v3.QueryRangeParam
channelResults <- channelResult{Err: err, Name: queryName, Query: query.Query, Series: series}
return
}
misses := q.queryCache.FindMissingTimeRanges(params.Start, params.End, params.Step, cacheKey)
misses := q.queryCache.FindMissingTimeRanges(ctx, params.Start, params.End, params.Step, cacheKey)
zap.L().Info("cache misses for metrics prom query", zap.Any("misses", misses))
missedSeries := make([]querycache.CachedSeriesData, 0)
missedSeries := make([]*querybuildertypes.SeriesData, 0)
for _, miss := range misses {
query := metricsV4.BuildPromQuery(promQuery, params.Step, miss.Start, miss.End)
series, err := q.execPromQuery(ctx, query)
@@ -222,13 +222,13 @@ func (q *querier) runPromQueries(ctx context.Context, params *v3.QueryRangeParam
channelResults <- channelResult{Err: err, Name: queryName, Query: query.Query, Series: nil}
return
}
missedSeries = append(missedSeries, querycache.CachedSeriesData{
missedSeries = append(missedSeries, &querybuildertypes.SeriesData{
Data: series,
Start: miss.Start,
End: miss.End,
})
}
mergedSeries := q.queryCache.MergeWithCachedSeriesData(cacheKey, missedSeries)
mergedSeries := q.queryCache.MergeWithCachedSeriesData(ctx, cacheKey, missedSeries)
resultSeries := common.GetSeriesFromCachedData(mergedSeries, params.Start, params.End)
channelResults <- channelResult{Err: nil, Name: queryName, Query: promQuery.Query, Series: resultSeries}
}(queryName, promQuery)

View File

@@ -2,7 +2,6 @@ package v2
import (
"context"
"encoding/json"
"fmt"
"math"
"strings"
@@ -10,18 +9,20 @@ import (
"time"
"github.com/DATA-DOG/go-sqlmock"
"github.com/SigNoz/signoz/pkg/cache"
"github.com/SigNoz/signoz/pkg/cache/memorycache"
"github.com/SigNoz/signoz/pkg/factory/factorytest"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/prometheus"
"github.com/SigNoz/signoz/pkg/prometheus/prometheustest"
"github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader"
"github.com/SigNoz/signoz/pkg/query-service/app/queryBuilder"
tracesV3 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v3"
"github.com/SigNoz/signoz/pkg/query-service/cache/inmemory"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/querycache"
"github.com/SigNoz/signoz/pkg/query-service/utils"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
"github.com/SigNoz/signoz/pkg/types/querybuildertypes"
cmock "github.com/srikanthccv/ClickHouse-go-mock"
"github.com/stretchr/testify/require"
)
@@ -63,7 +64,7 @@ func TestV2FindMissingTimeRangesZeroFreshNess(t *testing.T) {
requestedEnd int64 // in milliseconds
requestedStep int64 // in seconds
cachedSeries []*v3.Series
expectedMiss []querycache.MissInterval
expectedMiss []*querybuildertypes.MissInterval
replaceCachedData bool
}{
{
@@ -88,7 +89,7 @@ func TestV2FindMissingTimeRangesZeroFreshNess(t *testing.T) {
},
},
},
expectedMiss: []querycache.MissInterval{
expectedMiss: []*querybuildertypes.MissInterval{
{
Start: 1675115596722,
End: 1675115596722 + 60*60*1000,
@@ -129,7 +130,7 @@ func TestV2FindMissingTimeRangesZeroFreshNess(t *testing.T) {
},
},
},
expectedMiss: []querycache.MissInterval{},
expectedMiss: []*querybuildertypes.MissInterval{},
},
{
name: "cached time range is a left overlap of the requested time range",
@@ -157,7 +158,7 @@ func TestV2FindMissingTimeRangesZeroFreshNess(t *testing.T) {
},
},
},
expectedMiss: []querycache.MissInterval{
expectedMiss: []*querybuildertypes.MissInterval{
{
Start: 1675115596722 + 120*60*1000,
End: 1675115596722 + 180*60*1000,
@@ -190,7 +191,7 @@ func TestV2FindMissingTimeRangesZeroFreshNess(t *testing.T) {
},
},
},
expectedMiss: []querycache.MissInterval{
expectedMiss: []*querybuildertypes.MissInterval{
{
Start: 1675115596722,
End: 1675115596722 + 60*60*1000,
@@ -223,7 +224,7 @@ func TestV2FindMissingTimeRangesZeroFreshNess(t *testing.T) {
},
},
},
expectedMiss: []querycache.MissInterval{
expectedMiss: []*querybuildertypes.MissInterval{
{
Start: 1675115596722,
End: 1675115596722 + 180*60*1000,
@@ -232,29 +233,35 @@ func TestV2FindMissingTimeRangesZeroFreshNess(t *testing.T) {
replaceCachedData: true,
},
}
opts := cache.Memory{
TTL: 5 * time.Minute,
CleanupInterval: 10 * time.Minute,
}
c, err := memorycache.New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: opts})
if err != nil {
t.Errorf("error initialising cache: %v", err)
}
c := inmemory.New(&inmemory.Options{TTL: 5 * time.Minute, CleanupInterval: 10 * time.Minute})
qc := querycache.NewQueryCache(querycache.WithCache(c))
qc := querybuildertypes.NewQueryCache(querybuildertypes.WithCache(c))
for idx, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
cacheKey := fmt.Sprintf("test-cache-key-%d", idx)
cachedData := &querycache.CachedSeriesData{
cachedData := &querybuildertypes.SeriesData{
Start: minTimestamp(tc.cachedSeries),
End: maxTimestamp(tc.cachedSeries),
Data: tc.cachedSeries,
}
jsonData, err := json.Marshal([]*querycache.CachedSeriesData{cachedData})
data := querybuildertypes.CachedSeriesData{Series: []*querybuildertypes.SeriesData{cachedData}}
if err != nil {
t.Errorf("error marshalling cached data: %v", err)
}
err = c.Store(cacheKey, jsonData, 5*time.Minute)
err = c.Store(context.Background(), cacheKey, &data, 5*time.Minute)
if err != nil {
t.Errorf("error storing cached data: %v", err)
}
misses := qc.FindMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.requestedStep, cacheKey)
misses := qc.FindMissingTimeRanges(context.Background(), tc.requestedStart, tc.requestedEnd, tc.requestedStep, cacheKey)
if len(misses) != len(tc.expectedMiss) {
t.Errorf("expected %d misses, got %d", len(tc.expectedMiss), len(misses))
}
@@ -280,7 +287,7 @@ func TestV2FindMissingTimeRangesWithFluxInterval(t *testing.T) {
requestedStep int64
cachedSeries []*v3.Series
fluxInterval time.Duration
expectedMiss []querycache.MissInterval
expectedMiss []*querybuildertypes.MissInterval
}{
{
name: "cached time range is a subset of the requested time range",
@@ -305,7 +312,7 @@ func TestV2FindMissingTimeRangesWithFluxInterval(t *testing.T) {
},
},
fluxInterval: 5 * time.Minute,
expectedMiss: []querycache.MissInterval{
expectedMiss: []*querybuildertypes.MissInterval{
{
Start: 1675115596722,
End: 1675115596722 + 60*60*1000,
@@ -347,7 +354,7 @@ func TestV2FindMissingTimeRangesWithFluxInterval(t *testing.T) {
},
},
fluxInterval: 5 * time.Minute,
expectedMiss: []querycache.MissInterval{},
expectedMiss: []*querybuildertypes.MissInterval{},
},
{
name: "cache time range is a left overlap of the requested time range",
@@ -376,7 +383,7 @@ func TestV2FindMissingTimeRangesWithFluxInterval(t *testing.T) {
},
},
fluxInterval: 5 * time.Minute,
expectedMiss: []querycache.MissInterval{
expectedMiss: []*querybuildertypes.MissInterval{
{
Start: 1675115596722 + 120*60*1000,
End: 1675115596722 + 180*60*1000,
@@ -410,7 +417,7 @@ func TestV2FindMissingTimeRangesWithFluxInterval(t *testing.T) {
},
},
fluxInterval: 5 * time.Minute,
expectedMiss: []querycache.MissInterval{
expectedMiss: []*querybuildertypes.MissInterval{
{
Start: 1675115596722,
End: 1675115596722 + 60*60*1000,
@@ -444,7 +451,7 @@ func TestV2FindMissingTimeRangesWithFluxInterval(t *testing.T) {
},
},
fluxInterval: 5 * time.Minute,
expectedMiss: []querycache.MissInterval{
expectedMiss: []*querybuildertypes.MissInterval{
{
Start: 1675115596722,
End: 1675115596722 + 180*60*1000,
@@ -453,29 +460,35 @@ func TestV2FindMissingTimeRangesWithFluxInterval(t *testing.T) {
},
}
c := inmemory.New(&inmemory.Options{TTL: 5 * time.Minute, CleanupInterval: 10 * time.Minute})
qc := querycache.NewQueryCache(querycache.WithCache(c))
opts := cache.Memory{
TTL: 5 * time.Minute,
CleanupInterval: 10 * time.Minute,
}
c, err := memorycache.New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: opts})
if err != nil {
t.Errorf("error initialising cache: %v", err)
}
qc := querybuildertypes.NewQueryCache(querybuildertypes.WithCache(c))
for idx, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
cacheKey := fmt.Sprintf("test-cache-key-%d", idx)
cachedData := &querycache.CachedSeriesData{
cachedData := &querybuildertypes.SeriesData{
Start: minTimestamp(tc.cachedSeries),
End: maxTimestamp(tc.cachedSeries),
Data: tc.cachedSeries,
}
jsonData, err := json.Marshal([]*querycache.CachedSeriesData{cachedData})
data := querybuildertypes.CachedSeriesData{Series: []*querybuildertypes.SeriesData{cachedData}}
if err != nil {
t.Errorf("error marshalling cached data: %v", err)
return
}
err = c.Store(cacheKey, jsonData, 5*time.Minute)
err = c.Store(context.Background(), cacheKey, &data, 5*time.Minute)
if err != nil {
t.Errorf("error storing cached data: %v", err)
return
}
misses := qc.FindMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.requestedStep, cacheKey)
misses := qc.FindMissingTimeRanges(context.Background(), tc.requestedStart, tc.requestedEnd, tc.requestedStep, cacheKey)
if len(misses) != len(tc.expectedMiss) {
t.Errorf("expected %d misses, got %d", len(tc.expectedMiss), len(misses))
}
@@ -634,9 +647,17 @@ func TestV2QueryRangePanelGraph(t *testing.T) {
},
},
}
cache := inmemory.New(&inmemory.Options{TTL: 5 * time.Minute, CleanupInterval: 10 * time.Minute})
cacheOpts := cache.Memory{
TTL: 5 * time.Minute,
CleanupInterval: 10 * time.Minute,
}
c, err := memorycache.New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: cacheOpts})
if err != nil {
t.Errorf("error initialising cache: %v", err)
}
opts := QuerierOptions{
Cache: cache,
Cache: c,
Reader: nil,
FluxInterval: 5 * time.Minute,
KeyGenerator: queryBuilder.NewKeyGenerator(),
@@ -783,9 +804,17 @@ func TestV2QueryRangeValueType(t *testing.T) {
},
},
}
cache := inmemory.New(&inmemory.Options{TTL: 60 * time.Minute, CleanupInterval: 10 * time.Minute})
cacheopts := cache.Memory{
TTL: 5 * time.Minute,
CleanupInterval: 10 * time.Minute,
}
c, err := memorycache.New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: cacheopts})
if err != nil {
t.Errorf("error initialising cache: %v", err)
}
opts := QuerierOptions{
Cache: cache,
Cache: c,
Reader: nil,
FluxInterval: 5 * time.Minute,
KeyGenerator: queryBuilder.NewKeyGenerator(),
@@ -944,9 +973,16 @@ func TestV2QueryRangeTimeShiftWithCache(t *testing.T) {
},
},
}
cache := inmemory.New(&inmemory.Options{TTL: 60 * time.Minute, CleanupInterval: 10 * time.Minute})
cacheopts := cache.Memory{
TTL: 5 * time.Minute,
CleanupInterval: 10 * time.Minute,
}
c, err := memorycache.New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: cacheopts})
if err != nil {
t.Errorf("error initialising cache: %v", err)
}
opts := QuerierOptions{
Cache: cache,
Cache: c,
Reader: nil,
FluxInterval: 5 * time.Minute,
KeyGenerator: queryBuilder.NewKeyGenerator(),
@@ -1047,9 +1083,16 @@ func TestV2QueryRangeTimeShiftWithLimitAndCache(t *testing.T) {
},
},
}
cache := inmemory.New(&inmemory.Options{TTL: 60 * time.Minute, CleanupInterval: 10 * time.Minute})
cacheopts := cache.Memory{
TTL: 5 * time.Minute,
CleanupInterval: 10 * time.Minute,
}
c, err := memorycache.New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: cacheopts})
if err != nil {
t.Errorf("error initialising cache: %v", err)
}
opts := QuerierOptions{
Cache: cache,
Cache: c,
Reader: nil,
FluxInterval: 5 * time.Minute,
KeyGenerator: queryBuilder.NewKeyGenerator(),
@@ -1121,9 +1164,16 @@ func TestV2QueryRangeValueTypePromQL(t *testing.T) {
},
},
}
cache := inmemory.New(&inmemory.Options{TTL: 60 * time.Minute, CleanupInterval: 10 * time.Minute})
cacheopts := cache.Memory{
TTL: 5 * time.Minute,
CleanupInterval: 10 * time.Minute,
}
c, err := memorycache.New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: cacheopts})
if err != nil {
t.Errorf("error initialising cache: %v", err)
}
opts := QuerierOptions{
Cache: cache,
Cache: c,
Reader: nil,
FluxInterval: 5 * time.Minute,
KeyGenerator: queryBuilder.NewKeyGenerator(),
@@ -1148,17 +1198,17 @@ func TestV2QueryRangeValueTypePromQL(t *testing.T) {
expectedQueryAndTimeRanges := []struct {
query string
ranges []querycache.MissInterval
ranges []*querybuildertypes.MissInterval
}{
{
query: "signoz_calls_total",
ranges: []querycache.MissInterval{
ranges: []*querybuildertypes.MissInterval{
{Start: 1675115596722, End: 1675115596722 + 120*60*1000},
},
},
{
query: "signoz_latency_bucket",
ranges: []querycache.MissInterval{
ranges: []*querybuildertypes.MissInterval{
{Start: 1675115596722 + 60*60*1000, End: 1675115596722 + 180*60*1000},
},
},

View File

@@ -5,8 +5,8 @@ import (
"strings"
"github.com/SigNoz/govaluate"
"github.com/SigNoz/signoz/pkg/cache"
metricsV3 "github.com/SigNoz/signoz/pkg/query-service/app/metrics/v3"
"github.com/SigNoz/signoz/pkg/query-service/cache"
"github.com/SigNoz/signoz/pkg/query-service/constants"
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"

View File

@@ -13,6 +13,7 @@ import (
"github.com/SigNoz/signoz/pkg/alertmanager"
"github.com/SigNoz/signoz/pkg/apis/fields"
"github.com/SigNoz/signoz/pkg/cache"
"github.com/SigNoz/signoz/pkg/http/middleware"
"github.com/SigNoz/signoz/pkg/prometheus"
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
@@ -32,7 +33,6 @@ import (
"github.com/soheilhy/cmux"
"github.com/SigNoz/signoz/pkg/query-service/app/explorer"
"github.com/SigNoz/signoz/pkg/query-service/cache"
"github.com/SigNoz/signoz/pkg/query-service/constants"
"github.com/SigNoz/signoz/pkg/query-service/dao"
"github.com/SigNoz/signoz/pkg/query-service/featureManager"
@@ -49,7 +49,6 @@ type ServerOptions struct {
HTTPHostPort string
PrivateHostPort string
PreferSpanMetrics bool
CacheConfigPath string
FluxInterval string
FluxIntervalForTraceDetail string
Cluster string
@@ -112,19 +111,10 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
serverOptions.SigNoz.Cache,
)
var c cache.Cache
if serverOptions.CacheConfigPath != "" {
cacheOpts, err := cache.LoadFromYAMLCacheConfigFile(serverOptions.CacheConfigPath)
if err != nil {
return nil, err
}
c = cache.NewCache(cacheOpts)
}
rm, err := makeRulesManager(
serverOptions.SigNoz.SQLStore.SQLxDB(),
reader,
c,
serverOptions.SigNoz.Cache,
serverOptions.SigNoz.SQLStore,
serverOptions.SigNoz.TelemetryStore,
serverOptions.SigNoz.Prometheus,
@@ -165,7 +155,6 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
IntegrationsController: integrationsController,
CloudIntegrationsController: cloudIntegrationsController,
LogsParsingPipelineController: logParsingPipelineController,
Cache: c,
FluxInterval: fluxInterval,
JWT: serverOptions.Jwt,
AlertmanagerAPI: alertmanager.NewAPI(serverOptions.SigNoz.Alertmanager),

View File

@@ -1,69 +0,0 @@
package cache
import (
"os"
"time"
inmemory "github.com/SigNoz/signoz/pkg/query-service/cache/inmemory"
redis "github.com/SigNoz/signoz/pkg/query-service/cache/redis"
"github.com/SigNoz/signoz/pkg/query-service/cache/status"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"gopkg.in/yaml.v2"
)
type Options struct {
Name string `yaml:"-"`
Provider string `yaml:"provider"`
Redis *redis.Options `yaml:"redis,omitempty"`
InMemory *inmemory.Options `yaml:"inmemory,omitempty"`
}
// Cache is the interface for the storage backend
type Cache interface {
Connect() error
Store(cacheKey string, data []byte, ttl time.Duration) error
Retrieve(cacheKey string, allowExpired bool) ([]byte, status.RetrieveStatus, error)
SetTTL(cacheKey string, ttl time.Duration)
Remove(cacheKey string)
BulkRemove(cacheKeys []string)
Close() error
}
// KeyGenerator is the interface for the key generator
// The key generator is used to generate the cache keys for the cache entries
type KeyGenerator interface {
// GenerateKeys generates the cache keys for the given query range params
// The keys are returned as a map where the key is the query name and the value is the cache key
GenerateKeys(*v3.QueryRangeParamsV3) map[string]string
}
// LoadFromYAMLCacheConfig loads the cache options from the given YAML config bytes
func LoadFromYAMLCacheConfig(yamlConfig []byte) (*Options, error) {
var options Options
err := yaml.Unmarshal(yamlConfig, &options)
if err != nil {
return nil, err
}
return &options, nil
}
// LoadFromYAMLCacheConfigFile loads the cache options from the given YAML config file
func LoadFromYAMLCacheConfigFile(configFile string) (*Options, error) {
bytes, err := os.ReadFile(configFile)
if err != nil {
return nil, err
}
return LoadFromYAMLCacheConfig(bytes)
}
// NewCache creates a new cache based on the given options
func NewCache(options *Options) Cache {
switch options.Provider {
case "redis":
return redis.New(options.Redis)
case "inmemory":
return inmemory.New(options.InMemory)
default:
return nil
}
}

View File

@@ -1,52 +0,0 @@
package cache
import "testing"
func TestNewCacheUnKnownProvider(t *testing.T) {
c := NewCache(&Options{
Name: "test",
Provider: "unknown",
})
if c != nil {
t.Fatalf("expected nil, got %v", c)
}
}
func TestNewCacheInMemory(t *testing.T) {
c := NewCache(&Options{
Name: "test",
Provider: "inmemory",
})
if c == nil {
t.Fatalf("expected non-nil, got nil")
}
}
func TestNewCacheRedis(t *testing.T) {
c := NewCache(&Options{
Name: "test",
Provider: "redis",
})
if c == nil {
t.Fatalf("expected non-nil, got nil")
}
}
func TestLoadFromYAMLCacheConfig(t *testing.T) {
_, err := LoadFromYAMLCacheConfig([]byte(`
provider: inmemory
`))
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
}
func TestLoadFromYAMLCacheConfigFile(t *testing.T) {
_, err := LoadFromYAMLCacheConfigFile("testdata/cache.yaml")
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
}

View File

@@ -1,73 +0,0 @@
package inmemory
import (
"time"
"github.com/SigNoz/signoz/pkg/query-service/cache/status"
go_cache "github.com/patrickmn/go-cache"
)
// cache implements the Cache interface
type cache struct {
cc *go_cache.Cache
}
// New creates a new in-memory cache
func New(opts *Options) *cache {
if opts == nil {
opts = defaultOptions()
}
return &cache{cc: go_cache.New(opts.TTL, opts.CleanupInterval)}
}
// Connect does nothing
func (c *cache) Connect() error {
return nil
}
// Store stores the data in the cache
func (c *cache) Store(cacheKey string, data []byte, ttl time.Duration) error {
c.cc.Set(cacheKey, data, ttl)
return nil
}
// Retrieve retrieves the data from the cache
func (c *cache) Retrieve(cacheKey string, allowExpired bool) ([]byte, status.RetrieveStatus, error) {
data, found := c.cc.Get(cacheKey)
if !found {
return nil, status.RetrieveStatusKeyMiss, nil
}
return data.([]byte), status.RetrieveStatusHit, nil
}
// SetTTL sets the TTL for the cache entry
func (c *cache) SetTTL(cacheKey string, ttl time.Duration) {
item, found := c.cc.Get(cacheKey)
if !found {
return
}
_ = c.cc.Replace(cacheKey, item, ttl)
}
// Remove removes the cache entry
func (c *cache) Remove(cacheKey string) {
c.cc.Delete(cacheKey)
}
// BulkRemove removes the cache entries
func (c *cache) BulkRemove(cacheKeys []string) {
for _, cacheKey := range cacheKeys {
c.cc.Delete(cacheKey)
}
}
// Close does nothing
func (c *cache) Close() error {
return nil
}
// Configuration returns the cache configuration
func (c *cache) Configuration() *Options {
return nil
}

View File

@@ -1,102 +0,0 @@
package inmemory
import (
"testing"
"time"
"github.com/SigNoz/signoz/pkg/query-service/cache/status"
"github.com/stretchr/testify/assert"
)
// TestNew tests the New function
func TestNew(t *testing.T) {
opts := &Options{
TTL: 10 * time.Second,
CleanupInterval: 10 * time.Second,
}
c := New(opts)
assert.NotNil(t, c)
assert.NotNil(t, c.cc)
}
// TestConnect tests the Connect function
func TestConnect(t *testing.T) {
c := New(nil)
assert.NoError(t, c.Connect())
}
// TestStore tests the Store function
func TestStore(t *testing.T) {
c := New(nil)
assert.NoError(t, c.Store("key", []byte("value"), 10*time.Second))
}
// TestRetrieve tests the Retrieve function
func TestRetrieve(t *testing.T) {
c := New(nil)
assert.NoError(t, c.Store("key", []byte("value"), 10*time.Second))
data, retrieveStatus, err := c.Retrieve("key", false)
assert.NoError(t, err)
assert.Equal(t, retrieveStatus, status.RetrieveStatusHit)
assert.Equal(t, data, []byte("value"))
}
// TestSetTTL tests the SetTTL function
func TestSetTTL(t *testing.T) {
c := New(&Options{TTL: 10 * time.Second, CleanupInterval: 1 * time.Second})
assert.NoError(t, c.Store("key", []byte("value"), 2*time.Second))
time.Sleep(3 * time.Second)
data, retrieveStatus, err := c.Retrieve("key", false)
assert.NoError(t, err)
assert.Equal(t, retrieveStatus, status.RetrieveStatusKeyMiss)
assert.Nil(t, data)
assert.NoError(t, c.Store("key", []byte("value"), 2*time.Second))
c.SetTTL("key", 4*time.Second)
time.Sleep(3 * time.Second)
data, retrieveStatus, err = c.Retrieve("key", false)
assert.NoError(t, err)
assert.Equal(t, retrieveStatus, status.RetrieveStatusHit)
assert.Equal(t, data, []byte("value"))
}
// TestRemove tests the Remove function
func TestRemove(t *testing.T) {
c := New(nil)
assert.NoError(t, c.Store("key", []byte("value"), 10*time.Second))
c.Remove("key")
data, retrieveStatus, err := c.Retrieve("key", false)
assert.NoError(t, err)
assert.Equal(t, retrieveStatus, status.RetrieveStatusKeyMiss)
assert.Nil(t, data)
}
// TestBulkRemove tests the BulkRemove function
func TestBulkRemove(t *testing.T) {
c := New(nil)
assert.NoError(t, c.Store("key1", []byte("value"), 10*time.Second))
assert.NoError(t, c.Store("key2", []byte("value"), 10*time.Second))
c.BulkRemove([]string{"key1", "key2"})
data, retrieveStatus, err := c.Retrieve("key1", false)
assert.NoError(t, err)
assert.Equal(t, retrieveStatus, status.RetrieveStatusKeyMiss)
assert.Nil(t, data)
data, retrieveStatus, err = c.Retrieve("key2", false)
assert.NoError(t, err)
assert.Equal(t, retrieveStatus, status.RetrieveStatusKeyMiss)
assert.Nil(t, data)
}
// TestCache tests the cache
func TestCache(t *testing.T) {
c := New(nil)
assert.NoError(t, c.Store("key", []byte("value"), 10*time.Second))
data, retrieveStatus, err := c.Retrieve("key", false)
assert.NoError(t, err)
assert.Equal(t, retrieveStatus, status.RetrieveStatusHit)
assert.Equal(t, data, []byte("value"))
c.Remove("key")
}

View File

@@ -1,23 +0,0 @@
package inmemory
import (
"time"
go_cache "github.com/patrickmn/go-cache"
)
const (
defaultTTL = go_cache.NoExpiration
defaultCleanupInterval = 1 * time.Minute
)
// Options holds the options for the in-memory cache
type Options struct {
// TTL is the time to live for the cache entries
TTL time.Duration `yaml:"ttl,omitempty"`
CleanupInterval time.Duration `yaml:"cleanupInterval,omitempty"`
}
func defaultOptions() *Options {
return &Options{TTL: defaultTTL, CleanupInterval: defaultCleanupInterval}
}

View File

@@ -1,24 +0,0 @@
package redis
const (
defaultHost = "localhost"
defaultPort = 6379
defaultPassword = ""
defaultDB = 0
)
type Options struct {
Host string `yaml:"host,omitempty"`
Port int `yaml:"port,omitempty"`
Password string `yaml:"password,omitempty"`
DB int `yaml:"db,omitempty"`
}
func defaultOptions() *Options {
return &Options{
Host: defaultHost,
Port: defaultPort,
Password: defaultPassword,
DB: defaultDB,
}
}

View File

@@ -1,124 +0,0 @@
package redis
import (
"context"
"errors"
"fmt"
"time"
"github.com/SigNoz/signoz/pkg/query-service/cache/status"
"github.com/go-redis/redis/v8"
"go.uber.org/zap"
)
type cache struct {
client *redis.Client
opts *Options
}
// New creates a new cache
func New(opts *Options) *cache {
if opts == nil {
opts = defaultOptions()
}
return &cache{opts: opts}
}
// WithClient creates a new cache with the given client
func WithClient(client *redis.Client) *cache {
return &cache{client: client}
}
// Connect connects to the redis server
func (c *cache) Connect() error {
c.client = redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%s:%d", c.opts.Host, c.opts.Port),
Password: c.opts.Password,
DB: c.opts.DB,
})
return nil
}
// Store stores the data in the cache
func (c *cache) Store(cacheKey string, data []byte, ttl time.Duration) error {
return c.client.Set(context.Background(), cacheKey, data, ttl).Err()
}
// Retrieve retrieves the data from the cache
func (c *cache) Retrieve(cacheKey string, allowExpired bool) ([]byte, status.RetrieveStatus, error) {
data, err := c.client.Get(context.Background(), cacheKey).Bytes()
if err != nil {
if errors.Is(err, redis.Nil) {
return nil, status.RetrieveStatusKeyMiss, nil
}
return nil, status.RetrieveStatusError, err
}
return data, status.RetrieveStatusHit, nil
}
// SetTTL sets the TTL for the cache entry
func (c *cache) SetTTL(cacheKey string, ttl time.Duration) {
err := c.client.Expire(context.Background(), cacheKey, ttl).Err()
if err != nil {
zap.L().Error("error setting TTL for cache key", zap.String("cacheKey", cacheKey), zap.Duration("ttl", ttl), zap.Error(err))
}
}
// Remove removes the cache entry
func (c *cache) Remove(cacheKey string) {
c.BulkRemove([]string{cacheKey})
}
// BulkRemove removes the cache entries
func (c *cache) BulkRemove(cacheKeys []string) {
if err := c.client.Del(context.Background(), cacheKeys...).Err(); err != nil {
zap.L().Error("error deleting cache keys", zap.Strings("cacheKeys", cacheKeys), zap.Error(err))
}
}
// Close closes the connection to the redis server
func (c *cache) Close() error {
return c.client.Close()
}
// Ping pings the redis server
func (c *cache) Ping() error {
return c.client.Ping(context.Background()).Err()
}
// GetClient returns the redis client
func (c *cache) GetClient() *redis.Client {
return c.client
}
// GetOptions returns the options
func (c *cache) GetOptions() *Options {
return c.opts
}
// GetTTL returns the TTL for the cache entry
func (c *cache) GetTTL(cacheKey string) time.Duration {
ttl, err := c.client.TTL(context.Background(), cacheKey).Result()
if err != nil {
zap.L().Error("error getting TTL for cache key", zap.String("cacheKey", cacheKey), zap.Error(err))
}
return ttl
}
// GetKeys returns the keys matching the pattern
func (c *cache) GetKeys(pattern string) ([]string, error) {
return c.client.Keys(context.Background(), pattern).Result()
}
// GetKeysWithTTL returns the keys matching the pattern with their TTL
func (c *cache) GetKeysWithTTL(pattern string) (map[string]time.Duration, error) {
keys, err := c.GetKeys(pattern)
if err != nil {
return nil, err
}
result := make(map[string]time.Duration)
for _, key := range keys {
result[key] = c.GetTTL(key)
}
return result, nil
}

View File

@@ -1,91 +0,0 @@
package redis
import (
"testing"
"time"
"github.com/SigNoz/signoz/pkg/query-service/cache/status"
"github.com/go-redis/redismock/v8"
)
func TestStore(t *testing.T) {
db, mock := redismock.NewClientMock()
c := WithClient(db)
mock.ExpectSet("key", []byte("value"), 10*time.Second).RedisNil()
_ = c.Store("key", []byte("value"), 10*time.Second)
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled expectations: %s", err)
}
}
func TestRetrieve(t *testing.T) {
db, mock := redismock.NewClientMock()
c := WithClient(db)
mock.ExpectSet("key", []byte("value"), 10*time.Second).RedisNil()
_ = c.Store("key", []byte("value"), 10*time.Second)
mock.ExpectGet("key").SetVal("value")
data, retrieveStatus, err := c.Retrieve("key", false)
if err != nil {
t.Errorf("unexpected error: %s", err)
}
if retrieveStatus != status.RetrieveStatusHit {
t.Errorf("expected status %d, got %d", status.RetrieveStatusHit, retrieveStatus)
}
if string(data) != "value" {
t.Errorf("expected value %s, got %s", "value", string(data))
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled expectations: %s", err)
}
}
func TestSetTTL(t *testing.T) {
db, mock := redismock.NewClientMock()
c := WithClient(db)
mock.ExpectSet("key", []byte("value"), 10*time.Second).RedisNil()
_ = c.Store("key", []byte("value"), 10*time.Second)
mock.ExpectExpire("key", 4*time.Second).RedisNil()
c.SetTTL("key", 4*time.Second)
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled expectations: %s", err)
}
}
func TestRemove(t *testing.T) {
db, mock := redismock.NewClientMock()
c := WithClient(db)
mock.ExpectSet("key", []byte("value"), 10*time.Second).RedisNil()
_ = c.Store("key", []byte("value"), 10*time.Second)
mock.ExpectDel("key").RedisNil()
c.Remove("key")
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled expectations: %s", err)
}
}
func TestBulkRemove(t *testing.T) {
db, mock := redismock.NewClientMock()
c := WithClient(db)
mock.ExpectSet("key", []byte("value"), 10*time.Second).RedisNil()
_ = c.Store("key", []byte("value"), 10*time.Second)
mock.ExpectSet("key2", []byte("value2"), 10*time.Second).RedisNil()
_ = c.Store("key2", []byte("value2"), 10*time.Second)
mock.ExpectDel("key", "key2").RedisNil()
c.BulkRemove([]string{"key", "key2"})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled expectations: %s", err)
}
}

View File

@@ -1,33 +0,0 @@
package status
// RetrieveStatus defines the possible status of a cache lookup
type RetrieveStatus int
const (
RetrieveStatusHit = RetrieveStatus(iota)
RetrieveStatusPartialHit
RetrieveStatusRangeMiss
RetrieveStatusKeyMiss
RetrieveStatusRevalidated
RetrieveStatusError
)
func (s RetrieveStatus) String() string {
switch s {
case RetrieveStatusHit:
return "hit"
case RetrieveStatusPartialHit:
return "partial hit"
case RetrieveStatusRangeMiss:
return "range miss"
case RetrieveStatusKeyMiss:
return "key miss"
case RetrieveStatusRevalidated:
return "revalidated"
case RetrieveStatusError:
return "error"
default:
return "unknown"
}
}

View File

@@ -1,43 +0,0 @@
package status
import (
"testing"
)
func TestRetrieveStatusString(t *testing.T) {
tests := []struct {
status RetrieveStatus
want string
}{
{
status: RetrieveStatusHit,
want: "hit",
},
{
status: RetrieveStatusPartialHit,
want: "partial hit",
},
{
status: RetrieveStatusRangeMiss,
want: "range miss",
},
{
status: RetrieveStatusKeyMiss,
want: "key miss",
},
{
status: RetrieveStatusRevalidated,
want: "revalidated",
},
{
status: RetrieveStatusError,
want: "error",
},
}
for _, tt := range tests {
if got := tt.status.String(); got != tt.want {
t.Errorf("RetrieveStatus.String() = %v, want %v", got, tt.want)
}
}
}

View File

@@ -1,2 +0,0 @@
name: test
provider: inmemory

View File

@@ -9,8 +9,8 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/constants"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/querycache"
"github.com/SigNoz/signoz/pkg/query-service/utils/labels"
"github.com/SigNoz/signoz/pkg/types/querybuildertypes"
)
func AdjustedMetricTimeRange(start, end, step int64, mq v3.BuilderQuery) (int64, int64) {
@@ -93,7 +93,7 @@ func NormalizeLabelName(name string) string {
return normalized
}
func GetSeriesFromCachedData(data []querycache.CachedSeriesData, start, end int64) []*v3.Series {
func GetSeriesFromCachedData(data []*querybuildertypes.SeriesData, start, end int64) []*v3.Series {
series := make(map[uint64]*v3.Series)
for _, cachedData := range data {
@@ -126,7 +126,7 @@ func GetSeriesFromCachedData(data []querycache.CachedSeriesData, start, end int6
}
// It is different from GetSeriesFromCachedData because doesn't remove a point if it is >= (start - (start % step*1000))
func GetSeriesFromCachedDataV2(data []querycache.CachedSeriesData, start, end, step int64) []*v3.Series {
func GetSeriesFromCachedDataV2(data []*querybuildertypes.SeriesData, start, end, step int64) []*v3.Series {
series := make(map[uint64]*v3.Series)
for _, cachedData := range data {

View File

@@ -4,7 +4,7 @@ import (
"testing"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/querycache"
"github.com/SigNoz/signoz/pkg/types/querybuildertypes"
)
func TestFilterSeriesPoints(t *testing.T) {
@@ -308,7 +308,7 @@ func TestFilterSeriesPoints(t *testing.T) {
func TestGetSeriesFromCachedData(t *testing.T) {
testCases := []struct {
name string
data []querycache.CachedSeriesData
data []*querybuildertypes.SeriesData
start int64
end int64
expectedCount int
@@ -316,7 +316,7 @@ func TestGetSeriesFromCachedData(t *testing.T) {
}{
{
name: "Single point outside range",
data: []querycache.CachedSeriesData{
data: []*querybuildertypes.SeriesData{
{
Data: []*v3.Series{
{
@@ -335,7 +335,7 @@ func TestGetSeriesFromCachedData(t *testing.T) {
},
{
name: "Single point inside range",
data: []querycache.CachedSeriesData{
data: []*querybuildertypes.SeriesData{
{
Data: []*v3.Series{
{
@@ -371,7 +371,7 @@ func TestGetSeriesFromCachedData(t *testing.T) {
func TestGetSeriesFromCachedDataV2(t *testing.T) {
testCases := []struct {
name string
data []querycache.CachedSeriesData
data []*querybuildertypes.SeriesData
start int64
end int64
step int64
@@ -380,7 +380,7 @@ func TestGetSeriesFromCachedDataV2(t *testing.T) {
}{
{
name: "Single point outside range",
data: []querycache.CachedSeriesData{
data: []*querybuildertypes.SeriesData{
{
Data: []*v3.Series{
{
@@ -400,7 +400,7 @@ func TestGetSeriesFromCachedDataV2(t *testing.T) {
},
{
name: "Single point inside range",
data: []querycache.CachedSeriesData{
data: []*querybuildertypes.SeriesData{
{
Data: []*v3.Series{
{

View File

@@ -7,7 +7,7 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/model"
"github.com/SigNoz/signoz/pkg/query-service/model/metrics_explorer"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/querycache"
"github.com/SigNoz/signoz/pkg/types/querybuildertypes"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/util/stats"
)
@@ -151,9 +151,9 @@ type Querier interface {
}
type QueryCache interface {
FindMissingTimeRanges(start, end int64, step int64, cacheKey string) []querycache.MissInterval
FindMissingTimeRangesV2(start, end int64, step int64, cacheKey string) []querycache.MissInterval
MergeWithCachedSeriesData(cacheKey string, newData []querycache.CachedSeriesData) []querycache.CachedSeriesData
StoreSeriesInCache(cacheKey string, series []querycache.CachedSeriesData)
MergeWithCachedSeriesDataV2(cacheKey string, series []querycache.CachedSeriesData) []querycache.CachedSeriesData
FindMissingTimeRanges(ctx context.Context, start, end int64, step int64, cacheKey string) []*querybuildertypes.MissInterval
FindMissingTimeRangesV2(ctx context.Context, start, end int64, step int64, cacheKey string) []*querybuildertypes.MissInterval
MergeWithCachedSeriesData(ctx context.Context, cacheKey string, newData []*querybuildertypes.SeriesData) []*querybuildertypes.SeriesData
StoreSeriesInCache(ctx context.Context, cacheKey string, series []*querybuildertypes.SeriesData)
MergeWithCachedSeriesDataV2(ctx context.Context, cacheKey string, series []*querybuildertypes.SeriesData) []*querybuildertypes.SeriesData
}

View File

@@ -60,6 +60,7 @@ func main() {
flag.BoolVar(&preferSpanMetrics, "prefer-span-metrics", false, "(prefer span metrics for service level metrics)")
// Deprecated
flag.StringVar(&ruleRepoURL, "rules.repo-url", constants.AlertHelpPage, "(host address used to build rule link in alert messages)")
// Deprecated
flag.StringVar(&cacheConfigPath, "experimental.cache-config", "", "(cache config to use)")
flag.StringVar(&fluxInterval, "flux-interval", "5m", "(the interval to exclude data from being cached to avoid incorrect cache for data in motion)")
flag.StringVar(&fluxIntervalForTraceDetail, "flux-interval-trace-detail", "2m", "(the interval to exclude data from being cached to avoid incorrect cache for trace data in motion)")
@@ -128,7 +129,6 @@ func main() {
HTTPHostPort: constants.HTTPHostPort,
PreferSpanMetrics: preferSpanMetrics,
PrivateHostPort: constants.PrivateHostPort,
CacheConfigPath: cacheConfigPath,
FluxInterval: fluxInterval,
FluxIntervalForTraceDetail: fluxIntervalForTraceDetail,
Cluster: cluster,

View File

@@ -18,8 +18,8 @@ import (
"github.com/jmoiron/sqlx"
"github.com/SigNoz/signoz/pkg/alertmanager"
"github.com/SigNoz/signoz/pkg/cache"
"github.com/SigNoz/signoz/pkg/prometheus"
"github.com/SigNoz/signoz/pkg/query-service/cache"
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
"github.com/SigNoz/signoz/pkg/query-service/model"
"github.com/SigNoz/signoz/pkg/query-service/telemetry"

View File

@@ -1,34 +1,46 @@
package querycache
package querybuildertypes
import (
"context"
"encoding/json"
"math"
"sort"
"time"
"github.com/SigNoz/signoz/pkg/query-service/cache"
"github.com/SigNoz/signoz/pkg/cache"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/utils/labels"
"go.uber.org/zap"
)
type queryCache struct {
cache cache.Cache
fluxInterval time.Duration
type SeriesData struct {
Start int64 `json:"start"`
End int64 `json:"end"`
Data []*v3.Series `json:"data"`
}
type CachedSeriesData struct {
Series []*SeriesData
}
type MissInterval struct {
Start, End int64 // in milliseconds
}
type CachedSeriesData struct {
Start int64 `json:"start"`
End int64 `json:"end"`
Data []*v3.Series `json:"data"`
func (c *CachedSeriesData) MarshalBinary() (data []byte, err error) {
return json.Marshal(c)
}
func (c *CachedSeriesData) UnmarshalBinary(data []byte) error {
return json.Unmarshal(data, c)
}
type QueryCacheOption func(q *queryCache)
type queryCache struct {
cache cache.Cache
fluxInterval time.Duration
}
func NewQueryCache(opts ...QueryCacheOption) *queryCache {
q := &queryCache{}
for _, opt := range opts {
@@ -51,29 +63,29 @@ func WithFluxInterval(fluxInterval time.Duration) QueryCacheOption {
// FindMissingTimeRangesV2 is a new correct implementation of FindMissingTimeRanges
// It takes care of any timestamps that were not queried due to rounding in the first version.
func (q *queryCache) FindMissingTimeRangesV2(start, end int64, step int64, cacheKey string) []MissInterval {
func (q *queryCache) FindMissingTimeRangesV2(ctx context.Context, start, end int64, step int64, cacheKey string) []*MissInterval {
if q.cache == nil || cacheKey == "" {
return []MissInterval{{Start: start, End: end}}
return []*MissInterval{{Start: start, End: end}}
}
stepMs := step * 1000
// when the window is too small to be cached, we return the entire range as a miss
if (start + stepMs) > end {
return []MissInterval{{Start: start, End: end}}
return []*MissInterval{{Start: start, End: end}}
}
cachedSeriesDataList := q.getCachedSeriesData(cacheKey)
cachedSeriesDataList := q.getCachedSeriesData(ctx, cacheKey)
// Sort the cached data by start time
sort.Slice(cachedSeriesDataList, func(i, j int) bool {
return cachedSeriesDataList[i].Start < cachedSeriesDataList[j].Start
})
zap.L().Info("Number of non-overlapping cached series data", zap.Int("count", len(cachedSeriesDataList)))
// TODO[@vikrantgupta25]: added logger here
// zap.L().Info("Number of non-overlapping cached series data", zap.Int("count", len(cachedSeriesDataList)))
// Exclude the flux interval from the cached end time
// Why do we use `time.Now()` here?
// When querying for a range [start, now())
// we don't want to use the cached data inside the flux interval period
@@ -90,13 +102,13 @@ func (q *queryCache) FindMissingTimeRangesV2(start, end int64, step int64, cache
)
}
var missingRanges []MissInterval
var missingRanges []*MissInterval
currentTime := start
// check if start is a complete aggregation window if not then add it as a miss
if start%stepMs != 0 {
nextAggStart := start - (start % stepMs) + stepMs
missingRanges = append(missingRanges, MissInterval{Start: start, End: nextAggStart})
missingRanges = append(missingRanges, &MissInterval{Start: start, End: nextAggStart})
currentTime = nextAggStart
}
@@ -112,7 +124,7 @@ func (q *queryCache) FindMissingTimeRangesV2(start, end int64, step int64, cache
// Add missing range if there's a gap
if currentTime < data.Start {
missingRanges = append(missingRanges, MissInterval{Start: currentTime, End: min(data.Start, end)})
missingRanges = append(missingRanges, &MissInterval{Start: currentTime, End: min(data.Start, end)})
}
// Update currentTime, but don't go past the end time
@@ -127,20 +139,20 @@ func (q *queryCache) FindMissingTimeRangesV2(start, end int64, step int64, cache
// The test case "start lies near the start of aggregation interval and end lies near the end of another aggregation interval"
// shows this case.
if currentTime < end {
missingRanges = append(missingRanges, MissInterval{Start: currentTime, End: end})
missingRanges = append(missingRanges, &MissInterval{Start: currentTime, End: end})
} else if end%stepMs != 0 {
// check if end is a complete aggregation window if not then add it as a miss
prevAggEnd := end - (end % stepMs)
missingRanges = append(missingRanges, MissInterval{Start: prevAggEnd, End: end})
missingRanges = append(missingRanges, &MissInterval{Start: prevAggEnd, End: end})
}
// Merge overlapping or adjacent missing ranges
if len(missingRanges) <= 1 {
return missingRanges
}
merged := []MissInterval{missingRanges[0]}
merged := []*MissInterval{missingRanges[0]}
for _, curr := range missingRanges[1:] {
last := &merged[len(merged)-1]
last := merged[len(merged)-1]
if last.End >= curr.Start {
last.End = max(last.End, curr.End)
} else {
@@ -151,19 +163,20 @@ func (q *queryCache) FindMissingTimeRangesV2(start, end int64, step int64, cache
return merged
}
func (q *queryCache) FindMissingTimeRanges(start, end, step int64, cacheKey string) []MissInterval {
func (q *queryCache) FindMissingTimeRanges(ctx context.Context, start, end, step int64, cacheKey string) []*MissInterval {
if q.cache == nil || cacheKey == "" {
return []MissInterval{{Start: start, End: end}}
return []*MissInterval{{Start: start, End: end}}
}
cachedSeriesDataList := q.getCachedSeriesData(cacheKey)
cachedSeriesDataList := q.getCachedSeriesData(ctx, cacheKey)
// Sort the cached data by start time
sort.Slice(cachedSeriesDataList, func(i, j int) bool {
return cachedSeriesDataList[i].Start < cachedSeriesDataList[j].Start
})
zap.L().Info("Number of non-overlapping cached series data", zap.Int("count", len(cachedSeriesDataList)))
// TODO[@vikrantgupta25]: added logger here
// zap.L().Info("Number of non-overlapping cached series data", zap.Int("count", len(cachedSeriesDataList)))
// Exclude the flux interval from the cached end time
@@ -187,7 +200,7 @@ func (q *queryCache) FindMissingTimeRanges(start, end, step int64, cacheKey stri
)
}
var missingRanges []MissInterval
var missingRanges []*MissInterval
currentTime := start
for _, data := range cachedSeriesDataList {
@@ -202,7 +215,7 @@ func (q *queryCache) FindMissingTimeRanges(start, end, step int64, cacheKey stri
// Add missing range if there's a gap
if currentTime < data.Start {
missingRanges = append(missingRanges, MissInterval{Start: currentTime, End: min(data.Start, end)})
missingRanges = append(missingRanges, &MissInterval{Start: currentTime, End: min(data.Start, end)})
}
// Update currentTime, but don't go past the end time
@@ -211,19 +224,20 @@ func (q *queryCache) FindMissingTimeRanges(start, end, step int64, cacheKey stri
// Add final missing range if necessary
if currentTime < end {
missingRanges = append(missingRanges, MissInterval{Start: currentTime, End: end})
missingRanges = append(missingRanges, &MissInterval{Start: currentTime, End: end})
}
return missingRanges
}
func (q *queryCache) getCachedSeriesData(cacheKey string) []*CachedSeriesData {
cachedData, _, _ := q.cache.Retrieve(cacheKey, true)
var cachedSeriesDataList []*CachedSeriesData
if err := json.Unmarshal(cachedData, &cachedSeriesDataList); err != nil {
func (q *queryCache) getCachedSeriesData(ctx context.Context, cacheKey string) []*SeriesData {
cachedSeriesData := new(CachedSeriesData)
_, err := q.cache.Retrieve(ctx, cacheKey, cachedSeriesData, true)
if err != nil {
return nil
}
return cachedSeriesDataList
return cachedSeriesData.Series
}
func (q *queryCache) mergeSeries(cachedSeries, missedSeries []*v3.Series) []*v3.Series {
@@ -263,45 +277,40 @@ func (q *queryCache) mergeSeries(cachedSeries, missedSeries []*v3.Series) []*v3.
return mergedSeries
}
func (q *queryCache) storeMergedData(cacheKey string, mergedData []CachedSeriesData) {
func (q *queryCache) storeMergedData(ctx context.Context, cacheKey string, mergedData []*SeriesData) {
if q.cache == nil {
return
}
mergedDataJSON, err := json.Marshal(mergedData)
if err != nil {
zap.L().Error("error marshalling merged data", zap.Error(err))
return
}
err = q.cache.Store(cacheKey, mergedDataJSON, 0)
cachedSeriesData := CachedSeriesData{Series: mergedData}
err := q.cache.Store(ctx, cacheKey, &cachedSeriesData, 0)
if err != nil {
zap.L().Error("error storing merged data", zap.Error(err))
}
}
func (q *queryCache) MergeWithCachedSeriesDataV2(cacheKey string, newData []CachedSeriesData) []CachedSeriesData {
func (q *queryCache) MergeWithCachedSeriesDataV2(ctx context.Context, cacheKey string, newData []*SeriesData) []*SeriesData {
if q.cache == nil {
return newData
}
cachedData, _, _ := q.cache.Retrieve(cacheKey, true)
var existingData []CachedSeriesData
if err := json.Unmarshal(cachedData, &existingData); err != nil {
zap.L().Error("error unmarshalling existing data", zap.Error(err))
return newData
cachedSeriesData := new(CachedSeriesData)
_, err := q.cache.Retrieve(ctx, cacheKey, cachedSeriesData, true)
if err != nil {
return nil
}
allData := append(existingData, newData...)
allData := append(cachedSeriesData.Series, newData...)
sort.Slice(allData, func(i, j int) bool {
return allData[i].Start < allData[j].Start
})
var mergedData []CachedSeriesData
var current *CachedSeriesData
var mergedData []*SeriesData
var current *SeriesData
for _, data := range allData {
if current == nil {
current = &CachedSeriesData{
current = &SeriesData{
Start: data.Start,
End: data.End,
Data: data.Data,
@@ -316,9 +325,9 @@ func (q *queryCache) MergeWithCachedSeriesDataV2(cacheKey string, newData []Cach
current.Data = q.mergeSeries(current.Data, data.Data)
} else {
// No overlap, add current to mergedData
mergedData = append(mergedData, *current)
mergedData = append(mergedData, current)
// Start new current
current = &CachedSeriesData{
current = &SeriesData{
Start: data.Start,
End: data.End,
Data: data.Data,
@@ -328,19 +337,19 @@ func (q *queryCache) MergeWithCachedSeriesDataV2(cacheKey string, newData []Cach
// After the loop, add the last current
if current != nil {
mergedData = append(mergedData, *current)
mergedData = append(mergedData, current)
}
return mergedData
}
func (q *queryCache) MergeWithCachedSeriesData(cacheKey string, newData []CachedSeriesData) []CachedSeriesData {
func (q *queryCache) MergeWithCachedSeriesData(ctx context.Context, cacheKey string, newData []*SeriesData) []*SeriesData {
mergedData := q.MergeWithCachedSeriesDataV2(cacheKey, newData)
q.storeMergedData(cacheKey, mergedData)
mergedData := q.MergeWithCachedSeriesDataV2(ctx, cacheKey, newData)
q.storeMergedData(ctx, cacheKey, mergedData)
return mergedData
}
func (q *queryCache) StoreSeriesInCache(cacheKey string, series []CachedSeriesData) {
q.storeMergedData(cacheKey, series)
func (q *queryCache) StoreSeriesInCache(ctx context.Context, cacheKey string, series []*SeriesData) {
q.storeMergedData(ctx, cacheKey, series)
}

View File

@@ -1,25 +1,29 @@
package querycache_test
package querybuildertypes
import (
"encoding/json"
"context"
"testing"
"time"
"github.com/SigNoz/signoz/pkg/query-service/cache/inmemory"
"github.com/SigNoz/signoz/pkg/cache"
"github.com/SigNoz/signoz/pkg/cache/memorycache"
"github.com/SigNoz/signoz/pkg/factory/factorytest"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/querycache"
"github.com/stretchr/testify/assert"
)
func TestFindMissingTimeRanges(t *testing.T) {
// Initialize the mock cache
mockCache := inmemory.New(&inmemory.Options{TTL: 5 * time.Minute, CleanupInterval: 10 * time.Minute})
// Create a queryCache instance with the mock cache and a fluxInterval
q := querycache.NewQueryCache(
querycache.WithCache(mockCache),
querycache.WithFluxInterval(0), // Set to zero for testing purposes
)
opts := cache.Memory{
TTL: 5 * time.Minute,
CleanupInterval: 10 * time.Minute,
}
c, err := memorycache.New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: opts})
if err != nil {
t.Errorf("error initialising cache: %v", err)
}
qc := NewQueryCache(WithCache(c), WithFluxInterval(0))
// Define the test cases
testCases := []struct {
@@ -28,8 +32,8 @@ func TestFindMissingTimeRanges(t *testing.T) {
requestedEnd int64 // in milliseconds
step int64 // in seconds
cacheKey string
cachedData []querycache.CachedSeriesData
expectedMiss []querycache.MissInterval
cachedData []*SeriesData
expectedMiss []*MissInterval
}{
{
name: "Cached time range is a subset of the requested time range",
@@ -37,14 +41,14 @@ func TestFindMissingTimeRanges(t *testing.T) {
requestedEnd: 5000,
step: 60,
cacheKey: "testKey1",
cachedData: []querycache.CachedSeriesData{
cachedData: []*SeriesData{
{
Start: 2000,
End: 3000,
Data: []*v3.Series{}, // Data can be empty for this test
},
},
expectedMiss: []querycache.MissInterval{
expectedMiss: []*MissInterval{
{Start: 1000, End: 2000},
{Start: 3000, End: 5000},
},
@@ -55,7 +59,7 @@ func TestFindMissingTimeRanges(t *testing.T) {
requestedEnd: 3000,
step: 60,
cacheKey: "testKey2",
cachedData: []querycache.CachedSeriesData{
cachedData: []*SeriesData{
{
Start: 1000,
End: 4000,
@@ -70,14 +74,14 @@ func TestFindMissingTimeRanges(t *testing.T) {
requestedEnd: 4000,
step: 60,
cacheKey: "testKey3",
cachedData: []querycache.CachedSeriesData{
cachedData: []*SeriesData{
{
Start: 1000,
End: 2500,
Data: []*v3.Series{},
},
},
expectedMiss: []querycache.MissInterval{
expectedMiss: []*MissInterval{
{Start: 2500, End: 4000},
},
},
@@ -87,14 +91,14 @@ func TestFindMissingTimeRanges(t *testing.T) {
requestedEnd: 4000,
step: 60,
cacheKey: "testKey4",
cachedData: []querycache.CachedSeriesData{
cachedData: []*SeriesData{
{
Start: 3500,
End: 5000,
Data: []*v3.Series{},
},
},
expectedMiss: []querycache.MissInterval{
expectedMiss: []*MissInterval{
{Start: 2000, End: 3500},
},
},
@@ -104,14 +108,14 @@ func TestFindMissingTimeRanges(t *testing.T) {
requestedEnd: 4000,
step: 60,
cacheKey: "testKey5",
cachedData: []querycache.CachedSeriesData{
cachedData: []*SeriesData{
{
Start: 5000,
End: 6000,
Data: []*v3.Series{},
},
},
expectedMiss: []querycache.MissInterval{
expectedMiss: []*MissInterval{
{Start: 2000, End: 4000},
},
},
@@ -122,12 +126,12 @@ func TestFindMissingTimeRanges(t *testing.T) {
requestedEnd: 5000,
step: 60,
cacheKey: "testKey6",
cachedData: []querycache.CachedSeriesData{
cachedData: []*SeriesData{
{Start: 1100, End: 1200, Data: []*v3.Series{}},
{Start: 1300, End: 1400, Data: []*v3.Series{}},
{Start: 1500, End: 1600, Data: []*v3.Series{}},
},
expectedMiss: []querycache.MissInterval{
expectedMiss: []*MissInterval{
{Start: 1000, End: 1100},
{Start: 1200, End: 1300},
{Start: 1400, End: 1500},
@@ -140,13 +144,13 @@ func TestFindMissingTimeRanges(t *testing.T) {
requestedEnd: 2000,
step: 60,
cacheKey: "testKey7",
cachedData: []querycache.CachedSeriesData{
cachedData: []*SeriesData{
{Start: 1000, End: 1100, Data: []*v3.Series{}},
{Start: 1200, End: 1300, Data: []*v3.Series{}},
{Start: 1400, End: 1500, Data: []*v3.Series{}},
{Start: 1600, End: 1700, Data: []*v3.Series{}},
},
expectedMiss: []querycache.MissInterval{
expectedMiss: []*MissInterval{
{Start: 1100, End: 1200},
{Start: 1300, End: 1400},
{Start: 1500, End: 1600},
@@ -159,11 +163,11 @@ func TestFindMissingTimeRanges(t *testing.T) {
requestedEnd: 3000,
step: 60,
cacheKey: "testKey8",
cachedData: []querycache.CachedSeriesData{
cachedData: []*SeriesData{
{Start: 1000, End: 1500, Data: []*v3.Series{}},
{Start: 3500, End: 4000, Data: []*v3.Series{}},
},
expectedMiss: []querycache.MissInterval{
expectedMiss: []*MissInterval{
{Start: 2000, End: 3000},
},
},
@@ -174,7 +178,7 @@ func TestFindMissingTimeRanges(t *testing.T) {
step: 60,
cacheKey: "testKey10",
cachedData: nil,
expectedMiss: []querycache.MissInterval{
expectedMiss: []*MissInterval{
{Start: 1000, End: 2000},
},
},
@@ -184,13 +188,13 @@ func TestFindMissingTimeRanges(t *testing.T) {
requestedEnd: 5000,
step: 60,
cacheKey: "testKey11",
cachedData: []querycache.CachedSeriesData{
cachedData: []*SeriesData{
{Start: 1000, End: 2000, Data: []*v3.Series{}},
{Start: 1500, End: 2500, Data: []*v3.Series{}}, // Overlaps with previous
{Start: 3000, End: 3500, Data: []*v3.Series{}},
{Start: 4000, End: 4500, Data: []*v3.Series{}},
},
expectedMiss: []querycache.MissInterval{
expectedMiss: []*MissInterval{
{Start: 2500, End: 3000},
{Start: 3500, End: 4000},
{Start: 4500, End: 5000},
@@ -202,11 +206,11 @@ func TestFindMissingTimeRanges(t *testing.T) {
requestedEnd: 5000,
step: 60,
cacheKey: "testKey12",
cachedData: []querycache.CachedSeriesData{
cachedData: []*SeriesData{
{Start: 1000, End: 1500, Data: []*v3.Series{}},
{Start: 4500, End: 5000, Data: []*v3.Series{}},
},
expectedMiss: []querycache.MissInterval{
expectedMiss: []*MissInterval{
{Start: 1500, End: 4500},
},
},
@@ -217,14 +221,13 @@ func TestFindMissingTimeRanges(t *testing.T) {
// Store the cached data in the mock cache
if len(tc.cachedData) > 0 {
cachedDataJSON, err := json.Marshal(tc.cachedData)
assert.NoError(t, err)
err = mockCache.Store(tc.cacheKey, cachedDataJSON, 0)
data := CachedSeriesData{Series: tc.cachedData}
err = c.Store(context.Background(), tc.cacheKey, &data, 0)
assert.NoError(t, err)
}
// Call FindMissingTimeRanges
missingRanges := q.FindMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.step, tc.cacheKey)
missingRanges := qc.FindMissingTimeRanges(context.Background(), tc.requestedStart, tc.requestedEnd, tc.step, tc.cacheKey)
// Verify the missing ranges
assert.Equal(t, tc.expectedMiss, missingRanges)
@@ -234,13 +237,15 @@ func TestFindMissingTimeRanges(t *testing.T) {
func TestFindMissingTimeRangesV2(t *testing.T) {
// Initialize the mock cache
mockCache := inmemory.New(&inmemory.Options{TTL: 5 * time.Minute, CleanupInterval: 10 * time.Minute})
// Create a queryCache instance with the mock cache and a fluxInterval
q := querycache.NewQueryCache(
querycache.WithCache(mockCache),
querycache.WithFluxInterval(0), // Set to zero for testing purposes
)
opts := cache.Memory{
TTL: 5 * time.Minute,
CleanupInterval: 10 * time.Minute,
}
c, err := memorycache.New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: opts})
if err != nil {
t.Errorf("error initialising cache: %v", err)
}
qc := NewQueryCache(WithCache(c), WithFluxInterval(0))
// Define the test cases
testCases := []struct {
@@ -249,8 +254,8 @@ func TestFindMissingTimeRangesV2(t *testing.T) {
requestedEnd int64 // in milliseconds
step int64 // in seconds
cacheKey string
cachedData []querycache.CachedSeriesData
expectedMiss []querycache.MissInterval
cachedData []*SeriesData
expectedMiss []*MissInterval
}{
{
name: "Cached time range is a subset of the requested time range",
@@ -258,14 +263,14 @@ func TestFindMissingTimeRangesV2(t *testing.T) {
requestedEnd: 1738836000000, // 06 Feb 2025 10:00:00
step: 60,
cacheKey: "testKey1",
cachedData: []querycache.CachedSeriesData{
cachedData: []*SeriesData{
{
Start: 1738576800000, // 03 Feb 2025 10:00:00
End: 1738749600000, // 05 Feb 2025 10:00:00
Data: []*v3.Series{}, // Data can be empty for this test
},
},
expectedMiss: []querycache.MissInterval{
expectedMiss: []*MissInterval{
{Start: 1738404000000, End: 1738576800000}, // 01 Feb 2025 10:00:00 - 03 Feb 2025 10:00:00
{Start: 1738749600000, End: 1738836000000}, // 05 Feb 2025 10:00:00 - 06 Feb 2025 10:00:00
},
@@ -276,7 +281,7 @@ func TestFindMissingTimeRangesV2(t *testing.T) {
requestedEnd: 1738749600000, // 05 Feb 2025 10:00:00
step: 60,
cacheKey: "testKey2",
cachedData: []querycache.CachedSeriesData{
cachedData: []*SeriesData{
{
Start: 1738404000000, // 01 Feb 2025 10:00:00
End: 1738836000000, // 06 Feb 2025 10:00:00
@@ -291,14 +296,14 @@ func TestFindMissingTimeRangesV2(t *testing.T) {
requestedEnd: 1738836000000, // 06 Feb 2025 10:00:00
step: 60,
cacheKey: "testKey3",
cachedData: []querycache.CachedSeriesData{
cachedData: []*SeriesData{
{
Start: 1738404000000, // 01 Feb 2025 10:00:00
End: 1738663200000, // 04 Feb 2025 10:00:00
Data: []*v3.Series{},
},
},
expectedMiss: []querycache.MissInterval{
expectedMiss: []*MissInterval{
{Start: 1738663200000, End: 1738836000000}, // 04 Feb 2025 10:00:00 - 06 Feb 2025 10:00:00
},
},
@@ -308,14 +313,14 @@ func TestFindMissingTimeRangesV2(t *testing.T) {
requestedEnd: 1738576800000, // 03 Feb 2025 10:00:00
step: 60,
cacheKey: "testKey4",
cachedData: []querycache.CachedSeriesData{
cachedData: []*SeriesData{
{
Start: 1738490400000, // 02 Feb 2025 10:00:00
End: 1738663200000, // 04 Feb 2025 10:00:00
Data: []*v3.Series{},
},
},
expectedMiss: []querycache.MissInterval{
expectedMiss: []*MissInterval{
{Start: 1738404000000, End: 1738490400000}, // 01 Feb 2025 10:00:00 - 02 Feb 2025 10:00:00
},
},
@@ -325,14 +330,14 @@ func TestFindMissingTimeRangesV2(t *testing.T) {
requestedEnd: 1738576800000, // 03 Feb 2025 10:00:00
step: 60,
cacheKey: "testKey5",
cachedData: []querycache.CachedSeriesData{
cachedData: []*SeriesData{
{
Start: 1738836000000, // 06 Feb 2025 10:00:00
End: 1739008800000, // 08 Feb 2025 10:00:00
Data: []*v3.Series{},
},
},
expectedMiss: []querycache.MissInterval{
expectedMiss: []*MissInterval{
{Start: 1738404000000, End: 1738576800000}, // 01 Feb 2025 10:00:00 - 03 Feb 2025 10:00:00
},
},
@@ -343,7 +348,7 @@ func TestFindMissingTimeRangesV2(t *testing.T) {
requestedEnd: 1738836000000, // 06 Feb 2025 10:00:00
step: 60,
cacheKey: "testKey6",
cachedData: []querycache.CachedSeriesData{
cachedData: []*SeriesData{
{
Start: 1738490400000, // 02 Feb 2025 10:00:00
End: 1738576800000, // 03 Feb 2025 10:00:00
@@ -360,7 +365,7 @@ func TestFindMissingTimeRangesV2(t *testing.T) {
Data: []*v3.Series{},
},
},
expectedMiss: []querycache.MissInterval{
expectedMiss: []*MissInterval{
{Start: 1738404000000, End: 1738490400000}, // 01 Feb 2025 10:00:00 - 02 Feb 2025 10:00:00
{Start: 1738576800000, End: 1738663200000}, // 03 Feb 2025 10:00:00 - 04 Feb 2025 10:00:00
{Start: 1738749600000, End: 1738836000000}, // 05 Feb 2025 10:00:00 - 06 Feb 2025 10:00:00
@@ -372,13 +377,13 @@ func TestFindMissingTimeRangesV2(t *testing.T) {
requestedEnd: 1738490400000, // 02 Feb 2025 10:00:00
step: 60,
cacheKey: "testKey7",
cachedData: []querycache.CachedSeriesData{
cachedData: []*SeriesData{
{Start: 1738404000000, End: 1738418400000, Data: []*v3.Series{}}, // 01 Feb 2025 10:00:00 - 14:00:00
{Start: 1738425600000, End: 1738432800000, Data: []*v3.Series{}}, // 01 Feb 2025 16:00:00 - 18:00:00
{Start: 1738440000000, End: 1738447200000, Data: []*v3.Series{}}, // 01 Feb 2025 20:00:00 - 22:00:00
{Start: 1738454400000, End: 1738461600000, Data: []*v3.Series{}}, // 02 Feb 2025 00:00:00 - 02:00:00
},
expectedMiss: []querycache.MissInterval{
expectedMiss: []*MissInterval{
// {Start: 1738404000000, End: 1738404060000}, // 01 Feb 2025 10:00:00 - 10:01:00
{Start: 1738418400000, End: 1738425600000}, // 01 Feb 2025 14:00:00 - 16:00:00
{Start: 1738432800000, End: 1738440000000}, // 01 Feb 2025 18:00:00 - 20:00:00
@@ -392,11 +397,11 @@ func TestFindMissingTimeRangesV2(t *testing.T) {
requestedEnd: 1738576800000, // 03 Feb 2025 10:00:00
step: 60,
cacheKey: "testKey8",
cachedData: []querycache.CachedSeriesData{
cachedData: []*SeriesData{
{Start: 1738404000000, End: 1738447200000, Data: []*v3.Series{}}, // 01 Feb 2025 10:00:00 - 22:00:00
{Start: 1738620000000, End: 1738663200000, Data: []*v3.Series{}}, // 03 Feb 2025 22:00:00 - 04 Feb 2025 10:00:00
},
expectedMiss: []querycache.MissInterval{
expectedMiss: []*MissInterval{
{Start: 1738490400000, End: 1738576800000}, // 02 Feb 2025 10:00:00 - 03 Feb 2025 10:00:00
},
},
@@ -407,7 +412,7 @@ func TestFindMissingTimeRangesV2(t *testing.T) {
step: 60,
cacheKey: "testKey10",
cachedData: nil,
expectedMiss: []querycache.MissInterval{
expectedMiss: []*MissInterval{
{Start: 1738404000000, End: 1738490400000}, // 01 Feb 2025 10:00:00 - 02 Feb 2025 10:00:00
},
},
@@ -417,13 +422,13 @@ func TestFindMissingTimeRangesV2(t *testing.T) {
requestedEnd: 1738407600000, // 01 Feb 2025 11:00:00
step: 60,
cacheKey: "testKey11",
cachedData: []querycache.CachedSeriesData{
cachedData: []*SeriesData{
{Start: 1738404000000, End: 1738405200000, Data: []*v3.Series{}}, // 01 Feb 2025 10:00:00 - 10:20:00
{Start: 1738404600000, End: 1738405200000, Data: []*v3.Series{}}, // 01 Feb 2025 10:10:00 - 10:20:00
{Start: 1738406100000, End: 1738406700000, Data: []*v3.Series{}}, // 01 Feb 2025 10:35:00 - 10:45:00
{Start: 1738407000000, End: 1738407300000, Data: []*v3.Series{}}, // 01 Feb 2025 10:50:00 - 10:55:00
},
expectedMiss: []querycache.MissInterval{
expectedMiss: []*MissInterval{
{Start: 1738405200000, End: 1738406100000}, // 01 Feb 2025 10:20:00 - 10:35:00
{Start: 1738406700000, End: 1738407000000}, // 01 Feb 2025 10:45:00 - 10:50:00
{Start: 1738407300000, End: 1738407600000}, // 01 Feb 2025 10:55:00 - 11:00:00
@@ -435,11 +440,11 @@ func TestFindMissingTimeRangesV2(t *testing.T) {
requestedEnd: 1738407600000, // 01 Feb 2025 11:00:00
step: 60,
cacheKey: "testKey12",
cachedData: []querycache.CachedSeriesData{
cachedData: []*SeriesData{
{Start: 1738404000000, End: 1738405200000, Data: []*v3.Series{}}, // 01 Feb 2025 10:00:00 - 10:20:00
{Start: 1738406400000, End: 1738407600000, Data: []*v3.Series{}}, // 01 Feb 2025 10:40:00 - 11:00:00
},
expectedMiss: []querycache.MissInterval{
expectedMiss: []*MissInterval{
{Start: 1738405200000, End: 1738406400000}, // 01 Feb 2025 10:20:00 - 10:40:00
},
},
@@ -449,10 +454,10 @@ func TestFindMissingTimeRangesV2(t *testing.T) {
requestedEnd: 1738576800001,
step: 60,
cacheKey: "testKey13",
cachedData: []querycache.CachedSeriesData{
cachedData: []*SeriesData{
{Start: 1738576800000, End: 1738576860000, Data: []*v3.Series{}},
},
expectedMiss: []querycache.MissInterval{{Start: 1738576800000, End: 1738576800001}},
expectedMiss: []*MissInterval{{Start: 1738576800000, End: 1738576800001}},
},
{
name: "requested data is exactly one step or aggregation window",
@@ -460,7 +465,7 @@ func TestFindMissingTimeRangesV2(t *testing.T) {
requestedEnd: 1738576860000,
step: 60,
cacheKey: "testKey13",
cachedData: []querycache.CachedSeriesData{
cachedData: []*SeriesData{
{Start: 1738576800000, End: 1738576860000, Data: []*v3.Series{}},
},
expectedMiss: nil,
@@ -471,14 +476,14 @@ func TestFindMissingTimeRangesV2(t *testing.T) {
requestedEnd: 1738749600000, // 05 Feb 2025 10:00:00
step: 86400, // 24 hours
cacheKey: "testKey13",
cachedData: []querycache.CachedSeriesData{
cachedData: []*SeriesData{
{
Start: 1738540800000, // 03 Feb 2025 00:00:00
End: 1738713600000, // 05 Feb 2025 00:00:00
Data: []*v3.Series{},
},
},
expectedMiss: []querycache.MissInterval{
expectedMiss: []*MissInterval{
{Start: 1738576800000, End: 1738627200000}, // 03 Feb 2025 10:00:00 - 04 Feb 2025 00:00:00
{Start: 1738713600000, End: 1738749600000}, // 05 Feb 2025 00:00:00 - 05 Feb 2025 10:00:00
},
@@ -489,14 +494,14 @@ func TestFindMissingTimeRangesV2(t *testing.T) {
requestedEnd: 1738749600000, // 05 Feb 2025 10:00:00
step: 86400, // 24 hours
cacheKey: "testKey13",
cachedData: []querycache.CachedSeriesData{
cachedData: []*SeriesData{
{
Start: 1738540800000, // 03 Feb 2025 00:00:00
End: 1738713600000, // 05 Feb 2025 00:00:00
Data: []*v3.Series{},
},
},
expectedMiss: []querycache.MissInterval{
expectedMiss: []*MissInterval{
{Start: 1738713600000, End: 1738749600000}, // 05 Feb 2025 00:00:00 - 05 Feb 2025 10:00:00
},
},
@@ -506,14 +511,14 @@ func TestFindMissingTimeRangesV2(t *testing.T) {
requestedEnd: 1738713000000, // 04 Feb 2025 11:50:00
step: 86400, // 24 hours
cacheKey: "testKey13",
cachedData: []querycache.CachedSeriesData{
cachedData: []*SeriesData{
{
Start: 1738540800000, // 03 Feb 2025 00:00:00
End: 1738713600000, // 05 Feb 2025 00:00:00
Data: []*v3.Series{},
},
},
expectedMiss: []querycache.MissInterval{
expectedMiss: []*MissInterval{
{Start: 1738541400000, End: 1738713000000}, // 03 Feb 2025 00:10:00 - 04 Feb 2025 11:50:00
},
},
@@ -523,14 +528,14 @@ func TestFindMissingTimeRangesV2(t *testing.T) {
requestedEnd: 1738713000000, // 04 Feb 2025 11:50:00
step: 86400, // 24 hours
cacheKey: "testKey13",
cachedData: []querycache.CachedSeriesData{
cachedData: []*SeriesData{
{
Start: 1738540800000, // 03 Feb 2025 00:00:00
End: 1738713600000, // 05 Feb 2025 00:00:00
Data: []*v3.Series{},
},
},
expectedMiss: []querycache.MissInterval{
expectedMiss: []*MissInterval{
{Start: 1738411859000, End: 1738540800000}, // 01 Feb 2025 00:10:00 - 03 Feb 2025 00:00:00
{Start: 1738627200000, End: 1738713000000}, // 04 Feb 2025 00:00:00 - 04 Feb 2025 11:50:00
},
@@ -541,14 +546,14 @@ func TestFindMissingTimeRangesV2(t *testing.T) {
requestedEnd: 1738713600000, // 05 Feb 2025 00:00:00
step: 86400, // 24 hours
cacheKey: "testKey13",
cachedData: []querycache.CachedSeriesData{
cachedData: []*SeriesData{
{
Start: 1738540800000, // 03 Feb 2025 00:00:00
End: 1738713600000, // 05 Feb 2025 00:00:00
Data: []*v3.Series{},
},
},
expectedMiss: []querycache.MissInterval{
expectedMiss: []*MissInterval{
{Start: 1738498255000, End: 1738540800000}, // 03 Feb 2025 00:10:00 - 03 Feb 2025 00:00:00
},
},
@@ -559,14 +564,14 @@ func TestFindMissingTimeRangesV2(t *testing.T) {
// Store the cached data in the mock cache
if len(tc.cachedData) > 0 {
cachedDataJSON, err := json.Marshal(tc.cachedData)
data := CachedSeriesData{Series: tc.cachedData}
err = c.Store(context.Background(), tc.cacheKey, &data, 0)
assert.NoError(t, err)
err = mockCache.Store(tc.cacheKey, cachedDataJSON, 0)
assert.NoError(t, err)
}
// Call FindMissingTimeRanges
missingRanges := q.FindMissingTimeRangesV2(tc.requestedStart, tc.requestedEnd, tc.step, tc.cacheKey)
missingRanges := qc.FindMissingTimeRangesV2(context.Background(), tc.requestedStart, tc.requestedEnd, tc.step, tc.cacheKey)
// Verify the missing ranges
assert.Equal(t, tc.expectedMiss, missingRanges)
@@ -576,19 +581,21 @@ func TestFindMissingTimeRangesV2(t *testing.T) {
func TestMergeWithCachedSeriesData(t *testing.T) {
// Initialize the mock cache
mockCache := inmemory.New(&inmemory.Options{TTL: 5 * time.Minute, CleanupInterval: 10 * time.Minute})
// Create a queryCache instance with the mock cache and a fluxInterval
q := querycache.NewQueryCache(
querycache.WithCache(mockCache),
querycache.WithFluxInterval(0), // Set to zero for testing purposes
)
opts := cache.Memory{
TTL: 5 * time.Minute,
CleanupInterval: 10 * time.Minute,
}
c, err := memorycache.New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: opts})
if err != nil {
t.Errorf("error initialising cache: %v", err)
}
qc := NewQueryCache(WithCache(c), WithFluxInterval(0))
// Define test data
cacheKey := "mergeTestKey"
// Existing cached data
existingData := []querycache.CachedSeriesData{
existingData := []*SeriesData{
{
Start: 1000,
End: 2000,
@@ -604,7 +611,7 @@ func TestMergeWithCachedSeriesData(t *testing.T) {
}
// New data to merge
newData := []querycache.CachedSeriesData{
newData := []*SeriesData{
{
Start: 1500,
End: 2500,
@@ -626,7 +633,7 @@ func TestMergeWithCachedSeriesData(t *testing.T) {
}
// Expected merged data
expectedMergedData := []querycache.CachedSeriesData{
expectedMergedData := []*SeriesData{
{
Start: 1000,
End: 2500,
@@ -649,13 +656,12 @@ func TestMergeWithCachedSeriesData(t *testing.T) {
}
// Store existing data in cache
cachedDataJSON, err := json.Marshal(existingData)
assert.NoError(t, err)
err = mockCache.Store(cacheKey, cachedDataJSON, 0)
data := CachedSeriesData{Series: existingData}
err = c.Store(context.Background(), cacheKey, &data, 0)
assert.NoError(t, err)
// Call MergeWithCachedSeriesData
mergedData := q.MergeWithCachedSeriesData(cacheKey, newData)
mergedData := qc.MergeWithCachedSeriesData(context.Background(), cacheKey, newData)
// Verify the merged data
assert.Equal(t, len(expectedMergedData), len(mergedData))