feat(alertmanager): simplify and test e2e alertmanager (#7217)

* refactor(alertmanager): complete e2e testing and simplify

* fix(alertmanager): fix typo

* fix(alertmanager): set to true for prometheus
This commit is contained in:
Vibhu Pandey
2025-03-05 10:01:02 +05:30
committed by GitHub
parent 2117075f50
commit 8a01312967
23 changed files with 1252 additions and 436 deletions

2
.gitignore vendored
View File

@@ -76,3 +76,5 @@ dist/
# ignore user_scripts that is fetched by init-clickhouse
deploy/common/clickhouse/user_scripts/
queries.active

3
go.mod
View File

@@ -55,6 +55,7 @@ require (
github.com/soheilhy/cmux v0.1.5
github.com/srikanthccv/ClickHouse-go-mock v0.9.0
github.com/stretchr/testify v1.10.0
github.com/tidwall/gjson v1.18.0
github.com/uptrace/bun v1.2.9
github.com/uptrace/bun/dialect/pgdialect v1.2.9
github.com/uptrace/bun/dialect/sqlitedialect v1.2.9
@@ -204,6 +205,8 @@ require (
github.com/smarty/assertions v1.15.0 // indirect
github.com/spf13/cobra v1.8.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
github.com/tklauser/go-sysconf v0.3.13 // indirect
github.com/tklauser/numcpus v0.7.0 // indirect
github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc // indirect

6
go.sum
View File

@@ -891,7 +891,13 @@ github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/subosito/gotenv v1.4.1/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0=
github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY=
github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tklauser/go-sysconf v0.3.13 h1:GBUpcahXSpR2xN01jhkNAbTLRk2Yzgggk8IM08lq3r4=
github.com/tklauser/go-sysconf v0.3.13/go.mod h1:zwleP4Q4OehZHGn4CYZDipCgg9usW5IJePewFCGVEa0=
github.com/tklauser/numcpus v0.7.0 h1:yjuerZP127QG9m5Zh/mSO4wqurYil27tHrqwRoRjpr4=

View File

@@ -15,7 +15,7 @@ var (
type Alertmanager interface {
factory.Service
// GetAlerts gets the alerts from the alertmanager per organization.
GetAlerts(context.Context, string, alertmanagertypes.GettableAlertsParams) (alertmanagertypes.GettableAlerts, error)
GetAlerts(context.Context, string, alertmanagertypes.GettableAlertsParams) (alertmanagertypes.DeprecatedGettableAlerts, error)
// PutAlerts puts the alerts into the alertmanager per organization.
PutAlerts(context.Context, string, alertmanagertypes.PostableAlerts) error
@@ -23,18 +23,30 @@ type Alertmanager interface {
// TestReceiver sends a test alert to a receiver.
TestReceiver(context.Context, string, alertmanagertypes.Receiver) error
// TestAlert sends an alert to a list of receivers.
TestAlert(ctx context.Context, orgID string, alert *alertmanagertypes.PostableAlert, receivers []string) error
// ListChannels lists all channels for the organization.
ListChannels(context.Context, string) ([]*alertmanagertypes.Channel, error)
// ListAllChannels lists all channels for all organizations. It is used by the legacy alertmanager only.
ListAllChannels(context.Context) ([]*alertmanagertypes.Channel, error)
// GetChannelByID gets a channel for the organization.
GetChannelByID(context.Context, string, int) (*alertmanagertypes.Channel, error)
// UpdateChannel updates a channel for the organization.
UpdateChannelByReceiver(context.Context, string, alertmanagertypes.Receiver) error
UpdateChannelByReceiverAndID(context.Context, string, alertmanagertypes.Receiver, int) error
// CreateChannel creates a channel for the organization.
CreateChannel(context.Context, string, alertmanagertypes.Receiver) error
// DeleteChannelByID deletes a channel for the organization.
DeleteChannelByID(context.Context, string, int) error
// SetConfig sets the config for the organization.
SetConfig(context.Context, *alertmanagertypes.Config) error
// GetConfig gets the config for the organization.
GetConfig(context.Context, string) (*alertmanagertypes.Config, error)
}

View File

@@ -9,9 +9,11 @@ import (
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
)
// Notifier is responsible for dispatching alert notifications to an alertmanager.
// Batcher is responsible for batching alerts and broadcasting them on a channel.
type Batcher struct {
// C is the channel on which alerts are sent to alertmanager
C chan alertmanagertypes.PostableAlerts
// logger
logger *slog.Logger
@@ -23,14 +25,21 @@ type Batcher struct {
// more channel to signal the sender goroutine to send alerts
moreC chan struct{}
// stop channel to signal the sender goroutine to stop
stopC chan struct{}
mtx sync.RWMutex
// mutex to synchronize access to the queue
queueMtx sync.RWMutex
// wait group to wait for all goroutines to finish
goroutinesWg sync.WaitGroup
}
func New(logger *slog.Logger, config Config) *Batcher {
batcher := &Batcher{
logger: logger,
queue: make(alertmanagertypes.PostableAlerts, config.Capacity),
queue: make(alertmanagertypes.PostableAlerts, 0, config.Capacity),
config: config,
moreC: make(chan struct{}, 1),
stopC: make(chan struct{}),
@@ -41,22 +50,27 @@ func New(logger *slog.Logger, config Config) *Batcher {
}
// Start dispatches notifications continuously.
func (n *Batcher) Start(ctx context.Context) error {
func (batcher *Batcher) Start(ctx context.Context) error {
batcher.goroutinesWg.Add(1)
go func() {
n.logger.InfoContext(ctx, "starting alertmanager batcher")
defer batcher.goroutinesWg.Done()
for {
select {
case <-ctx.Done():
case <-batcher.stopC:
for batcher.queueLen() > 0 {
alerts := batcher.next()
batcher.C <- alerts
}
close(batcher.C)
return
case <-n.stopC:
return
case <-n.moreC:
case <-batcher.moreC:
}
alerts := n.nextBatch()
n.C <- alerts
alerts := batcher.next()
batcher.C <- alerts
// If the queue still has items left, kick off the next iteration.
if n.queueLen() > 0 {
n.setMore()
if batcher.queueLen() > 0 {
batcher.setMore()
}
}
}()
@@ -64,69 +78,67 @@ func (n *Batcher) Start(ctx context.Context) error {
return nil
}
func (n *Batcher) queueLen() int {
n.mtx.RLock()
defer n.mtx.RUnlock()
// Add queues the given alerts for processing.
func (batcher *Batcher) Add(ctx context.Context, alerts ...*alertmanagertypes.PostableAlert) {
batcher.queueMtx.Lock()
defer batcher.queueMtx.Unlock()
return len(n.queue)
// Queue capacity should be significantly larger than a single alert
// batch could be.
if d := len(alerts) - batcher.config.Capacity; d > 0 {
alerts = alerts[d:]
batcher.logger.WarnContext(ctx, "alert batch larger than queue capacity, dropping alerts", "num_dropped", d, "capacity", batcher.config.Capacity)
}
// If the queue is full, remove the oldest alerts in favor
// of newer ones.
if d := (len(batcher.queue) + len(alerts)) - batcher.config.Capacity; d > 0 {
batcher.queue = batcher.queue[d:]
batcher.logger.WarnContext(ctx, "alert batch queue full, dropping alerts", "num_dropped", d)
}
batcher.queue = append(batcher.queue, alerts...)
// Notify sending goroutine that there are alerts to be processed.
batcher.setMore()
}
func (n *Batcher) nextBatch() alertmanagertypes.PostableAlerts {
n.mtx.Lock()
defer n.mtx.Unlock()
// Stop shuts down the batcher.
func (batcher *Batcher) Stop(ctx context.Context) {
close(batcher.stopC)
batcher.goroutinesWg.Wait()
}
func (batcher *Batcher) queueLen() int {
batcher.queueMtx.RLock()
defer batcher.queueMtx.RUnlock()
return len(batcher.queue)
}
func (batcher *Batcher) next() alertmanagertypes.PostableAlerts {
batcher.queueMtx.Lock()
defer batcher.queueMtx.Unlock()
var alerts alertmanagertypes.PostableAlerts
if len(n.queue) > n.config.Size {
alerts = append(make(alertmanagertypes.PostableAlerts, 0, n.config.Size), n.queue[:n.config.Size]...)
n.queue = n.queue[n.config.Size:]
if len(batcher.queue) > batcher.config.Size {
alerts = append(make(alertmanagertypes.PostableAlerts, 0, batcher.config.Size), batcher.queue[:batcher.config.Size]...)
batcher.queue = batcher.queue[batcher.config.Size:]
} else {
alerts = append(make(alertmanagertypes.PostableAlerts, 0, len(n.queue)), n.queue...)
n.queue = n.queue[:0]
alerts = append(make(alertmanagertypes.PostableAlerts, 0, len(batcher.queue)), batcher.queue...)
batcher.queue = batcher.queue[:0]
}
return alerts
}
// Send queues the given notification requests for processing.
// Panics if called on a handler that is not running.
func (n *Batcher) Send(ctx context.Context, alerts ...*alertmanagertypes.PostableAlert) {
n.mtx.Lock()
defer n.mtx.Unlock()
// Queue capacity should be significantly larger than a single alert
// batch could be.
if d := len(alerts) - n.config.Capacity; d > 0 {
alerts = alerts[d:]
n.logger.WarnContext(ctx, "Alert batch larger than queue capacity, dropping alerts", "num_dropped", d)
}
// If the queue is full, remove the oldest alerts in favor
// of newer ones.
if d := (len(n.queue) + len(alerts)) - n.config.Capacity; d > 0 {
n.queue = n.queue[d:]
n.logger.WarnContext(ctx, "Alert notification queue full, dropping alerts", "num_dropped", d)
}
n.queue = append(n.queue, alerts...)
// Notify sending goroutine that there are alerts to be processed.
n.setMore()
}
// setMore signals that the alert queue has items.
func (n *Batcher) setMore() {
func (batcher *Batcher) setMore() {
// If we cannot send on the channel, it means the signal already exists
// and has not been consumed yet.
select {
case n.moreC <- struct{}{}:
case batcher.moreC <- struct{}{}:
default:
}
}
// Stop shuts down the notification handler.
func (n *Batcher) Stop(ctx context.Context) {
n.logger.InfoContext(ctx, "Stopping alertmanager batcher")
close(n.moreC)
close(n.stopC)
}

View File

@@ -0,0 +1,64 @@
package alertmanagerbatcher
import (
"context"
"io"
"log/slog"
"testing"
"github.com/stretchr/testify/assert"
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
)
func TestBatcherWithOneAlertAndDefaultConfigs(t *testing.T) {
batcher := New(slog.New(slog.NewTextHandler(io.Discard, nil)), NewConfig())
batcher.Start(context.Background())
batcher.Add(context.Background(), &alertmanagertypes.PostableAlert{Alert: alertmanagertypes.AlertModel{
Labels: map[string]string{"alertname": "test"},
}})
alerts := <-batcher.C
assert.Equal(t, 1, len(alerts))
batcher.Stop(context.Background())
}
func TestBatcherWithBatchSize(t *testing.T) {
batcher := New(slog.New(slog.NewTextHandler(io.Discard, nil)), Config{Size: 2, Capacity: 4})
batcher.Start(context.Background())
var alerts alertmanagertypes.PostableAlerts
for i := 0; i < 4; i++ {
alerts = append(alerts, &alertmanagertypes.PostableAlert{Alert: alertmanagertypes.AlertModel{
Labels: map[string]string{"alertname": "test"},
}})
}
batcher.Add(context.Background(), alerts...)
for i := 0; i < 2; i++ {
alerts := <-batcher.C
assert.Equal(t, 2, len(alerts))
}
batcher.Stop(context.Background())
}
func TestBatcherWithCClosed(t *testing.T) {
batcher := New(slog.New(slog.NewTextHandler(io.Discard, nil)), Config{Size: 2, Capacity: 4})
batcher.Start(context.Background())
var alerts alertmanagertypes.PostableAlerts
for i := 0; i < 4; i++ {
alerts = append(alerts, &alertmanagertypes.PostableAlert{Alert: alertmanagertypes.AlertModel{
Labels: map[string]string{"alertname": "test"},
}})
}
batcher.Add(context.Background(), alerts...)
batcher.Stop(context.Background())
for alerts := range batcher.C {
assert.Equal(t, 2, len(alerts))
}
}

View File

@@ -1,13 +1,16 @@
package alertmanagerbatcher
type Config struct {
// Capacity is the maximum number of alerts that can be buffered in the batcher.
Capacity int
Size int
// Size is the number of alerts to send in each batch.
Size int
}
func NewConfig() Config {
return Config{
Capacity: 1000,
Capacity: 10000,
Size: 64,
}
}

View File

@@ -66,7 +66,7 @@ type Server struct {
func New(ctx context.Context, logger *slog.Logger, registry prometheus.Registerer, srvConfig Config, orgID string, stateStore alertmanagertypes.StateStore) (*Server, error) {
server := &Server{
logger: logger.With("pkg", "go.signoz.io/pkg/alertmanager/server"),
logger: logger.With("pkg", "go.signoz.io/pkg/alertmanager/alertmanagerserver"),
registry: registry,
srvConfig: srvConfig,
orgID: orgID,
@@ -84,7 +84,10 @@ func New(ctx context.Context, logger *slog.Logger, registry prometheus.Registere
silencesSnapshot := ""
if state != nil {
silencesSnapshot = state.Silences
silencesSnapshot, err = state.Get(alertmanagertypes.SilenceStateName)
if err != nil && !errors.Ast(err, errors.TypeNotFound) {
return nil, err
}
}
// Initialize silences
server.silences, err = silence.New(silence.Options{
@@ -103,7 +106,10 @@ func New(ctx context.Context, logger *slog.Logger, registry prometheus.Registere
nflogSnapshot := ""
if state != nil {
nflogSnapshot = state.NFLog
nflogSnapshot, err = state.Get(alertmanagertypes.NFLogStateName)
if err != nil && !errors.Ast(err, errors.TypeNotFound) {
return nil, err
}
}
// Initialize notification log
@@ -308,7 +314,51 @@ func (server *Server) SetConfig(ctx context.Context, alertmanagerConfig *alertma
}
func (server *Server) TestReceiver(ctx context.Context, receiver alertmanagertypes.Receiver) error {
return alertmanagertypes.TestReceiver(ctx, receiver, server.tmpl, server.logger)
return alertmanagertypes.TestReceiver(ctx, receiver, server.tmpl, server.logger, alertmanagertypes.NewTestAlert(receiver, time.Now(), time.Now()))
}
func (server *Server) TestAlert(ctx context.Context, postableAlert *alertmanagertypes.PostableAlert, receivers []string) error {
alerts, err := alertmanagertypes.NewAlertsFromPostableAlerts(alertmanagertypes.PostableAlerts{postableAlert}, time.Duration(server.srvConfig.Global.ResolveTimeout), time.Now())
if err != nil {
return errors.Join(err...)
}
if len(alerts) != 1 {
return errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "expected 1 alert, got %d", len(alerts))
}
ch := make(chan error, len(receivers))
for _, receiverName := range receivers {
go func(receiverName string) {
receiver, err := server.alertmanagerConfig.GetReceiver(receiverName)
if err != nil {
ch <- err
return
}
ch <- alertmanagertypes.TestReceiver(ctx, receiver, server.tmpl, server.logger, alerts[0])
}(receiverName)
}
var errs []error
for i := 0; i < len(receivers); i++ {
if err := <-ch; err != nil {
errs = append(errs, err)
}
}
if errs != nil {
return errors.Join(errs...)
}
return nil
}
func (server *Server) Hash() string {
if server.alertmanagerConfig == nil {
return ""
}
return server.alertmanagerConfig.StoreableConfig().Hash
}
func (server *Server) Stop(ctx context.Context) error {

View File

@@ -87,7 +87,7 @@ func TestServerPutAlerts(t *testing.T) {
amConfig, err := alertmanagertypes.NewDefaultConfig(srvCfg.Global, srvCfg.Route, "1")
require.NoError(t, err)
require.NoError(t, amConfig.CreateReceiver(&config.Route{Receiver: "test-receiver", Continue: true}, alertmanagertypes.Receiver{
require.NoError(t, amConfig.CreateReceiver(alertmanagertypes.Receiver{
Name: "test-receiver",
WebhookConfigs: []*config.WebhookConfig{
{

View File

@@ -3,7 +3,10 @@ package sqlalertmanagerstore
import (
"context"
"database/sql"
"strconv"
"github.com/tidwall/gjson"
"github.com/uptrace/bun"
"go.signoz.io/signoz/pkg/errors"
"go.signoz.io/signoz/pkg/sqlstore"
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
@@ -45,46 +48,20 @@ func (store *config) Get(ctx context.Context, orgID string) (*alertmanagertypes.
}
// Set implements alertmanagertypes.ConfigStore.
func (store *config) Set(ctx context.Context, config *alertmanagertypes.Config, cb func(context.Context) error) error {
tx, err := store.sqlstore.BunDB().BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback() //nolint:errcheck
if _, err = tx.
func (store *config) Set(ctx context.Context, config *alertmanagertypes.Config) error {
if _, err := store.
sqlstore.
BunDB().
NewInsert().
Model(config.StoreableConfig()).
On("CONFLICT (org_id) DO UPDATE").
Set("config = ?", string(config.StoreableConfig().Config)).
Set("config = ?", config.StoreableConfig().Config).
Set("hash = ?", config.StoreableConfig().Hash).
Set("updated_at = ?", config.StoreableConfig().UpdatedAt).
Exec(ctx); err != nil {
return err
}
channels := config.Channels()
if len(channels) != 0 {
if _, err = tx.NewInsert().
Model(&channels).
On("CONFLICT (name) DO UPDATE").
Set("data = EXCLUDED.data").
Set("updated_at = EXCLUDED.updated_at").
Exec(ctx); err != nil {
return err
}
}
if cb != nil {
if err = cb(ctx); err != nil {
return err
}
}
if err = tx.Commit(); err != nil {
return err
}
return nil
}
@@ -104,3 +81,176 @@ func (store *config) ListOrgs(ctx context.Context) ([]string, error) {
return orgIDs, nil
}
func (store *config) CreateChannel(ctx context.Context, channel *alertmanagertypes.Channel, cb func(context.Context) error) error {
tx, err := store.sqlstore.BunDB().BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback() //nolint:errcheck
if _, err = tx.NewInsert().
Model(channel).
Exec(ctx); err != nil {
return err
}
if cb != nil {
if err = cb(ctx); err != nil {
return err
}
}
if err = tx.Commit(); err != nil {
return err
}
return nil
}
func (store *config) GetChannelByID(ctx context.Context, orgID string, id int) (*alertmanagertypes.Channel, error) {
channel := new(alertmanagertypes.Channel)
err := store.
sqlstore.
BunDB().
NewSelect().
Model(channel).
Where("org_id = ?", orgID).
Where("id = ?", id).
Scan(ctx)
if err != nil {
if err == sql.ErrNoRows {
return nil, errors.Newf(errors.TypeNotFound, alertmanagertypes.ErrCodeAlertmanagerChannelNotFound, "cannot find channel with id %d", id)
}
return nil, err
}
return channel, nil
}
func (store *config) UpdateChannel(ctx context.Context, orgID string, channel *alertmanagertypes.Channel, cb func(context.Context) error) error {
tx, err := store.sqlstore.BunDB().BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback() //nolint:errcheck
_, err = tx.NewUpdate().
Model(channel).
WherePK().
Exec(ctx)
if err != nil {
return err
}
if cb != nil {
if err = cb(ctx); err != nil {
return err
}
}
if err = tx.Commit(); err != nil {
return err
}
return nil
}
func (store *config) DeleteChannelByID(ctx context.Context, orgID string, id int, cb func(context.Context) error) error {
channel := new(alertmanagertypes.Channel)
tx, err := store.sqlstore.BunDB().BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback() //nolint:errcheck
_, err = tx.NewDelete().
Model(channel).
Where("org_id = ?", orgID).
Where("id = ?", id).
Exec(ctx)
if err != nil {
return err
}
if cb != nil {
if err = cb(ctx); err != nil {
return err
}
}
if err = tx.Commit(); err != nil {
return err
}
return nil
}
func (store *config) ListChannels(ctx context.Context, orgID string) ([]*alertmanagertypes.Channel, error) {
var channels []*alertmanagertypes.Channel
err := store.
sqlstore.
BunDB().
NewSelect().
Model(&channels).
Where("org_id = ?", orgID).
Scan(ctx)
if err != nil {
return nil, err
}
return channels, nil
}
func (store *config) ListAllChannels(ctx context.Context) ([]*alertmanagertypes.Channel, error) {
var channels []*alertmanagertypes.Channel
err := store.
sqlstore.
BunDB().
NewSelect().
Model(&channels).
Scan(ctx)
if err != nil {
return nil, err
}
return channels, nil
}
func (store *config) GetMatchers(ctx context.Context, orgID string) (map[string][]string, error) {
type matcher struct {
bun.BaseModel `bun:"table:rules"`
ID int `bun:"id,pk"`
Data string `bun:"data"`
}
matchers := []matcher{}
err := store.
sqlstore.
BunDB().
NewSelect().
Column("id", "data").
Model(&matchers).
Scan(ctx)
if err != nil {
return nil, err
}
matchersMap := make(map[string][]string)
for _, matcher := range matchers {
receivers := gjson.Get(matcher.Data, "preferredChannels").Array()
for _, receiver := range receivers {
matchersMap[strconv.Itoa(matcher.ID)] = append(matchersMap[strconv.Itoa(matcher.ID)], receiver.String())
}
}
return matchersMap, nil
}

View File

@@ -24,7 +24,7 @@ func NewAPI(alertmanager Alertmanager) *API {
}
}
func (api *API) GetAlerts(req *http.Request, rw http.ResponseWriter) {
func (api *API) GetAlerts(rw http.ResponseWriter, req *http.Request) {
ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
defer cancel()
@@ -49,7 +49,7 @@ func (api *API) GetAlerts(req *http.Request, rw http.ResponseWriter) {
render.Success(rw, http.StatusOK, alerts)
}
func (api *API) TestReceiver(req *http.Request, rw http.ResponseWriter) {
func (api *API) TestReceiver(rw http.ResponseWriter, req *http.Request) {
ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
defer cancel()
@@ -81,7 +81,7 @@ func (api *API) TestReceiver(req *http.Request, rw http.ResponseWriter) {
render.Success(rw, http.StatusNoContent, nil)
}
func (api *API) ListChannels(req *http.Request, rw http.ResponseWriter) {
func (api *API) ListChannels(rw http.ResponseWriter, req *http.Request) {
ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
defer cancel()
@@ -100,7 +100,20 @@ func (api *API) ListChannels(req *http.Request, rw http.ResponseWriter) {
render.Success(rw, http.StatusOK, channels)
}
func (api *API) GetChannelByID(req *http.Request, rw http.ResponseWriter) {
func (api *API) ListAllChannels(rw http.ResponseWriter, req *http.Request) {
ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
defer cancel()
channels, err := api.alertmanager.ListAllChannels(ctx)
if err != nil {
render.Error(rw, err)
return
}
render.Success(rw, http.StatusOK, channels)
}
func (api *API) GetChannelByID(rw http.ResponseWriter, req *http.Request) {
ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
defer cancel()
@@ -137,7 +150,7 @@ func (api *API) GetChannelByID(req *http.Request, rw http.ResponseWriter) {
render.Success(rw, http.StatusOK, channel)
}
func (api *API) UpdateChannelByID(req *http.Request, rw http.ResponseWriter) {
func (api *API) UpdateChannelByID(rw http.ResponseWriter, req *http.Request) {
ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
defer cancel()
@@ -147,6 +160,24 @@ func (api *API) UpdateChannelByID(req *http.Request, rw http.ResponseWriter) {
return
}
vars := mux.Vars(req)
if vars == nil {
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "id is required in path"))
return
}
idString, ok := vars["id"]
if !ok {
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "id is required in path"))
return
}
id, err := strconv.Atoi(idString)
if err != nil {
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "id is not a valid integer"))
return
}
body, err := io.ReadAll(req.Body)
if err != nil {
render.Error(rw, err)
@@ -160,7 +191,7 @@ func (api *API) UpdateChannelByID(req *http.Request, rw http.ResponseWriter) {
return
}
err = api.alertmanager.UpdateChannelByReceiver(ctx, claims.OrgID, receiver)
err = api.alertmanager.UpdateChannelByReceiverAndID(ctx, claims.OrgID, receiver, id)
if err != nil {
render.Error(rw, err)
return
@@ -169,7 +200,7 @@ func (api *API) UpdateChannelByID(req *http.Request, rw http.ResponseWriter) {
render.Success(rw, http.StatusNoContent, nil)
}
func (api *API) DeleteChannelByID(req *http.Request, rw http.ResponseWriter) {
func (api *API) DeleteChannelByID(rw http.ResponseWriter, req *http.Request) {
ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
defer cancel()
@@ -206,7 +237,7 @@ func (api *API) DeleteChannelByID(req *http.Request, rw http.ResponseWriter) {
render.Success(rw, http.StatusNoContent, nil)
}
func (api *API) CreateChannel(req *http.Request, rw http.ResponseWriter) {
func (api *API) CreateChannel(rw http.ResponseWriter, req *http.Request) {
ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
defer cancel()

View File

@@ -1,6 +1,8 @@
package alertmanager
import (
"errors"
"fmt"
"net/url"
"time"
@@ -9,9 +11,6 @@ import (
)
type Config struct {
// Config is the config for the alertmanager server.
alertmanagerserver.Config `mapstructure:",squash"`
// Provider is the provider for the alertmanager service.
Provider string `mapstructure:"provider"`
@@ -25,11 +24,14 @@ type Config struct {
type Signoz struct {
// PollInterval is the interval at which the alertmanager is synced.
PollInterval time.Duration `mapstructure:"poll_interval"`
// Config is the config for the alertmanager server.
alertmanagerserver.Config `mapstructure:",squash"`
}
type Legacy struct {
// URL is the URL of the legacy alertmanager.
URL *url.URL `mapstructure:"url"`
// ApiURL is the URL of the legacy signoz alertmanager.
ApiURL string `mapstructure:"api_url"`
}
func NewConfigFactory() factory.ConfigFactory {
@@ -38,14 +40,28 @@ func NewConfigFactory() factory.ConfigFactory {
func newConfig() factory.Config {
return Config{
Config: alertmanagerserver.NewConfig(),
Provider: "signoz",
Provider: "legacy",
Legacy: Legacy{
ApiURL: "http://alertmanager:9093/api",
},
Signoz: Signoz{
PollInterval: 15 * time.Second,
Config: alertmanagerserver.NewConfig(),
},
}
}
func (c Config) Validate() error {
if c.Provider == "legacy" {
if c.Legacy.ApiURL == "" {
return errors.New("api_url is required")
}
_, err := url.Parse(c.Legacy.ApiURL)
if err != nil {
return fmt.Errorf("api_url %q is invalid: %w", c.Legacy.ApiURL, err)
}
}
return nil
}

View File

@@ -0,0 +1,48 @@
package alertmanager
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.signoz.io/signoz/pkg/config"
"go.signoz.io/signoz/pkg/config/envprovider"
"go.signoz.io/signoz/pkg/factory"
)
func TestNewWithEnvProvider(t *testing.T) {
t.Setenv("SIGNOZ_ALERTMANAGER_PROVIDER", "legacy")
t.Setenv("SIGNOZ_ALERTMANAGER_LEGACY_API__URL", "http://localhost:9093/api")
conf, err := config.New(
context.Background(),
config.ResolverConfig{
Uris: []string{"env:"},
ProviderFactories: []config.ProviderFactory{
envprovider.NewFactory(),
},
},
[]factory.ConfigFactory{
NewConfigFactory(),
},
)
require.NoError(t, err)
actual := &Config{}
err = conf.Unmarshal("alertmanager", actual)
require.NoError(t, err)
def := NewConfigFactory().New().(Config)
expected := &Config{
Provider: "legacy",
Legacy: Legacy{
ApiURL: "http://localhost:9093/api",
},
Signoz: def.Signoz,
}
assert.Equal(t, expected, actual)
assert.NoError(t, actual.Validate())
}

View File

@@ -7,8 +7,10 @@ import (
"fmt"
"io"
"net/http"
"net/url"
"time"
"github.com/tidwall/gjson"
"go.signoz.io/signoz/pkg/alertmanager"
"go.signoz.io/signoz/pkg/alertmanager/alertmanagerbatcher"
"go.signoz.io/signoz/pkg/alertmanager/alertmanagerstore/sqlalertmanagerstore"
@@ -17,6 +19,11 @@ import (
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
)
type postableAlert struct {
*alertmanagertypes.PostableAlert
Receivers []string `json:"receivers"`
}
const (
alertsPath string = "/v1/alerts"
routesPath string = "/v1/routes"
@@ -29,6 +36,7 @@ type provider struct {
client *http.Client
configStore alertmanagertypes.ConfigStore
batcher *alertmanagerbatcher.Batcher
url *url.URL
}
func NewFactory(sqlstore sqlstore.SQLStore) factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config] {
@@ -41,6 +49,11 @@ func New(ctx context.Context, providerSettings factory.ProviderSettings, config
settings := factory.NewScopedProviderSettings(providerSettings, "go.signoz.io/signoz/pkg/alertmanager/legacyalertmanager")
configStore := sqlalertmanagerstore.NewConfigStore(sqlstore)
url, err := url.Parse(config.Legacy.ApiURL)
if err != nil {
return nil, err
}
return &provider{
config: config,
settings: settings,
@@ -49,6 +62,7 @@ func New(ctx context.Context, providerSettings factory.ProviderSettings, config
},
configStore: configStore,
batcher: alertmanagerbatcher.New(settings.Logger(), alertmanagerbatcher.NewConfig()),
url: url,
}, nil
}
@@ -57,22 +71,18 @@ func (provider *provider) Start(ctx context.Context) error {
if err != nil {
return err
}
defer provider.batcher.Stop(ctx)
for {
select {
case <-ctx.Done():
return nil
case alerts := <-provider.batcher.C:
if err := provider.putAlerts(ctx, "", alerts); err != nil {
provider.settings.Logger().Error("failed to send alerts to alertmanager", "error", err)
}
for alerts := range provider.batcher.C {
if err := provider.putAlerts(ctx, "", alerts); err != nil {
provider.settings.Logger().Error("failed to send alerts to alertmanager", "error", err)
}
}
return nil
}
func (provider *provider) GetAlerts(ctx context.Context, orgID string, params alertmanagertypes.GettableAlertsParams) (alertmanagertypes.GettableAlerts, error) {
url := provider.config.Legacy.URL.JoinPath(alertsPath)
func (provider *provider) GetAlerts(ctx context.Context, orgID string, params alertmanagertypes.GettableAlertsParams) (alertmanagertypes.DeprecatedGettableAlerts, error) {
url := provider.url.JoinPath(alertsPath)
url.RawQuery = params.RawQuery
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url.String(), nil)
@@ -92,23 +102,45 @@ func (provider *provider) GetAlerts(ctx context.Context, orgID string, params al
return nil, err
}
var alerts alertmanagertypes.GettableAlerts
if err := json.Unmarshal(body, &alerts); err != nil {
if resp.StatusCode/100 != 2 {
return nil, fmt.Errorf("bad response status %v", resp.Status)
}
var alerts alertmanagertypes.DeprecatedGettableAlerts
if err := json.Unmarshal([]byte(gjson.GetBytes(body, "data").Raw), &alerts); err != nil {
return nil, err
}
return alerts, nil
}
func (provider *provider) PutAlerts(ctx context.Context, _ string, alerts alertmanagertypes.PostableAlerts) error {
provider.batcher.Send(ctx, alerts...)
func (provider *provider) PutAlerts(ctx context.Context, orgID string, alerts alertmanagertypes.PostableAlerts) error {
provider.batcher.Add(ctx, alerts...)
return nil
}
func (provider *provider) putAlerts(ctx context.Context, _ string, alerts alertmanagertypes.PostableAlerts) error {
url := provider.config.Legacy.URL.JoinPath(alertsPath)
func (provider *provider) putAlerts(ctx context.Context, orgID string, alerts alertmanagertypes.PostableAlerts) error {
cfg, err := provider.configStore.Get(ctx, orgID)
if err != nil {
return err
}
body, err := json.Marshal(alerts)
legacyAlerts := make([]postableAlert, len(alerts))
for i, alert := range alerts {
receivers, err := cfg.ReceiverNamesFromRuleID(alert.Alert.Labels["ruleID"])
if err != nil {
return err
}
legacyAlerts[i] = postableAlert{
PostableAlert: alert,
Receivers: receivers,
}
}
url := provider.url.JoinPath(alertsPath)
body, err := json.Marshal(legacyAlerts)
if err != nil {
return err
}
@@ -135,7 +167,7 @@ func (provider *provider) putAlerts(ctx context.Context, _ string, alerts alertm
}
func (provider *provider) TestReceiver(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver) error {
url := provider.config.Legacy.URL.JoinPath(testReceiverPath)
url := provider.url.JoinPath(testReceiverPath)
body, err := json.Marshal(receiver)
if err != nil {
@@ -163,49 +195,65 @@ func (provider *provider) TestReceiver(ctx context.Context, orgID string, receiv
return nil
}
func (provider *provider) ListChannels(ctx context.Context, orgID string) ([]*alertmanagertypes.Channel, error) {
config, err := provider.configStore.Get(ctx, orgID)
func (provider *provider) TestAlert(ctx context.Context, orgID string, alert *alertmanagertypes.PostableAlert, receivers []string) error {
url := provider.url.JoinPath(alertsPath)
legacyAlert := postableAlert{
PostableAlert: alert,
Receivers: receivers,
}
body, err := json.Marshal(legacyAlert)
if err != nil {
return nil, err
return err
}
channels := config.Channels()
channelList := make([]*alertmanagertypes.Channel, 0, len(channels))
for _, channel := range channels {
channelList = append(channelList, channel)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url.String(), bytes.NewBuffer(body))
if err != nil {
return err
}
req.Header.Add("Content-Type", "application/json")
resp, err := provider.client.Do(req)
if err != nil {
return err
}
return channelList, nil
defer resp.Body.Close() //nolint:errcheck
// Any HTTP status 2xx is OK.
if resp.StatusCode/100 != 2 {
return fmt.Errorf("bad response status %v", resp.Status)
}
return nil
}
func (provider *provider) ListChannels(ctx context.Context, orgID string) ([]*alertmanagertypes.Channel, error) {
return provider.configStore.ListChannels(ctx, orgID)
}
func (provider *provider) ListAllChannels(ctx context.Context) ([]*alertmanagertypes.Channel, error) {
return provider.configStore.ListAllChannels(ctx)
}
func (provider *provider) GetChannelByID(ctx context.Context, orgID string, channelID int) (*alertmanagertypes.Channel, error) {
config, err := provider.configStore.Get(ctx, orgID)
if err != nil {
return nil, err
}
channels := config.Channels()
channel, err := alertmanagertypes.GetChannelByID(channels, channelID)
if err != nil {
return nil, err
}
return channel, nil
return provider.configStore.GetChannelByID(ctx, orgID, channelID)
}
func (provider *provider) UpdateChannelByReceiver(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver) error {
config, err := provider.configStore.Get(ctx, orgID)
func (provider *provider) UpdateChannelByReceiverAndID(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver, id int) error {
channel, err := provider.configStore.GetChannelByID(ctx, orgID, id)
if err != nil {
return err
}
err = config.UpdateReceiver(alertmanagertypes.NewRouteFromReceiver(receiver), receiver)
err = channel.Update(receiver)
if err != nil {
return err
}
err = provider.configStore.Set(ctx, config, func(ctx context.Context) error {
url := provider.config.Legacy.URL.JoinPath(routesPath)
err = provider.configStore.UpdateChannel(ctx, orgID, channel, func(ctx context.Context) error {
url := provider.url.JoinPath(routesPath)
body, err := json.Marshal(receiver)
if err != nil {
@@ -240,18 +288,10 @@ func (provider *provider) UpdateChannelByReceiver(ctx context.Context, orgID str
}
func (provider *provider) CreateChannel(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver) error {
config, err := provider.configStore.Get(ctx, orgID)
if err != nil {
return err
}
channel := alertmanagertypes.NewChannelFromReceiver(receiver, orgID)
err = config.CreateReceiver(alertmanagertypes.NewRouteFromReceiver(receiver), receiver)
if err != nil {
return err
}
err = provider.configStore.Set(ctx, config, func(ctx context.Context) error {
url := provider.config.Legacy.URL.JoinPath(routesPath)
err := provider.configStore.CreateChannel(ctx, channel, func(ctx context.Context) error {
url := provider.url.JoinPath(routesPath)
body, err := json.Marshal(receiver)
if err != nil {
@@ -286,24 +326,13 @@ func (provider *provider) CreateChannel(ctx context.Context, orgID string, recei
}
func (provider *provider) DeleteChannelByID(ctx context.Context, orgID string, channelID int) error {
config, err := provider.configStore.Get(ctx, orgID)
if err != nil {
return err
}
err := provider.configStore.DeleteChannelByID(ctx, orgID, channelID, func(ctx context.Context) error {
channel, err := provider.configStore.GetChannelByID(ctx, orgID, channelID)
if err != nil {
return err
}
channels := config.Channels()
channel, err := alertmanagertypes.GetChannelByID(channels, channelID)
if err != nil {
return err
}
err = config.DeleteReceiver(channel.Name)
if err != nil {
return err
}
err = provider.configStore.Set(ctx, config, func(ctx context.Context) error {
url := provider.config.Legacy.URL.JoinPath(routesPath)
url := provider.url.JoinPath(routesPath)
body, err := json.Marshal(map[string]string{"name": channel.Name})
if err != nil {
@@ -337,6 +366,15 @@ func (provider *provider) DeleteChannelByID(ctx context.Context, orgID string, c
return nil
}
func (provider *provider) SetConfig(ctx context.Context, config *alertmanagertypes.Config) error {
return provider.configStore.Set(ctx, config)
}
func (provider *provider) Stop(ctx context.Context) error {
provider.batcher.Stop(ctx)
return nil
}
func (provider *provider) GetConfig(ctx context.Context, orgID string) (*alertmanagertypes.Config, error) {
return provider.configStore.Get(ctx, orgID)
}

View File

@@ -12,7 +12,7 @@ import (
type Service struct {
// config is the config for the alertmanager service
config Config
config alertmanagerserver.Config
// stateStore is the state store for the alertmanager service
stateStore alertmanagertypes.StateStore
@@ -30,7 +30,7 @@ type Service struct {
serversMtx sync.RWMutex
}
func New(ctx context.Context, settings factory.ScopedProviderSettings, config Config, stateStore alertmanagertypes.StateStore, configStore alertmanagertypes.ConfigStore) *Service {
func New(ctx context.Context, settings factory.ScopedProviderSettings, config alertmanagerserver.Config, stateStore alertmanagertypes.StateStore, configStore alertmanagertypes.ConfigStore) *Service {
service := &Service{
config: config,
stateStore: stateStore,
@@ -53,13 +53,23 @@ func (service *Service) SyncServers(ctx context.Context) error {
for _, orgID := range orgIDs {
config, err := service.getConfig(ctx, orgID)
if err != nil {
service.settings.Logger().Error("failed to get alertmanagerconfig for org", "orgID", orgID, "error", err)
service.settings.Logger().Error("failed to get alertmanager config for org", "orgID", orgID, "error", err)
continue
}
service.servers[orgID], err = alertmanagerserver.New(ctx, service.settings.Logger(), service.settings.PrometheusRegisterer(), service.config.Config, orgID, service.stateStore)
if err != nil {
service.settings.Logger().Error("failed to create alertmanagerserver", "orgID", orgID, "error", err)
// If the server is not present, create it and sync the config
if _, ok := service.servers[orgID]; !ok {
server, err := service.newServer(ctx, orgID)
if err != nil {
service.settings.Logger().Error("failed to create alertmanager server", "orgID", orgID, "error", err)
continue
}
service.servers[orgID] = server
}
if service.servers[orgID].Hash() == config.StoreableConfig().Hash {
service.settings.Logger().Debug("skipping alertmanager sync for org", "orgID", orgID, "hash", config.StoreableConfig().Hash)
continue
}
@@ -74,13 +84,18 @@ func (service *Service) SyncServers(ctx context.Context) error {
return nil
}
func (service *Service) GetAlerts(ctx context.Context, orgID string, params alertmanagertypes.GettableAlertsParams) (alertmanagertypes.GettableAlerts, error) {
func (service *Service) GetAlerts(ctx context.Context, orgID string, params alertmanagertypes.GettableAlertsParams) (alertmanagertypes.DeprecatedGettableAlerts, error) {
server, err := service.getServer(orgID)
if err != nil {
return nil, err
}
return server.GetAlerts(ctx, params)
alerts, err := server.GetAlerts(ctx, params)
if err != nil {
return nil, err
}
return alertmanagertypes.NewDeprecatedGettableAlertsFromGettableAlerts(alerts), nil
}
func (service *Service) PutAlerts(ctx context.Context, orgID string, alerts alertmanagertypes.PostableAlerts) error {
@@ -101,6 +116,15 @@ func (service *Service) TestReceiver(ctx context.Context, orgID string, receiver
return server.TestReceiver(ctx, receiver)
}
func (service *Service) TestAlert(ctx context.Context, orgID string, alert *alertmanagertypes.PostableAlert, receivers []string) error {
server, err := service.getServer(orgID)
if err != nil {
return err
}
return server.TestAlert(ctx, alert, receivers)
}
func (service *Service) Stop(ctx context.Context) error {
for _, server := range service.servers {
server.Stop(ctx)
@@ -109,6 +133,36 @@ func (service *Service) Stop(ctx context.Context) error {
return nil
}
func (service *Service) newServer(ctx context.Context, orgID string) (*alertmanagerserver.Server, error) {
config, err := service.getConfig(ctx, orgID)
if err != nil {
return nil, err
}
server, err := alertmanagerserver.New(ctx, service.settings.Logger(), service.settings.PrometheusRegisterer(), service.config, orgID, service.stateStore)
if err != nil {
return nil, err
}
beforeCompareAndSelectHash := config.StoreableConfig().Hash
config, err = service.compareAndSelectConfig(ctx, config)
if err != nil {
return nil, err
}
if beforeCompareAndSelectHash == config.StoreableConfig().Hash {
service.settings.Logger().Debug("skipping config store update for org", "orgID", orgID, "hash", config.StoreableConfig().Hash)
return server, nil
}
err = service.configStore.Set(ctx, config)
if err != nil {
return nil, err
}
return server, nil
}
func (service *Service) getConfig(ctx context.Context, orgID string) (*alertmanagertypes.Config, error) {
config, err := service.configStore.Get(ctx, orgID)
if err != nil {
@@ -121,13 +175,55 @@ func (service *Service) getConfig(ctx context.Context, orgID string) (*alertmana
return nil, err
}
return config, err
config.SetGlobalConfig(service.config.Global)
if config.AlertmanagerConfig().Route == nil {
config.SetRouteConfig(service.config.Route)
} else {
config.UpdateRouteConfig(service.config.Route)
}
}
return config, nil
}
// compareAndSelectConfig compares the existing config with the config derived from channels.
// If the hash of the config and the channels mismatch, the config derived from channels is returned.
func (service *Service) compareAndSelectConfig(ctx context.Context, incomingConfig *alertmanagertypes.Config) (*alertmanagertypes.Config, error) {
channels, err := service.configStore.ListChannels(ctx, incomingConfig.StoreableConfig().OrgID)
if err != nil {
return nil, err
}
matchers, err := service.configStore.GetMatchers(ctx, incomingConfig.StoreableConfig().OrgID)
if err != nil {
return nil, err
}
config, err := alertmanagertypes.NewConfigFromChannels(service.config.Global, service.config.Route, channels, incomingConfig.StoreableConfig().OrgID)
if err != nil {
return nil, err
}
for ruleID, receivers := range matchers {
err = config.CreateRuleIDMatcher(ruleID, receivers)
if err != nil {
return nil, err
}
}
if incomingConfig.StoreableConfig().Hash != config.StoreableConfig().Hash {
service.settings.Logger().InfoContext(ctx, "mismatch found, updating config to match channels and matchers")
return config, nil
}
return incomingConfig, nil
}
func (service *Service) getServer(orgID string) (*alertmanagerserver.Server, error) {
service.serversMtx.RLock()
defer service.serversMtx.RUnlock()
server, ok := service.servers[orgID]
if !ok {
return nil, errors.Newf(errors.TypeNotFound, ErrCodeAlertmanagerNotFound, "alertmanager not found for org %s", orgID)

View File

@@ -6,6 +6,7 @@ import (
"go.signoz.io/signoz/pkg/alertmanager"
"go.signoz.io/signoz/pkg/alertmanager/alertmanagerstore/sqlalertmanagerstore"
"go.signoz.io/signoz/pkg/errors"
"go.signoz.io/signoz/pkg/factory"
"go.signoz.io/signoz/pkg/sqlstore"
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
@@ -17,6 +18,7 @@ type provider struct {
settings factory.ScopedProviderSettings
configStore alertmanagertypes.ConfigStore
stateStore alertmanagertypes.StateStore
stopC chan struct{}
}
func NewFactory(sqlstore sqlstore.SQLStore) factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config] {
@@ -26,15 +28,15 @@ func NewFactory(sqlstore sqlstore.SQLStore) factory.ProviderFactory[alertmanager
}
func New(ctx context.Context, providerSettings factory.ProviderSettings, config alertmanager.Config, sqlstore sqlstore.SQLStore) (*provider, error) {
settings := factory.NewScopedProviderSettings(providerSettings, "go.signoz.io/signoz/pkg/alertmanager/internalalertmanager")
settings := factory.NewScopedProviderSettings(providerSettings, "go.signoz.io/signoz/pkg/alertmanager/signozalertmanager")
configStore := sqlalertmanagerstore.NewConfigStore(sqlstore)
stateStore := sqlalertmanagerstore.NewStateStore(sqlstore)
return &provider{
p := &provider{
service: alertmanager.New(
ctx,
settings,
config,
config.Signoz.Config,
stateStore,
configStore,
),
@@ -42,15 +44,23 @@ func New(ctx context.Context, providerSettings factory.ProviderSettings, config
config: config,
configStore: configStore,
stateStore: stateStore,
}, nil
stopC: make(chan struct{}),
}
return p, nil
}
func (provider *provider) Start(ctx context.Context) error {
if err := provider.service.SyncServers(ctx); err != nil {
provider.settings.Logger().ErrorContext(ctx, "failed to sync alertmanager servers", "error", err)
return err
}
ticker := time.NewTicker(provider.config.Signoz.PollInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
case <-provider.stopC:
return nil
case <-ticker.C:
if err := provider.service.SyncServers(ctx); err != nil {
@@ -61,10 +71,11 @@ func (provider *provider) Start(ctx context.Context) error {
}
func (provider *provider) Stop(ctx context.Context) error {
close(provider.stopC)
return provider.service.Stop(ctx)
}
func (provider *provider) GetAlerts(ctx context.Context, orgID string, params alertmanagertypes.GettableAlertsParams) (alertmanagertypes.GettableAlerts, error) {
func (provider *provider) GetAlerts(ctx context.Context, orgID string, params alertmanagertypes.GettableAlertsParams) (alertmanagertypes.DeprecatedGettableAlerts, error) {
return provider.service.GetAlerts(ctx, orgID, params)
}
@@ -76,78 +87,68 @@ func (provider *provider) TestReceiver(ctx context.Context, orgID string, receiv
return provider.service.TestReceiver(ctx, orgID, receiver)
}
func (provider *provider) TestAlert(ctx context.Context, orgID string, alert *alertmanagertypes.PostableAlert, receivers []string) error {
return provider.service.TestAlert(ctx, orgID, alert, receivers)
}
func (provider *provider) ListChannels(ctx context.Context, orgID string) ([]*alertmanagertypes.Channel, error) {
config, err := provider.configStore.Get(ctx, orgID)
if err != nil {
return nil, err
}
return provider.configStore.ListChannels(ctx, orgID)
}
channels := config.Channels()
channelList := make([]*alertmanagertypes.Channel, 0, len(channels))
for _, channel := range channels {
channelList = append(channelList, channel)
}
return channelList, nil
func (provider *provider) ListAllChannels(ctx context.Context) ([]*alertmanagertypes.Channel, error) {
return nil, errors.Newf(errors.TypeUnsupported, errors.CodeUnsupported, "not supported by provider signoz")
}
func (provider *provider) GetChannelByID(ctx context.Context, orgID string, channelID int) (*alertmanagertypes.Channel, error) {
config, err := provider.configStore.Get(ctx, orgID)
if err != nil {
return nil, err
}
channels := config.Channels()
channel, err := alertmanagertypes.GetChannelByID(channels, channelID)
if err != nil {
return nil, err
}
return channel, nil
return provider.configStore.GetChannelByID(ctx, orgID, channelID)
}
func (provider *provider) UpdateChannelByReceiver(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver) error {
func (provider *provider) UpdateChannelByReceiverAndID(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver, id int) error {
channel, err := provider.configStore.GetChannelByID(ctx, orgID, id)
if err != nil {
return err
}
config, err := provider.configStore.Get(ctx, orgID)
if err != nil {
return err
}
err = config.UpdateReceiver(alertmanagertypes.NewRouteFromReceiver(receiver), receiver)
if err != nil {
if err := config.UpdateReceiver(receiver); err != nil {
return err
}
err = provider.configStore.Set(ctx, config, nil)
if err != nil {
if err := provider.configStore.Set(ctx, config); err != nil {
return err
}
return nil
if err := channel.Update(receiver); err != nil {
return err
}
return provider.configStore.UpdateChannel(ctx, orgID, channel, alertmanagertypes.ConfigStoreNoopCallback)
}
func (provider *provider) DeleteChannelByID(ctx context.Context, orgID string, channelID int) error {
channel, err := provider.configStore.GetChannelByID(ctx, orgID, channelID)
if err != nil {
return err
}
config, err := provider.configStore.Get(ctx, orgID)
if err != nil {
return err
}
channels := config.Channels()
channel, err := alertmanagertypes.GetChannelByID(channels, channelID)
if err != nil {
if err := config.DeleteReceiver(channel.Name); err != nil {
return err
}
err = config.DeleteReceiver(channel.Name)
if err != nil {
if err := provider.configStore.Set(ctx, config); err != nil {
return err
}
err = provider.configStore.Set(ctx, config, nil)
if err != nil {
return err
}
return nil
return provider.configStore.DeleteChannelByID(ctx, orgID, channelID, alertmanagertypes.ConfigStoreNoopCallback)
}
func (provider *provider) CreateChannel(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver) error {
@@ -156,15 +157,22 @@ func (provider *provider) CreateChannel(ctx context.Context, orgID string, recei
return err
}
err = config.CreateReceiver(alertmanagertypes.NewRouteFromReceiver(receiver), receiver)
if err != nil {
if err := config.CreateReceiver(receiver); err != nil {
return err
}
err = provider.configStore.Set(ctx, config, nil)
if err != nil {
if err := provider.configStore.Set(ctx, config); err != nil {
return err
}
return nil
channel := alertmanagertypes.NewChannelFromReceiver(receiver, orgID)
return provider.configStore.CreateChannel(ctx, channel, alertmanagertypes.ConfigStoreNoopCallback)
}
func (provider *provider) SetConfig(ctx context.Context, config *alertmanagertypes.Config) error {
return provider.configStore.Set(ctx, config)
}
func (provider *provider) GetConfig(ctx context.Context, orgID string) (*alertmanagertypes.Config, error) {
return provider.configStore.Get(ctx, orgID)
}

View File

@@ -24,7 +24,15 @@ type migrator struct {
func New(ctx context.Context, providerSettings factory.ProviderSettings, sqlstore sqlstore.SQLStore, migrations *migrate.Migrations, config Config) SQLMigrator {
return &migrator{
migrator: migrate.NewMigrator(sqlstore.BunDB(), migrations, migrate.WithTableName(migrationTableName), migrate.WithLocksTableName(migrationLockTableName)),
migrator: migrate.NewMigrator(
sqlstore.BunDB(),
migrations,
migrate.WithTableName(migrationTableName),
migrate.WithLocksTableName(migrationLockTableName),
// This is to ensure that the migration is marked as applied only on success. If the migration fails, no entry is made in the migration table
// and the migration will be retried.
migrate.WithMarkAppliedOnSuccess(true),
),
settings: factory.NewScopedProviderSettings(providerSettings, "go.signoz.io/signoz/pkg/sqlmigrator"),
config: config,
dialect: sqlstore.BunDB().Dialect().Name().String(),

View File

@@ -22,6 +22,9 @@ import (
)
type (
// An alias for the Alert type from the alertmanager package.
AlertModel = models.Alert
// An alias for the Alert type from the alertmanager package.
Alert = types.Alert
@@ -38,12 +41,51 @@ type (
GettableAlerts = models.GettableAlerts
)
type DeprecatedGettableAlert struct {
*model.Alert
Status types.AlertStatus `json:"status"`
Receivers []string `json:"receivers"`
Fingerprint string `json:"fingerprint"`
}
type DeprecatedGettableAlerts = []*DeprecatedGettableAlert
// An alias for the GettableAlertsParams type from the alertmanager package.
type GettableAlertsParams struct {
alert.GetAlertsParams
RawQuery string
}
func NewDeprecatedGettableAlertsFromGettableAlerts(gettableAlerts GettableAlerts) DeprecatedGettableAlerts {
deprecatedGettableAlerts := make(DeprecatedGettableAlerts, 0, len(gettableAlerts))
for _, gettableAlert := range gettableAlerts {
receivers := make([]string, 0, len(gettableAlert.Receivers))
for _, receiver := range gettableAlert.Receivers {
receivers = append(receivers, *receiver.Name)
}
deprecatedGettableAlerts = append(deprecatedGettableAlerts, &DeprecatedGettableAlert{
Alert: &model.Alert{
Labels: v2.APILabelSetToModelLabelSet(gettableAlert.Labels),
Annotations: v2.APILabelSetToModelLabelSet(gettableAlert.Annotations),
StartsAt: time.Time(*gettableAlert.StartsAt),
EndsAt: time.Time(*gettableAlert.EndsAt),
GeneratorURL: string(gettableAlert.GeneratorURL),
},
Status: types.AlertStatus{
State: types.AlertState(*gettableAlert.Status.State),
SilencedBy: gettableAlert.Status.SilencedBy,
InhibitedBy: gettableAlert.Status.InhibitedBy,
},
Fingerprint: *gettableAlert.Fingerprint,
Receivers: receivers,
})
}
return deprecatedGettableAlerts
}
// Converts a slice of Alert to a slice of PostableAlert.
func NewPostableAlertsFromAlerts(alerts []*types.Alert) PostableAlerts {
postableAlerts := make(PostableAlerts, 0, len(alerts))

View File

@@ -12,7 +12,8 @@ import (
)
var (
ErrCodeAlertmanagerChannelNotFound = errors.MustNewCode("alertmanager_channel_not_found")
ErrCodeAlertmanagerChannelNotFound = errors.MustNewCode("alertmanager_channel_not_found")
ErrCodeAlertmanagerChannelNameMismatch = errors.MustNewCode("alertmanager_channel_name_mismatch")
)
var (
@@ -20,7 +21,7 @@ var (
receiverTypeRegex = regexp.MustCompile(`^(.+)_configs`)
)
type Channels = map[string]*Channel
type Channels = []*Channel
type GettableChannels = []*Channel
@@ -104,20 +105,6 @@ func NewReceiverFromChannel(channel *Channel) (Receiver, error) {
return receiver, nil
}
func NewChannelsFromConfig(c *config.Config, orgID string) Channels {
channels := Channels{}
for _, receiver := range c.Receivers {
channel := NewChannelFromReceiver(receiver, orgID)
if channel == nil {
continue
}
channels[channel.Name] = channel
}
return channels
}
func NewConfigFromChannels(globalConfig GlobalConfig, routeConfig RouteConfig, channels Channels, orgID string) (*Config, error) {
cfg, err := NewDefaultConfig(
globalConfig,
@@ -134,7 +121,7 @@ func NewConfigFromChannels(globalConfig GlobalConfig, routeConfig RouteConfig, c
return nil, err
}
err = cfg.CreateReceiver(&config.Route{Receiver: channel.Name, Continue: true}, receiver)
err = cfg.CreateReceiver(receiver)
if err != nil {
return nil, err
}
@@ -143,12 +130,38 @@ func NewConfigFromChannels(globalConfig GlobalConfig, routeConfig RouteConfig, c
return cfg, nil
}
func GetChannelByID(channels Channels, id int) (*Channel, error) {
for _, channel := range channels {
func GetChannelByID(channels Channels, id int) (int, *Channel, error) {
for i, channel := range channels {
if channel.ID == id {
return channel, nil
return i, channel, nil
}
}
return nil, errors.Newf(errors.TypeNotFound, ErrCodeAlertmanagerChannelNotFound, "cannot find channel with id %d", id)
return 0, nil, errors.Newf(errors.TypeNotFound, ErrCodeAlertmanagerChannelNotFound, "cannot find channel with id %d", id)
}
func GetChannelByName(channels Channels, name string) (int, *Channel, error) {
for i, channel := range channels {
if channel.Name == name {
return i, channel, nil
}
}
return 0, nil, errors.Newf(errors.TypeNotFound, ErrCodeAlertmanagerChannelNotFound, "cannot find channel with name %s", name)
}
func (c *Channel) Update(receiver Receiver) error {
channel := NewChannelFromReceiver(receiver, c.OrgID)
if channel == nil {
return errors.Newf(errors.TypeInvalidInput, ErrCodeAlertmanagerChannelNotFound, "cannot find channel with id %d", c.ID)
}
if c.Name != channel.Name {
return errors.Newf(errors.TypeInvalidInput, ErrCodeAlertmanagerChannelNameMismatch, "cannot update channel name")
}
c.Data = channel.Data
c.UpdatedAt = time.Now()
return nil
}

View File

@@ -10,91 +10,6 @@ import (
"github.com/stretchr/testify/assert"
)
func TestNewChannelsFromAlertmanagerConfig(t *testing.T) {
testCases := []struct {
name string
orgID string
alertmanagerConfig *config.Config
expectedChannels Channels
}{
{
name: "DefaultReceiver",
orgID: "1",
alertmanagerConfig: &config.Config{
Receivers: []config.Receiver{
{
Name: DefaultReceiverName,
},
},
},
expectedChannels: Channels{},
},
{
name: "OneEmailReceiver",
orgID: "1",
alertmanagerConfig: &config.Config{
Receivers: []config.Receiver{
{
Name: "email-receiver",
EmailConfigs: []*config.EmailConfig{
{
To: "test@example.com",
},
},
},
},
},
expectedChannels: Channels{
"email-receiver": {
Name: "email-receiver",
Type: "email",
Data: `{"name":"email-receiver","email_configs":[{"send_resolved":false,"to":"test@example.com","smarthost":""}]}`,
OrgID: "1",
},
},
},
{
name: "OneSlackReceiver",
orgID: "1",
alertmanagerConfig: &config.Config{
Receivers: []config.Receiver{
{
Name: "slack-receiver",
SlackConfigs: []*config.SlackConfig{
{
Channel: "#alerts",
},
},
},
},
},
expectedChannels: Channels{
"slack-receiver": {
Name: "slack-receiver",
Type: "slack",
Data: `{"name":"slack-receiver","slack_configs":[{"send_resolved":false,"channel":"#alerts"}]}`,
OrgID: "1",
},
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
actualChannels := NewChannelsFromConfig(tc.alertmanagerConfig, tc.orgID)
assert.Equal(t, len(tc.expectedChannels), len(actualChannels))
for _, channel := range actualChannels {
expectedChannel, ok := tc.expectedChannels[channel.Name]
assert.True(t, ok)
assert.Equal(t, expectedChannel.Name, channel.Name)
assert.Equal(t, expectedChannel.Type, channel.Type)
assert.Equal(t, expectedChannel.Data, channel.Data)
assert.Equal(t, expectedChannel.OrgID, channel.OrgID)
}
})
}
}
func TestNewConfigFromChannels(t *testing.T) {
testCases := []struct {
name string
@@ -105,7 +20,7 @@ func TestNewConfigFromChannels(t *testing.T) {
{
name: "OneEmailChannel",
channels: Channels{
"email-receiver": {
{
Name: "email-receiver",
Type: "email",
Data: `{"name":"email-receiver","email_configs":[{"to":"test@example.com"}]}`,
@@ -117,7 +32,7 @@ func TestNewConfigFromChannels(t *testing.T) {
{
name: "OneSlackChannel",
channels: Channels{
"slack-receiver": {
{
Name: "slack-receiver",
Type: "slack",
Data: `{"name":"slack-receiver","slack_configs":[{"channel":"#alerts","api_url":"https://slack.com/api/test","send_resolved":true}]}`,
@@ -129,7 +44,7 @@ func TestNewConfigFromChannels(t *testing.T) {
{
name: "OnePagerdutyChannel",
channels: Channels{
"pagerduty-receiver": {
{
Name: "pagerduty-receiver",
Type: "pagerduty",
Data: `{"name":"pagerduty-receiver","pagerduty_configs":[{"service_key":"test"}]}`,
@@ -141,12 +56,12 @@ func TestNewConfigFromChannels(t *testing.T) {
{
name: "OnePagerdutyAndOneSlackChannel",
channels: Channels{
"pagerduty-receiver": {
{
Name: "pagerduty-receiver",
Type: "pagerduty",
Data: `{"name":"pagerduty-receiver","pagerduty_configs":[{"service_key":"test"}]}`,
},
"slack-receiver": {
{
Name: "slack-receiver",
Type: "slack",
Data: `{"name":"slack-receiver","slack_configs":[{"channel":"#alerts","api_url":"https://slack.com/api/test","send_resolved":true}]}`,

View File

@@ -5,10 +5,13 @@ import (
"crypto/md5"
"encoding/json"
"fmt"
"slices"
"time"
"dario.cat/mergo"
"github.com/prometheus/alertmanager/config"
"github.com/prometheus/alertmanager/pkg/labels"
commoncfg "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/uptrace/bun"
"go.signoz.io/signoz/pkg/errors"
@@ -41,6 +44,7 @@ type StoreableConfig struct {
ID uint64 `bun:"id,pk,autoincrement"`
Config string `bun:"config"`
Hash string `bun:"hash"`
CreatedAt time.Time `bun:"created_at"`
UpdatedAt time.Time `bun:"updated_at"`
OrgID string `bun:"org_id"`
@@ -53,25 +57,19 @@ type Config struct {
// storeableConfig is the representation of the config in the store
storeableConfig *StoreableConfig
// channels is the list of channels
channels Channels
// orgID is the organization ID
orgID string
}
func NewConfig(c *config.Config, orgID string) *Config {
channels := NewChannelsFromConfig(c, orgID)
raw := string(newRawFromConfig(c))
return &Config{
alertmanagerConfig: c,
storeableConfig: &StoreableConfig{
Config: string(newRawFromConfig(c)),
Config: raw,
Hash: fmt.Sprintf("%x", newConfigHash(raw)),
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
OrgID: orgID,
},
channels: channels,
}
}
@@ -81,20 +79,12 @@ func NewConfigFromStoreableConfig(sc *StoreableConfig) (*Config, error) {
return nil, err
}
channels := NewChannelsFromConfig(alertmanagerConfig, sc.OrgID)
return &Config{
alertmanagerConfig: alertmanagerConfig,
storeableConfig: sc,
channels: channels,
orgID: sc.OrgID,
}, nil
}
func NewRouteFromReceiver(receiver Receiver) *config.Route {
return &config.Route{Receiver: receiver.Name, Continue: true}
}
func NewDefaultConfig(globalConfig GlobalConfig, routeConfig RouteConfig, orgID string) (*Config, error) {
err := mergo.Merge(&globalConfig, config.DefaultGlobalConfig())
if err != nil {
@@ -134,6 +124,43 @@ func newRawFromConfig(c *config.Config) []byte {
return b
}
func newConfigHash(s string) [16]byte {
return md5.Sum([]byte(s))
}
func (c *Config) SetGlobalConfig(globalConfig GlobalConfig) {
c.alertmanagerConfig.Global = &globalConfig
c.storeableConfig.Config = string(newRawFromConfig(c.alertmanagerConfig))
c.storeableConfig.Hash = fmt.Sprintf("%x", newConfigHash(c.storeableConfig.Config))
c.storeableConfig.UpdatedAt = time.Now()
}
func (c *Config) SetRouteConfig(routeConfig RouteConfig) {
c.alertmanagerConfig.Route = &config.Route{
Receiver: DefaultReceiverName,
GroupByStr: routeConfig.GroupByStr,
GroupInterval: (*model.Duration)(&routeConfig.GroupInterval),
GroupWait: (*model.Duration)(&routeConfig.GroupWait),
RepeatInterval: (*model.Duration)(&routeConfig.RepeatInterval),
}
c.storeableConfig.Config = string(newRawFromConfig(c.alertmanagerConfig))
c.storeableConfig.Hash = fmt.Sprintf("%x", newConfigHash(c.storeableConfig.Config))
c.storeableConfig.UpdatedAt = time.Now()
}
func (c *Config) UpdateRouteConfig(routeConfig RouteConfig) {
for _, route := range c.alertmanagerConfig.Route.Routes {
route.GroupByStr = routeConfig.GroupByStr
route.GroupInterval = (*model.Duration)(&routeConfig.GroupInterval)
route.GroupWait = (*model.Duration)(&routeConfig.GroupWait)
route.RepeatInterval = (*model.Duration)(&routeConfig.RepeatInterval)
}
c.storeableConfig.Config = string(newRawFromConfig(c.alertmanagerConfig))
c.storeableConfig.Hash = fmt.Sprintf("%x", newConfigHash(c.storeableConfig.Config))
c.storeableConfig.UpdatedAt = time.Now()
}
func (c *Config) AlertmanagerConfig() *config.Config {
return c.alertmanagerConfig
}
@@ -142,65 +169,43 @@ func (c *Config) StoreableConfig() *StoreableConfig {
return c.storeableConfig
}
func (c *Config) Channels() Channels {
return c.channels
}
func (c *Config) OrgID() string {
return c.orgID
}
func (c *Config) Raw() []byte {
return newRawFromConfig(c.alertmanagerConfig)
}
func (c *Config) Hash() [16]byte {
return md5.Sum(newRawFromConfig(c.alertmanagerConfig))
}
func (c *Config) CreateReceiver(route *config.Route, receiver config.Receiver) error {
if route == nil {
return errors.New(errors.TypeInvalidInput, ErrCodeAlertmanagerConfigInvalid, "route is nil")
}
if route.Receiver == "" || receiver.Name == "" {
func (c *Config) CreateReceiver(receiver config.Receiver) error {
if receiver.Name == "" {
return errors.New(errors.TypeInvalidInput, ErrCodeAlertmanagerConfigInvalid, "receiver is mandatory in route and receiver")
}
// check that receiver name is not already used
for _, existingReceiver := range c.alertmanagerConfig.Receivers {
if existingReceiver.Name == receiver.Name || existingReceiver.Name == route.Receiver {
if existingReceiver.Name == receiver.Name {
return errors.New(errors.TypeInvalidInput, ErrCodeAlertmanagerConfigConflict, "the receiver name has to be unique, please choose a different name")
}
}
// must set continue on route to allow subsequent
// routes to work. may have to rethink on this after
// adding matchers and filters in upstream
route.Continue = true
c.alertmanagerConfig.Route.Routes = append(c.alertmanagerConfig.Route.Routes, route)
c.alertmanagerConfig.Route.Routes = append(c.alertmanagerConfig.Route.Routes, newRouteFromReceiver(receiver))
c.alertmanagerConfig.Receivers = append(c.alertmanagerConfig.Receivers, receiver)
if err := c.alertmanagerConfig.UnmarshalYAML(func(i interface{}) error { return nil }); err != nil {
return err
}
channel := NewChannelFromReceiver(receiver, c.orgID)
if channel != nil {
c.channels[channel.Name] = channel
}
c.storeableConfig.Config = string(newRawFromConfig(c.alertmanagerConfig))
c.storeableConfig.Hash = fmt.Sprintf("%x", newConfigHash(c.storeableConfig.Config))
c.storeableConfig.UpdatedAt = time.Now()
return nil
}
func (c *Config) UpdateReceiver(route *config.Route, receiver config.Receiver) error {
if route == nil {
return errors.New(errors.TypeInvalidInput, ErrCodeAlertmanagerConfigInvalid, "route is nil")
func (c *Config) GetReceiver(name string) (Receiver, error) {
for _, receiver := range c.alertmanagerConfig.Receivers {
if receiver.Name == name {
return receiver, nil
}
}
if route.Receiver == "" || receiver.Name == "" {
return Receiver{}, errors.Newf(errors.TypeInvalidInput, ErrCodeAlertmanagerChannelNotFound, "channel with name %q not found", name)
}
func (c *Config) UpdateReceiver(receiver config.Receiver) error {
if receiver.Name == "" {
return errors.New(errors.TypeInvalidInput, ErrCodeAlertmanagerConfigInvalid, "receiver is mandatory in route and receiver")
}
@@ -208,24 +213,12 @@ func (c *Config) UpdateReceiver(route *config.Route, receiver config.Receiver) e
for i, existingReceiver := range c.alertmanagerConfig.Receivers {
if existingReceiver.Name == receiver.Name {
c.alertmanagerConfig.Receivers[i] = receiver
channel := NewChannelFromReceiver(receiver, c.orgID)
if channel != nil {
c.channels[channel.Name] = channel
c.channels[channel.Name].UpdatedAt = time.Now()
}
break
}
}
routes := c.alertmanagerConfig.Route.Routes
for i, existingRoute := range routes {
if existingRoute.Receiver == route.Receiver {
c.alertmanagerConfig.Route.Routes[i] = route
break
}
}
c.storeableConfig.Config = string(newRawFromConfig(c.alertmanagerConfig))
c.storeableConfig.Hash = fmt.Sprintf("%x", newConfigHash(c.storeableConfig.Config))
c.storeableConfig.UpdatedAt = time.Now()
return nil
@@ -247,31 +240,121 @@ func (c *Config) DeleteReceiver(name string) error {
for i, existingReceiver := range c.alertmanagerConfig.Receivers {
if existingReceiver.Name == name {
c.alertmanagerConfig.Receivers = append(c.alertmanagerConfig.Receivers[:i], c.alertmanagerConfig.Receivers[i+1:]...)
delete(c.channels, name)
break
}
}
c.storeableConfig.Config = string(newRawFromConfig(c.alertmanagerConfig))
c.storeableConfig.Hash = fmt.Sprintf("%x", newConfigHash(c.storeableConfig.Config))
c.storeableConfig.UpdatedAt = time.Now()
return nil
}
func (c *Config) CreateRuleIDMatcher(ruleID string, receiverNames []string) error {
if c.alertmanagerConfig.Route == nil {
return errors.New(errors.TypeInvalidInput, ErrCodeAlertmanagerConfigInvalid, "route is nil")
}
routes := c.alertmanagerConfig.Route.Routes
for i, route := range routes {
if slices.Contains(receiverNames, route.Receiver) {
matcher, err := labels.NewMatcher(labels.MatchEqual, "ruleId", ruleID)
if err != nil {
return err
}
c.alertmanagerConfig.Route.Routes[i].Matchers = append(c.alertmanagerConfig.Route.Routes[i].Matchers, matcher)
}
}
c.storeableConfig.Config = string(newRawFromConfig(c.alertmanagerConfig))
c.storeableConfig.Hash = fmt.Sprintf("%x", newConfigHash(c.storeableConfig.Config))
c.storeableConfig.UpdatedAt = time.Now()
return nil
}
func (c *Config) UpdateRuleIDMatcher(ruleID string, receiverNames []string) error {
err := c.DeleteRuleIDMatcher(ruleID)
if err != nil {
return err
}
return c.CreateRuleIDMatcher(ruleID, receiverNames)
}
func (c *Config) DeleteRuleIDMatcher(ruleID string) error {
routes := c.alertmanagerConfig.Route.Routes
for i, r := range routes {
j := slices.IndexFunc(r.Matchers, func(m *labels.Matcher) bool {
return m.Name == "ruleId" && m.Value == ruleID
})
if j != -1 {
c.alertmanagerConfig.Route.Routes[i].Matchers = slices.Delete(r.Matchers, j, j+1)
}
}
c.storeableConfig.Config = string(newRawFromConfig(c.alertmanagerConfig))
c.storeableConfig.Hash = fmt.Sprintf("%x", newConfigHash(c.storeableConfig.Config))
c.storeableConfig.UpdatedAt = time.Now()
return nil
}
func (c *Config) ReceiverNamesFromRuleID(ruleID string) ([]string, error) {
receiverNames := make([]string, 0)
routes := c.alertmanagerConfig.Route.Routes
for _, r := range routes {
for _, m := range r.Matchers {
if m.Name == "ruleId" && m.Value == ruleID {
receiverNames = append(receiverNames, r.Receiver)
}
}
}
return receiverNames, nil
}
type ConfigStore interface {
// Set creates or updates a config.
Set(context.Context, *Config, func(context.Context) error) error
Set(context.Context, *Config) error
// Get returns the config for the given orgID
Get(context.Context, string) (*Config, error)
// ListOrgs returns the list of orgs
ListOrgs(context.Context) ([]string, error)
// CreateChannel creates a new channel.
CreateChannel(context.Context, *Channel, func(context.Context) error) error
// GetChannelByID returns the channel for the given id.
GetChannelByID(context.Context, string, int) (*Channel, error)
// UpdateChannel updates a channel.
UpdateChannel(context.Context, string, *Channel, func(context.Context) error) error
// DeleteChannelByID deletes a channel.
DeleteChannelByID(context.Context, string, int, func(context.Context) error) error
// ListChannels returns the list of channels.
ListChannels(context.Context, string) ([]*Channel, error)
// ListAllChannels returns the list of channels for all organizations.
ListAllChannels(context.Context) ([]*Channel, error)
// GetMatchers gets a list of matchers per organization.
// Matchers is an array of ruleId to receiver names.
GetMatchers(context.Context, string) (map[string][]string, error)
}
var ConfigStoreNoopCallback = func(ctx context.Context) error { return nil }
// MarshalSecretValue if set to true will expose Secret type
// through the marshal interfaces. We need to store the actual value of the secret
// in the database, so we need to set this to true.
func init() {
commoncfg.MarshalSecretValue = true
config.MarshalSecretValue = true
}

View File

@@ -0,0 +1,215 @@
package alertmanagertypes
import (
"encoding/json"
"net/url"
"testing"
"github.com/prometheus/alertmanager/config"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestCreateRuleIDMatcher(t *testing.T) {
testCases := []struct {
name string
orgID string
receivers []config.Receiver
ruleIDToReceivers map[string][]string
expectedRoutes []map[string]any
}{
{
name: "OneSlackReceiver",
orgID: "1",
receivers: []config.Receiver{
{
Name: "slack-receiver",
SlackConfigs: []*config.SlackConfig{
{
Channel: "#alerts",
APIURL: &config.SecretURL{URL: &url.URL{Scheme: "https", Host: "slack.com", Path: "/api/test"}},
},
},
},
},
ruleIDToReceivers: map[string][]string{"test-rule": {"slack-receiver"}},
expectedRoutes: []map[string]any{{"receiver": "slack-receiver", "continue": true, "matchers": []any{"ruleId=\"test-rule\""}}},
},
{
name: "SlackAndEmailReceivers",
orgID: "1",
receivers: []config.Receiver{
{
Name: "slack-receiver",
SlackConfigs: []*config.SlackConfig{
{
Channel: "#alerts",
APIURL: &config.SecretURL{URL: &url.URL{Scheme: "https", Host: "slack.com", Path: "/api/test"}},
},
},
},
{
Name: "email-receiver",
EmailConfigs: []*config.EmailConfig{
{
To: "test@example.com",
},
},
},
},
ruleIDToReceivers: map[string][]string{"test-rule": {"slack-receiver", "email-receiver"}},
expectedRoutes: []map[string]any{{"receiver": "slack-receiver", "continue": true, "matchers": []any{"ruleId=\"test-rule\""}}, {"receiver": "email-receiver", "continue": true, "matchers": []any{"ruleId=\"test-rule\""}}},
},
{
name: "ReceiverDoesNotExist",
orgID: "1",
receivers: []config.Receiver{
{
Name: "slack-receiver",
SlackConfigs: []*config.SlackConfig{
{
Channel: "#alerts",
APIURL: &config.SecretURL{URL: &url.URL{Scheme: "https", Host: "slack.com", Path: "/api/test"}},
},
},
},
},
ruleIDToReceivers: map[string][]string{"test-rule": {"does-not-exist"}},
expectedRoutes: []map[string]any{{"receiver": "slack-receiver", "continue": true}},
},
{
name: "MultipleAlertsOnOneSlackReceiver",
orgID: "1",
receivers: []config.Receiver{
{
Name: "slack-receiver",
SlackConfigs: []*config.SlackConfig{
{
Channel: "#alerts",
APIURL: &config.SecretURL{URL: &url.URL{Scheme: "https", Host: "slack.com", Path: "/api/test"}},
},
},
},
},
ruleIDToReceivers: map[string][]string{"test-rule-1": {"slack-receiver", "does-not-exist"}, "test-rule-2": {"slack-receiver"}},
expectedRoutes: []map[string]any{{"receiver": "slack-receiver", "continue": true, "matchers": []any{"ruleId=\"test-rule-1\"", "ruleId=\"test-rule-2\""}}},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
cfg, err := NewDefaultConfig(GlobalConfig{SMTPSmarthost: config.HostPort{Host: "localhost", Port: "25"}, SMTPFrom: "test@example.com"}, RouteConfig{}, tc.orgID)
require.NoError(t, err)
for _, receiver := range tc.receivers {
err := cfg.CreateReceiver(receiver)
require.NoError(t, err)
}
for ruleID, receiverNames := range tc.ruleIDToReceivers {
err = cfg.CreateRuleIDMatcher(ruleID, receiverNames)
assert.NoError(t, err)
}
routes, err := json.Marshal(cfg.alertmanagerConfig.Route.Routes)
require.NoError(t, err)
var actualRoutes []map[string]any
err = json.Unmarshal(routes, &actualRoutes)
require.NoError(t, err)
assert.ElementsMatch(t, tc.expectedRoutes, actualRoutes)
})
}
}
func TestDeleteRuleIDMatcher(t *testing.T) {
testCases := []struct {
name string
orgID string
receivers []config.Receiver
ruleIDToReceivers map[string][]string
ruleIDsToDelete []string
expectedRoutes []map[string]any
}{
{
name: "DeleteEmailAndSlackMatchers",
orgID: "1",
receivers: []config.Receiver{
{
Name: "slack-receiver",
SlackConfigs: []*config.SlackConfig{
{
Channel: "#alerts",
APIURL: &config.SecretURL{URL: &url.URL{Scheme: "https", Host: "slack.com", Path: "/api/test"}},
},
},
},
{
Name: "email-receiver",
EmailConfigs: []*config.EmailConfig{
{
To: "test@example.com",
},
},
},
},
ruleIDToReceivers: map[string][]string{"test-rule": {"email-receiver", "slack-receiver"}},
ruleIDsToDelete: []string{"test-rule"},
expectedRoutes: []map[string]any{{"receiver": "slack-receiver", "continue": true}, {"receiver": "email-receiver", "continue": true}},
},
{
name: "AlertNameDoesNotExist",
orgID: "1",
receivers: []config.Receiver{
{
Name: "slack-receiver",
SlackConfigs: []*config.SlackConfig{
{
Channel: "#alerts",
APIURL: &config.SecretURL{URL: &url.URL{Scheme: "https", Host: "slack.com", Path: "/api/test"}},
},
},
},
{
Name: "email-receiver",
EmailConfigs: []*config.EmailConfig{
{
To: "test@example.com",
},
},
},
},
ruleIDToReceivers: map[string][]string{"test-rule": {"email-receiver", "slack-receiver"}},
ruleIDsToDelete: []string{"does-not-exist"},
expectedRoutes: []map[string]any{{"receiver": "slack-receiver", "continue": true, "matchers": []any{"ruleId=\"test-rule\""}}, {"receiver": "email-receiver", "continue": true, "matchers": []any{"ruleId=\"test-rule\""}}},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
cfg, err := NewDefaultConfig(GlobalConfig{SMTPSmarthost: config.HostPort{Host: "localhost", Port: "25"}, SMTPFrom: "test@example.com"}, RouteConfig{}, tc.orgID)
require.NoError(t, err)
for _, receiver := range tc.receivers {
err := cfg.CreateReceiver(receiver)
require.NoError(t, err)
}
for ruleID, receiverNames := range tc.ruleIDToReceivers {
err = cfg.CreateRuleIDMatcher(ruleID, receiverNames)
require.NoError(t, err)
}
for _, ruleID := range tc.ruleIDsToDelete {
err = cfg.DeleteRuleIDMatcher(ruleID)
assert.NoError(t, err)
}
routes, err := json.Marshal(cfg.alertmanagerConfig.Route.Routes)
require.NoError(t, err)
var actualRoutes []map[string]any
err = json.Unmarshal(routes, &actualRoutes)
require.NoError(t, err)
assert.ElementsMatch(t, tc.expectedRoutes, actualRoutes)
})
}
}

View File

@@ -29,16 +29,17 @@ func NewReceiver(input string) (Receiver, error) {
return receiver, nil
}
func newRouteFromReceiver(receiver Receiver) *config.Route {
return &config.Route{Receiver: receiver.Name, Continue: true}
}
func NewReceiverIntegrations(nc Receiver, tmpl *template.Template, logger *slog.Logger) ([]notify.Integration, error) {
return receiver.BuildReceiverIntegrations(nc, tmpl, logger)
}
func TestReceiver(ctx context.Context, receiver Receiver, tmpl *template.Template, logger *slog.Logger) error {
now := time.Now()
testAlert := NewTestAlert(receiver, now, now)
ctx = notify.WithGroupKey(ctx, fmt.Sprintf("%s-%s-%d", receiver.Name, testAlert.Labels.Fingerprint(), now.Unix()))
ctx = notify.WithGroupLabels(ctx, testAlert.Labels)
func TestReceiver(ctx context.Context, receiver Receiver, tmpl *template.Template, logger *slog.Logger, alert *Alert) error {
ctx = notify.WithGroupKey(ctx, fmt.Sprintf("%s-%s-%d", receiver.Name, alert.Labels.Fingerprint(), time.Now().Unix()))
ctx = notify.WithGroupLabels(ctx, alert.Labels)
ctx = notify.WithReceiverName(ctx, receiver.Name)
integrations, err := NewReceiverIntegrations(receiver, tmpl, logger)
@@ -46,7 +47,7 @@ func TestReceiver(ctx context.Context, receiver Receiver, tmpl *template.Templat
return err
}
if _, err = integrations[0].Notify(ctx, testAlert); err != nil {
if _, err = integrations[0].Notify(ctx, alert); err != nil {
return err
}