Compare commits

...

49 Commits

Author SHA1 Message Date
grandwizard28
ed165f0609 fix(alertmanager): fix ReceiverNamesFromRuleID logic 2025-03-07 13:04:30 +05:30
grandwizard28
42246eb898 fix(legacy): fix TestAlert 2025-03-07 12:40:21 +05:30
grandwizard28
09a6deac45 fix(provider): fix provider test 2025-03-07 01:26:04 +05:30
grandwizard28
6ca7fa82ba fix(ruler): panic on patch 2025-03-07 00:45:52 +05:30
grandwizard28
99ced1177e feat(alertmanager): consistency 2025-03-06 23:52:14 +05:30
grandwizard28
6107dc8423 feat(alertmanager): consistency 2025-03-06 23:49:24 +05:30
grandwizard28
302b73b8fe feat(alertmanager): consistency 2025-03-06 23:48:53 +05:30
grandwizard28
9766ed21c4 feat(alertmanager): remove compareAndSelect 2025-03-06 23:37:16 +05:30
grandwizard28
763b4fffe3 feat(alertmanager): add configuration 2025-03-06 23:32:30 +05:30
grandwizard28
c6e5434162 feat(alertmanager): add default poll interval to 1 minute 2025-03-06 20:24:13 +05:30
grandwizard28
ae66d0ad88 fix(service): use a read lock when performing operations with alertmanager 2025-03-06 20:20:16 +05:30
grandwizard28
ca395f8dc8 fix(test): send only the first test alert 2025-03-06 19:53:04 +05:30
grandwizard28
a643097015 fix(organization): annoying error 2025-03-06 18:54:43 +05:30
grandwizard28
fc0352867a fix(di): fix how we are doing DI 2025-03-06 18:51:23 +05:30
grandwizard28
c5f7972946 ci(main): merge with main 2025-03-06 17:59:22 +05:30
grandwizard28
98eee6f7fd fix(matcher): fix matcher logic on multiple ruleIds 2025-03-06 15:26:02 +05:30
grandwizard28
65ee87beb9 fix(matcher): fix matcher logic on multiple ruleIds 2025-03-06 15:25:00 +05:30
grandwizard28
bd3e0eeb6c fix(ruler): fix lastinsertedid concundrum 2025-03-06 02:50:09 +05:30
grandwizard28
9d6e09d3f6 fix(legacyalertmanager): create the complete config in the migration 2025-03-06 02:01:11 +05:30
grandwizard28
cccc25e2ee fix(legacyalertmanager): pick the first organization 2025-03-06 01:46:40 +05:30
grandwizard28
9580da0478 fix(migration): make it idempotent 2025-03-06 01:20:56 +05:30
grandwizard28
8534b798c7 fix(migration): make it idempotent 2025-03-06 01:10:19 +05:30
grandwizard28
3bedd10ef9 fix(migration): add an alertmanager config by default 2025-03-06 00:49:35 +05:30
grandwizard28
b6f2ff052d fix(ruler): fix db query 2025-03-05 23:04:19 +05:30
grandwizard28
d02dc8687c refactor(ruler): add BaseModel 2025-03-05 22:48:25 +05:30
grandwizard28
04835d14e1 refactor(alertmanager): cleanup 2025-03-05 21:14:34 +05:30
grandwizard28
5bd60d3f03 feat(ruler): cleanup 2025-03-05 20:13:34 +05:30
grandwizard28
5168e0de2b Merge branch 'main' of github.com:SigNoz/signoz into integration-ruler 2025-03-05 18:51:50 +05:30
grandwizard28
f8a6c1940d Merge branch 'tx-support-sqlstore' into integration-ruler 2025-03-05 18:51:19 +05:30
grandwizard28
81945dd73b feat(alertmanager): do everything transactionally 2025-03-05 18:30:40 +05:30
grandwizard28
6bfae83779 feat(alertmanager): do everything transactionally 2025-03-05 18:29:19 +05:30
grandwizard28
116b1ac607 Merge branch 'tx-support-sqlstore' into integration-ruler 2025-03-05 17:41:26 +05:30
grandwizard28
5915866ef6 feat(alertmanager): do everything transactionally 2025-03-05 17:41:09 +05:30
grandwizard28
6362cfb482 refactor(sqlstore): add documentation 2025-03-05 16:01:15 +05:30
grandwizard28
a6be05b047 feat(sqlstore): add transaction support for sqlstore 2025-03-05 16:00:39 +05:30
grandwizard28
b6be22c687 ci(main): merge with main 2025-03-05 15:15:49 +05:30
grandwizard28
0abb5fca93 refactor(alertmanager): simplify 2025-03-05 02:21:52 +05:30
grandwizard28
ad58342319 Merge branch 'alertmanager-testing' into integration-ruler 2025-03-05 02:21:29 +05:30
grandwizard28
4abd67c67e refactor(alertmanager): simplify 2025-03-05 02:19:59 +05:30
grandwizard28
7de1fbe13f Merge branch 'alertmanager-testing' into integration-ruler 2025-03-05 00:59:47 +05:30
grandwizard28
5606f73074 refactor(alertmanager): simplify 2025-03-05 00:59:43 +05:30
grandwizard28
076b33dcba refactor(alertmanager): simplify 2025-03-05 00:55:55 +05:30
grandwizard28
e770c3054c feat(ruler): integrate with ruler 2025-03-05 00:27:07 +05:30
grandwizard28
4df43463da feat(alertmanager): add remaining scaffold 2025-03-05 00:26:22 +05:30
grandwizard28
2b4340a3e1 feat(legacyalertmanager): fix get alerts 2025-02-20 19:29:27 +05:30
grandwizard28
309ac976cd feat(legacyalertmanager): integrate 2025-02-20 18:54:07 +05:30
grandwizard28
5ab0d77087 feat(alertmanager): add tests for config 2025-02-20 17:09:47 +05:30
grandwizard28
307afa02ff feat(sqlmigration): add org_id foreign key in notification_channels 2025-02-20 16:33:31 +05:30
grandwizard28
b9e29424ce feat(legacyalertmanager): add functions for channels 2025-02-20 15:52:16 +05:30
40 changed files with 976 additions and 1499 deletions

View File

@@ -11,6 +11,7 @@ import (
"go.signoz.io/signoz/ee/query-service/interfaces"
"go.signoz.io/signoz/ee/query-service/license"
"go.signoz.io/signoz/ee/query-service/usage"
"go.signoz.io/signoz/pkg/alertmanager"
baseapp "go.signoz.io/signoz/pkg/query-service/app"
"go.signoz.io/signoz/pkg/query-service/app/cloudintegrations"
"go.signoz.io/signoz/pkg/query-service/app/integrations"
@@ -20,6 +21,7 @@ import (
basemodel "go.signoz.io/signoz/pkg/query-service/model"
rules "go.signoz.io/signoz/pkg/query-service/rules"
"go.signoz.io/signoz/pkg/query-service/version"
"go.signoz.io/signoz/pkg/signoz"
"go.signoz.io/signoz/pkg/types/authtypes"
)
@@ -51,7 +53,7 @@ type APIHandler struct {
}
// NewAPIHandler returns an APIHandler
func NewAPIHandler(opts APIHandlerOptions) (*APIHandler, error) {
func NewAPIHandler(opts APIHandlerOptions, signoz *signoz.SigNoz) (*APIHandler, error) {
baseHandler, err := baseapp.NewAPIHandler(baseapp.APIHandlerOpts{
Reader: opts.DataConnector,
@@ -67,6 +69,7 @@ func NewAPIHandler(opts APIHandlerOptions) (*APIHandler, error) {
FluxInterval: opts.FluxInterval,
UseLogsNewSchema: opts.UseLogsNewSchema,
UseTraceNewSchema: opts.UseTraceNewSchema,
AlertmanagerAPI: alertmanager.NewAPI(signoz.Alertmanager),
})
if err != nil {

View File

@@ -23,8 +23,10 @@ import (
"go.signoz.io/signoz/ee/query-service/integrations/gateway"
"go.signoz.io/signoz/ee/query-service/interfaces"
"go.signoz.io/signoz/ee/query-service/rules"
"go.signoz.io/signoz/pkg/alertmanager"
"go.signoz.io/signoz/pkg/http/middleware"
"go.signoz.io/signoz/pkg/signoz"
"go.signoz.io/signoz/pkg/sqlstore"
"go.signoz.io/signoz/pkg/types"
"go.signoz.io/signoz/pkg/types/authtypes"
"go.signoz.io/signoz/pkg/web"
@@ -45,7 +47,6 @@ import (
"go.signoz.io/signoz/pkg/query-service/cache"
baseconst "go.signoz.io/signoz/pkg/query-service/constants"
"go.signoz.io/signoz/pkg/query-service/healthcheck"
basealm "go.signoz.io/signoz/pkg/query-service/integrations/alertManager"
baseint "go.signoz.io/signoz/pkg/query-service/interfaces"
basemodel "go.signoz.io/signoz/pkg/query-service/model"
pqle "go.signoz.io/signoz/pkg/query-service/pqlEngine"
@@ -176,8 +177,8 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
}
<-readerReady
rm, err := makeRulesManager(serverOptions.PromConfigPath,
baseconst.GetAlertManagerApiPrefix(),
rm, err := makeRulesManager(
serverOptions.PromConfigPath,
serverOptions.RuleRepoURL,
serverOptions.SigNoz.SQLStore.SQLxDB(),
reader,
@@ -186,6 +187,8 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
lm,
serverOptions.UseLogsNewSchema,
serverOptions.UseTraceNewSchema,
serverOptions.SigNoz.Alertmanager,
serverOptions.SigNoz.SQLStore,
)
if err != nil {
@@ -268,7 +271,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
JWT: serverOptions.Jwt,
}
apiHandler, err := api.NewAPIHandler(apiOpts)
apiHandler, err := api.NewAPIHandler(apiOpts, serverOptions.SigNoz)
if err != nil {
return nil, err
}
@@ -530,7 +533,6 @@ func (s *Server) Stop() error {
func makeRulesManager(
promConfigPath,
alertManagerURL string,
ruleRepoURL string,
db *sqlx.DB,
ch baseint.Reader,
@@ -538,39 +540,34 @@ func makeRulesManager(
disableRules bool,
fm baseint.FeatureLookup,
useLogsNewSchema bool,
useTraceNewSchema bool) (*baserules.Manager, error) {
useTraceNewSchema bool,
alertmanager alertmanager.Alertmanager,
sqlstore sqlstore.SQLStore,
) (*baserules.Manager, error) {
// create engine
pqle, err := pqle.FromConfigPath(promConfigPath)
if err != nil {
return nil, fmt.Errorf("failed to create pql engine : %v", err)
}
// notifier opts
notifierOpts := basealm.NotifierOptions{
QueueCapacity: 10000,
Timeout: 1 * time.Second,
AlertManagerURLs: []string{alertManagerURL},
}
// create manager opts
managerOpts := &baserules.ManagerOptions{
NotifierOpts: notifierOpts,
PqlEngine: pqle,
RepoURL: ruleRepoURL,
DBConn: db,
Context: context.Background(),
Logger: zap.L(),
DisableRules: disableRules,
FeatureFlags: fm,
Reader: ch,
Cache: cache,
EvalDelay: baseconst.GetEvalDelay(),
PqlEngine: pqle,
RepoURL: ruleRepoURL,
DBConn: db,
Context: context.Background(),
Logger: zap.L(),
DisableRules: disableRules,
FeatureFlags: fm,
Reader: ch,
Cache: cache,
EvalDelay: baseconst.GetEvalDelay(),
PrepareTaskFunc: rules.PrepareTaskFunc,
UseLogsNewSchema: useLogsNewSchema,
UseTraceNewSchema: useTraceNewSchema,
PrepareTestRuleFunc: rules.TestNotification,
Alertmanager: alertmanager,
SQLStore: sqlstore,
}
// create Manager

View File

@@ -7,7 +7,6 @@ import (
"os"
"os/signal"
"strconv"
"syscall"
"time"
"go.opentelemetry.io/otel/sdk/resource"
@@ -150,7 +149,14 @@ func main() {
zap.L().Fatal("Failed to create config", zap.Error(err))
}
signoz, err := signoz.New(context.Background(), config, signoz.NewProviderConfig())
signoz, err := signoz.New(
context.Background(),
config,
signoz.NewCacheProviderFactories(),
signoz.NewWebProviderFactories(),
signoz.NewSQLStoreProviderFactories(),
signoz.NewTelemetryStoreProviderFactories(),
)
if err != nil {
zap.L().Fatal("Failed to create signoz struct", zap.Error(err))
}
@@ -198,16 +204,19 @@ func main() {
zap.L().Fatal("Failed to initialize auth cache", zap.Error(err))
}
signalsChannel := make(chan os.Signal, 1)
signal.Notify(signalsChannel, os.Interrupt, syscall.SIGTERM)
signoz.Start(context.Background())
for {
select {
case status := <-server.HealthCheckStatus():
zap.L().Info("Received HealthCheck status: ", zap.Int("status", int(status)))
case <-signalsChannel:
zap.L().Fatal("Received OS Interrupt Signal ... ")
server.Stop()
}
if err := signoz.Wait(context.Background()); err != nil {
zap.L().Fatal("Failed to start signoz", zap.Error(err))
}
err = server.Stop()
if err != nil {
zap.L().Fatal("Failed to stop server", zap.Error(err))
}
err = signoz.Stop(context.Background())
if err != nil {
zap.L().Fatal("Failed to stop signoz", zap.Error(err))
}
}

View File

@@ -28,6 +28,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
opts.UseLogsNewSchema,
opts.UseTraceNewSchema,
baserules.WithEvalDelay(opts.ManagerOpts.EvalDelay),
baserules.WithSQLStore(opts.SQLStore),
)
if err != nil {
@@ -48,6 +49,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
opts.Logger,
opts.Reader,
opts.ManagerOpts.PqlEngine,
baserules.WithSQLStore(opts.SQLStore),
)
if err != nil {
@@ -68,6 +70,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
opts.Reader,
opts.Cache,
baserules.WithEvalDelay(opts.ManagerOpts.EvalDelay),
baserules.WithSQLStore(opts.SQLStore),
)
if err != nil {
return task, err
@@ -126,6 +129,7 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, *basemodel.Ap
opts.UseTraceNewSchema,
baserules.WithSendAlways(),
baserules.WithSendUnmatched(),
baserules.WithSQLStore(opts.SQLStore),
)
if err != nil {
@@ -144,6 +148,7 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, *basemodel.Ap
opts.ManagerOpts.PqlEngine,
baserules.WithSendAlways(),
baserules.WithSendUnmatched(),
baserules.WithSQLStore(opts.SQLStore),
)
if err != nil {
@@ -160,6 +165,7 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, *basemodel.Ap
opts.Cache,
baserules.WithSendAlways(),
baserules.WithSendUnmatched(),
baserules.WithSQLStore(opts.SQLStore),
)
if err != nil {
zap.L().Error("failed to prepare a new anomaly rule for test", zap.String("name", rule.Name()), zap.Error(err))

View File

@@ -13,13 +13,11 @@ const getTriggered = async (
const response = await axios.get(`/alerts?${queryParams}`);
const amData = JSON.parse(response.data.data);
return {
statusCode: 200,
error: null,
message: response.data.status,
payload: amData.data,
payload: response.data.data,
};
} catch (error) {
return ErrorResponseHandler(error as AxiosError);

2
go.mod
View File

@@ -71,7 +71,6 @@ require (
go.uber.org/zap v1.27.0
golang.org/x/crypto v0.31.0
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842
golang.org/x/net v0.33.0
golang.org/x/oauth2 v0.24.0
golang.org/x/sync v0.10.0
golang.org/x/text v0.21.0
@@ -267,6 +266,7 @@ require (
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
go.uber.org/atomic v1.11.0 // indirect
golang.org/x/mod v0.22.0 // indirect
golang.org/x/net v0.33.0 // indirect
golang.org/x/sys v0.29.0 // indirect
golang.org/x/time v0.6.0 // indirect
golang.org/x/tools v0.28.0 // indirect

View File

@@ -12,10 +12,10 @@ import (
type Config struct {
// The URL under which Alertmanager is externally reachable (for example, if Alertmanager is served via a reverse proxy). Used for generating relative and absolute links back to Alertmanager itself.
// See https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/cmd/alertmanager/main.go#L155C54-L155C249
ExternalUrl *url.URL `mapstructure:"external_url"`
ExternalURL *url.URL `mapstructure:"external_url"`
// Global is the global configuration for the alertmanager
Global alertmanagertypes.GlobalConfig `mapstructure:"global"`
Global alertmanagertypes.GlobalConfig `mapstructure:"global" yaml:"global"`
// Config of the root node of the routing tree.
Route alertmanagertypes.RouteConfig `mapstructure:"route"`
@@ -66,8 +66,9 @@ type NFLogConfig struct {
func NewConfig() Config {
return Config{
ExternalUrl: &url.URL{
Host: "localhost:8080",
ExternalURL: &url.URL{
Scheme: "http",
Host: "localhost:8080",
},
Global: alertmanagertypes.GlobalConfig{
// Corresponds to the default in upstream (https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/config/config.go#L727)

View File

@@ -223,7 +223,7 @@ func (server *Server) SetConfig(ctx context.Context, alertmanagerConfig *alertma
return err
}
server.tmpl.ExternalURL = server.srvConfig.ExternalUrl
server.tmpl.ExternalURL = server.srvConfig.ExternalURL
// Build the routing tree and record which receivers are used.
routes := dispatch.NewRoute(config.Route, nil)

View File

@@ -1,8 +1,6 @@
package alertmanager
import (
"errors"
"fmt"
"net/url"
"time"
@@ -15,7 +13,7 @@ type Config struct {
Provider string `mapstructure:"provider"`
// Signoz is the internal alertmanager configuration.
Signoz Signoz `mapstructure:"signoz"`
Signoz Signoz `mapstructure:"signoz" yaml:"signoz"`
// Legacy is the legacy alertmanager configuration.
Legacy Legacy `mapstructure:"legacy"`
@@ -26,12 +24,12 @@ type Signoz struct {
PollInterval time.Duration `mapstructure:"poll_interval"`
// Config is the config for the alertmanager server.
alertmanagerserver.Config `mapstructure:",squash"`
alertmanagerserver.Config `mapstructure:",squash" yaml:",squash"`
}
type Legacy struct {
// ApiURL is the URL of the legacy signoz alertmanager.
ApiURL string `mapstructure:"api_url"`
ApiURL *url.URL `mapstructure:"api_url"`
}
func NewConfigFactory() factory.ConfigFactory {
@@ -42,26 +40,19 @@ func newConfig() factory.Config {
return Config{
Provider: "legacy",
Legacy: Legacy{
ApiURL: "http://alertmanager:9093/api",
ApiURL: &url.URL{
Scheme: "http",
Host: "alertmanager:9093",
Path: "/api",
},
},
Signoz: Signoz{
PollInterval: 15 * time.Second,
PollInterval: 1 * time.Minute,
Config: alertmanagerserver.NewConfig(),
},
}
}
func (c Config) Validate() error {
if c.Provider == "legacy" {
if c.Legacy.ApiURL == "" {
return errors.New("api_url is required")
}
_, err := url.Parse(c.Legacy.ApiURL)
if err != nil {
return fmt.Errorf("api_url %q is invalid: %w", c.Legacy.ApiURL, err)
}
}
return nil
}

View File

@@ -2,8 +2,11 @@ package alertmanager
import (
"context"
"net/url"
"testing"
"time"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.signoz.io/signoz/pkg/config"
@@ -14,6 +17,9 @@ import (
func TestNewWithEnvProvider(t *testing.T) {
t.Setenv("SIGNOZ_ALERTMANAGER_PROVIDER", "legacy")
t.Setenv("SIGNOZ_ALERTMANAGER_LEGACY_API__URL", "http://localhost:9093/api")
t.Setenv("SIGNOZ_ALERTMANAGER_SIGNOZ_ROUTE_REPEAT__INTERVAL", "5m")
t.Setenv("SIGNOZ_ALERTMANAGER_SIGNOZ_EXTERNAL__URL", "https://example.com/test")
t.Setenv("SIGNOZ_ALERTMANAGER_SIGNOZ_GLOBAL_RESOLVE__TIMEOUT", "10s")
conf, err := config.New(
context.Background(),
@@ -32,13 +38,26 @@ func TestNewWithEnvProvider(t *testing.T) {
actual := &Config{}
err = conf.Unmarshal("alertmanager", actual)
require.NoError(t, err)
err = conf.UnmarshalYaml("alertmanager", actual)
require.NoError(t, err)
def := NewConfigFactory().New().(Config)
def.Signoz.Global.ResolveTimeout = model.Duration(10 * time.Second)
def.Signoz.Route.RepeatInterval = 5 * time.Minute
def.Signoz.ExternalURL = &url.URL{
Scheme: "https",
Host: "example.com",
Path: "/test",
}
expected := &Config{
Provider: "legacy",
Legacy: Legacy{
ApiURL: "http://localhost:9093/api",
ApiURL: &url.URL{
Scheme: "http",
Host: "localhost:9093",
Path: "/api",
},
},
Signoz: def.Signoz,
}

View File

@@ -37,6 +37,7 @@ type provider struct {
configStore alertmanagertypes.ConfigStore
batcher *alertmanagerbatcher.Batcher
url *url.URL
orgID string
}
func NewFactory(sqlstore sqlstore.SQLStore) factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config] {
@@ -49,11 +50,6 @@ func New(ctx context.Context, providerSettings factory.ProviderSettings, config
settings := factory.NewScopedProviderSettings(providerSettings, "go.signoz.io/signoz/pkg/alertmanager/legacyalertmanager")
configStore := sqlalertmanagerstore.NewConfigStore(sqlstore)
url, err := url.Parse(config.Legacy.ApiURL)
if err != nil {
return nil, err
}
return &provider{
config: config,
settings: settings,
@@ -62,7 +58,7 @@ func New(ctx context.Context, providerSettings factory.ProviderSettings, config
},
configStore: configStore,
batcher: alertmanagerbatcher.New(settings.Logger(), alertmanagerbatcher.NewConfig()),
url: url,
url: config.Legacy.ApiURL,
}, nil
}
@@ -73,8 +69,25 @@ func (provider *provider) Start(ctx context.Context) error {
}
for alerts := range provider.batcher.C {
if err := provider.putAlerts(ctx, "", alerts); err != nil {
provider.settings.Logger().Error("failed to send alerts to alertmanager", "error", err)
// For the first time, we need to get the orgID from the config store.
// Since this is the legacy alertmanager, we get the first org from the store.
if provider.orgID == "" {
orgIDs, err := provider.configStore.ListOrgs(ctx)
if err != nil {
provider.settings.Logger().ErrorContext(ctx, "failed to send alerts to alertmanager", "error", err)
continue
}
if len(orgIDs) == 0 {
provider.settings.Logger().ErrorContext(ctx, "failed to send alerts to alertmanager", "error", "no orgs found")
continue
}
provider.orgID = orgIDs[0]
}
if err := provider.putAlerts(ctx, provider.orgID, alerts); err != nil {
provider.settings.Logger().ErrorContext(ctx, "failed to send alerts to alertmanager", "error", err)
}
}
@@ -127,9 +140,16 @@ func (provider *provider) putAlerts(ctx context.Context, orgID string, alerts al
legacyAlerts := make([]postableAlert, len(alerts))
for i, alert := range alerts {
receivers, err := cfg.ReceiverNamesFromRuleID(alert.Alert.Labels["ruleID"])
if err != nil {
return err
ruleID, ok := alert.Alert.Labels[alertmanagertypes.RuleIDMatcherName]
if !ok {
provider.settings.Logger().WarnContext(ctx, "cannot find ruleID for alert, skipping sending alert to alertmanager", "alert", alert)
continue
}
receivers := cfg.ReceiverNamesFromRuleID(ruleID)
if len(receivers) == 0 {
provider.settings.Logger().WarnContext(ctx, "cannot find receivers for alert, skipping sending alert to alertmanager", "ruleID", ruleID, "alert", alert)
continue
}
legacyAlerts[i] = postableAlert{
@@ -198,12 +218,13 @@ func (provider *provider) TestReceiver(ctx context.Context, orgID string, receiv
func (provider *provider) TestAlert(ctx context.Context, orgID string, alert *alertmanagertypes.PostableAlert, receivers []string) error {
url := provider.url.JoinPath(alertsPath)
legacyAlert := postableAlert{
legacyAlerts := make([]postableAlert, 1)
legacyAlerts[0] = postableAlert{
PostableAlert: alert,
Receivers: receivers,
}
body, err := json.Marshal(legacyAlert)
body, err := json.Marshal(legacyAlerts)
if err != nil {
return err
}

View File

@@ -85,6 +85,9 @@ func (service *Service) SyncServers(ctx context.Context) error {
}
func (service *Service) GetAlerts(ctx context.Context, orgID string, params alertmanagertypes.GettableAlertsParams) (alertmanagertypes.DeprecatedGettableAlerts, error) {
service.serversMtx.RLock()
defer service.serversMtx.RUnlock()
server, err := service.getServer(orgID)
if err != nil {
return nil, err
@@ -99,6 +102,9 @@ func (service *Service) GetAlerts(ctx context.Context, orgID string, params aler
}
func (service *Service) PutAlerts(ctx context.Context, orgID string, alerts alertmanagertypes.PostableAlerts) error {
service.serversMtx.RLock()
defer service.serversMtx.RUnlock()
server, err := service.getServer(orgID)
if err != nil {
return err
@@ -108,6 +114,9 @@ func (service *Service) PutAlerts(ctx context.Context, orgID string, alerts aler
}
func (service *Service) TestReceiver(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver) error {
service.serversMtx.RLock()
defer service.serversMtx.RUnlock()
server, err := service.getServer(orgID)
if err != nil {
return err
@@ -117,6 +126,9 @@ func (service *Service) TestReceiver(ctx context.Context, orgID string, receiver
}
func (service *Service) TestAlert(ctx context.Context, orgID string, alert *alertmanagertypes.PostableAlert, receivers []string) error {
service.serversMtx.RLock()
defer service.serversMtx.RUnlock()
server, err := service.getServer(orgID)
if err != nil {
return err
@@ -144,17 +156,6 @@ func (service *Service) newServer(ctx context.Context, orgID string) (*alertmana
return nil, err
}
beforeCompareAndSelectHash := config.StoreableConfig().Hash
config, err = service.compareAndSelectConfig(ctx, config)
if err != nil {
return nil, err
}
if beforeCompareAndSelectHash == config.StoreableConfig().Hash {
service.settings.Logger().Debug("skipping config store update for org", "orgID", orgID, "hash", config.StoreableConfig().Hash)
return server, nil
}
err = service.configStore.Set(ctx, config)
if err != nil {
return nil, err
@@ -174,56 +175,15 @@ func (service *Service) getConfig(ctx context.Context, orgID string) (*alertmana
if err != nil {
return nil, err
}
config.SetGlobalConfig(service.config.Global)
if config.AlertmanagerConfig().Route == nil {
config.SetRouteConfig(service.config.Route)
} else {
config.UpdateRouteConfig(service.config.Route)
}
}
config.SetGlobalConfig(service.config.Global)
config.SetRouteConfig(service.config.Route)
return config, nil
}
// compareAndSelectConfig compares the existing config with the config derived from channels.
// If the hash of the config and the channels mismatch, the config derived from channels is returned.
func (service *Service) compareAndSelectConfig(ctx context.Context, incomingConfig *alertmanagertypes.Config) (*alertmanagertypes.Config, error) {
channels, err := service.configStore.ListChannels(ctx, incomingConfig.StoreableConfig().OrgID)
if err != nil {
return nil, err
}
matchers, err := service.configStore.GetMatchers(ctx, incomingConfig.StoreableConfig().OrgID)
if err != nil {
return nil, err
}
config, err := alertmanagertypes.NewConfigFromChannels(service.config.Global, service.config.Route, channels, incomingConfig.StoreableConfig().OrgID)
if err != nil {
return nil, err
}
for ruleID, receivers := range matchers {
err = config.CreateRuleIDMatcher(ruleID, receivers)
if err != nil {
return nil, err
}
}
if incomingConfig.StoreableConfig().Hash != config.StoreableConfig().Hash {
service.settings.Logger().InfoContext(ctx, "mismatch found, updating config to match channels and matchers")
return config, nil
}
return incomingConfig, nil
}
// getServer returns the server for the given orgID. It should be called with the lock held.
func (service *Service) getServer(orgID string) (*alertmanagerserver.Server, error) {
service.serversMtx.RLock()
defer service.serversMtx.RUnlock()
server, ok := service.servers[orgID]
if !ok {
return nil, errors.Newf(errors.TypeNotFound, ErrCodeAlertmanagerNotFound, "alertmanager not found for org %s", orgID)

View File

@@ -1,6 +1,9 @@
package config
import (
"net/url"
"reflect"
"github.com/go-viper/mapstructure/v2"
"github.com/knadh/koanf/providers/confmap"
"github.com/knadh/koanf/v2"
@@ -61,6 +64,7 @@ func (conf *Conf) Unmarshal(path string, input any) error {
mapstructure.StringToSliceHookFunc(","),
mapstructure.StringToTimeDurationHookFunc(),
mapstructure.TextUnmarshallerHookFunc(),
StringToURLHookFunc(),
),
Result: input,
}
@@ -68,6 +72,22 @@ func (conf *Conf) Unmarshal(path string, input any) error {
return conf.Koanf.UnmarshalWithConf(path, input, koanf.UnmarshalConf{Tag: "mapstructure", DecoderConfig: dc})
}
// UnmarshalYaml decodes the configuration found at path into input, matching
// struct fields by their `yaml` tags.
//
// Decoding is weakly typed and runs the following hooks, in order:
// comma-separated string to slice, string to time.Duration, text
// unmarshalling, and string to URL.
func (conf *Conf) UnmarshalYaml(path string, input any) error {
	const tag = "yaml"

	hooks := mapstructure.ComposeDecodeHookFunc(
		mapstructure.StringToSliceHookFunc(","),
		mapstructure.StringToTimeDurationHookFunc(),
		mapstructure.TextUnmarshallerHookFunc(),
		StringToURLHookFunc(),
	)

	unmarshalConf := koanf.UnmarshalConf{
		Tag: tag,
		DecoderConfig: &mapstructure.DecoderConfig{
			TagName:          tag,
			WeaklyTypedInput: true,
			DecodeHook:       hooks,
			Result:           input,
		},
	}

	return conf.Koanf.UnmarshalWithConf(path, input, unmarshalConf)
}
// Set sets the configuration at the given key.
// It decodes the input into a map as per mapstructure.Decode and then merges it into the configuration.
func (conf *Conf) Set(key string, input any) error {
@@ -88,3 +108,22 @@ func (conf *Conf) Set(key string, input any) error {
return nil
}
// StringToURLHookFunc returns a mapstructure decode hook that parses string
// input with url.Parse when the decode target type is url.URL.
//
// Any other combination of source and target types is passed through
// unchanged. On success the hook returns the *url.URL produced by url.Parse;
// a parse failure is returned as the hook's error.
func StringToURLHookFunc() mapstructure.DecodeHookFunc {
	// Resolved once here instead of on every hook invocation.
	urlType := reflect.TypeOf(url.URL{})

	return func(
		f reflect.Type,
		t reflect.Type,
		data any,
	) (any, error) {
		if f.Kind() != reflect.String || t != urlType {
			return data, nil
		}

		// f.Kind() is also reflect.String for named string types, for which a
		// bare type assertion would panic; let those pass through untouched.
		raw, ok := data.(string)
		if !ok {
			return data, nil
		}

		// Convert it by parsing.
		return url.Parse(raw)
	}
}

View File

@@ -40,7 +40,7 @@ func NewRegistry(logger *slog.Logger, services ...NamedService) (*Registry, erro
}, nil
}
func (r *Registry) Start(ctx context.Context) error {
func (r *Registry) Start(ctx context.Context) {
for _, s := range r.services.GetInOrder() {
go func(s NamedService) {
r.logger.InfoContext(ctx, "starting service", "service", s.Name())
@@ -49,7 +49,6 @@ func (r *Registry) Start(ctx context.Context) error {
}(s)
}
return nil
}
func (r *Registry) Wait(ctx context.Context) error {

View File

@@ -41,7 +41,7 @@ func TestRegistryWith2Services(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
require.NoError(t, registry.Start(ctx))
registry.Start(ctx)
require.NoError(t, registry.Wait(ctx))
require.NoError(t, registry.Stop(ctx))
}()
@@ -62,7 +62,7 @@ func TestRegistryWith2ServicesWithoutWait(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
require.NoError(t, registry.Start(ctx))
registry.Start(ctx)
require.NoError(t, registry.Stop(ctx))
}()

View File

@@ -78,6 +78,12 @@ func (writer *nonFlushingBadResponseLoggingWriter) Write(data []byte) (int, erro
// https://godoc.org/net/http#ResponseWriter
writer.WriteHeader(http.StatusOK)
}
// 204 No Content is a success response that indicates that the request has been successfully processed and that the response body is intentionally empty.
if writer.statusCode == 204 {
return 0, nil
}
n, err := writer.rw.Write(data)
if writer.logBody {
writer.captureResponseBody(data)

View File

@@ -23,6 +23,7 @@ type SDK struct {
logger *slog.Logger
sdk contribsdkconfig.SDK
prometheusRegistry *prometheus.Registry
startCh chan struct{}
}
// New creates a new Instrumentation instance with configured providers.
@@ -96,14 +97,17 @@ func New(ctx context.Context, build version.Build, cfg Config) (*SDK, error) {
sdk: sdk,
prometheusRegistry: prometheusRegistry,
logger: NewLogger(cfg),
startCh: make(chan struct{}),
}, nil
}
// Start blocks until startCh is closed and then returns nil.
// The ctx argument is not used by this method.
func (i *SDK) Start(ctx context.Context) error {
// Receiving from startCh only unblocks once the channel has been closed.
<-i.startCh
return nil
}
// Stop closes startCh, which releases a goroutine blocked in Start, and then
// returns the result of shutting down the underlying sdk with ctx.
// NOTE(review): closing an already-closed channel panics, so a second call to
// Stop would panic — confirm callers invoke Stop at most once.
func (i *SDK) Stop(ctx context.Context) error {
close(i.startCh)
return i.sdk.Shutdown(ctx)
}

View File

@@ -49,7 +49,6 @@ import (
"go.signoz.io/signoz/pkg/query-service/common"
"go.signoz.io/signoz/pkg/query-service/constants"
chErrors "go.signoz.io/signoz/pkg/query-service/errors"
am "go.signoz.io/signoz/pkg/query-service/integrations/alertManager"
"go.signoz.io/signoz/pkg/query-service/interfaces"
"go.signoz.io/signoz/pkg/query-service/metrics"
"go.signoz.io/signoz/pkg/query-service/model"
@@ -145,7 +144,6 @@ type ClickHouseReader struct {
promConfigFile string
promConfig *config.Config
alertManager am.Manager
featureFlags interfaces.FeatureLookup
liveTailRefreshSeconds int
@@ -194,13 +192,6 @@ func NewReaderFromClickhouseConnection(
fluxIntervalForTraceDetail time.Duration,
cache cache.Cache,
) *ClickHouseReader {
alertManager, err := am.New()
if err != nil {
zap.L().Error("failed to initialize alert manager", zap.Error(err))
zap.L().Error("check if the alert manager URL is correctly set and valid")
os.Exit(1)
}
logsTableName := options.primary.LogsTable
logsLocalTableName := options.primary.LogsLocalTable
if useLogsNewSchema {
@@ -219,7 +210,6 @@ func NewReaderFromClickhouseConnection(
db: db,
localDB: localDB,
TraceDB: options.primary.TraceDB,
alertManager: alertManager,
operationsTable: options.primary.OperationsTable,
indexTable: options.primary.IndexTable,
errorTable: options.primary.ErrorTable,

View File

@@ -18,6 +18,7 @@ import (
"text/template"
"time"
"go.signoz.io/signoz/pkg/alertmanager"
"go.signoz.io/signoz/pkg/query-service/app/metricsexplorer"
"github.com/gorilla/mux"
@@ -60,7 +61,6 @@ import (
"go.signoz.io/signoz/pkg/query-service/app/integrations/messagingQueues/kafka"
"go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline"
"go.signoz.io/signoz/pkg/query-service/dao"
am "go.signoz.io/signoz/pkg/query-service/integrations/alertManager"
"go.signoz.io/signoz/pkg/query-service/interfaces"
"go.signoz.io/signoz/pkg/query-service/model"
"go.signoz.io/signoz/pkg/query-service/rules"
@@ -86,7 +86,6 @@ type APIHandler struct {
reader interfaces.Reader
skipConfig *model.SkipConfig
appDao dao.ModelDao
alertManager am.Manager
ruleManager *rules.Manager
featureFlags interfaces.FeatureLookup
querier interfaces.Querier
@@ -135,6 +134,8 @@ type APIHandler struct {
pvcsRepo *inframetrics.PvcsRepo
JWT *authtypes.JWT
AlertmanagerAPI *alertmanager.API
}
type APIHandlerOpts struct {
@@ -176,16 +177,12 @@ type APIHandlerOpts struct {
UseTraceNewSchema bool
JWT *authtypes.JWT
AlertmanagerAPI *alertmanager.API
}
// NewAPIHandler returns an APIHandler
func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
alertManager, err := am.New()
if err != nil {
return nil, err
}
querierOpts := querier.QuerierOptions{
Reader: opts.Reader,
Cache: opts.Cache,
@@ -229,7 +226,6 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
skipConfig: opts.SkipConfig,
preferSpanMetrics: opts.PreferSpanMetrics,
temporalityMap: make(map[string]map[v3.Temporality]bool),
alertManager: alertManager,
ruleManager: opts.RuleManager,
featureFlags: opts.FeatureFlags,
IntegrationsController: opts.IntegrationsController,
@@ -252,6 +248,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
pvcsRepo: pvcsRepo,
JWT: opts.JWT,
SummaryService: summaryService,
AlertmanagerAPI: opts.AlertmanagerAPI,
}
logsQueryBuilder := logsv3.PrepareLogsQuery
@@ -485,21 +482,21 @@ func (aH *APIHandler) Respond(w http.ResponseWriter, data interface{}) {
// RegisterPrivateRoutes registers routes for this handler on the given router
func (aH *APIHandler) RegisterPrivateRoutes(router *mux.Router) {
router.HandleFunc("/api/v1/channels", aH.listChannels).Methods(http.MethodGet)
router.HandleFunc("/api/v1/channels", aH.AlertmanagerAPI.ListAllChannels).Methods(http.MethodGet)
}
// RegisterRoutes registers routes for this handler on the given router
func (aH *APIHandler) RegisterRoutes(router *mux.Router, am *AuthMiddleware) {
router.HandleFunc("/api/v1/query_range", am.ViewAccess(aH.queryRangeMetrics)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/query", am.ViewAccess(aH.queryMetrics)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/channels", am.ViewAccess(aH.listChannels)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/channels/{id}", am.ViewAccess(aH.getChannel)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/channels/{id}", am.AdminAccess(aH.editChannel)).Methods(http.MethodPut)
router.HandleFunc("/api/v1/channels/{id}", am.AdminAccess(aH.deleteChannel)).Methods(http.MethodDelete)
router.HandleFunc("/api/v1/channels", am.EditAccess(aH.createChannel)).Methods(http.MethodPost)
router.HandleFunc("/api/v1/testChannel", am.EditAccess(aH.testChannel)).Methods(http.MethodPost)
router.HandleFunc("/api/v1/channels", am.ViewAccess(aH.AlertmanagerAPI.ListChannels)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/channels/{id}", am.ViewAccess(aH.AlertmanagerAPI.GetChannelByID)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/channels/{id}", am.AdminAccess(aH.AlertmanagerAPI.UpdateChannelByID)).Methods(http.MethodPut)
router.HandleFunc("/api/v1/channels/{id}", am.AdminAccess(aH.AlertmanagerAPI.DeleteChannelByID)).Methods(http.MethodDelete)
router.HandleFunc("/api/v1/channels", am.EditAccess(aH.AlertmanagerAPI.CreateChannel)).Methods(http.MethodPost)
router.HandleFunc("/api/v1/testChannel", am.EditAccess(aH.AlertmanagerAPI.TestReceiver)).Methods(http.MethodPost)
router.HandleFunc("/api/v1/alerts", am.ViewAccess(aH.getAlerts)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/alerts", am.ViewAccess(aH.AlertmanagerAPI.GetAlerts)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/rules", am.ViewAccess(aH.listRules)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/rules/{id}", am.ViewAccess(aH.getRule)).Methods(http.MethodGet)
@@ -1363,138 +1360,6 @@ func (aH *APIHandler) editRule(w http.ResponseWriter, r *http.Request) {
}
// getChannel looks up the notification channel identified by the "id" path
// variable in the rule database and writes it to the response. A failed lookup
// is reported through RespondError.
func (aH *APIHandler) getChannel(w http.ResponseWriter, r *http.Request) {
	channelID := mux.Vars(r)["id"]

	item, lookupErr := aH.ruleManager.RuleDB().GetChannel(channelID)
	if lookupErr == nil {
		aH.Respond(w, item)
		return
	}
	RespondError(w, lookupErr, nil)
}
// deleteChannel removes the notification channel identified by the "id" path
// variable from the rule database and acknowledges the deletion with a fixed
// confirmation message.
func (aH *APIHandler) deleteChannel(w http.ResponseWriter, r *http.Request) {
	channelID := mux.Vars(r)["id"]

	if delErr := aH.ruleManager.RuleDB().DeleteChannel(channelID); delErr != nil {
		RespondError(w, delErr, nil)
		return
	}
	aH.Respond(w, "notification channel successfully deleted")
}
// listChannels writes every notification channel returned by the rule database
// to the response, or the database's error when the listing fails.
func (aH *APIHandler) listChannels(w http.ResponseWriter, r *http.Request) {
	all, listErr := aH.ruleManager.RuleDB().GetChannels()
	if listErr == nil {
		aH.Respond(w, all)
		return
	}
	RespondError(w, listErr, nil)
}
// testChannel decodes a receiver definition from the JSON request body and asks
// the alert manager to deliver a test alert to that receiver. An unreadable or
// malformed body is answered with a bad-data error.
func (aH *APIHandler) testChannel(w http.ResponseWriter, r *http.Request) {
	defer r.Body.Close()

	payload, readErr := io.ReadAll(r.Body)
	if readErr != nil {
		zap.L().Error("Error in getting req body of testChannel API", zap.Error(readErr))
		RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: readErr}, nil)
		return
	}

	var receiver am.Receiver
	if decodeErr := json.Unmarshal(payload, &receiver); decodeErr != nil {
		zap.L().Error("Error in parsing req body of testChannel API\n", zap.Error(decodeErr))
		RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: decodeErr}, nil)
		return
	}

	// Hand the decoded receiver to the alert manager, which sends the test alert.
	if sendErr := aH.alertManager.TestReceiver(&receiver); sendErr != nil {
		RespondError(w, sendErr, nil)
		return
	}
	aH.Respond(w, "test alert sent")
}
// editChannel replaces the notification channel identified by the "id" path
// variable with the receiver definition decoded from the JSON request body.
// On success it responds with a nil payload.
func (aH *APIHandler) editChannel(w http.ResponseWriter, r *http.Request) {
	channelID := mux.Vars(r)["id"]

	defer r.Body.Close()

	payload, readErr := io.ReadAll(r.Body)
	if readErr != nil {
		zap.L().Error("Error in getting req body of editChannel API", zap.Error(readErr))
		RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: readErr}, nil)
		return
	}

	var receiver am.Receiver
	if decodeErr := json.Unmarshal(payload, &receiver); decodeErr != nil {
		zap.L().Error("Error in parsing req body of editChannel API", zap.Error(decodeErr))
		RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: decodeErr}, nil)
		return
	}

	// The updated receiver returned by the store is not needed by this handler.
	if _, editErr := aH.ruleManager.RuleDB().EditChannel(&receiver, channelID); editErr != nil {
		RespondError(w, editErr, nil)
		return
	}
	aH.Respond(w, nil)
}
// createChannel stores a new notification channel built from the receiver
// definition decoded from the JSON request body. On success it responds with a
// nil payload.
func (aH *APIHandler) createChannel(w http.ResponseWriter, r *http.Request) {
	defer r.Body.Close()

	payload, readErr := io.ReadAll(r.Body)
	if readErr != nil {
		zap.L().Error("Error in getting req body of createChannel API", zap.Error(readErr))
		RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: readErr}, nil)
		return
	}

	var receiver am.Receiver
	if decodeErr := json.Unmarshal(payload, &receiver); decodeErr != nil {
		zap.L().Error("Error in parsing req body of createChannel API", zap.Error(decodeErr))
		RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: decodeErr}, nil)
		return
	}

	// The stored receiver returned by the store is not needed by this handler.
	if _, createErr := aH.ruleManager.RuleDB().CreateChannel(&receiver); createErr != nil {
		RespondError(w, createErr, nil)
		return
	}
	aH.Respond(w, nil)
}
// getAlerts forwards the caller's query parameters to the alertmanager
// "v1/alerts" endpoint and relays the upstream response body as a string.
//
// The upstream request carries the incoming request's context, so it is
// abandoned as soon as the client disconnects or the request is cancelled
// instead of running to completion in the background. Transport and read
// failures are reported as internal errors; the upstream status code is not
// inspected.
func (aH *APIHandler) getAlerts(w http.ResponseWriter, r *http.Request) {
	params := r.URL.Query()
	amEndpoint := constants.GetAlertManagerApiPrefix()
	target := amEndpoint + "v1/alerts" + "?" + params.Encode()

	req, err := http.NewRequestWithContext(r.Context(), http.MethodGet, target, nil)
	if err != nil {
		RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
		return
	}

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
		return
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
		return
	}

	aH.Respond(w, string(body))
}
func (aH *APIHandler) createRule(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()

View File

@@ -14,6 +14,7 @@ import (
"github.com/rs/cors"
"github.com/soheilhy/cmux"
"go.signoz.io/signoz/pkg/alertmanager"
"go.signoz.io/signoz/pkg/http/middleware"
"go.signoz.io/signoz/pkg/query-service/agentConf"
"go.signoz.io/signoz/pkg/query-service/app/clickhouseReader"
@@ -25,6 +26,7 @@ import (
opAmpModel "go.signoz.io/signoz/pkg/query-service/app/opamp/model"
"go.signoz.io/signoz/pkg/query-service/app/preferences"
"go.signoz.io/signoz/pkg/signoz"
"go.signoz.io/signoz/pkg/sqlstore"
"go.signoz.io/signoz/pkg/types"
"go.signoz.io/signoz/pkg/types/authtypes"
"go.signoz.io/signoz/pkg/web"
@@ -36,7 +38,6 @@ import (
"go.signoz.io/signoz/pkg/query-service/dao"
"go.signoz.io/signoz/pkg/query-service/featureManager"
"go.signoz.io/signoz/pkg/query-service/healthcheck"
am "go.signoz.io/signoz/pkg/query-service/integrations/alertManager"
"go.signoz.io/signoz/pkg/query-service/interfaces"
"go.signoz.io/signoz/pkg/query-service/model"
pqle "go.signoz.io/signoz/pkg/query-service/pqlEngine"
@@ -152,9 +153,16 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
<-readerReady
rm, err := makeRulesManager(
serverOptions.PromConfigPath,
constants.GetAlertManagerApiPrefix(),
serverOptions.RuleRepoURL, serverOptions.SigNoz.SQLStore.SQLxDB(), reader, c, serverOptions.DisableRules, fm, serverOptions.UseLogsNewSchema, serverOptions.UseTraceNewSchema)
serverOptions.RuleRepoURL,
serverOptions.SigNoz.SQLStore.SQLxDB(),
reader,
c,
serverOptions.DisableRules,
fm,
serverOptions.UseLogsNewSchema,
serverOptions.UseTraceNewSchema,
serverOptions.SigNoz.SQLStore,
)
if err != nil {
return nil, err
}
@@ -197,6 +205,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
UseLogsNewSchema: serverOptions.UseLogsNewSchema,
UseTraceNewSchema: serverOptions.UseTraceNewSchema,
JWT: serverOptions.Jwt,
AlertmanagerAPI: alertmanager.NewAPI(serverOptions.SigNoz.Alertmanager),
})
if err != nil {
return nil, err
@@ -279,7 +288,6 @@ func (s *Server) createPrivateServer(api *APIHandler) (*http.Server, error) {
}
func (s *Server) createPublicServer(api *APIHandler, web web.Web) (*http.Server, error) {
r := NewRouter()
r.Use(middleware.NewAuth(zap.L(), s.serverOptions.Jwt, []string{"Authorization", "Sec-WebSocket-Protocol"}).Wrap)
@@ -467,8 +475,6 @@ func (s *Server) Stop() error {
}
func makeRulesManager(
_,
alertManagerURL string,
ruleRepoURL string,
db *sqlx.DB,
ch interfaces.Reader,
@@ -476,7 +482,9 @@ func makeRulesManager(
disableRules bool,
fm interfaces.FeatureLookup,
useLogsNewSchema bool,
useTraceNewSchema bool) (*rules.Manager, error) {
useTraceNewSchema bool,
sqlstore sqlstore.SQLStore,
) (*rules.Manager, error) {
// create engine
pqle, err := pqle.FromReader(ch)
@@ -484,16 +492,8 @@ func makeRulesManager(
return nil, fmt.Errorf("failed to create pql engine : %v", err)
}
// notifier opts
notifierOpts := am.NotifierOptions{
QueueCapacity: 10000,
Timeout: 1 * time.Second,
AlertManagerURLs: []string{alertManagerURL},
}
// create manager opts
managerOpts := &rules.ManagerOptions{
NotifierOpts: notifierOpts,
PqlEngine: pqle,
RepoURL: ruleRepoURL,
DBConn: db,
@@ -506,6 +506,7 @@ func makeRulesManager(
EvalDelay: constants.GetEvalDelay(),
UseLogsNewSchema: useLogsNewSchema,
UseTraceNewSchema: useTraceNewSchema,
SQLStore: sqlstore,
}
// create Manager

View File

@@ -57,22 +57,12 @@ const PreferRPM = "PreferRPM"
const SpanSearchScopeRoot = "isroot"
const SpanSearchScopeEntryPoint = "isentrypoint"
// GetAlertManagerApiPrefix returns the value of the ALERTMANAGER_API_PREFIX
// environment variable, or "http://alertmanager:9093/api/" when that variable
// is unset or empty.
func GetAlertManagerApiPrefix() string {
	prefix := os.Getenv("ALERTMANAGER_API_PREFIX")
	if prefix == "" {
		return "http://alertmanager:9093/api/"
	}
	return prefix
}
var TELEMETRY_HEART_BEAT_DURATION_MINUTES = GetOrDefaultEnvInt("TELEMETRY_HEART_BEAT_DURATION_MINUTES", 720)
var TELEMETRY_ACTIVE_USER_DURATION_MINUTES = GetOrDefaultEnvInt("TELEMETRY_ACTIVE_USER_DURATION_MINUTES", 360)
var InviteEmailTemplate = GetOrDefaultEnv("INVITE_EMAIL_TEMPLATE", "/root/templates/invitation_email_template.html")
// Alert manager channel subpath
var AmChannelApiPath = GetOrDefaultEnv("ALERTMANAGER_API_CHANNEL_PATH", "v1/routes")
var OTLPTarget = GetOrDefaultEnv("OTEL_EXPORTER_OTLP_ENDPOINT", "")
var LogExportBatchSize = GetOrDefaultEnv("OTEL_BLRP_MAX_EXPORT_BATCH_SIZE", "512")

View File

@@ -1,21 +0,0 @@
package constants
import (
"os"
"testing"
. "github.com/smartystreets/goconvey/convey"
)
// TestGetAlertManagerApiPrefix checks that GetAlertManagerApiPrefix falls back
// to the built-in default and honours ALERTMANAGER_API_PREFIX when it is set.
func TestGetAlertManagerApiPrefix(t *testing.T) {
	const envKey = "ALERTMANAGER_API_PREFIX"

	// t.Setenv records the current value and restores it when the test ends, so
	// the override below no longer leaks into other tests. Starting from the
	// empty value also makes the default assertion independent of the ambient
	// environment, since an empty value selects the default.
	t.Setenv(envKey, "")

	Convey("TestGetAlertManagerApiPrefix", t, func() {
		res := GetAlertManagerApiPrefix()
		So(res, ShouldEqual, "http://alertmanager:9093/api/")

		Convey("WithEnvSet", func() {
			So(os.Setenv(envKey, "http://test:9093/api/"), ShouldBeNil)
			res = GetAlertManagerApiPrefix()
			So(res, ShouldEqual, "http://test:9093/api/")
		})
	})
}

View File

@@ -1,200 +0,0 @@
package alertManager
// Wrapper to connect and process alert manager functions
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
neturl "net/url"
"go.signoz.io/signoz/pkg/query-service/constants"
"go.signoz.io/signoz/pkg/query-service/model"
"go.uber.org/zap"
)
const contentType = "application/json"
// Manager is the client-side handle for one alertmanager instance. It exposes
// the instance's base URL and the route/receiver operations issued against it.
// Operations report failure as *model.ApiError and return nil on success.
type Manager interface {
// URL returns the parsed base URL of the alertmanager.
URL() *neturl.URL
// URLPath resolves path as a reference against the base URL.
URLPath(path string) *neturl.URL
// AddRoute sends the receiver to the alertmanager channel API to create it.
AddRoute(receiver *Receiver) *model.ApiError
// EditRoute sends the receiver to the alertmanager channel API to update it.
EditRoute(receiver *Receiver) *model.ApiError
// DeleteRoute asks the alertmanager channel API to delete the entry with the given name.
DeleteRoute(name string) *model.ApiError
// TestReceiver asks the alertmanager to exercise the given receiver.
TestReceiver(receiver *Receiver) *model.ApiError
}
// defaultOptions returns the options that New applies before any
// caller-supplied ones: the alertmanager URL and the channel API path, both
// read from the constants package.
func defaultOptions() []ManagerOptions {
	defaults := make([]ManagerOptions, 0, 2)
	defaults = append(defaults, WithURL(constants.GetAlertManagerApiPrefix()))
	defaults = append(defaults, WithChannelApiPath(constants.AmChannelApiPath))
	return defaults
}
// ManagerOptions is a functional option applied to a manager while it is being
// constructed by New. A non-nil error aborts construction.
type ManagerOptions func(m *manager) error
// New builds a Manager by applying the default options followed by opts, in
// that order, so caller-supplied options override the defaults. The first
// option that fails aborts construction and its error is returned.
func New(opts ...ManagerOptions) (Manager, error) {
	m := &manager{}

	all := append(defaultOptions(), opts...)
	for i := range all {
		if err := all[i](m); err != nil {
			return nil, err
		}
	}
	return m, nil
}
// WithURL returns an option that sets the manager's base URL, keeping both the
// raw string and its parsed form. The option fails when url cannot be parsed.
func WithURL(url string) ManagerOptions {
	return func(m *manager) error {
		// The raw string is stored before parsing, so it is kept even when
		// parsing fails.
		m.url = url

		parsed, err := neturl.Parse(url)
		if err == nil {
			m.parsedURL = parsed
		}
		return err
	}
}
// WithChannelApiPath returns an option that sets the path appended to the base
// URL when calling the alertmanager channel API. The option never fails.
func WithChannelApiPath(path string) ManagerOptions {
	apply := func(mgr *manager) error {
		mgr.channelApiPath = path
		return nil
	}
	return apply
}
// manager is the Manager implementation returned by New.
type manager struct {
// url is the base URL exactly as supplied to WithURL; request URLs are built
// by appending a path to it.
url string
// parsedURL is the parsed form of url, set by WithURL when parsing succeeds.
parsedURL *neturl.URL
// channelApiPath is appended to url to form the channel API endpoint.
channelApiPath string
}
// prepareAmChannelApiURL returns the channel API endpoint: the base URL string
// immediately followed by the configured channel API path, with no separator
// inserted between them.
func (m *manager) prepareAmChannelApiURL() string {
	return m.url + m.channelApiPath
}
// prepareTestApiURL returns the test-receiver endpoint: the base URL string
// immediately followed by "v1/testReceiver", with no separator inserted.
func (m *manager) prepareTestApiURL() string {
	const testReceiverPath = "v1/testReceiver"
	return m.url + testReceiverPath
}
// URL returns the parsed base URL stored by WithURL. The pointer is returned
// as-is, not copied.
func (m *manager) URL() *neturl.URL {
return m.parsedURL
}
// URLPath resolves path as a reference against the manager's parsed base URL.
// It returns nil when path itself cannot be parsed as a URL reference.
func (m *manager) URLPath(path string) *neturl.URL {
	if ref, err := neturl.Parse(path); err == nil {
		return m.parsedURL.ResolveReference(ref)
	}
	return nil
}
// AddRoute POSTs the JSON-encoded receiver to the alertmanager channel API.
//
// It returns nil on a response status of 299 or lower. Every failure
// (encoding, transport, or a higher status) is returned as an internal
// ApiError that always carries a non-nil Err, so callers can safely read it.
func (m *manager) AddRoute(receiver *Receiver) *model.ApiError {
	amURL := m.prepareAmChannelApiURL()

	receiverString, err := json.Marshal(receiver)
	if err != nil {
		zap.L().Error("Error in encoding receiver for API call to alertmanager", zap.String("url", amURL), zap.Error(err))
		return &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}

	response, err := http.Post(amURL, contentType, bytes.NewBuffer(receiverString))
	if err != nil {
		zap.L().Error("Error in getting response of API call to alertmanager", zap.String("url", amURL), zap.Error(err))
		return &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}
	// Close the body on every path so the connection is released.
	defer response.Body.Close()

	if response.StatusCode > 299 {
		// err is nil at this point, so a dedicated error is built from the status.
		statusErr := fmt.Errorf("alertmanager responded with %s (POST %s)", response.Status, amURL)
		zap.L().Error("Error in getting 2xx response in API call to alertmanager", zap.String("url", amURL), zap.String("status", response.Status))
		return &model.ApiError{Typ: model.ErrorInternal, Err: statusErr}
	}
	return nil
}
// EditRoute PUTs the JSON-encoded receiver to the alertmanager channel API.
//
// It returns nil on a response status of 299 or lower. Every failure
// (encoding, request construction, transport, or a higher status) is returned
// as an internal ApiError that always carries a non-nil Err.
func (m *manager) EditRoute(receiver *Receiver) *model.ApiError {
	amURL := m.prepareAmChannelApiURL()

	receiverString, err := json.Marshal(receiver)
	if err != nil {
		zap.L().Error("Error in encoding receiver for API call to alertmanager", zap.String("url", amURL), zap.Error(err))
		return &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}

	req, err := http.NewRequest(http.MethodPut, amURL, bytes.NewBuffer(receiverString))
	if err != nil {
		zap.L().Error("Error creating new update request for API call to alertmanager", zap.String("url", amURL), zap.Error(err))
		return &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}
	req.Header.Add("Content-Type", contentType)

	// A zero-value client behaves like http.DefaultClient, so reuse the latter
	// rather than allocating a client per call.
	response, err := http.DefaultClient.Do(req)
	if err != nil {
		zap.L().Error("Error in getting response of API call to alertmanager", zap.String("url", amURL), zap.Error(err))
		return &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}
	// Close the body on every path so the connection is released.
	defer response.Body.Close()

	if response.StatusCode > 299 {
		// err is nil at this point, so a dedicated error is built from the status.
		statusErr := fmt.Errorf("alertmanager responded with %s (PUT %s)", response.Status, amURL)
		zap.L().Error("Error in getting 2xx response in PUT API call to alertmanager", zap.String("url", amURL), zap.String("status", response.Status))
		return &model.ApiError{Typ: model.ErrorInternal, Err: statusErr}
	}
	return nil
}
// DeleteRoute sends a DELETE request to the alertmanager channel API with a
// JSON body of the form {"name": name}.
//
// It returns nil on a response status of 299 or lower. Every failure
// (encoding, request construction, transport, or a higher status) is returned
// as an internal ApiError that always carries a non-nil Err.
func (m *manager) DeleteRoute(name string) *model.ApiError {
	amURL := m.prepareAmChannelApiURL()

	requestData, err := json.Marshal(map[string]string{"name": name})
	if err != nil {
		zap.L().Error("Error in encoding delete request for API call to alertmanager", zap.String("url", amURL), zap.Error(err))
		return &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}

	req, err := http.NewRequest(http.MethodDelete, amURL, bytes.NewBuffer(requestData))
	if err != nil {
		zap.L().Error("Error in creating new delete request to alertmanager/v1/receivers", zap.Error(err))
		return &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}
	req.Header.Add("Content-Type", contentType)

	// A zero-value client behaves like http.DefaultClient, so reuse the latter
	// rather than allocating a client per call.
	response, err := http.DefaultClient.Do(req)
	if err != nil {
		zap.L().Error("Error in getting response of API call to alertmanager", zap.String("url", amURL), zap.Error(err))
		return &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}
	// Close the body on every path so the connection is released.
	defer response.Body.Close()

	if response.StatusCode > 299 {
		// Use a constant format string: the URL and status are arguments, so a
		// '%' in either cannot corrupt the message.
		statusErr := fmt.Errorf("alertmanager responded with %s (DELETE %s)", response.Status, amURL)
		zap.L().Error("Error in getting 2xx response in PUT API call to alertmanager", zap.String("url", amURL), zap.String("status", response.Status))
		return &model.ApiError{Typ: model.ErrorInternal, Err: statusErr}
	}
	return nil
}
// TestReceiver POSTs the JSON-encoded receiver to the alertmanager
// "v1/testReceiver" endpoint.
//
// A response status of 201 or lower is success. Statuses from 202 up to and
// including 400 are reported as invalid parameters, and anything above 400 as a
// server error. Status 400 previously matched neither range and was silently
// treated as success. Every failure is returned as an internal ApiError with a
// non-nil Err.
func (m *manager) TestReceiver(receiver *Receiver) *model.ApiError {
	amTestURL := m.prepareTestApiURL()

	receiverBytes, err := json.Marshal(receiver)
	if err != nil {
		zap.L().Error("Error in encoding receiver for API call to alertmanager", zap.String("url", amTestURL), zap.Error(err))
		return &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}

	response, err := http.Post(amTestURL, contentType, bytes.NewBuffer(receiverBytes))
	if err != nil {
		zap.L().Error("Error in getting response of API call to alertmanager", zap.String("url", amTestURL), zap.Error(err))
		return &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}
	// Close the body on every path so the connection is released.
	defer response.Body.Close()

	switch status := response.StatusCode; {
	case status <= 201:
		return nil
	case status <= 400:
		err := fmt.Errorf("invalid parameters in test alert api for alertmanager (POST %s): %s", amTestURL, response.Status)
		zap.L().Error("Invalid parameters in test alert api for alertmanager", zap.Error(err))
		return &model.ApiError{Typ: model.ErrorInternal, Err: err}
	default:
		err := fmt.Errorf("received server error response for API call to alertmanager (POST %s): %s", amTestURL, response.Status)
		zap.L().Error("Received Server Error response for API call to alertmanager", zap.Error(err))
		return &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}
}

View File

@@ -1,79 +0,0 @@
package alertManager
import (
"fmt"
"time"
"go.signoz.io/signoz/pkg/query-service/utils/labels"
)
// Receiver configuration provides configuration on how to contact a receiver.
// Every *Configs field is declared as interface{}, so this type does not
// constrain the shape of the integration settings it carries.
type Receiver struct {
// A unique identifier for this receiver.
Name string `yaml:"name" json:"name"`
// Per-integration settings. Each field is left out of the YAML/JSON output
// when empty (omitempty).
EmailConfigs interface{} `yaml:"email_configs,omitempty" json:"email_configs,omitempty"`
PagerdutyConfigs interface{} `yaml:"pagerduty_configs,omitempty" json:"pagerduty_configs,omitempty"`
SlackConfigs interface{} `yaml:"slack_configs,omitempty" json:"slack_configs,omitempty"`
WebhookConfigs interface{} `yaml:"webhook_configs,omitempty" json:"webhook_configs,omitempty"`
OpsGenieConfigs interface{} `yaml:"opsgenie_configs,omitempty" json:"opsgenie_configs,omitempty"`
WechatConfigs interface{} `yaml:"wechat_configs,omitempty" json:"wechat_configs,omitempty"`
PushoverConfigs interface{} `yaml:"pushover_configs,omitempty" json:"pushover_configs,omitempty"`
VictorOpsConfigs interface{} `yaml:"victorops_configs,omitempty" json:"victorops_configs,omitempty"`
SNSConfigs interface{} `yaml:"sns_configs,omitempty" json:"sns_configs,omitempty"`
MSTeamsConfigs interface{} `yaml:"msteams_configs,omitempty" json:"msteams_configs,omitempty"`
}
// ReceiverResponse is a JSON envelope pairing a status string with a Receiver.
// NOTE(review): presumably the shape of an alertmanager API reply; confirm
// against the code that decodes it.
type ReceiverResponse struct {
Status string `json:"status"`
Data Receiver `json:"data"`
}
// Alert is a generic representation of an alert in the Prometheus eco-system.
type Alert struct {
// Label value pairs for purpose of aggregation, matching, and disposition
// dispatching. This must minimally include an "alertname" label.
Labels labels.BaseLabels `json:"labels"`
// Extra key/value information which does not define alert identity.
Annotations labels.BaseLabels `json:"annotations"`
// The known time range for this alert. Both ends are optional.
// A zero EndsAt means the alert is not treated as resolved (see ResolvedAt).
StartsAt time.Time `json:"startsAt,omitempty"`
EndsAt time.Time `json:"endsAt,omitempty"`
// NOTE(review): presumably a link back to whatever produced the alert; confirm.
GeneratorURL string `json:"generatorURL,omitempty"`
// NOTE(review): presumably the names of the receivers this alert is meant
// for; confirm against the producer.
Receivers []string `json:"receivers,omitempty"`
}
// Name returns the name of the alert, i.e. the value stored in its labels
// under labels.AlertNameLabel.
func (a *Alert) Name() string {
return a.Labels.Get(labels.AlertNameLabel)
}
// Hash returns a hash over the alert. It is exactly the hash of the alert's
// labels; annotations and timestamps do not contribute.
func (a *Alert) Hash() uint64 {
return a.Labels.Hash()
}
// String renders the alert as "name[hash][receivers]" followed by "[resolved]"
// or "[active]". The hash shown is the first seven characters of the
// zero-padded 16-digit hexadecimal label hash.
func (a *Alert) String() string {
	shortHash := fmt.Sprintf("%016x", a.Hash())[:7]
	prefix := fmt.Sprintf("%s[%s][%s]", a.Name(), shortHash, a.Receivers)

	state := "[active]"
	if a.Resolved() {
		state = "[resolved]"
	}
	return prefix + state
}
// Resolved returns true iff the alert is resolved as of the current time, as
// decided by ResolvedAt(time.Now()).
func (a *Alert) Resolved() bool {
return a.ResolvedAt(time.Now())
}
// ResolvedAt returns true iff the activity interval had ended at or before the
// given timestamp. An alert whose EndsAt is the zero time is never resolved.
func (a *Alert) ResolvedAt(ts time.Time) bool {
	hasEnd := !a.EndsAt.IsZero()
	return hasEnd && !a.EndsAt.After(ts)
}

View File

@@ -1,310 +0,0 @@
package alertManager
import (
"bytes"
"context"
"encoding/json"
"fmt"
"sync/atomic"
"net/http"
"net/url"
"sync"
"time"
old_ctx "golang.org/x/net/context"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"go.uber.org/zap"
"golang.org/x/net/context/ctxhttp"
)
const (
alertPushEndpoint = "v1/alerts"
contentTypeJSON = "application/json"
)
// Notifier is responsible for dispatching alert notifications to an
// alert manager service.
type Notifier struct {
// queue holds alerts accepted by Send that have not been batched yet.
queue []*Alert
// opts are the options the Notifier was created with.
opts *NotifierOptions
// more is a one-slot signal channel telling Run that the queue has items.
more chan struct{}
// mtx guards queue and the alertmanagers pointer.
mtx sync.RWMutex
// ctx is cancelled by Stop (via cancel); Run returns once it is done.
ctx context.Context
cancel func()
// alertmanagers is the set of alertmanager clients alerts are sent to.
alertmanagers *alertmanagerSet
logger log.Logger
}
// NotifierOptions are the configurable parameters of a Notifier.
type NotifierOptions struct {
// QueueCapacity is the maximum number of alerts kept in the queue; Send drops
// alerts beyond it.
QueueCapacity int
// Used for sending HTTP requests to the Alertmanager.
// NewNotifier sets it to ctxhttp.Do when nil.
Do func(ctx old_ctx.Context, client *http.Client, req *http.Request) (*http.Response, error)
// List of alert manager urls
AlertManagerURLs []string
// timeout limit on requests; NewNotifier substitutes 30 seconds when zero.
Timeout time.Duration
}
// String returns the configured alert manager URLs concatenated into a single
// string, each one preceded by a space (so a non-empty result starts with a
// space).
func (opts *NotifierOptions) String() string {
	joined := ""
	for i := range opts.AlertManagerURLs {
		joined = joined + " " + opts.AlertManagerURLs[i]
	}
	return joined
}
// todo(amol): add metrics

// NewNotifier creates a Notifier for the alert managers listed in o.
//
// Missing pieces are defaulted: o.Do becomes ctxhttp.Do (written back into o),
// a nil logger becomes a no-op logger, and a zero timeout becomes 30 seconds.
// When the alertmanager set cannot be built, the error is returned together
// with the partially initialised Notifier, whose alertmanager set is unset.
func NewNotifier(o *NotifierOptions, logger log.Logger) (*Notifier, error) {
	ctx, cancel := context.WithCancel(context.Background())

	if o.Do == nil {
		o.Do = ctxhttp.Do
	}
	if logger == nil {
		logger = log.NewNopLogger()
	}

	notifier := &Notifier{
		queue:  make([]*Alert, 0, o.QueueCapacity),
		opts:   o,
		more:   make(chan struct{}, 1),
		ctx:    ctx,
		cancel: cancel,
		logger: logger,
	}

	requestTimeout := o.Timeout
	if requestTimeout == 0 {
		requestTimeout = 30 * time.Second
	}

	set, err := newAlertmanagerSet(o.AlertManagerURLs, requestTimeout, logger)
	if err != nil {
		zap.L().Error("failed to parse alert manager urls")
		return notifier, err
	}
	notifier.alertmanagers = set

	zap.L().Info("Starting notifier with alert manager", zap.Strings("urls", o.AlertManagerURLs))
	return notifier, nil
}
const maxBatchSize = 64
// queueLen returns the number of alerts currently queued, read under the
// Notifier's read lock.
func (n *Notifier) queueLen() int {
	n.mtx.RLock()
	pending := len(n.queue)
	n.mtx.RUnlock()
	return pending
}
// nextBatch removes up to maxBatchSize alerts from the front of the queue and
// returns them as a freshly allocated slice, so the result does not alias the
// queue's backing array. When the whole queue fits in one batch, the queue is
// truncated in place and keeps its backing array.
func (n *Notifier) nextBatch() []*Alert {
	n.mtx.Lock()
	defer n.mtx.Unlock()

	if pending := len(n.queue); pending <= maxBatchSize {
		batch := make([]*Alert, 0, pending)
		batch = append(batch, n.queue...)
		n.queue = n.queue[:0]
		return batch
	}

	batch := make([]*Alert, 0, maxBatchSize)
	batch = append(batch, n.queue[:maxBatchSize]...)
	n.queue = n.queue[maxBatchSize:]
	return batch
}
// Run dispatches queued alerts until the Notifier's context is cancelled.
// Each wake-up signal sends one batch; a batch that reaches no alert manager is
// logged as dropped. If alerts remain queued afterwards, Run signals itself to
// process the next batch.
func (n *Notifier) Run() {
	zap.L().Info("msg: Initiating alert notifier...")

	for {
		select {
		case <-n.ctx.Done():
			return
		case <-n.more:
			batch := n.nextBatch()

			if delivered := n.sendAll(batch...); !delivered {
				zap.L().Warn("msg: dropped alerts", zap.Int("count", len(batch)))
				// n.metrics.dropped.Add(float64(len(alerts)))
			}

			// More alerts arrived or were left over: schedule another pass.
			if n.queueLen() > 0 {
				n.setMore()
			}
		}
	}
}
// Send appends the given alerts to the queue and wakes the dispatch loop.
//
// The queue never grows beyond the configured capacity. A batch larger than
// the capacity loses its leading alerts, and when the queue would overflow, the
// oldest queued alerts are discarded in favour of the new ones. Both cases are
// logged as warnings.
func (n *Notifier) Send(alerts ...*Alert) {
	n.mtx.Lock()
	defer n.mtx.Unlock()

	capacity := n.opts.QueueCapacity

	// The capacity is expected to be far larger than any single batch; if it
	// is not, keep only the tail of the batch.
	if excess := len(alerts) - capacity; excess > 0 {
		alerts = alerts[excess:]
		level.Warn(n.logger).Log("msg", "Alert batch larger than queue capacity, dropping alerts", "num_dropped", excess)
		//n.metrics.dropped.Add(float64(d))
	}

	// Make room for the new alerts by discarding the oldest queued ones.
	if overflow := len(n.queue) + len(alerts) - capacity; overflow > 0 {
		n.queue = n.queue[overflow:]
		level.Warn(n.logger).Log("msg", "Alert notification queue full, dropping alerts", "num_dropped", overflow)
		//n.metrics.dropped.Add(float64(d))
	}

	n.queue = append(n.queue, alerts...)

	// Tell the dispatch loop there is work to do.
	n.setMore()
}
// setMore signals that the alert queue has items.
// The send is non-blocking: when the signal cannot be delivered immediately the
// call does nothing.
func (n *Notifier) setMore() {
// If we cannot send on the channel, it means the signal already exists
// and has not been consumed yet.
select {
case n.more <- struct{}{}:
default:
}
}
// Alertmanagers returns, for every configured alert manager, the URL that
// alerts are pushed to (its base URL resolved against the alert push endpoint).
// The result is nil when no alert manager is configured.
func (n *Notifier) Alertmanagers() []*url.URL {
	n.mtx.RLock()
	set := n.alertmanagers
	n.mtx.RUnlock()

	set.mtx.RLock()
	defer set.mtx.RUnlock()

	var endpoints []*url.URL
	for i := range set.ams {
		endpoints = append(endpoints, set.ams[i].URLPath(alertPushEndpoint))
	}
	return endpoints
}
// sendAll sends the alerts to all configured Alertmanagers concurrently.
// It returns true if the alerts could be sent successfully to at least one
// Alertmanager, and false when encoding fails, when no alertmanager set is
// configured, or when every send fails.
//
// Each send runs under its own timeout context derived from the Notifier's
// context. The context is created and released inside the sending goroutine, so
// its timer is freed as soon as that send finishes.
func (n *Notifier) sendAll(alerts ...*Alert) bool {
	b, err := json.Marshal(alerts)
	if err != nil {
		zap.L().Error("Encoding alerts failed", zap.Error(err))
		return false
	}

	n.mtx.RLock()
	ams := n.alertmanagers
	n.mtx.RUnlock()

	// NewNotifier can hand back a Notifier without an alertmanager set (it
	// returns the Notifier alongside the error); treat that as "nothing sent"
	// instead of dereferencing nil.
	if ams == nil {
		return false
	}

	var (
		wg         sync.WaitGroup
		numSuccess uint64
	)

	ams.mtx.RLock()
	for _, am := range ams.ams {
		wg.Add(1)

		go func(ams *alertmanagerSet, am Manager) {
			// Deferred so the WaitGroup is released on every exit path.
			defer wg.Done()

			ctx, cancel := context.WithTimeout(n.ctx, ams.timeout)
			defer cancel()

			u := am.URLPath(alertPushEndpoint).String()
			if err := n.sendOne(ctx, ams.client, u, b); err != nil {
				zap.L().Error("Error calling alert API", zap.String("alertmanager", u), zap.Int("count", len(alerts)), zap.Error(err))
				return
			}
			atomic.AddUint64(&numSuccess, 1)
			// n.metrics.latency.WithLabelValues(u).Observe(time.Since(begin).Seconds())
			// n.metrics.sent.WithLabelValues(u).Add(float64(len(alerts)))
		}(ams, am)
	}
	ams.mtx.RUnlock()

	wg.Wait()

	return atomic.LoadUint64(&numSuccess) > 0
}
// sendOne POSTs the JSON payload b to url using the configured Do function and
// the given client. It returns an error when the request cannot be built or
// sent, or when the response status is not in the 2xx range.
func (n *Notifier) sendOne(ctx context.Context, c *http.Client, url string, b []byte) error {
	req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(b))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", contentTypeJSON)

	resp, err := n.opts.Do(ctx, c, req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	// Only 2xx statuses count as success.
	if code := resp.StatusCode; code < 200 || code > 299 {
		return fmt.Errorf("bad response status %v", resp.Status)
	}
	return nil
}
// Stop shuts down the notification handler: it logs the shutdown and cancels
// the Notifier's context, which makes Run return.
func (n *Notifier) Stop() {
level.Info(n.logger).Log("msg", "Stopping notification manager...")
n.cancel()
}
// alertmanagerSet holds the Manager clients built from a list of alertmanager
// URLs, together with the HTTP client and request timeout shared by all sends
// to them.
type alertmanagerSet struct {
// urls is the list the set was created from, including any that failed to parse.
urls []string
client *http.Client
// timeout bounds each individual send (see Notifier.sendAll).
timeout time.Duration
// mtx guards ams.
mtx sync.RWMutex
ams []Manager
logger log.Logger
}
// newAlertmanagerSet builds an alertmanagerSet with one Manager per URL in
// urls. A URL for which no Manager can be created is logged and skipped.
//
// When no Manager could be created at all, it returns an error together with
// the set (whose ams field is left unset).
func newAlertmanagerSet(urls []string, timeout time.Duration, logger log.Logger) (*alertmanagerSet, error) {
	s := &alertmanagerSet{
		client:  &http.Client{},
		urls:    urls,
		logger:  logger,
		timeout: timeout,
	}

	ams := make([]Manager, 0, len(urls))
	for _, u := range urls {
		am, err := New(WithURL(u))
		if err != nil {
			// go-kit loggers take alternating key/value pairs; a single
			// pre-formatted string would be recorded as a key with a missing
			// value.
			level.Error(s.logger).Log("msg", "invalid alert manager url", "url", u, "err", err)
			continue
		}
		ams = append(ams, am)
	}

	if len(ams) == 0 {
		return s, fmt.Errorf("no alert managers")
	}
	s.ams = ams
	return s, nil
}

View File

@@ -4,8 +4,6 @@ import (
"context"
"flag"
"os"
"os/signal"
"syscall"
"time"
prommodel "github.com/prometheus/common/model"
@@ -94,7 +92,14 @@ func main() {
zap.L().Fatal("Failed to create config", zap.Error(err))
}
signoz, err := signoz.New(context.Background(), config, signoz.NewProviderConfig())
signoz, err := signoz.New(
context.Background(),
config,
signoz.NewCacheProviderFactories(),
signoz.NewWebProviderFactories(),
signoz.NewSQLStoreProviderFactories(),
signoz.NewTelemetryStoreProviderFactories(),
)
if err != nil {
zap.L().Fatal("Failed to create signoz struct", zap.Error(err))
}
@@ -142,22 +147,20 @@ func main() {
logger.Fatal("Failed to initialize auth cache", zap.Error(err))
}
signalsChannel := make(chan os.Signal, 1)
signal.Notify(signalsChannel, os.Interrupt, syscall.SIGTERM)
signoz.Start(context.Background())
for {
select {
case status := <-server.HealthCheckStatus():
logger.Info("Received HealthCheck status: ", zap.Int("status", int(status)))
case <-signalsChannel:
logger.Info("Received OS Interrupt Signal ... ")
err := server.Stop()
if err != nil {
logger.Fatal("Failed to stop server", zap.Error(err))
}
logger.Info("Server stopped")
return
}
if err := signoz.Wait(context.Background()); err != nil {
zap.L().Fatal("Failed to start signoz", zap.Error(err))
}
err = server.Stop()
if err != nil {
zap.L().Fatal("Failed to stop server", zap.Error(err))
}
err = signoz.Stop(context.Background())
if err != nil {
zap.L().Fatal("Failed to stop signoz", zap.Error(err))
}
}

View File

@@ -80,7 +80,6 @@ func parsePostableRule(content []byte, kind RuleDataKind) (*PostableRule, error)
// parseIntoRule loads the content (data) into PostableRule and also
// validates the end result
func parseIntoRule(initRule PostableRule, content []byte, kind RuleDataKind) (*PostableRule, error) {
rule := &initRule
var err error

View File

@@ -13,6 +13,7 @@ import (
"go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
qslabels "go.signoz.io/signoz/pkg/query-service/utils/labels"
"go.signoz.io/signoz/pkg/sqlstore"
"go.uber.org/zap"
)
@@ -78,6 +79,8 @@ type BaseRule struct {
// querying the v4 table on low cardinal temporality column
// should be fast but we can still avoid the query if we have the data in memory
TemporalityMap map[string]map[v3.Temporality]bool
sqlstore sqlstore.SQLStore
}
type RuleOption func(*BaseRule)
@@ -106,6 +109,12 @@ func WithLogger(logger *zap.Logger) RuleOption {
}
}
// WithSQLStore returns a RuleOption that sets the SQL store on the BaseRule it
// is applied to.
func WithSQLStore(sqlstore sqlstore.SQLStore) RuleOption {
	return func(rule *BaseRule) {
		rule.sqlstore = sqlstore
	}
}
func NewBaseRule(id string, p *PostableRule, reader interfaces.Reader, opts ...RuleOption) (*BaseRule, error) {
if p.RuleCondition == nil || !p.RuleCondition.IsValid() {
return nil, fmt.Errorf("invalid rule condition")
@@ -309,6 +318,20 @@ func (r *BaseRule) ActiveAlerts() []*Alert {
}
func (r *BaseRule) SendAlerts(ctx context.Context, ts time.Time, resendDelay time.Duration, interval time.Duration, notifyFunc NotifyFunc) {
var orgID string
err := r.
sqlstore.
BunDB().
NewSelect().
Table("organizations").
ColumnExpr("id").
Limit(1).
Scan(ctx, &orgID)
if err != nil {
r.logger.Error("failed to get org ids", zap.Error(err))
return
}
alerts := []*Alert{}
r.ForEachActiveAlert(func(alert *Alert) {
if alert.needsSending(ts, resendDelay) {
@@ -322,7 +345,7 @@ func (r *BaseRule) SendAlerts(ctx context.Context, ts time.Time, resendDelay tim
alerts = append(alerts, &anew)
}
})
notifyFunc(ctx, "", alerts...)
notifyFunc(ctx, orgID, "", alerts...)
}
func (r *BaseRule) ForEachActiveAlert(f func(*Alert)) {

View File

@@ -11,30 +11,24 @@ import (
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
"go.signoz.io/signoz/pkg/query-service/common"
am "go.signoz.io/signoz/pkg/query-service/integrations/alertManager"
"github.com/uptrace/bun"
"go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/sqlstore"
"go.signoz.io/signoz/pkg/types/authtypes"
"go.uber.org/zap"
)
// Data store to capture user alert rule settings
type RuleDB interface {
GetChannel(id string) (*model.ChannelItem, *model.ApiError)
GetChannels() (*[]model.ChannelItem, *model.ApiError)
DeleteChannel(id string) *model.ApiError
CreateChannel(receiver *am.Receiver) (*am.Receiver, *model.ApiError)
EditChannel(receiver *am.Receiver, id string) (*am.Receiver, *model.ApiError)
// CreateRuleTx stores rule in the db and returns tx and group name (on success)
CreateRuleTx(ctx context.Context, rule string) (int64, Tx, error)
// CreateRule stores rule in the db and returns tx and group name (on success)
CreateRule(context.Context, *StoredRule, func(context.Context, int64) error) (int64, error)
// EditRuleTx updates the given rule in the db and returns tx and group name (on success)
EditRuleTx(ctx context.Context, rule string, id string) (string, Tx, error)
EditRule(context.Context, *StoredRule, func(context.Context) error) error
// DeleteRuleTx deletes the given rule in the db and returns tx and group name (on success)
DeleteRuleTx(ctx context.Context, id string) (string, Tx, error)
DeleteRule(context.Context, string, func(context.Context) error) error
// GetStoredRules fetches the rule definitions from db
GetStoredRules(ctx context.Context) ([]StoredRule, error)
@@ -62,142 +56,83 @@ type RuleDB interface {
}
type StoredRule struct {
Id int `json:"id" db:"id"`
CreatedAt *time.Time `json:"created_at" db:"created_at"`
CreatedBy *string `json:"created_by" db:"created_by"`
UpdatedAt *time.Time `json:"updated_at" db:"updated_at"`
UpdatedBy *string `json:"updated_by" db:"updated_by"`
Data string `json:"data" db:"data"`
}
bun.BaseModel `bun:"rules"`
type Tx interface {
Commit() error
Rollback() error
Id int `json:"id" db:"id" bun:"id,pk,autoincrement"`
CreatedAt *time.Time `json:"created_at" db:"created_at" bun:"created_at"`
CreatedBy *string `json:"created_by" db:"created_by" bun:"created_by"`
UpdatedAt *time.Time `json:"updated_at" db:"updated_at" bun:"updated_at"`
UpdatedBy *string `json:"updated_by" db:"updated_by" bun:"updated_by"`
Data string `json:"data" db:"data" bun:"data"`
}
type ruleDB struct {
*sqlx.DB
alertManager am.Manager
sqlstore sqlstore.SQLStore
}
// todo: move init methods for creating tables
func NewRuleDB(db *sqlx.DB, alertManager am.Manager) RuleDB {
return &ruleDB{
db,
alertManager,
}
func NewRuleDB(db *sqlx.DB, sqlstore sqlstore.SQLStore) RuleDB {
return &ruleDB{db, sqlstore}
}
// CreateRuleTx stores a given rule in db and returns task name,
// sql tx and error (if any)
func (r *ruleDB) CreateRuleTx(ctx context.Context, rule string) (int64, Tx, error) {
var lastInsertId int64
// CreateRule stores a given rule in db, invokes cb with the inserted rule id inside the same RunInTxCtx callback, and returns the rule id and error (if any)
func (r *ruleDB) CreateRule(ctx context.Context, storedRule *StoredRule, cb func(context.Context, int64) error) (int64, error) {
err := r.sqlstore.RunInTxCtx(ctx, nil, func(ctx context.Context) error {
_, err := r.sqlstore.
BunDBCtx(ctx).
NewInsert().
Model(storedRule).
Exec(ctx)
if err != nil {
return err
}
return cb(ctx, int64(storedRule.Id))
})
var userEmail string
if user := common.GetUserFromContext(ctx); user != nil {
userEmail = user.Email
}
createdAt := time.Now()
updatedAt := time.Now()
tx, err := r.Begin()
if err != nil {
return lastInsertId, nil, err
return 0, err
}
stmt, err := tx.Prepare(`INSERT into rules (created_at, created_by, updated_at, updated_by, data) VALUES($1,$2,$3,$4,$5);`)
if err != nil {
zap.L().Error("Error in preparing statement for INSERT to rules", zap.Error(err))
tx.Rollback()
return lastInsertId, nil, err
}
defer stmt.Close()
result, err := stmt.Exec(createdAt, userEmail, updatedAt, userEmail, rule)
if err != nil {
zap.L().Error("Error in Executing prepared statement for INSERT to rules", zap.Error(err))
tx.Rollback() // return an error too, we may want to wrap them
return lastInsertId, nil, err
}
lastInsertId, err = result.LastInsertId()
if err != nil {
zap.L().Error("Error in getting last insert id for INSERT to rules\n", zap.Error(err))
tx.Rollback() // return an error too, we may want to wrap them
return lastInsertId, nil, err
}
return lastInsertId, tx, nil
return int64(storedRule.Id), nil
}
// EditRuleTx stores a given rule string in database and returns
// task name, sql tx and error (if any)
func (r *ruleDB) EditRuleTx(ctx context.Context, rule string, id string) (string, Tx, error) {
// EditRule updates the given stored rule in the database (matched by primary key), then invokes cb inside the same RunInTxCtx callback; it returns an error (if any)
func (r *ruleDB) EditRule(ctx context.Context, storedRule *StoredRule, cb func(context.Context) error) error {
return r.sqlstore.RunInTxCtx(ctx, nil, func(ctx context.Context) error {
_, err := r.sqlstore.
BunDBCtx(ctx).
NewUpdate().
Model(storedRule).
WherePK().
Exec(ctx)
if err != nil {
return err
}
var groupName string
idInt, _ := strconv.Atoi(id)
if idInt == 0 {
return groupName, nil, fmt.Errorf("failed to read alert id from parameters")
}
var userEmail string
if user := common.GetUserFromContext(ctx); user != nil {
userEmail = user.Email
}
updatedAt := time.Now()
groupName = prepareTaskName(int64(idInt))
// todo(amol): resolve this error - database locked when using
// edit transaction with sqlx
// tx, err := r.Begin()
//if err != nil {
// return groupName, tx, err
//}
stmt, err := r.Prepare(`UPDATE rules SET updated_by=$1, updated_at=$2, data=$3 WHERE id=$4;`)
if err != nil {
zap.L().Error("Error in preparing statement for UPDATE to rules", zap.Error(err))
// tx.Rollback()
return groupName, nil, err
}
defer stmt.Close()
if _, err := stmt.Exec(userEmail, updatedAt, rule, idInt); err != nil {
zap.L().Error("Error in Executing prepared statement for UPDATE to rules", zap.Error(err))
// tx.Rollback() // return an error too, we may want to wrap them
return groupName, nil, err
}
return groupName, nil, nil
return cb(ctx)
})
}
// DeleteRuleTx deletes a given rule with id and returns
// taskname, sql tx and error (if any)
func (r *ruleDB) DeleteRuleTx(ctx context.Context, id string) (string, Tx, error) {
// DeleteRule deletes the rule with the given id, then invokes cb inside the same RunInTxCtx callback; it returns an error (if any)
func (r *ruleDB) DeleteRule(ctx context.Context, id string, cb func(context.Context) error) error {
if err := r.sqlstore.RunInTxCtx(ctx, nil, func(ctx context.Context) error {
_, err := r.sqlstore.
BunDBCtx(ctx).
NewDelete().
Model(&StoredRule{}).
Where("id = ?", id).
Exec(ctx)
if err != nil {
return err
}
idInt, _ := strconv.Atoi(id)
groupName := prepareTaskName(int64(idInt))
// commented as this causes db locked error
// tx, err := r.Begin()
// if err != nil {
// return groupName, tx, err
// }
stmt, err := r.Prepare(`DELETE FROM rules WHERE id=$1;`)
if err != nil {
return groupName, nil, err
return cb(ctx)
}); err != nil {
return err
}
defer stmt.Close()
if _, err := stmt.Exec(idInt); err != nil {
zap.L().Error("Error in Executing prepared statement for DELETE to rules", zap.Error(err))
// tx.Rollback()
return groupName, nil, err
}
return groupName, nil, nil
return nil
}
func (r *ruleDB) GetStoredRules(ctx context.Context) ([]StoredRule, error) {
@@ -320,114 +255,7 @@ func (r *ruleDB) EditPlannedMaintenance(ctx context.Context, maintenance Planned
return "", nil
}
// getChannelType reports the channel type name for receiver based on which
// integration config list is set. The configs are probed in a fixed order and
// the first non-nil one decides the result; an empty string is returned when
// none of them is set.
func getChannelType(receiver *am.Receiver) string {
	switch {
	case receiver.EmailConfigs != nil:
		return "email"
	case receiver.OpsGenieConfigs != nil:
		return "opsgenie"
	case receiver.PagerdutyConfigs != nil:
		return "pagerduty"
	case receiver.PushoverConfigs != nil:
		return "pushover"
	case receiver.SNSConfigs != nil:
		return "sns"
	case receiver.SlackConfigs != nil:
		return "slack"
	case receiver.VictorOpsConfigs != nil:
		return "victorops"
	case receiver.WebhookConfigs != nil:
		return "webhook"
	case receiver.WechatConfigs != nil:
		return "wechat"
	case receiver.MSTeamsConfigs != nil:
		return "msteams"
	default:
		return ""
	}
}
// GetChannel loads the notification channel row with the given id from the
// notification_channels table.
//
// Any failure to prepare or run the query (including a missing row) is logged
// and returned as an ApiError of type ErrorInternal.
func (r *ruleDB) GetChannel(id string) (*model.ChannelItem, *model.ApiError) {
	// The Atoi error is ignored: a non-numeric id falls through to the lookup
	// below with whatever value Atoi produced (0 for syntax errors).
	idInt, _ := strconv.Atoi(id)
	channel := model.ChannelItem{}

	query := "SELECT id, created_at, updated_at, name, type, data data FROM notification_channels WHERE id=?;"

	stmt, err := r.Preparex(query)
	if err != nil {
		zap.L().Error("Error in preparing sql query for GetChannel", zap.Error(err))
		return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}
	// Release the prepared statement once the lookup is done; previously it
	// was never closed.
	defer stmt.Close()

	err = stmt.Get(&channel, idInt)
	if err != nil {
		zap.L().Error("Error in getting channel with id", zap.Int("id", idInt), zap.Error(err))
		return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}

	return &channel, nil
}
// DeleteChannel removes the notification channel with the given id and the
// matching alert manager route.
//
// The channel is first loaded through GetChannel (its name is needed for
// DeleteRoute). The row deletion and the DeleteRoute call share one
// transaction: if DeleteRoute fails the deletion is rolled back. Database
// failures are returned as ErrorInternal; a DeleteRoute failure is returned
// as-is.
func (r *ruleDB) DeleteChannel(id string) *model.ApiError {
	// The Atoi error is ignored here; GetChannel below performs the same
	// conversion and fails the lookup if no such row exists.
	idInt, _ := strconv.Atoi(id)

	channelToDelete, apiErrorObj := r.GetChannel(id)
	if apiErrorObj != nil {
		return apiErrorObj
	}

	tx, err := r.Begin()
	if err != nil {
		return &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}

	{
		stmt, err := tx.Prepare(`DELETE FROM notification_channels WHERE id=$1;`)
		if err != nil {
			// Log text now names the statement actually being run (DELETE).
			zap.L().Error("Error in preparing statement for DELETE to notification_channels", zap.Error(err))
			tx.Rollback()
			return &model.ApiError{Typ: model.ErrorInternal, Err: err}
		}
		defer stmt.Close()

		if _, err := stmt.Exec(idInt); err != nil {
			zap.L().Error("Error in Executing prepared statement for DELETE to notification_channels", zap.Error(err))
			tx.Rollback() // return an error too, we may want to wrap them
			return &model.ApiError{Typ: model.ErrorInternal, Err: err}
		}
	}

	// Drop the route before committing so a failure keeps the row in place.
	apiError := r.alertManager.DeleteRoute(channelToDelete.Name)
	if apiError != nil {
		tx.Rollback()
		return apiError
	}

	err = tx.Commit()
	if err != nil {
		zap.L().Error("Error in committing transaction for DELETE command to notification_channels", zap.Error(err))
		return &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}

	return nil
}
func (r *ruleDB) GetChannels() (*[]model.ChannelItem, *model.ApiError) {
func (r *ruleDB) getChannels() (*[]model.ChannelItem, *model.ApiError) {
channels := []model.ChannelItem{}
query := "SELECT id, created_at, updated_at, name, type, data data FROM notification_channels"
@@ -442,105 +270,6 @@ func (r *ruleDB) GetChannels() (*[]model.ChannelItem, *model.ApiError) {
}
return &channels, nil
}
// EditChannel updates the stored notification channel identified by id with
// receiver and pushes the change to the alert manager through EditRoute.
//
// The channel name cannot be changed: a receiver whose Name differs from the
// stored one is rejected with ErrorBadData. The row update and the EditRoute
// call share one transaction, so a failed EditRoute rolls the row update back.
// Database and serialization failures are returned as ErrorInternal. On
// success the receiver passed in is returned.
func (r *ruleDB) EditChannel(receiver *am.Receiver, id string) (*am.Receiver, *model.ApiError) {
	// The Atoi error is ignored here; GetChannel below performs the same
	// conversion and fails the lookup if no such row exists.
	idInt, _ := strconv.Atoi(id)

	channel, apiErrObj := r.GetChannel(id)
	if apiErrObj != nil {
		return nil, apiErrObj
	}

	if channel.Name != receiver.Name {
		return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("channel name cannot be changed")}
	}

	// Serialize before opening the transaction: a marshal failure is reported
	// to the caller instead of silently storing empty data, and no
	// transaction is left open on that path.
	channelType := getChannelType(receiver)
	receiverString, err := json.Marshal(receiver)
	if err != nil {
		zap.L().Error("Error in marshalling receiver for UPDATE to notification_channels", zap.Error(err))
		return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}

	tx, err := r.Begin()
	if err != nil {
		return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}

	{
		stmt, err := tx.Prepare(`UPDATE notification_channels SET updated_at=$1, type=$2, data=$3 WHERE id=$4;`)
		if err != nil {
			zap.L().Error("Error in preparing statement for UPDATE to notification_channels", zap.Error(err))
			tx.Rollback()
			return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
		}
		defer stmt.Close()

		if _, err := stmt.Exec(time.Now(), channelType, string(receiverString), idInt); err != nil {
			zap.L().Error("Error in Executing prepared statement for UPDATE to notification_channels", zap.Error(err))
			tx.Rollback() // return an error too, we may want to wrap them
			return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
		}
	}

	// Update the route before committing so a failure keeps the old row.
	apiError := r.alertManager.EditRoute(receiver)
	if apiError != nil {
		tx.Rollback()
		return nil, apiError
	}

	err = tx.Commit()
	if err != nil {
		// Log text now names the statement actually being committed (UPDATE).
		zap.L().Error("Error in committing transaction for UPDATE to notification_channels", zap.Error(err))
		return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}

	return receiver, nil
}
// CreateChannel stores receiver as a new row in notification_channels and
// registers it with the alert manager through AddRoute.
//
// The insert and the AddRoute call share one transaction, so a failed
// AddRoute rolls the insert back. Database and serialization failures are
// returned as ErrorInternal; an AddRoute failure is returned as-is. On
// success the receiver passed in is returned.
func (r *ruleDB) CreateChannel(receiver *am.Receiver) (*am.Receiver, *model.ApiError) {
	// Serialize before opening the transaction: a marshal failure is reported
	// to the caller instead of silently storing empty data.
	channelType := getChannelType(receiver)
	receiverString, err := json.Marshal(receiver)
	if err != nil {
		zap.L().Error("Error in marshalling receiver for INSERT to notification_channels", zap.Error(err))
		return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}

	tx, err := r.Begin()
	if err != nil {
		return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}

	{
		stmt, err := tx.Prepare(`INSERT INTO notification_channels (created_at, updated_at, name, type, data) VALUES($1,$2,$3,$4,$5);`)
		if err != nil {
			zap.L().Error("Error in preparing statement for INSERT to notification_channels", zap.Error(err))
			tx.Rollback()
			return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
		}
		defer stmt.Close()

		// One timestamp for both columns so created_at and updated_at are
		// identical on a freshly created row.
		now := time.Now()
		if _, err := stmt.Exec(now, now, receiver.Name, channelType, string(receiverString)); err != nil {
			zap.L().Error("Error in Executing prepared statement for INSERT to notification_channels", zap.Error(err))
			tx.Rollback() // return an error too, we may want to wrap them
			return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
		}
	}

	// Register the route before committing so a failure discards the insert.
	apiError := r.alertManager.AddRoute(receiver)
	if apiError != nil {
		tx.Rollback()
		return nil, apiError
	}

	err = tx.Commit()
	if err != nil {
		zap.L().Error("Error in committing transaction for INSERT to notification_channels", zap.Error(err))
		return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}

	return receiver, nil
}
func (r *ruleDB) GetAlertsInfo(ctx context.Context) (*model.AlertsInfo, error) {
@@ -629,7 +358,7 @@ func (r *ruleDB) GetAlertsInfo(ctx context.Context) (*model.AlertsInfo, error) {
}
alertsInfo.AlertNames = alertNames
channels, _ := r.GetChannels()
channels, _ := r.getChannels()
if channels != nil {
alertsInfo.TotalChannels = len(*channels)
for _, channel := range *channels {

View File

@@ -14,41 +14,45 @@ import (
"errors"
"github.com/go-openapi/strfmt"
"github.com/jmoiron/sqlx"
"go.signoz.io/signoz/pkg/alertmanager"
"go.signoz.io/signoz/pkg/query-service/cache"
am "go.signoz.io/signoz/pkg/query-service/integrations/alertManager"
"go.signoz.io/signoz/pkg/query-service/interfaces"
"go.signoz.io/signoz/pkg/query-service/model"
pqle "go.signoz.io/signoz/pkg/query-service/pqlEngine"
"go.signoz.io/signoz/pkg/query-service/telemetry"
"go.signoz.io/signoz/pkg/sqlstore"
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
"go.signoz.io/signoz/pkg/types/authtypes"
)
type PrepareTaskOptions struct {
Rule *PostableRule
TaskName string
RuleDB RuleDB
Logger *zap.Logger
Reader interfaces.Reader
Cache cache.Cache
FF interfaces.FeatureLookup
ManagerOpts *ManagerOptions
NotifyFunc NotifyFunc
Rule *PostableRule
TaskName string
RuleDB RuleDB
Logger *zap.Logger
Reader interfaces.Reader
Cache cache.Cache
FF interfaces.FeatureLookup
ManagerOpts *ManagerOptions
NotifyFunc NotifyFunc
SQLStore sqlstore.SQLStore
UseLogsNewSchema bool
UseTraceNewSchema bool
}
type PrepareTestRuleOptions struct {
Rule *PostableRule
RuleDB RuleDB
Logger *zap.Logger
Reader interfaces.Reader
Cache cache.Cache
FF interfaces.FeatureLookup
ManagerOpts *ManagerOptions
NotifyFunc NotifyFunc
Rule *PostableRule
RuleDB RuleDB
Logger *zap.Logger
Reader interfaces.Reader
Cache cache.Cache
FF interfaces.FeatureLookup
ManagerOpts *ManagerOptions
NotifyFunc NotifyFunc
SQLStore sqlstore.SQLStore
UseLogsNewSchema bool
UseTraceNewSchema bool
}
@@ -72,8 +76,7 @@ func prepareTaskName(ruleId interface{}) string {
// ManagerOptions bundles options for the Manager.
type ManagerOptions struct {
NotifierOpts am.NotifierOptions
PqlEngine *pqle.PqlEngine
PqlEngine *pqle.PqlEngine
// RepoURL is used to generate a backlink in sent alert messages
RepoURL string
@@ -96,6 +99,8 @@ type ManagerOptions struct {
UseLogsNewSchema bool
UseTraceNewSchema bool
PrepareTestRuleFunc func(opts PrepareTestRuleOptions) (int, *model.ApiError)
Alertmanager alertmanager.Alertmanager
SQLStore sqlstore.SQLStore
}
// The Manager manages recording and alerting rules.
@@ -105,9 +110,6 @@ type Manager struct {
rules map[string]Rule
mtx sync.RWMutex
block chan struct{}
// Notifier sends messages through alert manager
notifier *am.Notifier
// datastore to store alert definitions
ruleDB RuleDB
@@ -121,15 +123,12 @@ type Manager struct {
UseLogsNewSchema bool
UseTraceNewSchema bool
alertmanager alertmanager.Alertmanager
sqlstore sqlstore.SQLStore
}
func defaultOptions(o *ManagerOptions) *ManagerOptions {
if o.NotifierOpts.QueueCapacity == 0 {
o.NotifierOpts.QueueCapacity = 10000
}
if o.NotifierOpts.Timeout == 0 {
o.NotifierOpts.Timeout = 10 * time.Second
}
if o.ResendDelay == time.Duration(0) {
o.ResendDelay = 1 * time.Minute
}
@@ -161,6 +160,7 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
opts.UseLogsNewSchema,
opts.UseTraceNewSchema,
WithEvalDelay(opts.ManagerOpts.EvalDelay),
WithSQLStore(opts.SQLStore),
)
if err != nil {
@@ -181,6 +181,7 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
opts.Logger,
opts.Reader,
opts.ManagerOpts.PqlEngine,
WithSQLStore(opts.SQLStore),
)
if err != nil {
@@ -202,30 +203,12 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
// NewManager returns an implementation of Manager, ready to be started
// by calling the Run method.
func NewManager(o *ManagerOptions) (*Manager, error) {
o = defaultOptions(o)
// here we just initiate notifier, it will be started
// in run()
notifier, err := am.NewNotifier(&o.NotifierOpts, nil)
if err != nil {
// todo(amol): rethink on this, the query service
// should not be down because alert manager is not available
return nil, err
}
amManager, err := am.New()
if err != nil {
return nil, err
}
db := NewRuleDB(o.DBConn, amManager)
db := NewRuleDB(o.DBConn, o.SQLStore)
telemetry.GetInstance().SetAlertsInfoCallback(db.GetAlertsInfo)
m := &Manager{
tasks: map[string]Task{},
rules: map[string]Rule{},
notifier: notifier,
ruleDB: db,
opts: o,
block: make(chan struct{}),
@@ -235,7 +218,10 @@ func NewManager(o *ManagerOptions) (*Manager, error) {
cache: o.Cache,
prepareTaskFunc: o.PrepareTaskFunc,
prepareTestRuleFunc: o.PrepareTestRuleFunc,
alertmanager: o.Alertmanager,
sqlstore: o.SQLStore,
}
return m, nil
}
@@ -309,9 +295,6 @@ func (m *Manager) initiate() error {
// Run starts processing of the rule manager.
func (m *Manager) run() {
// initiate notifier
go m.notifier.Run()
// initiate blocked tasks
close(m.block)
}
@@ -333,26 +316,51 @@ func (m *Manager) Stop() {
// EditRule writes the rule definition to the
// datastore and also updates the rule executor
func (m *Manager) EditRule(ctx context.Context, ruleStr string, id string) error {
claims, ok := authtypes.ClaimsFromContext(ctx)
if !ok {
return errors.New("claims not found in context")
}
parsedRule, err := ParsePostableRule([]byte(ruleStr))
if err != nil {
return err
}
taskName, _, err := m.ruleDB.EditRuleTx(ctx, ruleStr, id)
existingRule, err := m.ruleDB.GetStoredRule(ctx, id)
if err != nil {
return err
}
if !m.opts.DisableRules {
err = m.syncRuleStateWithTask(taskName, parsedRule)
now := time.Now()
existingRule.UpdatedAt = &now
existingRule.UpdatedBy = &claims.Email
existingRule.Data = ruleStr
return m.ruleDB.EditRule(ctx, existingRule, func(ctx context.Context) error {
cfg, err := m.alertmanager.GetConfig(ctx, claims.OrgID)
if err != nil {
return err
}
}
return nil
err = cfg.UpdateRuleIDMatcher(id, parsedRule.PreferredChannels)
if err != nil {
return err
}
err = m.alertmanager.SetConfig(ctx, cfg)
if err != nil {
return err
}
if !m.opts.DisableRules {
err = m.syncRuleStateWithTask(prepareTaskName(existingRule.Id), parsedRule)
if err != nil {
return err
}
}
return nil
})
}
func (m *Manager) editTask(rule *PostableRule, taskName string) error {
@@ -371,6 +379,7 @@ func (m *Manager) editTask(rule *PostableRule, taskName string) error {
FF: m.featureFlags,
ManagerOpts: m.opts,
NotifyFunc: m.prepareNotifyFunc(),
SQLStore: m.sqlstore,
UseLogsNewSchema: m.opts.UseLogsNewSchema,
UseTraceNewSchema: m.opts.UseTraceNewSchema,
@@ -411,24 +420,45 @@ func (m *Manager) editTask(rule *PostableRule, taskName string) error {
}
func (m *Manager) DeleteRule(ctx context.Context, id string) error {
idInt, err := strconv.Atoi(id)
_, err := strconv.Atoi(id)
if err != nil {
zap.L().Error("delete rule received an rule id in invalid format, must be a number", zap.String("id", id), zap.Error(err))
return fmt.Errorf("delete rule received an rule id in invalid format, must be a number")
}
taskName := prepareTaskName(int64(idInt))
if !m.opts.DisableRules {
m.deleteTask(taskName)
claims, ok := authtypes.ClaimsFromContext(ctx)
if !ok {
return errors.New("claims not found in context")
}
if _, _, err := m.ruleDB.DeleteRuleTx(ctx, id); err != nil {
zap.L().Error("failed to delete the rule from rule db", zap.String("id", id), zap.Error(err))
_, err = m.ruleDB.GetStoredRule(ctx, id)
if err != nil {
return err
}
return nil
return m.ruleDB.DeleteRule(ctx, id, func(ctx context.Context) error {
cfg, err := m.alertmanager.GetConfig(ctx, claims.OrgID)
if err != nil {
return err
}
err = cfg.DeleteRuleIDMatcher(id)
if err != nil {
return err
}
err = m.alertmanager.SetConfig(ctx, cfg)
if err != nil {
return err
}
taskName := prepareTaskName(id)
if !m.opts.DisableRules {
m.deleteTask(taskName)
}
return nil
})
}
func (m *Manager) deleteTask(taskName string) {
@@ -451,32 +481,57 @@ func (m *Manager) deleteTask(taskName string) {
// starts an executor for the rule
func (m *Manager) CreateRule(ctx context.Context, ruleStr string) (*GettableRule, error) {
parsedRule, err := ParsePostableRule([]byte(ruleStr))
if err != nil {
return nil, err
}
lastInsertId, tx, err := m.ruleDB.CreateRuleTx(ctx, ruleStr)
taskName := prepareTaskName(lastInsertId)
if err != nil {
return nil, err
claims, ok := authtypes.ClaimsFromContext(ctx)
if !ok {
return nil, errors.New("claims not found in context")
}
if !m.opts.DisableRules {
if err := m.addTask(parsedRule, taskName); err != nil {
tx.Rollback()
return nil, err
now := time.Now()
storedRule := &StoredRule{
CreatedAt: &now,
CreatedBy: &claims.Email,
UpdatedAt: &now,
UpdatedBy: &claims.Email,
Data: ruleStr,
}
id, err := m.ruleDB.CreateRule(ctx, storedRule, func(ctx context.Context, id int64) error {
cfg, err := m.alertmanager.GetConfig(ctx, claims.OrgID)
if err != nil {
return err
}
}
err = tx.Commit()
err = cfg.CreateRuleIDMatcher(fmt.Sprintf("%d", id), parsedRule.PreferredChannels)
if err != nil {
return err
}
err = m.alertmanager.SetConfig(ctx, cfg)
if err != nil {
return err
}
taskName := prepareTaskName(id)
if !m.opts.DisableRules {
if err := m.addTask(parsedRule, taskName); err != nil {
return err
}
}
return nil
})
if err != nil {
return nil, err
}
gettableRule := &GettableRule{
Id: fmt.Sprintf("%d", lastInsertId),
return &GettableRule{
Id: fmt.Sprintf("%d", id),
PostableRule: *parsedRule,
}
return gettableRule, nil
}, nil
}
func (m *Manager) addTask(rule *PostableRule, taskName string) error {
@@ -494,6 +549,7 @@ func (m *Manager) addTask(rule *PostableRule, taskName string) error {
FF: m.featureFlags,
ManagerOpts: m.opts,
NotifyFunc: m.prepareNotifyFunc(),
SQLStore: m.sqlstore,
UseLogsNewSchema: m.opts.UseLogsNewSchema,
UseTraceNewSchema: m.opts.UseTraceNewSchema,
@@ -594,12 +650,12 @@ func (m *Manager) TriggeredAlerts() []*NamedAlert {
}
// NotifyFunc sends notifications about a set of alerts generated by the given expression.
type NotifyFunc func(ctx context.Context, expr string, alerts ...*Alert)
type NotifyFunc func(ctx context.Context, orgID string, expr string, alerts ...*Alert)
// prepareNotifyFunc implements the NotifyFunc for a Notifier.
func (m *Manager) prepareNotifyFunc() NotifyFunc {
return func(ctx context.Context, expr string, alerts ...*Alert) {
var res []*am.Alert
return func(ctx context.Context, orgID string, expr string, alerts ...*Alert) {
var res []*alertmanagertypes.PostableAlert
for _, alert := range alerts {
generatorURL := alert.GeneratorURL
@@ -607,27 +663,59 @@ func (m *Manager) prepareNotifyFunc() NotifyFunc {
generatorURL = m.opts.RepoURL
}
a := &am.Alert{
StartsAt: alert.FiredAt,
Labels: alert.Labels,
Annotations: alert.Annotations,
GeneratorURL: generatorURL,
Receivers: alert.Receivers,
a := &alertmanagertypes.PostableAlert{
Annotations: alert.Annotations.Map(),
StartsAt: strfmt.DateTime(alert.FiredAt),
Alert: alertmanagertypes.AlertModel{
Labels: alert.Labels.Map(),
GeneratorURL: strfmt.URI(generatorURL),
},
}
if !alert.ResolvedAt.IsZero() {
a.EndsAt = alert.ResolvedAt
a.EndsAt = strfmt.DateTime(alert.ResolvedAt)
} else {
a.EndsAt = alert.ValidUntil
a.EndsAt = strfmt.DateTime(alert.ValidUntil)
}
res = append(res, a)
}
if len(alerts) > 0 {
m.notifier.Send(res...)
m.alertmanager.PutAlerts(ctx, orgID, res)
}
}
}
// prepareTestNotifyFunc builds the NotifyFunc used for test notifications.
//
// Only the first alert of a batch is forwarded: it is converted to a
// PostableAlert and handed to alertmanager.TestAlert together with that
// alert's receivers. An empty batch is a no-op.
func (m *Manager) prepareTestNotifyFunc() NotifyFunc {
	return func(ctx context.Context, orgID string, expr string, alerts ...*Alert) {
		if len(alerts) < 1 {
			return
		}
		first := alerts[0]

		// Fall back to the configured repo URL when the alert carries no
		// generator URL of its own.
		backlink := first.GeneratorURL
		if backlink == "" {
			backlink = m.opts.RepoURL
		}

		postable := &alertmanagertypes.PostableAlert{
			Annotations: first.Annotations.Map(),
			StartsAt:    strfmt.DateTime(first.FiredAt),
			Alert: alertmanagertypes.AlertModel{
				Labels:       first.Labels.Map(),
				GeneratorURL: strfmt.URI(backlink),
			},
		}

		// An unresolved alert ends at ValidUntil; a resolved one ends at the
		// time it was resolved.
		if first.ResolvedAt.IsZero() {
			postable.EndsAt = strfmt.DateTime(first.ValidUntil)
		} else {
			postable.EndsAt = strfmt.DateTime(first.ResolvedAt)
		}

		m.alertmanager.TestAlert(ctx, orgID, postable, first.Receivers)
	}
}
func (m *Manager) ListActiveRules() ([]Rule, error) {
ruleList := []Rule{}
@@ -736,6 +824,10 @@ func (m *Manager) syncRuleStateWithTask(taskName string, rule *PostableRule) err
// - re-deploy or undeploy task as necessary
// - update the patched rule in the DB
func (m *Manager) PatchRule(ctx context.Context, ruleStr string, ruleId string) (*GettableRule, error) {
claims, ok := authtypes.ClaimsFromContext(ctx)
if !ok {
return nil, errors.New("claims not found in context")
}
if ruleId == "" {
return nil, fmt.Errorf("id is mandatory for patching rule")
@@ -775,15 +867,16 @@ func (m *Manager) PatchRule(ctx context.Context, ruleStr string, ruleId string)
return nil, err
}
// write updated rule to db
if _, _, err = m.ruleDB.EditRuleTx(ctx, string(patchedRuleBytes), ruleId); err != nil {
// write failed, rollback task state
now := time.Now()
storedJSON.Data = string(patchedRuleBytes)
storedJSON.UpdatedBy = &claims.Email
storedJSON.UpdatedAt = &now
// restore task state from the stored rule
err = m.ruleDB.EditRule(ctx, storedJSON, func(ctx context.Context) error { return nil })
if err != nil {
if err := m.syncRuleStateWithTask(taskName, &storedRule); err != nil {
zap.L().Error("failed to restore rule after patch failure", zap.String("taskName", taskName), zap.Error(err))
}
return nil, err
}
@@ -822,7 +915,8 @@ func (m *Manager) TestNotification(ctx context.Context, ruleStr string) (int, *m
Cache: m.cache,
FF: m.featureFlags,
ManagerOpts: m.opts,
NotifyFunc: m.prepareNotifyFunc(),
NotifyFunc: m.prepareTestNotifyFunc(),
SQLStore: m.sqlstore,
UseLogsNewSchema: m.opts.UseLogsNewSchema,
UseTraceNewSchema: m.opts.UseTraceNewSchema,
})

View File

@@ -52,6 +52,7 @@ func defaultTestNotification(opts PrepareTestRuleOptions) (int, *model.ApiError)
opts.UseTraceNewSchema,
WithSendAlways(),
WithSendUnmatched(),
WithSQLStore(opts.SQLStore),
)
if err != nil {
@@ -70,6 +71,7 @@ func defaultTestNotification(opts PrepareTestRuleOptions) (int, *model.ApiError)
opts.ManagerOpts.PqlEngine,
WithSendAlways(),
WithSendUnmatched(),
WithSQLStore(opts.SQLStore),
)
if err != nil {

View File

@@ -3,10 +3,12 @@ package signoz
import (
"context"
"fmt"
"net/url"
"os"
"reflect"
"time"
"go.signoz.io/signoz/pkg/alertmanager"
"go.signoz.io/signoz/pkg/apiserver"
"go.signoz.io/signoz/pkg/cache"
"go.signoz.io/signoz/pkg/config"
@@ -44,6 +46,9 @@ type Config struct {
// TelemetryStore config
TelemetryStore telemetrystore.Config `mapstructure:"telemetrystore"`
// Alertmanager config
Alertmanager alertmanager.Config `mapstructure:"alertmanager" yaml:"alertmanager"`
}
// DeprecatedFlags are the flags that are deprecated and scheduled for removal.
@@ -63,6 +68,7 @@ func NewConfig(ctx context.Context, resolverConfig config.ResolverConfig, deprec
sqlmigrator.NewConfigFactory(),
apiserver.NewConfigFactory(),
telemetrystore.NewConfigFactory(),
alertmanager.NewConfigFactory(),
}
conf, err := config.New(ctx, resolverConfig, configFactories)
@@ -75,6 +81,10 @@ func NewConfig(ctx context.Context, resolverConfig config.ResolverConfig, deprec
return Config{}, err
}
if err := conf.UnmarshalYaml("", &config); err != nil {
return Config{}, err
}
mergeAndEnsureBackwardCompatibility(&config, deprecatedFlags)
if err := validateConfig(config); err != nil {
@@ -138,17 +148,31 @@ func mergeAndEnsureBackwardCompatibility(config *Config, deprecatedFlags Depreca
}
if deprecatedFlags.MaxIdleConns != 50 {
fmt.Println("[Deprecated] flag --max-idle-conns is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_MAX__IDLE__CONNS env variable instead.")
fmt.Println("[Deprecated] flag --max-idle-conns is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_MAX__IDLE__CONNS instead.")
config.TelemetryStore.Connection.MaxIdleConns = deprecatedFlags.MaxIdleConns
}
if deprecatedFlags.MaxOpenConns != 100 {
fmt.Println("[Deprecated] flag --max-open-conns is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_MAX__OPEN__CONNS env variable instead.")
fmt.Println("[Deprecated] flag --max-open-conns is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_MAX__OPEN__CONNS instead.")
config.TelemetryStore.Connection.MaxOpenConns = deprecatedFlags.MaxOpenConns
}
if deprecatedFlags.DialTimeout != 5*time.Second {
fmt.Println("[Deprecated] flag --dial-timeout is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_DIAL__TIMEOUT environment variable instead.")
fmt.Println("[Deprecated] flag --dial-timeout is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_DIAL__TIMEOUT instead.")
config.TelemetryStore.Connection.DialTimeout = deprecatedFlags.DialTimeout
}
if os.Getenv("ALERTMANAGER_API_PREFIX") != "" {
fmt.Println("[Deprecated] env ALERTMANAGER_API_PREFIX is deprecated and scheduled for removal. Please use SIGNOZ_ALERTMANAGER_LEGACY_API__URL instead.")
u, err := url.Parse(os.Getenv("ALERTMANAGER_API_PREFIX"))
if err != nil {
fmt.Println("Error parsing ALERTMANAGER_API_PREFIX, using default value")
} else {
config.Alertmanager.Legacy.ApiURL = u
}
}
if os.Getenv("ALERTMANAGER_API_CHANNEL_PATH") != "" {
fmt.Println("[Deprecated] env ALERTMANAGER_API_CHANNEL_PATH is deprecated and scheduled for complete removal.")
}
}

View File

@@ -1,6 +1,9 @@
package signoz
import (
"go.signoz.io/signoz/pkg/alertmanager"
"go.signoz.io/signoz/pkg/alertmanager/legacyalertmanager"
"go.signoz.io/signoz/pkg/alertmanager/signozalertmanager"
"go.signoz.io/signoz/pkg/cache"
"go.signoz.io/signoz/pkg/cache/memorycache"
"go.signoz.io/signoz/pkg/cache/rediscache"
@@ -18,53 +21,56 @@ import (
"go.signoz.io/signoz/pkg/web/routerweb"
)
type ProviderConfig struct {
// Map of all cache provider factories
CacheProviderFactories factory.NamedMap[factory.ProviderFactory[cache.Cache, cache.Config]]
// Map of all web provider factories
WebProviderFactories factory.NamedMap[factory.ProviderFactory[web.Web, web.Config]]
// Map of all sqlstore provider factories
SQLStoreProviderFactories factory.NamedMap[factory.ProviderFactory[sqlstore.SQLStore, sqlstore.Config]]
// Map of all sql migration provider factories
SQLMigrationProviderFactories factory.NamedMap[factory.ProviderFactory[sqlmigration.SQLMigration, sqlmigration.Config]]
// Map of all telemetrystore provider factories
TelemetryStoreProviderFactories factory.NamedMap[factory.ProviderFactory[telemetrystore.TelemetryStore, telemetrystore.Config]]
// NewCacheProviderFactories returns the named map of available cache provider
// factories (memorycache and rediscache). The map is built with
// factory.MustNewNamedMap.
func NewCacheProviderFactories() factory.NamedMap[factory.ProviderFactory[cache.Cache, cache.Config]] {
	cacheFactories := factory.MustNewNamedMap(
		memorycache.NewFactory(),
		rediscache.NewFactory(),
	)

	return cacheFactories
}
func NewProviderConfig() ProviderConfig {
return ProviderConfig{
CacheProviderFactories: factory.MustNewNamedMap(
memorycache.NewFactory(),
rediscache.NewFactory(),
),
WebProviderFactories: factory.MustNewNamedMap(
routerweb.NewFactory(),
noopweb.NewFactory(),
),
SQLStoreProviderFactories: factory.MustNewNamedMap(
sqlitesqlstore.NewFactory(sqlstorehook.NewLoggingFactory()),
postgressqlstore.NewFactory(sqlstorehook.NewLoggingFactory()),
),
SQLMigrationProviderFactories: factory.MustNewNamedMap(
sqlmigration.NewAddDataMigrationsFactory(),
sqlmigration.NewAddOrganizationFactory(),
sqlmigration.NewAddPreferencesFactory(),
sqlmigration.NewAddDashboardsFactory(),
sqlmigration.NewAddSavedViewsFactory(),
sqlmigration.NewAddAgentsFactory(),
sqlmigration.NewAddPipelinesFactory(),
sqlmigration.NewAddIntegrationsFactory(),
sqlmigration.NewAddLicensesFactory(),
sqlmigration.NewAddPatsFactory(),
sqlmigration.NewModifyDatetimeFactory(),
sqlmigration.NewModifyOrgDomainFactory(),
),
TelemetryStoreProviderFactories: factory.MustNewNamedMap(
clickhousetelemetrystore.NewFactory(telemetrystorehook.NewFactory()),
),
}
// NewWebProviderFactories returns the named map of available web provider
// factories (routerweb and noopweb). The map is built with
// factory.MustNewNamedMap.
func NewWebProviderFactories() factory.NamedMap[factory.ProviderFactory[web.Web, web.Config]] {
	webFactories := factory.MustNewNamedMap(
		routerweb.NewFactory(),
		noopweb.NewFactory(),
	)

	return webFactories
}
// NewSQLStoreProviderFactories returns the named map of available sqlstore
// provider factories (sqlite and postgres). Both factories are given the same
// logging hook factory instance.
func NewSQLStoreProviderFactories() factory.NamedMap[factory.ProviderFactory[sqlstore.SQLStore, sqlstore.Config]] {
	// Created once and shared by every sqlstore factory below.
	loggingHook := sqlstorehook.NewLoggingFactory()

	storeFactories := factory.MustNewNamedMap(
		sqlitesqlstore.NewFactory(loggingHook),
		postgressqlstore.NewFactory(loggingHook),
	)

	return storeFactories
}
// NewSQLMigrationProviderFactories returns the named map of SQL migration
// provider factories. Only the update-organization migration factory receives
// the supplied sqlstore; all other factories are constructed without
// arguments.
func NewSQLMigrationProviderFactories(sqlstore sqlstore.SQLStore) factory.NamedMap[factory.ProviderFactory[sqlmigration.SQLMigration, sqlmigration.Config]] {
	migrationFactories := factory.MustNewNamedMap(
		sqlmigration.NewAddDataMigrationsFactory(),
		sqlmigration.NewAddOrganizationFactory(),
		sqlmigration.NewAddPreferencesFactory(),
		sqlmigration.NewAddDashboardsFactory(),
		sqlmigration.NewAddSavedViewsFactory(),
		sqlmigration.NewAddAgentsFactory(),
		sqlmigration.NewAddPipelinesFactory(),
		sqlmigration.NewAddIntegrationsFactory(),
		sqlmigration.NewAddLicensesFactory(),
		sqlmigration.NewAddPatsFactory(),
		sqlmigration.NewModifyDatetimeFactory(),
		sqlmigration.NewModifyOrgDomainFactory(),
		sqlmigration.NewUpdateOrganizationFactory(sqlstore),
		sqlmigration.NewAddAlertmanagerFactory(),
	)

	return migrationFactories
}
// NewTelemetryStoreProviderFactories returns the named map of available
// telemetrystore provider factories. It contains a single ClickHouse factory
// constructed with the telemetrystore hook factory.
func NewTelemetryStoreProviderFactories() factory.NamedMap[factory.ProviderFactory[telemetrystore.TelemetryStore, telemetrystore.Config]] {
	telemetryFactories := factory.MustNewNamedMap(
		clickhousetelemetrystore.NewFactory(telemetrystorehook.NewFactory()),
	)

	return telemetryFactories
}
// NewAlertmanagerProviderFactories builds the named map of alertmanager
// provider factories (legacy and signoz). Both are constructed from the same
// SQL store.
func NewAlertmanagerProviderFactories(sqlstore sqlstore.SQLStore) factory.NamedMap[factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config]] {
	alertmanagerFactories := factory.MustNewNamedMap(
		legacyalertmanager.NewFactory(sqlstore),
		signozalertmanager.NewFactory(sqlstore),
	)

	return alertmanagerFactories
}

View File

@@ -3,14 +3,37 @@ package signoz
import (
"testing"
"github.com/DATA-DOG/go-sqlmock"
"github.com/stretchr/testify/assert"
"go.signoz.io/signoz/pkg/sqlstore"
"go.signoz.io/signoz/pkg/sqlstore/sqlstoretest"
)
func TestNewProviderConfig(t *testing.T) {
// This is a test to ensure that provider factories can be created without panicking since
// we are using the factory.MustNewNamedMap function to initialize the provider factories.
// It also helps us catch these errors during testing instead of runtime.
// This is a test to ensure that provider factories can be created without panicking since
// we are using the factory.MustNewNamedMap function to initialize the provider factories.
// It also helps us catch these errors during testing instead of runtime.
func TestNewProviderFactories(t *testing.T) {
assert.NotPanics(t, func() {
NewProviderConfig()
NewCacheProviderFactories()
})
assert.NotPanics(t, func() {
NewWebProviderFactories()
})
assert.NotPanics(t, func() {
NewSQLStoreProviderFactories()
})
assert.NotPanics(t, func() {
NewTelemetryStoreProviderFactories()
})
assert.NotPanics(t, func() {
NewSQLMigrationProviderFactories(sqlstoretest.New(sqlstore.Config{Provider: "sqlite"}, sqlmock.QueryMatcherEqual))
})
assert.NotPanics(t, func() {
NewAlertmanagerProviderFactories(sqlstoretest.New(sqlstore.Config{Provider: "sqlite"}, sqlmock.QueryMatcherEqual))
})
}

View File

@@ -3,6 +3,7 @@ package signoz
import (
"context"
"go.signoz.io/signoz/pkg/alertmanager"
"go.signoz.io/signoz/pkg/cache"
"go.signoz.io/signoz/pkg/factory"
"go.signoz.io/signoz/pkg/instrumentation"
@@ -16,16 +17,21 @@ import (
)
// SigNoz groups the providers constructed by New together with the service
// registry that New builds from them.
type SigNoz struct {
// Registry is embedded; New creates it with factory.NewRegistry, registering
// the instrumentation and alertmanager providers as named services.
*factory.Registry
// Cache is the provider selected by config.Cache.Provider.
Cache cache.Cache
// Web is the provider selected by config.Web.Provider().
Web web.Web
// SQLStore is the provider selected by config.SQLStore.Provider.
SQLStore sqlstore.SQLStore
// TelemetryStore is the provider selected by config.TelemetryStore.Provider.
TelemetryStore telemetrystore.TelemetryStore
// Alertmanager is the provider selected by config.Alertmanager.Provider.
Alertmanager alertmanager.Alertmanager
}
func New(
ctx context.Context,
config Config,
providerConfig ProviderConfig,
cacheProviderFactories factory.NamedMap[factory.ProviderFactory[cache.Cache, cache.Config]],
webProviderFactories factory.NamedMap[factory.ProviderFactory[web.Web, web.Config]],
sqlstoreProviderFactories factory.NamedMap[factory.ProviderFactory[sqlstore.SQLStore, sqlstore.Config]],
telemetrystoreProviderFactories factory.NamedMap[factory.ProviderFactory[telemetrystore.TelemetryStore, telemetrystore.Config]],
) (*SigNoz, error) {
// Initialize instrumentation
instrumentation, err := instrumentation.New(ctx, version.Build{}, config.Instrumentation)
@@ -33,7 +39,7 @@ func New(
return nil, err
}
instrumentation.Logger().InfoContext(ctx, "starting signoz", "config", config)
instrumentation.Logger().DebugContext(ctx, "starting signoz", "config", config)
// Get the provider settings from instrumentation
providerSettings := instrumentation.ToProviderSettings()
@@ -43,7 +49,7 @@ func New(
ctx,
providerSettings,
config.Cache,
providerConfig.CacheProviderFactories,
cacheProviderFactories,
config.Cache.Provider,
)
if err != nil {
@@ -55,7 +61,7 @@ func New(
ctx,
providerSettings,
config.Web,
providerConfig.WebProviderFactories,
webProviderFactories,
config.Web.Provider(),
)
if err != nil {
@@ -67,22 +73,19 @@ func New(
ctx,
providerSettings,
config.SQLStore,
providerConfig.SQLStoreProviderFactories,
sqlstoreProviderFactories,
config.SQLStore.Provider,
)
if err != nil {
return nil, err
}
// add the org migration here since we need to pass the sqlstore
providerConfig.SQLMigrationProviderFactories.Add(sqlmigration.NewUpdateOrganizationFactory(sqlstore))
// Initialize telemetrystore from the available telemetrystore provider factories
telemetrystore, err := factory.NewProviderFromNamedMap(
ctx,
providerSettings,
config.TelemetryStore,
providerConfig.TelemetryStoreProviderFactories,
telemetrystoreProviderFactories,
config.TelemetryStore.Provider,
)
if err != nil {
@@ -94,7 +97,7 @@ func New(
ctx,
providerSettings,
config.SQLMigration,
providerConfig.SQLMigrationProviderFactories,
NewSQLMigrationProviderFactories(sqlstore),
)
if err != nil {
return nil, err
@@ -105,10 +108,32 @@ func New(
return nil, err
}
alertmanager, err := factory.NewProviderFromNamedMap(
ctx,
providerSettings,
config.Alertmanager,
NewAlertmanagerProviderFactories(sqlstore),
config.Alertmanager.Provider,
)
if err != nil {
return nil, err
}
registry, err := factory.NewRegistry(
instrumentation.Logger(),
factory.NewNamedService(factory.MustNewName("instrumentation"), instrumentation),
factory.NewNamedService(factory.MustNewName("alertmanager"), alertmanager),
)
if err != nil {
return nil, err
}
return &SigNoz{
Registry: registry,
Cache: cache,
Web: web,
SQLStore: sqlstore,
TelemetryStore: telemetrystore,
Alertmanager: alertmanager,
}, nil
}

View File

@@ -36,8 +36,8 @@ func (migration *addOrganization) Up(ctx context.Context, db *bun.DB) error {
ID string `bun:"id,pk,type:text"`
Name string `bun:"name,type:text,notnull"`
CreatedAt int `bun:"created_at,notnull"`
IsAnonymous int `bun:"is_anonymous,notnull,default:0,CHECK(is_anonymous IN (0,1))"`
HasOptedUpdates int `bun:"has_opted_updates,notnull,default:1,CHECK(has_opted_updates IN (0,1))"`
IsAnonymous int `bun:"is_anonymous,notnull,default:0"`
HasOptedUpdates int `bun:"has_opted_updates,notnull,default:1"`
}{}).
IfNotExists().
Exec(ctx); err != nil {

View File

@@ -0,0 +1,206 @@
package sqlmigration
import (
"context"
"database/sql"
"strconv"
"strings"
"time"
"github.com/tidwall/gjson"
"github.com/uptrace/bun"
"github.com/uptrace/bun/migrate"
"go.signoz.io/signoz/pkg/alertmanager/alertmanagerserver"
"go.signoz.io/signoz/pkg/factory"
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
)
// addAlertmanager is the SQL migration that adds org_id to
// notification_channels and creates the alertmanager_config and
// alertmanager_state tables. It carries no state of its own.
type addAlertmanager struct{}
// NewAddAlertmanagerFactory returns the provider factory for the
// "add_alertmanager" SQL migration.
func NewAddAlertmanagerFactory() factory.ProviderFactory[SQLMigration, Config] {
	name := factory.MustNewName("add_alertmanager")

	return factory.NewProviderFactory(name, newAddAlertmanager)
}
// newAddAlertmanager is the constructor handed to the provider factory. The
// context, provider settings and config are all ignored; it always succeeds.
func newAddAlertmanager(_ context.Context, _ factory.ProviderSettings, _ Config) (SQLMigration, error) {
return &addAlertmanager{}, nil
}
// Register adds this migration's Up and Down functions to the given
// migration set and reports any registration error.
func (migration *addAlertmanager) Register(migrations *migrate.Migrations) error {
	return migrations.Register(migration.Up, migration.Down)
}
// Up applies the alertmanager schema changes inside a single transaction:
//
//  1. drops the "deleted" column from notification_channels,
//  2. adds the "org_id" column to notification_channels if it is missing,
//  3. if an organization exists, assigns it to channels without an org_id,
//  4. creates the alertmanager_config and alertmanager_state tables,
//  5. if an organization exists, seeds alertmanager_config from the existing
//     channels and rules.
//
// When the organizations table is empty there is nothing to backfill, so
// steps 3 and 5 are skipped. Seeding a config row with an empty org_id would
// reference a non-existent organization through the table's foreign key.
func (migration *addAlertmanager) Up(ctx context.Context, db *bun.DB) error {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	// Undo everything if we return before the Commit at the end.
	defer tx.Rollback() //nolint:errcheck

	if _, err := tx.
		NewDropColumn().
		Table("notification_channels").
		ColumnExpr("deleted").
		Exec(ctx); err != nil {
		// A missing column is tolerated so the migration can be re-run.
		// NOTE(review): the check matches the error text "no such column";
		// confirm every supported dialect reports it with this wording.
		if !strings.Contains(err.Error(), "no such column") {
			return err
		}
	}

	if _, err := tx.
		NewAddColumn().
		Table("notification_channels").
		Apply(WrapIfNotExists(ctx, db, "notification_channels", "org_id")).
		Exec(ctx); err != nil && err != ErrNoExecute {
		return err
	}

	// Existing data is attributed to the first organization returned.
	var orgID string
	err = tx.
		NewSelect().
		ColumnExpr("id").
		Table("organizations").
		Limit(1).
		Scan(ctx, &orgID)
	if err != nil && err != sql.ErrNoRows {
		return err
	}
	// sql.ErrNoRows simply means no organization has been created yet.
	orgFound := err == nil

	if orgFound {
		if err := migration.populateOrgID(ctx, tx, orgID); err != nil {
			return err
		}
	}

	if _, err := tx.
		NewCreateTable().
		Model(&struct {
			bun.BaseModel `bun:"table:alertmanager_config"`
			ID            uint64    `bun:"id,pk,autoincrement"`
			Config        string    `bun:"config,notnull,type:text"`
			Hash          string    `bun:"hash,notnull,type:text"`
			CreatedAt     time.Time `bun:"created_at,notnull"`
			UpdatedAt     time.Time `bun:"updated_at,notnull"`
			OrgID         string    `bun:"org_id,notnull,unique"`
		}{}).
		ForeignKey(`("org_id") REFERENCES "organizations" ("id")`).
		IfNotExists().
		Exec(ctx); err != nil {
		return err
	}

	if _, err := tx.
		NewCreateTable().
		Model(&struct {
			bun.BaseModel `bun:"table:alertmanager_state"`
			ID            uint64    `bun:"id,pk,autoincrement"`
			Silences      string    `bun:"silences,nullzero,type:text"`
			NFLog         string    `bun:"nflog,nullzero,type:text"`
			CreatedAt     time.Time `bun:"created_at,notnull"`
			UpdatedAt     time.Time `bun:"updated_at,notnull"`
			OrgID         string    `bun:"org_id,notnull,unique"`
		}{}).
		ForeignKey(`("org_id") REFERENCES "organizations" ("id")`).
		IfNotExists().
		Exec(ctx); err != nil {
		return err
	}

	// Only seed a config when there is an organization to own it.
	if orgFound {
		if err := migration.populateAlertmanagerConfig(ctx, tx, orgID); err != nil {
			return err
		}
	}

	return tx.Commit()
}
// populateOrgID assigns orgID to every notification_channels row whose
// org_id is still NULL, using the supplied transaction.
func (migration *addAlertmanager) populateOrgID(ctx context.Context, tx bun.Tx, orgID string) error {
	_, err := tx.
		NewUpdate().
		Table("notification_channels").
		Set("org_id = ?", orgID).
		Where("org_id IS NULL").
		Exec(ctx)

	return err
}
// populateAlertmanagerConfig builds the alertmanager configuration for orgID
// from the existing data and upserts it into alertmanager_config.
//
// The config is created from the organization's notification channels plus
// the default global and route settings from alertmanagerserver.NewConfig.
// Then, for every row in the rules table, the rule ID is attached as a
// matcher to each receiver named in the rule's "preferredChannels" JSON array.
//
// Rules are applied in the order the query returns them rather than through
// an intermediate map, so the generated config does not depend on Go's
// randomized map iteration order.
func (migration *addAlertmanager) populateAlertmanagerConfig(ctx context.Context, tx bun.Tx, orgID string) error {
	var channels []*alertmanagertypes.Channel
	err := tx.
		NewSelect().
		Model(&channels).
		Where("org_id = ?", orgID).
		Scan(ctx)
	if err != nil {
		return err
	}

	// The local type keeps its original name on purpose.
	// NOTE(review): bun presumably derives the table alias used in the
	// generated SQL from the type name; confirm before renaming.
	type matcher struct {
		bun.BaseModel `bun:"table:rules"`
		ID            int    `bun:"id,pk"`
		Data          string `bun:"data"`
	}

	matchers := []matcher{}
	err = tx.
		NewSelect().
		Column("id", "data").
		Model(&matchers).
		Scan(ctx)
	if err != nil {
		return err
	}

	serverConfig := alertmanagerserver.NewConfig()
	config, err := alertmanagertypes.NewConfigFromChannels(serverConfig.Global, serverConfig.Route, channels, orgID)
	if err != nil {
		return err
	}

	for _, rule := range matchers {
		preferredChannels := gjson.Get(rule.Data, "preferredChannels").Array()
		// Rules without preferred channels contribute no matcher.
		if len(preferredChannels) == 0 {
			continue
		}

		receivers := make([]string, 0, len(preferredChannels))
		for _, channel := range preferredChannels {
			receivers = append(receivers, channel.String())
		}

		if err := config.CreateRuleIDMatcher(strconv.Itoa(rule.ID), receivers); err != nil {
			return err
		}
	}

	// Read the storeable form once, after all matchers have been applied.
	storeableConfig := config.StoreableConfig()
	if _, err := tx.
		NewInsert().
		Model(storeableConfig).
		On("CONFLICT (org_id) DO UPDATE").
		Set("config = ?", storeableConfig.Config).
		Set("hash = ?", storeableConfig.Hash).
		Set("updated_at = ?", storeableConfig.UpdatedAt).
		Exec(ctx); err != nil {
		return err
	}

	return nil
}
// Down is a no-op: it performs no rollback and always returns nil.
func (migration *addAlertmanager) Down(ctx context.Context, db *bun.DB) error {
return nil
}

View File

@@ -10,7 +10,6 @@ import (
"dario.cat/mergo"
"github.com/prometheus/alertmanager/config"
"github.com/prometheus/alertmanager/pkg/labels"
commoncfg "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/uptrace/bun"
@@ -33,10 +32,10 @@ type (
)
type RouteConfig struct {
GroupByStr []string
GroupInterval time.Duration
GroupWait time.Duration
RepeatInterval time.Duration
GroupByStr []string `mapstructure:"group_by"`
GroupInterval time.Duration `mapstructure:"group_interval"`
GroupWait time.Duration `mapstructure:"group_wait"`
RepeatInterval time.Duration `mapstructure:"repeat_interval"`
}
type StoreableConfig struct {
@@ -166,19 +165,6 @@ func (c *Config) SetRouteConfig(routeConfig RouteConfig) {
c.storeableConfig.UpdatedAt = time.Now()
}
// UpdateRouteConfig overwrites the grouping settings (group-by labels, group
// interval, group wait, repeat interval) of every child route under the root
// route; the root route itself is not touched. Afterwards it refreshes the
// stored raw config, its hash and the UpdatedAt timestamp.
func (c *Config) UpdateRouteConfig(routeConfig RouteConfig) {
for _, route := range c.alertmanagerConfig.Route.Routes {
route.GroupByStr = routeConfig.GroupByStr
// The duration pointers refer to fields of the routeConfig parameter, so
// every child route ends up pointing at the same three values.
route.GroupInterval = (*model.Duration)(&routeConfig.GroupInterval)
route.GroupWait = (*model.Duration)(&routeConfig.GroupWait)
route.RepeatInterval = (*model.Duration)(&routeConfig.RepeatInterval)
}
// Keep the storeable representation in sync with the in-memory config.
c.storeableConfig.Config = string(newRawFromConfig(c.alertmanagerConfig))
c.storeableConfig.Hash = fmt.Sprintf("%x", newConfigHash(c.storeableConfig.Config))
c.storeableConfig.UpdatedAt = time.Now()
}
// AlertmanagerConfig returns the underlying alertmanager configuration. The
// pointer is returned as-is (no copy), so changes made through it are visible
// to this Config.
func (c *Config) AlertmanagerConfig() *config.Config {
return c.alertmanagerConfig
}
@@ -274,15 +260,11 @@ func (c *Config) CreateRuleIDMatcher(ruleID string, receiverNames []string) erro
return errors.New(errors.TypeInvalidInput, ErrCodeAlertmanagerConfigInvalid, "route is nil")
}
routes := c.alertmanagerConfig.Route.Routes
for i, route := range routes {
if slices.Contains(receiverNames, route.Receiver) {
matcher, err := labels.NewMatcher(labels.MatchEqual, "ruleId", ruleID)
if err != nil {
for i := range c.alertmanagerConfig.Route.Routes {
if slices.Contains(receiverNames, c.alertmanagerConfig.Route.Routes[i].Receiver) {
if err := appendRuleIDToRoute(c.alertmanagerConfig.Route.Routes[i], ruleID); err != nil {
return err
}
c.alertmanagerConfig.Route.Routes[i].Matchers = append(c.alertmanagerConfig.Route.Routes[i].Matchers, matcher)
}
}
@@ -304,12 +286,9 @@ func (c *Config) UpdateRuleIDMatcher(ruleID string, receiverNames []string) erro
func (c *Config) DeleteRuleIDMatcher(ruleID string) error {
routes := c.alertmanagerConfig.Route.Routes
for i, r := range routes {
j := slices.IndexFunc(r.Matchers, func(m *labels.Matcher) bool {
return m.Name == "ruleId" && m.Value == ruleID
})
if j != -1 {
c.alertmanagerConfig.Route.Routes[i].Matchers = slices.Delete(r.Matchers, j, j+1)
for i := range routes {
if err := removeRuleIDFromRoute(c.alertmanagerConfig.Route.Routes[i], ruleID); err != nil {
return err
}
}
@@ -320,18 +299,16 @@ func (c *Config) DeleteRuleIDMatcher(ruleID string) error {
return nil
}
func (c *Config) ReceiverNamesFromRuleID(ruleID string) ([]string, error) {
func (c *Config) ReceiverNamesFromRuleID(ruleID string) []string {
receiverNames := make([]string, 0)
routes := c.alertmanagerConfig.Route.Routes
for _, r := range routes {
for _, m := range r.Matchers {
if m.Name == "ruleId" && m.Value == ruleID {
receiverNames = append(receiverNames, r.Receiver)
}
for _, route := range routes {
if ok := matcherContainsRuleID(route.Matchers, ruleID); ok {
receiverNames = append(receiverNames, route.Receiver)
}
}
return receiverNames, nil
return receiverNames
}
type storeOptions struct {

View File

@@ -35,7 +35,7 @@ func TestCreateRuleIDMatcher(t *testing.T) {
},
},
ruleIDToReceivers: map[string][]string{"test-rule": {"slack-receiver"}},
expectedRoutes: []map[string]any{{"receiver": "slack-receiver", "continue": true, "matchers": []any{"ruleId=\"test-rule\""}}},
expectedRoutes: []map[string]any{{"receiver": "slack-receiver", "continue": true, "matchers": []any{"ruleId=~\"test-rule\""}}},
},
{
name: "SlackAndEmailReceivers",
@@ -60,7 +60,7 @@ func TestCreateRuleIDMatcher(t *testing.T) {
},
},
ruleIDToReceivers: map[string][]string{"test-rule": {"slack-receiver", "email-receiver"}},
expectedRoutes: []map[string]any{{"receiver": "slack-receiver", "continue": true, "matchers": []any{"ruleId=\"test-rule\""}}, {"receiver": "email-receiver", "continue": true, "matchers": []any{"ruleId=\"test-rule\""}}},
expectedRoutes: []map[string]any{{"receiver": "slack-receiver", "continue": true, "matchers": []any{"ruleId=~\"test-rule\""}}, {"receiver": "email-receiver", "continue": true, "matchers": []any{"ruleId=~\"test-rule\""}}},
},
{
name: "ReceiverDoesNotExist",
@@ -94,7 +94,7 @@ func TestCreateRuleIDMatcher(t *testing.T) {
},
},
ruleIDToReceivers: map[string][]string{"test-rule-1": {"slack-receiver", "does-not-exist"}, "test-rule-2": {"slack-receiver"}},
expectedRoutes: []map[string]any{{"receiver": "slack-receiver", "continue": true, "matchers": []any{"ruleId=\"test-rule-1\"", "ruleId=\"test-rule-2\""}}},
expectedRoutes: []map[string]any{{"receiver": "slack-receiver", "continue": true, "matchers": []any{"ruleId=~\"test-rule-1|test-rule-2\""}}},
},
}
@@ -162,7 +162,7 @@ func TestDeleteRuleIDMatcher(t *testing.T) {
expectedRoutes: []map[string]any{{"receiver": "slack-receiver", "continue": true}, {"receiver": "email-receiver", "continue": true}},
},
{
name: "AlertNameDoesNotExist",
name: "RuleIDDoesNotExist",
orgID: "1",
receivers: []config.Receiver{
{
@@ -185,7 +185,7 @@ func TestDeleteRuleIDMatcher(t *testing.T) {
},
ruleIDToReceivers: map[string][]string{"test-rule": {"email-receiver", "slack-receiver"}},
ruleIDsToDelete: []string{"does-not-exist"},
expectedRoutes: []map[string]any{{"receiver": "slack-receiver", "continue": true, "matchers": []any{"ruleId=\"test-rule\""}}, {"receiver": "email-receiver", "continue": true, "matchers": []any{"ruleId=\"test-rule\""}}},
expectedRoutes: []map[string]any{{"receiver": "slack-receiver", "continue": true, "matchers": []any{"ruleId=~\"test-rule\""}}, {"receiver": "email-receiver", "continue": true, "matchers": []any{"ruleId=~\"test-rule\""}}},
},
}

View File

@@ -0,0 +1,77 @@
package alertmanagertypes
import (
"slices"
"strings"
"github.com/prometheus/alertmanager/config"
"github.com/prometheus/alertmanager/pkg/labels"
)
const (
// RuleIDMatcherName is the label name of the route matcher that carries
// rule IDs.
RuleIDMatcherName string = "ruleId"
// ruleIDMatcherValueSep separates the rule IDs stored in a single matcher
// value; the value is used as a regular expression, where "|" is alternation.
ruleIDMatcherValueSep string = "|"
)
// appendRuleIDToRoute makes the route match alerts carrying the given rule ID.
//
// If the route has no matcher named RuleIDMatcherName, a regexp matcher whose
// value is ruleID is appended. Otherwise ruleID is added to the "|"-separated
// list held in the first such matcher, unless it is already listed.
//
// The matcher is always rebuilt through labels.NewMatcher rather than edited
// in place. Editing Matcher.Value directly would leave the matcher's
// precompiled regular expression (and its match type) out of date, so the
// in-memory route would keep matching only the old set of rule IDs.
//
// NOTE(review): ruleID is used verbatim inside a regular expression; this
// assumes rule IDs contain no regexp metacharacters. Confirm against callers.
//
// It returns an error only if the resulting matcher cannot be constructed.
func appendRuleIDToRoute(route *config.Route, ruleID string) error {
	matcherIdx := slices.IndexFunc(route.Matchers, func(m *labels.Matcher) bool {
		return m.Name == RuleIDMatcherName
	})

	if matcherIdx == -1 {
		matcher, err := labels.NewMatcher(labels.MatchRegexp, RuleIDMatcherName, ruleID)
		if err != nil {
			return err
		}

		route.Matchers = append(route.Matchers, matcher)
		return nil
	}

	ruleIDs := strings.Split(route.Matchers[matcherIdx].Value, ruleIDMatcherValueSep)
	// Listing the same ID twice would not change what the matcher accepts.
	if !slices.Contains(ruleIDs, ruleID) {
		ruleIDs = append(ruleIDs, ruleID)
	}

	matcher, err := labels.NewMatcher(labels.MatchRegexp, RuleIDMatcherName, strings.Join(ruleIDs, ruleIDMatcherValueSep))
	if err != nil {
		return err
	}

	route.Matchers[matcherIdx] = matcher
	return nil
}
// removeRuleIDFromRoute drops ruleID from the route's rule-ID matcher.
//
// Only the first matcher named RuleIDMatcherName is considered. A route
// without such a matcher is left untouched. If removing ruleID leaves the
// matcher with no IDs, the matcher itself is removed from the route;
// otherwise it is replaced by a freshly built regexp matcher over the
// remaining IDs. An error is returned only if that matcher cannot be built.
func removeRuleIDFromRoute(route *config.Route, ruleID string) error {
	idx := -1
	for i, m := range route.Matchers {
		if m.Name == RuleIDMatcherName {
			idx = i
			break
		}
	}
	if idx < 0 {
		return nil
	}

	// Filter in place, keeping every ID except the one being removed.
	ids := strings.Split(route.Matchers[idx].Value, ruleIDMatcherValueSep)
	remaining := ids[:0]
	for _, id := range ids {
		if id != ruleID {
			remaining = append(remaining, id)
		}
	}

	if len(remaining) == 0 {
		route.Matchers = slices.Delete(route.Matchers, idx, idx+1)
		return nil
	}

	rebuilt, err := labels.NewMatcher(labels.MatchRegexp, RuleIDMatcherName, strings.Join(remaining, ruleIDMatcherValueSep))
	if err != nil {
		return err
	}
	route.Matchers[idx] = rebuilt

	return nil
}
// matcherContainsRuleID reports whether any matcher named RuleIDMatcherName
// lists ruleID among the "|"-separated IDs in its value.
func matcherContainsRuleID(matchers config.Matchers, ruleID string) bool {
	for i := range matchers {
		if matchers[i].Name != RuleIDMatcherName {
			continue
		}

		ids := strings.Split(matchers[i].Value, ruleIDMatcherValueSep)
		if slices.Contains(ids, ruleID) {
			return true
		}
	}

	return false
}