Compare commits

...

19 Commits

Author SHA1 Message Date
aniket
902536f371 feat(notification-routing): added threshold label 2025-09-04 13:05:50 +05:30
aniket
d36d883a80 fix(notification-routing): added fix for channel specific route 2025-09-04 02:08:42 +05:30
aniket
4f796536c4 Merge branch 'feat/notification-routing-v2' of github.com:SigNoz/signoz into feat/notification-routing-v2 2025-09-04 00:40:09 +05:30
aniket
01e9d0dcdd feat(notification-routing): added test cases for strategy 2025-09-04 00:39:50 +05:30
Srikanth Chekuri
5f7f120be3 Merge branch 'main' into feat/notification-routing-v2 2025-09-03 22:21:55 +05:30
aniket
b68253b87e fix(notification-routing): added unmarshalling for routes 2025-09-03 21:26:53 +05:30
aniket
f06f06227f fix(notificaiton-routing): added fix for delete rule 2025-09-03 16:06:59 +05:30
aniket
8a28373c26 fix(notification-routing): delete channel leads to invalid config 2025-09-03 01:15:42 +05:30
aniket
8888306597 feat(notification-routing): added notificaiton api dependency 2025-09-02 22:36:44 +05:30
aniket
f307913d97 Revert "feat(notification-routing): added debug logs"
This reverts commit 1f5bb2242f.
2025-09-02 22:17:03 +05:30
aniket
1f5bb2242f feat(notification-routing): added debug logs 2025-09-02 21:40:29 +05:30
aniket
907c0543f2 feat(notification-routing): added rule store miss dependency for enterprise 2025-09-02 16:22:02 +05:30
aniket
990ae275ae Merge branch 'main' of github.com:SigNoz/signoz into feat/notification-routing-v2 2025-09-02 15:50:47 +05:30
aniket
a130511dda feat(notification-routing): added channel based routing logic in policy routes 2025-09-02 15:22:38 +05:30
aniket
ef4ec5280e feat(notification-routing): added channel based routing logic in policy routes 2025-09-02 14:36:37 +05:30
aniket
5287b455ff feat(notification-routing): added channel absed routing logic in policy routes 2025-09-02 02:52:18 +05:30
aniket
5b3092d602 Merge branch 'main' of github.com:SigNoz/signoz into feat/notification-routing-v2 2025-08-30 00:46:22 +05:30
aniket
7da42c3c45 feat(notification-routing): architectural changes in routing tree to support notificatio routing 2025-08-30 00:17:01 +05:30
aniket
72d0cf5ac1 fix(rule-alert-state): added 404 status code for invalid rules 2025-08-28 13:09:19 +05:30
29 changed files with 3564 additions and 135 deletions

View File

@@ -1,6 +1,7 @@
package api
import (
"github.com/SigNoz/signoz/pkg/nfrouting"
"net/http"
"net/http/httputil"
"time"
@@ -39,6 +40,7 @@ type APIHandlerOptions struct {
UseLogsNewSchema bool
UseTraceNewSchema bool
JWT *authtypes.JWT
RoutingManager *alertmanager.RouteManager
}
type APIHandler struct {
@@ -55,11 +57,12 @@ func NewAPIHandler(opts APIHandlerOptions, signoz *signoz.SigNoz) (*APIHandler,
CloudIntegrationsController: opts.CloudIntegrationsController,
LogsParsingPipelineController: opts.LogsParsingPipelineController,
FluxInterval: opts.FluxInterval,
AlertmanagerAPI: alertmanager.NewAPI(signoz.Alertmanager),
AlertmanagerAPI: alertmanager.NewAPI(signoz.Alertmanager, opts.RoutingManager),
LicensingAPI: httplicensing.NewLicensingAPI(signoz.Licensing),
FieldsAPI: fields.NewAPI(signoz.Instrumentation.ToProviderSettings(), signoz.TelemetryStore),
Signoz: signoz,
QuerierAPI: querierAPI.NewAPI(signoz.Instrumentation.ToProviderSettings(), signoz.Querier, signoz.Analytics),
NotificationRoutesAPI: nfrouting.NewAPI(signoz.Analytics, signoz.RouteStore, opts.RoutingManager),
})
if err != nil {

View File

@@ -3,6 +3,8 @@ package app
import (
"context"
"fmt"
"github.com/SigNoz/signoz/pkg/ruler/rulestore/sqlrulestore"
"github.com/SigNoz/signoz/pkg/types/ruletypes"
"log/slog"
"net"
"net/http"
@@ -98,6 +100,9 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz, jwt *authtypes.JWT)
signoz.Cache,
)
ruleStore := sqlrulestore.NewRuleStore(signoz.SQLStore)
routeManager := alertmanager.NewManagerWithChannelRoutingStrategy(signoz.Alertmanager, ruleStore, signoz.RouteStore)
rm, err := makeRulesManager(
reader,
signoz.Cache,
@@ -108,6 +113,8 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz, jwt *authtypes.JWT)
signoz.Modules.OrgGetter,
signoz.Querier,
signoz.Instrumentation.Logger(),
ruleStore,
routeManager,
)
if err != nil {
@@ -170,6 +177,7 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz, jwt *authtypes.JWT)
Gateway: gatewayProxy,
GatewayUrl: config.Gateway.URL.String(),
JWT: jwt,
RoutingManager: routeManager,
}
apiHandler, err := api.NewAPIHandler(apiOpts, signoz)
@@ -418,17 +426,7 @@ func (s *Server) Stop(ctx context.Context) error {
return nil
}
func makeRulesManager(
ch baseint.Reader,
cache cache.Cache,
alertmanager alertmanager.Alertmanager,
sqlstore sqlstore.SQLStore,
telemetryStore telemetrystore.TelemetryStore,
prometheus prometheus.Prometheus,
orgGetter organization.Getter,
querier querier.Querier,
logger *slog.Logger,
) (*baserules.Manager, error) {
func makeRulesManager(ch baseint.Reader, cache cache.Cache, alertmanager alertmanager.Alertmanager, sqlstore sqlstore.SQLStore, telemetryStore telemetrystore.TelemetryStore, prometheus prometheus.Prometheus, orgGetter organization.Getter, querier querier.Querier, logger *slog.Logger, ruleStore ruletypes.RuleStore, routeManager *alertmanager.RouteManager) (*baserules.Manager, error) {
// create manager opts
managerOpts := &baserules.ManagerOptions{
TelemetryStore: telemetryStore,
@@ -445,9 +443,11 @@ func makeRulesManager(
Alertmanager: alertmanager,
SQLStore: sqlstore,
OrgGetter: orgGetter,
RoutingManager: routeManager,
RuleStore: ruleStore,
}
// create Manager
// create RouteManager
manager, err := baserules.NewManager(managerOpts)
if err != nil {
return nil, fmt.Errorf("rule manager error: %v", err)

View File

@@ -185,8 +185,11 @@ func New(ctx context.Context, logger *slog.Logger, registry prometheus.Registere
if err != nil {
return nil, err
}
server.pipelineBuilder = notify.NewPipelineBuilder(server.registry, featurecontrol.NoopFlags{})
flags, err := featurecontrol.NewFlags(server.logger, featurecontrol.FeatureUTF8StrictMode)
if err != nil {
return nil, err
}
server.pipelineBuilder = notify.NewPipelineBuilder(server.registry, flags)
server.dispatcherMetrics = dispatch.NewDispatcherMetrics(false, server.registry)
return server, nil
@@ -225,7 +228,10 @@ func (server *Server) SetConfig(ctx context.Context, alertmanagerConfig *alertma
server.tmpl.ExternalURL = server.srvConfig.ExternalURL
// Build the routing tree and record which receivers are used.
err = alertmanagertypes.UnmarshalRouteConfig(config.Route)
if err != nil {
return err
}
routes := dispatch.NewRoute(config.Route, nil)
activeReceivers := make(map[string]struct{})
routes.Walk(func(r *dispatch.Route) {

View File

@@ -16,11 +16,13 @@ import (
type API struct {
alertmanager Alertmanager
routeManager *RouteManager
}
func NewAPI(alertmanager Alertmanager) *API {
func NewAPI(alertmanager Alertmanager, routeManager *RouteManager) *API {
return &API{
alertmanager: alertmanager,
routeManager: routeManager,
}
}
@@ -233,6 +235,18 @@ func (api *API) DeleteChannelByID(rw http.ResponseWriter, req *http.Request) {
return
}
channel, err := api.alertmanager.GetChannelByID(ctx, claims.OrgID, id)
if err != nil {
render.Error(rw, err)
return
}
err = api.routeManager.DeleteChannel(ctx, claims.OrgID, channel.Name)
if err != nil {
render.Error(rw, err)
return
}
err = api.alertmanager.DeleteChannelByID(ctx, claims.OrgID, id)
if err != nil {
render.Error(rw, err)

View File

@@ -0,0 +1,241 @@
package alertmanager
import (
"context"
"fmt"
"github.com/SigNoz/signoz/pkg/alertmanager/routestrategy"
"github.com/SigNoz/signoz/pkg/types/nfroutingtypes"
"github.com/SigNoz/signoz/pkg/types/ruletypes"
"strings"
)
// RouteManager applies routing changes to an organisation's alertmanager
// configuration. Each operation loads the config through alertmanager,
// delegates the tree mutation to strategy, and writes the config back.
type RouteManager struct {
	// strategy performs the actual edits on the routing tree.
	strategy routestrategy.RoutingStrategy
	// alertmanager is used to read channels and to get/set the config.
	alertmanager Alertmanager
	// ruleStore supplies the stored alert rules of an organisation.
	ruleStore ruletypes.RuleStore
	// routeStore supplies the stored notification policies of an organisation.
	routeStore nfroutingtypes.RouteStore
}
// NewManagerWithChannelRoutingStrategy builds a RouteManager that uses the
// channel routing strategy together with the given alertmanager and stores.
func NewManagerWithChannelRoutingStrategy(
	alertmanager Alertmanager,
	ruleStore ruletypes.RuleStore,
	routeStore nfroutingtypes.RouteStore,
) *RouteManager {
	manager := new(RouteManager)
	manager.strategy = routestrategy.NewChannelRoutingStrategy()
	manager.alertmanager = alertmanager
	manager.ruleStore = ruleStore
	manager.routeStore = routeStore
	return manager
}
// AddDirectRules registers the direct (non-policy) routes of a rule in the
// organisation's alertmanager config.
//
// When the rule has no thresholds, a single threshold keyed by the rule's
// "severity" label is synthesised. Its receivers are the rule's preferred
// channels, or every channel of the organisation when none are preferred.
func (m *RouteManager) AddDirectRules(ctx context.Context, orgID, ruleId string, postableRule ruletypes.PostableRule) error {
	if postableRule.Thresholds == nil {
		if len(postableRule.PreferredChannels) == 0 {
			// No explicit preference: fall back to all channels of the org.
			channels, listErr := m.alertmanager.ListChannels(ctx, orgID)
			if listErr != nil {
				return listErr
			}
			var names []string
			for _, ch := range channels {
				names = append(names, ch.Name)
			}
			postableRule.PreferredChannels = names
		}
		postableRule.Thresholds = map[string][]string{
			postableRule.Labels["severity"]: postableRule.PreferredChannels,
		}
	}

	cfg, err := m.alertmanager.GetConfig(ctx, orgID)
	if err != nil {
		return fmt.Errorf("failed to get alertmanager config: %w", err)
	}

	err = m.strategy.AddDirectRules(cfg, ruleId, postableRule)
	if err != nil {
		return fmt.Errorf("failed to add direct rules: %w", err)
	}

	cfg.UpdateStoreableConfig()
	err = m.alertmanager.SetConfig(ctx, cfg)
	if err != nil {
		return fmt.Errorf("failed to update alertmanager config: %w", err)
	}
	return nil
}
// AddNotificationPolicyRules attaches a rule to the notification policy routes
// of the organisation's alertmanager config and persists the result.
func (m *RouteManager) AddNotificationPolicyRules(ctx context.Context, orgID, ruleId string, postableRule ruletypes.PostableRule) error {
	cfg, err := m.alertmanager.GetConfig(ctx, orgID)
	if err != nil {
		return fmt.Errorf("failed to get alertmanager config: %w", err)
	}

	err = m.strategy.AddNotificationPolicyRules(cfg, ruleId, postableRule)
	if err != nil {
		return fmt.Errorf("failed to add notification policy rules: %w", err)
	}

	cfg.UpdateStoreableConfig()
	err = m.alertmanager.SetConfig(ctx, cfg)
	if err != nil {
		return fmt.Errorf("failed to update alertmanager config: %w", err)
	}
	return nil
}
// AddNotificationPolicy adds a notification policy route to the organisation's
// alertmanager config. Every stored rule that has NotificationPolicies set is
// parsed and handed to the strategy, keyed by its rule ID.
func (m *RouteManager) AddNotificationPolicy(ctx context.Context, orgID, policyMatchers string, receivers []string, routeId string) error {
	cfg, err := m.alertmanager.GetConfig(ctx, orgID)
	if err != nil {
		return fmt.Errorf("failed to get alertmanager config: %w", err)
	}

	storedRules, err := m.ruleStore.GetStoredRules(ctx, orgID)
	if err != nil {
		return err
	}

	policyRules := make(map[string]ruletypes.PostableRule)
	for _, stored := range storedRules {
		parsed, parseErr := ruletypes.ParsePostableRule([]byte(stored.Data))
		if parseErr != nil {
			return parseErr
		}
		if !parsed.NotificationPolicies {
			continue
		}
		policyRules[stored.ID.StringValue()] = *parsed
	}

	err = m.strategy.AddNotificationPolicy(cfg, policyMatchers, receivers, policyRules, routeId)
	if err != nil {
		return fmt.Errorf("failed to add notification policy: %w", err)
	}

	cfg.UpdateStoreableConfig()
	err = m.alertmanager.SetConfig(ctx, cfg)
	if err != nil {
		return fmt.Errorf("failed to update alertmanager config: %w", err)
	}
	return nil
}
// DeleteDirectRules removes the direct routes of a rule from the
// organisation's alertmanager config and persists the result.
func (m *RouteManager) DeleteDirectRules(ctx context.Context, orgID, ruleId string) error {
	cfg, err := m.alertmanager.GetConfig(ctx, orgID)
	if err != nil {
		return fmt.Errorf("failed to get alertmanager config: %w", err)
	}

	err = m.strategy.DeleteDirectRules(ruleId, cfg)
	if err != nil {
		return fmt.Errorf("failed to delete direct rules: %w", err)
	}

	cfg.UpdateStoreableConfig()
	err = m.alertmanager.SetConfig(ctx, cfg)
	if err != nil {
		return fmt.Errorf("failed to update alertmanager config: %w", err)
	}
	return nil
}
// DeleteNotificationPolicyRules detaches a rule from the notification policy
// routes of the organisation's alertmanager config and persists the result.
func (m *RouteManager) DeleteNotificationPolicyRules(ctx context.Context, orgID, ruleId string) error {
	cfg, err := m.alertmanager.GetConfig(ctx, orgID)
	if err != nil {
		return fmt.Errorf("failed to get alertmanager config: %w", err)
	}

	err = m.strategy.DeleteNotificationPolicyRules(cfg, ruleId)
	if err != nil {
		return fmt.Errorf("failed to delete notification policy rules: %w", err)
	}

	cfg.UpdateStoreableConfig()
	err = m.alertmanager.SetConfig(ctx, cfg)
	if err != nil {
		return fmt.Errorf("failed to update alertmanager config: %w", err)
	}
	return nil
}
// DeleteNotificationPolicy removes the notification policy identified by
// routeId from the organisation's alertmanager config and persists the result.
func (m *RouteManager) DeleteNotificationPolicy(ctx context.Context, orgID, routeId string) error {
	cfg, err := m.alertmanager.GetConfig(ctx, orgID)
	if err != nil {
		return fmt.Errorf("failed to get alertmanager config: %w", err)
	}

	err = m.strategy.DeleteNotificationPolicy(cfg, routeId)
	if err != nil {
		return fmt.Errorf("failed to delete notification policy: %w", err)
	}

	cfg.UpdateStoreableConfig()
	err = m.alertmanager.SetConfig(ctx, cfg)
	if err != nil {
		return fmt.Errorf("failed to update alertmanager config: %w", err)
	}
	return nil
}
// DeleteChannel removes a channel from the organisation's alertmanager config.
//
// The deletion is refused when the channel is still referenced:
//   - by a stored rule, through its preferred channels; the error lists the
//     alert names of those rules (each at most once);
//   - by a stored notification policy, through its channels; the error
//     reports how many policies reference the channel.
//
// Otherwise the channel is removed through the strategy and the config is
// written back.
func (m *RouteManager) DeleteChannel(ctx context.Context, orgID, channelName string) error {
	rules, err := m.ruleStore.GetStoredRules(ctx, orgID)
	if err != nil {
		return err
	}

	var ruleNames []string
	for _, rule := range rules {
		postableRule, err := ruletypes.ParsePostableRule([]byte(rule.Data))
		if err != nil {
			return err
		}
		for _, channel := range postableRule.PreferredChannels {
			if channel == channelName {
				ruleNames = append(ruleNames, postableRule.AlertName)
				// One mention per rule is enough, even if the channel is listed twice.
				break
			}
		}
	}
	if len(ruleNames) > 0 {
		return fmt.Errorf("cant delete channel used in rules %s", strings.Join(ruleNames, ","))
	}

	policies, err := m.routeStore.GetAllByOrgID(ctx, orgID)
	if err != nil {
		return err
	}

	// NOTE(review): the policy type's identifying field is not visible here, so
	// the error reports a count; list policy names/IDs once the field is confirmed.
	referencingPolicies := 0
	for _, policy := range policies {
		for _, channel := range policy.Channels {
			if channel == channelName {
				referencingPolicies++
				break
			}
		}
	}
	if referencingPolicies > 0 {
		return fmt.Errorf("cant delete channel %s used in %d notification policies", channelName, referencingPolicies)
	}

	// Get current alertmanager config
	cfg, err := m.alertmanager.GetConfig(ctx, orgID)
	if err != nil {
		return fmt.Errorf("failed to get alertmanager config: %w", err)
	}

	// Delete channel using strategy
	if err := m.strategy.DeleteChannel(cfg, channelName); err != nil {
		return fmt.Errorf("failed to delete channel: %w", err)
	}

	cfg.UpdateStoreableConfig()

	// Update alertmanager config
	if err := m.alertmanager.SetConfig(ctx, cfg); err != nil {
		return fmt.Errorf("failed to update alertmanager config: %w", err)
	}
	return nil
}

View File

@@ -0,0 +1,611 @@
package routestrategy
import (
"fmt"
"github.com/SigNoz/signoz/pkg/types/alertmanagertypes"
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
amconfig "github.com/prometheus/alertmanager/config"
"github.com/prometheus/alertmanager/matcher/compat"
"github.com/prometheus/alertmanager/pkg/labels"
"github.com/prometheus/common/model"
"slices"
"strings"
)
// Receiver names (and name prefixes) used for the routes this package manages.
const (
	// defaultRuleReceiverName names the base route holding direct rule routes
	// and prefixes the receiver of each per-rule route.
	defaultRuleReceiverName = "default-rule-receiver"
	// defaultNotificationPolicyReceiverName prefixes notification policy receivers.
	defaultNotificationPolicyReceiverName = "default-notification-policy"
	// defaultChannelReceiverName prefixes the channel receivers under a policy.
	defaultChannelReceiverName = "default-channel-receiver"
)
// RoutingStrategy describes the edits that can be applied to the routing tree
// of an alertmanager config. Every method mutates the passed config in place.
type RoutingStrategy interface {
	// AddDirectRules adds the direct (threshold based) routes of a rule.
	AddDirectRules(config *alertmanagertypes.Config, ruleId string, postableRule ruletypes.PostableRule) error
	// AddNotificationPolicyRules attaches a rule to the existing notification policy routes.
	AddNotificationPolicyRules(config *alertmanagertypes.Config, ruleId string, postableRule ruletypes.PostableRule) error
	// AddNotificationPolicy adds a notification policy route for the given
	// matchers and receivers; ruleIds maps rule IDs to their rule definitions.
	AddNotificationPolicy(config *alertmanagertypes.Config, policyMatchers string, receivers []string, ruleIds map[string]ruletypes.PostableRule, id string) error
	// AddChannel adds a route for the named channel.
	AddChannel(config *alertmanagertypes.Config, channelName string) error
	// DeleteDirectRules removes the direct routes of a rule.
	DeleteDirectRules(ruleId string, config *alertmanagertypes.Config) error
	// DeleteNotificationPolicyRules detaches a rule from the notification policy routes.
	DeleteNotificationPolicyRules(config *alertmanagertypes.Config, ruleId string) error
	// DeleteNotificationPolicy removes the notification policy identified by routeId.
	DeleteNotificationPolicy(config *alertmanagertypes.Config, routeId string) error
	// DeleteChannel removes the named channel.
	DeleteChannel(config *alertmanagertypes.Config, channelName string) error
}
// ChannelRoutingStrategy is the channel based routing strategy. It carries no
// state; all data lives in the config passed to its methods.
type ChannelRoutingStrategy struct {
}
// NewChannelRoutingStrategy returns a ChannelRoutingStrategy as a RoutingStrategy.
func NewChannelRoutingStrategy() RoutingStrategy {
	strategy := new(ChannelRoutingStrategy)
	return strategy
}
// AddChannel appends a child route whose receiver is channelName to every
// notification policy route. It fails when the config has no root route, when
// channelName is empty, or when there is no notification policy route.
func (crs *ChannelRoutingStrategy) AddChannel(config *alertmanagertypes.Config, channelName string) error {
	if config.AlertmanagerConfig().Route == nil {
		return fmt.Errorf("AlertManager config has no root route")
	}
	if channelName == "" {
		return fmt.Errorf("AlertManager config has no channel name")
	}

	policyRoutes := crs.findAllNotificationPolicyRoutes(config)
	if len(policyRoutes) == 0 {
		return fmt.Errorf("AlertManager config has no notification policy routes")
	}

	for _, policy := range policyRoutes {
		child := &amconfig.Route{
			Receiver: channelName,
			Continue: true,
			Routes:   []*amconfig.Route{},
		}
		policy.Routes = append(policy.Routes, child)
	}
	return nil
}
// AddDirectRules adds a per-rule route, with one child route per threshold
// receiver, under the default rule route, and registers the rule's receiver.
//
// It fails when ruleId is empty, when the config has no root route, or when a
// threshold route cannot be built.
func (crs *ChannelRoutingStrategy) AddDirectRules(config *alertmanagertypes.Config, ruleId string, postableRule ruletypes.PostableRule) error {
	if ruleId == "" {
		return fmt.Errorf("invalid ruleId %s", ruleId)
	}

	if config.AlertmanagerConfig().Route == nil {
		return fmt.Errorf("AlertManager config has no root route")
	}

	ruleRoute := crs.createDefaultRuleRoute(ruleId)

	for threshold, receivers := range postableRule.Thresholds {
		thresholdRoutes, err := crs.createThresholdRoute(threshold, receivers, postableRule.GroupBy, postableRule.Renotify)
		if err != nil {
			return fmt.Errorf("failed to create threshold route: %w", err)
		}
		ruleRoute.Routes = append(ruleRoute.Routes, thresholdRoutes...)
	}

	// Index of the default rule route among the top-level routes.
	parentIdx := crs.findDefaultRuleSection(config)
	amCfg := config.AlertmanagerConfig()
	amCfg.Receivers = append(amCfg.Receivers, alertmanagertypes.Receiver{Name: getRuleReceiverName(ruleId)})

	// parentIdx is a position in the top-level routes and must not be reused
	// as a position in the parent's children: slices.Insert panics when the
	// index exceeds the number of children. Prepend instead.
	parent := amCfg.Route.Routes[parentIdx]
	parent.Routes = slices.Insert(parent.Routes, 0, ruleRoute)

	return nil
}
// AddNotificationPolicyRules adds, under every channel route of every
// notification policy route, a child route matching ruleId.
//
// It fails when ruleId is empty or when the config has no root route.
func (crs *ChannelRoutingStrategy) AddNotificationPolicyRules(config *alertmanagertypes.Config, ruleId string, postableRule ruletypes.PostableRule) error {
	if ruleId == "" {
		return fmt.Errorf("invalid ruleId %s", ruleId)
	}

	// Guard like the sibling methods: the policy lookup below dereferences the root route.
	if config.AlertmanagerConfig().Route == nil {
		return fmt.Errorf("AlertManager config has no root route")
	}

	notificationPolicyRoutes := crs.findAllNotificationPolicyRoutes(config)

	// Add a new child route with single ruleId matcher for each channel route
	for _, policyRoute := range notificationPolicyRoutes {
		for _, channelRoute := range policyRoute.Routes {
			ruleSpecificRoute := crs.createRuleSpecificChannelRoute(getChannelFormPolicyChannelReceiverName(channelRoute.Receiver), ruleId, postableRule)
			channelRoute.Routes = append(channelRoute.Routes, ruleSpecificRoute)
		}
	}

	return nil
}
// AddNotificationPolicy inserts a notification policy route at the front of
// the top-level routes.
//
// The policy route matches policyMatchers plus notification_policy="true". It
// gets one channel route per receiver, and each channel route gets one child
// route per rule in ruleIds. Rule routes are added in sorted rule-ID order so
// the resulting config is deterministic. Receivers for the policy and its
// channel routes are registered only when a receiver of that name does not
// already exist, so policies sharing a channel do not create duplicates.
//
// It fails when the config has no root route, when receivers or ruleIds is
// empty, or when the matchers cannot be built.
func (crs *ChannelRoutingStrategy) AddNotificationPolicy(config *alertmanagertypes.Config, policyMatchers string, receivers []string, ruleIds map[string]ruletypes.PostableRule, routeId string) error {
	if config.AlertmanagerConfig().Route == nil {
		return fmt.Errorf("invalid config")
	}

	if len(receivers) == 0 {
		return fmt.Errorf("no receivers found in route")
	}

	if len(ruleIds) == 0 {
		return fmt.Errorf("no ruleids found in route")
	}

	parsedMatchers, err := crs.parseMatchers(policyMatchers)
	if err != nil {
		return fmt.Errorf("failed to parse policy matchers: %w", err)
	}

	notificationIdMatcher, err := labels.NewMatcher(labels.MatchEqual, "notification_policy", "true")
	if err != nil {
		return fmt.Errorf("failed to create notification policy matcher: %w", err)
	}
	parsedMatchers = append(parsedMatchers, *notificationIdMatcher)

	policyRoute := crs.createNotificationPolicyRoute(parsedMatchers, routeId)

	// Map iteration order is random; sort the rule IDs for a stable route order.
	sortedRuleIds := make([]string, 0, len(ruleIds))
	for ruleId := range ruleIds {
		sortedRuleIds = append(sortedRuleIds, ruleId)
	}
	slices.Sort(sortedRuleIds)

	amCfg := config.AlertmanagerConfig()
	knownReceivers := make(map[string]struct{}, len(amCfg.Receivers))
	for _, existing := range amCfg.Receivers {
		knownReceivers[existing.Name] = struct{}{}
	}

	// Create channel routes for each receiver
	var newReceivers []alertmanagertypes.Receiver
	registerReceiver := func(name string) {
		if _, ok := knownReceivers[name]; ok {
			return
		}
		knownReceivers[name] = struct{}{}
		newReceivers = append(newReceivers, alertmanagertypes.Receiver{Name: name})
	}

	for _, receiver := range receivers {
		channelRoute := crs.createPolicyChannelRoute(receiver)
		registerReceiver(getPolicyChannelReceiverName(receiver))
		for _, ruleId := range sortedRuleIds {
			ruleSpecificRoute := crs.createRuleSpecificChannelRoute(receiver, ruleId, ruleIds[ruleId])
			channelRoute.Routes = append(channelRoute.Routes, ruleSpecificRoute)
		}
		policyRoute.Routes = append(policyRoute.Routes, channelRoute)
	}
	registerReceiver(getNotificationPolicyReceiverName(routeId))

	amCfg.Route.Routes = slices.Insert(amCfg.Route.Routes, 0, policyRoute)
	amCfg.Receivers = append(amCfg.Receivers, newReceivers...)

	return nil
}
// DeleteDirectRules drops the route of ruleId from the children of the first
// default rule route and removes the rule's receiver from the config.
// It fails when ruleId is empty or when the config has no root route.
func (crs *ChannelRoutingStrategy) DeleteDirectRules(ruleId string, config *alertmanagertypes.Config) error {
	if ruleId == "" {
		return fmt.Errorf("invalid ruleId %s", ruleId)
	}
	if config.AlertmanagerConfig().Route == nil {
		return fmt.Errorf("AlertManager config has no root route")
	}

	for _, topLevel := range config.AlertmanagerConfig().Route.Routes {
		if !crs.isDefaultRuleRoute(topLevel) {
			continue
		}
		var keptRoutes []*amconfig.Route
		for _, child := range topLevel.Routes {
			if child.Receiver != getRuleReceiverName(ruleId) {
				keptRoutes = append(keptRoutes, child)
			}
		}
		topLevel.Routes = keptRoutes
		// Only the first default rule route is processed.
		break
	}

	var keptReceivers []amconfig.Receiver
	for _, candidate := range config.AlertmanagerConfig().Receivers {
		if candidate.Name != getRuleReceiverName(ruleId) {
			keptReceivers = append(keptReceivers, candidate)
		}
	}
	config.AlertmanagerConfig().Receivers = keptReceivers

	return nil
}
// DeleteNotificationPolicyRules removes, from every channel route of every
// notification policy route, the child routes whose ruleId matcher equals ruleId.
//
// It fails when ruleId is empty or when the config has no root route.
func (crs *ChannelRoutingStrategy) DeleteNotificationPolicyRules(config *alertmanagertypes.Config, ruleId string) error {
	if ruleId == "" {
		return fmt.Errorf("invalid ruleId %s", ruleId)
	}

	// Guard like the sibling methods: the policy lookup below dereferences the root route.
	if config.AlertmanagerConfig().Route == nil {
		return fmt.Errorf("AlertManager config has no root route")
	}

	notificationPolicyRoutes := crs.findAllNotificationPolicyRoutes(config)

	for _, policyRoute := range notificationPolicyRoutes {
		for _, channelRoute := range policyRoute.Routes {
			var filteredRuleRoutes []*amconfig.Route
			for _, ruleRoute := range channelRoute.Routes {
				if !crs.hasSpecificRuleIDMatcher(ruleRoute, ruleId) {
					filteredRuleRoutes = append(filteredRuleRoutes, ruleRoute)
				}
			}
			channelRoute.Routes = filteredRuleRoutes
		}
	}

	return nil
}
// DeleteNotificationPolicy removes the top-level route of the notification
// policy identified by routeId, together with that policy's own receiver.
// It fails when the config has no root route.
//
// NOTE(review): the per-channel receivers registered alongside a policy are
// left in place, since other policies may route to the same channels.
func (crs *ChannelRoutingStrategy) DeleteNotificationPolicy(config *alertmanagertypes.Config, routeId string) error {
	if config.AlertmanagerConfig().Route == nil {
		return fmt.Errorf("invalid config")
	}

	// The policy route and its receiver share this name.
	policyReceiver := getNotificationPolicyReceiverName(routeId)

	var filteredRoutes []*amconfig.Route
	for _, route := range config.AlertmanagerConfig().Route.Routes {
		if route.Receiver != policyReceiver {
			filteredRoutes = append(filteredRoutes, route)
		}
	}

	var filteredReceivers []amconfig.Receiver
	for _, receiver := range config.AlertmanagerConfig().Receivers {
		if receiver.Name != policyReceiver {
			filteredReceivers = append(filteredReceivers, receiver)
		}
	}

	config.AlertmanagerConfig().Receivers = filteredReceivers
	config.AlertmanagerConfig().Route.Routes = filteredRoutes

	return nil
}
// DeleteChannel removes the first receiver named channelName from the config's
// receiver list. Routes are not modified here.
// It fails when channelName is empty or when the config has no root route.
func (crs *ChannelRoutingStrategy) DeleteChannel(config *alertmanagertypes.Config, channelName string) error {
	if channelName == "" {
		return fmt.Errorf("invalid channelName")
	}
	if config.AlertmanagerConfig().Route == nil {
		return fmt.Errorf("invalid config")
	}

	current := config.AlertmanagerConfig().Receivers
	pos := slices.IndexFunc(current, func(candidate amconfig.Receiver) bool {
		return candidate.Name == channelName
	})
	if pos >= 0 {
		config.AlertmanagerConfig().Receivers = append(current[:pos], current[pos+1:]...)
	}

	return nil
}
// Helper functions
// createDefaultRuleRoute builds the per-rule route used for direct rules. It
// matches ruleId=<ruleId> and notification_policy="false", continues to
// sibling routes, and starts without children.
func (crs *ChannelRoutingStrategy) createDefaultRuleRoute(ruleId string) *amconfig.Route {
	// Matcher construction errors are ignored, as before; presumably equality
	// matchers cannot fail to build — TODO confirm.
	byRule, _ := labels.NewMatcher(labels.MatchEqual, "ruleId", ruleId)
	notPolicy, _ := labels.NewMatcher(labels.MatchEqual, "notification_policy", "false")

	route := new(amconfig.Route)
	route.Receiver = getRuleReceiverName(ruleId)
	route.Matchers = amconfig.Matchers{byRule, notPolicy}
	route.Continue = true
	route.Routes = []*amconfig.Route{} // threshold routes are appended by the caller
	return route
}
// createDefaultBaseRuleRoute builds the matcher-less parent route under which
// per-rule routes are placed. Its receiver is defaultRuleReceiverName.
func (crs *ChannelRoutingStrategy) createDefaultBaseRuleRoute() *amconfig.Route {
	base := new(amconfig.Route)
	base.Receiver = defaultRuleReceiverName
	base.Continue = true
	base.Routes = []*amconfig.Route{} // per-rule routes are inserted later
	return base
}
// createThresholdRoute builds one route per receiver for a threshold. Every
// route matches threshold.name=<threshold> and carries the given group-by
// labels and repeat interval.
//
// The routes are siblings with identical matchers, so each one sets Continue:
// otherwise matching would stop at the first route and only the first
// receiver of the threshold would be notified.
//
// It returns an error when the threshold matcher cannot be built.
func (crs *ChannelRoutingStrategy) createThresholdRoute(threshold string, receivers []string, groupBy []string, renotify ruletypes.Duration) ([]*amconfig.Route, error) {
	thresholdMatcher, err := labels.NewMatcher(labels.MatchEqual, "threshold.name", threshold)
	if err != nil {
		return nil, fmt.Errorf("invalid threshold matcher for %q: %w", threshold, err)
	}
	configMatchers := crs.convertToConfigMatchers([]labels.Matcher{*thresholdMatcher})

	repeatInterval := (*model.Duration)(&renotify)

	var thresholdRoutes []*amconfig.Route
	for _, receiver := range receivers {
		thresholdRoutes = append(thresholdRoutes, &amconfig.Route{
			Receiver:       receiver,
			Matchers:       configMatchers,
			Continue:       true,
			GroupByStr:     groupBy,
			RepeatInterval: repeatInterval,
		})
	}

	return thresholdRoutes, nil
}
// createNotificationPolicyRoute builds the parent route of a notification
// policy: it matches policyMatchers, continues to sibling routes, starts
// without children, and uses the policy receiver name derived from id.
func (crs *ChannelRoutingStrategy) createNotificationPolicyRoute(policyMatchers []labels.Matcher, id string) *amconfig.Route {
	policy := new(amconfig.Route)
	policy.Matchers = crs.convertToConfigMatchers(policyMatchers)
	policy.Continue = true
	policy.Routes = []*amconfig.Route{}
	policy.Receiver = getNotificationPolicyReceiverName(id)
	return policy
}
// getNotificationPolicyReceiverName returns the receiver name of the
// notification policy routeId: the policy prefix, "_", then routeId.
func getNotificationPolicyReceiverName(routeId string) string {
	const separator = "_"
	return defaultNotificationPolicyReceiverName + separator + routeId
}
// getRuleReceiverName returns the receiver name of the per-rule route of
// ruleId: the default rule receiver name, "_", then ruleId.
func getRuleReceiverName(ruleId string) string {
	const separator = "_"
	return defaultRuleReceiverName + separator + ruleId
}
// getPolicyChannelReceiverName returns the receiver name of a policy channel
// route: the channel receiver prefix, "_", then channelId.
func getPolicyChannelReceiverName(channelId string) string {
	const separator = "_"
	return defaultChannelReceiverName + separator + channelId
}
// getChannelFormPolicyChannelReceiverName is the inverse of
// getPolicyChannelReceiverName: it strips the leading channel receiver prefix
// from channelId. A name without the prefix is returned unchanged.
//
// Only the leading prefix is removed, so a channel whose own name happens to
// contain the prefix text is left intact.
func getChannelFormPolicyChannelReceiverName(channelId string) string {
	return strings.TrimPrefix(channelId, defaultChannelReceiverName+"_")
}
// createPolicyChannelRoute builds the matcher-less channel route of a policy.
// Its receiver is the policy channel receiver name of receiver; rule-specific
// routes are added to it as children.
func (crs *ChannelRoutingStrategy) createPolicyChannelRoute(receiver string) *amconfig.Route {
	channel := new(amconfig.Route)
	channel.Receiver = getPolicyChannelReceiverName(receiver)
	channel.Continue = true                // keep evaluating sibling routes
	channel.Routes = []*amconfig.Route{} // rule-specific routes go here
	return channel
}
// createRuleSpecificChannelRoute builds a route that sends alerts of one rule
// to receiver. It matches ruleId=<ruleId>, continues to sibling routes, and
// takes its group-by labels and repeat interval from postableRule.
func (crs *ChannelRoutingStrategy) createRuleSpecificChannelRoute(receiver string, ruleId string, postableRule ruletypes.PostableRule) *amconfig.Route {
	// Matcher construction errors are ignored, as before.
	byRule, _ := labels.NewMatcher(labels.MatchEqual, "ruleId", ruleId)

	return &amconfig.Route{
		Receiver:       receiver,
		Matchers:       crs.convertToConfigMatchers([]labels.Matcher{*byRule}),
		Continue:       true,
		GroupByStr:     postableRule.GroupBy,
		RepeatInterval: (*model.Duration)(&postableRule.Renotify),
	}
}
// parseMatchers turns a matcher expression into a list of matchers.
//
// The expression is cut at every '&' and '|'; each piece is trimmed of
// surrounding whitespace and parentheses and parsed as a single matcher.
// Blank input yields an empty, non-nil slice; empty pieces are skipped.
//
// NOTE(review): '&' and '|' are treated alike and a '|' inside a matcher
// value also splits the expression — confirm this is intended.
func (crs *ChannelRoutingStrategy) parseMatchers(matcherStr string) ([]labels.Matcher, error) {
	if strings.TrimSpace(matcherStr) == "" {
		return []labels.Matcher{}, nil
	}

	isOperator := func(r rune) bool {
		return strings.ContainsRune("&|", r)
	}

	var parsed []labels.Matcher
	for _, piece := range strings.FieldsFunc(matcherStr, isOperator) {
		piece = strings.Trim(strings.TrimSpace(piece), "()")
		if piece == "" {
			continue
		}

		m, err := compat.Matcher(piece, "noop")
		if err != nil {
			return nil, fmt.Errorf("invalid matcher %s: %w", piece, err)
		}
		parsed = append(parsed, *m)
	}

	return parsed, nil
}
// convertToConfigMatchers returns pointers to copies of the given matchers as
// amconfig.Matchers. The result is nil when matchers is empty.
func (crs *ChannelRoutingStrategy) convertToConfigMatchers(matchers []labels.Matcher) amconfig.Matchers {
	var converted amconfig.Matchers
	for i := range matchers {
		// Copy so the result does not alias the caller's slice elements.
		own := matchers[i]
		converted = append(converted, &own)
	}
	return converted
}
// findDefaultRuleSection returns the index, within the top-level routes, of
// the default rule route (the parent of all per-rule routes).
//
// When no such route exists, one is created and appended to the top-level
// routes, its receiver is registered, and the index of the new route is
// returned. The config's root route must be non-nil.
func (crs *ChannelRoutingStrategy) findDefaultRuleSection(config *alertmanagertypes.Config) int {
	routes := config.AlertmanagerConfig().Route.Routes
	for i, route := range routes {
		if crs.isDefaultRuleRoute(route) {
			return i
		}
	}

	// No default rule route yet: create it once and reuse the same instance
	// for both the route list and the receiver name.
	baseRoute := crs.createDefaultBaseRuleRoute()
	config.AlertmanagerConfig().Route.Routes = append(config.AlertmanagerConfig().Route.Routes, baseRoute)
	config.AlertmanagerConfig().Receivers = append(config.AlertmanagerConfig().Receivers, amconfig.Receiver{
		Name: baseRoute.Receiver,
	})
	// routes is the pre-append slice, so its length is the new route's index.
	return len(routes)
}
// isDefaultRuleRoute reports whether route is the default rule route, i.e.
// whether its receiver is exactly defaultRuleReceiverName.
func (crs *ChannelRoutingStrategy) isDefaultRuleRoute(route *amconfig.Route) bool {
	receiver := route.Receiver
	return receiver == defaultRuleReceiverName
}
// isNotificationRuleRoute reports whether route is a notification policy
// route, i.e. whether its receiver starts with the policy receiver prefix.
func (crs *ChannelRoutingStrategy) isNotificationRuleRoute(route *amconfig.Route) bool {
	receiver := route.Receiver
	return strings.HasPrefix(receiver, defaultNotificationPolicyReceiverName)
}
// isSystemMatcher reports whether name is one of the system/metadata matcher
// names: routing_strategy, route_id, org_id or created_by.
func (crs *ChannelRoutingStrategy) isSystemMatcher(name string) bool {
	switch name {
	case "routing_strategy", "route_id", "org_id", "created_by":
		return true
	default:
		return false
	}
}
// findAllNotificationPolicyRoutes returns the top-level routes that are
// notification policy routes, in config order. The result is nil when there
// are none. The config's root route must be non-nil.
func (crs *ChannelRoutingStrategy) findAllNotificationPolicyRoutes(config *alertmanagertypes.Config) []*amconfig.Route {
	var policies []*amconfig.Route
	for _, candidate := range config.AlertmanagerConfig().Route.Routes {
		if !crs.isNotificationRuleRoute(candidate) {
			continue
		}
		policies = append(policies, candidate)
	}
	return policies
}
// findPolicyChannelRoutes returns the top-level routes that contain all of
// policyMatchers and also carry a ruleId matcher. The result is nil when none
// qualify.
func (crs *ChannelRoutingStrategy) findPolicyChannelRoutes(config *alertmanagertypes.Config, policyMatchers []labels.Matcher) []*amconfig.Route {
	var matching []*amconfig.Route
	for _, candidate := range config.AlertmanagerConfig().Route.Routes {
		if !crs.routeMatchesPolicyMatchers(candidate, policyMatchers) {
			continue
		}
		if !crs.hasRuleIDMatcher(candidate) {
			continue
		}
		matching = append(matching, candidate)
	}
	return matching
}
// routeMatchesPolicyMatchers reports whether every policy matcher has a
// counterpart on route with the same name, value and type. It is true when
// policyMatchers is empty.
func (crs *ChannelRoutingStrategy) routeMatchesPolicyMatchers(route *amconfig.Route, policyMatchers []labels.Matcher) bool {
	for _, wanted := range policyMatchers {
		present := slices.ContainsFunc(route.Matchers, func(have *labels.Matcher) bool {
			return have.Name == wanted.Name &&
				have.Value == wanted.Value &&
				have.Type == wanted.Type
		})
		if !present {
			return false
		}
	}
	return true
}
// hasRuleIDMatcher reports whether route has a matcher named "ruleId".
func (crs *ChannelRoutingStrategy) hasRuleIDMatcher(route *amconfig.Route) bool {
	return slices.ContainsFunc(route.Matchers, func(m *labels.Matcher) bool {
		return m.Name == "ruleId"
	})
}
// hasSpecificRuleIDMatcher reports whether route has a matcher named "ruleId"
// whose value is exactly ruleId.
func (crs *ChannelRoutingStrategy) hasSpecificRuleIDMatcher(route *amconfig.Route, ruleId string) bool {
	return slices.ContainsFunc(route.Matchers, func(m *labels.Matcher) bool {
		return m.Name == "ruleId" && m.Value == ruleId
	})
}
// appendToRuleIDMatcher adds newRuleIDs to the first "ruleId" matcher of
// route. The matcher value is treated as a "|"-separated list; IDs already in
// the list are not added again. The matcher keeps its type.
//
// It returns an error when route has no ruleId matcher, or when the rebuilt
// matcher is invalid; in the latter case route is left unchanged rather than
// receiving a nil matcher.
func (crs *ChannelRoutingStrategy) appendToRuleIDMatcher(route *amconfig.Route, newRuleIDs []string) error {
	for i, matcher := range route.Matchers {
		if matcher.Name != "ruleId" {
			continue
		}

		existingRules := strings.Split(matcher.Value, "|")
		for _, newRule := range newRuleIDs {
			if !slices.Contains(existingRules, newRule) {
				existingRules = append(existingRules, newRule)
			}
		}

		newValue := strings.Join(existingRules, "|")
		newMatcher, err := labels.NewMatcher(matcher.Type, "ruleId", newValue)
		if err != nil {
			return fmt.Errorf("invalid ruleId matcher %q: %w", newValue, err)
		}
		route.Matchers[i] = newMatcher
		return nil
	}
	return fmt.Errorf("no ruleId matcher found in route")
}
// extractPolicyMatchers returns copies of route's matchers that are neither
// the "ruleId" matcher nor a system matcher. The result is nil when none remain.
func (crs *ChannelRoutingStrategy) extractPolicyMatchers(route *amconfig.Route) []labels.Matcher {
	var extracted []labels.Matcher
	for _, m := range route.Matchers {
		if m.Name == "ruleId" || crs.isSystemMatcher(m.Name) {
			continue
		}
		extracted = append(extracted, *m)
	}
	return extracted
}
// extractRuleIDs returns the rule IDs held by the route's first "ruleId"
// matcher, splitting its value on "|". A value without "|" comes back as a
// single-element slice. When the route has no "ruleId" matcher an empty,
// non-nil slice is returned.
func (crs *ChannelRoutingStrategy) extractRuleIDs(route *amconfig.Route) []string {
	for _, m := range route.Matchers {
		if m.Name != "ruleId" {
			continue
		}
		// Split already yields a one-element slice when the separator is absent.
		return strings.Split(m.Value, "|")
	}
	return []string{}
}
// extractChannels returns the receiver names a route delivers to. If the route
// itself names a receiver, only that one is returned; otherwise the non-empty
// receivers of its direct child routes are collected (nil when there are none).
func (crs *ChannelRoutingStrategy) extractChannels(route *amconfig.Route) []string {
	if receiver := route.Receiver; receiver != "" {
		return []string{receiver}
	}

	var receivers []string
	for _, child := range route.Routes {
		if child.Receiver == "" {
			continue
		}
		receivers = append(receivers, child.Receiver)
	}
	return receivers
}
// removeFromRuleIDMatcher removes ruleIdToRemove from the first "ruleId"
// matcher of the route. The matcher value is treated as a "|"-separated list.
//
// If other rule IDs remain, the matcher is rebuilt with the remaining IDs; if
// none remain, the matcher is removed from the route altogether.
//
// It returns an error when the route has no "ruleId" matcher or when the
// rebuilt matcher cannot be constructed; in the latter case the route is left
// unchanged, so a failed rebuild never stores a nil matcher in the route.
func (crs *ChannelRoutingStrategy) removeFromRuleIDMatcher(route *amconfig.Route, ruleIdToRemove string) error {
	for i, matcher := range route.Matchers {
		if matcher.Name != "ruleId" {
			continue
		}

		var remaining []string
		for _, id := range strings.Split(matcher.Value, "|") {
			if id != ruleIdToRemove {
				remaining = append(remaining, id)
			}
		}

		if len(remaining) == 0 {
			// No rule references this route any more: drop the matcher.
			route.Matchers = append(route.Matchers[:i], route.Matchers[i+1:]...)
			return nil
		}

		updated, err := labels.NewMatcher(matcher.Type, "ruleId", strings.Join(remaining, "|"))
		if err != nil {
			return fmt.Errorf("rebuilding ruleId matcher: %w", err)
		}
		route.Matchers[i] = updated
		return nil
	}
	return fmt.Errorf("no ruleId matcher found in route")
}
// removeChannelFromRoutes walks the route tree and, for every route that has a
// non-nil child list, drops the children whose receiver equals channelName.
// The routes in the slice passed in are never removed themselves; only their
// descendants are filtered. A route whose children are all removed ends up
// with a nil child list.
func (crs *ChannelRoutingStrategy) removeChannelFromRoutes(routes []*amconfig.Route, channelName string) {
	for _, parent := range routes {
		if parent.Routes == nil {
			continue
		}

		var kept []*amconfig.Route
		for _, child := range parent.Routes {
			if child.Receiver == channelName {
				continue
			}
			kept = append(kept, child)
		}
		parent.Routes = kept

		// Descend into the surviving children.
		crs.removeChannelFromRoutes(kept, channelName)
	}
}
// cleanupEmptyPolicyRoutes rewrites the top-level route list so that it keeps
// only routes that are default rule routes (per isDefaultRuleRoute) or that
// still have at least one child route. Everything else is dropped.
func (crs *ChannelRoutingStrategy) cleanupEmptyPolicyRoutes(config *alertmanagertypes.Config) {
	var kept []*amconfig.Route
	for _, r := range config.AlertmanagerConfig().Route.Routes {
		if crs.isDefaultRuleRoute(r) || len(r.Routes) > 0 {
			kept = append(kept, r)
		}
	}
	config.AlertmanagerConfig().Route.Routes = kept
}

File diff suppressed because it is too large Load Diff

View File

@@ -172,16 +172,22 @@ func (service *Service) newServer(ctx context.Context, orgID string) (*alertmana
return nil, err
}
beforeCompareAndSelectHash := config.StoreableConfig().Hash
config, err = service.compareAndSelectConfig(ctx, config)
if err != nil {
return nil, err
}
// Note: The new routing system is rule-centric rather than matcher-centric.
// Individual rules will add themselves to routes when they are created/updated
// using the AddRuleToRoutes method, so we don't need to recreate all matchers here.
// This function now primarily ensures the base alertmanager config structure
// (receivers, global config) is up to date with the channel configuration.
if beforeCompareAndSelectHash == config.StoreableConfig().Hash {
service.settings.Logger().DebugContext(ctx, "skipping config store update for org", "org_id", orgID, "hash", config.StoreableConfig().Hash)
return server, nil
}
//beforeCompareAndSelectHash := config.StoreableConfig().Hash
//config, err = service.compareAndSelectConfig(ctx, config)
//if err != nil {
// return nil, err
//}
//if beforeCompareAndSelectHash == config.StoreableConfig().Hash {
// service.settings.Logger().DebugContext(ctx, "skipping config store update for org", "org_id", orgID, "hash", config.StoreableConfig().Hash)
// return server, nil
//}
err = service.configStore.Set(ctx, config)
if err != nil {

View File

@@ -2,6 +2,8 @@ package signozalertmanager
import (
"context"
"github.com/prometheus/alertmanager/featurecontrol"
"github.com/prometheus/alertmanager/matcher/compat"
"time"
"github.com/SigNoz/signoz/pkg/alertmanager"
@@ -54,6 +56,7 @@ func New(ctx context.Context, providerSettings factory.ProviderSettings, config
}
func (provider *provider) Start(ctx context.Context) error {
compat.InitFromFlags(provider.settings.Logger(), featurecontrol.NoopFlags{})
if err := provider.service.SyncServers(ctx); err != nil {
provider.settings.Logger().ErrorContext(ctx, "failed to sync alertmanager servers", "error", err)
return err
@@ -131,20 +134,11 @@ func (provider *provider) UpdateChannelByReceiverAndID(ctx context.Context, orgI
}
func (provider *provider) DeleteChannelByID(ctx context.Context, orgID string, channelID valuer.UUID) error {
channel, err := provider.configStore.GetChannelByID(ctx, orgID, channelID)
if err != nil {
return err
}
config, err := provider.configStore.Get(ctx, orgID)
if err != nil {
return err
}
if err := config.DeleteReceiver(channel.Name); err != nil {
return err
}
return provider.configStore.DeleteChannelByID(ctx, orgID, channelID, alertmanagertypes.WithCb(func(ctx context.Context) error {
return provider.configStore.Set(ctx, config)
}))

150
pkg/nfrouting/api.go Normal file
View File

@@ -0,0 +1,150 @@
package nfrouting
import (
"encoding/json"
"fmt"
"github.com/SigNoz/signoz/pkg/valuer"
"net/http"
"github.com/SigNoz/signoz/pkg/alertmanager"
"github.com/SigNoz/signoz/pkg/analytics"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/http/render"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/types/nfroutingtypes"
"github.com/gorilla/mux"
)
// API serves the notification-route HTTP endpoints.
type API struct {
	// analytics is stored by NewAPI.
	analytics analytics.Analytics
	// routeStore persists and loads expression routes.
	routeStore nfroutingtypes.RouteStore
	// routeManager adds and deletes the notification policy that corresponds
	// to a stored route.
	routeManager *alertmanager.RouteManager
}
// NewAPI wires the given dependencies into a new API value.
func NewAPI(analytics analytics.Analytics, routeStore nfroutingtypes.RouteStore, routeManager *alertmanager.RouteManager) *API {
	api := new(API)
	api.analytics = analytics
	api.routeStore = routeStore
	api.routeManager = routeManager
	return api
}
// CreateNotificationRoute handles creation of an expression route.
//
// It decodes and validates the request body, stores the route for the caller's
// organisation (taken from the auth claims), then registers the matching
// notification policy with the route manager. On success it responds with
// 201 Created and the stored route.
//
// Any failure is rendered as an error and ends the request; in particular a
// failure to register the notification policy no longer falls through to the
// success response.
//
// NOTE(review): when registering the policy fails, the route has already been
// written to the store and is not rolled back here.
func (api *API) CreateNotificationRoute(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()

	claims, err := authtypes.ClaimsFromContext(ctx)
	if err != nil {
		render.Error(w, err)
		return
	}

	orgID, err := valuer.NewUUID(claims.OrgID)
	if err != nil {
		render.Error(w, err)
		return
	}

	var route nfroutingtypes.ExpressionRouteRequest
	if err := json.NewDecoder(r.Body).Decode(&route); err != nil {
		render.Error(w, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid request body: %v", err))
		return
	}
	if err := route.Validate(); err != nil {
		render.Error(w, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid request body: %v", err))
		return
	}

	expressionRoute := route.ToExpressionRoute(claims.OrgID, claims.UserID)
	routeID, err := api.routeStore.Create(ctx, expressionRoute)
	if err != nil {
		render.Error(w, fmt.Errorf("failed to create route: %v", err))
		return
	}
	expressionRoute.Identifiable.ID = routeID

	err = api.routeManager.AddNotificationPolicy(ctx, orgID.StringValue(), expressionRoute.Expression, expressionRoute.Channels, expressionRoute.ID.StringValue())
	if err != nil {
		render.Error(w, err)
		return
	}

	// Let render.Success write the status line exactly once, as 201.
	w.Header().Set("Content-Type", "application/json")
	render.Success(w, http.StatusCreated, expressionRoute)
}
// GetNotificationRouteByID returns the route identified by the "id" path
// variable. A missing or malformed ID is reported as invalid input; a store
// failure is rendered as an error.
//
// NOTE(review): the lookup is by ID only and is not restricted to the caller's
// organisation here; confirm whether the store or middleware enforces that.
func (api *API) GetNotificationRouteByID(w http.ResponseWriter, r *http.Request) {
	rawID := mux.Vars(r)["id"]
	if rawID == "" {
		render.Error(w, errors.NewInvalidInputf(errors.CodeInvalidInput, "route ID is required"))
		return
	}

	routeID, err := valuer.NewUUID(rawID)
	if err != nil {
		render.Error(w, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid route ID: %v", err))
		return
	}

	route, err := api.routeStore.GetByID(r.Context(), routeID)
	if err != nil {
		render.Error(w, fmt.Errorf("failed to get route: %v", err))
		return
	}

	w.Header().Set("Content-Type", "application/json")
	render.Success(w, http.StatusOK, route)
}
// GetAllNotificationRoutesByOrgID lists the routes of the caller's
// organisation.
//
// The organisation is taken from the authenticated claims, as the create and
// delete handlers do, rather than from a caller-supplied "org_id" query
// parameter; any such parameter is ignored so that a caller cannot read
// another organisation's routes.
//
// The response body is the JSON array of routes followed by a newline. The
// payload is marshalled before anything is written so that an encoding failure
// can still be reported as an error.
func (api *API) GetAllNotificationRoutesByOrgID(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()

	claims, err := authtypes.ClaimsFromContext(ctx)
	if err != nil {
		render.Error(w, err)
		return
	}

	routes, err := api.routeStore.GetAllByOrgID(ctx, claims.OrgID)
	if err != nil {
		render.Error(w, fmt.Errorf("failed to get routes: %v", err))
		return
	}

	body, err := json.Marshal(routes)
	if err != nil {
		render.Error(w, fmt.Errorf("failed to encode routes: %v", err))
		return
	}

	w.Header().Set("Content-Type", "application/json")
	// Trailing newline keeps the output identical to json.Encoder's.
	_, _ = w.Write(append(body, '\n'))
}
// DeleteNotificationRouteByID deletes the route named by the "id" path
// variable from the store and then removes its notification policy via the
// route manager. It responds with 204 No Content on success.
//
// NOTE(review): the store row is deleted before the policy; if policy deletion
// fails the route is already gone from the store.
func (api *API) DeleteNotificationRouteByID(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()

	claims, err := authtypes.ClaimsFromContext(ctx)
	if err != nil {
		render.Error(w, err)
		return
	}

	orgID, err := valuer.NewUUID(claims.OrgID)
	if err != nil {
		render.Error(w, err)
		return
	}

	id := mux.Vars(r)["id"]
	if id == "" {
		render.Error(w, errors.NewInvalidInputf(errors.CodeInvalidInput, "route ID is required"))
		return
	}

	if err := api.routeStore.Delete(ctx, id); err != nil {
		render.Error(w, fmt.Errorf("failed to delete route: %v", err))
		return
	}

	if err := api.routeManager.DeleteNotificationPolicy(ctx, orgID.StringValue(), id); err != nil {
		// The message previously read "failed to get route", which described
		// the wrong operation.
		render.Error(w, errors.NewNotFoundf(errors.CodeNotFound, "failed to delete notification policy: %v", err))
		return
	}

	w.WriteHeader(http.StatusNoContent)
}

24
pkg/nfrouting/config.go Normal file
View File

@@ -0,0 +1,24 @@
package nfrouting
import "github.com/SigNoz/signoz/pkg/factory"
// Config holds the notification-routing settings.
type Config struct {
	// Provider names the routing provider to use; it is read from the
	// "provider" configuration key.
	Provider string `mapstructure:"provider"`
}
// NewConfigFactory returns the config factory registered under the name
// "nfrouting", producing defaults via newConfig.
func NewConfigFactory() factory.ConfigFactory {
	name := factory.MustNewName("nfrouting")
	return factory.NewConfigFactory(name, newConfig)
}
// newConfig returns the default notification-routing configuration, which
// selects the "expression" provider.
func newConfig() factory.Config {
	var cfg Config
	cfg.Provider = "expression"
	return cfg
}
// Validate validates the configuration and returns an error if invalid.
// It currently performs no checks and always returns nil.
func (c Config) Validate() error {
	// Add validation logic here if needed
	return nil
}

View File

@@ -0,0 +1,36 @@
package expressionroutes
import (
"context"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/nfrouting"
"github.com/SigNoz/signoz/pkg/types/nfroutingtypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
// provider is the expression-based notification-routing provider.
type provider struct {
	// routeStore is the route store handed to New.
	routeStore nfroutingtypes.RouteStore
}
// NewFactory returns the provider factory registered under the name
// "expression". Providers it builds share the given route store; the supplied
// config is not consulted.
func NewFactory(store nfroutingtypes.RouteStore) factory.ProviderFactory[nfrouting.NotificationRoutes, nfrouting.Config] {
	build := func(ctx context.Context, settings factory.ProviderSettings, config nfrouting.Config) (nfrouting.NotificationRoutes, error) {
		return New(ctx, settings, store)
	}
	return factory.NewProviderFactory(factory.MustNewName("expression"), build)
}
// New creates an expression-based notification-routing provider backed by the
// given route store. It never returns a non-nil error.
func New(ctx context.Context, providerSettings factory.ProviderSettings, store nfroutingtypes.RouteStore) (nfrouting.NotificationRoutes, error) {
	// The scoped settings are constructed but currently unused.
	_ = factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/nfrouting/expressionroutes")

	p := &provider{routeStore: store}
	return p, nil
}
// Collect is part of the stats-collector contract embedded in
// nfrouting.NotificationRoutes. It currently reports nothing: both the stats
// map and the error are always nil.
func (p *provider) Collect(context.Context, valuer.UUID) (map[string]any, error) {
	return nil, nil
}

View File

@@ -0,0 +1,210 @@
package expressionroutes
import (
"context"
"testing"
"github.com/SigNoz/signoz/pkg/factory/factorytest"
"github.com/SigNoz/signoz/pkg/types/nfroutingtypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// mockRouteStore implements nfroutingtypes.RouteStore for testing.
// It keeps everything in memory.
type mockRouteStore struct {
	// routes maps an organisation ID to the routes created for it.
	routes map[string][]nfroutingtypes.ExpressionRoute
}
// newMockRouteStore returns an empty in-memory route store.
func newMockRouteStore() *mockRouteStore {
	store := new(mockRouteStore)
	store.routes = make(map[string][]nfroutingtypes.ExpressionRoute)
	return store
}
// GetByID searches every organisation for a route with the given ID and
// returns a pointer to a copy of it. When nothing matches, both the route and
// the error are nil.
func (m *mockRouteStore) GetByID(ctx context.Context, id valuer.UUID) (*nfroutingtypes.ExpressionRoute, error) {
	want := id.StringValue()
	for _, orgRoutes := range m.routes {
		for _, candidate := range orgRoutes {
			if candidate.ID.StringValue() != want {
				continue
			}
			found := candidate
			return &found, nil
		}
	}
	return nil, nil
}
// Create appends a copy of the route to its organisation's list and returns
// the route's own ID. It never fails.
func (m *mockRouteStore) Create(ctx context.Context, route *nfroutingtypes.ExpressionRoute) (valuer.UUID, error) {
	org := route.OrgID
	// append allocates the slice on first use for an organisation.
	m.routes[org] = append(m.routes[org], *route)
	return route.ID, nil
}
// Update replaces the stored route that has the same organisation and ID as
// the given one. It returns nil whether or not a matching route was found.
func (m *mockRouteStore) Update(ctx context.Context, route *nfroutingtypes.ExpressionRoute) error {
	orgRoutes, ok := m.routes[route.OrgID]
	if !ok {
		return nil
	}

	target := route.ID.StringValue()
	for i := range orgRoutes {
		if orgRoutes[i].ID.StringValue() == target {
			m.routes[route.OrgID][i] = *route
			return nil
		}
	}
	return nil
}
// Delete removes the first route, in any organisation, whose ID string equals
// id. It returns nil whether or not a route was removed.
func (m *mockRouteStore) Delete(ctx context.Context, id string) error {
	for orgID := range m.routes {
		list := m.routes[orgID]
		for i := range list {
			if list[i].ID.StringValue() != id {
				continue
			}
			m.routes[orgID] = append(list[:i], list[i+1:]...)
			return nil
		}
	}
	return nil
}
// GetAllByOrgID returns the routes stored for orgID, or an empty non-nil
// slice when the organisation has no entry in the map.
func (m *mockRouteStore) GetAllByOrgID(ctx context.Context, orgID string) ([]nfroutingtypes.ExpressionRoute, error) {
	if orgRoutes, ok := m.routes[orgID]; ok {
		return orgRoutes, nil
	}
	return []nfroutingtypes.ExpressionRoute{}, nil
}
func TestNew(t *testing.T) {
ctx := context.Background()
mockStore := newMockRouteStore()
settings := factorytest.NewSettings()
provider, err := New(ctx, settings, mockStore)
require.NoError(t, err)
require.NotNil(t, provider)
// Test that it implements the statsreporter.StatsCollector interface
stats, err := provider.Collect(ctx, valuer.GenerateUUID())
require.NoError(t, err)
assert.Nil(t, stats) // Current implementation returns nil
}
func TestNewFactory(t *testing.T) {
mockStore := newMockRouteStore()
factory := NewFactory(mockStore)
require.NotNil(t, factory)
assert.Equal(t, "expression", factory.Name().String())
}
func TestProvider_Collect(t *testing.T) {
ctx := context.Background()
mockStore := newMockRouteStore()
settings := factorytest.NewSettings()
provider, err := New(ctx, settings, mockStore)
require.NoError(t, err)
// Test Collect method
uuid := valuer.GenerateUUID()
stats, err := provider.Collect(ctx, uuid)
require.NoError(t, err)
assert.Nil(t, stats) // Current implementation returns nil stats
}
// TestRouteStore_CRUD_Operations walks one route through the mock store's
// full lifecycle: create, read by ID, list by organisation, update, delete,
// and a final list that must be empty. Each step relies on the state left by
// the previous one.
func TestRouteStore_CRUD_Operations(t *testing.T) {
	ctx := context.Background()
	mockStore := newMockRouteStore()
	// Test Create
	route := &nfroutingtypes.ExpressionRoute{
		Expression: `labels["severity"] == "critical"`,
		Channels:   []string{"slack-alerts"},
		Priority:   "high",
		Name:       "Critical Alert Route",
		OrgID:      "test-org-1",
	}
	route.ID = valuer.GenerateUUID()
	_, err := mockStore.Create(ctx, route)
	require.NoError(t, err)
	// Test GetByID
	retrieved, err := mockStore.GetByID(ctx, route.ID)
	require.NoError(t, err)
	require.NotNil(t, retrieved)
	assert.Equal(t, route.Expression, retrieved.Expression)
	assert.Equal(t, route.Channels, retrieved.Channels)
	assert.Equal(t, route.OrgID, retrieved.OrgID)
	// Test GetAllByOrgID
	routes, err := mockStore.GetAllByOrgID(ctx, "test-org-1")
	require.NoError(t, err)
	assert.Len(t, routes, 1)
	assert.Equal(t, route.Expression, routes[0].Expression)
	// Test Update
	// The store holds a copy, so mutating route only takes effect via Update.
	route.Expression = `labels["severity"] == "warning"`
	err = mockStore.Update(ctx, route)
	require.NoError(t, err)
	updated, err := mockStore.GetByID(ctx, route.ID)
	require.NoError(t, err)
	assert.Equal(t, `labels["severity"] == "warning"`, updated.Expression)
	// Test Delete
	err = mockStore.Delete(ctx, route.ID.StringValue())
	require.NoError(t, err)
	// The mock reports a missing route as (nil, nil) rather than an error.
	deleted, err := mockStore.GetByID(ctx, route.ID)
	require.NoError(t, err)
	assert.Nil(t, deleted)
	// Test GetAllByOrgID after delete
	routes, err = mockStore.GetAllByOrgID(ctx, "test-org-1")
	require.NoError(t, err)
	assert.Len(t, routes, 0)
}
// TestRouteStore_MultipleOrganizations checks that routes created for one
// organisation are not returned for another, and that an unknown organisation
// yields an empty list.
func TestRouteStore_MultipleOrganizations(t *testing.T) {
	ctx := context.Background()
	store := newMockRouteStore()

	authRoute := &nfroutingtypes.ExpressionRoute{
		Expression: `labels["service"] == "auth"`,
		Channels:   []string{"auth-team"},
		OrgID:      "org-1",
	}
	authRoute.ID = valuer.GenerateUUID()

	paymentRoute := &nfroutingtypes.ExpressionRoute{
		Expression: `labels["service"] == "payment"`,
		Channels:   []string{"payment-team"},
		OrgID:      "org-2",
	}
	paymentRoute.ID = valuer.GenerateUUID()

	for _, route := range []*nfroutingtypes.ExpressionRoute{authRoute, paymentRoute} {
		_, err := store.Create(ctx, route)
		require.NoError(t, err)
	}

	// Each organisation must see exactly its own route.
	expectations := []struct {
		orgID   string
		channel string
	}{
		{orgID: "org-1", channel: "auth-team"},
		{orgID: "org-2", channel: "payment-team"},
	}
	for _, want := range expectations {
		got, err := store.GetAllByOrgID(ctx, want.orgID)
		require.NoError(t, err)
		assert.Len(t, got, 1)
		assert.Equal(t, want.channel, got[0].Channels[0])
	}

	// An organisation that never created a route gets an empty result.
	none, err := store.GetAllByOrgID(ctx, "nonexistent-org")
	require.NoError(t, err)
	assert.Len(t, none, 0)
}

View File

@@ -0,0 +1,7 @@
package nfrouting
import "github.com/SigNoz/signoz/pkg/statsreporter"
// NotificationRoutes is the interface a notification-routing provider
// implements. At present it consists solely of the embedded
// statsreporter.StatsCollector.
type NotificationRoutes interface {
	statsreporter.StatsCollector
}

View File

@@ -0,0 +1,62 @@
package sqlroutingstore
import (
"context"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/sqlstore"
routeTypes "github.com/SigNoz/signoz/pkg/types/nfroutingtypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
// store is the SQL-backed implementation of the route store.
type store struct {
	// sqlstore provides the Bun database handle and the error-wrapping helpers
	// used by the methods.
	sqlstore sqlstore.SQLStore
}
// NewStore returns a route store backed by the given SQL store.
func NewStore(sqlstore sqlstore.SQLStore) routeTypes.RouteStore {
	s := new(store)
	s.sqlstore = sqlstore
	return s
}
// GetByID loads the expression route with the given ID. A scan failure is
// passed through the SQL store's not-found wrapper before being returned.
func (store *store) GetByID(ctx context.Context, id valuer.UUID) (*routeTypes.ExpressionRoute, error) {
	var route routeTypes.ExpressionRoute

	query := store.sqlstore.BunDB().NewSelect().Model(&route).Where("id = ?", id)
	if err := query.Scan(ctx); err != nil {
		return nil, store.sqlstore.WrapNotFoundErrf(err, errors.CodeNotFound, "expression route with ID: %s does not exist", id)
	}
	return &route, nil
}
// Create inserts the route and returns its ID. An insert failure is wrapped as
// an internal error, matching how Update and Delete report theirs, and the
// zero UUID is returned alongside it.
func (store *store) Create(ctx context.Context, route *routeTypes.ExpressionRoute) (valuer.UUID, error) {
	if _, err := store.sqlstore.BunDB().NewInsert().Model(route).Exec(ctx); err != nil {
		return valuer.UUID{}, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "unable to create expression route with ID: %s", route.ID)
	}
	return route.ID, nil
}
// Update writes the route back to the database, selecting the row by primary
// key. A failure is wrapped as an internal error.
func (store *store) Update(ctx context.Context, route *routeTypes.ExpressionRoute) error {
	query := store.sqlstore.BunDB().NewUpdate().Model(route).WherePK()
	if _, err := query.Exec(ctx); err != nil {
		return errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "unable to update expression route with ID: %s", route.ID)
	}
	return nil
}
// Delete removes the expression route whose id column equals id. A failure is
// wrapped as an internal error.
func (store *store) Delete(ctx context.Context, id string) error {
	query := store.sqlstore.BunDB().NewDelete().Model((*routeTypes.ExpressionRoute)(nil)).Where("id = ?", id)
	if _, err := query.Exec(ctx); err != nil {
		return errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "unable to delete expression route with ID: %s", id)
	}
	return nil
}
// GetAllByOrgID loads every expression route whose org_id equals orgID. A scan
// failure is passed through the SQL store's not-found wrapper.
func (store *store) GetAllByOrgID(ctx context.Context, orgID string) ([]routeTypes.ExpressionRoute, error) {
	var routes []routeTypes.ExpressionRoute

	query := store.sqlstore.BunDB().NewSelect().Model(&routes).Where("org_id = ?", orgID)
	if err := query.Scan(ctx); err != nil {
		return nil, store.sqlstore.WrapNotFoundErrf(err, errors.CodeNotFound, "unable to fetch expression routes for orgID: %s", orgID)
	}
	return routes, nil
}

View File

@@ -41,6 +41,7 @@ import (
"github.com/SigNoz/signoz/pkg/contextlinks"
traceFunnelsModule "github.com/SigNoz/signoz/pkg/modules/tracefunnel"
"github.com/SigNoz/signoz/pkg/nfrouting"
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations"
"github.com/SigNoz/signoz/pkg/query-service/app/inframetrics"
@@ -148,6 +149,8 @@ type APIHandler struct {
QuerierAPI *querierAPI.API
Signoz *signoz.SigNoz
NotificationRoutesAPI *nfrouting.API
}
type APIHandlerOpts struct {
@@ -178,6 +181,8 @@ type APIHandlerOpts struct {
QuerierAPI *querierAPI.API
Signoz *signoz.SigNoz
NotificationRoutesAPI *nfrouting.API
}
// NewAPIHandler returns an APIHandler
@@ -239,6 +244,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
Signoz: opts.Signoz,
FieldsAPI: opts.FieldsAPI,
QuerierAPI: opts.QuerierAPI,
NotificationRoutesAPI: opts.NotificationRoutesAPI,
}
logsQueryBuilder := logsv4.PrepareLogsQuery
@@ -310,7 +316,7 @@ func RespondError(w http.ResponseWriter, apiErr model.BaseApiError, data interfa
})
if err != nil {
zap.L().Error("error marshalling json response", zap.Error(err))
http.Error(w, err.Error(), http.StatusInternalServerError)
render.Error(w, err)
return
}
@@ -354,7 +360,7 @@ func writeHttpResponse(w http.ResponseWriter, data interface{}) {
})
if err != nil {
zap.L().Error("error marshalling json response", zap.Error(err))
http.Error(w, err.Error(), http.StatusInternalServerError)
render.Error(w, err)
return
}
@@ -617,6 +623,11 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router, am *middleware.AuthZ) {
router.HandleFunc("/api/v3/licenses/active", am.ViewAccess(func(rw http.ResponseWriter, req *http.Request) {
aH.LicensingAPI.Activate(rw, req)
})).Methods(http.MethodGet)
router.HandleFunc("/api/v1/notification-routes", am.ViewAccess(aH.NotificationRoutesAPI.GetAllNotificationRoutesByOrgID)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/notification-routes/{id}", am.ViewAccess(aH.NotificationRoutesAPI.GetNotificationRouteByID)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/notification-routes", am.AdminAccess(aH.NotificationRoutesAPI.CreateNotificationRoute)).Methods(http.MethodPost)
router.HandleFunc("/api/v1/notification-routes/{id}", am.AdminAccess(aH.NotificationRoutesAPI.DeleteNotificationRouteByID)).Methods(http.MethodDelete)
}
func (ah *APIHandler) MetricExplorerRoutes(router *mux.Router, am *middleware.AuthZ) {

View File

@@ -3,6 +3,9 @@ package app
import (
"context"
"fmt"
"github.com/SigNoz/signoz/pkg/nfrouting"
"github.com/SigNoz/signoz/pkg/ruler/rulestore/sqlrulestore"
"github.com/SigNoz/signoz/pkg/types/ruletypes"
"log/slog"
"net"
"net/http"
@@ -16,7 +19,6 @@ import (
"github.com/SigNoz/signoz/pkg/licensing/nooplicensing"
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/prometheus"
"github.com/SigNoz/signoz/pkg/querier"
querierAPI "github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
"github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader"
@@ -85,6 +87,9 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz, jwt *authtypes.JWT)
signoz.Cache,
)
ruleStore := sqlrulestore.NewRuleStore(signoz.SQLStore)
routeManager := alertmanager.NewManagerWithChannelRoutingStrategy(signoz.Alertmanager, ruleStore, signoz.RouteStore)
rm, err := makeRulesManager(
reader,
signoz.Cache,
@@ -95,6 +100,8 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz, jwt *authtypes.JWT)
signoz.Modules.OrgGetter,
signoz.Querier,
signoz.Instrumentation.Logger(),
ruleStore,
routeManager,
)
if err != nil {
return nil, err
@@ -115,11 +122,12 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz, jwt *authtypes.JWT)
CloudIntegrationsController: cloudIntegrationsController,
LogsParsingPipelineController: logParsingPipelineController,
FluxInterval: config.Querier.FluxInterval,
AlertmanagerAPI: alertmanager.NewAPI(signoz.Alertmanager),
AlertmanagerAPI: alertmanager.NewAPI(signoz.Alertmanager, routeManager),
LicensingAPI: nooplicensing.NewLicenseAPI(),
FieldsAPI: fields.NewAPI(signoz.Instrumentation.ToProviderSettings(), signoz.TelemetryStore),
Signoz: signoz,
QuerierAPI: querierAPI.NewAPI(signoz.Instrumentation.ToProviderSettings(), signoz.Querier, signoz.Analytics),
NotificationRoutesAPI: nfrouting.NewAPI(signoz.Analytics, signoz.RouteStore, routeManager),
})
if err != nil {
return nil, err
@@ -380,17 +388,7 @@ func (s *Server) Stop(ctx context.Context) error {
return nil
}
func makeRulesManager(
ch interfaces.Reader,
cache cache.Cache,
alertmanager alertmanager.Alertmanager,
sqlstore sqlstore.SQLStore,
telemetryStore telemetrystore.TelemetryStore,
prometheus prometheus.Prometheus,
orgGetter organization.Getter,
querier querier.Querier,
logger *slog.Logger,
) (*rules.Manager, error) {
func makeRulesManager(ch interfaces.Reader, cache cache.Cache, alertmanager alertmanager.Alertmanager, sqlstore sqlstore.SQLStore, telemetryStore telemetrystore.TelemetryStore, prometheus prometheus.Prometheus, orgGetter organization.Getter, querier querierAPI.Querier, logger *slog.Logger, ruleStore ruletypes.RuleStore, routeManager *alertmanager.RouteManager) (*rules.Manager, error) {
// create manager opts
managerOpts := &rules.ManagerOptions{
TelemetryStore: telemetryStore,
@@ -405,9 +403,11 @@ func makeRulesManager(
SQLStore: sqlstore,
OrgGetter: orgGetter,
Alertmanager: alertmanager,
RoutingManager: routeManager,
RuleStore: ruleStore,
}
// create Manager
// create RouteManager
manager, err := rules.NewManager(managerOpts)
if err != nil {
return nil, fmt.Errorf("rule manager error: %v", err)

View File

@@ -123,7 +123,13 @@ func NewBaseRule(id string, orgID valuer.UUID, p *ruletypes.PostableRule, reader
if p.RuleCondition == nil || !p.RuleCondition.IsValid() {
return nil, fmt.Errorf("invalid rule condition")
}
// TODO: remove once multi-threshold support lands
if p.NotificationPolicies {
p.Labels["notification_policy"] = "true"
} else {
p.Labels["notification_policy"] = "false"
}
p.Labels["threshold.name"] = p.Labels["severity"]
baseRule := &BaseRule{
id: id,
orgID: orgID,

View File

@@ -14,8 +14,6 @@ import (
"errors"
"github.com/go-openapi/strfmt"
"github.com/SigNoz/signoz/pkg/alertmanager"
"github.com/SigNoz/signoz/pkg/cache"
"github.com/SigNoz/signoz/pkg/modules/organization"
@@ -31,6 +29,7 @@ import (
"github.com/SigNoz/signoz/pkg/types/authtypes"
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/go-openapi/strfmt"
)
type PrepareTaskOptions struct {
@@ -101,6 +100,8 @@ type ManagerOptions struct {
Alertmanager alertmanager.Alertmanager
SQLStore sqlstore.SQLStore
OrgGetter organization.Getter
RoutingManager *alertmanager.RouteManager
RuleStore ruletypes.RuleStore
}
// The Manager manages recording and alerting rules.
@@ -120,9 +121,10 @@ type Manager struct {
prepareTaskFunc func(opts PrepareTaskOptions) (Task, error)
prepareTestRuleFunc func(opts PrepareTestRuleOptions) (int, *model.ApiError)
alertmanager alertmanager.Alertmanager
sqlstore sqlstore.SQLStore
orgGetter organization.Getter
alertmanager alertmanager.Alertmanager
sqlstore sqlstore.SQLStore
orgGetter organization.Getter
routingManager *alertmanager.RouteManager
}
func defaultOptions(o *ManagerOptions) *ManagerOptions {
@@ -202,13 +204,12 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
// by calling the Run method.
func NewManager(o *ManagerOptions) (*Manager, error) {
o = defaultOptions(o)
ruleStore := sqlrulestore.NewRuleStore(o.SQLStore)
maintenanceStore := sqlrulestore.NewMaintenanceStore(o.SQLStore)
m := &Manager{
tasks: map[string]Task{},
rules: map[string]Rule{},
ruleStore: ruleStore,
ruleStore: o.RuleStore,
maintenanceStore: maintenanceStore,
opts: o,
block: make(chan struct{}),
@@ -220,6 +221,7 @@ func NewManager(o *ManagerOptions) (*Manager, error) {
alertmanager: o.Alertmanager,
sqlstore: o.SQLStore,
orgGetter: o.OrgGetter,
routingManager: o.RoutingManager,
}
return m, nil
@@ -351,33 +353,11 @@ func (m *Manager) EditRule(ctx context.Context, ruleStr string, id valuer.UUID)
existingRule.Data = ruleStr
return m.ruleStore.EditRule(ctx, existingRule, func(ctx context.Context) error {
cfg, err := m.alertmanager.GetConfig(ctx, claims.OrgID)
if err != nil {
return err
}
var preferredChannels []string
if len(parsedRule.PreferredChannels) == 0 {
channels, err := m.alertmanager.ListChannels(ctx, claims.OrgID)
if err != nil {
return err
}
for _, channel := range channels {
preferredChannels = append(preferredChannels, channel.Name)
}
if parsedRule.NotificationPolicies {
err = m.routingManager.AddNotificationPolicyRules(ctx, orgID.StringValue(), id.StringValue(), *parsedRule)
} else {
preferredChannels = parsedRule.PreferredChannels
}
err = cfg.UpdateRuleIDMatcher(id.StringValue(), preferredChannels)
if err != nil {
return err
}
err = m.alertmanager.SetConfig(ctx, cfg)
if err != nil {
return err
err = m.routingManager.AddDirectRules(ctx, orgID.StringValue(), id.StringValue(), *parsedRule)
}
err = m.syncRuleStateWithTask(ctx, orgID, prepareTaskName(existingRule.ID.StringValue()), parsedRule)
@@ -457,25 +437,21 @@ func (m *Manager) DeleteRule(ctx context.Context, idStr string) error {
return err
}
_, err = m.ruleStore.GetStoredRule(ctx, id)
orgID, err := valuer.NewUUID(claims.OrgID)
if err != nil {
return err
}
storedRule, err := m.GetRule(ctx, id)
if err != nil {
return err
}
return m.ruleStore.DeleteRule(ctx, id, func(ctx context.Context) error {
cfg, err := m.alertmanager.GetConfig(ctx, claims.OrgID)
if err != nil {
return err
}
err = cfg.DeleteRuleIDMatcher(id.StringValue())
if err != nil {
return err
}
err = m.alertmanager.SetConfig(ctx, cfg)
if err != nil {
return err
if storedRule.NotificationPolicies {
err = m.routingManager.DeleteNotificationPolicyRules(ctx, orgID.StringValue(), id.StringValue())
} else {
err = m.routingManager.DeleteDirectRules(ctx, orgID.StringValue(), id.StringValue())
}
taskName := prepareTaskName(id.StringValue())
@@ -537,31 +513,12 @@ func (m *Manager) CreateRule(ctx context.Context, ruleStr string) (*ruletypes.Ge
}
id, err := m.ruleStore.CreateRule(ctx, storedRule, func(ctx context.Context, id valuer.UUID) error {
cfg, err := m.alertmanager.GetConfig(ctx, claims.OrgID)
if err != nil {
return err
}
var preferredChannels []string
if len(parsedRule.PreferredChannels) == 0 {
channels, err := m.alertmanager.ListChannels(ctx, claims.OrgID)
if err != nil {
return err
}
for _, channel := range channels {
preferredChannels = append(preferredChannels, channel.Name)
}
if parsedRule.NotificationPolicies {
err = m.routingManager.AddNotificationPolicyRules(ctx, orgID.StringValue(), id.StringValue(), *parsedRule)
} else {
preferredChannels = parsedRule.PreferredChannels
err = m.routingManager.AddDirectRules(ctx, orgID.StringValue(), id.StringValue(), *parsedRule)
}
err = cfg.CreateRuleIDMatcher(id.StringValue(), preferredChannels)
if err != nil {
return err
}
err = m.alertmanager.SetConfig(ctx, cfg)
if err != nil {
return err
}

View File

@@ -56,6 +56,7 @@ func NewTestSqliteDB(t *testing.T) (sqlStore sqlstore.SQLStore, testDBFilePath s
sqlmigration.NewUpdateInvitesFactory(sqlStore),
sqlmigration.NewUpdatePatFactory(sqlStore),
sqlmigration.NewUpdateAlertmanagerFactory(sqlStore),
sqlmigration.NewMigrateAlertmanagerRoutesFactory(),
sqlmigration.NewUpdatePreferencesFactory(sqlStore),
sqlmigration.NewUpdateApdexTtlFactory(sqlStore),
sqlmigration.NewUpdateResetPasswordFactory(sqlStore),

View File

@@ -16,6 +16,8 @@ import (
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/modules/user"
"github.com/SigNoz/signoz/pkg/nfrouting"
"github.com/SigNoz/signoz/pkg/nfrouting/expressionroutes"
"github.com/SigNoz/signoz/pkg/prometheus"
"github.com/SigNoz/signoz/pkg/prometheus/clickhouseprometheus"
"github.com/SigNoz/signoz/pkg/querier"
@@ -37,6 +39,7 @@ import (
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/telemetrystore/clickhousetelemetrystore"
"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystorehook"
"github.com/SigNoz/signoz/pkg/types/nfroutingtypes"
"github.com/SigNoz/signoz/pkg/version"
"github.com/SigNoz/signoz/pkg/web"
"github.com/SigNoz/signoz/pkg/web/noopweb"
@@ -132,6 +135,8 @@ func NewSQLMigrationProviderFactories(
sqlmigration.NewQueryBuilderV5MigrationFactory(sqlstore, telemetryStore),
sqlmigration.NewAddMeterQuickFiltersFactory(sqlstore, sqlschema),
sqlmigration.NewUpdateTTLSettingForCustomRetentionFactory(sqlstore, sqlschema),
sqlmigration.NewAddNotificationRoutesFactory(sqlstore, sqlschema),
sqlmigration.NewMigrateAlertmanagerRoutesFactory(),
)
}
@@ -193,3 +198,9 @@ func NewQuerierProviderFactories(telemetryStore telemetrystore.TelemetryStore, p
signozquerier.NewFactory(telemetryStore, prometheus, cache),
)
}
// NewNotificationRoutingProviderFactories returns the named set of
// notification-routing provider factories. The only registered provider is
// the expression-based one, backed by the given route store.
func NewNotificationRoutingProviderFactories(routeStore nfroutingtypes.RouteStore) factory.NamedMap[factory.ProviderFactory[nfrouting.NotificationRoutes, nfrouting.Config]] {
	expressionFactory := expressionroutes.NewFactory(routeStore)
	return factory.MustNewNamedMap(expressionFactory)
}

View File

@@ -2,6 +2,9 @@ package signoz
import (
"context"
"github.com/SigNoz/signoz/pkg/nfrouting"
"github.com/SigNoz/signoz/pkg/nfrouting/nfroutingstore/sqlroutingstore"
"github.com/SigNoz/signoz/pkg/types/nfroutingtypes"
"github.com/SigNoz/signoz/pkg/alertmanager"
"github.com/SigNoz/signoz/pkg/analytics"
@@ -49,6 +52,7 @@ type SigNoz struct {
StatsReporter statsreporter.StatsReporter
Modules Modules
Handlers Handlers
RouteStore nfroutingtypes.RouteStore
}
func New(
@@ -254,6 +258,20 @@ func New(
return nil, err
}
routeStore := sqlroutingstore.NewStore(sqlstore)
notificationRoutes, err := factory.NewProviderFromNamedMap(
ctx,
providerSettings,
nfrouting.Config{
Provider: "expression",
},
NewNotificationRoutingProviderFactories(routeStore),
"expression",
)
if err != nil {
return nil, err
}
licensingProviderFactory := licenseProviderFactory(sqlstore, zeus, orgGetter, analytics)
licensing, err := licensingProviderFactory.New(
ctx,
@@ -278,6 +296,7 @@ func New(
modules.SavedView,
modules.User,
licensing,
notificationRoutes,
}
// Initialize stats reporter from the available stats reporter provider factories
@@ -321,5 +340,6 @@ func New(
Sharder: sharder,
Modules: modules,
Handlers: handlers,
RouteStore: routeStore,
}, nil
}

View File

@@ -0,0 +1,113 @@
package sqlmigration
import (
"context"
"time"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/sqlschema"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/uptrace/bun"
"github.com/uptrace/bun/migrate"
)
// addNotificationRoutes is the SQL migration that creates the
// notification_routes table and its indexes.
type addNotificationRoutes struct {
sqlstore sqlstore.SQLStore
sqlschema sqlschema.SQLSchema
}
// NewAddNotificationRoutesFactory returns the provider factory for the
// "add_notification_routes" migration, capturing the SQL store and schema
// that the migration instance is constructed with.
func NewAddNotificationRoutesFactory(sqlstore sqlstore.SQLStore, sqlschema sqlschema.SQLSchema) factory.ProviderFactory[SQLMigration, Config] {
	name := factory.MustNewName("add_notification_routes")
	build := func(ctx context.Context, settings factory.ProviderSettings, cfg Config) (SQLMigration, error) {
		return newAddNotificationRoutes(ctx, settings, cfg, sqlstore, sqlschema)
	}
	return factory.NewProviderFactory(name, build)
}
// newAddNotificationRoutes builds the migration value. The context, provider
// settings and config arguments are accepted to satisfy the factory signature
// and are not used.
func newAddNotificationRoutes(_ context.Context, _ factory.ProviderSettings, _ Config, sqlstore sqlstore.SQLStore, sqlschema sqlschema.SQLSchema) (SQLMigration, error) {
	m := &addNotificationRoutes{sqlstore: sqlstore, sqlschema: sqlschema}
	return m, nil
}
// Register adds this migration's Up and Down functions to the given
// migration set and returns any registration error.
func (migration *addNotificationRoutes) Register(migrations *migrate.Migrations) error {
	return migrations.Register(migration.Up, migration.Down)
}
// Up creates the notification_routes table (if it does not already exist)
// followed by its supporting indexes. The first failing statement aborts the
// migration and its error is returned.
func (migration *addNotificationRoutes) Up(ctx context.Context, db *bun.DB) error {
	// The model types are declared locally so the schema created by this
	// migration is defined entirely by the tags below.
	type identifiable struct {
		ID valuer.UUID `json:"id" bun:"id,pk,type:text"`
	}
	type timeAuditable struct {
		CreatedAt time.Time `bun:"created_at" json:"createdAt"`
		UpdatedAt time.Time `bun:"updated_at" json:"updatedAt"`
	}
	type userAuditable struct {
		CreatedBy string `bun:"created_by,type:text" json:"createdBy"`
		UpdatedBy string `bun:"updated_by,type:text" json:"updatedBy"`
	}
	type expressionRoute struct {
		bun.BaseModel `bun:"table:notification_routes"`
		identifiable
		timeAuditable
		userAuditable
		// Core routing fields
		Expression string `bun:"expression,type:text,notnull" json:"expression"`
		// Action configuration (stored as JSON)
		Channels []string `bun:"channels,type:jsonb" json:"channels"`
		Priority string   `bun:"priority,type:text" json:"priority"`
		// Extensibility fields
		Name        string   `bun:"name,type:text" json:"name"`
		Description string   `bun:"description,type:text" json:"description"`
		Enabled     bool     `bun:"enabled,type:boolean,default:true" json:"enabled"`
		Tags        []string `bun:"tags,type:jsonb" json:"tags,omitempty"`
		// Organization/tenant isolation
		OrgID string `bun:"org_id,type:text,notnull" json:"orgId"`
	}

	// Table first: the index statements below reference it.
	createTable := db.NewCreateTable().Model((*expressionRoute)(nil)).IfNotExists()
	if _, err := createTable.Exec(ctx); err != nil {
		return err
	}

	indexStatements := []string{
		"CREATE INDEX IF NOT EXISTS idx_notification_routes_org_id ON notification_routes (org_id)",
		"CREATE INDEX IF NOT EXISTS idx_notification_routes_enabled ON notification_routes (enabled)",
		"CREATE INDEX IF NOT EXISTS idx_notification_routes_org_enabled ON notification_routes (org_id, enabled)",
		"CREATE INDEX IF NOT EXISTS idx_notification_routes_created_at ON notification_routes (created_at)",
	}
	for i := range indexStatements {
		if _, err := db.ExecContext(ctx, indexStatements[i]); err != nil {
			return err
		}
	}
	return nil
}
// Down reverses Up by dropping the notification_routes table when it exists.
func (migration *addNotificationRoutes) Down(ctx context.Context, db *bun.DB) error {
	dropTable := db.NewDropTable().Table("notification_routes").IfExists()
	if _, err := dropTable.Exec(ctx); err != nil {
		return err
	}
	return nil
}

View File

@@ -0,0 +1,146 @@
package sqlmigration
import (
"context"
"github.com/SigNoz/signoz/pkg/alertmanager/routestrategy"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/types/alertmanagertypes"
"github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/prometheus/alertmanager/config"
"github.com/uptrace/bun"
"github.com/uptrace/bun/migrate"
"log/slog"
"strings"
)
// migrateAlertmanagerRoutes is the SQL migration that rewrites stored
// alertmanager routes using a routing strategy.
type migrateAlertmanagerRoutes struct {
// strategy is used by Up to add direct rules to the alertmanager config.
strategy routestrategy.RoutingStrategy
}
// NewMigrateAlertmanagerRoutesFactory returns the provider factory for the
// "migrate_alertmanager_routes" migration.
func NewMigrateAlertmanagerRoutesFactory() factory.ProviderFactory[SQLMigration, Config] {
	name := factory.MustNewName("migrate_alertmanager_routes")
	return factory.NewProviderFactory(name, newMigrateAlertmanagerRoutes)
}
// newMigrateAlertmanagerRoutes builds the migration with a channel routing
// strategy. The context, provider settings and config arguments are unused.
func newMigrateAlertmanagerRoutes(_ context.Context, _ factory.ProviderSettings, _ Config) (SQLMigration, error) {
	return &migrateAlertmanagerRoutes{
		strategy: routestrategy.NewChannelRoutingStrategy(),
	}, nil
}
// Register adds this migration's Up and Down functions to the given
// migration set and returns any registration error.
func (migration *migrateAlertmanagerRoutes) Register(migrations *migrate.Migrations) error {
	return migrations.Register(migration.Up, migration.Down)
}
// Up rewrites the stored alertmanager configuration so that rule-to-receiver
// routing is rebuilt through the migration's routing strategy.
//
// Steps:
//  1. Read every alertmanager_config row and collect ruleId -> receiver
//     mappings from the matchers of the existing routes.
//  2. Clear the child routes of the last config that was read.
//  3. For every non-deleted rule that has a mapping, add direct rules to that
//     config, keyed by the rule's "severity" label ("critical" when absent).
//  4. Persist the updated config.
//
// It returns nil without touching anything when no config could be loaded.
//
// NOTE(review): mappings from all config rows are merged into a single map and
// only the last config read is rewritten — confirm this is intended when more
// than one alertmanager_config row exists.
func (migration *migrateAlertmanagerRoutes) Up(ctx context.Context, db *bun.DB) error {
	ruleReceiverMap, amConfig, err := migration.collectRuleReceivers(ctx, db)
	if err != nil {
		return err
	}
	if amConfig == nil {
		return nil // No configs found, nothing to migrate
	}

	// Existing child routes are discarded; they are rebuilt from the rules.
	amConfig.AlertmanagerConfig().Route.Routes = nil
	if err := migration.migrateRuleRoutes(ctx, db, amConfig, ruleReceiverMap); err != nil {
		return err
	}

	amConfig.UpdateStoreableConfig()
	// Update the existing config in the database
	_, err = db.NewUpdate().
		Model(amConfig.StoreableConfig()).
		Where("id = ?", amConfig.StoreableConfig().ID).
		Exec(ctx)
	return err
}

// collectRuleReceivers scans alertmanager_config and returns the merged
// ruleId -> receivers map together with the last config that was loaded
// (nil when none was). The row cursor is always closed before returning, so
// the caller can issue further queries on the same connection.
func (migration *migrateAlertmanagerRoutes) collectRuleReceivers(ctx context.Context, db *bun.DB) (map[string][]string, *alertmanagertypes.Config, error) {
	configRows, err := db.NewSelect().
		Table("alertmanager_config").
		Rows(ctx)
	if err != nil {
		return nil, nil, err
	}
	defer func() {
		if err := configRows.Close(); err != nil {
			slog.Error("Failed to close config rows", "error", err)
		}
	}()

	// Map to store ruleId -> receivers mappings from existing configs
	ruleReceiverMap := make(map[string][]string)
	var amConfig *alertmanagertypes.Config
	for configRows.Next() {
		var storableConfig alertmanagertypes.StoreableConfig
		if err := db.ScanRow(ctx, configRows, &storableConfig); err != nil {
			// Best effort: an unreadable row is skipped, but no longer silently.
			slog.Warn("Skipping alertmanager config row that could not be scanned", "error", err)
			continue
		}
		cfg, err := alertmanagertypes.NewConfigFromStoreableConfig(&storableConfig)
		if err != nil {
			return nil, nil, err
		}
		amConfig = cfg
		extractReceiverRuleMappings(amConfig.AlertmanagerConfig().Route, ruleReceiverMap)
	}
	// Next returning false can mean "iteration failed", not only "no more rows".
	if err := configRows.Err(); err != nil {
		return nil, nil, err
	}
	return ruleReceiverMap, amConfig, nil
}

// migrateRuleRoutes walks every non-deleted rule and, for each rule that has
// receivers in ruleReceiverMap, adds direct rules to amConfig through the
// routing strategy. Rules that cannot be scanned or parsed are skipped.
func (migration *migrateAlertmanagerRoutes) migrateRuleRoutes(ctx context.Context, db *bun.DB, amConfig *alertmanagertypes.Config, ruleReceiverMap map[string][]string) error {
	ruleRows, err := db.NewSelect().
		Table("rule").
		Where("deleted = 0").
		Rows(ctx)
	if err != nil {
		return err
	}
	defer func() {
		if err := ruleRows.Close(); err != nil {
			slog.Error("Failed to close rule rows", "error", err)
		}
	}()

	for ruleRows.Next() {
		var dbRule ruletypes.Rule
		if err := db.ScanRow(ctx, ruleRows, &dbRule); err != nil {
			slog.Warn("Skipping rule row that could not be scanned", "error", err)
			continue
		}
		ruleID := dbRule.ID.String()
		receivers, ok := ruleReceiverMap[ruleID]
		if !ok {
			continue // Rule was not referenced by any existing route.
		}
		postableRule, err := ruletypes.ParsePostableRule([]byte(dbRule.Data))
		if err != nil {
			slog.Warn("Skipping rule that could not be parsed", "ruleId", ruleID, "error", err)
			continue
		}
		severity := postableRule.Labels["severity"]
		if severity == "" {
			severity = "critical"
		}
		postableRule.Thresholds = map[string][]string{
			severity: receivers,
		}
		if err := migration.strategy.AddDirectRules(amConfig, ruleID, *postableRule); err != nil {
			return err
		}
	}
	return ruleRows.Err()
}
// extractReceiverRuleMappings walks the route tree rooted at route and, for
// every "ruleId" matcher it finds, appends the route's receiver to
// ruleReceiverMap under each "|"-separated rule id in the matcher value.
// The placeholder id "-1" is ignored. A nil route is a no-op.
func extractReceiverRuleMappings(route *config.Route, ruleReceiverMap map[string][]string) {
	if route == nil {
		return
	}

	for _, m := range route.Matchers {
		if m.Name != "ruleId" {
			continue
		}
		for _, id := range strings.Split(m.Value, "|") {
			if id != "-1" {
				ruleReceiverMap[id] = append(ruleReceiverMap[id], route.Receiver)
			}
		}
	}

	// Descend into child routes after handling this node's own matchers.
	for i := range route.Routes {
		extractReceiverRuleMappings(route.Routes[i], ruleReceiverMap)
	}
}
// Down is a no-op: this migration has no rollback step.
func (migration *migrateAlertmanagerRoutes) Down(_ context.Context, _ *bun.DB) error {
	return nil
}

View File

@@ -206,13 +206,6 @@ func (c *Config) CreateReceiver(receiver config.Receiver) error {
return errors.New(errors.TypeInvalidInput, ErrCodeAlertmanagerConfigConflict, "the receiver name has to be unique, please choose a different name")
}
}
route, err := NewRouteFromReceiver(receiver)
if err != nil {
return err
}
c.alertmanagerConfig.Route.Routes = append(c.alertmanagerConfig.Route.Routes, route)
c.alertmanagerConfig.Receivers = append(c.alertmanagerConfig.Receivers, receiver)
if err := c.alertmanagerConfig.UnmarshalYAML(func(i interface{}) error { return nil }); err != nil {
@@ -302,6 +295,12 @@ func (c *Config) CreateRuleIDMatcher(ruleID string, receiverNames []string) erro
return nil
}
// UpdateStoreableConfig refreshes the storeable representation from the
// in-memory alertmanager config: the serialized config, its hash, and the
// update timestamp.
func (c *Config) UpdateStoreableConfig() {
	raw := string(newRawFromConfig(c.alertmanagerConfig))
	stored := c.storeableConfig
	stored.Config = raw
	stored.Hash = fmt.Sprintf("%x", newConfigHash(raw))
	stored.UpdatedAt = time.Now()
}
func (c *Config) UpdateRuleIDMatcher(ruleID string, receiverNames []string) error {
err := c.DeleteRuleIDMatcher(ruleID)
if err != nil {

View File

@@ -3,6 +3,7 @@ package alertmanagertypes
import (
"slices"
"strings"
"time"
"github.com/prometheus/alertmanager/config"
"github.com/prometheus/alertmanager/pkg/labels"
@@ -13,6 +14,12 @@ const (
ruleIDMatcherValueSep string = "|"
)
// RuleGrouping defines per-rule grouping configuration.
type RuleGrouping struct {
GroupBy []string `json:"group_by"` // label names to group by, e.g. [service.name, instance]
RepeatInterval time.Duration `json:"repeat_interval"` // example value: 12h
}
var (
// noRuleIDMatcher is a matcher that matches no ruleId.
// This is used to ensure that when a new receiver is created, it does not start matching any ruleId.

View File

@@ -21,10 +21,6 @@ func NewRouteFromRouteConfig(route *config.Route, cfg RouteConfig) (*config.Rout
route.RepeatInterval = (*model.Duration)(&cfg.RepeatInterval)
}
if err := route.UnmarshalYAML(func(i interface{}) error { return nil }); err != nil {
return nil, err
}
return route, nil
}
@@ -36,3 +32,16 @@ func NewRouteFromReceiver(receiver Receiver) (*config.Route, error) {
return route, nil
}
// UnmarshalRouteConfig runs route.UnmarshalYAML with a no-op decode callback
// on every route in the tree, children before their parent, and stops at the
// first error.
func UnmarshalRouteConfig(route *config.Route) error {
	for i := range route.Routes {
		if err := UnmarshalRouteConfig(route.Routes[i]); err != nil {
			return err
		}
	}
	// The callback decodes nothing; only UnmarshalYAML's own processing runs.
	noopDecode := func(any) error { return nil }
	return route.UnmarshalYAML(noopDecode)
}

View File

@@ -0,0 +1,141 @@
package nfroutingtypes
import (
"context"
"fmt"
"log/slog"
"os"
"strings"
"sync"
"time"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/prometheus/alertmanager/featurecontrol"
"github.com/prometheus/alertmanager/matcher/compat"
"github.com/uptrace/bun"
)
// Actions is the "actions" section of an ExpressionRouteRequest.
type Actions struct {
// Channels lists channel names; Validate rejects a request with none.
Channels []string `json:"channels"`
// Priority is copied verbatim onto the created ExpressionRoute.
Priority string `json:"priority"`
}
// ExpressionRouteRequest is the JSON payload describing a route to create.
// Use Validate to check it and ToExpressionRoute to convert it into the
// stored model.
type ExpressionRouteRequest struct {
// Expression is required and must pass CanConvertToPrometheusMatchers.
Expression string `json:"expression"`
// Actions carries the channels and priority applied to the route.
Actions Actions `json:"actions"`
// Name is required.
Name string `bun:"name,type:text" json:"name"`
Description string `bun:"description,type:text" json:"description"`
Tags []string `bun:"tags,type:jsonb" json:"tags,omitempty"`
}
// Validate checks the request and returns an error for the first rule that
// fails, in this order: expression present, expression convertible to
// Prometheus matchers, name present, at least one channel.
func (req *ExpressionRouteRequest) Validate() error {
	switch {
	case req.Expression == "":
		return fmt.Errorf("expression required")
	case !CanConvertToPrometheusMatchers(req.Expression):
		return fmt.Errorf("expression cannot be converted to valid Prometheus matchers")
	case req.Name == "":
		return fmt.Errorf("name required")
	case len(req.Actions.Channels) == 0: // len of a nil slice is 0
		return fmt.Errorf("channels required")
	}
	return nil
}
// ToExpressionRoute converts the request into a new ExpressionRoute owned by
// orgID. The route gets a freshly generated ID, is enabled, and records
// userID as both creator and last updater. CreatedAt and UpdatedAt are set to
// the same instant.
func (req *ExpressionRouteRequest) ToExpressionRoute(orgID, userID string) *ExpressionRoute {
	// Read the clock once so a brand-new route has CreatedAt == UpdatedAt.
	now := time.Now()
	return &ExpressionRoute{
		Expression:  req.Expression,
		Channels:    req.Actions.Channels,
		Priority:    req.Actions.Priority,
		Name:        req.Name,
		Description: req.Description,
		Enabled:     true, // Default to enabled
		Tags:        req.Tags,
		OrgID:       orgID,
		Identifiable: types.Identifiable{
			ID: valuer.GenerateUUID(),
		},
		UserAuditable: types.UserAuditable{
			CreatedBy: userID,
			UpdatedBy: userID,
		},
		TimeAuditable: types.TimeAuditable{
			CreatedAt: now,
			UpdatedAt: now,
		},
	}
}
// ExpressionRoute is the bun model for a row of the notification_routes
// table: an expression plus the channels and priority associated with it.
type ExpressionRoute struct {
bun.BaseModel `bun:"table:notification_routes"`
types.Identifiable
types.TimeAuditable
types.UserAuditable
// Core routing fields
Expression string `bun:"expression,type:text,notnull" json:"expression"`
// Action configuration (stored as JSON)
Channels []string `bun:"channels,type:jsonb" json:"channels"`
Priority string `bun:"priority,type:text" json:"priority"`
// Extensibility fields
Name string `bun:"name,type:text" json:"name"`
Description string `bun:"description,type:text" json:"description"`
Enabled bool `bun:"enabled,type:boolean,default:true" json:"enabled"`
Tags []string `bun:"tags,type:jsonb" json:"tags,omitempty"`
// Organization/tenant isolation
OrgID string `bun:"org_id,type:text,notnull" json:"orgId"`
}
// RouteStore is the persistence interface for ExpressionRoute values.
// Note that GetByID takes a valuer.UUID while Delete takes the id as a string.
type RouteStore interface {
// GetByID returns the route with the given id.
GetByID(ctx context.Context, id valuer.UUID) (*ExpressionRoute, error)
// Create stores route and returns a UUID (presumably the new route's id — confirm against the implementation).
Create(ctx context.Context, route *ExpressionRoute) (valuer.UUID, error)
// Update persists changes to an existing route.
Update(ctx context.Context, route *ExpressionRoute) error
// Delete removes the route identified by id.
Delete(ctx context.Context, id string) error
// GetAllByOrgID returns the routes belonging to orgID.
GetAllByOrgID(ctx context.Context, orgID string) ([]ExpressionRoute, error)
}
var (
// initOnce guards the one-time matcher parser initialization performed in
// CanConvertToPrometheusMatchers.
initOnce sync.Once
)
// CanConvertToPrometheusMatchers reports whether every operand of expression
// parses as an Alertmanager matcher. The expression is split on '&' and '|',
// each piece is stripped of surrounding whitespace and parentheses, and empty
// pieces are ignored. A blank expression returns false.
//
// NOTE(review): the split does not look at quoting, so a '&' or '|' inside a
// quoted value (for example a regex alternation) is also treated as a
// separator — confirm this matches the actual converter.
func CanConvertToPrometheusMatchers(expression string) bool {
	trimmed := strings.TrimSpace(expression)
	if trimmed == "" {
		return false
	}

	// The compat parser must be initialized exactly once before first use.
	initOnce.Do(func() {
		handler := slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})
		compat.InitFromFlags(slog.New(handler), featurecontrol.NoopFlags{})
	})

	isOperator := func(r rune) bool {
		return strings.ContainsRune("&|", r)
	}
	for _, operand := range strings.FieldsFunc(trimmed, isOperator) {
		candidate := strings.Trim(strings.TrimSpace(operand), "()")
		if candidate == "" {
			continue
		}
		if _, err := compat.Matcher(candidate, "noop"); err != nil {
			return false
		}
	}
	return true
}

View File

@@ -62,11 +62,41 @@ type PostableRule struct {
PreferredChannels []string `json:"preferredChannels,omitempty"`
Version string `json:"version,omitempty"`
// Grouping configuration for this rule
GroupBy []string `json:"groupBy,omitempty"` // [service.name, instance]
Version string `json:"version,omitempty"`
// Threshold-based routing configuration
ThresholdMapping map[string][]string `json:"thresholdMapping,omitempty"` // {"critical": "critical-receiver", "warning": "warning-receiver"}
// legacy
Expr string `yaml:"expr,omitempty" json:"expr,omitempty"`
OldYaml string `json:"yaml,omitempty"`
Expr string `yaml:"expr,omitempty" json:"expr,omitempty"`
OldYaml string `json:"yaml,omitempty"`
NotificationPolicies bool `yaml:"notificationPolicies,omitempty" json:"notificationPolicies,omitempty"`
Renotify Duration `yaml:"renotify,omitempty" json:"renotify,omitempty"`
Thresholds map[string][]string `yaml:"thresholds,omitempty" json:"thresholds,omitempty"`
}
// GetRuleGrouping returns the rule's grouping configuration. Only GroupBy is
// populated: it is the rule's GroupBy when non-empty, otherwise an empty
// non-nil slice.
func (pr *PostableRule) GetRuleGrouping() RuleGrouping {
	groupBy := pr.GroupBy
	if len(groupBy) == 0 {
		groupBy = []string{} // keep the default non-nil
	}
	return RuleGrouping{GroupBy: groupBy}
}
// RuleGrouping is the rule-side counterpart of alertmanagertypes.RuleGrouping.
// It carries the same GroupBy and RepeatInterval fields and additionally
// GroupWait and GroupInterval.
type RuleGrouping struct {
GroupBy []string `json:"group_by"`
GroupWait time.Duration `json:"group_wait"`
GroupInterval time.Duration `json:"group_interval"`
RepeatInterval time.Duration `json:"repeat_interval"`
}
func ParsePostableRule(content []byte) (*PostableRule, error) {
@@ -136,6 +166,10 @@ func ParseIntoRule(initRule PostableRule, content []byte, kind RuleDataKind) (*P
}
}
if rule.Renotify == 0 {
rule.Renotify = Duration(4 * time.Hour) //default time
}
if err := rule.Validate(); err != nil {
return nil, err
}
@@ -148,7 +182,7 @@ func isValidLabelName(ln string) bool {
return false
}
for i, b := range ln {
if !((b >= 'a' && b <= 'z') || (b >= 'A' && b <= 'Z') || b == '_' || (b >= '0' && b <= '9' && i > 0)) {
if !((b >= 'a' && b <= 'z') || (b >= 'A' && b <= 'Z') || b == '_' || b == '.' || (b >= '0' && b <= '9' && i > 0)) {
return false
}
}