Compare commits
11 Commits
main
...
alertmanag
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d05e279bcd | ||
|
|
f1c5d873f7 | ||
|
|
aec239cc7c | ||
|
|
59e26652dc | ||
|
|
e02afc5e97 | ||
|
|
3eac8ac30b | ||
|
|
382c4f58e1 | ||
|
|
73ea632a3f | ||
|
|
00fa8810c0 | ||
|
|
6cee330d44 | ||
|
|
871c6e642c |
2
.gitignore
vendored
2
.gitignore
vendored
@@ -76,3 +76,5 @@ dist/
|
||||
|
||||
# ignore user_scripts that is fetched by init-clickhouse
|
||||
deploy/common/clickhouse/user_scripts/
|
||||
# queries.active
|
||||
queries.active
|
||||
|
||||
@@ -20,6 +20,7 @@ import (
|
||||
basemodel "go.signoz.io/signoz/pkg/query-service/model"
|
||||
rules "go.signoz.io/signoz/pkg/query-service/rules"
|
||||
"go.signoz.io/signoz/pkg/query-service/version"
|
||||
"go.signoz.io/signoz/pkg/signoz"
|
||||
)
|
||||
|
||||
type APIHandlerOptions struct {
|
||||
@@ -41,6 +42,7 @@ type APIHandlerOptions struct {
|
||||
FluxInterval time.Duration
|
||||
UseLogsNewSchema bool
|
||||
UseTraceNewSchema bool
|
||||
SigNoz *signoz.SigNoz
|
||||
}
|
||||
|
||||
type APIHandler struct {
|
||||
@@ -65,6 +67,7 @@ func NewAPIHandler(opts APIHandlerOptions) (*APIHandler, error) {
|
||||
FluxInterval: opts.FluxInterval,
|
||||
UseLogsNewSchema: opts.UseLogsNewSchema,
|
||||
UseTraceNewSchema: opts.UseTraceNewSchema,
|
||||
SigNoz: opts.SigNoz,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
|
||||
@@ -269,6 +269,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
||||
GatewayUrl: serverOptions.GatewayUrl,
|
||||
UseLogsNewSchema: serverOptions.UseLogsNewSchema,
|
||||
UseTraceNewSchema: serverOptions.UseTraceNewSchema,
|
||||
SigNoz: serverOptions.SigNoz,
|
||||
}
|
||||
|
||||
apiHandler, err := api.NewAPIHandler(apiOpts)
|
||||
|
||||
@@ -7,7 +7,6 @@ import (
|
||||
"os"
|
||||
"os/signal"
|
||||
"strconv"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
@@ -87,6 +86,8 @@ func init() {
|
||||
}
|
||||
|
||||
func main() {
|
||||
ctx := context.Background()
|
||||
|
||||
var promConfigPath, skipTopLvlOpsPath string
|
||||
|
||||
// disables rule execution but allows change to the rule definition
|
||||
@@ -191,20 +192,20 @@ func main() {
|
||||
zap.L().Fatal("Could not start server", zap.Error(err))
|
||||
}
|
||||
|
||||
if err := auth.InitAuthCache(context.Background()); err != nil {
|
||||
if err := auth.InitAuthCache(ctx); err != nil {
|
||||
zap.L().Fatal("Failed to initialize auth cache", zap.Error(err))
|
||||
}
|
||||
|
||||
signalsChannel := make(chan os.Signal, 1)
|
||||
signal.Notify(signalsChannel, os.Interrupt, syscall.SIGTERM)
|
||||
if err := signoz.Start(ctx); err != nil {
|
||||
zap.L().Fatal("Failed to start signoz", zap.Error(err))
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case status := <-server.HealthCheckStatus():
|
||||
zap.L().Info("Received HealthCheck status: ", zap.Int("status", int(status)))
|
||||
case <-signalsChannel:
|
||||
zap.L().Fatal("Received OS Interrupt Signal ... ")
|
||||
server.Stop()
|
||||
}
|
||||
if err := signoz.Wait(ctx); err != nil {
|
||||
zap.L().Fatal("Failed to wait for signoz", zap.Error(err))
|
||||
}
|
||||
|
||||
server.Stop()
|
||||
if err := signoz.Stop(ctx); err != nil {
|
||||
zap.L().Fatal("Failed to stop signoz", zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,89 @@
|
||||
package alertmanagerstoretest
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"go.signoz.io/signoz/pkg/alertmanager/alertmanagerstore"
|
||||
"go.signoz.io/signoz/pkg/errors"
|
||||
"go.signoz.io/signoz/pkg/factory"
|
||||
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
|
||||
)
|
||||
|
||||
var _ alertmanagerstore.Store = (*Provider)(nil)
|
||||
|
||||
type Provider struct {
|
||||
States map[string]map[alertmanagertypes.StateName][]byte
|
||||
Configs map[string]string
|
||||
OrgIDs []string
|
||||
}
|
||||
|
||||
func New(ctx context.Context, settings factory.ProviderSettings, config alertmanagerstore.Config, orgIDs []string) (*Provider, error) {
|
||||
states := make(map[string]map[alertmanagertypes.StateName][]byte)
|
||||
for _, orgID := range orgIDs {
|
||||
states[orgID] = make(map[alertmanagertypes.StateName][]byte)
|
||||
states[orgID][alertmanagertypes.SilenceStateName] = []byte{}
|
||||
states[orgID][alertmanagertypes.NFLogStateName] = []byte{}
|
||||
}
|
||||
|
||||
return &Provider{
|
||||
States: states,
|
||||
Configs: make(map[string]string),
|
||||
OrgIDs: orgIDs,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (provider *Provider) GetState(ctx context.Context, orgID string, stateName alertmanagertypes.StateName) (string, error) {
|
||||
if _, ok := provider.States[orgID][stateName]; !ok {
|
||||
return "", errors.Newf(errors.TypeNotFound, alertmanagerstore.ErrCodeAlertmanagerStateNotFound, "cannot find state %q for org %q", stateName, orgID)
|
||||
}
|
||||
|
||||
return string(provider.States[orgID][stateName]), nil
|
||||
}
|
||||
|
||||
func (provider *Provider) SetState(ctx context.Context, orgID string, stateName alertmanagertypes.StateName, state alertmanagertypes.State) (int64, error) {
|
||||
var err error
|
||||
provider.States[orgID][stateName], err = state.MarshalBinary()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return int64(len(provider.States[orgID][stateName])), nil
|
||||
}
|
||||
|
||||
func (provider *Provider) GetConfig(ctx context.Context, orgID string) (*alertmanagertypes.Config, error) {
|
||||
if _, ok := provider.Configs[orgID]; !ok {
|
||||
return nil, errors.Newf(errors.TypeNotFound, alertmanagerstore.ErrCodeAlertmanagerConfigNotFound, "cannot find config for org %s", orgID)
|
||||
}
|
||||
|
||||
return alertmanagertypes.NewConfigFromString(provider.Configs[orgID], orgID)
|
||||
}
|
||||
|
||||
func (provider *Provider) SetConfig(ctx context.Context, orgID string, config *alertmanagertypes.Config) error {
|
||||
provider.Configs[orgID] = string(config.Raw())
|
||||
return nil
|
||||
}
|
||||
|
||||
func (provider *Provider) DelConfig(ctx context.Context, orgID string) error {
|
||||
delete(provider.Configs, orgID)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (provider *Provider) ListOrgIDs(ctx context.Context) ([]string, error) {
|
||||
return provider.OrgIDs, nil
|
||||
}
|
||||
|
||||
func (provider *Provider) ListChannels(ctx context.Context, orgID string) (alertmanagertypes.Channels, error) {
|
||||
if _, ok := provider.Configs[orgID]; !ok {
|
||||
return nil, errors.Newf(errors.TypeNotFound, alertmanagerstore.ErrCodeAlertmanagerConfigNotFound, "cannot find config for org %s", orgID)
|
||||
}
|
||||
|
||||
config, err := alertmanagertypes.NewConfigFromString(provider.Configs[orgID], orgID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return config.Channels(), nil
|
||||
}
|
||||
|
||||
func (provider *Provider) GetChannel(ctx context.Context, orgID string, id uint64) (*alertmanagertypes.Channel, error) {
|
||||
return nil, nil
|
||||
}
|
||||
17
pkg/alertmanager/alertmanagerstore/config.go
Normal file
17
pkg/alertmanager/alertmanagerstore/config.go
Normal file
@@ -0,0 +1,17 @@
|
||||
package alertmanagerstore
|
||||
|
||||
import "go.signoz.io/signoz/pkg/factory"
|
||||
|
||||
type Config struct {
|
||||
Provider string `mapstructure:"provider"`
|
||||
}
|
||||
|
||||
func NewConfig() factory.Config {
|
||||
return Config{
|
||||
Provider: "sql",
|
||||
}
|
||||
}
|
||||
|
||||
func (c Config) Validate() error {
|
||||
return nil
|
||||
}
|
||||
@@ -0,0 +1,248 @@
|
||||
package sqlalertmanagerstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
|
||||
"go.signoz.io/signoz/pkg/alertmanager/alertmanagerstore"
|
||||
"go.signoz.io/signoz/pkg/errors"
|
||||
"go.signoz.io/signoz/pkg/factory"
|
||||
"go.signoz.io/signoz/pkg/sqlstore"
|
||||
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
|
||||
)
|
||||
|
||||
type provider struct {
|
||||
sqlstore sqlstore.SQLStore
|
||||
settings factory.ScopedProviderSettings
|
||||
}
|
||||
|
||||
func NewFactory(sqlstore sqlstore.SQLStore) factory.ProviderFactory[alertmanagerstore.Store, alertmanagerstore.Config] {
|
||||
return factory.NewProviderFactory(factory.MustNewName("sql"), func(ctx context.Context, settings factory.ProviderSettings, config alertmanagerstore.Config) (alertmanagerstore.Store, error) {
|
||||
return New(ctx, settings, config, sqlstore)
|
||||
})
|
||||
}
|
||||
|
||||
func New(ctx context.Context, settings factory.ProviderSettings, config alertmanagerstore.Config, sqlstore sqlstore.SQLStore) (*provider, error) {
|
||||
return &provider{
|
||||
sqlstore: sqlstore,
|
||||
settings: factory.NewScopedProviderSettings(settings, "go.signoz.io/signoz/pkg/alertmanager/alertmanagerstore/sqlalertmanagerstore"),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (provider *provider) GetState(ctx context.Context, orgID string, stateName alertmanagertypes.StateName) (string, error) {
|
||||
storedConfig := new(alertmanagertypes.StoredConfig)
|
||||
|
||||
err := provider.
|
||||
sqlstore.
|
||||
BunDB().
|
||||
NewSelect().
|
||||
Model(storedConfig).
|
||||
Where("org_id = ?", orgID).
|
||||
Scan(ctx)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return "", errors.Newf(errors.TypeNotFound, alertmanagerstore.ErrCodeAlertmanagerStateNotFound, "cannot find alertmanager state for org %s", orgID)
|
||||
}
|
||||
|
||||
return "", err
|
||||
}
|
||||
|
||||
if stateName == alertmanagertypes.SilenceStateName {
|
||||
decodedState, err := base64.RawStdEncoding.DecodeString(storedConfig.SilencesState)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return string(decodedState), nil
|
||||
}
|
||||
|
||||
if stateName == alertmanagertypes.NFLogStateName {
|
||||
decodedState, err := base64.RawStdEncoding.DecodeString(storedConfig.NFLogState)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return string(decodedState), nil
|
||||
}
|
||||
|
||||
return "", errors.Newf(errors.TypeNotFound, alertmanagerstore.ErrCodeAlertmanagerStateNotFound, "cannot find alertmanager state for org %s", orgID)
|
||||
}
|
||||
|
||||
func (provider *provider) SetState(ctx context.Context, orgID string, stateName alertmanagertypes.StateName, state alertmanagertypes.State) (int64, error) {
|
||||
marshalledState, err := state.MarshalBinary()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
encodedState := base64.StdEncoding.EncodeToString(marshalledState)
|
||||
|
||||
q := provider.
|
||||
sqlstore.
|
||||
BunDB().
|
||||
NewUpdate().
|
||||
Model(&alertmanagertypes.StoredConfig{}).
|
||||
Where("org_id = ?", orgID)
|
||||
|
||||
if stateName == alertmanagertypes.SilenceStateName {
|
||||
q.Set("silences_state = ?", encodedState)
|
||||
}
|
||||
|
||||
if stateName == alertmanagertypes.NFLogStateName {
|
||||
q.Set("nflog_state = ?", encodedState)
|
||||
}
|
||||
|
||||
_, err = q.Exec(ctx)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return int64(len(marshalledState)), nil
|
||||
}
|
||||
|
||||
func (provider *provider) GetConfig(ctx context.Context, orgID string) (*alertmanagertypes.Config, error) {
|
||||
storedConfig := new(alertmanagertypes.StoredConfig)
|
||||
|
||||
err := provider.
|
||||
sqlstore.
|
||||
BunDB().
|
||||
NewSelect().
|
||||
Model(storedConfig).
|
||||
Where("org_id = ?", orgID).
|
||||
Scan(ctx)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return nil, errors.Newf(errors.TypeNotFound, alertmanagerstore.ErrCodeAlertmanagerConfigNotFound, "cannot find alertmanager config for org %s", orgID)
|
||||
}
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
||||
config, err := alertmanagertypes.NewConfigFromString(storedConfig.Config, orgID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return config, nil
|
||||
}
|
||||
|
||||
func (provider *provider) SetConfig(ctx context.Context, orgID string, config *alertmanagertypes.Config) error {
|
||||
tx, err := provider.sqlstore.BunDB().BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer tx.Rollback() //nolint:errcheck
|
||||
|
||||
if _, err = tx.
|
||||
NewInsert().
|
||||
Model(config.StoredConfig()).
|
||||
On("CONFLICT (org_id) DO UPDATE").
|
||||
Set("config = ?", string(config.StoredConfig().Config)).
|
||||
Set("updated_at = ?", config.StoredConfig().UpdatedAt).
|
||||
Exec(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
channels := config.Channels()
|
||||
fmt.Println("channels", channels)
|
||||
if len(channels) != 0 {
|
||||
fmt.Println("channels", channels)
|
||||
if _, err = tx.NewInsert().
|
||||
Model(&channels).
|
||||
On("CONFLICT (name) DO UPDATE").
|
||||
Set("data = EXCLUDED.data").
|
||||
Set("updated_at = EXCLUDED.updated_at").
|
||||
Exec(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err = tx.Commit(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (provider *provider) DelConfig(ctx context.Context, orgID string) error {
|
||||
_, err := provider.
|
||||
sqlstore.
|
||||
BunDB().
|
||||
NewDelete().
|
||||
Model(&alertmanagertypes.StoredConfig{}).
|
||||
Where("org_id = ?", orgID).
|
||||
Exec(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = provider.
|
||||
sqlstore.
|
||||
BunDB().
|
||||
NewDelete().
|
||||
Model(&alertmanagertypes.Channel{}).
|
||||
Where("org_id = ?", orgID).
|
||||
Exec(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (provider *provider) ListOrgIDs(ctx context.Context) ([]string, error) {
|
||||
var orgIDs []string
|
||||
|
||||
err := provider.
|
||||
sqlstore.
|
||||
BunDB().
|
||||
NewSelect().
|
||||
Table("organizations").
|
||||
ColumnExpr("id").
|
||||
Scan(ctx, &orgIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return orgIDs, nil
|
||||
}
|
||||
|
||||
func (provider *provider) ListChannels(ctx context.Context, orgID string) (alertmanagertypes.Channels, error) {
|
||||
channels := alertmanagertypes.Channels{}
|
||||
|
||||
err := provider.
|
||||
sqlstore.
|
||||
BunDB().
|
||||
NewSelect().
|
||||
Model(&channels).
|
||||
Where("org_id = ?", orgID).
|
||||
Scan(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return channels, nil
|
||||
}
|
||||
|
||||
func (provider *provider) GetChannel(ctx context.Context, orgID string, id uint64) (*alertmanagertypes.Channel, error) {
|
||||
channel := new(alertmanagertypes.Channel)
|
||||
|
||||
err := provider.
|
||||
sqlstore.
|
||||
BunDB().
|
||||
NewSelect().
|
||||
Model(channel).
|
||||
Where("org_id = ?", orgID).
|
||||
Where("id = ?", id).
|
||||
Scan(ctx)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return nil, errors.Newf(errors.TypeNotFound, alertmanagerstore.ErrCodeAlertmanagerChannelNotFound, "cannot find channel for org %s", orgID)
|
||||
}
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return channel, nil
|
||||
}
|
||||
44
pkg/alertmanager/alertmanagerstore/store.go
Normal file
44
pkg/alertmanager/alertmanagerstore/store.go
Normal file
@@ -0,0 +1,44 @@
|
||||
package alertmanagerstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"go.signoz.io/signoz/pkg/errors"
|
||||
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrCodeAlertmanagerConfigNotFound = errors.MustNewCode("alertmanager_config_not_found")
|
||||
ErrCodeAlertmanagerStateNotFound = errors.MustNewCode("alertmanager_state_not_found")
|
||||
ErrCodeAlertmanagerChannelNotFound = errors.MustNewCode("alertmanager_channel_not_found")
|
||||
)
|
||||
|
||||
type Store interface {
|
||||
// Creates the silence or the notification log state and returns the number of bytes in the state.
|
||||
// The return type matches the return of `silence.Maintenance` or `nflog.Maintenance`.
|
||||
// See https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/silence/silence.go#L217
|
||||
// and https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/nflog/nflog.go#L94
|
||||
SetState(context.Context, string, alertmanagertypes.StateName, alertmanagertypes.State) (int64, error)
|
||||
|
||||
// Gets the silence state or the notification log state as a string from the store. This is used as a snapshot to load the
|
||||
// initial state of silences or notification log when starting the alertmanager.
|
||||
GetState(context.Context, string, alertmanagertypes.StateName) (string, error)
|
||||
|
||||
// Get an alertmanager config for an organization
|
||||
GetConfig(context.Context, string) (*alertmanagertypes.Config, error)
|
||||
|
||||
// Set an alertmanager config for an organization
|
||||
SetConfig(context.Context, string, *alertmanagertypes.Config) error
|
||||
|
||||
// Deletes the config for an organization
|
||||
DelConfig(context.Context, string) error
|
||||
|
||||
// Get all organization ids
|
||||
ListOrgIDs(context.Context) ([]string, error)
|
||||
|
||||
// Get all channels for an organization
|
||||
ListChannels(context.Context, string) (alertmanagertypes.Channels, error)
|
||||
|
||||
// Get a channel for an organization
|
||||
GetChannel(context.Context, string, uint64) (*alertmanagertypes.Channel, error)
|
||||
}
|
||||
36
pkg/alertmanager/client.go
Normal file
36
pkg/alertmanager/client.go
Normal file
@@ -0,0 +1,36 @@
|
||||
package alertmanager
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
|
||||
)
|
||||
|
||||
type Client interface {
|
||||
// GetAlerts gets the alerts from the alertmanager per organization.
|
||||
GetAlerts(context.Context, string, alertmanagertypes.GettableAlertsParams) (alertmanagertypes.GettableAlerts, error)
|
||||
|
||||
// PutAlerts puts the alerts into the alertmanager per organization.
|
||||
PutAlerts(context.Context, string, alertmanagertypes.PostableAlerts) error
|
||||
|
||||
// SetConfig sets the config into the alertmanager per organization.
|
||||
SetConfig(context.Context, string, alertmanagertypes.PostableConfig) error
|
||||
|
||||
// TestReceiver sends a test alert to a receiver.
|
||||
TestReceiver(context.Context, string, alertmanagertypes.Receiver) error
|
||||
|
||||
// CreateChannel creates a channel for the organization.
|
||||
CreateChannel(context.Context, string, *alertmanagertypes.Channel) error
|
||||
|
||||
// GetChannel gets a channel for the organization.
|
||||
GetChannel(context.Context, string, uint64) (*alertmanagertypes.Channel, error)
|
||||
|
||||
// DeleteChannel deletes a channel for the organization.
|
||||
DelChannel(context.Context, string, uint64) error
|
||||
|
||||
// ListChannels lists all channels for the organization.
|
||||
ListChannels(context.Context, string) (alertmanagertypes.Channels, error)
|
||||
|
||||
// UpdateChannel updates a channel for the organization.
|
||||
UpdateChannel(context.Context, string, uint64, string) error
|
||||
}
|
||||
145
pkg/alertmanager/config.go
Normal file
145
pkg/alertmanager/config.go
Normal file
@@ -0,0 +1,145 @@
|
||||
package alertmanager
|
||||
|
||||
import (
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
"go.signoz.io/signoz/pkg/alertmanager/alertmanagerstore"
|
||||
"go.signoz.io/signoz/pkg/factory"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
// PollInterval is the interval at which the alertmanager config is polled from the store.
|
||||
PollInterval time.Duration `mapstructure:"poll_interval"`
|
||||
|
||||
// The URL under which Alertmanager is externally reachable (for example, if Alertmanager is served via a reverse proxy). Used for generating relative and absolute links back to Alertmanager itself.
|
||||
// See https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/cmd/alertmanager/main.go#L155C54-L155C249
|
||||
ExternalUrl *url.URL `mapstructure:"external_url"`
|
||||
|
||||
// ResolveTimeout is the time after which an alert is declared resolved if it has not been updated.
|
||||
// See https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/config/config.go#L836
|
||||
ResolveTimeout time.Duration `mapstructure:"resolve_timeout"`
|
||||
|
||||
// Config of the root node of the routing tree.
|
||||
Route RouteConfig `mapstructure:"route"`
|
||||
|
||||
// Configuration for alerts.
|
||||
Alerts AlertsConfig `mapstructure:"alerts"`
|
||||
|
||||
// Configuration for silences.
|
||||
Silences SilencesConfig `mapstructure:"silences"`
|
||||
|
||||
// Configuration for the notification log.
|
||||
NFLog NFLogConfig `mapstructure:"nflog"`
|
||||
|
||||
// Configuration for the Email receiver. We are explicitly defining this here instead of taking it as part of the receiver configuration.
|
||||
// This is because we want to use the same SMTP configuration for all receivers.
|
||||
SMTP SMTPConfig `mapstructure:"smtp"`
|
||||
|
||||
// Configuration for the alertmanagerstore.
|
||||
Store alertmanagerstore.Config `mapstructure:"store"`
|
||||
}
|
||||
|
||||
type RouteConfig struct {
|
||||
GroupBy []string `mapstructure:"group_by"`
|
||||
GroupInterval time.Duration `mapstructure:"group_interval"`
|
||||
GroupWait time.Duration `mapstructure:"group_wait"`
|
||||
RepeatInterval time.Duration `mapstructure:"repeat_interval"`
|
||||
}
|
||||
|
||||
// This is a best effort to make it similar to the upstream alertmanager config. See
|
||||
// https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/config/config.go#L843
|
||||
type SMTPConfig struct {
|
||||
Hello string `mapstructure:"hello"`
|
||||
From string `mapstructure:"from"`
|
||||
Host string `mapstructure:"host"`
|
||||
Port int `mapstructure:"port"`
|
||||
AuthUsername string `mapstructure:"auth_username"`
|
||||
AuthPassword string `mapstructure:"auth_password"`
|
||||
AuthSecret string `mapstructure:"auth_secret"`
|
||||
AuthIdentity string `mapstructure:"auth_identity"`
|
||||
RequireTLS bool `mapstructure:"require_tls"`
|
||||
}
|
||||
|
||||
type AlertsConfig struct {
|
||||
// Interval between garbage collection of alerts.
|
||||
// See https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/cmd/alertmanager/main.go#L152
|
||||
GCInterval time.Duration `mapstructure:"gc_interval"`
|
||||
}
|
||||
|
||||
type SilencesConfig struct {
|
||||
// Maximum number of silences, including expired silences. If negative or zero, no limit is set.
|
||||
// See https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/cmd/alertmanager/main.go#L150C64-L150C157
|
||||
Max int `mapstructure:"max"`
|
||||
|
||||
// Maximum size of the silences in bytes. If negative or zero, no limit is set.
|
||||
// See https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/cmd/alertmanager/main.go#L150C64-L150C157
|
||||
MaxSizeBytes int `mapstructure:"max_size_bytes"`
|
||||
|
||||
// Interval between garbage collection and snapshotting of the silences. The snapshot will be stored in the state store.
|
||||
// The upstream alertmanager config (https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/cmd/alertmanager/main.go#L149) has
|
||||
// been split between silences and nflog.
|
||||
MaintenanceInterval time.Duration `mapstructure:"maintenance_interval"`
|
||||
|
||||
// Retention of the silences.
|
||||
Retention time.Duration `mapstructure:"retention"`
|
||||
}
|
||||
|
||||
type NFLogConfig struct {
|
||||
// Interval between garbage collection and snapshotting of the notification logs. The snapshot will be stored in the state store.
|
||||
// The upstream alertmanager config (https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/cmd/alertmanager/main.go#L149) has
|
||||
// been split between silences and nflog.
|
||||
MaintenanceInterval time.Duration `mapstructure:"maintenance_interval"`
|
||||
|
||||
// Retention of the notification logs.
|
||||
Retention time.Duration `mapstructure:"retention"`
|
||||
}
|
||||
|
||||
func NewConfigFactory() factory.ConfigFactory {
|
||||
return factory.NewConfigFactory(factory.MustNewName("alertmanager"), newConfig)
|
||||
}
|
||||
|
||||
func newConfig() factory.Config {
|
||||
return Config{
|
||||
PollInterval: 15 * time.Second,
|
||||
ExternalUrl: &url.URL{
|
||||
Host: "localhost:8080",
|
||||
},
|
||||
// Corresponds to the default in upstream (https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/config/config.go#L727)
|
||||
ResolveTimeout: 5 * time.Minute,
|
||||
Route: RouteConfig{
|
||||
GroupBy: []string{"alertname"},
|
||||
GroupInterval: 5 * time.Minute,
|
||||
GroupWait: 30 * time.Second,
|
||||
RepeatInterval: 4 * time.Hour,
|
||||
},
|
||||
// Corresponds to the default in upstream (https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/cmd/alertmanager/main.go#L152)
|
||||
Alerts: AlertsConfig{
|
||||
GCInterval: 30 * time.Minute,
|
||||
},
|
||||
// Corresponds to the default in upstream (https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/cmd/alertmanager/main.go#L149-L151)
|
||||
Silences: SilencesConfig{
|
||||
Max: 0,
|
||||
MaxSizeBytes: 0,
|
||||
MaintenanceInterval: 15 * time.Minute,
|
||||
Retention: 120 * time.Hour,
|
||||
},
|
||||
// Corresponds to the default in upstream (https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/cmd/alertmanager/main.go#L149)
|
||||
NFLog: NFLogConfig{
|
||||
MaintenanceInterval: 15 * time.Minute,
|
||||
Retention: 120 * time.Hour,
|
||||
},
|
||||
SMTP: SMTPConfig{
|
||||
Hello: "localhost",
|
||||
From: "alertmanager@signoz.io",
|
||||
Host: "localhost",
|
||||
Port: 25,
|
||||
RequireTLS: true,
|
||||
},
|
||||
Store: alertmanagerstore.NewConfig().(alertmanagerstore.Config),
|
||||
}
|
||||
}
|
||||
|
||||
func (c Config) Validate() error {
|
||||
return nil
|
||||
}
|
||||
339
pkg/alertmanager/server.go
Normal file
339
pkg/alertmanager/server.go
Normal file
@@ -0,0 +1,339 @@
|
||||
package alertmanager
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/alertmanager/dispatch"
|
||||
"github.com/prometheus/alertmanager/featurecontrol"
|
||||
"github.com/prometheus/alertmanager/inhibit"
|
||||
"github.com/prometheus/alertmanager/nflog"
|
||||
"github.com/prometheus/alertmanager/notify"
|
||||
"github.com/prometheus/alertmanager/provider/mem"
|
||||
"github.com/prometheus/alertmanager/silence"
|
||||
"github.com/prometheus/alertmanager/template"
|
||||
"github.com/prometheus/alertmanager/timeinterval"
|
||||
"github.com/prometheus/common/model"
|
||||
"go.signoz.io/signoz/pkg/alertmanager/alertmanagerstore"
|
||||
"go.signoz.io/signoz/pkg/errors"
|
||||
"go.signoz.io/signoz/pkg/factory"
|
||||
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
|
||||
)
|
||||
|
||||
var _ factory.Service = (*Server)(nil)
|
||||
|
||||
var (
|
||||
// This is not a real file and will never be used. We need this placeholder to ensure maintenance runs on shutdown. See
|
||||
// https://github.com/prometheus/server/blob/3ee2cd0f1271e277295c02b6160507b4d193dde2/silence/silence.go#L435-L438
|
||||
// and https://github.com/prometheus/server/blob/3b06b97af4d146e141af92885a185891eb79a5b0/nflog/nflog.go#L362.
|
||||
snapfnoop string = "snapfnoop"
|
||||
)
|
||||
|
||||
type Server struct {
|
||||
// srvConfig is the server config for the alertmanager
|
||||
srvConfig Config
|
||||
// alertmanagerConfigHash is the hash of the alertmanager config
|
||||
alertmanagerConfigHash [16]byte
|
||||
// alertmanagerConfigRaw is the raw config of the alertmanager
|
||||
alertmanagerConfigRaw []byte
|
||||
// alertmanagerConfig is the config of the alertmanager
|
||||
alertmanagerConfig *alertmanagertypes.Config
|
||||
// Settings is the factorysettings for the alertmanager
|
||||
settings factory.NamespacedSettings
|
||||
// orgID is the orgID for the alertmanager
|
||||
orgID string
|
||||
// store is the backing store for the alertmanager
|
||||
store alertmanagerstore.Store
|
||||
// alertmanager primitives from upstream alertmanager
|
||||
alerts *mem.Alerts
|
||||
nflog *nflog.Log
|
||||
dispatcher *dispatch.Dispatcher
|
||||
dispatcherMetrics *dispatch.DispatcherMetrics
|
||||
inhibitor *inhibit.Inhibitor
|
||||
silencer *silence.Silencer
|
||||
silences *silence.Silences
|
||||
timeIntervals map[string][]timeinterval.TimeInterval
|
||||
pipelineBuilder *notify.PipelineBuilder
|
||||
marker *alertmanagertypes.MemMarker
|
||||
tmpl *template.Template
|
||||
wg sync.WaitGroup
|
||||
stopc chan struct{}
|
||||
}
|
||||
|
||||
func NewForOrg(ctx context.Context, settings factory.Settings, srvConfig Config, orgID string, store alertmanagerstore.Store) (*Server, error) {
|
||||
server := &Server{
|
||||
srvConfig: srvConfig,
|
||||
settings: factory.NewNamespacedSettings(settings, "go.signoz.io/signoz/pkg/alertmanager"),
|
||||
orgID: orgID,
|
||||
store: store,
|
||||
stopc: make(chan struct{}),
|
||||
}
|
||||
// initialize marker
|
||||
server.marker = alertmanagertypes.NewMarker(server.settings.PrometheusRegisterer())
|
||||
|
||||
// get silences for initial state
|
||||
silencesstate, err := store.GetState(ctx, server.orgID, alertmanagertypes.SilenceStateName)
|
||||
if err != nil && !errors.Ast(err, errors.TypeNotFound) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// get nflog for initial state
|
||||
nflogstate, err := store.GetState(ctx, server.orgID, alertmanagertypes.NFLogStateName)
|
||||
if err != nil && !errors.Ast(err, errors.TypeNotFound) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Initialize silences
|
||||
server.silences, err = silence.New(silence.Options{
|
||||
SnapshotReader: strings.NewReader(silencesstate),
|
||||
Retention: srvConfig.Silences.Retention,
|
||||
Limits: silence.Limits{
|
||||
MaxSilences: func() int { return srvConfig.Silences.Max },
|
||||
MaxSilenceSizeBytes: func() int { return srvConfig.Silences.MaxSizeBytes },
|
||||
},
|
||||
Metrics: server.settings.PrometheusRegisterer(),
|
||||
Logger: server.settings.Logger(),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Initialize notification log
|
||||
server.nflog, err = nflog.New(nflog.Options{
|
||||
SnapshotReader: strings.NewReader(nflogstate),
|
||||
Retention: server.srvConfig.NFLog.Retention,
|
||||
Metrics: server.settings.PrometheusRegisterer(),
|
||||
Logger: server.settings.Logger(),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Start maintenance for silences
|
||||
server.wg.Add(1)
|
||||
go func() {
|
||||
defer server.wg.Done()
|
||||
server.silences.Maintenance(server.srvConfig.Silences.MaintenanceInterval, snapfnoop, server.stopc, func() (int64, error) {
|
||||
// Delete silences older than the retention period.
|
||||
if _, err := server.silences.GC(); err != nil {
|
||||
server.settings.Logger().ErrorContext(ctx, "silence garbage collection", "error", err)
|
||||
// Don't return here - we need to snapshot our state first.
|
||||
}
|
||||
|
||||
return server.store.SetState(ctx, server.orgID, alertmanagertypes.SilenceStateName, server.silences)
|
||||
})
|
||||
|
||||
}()
|
||||
|
||||
// Start maintenance for notification logs
|
||||
server.wg.Add(1)
|
||||
go func() {
|
||||
defer server.wg.Done()
|
||||
server.nflog.Maintenance(server.srvConfig.NFLog.MaintenanceInterval, snapfnoop, server.stopc, func() (int64, error) {
|
||||
if _, err := server.nflog.GC(); err != nil {
|
||||
server.settings.Logger().ErrorContext(ctx, "notification log garbage collection", "error", err)
|
||||
// Don't return without saving the current state.
|
||||
}
|
||||
|
||||
return server.store.SetState(ctx, server.orgID, alertmanagertypes.NFLogStateName, server.nflog)
|
||||
})
|
||||
}()
|
||||
|
||||
server.alerts, err = mem.NewAlerts(ctx, server.marker, server.srvConfig.Alerts.GCInterval, nil, server.settings.Logger(), server.settings.PrometheusRegisterer())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
server.pipelineBuilder = notify.NewPipelineBuilder(server.settings.PrometheusRegisterer(), featurecontrol.NoopFlags{})
|
||||
server.dispatcherMetrics = dispatch.NewDispatcherMetrics(false, server.settings.PrometheusRegisterer())
|
||||
|
||||
return server, nil
|
||||
}
|
||||
|
||||
func (server *Server) Start(ctx context.Context) error {
|
||||
config, err := server.store.GetConfig(ctx, server.orgID)
|
||||
if err != nil && !errors.Ast(err, errors.TypeNotFound) {
|
||||
return err
|
||||
}
|
||||
|
||||
if config == nil {
|
||||
config = alertmanagertypes.NewDefaultConfig(
|
||||
server.srvConfig.ResolveTimeout,
|
||||
server.srvConfig.SMTP.Hello,
|
||||
server.srvConfig.SMTP.From,
|
||||
server.srvConfig.SMTP.Host,
|
||||
server.srvConfig.SMTP.Port,
|
||||
server.srvConfig.SMTP.AuthUsername,
|
||||
server.srvConfig.SMTP.AuthPassword,
|
||||
server.srvConfig.SMTP.AuthSecret,
|
||||
server.srvConfig.SMTP.AuthIdentity,
|
||||
server.srvConfig.SMTP.RequireTLS,
|
||||
server.srvConfig.Route.GroupBy,
|
||||
server.srvConfig.Route.GroupInterval,
|
||||
server.srvConfig.Route.GroupWait,
|
||||
server.srvConfig.Route.RepeatInterval,
|
||||
server.orgID,
|
||||
)
|
||||
}
|
||||
|
||||
return server.SetConfig(ctx, config)
|
||||
}
|
||||
|
||||
func (server *Server) GetAlerts(ctx context.Context, params alertmanagertypes.GettableAlertsParams) (alertmanagertypes.GettableAlerts, error) {
|
||||
return alertmanagertypes.NewGettableAlertsFromAlertProvider(server.alerts, server.alertmanagerConfig, server.marker.Status, func(labels model.LabelSet) {
|
||||
server.inhibitor.Mutes(labels)
|
||||
server.silencer.Mutes(labels)
|
||||
}, params)
|
||||
}
|
||||
|
||||
func (server *Server) PutAlerts(ctx context.Context, postableAlerts alertmanagertypes.PostableAlerts) error {
|
||||
alerts, err := alertmanagertypes.NewAlertsFromPostableAlerts(postableAlerts, server.srvConfig.ResolveTimeout, time.Now())
|
||||
|
||||
// Notification sending alert takes precedence over validation errors.
|
||||
if err := server.alerts.Put(alerts...); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return errors.Join(err...)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (server *Server) ConfigHash() [16]byte {
|
||||
return server.alertmanagerConfigHash
|
||||
}
|
||||
|
||||
func (server *Server) ConfigRaw() []byte {
|
||||
return server.alertmanagerConfigRaw
|
||||
}
|
||||
|
||||
func (server *Server) SetConfig(ctx context.Context, alertmanagerConfig *alertmanagertypes.Config) error {
|
||||
config := alertmanagerConfig.AlertmanagerConfig()
|
||||
|
||||
var err error
|
||||
server.tmpl, err = template.FromGlobs(config.Templates)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
server.tmpl.ExternalURL = server.srvConfig.ExternalUrl
|
||||
|
||||
// Build the routing tree and record which receivers are used.
|
||||
routes := dispatch.NewRoute(config.Route, nil)
|
||||
activeReceivers := make(map[string]struct{})
|
||||
routes.Walk(func(r *dispatch.Route) {
|
||||
activeReceivers[r.RouteOpts.Receiver] = struct{}{}
|
||||
})
|
||||
|
||||
// Build the map of receiver to integrations.
|
||||
receivers := make(map[string][]notify.Integration, len(activeReceivers))
|
||||
var integrationsNum int
|
||||
for _, rcv := range config.Receivers {
|
||||
if _, found := activeReceivers[rcv.Name]; !found {
|
||||
// No need to build a receiver if no route is using it.
|
||||
server.settings.Logger().InfoContext(ctx, "skipping creation of receiver not referenced by any route", "receiver", rcv.Name)
|
||||
continue
|
||||
}
|
||||
integrations, err := alertmanagertypes.NewReceiverIntegrations(rcv, server.tmpl, server.settings.Logger())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// rcv.Name is guaranteed to be unique across all receivers.
|
||||
receivers[rcv.Name] = integrations
|
||||
integrationsNum += len(integrations)
|
||||
}
|
||||
|
||||
// Build the map of time interval names to time interval definitions.
|
||||
timeIntervals := make(map[string][]timeinterval.TimeInterval, len(config.MuteTimeIntervals)+len(config.TimeIntervals))
|
||||
for _, ti := range config.MuteTimeIntervals {
|
||||
timeIntervals[ti.Name] = ti.TimeIntervals
|
||||
}
|
||||
|
||||
for _, ti := range config.TimeIntervals {
|
||||
timeIntervals[ti.Name] = ti.TimeIntervals
|
||||
}
|
||||
|
||||
intervener := timeinterval.NewIntervener(timeIntervals)
|
||||
|
||||
if server.inhibitor != nil {
|
||||
server.inhibitor.Stop()
|
||||
}
|
||||
if server.dispatcher != nil {
|
||||
server.dispatcher.Stop()
|
||||
}
|
||||
|
||||
server.inhibitor = inhibit.NewInhibitor(server.alerts, config.InhibitRules, server.marker, server.settings.Logger())
|
||||
server.timeIntervals = timeIntervals
|
||||
server.silencer = silence.NewSilencer(server.silences, server.marker, server.settings.Logger())
|
||||
|
||||
var pipelinePeer notify.Peer
|
||||
pipeline := server.pipelineBuilder.New(
|
||||
receivers,
|
||||
func() time.Duration { return 0 },
|
||||
server.inhibitor,
|
||||
server.silencer,
|
||||
intervener,
|
||||
server.marker,
|
||||
server.nflog,
|
||||
pipelinePeer,
|
||||
)
|
||||
|
||||
timeoutFunc := func(d time.Duration) time.Duration {
|
||||
if d < notify.MinTimeout {
|
||||
d = notify.MinTimeout
|
||||
}
|
||||
return d
|
||||
}
|
||||
|
||||
server.dispatcher = dispatch.NewDispatcher(
|
||||
server.alerts,
|
||||
routes,
|
||||
pipeline,
|
||||
server.marker,
|
||||
timeoutFunc,
|
||||
nil,
|
||||
server.settings.Logger(),
|
||||
server.dispatcherMetrics,
|
||||
)
|
||||
|
||||
// Do not try to add these to `server.wg as there seems to be a race condition if
|
||||
// we call Start() and Stop() in quick succession.
|
||||
// Both these goroutines will run indefinitely.
|
||||
go server.dispatcher.Run()
|
||||
go server.inhibitor.Run()
|
||||
|
||||
server.alertmanagerConfigHash = alertmanagerConfig.Hash()
|
||||
server.alertmanagerConfigRaw = alertmanagerConfig.Raw()
|
||||
server.alertmanagerConfig = alertmanagerConfig
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (server *Server) TestReceiver(ctx context.Context, receiver alertmanagertypes.Receiver) error {
|
||||
return alertmanagertypes.TestReceiver(ctx, receiver, server.tmpl, server.settings.Logger())
|
||||
}
|
||||
|
||||
func (server *Server) Stop(ctx context.Context) error {
|
||||
if server.dispatcher != nil {
|
||||
server.dispatcher.Stop()
|
||||
}
|
||||
|
||||
if server.inhibitor != nil {
|
||||
server.inhibitor.Stop()
|
||||
}
|
||||
|
||||
// Close the alert provider.
|
||||
server.alerts.Close()
|
||||
|
||||
// Signals maintenance goroutines of server states to stop.
|
||||
close(server.stopc)
|
||||
|
||||
// Wait for all goroutines to finish.
|
||||
server.wg.Wait()
|
||||
|
||||
return nil
|
||||
}
|
||||
84
pkg/alertmanager/server_test.go
Normal file
84
pkg/alertmanager/server_test.go
Normal file
@@ -0,0 +1,84 @@
|
||||
package alertmanager
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"testing"
|
||||
|
||||
"github.com/prometheus/alertmanager/config"
|
||||
commoncfg "github.com/prometheus/common/config"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.signoz.io/signoz/pkg/alertmanager/alertmanagerstore"
|
||||
"go.signoz.io/signoz/pkg/alertmanager/alertmanagerstore/alertmanagerstoretest"
|
||||
"go.signoz.io/signoz/pkg/factory/factorytest"
|
||||
"go.signoz.io/signoz/pkg/factory/providertest"
|
||||
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
|
||||
)
|
||||
|
||||
func TestServerStartStop(t *testing.T) {
|
||||
store, err := alertmanagerstoretest.New(context.Background(), providertest.NewSettings(), alertmanagerstore.NewConfig().(alertmanagerstore.Config), []string{"1"})
|
||||
require.NoError(t, err)
|
||||
server, err := NewForOrg(context.Background(), factorytest.NewSettings(), newConfig().(Config), "1", store)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, server.Start(context.Background()))
|
||||
require.NoError(t, server.Stop(context.Background()))
|
||||
}
|
||||
|
||||
func TestServerWithDefaultConfig(t *testing.T) {
|
||||
store, err := alertmanagerstoretest.New(context.Background(), providertest.NewSettings(), alertmanagerstore.NewConfig().(alertmanagerstore.Config), []string{"1"})
|
||||
require.NoError(t, err)
|
||||
server, err := NewForOrg(context.Background(), factorytest.NewSettings(), newConfig().(Config), "1", store)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, server.Start(context.Background()))
|
||||
defer require.NoError(t, server.Stop(context.Background()))
|
||||
|
||||
assert.Equal(t, `{"global":{"resolve_timeout":"5m","http_config":{"tls_config":{"insecure_skip_verify":false},"follow_redirects":true,"enable_http2":true,"proxy_url":null},"smtp_from":"alertmanager@signoz.io","smtp_hello":"localhost","smtp_smarthost":"localhost:25","smtp_require_tls":true,"smtp_tls_config":{"insecure_skip_verify":false},"pagerduty_url":"https://events.pagerduty.com/v2/enqueue","opsgenie_api_url":"https://api.opsgenie.com/","wechat_api_url":"https://qyapi.weixin.qq.com/cgi-bin/","victorops_api_url":"https://alert.victorops.com/integrations/generic/20131114/alert/","telegram_api_url":"https://api.telegram.org","webex_api_url":"https://webexapis.com/v1/messages","rocketchat_api_url":"https://open.rocket.chat/"},"route":{"receiver":"default-receiver","group_by":["alertname"],"group_wait":"30s","group_interval":"5m","repeat_interval":"4h"},"receivers":[{"name":"default-receiver"}],"templates":null}`, string(server.alertmanagerConfigRaw))
|
||||
}
|
||||
|
||||
func TestServerTestReceiverWebhook(t *testing.T) {
|
||||
store, err := alertmanagerstoretest.New(context.Background(), providertest.NewSettings(), alertmanagerstore.NewConfig().(alertmanagerstore.Config), []string{"1"})
|
||||
require.NoError(t, err)
|
||||
server, err := NewForOrg(context.Background(), factorytest.NewSettings(), newConfig().(Config), "1", store)
|
||||
require.NoError(t, err)
|
||||
|
||||
webhookListener, err := net.Listen("tcp", "localhost:0")
|
||||
require.NoError(t, err)
|
||||
|
||||
requestBody := new(bytes.Buffer)
|
||||
webhookServer := &http.Server{
|
||||
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
_, err := requestBody.ReadFrom(r.Body)
|
||||
require.NoError(t, err)
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}),
|
||||
}
|
||||
|
||||
go func() {
|
||||
require.NoError(t, webhookServer.Serve(webhookListener))
|
||||
}()
|
||||
|
||||
require.NoError(t, server.Start(context.Background()))
|
||||
defer require.NoError(t, server.Stop(context.Background()))
|
||||
|
||||
webhookURL, err := url.Parse("http://" + webhookListener.Addr().String() + "/webhook")
|
||||
require.NoError(t, err)
|
||||
|
||||
err = server.TestReceiver(context.Background(), alertmanagertypes.Receiver{
|
||||
Name: "test-receiver",
|
||||
WebhookConfigs: []*config.WebhookConfig{
|
||||
{
|
||||
HTTPConfig: &commoncfg.HTTPClientConfig{},
|
||||
URL: &config.SecretURL{URL: webhookURL},
|
||||
},
|
||||
},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Contains(t, requestBody.String(), "test-receiver")
|
||||
require.Contains(t, requestBody.String(), "firing")
|
||||
}
|
||||
214
pkg/alertmanager/servers.go
Normal file
214
pkg/alertmanager/servers.go
Normal file
@@ -0,0 +1,214 @@
|
||||
package alertmanager
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"go.signoz.io/signoz/pkg/alertmanager/alertmanagerstore"
|
||||
"go.signoz.io/signoz/pkg/errors"
|
||||
"go.signoz.io/signoz/pkg/factory"
|
||||
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrCodeAlertmanagerNotFound = errors.MustNewCode("alertmanager_not_found")
|
||||
)
|
||||
|
||||
var _ factory.Service = (*Servers)(nil)
|
||||
var _ Client = (*Servers)(nil)
|
||||
|
||||
type Servers struct {
|
||||
config Config
|
||||
// Store is the store for the alertmanager
|
||||
store alertmanagerstore.Store
|
||||
// Map of organization id to server
|
||||
servers map[string]*Server
|
||||
// Mutex to protect the servers map
|
||||
serversMtx sync.RWMutex
|
||||
}
|
||||
|
||||
func New(ctx context.Context, settings factory.Settings, config Config, store alertmanagerstore.Store) (*Servers, error) {
|
||||
servers := &Servers{
|
||||
config: config,
|
||||
store: store,
|
||||
servers: map[string]*Server{},
|
||||
serversMtx: sync.RWMutex{},
|
||||
}
|
||||
|
||||
orgIDs, err := store.ListOrgIDs(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, orgID := range orgIDs {
|
||||
server, err := NewForOrg(ctx, settings, config, orgID, store)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
servers.servers[orgID] = server
|
||||
}
|
||||
|
||||
return servers, nil
|
||||
}
|
||||
|
||||
func (ss *Servers) Start(ctx context.Context) error {
|
||||
for _, server := range ss.servers {
|
||||
err := server.Start(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
ticker := time.NewTicker(ss.config.PollInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
case <-ticker.C:
|
||||
for _, server := range ss.servers {
|
||||
config, err := ss.store.GetConfig(ctx, server.orgID)
|
||||
if err != nil && !errors.Ast(err, errors.TypeNotFound) {
|
||||
server.settings.Logger().ErrorContext(ctx, "failed to get config", "error", err, "orgID", server.orgID)
|
||||
continue
|
||||
}
|
||||
if config == nil {
|
||||
config = alertmanagertypes.NewDefaultConfig(
|
||||
server.srvConfig.ResolveTimeout,
|
||||
server.srvConfig.SMTP.Hello,
|
||||
server.srvConfig.SMTP.From,
|
||||
server.srvConfig.SMTP.Host,
|
||||
server.srvConfig.SMTP.Port,
|
||||
server.srvConfig.SMTP.AuthUsername,
|
||||
server.srvConfig.SMTP.AuthPassword,
|
||||
server.srvConfig.SMTP.AuthSecret,
|
||||
server.srvConfig.SMTP.AuthIdentity,
|
||||
server.srvConfig.SMTP.RequireTLS,
|
||||
server.srvConfig.Route.GroupBy,
|
||||
server.srvConfig.Route.GroupInterval,
|
||||
server.srvConfig.Route.GroupWait,
|
||||
server.srvConfig.Route.RepeatInterval,
|
||||
server.orgID,
|
||||
)
|
||||
}
|
||||
|
||||
if err := server.SetConfig(ctx, config); err != nil {
|
||||
server.settings.Logger().ErrorContext(ctx, "failed to set config in alertmanager", "error", err, "orgID", server.orgID)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (ss *Servers) Stop(ctx context.Context) error {
|
||||
for _, server := range ss.servers {
|
||||
server.Stop(ctx)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ss *Servers) GetAlerts(ctx context.Context, orgID string, params alertmanagertypes.GettableAlertsParams) (alertmanagertypes.GettableAlerts, error) {
|
||||
server, ok := ss.servers[orgID]
|
||||
if !ok {
|
||||
return nil, errors.Newf(errors.TypeNotFound, ErrCodeAlertmanagerNotFound, "alertmanager not found for orgID %q", orgID)
|
||||
}
|
||||
|
||||
return server.GetAlerts(ctx, params)
|
||||
}
|
||||
|
||||
func (ss *Servers) PutAlerts(ctx context.Context, orgID string, alerts alertmanagertypes.PostableAlerts) error {
|
||||
server, ok := ss.servers[orgID]
|
||||
if !ok {
|
||||
return errors.Newf(errors.TypeNotFound, ErrCodeAlertmanagerNotFound, "alertmanager not found for orgID %q", orgID)
|
||||
}
|
||||
|
||||
return server.PutAlerts(ctx, alerts)
|
||||
}
|
||||
|
||||
func (ss *Servers) SetConfig(ctx context.Context, orgID string, postableConfig alertmanagertypes.PostableConfig) error {
|
||||
cfg, err := ss.store.GetConfig(ctx, orgID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = cfg.MergeWithPostableConfig(postableConfig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := ss.store.SetConfig(ctx, orgID, cfg); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ss *Servers) TestReceiver(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver) error {
|
||||
server, ok := ss.servers[orgID]
|
||||
if !ok {
|
||||
return errors.Newf(errors.TypeNotFound, ErrCodeAlertmanagerNotFound, "alertmanager not found for orgID %q", orgID)
|
||||
}
|
||||
|
||||
return server.TestReceiver(ctx, receiver)
|
||||
}
|
||||
|
||||
func (ss *Servers) CreateChannel(ctx context.Context, orgID string, channel *alertmanagertypes.Channel) error {
|
||||
receiver, err := alertmanagertypes.NewReceiverFromChannel(channel)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return ss.SetConfig(ctx, orgID, alertmanagertypes.PostableConfig{
|
||||
Action: alertmanagertypes.PostableConfigActionCreate,
|
||||
Receiver: receiver,
|
||||
})
|
||||
}
|
||||
|
||||
func (ss *Servers) GetChannel(ctx context.Context, orgID string, id uint64) (*alertmanagertypes.Channel, error) {
|
||||
return ss.store.GetChannel(ctx, orgID, id)
|
||||
}
|
||||
|
||||
func (ss *Servers) DelChannel(ctx context.Context, orgID string, id uint64) error {
|
||||
channel, err := ss.store.GetChannel(ctx, orgID, id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
receiver, err := alertmanagertypes.NewReceiverFromChannel(channel)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return ss.SetConfig(ctx, orgID, alertmanagertypes.PostableConfig{
|
||||
Action: alertmanagertypes.PostableConfigActionDelete,
|
||||
Receiver: receiver,
|
||||
})
|
||||
}
|
||||
|
||||
func (ss *Servers) ListChannels(ctx context.Context, orgID string) (alertmanagertypes.Channels, error) {
|
||||
return ss.store.ListChannels(ctx, orgID)
|
||||
}
|
||||
|
||||
func (ss *Servers) UpdateChannel(ctx context.Context, orgID string, id uint64, data string) error {
|
||||
existingChannel, err := ss.store.GetChannel(ctx, orgID, id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
existingChannel.Data = data
|
||||
existingChannel.UpdatedAt = time.Now()
|
||||
|
||||
receiver, err := alertmanagertypes.NewReceiverFromChannel(existingChannel)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return ss.SetConfig(ctx, orgID, alertmanagertypes.PostableConfig{
|
||||
Action: alertmanagertypes.PostableConfigActionUpdate,
|
||||
Receiver: receiver,
|
||||
})
|
||||
}
|
||||
10
pkg/factory/factorytest/settings.go
Normal file
10
pkg/factory/factorytest/settings.go
Normal file
@@ -0,0 +1,10 @@
|
||||
package factorytest
|
||||
|
||||
import (
|
||||
"go.signoz.io/signoz/pkg/factory"
|
||||
"go.signoz.io/signoz/pkg/instrumentation/instrumentationtest"
|
||||
)
|
||||
|
||||
func NewSettings() factory.Settings {
|
||||
return instrumentationtest.New().ToFactorySettings()
|
||||
}
|
||||
@@ -2,9 +2,12 @@ package factory
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"regexp"
|
||||
)
|
||||
|
||||
var _ slog.LogValuer = Name{}
|
||||
|
||||
var (
|
||||
// nameRegex is a regex that matches a valid name.
|
||||
// It must start with a alphabet, and can only contain alphabets, numbers, underscores or hyphens.
|
||||
@@ -15,6 +18,10 @@ type Name struct {
|
||||
name string
|
||||
}
|
||||
|
||||
func (n Name) LogValue() slog.Value {
|
||||
return slog.StringValue(n.name)
|
||||
}
|
||||
|
||||
func (n Name) String() string {
|
||||
return n.name
|
||||
}
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
package factory
|
||||
|
||||
import "context"
|
||||
import (
|
||||
"context"
|
||||
)
|
||||
|
||||
type Service interface {
|
||||
// Starts a service. The service should return an error if it cannot be started.
|
||||
@@ -8,3 +10,24 @@ type Service interface {
|
||||
// Stops a service.
|
||||
Stop(context.Context) error
|
||||
}
|
||||
|
||||
type NamedService interface {
|
||||
Named
|
||||
Service
|
||||
}
|
||||
|
||||
type namedService struct {
|
||||
name Name
|
||||
Service
|
||||
}
|
||||
|
||||
func (s *namedService) Name() Name {
|
||||
return s.name
|
||||
}
|
||||
|
||||
func NewNamedService(name Name, service Service) NamedService {
|
||||
return &namedService{
|
||||
name: name,
|
||||
Service: service,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,6 +8,56 @@ import (
|
||||
sdktrace "go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
type Settings struct {
|
||||
// Logger is the logger.
|
||||
Logger *slog.Logger
|
||||
// MeterProvider is the meter provider.
|
||||
MeterProvider sdkmetric.MeterProvider
|
||||
// TracerProvider is the tracer provider.
|
||||
TracerProvider sdktrace.TracerProvider
|
||||
// PrometheusRegisterer is the prometheus registerer.
|
||||
PrometheusRegisterer prometheus.Registerer
|
||||
}
|
||||
|
||||
type NamespacedSettings interface {
|
||||
Logger() *slog.Logger
|
||||
Meter() sdkmetric.Meter
|
||||
Tracer() sdktrace.Tracer
|
||||
PrometheusRegisterer() prometheus.Registerer
|
||||
}
|
||||
|
||||
type namespacedSettings struct {
|
||||
logger *slog.Logger
|
||||
meter sdkmetric.Meter
|
||||
tracer sdktrace.Tracer
|
||||
prometheusRegisterer prometheus.Registerer
|
||||
}
|
||||
|
||||
func NewNamespacedSettings(settings Settings, pkgName string) NamespacedSettings {
|
||||
return &namespacedSettings{
|
||||
logger: settings.Logger.With("pkg", pkgName),
|
||||
meter: settings.MeterProvider.Meter(pkgName),
|
||||
tracer: settings.TracerProvider.Tracer(pkgName),
|
||||
prometheusRegisterer: prometheus.WrapRegistererWith(prometheus.Labels{"pkg": pkgName}, settings.PrometheusRegisterer),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *namespacedSettings) Logger() *slog.Logger {
|
||||
return s.logger
|
||||
}
|
||||
|
||||
func (s *namespacedSettings) Meter() sdkmetric.Meter {
|
||||
return s.meter
|
||||
}
|
||||
|
||||
func (s *namespacedSettings) Tracer() sdktrace.Tracer {
|
||||
return s.tracer
|
||||
}
|
||||
|
||||
func (s *namespacedSettings) PrometheusRegisterer() prometheus.Registerer {
|
||||
return s.prometheusRegisterer
|
||||
}
|
||||
|
||||
type ProviderSettings struct {
|
||||
// SlogLogger is the slog logger.
|
||||
Logger *slog.Logger
|
||||
@@ -8,8 +8,16 @@ import (
|
||||
sdkresource "go.opentelemetry.io/otel/sdk/resource"
|
||||
sdktrace "go.opentelemetry.io/otel/trace"
|
||||
"go.signoz.io/signoz/pkg/factory"
|
||||
"go.uber.org/zap/zapcore"
|
||||
)
|
||||
|
||||
var zapLogLevelToSlogLevel = map[zapcore.Level]slog.Level{
|
||||
zapcore.DebugLevel: slog.LevelDebug,
|
||||
zapcore.InfoLevel: slog.LevelInfo,
|
||||
zapcore.WarnLevel: slog.LevelWarn,
|
||||
zapcore.ErrorLevel: slog.LevelError,
|
||||
}
|
||||
|
||||
// Instrumentation provides the core components for application instrumentation.
|
||||
type Instrumentation interface {
|
||||
// Logger returns the Slog logger.
|
||||
@@ -22,6 +30,8 @@ type Instrumentation interface {
|
||||
PrometheusRegisterer() prometheus.Registerer
|
||||
// ToProviderSettings converts instrumentation to provider settings.
|
||||
ToProviderSettings() factory.ProviderSettings
|
||||
// ToFactorySettings converts instrumentation to factory settings.
|
||||
ToFactorySettings() factory.Settings
|
||||
}
|
||||
|
||||
// Merges the input attributes with the resource attributes.
|
||||
|
||||
@@ -51,3 +51,12 @@ func (i *noopInstrumentation) ToProviderSettings() factory.ProviderSettings {
|
||||
PrometheusRegisterer: i.PrometheusRegisterer(),
|
||||
}
|
||||
}
|
||||
|
||||
func (i *noopInstrumentation) ToFactorySettings() factory.Settings {
|
||||
return factory.Settings{
|
||||
Logger: i.Logger(),
|
||||
MeterProvider: i.MeterProvider(),
|
||||
TracerProvider: i.TracerProvider(),
|
||||
PrometheusRegisterer: i.PrometheusRegisterer(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -131,3 +131,12 @@ func (i *SDK) ToProviderSettings() factory.ProviderSettings {
|
||||
PrometheusRegisterer: i.PrometheusRegisterer(),
|
||||
}
|
||||
}
|
||||
|
||||
func (i *SDK) ToFactorySettings() factory.Settings {
|
||||
return factory.Settings{
|
||||
Logger: i.Logger(),
|
||||
MeterProvider: i.MeterProvider(),
|
||||
TracerProvider: i.TracerProvider(),
|
||||
PrometheusRegisterer: i.PrometheusRegisterer(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,6 +24,7 @@ import (
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
"github.com/prometheus/prometheus/promql"
|
||||
|
||||
"go.signoz.io/signoz/pkg/http/render"
|
||||
"go.signoz.io/signoz/pkg/query-service/agentConf"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/cloudintegrations"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/dashboards"
|
||||
@@ -49,6 +50,8 @@ import (
|
||||
"go.signoz.io/signoz/pkg/query-service/contextlinks"
|
||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||
"go.signoz.io/signoz/pkg/query-service/postprocess"
|
||||
"go.signoz.io/signoz/pkg/signoz"
|
||||
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
@@ -126,6 +129,7 @@ type APIHandler struct {
|
||||
jobsRepo *inframetrics.JobsRepo
|
||||
|
||||
pvcsRepo *inframetrics.PvcsRepo
|
||||
SigNoz *signoz.SigNoz
|
||||
}
|
||||
|
||||
type APIHandlerOpts struct {
|
||||
@@ -165,6 +169,8 @@ type APIHandlerOpts struct {
|
||||
UseLogsNewSchema bool
|
||||
|
||||
UseTraceNewSchema bool
|
||||
|
||||
SigNoz *signoz.SigNoz
|
||||
}
|
||||
|
||||
// NewAPIHandler returns an APIHandler
|
||||
@@ -237,6 +243,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
|
||||
statefulsetsRepo: statefulsetsRepo,
|
||||
jobsRepo: jobsRepo,
|
||||
pvcsRepo: pvcsRepo,
|
||||
SigNoz: opts.SigNoz,
|
||||
}
|
||||
|
||||
logsQueryBuilder := logsv3.PrepareLogsQuery
|
||||
@@ -1310,36 +1317,74 @@ func (aH *APIHandler) editRule(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
func (aH *APIHandler) getChannel(w http.ResponseWriter, r *http.Request) {
|
||||
id := mux.Vars(r)["id"]
|
||||
channel, apiErrorObj := aH.ruleManager.RuleDB().GetChannel(id)
|
||||
if apiErrorObj != nil {
|
||||
RespondError(w, apiErrorObj, nil)
|
||||
idVar := mux.Vars(r)["id"]
|
||||
id, err := strconv.ParseUint(idVar, 10, 64)
|
||||
if err != nil {
|
||||
render.Error(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
orgId, err := auth.GetOrgIdFromJwt(r.Context())
|
||||
if err != nil {
|
||||
render.Error(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
channel, err := aH.SigNoz.AlertmanagerClient.GetChannel(r.Context(), orgId, id)
|
||||
if err != nil {
|
||||
render.Error(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
aH.Respond(w, channel)
|
||||
}
|
||||
|
||||
func (aH *APIHandler) deleteChannel(w http.ResponseWriter, r *http.Request) {
|
||||
id := mux.Vars(r)["id"]
|
||||
apiErrorObj := aH.ruleManager.RuleDB().DeleteChannel(id)
|
||||
if apiErrorObj != nil {
|
||||
RespondError(w, apiErrorObj, nil)
|
||||
idVar := mux.Vars(r)["id"]
|
||||
id, err := strconv.ParseUint(idVar, 10, 64)
|
||||
if err != nil {
|
||||
render.Error(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
orgId, err := auth.GetOrgIdFromJwt(r.Context())
|
||||
if err != nil {
|
||||
render.Error(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
err = aH.SigNoz.AlertmanagerClient.DelChannel(r.Context(), orgId, id)
|
||||
if err != nil {
|
||||
render.Error(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
aH.Respond(w, "notification channel successfully deleted")
|
||||
}
|
||||
|
||||
func (aH *APIHandler) listChannels(w http.ResponseWriter, r *http.Request) {
|
||||
channels, apiErrorObj := aH.ruleManager.RuleDB().GetChannels()
|
||||
if apiErrorObj != nil {
|
||||
RespondError(w, apiErrorObj, nil)
|
||||
orgId, err := auth.GetOrgIdFromJwt(r.Context())
|
||||
if err != nil {
|
||||
render.Error(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
channels, err := aH.SigNoz.AlertmanagerClient.ListChannels(r.Context(), orgId)
|
||||
if err != nil {
|
||||
render.Error(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
aH.Respond(w, channels)
|
||||
}
|
||||
|
||||
// testChannels sends test alert to all registered channels
|
||||
func (aH *APIHandler) testChannel(w http.ResponseWriter, r *http.Request) {
|
||||
orgId, err := auth.GetOrgIdFromJwt(r.Context())
|
||||
if err != nil {
|
||||
render.Error(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
defer r.Body.Close()
|
||||
body, err := io.ReadAll(r.Body)
|
||||
@@ -1349,24 +1394,34 @@ func (aH *APIHandler) testChannel(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
receiver := &am.Receiver{}
|
||||
if err := json.Unmarshal(body, receiver); err != nil { // Parse []byte to go struct pointer
|
||||
zap.L().Error("Error in parsing req body of testChannel API\n", zap.Error(err))
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
|
||||
receiver, err := alertmanagertypes.NewReceiverFromString(string(body))
|
||||
if err != nil {
|
||||
render.Error(w, err)
|
||||
return
|
||||
}
|
||||
// send alert
|
||||
apiErrorObj := aH.alertManager.TestReceiver(receiver)
|
||||
if apiErrorObj != nil {
|
||||
RespondError(w, apiErrorObj, nil)
|
||||
|
||||
err = aH.SigNoz.AlertmanagerClient.TestReceiver(r.Context(), orgId, receiver)
|
||||
if err != nil {
|
||||
render.Error(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
aH.Respond(w, "test alert sent")
|
||||
}
|
||||
|
||||
func (aH *APIHandler) editChannel(w http.ResponseWriter, r *http.Request) {
|
||||
idVar := mux.Vars(r)["id"]
|
||||
id, err := strconv.ParseUint(idVar, 10, 64)
|
||||
if err != nil {
|
||||
render.Error(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
id := mux.Vars(r)["id"]
|
||||
orgId, err := auth.GetOrgIdFromJwt(r.Context())
|
||||
if err != nil {
|
||||
render.Error(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
defer r.Body.Close()
|
||||
body, err := io.ReadAll(r.Body)
|
||||
@@ -1376,17 +1431,9 @@ func (aH *APIHandler) editChannel(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
receiver := &am.Receiver{}
|
||||
if err := json.Unmarshal(body, receiver); err != nil { // Parse []byte to go struct pointer
|
||||
zap.L().Error("Error in parsing req body of editChannel API", zap.Error(err))
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
_, apiErrorObj := aH.ruleManager.RuleDB().EditChannel(receiver, id)
|
||||
|
||||
if apiErrorObj != nil {
|
||||
RespondError(w, apiErrorObj, nil)
|
||||
err = aH.SigNoz.AlertmanagerClient.UpdateChannel(r.Context(), orgId, id, string(body))
|
||||
if err != nil {
|
||||
render.Error(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -1395,6 +1442,11 @@ func (aH *APIHandler) editChannel(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
func (aH *APIHandler) createChannel(w http.ResponseWriter, r *http.Request) {
|
||||
orgId, err := auth.GetOrgIdFromJwt(r.Context())
|
||||
if err != nil {
|
||||
render.Error(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
defer r.Body.Close()
|
||||
body, err := io.ReadAll(r.Body)
|
||||
@@ -1404,41 +1456,41 @@ func (aH *APIHandler) createChannel(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
receiver := &am.Receiver{}
|
||||
if err := json.Unmarshal(body, receiver); err != nil { // Parse []byte to go struct pointer
|
||||
zap.L().Error("Error in parsing req body of createChannel API", zap.Error(err))
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
|
||||
channel, err := alertmanagertypes.NewChannelFromReceiverString(string(body), orgId)
|
||||
if err != nil {
|
||||
render.Error(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
_, apiErrorObj := aH.ruleManager.RuleDB().CreateChannel(receiver)
|
||||
|
||||
if apiErrorObj != nil {
|
||||
RespondError(w, apiErrorObj, nil)
|
||||
err = aH.SigNoz.AlertmanagerClient.CreateChannel(r.Context(), orgId, channel)
|
||||
if err != nil {
|
||||
render.Error(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
aH.Respond(w, nil)
|
||||
|
||||
}
|
||||
|
||||
func (aH *APIHandler) getAlerts(w http.ResponseWriter, r *http.Request) {
|
||||
params := r.URL.Query()
|
||||
amEndpoint := constants.GetAlertManagerApiPrefix()
|
||||
resp, err := http.Get(amEndpoint + "v1/alerts" + "?" + params.Encode())
|
||||
orgId, err := auth.GetOrgIdFromJwt(r.Context())
|
||||
if err != nil {
|
||||
render.Error(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
params, err := alertmanagertypes.NewGettableAlertsParams(r)
|
||||
if err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
alerts, err := aH.SigNoz.AlertmanagerClient.GetAlerts(r.Context(), orgId, params)
|
||||
if err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
defer resp.Body.Close()
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
aH.Respond(w, string(body))
|
||||
aH.Respond(w, alerts)
|
||||
}
|
||||
|
||||
func (aH *APIHandler) createRule(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
@@ -200,6 +200,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
||||
FluxInterval: fluxInterval,
|
||||
UseLogsNewSchema: serverOptions.UseLogsNewSchema,
|
||||
UseTraceNewSchema: serverOptions.UseTraceNewSchema,
|
||||
SigNoz: serverOptions.SigNoz,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -132,3 +132,17 @@ func GetEmailFromJwt(ctx context.Context) (string, error) {
|
||||
|
||||
return claims["email"].(string), nil
|
||||
}
|
||||
|
||||
func GetOrgIdFromJwt(ctx context.Context) (string, error) {
|
||||
jwt, ok := ExtractJwtFromContext(ctx)
|
||||
if !ok {
|
||||
return "", model.InternalError(fmt.Errorf("failed to extract jwt from context"))
|
||||
}
|
||||
|
||||
claims, err := ParseJWT(jwt)
|
||||
if err != nil {
|
||||
return "", model.InternalError(fmt.Errorf("failed get claims from jwt %v", err))
|
||||
}
|
||||
|
||||
return claims["orgId"].(string), nil
|
||||
}
|
||||
|
||||
@@ -4,23 +4,23 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
|
||||
"go.signoz.io/signoz/pkg/factory"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type Registry struct {
|
||||
services []factory.Service
|
||||
logger *zap.Logger
|
||||
services factory.NamedMap[factory.NamedService]
|
||||
logger *slog.Logger
|
||||
startCh chan error
|
||||
stopCh chan error
|
||||
}
|
||||
|
||||
// New creates a new registry of services. It needs at least one service in the input.
|
||||
func New(logger *zap.Logger, services ...factory.Service) (*Registry, error) {
|
||||
func New(logger *slog.Logger, services ...factory.NamedService) (*Registry, error) {
|
||||
if logger == nil {
|
||||
return nil, fmt.Errorf("cannot build registry, logger is required")
|
||||
}
|
||||
@@ -29,19 +29,25 @@ func New(logger *zap.Logger, services ...factory.Service) (*Registry, error) {
|
||||
return nil, fmt.Errorf("cannot build registry, at least one service is required")
|
||||
}
|
||||
|
||||
m := factory.MustNewNamedMap(services...)
|
||||
|
||||
return &Registry{
|
||||
logger: logger.Named("go.signoz.io/pkg/registry"),
|
||||
services: services,
|
||||
logger: logger,
|
||||
services: m,
|
||||
startCh: make(chan error, 1),
|
||||
stopCh: make(chan error, len(services)),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (r *Registry) Start(ctx context.Context) error {
|
||||
for _, s := range r.services {
|
||||
go func(s factory.Service) {
|
||||
for _, s := range r.services.GetInOrder() {
|
||||
go func(s factory.NamedService) {
|
||||
r.logger.InfoContext(ctx, "starting service", "service", s.Name())
|
||||
err := s.Start(ctx)
|
||||
r.startCh <- err
|
||||
if err != nil {
|
||||
r.logger.ErrorContext(ctx, "failed to start service", "service", s.Name(), "error", err)
|
||||
r.startCh <- err
|
||||
}
|
||||
}(s)
|
||||
}
|
||||
|
||||
@@ -54,11 +60,11 @@ func (r *Registry) Wait(ctx context.Context) error {
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
r.logger.Info("caught context error, exiting", zap.Any("context", ctx))
|
||||
r.logger.InfoContext(ctx, "caught context error, exiting", "error", ctx.Err())
|
||||
case s := <-interrupt:
|
||||
r.logger.Info("caught interrupt signal, exiting", zap.Any("context", ctx), zap.Any("signal", s))
|
||||
r.logger.InfoContext(ctx, "caught interrupt signal, exiting", "signal", s)
|
||||
case err := <-r.startCh:
|
||||
r.logger.Info("caught service error, exiting", zap.Any("context", ctx), zap.Error(err))
|
||||
r.logger.ErrorContext(ctx, "caught service error, exiting", "error", err)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -66,15 +72,16 @@ func (r *Registry) Wait(ctx context.Context) error {
|
||||
}
|
||||
|
||||
func (r *Registry) Stop(ctx context.Context) error {
|
||||
for _, s := range r.services {
|
||||
go func(s factory.Service) {
|
||||
for _, s := range r.services.GetInOrder() {
|
||||
go func(s factory.NamedService) {
|
||||
r.logger.InfoContext(ctx, "stopping service", "service", s.Name())
|
||||
err := s.Stop(ctx)
|
||||
r.stopCh <- err
|
||||
}(s)
|
||||
}
|
||||
|
||||
errs := make([]error, len(r.services))
|
||||
for i := 0; i < len(r.services); i++ {
|
||||
errs := make([]error, len(r.services.GetInOrder()))
|
||||
for i := 0; i < len(r.services.GetInOrder()); i++ {
|
||||
err := <-r.stopCh
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
|
||||
@@ -2,12 +2,13 @@ package registry
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"log/slog"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.signoz.io/signoz/pkg/factory/servicetest"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func TestRegistryWith2HttpServers(t *testing.T) {
|
||||
@@ -17,7 +18,7 @@ func TestRegistryWith2HttpServers(t *testing.T) {
|
||||
http2, err := servicetest.NewHttpService("http2")
|
||||
require.NoError(t, err)
|
||||
|
||||
registry, err := New(zap.NewNop(), http1, http2)
|
||||
registry, err := New(slog.New(slog.NewTextHandler(io.Discard, nil)), http1, http2)
|
||||
require.NoError(t, err)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
@@ -41,7 +42,7 @@ func TestRegistryWith2HttpServersWithoutWait(t *testing.T) {
|
||||
http2, err := servicetest.NewHttpService("http2")
|
||||
require.NoError(t, err)
|
||||
|
||||
registry, err := New(zap.NewNop(), http1, http2)
|
||||
registry, err := New(slog.New(slog.NewTextHandler(io.Discard, nil)), http1, http2)
|
||||
require.NoError(t, err)
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
"go.signoz.io/signoz/pkg/alertmanager"
|
||||
"go.signoz.io/signoz/pkg/apiserver"
|
||||
"go.signoz.io/signoz/pkg/cache"
|
||||
"go.signoz.io/signoz/pkg/config"
|
||||
@@ -44,6 +45,9 @@ type Config struct {
|
||||
|
||||
// TelemetryStore config
|
||||
TelemetryStore telemetrystore.Config `mapstructure:"telemetrystore"`
|
||||
|
||||
// Alertmanager config
|
||||
Alertmanager alertmanager.Config `mapstructure:"alertmanager"`
|
||||
}
|
||||
|
||||
// DeprecatedFlags are the flags that are deprecated and scheduled for removal.
|
||||
@@ -63,6 +67,7 @@ func NewConfig(ctx context.Context, resolverConfig config.ResolverConfig, deprec
|
||||
sqlmigrator.NewConfigFactory(),
|
||||
apiserver.NewConfigFactory(),
|
||||
telemetrystore.NewConfigFactory(),
|
||||
alertmanager.NewConfigFactory(),
|
||||
}
|
||||
|
||||
conf, err := config.New(ctx, resolverConfig, configFactories)
|
||||
|
||||
@@ -59,6 +59,7 @@ func NewProviderConfig() ProviderConfig {
|
||||
sqlmigration.NewAddIntegrationsFactory(),
|
||||
sqlmigration.NewAddLicensesFactory(),
|
||||
sqlmigration.NewAddPatsFactory(),
|
||||
sqlmigration.NewAddAlertmanagerConfigurationFactory(),
|
||||
sqlmigration.NewModifyDatetimeFactory(),
|
||||
),
|
||||
TelemetryStoreProviderFactories: factory.MustNewNamedMap(
|
||||
|
||||
@@ -3,9 +3,12 @@ package signoz
|
||||
import (
|
||||
"context"
|
||||
|
||||
"go.signoz.io/signoz/pkg/alertmanager"
|
||||
"go.signoz.io/signoz/pkg/alertmanager/alertmanagerstore/sqlalertmanagerstore"
|
||||
"go.signoz.io/signoz/pkg/cache"
|
||||
"go.signoz.io/signoz/pkg/factory"
|
||||
"go.signoz.io/signoz/pkg/instrumentation"
|
||||
"go.signoz.io/signoz/pkg/registry"
|
||||
"go.signoz.io/signoz/pkg/sqlmigration"
|
||||
"go.signoz.io/signoz/pkg/sqlmigrator"
|
||||
"go.signoz.io/signoz/pkg/sqlstore"
|
||||
@@ -16,10 +19,12 @@ import (
|
||||
)
|
||||
|
||||
type SigNoz struct {
|
||||
Cache cache.Cache
|
||||
Web web.Web
|
||||
SQLStore sqlstore.SQLStore
|
||||
TelemetryStore telemetrystore.TelemetryStore
|
||||
*registry.Registry
|
||||
Cache cache.Cache
|
||||
Web web.Web
|
||||
SQLStore sqlstore.SQLStore
|
||||
TelemetryStore telemetrystore.TelemetryStore
|
||||
AlertmanagerClient alertmanager.Client
|
||||
}
|
||||
|
||||
func New(
|
||||
@@ -38,6 +43,9 @@ func New(
|
||||
// Get the provider settings from instrumentation
|
||||
providerSettings := instrumentation.ToProviderSettings()
|
||||
|
||||
// Get the factory settings from instrumentation
|
||||
factorySettings := instrumentation.ToFactorySettings()
|
||||
|
||||
// Initialize cache from the available cache provider factories
|
||||
cache, err := factory.NewProviderFromNamedMap(
|
||||
ctx,
|
||||
@@ -102,10 +110,29 @@ func New(
|
||||
return nil, err
|
||||
}
|
||||
|
||||
alertmanagerstore, err := sqlalertmanagerstore.
|
||||
NewFactory(sqlstore).
|
||||
New(ctx, providerSettings, config.Alertmanager.Store)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
alertmanagerService, err := alertmanager.New(ctx, factorySettings, config.Alertmanager, alertmanagerstore)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
registry, err := registry.New(instrumentation.Logger(), factory.NewNamedService(factory.MustNewName("alertmanager"), alertmanagerService))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &SigNoz{
|
||||
Cache: cache,
|
||||
Web: web,
|
||||
SQLStore: sqlstore,
|
||||
TelemetryStore: telemetrystore,
|
||||
Registry: registry,
|
||||
Cache: cache,
|
||||
Web: web,
|
||||
SQLStore: sqlstore,
|
||||
TelemetryStore: telemetrystore,
|
||||
AlertmanagerClient: alertmanagerService,
|
||||
}, nil
|
||||
}
|
||||
|
||||
62
pkg/sqlmigration/010_add_alertmanager_configuration.go
Normal file
62
pkg/sqlmigration/010_add_alertmanager_configuration.go
Normal file
@@ -0,0 +1,62 @@
|
||||
package sqlmigration
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/uptrace/bun"
|
||||
"github.com/uptrace/bun/migrate"
|
||||
"go.signoz.io/signoz/pkg/factory"
|
||||
)
|
||||
|
||||
type addAlertmanagerConfiguration struct{}
|
||||
|
||||
func NewAddAlertmanagerConfigurationFactory() factory.ProviderFactory[SQLMigration, Config] {
|
||||
return factory.NewProviderFactory(factory.MustNewName("add_alertmanager_configuration"), newAddAlertmanagerConfiguration)
|
||||
}
|
||||
|
||||
func newAddAlertmanagerConfiguration(_ context.Context, _ factory.ProviderSettings, _ Config) (SQLMigration, error) {
|
||||
return &addAlertmanagerConfiguration{}, nil
|
||||
}
|
||||
|
||||
func (migration *addAlertmanagerConfiguration) Register(migrations *migrate.Migrations) error {
|
||||
if err := migrations.Register(migration.Up, migration.Down); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (migration *addAlertmanagerConfiguration) Up(ctx context.Context, db *bun.DB) error {
|
||||
if _, err := db.
|
||||
NewCreateTable().
|
||||
Model(&struct {
|
||||
bun.BaseModel `bun:"table:alertmanager_config"`
|
||||
ID uint64 `bun:"id"`
|
||||
Config string `bun:"config"`
|
||||
SilencesState string `bun:"silences_state"`
|
||||
NFLogState string `bun:"nflog_state"`
|
||||
CreatedAt time.Time `bun:"created_at"`
|
||||
UpdatedAt time.Time `bun:"updated_at"`
|
||||
OrgID string `bun:"org_id,unique"`
|
||||
}{}).
|
||||
ForeignKey(`("org_id") REFERENCES "organizations" ("id") ON DELETE CASCADE`).
|
||||
IfNotExists().
|
||||
Exec(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := db.
|
||||
NewAddColumn().
|
||||
Table("notification_channels").
|
||||
ColumnExpr("org_id TEXT").
|
||||
Exec(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (migration *addAlertmanagerConfiguration) Down(ctx context.Context, db *bun.DB) error {
|
||||
return nil
|
||||
}
|
||||
@@ -3,6 +3,7 @@ package sqlitesqlstore
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"log/slog"
|
||||
|
||||
"github.com/jmoiron/sqlx"
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
@@ -10,6 +11,7 @@ import (
|
||||
"github.com/uptrace/bun/dialect/sqlitedialect"
|
||||
"go.signoz.io/signoz/pkg/factory"
|
||||
"go.signoz.io/signoz/pkg/sqlstore"
|
||||
"go.signoz.io/signoz/pkg/sqlstore/sqlstorehook"
|
||||
)
|
||||
|
||||
type provider struct {
|
||||
@@ -33,10 +35,13 @@ func New(ctx context.Context, providerSettings factory.ProviderSettings, config
|
||||
settings.Logger().InfoContext(ctx, "connected to sqlite", "path", config.Sqlite.Path)
|
||||
sqldb.SetMaxOpenConns(config.Connection.MaxOpenConns)
|
||||
|
||||
bundb := bun.NewDB(sqldb, sqlitedialect.New())
|
||||
bundb.AddQueryHook(sqlstorehook.NewDebug(settings.Logger(), slog.LevelDebug))
|
||||
|
||||
return &provider{
|
||||
settings: settings,
|
||||
sqldb: sqldb,
|
||||
bundb: bun.NewDB(sqldb, sqlitedialect.New()),
|
||||
bundb: bundb,
|
||||
sqlxdb: sqlx.NewDb(sqldb, "sqlite3"),
|
||||
}, nil
|
||||
}
|
||||
|
||||
30
pkg/sqlstore/sqlstorehook/debug.go
Normal file
30
pkg/sqlstore/sqlstorehook/debug.go
Normal file
@@ -0,0 +1,30 @@
|
||||
package sqlstorehook
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"time"
|
||||
|
||||
"github.com/uptrace/bun"
|
||||
)
|
||||
|
||||
type debug struct {
|
||||
bun.QueryHook
|
||||
logger *slog.Logger
|
||||
level slog.Level
|
||||
}
|
||||
|
||||
func NewDebug(logger *slog.Logger, level slog.Level) bun.QueryHook {
|
||||
return &debug{
|
||||
logger: logger,
|
||||
level: level,
|
||||
}
|
||||
}
|
||||
|
||||
func (debug) BeforeQuery(ctx context.Context, event *bun.QueryEvent) context.Context {
|
||||
return ctx
|
||||
}
|
||||
|
||||
func (hook debug) AfterQuery(ctx context.Context, event *bun.QueryEvent) {
|
||||
hook.logger.Log(ctx, hook.level, "::SQLSTORE-QUERY::", "db.query.operation", event.Operation(), "db.query.text", event.Query, "db.duration", time.Since(event.StartTime).String())
|
||||
}
|
||||
BIN
queries.active
Normal file
BIN
queries.active
Normal file
Binary file not shown.
Reference in New Issue
Block a user