Compare commits

...

11 Commits

Author SHA1 Message Date
grandwizard28
d05e279bcd Merge branch 'main' into alertmanager 2025-02-12 22:58:29 +05:30
grandwizard28
f1c5d873f7 feat(alertmanager): first attempt at bootstrap 2025-02-12 18:46:28 +05:30
grandwizard28
aec239cc7c feat(alertmanager): first attempt at bootstrap 2025-02-12 18:46:28 +05:30
grandwizard28
59e26652dc feat(alertmanager): first attempt at bootstrap 2025-02-12 18:46:27 +05:30
grandwizard28
e02afc5e97 feat(alertmanager): first attempt at bootstrap 2025-02-12 18:46:27 +05:30
grandwizard28
3eac8ac30b feat(alertmanager): first attempt at bootstrap 2025-02-12 18:46:27 +05:30
grandwizard28
382c4f58e1 refactor(alertmanager): add alertmanager 2025-02-12 18:46:27 +05:30
grandwizard28
73ea632a3f refactor(alertmanager): move to types package 2025-02-12 18:46:27 +05:30
grandwizard28
00fa8810c0 feat(alertmanager): add support for multi org 2025-02-12 18:46:26 +05:30
grandwizard28
6cee330d44 feat(alertmanager): add support for testing receiver 2025-02-12 18:46:26 +05:30
grandwizard28
871c6e642c feat(alertmanager): first iteration of alertmanager 2025-02-12 18:46:25 +05:30
32 changed files with 1638 additions and 92 deletions

2
.gitignore vendored
View File

@@ -76,3 +76,5 @@ dist/
# ignore user_scripts that is fetched by init-clickhouse
deploy/common/clickhouse/user_scripts/
# queries.active
queries.active

View File

@@ -20,6 +20,7 @@ import (
basemodel "go.signoz.io/signoz/pkg/query-service/model"
rules "go.signoz.io/signoz/pkg/query-service/rules"
"go.signoz.io/signoz/pkg/query-service/version"
"go.signoz.io/signoz/pkg/signoz"
)
type APIHandlerOptions struct {
@@ -41,6 +42,7 @@ type APIHandlerOptions struct {
FluxInterval time.Duration
UseLogsNewSchema bool
UseTraceNewSchema bool
SigNoz *signoz.SigNoz
}
type APIHandler struct {
@@ -65,6 +67,7 @@ func NewAPIHandler(opts APIHandlerOptions) (*APIHandler, error) {
FluxInterval: opts.FluxInterval,
UseLogsNewSchema: opts.UseLogsNewSchema,
UseTraceNewSchema: opts.UseTraceNewSchema,
SigNoz: opts.SigNoz,
})
if err != nil {

View File

@@ -269,6 +269,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
GatewayUrl: serverOptions.GatewayUrl,
UseLogsNewSchema: serverOptions.UseLogsNewSchema,
UseTraceNewSchema: serverOptions.UseTraceNewSchema,
SigNoz: serverOptions.SigNoz,
}
apiHandler, err := api.NewAPIHandler(apiOpts)

View File

@@ -7,7 +7,6 @@ import (
"os"
"os/signal"
"strconv"
"syscall"
"time"
"go.opentelemetry.io/otel/sdk/resource"
@@ -87,6 +86,8 @@ func init() {
}
func main() {
ctx := context.Background()
var promConfigPath, skipTopLvlOpsPath string
// disables rule execution but allows change to the rule definition
@@ -191,20 +192,20 @@ func main() {
zap.L().Fatal("Could not start server", zap.Error(err))
}
if err := auth.InitAuthCache(context.Background()); err != nil {
if err := auth.InitAuthCache(ctx); err != nil {
zap.L().Fatal("Failed to initialize auth cache", zap.Error(err))
}
signalsChannel := make(chan os.Signal, 1)
signal.Notify(signalsChannel, os.Interrupt, syscall.SIGTERM)
if err := signoz.Start(ctx); err != nil {
zap.L().Fatal("Failed to start signoz", zap.Error(err))
}
for {
select {
case status := <-server.HealthCheckStatus():
zap.L().Info("Received HealthCheck status: ", zap.Int("status", int(status)))
case <-signalsChannel:
zap.L().Fatal("Received OS Interrupt Signal ... ")
server.Stop()
}
if err := signoz.Wait(ctx); err != nil {
zap.L().Fatal("Failed to wait for signoz", zap.Error(err))
}
server.Stop()
if err := signoz.Stop(ctx); err != nil {
zap.L().Fatal("Failed to stop signoz", zap.Error(err))
}
}

View File

@@ -0,0 +1,89 @@
package alertmanagerstoretest
import (
"context"
"go.signoz.io/signoz/pkg/alertmanager/alertmanagerstore"
"go.signoz.io/signoz/pkg/errors"
"go.signoz.io/signoz/pkg/factory"
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
)
// Compile-time check that *Provider implements alertmanagerstore.Store.
var _ alertmanagerstore.Store = (*Provider)(nil)
// Provider is an in-memory implementation of alertmanagerstore.Store backed by plain maps.
// It performs no locking of its own.
type Provider struct {
// States holds the marshalled state bytes, keyed first by org ID and then by state name.
States map[string]map[alertmanagertypes.StateName][]byte
// Configs holds the raw alertmanager config as a string, keyed by org ID.
Configs map[string]string
// OrgIDs is the list returned verbatim by ListOrgIDs.
OrgIDs []string
}
// New builds an in-memory Provider. Every org in orgIDs starts out with an
// empty silence state and an empty notification log state; no configs are
// stored. The ctx, settings and config arguments are accepted but not used.
func New(ctx context.Context, settings factory.ProviderSettings, config alertmanagerstore.Config, orgIDs []string) (*Provider, error) {
	provider := &Provider{
		States:  make(map[string]map[alertmanagertypes.StateName][]byte),
		Configs: make(map[string]string),
		OrgIDs:  orgIDs,
	}

	for _, id := range orgIDs {
		perOrg := make(map[alertmanagertypes.StateName][]byte)
		perOrg[alertmanagertypes.SilenceStateName] = []byte{}
		perOrg[alertmanagertypes.NFLogStateName] = []byte{}
		provider.States[id] = perOrg
	}

	return provider, nil
}
// GetState returns the stored bytes for stateName of orgID as a string. A
// not-found error is returned when either the org or the state name has no entry.
func (provider *Provider) GetState(ctx context.Context, orgID string, stateName alertmanagertypes.StateName) (string, error) {
	// Indexing a missing org yields a nil inner map, which is safe to read from.
	state, found := provider.States[orgID][stateName]
	if found {
		return string(state), nil
	}

	return "", errors.Newf(errors.TypeNotFound, alertmanagerstore.ErrCodeAlertmanagerStateNotFound, "cannot find state %q for org %q", stateName, orgID)
}
// SetState marshals state and stores the resulting bytes under orgID and
// stateName. It returns the number of bytes stored.
//
// The state is marshalled before anything is written, so a marshalling failure
// leaves the previously stored state untouched. The per-org map is created on
// demand, so orgs that were not passed to New can be written to as well
// (writing to a nil map would panic otherwise).
func (provider *Provider) SetState(ctx context.Context, orgID string, stateName alertmanagertypes.StateName, state alertmanagertypes.State) (int64, error) {
	marshalled, err := state.MarshalBinary()
	if err != nil {
		return 0, err
	}

	if provider.States == nil {
		provider.States = make(map[string]map[alertmanagertypes.StateName][]byte)
	}

	orgStates := provider.States[orgID]
	if orgStates == nil {
		orgStates = make(map[alertmanagertypes.StateName][]byte)
		provider.States[orgID] = orgStates
	}
	orgStates[stateName] = marshalled

	return int64(len(marshalled)), nil
}
// GetConfig parses and returns the config stored for orgID, or a not-found
// error when no config has been stored for that org.
func (provider *Provider) GetConfig(ctx context.Context, orgID string) (*alertmanagertypes.Config, error) {
	raw, found := provider.Configs[orgID]
	if !found {
		return nil, errors.Newf(errors.TypeNotFound, alertmanagerstore.ErrCodeAlertmanagerConfigNotFound, "cannot find config for org %s", orgID)
	}

	return alertmanagertypes.NewConfigFromString(raw, orgID)
}
// SetConfig stores the raw form of config as a string under orgID, replacing any
// previously stored value. It always returns nil.
func (provider *Provider) SetConfig(ctx context.Context, orgID string, config *alertmanagertypes.Config) error {
provider.Configs[orgID] = string(config.Raw())
return nil
}
// DelConfig removes the config stored for orgID. Deleting a missing entry is a
// no-op; the method always returns nil. Stored states are not touched.
func (provider *Provider) DelConfig(ctx context.Context, orgID string) error {
delete(provider.Configs, orgID)
return nil
}
// ListOrgIDs returns the OrgIDs slice held by the provider. The slice is returned
// as is, not copied.
func (provider *Provider) ListOrgIDs(ctx context.Context) ([]string, error) {
return provider.OrgIDs, nil
}
// ListChannels parses the config stored for orgID and returns the channels it
// describes. A not-found error is returned when the org has no stored config.
func (provider *Provider) ListChannels(ctx context.Context, orgID string) (alertmanagertypes.Channels, error) {
	raw, found := provider.Configs[orgID]
	if !found {
		return nil, errors.Newf(errors.TypeNotFound, alertmanagerstore.ErrCodeAlertmanagerConfigNotFound, "cannot find config for org %s", orgID)
	}

	parsed, err := alertmanagertypes.NewConfigFromString(raw, orgID)
	if err != nil {
		return nil, err
	}

	return parsed.Channels(), nil
}
// GetChannel is a stub: it ignores its arguments and returns a nil channel with a nil error.
// NOTE(review): callers that dereference the result without a nil check will panic;
// consider returning a not-found error instead — confirm against the tests that use this provider.
func (provider *Provider) GetChannel(ctx context.Context, orgID string, id uint64) (*alertmanagertypes.Channel, error) {
return nil, nil
}

View File

@@ -0,0 +1,17 @@
package alertmanagerstore
import "go.signoz.io/signoz/pkg/factory"
// Config is the configuration of the alertmanager store.
type Config struct {
// Provider is the name of the store provider to use. NewConfig defaults it to "sql".
Provider string `mapstructure:"provider"`
}
// NewConfig returns the default store configuration, which selects the "sql" provider.
// The returned value has the concrete type Config.
func NewConfig() factory.Config {
return Config{
Provider: "sql",
}
}
// Validate currently performs no checks and always returns nil.
func (c Config) Validate() error {
return nil
}

View File

@@ -0,0 +1,248 @@
package sqlalertmanagerstore
import (
"context"
"database/sql"
"encoding/base64"
"fmt"
"go.signoz.io/signoz/pkg/alertmanager/alertmanagerstore"
"go.signoz.io/signoz/pkg/errors"
"go.signoz.io/signoz/pkg/factory"
"go.signoz.io/signoz/pkg/sqlstore"
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
)
// provider is the SQL-backed alertmanager store. All queries go through sqlstore.BunDB().
type provider struct {
// sqlstore gives access to the underlying database.
sqlstore sqlstore.SQLStore
// settings are the provider settings scoped to this package's import path (see New).
settings factory.ScopedProviderSettings
}
// NewFactory returns a provider factory registered under the name "sql" whose
// constructor calls New with the given sqlstore.
func NewFactory(sqlstore sqlstore.SQLStore) factory.ProviderFactory[alertmanagerstore.Store, alertmanagerstore.Config] {
return factory.NewProviderFactory(factory.MustNewName("sql"), func(ctx context.Context, settings factory.ProviderSettings, config alertmanagerstore.Config) (alertmanagerstore.Store, error) {
return New(ctx, settings, config, sqlstore)
})
}
// New creates the SQL-backed store on top of sqlstore. The ctx and config arguments
// are currently unused and the returned error is always nil.
func New(ctx context.Context, settings factory.ProviderSettings, config alertmanagerstore.Config, sqlstore sqlstore.SQLStore) (*provider, error) {
return &provider{
sqlstore: sqlstore,
settings: factory.NewScopedProviderSettings(settings, "go.signoz.io/signoz/pkg/alertmanager/alertmanagerstore/sqlalertmanagerstore"),
}, nil
}
// GetState loads the stored config row of orgID and returns the decoded silence
// or notification log state selected by stateName.
//
// A not-found error is returned when the org has no row or when stateName is
// neither the silence nor the notification log state name. Any other database
// or decoding error is returned as is.
func (provider *provider) GetState(ctx context.Context, orgID string, stateName alertmanagertypes.StateName) (string, error) {
	storedConfig := new(alertmanagertypes.StoredConfig)

	err := provider.
		sqlstore.
		BunDB().
		NewSelect().
		Model(storedConfig).
		Where("org_id = ?", orgID).
		Scan(ctx)
	if err != nil {
		if err == sql.ErrNoRows {
			return "", errors.Newf(errors.TypeNotFound, alertmanagerstore.ErrCodeAlertmanagerStateNotFound, "cannot find alertmanager state for org %s", orgID)
		}

		return "", err
	}

	var encodedState string
	switch stateName {
	case alertmanagertypes.SilenceStateName:
		encodedState = storedConfig.SilencesState
	case alertmanagertypes.NFLogStateName:
		encodedState = storedConfig.NFLogState
	default:
		return "", errors.Newf(errors.TypeNotFound, alertmanagerstore.ErrCodeAlertmanagerStateNotFound, "cannot find alertmanager state for org %s", orgID)
	}

	// SetState writes the state with base64.StdEncoding, which pads with '='.
	// RawStdEncoding rejects padding, so strip it first; this accepts both the
	// padded form written by SetState and any unpadded value already stored.
	for len(encodedState) > 0 && encodedState[len(encodedState)-1] == '=' {
		encodedState = encodedState[:len(encodedState)-1]
	}

	decodedState, err := base64.RawStdEncoding.DecodeString(encodedState)
	if err != nil {
		return "", err
	}

	return string(decodedState), nil
}
// SetState marshals state, base64-encodes it (standard, padded encoding) and
// writes it to the silences_state or nflog_state column of the org's stored
// config row, depending on stateName. It returns the size of the marshalled
// (not the encoded) state in bytes.
func (provider *provider) SetState(ctx context.Context, orgID string, stateName alertmanagertypes.StateName, state alertmanagertypes.State) (int64, error) {
	raw, err := state.MarshalBinary()
	if err != nil {
		return 0, err
	}

	encoded := base64.StdEncoding.EncodeToString(raw)

	update := provider.
		sqlstore.
		BunDB().
		NewUpdate().
		Model(&alertmanagertypes.StoredConfig{}).
		Where("org_id = ?", orgID)

	// Pick the column that matches the requested state.
	switch stateName {
	case alertmanagertypes.SilenceStateName:
		update.Set("silences_state = ?", encoded)
	case alertmanagertypes.NFLogStateName:
		update.Set("nflog_state = ?", encoded)
	}

	if _, err = update.Exec(ctx); err != nil {
		return 0, err
	}

	return int64(len(raw)), nil
}
// GetConfig reads the stored config row of orgID and parses its config column.
// A not-found error is returned when the org has no row.
func (provider *provider) GetConfig(ctx context.Context, orgID string) (*alertmanagertypes.Config, error) {
	row := new(alertmanagertypes.StoredConfig)

	query := provider.
		sqlstore.
		BunDB().
		NewSelect().
		Model(row).
		Where("org_id = ?", orgID)

	if err := query.Scan(ctx); err != nil {
		if err != sql.ErrNoRows {
			return nil, err
		}

		return nil, errors.Newf(errors.TypeNotFound, alertmanagerstore.ErrCodeAlertmanagerConfigNotFound, "cannot find alertmanager config for org %s", orgID)
	}

	parsed, err := alertmanagertypes.NewConfigFromString(row.Config, orgID)
	if err != nil {
		return nil, err
	}

	return parsed, nil
}
// SetConfig upserts the stored config of an org and the channels derived from it
// inside a single transaction.
//
// The config row is inserted, or updated on an org_id conflict. When the config
// yields at least one channel, the channels are inserted, or have their data and
// updated_at refreshed on a name conflict. Errors are wrapped with the step that
// failed; the underlying error stays reachable through errors.Is/errors.As.
func (provider *provider) SetConfig(ctx context.Context, orgID string, config *alertmanagertypes.Config) error {
	tx, err := provider.sqlstore.BunDB().BeginTx(ctx, nil)
	if err != nil {
		return fmt.Errorf("begin transaction to set alertmanager config for org %q: %w", orgID, err)
	}
	// After a successful Commit, Rollback only returns an error, which is ignored.
	defer tx.Rollback() //nolint:errcheck

	storedConfig := config.StoredConfig()

	if _, err = tx.
		NewInsert().
		Model(storedConfig).
		On("CONFLICT (org_id) DO UPDATE").
		Set("config = ?", string(storedConfig.Config)).
		Set("updated_at = ?", storedConfig.UpdatedAt).
		Exec(ctx); err != nil {
		return fmt.Errorf("upsert alertmanager config for org %q: %w", orgID, err)
	}

	channels := config.Channels()
	if len(channels) != 0 {
		if _, err = tx.NewInsert().
			Model(&channels).
			On("CONFLICT (name) DO UPDATE").
			Set("data = EXCLUDED.data").
			Set("updated_at = EXCLUDED.updated_at").
			Exec(ctx); err != nil {
			return fmt.Errorf("upsert alertmanager channels for org %q: %w", orgID, err)
		}
	}

	if err = tx.Commit(); err != nil {
		return fmt.Errorf("commit alertmanager config for org %q: %w", orgID, err)
	}

	return nil
}
// DelConfig deletes the stored config row and all channels of orgID.
//
// Both deletes run in one transaction, so a failure of the second statement does
// not leave an org with channels but no config.
func (provider *provider) DelConfig(ctx context.Context, orgID string) error {
	tx, err := provider.sqlstore.BunDB().BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	// After a successful Commit, Rollback only returns an error, which is ignored.
	defer tx.Rollback() //nolint:errcheck

	if _, err = tx.
		NewDelete().
		Model(&alertmanagertypes.StoredConfig{}).
		Where("org_id = ?", orgID).
		Exec(ctx); err != nil {
		return err
	}

	if _, err = tx.
		NewDelete().
		Model(&alertmanagertypes.Channel{}).
		Where("org_id = ?", orgID).
		Exec(ctx); err != nil {
		return err
	}

	return tx.Commit()
}
// ListOrgIDs returns the id column of every row in the organizations table.
func (provider *provider) ListOrgIDs(ctx context.Context) ([]string, error) {
	var ids []string

	query := provider.
		sqlstore.
		BunDB().
		NewSelect().
		Table("organizations").
		ColumnExpr("id")

	if err := query.Scan(ctx, &ids); err != nil {
		return nil, err
	}

	return ids, nil
}
// ListChannels returns all channel rows whose org_id matches orgID.
func (provider *provider) ListChannels(ctx context.Context, orgID string) (alertmanagertypes.Channels, error) {
	result := alertmanagertypes.Channels{}

	query := provider.
		sqlstore.
		BunDB().
		NewSelect().
		Model(&result).
		Where("org_id = ?", orgID)

	if err := query.Scan(ctx); err != nil {
		return nil, err
	}

	return result, nil
}
// GetChannel returns the channel row matching both orgID and id. A not-found
// error is returned when no such row exists.
func (provider *provider) GetChannel(ctx context.Context, orgID string, id uint64) (*alertmanagertypes.Channel, error) {
	found := new(alertmanagertypes.Channel)

	query := provider.
		sqlstore.
		BunDB().
		NewSelect().
		Model(found).
		Where("org_id = ?", orgID).
		Where("id = ?", id)

	if err := query.Scan(ctx); err != nil {
		if err != sql.ErrNoRows {
			return nil, err
		}

		return nil, errors.Newf(errors.TypeNotFound, alertmanagerstore.ErrCodeAlertmanagerChannelNotFound, "cannot find channel for org %s", orgID)
	}

	return found, nil
}

View File

@@ -0,0 +1,44 @@
package alertmanagerstore
import (
"context"
"go.signoz.io/signoz/pkg/errors"
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
)
// Error codes attached to the not-found errors returned by Store implementations.
var (
// ErrCodeAlertmanagerConfigNotFound marks a missing alertmanager config.
ErrCodeAlertmanagerConfigNotFound = errors.MustNewCode("alertmanager_config_not_found")
// ErrCodeAlertmanagerStateNotFound marks a missing silence or notification log state.
ErrCodeAlertmanagerStateNotFound = errors.MustNewCode("alertmanager_state_not_found")
// ErrCodeAlertmanagerChannelNotFound marks a missing channel.
ErrCodeAlertmanagerChannelNotFound = errors.MustNewCode("alertmanager_channel_not_found")
)
// Store persists alertmanager state, config and channels per organization.
// In every method that takes a string, that string is the organization ID.
type Store interface {
// SetState stores the silence or the notification log state (selected by the state name) for an
// organization and returns the number of bytes in the state.
// The return type matches the return of `silence.Maintenance` or `nflog.Maintenance`.
// See https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/silence/silence.go#L217
// and https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/nflog/nflog.go#L94
SetState(context.Context, string, alertmanagertypes.StateName, alertmanagertypes.State) (int64, error)
// GetState gets the silence state or the notification log state as a string from the store. This is used as a
// snapshot to load the initial state of silences or notification log when starting the alertmanager.
GetState(context.Context, string, alertmanagertypes.StateName) (string, error)
// GetConfig gets the alertmanager config for an organization.
GetConfig(context.Context, string) (*alertmanagertypes.Config, error)
// SetConfig sets the alertmanager config for an organization.
SetConfig(context.Context, string, *alertmanagertypes.Config) error
// DelConfig deletes the config for an organization.
DelConfig(context.Context, string) error
// ListOrgIDs gets all organization IDs.
ListOrgIDs(context.Context) ([]string, error)
// ListChannels gets all channels for an organization.
ListChannels(context.Context, string) (alertmanagertypes.Channels, error)
// GetChannel gets a single channel of an organization by its numeric ID.
GetChannel(context.Context, string, uint64) (*alertmanagertypes.Channel, error)
}

View File

@@ -0,0 +1,36 @@
package alertmanager
import (
"context"
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
)
// Client is the per-organization API of the alertmanager.
// NOTE(review): the first string argument of every method is presumably the organization ID,
// matching the "per organization" wording below — confirm against the implementations.
type Client interface {
// GetAlerts gets the alerts from the alertmanager per organization.
GetAlerts(context.Context, string, alertmanagertypes.GettableAlertsParams) (alertmanagertypes.GettableAlerts, error)
// PutAlerts puts the alerts into the alertmanager per organization.
PutAlerts(context.Context, string, alertmanagertypes.PostableAlerts) error
// SetConfig sets the config into the alertmanager per organization.
SetConfig(context.Context, string, alertmanagertypes.PostableConfig) error
// TestReceiver sends a test alert to a receiver.
TestReceiver(context.Context, string, alertmanagertypes.Receiver) error
// CreateChannel creates a channel for the organization.
CreateChannel(context.Context, string, *alertmanagertypes.Channel) error
// GetChannel gets a channel for the organization.
GetChannel(context.Context, string, uint64) (*alertmanagertypes.Channel, error)
// DelChannel deletes a channel for the organization.
DelChannel(context.Context, string, uint64) error
// ListChannels lists all channels for the organization.
ListChannels(context.Context, string) (alertmanagertypes.Channels, error)
// UpdateChannel updates a channel for the organization.
// NOTE(review): the meaning of the trailing string argument is not visible here — document it once confirmed.
UpdateChannel(context.Context, string, uint64, string) error
}

145
pkg/alertmanager/config.go Normal file
View File

@@ -0,0 +1,145 @@
package alertmanager
import (
"net/url"
"time"
"go.signoz.io/signoz/pkg/alertmanager/alertmanagerstore"
"go.signoz.io/signoz/pkg/factory"
)
// Config is the server-level configuration of the alertmanager. Defaults are provided by newConfig.
type Config struct {
// PollInterval is the interval at which the alertmanager config is polled from the store.
PollInterval time.Duration `mapstructure:"poll_interval"`
// ExternalUrl is the URL under which Alertmanager is externally reachable (for example, if Alertmanager is served
// via a reverse proxy). Used for generating relative and absolute links back to Alertmanager itself.
// See https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/cmd/alertmanager/main.go#L155C54-L155C249
ExternalUrl *url.URL `mapstructure:"external_url"`
// ResolveTimeout is the time after which an alert is declared resolved if it has not been updated.
// See https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/config/config.go#L836
ResolveTimeout time.Duration `mapstructure:"resolve_timeout"`
// Route is the config of the root node of the routing tree.
Route RouteConfig `mapstructure:"route"`
// Alerts is the configuration for alerts.
Alerts AlertsConfig `mapstructure:"alerts"`
// Silences is the configuration for silences.
Silences SilencesConfig `mapstructure:"silences"`
// NFLog is the configuration for the notification log.
NFLog NFLogConfig `mapstructure:"nflog"`
// SMTP is the configuration for the Email receiver. We are explicitly defining this here instead of taking it as
// part of the receiver configuration, because we want to use the same SMTP configuration for all receivers.
SMTP SMTPConfig `mapstructure:"smtp"`
// Store is the configuration for the alertmanagerstore.
Store alertmanagerstore.Config `mapstructure:"store"`
}
// RouteConfig holds the settings of the root route. The values are handed to
// alertmanagertypes.NewDefaultConfig when an organization has no stored config.
// NOTE(review): the fields presumably carry the same meaning as the upstream route options of the
// same names — confirm against alertmanagertypes.NewDefaultConfig.
type RouteConfig struct {
GroupBy []string `mapstructure:"group_by"`
GroupInterval time.Duration `mapstructure:"group_interval"`
GroupWait time.Duration `mapstructure:"group_wait"`
RepeatInterval time.Duration `mapstructure:"repeat_interval"`
}
// SMTPConfig is the SMTP configuration shared by all receivers.
// This is a best effort to make it similar to the upstream alertmanager config. See
// https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/config/config.go#L843
type SMTPConfig struct {
Hello string `mapstructure:"hello"`
From string `mapstructure:"from"`
Host string `mapstructure:"host"`
Port int `mapstructure:"port"`
AuthUsername string `mapstructure:"auth_username"`
AuthPassword string `mapstructure:"auth_password"`
AuthSecret string `mapstructure:"auth_secret"`
AuthIdentity string `mapstructure:"auth_identity"`
RequireTLS bool `mapstructure:"require_tls"`
}
// AlertsConfig is the configuration for alerts.
type AlertsConfig struct {
// GCInterval is the interval between garbage collection of alerts.
// See https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/cmd/alertmanager/main.go#L152
GCInterval time.Duration `mapstructure:"gc_interval"`
}
// SilencesConfig is the configuration for silences.
type SilencesConfig struct {
// Max is the maximum number of silences, including expired silences. If negative or zero, no limit is set.
// See https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/cmd/alertmanager/main.go#L150C64-L150C157
Max int `mapstructure:"max"`
// MaxSizeBytes is the maximum size of the silences in bytes. If negative or zero, no limit is set.
// See https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/cmd/alertmanager/main.go#L150C64-L150C157
MaxSizeBytes int `mapstructure:"max_size_bytes"`
// MaintenanceInterval is the interval between garbage collection and snapshotting of the silences. The snapshot
// will be stored in the state store.
// The upstream alertmanager config (https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/cmd/alertmanager/main.go#L149) has
// been split between silences and nflog.
MaintenanceInterval time.Duration `mapstructure:"maintenance_interval"`
// Retention is the retention of the silences.
Retention time.Duration `mapstructure:"retention"`
}
// NFLogConfig is the configuration for the notification log.
type NFLogConfig struct {
// MaintenanceInterval is the interval between garbage collection and snapshotting of the notification logs. The
// snapshot will be stored in the state store.
// The upstream alertmanager config (https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/cmd/alertmanager/main.go#L149) has
// been split between silences and nflog.
MaintenanceInterval time.Duration `mapstructure:"maintenance_interval"`
// Retention is the retention of the notification logs.
Retention time.Duration `mapstructure:"retention"`
}
// NewConfigFactory returns a config factory registered under the name "alertmanager"
// that produces the defaults from newConfig.
func NewConfigFactory() factory.ConfigFactory {
return factory.NewConfigFactory(factory.MustNewName("alertmanager"), newConfig)
}
// newConfig returns the default alertmanager server configuration. The returned
// value has the concrete type Config.
func newConfig() factory.Config {
	return Config{
		PollInterval: 15 * time.Second,
		// The scheme must be set: a url.URL with only a Host renders as the
		// scheme-relative "//localhost:8080", which is not a usable absolute
		// link in notifications.
		ExternalUrl: &url.URL{
			Scheme: "http",
			Host:   "localhost:8080",
		},
		// Corresponds to the default in upstream (https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/config/config.go#L727)
		ResolveTimeout: 5 * time.Minute,
		Route: RouteConfig{
			GroupBy:        []string{"alertname"},
			GroupInterval:  5 * time.Minute,
			GroupWait:      30 * time.Second,
			RepeatInterval: 4 * time.Hour,
		},
		// Corresponds to the default in upstream (https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/cmd/alertmanager/main.go#L152)
		Alerts: AlertsConfig{
			GCInterval: 30 * time.Minute,
		},
		// Corresponds to the default in upstream (https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/cmd/alertmanager/main.go#L149-L151)
		Silences: SilencesConfig{
			Max:                 0,
			MaxSizeBytes:        0,
			MaintenanceInterval: 15 * time.Minute,
			Retention:           120 * time.Hour,
		},
		// Corresponds to the default in upstream (https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/cmd/alertmanager/main.go#L149)
		NFLog: NFLogConfig{
			MaintenanceInterval: 15 * time.Minute,
			Retention:           120 * time.Hour,
		},
		SMTP: SMTPConfig{
			Hello:      "localhost",
			From:       "alertmanager@signoz.io",
			Host:       "localhost",
			Port:       25,
			RequireTLS: true,
		},
		Store: alertmanagerstore.NewConfig().(alertmanagerstore.Config),
	}
}
// Validate currently performs no checks and always returns nil.
func (c Config) Validate() error {
return nil
}

339
pkg/alertmanager/server.go Normal file
View File

@@ -0,0 +1,339 @@
package alertmanager
import (
"context"
"strings"
"sync"
"time"
"github.com/prometheus/alertmanager/dispatch"
"github.com/prometheus/alertmanager/featurecontrol"
"github.com/prometheus/alertmanager/inhibit"
"github.com/prometheus/alertmanager/nflog"
"github.com/prometheus/alertmanager/notify"
"github.com/prometheus/alertmanager/provider/mem"
"github.com/prometheus/alertmanager/silence"
"github.com/prometheus/alertmanager/template"
"github.com/prometheus/alertmanager/timeinterval"
"github.com/prometheus/common/model"
"go.signoz.io/signoz/pkg/alertmanager/alertmanagerstore"
"go.signoz.io/signoz/pkg/errors"
"go.signoz.io/signoz/pkg/factory"
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
)
// Compile-time check that *Server implements factory.Service.
var _ factory.Service = (*Server)(nil)
var (
// This is not a real file and will never be used. We need this placeholder to ensure maintenance runs on shutdown. See
// https://github.com/prometheus/alertmanager/blob/3ee2cd0f1271e277295c02b6160507b4d193dde2/silence/silence.go#L435-L438
// and https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/nflog/nflog.go#L362.
snapfnoop string = "snapfnoop"
)
// Server is the alertmanager of a single organization, assembled from upstream alertmanager primitives.
type Server struct {
// srvConfig is the server config for the alertmanager
srvConfig Config
// alertmanagerConfigHash is the hash of the alertmanager config last applied by SetConfig
alertmanagerConfigHash [16]byte
// alertmanagerConfigRaw is the raw form of the alertmanager config last applied by SetConfig
alertmanagerConfigRaw []byte
// alertmanagerConfig is the alertmanager config last applied by SetConfig
alertmanagerConfig *alertmanagertypes.Config
// settings are the factory settings (logger, prometheus registerer) namespaced for the alertmanager
settings factory.NamespacedSettings
// orgID is the orgID for the alertmanager
orgID string
// store is the backing store for the alertmanager
store alertmanagerstore.Store
// alertmanager primitives from upstream alertmanager
alerts *mem.Alerts
nflog *nflog.Log
dispatcher *dispatch.Dispatcher
dispatcherMetrics *dispatch.DispatcherMetrics
inhibitor *inhibit.Inhibitor
silencer *silence.Silencer
silences *silence.Silences
timeIntervals map[string][]timeinterval.TimeInterval
pipelineBuilder *notify.PipelineBuilder
marker *alertmanagertypes.MemMarker
tmpl *template.Template
// wg tracks the silence and notification log maintenance goroutines started in NewForOrg
wg sync.WaitGroup
// stopc is closed by Stop to signal the maintenance goroutines to finish
stopc chan struct{}
}
// NewForOrg builds the alertmanager server of a single organization.
//
// It restores the silences and the notification log from the snapshots held in
// store (a missing snapshot is treated as empty state), creates the in-memory
// alert provider, and then starts one maintenance goroutine each for silences
// and the notification log. Those goroutines garbage-collect and write a
// snapshot back to store, and run until Stop closes the stop channel.
//
// Everything that can fail is constructed before the maintenance goroutines are
// started, so an error return never leaves them running without an owner.
//
// ctx is captured by the maintenance callbacks and is also handed to the alert
// provider, so it should outlive the returned server.
func NewForOrg(ctx context.Context, settings factory.Settings, srvConfig Config, orgID string, store alertmanagerstore.Store) (*Server, error) {
	server := &Server{
		srvConfig: srvConfig,
		settings:  factory.NewNamespacedSettings(settings, "go.signoz.io/signoz/pkg/alertmanager"),
		orgID:     orgID,
		store:     store,
		stopc:     make(chan struct{}),
	}

	// initialize marker
	server.marker = alertmanagertypes.NewMarker(server.settings.PrometheusRegisterer())

	// get silences for initial state
	silencesstate, err := store.GetState(ctx, server.orgID, alertmanagertypes.SilenceStateName)
	if err != nil && !errors.Ast(err, errors.TypeNotFound) {
		return nil, err
	}

	// get nflog for initial state
	nflogstate, err := store.GetState(ctx, server.orgID, alertmanagertypes.NFLogStateName)
	if err != nil && !errors.Ast(err, errors.TypeNotFound) {
		return nil, err
	}

	// Initialize silences
	server.silences, err = silence.New(silence.Options{
		SnapshotReader: strings.NewReader(silencesstate),
		Retention:      srvConfig.Silences.Retention,
		Limits: silence.Limits{
			MaxSilences:         func() int { return srvConfig.Silences.Max },
			MaxSilenceSizeBytes: func() int { return srvConfig.Silences.MaxSizeBytes },
		},
		Metrics: server.settings.PrometheusRegisterer(),
		Logger:  server.settings.Logger(),
	})
	if err != nil {
		return nil, err
	}

	// Initialize notification log
	server.nflog, err = nflog.New(nflog.Options{
		SnapshotReader: strings.NewReader(nflogstate),
		Retention:      server.srvConfig.NFLog.Retention,
		Metrics:        server.settings.PrometheusRegisterer(),
		Logger:         server.settings.Logger(),
	})
	if err != nil {
		return nil, err
	}

	// Create the alert provider before any maintenance goroutine is started: this
	// is the last step that can fail, and failing after the goroutines are running
	// would leak them because nobody would ever close stopc.
	server.alerts, err = mem.NewAlerts(ctx, server.marker, server.srvConfig.Alerts.GCInterval, nil, server.settings.Logger(), server.settings.PrometheusRegisterer())
	if err != nil {
		return nil, err
	}

	// Start maintenance for silences
	server.wg.Add(1)
	go func() {
		defer server.wg.Done()
		server.silences.Maintenance(server.srvConfig.Silences.MaintenanceInterval, snapfnoop, server.stopc, func() (int64, error) {
			// Delete silences older than the retention period.
			if _, err := server.silences.GC(); err != nil {
				server.settings.Logger().ErrorContext(ctx, "silence garbage collection", "error", err)
				// Don't return here - we need to snapshot our state first.
			}

			return server.store.SetState(ctx, server.orgID, alertmanagertypes.SilenceStateName, server.silences)
		})
	}()

	// Start maintenance for notification logs
	server.wg.Add(1)
	go func() {
		defer server.wg.Done()
		server.nflog.Maintenance(server.srvConfig.NFLog.MaintenanceInterval, snapfnoop, server.stopc, func() (int64, error) {
			if _, err := server.nflog.GC(); err != nil {
				server.settings.Logger().ErrorContext(ctx, "notification log garbage collection", "error", err)
				// Don't return without saving the current state.
			}

			return server.store.SetState(ctx, server.orgID, alertmanagertypes.NFLogStateName, server.nflog)
		})
	}()

	server.pipelineBuilder = notify.NewPipelineBuilder(server.settings.PrometheusRegisterer(), featurecontrol.NoopFlags{})
	server.dispatcherMetrics = dispatch.NewDispatcherMetrics(false, server.settings.PrometheusRegisterer())

	return server, nil
}
// Start applies the organization's alertmanager config. The config is read from
// the store; when none is stored (not-found error or nil config), a default one
// is built from the server's resolve timeout, SMTP and route settings.
func (server *Server) Start(ctx context.Context) error {
	stored, err := server.store.GetConfig(ctx, server.orgID)
	if err != nil && !errors.Ast(err, errors.TypeNotFound) {
		return err
	}

	if stored != nil {
		return server.SetConfig(ctx, stored)
	}

	// Nothing stored for this org yet: fall back to the server-level defaults.
	smtp := server.srvConfig.SMTP
	route := server.srvConfig.Route
	defaults := alertmanagertypes.NewDefaultConfig(
		server.srvConfig.ResolveTimeout,
		smtp.Hello,
		smtp.From,
		smtp.Host,
		smtp.Port,
		smtp.AuthUsername,
		smtp.AuthPassword,
		smtp.AuthSecret,
		smtp.AuthIdentity,
		smtp.RequireTLS,
		route.GroupBy,
		route.GroupInterval,
		route.GroupWait,
		route.RepeatInterval,
		server.orgID,
	)

	return server.SetConfig(ctx, defaults)
}
// GetAlerts returns the alerts held by the in-memory alert provider, built by
// alertmanagertypes.NewGettableAlertsFromAlertProvider from the current config, the marker status
// and params. The callback passed along runs each label set through the inhibitor and the silencer.
// NOTE(review): inhibitor and silencer are only assigned in SetConfig, so this presumably must not
// be called before Start/SetConfig has succeeded — confirm.
func (server *Server) GetAlerts(ctx context.Context, params alertmanagertypes.GettableAlertsParams) (alertmanagertypes.GettableAlerts, error) {
return alertmanagertypes.NewGettableAlertsFromAlertProvider(server.alerts, server.alertmanagerConfig, server.marker.Status, func(labels model.LabelSet) {
server.inhibitor.Mutes(labels)
server.silencer.Mutes(labels)
}, params)
}
// PutAlerts converts postableAlerts into alerts and puts them into the alert
// provider. A failure to put the alerts is returned first; otherwise any
// validation errors from the conversion are joined and returned.
func (server *Server) PutAlerts(ctx context.Context, postableAlerts alertmanagertypes.PostableAlerts) error {
	alerts, validationErrs := alertmanagertypes.NewAlertsFromPostableAlerts(postableAlerts, server.srvConfig.ResolveTimeout, time.Now())

	// Notification sending alert takes precedence over validation errors.
	if putErr := server.alerts.Put(alerts...); putErr != nil {
		return putErr
	}

	if validationErrs == nil {
		return nil
	}

	return errors.Join(validationErrs...)
}
// ConfigHash returns the hash of the config last applied by SetConfig; it is the zero value
// until SetConfig has succeeded once.
func (server *Server) ConfigHash() [16]byte {
return server.alertmanagerConfigHash
}
// ConfigRaw returns the raw form of the config last applied by SetConfig; it is nil until
// SetConfig has succeeded once. The slice is returned as is, not copied.
func (server *Server) ConfigRaw() []byte {
return server.alertmanagerConfigRaw
}
// SetConfig applies alertmanagerConfig to the server: it loads the templates, builds the routing
// tree, the receiver integrations and the time intervals, replaces the running inhibitor and
// dispatcher with new ones, starts them, and finally records the config, its raw form and its hash.
//
// An error is returned when the templates cannot be loaded or a receiver's integrations cannot be built.
// NOTE(review): server.tmpl is overwritten before the receivers are built, so a failure while
// building receivers leaves the new templates in place next to the old dispatcher and inhibitor.
func (server *Server) SetConfig(ctx context.Context, alertmanagerConfig *alertmanagertypes.Config) error {
config := alertmanagerConfig.AlertmanagerConfig()
var err error
server.tmpl, err = template.FromGlobs(config.Templates)
if err != nil {
return err
}
server.tmpl.ExternalURL = server.srvConfig.ExternalUrl
// Build the routing tree and record which receivers are used.
routes := dispatch.NewRoute(config.Route, nil)
activeReceivers := make(map[string]struct{})
routes.Walk(func(r *dispatch.Route) {
activeReceivers[r.RouteOpts.Receiver] = struct{}{}
})
// Build the map of receiver to integrations.
receivers := make(map[string][]notify.Integration, len(activeReceivers))
// NOTE(review): integrationsNum is accumulated below but never read in this function.
var integrationsNum int
for _, rcv := range config.Receivers {
if _, found := activeReceivers[rcv.Name]; !found {
// No need to build a receiver if no route is using it.
server.settings.Logger().InfoContext(ctx, "skipping creation of receiver not referenced by any route", "receiver", rcv.Name)
continue
}
integrations, err := alertmanagertypes.NewReceiverIntegrations(rcv, server.tmpl, server.settings.Logger())
if err != nil {
return err
}
// rcv.Name is guaranteed to be unique across all receivers.
receivers[rcv.Name] = integrations
integrationsNum += len(integrations)
}
// Build the map of time interval names to time interval definitions.
timeIntervals := make(map[string][]timeinterval.TimeInterval, len(config.MuteTimeIntervals)+len(config.TimeIntervals))
for _, ti := range config.MuteTimeIntervals {
timeIntervals[ti.Name] = ti.TimeIntervals
}
for _, ti := range config.TimeIntervals {
timeIntervals[ti.Name] = ti.TimeIntervals
}
intervener := timeinterval.NewIntervener(timeIntervals)
// Stop the inhibitor and dispatcher of the previously applied config, if any, before replacing them.
if server.inhibitor != nil {
server.inhibitor.Stop()
}
if server.dispatcher != nil {
server.dispatcher.Stop()
}
server.inhibitor = inhibit.NewInhibitor(server.alerts, config.InhibitRules, server.marker, server.settings.Logger())
server.timeIntervals = timeIntervals
server.silencer = silence.NewSilencer(server.silences, server.marker, server.settings.Logger())
// pipelinePeer is deliberately left as a nil notify.Peer and passed to the pipeline as such.
var pipelinePeer notify.Peer
pipeline := server.pipelineBuilder.New(
receivers,
func() time.Duration { return 0 },
server.inhibitor,
server.silencer,
intervener,
server.marker,
server.nflog,
pipelinePeer,
)
// timeoutFunc raises any duration below notify.MinTimeout to notify.MinTimeout.
timeoutFunc := func(d time.Duration) time.Duration {
if d < notify.MinTimeout {
d = notify.MinTimeout
}
return d
}
server.dispatcher = dispatch.NewDispatcher(
server.alerts,
routes,
pipeline,
server.marker,
timeoutFunc,
nil,
server.settings.Logger(),
server.dispatcherMetrics,
)
// Do not try to add these to `server.wg` as there seems to be a race condition if
// we call Start() and Stop() in quick succession.
// Both these goroutines will run indefinitely.
go server.dispatcher.Run()
go server.inhibitor.Run()
server.alertmanagerConfigHash = alertmanagerConfig.Hash()
server.alertmanagerConfigRaw = alertmanagerConfig.Raw()
server.alertmanagerConfig = alertmanagerConfig
return nil
}
// TestReceiver delegates to alertmanagertypes.TestReceiver, passing the given
// receiver along with this server's template and logger.
func (server *Server) TestReceiver(ctx context.Context, receiver alertmanagertypes.Receiver) error {
	logger := server.settings.Logger()

	return alertmanagertypes.TestReceiver(ctx, receiver, server.tmpl, logger)
}
// Stop shuts the server down. It stops the dispatcher and the inhibitor when
// they have been created, closes the alert provider, closes stopc to signal
// the maintenance goroutines, and then waits on the server's wait group.
// It always returns nil.
func (server *Server) Stop(ctx context.Context) error {
	if dispatcher := server.dispatcher; dispatcher != nil {
		dispatcher.Stop()
	}

	if inhibitor := server.inhibitor; inhibitor != nil {
		inhibitor.Stop()
	}

	// The alert provider is closed before the maintenance goroutines are told to stop.
	server.alerts.Close()

	// Closing stopc tells the maintenance goroutines of the server states to exit.
	close(server.stopc)

	// Block until every goroutine tracked by the wait group has returned.
	server.wg.Wait()

	return nil
}

View File

@@ -0,0 +1,84 @@
package alertmanager
import (
"bytes"
"context"
"net"
"net/http"
"net/url"
"testing"
"github.com/prometheus/alertmanager/config"
commoncfg "github.com/prometheus/common/config"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.signoz.io/signoz/pkg/alertmanager/alertmanagerstore"
"go.signoz.io/signoz/pkg/alertmanager/alertmanagerstore/alertmanagerstoretest"
"go.signoz.io/signoz/pkg/factory/factorytest"
"go.signoz.io/signoz/pkg/factory/providertest"
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
)
func TestServerStartStop(t *testing.T) {
store, err := alertmanagerstoretest.New(context.Background(), providertest.NewSettings(), alertmanagerstore.NewConfig().(alertmanagerstore.Config), []string{"1"})
require.NoError(t, err)
server, err := NewForOrg(context.Background(), factorytest.NewSettings(), newConfig().(Config), "1", store)
require.NoError(t, err)
require.NoError(t, server.Start(context.Background()))
require.NoError(t, server.Stop(context.Background()))
}
// TestServerWithDefaultConfig starts a server for an organization and checks
// the raw alertmanager config held by the server against the expected default
// config JSON.
func TestServerWithDefaultConfig(t *testing.T) {
	store, err := alertmanagerstoretest.New(context.Background(), providertest.NewSettings(), alertmanagerstore.NewConfig().(alertmanagerstore.Config), []string{"1"})
	require.NoError(t, err)

	server, err := NewForOrg(context.Background(), factorytest.NewSettings(), newConfig().(Config), "1", store)
	require.NoError(t, err)

	require.NoError(t, server.Start(context.Background()))
	// The arguments of a deferred call are evaluated at the defer statement, so
	// `defer require.NoError(t, server.Stop(...))` would stop the server right
	// here. Registering a cleanup function delays Stop until the test ends.
	t.Cleanup(func() {
		assert.NoError(t, server.Stop(context.Background()))
	})

	assert.Equal(t, `{"global":{"resolve_timeout":"5m","http_config":{"tls_config":{"insecure_skip_verify":false},"follow_redirects":true,"enable_http2":true,"proxy_url":null},"smtp_from":"alertmanager@signoz.io","smtp_hello":"localhost","smtp_smarthost":"localhost:25","smtp_require_tls":true,"smtp_tls_config":{"insecure_skip_verify":false},"pagerduty_url":"https://events.pagerduty.com/v2/enqueue","opsgenie_api_url":"https://api.opsgenie.com/","wechat_api_url":"https://qyapi.weixin.qq.com/cgi-bin/","victorops_api_url":"https://alert.victorops.com/integrations/generic/20131114/alert/","telegram_api_url":"https://api.telegram.org","webex_api_url":"https://webexapis.com/v1/messages","rocketchat_api_url":"https://open.rocket.chat/"},"route":{"receiver":"default-receiver","group_by":["alertname"],"group_wait":"30s","group_interval":"5m","repeat_interval":"4h"},"receivers":[{"name":"default-receiver"}],"templates":null}`, string(server.alertmanagerConfigRaw))
}
// TestServerTestReceiverWebhook starts a local HTTP server acting as a webhook
// endpoint, asks the alertmanager server to test a webhook receiver pointing
// at it, and checks that the captured request body mentions the receiver name
// and a firing alert.
func TestServerTestReceiverWebhook(t *testing.T) {
	store, err := alertmanagerstoretest.New(context.Background(), providertest.NewSettings(), alertmanagerstore.NewConfig().(alertmanagerstore.Config), []string{"1"})
	require.NoError(t, err)

	server, err := NewForOrg(context.Background(), factorytest.NewSettings(), newConfig().(Config), "1", store)
	require.NoError(t, err)

	webhookListener, err := net.Listen("tcp", "localhost:0")
	require.NoError(t, err)

	requestBody := new(bytes.Buffer)
	webhookServer := &http.Server{
		Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			// The handler runs on a server goroutine, where FailNow (used by
			// require) must not be called; assert only records the failure.
			if _, err := requestBody.ReadFrom(r.Body); !assert.NoError(t, err) {
				w.WriteHeader(http.StatusInternalServerError)
				return
			}
			w.WriteHeader(http.StatusOK)
		}),
	}

	// Serve always returns a non-nil error; hand it back to the test goroutine
	// and verify it there once the server has been closed.
	serveErr := make(chan error, 1)
	go func() {
		serveErr <- webhookServer.Serve(webhookListener)
	}()
	t.Cleanup(func() {
		assert.NoError(t, webhookServer.Close())
		assert.ErrorIs(t, <-serveErr, http.ErrServerClosed)
	})

	require.NoError(t, server.Start(context.Background()))
	// A plain `defer require.NoError(t, server.Stop(...))` would evaluate Stop
	// immediately; the cleanup function delays it until the test ends.
	t.Cleanup(func() {
		assert.NoError(t, server.Stop(context.Background()))
	})

	webhookURL, err := url.Parse("http://" + webhookListener.Addr().String() + "/webhook")
	require.NoError(t, err)

	err = server.TestReceiver(context.Background(), alertmanagertypes.Receiver{
		Name: "test-receiver",
		WebhookConfigs: []*config.WebhookConfig{
			{
				HTTPConfig: &commoncfg.HTTPClientConfig{},
				URL:        &config.SecretURL{URL: webhookURL},
			},
		},
	})
	require.NoError(t, err)

	require.Contains(t, requestBody.String(), "test-receiver")
	require.Contains(t, requestBody.String(), "firing")
}

214
pkg/alertmanager/servers.go Normal file
View File

@@ -0,0 +1,214 @@
package alertmanager
import (
"context"
"sync"
"time"
"go.signoz.io/signoz/pkg/alertmanager/alertmanagerstore"
"go.signoz.io/signoz/pkg/errors"
"go.signoz.io/signoz/pkg/factory"
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
)
var (
// ErrCodeAlertmanagerNotFound is the error code used when no alertmanager
// server is registered for the requested organization id.
ErrCodeAlertmanagerNotFound = errors.MustNewCode("alertmanager_not_found")
)
// Compile-time assertions that *Servers satisfies factory.Service and Client.
var _ factory.Service = (*Servers)(nil)
var _ Client = (*Servers)(nil)
// Servers holds one alertmanager Server per organization, keyed by
// organization id, together with the store the configs are read from.
type Servers struct {
// config is the configuration given to New; Start reads PollInterval from it.
config Config
// Store is the store for the alertmanager
store alertmanagerstore.Store
// Map of organization id to server
servers map[string]*Server
// Mutex to protect the servers map
// NOTE(review): none of the methods visible in this file acquire this mutex — confirm whether it is still needed.
serversMtx sync.RWMutex
}
// New builds a Servers value containing one Server for every organization id
// returned by the store. It returns an error if the organization ids cannot be
// listed or if any per-organization server cannot be created.
func New(ctx context.Context, settings factory.Settings, config Config, store alertmanagerstore.Store) (*Servers, error) {
	orgIDs, err := store.ListOrgIDs(ctx)
	if err != nil {
		return nil, err
	}

	byOrg := map[string]*Server{}
	for _, orgID := range orgIDs {
		orgServer, err := NewForOrg(ctx, settings, config, orgID, store)
		if err != nil {
			return nil, err
		}

		byOrg[orgID] = orgServer
	}

	// serversMtx is left at its zero value, which is a ready-to-use RWMutex.
	return &Servers{
		config:  config,
		store:   store,
		servers: byOrg,
	}, nil
}
// Start starts every per-organization server and then blocks, polling the
// store on each PollInterval tick until ctx is done.
//
// On every tick, each server's config is read from the store and applied to
// the running server. When the store yields no config, a default config built
// from the server's own settings is applied instead. Failures while polling
// are logged and do not stop the loop.
//
// It returns the first error from starting a server, or nil once ctx is done.
func (ss *Servers) Start(ctx context.Context) error {
	for _, server := range ss.servers {
		if err := server.Start(ctx); err != nil {
			return err
		}
	}

	poll := time.NewTicker(ss.config.PollInterval)
	defer poll.Stop()

	for {
		select {
		case <-poll.C:
			for _, server := range ss.servers {
				cfg, getErr := ss.store.GetConfig(ctx, server.orgID)
				if getErr != nil {
					// A not-found error is tolerated; anything else skips this server for this tick.
					if !errors.Ast(getErr, errors.TypeNotFound) {
						server.settings.Logger().ErrorContext(ctx, "failed to get config", "error", getErr, "orgID", server.orgID)
						continue
					}
				}

				if cfg == nil {
					cfg = alertmanagertypes.NewDefaultConfig(
						server.srvConfig.ResolveTimeout,
						server.srvConfig.SMTP.Hello,
						server.srvConfig.SMTP.From,
						server.srvConfig.SMTP.Host,
						server.srvConfig.SMTP.Port,
						server.srvConfig.SMTP.AuthUsername,
						server.srvConfig.SMTP.AuthPassword,
						server.srvConfig.SMTP.AuthSecret,
						server.srvConfig.SMTP.AuthIdentity,
						server.srvConfig.SMTP.RequireTLS,
						server.srvConfig.Route.GroupBy,
						server.srvConfig.Route.GroupInterval,
						server.srvConfig.Route.GroupWait,
						server.srvConfig.Route.RepeatInterval,
						server.orgID,
					)
				}

				setErr := server.SetConfig(ctx, cfg)
				if setErr != nil {
					server.settings.Logger().ErrorContext(ctx, "failed to set config in alertmanager", "error", setErr, "orgID", server.orgID)
				}
			}
		case <-ctx.Done():
			return nil
		}
	}
}
// Stop stops every per-organization server. All servers are asked to stop even
// if one of them fails; the first error encountered is returned, or nil when
// every server stopped cleanly.
func (ss *Servers) Stop(ctx context.Context) error {
	var firstErr error

	for _, server := range ss.servers {
		// Keep going on failure so that one bad server does not leave the others running.
		if err := server.Stop(ctx); err != nil && firstErr == nil {
			firstErr = err
		}
	}

	return firstErr
}
// GetAlerts forwards the query to the server registered for orgID. It returns
// a not-found error carrying ErrCodeAlertmanagerNotFound when no server is
// registered for that organization.
func (ss *Servers) GetAlerts(ctx context.Context, orgID string, params alertmanagertypes.GettableAlertsParams) (alertmanagertypes.GettableAlerts, error) {
	if orgServer, found := ss.servers[orgID]; found {
		return orgServer.GetAlerts(ctx, params)
	}

	return nil, errors.Newf(errors.TypeNotFound, ErrCodeAlertmanagerNotFound, "alertmanager not found for orgID %q", orgID)
}
// PutAlerts forwards the alerts to the server registered for orgID. It returns
// a not-found error carrying ErrCodeAlertmanagerNotFound when no server is
// registered for that organization.
func (ss *Servers) PutAlerts(ctx context.Context, orgID string, alerts alertmanagertypes.PostableAlerts) error {
	if orgServer, found := ss.servers[orgID]; found {
		return orgServer.PutAlerts(ctx, alerts)
	}

	return errors.Newf(errors.TypeNotFound, ErrCodeAlertmanagerNotFound, "alertmanager not found for orgID %q", orgID)
}
// SetConfig loads the stored config for orgID, merges postableConfig into it
// and writes the result back to the store. It only touches the store; the
// polling loop in Start is what applies stored configs to running servers.
func (ss *Servers) SetConfig(ctx context.Context, orgID string, postableConfig alertmanagertypes.PostableConfig) error {
	stored, err := ss.store.GetConfig(ctx, orgID)
	if err != nil {
		return err
	}

	if mergeErr := stored.MergeWithPostableConfig(postableConfig); mergeErr != nil {
		return mergeErr
	}

	saveErr := ss.store.SetConfig(ctx, orgID, stored)
	if saveErr != nil {
		return saveErr
	}

	return nil
}
// TestReceiver forwards the receiver test to the server registered for orgID.
// It returns a not-found error carrying ErrCodeAlertmanagerNotFound when no
// server is registered for that organization.
func (ss *Servers) TestReceiver(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver) error {
	if orgServer, found := ss.servers[orgID]; found {
		return orgServer.TestReceiver(ctx, receiver)
	}

	return errors.Newf(errors.TypeNotFound, ErrCodeAlertmanagerNotFound, "alertmanager not found for orgID %q", orgID)
}
// CreateChannel converts the channel into a receiver and submits it to
// SetConfig with the create action.
func (ss *Servers) CreateChannel(ctx context.Context, orgID string, channel *alertmanagertypes.Channel) error {
	receiver, err := alertmanagertypes.NewReceiverFromChannel(channel)
	if err != nil {
		return err
	}

	postable := alertmanagertypes.PostableConfig{
		Action:   alertmanagertypes.PostableConfigActionCreate,
		Receiver: receiver,
	}

	return ss.SetConfig(ctx, orgID, postable)
}
// GetChannel returns the channel with the given id for orgID, exactly as
// reported by the store.
func (ss *Servers) GetChannel(ctx context.Context, orgID string, id uint64) (*alertmanagertypes.Channel, error) {
return ss.store.GetChannel(ctx, orgID, id)
}
// DelChannel looks up the channel with the given id in the store, converts it
// into a receiver and submits it to SetConfig with the delete action.
func (ss *Servers) DelChannel(ctx context.Context, orgID string, id uint64) error {
	existing, err := ss.store.GetChannel(ctx, orgID, id)
	if err != nil {
		return err
	}

	receiver, err := alertmanagertypes.NewReceiverFromChannel(existing)
	if err != nil {
		return err
	}

	postable := alertmanagertypes.PostableConfig{
		Action:   alertmanagertypes.PostableConfigActionDelete,
		Receiver: receiver,
	}

	return ss.SetConfig(ctx, orgID, postable)
}
// ListChannels returns the channels for orgID, exactly as reported by the store.
func (ss *Servers) ListChannels(ctx context.Context, orgID string) (alertmanagertypes.Channels, error) {
return ss.store.ListChannels(ctx, orgID)
}
// UpdateChannel looks up the channel with the given id, replaces its data with
// the supplied string, refreshes its update timestamp, converts it into a
// receiver and submits it to SetConfig with the update action.
func (ss *Servers) UpdateChannel(ctx context.Context, orgID string, id uint64, data string) error {
	current, err := ss.store.GetChannel(ctx, orgID, id)
	if err != nil {
		return err
	}

	// The receiver is built from the channel after the new data has been applied.
	current.Data = data
	current.UpdatedAt = time.Now()

	receiver, err := alertmanagertypes.NewReceiverFromChannel(current)
	if err != nil {
		return err
	}

	postable := alertmanagertypes.PostableConfig{
		Action:   alertmanagertypes.PostableConfigActionUpdate,
		Receiver: receiver,
	}

	return ss.SetConfig(ctx, orgID, postable)
}

View File

@@ -0,0 +1,10 @@
package factorytest
import (
"go.signoz.io/signoz/pkg/factory"
"go.signoz.io/signoz/pkg/instrumentation/instrumentationtest"
)
// NewSettings returns factory settings derived from a fresh test
// instrumentation instance.
func NewSettings() factory.Settings {
	testInstrumentation := instrumentationtest.New()

	return testInstrumentation.ToFactorySettings()
}

View File

@@ -2,9 +2,12 @@ package factory
import (
"fmt"
"log/slog"
"regexp"
)
var _ slog.LogValuer = Name{}
var (
// nameRegex is a regex that matches a valid name.
// It must start with a letter, and can only contain letters, numbers, underscores or hyphens.
@@ -15,6 +18,10 @@ type Name struct {
name string
}
// LogValue returns the name as a slog string value, so that a Name is logged
// as its underlying string.
func (n Name) LogValue() slog.Value {
return slog.StringValue(n.name)
}
// String returns the underlying name string.
func (n Name) String() string {
return n.name
}

View File

@@ -1,6 +1,8 @@
package factory
import "context"
import (
"context"
)
type Service interface {
// Starts a service. The service should return an error if it cannot be started.
@@ -8,3 +10,24 @@ type Service interface {
// Stops a service.
Stop(context.Context) error
}
// NamedService is a Service that also satisfies Named.
type NamedService interface {
Named
Service
}
// namedService pairs a Service with a Name. The embedded Service supplies the
// Service methods.
type namedService struct {
// name is the value returned by Name.
name Name
Service
}
// Name returns the name this service was created with.
func (s *namedService) Name() Name {
return s.name
}
// NewNamedService wraps service so that it also reports the given name.
func NewNamedService(name Name, service Service) NamedService {
	named := new(namedService)
	named.name = name
	named.Service = service

	return named
}

View File

@@ -8,6 +8,56 @@ import (
sdktrace "go.opentelemetry.io/otel/trace"
)
// Settings bundles the logger, meter provider, tracer provider and prometheus
// registerer from which NewNamespacedSettings derives package-scoped values.
type Settings struct {
// Logger is the logger.
Logger *slog.Logger
// MeterProvider is the meter provider.
MeterProvider sdkmetric.MeterProvider
// TracerProvider is the tracer provider.
TracerProvider sdktrace.TracerProvider
// PrometheusRegisterer is the prometheus registerer.
PrometheusRegisterer prometheus.Registerer
}
// NamespacedSettings exposes a logger, meter, tracer and prometheus registerer
// through accessor methods.
type NamespacedSettings interface {
// Logger returns the logger.
Logger() *slog.Logger
// Meter returns the meter.
Meter() sdkmetric.Meter
// Tracer returns the tracer.
Tracer() sdktrace.Tracer
// PrometheusRegisterer returns the prometheus registerer.
PrometheusRegisterer() prometheus.Registerer
}
// namespacedSettings is the NamespacedSettings implementation returned by
// NewNamespacedSettings.
type namespacedSettings struct {
logger *slog.Logger
meter sdkmetric.Meter
tracer sdktrace.Tracer
prometheusRegisterer prometheus.Registerer
}
// NewNamespacedSettings derives package-scoped settings from settings: the
// logger carries a "pkg" attribute set to pkgName, the meter and tracer are
// requested from their providers under pkgName, and the prometheus registerer
// is wrapped so that it carries a "pkg" label set to pkgName.
func NewNamespacedSettings(settings Settings, pkgName string) NamespacedSettings {
	scoped := new(namespacedSettings)

	scoped.logger = settings.Logger.With("pkg", pkgName)
	scoped.meter = settings.MeterProvider.Meter(pkgName)
	scoped.tracer = settings.TracerProvider.Tracer(pkgName)

	pkgLabels := prometheus.Labels{"pkg": pkgName}
	scoped.prometheusRegisterer = prometheus.WrapRegistererWith(pkgLabels, settings.PrometheusRegisterer)

	return scoped
}
// Logger returns the logger held by these settings.
func (s *namespacedSettings) Logger() *slog.Logger {
return s.logger
}
// Meter returns the meter held by these settings.
func (s *namespacedSettings) Meter() sdkmetric.Meter {
return s.meter
}
// Tracer returns the tracer held by these settings.
func (s *namespacedSettings) Tracer() sdktrace.Tracer {
return s.tracer
}
// PrometheusRegisterer returns the prometheus registerer held by these settings.
func (s *namespacedSettings) PrometheusRegisterer() prometheus.Registerer {
return s.prometheusRegisterer
}
type ProviderSettings struct {
// SlogLogger is the slog logger.
Logger *slog.Logger

View File

@@ -8,8 +8,16 @@ import (
sdkresource "go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/trace"
"go.signoz.io/signoz/pkg/factory"
"go.uber.org/zap/zapcore"
)
// zapLogLevelToSlogLevel maps zap log levels to the corresponding slog levels.
// NOTE(review): only Debug, Info, Warn and Error are mapped. A plain index
// lookup for any other zapcore.Level yields the zero slog.Level, which is
// slog.LevelInfo — confirm callers check for presence.
var zapLogLevelToSlogLevel = map[zapcore.Level]slog.Level{
zapcore.DebugLevel: slog.LevelDebug,
zapcore.InfoLevel: slog.LevelInfo,
zapcore.WarnLevel: slog.LevelWarn,
zapcore.ErrorLevel: slog.LevelError,
}
// Instrumentation provides the core components for application instrumentation.
type Instrumentation interface {
// Logger returns the Slog logger.
@@ -22,6 +30,8 @@ type Instrumentation interface {
PrometheusRegisterer() prometheus.Registerer
// ToProviderSettings converts instrumentation to provider settings.
ToProviderSettings() factory.ProviderSettings
// ToFactorySettings converts instrumentation to factory settings.
ToFactorySettings() factory.Settings
}
// Merges the input attributes with the resource attributes.

View File

@@ -51,3 +51,12 @@ func (i *noopInstrumentation) ToProviderSettings() factory.ProviderSettings {
PrometheusRegisterer: i.PrometheusRegisterer(),
}
}
// ToFactorySettings builds factory settings from this instrumentation's
// logger, meter provider, tracer provider and prometheus registerer.
func (i *noopInstrumentation) ToFactorySettings() factory.Settings {
	var settings factory.Settings

	settings.Logger = i.Logger()
	settings.MeterProvider = i.MeterProvider()
	settings.TracerProvider = i.TracerProvider()
	settings.PrometheusRegisterer = i.PrometheusRegisterer()

	return settings
}

View File

@@ -131,3 +131,12 @@ func (i *SDK) ToProviderSettings() factory.ProviderSettings {
PrometheusRegisterer: i.PrometheusRegisterer(),
}
}
// ToFactorySettings builds factory settings from this SDK's logger, meter
// provider, tracer provider and prometheus registerer.
func (i *SDK) ToFactorySettings() factory.Settings {
	var settings factory.Settings

	settings.Logger = i.Logger()
	settings.MeterProvider = i.MeterProvider()
	settings.TracerProvider = i.TracerProvider()
	settings.PrometheusRegisterer = i.PrometheusRegisterer()

	return settings
}

View File

@@ -24,6 +24,7 @@ import (
_ "github.com/mattn/go-sqlite3"
"github.com/prometheus/prometheus/promql"
"go.signoz.io/signoz/pkg/http/render"
"go.signoz.io/signoz/pkg/query-service/agentConf"
"go.signoz.io/signoz/pkg/query-service/app/cloudintegrations"
"go.signoz.io/signoz/pkg/query-service/app/dashboards"
@@ -49,6 +50,8 @@ import (
"go.signoz.io/signoz/pkg/query-service/contextlinks"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/postprocess"
"go.signoz.io/signoz/pkg/signoz"
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
"go.uber.org/zap"
@@ -126,6 +129,7 @@ type APIHandler struct {
jobsRepo *inframetrics.JobsRepo
pvcsRepo *inframetrics.PvcsRepo
SigNoz *signoz.SigNoz
}
type APIHandlerOpts struct {
@@ -165,6 +169,8 @@ type APIHandlerOpts struct {
UseLogsNewSchema bool
UseTraceNewSchema bool
SigNoz *signoz.SigNoz
}
// NewAPIHandler returns an APIHandler
@@ -237,6 +243,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
statefulsetsRepo: statefulsetsRepo,
jobsRepo: jobsRepo,
pvcsRepo: pvcsRepo,
SigNoz: opts.SigNoz,
}
logsQueryBuilder := logsv3.PrepareLogsQuery
@@ -1310,36 +1317,74 @@ func (aH *APIHandler) editRule(w http.ResponseWriter, r *http.Request) {
}
func (aH *APIHandler) getChannel(w http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["id"]
channel, apiErrorObj := aH.ruleManager.RuleDB().GetChannel(id)
if apiErrorObj != nil {
RespondError(w, apiErrorObj, nil)
idVar := mux.Vars(r)["id"]
id, err := strconv.ParseUint(idVar, 10, 64)
if err != nil {
render.Error(w, err)
return
}
orgId, err := auth.GetOrgIdFromJwt(r.Context())
if err != nil {
render.Error(w, err)
return
}
channel, err := aH.SigNoz.AlertmanagerClient.GetChannel(r.Context(), orgId, id)
if err != nil {
render.Error(w, err)
return
}
aH.Respond(w, channel)
}
func (aH *APIHandler) deleteChannel(w http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["id"]
apiErrorObj := aH.ruleManager.RuleDB().DeleteChannel(id)
if apiErrorObj != nil {
RespondError(w, apiErrorObj, nil)
idVar := mux.Vars(r)["id"]
id, err := strconv.ParseUint(idVar, 10, 64)
if err != nil {
render.Error(w, err)
return
}
orgId, err := auth.GetOrgIdFromJwt(r.Context())
if err != nil {
render.Error(w, err)
return
}
err = aH.SigNoz.AlertmanagerClient.DelChannel(r.Context(), orgId, id)
if err != nil {
render.Error(w, err)
return
}
aH.Respond(w, "notification channel successfully deleted")
}
func (aH *APIHandler) listChannels(w http.ResponseWriter, r *http.Request) {
channels, apiErrorObj := aH.ruleManager.RuleDB().GetChannels()
if apiErrorObj != nil {
RespondError(w, apiErrorObj, nil)
orgId, err := auth.GetOrgIdFromJwt(r.Context())
if err != nil {
render.Error(w, err)
return
}
channels, err := aH.SigNoz.AlertmanagerClient.ListChannels(r.Context(), orgId)
if err != nil {
render.Error(w, err)
return
}
aH.Respond(w, channels)
}
// testChannels sends test alert to all registered channels
func (aH *APIHandler) testChannel(w http.ResponseWriter, r *http.Request) {
orgId, err := auth.GetOrgIdFromJwt(r.Context())
if err != nil {
render.Error(w, err)
return
}
defer r.Body.Close()
body, err := io.ReadAll(r.Body)
@@ -1349,24 +1394,34 @@ func (aH *APIHandler) testChannel(w http.ResponseWriter, r *http.Request) {
return
}
receiver := &am.Receiver{}
if err := json.Unmarshal(body, receiver); err != nil { // Parse []byte to go struct pointer
zap.L().Error("Error in parsing req body of testChannel API\n", zap.Error(err))
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
receiver, err := alertmanagertypes.NewReceiverFromString(string(body))
if err != nil {
render.Error(w, err)
return
}
// send alert
apiErrorObj := aH.alertManager.TestReceiver(receiver)
if apiErrorObj != nil {
RespondError(w, apiErrorObj, nil)
err = aH.SigNoz.AlertmanagerClient.TestReceiver(r.Context(), orgId, receiver)
if err != nil {
render.Error(w, err)
return
}
aH.Respond(w, "test alert sent")
}
func (aH *APIHandler) editChannel(w http.ResponseWriter, r *http.Request) {
idVar := mux.Vars(r)["id"]
id, err := strconv.ParseUint(idVar, 10, 64)
if err != nil {
render.Error(w, err)
return
}
id := mux.Vars(r)["id"]
orgId, err := auth.GetOrgIdFromJwt(r.Context())
if err != nil {
render.Error(w, err)
return
}
defer r.Body.Close()
body, err := io.ReadAll(r.Body)
@@ -1376,17 +1431,9 @@ func (aH *APIHandler) editChannel(w http.ResponseWriter, r *http.Request) {
return
}
receiver := &am.Receiver{}
if err := json.Unmarshal(body, receiver); err != nil { // Parse []byte to go struct pointer
zap.L().Error("Error in parsing req body of editChannel API", zap.Error(err))
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}
_, apiErrorObj := aH.ruleManager.RuleDB().EditChannel(receiver, id)
if apiErrorObj != nil {
RespondError(w, apiErrorObj, nil)
err = aH.SigNoz.AlertmanagerClient.UpdateChannel(r.Context(), orgId, id, string(body))
if err != nil {
render.Error(w, err)
return
}
@@ -1395,6 +1442,11 @@ func (aH *APIHandler) editChannel(w http.ResponseWriter, r *http.Request) {
}
func (aH *APIHandler) createChannel(w http.ResponseWriter, r *http.Request) {
orgId, err := auth.GetOrgIdFromJwt(r.Context())
if err != nil {
render.Error(w, err)
return
}
defer r.Body.Close()
body, err := io.ReadAll(r.Body)
@@ -1404,41 +1456,41 @@ func (aH *APIHandler) createChannel(w http.ResponseWriter, r *http.Request) {
return
}
receiver := &am.Receiver{}
if err := json.Unmarshal(body, receiver); err != nil { // Parse []byte to go struct pointer
zap.L().Error("Error in parsing req body of createChannel API", zap.Error(err))
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
channel, err := alertmanagertypes.NewChannelFromReceiverString(string(body), orgId)
if err != nil {
render.Error(w, err)
return
}
_, apiErrorObj := aH.ruleManager.RuleDB().CreateChannel(receiver)
if apiErrorObj != nil {
RespondError(w, apiErrorObj, nil)
err = aH.SigNoz.AlertmanagerClient.CreateChannel(r.Context(), orgId, channel)
if err != nil {
render.Error(w, err)
return
}
aH.Respond(w, nil)
}
func (aH *APIHandler) getAlerts(w http.ResponseWriter, r *http.Request) {
params := r.URL.Query()
amEndpoint := constants.GetAlertManagerApiPrefix()
resp, err := http.Get(amEndpoint + "v1/alerts" + "?" + params.Encode())
orgId, err := auth.GetOrgIdFromJwt(r.Context())
if err != nil {
render.Error(w, err)
return
}
params, err := alertmanagertypes.NewGettableAlertsParams(r)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}
alerts, err := aH.SigNoz.AlertmanagerClient.GetAlerts(r.Context(), orgId, params)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
aH.Respond(w, string(body))
aH.Respond(w, alerts)
}
func (aH *APIHandler) createRule(w http.ResponseWriter, r *http.Request) {

View File

@@ -200,6 +200,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
FluxInterval: fluxInterval,
UseLogsNewSchema: serverOptions.UseLogsNewSchema,
UseTraceNewSchema: serverOptions.UseTraceNewSchema,
SigNoz: serverOptions.SigNoz,
})
if err != nil {
return nil, err

View File

@@ -132,3 +132,17 @@ func GetEmailFromJwt(ctx context.Context) (string, error) {
return claims["email"].(string), nil
}
// GetOrgIdFromJwt extracts the JWT from ctx, parses it and returns the value of
// its "orgId" claim.
//
// It returns an internal error when the JWT cannot be extracted from the
// context, when it cannot be parsed, or when the "orgId" claim is missing or
// is not a string.
func GetOrgIdFromJwt(ctx context.Context) (string, error) {
	jwt, ok := ExtractJwtFromContext(ctx)
	if !ok {
		return "", model.InternalError(fmt.Errorf("failed to extract jwt from context"))
	}

	claims, err := ParseJWT(jwt)
	if err != nil {
		return "", model.InternalError(fmt.Errorf("failed get claims from jwt %v", err))
	}

	// Use the checked form of the type assertion: the claim comes from the
	// token, and an unchecked assertion would panic on a missing or non-string value.
	orgID, ok := claims["orgId"].(string)
	if !ok {
		return "", model.InternalError(fmt.Errorf("orgId claim is missing or is not a string"))
	}

	return orgID, nil
}

View File

@@ -4,23 +4,23 @@ import (
"context"
"errors"
"fmt"
"log/slog"
"os"
"os/signal"
"syscall"
"go.signoz.io/signoz/pkg/factory"
"go.uber.org/zap"
)
type Registry struct {
services []factory.Service
logger *zap.Logger
services factory.NamedMap[factory.NamedService]
logger *slog.Logger
startCh chan error
stopCh chan error
}
// New creates a new registry of services. It needs at least one service in the input.
func New(logger *zap.Logger, services ...factory.Service) (*Registry, error) {
func New(logger *slog.Logger, services ...factory.NamedService) (*Registry, error) {
if logger == nil {
return nil, fmt.Errorf("cannot build registry, logger is required")
}
@@ -29,19 +29,25 @@ func New(logger *zap.Logger, services ...factory.Service) (*Registry, error) {
return nil, fmt.Errorf("cannot build registry, at least one service is required")
}
m := factory.MustNewNamedMap(services...)
return &Registry{
logger: logger.Named("go.signoz.io/pkg/registry"),
services: services,
logger: logger,
services: m,
startCh: make(chan error, 1),
stopCh: make(chan error, len(services)),
}, nil
}
func (r *Registry) Start(ctx context.Context) error {
for _, s := range r.services {
go func(s factory.Service) {
for _, s := range r.services.GetInOrder() {
go func(s factory.NamedService) {
r.logger.InfoContext(ctx, "starting service", "service", s.Name())
err := s.Start(ctx)
r.startCh <- err
if err != nil {
r.logger.ErrorContext(ctx, "failed to start service", "service", s.Name(), "error", err)
r.startCh <- err
}
}(s)
}
@@ -54,11 +60,11 @@ func (r *Registry) Wait(ctx context.Context) error {
select {
case <-ctx.Done():
r.logger.Info("caught context error, exiting", zap.Any("context", ctx))
r.logger.InfoContext(ctx, "caught context error, exiting", "error", ctx.Err())
case s := <-interrupt:
r.logger.Info("caught interrupt signal, exiting", zap.Any("context", ctx), zap.Any("signal", s))
r.logger.InfoContext(ctx, "caught interrupt signal, exiting", "signal", s)
case err := <-r.startCh:
r.logger.Info("caught service error, exiting", zap.Any("context", ctx), zap.Error(err))
r.logger.ErrorContext(ctx, "caught service error, exiting", "error", err)
return err
}
@@ -66,15 +72,16 @@ func (r *Registry) Wait(ctx context.Context) error {
}
func (r *Registry) Stop(ctx context.Context) error {
for _, s := range r.services {
go func(s factory.Service) {
for _, s := range r.services.GetInOrder() {
go func(s factory.NamedService) {
r.logger.InfoContext(ctx, "stopping service", "service", s.Name())
err := s.Stop(ctx)
r.stopCh <- err
}(s)
}
errs := make([]error, len(r.services))
for i := 0; i < len(r.services); i++ {
errs := make([]error, len(r.services.GetInOrder()))
for i := 0; i < len(r.services.GetInOrder()); i++ {
err := <-r.stopCh
if err != nil {
errs = append(errs, err)

View File

@@ -2,12 +2,13 @@ package registry
import (
"context"
"io"
"log/slog"
"sync"
"testing"
"github.com/stretchr/testify/require"
"go.signoz.io/signoz/pkg/factory/servicetest"
"go.uber.org/zap"
)
func TestRegistryWith2HttpServers(t *testing.T) {
@@ -17,7 +18,7 @@ func TestRegistryWith2HttpServers(t *testing.T) {
http2, err := servicetest.NewHttpService("http2")
require.NoError(t, err)
registry, err := New(zap.NewNop(), http1, http2)
registry, err := New(slog.New(slog.NewTextHandler(io.Discard, nil)), http1, http2)
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
@@ -41,7 +42,7 @@ func TestRegistryWith2HttpServersWithoutWait(t *testing.T) {
http2, err := servicetest.NewHttpService("http2")
require.NoError(t, err)
registry, err := New(zap.NewNop(), http1, http2)
registry, err := New(slog.New(slog.NewTextHandler(io.Discard, nil)), http1, http2)
require.NoError(t, err)
ctx := context.Background()

View File

@@ -7,6 +7,7 @@ import (
"reflect"
"time"
"go.signoz.io/signoz/pkg/alertmanager"
"go.signoz.io/signoz/pkg/apiserver"
"go.signoz.io/signoz/pkg/cache"
"go.signoz.io/signoz/pkg/config"
@@ -44,6 +45,9 @@ type Config struct {
// TelemetryStore config
TelemetryStore telemetrystore.Config `mapstructure:"telemetrystore"`
// Alertmanager config
Alertmanager alertmanager.Config `mapstructure:"alertmanager"`
}
// DeprecatedFlags are the flags that are deprecated and scheduled for removal.
@@ -63,6 +67,7 @@ func NewConfig(ctx context.Context, resolverConfig config.ResolverConfig, deprec
sqlmigrator.NewConfigFactory(),
apiserver.NewConfigFactory(),
telemetrystore.NewConfigFactory(),
alertmanager.NewConfigFactory(),
}
conf, err := config.New(ctx, resolverConfig, configFactories)

View File

@@ -59,6 +59,7 @@ func NewProviderConfig() ProviderConfig {
sqlmigration.NewAddIntegrationsFactory(),
sqlmigration.NewAddLicensesFactory(),
sqlmigration.NewAddPatsFactory(),
sqlmigration.NewAddAlertmanagerConfigurationFactory(),
sqlmigration.NewModifyDatetimeFactory(),
),
TelemetryStoreProviderFactories: factory.MustNewNamedMap(

View File

@@ -3,9 +3,12 @@ package signoz
import (
"context"
"go.signoz.io/signoz/pkg/alertmanager"
"go.signoz.io/signoz/pkg/alertmanager/alertmanagerstore/sqlalertmanagerstore"
"go.signoz.io/signoz/pkg/cache"
"go.signoz.io/signoz/pkg/factory"
"go.signoz.io/signoz/pkg/instrumentation"
"go.signoz.io/signoz/pkg/registry"
"go.signoz.io/signoz/pkg/sqlmigration"
"go.signoz.io/signoz/pkg/sqlmigrator"
"go.signoz.io/signoz/pkg/sqlstore"
@@ -16,10 +19,12 @@ import (
)
type SigNoz struct {
Cache cache.Cache
Web web.Web
SQLStore sqlstore.SQLStore
TelemetryStore telemetrystore.TelemetryStore
*registry.Registry
Cache cache.Cache
Web web.Web
SQLStore sqlstore.SQLStore
TelemetryStore telemetrystore.TelemetryStore
AlertmanagerClient alertmanager.Client
}
func New(
@@ -38,6 +43,9 @@ func New(
// Get the provider settings from instrumentation
providerSettings := instrumentation.ToProviderSettings()
// Get the factory settings from instrumentation
factorySettings := instrumentation.ToFactorySettings()
// Initialize cache from the available cache provider factories
cache, err := factory.NewProviderFromNamedMap(
ctx,
@@ -102,10 +110,29 @@ func New(
return nil, err
}
alertmanagerstore, err := sqlalertmanagerstore.
NewFactory(sqlstore).
New(ctx, providerSettings, config.Alertmanager.Store)
if err != nil {
return nil, err
}
alertmanagerService, err := alertmanager.New(ctx, factorySettings, config.Alertmanager, alertmanagerstore)
if err != nil {
return nil, err
}
registry, err := registry.New(instrumentation.Logger(), factory.NewNamedService(factory.MustNewName("alertmanager"), alertmanagerService))
if err != nil {
return nil, err
}
return &SigNoz{
Cache: cache,
Web: web,
SQLStore: sqlstore,
TelemetryStore: telemetrystore,
Registry: registry,
Cache: cache,
Web: web,
SQLStore: sqlstore,
TelemetryStore: telemetrystore,
AlertmanagerClient: alertmanagerService,
}, nil
}

View File

@@ -0,0 +1,62 @@
package sqlmigration
import (
"context"
"time"
"github.com/uptrace/bun"
"github.com/uptrace/bun/migrate"
"go.signoz.io/signoz/pkg/factory"
)
// addAlertmanagerConfiguration is the migration that creates the
// alertmanager_config table and adds org_id to notification_channels.
type addAlertmanagerConfiguration struct{}
// NewAddAlertmanagerConfigurationFactory returns the provider factory for the
// migration, registered under the name "add_alertmanager_configuration".
func NewAddAlertmanagerConfigurationFactory() factory.ProviderFactory[SQLMigration, Config] {
	name := factory.MustNewName("add_alertmanager_configuration")

	return factory.NewProviderFactory(name, newAddAlertmanagerConfiguration)
}
// newAddAlertmanagerConfiguration constructs the migration. The context,
// provider settings and config arguments are ignored and the returned error
// is always nil.
func newAddAlertmanagerConfiguration(_ context.Context, _ factory.ProviderSettings, _ Config) (SQLMigration, error) {
return &addAlertmanagerConfiguration{}, nil
}
// Register adds this migration's Up and Down functions to migrations and
// returns any registration error.
func (migration *addAlertmanagerConfiguration) Register(migrations *migrate.Migrations) error {
	up, down := migration.Up, migration.Down

	err := migrations.Register(up, down)
	if err != nil {
		return err
	}

	return nil
}
// Up applies the migration in two steps:
//  1. create the alertmanager_config table if it does not already exist, with
//     a unique org_id that references organizations(id) and cascades on delete;
//  2. add an org_id TEXT column to the notification_channels table.
//
// It returns the first error from either statement.
func (migration *addAlertmanagerConfiguration) Up(ctx context.Context, db *bun.DB) error {
// Step 1: the table schema comes from the anonymous model below.
if _, err := db.
NewCreateTable().
Model(&struct {
bun.BaseModel `bun:"table:alertmanager_config"`
ID uint64 `bun:"id"`
Config string `bun:"config"`
SilencesState string `bun:"silences_state"`
NFLogState string `bun:"nflog_state"`
CreatedAt time.Time `bun:"created_at"`
UpdatedAt time.Time `bun:"updated_at"`
OrgID string `bun:"org_id,unique"`
}{}).
ForeignKey(`("org_id") REFERENCES "organizations" ("id") ON DELETE CASCADE`).
IfNotExists().
Exec(ctx); err != nil {
return err
}
// Step 2: unlike the table creation above, this statement carries no
// existence guard.
if _, err := db.
NewAddColumn().
Table("notification_channels").
ColumnExpr("org_id TEXT").
Exec(ctx); err != nil {
return err
}
return nil
}
// Down is a no-op: it does not revert the changes made by Up and always
// returns nil.
func (migration *addAlertmanagerConfiguration) Down(ctx context.Context, db *bun.DB) error {
return nil
}

View File

@@ -3,6 +3,7 @@ package sqlitesqlstore
import (
"context"
"database/sql"
"log/slog"
"github.com/jmoiron/sqlx"
_ "github.com/mattn/go-sqlite3"
@@ -10,6 +11,7 @@ import (
"github.com/uptrace/bun/dialect/sqlitedialect"
"go.signoz.io/signoz/pkg/factory"
"go.signoz.io/signoz/pkg/sqlstore"
"go.signoz.io/signoz/pkg/sqlstore/sqlstorehook"
)
type provider struct {
@@ -33,10 +35,13 @@ func New(ctx context.Context, providerSettings factory.ProviderSettings, config
settings.Logger().InfoContext(ctx, "connected to sqlite", "path", config.Sqlite.Path)
sqldb.SetMaxOpenConns(config.Connection.MaxOpenConns)
bundb := bun.NewDB(sqldb, sqlitedialect.New())
bundb.AddQueryHook(sqlstorehook.NewDebug(settings.Logger(), slog.LevelDebug))
return &provider{
settings: settings,
sqldb: sqldb,
bundb: bun.NewDB(sqldb, sqlitedialect.New()),
bundb: bundb,
sqlxdb: sqlx.NewDb(sqldb, "sqlite3"),
}, nil
}

View File

@@ -0,0 +1,30 @@
package sqlstorehook
import (
"context"
"log/slog"
"time"
"github.com/uptrace/bun"
)
// debug is a bun query hook that logs every finished query through logger at
// the configured level.
type debug struct {
// NOTE(review): the embedded interface is never assigned by NewDebug; both of
// its methods are defined on debug itself — confirm the embedding is needed.
bun.QueryHook
// logger receives one record per finished query.
logger *slog.Logger
// level is the level at which query records are logged.
level slog.Level
}
// NewDebug returns a bun query hook that logs each finished query to logger at
// the given level.
func NewDebug(logger *slog.Logger, level slog.Level) bun.QueryHook {
	hook := new(debug)
	hook.logger = logger
	hook.level = level

	return hook
}
// BeforeQuery does nothing and returns ctx unchanged.
func (debug) BeforeQuery(ctx context.Context, event *bun.QueryEvent) context.Context {
return ctx
}
// AfterQuery logs the finished query at the hook's level, recording the
// operation, the query text and the time elapsed since the query started.
func (hook debug) AfterQuery(ctx context.Context, event *bun.QueryEvent) {
	// This runs after every query. Skip building the log attributes (operation
	// lookup, duration formatting) when the record would be discarded anyway.
	if !hook.logger.Enabled(ctx, hook.level) {
		return
	}

	hook.logger.Log(ctx, hook.level, "::SQLSTORE-QUERY::", "db.query.operation", event.Operation(), "db.query.text", event.Query, "db.duration", time.Since(event.StartTime).String())
}

BIN
queries.active Normal file

Binary file not shown.