Compare commits

...

5 Commits

Author SHA1 Message Date
grandwizard28
2b4340a3e1 feat(legacyalertmanager): fix get alerts 2025-02-20 19:29:27 +05:30
grandwizard28
309ac976cd feat(legacyalertmanager): integrate 2025-02-20 18:54:07 +05:30
grandwizard28
5ab0d77087 feat(alertmanager): add tests for config 2025-02-20 17:09:47 +05:30
grandwizard28
307afa02ff feat(sqlmigration): add org_id foreign key in notification_channels 2025-02-20 16:33:31 +05:30
grandwizard28
b9e29424ce feat(legacyalertmanager): add functions for channels 2025-02-20 15:52:16 +05:30
34 changed files with 804 additions and 361 deletions

2
.gitignore vendored
View File

@@ -76,3 +76,5 @@ dist/
# ignore user_scripts that is fetched by init-clickhouse
deploy/common/clickhouse/user_scripts/
queries.active

View File

@@ -11,6 +11,7 @@ import (
"go.signoz.io/signoz/ee/query-service/interfaces"
"go.signoz.io/signoz/ee/query-service/license"
"go.signoz.io/signoz/ee/query-service/usage"
"go.signoz.io/signoz/pkg/alertmanager"
baseapp "go.signoz.io/signoz/pkg/query-service/app"
"go.signoz.io/signoz/pkg/query-service/app/cloudintegrations"
"go.signoz.io/signoz/pkg/query-service/app/integrations"
@@ -20,6 +21,7 @@ import (
basemodel "go.signoz.io/signoz/pkg/query-service/model"
rules "go.signoz.io/signoz/pkg/query-service/rules"
"go.signoz.io/signoz/pkg/query-service/version"
"go.signoz.io/signoz/pkg/signoz"
"go.signoz.io/signoz/pkg/types/authtypes"
)
@@ -51,7 +53,7 @@ type APIHandler struct {
}
// NewAPIHandler returns an APIHandler
func NewAPIHandler(opts APIHandlerOptions) (*APIHandler, error) {
func NewAPIHandler(opts APIHandlerOptions, signoz *signoz.SigNoz) (*APIHandler, error) {
baseHandler, err := baseapp.NewAPIHandler(baseapp.APIHandlerOpts{
Reader: opts.DataConnector,
@@ -67,6 +69,7 @@ func NewAPIHandler(opts APIHandlerOptions) (*APIHandler, error) {
FluxInterval: opts.FluxInterval,
UseLogsNewSchema: opts.UseLogsNewSchema,
UseTraceNewSchema: opts.UseTraceNewSchema,
AlertmanagerAPI: alertmanager.NewAPI(signoz.Alertmanager),
})
if err != nil {

View File

@@ -267,7 +267,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
JWT: serverOptions.Jwt,
}
apiHandler, err := api.NewAPIHandler(apiOpts)
apiHandler, err := api.NewAPIHandler(apiOpts, serverOptions.SigNoz)
if err != nil {
return nil, err
}

View File

@@ -7,7 +7,6 @@ import (
"os"
"os/signal"
"strconv"
"syscall"
"time"
"go.opentelemetry.io/otel/sdk/resource"
@@ -198,16 +197,19 @@ func main() {
zap.L().Fatal("Failed to initialize auth cache", zap.Error(err))
}
signalsChannel := make(chan os.Signal, 1)
signal.Notify(signalsChannel, os.Interrupt, syscall.SIGTERM)
signoz.Start(context.Background())
for {
select {
case status := <-server.HealthCheckStatus():
zap.L().Info("Received HealthCheck status: ", zap.Int("status", int(status)))
case <-signalsChannel:
zap.L().Fatal("Received OS Interrupt Signal ... ")
server.Stop()
}
if err := signoz.Wait(context.Background()); err != nil {
zap.L().Fatal("Failed to start signoz", zap.Error(err))
}
err = server.Stop()
if err != nil {
zap.L().Fatal("Failed to stop server", zap.Error(err))
}
err = signoz.Stop(context.Background())
if err != nil {
zap.L().Fatal("Failed to stop signoz", zap.Error(err))
}
}

3
go.mod
View File

@@ -55,6 +55,7 @@ require (
github.com/soheilhy/cmux v0.1.5
github.com/srikanthccv/ClickHouse-go-mock v0.9.0
github.com/stretchr/testify v1.10.0
github.com/tidwall/gjson v1.18.0
github.com/uptrace/bun v1.2.9
github.com/uptrace/bun/dialect/pgdialect v1.2.9
github.com/uptrace/bun/dialect/sqlitedialect v1.2.9
@@ -204,6 +205,8 @@ require (
github.com/smarty/assertions v1.15.0 // indirect
github.com/spf13/cobra v1.8.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
github.com/tklauser/go-sysconf v0.3.13 // indirect
github.com/tklauser/numcpus v0.7.0 // indirect
github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc // indirect

6
go.sum
View File

@@ -891,7 +891,13 @@ github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/subosito/gotenv v1.4.1/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0=
github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY=
github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tklauser/go-sysconf v0.3.13 h1:GBUpcahXSpR2xN01jhkNAbTLRk2Yzgggk8IM08lq3r4=
github.com/tklauser/go-sysconf v0.3.13/go.mod h1:zwleP4Q4OehZHGn4CYZDipCgg9usW5IJePewFCGVEa0=
github.com/tklauser/numcpus v0.7.0 h1:yjuerZP127QG9m5Zh/mSO4wqurYil27tHrqwRoRjpr4=

View File

@@ -15,7 +15,7 @@ var (
type Alertmanager interface {
factory.Service
// GetAlerts gets the alerts from the alertmanager per organization.
GetAlerts(context.Context, string, alertmanagertypes.GettableAlertsParams) (alertmanagertypes.GettableAlerts, error)
GetAlerts(context.Context, string, alertmanagertypes.GettableAlertsParams) (alertmanagertypes.DeprecatedGettableAlerts, error)
// PutAlerts puts the alerts into the alertmanager per organization.
PutAlerts(context.Context, string, alertmanagertypes.PostableAlerts) error
@@ -26,11 +26,14 @@ type Alertmanager interface {
// ListChannels lists all channels for the organization.
ListChannels(context.Context, string) ([]*alertmanagertypes.Channel, error)
// ListAllChannels lists all channels for all organizations. It is used by the legacy alertmanager only.
ListAllChannels(context.Context) ([]*alertmanagertypes.Channel, error)
// GetChannelByID gets a channel for the organization.
GetChannelByID(context.Context, string, int) (*alertmanagertypes.Channel, error)
// UpdateChannel updates a channel for the organization.
UpdateChannelByReceiver(context.Context, string, alertmanagertypes.Receiver) error
UpdateChannelByReceiverAndID(context.Context, string, alertmanagertypes.Receiver, int) error
// CreateChannel creates a channel for the organization.
CreateChannel(context.Context, string, alertmanagertypes.Receiver) error

View File

@@ -9,9 +9,11 @@ import (
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
)
// Notifier is responsible for dispatching alert notifications to an alertmanager.
// Batcher is responsible for batching alerts and broadcasting them on a channel.
type Batcher struct {
// C is the channel on which alerts are sent to alertmanager
C chan alertmanagertypes.PostableAlerts
// logger
logger *slog.Logger
@@ -23,14 +25,21 @@ type Batcher struct {
// more channel to signal the sender goroutine to send alerts
moreC chan struct{}
// stop channel to signal the sender goroutine to stop
stopC chan struct{}
mtx sync.RWMutex
// mutex to synchronize access to the queue
queueMtx sync.RWMutex
// wait group to wait for all goroutines to finish
goroutinesWg sync.WaitGroup
}
func New(logger *slog.Logger, config Config) *Batcher {
batcher := &Batcher{
logger: logger,
queue: make(alertmanagertypes.PostableAlerts, config.Capacity),
queue: make(alertmanagertypes.PostableAlerts, 0, config.Capacity),
config: config,
moreC: make(chan struct{}, 1),
stopC: make(chan struct{}),
@@ -41,22 +50,27 @@ func New(logger *slog.Logger, config Config) *Batcher {
}
// Start dispatches notifications continuously.
func (n *Batcher) Start(ctx context.Context) error {
func (batcher *Batcher) Start(ctx context.Context) error {
batcher.goroutinesWg.Add(1)
go func() {
n.logger.InfoContext(ctx, "starting alertmanager batcher")
defer batcher.goroutinesWg.Done()
for {
select {
case <-ctx.Done():
case <-batcher.stopC:
for batcher.queueLen() > 0 {
alerts := batcher.next()
batcher.C <- alerts
}
close(batcher.C)
return
case <-n.stopC:
return
case <-n.moreC:
case <-batcher.moreC:
}
alerts := n.nextBatch()
n.C <- alerts
alerts := batcher.next()
batcher.C <- alerts
// If the queue still has items left, kick off the next iteration.
if n.queueLen() > 0 {
n.setMore()
if batcher.queueLen() > 0 {
batcher.setMore()
}
}
}()
@@ -64,69 +78,67 @@ func (n *Batcher) Start(ctx context.Context) error {
return nil
}
func (n *Batcher) queueLen() int {
n.mtx.RLock()
defer n.mtx.RUnlock()
// Add queues the given notification requests for processing.
func (batcher *Batcher) Add(ctx context.Context, alerts ...*alertmanagertypes.PostableAlert) {
batcher.queueMtx.Lock()
defer batcher.queueMtx.Unlock()
return len(n.queue)
// Queue capacity should be significantly larger than a single alert
// batch could be.
if d := len(alerts) - batcher.config.Capacity; d > 0 {
alerts = alerts[d:]
batcher.logger.WarnContext(ctx, "alert batch larger than queue capacity, dropping alerts", "num_dropped", d, "capacity", batcher.config.Capacity)
}
// If the queue is full, remove the oldest alerts in favor
// of newer ones.
if d := (len(batcher.queue) + len(alerts)) - batcher.config.Capacity; d > 0 {
batcher.queue = batcher.queue[d:]
batcher.logger.WarnContext(ctx, "alert batch queue full, dropping alerts", "num_dropped", d)
}
batcher.queue = append(batcher.queue, alerts...)
// Notify sending goroutine that there are alerts to be processed.
batcher.setMore()
}
func (n *Batcher) nextBatch() alertmanagertypes.PostableAlerts {
n.mtx.Lock()
defer n.mtx.Unlock()
// Stop shuts down the batcher: it closes stopC and then blocks until every
// goroutine registered on goroutinesWg has finished. ctx is not used.
// stopC is closed unconditionally, so calling Stop a second time would panic
// on the double close.
func (batcher *Batcher) Stop(ctx context.Context) {
// Closing stopC is the shutdown signal that Start's goroutine selects on.
close(batcher.stopC)
// Wait for the goroutines tracked by the wait group to return.
batcher.goroutinesWg.Wait()
}
// queueLen reports how many alerts are currently waiting in the queue.
// The length is read while holding the queue's read lock.
func (batcher *Batcher) queueLen() int {
	batcher.queueMtx.RLock()
	pending := len(batcher.queue)
	batcher.queueMtx.RUnlock()

	return pending
}
func (batcher *Batcher) next() alertmanagertypes.PostableAlerts {
batcher.queueMtx.Lock()
defer batcher.queueMtx.Unlock()
var alerts alertmanagertypes.PostableAlerts
if len(n.queue) > n.config.Size {
alerts = append(make(alertmanagertypes.PostableAlerts, 0, n.config.Size), n.queue[:n.config.Size]...)
n.queue = n.queue[n.config.Size:]
if len(batcher.queue) > batcher.config.Size {
alerts = append(make(alertmanagertypes.PostableAlerts, 0, batcher.config.Size), batcher.queue[:batcher.config.Size]...)
batcher.queue = batcher.queue[batcher.config.Size:]
} else {
alerts = append(make(alertmanagertypes.PostableAlerts, 0, len(n.queue)), n.queue...)
n.queue = n.queue[:0]
alerts = append(make(alertmanagertypes.PostableAlerts, 0, len(batcher.queue)), batcher.queue...)
batcher.queue = batcher.queue[:0]
}
return alerts
}
// Send queues the given notification requests for processing.
// Panics if called on a handler that is not running.
func (n *Batcher) Send(ctx context.Context, alerts ...*alertmanagertypes.PostableAlert) {
n.mtx.Lock()
defer n.mtx.Unlock()
// Queue capacity should be significantly larger than a single alert
// batch could be.
if d := len(alerts) - n.config.Capacity; d > 0 {
alerts = alerts[d:]
n.logger.WarnContext(ctx, "Alert batch larger than queue capacity, dropping alerts", "num_dropped", d)
}
// If the queue is full, remove the oldest alerts in favor
// of newer ones.
if d := (len(n.queue) + len(alerts)) - n.config.Capacity; d > 0 {
n.queue = n.queue[d:]
n.logger.WarnContext(ctx, "Alert notification queue full, dropping alerts", "num_dropped", d)
}
n.queue = append(n.queue, alerts...)
// Notify sending goroutine that there are alerts to be processed.
n.setMore()
}
// setMore signals that the alert queue has items.
func (n *Batcher) setMore() {
func (batcher *Batcher) setMore() {
// If we cannot send on the channel, it means the signal already exists
// and has not been consumed yet.
select {
case n.moreC <- struct{}{}:
case batcher.moreC <- struct{}{}:
default:
}
}
// Stop shuts down the notification handler.
func (n *Batcher) Stop(ctx context.Context) {
n.logger.InfoContext(ctx, "Stopping alertmanager batcher")
close(n.moreC)
close(n.stopC)
}

View File

@@ -0,0 +1,64 @@
package alertmanagerbatcher
import (
"context"
"io"
"log/slog"
"testing"
"github.com/stretchr/testify/assert"
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
)
// TestBatcherWithOneAlertAndDefaultConfigs adds a single alert to a batcher
// built from NewConfig and expects one batch containing exactly that alert to
// arrive on batcher.C.
func TestBatcherWithOneAlertAndDefaultConfigs(t *testing.T) {
	ctx := context.Background()

	batcher := New(slog.New(slog.NewTextHandler(io.Discard, nil)), NewConfig())
	// Start returns an error; abort immediately instead of blocking forever on
	// the receive below if the batcher did not come up.
	if err := batcher.Start(ctx); err != nil {
		t.Fatalf("starting batcher: %v", err)
	}

	batcher.Add(ctx, &alertmanagertypes.PostableAlert{Alert: alertmanagertypes.AlertModel{
		Labels: map[string]string{"alertname": "test"},
	}})

	batch := <-batcher.C
	assert.Equal(t, 1, len(batch))

	batcher.Stop(ctx)
}
// TestBatcherWithBatchSize queues four alerts on a batcher configured with
// Size 2 and Capacity 4 and expects them to be delivered on batcher.C as two
// batches of two alerts each.
func TestBatcherWithBatchSize(t *testing.T) {
	ctx := context.Background()

	batcher := New(slog.New(slog.NewTextHandler(io.Discard, nil)), Config{Size: 2, Capacity: 4})
	// Start returns an error; abort immediately instead of blocking forever on
	// the receives below if the batcher did not come up.
	if err := batcher.Start(ctx); err != nil {
		t.Fatalf("starting batcher: %v", err)
	}

	var alerts alertmanagertypes.PostableAlerts
	for i := 0; i < 4; i++ {
		alerts = append(alerts, &alertmanagertypes.PostableAlert{Alert: alertmanagertypes.AlertModel{
			Labels: map[string]string{"alertname": "test"},
		}})
	}
	batcher.Add(ctx, alerts...)

	for i := 0; i < 2; i++ {
		batch := <-batcher.C
		assert.Equal(t, 2, len(batch))
	}

	batcher.Stop(ctx)
}
// TestBatcherWithCClosed queues four alerts, stops the batcher before reading
// anything, and then drains batcher.C until it is closed. With Size 2 the four
// alerts must come out as exactly two batches of two.
func TestBatcherWithCClosed(t *testing.T) {
	ctx := context.Background()

	batcher := New(slog.New(slog.NewTextHandler(io.Discard, nil)), Config{Size: 2, Capacity: 4})
	// Start returns an error; abort immediately if the batcher did not come up.
	if err := batcher.Start(ctx); err != nil {
		t.Fatalf("starting batcher: %v", err)
	}

	var alerts alertmanagertypes.PostableAlerts
	for i := 0; i < 4; i++ {
		alerts = append(alerts, &alertmanagertypes.PostableAlert{Alert: alertmanagertypes.AlertModel{
			Labels: map[string]string{"alertname": "test"},
		}})
	}
	batcher.Add(ctx, alerts...)

	// NOTE(review): Stop waits for the sender goroutine, which pushes the
	// remaining batches into batcher.C before closing it. Nothing reads from C
	// until Stop returns, so this presumably relies on C being buffered —
	// confirm C's capacity where it is created.
	batcher.Stop(ctx)

	// Count the batches so the test cannot pass vacuously when nothing is
	// delivered before C is closed.
	batches := 0
	for batch := range batcher.C {
		assert.Equal(t, 2, len(batch))
		batches++
	}
	assert.Equal(t, 2, batches)
}

View File

@@ -1,13 +1,16 @@
package alertmanagerbatcher
type Config struct {
// Capacity is the maximum number of alerts that can be buffered in the batcher.
Capacity int
Size int
// Size is the number of alerts to send in each batch.
Size int
}
func NewConfig() Config {
return Config{
Capacity: 1000,
Capacity: 10000,
Size: 64,
}
}

View File

@@ -45,7 +45,7 @@ func (store *config) Get(ctx context.Context, orgID string) (*alertmanagertypes.
}
// Set implements alertmanagertypes.ConfigStore.
func (store *config) Set(ctx context.Context, config *alertmanagertypes.Config, cb func(context.Context) error) error {
func (store *config) Set(ctx context.Context, config *alertmanagertypes.Config) error {
tx, err := store.sqlstore.BunDB().BeginTx(ctx, nil)
if err != nil {
return err
@@ -75,12 +75,6 @@ func (store *config) Set(ctx context.Context, config *alertmanagertypes.Config,
}
}
if cb != nil {
if err = cb(ctx); err != nil {
return err
}
}
if err = tx.Commit(); err != nil {
return err
}
@@ -104,3 +98,145 @@ func (store *config) ListOrgs(ctx context.Context) ([]string, error) {
return orgIDs, nil
}
// CreateChannel inserts channel inside a transaction. When cb is non-nil it is
// invoked after the insert and before the commit, so an error returned by cb
// leaves the transaction uncommitted and the deferred rollback discards the
// insert.
func (store *config) CreateChannel(ctx context.Context, channel *alertmanagertypes.Channel, cb func(context.Context) error) error {
	tx, err := store.sqlstore.BunDB().BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	// The rollback result is intentionally ignored.
	defer tx.Rollback() //nolint:errcheck

	insert := tx.NewInsert().Model(channel)
	if _, err := insert.Exec(ctx); err != nil {
		return err
	}

	if cb != nil {
		if err := cb(ctx); err != nil {
			return err
		}
	}

	return tx.Commit()
}
// GetChannelByID loads the channel whose id and org_id match the arguments.
// A missing row is reported as a not-found error carrying
// ErrCodeAlertmanagerChannelNotFound; any other scan error is returned as is.
func (store *config) GetChannelByID(ctx context.Context, orgID string, id int) (*alertmanagertypes.Channel, error) {
	channel := new(alertmanagertypes.Channel)

	query := store.sqlstore.BunDB().
		NewSelect().
		Model(channel).
		Where("org_id = ?", orgID).
		Where("id = ?", id)

	switch err := query.Scan(ctx); {
	case err == nil:
		return channel, nil
	case err == sql.ErrNoRows:
		return nil, errors.Newf(errors.TypeNotFound, alertmanagertypes.ErrCodeAlertmanagerChannelNotFound, "cannot find channel with id %d", id)
	default:
		return nil, err
	}
}
// UpdateChannel writes channel back to the database inside a transaction.
// The row is matched by primary key and, like the sibling Get/Delete
// functions, additionally restricted to org_id = orgID so a channel that
// belongs to another organization cannot be updated through this call.
// When cb is non-nil it runs after the update and before the commit; an error
// from cb leaves the transaction uncommitted and the deferred rollback
// discards the update.
func (store *config) UpdateChannel(ctx context.Context, orgID string, channel *alertmanagertypes.Channel, cb func(context.Context) error) error {
	tx, err := store.sqlstore.BunDB().BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	// The rollback result is intentionally ignored.
	defer tx.Rollback() //nolint:errcheck

	_, err = tx.NewUpdate().
		Model(channel).
		WherePK().
		Where("org_id = ?", orgID).
		Exec(ctx)
	if err != nil {
		return err
	}

	if cb != nil {
		if err = cb(ctx); err != nil {
			return err
		}
	}

	if err = tx.Commit(); err != nil {
		return err
	}

	return nil
}
// DeleteChannelByID removes the channel matching id and org_id inside a
// transaction. When cb is non-nil it runs after the delete and before the
// commit; an error from cb leaves the transaction uncommitted and the deferred
// rollback restores the row.
func (store *config) DeleteChannelByID(ctx context.Context, orgID string, id int, cb func(context.Context) error) error {
	tx, err := store.sqlstore.BunDB().BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	// The rollback result is intentionally ignored.
	defer tx.Rollback() //nolint:errcheck

	remove := tx.NewDelete().
		Model(new(alertmanagertypes.Channel)).
		Where("org_id = ?", orgID).
		Where("id = ?", id)
	if _, err := remove.Exec(ctx); err != nil {
		return err
	}

	if cb != nil {
		if err := cb(ctx); err != nil {
			return err
		}
	}

	return tx.Commit()
}
// ListChannels returns every channel row whose org_id equals orgID.
func (store *config) ListChannels(ctx context.Context, orgID string) ([]*alertmanagertypes.Channel, error) {
	var channels []*alertmanagertypes.Channel

	query := store.sqlstore.BunDB().
		NewSelect().
		Model(&channels).
		Where("org_id = ?", orgID)
	if err := query.Scan(ctx); err != nil {
		return nil, err
	}

	return channels, nil
}
// ListAllChannels returns every channel row, with no organization filter.
func (store *config) ListAllChannels(ctx context.Context) ([]*alertmanagertypes.Channel, error) {
	var channels []*alertmanagertypes.Channel

	query := store.sqlstore.BunDB().
		NewSelect().
		Model(&channels)
	if err := query.Scan(ctx); err != nil {
		return nil, err
	}

	return channels, nil
}

View File

@@ -24,7 +24,7 @@ func NewAPI(alertmanager Alertmanager) *API {
}
}
func (api *API) GetAlerts(req *http.Request, rw http.ResponseWriter) {
func (api *API) GetAlerts(rw http.ResponseWriter, req *http.Request) {
ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
defer cancel()
@@ -49,7 +49,7 @@ func (api *API) GetAlerts(req *http.Request, rw http.ResponseWriter) {
render.Success(rw, http.StatusOK, alerts)
}
func (api *API) TestReceiver(req *http.Request, rw http.ResponseWriter) {
func (api *API) TestReceiver(rw http.ResponseWriter, req *http.Request) {
ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
defer cancel()
@@ -81,7 +81,7 @@ func (api *API) TestReceiver(req *http.Request, rw http.ResponseWriter) {
render.Success(rw, http.StatusNoContent, nil)
}
func (api *API) ListChannels(req *http.Request, rw http.ResponseWriter) {
func (api *API) ListChannels(rw http.ResponseWriter, req *http.Request) {
ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
defer cancel()
@@ -100,7 +100,20 @@ func (api *API) ListChannels(req *http.Request, rw http.ResponseWriter) {
render.Success(rw, http.StatusOK, channels)
}
func (api *API) GetChannelByID(req *http.Request, rw http.ResponseWriter) {
// ListAllChannels is the HTTP handler that asks the alertmanager for the
// channels of all organizations and renders them with status 200. The lookup
// is bounded by a 30 second timeout derived from the request context; a
// failure is rendered through render.Error.
func (api *API) ListAllChannels(rw http.ResponseWriter, req *http.Request) {
	const timeout = 30 * time.Second

	ctx, cancel := context.WithTimeout(req.Context(), timeout)
	defer cancel()

	if channels, err := api.alertmanager.ListAllChannels(ctx); err != nil {
		render.Error(rw, err)
	} else {
		render.Success(rw, http.StatusOK, channels)
	}
}
func (api *API) GetChannelByID(rw http.ResponseWriter, req *http.Request) {
ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
defer cancel()
@@ -137,7 +150,7 @@ func (api *API) GetChannelByID(req *http.Request, rw http.ResponseWriter) {
render.Success(rw, http.StatusOK, channel)
}
func (api *API) UpdateChannelByID(req *http.Request, rw http.ResponseWriter) {
func (api *API) UpdateChannelByID(rw http.ResponseWriter, req *http.Request) {
ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
defer cancel()
@@ -147,6 +160,24 @@ func (api *API) UpdateChannelByID(req *http.Request, rw http.ResponseWriter) {
return
}
vars := mux.Vars(req)
if vars == nil {
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "id is required in path"))
return
}
idString, ok := vars["id"]
if !ok {
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "id is required in path"))
return
}
id, err := strconv.Atoi(idString)
if err != nil {
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "id is not a valid integer"))
return
}
body, err := io.ReadAll(req.Body)
if err != nil {
render.Error(rw, err)
@@ -160,7 +191,7 @@ func (api *API) UpdateChannelByID(req *http.Request, rw http.ResponseWriter) {
return
}
err = api.alertmanager.UpdateChannelByReceiver(ctx, claims.OrgID, receiver)
err = api.alertmanager.UpdateChannelByReceiverAndID(ctx, claims.OrgID, receiver, id)
if err != nil {
render.Error(rw, err)
return
@@ -169,7 +200,7 @@ func (api *API) UpdateChannelByID(req *http.Request, rw http.ResponseWriter) {
render.Success(rw, http.StatusNoContent, nil)
}
func (api *API) DeleteChannelByID(req *http.Request, rw http.ResponseWriter) {
func (api *API) DeleteChannelByID(rw http.ResponseWriter, req *http.Request) {
ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
defer cancel()
@@ -206,7 +237,7 @@ func (api *API) DeleteChannelByID(req *http.Request, rw http.ResponseWriter) {
render.Success(rw, http.StatusNoContent, nil)
}
func (api *API) CreateChannel(req *http.Request, rw http.ResponseWriter) {
func (api *API) CreateChannel(rw http.ResponseWriter, req *http.Request) {
ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
defer cancel()

View File

@@ -1,6 +1,8 @@
package alertmanager
import (
"errors"
"fmt"
"net/url"
"time"
@@ -9,9 +11,6 @@ import (
)
type Config struct {
// Config is the config for the alertmanager server.
alertmanagerserver.Config `mapstructure:",squash"`
// Provider is the provider for the alertmanager service.
Provider string `mapstructure:"provider"`
@@ -25,11 +24,14 @@ type Config struct {
type Signoz struct {
// PollInterval is the interval at which the alertmanager is synced.
PollInterval time.Duration `mapstructure:"poll_interval"`
// Config is the config for the alertmanager server.
alertmanagerserver.Config `mapstructure:",squash"`
}
type Legacy struct {
// URL is the URL of the legacy alertmanager.
URL *url.URL `mapstructure:"url"`
// ApiURL is the URL of the legacy signoz alertmanager.
ApiURL string `mapstructure:"api_url"`
}
func NewConfigFactory() factory.ConfigFactory {
@@ -38,14 +40,28 @@ func NewConfigFactory() factory.ConfigFactory {
func newConfig() factory.Config {
return Config{
Config: alertmanagerserver.NewConfig(),
Provider: "signoz",
Provider: "legacy",
Legacy: Legacy{
ApiURL: "http://alertmanager:9093/api",
},
Signoz: Signoz{
PollInterval: 15 * time.Second,
Config: alertmanagerserver.NewConfig(),
},
}
}
// Validate checks the configuration. Only the "legacy" provider has
// requirements: its api_url must be non-empty and must be accepted by
// url.Parse. Every other provider value validates successfully.
func (c Config) Validate() error {
	if c.Provider != "legacy" {
		return nil
	}

	apiURL := c.Legacy.ApiURL
	if apiURL == "" {
		return errors.New("api_url is required")
	}

	if _, err := url.Parse(apiURL); err != nil {
		return fmt.Errorf("api_url %q is invalid: %w", apiURL, err)
	}

	return nil
}

View File

@@ -0,0 +1,48 @@
package alertmanager
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.signoz.io/signoz/pkg/config"
"go.signoz.io/signoz/pkg/config/envprovider"
"go.signoz.io/signoz/pkg/factory"
)
// TestNewWithEnvProvider resolves the alertmanager config from environment
// variables only and checks that the provider and the legacy API URL come
// from the environment while the Signoz section keeps the factory defaults.
func TestNewWithEnvProvider(t *testing.T) {
	t.Setenv("SIGNOZ_ALERTMANAGER_PROVIDER", "legacy")
	// NOTE(review): the double underscore presumably maps to a literal "_" so
	// that this targets legacy.api_url — confirm against the env provider.
	t.Setenv("SIGNOZ_ALERTMANAGER_LEGACY_API__URL", "http://localhost:9093/api")

	resolverConfig := config.ResolverConfig{
		Uris: []string{"env:"},
		ProviderFactories: []config.ProviderFactory{
			envprovider.NewFactory(),
		},
	}
	configFactories := []factory.ConfigFactory{
		NewConfigFactory(),
	}

	conf, err := config.New(context.Background(), resolverConfig, configFactories)
	require.NoError(t, err)

	actual := &Config{}
	require.NoError(t, conf.Unmarshal("alertmanager", actual))

	defaults := NewConfigFactory().New().(Config)
	expected := &Config{
		Provider: "legacy",
		Legacy: Legacy{
			ApiURL: "http://localhost:9093/api",
		},
		Signoz: defaults.Signoz,
	}

	assert.Equal(t, expected, actual)
	assert.NoError(t, actual.Validate())
}

View File

@@ -7,8 +7,10 @@ import (
"fmt"
"io"
"net/http"
"net/url"
"time"
"github.com/tidwall/gjson"
"go.signoz.io/signoz/pkg/alertmanager"
"go.signoz.io/signoz/pkg/alertmanager/alertmanagerbatcher"
"go.signoz.io/signoz/pkg/alertmanager/alertmanagerstore/sqlalertmanagerstore"
@@ -29,6 +31,7 @@ type provider struct {
client *http.Client
configStore alertmanagertypes.ConfigStore
batcher *alertmanagerbatcher.Batcher
url *url.URL
}
func NewFactory(sqlstore sqlstore.SQLStore) factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config] {
@@ -41,6 +44,11 @@ func New(ctx context.Context, providerSettings factory.ProviderSettings, config
settings := factory.NewScopedProviderSettings(providerSettings, "go.signoz.io/signoz/pkg/alertmanager/legacyalertmanager")
configStore := sqlalertmanagerstore.NewConfigStore(sqlstore)
url, err := url.Parse(config.Legacy.ApiURL)
if err != nil {
return nil, err
}
return &provider{
config: config,
settings: settings,
@@ -49,6 +57,7 @@ func New(ctx context.Context, providerSettings factory.ProviderSettings, config
},
configStore: configStore,
batcher: alertmanagerbatcher.New(settings.Logger(), alertmanagerbatcher.NewConfig()),
url: url,
}, nil
}
@@ -57,22 +66,18 @@ func (provider *provider) Start(ctx context.Context) error {
if err != nil {
return err
}
defer provider.batcher.Stop(ctx)
for {
select {
case <-ctx.Done():
return nil
case alerts := <-provider.batcher.C:
if err := provider.putAlerts(ctx, "", alerts); err != nil {
provider.settings.Logger().Error("failed to send alerts to alertmanager", "error", err)
}
for alerts := range provider.batcher.C {
if err := provider.putAlerts(ctx, "", alerts); err != nil {
provider.settings.Logger().Error("failed to send alerts to alertmanager", "error", err)
}
}
return nil
}
func (provider *provider) GetAlerts(ctx context.Context, orgID string, params alertmanagertypes.GettableAlertsParams) (alertmanagertypes.GettableAlerts, error) {
url := provider.config.Legacy.URL.JoinPath(alertsPath)
func (provider *provider) GetAlerts(ctx context.Context, orgID string, params alertmanagertypes.GettableAlertsParams) (alertmanagertypes.DeprecatedGettableAlerts, error) {
url := provider.url.JoinPath(alertsPath)
url.RawQuery = params.RawQuery
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url.String(), nil)
@@ -92,8 +97,12 @@ func (provider *provider) GetAlerts(ctx context.Context, orgID string, params al
return nil, err
}
var alerts alertmanagertypes.GettableAlerts
if err := json.Unmarshal(body, &alerts); err != nil {
if resp.StatusCode/100 != 2 {
return nil, fmt.Errorf("bad response status %v", resp.Status)
}
var alerts alertmanagertypes.DeprecatedGettableAlerts
if err := json.Unmarshal([]byte(gjson.GetBytes(body, "data").Raw), &alerts); err != nil {
return nil, err
}
@@ -101,12 +110,12 @@ func (provider *provider) GetAlerts(ctx context.Context, orgID string, params al
}
func (provider *provider) PutAlerts(ctx context.Context, _ string, alerts alertmanagertypes.PostableAlerts) error {
provider.batcher.Send(ctx, alerts...)
provider.batcher.Add(ctx, alerts...)
return nil
}
func (provider *provider) putAlerts(ctx context.Context, _ string, alerts alertmanagertypes.PostableAlerts) error {
url := provider.config.Legacy.URL.JoinPath(alertsPath)
url := provider.url.JoinPath(alertsPath)
body, err := json.Marshal(alerts)
if err != nil {
@@ -135,7 +144,7 @@ func (provider *provider) putAlerts(ctx context.Context, _ string, alerts alertm
}
func (provider *provider) TestReceiver(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver) error {
url := provider.config.Legacy.URL.JoinPath(testReceiverPath)
url := provider.url.JoinPath(testReceiverPath)
body, err := json.Marshal(receiver)
if err != nil {
@@ -164,48 +173,30 @@ func (provider *provider) TestReceiver(ctx context.Context, orgID string, receiv
}
func (provider *provider) ListChannels(ctx context.Context, orgID string) ([]*alertmanagertypes.Channel, error) {
config, err := provider.configStore.Get(ctx, orgID)
if err != nil {
return nil, err
}
return provider.configStore.ListChannels(ctx, orgID)
}
channels := config.Channels()
channelList := make([]*alertmanagertypes.Channel, 0, len(channels))
for _, channel := range channels {
channelList = append(channelList, channel)
}
return channelList, nil
func (provider *provider) ListAllChannels(ctx context.Context) ([]*alertmanagertypes.Channel, error) {
return provider.configStore.ListAllChannels(ctx)
}
func (provider *provider) GetChannelByID(ctx context.Context, orgID string, channelID int) (*alertmanagertypes.Channel, error) {
config, err := provider.configStore.Get(ctx, orgID)
if err != nil {
return nil, err
}
channels := config.Channels()
channel, err := alertmanagertypes.GetChannelByID(channels, channelID)
if err != nil {
return nil, err
}
return channel, nil
return provider.configStore.GetChannelByID(ctx, orgID, channelID)
}
func (provider *provider) UpdateChannelByReceiver(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver) error {
config, err := provider.configStore.Get(ctx, orgID)
func (provider *provider) UpdateChannelByReceiverAndID(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver, id int) error {
channel, err := provider.configStore.GetChannelByID(ctx, orgID, id)
if err != nil {
return err
}
err = config.UpdateReceiver(alertmanagertypes.NewRouteFromReceiver(receiver), receiver)
err = channel.Update(receiver)
if err != nil {
return err
}
err = provider.configStore.Set(ctx, config, func(ctx context.Context) error {
url := provider.config.Legacy.URL.JoinPath(routesPath)
err = provider.configStore.UpdateChannel(ctx, orgID, channel, func(ctx context.Context) error {
url := provider.url.JoinPath(routesPath)
body, err := json.Marshal(receiver)
if err != nil {
@@ -240,18 +231,10 @@ func (provider *provider) UpdateChannelByReceiver(ctx context.Context, orgID str
}
func (provider *provider) CreateChannel(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver) error {
config, err := provider.configStore.Get(ctx, orgID)
if err != nil {
return err
}
channel := alertmanagertypes.NewChannelFromReceiver(receiver, orgID)
err = config.CreateReceiver(alertmanagertypes.NewRouteFromReceiver(receiver), receiver)
if err != nil {
return err
}
err = provider.configStore.Set(ctx, config, func(ctx context.Context) error {
url := provider.config.Legacy.URL.JoinPath(routesPath)
err := provider.configStore.CreateChannel(ctx, channel, func(ctx context.Context) error {
url := provider.url.JoinPath(routesPath)
body, err := json.Marshal(receiver)
if err != nil {
@@ -286,24 +269,13 @@ func (provider *provider) CreateChannel(ctx context.Context, orgID string, recei
}
func (provider *provider) DeleteChannelByID(ctx context.Context, orgID string, channelID int) error {
config, err := provider.configStore.Get(ctx, orgID)
if err != nil {
return err
}
err := provider.configStore.DeleteChannelByID(ctx, orgID, channelID, func(ctx context.Context) error {
channel, err := provider.configStore.GetChannelByID(ctx, orgID, channelID)
if err != nil {
return err
}
channels := config.Channels()
channel, err := alertmanagertypes.GetChannelByID(channels, channelID)
if err != nil {
return err
}
err = config.DeleteReceiver(channel.Name)
if err != nil {
return err
}
err = provider.configStore.Set(ctx, config, func(ctx context.Context) error {
url := provider.config.Legacy.URL.JoinPath(routesPath)
url := provider.url.JoinPath(routesPath)
body, err := json.Marshal(map[string]string{"name": channel.Name})
if err != nil {
@@ -338,5 +310,6 @@ func (provider *provider) DeleteChannelByID(ctx context.Context, orgID string, c
}
func (provider *provider) Stop(ctx context.Context) error {
provider.batcher.Stop(ctx)
return nil
}

File diff suppressed because one or more lines are too long

View File

@@ -12,7 +12,7 @@ import (
type Service struct {
// config is the config for the alertmanager service
config Config
config alertmanagerserver.Config
// stateStore is the state store for the alertmanager service
stateStore alertmanagertypes.StateStore
@@ -30,7 +30,7 @@ type Service struct {
serversMtx sync.RWMutex
}
func New(ctx context.Context, settings factory.ScopedProviderSettings, config Config, stateStore alertmanagertypes.StateStore, configStore alertmanagertypes.ConfigStore) *Service {
func New(ctx context.Context, settings factory.ScopedProviderSettings, config alertmanagerserver.Config, stateStore alertmanagertypes.StateStore, configStore alertmanagertypes.ConfigStore) *Service {
service := &Service{
config: config,
stateStore: stateStore,
@@ -57,7 +57,7 @@ func (service *Service) SyncServers(ctx context.Context) error {
continue
}
service.servers[orgID], err = alertmanagerserver.New(ctx, service.settings.Logger(), service.settings.PrometheusRegisterer(), service.config.Config, orgID, service.stateStore)
service.servers[orgID], err = alertmanagerserver.New(ctx, service.settings.Logger(), service.settings.PrometheusRegisterer(), service.config, orgID, service.stateStore)
if err != nil {
service.settings.Logger().Error("failed to create alertmanagerserver", "orgID", orgID, "error", err)
continue
@@ -74,13 +74,18 @@ func (service *Service) SyncServers(ctx context.Context) error {
return nil
}
func (service *Service) GetAlerts(ctx context.Context, orgID string, params alertmanagertypes.GettableAlertsParams) (alertmanagertypes.GettableAlerts, error) {
func (service *Service) GetAlerts(ctx context.Context, orgID string, params alertmanagertypes.GettableAlertsParams) (alertmanagertypes.DeprecatedGettableAlerts, error) {
server, err := service.getServer(orgID)
if err != nil {
return nil, err
}
return server.GetAlerts(ctx, params)
alerts, err := server.GetAlerts(ctx, params)
if err != nil {
return nil, err
}
return alertmanagertypes.NewDeprecatedGettableAlertsFromGettableAlerts(alerts), nil
}
func (service *Service) PutAlerts(ctx context.Context, orgID string, alerts alertmanagertypes.PostableAlerts) error {

View File

@@ -6,6 +6,7 @@ import (
"go.signoz.io/signoz/pkg/alertmanager"
"go.signoz.io/signoz/pkg/alertmanager/alertmanagerstore/sqlalertmanagerstore"
"go.signoz.io/signoz/pkg/errors"
"go.signoz.io/signoz/pkg/factory"
"go.signoz.io/signoz/pkg/sqlstore"
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
@@ -17,6 +18,7 @@ type provider struct {
settings factory.ScopedProviderSettings
configStore alertmanagertypes.ConfigStore
stateStore alertmanagertypes.StateStore
stopC chan struct{}
}
func NewFactory(sqlstore sqlstore.SQLStore) factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config] {
@@ -34,7 +36,7 @@ func New(ctx context.Context, providerSettings factory.ProviderSettings, config
service: alertmanager.New(
ctx,
settings,
config,
config.Signoz.Config,
stateStore,
configStore,
),
@@ -42,6 +44,7 @@ func New(ctx context.Context, providerSettings factory.ProviderSettings, config
config: config,
configStore: configStore,
stateStore: stateStore,
stopC: make(chan struct{}),
}, nil
}
@@ -50,7 +53,7 @@ func (provider *provider) Start(ctx context.Context) error {
defer ticker.Stop()
for {
select {
case <-ctx.Done():
case <-provider.stopC:
return nil
case <-ticker.C:
if err := provider.service.SyncServers(ctx); err != nil {
@@ -61,10 +64,11 @@ func (provider *provider) Start(ctx context.Context) error {
}
// Stop closes stopC, which the Start loop selects on to return, and then
// stops the wrapped alertmanager service, handing back that call's error.
func (provider *provider) Stop(ctx context.Context) error {
	close(provider.stopC)

	err := provider.service.Stop(ctx)
	return err
}
func (provider *provider) GetAlerts(ctx context.Context, orgID string, params alertmanagertypes.GettableAlertsParams) (alertmanagertypes.GettableAlerts, error) {
func (provider *provider) GetAlerts(ctx context.Context, orgID string, params alertmanagertypes.GettableAlertsParams) (alertmanagertypes.DeprecatedGettableAlerts, error) {
return provider.service.GetAlerts(ctx, orgID, params)
}
@@ -91,6 +95,10 @@ func (provider *provider) ListChannels(ctx context.Context, orgID string) ([]*al
return channelList, nil
}
// ListAllChannels is not implemented by the signoz provider; it always
// returns a nil slice together with an "unsupported" error.
func (provider *provider) ListAllChannels(ctx context.Context) ([]*alertmanagertypes.Channel, error) {
	unsupported := errors.Newf(errors.TypeUnsupported, errors.CodeUnsupported, "not supported by provider signoz")
	return nil, unsupported
}
func (provider *provider) GetChannelByID(ctx context.Context, orgID string, channelID int) (*alertmanagertypes.Channel, error) {
config, err := provider.configStore.Get(ctx, orgID)
if err != nil {
@@ -106,7 +114,7 @@ func (provider *provider) GetChannelByID(ctx context.Context, orgID string, chan
return channel, nil
}
func (provider *provider) UpdateChannelByReceiver(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver) error {
func (provider *provider) UpdateChannelByReceiverAndID(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver, id int) error {
config, err := provider.configStore.Get(ctx, orgID)
if err != nil {
return err
@@ -117,7 +125,7 @@ func (provider *provider) UpdateChannelByReceiver(ctx context.Context, orgID str
return err
}
err = provider.configStore.Set(ctx, config, nil)
err = provider.configStore.Set(ctx, config)
if err != nil {
return err
}
@@ -142,7 +150,7 @@ func (provider *provider) DeleteChannelByID(ctx context.Context, orgID string, c
return err
}
err = provider.configStore.Set(ctx, config, nil)
err = provider.configStore.Set(ctx, config)
if err != nil {
return err
}
@@ -161,7 +169,7 @@ func (provider *provider) CreateChannel(ctx context.Context, orgID string, recei
return err
}
err = provider.configStore.Set(ctx, config, nil)
err = provider.configStore.Set(ctx, config)
if err != nil {
return err
}

View File

@@ -40,7 +40,7 @@ func NewRegistry(logger *slog.Logger, services ...NamedService) (*Registry, erro
}, nil
}
func (r *Registry) Start(ctx context.Context) error {
func (r *Registry) Start(ctx context.Context) {
for _, s := range r.services.GetInOrder() {
go func(s NamedService) {
r.logger.InfoContext(ctx, "starting service", "service", s.Name())
@@ -49,7 +49,6 @@ func (r *Registry) Start(ctx context.Context) error {
}(s)
}
return nil
}
func (r *Registry) Wait(ctx context.Context) error {

View File

@@ -41,7 +41,7 @@ func TestRegistryWith2Services(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
require.NoError(t, registry.Start(ctx))
registry.Start(ctx)
require.NoError(t, registry.Wait(ctx))
require.NoError(t, registry.Stop(ctx))
}()
@@ -62,7 +62,7 @@ func TestRegistryWith2ServicesWithoutWait(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
require.NoError(t, registry.Start(ctx))
registry.Start(ctx)
require.NoError(t, registry.Stop(ctx))
}()

View File

@@ -78,6 +78,12 @@ func (writer *nonFlushingBadResponseLoggingWriter) Write(data []byte) (int, erro
// https://godoc.org/net/http#ResponseWriter
writer.WriteHeader(http.StatusOK)
}
// 204 No Content is a success response that indicates that the request has been successfully processed and that the response body is intentionally empty.
if writer.statusCode == 204 {
return 0, nil
}
n, err := writer.rw.Write(data)
if writer.logBody {
writer.captureResponseBody(data)

View File

@@ -23,6 +23,7 @@ type SDK struct {
logger *slog.Logger
sdk contribsdkconfig.SDK
prometheusRegistry *prometheus.Registry
startCh chan struct{}
}
// New creates a new Instrumentation instance with configured providers.
@@ -96,14 +97,17 @@ func New(ctx context.Context, build version.Build, cfg Config) (*SDK, error) {
sdk: sdk,
prometheusRegistry: prometheusRegistry,
logger: NewLogger(cfg),
startCh: make(chan struct{}),
}, nil
}
// Start blocks until startCh is closed (Stop closes it) and then returns nil.
// The context argument is not consulted.
func (i *SDK) Start(_ context.Context) error {
	<-i.startCh

	return nil
}
// Stop closes startCh, releasing any goroutine blocked in Start, and then
// shuts down the underlying SDK, returning the shutdown error if any.
func (i *SDK) Stop(ctx context.Context) error {
	close(i.startCh)

	shutdownErr := i.sdk.Shutdown(ctx)
	return shutdownErr
}

View File

@@ -6,7 +6,6 @@ import (
"encoding/json"
"errors"
"fmt"
"go.signoz.io/signoz/pkg/query-service/app/metricsexplorer"
"io"
"math"
"net/http"
@@ -19,6 +18,9 @@ import (
"text/template"
"time"
"go.signoz.io/signoz/pkg/alertmanager"
"go.signoz.io/signoz/pkg/query-service/app/metricsexplorer"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
jsoniter "github.com/json-iterator/go"
@@ -58,7 +60,6 @@ import (
"go.signoz.io/signoz/pkg/query-service/app/integrations/messagingQueues/kafka"
"go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline"
"go.signoz.io/signoz/pkg/query-service/dao"
am "go.signoz.io/signoz/pkg/query-service/integrations/alertManager"
"go.signoz.io/signoz/pkg/query-service/interfaces"
"go.signoz.io/signoz/pkg/query-service/model"
"go.signoz.io/signoz/pkg/query-service/rules"
@@ -84,7 +85,6 @@ type APIHandler struct {
reader interfaces.Reader
skipConfig *model.SkipConfig
appDao dao.ModelDao
alertManager am.Manager
ruleManager *rules.Manager
featureFlags interfaces.FeatureLookup
querier interfaces.Querier
@@ -133,6 +133,8 @@ type APIHandler struct {
pvcsRepo *inframetrics.PvcsRepo
JWT *authtypes.JWT
AlertmanagerAPI *alertmanager.API
}
type APIHandlerOpts struct {
@@ -174,16 +176,12 @@ type APIHandlerOpts struct {
UseTraceNewSchema bool
JWT *authtypes.JWT
AlertmanagerAPI *alertmanager.API
}
// NewAPIHandler returns an APIHandler
func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
alertManager, err := am.New()
if err != nil {
return nil, err
}
querierOpts := querier.QuerierOptions{
Reader: opts.Reader,
Cache: opts.Cache,
@@ -227,7 +225,6 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
skipConfig: opts.SkipConfig,
preferSpanMetrics: opts.PreferSpanMetrics,
temporalityMap: make(map[string]map[v3.Temporality]bool),
alertManager: alertManager,
ruleManager: opts.RuleManager,
featureFlags: opts.FeatureFlags,
IntegrationsController: opts.IntegrationsController,
@@ -250,6 +247,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
pvcsRepo: pvcsRepo,
JWT: opts.JWT,
SummaryService: summaryService,
AlertmanagerAPI: opts.AlertmanagerAPI,
}
logsQueryBuilder := logsv3.PrepareLogsQuery
@@ -483,21 +481,21 @@ func (aH *APIHandler) Respond(w http.ResponseWriter, data interface{}) {
// RegisterPrivateRoutes registers routes for this handler on the given router
func (aH *APIHandler) RegisterPrivateRoutes(router *mux.Router) {
router.HandleFunc("/api/v1/channels", aH.listChannels).Methods(http.MethodGet)
router.HandleFunc("/api/v1/channels", aH.AlertmanagerAPI.ListAllChannels).Methods(http.MethodGet)
}
// RegisterRoutes registers routes for this handler on the given router
func (aH *APIHandler) RegisterRoutes(router *mux.Router, am *AuthMiddleware) {
router.HandleFunc("/api/v1/query_range", am.ViewAccess(aH.queryRangeMetrics)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/query", am.ViewAccess(aH.queryMetrics)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/channels", am.ViewAccess(aH.listChannels)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/channels/{id}", am.ViewAccess(aH.getChannel)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/channels/{id}", am.AdminAccess(aH.editChannel)).Methods(http.MethodPut)
router.HandleFunc("/api/v1/channels/{id}", am.AdminAccess(aH.deleteChannel)).Methods(http.MethodDelete)
router.HandleFunc("/api/v1/channels", am.EditAccess(aH.createChannel)).Methods(http.MethodPost)
router.HandleFunc("/api/v1/testChannel", am.EditAccess(aH.testChannel)).Methods(http.MethodPost)
router.HandleFunc("/api/v1/channels", am.ViewAccess(aH.AlertmanagerAPI.ListChannels)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/channels/{id}", am.ViewAccess(aH.AlertmanagerAPI.GetChannelByID)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/channels/{id}", am.AdminAccess(aH.AlertmanagerAPI.UpdateChannelByID)).Methods(http.MethodPut)
router.HandleFunc("/api/v1/channels/{id}", am.AdminAccess(aH.AlertmanagerAPI.DeleteChannelByID)).Methods(http.MethodDelete)
router.HandleFunc("/api/v1/channels", am.EditAccess(aH.AlertmanagerAPI.CreateChannel)).Methods(http.MethodPost)
router.HandleFunc("/api/v1/testChannel", am.EditAccess(aH.AlertmanagerAPI.TestReceiver)).Methods(http.MethodPost)
router.HandleFunc("/api/v1/alerts", am.ViewAccess(aH.getAlerts)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/alerts", am.ViewAccess(aH.AlertmanagerAPI.GetAlerts)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/rules", am.ViewAccess(aH.listRules)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/rules/{id}", am.ViewAccess(aH.getRule)).Methods(http.MethodGet)
@@ -1360,138 +1358,6 @@ func (aH *APIHandler) editRule(w http.ResponseWriter, r *http.Request) {
}
func (aH *APIHandler) getChannel(w http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["id"]
channel, apiErrorObj := aH.ruleManager.RuleDB().GetChannel(id)
if apiErrorObj != nil {
RespondError(w, apiErrorObj, nil)
return
}
aH.Respond(w, channel)
}
func (aH *APIHandler) deleteChannel(w http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["id"]
apiErrorObj := aH.ruleManager.RuleDB().DeleteChannel(id)
if apiErrorObj != nil {
RespondError(w, apiErrorObj, nil)
return
}
aH.Respond(w, "notification channel successfully deleted")
}
func (aH *APIHandler) listChannels(w http.ResponseWriter, r *http.Request) {
channels, apiErrorObj := aH.ruleManager.RuleDB().GetChannels()
if apiErrorObj != nil {
RespondError(w, apiErrorObj, nil)
return
}
aH.Respond(w, channels)
}
// testChannels sends test alert to all registered channels
func (aH *APIHandler) testChannel(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
body, err := io.ReadAll(r.Body)
if err != nil {
zap.L().Error("Error in getting req body of testChannel API", zap.Error(err))
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}
receiver := &am.Receiver{}
if err := json.Unmarshal(body, receiver); err != nil { // Parse []byte to go struct pointer
zap.L().Error("Error in parsing req body of testChannel API\n", zap.Error(err))
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}
// send alert
apiErrorObj := aH.alertManager.TestReceiver(receiver)
if apiErrorObj != nil {
RespondError(w, apiErrorObj, nil)
return
}
aH.Respond(w, "test alert sent")
}
func (aH *APIHandler) editChannel(w http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["id"]
defer r.Body.Close()
body, err := io.ReadAll(r.Body)
if err != nil {
zap.L().Error("Error in getting req body of editChannel API", zap.Error(err))
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}
receiver := &am.Receiver{}
if err := json.Unmarshal(body, receiver); err != nil { // Parse []byte to go struct pointer
zap.L().Error("Error in parsing req body of editChannel API", zap.Error(err))
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}
_, apiErrorObj := aH.ruleManager.RuleDB().EditChannel(receiver, id)
if apiErrorObj != nil {
RespondError(w, apiErrorObj, nil)
return
}
aH.Respond(w, nil)
}
func (aH *APIHandler) createChannel(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
body, err := io.ReadAll(r.Body)
if err != nil {
zap.L().Error("Error in getting req body of createChannel API", zap.Error(err))
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}
receiver := &am.Receiver{}
if err := json.Unmarshal(body, receiver); err != nil { // Parse []byte to go struct pointer
zap.L().Error("Error in parsing req body of createChannel API", zap.Error(err))
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}
_, apiErrorObj := aH.ruleManager.RuleDB().CreateChannel(receiver)
if apiErrorObj != nil {
RespondError(w, apiErrorObj, nil)
return
}
aH.Respond(w, nil)
}
func (aH *APIHandler) getAlerts(w http.ResponseWriter, r *http.Request) {
params := r.URL.Query()
amEndpoint := constants.GetAlertManagerApiPrefix()
resp, err := http.Get(amEndpoint + "v1/alerts" + "?" + params.Encode())
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
aH.Respond(w, string(body))
}
func (aH *APIHandler) createRule(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()

View File

@@ -14,6 +14,7 @@ import (
"github.com/rs/cors"
"github.com/soheilhy/cmux"
"go.signoz.io/signoz/pkg/alertmanager"
"go.signoz.io/signoz/pkg/http/middleware"
"go.signoz.io/signoz/pkg/query-service/agentConf"
"go.signoz.io/signoz/pkg/query-service/app/clickhouseReader"
@@ -196,6 +197,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
UseLogsNewSchema: serverOptions.UseLogsNewSchema,
UseTraceNewSchema: serverOptions.UseTraceNewSchema,
JWT: serverOptions.Jwt,
AlertmanagerAPI: alertmanager.NewAPI(serverOptions.SigNoz.Alertmanager),
})
if err != nil {
return nil, err
@@ -278,7 +280,6 @@ func (s *Server) createPrivateServer(api *APIHandler) (*http.Server, error) {
}
func (s *Server) createPublicServer(api *APIHandler, web web.Web) (*http.Server, error) {
r := NewRouter()
r.Use(middleware.NewAuth(zap.L(), s.serverOptions.Jwt, []string{"Authorization", "Sec-WebSocket-Protocol"}).Wrap)

View File

@@ -4,8 +4,6 @@ import (
"context"
"flag"
"os"
"os/signal"
"syscall"
"time"
prommodel "github.com/prometheus/common/model"
@@ -142,22 +140,20 @@ func main() {
logger.Fatal("Failed to initialize auth cache", zap.Error(err))
}
signalsChannel := make(chan os.Signal, 1)
signal.Notify(signalsChannel, os.Interrupt, syscall.SIGTERM)
signoz.Start(context.Background())
for {
select {
case status := <-server.HealthCheckStatus():
logger.Info("Received HealthCheck status: ", zap.Int("status", int(status)))
case <-signalsChannel:
logger.Info("Received OS Interrupt Signal ... ")
err := server.Stop()
if err != nil {
logger.Fatal("Failed to stop server", zap.Error(err))
}
logger.Info("Server stopped")
return
}
if err := signoz.Wait(context.Background()); err != nil {
zap.L().Fatal("Failed to start signoz", zap.Error(err))
}
err = server.Stop()
if err != nil {
zap.L().Fatal("Failed to stop server", zap.Error(err))
}
err = signoz.Stop(context.Background())
if err != nil {
zap.L().Fatal("Failed to stop signoz", zap.Error(err))
}
}

View File

@@ -7,6 +7,7 @@ import (
"reflect"
"time"
"go.signoz.io/signoz/pkg/alertmanager"
"go.signoz.io/signoz/pkg/apiserver"
"go.signoz.io/signoz/pkg/cache"
"go.signoz.io/signoz/pkg/config"
@@ -44,6 +45,9 @@ type Config struct {
// TelemetryStore config
TelemetryStore telemetrystore.Config `mapstructure:"telemetrystore"`
// Alertmanager config
Alertmanager alertmanager.Config `mapstructure:"alertmanager"`
}
// DeprecatedFlags are the flags that are deprecated and scheduled for removal.
@@ -63,6 +67,7 @@ func NewConfig(ctx context.Context, resolverConfig config.ResolverConfig, deprec
sqlmigrator.NewConfigFactory(),
apiserver.NewConfigFactory(),
telemetrystore.NewConfigFactory(),
alertmanager.NewConfigFactory(),
}
conf, err := config.New(ctx, resolverConfig, configFactories)
@@ -138,17 +143,26 @@ func mergeAndEnsureBackwardCompatibility(config *Config, deprecatedFlags Depreca
}
if deprecatedFlags.MaxIdleConns != 50 {
fmt.Println("[Deprecated] flag --max-idle-conns is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_MAX__IDLE__CONNS env variable instead.")
fmt.Println("[Deprecated] flag --max-idle-conns is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_MAX__IDLE__CONNS instead.")
config.TelemetryStore.Connection.MaxIdleConns = deprecatedFlags.MaxIdleConns
}
if deprecatedFlags.MaxOpenConns != 100 {
fmt.Println("[Deprecated] flag --max-open-conns is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_MAX__OPEN__CONNS env variable instead.")
fmt.Println("[Deprecated] flag --max-open-conns is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_MAX__OPEN__CONNS instead.")
config.TelemetryStore.Connection.MaxOpenConns = deprecatedFlags.MaxOpenConns
}
if deprecatedFlags.DialTimeout != 5*time.Second {
fmt.Println("[Deprecated] flag --dial-timeout is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_DIAL__TIMEOUT environment variable instead.")
fmt.Println("[Deprecated] flag --dial-timeout is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_DIAL__TIMEOUT instead.")
config.TelemetryStore.Connection.DialTimeout = deprecatedFlags.DialTimeout
}
if os.Getenv("ALERTMANAGER_API_PREFIX") != "" {
fmt.Println("[Deprecated] env ALERTMANAGER_API_PREFIX is deprecated and scheduled for removal. Please use SIGNOZ_ALERTMANAGER_LEGACY_API__URL instead.")
config.Alertmanager.Legacy.ApiURL = os.Getenv("ALERTMANAGER_API_PREFIX")
}
if os.Getenv("ALERTMANAGER_API_CHANNEL_PATH") != "" {
fmt.Println("[Deprecated] env ALERTMANAGER_API_CHANNEL_PATH is deprecated and scheduled for complete removal.")
}
}

View File

@@ -1,6 +1,9 @@
package signoz
import (
"go.signoz.io/signoz/pkg/alertmanager"
"go.signoz.io/signoz/pkg/alertmanager/legacyalertmanager"
"go.signoz.io/signoz/pkg/alertmanager/signozalertmanager"
"go.signoz.io/signoz/pkg/cache"
"go.signoz.io/signoz/pkg/cache/memorycache"
"go.signoz.io/signoz/pkg/cache/rediscache"
@@ -33,6 +36,9 @@ type ProviderConfig struct {
// Map of all telemetrystore provider factories
TelemetryStoreProviderFactories factory.NamedMap[factory.ProviderFactory[telemetrystore.TelemetryStore, telemetrystore.Config]]
// Map of all alertmanager provider factories
AlertmanagerProviderFactories factory.NamedMap[factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config]]
}
func NewProviderConfig() ProviderConfig {
@@ -62,9 +68,17 @@ func NewProviderConfig() ProviderConfig {
sqlmigration.NewAddPatsFactory(),
sqlmigration.NewModifyDatetimeFactory(),
sqlmigration.NewModifyOrgDomainFactory(),
sqlmigration.NewModifyNotificationChannelsFactory(),
),
TelemetryStoreProviderFactories: factory.MustNewNamedMap(
clickhousetelemetrystore.NewFactory(telemetrystorehook.NewFactory()),
),
}
}
// NewAlertmanagerProviderFactories builds the named map of alertmanager
// provider factories (legacy and signoz), each constructed with the given
// SQL store.
func NewAlertmanagerProviderFactories(sqlstore sqlstore.SQLStore) factory.NamedMap[factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config]] {
	legacyFactory := legacyalertmanager.NewFactory(sqlstore)
	signozFactory := signozalertmanager.NewFactory(sqlstore)

	return factory.MustNewNamedMap(legacyFactory, signozFactory)
}

View File

@@ -14,3 +14,9 @@ func TestNewProviderConfig(t *testing.T) {
NewProviderConfig()
})
}
func TestNewAlertmanagerProviderFactories(t *testing.T) {
assert.NotPanics(t, func() {
NewAlertmanagerProviderFactories(nil)
})
}

View File

@@ -3,6 +3,7 @@ package signoz
import (
"context"
"go.signoz.io/signoz/pkg/alertmanager"
"go.signoz.io/signoz/pkg/cache"
"go.signoz.io/signoz/pkg/factory"
"go.signoz.io/signoz/pkg/instrumentation"
@@ -16,10 +17,12 @@ import (
)
// SigNoz groups the service registry with the cache, web, SQL store,
// telemetry store and alertmanager instances.
type SigNoz struct {
// Registry is embedded, so its methods are promoted onto SigNoz.
*factory.Registry
Cache cache.Cache
Web web.Web
SQLStore sqlstore.SQLStore
TelemetryStore telemetrystore.TelemetryStore
Alertmanager alertmanager.Alertmanager
}
func New(
@@ -102,10 +105,32 @@ func New(
return nil, err
}
alertmanager, err := factory.NewProviderFromNamedMap(
ctx,
providerSettings,
config.Alertmanager,
NewAlertmanagerProviderFactories(sqlstore),
config.Alertmanager.Provider,
)
if err != nil {
return nil, err
}
registry, err := factory.NewRegistry(
instrumentation.Logger(),
factory.NewNamedService(factory.MustNewName("instrumentation"), instrumentation),
factory.NewNamedService(factory.MustNewName("alertmanager"), alertmanager),
)
if err != nil {
return nil, err
}
return &SigNoz{
Registry: registry,
Cache: cache,
Web: web,
SQLStore: sqlstore,
TelemetryStore: telemetrystore,
Alertmanager: alertmanager,
}, nil
}

View File

@@ -0,0 +1,89 @@
package sqlmigration
import (
	"context"
	"database/sql"
	"errors"

	"github.com/uptrace/bun"
	"github.com/uptrace/bun/migrate"
	"go.signoz.io/signoz/pkg/factory"
)
// modifyNotificationChannels is the stateless migration type whose Up method
// adds an org_id column to the notification_channels table.
type modifyNotificationChannels struct{}
// NewModifyNotificationChannelsFactory returns the provider factory for the
// migration registered under the name "modify_notification_channels".
func NewModifyNotificationChannelsFactory() factory.ProviderFactory[SQLMigration, Config] {
	name := factory.MustNewName("modify_notification_channels")

	return factory.NewProviderFactory(name, newModifyNotificationChannels)
}
// newModifyNotificationChannels constructs the migration. The context,
// provider settings and config arguments are ignored.
func newModifyNotificationChannels(_ context.Context, _ factory.ProviderSettings, _ Config) (SQLMigration, error) {
	migration := new(modifyNotificationChannels)

	return migration, nil
}
// Register adds this migration's Up and Down functions to the given migration
// set and returns the registration error, if any.
func (migration *modifyNotificationChannels) Register(migrations *migrate.Migrations) error {
	return migrations.Register(migration.Up, migration.Down)
}
// Up adds a nullable org_id TEXT column to notification_channels and, when at
// least one organization exists, backfills rows whose org_id is NULL with the
// id of the first organization returned by the query. Everything runs inside
// a single transaction; any failure returns the error without committing.
func (migration *modifyNotificationChannels) Up(ctx context.Context, db *bun.DB) error {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	// Roll back on every early return. The error from this deferred call is
	// deliberately ignored.
	defer tx.Rollback() //nolint:errcheck

	if _, err := tx.
		NewAddColumn().
		Table("notification_channels").
		ColumnExpr("org_id TEXT").
		Exec(ctx); err != nil {
		return err
	}

	var orgID string
	err = tx.
		NewSelect().
		ColumnExpr("id").
		Table("organizations").
		Limit(1).
		Scan(ctx, &orgID)

	switch {
	case err == nil:
		if err := migration.populateOrgID(ctx, tx, orgID); err != nil {
			return err
		}
	case errors.Is(err, sql.ErrNoRows):
		// No organization exists yet, so there is nothing to backfill.
		// errors.Is also recognises a wrapped sql.ErrNoRows, which a direct
		// comparison would have treated as a hard failure.
	default:
		return err
	}

	return tx.Commit()
}
// populateOrgID sets org_id to the given orgID on every notification_channels
// row whose org_id is currently NULL, using the supplied transaction.
func (migration *modifyNotificationChannels) populateOrgID(ctx context.Context, tx bun.Tx, orgID string) error {
	update := tx.
		NewUpdate().
		Table("notification_channels").
		Set("org_id = ?", orgID).
		Where("org_id IS NULL")

	_, err := update.Exec(ctx)
	return err
}
// Down is a no-op: it performs no schema change and always returns nil.
func (migration *modifyNotificationChannels) Down(_ context.Context, _ *bun.DB) error {
	return nil
}

View File

@@ -24,7 +24,15 @@ type migrator struct {
func New(ctx context.Context, providerSettings factory.ProviderSettings, sqlstore sqlstore.SQLStore, migrations *migrate.Migrations, config Config) SQLMigrator {
return &migrator{
migrator: migrate.NewMigrator(sqlstore.BunDB(), migrations, migrate.WithTableName(migrationTableName), migrate.WithLocksTableName(migrationLockTableName)),
migrator: migrate.NewMigrator(
sqlstore.BunDB(),
migrations,
migrate.WithTableName(migrationTableName),
migrate.WithLocksTableName(migrationLockTableName),
// This is to ensure that the migration is marked as applied only on success. If the migration fails, no entry is made in the migration table
// and the migration will be retried.
migrate.WithMarkAppliedOnSuccess(true),
),
settings: factory.NewScopedProviderSettings(providerSettings, "go.signoz.io/signoz/pkg/sqlmigrator"),
config: config,
dialect: sqlstore.BunDB().Dialect().Name().String(),

View File

@@ -22,6 +22,9 @@ import (
)
type (
// An alias for the Alert type from the alertmanager package.
AlertModel = models.Alert
// An alias for the Alert type from the alertmanager package.
Alert = types.Alert
@@ -38,12 +41,51 @@ type (
GettableAlerts = models.GettableAlerts
)
// DeprecatedGettableAlert embeds *model.Alert and adds the alert's status,
// receiver names and fingerprint, serialized under the JSON keys "status",
// "receivers" and "fingerprint".
type DeprecatedGettableAlert struct {
*model.Alert
Status types.AlertStatus `json:"status"`
Receivers []string `json:"receivers"`
Fingerprint string `json:"fingerprint"`
}
// DeprecatedGettableAlerts is an alias for a slice of *DeprecatedGettableAlert.
type DeprecatedGettableAlerts = []*DeprecatedGettableAlert
// GettableAlertsParams embeds alert.GetAlertsParams and additionally carries a
// RawQuery string. It is a distinct struct type, not a type alias.
// NOTE(review): RawQuery presumably holds the unparsed request query string;
// confirm against callers.
type GettableAlertsParams struct {
alert.GetAlertsParams
RawQuery string
}
// NewDeprecatedGettableAlertsFromGettableAlerts converts each entry of
// gettableAlerts into a DeprecatedGettableAlert: labels and annotations are
// converted to model label sets, receivers are reduced to their names, and
// the status, timestamps, generator URL and fingerprint are copied across.
// The returned slice has the same length and order as the input.
//
// The Name, StartsAt, EndsAt, Status.State and Fingerprint pointers are
// dereferenced without nil checks, so every input entry must have them set.
func NewDeprecatedGettableAlertsFromGettableAlerts(gettableAlerts GettableAlerts) DeprecatedGettableAlerts {
	converted := make(DeprecatedGettableAlerts, len(gettableAlerts))

	for idx, src := range gettableAlerts {
		// Flatten receiver objects to just their names.
		names := make([]string, len(src.Receivers))
		for j, rcv := range src.Receivers {
			names[j] = *rcv.Name
		}

		base := &model.Alert{
			Labels:       v2.APILabelSetToModelLabelSet(src.Labels),
			Annotations:  v2.APILabelSetToModelLabelSet(src.Annotations),
			StartsAt:     time.Time(*src.StartsAt),
			EndsAt:       time.Time(*src.EndsAt),
			GeneratorURL: string(src.GeneratorURL),
		}

		status := types.AlertStatus{
			State:       types.AlertState(*src.Status.State),
			SilencedBy:  src.Status.SilencedBy,
			InhibitedBy: src.Status.InhibitedBy,
		}

		converted[idx] = &DeprecatedGettableAlert{
			Alert:       base,
			Status:      status,
			Fingerprint: *src.Fingerprint,
			Receivers:   names,
		}
	}

	return converted
}
// Converts a slice of Alert to a slice of PostableAlert.
func NewPostableAlertsFromAlerts(alerts []*types.Alert) PostableAlerts {
postableAlerts := make(PostableAlerts, 0, len(alerts))

View File

@@ -12,7 +12,8 @@ import (
)
var (
ErrCodeAlertmanagerChannelNotFound = errors.MustNewCode("alertmanager_channel_not_found")
ErrCodeAlertmanagerChannelNotFound = errors.MustNewCode("alertmanager_channel_not_found")
ErrCodeAlertmanagerChannelNameMismatch = errors.MustNewCode("alertmanager_channel_name_mismatch")
)
var (
@@ -152,3 +153,19 @@ func GetChannelByID(channels Channels, id int) (*Channel, error) {
return nil, errors.Newf(errors.TypeNotFound, ErrCodeAlertmanagerChannelNotFound, "cannot find channel with id %d", id)
}
// Update replaces the channel's Data with the data derived from receiver and
// sets UpdatedAt to the current time. It returns an invalid-input error when
// no channel can be built from the receiver, or when the receiver's name
// differs from the channel's existing name (renaming is not allowed).
func (c *Channel) Update(receiver Receiver) error {
	updated := NewChannelFromReceiver(receiver, c.OrgID)

	switch {
	case updated == nil:
		return errors.Newf(errors.TypeInvalidInput, ErrCodeAlertmanagerChannelNotFound, "cannot find channel with id %d", c.ID)
	case c.Name != updated.Name:
		return errors.Newf(errors.TypeInvalidInput, ErrCodeAlertmanagerChannelNameMismatch, "cannot update channel name")
	}

	c.Data = updated.Data
	c.UpdatedAt = time.Now()

	return nil
}

View File

@@ -260,13 +260,31 @@ func (c *Config) DeleteReceiver(name string) error {
type ConfigStore interface {
// Set creates or updates a config.
Set(context.Context, *Config, func(context.Context) error) error
Set(context.Context, *Config) error
// Get returns the config for the given orgID
Get(context.Context, string) (*Config, error)
// ListOrgs returns the list of orgs
ListOrgs(context.Context) ([]string, error)
// CreateChannel creates a new channel.
CreateChannel(context.Context, *Channel, func(context.Context) error) error
// GetChannelByID returns the channel for the given id.
GetChannelByID(context.Context, string, int) (*Channel, error)
// UpdateChannel updates a channel.
UpdateChannel(context.Context, string, *Channel, func(context.Context) error) error
// DeleteChannelByID deletes a channel.
DeleteChannelByID(context.Context, string, int, func(context.Context) error) error
// ListChannels returns the list of channels.
ListChannels(context.Context, string) ([]*Channel, error)
// ListAllChannels returns the list of channels for all organizations.
ListAllChannels(context.Context) ([]*Channel, error)
}
// MarshalSecretValue if set to true will expose Secret type