Compare commits

...

8 Commits

Author SHA1 Message Date
grandwizard28
bd3e0eeb6c fix(ruler): fix lastinsertedid concundrum 2025-03-06 02:50:09 +05:30
grandwizard28
9d6e09d3f6 fix(legacyalertmanager): create the complete config in the migration 2025-03-06 02:01:11 +05:30
grandwizard28
cccc25e2ee fix(legacyalertmanager): pick the first organization 2025-03-06 01:46:40 +05:30
grandwizard28
9580da0478 fix(migration): make it idempotent 2025-03-06 01:20:56 +05:30
grandwizard28
8534b798c7 fix(migration): make it idempotent 2025-03-06 01:10:19 +05:30
grandwizard28
3bedd10ef9 fix(migration): add an alertmanager config by default 2025-03-06 00:49:35 +05:30
grandwizard28
b6f2ff052d fix(ruler): fix db query 2025-03-05 23:04:19 +05:30
grandwizard28
d02dc8687c refactor(ruler): add BaseModel 2025-03-05 22:48:25 +05:30
5 changed files with 114 additions and 55 deletions

View File

@@ -37,6 +37,7 @@ type provider struct {
configStore alertmanagertypes.ConfigStore
batcher *alertmanagerbatcher.Batcher
url *url.URL
orgID string
}
func NewFactory(sqlstore sqlstore.SQLStore) factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config] {
@@ -73,8 +74,25 @@ func (provider *provider) Start(ctx context.Context) error {
}
for alerts := range provider.batcher.C {
if err := provider.putAlerts(ctx, "", alerts); err != nil {
provider.settings.Logger().Error("failed to send alerts to alertmanager", "error", err)
// For the first time, we need to get the orgID from the config store.
// Since this is the legacy alertmanager, we get the first org from the store.
if provider.orgID == "" {
orgIDs, err := provider.configStore.ListOrgs(ctx)
if err != nil {
provider.settings.Logger().ErrorContext(ctx, "failed to send alerts to alertmanager", "error", err)
continue
}
if len(orgIDs) == 0 {
provider.settings.Logger().ErrorContext(ctx, "failed to send alerts to alertmanager", "error", "no orgs found")
continue
}
provider.orgID = orgIDs[0]
}
if err := provider.putAlerts(ctx, provider.orgID, alerts); err != nil {
provider.settings.Logger().ErrorContext(ctx, "failed to send alerts to alertmanager", "error", err)
}
}

View File

@@ -57,22 +57,12 @@ const PreferRPM = "PreferRPM"
const SpanSearchScopeRoot = "isroot"
const SpanSearchScopeEntryPoint = "isentrypoint"
func GetAlertManagerApiPrefix() string {
if os.Getenv("ALERTMANAGER_API_PREFIX") != "" {
return os.Getenv("ALERTMANAGER_API_PREFIX")
}
return "http://alertmanager:9093/api/"
}
var TELEMETRY_HEART_BEAT_DURATION_MINUTES = GetOrDefaultEnvInt("TELEMETRY_HEART_BEAT_DURATION_MINUTES", 720)
var TELEMETRY_ACTIVE_USER_DURATION_MINUTES = GetOrDefaultEnvInt("TELEMETRY_ACTIVE_USER_DURATION_MINUTES", 360)
var InviteEmailTemplate = GetOrDefaultEnv("INVITE_EMAIL_TEMPLATE", "/root/templates/invitation_email_template.html")
// Alert manager channel subpath
var AmChannelApiPath = GetOrDefaultEnv("ALERTMANAGER_API_CHANNEL_PATH", "v1/routes")
var OTLPTarget = GetOrDefaultEnv("OTEL_EXPORTER_OTLP_ENDPOINT", "")
var LogExportBatchSize = GetOrDefaultEnv("OTEL_BLRP_MAX_EXPORT_BATCH_SIZE", "512")

View File

@@ -1,21 +0,0 @@
package constants
import (
"os"
"testing"
. "github.com/smartystreets/goconvey/convey"
)
func TestGetAlertManagerApiPrefix(t *testing.T) {
Convey("TestGetAlertManagerApiPrefix", t, func() {
res := GetAlertManagerApiPrefix()
So(res, ShouldEqual, "http://alertmanager:9093/api/")
Convey("WithEnvSet", func() {
os.Setenv("ALERTMANAGER_API_PREFIX", "http://test:9093/api/")
res = GetAlertManagerApiPrefix()
So(res, ShouldEqual, "http://test:9093/api/")
})
})
}

View File

@@ -11,6 +11,7 @@ import (
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
"github.com/uptrace/bun"
"go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/sqlstore"
@@ -55,6 +56,8 @@ type RuleDB interface {
}
type StoredRule struct {
bun.BaseModel `bun:"rules"`
Id int `json:"id" db:"id" bun:"id,pk,autoincrement"`
CreatedAt *time.Time `json:"created_at" db:"created_at" bun:"created_at"`
CreatedBy *string `json:"created_by" db:"created_by" bun:"created_by"`
@@ -74,9 +77,8 @@ func NewRuleDB(db *sqlx.DB, sqlstore sqlstore.SQLStore) RuleDB {
// CreateRule stores a given rule in db and returns task name and error (if any)
func (r *ruleDB) CreateRule(ctx context.Context, storedRule *StoredRule, cb func(context.Context, int64) error) (int64, error) {
var lastInsertId int64
err := r.sqlstore.RunInTxCtx(ctx, nil, func(ctx context.Context) error {
res, err := r.sqlstore.
_, err := r.sqlstore.
BunDBCtx(ctx).
NewInsert().
Model(storedRule).
@@ -85,19 +87,14 @@ func (r *ruleDB) CreateRule(ctx context.Context, storedRule *StoredRule, cb func
return err
}
lastInsertId, err = res.LastInsertId()
if err != nil {
return err
}
return cb(ctx, lastInsertId)
return cb(ctx, int64(storedRule.Id))
})
if err != nil {
return 0, err
}
return lastInsertId, nil
return int64(storedRule.Id), nil
}
// EditRule stores a given rule string in database and returns task name and error (if any)
@@ -123,6 +120,7 @@ func (r *ruleDB) DeleteRule(ctx context.Context, id string, cb func(context.Cont
_, err := r.sqlstore.
BunDBCtx(ctx).
NewDelete().
Model(&StoredRule{}).
Where("id = ?", id).
Exec(ctx)
if err != nil {

View File

@@ -3,11 +3,16 @@ package sqlmigration
import (
"context"
"database/sql"
"strconv"
"strings"
"time"
"github.com/tidwall/gjson"
"github.com/uptrace/bun"
"github.com/uptrace/bun/migrate"
"go.signoz.io/signoz/pkg/alertmanager/alertmanagerserver"
"go.signoz.io/signoz/pkg/factory"
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
)
type addAlertmanager struct{}
@@ -40,8 +45,10 @@ func (migration *addAlertmanager) Up(ctx context.Context, db *bun.DB) error {
NewDropColumn().
Table("notification_channels").
ColumnExpr("deleted").
Exec(ctx); err != nil && err != ErrNoExecute {
return err
Exec(ctx); err != nil {
if !strings.Contains(err.Error(), "no such column") {
return err
}
}
if _, err := tx.
@@ -77,12 +84,12 @@ func (migration *addAlertmanager) Up(ctx context.Context, db *bun.DB) error {
NewCreateTable().
Model(&struct {
bun.BaseModel `bun:"table:alertmanager_config"`
ID uint64 `bun:"id"`
Config string `bun:"config"`
Hash string `bun:"hash"`
CreatedAt time.Time `bun:"created_at"`
UpdatedAt time.Time `bun:"updated_at"`
OrgID string `bun:"org_id,unique"`
ID uint64 `bun:"id,pk,autoincrement"`
Config string `bun:"config,notnull,type:text"`
Hash string `bun:"hash,notnull,type:text"`
CreatedAt time.Time `bun:"created_at,notnull"`
UpdatedAt time.Time `bun:"updated_at,notnull"`
OrgID string `bun:"org_id,notnull,unique"`
}{}).
ForeignKey(`("org_id") REFERENCES "organizations" ("id")`).
IfNotExists().
@@ -95,11 +102,11 @@ func (migration *addAlertmanager) Up(ctx context.Context, db *bun.DB) error {
Model(&struct {
bun.BaseModel `bun:"table:alertmanager_state"`
ID uint64 `bun:"id,pk,autoincrement"`
Silences string `bun:"silences,nullzero"`
NFLog string `bun:"nflog,nullzero"`
CreatedAt time.Time `bun:"created_at"`
UpdatedAt time.Time `bun:"updated_at"`
OrgID string `bun:"org_id,unique"`
Silences string `bun:"silences,nullzero,type:text"`
NFLog string `bun:"nflog,nullzero,type:text"`
CreatedAt time.Time `bun:"created_at,notnull"`
UpdatedAt time.Time `bun:"updated_at,notnull"`
OrgID string `bun:"org_id,notnull,unique"`
}{}).
ForeignKey(`("org_id") REFERENCES "organizations" ("id")`).
IfNotExists().
@@ -107,6 +114,10 @@ func (migration *addAlertmanager) Up(ctx context.Context, db *bun.DB) error {
return err
}
if err := migration.populateAlertmanagerConfig(ctx, tx, orgID); err != nil {
return err
}
if err := tx.Commit(); err != nil {
return err
}
@@ -127,6 +138,69 @@ func (migration *addAlertmanager) populateOrgID(ctx context.Context, tx bun.Tx,
return nil
}
func (migration *addAlertmanager) populateAlertmanagerConfig(ctx context.Context, tx bun.Tx, orgID string) error {
var channels []*alertmanagertypes.Channel
err := tx.
NewSelect().
Model(&channels).
Where("org_id = ?", orgID).
Scan(ctx)
if err != nil {
return err
}
type matcher struct {
bun.BaseModel `bun:"table:rules"`
ID int `bun:"id,pk"`
Data string `bun:"data"`
}
matchers := []matcher{}
err = tx.
NewSelect().
Column("id", "data").
Model(&matchers).
Scan(ctx)
if err != nil {
return err
}
matchersMap := make(map[string][]string)
for _, matcher := range matchers {
receivers := gjson.Get(matcher.Data, "preferredChannels").Array()
for _, receiver := range receivers {
matchersMap[strconv.Itoa(matcher.ID)] = append(matchersMap[strconv.Itoa(matcher.ID)], receiver.String())
}
}
config, err := alertmanagertypes.NewConfigFromChannels(alertmanagerserver.NewConfig().Global, alertmanagerserver.NewConfig().Route, channels, orgID)
if err != nil {
return err
}
for ruleID, receivers := range matchersMap {
err = config.CreateRuleIDMatcher(ruleID, receivers)
if err != nil {
return err
}
}
if _, err := tx.
NewInsert().
Model(config.StoreableConfig()).
On("CONFLICT (org_id) DO UPDATE").
Set("config = ?", config.StoreableConfig().Config).
Set("hash = ?", config.StoreableConfig().Hash).
Set("updated_at = ?", config.StoreableConfig().UpdatedAt).
Exec(ctx); err != nil {
return err
}
return nil
}
func (migration *addAlertmanager) Down(ctx context.Context, db *bun.DB) error {
return nil
}