Compare commits

...

32 Commits

Author SHA1 Message Date
grandwizard28
bd3e0eeb6c fix(ruler): fix lastinsertedid concundrum 2025-03-06 02:50:09 +05:30
grandwizard28
9d6e09d3f6 fix(legacyalertmanager): create the complete config in the migration 2025-03-06 02:01:11 +05:30
grandwizard28
cccc25e2ee fix(legacyalertmanager): pick the first organization 2025-03-06 01:46:40 +05:30
grandwizard28
9580da0478 fix(migration): make it idempotent 2025-03-06 01:20:56 +05:30
grandwizard28
8534b798c7 fix(migration): make it idempotent 2025-03-06 01:10:19 +05:30
grandwizard28
3bedd10ef9 fix(migration): add an alertmanager config by default 2025-03-06 00:49:35 +05:30
grandwizard28
b6f2ff052d fix(ruler): fix db query 2025-03-05 23:04:19 +05:30
grandwizard28
d02dc8687c refactor(ruler): add BaseModel 2025-03-05 22:48:25 +05:30
grandwizard28
04835d14e1 refactor(alertmanager): cleanup 2025-03-05 21:14:34 +05:30
grandwizard28
5bd60d3f03 feat(ruler): cleanup 2025-03-05 20:13:34 +05:30
grandwizard28
5168e0de2b Merge branch 'main' of github.com:SigNoz/signoz into integration-ruler 2025-03-05 18:51:50 +05:30
grandwizard28
f8a6c1940d Merge branch 'tx-support-sqlstore' into integration-ruler 2025-03-05 18:51:19 +05:30
grandwizard28
81945dd73b feat(alertmanager): do everything transactionally 2025-03-05 18:30:40 +05:30
grandwizard28
6bfae83779 feat(alertmanager): do everything transactionally 2025-03-05 18:29:19 +05:30
grandwizard28
116b1ac607 Merge branch 'tx-support-sqlstore' into integration-ruler 2025-03-05 17:41:26 +05:30
grandwizard28
5915866ef6 feat(alertmanager): do everything transactionally 2025-03-05 17:41:09 +05:30
grandwizard28
6362cfb482 refactor(sqlstore): add documentation 2025-03-05 16:01:15 +05:30
grandwizard28
a6be05b047 feat(sqlstore): add transaction support for sqlstore 2025-03-05 16:00:39 +05:30
grandwizard28
b6be22c687 ci(main): merge with main 2025-03-05 15:15:49 +05:30
grandwizard28
0abb5fca93 refactor(alertmanager): simplify 2025-03-05 02:21:52 +05:30
grandwizard28
ad58342319 Merge branch 'alertmanager-testing' into integration-ruler 2025-03-05 02:21:29 +05:30
grandwizard28
4abd67c67e refactor(alertmanager): simplify 2025-03-05 02:19:59 +05:30
grandwizard28
7de1fbe13f Merge branch 'alertmanager-testing' into integration-ruler 2025-03-05 00:59:47 +05:30
grandwizard28
5606f73074 refactor(alertmanager): simplify 2025-03-05 00:59:43 +05:30
grandwizard28
076b33dcba refactor(alertmanager): simplify 2025-03-05 00:55:55 +05:30
grandwizard28
e770c3054c feat(ruler): integrate with ruler 2025-03-05 00:27:07 +05:30
grandwizard28
4df43463da feat(alertmanager): add remaining scaffold 2025-03-05 00:26:22 +05:30
grandwizard28
2b4340a3e1 feat(legacyalertmanager): fix get alerts 2025-02-20 19:29:27 +05:30
grandwizard28
309ac976cd feat(legacyalertmanager): integrate 2025-02-20 18:54:07 +05:30
grandwizard28
5ab0d77087 feat(alertmanager): add tests for config 2025-02-20 17:09:47 +05:30
grandwizard28
307afa02ff feat(sqlmigration): add org_id foreign key in notification_channels 2025-02-20 16:33:31 +05:30
grandwizard28
b9e29424ce feat(legacyalertmanager): add functions for channels 2025-02-20 15:52:16 +05:30
30 changed files with 677 additions and 1300 deletions

View File

@@ -11,6 +11,7 @@ import (
"go.signoz.io/signoz/ee/query-service/interfaces"
"go.signoz.io/signoz/ee/query-service/license"
"go.signoz.io/signoz/ee/query-service/usage"
"go.signoz.io/signoz/pkg/alertmanager"
baseapp "go.signoz.io/signoz/pkg/query-service/app"
"go.signoz.io/signoz/pkg/query-service/app/cloudintegrations"
"go.signoz.io/signoz/pkg/query-service/app/integrations"
@@ -20,6 +21,7 @@ import (
basemodel "go.signoz.io/signoz/pkg/query-service/model"
rules "go.signoz.io/signoz/pkg/query-service/rules"
"go.signoz.io/signoz/pkg/query-service/version"
"go.signoz.io/signoz/pkg/signoz"
"go.signoz.io/signoz/pkg/types/authtypes"
)
@@ -51,7 +53,7 @@ type APIHandler struct {
}
// NewAPIHandler returns an APIHandler
func NewAPIHandler(opts APIHandlerOptions) (*APIHandler, error) {
func NewAPIHandler(opts APIHandlerOptions, signoz *signoz.SigNoz) (*APIHandler, error) {
baseHandler, err := baseapp.NewAPIHandler(baseapp.APIHandlerOpts{
Reader: opts.DataConnector,
@@ -67,6 +69,7 @@ func NewAPIHandler(opts APIHandlerOptions) (*APIHandler, error) {
FluxInterval: opts.FluxInterval,
UseLogsNewSchema: opts.UseLogsNewSchema,
UseTraceNewSchema: opts.UseTraceNewSchema,
AlertmanagerAPI: alertmanager.NewAPI(signoz.Alertmanager),
})
if err != nil {

View File

@@ -23,8 +23,10 @@ import (
"go.signoz.io/signoz/ee/query-service/integrations/gateway"
"go.signoz.io/signoz/ee/query-service/interfaces"
"go.signoz.io/signoz/ee/query-service/rules"
"go.signoz.io/signoz/pkg/alertmanager"
"go.signoz.io/signoz/pkg/http/middleware"
"go.signoz.io/signoz/pkg/signoz"
"go.signoz.io/signoz/pkg/sqlstore"
"go.signoz.io/signoz/pkg/types/authtypes"
"go.signoz.io/signoz/pkg/web"
@@ -44,7 +46,6 @@ import (
"go.signoz.io/signoz/pkg/query-service/cache"
baseconst "go.signoz.io/signoz/pkg/query-service/constants"
"go.signoz.io/signoz/pkg/query-service/healthcheck"
basealm "go.signoz.io/signoz/pkg/query-service/integrations/alertManager"
baseint "go.signoz.io/signoz/pkg/query-service/interfaces"
basemodel "go.signoz.io/signoz/pkg/query-service/model"
pqle "go.signoz.io/signoz/pkg/query-service/pqlEngine"
@@ -175,8 +176,8 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
}
<-readerReady
rm, err := makeRulesManager(serverOptions.PromConfigPath,
baseconst.GetAlertManagerApiPrefix(),
rm, err := makeRulesManager(
serverOptions.PromConfigPath,
serverOptions.RuleRepoURL,
serverOptions.SigNoz.SQLStore.SQLxDB(),
reader,
@@ -185,6 +186,8 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
lm,
serverOptions.UseLogsNewSchema,
serverOptions.UseTraceNewSchema,
serverOptions.SigNoz.Alertmanager,
serverOptions.SigNoz.SQLStore,
)
if err != nil {
@@ -267,7 +270,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
JWT: serverOptions.Jwt,
}
apiHandler, err := api.NewAPIHandler(apiOpts)
apiHandler, err := api.NewAPIHandler(apiOpts, serverOptions.SigNoz)
if err != nil {
return nil, err
}
@@ -529,7 +532,6 @@ func (s *Server) Stop() error {
func makeRulesManager(
promConfigPath,
alertManagerURL string,
ruleRepoURL string,
db *sqlx.DB,
ch baseint.Reader,
@@ -537,39 +539,34 @@ func makeRulesManager(
disableRules bool,
fm baseint.FeatureLookup,
useLogsNewSchema bool,
useTraceNewSchema bool) (*baserules.Manager, error) {
useTraceNewSchema bool,
alertmanager alertmanager.Alertmanager,
sqlstore sqlstore.SQLStore,
) (*baserules.Manager, error) {
// create engine
pqle, err := pqle.FromConfigPath(promConfigPath)
if err != nil {
return nil, fmt.Errorf("failed to create pql engine : %v", err)
}
// notifier opts
notifierOpts := basealm.NotifierOptions{
QueueCapacity: 10000,
Timeout: 1 * time.Second,
AlertManagerURLs: []string{alertManagerURL},
}
// create manager opts
managerOpts := &baserules.ManagerOptions{
NotifierOpts: notifierOpts,
PqlEngine: pqle,
RepoURL: ruleRepoURL,
DBConn: db,
Context: context.Background(),
Logger: zap.L(),
DisableRules: disableRules,
FeatureFlags: fm,
Reader: ch,
Cache: cache,
EvalDelay: baseconst.GetEvalDelay(),
PqlEngine: pqle,
RepoURL: ruleRepoURL,
DBConn: db,
Context: context.Background(),
Logger: zap.L(),
DisableRules: disableRules,
FeatureFlags: fm,
Reader: ch,
Cache: cache,
EvalDelay: baseconst.GetEvalDelay(),
PrepareTaskFunc: rules.PrepareTaskFunc,
UseLogsNewSchema: useLogsNewSchema,
UseTraceNewSchema: useTraceNewSchema,
PrepareTestRuleFunc: rules.TestNotification,
Alertmanager: alertmanager,
SQLStore: sqlstore,
}
// create Manager

View File

@@ -7,7 +7,6 @@ import (
"os"
"os/signal"
"strconv"
"syscall"
"time"
"go.opentelemetry.io/otel/sdk/resource"
@@ -198,16 +197,19 @@ func main() {
zap.L().Fatal("Failed to initialize auth cache", zap.Error(err))
}
signalsChannel := make(chan os.Signal, 1)
signal.Notify(signalsChannel, os.Interrupt, syscall.SIGTERM)
signoz.Start(context.Background())
for {
select {
case status := <-server.HealthCheckStatus():
zap.L().Info("Received HealthCheck status: ", zap.Int("status", int(status)))
case <-signalsChannel:
zap.L().Fatal("Received OS Interrupt Signal ... ")
server.Stop()
}
if err := signoz.Wait(context.Background()); err != nil {
zap.L().Fatal("Failed to start signoz", zap.Error(err))
}
err = server.Stop()
if err != nil {
zap.L().Fatal("Failed to stop server", zap.Error(err))
}
err = signoz.Stop(context.Background())
if err != nil {
zap.L().Fatal("Failed to stop signoz", zap.Error(err))
}
}

View File

@@ -28,6 +28,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
opts.UseLogsNewSchema,
opts.UseTraceNewSchema,
baserules.WithEvalDelay(opts.ManagerOpts.EvalDelay),
baserules.WithSQLStore(opts.SQLStore),
)
if err != nil {
@@ -48,6 +49,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
opts.Logger,
opts.Reader,
opts.ManagerOpts.PqlEngine,
baserules.WithSQLStore(opts.SQLStore),
)
if err != nil {
@@ -68,6 +70,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
opts.Reader,
opts.Cache,
baserules.WithEvalDelay(opts.ManagerOpts.EvalDelay),
baserules.WithSQLStore(opts.SQLStore),
)
if err != nil {
return task, err
@@ -126,6 +129,7 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, *basemodel.Ap
opts.UseTraceNewSchema,
baserules.WithSendAlways(),
baserules.WithSendUnmatched(),
baserules.WithSQLStore(opts.SQLStore),
)
if err != nil {
@@ -144,6 +148,7 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, *basemodel.Ap
opts.ManagerOpts.PqlEngine,
baserules.WithSendAlways(),
baserules.WithSendUnmatched(),
baserules.WithSQLStore(opts.SQLStore),
)
if err != nil {
@@ -160,6 +165,7 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, *basemodel.Ap
opts.Cache,
baserules.WithSendAlways(),
baserules.WithSendUnmatched(),
baserules.WithSQLStore(opts.SQLStore),
)
if err != nil {
zap.L().Error("failed to prepare a new anomaly rule for test", zap.String("name", rule.Name()), zap.Error(err))

View File

@@ -13,13 +13,11 @@ const getTriggered = async (
const response = await axios.get(`/alerts?${queryParams}`);
const amData = JSON.parse(response.data.data);
return {
statusCode: 200,
error: null,
message: response.data.status,
payload: amData.data,
payload: response.data.data,
};
} catch (error) {
return ErrorResponseHandler(error as AxiosError);

2
go.mod
View File

@@ -71,7 +71,6 @@ require (
go.uber.org/zap v1.27.0
golang.org/x/crypto v0.31.0
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842
golang.org/x/net v0.33.0
golang.org/x/oauth2 v0.24.0
golang.org/x/sync v0.10.0
golang.org/x/text v0.21.0
@@ -267,6 +266,7 @@ require (
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
go.uber.org/atomic v1.11.0 // indirect
golang.org/x/mod v0.22.0 // indirect
golang.org/x/net v0.33.0 // indirect
golang.org/x/sys v0.29.0 // indirect
golang.org/x/time v0.6.0 // indirect
golang.org/x/tools v0.28.0 // indirect

View File

@@ -37,6 +37,7 @@ type provider struct {
configStore alertmanagertypes.ConfigStore
batcher *alertmanagerbatcher.Batcher
url *url.URL
orgID string
}
func NewFactory(sqlstore sqlstore.SQLStore) factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config] {
@@ -73,8 +74,25 @@ func (provider *provider) Start(ctx context.Context) error {
}
for alerts := range provider.batcher.C {
if err := provider.putAlerts(ctx, "", alerts); err != nil {
provider.settings.Logger().Error("failed to send alerts to alertmanager", "error", err)
// For the first time, we need to get the orgID from the config store.
// Since this is the legacy alertmanager, we get the first org from the store.
if provider.orgID == "" {
orgIDs, err := provider.configStore.ListOrgs(ctx)
if err != nil {
provider.settings.Logger().ErrorContext(ctx, "failed to send alerts to alertmanager", "error", err)
continue
}
if len(orgIDs) == 0 {
provider.settings.Logger().ErrorContext(ctx, "failed to send alerts to alertmanager", "error", "no orgs found")
continue
}
provider.orgID = orgIDs[0]
}
if err := provider.putAlerts(ctx, provider.orgID, alerts); err != nil {
provider.settings.Logger().ErrorContext(ctx, "failed to send alerts to alertmanager", "error", err)
}
}

View File

@@ -40,7 +40,7 @@ func NewRegistry(logger *slog.Logger, services ...NamedService) (*Registry, erro
}, nil
}
func (r *Registry) Start(ctx context.Context) error {
func (r *Registry) Start(ctx context.Context) {
for _, s := range r.services.GetInOrder() {
go func(s NamedService) {
r.logger.InfoContext(ctx, "starting service", "service", s.Name())
@@ -49,7 +49,6 @@ func (r *Registry) Start(ctx context.Context) error {
}(s)
}
return nil
}
func (r *Registry) Wait(ctx context.Context) error {

View File

@@ -41,7 +41,7 @@ func TestRegistryWith2Services(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
require.NoError(t, registry.Start(ctx))
registry.Start(ctx)
require.NoError(t, registry.Wait(ctx))
require.NoError(t, registry.Stop(ctx))
}()
@@ -62,7 +62,7 @@ func TestRegistryWith2ServicesWithoutWait(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
require.NoError(t, registry.Start(ctx))
registry.Start(ctx)
require.NoError(t, registry.Stop(ctx))
}()

View File

@@ -78,6 +78,12 @@ func (writer *nonFlushingBadResponseLoggingWriter) Write(data []byte) (int, erro
// https://godoc.org/net/http#ResponseWriter
writer.WriteHeader(http.StatusOK)
}
// 204 No Content is a success response that indicates that the request has been successfully processed and that the response body is intentionally empty.
if writer.statusCode == 204 {
return 0, nil
}
n, err := writer.rw.Write(data)
if writer.logBody {
writer.captureResponseBody(data)

View File

@@ -23,6 +23,7 @@ type SDK struct {
logger *slog.Logger
sdk contribsdkconfig.SDK
prometheusRegistry *prometheus.Registry
startCh chan struct{}
}
// New creates a new Instrumentation instance with configured providers.
@@ -96,14 +97,17 @@ func New(ctx context.Context, build version.Build, cfg Config) (*SDK, error) {
sdk: sdk,
prometheusRegistry: prometheusRegistry,
logger: NewLogger(cfg),
startCh: make(chan struct{}),
}, nil
}
func (i *SDK) Start(ctx context.Context) error {
<-i.startCh
return nil
}
func (i *SDK) Stop(ctx context.Context) error {
close(i.startCh)
return i.sdk.Shutdown(ctx)
}

View File

@@ -49,7 +49,6 @@ import (
"go.signoz.io/signoz/pkg/query-service/common"
"go.signoz.io/signoz/pkg/query-service/constants"
chErrors "go.signoz.io/signoz/pkg/query-service/errors"
am "go.signoz.io/signoz/pkg/query-service/integrations/alertManager"
"go.signoz.io/signoz/pkg/query-service/interfaces"
"go.signoz.io/signoz/pkg/query-service/metrics"
"go.signoz.io/signoz/pkg/query-service/model"
@@ -145,7 +144,6 @@ type ClickHouseReader struct {
promConfigFile string
promConfig *config.Config
alertManager am.Manager
featureFlags interfaces.FeatureLookup
liveTailRefreshSeconds int
@@ -194,13 +192,6 @@ func NewReaderFromClickhouseConnection(
fluxIntervalForTraceDetail time.Duration,
cache cache.Cache,
) *ClickHouseReader {
alertManager, err := am.New()
if err != nil {
zap.L().Error("failed to initialize alert manager", zap.Error(err))
zap.L().Error("check if the alert manager URL is correctly set and valid")
os.Exit(1)
}
logsTableName := options.primary.LogsTable
logsLocalTableName := options.primary.LogsLocalTable
if useLogsNewSchema {
@@ -219,7 +210,6 @@ func NewReaderFromClickhouseConnection(
db: db,
localDB: localDB,
TraceDB: options.primary.TraceDB,
alertManager: alertManager,
operationsTable: options.primary.OperationsTable,
indexTable: options.primary.IndexTable,
errorTable: options.primary.ErrorTable,

View File

@@ -6,7 +6,6 @@ import (
"encoding/json"
"errors"
"fmt"
"go.signoz.io/signoz/pkg/query-service/app/metricsexplorer"
"io"
"math"
"net/http"
@@ -19,6 +18,9 @@ import (
"text/template"
"time"
"go.signoz.io/signoz/pkg/alertmanager"
"go.signoz.io/signoz/pkg/query-service/app/metricsexplorer"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
jsoniter "github.com/json-iterator/go"
@@ -58,7 +60,6 @@ import (
"go.signoz.io/signoz/pkg/query-service/app/integrations/messagingQueues/kafka"
"go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline"
"go.signoz.io/signoz/pkg/query-service/dao"
am "go.signoz.io/signoz/pkg/query-service/integrations/alertManager"
"go.signoz.io/signoz/pkg/query-service/interfaces"
"go.signoz.io/signoz/pkg/query-service/model"
"go.signoz.io/signoz/pkg/query-service/rules"
@@ -84,7 +85,6 @@ type APIHandler struct {
reader interfaces.Reader
skipConfig *model.SkipConfig
appDao dao.ModelDao
alertManager am.Manager
ruleManager *rules.Manager
featureFlags interfaces.FeatureLookup
querier interfaces.Querier
@@ -133,6 +133,8 @@ type APIHandler struct {
pvcsRepo *inframetrics.PvcsRepo
JWT *authtypes.JWT
AlertmanagerAPI *alertmanager.API
}
type APIHandlerOpts struct {
@@ -174,16 +176,12 @@ type APIHandlerOpts struct {
UseTraceNewSchema bool
JWT *authtypes.JWT
AlertmanagerAPI *alertmanager.API
}
// NewAPIHandler returns an APIHandler
func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
alertManager, err := am.New()
if err != nil {
return nil, err
}
querierOpts := querier.QuerierOptions{
Reader: opts.Reader,
Cache: opts.Cache,
@@ -227,7 +225,6 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
skipConfig: opts.SkipConfig,
preferSpanMetrics: opts.PreferSpanMetrics,
temporalityMap: make(map[string]map[v3.Temporality]bool),
alertManager: alertManager,
ruleManager: opts.RuleManager,
featureFlags: opts.FeatureFlags,
IntegrationsController: opts.IntegrationsController,
@@ -250,6 +247,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
pvcsRepo: pvcsRepo,
JWT: opts.JWT,
SummaryService: summaryService,
AlertmanagerAPI: opts.AlertmanagerAPI,
}
logsQueryBuilder := logsv3.PrepareLogsQuery
@@ -483,21 +481,21 @@ func (aH *APIHandler) Respond(w http.ResponseWriter, data interface{}) {
// RegisterPrivateRoutes registers routes for this handler on the given router
func (aH *APIHandler) RegisterPrivateRoutes(router *mux.Router) {
router.HandleFunc("/api/v1/channels", aH.listChannels).Methods(http.MethodGet)
router.HandleFunc("/api/v1/channels", aH.AlertmanagerAPI.ListAllChannels).Methods(http.MethodGet)
}
// RegisterRoutes registers routes for this handler on the given router
func (aH *APIHandler) RegisterRoutes(router *mux.Router, am *AuthMiddleware) {
router.HandleFunc("/api/v1/query_range", am.ViewAccess(aH.queryRangeMetrics)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/query", am.ViewAccess(aH.queryMetrics)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/channels", am.ViewAccess(aH.listChannels)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/channels/{id}", am.ViewAccess(aH.getChannel)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/channels/{id}", am.AdminAccess(aH.editChannel)).Methods(http.MethodPut)
router.HandleFunc("/api/v1/channels/{id}", am.AdminAccess(aH.deleteChannel)).Methods(http.MethodDelete)
router.HandleFunc("/api/v1/channels", am.EditAccess(aH.createChannel)).Methods(http.MethodPost)
router.HandleFunc("/api/v1/testChannel", am.EditAccess(aH.testChannel)).Methods(http.MethodPost)
router.HandleFunc("/api/v1/channels", am.ViewAccess(aH.AlertmanagerAPI.ListChannels)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/channels/{id}", am.ViewAccess(aH.AlertmanagerAPI.GetChannelByID)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/channels/{id}", am.AdminAccess(aH.AlertmanagerAPI.UpdateChannelByID)).Methods(http.MethodPut)
router.HandleFunc("/api/v1/channels/{id}", am.AdminAccess(aH.AlertmanagerAPI.DeleteChannelByID)).Methods(http.MethodDelete)
router.HandleFunc("/api/v1/channels", am.EditAccess(aH.AlertmanagerAPI.CreateChannel)).Methods(http.MethodPost)
router.HandleFunc("/api/v1/testChannel", am.EditAccess(aH.AlertmanagerAPI.TestReceiver)).Methods(http.MethodPost)
router.HandleFunc("/api/v1/alerts", am.ViewAccess(aH.getAlerts)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/alerts", am.ViewAccess(aH.AlertmanagerAPI.GetAlerts)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/rules", am.ViewAccess(aH.listRules)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/rules/{id}", am.ViewAccess(aH.getRule)).Methods(http.MethodGet)
@@ -1363,138 +1361,6 @@ func (aH *APIHandler) editRule(w http.ResponseWriter, r *http.Request) {
}
func (aH *APIHandler) getChannel(w http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["id"]
channel, apiErrorObj := aH.ruleManager.RuleDB().GetChannel(id)
if apiErrorObj != nil {
RespondError(w, apiErrorObj, nil)
return
}
aH.Respond(w, channel)
}
func (aH *APIHandler) deleteChannel(w http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["id"]
apiErrorObj := aH.ruleManager.RuleDB().DeleteChannel(id)
if apiErrorObj != nil {
RespondError(w, apiErrorObj, nil)
return
}
aH.Respond(w, "notification channel successfully deleted")
}
func (aH *APIHandler) listChannels(w http.ResponseWriter, r *http.Request) {
channels, apiErrorObj := aH.ruleManager.RuleDB().GetChannels()
if apiErrorObj != nil {
RespondError(w, apiErrorObj, nil)
return
}
aH.Respond(w, channels)
}
// testChannels sends test alert to all registered channels
func (aH *APIHandler) testChannel(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
body, err := io.ReadAll(r.Body)
if err != nil {
zap.L().Error("Error in getting req body of testChannel API", zap.Error(err))
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}
receiver := &am.Receiver{}
if err := json.Unmarshal(body, receiver); err != nil { // Parse []byte to go struct pointer
zap.L().Error("Error in parsing req body of testChannel API\n", zap.Error(err))
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}
// send alert
apiErrorObj := aH.alertManager.TestReceiver(receiver)
if apiErrorObj != nil {
RespondError(w, apiErrorObj, nil)
return
}
aH.Respond(w, "test alert sent")
}
func (aH *APIHandler) editChannel(w http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["id"]
defer r.Body.Close()
body, err := io.ReadAll(r.Body)
if err != nil {
zap.L().Error("Error in getting req body of editChannel API", zap.Error(err))
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}
receiver := &am.Receiver{}
if err := json.Unmarshal(body, receiver); err != nil { // Parse []byte to go struct pointer
zap.L().Error("Error in parsing req body of editChannel API", zap.Error(err))
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}
_, apiErrorObj := aH.ruleManager.RuleDB().EditChannel(receiver, id)
if apiErrorObj != nil {
RespondError(w, apiErrorObj, nil)
return
}
aH.Respond(w, nil)
}
func (aH *APIHandler) createChannel(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
body, err := io.ReadAll(r.Body)
if err != nil {
zap.L().Error("Error in getting req body of createChannel API", zap.Error(err))
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}
receiver := &am.Receiver{}
if err := json.Unmarshal(body, receiver); err != nil { // Parse []byte to go struct pointer
zap.L().Error("Error in parsing req body of createChannel API", zap.Error(err))
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}
_, apiErrorObj := aH.ruleManager.RuleDB().CreateChannel(receiver)
if apiErrorObj != nil {
RespondError(w, apiErrorObj, nil)
return
}
aH.Respond(w, nil)
}
func (aH *APIHandler) getAlerts(w http.ResponseWriter, r *http.Request) {
params := r.URL.Query()
amEndpoint := constants.GetAlertManagerApiPrefix()
resp, err := http.Get(amEndpoint + "v1/alerts" + "?" + params.Encode())
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
aH.Respond(w, string(body))
}
func (aH *APIHandler) createRule(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()

View File

@@ -14,6 +14,7 @@ import (
"github.com/rs/cors"
"github.com/soheilhy/cmux"
"go.signoz.io/signoz/pkg/alertmanager"
"go.signoz.io/signoz/pkg/http/middleware"
"go.signoz.io/signoz/pkg/query-service/agentConf"
"go.signoz.io/signoz/pkg/query-service/app/clickhouseReader"
@@ -25,6 +26,7 @@ import (
opAmpModel "go.signoz.io/signoz/pkg/query-service/app/opamp/model"
"go.signoz.io/signoz/pkg/query-service/app/preferences"
"go.signoz.io/signoz/pkg/signoz"
"go.signoz.io/signoz/pkg/sqlstore"
"go.signoz.io/signoz/pkg/types/authtypes"
"go.signoz.io/signoz/pkg/web"
@@ -35,7 +37,6 @@ import (
"go.signoz.io/signoz/pkg/query-service/dao"
"go.signoz.io/signoz/pkg/query-service/featureManager"
"go.signoz.io/signoz/pkg/query-service/healthcheck"
am "go.signoz.io/signoz/pkg/query-service/integrations/alertManager"
"go.signoz.io/signoz/pkg/query-service/interfaces"
"go.signoz.io/signoz/pkg/query-service/model"
pqle "go.signoz.io/signoz/pkg/query-service/pqlEngine"
@@ -151,9 +152,16 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
<-readerReady
rm, err := makeRulesManager(
serverOptions.PromConfigPath,
constants.GetAlertManagerApiPrefix(),
serverOptions.RuleRepoURL, serverOptions.SigNoz.SQLStore.SQLxDB(), reader, c, serverOptions.DisableRules, fm, serverOptions.UseLogsNewSchema, serverOptions.UseTraceNewSchema)
serverOptions.RuleRepoURL,
serverOptions.SigNoz.SQLStore.SQLxDB(),
reader,
c,
serverOptions.DisableRules,
fm,
serverOptions.UseLogsNewSchema,
serverOptions.UseTraceNewSchema,
serverOptions.SigNoz.SQLStore,
)
if err != nil {
return nil, err
}
@@ -196,6 +204,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
UseLogsNewSchema: serverOptions.UseLogsNewSchema,
UseTraceNewSchema: serverOptions.UseTraceNewSchema,
JWT: serverOptions.Jwt,
AlertmanagerAPI: alertmanager.NewAPI(serverOptions.SigNoz.Alertmanager),
})
if err != nil {
return nil, err
@@ -278,7 +287,6 @@ func (s *Server) createPrivateServer(api *APIHandler) (*http.Server, error) {
}
func (s *Server) createPublicServer(api *APIHandler, web web.Web) (*http.Server, error) {
r := NewRouter()
r.Use(middleware.NewAuth(zap.L(), s.serverOptions.Jwt, []string{"Authorization", "Sec-WebSocket-Protocol"}).Wrap)
@@ -466,8 +474,6 @@ func (s *Server) Stop() error {
}
func makeRulesManager(
_,
alertManagerURL string,
ruleRepoURL string,
db *sqlx.DB,
ch interfaces.Reader,
@@ -475,7 +481,9 @@ func makeRulesManager(
disableRules bool,
fm interfaces.FeatureLookup,
useLogsNewSchema bool,
useTraceNewSchema bool) (*rules.Manager, error) {
useTraceNewSchema bool,
sqlstore sqlstore.SQLStore,
) (*rules.Manager, error) {
// create engine
pqle, err := pqle.FromReader(ch)
@@ -483,16 +491,8 @@ func makeRulesManager(
return nil, fmt.Errorf("failed to create pql engine : %v", err)
}
// notifier opts
notifierOpts := am.NotifierOptions{
QueueCapacity: 10000,
Timeout: 1 * time.Second,
AlertManagerURLs: []string{alertManagerURL},
}
// create manager opts
managerOpts := &rules.ManagerOptions{
NotifierOpts: notifierOpts,
PqlEngine: pqle,
RepoURL: ruleRepoURL,
DBConn: db,
@@ -505,6 +505,7 @@ func makeRulesManager(
EvalDelay: constants.GetEvalDelay(),
UseLogsNewSchema: useLogsNewSchema,
UseTraceNewSchema: useTraceNewSchema,
SQLStore: sqlstore,
}
// create Manager

View File

@@ -57,22 +57,12 @@ const PreferRPM = "PreferRPM"
const SpanSearchScopeRoot = "isroot"
const SpanSearchScopeEntryPoint = "isentrypoint"
func GetAlertManagerApiPrefix() string {
if os.Getenv("ALERTMANAGER_API_PREFIX") != "" {
return os.Getenv("ALERTMANAGER_API_PREFIX")
}
return "http://alertmanager:9093/api/"
}
var TELEMETRY_HEART_BEAT_DURATION_MINUTES = GetOrDefaultEnvInt("TELEMETRY_HEART_BEAT_DURATION_MINUTES", 720)
var TELEMETRY_ACTIVE_USER_DURATION_MINUTES = GetOrDefaultEnvInt("TELEMETRY_ACTIVE_USER_DURATION_MINUTES", 360)
var InviteEmailTemplate = GetOrDefaultEnv("INVITE_EMAIL_TEMPLATE", "/root/templates/invitation_email_template.html")
// Alert manager channel subpath
var AmChannelApiPath = GetOrDefaultEnv("ALERTMANAGER_API_CHANNEL_PATH", "v1/routes")
var OTLPTarget = GetOrDefaultEnv("OTEL_EXPORTER_OTLP_ENDPOINT", "")
var LogExportBatchSize = GetOrDefaultEnv("OTEL_BLRP_MAX_EXPORT_BATCH_SIZE", "512")

View File

@@ -1,21 +0,0 @@
package constants
import (
"os"
"testing"
. "github.com/smartystreets/goconvey/convey"
)
func TestGetAlertManagerApiPrefix(t *testing.T) {
Convey("TestGetAlertManagerApiPrefix", t, func() {
res := GetAlertManagerApiPrefix()
So(res, ShouldEqual, "http://alertmanager:9093/api/")
Convey("WithEnvSet", func() {
os.Setenv("ALERTMANAGER_API_PREFIX", "http://test:9093/api/")
res = GetAlertManagerApiPrefix()
So(res, ShouldEqual, "http://test:9093/api/")
})
})
}

View File

@@ -1,200 +0,0 @@
package alertManager
// Wrapper to connect and process alert manager functions
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
neturl "net/url"
"go.signoz.io/signoz/pkg/query-service/constants"
"go.signoz.io/signoz/pkg/query-service/model"
"go.uber.org/zap"
)
const contentType = "application/json"
type Manager interface {
URL() *neturl.URL
URLPath(path string) *neturl.URL
AddRoute(receiver *Receiver) *model.ApiError
EditRoute(receiver *Receiver) *model.ApiError
DeleteRoute(name string) *model.ApiError
TestReceiver(receiver *Receiver) *model.ApiError
}
func defaultOptions() []ManagerOptions {
return []ManagerOptions{
WithURL(constants.GetAlertManagerApiPrefix()),
WithChannelApiPath(constants.AmChannelApiPath),
}
}
type ManagerOptions func(m *manager) error
func New(opts ...ManagerOptions) (Manager, error) {
m := &manager{}
newOpts := defaultOptions()
newOpts = append(newOpts, opts...)
for _, opt := range newOpts {
err := opt(m)
if err != nil {
return nil, err
}
}
return m, nil
}
func WithURL(url string) ManagerOptions {
return func(m *manager) error {
m.url = url
parsedURL, err := neturl.Parse(url)
if err != nil {
return err
}
m.parsedURL = parsedURL
return nil
}
}
func WithChannelApiPath(path string) ManagerOptions {
return func(m *manager) error {
m.channelApiPath = path
return nil
}
}
type manager struct {
url string
parsedURL *neturl.URL
channelApiPath string
}
func (m *manager) prepareAmChannelApiURL() string {
return fmt.Sprintf("%s%s", m.url, m.channelApiPath)
}
func (m *manager) prepareTestApiURL() string {
return fmt.Sprintf("%s%s", m.url, "v1/testReceiver")
}
func (m *manager) URL() *neturl.URL {
return m.parsedURL
}
func (m *manager) URLPath(path string) *neturl.URL {
upath, err := neturl.Parse(path)
if err != nil {
return nil
}
return m.parsedURL.ResolveReference(upath)
}
func (m *manager) AddRoute(receiver *Receiver) *model.ApiError {
receiverString, _ := json.Marshal(receiver)
amURL := m.prepareAmChannelApiURL()
response, err := http.Post(amURL, contentType, bytes.NewBuffer(receiverString))
if err != nil {
zap.L().Error("Error in getting response of API call to alertmanager", zap.String("url", amURL), zap.Error(err))
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
if response.StatusCode > 299 {
zap.L().Error("Error in getting 2xx response in API call to alertmanager", zap.String("url", amURL), zap.String("status", response.Status))
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
return nil
}
func (m *manager) EditRoute(receiver *Receiver) *model.ApiError {
receiverString, _ := json.Marshal(receiver)
amURL := m.prepareAmChannelApiURL()
req, err := http.NewRequest(http.MethodPut, amURL, bytes.NewBuffer(receiverString))
if err != nil {
zap.L().Error("Error creating new update request for API call to alertmanager", zap.String("url", amURL), zap.Error(err))
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
req.Header.Add("Content-Type", contentType)
client := &http.Client{}
response, err := client.Do(req)
if err != nil {
zap.L().Error("Error in getting response of API call to alertmanager", zap.String("url", amURL), zap.Error(err))
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
if response.StatusCode > 299 {
zap.L().Error("Error in getting 2xx response in PUT API call to alertmanager", zap.String("url", amURL), zap.String("status", response.Status))
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
return nil
}
func (m *manager) DeleteRoute(name string) *model.ApiError {
values := map[string]string{"name": name}
requestData, _ := json.Marshal(values)
amURL := m.prepareAmChannelApiURL()
req, err := http.NewRequest(http.MethodDelete, amURL, bytes.NewBuffer(requestData))
if err != nil {
zap.L().Error("Error in creating new delete request to alertmanager/v1/receivers", zap.Error(err))
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
req.Header.Add("Content-Type", contentType)
client := &http.Client{}
response, err := client.Do(req)
if err != nil {
zap.L().Error("Error in getting response of API call to alertmanager", zap.String("url", amURL), zap.Error(err))
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
if response.StatusCode > 299 {
err := fmt.Errorf(fmt.Sprintf("Error in getting 2xx response in PUT API call to alertmanager(DELETE %s)\n", amURL), response.Status)
zap.L().Error("Error in getting 2xx response in PUT API call to alertmanager", zap.String("url", amURL), zap.String("status", response.Status))
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
return nil
}
func (m *manager) TestReceiver(receiver *Receiver) *model.ApiError {
receiverBytes, _ := json.Marshal(receiver)
amTestURL := m.prepareTestApiURL()
response, err := http.Post(amTestURL, contentType, bytes.NewBuffer(receiverBytes))
if err != nil {
zap.L().Error("Error in getting response of API call to alertmanager", zap.String("url", amTestURL), zap.Error(err))
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
if response.StatusCode > 201 && response.StatusCode < 400 {
err := fmt.Errorf(fmt.Sprintf("Invalid parameters in test alert api for alertmanager(POST %s)\n", amTestURL), response.Status)
zap.L().Error("Invalid parameters in test alert api for alertmanager", zap.Error(err))
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
if response.StatusCode > 400 {
err := fmt.Errorf(fmt.Sprintf("Received Server Error response for API call to alertmanager(POST %s)\n", amTestURL), response.Status)
zap.L().Error("Received Server Error response for API call to alertmanager", zap.Error(err))
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
return nil
}

View File

@@ -1,79 +0,0 @@
package alertManager
import (
"fmt"
"time"
"go.signoz.io/signoz/pkg/query-service/utils/labels"
)
// Receiver configuration provides configuration on how to contact a receiver.
type Receiver struct {
// A unique identifier for this receiver.
Name string `yaml:"name" json:"name"`
EmailConfigs interface{} `yaml:"email_configs,omitempty" json:"email_configs,omitempty"`
PagerdutyConfigs interface{} `yaml:"pagerduty_configs,omitempty" json:"pagerduty_configs,omitempty"`
SlackConfigs interface{} `yaml:"slack_configs,omitempty" json:"slack_configs,omitempty"`
WebhookConfigs interface{} `yaml:"webhook_configs,omitempty" json:"webhook_configs,omitempty"`
OpsGenieConfigs interface{} `yaml:"opsgenie_configs,omitempty" json:"opsgenie_configs,omitempty"`
WechatConfigs interface{} `yaml:"wechat_configs,omitempty" json:"wechat_configs,omitempty"`
PushoverConfigs interface{} `yaml:"pushover_configs,omitempty" json:"pushover_configs,omitempty"`
VictorOpsConfigs interface{} `yaml:"victorops_configs,omitempty" json:"victorops_configs,omitempty"`
SNSConfigs interface{} `yaml:"sns_configs,omitempty" json:"sns_configs,omitempty"`
MSTeamsConfigs interface{} `yaml:"msteams_configs,omitempty" json:"msteams_configs,omitempty"`
}
type ReceiverResponse struct {
Status string `json:"status"`
Data Receiver `json:"data"`
}
// Alert is a generic representation of an alert in the Prometheus eco-system.
type Alert struct {
// Label value pairs for purpose of aggregation, matching, and disposition
// dispatching. This must minimally include an "alertname" label.
Labels labels.BaseLabels `json:"labels"`
// Extra key/value information which does not define alert identity.
Annotations labels.BaseLabels `json:"annotations"`
// The known time range for this alert. Both ends are optional.
StartsAt time.Time `json:"startsAt,omitempty"`
EndsAt time.Time `json:"endsAt,omitempty"`
GeneratorURL string `json:"generatorURL,omitempty"`
Receivers []string `json:"receivers,omitempty"`
}
// Name returns the name of the alert. It is equivalent to the "alertname" label.
func (a *Alert) Name() string {
return a.Labels.Get(labels.AlertNameLabel)
}
// Hash returns a hash over the alert. It is equivalent to the alert labels hash.
func (a *Alert) Hash() uint64 {
return a.Labels.Hash()
}
func (a *Alert) String() string {
s := fmt.Sprintf("%s[%s][%s]", a.Name(), fmt.Sprintf("%016x", a.Hash())[:7], a.Receivers)
if a.Resolved() {
return s + "[resolved]"
}
return s + "[active]"
}
// Resolved returns true iff the activity interval ended in the past.
func (a *Alert) Resolved() bool {
return a.ResolvedAt(time.Now())
}
// ResolvedAt returns true off the activity interval ended before
// the given timestamp.
func (a *Alert) ResolvedAt(ts time.Time) bool {
if a.EndsAt.IsZero() {
return false
}
return !a.EndsAt.After(ts)
}

View File

@@ -1,310 +0,0 @@
package alertManager
import (
"bytes"
"context"
"encoding/json"
"fmt"
"sync/atomic"
"net/http"
"net/url"
"sync"
"time"
old_ctx "golang.org/x/net/context"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"go.uber.org/zap"
"golang.org/x/net/context/ctxhttp"
)
const (
alertPushEndpoint = "v1/alerts"
contentTypeJSON = "application/json"
)
// Notifier is responsible for dispatching alert notifications to an
// alert manager service.
type Notifier struct {
queue []*Alert
opts *NotifierOptions
more chan struct{}
mtx sync.RWMutex
ctx context.Context
cancel func()
alertmanagers *alertmanagerSet
logger log.Logger
}
// NotifierOptions are the configurable parameters of a Handler.
type NotifierOptions struct {
QueueCapacity int
// Used for sending HTTP requests to the Alertmanager.
Do func(ctx old_ctx.Context, client *http.Client, req *http.Request) (*http.Response, error)
// List of alert manager urls
AlertManagerURLs []string
// timeout limit on requests
Timeout time.Duration
}
func (opts *NotifierOptions) String() string {
var urls string
for _, u := range opts.AlertManagerURLs {
urls = fmt.Sprintf("%s %s", urls, u)
}
return urls
}
// todo(amol): add metrics
func NewNotifier(o *NotifierOptions, logger log.Logger) (*Notifier, error) {
ctx, cancel := context.WithCancel(context.Background())
if o.Do == nil {
o.Do = ctxhttp.Do
}
if logger == nil {
logger = log.NewNopLogger()
}
n := &Notifier{
queue: make([]*Alert, 0, o.QueueCapacity),
ctx: ctx,
cancel: cancel,
more: make(chan struct{}, 1),
opts: o,
logger: logger,
}
timeout := o.Timeout
if int64(timeout) == 0 {
timeout = time.Duration(30 * time.Second)
}
amset, err := newAlertmanagerSet(o.AlertManagerURLs, timeout, logger)
if err != nil {
zap.L().Error("failed to parse alert manager urls")
return n, err
}
n.alertmanagers = amset
zap.L().Info("Starting notifier with alert manager", zap.Strings("urls", o.AlertManagerURLs))
return n, nil
}
const maxBatchSize = 64
func (n *Notifier) queueLen() int {
n.mtx.RLock()
defer n.mtx.RUnlock()
return len(n.queue)
}
func (n *Notifier) nextBatch() []*Alert {
n.mtx.Lock()
defer n.mtx.Unlock()
var alerts []*Alert
if len(n.queue) > maxBatchSize {
alerts = append(make([]*Alert, 0, maxBatchSize), n.queue[:maxBatchSize]...)
n.queue = n.queue[maxBatchSize:]
} else {
alerts = append(make([]*Alert, 0, len(n.queue)), n.queue...)
n.queue = n.queue[:0]
}
return alerts
}
// Run dispatches notifications continuously.
func (n *Notifier) Run() {
zap.L().Info("msg: Initiating alert notifier...")
for {
select {
case <-n.ctx.Done():
return
case <-n.more:
}
alerts := n.nextBatch()
if !n.sendAll(alerts...) {
zap.L().Warn("msg: dropped alerts", zap.Int("count", len(alerts)))
// n.metrics.dropped.Add(float64(len(alerts)))
}
// If the queue still has items left, kick off the next iteration.
if n.queueLen() > 0 {
n.setMore()
}
}
}
// Send queues the given notification requests for processing.
// Panics if called on a handler that is not running.
func (n *Notifier) Send(alerts ...*Alert) {
n.mtx.Lock()
defer n.mtx.Unlock()
// Queue capacity should be significantly larger than a single alert
// batch could be.
if d := len(alerts) - n.opts.QueueCapacity; d > 0 {
alerts = alerts[d:]
level.Warn(n.logger).Log("msg", "Alert batch larger than queue capacity, dropping alerts", "num_dropped", d)
//n.metrics.dropped.Add(float64(d))
}
// If the queue is full, remove the oldest alerts in favor
// of newer ones.
if d := (len(n.queue) + len(alerts)) - n.opts.QueueCapacity; d > 0 {
n.queue = n.queue[d:]
level.Warn(n.logger).Log("msg", "Alert notification queue full, dropping alerts", "num_dropped", d)
//n.metrics.dropped.Add(float64(d))
}
n.queue = append(n.queue, alerts...)
// Notify sending goroutine that there are alerts to be processed.
n.setMore()
}
// setMore signals that the alert queue has items.
func (n *Notifier) setMore() {
// If we cannot send on the channel, it means the signal already exists
// and has not been consumed yet.
select {
case n.more <- struct{}{}:
default:
}
}
// Alertmanagers returns a slice of Alertmanager URLs.
func (n *Notifier) Alertmanagers() []*url.URL {
n.mtx.RLock()
amset := n.alertmanagers
n.mtx.RUnlock()
var res []*url.URL
amset.mtx.RLock()
for _, am := range amset.ams {
res = append(res, am.URLPath(alertPushEndpoint))
}
amset.mtx.RUnlock()
return res
}
// sendAll sends the alerts to all configured Alertmanagers concurrently.
// It returns true if the alerts could be sent successfully to at least one Alertmanager.
func (n *Notifier) sendAll(alerts ...*Alert) bool {
b, err := json.Marshal(alerts)
if err != nil {
zap.L().Error("Encoding alerts failed", zap.Error(err))
return false
}
n.mtx.RLock()
ams := n.alertmanagers
n.mtx.RUnlock()
var (
wg sync.WaitGroup
numSuccess uint64
)
ams.mtx.RLock()
for _, am := range ams.ams {
wg.Add(1)
ctx, cancel := context.WithTimeout(n.ctx, time.Duration(ams.timeout))
defer cancel()
go func(ams *alertmanagerSet, am Manager) {
u := am.URLPath(alertPushEndpoint).String()
if err := n.sendOne(ctx, ams.client, u, b); err != nil {
zap.L().Error("Error calling alert API", zap.String("alertmanager", u), zap.Int("count", len(alerts)), zap.Error(err))
} else {
atomic.AddUint64(&numSuccess, 1)
}
// n.metrics.latency.WithLabelValues(u).Observe(time.Since(begin).Seconds())
// n.metrics.sent.WithLabelValues(u).Add(float64(len(alerts)))
wg.Done()
}(ams, am)
}
ams.mtx.RUnlock()
wg.Wait()
return numSuccess > 0
}
func (n *Notifier) sendOne(ctx context.Context, c *http.Client, url string, b []byte) error {
req, err := http.NewRequest("POST", url, bytes.NewReader(b))
if err != nil {
return err
}
req.Header.Set("Content-Type", contentTypeJSON)
resp, err := n.opts.Do(ctx, c, req)
if err != nil {
return err
}
defer resp.Body.Close()
// Any HTTP status 2xx is OK.
if resp.StatusCode/100 != 2 {
return fmt.Errorf("bad response status %v", resp.Status)
}
return err
}
// Stop shuts down the notification handler.
func (n *Notifier) Stop() {
level.Info(n.logger).Log("msg", "Stopping notification manager...")
n.cancel()
}
// alertmanagerSet contains a set of Alertmanagers discovered via a group of service
// discovery definitions that have a common configuration on how alerts should be sent.
type alertmanagerSet struct {
urls []string
client *http.Client
timeout time.Duration
mtx sync.RWMutex
ams []Manager
logger log.Logger
}
func newAlertmanagerSet(urls []string, timeout time.Duration, logger log.Logger) (*alertmanagerSet, error) {
client := &http.Client{}
s := &alertmanagerSet{
client: client,
urls: urls,
logger: logger,
timeout: timeout,
}
ams := []Manager{}
for _, u := range urls {
am, err := New(WithURL(u))
if err != nil {
level.Error(s.logger).Log(fmt.Sprintf("invalid alert manager url %s: %s", u, err))
} else {
ams = append(ams, am)
}
}
if len(ams) == 0 {
return s, fmt.Errorf("no alert managers")
}
s.ams = ams
return s, nil
}

View File

@@ -4,8 +4,6 @@ import (
"context"
"flag"
"os"
"os/signal"
"syscall"
"time"
prommodel "github.com/prometheus/common/model"
@@ -142,22 +140,20 @@ func main() {
logger.Fatal("Failed to initialize auth cache", zap.Error(err))
}
signalsChannel := make(chan os.Signal, 1)
signal.Notify(signalsChannel, os.Interrupt, syscall.SIGTERM)
signoz.Start(context.Background())
for {
select {
case status := <-server.HealthCheckStatus():
logger.Info("Received HealthCheck status: ", zap.Int("status", int(status)))
case <-signalsChannel:
logger.Info("Received OS Interrupt Signal ... ")
err := server.Stop()
if err != nil {
logger.Fatal("Failed to stop server", zap.Error(err))
}
logger.Info("Server stopped")
return
}
if err := signoz.Wait(context.Background()); err != nil {
zap.L().Fatal("Failed to start signoz", zap.Error(err))
}
err = server.Stop()
if err != nil {
zap.L().Fatal("Failed to stop server", zap.Error(err))
}
err = signoz.Stop(context.Background())
if err != nil {
zap.L().Fatal("Failed to stop signoz", zap.Error(err))
}
}

View File

@@ -80,7 +80,6 @@ func parsePostableRule(content []byte, kind RuleDataKind) (*PostableRule, error)
// parseIntoRule loads the content (data) into PostableRule and also
// validates the end result
func parseIntoRule(initRule PostableRule, content []byte, kind RuleDataKind) (*PostableRule, error) {
rule := &initRule
var err error

View File

@@ -13,6 +13,7 @@ import (
"go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
qslabels "go.signoz.io/signoz/pkg/query-service/utils/labels"
"go.signoz.io/signoz/pkg/sqlstore"
"go.uber.org/zap"
)
@@ -78,6 +79,8 @@ type BaseRule struct {
// querying the v4 table on low cardinal temporality column
// should be fast but we can still avoid the query if we have the data in memory
TemporalityMap map[string]map[v3.Temporality]bool
sqlstore sqlstore.SQLStore
}
type RuleOption func(*BaseRule)
@@ -106,6 +109,12 @@ func WithLogger(logger *zap.Logger) RuleOption {
}
}
func WithSQLStore(sqlstore sqlstore.SQLStore) RuleOption {
return func(r *BaseRule) {
r.sqlstore = sqlstore
}
}
func NewBaseRule(id string, p *PostableRule, reader interfaces.Reader, opts ...RuleOption) (*BaseRule, error) {
if p.RuleCondition == nil || !p.RuleCondition.IsValid() {
return nil, fmt.Errorf("invalid rule condition")
@@ -309,6 +318,20 @@ func (r *BaseRule) ActiveAlerts() []*Alert {
}
func (r *BaseRule) SendAlerts(ctx context.Context, ts time.Time, resendDelay time.Duration, interval time.Duration, notifyFunc NotifyFunc) {
var orgID string
err := r.
sqlstore.
BunDB().
NewSelect().
Table("organizations").
ColumnExpr("id").
Limit(1).
Scan(ctx, &orgID)
if err != nil {
r.logger.Error("failed to get org ids", zap.Error(err))
return
}
alerts := []*Alert{}
r.ForEachActiveAlert(func(alert *Alert) {
if alert.needsSending(ts, resendDelay) {
@@ -322,7 +345,7 @@ func (r *BaseRule) SendAlerts(ctx context.Context, ts time.Time, resendDelay tim
alerts = append(alerts, &anew)
}
})
notifyFunc(ctx, "", alerts...)
notifyFunc(ctx, orgID, "", alerts...)
}
func (r *BaseRule) ForEachActiveAlert(f func(*Alert)) {

View File

@@ -11,30 +11,24 @@ import (
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
"go.signoz.io/signoz/pkg/query-service/common"
am "go.signoz.io/signoz/pkg/query-service/integrations/alertManager"
"github.com/uptrace/bun"
"go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/sqlstore"
"go.signoz.io/signoz/pkg/types/authtypes"
"go.uber.org/zap"
)
// Data store to capture user alert rule settings
type RuleDB interface {
GetChannel(id string) (*model.ChannelItem, *model.ApiError)
GetChannels() (*[]model.ChannelItem, *model.ApiError)
DeleteChannel(id string) *model.ApiError
CreateChannel(receiver *am.Receiver) (*am.Receiver, *model.ApiError)
EditChannel(receiver *am.Receiver, id string) (*am.Receiver, *model.ApiError)
// CreateRuleTx stores rule in the db and returns tx and group name (on success)
CreateRuleTx(ctx context.Context, rule string) (int64, Tx, error)
// CreateRule stores rule in the db and returns tx and group name (on success)
CreateRule(context.Context, *StoredRule, func(context.Context, int64) error) (int64, error)
// EditRuleTx updates the given rule in the db and returns tx and group name (on success)
EditRuleTx(ctx context.Context, rule string, id string) (string, Tx, error)
EditRule(context.Context, *StoredRule, func(context.Context) error) error
// DeleteRuleTx deletes the given rule in the db and returns tx and group name (on success)
DeleteRuleTx(ctx context.Context, id string) (string, Tx, error)
DeleteRule(context.Context, string, func(context.Context) error) error
// GetStoredRules fetches the rule definitions from db
GetStoredRules(ctx context.Context) ([]StoredRule, error)
@@ -62,142 +56,83 @@ type RuleDB interface {
}
type StoredRule struct {
Id int `json:"id" db:"id"`
CreatedAt *time.Time `json:"created_at" db:"created_at"`
CreatedBy *string `json:"created_by" db:"created_by"`
UpdatedAt *time.Time `json:"updated_at" db:"updated_at"`
UpdatedBy *string `json:"updated_by" db:"updated_by"`
Data string `json:"data" db:"data"`
}
bun.BaseModel `bun:"rules"`
type Tx interface {
Commit() error
Rollback() error
Id int `json:"id" db:"id" bun:"id,pk,autoincrement"`
CreatedAt *time.Time `json:"created_at" db:"created_at" bun:"created_at"`
CreatedBy *string `json:"created_by" db:"created_by" bun:"created_by"`
UpdatedAt *time.Time `json:"updated_at" db:"updated_at" bun:"updated_at"`
UpdatedBy *string `json:"updated_by" db:"updated_by" bun:"updated_by"`
Data string `json:"data" db:"data" bun:"data"`
}
type ruleDB struct {
*sqlx.DB
alertManager am.Manager
sqlstore sqlstore.SQLStore
}
// todo: move init methods for creating tables
func NewRuleDB(db *sqlx.DB, alertManager am.Manager) RuleDB {
return &ruleDB{
db,
alertManager,
}
func NewRuleDB(db *sqlx.DB, sqlstore sqlstore.SQLStore) RuleDB {
return &ruleDB{db, sqlstore}
}
// CreateRuleTx stores a given rule in db and returns task name,
// sql tx and error (if any)
func (r *ruleDB) CreateRuleTx(ctx context.Context, rule string) (int64, Tx, error) {
var lastInsertId int64
// CreateRule stores a given rule in db and returns task name and error (if any)
func (r *ruleDB) CreateRule(ctx context.Context, storedRule *StoredRule, cb func(context.Context, int64) error) (int64, error) {
err := r.sqlstore.RunInTxCtx(ctx, nil, func(ctx context.Context) error {
_, err := r.sqlstore.
BunDBCtx(ctx).
NewInsert().
Model(storedRule).
Exec(ctx)
if err != nil {
return err
}
return cb(ctx, int64(storedRule.Id))
})
var userEmail string
if user := common.GetUserFromContext(ctx); user != nil {
userEmail = user.Email
}
createdAt := time.Now()
updatedAt := time.Now()
tx, err := r.Begin()
if err != nil {
return lastInsertId, nil, err
return 0, err
}
stmt, err := tx.Prepare(`INSERT into rules (created_at, created_by, updated_at, updated_by, data) VALUES($1,$2,$3,$4,$5);`)
if err != nil {
zap.L().Error("Error in preparing statement for INSERT to rules", zap.Error(err))
tx.Rollback()
return lastInsertId, nil, err
}
defer stmt.Close()
result, err := stmt.Exec(createdAt, userEmail, updatedAt, userEmail, rule)
if err != nil {
zap.L().Error("Error in Executing prepared statement for INSERT to rules", zap.Error(err))
tx.Rollback() // return an error too, we may want to wrap them
return lastInsertId, nil, err
}
lastInsertId, err = result.LastInsertId()
if err != nil {
zap.L().Error("Error in getting last insert id for INSERT to rules\n", zap.Error(err))
tx.Rollback() // return an error too, we may want to wrap them
return lastInsertId, nil, err
}
return lastInsertId, tx, nil
return int64(storedRule.Id), nil
}
// EditRuleTx stores a given rule string in database and returns
// task name, sql tx and error (if any)
func (r *ruleDB) EditRuleTx(ctx context.Context, rule string, id string) (string, Tx, error) {
// EditRule stores a given rule string in database and returns task name and error (if any)
func (r *ruleDB) EditRule(ctx context.Context, storedRule *StoredRule, cb func(context.Context) error) error {
return r.sqlstore.RunInTxCtx(ctx, nil, func(ctx context.Context) error {
_, err := r.sqlstore.
BunDBCtx(ctx).
NewUpdate().
Model(storedRule).
WherePK().
Exec(ctx)
if err != nil {
return err
}
var groupName string
idInt, _ := strconv.Atoi(id)
if idInt == 0 {
return groupName, nil, fmt.Errorf("failed to read alert id from parameters")
}
var userEmail string
if user := common.GetUserFromContext(ctx); user != nil {
userEmail = user.Email
}
updatedAt := time.Now()
groupName = prepareTaskName(int64(idInt))
// todo(amol): resolve this error - database locked when using
// edit transaction with sqlx
// tx, err := r.Begin()
//if err != nil {
// return groupName, tx, err
//}
stmt, err := r.Prepare(`UPDATE rules SET updated_by=$1, updated_at=$2, data=$3 WHERE id=$4;`)
if err != nil {
zap.L().Error("Error in preparing statement for UPDATE to rules", zap.Error(err))
// tx.Rollback()
return groupName, nil, err
}
defer stmt.Close()
if _, err := stmt.Exec(userEmail, updatedAt, rule, idInt); err != nil {
zap.L().Error("Error in Executing prepared statement for UPDATE to rules", zap.Error(err))
// tx.Rollback() // return an error too, we may want to wrap them
return groupName, nil, err
}
return groupName, nil, nil
return cb(ctx)
})
}
// DeleteRuleTx deletes a given rule with id and returns
// taskname, sql tx and error (if any)
func (r *ruleDB) DeleteRuleTx(ctx context.Context, id string) (string, Tx, error) {
// DeleteRule deletes a given rule with id and returns taskname and error (if any)
func (r *ruleDB) DeleteRule(ctx context.Context, id string, cb func(context.Context) error) error {
if err := r.sqlstore.RunInTxCtx(ctx, nil, func(ctx context.Context) error {
_, err := r.sqlstore.
BunDBCtx(ctx).
NewDelete().
Model(&StoredRule{}).
Where("id = ?", id).
Exec(ctx)
if err != nil {
return err
}
idInt, _ := strconv.Atoi(id)
groupName := prepareTaskName(int64(idInt))
// commented as this causes db locked error
// tx, err := r.Begin()
// if err != nil {
// return groupName, tx, err
// }
stmt, err := r.Prepare(`DELETE FROM rules WHERE id=$1;`)
if err != nil {
return groupName, nil, err
return cb(ctx)
}); err != nil {
return err
}
defer stmt.Close()
if _, err := stmt.Exec(idInt); err != nil {
zap.L().Error("Error in Executing prepared statement for DELETE to rules", zap.Error(err))
// tx.Rollback()
return groupName, nil, err
}
return groupName, nil, nil
return nil
}
func (r *ruleDB) GetStoredRules(ctx context.Context) ([]StoredRule, error) {
@@ -320,114 +255,7 @@ func (r *ruleDB) EditPlannedMaintenance(ctx context.Context, maintenance Planned
return "", nil
}
func getChannelType(receiver *am.Receiver) string {
if receiver.EmailConfigs != nil {
return "email"
}
if receiver.OpsGenieConfigs != nil {
return "opsgenie"
}
if receiver.PagerdutyConfigs != nil {
return "pagerduty"
}
if receiver.PushoverConfigs != nil {
return "pushover"
}
if receiver.SNSConfigs != nil {
return "sns"
}
if receiver.SlackConfigs != nil {
return "slack"
}
if receiver.VictorOpsConfigs != nil {
return "victorops"
}
if receiver.WebhookConfigs != nil {
return "webhook"
}
if receiver.WechatConfigs != nil {
return "wechat"
}
if receiver.MSTeamsConfigs != nil {
return "msteams"
}
return ""
}
func (r *ruleDB) GetChannel(id string) (*model.ChannelItem, *model.ApiError) {
idInt, _ := strconv.Atoi(id)
channel := model.ChannelItem{}
query := "SELECT id, created_at, updated_at, name, type, data data FROM notification_channels WHERE id=?;"
stmt, err := r.Preparex(query)
if err != nil {
zap.L().Error("Error in preparing sql query for GetChannel", zap.Error(err))
return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
err = stmt.Get(&channel, idInt)
if err != nil {
zap.L().Error("Error in getting channel with id", zap.Int("id", idInt), zap.Error(err))
return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
return &channel, nil
}
func (r *ruleDB) DeleteChannel(id string) *model.ApiError {
idInt, _ := strconv.Atoi(id)
channelToDelete, apiErrorObj := r.GetChannel(id)
if apiErrorObj != nil {
return apiErrorObj
}
tx, err := r.Begin()
if err != nil {
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
{
stmt, err := tx.Prepare(`DELETE FROM notification_channels WHERE id=$1;`)
if err != nil {
zap.L().Error("Error in preparing statement for INSERT to notification_channels", zap.Error(err))
tx.Rollback()
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
defer stmt.Close()
if _, err := stmt.Exec(idInt); err != nil {
zap.L().Error("Error in Executing prepared statement for INSERT to notification_channels", zap.Error(err))
tx.Rollback() // return an error too, we may want to wrap them
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
}
apiError := r.alertManager.DeleteRoute(channelToDelete.Name)
if apiError != nil {
tx.Rollback()
return apiError
}
err = tx.Commit()
if err != nil {
zap.L().Error("Error in committing transaction for DELETE command to notification_channels", zap.Error(err))
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
return nil
}
func (r *ruleDB) GetChannels() (*[]model.ChannelItem, *model.ApiError) {
func (r *ruleDB) getChannels() (*[]model.ChannelItem, *model.ApiError) {
channels := []model.ChannelItem{}
query := "SELECT id, created_at, updated_at, name, type, data data FROM notification_channels"
@@ -442,105 +270,6 @@ func (r *ruleDB) GetChannels() (*[]model.ChannelItem, *model.ApiError) {
}
return &channels, nil
}
func (r *ruleDB) EditChannel(receiver *am.Receiver, id string) (*am.Receiver, *model.ApiError) {
idInt, _ := strconv.Atoi(id)
channel, apiErrObj := r.GetChannel(id)
if apiErrObj != nil {
return nil, apiErrObj
}
if channel.Name != receiver.Name {
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("channel name cannot be changed")}
}
tx, err := r.Begin()
if err != nil {
return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
channel_type := getChannelType(receiver)
receiverString, _ := json.Marshal(receiver)
{
stmt, err := tx.Prepare(`UPDATE notification_channels SET updated_at=$1, type=$2, data=$3 WHERE id=$4;`)
if err != nil {
zap.L().Error("Error in preparing statement for UPDATE to notification_channels", zap.Error(err))
tx.Rollback()
return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
defer stmt.Close()
if _, err := stmt.Exec(time.Now(), channel_type, string(receiverString), idInt); err != nil {
zap.L().Error("Error in Executing prepared statement for UPDATE to notification_channels", zap.Error(err))
tx.Rollback() // return an error too, we may want to wrap them
return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
}
apiError := r.alertManager.EditRoute(receiver)
if apiError != nil {
tx.Rollback()
return nil, apiError
}
err = tx.Commit()
if err != nil {
zap.L().Error("Error in committing transaction for INSERT to notification_channels", zap.Error(err))
return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
return receiver, nil
}
func (r *ruleDB) CreateChannel(receiver *am.Receiver) (*am.Receiver, *model.ApiError) {
channel_type := getChannelType(receiver)
receiverString, _ := json.Marshal(receiver)
tx, err := r.Begin()
if err != nil {
return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
{
stmt, err := tx.Prepare(`INSERT INTO notification_channels (created_at, updated_at, name, type, data) VALUES($1,$2,$3,$4,$5);`)
if err != nil {
zap.L().Error("Error in preparing statement for INSERT to notification_channels", zap.Error(err))
tx.Rollback()
return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
defer stmt.Close()
if _, err := stmt.Exec(time.Now(), time.Now(), receiver.Name, channel_type, string(receiverString)); err != nil {
zap.L().Error("Error in Executing prepared statement for INSERT to notification_channels", zap.Error(err))
tx.Rollback() // return an error too, we may want to wrap them
return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
}
apiError := r.alertManager.AddRoute(receiver)
if apiError != nil {
tx.Rollback()
return nil, apiError
}
err = tx.Commit()
if err != nil {
zap.L().Error("Error in committing transaction for INSERT to notification_channels", zap.Error(err))
return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
return receiver, nil
}
func (r *ruleDB) GetAlertsInfo(ctx context.Context) (*model.AlertsInfo, error) {
@@ -629,7 +358,7 @@ func (r *ruleDB) GetAlertsInfo(ctx context.Context) (*model.AlertsInfo, error) {
}
alertsInfo.AlertNames = alertNames
channels, _ := r.GetChannels()
channels, _ := r.getChannels()
if channels != nil {
alertsInfo.TotalChannels = len(*channels)
for _, channel := range *channels {

View File

@@ -14,41 +14,45 @@ import (
"errors"
"github.com/go-openapi/strfmt"
"github.com/jmoiron/sqlx"
"go.signoz.io/signoz/pkg/alertmanager"
"go.signoz.io/signoz/pkg/query-service/cache"
am "go.signoz.io/signoz/pkg/query-service/integrations/alertManager"
"go.signoz.io/signoz/pkg/query-service/interfaces"
"go.signoz.io/signoz/pkg/query-service/model"
pqle "go.signoz.io/signoz/pkg/query-service/pqlEngine"
"go.signoz.io/signoz/pkg/query-service/telemetry"
"go.signoz.io/signoz/pkg/sqlstore"
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
"go.signoz.io/signoz/pkg/types/authtypes"
)
type PrepareTaskOptions struct {
Rule *PostableRule
TaskName string
RuleDB RuleDB
Logger *zap.Logger
Reader interfaces.Reader
Cache cache.Cache
FF interfaces.FeatureLookup
ManagerOpts *ManagerOptions
NotifyFunc NotifyFunc
Rule *PostableRule
TaskName string
RuleDB RuleDB
Logger *zap.Logger
Reader interfaces.Reader
Cache cache.Cache
FF interfaces.FeatureLookup
ManagerOpts *ManagerOptions
NotifyFunc NotifyFunc
SQLStore sqlstore.SQLStore
UseLogsNewSchema bool
UseTraceNewSchema bool
}
type PrepareTestRuleOptions struct {
Rule *PostableRule
RuleDB RuleDB
Logger *zap.Logger
Reader interfaces.Reader
Cache cache.Cache
FF interfaces.FeatureLookup
ManagerOpts *ManagerOptions
NotifyFunc NotifyFunc
Rule *PostableRule
RuleDB RuleDB
Logger *zap.Logger
Reader interfaces.Reader
Cache cache.Cache
FF interfaces.FeatureLookup
ManagerOpts *ManagerOptions
NotifyFunc NotifyFunc
SQLStore sqlstore.SQLStore
UseLogsNewSchema bool
UseTraceNewSchema bool
}
@@ -72,8 +76,7 @@ func prepareTaskName(ruleId interface{}) string {
// ManagerOptions bundles options for the Manager.
type ManagerOptions struct {
NotifierOpts am.NotifierOptions
PqlEngine *pqle.PqlEngine
PqlEngine *pqle.PqlEngine
// RepoURL is used to generate a backlink in sent alert messages
RepoURL string
@@ -96,6 +99,8 @@ type ManagerOptions struct {
UseLogsNewSchema bool
UseTraceNewSchema bool
PrepareTestRuleFunc func(opts PrepareTestRuleOptions) (int, *model.ApiError)
Alertmanager alertmanager.Alertmanager
SQLStore sqlstore.SQLStore
}
// The Manager manages recording and alerting rules.
@@ -105,9 +110,6 @@ type Manager struct {
rules map[string]Rule
mtx sync.RWMutex
block chan struct{}
// Notifier sends messages through alert manager
notifier *am.Notifier
// datastore to store alert definitions
ruleDB RuleDB
@@ -121,15 +123,12 @@ type Manager struct {
UseLogsNewSchema bool
UseTraceNewSchema bool
alertmanager alertmanager.Alertmanager
sqlstore sqlstore.SQLStore
}
func defaultOptions(o *ManagerOptions) *ManagerOptions {
if o.NotifierOpts.QueueCapacity == 0 {
o.NotifierOpts.QueueCapacity = 10000
}
if o.NotifierOpts.Timeout == 0 {
o.NotifierOpts.Timeout = 10 * time.Second
}
if o.ResendDelay == time.Duration(0) {
o.ResendDelay = 1 * time.Minute
}
@@ -161,6 +160,7 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
opts.UseLogsNewSchema,
opts.UseTraceNewSchema,
WithEvalDelay(opts.ManagerOpts.EvalDelay),
WithSQLStore(opts.SQLStore),
)
if err != nil {
@@ -181,6 +181,7 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
opts.Logger,
opts.Reader,
opts.ManagerOpts.PqlEngine,
WithSQLStore(opts.SQLStore),
)
if err != nil {
@@ -202,30 +203,12 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
// NewManager returns an implementation of Manager, ready to be started
// by calling the Run method.
func NewManager(o *ManagerOptions) (*Manager, error) {
o = defaultOptions(o)
// here we just initiate notifier, it will be started
// in run()
notifier, err := am.NewNotifier(&o.NotifierOpts, nil)
if err != nil {
// todo(amol): rethink on this, the query service
// should not be down because alert manager is not available
return nil, err
}
amManager, err := am.New()
if err != nil {
return nil, err
}
db := NewRuleDB(o.DBConn, amManager)
db := NewRuleDB(o.DBConn, o.SQLStore)
telemetry.GetInstance().SetAlertsInfoCallback(db.GetAlertsInfo)
m := &Manager{
tasks: map[string]Task{},
rules: map[string]Rule{},
notifier: notifier,
ruleDB: db,
opts: o,
block: make(chan struct{}),
@@ -235,7 +218,10 @@ func NewManager(o *ManagerOptions) (*Manager, error) {
cache: o.Cache,
prepareTaskFunc: o.PrepareTaskFunc,
prepareTestRuleFunc: o.PrepareTestRuleFunc,
alertmanager: o.Alertmanager,
sqlstore: o.SQLStore,
}
return m, nil
}
@@ -309,9 +295,6 @@ func (m *Manager) initiate() error {
// Run starts processing of the rule manager.
func (m *Manager) run() {
// initiate notifier
go m.notifier.Run()
// initiate blocked tasks
close(m.block)
}
@@ -333,26 +316,51 @@ func (m *Manager) Stop() {
// EditRuleDefinition writes the rule definition to the
// datastore and also updates the rule executor
func (m *Manager) EditRule(ctx context.Context, ruleStr string, id string) error {
claims, ok := authtypes.ClaimsFromContext(ctx)
if !ok {
return errors.New("claims not found in context")
}
parsedRule, err := ParsePostableRule([]byte(ruleStr))
if err != nil {
return err
}
taskName, _, err := m.ruleDB.EditRuleTx(ctx, ruleStr, id)
existingRule, err := m.ruleDB.GetStoredRule(ctx, id)
if err != nil {
return err
}
if !m.opts.DisableRules {
err = m.syncRuleStateWithTask(taskName, parsedRule)
now := time.Now()
existingRule.UpdatedAt = &now
existingRule.UpdatedBy = &claims.Email
existingRule.Data = ruleStr
return m.ruleDB.EditRule(ctx, existingRule, func(ctx context.Context) error {
cfg, err := m.alertmanager.GetConfig(ctx, claims.OrgID)
if err != nil {
return err
}
}
return nil
err = cfg.UpdateRuleIDMatcher(id, parsedRule.PreferredChannels)
if err != nil {
return err
}
err = m.alertmanager.SetConfig(ctx, cfg)
if err != nil {
return err
}
if !m.opts.DisableRules {
err = m.syncRuleStateWithTask(prepareTaskName(existingRule.Id), parsedRule)
if err != nil {
return err
}
}
return nil
})
}
func (m *Manager) editTask(rule *PostableRule, taskName string) error {
@@ -371,6 +379,7 @@ func (m *Manager) editTask(rule *PostableRule, taskName string) error {
FF: m.featureFlags,
ManagerOpts: m.opts,
NotifyFunc: m.prepareNotifyFunc(),
SQLStore: m.sqlstore,
UseLogsNewSchema: m.opts.UseLogsNewSchema,
UseTraceNewSchema: m.opts.UseTraceNewSchema,
@@ -411,24 +420,45 @@ func (m *Manager) editTask(rule *PostableRule, taskName string) error {
}
func (m *Manager) DeleteRule(ctx context.Context, id string) error {
idInt, err := strconv.Atoi(id)
_, err := strconv.Atoi(id)
if err != nil {
zap.L().Error("delete rule received an rule id in invalid format, must be a number", zap.String("id", id), zap.Error(err))
return fmt.Errorf("delete rule received an rule id in invalid format, must be a number")
}
taskName := prepareTaskName(int64(idInt))
if !m.opts.DisableRules {
m.deleteTask(taskName)
claims, ok := authtypes.ClaimsFromContext(ctx)
if !ok {
return errors.New("claims not found in context")
}
if _, _, err := m.ruleDB.DeleteRuleTx(ctx, id); err != nil {
zap.L().Error("failed to delete the rule from rule db", zap.String("id", id), zap.Error(err))
_, err = m.ruleDB.GetStoredRule(ctx, id)
if err != nil {
return err
}
return nil
return m.ruleDB.DeleteRule(ctx, id, func(ctx context.Context) error {
cfg, err := m.alertmanager.GetConfig(ctx, claims.OrgID)
if err != nil {
return err
}
err = cfg.DeleteRuleIDMatcher(id)
if err != nil {
return err
}
err = m.alertmanager.SetConfig(ctx, cfg)
if err != nil {
return err
}
taskName := prepareTaskName(id)
if !m.opts.DisableRules {
m.deleteTask(taskName)
}
return nil
})
}
func (m *Manager) deleteTask(taskName string) {
@@ -451,32 +481,57 @@ func (m *Manager) deleteTask(taskName string) {
// starts an executor for the rule
func (m *Manager) CreateRule(ctx context.Context, ruleStr string) (*GettableRule, error) {
parsedRule, err := ParsePostableRule([]byte(ruleStr))
if err != nil {
return nil, err
}
lastInsertId, tx, err := m.ruleDB.CreateRuleTx(ctx, ruleStr)
taskName := prepareTaskName(lastInsertId)
if err != nil {
return nil, err
claims, ok := authtypes.ClaimsFromContext(ctx)
if !ok {
return nil, errors.New("claims not found in context")
}
if !m.opts.DisableRules {
if err := m.addTask(parsedRule, taskName); err != nil {
tx.Rollback()
return nil, err
now := time.Now()
storedRule := &StoredRule{
CreatedAt: &now,
CreatedBy: &claims.Email,
UpdatedAt: &now,
UpdatedBy: &claims.Email,
Data: ruleStr,
}
id, err := m.ruleDB.CreateRule(ctx, storedRule, func(ctx context.Context, id int64) error {
cfg, err := m.alertmanager.GetConfig(ctx, claims.OrgID)
if err != nil {
return err
}
}
err = tx.Commit()
err = cfg.CreateRuleIDMatcher(fmt.Sprintf("%d", id), parsedRule.PreferredChannels)
if err != nil {
return err
}
err = m.alertmanager.SetConfig(ctx, cfg)
if err != nil {
return err
}
taskName := prepareTaskName(id)
if !m.opts.DisableRules {
if err := m.addTask(parsedRule, taskName); err != nil {
return err
}
}
return nil
})
if err != nil {
return nil, err
}
gettableRule := &GettableRule{
Id: fmt.Sprintf("%d", lastInsertId),
return &GettableRule{
Id: fmt.Sprintf("%d", id),
PostableRule: *parsedRule,
}
return gettableRule, nil
}, nil
}
func (m *Manager) addTask(rule *PostableRule, taskName string) error {
@@ -494,6 +549,7 @@ func (m *Manager) addTask(rule *PostableRule, taskName string) error {
FF: m.featureFlags,
ManagerOpts: m.opts,
NotifyFunc: m.prepareNotifyFunc(),
SQLStore: m.sqlstore,
UseLogsNewSchema: m.opts.UseLogsNewSchema,
UseTraceNewSchema: m.opts.UseTraceNewSchema,
@@ -594,12 +650,12 @@ func (m *Manager) TriggeredAlerts() []*NamedAlert {
}
// NotifyFunc sends notifications about a set of alerts generated by the given expression.
type NotifyFunc func(ctx context.Context, expr string, alerts ...*Alert)
type NotifyFunc func(ctx context.Context, orgID string, expr string, alerts ...*Alert)
// prepareNotifyFunc implements the NotifyFunc for a Notifier.
func (m *Manager) prepareNotifyFunc() NotifyFunc {
return func(ctx context.Context, expr string, alerts ...*Alert) {
var res []*am.Alert
return func(ctx context.Context, orgID string, expr string, alerts ...*Alert) {
var res []*alertmanagertypes.PostableAlert
for _, alert := range alerts {
generatorURL := alert.GeneratorURL
@@ -607,23 +663,51 @@ func (m *Manager) prepareNotifyFunc() NotifyFunc {
generatorURL = m.opts.RepoURL
}
a := &am.Alert{
StartsAt: alert.FiredAt,
Labels: alert.Labels,
Annotations: alert.Annotations,
GeneratorURL: generatorURL,
Receivers: alert.Receivers,
a := &alertmanagertypes.PostableAlert{
Annotations: alert.Annotations.Map(),
StartsAt: strfmt.DateTime(alert.FiredAt),
Alert: alertmanagertypes.AlertModel{
Labels: alert.Labels.Map(),
GeneratorURL: strfmt.URI(generatorURL),
},
}
if !alert.ResolvedAt.IsZero() {
a.EndsAt = alert.ResolvedAt
a.EndsAt = strfmt.DateTime(alert.ResolvedAt)
} else {
a.EndsAt = alert.ValidUntil
a.EndsAt = strfmt.DateTime(alert.ValidUntil)
}
res = append(res, a)
}
if len(alerts) > 0 {
m.notifier.Send(res...)
m.alertmanager.PutAlerts(ctx, orgID, res)
}
}
}
func (m *Manager) prepareTestNotifyFunc() NotifyFunc {
return func(ctx context.Context, orgID string, expr string, alerts ...*Alert) {
for _, alert := range alerts {
generatorURL := alert.GeneratorURL
if generatorURL == "" {
generatorURL = m.opts.RepoURL
}
a := &alertmanagertypes.PostableAlert{
Annotations: alert.Annotations.Map(),
StartsAt: strfmt.DateTime(alert.FiredAt),
Alert: alertmanagertypes.AlertModel{
Labels: alert.Labels.Map(),
GeneratorURL: strfmt.URI(generatorURL),
},
}
if !alert.ResolvedAt.IsZero() {
a.EndsAt = strfmt.DateTime(alert.ResolvedAt)
} else {
a.EndsAt = strfmt.DateTime(alert.ValidUntil)
}
m.alertmanager.TestAlert(ctx, orgID, a, alert.Receivers)
}
}
}
@@ -736,6 +820,10 @@ func (m *Manager) syncRuleStateWithTask(taskName string, rule *PostableRule) err
// - re-deploy or undeploy task as necessary
// - update the patched rule in the DB
func (m *Manager) PatchRule(ctx context.Context, ruleStr string, ruleId string) (*GettableRule, error) {
claims, ok := authtypes.ClaimsFromContext(ctx)
if !ok {
return nil, errors.New("claims not found in context")
}
if ruleId == "" {
return nil, fmt.Errorf("id is mandatory for patching rule")
@@ -775,15 +863,19 @@ func (m *Manager) PatchRule(ctx context.Context, ruleStr string, ruleId string)
return nil, err
}
// write updated rule to db
if _, _, err = m.ruleDB.EditRuleTx(ctx, string(patchedRuleBytes), ruleId); err != nil {
// write failed, rollback task state
now := time.Now()
storedJSON.Data = string(patchedRuleBytes)
storedJSON.UpdatedBy = &claims.Email
storedJSON.UpdatedAt = &now
// restore task state from the stored rule
err = m.ruleDB.EditRule(ctx, storedJSON, func(ctx context.Context) error {
if err := m.syncRuleStateWithTask(taskName, &storedRule); err != nil {
zap.L().Error("failed to restore rule after patch failure", zap.String("taskName", taskName), zap.Error(err))
}
return nil
})
if err != nil {
return nil, err
}
@@ -822,7 +914,8 @@ func (m *Manager) TestNotification(ctx context.Context, ruleStr string) (int, *m
Cache: m.cache,
FF: m.featureFlags,
ManagerOpts: m.opts,
NotifyFunc: m.prepareNotifyFunc(),
NotifyFunc: m.prepareTestNotifyFunc(),
SQLStore: m.sqlstore,
UseLogsNewSchema: m.opts.UseLogsNewSchema,
UseTraceNewSchema: m.opts.UseTraceNewSchema,
})

View File

@@ -52,6 +52,7 @@ func defaultTestNotification(opts PrepareTestRuleOptions) (int, *model.ApiError)
opts.UseTraceNewSchema,
WithSendAlways(),
WithSendUnmatched(),
WithSQLStore(opts.SQLStore),
)
if err != nil {
@@ -70,6 +71,7 @@ func defaultTestNotification(opts PrepareTestRuleOptions) (int, *model.ApiError)
opts.ManagerOpts.PqlEngine,
WithSendAlways(),
WithSendUnmatched(),
WithSQLStore(opts.SQLStore),
)
if err != nil {

View File

@@ -7,6 +7,7 @@ import (
"reflect"
"time"
"go.signoz.io/signoz/pkg/alertmanager"
"go.signoz.io/signoz/pkg/apiserver"
"go.signoz.io/signoz/pkg/cache"
"go.signoz.io/signoz/pkg/config"
@@ -44,6 +45,9 @@ type Config struct {
// TelemetryStore config
TelemetryStore telemetrystore.Config `mapstructure:"telemetrystore"`
// Alertmanager config
Alertmanager alertmanager.Config `mapstructure:"alertmanager"`
}
// DeprecatedFlags are the flags that are deprecated and scheduled for removal.
@@ -63,6 +67,7 @@ func NewConfig(ctx context.Context, resolverConfig config.ResolverConfig, deprec
sqlmigrator.NewConfigFactory(),
apiserver.NewConfigFactory(),
telemetrystore.NewConfigFactory(),
alertmanager.NewConfigFactory(),
}
conf, err := config.New(ctx, resolverConfig, configFactories)
@@ -138,17 +143,26 @@ func mergeAndEnsureBackwardCompatibility(config *Config, deprecatedFlags Depreca
}
if deprecatedFlags.MaxIdleConns != 50 {
fmt.Println("[Deprecated] flag --max-idle-conns is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_MAX__IDLE__CONNS env variable instead.")
fmt.Println("[Deprecated] flag --max-idle-conns is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_MAX__IDLE__CONNS instead.")
config.TelemetryStore.Connection.MaxIdleConns = deprecatedFlags.MaxIdleConns
}
if deprecatedFlags.MaxOpenConns != 100 {
fmt.Println("[Deprecated] flag --max-open-conns is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_MAX__OPEN__CONNS env variable instead.")
fmt.Println("[Deprecated] flag --max-open-conns is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_MAX__OPEN__CONNS instead.")
config.TelemetryStore.Connection.MaxOpenConns = deprecatedFlags.MaxOpenConns
}
if deprecatedFlags.DialTimeout != 5*time.Second {
fmt.Println("[Deprecated] flag --dial-timeout is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_DIAL__TIMEOUT environment variable instead.")
fmt.Println("[Deprecated] flag --dial-timeout is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_DIAL__TIMEOUT instead.")
config.TelemetryStore.Connection.DialTimeout = deprecatedFlags.DialTimeout
}
if os.Getenv("ALERTMANAGER_API_PREFIX") != "" {
fmt.Println("[Deprecated] env ALERTMANAGER_API_PREFIX is deprecated and scheduled for removal. Please use SIGNOZ_ALERTMANAGER_LEGACY_API__URL instead.")
config.Alertmanager.Legacy.ApiURL = os.Getenv("ALERTMANAGER_API_PREFIX")
}
if os.Getenv("ALERTMANAGER_API_CHANNEL_PATH") != "" {
fmt.Println("[Deprecated] env ALERTMANAGER_API_CHANNEL_PATH is deprecated and scheduled for complete removal.")
}
}

View File

@@ -1,6 +1,9 @@
package signoz
import (
"go.signoz.io/signoz/pkg/alertmanager"
"go.signoz.io/signoz/pkg/alertmanager/legacyalertmanager"
"go.signoz.io/signoz/pkg/alertmanager/signozalertmanager"
"go.signoz.io/signoz/pkg/cache"
"go.signoz.io/signoz/pkg/cache/memorycache"
"go.signoz.io/signoz/pkg/cache/rediscache"
@@ -33,6 +36,9 @@ type ProviderConfig struct {
// Map of all telemetrystore provider factories
TelemetryStoreProviderFactories factory.NamedMap[factory.ProviderFactory[telemetrystore.TelemetryStore, telemetrystore.Config]]
// Map of all alertmanager provider factories
AlertmanagerProviderFactories factory.NamedMap[factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config]]
}
func NewProviderConfig() ProviderConfig {
@@ -62,9 +68,17 @@ func NewProviderConfig() ProviderConfig {
sqlmigration.NewAddPatsFactory(),
sqlmigration.NewModifyDatetimeFactory(),
sqlmigration.NewModifyOrgDomainFactory(),
sqlmigration.NewAddAlertmanagerFactory(),
),
TelemetryStoreProviderFactories: factory.MustNewNamedMap(
clickhousetelemetrystore.NewFactory(telemetrystorehook.NewFactory()),
),
}
}
func NewAlertmanagerProviderFactories(sqlstore sqlstore.SQLStore) factory.NamedMap[factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config]] {
return factory.MustNewNamedMap(
legacyalertmanager.NewFactory(sqlstore),
signozalertmanager.NewFactory(sqlstore),
)
}

View File

@@ -14,3 +14,9 @@ func TestNewProviderConfig(t *testing.T) {
NewProviderConfig()
})
}
func TestNewAlertmanagerProviderFactories(t *testing.T) {
assert.NotPanics(t, func() {
NewAlertmanagerProviderFactories(nil)
})
}

View File

@@ -3,6 +3,7 @@ package signoz
import (
"context"
"go.signoz.io/signoz/pkg/alertmanager"
"go.signoz.io/signoz/pkg/cache"
"go.signoz.io/signoz/pkg/factory"
"go.signoz.io/signoz/pkg/instrumentation"
@@ -16,10 +17,12 @@ import (
)
type SigNoz struct {
*factory.Registry
Cache cache.Cache
Web web.Web
SQLStore sqlstore.SQLStore
TelemetryStore telemetrystore.TelemetryStore
Alertmanager alertmanager.Alertmanager
}
func New(
@@ -102,10 +105,32 @@ func New(
return nil, err
}
alertmanager, err := factory.NewProviderFromNamedMap(
ctx,
providerSettings,
config.Alertmanager,
NewAlertmanagerProviderFactories(sqlstore),
config.Alertmanager.Provider,
)
if err != nil {
return nil, err
}
registry, err := factory.NewRegistry(
instrumentation.Logger(),
factory.NewNamedService(factory.MustNewName("instrumentation"), instrumentation),
factory.NewNamedService(factory.MustNewName("alertmanager"), alertmanager),
)
if err != nil {
return nil, err
}
return &SigNoz{
Registry: registry,
Cache: cache,
Web: web,
SQLStore: sqlstore,
TelemetryStore: telemetrystore,
Alertmanager: alertmanager,
}, nil
}

View File

@@ -0,0 +1,206 @@
package sqlmigration
import (
"context"
"database/sql"
"strconv"
"strings"
"time"
"github.com/tidwall/gjson"
"github.com/uptrace/bun"
"github.com/uptrace/bun/migrate"
"go.signoz.io/signoz/pkg/alertmanager/alertmanagerserver"
"go.signoz.io/signoz/pkg/factory"
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
)
type addAlertmanager struct{}
func NewAddAlertmanagerFactory() factory.ProviderFactory[SQLMigration, Config] {
return factory.NewProviderFactory(factory.MustNewName("add_alertmanager"), newAddAlertmanager)
}
func newAddAlertmanager(_ context.Context, _ factory.ProviderSettings, _ Config) (SQLMigration, error) {
return &addAlertmanager{}, nil
}
func (migration *addAlertmanager) Register(migrations *migrate.Migrations) error {
if err := migrations.Register(migration.Up, migration.Down); err != nil {
return err
}
return nil
}
func (migration *addAlertmanager) Up(ctx context.Context, db *bun.DB) error {
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback() //nolint:errcheck
if _, err := tx.
NewDropColumn().
Table("notification_channels").
ColumnExpr("deleted").
Exec(ctx); err != nil {
if !strings.Contains(err.Error(), "no such column") {
return err
}
}
if _, err := tx.
NewAddColumn().
Table("notification_channels").
Apply(WrapIfNotExists(ctx, db, "notification_channels", "org_id")).
Exec(ctx); err != nil && err != ErrNoExecute {
return err
}
var orgID string
err = tx.
NewSelect().
ColumnExpr("id").
Table("organizations").
Limit(1).
Scan(ctx, &orgID)
if err != nil {
if err != sql.ErrNoRows {
return err
}
}
if err == nil {
err = migration.populateOrgID(ctx, tx, orgID)
if err != nil {
return err
}
}
if _, err := tx.
NewCreateTable().
Model(&struct {
bun.BaseModel `bun:"table:alertmanager_config"`
ID uint64 `bun:"id,pk,autoincrement"`
Config string `bun:"config,notnull,type:text"`
Hash string `bun:"hash,notnull,type:text"`
CreatedAt time.Time `bun:"created_at,notnull"`
UpdatedAt time.Time `bun:"updated_at,notnull"`
OrgID string `bun:"org_id,notnull,unique"`
}{}).
ForeignKey(`("org_id") REFERENCES "organizations" ("id")`).
IfNotExists().
Exec(ctx); err != nil {
return err
}
if _, err := tx.
NewCreateTable().
Model(&struct {
bun.BaseModel `bun:"table:alertmanager_state"`
ID uint64 `bun:"id,pk,autoincrement"`
Silences string `bun:"silences,nullzero,type:text"`
NFLog string `bun:"nflog,nullzero,type:text"`
CreatedAt time.Time `bun:"created_at,notnull"`
UpdatedAt time.Time `bun:"updated_at,notnull"`
OrgID string `bun:"org_id,notnull,unique"`
}{}).
ForeignKey(`("org_id") REFERENCES "organizations" ("id")`).
IfNotExists().
Exec(ctx); err != nil {
return err
}
if err := migration.populateAlertmanagerConfig(ctx, tx, orgID); err != nil {
return err
}
if err := tx.Commit(); err != nil {
return err
}
return nil
}
func (migration *addAlertmanager) populateOrgID(ctx context.Context, tx bun.Tx, orgID string) error {
if _, err := tx.
NewUpdate().
Table("notification_channels").
Set("org_id = ?", orgID).
Where("org_id IS NULL").
Exec(ctx); err != nil {
return err
}
return nil
}
func (migration *addAlertmanager) populateAlertmanagerConfig(ctx context.Context, tx bun.Tx, orgID string) error {
var channels []*alertmanagertypes.Channel
err := tx.
NewSelect().
Model(&channels).
Where("org_id = ?", orgID).
Scan(ctx)
if err != nil {
return err
}
type matcher struct {
bun.BaseModel `bun:"table:rules"`
ID int `bun:"id,pk"`
Data string `bun:"data"`
}
matchers := []matcher{}
err = tx.
NewSelect().
Column("id", "data").
Model(&matchers).
Scan(ctx)
if err != nil {
return err
}
matchersMap := make(map[string][]string)
for _, matcher := range matchers {
receivers := gjson.Get(matcher.Data, "preferredChannels").Array()
for _, receiver := range receivers {
matchersMap[strconv.Itoa(matcher.ID)] = append(matchersMap[strconv.Itoa(matcher.ID)], receiver.String())
}
}
config, err := alertmanagertypes.NewConfigFromChannels(alertmanagerserver.NewConfig().Global, alertmanagerserver.NewConfig().Route, channels, orgID)
if err != nil {
return err
}
for ruleID, receivers := range matchersMap {
err = config.CreateRuleIDMatcher(ruleID, receivers)
if err != nil {
return err
}
}
if _, err := tx.
NewInsert().
Model(config.StoreableConfig()).
On("CONFLICT (org_id) DO UPDATE").
Set("config = ?", config.StoreableConfig().Config).
Set("hash = ?", config.StoreableConfig().Hash).
Set("updated_at = ?", config.StoreableConfig().UpdatedAt).
Exec(ctx); err != nil {
return err
}
return nil
}
func (migration *addAlertmanager) Down(ctx context.Context, db *bun.DB) error {
return nil
}