Compare commits
14 Commits
main
...
chore/cust
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9c3af96235 | ||
|
|
5e0e89ecf0 | ||
|
|
a504fd5664 | ||
|
|
9b006ffc16 | ||
|
|
92cfd6133e | ||
|
|
1c14420855 | ||
|
|
37abb1c2db | ||
|
|
0ec728653e | ||
|
|
db4dd9fb93 | ||
|
|
f6652164ce | ||
|
|
8cdb6e03d6 | ||
|
|
4a4706182c | ||
|
|
7e65ea090d | ||
|
|
6d5a76768b |
@@ -3,6 +3,7 @@ package app
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/SigNoz/signoz/pkg/notificationgrouping"
|
||||
"log/slog"
|
||||
"net"
|
||||
"net/http"
|
||||
@@ -108,6 +109,7 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz, jwt *authtypes.JWT)
|
||||
signoz.Modules.OrgGetter,
|
||||
signoz.Querier,
|
||||
signoz.Instrumentation.Logger(),
|
||||
signoz.NotificationGroups,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
@@ -417,17 +419,7 @@ func (s *Server) Stop(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func makeRulesManager(
|
||||
ch baseint.Reader,
|
||||
cache cache.Cache,
|
||||
alertmanager alertmanager.Alertmanager,
|
||||
sqlstore sqlstore.SQLStore,
|
||||
telemetryStore telemetrystore.TelemetryStore,
|
||||
prometheus prometheus.Prometheus,
|
||||
orgGetter organization.Getter,
|
||||
querier querier.Querier,
|
||||
logger *slog.Logger,
|
||||
) (*baserules.Manager, error) {
|
||||
func makeRulesManager(ch baseint.Reader, cache cache.Cache, alertmanager alertmanager.Alertmanager, sqlstore sqlstore.SQLStore, telemetryStore telemetrystore.TelemetryStore, prometheus prometheus.Prometheus, orgGetter organization.Getter, querier querier.Querier, logger *slog.Logger, groups notificationgrouping.NotificationGroups) (*baserules.Manager, error) {
|
||||
// create manager opts
|
||||
managerOpts := &baserules.ManagerOptions{
|
||||
TelemetryStore: telemetryStore,
|
||||
@@ -444,6 +436,7 @@ func makeRulesManager(
|
||||
Alertmanager: alertmanager,
|
||||
SQLStore: sqlstore,
|
||||
OrgGetter: orgGetter,
|
||||
NotificationGroups: groups,
|
||||
}
|
||||
|
||||
// create Manager
|
||||
|
||||
566
pkg/alertmanager/alertmanagerserver/dispatch/dispatcher.go
Normal file
566
pkg/alertmanager/alertmanagerserver/dispatch/dispatcher.go
Normal file
@@ -0,0 +1,566 @@
|
||||
// Copyright 2018 Prometheus Team
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// Package dispatch contains SigNoz's custom dispatcher implementation
|
||||
// based on the Prometheus Alertmanager dispatcher.
|
||||
package dispatch
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/notificationgrouping"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/common/model"
|
||||
|
||||
"github.com/prometheus/alertmanager/dispatch"
|
||||
"github.com/prometheus/alertmanager/notify"
|
||||
"github.com/prometheus/alertmanager/provider"
|
||||
"github.com/prometheus/alertmanager/store"
|
||||
"github.com/prometheus/alertmanager/types"
|
||||
)
|
||||
|
||||
// DispatcherMetrics represents metrics associated to a dispatcher.
|
||||
type DispatcherMetrics struct {
|
||||
aggrGroups prometheus.Gauge
|
||||
processingDuration prometheus.Summary
|
||||
aggrGroupLimitReached prometheus.Counter
|
||||
}
|
||||
|
||||
// NewDispatcherMetrics returns a new registered DispatchMetrics.
|
||||
func NewDispatcherMetrics(registerLimitMetrics bool, r prometheus.Registerer) *DispatcherMetrics {
|
||||
m := DispatcherMetrics{
|
||||
aggrGroups: prometheus.NewGauge(
|
||||
prometheus.GaugeOpts{
|
||||
Name: "signoz_alertmanager_dispatcher_aggregation_groups",
|
||||
Help: "Number of active aggregation groups",
|
||||
},
|
||||
),
|
||||
processingDuration: prometheus.NewSummary(
|
||||
prometheus.SummaryOpts{
|
||||
Name: "signoz_alertmanager_dispatcher_alert_processing_duration_seconds",
|
||||
Help: "Summary of latencies for the processing of alerts.",
|
||||
},
|
||||
),
|
||||
aggrGroupLimitReached: prometheus.NewCounter(
|
||||
prometheus.CounterOpts{
|
||||
Name: "signoz_alertmanager_dispatcher_aggregation_group_limit_reached_total",
|
||||
Help: "Number of times when dispatcher failed to create new aggregation group due to limit.",
|
||||
},
|
||||
),
|
||||
}
|
||||
|
||||
if r != nil {
|
||||
r.MustRegister(m.aggrGroups, m.processingDuration)
|
||||
if registerLimitMetrics {
|
||||
r.MustRegister(m.aggrGroupLimitReached)
|
||||
}
|
||||
}
|
||||
|
||||
return &m
|
||||
}
|
||||
|
||||
// Dispatcher sorts incoming alerts into aggregation groups and
|
||||
// assigns the correct notifiers to each.
|
||||
type Dispatcher struct {
|
||||
route *dispatch.Route
|
||||
alerts provider.Alerts
|
||||
stage notify.Stage
|
||||
marker types.GroupMarker
|
||||
metrics *DispatcherMetrics
|
||||
limits Limits
|
||||
|
||||
timeout func(time.Duration) time.Duration
|
||||
|
||||
mtx sync.RWMutex
|
||||
aggrGroupsPerRoute map[*dispatch.Route]map[model.Fingerprint]*aggrGroup
|
||||
aggrGroupsNum int
|
||||
|
||||
done chan struct{}
|
||||
ctx context.Context
|
||||
cancel func()
|
||||
|
||||
logger *slog.Logger
|
||||
notificationGroups notificationgrouping.NotificationGroups
|
||||
orgID string
|
||||
}
|
||||
|
||||
// Limits is an alias for the upstream Alertmanager dispatch.Limits type; the
// dispatcher consults it for the maximum number of aggregation groups.
|
||||
type Limits = dispatch.Limits
|
||||
|
||||
// NewDispatcher returns a new Dispatcher.
|
||||
func NewDispatcher(
|
||||
ap provider.Alerts,
|
||||
r *dispatch.Route,
|
||||
s notify.Stage,
|
||||
mk types.GroupMarker,
|
||||
to func(time.Duration) time.Duration,
|
||||
lim Limits,
|
||||
l *slog.Logger,
|
||||
m *DispatcherMetrics,
|
||||
n notificationgrouping.NotificationGroups,
|
||||
orgID string,
|
||||
) *Dispatcher {
|
||||
if lim == nil {
|
||||
// Use a simple implementation when no limits are provided
|
||||
lim = &unlimitedLimits{}
|
||||
}
|
||||
|
||||
disp := &Dispatcher{
|
||||
alerts: ap,
|
||||
stage: s,
|
||||
route: r,
|
||||
marker: mk,
|
||||
timeout: to,
|
||||
logger: l.With("component", "signoz-dispatcher"),
|
||||
metrics: m,
|
||||
limits: lim,
|
||||
notificationGroups: n,
|
||||
orgID: orgID,
|
||||
}
|
||||
return disp
|
||||
}
|
||||
|
||||
// Run starts dispatching alerts incoming via the updates channel.
|
||||
func (d *Dispatcher) Run() {
|
||||
d.done = make(chan struct{})
|
||||
|
||||
d.mtx.Lock()
|
||||
d.aggrGroupsPerRoute = map[*dispatch.Route]map[model.Fingerprint]*aggrGroup{}
|
||||
d.aggrGroupsNum = 0
|
||||
d.metrics.aggrGroups.Set(0)
|
||||
d.ctx, d.cancel = context.WithCancel(context.Background())
|
||||
d.mtx.Unlock()
|
||||
|
||||
d.run(d.alerts.Subscribe())
|
||||
close(d.done)
|
||||
}
|
||||
|
||||
func (d *Dispatcher) run(it provider.AlertIterator) {
|
||||
maintenance := time.NewTicker(30 * time.Second)
|
||||
defer maintenance.Stop()
|
||||
|
||||
defer it.Close()
|
||||
|
||||
for {
|
||||
select {
|
||||
case alert, ok := <-it.Next():
|
||||
if !ok {
|
||||
// Iterator exhausted for some reason.
|
||||
if err := it.Err(); err != nil {
|
||||
d.logger.ErrorContext(d.ctx, "Error on alert update", "err", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
d.logger.DebugContext(d.ctx, "SigNoz Custom Dispatcher: Received alert", "alert", alert)
|
||||
|
||||
// Log errors but keep trying.
|
||||
if err := it.Err(); err != nil {
|
||||
d.logger.ErrorContext(d.ctx, "Error on alert update", "err", err)
|
||||
continue
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
for _, r := range d.route.Match(alert.Labels) {
|
||||
d.processAlert(alert, r)
|
||||
}
|
||||
d.metrics.processingDuration.Observe(time.Since(now).Seconds())
|
||||
|
||||
case <-maintenance.C:
|
||||
d.doMaintenance()
|
||||
case <-d.ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (d *Dispatcher) doMaintenance() {
|
||||
d.mtx.Lock()
|
||||
defer d.mtx.Unlock()
|
||||
for _, groups := range d.aggrGroupsPerRoute {
|
||||
for _, ag := range groups {
|
||||
if ag.empty() {
|
||||
ag.stop()
|
||||
d.marker.DeleteByGroupKey(ag.routeID, ag.GroupKey())
|
||||
delete(groups, ag.fingerprint())
|
||||
d.aggrGroupsNum--
|
||||
d.metrics.aggrGroups.Dec()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// AlertGroup represents how alerts exist within an aggrGroup.
|
||||
type AlertGroup struct {
|
||||
Alerts types.AlertSlice
|
||||
Labels model.LabelSet
|
||||
Receiver string
|
||||
GroupKey string
|
||||
RouteID string
|
||||
}
|
||||
|
||||
type AlertGroups []*AlertGroup
|
||||
|
||||
func (ag AlertGroups) Swap(i, j int) { ag[i], ag[j] = ag[j], ag[i] }
|
||||
func (ag AlertGroups) Less(i, j int) bool {
|
||||
if ag[i].Labels.Equal(ag[j].Labels) {
|
||||
return ag[i].Receiver < ag[j].Receiver
|
||||
}
|
||||
return ag[i].Labels.Before(ag[j].Labels)
|
||||
}
|
||||
func (ag AlertGroups) Len() int { return len(ag) }
|
||||
|
||||
// Groups returns a slice of AlertGroups from the dispatcher's internal state.
|
||||
func (d *Dispatcher) Groups(routeFilter func(*dispatch.Route) bool, alertFilter func(*types.Alert, time.Time) bool) (AlertGroups, map[model.Fingerprint][]string) {
|
||||
groups := AlertGroups{}
|
||||
|
||||
d.mtx.RLock()
|
||||
defer d.mtx.RUnlock()
|
||||
|
||||
// Keep a list of receivers for an alert to prevent checking each alert
|
||||
// again against all routes. The alert has already matched against this
|
||||
// route on ingestion.
|
||||
receivers := map[model.Fingerprint][]string{}
|
||||
|
||||
now := time.Now()
|
||||
for route, ags := range d.aggrGroupsPerRoute {
|
||||
if !routeFilter(route) {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, ag := range ags {
|
||||
receiver := route.RouteOpts.Receiver
|
||||
alertGroup := &AlertGroup{
|
||||
Labels: ag.labels,
|
||||
Receiver: receiver,
|
||||
GroupKey: ag.GroupKey(),
|
||||
RouteID: ag.routeID,
|
||||
}
|
||||
|
||||
alerts := ag.alerts.List()
|
||||
filteredAlerts := make([]*types.Alert, 0, len(alerts))
|
||||
for _, a := range alerts {
|
||||
if !alertFilter(a, now) {
|
||||
continue
|
||||
}
|
||||
|
||||
fp := a.Fingerprint()
|
||||
if r, ok := receivers[fp]; ok {
|
||||
// Receivers slice already exists. Add
|
||||
// the current receiver to the slice.
|
||||
receivers[fp] = append(r, receiver)
|
||||
} else {
|
||||
// First time we've seen this alert fingerprint.
|
||||
// Initialize a new receivers slice.
|
||||
receivers[fp] = []string{receiver}
|
||||
}
|
||||
|
||||
filteredAlerts = append(filteredAlerts, a)
|
||||
}
|
||||
if len(filteredAlerts) == 0 {
|
||||
continue
|
||||
}
|
||||
alertGroup.Alerts = filteredAlerts
|
||||
|
||||
groups = append(groups, alertGroup)
|
||||
}
|
||||
}
|
||||
sort.Sort(groups)
|
||||
for i := range groups {
|
||||
sort.Sort(groups[i].Alerts)
|
||||
}
|
||||
for i := range receivers {
|
||||
sort.Strings(receivers[i])
|
||||
}
|
||||
|
||||
return groups, receivers
|
||||
}
|
||||
|
||||
// Stop the dispatcher.
|
||||
func (d *Dispatcher) Stop() {
|
||||
if d == nil {
|
||||
return
|
||||
}
|
||||
d.mtx.Lock()
|
||||
if d.cancel == nil {
|
||||
d.mtx.Unlock()
|
||||
return
|
||||
}
|
||||
d.cancel()
|
||||
d.cancel = nil
|
||||
d.mtx.Unlock()
|
||||
|
||||
<-d.done
|
||||
}
|
||||
|
||||
// GetStats returns statistics about the dispatcher for SigNoz monitoring
|
||||
func (d *Dispatcher) GetStats() map[string]interface{} {
|
||||
if d == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
d.mtx.RLock()
|
||||
defer d.mtx.RUnlock()
|
||||
|
||||
stats := map[string]interface{}{
|
||||
"total_aggregation_groups": d.aggrGroupsNum,
|
||||
"routes_count": len(d.aggrGroupsPerRoute),
|
||||
}
|
||||
|
||||
// Count groups per route
|
||||
routeStats := make(map[string]int)
|
||||
for route, groups := range d.aggrGroupsPerRoute {
|
||||
routeKey := route.Key()
|
||||
routeStats[routeKey] = len(groups)
|
||||
}
|
||||
stats["groups_per_route"] = routeStats
|
||||
|
||||
return stats
|
||||
}
|
||||
|
||||
// notifyFunc delivers a notification for the given alerts using the supplied
|
||||
// context; implementations are expected to abort when that context is canceled.
|
||||
// It returns false iff notifying failed.
|
||||
type notifyFunc func(context.Context, ...*types.Alert) bool
|
||||
|
||||
// processAlert determines in which aggregation group the alert falls
|
||||
// and inserts it.
|
||||
func (d *Dispatcher) processAlert(alert *types.Alert, route *dispatch.Route) {
|
||||
groupLabels := d.notificationGroups.GetGroupLabels(d.orgID, alert, route)
|
||||
|
||||
fp := groupLabels.Fingerprint()
|
||||
|
||||
d.mtx.Lock()
|
||||
defer d.mtx.Unlock()
|
||||
|
||||
routeGroups, ok := d.aggrGroupsPerRoute[route]
|
||||
if !ok {
|
||||
routeGroups = map[model.Fingerprint]*aggrGroup{}
|
||||
d.aggrGroupsPerRoute[route] = routeGroups
|
||||
}
|
||||
|
||||
ag, ok := routeGroups[fp]
|
||||
if ok {
|
||||
ag.insert(alert)
|
||||
return
|
||||
}
|
||||
|
||||
// If the group does not exist, create it. But check the limit first.
|
||||
if limit := d.limits.MaxNumberOfAggregationGroups(); limit > 0 && d.aggrGroupsNum >= limit {
|
||||
d.metrics.aggrGroupLimitReached.Inc()
|
||||
d.logger.ErrorContext(d.ctx, "Too many aggregation groups, cannot create new group for alert", "groups", d.aggrGroupsNum, "limit", limit, "alert", alert.Name())
|
||||
return
|
||||
}
|
||||
|
||||
ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.logger)
|
||||
routeGroups[fp] = ag
|
||||
d.aggrGroupsNum++
|
||||
d.metrics.aggrGroups.Inc()
|
||||
|
||||
// Insert the 1st alert in the group before starting the group's run()
|
||||
// function, to make sure that when the run() will be executed the 1st
|
||||
// alert is already there.
|
||||
ag.insert(alert)
|
||||
|
||||
go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
|
||||
_, _, err := d.stage.Exec(ctx, d.logger, alerts...)
|
||||
if err != nil {
|
||||
logger := d.logger.With("num_alerts", len(alerts), "err", err)
|
||||
if errors.Is(ctx.Err(), context.Canceled) {
|
||||
// It is expected for the context to be canceled on
|
||||
// configuration reload or shutdown. In this case, the
|
||||
// message should only be logged at the debug level.
|
||||
logger.DebugContext(ctx, "Notify for alerts failed")
|
||||
} else {
|
||||
logger.ErrorContext(ctx, "Notify for alerts failed")
|
||||
}
|
||||
}
|
||||
return err == nil
|
||||
})
|
||||
}
|
||||
|
||||
// aggrGroup aggregates alert fingerprints into groups to which a
|
||||
// common set of routing options applies.
|
||||
// It emits notifications in the specified intervals.
|
||||
type aggrGroup struct {
|
||||
labels model.LabelSet
|
||||
opts *dispatch.RouteOpts
|
||||
logger *slog.Logger
|
||||
routeID string
|
||||
routeKey string
|
||||
|
||||
alerts *store.Alerts
|
||||
ctx context.Context
|
||||
cancel func()
|
||||
done chan struct{}
|
||||
next *time.Timer
|
||||
timeout func(time.Duration) time.Duration
|
||||
|
||||
mtx sync.RWMutex
|
||||
hasFlushed bool
|
||||
}
|
||||
|
||||
// newAggrGroup returns a new aggregation group.
|
||||
func newAggrGroup(ctx context.Context, labels model.LabelSet, r *dispatch.Route, to func(time.Duration) time.Duration, logger *slog.Logger) *aggrGroup {
|
||||
if to == nil {
|
||||
to = func(d time.Duration) time.Duration { return d }
|
||||
}
|
||||
ag := &aggrGroup{
|
||||
labels: labels,
|
||||
routeID: r.ID(),
|
||||
routeKey: r.Key(),
|
||||
opts: &r.RouteOpts,
|
||||
timeout: to,
|
||||
alerts: store.NewAlerts(),
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
ag.ctx, ag.cancel = context.WithCancel(ctx)
|
||||
|
||||
ag.logger = logger.With("aggr_group", ag)
|
||||
|
||||
// Set an initial one-time wait before flushing
|
||||
// the first batch of notifications.
|
||||
ag.next = time.NewTimer(ag.opts.GroupWait)
|
||||
|
||||
return ag
|
||||
}
|
||||
|
||||
func (ag *aggrGroup) fingerprint() model.Fingerprint {
|
||||
return ag.labels.Fingerprint()
|
||||
}
|
||||
|
||||
func (ag *aggrGroup) GroupKey() string {
|
||||
return fmt.Sprintf("%s:%s", ag.routeKey, ag.labels)
|
||||
}
|
||||
|
||||
func (ag *aggrGroup) String() string {
|
||||
return ag.GroupKey()
|
||||
}
|
||||
|
||||
func (ag *aggrGroup) run(nf notifyFunc) {
|
||||
defer close(ag.done)
|
||||
defer ag.next.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case now := <-ag.next.C:
|
||||
// Give the notifications time until the next flush to
|
||||
// finish before terminating them.
|
||||
ctx, cancel := context.WithTimeout(ag.ctx, ag.timeout(ag.opts.GroupInterval))
|
||||
|
||||
// The now time we retrieve from the ticker is the only reliable
|
||||
// point of time reference for the subsequent notification pipeline.
|
||||
// Calculating the current time directly is prone to flaky behavior,
|
||||
// which usually only becomes apparent in tests.
|
||||
ctx = notify.WithNow(ctx, now)
|
||||
|
||||
// Populate context with information needed along the pipeline.
|
||||
ctx = notify.WithGroupKey(ctx, ag.GroupKey())
|
||||
ctx = notify.WithGroupLabels(ctx, ag.labels)
|
||||
ctx = notify.WithReceiverName(ctx, ag.opts.Receiver)
|
||||
ctx = notify.WithRepeatInterval(ctx, ag.opts.RepeatInterval)
|
||||
ctx = notify.WithMuteTimeIntervals(ctx, ag.opts.MuteTimeIntervals)
|
||||
ctx = notify.WithActiveTimeIntervals(ctx, ag.opts.ActiveTimeIntervals)
|
||||
ctx = notify.WithRouteID(ctx, ag.routeID)
|
||||
|
||||
// Wait the configured interval before calling flush again.
|
||||
ag.mtx.Lock()
|
||||
ag.next.Reset(ag.opts.GroupInterval)
|
||||
ag.hasFlushed = true
|
||||
ag.mtx.Unlock()
|
||||
|
||||
ag.flush(func(alerts ...*types.Alert) bool {
|
||||
return nf(ctx, alerts...)
|
||||
})
|
||||
|
||||
cancel()
|
||||
|
||||
case <-ag.ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (ag *aggrGroup) stop() {
|
||||
// Calling cancel will terminate all in-process notifications
|
||||
// and the run() loop.
|
||||
ag.cancel()
|
||||
<-ag.done
|
||||
}
|
||||
|
||||
// insert inserts the alert into the aggregation group.
|
||||
func (ag *aggrGroup) insert(alert *types.Alert) {
|
||||
if err := ag.alerts.Set(alert); err != nil {
|
||||
ag.logger.ErrorContext(ag.ctx, "error on set alert", "err", err)
|
||||
}
|
||||
|
||||
// Immediately trigger a flush if the wait duration for this
|
||||
// alert is already over.
|
||||
ag.mtx.Lock()
|
||||
defer ag.mtx.Unlock()
|
||||
if !ag.hasFlushed && alert.StartsAt.Add(ag.opts.GroupWait).Before(time.Now()) {
|
||||
ag.next.Reset(0)
|
||||
}
|
||||
}
|
||||
|
||||
func (ag *aggrGroup) empty() bool {
|
||||
return ag.alerts.Empty()
|
||||
}
|
||||
|
||||
// flush sends notifications for all new alerts.
|
||||
func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) {
|
||||
if ag.empty() {
|
||||
return
|
||||
}
|
||||
|
||||
var (
|
||||
alerts = ag.alerts.List()
|
||||
alertsSlice = make(types.AlertSlice, 0, len(alerts))
|
||||
resolvedSlice = make(types.AlertSlice, 0, len(alerts))
|
||||
now = time.Now()
|
||||
)
|
||||
for _, alert := range alerts {
|
||||
a := *alert
|
||||
// Ensure that alerts don't resolve as time move forwards.
|
||||
if a.ResolvedAt(now) {
|
||||
resolvedSlice = append(resolvedSlice, &a)
|
||||
} else {
|
||||
a.EndsAt = time.Time{}
|
||||
}
|
||||
alertsSlice = append(alertsSlice, &a)
|
||||
}
|
||||
sort.Stable(alertsSlice)
|
||||
|
||||
ag.logger.DebugContext(ag.ctx, "flushing", "alerts", fmt.Sprintf("%v", alertsSlice))
|
||||
|
||||
if notify(alertsSlice...) {
|
||||
// Delete all resolved alerts as we just sent a notification for them,
|
||||
// and we don't want to send another one. However, we need to make sure
|
||||
// that each resolved alert has not fired again during the flush as then
|
||||
// we would delete an active alert thinking it was resolved.
|
||||
if err := ag.alerts.DeleteIfNotModified(resolvedSlice); err != nil {
|
||||
ag.logger.ErrorContext(ag.ctx, "error on delete alerts", "err", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// unlimitedLimits is the limits implementation used when the dispatcher is
// built without explicit limits.
type unlimitedLimits struct{}

// MaxNumberOfAggregationGroups always returns 0, which the dispatcher treats
// as "no limit" (it only enforces limits greater than zero).
func (*unlimitedLimits) MaxNumberOfAggregationGroups() int {
	return 0
}
|
||||
@@ -7,7 +7,10 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
signozDispatcher "github.com/SigNoz/signoz/pkg/alertmanager/alertmanagerserver/dispatch"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/notificationgrouping"
|
||||
"github.com/SigNoz/signoz/pkg/types/alertmanagertypes"
|
||||
"github.com/prometheus/alertmanager/dispatch"
|
||||
"github.com/prometheus/alertmanager/featurecontrol"
|
||||
@@ -49,29 +52,31 @@ type Server struct {
|
||||
stateStore alertmanagertypes.StateStore
|
||||
|
||||
// alertmanager primitives from upstream alertmanager
|
||||
alerts *mem.Alerts
|
||||
nflog *nflog.Log
|
||||
dispatcher *dispatch.Dispatcher
|
||||
dispatcherMetrics *dispatch.DispatcherMetrics
|
||||
inhibitor *inhibit.Inhibitor
|
||||
silencer *silence.Silencer
|
||||
silences *silence.Silences
|
||||
timeIntervals map[string][]timeinterval.TimeInterval
|
||||
pipelineBuilder *notify.PipelineBuilder
|
||||
marker *alertmanagertypes.MemMarker
|
||||
tmpl *template.Template
|
||||
wg sync.WaitGroup
|
||||
stopc chan struct{}
|
||||
alerts *mem.Alerts
|
||||
nflog *nflog.Log
|
||||
dispatcher *signozDispatcher.Dispatcher
|
||||
dispatcherMetrics *signozDispatcher.DispatcherMetrics
|
||||
inhibitor *inhibit.Inhibitor
|
||||
silencer *silence.Silencer
|
||||
silences *silence.Silences
|
||||
timeIntervals map[string][]timeinterval.TimeInterval
|
||||
pipelineBuilder *notify.PipelineBuilder
|
||||
marker *alertmanagertypes.MemMarker
|
||||
tmpl *template.Template
|
||||
wg sync.WaitGroup
|
||||
stopc chan struct{}
|
||||
notificationGroups notificationgrouping.NotificationGroups
|
||||
}
|
||||
|
||||
func New(ctx context.Context, logger *slog.Logger, registry prometheus.Registerer, srvConfig Config, orgID string, stateStore alertmanagertypes.StateStore) (*Server, error) {
|
||||
func New(ctx context.Context, logger *slog.Logger, registry prometheus.Registerer, srvConfig Config, orgID string, stateStore alertmanagertypes.StateStore, groups notificationgrouping.NotificationGroups) (*Server, error) {
|
||||
server := &Server{
|
||||
logger: logger.With("pkg", "go.signoz.io/pkg/alertmanager/alertmanagerserver"),
|
||||
registry: registry,
|
||||
srvConfig: srvConfig,
|
||||
orgID: orgID,
|
||||
stateStore: stateStore,
|
||||
stopc: make(chan struct{}),
|
||||
logger: logger.With("pkg", "go.signoz.io/pkg/alertmanager/alertmanagerserver"),
|
||||
registry: registry,
|
||||
srvConfig: srvConfig,
|
||||
orgID: orgID,
|
||||
stateStore: stateStore,
|
||||
stopc: make(chan struct{}),
|
||||
notificationGroups: groups,
|
||||
}
|
||||
// initialize marker
|
||||
server.marker = alertmanagertypes.NewMarker(server.registry)
|
||||
@@ -187,7 +192,7 @@ func New(ctx context.Context, logger *slog.Logger, registry prometheus.Registere
|
||||
}
|
||||
|
||||
server.pipelineBuilder = notify.NewPipelineBuilder(server.registry, featurecontrol.NoopFlags{})
|
||||
server.dispatcherMetrics = dispatch.NewDispatcherMetrics(false, server.registry)
|
||||
server.dispatcherMetrics = signozDispatcher.NewDispatcherMetrics(false, server.registry)
|
||||
|
||||
return server, nil
|
||||
}
|
||||
@@ -292,7 +297,7 @@ func (server *Server) SetConfig(ctx context.Context, alertmanagerConfig *alertma
|
||||
return d
|
||||
}
|
||||
|
||||
server.dispatcher = dispatch.NewDispatcher(
|
||||
server.dispatcher = signozDispatcher.NewDispatcher(
|
||||
server.alerts,
|
||||
routes,
|
||||
pipeline,
|
||||
@@ -301,6 +306,8 @@ func (server *Server) SetConfig(ctx context.Context, alertmanagerConfig *alertma
|
||||
nil,
|
||||
server.logger,
|
||||
server.dispatcherMetrics,
|
||||
server.notificationGroups,
|
||||
server.orgID,
|
||||
)
|
||||
|
||||
// Do not try to add these to server.wg as there seems to be a race condition if
|
||||
|
||||
@@ -11,6 +11,9 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/factory/factorytest"
|
||||
"github.com/SigNoz/signoz/pkg/notificationgrouping"
|
||||
"github.com/SigNoz/signoz/pkg/notificationgrouping/notificationgroupingtest"
|
||||
"github.com/SigNoz/signoz/pkg/types/alertmanagertypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/alertmanagertypes/alertmanagertypestest"
|
||||
"github.com/go-openapi/strfmt"
|
||||
@@ -23,7 +26,9 @@ import (
|
||||
)
|
||||
|
||||
func TestServerSetConfigAndStop(t *testing.T) {
|
||||
server, err := New(context.Background(), slog.New(slog.NewTextHandler(io.Discard, nil)), prometheus.NewRegistry(), NewConfig(), "1", alertmanagertypestest.NewStateStore())
|
||||
notificationGroups, err := notificationgroupingtest.New(context.Background(), factorytest.NewSettings(), notificationgrouping.Config{})
|
||||
require.NoError(t, err)
|
||||
server, err := New(context.Background(), slog.New(slog.NewTextHandler(io.Discard, nil)), prometheus.NewRegistry(), NewConfig(), "1", alertmanagertypestest.NewStateStore(), notificationGroups)
|
||||
require.NoError(t, err)
|
||||
|
||||
amConfig, err := alertmanagertypes.NewDefaultConfig(alertmanagertypes.GlobalConfig{}, alertmanagertypes.RouteConfig{GroupInterval: 1 * time.Minute, RepeatInterval: 1 * time.Minute, GroupWait: 1 * time.Minute}, "1")
|
||||
@@ -34,7 +39,9 @@ func TestServerSetConfigAndStop(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestServerTestReceiverTypeWebhook(t *testing.T) {
|
||||
server, err := New(context.Background(), slog.New(slog.NewTextHandler(io.Discard, nil)), prometheus.NewRegistry(), NewConfig(), "1", alertmanagertypestest.NewStateStore())
|
||||
notificationGroups, err := notificationgroupingtest.New(context.Background(), factorytest.NewSettings(), notificationgrouping.Config{})
|
||||
require.NoError(t, err)
|
||||
server, err := New(context.Background(), slog.New(slog.NewTextHandler(io.Discard, nil)), prometheus.NewRegistry(), NewConfig(), "1", alertmanagertypestest.NewStateStore(), notificationGroups)
|
||||
require.NoError(t, err)
|
||||
|
||||
amConfig, err := alertmanagertypes.NewDefaultConfig(alertmanagertypes.GlobalConfig{}, alertmanagertypes.RouteConfig{GroupInterval: 1 * time.Minute, RepeatInterval: 1 * time.Minute, GroupWait: 1 * time.Minute}, "1")
|
||||
@@ -81,7 +88,9 @@ func TestServerPutAlerts(t *testing.T) {
|
||||
stateStore := alertmanagertypestest.NewStateStore()
|
||||
srvCfg := NewConfig()
|
||||
srvCfg.Route.GroupInterval = 1 * time.Second
|
||||
server, err := New(context.Background(), slog.New(slog.NewTextHandler(io.Discard, nil)), prometheus.NewRegistry(), srvCfg, "1", stateStore)
|
||||
notificationGroups, err := notificationgroupingtest.New(context.Background(), factorytest.NewSettings(), notificationgrouping.Config{})
|
||||
require.NoError(t, err)
|
||||
server, err := New(context.Background(), slog.New(slog.NewTextHandler(io.Discard, nil)), prometheus.NewRegistry(), srvCfg, "1", stateStore, notificationGroups)
|
||||
require.NoError(t, err)
|
||||
|
||||
amConfig, err := alertmanagertypes.NewDefaultConfig(srvCfg.Global, srvCfg.Route, "1")
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization"
|
||||
"github.com/SigNoz/signoz/pkg/notificationgrouping"
|
||||
"github.com/SigNoz/signoz/pkg/types/alertmanagertypes"
|
||||
)
|
||||
|
||||
@@ -32,6 +33,8 @@ type Service struct {
|
||||
|
||||
// Mutex to protect the servers map
|
||||
serversMtx sync.RWMutex
|
||||
|
||||
notificationGroups notificationgrouping.NotificationGroups
|
||||
}
|
||||
|
||||
func New(
|
||||
@@ -41,15 +44,17 @@ func New(
|
||||
stateStore alertmanagertypes.StateStore,
|
||||
configStore alertmanagertypes.ConfigStore,
|
||||
orgGetter organization.Getter,
|
||||
groups notificationgrouping.NotificationGroups,
|
||||
) *Service {
|
||||
service := &Service{
|
||||
config: config,
|
||||
stateStore: stateStore,
|
||||
configStore: configStore,
|
||||
orgGetter: orgGetter,
|
||||
settings: settings,
|
||||
servers: make(map[string]*alertmanagerserver.Server),
|
||||
serversMtx: sync.RWMutex{},
|
||||
config: config,
|
||||
stateStore: stateStore,
|
||||
configStore: configStore,
|
||||
orgGetter: orgGetter,
|
||||
settings: settings,
|
||||
servers: make(map[string]*alertmanagerserver.Server),
|
||||
serversMtx: sync.RWMutex{},
|
||||
notificationGroups: groups,
|
||||
}
|
||||
|
||||
return service
|
||||
@@ -167,7 +172,7 @@ func (service *Service) newServer(ctx context.Context, orgID string) (*alertmana
|
||||
return nil, err
|
||||
}
|
||||
|
||||
server, err := alertmanagerserver.New(ctx, service.settings.Logger(), service.settings.PrometheusRegisterer(), service.config, orgID, service.stateStore)
|
||||
server, err := alertmanagerserver.New(ctx, service.settings.Logger(), service.settings.PrometheusRegisterer(), service.config, orgID, service.stateStore, service.notificationGroups)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -9,27 +9,29 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization"
|
||||
"github.com/SigNoz/signoz/pkg/notificationgrouping"
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/SigNoz/signoz/pkg/types/alertmanagertypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
type provider struct {
|
||||
service *alertmanager.Service
|
||||
config alertmanager.Config
|
||||
settings factory.ScopedProviderSettings
|
||||
configStore alertmanagertypes.ConfigStore
|
||||
stateStore alertmanagertypes.StateStore
|
||||
stopC chan struct{}
|
||||
service *alertmanager.Service
|
||||
config alertmanager.Config
|
||||
settings factory.ScopedProviderSettings
|
||||
configStore alertmanagertypes.ConfigStore
|
||||
stateStore alertmanagertypes.StateStore
|
||||
notificationGroups notificationgrouping.NotificationGroups
|
||||
stopC chan struct{}
|
||||
}
|
||||
|
||||
func NewFactory(sqlstore sqlstore.SQLStore, orgGetter organization.Getter) factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config] {
|
||||
func NewFactory(sqlstore sqlstore.SQLStore, orgGetter organization.Getter, notificationGroups notificationgrouping.NotificationGroups) factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config] {
|
||||
return factory.NewProviderFactory(factory.MustNewName("signoz"), func(ctx context.Context, settings factory.ProviderSettings, config alertmanager.Config) (alertmanager.Alertmanager, error) {
|
||||
return New(ctx, settings, config, sqlstore, orgGetter)
|
||||
return New(ctx, settings, config, sqlstore, orgGetter, notificationGroups)
|
||||
})
|
||||
}
|
||||
|
||||
func New(ctx context.Context, providerSettings factory.ProviderSettings, config alertmanager.Config, sqlstore sqlstore.SQLStore, orgGetter organization.Getter) (*provider, error) {
|
||||
func New(ctx context.Context, providerSettings factory.ProviderSettings, config alertmanager.Config, sqlstore sqlstore.SQLStore, orgGetter organization.Getter, notificationGroups notificationgrouping.NotificationGroups) (*provider, error) {
|
||||
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/alertmanager/signozalertmanager")
|
||||
configStore := sqlalertmanagerstore.NewConfigStore(sqlstore)
|
||||
stateStore := sqlalertmanagerstore.NewStateStore(sqlstore)
|
||||
@@ -42,12 +44,14 @@ func New(ctx context.Context, providerSettings factory.ProviderSettings, config
|
||||
stateStore,
|
||||
configStore,
|
||||
orgGetter,
|
||||
notificationGroups,
|
||||
),
|
||||
settings: settings,
|
||||
config: config,
|
||||
configStore: configStore,
|
||||
stateStore: stateStore,
|
||||
stopC: make(chan struct{}),
|
||||
settings: settings,
|
||||
config: config,
|
||||
configStore: configStore,
|
||||
stateStore: stateStore,
|
||||
notificationGroups: notificationGroups,
|
||||
stopC: make(chan struct{}),
|
||||
}
|
||||
|
||||
return p, nil
|
||||
|
||||
30
pkg/notificationgrouping/config.go
Normal file
30
pkg/notificationgrouping/config.go
Normal file
@@ -0,0 +1,30 @@
|
||||
package notificationgrouping
|
||||
|
||||
import (
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
)
|
||||
|
||||
// Config holds the settings for the notification grouping subsystem.
type Config struct {
	// Provider names the notification grouping provider to use.
	Provider string `mapstructure:"provider"`

	// DefaultStrategy names the grouping strategy applied when no
	// rule-specific strategy is configured.
	DefaultStrategy string `mapstructure:"default_strategy"`
}
|
||||
|
||||
func NewConfigFactory() factory.ConfigFactory {
|
||||
return factory.NewConfigFactory(factory.MustNewName("notificationgrouping"), newConfig)
|
||||
}
|
||||
|
||||
// newConfig creates a new default configuration for notification grouping.
|
||||
func newConfig() factory.Config {
|
||||
return Config{
|
||||
Provider: "rulebased",
|
||||
DefaultStrategy: "standard",
|
||||
}
|
||||
}
|
||||
|
||||
// Validate validates the configuration and returns an error if invalid.
|
||||
func (c Config) Validate() error {
|
||||
// Add validation logic here if needed
|
||||
return nil
|
||||
}
|
||||
15
pkg/notificationgrouping/notificationgrouping.go
Normal file
15
pkg/notificationgrouping/notificationgrouping.go
Normal file
@@ -0,0 +1,15 @@
|
||||
// Package notificationgrouping provides interfaces and implementations for alert notification grouping strategies.
|
||||
// It supports multi-tenancy and rule-based grouping configurations.
|
||||
package notificationgrouping
|
||||
|
||||
import (
|
||||
"github.com/prometheus/alertmanager/dispatch"
|
||||
"github.com/prometheus/alertmanager/types"
|
||||
"github.com/prometheus/common/model"
|
||||
)
|
||||
|
||||
// NotificationGroups defines how alerts should be grouped for notification with multi-tenancy support.
|
||||
type NotificationGroups interface {
|
||||
GetGroupLabels(orgID string, alert *types.Alert, route *dispatch.Route) model.LabelSet
|
||||
SetGroupLabels(orgID string, ruleID string, groupByLabels []string) error
|
||||
}
|
||||
50
pkg/notificationgrouping/notificationgrouping_test.go
Normal file
50
pkg/notificationgrouping/notificationgrouping_test.go
Normal file
@@ -0,0 +1,50 @@
|
||||
package notificationgrouping
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/prometheus/alertmanager/dispatch"
|
||||
"github.com/prometheus/alertmanager/types"
|
||||
"github.com/prometheus/common/model"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestNotificationGroupsInterface(t *testing.T) {
|
||||
// Test that the interface can be implemented
|
||||
var ng NotificationGroups = &mockNotificationGroups{}
|
||||
|
||||
alert := &types.Alert{
|
||||
Alert: model.Alert{
|
||||
Labels: model.LabelSet{
|
||||
"alertname": "test_alert",
|
||||
"severity": "critical",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
route := &dispatch.Route{
|
||||
RouteOpts: dispatch.RouteOpts{
|
||||
GroupBy: map[model.LabelName]struct{}{
|
||||
"alertname": {},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
groupLabels := ng.GetGroupLabels("test-org", alert, route)
|
||||
assert.NotNil(t, groupLabels)
|
||||
|
||||
err := ng.SetGroupLabels("test-org", "test-rule", []string{"alertname"})
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
type mockNotificationGroups struct{}
|
||||
|
||||
func (m *mockNotificationGroups) GetGroupLabels(orgID string, alert *types.Alert, route *dispatch.Route) model.LabelSet {
|
||||
return model.LabelSet{
|
||||
"alertname": alert.Labels["alertname"],
|
||||
}
|
||||
}
|
||||
|
||||
func (m *mockNotificationGroups) SetGroupLabels(orgID string, ruleID string, groupByLabels []string) error {
|
||||
return nil
|
||||
}
|
||||
@@ -0,0 +1,61 @@
|
||||
package notificationgroupingtest
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
"github.com/SigNoz/signoz/pkg/notificationgrouping"
|
||||
"github.com/prometheus/alertmanager/dispatch"
|
||||
"github.com/prometheus/alertmanager/types"
|
||||
"github.com/prometheus/common/model"
|
||||
)
|
||||
|
||||
type provider struct {
|
||||
settings factory.ScopedProviderSettings
|
||||
// Mock data for testing
|
||||
mockGroupLabels model.LabelSet
|
||||
mockError error
|
||||
}
|
||||
|
||||
// NewFactory creates a new factory for the test notification grouping strategy.
|
||||
func NewFactory() factory.ProviderFactory[notificationgrouping.NotificationGroups, notificationgrouping.Config] {
|
||||
return factory.NewProviderFactory(
|
||||
factory.MustNewName("test"),
|
||||
func(ctx context.Context, settings factory.ProviderSettings, config notificationgrouping.Config) (notificationgrouping.NotificationGroups, error) {
|
||||
return New(ctx, settings, config)
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
// New creates a new test notification grouping strategy provider.
|
||||
func New(ctx context.Context, providerSettings factory.ProviderSettings, config notificationgrouping.Config) (notificationgrouping.NotificationGroups, error) {
|
||||
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/notificationgrouping/notificationgroupingtest")
|
||||
|
||||
return &provider{
|
||||
settings: settings,
|
||||
mockGroupLabels: model.LabelSet{},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// GetGroupLabels implements the NotificationGroups interface for testing.
|
||||
func (p *provider) GetGroupLabels(orgID string, alert *types.Alert, route *dispatch.Route) model.LabelSet {
|
||||
if p.mockError != nil {
|
||||
return model.LabelSet{}
|
||||
}
|
||||
return p.mockGroupLabels
|
||||
}
|
||||
|
||||
// SetGroupLabels implements the NotificationGroups interface for testing.
|
||||
func (p *provider) SetGroupLabels(orgID string, ruleID string, groupByLabels []string) error {
|
||||
return p.mockError
|
||||
}
|
||||
|
||||
// SetMockGroupLabels sets mock group labels for testing.
|
||||
func (p *provider) SetMockGroupLabels(labels model.LabelSet) {
|
||||
p.mockGroupLabels = labels
|
||||
}
|
||||
|
||||
// SetMockError sets a mock error for testing.
|
||||
func (p *provider) SetMockError(err error) {
|
||||
p.mockError = err
|
||||
}
|
||||
102
pkg/notificationgrouping/rulebasedgrouping/provider.go
Normal file
102
pkg/notificationgrouping/rulebasedgrouping/provider.go
Normal file
@@ -0,0 +1,102 @@
|
||||
package rulebasedgrouping
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
"github.com/SigNoz/signoz/pkg/notificationgrouping"
|
||||
"github.com/SigNoz/signoz/pkg/notificationgrouping/standardgrouping"
|
||||
"github.com/prometheus/alertmanager/dispatch"
|
||||
"github.com/prometheus/alertmanager/types"
|
||||
"github.com/prometheus/common/model"
|
||||
)
|
||||
|
||||
type provider struct {
|
||||
settings factory.ScopedProviderSettings
|
||||
orgToRuleToNotificationGroupBy map[string]map[string][]string
|
||||
fallbackStrategy notificationgrouping.NotificationGroups
|
||||
mutex sync.RWMutex
|
||||
}
|
||||
|
||||
// NewFactory creates a new factory for the rule-based grouping strategy.
|
||||
func NewFactory() factory.ProviderFactory[notificationgrouping.NotificationGroups, notificationgrouping.Config] {
|
||||
return factory.NewProviderFactory(
|
||||
factory.MustNewName("rulebased"),
|
||||
func(ctx context.Context, settings factory.ProviderSettings, config notificationgrouping.Config) (notificationgrouping.NotificationGroups, error) {
|
||||
return New(ctx, settings, config)
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
// New creates a new rule-based grouping strategy provider.
|
||||
func New(ctx context.Context, providerSettings factory.ProviderSettings, config notificationgrouping.Config) (notificationgrouping.NotificationGroups, error) {
|
||||
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/notificationgrouping/rulebasedgrouping")
|
||||
|
||||
// Create fallback strategy based on config
|
||||
fallbackStrategy, err := standardgrouping.New(ctx, providerSettings, config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &provider{
|
||||
settings: settings,
|
||||
orgToRuleToNotificationGroupBy: make(map[string]map[string][]string),
|
||||
fallbackStrategy: fallbackStrategy,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// GetGroupLabels uses rule-specific notificationGroupBy if available, otherwise falls back to standard groupBy.
|
||||
func (r *provider) GetGroupLabels(orgID string, alert *types.Alert, route *dispatch.Route) model.LabelSet {
|
||||
r.mutex.RLock()
|
||||
defer r.mutex.RUnlock()
|
||||
|
||||
ruleID := getRuleIDFromRoute(alert)
|
||||
|
||||
var notificationGroupBy []string
|
||||
if orgRules, exists := r.orgToRuleToNotificationGroupBy[orgID]; exists {
|
||||
if groupBy, ruleExists := orgRules[ruleID]; ruleExists {
|
||||
notificationGroupBy = groupBy
|
||||
}
|
||||
}
|
||||
|
||||
groupLabels := r.fallbackStrategy.GetGroupLabels(orgID, alert, route)
|
||||
|
||||
if len(notificationGroupBy) > 0 {
|
||||
// Use notificationGroupBy from rule
|
||||
for _, labelName := range notificationGroupBy {
|
||||
if labelValue, exists := alert.Labels[model.LabelName(labelName)]; exists {
|
||||
groupLabels[model.LabelName(labelName)] = labelValue
|
||||
}
|
||||
}
|
||||
}
|
||||
return groupLabels
|
||||
}
|
||||
|
||||
// SetGroupLabels updates the rule-to-notificationGroupBy mapping for the specified rule and organization.
|
||||
func (r *provider) SetGroupLabels(orgID string, ruleID string, groupByLabels []string) error {
|
||||
if orgID == "" || ruleID == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
r.mutex.Lock()
|
||||
defer r.mutex.Unlock()
|
||||
|
||||
// Initialize org map if it doesn't exist
|
||||
if r.orgToRuleToNotificationGroupBy[orgID] == nil {
|
||||
r.orgToRuleToNotificationGroupBy[orgID] = make(map[string][]string)
|
||||
}
|
||||
|
||||
r.orgToRuleToNotificationGroupBy[orgID][ruleID] = groupByLabels
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func getRuleIDFromRoute(alert *types.Alert) string {
|
||||
for name, value := range alert.Labels {
|
||||
if string(name) == "ruleId" {
|
||||
return string(value)
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
558
pkg/notificationgrouping/rulebasedgrouping/provider_test.go
Normal file
558
pkg/notificationgrouping/rulebasedgrouping/provider_test.go
Normal file
@@ -0,0 +1,558 @@
|
||||
package rulebasedgrouping
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
|
||||
"github.com/SigNoz/signoz/pkg/notificationgrouping"
|
||||
"github.com/prometheus/alertmanager/dispatch"
|
||||
"github.com/prometheus/alertmanager/types"
|
||||
"github.com/prometheus/common/model"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func createTestProviderSettings() factory.ProviderSettings {
|
||||
return instrumentationtest.New().ToProviderSettings()
|
||||
}
|
||||
|
||||
func TestNewFactory(t *testing.T) {
|
||||
providerFactory := NewFactory()
|
||||
assert.NotNil(t, providerFactory)
|
||||
assert.Equal(t, "rulebased", providerFactory.Name().String())
|
||||
}
|
||||
|
||||
func TestNew(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
providerSettings := createTestProviderSettings()
|
||||
config := notificationgrouping.Config{}
|
||||
|
||||
provider, err := New(ctx, providerSettings, config)
|
||||
require.NoError(t, err)
|
||||
assert.NotNil(t, provider)
|
||||
|
||||
// Verify provider implements the interface correctly
|
||||
assert.Implements(t, (*notificationgrouping.NotificationGroups)(nil), provider)
|
||||
}
|
||||
|
||||
func TestProvider_SetGroupLabels(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
providerSettings := createTestProviderSettings()
|
||||
config := notificationgrouping.Config{}
|
||||
|
||||
provider, err := New(ctx, providerSettings, config)
|
||||
require.NoError(t, err)
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
orgID string
|
||||
ruleID string
|
||||
groupLabels []string
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "valid parameters",
|
||||
orgID: "org1",
|
||||
ruleID: "rule1",
|
||||
groupLabels: []string{"alertname", "severity"},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "empty orgID",
|
||||
orgID: "",
|
||||
ruleID: "rule1",
|
||||
groupLabels: []string{"alertname"},
|
||||
wantErr: false, // Should not error but also not set anything
|
||||
},
|
||||
{
|
||||
name: "empty ruleID",
|
||||
orgID: "org1",
|
||||
ruleID: "",
|
||||
groupLabels: []string{"alertname"},
|
||||
wantErr: false, // Should not error but also not set anything
|
||||
},
|
||||
{
|
||||
name: "nil groupLabels",
|
||||
orgID: "org1",
|
||||
ruleID: "rule1",
|
||||
groupLabels: nil,
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "empty groupLabels",
|
||||
orgID: "org1",
|
||||
ruleID: "rule1",
|
||||
groupLabels: []string{},
|
||||
wantErr: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
err := provider.SetGroupLabels(tt.orgID, tt.ruleID, tt.groupLabels)
|
||||
if tt.wantErr {
|
||||
assert.Error(t, err)
|
||||
} else {
|
||||
assert.NoError(t, err)
|
||||
if tt.orgID != "" && tt.ruleID != "" && tt.groupLabels != nil {
|
||||
alert := &types.Alert{
|
||||
Alert: model.Alert{
|
||||
Labels: model.LabelSet{
|
||||
"ruleId": model.LabelValue(tt.ruleID),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
// Add all group labels to the alert so they can be retrieved
|
||||
for _, label := range tt.groupLabels {
|
||||
alert.Labels[model.LabelName(label)] = model.LabelValue("test_value")
|
||||
}
|
||||
|
||||
route := &dispatch.Route{
|
||||
RouteOpts: dispatch.RouteOpts{
|
||||
GroupBy: map[model.LabelName]struct{}{},
|
||||
},
|
||||
}
|
||||
|
||||
groupLabels := provider.GetGroupLabels(tt.orgID, alert, route)
|
||||
|
||||
// Verify that all configured group labels are present in the result
|
||||
for _, expectedLabel := range tt.groupLabels {
|
||||
_, exists := groupLabels[model.LabelName(expectedLabel)]
|
||||
assert.True(t, exists, "Expected stored label %s to be present in group labels", expectedLabel)
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestProvider_SetGroupLabels_VerifyIsolatedStorage(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
providerSettings := createTestProviderSettings()
|
||||
config := notificationgrouping.Config{}
|
||||
|
||||
provider, err := New(ctx, providerSettings, config)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Set different labels for different org/rule combinations
|
||||
err = provider.SetGroupLabels("org1", "rule1", []string{"label_org1_rule1"})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = provider.SetGroupLabels("org1", "rule2", []string{"label_org1_rule2"})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = provider.SetGroupLabels("org2", "rule1", []string{"label_org2_rule1"})
|
||||
require.NoError(t, err)
|
||||
|
||||
// Test that each org/rule combination retrieves only its own labels
|
||||
testCases := []struct {
|
||||
orgID string
|
||||
ruleID string
|
||||
expectedLabel string
|
||||
unexpectedLabels []string
|
||||
}{
|
||||
{
|
||||
orgID: "org1",
|
||||
ruleID: "rule1",
|
||||
expectedLabel: "label_org1_rule1",
|
||||
unexpectedLabels: []string{"label_org1_rule2", "label_org2_rule1"},
|
||||
},
|
||||
{
|
||||
orgID: "org1",
|
||||
ruleID: "rule2",
|
||||
expectedLabel: "label_org1_rule2",
|
||||
unexpectedLabels: []string{"label_org1_rule1", "label_org2_rule1"},
|
||||
},
|
||||
{
|
||||
orgID: "org2",
|
||||
ruleID: "rule1",
|
||||
expectedLabel: "label_org2_rule1",
|
||||
unexpectedLabels: []string{"label_org1_rule1", "label_org1_rule2"},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.orgID+"_"+tc.ruleID, func(t *testing.T) {
|
||||
alert := &types.Alert{
|
||||
Alert: model.Alert{
|
||||
Labels: model.LabelSet{
|
||||
"ruleId": model.LabelValue(tc.ruleID),
|
||||
model.LabelName(tc.expectedLabel): "test_value",
|
||||
// Add all possible labels to the alert so we can test isolation
|
||||
"label_org1_rule1": "test_value",
|
||||
"label_org1_rule2": "test_value",
|
||||
"label_org2_rule1": "test_value",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
route := &dispatch.Route{
|
||||
RouteOpts: dispatch.RouteOpts{
|
||||
GroupBy: map[model.LabelName]struct{}{},
|
||||
},
|
||||
}
|
||||
|
||||
groupLabels := provider.GetGroupLabels(tc.orgID, alert, route)
|
||||
|
||||
// Should have the expected label for this org/rule combination
|
||||
_, exists := groupLabels[model.LabelName(tc.expectedLabel)]
|
||||
assert.True(t, exists, "Expected label %s to be present for %s/%s", tc.expectedLabel, tc.orgID, tc.ruleID)
|
||||
|
||||
// Should NOT have labels from other org/rule combinations
|
||||
for _, unexpectedLabel := range tc.unexpectedLabels {
|
||||
_, exists := groupLabels[model.LabelName(unexpectedLabel)]
|
||||
assert.False(t, exists, "Unexpected label %s should NOT be present for %s/%s", unexpectedLabel, tc.orgID, tc.ruleID)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestProvider_GetGroupLabels_WithRuleSpecificGrouping(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
providerSettings := createTestProviderSettings()
|
||||
config := notificationgrouping.Config{}
|
||||
|
||||
provider, err := New(ctx, providerSettings, config)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Set up test data
|
||||
orgID := "test-org"
|
||||
ruleID := "test-rule"
|
||||
groupByLabels := []string{"alertname", "severity"}
|
||||
err = provider.SetGroupLabels(orgID, ruleID, groupByLabels)
|
||||
require.NoError(t, err)
|
||||
|
||||
alert := &types.Alert{
|
||||
Alert: model.Alert{
|
||||
Labels: model.LabelSet{
|
||||
"ruleId": model.LabelValue(ruleID),
|
||||
"alertname": "test_alert",
|
||||
"severity": "critical",
|
||||
"instance": "localhost:9090",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
route := &dispatch.Route{
|
||||
RouteOpts: dispatch.RouteOpts{
|
||||
GroupBy: map[model.LabelName]struct{}{
|
||||
"instance": {},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
groupLabels := provider.GetGroupLabels(orgID, alert, route)
|
||||
assert.NotNil(t, groupLabels)
|
||||
|
||||
// Should have rule-specific labels plus fallback labels
|
||||
expectedKeys := []string{"alertname", "severity", "instance"}
|
||||
for _, key := range expectedKeys {
|
||||
_, exists := groupLabels[model.LabelName(key)]
|
||||
assert.True(t, exists, "Expected key %s to be present in group labels", key)
|
||||
}
|
||||
}
|
||||
|
||||
func TestProvider_GetGroupLabels_FallbackToStandardGrouping(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
providerSettings := createTestProviderSettings()
|
||||
config := notificationgrouping.Config{}
|
||||
|
||||
provider, err := New(ctx, providerSettings, config)
|
||||
require.NoError(t, err)
|
||||
|
||||
alert := &types.Alert{
|
||||
Alert: model.Alert{
|
||||
Labels: model.LabelSet{
|
||||
"ruleId": "non-existent-rule",
|
||||
"alertname": "test_alert",
|
||||
"severity": "critical",
|
||||
"instance": "localhost:9090",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
route := &dispatch.Route{
|
||||
RouteOpts: dispatch.RouteOpts{
|
||||
GroupBy: map[model.LabelName]struct{}{
|
||||
"instance": {},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
groupLabels := provider.GetGroupLabels("different-org", alert, route)
|
||||
assert.NotNil(t, groupLabels)
|
||||
|
||||
// Should only have standard grouping labels
|
||||
_, instanceExists := groupLabels[model.LabelName("instance")]
|
||||
assert.True(t, instanceExists, "Expected instance label from route groupBy")
|
||||
|
||||
_, alertnameExists := groupLabels[model.LabelName("alertname")]
|
||||
assert.False(t, alertnameExists, "Should not have alertname since it's not in route groupBy")
|
||||
}
|
||||
|
||||
func TestProvider_GetGroupLabels_NoRuleId(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
providerSettings := createTestProviderSettings()
|
||||
config := notificationgrouping.Config{}
|
||||
|
||||
provider, err := New(ctx, providerSettings, config)
|
||||
require.NoError(t, err)
|
||||
|
||||
alert := &types.Alert{
|
||||
Alert: model.Alert{
|
||||
Labels: model.LabelSet{
|
||||
"alertname": "test_alert",
|
||||
"severity": "critical",
|
||||
"instance": "localhost:9090",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
route := &dispatch.Route{
|
||||
RouteOpts: dispatch.RouteOpts{
|
||||
GroupBy: map[model.LabelName]struct{}{
|
||||
"instance": {},
|
||||
"severity": {},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
groupLabels := provider.GetGroupLabels("test-org", alert, route)
|
||||
assert.NotNil(t, groupLabels)
|
||||
|
||||
// Should have standard grouping labels only
|
||||
expectedKeys := []string{"instance", "severity"}
|
||||
for _, key := range expectedKeys {
|
||||
_, exists := groupLabels[model.LabelName(key)]
|
||||
assert.True(t, exists, "Expected key %s from route groupBy", key)
|
||||
}
|
||||
|
||||
_, alertnameExists := groupLabels[model.LabelName("alertname")]
|
||||
assert.False(t, alertnameExists, "Should not have alertname since it's not in route groupBy")
|
||||
}
|
||||
|
||||
func TestProvider_GetGroupLabels_PartialLabelMatch(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
providerSettings := createTestProviderSettings()
|
||||
config := notificationgrouping.Config{}
|
||||
|
||||
provider, err := New(ctx, providerSettings, config)
|
||||
require.NoError(t, err)
|
||||
|
||||
orgID := "test-org"
|
||||
ruleID := "test-rule"
|
||||
groupByLabels := []string{"alertname", "severity", "missing_label"}
|
||||
err = provider.SetGroupLabels(orgID, ruleID, groupByLabels)
|
||||
require.NoError(t, err)
|
||||
|
||||
alert := &types.Alert{
|
||||
Alert: model.Alert{
|
||||
Labels: model.LabelSet{
|
||||
"ruleId": model.LabelValue(ruleID),
|
||||
"alertname": "test_alert",
|
||||
"instance": "localhost:9090",
|
||||
// Note: missing "severity" and "missing_label"
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
route := &dispatch.Route{
|
||||
RouteOpts: dispatch.RouteOpts{
|
||||
GroupBy: map[model.LabelName]struct{}{
|
||||
"instance": {},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
groupLabels := provider.GetGroupLabels(orgID, alert, route)
|
||||
assert.NotNil(t, groupLabels)
|
||||
|
||||
// Should have alertname and instance, but not severity or missing_label
|
||||
_, alertnameExists := groupLabels[model.LabelName("alertname")]
|
||||
assert.True(t, alertnameExists, "Expected alertname from rule groupBy")
|
||||
|
||||
_, instanceExists := groupLabels[model.LabelName("instance")]
|
||||
assert.True(t, instanceExists, "Expected instance from route groupBy")
|
||||
|
||||
_, severityExists := groupLabels[model.LabelName("severity")]
|
||||
assert.False(t, severityExists, "Should not have severity since it's not in alert labels")
|
||||
|
||||
_, missingExists := groupLabels[model.LabelName("missing_label")]
|
||||
assert.False(t, missingExists, "Should not have missing_label since it's not in alert labels")
|
||||
}
|
||||
|
||||
func TestGetRuleIDFromRoute(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
alert *types.Alert
|
||||
expected string
|
||||
}{
|
||||
{
|
||||
name: "alert with ruleId label",
|
||||
alert: &types.Alert{
|
||||
Alert: model.Alert{
|
||||
Labels: model.LabelSet{
|
||||
"ruleId": "test-rule-123",
|
||||
"alertname": "test_alert",
|
||||
},
|
||||
},
|
||||
},
|
||||
expected: "test-rule-123",
|
||||
},
|
||||
{
|
||||
name: "alert without ruleId label",
|
||||
alert: &types.Alert{
|
||||
Alert: model.Alert{
|
||||
Labels: model.LabelSet{
|
||||
"alertname": "test_alert",
|
||||
"severity": "critical",
|
||||
},
|
||||
},
|
||||
},
|
||||
expected: "",
|
||||
},
|
||||
{
|
||||
name: "alert with empty ruleId",
|
||||
alert: &types.Alert{
|
||||
Alert: model.Alert{
|
||||
Labels: model.LabelSet{
|
||||
"ruleId": "",
|
||||
"alertname": "test_alert",
|
||||
},
|
||||
},
|
||||
},
|
||||
expected: "",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
result := getRuleIDFromRoute(tt.alert)
|
||||
assert.Equal(t, tt.expected, result)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestProvider_ConcurrentAccess(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
providerSettings := createTestProviderSettings()
|
||||
config := notificationgrouping.Config{}
|
||||
|
||||
provider, err := New(ctx, providerSettings, config)
|
||||
require.NoError(t, err)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
// Writer goroutine
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for i := 0; i < 50; i++ {
|
||||
err := provider.SetGroupLabels("org1", "rule1", []string{"label1", "label2"})
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
}()
|
||||
|
||||
// Reader goroutine
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
alert := &types.Alert{
|
||||
Alert: model.Alert{
|
||||
Labels: model.LabelSet{
|
||||
"ruleId": "rule1",
|
||||
"label1": "value1",
|
||||
"label2": "value2",
|
||||
},
|
||||
},
|
||||
}
|
||||
route := &dispatch.Route{
|
||||
RouteOpts: dispatch.RouteOpts{
|
||||
GroupBy: map[model.LabelName]struct{}{},
|
||||
},
|
||||
}
|
||||
|
||||
for i := 0; i < 50; i++ {
|
||||
groupLabels := provider.GetGroupLabels("org1", alert, route)
|
||||
assert.NotNil(t, groupLabels)
|
||||
}
|
||||
}()
|
||||
|
||||
// Wait for both goroutines to complete
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestProvider_MultipleOrgsAndRules(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
providerSettings := createTestProviderSettings()
|
||||
config := notificationgrouping.Config{}
|
||||
|
||||
provider, err := New(ctx, providerSettings, config)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Set up multiple orgs and rules
|
||||
err = provider.SetGroupLabels("org1", "rule1", []string{"alertname"})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = provider.SetGroupLabels("org1", "rule2", []string{"severity"})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = provider.SetGroupLabels("org2", "rule1", []string{"instance"})
|
||||
require.NoError(t, err)
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
orgID string
|
||||
ruleID string
|
||||
expectedLabel string
|
||||
}{
|
||||
{
|
||||
name: "org1 rule1",
|
||||
orgID: "org1",
|
||||
ruleID: "rule1",
|
||||
expectedLabel: "alertname",
|
||||
},
|
||||
{
|
||||
name: "org1 rule2",
|
||||
orgID: "org1",
|
||||
ruleID: "rule2",
|
||||
expectedLabel: "severity",
|
||||
},
|
||||
{
|
||||
name: "org2 rule1",
|
||||
orgID: "org2",
|
||||
ruleID: "rule1",
|
||||
expectedLabel: "instance",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
alert := &types.Alert{
|
||||
Alert: model.Alert{
|
||||
Labels: model.LabelSet{
|
||||
"ruleId": model.LabelValue(tt.ruleID),
|
||||
"alertname": "test_alert",
|
||||
"severity": "critical",
|
||||
"instance": "localhost:9090",
|
||||
},
|
||||
},
|
||||
}
|
||||
route := &dispatch.Route{
|
||||
RouteOpts: dispatch.RouteOpts{
|
||||
GroupBy: map[model.LabelName]struct{}{},
|
||||
},
|
||||
}
|
||||
|
||||
groupLabels := provider.GetGroupLabels(tt.orgID, alert, route)
|
||||
|
||||
// Should have the expected label from rule-specific grouping
|
||||
_, exists := groupLabels[model.LabelName(tt.expectedLabel)]
|
||||
assert.True(t, exists, "Expected label %s to be present", tt.expectedLabel)
|
||||
})
|
||||
}
|
||||
}
|
||||
49
pkg/notificationgrouping/standardgrouping/provider.go
Normal file
49
pkg/notificationgrouping/standardgrouping/provider.go
Normal file
@@ -0,0 +1,49 @@
|
||||
package standardgrouping
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
"github.com/SigNoz/signoz/pkg/notificationgrouping"
|
||||
"github.com/prometheus/alertmanager/dispatch"
|
||||
"github.com/prometheus/alertmanager/types"
|
||||
"github.com/prometheus/common/model"
|
||||
)
|
||||
|
||||
type provider struct {
|
||||
settings factory.ScopedProviderSettings
|
||||
}
|
||||
|
||||
func NewFactory() factory.ProviderFactory[notificationgrouping.NotificationGroups, notificationgrouping.Config] {
|
||||
return factory.NewProviderFactory(
|
||||
factory.MustNewName("standard"),
|
||||
func(ctx context.Context, settings factory.ProviderSettings, config notificationgrouping.Config) (notificationgrouping.NotificationGroups, error) {
|
||||
return New(ctx, settings, config)
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
func New(ctx context.Context, providerSettings factory.ProviderSettings, config notificationgrouping.Config) (notificationgrouping.NotificationGroups, error) {
|
||||
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/notificationgrouping/standardgrouping")
|
||||
|
||||
return &provider{
|
||||
settings: settings,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (p *provider) GetGroupLabels(orgID string, alert *types.Alert, route *dispatch.Route) model.LabelSet {
|
||||
// orgID is ignored for standard grouping as it uses route configuration
|
||||
groupLabels := model.LabelSet{}
|
||||
for ln, lv := range alert.Labels {
|
||||
if _, ok := route.RouteOpts.GroupBy[ln]; ok || route.RouteOpts.GroupByAll {
|
||||
groupLabels[ln] = lv
|
||||
}
|
||||
}
|
||||
return groupLabels
|
||||
}
|
||||
|
||||
func (p *provider) SetGroupLabels(orgID string, ruleID string, groupByLabels []string) error {
|
||||
// Standard grouping doesn't support dynamic label setting
|
||||
// Grouping is determined by the route configuration, not rule-specific settings
|
||||
return nil
|
||||
}
|
||||
@@ -14,6 +14,8 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
|
||||
"github.com/SigNoz/signoz/pkg/modules/user"
|
||||
"github.com/SigNoz/signoz/pkg/notificationgrouping"
|
||||
"github.com/SigNoz/signoz/pkg/notificationgrouping/notificationgroupingtest"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/model"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/utils"
|
||||
"github.com/SigNoz/signoz/pkg/sharder"
|
||||
@@ -35,7 +37,9 @@ func TestRegenerateConnectionUrlWithUpdatedConfig(t *testing.T) {
|
||||
sharder, err := noopsharder.New(context.TODO(), providerSettings, sharder.Config{})
|
||||
require.NoError(err)
|
||||
orgGetter := implorganization.NewGetter(implorganization.NewStore(sqlStore), sharder)
|
||||
alertmanager, err := signozalertmanager.New(context.TODO(), providerSettings, alertmanager.Config{Provider: "signoz", Signoz: alertmanager.Signoz{PollInterval: 10 * time.Second, Config: alertmanagerserver.NewConfig()}}, sqlStore, orgGetter)
|
||||
notificationGroups, err := notificationgroupingtest.New(context.TODO(), providerSettings, notificationgrouping.Config{})
|
||||
require.NoError(err)
|
||||
alertmanager, err := signozalertmanager.New(context.TODO(), providerSettings, alertmanager.Config{Provider: "signoz", Signoz: alertmanager.Signoz{PollInterval: 10 * time.Second, Config: alertmanagerserver.NewConfig()}}, sqlStore, orgGetter, notificationGroups)
|
||||
require.NoError(err)
|
||||
jwt := authtypes.NewJWT("", 1*time.Hour, 1*time.Hour)
|
||||
emailing := emailingtest.New()
|
||||
@@ -92,7 +96,9 @@ func TestAgentCheckIns(t *testing.T) {
|
||||
sharder, err := noopsharder.New(context.TODO(), providerSettings, sharder.Config{})
|
||||
require.NoError(err)
|
||||
orgGetter := implorganization.NewGetter(implorganization.NewStore(sqlStore), sharder)
|
||||
alertmanager, err := signozalertmanager.New(context.TODO(), providerSettings, alertmanager.Config{Provider: "signoz", Signoz: alertmanager.Signoz{PollInterval: 10 * time.Second, Config: alertmanagerserver.NewConfig()}}, sqlStore, orgGetter)
|
||||
notificationGroups, err := notificationgroupingtest.New(context.TODO(), providerSettings, notificationgrouping.Config{})
|
||||
require.NoError(err)
|
||||
alertmanager, err := signozalertmanager.New(context.TODO(), providerSettings, alertmanager.Config{Provider: "signoz", Signoz: alertmanager.Signoz{PollInterval: 10 * time.Second, Config: alertmanagerserver.NewConfig()}}, sqlStore, orgGetter, notificationGroups)
|
||||
require.NoError(err)
|
||||
jwt := authtypes.NewJWT("", 1*time.Hour, 1*time.Hour)
|
||||
emailing := emailingtest.New()
|
||||
@@ -188,7 +194,9 @@ func TestCantDisconnectNonExistentAccount(t *testing.T) {
|
||||
sharder, err := noopsharder.New(context.TODO(), providerSettings, sharder.Config{})
|
||||
require.NoError(err)
|
||||
orgGetter := implorganization.NewGetter(implorganization.NewStore(sqlStore), sharder)
|
||||
alertmanager, err := signozalertmanager.New(context.TODO(), providerSettings, alertmanager.Config{Provider: "signoz", Signoz: alertmanager.Signoz{PollInterval: 10 * time.Second, Config: alertmanagerserver.NewConfig()}}, sqlStore, orgGetter)
|
||||
notificationGroups, err := notificationgroupingtest.New(context.TODO(), providerSettings, notificationgrouping.Config{})
|
||||
require.NoError(err)
|
||||
alertmanager, err := signozalertmanager.New(context.TODO(), providerSettings, alertmanager.Config{Provider: "signoz", Signoz: alertmanager.Signoz{PollInterval: 10 * time.Second, Config: alertmanagerserver.NewConfig()}}, sqlStore, orgGetter, notificationGroups)
|
||||
require.NoError(err)
|
||||
jwt := authtypes.NewJWT("", 1*time.Hour, 1*time.Hour)
|
||||
emailing := emailingtest.New()
|
||||
@@ -216,7 +224,9 @@ func TestConfigureService(t *testing.T) {
|
||||
sharder, err := noopsharder.New(context.TODO(), providerSettings, sharder.Config{})
|
||||
require.NoError(err)
|
||||
orgGetter := implorganization.NewGetter(implorganization.NewStore(sqlStore), sharder)
|
||||
alertmanager, err := signozalertmanager.New(context.TODO(), providerSettings, alertmanager.Config{Provider: "signoz", Signoz: alertmanager.Signoz{PollInterval: 10 * time.Second, Config: alertmanagerserver.NewConfig()}}, sqlStore, orgGetter)
|
||||
notificationGroups, err := notificationgroupingtest.New(context.TODO(), providerSettings, notificationgrouping.Config{})
|
||||
require.NoError(err)
|
||||
alertmanager, err := signozalertmanager.New(context.TODO(), providerSettings, alertmanager.Config{Provider: "signoz", Signoz: alertmanager.Signoz{PollInterval: 10 * time.Second, Config: alertmanagerserver.NewConfig()}}, sqlStore, orgGetter, notificationGroups)
|
||||
require.NoError(err)
|
||||
jwt := authtypes.NewJWT("", 1*time.Hour, 1*time.Hour)
|
||||
emailing := emailingtest.New()
|
||||
|
||||
@@ -12,6 +12,8 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/emailing/emailingtest"
|
||||
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
|
||||
"github.com/SigNoz/signoz/pkg/notificationgrouping"
|
||||
"github.com/SigNoz/signoz/pkg/notificationgrouping/notificationgroupingtest"
|
||||
"github.com/SigNoz/signoz/pkg/sharder"
|
||||
"github.com/SigNoz/signoz/pkg/sharder/noopsharder"
|
||||
"github.com/SigNoz/signoz/pkg/signoz"
|
||||
@@ -29,7 +31,8 @@ func TestIntegrationLifecycle(t *testing.T) {
|
||||
providerSettings := instrumentationtest.New().ToProviderSettings()
|
||||
sharder, _ := noopsharder.New(context.TODO(), providerSettings, sharder.Config{})
|
||||
orgGetter := implorganization.NewGetter(implorganization.NewStore(store), sharder)
|
||||
alertmanager, _ := signozalertmanager.New(context.TODO(), providerSettings, alertmanager.Config{Provider: "signoz", Signoz: alertmanager.Signoz{PollInterval: 10 * time.Second, Config: alertmanagerserver.NewConfig()}}, store, orgGetter)
|
||||
notificationGroups, _ := notificationgroupingtest.New(context.TODO(), providerSettings, notificationgrouping.Config{})
|
||||
alertmanager, _ := signozalertmanager.New(context.TODO(), providerSettings, alertmanager.Config{Provider: "signoz", Signoz: alertmanager.Signoz{PollInterval: 10 * time.Second, Config: alertmanagerserver.NewConfig()}}, store, orgGetter, notificationGroups)
|
||||
jwt := authtypes.NewJWT("", 1*time.Hour, 1*time.Hour)
|
||||
emailing := emailingtest.New()
|
||||
analytics := analyticstest.New()
|
||||
|
||||
@@ -15,6 +15,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/http/middleware"
|
||||
"github.com/SigNoz/signoz/pkg/licensing/nooplicensing"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization"
|
||||
"github.com/SigNoz/signoz/pkg/notificationgrouping"
|
||||
"github.com/SigNoz/signoz/pkg/prometheus"
|
||||
"github.com/SigNoz/signoz/pkg/querier"
|
||||
querierAPI "github.com/SigNoz/signoz/pkg/querier"
|
||||
@@ -95,6 +96,7 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz, jwt *authtypes.JWT)
|
||||
signoz.Modules.OrgGetter,
|
||||
signoz.Querier,
|
||||
signoz.Instrumentation.Logger(),
|
||||
signoz.NotificationGroups,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -389,21 +391,23 @@ func makeRulesManager(
|
||||
orgGetter organization.Getter,
|
||||
querier querier.Querier,
|
||||
logger *slog.Logger,
|
||||
groups notificationgrouping.NotificationGroups,
|
||||
) (*rules.Manager, error) {
|
||||
// create manager opts
|
||||
managerOpts := &rules.ManagerOptions{
|
||||
TelemetryStore: telemetryStore,
|
||||
Prometheus: prometheus,
|
||||
Context: context.Background(),
|
||||
Logger: zap.L(),
|
||||
Reader: ch,
|
||||
Querier: querier,
|
||||
SLogger: logger,
|
||||
Cache: cache,
|
||||
EvalDelay: constants.GetEvalDelay(),
|
||||
SQLStore: sqlstore,
|
||||
OrgGetter: orgGetter,
|
||||
Alertmanager: alertmanager,
|
||||
TelemetryStore: telemetryStore,
|
||||
Prometheus: prometheus,
|
||||
Context: context.Background(),
|
||||
Logger: zap.L(),
|
||||
Reader: ch,
|
||||
Querier: querier,
|
||||
SLogger: logger,
|
||||
Cache: cache,
|
||||
EvalDelay: constants.GetEvalDelay(),
|
||||
SQLStore: sqlstore,
|
||||
OrgGetter: orgGetter,
|
||||
Alertmanager: alertmanager,
|
||||
NotificationGroups: groups,
|
||||
}
|
||||
|
||||
// create Manager
|
||||
|
||||
@@ -84,7 +84,8 @@ type BaseRule struct {
|
||||
// should be fast but we can still avoid the query if we have the data in memory
|
||||
TemporalityMap map[string]map[v3.Temporality]bool
|
||||
|
||||
sqlstore sqlstore.SQLStore
|
||||
sqlstore sqlstore.SQLStore
|
||||
NotificationGroupBy []string
|
||||
}
|
||||
|
||||
type RuleOption func(*BaseRule)
|
||||
@@ -125,20 +126,21 @@ func NewBaseRule(id string, orgID valuer.UUID, p *ruletypes.PostableRule, reader
|
||||
}
|
||||
|
||||
baseRule := &BaseRule{
|
||||
id: id,
|
||||
orgID: orgID,
|
||||
name: p.AlertName,
|
||||
source: p.Source,
|
||||
typ: p.AlertType,
|
||||
ruleCondition: p.RuleCondition,
|
||||
evalWindow: time.Duration(p.EvalWindow),
|
||||
labels: qslabels.FromMap(p.Labels),
|
||||
annotations: qslabels.FromMap(p.Annotations),
|
||||
preferredChannels: p.PreferredChannels,
|
||||
health: ruletypes.HealthUnknown,
|
||||
Active: map[uint64]*ruletypes.Alert{},
|
||||
reader: reader,
|
||||
TemporalityMap: make(map[string]map[v3.Temporality]bool),
|
||||
id: id,
|
||||
orgID: orgID,
|
||||
name: p.AlertName,
|
||||
source: p.Source,
|
||||
typ: p.AlertType,
|
||||
ruleCondition: p.RuleCondition,
|
||||
evalWindow: time.Duration(p.EvalWindow),
|
||||
labels: qslabels.FromMap(p.Labels),
|
||||
annotations: qslabels.FromMap(p.Annotations),
|
||||
preferredChannels: p.PreferredChannels,
|
||||
health: ruletypes.HealthUnknown,
|
||||
Active: map[uint64]*ruletypes.Alert{},
|
||||
reader: reader,
|
||||
TemporalityMap: make(map[string]map[v3.Temporality]bool),
|
||||
NotificationGroupBy: p.NotificationGroups,
|
||||
}
|
||||
|
||||
if baseRule.evalWindow == 0 {
|
||||
|
||||
@@ -19,6 +19,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/alertmanager"
|
||||
"github.com/SigNoz/signoz/pkg/cache"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization"
|
||||
"github.com/SigNoz/signoz/pkg/notificationgrouping"
|
||||
"github.com/SigNoz/signoz/pkg/prometheus"
|
||||
querierV5 "github.com/SigNoz/signoz/pkg/querier"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
|
||||
@@ -101,6 +102,7 @@ type ManagerOptions struct {
|
||||
Alertmanager alertmanager.Alertmanager
|
||||
SQLStore sqlstore.SQLStore
|
||||
OrgGetter organization.Getter
|
||||
NotificationGroups notificationgrouping.NotificationGroups
|
||||
}
|
||||
|
||||
// The Manager manages recording and alerting rules.
|
||||
@@ -120,9 +122,10 @@ type Manager struct {
|
||||
prepareTaskFunc func(opts PrepareTaskOptions) (Task, error)
|
||||
prepareTestRuleFunc func(opts PrepareTestRuleOptions) (int, *model.ApiError)
|
||||
|
||||
alertmanager alertmanager.Alertmanager
|
||||
sqlstore sqlstore.SQLStore
|
||||
orgGetter organization.Getter
|
||||
alertmanager alertmanager.Alertmanager
|
||||
sqlstore sqlstore.SQLStore
|
||||
orgGetter organization.Getter
|
||||
NotificationGroup notificationgrouping.NotificationGroups
|
||||
}
|
||||
|
||||
func defaultOptions(o *ManagerOptions) *ManagerOptions {
|
||||
@@ -202,6 +205,13 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
|
||||
// by calling the Run method.
|
||||
func NewManager(o *ManagerOptions) (*Manager, error) {
|
||||
o = defaultOptions(o)
|
||||
|
||||
// Validate that NotificationGroups is properly initialized
|
||||
if o.NotificationGroups == nil {
|
||||
zap.L().Error("NotificationGroups is nil during Manager creation, this will cause panics")
|
||||
return nil, fmt.Errorf("NotificationGroups is required but was nil")
|
||||
}
|
||||
|
||||
ruleStore := sqlrulestore.NewRuleStore(o.SQLStore)
|
||||
maintenanceStore := sqlrulestore.NewMaintenanceStore(o.SQLStore)
|
||||
|
||||
@@ -220,8 +230,16 @@ func NewManager(o *ManagerOptions) (*Manager, error) {
|
||||
alertmanager: o.Alertmanager,
|
||||
sqlstore: o.SQLStore,
|
||||
orgGetter: o.OrgGetter,
|
||||
NotificationGroup: o.NotificationGroups,
|
||||
}
|
||||
|
||||
// Double-check that NotificationGroup was properly assigned
|
||||
if m.NotificationGroup == nil {
|
||||
zap.L().Error("NotificationGroup is nil after assignment, this should not happen")
|
||||
return nil, fmt.Errorf("NotificationGroup assignment failed")
|
||||
}
|
||||
|
||||
zap.L().Debug("Manager created successfully with NotificationGroup")
|
||||
return m, nil
|
||||
}
|
||||
|
||||
@@ -267,7 +285,6 @@ func (m *Manager) initiate(ctx context.Context) error {
|
||||
for _, rec := range storedRules {
|
||||
taskName := fmt.Sprintf("%s-groupname", rec.ID.StringValue())
|
||||
parsedRule, err := ruletypes.ParsePostableRule([]byte(rec.Data))
|
||||
|
||||
if err != nil {
|
||||
if errors.Is(err, ruletypes.ErrFailedToParseJSON) {
|
||||
zap.L().Info("failed to load rule in json format, trying yaml now:", zap.String("name", taskName))
|
||||
@@ -289,6 +306,12 @@ func (m *Manager) initiate(ctx context.Context) error {
|
||||
}
|
||||
}
|
||||
if !parsedRule.Disabled {
|
||||
if parsedRule.NotificationGroups != nil {
|
||||
err = m.NotificationGroup.SetGroupLabels(org.ID.StringValue(), rec.ID.StringValue(), parsedRule.NotificationGroups)
|
||||
if err != nil {
|
||||
zap.L().Error("failed to load the notification group definition", zap.String("name", rec.ID.StringValue()), zap.Error(err))
|
||||
}
|
||||
}
|
||||
err := m.addTask(ctx, org.ID, parsedRule, taskName)
|
||||
if err != nil {
|
||||
zap.L().Error("failed to load the rule definition", zap.String("name", taskName), zap.Error(err))
|
||||
@@ -370,6 +393,13 @@ func (m *Manager) EditRule(ctx context.Context, ruleStr string, id valuer.UUID)
|
||||
preferredChannels = parsedRule.PreferredChannels
|
||||
}
|
||||
|
||||
if parsedRule.NotificationGroups != nil {
|
||||
err = m.NotificationGroup.SetGroupLabels(claims.OrgID, id.StringValue(), parsedRule.NotificationGroups)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
err = cfg.UpdateRuleIDMatcher(id.StringValue(), preferredChannels)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -379,7 +409,6 @@ func (m *Manager) EditRule(ctx context.Context, ruleStr string, id valuer.UUID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = m.syncRuleStateWithTask(ctx, orgID, prepareTaskName(existingRule.ID.StringValue()), parsedRule)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -567,7 +596,13 @@ func (m *Manager) CreateRule(ctx context.Context, ruleStr string) (*ruletypes.Ge
|
||||
}
|
||||
|
||||
taskName := prepareTaskName(id.StringValue())
|
||||
if err := m.addTask(ctx, orgID, parsedRule, taskName); err != nil {
|
||||
if parsedRule.NotificationGroups != nil {
|
||||
err = m.NotificationGroup.SetGroupLabels(claims.OrgID, id.StringValue(), parsedRule.NotificationGroups)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if err = m.addTask(ctx, orgID, parsedRule, taskName); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
@@ -16,6 +16,8 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/alertmanager/signozalertmanager"
|
||||
"github.com/SigNoz/signoz/pkg/analytics/analyticstest"
|
||||
"github.com/SigNoz/signoz/pkg/emailing/emailingtest"
|
||||
"github.com/SigNoz/signoz/pkg/notificationgrouping"
|
||||
"github.com/SigNoz/signoz/pkg/notificationgrouping/notificationgroupingtest"
|
||||
"github.com/SigNoz/signoz/pkg/sharder"
|
||||
"github.com/SigNoz/signoz/pkg/sharder/noopsharder"
|
||||
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
||||
@@ -312,7 +314,11 @@ func NewFilterSuggestionsTestBed(t *testing.T) *FilterSuggestionsTestBed {
|
||||
sharder, err := noopsharder.New(context.TODO(), providerSettings, sharder.Config{})
|
||||
require.NoError(t, err)
|
||||
orgGetter := implorganization.NewGetter(implorganization.NewStore(testDB), sharder)
|
||||
alertmanager, err := signozalertmanager.New(context.TODO(), providerSettings, alertmanager.Config{Signoz: alertmanager.Signoz{PollInterval: 10 * time.Second, Config: alertmanagerserver.NewConfig()}}, testDB, orgGetter)
|
||||
notificationGroups, err := notificationgroupingtest.New(context.TODO(), providerSettings, notificationgrouping.Config{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
alertmanager, err := signozalertmanager.New(context.TODO(), providerSettings, alertmanager.Config{Signoz: alertmanager.Signoz{PollInterval: 10 * time.Second, Config: alertmanagerserver.NewConfig()}}, testDB, orgGetter, notificationGroups)
|
||||
require.NoError(t, err)
|
||||
jwt := authtypes.NewJWT("", 1*time.Hour, 1*time.Hour)
|
||||
emailing := emailingtest.New()
|
||||
|
||||
@@ -17,6 +17,8 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/alertmanager/signozalertmanager"
|
||||
"github.com/SigNoz/signoz/pkg/analytics/analyticstest"
|
||||
"github.com/SigNoz/signoz/pkg/emailing/emailingtest"
|
||||
"github.com/SigNoz/signoz/pkg/notificationgrouping"
|
||||
"github.com/SigNoz/signoz/pkg/notificationgrouping/notificationgroupingtest"
|
||||
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
|
||||
"github.com/SigNoz/signoz/pkg/modules/user"
|
||||
@@ -492,7 +494,11 @@ func NewTestbedWithoutOpamp(t *testing.T, sqlStore sqlstore.SQLStore) *LogPipeli
|
||||
sharder, err := noopsharder.New(context.Background(), providerSettings, sharder.Config{})
|
||||
require.NoError(t, err)
|
||||
orgGetter := implorganization.NewGetter(implorganization.NewStore(sqlStore), sharder)
|
||||
alertmanager, err := signozalertmanager.New(context.Background(), providerSettings, alertmanager.Config{Signoz: alertmanager.Signoz{PollInterval: 10 * time.Second, Config: alertmanagerserver.NewConfig()}}, sqlStore, orgGetter)
|
||||
notificationGroups, err := notificationgroupingtest.New(context.Background(), providerSettings, notificationgrouping.Config{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
alertmanager, err := signozalertmanager.New(context.Background(), providerSettings, alertmanager.Config{Signoz: alertmanager.Signoz{PollInterval: 10 * time.Second, Config: alertmanagerserver.NewConfig()}}, sqlStore, orgGetter, notificationGroups)
|
||||
require.NoError(t, err)
|
||||
jwt := authtypes.NewJWT("", 1*time.Hour, 1*time.Hour)
|
||||
emailing := emailingtest.New()
|
||||
|
||||
@@ -14,6 +14,8 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/alertmanager/signozalertmanager"
|
||||
"github.com/SigNoz/signoz/pkg/analytics/analyticstest"
|
||||
"github.com/SigNoz/signoz/pkg/emailing/emailingtest"
|
||||
"github.com/SigNoz/signoz/pkg/notificationgrouping"
|
||||
"github.com/SigNoz/signoz/pkg/notificationgrouping/notificationgroupingtest"
|
||||
"github.com/SigNoz/signoz/pkg/sharder"
|
||||
"github.com/SigNoz/signoz/pkg/sharder/noopsharder"
|
||||
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
||||
@@ -373,7 +375,11 @@ func NewCloudIntegrationsTestBed(t *testing.T, testDB sqlstore.SQLStore) *CloudI
|
||||
sharder, err := noopsharder.New(context.TODO(), providerSettings, sharder.Config{})
|
||||
require.NoError(t, err)
|
||||
orgGetter := implorganization.NewGetter(implorganization.NewStore(testDB), sharder)
|
||||
alertmanager, err := signozalertmanager.New(context.TODO(), providerSettings, alertmanager.Config{Signoz: alertmanager.Signoz{PollInterval: 10 * time.Second, Config: alertmanagerserver.NewConfig()}}, testDB, orgGetter)
|
||||
notificationGroups, err := notificationgroupingtest.New(context.TODO(), providerSettings, notificationgrouping.Config{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
alertmanager, err := signozalertmanager.New(context.TODO(), providerSettings, alertmanager.Config{Signoz: alertmanager.Signoz{PollInterval: 10 * time.Second, Config: alertmanagerserver.NewConfig()}}, testDB, orgGetter, notificationGroups)
|
||||
require.NoError(t, err)
|
||||
jwt := authtypes.NewJWT("", 1*time.Hour, 1*time.Hour)
|
||||
emailing := emailingtest.New()
|
||||
|
||||
@@ -14,6 +14,8 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/alertmanager/signozalertmanager"
|
||||
"github.com/SigNoz/signoz/pkg/analytics/analyticstest"
|
||||
"github.com/SigNoz/signoz/pkg/emailing/emailingtest"
|
||||
"github.com/SigNoz/signoz/pkg/notificationgrouping"
|
||||
"github.com/SigNoz/signoz/pkg/notificationgrouping/notificationgroupingtest"
|
||||
"github.com/SigNoz/signoz/pkg/http/middleware"
|
||||
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
|
||||
@@ -588,7 +590,11 @@ func NewIntegrationsTestBed(t *testing.T, testDB sqlstore.SQLStore) *Integration
|
||||
sharder, err := noopsharder.New(context.TODO(), providerSettings, sharder.Config{})
|
||||
require.NoError(t, err)
|
||||
orgGetter := implorganization.NewGetter(implorganization.NewStore(testDB), sharder)
|
||||
alertmanager, err := signozalertmanager.New(context.TODO(), providerSettings, alertmanager.Config{Signoz: alertmanager.Signoz{PollInterval: 10 * time.Second, Config: alertmanagerserver.NewConfig()}}, testDB, orgGetter)
|
||||
notificationGroups, err := notificationgroupingtest.New(context.TODO(), providerSettings, notificationgrouping.Config{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
alertmanager, err := signozalertmanager.New(context.TODO(), providerSettings, alertmanager.Config{Signoz: alertmanager.Signoz{PollInterval: 10 * time.Second, Config: alertmanagerserver.NewConfig()}}, testDB, orgGetter, notificationGroups)
|
||||
require.NoError(t, err)
|
||||
jwt := authtypes.NewJWT("", 1*time.Hour, 1*time.Hour)
|
||||
emailing := emailingtest.New()
|
||||
|
||||
@@ -10,6 +10,8 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/alertmanager"
|
||||
"github.com/SigNoz/signoz/pkg/alertmanager/signozalertmanager"
|
||||
"github.com/SigNoz/signoz/pkg/emailing/emailingtest"
|
||||
"github.com/SigNoz/signoz/pkg/notificationgrouping"
|
||||
"github.com/SigNoz/signoz/pkg/notificationgrouping/notificationgroupingtest"
|
||||
"github.com/SigNoz/signoz/pkg/factory/factorytest"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
|
||||
"github.com/SigNoz/signoz/pkg/sharder"
|
||||
@@ -29,7 +31,9 @@ func TestNewHandlers(t *testing.T) {
|
||||
sharder, err := noopsharder.New(context.TODO(), providerSettings, sharder.Config{})
|
||||
require.NoError(t, err)
|
||||
orgGetter := implorganization.NewGetter(implorganization.NewStore(sqlstore), sharder)
|
||||
alertmanager, err := signozalertmanager.New(context.TODO(), providerSettings, alertmanager.Config{}, sqlstore, orgGetter)
|
||||
notificationGroups, err := notificationgroupingtest.New(context.TODO(), providerSettings, notificationgrouping.Config{})
|
||||
require.NoError(t, err)
|
||||
alertmanager, err := signozalertmanager.New(context.TODO(), providerSettings, alertmanager.Config{}, sqlstore, orgGetter, notificationGroups)
|
||||
require.NoError(t, err)
|
||||
jwt := authtypes.NewJWT("", 1*time.Hour, 1*time.Hour)
|
||||
emailing := emailingtest.New()
|
||||
|
||||
@@ -10,6 +10,8 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/alertmanager"
|
||||
"github.com/SigNoz/signoz/pkg/alertmanager/signozalertmanager"
|
||||
"github.com/SigNoz/signoz/pkg/emailing/emailingtest"
|
||||
"github.com/SigNoz/signoz/pkg/notificationgrouping"
|
||||
"github.com/SigNoz/signoz/pkg/notificationgrouping/notificationgroupingtest"
|
||||
"github.com/SigNoz/signoz/pkg/factory/factorytest"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
|
||||
"github.com/SigNoz/signoz/pkg/sharder"
|
||||
@@ -29,7 +31,9 @@ func TestNewModules(t *testing.T) {
|
||||
sharder, err := noopsharder.New(context.TODO(), providerSettings, sharder.Config{})
|
||||
require.NoError(t, err)
|
||||
orgGetter := implorganization.NewGetter(implorganization.NewStore(sqlstore), sharder)
|
||||
alertmanager, err := signozalertmanager.New(context.TODO(), providerSettings, alertmanager.Config{}, sqlstore, orgGetter)
|
||||
notificationGroups, err := notificationgroupingtest.New(context.TODO(), providerSettings, notificationgrouping.Config{})
|
||||
require.NoError(t, err)
|
||||
alertmanager, err := signozalertmanager.New(context.TODO(), providerSettings, alertmanager.Config{}, sqlstore, orgGetter, notificationGroups)
|
||||
require.NoError(t, err)
|
||||
jwt := authtypes.NewJWT("", 1*time.Hour, 1*time.Hour)
|
||||
emailing := emailingtest.New()
|
||||
|
||||
@@ -16,6 +16,10 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization"
|
||||
"github.com/SigNoz/signoz/pkg/modules/user"
|
||||
"github.com/SigNoz/signoz/pkg/notificationgrouping"
|
||||
"github.com/SigNoz/signoz/pkg/notificationgrouping/notificationgroupingtest"
|
||||
"github.com/SigNoz/signoz/pkg/notificationgrouping/rulebasedgrouping"
|
||||
"github.com/SigNoz/signoz/pkg/notificationgrouping/standardgrouping"
|
||||
"github.com/SigNoz/signoz/pkg/prometheus"
|
||||
"github.com/SigNoz/signoz/pkg/prometheus/clickhouseprometheus"
|
||||
"github.com/SigNoz/signoz/pkg/querier"
|
||||
@@ -153,10 +157,18 @@ func NewPrometheusProviderFactories(telemetryStore telemetrystore.TelemetryStore
|
||||
)
|
||||
}
|
||||
|
||||
func NewAlertmanagerProviderFactories(sqlstore sqlstore.SQLStore, orgGetter organization.Getter) factory.NamedMap[factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config]] {
|
||||
// NewNotificationGroupingProviderFactories returns the named map of
// notification grouping provider factories. It registers three factories:
// rulebasedgrouping, standardgrouping and notificationgroupingtest.
//
// NOTE(review): notificationgroupingtest.NewFactory() is registered alongside
// the non-test factories — confirm the test provider is meant to be selectable
// from this map.
func NewNotificationGroupingProviderFactories() factory.NamedMap[factory.ProviderFactory[notificationgrouping.NotificationGroups, notificationgrouping.Config]] {
|
||||
return factory.MustNewNamedMap(
|
||||
rulebasedgrouping.NewFactory(),
|
||||
standardgrouping.NewFactory(),
|
||||
notificationgroupingtest.NewFactory(),
|
||||
)
|
||||
}
|
||||
|
||||
// NewAlertmanagerProviderFactories returns the named map of alertmanager
// provider factories. The legacy factory is built from sqlstore and orgGetter;
// the signoz factory additionally receives notificationGroups.
func NewAlertmanagerProviderFactories(sqlstore sqlstore.SQLStore, orgGetter organization.Getter, notificationGroups notificationgrouping.NotificationGroups) factory.NamedMap[factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config]] {
|
||||
return factory.MustNewNamedMap(
|
||||
legacyalertmanager.NewFactory(sqlstore, orgGetter),
|
||||
// NOTE(review): this text is a rendered diff without +/- markers. The
// two-argument signozalertmanager.NewFactory call below looks like the removed
// pre-change line and the three-argument call after it its replacement —
// confirm against the actual file.
signozalertmanager.NewFactory(sqlstore, orgGetter),
|
||||
signozalertmanager.NewFactory(sqlstore, orgGetter, notificationGroups),
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
@@ -1,12 +1,15 @@
|
||||
package signoz
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
"github.com/SigNoz/signoz/pkg/analytics"
|
||||
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
|
||||
"github.com/SigNoz/signoz/pkg/notificationgrouping"
|
||||
"github.com/SigNoz/signoz/pkg/notificationgrouping/notificationgroupingtest"
|
||||
"github.com/SigNoz/signoz/pkg/modules/user/impluser"
|
||||
"github.com/SigNoz/signoz/pkg/sqlschema"
|
||||
"github.com/SigNoz/signoz/pkg/sqlschema/sqlschematest"
|
||||
@@ -54,7 +57,8 @@ func TestNewProviderFactories(t *testing.T) {
|
||||
|
||||
assert.NotPanics(t, func() {
|
||||
orgGetter := implorganization.NewGetter(implorganization.NewStore(sqlstoretest.New(sqlstore.Config{Provider: "sqlite"}, sqlmock.QueryMatcherEqual)), nil)
|
||||
NewAlertmanagerProviderFactories(sqlstoretest.New(sqlstore.Config{Provider: "sqlite"}, sqlmock.QueryMatcherEqual), orgGetter)
|
||||
notificationGroups, _ := notificationgroupingtest.New(context.TODO(), instrumentationtest.New().ToProviderSettings(), notificationgrouping.Config{})
|
||||
NewAlertmanagerProviderFactories(sqlstoretest.New(sqlstore.Config{Provider: "sqlite"}, sqlmock.QueryMatcherEqual), orgGetter, notificationGroups)
|
||||
})
|
||||
|
||||
assert.NotPanics(t, func() {
|
||||
|
||||
@@ -2,7 +2,6 @@ package signoz
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/alertmanager"
|
||||
"github.com/SigNoz/signoz/pkg/analytics"
|
||||
"github.com/SigNoz/signoz/pkg/cache"
|
||||
@@ -13,6 +12,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
|
||||
"github.com/SigNoz/signoz/pkg/modules/user/impluser"
|
||||
"github.com/SigNoz/signoz/pkg/notificationgrouping"
|
||||
"github.com/SigNoz/signoz/pkg/prometheus"
|
||||
"github.com/SigNoz/signoz/pkg/querier"
|
||||
"github.com/SigNoz/signoz/pkg/ruler"
|
||||
@@ -32,23 +32,24 @@ import (
|
||||
|
||||
// SigNoz groups the top-level components (instrumentation, analytics, cache,
// web, stores, prometheus, alertmanager, querier, rules, licensing, emailing,
// sharder, stats reporter, modules, handlers and notification groups) in a
// single struct and embeds *factory.Registry.
//
// NOTE(review): this text is a rendered diff without +/- markers, so the field
// list appears twice. The first list looks like the pre-change fields and the
// second like the re-aligned post-change fields with NotificationGroups added
// — confirm against the actual file.
type SigNoz struct {
|
||||
*factory.Registry
|
||||
Instrumentation instrumentation.Instrumentation
|
||||
Analytics analytics.Analytics
|
||||
Cache cache.Cache
|
||||
Web web.Web
|
||||
SQLStore sqlstore.SQLStore
|
||||
TelemetryStore telemetrystore.TelemetryStore
|
||||
Prometheus prometheus.Prometheus
|
||||
Alertmanager alertmanager.Alertmanager
|
||||
Querier querier.Querier
|
||||
Rules ruler.Ruler
|
||||
Zeus zeus.Zeus
|
||||
Licensing licensing.Licensing
|
||||
Emailing emailing.Emailing
|
||||
Sharder sharder.Sharder
|
||||
StatsReporter statsreporter.StatsReporter
|
||||
Modules Modules
|
||||
Handlers Handlers
|
||||
Instrumentation instrumentation.Instrumentation
|
||||
Analytics analytics.Analytics
|
||||
Cache cache.Cache
|
||||
Web web.Web
|
||||
SQLStore sqlstore.SQLStore
|
||||
TelemetryStore telemetrystore.TelemetryStore
|
||||
Prometheus prometheus.Prometheus
|
||||
Alertmanager alertmanager.Alertmanager
|
||||
Querier querier.Querier
|
||||
Rules ruler.Ruler
|
||||
Zeus zeus.Zeus
|
||||
Licensing licensing.Licensing
|
||||
Emailing emailing.Emailing
|
||||
Sharder sharder.Sharder
|
||||
StatsReporter statsreporter.StatsReporter
|
||||
Modules Modules
|
||||
Handlers Handlers
|
||||
// NotificationGroups is set by New to the same notificationGroups provider
// instance that New passes to NewAlertmanagerProviderFactories.
NotificationGroups notificationgrouping.NotificationGroups
|
||||
}
|
||||
|
||||
func New(
|
||||
@@ -230,12 +231,27 @@ func New(
|
||||
// Initialize user getter
|
||||
userGetter := impluser.NewGetter(impluser.NewStore(sqlstore, providerSettings))
|
||||
|
||||
// shared NotificationGroups instance for both alertmanager and rules
|
||||
notificationGroups, err := factory.NewProviderFromNamedMap(
|
||||
ctx,
|
||||
providerSettings,
|
||||
notificationgrouping.Config{
|
||||
Provider: "rulebased",
|
||||
DefaultStrategy: "standard",
|
||||
},
|
||||
NewNotificationGroupingProviderFactories(),
|
||||
"rulebased",
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Initialize alertmanager from the available alertmanager provider factories
|
||||
alertmanager, err := factory.NewProviderFromNamedMap(
|
||||
ctx,
|
||||
providerSettings,
|
||||
config.Alertmanager,
|
||||
NewAlertmanagerProviderFactories(sqlstore, orgGetter),
|
||||
NewAlertmanagerProviderFactories(sqlstore, orgGetter, notificationGroups),
|
||||
config.Alertmanager.Provider,
|
||||
)
|
||||
if err != nil {
|
||||
@@ -305,21 +321,23 @@ func New(
|
||||
}
|
||||
|
||||
return &SigNoz{
|
||||
Registry: registry,
|
||||
Analytics: analytics,
|
||||
Instrumentation: instrumentation,
|
||||
Cache: cache,
|
||||
Web: web,
|
||||
SQLStore: sqlstore,
|
||||
TelemetryStore: telemetrystore,
|
||||
Prometheus: prometheus,
|
||||
Alertmanager: alertmanager,
|
||||
Querier: querier,
|
||||
Zeus: zeus,
|
||||
Licensing: licensing,
|
||||
Emailing: emailing,
|
||||
Sharder: sharder,
|
||||
Modules: modules,
|
||||
Handlers: handlers,
|
||||
Registry: registry,
|
||||
Analytics: analytics,
|
||||
Instrumentation: instrumentation,
|
||||
Cache: cache,
|
||||
Web: web,
|
||||
SQLStore: sqlstore,
|
||||
TelemetryStore: telemetrystore,
|
||||
Prometheus: prometheus,
|
||||
Alertmanager: alertmanager,
|
||||
Querier: querier,
|
||||
Rules: ruler,
|
||||
Zeus: zeus,
|
||||
Licensing: licensing,
|
||||
Emailing: emailing,
|
||||
Sharder: sharder,
|
||||
Modules: modules,
|
||||
Handlers: handlers,
|
||||
NotificationGroups: notificationGroups,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -67,6 +67,8 @@ type PostableRule struct {
|
||||
// legacy
|
||||
Expr string `yaml:"expr,omitempty" json:"expr,omitempty"`
|
||||
OldYaml string `json:"yaml,omitempty"`
|
||||
|
||||
NotificationGroups []string `yaml:"notificationGroups,omitempty" json:"notificationGroups,omitempty"`
|
||||
}
|
||||
|
||||
func ParsePostableRule(content []byte) (*PostableRule, error) {
|
||||
|
||||
Reference in New Issue
Block a user