Compare commits

...

14 Commits

28 changed files with 1711 additions and 140 deletions

View File

@@ -3,6 +3,7 @@ package app
import (
"context"
"fmt"
"github.com/SigNoz/signoz/pkg/notificationgrouping"
"log/slog"
"net"
"net/http"
@@ -108,6 +109,7 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz, jwt *authtypes.JWT)
signoz.Modules.OrgGetter,
signoz.Querier,
signoz.Instrumentation.Logger(),
signoz.NotificationGroups,
)
if err != nil {
@@ -417,17 +419,7 @@ func (s *Server) Stop(ctx context.Context) error {
return nil
}
func makeRulesManager(
ch baseint.Reader,
cache cache.Cache,
alertmanager alertmanager.Alertmanager,
sqlstore sqlstore.SQLStore,
telemetryStore telemetrystore.TelemetryStore,
prometheus prometheus.Prometheus,
orgGetter organization.Getter,
querier querier.Querier,
logger *slog.Logger,
) (*baserules.Manager, error) {
func makeRulesManager(ch baseint.Reader, cache cache.Cache, alertmanager alertmanager.Alertmanager, sqlstore sqlstore.SQLStore, telemetryStore telemetrystore.TelemetryStore, prometheus prometheus.Prometheus, orgGetter organization.Getter, querier querier.Querier, logger *slog.Logger, groups notificationgrouping.NotificationGroups) (*baserules.Manager, error) {
// create manager opts
managerOpts := &baserules.ManagerOptions{
TelemetryStore: telemetryStore,
@@ -444,6 +436,7 @@ func makeRulesManager(
Alertmanager: alertmanager,
SQLStore: sqlstore,
OrgGetter: orgGetter,
NotificationGroups: groups,
}
// create Manager

View File

@@ -0,0 +1,566 @@
// Copyright 2018 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package dispatch contains SigNoz's custom dispatcher implementation
// based on Prometheus Alertmanager dispatcher
package dispatch
import (
"context"
"errors"
"fmt"
"log/slog"
"sort"
"sync"
"time"
"github.com/SigNoz/signoz/pkg/notificationgrouping"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/alertmanager/dispatch"
"github.com/prometheus/alertmanager/notify"
"github.com/prometheus/alertmanager/provider"
"github.com/prometheus/alertmanager/store"
"github.com/prometheus/alertmanager/types"
)
// DispatcherMetrics represents metrics associated to a dispatcher.
type DispatcherMetrics struct {
aggrGroups prometheus.Gauge
processingDuration prometheus.Summary
aggrGroupLimitReached prometheus.Counter
}
// NewDispatcherMetrics returns a new registered DispatchMetrics.
func NewDispatcherMetrics(registerLimitMetrics bool, r prometheus.Registerer) *DispatcherMetrics {
m := DispatcherMetrics{
aggrGroups: prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "signoz_alertmanager_dispatcher_aggregation_groups",
Help: "Number of active aggregation groups",
},
),
processingDuration: prometheus.NewSummary(
prometheus.SummaryOpts{
Name: "signoz_alertmanager_dispatcher_alert_processing_duration_seconds",
Help: "Summary of latencies for the processing of alerts.",
},
),
aggrGroupLimitReached: prometheus.NewCounter(
prometheus.CounterOpts{
Name: "signoz_alertmanager_dispatcher_aggregation_group_limit_reached_total",
Help: "Number of times when dispatcher failed to create new aggregation group due to limit.",
},
),
}
if r != nil {
r.MustRegister(m.aggrGroups, m.processingDuration)
if registerLimitMetrics {
r.MustRegister(m.aggrGroupLimitReached)
}
}
return &m
}
// Dispatcher sorts incoming alerts into aggregation groups and
// assigns the correct notifiers to each.
type Dispatcher struct {
route *dispatch.Route
alerts provider.Alerts
stage notify.Stage
marker types.GroupMarker
metrics *DispatcherMetrics
limits Limits
timeout func(time.Duration) time.Duration
mtx sync.RWMutex
aggrGroupsPerRoute map[*dispatch.Route]map[model.Fingerprint]*aggrGroup
aggrGroupsNum int
done chan struct{}
ctx context.Context
cancel func()
logger *slog.Logger
notificationGroups notificationgrouping.NotificationGroups
orgID string
}
// We use the upstream Limits interface from Prometheus
type Limits = dispatch.Limits
// NewDispatcher returns a new Dispatcher.
func NewDispatcher(
ap provider.Alerts,
r *dispatch.Route,
s notify.Stage,
mk types.GroupMarker,
to func(time.Duration) time.Duration,
lim Limits,
l *slog.Logger,
m *DispatcherMetrics,
n notificationgrouping.NotificationGroups,
orgID string,
) *Dispatcher {
if lim == nil {
// Use a simple implementation when no limits are provided
lim = &unlimitedLimits{}
}
disp := &Dispatcher{
alerts: ap,
stage: s,
route: r,
marker: mk,
timeout: to,
logger: l.With("component", "signoz-dispatcher"),
metrics: m,
limits: lim,
notificationGroups: n,
orgID: orgID,
}
return disp
}
// Run starts dispatching alerts incoming via the updates channel.
func (d *Dispatcher) Run() {
d.done = make(chan struct{})
d.mtx.Lock()
d.aggrGroupsPerRoute = map[*dispatch.Route]map[model.Fingerprint]*aggrGroup{}
d.aggrGroupsNum = 0
d.metrics.aggrGroups.Set(0)
d.ctx, d.cancel = context.WithCancel(context.Background())
d.mtx.Unlock()
d.run(d.alerts.Subscribe())
close(d.done)
}
func (d *Dispatcher) run(it provider.AlertIterator) {
maintenance := time.NewTicker(30 * time.Second)
defer maintenance.Stop()
defer it.Close()
for {
select {
case alert, ok := <-it.Next():
if !ok {
// Iterator exhausted for some reason.
if err := it.Err(); err != nil {
d.logger.ErrorContext(d.ctx, "Error on alert update", "err", err)
}
return
}
d.logger.DebugContext(d.ctx, "SigNoz Custom Dispatcher: Received alert", "alert", alert)
// Log errors but keep trying.
if err := it.Err(); err != nil {
d.logger.ErrorContext(d.ctx, "Error on alert update", "err", err)
continue
}
now := time.Now()
for _, r := range d.route.Match(alert.Labels) {
d.processAlert(alert, r)
}
d.metrics.processingDuration.Observe(time.Since(now).Seconds())
case <-maintenance.C:
d.doMaintenance()
case <-d.ctx.Done():
return
}
}
}
func (d *Dispatcher) doMaintenance() {
d.mtx.Lock()
defer d.mtx.Unlock()
for _, groups := range d.aggrGroupsPerRoute {
for _, ag := range groups {
if ag.empty() {
ag.stop()
d.marker.DeleteByGroupKey(ag.routeID, ag.GroupKey())
delete(groups, ag.fingerprint())
d.aggrGroupsNum--
d.metrics.aggrGroups.Dec()
}
}
}
}
// AlertGroup represents how alerts exist within an aggrGroup.
type AlertGroup struct {
Alerts types.AlertSlice
Labels model.LabelSet
Receiver string
GroupKey string
RouteID string
}
type AlertGroups []*AlertGroup
func (ag AlertGroups) Swap(i, j int) { ag[i], ag[j] = ag[j], ag[i] }
func (ag AlertGroups) Less(i, j int) bool {
if ag[i].Labels.Equal(ag[j].Labels) {
return ag[i].Receiver < ag[j].Receiver
}
return ag[i].Labels.Before(ag[j].Labels)
}
func (ag AlertGroups) Len() int { return len(ag) }
// Groups returns a slice of AlertGroups from the dispatcher's internal state.
func (d *Dispatcher) Groups(routeFilter func(*dispatch.Route) bool, alertFilter func(*types.Alert, time.Time) bool) (AlertGroups, map[model.Fingerprint][]string) {
groups := AlertGroups{}
d.mtx.RLock()
defer d.mtx.RUnlock()
// Keep a list of receivers for an alert to prevent checking each alert
// again against all routes. The alert has already matched against this
// route on ingestion.
receivers := map[model.Fingerprint][]string{}
now := time.Now()
for route, ags := range d.aggrGroupsPerRoute {
if !routeFilter(route) {
continue
}
for _, ag := range ags {
receiver := route.RouteOpts.Receiver
alertGroup := &AlertGroup{
Labels: ag.labels,
Receiver: receiver,
GroupKey: ag.GroupKey(),
RouteID: ag.routeID,
}
alerts := ag.alerts.List()
filteredAlerts := make([]*types.Alert, 0, len(alerts))
for _, a := range alerts {
if !alertFilter(a, now) {
continue
}
fp := a.Fingerprint()
if r, ok := receivers[fp]; ok {
// Receivers slice already exists. Add
// the current receiver to the slice.
receivers[fp] = append(r, receiver)
} else {
// First time we've seen this alert fingerprint.
// Initialize a new receivers slice.
receivers[fp] = []string{receiver}
}
filteredAlerts = append(filteredAlerts, a)
}
if len(filteredAlerts) == 0 {
continue
}
alertGroup.Alerts = filteredAlerts
groups = append(groups, alertGroup)
}
}
sort.Sort(groups)
for i := range groups {
sort.Sort(groups[i].Alerts)
}
for i := range receivers {
sort.Strings(receivers[i])
}
return groups, receivers
}
// Stop the dispatcher.
func (d *Dispatcher) Stop() {
if d == nil {
return
}
d.mtx.Lock()
if d.cancel == nil {
d.mtx.Unlock()
return
}
d.cancel()
d.cancel = nil
d.mtx.Unlock()
<-d.done
}
// GetStats returns statistics about the dispatcher for SigNoz monitoring
func (d *Dispatcher) GetStats() map[string]interface{} {
if d == nil {
return nil
}
d.mtx.RLock()
defer d.mtx.RUnlock()
stats := map[string]interface{}{
"total_aggregation_groups": d.aggrGroupsNum,
"routes_count": len(d.aggrGroupsPerRoute),
}
// Count groups per route
routeStats := make(map[string]int)
for route, groups := range d.aggrGroupsPerRoute {
routeKey := route.Key()
routeStats[routeKey] = len(groups)
}
stats["groups_per_route"] = routeStats
return stats
}
// notifyFunc is a function that performs notification for the alert
// with the given fingerprint. It aborts on context cancelation.
// Returns false iff notifying failed.
type notifyFunc func(context.Context, ...*types.Alert) bool
// processAlert determines in which aggregation group the alert falls
// and inserts it.
func (d *Dispatcher) processAlert(alert *types.Alert, route *dispatch.Route) {
groupLabels := d.notificationGroups.GetGroupLabels(d.orgID, alert, route)
fp := groupLabels.Fingerprint()
d.mtx.Lock()
defer d.mtx.Unlock()
routeGroups, ok := d.aggrGroupsPerRoute[route]
if !ok {
routeGroups = map[model.Fingerprint]*aggrGroup{}
d.aggrGroupsPerRoute[route] = routeGroups
}
ag, ok := routeGroups[fp]
if ok {
ag.insert(alert)
return
}
// If the group does not exist, create it. But check the limit first.
if limit := d.limits.MaxNumberOfAggregationGroups(); limit > 0 && d.aggrGroupsNum >= limit {
d.metrics.aggrGroupLimitReached.Inc()
d.logger.ErrorContext(d.ctx, "Too many aggregation groups, cannot create new group for alert", "groups", d.aggrGroupsNum, "limit", limit, "alert", alert.Name())
return
}
ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.logger)
routeGroups[fp] = ag
d.aggrGroupsNum++
d.metrics.aggrGroups.Inc()
// Insert the 1st alert in the group before starting the group's run()
// function, to make sure that when the run() will be executed the 1st
// alert is already there.
ag.insert(alert)
go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
_, _, err := d.stage.Exec(ctx, d.logger, alerts...)
if err != nil {
logger := d.logger.With("num_alerts", len(alerts), "err", err)
if errors.Is(ctx.Err(), context.Canceled) {
// It is expected for the context to be canceled on
// configuration reload or shutdown. In this case, the
// message should only be logged at the debug level.
logger.DebugContext(ctx, "Notify for alerts failed")
} else {
logger.ErrorContext(ctx, "Notify for alerts failed")
}
}
return err == nil
})
}
// aggrGroup aggregates alert fingerprints into groups to which a
// common set of routing options applies.
// It emits notifications in the specified intervals.
type aggrGroup struct {
labels model.LabelSet
opts *dispatch.RouteOpts
logger *slog.Logger
routeID string
routeKey string
alerts *store.Alerts
ctx context.Context
cancel func()
done chan struct{}
next *time.Timer
timeout func(time.Duration) time.Duration
mtx sync.RWMutex
hasFlushed bool
}
// newAggrGroup returns a new aggregation group.
func newAggrGroup(ctx context.Context, labels model.LabelSet, r *dispatch.Route, to func(time.Duration) time.Duration, logger *slog.Logger) *aggrGroup {
if to == nil {
to = func(d time.Duration) time.Duration { return d }
}
ag := &aggrGroup{
labels: labels,
routeID: r.ID(),
routeKey: r.Key(),
opts: &r.RouteOpts,
timeout: to,
alerts: store.NewAlerts(),
done: make(chan struct{}),
}
ag.ctx, ag.cancel = context.WithCancel(ctx)
ag.logger = logger.With("aggr_group", ag)
// Set an initial one-time wait before flushing
// the first batch of notifications.
ag.next = time.NewTimer(ag.opts.GroupWait)
return ag
}
func (ag *aggrGroup) fingerprint() model.Fingerprint {
return ag.labels.Fingerprint()
}
func (ag *aggrGroup) GroupKey() string {
return fmt.Sprintf("%s:%s", ag.routeKey, ag.labels)
}
func (ag *aggrGroup) String() string {
return ag.GroupKey()
}
func (ag *aggrGroup) run(nf notifyFunc) {
defer close(ag.done)
defer ag.next.Stop()
for {
select {
case now := <-ag.next.C:
// Give the notifications time until the next flush to
// finish before terminating them.
ctx, cancel := context.WithTimeout(ag.ctx, ag.timeout(ag.opts.GroupInterval))
// The now time we retrieve from the ticker is the only reliable
// point of time reference for the subsequent notification pipeline.
// Calculating the current time directly is prone to flaky behavior,
// which usually only becomes apparent in tests.
ctx = notify.WithNow(ctx, now)
// Populate context with information needed along the pipeline.
ctx = notify.WithGroupKey(ctx, ag.GroupKey())
ctx = notify.WithGroupLabels(ctx, ag.labels)
ctx = notify.WithReceiverName(ctx, ag.opts.Receiver)
ctx = notify.WithRepeatInterval(ctx, ag.opts.RepeatInterval)
ctx = notify.WithMuteTimeIntervals(ctx, ag.opts.MuteTimeIntervals)
ctx = notify.WithActiveTimeIntervals(ctx, ag.opts.ActiveTimeIntervals)
ctx = notify.WithRouteID(ctx, ag.routeID)
// Wait the configured interval before calling flush again.
ag.mtx.Lock()
ag.next.Reset(ag.opts.GroupInterval)
ag.hasFlushed = true
ag.mtx.Unlock()
ag.flush(func(alerts ...*types.Alert) bool {
return nf(ctx, alerts...)
})
cancel()
case <-ag.ctx.Done():
return
}
}
}
func (ag *aggrGroup) stop() {
// Calling cancel will terminate all in-process notifications
// and the run() loop.
ag.cancel()
<-ag.done
}
// insert inserts the alert into the aggregation group.
func (ag *aggrGroup) insert(alert *types.Alert) {
if err := ag.alerts.Set(alert); err != nil {
ag.logger.ErrorContext(ag.ctx, "error on set alert", "err", err)
}
// Immediately trigger a flush if the wait duration for this
// alert is already over.
ag.mtx.Lock()
defer ag.mtx.Unlock()
if !ag.hasFlushed && alert.StartsAt.Add(ag.opts.GroupWait).Before(time.Now()) {
ag.next.Reset(0)
}
}
func (ag *aggrGroup) empty() bool {
return ag.alerts.Empty()
}
// flush sends notifications for all new alerts.
func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) {
if ag.empty() {
return
}
var (
alerts = ag.alerts.List()
alertsSlice = make(types.AlertSlice, 0, len(alerts))
resolvedSlice = make(types.AlertSlice, 0, len(alerts))
now = time.Now()
)
for _, alert := range alerts {
a := *alert
// Ensure that alerts don't resolve as time move forwards.
if a.ResolvedAt(now) {
resolvedSlice = append(resolvedSlice, &a)
} else {
a.EndsAt = time.Time{}
}
alertsSlice = append(alertsSlice, &a)
}
sort.Stable(alertsSlice)
ag.logger.DebugContext(ag.ctx, "flushing", "alerts", fmt.Sprintf("%v", alertsSlice))
if notify(alertsSlice...) {
// Delete all resolved alerts as we just sent a notification for them,
// and we don't want to send another one. However, we need to make sure
// that each resolved alert has not fired again during the flush as then
// we would delete an active alert thinking it was resolved.
if err := ag.alerts.DeleteIfNotModified(resolvedSlice); err != nil {
ag.logger.ErrorContext(ag.ctx, "error on delete alerts", "err", err)
}
}
}
// unlimitedLimits provides unlimited aggregation groups for SigNoz
type unlimitedLimits struct{}
func (u *unlimitedLimits) MaxNumberOfAggregationGroups() int { return 0 }

View File

@@ -7,7 +7,10 @@ import (
"sync"
"time"
signozDispatcher "github.com/SigNoz/signoz/pkg/alertmanager/alertmanagerserver/dispatch"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/notificationgrouping"
"github.com/SigNoz/signoz/pkg/types/alertmanagertypes"
"github.com/prometheus/alertmanager/dispatch"
"github.com/prometheus/alertmanager/featurecontrol"
@@ -49,29 +52,31 @@ type Server struct {
stateStore alertmanagertypes.StateStore
// alertmanager primitives from upstream alertmanager
alerts *mem.Alerts
nflog *nflog.Log
dispatcher *dispatch.Dispatcher
dispatcherMetrics *dispatch.DispatcherMetrics
inhibitor *inhibit.Inhibitor
silencer *silence.Silencer
silences *silence.Silences
timeIntervals map[string][]timeinterval.TimeInterval
pipelineBuilder *notify.PipelineBuilder
marker *alertmanagertypes.MemMarker
tmpl *template.Template
wg sync.WaitGroup
stopc chan struct{}
alerts *mem.Alerts
nflog *nflog.Log
dispatcher *signozDispatcher.Dispatcher
dispatcherMetrics *signozDispatcher.DispatcherMetrics
inhibitor *inhibit.Inhibitor
silencer *silence.Silencer
silences *silence.Silences
timeIntervals map[string][]timeinterval.TimeInterval
pipelineBuilder *notify.PipelineBuilder
marker *alertmanagertypes.MemMarker
tmpl *template.Template
wg sync.WaitGroup
stopc chan struct{}
notificationGroups notificationgrouping.NotificationGroups
}
func New(ctx context.Context, logger *slog.Logger, registry prometheus.Registerer, srvConfig Config, orgID string, stateStore alertmanagertypes.StateStore) (*Server, error) {
func New(ctx context.Context, logger *slog.Logger, registry prometheus.Registerer, srvConfig Config, orgID string, stateStore alertmanagertypes.StateStore, groups notificationgrouping.NotificationGroups) (*Server, error) {
server := &Server{
logger: logger.With("pkg", "go.signoz.io/pkg/alertmanager/alertmanagerserver"),
registry: registry,
srvConfig: srvConfig,
orgID: orgID,
stateStore: stateStore,
stopc: make(chan struct{}),
logger: logger.With("pkg", "go.signoz.io/pkg/alertmanager/alertmanagerserver"),
registry: registry,
srvConfig: srvConfig,
orgID: orgID,
stateStore: stateStore,
stopc: make(chan struct{}),
notificationGroups: groups,
}
// initialize marker
server.marker = alertmanagertypes.NewMarker(server.registry)
@@ -187,7 +192,7 @@ func New(ctx context.Context, logger *slog.Logger, registry prometheus.Registere
}
server.pipelineBuilder = notify.NewPipelineBuilder(server.registry, featurecontrol.NoopFlags{})
server.dispatcherMetrics = dispatch.NewDispatcherMetrics(false, server.registry)
server.dispatcherMetrics = signozDispatcher.NewDispatcherMetrics(false, server.registry)
return server, nil
}
@@ -292,7 +297,7 @@ func (server *Server) SetConfig(ctx context.Context, alertmanagerConfig *alertma
return d
}
server.dispatcher = dispatch.NewDispatcher(
server.dispatcher = signozDispatcher.NewDispatcher(
server.alerts,
routes,
pipeline,
@@ -301,6 +306,8 @@ func (server *Server) SetConfig(ctx context.Context, alertmanagerConfig *alertma
nil,
server.logger,
server.dispatcherMetrics,
server.notificationGroups,
server.orgID,
)
// Do not try to add these to server.wg as there seems to be a race condition if

View File

@@ -11,6 +11,9 @@ import (
"testing"
"time"
"github.com/SigNoz/signoz/pkg/factory/factorytest"
"github.com/SigNoz/signoz/pkg/notificationgrouping"
"github.com/SigNoz/signoz/pkg/notificationgrouping/notificationgroupingtest"
"github.com/SigNoz/signoz/pkg/types/alertmanagertypes"
"github.com/SigNoz/signoz/pkg/types/alertmanagertypes/alertmanagertypestest"
"github.com/go-openapi/strfmt"
@@ -23,7 +26,9 @@ import (
)
func TestServerSetConfigAndStop(t *testing.T) {
server, err := New(context.Background(), slog.New(slog.NewTextHandler(io.Discard, nil)), prometheus.NewRegistry(), NewConfig(), "1", alertmanagertypestest.NewStateStore())
notificationGroups, err := notificationgroupingtest.New(context.Background(), factorytest.NewSettings(), notificationgrouping.Config{})
require.NoError(t, err)
server, err := New(context.Background(), slog.New(slog.NewTextHandler(io.Discard, nil)), prometheus.NewRegistry(), NewConfig(), "1", alertmanagertypestest.NewStateStore(), notificationGroups)
require.NoError(t, err)
amConfig, err := alertmanagertypes.NewDefaultConfig(alertmanagertypes.GlobalConfig{}, alertmanagertypes.RouteConfig{GroupInterval: 1 * time.Minute, RepeatInterval: 1 * time.Minute, GroupWait: 1 * time.Minute}, "1")
@@ -34,7 +39,9 @@ func TestServerSetConfigAndStop(t *testing.T) {
}
func TestServerTestReceiverTypeWebhook(t *testing.T) {
server, err := New(context.Background(), slog.New(slog.NewTextHandler(io.Discard, nil)), prometheus.NewRegistry(), NewConfig(), "1", alertmanagertypestest.NewStateStore())
notificationGroups, err := notificationgroupingtest.New(context.Background(), factorytest.NewSettings(), notificationgrouping.Config{})
require.NoError(t, err)
server, err := New(context.Background(), slog.New(slog.NewTextHandler(io.Discard, nil)), prometheus.NewRegistry(), NewConfig(), "1", alertmanagertypestest.NewStateStore(), notificationGroups)
require.NoError(t, err)
amConfig, err := alertmanagertypes.NewDefaultConfig(alertmanagertypes.GlobalConfig{}, alertmanagertypes.RouteConfig{GroupInterval: 1 * time.Minute, RepeatInterval: 1 * time.Minute, GroupWait: 1 * time.Minute}, "1")
@@ -81,7 +88,9 @@ func TestServerPutAlerts(t *testing.T) {
stateStore := alertmanagertypestest.NewStateStore()
srvCfg := NewConfig()
srvCfg.Route.GroupInterval = 1 * time.Second
server, err := New(context.Background(), slog.New(slog.NewTextHandler(io.Discard, nil)), prometheus.NewRegistry(), srvCfg, "1", stateStore)
notificationGroups, err := notificationgroupingtest.New(context.Background(), factorytest.NewSettings(), notificationgrouping.Config{})
require.NoError(t, err)
server, err := New(context.Background(), slog.New(slog.NewTextHandler(io.Discard, nil)), prometheus.NewRegistry(), srvCfg, "1", stateStore, notificationGroups)
require.NoError(t, err)
amConfig, err := alertmanagertypes.NewDefaultConfig(srvCfg.Global, srvCfg.Route, "1")

View File

@@ -8,6 +8,7 @@ import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/notificationgrouping"
"github.com/SigNoz/signoz/pkg/types/alertmanagertypes"
)
@@ -32,6 +33,8 @@ type Service struct {
// Mutex to protect the servers map
serversMtx sync.RWMutex
notificationGroups notificationgrouping.NotificationGroups
}
func New(
@@ -41,15 +44,17 @@ func New(
stateStore alertmanagertypes.StateStore,
configStore alertmanagertypes.ConfigStore,
orgGetter organization.Getter,
groups notificationgrouping.NotificationGroups,
) *Service {
service := &Service{
config: config,
stateStore: stateStore,
configStore: configStore,
orgGetter: orgGetter,
settings: settings,
servers: make(map[string]*alertmanagerserver.Server),
serversMtx: sync.RWMutex{},
config: config,
stateStore: stateStore,
configStore: configStore,
orgGetter: orgGetter,
settings: settings,
servers: make(map[string]*alertmanagerserver.Server),
serversMtx: sync.RWMutex{},
notificationGroups: groups,
}
return service
@@ -167,7 +172,7 @@ func (service *Service) newServer(ctx context.Context, orgID string) (*alertmana
return nil, err
}
server, err := alertmanagerserver.New(ctx, service.settings.Logger(), service.settings.PrometheusRegisterer(), service.config, orgID, service.stateStore)
server, err := alertmanagerserver.New(ctx, service.settings.Logger(), service.settings.PrometheusRegisterer(), service.config, orgID, service.stateStore, service.notificationGroups)
if err != nil {
return nil, err
}

View File

@@ -9,27 +9,29 @@ import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/notificationgrouping"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types/alertmanagertypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
type provider struct {
service *alertmanager.Service
config alertmanager.Config
settings factory.ScopedProviderSettings
configStore alertmanagertypes.ConfigStore
stateStore alertmanagertypes.StateStore
stopC chan struct{}
service *alertmanager.Service
config alertmanager.Config
settings factory.ScopedProviderSettings
configStore alertmanagertypes.ConfigStore
stateStore alertmanagertypes.StateStore
notificationGroups notificationgrouping.NotificationGroups
stopC chan struct{}
}
func NewFactory(sqlstore sqlstore.SQLStore, orgGetter organization.Getter) factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config] {
func NewFactory(sqlstore sqlstore.SQLStore, orgGetter organization.Getter, notificationGroups notificationgrouping.NotificationGroups) factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config] {
return factory.NewProviderFactory(factory.MustNewName("signoz"), func(ctx context.Context, settings factory.ProviderSettings, config alertmanager.Config) (alertmanager.Alertmanager, error) {
return New(ctx, settings, config, sqlstore, orgGetter)
return New(ctx, settings, config, sqlstore, orgGetter, notificationGroups)
})
}
func New(ctx context.Context, providerSettings factory.ProviderSettings, config alertmanager.Config, sqlstore sqlstore.SQLStore, orgGetter organization.Getter) (*provider, error) {
func New(ctx context.Context, providerSettings factory.ProviderSettings, config alertmanager.Config, sqlstore sqlstore.SQLStore, orgGetter organization.Getter, notificationGroups notificationgrouping.NotificationGroups) (*provider, error) {
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/alertmanager/signozalertmanager")
configStore := sqlalertmanagerstore.NewConfigStore(sqlstore)
stateStore := sqlalertmanagerstore.NewStateStore(sqlstore)
@@ -42,12 +44,14 @@ func New(ctx context.Context, providerSettings factory.ProviderSettings, config
stateStore,
configStore,
orgGetter,
notificationGroups,
),
settings: settings,
config: config,
configStore: configStore,
stateStore: stateStore,
stopC: make(chan struct{}),
settings: settings,
config: config,
configStore: configStore,
stateStore: stateStore,
notificationGroups: notificationGroups,
stopC: make(chan struct{}),
}
return p, nil

View File

@@ -0,0 +1,30 @@
package notificationgrouping
import (
"github.com/SigNoz/signoz/pkg/factory"
)
type Config struct {
// Provider is the provider to use for notification grouping.
Provider string `mapstructure:"provider"`
// DefaultStrategy is the default grouping strategy to use when no rule-specific strategy is configured.
DefaultStrategy string `mapstructure:"default_strategy"`
}
func NewConfigFactory() factory.ConfigFactory {
return factory.NewConfigFactory(factory.MustNewName("notificationgrouping"), newConfig)
}
// newConfig creates a new default configuration for notification grouping.
func newConfig() factory.Config {
return Config{
Provider: "rulebased",
DefaultStrategy: "standard",
}
}
// Validate validates the configuration and returns an error if invalid.
func (c Config) Validate() error {
// Add validation logic here if needed
return nil
}

View File

@@ -0,0 +1,15 @@
// Package notificationgrouping provides interfaces and implementations for alert notification grouping strategies.
// It supports multi-tenancy and rule-based grouping configurations.
package notificationgrouping
import (
"github.com/prometheus/alertmanager/dispatch"
"github.com/prometheus/alertmanager/types"
"github.com/prometheus/common/model"
)
// NotificationGroups defines how alerts should be grouped for notification with multi-tenancy support.
type NotificationGroups interface {
GetGroupLabels(orgID string, alert *types.Alert, route *dispatch.Route) model.LabelSet
SetGroupLabels(orgID string, ruleID string, groupByLabels []string) error
}

View File

@@ -0,0 +1,50 @@
package notificationgrouping
import (
"testing"
"github.com/prometheus/alertmanager/dispatch"
"github.com/prometheus/alertmanager/types"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
)
func TestNotificationGroupsInterface(t *testing.T) {
// Test that the interface can be implemented
var ng NotificationGroups = &mockNotificationGroups{}
alert := &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{
"alertname": "test_alert",
"severity": "critical",
},
},
}
route := &dispatch.Route{
RouteOpts: dispatch.RouteOpts{
GroupBy: map[model.LabelName]struct{}{
"alertname": {},
},
},
}
groupLabels := ng.GetGroupLabels("test-org", alert, route)
assert.NotNil(t, groupLabels)
err := ng.SetGroupLabels("test-org", "test-rule", []string{"alertname"})
assert.NoError(t, err)
}
type mockNotificationGroups struct{}
func (m *mockNotificationGroups) GetGroupLabels(orgID string, alert *types.Alert, route *dispatch.Route) model.LabelSet {
return model.LabelSet{
"alertname": alert.Labels["alertname"],
}
}
func (m *mockNotificationGroups) SetGroupLabels(orgID string, ruleID string, groupByLabels []string) error {
return nil
}

View File

@@ -0,0 +1,61 @@
package notificationgroupingtest
import (
"context"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/notificationgrouping"
"github.com/prometheus/alertmanager/dispatch"
"github.com/prometheus/alertmanager/types"
"github.com/prometheus/common/model"
)
type provider struct {
settings factory.ScopedProviderSettings
// Mock data for testing
mockGroupLabels model.LabelSet
mockError error
}
// NewFactory creates a new factory for the test notification grouping strategy.
func NewFactory() factory.ProviderFactory[notificationgrouping.NotificationGroups, notificationgrouping.Config] {
return factory.NewProviderFactory(
factory.MustNewName("test"),
func(ctx context.Context, settings factory.ProviderSettings, config notificationgrouping.Config) (notificationgrouping.NotificationGroups, error) {
return New(ctx, settings, config)
},
)
}
// New creates a new test notification grouping strategy provider.
func New(ctx context.Context, providerSettings factory.ProviderSettings, config notificationgrouping.Config) (notificationgrouping.NotificationGroups, error) {
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/notificationgrouping/notificationgroupingtest")
return &provider{
settings: settings,
mockGroupLabels: model.LabelSet{},
}, nil
}
// GetGroupLabels implements the NotificationGroups interface for testing.
func (p *provider) GetGroupLabels(orgID string, alert *types.Alert, route *dispatch.Route) model.LabelSet {
if p.mockError != nil {
return model.LabelSet{}
}
return p.mockGroupLabels
}
// SetGroupLabels implements the NotificationGroups interface for testing.
func (p *provider) SetGroupLabels(orgID string, ruleID string, groupByLabels []string) error {
return p.mockError
}
// SetMockGroupLabels sets mock group labels for testing.
func (p *provider) SetMockGroupLabels(labels model.LabelSet) {
p.mockGroupLabels = labels
}
// SetMockError sets a mock error for testing.
func (p *provider) SetMockError(err error) {
p.mockError = err
}

View File

@@ -0,0 +1,102 @@
package rulebasedgrouping
import (
"context"
"sync"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/notificationgrouping"
"github.com/SigNoz/signoz/pkg/notificationgrouping/standardgrouping"
"github.com/prometheus/alertmanager/dispatch"
"github.com/prometheus/alertmanager/types"
"github.com/prometheus/common/model"
)
type provider struct {
settings factory.ScopedProviderSettings
orgToRuleToNotificationGroupBy map[string]map[string][]string
fallbackStrategy notificationgrouping.NotificationGroups
mutex sync.RWMutex
}
// NewFactory creates a new factory for the rule-based grouping strategy.
func NewFactory() factory.ProviderFactory[notificationgrouping.NotificationGroups, notificationgrouping.Config] {
return factory.NewProviderFactory(
factory.MustNewName("rulebased"),
func(ctx context.Context, settings factory.ProviderSettings, config notificationgrouping.Config) (notificationgrouping.NotificationGroups, error) {
return New(ctx, settings, config)
},
)
}
// New creates a new rule-based grouping strategy provider.
func New(ctx context.Context, providerSettings factory.ProviderSettings, config notificationgrouping.Config) (notificationgrouping.NotificationGroups, error) {
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/notificationgrouping/rulebasedgrouping")
// Create fallback strategy based on config
fallbackStrategy, err := standardgrouping.New(ctx, providerSettings, config)
if err != nil {
return nil, err
}
return &provider{
settings: settings,
orgToRuleToNotificationGroupBy: make(map[string]map[string][]string),
fallbackStrategy: fallbackStrategy,
}, nil
}
// GetGroupLabels uses rule-specific notificationGroupBy if available, otherwise falls back to standard groupBy.
func (r *provider) GetGroupLabels(orgID string, alert *types.Alert, route *dispatch.Route) model.LabelSet {
r.mutex.RLock()
defer r.mutex.RUnlock()
ruleID := getRuleIDFromRoute(alert)
var notificationGroupBy []string
if orgRules, exists := r.orgToRuleToNotificationGroupBy[orgID]; exists {
if groupBy, ruleExists := orgRules[ruleID]; ruleExists {
notificationGroupBy = groupBy
}
}
groupLabels := r.fallbackStrategy.GetGroupLabels(orgID, alert, route)
if len(notificationGroupBy) > 0 {
// Use notificationGroupBy from rule
for _, labelName := range notificationGroupBy {
if labelValue, exists := alert.Labels[model.LabelName(labelName)]; exists {
groupLabels[model.LabelName(labelName)] = labelValue
}
}
}
return groupLabels
}
// SetGroupLabels updates the rule-to-notificationGroupBy mapping for the specified rule and organization.
func (r *provider) SetGroupLabels(orgID string, ruleID string, groupByLabels []string) error {
if orgID == "" || ruleID == "" {
return nil
}
r.mutex.Lock()
defer r.mutex.Unlock()
// Initialize org map if it doesn't exist
if r.orgToRuleToNotificationGroupBy[orgID] == nil {
r.orgToRuleToNotificationGroupBy[orgID] = make(map[string][]string)
}
r.orgToRuleToNotificationGroupBy[orgID][ruleID] = groupByLabels
return nil
}
func getRuleIDFromRoute(alert *types.Alert) string {
for name, value := range alert.Labels {
if string(name) == "ruleId" {
return string(value)
}
}
return ""
}

View File

@@ -0,0 +1,558 @@
package rulebasedgrouping
import (
"context"
"sync"
"testing"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/notificationgrouping"
"github.com/prometheus/alertmanager/dispatch"
"github.com/prometheus/alertmanager/types"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func createTestProviderSettings() factory.ProviderSettings {
return instrumentationtest.New().ToProviderSettings()
}
func TestNewFactory(t *testing.T) {
providerFactory := NewFactory()
assert.NotNil(t, providerFactory)
assert.Equal(t, "rulebased", providerFactory.Name().String())
}
func TestNew(t *testing.T) {
ctx := context.Background()
providerSettings := createTestProviderSettings()
config := notificationgrouping.Config{}
provider, err := New(ctx, providerSettings, config)
require.NoError(t, err)
assert.NotNil(t, provider)
// Verify provider implements the interface correctly
assert.Implements(t, (*notificationgrouping.NotificationGroups)(nil), provider)
}
func TestProvider_SetGroupLabels(t *testing.T) {
ctx := context.Background()
providerSettings := createTestProviderSettings()
config := notificationgrouping.Config{}
provider, err := New(ctx, providerSettings, config)
require.NoError(t, err)
tests := []struct {
name string
orgID string
ruleID string
groupLabels []string
wantErr bool
}{
{
name: "valid parameters",
orgID: "org1",
ruleID: "rule1",
groupLabels: []string{"alertname", "severity"},
wantErr: false,
},
{
name: "empty orgID",
orgID: "",
ruleID: "rule1",
groupLabels: []string{"alertname"},
wantErr: false, // Should not error but also not set anything
},
{
name: "empty ruleID",
orgID: "org1",
ruleID: "",
groupLabels: []string{"alertname"},
wantErr: false, // Should not error but also not set anything
},
{
name: "nil groupLabels",
orgID: "org1",
ruleID: "rule1",
groupLabels: nil,
wantErr: false,
},
{
name: "empty groupLabels",
orgID: "org1",
ruleID: "rule1",
groupLabels: []string{},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := provider.SetGroupLabels(tt.orgID, tt.ruleID, tt.groupLabels)
if tt.wantErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)
if tt.orgID != "" && tt.ruleID != "" && tt.groupLabels != nil {
alert := &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{
"ruleId": model.LabelValue(tt.ruleID),
},
},
}
// Add all group labels to the alert so they can be retrieved
for _, label := range tt.groupLabels {
alert.Labels[model.LabelName(label)] = model.LabelValue("test_value")
}
route := &dispatch.Route{
RouteOpts: dispatch.RouteOpts{
GroupBy: map[model.LabelName]struct{}{},
},
}
groupLabels := provider.GetGroupLabels(tt.orgID, alert, route)
// Verify that all configured group labels are present in the result
for _, expectedLabel := range tt.groupLabels {
_, exists := groupLabels[model.LabelName(expectedLabel)]
assert.True(t, exists, "Expected stored label %s to be present in group labels", expectedLabel)
}
}
}
})
}
}
func TestProvider_SetGroupLabels_VerifyIsolatedStorage(t *testing.T) {
ctx := context.Background()
providerSettings := createTestProviderSettings()
config := notificationgrouping.Config{}
provider, err := New(ctx, providerSettings, config)
require.NoError(t, err)
// Set different labels for different org/rule combinations
err = provider.SetGroupLabels("org1", "rule1", []string{"label_org1_rule1"})
require.NoError(t, err)
err = provider.SetGroupLabels("org1", "rule2", []string{"label_org1_rule2"})
require.NoError(t, err)
err = provider.SetGroupLabels("org2", "rule1", []string{"label_org2_rule1"})
require.NoError(t, err)
// Test that each org/rule combination retrieves only its own labels
testCases := []struct {
orgID string
ruleID string
expectedLabel string
unexpectedLabels []string
}{
{
orgID: "org1",
ruleID: "rule1",
expectedLabel: "label_org1_rule1",
unexpectedLabels: []string{"label_org1_rule2", "label_org2_rule1"},
},
{
orgID: "org1",
ruleID: "rule2",
expectedLabel: "label_org1_rule2",
unexpectedLabels: []string{"label_org1_rule1", "label_org2_rule1"},
},
{
orgID: "org2",
ruleID: "rule1",
expectedLabel: "label_org2_rule1",
unexpectedLabels: []string{"label_org1_rule1", "label_org1_rule2"},
},
}
for _, tc := range testCases {
t.Run(tc.orgID+"_"+tc.ruleID, func(t *testing.T) {
alert := &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{
"ruleId": model.LabelValue(tc.ruleID),
model.LabelName(tc.expectedLabel): "test_value",
// Add all possible labels to the alert so we can test isolation
"label_org1_rule1": "test_value",
"label_org1_rule2": "test_value",
"label_org2_rule1": "test_value",
},
},
}
route := &dispatch.Route{
RouteOpts: dispatch.RouteOpts{
GroupBy: map[model.LabelName]struct{}{},
},
}
groupLabels := provider.GetGroupLabels(tc.orgID, alert, route)
// Should have the expected label for this org/rule combination
_, exists := groupLabels[model.LabelName(tc.expectedLabel)]
assert.True(t, exists, "Expected label %s to be present for %s/%s", tc.expectedLabel, tc.orgID, tc.ruleID)
// Should NOT have labels from other org/rule combinations
for _, unexpectedLabel := range tc.unexpectedLabels {
_, exists := groupLabels[model.LabelName(unexpectedLabel)]
assert.False(t, exists, "Unexpected label %s should NOT be present for %s/%s", unexpectedLabel, tc.orgID, tc.ruleID)
}
})
}
}
func TestProvider_GetGroupLabels_WithRuleSpecificGrouping(t *testing.T) {
ctx := context.Background()
providerSettings := createTestProviderSettings()
config := notificationgrouping.Config{}
provider, err := New(ctx, providerSettings, config)
require.NoError(t, err)
// Set up test data
orgID := "test-org"
ruleID := "test-rule"
groupByLabels := []string{"alertname", "severity"}
err = provider.SetGroupLabels(orgID, ruleID, groupByLabels)
require.NoError(t, err)
alert := &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{
"ruleId": model.LabelValue(ruleID),
"alertname": "test_alert",
"severity": "critical",
"instance": "localhost:9090",
},
},
}
route := &dispatch.Route{
RouteOpts: dispatch.RouteOpts{
GroupBy: map[model.LabelName]struct{}{
"instance": {},
},
},
}
groupLabels := provider.GetGroupLabels(orgID, alert, route)
assert.NotNil(t, groupLabels)
// Should have rule-specific labels plus fallback labels
expectedKeys := []string{"alertname", "severity", "instance"}
for _, key := range expectedKeys {
_, exists := groupLabels[model.LabelName(key)]
assert.True(t, exists, "Expected key %s to be present in group labels", key)
}
}
func TestProvider_GetGroupLabels_FallbackToStandardGrouping(t *testing.T) {
ctx := context.Background()
providerSettings := createTestProviderSettings()
config := notificationgrouping.Config{}
provider, err := New(ctx, providerSettings, config)
require.NoError(t, err)
alert := &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{
"ruleId": "non-existent-rule",
"alertname": "test_alert",
"severity": "critical",
"instance": "localhost:9090",
},
},
}
route := &dispatch.Route{
RouteOpts: dispatch.RouteOpts{
GroupBy: map[model.LabelName]struct{}{
"instance": {},
},
},
}
groupLabels := provider.GetGroupLabels("different-org", alert, route)
assert.NotNil(t, groupLabels)
// Should only have standard grouping labels
_, instanceExists := groupLabels[model.LabelName("instance")]
assert.True(t, instanceExists, "Expected instance label from route groupBy")
_, alertnameExists := groupLabels[model.LabelName("alertname")]
assert.False(t, alertnameExists, "Should not have alertname since it's not in route groupBy")
}
func TestProvider_GetGroupLabels_NoRuleId(t *testing.T) {
ctx := context.Background()
providerSettings := createTestProviderSettings()
config := notificationgrouping.Config{}
provider, err := New(ctx, providerSettings, config)
require.NoError(t, err)
alert := &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{
"alertname": "test_alert",
"severity": "critical",
"instance": "localhost:9090",
},
},
}
route := &dispatch.Route{
RouteOpts: dispatch.RouteOpts{
GroupBy: map[model.LabelName]struct{}{
"instance": {},
"severity": {},
},
},
}
groupLabels := provider.GetGroupLabels("test-org", alert, route)
assert.NotNil(t, groupLabels)
// Should have standard grouping labels only
expectedKeys := []string{"instance", "severity"}
for _, key := range expectedKeys {
_, exists := groupLabels[model.LabelName(key)]
assert.True(t, exists, "Expected key %s from route groupBy", key)
}
_, alertnameExists := groupLabels[model.LabelName("alertname")]
assert.False(t, alertnameExists, "Should not have alertname since it's not in route groupBy")
}
func TestProvider_GetGroupLabels_PartialLabelMatch(t *testing.T) {
ctx := context.Background()
providerSettings := createTestProviderSettings()
config := notificationgrouping.Config{}
provider, err := New(ctx, providerSettings, config)
require.NoError(t, err)
orgID := "test-org"
ruleID := "test-rule"
groupByLabels := []string{"alertname", "severity", "missing_label"}
err = provider.SetGroupLabels(orgID, ruleID, groupByLabels)
require.NoError(t, err)
alert := &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{
"ruleId": model.LabelValue(ruleID),
"alertname": "test_alert",
"instance": "localhost:9090",
// Note: missing "severity" and "missing_label"
},
},
}
route := &dispatch.Route{
RouteOpts: dispatch.RouteOpts{
GroupBy: map[model.LabelName]struct{}{
"instance": {},
},
},
}
groupLabels := provider.GetGroupLabels(orgID, alert, route)
assert.NotNil(t, groupLabels)
// Should have alertname and instance, but not severity or missing_label
_, alertnameExists := groupLabels[model.LabelName("alertname")]
assert.True(t, alertnameExists, "Expected alertname from rule groupBy")
_, instanceExists := groupLabels[model.LabelName("instance")]
assert.True(t, instanceExists, "Expected instance from route groupBy")
_, severityExists := groupLabels[model.LabelName("severity")]
assert.False(t, severityExists, "Should not have severity since it's not in alert labels")
_, missingExists := groupLabels[model.LabelName("missing_label")]
assert.False(t, missingExists, "Should not have missing_label since it's not in alert labels")
}
func TestGetRuleIDFromRoute(t *testing.T) {
tests := []struct {
name string
alert *types.Alert
expected string
}{
{
name: "alert with ruleId label",
alert: &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{
"ruleId": "test-rule-123",
"alertname": "test_alert",
},
},
},
expected: "test-rule-123",
},
{
name: "alert without ruleId label",
alert: &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{
"alertname": "test_alert",
"severity": "critical",
},
},
},
expected: "",
},
{
name: "alert with empty ruleId",
alert: &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{
"ruleId": "",
"alertname": "test_alert",
},
},
},
expected: "",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := getRuleIDFromRoute(tt.alert)
assert.Equal(t, tt.expected, result)
})
}
}
func TestProvider_ConcurrentAccess(t *testing.T) {
ctx := context.Background()
providerSettings := createTestProviderSettings()
config := notificationgrouping.Config{}
provider, err := New(ctx, providerSettings, config)
require.NoError(t, err)
var wg sync.WaitGroup
// Writer goroutine
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 50; i++ {
err := provider.SetGroupLabels("org1", "rule1", []string{"label1", "label2"})
assert.NoError(t, err)
}
}()
// Reader goroutine
wg.Add(1)
go func() {
defer wg.Done()
alert := &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{
"ruleId": "rule1",
"label1": "value1",
"label2": "value2",
},
},
}
route := &dispatch.Route{
RouteOpts: dispatch.RouteOpts{
GroupBy: map[model.LabelName]struct{}{},
},
}
for i := 0; i < 50; i++ {
groupLabels := provider.GetGroupLabels("org1", alert, route)
assert.NotNil(t, groupLabels)
}
}()
// Wait for both goroutines to complete
wg.Wait()
}
func TestProvider_MultipleOrgsAndRules(t *testing.T) {
ctx := context.Background()
providerSettings := createTestProviderSettings()
config := notificationgrouping.Config{}
provider, err := New(ctx, providerSettings, config)
require.NoError(t, err)
// Set up multiple orgs and rules
err = provider.SetGroupLabels("org1", "rule1", []string{"alertname"})
require.NoError(t, err)
err = provider.SetGroupLabels("org1", "rule2", []string{"severity"})
require.NoError(t, err)
err = provider.SetGroupLabels("org2", "rule1", []string{"instance"})
require.NoError(t, err)
tests := []struct {
name string
orgID string
ruleID string
expectedLabel string
}{
{
name: "org1 rule1",
orgID: "org1",
ruleID: "rule1",
expectedLabel: "alertname",
},
{
name: "org1 rule2",
orgID: "org1",
ruleID: "rule2",
expectedLabel: "severity",
},
{
name: "org2 rule1",
orgID: "org2",
ruleID: "rule1",
expectedLabel: "instance",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
alert := &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{
"ruleId": model.LabelValue(tt.ruleID),
"alertname": "test_alert",
"severity": "critical",
"instance": "localhost:9090",
},
},
}
route := &dispatch.Route{
RouteOpts: dispatch.RouteOpts{
GroupBy: map[model.LabelName]struct{}{},
},
}
groupLabels := provider.GetGroupLabels(tt.orgID, alert, route)
// Should have the expected label from rule-specific grouping
_, exists := groupLabels[model.LabelName(tt.expectedLabel)]
assert.True(t, exists, "Expected label %s to be present", tt.expectedLabel)
})
}
}

View File

@@ -0,0 +1,49 @@
package standardgrouping
import (
"context"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/notificationgrouping"
"github.com/prometheus/alertmanager/dispatch"
"github.com/prometheus/alertmanager/types"
"github.com/prometheus/common/model"
)
type provider struct {
settings factory.ScopedProviderSettings
}
func NewFactory() factory.ProviderFactory[notificationgrouping.NotificationGroups, notificationgrouping.Config] {
return factory.NewProviderFactory(
factory.MustNewName("standard"),
func(ctx context.Context, settings factory.ProviderSettings, config notificationgrouping.Config) (notificationgrouping.NotificationGroups, error) {
return New(ctx, settings, config)
},
)
}
func New(ctx context.Context, providerSettings factory.ProviderSettings, config notificationgrouping.Config) (notificationgrouping.NotificationGroups, error) {
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/notificationgrouping/standardgrouping")
return &provider{
settings: settings,
}, nil
}
func (p *provider) GetGroupLabels(orgID string, alert *types.Alert, route *dispatch.Route) model.LabelSet {
// orgID is ignored for standard grouping as it uses route configuration
groupLabels := model.LabelSet{}
for ln, lv := range alert.Labels {
if _, ok := route.RouteOpts.GroupBy[ln]; ok || route.RouteOpts.GroupByAll {
groupLabels[ln] = lv
}
}
return groupLabels
}
func (p *provider) SetGroupLabels(orgID string, ruleID string, groupByLabels []string) error {
// Standard grouping doesn't support dynamic label setting
// Grouping is determined by the route configuration, not rule-specific settings
return nil
}

View File

@@ -14,6 +14,8 @@ import (
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
"github.com/SigNoz/signoz/pkg/modules/user"
"github.com/SigNoz/signoz/pkg/notificationgrouping"
"github.com/SigNoz/signoz/pkg/notificationgrouping/notificationgroupingtest"
"github.com/SigNoz/signoz/pkg/query-service/model"
"github.com/SigNoz/signoz/pkg/query-service/utils"
"github.com/SigNoz/signoz/pkg/sharder"
@@ -35,7 +37,9 @@ func TestRegenerateConnectionUrlWithUpdatedConfig(t *testing.T) {
sharder, err := noopsharder.New(context.TODO(), providerSettings, sharder.Config{})
require.NoError(err)
orgGetter := implorganization.NewGetter(implorganization.NewStore(sqlStore), sharder)
alertmanager, err := signozalertmanager.New(context.TODO(), providerSettings, alertmanager.Config{Provider: "signoz", Signoz: alertmanager.Signoz{PollInterval: 10 * time.Second, Config: alertmanagerserver.NewConfig()}}, sqlStore, orgGetter)
notificationGroups, err := notificationgroupingtest.New(context.TODO(), providerSettings, notificationgrouping.Config{})
require.NoError(err)
alertmanager, err := signozalertmanager.New(context.TODO(), providerSettings, alertmanager.Config{Provider: "signoz", Signoz: alertmanager.Signoz{PollInterval: 10 * time.Second, Config: alertmanagerserver.NewConfig()}}, sqlStore, orgGetter, notificationGroups)
require.NoError(err)
jwt := authtypes.NewJWT("", 1*time.Hour, 1*time.Hour)
emailing := emailingtest.New()
@@ -92,7 +96,9 @@ func TestAgentCheckIns(t *testing.T) {
sharder, err := noopsharder.New(context.TODO(), providerSettings, sharder.Config{})
require.NoError(err)
orgGetter := implorganization.NewGetter(implorganization.NewStore(sqlStore), sharder)
alertmanager, err := signozalertmanager.New(context.TODO(), providerSettings, alertmanager.Config{Provider: "signoz", Signoz: alertmanager.Signoz{PollInterval: 10 * time.Second, Config: alertmanagerserver.NewConfig()}}, sqlStore, orgGetter)
notificationGroups, err := notificationgroupingtest.New(context.TODO(), providerSettings, notificationgrouping.Config{})
require.NoError(err)
alertmanager, err := signozalertmanager.New(context.TODO(), providerSettings, alertmanager.Config{Provider: "signoz", Signoz: alertmanager.Signoz{PollInterval: 10 * time.Second, Config: alertmanagerserver.NewConfig()}}, sqlStore, orgGetter, notificationGroups)
require.NoError(err)
jwt := authtypes.NewJWT("", 1*time.Hour, 1*time.Hour)
emailing := emailingtest.New()
@@ -188,7 +194,9 @@ func TestCantDisconnectNonExistentAccount(t *testing.T) {
sharder, err := noopsharder.New(context.TODO(), providerSettings, sharder.Config{})
require.NoError(err)
orgGetter := implorganization.NewGetter(implorganization.NewStore(sqlStore), sharder)
alertmanager, err := signozalertmanager.New(context.TODO(), providerSettings, alertmanager.Config{Provider: "signoz", Signoz: alertmanager.Signoz{PollInterval: 10 * time.Second, Config: alertmanagerserver.NewConfig()}}, sqlStore, orgGetter)
notificationGroups, err := notificationgroupingtest.New(context.TODO(), providerSettings, notificationgrouping.Config{})
require.NoError(err)
alertmanager, err := signozalertmanager.New(context.TODO(), providerSettings, alertmanager.Config{Provider: "signoz", Signoz: alertmanager.Signoz{PollInterval: 10 * time.Second, Config: alertmanagerserver.NewConfig()}}, sqlStore, orgGetter, notificationGroups)
require.NoError(err)
jwt := authtypes.NewJWT("", 1*time.Hour, 1*time.Hour)
emailing := emailingtest.New()
@@ -216,7 +224,9 @@ func TestConfigureService(t *testing.T) {
sharder, err := noopsharder.New(context.TODO(), providerSettings, sharder.Config{})
require.NoError(err)
orgGetter := implorganization.NewGetter(implorganization.NewStore(sqlStore), sharder)
alertmanager, err := signozalertmanager.New(context.TODO(), providerSettings, alertmanager.Config{Provider: "signoz", Signoz: alertmanager.Signoz{PollInterval: 10 * time.Second, Config: alertmanagerserver.NewConfig()}}, sqlStore, orgGetter)
notificationGroups, err := notificationgroupingtest.New(context.TODO(), providerSettings, notificationgrouping.Config{})
require.NoError(err)
alertmanager, err := signozalertmanager.New(context.TODO(), providerSettings, alertmanager.Config{Provider: "signoz", Signoz: alertmanager.Signoz{PollInterval: 10 * time.Second, Config: alertmanagerserver.NewConfig()}}, sqlStore, orgGetter, notificationGroups)
require.NoError(err)
jwt := authtypes.NewJWT("", 1*time.Hour, 1*time.Hour)
emailing := emailingtest.New()

View File

@@ -12,6 +12,8 @@ import (
"github.com/SigNoz/signoz/pkg/emailing/emailingtest"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
"github.com/SigNoz/signoz/pkg/notificationgrouping"
"github.com/SigNoz/signoz/pkg/notificationgrouping/notificationgroupingtest"
"github.com/SigNoz/signoz/pkg/sharder"
"github.com/SigNoz/signoz/pkg/sharder/noopsharder"
"github.com/SigNoz/signoz/pkg/signoz"
@@ -29,7 +31,8 @@ func TestIntegrationLifecycle(t *testing.T) {
providerSettings := instrumentationtest.New().ToProviderSettings()
sharder, _ := noopsharder.New(context.TODO(), providerSettings, sharder.Config{})
orgGetter := implorganization.NewGetter(implorganization.NewStore(store), sharder)
alertmanager, _ := signozalertmanager.New(context.TODO(), providerSettings, alertmanager.Config{Provider: "signoz", Signoz: alertmanager.Signoz{PollInterval: 10 * time.Second, Config: alertmanagerserver.NewConfig()}}, store, orgGetter)
notificationGroups, _ := notificationgroupingtest.New(context.TODO(), providerSettings, notificationgrouping.Config{})
alertmanager, _ := signozalertmanager.New(context.TODO(), providerSettings, alertmanager.Config{Provider: "signoz", Signoz: alertmanager.Signoz{PollInterval: 10 * time.Second, Config: alertmanagerserver.NewConfig()}}, store, orgGetter, notificationGroups)
jwt := authtypes.NewJWT("", 1*time.Hour, 1*time.Hour)
emailing := emailingtest.New()
analytics := analyticstest.New()

View File

@@ -15,6 +15,7 @@ import (
"github.com/SigNoz/signoz/pkg/http/middleware"
"github.com/SigNoz/signoz/pkg/licensing/nooplicensing"
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/notificationgrouping"
"github.com/SigNoz/signoz/pkg/prometheus"
"github.com/SigNoz/signoz/pkg/querier"
querierAPI "github.com/SigNoz/signoz/pkg/querier"
@@ -95,6 +96,7 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz, jwt *authtypes.JWT)
signoz.Modules.OrgGetter,
signoz.Querier,
signoz.Instrumentation.Logger(),
signoz.NotificationGroups,
)
if err != nil {
return nil, err
@@ -389,21 +391,23 @@ func makeRulesManager(
orgGetter organization.Getter,
querier querier.Querier,
logger *slog.Logger,
groups notificationgrouping.NotificationGroups,
) (*rules.Manager, error) {
// create manager opts
managerOpts := &rules.ManagerOptions{
TelemetryStore: telemetryStore,
Prometheus: prometheus,
Context: context.Background(),
Logger: zap.L(),
Reader: ch,
Querier: querier,
SLogger: logger,
Cache: cache,
EvalDelay: constants.GetEvalDelay(),
SQLStore: sqlstore,
OrgGetter: orgGetter,
Alertmanager: alertmanager,
TelemetryStore: telemetryStore,
Prometheus: prometheus,
Context: context.Background(),
Logger: zap.L(),
Reader: ch,
Querier: querier,
SLogger: logger,
Cache: cache,
EvalDelay: constants.GetEvalDelay(),
SQLStore: sqlstore,
OrgGetter: orgGetter,
Alertmanager: alertmanager,
NotificationGroups: groups,
}
// create Manager

View File

@@ -84,7 +84,8 @@ type BaseRule struct {
// should be fast but we can still avoid the query if we have the data in memory
TemporalityMap map[string]map[v3.Temporality]bool
sqlstore sqlstore.SQLStore
sqlstore sqlstore.SQLStore
NotificationGroupBy []string
}
type RuleOption func(*BaseRule)
@@ -125,20 +126,21 @@ func NewBaseRule(id string, orgID valuer.UUID, p *ruletypes.PostableRule, reader
}
baseRule := &BaseRule{
id: id,
orgID: orgID,
name: p.AlertName,
source: p.Source,
typ: p.AlertType,
ruleCondition: p.RuleCondition,
evalWindow: time.Duration(p.EvalWindow),
labels: qslabels.FromMap(p.Labels),
annotations: qslabels.FromMap(p.Annotations),
preferredChannels: p.PreferredChannels,
health: ruletypes.HealthUnknown,
Active: map[uint64]*ruletypes.Alert{},
reader: reader,
TemporalityMap: make(map[string]map[v3.Temporality]bool),
id: id,
orgID: orgID,
name: p.AlertName,
source: p.Source,
typ: p.AlertType,
ruleCondition: p.RuleCondition,
evalWindow: time.Duration(p.EvalWindow),
labels: qslabels.FromMap(p.Labels),
annotations: qslabels.FromMap(p.Annotations),
preferredChannels: p.PreferredChannels,
health: ruletypes.HealthUnknown,
Active: map[uint64]*ruletypes.Alert{},
reader: reader,
TemporalityMap: make(map[string]map[v3.Temporality]bool),
NotificationGroupBy: p.NotificationGroups,
}
if baseRule.evalWindow == 0 {

View File

@@ -19,6 +19,7 @@ import (
"github.com/SigNoz/signoz/pkg/alertmanager"
"github.com/SigNoz/signoz/pkg/cache"
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/notificationgrouping"
"github.com/SigNoz/signoz/pkg/prometheus"
querierV5 "github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
@@ -101,6 +102,7 @@ type ManagerOptions struct {
Alertmanager alertmanager.Alertmanager
SQLStore sqlstore.SQLStore
OrgGetter organization.Getter
NotificationGroups notificationgrouping.NotificationGroups
}
// The Manager manages recording and alerting rules.
@@ -120,9 +122,10 @@ type Manager struct {
prepareTaskFunc func(opts PrepareTaskOptions) (Task, error)
prepareTestRuleFunc func(opts PrepareTestRuleOptions) (int, *model.ApiError)
alertmanager alertmanager.Alertmanager
sqlstore sqlstore.SQLStore
orgGetter organization.Getter
alertmanager alertmanager.Alertmanager
sqlstore sqlstore.SQLStore
orgGetter organization.Getter
NotificationGroup notificationgrouping.NotificationGroups
}
func defaultOptions(o *ManagerOptions) *ManagerOptions {
@@ -202,6 +205,13 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
// by calling the Run method.
func NewManager(o *ManagerOptions) (*Manager, error) {
o = defaultOptions(o)
// Validate that NotificationGroups is properly initialized
if o.NotificationGroups == nil {
zap.L().Error("NotificationGroups is nil during Manager creation, this will cause panics")
return nil, fmt.Errorf("NotificationGroups is required but was nil")
}
ruleStore := sqlrulestore.NewRuleStore(o.SQLStore)
maintenanceStore := sqlrulestore.NewMaintenanceStore(o.SQLStore)
@@ -220,8 +230,16 @@ func NewManager(o *ManagerOptions) (*Manager, error) {
alertmanager: o.Alertmanager,
sqlstore: o.SQLStore,
orgGetter: o.OrgGetter,
NotificationGroup: o.NotificationGroups,
}
// Double-check that NotificationGroup was properly assigned
if m.NotificationGroup == nil {
zap.L().Error("NotificationGroup is nil after assignment, this should not happen")
return nil, fmt.Errorf("NotificationGroup assignment failed")
}
zap.L().Debug("Manager created successfully with NotificationGroup")
return m, nil
}
@@ -267,7 +285,6 @@ func (m *Manager) initiate(ctx context.Context) error {
for _, rec := range storedRules {
taskName := fmt.Sprintf("%s-groupname", rec.ID.StringValue())
parsedRule, err := ruletypes.ParsePostableRule([]byte(rec.Data))
if err != nil {
if errors.Is(err, ruletypes.ErrFailedToParseJSON) {
zap.L().Info("failed to load rule in json format, trying yaml now:", zap.String("name", taskName))
@@ -289,6 +306,12 @@ func (m *Manager) initiate(ctx context.Context) error {
}
}
if !parsedRule.Disabled {
if parsedRule.NotificationGroups != nil {
err = m.NotificationGroup.SetGroupLabels(org.ID.StringValue(), rec.ID.StringValue(), parsedRule.NotificationGroups)
if err != nil {
zap.L().Error("failed to load the notification group definition", zap.String("name", rec.ID.StringValue()), zap.Error(err))
}
}
err := m.addTask(ctx, org.ID, parsedRule, taskName)
if err != nil {
zap.L().Error("failed to load the rule definition", zap.String("name", taskName), zap.Error(err))
@@ -370,6 +393,13 @@ func (m *Manager) EditRule(ctx context.Context, ruleStr string, id valuer.UUID)
preferredChannels = parsedRule.PreferredChannels
}
if parsedRule.NotificationGroups != nil {
err = m.NotificationGroup.SetGroupLabels(claims.OrgID, id.StringValue(), parsedRule.NotificationGroups)
if err != nil {
return err
}
}
err = cfg.UpdateRuleIDMatcher(id.StringValue(), preferredChannels)
if err != nil {
return err
@@ -379,7 +409,6 @@ func (m *Manager) EditRule(ctx context.Context, ruleStr string, id valuer.UUID)
if err != nil {
return err
}
err = m.syncRuleStateWithTask(ctx, orgID, prepareTaskName(existingRule.ID.StringValue()), parsedRule)
if err != nil {
return err
@@ -567,7 +596,13 @@ func (m *Manager) CreateRule(ctx context.Context, ruleStr string) (*ruletypes.Ge
}
taskName := prepareTaskName(id.StringValue())
if err := m.addTask(ctx, orgID, parsedRule, taskName); err != nil {
if parsedRule.NotificationGroups != nil {
err = m.NotificationGroup.SetGroupLabels(claims.OrgID, id.StringValue(), parsedRule.NotificationGroups)
if err != nil {
return err
}
}
if err = m.addTask(ctx, orgID, parsedRule, taskName); err != nil {
return err
}

View File

@@ -16,6 +16,8 @@ import (
"github.com/SigNoz/signoz/pkg/alertmanager/signozalertmanager"
"github.com/SigNoz/signoz/pkg/analytics/analyticstest"
"github.com/SigNoz/signoz/pkg/emailing/emailingtest"
"github.com/SigNoz/signoz/pkg/notificationgrouping"
"github.com/SigNoz/signoz/pkg/notificationgrouping/notificationgroupingtest"
"github.com/SigNoz/signoz/pkg/sharder"
"github.com/SigNoz/signoz/pkg/sharder/noopsharder"
"github.com/SigNoz/signoz/pkg/types/authtypes"
@@ -312,7 +314,11 @@ func NewFilterSuggestionsTestBed(t *testing.T) *FilterSuggestionsTestBed {
sharder, err := noopsharder.New(context.TODO(), providerSettings, sharder.Config{})
require.NoError(t, err)
orgGetter := implorganization.NewGetter(implorganization.NewStore(testDB), sharder)
alertmanager, err := signozalertmanager.New(context.TODO(), providerSettings, alertmanager.Config{Signoz: alertmanager.Signoz{PollInterval: 10 * time.Second, Config: alertmanagerserver.NewConfig()}}, testDB, orgGetter)
notificationGroups, err := notificationgroupingtest.New(context.TODO(), providerSettings, notificationgrouping.Config{})
if err != nil {
t.Fatal(err)
}
alertmanager, err := signozalertmanager.New(context.TODO(), providerSettings, alertmanager.Config{Signoz: alertmanager.Signoz{PollInterval: 10 * time.Second, Config: alertmanagerserver.NewConfig()}}, testDB, orgGetter, notificationGroups)
require.NoError(t, err)
jwt := authtypes.NewJWT("", 1*time.Hour, 1*time.Hour)
emailing := emailingtest.New()

View File

@@ -17,6 +17,8 @@ import (
"github.com/SigNoz/signoz/pkg/alertmanager/signozalertmanager"
"github.com/SigNoz/signoz/pkg/analytics/analyticstest"
"github.com/SigNoz/signoz/pkg/emailing/emailingtest"
"github.com/SigNoz/signoz/pkg/notificationgrouping"
"github.com/SigNoz/signoz/pkg/notificationgrouping/notificationgroupingtest"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
"github.com/SigNoz/signoz/pkg/modules/user"
@@ -492,7 +494,11 @@ func NewTestbedWithoutOpamp(t *testing.T, sqlStore sqlstore.SQLStore) *LogPipeli
sharder, err := noopsharder.New(context.Background(), providerSettings, sharder.Config{})
require.NoError(t, err)
orgGetter := implorganization.NewGetter(implorganization.NewStore(sqlStore), sharder)
alertmanager, err := signozalertmanager.New(context.Background(), providerSettings, alertmanager.Config{Signoz: alertmanager.Signoz{PollInterval: 10 * time.Second, Config: alertmanagerserver.NewConfig()}}, sqlStore, orgGetter)
notificationGroups, err := notificationgroupingtest.New(context.Background(), providerSettings, notificationgrouping.Config{})
if err != nil {
t.Fatal(err)
}
alertmanager, err := signozalertmanager.New(context.Background(), providerSettings, alertmanager.Config{Signoz: alertmanager.Signoz{PollInterval: 10 * time.Second, Config: alertmanagerserver.NewConfig()}}, sqlStore, orgGetter, notificationGroups)
require.NoError(t, err)
jwt := authtypes.NewJWT("", 1*time.Hour, 1*time.Hour)
emailing := emailingtest.New()

View File

@@ -14,6 +14,8 @@ import (
"github.com/SigNoz/signoz/pkg/alertmanager/signozalertmanager"
"github.com/SigNoz/signoz/pkg/analytics/analyticstest"
"github.com/SigNoz/signoz/pkg/emailing/emailingtest"
"github.com/SigNoz/signoz/pkg/notificationgrouping"
"github.com/SigNoz/signoz/pkg/notificationgrouping/notificationgroupingtest"
"github.com/SigNoz/signoz/pkg/sharder"
"github.com/SigNoz/signoz/pkg/sharder/noopsharder"
"github.com/SigNoz/signoz/pkg/types/authtypes"
@@ -373,7 +375,11 @@ func NewCloudIntegrationsTestBed(t *testing.T, testDB sqlstore.SQLStore) *CloudI
sharder, err := noopsharder.New(context.TODO(), providerSettings, sharder.Config{})
require.NoError(t, err)
orgGetter := implorganization.NewGetter(implorganization.NewStore(testDB), sharder)
alertmanager, err := signozalertmanager.New(context.TODO(), providerSettings, alertmanager.Config{Signoz: alertmanager.Signoz{PollInterval: 10 * time.Second, Config: alertmanagerserver.NewConfig()}}, testDB, orgGetter)
notificationGroups, err := notificationgroupingtest.New(context.TODO(), providerSettings, notificationgrouping.Config{})
if err != nil {
t.Fatal(err)
}
alertmanager, err := signozalertmanager.New(context.TODO(), providerSettings, alertmanager.Config{Signoz: alertmanager.Signoz{PollInterval: 10 * time.Second, Config: alertmanagerserver.NewConfig()}}, testDB, orgGetter, notificationGroups)
require.NoError(t, err)
jwt := authtypes.NewJWT("", 1*time.Hour, 1*time.Hour)
emailing := emailingtest.New()

View File

@@ -14,6 +14,8 @@ import (
"github.com/SigNoz/signoz/pkg/alertmanager/signozalertmanager"
"github.com/SigNoz/signoz/pkg/analytics/analyticstest"
"github.com/SigNoz/signoz/pkg/emailing/emailingtest"
"github.com/SigNoz/signoz/pkg/notificationgrouping"
"github.com/SigNoz/signoz/pkg/notificationgrouping/notificationgroupingtest"
"github.com/SigNoz/signoz/pkg/http/middleware"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
@@ -588,7 +590,11 @@ func NewIntegrationsTestBed(t *testing.T, testDB sqlstore.SQLStore) *Integration
sharder, err := noopsharder.New(context.TODO(), providerSettings, sharder.Config{})
require.NoError(t, err)
orgGetter := implorganization.NewGetter(implorganization.NewStore(testDB), sharder)
alertmanager, err := signozalertmanager.New(context.TODO(), providerSettings, alertmanager.Config{Signoz: alertmanager.Signoz{PollInterval: 10 * time.Second, Config: alertmanagerserver.NewConfig()}}, testDB, orgGetter)
notificationGroups, err := notificationgroupingtest.New(context.TODO(), providerSettings, notificationgrouping.Config{})
if err != nil {
t.Fatal(err)
}
alertmanager, err := signozalertmanager.New(context.TODO(), providerSettings, alertmanager.Config{Signoz: alertmanager.Signoz{PollInterval: 10 * time.Second, Config: alertmanagerserver.NewConfig()}}, testDB, orgGetter, notificationGroups)
require.NoError(t, err)
jwt := authtypes.NewJWT("", 1*time.Hour, 1*time.Hour)
emailing := emailingtest.New()

View File

@@ -10,6 +10,8 @@ import (
"github.com/SigNoz/signoz/pkg/alertmanager"
"github.com/SigNoz/signoz/pkg/alertmanager/signozalertmanager"
"github.com/SigNoz/signoz/pkg/emailing/emailingtest"
"github.com/SigNoz/signoz/pkg/notificationgrouping"
"github.com/SigNoz/signoz/pkg/notificationgrouping/notificationgroupingtest"
"github.com/SigNoz/signoz/pkg/factory/factorytest"
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
"github.com/SigNoz/signoz/pkg/sharder"
@@ -29,7 +31,9 @@ func TestNewHandlers(t *testing.T) {
sharder, err := noopsharder.New(context.TODO(), providerSettings, sharder.Config{})
require.NoError(t, err)
orgGetter := implorganization.NewGetter(implorganization.NewStore(sqlstore), sharder)
alertmanager, err := signozalertmanager.New(context.TODO(), providerSettings, alertmanager.Config{}, sqlstore, orgGetter)
notificationGroups, err := notificationgroupingtest.New(context.TODO(), providerSettings, notificationgrouping.Config{})
require.NoError(t, err)
alertmanager, err := signozalertmanager.New(context.TODO(), providerSettings, alertmanager.Config{}, sqlstore, orgGetter, notificationGroups)
require.NoError(t, err)
jwt := authtypes.NewJWT("", 1*time.Hour, 1*time.Hour)
emailing := emailingtest.New()

View File

@@ -10,6 +10,8 @@ import (
"github.com/SigNoz/signoz/pkg/alertmanager"
"github.com/SigNoz/signoz/pkg/alertmanager/signozalertmanager"
"github.com/SigNoz/signoz/pkg/emailing/emailingtest"
"github.com/SigNoz/signoz/pkg/notificationgrouping"
"github.com/SigNoz/signoz/pkg/notificationgrouping/notificationgroupingtest"
"github.com/SigNoz/signoz/pkg/factory/factorytest"
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
"github.com/SigNoz/signoz/pkg/sharder"
@@ -29,7 +31,9 @@ func TestNewModules(t *testing.T) {
sharder, err := noopsharder.New(context.TODO(), providerSettings, sharder.Config{})
require.NoError(t, err)
orgGetter := implorganization.NewGetter(implorganization.NewStore(sqlstore), sharder)
alertmanager, err := signozalertmanager.New(context.TODO(), providerSettings, alertmanager.Config{}, sqlstore, orgGetter)
notificationGroups, err := notificationgroupingtest.New(context.TODO(), providerSettings, notificationgrouping.Config{})
require.NoError(t, err)
alertmanager, err := signozalertmanager.New(context.TODO(), providerSettings, alertmanager.Config{}, sqlstore, orgGetter, notificationGroups)
require.NoError(t, err)
jwt := authtypes.NewJWT("", 1*time.Hour, 1*time.Hour)
emailing := emailingtest.New()

View File

@@ -16,6 +16,10 @@ import (
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/modules/user"
"github.com/SigNoz/signoz/pkg/notificationgrouping"
"github.com/SigNoz/signoz/pkg/notificationgrouping/notificationgroupingtest"
"github.com/SigNoz/signoz/pkg/notificationgrouping/rulebasedgrouping"
"github.com/SigNoz/signoz/pkg/notificationgrouping/standardgrouping"
"github.com/SigNoz/signoz/pkg/prometheus"
"github.com/SigNoz/signoz/pkg/prometheus/clickhouseprometheus"
"github.com/SigNoz/signoz/pkg/querier"
@@ -153,10 +157,18 @@ func NewPrometheusProviderFactories(telemetryStore telemetrystore.TelemetryStore
)
}
func NewAlertmanagerProviderFactories(sqlstore sqlstore.SQLStore, orgGetter organization.Getter) factory.NamedMap[factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config]] {
func NewNotificationGroupingProviderFactories() factory.NamedMap[factory.ProviderFactory[notificationgrouping.NotificationGroups, notificationgrouping.Config]] {
return factory.MustNewNamedMap(
rulebasedgrouping.NewFactory(),
standardgrouping.NewFactory(),
notificationgroupingtest.NewFactory(),
)
}
func NewAlertmanagerProviderFactories(sqlstore sqlstore.SQLStore, orgGetter organization.Getter, notificationGroups notificationgrouping.NotificationGroups) factory.NamedMap[factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config]] {
return factory.MustNewNamedMap(
legacyalertmanager.NewFactory(sqlstore, orgGetter),
signozalertmanager.NewFactory(sqlstore, orgGetter),
signozalertmanager.NewFactory(sqlstore, orgGetter, notificationGroups),
)
}

View File

@@ -1,12 +1,15 @@
package signoz
import (
"context"
"testing"
"github.com/DATA-DOG/go-sqlmock"
"github.com/SigNoz/signoz/pkg/analytics"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
"github.com/SigNoz/signoz/pkg/notificationgrouping"
"github.com/SigNoz/signoz/pkg/notificationgrouping/notificationgroupingtest"
"github.com/SigNoz/signoz/pkg/modules/user/impluser"
"github.com/SigNoz/signoz/pkg/sqlschema"
"github.com/SigNoz/signoz/pkg/sqlschema/sqlschematest"
@@ -54,7 +57,8 @@ func TestNewProviderFactories(t *testing.T) {
assert.NotPanics(t, func() {
orgGetter := implorganization.NewGetter(implorganization.NewStore(sqlstoretest.New(sqlstore.Config{Provider: "sqlite"}, sqlmock.QueryMatcherEqual)), nil)
NewAlertmanagerProviderFactories(sqlstoretest.New(sqlstore.Config{Provider: "sqlite"}, sqlmock.QueryMatcherEqual), orgGetter)
notificationGroups, _ := notificationgroupingtest.New(context.TODO(), instrumentationtest.New().ToProviderSettings(), notificationgrouping.Config{})
NewAlertmanagerProviderFactories(sqlstoretest.New(sqlstore.Config{Provider: "sqlite"}, sqlmock.QueryMatcherEqual), orgGetter, notificationGroups)
})
assert.NotPanics(t, func() {

View File

@@ -2,7 +2,6 @@ package signoz
import (
"context"
"github.com/SigNoz/signoz/pkg/alertmanager"
"github.com/SigNoz/signoz/pkg/analytics"
"github.com/SigNoz/signoz/pkg/cache"
@@ -13,6 +12,7 @@ import (
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
"github.com/SigNoz/signoz/pkg/modules/user/impluser"
"github.com/SigNoz/signoz/pkg/notificationgrouping"
"github.com/SigNoz/signoz/pkg/prometheus"
"github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/pkg/ruler"
@@ -32,23 +32,24 @@ import (
type SigNoz struct {
*factory.Registry
Instrumentation instrumentation.Instrumentation
Analytics analytics.Analytics
Cache cache.Cache
Web web.Web
SQLStore sqlstore.SQLStore
TelemetryStore telemetrystore.TelemetryStore
Prometheus prometheus.Prometheus
Alertmanager alertmanager.Alertmanager
Querier querier.Querier
Rules ruler.Ruler
Zeus zeus.Zeus
Licensing licensing.Licensing
Emailing emailing.Emailing
Sharder sharder.Sharder
StatsReporter statsreporter.StatsReporter
Modules Modules
Handlers Handlers
Instrumentation instrumentation.Instrumentation
Analytics analytics.Analytics
Cache cache.Cache
Web web.Web
SQLStore sqlstore.SQLStore
TelemetryStore telemetrystore.TelemetryStore
Prometheus prometheus.Prometheus
Alertmanager alertmanager.Alertmanager
Querier querier.Querier
Rules ruler.Ruler
Zeus zeus.Zeus
Licensing licensing.Licensing
Emailing emailing.Emailing
Sharder sharder.Sharder
StatsReporter statsreporter.StatsReporter
Modules Modules
Handlers Handlers
NotificationGroups notificationgrouping.NotificationGroups
}
func New(
@@ -230,12 +231,27 @@ func New(
// Initialize user getter
userGetter := impluser.NewGetter(impluser.NewStore(sqlstore, providerSettings))
// shared NotificationGroups instance for both alertmanager and rules
notificationGroups, err := factory.NewProviderFromNamedMap(
ctx,
providerSettings,
notificationgrouping.Config{
Provider: "rulebased",
DefaultStrategy: "standard",
},
NewNotificationGroupingProviderFactories(),
"rulebased",
)
if err != nil {
return nil, err
}
// Initialize alertmanager from the available alertmanager provider factories
alertmanager, err := factory.NewProviderFromNamedMap(
ctx,
providerSettings,
config.Alertmanager,
NewAlertmanagerProviderFactories(sqlstore, orgGetter),
NewAlertmanagerProviderFactories(sqlstore, orgGetter, notificationGroups),
config.Alertmanager.Provider,
)
if err != nil {
@@ -305,21 +321,23 @@ func New(
}
return &SigNoz{
Registry: registry,
Analytics: analytics,
Instrumentation: instrumentation,
Cache: cache,
Web: web,
SQLStore: sqlstore,
TelemetryStore: telemetrystore,
Prometheus: prometheus,
Alertmanager: alertmanager,
Querier: querier,
Zeus: zeus,
Licensing: licensing,
Emailing: emailing,
Sharder: sharder,
Modules: modules,
Handlers: handlers,
Registry: registry,
Analytics: analytics,
Instrumentation: instrumentation,
Cache: cache,
Web: web,
SQLStore: sqlstore,
TelemetryStore: telemetrystore,
Prometheus: prometheus,
Alertmanager: alertmanager,
Querier: querier,
Rules: ruler,
Zeus: zeus,
Licensing: licensing,
Emailing: emailing,
Sharder: sharder,
Modules: modules,
Handlers: handlers,
NotificationGroups: notificationGroups,
}, nil
}

View File

@@ -67,6 +67,8 @@ type PostableRule struct {
// legacy
Expr string `yaml:"expr,omitempty" json:"expr,omitempty"`
OldYaml string `json:"yaml,omitempty"`
NotificationGroups []string `yaml:"notificationGroups,omitempty" json:"notificationGroups,omitempty"`
}
func ParsePostableRule(content []byte) (*PostableRule, error) {