diff --git a/.gitignore b/.gitignore index 33ad6ab250..8a3a586a99 100644 --- a/.gitignore +++ b/.gitignore @@ -76,3 +76,5 @@ dist/ # ignore user_scripts that is fetched by init-clickhouse deploy/common/clickhouse/user_scripts/ + +queries.active \ No newline at end of file diff --git a/go.mod b/go.mod index 2ec3741c39..fd5bd41530 100644 --- a/go.mod +++ b/go.mod @@ -55,6 +55,7 @@ require ( github.com/soheilhy/cmux v0.1.5 github.com/srikanthccv/ClickHouse-go-mock v0.9.0 github.com/stretchr/testify v1.10.0 + github.com/tidwall/gjson v1.18.0 github.com/uptrace/bun v1.2.9 github.com/uptrace/bun/dialect/pgdialect v1.2.9 github.com/uptrace/bun/dialect/sqlitedialect v1.2.9 @@ -204,6 +205,8 @@ require ( github.com/smarty/assertions v1.15.0 // indirect github.com/spf13/cobra v1.8.1 // indirect github.com/spf13/pflag v1.0.5 // indirect + github.com/tidwall/match v1.1.1 // indirect + github.com/tidwall/pretty v1.2.0 // indirect github.com/tklauser/go-sysconf v0.3.13 // indirect github.com/tklauser/numcpus v0.7.0 // indirect github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc // indirect diff --git a/go.sum b/go.sum index 1ad277145d..9a1b7f9a52 100644 --- a/go.sum +++ b/go.sum @@ -891,7 +891,13 @@ github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8 github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/subosito/gotenv v1.4.1/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0= +github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY= +github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= +github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= +github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= +github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/tklauser/go-sysconf v0.3.13 h1:GBUpcahXSpR2xN01jhkNAbTLRk2Yzgggk8IM08lq3r4= github.com/tklauser/go-sysconf v0.3.13/go.mod h1:zwleP4Q4OehZHGn4CYZDipCgg9usW5IJePewFCGVEa0= github.com/tklauser/numcpus v0.7.0 h1:yjuerZP127QG9m5Zh/mSO4wqurYil27tHrqwRoRjpr4= diff --git a/pkg/alertmanager/alertmanager.go b/pkg/alertmanager/alertmanager.go index c228386ff7..4f9f87e659 100644 --- a/pkg/alertmanager/alertmanager.go +++ b/pkg/alertmanager/alertmanager.go @@ -15,7 +15,7 @@ var ( type Alertmanager interface { factory.Service // GetAlerts gets the alerts from the alertmanager per organization. - GetAlerts(context.Context, string, alertmanagertypes.GettableAlertsParams) (alertmanagertypes.GettableAlerts, error) + GetAlerts(context.Context, string, alertmanagertypes.GettableAlertsParams) (alertmanagertypes.DeprecatedGettableAlerts, error) // PutAlerts puts the alerts into the alertmanager per organization. PutAlerts(context.Context, string, alertmanagertypes.PostableAlerts) error @@ -23,18 +23,30 @@ type Alertmanager interface { // TestReceiver sends a test alert to a receiver. TestReceiver(context.Context, string, alertmanagertypes.Receiver) error + // TestAlert sends an alert to a list of receivers. + TestAlert(ctx context.Context, orgID string, alert *alertmanagertypes.PostableAlert, receivers []string) error + // ListChannels lists all channels for the organization. ListChannels(context.Context, string) ([]*alertmanagertypes.Channel, error) + // ListAllChannels lists all channels for all organizations. It is used by the legacy alertmanager only. + ListAllChannels(context.Context) ([]*alertmanagertypes.Channel, error) + // GetChannelByID gets a channel for the organization. GetChannelByID(context.Context, string, int) (*alertmanagertypes.Channel, error) // UpdateChannel updates a channel for the organization. - UpdateChannelByReceiver(context.Context, string, alertmanagertypes.Receiver) error + UpdateChannelByReceiverAndID(context.Context, string, alertmanagertypes.Receiver, int) error // CreateChannel creates a channel for the organization. CreateChannel(context.Context, string, alertmanagertypes.Receiver) error // DeleteChannelByID deletes a channel for the organization. DeleteChannelByID(context.Context, string, int) error + + // SetConfig sets the config for the organization. + SetConfig(context.Context, *alertmanagertypes.Config) error + + // GetConfig gets the config for the organization. + GetConfig(context.Context, string) (*alertmanagertypes.Config, error) } diff --git a/pkg/alertmanager/alertmanagerbatcher/batcher.go b/pkg/alertmanager/alertmanagerbatcher/batcher.go index 1264f1dce5..34e68d2806 100644 --- a/pkg/alertmanager/alertmanagerbatcher/batcher.go +++ b/pkg/alertmanager/alertmanagerbatcher/batcher.go @@ -9,9 +9,11 @@ import ( "go.signoz.io/signoz/pkg/types/alertmanagertypes" ) -// Notifier is responsible for dispatching alert notifications to an alertmanager. +// Batcher is responsible for batching alerts and broadcasting them on a channel. type Batcher struct { + // C is the channel on which alerts are sent to alertmanager C chan alertmanagertypes.PostableAlerts + // logger logger *slog.Logger @@ -23,14 +25,21 @@ type Batcher struct { // more channel to signal the sender goroutine to send alerts moreC chan struct{} + + // stop channel to signal the sender goroutine to stop stopC chan struct{} - mtx sync.RWMutex + + // mutex to synchronize access to the queue + queueMtx sync.RWMutex + + // wait group to wait for all goroutines to finish + goroutinesWg sync.WaitGroup } func New(logger *slog.Logger, config Config) *Batcher { batcher := &Batcher{ logger: logger, - queue: make(alertmanagertypes.PostableAlerts, config.Capacity), + queue: make(alertmanagertypes.PostableAlerts, 0, config.Capacity), config: config, moreC: make(chan struct{}, 1), stopC: make(chan struct{}), @@ -41,22 +50,27 @@ func New(logger *slog.Logger, config Config) *Batcher { } // Start dispatches notifications continuously. -func (n *Batcher) Start(ctx context.Context) error { +func (batcher *Batcher) Start(ctx context.Context) error { + batcher.goroutinesWg.Add(1) go func() { - n.logger.InfoContext(ctx, "starting alertmanager batcher") + defer batcher.goroutinesWg.Done() + for { select { - case <-ctx.Done(): + case <-batcher.stopC: + for batcher.queueLen() > 0 { + alerts := batcher.next() + batcher.C <- alerts + } + close(batcher.C) return - case <-n.stopC: - return - case <-n.moreC: + case <-batcher.moreC: } - alerts := n.nextBatch() - n.C <- alerts + alerts := batcher.next() + batcher.C <- alerts // If the queue still has items left, kick off the next iteration. - if n.queueLen() > 0 { - n.setMore() + if batcher.queueLen() > 0 { + batcher.setMore() } } }() @@ -64,69 +78,67 @@ func (n *Batcher) Start(ctx context.Context) error { return nil } -func (n *Batcher) queueLen() int { - n.mtx.RLock() - defer n.mtx.RUnlock() +// Add queues the given alerts for processing. +func (batcher *Batcher) Add(ctx context.Context, alerts ...*alertmanagertypes.PostableAlert) { + batcher.queueMtx.Lock() + defer batcher.queueMtx.Unlock() - return len(n.queue) + // Queue capacity should be significantly larger than a single alert + // batch could be. + if d := len(alerts) - batcher.config.Capacity; d > 0 { + alerts = alerts[d:] + batcher.logger.WarnContext(ctx, "alert batch larger than queue capacity, dropping alerts", "num_dropped", d, "capacity", batcher.config.Capacity) + } + + // If the queue is full, remove the oldest alerts in favor + // of newer ones. + if d := (len(batcher.queue) + len(alerts)) - batcher.config.Capacity; d > 0 { + batcher.queue = batcher.queue[d:] + batcher.logger.WarnContext(ctx, "alert batch queue full, dropping alerts", "num_dropped", d) + } + + batcher.queue = append(batcher.queue, alerts...) + + // Notify sending goroutine that there are alerts to be processed. + batcher.setMore() } -func (n *Batcher) nextBatch() alertmanagertypes.PostableAlerts { - n.mtx.Lock() - defer n.mtx.Unlock() +// Stop shuts down the batcher. +func (batcher *Batcher) Stop(ctx context.Context) { + close(batcher.stopC) + batcher.goroutinesWg.Wait() +} + +func (batcher *Batcher) queueLen() int { + batcher.queueMtx.RLock() + defer batcher.queueMtx.RUnlock() + + return len(batcher.queue) +} + +func (batcher *Batcher) next() alertmanagertypes.PostableAlerts { + batcher.queueMtx.Lock() + defer batcher.queueMtx.Unlock() var alerts alertmanagertypes.PostableAlerts - if len(n.queue) > n.config.Size { - alerts = append(make(alertmanagertypes.PostableAlerts, 0, n.config.Size), n.queue[:n.config.Size]...) - n.queue = n.queue[n.config.Size:] + if len(batcher.queue) > batcher.config.Size { + alerts = append(make(alertmanagertypes.PostableAlerts, 0, batcher.config.Size), batcher.queue[:batcher.config.Size]...) + batcher.queue = batcher.queue[batcher.config.Size:] } else { - alerts = append(make(alertmanagertypes.PostableAlerts, 0, len(n.queue)), n.queue...) - n.queue = n.queue[:0] + alerts = append(make(alertmanagertypes.PostableAlerts, 0, len(batcher.queue)), batcher.queue...) + batcher.queue = batcher.queue[:0] } return alerts } -// Send queues the given notification requests for processing. -// Panics if called on a handler that is not running. -func (n *Batcher) Send(ctx context.Context, alerts ...*alertmanagertypes.PostableAlert) { - n.mtx.Lock() - defer n.mtx.Unlock() - - // Queue capacity should be significantly larger than a single alert - // batch could be. - if d := len(alerts) - n.config.Capacity; d > 0 { - alerts = alerts[d:] - n.logger.WarnContext(ctx, "Alert batch larger than queue capacity, dropping alerts", "num_dropped", d) - } - - // If the queue is full, remove the oldest alerts in favor - // of newer ones. - if d := (len(n.queue) + len(alerts)) - n.config.Capacity; d > 0 { - n.queue = n.queue[d:] - - n.logger.WarnContext(ctx, "Alert notification queue full, dropping alerts", "num_dropped", d) - } - n.queue = append(n.queue, alerts...) - - // Notify sending goroutine that there are alerts to be processed. - n.setMore() -} - // setMore signals that the alert queue has items. -func (n *Batcher) setMore() { +func (batcher *Batcher) setMore() { // If we cannot send on the channel, it means the signal already exists // and has not been consumed yet. select { - case n.moreC <- struct{}{}: + case batcher.moreC <- struct{}{}: default: } } - -// Stop shuts down the notification handler. -func (n *Batcher) Stop(ctx context.Context) { - n.logger.InfoContext(ctx, "Stopping alertmanager batcher") - close(n.moreC) - close(n.stopC) -} diff --git a/pkg/alertmanager/alertmanagerbatcher/batcher_test.go b/pkg/alertmanager/alertmanagerbatcher/batcher_test.go new file mode 100644 index 0000000000..1017505aea --- /dev/null +++ b/pkg/alertmanager/alertmanagerbatcher/batcher_test.go @@ -0,0 +1,64 @@ +package alertmanagerbatcher + +import ( + "context" + "io" + "log/slog" + "testing" + + "github.com/stretchr/testify/assert" + "go.signoz.io/signoz/pkg/types/alertmanagertypes" +) + +func TestBatcherWithOneAlertAndDefaultConfigs(t *testing.T) { + batcher := New(slog.New(slog.NewTextHandler(io.Discard, nil)), NewConfig()) + batcher.Start(context.Background()) + + batcher.Add(context.Background(), &alertmanagertypes.PostableAlert{Alert: alertmanagertypes.AlertModel{ + Labels: map[string]string{"alertname": "test"}, + }}) + + alerts := <-batcher.C + assert.Equal(t, 1, len(alerts)) + + batcher.Stop(context.Background()) +} + +func TestBatcherWithBatchSize(t *testing.T) { + batcher := New(slog.New(slog.NewTextHandler(io.Discard, nil)), Config{Size: 2, Capacity: 4}) + batcher.Start(context.Background()) + + var alerts alertmanagertypes.PostableAlerts + for i := 0; i < 4; i++ { + alerts = append(alerts, &alertmanagertypes.PostableAlert{Alert: alertmanagertypes.AlertModel{ + Labels: map[string]string{"alertname": "test"}, + }}) + } + batcher.Add(context.Background(), alerts...) + + for i := 0; i < 2; i++ { + alerts := <-batcher.C + assert.Equal(t, 2, len(alerts)) + } + + batcher.Stop(context.Background()) +} + +func TestBatcherWithCClosed(t *testing.T) { + batcher := New(slog.New(slog.NewTextHandler(io.Discard, nil)), Config{Size: 2, Capacity: 4}) + batcher.Start(context.Background()) + + var alerts alertmanagertypes.PostableAlerts + for i := 0; i < 4; i++ { + alerts = append(alerts, &alertmanagertypes.PostableAlert{Alert: alertmanagertypes.AlertModel{ + Labels: map[string]string{"alertname": "test"}, + }}) + } + batcher.Add(context.Background(), alerts...) + + batcher.Stop(context.Background()) + + for alerts := range batcher.C { + assert.Equal(t, 2, len(alerts)) + } +} diff --git a/pkg/alertmanager/alertmanagerbatcher/config.go b/pkg/alertmanager/alertmanagerbatcher/config.go index ca6446a58a..368f80cfe2 100644 --- a/pkg/alertmanager/alertmanagerbatcher/config.go +++ b/pkg/alertmanager/alertmanagerbatcher/config.go @@ -1,13 +1,16 @@ package alertmanagerbatcher type Config struct { + // Capacity is the maximum number of alerts that can be buffered in the batcher. Capacity int - Size int + + // Size is the number of alerts to send in each batch. + Size int } func NewConfig() Config { return Config{ - Capacity: 1000, + Capacity: 10000, Size: 64, } } diff --git a/pkg/alertmanager/alertmanagerserver/server.go b/pkg/alertmanager/alertmanagerserver/server.go index 3ab34d0df8..088cec494e 100644 --- a/pkg/alertmanager/alertmanagerserver/server.go +++ b/pkg/alertmanager/alertmanagerserver/server.go @@ -66,7 +66,7 @@ type Server struct { func New(ctx context.Context, logger *slog.Logger, registry prometheus.Registerer, srvConfig Config, orgID string, stateStore alertmanagertypes.StateStore) (*Server, error) { server := &Server{ - logger: logger.With("pkg", "go.signoz.io/pkg/alertmanager/server"), + logger: logger.With("pkg", "go.signoz.io/pkg/alertmanager/alertmanagerserver"), registry: registry, srvConfig: srvConfig, orgID: orgID, @@ -84,7 +84,10 @@ func New(ctx context.Context, logger *slog.Logger, registry prometheus.Registere silencesSnapshot := "" if state != nil { - silencesSnapshot = state.Silences + silencesSnapshot, err = state.Get(alertmanagertypes.SilenceStateName) + if err != nil && !errors.Ast(err, errors.TypeNotFound) { + return nil, err + } } // Initialize silences server.silences, err = silence.New(silence.Options{ @@ -103,7 +106,10 @@ func New(ctx context.Context, logger *slog.Logger, registry prometheus.Registere nflogSnapshot := "" if state != nil { - nflogSnapshot = state.NFLog + nflogSnapshot, err = state.Get(alertmanagertypes.NFLogStateName) + if err != nil && !errors.Ast(err, errors.TypeNotFound) { + return nil, err + } } // Initialize notification log @@ -308,7 +314,51 @@ func (server *Server) SetConfig(ctx context.Context, alertmanagerConfig *alertma } func (server *Server) TestReceiver(ctx context.Context, receiver alertmanagertypes.Receiver) error { - return alertmanagertypes.TestReceiver(ctx, receiver, server.tmpl, server.logger) + return alertmanagertypes.TestReceiver(ctx, receiver, server.tmpl, server.logger, alertmanagertypes.NewTestAlert(receiver, time.Now(), time.Now())) +} + +func (server *Server) TestAlert(ctx context.Context, postableAlert *alertmanagertypes.PostableAlert, receivers []string) error { + alerts, err := alertmanagertypes.NewAlertsFromPostableAlerts(alertmanagertypes.PostableAlerts{postableAlert}, time.Duration(server.srvConfig.Global.ResolveTimeout), time.Now()) + if err != nil { + return errors.Join(err...) + } + + if len(alerts) != 1 { + return errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "expected 1 alert, got %d", len(alerts)) + } + + ch := make(chan error, len(receivers)) + for _, receiverName := range receivers { + go func(receiverName string) { + receiver, err := server.alertmanagerConfig.GetReceiver(receiverName) + if err != nil { + ch <- err + return + } + ch <- alertmanagertypes.TestReceiver(ctx, receiver, server.tmpl, server.logger, alerts[0]) + }(receiverName) + } + + var errs []error + for i := 0; i < len(receivers); i++ { + if err := <-ch; err != nil { + errs = append(errs, err) + } + } + + if errs != nil { + return errors.Join(errs...) + } + + return nil +} + +func (server *Server) Hash() string { + if server.alertmanagerConfig == nil { + return "" + } + + return server.alertmanagerConfig.StoreableConfig().Hash } func (server *Server) Stop(ctx context.Context) error { diff --git a/pkg/alertmanager/alertmanagerserver/server_test.go b/pkg/alertmanager/alertmanagerserver/server_test.go index bb2d6a6c57..339fddf4c7 100644 --- a/pkg/alertmanager/alertmanagerserver/server_test.go +++ b/pkg/alertmanager/alertmanagerserver/server_test.go @@ -87,7 +87,7 @@ func TestServerPutAlerts(t *testing.T) { amConfig, err := alertmanagertypes.NewDefaultConfig(srvCfg.Global, srvCfg.Route, "1") require.NoError(t, err) - require.NoError(t, amConfig.CreateReceiver(&config.Route{Receiver: "test-receiver", Continue: true}, alertmanagertypes.Receiver{ + require.NoError(t, amConfig.CreateReceiver(alertmanagertypes.Receiver{ Name: "test-receiver", WebhookConfigs: []*config.WebhookConfig{ { diff --git a/pkg/alertmanager/alertmanagerstore/sqlalertmanagerstore/config.go b/pkg/alertmanager/alertmanagerstore/sqlalertmanagerstore/config.go index 2dfa5ef424..7c6dd90489 100644 --- a/pkg/alertmanager/alertmanagerstore/sqlalertmanagerstore/config.go +++ b/pkg/alertmanager/alertmanagerstore/sqlalertmanagerstore/config.go @@ -3,7 +3,10 @@ package sqlalertmanagerstore import ( "context" "database/sql" + "strconv" + "github.com/tidwall/gjson" + "github.com/uptrace/bun" "go.signoz.io/signoz/pkg/errors" "go.signoz.io/signoz/pkg/sqlstore" "go.signoz.io/signoz/pkg/types/alertmanagertypes" @@ -45,46 +48,20 @@ func (store *config) Get(ctx context.Context, orgID string) (*alertmanagertypes. } // Set implements alertmanagertypes.ConfigStore. -func (store *config) Set(ctx context.Context, config *alertmanagertypes.Config, cb func(context.Context) error) error { - tx, err := store.sqlstore.BunDB().BeginTx(ctx, nil) - if err != nil { - return err - } - - defer tx.Rollback() //nolint:errcheck - - if _, err = tx. +func (store *config) Set(ctx context.Context, config *alertmanagertypes.Config) error { + if _, err := store. + sqlstore. + BunDB(). NewInsert(). Model(config.StoreableConfig()). On("CONFLICT (org_id) DO UPDATE"). - Set("config = ?", string(config.StoreableConfig().Config)). + Set("config = ?", config.StoreableConfig().Config). + Set("hash = ?", config.StoreableConfig().Hash). Set("updated_at = ?", config.StoreableConfig().UpdatedAt). Exec(ctx); err != nil { return err } - channels := config.Channels() - if len(channels) != 0 { - if _, err = tx.NewInsert(). - Model(&channels). - On("CONFLICT (name) DO UPDATE"). - Set("data = EXCLUDED.data"). - Set("updated_at = EXCLUDED.updated_at"). - Exec(ctx); err != nil { - return err - } - } - - if cb != nil { - if err = cb(ctx); err != nil { - return err - } - } - - if err = tx.Commit(); err != nil { - return err - } - return nil } @@ -104,3 +81,176 @@ func (store *config) ListOrgs(ctx context.Context) ([]string, error) { return orgIDs, nil } + +func (store *config) CreateChannel(ctx context.Context, channel *alertmanagertypes.Channel, cb func(context.Context) error) error { + tx, err := store.sqlstore.BunDB().BeginTx(ctx, nil) + if err != nil { + return err + } + + defer tx.Rollback() //nolint:errcheck + + if _, err = tx.NewInsert(). + Model(channel). + Exec(ctx); err != nil { + return err + } + + if cb != nil { + if err = cb(ctx); err != nil { + return err + } + } + + if err = tx.Commit(); err != nil { + return err + } + + return nil +} + +func (store *config) GetChannelByID(ctx context.Context, orgID string, id int) (*alertmanagertypes.Channel, error) { + channel := new(alertmanagertypes.Channel) + + err := store. + sqlstore. + BunDB(). + NewSelect(). + Model(channel). + Where("org_id = ?", orgID). + Where("id = ?", id). + Scan(ctx) + if err != nil { + if err == sql.ErrNoRows { + return nil, errors.Newf(errors.TypeNotFound, alertmanagertypes.ErrCodeAlertmanagerChannelNotFound, "cannot find channel with id %d", id) + } + return nil, err + } + + return channel, nil +} + +func (store *config) UpdateChannel(ctx context.Context, orgID string, channel *alertmanagertypes.Channel, cb func(context.Context) error) error { + tx, err := store.sqlstore.BunDB().BeginTx(ctx, nil) + if err != nil { + return err + } + + defer tx.Rollback() //nolint:errcheck + + _, err = tx.NewUpdate(). + Model(channel). + WherePK(). + Exec(ctx) + if err != nil { + return err + } + + if cb != nil { + if err = cb(ctx); err != nil { + return err + } + } + + if err = tx.Commit(); err != nil { + return err + } + + return nil +} + +func (store *config) DeleteChannelByID(ctx context.Context, orgID string, id int, cb func(context.Context) error) error { + channel := new(alertmanagertypes.Channel) + + tx, err := store.sqlstore.BunDB().BeginTx(ctx, nil) + if err != nil { + return err + } + + defer tx.Rollback() //nolint:errcheck + + _, err = tx.NewDelete(). + Model(channel). + Where("org_id = ?", orgID). + Where("id = ?", id). + Exec(ctx) + if err != nil { + return err + } + + if cb != nil { + if err = cb(ctx); err != nil { + return err + } + } + + if err = tx.Commit(); err != nil { + return err + } + + return nil +} + +func (store *config) ListChannels(ctx context.Context, orgID string) ([]*alertmanagertypes.Channel, error) { + var channels []*alertmanagertypes.Channel + + err := store. + sqlstore. + BunDB(). + NewSelect(). + Model(&channels). + Where("org_id = ?", orgID). + Scan(ctx) + if err != nil { + return nil, err + } + + return channels, nil +} + +func (store *config) ListAllChannels(ctx context.Context) ([]*alertmanagertypes.Channel, error) { + var channels []*alertmanagertypes.Channel + + err := store. + sqlstore. + BunDB(). + NewSelect(). + Model(&channels). + Scan(ctx) + if err != nil { + return nil, err + } + + return channels, nil +} + +func (store *config) GetMatchers(ctx context.Context, orgID string) (map[string][]string, error) { + type matcher struct { + bun.BaseModel `bun:"table:rules"` + ID int `bun:"id,pk"` + Data string `bun:"data"` + } + + matchers := []matcher{} + + err := store. + sqlstore. + BunDB(). + NewSelect(). + Column("id", "data"). + Model(&matchers). + Scan(ctx) + if err != nil { + return nil, err + } + + matchersMap := make(map[string][]string) + for _, matcher := range matchers { + receivers := gjson.Get(matcher.Data, "preferredChannels").Array() + for _, receiver := range receivers { + matchersMap[strconv.Itoa(matcher.ID)] = append(matchersMap[strconv.Itoa(matcher.ID)], receiver.String()) + } + } + + return matchersMap, nil +} diff --git a/pkg/alertmanager/api.go b/pkg/alertmanager/api.go index c34fd3d636..1f7e7bbaa7 100644 --- a/pkg/alertmanager/api.go +++ b/pkg/alertmanager/api.go @@ -24,7 +24,7 @@ func NewAPI(alertmanager Alertmanager) *API { } } -func (api *API) GetAlerts(req *http.Request, rw http.ResponseWriter) { +func (api *API) GetAlerts(rw http.ResponseWriter, req *http.Request) { ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second) defer cancel() @@ -49,7 +49,7 @@ func (api *API) GetAlerts(req *http.Request, rw http.ResponseWriter) { render.Success(rw, http.StatusOK, alerts) } -func (api *API) TestReceiver(req *http.Request, rw http.ResponseWriter) { +func (api *API) TestReceiver(rw http.ResponseWriter, req *http.Request) { ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second) defer cancel() @@ -81,7 +81,7 @@ func (api *API) TestReceiver(req *http.Request, rw http.ResponseWriter) { render.Success(rw, http.StatusNoContent, nil) } -func (api *API) ListChannels(req *http.Request, rw http.ResponseWriter) { +func (api *API) ListChannels(rw http.ResponseWriter, req *http.Request) { ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second) defer cancel() @@ -100,7 +100,20 @@ func (api *API) ListChannels(req *http.Request, rw http.ResponseWriter) { render.Success(rw, http.StatusOK, channels) } -func (api *API) GetChannelByID(req *http.Request, rw http.ResponseWriter) { +func (api *API) ListAllChannels(rw http.ResponseWriter, req *http.Request) { + ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second) + defer cancel() + + channels, err := api.alertmanager.ListAllChannels(ctx) + if err != nil { + render.Error(rw, err) + return + } + + render.Success(rw, http.StatusOK, channels) +} + +func (api *API) GetChannelByID(rw http.ResponseWriter, req *http.Request) { ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second) defer cancel() @@ -137,7 +150,7 @@ func (api *API) GetChannelByID(req *http.Request, rw http.ResponseWriter) { render.Success(rw, http.StatusOK, channel) } -func (api *API) UpdateChannelByID(req *http.Request, rw http.ResponseWriter) { +func (api *API) UpdateChannelByID(rw http.ResponseWriter, req *http.Request) { ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second) defer cancel() @@ -147,6 +160,24 @@ func (api *API) UpdateChannelByID(req *http.Request, rw http.ResponseWriter) { return } + vars := mux.Vars(req) + if vars == nil { + render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "id is required in path")) + return + } + + idString, ok := vars["id"] + if !ok { + render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "id is required in path")) + return + } + + id, err := strconv.Atoi(idString) + if err != nil { + render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "id is not a valid integer")) + return + } + body, err := io.ReadAll(req.Body) if err != nil { render.Error(rw, err) @@ -160,7 +191,7 @@ func (api *API) UpdateChannelByID(req *http.Request, rw http.ResponseWriter) { return } - err = api.alertmanager.UpdateChannelByReceiver(ctx, claims.OrgID, receiver) + err = api.alertmanager.UpdateChannelByReceiverAndID(ctx, claims.OrgID, receiver, id) if err != nil { render.Error(rw, err) return @@ -169,7 +200,7 @@ func (api *API) UpdateChannelByID(req *http.Request, rw http.ResponseWriter) { render.Success(rw, http.StatusNoContent, nil) } -func (api *API) DeleteChannelByID(req *http.Request, rw http.ResponseWriter) { +func (api *API) DeleteChannelByID(rw http.ResponseWriter, req *http.Request) { ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second) defer cancel() @@ -206,7 +237,7 @@ func (api *API) DeleteChannelByID(req *http.Request, rw http.ResponseWriter) { render.Success(rw, http.StatusNoContent, nil) } -func (api *API) CreateChannel(req *http.Request, rw http.ResponseWriter) { +func (api *API) CreateChannel(rw http.ResponseWriter, req *http.Request) { ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second) defer cancel() diff --git a/pkg/alertmanager/config.go b/pkg/alertmanager/config.go index 261d2b2dbe..861c707a8f 100644 --- a/pkg/alertmanager/config.go +++ b/pkg/alertmanager/config.go @@ -1,6 +1,8 @@ package alertmanager import ( + "errors" + "fmt" "net/url" "time" @@ -9,9 +11,6 @@ import ( ) type Config struct { - // Config is the config for the alertmanager server. - alertmanagerserver.Config `mapstructure:",squash"` - // Provider is the provider for the alertmanager service. Provider string `mapstructure:"provider"` @@ -25,11 +24,14 @@ type Config struct { type Signoz struct { // PollInterval is the interval at which the alertmanager is synced. PollInterval time.Duration `mapstructure:"poll_interval"` + + // Config is the config for the alertmanager server. + alertmanagerserver.Config `mapstructure:",squash"` } type Legacy struct { - // URL is the URL of the legacy alertmanager. - URL *url.URL `mapstructure:"url"` + // ApiURL is the URL of the legacy signoz alertmanager. + ApiURL string `mapstructure:"api_url"` } func NewConfigFactory() factory.ConfigFactory { @@ -38,14 +40,28 @@ func NewConfigFactory() factory.ConfigFactory { func newConfig() factory.Config { return Config{ - Config: alertmanagerserver.NewConfig(), - Provider: "signoz", + Provider: "legacy", + Legacy: Legacy{ + ApiURL: "http://alertmanager:9093/api", + }, Signoz: Signoz{ PollInterval: 15 * time.Second, + Config: alertmanagerserver.NewConfig(), }, } } func (c Config) Validate() error { + if c.Provider == "legacy" { + if c.Legacy.ApiURL == "" { + return errors.New("api_url is required") + } + + _, err := url.Parse(c.Legacy.ApiURL) + if err != nil { + return fmt.Errorf("api_url %q is invalid: %w", c.Legacy.ApiURL, err) + } + } + return nil } diff --git a/pkg/alertmanager/config_test.go b/pkg/alertmanager/config_test.go new file mode 100644 index 0000000000..435e92efb7 --- /dev/null +++ b/pkg/alertmanager/config_test.go @@ -0,0 +1,48 @@ +package alertmanager + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.signoz.io/signoz/pkg/config" + "go.signoz.io/signoz/pkg/config/envprovider" + "go.signoz.io/signoz/pkg/factory" +) + +func TestNewWithEnvProvider(t *testing.T) { + t.Setenv("SIGNOZ_ALERTMANAGER_PROVIDER", "legacy") + t.Setenv("SIGNOZ_ALERTMANAGER_LEGACY_API__URL", "http://localhost:9093/api") + + conf, err := config.New( + context.Background(), + config.ResolverConfig{ + Uris: []string{"env:"}, + ProviderFactories: []config.ProviderFactory{ + envprovider.NewFactory(), + }, + }, + []factory.ConfigFactory{ + NewConfigFactory(), + }, + ) + require.NoError(t, err) + + actual := &Config{} + err = conf.Unmarshal("alertmanager", actual) + require.NoError(t, err) + + def := NewConfigFactory().New().(Config) + + expected := &Config{ + Provider: "legacy", + Legacy: Legacy{ + ApiURL: "http://localhost:9093/api", + }, + Signoz: def.Signoz, + } + + assert.Equal(t, expected, actual) + assert.NoError(t, actual.Validate()) +} diff --git a/pkg/alertmanager/legacyalertmanager/provider.go b/pkg/alertmanager/legacyalertmanager/provider.go index 93f43a7440..126bc08608 100644 --- a/pkg/alertmanager/legacyalertmanager/provider.go +++ b/pkg/alertmanager/legacyalertmanager/provider.go @@ -7,8 +7,10 @@ import ( "fmt" "io" "net/http" + "net/url" "time" + "github.com/tidwall/gjson" "go.signoz.io/signoz/pkg/alertmanager" "go.signoz.io/signoz/pkg/alertmanager/alertmanagerbatcher" "go.signoz.io/signoz/pkg/alertmanager/alertmanagerstore/sqlalertmanagerstore" @@ -17,6 +19,11 @@ import ( "go.signoz.io/signoz/pkg/types/alertmanagertypes" ) +type postableAlert struct { + *alertmanagertypes.PostableAlert + Receivers []string `json:"receivers"` +} + const ( alertsPath string = "/v1/alerts" routesPath string = "/v1/routes" @@ -29,6 +36,7 @@ type provider struct { client *http.Client configStore alertmanagertypes.ConfigStore batcher *alertmanagerbatcher.Batcher + url *url.URL } func NewFactory(sqlstore sqlstore.SQLStore) factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config] { @@ -41,6 +49,11 @@ func New(ctx context.Context, providerSettings factory.ProviderSettings, config settings := factory.NewScopedProviderSettings(providerSettings, "go.signoz.io/signoz/pkg/alertmanager/legacyalertmanager") configStore := sqlalertmanagerstore.NewConfigStore(sqlstore) + url, err := url.Parse(config.Legacy.ApiURL) + if err != nil { + return nil, err + } + return &provider{ config: config, settings: settings, @@ -49,6 +62,7 @@ func New(ctx context.Context, providerSettings factory.ProviderSettings, config }, configStore: configStore, batcher: alertmanagerbatcher.New(settings.Logger(), alertmanagerbatcher.NewConfig()), + url: url, }, nil } @@ -57,22 +71,18 @@ func (provider *provider) Start(ctx context.Context) error { if err != nil { return err } - defer provider.batcher.Stop(ctx) - for { - select { - case <-ctx.Done(): - return nil - case alerts := <-provider.batcher.C: - if err := provider.putAlerts(ctx, "", alerts); err != nil { - provider.settings.Logger().Error("failed to send alerts to alertmanager", "error", err) - } + for alerts := range provider.batcher.C { + if err := provider.putAlerts(ctx, "", alerts); err != nil { + provider.settings.Logger().Error("failed to send alerts to alertmanager", "error", err) } } + + return nil } -func (provider *provider) GetAlerts(ctx context.Context, orgID string, params alertmanagertypes.GettableAlertsParams) (alertmanagertypes.GettableAlerts, error) { - url := provider.config.Legacy.URL.JoinPath(alertsPath) +func (provider *provider) GetAlerts(ctx context.Context, orgID string, params alertmanagertypes.GettableAlertsParams) (alertmanagertypes.DeprecatedGettableAlerts, error) { + url := provider.url.JoinPath(alertsPath) url.RawQuery = params.RawQuery req, err := http.NewRequestWithContext(ctx, http.MethodGet, url.String(), nil) @@ -92,23 +102,45 @@ func (provider *provider) GetAlerts(ctx context.Context, orgID string, params al return nil, err } - var alerts alertmanagertypes.GettableAlerts - if err := json.Unmarshal(body, &alerts); err != nil { + if resp.StatusCode/100 != 2 { + return nil, fmt.Errorf("bad response status %v", resp.Status) + } + + var alerts alertmanagertypes.DeprecatedGettableAlerts + if err := json.Unmarshal([]byte(gjson.GetBytes(body, "data").Raw), &alerts); err != nil { return nil, err } return alerts, nil } -func (provider *provider) PutAlerts(ctx context.Context, _ string, alerts alertmanagertypes.PostableAlerts) error { - provider.batcher.Send(ctx, alerts...) +func (provider *provider) PutAlerts(ctx context.Context, orgID string, alerts alertmanagertypes.PostableAlerts) error { + provider.batcher.Add(ctx, alerts...) return nil } -func (provider *provider) putAlerts(ctx context.Context, _ string, alerts alertmanagertypes.PostableAlerts) error { - url := provider.config.Legacy.URL.JoinPath(alertsPath) +func (provider *provider) putAlerts(ctx context.Context, orgID string, alerts alertmanagertypes.PostableAlerts) error { + cfg, err := provider.configStore.Get(ctx, orgID) + if err != nil { + return err + } - body, err := json.Marshal(alerts) + legacyAlerts := make([]postableAlert, len(alerts)) + for i, alert := range alerts { + receivers, err := cfg.ReceiverNamesFromRuleID(alert.Alert.Labels["ruleID"]) + if err != nil { + return err + } + + legacyAlerts[i] = postableAlert{ + PostableAlert: alert, + Receivers: receivers, + } + } + + url := provider.url.JoinPath(alertsPath) + + body, err := json.Marshal(legacyAlerts) if err != nil { return err } @@ -135,7 +167,7 @@ func (provider *provider) putAlerts(ctx context.Context, _ string, alerts alertm } func (provider *provider) TestReceiver(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver) error { - url := provider.config.Legacy.URL.JoinPath(testReceiverPath) + url := provider.url.JoinPath(testReceiverPath) body, err := json.Marshal(receiver) if err != nil { @@ -163,49 +195,65 @@ func (provider *provider) TestReceiver(ctx context.Context, orgID string, receiv return nil } -func (provider *provider) ListChannels(ctx context.Context, orgID string) ([]*alertmanagertypes.Channel, error) { - config, err := provider.configStore.Get(ctx, orgID) +func (provider *provider) TestAlert(ctx context.Context, orgID string, alert *alertmanagertypes.PostableAlert, receivers []string) error { + url := provider.url.JoinPath(alertsPath) + + legacyAlert := postableAlert{ + PostableAlert: alert, + Receivers: receivers, + } + + body, err := json.Marshal(legacyAlert) if err != nil { - return nil, err + return err } - channels := config.Channels() - channelList := make([]*alertmanagertypes.Channel, 0, len(channels)) - for _, channel := range channels { - channelList = append(channelList, channel) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, url.String(), bytes.NewBuffer(body)) + if err != nil { + return err + } + req.Header.Add("Content-Type", "application/json") + + resp, err := provider.client.Do(req) + if err != nil { + return err } - return channelList, nil + defer resp.Body.Close() //nolint:errcheck + + // Any HTTP status 2xx is OK. + if resp.StatusCode/100 != 2 { + return fmt.Errorf("bad response status %v", resp.Status) + } + + return nil +} + +func (provider *provider) ListChannels(ctx context.Context, orgID string) ([]*alertmanagertypes.Channel, error) { + return provider.configStore.ListChannels(ctx, orgID) +} + +func (provider *provider) ListAllChannels(ctx context.Context) ([]*alertmanagertypes.Channel, error) { + return provider.configStore.ListAllChannels(ctx) } func (provider *provider) GetChannelByID(ctx context.Context, orgID string, channelID int) (*alertmanagertypes.Channel, error) { - config, err := provider.configStore.Get(ctx, orgID) - if err != nil { - return nil, err - } - - channels := config.Channels() - channel, err := alertmanagertypes.GetChannelByID(channels, channelID) - if err != nil { - return nil, err - } - - return channel, nil + return provider.configStore.GetChannelByID(ctx, orgID, channelID) } -func (provider *provider) UpdateChannelByReceiver(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver) error { - config, err := provider.configStore.Get(ctx, orgID) +func (provider *provider) UpdateChannelByReceiverAndID(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver, id int) error { + channel, err := provider.configStore.GetChannelByID(ctx, orgID, id) if err != nil { return err } - err = config.UpdateReceiver(alertmanagertypes.NewRouteFromReceiver(receiver), receiver) + err = channel.Update(receiver) if err != nil { return err } - err = provider.configStore.Set(ctx, config, func(ctx context.Context) error { - url := provider.config.Legacy.URL.JoinPath(routesPath) + err = provider.configStore.UpdateChannel(ctx, orgID, channel, func(ctx context.Context) error { + url := provider.url.JoinPath(routesPath) body, err := json.Marshal(receiver) if err != nil { @@ -240,18 +288,10 @@ func (provider *provider) UpdateChannelByReceiver(ctx context.Context, orgID str } func (provider *provider) CreateChannel(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver) error { - config, err := provider.configStore.Get(ctx, orgID) - if err != nil { - return err - } + channel := alertmanagertypes.NewChannelFromReceiver(receiver, orgID) - err = config.CreateReceiver(alertmanagertypes.NewRouteFromReceiver(receiver), receiver) - if err != nil { - return err - } - - err = provider.configStore.Set(ctx, config, func(ctx context.Context) error { - url := provider.config.Legacy.URL.JoinPath(routesPath) + err := provider.configStore.CreateChannel(ctx, channel, func(ctx context.Context) error { + url := provider.url.JoinPath(routesPath) body, err := json.Marshal(receiver) if err != nil { @@ -286,24 +326,13 @@ func (provider *provider) CreateChannel(ctx context.Context, orgID string, recei } func (provider *provider) DeleteChannelByID(ctx context.Context, orgID string, channelID int) error { - config, err := provider.configStore.Get(ctx, orgID) - if err != nil { - return err - } + err := provider.configStore.DeleteChannelByID(ctx, orgID, channelID, func(ctx context.Context) error { + channel, err := provider.configStore.GetChannelByID(ctx, orgID, channelID) + if err != nil { + return err + } - channels := config.Channels() - channel, err := alertmanagertypes.GetChannelByID(channels, channelID) - if err != nil { - return err - } - - err = config.DeleteReceiver(channel.Name) - if err != nil { - return err - } - - err = provider.configStore.Set(ctx, config, func(ctx context.Context) error { - url := provider.config.Legacy.URL.JoinPath(routesPath) + url := provider.url.JoinPath(routesPath) body, err := json.Marshal(map[string]string{"name": channel.Name}) if err != nil { @@ -337,6 +366,15 @@ func (provider *provider) DeleteChannelByID(ctx context.Context, orgID string, c return nil } +func (provider *provider) SetConfig(ctx context.Context, config *alertmanagertypes.Config) error { + return provider.configStore.Set(ctx, config) +} + func (provider *provider) Stop(ctx context.Context) error { + provider.batcher.Stop(ctx) return nil } + +func (provider *provider) GetConfig(ctx context.Context, orgID string) (*alertmanagertypes.Config, error) { + return provider.configStore.Get(ctx, orgID) +} diff --git a/pkg/alertmanager/service.go b/pkg/alertmanager/service.go index 447c8e862a..2c8f59049f 100644 --- a/pkg/alertmanager/service.go +++ b/pkg/alertmanager/service.go @@ -12,7 +12,7 @@ import ( type Service struct { // config is the config for the alertmanager service - config Config + config alertmanagerserver.Config // stateStore is the state store for the alertmanager service stateStore alertmanagertypes.StateStore @@ -30,7 +30,7 @@ type Service struct { serversMtx sync.RWMutex } -func New(ctx context.Context, settings factory.ScopedProviderSettings, config Config, stateStore alertmanagertypes.StateStore, configStore alertmanagertypes.ConfigStore) *Service { +func New(ctx context.Context, settings factory.ScopedProviderSettings, config alertmanagerserver.Config, stateStore alertmanagertypes.StateStore, configStore alertmanagertypes.ConfigStore) *Service { service := &Service{ config: config, stateStore: stateStore, @@ -53,13 +53,23 @@ func (service *Service) SyncServers(ctx context.Context) error { for _, orgID := range orgIDs { config, err := service.getConfig(ctx, orgID) if err != nil { - service.settings.Logger().Error("failed to get alertmanagerconfig for org", "orgID", orgID, "error", err) + service.settings.Logger().Error("failed to get alertmanager config for org", "orgID", orgID, "error", err) continue } - service.servers[orgID], err = alertmanagerserver.New(ctx, service.settings.Logger(), service.settings.PrometheusRegisterer(), service.config.Config, orgID, service.stateStore) - if err != nil { - service.settings.Logger().Error("failed to create alertmanagerserver", "orgID", orgID, "error", err) + // If the server is not present, create it and sync the config + if _, ok := service.servers[orgID]; !ok { + server, err := service.newServer(ctx, orgID) + if err != nil { + service.settings.Logger().Error("failed to create alertmanager server", "orgID", orgID, "error", err) + continue + } + + service.servers[orgID] = server + } + + if service.servers[orgID].Hash() == config.StoreableConfig().Hash { + service.settings.Logger().Debug("skipping alertmanager sync for org", "orgID", orgID, "hash", config.StoreableConfig().Hash) continue } @@ -74,13 +84,18 @@ func (service *Service) SyncServers(ctx context.Context) error { return nil } -func (service *Service) GetAlerts(ctx context.Context, orgID string, params alertmanagertypes.GettableAlertsParams) (alertmanagertypes.GettableAlerts, error) { +func (service *Service) GetAlerts(ctx context.Context, orgID string, params alertmanagertypes.GettableAlertsParams) (alertmanagertypes.DeprecatedGettableAlerts, error) { server, err := service.getServer(orgID) if err != nil { return nil, err } - return server.GetAlerts(ctx, params) + alerts, err := server.GetAlerts(ctx, params) + if err != nil { + return nil, err + } + + return alertmanagertypes.NewDeprecatedGettableAlertsFromGettableAlerts(alerts), nil } func (service *Service) PutAlerts(ctx context.Context, orgID string, alerts alertmanagertypes.PostableAlerts) error { @@ -101,6 +116,15 @@ func (service *Service) TestReceiver(ctx context.Context, orgID string, receiver return server.TestReceiver(ctx, receiver) } +func (service *Service) TestAlert(ctx context.Context, orgID string, alert *alertmanagertypes.PostableAlert, receivers []string) error { + server, err := service.getServer(orgID) + if err != nil { + return err + } + + return server.TestAlert(ctx, alert, receivers) +} + func (service *Service) Stop(ctx context.Context) error { for _, server := range service.servers { server.Stop(ctx) @@ -109,6 +133,36 @@ func (service *Service) Stop(ctx context.Context) error { return nil } +func (service *Service) newServer(ctx context.Context, orgID string) (*alertmanagerserver.Server, error) { + config, err := service.getConfig(ctx, orgID) + if err != nil { + return nil, err + } + + server, err := alertmanagerserver.New(ctx, service.settings.Logger(), service.settings.PrometheusRegisterer(), service.config, orgID, service.stateStore) + if err != nil { + return nil, err + } + + beforeCompareAndSelectHash := config.StoreableConfig().Hash + config, err = service.compareAndSelectConfig(ctx, config) + if err != nil { + return nil, err + } + + if beforeCompareAndSelectHash == config.StoreableConfig().Hash { + service.settings.Logger().Debug("skipping config store update for org", "orgID", orgID, "hash", config.StoreableConfig().Hash) + return server, nil + } + + err = service.configStore.Set(ctx, config) + if err != nil { + return nil, err + } + + return server, nil +} + func (service *Service) getConfig(ctx context.Context, orgID string) (*alertmanagertypes.Config, error) { config, err := service.configStore.Get(ctx, orgID) if err != nil { @@ -121,13 +175,55 @@ func (service *Service) getConfig(ctx context.Context, orgID string) (*alertmana return nil, err } - return config, err + config.SetGlobalConfig(service.config.Global) + if config.AlertmanagerConfig().Route == nil { + config.SetRouteConfig(service.config.Route) + } else { + config.UpdateRouteConfig(service.config.Route) + } } return config, nil } +// compareAndSelectConfig compares the existing config with the config derived from channels. +// If the hash of the config and the channels mismatch, the config derived from channels is returned. +func (service *Service) compareAndSelectConfig(ctx context.Context, incomingConfig *alertmanagertypes.Config) (*alertmanagertypes.Config, error) { + channels, err := service.configStore.ListChannels(ctx, incomingConfig.StoreableConfig().OrgID) + if err != nil { + return nil, err + } + + matchers, err := service.configStore.GetMatchers(ctx, incomingConfig.StoreableConfig().OrgID) + if err != nil { + return nil, err + } + + config, err := alertmanagertypes.NewConfigFromChannels(service.config.Global, service.config.Route, channels, incomingConfig.StoreableConfig().OrgID) + if err != nil { + return nil, err + } + + for ruleID, receivers := range matchers { + err = config.CreateRuleIDMatcher(ruleID, receivers) + if err != nil { + return nil, err + } + } + + if incomingConfig.StoreableConfig().Hash != config.StoreableConfig().Hash { + service.settings.Logger().InfoContext(ctx, "mismatch found, updating config to match channels and matchers") + return config, nil + } + + return incomingConfig, nil + +} + func (service *Service) getServer(orgID string) (*alertmanagerserver.Server, error) { + service.serversMtx.RLock() + defer service.serversMtx.RUnlock() + server, ok := service.servers[orgID] if !ok { return nil, errors.Newf(errors.TypeNotFound, ErrCodeAlertmanagerNotFound, "alertmanager not found for org %s", orgID) diff --git a/pkg/alertmanager/signozalertmanager/provider.go b/pkg/alertmanager/signozalertmanager/provider.go index 22631ef367..c472b94970 100644 --- a/pkg/alertmanager/signozalertmanager/provider.go +++ b/pkg/alertmanager/signozalertmanager/provider.go @@ -6,6 +6,7 @@ import ( "go.signoz.io/signoz/pkg/alertmanager" "go.signoz.io/signoz/pkg/alertmanager/alertmanagerstore/sqlalertmanagerstore" + "go.signoz.io/signoz/pkg/errors" "go.signoz.io/signoz/pkg/factory" "go.signoz.io/signoz/pkg/sqlstore" "go.signoz.io/signoz/pkg/types/alertmanagertypes" @@ -17,6 +18,7 @@ type provider struct { settings factory.ScopedProviderSettings configStore alertmanagertypes.ConfigStore stateStore alertmanagertypes.StateStore + stopC chan struct{} } func NewFactory(sqlstore sqlstore.SQLStore) factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config] { @@ -26,15 +28,15 @@ func NewFactory(sqlstore sqlstore.SQLStore) factory.ProviderFactory[alertmanager } func New(ctx context.Context, providerSettings factory.ProviderSettings, config alertmanager.Config, sqlstore sqlstore.SQLStore) (*provider, error) { - settings := factory.NewScopedProviderSettings(providerSettings, "go.signoz.io/signoz/pkg/alertmanager/internalalertmanager") + settings := factory.NewScopedProviderSettings(providerSettings, "go.signoz.io/signoz/pkg/alertmanager/signozalertmanager") configStore := sqlalertmanagerstore.NewConfigStore(sqlstore) stateStore := sqlalertmanagerstore.NewStateStore(sqlstore) - return &provider{ + p := &provider{ service: alertmanager.New( ctx, settings, - config, + config.Signoz.Config, stateStore, configStore, ), @@ -42,15 +44,23 @@ func New(ctx context.Context, providerSettings factory.ProviderSettings, config config: config, configStore: configStore, stateStore: stateStore, - }, nil + stopC: make(chan struct{}), + } + + return p, nil } func (provider *provider) Start(ctx context.Context) error { + if err := provider.service.SyncServers(ctx); err != nil { + provider.settings.Logger().ErrorContext(ctx, "failed to sync alertmanager servers", "error", err) + return err + } + ticker := time.NewTicker(provider.config.Signoz.PollInterval) defer ticker.Stop() for { select { - case <-ctx.Done(): + case <-provider.stopC: return nil case <-ticker.C: if err := provider.service.SyncServers(ctx); err != nil { @@ -61,10 +71,11 @@ func (provider *provider) Start(ctx context.Context) error { } func (provider *provider) Stop(ctx context.Context) error { + close(provider.stopC) return provider.service.Stop(ctx) } -func (provider *provider) GetAlerts(ctx context.Context, orgID string, params alertmanagertypes.GettableAlertsParams) (alertmanagertypes.GettableAlerts, error) { +func (provider *provider) GetAlerts(ctx context.Context, orgID string, params alertmanagertypes.GettableAlertsParams) (alertmanagertypes.DeprecatedGettableAlerts, error) { return provider.service.GetAlerts(ctx, orgID, params) } @@ -76,78 +87,68 @@ func (provider *provider) TestReceiver(ctx context.Context, orgID string, receiv return provider.service.TestReceiver(ctx, orgID, receiver) } +func (provider *provider) TestAlert(ctx context.Context, orgID string, alert *alertmanagertypes.PostableAlert, receivers []string) error { + return provider.service.TestAlert(ctx, orgID, alert, receivers) +} + func (provider *provider) ListChannels(ctx context.Context, orgID string) ([]*alertmanagertypes.Channel, error) { - config, err := provider.configStore.Get(ctx, orgID) - if err != nil { - return nil, err - } + return provider.configStore.ListChannels(ctx, orgID) +} - channels := config.Channels() - channelList := make([]*alertmanagertypes.Channel, 0, len(channels)) - for _, channel := range channels { - channelList = append(channelList, channel) - } - - return channelList, nil +func (provider *provider) ListAllChannels(ctx context.Context) ([]*alertmanagertypes.Channel, error) { + return nil, errors.Newf(errors.TypeUnsupported, errors.CodeUnsupported, "not supported by provider signoz") } func (provider *provider) GetChannelByID(ctx context.Context, orgID string, channelID int) (*alertmanagertypes.Channel, error) { - config, err := provider.configStore.Get(ctx, orgID) - if err != nil { - return nil, err - } - - channels := config.Channels() - channel, err := alertmanagertypes.GetChannelByID(channels, channelID) - if err != nil { - return nil, err - } - - return channel, nil + return provider.configStore.GetChannelByID(ctx, orgID, channelID) } -func (provider *provider) UpdateChannelByReceiver(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver) error { +func (provider *provider) UpdateChannelByReceiverAndID(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver, id int) error { + channel, err := provider.configStore.GetChannelByID(ctx, orgID, id) + if err != nil { + return err + } + config, err := provider.configStore.Get(ctx, orgID) if err != nil { return err } - err = config.UpdateReceiver(alertmanagertypes.NewRouteFromReceiver(receiver), receiver) - if err != nil { + if err := config.UpdateReceiver(receiver); err != nil { return err } - err = provider.configStore.Set(ctx, config, nil) - if err != nil { + if err := provider.configStore.Set(ctx, config); err != nil { return err } - return nil + if err := channel.Update(receiver); err != nil { + return err + } + + return provider.configStore.UpdateChannel(ctx, orgID, channel, alertmanagertypes.ConfigStoreNoopCallback) } func (provider *provider) DeleteChannelByID(ctx context.Context, orgID string, channelID int) error { + channel, err := provider.configStore.GetChannelByID(ctx, orgID, channelID) + if err != nil { + return err + } + config, err := provider.configStore.Get(ctx, orgID) if err != nil { return err } - channels := config.Channels() - channel, err := alertmanagertypes.GetChannelByID(channels, channelID) - if err != nil { + if err := config.DeleteReceiver(channel.Name); err != nil { return err } - err = config.DeleteReceiver(channel.Name) - if err != nil { + if err := provider.configStore.Set(ctx, config); err != nil { return err } - err = provider.configStore.Set(ctx, config, nil) - if err != nil { - return err - } - - return nil + return provider.configStore.DeleteChannelByID(ctx, orgID, channelID, alertmanagertypes.ConfigStoreNoopCallback) } func (provider *provider) CreateChannel(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver) error { @@ -156,15 +157,22 @@ func (provider *provider) CreateChannel(ctx context.Context, orgID string, recei return err } - err = config.CreateReceiver(alertmanagertypes.NewRouteFromReceiver(receiver), receiver) - if err != nil { + if err := config.CreateReceiver(receiver); err != nil { return err } - err = provider.configStore.Set(ctx, config, nil) - if err != nil { + if err := provider.configStore.Set(ctx, config); err != nil { return err } - return nil + channel := alertmanagertypes.NewChannelFromReceiver(receiver, orgID) + return provider.configStore.CreateChannel(ctx, channel, alertmanagertypes.ConfigStoreNoopCallback) +} + +func (provider *provider) SetConfig(ctx context.Context, config *alertmanagertypes.Config) error { + return provider.configStore.Set(ctx, config) +} + +func (provider *provider) GetConfig(ctx context.Context, orgID string) (*alertmanagertypes.Config, error) { + return provider.configStore.Get(ctx, orgID) } diff --git a/pkg/sqlmigrator/migrator.go b/pkg/sqlmigrator/migrator.go index 244e79c599..37a3eeb4e1 100644 --- a/pkg/sqlmigrator/migrator.go +++ b/pkg/sqlmigrator/migrator.go @@ -24,7 +24,15 @@ type migrator struct { func New(ctx context.Context, providerSettings factory.ProviderSettings, sqlstore sqlstore.SQLStore, migrations *migrate.Migrations, config Config) SQLMigrator { return &migrator{ - migrator: migrate.NewMigrator(sqlstore.BunDB(), migrations, migrate.WithTableName(migrationTableName), migrate.WithLocksTableName(migrationLockTableName)), + migrator: migrate.NewMigrator( + sqlstore.BunDB(), + migrations, + migrate.WithTableName(migrationTableName), + migrate.WithLocksTableName(migrationLockTableName), + // This is to ensure that the migration is marked as applied only on success. If the migration fails, no entry is made in the migration table + // and the migration will be retried. + migrate.WithMarkAppliedOnSuccess(true), + ), settings: factory.NewScopedProviderSettings(providerSettings, "go.signoz.io/signoz/pkg/sqlmigrator"), config: config, dialect: sqlstore.BunDB().Dialect().Name().String(), diff --git a/pkg/types/alertmanagertypes/alert.go b/pkg/types/alertmanagertypes/alert.go index 6008a2a1d3..4066106026 100644 --- a/pkg/types/alertmanagertypes/alert.go +++ b/pkg/types/alertmanagertypes/alert.go @@ -22,6 +22,9 @@ import ( ) type ( + // An alias for the Alert type from the alertmanager package. + AlertModel = models.Alert + // An alias for the Alert type from the alertmanager package. Alert = types.Alert @@ -38,12 +41,51 @@ type ( GettableAlerts = models.GettableAlerts ) +type DeprecatedGettableAlert struct { + *model.Alert + Status types.AlertStatus `json:"status"` + Receivers []string `json:"receivers"` + Fingerprint string `json:"fingerprint"` +} + +type DeprecatedGettableAlerts = []*DeprecatedGettableAlert + // An alias for the GettableAlertsParams type from the alertmanager package. type GettableAlertsParams struct { alert.GetAlertsParams RawQuery string } +func NewDeprecatedGettableAlertsFromGettableAlerts(gettableAlerts GettableAlerts) DeprecatedGettableAlerts { + deprecatedGettableAlerts := make(DeprecatedGettableAlerts, 0, len(gettableAlerts)) + + for _, gettableAlert := range gettableAlerts { + receivers := make([]string, 0, len(gettableAlert.Receivers)) + for _, receiver := range gettableAlert.Receivers { + receivers = append(receivers, *receiver.Name) + } + + deprecatedGettableAlerts = append(deprecatedGettableAlerts, &DeprecatedGettableAlert{ + Alert: &model.Alert{ + Labels: v2.APILabelSetToModelLabelSet(gettableAlert.Labels), + Annotations: v2.APILabelSetToModelLabelSet(gettableAlert.Annotations), + StartsAt: time.Time(*gettableAlert.StartsAt), + EndsAt: time.Time(*gettableAlert.EndsAt), + GeneratorURL: string(gettableAlert.GeneratorURL), + }, + Status: types.AlertStatus{ + State: types.AlertState(*gettableAlert.Status.State), + SilencedBy: gettableAlert.Status.SilencedBy, + InhibitedBy: gettableAlert.Status.InhibitedBy, + }, + Fingerprint: *gettableAlert.Fingerprint, + Receivers: receivers, + }) + } + + return deprecatedGettableAlerts +} + // Converts a slice of Alert to a slice of PostableAlert. func NewPostableAlertsFromAlerts(alerts []*types.Alert) PostableAlerts { postableAlerts := make(PostableAlerts, 0, len(alerts)) diff --git a/pkg/types/alertmanagertypes/channel.go b/pkg/types/alertmanagertypes/channel.go index 48f86ff6ba..a6df3614d9 100644 --- a/pkg/types/alertmanagertypes/channel.go +++ b/pkg/types/alertmanagertypes/channel.go @@ -12,7 +12,8 @@ import ( ) var ( - ErrCodeAlertmanagerChannelNotFound = errors.MustNewCode("alertmanager_channel_not_found") + ErrCodeAlertmanagerChannelNotFound = errors.MustNewCode("alertmanager_channel_not_found") + ErrCodeAlertmanagerChannelNameMismatch = errors.MustNewCode("alertmanager_channel_name_mismatch") ) var ( @@ -20,7 +21,7 @@ var ( receiverTypeRegex = regexp.MustCompile(`^(.+)_configs`) ) -type Channels = map[string]*Channel +type Channels = []*Channel type GettableChannels = []*Channel @@ -104,20 +105,6 @@ func NewReceiverFromChannel(channel *Channel) (Receiver, error) { return receiver, nil } -func NewChannelsFromConfig(c *config.Config, orgID string) Channels { - channels := Channels{} - for _, receiver := range c.Receivers { - channel := NewChannelFromReceiver(receiver, orgID) - if channel == nil { - continue - } - - channels[channel.Name] = channel - } - - return channels -} - func NewConfigFromChannels(globalConfig GlobalConfig, routeConfig RouteConfig, channels Channels, orgID string) (*Config, error) { cfg, err := NewDefaultConfig( globalConfig, @@ -134,7 +121,7 @@ func NewConfigFromChannels(globalConfig GlobalConfig, routeConfig RouteConfig, c return nil, err } - err = cfg.CreateReceiver(&config.Route{Receiver: channel.Name, Continue: true}, receiver) + err = cfg.CreateReceiver(receiver) if err != nil { return nil, err } @@ -143,12 +130,38 @@ func NewConfigFromChannels(globalConfig GlobalConfig, routeConfig RouteConfig, c return cfg, nil } -func GetChannelByID(channels Channels, id int) (*Channel, error) { - for _, channel := range channels { +func GetChannelByID(channels Channels, id int) (int, *Channel, error) { + for i, channel := range channels { if channel.ID == id { - return channel, nil + return i, channel, nil } } - return nil, errors.Newf(errors.TypeNotFound, ErrCodeAlertmanagerChannelNotFound, "cannot find channel with id %d", id) + return 0, nil, errors.Newf(errors.TypeNotFound, ErrCodeAlertmanagerChannelNotFound, "cannot find channel with id %d", id) +} + +func GetChannelByName(channels Channels, name string) (int, *Channel, error) { + for i, channel := range channels { + if channel.Name == name { + return i, channel, nil + } + } + + return 0, nil, errors.Newf(errors.TypeNotFound, ErrCodeAlertmanagerChannelNotFound, "cannot find channel with name %s", name) +} + +func (c *Channel) Update(receiver Receiver) error { + channel := NewChannelFromReceiver(receiver, c.OrgID) + if channel == nil { + return errors.Newf(errors.TypeInvalidInput, ErrCodeAlertmanagerChannelNotFound, "cannot find channel with id %d", c.ID) + } + + if c.Name != channel.Name { + return errors.Newf(errors.TypeInvalidInput, ErrCodeAlertmanagerChannelNameMismatch, "cannot update channel name") + } + + c.Data = channel.Data + c.UpdatedAt = time.Now() + + return nil } diff --git a/pkg/types/alertmanagertypes/channel_test.go b/pkg/types/alertmanagertypes/channel_test.go index de18ac0232..c7623c819e 100644 --- a/pkg/types/alertmanagertypes/channel_test.go +++ b/pkg/types/alertmanagertypes/channel_test.go @@ -10,91 +10,6 @@ import ( "github.com/stretchr/testify/assert" ) -func TestNewChannelsFromAlertmanagerConfig(t *testing.T) { - testCases := []struct { - name string - orgID string - alertmanagerConfig *config.Config - expectedChannels Channels - }{ - { - name: "DefaultReceiver", - orgID: "1", - alertmanagerConfig: &config.Config{ - Receivers: []config.Receiver{ - { - Name: DefaultReceiverName, - }, - }, - }, - expectedChannels: Channels{}, - }, - { - name: "OneEmailReceiver", - orgID: "1", - alertmanagerConfig: &config.Config{ - Receivers: []config.Receiver{ - { - Name: "email-receiver", - EmailConfigs: []*config.EmailConfig{ - { - To: "test@example.com", - }, - }, - }, - }, - }, - expectedChannels: Channels{ - "email-receiver": { - Name: "email-receiver", - Type: "email", - Data: `{"name":"email-receiver","email_configs":[{"send_resolved":false,"to":"test@example.com","smarthost":""}]}`, - OrgID: "1", - }, - }, - }, - { - name: "OneSlackReceiver", - orgID: "1", - alertmanagerConfig: &config.Config{ - Receivers: []config.Receiver{ - { - Name: "slack-receiver", - SlackConfigs: []*config.SlackConfig{ - { - Channel: "#alerts", - }, - }, - }, - }, - }, - expectedChannels: Channels{ - "slack-receiver": { - Name: "slack-receiver", - Type: "slack", - Data: `{"name":"slack-receiver","slack_configs":[{"send_resolved":false,"channel":"#alerts"}]}`, - OrgID: "1", - }, - }, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - actualChannels := NewChannelsFromConfig(tc.alertmanagerConfig, tc.orgID) - assert.Equal(t, len(tc.expectedChannels), len(actualChannels)) - for _, channel := range actualChannels { - expectedChannel, ok := tc.expectedChannels[channel.Name] - assert.True(t, ok) - assert.Equal(t, expectedChannel.Name, channel.Name) - assert.Equal(t, expectedChannel.Type, channel.Type) - assert.Equal(t, expectedChannel.Data, channel.Data) - assert.Equal(t, expectedChannel.OrgID, channel.OrgID) - } - }) - } -} - func TestNewConfigFromChannels(t *testing.T) { testCases := []struct { name string @@ -105,7 +20,7 @@ func TestNewConfigFromChannels(t *testing.T) { { name: "OneEmailChannel", channels: Channels{ - "email-receiver": { + { Name: "email-receiver", Type: "email", Data: `{"name":"email-receiver","email_configs":[{"to":"test@example.com"}]}`, @@ -117,7 +32,7 @@ func TestNewConfigFromChannels(t *testing.T) { { name: "OneSlackChannel", channels: Channels{ - "slack-receiver": { + { Name: "slack-receiver", Type: "slack", Data: `{"name":"slack-receiver","slack_configs":[{"channel":"#alerts","api_url":"https://slack.com/api/test","send_resolved":true}]}`, @@ -129,7 +44,7 @@ func TestNewConfigFromChannels(t *testing.T) { { name: "OnePagerdutyChannel", channels: Channels{ - "pagerduty-receiver": { + { Name: "pagerduty-receiver", Type: "pagerduty", Data: `{"name":"pagerduty-receiver","pagerduty_configs":[{"service_key":"test"}]}`, @@ -141,12 +56,12 @@ func TestNewConfigFromChannels(t *testing.T) { { name: "OnePagerdutyAndOneSlackChannel", channels: Channels{ - "pagerduty-receiver": { + { Name: "pagerduty-receiver", Type: "pagerduty", Data: `{"name":"pagerduty-receiver","pagerduty_configs":[{"service_key":"test"}]}`, }, - "slack-receiver": { + { Name: "slack-receiver", Type: "slack", Data: `{"name":"slack-receiver","slack_configs":[{"channel":"#alerts","api_url":"https://slack.com/api/test","send_resolved":true}]}`, diff --git a/pkg/types/alertmanagertypes/config.go b/pkg/types/alertmanagertypes/config.go index a1643845f9..03b2634de4 100644 --- a/pkg/types/alertmanagertypes/config.go +++ b/pkg/types/alertmanagertypes/config.go @@ -5,10 +5,13 @@ import ( "crypto/md5" "encoding/json" "fmt" + "slices" "time" "dario.cat/mergo" "github.com/prometheus/alertmanager/config" + "github.com/prometheus/alertmanager/pkg/labels" + commoncfg "github.com/prometheus/common/config" "github.com/prometheus/common/model" "github.com/uptrace/bun" "go.signoz.io/signoz/pkg/errors" @@ -41,6 +44,7 @@ type StoreableConfig struct { ID uint64 `bun:"id,pk,autoincrement"` Config string `bun:"config"` + Hash string `bun:"hash"` CreatedAt time.Time `bun:"created_at"` UpdatedAt time.Time `bun:"updated_at"` OrgID string `bun:"org_id"` @@ -53,25 +57,19 @@ type Config struct { // storeableConfig is the representation of the config in the store storeableConfig *StoreableConfig - - // channels is the list of channels - channels Channels - - // orgID is the organization ID - orgID string } func NewConfig(c *config.Config, orgID string) *Config { - channels := NewChannelsFromConfig(c, orgID) + raw := string(newRawFromConfig(c)) return &Config{ alertmanagerConfig: c, storeableConfig: &StoreableConfig{ - Config: string(newRawFromConfig(c)), + Config: raw, + Hash: fmt.Sprintf("%x", newConfigHash(raw)), CreatedAt: time.Now(), UpdatedAt: time.Now(), OrgID: orgID, }, - channels: channels, } } @@ -81,20 +79,12 @@ func NewConfigFromStoreableConfig(sc *StoreableConfig) (*Config, error) { return nil, err } - channels := NewChannelsFromConfig(alertmanagerConfig, sc.OrgID) - return &Config{ alertmanagerConfig: alertmanagerConfig, storeableConfig: sc, - channels: channels, - orgID: sc.OrgID, }, nil } -func NewRouteFromReceiver(receiver Receiver) *config.Route { - return &config.Route{Receiver: receiver.Name, Continue: true} -} - func NewDefaultConfig(globalConfig GlobalConfig, routeConfig RouteConfig, orgID string) (*Config, error) { err := mergo.Merge(&globalConfig, config.DefaultGlobalConfig()) if err != nil { @@ -134,6 +124,43 @@ func newRawFromConfig(c *config.Config) []byte { return b } +func newConfigHash(s string) [16]byte { + return md5.Sum([]byte(s)) +} + +func (c *Config) SetGlobalConfig(globalConfig GlobalConfig) { + c.alertmanagerConfig.Global = &globalConfig + c.storeableConfig.Config = string(newRawFromConfig(c.alertmanagerConfig)) + c.storeableConfig.Hash = fmt.Sprintf("%x", newConfigHash(c.storeableConfig.Config)) + c.storeableConfig.UpdatedAt = time.Now() +} + +func (c *Config) SetRouteConfig(routeConfig RouteConfig) { + c.alertmanagerConfig.Route = &config.Route{ + Receiver: DefaultReceiverName, + GroupByStr: routeConfig.GroupByStr, + GroupInterval: (*model.Duration)(&routeConfig.GroupInterval), + GroupWait: (*model.Duration)(&routeConfig.GroupWait), + RepeatInterval: (*model.Duration)(&routeConfig.RepeatInterval), + } + c.storeableConfig.Config = string(newRawFromConfig(c.alertmanagerConfig)) + c.storeableConfig.Hash = fmt.Sprintf("%x", newConfigHash(c.storeableConfig.Config)) + c.storeableConfig.UpdatedAt = time.Now() +} + +func (c *Config) UpdateRouteConfig(routeConfig RouteConfig) { + for _, route := range c.alertmanagerConfig.Route.Routes { + route.GroupByStr = routeConfig.GroupByStr + route.GroupInterval = (*model.Duration)(&routeConfig.GroupInterval) + route.GroupWait = (*model.Duration)(&routeConfig.GroupWait) + route.RepeatInterval = (*model.Duration)(&routeConfig.RepeatInterval) + } + + c.storeableConfig.Config = string(newRawFromConfig(c.alertmanagerConfig)) + c.storeableConfig.Hash = fmt.Sprintf("%x", newConfigHash(c.storeableConfig.Config)) + c.storeableConfig.UpdatedAt = time.Now() +} + func (c *Config) AlertmanagerConfig() *config.Config { return c.alertmanagerConfig } @@ -142,65 +169,43 @@ func (c *Config) StoreableConfig() *StoreableConfig { return c.storeableConfig } -func (c *Config) Channels() Channels { - return c.channels -} - -func (c *Config) OrgID() string { - return c.orgID -} - -func (c *Config) Raw() []byte { - return newRawFromConfig(c.alertmanagerConfig) -} - -func (c *Config) Hash() [16]byte { - return md5.Sum(newRawFromConfig(c.alertmanagerConfig)) -} - -func (c *Config) CreateReceiver(route *config.Route, receiver config.Receiver) error { - if route == nil { - return errors.New(errors.TypeInvalidInput, ErrCodeAlertmanagerConfigInvalid, "route is nil") - } - - if route.Receiver == "" || receiver.Name == "" { +func (c *Config) CreateReceiver(receiver config.Receiver) error { + if receiver.Name == "" { return errors.New(errors.TypeInvalidInput, ErrCodeAlertmanagerConfigInvalid, "receiver is mandatory in route and receiver") } // check that receiver name is not already used for _, existingReceiver := range c.alertmanagerConfig.Receivers { - if existingReceiver.Name == receiver.Name || existingReceiver.Name == route.Receiver { + if existingReceiver.Name == receiver.Name { return errors.New(errors.TypeInvalidInput, ErrCodeAlertmanagerConfigConflict, "the receiver name has to be unique, please choose a different name") } } - // must set continue on route to allow subsequent - // routes to work. may have to rethink on this after - // adding matchers and filters in upstream - route.Continue = true - c.alertmanagerConfig.Route.Routes = append(c.alertmanagerConfig.Route.Routes, route) + c.alertmanagerConfig.Route.Routes = append(c.alertmanagerConfig.Route.Routes, newRouteFromReceiver(receiver)) c.alertmanagerConfig.Receivers = append(c.alertmanagerConfig.Receivers, receiver) if err := c.alertmanagerConfig.UnmarshalYAML(func(i interface{}) error { return nil }); err != nil { return err } - channel := NewChannelFromReceiver(receiver, c.orgID) - if channel != nil { - c.channels[channel.Name] = channel - } - c.storeableConfig.Config = string(newRawFromConfig(c.alertmanagerConfig)) + c.storeableConfig.Hash = fmt.Sprintf("%x", newConfigHash(c.storeableConfig.Config)) c.storeableConfig.UpdatedAt = time.Now() return nil } -func (c *Config) UpdateReceiver(route *config.Route, receiver config.Receiver) error { - if route == nil { - return errors.New(errors.TypeInvalidInput, ErrCodeAlertmanagerConfigInvalid, "route is nil") +func (c *Config) GetReceiver(name string) (Receiver, error) { + for _, receiver := range c.alertmanagerConfig.Receivers { + if receiver.Name == name { + return receiver, nil + } } - if route.Receiver == "" || receiver.Name == "" { + return Receiver{}, errors.Newf(errors.TypeInvalidInput, ErrCodeAlertmanagerChannelNotFound, "channel with name %q not found", name) +} + +func (c *Config) UpdateReceiver(receiver config.Receiver) error { + if receiver.Name == "" { return errors.New(errors.TypeInvalidInput, ErrCodeAlertmanagerConfigInvalid, "receiver is mandatory in route and receiver") } @@ -208,24 +213,12 @@ func (c *Config) UpdateReceiver(route *config.Route, receiver config.Receiver) e for i, existingReceiver := range c.alertmanagerConfig.Receivers { if existingReceiver.Name == receiver.Name { c.alertmanagerConfig.Receivers[i] = receiver - channel := NewChannelFromReceiver(receiver, c.orgID) - if channel != nil { - c.channels[channel.Name] = channel - c.channels[channel.Name].UpdatedAt = time.Now() - } - break - } - } - - routes := c.alertmanagerConfig.Route.Routes - for i, existingRoute := range routes { - if existingRoute.Receiver == route.Receiver { - c.alertmanagerConfig.Route.Routes[i] = route break } } c.storeableConfig.Config = string(newRawFromConfig(c.alertmanagerConfig)) + c.storeableConfig.Hash = fmt.Sprintf("%x", newConfigHash(c.storeableConfig.Config)) c.storeableConfig.UpdatedAt = time.Now() return nil @@ -247,31 +240,121 @@ func (c *Config) DeleteReceiver(name string) error { for i, existingReceiver := range c.alertmanagerConfig.Receivers { if existingReceiver.Name == name { c.alertmanagerConfig.Receivers = append(c.alertmanagerConfig.Receivers[:i], c.alertmanagerConfig.Receivers[i+1:]...) - delete(c.channels, name) break } } c.storeableConfig.Config = string(newRawFromConfig(c.alertmanagerConfig)) + c.storeableConfig.Hash = fmt.Sprintf("%x", newConfigHash(c.storeableConfig.Config)) c.storeableConfig.UpdatedAt = time.Now() return nil } +func (c *Config) CreateRuleIDMatcher(ruleID string, receiverNames []string) error { + if c.alertmanagerConfig.Route == nil { + return errors.New(errors.TypeInvalidInput, ErrCodeAlertmanagerConfigInvalid, "route is nil") + } + + routes := c.alertmanagerConfig.Route.Routes + for i, route := range routes { + if slices.Contains(receiverNames, route.Receiver) { + matcher, err := labels.NewMatcher(labels.MatchEqual, "ruleId", ruleID) + if err != nil { + return err + } + + c.alertmanagerConfig.Route.Routes[i].Matchers = append(c.alertmanagerConfig.Route.Routes[i].Matchers, matcher) + } + } + + c.storeableConfig.Config = string(newRawFromConfig(c.alertmanagerConfig)) + c.storeableConfig.Hash = fmt.Sprintf("%x", newConfigHash(c.storeableConfig.Config)) + c.storeableConfig.UpdatedAt = time.Now() + + return nil +} + +func (c *Config) UpdateRuleIDMatcher(ruleID string, receiverNames []string) error { + err := c.DeleteRuleIDMatcher(ruleID) + if err != nil { + return err + } + + return c.CreateRuleIDMatcher(ruleID, receiverNames) +} + +func (c *Config) DeleteRuleIDMatcher(ruleID string) error { + routes := c.alertmanagerConfig.Route.Routes + for i, r := range routes { + j := slices.IndexFunc(r.Matchers, func(m *labels.Matcher) bool { + return m.Name == "ruleId" && m.Value == ruleID + }) + if j != -1 { + c.alertmanagerConfig.Route.Routes[i].Matchers = slices.Delete(r.Matchers, j, j+1) + } + } + + c.storeableConfig.Config = string(newRawFromConfig(c.alertmanagerConfig)) + c.storeableConfig.Hash = fmt.Sprintf("%x", newConfigHash(c.storeableConfig.Config)) + c.storeableConfig.UpdatedAt = time.Now() + + return nil +} + +func (c *Config) ReceiverNamesFromRuleID(ruleID string) ([]string, error) { + receiverNames := make([]string, 0) + routes := c.alertmanagerConfig.Route.Routes + for _, r := range routes { + for _, m := range r.Matchers { + if m.Name == "ruleId" && m.Value == ruleID { + receiverNames = append(receiverNames, r.Receiver) + } + } + } + + return receiverNames, nil +} + type ConfigStore interface { // Set creates or updates a config. - Set(context.Context, *Config, func(context.Context) error) error + Set(context.Context, *Config) error // Get returns the config for the given orgID Get(context.Context, string) (*Config, error) // ListOrgs returns the list of orgs ListOrgs(context.Context) ([]string, error) + + // CreateChannel creates a new channel. + CreateChannel(context.Context, *Channel, func(context.Context) error) error + + // GetChannelByID returns the channel for the given id. + GetChannelByID(context.Context, string, int) (*Channel, error) + + // UpdateChannel updates a channel. + UpdateChannel(context.Context, string, *Channel, func(context.Context) error) error + + // DeleteChannelByID deletes a channel. + DeleteChannelByID(context.Context, string, int, func(context.Context) error) error + + // ListChannels returns the list of channels. + ListChannels(context.Context, string) ([]*Channel, error) + + // ListAllChannels returns the list of channels for all organizations. + ListAllChannels(context.Context) ([]*Channel, error) + + // GetMatchers gets a list of matchers per organization. + // Matchers is an array of ruleId to receiver names. + GetMatchers(context.Context, string) (map[string][]string, error) } +var ConfigStoreNoopCallback = func(ctx context.Context) error { return nil } + // MarshalSecretValue if set to true will expose Secret type // through the marshal interfaces. We need to store the actual value of the secret // in the database, so we need to set this to true. func init() { + commoncfg.MarshalSecretValue = true config.MarshalSecretValue = true } diff --git a/pkg/types/alertmanagertypes/config_test.go b/pkg/types/alertmanagertypes/config_test.go new file mode 100644 index 0000000000..3e4e1eb129 --- /dev/null +++ b/pkg/types/alertmanagertypes/config_test.go @@ -0,0 +1,215 @@ +package alertmanagertypes + +import ( + "encoding/json" + "net/url" + "testing" + + "github.com/prometheus/alertmanager/config" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCreateRuleIDMatcher(t *testing.T) { + testCases := []struct { + name string + orgID string + receivers []config.Receiver + ruleIDToReceivers map[string][]string + expectedRoutes []map[string]any + }{ + { + name: "OneSlackReceiver", + orgID: "1", + receivers: []config.Receiver{ + { + Name: "slack-receiver", + SlackConfigs: []*config.SlackConfig{ + { + Channel: "#alerts", + APIURL: &config.SecretURL{URL: &url.URL{Scheme: "https", Host: "slack.com", Path: "/api/test"}}, + }, + }, + }, + }, + ruleIDToReceivers: map[string][]string{"test-rule": {"slack-receiver"}}, + expectedRoutes: []map[string]any{{"receiver": "slack-receiver", "continue": true, "matchers": []any{"ruleId=\"test-rule\""}}}, + }, + { + name: "SlackAndEmailReceivers", + orgID: "1", + receivers: []config.Receiver{ + { + Name: "slack-receiver", + SlackConfigs: []*config.SlackConfig{ + { + Channel: "#alerts", + APIURL: &config.SecretURL{URL: &url.URL{Scheme: "https", Host: "slack.com", Path: "/api/test"}}, + }, + }, + }, + { + Name: "email-receiver", + EmailConfigs: []*config.EmailConfig{ + { + To: "test@example.com", + }, + }, + }, + }, + ruleIDToReceivers: map[string][]string{"test-rule": {"slack-receiver", "email-receiver"}}, + expectedRoutes: []map[string]any{{"receiver": "slack-receiver", "continue": true, "matchers": []any{"ruleId=\"test-rule\""}}, {"receiver": "email-receiver", "continue": true, "matchers": []any{"ruleId=\"test-rule\""}}}, + }, + { + name: "ReceiverDoesNotExist", + orgID: "1", + receivers: []config.Receiver{ + { + Name: "slack-receiver", + SlackConfigs: []*config.SlackConfig{ + { + Channel: "#alerts", + APIURL: &config.SecretURL{URL: &url.URL{Scheme: "https", Host: "slack.com", Path: "/api/test"}}, + }, + }, + }, + }, + ruleIDToReceivers: map[string][]string{"test-rule": {"does-not-exist"}}, + expectedRoutes: []map[string]any{{"receiver": "slack-receiver", "continue": true}}, + }, + { + name: "MultipleAlertsOnOneSlackReceiver", + orgID: "1", + receivers: []config.Receiver{ + { + Name: "slack-receiver", + SlackConfigs: []*config.SlackConfig{ + { + Channel: "#alerts", + APIURL: &config.SecretURL{URL: &url.URL{Scheme: "https", Host: "slack.com", Path: "/api/test"}}, + }, + }, + }, + }, + ruleIDToReceivers: map[string][]string{"test-rule-1": {"slack-receiver", "does-not-exist"}, "test-rule-2": {"slack-receiver"}}, + expectedRoutes: []map[string]any{{"receiver": "slack-receiver", "continue": true, "matchers": []any{"ruleId=\"test-rule-1\"", "ruleId=\"test-rule-2\""}}}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + cfg, err := NewDefaultConfig(GlobalConfig{SMTPSmarthost: config.HostPort{Host: "localhost", Port: "25"}, SMTPFrom: "test@example.com"}, RouteConfig{}, tc.orgID) + require.NoError(t, err) + + for _, receiver := range tc.receivers { + err := cfg.CreateReceiver(receiver) + require.NoError(t, err) + } + + for ruleID, receiverNames := range tc.ruleIDToReceivers { + err = cfg.CreateRuleIDMatcher(ruleID, receiverNames) + assert.NoError(t, err) + } + + routes, err := json.Marshal(cfg.alertmanagerConfig.Route.Routes) + require.NoError(t, err) + var actualRoutes []map[string]any + err = json.Unmarshal(routes, &actualRoutes) + require.NoError(t, err) + assert.ElementsMatch(t, tc.expectedRoutes, actualRoutes) + }) + } +} + +func TestDeleteRuleIDMatcher(t *testing.T) { + testCases := []struct { + name string + orgID string + receivers []config.Receiver + ruleIDToReceivers map[string][]string + ruleIDsToDelete []string + expectedRoutes []map[string]any + }{ + { + name: "DeleteEmailAndSlackMatchers", + orgID: "1", + receivers: []config.Receiver{ + { + Name: "slack-receiver", + SlackConfigs: []*config.SlackConfig{ + { + Channel: "#alerts", + APIURL: &config.SecretURL{URL: &url.URL{Scheme: "https", Host: "slack.com", Path: "/api/test"}}, + }, + }, + }, + { + Name: "email-receiver", + EmailConfigs: []*config.EmailConfig{ + { + To: "test@example.com", + }, + }, + }, + }, + ruleIDToReceivers: map[string][]string{"test-rule": {"email-receiver", "slack-receiver"}}, + ruleIDsToDelete: []string{"test-rule"}, + expectedRoutes: []map[string]any{{"receiver": "slack-receiver", "continue": true}, {"receiver": "email-receiver", "continue": true}}, + }, + { + name: "AlertNameDoesNotExist", + orgID: "1", + receivers: []config.Receiver{ + { + Name: "slack-receiver", + SlackConfigs: []*config.SlackConfig{ + { + Channel: "#alerts", + APIURL: &config.SecretURL{URL: &url.URL{Scheme: "https", Host: "slack.com", Path: "/api/test"}}, + }, + }, + }, + { + Name: "email-receiver", + EmailConfigs: []*config.EmailConfig{ + { + To: "test@example.com", + }, + }, + }, + }, + ruleIDToReceivers: map[string][]string{"test-rule": {"email-receiver", "slack-receiver"}}, + ruleIDsToDelete: []string{"does-not-exist"}, + expectedRoutes: []map[string]any{{"receiver": "slack-receiver", "continue": true, "matchers": []any{"ruleId=\"test-rule\""}}, {"receiver": "email-receiver", "continue": true, "matchers": []any{"ruleId=\"test-rule\""}}}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + cfg, err := NewDefaultConfig(GlobalConfig{SMTPSmarthost: config.HostPort{Host: "localhost", Port: "25"}, SMTPFrom: "test@example.com"}, RouteConfig{}, tc.orgID) + require.NoError(t, err) + + for _, receiver := range tc.receivers { + err := cfg.CreateReceiver(receiver) + require.NoError(t, err) + } + + for ruleID, receiverNames := range tc.ruleIDToReceivers { + err = cfg.CreateRuleIDMatcher(ruleID, receiverNames) + require.NoError(t, err) + } + + for _, ruleID := range tc.ruleIDsToDelete { + err = cfg.DeleteRuleIDMatcher(ruleID) + assert.NoError(t, err) + } + + routes, err := json.Marshal(cfg.alertmanagerConfig.Route.Routes) + require.NoError(t, err) + var actualRoutes []map[string]any + err = json.Unmarshal(routes, &actualRoutes) + require.NoError(t, err) + assert.ElementsMatch(t, tc.expectedRoutes, actualRoutes) + }) + } +} diff --git a/pkg/types/alertmanagertypes/receiver.go b/pkg/types/alertmanagertypes/receiver.go index 12c4383b4e..ab13feb17c 100644 --- a/pkg/types/alertmanagertypes/receiver.go +++ b/pkg/types/alertmanagertypes/receiver.go @@ -29,16 +29,17 @@ func NewReceiver(input string) (Receiver, error) { return receiver, nil } +func newRouteFromReceiver(receiver Receiver) *config.Route { + return &config.Route{Receiver: receiver.Name, Continue: true} +} + func NewReceiverIntegrations(nc Receiver, tmpl *template.Template, logger *slog.Logger) ([]notify.Integration, error) { return receiver.BuildReceiverIntegrations(nc, tmpl, logger) } -func TestReceiver(ctx context.Context, receiver Receiver, tmpl *template.Template, logger *slog.Logger) error { - now := time.Now() - testAlert := NewTestAlert(receiver, now, now) - - ctx = notify.WithGroupKey(ctx, fmt.Sprintf("%s-%s-%d", receiver.Name, testAlert.Labels.Fingerprint(), now.Unix())) - ctx = notify.WithGroupLabels(ctx, testAlert.Labels) +func TestReceiver(ctx context.Context, receiver Receiver, tmpl *template.Template, logger *slog.Logger, alert *Alert) error { + ctx = notify.WithGroupKey(ctx, fmt.Sprintf("%s-%s-%d", receiver.Name, alert.Labels.Fingerprint(), time.Now().Unix())) + ctx = notify.WithGroupLabels(ctx, alert.Labels) ctx = notify.WithReceiverName(ctx, receiver.Name) integrations, err := NewReceiverIntegrations(receiver, tmpl, logger) @@ -46,7 +47,7 @@ func TestReceiver(ctx context.Context, receiver Receiver, tmpl *template.Templat return err } - if _, err = integrations[0].Notify(ctx, testAlert); err != nil { + if _, err = integrations[0].Notify(ctx, alert); err != nil { return err }