Compare commits
8 Commits
v0.75.0-04
...
v0.75.0-bd
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bd3e0eeb6c | ||
|
|
9d6e09d3f6 | ||
|
|
cccc25e2ee | ||
|
|
9580da0478 | ||
|
|
8534b798c7 | ||
|
|
3bedd10ef9 | ||
|
|
b6f2ff052d | ||
|
|
d02dc8687c |
@@ -37,6 +37,7 @@ type provider struct {
|
||||
configStore alertmanagertypes.ConfigStore
|
||||
batcher *alertmanagerbatcher.Batcher
|
||||
url *url.URL
|
||||
orgID string
|
||||
}
|
||||
|
||||
func NewFactory(sqlstore sqlstore.SQLStore) factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config] {
|
||||
@@ -73,8 +74,25 @@ func (provider *provider) Start(ctx context.Context) error {
|
||||
}
|
||||
|
||||
for alerts := range provider.batcher.C {
|
||||
if err := provider.putAlerts(ctx, "", alerts); err != nil {
|
||||
provider.settings.Logger().Error("failed to send alerts to alertmanager", "error", err)
|
||||
// For the first time, we need to get the orgID from the config store.
|
||||
// Since this is the legacy alertmanager, we get the first org from the store.
|
||||
if provider.orgID == "" {
|
||||
orgIDs, err := provider.configStore.ListOrgs(ctx)
|
||||
if err != nil {
|
||||
provider.settings.Logger().ErrorContext(ctx, "failed to send alerts to alertmanager", "error", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if len(orgIDs) == 0 {
|
||||
provider.settings.Logger().ErrorContext(ctx, "failed to send alerts to alertmanager", "error", "no orgs found")
|
||||
continue
|
||||
}
|
||||
|
||||
provider.orgID = orgIDs[0]
|
||||
}
|
||||
|
||||
if err := provider.putAlerts(ctx, provider.orgID, alerts); err != nil {
|
||||
provider.settings.Logger().ErrorContext(ctx, "failed to send alerts to alertmanager", "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -57,22 +57,12 @@ const PreferRPM = "PreferRPM"
|
||||
const SpanSearchScopeRoot = "isroot"
|
||||
const SpanSearchScopeEntryPoint = "isentrypoint"
|
||||
|
||||
func GetAlertManagerApiPrefix() string {
|
||||
if os.Getenv("ALERTMANAGER_API_PREFIX") != "" {
|
||||
return os.Getenv("ALERTMANAGER_API_PREFIX")
|
||||
}
|
||||
return "http://alertmanager:9093/api/"
|
||||
}
|
||||
|
||||
var TELEMETRY_HEART_BEAT_DURATION_MINUTES = GetOrDefaultEnvInt("TELEMETRY_HEART_BEAT_DURATION_MINUTES", 720)
|
||||
|
||||
var TELEMETRY_ACTIVE_USER_DURATION_MINUTES = GetOrDefaultEnvInt("TELEMETRY_ACTIVE_USER_DURATION_MINUTES", 360)
|
||||
|
||||
var InviteEmailTemplate = GetOrDefaultEnv("INVITE_EMAIL_TEMPLATE", "/root/templates/invitation_email_template.html")
|
||||
|
||||
// Alert manager channel subpath
|
||||
var AmChannelApiPath = GetOrDefaultEnv("ALERTMANAGER_API_CHANNEL_PATH", "v1/routes")
|
||||
|
||||
var OTLPTarget = GetOrDefaultEnv("OTEL_EXPORTER_OTLP_ENDPOINT", "")
|
||||
var LogExportBatchSize = GetOrDefaultEnv("OTEL_BLRP_MAX_EXPORT_BATCH_SIZE", "512")
|
||||
|
||||
|
||||
@@ -1,21 +0,0 @@
|
||||
package constants
|
||||
|
||||
import (
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
. "github.com/smartystreets/goconvey/convey"
|
||||
)
|
||||
|
||||
func TestGetAlertManagerApiPrefix(t *testing.T) {
|
||||
Convey("TestGetAlertManagerApiPrefix", t, func() {
|
||||
res := GetAlertManagerApiPrefix()
|
||||
So(res, ShouldEqual, "http://alertmanager:9093/api/")
|
||||
|
||||
Convey("WithEnvSet", func() {
|
||||
os.Setenv("ALERTMANAGER_API_PREFIX", "http://test:9093/api/")
|
||||
res = GetAlertManagerApiPrefix()
|
||||
So(res, ShouldEqual, "http://test:9093/api/")
|
||||
})
|
||||
})
|
||||
}
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/uptrace/bun"
|
||||
"go.signoz.io/signoz/pkg/query-service/model"
|
||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||
"go.signoz.io/signoz/pkg/sqlstore"
|
||||
@@ -55,6 +56,8 @@ type RuleDB interface {
|
||||
}
|
||||
|
||||
type StoredRule struct {
|
||||
bun.BaseModel `bun:"rules"`
|
||||
|
||||
Id int `json:"id" db:"id" bun:"id,pk,autoincrement"`
|
||||
CreatedAt *time.Time `json:"created_at" db:"created_at" bun:"created_at"`
|
||||
CreatedBy *string `json:"created_by" db:"created_by" bun:"created_by"`
|
||||
@@ -74,9 +77,8 @@ func NewRuleDB(db *sqlx.DB, sqlstore sqlstore.SQLStore) RuleDB {
|
||||
|
||||
// CreateRule stores a given rule in db and returns task name and error (if any)
|
||||
func (r *ruleDB) CreateRule(ctx context.Context, storedRule *StoredRule, cb func(context.Context, int64) error) (int64, error) {
|
||||
var lastInsertId int64
|
||||
err := r.sqlstore.RunInTxCtx(ctx, nil, func(ctx context.Context) error {
|
||||
res, err := r.sqlstore.
|
||||
_, err := r.sqlstore.
|
||||
BunDBCtx(ctx).
|
||||
NewInsert().
|
||||
Model(storedRule).
|
||||
@@ -85,19 +87,14 @@ func (r *ruleDB) CreateRule(ctx context.Context, storedRule *StoredRule, cb func
|
||||
return err
|
||||
}
|
||||
|
||||
lastInsertId, err = res.LastInsertId()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return cb(ctx, lastInsertId)
|
||||
return cb(ctx, int64(storedRule.Id))
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return lastInsertId, nil
|
||||
return int64(storedRule.Id), nil
|
||||
}
|
||||
|
||||
// EditRule stores a given rule string in database and returns task name and error (if any)
|
||||
@@ -123,6 +120,7 @@ func (r *ruleDB) DeleteRule(ctx context.Context, id string, cb func(context.Cont
|
||||
_, err := r.sqlstore.
|
||||
BunDBCtx(ctx).
|
||||
NewDelete().
|
||||
Model(&StoredRule{}).
|
||||
Where("id = ?", id).
|
||||
Exec(ctx)
|
||||
if err != nil {
|
||||
|
||||
@@ -3,11 +3,16 @@ package sqlmigration
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/tidwall/gjson"
|
||||
"github.com/uptrace/bun"
|
||||
"github.com/uptrace/bun/migrate"
|
||||
"go.signoz.io/signoz/pkg/alertmanager/alertmanagerserver"
|
||||
"go.signoz.io/signoz/pkg/factory"
|
||||
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
|
||||
)
|
||||
|
||||
type addAlertmanager struct{}
|
||||
@@ -40,8 +45,10 @@ func (migration *addAlertmanager) Up(ctx context.Context, db *bun.DB) error {
|
||||
NewDropColumn().
|
||||
Table("notification_channels").
|
||||
ColumnExpr("deleted").
|
||||
Exec(ctx); err != nil && err != ErrNoExecute {
|
||||
return err
|
||||
Exec(ctx); err != nil {
|
||||
if !strings.Contains(err.Error(), "no such column") {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if _, err := tx.
|
||||
@@ -77,12 +84,12 @@ func (migration *addAlertmanager) Up(ctx context.Context, db *bun.DB) error {
|
||||
NewCreateTable().
|
||||
Model(&struct {
|
||||
bun.BaseModel `bun:"table:alertmanager_config"`
|
||||
ID uint64 `bun:"id"`
|
||||
Config string `bun:"config"`
|
||||
Hash string `bun:"hash"`
|
||||
CreatedAt time.Time `bun:"created_at"`
|
||||
UpdatedAt time.Time `bun:"updated_at"`
|
||||
OrgID string `bun:"org_id,unique"`
|
||||
ID uint64 `bun:"id,pk,autoincrement"`
|
||||
Config string `bun:"config,notnull,type:text"`
|
||||
Hash string `bun:"hash,notnull,type:text"`
|
||||
CreatedAt time.Time `bun:"created_at,notnull"`
|
||||
UpdatedAt time.Time `bun:"updated_at,notnull"`
|
||||
OrgID string `bun:"org_id,notnull,unique"`
|
||||
}{}).
|
||||
ForeignKey(`("org_id") REFERENCES "organizations" ("id")`).
|
||||
IfNotExists().
|
||||
@@ -95,11 +102,11 @@ func (migration *addAlertmanager) Up(ctx context.Context, db *bun.DB) error {
|
||||
Model(&struct {
|
||||
bun.BaseModel `bun:"table:alertmanager_state"`
|
||||
ID uint64 `bun:"id,pk,autoincrement"`
|
||||
Silences string `bun:"silences,nullzero"`
|
||||
NFLog string `bun:"nflog,nullzero"`
|
||||
CreatedAt time.Time `bun:"created_at"`
|
||||
UpdatedAt time.Time `bun:"updated_at"`
|
||||
OrgID string `bun:"org_id,unique"`
|
||||
Silences string `bun:"silences,nullzero,type:text"`
|
||||
NFLog string `bun:"nflog,nullzero,type:text"`
|
||||
CreatedAt time.Time `bun:"created_at,notnull"`
|
||||
UpdatedAt time.Time `bun:"updated_at,notnull"`
|
||||
OrgID string `bun:"org_id,notnull,unique"`
|
||||
}{}).
|
||||
ForeignKey(`("org_id") REFERENCES "organizations" ("id")`).
|
||||
IfNotExists().
|
||||
@@ -107,6 +114,10 @@ func (migration *addAlertmanager) Up(ctx context.Context, db *bun.DB) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := migration.populateAlertmanagerConfig(ctx, tx, orgID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := tx.Commit(); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -127,6 +138,69 @@ func (migration *addAlertmanager) populateOrgID(ctx context.Context, tx bun.Tx,
|
||||
return nil
|
||||
}
|
||||
|
||||
func (migration *addAlertmanager) populateAlertmanagerConfig(ctx context.Context, tx bun.Tx, orgID string) error {
|
||||
var channels []*alertmanagertypes.Channel
|
||||
|
||||
err := tx.
|
||||
NewSelect().
|
||||
Model(&channels).
|
||||
Where("org_id = ?", orgID).
|
||||
Scan(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
type matcher struct {
|
||||
bun.BaseModel `bun:"table:rules"`
|
||||
ID int `bun:"id,pk"`
|
||||
Data string `bun:"data"`
|
||||
}
|
||||
|
||||
matchers := []matcher{}
|
||||
|
||||
err = tx.
|
||||
NewSelect().
|
||||
Column("id", "data").
|
||||
Model(&matchers).
|
||||
Scan(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
matchersMap := make(map[string][]string)
|
||||
for _, matcher := range matchers {
|
||||
receivers := gjson.Get(matcher.Data, "preferredChannels").Array()
|
||||
for _, receiver := range receivers {
|
||||
matchersMap[strconv.Itoa(matcher.ID)] = append(matchersMap[strconv.Itoa(matcher.ID)], receiver.String())
|
||||
}
|
||||
}
|
||||
|
||||
config, err := alertmanagertypes.NewConfigFromChannels(alertmanagerserver.NewConfig().Global, alertmanagerserver.NewConfig().Route, channels, orgID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for ruleID, receivers := range matchersMap {
|
||||
err = config.CreateRuleIDMatcher(ruleID, receivers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if _, err := tx.
|
||||
NewInsert().
|
||||
Model(config.StoreableConfig()).
|
||||
On("CONFLICT (org_id) DO UPDATE").
|
||||
Set("config = ?", config.StoreableConfig().Config).
|
||||
Set("hash = ?", config.StoreableConfig().Hash).
|
||||
Set("updated_at = ?", config.StoreableConfig().UpdatedAt).
|
||||
Exec(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (migration *addAlertmanager) Down(ctx context.Context, db *bun.DB) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user