Compare commits
11 Commits
main
...
alertmanag
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d05e279bcd | ||
|
|
f1c5d873f7 | ||
|
|
aec239cc7c | ||
|
|
59e26652dc | ||
|
|
e02afc5e97 | ||
|
|
3eac8ac30b | ||
|
|
382c4f58e1 | ||
|
|
73ea632a3f | ||
|
|
00fa8810c0 | ||
|
|
6cee330d44 | ||
|
|
871c6e642c |
2
.gitignore
vendored
2
.gitignore
vendored
@@ -76,3 +76,5 @@ dist/
|
|||||||
|
|
||||||
# ignore user_scripts that is fetched by init-clickhouse
|
# ignore user_scripts that is fetched by init-clickhouse
|
||||||
deploy/common/clickhouse/user_scripts/
|
deploy/common/clickhouse/user_scripts/
|
||||||
|
# queries.active
|
||||||
|
queries.active
|
||||||
|
|||||||
@@ -20,6 +20,7 @@ import (
|
|||||||
basemodel "go.signoz.io/signoz/pkg/query-service/model"
|
basemodel "go.signoz.io/signoz/pkg/query-service/model"
|
||||||
rules "go.signoz.io/signoz/pkg/query-service/rules"
|
rules "go.signoz.io/signoz/pkg/query-service/rules"
|
||||||
"go.signoz.io/signoz/pkg/query-service/version"
|
"go.signoz.io/signoz/pkg/query-service/version"
|
||||||
|
"go.signoz.io/signoz/pkg/signoz"
|
||||||
)
|
)
|
||||||
|
|
||||||
type APIHandlerOptions struct {
|
type APIHandlerOptions struct {
|
||||||
@@ -41,6 +42,7 @@ type APIHandlerOptions struct {
|
|||||||
FluxInterval time.Duration
|
FluxInterval time.Duration
|
||||||
UseLogsNewSchema bool
|
UseLogsNewSchema bool
|
||||||
UseTraceNewSchema bool
|
UseTraceNewSchema bool
|
||||||
|
SigNoz *signoz.SigNoz
|
||||||
}
|
}
|
||||||
|
|
||||||
type APIHandler struct {
|
type APIHandler struct {
|
||||||
@@ -65,6 +67,7 @@ func NewAPIHandler(opts APIHandlerOptions) (*APIHandler, error) {
|
|||||||
FluxInterval: opts.FluxInterval,
|
FluxInterval: opts.FluxInterval,
|
||||||
UseLogsNewSchema: opts.UseLogsNewSchema,
|
UseLogsNewSchema: opts.UseLogsNewSchema,
|
||||||
UseTraceNewSchema: opts.UseTraceNewSchema,
|
UseTraceNewSchema: opts.UseTraceNewSchema,
|
||||||
|
SigNoz: opts.SigNoz,
|
||||||
})
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -269,6 +269,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
|||||||
GatewayUrl: serverOptions.GatewayUrl,
|
GatewayUrl: serverOptions.GatewayUrl,
|
||||||
UseLogsNewSchema: serverOptions.UseLogsNewSchema,
|
UseLogsNewSchema: serverOptions.UseLogsNewSchema,
|
||||||
UseTraceNewSchema: serverOptions.UseTraceNewSchema,
|
UseTraceNewSchema: serverOptions.UseTraceNewSchema,
|
||||||
|
SigNoz: serverOptions.SigNoz,
|
||||||
}
|
}
|
||||||
|
|
||||||
apiHandler, err := api.NewAPIHandler(apiOpts)
|
apiHandler, err := api.NewAPIHandler(apiOpts)
|
||||||
|
|||||||
@@ -7,7 +7,6 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
"strconv"
|
"strconv"
|
||||||
"syscall"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"go.opentelemetry.io/otel/sdk/resource"
|
"go.opentelemetry.io/otel/sdk/resource"
|
||||||
@@ -87,6 +86,8 @@ func init() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
var promConfigPath, skipTopLvlOpsPath string
|
var promConfigPath, skipTopLvlOpsPath string
|
||||||
|
|
||||||
// disables rule execution but allows change to the rule definition
|
// disables rule execution but allows change to the rule definition
|
||||||
@@ -191,20 +192,20 @@ func main() {
|
|||||||
zap.L().Fatal("Could not start server", zap.Error(err))
|
zap.L().Fatal("Could not start server", zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := auth.InitAuthCache(context.Background()); err != nil {
|
if err := auth.InitAuthCache(ctx); err != nil {
|
||||||
zap.L().Fatal("Failed to initialize auth cache", zap.Error(err))
|
zap.L().Fatal("Failed to initialize auth cache", zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
signalsChannel := make(chan os.Signal, 1)
|
if err := signoz.Start(ctx); err != nil {
|
||||||
signal.Notify(signalsChannel, os.Interrupt, syscall.SIGTERM)
|
zap.L().Fatal("Failed to start signoz", zap.Error(err))
|
||||||
|
}
|
||||||
|
|
||||||
for {
|
if err := signoz.Wait(ctx); err != nil {
|
||||||
select {
|
zap.L().Fatal("Failed to wait for signoz", zap.Error(err))
|
||||||
case status := <-server.HealthCheckStatus():
|
}
|
||||||
zap.L().Info("Received HealthCheck status: ", zap.Int("status", int(status)))
|
|
||||||
case <-signalsChannel:
|
server.Stop()
|
||||||
zap.L().Fatal("Received OS Interrupt Signal ... ")
|
if err := signoz.Stop(ctx); err != nil {
|
||||||
server.Stop()
|
zap.L().Fatal("Failed to stop signoz", zap.Error(err))
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,89 @@
|
|||||||
|
package alertmanagerstoretest
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"go.signoz.io/signoz/pkg/alertmanager/alertmanagerstore"
|
||||||
|
"go.signoz.io/signoz/pkg/errors"
|
||||||
|
"go.signoz.io/signoz/pkg/factory"
|
||||||
|
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ alertmanagerstore.Store = (*Provider)(nil)
|
||||||
|
|
||||||
|
// Provider is an in-memory implementation of alertmanagerstore.Store that keeps
// all data in plain maps and slices. Judging by the package name it is meant
// for tests. It performs no locking of its own.
type Provider struct {
|
||||||
|
// States maps an org ID to that org's marshalled states, keyed by state name.
States map[string]map[alertmanagertypes.StateName][]byte
|
||||||
|
// Configs maps an org ID to the raw alertmanager config stored for it.
Configs map[string]string
|
||||||
|
// OrgIDs is the list handed to New; ListOrgIDs returns it as is.
OrgIDs []string
|
||||||
|
}
|
||||||
|
|
||||||
|
func New(ctx context.Context, settings factory.ProviderSettings, config alertmanagerstore.Config, orgIDs []string) (*Provider, error) {
|
||||||
|
states := make(map[string]map[alertmanagertypes.StateName][]byte)
|
||||||
|
for _, orgID := range orgIDs {
|
||||||
|
states[orgID] = make(map[alertmanagertypes.StateName][]byte)
|
||||||
|
states[orgID][alertmanagertypes.SilenceStateName] = []byte{}
|
||||||
|
states[orgID][alertmanagertypes.NFLogStateName] = []byte{}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &Provider{
|
||||||
|
States: states,
|
||||||
|
Configs: make(map[string]string),
|
||||||
|
OrgIDs: orgIDs,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (provider *Provider) GetState(ctx context.Context, orgID string, stateName alertmanagertypes.StateName) (string, error) {
|
||||||
|
if _, ok := provider.States[orgID][stateName]; !ok {
|
||||||
|
return "", errors.Newf(errors.TypeNotFound, alertmanagerstore.ErrCodeAlertmanagerStateNotFound, "cannot find state %q for org %q", stateName, orgID)
|
||||||
|
}
|
||||||
|
|
||||||
|
return string(provider.States[orgID][stateName]), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (provider *Provider) SetState(ctx context.Context, orgID string, stateName alertmanagertypes.StateName, state alertmanagertypes.State) (int64, error) {
|
||||||
|
var err error
|
||||||
|
provider.States[orgID][stateName], err = state.MarshalBinary()
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
return int64(len(provider.States[orgID][stateName])), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (provider *Provider) GetConfig(ctx context.Context, orgID string) (*alertmanagertypes.Config, error) {
|
||||||
|
if _, ok := provider.Configs[orgID]; !ok {
|
||||||
|
return nil, errors.Newf(errors.TypeNotFound, alertmanagerstore.ErrCodeAlertmanagerConfigNotFound, "cannot find config for org %s", orgID)
|
||||||
|
}
|
||||||
|
|
||||||
|
return alertmanagertypes.NewConfigFromString(provider.Configs[orgID], orgID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (provider *Provider) SetConfig(ctx context.Context, orgID string, config *alertmanagertypes.Config) error {
|
||||||
|
provider.Configs[orgID] = string(config.Raw())
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (provider *Provider) DelConfig(ctx context.Context, orgID string) error {
|
||||||
|
delete(provider.Configs, orgID)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (provider *Provider) ListOrgIDs(ctx context.Context) ([]string, error) {
|
||||||
|
return provider.OrgIDs, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (provider *Provider) ListChannels(ctx context.Context, orgID string) (alertmanagertypes.Channels, error) {
|
||||||
|
if _, ok := provider.Configs[orgID]; !ok {
|
||||||
|
return nil, errors.Newf(errors.TypeNotFound, alertmanagerstore.ErrCodeAlertmanagerConfigNotFound, "cannot find config for org %s", orgID)
|
||||||
|
}
|
||||||
|
|
||||||
|
config, err := alertmanagertypes.NewConfigFromString(provider.Configs[orgID], orgID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return config.Channels(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetChannel is a stub: it ignores its arguments and always returns (nil, nil).
// NOTE(review): callers get a nil channel together with a nil error, so they
// must nil-check the result; confirm whether a not-found error is wanted here.
func (provider *Provider) GetChannel(ctx context.Context, orgID string, id uint64) (*alertmanagertypes.Channel, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
17
pkg/alertmanager/alertmanagerstore/config.go
Normal file
17
pkg/alertmanager/alertmanagerstore/config.go
Normal file
@@ -0,0 +1,17 @@
|
|||||||
|
package alertmanagerstore
|
||||||
|
|
||||||
|
import "go.signoz.io/signoz/pkg/factory"
|
||||||
|
|
||||||
|
// Config is the configuration of the alertmanager store.
type Config struct {
|
||||||
|
// Provider names the store provider to use; NewConfig defaults it to "sql".
Provider string `mapstructure:"provider"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewConfig() factory.Config {
|
||||||
|
return Config{
|
||||||
|
Provider: "sql",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c Config) Validate() error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
@@ -0,0 +1,248 @@
|
|||||||
|
package sqlalertmanagerstore
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"encoding/base64"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"go.signoz.io/signoz/pkg/alertmanager/alertmanagerstore"
|
||||||
|
"go.signoz.io/signoz/pkg/errors"
|
||||||
|
"go.signoz.io/signoz/pkg/factory"
|
||||||
|
"go.signoz.io/signoz/pkg/sqlstore"
|
||||||
|
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
|
||||||
|
)
|
||||||
|
|
||||||
|
// provider is the SQL-backed alertmanager store; NewFactory returns it as an
// alertmanagerstore.Store. Its methods issue their queries through
// sqlstore.BunDB().
type provider struct {
|
||||||
|
// sqlstore gives access to the database handle used by every method.
sqlstore sqlstore.SQLStore
|
||||||
|
// settings are the provider settings scoped to this package in New.
settings factory.ScopedProviderSettings
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewFactory(sqlstore sqlstore.SQLStore) factory.ProviderFactory[alertmanagerstore.Store, alertmanagerstore.Config] {
|
||||||
|
return factory.NewProviderFactory(factory.MustNewName("sql"), func(ctx context.Context, settings factory.ProviderSettings, config alertmanagerstore.Config) (alertmanagerstore.Store, error) {
|
||||||
|
return New(ctx, settings, config, sqlstore)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func New(ctx context.Context, settings factory.ProviderSettings, config alertmanagerstore.Config, sqlstore sqlstore.SQLStore) (*provider, error) {
|
||||||
|
return &provider{
|
||||||
|
sqlstore: sqlstore,
|
||||||
|
settings: factory.NewScopedProviderSettings(settings, "go.signoz.io/signoz/pkg/alertmanager/alertmanagerstore/sqlalertmanagerstore"),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (provider *provider) GetState(ctx context.Context, orgID string, stateName alertmanagertypes.StateName) (string, error) {
|
||||||
|
storedConfig := new(alertmanagertypes.StoredConfig)
|
||||||
|
|
||||||
|
err := provider.
|
||||||
|
sqlstore.
|
||||||
|
BunDB().
|
||||||
|
NewSelect().
|
||||||
|
Model(storedConfig).
|
||||||
|
Where("org_id = ?", orgID).
|
||||||
|
Scan(ctx)
|
||||||
|
if err != nil {
|
||||||
|
if err == sql.ErrNoRows {
|
||||||
|
return "", errors.Newf(errors.TypeNotFound, alertmanagerstore.ErrCodeAlertmanagerStateNotFound, "cannot find alertmanager state for org %s", orgID)
|
||||||
|
}
|
||||||
|
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
if stateName == alertmanagertypes.SilenceStateName {
|
||||||
|
decodedState, err := base64.RawStdEncoding.DecodeString(storedConfig.SilencesState)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
return string(decodedState), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if stateName == alertmanagertypes.NFLogStateName {
|
||||||
|
decodedState, err := base64.RawStdEncoding.DecodeString(storedConfig.NFLogState)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
return string(decodedState), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return "", errors.Newf(errors.TypeNotFound, alertmanagerstore.ErrCodeAlertmanagerStateNotFound, "cannot find alertmanager state for org %s", orgID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (provider *provider) SetState(ctx context.Context, orgID string, stateName alertmanagertypes.StateName, state alertmanagertypes.State) (int64, error) {
|
||||||
|
marshalledState, err := state.MarshalBinary()
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
encodedState := base64.StdEncoding.EncodeToString(marshalledState)
|
||||||
|
|
||||||
|
q := provider.
|
||||||
|
sqlstore.
|
||||||
|
BunDB().
|
||||||
|
NewUpdate().
|
||||||
|
Model(&alertmanagertypes.StoredConfig{}).
|
||||||
|
Where("org_id = ?", orgID)
|
||||||
|
|
||||||
|
if stateName == alertmanagertypes.SilenceStateName {
|
||||||
|
q.Set("silences_state = ?", encodedState)
|
||||||
|
}
|
||||||
|
|
||||||
|
if stateName == alertmanagertypes.NFLogStateName {
|
||||||
|
q.Set("nflog_state = ?", encodedState)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = q.Exec(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return int64(len(marshalledState)), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (provider *provider) GetConfig(ctx context.Context, orgID string) (*alertmanagertypes.Config, error) {
|
||||||
|
storedConfig := new(alertmanagertypes.StoredConfig)
|
||||||
|
|
||||||
|
err := provider.
|
||||||
|
sqlstore.
|
||||||
|
BunDB().
|
||||||
|
NewSelect().
|
||||||
|
Model(storedConfig).
|
||||||
|
Where("org_id = ?", orgID).
|
||||||
|
Scan(ctx)
|
||||||
|
if err != nil {
|
||||||
|
if err == sql.ErrNoRows {
|
||||||
|
return nil, errors.Newf(errors.TypeNotFound, alertmanagerstore.ErrCodeAlertmanagerConfigNotFound, "cannot find alertmanager config for org %s", orgID)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
config, err := alertmanagertypes.NewConfigFromString(storedConfig.Config, orgID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return config, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (provider *provider) SetConfig(ctx context.Context, orgID string, config *alertmanagertypes.Config) error {
|
||||||
|
tx, err := provider.sqlstore.BunDB().BeginTx(ctx, nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
defer tx.Rollback() //nolint:errcheck
|
||||||
|
|
||||||
|
if _, err = tx.
|
||||||
|
NewInsert().
|
||||||
|
Model(config.StoredConfig()).
|
||||||
|
On("CONFLICT (org_id) DO UPDATE").
|
||||||
|
Set("config = ?", string(config.StoredConfig().Config)).
|
||||||
|
Set("updated_at = ?", config.StoredConfig().UpdatedAt).
|
||||||
|
Exec(ctx); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
channels := config.Channels()
|
||||||
|
fmt.Println("channels", channels)
|
||||||
|
if len(channels) != 0 {
|
||||||
|
fmt.Println("channels", channels)
|
||||||
|
if _, err = tx.NewInsert().
|
||||||
|
Model(&channels).
|
||||||
|
On("CONFLICT (name) DO UPDATE").
|
||||||
|
Set("data = EXCLUDED.data").
|
||||||
|
Set("updated_at = EXCLUDED.updated_at").
|
||||||
|
Exec(ctx); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = tx.Commit(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (provider *provider) DelConfig(ctx context.Context, orgID string) error {
|
||||||
|
_, err := provider.
|
||||||
|
sqlstore.
|
||||||
|
BunDB().
|
||||||
|
NewDelete().
|
||||||
|
Model(&alertmanagertypes.StoredConfig{}).
|
||||||
|
Where("org_id = ?", orgID).
|
||||||
|
Exec(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = provider.
|
||||||
|
sqlstore.
|
||||||
|
BunDB().
|
||||||
|
NewDelete().
|
||||||
|
Model(&alertmanagertypes.Channel{}).
|
||||||
|
Where("org_id = ?", orgID).
|
||||||
|
Exec(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (provider *provider) ListOrgIDs(ctx context.Context) ([]string, error) {
|
||||||
|
var orgIDs []string
|
||||||
|
|
||||||
|
err := provider.
|
||||||
|
sqlstore.
|
||||||
|
BunDB().
|
||||||
|
NewSelect().
|
||||||
|
Table("organizations").
|
||||||
|
ColumnExpr("id").
|
||||||
|
Scan(ctx, &orgIDs)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return orgIDs, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (provider *provider) ListChannels(ctx context.Context, orgID string) (alertmanagertypes.Channels, error) {
|
||||||
|
channels := alertmanagertypes.Channels{}
|
||||||
|
|
||||||
|
err := provider.
|
||||||
|
sqlstore.
|
||||||
|
BunDB().
|
||||||
|
NewSelect().
|
||||||
|
Model(&channels).
|
||||||
|
Where("org_id = ?", orgID).
|
||||||
|
Scan(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return channels, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (provider *provider) GetChannel(ctx context.Context, orgID string, id uint64) (*alertmanagertypes.Channel, error) {
|
||||||
|
channel := new(alertmanagertypes.Channel)
|
||||||
|
|
||||||
|
err := provider.
|
||||||
|
sqlstore.
|
||||||
|
BunDB().
|
||||||
|
NewSelect().
|
||||||
|
Model(channel).
|
||||||
|
Where("org_id = ?", orgID).
|
||||||
|
Where("id = ?", id).
|
||||||
|
Scan(ctx)
|
||||||
|
if err != nil {
|
||||||
|
if err == sql.ErrNoRows {
|
||||||
|
return nil, errors.Newf(errors.TypeNotFound, alertmanagerstore.ErrCodeAlertmanagerChannelNotFound, "cannot find channel for org %s", orgID)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return channel, nil
|
||||||
|
}
|
||||||
44
pkg/alertmanager/alertmanagerstore/store.go
Normal file
44
pkg/alertmanager/alertmanagerstore/store.go
Normal file
@@ -0,0 +1,44 @@
|
|||||||
|
package alertmanagerstore
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"go.signoz.io/signoz/pkg/errors"
|
||||||
|
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
ErrCodeAlertmanagerConfigNotFound = errors.MustNewCode("alertmanager_config_not_found")
|
||||||
|
ErrCodeAlertmanagerStateNotFound = errors.MustNewCode("alertmanager_state_not_found")
|
||||||
|
ErrCodeAlertmanagerChannelNotFound = errors.MustNewCode("alertmanager_channel_not_found")
|
||||||
|
)
|
||||||
|
|
||||||
|
type Store interface {
|
||||||
|
// Creates the silence or the notification log state and returns the number of bytes in the state.
|
||||||
|
// The return type matches the return of `silence.Maintenance` or `nflog.Maintenance`.
|
||||||
|
// See https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/silence/silence.go#L217
|
||||||
|
// and https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/nflog/nflog.go#L94
|
||||||
|
SetState(context.Context, string, alertmanagertypes.StateName, alertmanagertypes.State) (int64, error)
|
||||||
|
|
||||||
|
// Gets the silence state or the notification log state as a string from the store. This is used as a snapshot to load the
|
||||||
|
// initial state of silences or notification log when starting the alertmanager.
|
||||||
|
GetState(context.Context, string, alertmanagertypes.StateName) (string, error)
|
||||||
|
|
||||||
|
// Get an alertmanager config for an organization
|
||||||
|
GetConfig(context.Context, string) (*alertmanagertypes.Config, error)
|
||||||
|
|
||||||
|
// Set an alertmanager config for an organization
|
||||||
|
SetConfig(context.Context, string, *alertmanagertypes.Config) error
|
||||||
|
|
||||||
|
// Deletes the config for an organization
|
||||||
|
DelConfig(context.Context, string) error
|
||||||
|
|
||||||
|
// Get all organization ids
|
||||||
|
ListOrgIDs(context.Context) ([]string, error)
|
||||||
|
|
||||||
|
// Get all channels for an organization
|
||||||
|
ListChannels(context.Context, string) (alertmanagertypes.Channels, error)
|
||||||
|
|
||||||
|
// Get a channel for an organization
|
||||||
|
GetChannel(context.Context, string, uint64) (*alertmanagertypes.Channel, error)
|
||||||
|
}
|
||||||
36
pkg/alertmanager/client.go
Normal file
36
pkg/alertmanager/client.go
Normal file
@@ -0,0 +1,36 @@
|
|||||||
|
package alertmanager
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Client interface {
|
||||||
|
// GetAlerts gets the alerts from the alertmanager per organization.
|
||||||
|
GetAlerts(context.Context, string, alertmanagertypes.GettableAlertsParams) (alertmanagertypes.GettableAlerts, error)
|
||||||
|
|
||||||
|
// PutAlerts puts the alerts into the alertmanager per organization.
|
||||||
|
PutAlerts(context.Context, string, alertmanagertypes.PostableAlerts) error
|
||||||
|
|
||||||
|
// SetConfig sets the config into the alertmanager per organization.
|
||||||
|
SetConfig(context.Context, string, alertmanagertypes.PostableConfig) error
|
||||||
|
|
||||||
|
// TestReceiver sends a test alert to a receiver.
|
||||||
|
TestReceiver(context.Context, string, alertmanagertypes.Receiver) error
|
||||||
|
|
||||||
|
// CreateChannel creates a channel for the organization.
|
||||||
|
CreateChannel(context.Context, string, *alertmanagertypes.Channel) error
|
||||||
|
|
||||||
|
// GetChannel gets a channel for the organization.
|
||||||
|
GetChannel(context.Context, string, uint64) (*alertmanagertypes.Channel, error)
|
||||||
|
|
||||||
|
// DelChannel deletes a channel for the organization.
|
||||||
|
DelChannel(context.Context, string, uint64) error
|
||||||
|
|
||||||
|
// ListChannels lists all channels for the organization.
|
||||||
|
ListChannels(context.Context, string) (alertmanagertypes.Channels, error)
|
||||||
|
|
||||||
|
// UpdateChannel updates a channel for the organization.
|
||||||
|
UpdateChannel(context.Context, string, uint64, string) error
|
||||||
|
}
|
||||||
145
pkg/alertmanager/config.go
Normal file
145
pkg/alertmanager/config.go
Normal file
@@ -0,0 +1,145 @@
|
|||||||
|
package alertmanager
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/url"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"go.signoz.io/signoz/pkg/alertmanager/alertmanagerstore"
|
||||||
|
"go.signoz.io/signoz/pkg/factory"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Config struct {
|
||||||
|
// PollInterval is the interval at which the alertmanager config is polled from the store.
|
||||||
|
PollInterval time.Duration `mapstructure:"poll_interval"`
|
||||||
|
|
||||||
|
// The URL under which Alertmanager is externally reachable (for example, if Alertmanager is served via a reverse proxy). Used for generating relative and absolute links back to Alertmanager itself.
|
||||||
|
// See https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/cmd/alertmanager/main.go#L155C54-L155C249
|
||||||
|
ExternalUrl *url.URL `mapstructure:"external_url"`
|
||||||
|
|
||||||
|
// ResolveTimeout is the time after which an alert is declared resolved if it has not been updated.
|
||||||
|
// See https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/config/config.go#L836
|
||||||
|
ResolveTimeout time.Duration `mapstructure:"resolve_timeout"`
|
||||||
|
|
||||||
|
// Config of the root node of the routing tree.
|
||||||
|
Route RouteConfig `mapstructure:"route"`
|
||||||
|
|
||||||
|
// Configuration for alerts.
|
||||||
|
Alerts AlertsConfig `mapstructure:"alerts"`
|
||||||
|
|
||||||
|
// Configuration for silences.
|
||||||
|
Silences SilencesConfig `mapstructure:"silences"`
|
||||||
|
|
||||||
|
// Configuration for the notification log.
|
||||||
|
NFLog NFLogConfig `mapstructure:"nflog"`
|
||||||
|
|
||||||
|
// Configuration for the Email receiver. We are explicitly defining this here instead of taking it as part of the receiver configuration.
|
||||||
|
// This is because we want to use the same SMTP configuration for all receivers.
|
||||||
|
SMTP SMTPConfig `mapstructure:"smtp"`
|
||||||
|
|
||||||
|
// Configuration for the alertmanagerstore.
|
||||||
|
Store alertmanagerstore.Config `mapstructure:"store"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// RouteConfig is the configuration of the root node of the routing tree (the
// Route field of Config). The defaults quoted below are the ones set by
// newConfig.
// NOTE(review): the fields presumably carry the same meaning as the equally
// named route options of the upstream alertmanager; confirm against its docs.
type RouteConfig struct {
|
||||||
|
// GroupBy defaults to ["alertname"].
GroupBy []string `mapstructure:"group_by"`
|
||||||
|
// GroupInterval defaults to 5 minutes.
GroupInterval time.Duration `mapstructure:"group_interval"`
|
||||||
|
// GroupWait defaults to 30 seconds.
GroupWait time.Duration `mapstructure:"group_wait"`
|
||||||
|
// RepeatInterval defaults to 4 hours.
RepeatInterval time.Duration `mapstructure:"repeat_interval"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// This is a best effort to make it similar to the upstream alertmanager config. See
|
||||||
|
// https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/config/config.go#L843
|
||||||
|
type SMTPConfig struct {
|
||||||
|
Hello string `mapstructure:"hello"`
|
||||||
|
From string `mapstructure:"from"`
|
||||||
|
Host string `mapstructure:"host"`
|
||||||
|
Port int `mapstructure:"port"`
|
||||||
|
AuthUsername string `mapstructure:"auth_username"`
|
||||||
|
AuthPassword string `mapstructure:"auth_password"`
|
||||||
|
AuthSecret string `mapstructure:"auth_secret"`
|
||||||
|
AuthIdentity string `mapstructure:"auth_identity"`
|
||||||
|
RequireTLS bool `mapstructure:"require_tls"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type AlertsConfig struct {
|
||||||
|
// Interval between garbage collection of alerts.
|
||||||
|
// See https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/cmd/alertmanager/main.go#L152
|
||||||
|
GCInterval time.Duration `mapstructure:"gc_interval"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type SilencesConfig struct {
|
||||||
|
// Maximum number of silences, including expired silences. If negative or zero, no limit is set.
|
||||||
|
// See https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/cmd/alertmanager/main.go#L150C64-L150C157
|
||||||
|
Max int `mapstructure:"max"`
|
||||||
|
|
||||||
|
// Maximum size of the silences in bytes. If negative or zero, no limit is set.
|
||||||
|
// See https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/cmd/alertmanager/main.go#L150C64-L150C157
|
||||||
|
MaxSizeBytes int `mapstructure:"max_size_bytes"`
|
||||||
|
|
||||||
|
// Interval between garbage collection and snapshotting of the silences. The snapshot will be stored in the state store.
|
||||||
|
// The upstream alertmanager config (https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/cmd/alertmanager/main.go#L149) has
|
||||||
|
// been split between silences and nflog.
|
||||||
|
MaintenanceInterval time.Duration `mapstructure:"maintenance_interval"`
|
||||||
|
|
||||||
|
// Retention of the silences.
|
||||||
|
Retention time.Duration `mapstructure:"retention"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type NFLogConfig struct {
|
||||||
|
// Interval between garbage collection and snapshotting of the notification logs. The snapshot will be stored in the state store.
|
||||||
|
// The upstream alertmanager config (https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/cmd/alertmanager/main.go#L149) has
|
||||||
|
// been split between silences and nflog.
|
||||||
|
MaintenanceInterval time.Duration `mapstructure:"maintenance_interval"`
|
||||||
|
|
||||||
|
// Retention of the notification logs.
|
||||||
|
Retention time.Duration `mapstructure:"retention"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewConfigFactory() factory.ConfigFactory {
|
||||||
|
return factory.NewConfigFactory(factory.MustNewName("alertmanager"), newConfig)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newConfig() factory.Config {
|
||||||
|
return Config{
|
||||||
|
PollInterval: 15 * time.Second,
|
||||||
|
ExternalUrl: &url.URL{
|
||||||
|
Host: "localhost:8080",
|
||||||
|
},
|
||||||
|
// Corresponds to the default in upstream (https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/config/config.go#L727)
|
||||||
|
ResolveTimeout: 5 * time.Minute,
|
||||||
|
Route: RouteConfig{
|
||||||
|
GroupBy: []string{"alertname"},
|
||||||
|
GroupInterval: 5 * time.Minute,
|
||||||
|
GroupWait: 30 * time.Second,
|
||||||
|
RepeatInterval: 4 * time.Hour,
|
||||||
|
},
|
||||||
|
// Corresponds to the default in upstream (https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/cmd/alertmanager/main.go#L152)
|
||||||
|
Alerts: AlertsConfig{
|
||||||
|
GCInterval: 30 * time.Minute,
|
||||||
|
},
|
||||||
|
// Corresponds to the default in upstream (https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/cmd/alertmanager/main.go#L149-L151)
|
||||||
|
Silences: SilencesConfig{
|
||||||
|
Max: 0,
|
||||||
|
MaxSizeBytes: 0,
|
||||||
|
MaintenanceInterval: 15 * time.Minute,
|
||||||
|
Retention: 120 * time.Hour,
|
||||||
|
},
|
||||||
|
// Corresponds to the default in upstream (https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/cmd/alertmanager/main.go#L149)
|
||||||
|
NFLog: NFLogConfig{
|
||||||
|
MaintenanceInterval: 15 * time.Minute,
|
||||||
|
Retention: 120 * time.Hour,
|
||||||
|
},
|
||||||
|
SMTP: SMTPConfig{
|
||||||
|
Hello: "localhost",
|
||||||
|
From: "alertmanager@signoz.io",
|
||||||
|
Host: "localhost",
|
||||||
|
Port: 25,
|
||||||
|
RequireTLS: true,
|
||||||
|
},
|
||||||
|
Store: alertmanagerstore.NewConfig().(alertmanagerstore.Config),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c Config) Validate() error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
339
pkg/alertmanager/server.go
Normal file
339
pkg/alertmanager/server.go
Normal file
@@ -0,0 +1,339 @@
|
|||||||
|
package alertmanager
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/prometheus/alertmanager/dispatch"
|
||||||
|
"github.com/prometheus/alertmanager/featurecontrol"
|
||||||
|
"github.com/prometheus/alertmanager/inhibit"
|
||||||
|
"github.com/prometheus/alertmanager/nflog"
|
||||||
|
"github.com/prometheus/alertmanager/notify"
|
||||||
|
"github.com/prometheus/alertmanager/provider/mem"
|
||||||
|
"github.com/prometheus/alertmanager/silence"
|
||||||
|
"github.com/prometheus/alertmanager/template"
|
||||||
|
"github.com/prometheus/alertmanager/timeinterval"
|
||||||
|
"github.com/prometheus/common/model"
|
||||||
|
"go.signoz.io/signoz/pkg/alertmanager/alertmanagerstore"
|
||||||
|
"go.signoz.io/signoz/pkg/errors"
|
||||||
|
"go.signoz.io/signoz/pkg/factory"
|
||||||
|
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ factory.Service = (*Server)(nil)
|
||||||
|
|
||||||
|
var (
|
||||||
|
// This is not a real file and will never be used. We need this placeholder to ensure maintenance runs on shutdown. See
|
||||||
|
// https://github.com/prometheus/alertmanager/blob/3ee2cd0f1271e277295c02b6160507b4d193dde2/silence/silence.go#L435-L438
|
||||||
|
// and https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/nflog/nflog.go#L362.
|
||||||
|
snapfnoop string = "snapfnoop"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Server is the alertmanager of a single organization. It wires the upstream
// alertmanager primitives together and uses an alertmanagerstore.Store as its
// backing store for configuration and state.
type Server struct {
	// srvConfig is the server config for the alertmanager
	srvConfig Config

	// alertmanagerConfigHash is the hash of the alertmanager config
	// (recorded by SetConfig once the new config has been applied)
	alertmanagerConfigHash [16]byte

	// alertmanagerConfigRaw is the raw config of the alertmanager
	// (recorded by SetConfig once the new config has been applied)
	alertmanagerConfigRaw []byte

	// alertmanagerConfig is the config of the alertmanager
	// (recorded by SetConfig once the new config has been applied)
	alertmanagerConfig *alertmanagertypes.Config

	// settings is the factory settings (logger, registerer, ...) for the alertmanager
	settings factory.NamespacedSettings

	// orgID is the orgID for the alertmanager
	orgID string

	// store is the backing store for the alertmanager
	store alertmanagerstore.Store

	// alertmanager primitives from upstream alertmanager
	alerts            *mem.Alerts
	nflog             *nflog.Log
	dispatcher        *dispatch.Dispatcher
	dispatcherMetrics *dispatch.DispatcherMetrics
	inhibitor         *inhibit.Inhibitor
	silencer          *silence.Silencer
	silences          *silence.Silences
	timeIntervals     map[string][]timeinterval.TimeInterval
	pipelineBuilder   *notify.PipelineBuilder
	marker            *alertmanagertypes.MemMarker
	tmpl              *template.Template

	// wg tracks the silences and notification log maintenance goroutines.
	wg sync.WaitGroup

	// stopc is closed by Stop to signal the maintenance goroutines to exit.
	stopc chan struct{}
}
|
||||||
|
|
||||||
|
func NewForOrg(ctx context.Context, settings factory.Settings, srvConfig Config, orgID string, store alertmanagerstore.Store) (*Server, error) {
|
||||||
|
server := &Server{
|
||||||
|
srvConfig: srvConfig,
|
||||||
|
settings: factory.NewNamespacedSettings(settings, "go.signoz.io/signoz/pkg/alertmanager"),
|
||||||
|
orgID: orgID,
|
||||||
|
store: store,
|
||||||
|
stopc: make(chan struct{}),
|
||||||
|
}
|
||||||
|
// initialize marker
|
||||||
|
server.marker = alertmanagertypes.NewMarker(server.settings.PrometheusRegisterer())
|
||||||
|
|
||||||
|
// get silences for initial state
|
||||||
|
silencesstate, err := store.GetState(ctx, server.orgID, alertmanagertypes.SilenceStateName)
|
||||||
|
if err != nil && !errors.Ast(err, errors.TypeNotFound) {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// get nflog for initial state
|
||||||
|
nflogstate, err := store.GetState(ctx, server.orgID, alertmanagertypes.NFLogStateName)
|
||||||
|
if err != nil && !errors.Ast(err, errors.TypeNotFound) {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Initialize silences
|
||||||
|
server.silences, err = silence.New(silence.Options{
|
||||||
|
SnapshotReader: strings.NewReader(silencesstate),
|
||||||
|
Retention: srvConfig.Silences.Retention,
|
||||||
|
Limits: silence.Limits{
|
||||||
|
MaxSilences: func() int { return srvConfig.Silences.Max },
|
||||||
|
MaxSilenceSizeBytes: func() int { return srvConfig.Silences.MaxSizeBytes },
|
||||||
|
},
|
||||||
|
Metrics: server.settings.PrometheusRegisterer(),
|
||||||
|
Logger: server.settings.Logger(),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Initialize notification log
|
||||||
|
server.nflog, err = nflog.New(nflog.Options{
|
||||||
|
SnapshotReader: strings.NewReader(nflogstate),
|
||||||
|
Retention: server.srvConfig.NFLog.Retention,
|
||||||
|
Metrics: server.settings.PrometheusRegisterer(),
|
||||||
|
Logger: server.settings.Logger(),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start maintenance for silences
|
||||||
|
server.wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer server.wg.Done()
|
||||||
|
server.silences.Maintenance(server.srvConfig.Silences.MaintenanceInterval, snapfnoop, server.stopc, func() (int64, error) {
|
||||||
|
// Delete silences older than the retention period.
|
||||||
|
if _, err := server.silences.GC(); err != nil {
|
||||||
|
server.settings.Logger().ErrorContext(ctx, "silence garbage collection", "error", err)
|
||||||
|
// Don't return here - we need to snapshot our state first.
|
||||||
|
}
|
||||||
|
|
||||||
|
return server.store.SetState(ctx, server.orgID, alertmanagertypes.SilenceStateName, server.silences)
|
||||||
|
})
|
||||||
|
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Start maintenance for notification logs
|
||||||
|
server.wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer server.wg.Done()
|
||||||
|
server.nflog.Maintenance(server.srvConfig.NFLog.MaintenanceInterval, snapfnoop, server.stopc, func() (int64, error) {
|
||||||
|
if _, err := server.nflog.GC(); err != nil {
|
||||||
|
server.settings.Logger().ErrorContext(ctx, "notification log garbage collection", "error", err)
|
||||||
|
// Don't return without saving the current state.
|
||||||
|
}
|
||||||
|
|
||||||
|
return server.store.SetState(ctx, server.orgID, alertmanagertypes.NFLogStateName, server.nflog)
|
||||||
|
})
|
||||||
|
}()
|
||||||
|
|
||||||
|
server.alerts, err = mem.NewAlerts(ctx, server.marker, server.srvConfig.Alerts.GCInterval, nil, server.settings.Logger(), server.settings.PrometheusRegisterer())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
server.pipelineBuilder = notify.NewPipelineBuilder(server.settings.PrometheusRegisterer(), featurecontrol.NoopFlags{})
|
||||||
|
server.dispatcherMetrics = dispatch.NewDispatcherMetrics(false, server.settings.PrometheusRegisterer())
|
||||||
|
|
||||||
|
return server, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (server *Server) Start(ctx context.Context) error {
|
||||||
|
config, err := server.store.GetConfig(ctx, server.orgID)
|
||||||
|
if err != nil && !errors.Ast(err, errors.TypeNotFound) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if config == nil {
|
||||||
|
config = alertmanagertypes.NewDefaultConfig(
|
||||||
|
server.srvConfig.ResolveTimeout,
|
||||||
|
server.srvConfig.SMTP.Hello,
|
||||||
|
server.srvConfig.SMTP.From,
|
||||||
|
server.srvConfig.SMTP.Host,
|
||||||
|
server.srvConfig.SMTP.Port,
|
||||||
|
server.srvConfig.SMTP.AuthUsername,
|
||||||
|
server.srvConfig.SMTP.AuthPassword,
|
||||||
|
server.srvConfig.SMTP.AuthSecret,
|
||||||
|
server.srvConfig.SMTP.AuthIdentity,
|
||||||
|
server.srvConfig.SMTP.RequireTLS,
|
||||||
|
server.srvConfig.Route.GroupBy,
|
||||||
|
server.srvConfig.Route.GroupInterval,
|
||||||
|
server.srvConfig.Route.GroupWait,
|
||||||
|
server.srvConfig.Route.RepeatInterval,
|
||||||
|
server.orgID,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
return server.SetConfig(ctx, config)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (server *Server) GetAlerts(ctx context.Context, params alertmanagertypes.GettableAlertsParams) (alertmanagertypes.GettableAlerts, error) {
|
||||||
|
return alertmanagertypes.NewGettableAlertsFromAlertProvider(server.alerts, server.alertmanagerConfig, server.marker.Status, func(labels model.LabelSet) {
|
||||||
|
server.inhibitor.Mutes(labels)
|
||||||
|
server.silencer.Mutes(labels)
|
||||||
|
}, params)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (server *Server) PutAlerts(ctx context.Context, postableAlerts alertmanagertypes.PostableAlerts) error {
|
||||||
|
alerts, err := alertmanagertypes.NewAlertsFromPostableAlerts(postableAlerts, server.srvConfig.ResolveTimeout, time.Now())
|
||||||
|
|
||||||
|
// Notification sending alert takes precedence over validation errors.
|
||||||
|
if err := server.alerts.Put(alerts...); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return errors.Join(err...)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (server *Server) ConfigHash() [16]byte {
|
||||||
|
return server.alertmanagerConfigHash
|
||||||
|
}
|
||||||
|
|
||||||
|
func (server *Server) ConfigRaw() []byte {
|
||||||
|
return server.alertmanagerConfigRaw
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetConfig applies alertmanagerConfig to the server.
//
// It loads the templates, builds the routing tree, the integrations of every
// receiver referenced by a route, and the time intervals. It then stops the
// current inhibitor and dispatcher (if any), creates new ones together with a
// new silencer and notification pipeline, and starts the new dispatcher and
// inhibitor in their own goroutines. On success the config, its raw form and
// its hash are recorded on the server.
//
// An error is returned when the templates cannot be loaded or a receiver's
// integrations cannot be built. In the latter case server.tmpl has already been
// replaced while the rest of the server still reflects the previous config.
func (server *Server) SetConfig(ctx context.Context, alertmanagerConfig *alertmanagertypes.Config) error {
	config := alertmanagerConfig.AlertmanagerConfig()

	var err error
	server.tmpl, err = template.FromGlobs(config.Templates)
	if err != nil {
		return err
	}

	server.tmpl.ExternalURL = server.srvConfig.ExternalUrl

	// Build the routing tree and record which receivers are used.
	routes := dispatch.NewRoute(config.Route, nil)
	activeReceivers := make(map[string]struct{})
	routes.Walk(func(r *dispatch.Route) {
		activeReceivers[r.RouteOpts.Receiver] = struct{}{}
	})

	// Build the map of receiver to integrations.
	receivers := make(map[string][]notify.Integration, len(activeReceivers))
	// integrationsNum is accumulated below but not read anywhere else in this function.
	var integrationsNum int
	for _, rcv := range config.Receivers {
		if _, found := activeReceivers[rcv.Name]; !found {
			// No need to build a receiver if no route is using it.
			server.settings.Logger().InfoContext(ctx, "skipping creation of receiver not referenced by any route", "receiver", rcv.Name)
			continue
		}
		integrations, err := alertmanagertypes.NewReceiverIntegrations(rcv, server.tmpl, server.settings.Logger())
		if err != nil {
			return err
		}
		// rcv.Name is guaranteed to be unique across all receivers.
		receivers[rcv.Name] = integrations
		integrationsNum += len(integrations)
	}

	// Build the map of time interval names to time interval definitions.
	// Entries from config.TimeIntervals overwrite same-named entries from
	// config.MuteTimeIntervals because they are inserted second.
	timeIntervals := make(map[string][]timeinterval.TimeInterval, len(config.MuteTimeIntervals)+len(config.TimeIntervals))
	for _, ti := range config.MuteTimeIntervals {
		timeIntervals[ti.Name] = ti.TimeIntervals
	}

	for _, ti := range config.TimeIntervals {
		timeIntervals[ti.Name] = ti.TimeIntervals
	}

	intervener := timeinterval.NewIntervener(timeIntervals)

	// Stop the components built from the previous config before replacing them.
	if server.inhibitor != nil {
		server.inhibitor.Stop()
	}
	if server.dispatcher != nil {
		server.dispatcher.Stop()
	}

	server.inhibitor = inhibit.NewInhibitor(server.alerts, config.InhibitRules, server.marker, server.settings.Logger())
	server.timeIntervals = timeIntervals
	server.silencer = silence.NewSilencer(server.silences, server.marker, server.settings.Logger())

	// pipelinePeer is deliberately left as the nil interface value.
	var pipelinePeer notify.Peer
	pipeline := server.pipelineBuilder.New(
		receivers,
		func() time.Duration { return 0 },
		server.inhibitor,
		server.silencer,
		intervener,
		server.marker,
		server.nflog,
		pipelinePeer,
	)

	// timeoutFunc raises any duration below notify.MinTimeout to notify.MinTimeout.
	timeoutFunc := func(d time.Duration) time.Duration {
		if d < notify.MinTimeout {
			d = notify.MinTimeout
		}
		return d
	}

	server.dispatcher = dispatch.NewDispatcher(
		server.alerts,
		routes,
		pipeline,
		server.marker,
		timeoutFunc,
		nil,
		server.settings.Logger(),
		server.dispatcherMetrics,
	)

	// Do not try to add these to `server.wg` as there seems to be a race condition if
	// we call Start() and Stop() in quick succession.
	// Both these goroutines will run indefinitely.
	go server.dispatcher.Run()
	go server.inhibitor.Run()

	server.alertmanagerConfigHash = alertmanagerConfig.Hash()
	server.alertmanagerConfigRaw = alertmanagerConfig.Raw()
	server.alertmanagerConfig = alertmanagerConfig

	return nil
}
|
||||||
|
|
||||||
|
func (server *Server) TestReceiver(ctx context.Context, receiver alertmanagertypes.Receiver) error {
|
||||||
|
return alertmanagertypes.TestReceiver(ctx, receiver, server.tmpl, server.settings.Logger())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop shuts the server down: it stops the dispatcher and the inhibitor (when
// they exist), closes the alert provider, signals the maintenance goroutines to
// stop and waits for them to finish. It always returns nil.
//
// NOTE(review): stopc is closed unconditionally, so calling Stop a second time
// on the same server would panic on the double close.
func (server *Server) Stop(ctx context.Context) error {
	// The dispatcher and inhibitor only exist once SetConfig has run.
	if server.dispatcher != nil {
		server.dispatcher.Stop()
	}

	if server.inhibitor != nil {
		server.inhibitor.Stop()
	}

	// Close the alert provider.
	server.alerts.Close()

	// Signals maintenance goroutines of server states to stop.
	close(server.stopc)

	// Wait for all goroutines to finish.
	server.wg.Wait()

	return nil
}
|
||||||
84
pkg/alertmanager/server_test.go
Normal file
84
pkg/alertmanager/server_test.go
Normal file
@@ -0,0 +1,84 @@
|
|||||||
|
package alertmanager
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"net"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/prometheus/alertmanager/config"
|
||||||
|
commoncfg "github.com/prometheus/common/config"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"go.signoz.io/signoz/pkg/alertmanager/alertmanagerstore"
|
||||||
|
"go.signoz.io/signoz/pkg/alertmanager/alertmanagerstore/alertmanagerstoretest"
|
||||||
|
"go.signoz.io/signoz/pkg/factory/factorytest"
|
||||||
|
"go.signoz.io/signoz/pkg/factory/providertest"
|
||||||
|
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestServerStartStop(t *testing.T) {
|
||||||
|
store, err := alertmanagerstoretest.New(context.Background(), providertest.NewSettings(), alertmanagerstore.NewConfig().(alertmanagerstore.Config), []string{"1"})
|
||||||
|
require.NoError(t, err)
|
||||||
|
server, err := NewForOrg(context.Background(), factorytest.NewSettings(), newConfig().(Config), "1", store)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.NoError(t, server.Start(context.Background()))
|
||||||
|
require.NoError(t, server.Stop(context.Background()))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestServerWithDefaultConfig(t *testing.T) {
|
||||||
|
store, err := alertmanagerstoretest.New(context.Background(), providertest.NewSettings(), alertmanagerstore.NewConfig().(alertmanagerstore.Config), []string{"1"})
|
||||||
|
require.NoError(t, err)
|
||||||
|
server, err := NewForOrg(context.Background(), factorytest.NewSettings(), newConfig().(Config), "1", store)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.NoError(t, server.Start(context.Background()))
|
||||||
|
defer require.NoError(t, server.Stop(context.Background()))
|
||||||
|
|
||||||
|
assert.Equal(t, `{"global":{"resolve_timeout":"5m","http_config":{"tls_config":{"insecure_skip_verify":false},"follow_redirects":true,"enable_http2":true,"proxy_url":null},"smtp_from":"alertmanager@signoz.io","smtp_hello":"localhost","smtp_smarthost":"localhost:25","smtp_require_tls":true,"smtp_tls_config":{"insecure_skip_verify":false},"pagerduty_url":"https://events.pagerduty.com/v2/enqueue","opsgenie_api_url":"https://api.opsgenie.com/","wechat_api_url":"https://qyapi.weixin.qq.com/cgi-bin/","victorops_api_url":"https://alert.victorops.com/integrations/generic/20131114/alert/","telegram_api_url":"https://api.telegram.org","webex_api_url":"https://webexapis.com/v1/messages","rocketchat_api_url":"https://open.rocket.chat/"},"route":{"receiver":"default-receiver","group_by":["alertname"],"group_wait":"30s","group_interval":"5m","repeat_interval":"4h"},"receivers":[{"name":"default-receiver"}],"templates":null}`, string(server.alertmanagerConfigRaw))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestServerTestReceiverWebhook(t *testing.T) {
|
||||||
|
store, err := alertmanagerstoretest.New(context.Background(), providertest.NewSettings(), alertmanagerstore.NewConfig().(alertmanagerstore.Config), []string{"1"})
|
||||||
|
require.NoError(t, err)
|
||||||
|
server, err := NewForOrg(context.Background(), factorytest.NewSettings(), newConfig().(Config), "1", store)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
webhookListener, err := net.Listen("tcp", "localhost:0")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
requestBody := new(bytes.Buffer)
|
||||||
|
webhookServer := &http.Server{
|
||||||
|
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
_, err := requestBody.ReadFrom(r.Body)
|
||||||
|
require.NoError(t, err)
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
require.NoError(t, webhookServer.Serve(webhookListener))
|
||||||
|
}()
|
||||||
|
|
||||||
|
require.NoError(t, server.Start(context.Background()))
|
||||||
|
defer require.NoError(t, server.Stop(context.Background()))
|
||||||
|
|
||||||
|
webhookURL, err := url.Parse("http://" + webhookListener.Addr().String() + "/webhook")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
err = server.TestReceiver(context.Background(), alertmanagertypes.Receiver{
|
||||||
|
Name: "test-receiver",
|
||||||
|
WebhookConfigs: []*config.WebhookConfig{
|
||||||
|
{
|
||||||
|
HTTPConfig: &commoncfg.HTTPClientConfig{},
|
||||||
|
URL: &config.SecretURL{URL: webhookURL},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Contains(t, requestBody.String(), "test-receiver")
|
||||||
|
require.Contains(t, requestBody.String(), "firing")
|
||||||
|
}
|
||||||
214
pkg/alertmanager/servers.go
Normal file
214
pkg/alertmanager/servers.go
Normal file
@@ -0,0 +1,214 @@
|
|||||||
|
package alertmanager
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"go.signoz.io/signoz/pkg/alertmanager/alertmanagerstore"
|
||||||
|
"go.signoz.io/signoz/pkg/errors"
|
||||||
|
"go.signoz.io/signoz/pkg/factory"
|
||||||
|
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
	// ErrCodeAlertmanagerNotFound is the error code used when no alertmanager
	// server exists for the requested orgID.
	ErrCodeAlertmanagerNotFound = errors.MustNewCode("alertmanager_not_found")
)

// Compile-time assertions that *Servers implements factory.Service and Client.
var _ factory.Service = (*Servers)(nil)
var _ Client = (*Servers)(nil)
|
||||||
|
|
||||||
|
// Servers holds one alertmanager Server per organization and routes
// organization-scoped calls to the matching server.
type Servers struct {
	// config is the server config shared by every per-organization server
	config Config

	// Store is the store for the alertmanager
	store alertmanagerstore.Store

	// Map of organization id to server
	servers map[string]*Server

	// Mutex to protect the servers map
	// NOTE(review): none of the methods visible in this file acquire it.
	serversMtx sync.RWMutex
}
|
||||||
|
|
||||||
|
func New(ctx context.Context, settings factory.Settings, config Config, store alertmanagerstore.Store) (*Servers, error) {
|
||||||
|
servers := &Servers{
|
||||||
|
config: config,
|
||||||
|
store: store,
|
||||||
|
servers: map[string]*Server{},
|
||||||
|
serversMtx: sync.RWMutex{},
|
||||||
|
}
|
||||||
|
|
||||||
|
orgIDs, err := store.ListOrgIDs(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, orgID := range orgIDs {
|
||||||
|
server, err := NewForOrg(ctx, settings, config, orgID, store)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
servers.servers[orgID] = server
|
||||||
|
}
|
||||||
|
|
||||||
|
return servers, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ss *Servers) Start(ctx context.Context) error {
|
||||||
|
for _, server := range ss.servers {
|
||||||
|
err := server.Start(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ticker := time.NewTicker(ss.config.PollInterval)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil
|
||||||
|
case <-ticker.C:
|
||||||
|
for _, server := range ss.servers {
|
||||||
|
config, err := ss.store.GetConfig(ctx, server.orgID)
|
||||||
|
if err != nil && !errors.Ast(err, errors.TypeNotFound) {
|
||||||
|
server.settings.Logger().ErrorContext(ctx, "failed to get config", "error", err, "orgID", server.orgID)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if config == nil {
|
||||||
|
config = alertmanagertypes.NewDefaultConfig(
|
||||||
|
server.srvConfig.ResolveTimeout,
|
||||||
|
server.srvConfig.SMTP.Hello,
|
||||||
|
server.srvConfig.SMTP.From,
|
||||||
|
server.srvConfig.SMTP.Host,
|
||||||
|
server.srvConfig.SMTP.Port,
|
||||||
|
server.srvConfig.SMTP.AuthUsername,
|
||||||
|
server.srvConfig.SMTP.AuthPassword,
|
||||||
|
server.srvConfig.SMTP.AuthSecret,
|
||||||
|
server.srvConfig.SMTP.AuthIdentity,
|
||||||
|
server.srvConfig.SMTP.RequireTLS,
|
||||||
|
server.srvConfig.Route.GroupBy,
|
||||||
|
server.srvConfig.Route.GroupInterval,
|
||||||
|
server.srvConfig.Route.GroupWait,
|
||||||
|
server.srvConfig.Route.RepeatInterval,
|
||||||
|
server.orgID,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := server.SetConfig(ctx, config); err != nil {
|
||||||
|
server.settings.Logger().ErrorContext(ctx, "failed to set config in alertmanager", "error", err, "orgID", server.orgID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ss *Servers) Stop(ctx context.Context) error {
|
||||||
|
for _, server := range ss.servers {
|
||||||
|
server.Stop(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ss *Servers) GetAlerts(ctx context.Context, orgID string, params alertmanagertypes.GettableAlertsParams) (alertmanagertypes.GettableAlerts, error) {
|
||||||
|
server, ok := ss.servers[orgID]
|
||||||
|
if !ok {
|
||||||
|
return nil, errors.Newf(errors.TypeNotFound, ErrCodeAlertmanagerNotFound, "alertmanager not found for orgID %q", orgID)
|
||||||
|
}
|
||||||
|
|
||||||
|
return server.GetAlerts(ctx, params)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ss *Servers) PutAlerts(ctx context.Context, orgID string, alerts alertmanagertypes.PostableAlerts) error {
|
||||||
|
server, ok := ss.servers[orgID]
|
||||||
|
if !ok {
|
||||||
|
return errors.Newf(errors.TypeNotFound, ErrCodeAlertmanagerNotFound, "alertmanager not found for orgID %q", orgID)
|
||||||
|
}
|
||||||
|
|
||||||
|
return server.PutAlerts(ctx, alerts)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ss *Servers) SetConfig(ctx context.Context, orgID string, postableConfig alertmanagertypes.PostableConfig) error {
|
||||||
|
cfg, err := ss.store.GetConfig(ctx, orgID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = cfg.MergeWithPostableConfig(postableConfig)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := ss.store.SetConfig(ctx, orgID, cfg); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ss *Servers) TestReceiver(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver) error {
|
||||||
|
server, ok := ss.servers[orgID]
|
||||||
|
if !ok {
|
||||||
|
return errors.Newf(errors.TypeNotFound, ErrCodeAlertmanagerNotFound, "alertmanager not found for orgID %q", orgID)
|
||||||
|
}
|
||||||
|
|
||||||
|
return server.TestReceiver(ctx, receiver)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ss *Servers) CreateChannel(ctx context.Context, orgID string, channel *alertmanagertypes.Channel) error {
|
||||||
|
receiver, err := alertmanagertypes.NewReceiverFromChannel(channel)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return ss.SetConfig(ctx, orgID, alertmanagertypes.PostableConfig{
|
||||||
|
Action: alertmanagertypes.PostableConfigActionCreate,
|
||||||
|
Receiver: receiver,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ss *Servers) GetChannel(ctx context.Context, orgID string, id uint64) (*alertmanagertypes.Channel, error) {
|
||||||
|
return ss.store.GetChannel(ctx, orgID, id)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ss *Servers) DelChannel(ctx context.Context, orgID string, id uint64) error {
|
||||||
|
channel, err := ss.store.GetChannel(ctx, orgID, id)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
receiver, err := alertmanagertypes.NewReceiverFromChannel(channel)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return ss.SetConfig(ctx, orgID, alertmanagertypes.PostableConfig{
|
||||||
|
Action: alertmanagertypes.PostableConfigActionDelete,
|
||||||
|
Receiver: receiver,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ss *Servers) ListChannels(ctx context.Context, orgID string) (alertmanagertypes.Channels, error) {
|
||||||
|
return ss.store.ListChannels(ctx, orgID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ss *Servers) UpdateChannel(ctx context.Context, orgID string, id uint64, data string) error {
|
||||||
|
existingChannel, err := ss.store.GetChannel(ctx, orgID, id)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
existingChannel.Data = data
|
||||||
|
existingChannel.UpdatedAt = time.Now()
|
||||||
|
|
||||||
|
receiver, err := alertmanagertypes.NewReceiverFromChannel(existingChannel)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return ss.SetConfig(ctx, orgID, alertmanagertypes.PostableConfig{
|
||||||
|
Action: alertmanagertypes.PostableConfigActionUpdate,
|
||||||
|
Receiver: receiver,
|
||||||
|
})
|
||||||
|
}
|
||||||
10
pkg/factory/factorytest/settings.go
Normal file
10
pkg/factory/factorytest/settings.go
Normal file
@@ -0,0 +1,10 @@
|
|||||||
|
package factorytest
|
||||||
|
|
||||||
|
import (
|
||||||
|
"go.signoz.io/signoz/pkg/factory"
|
||||||
|
"go.signoz.io/signoz/pkg/instrumentation/instrumentationtest"
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewSettings() factory.Settings {
|
||||||
|
return instrumentationtest.New().ToFactorySettings()
|
||||||
|
}
|
||||||
@@ -2,9 +2,12 @@ package factory
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"log/slog"
|
||||||
"regexp"
|
"regexp"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Compile-time assertion that Name implements slog.LogValuer.
var _ slog.LogValuer = Name{}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
// nameRegex is a regex that matches a valid name.
|
// nameRegex is a regex that matches a valid name.
|
||||||
// It must start with a alphabet, and can only contain alphabets, numbers, underscores or hyphens.
|
// It must start with a alphabet, and can only contain alphabets, numbers, underscores or hyphens.
|
||||||
@@ -15,6 +18,10 @@ type Name struct {
|
|||||||
name string
|
name string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (n Name) LogValue() slog.Value {
|
||||||
|
return slog.StringValue(n.name)
|
||||||
|
}
|
||||||
|
|
||||||
func (n Name) String() string {
|
func (n Name) String() string {
|
||||||
return n.name
|
return n.name
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,6 +1,8 @@
|
|||||||
package factory
|
package factory
|
||||||
|
|
||||||
import "context"
|
import (
|
||||||
|
"context"
|
||||||
|
)
|
||||||
|
|
||||||
type Service interface {
|
type Service interface {
|
||||||
// Starts a service. The service should return an error if it cannot be started.
|
// Starts a service. The service should return an error if it cannot be started.
|
||||||
@@ -8,3 +10,24 @@ type Service interface {
|
|||||||
// Stops a service.
|
// Stops a service.
|
||||||
Stop(context.Context) error
|
Stop(context.Context) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NamedService is a Service that also has a name: it combines the Named and
// Service interfaces.
type NamedService interface {
	Named
	Service
}
|
||||||
|
|
||||||
|
type namedService struct {
|
||||||
|
name Name
|
||||||
|
Service
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *namedService) Name() Name {
|
||||||
|
return s.name
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewNamedService(name Name, service Service) NamedService {
|
||||||
|
return &namedService{
|
||||||
|
name: name,
|
||||||
|
Service: service,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -8,6 +8,56 @@ import (
|
|||||||
sdktrace "go.opentelemetry.io/otel/trace"
|
sdktrace "go.opentelemetry.io/otel/trace"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Settings carries the instrumentation handles (logger, meter provider, tracer
// provider and prometheus registerer) that NewNamespacedSettings scopes to a package.
type Settings struct {
	// Logger is the logger.
	Logger *slog.Logger
	// MeterProvider is the meter provider.
	MeterProvider sdkmetric.MeterProvider
	// TracerProvider is the tracer provider.
	TracerProvider sdktrace.TracerProvider
	// PrometheusRegisterer is the prometheus registerer.
	PrometheusRegisterer prometheus.Registerer
}
|
||||||
|
|
||||||
|
// NamespacedSettings exposes instrumentation handles that have been scoped to a
// single package.
type NamespacedSettings interface {
	// Logger returns the package-scoped logger.
	Logger() *slog.Logger
	// Meter returns the package-scoped meter.
	Meter() sdkmetric.Meter
	// Tracer returns the package-scoped tracer.
	Tracer() sdktrace.Tracer
	// PrometheusRegisterer returns the package-scoped prometheus registerer.
	PrometheusRegisterer() prometheus.Registerer
}
|
||||||
|
|
||||||
|
type namespacedSettings struct {
|
||||||
|
logger *slog.Logger
|
||||||
|
meter sdkmetric.Meter
|
||||||
|
tracer sdktrace.Tracer
|
||||||
|
prometheusRegisterer prometheus.Registerer
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewNamespacedSettings(settings Settings, pkgName string) NamespacedSettings {
|
||||||
|
return &namespacedSettings{
|
||||||
|
logger: settings.Logger.With("pkg", pkgName),
|
||||||
|
meter: settings.MeterProvider.Meter(pkgName),
|
||||||
|
tracer: settings.TracerProvider.Tracer(pkgName),
|
||||||
|
prometheusRegisterer: prometheus.WrapRegistererWith(prometheus.Labels{"pkg": pkgName}, settings.PrometheusRegisterer),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *namespacedSettings) Logger() *slog.Logger {
|
||||||
|
return s.logger
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *namespacedSettings) Meter() sdkmetric.Meter {
|
||||||
|
return s.meter
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *namespacedSettings) Tracer() sdktrace.Tracer {
|
||||||
|
return s.tracer
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *namespacedSettings) PrometheusRegisterer() prometheus.Registerer {
|
||||||
|
return s.prometheusRegisterer
|
||||||
|
}
|
||||||
|
|
||||||
type ProviderSettings struct {
|
type ProviderSettings struct {
|
||||||
// SlogLogger is the slog logger.
|
// SlogLogger is the slog logger.
|
||||||
Logger *slog.Logger
|
Logger *slog.Logger
|
||||||
@@ -8,8 +8,16 @@ import (
|
|||||||
sdkresource "go.opentelemetry.io/otel/sdk/resource"
|
sdkresource "go.opentelemetry.io/otel/sdk/resource"
|
||||||
sdktrace "go.opentelemetry.io/otel/trace"
|
sdktrace "go.opentelemetry.io/otel/trace"
|
||||||
"go.signoz.io/signoz/pkg/factory"
|
"go.signoz.io/signoz/pkg/factory"
|
||||||
|
"go.uber.org/zap/zapcore"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// zapLogLevelToSlogLevel maps zap log levels to slog levels. Only the debug,
// info, warn and error levels have an entry.
var zapLogLevelToSlogLevel = map[zapcore.Level]slog.Level{
	zapcore.DebugLevel: slog.LevelDebug,
	zapcore.InfoLevel:  slog.LevelInfo,
	zapcore.WarnLevel:  slog.LevelWarn,
	zapcore.ErrorLevel: slog.LevelError,
}
|
||||||
|
|
||||||
// Instrumentation provides the core components for application instrumentation.
|
// Instrumentation provides the core components for application instrumentation.
|
||||||
type Instrumentation interface {
|
type Instrumentation interface {
|
||||||
// Logger returns the Slog logger.
|
// Logger returns the Slog logger.
|
||||||
@@ -22,6 +30,8 @@ type Instrumentation interface {
|
|||||||
PrometheusRegisterer() prometheus.Registerer
|
PrometheusRegisterer() prometheus.Registerer
|
||||||
// ToProviderSettings converts instrumentation to provider settings.
|
// ToProviderSettings converts instrumentation to provider settings.
|
||||||
ToProviderSettings() factory.ProviderSettings
|
ToProviderSettings() factory.ProviderSettings
|
||||||
|
// ToFactorySettings converts instrumentation to factory settings.
|
||||||
|
ToFactorySettings() factory.Settings
|
||||||
}
|
}
|
||||||
|
|
||||||
// Merges the input attributes with the resource attributes.
|
// Merges the input attributes with the resource attributes.
|
||||||
|
|||||||
@@ -51,3 +51,12 @@ func (i *noopInstrumentation) ToProviderSettings() factory.ProviderSettings {
|
|||||||
PrometheusRegisterer: i.PrometheusRegisterer(),
|
PrometheusRegisterer: i.PrometheusRegisterer(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (i *noopInstrumentation) ToFactorySettings() factory.Settings {
|
||||||
|
return factory.Settings{
|
||||||
|
Logger: i.Logger(),
|
||||||
|
MeterProvider: i.MeterProvider(),
|
||||||
|
TracerProvider: i.TracerProvider(),
|
||||||
|
PrometheusRegisterer: i.PrometheusRegisterer(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -131,3 +131,12 @@ func (i *SDK) ToProviderSettings() factory.ProviderSettings {
|
|||||||
PrometheusRegisterer: i.PrometheusRegisterer(),
|
PrometheusRegisterer: i.PrometheusRegisterer(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (i *SDK) ToFactorySettings() factory.Settings {
|
||||||
|
return factory.Settings{
|
||||||
|
Logger: i.Logger(),
|
||||||
|
MeterProvider: i.MeterProvider(),
|
||||||
|
TracerProvider: i.TracerProvider(),
|
||||||
|
PrometheusRegisterer: i.PrometheusRegisterer(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -24,6 +24,7 @@ import (
|
|||||||
_ "github.com/mattn/go-sqlite3"
|
_ "github.com/mattn/go-sqlite3"
|
||||||
"github.com/prometheus/prometheus/promql"
|
"github.com/prometheus/prometheus/promql"
|
||||||
|
|
||||||
|
"go.signoz.io/signoz/pkg/http/render"
|
||||||
"go.signoz.io/signoz/pkg/query-service/agentConf"
|
"go.signoz.io/signoz/pkg/query-service/agentConf"
|
||||||
"go.signoz.io/signoz/pkg/query-service/app/cloudintegrations"
|
"go.signoz.io/signoz/pkg/query-service/app/cloudintegrations"
|
||||||
"go.signoz.io/signoz/pkg/query-service/app/dashboards"
|
"go.signoz.io/signoz/pkg/query-service/app/dashboards"
|
||||||
@@ -49,6 +50,8 @@ import (
|
|||||||
"go.signoz.io/signoz/pkg/query-service/contextlinks"
|
"go.signoz.io/signoz/pkg/query-service/contextlinks"
|
||||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||||
"go.signoz.io/signoz/pkg/query-service/postprocess"
|
"go.signoz.io/signoz/pkg/query-service/postprocess"
|
||||||
|
"go.signoz.io/signoz/pkg/signoz"
|
||||||
|
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
|
||||||
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
@@ -126,6 +129,7 @@ type APIHandler struct {
|
|||||||
jobsRepo *inframetrics.JobsRepo
|
jobsRepo *inframetrics.JobsRepo
|
||||||
|
|
||||||
pvcsRepo *inframetrics.PvcsRepo
|
pvcsRepo *inframetrics.PvcsRepo
|
||||||
|
SigNoz *signoz.SigNoz
|
||||||
}
|
}
|
||||||
|
|
||||||
type APIHandlerOpts struct {
|
type APIHandlerOpts struct {
|
||||||
@@ -165,6 +169,8 @@ type APIHandlerOpts struct {
|
|||||||
UseLogsNewSchema bool
|
UseLogsNewSchema bool
|
||||||
|
|
||||||
UseTraceNewSchema bool
|
UseTraceNewSchema bool
|
||||||
|
|
||||||
|
SigNoz *signoz.SigNoz
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewAPIHandler returns an APIHandler
|
// NewAPIHandler returns an APIHandler
|
||||||
@@ -237,6 +243,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
|
|||||||
statefulsetsRepo: statefulsetsRepo,
|
statefulsetsRepo: statefulsetsRepo,
|
||||||
jobsRepo: jobsRepo,
|
jobsRepo: jobsRepo,
|
||||||
pvcsRepo: pvcsRepo,
|
pvcsRepo: pvcsRepo,
|
||||||
|
SigNoz: opts.SigNoz,
|
||||||
}
|
}
|
||||||
|
|
||||||
logsQueryBuilder := logsv3.PrepareLogsQuery
|
logsQueryBuilder := logsv3.PrepareLogsQuery
|
||||||
@@ -1310,36 +1317,74 @@ func (aH *APIHandler) editRule(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (aH *APIHandler) getChannel(w http.ResponseWriter, r *http.Request) {
|
func (aH *APIHandler) getChannel(w http.ResponseWriter, r *http.Request) {
|
||||||
id := mux.Vars(r)["id"]
|
idVar := mux.Vars(r)["id"]
|
||||||
channel, apiErrorObj := aH.ruleManager.RuleDB().GetChannel(id)
|
id, err := strconv.ParseUint(idVar, 10, 64)
|
||||||
if apiErrorObj != nil {
|
if err != nil {
|
||||||
RespondError(w, apiErrorObj, nil)
|
render.Error(w, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
orgId, err := auth.GetOrgIdFromJwt(r.Context())
|
||||||
|
if err != nil {
|
||||||
|
render.Error(w, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
channel, err := aH.SigNoz.AlertmanagerClient.GetChannel(r.Context(), orgId, id)
|
||||||
|
if err != nil {
|
||||||
|
render.Error(w, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
aH.Respond(w, channel)
|
aH.Respond(w, channel)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (aH *APIHandler) deleteChannel(w http.ResponseWriter, r *http.Request) {
|
func (aH *APIHandler) deleteChannel(w http.ResponseWriter, r *http.Request) {
|
||||||
id := mux.Vars(r)["id"]
|
idVar := mux.Vars(r)["id"]
|
||||||
apiErrorObj := aH.ruleManager.RuleDB().DeleteChannel(id)
|
id, err := strconv.ParseUint(idVar, 10, 64)
|
||||||
if apiErrorObj != nil {
|
if err != nil {
|
||||||
RespondError(w, apiErrorObj, nil)
|
render.Error(w, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
orgId, err := auth.GetOrgIdFromJwt(r.Context())
|
||||||
|
if err != nil {
|
||||||
|
render.Error(w, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
err = aH.SigNoz.AlertmanagerClient.DelChannel(r.Context(), orgId, id)
|
||||||
|
if err != nil {
|
||||||
|
render.Error(w, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
aH.Respond(w, "notification channel successfully deleted")
|
aH.Respond(w, "notification channel successfully deleted")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (aH *APIHandler) listChannels(w http.ResponseWriter, r *http.Request) {
|
func (aH *APIHandler) listChannels(w http.ResponseWriter, r *http.Request) {
|
||||||
channels, apiErrorObj := aH.ruleManager.RuleDB().GetChannels()
|
orgId, err := auth.GetOrgIdFromJwt(r.Context())
|
||||||
if apiErrorObj != nil {
|
if err != nil {
|
||||||
RespondError(w, apiErrorObj, nil)
|
render.Error(w, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
channels, err := aH.SigNoz.AlertmanagerClient.ListChannels(r.Context(), orgId)
|
||||||
|
if err != nil {
|
||||||
|
render.Error(w, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
aH.Respond(w, channels)
|
aH.Respond(w, channels)
|
||||||
}
|
}
|
||||||
|
|
||||||
// testChannels sends test alert to all registered channels
|
// testChannels sends test alert to all registered channels
|
||||||
func (aH *APIHandler) testChannel(w http.ResponseWriter, r *http.Request) {
|
func (aH *APIHandler) testChannel(w http.ResponseWriter, r *http.Request) {
|
||||||
|
orgId, err := auth.GetOrgIdFromJwt(r.Context())
|
||||||
|
if err != nil {
|
||||||
|
render.Error(w, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
defer r.Body.Close()
|
defer r.Body.Close()
|
||||||
body, err := io.ReadAll(r.Body)
|
body, err := io.ReadAll(r.Body)
|
||||||
@@ -1349,24 +1394,34 @@ func (aH *APIHandler) testChannel(w http.ResponseWriter, r *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
receiver := &am.Receiver{}
|
receiver, err := alertmanagertypes.NewReceiverFromString(string(body))
|
||||||
if err := json.Unmarshal(body, receiver); err != nil { // Parse []byte to go struct pointer
|
if err != nil {
|
||||||
zap.L().Error("Error in parsing req body of testChannel API\n", zap.Error(err))
|
render.Error(w, err)
|
||||||
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// send alert
|
|
||||||
apiErrorObj := aH.alertManager.TestReceiver(receiver)
|
err = aH.SigNoz.AlertmanagerClient.TestReceiver(r.Context(), orgId, receiver)
|
||||||
if apiErrorObj != nil {
|
if err != nil {
|
||||||
RespondError(w, apiErrorObj, nil)
|
render.Error(w, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
aH.Respond(w, "test alert sent")
|
aH.Respond(w, "test alert sent")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (aH *APIHandler) editChannel(w http.ResponseWriter, r *http.Request) {
|
func (aH *APIHandler) editChannel(w http.ResponseWriter, r *http.Request) {
|
||||||
|
idVar := mux.Vars(r)["id"]
|
||||||
|
id, err := strconv.ParseUint(idVar, 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
render.Error(w, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
id := mux.Vars(r)["id"]
|
orgId, err := auth.GetOrgIdFromJwt(r.Context())
|
||||||
|
if err != nil {
|
||||||
|
render.Error(w, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
defer r.Body.Close()
|
defer r.Body.Close()
|
||||||
body, err := io.ReadAll(r.Body)
|
body, err := io.ReadAll(r.Body)
|
||||||
@@ -1376,17 +1431,9 @@ func (aH *APIHandler) editChannel(w http.ResponseWriter, r *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
receiver := &am.Receiver{}
|
err = aH.SigNoz.AlertmanagerClient.UpdateChannel(r.Context(), orgId, id, string(body))
|
||||||
if err := json.Unmarshal(body, receiver); err != nil { // Parse []byte to go struct pointer
|
if err != nil {
|
||||||
zap.L().Error("Error in parsing req body of editChannel API", zap.Error(err))
|
render.Error(w, err)
|
||||||
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
_, apiErrorObj := aH.ruleManager.RuleDB().EditChannel(receiver, id)
|
|
||||||
|
|
||||||
if apiErrorObj != nil {
|
|
||||||
RespondError(w, apiErrorObj, nil)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1395,6 +1442,11 @@ func (aH *APIHandler) editChannel(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (aH *APIHandler) createChannel(w http.ResponseWriter, r *http.Request) {
|
func (aH *APIHandler) createChannel(w http.ResponseWriter, r *http.Request) {
|
||||||
|
orgId, err := auth.GetOrgIdFromJwt(r.Context())
|
||||||
|
if err != nil {
|
||||||
|
render.Error(w, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
defer r.Body.Close()
|
defer r.Body.Close()
|
||||||
body, err := io.ReadAll(r.Body)
|
body, err := io.ReadAll(r.Body)
|
||||||
@@ -1404,41 +1456,41 @@ func (aH *APIHandler) createChannel(w http.ResponseWriter, r *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
receiver := &am.Receiver{}
|
channel, err := alertmanagertypes.NewChannelFromReceiverString(string(body), orgId)
|
||||||
if err := json.Unmarshal(body, receiver); err != nil { // Parse []byte to go struct pointer
|
if err != nil {
|
||||||
zap.L().Error("Error in parsing req body of createChannel API", zap.Error(err))
|
render.Error(w, err)
|
||||||
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
_, apiErrorObj := aH.ruleManager.RuleDB().CreateChannel(receiver)
|
err = aH.SigNoz.AlertmanagerClient.CreateChannel(r.Context(), orgId, channel)
|
||||||
|
if err != nil {
|
||||||
if apiErrorObj != nil {
|
render.Error(w, err)
|
||||||
RespondError(w, apiErrorObj, nil)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
aH.Respond(w, nil)
|
aH.Respond(w, nil)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (aH *APIHandler) getAlerts(w http.ResponseWriter, r *http.Request) {
|
func (aH *APIHandler) getAlerts(w http.ResponseWriter, r *http.Request) {
|
||||||
params := r.URL.Query()
|
orgId, err := auth.GetOrgIdFromJwt(r.Context())
|
||||||
amEndpoint := constants.GetAlertManagerApiPrefix()
|
if err != nil {
|
||||||
resp, err := http.Get(amEndpoint + "v1/alerts" + "?" + params.Encode())
|
render.Error(w, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
params, err := alertmanagertypes.NewGettableAlertsParams(r)
|
||||||
|
if err != nil {
|
||||||
|
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
alerts, err := aH.SigNoz.AlertmanagerClient.GetAlerts(r.Context(), orgId, params)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
|
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
defer resp.Body.Close()
|
aH.Respond(w, alerts)
|
||||||
body, err := io.ReadAll(resp.Body)
|
|
||||||
if err != nil {
|
|
||||||
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
aH.Respond(w, string(body))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (aH *APIHandler) createRule(w http.ResponseWriter, r *http.Request) {
|
func (aH *APIHandler) createRule(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|||||||
@@ -200,6 +200,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
|||||||
FluxInterval: fluxInterval,
|
FluxInterval: fluxInterval,
|
||||||
UseLogsNewSchema: serverOptions.UseLogsNewSchema,
|
UseLogsNewSchema: serverOptions.UseLogsNewSchema,
|
||||||
UseTraceNewSchema: serverOptions.UseTraceNewSchema,
|
UseTraceNewSchema: serverOptions.UseTraceNewSchema,
|
||||||
|
SigNoz: serverOptions.SigNoz,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|||||||
@@ -132,3 +132,17 @@ func GetEmailFromJwt(ctx context.Context) (string, error) {
|
|||||||
|
|
||||||
return claims["email"].(string), nil
|
return claims["email"].(string), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func GetOrgIdFromJwt(ctx context.Context) (string, error) {
|
||||||
|
jwt, ok := ExtractJwtFromContext(ctx)
|
||||||
|
if !ok {
|
||||||
|
return "", model.InternalError(fmt.Errorf("failed to extract jwt from context"))
|
||||||
|
}
|
||||||
|
|
||||||
|
claims, err := ParseJWT(jwt)
|
||||||
|
if err != nil {
|
||||||
|
return "", model.InternalError(fmt.Errorf("failed get claims from jwt %v", err))
|
||||||
|
}
|
||||||
|
|
||||||
|
return claims["orgId"].(string), nil
|
||||||
|
}
|
||||||
|
|||||||
@@ -4,23 +4,23 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"log/slog"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
"syscall"
|
"syscall"
|
||||||
|
|
||||||
"go.signoz.io/signoz/pkg/factory"
|
"go.signoz.io/signoz/pkg/factory"
|
||||||
"go.uber.org/zap"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type Registry struct {
|
type Registry struct {
|
||||||
services []factory.Service
|
services factory.NamedMap[factory.NamedService]
|
||||||
logger *zap.Logger
|
logger *slog.Logger
|
||||||
startCh chan error
|
startCh chan error
|
||||||
stopCh chan error
|
stopCh chan error
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates a new registry of services. It needs at least one service in the input.
|
// New creates a new registry of services. It needs at least one service in the input.
|
||||||
func New(logger *zap.Logger, services ...factory.Service) (*Registry, error) {
|
func New(logger *slog.Logger, services ...factory.NamedService) (*Registry, error) {
|
||||||
if logger == nil {
|
if logger == nil {
|
||||||
return nil, fmt.Errorf("cannot build registry, logger is required")
|
return nil, fmt.Errorf("cannot build registry, logger is required")
|
||||||
}
|
}
|
||||||
@@ -29,19 +29,25 @@ func New(logger *zap.Logger, services ...factory.Service) (*Registry, error) {
|
|||||||
return nil, fmt.Errorf("cannot build registry, at least one service is required")
|
return nil, fmt.Errorf("cannot build registry, at least one service is required")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
m := factory.MustNewNamedMap(services...)
|
||||||
|
|
||||||
return &Registry{
|
return &Registry{
|
||||||
logger: logger.Named("go.signoz.io/pkg/registry"),
|
logger: logger,
|
||||||
services: services,
|
services: m,
|
||||||
startCh: make(chan error, 1),
|
startCh: make(chan error, 1),
|
||||||
stopCh: make(chan error, len(services)),
|
stopCh: make(chan error, len(services)),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Registry) Start(ctx context.Context) error {
|
func (r *Registry) Start(ctx context.Context) error {
|
||||||
for _, s := range r.services {
|
for _, s := range r.services.GetInOrder() {
|
||||||
go func(s factory.Service) {
|
go func(s factory.NamedService) {
|
||||||
|
r.logger.InfoContext(ctx, "starting service", "service", s.Name())
|
||||||
err := s.Start(ctx)
|
err := s.Start(ctx)
|
||||||
r.startCh <- err
|
if err != nil {
|
||||||
|
r.logger.ErrorContext(ctx, "failed to start service", "service", s.Name(), "error", err)
|
||||||
|
r.startCh <- err
|
||||||
|
}
|
||||||
}(s)
|
}(s)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -54,11 +60,11 @@ func (r *Registry) Wait(ctx context.Context) error {
|
|||||||
|
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
r.logger.Info("caught context error, exiting", zap.Any("context", ctx))
|
r.logger.InfoContext(ctx, "caught context error, exiting", "error", ctx.Err())
|
||||||
case s := <-interrupt:
|
case s := <-interrupt:
|
||||||
r.logger.Info("caught interrupt signal, exiting", zap.Any("context", ctx), zap.Any("signal", s))
|
r.logger.InfoContext(ctx, "caught interrupt signal, exiting", "signal", s)
|
||||||
case err := <-r.startCh:
|
case err := <-r.startCh:
|
||||||
r.logger.Info("caught service error, exiting", zap.Any("context", ctx), zap.Error(err))
|
r.logger.ErrorContext(ctx, "caught service error, exiting", "error", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -66,15 +72,16 @@ func (r *Registry) Wait(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (r *Registry) Stop(ctx context.Context) error {
|
func (r *Registry) Stop(ctx context.Context) error {
|
||||||
for _, s := range r.services {
|
for _, s := range r.services.GetInOrder() {
|
||||||
go func(s factory.Service) {
|
go func(s factory.NamedService) {
|
||||||
|
r.logger.InfoContext(ctx, "stopping service", "service", s.Name())
|
||||||
err := s.Stop(ctx)
|
err := s.Stop(ctx)
|
||||||
r.stopCh <- err
|
r.stopCh <- err
|
||||||
}(s)
|
}(s)
|
||||||
}
|
}
|
||||||
|
|
||||||
errs := make([]error, len(r.services))
|
errs := make([]error, len(r.services.GetInOrder()))
|
||||||
for i := 0; i < len(r.services); i++ {
|
for i := 0; i < len(r.services.GetInOrder()); i++ {
|
||||||
err := <-r.stopCh
|
err := <-r.stopCh
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errs = append(errs, err)
|
errs = append(errs, err)
|
||||||
|
|||||||
@@ -2,12 +2,13 @@ package registry
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"io"
|
||||||
|
"log/slog"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"go.signoz.io/signoz/pkg/factory/servicetest"
|
"go.signoz.io/signoz/pkg/factory/servicetest"
|
||||||
"go.uber.org/zap"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestRegistryWith2HttpServers(t *testing.T) {
|
func TestRegistryWith2HttpServers(t *testing.T) {
|
||||||
@@ -17,7 +18,7 @@ func TestRegistryWith2HttpServers(t *testing.T) {
|
|||||||
http2, err := servicetest.NewHttpService("http2")
|
http2, err := servicetest.NewHttpService("http2")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
registry, err := New(zap.NewNop(), http1, http2)
|
registry, err := New(slog.New(slog.NewTextHandler(io.Discard, nil)), http1, http2)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
@@ -41,7 +42,7 @@ func TestRegistryWith2HttpServersWithoutWait(t *testing.T) {
|
|||||||
http2, err := servicetest.NewHttpService("http2")
|
http2, err := servicetest.NewHttpService("http2")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
registry, err := New(zap.NewNop(), http1, http2)
|
registry, err := New(slog.New(slog.NewTextHandler(io.Discard, nil)), http1, http2)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import (
|
|||||||
"reflect"
|
"reflect"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"go.signoz.io/signoz/pkg/alertmanager"
|
||||||
"go.signoz.io/signoz/pkg/apiserver"
|
"go.signoz.io/signoz/pkg/apiserver"
|
||||||
"go.signoz.io/signoz/pkg/cache"
|
"go.signoz.io/signoz/pkg/cache"
|
||||||
"go.signoz.io/signoz/pkg/config"
|
"go.signoz.io/signoz/pkg/config"
|
||||||
@@ -44,6 +45,9 @@ type Config struct {
|
|||||||
|
|
||||||
// TelemetryStore config
|
// TelemetryStore config
|
||||||
TelemetryStore telemetrystore.Config `mapstructure:"telemetrystore"`
|
TelemetryStore telemetrystore.Config `mapstructure:"telemetrystore"`
|
||||||
|
|
||||||
|
// Alertmanager config
|
||||||
|
Alertmanager alertmanager.Config `mapstructure:"alertmanager"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeprecatedFlags are the flags that are deprecated and scheduled for removal.
|
// DeprecatedFlags are the flags that are deprecated and scheduled for removal.
|
||||||
@@ -63,6 +67,7 @@ func NewConfig(ctx context.Context, resolverConfig config.ResolverConfig, deprec
|
|||||||
sqlmigrator.NewConfigFactory(),
|
sqlmigrator.NewConfigFactory(),
|
||||||
apiserver.NewConfigFactory(),
|
apiserver.NewConfigFactory(),
|
||||||
telemetrystore.NewConfigFactory(),
|
telemetrystore.NewConfigFactory(),
|
||||||
|
alertmanager.NewConfigFactory(),
|
||||||
}
|
}
|
||||||
|
|
||||||
conf, err := config.New(ctx, resolverConfig, configFactories)
|
conf, err := config.New(ctx, resolverConfig, configFactories)
|
||||||
|
|||||||
@@ -59,6 +59,7 @@ func NewProviderConfig() ProviderConfig {
|
|||||||
sqlmigration.NewAddIntegrationsFactory(),
|
sqlmigration.NewAddIntegrationsFactory(),
|
||||||
sqlmigration.NewAddLicensesFactory(),
|
sqlmigration.NewAddLicensesFactory(),
|
||||||
sqlmigration.NewAddPatsFactory(),
|
sqlmigration.NewAddPatsFactory(),
|
||||||
|
sqlmigration.NewAddAlertmanagerConfigurationFactory(),
|
||||||
sqlmigration.NewModifyDatetimeFactory(),
|
sqlmigration.NewModifyDatetimeFactory(),
|
||||||
),
|
),
|
||||||
TelemetryStoreProviderFactories: factory.MustNewNamedMap(
|
TelemetryStoreProviderFactories: factory.MustNewNamedMap(
|
||||||
|
|||||||
@@ -3,9 +3,12 @@ package signoz
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
|
"go.signoz.io/signoz/pkg/alertmanager"
|
||||||
|
"go.signoz.io/signoz/pkg/alertmanager/alertmanagerstore/sqlalertmanagerstore"
|
||||||
"go.signoz.io/signoz/pkg/cache"
|
"go.signoz.io/signoz/pkg/cache"
|
||||||
"go.signoz.io/signoz/pkg/factory"
|
"go.signoz.io/signoz/pkg/factory"
|
||||||
"go.signoz.io/signoz/pkg/instrumentation"
|
"go.signoz.io/signoz/pkg/instrumentation"
|
||||||
|
"go.signoz.io/signoz/pkg/registry"
|
||||||
"go.signoz.io/signoz/pkg/sqlmigration"
|
"go.signoz.io/signoz/pkg/sqlmigration"
|
||||||
"go.signoz.io/signoz/pkg/sqlmigrator"
|
"go.signoz.io/signoz/pkg/sqlmigrator"
|
||||||
"go.signoz.io/signoz/pkg/sqlstore"
|
"go.signoz.io/signoz/pkg/sqlstore"
|
||||||
@@ -16,10 +19,12 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type SigNoz struct {
|
type SigNoz struct {
|
||||||
Cache cache.Cache
|
*registry.Registry
|
||||||
Web web.Web
|
Cache cache.Cache
|
||||||
SQLStore sqlstore.SQLStore
|
Web web.Web
|
||||||
TelemetryStore telemetrystore.TelemetryStore
|
SQLStore sqlstore.SQLStore
|
||||||
|
TelemetryStore telemetrystore.TelemetryStore
|
||||||
|
AlertmanagerClient alertmanager.Client
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(
|
func New(
|
||||||
@@ -38,6 +43,9 @@ func New(
|
|||||||
// Get the provider settings from instrumentation
|
// Get the provider settings from instrumentation
|
||||||
providerSettings := instrumentation.ToProviderSettings()
|
providerSettings := instrumentation.ToProviderSettings()
|
||||||
|
|
||||||
|
// Get the factory settings from instrumentation
|
||||||
|
factorySettings := instrumentation.ToFactorySettings()
|
||||||
|
|
||||||
// Initialize cache from the available cache provider factories
|
// Initialize cache from the available cache provider factories
|
||||||
cache, err := factory.NewProviderFromNamedMap(
|
cache, err := factory.NewProviderFromNamedMap(
|
||||||
ctx,
|
ctx,
|
||||||
@@ -102,10 +110,29 @@ func New(
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
alertmanagerstore, err := sqlalertmanagerstore.
|
||||||
|
NewFactory(sqlstore).
|
||||||
|
New(ctx, providerSettings, config.Alertmanager.Store)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
alertmanagerService, err := alertmanager.New(ctx, factorySettings, config.Alertmanager, alertmanagerstore)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
registry, err := registry.New(instrumentation.Logger(), factory.NewNamedService(factory.MustNewName("alertmanager"), alertmanagerService))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
return &SigNoz{
|
return &SigNoz{
|
||||||
Cache: cache,
|
Registry: registry,
|
||||||
Web: web,
|
Cache: cache,
|
||||||
SQLStore: sqlstore,
|
Web: web,
|
||||||
TelemetryStore: telemetrystore,
|
SQLStore: sqlstore,
|
||||||
|
TelemetryStore: telemetrystore,
|
||||||
|
AlertmanagerClient: alertmanagerService,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|||||||
62
pkg/sqlmigration/010_add_alertmanager_configuration.go
Normal file
62
pkg/sqlmigration/010_add_alertmanager_configuration.go
Normal file
@@ -0,0 +1,62 @@
|
|||||||
|
package sqlmigration
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/uptrace/bun"
|
||||||
|
"github.com/uptrace/bun/migrate"
|
||||||
|
"go.signoz.io/signoz/pkg/factory"
|
||||||
|
)
|
||||||
|
|
||||||
|
// addAlertmanagerConfiguration is the SQL migration that creates the
// alertmanager_config table and adds an org_id column to the
// notification_channels table. It carries no state.
type addAlertmanagerConfiguration struct{}
|
||||||
|
|
||||||
|
func NewAddAlertmanagerConfigurationFactory() factory.ProviderFactory[SQLMigration, Config] {
|
||||||
|
return factory.NewProviderFactory(factory.MustNewName("add_alertmanager_configuration"), newAddAlertmanagerConfiguration)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newAddAlertmanagerConfiguration(_ context.Context, _ factory.ProviderSettings, _ Config) (SQLMigration, error) {
|
||||||
|
return &addAlertmanagerConfiguration{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (migration *addAlertmanagerConfiguration) Register(migrations *migrate.Migrations) error {
|
||||||
|
if err := migrations.Register(migration.Up, migration.Down); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (migration *addAlertmanagerConfiguration) Up(ctx context.Context, db *bun.DB) error {
|
||||||
|
if _, err := db.
|
||||||
|
NewCreateTable().
|
||||||
|
Model(&struct {
|
||||||
|
bun.BaseModel `bun:"table:alertmanager_config"`
|
||||||
|
ID uint64 `bun:"id"`
|
||||||
|
Config string `bun:"config"`
|
||||||
|
SilencesState string `bun:"silences_state"`
|
||||||
|
NFLogState string `bun:"nflog_state"`
|
||||||
|
CreatedAt time.Time `bun:"created_at"`
|
||||||
|
UpdatedAt time.Time `bun:"updated_at"`
|
||||||
|
OrgID string `bun:"org_id,unique"`
|
||||||
|
}{}).
|
||||||
|
ForeignKey(`("org_id") REFERENCES "organizations" ("id") ON DELETE CASCADE`).
|
||||||
|
IfNotExists().
|
||||||
|
Exec(ctx); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := db.
|
||||||
|
NewAddColumn().
|
||||||
|
Table("notification_channels").
|
||||||
|
ColumnExpr("org_id TEXT").
|
||||||
|
Exec(ctx); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Down is a no-op: it returns nil without issuing any statement on db, so the
// table and column created by Up are left in place.
func (migration *addAlertmanagerConfiguration) Down(ctx context.Context, db *bun.DB) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
@@ -3,6 +3,7 @@ package sqlitesqlstore
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"log/slog"
|
||||||
|
|
||||||
"github.com/jmoiron/sqlx"
|
"github.com/jmoiron/sqlx"
|
||||||
_ "github.com/mattn/go-sqlite3"
|
_ "github.com/mattn/go-sqlite3"
|
||||||
@@ -10,6 +11,7 @@ import (
|
|||||||
"github.com/uptrace/bun/dialect/sqlitedialect"
|
"github.com/uptrace/bun/dialect/sqlitedialect"
|
||||||
"go.signoz.io/signoz/pkg/factory"
|
"go.signoz.io/signoz/pkg/factory"
|
||||||
"go.signoz.io/signoz/pkg/sqlstore"
|
"go.signoz.io/signoz/pkg/sqlstore"
|
||||||
|
"go.signoz.io/signoz/pkg/sqlstore/sqlstorehook"
|
||||||
)
|
)
|
||||||
|
|
||||||
type provider struct {
|
type provider struct {
|
||||||
@@ -33,10 +35,13 @@ func New(ctx context.Context, providerSettings factory.ProviderSettings, config
|
|||||||
settings.Logger().InfoContext(ctx, "connected to sqlite", "path", config.Sqlite.Path)
|
settings.Logger().InfoContext(ctx, "connected to sqlite", "path", config.Sqlite.Path)
|
||||||
sqldb.SetMaxOpenConns(config.Connection.MaxOpenConns)
|
sqldb.SetMaxOpenConns(config.Connection.MaxOpenConns)
|
||||||
|
|
||||||
|
bundb := bun.NewDB(sqldb, sqlitedialect.New())
|
||||||
|
bundb.AddQueryHook(sqlstorehook.NewDebug(settings.Logger(), slog.LevelDebug))
|
||||||
|
|
||||||
return &provider{
|
return &provider{
|
||||||
settings: settings,
|
settings: settings,
|
||||||
sqldb: sqldb,
|
sqldb: sqldb,
|
||||||
bundb: bun.NewDB(sqldb, sqlitedialect.New()),
|
bundb: bundb,
|
||||||
sqlxdb: sqlx.NewDb(sqldb, "sqlite3"),
|
sqlxdb: sqlx.NewDb(sqldb, "sqlite3"),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|||||||
30
pkg/sqlstore/sqlstorehook/debug.go
Normal file
30
pkg/sqlstore/sqlstorehook/debug.go
Normal file
@@ -0,0 +1,30 @@
|
|||||||
|
package sqlstorehook
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"log/slog"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/uptrace/bun"
|
||||||
|
)
|
||||||
|
|
||||||
|
// debug is a bun query hook that writes one log record per executed query.
// It stores the logger to write to and the level to log at.
// NOTE(review): NewDebug never assigns the embedded bun.QueryHook, so it stays
// nil; BeforeQuery and AfterQuery are defined on debug itself.
type debug struct {
|
||||||
|
bun.QueryHook
|
||||||
|
logger *slog.Logger
|
||||||
|
level slog.Level
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewDebug(logger *slog.Logger, level slog.Level) bun.QueryHook {
|
||||||
|
return &debug{
|
||||||
|
logger: logger,
|
||||||
|
level: level,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// BeforeQuery does nothing before a query runs; it returns ctx unchanged.
func (debug) BeforeQuery(ctx context.Context, event *bun.QueryEvent) context.Context {
|
||||||
|
return ctx
|
||||||
|
}
|
||||||
|
|
||||||
|
func (hook debug) AfterQuery(ctx context.Context, event *bun.QueryEvent) {
|
||||||
|
hook.logger.Log(ctx, hook.level, "::SQLSTORE-QUERY::", "db.query.operation", event.Operation(), "db.query.text", event.Query, "db.duration", time.Since(event.StartTime).String())
|
||||||
|
}
|
||||||
BIN
queries.active
Normal file
BIN
queries.active
Normal file
Binary file not shown.
Reference in New Issue
Block a user