Compare commits
4 Commits
main
...
v0.74.0-30
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
309ac976cd | ||
|
|
5ab0d77087 | ||
|
|
307afa02ff | ||
|
|
b9e29424ce |
2
.gitignore
vendored
2
.gitignore
vendored
@@ -76,3 +76,5 @@ dist/
|
||||
|
||||
# ignore user_scripts that is fetched by init-clickhouse
|
||||
deploy/common/clickhouse/user_scripts/
|
||||
|
||||
queries.active
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
"go.signoz.io/signoz/ee/query-service/interfaces"
|
||||
"go.signoz.io/signoz/ee/query-service/license"
|
||||
"go.signoz.io/signoz/ee/query-service/usage"
|
||||
"go.signoz.io/signoz/pkg/alertmanager"
|
||||
baseapp "go.signoz.io/signoz/pkg/query-service/app"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/cloudintegrations"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/integrations"
|
||||
@@ -20,6 +21,7 @@ import (
|
||||
basemodel "go.signoz.io/signoz/pkg/query-service/model"
|
||||
rules "go.signoz.io/signoz/pkg/query-service/rules"
|
||||
"go.signoz.io/signoz/pkg/query-service/version"
|
||||
"go.signoz.io/signoz/pkg/signoz"
|
||||
"go.signoz.io/signoz/pkg/types/authtypes"
|
||||
)
|
||||
|
||||
@@ -51,7 +53,7 @@ type APIHandler struct {
|
||||
}
|
||||
|
||||
// NewAPIHandler returns an APIHandler
|
||||
func NewAPIHandler(opts APIHandlerOptions) (*APIHandler, error) {
|
||||
func NewAPIHandler(opts APIHandlerOptions, signoz *signoz.SigNoz) (*APIHandler, error) {
|
||||
|
||||
baseHandler, err := baseapp.NewAPIHandler(baseapp.APIHandlerOpts{
|
||||
Reader: opts.DataConnector,
|
||||
@@ -67,6 +69,7 @@ func NewAPIHandler(opts APIHandlerOptions) (*APIHandler, error) {
|
||||
FluxInterval: opts.FluxInterval,
|
||||
UseLogsNewSchema: opts.UseLogsNewSchema,
|
||||
UseTraceNewSchema: opts.UseTraceNewSchema,
|
||||
AlertmanagerAPI: alertmanager.NewAPI(signoz.Alertmanager),
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
|
||||
@@ -267,7 +267,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
||||
JWT: serverOptions.Jwt,
|
||||
}
|
||||
|
||||
apiHandler, err := api.NewAPIHandler(apiOpts)
|
||||
apiHandler, err := api.NewAPIHandler(apiOpts, serverOptions.SigNoz)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -7,7 +7,6 @@ import (
|
||||
"os"
|
||||
"os/signal"
|
||||
"strconv"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
@@ -198,16 +197,19 @@ func main() {
|
||||
zap.L().Fatal("Failed to initialize auth cache", zap.Error(err))
|
||||
}
|
||||
|
||||
signalsChannel := make(chan os.Signal, 1)
|
||||
signal.Notify(signalsChannel, os.Interrupt, syscall.SIGTERM)
|
||||
signoz.Start(context.Background())
|
||||
|
||||
for {
|
||||
select {
|
||||
case status := <-server.HealthCheckStatus():
|
||||
zap.L().Info("Received HealthCheck status: ", zap.Int("status", int(status)))
|
||||
case <-signalsChannel:
|
||||
zap.L().Fatal("Received OS Interrupt Signal ... ")
|
||||
server.Stop()
|
||||
}
|
||||
if err := signoz.Wait(context.Background()); err != nil {
|
||||
zap.L().Fatal("Failed to start signoz", zap.Error(err))
|
||||
}
|
||||
|
||||
err = server.Stop()
|
||||
if err != nil {
|
||||
zap.L().Fatal("Failed to stop server", zap.Error(err))
|
||||
}
|
||||
|
||||
err = signoz.Stop(context.Background())
|
||||
if err != nil {
|
||||
zap.L().Fatal("Failed to stop signoz", zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
3
go.mod
3
go.mod
@@ -55,6 +55,7 @@ require (
|
||||
github.com/soheilhy/cmux v0.1.5
|
||||
github.com/srikanthccv/ClickHouse-go-mock v0.9.0
|
||||
github.com/stretchr/testify v1.10.0
|
||||
github.com/tidwall/gjson v1.18.0
|
||||
github.com/uptrace/bun v1.2.9
|
||||
github.com/uptrace/bun/dialect/pgdialect v1.2.9
|
||||
github.com/uptrace/bun/dialect/sqlitedialect v1.2.9
|
||||
@@ -204,6 +205,8 @@ require (
|
||||
github.com/smarty/assertions v1.15.0 // indirect
|
||||
github.com/spf13/cobra v1.8.1 // indirect
|
||||
github.com/spf13/pflag v1.0.5 // indirect
|
||||
github.com/tidwall/match v1.1.1 // indirect
|
||||
github.com/tidwall/pretty v1.2.0 // indirect
|
||||
github.com/tklauser/go-sysconf v0.3.13 // indirect
|
||||
github.com/tklauser/numcpus v0.7.0 // indirect
|
||||
github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc // indirect
|
||||
|
||||
6
go.sum
6
go.sum
@@ -891,7 +891,13 @@ github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8
|
||||
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
|
||||
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||
github.com/subosito/gotenv v1.4.1/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0=
|
||||
github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY=
|
||||
github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
|
||||
github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
|
||||
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
|
||||
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
|
||||
github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
|
||||
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
|
||||
github.com/tklauser/go-sysconf v0.3.13 h1:GBUpcahXSpR2xN01jhkNAbTLRk2Yzgggk8IM08lq3r4=
|
||||
github.com/tklauser/go-sysconf v0.3.13/go.mod h1:zwleP4Q4OehZHGn4CYZDipCgg9usW5IJePewFCGVEa0=
|
||||
github.com/tklauser/numcpus v0.7.0 h1:yjuerZP127QG9m5Zh/mSO4wqurYil27tHrqwRoRjpr4=
|
||||
|
||||
@@ -26,11 +26,14 @@ type Alertmanager interface {
|
||||
// ListChannels lists all channels for the organization.
|
||||
ListChannels(context.Context, string) ([]*alertmanagertypes.Channel, error)
|
||||
|
||||
// ListAllChannels lists all channels for all organizations. It is used by the legacy alertmanager only.
|
||||
ListAllChannels(context.Context) ([]*alertmanagertypes.Channel, error)
|
||||
|
||||
// GetChannelByID gets a channel for the organization.
|
||||
GetChannelByID(context.Context, string, int) (*alertmanagertypes.Channel, error)
|
||||
|
||||
// UpdateChannel updates a channel for the organization.
|
||||
UpdateChannelByReceiver(context.Context, string, alertmanagertypes.Receiver) error
|
||||
UpdateChannelByReceiverAndID(context.Context, string, alertmanagertypes.Receiver, int) error
|
||||
|
||||
// CreateChannel creates a channel for the organization.
|
||||
CreateChannel(context.Context, string, alertmanagertypes.Receiver) error
|
||||
|
||||
@@ -9,9 +9,11 @@ import (
|
||||
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
|
||||
)
|
||||
|
||||
// Notifier is responsible for dispatching alert notifications to an alertmanager.
|
||||
// Batcher is responsible for batching alerts and broadcasting them on a channel.
|
||||
type Batcher struct {
|
||||
// C is the channel on which alerts are sent to alertmanager
|
||||
C chan alertmanagertypes.PostableAlerts
|
||||
|
||||
// logger
|
||||
logger *slog.Logger
|
||||
|
||||
@@ -23,14 +25,21 @@ type Batcher struct {
|
||||
|
||||
// more channel to signal the sender goroutine to send alerts
|
||||
moreC chan struct{}
|
||||
|
||||
// stop channel to signal the sender goroutine to stop
|
||||
stopC chan struct{}
|
||||
mtx sync.RWMutex
|
||||
|
||||
// mutex to synchronize access to the queue
|
||||
queueMtx sync.RWMutex
|
||||
|
||||
// wait group to wait for all goroutines to finish
|
||||
goroutinesWg sync.WaitGroup
|
||||
}
|
||||
|
||||
func New(logger *slog.Logger, config Config) *Batcher {
|
||||
batcher := &Batcher{
|
||||
logger: logger,
|
||||
queue: make(alertmanagertypes.PostableAlerts, config.Capacity),
|
||||
queue: make(alertmanagertypes.PostableAlerts, 0, config.Capacity),
|
||||
config: config,
|
||||
moreC: make(chan struct{}, 1),
|
||||
stopC: make(chan struct{}),
|
||||
@@ -41,22 +50,27 @@ func New(logger *slog.Logger, config Config) *Batcher {
|
||||
}
|
||||
|
||||
// Start dispatches notifications continuously.
|
||||
func (n *Batcher) Start(ctx context.Context) error {
|
||||
func (batcher *Batcher) Start(ctx context.Context) error {
|
||||
batcher.goroutinesWg.Add(1)
|
||||
go func() {
|
||||
n.logger.InfoContext(ctx, "starting alertmanager batcher")
|
||||
defer batcher.goroutinesWg.Done()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-batcher.stopC:
|
||||
for batcher.queueLen() > 0 {
|
||||
alerts := batcher.next()
|
||||
batcher.C <- alerts
|
||||
}
|
||||
close(batcher.C)
|
||||
return
|
||||
case <-n.stopC:
|
||||
return
|
||||
case <-n.moreC:
|
||||
case <-batcher.moreC:
|
||||
}
|
||||
alerts := n.nextBatch()
|
||||
n.C <- alerts
|
||||
alerts := batcher.next()
|
||||
batcher.C <- alerts
|
||||
// If the queue still has items left, kick off the next iteration.
|
||||
if n.queueLen() > 0 {
|
||||
n.setMore()
|
||||
if batcher.queueLen() > 0 {
|
||||
batcher.setMore()
|
||||
}
|
||||
}
|
||||
}()
|
||||
@@ -64,69 +78,67 @@ func (n *Batcher) Start(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *Batcher) queueLen() int {
|
||||
n.mtx.RLock()
|
||||
defer n.mtx.RUnlock()
|
||||
// Add queues the given notification requests for processing.
|
||||
func (batcher *Batcher) Add(ctx context.Context, alerts ...*alertmanagertypes.PostableAlert) {
|
||||
batcher.queueMtx.Lock()
|
||||
defer batcher.queueMtx.Unlock()
|
||||
|
||||
return len(n.queue)
|
||||
// Queue capacity should be significantly larger than a single alert
|
||||
// batch could be.
|
||||
if d := len(alerts) - batcher.config.Capacity; d > 0 {
|
||||
alerts = alerts[d:]
|
||||
batcher.logger.WarnContext(ctx, "alert batch larger than queue capacity, dropping alerts", "num_dropped", d, "capacity", batcher.config.Capacity)
|
||||
}
|
||||
|
||||
// If the queue is full, remove the oldest alerts in favor
|
||||
// of newer ones.
|
||||
if d := (len(batcher.queue) + len(alerts)) - batcher.config.Capacity; d > 0 {
|
||||
batcher.queue = batcher.queue[d:]
|
||||
batcher.logger.WarnContext(ctx, "alert batch queue full, dropping alerts", "num_dropped", d)
|
||||
}
|
||||
|
||||
batcher.queue = append(batcher.queue, alerts...)
|
||||
|
||||
// Notify sending goroutine that there are alerts to be processed.
|
||||
batcher.setMore()
|
||||
}
|
||||
|
||||
func (n *Batcher) nextBatch() alertmanagertypes.PostableAlerts {
|
||||
n.mtx.Lock()
|
||||
defer n.mtx.Unlock()
|
||||
// Stop shuts down the batcher.
|
||||
func (batcher *Batcher) Stop(ctx context.Context) {
|
||||
close(batcher.stopC)
|
||||
batcher.goroutinesWg.Wait()
|
||||
}
|
||||
|
||||
func (batcher *Batcher) queueLen() int {
|
||||
batcher.queueMtx.RLock()
|
||||
defer batcher.queueMtx.RUnlock()
|
||||
|
||||
return len(batcher.queue)
|
||||
}
|
||||
|
||||
func (batcher *Batcher) next() alertmanagertypes.PostableAlerts {
|
||||
batcher.queueMtx.Lock()
|
||||
defer batcher.queueMtx.Unlock()
|
||||
|
||||
var alerts alertmanagertypes.PostableAlerts
|
||||
|
||||
if len(n.queue) > n.config.Size {
|
||||
alerts = append(make(alertmanagertypes.PostableAlerts, 0, n.config.Size), n.queue[:n.config.Size]...)
|
||||
n.queue = n.queue[n.config.Size:]
|
||||
if len(batcher.queue) > batcher.config.Size {
|
||||
alerts = append(make(alertmanagertypes.PostableAlerts, 0, batcher.config.Size), batcher.queue[:batcher.config.Size]...)
|
||||
batcher.queue = batcher.queue[batcher.config.Size:]
|
||||
} else {
|
||||
alerts = append(make(alertmanagertypes.PostableAlerts, 0, len(n.queue)), n.queue...)
|
||||
n.queue = n.queue[:0]
|
||||
alerts = append(make(alertmanagertypes.PostableAlerts, 0, len(batcher.queue)), batcher.queue...)
|
||||
batcher.queue = batcher.queue[:0]
|
||||
}
|
||||
|
||||
return alerts
|
||||
}
|
||||
|
||||
// Send queues the given notification requests for processing.
|
||||
// Panics if called on a handler that is not running.
|
||||
func (n *Batcher) Send(ctx context.Context, alerts ...*alertmanagertypes.PostableAlert) {
|
||||
n.mtx.Lock()
|
||||
defer n.mtx.Unlock()
|
||||
|
||||
// Queue capacity should be significantly larger than a single alert
|
||||
// batch could be.
|
||||
if d := len(alerts) - n.config.Capacity; d > 0 {
|
||||
alerts = alerts[d:]
|
||||
n.logger.WarnContext(ctx, "Alert batch larger than queue capacity, dropping alerts", "num_dropped", d)
|
||||
}
|
||||
|
||||
// If the queue is full, remove the oldest alerts in favor
|
||||
// of newer ones.
|
||||
if d := (len(n.queue) + len(alerts)) - n.config.Capacity; d > 0 {
|
||||
n.queue = n.queue[d:]
|
||||
|
||||
n.logger.WarnContext(ctx, "Alert notification queue full, dropping alerts", "num_dropped", d)
|
||||
}
|
||||
n.queue = append(n.queue, alerts...)
|
||||
|
||||
// Notify sending goroutine that there are alerts to be processed.
|
||||
n.setMore()
|
||||
}
|
||||
|
||||
// setMore signals that the alert queue has items.
|
||||
func (n *Batcher) setMore() {
|
||||
func (batcher *Batcher) setMore() {
|
||||
// If we cannot send on the channel, it means the signal already exists
|
||||
// and has not been consumed yet.
|
||||
select {
|
||||
case n.moreC <- struct{}{}:
|
||||
case batcher.moreC <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// Stop shuts down the notification handler.
|
||||
func (n *Batcher) Stop(ctx context.Context) {
|
||||
n.logger.InfoContext(ctx, "Stopping alertmanager batcher")
|
||||
close(n.moreC)
|
||||
close(n.stopC)
|
||||
}
|
||||
|
||||
64
pkg/alertmanager/alertmanagerbatcher/batcher_test.go
Normal file
64
pkg/alertmanager/alertmanagerbatcher/batcher_test.go
Normal file
@@ -0,0 +1,64 @@
|
||||
package alertmanagerbatcher
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"log/slog"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
|
||||
)
|
||||
|
||||
func TestBatcherWithOneAlertAndDefaultConfigs(t *testing.T) {
|
||||
batcher := New(slog.New(slog.NewTextHandler(io.Discard, nil)), NewConfig())
|
||||
batcher.Start(context.Background())
|
||||
|
||||
batcher.Add(context.Background(), &alertmanagertypes.PostableAlert{Alert: alertmanagertypes.AlertModel{
|
||||
Labels: map[string]string{"alertname": "test"},
|
||||
}})
|
||||
|
||||
alerts := <-batcher.C
|
||||
assert.Equal(t, 1, len(alerts))
|
||||
|
||||
batcher.Stop(context.Background())
|
||||
}
|
||||
|
||||
func TestBatcherWithBatchSize(t *testing.T) {
|
||||
batcher := New(slog.New(slog.NewTextHandler(io.Discard, nil)), Config{Size: 2, Capacity: 4})
|
||||
batcher.Start(context.Background())
|
||||
|
||||
var alerts alertmanagertypes.PostableAlerts
|
||||
for i := 0; i < 4; i++ {
|
||||
alerts = append(alerts, &alertmanagertypes.PostableAlert{Alert: alertmanagertypes.AlertModel{
|
||||
Labels: map[string]string{"alertname": "test"},
|
||||
}})
|
||||
}
|
||||
batcher.Add(context.Background(), alerts...)
|
||||
|
||||
for i := 0; i < 2; i++ {
|
||||
alerts := <-batcher.C
|
||||
assert.Equal(t, 2, len(alerts))
|
||||
}
|
||||
|
||||
batcher.Stop(context.Background())
|
||||
}
|
||||
|
||||
func TestBatcherWithCClosed(t *testing.T) {
|
||||
batcher := New(slog.New(slog.NewTextHandler(io.Discard, nil)), Config{Size: 2, Capacity: 4})
|
||||
batcher.Start(context.Background())
|
||||
|
||||
var alerts alertmanagertypes.PostableAlerts
|
||||
for i := 0; i < 4; i++ {
|
||||
alerts = append(alerts, &alertmanagertypes.PostableAlert{Alert: alertmanagertypes.AlertModel{
|
||||
Labels: map[string]string{"alertname": "test"},
|
||||
}})
|
||||
}
|
||||
batcher.Add(context.Background(), alerts...)
|
||||
|
||||
batcher.Stop(context.Background())
|
||||
|
||||
for alerts := range batcher.C {
|
||||
assert.Equal(t, 2, len(alerts))
|
||||
}
|
||||
}
|
||||
@@ -1,8 +1,11 @@
|
||||
package alertmanagerbatcher
|
||||
|
||||
type Config struct {
|
||||
// Capacity is the maximum number of alerts that can be buffered in the batcher.
|
||||
Capacity int
|
||||
Size int
|
||||
|
||||
// Size is the number of alerts to send in each batch.
|
||||
Size int
|
||||
}
|
||||
|
||||
func NewConfig() Config {
|
||||
|
||||
@@ -45,7 +45,7 @@ func (store *config) Get(ctx context.Context, orgID string) (*alertmanagertypes.
|
||||
}
|
||||
|
||||
// Set implements alertmanagertypes.ConfigStore.
|
||||
func (store *config) Set(ctx context.Context, config *alertmanagertypes.Config, cb func(context.Context) error) error {
|
||||
func (store *config) Set(ctx context.Context, config *alertmanagertypes.Config) error {
|
||||
tx, err := store.sqlstore.BunDB().BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -75,12 +75,6 @@ func (store *config) Set(ctx context.Context, config *alertmanagertypes.Config,
|
||||
}
|
||||
}
|
||||
|
||||
if cb != nil {
|
||||
if err = cb(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err = tx.Commit(); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -104,3 +98,145 @@ func (store *config) ListOrgs(ctx context.Context) ([]string, error) {
|
||||
|
||||
return orgIDs, nil
|
||||
}
|
||||
|
||||
func (store *config) CreateChannel(ctx context.Context, channel *alertmanagertypes.Channel, cb func(context.Context) error) error {
|
||||
tx, err := store.sqlstore.BunDB().BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer tx.Rollback() //nolint:errcheck
|
||||
|
||||
if _, err = tx.NewInsert().
|
||||
Model(channel).
|
||||
Exec(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if cb != nil {
|
||||
if err = cb(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err = tx.Commit(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (store *config) GetChannelByID(ctx context.Context, orgID string, id int) (*alertmanagertypes.Channel, error) {
|
||||
channel := new(alertmanagertypes.Channel)
|
||||
|
||||
err := store.
|
||||
sqlstore.
|
||||
BunDB().
|
||||
NewSelect().
|
||||
Model(channel).
|
||||
Where("org_id = ?", orgID).
|
||||
Where("id = ?", id).
|
||||
Scan(ctx)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return nil, errors.Newf(errors.TypeNotFound, alertmanagertypes.ErrCodeAlertmanagerChannelNotFound, "cannot find channel with id %d", id)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return channel, nil
|
||||
}
|
||||
|
||||
func (store *config) UpdateChannel(ctx context.Context, orgID string, channel *alertmanagertypes.Channel, cb func(context.Context) error) error {
|
||||
tx, err := store.sqlstore.BunDB().BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer tx.Rollback() //nolint:errcheck
|
||||
|
||||
_, err = tx.NewUpdate().
|
||||
Model(channel).
|
||||
WherePK().
|
||||
Exec(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if cb != nil {
|
||||
if err = cb(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err = tx.Commit(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (store *config) DeleteChannelByID(ctx context.Context, orgID string, id int, cb func(context.Context) error) error {
|
||||
channel := new(alertmanagertypes.Channel)
|
||||
|
||||
tx, err := store.sqlstore.BunDB().BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer tx.Rollback() //nolint:errcheck
|
||||
|
||||
_, err = tx.NewDelete().
|
||||
Model(channel).
|
||||
Where("org_id = ?", orgID).
|
||||
Where("id = ?", id).
|
||||
Exec(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if cb != nil {
|
||||
if err = cb(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err = tx.Commit(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (store *config) ListChannels(ctx context.Context, orgID string) ([]*alertmanagertypes.Channel, error) {
|
||||
var channels []*alertmanagertypes.Channel
|
||||
|
||||
err := store.
|
||||
sqlstore.
|
||||
BunDB().
|
||||
NewSelect().
|
||||
Model(&channels).
|
||||
Where("org_id = ?", orgID).
|
||||
Scan(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return channels, nil
|
||||
}
|
||||
|
||||
func (store *config) ListAllChannels(ctx context.Context) ([]*alertmanagertypes.Channel, error) {
|
||||
var channels []*alertmanagertypes.Channel
|
||||
|
||||
err := store.
|
||||
sqlstore.
|
||||
BunDB().
|
||||
NewSelect().
|
||||
Model(&channels).
|
||||
Scan(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return channels, nil
|
||||
}
|
||||
|
||||
@@ -24,7 +24,7 @@ func NewAPI(alertmanager Alertmanager) *API {
|
||||
}
|
||||
}
|
||||
|
||||
func (api *API) GetAlerts(req *http.Request, rw http.ResponseWriter) {
|
||||
func (api *API) GetAlerts(rw http.ResponseWriter, req *http.Request) {
|
||||
ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
@@ -49,7 +49,7 @@ func (api *API) GetAlerts(req *http.Request, rw http.ResponseWriter) {
|
||||
render.Success(rw, http.StatusOK, alerts)
|
||||
}
|
||||
|
||||
func (api *API) TestReceiver(req *http.Request, rw http.ResponseWriter) {
|
||||
func (api *API) TestReceiver(rw http.ResponseWriter, req *http.Request) {
|
||||
ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
@@ -81,7 +81,7 @@ func (api *API) TestReceiver(req *http.Request, rw http.ResponseWriter) {
|
||||
render.Success(rw, http.StatusNoContent, nil)
|
||||
}
|
||||
|
||||
func (api *API) ListChannels(req *http.Request, rw http.ResponseWriter) {
|
||||
func (api *API) ListChannels(rw http.ResponseWriter, req *http.Request) {
|
||||
ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
@@ -100,7 +100,20 @@ func (api *API) ListChannels(req *http.Request, rw http.ResponseWriter) {
|
||||
render.Success(rw, http.StatusOK, channels)
|
||||
}
|
||||
|
||||
func (api *API) GetChannelByID(req *http.Request, rw http.ResponseWriter) {
|
||||
func (api *API) ListAllChannels(rw http.ResponseWriter, req *http.Request) {
|
||||
ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
channels, err := api.alertmanager.ListAllChannels(ctx)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
render.Success(rw, http.StatusOK, channels)
|
||||
}
|
||||
|
||||
func (api *API) GetChannelByID(rw http.ResponseWriter, req *http.Request) {
|
||||
ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
@@ -137,7 +150,7 @@ func (api *API) GetChannelByID(req *http.Request, rw http.ResponseWriter) {
|
||||
render.Success(rw, http.StatusOK, channel)
|
||||
}
|
||||
|
||||
func (api *API) UpdateChannelByID(req *http.Request, rw http.ResponseWriter) {
|
||||
func (api *API) UpdateChannelByID(rw http.ResponseWriter, req *http.Request) {
|
||||
ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
@@ -147,6 +160,24 @@ func (api *API) UpdateChannelByID(req *http.Request, rw http.ResponseWriter) {
|
||||
return
|
||||
}
|
||||
|
||||
vars := mux.Vars(req)
|
||||
if vars == nil {
|
||||
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "id is required in path"))
|
||||
return
|
||||
}
|
||||
|
||||
idString, ok := vars["id"]
|
||||
if !ok {
|
||||
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "id is required in path"))
|
||||
return
|
||||
}
|
||||
|
||||
id, err := strconv.Atoi(idString)
|
||||
if err != nil {
|
||||
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "id is not a valid integer"))
|
||||
return
|
||||
}
|
||||
|
||||
body, err := io.ReadAll(req.Body)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
@@ -160,7 +191,7 @@ func (api *API) UpdateChannelByID(req *http.Request, rw http.ResponseWriter) {
|
||||
return
|
||||
}
|
||||
|
||||
err = api.alertmanager.UpdateChannelByReceiver(ctx, claims.OrgID, receiver)
|
||||
err = api.alertmanager.UpdateChannelByReceiverAndID(ctx, claims.OrgID, receiver, id)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
@@ -169,7 +200,7 @@ func (api *API) UpdateChannelByID(req *http.Request, rw http.ResponseWriter) {
|
||||
render.Success(rw, http.StatusNoContent, nil)
|
||||
}
|
||||
|
||||
func (api *API) DeleteChannelByID(req *http.Request, rw http.ResponseWriter) {
|
||||
func (api *API) DeleteChannelByID(rw http.ResponseWriter, req *http.Request) {
|
||||
ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
@@ -206,7 +237,7 @@ func (api *API) DeleteChannelByID(req *http.Request, rw http.ResponseWriter) {
|
||||
render.Success(rw, http.StatusNoContent, nil)
|
||||
}
|
||||
|
||||
func (api *API) CreateChannel(req *http.Request, rw http.ResponseWriter) {
|
||||
func (api *API) CreateChannel(rw http.ResponseWriter, req *http.Request) {
|
||||
ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
package alertmanager
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
@@ -9,9 +11,6 @@ import (
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
// Config is the config for the alertmanager server.
|
||||
alertmanagerserver.Config `mapstructure:",squash"`
|
||||
|
||||
// Provider is the provider for the alertmanager service.
|
||||
Provider string `mapstructure:"provider"`
|
||||
|
||||
@@ -25,11 +24,14 @@ type Config struct {
|
||||
type Signoz struct {
|
||||
// PollInterval is the interval at which the alertmanager is synced.
|
||||
PollInterval time.Duration `mapstructure:"poll_interval"`
|
||||
|
||||
// Config is the config for the alertmanager server.
|
||||
alertmanagerserver.Config `mapstructure:",squash"`
|
||||
}
|
||||
|
||||
type Legacy struct {
|
||||
// URL is the URL of the legacy alertmanager.
|
||||
URL *url.URL `mapstructure:"url"`
|
||||
// ApiURL is the URL of the legacy signoz alertmanager.
|
||||
ApiURL string `mapstructure:"api_url"`
|
||||
}
|
||||
|
||||
func NewConfigFactory() factory.ConfigFactory {
|
||||
@@ -38,14 +40,28 @@ func NewConfigFactory() factory.ConfigFactory {
|
||||
|
||||
func newConfig() factory.Config {
|
||||
return Config{
|
||||
Config: alertmanagerserver.NewConfig(),
|
||||
Provider: "signoz",
|
||||
Provider: "legacy",
|
||||
Legacy: Legacy{
|
||||
ApiURL: "http://alertmanager:9093/api",
|
||||
},
|
||||
Signoz: Signoz{
|
||||
PollInterval: 15 * time.Second,
|
||||
Config: alertmanagerserver.NewConfig(),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (c Config) Validate() error {
|
||||
if c.Provider == "legacy" {
|
||||
if c.Legacy.ApiURL == "" {
|
||||
return errors.New("api_url is required")
|
||||
}
|
||||
|
||||
_, err := url.Parse(c.Legacy.ApiURL)
|
||||
if err != nil {
|
||||
return fmt.Errorf("api_url %q is invalid: %w", c.Legacy.ApiURL, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
48
pkg/alertmanager/config_test.go
Normal file
48
pkg/alertmanager/config_test.go
Normal file
@@ -0,0 +1,48 @@
|
||||
package alertmanager
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.signoz.io/signoz/pkg/config"
|
||||
"go.signoz.io/signoz/pkg/config/envprovider"
|
||||
"go.signoz.io/signoz/pkg/factory"
|
||||
)
|
||||
|
||||
func TestNewWithEnvProvider(t *testing.T) {
|
||||
t.Setenv("SIGNOZ_ALERTMANAGER_PROVIDER", "legacy")
|
||||
t.Setenv("SIGNOZ_ALERTMANAGER_LEGACY_API__URL", "http://localhost:9093/api")
|
||||
|
||||
conf, err := config.New(
|
||||
context.Background(),
|
||||
config.ResolverConfig{
|
||||
Uris: []string{"env:"},
|
||||
ProviderFactories: []config.ProviderFactory{
|
||||
envprovider.NewFactory(),
|
||||
},
|
||||
},
|
||||
[]factory.ConfigFactory{
|
||||
NewConfigFactory(),
|
||||
},
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
actual := &Config{}
|
||||
err = conf.Unmarshal("alertmanager", actual)
|
||||
require.NoError(t, err)
|
||||
|
||||
def := NewConfigFactory().New().(Config)
|
||||
|
||||
expected := &Config{
|
||||
Provider: "legacy",
|
||||
Legacy: Legacy{
|
||||
ApiURL: "http://localhost:9093/api",
|
||||
},
|
||||
Signoz: def.Signoz,
|
||||
}
|
||||
|
||||
assert.Equal(t, expected, actual)
|
||||
assert.NoError(t, actual.Validate())
|
||||
}
|
||||
@@ -7,8 +7,10 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
"github.com/tidwall/gjson"
|
||||
"go.signoz.io/signoz/pkg/alertmanager"
|
||||
"go.signoz.io/signoz/pkg/alertmanager/alertmanagerbatcher"
|
||||
"go.signoz.io/signoz/pkg/alertmanager/alertmanagerstore/sqlalertmanagerstore"
|
||||
@@ -29,6 +31,7 @@ type provider struct {
|
||||
client *http.Client
|
||||
configStore alertmanagertypes.ConfigStore
|
||||
batcher *alertmanagerbatcher.Batcher
|
||||
url *url.URL
|
||||
}
|
||||
|
||||
func NewFactory(sqlstore sqlstore.SQLStore) factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config] {
|
||||
@@ -41,6 +44,11 @@ func New(ctx context.Context, providerSettings factory.ProviderSettings, config
|
||||
settings := factory.NewScopedProviderSettings(providerSettings, "go.signoz.io/signoz/pkg/alertmanager/legacyalertmanager")
|
||||
configStore := sqlalertmanagerstore.NewConfigStore(sqlstore)
|
||||
|
||||
url, err := url.Parse(config.Legacy.ApiURL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &provider{
|
||||
config: config,
|
||||
settings: settings,
|
||||
@@ -49,6 +57,7 @@ func New(ctx context.Context, providerSettings factory.ProviderSettings, config
|
||||
},
|
||||
configStore: configStore,
|
||||
batcher: alertmanagerbatcher.New(settings.Logger(), alertmanagerbatcher.NewConfig()),
|
||||
url: url,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -57,22 +66,18 @@ func (provider *provider) Start(ctx context.Context) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer provider.batcher.Stop(ctx)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
case alerts := <-provider.batcher.C:
|
||||
if err := provider.putAlerts(ctx, "", alerts); err != nil {
|
||||
provider.settings.Logger().Error("failed to send alerts to alertmanager", "error", err)
|
||||
}
|
||||
for alerts := range provider.batcher.C {
|
||||
if err := provider.putAlerts(ctx, "", alerts); err != nil {
|
||||
provider.settings.Logger().Error("failed to send alerts to alertmanager", "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (provider *provider) GetAlerts(ctx context.Context, orgID string, params alertmanagertypes.GettableAlertsParams) (alertmanagertypes.GettableAlerts, error) {
|
||||
url := provider.config.Legacy.URL.JoinPath(alertsPath)
|
||||
url := provider.url.JoinPath(alertsPath)
|
||||
url.RawQuery = params.RawQuery
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url.String(), nil)
|
||||
@@ -92,8 +97,12 @@ func (provider *provider) GetAlerts(ctx context.Context, orgID string, params al
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if resp.StatusCode/100 != 2 {
|
||||
return nil, fmt.Errorf("bad response status %v", resp.Status)
|
||||
}
|
||||
|
||||
var alerts alertmanagertypes.GettableAlerts
|
||||
if err := json.Unmarshal(body, &alerts); err != nil {
|
||||
if err := json.Unmarshal([]byte(gjson.GetBytes(body, "data").Raw), &alerts); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -101,12 +110,12 @@ func (provider *provider) GetAlerts(ctx context.Context, orgID string, params al
|
||||
}
|
||||
|
||||
func (provider *provider) PutAlerts(ctx context.Context, _ string, alerts alertmanagertypes.PostableAlerts) error {
|
||||
provider.batcher.Send(ctx, alerts...)
|
||||
provider.batcher.Add(ctx, alerts...)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (provider *provider) putAlerts(ctx context.Context, _ string, alerts alertmanagertypes.PostableAlerts) error {
|
||||
url := provider.config.Legacy.URL.JoinPath(alertsPath)
|
||||
url := provider.url.JoinPath(alertsPath)
|
||||
|
||||
body, err := json.Marshal(alerts)
|
||||
if err != nil {
|
||||
@@ -135,7 +144,7 @@ func (provider *provider) putAlerts(ctx context.Context, _ string, alerts alertm
|
||||
}
|
||||
|
||||
func (provider *provider) TestReceiver(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver) error {
|
||||
url := provider.config.Legacy.URL.JoinPath(testReceiverPath)
|
||||
url := provider.url.JoinPath(testReceiverPath)
|
||||
|
||||
body, err := json.Marshal(receiver)
|
||||
if err != nil {
|
||||
@@ -164,48 +173,30 @@ func (provider *provider) TestReceiver(ctx context.Context, orgID string, receiv
|
||||
}
|
||||
|
||||
func (provider *provider) ListChannels(ctx context.Context, orgID string) ([]*alertmanagertypes.Channel, error) {
|
||||
config, err := provider.configStore.Get(ctx, orgID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return provider.configStore.ListChannels(ctx, orgID)
|
||||
}
|
||||
|
||||
channels := config.Channels()
|
||||
channelList := make([]*alertmanagertypes.Channel, 0, len(channels))
|
||||
for _, channel := range channels {
|
||||
channelList = append(channelList, channel)
|
||||
}
|
||||
|
||||
return channelList, nil
|
||||
func (provider *provider) ListAllChannels(ctx context.Context) ([]*alertmanagertypes.Channel, error) {
|
||||
return provider.configStore.ListAllChannels(ctx)
|
||||
}
|
||||
|
||||
func (provider *provider) GetChannelByID(ctx context.Context, orgID string, channelID int) (*alertmanagertypes.Channel, error) {
|
||||
config, err := provider.configStore.Get(ctx, orgID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
channels := config.Channels()
|
||||
channel, err := alertmanagertypes.GetChannelByID(channels, channelID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return channel, nil
|
||||
return provider.configStore.GetChannelByID(ctx, orgID, channelID)
|
||||
}
|
||||
|
||||
func (provider *provider) UpdateChannelByReceiver(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver) error {
|
||||
config, err := provider.configStore.Get(ctx, orgID)
|
||||
func (provider *provider) UpdateChannelByReceiverAndID(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver, id int) error {
|
||||
channel, err := provider.configStore.GetChannelByID(ctx, orgID, id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = config.UpdateReceiver(alertmanagertypes.NewRouteFromReceiver(receiver), receiver)
|
||||
err = channel.Update(receiver)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = provider.configStore.Set(ctx, config, func(ctx context.Context) error {
|
||||
url := provider.config.Legacy.URL.JoinPath(routesPath)
|
||||
err = provider.configStore.UpdateChannel(ctx, orgID, channel, func(ctx context.Context) error {
|
||||
url := provider.url.JoinPath(routesPath)
|
||||
|
||||
body, err := json.Marshal(receiver)
|
||||
if err != nil {
|
||||
@@ -240,18 +231,10 @@ func (provider *provider) UpdateChannelByReceiver(ctx context.Context, orgID str
|
||||
}
|
||||
|
||||
func (provider *provider) CreateChannel(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver) error {
|
||||
config, err := provider.configStore.Get(ctx, orgID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
channel := alertmanagertypes.NewChannelFromReceiver(receiver, orgID)
|
||||
|
||||
err = config.CreateReceiver(alertmanagertypes.NewRouteFromReceiver(receiver), receiver)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = provider.configStore.Set(ctx, config, func(ctx context.Context) error {
|
||||
url := provider.config.Legacy.URL.JoinPath(routesPath)
|
||||
err := provider.configStore.CreateChannel(ctx, channel, func(ctx context.Context) error {
|
||||
url := provider.url.JoinPath(routesPath)
|
||||
|
||||
body, err := json.Marshal(receiver)
|
||||
if err != nil {
|
||||
@@ -286,24 +269,13 @@ func (provider *provider) CreateChannel(ctx context.Context, orgID string, recei
|
||||
}
|
||||
|
||||
func (provider *provider) DeleteChannelByID(ctx context.Context, orgID string, channelID int) error {
|
||||
config, err := provider.configStore.Get(ctx, orgID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err := provider.configStore.DeleteChannelByID(ctx, orgID, channelID, func(ctx context.Context) error {
|
||||
channel, err := provider.configStore.GetChannelByID(ctx, orgID, channelID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
channels := config.Channels()
|
||||
channel, err := alertmanagertypes.GetChannelByID(channels, channelID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = config.DeleteReceiver(channel.Name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = provider.configStore.Set(ctx, config, func(ctx context.Context) error {
|
||||
url := provider.config.Legacy.URL.JoinPath(routesPath)
|
||||
url := provider.url.JoinPath(routesPath)
|
||||
|
||||
body, err := json.Marshal(map[string]string{"name": channel.Name})
|
||||
if err != nil {
|
||||
@@ -338,5 +310,6 @@ func (provider *provider) DeleteChannelByID(ctx context.Context, orgID string, c
|
||||
}
|
||||
|
||||
func (provider *provider) Stop(ctx context.Context) error {
|
||||
provider.batcher.Stop(ctx)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -12,7 +12,7 @@ import (
|
||||
|
||||
type Service struct {
|
||||
// config is the config for the alertmanager service
|
||||
config Config
|
||||
config alertmanagerserver.Config
|
||||
|
||||
// stateStore is the state store for the alertmanager service
|
||||
stateStore alertmanagertypes.StateStore
|
||||
@@ -30,7 +30,7 @@ type Service struct {
|
||||
serversMtx sync.RWMutex
|
||||
}
|
||||
|
||||
func New(ctx context.Context, settings factory.ScopedProviderSettings, config Config, stateStore alertmanagertypes.StateStore, configStore alertmanagertypes.ConfigStore) *Service {
|
||||
func New(ctx context.Context, settings factory.ScopedProviderSettings, config alertmanagerserver.Config, stateStore alertmanagertypes.StateStore, configStore alertmanagertypes.ConfigStore) *Service {
|
||||
service := &Service{
|
||||
config: config,
|
||||
stateStore: stateStore,
|
||||
@@ -57,7 +57,7 @@ func (service *Service) SyncServers(ctx context.Context) error {
|
||||
continue
|
||||
}
|
||||
|
||||
service.servers[orgID], err = alertmanagerserver.New(ctx, service.settings.Logger(), service.settings.PrometheusRegisterer(), service.config.Config, orgID, service.stateStore)
|
||||
service.servers[orgID], err = alertmanagerserver.New(ctx, service.settings.Logger(), service.settings.PrometheusRegisterer(), service.config, orgID, service.stateStore)
|
||||
if err != nil {
|
||||
service.settings.Logger().Error("failed to create alertmanagerserver", "orgID", orgID, "error", err)
|
||||
continue
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
|
||||
"go.signoz.io/signoz/pkg/alertmanager"
|
||||
"go.signoz.io/signoz/pkg/alertmanager/alertmanagerstore/sqlalertmanagerstore"
|
||||
"go.signoz.io/signoz/pkg/errors"
|
||||
"go.signoz.io/signoz/pkg/factory"
|
||||
"go.signoz.io/signoz/pkg/sqlstore"
|
||||
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
|
||||
@@ -17,6 +18,7 @@ type provider struct {
|
||||
settings factory.ScopedProviderSettings
|
||||
configStore alertmanagertypes.ConfigStore
|
||||
stateStore alertmanagertypes.StateStore
|
||||
stopC chan struct{}
|
||||
}
|
||||
|
||||
func NewFactory(sqlstore sqlstore.SQLStore) factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config] {
|
||||
@@ -34,7 +36,7 @@ func New(ctx context.Context, providerSettings factory.ProviderSettings, config
|
||||
service: alertmanager.New(
|
||||
ctx,
|
||||
settings,
|
||||
config,
|
||||
config.Signoz.Config,
|
||||
stateStore,
|
||||
configStore,
|
||||
),
|
||||
@@ -42,6 +44,7 @@ func New(ctx context.Context, providerSettings factory.ProviderSettings, config
|
||||
config: config,
|
||||
configStore: configStore,
|
||||
stateStore: stateStore,
|
||||
stopC: make(chan struct{}),
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -50,7 +53,7 @@ func (provider *provider) Start(ctx context.Context) error {
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-provider.stopC:
|
||||
return nil
|
||||
case <-ticker.C:
|
||||
if err := provider.service.SyncServers(ctx); err != nil {
|
||||
@@ -61,6 +64,7 @@ func (provider *provider) Start(ctx context.Context) error {
|
||||
}
|
||||
|
||||
func (provider *provider) Stop(ctx context.Context) error {
|
||||
close(provider.stopC)
|
||||
return provider.service.Stop(ctx)
|
||||
}
|
||||
|
||||
@@ -91,6 +95,10 @@ func (provider *provider) ListChannels(ctx context.Context, orgID string) ([]*al
|
||||
return channelList, nil
|
||||
}
|
||||
|
||||
func (provider *provider) ListAllChannels(ctx context.Context) ([]*alertmanagertypes.Channel, error) {
|
||||
return nil, errors.Newf(errors.TypeUnsupported, errors.CodeUnsupported, "not supported by provider signoz")
|
||||
}
|
||||
|
||||
func (provider *provider) GetChannelByID(ctx context.Context, orgID string, channelID int) (*alertmanagertypes.Channel, error) {
|
||||
config, err := provider.configStore.Get(ctx, orgID)
|
||||
if err != nil {
|
||||
@@ -106,7 +114,7 @@ func (provider *provider) GetChannelByID(ctx context.Context, orgID string, chan
|
||||
return channel, nil
|
||||
}
|
||||
|
||||
func (provider *provider) UpdateChannelByReceiver(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver) error {
|
||||
func (provider *provider) UpdateChannelByReceiverAndID(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver, id int) error {
|
||||
config, err := provider.configStore.Get(ctx, orgID)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -117,7 +125,7 @@ func (provider *provider) UpdateChannelByReceiver(ctx context.Context, orgID str
|
||||
return err
|
||||
}
|
||||
|
||||
err = provider.configStore.Set(ctx, config, nil)
|
||||
err = provider.configStore.Set(ctx, config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -142,7 +150,7 @@ func (provider *provider) DeleteChannelByID(ctx context.Context, orgID string, c
|
||||
return err
|
||||
}
|
||||
|
||||
err = provider.configStore.Set(ctx, config, nil)
|
||||
err = provider.configStore.Set(ctx, config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -161,7 +169,7 @@ func (provider *provider) CreateChannel(ctx context.Context, orgID string, recei
|
||||
return err
|
||||
}
|
||||
|
||||
err = provider.configStore.Set(ctx, config, nil)
|
||||
err = provider.configStore.Set(ctx, config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -40,7 +40,7 @@ func NewRegistry(logger *slog.Logger, services ...NamedService) (*Registry, erro
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (r *Registry) Start(ctx context.Context) error {
|
||||
func (r *Registry) Start(ctx context.Context) {
|
||||
for _, s := range r.services.GetInOrder() {
|
||||
go func(s NamedService) {
|
||||
r.logger.InfoContext(ctx, "starting service", "service", s.Name())
|
||||
@@ -49,7 +49,6 @@ func (r *Registry) Start(ctx context.Context) error {
|
||||
}(s)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Registry) Wait(ctx context.Context) error {
|
||||
|
||||
@@ -41,7 +41,7 @@ func TestRegistryWith2Services(t *testing.T) {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
require.NoError(t, registry.Start(ctx))
|
||||
registry.Start(ctx)
|
||||
require.NoError(t, registry.Wait(ctx))
|
||||
require.NoError(t, registry.Stop(ctx))
|
||||
}()
|
||||
@@ -62,7 +62,7 @@ func TestRegistryWith2ServicesWithoutWait(t *testing.T) {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
require.NoError(t, registry.Start(ctx))
|
||||
registry.Start(ctx)
|
||||
require.NoError(t, registry.Stop(ctx))
|
||||
}()
|
||||
|
||||
|
||||
@@ -78,6 +78,12 @@ func (writer *nonFlushingBadResponseLoggingWriter) Write(data []byte) (int, erro
|
||||
// https://godoc.org/net/http#ResponseWriter
|
||||
writer.WriteHeader(http.StatusOK)
|
||||
}
|
||||
|
||||
// 204 No Content is a success response that indicates that the request has been successfully processed and that the response body is intentionally empty.
|
||||
if writer.statusCode == 204 {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
n, err := writer.rw.Write(data)
|
||||
if writer.logBody {
|
||||
writer.captureResponseBody(data)
|
||||
|
||||
@@ -23,6 +23,7 @@ type SDK struct {
|
||||
logger *slog.Logger
|
||||
sdk contribsdkconfig.SDK
|
||||
prometheusRegistry *prometheus.Registry
|
||||
startCh chan struct{}
|
||||
}
|
||||
|
||||
// New creates a new Instrumentation instance with configured providers.
|
||||
@@ -96,14 +97,17 @@ func New(ctx context.Context, build version.Build, cfg Config) (*SDK, error) {
|
||||
sdk: sdk,
|
||||
prometheusRegistry: prometheusRegistry,
|
||||
logger: NewLogger(cfg),
|
||||
startCh: make(chan struct{}),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (i *SDK) Start(ctx context.Context) error {
|
||||
<-i.startCh
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *SDK) Stop(ctx context.Context) error {
|
||||
close(i.startCh)
|
||||
return i.sdk.Shutdown(ctx)
|
||||
}
|
||||
|
||||
|
||||
@@ -6,7 +6,6 @@ import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/metricsexplorer"
|
||||
"io"
|
||||
"math"
|
||||
"net/http"
|
||||
@@ -19,6 +18,9 @@ import (
|
||||
"text/template"
|
||||
"time"
|
||||
|
||||
"go.signoz.io/signoz/pkg/alertmanager"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/metricsexplorer"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/gorilla/websocket"
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
@@ -58,7 +60,6 @@ import (
|
||||
"go.signoz.io/signoz/pkg/query-service/app/integrations/messagingQueues/kafka"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline"
|
||||
"go.signoz.io/signoz/pkg/query-service/dao"
|
||||
am "go.signoz.io/signoz/pkg/query-service/integrations/alertManager"
|
||||
"go.signoz.io/signoz/pkg/query-service/interfaces"
|
||||
"go.signoz.io/signoz/pkg/query-service/model"
|
||||
"go.signoz.io/signoz/pkg/query-service/rules"
|
||||
@@ -84,7 +85,6 @@ type APIHandler struct {
|
||||
reader interfaces.Reader
|
||||
skipConfig *model.SkipConfig
|
||||
appDao dao.ModelDao
|
||||
alertManager am.Manager
|
||||
ruleManager *rules.Manager
|
||||
featureFlags interfaces.FeatureLookup
|
||||
querier interfaces.Querier
|
||||
@@ -133,6 +133,8 @@ type APIHandler struct {
|
||||
pvcsRepo *inframetrics.PvcsRepo
|
||||
|
||||
JWT *authtypes.JWT
|
||||
|
||||
AlertmanagerAPI *alertmanager.API
|
||||
}
|
||||
|
||||
type APIHandlerOpts struct {
|
||||
@@ -174,16 +176,12 @@ type APIHandlerOpts struct {
|
||||
UseTraceNewSchema bool
|
||||
|
||||
JWT *authtypes.JWT
|
||||
|
||||
AlertmanagerAPI *alertmanager.API
|
||||
}
|
||||
|
||||
// NewAPIHandler returns an APIHandler
|
||||
func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
|
||||
|
||||
alertManager, err := am.New()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
querierOpts := querier.QuerierOptions{
|
||||
Reader: opts.Reader,
|
||||
Cache: opts.Cache,
|
||||
@@ -227,7 +225,6 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
|
||||
skipConfig: opts.SkipConfig,
|
||||
preferSpanMetrics: opts.PreferSpanMetrics,
|
||||
temporalityMap: make(map[string]map[v3.Temporality]bool),
|
||||
alertManager: alertManager,
|
||||
ruleManager: opts.RuleManager,
|
||||
featureFlags: opts.FeatureFlags,
|
||||
IntegrationsController: opts.IntegrationsController,
|
||||
@@ -250,6 +247,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
|
||||
pvcsRepo: pvcsRepo,
|
||||
JWT: opts.JWT,
|
||||
SummaryService: summaryService,
|
||||
AlertmanagerAPI: opts.AlertmanagerAPI,
|
||||
}
|
||||
|
||||
logsQueryBuilder := logsv3.PrepareLogsQuery
|
||||
@@ -483,21 +481,21 @@ func (aH *APIHandler) Respond(w http.ResponseWriter, data interface{}) {
|
||||
|
||||
// RegisterPrivateRoutes registers routes for this handler on the given router
|
||||
func (aH *APIHandler) RegisterPrivateRoutes(router *mux.Router) {
|
||||
router.HandleFunc("/api/v1/channels", aH.listChannels).Methods(http.MethodGet)
|
||||
router.HandleFunc("/api/v1/channels", aH.AlertmanagerAPI.ListAllChannels).Methods(http.MethodGet)
|
||||
}
|
||||
|
||||
// RegisterRoutes registers routes for this handler on the given router
|
||||
func (aH *APIHandler) RegisterRoutes(router *mux.Router, am *AuthMiddleware) {
|
||||
router.HandleFunc("/api/v1/query_range", am.ViewAccess(aH.queryRangeMetrics)).Methods(http.MethodGet)
|
||||
router.HandleFunc("/api/v1/query", am.ViewAccess(aH.queryMetrics)).Methods(http.MethodGet)
|
||||
router.HandleFunc("/api/v1/channels", am.ViewAccess(aH.listChannels)).Methods(http.MethodGet)
|
||||
router.HandleFunc("/api/v1/channels/{id}", am.ViewAccess(aH.getChannel)).Methods(http.MethodGet)
|
||||
router.HandleFunc("/api/v1/channels/{id}", am.AdminAccess(aH.editChannel)).Methods(http.MethodPut)
|
||||
router.HandleFunc("/api/v1/channels/{id}", am.AdminAccess(aH.deleteChannel)).Methods(http.MethodDelete)
|
||||
router.HandleFunc("/api/v1/channels", am.EditAccess(aH.createChannel)).Methods(http.MethodPost)
|
||||
router.HandleFunc("/api/v1/testChannel", am.EditAccess(aH.testChannel)).Methods(http.MethodPost)
|
||||
router.HandleFunc("/api/v1/channels", am.ViewAccess(aH.AlertmanagerAPI.ListChannels)).Methods(http.MethodGet)
|
||||
router.HandleFunc("/api/v1/channels/{id}", am.ViewAccess(aH.AlertmanagerAPI.GetChannelByID)).Methods(http.MethodGet)
|
||||
router.HandleFunc("/api/v1/channels/{id}", am.AdminAccess(aH.AlertmanagerAPI.UpdateChannelByID)).Methods(http.MethodPut)
|
||||
router.HandleFunc("/api/v1/channels/{id}", am.AdminAccess(aH.AlertmanagerAPI.DeleteChannelByID)).Methods(http.MethodDelete)
|
||||
router.HandleFunc("/api/v1/channels", am.EditAccess(aH.AlertmanagerAPI.CreateChannel)).Methods(http.MethodPost)
|
||||
router.HandleFunc("/api/v1/testChannel", am.EditAccess(aH.AlertmanagerAPI.TestReceiver)).Methods(http.MethodPost)
|
||||
|
||||
router.HandleFunc("/api/v1/alerts", am.ViewAccess(aH.getAlerts)).Methods(http.MethodGet)
|
||||
router.HandleFunc("/api/v1/alerts", am.ViewAccess(aH.AlertmanagerAPI.GetAlerts)).Methods(http.MethodGet)
|
||||
|
||||
router.HandleFunc("/api/v1/rules", am.ViewAccess(aH.listRules)).Methods(http.MethodGet)
|
||||
router.HandleFunc("/api/v1/rules/{id}", am.ViewAccess(aH.getRule)).Methods(http.MethodGet)
|
||||
@@ -1360,138 +1358,6 @@ func (aH *APIHandler) editRule(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
}
|
||||
|
||||
func (aH *APIHandler) getChannel(w http.ResponseWriter, r *http.Request) {
|
||||
id := mux.Vars(r)["id"]
|
||||
channel, apiErrorObj := aH.ruleManager.RuleDB().GetChannel(id)
|
||||
if apiErrorObj != nil {
|
||||
RespondError(w, apiErrorObj, nil)
|
||||
return
|
||||
}
|
||||
aH.Respond(w, channel)
|
||||
}
|
||||
|
||||
func (aH *APIHandler) deleteChannel(w http.ResponseWriter, r *http.Request) {
|
||||
id := mux.Vars(r)["id"]
|
||||
apiErrorObj := aH.ruleManager.RuleDB().DeleteChannel(id)
|
||||
if apiErrorObj != nil {
|
||||
RespondError(w, apiErrorObj, nil)
|
||||
return
|
||||
}
|
||||
aH.Respond(w, "notification channel successfully deleted")
|
||||
}
|
||||
|
||||
func (aH *APIHandler) listChannels(w http.ResponseWriter, r *http.Request) {
|
||||
channels, apiErrorObj := aH.ruleManager.RuleDB().GetChannels()
|
||||
if apiErrorObj != nil {
|
||||
RespondError(w, apiErrorObj, nil)
|
||||
return
|
||||
}
|
||||
aH.Respond(w, channels)
|
||||
}
|
||||
|
||||
// testChannels sends test alert to all registered channels
|
||||
func (aH *APIHandler) testChannel(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
defer r.Body.Close()
|
||||
body, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
zap.L().Error("Error in getting req body of testChannel API", zap.Error(err))
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
receiver := &am.Receiver{}
|
||||
if err := json.Unmarshal(body, receiver); err != nil { // Parse []byte to go struct pointer
|
||||
zap.L().Error("Error in parsing req body of testChannel API\n", zap.Error(err))
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
|
||||
return
|
||||
}
|
||||
// send alert
|
||||
apiErrorObj := aH.alertManager.TestReceiver(receiver)
|
||||
if apiErrorObj != nil {
|
||||
RespondError(w, apiErrorObj, nil)
|
||||
return
|
||||
}
|
||||
aH.Respond(w, "test alert sent")
|
||||
}
|
||||
|
||||
func (aH *APIHandler) editChannel(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
id := mux.Vars(r)["id"]
|
||||
|
||||
defer r.Body.Close()
|
||||
body, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
zap.L().Error("Error in getting req body of editChannel API", zap.Error(err))
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
receiver := &am.Receiver{}
|
||||
if err := json.Unmarshal(body, receiver); err != nil { // Parse []byte to go struct pointer
|
||||
zap.L().Error("Error in parsing req body of editChannel API", zap.Error(err))
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
_, apiErrorObj := aH.ruleManager.RuleDB().EditChannel(receiver, id)
|
||||
|
||||
if apiErrorObj != nil {
|
||||
RespondError(w, apiErrorObj, nil)
|
||||
return
|
||||
}
|
||||
|
||||
aH.Respond(w, nil)
|
||||
|
||||
}
|
||||
|
||||
func (aH *APIHandler) createChannel(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
defer r.Body.Close()
|
||||
body, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
zap.L().Error("Error in getting req body of createChannel API", zap.Error(err))
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
receiver := &am.Receiver{}
|
||||
if err := json.Unmarshal(body, receiver); err != nil { // Parse []byte to go struct pointer
|
||||
zap.L().Error("Error in parsing req body of createChannel API", zap.Error(err))
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
_, apiErrorObj := aH.ruleManager.RuleDB().CreateChannel(receiver)
|
||||
|
||||
if apiErrorObj != nil {
|
||||
RespondError(w, apiErrorObj, nil)
|
||||
return
|
||||
}
|
||||
|
||||
aH.Respond(w, nil)
|
||||
|
||||
}
|
||||
|
||||
func (aH *APIHandler) getAlerts(w http.ResponseWriter, r *http.Request) {
|
||||
params := r.URL.Query()
|
||||
amEndpoint := constants.GetAlertManagerApiPrefix()
|
||||
resp, err := http.Get(amEndpoint + "v1/alerts" + "?" + params.Encode())
|
||||
if err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
defer resp.Body.Close()
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
aH.Respond(w, string(body))
|
||||
}
|
||||
|
||||
func (aH *APIHandler) createRule(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
defer r.Body.Close()
|
||||
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
|
||||
"github.com/rs/cors"
|
||||
"github.com/soheilhy/cmux"
|
||||
"go.signoz.io/signoz/pkg/alertmanager"
|
||||
"go.signoz.io/signoz/pkg/http/middleware"
|
||||
"go.signoz.io/signoz/pkg/query-service/agentConf"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/clickhouseReader"
|
||||
@@ -196,6 +197,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
||||
UseLogsNewSchema: serverOptions.UseLogsNewSchema,
|
||||
UseTraceNewSchema: serverOptions.UseTraceNewSchema,
|
||||
JWT: serverOptions.Jwt,
|
||||
AlertmanagerAPI: alertmanager.NewAPI(serverOptions.SigNoz.Alertmanager),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -278,7 +280,6 @@ func (s *Server) createPrivateServer(api *APIHandler) (*http.Server, error) {
|
||||
}
|
||||
|
||||
func (s *Server) createPublicServer(api *APIHandler, web web.Web) (*http.Server, error) {
|
||||
|
||||
r := NewRouter()
|
||||
|
||||
r.Use(middleware.NewAuth(zap.L(), s.serverOptions.Jwt, []string{"Authorization", "Sec-WebSocket-Protocol"}).Wrap)
|
||||
|
||||
@@ -4,8 +4,6 @@ import (
|
||||
"context"
|
||||
"flag"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
prommodel "github.com/prometheus/common/model"
|
||||
@@ -142,22 +140,20 @@ func main() {
|
||||
logger.Fatal("Failed to initialize auth cache", zap.Error(err))
|
||||
}
|
||||
|
||||
signalsChannel := make(chan os.Signal, 1)
|
||||
signal.Notify(signalsChannel, os.Interrupt, syscall.SIGTERM)
|
||||
signoz.Start(context.Background())
|
||||
|
||||
for {
|
||||
select {
|
||||
case status := <-server.HealthCheckStatus():
|
||||
logger.Info("Received HealthCheck status: ", zap.Int("status", int(status)))
|
||||
case <-signalsChannel:
|
||||
logger.Info("Received OS Interrupt Signal ... ")
|
||||
err := server.Stop()
|
||||
if err != nil {
|
||||
logger.Fatal("Failed to stop server", zap.Error(err))
|
||||
}
|
||||
logger.Info("Server stopped")
|
||||
return
|
||||
}
|
||||
if err := signoz.Wait(context.Background()); err != nil {
|
||||
zap.L().Fatal("Failed to start signoz", zap.Error(err))
|
||||
}
|
||||
|
||||
err = server.Stop()
|
||||
if err != nil {
|
||||
zap.L().Fatal("Failed to stop server", zap.Error(err))
|
||||
}
|
||||
|
||||
err = signoz.Stop(context.Background())
|
||||
if err != nil {
|
||||
zap.L().Fatal("Failed to stop signoz", zap.Error(err))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
"go.signoz.io/signoz/pkg/alertmanager"
|
||||
"go.signoz.io/signoz/pkg/apiserver"
|
||||
"go.signoz.io/signoz/pkg/cache"
|
||||
"go.signoz.io/signoz/pkg/config"
|
||||
@@ -44,6 +45,9 @@ type Config struct {
|
||||
|
||||
// TelemetryStore config
|
||||
TelemetryStore telemetrystore.Config `mapstructure:"telemetrystore"`
|
||||
|
||||
// Alertmanager config
|
||||
Alertmanager alertmanager.Config `mapstructure:"alertmanager"`
|
||||
}
|
||||
|
||||
// DeprecatedFlags are the flags that are deprecated and scheduled for removal.
|
||||
@@ -63,6 +67,7 @@ func NewConfig(ctx context.Context, resolverConfig config.ResolverConfig, deprec
|
||||
sqlmigrator.NewConfigFactory(),
|
||||
apiserver.NewConfigFactory(),
|
||||
telemetrystore.NewConfigFactory(),
|
||||
alertmanager.NewConfigFactory(),
|
||||
}
|
||||
|
||||
conf, err := config.New(ctx, resolverConfig, configFactories)
|
||||
@@ -138,17 +143,26 @@ func mergeAndEnsureBackwardCompatibility(config *Config, deprecatedFlags Depreca
|
||||
}
|
||||
|
||||
if deprecatedFlags.MaxIdleConns != 50 {
|
||||
fmt.Println("[Deprecated] flag --max-idle-conns is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_MAX__IDLE__CONNS env variable instead.")
|
||||
fmt.Println("[Deprecated] flag --max-idle-conns is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_MAX__IDLE__CONNS instead.")
|
||||
config.TelemetryStore.Connection.MaxIdleConns = deprecatedFlags.MaxIdleConns
|
||||
}
|
||||
|
||||
if deprecatedFlags.MaxOpenConns != 100 {
|
||||
fmt.Println("[Deprecated] flag --max-open-conns is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_MAX__OPEN__CONNS env variable instead.")
|
||||
fmt.Println("[Deprecated] flag --max-open-conns is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_MAX__OPEN__CONNS instead.")
|
||||
config.TelemetryStore.Connection.MaxOpenConns = deprecatedFlags.MaxOpenConns
|
||||
}
|
||||
|
||||
if deprecatedFlags.DialTimeout != 5*time.Second {
|
||||
fmt.Println("[Deprecated] flag --dial-timeout is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_DIAL__TIMEOUT environment variable instead.")
|
||||
fmt.Println("[Deprecated] flag --dial-timeout is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_DIAL__TIMEOUT instead.")
|
||||
config.TelemetryStore.Connection.DialTimeout = deprecatedFlags.DialTimeout
|
||||
}
|
||||
|
||||
if os.Getenv("ALERTMANAGER_API_PREFIX") != "" {
|
||||
fmt.Println("[Deprecated] env ALERTMANAGER_API_PREFIX is deprecated and scheduled for removal. Please use SIGNOZ_ALERTMANAGER_LEGACY_API__URL instead.")
|
||||
config.Alertmanager.Legacy.ApiURL = os.Getenv("ALERTMANAGER_API_PREFIX")
|
||||
}
|
||||
|
||||
if os.Getenv("ALERTMANAGER_API_CHANNEL_PATH") != "" {
|
||||
fmt.Println("[Deprecated] env ALERTMANAGER_API_CHANNEL_PATH is deprecated and scheduled for complete removal.")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
package signoz
|
||||
|
||||
import (
|
||||
"go.signoz.io/signoz/pkg/alertmanager"
|
||||
"go.signoz.io/signoz/pkg/alertmanager/legacyalertmanager"
|
||||
"go.signoz.io/signoz/pkg/alertmanager/signozalertmanager"
|
||||
"go.signoz.io/signoz/pkg/cache"
|
||||
"go.signoz.io/signoz/pkg/cache/memorycache"
|
||||
"go.signoz.io/signoz/pkg/cache/rediscache"
|
||||
@@ -33,6 +36,9 @@ type ProviderConfig struct {
|
||||
|
||||
// Map of all telemetrystore provider factories
|
||||
TelemetryStoreProviderFactories factory.NamedMap[factory.ProviderFactory[telemetrystore.TelemetryStore, telemetrystore.Config]]
|
||||
|
||||
// Map of all alertmanager provider factories
|
||||
AlertmanagerProviderFactories factory.NamedMap[factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config]]
|
||||
}
|
||||
|
||||
func NewProviderConfig() ProviderConfig {
|
||||
@@ -62,9 +68,17 @@ func NewProviderConfig() ProviderConfig {
|
||||
sqlmigration.NewAddPatsFactory(),
|
||||
sqlmigration.NewModifyDatetimeFactory(),
|
||||
sqlmigration.NewModifyOrgDomainFactory(),
|
||||
sqlmigration.NewModifyNotificationChannelsFactory(),
|
||||
),
|
||||
TelemetryStoreProviderFactories: factory.MustNewNamedMap(
|
||||
clickhousetelemetrystore.NewFactory(telemetrystorehook.NewFactory()),
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
func NewAlertmanagerProviderFactories(sqlstore sqlstore.SQLStore) factory.NamedMap[factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config]] {
|
||||
return factory.MustNewNamedMap(
|
||||
legacyalertmanager.NewFactory(sqlstore),
|
||||
signozalertmanager.NewFactory(sqlstore),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -14,3 +14,9 @@ func TestNewProviderConfig(t *testing.T) {
|
||||
NewProviderConfig()
|
||||
})
|
||||
}
|
||||
|
||||
func TestNewAlertmanagerProviderFactories(t *testing.T) {
|
||||
assert.NotPanics(t, func() {
|
||||
NewAlertmanagerProviderFactories(nil)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package signoz
|
||||
import (
|
||||
"context"
|
||||
|
||||
"go.signoz.io/signoz/pkg/alertmanager"
|
||||
"go.signoz.io/signoz/pkg/cache"
|
||||
"go.signoz.io/signoz/pkg/factory"
|
||||
"go.signoz.io/signoz/pkg/instrumentation"
|
||||
@@ -16,10 +17,12 @@ import (
|
||||
)
|
||||
|
||||
type SigNoz struct {
|
||||
*factory.Registry
|
||||
Cache cache.Cache
|
||||
Web web.Web
|
||||
SQLStore sqlstore.SQLStore
|
||||
TelemetryStore telemetrystore.TelemetryStore
|
||||
Alertmanager alertmanager.Alertmanager
|
||||
}
|
||||
|
||||
func New(
|
||||
@@ -102,10 +105,32 @@ func New(
|
||||
return nil, err
|
||||
}
|
||||
|
||||
alertmanager, err := factory.NewProviderFromNamedMap(
|
||||
ctx,
|
||||
providerSettings,
|
||||
config.Alertmanager,
|
||||
NewAlertmanagerProviderFactories(sqlstore),
|
||||
config.Alertmanager.Provider,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
registry, err := factory.NewRegistry(
|
||||
instrumentation.Logger(),
|
||||
factory.NewNamedService(factory.MustNewName("instrumentation"), instrumentation),
|
||||
factory.NewNamedService(factory.MustNewName("alertmanager"), alertmanager),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &SigNoz{
|
||||
Registry: registry,
|
||||
Cache: cache,
|
||||
Web: web,
|
||||
SQLStore: sqlstore,
|
||||
TelemetryStore: telemetrystore,
|
||||
Alertmanager: alertmanager,
|
||||
}, nil
|
||||
}
|
||||
|
||||
89
pkg/sqlmigration/013_modify_notification_channels.go
Normal file
89
pkg/sqlmigration/013_modify_notification_channels.go
Normal file
@@ -0,0 +1,89 @@
|
||||
package sqlmigration
|
||||
|
||||
import (
	"context"
	"database/sql"
	"errors"

	"github.com/uptrace/bun"
	"github.com/uptrace/bun/migrate"
	"go.signoz.io/signoz/pkg/factory"
)
|
||||
|
||||
// modifyNotificationChannels is the SQL migration that adds an org_id column
// to the notification_channels table and backfills it for existing rows.
type modifyNotificationChannels struct{}
|
||||
|
||||
func NewModifyNotificationChannelsFactory() factory.ProviderFactory[SQLMigration, Config] {
|
||||
return factory.NewProviderFactory(factory.MustNewName("modify_notification_channels"), newModifyNotificationChannels)
|
||||
}
|
||||
|
||||
func newModifyNotificationChannels(_ context.Context, _ factory.ProviderSettings, _ Config) (SQLMigration, error) {
|
||||
return &modifyNotificationChannels{}, nil
|
||||
}
|
||||
|
||||
func (migration *modifyNotificationChannels) Register(migrations *migrate.Migrations) error {
|
||||
if err := migrations.Register(migration.Up, migration.Down); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (migration *modifyNotificationChannels) Up(ctx context.Context, db *bun.DB) error {
|
||||
tx, err := db.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer tx.Rollback() //nolint:errcheck
|
||||
|
||||
if _, err := tx.
|
||||
NewAddColumn().
|
||||
Table("notification_channels").
|
||||
ColumnExpr("org_id TEXT").
|
||||
Exec(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var orgID string
|
||||
|
||||
err = tx.
|
||||
NewSelect().
|
||||
ColumnExpr("id").
|
||||
Table("organizations").
|
||||
Limit(1).
|
||||
Scan(ctx, &orgID)
|
||||
if err != nil {
|
||||
if err != sql.ErrNoRows {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err == nil {
|
||||
err = migration.populateOrgID(ctx, tx, orgID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err := tx.Commit(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (migration *modifyNotificationChannels) populateOrgID(ctx context.Context, tx bun.Tx, orgID string) error {
|
||||
if _, err := tx.
|
||||
NewUpdate().
|
||||
Table("notification_channels").
|
||||
Set("org_id = ?", orgID).
|
||||
Where("org_id IS NULL").
|
||||
Exec(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Down is a no-op: it returns nil and does not revert the changes made by Up.
func (migration *modifyNotificationChannels) Down(ctx context.Context, db *bun.DB) error {
|
||||
return nil
|
||||
}
|
||||
@@ -24,7 +24,15 @@ type migrator struct {
|
||||
|
||||
func New(ctx context.Context, providerSettings factory.ProviderSettings, sqlstore sqlstore.SQLStore, migrations *migrate.Migrations, config Config) SQLMigrator {
|
||||
return &migrator{
|
||||
migrator: migrate.NewMigrator(sqlstore.BunDB(), migrations, migrate.WithTableName(migrationTableName), migrate.WithLocksTableName(migrationLockTableName)),
|
||||
migrator: migrate.NewMigrator(
|
||||
sqlstore.BunDB(),
|
||||
migrations,
|
||||
migrate.WithTableName(migrationTableName),
|
||||
migrate.WithLocksTableName(migrationLockTableName),
|
||||
// This is to ensure that the migration is marked as applied only on success. If the migration fails, no entry is made in the migration table
|
||||
// and the migration will be retried.
|
||||
migrate.WithMarkAppliedOnSuccess(true),
|
||||
),
|
||||
settings: factory.NewScopedProviderSettings(providerSettings, "go.signoz.io/signoz/pkg/sqlmigrator"),
|
||||
config: config,
|
||||
dialect: sqlstore.BunDB().Dialect().Name().String(),
|
||||
|
||||
@@ -22,6 +22,9 @@ import (
|
||||
)
|
||||
|
||||
type (
|
||||
// An alias for the Alert type from the alertmanager package.
|
||||
AlertModel = models.Alert
|
||||
|
||||
// An alias for the Alert type from the alertmanager package.
|
||||
Alert = types.Alert
|
||||
|
||||
|
||||
@@ -12,7 +12,8 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
ErrCodeAlertmanagerChannelNotFound = errors.MustNewCode("alertmanager_channel_not_found")
|
||||
ErrCodeAlertmanagerChannelNotFound = errors.MustNewCode("alertmanager_channel_not_found")
|
||||
ErrCodeAlertmanagerChannelNameMismatch = errors.MustNewCode("alertmanager_channel_name_mismatch")
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -152,3 +153,19 @@ func GetChannelByID(channels Channels, id int) (*Channel, error) {
|
||||
|
||||
return nil, errors.Newf(errors.TypeNotFound, ErrCodeAlertmanagerChannelNotFound, "cannot find channel with id %d", id)
|
||||
}
|
||||
|
||||
func (c *Channel) Update(receiver Receiver) error {
|
||||
channel := NewChannelFromReceiver(receiver, c.OrgID)
|
||||
if channel == nil {
|
||||
return errors.Newf(errors.TypeInvalidInput, ErrCodeAlertmanagerChannelNotFound, "cannot find channel with id %d", c.ID)
|
||||
}
|
||||
|
||||
if c.Name != channel.Name {
|
||||
return errors.Newf(errors.TypeInvalidInput, ErrCodeAlertmanagerChannelNameMismatch, "cannot update channel name")
|
||||
}
|
||||
|
||||
c.Data = channel.Data
|
||||
c.UpdatedAt = time.Now()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -260,13 +260,31 @@ func (c *Config) DeleteReceiver(name string) error {
|
||||
|
||||
type ConfigStore interface {
|
||||
// Set creates or updates a config.
|
||||
Set(context.Context, *Config, func(context.Context) error) error
|
||||
Set(context.Context, *Config) error
|
||||
|
||||
// Get returns the config for the given orgID
|
||||
Get(context.Context, string) (*Config, error)
|
||||
|
||||
// ListOrgs returns the list of orgs
|
||||
ListOrgs(context.Context) ([]string, error)
|
||||
|
||||
// CreateChannel creates a new channel.
|
||||
CreateChannel(context.Context, *Channel, func(context.Context) error) error
|
||||
|
||||
// GetChannelByID returns the channel for the given id.
|
||||
GetChannelByID(context.Context, string, int) (*Channel, error)
|
||||
|
||||
// UpdateChannel updates a channel.
|
||||
UpdateChannel(context.Context, string, *Channel, func(context.Context) error) error
|
||||
|
||||
// DeleteChannelByID deletes a channel.
|
||||
DeleteChannelByID(context.Context, string, int, func(context.Context) error) error
|
||||
|
||||
// ListChannels returns the list of channels.
|
||||
ListChannels(context.Context, string) ([]*Channel, error)
|
||||
|
||||
// ListAllChannels returns the list of channels for all organizations.
|
||||
ListAllChannels(context.Context) ([]*Channel, error)
|
||||
}
|
||||
|
||||
// MarshalSecretValue if set to true will expose Secret type
|
||||
|
||||
Reference in New Issue
Block a user