|
|
|
|
@@ -14,10 +14,9 @@ import (
|
|
|
|
|
"time"
|
|
|
|
|
"unicode"
|
|
|
|
|
|
|
|
|
|
"github.com/SigNoz/govaluate"
|
|
|
|
|
"go.uber.org/zap"
|
|
|
|
|
|
|
|
|
|
"github.com/ClickHouse/clickhouse-go/v2"
|
|
|
|
|
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
|
|
|
|
|
"go.signoz.io/signoz/pkg/query-service/common"
|
|
|
|
|
"go.signoz.io/signoz/pkg/query-service/converter"
|
|
|
|
|
"go.signoz.io/signoz/pkg/query-service/postprocess"
|
|
|
|
|
@@ -66,7 +65,8 @@ type ThresholdRule struct {
|
|
|
|
|
// to avoid fetching temporality for the same metric multiple times
|
|
|
|
|
// querying the v4 table on low cardinal temporality column
|
|
|
|
|
// should be fast but we can still avoid the query if we have the data in memory
|
|
|
|
|
temporalityMap map[string]map[v3.Temporality]bool
|
|
|
|
|
temporalityMap map[string]map[v3.Temporality]bool
|
|
|
|
|
temporalityLock sync.Mutex
|
|
|
|
|
|
|
|
|
|
opts ThresholdRuleOpts
|
|
|
|
|
|
|
|
|
|
@@ -75,6 +75,7 @@ type ThresholdRule struct {
|
|
|
|
|
|
|
|
|
|
querier interfaces.Querier
|
|
|
|
|
querierV2 interfaces.Querier
|
|
|
|
|
reader interfaces.Reader
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type ThresholdRuleOpts struct {
|
|
|
|
|
@@ -139,6 +140,7 @@ func NewThresholdRule(
|
|
|
|
|
|
|
|
|
|
t.querier = querier.NewQuerier(querierOption)
|
|
|
|
|
t.querierV2 = querierV2.NewQuerier(querierOptsV2)
|
|
|
|
|
t.reader = reader
|
|
|
|
|
|
|
|
|
|
zap.L().Info("creating new ThresholdRule", zap.String("name", t.name), zap.String("id", t.id))
|
|
|
|
|
|
|
|
|
|
@@ -301,34 +303,11 @@ func (r *ThresholdRule) ActiveAlerts() []*Alert {
|
|
|
|
|
return res
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *ThresholdRule) FetchTemporality(ctx context.Context, metricNames []string, ch driver.Conn) (map[string]map[v3.Temporality]bool, error) {
|
|
|
|
|
// populateTemporality add temporality if it doesn't exist already
|
|
|
|
|
func (r *ThresholdRule) populateTemporality(ctx context.Context, qp *v3.QueryRangeParamsV3) error {
|
|
|
|
|
|
|
|
|
|
metricNameToTemporality := make(map[string]map[v3.Temporality]bool)
|
|
|
|
|
|
|
|
|
|
query := fmt.Sprintf(`SELECT DISTINCT metric_name, temporality FROM %s.%s WHERE metric_name IN $1`, constants.SIGNOZ_METRIC_DBNAME, constants.SIGNOZ_TIMESERIES_v4_1DAY_TABLENAME)
|
|
|
|
|
|
|
|
|
|
rows, err := ch.Query(ctx, query, metricNames)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
defer rows.Close()
|
|
|
|
|
|
|
|
|
|
for rows.Next() {
|
|
|
|
|
var metricName, temporality string
|
|
|
|
|
err := rows.Scan(&metricName, &temporality)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
if _, ok := metricNameToTemporality[metricName]; !ok {
|
|
|
|
|
metricNameToTemporality[metricName] = make(map[v3.Temporality]bool)
|
|
|
|
|
}
|
|
|
|
|
metricNameToTemporality[metricName][v3.Temporality(temporality)] = true
|
|
|
|
|
}
|
|
|
|
|
return metricNameToTemporality, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// populateTemporality same as addTemporality but for v4 and better
|
|
|
|
|
func (r *ThresholdRule) populateTemporality(ctx context.Context, qp *v3.QueryRangeParamsV3, ch driver.Conn) error {
|
|
|
|
|
r.temporalityLock.Lock()
|
|
|
|
|
defer r.temporalityLock.Unlock()
|
|
|
|
|
|
|
|
|
|
missingTemporality := make([]string, 0)
|
|
|
|
|
metricNameToTemporality := make(map[string]map[v3.Temporality]bool)
|
|
|
|
|
@@ -356,7 +335,7 @@ func (r *ThresholdRule) populateTemporality(ctx context.Context, qp *v3.QueryRan
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
nameToTemporality, err := r.FetchTemporality(ctx, missingTemporality, ch)
|
|
|
|
|
nameToTemporality, err := r.reader.FetchTemporality(ctx, missingTemporality)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
@@ -393,7 +372,6 @@ func (r *ThresholdRule) ForEachActiveAlert(f func(*Alert)) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *ThresholdRule) SendAlerts(ctx context.Context, ts time.Time, resendDelay time.Duration, interval time.Duration, notifyFunc NotifyFunc) {
|
|
|
|
|
zap.L().Info("sending alerts", zap.String("rule", r.Name()))
|
|
|
|
|
alerts := []*Alert{}
|
|
|
|
|
r.ForEachActiveAlert(func(alert *Alert) {
|
|
|
|
|
if r.opts.SendAlways || alert.needsSending(ts, resendDelay) {
|
|
|
|
|
@@ -527,15 +505,24 @@ func (r *ThresholdRule) fetchFilters(selectedQuery string, lbls labels.Labels) [
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if !exists {
|
|
|
|
|
// if the label is not present in the where clause, add it as it is
|
|
|
|
|
// if filter item doesn't match with any label, add it as it is
|
|
|
|
|
filterItems = append(filterItems, item)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
groupBy := make(map[string]bool)
|
|
|
|
|
if r.ruleCondition.CompositeQuery.QueryType == v3.QueryTypeBuilder &&
|
|
|
|
|
r.ruleCondition.CompositeQuery.BuilderQueries[selectedQuery] != nil &&
|
|
|
|
|
r.ruleCondition.CompositeQuery.BuilderQueries[selectedQuery].GroupBy != nil {
|
|
|
|
|
for _, key := range r.ruleCondition.CompositeQuery.BuilderQueries[selectedQuery].GroupBy {
|
|
|
|
|
groupBy[key.Key] = true
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// add the labels which are not present in the where clause
|
|
|
|
|
for _, label := range lbls {
|
|
|
|
|
if _, ok := added[label.Name]; !ok {
|
|
|
|
|
if _, ok := added[label.Name]; !ok && groupBy[label.Name] {
|
|
|
|
|
filterItems = append(filterItems, v3.FilterItem{
|
|
|
|
|
Key: v3.AttributeKey{Key: label.Name},
|
|
|
|
|
Operator: v3.FilterOperatorEqual,
|
|
|
|
|
@@ -551,7 +538,7 @@ func (r *ThresholdRule) prepareLinksToLogs(ts time.Time, lbls labels.Labels) str
|
|
|
|
|
selectedQuery := r.GetSelectedQuery()
|
|
|
|
|
|
|
|
|
|
// TODO(srikanthccv): handle formula queries
|
|
|
|
|
if selectedQuery < "A" || selectedQuery > "Z" {
|
|
|
|
|
if isFormula(selectedQuery) {
|
|
|
|
|
return ""
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -598,7 +585,7 @@ func (r *ThresholdRule) prepareLinksToLogs(ts time.Time, lbls labels.Labels) str
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
QueryFormulas: make([]string, 0),
|
|
|
|
|
QueryFormulas: make([]v3.BuilderQuery, 0),
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -615,7 +602,7 @@ func (r *ThresholdRule) prepareLinksToTraces(ts time.Time, lbls labels.Labels) s
|
|
|
|
|
selectedQuery := r.GetSelectedQuery()
|
|
|
|
|
|
|
|
|
|
// TODO(srikanthccv): handle formula queries
|
|
|
|
|
if selectedQuery < "A" || selectedQuery > "Z" {
|
|
|
|
|
if isFormula(selectedQuery) {
|
|
|
|
|
return ""
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -662,7 +649,7 @@ func (r *ThresholdRule) prepareLinksToTraces(ts time.Time, lbls labels.Labels) s
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
QueryFormulas: make([]string, 0),
|
|
|
|
|
QueryFormulas: make([]v3.BuilderQuery, 0),
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -675,6 +662,77 @@ func (r *ThresholdRule) prepareLinksToTraces(ts time.Time, lbls labels.Labels) s
|
|
|
|
|
return fmt.Sprintf("compositeQuery=%s&timeRange=%s&startTime=%d&endTime=%d&options=%s", compositeQuery, urlEncodedTimeRange, tr.Start, tr.End, urlEncodedOptions)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *ThresholdRule) getURLShareableCompositeQuery() urlShareableCompositeQuery {
|
|
|
|
|
var compositeQuery *v3.CompositeQuery
|
|
|
|
|
// TODO(srikanthccv): benchmark field-by-field copying vs ser/deser
|
|
|
|
|
jsunQuery, _ := json.Marshal(r.ruleCondition.CompositeQuery)
|
|
|
|
|
_ = json.Unmarshal(jsunQuery, &compositeQuery)
|
|
|
|
|
urlData := urlShareableCompositeQuery{
|
|
|
|
|
QueryType: string(v3.QueryTypeBuilder),
|
|
|
|
|
Builder: builderQuery{
|
|
|
|
|
QueryData: make([]v3.BuilderQuery, 0),
|
|
|
|
|
QueryFormulas: make([]v3.BuilderQuery, 0),
|
|
|
|
|
},
|
|
|
|
|
PromQL: make([]v3.PromQuery, 0),
|
|
|
|
|
ClickHouse: make([]v3.ClickHouseQuery, 0),
|
|
|
|
|
}
|
|
|
|
|
for _, query := range compositeQuery.BuilderQueries {
|
|
|
|
|
if query.QueryName == query.Expression {
|
|
|
|
|
urlData.Builder.QueryData = append(urlData.Builder.QueryData, *query)
|
|
|
|
|
} else {
|
|
|
|
|
urlData.Builder.QueryFormulas = append(urlData.Builder.QueryFormulas, *query)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return urlData
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// isFormula checks if the given string matches the formula pattern "F1", "F2", ..., "F{number}"
|
|
|
|
|
func isFormula(s string) bool {
|
|
|
|
|
return constants.FormulaRe.MatchString(s)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *ThresholdRule) prepareLinkToAlert(ts time.Time, lbls labels.Labels) string {
|
|
|
|
|
selectedQuery := r.GetSelectedQuery()
|
|
|
|
|
|
|
|
|
|
urlData := r.getURLShareableCompositeQuery()
|
|
|
|
|
|
|
|
|
|
if isFormula(selectedQuery) {
|
|
|
|
|
query := r.ruleCondition.CompositeQuery.BuilderQueries[selectedQuery]
|
|
|
|
|
expression, _ := govaluate.NewEvaluableExpressionWithFunctions(query.Expression, postprocess.EvalFuncs())
|
|
|
|
|
for _, v := range expression.Vars() {
|
|
|
|
|
for idx := range urlData.Builder.QueryData {
|
|
|
|
|
if urlData.Builder.QueryData[idx].QueryName == v {
|
|
|
|
|
urlData.Builder.QueryData[idx].Filters.Items = r.fetchFilters(v, lbls)
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
for idx := range urlData.Builder.QueryData {
|
|
|
|
|
if urlData.Builder.QueryData[idx].QueryName == selectedQuery {
|
|
|
|
|
urlData.Builder.QueryData[idx].Filters.Items = r.fetchFilters(selectedQuery, lbls)
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
q := r.prepareQueryRange(ts)
|
|
|
|
|
// Traces list view expects time in nanoseconds
|
|
|
|
|
tr := timeRange{
|
|
|
|
|
Start: q.Start * time.Second.Microseconds(),
|
|
|
|
|
End: q.End * time.Second.Microseconds(),
|
|
|
|
|
PageSize: 100,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
period, _ := json.Marshal(tr)
|
|
|
|
|
urlEncodedTimeRange := url.QueryEscape(string(period))
|
|
|
|
|
|
|
|
|
|
data, _ := json.Marshal(urlData)
|
|
|
|
|
compositeQuery := url.QueryEscape(url.QueryEscape(string(data)))
|
|
|
|
|
|
|
|
|
|
return fmt.Sprintf("compositeQuery=%s&timeRange=%s&startTime=%d&endTime=%d&panelTypes=graph&ruleId=%s", compositeQuery, urlEncodedTimeRange, tr.Start, tr.End, r.ID())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *ThresholdRule) hostFromSource() string {
|
|
|
|
|
parsedUrl, err := url.Parse(r.source)
|
|
|
|
|
if err != nil {
|
|
|
|
|
@@ -727,7 +785,7 @@ func (r *ThresholdRule) GetSelectedQuery() string {
|
|
|
|
|
return ""
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, ts time.Time, ch clickhouse.Conn) (Vector, error) {
|
|
|
|
|
func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, ts time.Time) (Vector, error) {
|
|
|
|
|
if r.ruleCondition == nil || r.ruleCondition.CompositeQuery == nil {
|
|
|
|
|
r.SetHealth(HealthBad)
|
|
|
|
|
r.SetLastError(fmt.Errorf("no rule condition"))
|
|
|
|
|
@@ -735,7 +793,7 @@ func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, ts time.Time, ch c
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
params := r.prepareQueryRange(ts)
|
|
|
|
|
err := r.populateTemporality(ctx, params, ch)
|
|
|
|
|
err := r.populateTemporality(ctx, params)
|
|
|
|
|
if err != nil {
|
|
|
|
|
r.SetHealth(HealthBad)
|
|
|
|
|
zap.L().Error("failed to set temporality", zap.String("rule", r.Name()), zap.Error(err))
|
|
|
|
|
@@ -834,7 +892,7 @@ func normalizeLabelName(name string) string {
|
|
|
|
|
func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) (interface{}, error) {
|
|
|
|
|
|
|
|
|
|
valueFormatter := formatter.FromUnit(r.Unit())
|
|
|
|
|
res, err := r.buildAndRunQuery(ctx, ts, queriers.Ch)
|
|
|
|
|
res, err := r.buildAndRunQuery(ctx, ts)
|
|
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
r.SetHealth(HealthBad)
|
|
|
|
|
@@ -919,6 +977,9 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time, queriers *Querie
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if r.ruleCondition.QueryType() == v3.QueryTypeBuilder {
|
|
|
|
|
annotations = append(annotations, labels.Label{Name: "alert_link", Value: fmt.Sprintf("%s/alerts/edit?%s", r.hostFromSource(), r.prepareLinkToAlert(ts, smpl.MetricOrig))})
|
|
|
|
|
}
|
|
|
|
|
lbs := lb.Labels()
|
|
|
|
|
h := lbs.Hash()
|
|
|
|
|
resultFPs[h] = struct{}{}
|