Compare commits
34 Commits
main
...
v0.75.0-98
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
98eee6f7fd | ||
|
|
65ee87beb9 | ||
|
|
bd3e0eeb6c | ||
|
|
9d6e09d3f6 | ||
|
|
cccc25e2ee | ||
|
|
9580da0478 | ||
|
|
8534b798c7 | ||
|
|
3bedd10ef9 | ||
|
|
b6f2ff052d | ||
|
|
d02dc8687c | ||
|
|
04835d14e1 | ||
|
|
5bd60d3f03 | ||
|
|
5168e0de2b | ||
|
|
f8a6c1940d | ||
|
|
81945dd73b | ||
|
|
6bfae83779 | ||
|
|
116b1ac607 | ||
|
|
5915866ef6 | ||
|
|
6362cfb482 | ||
|
|
a6be05b047 | ||
|
|
b6be22c687 | ||
|
|
0abb5fca93 | ||
|
|
ad58342319 | ||
|
|
4abd67c67e | ||
|
|
7de1fbe13f | ||
|
|
5606f73074 | ||
|
|
076b33dcba | ||
|
|
e770c3054c | ||
|
|
4df43463da | ||
|
|
2b4340a3e1 | ||
|
|
309ac976cd | ||
|
|
5ab0d77087 | ||
|
|
307afa02ff | ||
|
|
b9e29424ce |
@@ -11,6 +11,7 @@ import (
|
||||
"go.signoz.io/signoz/ee/query-service/interfaces"
|
||||
"go.signoz.io/signoz/ee/query-service/license"
|
||||
"go.signoz.io/signoz/ee/query-service/usage"
|
||||
"go.signoz.io/signoz/pkg/alertmanager"
|
||||
baseapp "go.signoz.io/signoz/pkg/query-service/app"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/cloudintegrations"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/integrations"
|
||||
@@ -20,6 +21,7 @@ import (
|
||||
basemodel "go.signoz.io/signoz/pkg/query-service/model"
|
||||
rules "go.signoz.io/signoz/pkg/query-service/rules"
|
||||
"go.signoz.io/signoz/pkg/query-service/version"
|
||||
"go.signoz.io/signoz/pkg/signoz"
|
||||
"go.signoz.io/signoz/pkg/types/authtypes"
|
||||
)
|
||||
|
||||
@@ -51,7 +53,7 @@ type APIHandler struct {
|
||||
}
|
||||
|
||||
// NewAPIHandler returns an APIHandler
|
||||
func NewAPIHandler(opts APIHandlerOptions) (*APIHandler, error) {
|
||||
func NewAPIHandler(opts APIHandlerOptions, signoz *signoz.SigNoz) (*APIHandler, error) {
|
||||
|
||||
baseHandler, err := baseapp.NewAPIHandler(baseapp.APIHandlerOpts{
|
||||
Reader: opts.DataConnector,
|
||||
@@ -67,6 +69,7 @@ func NewAPIHandler(opts APIHandlerOptions) (*APIHandler, error) {
|
||||
FluxInterval: opts.FluxInterval,
|
||||
UseLogsNewSchema: opts.UseLogsNewSchema,
|
||||
UseTraceNewSchema: opts.UseTraceNewSchema,
|
||||
AlertmanagerAPI: alertmanager.NewAPI(signoz.Alertmanager),
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
|
||||
@@ -23,8 +23,10 @@ import (
|
||||
"go.signoz.io/signoz/ee/query-service/integrations/gateway"
|
||||
"go.signoz.io/signoz/ee/query-service/interfaces"
|
||||
"go.signoz.io/signoz/ee/query-service/rules"
|
||||
"go.signoz.io/signoz/pkg/alertmanager"
|
||||
"go.signoz.io/signoz/pkg/http/middleware"
|
||||
"go.signoz.io/signoz/pkg/signoz"
|
||||
"go.signoz.io/signoz/pkg/sqlstore"
|
||||
"go.signoz.io/signoz/pkg/types/authtypes"
|
||||
"go.signoz.io/signoz/pkg/web"
|
||||
|
||||
@@ -44,7 +46,6 @@ import (
|
||||
"go.signoz.io/signoz/pkg/query-service/cache"
|
||||
baseconst "go.signoz.io/signoz/pkg/query-service/constants"
|
||||
"go.signoz.io/signoz/pkg/query-service/healthcheck"
|
||||
basealm "go.signoz.io/signoz/pkg/query-service/integrations/alertManager"
|
||||
baseint "go.signoz.io/signoz/pkg/query-service/interfaces"
|
||||
basemodel "go.signoz.io/signoz/pkg/query-service/model"
|
||||
pqle "go.signoz.io/signoz/pkg/query-service/pqlEngine"
|
||||
@@ -175,8 +176,8 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
||||
}
|
||||
|
||||
<-readerReady
|
||||
rm, err := makeRulesManager(serverOptions.PromConfigPath,
|
||||
baseconst.GetAlertManagerApiPrefix(),
|
||||
rm, err := makeRulesManager(
|
||||
serverOptions.PromConfigPath,
|
||||
serverOptions.RuleRepoURL,
|
||||
serverOptions.SigNoz.SQLStore.SQLxDB(),
|
||||
reader,
|
||||
@@ -185,6 +186,8 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
||||
lm,
|
||||
serverOptions.UseLogsNewSchema,
|
||||
serverOptions.UseTraceNewSchema,
|
||||
serverOptions.SigNoz.Alertmanager,
|
||||
serverOptions.SigNoz.SQLStore,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
@@ -267,7 +270,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
||||
JWT: serverOptions.Jwt,
|
||||
}
|
||||
|
||||
apiHandler, err := api.NewAPIHandler(apiOpts)
|
||||
apiHandler, err := api.NewAPIHandler(apiOpts, serverOptions.SigNoz)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -529,7 +532,6 @@ func (s *Server) Stop() error {
|
||||
|
||||
func makeRulesManager(
|
||||
promConfigPath,
|
||||
alertManagerURL string,
|
||||
ruleRepoURL string,
|
||||
db *sqlx.DB,
|
||||
ch baseint.Reader,
|
||||
@@ -537,39 +539,34 @@ func makeRulesManager(
|
||||
disableRules bool,
|
||||
fm baseint.FeatureLookup,
|
||||
useLogsNewSchema bool,
|
||||
useTraceNewSchema bool) (*baserules.Manager, error) {
|
||||
|
||||
useTraceNewSchema bool,
|
||||
alertmanager alertmanager.Alertmanager,
|
||||
sqlstore sqlstore.SQLStore,
|
||||
) (*baserules.Manager, error) {
|
||||
// create engine
|
||||
pqle, err := pqle.FromConfigPath(promConfigPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create pql engine : %v", err)
|
||||
}
|
||||
|
||||
// notifier opts
|
||||
notifierOpts := basealm.NotifierOptions{
|
||||
QueueCapacity: 10000,
|
||||
Timeout: 1 * time.Second,
|
||||
AlertManagerURLs: []string{alertManagerURL},
|
||||
}
|
||||
|
||||
// create manager opts
|
||||
managerOpts := &baserules.ManagerOptions{
|
||||
NotifierOpts: notifierOpts,
|
||||
PqlEngine: pqle,
|
||||
RepoURL: ruleRepoURL,
|
||||
DBConn: db,
|
||||
Context: context.Background(),
|
||||
Logger: zap.L(),
|
||||
DisableRules: disableRules,
|
||||
FeatureFlags: fm,
|
||||
Reader: ch,
|
||||
Cache: cache,
|
||||
EvalDelay: baseconst.GetEvalDelay(),
|
||||
|
||||
PqlEngine: pqle,
|
||||
RepoURL: ruleRepoURL,
|
||||
DBConn: db,
|
||||
Context: context.Background(),
|
||||
Logger: zap.L(),
|
||||
DisableRules: disableRules,
|
||||
FeatureFlags: fm,
|
||||
Reader: ch,
|
||||
Cache: cache,
|
||||
EvalDelay: baseconst.GetEvalDelay(),
|
||||
PrepareTaskFunc: rules.PrepareTaskFunc,
|
||||
UseLogsNewSchema: useLogsNewSchema,
|
||||
UseTraceNewSchema: useTraceNewSchema,
|
||||
PrepareTestRuleFunc: rules.TestNotification,
|
||||
Alertmanager: alertmanager,
|
||||
SQLStore: sqlstore,
|
||||
}
|
||||
|
||||
// create Manager
|
||||
|
||||
@@ -7,7 +7,6 @@ import (
|
||||
"os"
|
||||
"os/signal"
|
||||
"strconv"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
@@ -198,16 +197,19 @@ func main() {
|
||||
zap.L().Fatal("Failed to initialize auth cache", zap.Error(err))
|
||||
}
|
||||
|
||||
signalsChannel := make(chan os.Signal, 1)
|
||||
signal.Notify(signalsChannel, os.Interrupt, syscall.SIGTERM)
|
||||
signoz.Start(context.Background())
|
||||
|
||||
for {
|
||||
select {
|
||||
case status := <-server.HealthCheckStatus():
|
||||
zap.L().Info("Received HealthCheck status: ", zap.Int("status", int(status)))
|
||||
case <-signalsChannel:
|
||||
zap.L().Fatal("Received OS Interrupt Signal ... ")
|
||||
server.Stop()
|
||||
}
|
||||
if err := signoz.Wait(context.Background()); err != nil {
|
||||
zap.L().Fatal("Failed to start signoz", zap.Error(err))
|
||||
}
|
||||
|
||||
err = server.Stop()
|
||||
if err != nil {
|
||||
zap.L().Fatal("Failed to stop server", zap.Error(err))
|
||||
}
|
||||
|
||||
err = signoz.Stop(context.Background())
|
||||
if err != nil {
|
||||
zap.L().Fatal("Failed to stop signoz", zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -28,6 +28,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
|
||||
opts.UseLogsNewSchema,
|
||||
opts.UseTraceNewSchema,
|
||||
baserules.WithEvalDelay(opts.ManagerOpts.EvalDelay),
|
||||
baserules.WithSQLStore(opts.SQLStore),
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
@@ -48,6 +49,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
|
||||
opts.Logger,
|
||||
opts.Reader,
|
||||
opts.ManagerOpts.PqlEngine,
|
||||
baserules.WithSQLStore(opts.SQLStore),
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
@@ -68,6 +70,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
|
||||
opts.Reader,
|
||||
opts.Cache,
|
||||
baserules.WithEvalDelay(opts.ManagerOpts.EvalDelay),
|
||||
baserules.WithSQLStore(opts.SQLStore),
|
||||
)
|
||||
if err != nil {
|
||||
return task, err
|
||||
@@ -126,6 +129,7 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, *basemodel.Ap
|
||||
opts.UseTraceNewSchema,
|
||||
baserules.WithSendAlways(),
|
||||
baserules.WithSendUnmatched(),
|
||||
baserules.WithSQLStore(opts.SQLStore),
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
@@ -144,6 +148,7 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, *basemodel.Ap
|
||||
opts.ManagerOpts.PqlEngine,
|
||||
baserules.WithSendAlways(),
|
||||
baserules.WithSendUnmatched(),
|
||||
baserules.WithSQLStore(opts.SQLStore),
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
@@ -160,6 +165,7 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, *basemodel.Ap
|
||||
opts.Cache,
|
||||
baserules.WithSendAlways(),
|
||||
baserules.WithSendUnmatched(),
|
||||
baserules.WithSQLStore(opts.SQLStore),
|
||||
)
|
||||
if err != nil {
|
||||
zap.L().Error("failed to prepare a new anomaly rule for test", zap.String("name", rule.Name()), zap.Error(err))
|
||||
|
||||
@@ -13,13 +13,11 @@ const getTriggered = async (
|
||||
|
||||
const response = await axios.get(`/alerts?${queryParams}`);
|
||||
|
||||
const amData = JSON.parse(response.data.data);
|
||||
|
||||
return {
|
||||
statusCode: 200,
|
||||
error: null,
|
||||
message: response.data.status,
|
||||
payload: amData.data,
|
||||
payload: response.data.data,
|
||||
};
|
||||
} catch (error) {
|
||||
return ErrorResponseHandler(error as AxiosError);
|
||||
|
||||
2
go.mod
2
go.mod
@@ -71,7 +71,6 @@ require (
|
||||
go.uber.org/zap v1.27.0
|
||||
golang.org/x/crypto v0.31.0
|
||||
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842
|
||||
golang.org/x/net v0.33.0
|
||||
golang.org/x/oauth2 v0.24.0
|
||||
golang.org/x/sync v0.10.0
|
||||
golang.org/x/text v0.21.0
|
||||
@@ -267,6 +266,7 @@ require (
|
||||
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
|
||||
go.uber.org/atomic v1.11.0 // indirect
|
||||
golang.org/x/mod v0.22.0 // indirect
|
||||
golang.org/x/net v0.33.0 // indirect
|
||||
golang.org/x/sys v0.29.0 // indirect
|
||||
golang.org/x/time v0.6.0 // indirect
|
||||
golang.org/x/tools v0.28.0 // indirect
|
||||
|
||||
@@ -37,6 +37,7 @@ type provider struct {
|
||||
configStore alertmanagertypes.ConfigStore
|
||||
batcher *alertmanagerbatcher.Batcher
|
||||
url *url.URL
|
||||
orgID string
|
||||
}
|
||||
|
||||
func NewFactory(sqlstore sqlstore.SQLStore) factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config] {
|
||||
@@ -73,8 +74,25 @@ func (provider *provider) Start(ctx context.Context) error {
|
||||
}
|
||||
|
||||
for alerts := range provider.batcher.C {
|
||||
if err := provider.putAlerts(ctx, "", alerts); err != nil {
|
||||
provider.settings.Logger().Error("failed to send alerts to alertmanager", "error", err)
|
||||
// For the first time, we need to get the orgID from the config store.
|
||||
// Since this is the legacy alertmanager, we get the first org from the store.
|
||||
if provider.orgID == "" {
|
||||
orgIDs, err := provider.configStore.ListOrgs(ctx)
|
||||
if err != nil {
|
||||
provider.settings.Logger().ErrorContext(ctx, "failed to send alerts to alertmanager", "error", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if len(orgIDs) == 0 {
|
||||
provider.settings.Logger().ErrorContext(ctx, "failed to send alerts to alertmanager", "error", "no orgs found")
|
||||
continue
|
||||
}
|
||||
|
||||
provider.orgID = orgIDs[0]
|
||||
}
|
||||
|
||||
if err := provider.putAlerts(ctx, provider.orgID, alerts); err != nil {
|
||||
provider.settings.Logger().ErrorContext(ctx, "failed to send alerts to alertmanager", "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -40,7 +40,7 @@ func NewRegistry(logger *slog.Logger, services ...NamedService) (*Registry, erro
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (r *Registry) Start(ctx context.Context) error {
|
||||
func (r *Registry) Start(ctx context.Context) {
|
||||
for _, s := range r.services.GetInOrder() {
|
||||
go func(s NamedService) {
|
||||
r.logger.InfoContext(ctx, "starting service", "service", s.Name())
|
||||
@@ -49,7 +49,6 @@ func (r *Registry) Start(ctx context.Context) error {
|
||||
}(s)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Registry) Wait(ctx context.Context) error {
|
||||
|
||||
@@ -41,7 +41,7 @@ func TestRegistryWith2Services(t *testing.T) {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
require.NoError(t, registry.Start(ctx))
|
||||
registry.Start(ctx)
|
||||
require.NoError(t, registry.Wait(ctx))
|
||||
require.NoError(t, registry.Stop(ctx))
|
||||
}()
|
||||
@@ -62,7 +62,7 @@ func TestRegistryWith2ServicesWithoutWait(t *testing.T) {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
require.NoError(t, registry.Start(ctx))
|
||||
registry.Start(ctx)
|
||||
require.NoError(t, registry.Stop(ctx))
|
||||
}()
|
||||
|
||||
|
||||
@@ -78,6 +78,12 @@ func (writer *nonFlushingBadResponseLoggingWriter) Write(data []byte) (int, erro
|
||||
// https://godoc.org/net/http#ResponseWriter
|
||||
writer.WriteHeader(http.StatusOK)
|
||||
}
|
||||
|
||||
// 204 No Content is a success response that indicates that the request has been successfully processed and that the response body is intentionally empty.
|
||||
if writer.statusCode == 204 {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
n, err := writer.rw.Write(data)
|
||||
if writer.logBody {
|
||||
writer.captureResponseBody(data)
|
||||
|
||||
@@ -23,6 +23,7 @@ type SDK struct {
|
||||
logger *slog.Logger
|
||||
sdk contribsdkconfig.SDK
|
||||
prometheusRegistry *prometheus.Registry
|
||||
startCh chan struct{}
|
||||
}
|
||||
|
||||
// New creates a new Instrumentation instance with configured providers.
|
||||
@@ -96,14 +97,17 @@ func New(ctx context.Context, build version.Build, cfg Config) (*SDK, error) {
|
||||
sdk: sdk,
|
||||
prometheusRegistry: prometheusRegistry,
|
||||
logger: NewLogger(cfg),
|
||||
startCh: make(chan struct{}),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (i *SDK) Start(ctx context.Context) error {
|
||||
<-i.startCh
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *SDK) Stop(ctx context.Context) error {
|
||||
close(i.startCh)
|
||||
return i.sdk.Shutdown(ctx)
|
||||
}
|
||||
|
||||
|
||||
@@ -49,7 +49,6 @@ import (
|
||||
"go.signoz.io/signoz/pkg/query-service/common"
|
||||
"go.signoz.io/signoz/pkg/query-service/constants"
|
||||
chErrors "go.signoz.io/signoz/pkg/query-service/errors"
|
||||
am "go.signoz.io/signoz/pkg/query-service/integrations/alertManager"
|
||||
"go.signoz.io/signoz/pkg/query-service/interfaces"
|
||||
"go.signoz.io/signoz/pkg/query-service/metrics"
|
||||
"go.signoz.io/signoz/pkg/query-service/model"
|
||||
@@ -145,7 +144,6 @@ type ClickHouseReader struct {
|
||||
|
||||
promConfigFile string
|
||||
promConfig *config.Config
|
||||
alertManager am.Manager
|
||||
featureFlags interfaces.FeatureLookup
|
||||
|
||||
liveTailRefreshSeconds int
|
||||
@@ -194,13 +192,6 @@ func NewReaderFromClickhouseConnection(
|
||||
fluxIntervalForTraceDetail time.Duration,
|
||||
cache cache.Cache,
|
||||
) *ClickHouseReader {
|
||||
alertManager, err := am.New()
|
||||
if err != nil {
|
||||
zap.L().Error("failed to initialize alert manager", zap.Error(err))
|
||||
zap.L().Error("check if the alert manager URL is correctly set and valid")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
logsTableName := options.primary.LogsTable
|
||||
logsLocalTableName := options.primary.LogsLocalTable
|
||||
if useLogsNewSchema {
|
||||
@@ -219,7 +210,6 @@ func NewReaderFromClickhouseConnection(
|
||||
db: db,
|
||||
localDB: localDB,
|
||||
TraceDB: options.primary.TraceDB,
|
||||
alertManager: alertManager,
|
||||
operationsTable: options.primary.OperationsTable,
|
||||
indexTable: options.primary.IndexTable,
|
||||
errorTable: options.primary.ErrorTable,
|
||||
|
||||
@@ -6,7 +6,6 @@ import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/metricsexplorer"
|
||||
"io"
|
||||
"math"
|
||||
"net/http"
|
||||
@@ -19,6 +18,9 @@ import (
|
||||
"text/template"
|
||||
"time"
|
||||
|
||||
"go.signoz.io/signoz/pkg/alertmanager"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/metricsexplorer"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/gorilla/websocket"
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
@@ -58,7 +60,6 @@ import (
|
||||
"go.signoz.io/signoz/pkg/query-service/app/integrations/messagingQueues/kafka"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline"
|
||||
"go.signoz.io/signoz/pkg/query-service/dao"
|
||||
am "go.signoz.io/signoz/pkg/query-service/integrations/alertManager"
|
||||
"go.signoz.io/signoz/pkg/query-service/interfaces"
|
||||
"go.signoz.io/signoz/pkg/query-service/model"
|
||||
"go.signoz.io/signoz/pkg/query-service/rules"
|
||||
@@ -84,7 +85,6 @@ type APIHandler struct {
|
||||
reader interfaces.Reader
|
||||
skipConfig *model.SkipConfig
|
||||
appDao dao.ModelDao
|
||||
alertManager am.Manager
|
||||
ruleManager *rules.Manager
|
||||
featureFlags interfaces.FeatureLookup
|
||||
querier interfaces.Querier
|
||||
@@ -133,6 +133,8 @@ type APIHandler struct {
|
||||
pvcsRepo *inframetrics.PvcsRepo
|
||||
|
||||
JWT *authtypes.JWT
|
||||
|
||||
AlertmanagerAPI *alertmanager.API
|
||||
}
|
||||
|
||||
type APIHandlerOpts struct {
|
||||
@@ -174,16 +176,12 @@ type APIHandlerOpts struct {
|
||||
UseTraceNewSchema bool
|
||||
|
||||
JWT *authtypes.JWT
|
||||
|
||||
AlertmanagerAPI *alertmanager.API
|
||||
}
|
||||
|
||||
// NewAPIHandler returns an APIHandler
|
||||
func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
|
||||
|
||||
alertManager, err := am.New()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
querierOpts := querier.QuerierOptions{
|
||||
Reader: opts.Reader,
|
||||
Cache: opts.Cache,
|
||||
@@ -227,7 +225,6 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
|
||||
skipConfig: opts.SkipConfig,
|
||||
preferSpanMetrics: opts.PreferSpanMetrics,
|
||||
temporalityMap: make(map[string]map[v3.Temporality]bool),
|
||||
alertManager: alertManager,
|
||||
ruleManager: opts.RuleManager,
|
||||
featureFlags: opts.FeatureFlags,
|
||||
IntegrationsController: opts.IntegrationsController,
|
||||
@@ -250,6 +247,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
|
||||
pvcsRepo: pvcsRepo,
|
||||
JWT: opts.JWT,
|
||||
SummaryService: summaryService,
|
||||
AlertmanagerAPI: opts.AlertmanagerAPI,
|
||||
}
|
||||
|
||||
logsQueryBuilder := logsv3.PrepareLogsQuery
|
||||
@@ -483,21 +481,21 @@ func (aH *APIHandler) Respond(w http.ResponseWriter, data interface{}) {
|
||||
|
||||
// RegisterPrivateRoutes registers routes for this handler on the given router
|
||||
func (aH *APIHandler) RegisterPrivateRoutes(router *mux.Router) {
|
||||
router.HandleFunc("/api/v1/channels", aH.listChannels).Methods(http.MethodGet)
|
||||
router.HandleFunc("/api/v1/channels", aH.AlertmanagerAPI.ListAllChannels).Methods(http.MethodGet)
|
||||
}
|
||||
|
||||
// RegisterRoutes registers routes for this handler on the given router
|
||||
func (aH *APIHandler) RegisterRoutes(router *mux.Router, am *AuthMiddleware) {
|
||||
router.HandleFunc("/api/v1/query_range", am.ViewAccess(aH.queryRangeMetrics)).Methods(http.MethodGet)
|
||||
router.HandleFunc("/api/v1/query", am.ViewAccess(aH.queryMetrics)).Methods(http.MethodGet)
|
||||
router.HandleFunc("/api/v1/channels", am.ViewAccess(aH.listChannels)).Methods(http.MethodGet)
|
||||
router.HandleFunc("/api/v1/channels/{id}", am.ViewAccess(aH.getChannel)).Methods(http.MethodGet)
|
||||
router.HandleFunc("/api/v1/channels/{id}", am.AdminAccess(aH.editChannel)).Methods(http.MethodPut)
|
||||
router.HandleFunc("/api/v1/channels/{id}", am.AdminAccess(aH.deleteChannel)).Methods(http.MethodDelete)
|
||||
router.HandleFunc("/api/v1/channels", am.EditAccess(aH.createChannel)).Methods(http.MethodPost)
|
||||
router.HandleFunc("/api/v1/testChannel", am.EditAccess(aH.testChannel)).Methods(http.MethodPost)
|
||||
router.HandleFunc("/api/v1/channels", am.ViewAccess(aH.AlertmanagerAPI.ListChannels)).Methods(http.MethodGet)
|
||||
router.HandleFunc("/api/v1/channels/{id}", am.ViewAccess(aH.AlertmanagerAPI.GetChannelByID)).Methods(http.MethodGet)
|
||||
router.HandleFunc("/api/v1/channels/{id}", am.AdminAccess(aH.AlertmanagerAPI.UpdateChannelByID)).Methods(http.MethodPut)
|
||||
router.HandleFunc("/api/v1/channels/{id}", am.AdminAccess(aH.AlertmanagerAPI.DeleteChannelByID)).Methods(http.MethodDelete)
|
||||
router.HandleFunc("/api/v1/channels", am.EditAccess(aH.AlertmanagerAPI.CreateChannel)).Methods(http.MethodPost)
|
||||
router.HandleFunc("/api/v1/testChannel", am.EditAccess(aH.AlertmanagerAPI.TestReceiver)).Methods(http.MethodPost)
|
||||
|
||||
router.HandleFunc("/api/v1/alerts", am.ViewAccess(aH.getAlerts)).Methods(http.MethodGet)
|
||||
router.HandleFunc("/api/v1/alerts", am.ViewAccess(aH.AlertmanagerAPI.GetAlerts)).Methods(http.MethodGet)
|
||||
|
||||
router.HandleFunc("/api/v1/rules", am.ViewAccess(aH.listRules)).Methods(http.MethodGet)
|
||||
router.HandleFunc("/api/v1/rules/{id}", am.ViewAccess(aH.getRule)).Methods(http.MethodGet)
|
||||
@@ -1363,138 +1361,6 @@ func (aH *APIHandler) editRule(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
}
|
||||
|
||||
func (aH *APIHandler) getChannel(w http.ResponseWriter, r *http.Request) {
|
||||
id := mux.Vars(r)["id"]
|
||||
channel, apiErrorObj := aH.ruleManager.RuleDB().GetChannel(id)
|
||||
if apiErrorObj != nil {
|
||||
RespondError(w, apiErrorObj, nil)
|
||||
return
|
||||
}
|
||||
aH.Respond(w, channel)
|
||||
}
|
||||
|
||||
func (aH *APIHandler) deleteChannel(w http.ResponseWriter, r *http.Request) {
|
||||
id := mux.Vars(r)["id"]
|
||||
apiErrorObj := aH.ruleManager.RuleDB().DeleteChannel(id)
|
||||
if apiErrorObj != nil {
|
||||
RespondError(w, apiErrorObj, nil)
|
||||
return
|
||||
}
|
||||
aH.Respond(w, "notification channel successfully deleted")
|
||||
}
|
||||
|
||||
func (aH *APIHandler) listChannels(w http.ResponseWriter, r *http.Request) {
|
||||
channels, apiErrorObj := aH.ruleManager.RuleDB().GetChannels()
|
||||
if apiErrorObj != nil {
|
||||
RespondError(w, apiErrorObj, nil)
|
||||
return
|
||||
}
|
||||
aH.Respond(w, channels)
|
||||
}
|
||||
|
||||
// testChannels sends test alert to all registered channels
|
||||
func (aH *APIHandler) testChannel(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
defer r.Body.Close()
|
||||
body, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
zap.L().Error("Error in getting req body of testChannel API", zap.Error(err))
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
receiver := &am.Receiver{}
|
||||
if err := json.Unmarshal(body, receiver); err != nil { // Parse []byte to go struct pointer
|
||||
zap.L().Error("Error in parsing req body of testChannel API\n", zap.Error(err))
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
|
||||
return
|
||||
}
|
||||
// send alert
|
||||
apiErrorObj := aH.alertManager.TestReceiver(receiver)
|
||||
if apiErrorObj != nil {
|
||||
RespondError(w, apiErrorObj, nil)
|
||||
return
|
||||
}
|
||||
aH.Respond(w, "test alert sent")
|
||||
}
|
||||
|
||||
func (aH *APIHandler) editChannel(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
id := mux.Vars(r)["id"]
|
||||
|
||||
defer r.Body.Close()
|
||||
body, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
zap.L().Error("Error in getting req body of editChannel API", zap.Error(err))
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
receiver := &am.Receiver{}
|
||||
if err := json.Unmarshal(body, receiver); err != nil { // Parse []byte to go struct pointer
|
||||
zap.L().Error("Error in parsing req body of editChannel API", zap.Error(err))
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
_, apiErrorObj := aH.ruleManager.RuleDB().EditChannel(receiver, id)
|
||||
|
||||
if apiErrorObj != nil {
|
||||
RespondError(w, apiErrorObj, nil)
|
||||
return
|
||||
}
|
||||
|
||||
aH.Respond(w, nil)
|
||||
|
||||
}
|
||||
|
||||
func (aH *APIHandler) createChannel(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
defer r.Body.Close()
|
||||
body, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
zap.L().Error("Error in getting req body of createChannel API", zap.Error(err))
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
receiver := &am.Receiver{}
|
||||
if err := json.Unmarshal(body, receiver); err != nil { // Parse []byte to go struct pointer
|
||||
zap.L().Error("Error in parsing req body of createChannel API", zap.Error(err))
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
_, apiErrorObj := aH.ruleManager.RuleDB().CreateChannel(receiver)
|
||||
|
||||
if apiErrorObj != nil {
|
||||
RespondError(w, apiErrorObj, nil)
|
||||
return
|
||||
}
|
||||
|
||||
aH.Respond(w, nil)
|
||||
|
||||
}
|
||||
|
||||
func (aH *APIHandler) getAlerts(w http.ResponseWriter, r *http.Request) {
|
||||
params := r.URL.Query()
|
||||
amEndpoint := constants.GetAlertManagerApiPrefix()
|
||||
resp, err := http.Get(amEndpoint + "v1/alerts" + "?" + params.Encode())
|
||||
if err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
defer resp.Body.Close()
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
aH.Respond(w, string(body))
|
||||
}
|
||||
|
||||
func (aH *APIHandler) createRule(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
defer r.Body.Close()
|
||||
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
|
||||
"github.com/rs/cors"
|
||||
"github.com/soheilhy/cmux"
|
||||
"go.signoz.io/signoz/pkg/alertmanager"
|
||||
"go.signoz.io/signoz/pkg/http/middleware"
|
||||
"go.signoz.io/signoz/pkg/query-service/agentConf"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/clickhouseReader"
|
||||
@@ -25,6 +26,7 @@ import (
|
||||
opAmpModel "go.signoz.io/signoz/pkg/query-service/app/opamp/model"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/preferences"
|
||||
"go.signoz.io/signoz/pkg/signoz"
|
||||
"go.signoz.io/signoz/pkg/sqlstore"
|
||||
"go.signoz.io/signoz/pkg/types/authtypes"
|
||||
"go.signoz.io/signoz/pkg/web"
|
||||
|
||||
@@ -35,7 +37,6 @@ import (
|
||||
"go.signoz.io/signoz/pkg/query-service/dao"
|
||||
"go.signoz.io/signoz/pkg/query-service/featureManager"
|
||||
"go.signoz.io/signoz/pkg/query-service/healthcheck"
|
||||
am "go.signoz.io/signoz/pkg/query-service/integrations/alertManager"
|
||||
"go.signoz.io/signoz/pkg/query-service/interfaces"
|
||||
"go.signoz.io/signoz/pkg/query-service/model"
|
||||
pqle "go.signoz.io/signoz/pkg/query-service/pqlEngine"
|
||||
@@ -151,9 +152,16 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
||||
|
||||
<-readerReady
|
||||
rm, err := makeRulesManager(
|
||||
serverOptions.PromConfigPath,
|
||||
constants.GetAlertManagerApiPrefix(),
|
||||
serverOptions.RuleRepoURL, serverOptions.SigNoz.SQLStore.SQLxDB(), reader, c, serverOptions.DisableRules, fm, serverOptions.UseLogsNewSchema, serverOptions.UseTraceNewSchema)
|
||||
serverOptions.RuleRepoURL,
|
||||
serverOptions.SigNoz.SQLStore.SQLxDB(),
|
||||
reader,
|
||||
c,
|
||||
serverOptions.DisableRules,
|
||||
fm,
|
||||
serverOptions.UseLogsNewSchema,
|
||||
serverOptions.UseTraceNewSchema,
|
||||
serverOptions.SigNoz.SQLStore,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -196,6 +204,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
||||
UseLogsNewSchema: serverOptions.UseLogsNewSchema,
|
||||
UseTraceNewSchema: serverOptions.UseTraceNewSchema,
|
||||
JWT: serverOptions.Jwt,
|
||||
AlertmanagerAPI: alertmanager.NewAPI(serverOptions.SigNoz.Alertmanager),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -278,7 +287,6 @@ func (s *Server) createPrivateServer(api *APIHandler) (*http.Server, error) {
|
||||
}
|
||||
|
||||
func (s *Server) createPublicServer(api *APIHandler, web web.Web) (*http.Server, error) {
|
||||
|
||||
r := NewRouter()
|
||||
|
||||
r.Use(middleware.NewAuth(zap.L(), s.serverOptions.Jwt, []string{"Authorization", "Sec-WebSocket-Protocol"}).Wrap)
|
||||
@@ -466,8 +474,6 @@ func (s *Server) Stop() error {
|
||||
}
|
||||
|
||||
func makeRulesManager(
|
||||
_,
|
||||
alertManagerURL string,
|
||||
ruleRepoURL string,
|
||||
db *sqlx.DB,
|
||||
ch interfaces.Reader,
|
||||
@@ -475,7 +481,9 @@ func makeRulesManager(
|
||||
disableRules bool,
|
||||
fm interfaces.FeatureLookup,
|
||||
useLogsNewSchema bool,
|
||||
useTraceNewSchema bool) (*rules.Manager, error) {
|
||||
useTraceNewSchema bool,
|
||||
sqlstore sqlstore.SQLStore,
|
||||
) (*rules.Manager, error) {
|
||||
|
||||
// create engine
|
||||
pqle, err := pqle.FromReader(ch)
|
||||
@@ -483,16 +491,8 @@ func makeRulesManager(
|
||||
return nil, fmt.Errorf("failed to create pql engine : %v", err)
|
||||
}
|
||||
|
||||
// notifier opts
|
||||
notifierOpts := am.NotifierOptions{
|
||||
QueueCapacity: 10000,
|
||||
Timeout: 1 * time.Second,
|
||||
AlertManagerURLs: []string{alertManagerURL},
|
||||
}
|
||||
|
||||
// create manager opts
|
||||
managerOpts := &rules.ManagerOptions{
|
||||
NotifierOpts: notifierOpts,
|
||||
PqlEngine: pqle,
|
||||
RepoURL: ruleRepoURL,
|
||||
DBConn: db,
|
||||
@@ -505,6 +505,7 @@ func makeRulesManager(
|
||||
EvalDelay: constants.GetEvalDelay(),
|
||||
UseLogsNewSchema: useLogsNewSchema,
|
||||
UseTraceNewSchema: useTraceNewSchema,
|
||||
SQLStore: sqlstore,
|
||||
}
|
||||
|
||||
// create Manager
|
||||
|
||||
@@ -57,22 +57,12 @@ const PreferRPM = "PreferRPM"
|
||||
const SpanSearchScopeRoot = "isroot"
|
||||
const SpanSearchScopeEntryPoint = "isentrypoint"
|
||||
|
||||
func GetAlertManagerApiPrefix() string {
|
||||
if os.Getenv("ALERTMANAGER_API_PREFIX") != "" {
|
||||
return os.Getenv("ALERTMANAGER_API_PREFIX")
|
||||
}
|
||||
return "http://alertmanager:9093/api/"
|
||||
}
|
||||
|
||||
var TELEMETRY_HEART_BEAT_DURATION_MINUTES = GetOrDefaultEnvInt("TELEMETRY_HEART_BEAT_DURATION_MINUTES", 720)
|
||||
|
||||
var TELEMETRY_ACTIVE_USER_DURATION_MINUTES = GetOrDefaultEnvInt("TELEMETRY_ACTIVE_USER_DURATION_MINUTES", 360)
|
||||
|
||||
var InviteEmailTemplate = GetOrDefaultEnv("INVITE_EMAIL_TEMPLATE", "/root/templates/invitation_email_template.html")
|
||||
|
||||
// Alert manager channel subpath
|
||||
var AmChannelApiPath = GetOrDefaultEnv("ALERTMANAGER_API_CHANNEL_PATH", "v1/routes")
|
||||
|
||||
var OTLPTarget = GetOrDefaultEnv("OTEL_EXPORTER_OTLP_ENDPOINT", "")
|
||||
var LogExportBatchSize = GetOrDefaultEnv("OTEL_BLRP_MAX_EXPORT_BATCH_SIZE", "512")
|
||||
|
||||
|
||||
@@ -1,21 +0,0 @@
|
||||
package constants
|
||||
|
||||
import (
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
. "github.com/smartystreets/goconvey/convey"
|
||||
)
|
||||
|
||||
func TestGetAlertManagerApiPrefix(t *testing.T) {
|
||||
Convey("TestGetAlertManagerApiPrefix", t, func() {
|
||||
res := GetAlertManagerApiPrefix()
|
||||
So(res, ShouldEqual, "http://alertmanager:9093/api/")
|
||||
|
||||
Convey("WithEnvSet", func() {
|
||||
os.Setenv("ALERTMANAGER_API_PREFIX", "http://test:9093/api/")
|
||||
res = GetAlertManagerApiPrefix()
|
||||
So(res, ShouldEqual, "http://test:9093/api/")
|
||||
})
|
||||
})
|
||||
}
|
||||
@@ -1,200 +0,0 @@
|
||||
package alertManager
|
||||
|
||||
// Wrapper to connect and process alert manager functions
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
neturl "net/url"
|
||||
|
||||
"go.signoz.io/signoz/pkg/query-service/constants"
|
||||
"go.signoz.io/signoz/pkg/query-service/model"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// contentType is the Content-Type sent with every request this file makes to alertmanager.
const contentType = "application/json"
|
||||
|
||||
// Manager is the client-side view of one alertmanager instance: it exposes the
// instance URL and the route (channel) operations performed against it.
type Manager interface {
|
||||
// URL returns the parsed base URL of the alertmanager.
URL() *neturl.URL
|
||||
// URLPath resolves path relative to the base URL.
URLPath(path string) *neturl.URL
|
||||
// AddRoute creates a route for receiver.
AddRoute(receiver *Receiver) *model.ApiError
|
||||
// EditRoute updates the route for receiver.
EditRoute(receiver *Receiver) *model.ApiError
|
||||
// DeleteRoute removes the route identified by name.
DeleteRoute(name string) *model.ApiError
|
||||
// TestReceiver asks the alertmanager to test receiver.
TestReceiver(receiver *Receiver) *model.ApiError
|
||||
}
|
||||
|
||||
func defaultOptions() []ManagerOptions {
|
||||
return []ManagerOptions{
|
||||
WithURL(constants.GetAlertManagerApiPrefix()),
|
||||
WithChannelApiPath(constants.AmChannelApiPath),
|
||||
}
|
||||
}
|
||||
|
||||
// ManagerOptions is a functional option that configures a manager; a non-nil error aborts New.
type ManagerOptions func(m *manager) error
|
||||
|
||||
func New(opts ...ManagerOptions) (Manager, error) {
|
||||
m := &manager{}
|
||||
|
||||
newOpts := defaultOptions()
|
||||
newOpts = append(newOpts, opts...)
|
||||
|
||||
for _, opt := range newOpts {
|
||||
err := opt(m)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return m, nil
|
||||
}
|
||||
|
||||
func WithURL(url string) ManagerOptions {
|
||||
return func(m *manager) error {
|
||||
m.url = url
|
||||
parsedURL, err := neturl.Parse(url)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
m.parsedURL = parsedURL
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func WithChannelApiPath(path string) ManagerOptions {
|
||||
return func(m *manager) error {
|
||||
m.channelApiPath = path
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// manager is the Manager implementation that talks to alertmanager over HTTP.
type manager struct {
|
||||
// url is the base URL exactly as supplied to WithURL; request URLs are built by appending to it.
url string
|
||||
// parsedURL is the parsed form of url, set by WithURL when parsing succeeds.
parsedURL *neturl.URL
|
||||
// channelApiPath is the sub-path appended to url for route add/edit/delete calls.
channelApiPath string
|
||||
}
|
||||
|
||||
func (m *manager) prepareAmChannelApiURL() string {
|
||||
return fmt.Sprintf("%s%s", m.url, m.channelApiPath)
|
||||
}
|
||||
|
||||
func (m *manager) prepareTestApiURL() string {
|
||||
return fmt.Sprintf("%s%s", m.url, "v1/testReceiver")
|
||||
}
|
||||
|
||||
// URL returns the parsed base URL stored by WithURL. The pointer is shared
// with the manager, not copied.
func (m *manager) URL() *neturl.URL {
|
||||
return m.parsedURL
|
||||
}
|
||||
|
||||
func (m *manager) URLPath(path string) *neturl.URL {
|
||||
upath, err := neturl.Parse(path)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return m.parsedURL.ResolveReference(upath)
|
||||
}
|
||||
|
||||
func (m *manager) AddRoute(receiver *Receiver) *model.ApiError {
|
||||
|
||||
receiverString, _ := json.Marshal(receiver)
|
||||
|
||||
amURL := m.prepareAmChannelApiURL()
|
||||
response, err := http.Post(amURL, contentType, bytes.NewBuffer(receiverString))
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in getting response of API call to alertmanager", zap.String("url", amURL), zap.Error(err))
|
||||
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
|
||||
}
|
||||
|
||||
if response.StatusCode > 299 {
|
||||
zap.L().Error("Error in getting 2xx response in API call to alertmanager", zap.String("url", amURL), zap.String("status", response.Status))
|
||||
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *manager) EditRoute(receiver *Receiver) *model.ApiError {
|
||||
receiverString, _ := json.Marshal(receiver)
|
||||
|
||||
amURL := m.prepareAmChannelApiURL()
|
||||
req, err := http.NewRequest(http.MethodPut, amURL, bytes.NewBuffer(receiverString))
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error creating new update request for API call to alertmanager", zap.String("url", amURL), zap.Error(err))
|
||||
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
|
||||
}
|
||||
|
||||
req.Header.Add("Content-Type", contentType)
|
||||
|
||||
client := &http.Client{}
|
||||
response, err := client.Do(req)
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in getting response of API call to alertmanager", zap.String("url", amURL), zap.Error(err))
|
||||
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
|
||||
}
|
||||
|
||||
if response.StatusCode > 299 {
|
||||
zap.L().Error("Error in getting 2xx response in PUT API call to alertmanager", zap.String("url", amURL), zap.String("status", response.Status))
|
||||
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *manager) DeleteRoute(name string) *model.ApiError {
|
||||
values := map[string]string{"name": name}
|
||||
requestData, _ := json.Marshal(values)
|
||||
|
||||
amURL := m.prepareAmChannelApiURL()
|
||||
req, err := http.NewRequest(http.MethodDelete, amURL, bytes.NewBuffer(requestData))
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in creating new delete request to alertmanager/v1/receivers", zap.Error(err))
|
||||
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
|
||||
}
|
||||
|
||||
req.Header.Add("Content-Type", contentType)
|
||||
|
||||
client := &http.Client{}
|
||||
response, err := client.Do(req)
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in getting response of API call to alertmanager", zap.String("url", amURL), zap.Error(err))
|
||||
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
|
||||
}
|
||||
|
||||
if response.StatusCode > 299 {
|
||||
err := fmt.Errorf(fmt.Sprintf("Error in getting 2xx response in PUT API call to alertmanager(DELETE %s)\n", amURL), response.Status)
|
||||
zap.L().Error("Error in getting 2xx response in PUT API call to alertmanager", zap.String("url", amURL), zap.String("status", response.Status))
|
||||
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *manager) TestReceiver(receiver *Receiver) *model.ApiError {
|
||||
|
||||
receiverBytes, _ := json.Marshal(receiver)
|
||||
|
||||
amTestURL := m.prepareTestApiURL()
|
||||
response, err := http.Post(amTestURL, contentType, bytes.NewBuffer(receiverBytes))
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in getting response of API call to alertmanager", zap.String("url", amTestURL), zap.Error(err))
|
||||
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
|
||||
}
|
||||
|
||||
if response.StatusCode > 201 && response.StatusCode < 400 {
|
||||
err := fmt.Errorf(fmt.Sprintf("Invalid parameters in test alert api for alertmanager(POST %s)\n", amTestURL), response.Status)
|
||||
zap.L().Error("Invalid parameters in test alert api for alertmanager", zap.Error(err))
|
||||
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
|
||||
}
|
||||
|
||||
if response.StatusCode > 400 {
|
||||
err := fmt.Errorf(fmt.Sprintf("Received Server Error response for API call to alertmanager(POST %s)\n", amTestURL), response.Status)
|
||||
zap.L().Error("Received Server Error response for API call to alertmanager", zap.Error(err))
|
||||
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -1,79 +0,0 @@
|
||||
package alertManager
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"go.signoz.io/signoz/pkg/query-service/utils/labels"
|
||||
)
|
||||
|
||||
// Receiver configuration provides configuration on how to contact a receiver.
// Each *Configs field is untyped and is serialized as-is; a nil field is
// omitted from both the YAML and JSON encodings.
|
||||
type Receiver struct {
|
||||
// A unique identifier for this receiver.
|
||||
Name string `yaml:"name" json:"name"`
|
||||
|
||||
EmailConfigs interface{} `yaml:"email_configs,omitempty" json:"email_configs,omitempty"`
|
||||
PagerdutyConfigs interface{} `yaml:"pagerduty_configs,omitempty" json:"pagerduty_configs,omitempty"`
|
||||
SlackConfigs interface{} `yaml:"slack_configs,omitempty" json:"slack_configs,omitempty"`
|
||||
WebhookConfigs interface{} `yaml:"webhook_configs,omitempty" json:"webhook_configs,omitempty"`
|
||||
OpsGenieConfigs interface{} `yaml:"opsgenie_configs,omitempty" json:"opsgenie_configs,omitempty"`
|
||||
WechatConfigs interface{} `yaml:"wechat_configs,omitempty" json:"wechat_configs,omitempty"`
|
||||
PushoverConfigs interface{} `yaml:"pushover_configs,omitempty" json:"pushover_configs,omitempty"`
|
||||
VictorOpsConfigs interface{} `yaml:"victorops_configs,omitempty" json:"victorops_configs,omitempty"`
|
||||
SNSConfigs interface{} `yaml:"sns_configs,omitempty" json:"sns_configs,omitempty"`
|
||||
MSTeamsConfigs interface{} `yaml:"msteams_configs,omitempty" json:"msteams_configs,omitempty"`
|
||||
}
|
||||
|
||||
// ReceiverResponse is a JSON envelope pairing a status string with a Receiver.
type ReceiverResponse struct {
|
||||
Status string `json:"status"`
|
||||
Data Receiver `json:"data"`
|
||||
}
|
||||
|
||||
// Alert is a generic representation of an alert in the Prometheus eco-system.
|
||||
type Alert struct {
|
||||
// Label value pairs for purpose of aggregation, matching, and disposition
|
||||
// dispatching. This must minimally include an "alertname" label.
|
||||
Labels labels.BaseLabels `json:"labels"`
|
||||
|
||||
// Extra key/value information which does not define alert identity.
|
||||
Annotations labels.BaseLabels `json:"annotations"`
|
||||
|
||||
// The known time range for this alert. Both ends are optional.
// NOTE: encoding/json never treats a struct value as empty, so omitempty
// has no effect on the two time.Time fields; zero times are still encoded.
|
||||
StartsAt time.Time `json:"startsAt,omitempty"`
|
||||
EndsAt time.Time `json:"endsAt,omitempty"`
|
||||
GeneratorURL string `json:"generatorURL,omitempty"`
|
||||
|
||||
// Receivers lists receiver names for this alert; presumably the
// receivers it should be routed to (confirm against the consumer).
Receivers []string `json:"receivers,omitempty"`
|
||||
}
|
||||
|
||||
// Name returns the name of the alert. It is equivalent to the "alertname" label.
// The value is whatever Labels.Get yields for labels.AlertNameLabel.
|
||||
func (a *Alert) Name() string {
|
||||
return a.Labels.Get(labels.AlertNameLabel)
|
||||
}
|
||||
|
||||
// Hash returns a hash over the alert. It is equivalent to the alert labels hash.
// Annotations, timestamps and receivers do not contribute to it.
|
||||
func (a *Alert) Hash() uint64 {
|
||||
return a.Labels.Hash()
|
||||
}
|
||||
|
||||
func (a *Alert) String() string {
|
||||
s := fmt.Sprintf("%s[%s][%s]", a.Name(), fmt.Sprintf("%016x", a.Hash())[:7], a.Receivers)
|
||||
if a.Resolved() {
|
||||
return s + "[resolved]"
|
||||
}
|
||||
return s + "[active]"
|
||||
}
|
||||
|
||||
// Resolved returns true iff the activity interval ended in the past.
// It is ResolvedAt evaluated at time.Now().
|
||||
func (a *Alert) Resolved() bool {
|
||||
return a.ResolvedAt(time.Now())
|
||||
}
|
||||
|
||||
// ResolvedAt returns true off the activity interval ended before
|
||||
// the given timestamp.
|
||||
func (a *Alert) ResolvedAt(ts time.Time) bool {
|
||||
if a.EndsAt.IsZero() {
|
||||
return false
|
||||
}
|
||||
return !a.EndsAt.After(ts)
|
||||
}
|
||||
@@ -1,310 +0,0 @@
|
||||
package alertManager
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync/atomic"
|
||||
|
||||
"net/http"
|
||||
"net/url"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
old_ctx "golang.org/x/net/context"
|
||||
|
||||
"github.com/go-kit/log"
|
||||
"github.com/go-kit/log/level"
|
||||
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/net/context/ctxhttp"
|
||||
)
|
||||
|
||||
const (
|
||||
// alertPushEndpoint is the path, relative to an alertmanager base URL, that alerts are POSTed to.
alertPushEndpoint = "v1/alerts"
|
||||
// contentTypeJSON is the Content-Type header value used when pushing alerts.
contentTypeJSON = "application/json"
|
||||
)
|
||||
|
||||
// Notifier is responsible for dispatching alert notifications to an
|
||||
// alert manager service.
|
||||
type Notifier struct {
|
||||
// queue holds alerts accepted by Send and not yet taken by nextBatch; guarded by mtx.
queue []*Alert
|
||||
opts *NotifierOptions
|
||||
|
||||
// more is a one-slot signal channel telling Run that the queue is non-empty.
more chan struct{}
|
||||
mtx sync.RWMutex
|
||||
// ctx is cancelled by Stop (via cancel) to end Run and abort in-flight sends.
ctx context.Context
|
||||
cancel func()
|
||||
|
||||
alertmanagers *alertmanagerSet
|
||||
logger log.Logger
|
||||
}
|
||||
|
||||
// NotifierOptions are the configurable parameters of a Handler.
|
||||
type NotifierOptions struct {
|
||||
// QueueCapacity is the maximum number of alerts kept in the queue; Send drops alerts beyond it.
QueueCapacity int
|
||||
// Used for sending HTTP requests to the Alertmanager.
// NewNotifier substitutes ctxhttp.Do when this is nil.
|
||||
Do func(ctx old_ctx.Context, client *http.Client, req *http.Request) (*http.Response, error)
|
||||
// List of alert manager urls
|
||||
AlertManagerURLs []string
|
||||
// timeout limit on requests
// NewNotifier uses 30 seconds when this is zero.
|
||||
Timeout time.Duration
|
||||
}
|
||||
|
||||
func (opts *NotifierOptions) String() string {
|
||||
var urls string
|
||||
for _, u := range opts.AlertManagerURLs {
|
||||
urls = fmt.Sprintf("%s %s", urls, u)
|
||||
}
|
||||
return urls
|
||||
}
|
||||
|
||||
// todo(amol): add metrics
|
||||
|
||||
func NewNotifier(o *NotifierOptions, logger log.Logger) (*Notifier, error) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
if o.Do == nil {
|
||||
o.Do = ctxhttp.Do
|
||||
}
|
||||
if logger == nil {
|
||||
logger = log.NewNopLogger()
|
||||
}
|
||||
|
||||
n := &Notifier{
|
||||
queue: make([]*Alert, 0, o.QueueCapacity),
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
more: make(chan struct{}, 1),
|
||||
opts: o,
|
||||
logger: logger,
|
||||
}
|
||||
timeout := o.Timeout
|
||||
|
||||
if int64(timeout) == 0 {
|
||||
timeout = time.Duration(30 * time.Second)
|
||||
}
|
||||
|
||||
amset, err := newAlertmanagerSet(o.AlertManagerURLs, timeout, logger)
|
||||
if err != nil {
|
||||
zap.L().Error("failed to parse alert manager urls")
|
||||
return n, err
|
||||
}
|
||||
n.alertmanagers = amset
|
||||
zap.L().Info("Starting notifier with alert manager", zap.Strings("urls", o.AlertManagerURLs))
|
||||
return n, nil
|
||||
}
|
||||
|
||||
// maxBatchSize caps how many queued alerts nextBatch hands out per call.
const maxBatchSize = 64
|
||||
|
||||
func (n *Notifier) queueLen() int {
|
||||
n.mtx.RLock()
|
||||
defer n.mtx.RUnlock()
|
||||
|
||||
return len(n.queue)
|
||||
}
|
||||
|
||||
func (n *Notifier) nextBatch() []*Alert {
|
||||
n.mtx.Lock()
|
||||
defer n.mtx.Unlock()
|
||||
|
||||
var alerts []*Alert
|
||||
|
||||
if len(n.queue) > maxBatchSize {
|
||||
alerts = append(make([]*Alert, 0, maxBatchSize), n.queue[:maxBatchSize]...)
|
||||
n.queue = n.queue[maxBatchSize:]
|
||||
} else {
|
||||
alerts = append(make([]*Alert, 0, len(n.queue)), n.queue...)
|
||||
n.queue = n.queue[:0]
|
||||
}
|
||||
|
||||
return alerts
|
||||
}
|
||||
|
||||
// Run dispatches notifications continuously.
|
||||
func (n *Notifier) Run() {
|
||||
zap.L().Info("msg: Initiating alert notifier...")
|
||||
for {
|
||||
select {
|
||||
case <-n.ctx.Done():
|
||||
return
|
||||
case <-n.more:
|
||||
}
|
||||
alerts := n.nextBatch()
|
||||
|
||||
if !n.sendAll(alerts...) {
|
||||
zap.L().Warn("msg: dropped alerts", zap.Int("count", len(alerts)))
|
||||
// n.metrics.dropped.Add(float64(len(alerts)))
|
||||
}
|
||||
// If the queue still has items left, kick off the next iteration.
|
||||
if n.queueLen() > 0 {
|
||||
n.setMore()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Send queues the given notification requests for processing.
|
||||
// Panics if called on a handler that is not running.
|
||||
func (n *Notifier) Send(alerts ...*Alert) {
|
||||
n.mtx.Lock()
|
||||
defer n.mtx.Unlock()
|
||||
|
||||
// Queue capacity should be significantly larger than a single alert
|
||||
// batch could be.
|
||||
if d := len(alerts) - n.opts.QueueCapacity; d > 0 {
|
||||
alerts = alerts[d:]
|
||||
|
||||
level.Warn(n.logger).Log("msg", "Alert batch larger than queue capacity, dropping alerts", "num_dropped", d)
|
||||
//n.metrics.dropped.Add(float64(d))
|
||||
}
|
||||
|
||||
// If the queue is full, remove the oldest alerts in favor
|
||||
// of newer ones.
|
||||
if d := (len(n.queue) + len(alerts)) - n.opts.QueueCapacity; d > 0 {
|
||||
n.queue = n.queue[d:]
|
||||
|
||||
level.Warn(n.logger).Log("msg", "Alert notification queue full, dropping alerts", "num_dropped", d)
|
||||
//n.metrics.dropped.Add(float64(d))
|
||||
}
|
||||
n.queue = append(n.queue, alerts...)
|
||||
|
||||
// Notify sending goroutine that there are alerts to be processed.
|
||||
n.setMore()
|
||||
}
|
||||
|
||||
// setMore signals that the alert queue has items.
// The send is non-blocking, so it is safe to call while holding n.mtx.
|
||||
func (n *Notifier) setMore() {
|
||||
// If we cannot send on the channel, it means the signal already exists
|
||||
// and has not been consumed yet.
|
||||
select {
|
||||
case n.more <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// Alertmanagers returns a slice of Alertmanager URLs.
|
||||
func (n *Notifier) Alertmanagers() []*url.URL {
|
||||
n.mtx.RLock()
|
||||
amset := n.alertmanagers
|
||||
n.mtx.RUnlock()
|
||||
|
||||
var res []*url.URL
|
||||
|
||||
amset.mtx.RLock()
|
||||
for _, am := range amset.ams {
|
||||
res = append(res, am.URLPath(alertPushEndpoint))
|
||||
}
|
||||
amset.mtx.RUnlock()
|
||||
|
||||
return res
|
||||
}
|
||||
|
||||
// sendAll sends the alerts to all configured Alertmanagers concurrently.
|
||||
// It returns true if the alerts could be sent successfully to at least one Alertmanager.
|
||||
func (n *Notifier) sendAll(alerts ...*Alert) bool {
|
||||
|
||||
b, err := json.Marshal(alerts)
|
||||
if err != nil {
|
||||
zap.L().Error("Encoding alerts failed", zap.Error(err))
|
||||
return false
|
||||
}
|
||||
|
||||
n.mtx.RLock()
|
||||
ams := n.alertmanagers
|
||||
n.mtx.RUnlock()
|
||||
|
||||
var (
|
||||
wg sync.WaitGroup
|
||||
numSuccess uint64
|
||||
)
|
||||
|
||||
ams.mtx.RLock()
|
||||
|
||||
for _, am := range ams.ams {
|
||||
wg.Add(1)
|
||||
|
||||
ctx, cancel := context.WithTimeout(n.ctx, time.Duration(ams.timeout))
|
||||
defer cancel()
|
||||
|
||||
go func(ams *alertmanagerSet, am Manager) {
|
||||
u := am.URLPath(alertPushEndpoint).String()
|
||||
if err := n.sendOne(ctx, ams.client, u, b); err != nil {
|
||||
zap.L().Error("Error calling alert API", zap.String("alertmanager", u), zap.Int("count", len(alerts)), zap.Error(err))
|
||||
} else {
|
||||
atomic.AddUint64(&numSuccess, 1)
|
||||
}
|
||||
// n.metrics.latency.WithLabelValues(u).Observe(time.Since(begin).Seconds())
|
||||
// n.metrics.sent.WithLabelValues(u).Add(float64(len(alerts)))
|
||||
|
||||
wg.Done()
|
||||
}(ams, am)
|
||||
}
|
||||
ams.mtx.RUnlock()
|
||||
|
||||
wg.Wait()
|
||||
|
||||
return numSuccess > 0
|
||||
}
|
||||
|
||||
func (n *Notifier) sendOne(ctx context.Context, c *http.Client, url string, b []byte) error {
|
||||
req, err := http.NewRequest("POST", url, bytes.NewReader(b))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req.Header.Set("Content-Type", contentTypeJSON)
|
||||
resp, err := n.opts.Do(ctx, c, req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
// Any HTTP status 2xx is OK.
|
||||
if resp.StatusCode/100 != 2 {
|
||||
return fmt.Errorf("bad response status %v", resp.Status)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Stop shuts down the notification handler.
// It cancels the notifier's context, which makes Run return; it does not wait for Run to exit.
|
||||
func (n *Notifier) Stop() {
|
||||
level.Info(n.logger).Log("msg", "Stopping notification manager...")
|
||||
n.cancel()
|
||||
}
|
||||
|
||||
// alertmanagerSet contains a set of Alertmanagers discovered via a group of service
|
||||
// discovery definitions that have a common configuration on how alerts should be sent.
|
||||
type alertmanagerSet struct {
|
||||
// urls is the raw URL list the set was built from, including any that failed to parse.
urls []string
|
||||
// client is the HTTP client shared by all sends to this set.
client *http.Client
|
||||
// timeout bounds each individual send.
timeout time.Duration
|
||||
// mtx guards ams.
mtx sync.RWMutex
|
||||
// ams holds one Manager per URL that was accepted.
ams []Manager
|
||||
|
||||
logger log.Logger
|
||||
}
|
||||
|
||||
func newAlertmanagerSet(urls []string, timeout time.Duration, logger log.Logger) (*alertmanagerSet, error) {
|
||||
client := &http.Client{}
|
||||
|
||||
s := &alertmanagerSet{
|
||||
client: client,
|
||||
urls: urls,
|
||||
logger: logger,
|
||||
timeout: timeout,
|
||||
}
|
||||
|
||||
ams := []Manager{}
|
||||
for _, u := range urls {
|
||||
am, err := New(WithURL(u))
|
||||
if err != nil {
|
||||
level.Error(s.logger).Log(fmt.Sprintf("invalid alert manager url %s: %s", u, err))
|
||||
} else {
|
||||
ams = append(ams, am)
|
||||
}
|
||||
}
|
||||
if len(ams) == 0 {
|
||||
return s, fmt.Errorf("no alert managers")
|
||||
}
|
||||
s.ams = ams
|
||||
return s, nil
|
||||
}
|
||||
@@ -4,8 +4,6 @@ import (
|
||||
"context"
|
||||
"flag"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
prommodel "github.com/prometheus/common/model"
|
||||
@@ -142,22 +140,20 @@ func main() {
|
||||
logger.Fatal("Failed to initialize auth cache", zap.Error(err))
|
||||
}
|
||||
|
||||
signalsChannel := make(chan os.Signal, 1)
|
||||
signal.Notify(signalsChannel, os.Interrupt, syscall.SIGTERM)
|
||||
signoz.Start(context.Background())
|
||||
|
||||
for {
|
||||
select {
|
||||
case status := <-server.HealthCheckStatus():
|
||||
logger.Info("Received HealthCheck status: ", zap.Int("status", int(status)))
|
||||
case <-signalsChannel:
|
||||
logger.Info("Received OS Interrupt Signal ... ")
|
||||
err := server.Stop()
|
||||
if err != nil {
|
||||
logger.Fatal("Failed to stop server", zap.Error(err))
|
||||
}
|
||||
logger.Info("Server stopped")
|
||||
return
|
||||
}
|
||||
if err := signoz.Wait(context.Background()); err != nil {
|
||||
zap.L().Fatal("Failed to start signoz", zap.Error(err))
|
||||
}
|
||||
|
||||
err = server.Stop()
|
||||
if err != nil {
|
||||
zap.L().Fatal("Failed to stop server", zap.Error(err))
|
||||
}
|
||||
|
||||
err = signoz.Stop(context.Background())
|
||||
if err != nil {
|
||||
zap.L().Fatal("Failed to stop signoz", zap.Error(err))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -80,7 +80,6 @@ func parsePostableRule(content []byte, kind RuleDataKind) (*PostableRule, error)
|
||||
// parseIntoRule loads the content (data) into PostableRule and also
|
||||
// validates the end result
|
||||
func parseIntoRule(initRule PostableRule, content []byte, kind RuleDataKind) (*PostableRule, error) {
|
||||
|
||||
rule := &initRule
|
||||
|
||||
var err error
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
"go.signoz.io/signoz/pkg/query-service/model"
|
||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||
qslabels "go.signoz.io/signoz/pkg/query-service/utils/labels"
|
||||
"go.signoz.io/signoz/pkg/sqlstore"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
@@ -78,6 +79,8 @@ type BaseRule struct {
|
||||
// querying the v4 table on low cardinal temporality column
|
||||
// should be fast but we can still avoid the query if we have the data in memory
|
||||
TemporalityMap map[string]map[v3.Temporality]bool
|
||||
|
||||
sqlstore sqlstore.SQLStore
|
||||
}
|
||||
|
||||
type RuleOption func(*BaseRule)
|
||||
@@ -106,6 +109,12 @@ func WithLogger(logger *zap.Logger) RuleOption {
|
||||
}
|
||||
}
|
||||
|
||||
func WithSQLStore(sqlstore sqlstore.SQLStore) RuleOption {
|
||||
return func(r *BaseRule) {
|
||||
r.sqlstore = sqlstore
|
||||
}
|
||||
}
|
||||
|
||||
func NewBaseRule(id string, p *PostableRule, reader interfaces.Reader, opts ...RuleOption) (*BaseRule, error) {
|
||||
if p.RuleCondition == nil || !p.RuleCondition.IsValid() {
|
||||
return nil, fmt.Errorf("invalid rule condition")
|
||||
@@ -309,6 +318,20 @@ func (r *BaseRule) ActiveAlerts() []*Alert {
|
||||
}
|
||||
|
||||
func (r *BaseRule) SendAlerts(ctx context.Context, ts time.Time, resendDelay time.Duration, interval time.Duration, notifyFunc NotifyFunc) {
|
||||
var orgID string
|
||||
err := r.
|
||||
sqlstore.
|
||||
BunDB().
|
||||
NewSelect().
|
||||
Table("organizations").
|
||||
ColumnExpr("id").
|
||||
Limit(1).
|
||||
Scan(ctx, &orgID)
|
||||
if err != nil {
|
||||
r.logger.Error("failed to get org ids", zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
alerts := []*Alert{}
|
||||
r.ForEachActiveAlert(func(alert *Alert) {
|
||||
if alert.needsSending(ts, resendDelay) {
|
||||
@@ -322,7 +345,7 @@ func (r *BaseRule) SendAlerts(ctx context.Context, ts time.Time, resendDelay tim
|
||||
alerts = append(alerts, &anew)
|
||||
}
|
||||
})
|
||||
notifyFunc(ctx, "", alerts...)
|
||||
notifyFunc(ctx, orgID, "", alerts...)
|
||||
}
|
||||
|
||||
func (r *BaseRule) ForEachActiveAlert(f func(*Alert)) {
|
||||
|
||||
@@ -11,30 +11,24 @@ import (
|
||||
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/pkg/errors"
|
||||
"go.signoz.io/signoz/pkg/query-service/common"
|
||||
am "go.signoz.io/signoz/pkg/query-service/integrations/alertManager"
|
||||
"github.com/uptrace/bun"
|
||||
"go.signoz.io/signoz/pkg/query-service/model"
|
||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||
"go.signoz.io/signoz/pkg/sqlstore"
|
||||
"go.signoz.io/signoz/pkg/types/authtypes"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Data store to capture user alert rule settings
|
||||
type RuleDB interface {
|
||||
GetChannel(id string) (*model.ChannelItem, *model.ApiError)
|
||||
GetChannels() (*[]model.ChannelItem, *model.ApiError)
|
||||
DeleteChannel(id string) *model.ApiError
|
||||
CreateChannel(receiver *am.Receiver) (*am.Receiver, *model.ApiError)
|
||||
EditChannel(receiver *am.Receiver, id string) (*am.Receiver, *model.ApiError)
|
||||
|
||||
// CreateRuleTx stores rule in the db and returns tx and group name (on success)
|
||||
CreateRuleTx(ctx context.Context, rule string) (int64, Tx, error)
|
||||
// CreateRule stores rule in the db and returns tx and group name (on success)
|
||||
CreateRule(context.Context, *StoredRule, func(context.Context, int64) error) (int64, error)
|
||||
|
||||
// EditRuleTx updates the given rule in the db and returns tx and group name (on success)
|
||||
EditRuleTx(ctx context.Context, rule string, id string) (string, Tx, error)
|
||||
EditRule(context.Context, *StoredRule, func(context.Context) error) error
|
||||
|
||||
// DeleteRuleTx deletes the given rule in the db and returns tx and group name (on success)
|
||||
DeleteRuleTx(ctx context.Context, id string) (string, Tx, error)
|
||||
DeleteRule(context.Context, string, func(context.Context) error) error
|
||||
|
||||
// GetStoredRules fetches the rule definitions from db
|
||||
GetStoredRules(ctx context.Context) ([]StoredRule, error)
|
||||
@@ -62,142 +56,83 @@ type RuleDB interface {
|
||||
}
|
||||
|
||||
type StoredRule struct {
|
||||
Id int `json:"id" db:"id"`
|
||||
CreatedAt *time.Time `json:"created_at" db:"created_at"`
|
||||
CreatedBy *string `json:"created_by" db:"created_by"`
|
||||
UpdatedAt *time.Time `json:"updated_at" db:"updated_at"`
|
||||
UpdatedBy *string `json:"updated_by" db:"updated_by"`
|
||||
Data string `json:"data" db:"data"`
|
||||
}
|
||||
bun.BaseModel `bun:"rules"`
|
||||
|
||||
type Tx interface {
|
||||
Commit() error
|
||||
Rollback() error
|
||||
Id int `json:"id" db:"id" bun:"id,pk,autoincrement"`
|
||||
CreatedAt *time.Time `json:"created_at" db:"created_at" bun:"created_at"`
|
||||
CreatedBy *string `json:"created_by" db:"created_by" bun:"created_by"`
|
||||
UpdatedAt *time.Time `json:"updated_at" db:"updated_at" bun:"updated_at"`
|
||||
UpdatedBy *string `json:"updated_by" db:"updated_by" bun:"updated_by"`
|
||||
Data string `json:"data" db:"data" bun:"data"`
|
||||
}
|
||||
|
||||
type ruleDB struct {
|
||||
*sqlx.DB
|
||||
alertManager am.Manager
|
||||
sqlstore sqlstore.SQLStore
|
||||
}
|
||||
|
||||
// todo: move init methods for creating tables
|
||||
|
||||
func NewRuleDB(db *sqlx.DB, alertManager am.Manager) RuleDB {
|
||||
return &ruleDB{
|
||||
db,
|
||||
alertManager,
|
||||
}
|
||||
func NewRuleDB(db *sqlx.DB, sqlstore sqlstore.SQLStore) RuleDB {
|
||||
return &ruleDB{db, sqlstore}
|
||||
}
|
||||
|
||||
// CreateRuleTx stores a given rule in db and returns task name,
|
||||
// sql tx and error (if any)
|
||||
func (r *ruleDB) CreateRuleTx(ctx context.Context, rule string) (int64, Tx, error) {
|
||||
var lastInsertId int64
|
||||
// CreateRule stores a given rule in db and returns task name and error (if any)
|
||||
func (r *ruleDB) CreateRule(ctx context.Context, storedRule *StoredRule, cb func(context.Context, int64) error) (int64, error) {
|
||||
err := r.sqlstore.RunInTxCtx(ctx, nil, func(ctx context.Context) error {
|
||||
_, err := r.sqlstore.
|
||||
BunDBCtx(ctx).
|
||||
NewInsert().
|
||||
Model(storedRule).
|
||||
Exec(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return cb(ctx, int64(storedRule.Id))
|
||||
})
|
||||
|
||||
var userEmail string
|
||||
if user := common.GetUserFromContext(ctx); user != nil {
|
||||
userEmail = user.Email
|
||||
}
|
||||
createdAt := time.Now()
|
||||
updatedAt := time.Now()
|
||||
tx, err := r.Begin()
|
||||
if err != nil {
|
||||
return lastInsertId, nil, err
|
||||
return 0, err
|
||||
}
|
||||
|
||||
stmt, err := tx.Prepare(`INSERT into rules (created_at, created_by, updated_at, updated_by, data) VALUES($1,$2,$3,$4,$5);`)
|
||||
if err != nil {
|
||||
zap.L().Error("Error in preparing statement for INSERT to rules", zap.Error(err))
|
||||
tx.Rollback()
|
||||
return lastInsertId, nil, err
|
||||
}
|
||||
|
||||
defer stmt.Close()
|
||||
|
||||
result, err := stmt.Exec(createdAt, userEmail, updatedAt, userEmail, rule)
|
||||
if err != nil {
|
||||
zap.L().Error("Error in Executing prepared statement for INSERT to rules", zap.Error(err))
|
||||
tx.Rollback() // return an error too, we may want to wrap them
|
||||
return lastInsertId, nil, err
|
||||
}
|
||||
|
||||
lastInsertId, err = result.LastInsertId()
|
||||
if err != nil {
|
||||
zap.L().Error("Error in getting last insert id for INSERT to rules\n", zap.Error(err))
|
||||
tx.Rollback() // return an error too, we may want to wrap them
|
||||
return lastInsertId, nil, err
|
||||
}
|
||||
|
||||
return lastInsertId, tx, nil
|
||||
return int64(storedRule.Id), nil
|
||||
}
|
||||
|
||||
// EditRuleTx stores a given rule string in database and returns
|
||||
// task name, sql tx and error (if any)
|
||||
func (r *ruleDB) EditRuleTx(ctx context.Context, rule string, id string) (string, Tx, error) {
|
||||
// EditRule stores a given rule string in database and returns task name and error (if any)
|
||||
func (r *ruleDB) EditRule(ctx context.Context, storedRule *StoredRule, cb func(context.Context) error) error {
|
||||
return r.sqlstore.RunInTxCtx(ctx, nil, func(ctx context.Context) error {
|
||||
_, err := r.sqlstore.
|
||||
BunDBCtx(ctx).
|
||||
NewUpdate().
|
||||
Model(storedRule).
|
||||
WherePK().
|
||||
Exec(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var groupName string
|
||||
idInt, _ := strconv.Atoi(id)
|
||||
if idInt == 0 {
|
||||
return groupName, nil, fmt.Errorf("failed to read alert id from parameters")
|
||||
}
|
||||
|
||||
var userEmail string
|
||||
if user := common.GetUserFromContext(ctx); user != nil {
|
||||
userEmail = user.Email
|
||||
}
|
||||
updatedAt := time.Now()
|
||||
groupName = prepareTaskName(int64(idInt))
|
||||
|
||||
// todo(amol): resolve this error - database locked when using
|
||||
// edit transaction with sqlx
|
||||
// tx, err := r.Begin()
|
||||
//if err != nil {
|
||||
// return groupName, tx, err
|
||||
//}
|
||||
stmt, err := r.Prepare(`UPDATE rules SET updated_by=$1, updated_at=$2, data=$3 WHERE id=$4;`)
|
||||
if err != nil {
|
||||
zap.L().Error("Error in preparing statement for UPDATE to rules", zap.Error(err))
|
||||
// tx.Rollback()
|
||||
return groupName, nil, err
|
||||
}
|
||||
defer stmt.Close()
|
||||
|
||||
if _, err := stmt.Exec(userEmail, updatedAt, rule, idInt); err != nil {
|
||||
zap.L().Error("Error in Executing prepared statement for UPDATE to rules", zap.Error(err))
|
||||
// tx.Rollback() // return an error too, we may want to wrap them
|
||||
return groupName, nil, err
|
||||
}
|
||||
return groupName, nil, nil
|
||||
return cb(ctx)
|
||||
})
|
||||
}
|
||||
|
||||
// DeleteRuleTx deletes a given rule with id and returns
|
||||
// taskname, sql tx and error (if any)
|
||||
func (r *ruleDB) DeleteRuleTx(ctx context.Context, id string) (string, Tx, error) {
|
||||
// DeleteRule deletes a given rule with id and returns taskname and error (if any)
|
||||
func (r *ruleDB) DeleteRule(ctx context.Context, id string, cb func(context.Context) error) error {
|
||||
if err := r.sqlstore.RunInTxCtx(ctx, nil, func(ctx context.Context) error {
|
||||
_, err := r.sqlstore.
|
||||
BunDBCtx(ctx).
|
||||
NewDelete().
|
||||
Model(&StoredRule{}).
|
||||
Where("id = ?", id).
|
||||
Exec(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
idInt, _ := strconv.Atoi(id)
|
||||
groupName := prepareTaskName(int64(idInt))
|
||||
|
||||
// commented as this causes db locked error
|
||||
// tx, err := r.Begin()
|
||||
// if err != nil {
|
||||
// return groupName, tx, err
|
||||
// }
|
||||
|
||||
stmt, err := r.Prepare(`DELETE FROM rules WHERE id=$1;`)
|
||||
|
||||
if err != nil {
|
||||
return groupName, nil, err
|
||||
return cb(ctx)
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer stmt.Close()
|
||||
|
||||
if _, err := stmt.Exec(idInt); err != nil {
|
||||
zap.L().Error("Error in Executing prepared statement for DELETE to rules", zap.Error(err))
|
||||
// tx.Rollback()
|
||||
return groupName, nil, err
|
||||
}
|
||||
|
||||
return groupName, nil, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *ruleDB) GetStoredRules(ctx context.Context) ([]StoredRule, error) {
|
||||
@@ -320,114 +255,7 @@ func (r *ruleDB) EditPlannedMaintenance(ctx context.Context, maintenance Planned
|
||||
return "", nil
|
||||
}
|
||||
|
||||
func getChannelType(receiver *am.Receiver) string {
|
||||
|
||||
if receiver.EmailConfigs != nil {
|
||||
return "email"
|
||||
}
|
||||
if receiver.OpsGenieConfigs != nil {
|
||||
return "opsgenie"
|
||||
}
|
||||
if receiver.PagerdutyConfigs != nil {
|
||||
return "pagerduty"
|
||||
}
|
||||
if receiver.PushoverConfigs != nil {
|
||||
return "pushover"
|
||||
}
|
||||
if receiver.SNSConfigs != nil {
|
||||
return "sns"
|
||||
}
|
||||
if receiver.SlackConfigs != nil {
|
||||
return "slack"
|
||||
}
|
||||
if receiver.VictorOpsConfigs != nil {
|
||||
return "victorops"
|
||||
}
|
||||
if receiver.WebhookConfigs != nil {
|
||||
return "webhook"
|
||||
}
|
||||
if receiver.WechatConfigs != nil {
|
||||
return "wechat"
|
||||
}
|
||||
if receiver.MSTeamsConfigs != nil {
|
||||
return "msteams"
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (r *ruleDB) GetChannel(id string) (*model.ChannelItem, *model.ApiError) {
|
||||
|
||||
idInt, _ := strconv.Atoi(id)
|
||||
channel := model.ChannelItem{}
|
||||
|
||||
query := "SELECT id, created_at, updated_at, name, type, data data FROM notification_channels WHERE id=?;"
|
||||
|
||||
stmt, err := r.Preparex(query)
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in preparing sql query for GetChannel", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
|
||||
}
|
||||
|
||||
err = stmt.Get(&channel, idInt)
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in getting channel with id", zap.Int("id", idInt), zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
|
||||
}
|
||||
|
||||
return &channel, nil
|
||||
}
|
||||
|
||||
func (r *ruleDB) DeleteChannel(id string) *model.ApiError {
|
||||
|
||||
idInt, _ := strconv.Atoi(id)
|
||||
|
||||
channelToDelete, apiErrorObj := r.GetChannel(id)
|
||||
|
||||
if apiErrorObj != nil {
|
||||
return apiErrorObj
|
||||
}
|
||||
|
||||
tx, err := r.Begin()
|
||||
if err != nil {
|
||||
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
|
||||
}
|
||||
|
||||
{
|
||||
stmt, err := tx.Prepare(`DELETE FROM notification_channels WHERE id=$1;`)
|
||||
if err != nil {
|
||||
zap.L().Error("Error in preparing statement for INSERT to notification_channels", zap.Error(err))
|
||||
tx.Rollback()
|
||||
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
|
||||
}
|
||||
defer stmt.Close()
|
||||
|
||||
if _, err := stmt.Exec(idInt); err != nil {
|
||||
zap.L().Error("Error in Executing prepared statement for INSERT to notification_channels", zap.Error(err))
|
||||
tx.Rollback() // return an error too, we may want to wrap them
|
||||
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
|
||||
}
|
||||
}
|
||||
|
||||
apiError := r.alertManager.DeleteRoute(channelToDelete.Name)
|
||||
if apiError != nil {
|
||||
tx.Rollback()
|
||||
return apiError
|
||||
}
|
||||
|
||||
err = tx.Commit()
|
||||
if err != nil {
|
||||
zap.L().Error("Error in committing transaction for DELETE command to notification_channels", zap.Error(err))
|
||||
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
}
|
||||
|
||||
func (r *ruleDB) GetChannels() (*[]model.ChannelItem, *model.ApiError) {
|
||||
|
||||
func (r *ruleDB) getChannels() (*[]model.ChannelItem, *model.ApiError) {
|
||||
channels := []model.ChannelItem{}
|
||||
|
||||
query := "SELECT id, created_at, updated_at, name, type, data data FROM notification_channels"
|
||||
@@ -442,105 +270,6 @@ func (r *ruleDB) GetChannels() (*[]model.ChannelItem, *model.ApiError) {
|
||||
}
|
||||
|
||||
return &channels, nil
|
||||
|
||||
}
|
||||
|
||||
func (r *ruleDB) EditChannel(receiver *am.Receiver, id string) (*am.Receiver, *model.ApiError) {
|
||||
|
||||
idInt, _ := strconv.Atoi(id)
|
||||
|
||||
channel, apiErrObj := r.GetChannel(id)
|
||||
|
||||
if apiErrObj != nil {
|
||||
return nil, apiErrObj
|
||||
}
|
||||
if channel.Name != receiver.Name {
|
||||
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("channel name cannot be changed")}
|
||||
}
|
||||
|
||||
tx, err := r.Begin()
|
||||
if err != nil {
|
||||
return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
|
||||
}
|
||||
|
||||
channel_type := getChannelType(receiver)
|
||||
|
||||
receiverString, _ := json.Marshal(receiver)
|
||||
|
||||
{
|
||||
stmt, err := tx.Prepare(`UPDATE notification_channels SET updated_at=$1, type=$2, data=$3 WHERE id=$4;`)
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in preparing statement for UPDATE to notification_channels", zap.Error(err))
|
||||
tx.Rollback()
|
||||
return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
|
||||
}
|
||||
defer stmt.Close()
|
||||
|
||||
if _, err := stmt.Exec(time.Now(), channel_type, string(receiverString), idInt); err != nil {
|
||||
zap.L().Error("Error in Executing prepared statement for UPDATE to notification_channels", zap.Error(err))
|
||||
tx.Rollback() // return an error too, we may want to wrap them
|
||||
return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
|
||||
}
|
||||
}
|
||||
|
||||
apiError := r.alertManager.EditRoute(receiver)
|
||||
if apiError != nil {
|
||||
tx.Rollback()
|
||||
return nil, apiError
|
||||
}
|
||||
|
||||
err = tx.Commit()
|
||||
if err != nil {
|
||||
zap.L().Error("Error in committing transaction for INSERT to notification_channels", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
|
||||
}
|
||||
|
||||
return receiver, nil
|
||||
|
||||
}
|
||||
|
||||
func (r *ruleDB) CreateChannel(receiver *am.Receiver) (*am.Receiver, *model.ApiError) {
|
||||
|
||||
channel_type := getChannelType(receiver)
|
||||
|
||||
receiverString, _ := json.Marshal(receiver)
|
||||
|
||||
tx, err := r.Begin()
|
||||
if err != nil {
|
||||
return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
|
||||
}
|
||||
|
||||
{
|
||||
stmt, err := tx.Prepare(`INSERT INTO notification_channels (created_at, updated_at, name, type, data) VALUES($1,$2,$3,$4,$5);`)
|
||||
if err != nil {
|
||||
zap.L().Error("Error in preparing statement for INSERT to notification_channels", zap.Error(err))
|
||||
tx.Rollback()
|
||||
return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
|
||||
}
|
||||
defer stmt.Close()
|
||||
|
||||
if _, err := stmt.Exec(time.Now(), time.Now(), receiver.Name, channel_type, string(receiverString)); err != nil {
|
||||
zap.L().Error("Error in Executing prepared statement for INSERT to notification_channels", zap.Error(err))
|
||||
tx.Rollback() // return an error too, we may want to wrap them
|
||||
return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
|
||||
}
|
||||
}
|
||||
|
||||
apiError := r.alertManager.AddRoute(receiver)
|
||||
if apiError != nil {
|
||||
tx.Rollback()
|
||||
return nil, apiError
|
||||
}
|
||||
|
||||
err = tx.Commit()
|
||||
if err != nil {
|
||||
zap.L().Error("Error in committing transaction for INSERT to notification_channels", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
|
||||
}
|
||||
|
||||
return receiver, nil
|
||||
|
||||
}
|
||||
|
||||
func (r *ruleDB) GetAlertsInfo(ctx context.Context) (*model.AlertsInfo, error) {
|
||||
@@ -629,7 +358,7 @@ func (r *ruleDB) GetAlertsInfo(ctx context.Context) (*model.AlertsInfo, error) {
|
||||
}
|
||||
alertsInfo.AlertNames = alertNames
|
||||
|
||||
channels, _ := r.GetChannels()
|
||||
channels, _ := r.getChannels()
|
||||
if channels != nil {
|
||||
alertsInfo.TotalChannels = len(*channels)
|
||||
for _, channel := range *channels {
|
||||
|
||||
@@ -14,41 +14,45 @@ import (
|
||||
|
||||
"errors"
|
||||
|
||||
"github.com/go-openapi/strfmt"
|
||||
"github.com/jmoiron/sqlx"
|
||||
|
||||
"go.signoz.io/signoz/pkg/alertmanager"
|
||||
"go.signoz.io/signoz/pkg/query-service/cache"
|
||||
am "go.signoz.io/signoz/pkg/query-service/integrations/alertManager"
|
||||
"go.signoz.io/signoz/pkg/query-service/interfaces"
|
||||
"go.signoz.io/signoz/pkg/query-service/model"
|
||||
pqle "go.signoz.io/signoz/pkg/query-service/pqlEngine"
|
||||
"go.signoz.io/signoz/pkg/query-service/telemetry"
|
||||
"go.signoz.io/signoz/pkg/sqlstore"
|
||||
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
|
||||
"go.signoz.io/signoz/pkg/types/authtypes"
|
||||
)
|
||||
|
||||
type PrepareTaskOptions struct {
|
||||
Rule *PostableRule
|
||||
TaskName string
|
||||
RuleDB RuleDB
|
||||
Logger *zap.Logger
|
||||
Reader interfaces.Reader
|
||||
Cache cache.Cache
|
||||
FF interfaces.FeatureLookup
|
||||
ManagerOpts *ManagerOptions
|
||||
NotifyFunc NotifyFunc
|
||||
|
||||
Rule *PostableRule
|
||||
TaskName string
|
||||
RuleDB RuleDB
|
||||
Logger *zap.Logger
|
||||
Reader interfaces.Reader
|
||||
Cache cache.Cache
|
||||
FF interfaces.FeatureLookup
|
||||
ManagerOpts *ManagerOptions
|
||||
NotifyFunc NotifyFunc
|
||||
SQLStore sqlstore.SQLStore
|
||||
UseLogsNewSchema bool
|
||||
UseTraceNewSchema bool
|
||||
}
|
||||
|
||||
type PrepareTestRuleOptions struct {
|
||||
Rule *PostableRule
|
||||
RuleDB RuleDB
|
||||
Logger *zap.Logger
|
||||
Reader interfaces.Reader
|
||||
Cache cache.Cache
|
||||
FF interfaces.FeatureLookup
|
||||
ManagerOpts *ManagerOptions
|
||||
NotifyFunc NotifyFunc
|
||||
|
||||
Rule *PostableRule
|
||||
RuleDB RuleDB
|
||||
Logger *zap.Logger
|
||||
Reader interfaces.Reader
|
||||
Cache cache.Cache
|
||||
FF interfaces.FeatureLookup
|
||||
ManagerOpts *ManagerOptions
|
||||
NotifyFunc NotifyFunc
|
||||
SQLStore sqlstore.SQLStore
|
||||
UseLogsNewSchema bool
|
||||
UseTraceNewSchema bool
|
||||
}
|
||||
@@ -72,8 +76,7 @@ func prepareTaskName(ruleId interface{}) string {
|
||||
|
||||
// ManagerOptions bundles options for the Manager.
|
||||
type ManagerOptions struct {
|
||||
NotifierOpts am.NotifierOptions
|
||||
PqlEngine *pqle.PqlEngine
|
||||
PqlEngine *pqle.PqlEngine
|
||||
|
||||
// RepoURL is used to generate a backlink in sent alert messages
|
||||
RepoURL string
|
||||
@@ -96,6 +99,8 @@ type ManagerOptions struct {
|
||||
UseLogsNewSchema bool
|
||||
UseTraceNewSchema bool
|
||||
PrepareTestRuleFunc func(opts PrepareTestRuleOptions) (int, *model.ApiError)
|
||||
Alertmanager alertmanager.Alertmanager
|
||||
SQLStore sqlstore.SQLStore
|
||||
}
|
||||
|
||||
// The Manager manages recording and alerting rules.
|
||||
@@ -105,9 +110,6 @@ type Manager struct {
|
||||
rules map[string]Rule
|
||||
mtx sync.RWMutex
|
||||
block chan struct{}
|
||||
// Notifier sends messages through alert manager
|
||||
notifier *am.Notifier
|
||||
|
||||
// datastore to store alert definitions
|
||||
ruleDB RuleDB
|
||||
|
||||
@@ -121,15 +123,12 @@ type Manager struct {
|
||||
|
||||
UseLogsNewSchema bool
|
||||
UseTraceNewSchema bool
|
||||
|
||||
alertmanager alertmanager.Alertmanager
|
||||
sqlstore sqlstore.SQLStore
|
||||
}
|
||||
|
||||
func defaultOptions(o *ManagerOptions) *ManagerOptions {
|
||||
if o.NotifierOpts.QueueCapacity == 0 {
|
||||
o.NotifierOpts.QueueCapacity = 10000
|
||||
}
|
||||
if o.NotifierOpts.Timeout == 0 {
|
||||
o.NotifierOpts.Timeout = 10 * time.Second
|
||||
}
|
||||
if o.ResendDelay == time.Duration(0) {
|
||||
o.ResendDelay = 1 * time.Minute
|
||||
}
|
||||
@@ -161,6 +160,7 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
|
||||
opts.UseLogsNewSchema,
|
||||
opts.UseTraceNewSchema,
|
||||
WithEvalDelay(opts.ManagerOpts.EvalDelay),
|
||||
WithSQLStore(opts.SQLStore),
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
@@ -181,6 +181,7 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
|
||||
opts.Logger,
|
||||
opts.Reader,
|
||||
opts.ManagerOpts.PqlEngine,
|
||||
WithSQLStore(opts.SQLStore),
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
@@ -202,30 +203,12 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
|
||||
// NewManager returns an implementation of Manager, ready to be started
|
||||
// by calling the Run method.
|
||||
func NewManager(o *ManagerOptions) (*Manager, error) {
|
||||
|
||||
o = defaultOptions(o)
|
||||
// here we just initiate notifier, it will be started
|
||||
// in run()
|
||||
notifier, err := am.NewNotifier(&o.NotifierOpts, nil)
|
||||
if err != nil {
|
||||
// todo(amol): rethink on this, the query service
|
||||
// should not be down because alert manager is not available
|
||||
return nil, err
|
||||
}
|
||||
|
||||
amManager, err := am.New()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
db := NewRuleDB(o.DBConn, amManager)
|
||||
|
||||
db := NewRuleDB(o.DBConn, o.SQLStore)
|
||||
telemetry.GetInstance().SetAlertsInfoCallback(db.GetAlertsInfo)
|
||||
|
||||
m := &Manager{
|
||||
tasks: map[string]Task{},
|
||||
rules: map[string]Rule{},
|
||||
notifier: notifier,
|
||||
ruleDB: db,
|
||||
opts: o,
|
||||
block: make(chan struct{}),
|
||||
@@ -235,7 +218,10 @@ func NewManager(o *ManagerOptions) (*Manager, error) {
|
||||
cache: o.Cache,
|
||||
prepareTaskFunc: o.PrepareTaskFunc,
|
||||
prepareTestRuleFunc: o.PrepareTestRuleFunc,
|
||||
alertmanager: o.Alertmanager,
|
||||
sqlstore: o.SQLStore,
|
||||
}
|
||||
|
||||
return m, nil
|
||||
}
|
||||
|
||||
@@ -309,9 +295,6 @@ func (m *Manager) initiate() error {
|
||||
|
||||
// Run starts processing of the rule manager.
|
||||
func (m *Manager) run() {
|
||||
// initiate notifier
|
||||
go m.notifier.Run()
|
||||
|
||||
// initiate blocked tasks
|
||||
close(m.block)
|
||||
}
|
||||
@@ -333,26 +316,51 @@ func (m *Manager) Stop() {
|
||||
// EditRuleDefinition writes the rule definition to the
|
||||
// datastore and also updates the rule executor
|
||||
func (m *Manager) EditRule(ctx context.Context, ruleStr string, id string) error {
|
||||
claims, ok := authtypes.ClaimsFromContext(ctx)
|
||||
if !ok {
|
||||
return errors.New("claims not found in context")
|
||||
}
|
||||
|
||||
parsedRule, err := ParsePostableRule([]byte(ruleStr))
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
taskName, _, err := m.ruleDB.EditRuleTx(ctx, ruleStr, id)
|
||||
existingRule, err := m.ruleDB.GetStoredRule(ctx, id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !m.opts.DisableRules {
|
||||
err = m.syncRuleStateWithTask(taskName, parsedRule)
|
||||
now := time.Now()
|
||||
existingRule.UpdatedAt = &now
|
||||
existingRule.UpdatedBy = &claims.Email
|
||||
existingRule.Data = ruleStr
|
||||
|
||||
return m.ruleDB.EditRule(ctx, existingRule, func(ctx context.Context) error {
|
||||
cfg, err := m.alertmanager.GetConfig(ctx, claims.OrgID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
err = cfg.UpdateRuleIDMatcher(id, parsedRule.PreferredChannels)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = m.alertmanager.SetConfig(ctx, cfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !m.opts.DisableRules {
|
||||
err = m.syncRuleStateWithTask(prepareTaskName(existingRule.Id), parsedRule)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (m *Manager) editTask(rule *PostableRule, taskName string) error {
|
||||
@@ -371,6 +379,7 @@ func (m *Manager) editTask(rule *PostableRule, taskName string) error {
|
||||
FF: m.featureFlags,
|
||||
ManagerOpts: m.opts,
|
||||
NotifyFunc: m.prepareNotifyFunc(),
|
||||
SQLStore: m.sqlstore,
|
||||
|
||||
UseLogsNewSchema: m.opts.UseLogsNewSchema,
|
||||
UseTraceNewSchema: m.opts.UseTraceNewSchema,
|
||||
@@ -411,24 +420,45 @@ func (m *Manager) editTask(rule *PostableRule, taskName string) error {
|
||||
}
|
||||
|
||||
func (m *Manager) DeleteRule(ctx context.Context, id string) error {
|
||||
|
||||
idInt, err := strconv.Atoi(id)
|
||||
_, err := strconv.Atoi(id)
|
||||
if err != nil {
|
||||
zap.L().Error("delete rule received an rule id in invalid format, must be a number", zap.String("id", id), zap.Error(err))
|
||||
return fmt.Errorf("delete rule received an rule id in invalid format, must be a number")
|
||||
}
|
||||
|
||||
taskName := prepareTaskName(int64(idInt))
|
||||
if !m.opts.DisableRules {
|
||||
m.deleteTask(taskName)
|
||||
claims, ok := authtypes.ClaimsFromContext(ctx)
|
||||
if !ok {
|
||||
return errors.New("claims not found in context")
|
||||
}
|
||||
|
||||
if _, _, err := m.ruleDB.DeleteRuleTx(ctx, id); err != nil {
|
||||
zap.L().Error("failed to delete the rule from rule db", zap.String("id", id), zap.Error(err))
|
||||
_, err = m.ruleDB.GetStoredRule(ctx, id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
return m.ruleDB.DeleteRule(ctx, id, func(ctx context.Context) error {
|
||||
cfg, err := m.alertmanager.GetConfig(ctx, claims.OrgID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = cfg.DeleteRuleIDMatcher(id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = m.alertmanager.SetConfig(ctx, cfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
taskName := prepareTaskName(id)
|
||||
if !m.opts.DisableRules {
|
||||
m.deleteTask(taskName)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (m *Manager) deleteTask(taskName string) {
|
||||
@@ -451,32 +481,57 @@ func (m *Manager) deleteTask(taskName string) {
|
||||
// starts an executor for the rule
|
||||
func (m *Manager) CreateRule(ctx context.Context, ruleStr string) (*GettableRule, error) {
|
||||
parsedRule, err := ParsePostableRule([]byte(ruleStr))
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
lastInsertId, tx, err := m.ruleDB.CreateRuleTx(ctx, ruleStr)
|
||||
taskName := prepareTaskName(lastInsertId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
claims, ok := authtypes.ClaimsFromContext(ctx)
|
||||
if !ok {
|
||||
return nil, errors.New("claims not found in context")
|
||||
}
|
||||
if !m.opts.DisableRules {
|
||||
if err := m.addTask(parsedRule, taskName); err != nil {
|
||||
tx.Rollback()
|
||||
return nil, err
|
||||
|
||||
now := time.Now()
|
||||
storedRule := &StoredRule{
|
||||
CreatedAt: &now,
|
||||
CreatedBy: &claims.Email,
|
||||
UpdatedAt: &now,
|
||||
UpdatedBy: &claims.Email,
|
||||
Data: ruleStr,
|
||||
}
|
||||
|
||||
id, err := m.ruleDB.CreateRule(ctx, storedRule, func(ctx context.Context, id int64) error {
|
||||
cfg, err := m.alertmanager.GetConfig(ctx, claims.OrgID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
err = tx.Commit()
|
||||
|
||||
err = cfg.CreateRuleIDMatcher(fmt.Sprintf("%d", id), parsedRule.PreferredChannels)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = m.alertmanager.SetConfig(ctx, cfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
taskName := prepareTaskName(id)
|
||||
if !m.opts.DisableRules {
|
||||
if err := m.addTask(parsedRule, taskName); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
gettableRule := &GettableRule{
|
||||
Id: fmt.Sprintf("%d", lastInsertId),
|
||||
return &GettableRule{
|
||||
Id: fmt.Sprintf("%d", id),
|
||||
PostableRule: *parsedRule,
|
||||
}
|
||||
return gettableRule, nil
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (m *Manager) addTask(rule *PostableRule, taskName string) error {
|
||||
@@ -494,6 +549,7 @@ func (m *Manager) addTask(rule *PostableRule, taskName string) error {
|
||||
FF: m.featureFlags,
|
||||
ManagerOpts: m.opts,
|
||||
NotifyFunc: m.prepareNotifyFunc(),
|
||||
SQLStore: m.sqlstore,
|
||||
|
||||
UseLogsNewSchema: m.opts.UseLogsNewSchema,
|
||||
UseTraceNewSchema: m.opts.UseTraceNewSchema,
|
||||
@@ -594,12 +650,12 @@ func (m *Manager) TriggeredAlerts() []*NamedAlert {
|
||||
}
|
||||
|
||||
// NotifyFunc sends notifications about a set of alerts generated by the given expression.
|
||||
type NotifyFunc func(ctx context.Context, expr string, alerts ...*Alert)
|
||||
type NotifyFunc func(ctx context.Context, orgID string, expr string, alerts ...*Alert)
|
||||
|
||||
// prepareNotifyFunc implements the NotifyFunc for a Notifier.
|
||||
func (m *Manager) prepareNotifyFunc() NotifyFunc {
|
||||
return func(ctx context.Context, expr string, alerts ...*Alert) {
|
||||
var res []*am.Alert
|
||||
return func(ctx context.Context, orgID string, expr string, alerts ...*Alert) {
|
||||
var res []*alertmanagertypes.PostableAlert
|
||||
|
||||
for _, alert := range alerts {
|
||||
generatorURL := alert.GeneratorURL
|
||||
@@ -607,23 +663,51 @@ func (m *Manager) prepareNotifyFunc() NotifyFunc {
|
||||
generatorURL = m.opts.RepoURL
|
||||
}
|
||||
|
||||
a := &am.Alert{
|
||||
StartsAt: alert.FiredAt,
|
||||
Labels: alert.Labels,
|
||||
Annotations: alert.Annotations,
|
||||
GeneratorURL: generatorURL,
|
||||
Receivers: alert.Receivers,
|
||||
a := &alertmanagertypes.PostableAlert{
|
||||
Annotations: alert.Annotations.Map(),
|
||||
StartsAt: strfmt.DateTime(alert.FiredAt),
|
||||
Alert: alertmanagertypes.AlertModel{
|
||||
Labels: alert.Labels.Map(),
|
||||
GeneratorURL: strfmt.URI(generatorURL),
|
||||
},
|
||||
}
|
||||
if !alert.ResolvedAt.IsZero() {
|
||||
a.EndsAt = alert.ResolvedAt
|
||||
a.EndsAt = strfmt.DateTime(alert.ResolvedAt)
|
||||
} else {
|
||||
a.EndsAt = alert.ValidUntil
|
||||
a.EndsAt = strfmt.DateTime(alert.ValidUntil)
|
||||
}
|
||||
|
||||
res = append(res, a)
|
||||
}
|
||||
|
||||
if len(alerts) > 0 {
|
||||
m.notifier.Send(res...)
|
||||
m.alertmanager.PutAlerts(ctx, orgID, res)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Manager) prepareTestNotifyFunc() NotifyFunc {
|
||||
return func(ctx context.Context, orgID string, expr string, alerts ...*Alert) {
|
||||
for _, alert := range alerts {
|
||||
generatorURL := alert.GeneratorURL
|
||||
if generatorURL == "" {
|
||||
generatorURL = m.opts.RepoURL
|
||||
}
|
||||
|
||||
a := &alertmanagertypes.PostableAlert{
|
||||
Annotations: alert.Annotations.Map(),
|
||||
StartsAt: strfmt.DateTime(alert.FiredAt),
|
||||
Alert: alertmanagertypes.AlertModel{
|
||||
Labels: alert.Labels.Map(),
|
||||
GeneratorURL: strfmt.URI(generatorURL),
|
||||
},
|
||||
}
|
||||
if !alert.ResolvedAt.IsZero() {
|
||||
a.EndsAt = strfmt.DateTime(alert.ResolvedAt)
|
||||
} else {
|
||||
a.EndsAt = strfmt.DateTime(alert.ValidUntil)
|
||||
}
|
||||
m.alertmanager.TestAlert(ctx, orgID, a, alert.Receivers)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -736,6 +820,10 @@ func (m *Manager) syncRuleStateWithTask(taskName string, rule *PostableRule) err
|
||||
// - re-deploy or undeploy task as necessary
|
||||
// - update the patched rule in the DB
|
||||
func (m *Manager) PatchRule(ctx context.Context, ruleStr string, ruleId string) (*GettableRule, error) {
|
||||
claims, ok := authtypes.ClaimsFromContext(ctx)
|
||||
if !ok {
|
||||
return nil, errors.New("claims not found in context")
|
||||
}
|
||||
|
||||
if ruleId == "" {
|
||||
return nil, fmt.Errorf("id is mandatory for patching rule")
|
||||
@@ -775,15 +863,19 @@ func (m *Manager) PatchRule(ctx context.Context, ruleStr string, ruleId string)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// write updated rule to db
|
||||
if _, _, err = m.ruleDB.EditRuleTx(ctx, string(patchedRuleBytes), ruleId); err != nil {
|
||||
// write failed, rollback task state
|
||||
now := time.Now()
|
||||
storedJSON.Data = string(patchedRuleBytes)
|
||||
storedJSON.UpdatedBy = &claims.Email
|
||||
storedJSON.UpdatedAt = &now
|
||||
|
||||
// restore task state from the stored rule
|
||||
err = m.ruleDB.EditRule(ctx, storedJSON, func(ctx context.Context) error {
|
||||
if err := m.syncRuleStateWithTask(taskName, &storedRule); err != nil {
|
||||
zap.L().Error("failed to restore rule after patch failure", zap.String("taskName", taskName), zap.Error(err))
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -822,7 +914,8 @@ func (m *Manager) TestNotification(ctx context.Context, ruleStr string) (int, *m
|
||||
Cache: m.cache,
|
||||
FF: m.featureFlags,
|
||||
ManagerOpts: m.opts,
|
||||
NotifyFunc: m.prepareNotifyFunc(),
|
||||
NotifyFunc: m.prepareTestNotifyFunc(),
|
||||
SQLStore: m.sqlstore,
|
||||
UseLogsNewSchema: m.opts.UseLogsNewSchema,
|
||||
UseTraceNewSchema: m.opts.UseTraceNewSchema,
|
||||
})
|
||||
|
||||
@@ -52,6 +52,7 @@ func defaultTestNotification(opts PrepareTestRuleOptions) (int, *model.ApiError)
|
||||
opts.UseTraceNewSchema,
|
||||
WithSendAlways(),
|
||||
WithSendUnmatched(),
|
||||
WithSQLStore(opts.SQLStore),
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
@@ -70,6 +71,7 @@ func defaultTestNotification(opts PrepareTestRuleOptions) (int, *model.ApiError)
|
||||
opts.ManagerOpts.PqlEngine,
|
||||
WithSendAlways(),
|
||||
WithSendUnmatched(),
|
||||
WithSQLStore(opts.SQLStore),
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
"go.signoz.io/signoz/pkg/alertmanager"
|
||||
"go.signoz.io/signoz/pkg/apiserver"
|
||||
"go.signoz.io/signoz/pkg/cache"
|
||||
"go.signoz.io/signoz/pkg/config"
|
||||
@@ -44,6 +45,9 @@ type Config struct {
|
||||
|
||||
// TelemetryStore config
|
||||
TelemetryStore telemetrystore.Config `mapstructure:"telemetrystore"`
|
||||
|
||||
// Alertmanager config
|
||||
Alertmanager alertmanager.Config `mapstructure:"alertmanager"`
|
||||
}
|
||||
|
||||
// DeprecatedFlags are the flags that are deprecated and scheduled for removal.
|
||||
@@ -63,6 +67,7 @@ func NewConfig(ctx context.Context, resolverConfig config.ResolverConfig, deprec
|
||||
sqlmigrator.NewConfigFactory(),
|
||||
apiserver.NewConfigFactory(),
|
||||
telemetrystore.NewConfigFactory(),
|
||||
alertmanager.NewConfigFactory(),
|
||||
}
|
||||
|
||||
conf, err := config.New(ctx, resolverConfig, configFactories)
|
||||
@@ -138,17 +143,26 @@ func mergeAndEnsureBackwardCompatibility(config *Config, deprecatedFlags Depreca
|
||||
}
|
||||
|
||||
if deprecatedFlags.MaxIdleConns != 50 {
|
||||
fmt.Println("[Deprecated] flag --max-idle-conns is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_MAX__IDLE__CONNS env variable instead.")
|
||||
fmt.Println("[Deprecated] flag --max-idle-conns is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_MAX__IDLE__CONNS instead.")
|
||||
config.TelemetryStore.Connection.MaxIdleConns = deprecatedFlags.MaxIdleConns
|
||||
}
|
||||
|
||||
if deprecatedFlags.MaxOpenConns != 100 {
|
||||
fmt.Println("[Deprecated] flag --max-open-conns is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_MAX__OPEN__CONNS env variable instead.")
|
||||
fmt.Println("[Deprecated] flag --max-open-conns is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_MAX__OPEN__CONNS instead.")
|
||||
config.TelemetryStore.Connection.MaxOpenConns = deprecatedFlags.MaxOpenConns
|
||||
}
|
||||
|
||||
if deprecatedFlags.DialTimeout != 5*time.Second {
|
||||
fmt.Println("[Deprecated] flag --dial-timeout is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_DIAL__TIMEOUT environment variable instead.")
|
||||
fmt.Println("[Deprecated] flag --dial-timeout is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_DIAL__TIMEOUT instead.")
|
||||
config.TelemetryStore.Connection.DialTimeout = deprecatedFlags.DialTimeout
|
||||
}
|
||||
|
||||
if os.Getenv("ALERTMANAGER_API_PREFIX") != "" {
|
||||
fmt.Println("[Deprecated] env ALERTMANAGER_API_PREFIX is deprecated and scheduled for removal. Please use SIGNOZ_ALERTMANAGER_LEGACY_API__URL instead.")
|
||||
config.Alertmanager.Legacy.ApiURL = os.Getenv("ALERTMANAGER_API_PREFIX")
|
||||
}
|
||||
|
||||
if os.Getenv("ALERTMANAGER_API_CHANNEL_PATH") != "" {
|
||||
fmt.Println("[Deprecated] env ALERTMANAGER_API_CHANNEL_PATH is deprecated and scheduled for complete removal.")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
package signoz
|
||||
|
||||
import (
|
||||
"go.signoz.io/signoz/pkg/alertmanager"
|
||||
"go.signoz.io/signoz/pkg/alertmanager/legacyalertmanager"
|
||||
"go.signoz.io/signoz/pkg/alertmanager/signozalertmanager"
|
||||
"go.signoz.io/signoz/pkg/cache"
|
||||
"go.signoz.io/signoz/pkg/cache/memorycache"
|
||||
"go.signoz.io/signoz/pkg/cache/rediscache"
|
||||
@@ -33,6 +36,9 @@ type ProviderConfig struct {
|
||||
|
||||
// Map of all telemetrystore provider factories
|
||||
TelemetryStoreProviderFactories factory.NamedMap[factory.ProviderFactory[telemetrystore.TelemetryStore, telemetrystore.Config]]
|
||||
|
||||
// Map of all alertmanager provider factories
|
||||
AlertmanagerProviderFactories factory.NamedMap[factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config]]
|
||||
}
|
||||
|
||||
func NewProviderConfig() ProviderConfig {
|
||||
@@ -62,9 +68,17 @@ func NewProviderConfig() ProviderConfig {
|
||||
sqlmigration.NewAddPatsFactory(),
|
||||
sqlmigration.NewModifyDatetimeFactory(),
|
||||
sqlmigration.NewModifyOrgDomainFactory(),
|
||||
sqlmigration.NewAddAlertmanagerFactory(),
|
||||
),
|
||||
TelemetryStoreProviderFactories: factory.MustNewNamedMap(
|
||||
clickhousetelemetrystore.NewFactory(telemetrystorehook.NewFactory()),
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
func NewAlertmanagerProviderFactories(sqlstore sqlstore.SQLStore) factory.NamedMap[factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config]] {
|
||||
return factory.MustNewNamedMap(
|
||||
legacyalertmanager.NewFactory(sqlstore),
|
||||
signozalertmanager.NewFactory(sqlstore),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -14,3 +14,9 @@ func TestNewProviderConfig(t *testing.T) {
|
||||
NewProviderConfig()
|
||||
})
|
||||
}
|
||||
|
||||
func TestNewAlertmanagerProviderFactories(t *testing.T) {
|
||||
assert.NotPanics(t, func() {
|
||||
NewAlertmanagerProviderFactories(nil)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package signoz
|
||||
import (
|
||||
"context"
|
||||
|
||||
"go.signoz.io/signoz/pkg/alertmanager"
|
||||
"go.signoz.io/signoz/pkg/cache"
|
||||
"go.signoz.io/signoz/pkg/factory"
|
||||
"go.signoz.io/signoz/pkg/instrumentation"
|
||||
@@ -16,10 +17,12 @@ import (
|
||||
)
|
||||
|
||||
type SigNoz struct {
|
||||
*factory.Registry
|
||||
Cache cache.Cache
|
||||
Web web.Web
|
||||
SQLStore sqlstore.SQLStore
|
||||
TelemetryStore telemetrystore.TelemetryStore
|
||||
Alertmanager alertmanager.Alertmanager
|
||||
}
|
||||
|
||||
func New(
|
||||
@@ -102,10 +105,32 @@ func New(
|
||||
return nil, err
|
||||
}
|
||||
|
||||
alertmanager, err := factory.NewProviderFromNamedMap(
|
||||
ctx,
|
||||
providerSettings,
|
||||
config.Alertmanager,
|
||||
NewAlertmanagerProviderFactories(sqlstore),
|
||||
config.Alertmanager.Provider,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
registry, err := factory.NewRegistry(
|
||||
instrumentation.Logger(),
|
||||
factory.NewNamedService(factory.MustNewName("instrumentation"), instrumentation),
|
||||
factory.NewNamedService(factory.MustNewName("alertmanager"), alertmanager),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &SigNoz{
|
||||
Registry: registry,
|
||||
Cache: cache,
|
||||
Web: web,
|
||||
SQLStore: sqlstore,
|
||||
TelemetryStore: telemetrystore,
|
||||
Alertmanager: alertmanager,
|
||||
}, nil
|
||||
}
|
||||
|
||||
206
pkg/sqlmigration/013_add_alertmanager.go
Normal file
206
pkg/sqlmigration/013_add_alertmanager.go
Normal file
@@ -0,0 +1,206 @@
|
||||
package sqlmigration
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/tidwall/gjson"
|
||||
"github.com/uptrace/bun"
|
||||
"github.com/uptrace/bun/migrate"
|
||||
"go.signoz.io/signoz/pkg/alertmanager/alertmanagerserver"
|
||||
"go.signoz.io/signoz/pkg/factory"
|
||||
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
|
||||
)
|
||||
|
||||
// addAlertmanager is the SQL migration that adds org_id to
// notification_channels and creates the alertmanager_config and
// alertmanager_state tables. It carries no state of its own.
type addAlertmanager struct{}
|
||||
|
||||
func NewAddAlertmanagerFactory() factory.ProviderFactory[SQLMigration, Config] {
|
||||
return factory.NewProviderFactory(factory.MustNewName("add_alertmanager"), newAddAlertmanager)
|
||||
}
|
||||
|
||||
func newAddAlertmanager(_ context.Context, _ factory.ProviderSettings, _ Config) (SQLMigration, error) {
|
||||
return &addAlertmanager{}, nil
|
||||
}
|
||||
|
||||
func (migration *addAlertmanager) Register(migrations *migrate.Migrations) error {
|
||||
if err := migrations.Register(migration.Up, migration.Down); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (migration *addAlertmanager) Up(ctx context.Context, db *bun.DB) error {
|
||||
tx, err := db.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer tx.Rollback() //nolint:errcheck
|
||||
|
||||
if _, err := tx.
|
||||
NewDropColumn().
|
||||
Table("notification_channels").
|
||||
ColumnExpr("deleted").
|
||||
Exec(ctx); err != nil {
|
||||
if !strings.Contains(err.Error(), "no such column") {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if _, err := tx.
|
||||
NewAddColumn().
|
||||
Table("notification_channels").
|
||||
Apply(WrapIfNotExists(ctx, db, "notification_channels", "org_id")).
|
||||
Exec(ctx); err != nil && err != ErrNoExecute {
|
||||
return err
|
||||
}
|
||||
|
||||
var orgID string
|
||||
|
||||
err = tx.
|
||||
NewSelect().
|
||||
ColumnExpr("id").
|
||||
Table("organizations").
|
||||
Limit(1).
|
||||
Scan(ctx, &orgID)
|
||||
if err != nil {
|
||||
if err != sql.ErrNoRows {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err == nil {
|
||||
err = migration.populateOrgID(ctx, tx, orgID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if _, err := tx.
|
||||
NewCreateTable().
|
||||
Model(&struct {
|
||||
bun.BaseModel `bun:"table:alertmanager_config"`
|
||||
ID uint64 `bun:"id,pk,autoincrement"`
|
||||
Config string `bun:"config,notnull,type:text"`
|
||||
Hash string `bun:"hash,notnull,type:text"`
|
||||
CreatedAt time.Time `bun:"created_at,notnull"`
|
||||
UpdatedAt time.Time `bun:"updated_at,notnull"`
|
||||
OrgID string `bun:"org_id,notnull,unique"`
|
||||
}{}).
|
||||
ForeignKey(`("org_id") REFERENCES "organizations" ("id")`).
|
||||
IfNotExists().
|
||||
Exec(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := tx.
|
||||
NewCreateTable().
|
||||
Model(&struct {
|
||||
bun.BaseModel `bun:"table:alertmanager_state"`
|
||||
ID uint64 `bun:"id,pk,autoincrement"`
|
||||
Silences string `bun:"silences,nullzero,type:text"`
|
||||
NFLog string `bun:"nflog,nullzero,type:text"`
|
||||
CreatedAt time.Time `bun:"created_at,notnull"`
|
||||
UpdatedAt time.Time `bun:"updated_at,notnull"`
|
||||
OrgID string `bun:"org_id,notnull,unique"`
|
||||
}{}).
|
||||
ForeignKey(`("org_id") REFERENCES "organizations" ("id")`).
|
||||
IfNotExists().
|
||||
Exec(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := migration.populateAlertmanagerConfig(ctx, tx, orgID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := tx.Commit(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (migration *addAlertmanager) populateOrgID(ctx context.Context, tx bun.Tx, orgID string) error {
|
||||
if _, err := tx.
|
||||
NewUpdate().
|
||||
Table("notification_channels").
|
||||
Set("org_id = ?", orgID).
|
||||
Where("org_id IS NULL").
|
||||
Exec(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (migration *addAlertmanager) populateAlertmanagerConfig(ctx context.Context, tx bun.Tx, orgID string) error {
|
||||
var channels []*alertmanagertypes.Channel
|
||||
|
||||
err := tx.
|
||||
NewSelect().
|
||||
Model(&channels).
|
||||
Where("org_id = ?", orgID).
|
||||
Scan(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
type matcher struct {
|
||||
bun.BaseModel `bun:"table:rules"`
|
||||
ID int `bun:"id,pk"`
|
||||
Data string `bun:"data"`
|
||||
}
|
||||
|
||||
matchers := []matcher{}
|
||||
|
||||
err = tx.
|
||||
NewSelect().
|
||||
Column("id", "data").
|
||||
Model(&matchers).
|
||||
Scan(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
matchersMap := make(map[string][]string)
|
||||
for _, matcher := range matchers {
|
||||
receivers := gjson.Get(matcher.Data, "preferredChannels").Array()
|
||||
for _, receiver := range receivers {
|
||||
matchersMap[strconv.Itoa(matcher.ID)] = append(matchersMap[strconv.Itoa(matcher.ID)], receiver.String())
|
||||
}
|
||||
}
|
||||
|
||||
config, err := alertmanagertypes.NewConfigFromChannels(alertmanagerserver.NewConfig().Global, alertmanagerserver.NewConfig().Route, channels, orgID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for ruleID, receivers := range matchersMap {
|
||||
err = config.CreateRuleIDMatcher(ruleID, receivers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if _, err := tx.
|
||||
NewInsert().
|
||||
Model(config.StoreableConfig()).
|
||||
On("CONFLICT (org_id) DO UPDATE").
|
||||
Set("config = ?", config.StoreableConfig().Config).
|
||||
Set("hash = ?", config.StoreableConfig().Hash).
|
||||
Set("updated_at = ?", config.StoreableConfig().UpdatedAt).
|
||||
Exec(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Down is intentionally a no-op: it always returns nil and does not undo any
// of the changes made by Up.
func (migration *addAlertmanager) Down(ctx context.Context, db *bun.DB) error {
|
||||
return nil
|
||||
}
|
||||
@@ -6,11 +6,11 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"slices"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"dario.cat/mergo"
|
||||
"github.com/prometheus/alertmanager/config"
|
||||
"github.com/prometheus/alertmanager/pkg/labels"
|
||||
commoncfg "github.com/prometheus/common/config"
|
||||
"github.com/prometheus/common/model"
|
||||
"github.com/uptrace/bun"
|
||||
@@ -274,15 +274,11 @@ func (c *Config) CreateRuleIDMatcher(ruleID string, receiverNames []string) erro
|
||||
return errors.New(errors.TypeInvalidInput, ErrCodeAlertmanagerConfigInvalid, "route is nil")
|
||||
}
|
||||
|
||||
routes := c.alertmanagerConfig.Route.Routes
|
||||
for i, route := range routes {
|
||||
if slices.Contains(receiverNames, route.Receiver) {
|
||||
matcher, err := labels.NewMatcher(labels.MatchEqual, "ruleId", ruleID)
|
||||
if err != nil {
|
||||
for i := range c.alertmanagerConfig.Route.Routes {
|
||||
if slices.Contains(receiverNames, c.alertmanagerConfig.Route.Routes[i].Receiver) {
|
||||
if err := appendRuleIDToRoute(c.alertmanagerConfig.Route.Routes[i], ruleID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.alertmanagerConfig.Route.Routes[i].Matchers = append(c.alertmanagerConfig.Route.Routes[i].Matchers, matcher)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -304,12 +300,9 @@ func (c *Config) UpdateRuleIDMatcher(ruleID string, receiverNames []string) erro
|
||||
|
||||
func (c *Config) DeleteRuleIDMatcher(ruleID string) error {
|
||||
routes := c.alertmanagerConfig.Route.Routes
|
||||
for i, r := range routes {
|
||||
j := slices.IndexFunc(r.Matchers, func(m *labels.Matcher) bool {
|
||||
return m.Name == "ruleId" && m.Value == ruleID
|
||||
})
|
||||
if j != -1 {
|
||||
c.alertmanagerConfig.Route.Routes[i].Matchers = slices.Delete(r.Matchers, j, j+1)
|
||||
for i := range routes {
|
||||
if err := removeRuleIDFromRoute(c.alertmanagerConfig.Route.Routes[i], ruleID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
@@ -323,10 +316,10 @@ func (c *Config) DeleteRuleIDMatcher(ruleID string) error {
|
||||
func (c *Config) ReceiverNamesFromRuleID(ruleID string) ([]string, error) {
|
||||
receiverNames := make([]string, 0)
|
||||
routes := c.alertmanagerConfig.Route.Routes
|
||||
for _, r := range routes {
|
||||
for _, m := range r.Matchers {
|
||||
if m.Name == "ruleId" && m.Value == ruleID {
|
||||
receiverNames = append(receiverNames, r.Receiver)
|
||||
for _, route := range routes {
|
||||
for _, matcher := range route.Matchers {
|
||||
if matcher.Name == ruleIDMatcherName && strings.Contains(matcher.Value, ruleID) {
|
||||
receiverNames = append(receiverNames, route.Receiver)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -33,7 +33,7 @@ func TestCreateRuleIDMatcher(t *testing.T) {
|
||||
},
|
||||
},
|
||||
ruleIDToReceivers: map[string][]string{"test-rule": {"slack-receiver"}},
|
||||
expectedRoutes: []map[string]any{{"receiver": "slack-receiver", "continue": true, "matchers": []any{"ruleId=\"test-rule\""}}},
|
||||
expectedRoutes: []map[string]any{{"receiver": "slack-receiver", "continue": true, "matchers": []any{"ruleId=~\"test-rule\""}}},
|
||||
},
|
||||
{
|
||||
name: "SlackAndEmailReceivers",
|
||||
@@ -58,7 +58,7 @@ func TestCreateRuleIDMatcher(t *testing.T) {
|
||||
},
|
||||
},
|
||||
ruleIDToReceivers: map[string][]string{"test-rule": {"slack-receiver", "email-receiver"}},
|
||||
expectedRoutes: []map[string]any{{"receiver": "slack-receiver", "continue": true, "matchers": []any{"ruleId=\"test-rule\""}}, {"receiver": "email-receiver", "continue": true, "matchers": []any{"ruleId=\"test-rule\""}}},
|
||||
expectedRoutes: []map[string]any{{"receiver": "slack-receiver", "continue": true, "matchers": []any{"ruleId=~\"test-rule\""}}, {"receiver": "email-receiver", "continue": true, "matchers": []any{"ruleId=~\"test-rule\""}}},
|
||||
},
|
||||
{
|
||||
name: "ReceiverDoesNotExist",
|
||||
@@ -92,7 +92,7 @@ func TestCreateRuleIDMatcher(t *testing.T) {
|
||||
},
|
||||
},
|
||||
ruleIDToReceivers: map[string][]string{"test-rule-1": {"slack-receiver", "does-not-exist"}, "test-rule-2": {"slack-receiver"}},
|
||||
expectedRoutes: []map[string]any{{"receiver": "slack-receiver", "continue": true, "matchers": []any{"ruleId=\"test-rule-1\"", "ruleId=\"test-rule-2\""}}},
|
||||
expectedRoutes: []map[string]any{{"receiver": "slack-receiver", "continue": true, "matchers": []any{"ruleId=~\"test-rule-1|test-rule-2\""}}},
|
||||
},
|
||||
}
|
||||
|
||||
@@ -157,7 +157,7 @@ func TestDeleteRuleIDMatcher(t *testing.T) {
|
||||
expectedRoutes: []map[string]any{{"receiver": "slack-receiver", "continue": true}, {"receiver": "email-receiver", "continue": true}},
|
||||
},
|
||||
{
|
||||
name: "AlertNameDoesNotExist",
|
||||
name: "RuleIDDoesNotExist",
|
||||
orgID: "1",
|
||||
receivers: []config.Receiver{
|
||||
{
|
||||
@@ -180,7 +180,7 @@ func TestDeleteRuleIDMatcher(t *testing.T) {
|
||||
},
|
||||
ruleIDToReceivers: map[string][]string{"test-rule": {"email-receiver", "slack-receiver"}},
|
||||
ruleIDsToDelete: []string{"does-not-exist"},
|
||||
expectedRoutes: []map[string]any{{"receiver": "slack-receiver", "continue": true, "matchers": []any{"ruleId=\"test-rule\""}}, {"receiver": "email-receiver", "continue": true, "matchers": []any{"ruleId=\"test-rule\""}}},
|
||||
expectedRoutes: []map[string]any{{"receiver": "slack-receiver", "continue": true, "matchers": []any{"ruleId=~\"test-rule\""}}, {"receiver": "email-receiver", "continue": true, "matchers": []any{"ruleId=~\"test-rule\""}}},
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
64
pkg/types/alertmanagertypes/matcher.go
Normal file
64
pkg/types/alertmanagertypes/matcher.go
Normal file
@@ -0,0 +1,64 @@
|
||||
package alertmanagertypes
|
||||
|
||||
import (
|
||||
"slices"
|
||||
"strings"
|
||||
|
||||
"github.com/prometheus/alertmanager/config"
|
||||
"github.com/prometheus/alertmanager/pkg/labels"
|
||||
)
|
||||
|
||||
const (
|
||||
// ruleIDMatcherName is the label name of the route matcher that carries rule IDs.
ruleIDMatcherName string = "ruleId"
|
||||
// ruleIDMatcherValueSep joins and splits the rule IDs held in that matcher's value.
ruleIDMatcherValueSep string = "|"
|
||||
)
|
||||
|
||||
func appendRuleIDToRoute(route *config.Route, ruleID string) error {
|
||||
matcherIdx := slices.IndexFunc(route.Matchers, func(m *labels.Matcher) bool {
|
||||
return m.Name == ruleIDMatcherName
|
||||
})
|
||||
|
||||
if matcherIdx == -1 {
|
||||
matcher, err := labels.NewMatcher(labels.MatchRegexp, ruleIDMatcherName, ruleID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
route.Matchers = append(route.Matchers, matcher)
|
||||
return nil
|
||||
}
|
||||
|
||||
existingRuleIDs := strings.Split(route.Matchers[matcherIdx].Value, ruleIDMatcherValueSep)
|
||||
existingRuleIDs = append(existingRuleIDs, ruleID)
|
||||
route.Matchers[matcherIdx].Value = strings.Join(existingRuleIDs, ruleIDMatcherValueSep)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func removeRuleIDFromRoute(route *config.Route, ruleID string) error {
|
||||
matcherIdx := slices.IndexFunc(route.Matchers, func(m *labels.Matcher) bool {
|
||||
return m.Name == ruleIDMatcherName
|
||||
})
|
||||
|
||||
if matcherIdx == -1 {
|
||||
return nil
|
||||
}
|
||||
|
||||
existingRuleIDs := strings.Split(route.Matchers[matcherIdx].Value, ruleIDMatcherValueSep)
|
||||
existingRuleIDs = slices.DeleteFunc(existingRuleIDs, func(id string) bool {
|
||||
return id == ruleID
|
||||
})
|
||||
|
||||
if len(existingRuleIDs) == 0 {
|
||||
route.Matchers = slices.Delete(route.Matchers, matcherIdx, matcherIdx+1)
|
||||
return nil
|
||||
}
|
||||
|
||||
matcher, err := labels.NewMatcher(labels.MatchRegexp, ruleIDMatcherName, strings.Join(existingRuleIDs, ruleIDMatcherValueSep))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
route.Matchers[matcherIdx] = matcher
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user