Compare commits

...

2 Commits

Author SHA1 Message Date
Srikanth Chekuri
a891dd4b50 chore: fix timestamps 2024-05-20 21:49:23 +05:30
Srikanth Chekuri
10530582ab fix: add zero value for missing timestamps in alert eval 2024-05-20 15:22:19 +05:30
3 changed files with 202 additions and 160 deletions

View File

@@ -40,9 +40,8 @@ func (s Sample) MarshalJSON() ([]byte, error) {
}
type Point struct {
T int64
V float64
Vs []float64
T int64
V float64
}
func (p Point) String() string {

View File

@@ -166,7 +166,9 @@ func (r *ThresholdRule) targetVal() float64 {
return 0
}
return *r.ruleCondition.Target
unitConverter := converter.FromUnit(converter.Unit(r.ruleCondition.TargetUnit))
value := unitConverter.Convert(converter.Value{F: *r.ruleCondition.Target, U: converter.Unit(r.ruleCondition.TargetUnit)}, converter.Unit(r.Unit()))
return value.F
}
func (r *ThresholdRule) matchType() MatchType {
@@ -414,40 +416,7 @@ func (r *ThresholdRule) Unit() string {
return ""
}
// CheckCondition reports whether v satisfies the rule's compare operator
// against the rule target. The target is passed through the unit converter
// (from the condition's TargetUnit to the rule's Unit) before comparing.
// A NaN value, a missing target, or an unrecognised operator never matches.
func (r *ThresholdRule) CheckCondition(v float64) bool {
	cond := r.ruleCondition

	// Reject inputs that cannot be compared before doing any conversion work.
	switch {
	case math.IsNaN(v):
		zap.L().Debug("found NaN in rule condition", zap.String("rule", r.Name()))
		return false
	case cond.Target == nil:
		zap.L().Debug("found null target in rule condition", zap.String("rule", r.Name()))
		return false
	}

	targetUnit := converter.Unit(cond.TargetUnit)
	unitConverter := converter.FromUnit(targetUnit)
	target := unitConverter.Convert(converter.Value{F: *cond.Target, U: targetUnit}, converter.Unit(r.Unit()))

	zap.L().Info("Checking condition for rule", zap.String("rule", r.Name()), zap.String("converter", unitConverter.Name()), zap.Float64("value", v), zap.Float64("target", target.F), zap.String("compareOp", string(cond.CompareOp)))

	switch cond.CompareOp {
	case ValueIsEq:
		return v == target.F
	case ValueIsNotEq:
		return v != target.F
	case ValueIsBelow:
		return v < target.F
	case ValueIsAbove:
		return v > target.F
	}
	return false
}
func (r *ThresholdRule) prepareQueryRange(ts time.Time) *v3.QueryRangeParamsV3 {
// todo(amol): add 30 seconds to evalWindow for rate calc
// todo(srikanthccv): make this configurable
// 2 minutes is reasonable time to wait for data to be available
// 60 seconds (SDK) + 10 seconds (batch) + rest for n/w + serialization + write to disk etc..
@@ -478,23 +447,158 @@ func (r *ThresholdRule) prepareQueryRange(ts time.Time) *v3.QueryRangeParamsV3 {
return &v3.QueryRangeParamsV3{
Start: start,
End: end,
Step: 60,
Step: int64(math.Max(float64(common.MinAllowedStepInterval(start, end)), 60)),
CompositeQuery: r.ruleCondition.CompositeQuery,
}
}
func (r *ThresholdRule) shouldSkipFirstRecord() bool {
shouldSkip := false
for _, q := range r.ruleCondition.CompositeQuery.BuilderQueries {
if q.DataSource == v3.DataSourceMetrics && q.AggregateOperator.IsRateOperator() {
shouldSkip = true
func (r *ThresholdRule) shouldAlert(series []Sample) (Sample, bool) {
var alertSmpl Sample
var shouldAlert bool
var lbls labels.Labels
var lblsOrig labels.Labels
if len(series) > 0 {
lbls = series[0].Metric
lblsOrig = series[0].MetricOrig
}
switch r.matchType() {
case AtleastOnce:
// If any sample matches the condition, the rule is firing.
if r.compareOp() == ValueIsAbove {
for _, smpl := range series {
if smpl.V > r.targetVal() {
alertSmpl = smpl
shouldAlert = true
break
}
}
} else if r.compareOp() == ValueIsBelow {
for _, smpl := range series {
if smpl.V < r.targetVal() {
alertSmpl = smpl
shouldAlert = true
break
}
}
} else if r.compareOp() == ValueIsEq {
for _, smpl := range series {
if smpl.V == r.targetVal() {
alertSmpl = smpl
shouldAlert = true
break
}
}
} else if r.compareOp() == ValueIsNotEq {
for _, smpl := range series {
if smpl.V != r.targetVal() {
alertSmpl = smpl
shouldAlert = true
break
}
}
}
case AllTheTimes:
// If all samples match the condition, the rule is firing.
shouldAlert = true
alertSmpl = Sample{Point: Point{V: r.targetVal()}, Metric: lbls, MetricOrig: lblsOrig}
if r.compareOp() == ValueIsAbove {
for _, smpl := range series {
if smpl.V <= r.targetVal() {
shouldAlert = false
break
}
}
} else if r.compareOp() == ValueIsBelow {
for _, smpl := range series {
if smpl.V >= r.targetVal() {
shouldAlert = false
break
}
}
} else if r.compareOp() == ValueIsEq {
for _, smpl := range series {
if smpl.V != r.targetVal() {
shouldAlert = false
break
}
}
} else if r.compareOp() == ValueIsNotEq {
for _, smpl := range series {
if smpl.V == r.targetVal() {
shouldAlert = false
break
}
}
}
case OnAverage:
// If the average of all samples matches the condition, the rule is firing.
var sum float64
for _, smpl := range series {
if math.IsNaN(smpl.V) || math.IsInf(smpl.V, 0) {
continue
}
sum += smpl.V
}
avg := sum / float64(len(series))
alertSmpl = Sample{Point: Point{V: avg}, Metric: lbls, MetricOrig: lblsOrig}
if r.compareOp() == ValueIsAbove {
if avg > r.targetVal() {
shouldAlert = true
}
} else if r.compareOp() == ValueIsBelow {
if avg < r.targetVal() {
shouldAlert = true
}
} else if r.compareOp() == ValueIsEq {
if avg == r.targetVal() {
shouldAlert = true
}
} else if r.compareOp() == ValueIsNotEq {
if avg != r.targetVal() {
shouldAlert = true
}
}
case InTotal:
// If the sum of all samples matches the condition, the rule is firing.
var sum float64
for _, smpl := range series {
if math.IsNaN(smpl.V) || math.IsInf(smpl.V, 0) {
continue
}
sum += smpl.V
}
alertSmpl = Sample{Point: Point{V: sum}, Metric: lbls, MetricOrig: lblsOrig}
if r.compareOp() == ValueIsAbove {
if sum > r.targetVal() {
shouldAlert = true
}
} else if r.compareOp() == ValueIsBelow {
if sum < r.targetVal() {
shouldAlert = true
}
} else if r.compareOp() == ValueIsEq {
if sum == r.targetVal() {
shouldAlert = true
}
} else if r.compareOp() == ValueIsNotEq {
if sum != r.targetVal() {
shouldAlert = true
}
}
}
return shouldSkip
return alertSmpl, shouldAlert
}
// runChQuery runs the actual query against clickhouse
func (r *ThresholdRule) runChQuery(ctx context.Context, db clickhouse.Conn, query string) (Vector, error) {
func (r *ThresholdRule) runChQuery(
ctx context.Context,
db clickhouse.Conn,
query string,
timestamps []int64,
) (Vector, error) {
rows, err := db.Query(ctx, query)
if err != nil {
zap.L().Error("failed to get alert query result", zap.String("rule", r.Name()), zap.Error(err))
@@ -512,15 +616,7 @@ func (r *ThresholdRule) runChQuery(ctx context.Context, db clickhouse.Conn, quer
// []sample list
var result Vector
// map[fingerprint]sample
resultMap := make(map[uint64]Sample, 0)
// for rates we want to skip the first record
// but we dont know when the rates are being used
// so we always pick timeframe - 30 seconds interval
// and skip the first record for a given label combo
// NOTE: this is not applicable for raw queries
skipFirstRecord := make(map[uint64]bool, 0)
seriesMap := make(map[uint64][]Sample, 0)
defer rows.Close()
for rows.Next() {
@@ -606,7 +702,6 @@ func (r *ThresholdRule) runChQuery(ctx context.Context, db clickhouse.Conn, quer
if math.IsNaN(sample.Point.V) {
continue
}
sample.Point.Vs = append(sample.Point.Vs, sample.Point.V)
// capture labels in result
sample.Metric = lbls.Labels()
@@ -614,106 +709,38 @@ func (r *ThresholdRule) runChQuery(ctx context.Context, db clickhouse.Conn, quer
labelHash := lbls.Labels().Hash()
// here we walk through values of time series
// and calculate the final value used to compare
// with rule target
if existing, ok := resultMap[labelHash]; ok {
switch r.matchType() {
case AllTheTimes:
if r.compareOp() == ValueIsAbove {
sample.Point.V = math.Min(existing.Point.V, sample.Point.V)
resultMap[labelHash] = sample
} else if r.compareOp() == ValueIsBelow {
sample.Point.V = math.Max(existing.Point.V, sample.Point.V)
resultMap[labelHash] = sample
} else {
sample.Point.Vs = append(existing.Point.Vs, sample.Point.V)
resultMap[labelHash] = sample
}
case AtleastOnce:
if r.compareOp() == ValueIsAbove {
sample.Point.V = math.Max(existing.Point.V, sample.Point.V)
resultMap[labelHash] = sample
} else if r.compareOp() == ValueIsBelow {
sample.Point.V = math.Min(existing.Point.V, sample.Point.V)
resultMap[labelHash] = sample
} else {
sample.Point.Vs = append(existing.Point.Vs, sample.Point.V)
resultMap[labelHash] = sample
}
case OnAverage:
sample.Point.Vs = append(existing.Point.Vs, sample.Point.V)
sample.Point.V = (existing.Point.V + sample.Point.V)
resultMap[labelHash] = sample
case InTotal:
sample.Point.V = (existing.Point.V + sample.Point.V)
resultMap[labelHash] = sample
}
} else {
if r.Condition().QueryType() == v3.QueryTypeBuilder {
// for query builder, time series data
// we skip the first record to support rate cases correctly
// improvement(amol): explore approaches to limit this only for
// rate uses cases
if exists := skipFirstRecord[labelHash]; exists || !r.shouldSkipFirstRecord() {
resultMap[labelHash] = sample
} else {
// looks like the first record for this label combo, skip it
skipFirstRecord[labelHash] = true
}
} else {
// for clickhouse raw queries, all records are considered
// improvement(amol): think about supporting rate queries
// written by user. may have to skip a record, similar to qb case(above)
resultMap[labelHash] = sample
}
}
seriesMap[labelHash] = append(seriesMap[labelHash], sample)
}
if r.matchType() == OnAverage {
for hash, s := range resultMap {
s.Point.V = s.Point.V / float64(len(s.Point.Vs))
resultMap[hash] = s
for hash, s := range seriesMap {
if len(s) == 0 {
continue
}
}
for hash, s := range resultMap {
if r.matchType() == AllTheTimes && r.compareOp() == ValueIsEq {
for _, v := range s.Point.Vs {
if v != r.targetVal() { // if any of the values is not equal to target, alert shouldn't be sent
s.Point.V = v
}
}
resultMap[hash] = s
} else if r.matchType() == AllTheTimes && r.compareOp() == ValueIsNotEq {
for _, v := range s.Point.Vs {
if v == r.targetVal() { // if any of the values is equal to target, alert shouldn't be sent
s.Point.V = v
}
}
resultMap[hash] = s
} else if r.matchType() == AtleastOnce && r.compareOp() == ValueIsEq {
for _, v := range s.Point.Vs {
if v == r.targetVal() { // if any of the values is equal to target, alert should be sent
s.Point.V = v
}
}
resultMap[hash] = s
} else if r.matchType() == AtleastOnce && r.compareOp() == ValueIsNotEq {
for _, v := range s.Point.Vs {
if v != r.targetVal() { // if any of the values is not equal to target, alert should be sent
s.Point.V = v
}
}
resultMap[hash] = s
// add zero value for missing timestamps
missingTimestamps := make(map[int64]bool)
labels := s[0].Metric
labelsOrig := s[0].MetricOrig
for _, ts := range timestamps {
missingTimestamps[ts] = true
}
for _, sample := range s {
delete(missingTimestamps, sample.Point.T*1000)
}
}
zap.L().Debug("resultmap(potential alerts)", zap.String("ruleid", r.ID()), zap.Int("count", len(resultMap)))
for ts := range missingTimestamps {
sample := Sample{
Point: Point{
T: ts,
V: 0,
},
Metric: labels,
MetricOrig: labelsOrig,
}
s = append(s, sample)
}
seriesMap[hash] = s
}
// if the data is missing for `For` duration then we should send alert
if r.ruleCondition.AlertOnAbsent && r.lastTimestampWithDatapoints.Add(time.Duration(r.Condition().AbsentFor)*time.Minute).Before(time.Now()) {
@@ -729,11 +756,10 @@ func (r *ThresholdRule) runChQuery(ctx context.Context, db clickhouse.Conn, quer
return result, nil
}
for _, sample := range resultMap {
// check alert rule condition before dumping results, if sendUnmatchedResults
// is set then add results irrespective of condition
if r.opts.SendUnmatched || r.CheckCondition(sample.Point.V) {
result = append(result, sample)
for _, series := range seriesMap {
alertSmpl, shouldAlert := r.shouldAlert(series)
if shouldAlert {
result = append(result, alertSmpl)
}
}
if len(result) != 0 {
@@ -742,7 +768,16 @@ func (r *ThresholdRule) runChQuery(ctx context.Context, db clickhouse.Conn, quer
return result, nil
}
func (r *ThresholdRule) prepareBuilderQueries(ts time.Time, ch driver.Conn) (map[string]string, error) {
// FIXME(srikanthccv): remove this hack
//
// adjustedMetricTimeRange aligns a query window to step boundaries and
// returns the adjusted (start, end) pair. step is scaled by 1000 before it is
// applied, so start and end are presumably milliseconds and step seconds.
//
// start has its remainder modulo step removed and is then moved back by one
// more full step. end has its remainder modulo min(step, 60) removed.
func (r *ThresholdRule) adjustedMetricTimeRange(start, end, step int64) (int64, int64) {
	stepMs := step * 1000
	alignedStart := start - start%stepMs - stepMs

	// The end of the window is aligned on at most a 60-unit boundary, even
	// when the query step is larger.
	endStepMs := int64(math.Min(float64(step), 60)) * 1000
	alignedEnd := end - end%endStepMs

	return alignedStart, alignedEnd
}
func (r *ThresholdRule) prepareBuilderQueries(ts time.Time, ch driver.Conn) (map[string]string, []int64, error) {
params := r.prepareQueryRange(ts)
if params.CompositeQuery.QueryType == v3.QueryTypeBuilder {
// check if any enrichment is required for logs if yes then enrich them
@@ -754,6 +789,13 @@ func (r *ThresholdRule) prepareBuilderQueries(ts time.Time, ch driver.Conn) (map
}
start, end := r.adjustedMetricTimeRange(params.Start, params.End, params.Step)
timestamps := []int64{}
for i := start; i < end; i += params.Step * 1000 {
timestamps = append(timestamps, i)
}
var runQueries map[string]string
var err error
@@ -766,7 +808,7 @@ func (r *ThresholdRule) prepareBuilderQueries(ts time.Time, ch driver.Conn) (map
runQueries, err = r.queryBuilder.PrepareQueries(params)
}
return runQueries, err
return runQueries, timestamps, err
}
// The following function is used to prepare the where clause for the query
@@ -1023,7 +1065,7 @@ func (r *ThresholdRule) GetSelectedQuery() string {
var err error
if r.ruleCondition.QueryType() == v3.QueryTypeBuilder {
queries, err = r.prepareBuilderQueries(time.Now(), nil)
queries, _, err = r.prepareBuilderQueries(time.Now(), nil)
if err != nil {
zap.L().Error("failed to prepare metric queries", zap.String("ruleid", r.ID()), zap.Error(err))
return ""
@@ -1073,11 +1115,12 @@ func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, ts time.Time, ch c
// var to hold target query to be executed
var queries map[string]string
var err error
var timestamps []int64
// fetch the target query based on query type
if r.ruleCondition.QueryType() == v3.QueryTypeBuilder {
queries, err = r.prepareBuilderQueries(ts, ch)
queries, timestamps, err = r.prepareBuilderQueries(ts, ch)
if err != nil {
zap.L().Error("failed to prepare metric queries", zap.String("ruleid", r.ID()), zap.Error(err))
@@ -1107,7 +1150,7 @@ func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, ts time.Time, ch c
zap.L().Debug("Selected query lable for rule", zap.String("ruleid", r.ID()), zap.String("label", queryLabel))
if queryString, ok := queries[queryLabel]; ok {
return r.runChQuery(ctx, ch, queryString)
return r.runChQuery(ctx, ch, queryString, timestamps)
}
zap.L().Error("invalid query label", zap.String("ruleid", r.ID()), zap.String("label", queryLabel), zap.Any("queries", queries))

View File

@@ -324,7 +324,7 @@ func TestThresholdRuleCombinations(t *testing.T) {
assert.NoError(t, err)
}
result, err := rule.runChQuery(context.Background(), mock, queryString)
result, err := rule.runChQuery(context.Background(), mock, queryString, []int64{})
if err != nil {
assert.NoError(t, err)
}