Compare commits


33 Commits

Author SHA1 Message Date
Nityananda Gohain
ae6bbc7192 Merge branch 'main' into issue_7183_filter 2025-03-06 20:10:47 +05:30
nityanandagohain
da627b9779 fix: move function to common query range 2025-03-06 20:09:49 +05:30
nityanandagohain
5bd30af3f7 Merge remote-tracking branch 'origin/issue_7183_filter' into issue_7183_filter 2025-03-06 18:36:35 +05:30
nityanandagohain
29f72451d8 fix: address comments 2025-03-06 18:36:20 +05:30
nityanandagohain
85fe1a2a18 fix: added comments 2025-03-06 18:33:30 +05:30
Nityananda Gohain
2115656a5b Merge branch 'main' into issue_7183_filter 2025-03-06 18:31:07 +05:30
nityanandagohain
c9da6006db fix: handle case where end is equal to a complete window end 2025-03-06 18:29:49 +05:30
Nityananda Gohain
efe86b0a00 Merge branch 'main' into issue_7183_filter 2025-03-06 14:15:26 +05:30
nityanandagohain
52780a7ad9 fix: add error log 2025-03-06 14:14:48 +05:30
Nityananda Gohain
44b46c089b Update pkg/query-service/common/query_range.go
Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com>
2025-03-06 14:13:16 +05:30
nityanandagohain
ca65b4148c fix: address comments 2025-03-06 14:12:09 +05:30
nityanandagohain
064a522293 fix: address comments 2025-03-06 14:08:02 +05:30
nityanandagohain
8563bcdacf fix: address comments 2025-03-06 14:06:17 +05:30
Nityananda Gohain
727cd7747b Update pkg/query-service/app/querier/v2/helper.go
Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com>
2025-03-06 11:30:43 +05:30
nityanandagohain
a7ff27ef07 fix: name updated 2025-03-06 11:28:14 +05:30
nityanandagohain
f61e33aa23 fix: update logic to handle actual empty series 2025-03-06 11:26:19 +05:30
nityanandagohain
5bf79edb8b fix: tests 2025-03-05 20:29:23 +05:30
nityanandagohain
3e2c23d015 Merge remote-tracking branch 'origin/main' into issue_7183_filter 2025-03-05 20:27:57 +05:30
nityanandagohain
c6bd1dd283 fix: use step ms 2025-03-05 16:14:49 +05:30
nityanandagohain
51b4c8d85b Merge remote-tracking branch 'origin/issue_7183' into issue_7183_filter 2025-03-05 16:13:56 +05:30
nityanandagohain
697f16743f fix: use step ms 2025-03-05 16:13:38 +05:30
Nityananda Gohain
0f4e4473ef Update pkg/query-service/querycache/query_range_cache.go
Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com>
2025-03-05 16:11:08 +05:30
nityanandagohain
4eb2e0b97b Merge remote-tracking branch 'origin/issue_7183' into issue_7183_filter 2025-03-05 16:08:08 +05:30
Nityananda Gohain
8e5526c66c Merge branch 'main' into issue_7183 2025-03-05 16:07:06 +05:30
nityanandagohain
423561f652 fix: add comments 2025-03-05 16:06:13 +05:30
nityanandagohain
dc61db6936 fix: fix overlapping test case 2025-03-05 15:55:23 +05:30
nityanandagohain
e9bba641bc fix: fix the logic to use the points correctly 2025-03-05 15:40:54 +05:30
nityanandagohain
fbc4e50136 fix: filter points which are not a complete agg interval 2025-03-04 18:54:55 +05:30
nityanandagohain
6049ba194a fix: correct name 2025-03-04 15:40:47 +05:30
nityanandagohain
068126db40 fix: update logic and the test cases 2025-03-04 14:03:43 +05:30
nityanandagohain
4b87ac6424 fix: update if condition 2025-03-04 10:41:34 +05:30
nityanandagohain
f47a4207a9 fix: remove unwanted code 2025-03-04 01:08:02 +05:30
nityanandagohain
657240c71b fix: new implementation for finding missing timerange 2025-03-04 00:02:01 +05:30
6 changed files with 602 additions and 12 deletions

View File

@@ -122,6 +122,7 @@ func (q *querier) runBuilderQuery(
misses := q.queryCache.FindMissingTimeRanges(start, end, builderQuery.StepInterval, cacheKeys[queryName])
zap.L().Info("cache misses for logs query", zap.Any("misses", misses))
missedSeries := make([]querycache.CachedSeriesData, 0)
+filteredMissedSeries := make([]querycache.CachedSeriesData, 0)
for _, miss := range misses {
query, err = prepareLogsQuery(ctx, q.UseLogsNewSchema, miss.Start, miss.End, builderQuery, params, preferRPM)
if err != nil {
@@ -138,15 +139,32 @@ func (q *querier) runBuilderQuery(
}
return
}
+filteredSeries, startTime, endTime := common.FilterSeriesPoints(series, miss.Start, miss.End, builderQuery.StepInterval)
+// making sure that an empty range doesn't enter the cache:
+// an empty filteredSeries means the data was filtered out, while an empty series means genuinely empty data
+if len(filteredSeries) > 0 || len(series) == 0 {
+filteredMissedSeries = append(filteredMissedSeries, querycache.CachedSeriesData{
+Data: filteredSeries,
+Start: startTime,
+End: endTime,
+})
+}
+// for the actual response
missedSeries = append(missedSeries, querycache.CachedSeriesData{
+Data: series,
Start: miss.Start,
End: miss.End,
-Data: series,
})
}
-mergedSeries := q.queryCache.MergeWithCachedSeriesData(cacheKeys[queryName], missedSeries)
-resultSeries := common.GetSeriesFromCachedData(mergedSeries, start, end)
+filteredMergedSeries := q.queryCache.MergeWithCachedSeriesDataV2(cacheKeys[queryName], filteredMissedSeries)
+q.queryCache.StoreSeriesInCache(cacheKeys[queryName], filteredMergedSeries)
+mergedSeries := q.queryCache.MergeWithCachedSeriesDataV2(cacheKeys[queryName], missedSeries)
+resultSeries := common.GetSeriesFromCachedDataV2(mergedSeries, start, end, builderQuery.StepInterval)
ch <- channelResult{
Err: nil,

View File

@@ -119,9 +119,10 @@ func (q *querier) runBuilderQuery(
ch <- channelResult{Err: err, Name: queryName, Query: query, Series: series}
return
}
-misses := q.queryCache.FindMissingTimeRanges(start, end, builderQuery.StepInterval, cacheKeys[queryName])
+misses := q.queryCache.FindMissingTimeRangesV2(start, end, builderQuery.StepInterval, cacheKeys[queryName])
zap.L().Info("cache misses for logs query", zap.Any("misses", misses))
missedSeries := make([]querycache.CachedSeriesData, 0)
+filteredMissedSeries := make([]querycache.CachedSeriesData, 0)
for _, miss := range misses {
query, err = prepareLogsQuery(ctx, q.UseLogsNewSchema, miss.Start, miss.End, builderQuery, params, preferRPM)
if err != nil {
@@ -138,15 +139,33 @@ func (q *querier) runBuilderQuery(
}
return
}
+filteredSeries, startTime, endTime := common.FilterSeriesPoints(series, miss.Start, miss.End, builderQuery.StepInterval)
+// making sure that an empty range doesn't enter the cache:
+// an empty filteredSeries means the data was filtered out, while an empty series means genuinely empty data
+if len(filteredSeries) > 0 || len(series) == 0 {
+filteredMissedSeries = append(filteredMissedSeries, querycache.CachedSeriesData{
+Data: filteredSeries,
+Start: startTime,
+End: endTime,
+})
+}
+// for the actual response
missedSeries = append(missedSeries, querycache.CachedSeriesData{
Data: series,
Start: miss.Start,
End: miss.End,
})
}
-mergedSeries := q.queryCache.MergeWithCachedSeriesData(cacheKeys[queryName], missedSeries)
-resultSeries := common.GetSeriesFromCachedData(mergedSeries, start, end)
+filteredMergedSeries := q.queryCache.MergeWithCachedSeriesDataV2(cacheKeys[queryName], filteredMissedSeries)
+q.queryCache.StoreSeriesInCache(cacheKeys[queryName], filteredMergedSeries)
+mergedSeries := q.queryCache.MergeWithCachedSeriesDataV2(cacheKeys[queryName], missedSeries)
+resultSeries := common.GetSeriesFromCachedDataV2(mergedSeries, start, end, builderQuery.StepInterval)
ch <- channelResult{
Err: nil,
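
Taken together, the hunks above give the querier two tracks per cache miss: the raw series answer the request as before, while a filtered copy, trimmed to complete aggregation windows by common.FilterSeriesPoints, is merged and persisted via StoreSeriesInCache. A minimal, self-contained sketch of that shape (the types and the filtering stub are simplified stand-ins for the real v3/querycache definitions, not the actual SigNoz implementation):

package main

import "fmt"

// Simplified stand-ins for v3.Point / v3.Series / querycache.CachedSeriesData.
type Point struct {
	Timestamp int64 // milliseconds
	Value     float64
}

type Series struct{ Points []Point }

// filterSeriesPoints approximates common.FilterSeriesPoints: it snaps the
// miss range to complete aggregation windows and keeps only the points
// inside the snapped range.
func filterSeriesPoints(series []Series, missStart, missEnd, stepMs int64) ([]Series, int64, int64) {
	start, end := missStart, missEnd
	if missStart%stepMs != 0 {
		start = missStart + stepMs - missStart%stepMs // round up to the next window start
	}
	if missEnd%stepMs != 0 {
		end = missEnd - missEnd%stepMs // round down to the previous window end
	}
	out := make([]Series, 0, len(series))
	for _, s := range series {
		kept := Series{}
		for _, p := range s.Points {
			if p.Timestamp >= start && p.Timestamp < end {
				kept.Points = append(kept.Points, p)
			}
		}
		out = append(out, kept)
	}
	return out, start, end
}

func main() {
	stepMs := int64(3600_000)             // 1h step
	missStart := int64(1_609_464_600_000) // 01 Jan 2021 01:30:00 UTC
	missEnd := int64(1_609_471_800_000)   // 01 Jan 2021 03:30:00 UTC
	series := []Series{{Points: []Point{
		{Timestamp: 1_609_462_800_000, Value: 2}, // 01:00, partial window
		{Timestamp: 1_609_466_400_000, Value: 3}, // 02:00, complete window
		{Timestamp: 1_609_470_000_000, Value: 4}, // 03:00, partial window
	}}}

	// cache track: only the complete 02:00-03:00 window is persisted
	filtered, s, e := filterSeriesPoints(series, missStart, missEnd, stepMs)
	fmt.Println("cache   :", filtered[0].Points, s, e)
	// response track: the caller still receives every point of the miss
	fmt.Println("response:", series[0].Points, missStart, missEnd)
}
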

View File

@@ -3,6 +3,7 @@ package common
import (
"math"
"regexp"
"sort"
"time"
"unicode"
@@ -123,3 +124,108 @@ func GetSeriesFromCachedData(data []querycache.CachedSeriesData, start, end int6
}
return newSeries
}
// It differs from GetSeriesFromCachedData in that it does not remove a point whose timestamp is >= (start - (start % (step * 1000)))
func GetSeriesFromCachedDataV2(data []querycache.CachedSeriesData, start, end, step int64) []*v3.Series {
series := make(map[uint64]*v3.Series)
for _, cachedData := range data {
for _, data := range cachedData.Data {
h := labels.FromMap(data.Labels).Hash()
if _, ok := series[h]; !ok {
series[h] = &v3.Series{
Labels: data.Labels,
LabelsArray: data.LabelsArray,
Points: make([]v3.Point, 0),
}
}
for _, point := range data.Points {
if point.Timestamp >= (start-(start%(step*1000))) && point.Timestamp <= end {
series[h].Points = append(series[h].Points, point)
}
}
}
}
newSeries := make([]*v3.Series, 0, len(series))
for _, s := range series {
s.SortPoints()
s.RemoveDuplicatePoints()
newSeries = append(newSeries, s)
}
return newSeries
}
// FilterSeriesPoints trims points that do not fall inside complete aggregation windows, so that only complete windows are stored in the cache
func FilterSeriesPoints(seriesList []*v3.Series, missStart, missEnd int64, stepInterval int64) ([]*v3.Series, int64, int64) {
filteredSeries := make([]*v3.Series, 0)
startTime := missStart
endTime := missEnd
stepMs := stepInterval * 1000
// return an empty result if the miss range does not span even one complete aggregation window
if missStart+stepMs > missEnd {
return []*v3.Series{}, missStart, missEnd
}
// if the end time is not a complete aggregation window, then we will have to adjust the end time
// to the previous complete aggregation window end
endCompleteWindow := missEnd%stepMs == 0
if !endCompleteWindow {
endTime = missEnd - (missEnd % stepMs)
}
// if the start time is not a complete aggregation window, then we will have to adjust the start time
// to the next complete aggregation window
if missStart%stepMs != 0 {
startTime = missStart + stepMs - (missStart % stepMs)
}
for _, series := range seriesList {
// if the series has no points, keep it so that the genuinely empty result is cached
if len(series.Points) == 0 {
filteredSeries = append(filteredSeries, &v3.Series{
Labels: series.Labels,
LabelsArray: series.LabelsArray,
Points: make([]v3.Point, 0),
})
continue
}
// Sort the points based on timestamp
sort.Slice(series.Points, func(i, j int) bool {
return series.Points[i].Timestamp < series.Points[j].Timestamp
})
points := make([]v3.Point, len(series.Points))
copy(points, series.Points)
// Filter out the first point if it does not cover a complete aggregation window
if series.Points[0].Timestamp < missStart {
// Remove the first point
points = points[1:]
}
// filter out the last point if it does not cover a complete aggregation window;
// the OR condition handles the case where the end time equals a complete window end: https://github.com/SigNoz/signoz/pull/7212#issuecomment-2703677190
if (!endCompleteWindow && series.Points[len(series.Points)-1].Timestamp == missEnd-(missEnd%stepMs)) ||
(endCompleteWindow && series.Points[len(series.Points)-1].Timestamp == missEnd) {
// Remove the last point
points = points[:len(points)-1]
}
// making sure that an empty range doesn't enter the cache
if len(points) > 0 {
filteredSeries = append(filteredSeries, &v3.Series{
Labels: series.Labels,
LabelsArray: series.LabelsArray,
Points: points,
})
}
}
return filteredSeries, startTime, endTime
}
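
The difference between the two retrieval bounds is easiest to see with concrete numbers. A small sketch of the cutoff arithmetic (keepV1 and keepV2 are hypothetical helper names; the real functions walk cached series, but the point-retention checks are the same):

package main

import "fmt"

// keepV1 mirrors GetSeriesFromCachedData's bound: a point survives only
// if it lies inside [start, end].
func keepV1(ts, start, end int64) bool {
	return ts >= start && ts <= end
}

// keepV2 mirrors GetSeriesFromCachedDataV2's bound: the lower cutoff is
// snapped down to the start of the aggregation window containing `start`,
// so a cached point for a window that began just before the request is kept.
func keepV2(ts, start, end, stepMs int64) bool {
	return ts >= start-start%stepMs && ts <= end
}

func main() {
	stepMs := int64(3600_000)
	start := int64(1_609_475_400_000) // 01 Jan 2021 04:30:00 UTC
	end := int64(1_609_477_200_000)   // 01 Jan 2021 05:00:00 UTC
	ts := int64(1_609_473_600_000)    // 01 Jan 2021 04:00:00 UTC, window start

	fmt.Println(keepV1(ts, start, end))         // false: 04:00 < 04:30
	fmt.Println(keepV2(ts, start, end, stepMs)) // true: cutoff snaps to 04:00
}
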

View File

@@ -0,0 +1,435 @@
package common
import (
"testing"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/querycache"
)
func TestFilterSeriesPoints(t *testing.T) {
// Define test cases
testCases := []struct {
name string
seriesList []*v3.Series
missStart int64 // in milliseconds
missEnd int64 // in milliseconds
stepInterval int64 // in seconds
expectedPoints []*v3.Series
expectedStart int64 // in milliseconds
expectedEnd int64 // in milliseconds
}{
{
name: "Complete aggregation window",
missStart: 1609459200000, // 01 Jan 2021 00:00:00 UTC
missEnd: 1609466400000, // 01 Jan 2021 02:00:00 UTC
stepInterval: 3600, // 1 hour
seriesList: []*v3.Series{
{
Points: []v3.Point{
{Timestamp: 1609459200000, Value: 1.0}, // 01 Jan 2021 00:00:00 UTC
{Timestamp: 1609462800000, Value: 2.0}, // 01 Jan 2021 01:00:00 UTC
{Timestamp: 1609466400000, Value: 3.0}, // 01 Jan 2021 02:00:00 UTC
},
},
},
expectedPoints: []*v3.Series{
{
Points: []v3.Point{
{Timestamp: 1609459200000, Value: 1.0},
{Timestamp: 1609462800000, Value: 2.0},
},
},
},
expectedStart: 1609459200000,
expectedEnd: 1609466400000,
},
{
name: "Filter first point",
missStart: 1609464600000, // 01 Jan 2021 01:30:00 UTC
missEnd: 1609470000000, // 01 Jan 2021 03:00:00 UTC
stepInterval: 3600, // 1 hour
seriesList: []*v3.Series{
{
Points: []v3.Point{
{Timestamp: 1609462800000, Value: 2.0}, // 01 Jan 2021 01:00:00 UTC
{Timestamp: 1609466400000, Value: 3.0}, // 01 Jan 2021 02:00:00 UTC
},
},
},
expectedPoints: []*v3.Series{
{
Points: []v3.Point{
{Timestamp: 1609466400000, Value: 3.0},
},
},
},
expectedStart: 1609466400000,
expectedEnd: 1609470000000,
},
{
name: "Filter last point",
missStart: 1609466400000, // 01 Jan 2021 02:00:00 UTC
missEnd: 1609471800000, // 01 Jan 2021 03:30:00 UTC
stepInterval: 3600, // 1 hour
seriesList: []*v3.Series{
{
Points: []v3.Point{
{Timestamp: 1609466400000, Value: 3.0}, // 01 Jan 2021 02:00:00 UTC
{Timestamp: 1609470000000, Value: 3.0}, // 01 Jan 2021 03:00:00 UTC
},
},
},
expectedPoints: []*v3.Series{
{
Points: []v3.Point{
{Timestamp: 1609466400000, Value: 3.0},
},
},
},
expectedStart: 1609466400000,
expectedEnd: 1609470000000,
},
{
name: "Incomplete aggregation window",
missStart: 1609470000000, // 01 Jan 2021 03:00:00 UTC
missEnd: 1609471800000, // 01 Jan 2021 03:30:00 UTC
stepInterval: 3600, // 1 hour
seriesList: []*v3.Series{
{
Points: []v3.Point{},
},
},
expectedPoints: []*v3.Series{},
expectedStart: 1609470000000,
expectedEnd: 1609471800000,
},
{
name: "Filter first point with multiple series",
missStart: 1609464600000, // 01 Jan 2021 01:30:00 UTC
missEnd: 1609477200000, // 01 Jan 2021 05:00:00 UTC
stepInterval: 3600, // 1 hour
seriesList: []*v3.Series{
{
Points: []v3.Point{
{Timestamp: 1609462800000, Value: 2.0}, // 01 Jan 2021 01:00:00 UTC
{Timestamp: 1609466400000, Value: 3.0}, // 01 Jan 2021 02:00:00 UTC
{Timestamp: 1609470000000, Value: 4.0}, // 01 Jan 2021 03:00:00 UTC
{Timestamp: 1609473600000, Value: 5.0}, // 01 Jan 2021 04:00:00 UTC
},
},
{
Points: []v3.Point{
{Timestamp: 1609466400000, Value: 6.0}, // 01 Jan 2021 02:00:00 UTC
{Timestamp: 1609470000000, Value: 7.0}, // 01 Jan 2021 03:00:00 UTC
{Timestamp: 1609473600000, Value: 8.0}, // 01 Jan 2021 04:00:00 UTC
},
},
{
Points: []v3.Point{
{Timestamp: 1609466400000, Value: 9.0}, // 01 Jan 2021 02:00:00 UTC
{Timestamp: 1609470000000, Value: 10.0}, // 01 Jan 2021 03:00:00 UTC
{Timestamp: 1609473600000, Value: 11.0}, // 01 Jan 2021 04:00:00 UTC
},
},
},
expectedPoints: []*v3.Series{
{
Points: []v3.Point{
{Timestamp: 1609466400000, Value: 3.0}, // 01 Jan 2021 02:00:00 UTC
{Timestamp: 1609470000000, Value: 4.0}, // 01 Jan 2021 03:00:00 UTC
{Timestamp: 1609473600000, Value: 5.0}, // 01 Jan 2021 04:00:00 UTC
},
},
{
Points: []v3.Point{
{Timestamp: 1609466400000, Value: 6.0}, // 01 Jan 2021 02:00:00 UTC
{Timestamp: 1609470000000, Value: 7.0}, // 01 Jan 2021 03:00:00 UTC
{Timestamp: 1609473600000, Value: 8.0}, // 01 Jan 2021 04:00:00 UTC
},
},
{
Points: []v3.Point{
{Timestamp: 1609466400000, Value: 9.0}, // 01 Jan 2021 02:00:00 UTC
{Timestamp: 1609470000000, Value: 10.0}, // 01 Jan 2021 03:00:00 UTC
{Timestamp: 1609473600000, Value: 11.0}, // 01 Jan 2021 04:00:00 UTC
},
},
},
expectedStart: 1609466400000,
expectedEnd: 1609477200000,
},
{
name: "Filter last point",
missStart: 1609466400000, // 01 Jan 2021 02:00:00 UTC
missEnd: 1609475400000, // 01 Jan 2021 04:30:00 UTC
stepInterval: 3600, // 1 hour
seriesList: []*v3.Series{
{
Points: []v3.Point{
{Timestamp: 1609466400000, Value: 3.0}, // 01 Jan 2021 02:00:00 UTC
{Timestamp: 1609470000000, Value: 4.0}, // 01 Jan 2021 03:00:00 UTC
{Timestamp: 1609473600000, Value: 5.0}, // 01 Jan 2021 04:00:00 UTC
},
},
{
Points: []v3.Point{
{Timestamp: 1609466400000, Value: 6.0}, // 01 Jan 2021 02:00:00 UTC
},
},
{
Points: []v3.Point{
{Timestamp: 1609466400000, Value: 9.0}, // 01 Jan 2021 02:00:00 UTC
{Timestamp: 1609470000000, Value: 10.0}, // 01 Jan 2021 03:00:00 UTC
},
},
},
expectedPoints: []*v3.Series{
{
Points: []v3.Point{
{Timestamp: 1609466400000, Value: 3.0}, // 01 Jan 2021 02:00:00 UTC
{Timestamp: 1609470000000, Value: 4.0}, // 01 Jan 2021 03:00:00 UTC
},
},
{
Points: []v3.Point{
{Timestamp: 1609466400000, Value: 6.0}, // 01 Jan 2021 02:00:00 UTC
},
},
{
Points: []v3.Point{
{Timestamp: 1609466400000, Value: 9.0}, // 01 Jan 2021 02:00:00 UTC
{Timestamp: 1609470000000, Value: 10.0}, // 01 Jan 2021 03:00:00 UTC
},
},
},
expectedStart: 1609466400000,
expectedEnd: 1609473600000,
},
{
name: "half range should return empty result",
missStart: 1609473600000, // 01 Jan 2021 04:00:00 UTC
missEnd: 1609475400000, // 01 Jan 2021 04:30:00 UTC
stepInterval: 3600, // 1 hour
seriesList: []*v3.Series{
{
Points: []v3.Point{
{Timestamp: 1609473600000, Value: 1.0}, // 01 Jan 2021 04:00:00 UTC
},
},
},
expectedPoints: []*v3.Series{},
expectedStart: 1609473600000,
expectedEnd: 1609475400000,
},
{
name: "respect actual empty series",
missStart: 1609466400000, // 01 Jan 2021 02:00:00 UTC
missEnd: 1609475400000, // 01 Jan 2021 04:30:00 UTC
stepInterval: 3600, // 1 hour
seriesList: []*v3.Series{
{
Points: []v3.Point{},
},
},
expectedPoints: []*v3.Series{
{
Points: []v3.Point{},
},
},
expectedStart: 1609466400000,
expectedEnd: 1609473600000,
},
{
name: "Remove point that is not a complete aggregation window",
missStart: 1609466400000, // 01 Jan 2021 02:00:00 UTC
missEnd: 1609470000000, // 01 Jan 2021 03:00:00 UTC
stepInterval: 3600, // 1 hour
seriesList: []*v3.Series{
{
Points: []v3.Point{
{Timestamp: 1609466400000, Value: 2.0}, // 01 Jan 2021 02:00:00 UTC
{Timestamp: 1609470000000, Value: 3.0}, // 01 Jan 2021 03:00:00 UTC
},
},
},
expectedPoints: []*v3.Series{
{
Points: []v3.Point{
{Timestamp: 1609466400000, Value: 2.0},
},
},
},
expectedStart: 1609466400000,
expectedEnd: 1609470000000,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
filteredSeries, startTime, endTime := FilterSeriesPoints(tc.seriesList, tc.missStart, tc.missEnd, tc.stepInterval)
if len(tc.expectedPoints) != len(filteredSeries) {
t.Errorf("Expected %d series, got %d", len(tc.expectedPoints), len(filteredSeries))
return
}
for i := range tc.expectedPoints {
if len(tc.expectedPoints[i].Points) != len(filteredSeries[i].Points) {
t.Errorf("Series %d: Expected %d points, got %d\nExpected points: %+v\nGot points: %+v",
i,
len(tc.expectedPoints[i].Points),
len(filteredSeries[i].Points),
tc.expectedPoints[i].Points,
filteredSeries[i].Points)
continue
}
for j := range tc.expectedPoints[i].Points {
if tc.expectedPoints[i].Points[j].Timestamp != filteredSeries[i].Points[j].Timestamp {
t.Errorf("Series %d Point %d: Expected timestamp %d, got %d", i, j, tc.expectedPoints[i].Points[j].Timestamp, filteredSeries[i].Points[j].Timestamp)
}
if tc.expectedPoints[i].Points[j].Value != filteredSeries[i].Points[j].Value {
t.Errorf("Series %d Point %d: Expected value %f, got %f", i, j, tc.expectedPoints[i].Points[j].Value, filteredSeries[i].Points[j].Value)
}
}
}
if tc.expectedStart != startTime {
t.Errorf("Expected start time %d, got %d", tc.expectedStart, startTime)
}
if tc.expectedEnd != endTime {
t.Errorf("Expected end time %d, got %d", tc.expectedEnd, endTime)
}
})
}
}
func TestGetSeriesFromCachedData(t *testing.T) {
testCases := []struct {
name string
data []querycache.CachedSeriesData
start int64
end int64
expectedCount int
expectedPoints int
}{
{
name: "Single point outside range",
data: []querycache.CachedSeriesData{
{
Data: []*v3.Series{
{
Labels: map[string]string{"label1": "value1"},
Points: []v3.Point{
{Timestamp: 1609473600000, Value: 1.0},
},
},
},
},
},
start: 1609475400000, // 01 Jan 2021 04:30:00 UTC
end: 1609477200000, // 01 Jan 2021 05:00:00 UTC
expectedCount: 1,
expectedPoints: 0,
},
{
name: "Single point inside range",
data: []querycache.CachedSeriesData{
{
Data: []*v3.Series{
{
Labels: map[string]string{"label1": "value1"},
Points: []v3.Point{
{Timestamp: 1609476000000, Value: 1.0},
},
},
},
},
},
start: 1609475400000, // 01 Jan 2021 04:30:00 UTC
end: 1609477200000, // 01 Jan 2021 05:00:00 UTC
expectedCount: 1,
expectedPoints: 1,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
series := GetSeriesFromCachedData(tc.data, tc.start, tc.end)
if len(series) != tc.expectedCount {
t.Errorf("Expected %d series, got %d", tc.expectedCount, len(series))
}
if len(series[0].Points) != tc.expectedPoints {
t.Errorf("Expected %d points, got %d", tc.expectedPoints, len(series[0].Points))
}
})
}
}
func TestGetSeriesFromCachedDataV2(t *testing.T) {
testCases := []struct {
name string
data []querycache.CachedSeriesData
start int64
end int64
step int64
expectedCount int
expectedPoints int
}{
{
name: "Single point outside range",
data: []querycache.CachedSeriesData{
{
Data: []*v3.Series{
{
Labels: map[string]string{"label1": "value1"},
Points: []v3.Point{
{Timestamp: 1609473600000, Value: 1.0},
},
},
},
},
},
start: 1609475400000,
end: 1609477200000,
step: 1000,
expectedCount: 1,
expectedPoints: 0,
},
{
name: "Single point inside range",
data: []querycache.CachedSeriesData{
{
Data: []*v3.Series{
{
Labels: map[string]string{"label1": "value1"},
Points: []v3.Point{
{Timestamp: 1609476000000, Value: 1.0},
},
},
},
},
},
start: 1609475400000,
end: 1609477200000,
step: 1000,
expectedCount: 1,
expectedPoints: 1,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
series := GetSeriesFromCachedDataV2(tc.data, tc.start, tc.end, tc.step)
if len(series) != tc.expectedCount {
t.Errorf("Expected %d series, got %d", tc.expectedCount, len(series))
}
if len(series[0].Points) != tc.expectedPoints {
t.Errorf("Expected %d points, got %d", tc.expectedPoints, len(series[0].Points))
}
})
}
}

View File

@@ -149,4 +149,6 @@ type QueryCache interface {
FindMissingTimeRanges(start, end int64, step int64, cacheKey string) []querycache.MissInterval
FindMissingTimeRangesV2(start, end int64, step int64, cacheKey string) []querycache.MissInterval
MergeWithCachedSeriesData(cacheKey string, newData []querycache.CachedSeriesData) []querycache.CachedSeriesData
+StoreSeriesInCache(cacheKey string, series []querycache.CachedSeriesData)
+MergeWithCachedSeriesDataV2(cacheKey string, series []querycache.CachedSeriesData) []querycache.CachedSeriesData
}
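
The interface change makes merging and writing independently callable, which is what the querier's two-track flow relies on. A compact sketch of why that matters (the types here are pared-down stand-ins, not the real querycache signatures):

package main

import "fmt"

type CachedSeriesData struct{ Start, End int64 }

// QueryCache is a minimal stand-in for the real interface: the V2 merge
// no longer writes to the cache, so the caller decides what gets persisted.
type QueryCache interface {
	MergeWithCachedSeriesDataV2(cacheKey string, newData []CachedSeriesData) []CachedSeriesData
	StoreSeriesInCache(cacheKey string, series []CachedSeriesData)
}

type memCache struct{ stored map[string][]CachedSeriesData }

func (m *memCache) MergeWithCachedSeriesDataV2(key string, newData []CachedSeriesData) []CachedSeriesData {
	// toy merge: append the new data to whatever is cached
	return append(m.stored[key], newData...)
}

func (m *memCache) StoreSeriesInCache(key string, series []CachedSeriesData) {
	m.stored[key] = series
}

func main() {
	var qc QueryCache = &memCache{stored: map[string][]CachedSeriesData{}}
	raw := []CachedSeriesData{{Start: 1000, End: 2500}}      // full miss range
	filtered := []CachedSeriesData{{Start: 1000, End: 2000}} // complete windows only

	// persist only the filtered track...
	qc.StoreSeriesInCache("key", qc.MergeWithCachedSeriesDataV2("key", filtered))
	// ...while the raw track is merged for the response and never stored
	response := qc.MergeWithCachedSeriesDataV2("key", raw)
	fmt.Println(response) // [{1000 2000} {1000 2500}]
}
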

View File

@@ -264,6 +264,9 @@ func (q *queryCache) mergeSeries(cachedSeries, missedSeries []*v3.Series) []*v3.
}
func (q *queryCache) storeMergedData(cacheKey string, mergedData []CachedSeriesData) {
+if q.cache == nil {
+return
+}
mergedDataJSON, err := json.Marshal(mergedData)
if err != nil {
zap.L().Error("error marshalling merged data", zap.Error(err))
@@ -275,8 +278,7 @@ func (q *queryCache) storeMergedData(cacheKey string, mergedData []CachedSeriesD
}
}
-func (q *queryCache) MergeWithCachedSeriesData(cacheKey string, newData []CachedSeriesData) []CachedSeriesData {
+func (q *queryCache) MergeWithCachedSeriesDataV2(cacheKey string, newData []CachedSeriesData) []CachedSeriesData {
if q.cache == nil {
return newData
}
@@ -284,8 +286,7 @@ func (q *queryCache) MergeWithCachedSeriesData(cacheKey string, newData []Cached
cachedData, _, _ := q.cache.Retrieve(cacheKey, true)
var existingData []CachedSeriesData
if err := json.Unmarshal(cachedData, &existingData); err != nil {
// In case of error, we return the entire range as a miss
-q.storeMergedData(cacheKey, newData)
zap.L().Error("error unmarshalling existing data", zap.Error(err))
return newData
}
@@ -330,7 +331,16 @@ func (q *queryCache) MergeWithCachedSeriesData(cacheKey string, newData []Cached
mergedData = append(mergedData, *current)
}
-q.storeMergedData(cacheKey, mergedData)
return mergedData
}
+func (q *queryCache) MergeWithCachedSeriesData(cacheKey string, newData []CachedSeriesData) []CachedSeriesData {
+mergedData := q.MergeWithCachedSeriesDataV2(cacheKey, newData)
+q.storeMergedData(cacheKey, mergedData)
+return mergedData
+}
+func (q *queryCache) StoreSeriesInCache(cacheKey string, series []CachedSeriesData) {
+q.storeMergedData(cacheKey, series)
+}