Compare commits

...

3 Commits

Author SHA1 Message Date
Vikrant Gupta
ce48733f4a feat(preference): add multi-tenant preference module (#7442)
* feat(sqlmigration): update the alertmanager tables

* feat(sqlmigration): added missing files

* feat(sqlmigration): port changes for alertmanager migration

* feat(sqlmigration): address nit picks

* feat(sqlmigration): make the preference package multi tenant

* feat(sqlmigration): bun insert changes

* feat(sqlmigration): bun insert changes

* feat(preference): add preference module

* feat(preference): add preference module

* feat(preference): handle the DI for modules

* feat(preference): add preference module

* feat(preference): add preference module

* feat(preference): add preference module

* feat(sqlmigration): merge base branch

* feat(sqlmigration): update apdex and TTL status tables  (#7481)

* feat(sqlmigration): update the apdex and ttl tables

* feat(sqlmigration): register the new migration and rename table

* feat(sqlmigration): fix the ttl queries

* feat(sqlmigration): fix the reset password and pat tables (#7482)

* feat(sqlmigration): fix the reset password and pat tables

* feat(sqlmigration): revert PAT changes

* feat(sqlmigration): register and rename the new migration

* feat(sqlmigration): return proper errors
2025-03-31 08:12:10 +05:30
Vikrant Gupta
900877a5bb feat(sqlmigration): update the alertmanager tables (#7431)
* feat(sqlmigration): update the alertmanager tables

* feat(sqlmigration): added missing files

* feat(sqlmigration): port changes for alertmanager migration

* feat(sqlmigration): address nit picks

* feat(sqlmigration): merge base branch
2025-03-30 03:02:45 +05:30
nityanandagohain
c63667c0e7 feat: merge agents
commit 959b7405f8
Author: nityanandagohain <nityanandagohain@gmail.com>
Date:   Thu Mar 27 23:38:07 2025 +0530

    fix: use default orgID for single tenant

commit 45d34b8528
Author: nityanandagohain <nityanandagohain@gmail.com>
Date:   Thu Mar 27 17:33:41 2025 +0530

    fix: use uuid7

commit 2e57d01068
Author: nityanandagohain <nityanandagohain@gmail.com>
Date:   Thu Mar 27 13:02:29 2025 +0530

    fix: migrations

commit 71cc60ca1d
Merge: 1120a97c8 027a1631e
Author: nityanandagohain <nityanandagohain@gmail.com>
Date:   Thu Mar 27 12:11:05 2025 +0530

    Merge remote-tracking branch 'origin/main' into issue_463

commit 1120a97c81
Author: nityanandagohain <nityanandagohain@gmail.com>
Date:   Wed Mar 26 22:34:52 2025 +0530

    fix: minor changes

commit 25044875d9
Author: nityanandagohain <nityanandagohain@gmail.com>
Date:   Wed Mar 26 13:29:02 2025 +0530

    fix: tests

commit b7f0e186dc
Author: nityanandagohain <nityanandagohain@gmail.com>
Date:   Wed Mar 26 00:38:08 2025 +0530

    fix: tests

commit e94347891d
Author: nityanandagohain <nityanandagohain@gmail.com>
Date:   Tue Mar 25 18:21:00 2025 +0530

    fix: opamp server changes

commit 1e37e0ef66
Author: nityanandagohain <nityanandagohain@gmail.com>
Date:   Mon Mar 24 18:15:08 2025 +0530

    fix: use sqlstore

commit 7f3f4bc10b
Author: nityanandagohain <nityanandagohain@gmail.com>
Date:   Mon Mar 24 17:40:29 2025 +0530

    fix: remove frontend package manger commit

commit bc3963ff96
Merge: cfb226df4 3515686da
Author: nityanandagohain <nityanandagohain@gmail.com>
Date:   Mon Mar 24 17:38:01 2025 +0530

    Merge remote-tracking branch 'origin/main' into issue_463

commit cfb226df4d
Author: nityanandagohain <nityanandagohain@gmail.com>
Date:   Mon Mar 17 16:56:42 2025 +0530

    fix: initial commit for agents
2025-03-28 10:15:33 +05:30
66 changed files with 3153 additions and 1356 deletions

View File

@@ -11,6 +11,8 @@ import (
 	"github.com/SigNoz/signoz/ee/query-service/license"
 	"github.com/SigNoz/signoz/ee/query-service/usage"
 	"github.com/SigNoz/signoz/pkg/alertmanager"
+	"github.com/SigNoz/signoz/pkg/modules/preference"
+	preferencecore "github.com/SigNoz/signoz/pkg/modules/preference/core"
 	baseapp "github.com/SigNoz/signoz/pkg/query-service/app"
 	"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations"
 	"github.com/SigNoz/signoz/pkg/query-service/app/integrations"
@@ -21,6 +23,7 @@ import (
 	rules "github.com/SigNoz/signoz/pkg/query-service/rules"
 	"github.com/SigNoz/signoz/pkg/signoz"
 	"github.com/SigNoz/signoz/pkg/types/authtypes"
+	"github.com/SigNoz/signoz/pkg/types/preferencetypes"
 	"github.com/SigNoz/signoz/pkg/version"
 	"github.com/gorilla/mux"
 )
@@ -54,6 +57,7 @@ type APIHandler struct {
 // NewAPIHandler returns an APIHandler
 func NewAPIHandler(opts APIHandlerOptions, signoz *signoz.SigNoz) (*APIHandler, error) {
+	preference := preference.NewAPI(preferencecore.NewPreference(preferencecore.NewStore(signoz.SQLStore), preferencetypes.NewDefaultPreferenceMap()))
 	baseHandler, err := baseapp.NewAPIHandler(baseapp.APIHandlerOpts{
 		Reader: opts.DataConnector,
@@ -71,6 +75,7 @@ func NewAPIHandler(opts APIHandlerOptions, signoz *signoz.SigNoz) (*APIHandler,
 		UseTraceNewSchema: opts.UseTraceNewSchema,
 		AlertmanagerAPI:   alertmanager.NewAPI(signoz.Alertmanager),
 		Signoz:            signoz,
+		Preference:        preference,
 	})
 	if err != nil {

View File

@@ -5,21 +5,20 @@ import (
 	"github.com/ClickHouse/clickhouse-go/v2"
-	"github.com/jmoiron/sqlx"
 	"github.com/SigNoz/signoz/pkg/cache"
 	basechr "github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader"
 	"github.com/SigNoz/signoz/pkg/query-service/interfaces"
+	"github.com/SigNoz/signoz/pkg/sqlstore"
 )
 type ClickhouseReader struct {
 	conn clickhouse.Conn
-	appdb *sqlx.DB
+	appdb sqlstore.SQLStore
 	*basechr.ClickHouseReader
 }
 func NewDataConnector(
-	localDB *sqlx.DB,
+	sqlDB sqlstore.SQLStore,
 	ch clickhouse.Conn,
 	promConfigPath string,
 	lm interfaces.FeatureLookup,
@@ -29,10 +28,10 @@ func NewDataConnector(
 	fluxIntervalForTraceDetail time.Duration,
 	cache cache.Cache,
 ) *ClickhouseReader {
-	chReader := basechr.NewReader(localDB, ch, promConfigPath, lm, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache)
+	chReader := basechr.NewReader(sqlDB, ch, promConfigPath, lm, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache)
 	return &ClickhouseReader{
 		conn:             ch,
-		appdb:            localDB,
+		appdb:            sqlDB,
 		ClickHouseReader: chReader,
 	}
 }

View File

@@ -43,7 +43,6 @@ import (
 	"github.com/SigNoz/signoz/pkg/query-service/app/logparsingpipeline"
 	"github.com/SigNoz/signoz/pkg/query-service/app/opamp"
 	opAmpModel "github.com/SigNoz/signoz/pkg/query-service/app/opamp/model"
-	"github.com/SigNoz/signoz/pkg/query-service/app/preferences"
 	"github.com/SigNoz/signoz/pkg/query-service/cache"
 	baseconst "github.com/SigNoz/signoz/pkg/query-service/constants"
 	"github.com/SigNoz/signoz/pkg/query-service/healthcheck"
@@ -116,10 +115,6 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
 		return nil, err
 	}
-	if err := preferences.InitDB(serverOptions.SigNoz.SQLStore.SQLxDB()); err != nil {
-		return nil, err
-	}
 	if err := dashboards.InitDB(serverOptions.SigNoz.SQLStore); err != nil {
 		return nil, err
 	}
@@ -146,7 +141,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
 	var reader interfaces.DataConnector
 	qb := db.NewDataConnector(
-		serverOptions.SigNoz.SQLStore.SQLxDB(),
+		serverOptions.SigNoz.SQLStore,
 		serverOptions.SigNoz.TelemetryStore.ClickHouseDB(),
 		serverOptions.PromConfigPath,
 		lm,
@@ -196,10 +191,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
 	}
 	// initiate opamp
-	_, err = opAmpModel.InitDB(serverOptions.SigNoz.SQLStore.SQLxDB())
-	if err != nil {
-		return nil, err
-	}
+	opAmpModel.InitDB(serverOptions.SigNoz.SQLStore)
 	integrationsController, err := integrations.NewController(serverOptions.SigNoz.SQLStore)
 	if err != nil {
@@ -225,7 +217,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
 	// initiate agent config handler
 	agentConfMgr, err := agentConf.Initiate(&agentConf.ManagerOptions{
-		DB:            serverOptions.SigNoz.SQLStore.SQLxDB(),
+		Store:         serverOptions.SigNoz.SQLStore,
 		AgentFeatures: []agentConf.AgentFeature{logParsingPipelineController},
 	})
 	if err != nil {

View File

@@ -2,11 +2,29 @@ package postgressqlstore
 import (
 	"context"
+	"fmt"
 	"reflect"
+	"github.com/SigNoz/signoz/pkg/errors"
 	"github.com/uptrace/bun"
 )
+var (
+	Identity = "id"
+	Integer  = "bigint"
+	Text     = "text"
+)
+var (
+	Org  = "org"
+	User = "user"
+)
+var (
+	OrgReference  = `("org_id") REFERENCES "organizations" ("id")`
+	UserReference = `("user_id") REFERENCES "users" ("id") ON DELETE CASCADE ON UPDATE CASCADE`
+)
 type dialect struct {
 }
@@ -103,7 +121,7 @@ func (dialect *dialect) GetColumnType(ctx context.Context, bun bun.IDB, table st
 	err := bun.NewSelect().
 		ColumnExpr("data_type").
-		TableExpr("information_schema.columns").
+		TableExpr("").
 		Where("table_name = ?", table).
 		Where("column_name = ?", column).
 		Scan(ctx, &columnType)
@@ -130,6 +148,22 @@ func (dialect *dialect) ColumnExists(ctx context.Context, bun bun.IDB, table str
 	return count > 0, nil
 }
+func (dialect *dialect) IndexExists(ctx context.Context, bun bun.IDB, table string, index string) (bool, error) {
+	var count int
+	err := bun.NewSelect().
+		ColumnExpr("COUNT(*)").
+		TableExpr("pg_indexes").
+		Where("tablename = ?", table).
+		Where("indexname = ?", index).
+		Scan(ctx, &count)
+	if err != nil {
+		return false, err
+	}
+	return count > 0, nil
+}
 func (dialect *dialect) RenameColumn(ctx context.Context, bun bun.IDB, table string, oldColumnName string, newColumnName string) (bool, error) {
 	oldColumnExists, err := dialect.ColumnExists(ctx, bun, table, oldColumnName)
 	if err != nil {
@@ -174,7 +208,10 @@ func (dialect *dialect) TableExists(ctx context.Context, bun bun.IDB, table inte
 	return true, nil
 }
-func (dialect *dialect) RenameTableAndModifyModel(ctx context.Context, bun bun.IDB, oldModel interface{}, newModel interface{}, cb func(context.Context) error) error {
+func (dialect *dialect) RenameTableAndModifyModel(ctx context.Context, bun bun.IDB, oldModel interface{}, newModel interface{}, reference string, cb func(context.Context) error) error {
+	if reference == "" {
+		return errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "cannot run migration without reference")
+	}
 	exists, err := dialect.TableExists(ctx, bun, newModel)
 	if err != nil {
 		return err
@@ -183,10 +220,18 @@ func (dialect *dialect) RenameTableAndModifyModel(ctx context.Context, bun bun.I
 		return nil
 	}
+	fkReference := ""
+	if reference == Org {
+		fkReference = OrgReference
+	} else if reference == User {
+		fkReference = UserReference
+	}
 	_, err = bun.
 		NewCreateTable().
 		IfNotExists().
 		Model(newModel).
+		ForeignKey(fkReference).
 		Exec(ctx)
 	if err != nil {
@@ -209,3 +254,115 @@ func (dialect *dialect) RenameTableAndModifyModel(ctx context.Context, bun bun.I
 	return nil
 }
func (dialect *dialect) UpdatePrimaryKey(ctx context.Context, bun bun.IDB, oldModel interface{}, newModel interface{}, reference string, cb func(context.Context) error) error {
if reference == "" {
return errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "cannot run migration without reference")
}
oldTableName := bun.Dialect().Tables().Get(reflect.TypeOf(oldModel)).Name
newTableName := bun.Dialect().Tables().Get(reflect.TypeOf(newModel)).Name
columnType, err := dialect.GetColumnType(ctx, bun, oldTableName, Identity)
if err != nil {
return err
}
if columnType == Text {
return nil
}
fkReference := ""
if reference == Org {
fkReference = OrgReference
} else if reference == User {
fkReference = UserReference
}
_, err = bun.
NewCreateTable().
IfNotExists().
Model(newModel).
ForeignKey(fkReference).
Exec(ctx)
if err != nil {
return err
}
err = cb(ctx)
if err != nil {
return err
}
_, err = bun.
NewDropTable().
IfExists().
Model(oldModel).
Exec(ctx)
if err != nil {
return err
}
_, err = bun.
ExecContext(ctx, fmt.Sprintf("ALTER TABLE %s RENAME TO %s", newTableName, oldTableName))
if err != nil {
return err
}
return nil
}
func (dialect *dialect) AddPrimaryKey(ctx context.Context, bun bun.IDB, oldModel interface{}, newModel interface{}, reference string, cb func(context.Context) error) error {
if reference == "" {
return errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "cannot run migration without reference")
}
oldTableName := bun.Dialect().Tables().Get(reflect.TypeOf(oldModel)).Name
newTableName := bun.Dialect().Tables().Get(reflect.TypeOf(newModel)).Name
identityExists, err := dialect.ColumnExists(ctx, bun, oldTableName, Identity)
if err != nil {
return err
}
if identityExists {
return nil
}
fkReference := ""
if reference == Org {
fkReference = OrgReference
} else if reference == User {
fkReference = UserReference
}
_, err = bun.
NewCreateTable().
IfNotExists().
Model(newModel).
ForeignKey(fkReference).
Exec(ctx)
if err != nil {
return err
}
err = cb(ctx)
if err != nil {
return err
}
_, err = bun.
NewDropTable().
IfExists().
Model(oldModel).
Exec(ctx)
if err != nil {
return err
}
_, err = bun.
ExecContext(ctx, fmt.Sprintf("ALTER TABLE %s RENAME TO %s", newTableName, oldTableName))
if err != nil {
return err
}
return nil
}
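The new reference parameter is what wires the per-tenant foreign keys into the recreated tables. A minimal sketch of a caller, assuming a hypothetical migration with access to a bun.IDB and this dialect; the package, model names, and copy statement are illustrative, not taken from this diff:

package sqlmigration

import (
	"context"

	"github.com/uptrace/bun"
)

// tableMigrator captures just the dialect method used below; the real dialect
// in the postgres sqlstore above satisfies it.
type tableMigrator interface {
	RenameTableAndModifyModel(ctx context.Context, db bun.IDB, oldModel interface{}, newModel interface{}, reference string, cb func(context.Context) error) error
}

// Illustrative old/new bun models; a real migration defines its own.
type existingApdexSettings struct {
	bun.BaseModel `bun:"table:apdex_settings"`
}

type newApdexSettings struct {
	bun.BaseModel `bun:"table:apdex_setting"`
}

// migrateApdex sketches a caller: passing "org" selects OrgReference, so the
// recreated table carries the ("org_id") REFERENCES "organizations" ("id")
// foreign key before rows are copied and the old table is swapped out.
func migrateApdex(ctx context.Context, db bun.IDB, dialect tableMigrator) error {
	return dialect.RenameTableAndModifyModel(ctx, db, (*existingApdexSettings)(nil), (*newApdexSettings)(nil), "org",
		func(ctx context.Context) error {
			// copy rows from the old table into the new one (illustrative SQL)
			_, err := db.ExecContext(ctx, "INSERT INTO apdex_setting SELECT * FROM apdex_settings")
			return err
		})
}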

View File

@@ -6,6 +6,7 @@ import (
 	"github.com/SigNoz/signoz/pkg/errors"
 	"github.com/SigNoz/signoz/pkg/factory"
 	"github.com/SigNoz/signoz/pkg/types/alertmanagertypes"
+	"github.com/SigNoz/signoz/pkg/valuer"
 )
 var (
@@ -33,16 +34,16 @@ type Alertmanager interface {
 	ListAllChannels(context.Context) ([]*alertmanagertypes.Channel, error)
 	// GetChannelByID gets a channel for the organization.
-	GetChannelByID(context.Context, string, int) (*alertmanagertypes.Channel, error)
+	GetChannelByID(context.Context, string, valuer.UUID) (*alertmanagertypes.Channel, error)
 	// UpdateChannel updates a channel for the organization.
-	UpdateChannelByReceiverAndID(context.Context, string, alertmanagertypes.Receiver, int) error
+	UpdateChannelByReceiverAndID(context.Context, string, alertmanagertypes.Receiver, valuer.UUID) error
 	// CreateChannel creates a channel for the organization.
 	CreateChannel(context.Context, string, alertmanagertypes.Receiver) error
 	// DeleteChannelByID deletes a channel for the organization.
-	DeleteChannelByID(context.Context, string, int) error
+	DeleteChannelByID(context.Context, string, valuer.UUID) error
 	// SetConfig sets the config for the organization.
 	SetConfig(context.Context, *alertmanagertypes.Config) error
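Channel identifiers change here from auto-increment integers to uuid-v7 values (valuer.UUID). A small illustrative caller under the new contract; the function itself is a sketch, not code from this PR:

package example

import (
	"context"

	"github.com/SigNoz/signoz/pkg/alertmanager"
	"github.com/SigNoz/signoz/pkg/valuer"
)

// deleteChannel sketches the new calling convention: a channel id arrives as a
// string (for example from a URL path) and must parse as a uuid-v7 before the
// interface can be called.
func deleteChannel(ctx context.Context, am alertmanager.Alertmanager, orgID string, idString string) error {
	id, err := valuer.NewUUID(idString)
	if err != nil {
		return err // not a valid uuid-v7
	}
	return am.DeleteChannelByID(ctx, orgID, id)
}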

View File

@@ -8,6 +8,7 @@ import (
 	"github.com/SigNoz/signoz/pkg/errors"
 	"github.com/SigNoz/signoz/pkg/sqlstore"
 	"github.com/SigNoz/signoz/pkg/types/alertmanagertypes"
+	"github.com/SigNoz/signoz/pkg/valuer"
 	"github.com/tidwall/gjson"
 	"github.com/uptrace/bun"
 )
@@ -99,7 +100,7 @@ func (store *config) CreateChannel(ctx context.Context, channel *alertmanagertyp
 	}, opts...)
 }
-func (store *config) GetChannelByID(ctx context.Context, orgID string, id int) (*alertmanagertypes.Channel, error) {
+func (store *config) GetChannelByID(ctx context.Context, orgID string, id valuer.UUID) (*alertmanagertypes.Channel, error) {
 	channel := new(alertmanagertypes.Channel)
 	err := store.
@@ -108,11 +109,11 @@ func (store *config) GetChannelByID(ctx context.Context, orgID string, id int) (
 		NewSelect().
 		Model(channel).
 		Where("org_id = ?", orgID).
-		Where("id = ?", id).
+		Where("id = ?", id.StringValue()).
 		Scan(ctx)
 	if err != nil {
 		if err == sql.ErrNoRows {
-			return nil, errors.Newf(errors.TypeNotFound, alertmanagertypes.ErrCodeAlertmanagerChannelNotFound, "cannot find channel with id %d", id)
+			return nil, errors.Newf(errors.TypeNotFound, alertmanagertypes.ErrCodeAlertmanagerChannelNotFound, "cannot find channel with id %s", id.StringValue())
 		}
 		return nil, err
 	}
@@ -136,7 +137,7 @@ func (store *config) UpdateChannel(ctx context.Context, orgID string, channel *a
 	}, opts...)
 }
-func (store *config) DeleteChannelByID(ctx context.Context, orgID string, id int, opts ...alertmanagertypes.StoreOption) error {
+func (store *config) DeleteChannelByID(ctx context.Context, orgID string, id valuer.UUID, opts ...alertmanagertypes.StoreOption) error {
 	return store.wrap(ctx, func(ctx context.Context) error {
 		channel := new(alertmanagertypes.Channel)
@@ -146,7 +147,7 @@ func (store *config) DeleteChannelByID(ctx context.Context, orgID string, id int
 			NewDelete().
 			Model(channel).
 			Where("org_id = ?", orgID).
-			Where("id = ?", id).
+			Where("id = ?", id.StringValue()).
 			Exec(ctx); err != nil {
 			return err
 		}

View File

@@ -4,13 +4,13 @@ import (
 	"context"
 	"io"
 	"net/http"
-	"strconv"
 	"time"
 	"github.com/SigNoz/signoz/pkg/errors"
 	"github.com/SigNoz/signoz/pkg/http/render"
 	"github.com/SigNoz/signoz/pkg/types/alertmanagertypes"
 	"github.com/SigNoz/signoz/pkg/types/authtypes"
+	"github.com/SigNoz/signoz/pkg/valuer"
 	"github.com/gorilla/mux"
 )
@@ -140,9 +140,9 @@ func (api *API) GetChannelByID(rw http.ResponseWriter, req *http.Request) {
 		return
 	}
-	id, err := strconv.Atoi(idString)
+	id, err := valuer.NewUUID(idString)
 	if err != nil {
-		render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "id is not a valid integer"))
+		render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "id is not a valid uuid-v7"))
 		return
 	}
@@ -177,9 +177,9 @@ func (api *API) UpdateChannelByID(rw http.ResponseWriter, req *http.Request) {
 		return
 	}
-	id, err := strconv.Atoi(idString)
+	id, err := valuer.NewUUID(idString)
 	if err != nil {
-		render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "id is not a valid integer"))
+		render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "id is not a valid uuid-v7"))
 		return
 	}
@@ -227,9 +227,9 @@ func (api *API) DeleteChannelByID(rw http.ResponseWriter, req *http.Request) {
 		return
 	}
-	id, err := strconv.Atoi(idString)
+	id, err := valuer.NewUUID(idString)
 	if err != nil {
-		render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "id is not a valid integer"))
+		render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "id is not a valid uuid-v7"))
 		return
 	}

View File

@@ -16,6 +16,7 @@ import (
 	"github.com/SigNoz/signoz/pkg/factory"
 	"github.com/SigNoz/signoz/pkg/sqlstore"
 	"github.com/SigNoz/signoz/pkg/types/alertmanagertypes"
+	"github.com/SigNoz/signoz/pkg/valuer"
 	"github.com/tidwall/gjson"
 )
@@ -269,11 +270,11 @@ func (provider *provider) ListAllChannels(ctx context.Context) ([]*alertmanagert
 	return channels, nil
 }
-func (provider *provider) GetChannelByID(ctx context.Context, orgID string, channelID int) (*alertmanagertypes.Channel, error) {
+func (provider *provider) GetChannelByID(ctx context.Context, orgID string, channelID valuer.UUID) (*alertmanagertypes.Channel, error) {
 	return provider.configStore.GetChannelByID(ctx, orgID, channelID)
 }
-func (provider *provider) UpdateChannelByReceiverAndID(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver, id int) error {
+func (provider *provider) UpdateChannelByReceiverAndID(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver, id valuer.UUID) error {
 	channel, err := provider.configStore.GetChannelByID(ctx, orgID, id)
 	if err != nil {
 		return err
@@ -378,7 +379,7 @@ func (provider *provider) CreateChannel(ctx context.Context, orgID string, recei
 	}))
 }
-func (provider *provider) DeleteChannelByID(ctx context.Context, orgID string, channelID int) error {
+func (provider *provider) DeleteChannelByID(ctx context.Context, orgID string, channelID valuer.UUID) error {
 	channel, err := provider.configStore.GetChannelByID(ctx, orgID, channelID)
 	if err != nil {
 		return err

View File

@@ -10,6 +10,7 @@ import (
 	"github.com/SigNoz/signoz/pkg/factory"
 	"github.com/SigNoz/signoz/pkg/sqlstore"
 	"github.com/SigNoz/signoz/pkg/types/alertmanagertypes"
+	"github.com/SigNoz/signoz/pkg/valuer"
 )
 type provider struct {
@@ -99,11 +100,11 @@ func (provider *provider) ListAllChannels(ctx context.Context) ([]*alertmanagert
 	return nil, errors.Newf(errors.TypeUnsupported, errors.CodeUnsupported, "not supported by provider signoz")
 }
-func (provider *provider) GetChannelByID(ctx context.Context, orgID string, channelID int) (*alertmanagertypes.Channel, error) {
+func (provider *provider) GetChannelByID(ctx context.Context, orgID string, channelID valuer.UUID) (*alertmanagertypes.Channel, error) {
 	return provider.configStore.GetChannelByID(ctx, orgID, channelID)
 }
-func (provider *provider) UpdateChannelByReceiverAndID(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver, id int) error {
+func (provider *provider) UpdateChannelByReceiverAndID(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver, id valuer.UUID) error {
 	channel, err := provider.configStore.GetChannelByID(ctx, orgID, id)
 	if err != nil {
 		return err
@@ -127,7 +128,7 @@ func (provider *provider) UpdateChannelByReceiverAndID(ctx context.Context, orgI
 	}))
 }
-func (provider *provider) DeleteChannelByID(ctx context.Context, orgID string, channelID int) error {
+func (provider *provider) DeleteChannelByID(ctx context.Context, orgID string, channelID valuer.UUID) error {
 	channel, err := provider.configStore.GetChannelByID(ctx, orgID, channelID)
 	if err != nil {
 		return err

View File

@@ -0,0 +1,149 @@
package preference
import (
"encoding/json"
"net/http"
errorsV2 "github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/http/render"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/types/preferencetypes"
"github.com/gorilla/mux"
)
type API interface {
GetOrgPreference(http.ResponseWriter, *http.Request)
UpdateOrgPreference(http.ResponseWriter, *http.Request)
GetAllOrgPreferences(http.ResponseWriter, *http.Request)
GetUserPreference(http.ResponseWriter, *http.Request)
UpdateUserPreference(http.ResponseWriter, *http.Request)
GetAllUserPreferences(http.ResponseWriter, *http.Request)
}
type preferenceAPI struct {
usecase Usecase
}
func NewAPI(usecase Usecase) API {
return &preferenceAPI{usecase: usecase}
}
func (p *preferenceAPI) GetOrgPreference(rw http.ResponseWriter, r *http.Request) {
preferenceId := mux.Vars(r)["preferenceId"]
claims, ok := authtypes.ClaimsFromContext(r.Context())
if !ok {
render.Error(rw, errorsV2.Newf(errorsV2.TypeUnauthenticated, errorsV2.CodeUnauthenticated, "unauthenticated"))
return
}
preference, err := p.usecase.GetOrgPreference(
r.Context(), preferenceId, claims.OrgID,
)
if err != nil {
render.Error(rw, err)
return
}
render.Success(rw, http.StatusOK, preference)
}
func (p *preferenceAPI) UpdateOrgPreference(rw http.ResponseWriter, r *http.Request) {
preferenceId := mux.Vars(r)["preferenceId"]
req := preferencetypes.UpdatablePreference{}
claims, ok := authtypes.ClaimsFromContext(r.Context())
if !ok {
render.Error(rw, errorsV2.Newf(errorsV2.TypeUnauthenticated, errorsV2.CodeUnauthenticated, "unauthenticated"))
return
}
err := json.NewDecoder(r.Body).Decode(&req)
if err != nil {
render.Error(rw, err)
return
}
err = p.usecase.UpdateOrgPreference(r.Context(), preferenceId, req.PreferenceValue, claims.OrgID)
if err != nil {
render.Error(rw, err)
return
}
render.Success(rw, http.StatusNoContent, nil)
}
func (p *preferenceAPI) GetAllOrgPreferences(rw http.ResponseWriter, r *http.Request) {
claims, ok := authtypes.ClaimsFromContext(r.Context())
if !ok {
render.Error(rw, errorsV2.Newf(errorsV2.TypeUnauthenticated, errorsV2.CodeUnauthenticated, "unauthenticated"))
return
}
preferences, err := p.usecase.GetAllOrgPreferences(
r.Context(), claims.OrgID,
)
if err != nil {
render.Error(rw, err)
return
}
render.Success(rw, http.StatusOK, preferences)
}
func (p *preferenceAPI) GetUserPreference(rw http.ResponseWriter, r *http.Request) {
preferenceId := mux.Vars(r)["preferenceId"]
claims, ok := authtypes.ClaimsFromContext(r.Context())
if !ok {
render.Error(rw, errorsV2.Newf(errorsV2.TypeUnauthenticated, errorsV2.CodeUnauthenticated, "unauthenticated"))
return
}
preference, err := p.usecase.GetUserPreference(
r.Context(), preferenceId, claims.OrgID, claims.UserID,
)
if err != nil {
render.Error(rw, err)
return
}
render.Success(rw, http.StatusOK, preference)
}
func (p *preferenceAPI) UpdateUserPreference(rw http.ResponseWriter, r *http.Request) {
preferenceId := mux.Vars(r)["preferenceId"]
claims, ok := authtypes.ClaimsFromContext(r.Context())
if !ok {
render.Error(rw, errorsV2.Newf(errorsV2.TypeUnauthenticated, errorsV2.CodeUnauthenticated, "unauthenticated"))
return
}
req := preferencetypes.UpdatablePreference{}
err := json.NewDecoder(r.Body).Decode(&req)
if err != nil {
render.Error(rw, err)
return
}
err = p.usecase.UpdateUserPreference(r.Context(), preferenceId, req.PreferenceValue, claims.UserID)
if err != nil {
render.Error(rw, err)
return
}
render.Success(rw, http.StatusNoContent, nil)
}
func (p *preferenceAPI) GetAllUserPreferences(rw http.ResponseWriter, r *http.Request) {
claims, ok := authtypes.ClaimsFromContext(r.Context())
if !ok {
render.Error(rw, errorsV2.Newf(errorsV2.TypeUnauthenticated, errorsV2.CodeUnauthenticated, "unauthenticated"))
return
}
preferences, err := p.usecase.GetAllUserPreferences(
r.Context(), claims.OrgID, claims.UserID,
)
if err != nil {
render.Error(rw, err)
return
}
render.Success(rw, http.StatusOK, preferences)
}
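A hypothetical sketch of how these handlers could be mounted on a gorilla/mux router. The URL paths and HTTP methods here are assumptions for illustration (the actual SigNoz routes may differ), but the {preferenceId} path variable matches what the handlers read via mux.Vars:

package example

import (
	"net/http"

	"github.com/SigNoz/signoz/pkg/modules/preference"
	"github.com/gorilla/mux"
)

// registerPreferenceRoutes wires the preference API onto a router. The
// handlers expect auth claims to already be present in the request context,
// so an authentication middleware is assumed to run before them.
func registerPreferenceRoutes(router *mux.Router, api preference.API) {
	router.HandleFunc("/api/v1/org/preferences", api.GetAllOrgPreferences).Methods(http.MethodGet)
	router.HandleFunc("/api/v1/org/preferences/{preferenceId}", api.GetOrgPreference).Methods(http.MethodGet)
	router.HandleFunc("/api/v1/org/preferences/{preferenceId}", api.UpdateOrgPreference).Methods(http.MethodPut)
	router.HandleFunc("/api/v1/user/preferences", api.GetAllUserPreferences).Methods(http.MethodGet)
	router.HandleFunc("/api/v1/user/preferences/{preferenceId}", api.GetUserPreference).Methods(http.MethodGet)
	router.HandleFunc("/api/v1/user/preferences/{preferenceId}", api.UpdateUserPreference).Methods(http.MethodPut)
}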

View File

@@ -0,0 +1,278 @@
package core
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/modules/preference"
"github.com/SigNoz/signoz/pkg/types/preferencetypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
type usecase struct {
store preferencetypes.PreferenceStore
defaultMap map[string]preferencetypes.Preference
}
func NewPreference(store preferencetypes.PreferenceStore, defaultMap map[string]preferencetypes.Preference) preference.Usecase {
return &usecase{store: store, defaultMap: defaultMap}
}
func (usecase *usecase) GetOrgPreference(ctx context.Context, preferenceID string, orgID string) (*preferencetypes.GettablePreference, error) {
preference, seen := usecase.defaultMap[preferenceID]
if !seen {
return nil, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, fmt.Sprintf("no such preferenceID exists: %s", preferenceID))
}
isPreferenceEnabled := preference.IsEnabledForScope(preferencetypes.OrgAllowedScope)
if !isPreferenceEnabled {
return nil, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, fmt.Sprintf("preference is not enabled at org scope: %s", preferenceID))
}
orgPreference, err := usecase.store.GetOrgPreference(ctx, orgID, preferenceID)
if err != nil {
if err == sql.ErrNoRows {
return &preferencetypes.GettablePreference{
PreferenceID: preferenceID,
PreferenceValue: preference.DefaultValue,
}, nil
}
return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, fmt.Sprintf("error in fetching the org preference: %s", preferenceID))
}
return &preferencetypes.GettablePreference{
PreferenceID: preferenceID,
PreferenceValue: preference.SanitizeValue(orgPreference.PreferenceValue),
}, nil
}
func (usecase *usecase) UpdateOrgPreference(ctx context.Context, preferenceID string, preferenceValue interface{}, orgId string) error {
preference, seen := usecase.defaultMap[preferenceID]
if !seen {
return errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, fmt.Sprintf("no such preferenceID exists: %s", preferenceID))
}
isPreferenceEnabled := preference.IsEnabledForScope(preferencetypes.OrgAllowedScope)
if !isPreferenceEnabled {
return errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, fmt.Sprintf("preference is not enabled at org scope: %s", preferenceID))
}
err := preference.IsValidValue(preferenceValue)
if err != nil {
return err
}
storablePreferenceValue, encodeErr := json.Marshal(preferenceValue)
if encodeErr != nil {
return errors.Wrapf(encodeErr, errors.TypeInvalidInput, errors.CodeInvalidInput, "error in encoding the preference value")
}
orgPreference, dberr := usecase.store.GetOrgPreference(ctx, orgId, preferenceID)
if dberr != nil && dberr != sql.ErrNoRows {
return errors.Wrapf(dberr, errors.TypeInternal, errors.CodeInternal, "error in getting the preference value")
}
if dberr != nil {
orgPreference.ID = valuer.GenerateUUID()
orgPreference.PreferenceID = preferenceID
orgPreference.PreferenceValue = string(storablePreferenceValue)
orgPreference.OrgID = orgId
} else {
orgPreference.PreferenceValue = string(storablePreferenceValue)
}
dberr = usecase.store.UpsertOrgPreference(ctx, orgPreference)
if dberr != nil {
return errors.Wrapf(dberr, errors.TypeInternal, errors.CodeInternal, "error in setting the preference value")
}
return nil
}
func (usecase *usecase) GetAllOrgPreferences(ctx context.Context, orgID string) ([]*preferencetypes.PreferenceWithValue, error) {
allOrgPreferences := []*preferencetypes.PreferenceWithValue{}
orgPreferences, err := usecase.store.GetAllOrgPreferences(ctx, orgID)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "error in setting all org preference values")
}
preferenceValueMap := map[string]interface{}{}
for _, preferenceValue := range orgPreferences {
preferenceValueMap[preferenceValue.PreferenceID] = preferenceValue.PreferenceValue
}
for _, preference := range usecase.defaultMap {
isEnabledForOrgScope := preference.IsEnabledForScope(preferencetypes.OrgAllowedScope)
if isEnabledForOrgScope {
preferenceWithValue := &preferencetypes.PreferenceWithValue{}
preferenceWithValue.Key = preference.Key
preferenceWithValue.Name = preference.Name
preferenceWithValue.Description = preference.Description
preferenceWithValue.AllowedScopes = preference.AllowedScopes
preferenceWithValue.AllowedValues = preference.AllowedValues
preferenceWithValue.DefaultValue = preference.DefaultValue
preferenceWithValue.Range = preference.Range
preferenceWithValue.ValueType = preference.ValueType
preferenceWithValue.IsDiscreteValues = preference.IsDiscreteValues
value, seen := preferenceValueMap[preference.Key]
if seen {
preferenceWithValue.Value = value
} else {
preferenceWithValue.Value = preference.DefaultValue
}
preferenceWithValue.Value = preference.SanitizeValue(preferenceWithValue.Value)
allOrgPreferences = append(allOrgPreferences, preferenceWithValue)
}
}
return allOrgPreferences, nil
}
func (usecase *usecase) GetUserPreference(ctx context.Context, preferenceID string, orgId string, userId string) (*preferencetypes.GettablePreference, error) {
preference, seen := usecase.defaultMap[preferenceID]
if !seen {
return nil, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, fmt.Sprintf("no such preferenceID exists: %s", preferenceID))
}
preferenceValue := preferencetypes.GettablePreference{
PreferenceID: preferenceID,
PreferenceValue: preference.DefaultValue,
}
isPreferenceEnabledAtUserScope := preference.IsEnabledForScope(preferencetypes.UserAllowedScope)
if !isPreferenceEnabledAtUserScope {
return nil, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, fmt.Sprintf("preference is not enabled at user scope: %s", preferenceID))
}
isPreferenceEnabledAtOrgScope := preference.IsEnabledForScope(preferencetypes.OrgAllowedScope)
if isPreferenceEnabledAtOrgScope {
orgPreference, err := usecase.store.GetOrgPreference(ctx, orgId, preferenceID)
if err != nil && err != sql.ErrNoRows {
return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, fmt.Sprintf("error in fetching the org preference: %s", preferenceID))
}
if err == nil {
preferenceValue.PreferenceValue = orgPreference.PreferenceValue
}
}
userPreference, err := usecase.store.GetUserPreference(ctx, userId, preferenceID)
if err != nil && err != sql.ErrNoRows {
return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, fmt.Sprintf("error in fetching the user preference: %s", preferenceID))
}
if err == nil {
preferenceValue.PreferenceValue = userPreference.PreferenceValue
}
return &preferencetypes.GettablePreference{
PreferenceID: preferenceValue.PreferenceID,
PreferenceValue: preference.SanitizeValue(preferenceValue.PreferenceValue),
}, nil
}
func (usecase *usecase) UpdateUserPreference(ctx context.Context, preferenceID string, preferenceValue interface{}, userId string) error {
preference, seen := usecase.defaultMap[preferenceID]
if !seen {
return errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, fmt.Sprintf("no such preferenceID exists: %s", preferenceID))
}
isPreferenceEnabledAtUserScope := preference.IsEnabledForScope(preferencetypes.UserAllowedScope)
if !isPreferenceEnabledAtUserScope {
return errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, fmt.Sprintf("preference is not enabled at user scope: %s", preferenceID))
}
err := preference.IsValidValue(preferenceValue)
if err != nil {
return err
}
storablePreferenceValue, encodeErr := json.Marshal(preferenceValue)
if encodeErr != nil {
return errors.Wrapf(encodeErr, errors.TypeInvalidInput, errors.CodeInvalidInput, "error in encoding the preference value")
}
userPreference, dberr := usecase.store.GetUserPreference(ctx, userId, preferenceID)
if dberr != nil && dberr != sql.ErrNoRows {
return errors.Wrapf(dberr, errors.TypeInternal, errors.CodeInternal, "error in getting the preference value")
}
if dberr != nil {
userPreference.ID = valuer.GenerateUUID()
userPreference.PreferenceID = preferenceID
userPreference.PreferenceValue = string(storablePreferenceValue)
userPreference.UserID = userId
} else {
userPreference.PreferenceValue = string(storablePreferenceValue)
}
dberr = usecase.store.UpsertUserPreference(ctx, userPreference)
if dberr != nil {
return errors.Wrapf(dberr, errors.TypeInternal, errors.CodeInternal, "error in setting the preference value")
}
return nil
}
func (usecase *usecase) GetAllUserPreferences(ctx context.Context, orgID string, userID string) ([]*preferencetypes.PreferenceWithValue, error) {
allUserPreferences := []*preferencetypes.PreferenceWithValue{}
orgPreferences, err := usecase.store.GetAllOrgPreferences(ctx, orgID)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "error in setting all org preference values")
}
preferenceOrgValueMap := map[string]interface{}{}
for _, preferenceValue := range orgPreferences {
preferenceOrgValueMap[preferenceValue.PreferenceID] = preferenceValue.PreferenceValue
}
userPreferences, err := usecase.store.GetAllUserPreferences(ctx, userID)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "error in setting all user preference values")
}
preferenceUserValueMap := map[string]interface{}{}
for _, preferenceValue := range userPreferences {
preferenceUserValueMap[preferenceValue.PreferenceID] = preferenceValue.PreferenceValue
}
for _, preference := range usecase.defaultMap {
isEnabledForUserScope := preference.IsEnabledForScope(preferencetypes.UserAllowedScope)
if isEnabledForUserScope {
preferenceWithValue := &preferencetypes.PreferenceWithValue{}
preferenceWithValue.Key = preference.Key
preferenceWithValue.Name = preference.Name
preferenceWithValue.Description = preference.Description
preferenceWithValue.AllowedScopes = preference.AllowedScopes
preferenceWithValue.AllowedValues = preference.AllowedValues
preferenceWithValue.DefaultValue = preference.DefaultValue
preferenceWithValue.Range = preference.Range
preferenceWithValue.ValueType = preference.ValueType
preferenceWithValue.IsDiscreteValues = preference.IsDiscreteValues
preferenceWithValue.Value = preference.DefaultValue
isEnabledForOrgScope := preference.IsEnabledForScope(preferencetypes.OrgAllowedScope)
if isEnabledForOrgScope {
value, seen := preferenceOrgValueMap[preference.Key]
if seen {
preferenceWithValue.Value = value
}
}
value, seen := preferenceUserValueMap[preference.Key]
if seen {
preferenceWithValue.Value = value
}
preferenceWithValue.Value = preference.SanitizeValue(preferenceWithValue.Value)
allUserPreferences = append(allUserPreferences, preferenceWithValue)
}
}
return allUserPreferences, nil
}

View File

@@ -0,0 +1,116 @@
package core
import (
"context"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types/preferencetypes"
)
type store struct {
store sqlstore.SQLStore
}
func NewStore(db sqlstore.SQLStore) preferencetypes.PreferenceStore {
return &store{store: db}
}
func (store *store) GetOrgPreference(ctx context.Context, orgID string, preferenceID string) (*preferencetypes.StorableOrgPreference, error) {
orgPreference := new(preferencetypes.StorableOrgPreference)
err := store.
store.
BunDB().
NewSelect().
Model(orgPreference).
Where("preference_id = ?", preferenceID).
Where("org_id = ?", orgID).
Scan(ctx)
if err != nil {
return orgPreference, err
}
return orgPreference, nil
}
func (store *store) GetAllOrgPreferences(ctx context.Context, orgID string) ([]*preferencetypes.StorableOrgPreference, error) {
orgPreferences := make([]*preferencetypes.StorableOrgPreference, 0)
err := store.
store.
BunDB().
NewSelect().
Model(&orgPreferences).
Where("org_id = ?", orgID).
Scan(ctx)
if err != nil {
return orgPreferences, err
}
return orgPreferences, nil
}
func (store *store) UpsertOrgPreference(ctx context.Context, orgPreference *preferencetypes.StorableOrgPreference) error {
_, err := store.
store.
BunDB().
NewInsert().
Model(orgPreference).
On("CONFLICT (id) DO UPDATE").
Exec(ctx)
if err != nil {
return err
}
return nil
}
func (store *store) GetUserPreference(ctx context.Context, userID string, preferenceID string) (*preferencetypes.StorableUserPreference, error) {
userPreference := new(preferencetypes.StorableUserPreference)
err := store.
store.
BunDB().
NewSelect().
Model(userPreference).
Where("preference_id = ?", preferenceID).
Where("user_id = ?", userID).
Scan(ctx)
if err != nil {
return userPreference, err
}
return userPreference, nil
}
func (store *store) GetAllUserPreferences(ctx context.Context, userID string) ([]*preferencetypes.StorableUserPreference, error) {
userPreferences := make([]*preferencetypes.StorableUserPreference, 0)
err := store.
store.
BunDB().
NewSelect().
Model(&userPreferences).
Where("user_id = ?", userID).
Scan(ctx)
if err != nil {
return userPreferences, err
}
return userPreferences, nil
}
func (store *store) UpsertUserPreference(ctx context.Context, userPreference *preferencetypes.StorableUserPreference) error {
_, err := store.
store.
BunDB().
NewInsert().
Model(userPreference).
On("CONFLICT (id) DO UPDATE").
Exec(ctx)
if err != nil {
return err
}
return nil
}
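The upsert uses the surrogate id as its conflict target, so callers first load the existing row and only mint a new UUID when none exists; the usecase above follows exactly this pattern. A hypothetical direct use of the store, where the preference id and JSON value are made up for illustration:

package example

import (
	"context"
	"database/sql"

	"github.com/SigNoz/signoz/pkg/types/preferencetypes"
	"github.com/SigNoz/signoz/pkg/valuer"
)

func upsertOrgPreference(ctx context.Context, store preferencetypes.PreferenceStore, orgID string) error {
	row, err := store.GetOrgPreference(ctx, orgID, "ORG_ONBOARDING") // illustrative preference id
	if err != nil && err != sql.ErrNoRows {
		return err
	}
	if err == sql.ErrNoRows {
		// no stored override yet: create a fresh row with a uuid-v7 id
		row.ID = valuer.GenerateUUID()
		row.PreferenceID = "ORG_ONBOARDING"
		row.OrgID = orgID
	}
	row.PreferenceValue = "true" // values are stored JSON-encoded by the usecase
	return store.UpsertOrgPreference(ctx, row)
}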

View File

@@ -0,0 +1,17 @@
package preference
import (
"context"
"github.com/SigNoz/signoz/pkg/types/preferencetypes"
)
type Usecase interface {
GetOrgPreference(ctx context.Context, preferenceId string, orgId string) (*preferencetypes.GettablePreference, error)
UpdateOrgPreference(ctx context.Context, preferenceId string, preferenceValue interface{}, orgId string) error
GetAllOrgPreferences(ctx context.Context, orgId string) ([]*preferencetypes.PreferenceWithValue, error)
GetUserPreference(ctx context.Context, preferenceId string, orgId string, userId string) (*preferencetypes.GettablePreference, error)
UpdateUserPreference(ctx context.Context, preferenceId string, preferenceValue interface{}, userId string) error
GetAllUserPreferences(ctx context.Context, orgId string, userId string) ([]*preferencetypes.PreferenceWithValue, error)
}
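Taken together, the module composes as store -> usecase -> HTTP API, which is how the EE API handler wires it earlier in this diff. A minimal sketch of that composition plus the read path; the preference id is illustrative:

package example

import (
	"context"

	"github.com/SigNoz/signoz/pkg/modules/preference"
	preferencecore "github.com/SigNoz/signoz/pkg/modules/preference/core"
	"github.com/SigNoz/signoz/pkg/sqlstore"
	"github.com/SigNoz/signoz/pkg/types/preferencetypes"
)

// newPreferenceModule mirrors the wiring used in ee/query-service/app/api:
// SQL store -> usecase -> HTTP API.
func newPreferenceModule(sqlStore sqlstore.SQLStore) (preference.Usecase, preference.API) {
	usecase := preferencecore.NewPreference(preferencecore.NewStore(sqlStore), preferencetypes.NewDefaultPreferenceMap())
	return usecase, preference.NewAPI(usecase)
}

// getOrgPreference shows the read path: if no override is stored for the org,
// the usecase returns the preference's default value rather than an error.
func getOrgPreference(ctx context.Context, usecase preference.Usecase, orgID string) (*preferencetypes.GettablePreference, error) {
	return usecase.GetOrgPreference(ctx, "ORG_ONBOARDING", orgID) // illustrative preference id
}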

View File

@@ -1,6 +1,9 @@
 package agentConf
-import "github.com/SigNoz/signoz/pkg/query-service/model"
+import (
+	"github.com/SigNoz/signoz/pkg/query-service/model"
+	"github.com/SigNoz/signoz/pkg/types"
+)
 // Interface for features implemented via agent config.
 // Eg: ingestion side signal pre-processing features like log processing pipelines etc
@@ -11,8 +14,9 @@ type AgentFeature interface {
 	// Recommend config for an agent based on its `currentConfYaml` and
 	// `configVersion` for the feature's settings
 	RecommendAgentConfig(
+		orgId string,
 		currentConfYaml []byte,
-		configVersion *ConfigVersion,
+		configVersion *types.AgentConfigVersion,
 	) (
 		recommendedConfYaml []byte,

View File

@@ -6,8 +6,9 @@ import (
 	"fmt"
 	"github.com/SigNoz/signoz/pkg/query-service/model"
-	"github.com/google/uuid"
-	"github.com/jmoiron/sqlx"
+	"github.com/SigNoz/signoz/pkg/sqlstore"
+	"github.com/SigNoz/signoz/pkg/types"
+	"github.com/SigNoz/signoz/pkg/valuer"
 	"github.com/pkg/errors"
 	"go.uber.org/zap"
 	"golang.org/x/exp/slices"
@@ -15,42 +16,33 @@ import (
 // Repo handles DDL and DML ops on ingestion rules
 type Repo struct {
-	db *sqlx.DB
+	store sqlstore.SQLStore
 }
 func (r *Repo) GetConfigHistory(
-	ctx context.Context, typ ElementTypeDef, limit int,
-) ([]ConfigVersion, *model.ApiError) {
-	var c []ConfigVersion
-	err := r.db.SelectContext(ctx, &c, fmt.Sprintf(`SELECT
-		version,
-		id,
-		element_type,
-		COALESCE(created_by, -1) as created_by,
-		created_at,
-		COALESCE((SELECT NAME FROM users
-			WHERE id = v.created_by), "unknown") created_by_name,
-		active,
-		is_valid,
-		disabled,
-		deploy_status,
-		deploy_result,
-		coalesce(last_hash, '') as last_hash,
-		coalesce(last_config, '{}') as last_config
-		FROM agent_config_versions AS v
-		WHERE element_type = $1
-		ORDER BY created_at desc, version desc
-		limit %v`, limit),
-		typ)
+	ctx context.Context, orgId string, typ types.ElementTypeDef, limit int,
+) ([]types.AgentConfigVersion, *model.ApiError) {
+	var c []types.AgentConfigVersion
+	err := r.store.BunDB().NewSelect().
+		Model(&c).
+		ColumnExpr("id, version, element_type, active, is_valid, disabled, deploy_status, deploy_result, created_at").
+		ColumnExpr("COALESCE(created_by, '') as created_by").
+		ColumnExpr(`COALESCE((SELECT NAME FROM users WHERE users.id = acv.created_by), 'unknown') as created_by_name`).
+		ColumnExpr("COALESCE(last_hash, '') as last_hash, COALESCE(last_config, '{}') as last_config").
+		Where("acv.element_type = ?", typ).
+		Where("acv.org_id = ?", orgId).
+		OrderExpr("acv.created_at DESC, acv.version DESC").
+		Limit(limit).
+		Scan(ctx)
 	if err != nil {
 		return nil, model.InternalError(err)
 	}
-	incompleteStatuses := []DeployStatus{DeployInitiated, Deploying}
+	incompleteStatuses := []types.DeployStatus{types.DeployInitiated, types.Deploying}
 	for idx := 1; idx < len(c); idx++ {
 		if slices.Contains(incompleteStatuses, c[idx].DeployStatus) {
-			c[idx].DeployStatus = DeployStatusUnknown
+			c[idx].DeployStatus = types.DeployStatusUnknown
 		}
 	}
@@ -58,32 +50,24 @@ func (r *Repo) GetConfigHistory(
 }
 func (r *Repo) GetConfigVersion(
-	ctx context.Context, typ ElementTypeDef, v int,
-) (*ConfigVersion, *model.ApiError) {
-	var c ConfigVersion
-	err := r.db.GetContext(ctx, &c, `SELECT
-		id,
-		version,
-		element_type,
-		COALESCE(created_by, -1) as created_by,
-		created_at,
-		COALESCE((SELECT NAME FROM users
-			WHERE id = v.created_by), "unknown") created_by_name,
-		active,
-		is_valid,
-		disabled,
-		deploy_status,
-		deploy_result,
-		coalesce(last_hash, '') as last_hash,
-		coalesce(last_config, '{}') as last_config
-		FROM agent_config_versions v
-		WHERE element_type = $1
-		AND version = $2`, typ, v)
-	if err == sql.ErrNoRows {
-		return nil, model.NotFoundError(err)
-	}
+	ctx context.Context, orgId string, typ types.ElementTypeDef, v int,
+) (*types.AgentConfigVersion, *model.ApiError) {
+	var c types.AgentConfigVersion
+	err := r.store.BunDB().NewSelect().
+		Model(&c).
+		ColumnExpr("id, version, element_type, active, is_valid, disabled, deploy_status, deploy_result, created_at").
+		ColumnExpr("COALESCE(created_by, '') as created_by").
+		ColumnExpr(`COALESCE((SELECT NAME FROM users WHERE users.id = acv.created_by), 'unknown') as created_by_name`).
+		ColumnExpr("COALESCE(last_hash, '') as last_hash, COALESCE(last_config, '{}') as last_config").
+		Where("acv.element_type = ?", typ).
+		Where("acv.version = ?", v).
+		Where("acv.org_id = ?", orgId).
+		Scan(ctx)
 	if err != nil {
+		if errors.Is(err, sql.ErrNoRows) {
+			return nil, model.NotFoundError(err)
+		}
 		return nil, model.InternalError(err)
 	}
@@ -91,33 +75,23 @@ func (r *Repo) GetConfigVersion(
 }
 func (r *Repo) GetLatestVersion(
-	ctx context.Context, typ ElementTypeDef,
-) (*ConfigVersion, *model.ApiError) {
-	var c ConfigVersion
-	err := r.db.GetContext(ctx, &c, `SELECT
-		id,
-		version,
-		element_type,
-		COALESCE(created_by, -1) as created_by,
-		created_at,
-		COALESCE((SELECT NAME FROM users
-			WHERE id = v.created_by), "unknown") created_by_name,
-		active,
-		is_valid,
-		disabled,
-		deploy_status,
-		deploy_result
-		FROM agent_config_versions AS v
-		WHERE element_type = $1
-		AND version = (
-			SELECT MAX(version)
-			FROM agent_config_versions
-			WHERE element_type=$2)`, typ, typ)
-	if err == sql.ErrNoRows {
-		return nil, model.NotFoundError(err)
-	}
+	ctx context.Context, orgId string, typ types.ElementTypeDef,
+) (*types.AgentConfigVersion, *model.ApiError) {
+	var c types.AgentConfigVersion
+	err := r.store.BunDB().NewSelect().
+		Model(&c).
+		ColumnExpr("id, version, element_type, active, is_valid, disabled, deploy_status, deploy_result, created_at").
+		ColumnExpr("COALESCE(created_by, '') as created_by").
+		ColumnExpr(`COALESCE((SELECT NAME FROM users WHERE users.id = acv.created_by), 'unknown') as created_by_name`).
+		Where("acv.element_type = ?", typ).
+		Where("acv.org_id = ?", orgId).
+		Where("version = (SELECT MAX(version) FROM agent_config_versions WHERE acv.element_type = ?)", typ).
+		Scan(ctx)
 	if err != nil {
+		if errors.Is(err, sql.ErrNoRows) {
+			return nil, model.NotFoundError(err)
+		}
 		return nil, model.InternalError(err)
 	}
@@ -125,7 +99,7 @@ func (r *Repo) GetLatestVersion(
 }
 func (r *Repo) insertConfig(
-	ctx context.Context, userId string, c *ConfigVersion, elements []string,
+	ctx context.Context, orgId string, userId string, c *types.AgentConfigVersion, elements []string,
 ) (fnerr *model.ApiError) {
 	if string(c.ElementType) == "" {
@@ -135,7 +109,7 @@ func (r *Repo) insertConfig(
 	}
 	// allowing empty elements for logs - use case is deleting all pipelines
-	if len(elements) == 0 && c.ElementType != ElementTypeLogPipelines {
+	if len(elements) == 0 && c.ElementType != types.ElementTypeLogPipelines {
 		zap.L().Error("insert config called with no elements ", zap.String("ElementType", string(c.ElementType)))
 		return model.BadRequest(fmt.Errorf("config must have atleast one element"))
 	}
@@ -150,14 +124,14 @@ func (r *Repo) insertConfig(
 		))
 	}
-	configVersion, err := r.GetLatestVersion(ctx, c.ElementType)
+	configVersion, err := r.GetLatestVersion(ctx, orgId, c.ElementType)
 	if err != nil && err.Type() != model.ErrorNotFound {
 		zap.L().Error("failed to fetch latest config version", zap.Error(err))
 		return model.InternalError(fmt.Errorf("failed to fetch latest config version"))
 	}
 	if configVersion != nil {
-		c.Version = updateVersion(configVersion.Version)
+		c.Version = types.UpdateVersion(configVersion.Version)
 	} else {
 		// first version
 		c.Version = 1
@@ -166,57 +140,43 @@ func (r *Repo) insertConfig(
 	defer func() {
 		if fnerr != nil {
 			// remove all the damage (invalid rows from db)
-			_, _ = r.db.Exec("DELETE FROM agent_config_versions WHERE id = $1", c.ID)
-			_, _ = r.db.Exec("DELETE FROM agent_config_elements WHERE version_id=$1", c.ID)
+			r.store.BunDB().NewDelete().Model((*types.AgentConfigVersion)(nil)).Where("id = ?", c.ID).Where("org_id = ?", orgId).Exec(ctx)
+			r.store.BunDB().NewDelete().Model((*types.AgentConfigElement)(nil)).Where("version_id = ?", c.ID).Where("org_id = ?", orgId).Exec(ctx)
 		}
 	}()
 	// insert config
-	configQuery := `INSERT INTO agent_config_versions(
-		id,
-		version,
-		created_by,
-		element_type,
-		active,
-		is_valid,
-		disabled,
-		deploy_status,
-		deploy_result)
-		VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)`
-	_, dbErr := r.db.ExecContext(ctx,
-		configQuery,
-		c.ID,
-		c.Version,
-		userId,
-		c.ElementType,
-		false,
-		false,
-		false,
-		c.DeployStatus,
-		c.DeployResult)
+	_, dbErr := r.store.BunDB().NewInsert().
+		Model(&types.AgentConfigVersion{
+			OrgID:        orgId,
+			Identifiable: types.Identifiable{ID: c.ID},
+			Version:      c.Version,
+			UserAuditable: types.UserAuditable{
+				CreatedBy: userId,
+			},
+			ElementType:  c.ElementType,
+			Active:       false, // default value
+			IsValid:      false, // default value
+			Disabled:     false, // default value
+			DeployStatus: c.DeployStatus,
+			DeployResult: c.DeployResult,
+		}).
+		Exec(ctx)
 	if dbErr != nil {
 		zap.L().Error("error in inserting config version: ", zap.Error(dbErr))
 		return model.InternalError(errors.Wrap(dbErr, "failed to insert ingestion rule"))
 	}
-	elementsQuery := `INSERT INTO agent_config_elements(
-		id,
-		version_id,
-		element_type,
-		element_id)
-		VALUES ($1, $2, $3, $4)`
 	for _, e := range elements {
-		_, dbErr = r.db.ExecContext(
-			ctx,
-			elementsQuery,
-			uuid.NewString(),
-			c.ID,
-			c.ElementType,
-			e,
-		)
+		agentConfigElement := &types.AgentConfigElement{
+			OrgID:        orgId,
+			Identifiable: types.Identifiable{ID: valuer.GenerateUUID()},
+			VersionID:    c.ID.StringValue(),
+			ElementType:  string(c.ElementType),
+			ElementID:    e,
+		}
+		_, dbErr = r.store.BunDB().NewInsert().Model(agentConfigElement).Exec(ctx)
 		if dbErr != nil {
 			return model.InternalError(dbErr)
 		}
@@ -226,40 +186,43 @@ func (r *Repo) insertConfig(
}
func (r *Repo) updateDeployStatus(ctx context.Context,
-  elementType ElementTypeDef,
+  orgId string,
+  elementType types.ElementTypeDef,
  version int,
  status string,
  result string,
  lastHash string,
  lastconf string) *model.ApiError {
-  updateQuery := `UPDATE agent_config_versions
-  set deploy_status = $1,
-  deploy_result = $2,
-  last_hash = COALESCE($3, last_hash),
-  last_config = $4
-  WHERE version=$5
-  AND element_type = $6`
-  _, err := r.db.ExecContext(ctx, updateQuery, status, result, lastHash, lastconf, version, string(elementType))
+  _, err := r.store.BunDB().NewUpdate().
+    Model((*types.AgentConfigVersion)(nil)).
+    Set("deploy_status = ?", status).
+    Set("deploy_result = ?", result).
+    Set("last_hash = COALESCE(?, last_hash)", lastHash).
+    Set("last_config = ?", lastconf).
+    Where("version = ?", version).
+    Where("element_type = ?", elementType).
+    Where("org_id = ?", orgId).
+    Exec(ctx)
  if err != nil {
    zap.L().Error("failed to update deploy status", zap.Error(err))
    return model.BadRequest(fmt.Errorf("failed to update deploy status"))
  }
  return nil
}
func (r *Repo) updateDeployStatusByHash(
-  ctx context.Context, confighash string, status string, result string,
+  ctx context.Context, orgId string, confighash string, status string, result string,
) *model.ApiError {
-  updateQuery := `UPDATE agent_config_versions
-  set deploy_status = $1,
-  deploy_result = $2
-  WHERE last_hash=$4`
-  _, err := r.db.ExecContext(ctx, updateQuery, status, result, confighash)
+  _, err := r.store.BunDB().NewUpdate().
+    Model((*types.AgentConfigVersion)(nil)).
+    Set("deploy_status = ?", status).
+    Set("deploy_result = ?", result).
+    Where("last_hash = ?", confighash).
+    Where("org_id = ?", orgId).
+    Exec(ctx)
  if err != nil {
    zap.L().Error("failed to update deploy status", zap.Error(err))
    return model.InternalError(errors.Wrap(err, "failed to update deploy status"))
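The hunks above all follow one migration pattern: hand-written sqlx SQL against the shared SQLite database is replaced by bun query builders obtained from sqlstore.SQLStore, and every statement now carries an org_id predicate so the same tables can serve multiple tenants. The following is a minimal, self-contained sketch of that pattern against an in-memory SQLite database; the AgentConfigVersion model here is a simplified stand-in for illustration, not the actual SigNoz type.
package main

import (
	"context"
	"database/sql"
	"fmt"

	"github.com/uptrace/bun"
	"github.com/uptrace/bun/dialect/sqlitedialect"
	_ "modernc.org/sqlite"
)

// AgentConfigVersion is a simplified stand-in model; the real SigNoz type
// embeds Identifiable/UserAuditable helpers from pkg/types.
type AgentConfigVersion struct {
	bun.BaseModel `bun:"table:agent_config_versions"`

	ID           string `bun:"id,pk"`
	OrgID        string `bun:"org_id"`
	Version      int    `bun:"version"`
	DeployStatus string `bun:"deploy_status"`
}

func main() {
	sqldb, err := sql.Open("sqlite", ":memory:")
	if err != nil {
		panic(err)
	}
	db := bun.NewDB(sqldb, sqlitedialect.New())
	ctx := context.Background()

	// Create the table and insert a row, mirroring the shape of insertConfig above.
	if _, err := db.NewCreateTable().Model((*AgentConfigVersion)(nil)).Exec(ctx); err != nil {
		panic(err)
	}
	cv := &AgentConfigVersion{ID: "v1", OrgID: "org-1", Version: 1, DeployStatus: "DIRTY"}
	if _, err := db.NewInsert().Model(cv).Exec(ctx); err != nil {
		panic(err)
	}

	// Update scoped to the org, the same shape as updateDeployStatus above.
	res, err := db.NewUpdate().
		Model((*AgentConfigVersion)(nil)).
		Set("deploy_status = ?", "IN_PROGRESS").
		Where("version = ?", 1).
		Where("org_id = ?", "org-1").
		Exec(ctx)
	if err != nil {
		panic(err)
	}
	n, _ := res.RowsAffected()
	fmt.Println("rows updated:", n)
}
The important property of the rewrite is that the org filter lives in the same builder chain as the rest of the statement, so it cannot be forgotten when the query is copied or extended.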

View File

@@ -12,8 +12,9 @@ import (
	filterprocessor "github.com/SigNoz/signoz/pkg/query-service/app/opamp/otelconfig/filterprocessor"
	tsp "github.com/SigNoz/signoz/pkg/query-service/app/opamp/otelconfig/tailsampler"
	"github.com/SigNoz/signoz/pkg/query-service/model"
+	"github.com/SigNoz/signoz/pkg/sqlstore"
+	"github.com/SigNoz/signoz/pkg/types"
	"github.com/google/uuid"
-	"github.com/jmoiron/sqlx"
	"github.com/pkg/errors"
	"go.uber.org/zap"
	yaml "gopkg.in/yaml.v3"
@@ -39,7 +40,7 @@ type Manager struct {
}
type ManagerOptions struct {
-	DB *sqlx.DB
+	Store sqlstore.SQLStore
	// When acting as opamp.AgentConfigProvider, agent conf recommendations are
	// applied to the base conf in the order the features have been specified here.
@@ -60,7 +61,7 @@ func Initiate(options *ManagerOptions) (*Manager, error) {
}
m = &Manager{
-	Repo:              Repo{options.DB},
+	Repo:              Repo{options.Store},
	agentFeatures:     options.AgentFeatures,
	configSubscribers: map[string]func(){},
}
@@ -90,7 +91,7 @@ func (m *Manager) notifyConfigUpdateSubscribers() {
}
// Implements opamp.AgentConfigProvider
-func (m *Manager) RecommendAgentConfig(currentConfYaml []byte) (
+func (m *Manager) RecommendAgentConfig(orgId string, currentConfYaml []byte) (
	recommendedConfYaml []byte,
	// Opaque id of the recommended config, used for reporting deployment status updates
	configId string,
@@ -100,13 +101,13 @@ func (m *Manager) RecommendAgentConfig(currentConfYaml []byte) (
	settingVersionsUsed := []string{}
	for _, feature := range m.agentFeatures {
-		featureType := ElementTypeDef(feature.AgentFeatureType())
-		latestConfig, apiErr := GetLatestVersion(context.Background(), featureType)
+		featureType := types.ElementTypeDef(feature.AgentFeatureType())
+		latestConfig, apiErr := GetLatestVersion(context.Background(), orgId, featureType)
		if apiErr != nil && apiErr.Type() != model.ErrorNotFound {
			return nil, "", errors.Wrap(apiErr.ToError(), "failed to get latest agent config version")
		}
-		updatedConf, serializedSettingsUsed, apiErr := feature.RecommendAgentConfig(recommendation, latestConfig)
+		updatedConf, serializedSettingsUsed, apiErr := feature.RecommendAgentConfig(orgId, recommendation, latestConfig)
		if apiErr != nil {
			return nil, "", errors.Wrap(apiErr.ToError(), fmt.Sprintf(
				"failed to generate agent config recommendation for %s", featureType,
@@ -129,9 +130,10 @@ func (m *Manager) RecommendAgentConfig(currentConfYaml []byte) (
		_ = m.updateDeployStatus(
			context.Background(),
+			orgId,
			featureType,
			configVersion,
-			string(DeployInitiated),
+			string(types.DeployInitiated),
			"Deployment has started",
			configId,
			serializedSettingsUsed,
@@ -154,52 +156,53 @@ func (m *Manager) RecommendAgentConfig(currentConfYaml []byte) (
// Implements opamp.AgentConfigProvider
func (m *Manager) ReportConfigDeploymentStatus(
+	orgId string,
	agentId string,
	configId string,
	err error,
) {
	featureConfigIds := strings.Split(configId, ",")
	for _, featureConfId := range featureConfigIds {
-		newStatus := string(Deployed)
+		newStatus := string(types.Deployed)
		message := "Deployment was successful"
		if err != nil {
-			newStatus = string(DeployFailed)
+			newStatus = string(types.DeployFailed)
			message = fmt.Sprintf("%s: %s", agentId, err.Error())
		}
		_ = m.updateDeployStatusByHash(
-			context.Background(), featureConfId, newStatus, message,
+			context.Background(), orgId, featureConfId, newStatus, message,
		)
	}
}
func GetLatestVersion(
-	ctx context.Context, elementType ElementTypeDef,
-) (*ConfigVersion, *model.ApiError) {
-	return m.GetLatestVersion(ctx, elementType)
+	ctx context.Context, orgId string, elementType types.ElementTypeDef,
+) (*types.AgentConfigVersion, *model.ApiError) {
+	return m.GetLatestVersion(ctx, orgId, elementType)
}
func GetConfigVersion(
-	ctx context.Context, elementType ElementTypeDef, version int,
-) (*ConfigVersion, *model.ApiError) {
-	return m.GetConfigVersion(ctx, elementType, version)
+	ctx context.Context, orgId string, elementType types.ElementTypeDef, version int,
+) (*types.AgentConfigVersion, *model.ApiError) {
+	return m.GetConfigVersion(ctx, orgId, elementType, version)
}
func GetConfigHistory(
-	ctx context.Context, typ ElementTypeDef, limit int,
-) ([]ConfigVersion, *model.ApiError) {
-	return m.GetConfigHistory(ctx, typ, limit)
+	ctx context.Context, orgId string, typ types.ElementTypeDef, limit int,
+) ([]types.AgentConfigVersion, *model.ApiError) {
+	return m.GetConfigHistory(ctx, orgId, typ, limit)
}
// StartNewVersion launches a new config version for given set of elements
func StartNewVersion(
-	ctx context.Context, userId string, eleType ElementTypeDef, elementIds []string,
-) (*ConfigVersion, *model.ApiError) {
+	ctx context.Context, orgId string, userId string, eleType types.ElementTypeDef, elementIds []string,
+) (*types.AgentConfigVersion, *model.ApiError) {
	// create a new version
-	cfg := NewConfigVersion(eleType)
+	cfg := types.NewAgentConfigVersion(orgId, eleType)
	// insert new config and elements into database
-	err := m.insertConfig(ctx, userId, cfg, elementIds)
+	err := m.insertConfig(ctx, orgId, userId, cfg, elementIds)
	if err != nil {
		return nil, err
	}
@@ -213,22 +216,22 @@ func NotifyConfigUpdate(ctx context.Context) {
	m.notifyConfigUpdateSubscribers()
}
-func Redeploy(ctx context.Context, typ ElementTypeDef, version int) *model.ApiError {
-	configVersion, err := GetConfigVersion(ctx, typ, version)
+func Redeploy(ctx context.Context, orgId string, typ types.ElementTypeDef, version int) *model.ApiError {
+	configVersion, err := GetConfigVersion(ctx, orgId, typ, version)
	if err != nil {
		zap.L().Error("failed to fetch config version during redeploy", zap.Error(err))
		return model.WrapApiError(err, "failed to fetch details of the config version")
	}
-	if configVersion == nil || (configVersion != nil && configVersion.LastConf == "") {
+	if configVersion == nil || (configVersion != nil && configVersion.LastConfig == "") {
		zap.L().Debug("config version has no conf yaml", zap.Any("configVersion", configVersion))
		return model.BadRequest(fmt.Errorf("the config version can not be redeployed"))
	}
	switch typ {
-	case ElementTypeSamplingRules:
+	case types.ElementTypeSamplingRules:
		var config *tsp.Config
-		if err := yaml.Unmarshal([]byte(configVersion.LastConf), &config); err != nil {
+		if err := yaml.Unmarshal([]byte(configVersion.LastConfig), &config); err != nil {
			zap.L().Debug("failed to read last conf correctly", zap.Error(err))
			return model.BadRequest(fmt.Errorf("failed to read the stored config correctly"))
		}
@@ -245,10 +248,10 @@ func Redeploy(ctx context.Context, typ ElementTypeDef, version int) *model.ApiEr
			return model.InternalError(fmt.Errorf("failed to deploy the config"))
		}
-		_ = m.updateDeployStatus(ctx, ElementTypeSamplingRules, version, string(DeployInitiated), "Deployment started", configHash, configVersion.LastConf)
-	case ElementTypeDropRules:
+		m.updateDeployStatus(ctx, orgId, types.ElementTypeSamplingRules, version, string(types.DeployInitiated), "Deployment started", configHash, configVersion.LastConfig)
+	case types.ElementTypeDropRules:
		var filterConfig *filterprocessor.Config
-		if err := yaml.Unmarshal([]byte(configVersion.LastConf), &filterConfig); err != nil {
+		if err := yaml.Unmarshal([]byte(configVersion.LastConfig), &filterConfig); err != nil {
			zap.L().Error("failed to read last conf correctly", zap.Error(err))
			return model.InternalError(fmt.Errorf("failed to read the stored config correctly"))
		}
@@ -263,14 +266,14 @@ func Redeploy(ctx context.Context, typ ElementTypeDef, version int) *model.ApiEr
			return err
		}
-		_ = m.updateDeployStatus(ctx, ElementTypeSamplingRules, version, string(DeployInitiated), "Deployment started", configHash, configVersion.LastConf)
+		m.updateDeployStatus(ctx, orgId, types.ElementTypeSamplingRules, version, string(types.DeployInitiated), "Deployment started", configHash, configVersion.LastConfig)
	}
	return nil
}
// UpsertFilterProcessor updates the agent config with new filter processor params
-func UpsertFilterProcessor(ctx context.Context, version int, config *filterprocessor.Config) error {
+func UpsertFilterProcessor(ctx context.Context, orgId string, version int, config *filterprocessor.Config) error {
	if !atomic.CompareAndSwapUint32(&m.lock, 0, 1) {
		return fmt.Errorf("agent updater is busy")
	}
@@ -294,7 +297,7 @@ func UpsertFilterProcessor(ctx context.Context, version int, config *filterproce
		zap.L().Warn("unexpected error while transforming processor config to yaml", zap.Error(yamlErr))
	}
-	_ = m.updateDeployStatus(ctx, ElementTypeDropRules, version, string(DeployInitiated), "Deployment started", configHash, string(processorConfYaml))
+	m.updateDeployStatus(ctx, orgId, types.ElementTypeDropRules, version, string(types.DeployInitiated), "Deployment started", configHash, string(processorConfYaml))
	return nil
}
@@ -303,9 +306,9 @@ func UpsertFilterProcessor(ctx context.Context, version int, config *filterproce
// successful deployment if no error is received.
// this method is currently expected to be called only once in the lifecycle
// but can be improved in future to accept continuous request status updates from opamp
-func (m *Manager) OnConfigUpdate(agentId string, hash string, err error) {
-	status := string(Deployed)
+func (m *Manager) OnConfigUpdate(orgId string, agentId string, hash string, err error) {
+	status := string(types.Deployed)
	message := "Deployment was successful"
@@ -314,15 +317,15 @@ func (m *Manager) OnConfigUpdate(agentId string, hash string, err error) {
	}()
	if err != nil {
-		status = string(DeployFailed)
+		status = string(types.DeployFailed)
		message = fmt.Sprintf("%s: %s", agentId, err.Error())
	}
-	_ = m.updateDeployStatusByHash(context.Background(), hash, status, message)
+	_ = m.updateDeployStatusByHash(context.Background(), orgId, hash, status, message)
}
// UpsertSamplingProcessor updates the agent config with new filter processor params
-func UpsertSamplingProcessor(ctx context.Context, version int, config *tsp.Config) error {
+func UpsertSamplingProcessor(ctx context.Context, orgId string, version int, config *tsp.Config) error {
	if !atomic.CompareAndSwapUint32(&m.lock, 0, 1) {
		return fmt.Errorf("agent updater is busy")
	}
@@ -345,6 +348,6 @@ func UpsertSamplingProcessor(ctx context.Context, version int, config *tsp.Confi
		zap.L().Warn("unexpected error while transforming processor config to yaml", zap.Error(yamlErr))
	}
-	_ = m.updateDeployStatus(ctx, ElementTypeSamplingRules, version, string(DeployInitiated), "Deployment started", configHash, string(processorConfYaml))
+	m.updateDeployStatus(ctx, orgId, types.ElementTypeSamplingRules, version, string(types.DeployInitiated), "Deployment started", configHash, string(processorConfYaml))
	return nil
}
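Because agentConf exposes package-level helpers that delegate to a singleton Manager, adding multi-tenancy means every one of those wrappers has to accept and forward an org identifier; a call that drops the org scope would silently read or write another tenant's configuration. A compressed sketch of that shape, with hypothetical names rather than the real Manager:
package main

import (
	"context"
	"fmt"
)

// manager stands in for the package-level singleton that agentConf keeps.
type manager struct {
	// latest config version keyed by orgId + elementType
	latest map[string]string
}

var m = &manager{latest: map[string]string{"org-1/sampling_rules": "v3"}}

func (mg *manager) GetLatestVersion(_ context.Context, orgId, elementType string) (string, error) {
	v, ok := mg.latest[orgId+"/"+elementType]
	if !ok {
		return "", fmt.Errorf("no config for org %q and type %q", orgId, elementType)
	}
	return v, nil
}

// GetLatestVersion mirrors the package-level wrappers in the diff: it only
// forwards to the singleton, so the org scope must appear in its signature too.
func GetLatestVersion(ctx context.Context, orgId, elementType string) (string, error) {
	return m.GetLatestVersion(ctx, orgId, elementType)
}

func main() {
	v, err := GetLatestVersion(context.Background(), "org-1", "sampling_rules")
	fmt.Println(v, err)
	_, err = GetLatestVersion(context.Background(), "org-2", "sampling_rules")
	fmt.Println(err) // a different org does not see org-1's config
}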

View File

@@ -1,72 +1,10 @@
package agentConf
-import (
-	"time"
-	"github.com/google/uuid"
-)
+import "github.com/SigNoz/signoz/pkg/types"
-type ElementTypeDef string
-const (
-	ElementTypeSamplingRules ElementTypeDef = "sampling_rules"
-	ElementTypeDropRules     ElementTypeDef = "drop_rules"
-	ElementTypeLogPipelines  ElementTypeDef = "log_pipelines"
-	ElementTypeLbExporter    ElementTypeDef = "lb_exporter"
-)
-type DeployStatus string
-const (
-	PendingDeploy       DeployStatus = "DIRTY"
-	Deploying           DeployStatus = "DEPLOYING"
-	Deployed            DeployStatus = "DEPLOYED"
-	DeployInitiated     DeployStatus = "IN_PROGRESS"
-	DeployFailed        DeployStatus = "FAILED"
-	DeployStatusUnknown DeployStatus = "UNKNOWN"
-)
-type ConfigVersion struct {
-	ID            string         `json:"id" db:"id"`
-	Version       int            `json:"version" db:"version"`
-	ElementType   ElementTypeDef `json:"elementType" db:"element_type"`
-	Active        bool           `json:"active" db:"active"`
-	IsValid       bool           `json:"is_valid" db:"is_valid"`
-	Disabled      bool           `json:"disabled" db:"disabled"`
-	DeployStatus  DeployStatus   `json:"deployStatus" db:"deploy_status"`
-	DeployResult  string         `json:"deployResult" db:"deploy_result"`
-	LastHash      string         `json:"lastHash" db:"last_hash"`
-	LastConf      string         `json:"lastConf" db:"last_config"`
-	CreatedBy     string         `json:"createdBy" db:"created_by"`
-	CreatedByName string         `json:"createdByName" db:"created_by_name"`
-	CreatedAt     time.Time      `json:"createdAt" db:"created_at"`
-}
-func NewConfigVersion(typeDef ElementTypeDef) *ConfigVersion {
-	return &ConfigVersion{
-		ID:           uuid.NewString(),
-		ElementType:  typeDef,
-		Active:       false,
-		IsValid:      false,
-		Disabled:     false,
-		DeployStatus: PendingDeploy,
-		LastHash:     "",
-		LastConf:     "{}",
-		// todo: get user id from context?
-		// CreatedBy
-	}
-}
-func updateVersion(v int) int {
-	return v + 1
-}
type ConfigElements struct {
	VersionID   string
	Version     int
-	ElementType ElementTypeDef
+	ElementType types.ElementTypeDef
	ElementId   string
}
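The ElementTypeDef and DeployStatus definitions deleted here do not disappear; the rest of the diff references them as types.ElementTypeDef, types.DeployInitiated and so on, which means they now live in the shared pkg/types package where both agentConf and the new bun models can reach them. As a generic illustration of the typed-string-constant pattern being relocated (names and the exact set of constants are taken from the deleted code, but the real pkg/types file may declare more):
package types

// ElementTypeDef and DeployStatus keep their string values so the rows already
// stored in agent_config_versions remain valid after the move.
type ElementTypeDef string

const (
	ElementTypeSamplingRules ElementTypeDef = "sampling_rules"
	ElementTypeDropRules     ElementTypeDef = "drop_rules"
)

type DeployStatus string

const (
	PendingDeploy   DeployStatus = "DIRTY"
	DeployInitiated DeployStatus = "IN_PROGRESS"
	Deployed        DeployStatus = "DEPLOYED"
	DeployFailed    DeployStatus = "FAILED"
)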

View File

@@ -17,6 +17,10 @@ import (
"time" "time"
"github.com/SigNoz/signoz/pkg/query-service/model/metrics_explorer" "github.com/SigNoz/signoz/pkg/query-service/model/metrics_explorer"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/uptrace/bun"
"github.com/go-kit/log" "github.com/go-kit/log"
"github.com/go-kit/log/level" "github.com/go-kit/log/level"
@@ -36,7 +40,6 @@ import (
"github.com/ClickHouse/clickhouse-go/v2/lib/driver" "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/SigNoz/signoz/pkg/cache" "github.com/SigNoz/signoz/pkg/cache"
"github.com/SigNoz/signoz/pkg/types/authtypes" "github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/jmoiron/sqlx"
promModel "github.com/prometheus/common/model" promModel "github.com/prometheus/common/model"
"go.uber.org/zap" "go.uber.org/zap"
@@ -120,7 +123,7 @@ var (
// SpanWriter for reading spans from ClickHouse
type ClickHouseReader struct {
	db              clickhouse.Conn
-	localDB         *sqlx.DB
+	sqlDB           sqlstore.SQLStore
	TraceDB         string
	operationsTable string
	durationTable   string
@@ -174,7 +177,7 @@ type ClickHouseReader struct {
// NewTraceReader returns a TraceReader for the database
func NewReader(
-	localDB *sqlx.DB,
+	sqlDB sqlstore.SQLStore,
	db driver.Conn,
	configFile string,
	featureFlag interfaces.FeatureLookup,
@@ -185,13 +188,13 @@ func NewReader(
	cache cache.Cache,
) *ClickHouseReader {
	options := NewOptions(primaryNamespace, archiveNamespace)
-	return NewReaderFromClickhouseConnection(db, options, localDB, configFile, featureFlag, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache)
+	return NewReaderFromClickhouseConnection(db, options, sqlDB, configFile, featureFlag, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache)
}
func NewReaderFromClickhouseConnection(
	db driver.Conn,
	options *Options,
-	localDB *sqlx.DB,
+	sqlDB sqlstore.SQLStore,
	configFile string,
	featureFlag interfaces.FeatureLookup,
	cluster string,
@@ -216,7 +219,7 @@ func NewReaderFromClickhouseConnection(
	return &ClickHouseReader{
		db:              db,
-		localDB:         localDB,
+		sqlDB:           sqlDB,
		TraceDB:         options.primary.TraceDB,
		operationsTable: options.primary.OperationsTable,
		indexTable:      options.primary.IndexTable,
@@ -1839,7 +1842,7 @@ func getLocalTableName(tableName string) string {
}
-func (r *ClickHouseReader) SetTTLLogsV2(ctx context.Context, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
+func (r *ClickHouseReader) SetTTLLogsV2(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
	// Keep only latest 100 transactions/requests
	r.deleteTtlTransactions(ctx, 100)
	// uuid is used as transaction id
@@ -1855,7 +1858,7 @@ func (r *ClickHouseReader) SetTTLLogsV2(ctx context.Context, params *model.TTLPa
	// check if there is existing things to be done
	for _, tableName := range tableNameArray {
-		statusItem, err := r.checkTTLStatusItem(ctx, tableName)
+		statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName)
		if err != nil {
			return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")}
		}
@@ -1897,7 +1900,27 @@ func (r *ClickHouseReader) SetTTLLogsV2(ctx context.Context, params *model.TTLPa
		// we will change ttl for only the new parts and not the old ones
		query += " SETTINGS materialize_ttl_after_modify=0"
-		_, dbErr := r.localDB.Exec("INSERT INTO ttl_status (transaction_id, created_at, updated_at, table_name, ttl, status, cold_storage_ttl) VALUES (?, ?, ?, ?, ?, ?, ?)", uuid, time.Now(), time.Now(), tableName, params.DelDuration, constants.StatusPending, coldStorageDuration)
+		ttl := types.TTLSetting{
+			Identifiable: types.Identifiable{
+				ID: valuer.GenerateUUID(),
+			},
+			TimeAuditable: types.TimeAuditable{
+				CreatedAt: time.Now(),
+				UpdatedAt: time.Now(),
+			},
+			TransactionID:  uuid,
+			TableName:      tableName,
+			TTL:            int(params.DelDuration),
+			Status:         constants.StatusPending,
+			ColdStorageTTL: coldStorageDuration,
+			OrgID:          orgID,
+		}
+		_, dbErr := r.
+			sqlDB.
+			BunDB().
+			NewInsert().
+			Model(&ttl).
+			Exec(ctx)
		if dbErr != nil {
			zap.L().Error("error in inserting to ttl_status table", zap.Error(dbErr))
			return
@@ -1906,9 +1929,17 @@ func (r *ClickHouseReader) SetTTLLogsV2(ctx context.Context, params *model.TTLPa
		err := r.setColdStorage(context.Background(), tableName, params.ColdStorageVolume)
		if err != nil {
			zap.L().Error("error in setting cold storage", zap.Error(err))
-			statusItem, err := r.checkTTLStatusItem(ctx, tableName)
+			statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName)
			if err == nil {
-				_, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id)
+				_, dbErr := r.
+					sqlDB.
+					BunDB().
+					NewUpdate().
+					Model(new(types.TTLSetting)).
+					Set("updated_at = ?", time.Now()).
+					Set("status = ?", constants.StatusFailed).
+					Where("id = ?", statusItem.Identifiable.ID.StringValue()).
+					Exec(ctx)
				if dbErr != nil {
					zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
					return
@@ -1917,17 +1948,33 @@ func (r *ClickHouseReader) SetTTLLogsV2(ctx context.Context, params *model.TTLPa
			return
		}
		zap.L().Info("Executing TTL request: ", zap.String("request", query))
-		statusItem, _ := r.checkTTLStatusItem(ctx, tableName)
+		statusItem, _ := r.checkTTLStatusItem(ctx, orgID, tableName)
		if err := r.db.Exec(ctx, query); err != nil {
			zap.L().Error("error while setting ttl", zap.Error(err))
-			_, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id)
+			_, dbErr := r.
+				sqlDB.
+				BunDB().
+				NewUpdate().
+				Model(new(types.TTLSetting)).
+				Set("updated_at = ?", time.Now()).
+				Set("status = ?", constants.StatusFailed).
+				Where("id = ?", statusItem.Identifiable.ID.StringValue()).
+				Exec(ctx)
			if dbErr != nil {
				zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
				return
			}
			return
		}
-		_, dbErr = r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusSuccess, statusItem.Id)
+		_, dbErr = r.
+			sqlDB.
+			BunDB().
+			NewUpdate().
+			Model(new(types.TTLSetting)).
+			Set("updated_at = ?", time.Now()).
+			Set("status = ?", constants.StatusSuccess).
+			Where("id = ?", statusItem.Identifiable.ID.StringValue()).
+			Exec(ctx)
		if dbErr != nil {
			zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
			return
@@ -1938,7 +1985,7 @@ func (r *ClickHouseReader) SetTTLLogsV2(ctx context.Context, params *model.TTLPa
	return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil
}
-func (r *ClickHouseReader) SetTTLTracesV2(ctx context.Context, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
+func (r *ClickHouseReader) SetTTLTracesV2(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
	// uuid is used as transaction id
	uuidWithHyphen := uuid.New()
	uuid := strings.Replace(uuidWithHyphen.String(), "-", "", -1)
@@ -1958,7 +2005,7 @@ func (r *ClickHouseReader) SetTTLTracesV2(ctx context.Context, params *model.TTL
	// check if there is existing things to be done
	for _, tableName := range tableNames {
-		statusItem, err := r.checkTTLStatusItem(ctx, tableName)
+		statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName)
		if err != nil {
			return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")}
		}
@@ -1985,11 +2032,32 @@ func (r *ClickHouseReader) SetTTLTracesV2(ctx context.Context, params *model.TTL
			timestamp = "end"
		}
-		_, dbErr := r.localDB.Exec("INSERT INTO ttl_status (transaction_id, created_at, updated_at, table_name, ttl, status, cold_storage_ttl) VALUES (?, ?, ?, ?, ?, ?, ?)", uuid, time.Now(), time.Now(), tableName, params.DelDuration, constants.StatusPending, coldStorageDuration)
+		ttl := types.TTLSetting{
+			Identifiable: types.Identifiable{
+				ID: valuer.GenerateUUID(),
+			},
+			TimeAuditable: types.TimeAuditable{
+				CreatedAt: time.Now(),
+				UpdatedAt: time.Now(),
+			},
+			TransactionID:  uuid,
+			TableName:      tableName,
+			TTL:            int(params.DelDuration),
+			Status:         constants.StatusPending,
+			ColdStorageTTL: coldStorageDuration,
+			OrgID:          orgID,
+		}
+		_, dbErr := r.
+			sqlDB.
+			BunDB().
+			NewInsert().
+			Model(&ttl).
+			Exec(ctx)
		if dbErr != nil {
-			zap.L().Error("Error in inserting to ttl_status table", zap.Error(dbErr))
+			zap.L().Error("error in inserting to ttl_status table", zap.Error(dbErr))
			return
		}
		req := fmt.Sprintf(ttlV2, tableName, r.cluster, timestamp, params.DelDuration)
		if strings.HasSuffix(distributedTableName, r.traceResourceTableV3) {
			req = fmt.Sprintf(ttlV2Resource, tableName, r.cluster, params.DelDuration)
@@ -2005,9 +2073,17 @@ func (r *ClickHouseReader) SetTTLTracesV2(ctx context.Context, params *model.TTL
		err := r.setColdStorage(context.Background(), tableName, params.ColdStorageVolume)
		if err != nil {
			zap.L().Error("Error in setting cold storage", zap.Error(err))
-			statusItem, err := r.checkTTLStatusItem(ctx, tableName)
+			statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName)
			if err == nil {
-				_, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id)
+				_, dbErr := r.
+					sqlDB.
+					BunDB().
+					NewUpdate().
+					Model(new(types.TTLSetting)).
+					Set("updated_at = ?", time.Now()).
+					Set("status = ?", constants.StatusFailed).
+					Where("id = ?", statusItem.Identifiable.ID.StringValue()).
+					Exec(ctx)
				if dbErr != nil {
					zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
					return
@@ -2017,17 +2093,33 @@ func (r *ClickHouseReader) SetTTLTracesV2(ctx context.Context, params *model.TTL
		}
		req += " SETTINGS materialize_ttl_after_modify=0;"
		zap.L().Error(" ExecutingTTL request: ", zap.String("request", req))
-		statusItem, _ := r.checkTTLStatusItem(ctx, tableName)
+		statusItem, _ := r.checkTTLStatusItem(ctx, orgID, tableName)
		if err := r.db.Exec(ctx, req); err != nil {
			zap.L().Error("Error in executing set TTL query", zap.Error(err))
-			_, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id)
+			_, dbErr := r.
+				sqlDB.
+				BunDB().
+				NewUpdate().
+				Model(new(types.TTLSetting)).
+				Set("updated_at = ?", time.Now()).
+				Set("status = ?", constants.StatusFailed).
+				Where("id = ?", statusItem.Identifiable.ID.StringValue()).
+				Exec(ctx)
			if dbErr != nil {
				zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
				return
			}
			return
		}
-		_, dbErr = r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusSuccess, statusItem.Id)
+		_, dbErr = r.
+			sqlDB.
+			BunDB().
+			NewUpdate().
+			Model(new(types.TTLSetting)).
+			Set("updated_at = ?", time.Now()).
+			Set("status = ?", constants.StatusSuccess).
+			Where("id = ?", statusItem.Identifiable.ID.StringValue()).
+			Exec(ctx)
		if dbErr != nil {
			zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
			return
@@ -2040,7 +2132,7 @@ func (r *ClickHouseReader) SetTTLTracesV2(ctx context.Context, params *model.TTL
// SetTTL sets the TTL for traces or metrics or logs tables.
// This is an async API which creates goroutines to set TTL.
// Status of TTL update is tracked with ttl_status table in sqlite db.
-func (r *ClickHouseReader) SetTTL(ctx context.Context,
+func (r *ClickHouseReader) SetTTL(ctx context.Context, orgID string,
	params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
	// Keep only latest 100 transactions/requests
	r.deleteTtlTransactions(ctx, 100)
@@ -2056,7 +2148,7 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
	switch params.Type {
	case constants.TraceTTL:
		if r.useTraceNewSchema {
-			return r.SetTTLTracesV2(ctx, params)
+			return r.SetTTLTracesV2(ctx, orgID, params)
		}
		tableNames := []string{
@@ -2069,7 +2161,7 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
		}
		for _, tableName := range tableNames {
			tableName := getLocalTableName(tableName)
-			statusItem, err := r.checkTTLStatusItem(ctx, tableName)
+			statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName)
			if err != nil {
				return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")}
			}
@@ -2081,9 +2173,29 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
			tableName := getLocalTableName(tableName)
			// TODO: DB queries should be implemented with transactional statements but currently clickhouse doesn't support them. Issue: https://github.com/ClickHouse/ClickHouse/issues/22086
			go func(tableName string) {
-				_, dbErr := r.localDB.Exec("INSERT INTO ttl_status (transaction_id, created_at, updated_at, table_name, ttl, status, cold_storage_ttl) VALUES (?, ?, ?, ?, ?, ?, ?)", uuid, time.Now(), time.Now(), tableName, params.DelDuration, constants.StatusPending, coldStorageDuration)
+				ttl := types.TTLSetting{
+					Identifiable: types.Identifiable{
+						ID: valuer.GenerateUUID(),
+					},
+					TimeAuditable: types.TimeAuditable{
+						CreatedAt: time.Now(),
+						UpdatedAt: time.Now(),
+					},
+					TransactionID:  uuid,
+					TableName:      tableName,
+					TTL:            int(params.DelDuration),
+					Status:         constants.StatusPending,
+					ColdStorageTTL: coldStorageDuration,
+					OrgID:          orgID,
+				}
+				_, dbErr := r.
+					sqlDB.
+					BunDB().
+					NewInsert().
+					Model(&ttl).
+					Exec(ctx)
				if dbErr != nil {
-					zap.L().Error("Error in inserting to ttl_status table", zap.Error(dbErr))
+					zap.L().Error("error in inserting to ttl_status table", zap.Error(dbErr))
					return
				}
				req := fmt.Sprintf(
@@ -2096,9 +2208,17 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
				err := r.setColdStorage(context.Background(), tableName, params.ColdStorageVolume)
				if err != nil {
					zap.L().Error("Error in setting cold storage", zap.Error(err))
-					statusItem, err := r.checkTTLStatusItem(ctx, tableName)
+					statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName)
					if err == nil {
-						_, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id)
+						_, dbErr := r.
+							sqlDB.
+							BunDB().
+							NewUpdate().
+							Model(new(types.TTLSetting)).
+							Set("updated_at = ?", time.Now()).
+							Set("status = ?", constants.StatusFailed).
+							Where("id = ?", statusItem.ID.StringValue()).
+							Exec(ctx)
						if dbErr != nil {
							zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
							return
@@ -2108,17 +2228,33 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
				}
				req += " SETTINGS materialize_ttl_after_modify=0;"
				zap.L().Error("Executing TTL request: ", zap.String("request", req))
-				statusItem, _ := r.checkTTLStatusItem(ctx, tableName)
+				statusItem, _ := r.checkTTLStatusItem(ctx, orgID, tableName)
				if err := r.db.Exec(context.Background(), req); err != nil {
					zap.L().Error("Error in executing set TTL query", zap.Error(err))
-					_, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id)
+					_, dbErr := r.
+						sqlDB.
+						BunDB().
+						NewUpdate().
+						Model(new(types.TTLSetting)).
+						Set("updated_at = ?", time.Now()).
+						Set("status = ?", constants.StatusFailed).
+						Where("id = ?", statusItem.ID.StringValue()).
+						Exec(ctx)
					if dbErr != nil {
						zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
						return
					}
					return
				}
-				_, dbErr = r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusSuccess, statusItem.Id)
+				_, dbErr = r.
+					sqlDB.
+					BunDB().
+					NewUpdate().
+					Model(new(types.TTLSetting)).
+					Set("updated_at = ?", time.Now()).
+					Set("status = ?", constants.StatusSuccess).
+					Where("id = ?", statusItem.ID.StringValue()).
+					Exec(ctx)
				if dbErr != nil {
					zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
					return
@@ -2138,7 +2274,7 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
			signozMetricDBName + "." + signozTSLocalTableNameV41Week,
		}
		for _, tableName := range tableNames {
-			statusItem, err := r.checkTTLStatusItem(ctx, tableName)
+			statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName)
			if err != nil {
				return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")}
			}
@@ -2147,9 +2283,29 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
			}
		}
		metricTTL := func(tableName string) {
-			_, dbErr := r.localDB.Exec("INSERT INTO ttl_status (transaction_id, created_at, updated_at, table_name, ttl, status, cold_storage_ttl) VALUES (?, ?, ?, ?, ?, ?, ?)", uuid, time.Now(), time.Now(), tableName, params.DelDuration, constants.StatusPending, coldStorageDuration)
+			ttl := types.TTLSetting{
+				Identifiable: types.Identifiable{
+					ID: valuer.GenerateUUID(),
+				},
+				TimeAuditable: types.TimeAuditable{
+					CreatedAt: time.Now(),
+					UpdatedAt: time.Now(),
+				},
+				TransactionID:  uuid,
+				TableName:      tableName,
+				TTL:            int(params.DelDuration),
+				Status:         constants.StatusPending,
+				ColdStorageTTL: coldStorageDuration,
+				OrgID:          orgID,
+			}
+			_, dbErr := r.
+				sqlDB.
+				BunDB().
+				NewInsert().
+				Model(&ttl).
+				Exec(ctx)
			if dbErr != nil {
-				zap.L().Error("Error in inserting to ttl_status table", zap.Error(dbErr))
+				zap.L().Error("error in inserting to ttl_status table", zap.Error(dbErr))
				return
			}
			timeColumn := "timestamp_ms"
@@ -2168,9 +2324,17 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
			err := r.setColdStorage(context.Background(), tableName, params.ColdStorageVolume)
			if err != nil {
				zap.L().Error("Error in setting cold storage", zap.Error(err))
-				statusItem, err := r.checkTTLStatusItem(ctx, tableName)
+				statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName)
				if err == nil {
-					_, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id)
+					_, dbErr := r.
+						sqlDB.
+						BunDB().
+						NewUpdate().
+						Model(new(types.TTLSetting)).
+						Set("updated_at = ?", time.Now()).
+						Set("status = ?", constants.StatusFailed).
+						Where("id = ?", statusItem.Identifiable.ID.StringValue()).
+						Exec(ctx)
					if dbErr != nil {
						zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
						return
@@ -2180,17 +2344,33 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
			}
			req += " SETTINGS materialize_ttl_after_modify=0"
			zap.L().Info("Executing TTL request: ", zap.String("request", req))
-			statusItem, _ := r.checkTTLStatusItem(ctx, tableName)
+			statusItem, _ := r.checkTTLStatusItem(ctx, orgID, tableName)
			if err := r.db.Exec(ctx, req); err != nil {
				zap.L().Error("error while setting ttl.", zap.Error(err))
-				_, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id)
+				_, dbErr := r.
+					sqlDB.
+					BunDB().
+					NewUpdate().
+					Model(new(types.TTLSetting)).
+					Set("updated_at = ?", time.Now()).
+					Set("status = ?", constants.StatusFailed).
+					Where("id = ?", statusItem.Identifiable.ID.StringValue()).
+					Exec(ctx)
				if dbErr != nil {
					zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
					return
				}
				return
			}
-			_, dbErr = r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusSuccess, statusItem.Id)
+			_, dbErr = r.
+				sqlDB.
+				BunDB().
+				NewUpdate().
+				Model(new(types.TTLSetting)).
+				Set("updated_at = ?", time.Now()).
+				Set("status = ?", constants.StatusSuccess).
+				Where("id = ?", statusItem.Identifiable.ID.StringValue()).
+				Exec(ctx)
			if dbErr != nil {
				zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
				return
@@ -2201,11 +2381,11 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
		}
	case constants.LogsTTL:
		if r.useLogsNewSchema {
-			return r.SetTTLLogsV2(ctx, params)
+			return r.SetTTLLogsV2(ctx, orgID, params)
		}
		tableName := r.logsDB + "." + r.logsLocalTable
-		statusItem, err := r.checkTTLStatusItem(ctx, tableName)
+		statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName)
		if err != nil {
			return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")}
		}
@@ -2213,7 +2393,27 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
			return nil, &model.ApiError{Typ: model.ErrorConflict, Err: fmt.Errorf("TTL is already running")}
		}
		go func(tableName string) {
-			_, dbErr := r.localDB.Exec("INSERT INTO ttl_status (transaction_id, created_at, updated_at, table_name, ttl, status, cold_storage_ttl) VALUES (?, ?, ?, ?, ?, ?, ?)", uuid, time.Now(), time.Now(), tableName, params.DelDuration, constants.StatusPending, coldStorageDuration)
+			ttl := types.TTLSetting{
+				Identifiable: types.Identifiable{
+					ID: valuer.GenerateUUID(),
+				},
+				TimeAuditable: types.TimeAuditable{
+					CreatedAt: time.Now(),
+					UpdatedAt: time.Now(),
+				},
+				TransactionID:  uuid,
+				TableName:      tableName,
+				TTL:            int(params.DelDuration),
+				Status:         constants.StatusPending,
+				ColdStorageTTL: coldStorageDuration,
+				OrgID:          orgID,
+			}
+			_, dbErr := r.
+				sqlDB.
+				BunDB().
+				NewInsert().
+				Model(&ttl).
+				Exec(ctx)
			if dbErr != nil {
				zap.L().Error("error in inserting to ttl_status table", zap.Error(dbErr))
				return
@@ -2229,9 +2429,17 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
			err := r.setColdStorage(context.Background(), tableName, params.ColdStorageVolume)
			if err != nil {
				zap.L().Error("error in setting cold storage", zap.Error(err))
-				statusItem, err := r.checkTTLStatusItem(ctx, tableName)
+				statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName)
				if err == nil {
-					_, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id)
+					_, dbErr := r.
+						sqlDB.
+						BunDB().
+						NewUpdate().
+						Model(new(types.TTLSetting)).
+						Set("updated_at = ?", time.Now()).
+						Set("status = ?", constants.StatusFailed).
+						Where("id = ?", statusItem.Identifiable.ID.StringValue()).
+						Exec(ctx)
					if dbErr != nil {
						zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
						return
@@ -2241,17 +2449,33 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
			}
			req += " SETTINGS materialize_ttl_after_modify=0"
			zap.L().Info("Executing TTL request: ", zap.String("request", req))
-			statusItem, _ := r.checkTTLStatusItem(ctx, tableName)
+			statusItem, _ := r.checkTTLStatusItem(ctx, orgID, tableName)
			if err := r.db.Exec(ctx, req); err != nil {
				zap.L().Error("error while setting ttl", zap.Error(err))
-				_, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id)
+				_, dbErr := r.
+					sqlDB.
+					BunDB().
+					NewUpdate().
+					Model(new(types.TTLSetting)).
+					Set("updated_at = ?", time.Now()).
+					Set("status = ?", constants.StatusFailed).
+					Where("id = ?", statusItem.Identifiable.ID.StringValue()).
+					Exec(ctx)
				if dbErr != nil {
					zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
					return
				}
				return
			}
-			_, dbErr = r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusSuccess, statusItem.Id)
+			_, dbErr = r.
+				sqlDB.
+				BunDB().
+				NewUpdate().
+				Model(new(types.TTLSetting)).
+				Set("updated_at = ?", time.Now()).
+				Set("status = ?", constants.StatusSuccess).
+				Where("id = ?", statusItem.Identifiable.ID.StringValue()).
+				Exec(ctx)
			if dbErr != nil {
				zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
				return
@@ -2266,47 +2490,62 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
	return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil
}
-func (r *ClickHouseReader) deleteTtlTransactions(_ context.Context, numberOfTransactionsStore int) {
-	_, err := r.localDB.Exec("DELETE FROM ttl_status WHERE transaction_id NOT IN (SELECT distinct transaction_id FROM ttl_status ORDER BY created_at DESC LIMIT ?)", numberOfTransactionsStore)
+func (r *ClickHouseReader) deleteTtlTransactions(ctx context.Context, numberOfTransactionsStore int) {
+	limitTransactions := []string{}
+	err := r.
+		sqlDB.
+		BunDB().
+		NewSelect().
+		ColumnExpr("distinct(transaction_id)").
+		Model(new(types.TTLSetting)).
+		OrderExpr("created_at DESC").
+		Limit(numberOfTransactionsStore).
+		Scan(ctx, &limitTransactions)
+	if err != nil {
+		zap.L().Error("Error in processing ttl_status delete sql query", zap.Error(err))
+	}
+	_, err = r.
+		sqlDB.
+		BunDB().
+		NewDelete().
+		Model(new(types.TTLSetting)).
+		Where("transaction_id NOT IN (?)", bun.In(limitTransactions)).
+		Exec(ctx)
	if err != nil {
		zap.L().Error("Error in processing ttl_status delete sql query", zap.Error(err))
	}
}
// checkTTLStatusItem checks if ttl_status table has an entry for the given table name
-func (r *ClickHouseReader) checkTTLStatusItem(_ context.Context, tableName string) (model.TTLStatusItem, *model.ApiError) {
-	statusItem := []model.TTLStatusItem{}
-	query := `SELECT id, status, ttl, cold_storage_ttl FROM ttl_status WHERE table_name = ? ORDER BY created_at DESC`
-	zap.L().Info("checkTTLStatusItem query", zap.String("query", query), zap.String("tableName", tableName))
-	stmt, err := r.localDB.Preparex(query)
-	if err != nil {
-		zap.L().Error("Error preparing query for checkTTLStatusItem", zap.Error(err))
-		return model.TTLStatusItem{}, &model.ApiError{Typ: model.ErrorInternal, Err: err}
-	}
-	err = stmt.Select(&statusItem, tableName)
-	if len(statusItem) == 0 {
-		return model.TTLStatusItem{}, nil
-	}
-	if err != nil {
+func (r *ClickHouseReader) checkTTLStatusItem(ctx context.Context, orgID string, tableName string) (*types.TTLSetting, *model.ApiError) {
+	zap.L().Info("checkTTLStatusItem query", zap.String("tableName", tableName))
+	ttl := new(types.TTLSetting)
+	err := r.
+		sqlDB.
+		BunDB().
+		NewSelect().
+		Model(ttl).
+		Where("table_name = ?", tableName).
+		Where("org_id = ?", orgID).
+		OrderExpr("created_at DESC").
+		Limit(1).
+		Scan(ctx)
+	if err != nil && err != sql.ErrNoRows {
		zap.L().Error("Error in processing sql query", zap.Error(err))
-		return model.TTLStatusItem{}, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")}
+		return ttl, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")}
	}
-	return statusItem[0], nil
+	return ttl, nil
}
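One behavioral detail in the rewritten checkTTLStatusItem: bun's Scan reports an empty result set by returning database/sql's sql.ErrNoRows, which is why the error check excludes that value instead of the old len(statusItem) == 0 test. A minimal standard-library illustration of the same idiom, using a hypothetical in-memory table rather than the real schema:
package main

import (
	"database/sql"
	"errors"
	"fmt"

	_ "modernc.org/sqlite"
)

func main() {
	db, err := sql.Open("sqlite", ":memory:")
	if err != nil {
		panic(err)
	}
	defer db.Close()

	if _, err := db.Exec(`CREATE TABLE ttl_status (table_name TEXT, status TEXT)`); err != nil {
		panic(err)
	}

	var status string
	err = db.QueryRow(`SELECT status FROM ttl_status WHERE table_name = ? LIMIT 1`, "signoz_logs").Scan(&status)
	switch {
	case errors.Is(err, sql.ErrNoRows):
		// No entry yet: treat as "no TTL request recorded", not as a failure.
		fmt.Println("no ttl_status row for table")
	case err != nil:
		fmt.Println("query failed:", err)
	default:
		fmt.Println("status:", status)
	}
}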
// setTTLQueryStatus fetches ttl_status table status from DB
-func (r *ClickHouseReader) setTTLQueryStatus(ctx context.Context, tableNameArray []string) (string, *model.ApiError) {
+func (r *ClickHouseReader) setTTLQueryStatus(ctx context.Context, orgID string, tableNameArray []string) (string, *model.ApiError) {
	failFlag := false
	status := constants.StatusSuccess
	for _, tableName := range tableNameArray {
-		statusItem, err := r.checkTTLStatusItem(ctx, tableName)
-		emptyStatusStruct := model.TTLStatusItem{}
+		statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName)
+		emptyStatusStruct := new(types.TTLSetting)
		if statusItem == emptyStatusStruct {
			return "", nil
		}
@@ -2367,7 +2606,7 @@ func getLocalTableNameArray(tableNames []string) []string {
}
// GetTTL returns current ttl, expected ttl and past setTTL status for metrics/traces.
-func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLParams) (*model.GetTTLResponseItem, *model.ApiError) {
+func (r *ClickHouseReader) GetTTL(ctx context.Context, orgID string, ttlParams *model.GetTTLParams) (*model.GetTTLResponseItem, *model.ApiError) {
	parseTTL := func(queryResp string) (int, int) {
@@ -2457,7 +2696,7 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLPa
		tableNameArray := []string{signozTraceDBName + "." + signozTraceTableName, signozTraceDBName + "." + signozDurationMVTable, signozTraceDBName + "." + signozSpansTable, signozTraceDBName + "." + signozErrorIndexTable, signozTraceDBName + "." + signozUsageExplorerTable, signozTraceDBName + "." + defaultDependencyGraphTable}
		tableNameArray = getLocalTableNameArray(tableNameArray)
-		status, err := r.setTTLQueryStatus(ctx, tableNameArray)
+		status, err := r.setTTLQueryStatus(ctx, orgID, tableNameArray)
		if err != nil {
			return nil, err
		}
@@ -2465,22 +2704,22 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLPa
		if err != nil {
			return nil, err
		}
-		ttlQuery, err := r.checkTTLStatusItem(ctx, tableNameArray[0])
+		ttlQuery, err := r.checkTTLStatusItem(ctx, orgID, tableNameArray[0])
		if err != nil {
			return nil, err
		}
		ttlQuery.TTL = ttlQuery.TTL / 3600 // convert to hours
-		if ttlQuery.ColdStorageTtl != -1 {
-			ttlQuery.ColdStorageTtl = ttlQuery.ColdStorageTtl / 3600 // convert to hours
+		if ttlQuery.ColdStorageTTL != -1 {
+			ttlQuery.ColdStorageTTL = ttlQuery.ColdStorageTTL / 3600 // convert to hours
		}
		delTTL, moveTTL := parseTTL(dbResp.EngineFull)
-		return &model.GetTTLResponseItem{TracesTime: delTTL, TracesMoveTime: moveTTL, ExpectedTracesTime: ttlQuery.TTL, ExpectedTracesMoveTime: ttlQuery.ColdStorageTtl, Status: status}, nil
+		return &model.GetTTLResponseItem{TracesTime: delTTL, TracesMoveTime: moveTTL, ExpectedTracesTime: ttlQuery.TTL, ExpectedTracesMoveTime: ttlQuery.ColdStorageTTL, Status: status}, nil
	case constants.MetricsTTL:
		tableNameArray := []string{signozMetricDBName + "." + signozSampleTableName}
		tableNameArray = getLocalTableNameArray(tableNameArray)
-		status, err := r.setTTLQueryStatus(ctx, tableNameArray)
+		status, err := r.setTTLQueryStatus(ctx, orgID, tableNameArray)
		if err != nil {
			return nil, err
		}
@@ -2488,22 +2727,22 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLPa
		if err != nil {
			return nil, err
		}
-		ttlQuery, err := r.checkTTLStatusItem(ctx, tableNameArray[0])
+		ttlQuery, err := r.checkTTLStatusItem(ctx, orgID, tableNameArray[0])
		if err != nil {
			return nil, err
		}
		ttlQuery.TTL = ttlQuery.TTL / 3600 // convert to hours
-		if ttlQuery.ColdStorageTtl != -1 {
-			ttlQuery.ColdStorageTtl = ttlQuery.ColdStorageTtl / 3600 // convert to hours
+		if ttlQuery.ColdStorageTTL != -1 {
+			ttlQuery.ColdStorageTTL = ttlQuery.ColdStorageTTL / 3600 // convert to hours
		}
		delTTL, moveTTL := parseTTL(dbResp.EngineFull)
-		return &model.GetTTLResponseItem{MetricsTime: delTTL, MetricsMoveTime: moveTTL, ExpectedMetricsTime: ttlQuery.TTL, ExpectedMetricsMoveTime: ttlQuery.ColdStorageTtl, Status: status}, nil
+		return &model.GetTTLResponseItem{MetricsTime: delTTL, MetricsMoveTime: moveTTL, ExpectedMetricsTime: ttlQuery.TTL, ExpectedMetricsMoveTime: ttlQuery.ColdStorageTTL, Status: status}, nil
	case constants.LogsTTL:
		tableNameArray := []string{r.logsDB + "." + r.logsTable}
		tableNameArray = getLocalTableNameArray(tableNameArray)
-		status, err := r.setTTLQueryStatus(ctx, tableNameArray)
+		status, err := r.setTTLQueryStatus(ctx, orgID, tableNameArray)
		if err != nil {
			return nil, err
		}
@@ -2511,17 +2750,17 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLPa
if err != nil {
return nil, err
}
-ttlQuery, err := r.checkTTLStatusItem(ctx, tableNameArray[0])
+ttlQuery, err := r.checkTTLStatusItem(ctx, tableNameArray[0], orgID)
if err != nil {
return nil, err
}
ttlQuery.TTL = ttlQuery.TTL / 3600 // convert to hours
-if ttlQuery.ColdStorageTtl != -1 {
+if ttlQuery.ColdStorageTTL != -1 {
-ttlQuery.ColdStorageTtl = ttlQuery.ColdStorageTtl / 3600 // convert to hours
+ttlQuery.ColdStorageTTL = ttlQuery.ColdStorageTTL / 3600 // convert to hours
}
delTTL, moveTTL := parseTTL(dbResp.EngineFull)
-return &model.GetTTLResponseItem{LogsTime: delTTL, LogsMoveTime: moveTTL, ExpectedLogsTime: ttlQuery.TTL, ExpectedLogsMoveTime: ttlQuery.ColdStorageTtl, Status: status}, nil
+return &model.GetTTLResponseItem{LogsTime: delTTL, LogsMoveTime: moveTTL, ExpectedLogsTime: ttlQuery.TTL, ExpectedLogsMoveTime: ttlQuery.ColdStorageTTL, Status: status}, nil
default:
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error while getting ttl. ttl type should be metrics|traces, got %v",
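GetTTL is now org-scoped: the reader receives claims.OrgID from the HTTP layer and passes it into setTTLQueryStatus and checkTTLStatusItem for every signal. Below is a minimal sketch of what an org-scoped status lookup could look like with bun; the table and column names are illustrative, not the actual SigNoz schema.

package example

import (
	"context"

	"github.com/uptrace/bun"
)

// ttlStatus is a hypothetical model for the TTL status table; field names are illustrative.
type ttlStatus struct {
	bun.BaseModel `bun:"table:ttl_status"`

	ID        int64  `bun:"id,pk,autoincrement"`
	OrgID     string `bun:"org_id"`
	TableName string `bun:"table_name"`
	TTL       int    `bun:"ttl"`
	Status    string `bun:"status"`
}

// latestTTLStatus returns the most recent TTL status row for one org and table,
// mirroring the org-scoped checkTTLStatusItem calls introduced in this diff.
func latestTTLStatus(ctx context.Context, db *bun.DB, orgID, tableName string) (ttlStatus, error) {
	var row ttlStatus
	err := db.NewSelect().
		Model(&row).
		Where("org_id = ?", orgID).
		Where("table_name = ?", tableName).
		OrderExpr("id DESC").
		Limit(1).
		Scan(ctx)
	return row, err
}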


@@ -21,6 +21,7 @@ import (
"github.com/SigNoz/signoz/pkg/alertmanager" "github.com/SigNoz/signoz/pkg/alertmanager"
errorsV2 "github.com/SigNoz/signoz/pkg/errors" errorsV2 "github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/http/render" "github.com/SigNoz/signoz/pkg/http/render"
"github.com/SigNoz/signoz/pkg/modules/preference"
"github.com/SigNoz/signoz/pkg/query-service/app/metricsexplorer" "github.com/SigNoz/signoz/pkg/query-service/app/metricsexplorer"
"github.com/SigNoz/signoz/pkg/signoz" "github.com/SigNoz/signoz/pkg/signoz"
"github.com/SigNoz/signoz/pkg/valuer" "github.com/SigNoz/signoz/pkg/valuer"
@@ -44,7 +45,6 @@ import (
logsv4 "github.com/SigNoz/signoz/pkg/query-service/app/logs/v4" logsv4 "github.com/SigNoz/signoz/pkg/query-service/app/logs/v4"
"github.com/SigNoz/signoz/pkg/query-service/app/metrics" "github.com/SigNoz/signoz/pkg/query-service/app/metrics"
metricsv3 "github.com/SigNoz/signoz/pkg/query-service/app/metrics/v3" metricsv3 "github.com/SigNoz/signoz/pkg/query-service/app/metrics/v3"
"github.com/SigNoz/signoz/pkg/query-service/app/preferences"
"github.com/SigNoz/signoz/pkg/query-service/app/querier" "github.com/SigNoz/signoz/pkg/query-service/app/querier"
querierV2 "github.com/SigNoz/signoz/pkg/query-service/app/querier/v2" querierV2 "github.com/SigNoz/signoz/pkg/query-service/app/querier/v2"
"github.com/SigNoz/signoz/pkg/query-service/app/queryBuilder" "github.com/SigNoz/signoz/pkg/query-service/app/queryBuilder"
@@ -142,6 +142,8 @@ type APIHandler struct {
AlertmanagerAPI *alertmanager.API AlertmanagerAPI *alertmanager.API
Signoz *signoz.SigNoz Signoz *signoz.SigNoz
Preference preference.API
} }
type APIHandlerOpts struct { type APIHandlerOpts struct {
@@ -187,6 +189,8 @@ type APIHandlerOpts struct {
AlertmanagerAPI *alertmanager.API AlertmanagerAPI *alertmanager.API
Signoz *signoz.SigNoz Signoz *signoz.SigNoz
Preference preference.API
} }
// NewAPIHandler returns an APIHandler // NewAPIHandler returns an APIHandler
@@ -257,6 +261,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
SummaryService: summaryService, SummaryService: summaryService,
AlertmanagerAPI: opts.AlertmanagerAPI, AlertmanagerAPI: opts.AlertmanagerAPI,
Signoz: opts.Signoz, Signoz: opts.Signoz,
Preference: opts.Preference,
} }
logsQueryBuilder := logsv3.PrepareLogsQuery logsQueryBuilder := logsv3.PrepareLogsQuery
@@ -1858,8 +1863,15 @@ func (aH *APIHandler) setTTL(w http.ResponseWriter, r *http.Request) {
return
}
+ctx := r.Context()
+claims, ok := authtypes.ClaimsFromContext(ctx)
+if !ok {
+RespondError(w, &model.ApiError{Err: errors.New("failed to get org id from context"), Typ: model.ErrorInternal}, nil)
+return
+}
// Context is not used here as TTL is long duration DB operation
-result, apiErr := aH.reader.SetTTL(context.Background(), ttlParams)
+result, apiErr := aH.reader.SetTTL(context.Background(), claims.OrgID, ttlParams)
if apiErr != nil {
if apiErr.Typ == model.ErrorConflict {
aH.HandleError(w, apiErr.Err, http.StatusConflict)
@@ -1879,7 +1891,14 @@ func (aH *APIHandler) getTTL(w http.ResponseWriter, r *http.Request) {
return
}
-result, apiErr := aH.reader.GetTTL(r.Context(), ttlParams)
+ctx := r.Context()
+claims, ok := authtypes.ClaimsFromContext(ctx)
+if !ok {
+RespondError(w, &model.ApiError{Err: errors.New("failed to get org id from context"), Typ: model.ErrorInternal}, nil)
+return
+}
+result, apiErr := aH.reader.GetTTL(r.Context(), claims.OrgID, ttlParams)
if apiErr != nil && aH.HandleError(w, apiErr.Err, http.StatusInternalServerError) {
return
}
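Both TTL handlers now resolve the organization from the JWT claims on the request context before calling the reader. The same check can be factored into a tiny helper; this one is illustrative and not part of the diff.

package example

import (
	"errors"
	"net/http"

	"github.com/SigNoz/signoz/pkg/types/authtypes"
)

// orgIDFromRequest pulls the org ID out of the JWT claims stored on the request
// context; it mirrors the inline checks added to setTTL and getTTL above.
func orgIDFromRequest(r *http.Request) (string, error) {
	claims, ok := authtypes.ClaimsFromContext(r.Context())
	if !ok {
		return "", errors.New("failed to get org id from context")
	}
	return claims.OrgID, nil
}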
@@ -3408,132 +3427,37 @@ func (aH *APIHandler) getProducerConsumerEval(
func (aH *APIHandler) getUserPreference( func (aH *APIHandler) getUserPreference(
w http.ResponseWriter, r *http.Request, w http.ResponseWriter, r *http.Request,
) { ) {
-preferenceId := mux.Vars(r)["preferenceId"]
+aH.Preference.GetUserPreference(w, r)
claims, ok := authtypes.ClaimsFromContext(r.Context())
if !ok {
render.Error(w, errorsV2.Newf(errorsV2.TypeUnauthenticated, errorsV2.CodeUnauthenticated, "unauthenticated"))
return
}
preference, apiErr := preferences.GetUserPreference(
r.Context(), preferenceId, claims.OrgID, claims.UserID,
)
if apiErr != nil {
RespondError(w, apiErr, nil)
return
}
aH.Respond(w, preference)
} }
func (aH *APIHandler) updateUserPreference( func (aH *APIHandler) updateUserPreference(
w http.ResponseWriter, r *http.Request, w http.ResponseWriter, r *http.Request,
) { ) {
-preferenceId := mux.Vars(r)["preferenceId"]
+aH.Preference.UpdateUserPreference(w, r)
claims, ok := authtypes.ClaimsFromContext(r.Context())
if !ok {
render.Error(w, errorsV2.Newf(errorsV2.TypeUnauthenticated, errorsV2.CodeUnauthenticated, "unauthenticated"))
return
}
req := preferences.UpdatePreference{}
err := json.NewDecoder(r.Body).Decode(&req)
if err != nil {
RespondError(w, model.BadRequest(err), nil)
return
}
preference, apiErr := preferences.UpdateUserPreference(r.Context(), preferenceId, req.PreferenceValue, claims.UserID)
if apiErr != nil {
RespondError(w, apiErr, nil)
return
}
aH.Respond(w, preference)
} }
func (aH *APIHandler) getAllUserPreferences( func (aH *APIHandler) getAllUserPreferences(
w http.ResponseWriter, r *http.Request, w http.ResponseWriter, r *http.Request,
) { ) {
-claims, ok := authtypes.ClaimsFromContext(r.Context())
+aH.Preference.GetAllUserPreferences(w, r)
if !ok {
render.Error(w, errorsV2.Newf(errorsV2.TypeUnauthenticated, errorsV2.CodeUnauthenticated, "unauthenticated"))
return
}
preference, apiErr := preferences.GetAllUserPreferences(
r.Context(), claims.OrgID, claims.UserID,
)
if apiErr != nil {
RespondError(w, apiErr, nil)
return
}
aH.Respond(w, preference)
} }
func (aH *APIHandler) getOrgPreference( func (aH *APIHandler) getOrgPreference(
w http.ResponseWriter, r *http.Request, w http.ResponseWriter, r *http.Request,
) { ) {
-preferenceId := mux.Vars(r)["preferenceId"]
+aH.Preference.GetOrgPreference(w, r)
claims, ok := authtypes.ClaimsFromContext(r.Context())
if !ok {
render.Error(w, errorsV2.Newf(errorsV2.TypeUnauthenticated, errorsV2.CodeUnauthenticated, "unauthenticated"))
return
}
preference, apiErr := preferences.GetOrgPreference(
r.Context(), preferenceId, claims.OrgID,
)
if apiErr != nil {
RespondError(w, apiErr, nil)
return
}
aH.Respond(w, preference)
} }
func (aH *APIHandler) updateOrgPreference( func (aH *APIHandler) updateOrgPreference(
w http.ResponseWriter, r *http.Request, w http.ResponseWriter, r *http.Request,
) { ) {
-preferenceId := mux.Vars(r)["preferenceId"]
+aH.Preference.UpdateOrgPreference(w, r)
req := preferences.UpdatePreference{}
claims, ok := authtypes.ClaimsFromContext(r.Context())
if !ok {
render.Error(w, errorsV2.Newf(errorsV2.TypeUnauthenticated, errorsV2.CodeUnauthenticated, "unauthenticated"))
return
}
err := json.NewDecoder(r.Body).Decode(&req)
if err != nil {
RespondError(w, model.BadRequest(err), nil)
return
}
preference, apiErr := preferences.UpdateOrgPreference(r.Context(), preferenceId, req.PreferenceValue, claims.OrgID)
if apiErr != nil {
RespondError(w, apiErr, nil)
return
}
aH.Respond(w, preference)
} }
func (aH *APIHandler) getAllOrgPreferences( func (aH *APIHandler) getAllOrgPreferences(
w http.ResponseWriter, r *http.Request, w http.ResponseWriter, r *http.Request,
) { ) {
-claims, ok := authtypes.ClaimsFromContext(r.Context())
+aH.Preference.GetAllOrgPreferences(w, r)
if !ok {
render.Error(w, errorsV2.Newf(errorsV2.TypeUnauthenticated, errorsV2.CodeUnauthenticated, "unauthenticated"))
return
}
preference, apiErr := preferences.GetAllOrgPreferences(
r.Context(), claims.OrgID,
)
if apiErr != nil {
RespondError(w, apiErr, nil)
return
}
aH.Respond(w, preference)
} }
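All six preference handlers collapse into one-line delegations to the new multi-tenant preference module, so claims parsing, validation and storage live behind pkg/modules/preference instead of the HTTP layer. A sketch of that delegation shape, assuming only the handler-style methods visible in the calls above:

package example

import "net/http"

// PreferenceAPI restates the handler-level surface the APIHandler now delegates to.
// The method set is inferred from the calls above; the real preference.API may expose more.
type PreferenceAPI interface {
	GetOrgPreference(http.ResponseWriter, *http.Request)
	UpdateOrgPreference(http.ResponseWriter, *http.Request)
	GetAllOrgPreferences(http.ResponseWriter, *http.Request)
	GetUserPreference(http.ResponseWriter, *http.Request)
	UpdateUserPreference(http.ResponseWriter, *http.Request)
	GetAllUserPreferences(http.ResponseWriter, *http.Request)
}

// handler shows the thin-wrapper pattern: the HTTP layer only forwards the request.
type handler struct {
	preference PreferenceAPI
}

func (h *handler) getUserPreference(w http.ResponseWriter, r *http.Request) {
	h.preference.GetUserPreference(w, r)
}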
// RegisterIntegrationRoutes Registers all Integrations // RegisterIntegrationRoutes Registers all Integrations
@@ -3812,9 +3736,14 @@ func (aH *APIHandler) InstallIntegration(
RespondError(w, model.BadRequest(err), nil) RespondError(w, model.BadRequest(err), nil)
return return
} }
claims, ok := authtypes.ClaimsFromContext(r.Context())
if !ok {
RespondError(w, model.UnauthorizedError(errors.New("unauthorized")), nil)
return
}
integration, apiErr := aH.IntegrationsController.Install( integration, apiErr := aH.IntegrationsController.Install(
-r.Context(), &req,
+r.Context(), claims.OrgID, &req,
) )
if apiErr != nil { if apiErr != nil {
RespondError(w, apiErr, nil) RespondError(w, apiErr, nil)
@@ -3834,8 +3763,13 @@ func (aH *APIHandler) UninstallIntegration(
RespondError(w, model.BadRequest(err), nil) RespondError(w, model.BadRequest(err), nil)
return return
} }
claims, ok := authtypes.ClaimsFromContext(r.Context())
if !ok {
RespondError(w, model.UnauthorizedError(errors.New("unauthorized")), nil)
return
}
-apiErr := aH.IntegrationsController.Uninstall(r.Context(), &req)
+apiErr := aH.IntegrationsController.Uninstall(r.Context(), claims.OrgID, &req)
if apiErr != nil { if apiErr != nil {
RespondError(w, apiErr, nil) RespondError(w, apiErr, nil)
return return
@@ -4494,7 +4428,7 @@ func (aH *APIHandler) listLogsPipelines(ctx context.Context, orgID string) (
) { ) {
// get lateset agent config // get lateset agent config
latestVersion := -1 latestVersion := -1
-lastestConfig, err := agentConf.GetLatestVersion(ctx, logPipelines)
+lastestConfig, err := agentConf.GetLatestVersion(ctx, orgID, logPipelines)
if err != nil && err.Type() != model.ErrorNotFound { if err != nil && err.Type() != model.ErrorNotFound {
return nil, model.WrapApiError(err, "failed to get latest agent config version") return nil, model.WrapApiError(err, "failed to get latest agent config version")
} }
@@ -4503,14 +4437,14 @@ func (aH *APIHandler) listLogsPipelines(ctx context.Context, orgID string) (
latestVersion = lastestConfig.Version latestVersion = lastestConfig.Version
} }
-payload, err := aH.LogsParsingPipelineController.GetPipelinesByVersion(ctx, latestVersion)
+payload, err := aH.LogsParsingPipelineController.GetPipelinesByVersion(ctx, orgID, latestVersion)
if err != nil { if err != nil {
return nil, model.WrapApiError(err, "failed to get pipelines") return nil, model.WrapApiError(err, "failed to get pipelines")
} }
// todo(Nitya): make a new API for history pagination // todo(Nitya): make a new API for history pagination
limit := 10 limit := 10
-history, err := agentConf.GetConfigHistory(ctx, logPipelines, limit)
+history, err := agentConf.GetConfigHistory(ctx, orgID, logPipelines, limit)
if err != nil { if err != nil {
return nil, model.WrapApiError(err, "failed to get config history") return nil, model.WrapApiError(err, "failed to get config history")
} }
@@ -4522,14 +4456,14 @@ func (aH *APIHandler) listLogsPipelines(ctx context.Context, orgID string) (
func (aH *APIHandler) listLogsPipelinesByVersion(ctx context.Context, orgID string, version int) ( func (aH *APIHandler) listLogsPipelinesByVersion(ctx context.Context, orgID string, version int) (
*logparsingpipeline.PipelinesResponse, *model.ApiError, *logparsingpipeline.PipelinesResponse, *model.ApiError,
) { ) {
-payload, err := aH.LogsParsingPipelineController.GetPipelinesByVersion(ctx, version)
+payload, err := aH.LogsParsingPipelineController.GetPipelinesByVersion(ctx, orgID, version)
if err != nil { if err != nil {
return nil, model.WrapApiError(err, "failed to get pipelines by version") return nil, model.WrapApiError(err, "failed to get pipelines by version")
} }
// todo(Nitya): make a new API for history pagination // todo(Nitya): make a new API for history pagination
limit := 10 limit := 10
-history, err := agentConf.GetConfigHistory(ctx, logPipelines, limit)
+history, err := agentConf.GetConfigHistory(ctx, orgID, logPipelines, limit)
if err != nil { if err != nil {
return nil, model.WrapApiError(err, "failed to retrieve agent config history") return nil, model.WrapApiError(err, "failed to retrieve agent config history")
} }


@@ -87,7 +87,7 @@ type InstallIntegrationRequest struct {
} }
func (c *Controller) Install( func (c *Controller) Install(
-ctx context.Context, req *InstallIntegrationRequest,
+ctx context.Context, orgId string, req *InstallIntegrationRequest,
) (*IntegrationsListItem, *model.ApiError) { ) (*IntegrationsListItem, *model.ApiError) {
res, apiErr := c.mgr.InstallIntegration( res, apiErr := c.mgr.InstallIntegration(
ctx, req.IntegrationId, req.Config, ctx, req.IntegrationId, req.Config,
@@ -104,7 +104,7 @@ type UninstallIntegrationRequest struct {
} }
func (c *Controller) Uninstall( func (c *Controller) Uninstall(
-ctx context.Context, req *UninstallIntegrationRequest,
+ctx context.Context, orgId string, req *UninstallIntegrationRequest,
) *model.ApiError { ) *model.ApiError {
if len(req.IntegrationId) < 1 { if len(req.IntegrationId) < 1 {
return model.BadRequest(fmt.Errorf( return model.BadRequest(fmt.Errorf(


@@ -12,6 +12,7 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/model" "github.com/SigNoz/signoz/pkg/query-service/model"
"github.com/SigNoz/signoz/pkg/query-service/utils" "github.com/SigNoz/signoz/pkg/query-service/utils"
"github.com/SigNoz/signoz/pkg/sqlstore" "github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/authtypes" "github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/types/pipelinetypes" "github.com/SigNoz/signoz/pkg/types/pipelinetypes"
"github.com/google/uuid" "github.com/google/uuid"
@@ -39,10 +40,10 @@ func NewLogParsingPipelinesController(
// PipelinesResponse is used to prepare http response for pipelines config related requests // PipelinesResponse is used to prepare http response for pipelines config related requests
type PipelinesResponse struct { type PipelinesResponse struct {
-*agentConf.ConfigVersion
+*types.AgentConfigVersion
Pipelines []pipelinetypes.GettablePipeline `json:"pipelines"`
-History []agentConf.ConfigVersion `json:"history"`
+History []types.AgentConfigVersion `json:"history"`
} }
// ApplyPipelines stores new or changed pipelines and initiates a new config update // ApplyPipelines stores new or changed pipelines and initiates a new config update
@@ -86,12 +87,12 @@ func (ic *LogParsingPipelineController) ApplyPipelines(
} }
// prepare config by calling gen func // prepare config by calling gen func
-cfg, err := agentConf.StartNewVersion(ctx, claims.UserID, agentConf.ElementTypeLogPipelines, elements)
+cfg, err := agentConf.StartNewVersion(ctx, claims.OrgID, claims.UserID, types.ElementTypeLogPipelines, elements)
if err != nil || cfg == nil { if err != nil || cfg == nil {
return nil, err return nil, err
} }
-return ic.GetPipelinesByVersion(ctx, cfg.Version)
+return ic.GetPipelinesByVersion(ctx, claims.OrgID, cfg.Version)
} }
func (ic *LogParsingPipelineController) ValidatePipelines( func (ic *LogParsingPipelineController) ValidatePipelines(
@@ -138,20 +139,12 @@ func (ic *LogParsingPipelineController) ValidatePipelines(
// Returns effective list of pipelines including user created // Returns effective list of pipelines including user created
// pipelines and pipelines for installed integrations // pipelines and pipelines for installed integrations
func (ic *LogParsingPipelineController) getEffectivePipelinesByVersion( func (ic *LogParsingPipelineController) getEffectivePipelinesByVersion(
-ctx context.Context, version int,
+ctx context.Context, orgID string, version int,
) ([]pipelinetypes.GettablePipeline, *model.ApiError) { ) ([]pipelinetypes.GettablePipeline, *model.ApiError) {
result := []pipelinetypes.GettablePipeline{} result := []pipelinetypes.GettablePipeline{}
// todo(nitya): remove this once we fix agents in multitenancy
defaultOrgID, err := ic.GetDefaultOrgID(ctx)
if err != nil {
return nil, model.WrapApiError(err, "failed to get default org ID")
}
fmt.Println("defaultOrgID", defaultOrgID)
if version >= 0 { if version >= 0 {
-savedPipelines, errors := ic.getPipelinesByVersion(ctx, defaultOrgID, version)
+savedPipelines, errors := ic.getPipelinesByVersion(ctx, orgID, version)
if errors != nil { if errors != nil {
zap.L().Error("failed to get pipelines for version", zap.Int("version", version), zap.Errors("errors", errors)) zap.L().Error("failed to get pipelines for version", zap.Int("version", version), zap.Errors("errors", errors))
return nil, model.InternalError(fmt.Errorf("failed to get pipelines for given version %v", errors)) return nil, model.InternalError(fmt.Errorf("failed to get pipelines for given version %v", errors))
@@ -203,18 +196,18 @@ func (ic *LogParsingPipelineController) getEffectivePipelinesByVersion(
// GetPipelinesByVersion responds with version info and associated pipelines // GetPipelinesByVersion responds with version info and associated pipelines
func (ic *LogParsingPipelineController) GetPipelinesByVersion( func (ic *LogParsingPipelineController) GetPipelinesByVersion(
-ctx context.Context, version int,
+ctx context.Context, orgId string, version int,
) (*PipelinesResponse, *model.ApiError) { ) (*PipelinesResponse, *model.ApiError) {
-pipelines, errors := ic.getEffectivePipelinesByVersion(ctx, version)
+pipelines, errors := ic.getEffectivePipelinesByVersion(ctx, orgId, version)
if errors != nil { if errors != nil {
zap.L().Error("failed to get pipelines for version", zap.Int("version", version), zap.Error(errors)) zap.L().Error("failed to get pipelines for version", zap.Int("version", version), zap.Error(errors))
return nil, model.InternalError(fmt.Errorf("failed to get pipelines for given version %v", errors)) return nil, model.InternalError(fmt.Errorf("failed to get pipelines for given version %v", errors))
} }
-var configVersion *agentConf.ConfigVersion
+var configVersion *types.AgentConfigVersion
if version >= 0 { if version >= 0 {
-cv, err := agentConf.GetConfigVersion(ctx, agentConf.ElementTypeLogPipelines, version)
+cv, err := agentConf.GetConfigVersion(ctx, orgId, types.ElementTypeLogPipelines, version)
if err != nil { if err != nil {
zap.L().Error("failed to get config for version", zap.Int("version", version), zap.Error(err)) zap.L().Error("failed to get config for version", zap.Int("version", version), zap.Error(err))
return nil, model.WrapApiError(err, "failed to get config for given version") return nil, model.WrapApiError(err, "failed to get config for given version")
@@ -223,8 +216,8 @@ func (ic *LogParsingPipelineController) GetPipelinesByVersion(
} }
return &PipelinesResponse{ return &PipelinesResponse{
-ConfigVersion: configVersion,
+AgentConfigVersion: configVersion,
Pipelines: pipelines, Pipelines: pipelines,
}, nil }, nil
} }
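PipelinesResponse now embeds *types.AgentConfigVersion instead of the old agentConf.ConfigVersion. Because the embedded field carries no JSON tag, its fields are still flattened into the response object; a self-contained stand-in (field set is illustrative) demonstrates the behaviour:

package main

import (
	"encoding/json"
	"fmt"
)

// AgentConfigVersion stands in for types.AgentConfigVersion; only the embedding
// behaviour is illustrated here, not the real field set.
type AgentConfigVersion struct {
	Version int `json:"version"`
}

type PipelinesResponse struct {
	*AgentConfigVersion
	Pipelines []string `json:"pipelines"`
}

func main() {
	out, _ := json.Marshal(PipelinesResponse{
		AgentConfigVersion: &AgentConfigVersion{Version: 3},
		Pipelines:          []string{"nginx"},
	})
	fmt.Println(string(out)) // {"version":3,"pipelines":["nginx"]}
}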
@@ -263,8 +256,9 @@ func (pc *LogParsingPipelineController) AgentFeatureType() agentConf.AgentFeatur
// Implements agentConf.AgentFeature interface. // Implements agentConf.AgentFeature interface.
func (pc *LogParsingPipelineController) RecommendAgentConfig(
+orgId string,
currentConfYaml []byte,
-configVersion *agentConf.ConfigVersion,
+configVersion *types.AgentConfigVersion,
) ( ) (
recommendedConfYaml []byte, recommendedConfYaml []byte,
serializedSettingsUsed string, serializedSettingsUsed string,
@@ -276,7 +270,7 @@ func (pc *LogParsingPipelineController) RecommendAgentConfig(
} }
pipelinesResp, apiErr := pc.GetPipelinesByVersion( pipelinesResp, apiErr := pc.GetPipelinesByVersion(
-context.Background(), pipelinesVersion,
+context.Background(), orgId, pipelinesVersion,
) )
if apiErr != nil { if apiErr != nil {
return nil, "", apiErr return nil, "", apiErr


@@ -129,20 +129,6 @@ func (r *Repo) getPipelinesByVersion(
return gettablePipelines, errors return gettablePipelines, errors
} }
func (r *Repo) GetDefaultOrgID(ctx context.Context) (string, *model.ApiError) {
var orgs []types.Organization
err := r.sqlStore.BunDB().NewSelect().
Model(&orgs).
Scan(ctx)
if err != nil {
return "", model.InternalError(errors.Wrap(err, "failed to get default org ID"))
}
if len(orgs) == 0 {
return "", model.InternalError(errors.New("no orgs found"))
}
return orgs[0].ID, nil
}
// GetPipelines returns pipeline and errors (if any) // GetPipelines returns pipeline and errors (if any)
func (r *Repo) GetPipeline( func (r *Repo) GetPipeline(
ctx context.Context, orgID string, id string, ctx context.Context, orgID string, id string,


@@ -6,6 +6,7 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/app/opamp/model" "github.com/SigNoz/signoz/pkg/query-service/app/opamp/model"
"github.com/SigNoz/signoz/pkg/query-service/utils" "github.com/SigNoz/signoz/pkg/query-service/utils"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/knadh/koanf" "github.com/knadh/koanf"
"github.com/knadh/koanf/parsers/yaml" "github.com/knadh/koanf/parsers/yaml"
"github.com/knadh/koanf/providers/rawbytes" "github.com/knadh/koanf/providers/rawbytes"
@@ -21,6 +22,9 @@ func TestOpAMPServerToAgentCommunicationWithConfigProvider(t *testing.T) {
tb := newTestbed(t) tb := newTestbed(t)
orgID, err := utils.GetTestOrgId(tb.sqlStore)
require.Nil(err)
require.Equal( require.Equal(
0, len(tb.testConfigProvider.ConfigUpdateSubscribers), 0, len(tb.testConfigProvider.ConfigUpdateSubscribers),
"there should be no agent config subscribers at the start", "there should be no agent config subscribers at the start",
@@ -36,6 +40,7 @@ func TestOpAMPServerToAgentCommunicationWithConfigProvider(t *testing.T) {
require.False(tb.testConfigProvider.HasRecommendations()) require.False(tb.testConfigProvider.HasRecommendations())
agent1Conn := &MockOpAmpConnection{} agent1Conn := &MockOpAmpConnection{}
agent1Id := "testAgent1" agent1Id := "testAgent1"
// get orgId from the db
tb.opampServer.OnMessage( tb.opampServer.OnMessage(
agent1Conn, agent1Conn,
&protobufs.AgentToServer{ &protobufs.AgentToServer{
@@ -43,6 +48,16 @@ func TestOpAMPServerToAgentCommunicationWithConfigProvider(t *testing.T) {
EffectiveConfig: &protobufs.EffectiveConfig{ EffectiveConfig: &protobufs.EffectiveConfig{
ConfigMap: initialAgentConf(), ConfigMap: initialAgentConf(),
}, },
AgentDescription: &protobufs.AgentDescription{
IdentifyingAttributes: []*protobufs.KeyValue{
{
Key: "orgId",
Value: &protobufs.AnyValue{
Value: &protobufs.AnyValue_StringValue{StringValue: orgID},
},
},
},
},
}, },
) )
lastAgent1Msg := agent1Conn.LatestMsgFromServer() lastAgent1Msg := agent1Conn.LatestMsgFromServer()
@@ -66,6 +81,16 @@ func TestOpAMPServerToAgentCommunicationWithConfigProvider(t *testing.T) {
EffectiveConfig: &protobufs.EffectiveConfig{ EffectiveConfig: &protobufs.EffectiveConfig{
ConfigMap: initialAgentConf(), ConfigMap: initialAgentConf(),
}, },
AgentDescription: &protobufs.AgentDescription{
IdentifyingAttributes: []*protobufs.KeyValue{
{
Key: "orgId",
Value: &protobufs.AnyValue{
Value: &protobufs.AnyValue_StringValue{StringValue: orgID},
},
},
},
},
}, },
) )
lastAgent2Msg := agent2Conn.LatestMsgFromServer() lastAgent2Msg := agent2Conn.LatestMsgFromServer()
@@ -162,22 +187,26 @@ type testbed struct {
testConfigProvider *MockAgentConfigProvider testConfigProvider *MockAgentConfigProvider
opampServer *Server opampServer *Server
t *testing.T t *testing.T
sqlStore sqlstore.SQLStore
} }
func newTestbed(t *testing.T) *testbed { func newTestbed(t *testing.T) *testbed {
testDB := utils.NewQueryServiceDBForTests(t)
-_, err := model.InitDB(testDB.SQLxDB())
-if err != nil {
-t.Fatalf("could not init opamp model: %v", err)
-}
+model.InitDB(testDB)
testConfigProvider := NewMockAgentConfigProvider()
opampServer := InitializeServer(nil, testConfigProvider)
+// create a test org
+err := utils.CreateTestOrg(t, testDB)
+if err != nil {
+t.Fatalf("could not create test org: %v", err)
+}
return &testbed{
testConfigProvider: testConfigProvider,
opampServer: opampServer,
t: t,
+sqlStore: testDB,
}
}
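The opamp testbed now provisions an organization and keeps the sqlstore handle so tests can attach the org ID to agent messages. A condensed sketch of that setup, assuming the test helpers used in this diff (NewQueryServiceDBForTests, CreateTestOrg, GetTestOrgId):

package opamp

import (
	"testing"

	"github.com/SigNoz/signoz/pkg/query-service/app/opamp/model"
	"github.com/SigNoz/signoz/pkg/query-service/utils"
	"github.com/stretchr/testify/require"
)

// TestOrgScopedTestbed is an illustrative condensation of the testbed changes above:
// the sqlstore-backed DB is initialised, a test org is created, and its ID is what
// later gets sent as the agent's "orgId" identifying attribute.
func TestOrgScopedTestbed(t *testing.T) {
	testDB := utils.NewQueryServiceDBForTests(t)
	model.InitDB(testDB)

	if err := utils.CreateTestOrg(t, testDB); err != nil {
		t.Fatalf("could not create test org: %v", err)
	}

	orgID, err := utils.GetTestOrgId(testDB)
	require.Nil(t, err)
	require.NotEmpty(t, orgID)
}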


@@ -60,13 +60,13 @@ func UpsertControlProcessors(
agenthash, err := addIngestionControlToAgent(agent, signal, processors, false) agenthash, err := addIngestionControlToAgent(agent, signal, processors, false)
if err != nil { if err != nil {
zap.L().Error("failed to push ingestion rules config to agent", zap.String("agentID", agent.ID), zap.Error(err)) zap.L().Error("failed to push ingestion rules config to agent", zap.String("agentID", agent.ID.StringValue()), zap.Error(err))
continue continue
} }
if agenthash != "" { if agenthash != "" {
// subscribe callback // subscribe callback
-model.ListenToConfigUpdate(agent.ID, agenthash, callback)
+model.ListenToConfigUpdate(agent.ID.StringValue(), agenthash, callback)
} }
hash = agenthash hash = agenthash
@@ -89,7 +89,7 @@ func addIngestionControlToAgent(agent *model.Agent, signal string, processors ma
// add ingestion control spec // add ingestion control spec
err = makeIngestionControlSpec(agentConf, Signal(signal), processors) err = makeIngestionControlSpec(agentConf, Signal(signal), processors)
if err != nil { if err != nil {
zap.L().Error("failed to prepare ingestion control processors for agent", zap.String("agentID", agent.ID), zap.Error(err)) zap.L().Error("failed to prepare ingestion control processors for agent", zap.String("agentID", agent.ID.StringValue()), zap.Error(err))
return confHash, err return confHash, err
} }


@@ -67,7 +67,7 @@ func (ta *MockAgentConfigProvider) HasRecommendations() bool {
} }
// AgentConfigProvider interface // AgentConfigProvider interface
-func (ta *MockAgentConfigProvider) RecommendAgentConfig(baseConfYaml []byte) (
+func (ta *MockAgentConfigProvider) RecommendAgentConfig(orgId string, baseConfYaml []byte) (
[]byte, string, error, []byte, string, error,
) { ) {
if len(ta.ZPagesEndpoint) < 1 { if len(ta.ZPagesEndpoint) < 1 {
@@ -92,6 +92,7 @@ func (ta *MockAgentConfigProvider) RecommendAgentConfig(baseConfYaml []byte) (
// AgentConfigProvider interface
func (ta *MockAgentConfigProvider) ReportConfigDeploymentStatus(
+orgId string,
agentId string,
configId string,
err error,


@@ -7,33 +7,20 @@ import (
"sync" "sync"
"time" "time"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/valuer"
"go.uber.org/zap" "go.uber.org/zap"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"github.com/open-telemetry/opamp-go/protobufs" "github.com/open-telemetry/opamp-go/protobufs"
"github.com/open-telemetry/opamp-go/server/types" opampTypes "github.com/open-telemetry/opamp-go/server/types"
) )
type AgentStatus int
const (
AgentStatusUnknown AgentStatus = iota
AgentStatusConnected
AgentStatusDisconnected
)
// set in agent description when agent is capable of supporting
// lb exporter configuration. values: 1 (true) or 0 (false)
const lbExporterFlag = "capabilities.lbexporter"
type Agent struct { type Agent struct {
ID string `json:"agentId" yaml:"agentId" db:"agent_id"` types.StorableAgent
StartedAt time.Time `json:"startedAt" yaml:"startedAt" db:"started_at"` remoteConfig *protobufs.AgentRemoteConfig
TerminatedAt time.Time `json:"terminatedAt" yaml:"terminatedAt" db:"terminated_at"` Status *protobufs.AgentToServer
EffectiveConfig string `json:"effectiveConfig" yaml:"effectiveConfig" db:"effective_config"`
CurrentStatus AgentStatus `json:"currentStatus" yaml:"currentStatus" db:"current_status"`
remoteConfig *protobufs.AgentRemoteConfig
Status *protobufs.AgentToServer
// can this agent be load balancer // can this agent be load balancer
CanLB bool CanLB bool
@@ -41,13 +28,18 @@ type Agent struct {
// is this agent setup as load balancer
IsLb bool
-conn types.Connection
+conn opampTypes.Connection
connMutex sync.Mutex
mux sync.RWMutex
+store sqlstore.SQLStore
}
-func New(ID string, conn types.Connection) *Agent {
-return &Agent{ID: ID, StartedAt: time.Now(), CurrentStatus: AgentStatusConnected, conn: conn}
+// set in agent description when agent is capable of supporting
+// lb exporter configuration. values: 1 (true) or 0 (false)
+const lbExporterFlag = "capabilities.lbexporter"
+func New(store sqlstore.SQLStore, orgID string, ID string, conn opampTypes.Connection) *Agent {
+return &Agent{StorableAgent: types.StorableAgent{OrgID: orgID, Identifiable: types.Identifiable{ID: valuer.GenerateUUID()}, StartedAt: time.Now(), CurrentStatus: types.AgentStatusConnected}, conn: conn, store: store}
}
// Upsert inserts or updates the agent in the database. // Upsert inserts or updates the agent in the database.
@@ -55,17 +47,13 @@ func (agent *Agent) Upsert() error {
agent.mux.Lock()
defer agent.mux.Unlock()
-_, err := db.NamedExec(`INSERT OR REPLACE INTO agents (
-agent_id,
-started_at,
-effective_config,
-current_status
-) VALUES (
-:agent_id,
-:started_at,
-:effective_config,
-:current_status
-)`, agent)
+_, err := agent.store.BunDB().NewInsert().
+Model(&agent.StorableAgent).
+On("CONFLICT (org_id, id) DO UPDATE").
+Set("started_at = EXCLUDED.started_at").
+Set("effective_config = EXCLUDED.effective_config").
+Set("current_status = EXCLUDED.current_status").
+Exec(context.Background())
if err != nil {
return err
}
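The raw INSERT OR REPLACE statement is replaced by a bun insert that upserts on the (org_id, id) composite key. A standalone sketch of the same ON CONFLICT pattern; the model below is a reduced stand-in for types.StorableAgent, not its real definition:

package example

import (
	"context"

	"github.com/uptrace/bun"
)

// storableAgent is a stand-in for types.StorableAgent, reduced to the columns the
// upsert above touches.
type storableAgent struct {
	bun.BaseModel `bun:"table:agents"`

	OrgID           string `bun:"org_id,pk"`
	ID              string `bun:"id,pk"`
	StartedAt       int64  `bun:"started_at"`
	EffectiveConfig string `bun:"effective_config"`
	CurrentStatus   string `bun:"current_status"`
}

// upsertAgent inserts the row or, when (org_id, id) already exists, refreshes the
// mutable columns - the same shape as Agent.Upsert in this diff.
func upsertAgent(ctx context.Context, db *bun.DB, a *storableAgent) error {
	_, err := db.NewInsert().
		Model(a).
		On("CONFLICT (org_id, id) DO UPDATE").
		Set("started_at = EXCLUDED.started_at").
		Set("effective_config = EXCLUDED.effective_config").
		Set("current_status = EXCLUDED.current_status").
		Exec(ctx)
	return err
}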
@@ -135,11 +123,11 @@ func (agent *Agent) updateAgentDescription(newStatus *protobufs.AgentToServer) (
// todo: need to address multiple agent scenario here // todo: need to address multiple agent scenario here
// for now, the first response will be sent back to the UI // for now, the first response will be sent back to the UI
if agent.Status.RemoteConfigStatus.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED { if agent.Status.RemoteConfigStatus.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED {
-onConfigSuccess(agent.ID, string(agent.Status.RemoteConfigStatus.LastRemoteConfigHash))
+onConfigSuccess(agent.OrgID, agent.ID.StringValue(), string(agent.Status.RemoteConfigStatus.LastRemoteConfigHash))
} }
if agent.Status.RemoteConfigStatus.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED { if agent.Status.RemoteConfigStatus.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED {
-onConfigFailure(agent.ID, string(agent.Status.RemoteConfigStatus.LastRemoteConfigHash), agent.Status.RemoteConfigStatus.ErrorMessage)
+onConfigFailure(agent.OrgID, agent.ID.StringValue(), string(agent.Status.RemoteConfigStatus.LastRemoteConfigHash), agent.Status.RemoteConfigStatus.ErrorMessage)
} }
} }
} }
@@ -269,7 +257,7 @@ func (agent *Agent) processStatusUpdate(
agent.SendToAgent(response) agent.SendToAgent(response)
ListenToConfigUpdate( ListenToConfigUpdate(
-agent.ID,
+agent.ID.StringValue(),
string(response.RemoteConfig.ConfigHash), string(response.RemoteConfig.ConfigHash),
configProvider.ReportConfigDeploymentStatus, configProvider.ReportConfigDeploymentStatus,
) )
@@ -277,9 +265,9 @@ func (agent *Agent) processStatusUpdate(
} }
func (agent *Agent) updateRemoteConfig(configProvider AgentConfigProvider) bool { func (agent *Agent) updateRemoteConfig(configProvider AgentConfigProvider) bool {
-recommendedConfig, confId, err := configProvider.RecommendAgentConfig([]byte(agent.EffectiveConfig))
+recommendedConfig, confId, err := configProvider.RecommendAgentConfig(agent.OrgID, []byte(agent.EffectiveConfig))
if err != nil {
-zap.L().Error("could not generate config recommendation for agent", zap.String("agentID", agent.ID), zap.Error(err))
+zap.L().Error("could not generate config recommendation for agent", zap.String("agentID", agent.ID.StringValue()), zap.Error(err))
return false return false
} }


@@ -1,19 +1,19 @@
package model
import (
+"context"
"fmt"
"sync"
"time"
-"github.com/jmoiron/sqlx"
+"github.com/SigNoz/signoz/pkg/sqlstore"
+signozTypes "github.com/SigNoz/signoz/pkg/types"
"github.com/open-telemetry/opamp-go/protobufs"
"github.com/open-telemetry/opamp-go/server/types"
"github.com/pkg/errors"
"go.uber.org/zap"
)
-var db *sqlx.DB
var AllAgents = Agents{
agentsById: map[string]*Agent{},
connections: map[types.Connection]map[string]bool{},
@@ -23,6 +23,7 @@ type Agents struct {
mux sync.RWMutex
agentsById map[string]*Agent
connections map[types.Connection]map[string]bool
+store sqlstore.SQLStore
}
func (a *Agents) Count() int { func (a *Agents) Count() int {
@@ -30,15 +31,14 @@ func (a *Agents) Count() int {
} }
// Initialize the database and create schema if needed
-func InitDB(qsDB *sqlx.DB) (*sqlx.DB, error) {
+func InitDB(sqlStore sqlstore.SQLStore) {
-db = qsDB
AllAgents = Agents{
agentsById: make(map[string]*Agent),
connections: make(map[types.Connection]map[string]bool),
mux: sync.RWMutex{},
+store: sqlStore,
}
-return db, nil
} }
// RemoveConnection removes the connection all Agent instances associated with the // RemoveConnection removes the connection all Agent instances associated with the
@@ -49,7 +49,7 @@ func (agents *Agents) RemoveConnection(conn types.Connection) {
for instanceId := range agents.connections[conn] { for instanceId := range agents.connections[conn] {
agent := agents.agentsById[instanceId] agent := agents.agentsById[instanceId]
-agent.CurrentStatus = AgentStatusDisconnected
+agent.CurrentStatus = signozTypes.AgentStatusDisconnected
agent.TerminatedAt = time.Now() agent.TerminatedAt = time.Now()
_ = agent.Upsert() _ = agent.Upsert()
delete(agents.agentsById, instanceId) delete(agents.agentsById, instanceId)
@@ -67,27 +67,43 @@ func (agents *Agents) FindAgent(agentID string) *Agent {
// FindOrCreateAgent returns the Agent instance associated with the given agentID.
// If the Agent instance does not exist, it is created and added to the list of
// Agent instances.
-func (agents *Agents) FindOrCreateAgent(agentID string, conn types.Connection) (*Agent, bool, error) {
+func (agents *Agents) FindOrCreateAgent(agentID string, conn types.Connection, orgId string) (*Agent, bool, error) {
agents.mux.Lock()
defer agents.mux.Unlock()
-var created bool
agent, ok := agents.agentsById[agentID]
-var err error
-if !ok || agent == nil {
-agent = New(agentID, conn)
-err = agent.Upsert()
-if err != nil {
-return nil, created, err
-}
-agents.agentsById[agentID] = agent
-if agents.connections[conn] == nil {
-agents.connections[conn] = map[string]bool{}
-}
-agents.connections[conn][agentID] = true
-created = true
-}
-return agent, created, nil
+if ok && agent != nil {
+return agent, false, nil
+}
+// This is for single org mode
+if orgId == "SIGNOZ##DEFAULT##ORG##ID" {
+err := agents.store.BunDB().NewSelect().
+Model((*signozTypes.Organization)(nil)).
+ColumnExpr("id").
+Limit(1).
+Scan(context.Background(), &orgId)
+if err != nil {
+return nil, false, err
+}
+}
+if !ok && orgId == "" {
+return nil, false, errors.New("cannot create agent without orgId")
+}
+agent = New(agents.store, orgId, agentID, conn)
+err := agent.Upsert()
+if err != nil {
+return nil, false, err
+}
+agents.agentsById[agentID] = agent
+if agents.connections[conn] == nil {
+agents.connections[conn] = map[string]bool{}
+}
+agents.connections[conn][agentID] = true
+return agent, true, nil
}
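FindOrCreateAgent now requires an org: in single-tenant mode the OpAMP layer passes the sentinel value and the first organization row is used instead. A sketch of just that resolution step; the Organization model and table name below are stand-ins:

package example

import (
	"context"

	"github.com/uptrace/bun"
)

// defaultOrgSentinel is the placeholder passed in single-tenant mode (value taken
// from the diff above).
const defaultOrgSentinel = "SIGNOZ##DEFAULT##ORG##ID"

// organization is a stand-in for types.Organization; only the id column matters here.
type organization struct {
	bun.BaseModel `bun:"table:organizations"`

	ID string `bun:"id,pk"`
}

// resolveOrgID turns the single-tenant sentinel into the one real org ID in the
// database, which is what FindOrCreateAgent does inline before creating an agent.
func resolveOrgID(ctx context.Context, db *bun.DB, orgID string) (string, error) {
	if orgID != defaultOrgSentinel {
		return orgID, nil
	}
	err := db.NewSelect().
		Model((*organization)(nil)).
		ColumnExpr("id").
		Limit(1).
		Scan(ctx, &orgID)
	return orgID, err
}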
func (agents *Agents) GetAllAgents() []*Agent { func (agents *Agents) GetAllAgents() []*Agent {
@@ -108,18 +124,19 @@ func (agents *Agents) RecommendLatestConfigToAll(
) error { ) error {
for _, agent := range agents.GetAllAgents() { for _, agent := range agents.GetAllAgents() {
newConfig, confId, err := provider.RecommendAgentConfig(
+agent.OrgID,
[]byte(agent.EffectiveConfig), []byte(agent.EffectiveConfig),
) )
if err != nil { if err != nil {
return errors.Wrap(err, fmt.Sprintf( return errors.Wrap(err, fmt.Sprintf(
"could not generate conf recommendation for %v", agent.ID, "could not generate conf recommendation for %v", agent.ID.StringValue(),
)) ))
} }
// Recommendation is same as current config // Recommendation is same as current config
if string(newConfig) == agent.EffectiveConfig { if string(newConfig) == agent.EffectiveConfig {
zap.L().Info( zap.L().Info(
"Recommended config same as current effective config for agent", zap.String("agentID", agent.ID), "Recommended config same as current effective config for agent", zap.String("agentID", agent.ID.StringValue()),
) )
return nil return nil
} }
@@ -144,7 +161,7 @@ func (agents *Agents) RecommendLatestConfigToAll(
RemoteConfig: newRemoteConfig, RemoteConfig: newRemoteConfig,
}) })
-ListenToConfigUpdate(agent.ID, confId, provider.ReportConfigDeploymentStatus)
+ListenToConfigUpdate(agent.ID.StringValue(), confId, provider.ReportConfigDeploymentStatus)
} }
return nil return nil
} }


@@ -4,7 +4,7 @@ package model
type AgentConfigProvider interface {
// Generate recommended config for an agent based on its `currentConfYaml`
// and current state of user facing settings for agent based features.
-RecommendAgentConfig(currentConfYaml []byte) (
+RecommendAgentConfig(orgId string, currentConfYaml []byte) (
recommendedConfYaml []byte,
// Opaque id of the recommended config, used for reporting deployment status updates
configId string,
@@ -13,6 +13,7 @@ type AgentConfigProvider interface {
// Report deployment status for config recommendations generated by RecommendAgentConfig
ReportConfigDeploymentStatus(
+orgId string,
agentId string,
configId string,
err error,
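Both provider methods now carry the orgId of the agent they are serving. A minimal no-op implementation of the two methods visible in this hunk (the real model.AgentConfigProvider interface also has other methods, e.g. the config-update subscription used by the server):

package example

// agentConfigProvider restates only the two methods visible in this hunk; the real
// interface in pkg/query-service/app/opamp/model is larger.
type agentConfigProvider interface {
	RecommendAgentConfig(orgId string, currentConfYaml []byte) (recommendedConfYaml []byte, configId string, err error)
	ReportConfigDeploymentStatus(orgId string, agentId string, configId string, err error)
}

// passthroughProvider is an illustrative implementation that recommends the agent's
// current config unchanged, regardless of org.
type passthroughProvider struct{}

func (passthroughProvider) RecommendAgentConfig(orgId string, currentConfYaml []byte) ([]byte, string, error) {
	return currentConfYaml, "unchanged", nil
}

func (passthroughProvider) ReportConfigDeploymentStatus(orgId string, agentId string, configId string, err error) {
	// a real provider would record per-org deployment status here
}

var _ agentConfigProvider = passthroughProvider{}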


@@ -15,7 +15,7 @@ func init() {
} }
} }
-type OnChangeCallback func(agentId string, hash string, err error)
+type OnChangeCallback func(orgId string, agentId string, hash string, err error)
// responsible for managing subscribers on config change // responsible for managing subscribers on config change
type Coordinator struct { type Coordinator struct {
@@ -25,16 +25,16 @@ type Coordinator struct {
subscribers map[string][]OnChangeCallback subscribers map[string][]OnChangeCallback
} }
-func onConfigSuccess(agentId string, hash string) {
+func onConfigSuccess(orgId string, agentId string, hash string) {
-notifySubscribers(agentId, hash, nil)
+notifySubscribers(orgId, agentId, hash, nil)
} }
-func onConfigFailure(agentId string, hash string, errorMessage string) {
+func onConfigFailure(orgId string, agentId string, hash string, errorMessage string) {
-notifySubscribers(agentId, hash, fmt.Errorf(errorMessage))
+notifySubscribers(orgId, agentId, hash, fmt.Errorf(errorMessage))
} }
// OnSuccess listens to config changes and notifies subscribers // OnSuccess listens to config changes and notifies subscribers
-func notifySubscribers(agentId string, hash string, err error) {
+func notifySubscribers(orgId string, agentId string, hash string, err error) {
// this method currently does not handle multi-agent scenario. // this method currently does not handle multi-agent scenario.
// as soon as a message is delivered, we release all the subscribers // as soon as a message is delivered, we release all the subscribers
// for a given hash // for a given hash
@@ -44,7 +44,7 @@ func notifySubscribers(agentId string, hash string, err error) {
} }
for _, s := range subs { for _, s := range subs {
-s(agentId, hash, err)
+s(orgId, agentId, hash, err)
} }
// delete all subscribers for this hash, assume future // delete all subscribers for this hash, assume future
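Config-update subscribers registered through ListenToConfigUpdate now receive the orgId ahead of the agent ID and config hash. An illustrative subscriber with the new signature:

package example

import "log"

// OnChangeCallback mirrors the updated signature above: orgId first, then the
// agent, the config hash, and the deployment error (nil on success).
type OnChangeCallback func(orgId string, agentId string, hash string, err error)

// logDeploymentStatus is an illustrative subscriber that could be passed to
// model.ListenToConfigUpdate (whose call sites are visible elsewhere in this diff).
func logDeploymentStatus(orgId string, agentId string, hash string, err error) {
	if err != nil {
		log.Printf("org %s: agent %s failed to apply config %s: %v", orgId, agentId, hash, err)
		return
	}
	log.Printf("org %s: agent %s applied config %s", orgId, agentId, hash)
}

var _ OnChangeCallback = logDeploymentStatus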


@@ -2,6 +2,7 @@ package opamp
import ( import (
"context" "context"
"time"
model "github.com/SigNoz/signoz/pkg/query-service/app/opamp/model" model "github.com/SigNoz/signoz/pkg/query-service/app/opamp/model"
"github.com/open-telemetry/opamp-go/protobufs" "github.com/open-telemetry/opamp-go/protobufs"
@@ -53,6 +54,7 @@ func (srv *Server) Start(listener string) error {
ListenEndpoint: listener, ListenEndpoint: listener,
} }
// This will have to send request to all the agents of all tenants
unsubscribe := srv.agentConfigProvider.SubscribeToConfigUpdates(func() { unsubscribe := srv.agentConfigProvider.SubscribeToConfigUpdates(func() {
err := srv.agents.RecommendLatestConfigToAll(srv.agentConfigProvider) err := srv.agents.RecommendLatestConfigToAll(srv.agentConfigProvider)
if err != nil { if err != nil {
@@ -78,20 +80,46 @@ func (srv *Server) onDisconnect(conn types.Connection) {
srv.agents.RemoveConnection(conn) srv.agents.RemoveConnection(conn)
} }
// When the agent sends the message for the first time, then we need to know the orgID
// For the subsequent requests, agents don't send the attributes unless something is changed
// but we keep them in context mapped which is mapped to the instanceID, so we would know the
// orgID from the context
func (srv *Server) OnMessage(conn types.Connection, msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
agentID := msg.InstanceUid
-agent, created, err := srv.agents.FindOrCreateAgent(agentID, conn)
+var orgId string
+if msg.AgentDescription != nil && msg.AgentDescription.IdentifyingAttributes != nil {
+for _, attr := range msg.AgentDescription.IdentifyingAttributes {
+if attr.Key == "orgId" {
+orgId = attr.Value.GetStringValue()
+break
+}
+}
+}
+agent, created, err := srv.agents.FindOrCreateAgent(agentID, conn, orgId)
if err != nil {
zap.L().Error("Failed to find or create agent", zap.String("agentID", agentID), zap.Error(err))
-// TODO: handle error
+// Return error response according to OpAMP protocol
+return &protobufs.ServerToAgent{
+InstanceUid: agentID,
+ErrorResponse: &protobufs.ServerErrorResponse{
+Type: protobufs.ServerErrorResponseType_ServerErrorResponseType_Unavailable,
+Details: &protobufs.ServerErrorResponse_RetryInfo{
+RetryInfo: &protobufs.RetryInfo{
+RetryAfterNanoseconds: uint64(5 * time.Second), // minimum recommended retry interval
+},
+},
+},
+}
}
if created {
agent.CanLB = model.ExtractLbFlag(msg.AgentDescription)
zap.L().Debug(
"New agent added", zap.Bool("canLb", agent.CanLB),
-zap.String("ID", agent.ID),
+zap.String("ID", agent.ID.StringValue()),
zap.Any("status", agent.CurrentStatus),
)
}
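OnMessage derives the tenant from the "orgId" identifying attribute in the agent's AgentDescription, so an agent (or a test, as shown earlier) advertises it in its first message. A sketch using the opamp-go protobufs types already imported in these files; it assumes the opamp-go version pinned in this repo, where InstanceUid is a string:

package example

import "github.com/open-telemetry/opamp-go/protobufs"

// firstMessage builds the initial AgentToServer message with the "orgId"
// identifying attribute that OnMessage above looks for. instanceUid and orgID are
// illustrative values supplied by the caller.
func firstMessage(instanceUid string, orgID string) *protobufs.AgentToServer {
	return &protobufs.AgentToServer{
		InstanceUid: instanceUid,
		AgentDescription: &protobufs.AgentDescription{
			IdentifyingAttributes: []*protobufs.KeyValue{
				{
					Key: "orgId",
					Value: &protobufs.AnyValue{
						Value: &protobufs.AnyValue_StringValue{StringValue: orgID},
					},
				},
			},
		},
	}
}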


@@ -1,84 +0,0 @@
package preferences
var preferenceMap = map[string]Preference{
"ORG_ONBOARDING": {
Key: "ORG_ONBOARDING",
Name: "Organisation Onboarding",
Description: "Organisation Onboarding",
ValueType: "boolean",
DefaultValue: false,
AllowedValues: []interface{}{true, false},
IsDiscreteValues: true,
AllowedScopes: []string{"org"},
},
"WELCOME_CHECKLIST_DO_LATER": {
Key: "WELCOME_CHECKLIST_DO_LATER",
Name: "Welcome Checklist Do Later",
Description: "Welcome Checklist Do Later",
ValueType: "boolean",
DefaultValue: false,
AllowedValues: []interface{}{true, false},
IsDiscreteValues: true,
AllowedScopes: []string{"user"},
},
"WELCOME_CHECKLIST_SEND_LOGS_SKIPPED": {
Key: "WELCOME_CHECKLIST_SEND_LOGS_SKIPPED",
Name: "Welcome Checklist Send Logs Skipped",
Description: "Welcome Checklist Send Logs Skipped",
ValueType: "boolean",
DefaultValue: false,
AllowedValues: []interface{}{true, false},
IsDiscreteValues: true,
AllowedScopes: []string{"user"},
},
"WELCOME_CHECKLIST_SEND_TRACES_SKIPPED": {
Key: "WELCOME_CHECKLIST_SEND_TRACES_SKIPPED",
Name: "Welcome Checklist Send Traces Skipped",
Description: "Welcome Checklist Send Traces Skipped",
ValueType: "boolean",
DefaultValue: false,
AllowedValues: []interface{}{true, false},
IsDiscreteValues: true,
AllowedScopes: []string{"user"},
},
"WELCOME_CHECKLIST_SEND_INFRA_METRICS_SKIPPED": {
Key: "WELCOME_CHECKLIST_SEND_INFRA_METRICS_SKIPPED",
Name: "Welcome Checklist Send Infra Metrics Skipped",
Description: "Welcome Checklist Send Infra Metrics Skipped",
ValueType: "boolean",
DefaultValue: false,
AllowedValues: []interface{}{true, false},
IsDiscreteValues: true,
AllowedScopes: []string{"user"},
},
"WELCOME_CHECKLIST_SETUP_DASHBOARDS_SKIPPED": {
Key: "WELCOME_CHECKLIST_SETUP_DASHBOARDS_SKIPPED",
Name: "Welcome Checklist Setup Dashboards Skipped",
Description: "Welcome Checklist Setup Dashboards Skipped",
ValueType: "boolean",
DefaultValue: false,
AllowedValues: []interface{}{true, false},
IsDiscreteValues: true,
AllowedScopes: []string{"user"},
},
"WELCOME_CHECKLIST_SETUP_ALERTS_SKIPPED": {
Key: "WELCOME_CHECKLIST_SETUP_ALERTS_SKIPPED",
Name: "Welcome Checklist Setup Alerts Skipped",
Description: "Welcome Checklist Setup Alerts Skipped",
ValueType: "boolean",
DefaultValue: false,
AllowedValues: []interface{}{true, false},
IsDiscreteValues: true,
AllowedScopes: []string{"user"},
},
"WELCOME_CHECKLIST_SETUP_SAVED_VIEW_SKIPPED": {
Key: "WELCOME_CHECKLIST_SETUP_SAVED_VIEW_SKIPPED",
Name: "Welcome Checklist Setup Saved View Skipped",
Description: "Welcome Checklist Setup Saved View Skipped",
ValueType: "boolean",
DefaultValue: false,
AllowedValues: []interface{}{true, false},
IsDiscreteValues: true,
AllowedScopes: []string{"user"},
},
}


@@ -1,500 +0,0 @@
package preferences
import (
"context"
"database/sql"
"fmt"
"strings"
"github.com/SigNoz/signoz/pkg/query-service/model"
"github.com/jmoiron/sqlx"
)
type Range struct {
Min int64 `json:"min"`
Max int64 `json:"max"`
}
type Preference struct {
Key string `json:"key"`
Name string `json:"name"`
Description string `json:"description"`
ValueType string `json:"valueType"`
DefaultValue interface{} `json:"defaultValue"`
AllowedValues []interface{} `json:"allowedValues"`
IsDiscreteValues bool `json:"isDiscreteValues"`
Range Range `json:"range"`
AllowedScopes []string `json:"allowedScopes"`
}
func (p *Preference) ErrorValueTypeMismatch() *model.ApiError {
return &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("the preference value is not of expected type: %s", p.ValueType)}
}
const (
PreferenceValueTypeInteger string = "integer"
PreferenceValueTypeFloat string = "float"
PreferenceValueTypeString string = "string"
PreferenceValueTypeBoolean string = "boolean"
)
const (
OrgAllowedScope string = "org"
UserAllowedScope string = "user"
)
func (p *Preference) checkIfInAllowedValues(preferenceValue interface{}) (bool, *model.ApiError) {
switch p.ValueType {
case PreferenceValueTypeInteger:
_, ok := preferenceValue.(int64)
if !ok {
return false, p.ErrorValueTypeMismatch()
}
case PreferenceValueTypeFloat:
_, ok := preferenceValue.(float64)
if !ok {
return false, p.ErrorValueTypeMismatch()
}
case PreferenceValueTypeString:
_, ok := preferenceValue.(string)
if !ok {
return false, p.ErrorValueTypeMismatch()
}
case PreferenceValueTypeBoolean:
_, ok := preferenceValue.(bool)
if !ok {
return false, p.ErrorValueTypeMismatch()
}
}
isInAllowedValues := false
for _, value := range p.AllowedValues {
switch p.ValueType {
case PreferenceValueTypeInteger:
allowedValue, ok := value.(int64)
if !ok {
return false, p.ErrorValueTypeMismatch()
}
if allowedValue == preferenceValue {
isInAllowedValues = true
}
case PreferenceValueTypeFloat:
allowedValue, ok := value.(float64)
if !ok {
return false, p.ErrorValueTypeMismatch()
}
if allowedValue == preferenceValue {
isInAllowedValues = true
}
case PreferenceValueTypeString:
allowedValue, ok := value.(string)
if !ok {
return false, p.ErrorValueTypeMismatch()
}
if allowedValue == preferenceValue {
isInAllowedValues = true
}
case PreferenceValueTypeBoolean:
allowedValue, ok := value.(bool)
if !ok {
return false, p.ErrorValueTypeMismatch()
}
if allowedValue == preferenceValue {
isInAllowedValues = true
}
}
}
return isInAllowedValues, nil
}
func (p *Preference) IsValidValue(preferenceValue interface{}) *model.ApiError {
typeSafeValue := preferenceValue
switch p.ValueType {
case PreferenceValueTypeInteger:
val, ok := preferenceValue.(int64)
if !ok {
floatVal, ok := preferenceValue.(float64)
if !ok || floatVal != float64(int64(floatVal)) {
return p.ErrorValueTypeMismatch()
}
val = int64(floatVal)
typeSafeValue = val
}
if !p.IsDiscreteValues {
if val < p.Range.Min || val > p.Range.Max {
return &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("the preference value is not in the range specified, min: %v , max:%v", p.Range.Min, p.Range.Max)}
}
}
case PreferenceValueTypeString:
_, ok := preferenceValue.(string)
if !ok {
return p.ErrorValueTypeMismatch()
}
case PreferenceValueTypeFloat:
_, ok := preferenceValue.(float64)
if !ok {
return p.ErrorValueTypeMismatch()
}
case PreferenceValueTypeBoolean:
_, ok := preferenceValue.(bool)
if !ok {
return p.ErrorValueTypeMismatch()
}
}
// check the validity of the value being part of allowed values or the range specified if any
if p.IsDiscreteValues {
if p.AllowedValues != nil {
isInAllowedValues, valueMisMatchErr := p.checkIfInAllowedValues(typeSafeValue)
if valueMisMatchErr != nil {
return valueMisMatchErr
}
if !isInAllowedValues {
return &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("the preference value is not in the list of allowedValues: %v", p.AllowedValues)}
}
}
}
return nil
}
func (p *Preference) IsEnabledForScope(scope string) bool {
isPreferenceEnabledForGivenScope := false
if p.AllowedScopes != nil {
for _, allowedScope := range p.AllowedScopes {
if allowedScope == strings.ToLower(scope) {
isPreferenceEnabledForGivenScope = true
}
}
}
return isPreferenceEnabledForGivenScope
}
func (p *Preference) SanitizeValue(preferenceValue interface{}) interface{} {
switch p.ValueType {
case PreferenceValueTypeBoolean:
if preferenceValue == "1" || preferenceValue == true {
return true
} else {
return false
}
default:
return preferenceValue
}
}
type AllPreferences struct {
Preference
Value interface{} `json:"value"`
}
type PreferenceKV struct {
PreferenceId string `json:"preference_id" db:"preference_id"`
PreferenceValue interface{} `json:"preference_value" db:"preference_value"`
}
type UpdatePreference struct {
PreferenceValue interface{} `json:"preference_value"`
}
var db *sqlx.DB
func InitDB(inputDB *sqlx.DB) error {
db = inputDB
return nil
}
// org preference functions
func GetOrgPreference(ctx context.Context, preferenceId string, orgId string) (*PreferenceKV, *model.ApiError) {
// check if the preference key exists or not
preference, seen := preferenceMap[preferenceId]
if !seen {
return nil, &model.ApiError{Typ: model.ErrorNotFound, Err: fmt.Errorf("no such preferenceId exists: %s", preferenceId)}
}
// check if the preference is enabled for org scope or not
isPreferenceEnabled := preference.IsEnabledForScope(OrgAllowedScope)
if !isPreferenceEnabled {
return nil, &model.ApiError{Typ: model.ErrorNotFound, Err: fmt.Errorf("preference is not enabled at org scope: %s", preferenceId)}
}
// fetch the value from the database
var orgPreference PreferenceKV
query := `SELECT preference_id , preference_value FROM org_preference WHERE preference_id=$1 AND org_id=$2;`
err := db.Get(&orgPreference, query, preferenceId, orgId)
// if the value doesn't exist in db then return the default value
if err != nil {
if err == sql.ErrNoRows {
return &PreferenceKV{
PreferenceId: preferenceId,
PreferenceValue: preference.DefaultValue,
}, nil
}
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in fetching the org preference: %s", err.Error())}
}
// else return the value fetched from the org_preference table
return &PreferenceKV{
PreferenceId: preferenceId,
PreferenceValue: preference.SanitizeValue(orgPreference.PreferenceValue),
}, nil
}
func UpdateOrgPreference(ctx context.Context, preferenceId string, preferenceValue interface{}, orgId string) (*PreferenceKV, *model.ApiError) {
// check if the preference key exists or not
preference, seen := preferenceMap[preferenceId]
if !seen {
return nil, &model.ApiError{Typ: model.ErrorNotFound, Err: fmt.Errorf("no such preferenceId exists: %s", preferenceId)}
}
// check if the preference is enabled at org scope or not
isPreferenceEnabled := preference.IsEnabledForScope(OrgAllowedScope)
if !isPreferenceEnabled {
return nil, &model.ApiError{Typ: model.ErrorNotFound, Err: fmt.Errorf("preference is not enabled at org scope: %s", preferenceId)}
}
err := preference.IsValidValue(preferenceValue)
if err != nil {
return nil, err
}
// update the values in the org_preference table and return the key and the value
query := `INSERT INTO org_preference(preference_id,preference_value,org_id) VALUES($1,$2,$3)
ON CONFLICT(preference_id,org_id) DO
UPDATE SET preference_value= $2 WHERE preference_id=$1 AND org_id=$3;`
_, dberr := db.Exec(query, preferenceId, preferenceValue, orgId)
if dberr != nil {
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in setting the preference value: %s", dberr.Error())}
}
return &PreferenceKV{
PreferenceId: preferenceId,
PreferenceValue: preferenceValue,
}, nil
}
func GetAllOrgPreferences(ctx context.Context, orgId string) (*[]AllPreferences, *model.ApiError) {
// filter out all the org enabled preferences from the preference variable
allOrgPreferences := []AllPreferences{}
// fetch all the org preference values stored in org_preference table
orgPreferenceValues := []PreferenceKV{}
query := `SELECT preference_id,preference_value FROM org_preference WHERE org_id=$1;`
err := db.Select(&orgPreferenceValues, query, orgId)
if err != nil {
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in getting all org preference values: %s", err)}
}
// create a map of key vs values from the above response
preferenceValueMap := map[string]interface{}{}
for _, preferenceValue := range orgPreferenceValues {
preferenceValueMap[preferenceValue.PreferenceId] = preferenceValue.PreferenceValue
}
// update in the above filtered list wherver value present in the map
for _, preference := range preferenceMap {
isEnabledForOrgScope := preference.IsEnabledForScope(OrgAllowedScope)
if isEnabledForOrgScope {
preferenceWithValue := AllPreferences{}
preferenceWithValue.Key = preference.Key
preferenceWithValue.Name = preference.Name
preferenceWithValue.Description = preference.Description
preferenceWithValue.AllowedScopes = preference.AllowedScopes
preferenceWithValue.AllowedValues = preference.AllowedValues
preferenceWithValue.DefaultValue = preference.DefaultValue
preferenceWithValue.Range = preference.Range
preferenceWithValue.ValueType = preference.ValueType
preferenceWithValue.IsDiscreteValues = preference.IsDiscreteValues
value, seen := preferenceValueMap[preference.Key]
if seen {
preferenceWithValue.Value = value
} else {
preferenceWithValue.Value = preference.DefaultValue
}
preferenceWithValue.Value = preference.SanitizeValue(preferenceWithValue.Value)
allOrgPreferences = append(allOrgPreferences, preferenceWithValue)
}
}
return &allOrgPreferences, nil
}
// user preference functions
func GetUserPreference(ctx context.Context, preferenceId string, orgId string, userId string) (*PreferenceKV, *model.ApiError) {
// check if the preference key exists
preference, seen := preferenceMap[preferenceId]
if !seen {
return nil, &model.ApiError{Typ: model.ErrorNotFound, Err: fmt.Errorf("no such preferenceId exists: %s", preferenceId)}
}
preferenceValue := PreferenceKV{
PreferenceId: preferenceId,
PreferenceValue: preference.DefaultValue,
}
// check if the preference is enabled at user scope
isPreferenceEnabledAtUserScope := preference.IsEnabledForScope(UserAllowedScope)
if !isPreferenceEnabledAtUserScope {
return nil, &model.ApiError{Typ: model.ErrorNotFound, Err: fmt.Errorf("preference is not enabled at user scope: %s", preferenceId)}
}
isPreferenceEnabledAtOrgScope := preference.IsEnabledForScope(OrgAllowedScope)
// get the value from the org scope if enabled at org scope
if isPreferenceEnabledAtOrgScope {
orgPreference := PreferenceKV{}
query := `SELECT preference_id , preference_value FROM org_preference WHERE preference_id=$1 AND org_id=$2;`
err := db.Get(&orgPreference, query, preferenceId, orgId)
// if there is an error in getting values and it is not an empty-rows error, return from here
if err != nil && err != sql.ErrNoRows {
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in getting org preference values: %s", err.Error())}
}
// if there is no error, update the preference value with the value from the org preference
if err == nil {
preferenceValue.PreferenceValue = orgPreference.PreferenceValue
}
}
// get the value from the user_preference table; if it exists, return this value, else the one calculated in the step above
userPreference := PreferenceKV{}
query := `SELECT preference_id, preference_value FROM user_preference WHERE preference_id=$1 AND user_id=$2;`
err := db.Get(&userPreference, query, preferenceId, userId)
if err != nil && err != sql.ErrNoRows {
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in getting user preference values: %s", err.Error())}
}
if err == nil {
preferenceValue.PreferenceValue = userPreference.PreferenceValue
}
return &PreferenceKV{
PreferenceId: preferenceValue.PreferenceId,
PreferenceValue: preference.SanitizeValue(preferenceValue.PreferenceValue),
}, nil
}
func UpdateUserPreference(ctx context.Context, preferenceId string, preferenceValue interface{}, userId string) (*PreferenceKV, *model.ApiError) {
// check if the preference id is valid
preference, seen := preferenceMap[preferenceId]
if !seen {
return nil, &model.ApiError{Typ: model.ErrorNotFound, Err: fmt.Errorf("no such preferenceId exists: %s", preferenceId)}
}
// check if the preference is enabled at user scope
isPreferenceEnabledAtUserScope := preference.IsEnabledForScope(UserAllowedScope)
if !isPreferenceEnabledAtUserScope {
return nil, &model.ApiError{Typ: model.ErrorNotFound, Err: fmt.Errorf("preference is not enabled at user scope: %s", preferenceId)}
}
err := preference.IsValidValue(preferenceValue)
if err != nil {
return nil, err
}
// update the user preference values
query := `INSERT INTO user_preference(preference_id,preference_value,user_id) VALUES($1,$2,$3)
ON CONFLICT(preference_id,user_id) DO
UPDATE SET preference_value= $2 WHERE preference_id=$1 AND user_id=$3;`
_, dberr := db.Exec(query, preferenceId, preferenceValue, userId)
if dberr != nil {
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in setting the preference value: %s", dberr.Error())}
}
}
return &PreferenceKV{
PreferenceId: preferenceId,
PreferenceValue: preferenceValue,
}, nil
}
func GetAllUserPreferences(ctx context.Context, orgId string, userId string) (*[]AllPreferences, *model.ApiError) {
allUserPreferences := []AllPreferences{}
// fetch all the org preference values stored in org_preference table
orgPreferenceValues := []PreferenceKV{}
query := `SELECT preference_id,preference_value FROM org_preference WHERE org_id=$1;`
err := db.Select(&orgPreferenceValues, query, orgId)
if err != nil {
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in getting all org preference values: %s", err)}
}
// create a map of key vs values from the above response
preferenceOrgValueMap := map[string]interface{}{}
for _, preferenceValue := range orgPreferenceValues {
preferenceOrgValueMap[preferenceValue.PreferenceId] = preferenceValue.PreferenceValue
}
// fetch all the user preference values stored in user_preference table
userPreferenceValues := []PreferenceKV{}
query = `SELECT preference_id,preference_value FROM user_preference WHERE user_id=$1;`
err = db.Select(&userPreferenceValues, query, userId)
if err != nil {
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in getting all user preference values: %s", err)}
}
// create a map of key vs values from the above response
preferenceUserValueMap := map[string]interface{}{}
for _, preferenceValue := range userPreferenceValues {
preferenceUserValueMap[preferenceValue.PreferenceId] = preferenceValue.PreferenceValue
}
// update the value in the filtered list wherever a value is present in the map
for _, preference := range preferenceMap {
isEnabledForUserScope := preference.IsEnabledForScope(UserAllowedScope)
if isEnabledForUserScope {
preferenceWithValue := AllPreferences{}
preferenceWithValue.Key = preference.Key
preferenceWithValue.Name = preference.Name
preferenceWithValue.Description = preference.Description
preferenceWithValue.AllowedScopes = preference.AllowedScopes
preferenceWithValue.AllowedValues = preference.AllowedValues
preferenceWithValue.DefaultValue = preference.DefaultValue
preferenceWithValue.Range = preference.Range
preferenceWithValue.ValueType = preference.ValueType
preferenceWithValue.IsDiscreteValues = preference.IsDiscreteValues
preferenceWithValue.Value = preference.DefaultValue
isEnabledForOrgScope := preference.IsEnabledForScope(OrgAllowedScope)
if isEnabledForOrgScope {
value, seen := preferenceOrgValueMap[preference.Key]
if seen {
preferenceWithValue.Value = value
}
}
value, seen := preferenceUserValueMap[preference.Key]
if seen {
preferenceWithValue.Value = value
}
preferenceWithValue.Value = preference.SanitizeValue(preferenceWithValue.Value)
allUserPreferences = append(allUserPreferences, preferenceWithValue)
}
}
return &allUserPreferences, nil
}
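
The helpers above resolve a preference in a fixed precedence order: the hard-coded default, then any org-level override from org_preference, then any user-level override from user_preference, with SanitizeValue applied to whatever wins. A minimal standalone sketch of that precedence (the key name and values are hypothetical, not this package's API):

package main

import "fmt"

// resolve mirrors the precedence used above: default < org override < user override.
// The two maps stand in for the org_preference and user_preference tables.
func resolve(key string, defaults, orgOverrides, userOverrides map[string]interface{}) interface{} {
	value := defaults[key]
	if v, ok := orgOverrides[key]; ok {
		value = v
	}
	if v, ok := userOverrides[key]; ok {
		value = v
	}
	return value
}

func main() {
	defaults := map[string]interface{}{"WELCOME_CHECKLIST": true} // hypothetical preference key
	orgOverrides := map[string]interface{}{"WELCOME_CHECKLIST": false}
	userOverrides := map[string]interface{}{}
	// the org override wins over the default; a user override, if present, would win over both
	fmt.Println(resolve("WELCOME_CHECKLIST", defaults, orgOverrides, userOverrides)) // false
}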

View File

@@ -14,6 +14,8 @@ import (
	"github.com/SigNoz/signoz/pkg/alertmanager"
	"github.com/SigNoz/signoz/pkg/http/middleware"
+	"github.com/SigNoz/signoz/pkg/modules/preference"
+	preferencecore "github.com/SigNoz/signoz/pkg/modules/preference/core"
	"github.com/SigNoz/signoz/pkg/query-service/agentConf"
	"github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader"
	"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations"
@@ -22,11 +24,11 @@ import (
	"github.com/SigNoz/signoz/pkg/query-service/app/logparsingpipeline"
	"github.com/SigNoz/signoz/pkg/query-service/app/opamp"
	opAmpModel "github.com/SigNoz/signoz/pkg/query-service/app/opamp/model"
-	"github.com/SigNoz/signoz/pkg/query-service/app/preferences"
	"github.com/SigNoz/signoz/pkg/signoz"
	"github.com/SigNoz/signoz/pkg/sqlstore"
	"github.com/SigNoz/signoz/pkg/types"
	"github.com/SigNoz/signoz/pkg/types/authtypes"
+	"github.com/SigNoz/signoz/pkg/types/preferencetypes"
	"github.com/SigNoz/signoz/pkg/web"
	"github.com/rs/cors"
	"github.com/soheilhy/cmux"
@@ -97,10 +99,6 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
		return nil, err
	}
-	if err := preferences.InitDB(serverOptions.SigNoz.SQLStore.SQLxDB()); err != nil {
-		return nil, err
-	}
	if err := dashboards.InitDB(serverOptions.SigNoz.SQLStore); err != nil {
		return nil, err
	}
@@ -120,7 +118,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
	}
	clickhouseReader := clickhouseReader.NewReader(
-		serverOptions.SigNoz.SQLStore.SQLxDB(),
+		serverOptions.SigNoz.SQLStore,
		serverOptions.SigNoz.TelemetryStore.ClickHouseDB(),
		serverOptions.PromConfigPath,
		fm,
@@ -151,6 +149,8 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
		c = cache.NewCache(cacheOpts)
	}
+	preference := preference.NewAPI(preferencecore.NewPreference(preferencecore.NewStore(serverOptions.SigNoz.SQLStore), preferencetypes.NewDefaultPreferenceMap()))
	<-readerReady
	rm, err := makeRulesManager(
		serverOptions.RuleRepoURL,
@@ -207,6 +207,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
		JWT: serverOptions.Jwt,
		AlertmanagerAPI: alertmanager.NewAPI(serverOptions.SigNoz.Alertmanager),
		Signoz: serverOptions.SigNoz,
+		Preference: preference,
	})
	if err != nil {
		return nil, err
@@ -235,13 +236,10 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
	s.privateHTTP = privateServer
-	_, err = opAmpModel.InitDB(serverOptions.SigNoz.SQLStore.SQLxDB())
-	if err != nil {
-		return nil, err
-	}
+	opAmpModel.InitDB(serverOptions.SigNoz.SQLStore)
	agentConfMgr, err := agentConf.Initiate(&agentConf.ManagerOptions{
-		DB: serverOptions.SigNoz.SQLStore.SQLxDB(),
+		Store: serverOptions.SigNoz.SQLStore,
		AgentFeatures: []agentConf.AgentFeature{
			logParsingPipelineController,
		},

View File

@@ -329,6 +329,9 @@ func CreateResetPasswordToken(ctx context.Context, userId string) (*types.ResetP
	}
	req := &types.ResetPasswordRequest{
+		Identifiable: types.Identifiable{
+			ID: valuer.GenerateUUID(),
+		},
		UserID: userId,
		Token: token,
	}

View File

@@ -5,6 +5,7 @@ import (
	"github.com/SigNoz/signoz/pkg/query-service/model"
	"github.com/SigNoz/signoz/pkg/types"
+	"github.com/SigNoz/signoz/pkg/valuer"
	"github.com/uptrace/bun"
)
@@ -48,6 +49,7 @@ func (mds *ModelDaoSqlite) GetApdexSettings(ctx context.Context, orgID string, s
func (mds *ModelDaoSqlite) SetApdexSettings(ctx context.Context, orgID string, apdexSettings *types.ApdexSettings) *model.ApiError {
	// Set the org_id from the parameter since it's required for the foreign key constraint
	apdexSettings.OrgID = orgID
+	apdexSettings.Identifiable.ID = valuer.GenerateUUID()
	_, err := mds.bundb.NewInsert().
		Model(apdexSettings).

View File

@@ -23,7 +23,7 @@ type Reader interface {
	GetServicesList(ctx context.Context) (*[]string, error)
	GetDependencyGraph(ctx context.Context, query *model.GetServicesParams) (*[]model.ServiceMapDependencyResponseItem, error)
-	GetTTL(ctx context.Context, ttlParams *model.GetTTLParams) (*model.GetTTLResponseItem, *model.ApiError)
+	GetTTL(ctx context.Context, orgID string, ttlParams *model.GetTTLParams) (*model.GetTTLResponseItem, *model.ApiError)
	// GetDisks returns a list of disks configured in the underlying DB. It is supported by
	// clickhouse only.
@@ -45,7 +45,7 @@ type Reader interface {
	GetFlamegraphSpansForTrace(ctx context.Context, traceID string, req *model.GetFlamegraphSpansForTraceParams) (*model.GetFlamegraphSpansForTraceResponse, *model.ApiError)
	// Setter Interfaces
-	SetTTL(ctx context.Context, ttlParams *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError)
+	SetTTL(ctx context.Context, orgID string, ttlParams *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError)
	FetchTemporality(ctx context.Context, metricNames []string) (map[string]map[v3.Temporality]bool, error)
	GetMetricAggregateAttributes(ctx context.Context, req *v3.AggregateAttributeRequest, skipDotNames bool, skipSignozMetrics bool) (*v3.AggregateAttributeResponse, error)

View File

@@ -317,9 +317,10 @@ func createTelemetry() {
	getLogsInfoInLastHeartBeatInterval, _ := telemetry.reader.GetLogsInfoInLastHeartBeatInterval(ctx, HEART_BEAT_DURATION)
-	traceTTL, _ := telemetry.reader.GetTTL(ctx, &model.GetTTLParams{Type: constants.TraceTTL})
-	metricsTTL, _ := telemetry.reader.GetTTL(ctx, &model.GetTTLParams{Type: constants.MetricsTTL})
-	logsTTL, _ := telemetry.reader.GetTTL(ctx, &model.GetTTLParams{Type: constants.LogsTTL})
+	// TODO update this post bootstrap decision
+	traceTTL, _ := telemetry.reader.GetTTL(ctx, "", &model.GetTTLParams{Type: constants.TraceTTL})
+	metricsTTL, _ := telemetry.reader.GetTTL(ctx, "", &model.GetTTLParams{Type: constants.MetricsTTL})
+	logsTTL, _ := telemetry.reader.GetTTL(ctx, "", &model.GetTTLParams{Type: constants.LogsTTL})
	userCount, _ := telemetry.userCountCallback(ctx)

View File

@@ -293,7 +293,7 @@ func NewFilterSuggestionsTestBed(t *testing.T) *FilterSuggestionsTestBed {
	testDB := utils.NewQueryServiceDBForTests(t)
	fm := featureManager.StartManager()
-	reader, mockClickhouse := NewMockClickhouseReader(t, testDB.SQLxDB(), fm)
+	reader, mockClickhouse := NewMockClickhouseReader(t, testDB, fm)
	mockClickhouse.MatchExpectationsInOrder(false)
	apiHandler, err := app.NewAPIHandler(app.APIHandlerOpts{

View File

@@ -14,7 +14,7 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/app/integrations" "github.com/SigNoz/signoz/pkg/query-service/app/integrations"
"github.com/SigNoz/signoz/pkg/query-service/app/logparsingpipeline" "github.com/SigNoz/signoz/pkg/query-service/app/logparsingpipeline"
"github.com/SigNoz/signoz/pkg/query-service/app/opamp" "github.com/SigNoz/signoz/pkg/query-service/app/opamp"
opampModel "github.com/SigNoz/signoz/pkg/query-service/app/opamp/model" "github.com/SigNoz/signoz/pkg/query-service/app/opamp/model"
"github.com/SigNoz/signoz/pkg/query-service/constants" "github.com/SigNoz/signoz/pkg/query-service/constants"
"github.com/SigNoz/signoz/pkg/query-service/dao" "github.com/SigNoz/signoz/pkg/query-service/dao"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3" v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
@@ -37,6 +37,9 @@ func TestLogPipelinesLifecycle(t *testing.T) {
testbed := NewLogPipelinesTestBed(t, nil) testbed := NewLogPipelinesTestBed(t, nil)
require := require.New(t) require := require.New(t)
orgID, err := utils.GetTestOrgId(testbed.sqlStore)
require.Nil(err)
getPipelinesResp := testbed.GetPipelinesFromQS() getPipelinesResp := testbed.GetPipelinesFromQS()
require.Equal( require.Equal(
0, len(getPipelinesResp.Pipelines), 0, len(getPipelinesResp.Pipelines),
@@ -107,7 +110,7 @@ func TestLogPipelinesLifecycle(t *testing.T) {
t, postablePipelines, createPipelinesResp, t, postablePipelines, createPipelinesResp,
) )
testbed.assertPipelinesSentToOpampClient(createPipelinesResp.Pipelines) testbed.assertPipelinesSentToOpampClient(createPipelinesResp.Pipelines)
testbed.assertNewAgentGetsPipelinesOnConnection(createPipelinesResp.Pipelines) testbed.assertNewAgentGetsPipelinesOnConnection(orgID, createPipelinesResp.Pipelines)
// Should be able to get the configured pipelines. // Should be able to get the configured pipelines.
getPipelinesResp = testbed.GetPipelinesFromQS() getPipelinesResp = testbed.GetPipelinesFromQS()
@@ -121,7 +124,7 @@ func TestLogPipelinesLifecycle(t *testing.T) {
"pipelines config history should not be empty after 1st configuration", "pipelines config history should not be empty after 1st configuration",
) )
require.Equal( require.Equal(
agentConf.DeployInitiated, getPipelinesResp.History[0].DeployStatus, types.DeployInitiated, getPipelinesResp.History[0].DeployStatus,
"pipelines deployment should be in progress after 1st configuration", "pipelines deployment should be in progress after 1st configuration",
) )
@@ -133,7 +136,7 @@ func TestLogPipelinesLifecycle(t *testing.T) {
t, postablePipelines, getPipelinesResp, t, postablePipelines, getPipelinesResp,
) )
require.Equal( require.Equal(
agentConf.Deployed, types.Deployed,
getPipelinesResp.History[0].DeployStatus, getPipelinesResp.History[0].DeployStatus,
"pipeline deployment should be complete after acknowledgment from opamp client", "pipeline deployment should be complete after acknowledgment from opamp client",
) )
@@ -145,7 +148,7 @@ func TestLogPipelinesLifecycle(t *testing.T) {
t, postablePipelines, updatePipelinesResp, t, postablePipelines, updatePipelinesResp,
) )
testbed.assertPipelinesSentToOpampClient(updatePipelinesResp.Pipelines) testbed.assertPipelinesSentToOpampClient(updatePipelinesResp.Pipelines)
testbed.assertNewAgentGetsPipelinesOnConnection(updatePipelinesResp.Pipelines) testbed.assertNewAgentGetsPipelinesOnConnection(orgID, updatePipelinesResp.Pipelines)
getPipelinesResp = testbed.GetPipelinesFromQS() getPipelinesResp = testbed.GetPipelinesFromQS()
require.Equal( require.Equal(
@@ -153,7 +156,7 @@ func TestLogPipelinesLifecycle(t *testing.T) {
"there should be 2 history entries after posting pipelines config for the 2nd time", "there should be 2 history entries after posting pipelines config for the 2nd time",
) )
require.Equal( require.Equal(
agentConf.DeployInitiated, getPipelinesResp.History[0].DeployStatus, types.DeployInitiated, getPipelinesResp.History[0].DeployStatus,
"deployment should be in progress for latest pipeline config", "deployment should be in progress for latest pipeline config",
) )
@@ -165,7 +168,7 @@ func TestLogPipelinesLifecycle(t *testing.T) {
t, postablePipelines, getPipelinesResp, t, postablePipelines, getPipelinesResp,
) )
require.Equal( require.Equal(
agentConf.Deployed, types.Deployed,
getPipelinesResp.History[0].DeployStatus, getPipelinesResp.History[0].DeployStatus,
"deployment for latest pipeline config should be complete after acknowledgment from opamp client", "deployment for latest pipeline config should be complete after acknowledgment from opamp client",
) )
@@ -219,7 +222,7 @@ func TestLogPipelinesHistory(t *testing.T) {
testbed.PostPipelinesToQS(postablePipelines) testbed.PostPipelinesToQS(postablePipelines)
getPipelinesResp = testbed.GetPipelinesFromQS() getPipelinesResp = testbed.GetPipelinesFromQS()
require.Equal(1, len(getPipelinesResp.History)) require.Equal(1, len(getPipelinesResp.History))
require.Equal(agentConf.DeployInitiated, getPipelinesResp.History[0].DeployStatus) require.Equal(types.DeployInitiated, getPipelinesResp.History[0].DeployStatus)
postablePipelines.Pipelines[0].Config = append( postablePipelines.Pipelines[0].Config = append(
postablePipelines.Pipelines[0].Config, postablePipelines.Pipelines[0].Config,
@@ -238,8 +241,8 @@ func TestLogPipelinesHistory(t *testing.T) {
getPipelinesResp = testbed.GetPipelinesFromQS() getPipelinesResp = testbed.GetPipelinesFromQS()
require.Equal(2, len(getPipelinesResp.History)) require.Equal(2, len(getPipelinesResp.History))
require.Equal(agentConf.DeployInitiated, getPipelinesResp.History[0].DeployStatus) require.Equal(types.DeployInitiated, getPipelinesResp.History[0].DeployStatus)
require.Equal(agentConf.DeployStatusUnknown, getPipelinesResp.History[1].DeployStatus) require.Equal(types.DeployStatusUnknown, getPipelinesResp.History[1].DeployStatus)
} }
func TestLogPipelinesValidation(t *testing.T) { func TestLogPipelinesValidation(t *testing.T) {
@@ -447,24 +450,22 @@ type LogPipelinesTestBed struct {
agentConfMgr *agentConf.Manager agentConfMgr *agentConf.Manager
opampServer *opamp.Server opampServer *opamp.Server
opampClientConn *opamp.MockOpAmpConnection opampClientConn *opamp.MockOpAmpConnection
sqlStore sqlstore.SQLStore
} }
// testDB can be injected for sharing a DB across multiple integration testbeds. // testDB can be injected for sharing a DB across multiple integration testbeds.
func NewTestbedWithoutOpamp(t *testing.T, sqlStore sqlstore.SQLStore) *LogPipelinesTestBed { func NewTestbedWithoutOpamp(t *testing.T, store sqlstore.SQLStore) *LogPipelinesTestBed {
if sqlStore == nil { if store == nil {
sqlStore = utils.NewQueryServiceDBForTests(t) store = utils.NewQueryServiceDBForTests(t)
} }
// create test org ic, err := integrations.NewController(store)
// utils.CreateTestOrg(t, sqlStore)
ic, err := integrations.NewController(sqlStore)
if err != nil { if err != nil {
t.Fatalf("could not create integrations controller: %v", err) t.Fatalf("could not create integrations controller: %v", err)
} }
controller, err := logparsingpipeline.NewLogParsingPipelinesController( controller, err := logparsingpipeline.NewLogParsingPipelinesController(
sqlStore, ic.GetPipelinesForInstalledIntegrations, store, ic.GetPipelinesForInstalledIntegrations,
) )
if err != nil { if err != nil {
t.Fatalf("could not create a logparsingpipelines controller: %v", err) t.Fatalf("could not create a logparsingpipelines controller: %v", err)
@@ -485,11 +486,11 @@ func NewTestbedWithoutOpamp(t *testing.T, sqlStore sqlstore.SQLStore) *LogPipeli
} }
// Mock an available opamp agent // Mock an available opamp agent
testDB, err := opampModel.InitDB(sqlStore.SQLxDB()) // testDB, err := opampModel.InitDB(sqlStore.SQLxDB())
require.Nil(t, err, "failed to init opamp model") require.Nil(t, err, "failed to init opamp model")
agentConfMgr, err := agentConf.Initiate(&agentConf.ManagerOptions{ agentConfMgr, err := agentConf.Initiate(&agentConf.ManagerOptions{
DB: testDB, Store: store,
AgentFeatures: []agentConf.AgentFeature{ AgentFeatures: []agentConf.AgentFeature{
apiHandler.LogsParsingPipelineController, apiHandler.LogsParsingPipelineController,
}}) }})
@@ -500,14 +501,20 @@ func NewTestbedWithoutOpamp(t *testing.T, sqlStore sqlstore.SQLStore) *LogPipeli
testUser: user, testUser: user,
apiHandler: apiHandler, apiHandler: apiHandler,
agentConfMgr: agentConfMgr, agentConfMgr: agentConfMgr,
sqlStore: store,
} }
} }
func NewLogPipelinesTestBed(t *testing.T, testDB sqlstore.SQLStore) *LogPipelinesTestBed { func NewLogPipelinesTestBed(t *testing.T, testDB sqlstore.SQLStore) *LogPipelinesTestBed {
testbed := NewTestbedWithoutOpamp(t, testDB) testbed := NewTestbedWithoutOpamp(t, testDB)
orgID, err := utils.GetTestOrgId(testbed.sqlStore)
require.Nil(t, err)
model.InitDB(testbed.sqlStore)
opampServer := opamp.InitializeServer(nil, testbed.agentConfMgr) opampServer := opamp.InitializeServer(nil, testbed.agentConfMgr)
err := opampServer.Start(opamp.GetAvailableLocalAddress()) err = opampServer.Start(opamp.GetAvailableLocalAddress())
require.Nil(t, err, "failed to start opamp server") require.Nil(t, err, "failed to start opamp server")
t.Cleanup(func() { t.Cleanup(func() {
@@ -522,6 +529,16 @@ func NewLogPipelinesTestBed(t *testing.T, testDB sqlstore.SQLStore) *LogPipeline
EffectiveConfig: &protobufs.EffectiveConfig{ EffectiveConfig: &protobufs.EffectiveConfig{
ConfigMap: newInitialAgentConfigMap(), ConfigMap: newInitialAgentConfigMap(),
}, },
AgentDescription: &protobufs.AgentDescription{
IdentifyingAttributes: []*protobufs.KeyValue{
{
Key: "orgId",
Value: &protobufs.AnyValue{
Value: &protobufs.AnyValue_StringValue{StringValue: orgID},
},
},
},
},
}, },
) )
@@ -728,6 +745,7 @@ func (tb *LogPipelinesTestBed) simulateOpampClientAcknowledgementForLatestConfig
} }
func (tb *LogPipelinesTestBed) assertNewAgentGetsPipelinesOnConnection( func (tb *LogPipelinesTestBed) assertNewAgentGetsPipelinesOnConnection(
orgID string,
pipelines []pipelinetypes.GettablePipeline, pipelines []pipelinetypes.GettablePipeline,
) { ) {
newAgentConn := &opamp.MockOpAmpConnection{} newAgentConn := &opamp.MockOpAmpConnection{}
@@ -738,6 +756,16 @@ func (tb *LogPipelinesTestBed) assertNewAgentGetsPipelinesOnConnection(
EffectiveConfig: &protobufs.EffectiveConfig{ EffectiveConfig: &protobufs.EffectiveConfig{
ConfigMap: newInitialAgentConfigMap(), ConfigMap: newInitialAgentConfigMap(),
}, },
AgentDescription: &protobufs.AgentDescription{
IdentifyingAttributes: []*protobufs.KeyValue{
{
Key: "orgId",
Value: &protobufs.AnyValue{
Value: &protobufs.AnyValue_StringValue{StringValue: orgID},
},
},
},
},
}, },
) )
latestMsgFromServer := newAgentConn.LatestMsgFromServer() latestMsgFromServer := newAgentConn.LatestMsgFromServer()

View File

@@ -355,7 +355,7 @@ func NewCloudIntegrationsTestBed(t *testing.T, testDB sqlstore.SQLStore) *CloudI
	}
	fm := featureManager.StartManager()
-	reader, mockClickhouse := NewMockClickhouseReader(t, testDB.SQLxDB(), fm)
+	reader, mockClickhouse := NewMockClickhouseReader(t, testDB, fm)
	mockClickhouse.MatchExpectationsInOrder(false)
	apiHandler, err := app.NewAPIHandler(app.APIHandlerOpts{

View File

@@ -32,6 +32,9 @@ func TestSignozIntegrationLifeCycle(t *testing.T) {
require := require.New(t) require := require.New(t)
testbed := NewIntegrationsTestBed(t, nil) testbed := NewIntegrationsTestBed(t, nil)
merr := utils.CreateTestOrg(t, testbed.store)
require.NoError(merr)
installedResp := testbed.GetInstalledIntegrationsFromQS() installedResp := testbed.GetInstalledIntegrationsFromQS()
require.Equal( require.Equal(
len(installedResp.Integrations), 0, len(installedResp.Integrations), 0,
@@ -115,6 +118,11 @@ func TestLogPipelinesForInstalledSignozIntegrations(t *testing.T) {
require := require.New(t) require := require.New(t)
testDB := utils.NewQueryServiceDBForTests(t) testDB := utils.NewQueryServiceDBForTests(t)
utils.CreateTestOrg(t, testDB)
orgID, err := utils.GetTestOrgId(testDB)
require.Nil(err)
integrationsTB := NewIntegrationsTestBed(t, testDB) integrationsTB := NewIntegrationsTestBed(t, testDB)
pipelinesTB := NewLogPipelinesTestBed(t, testDB) pipelinesTB := NewLogPipelinesTestBed(t, testDB)
@@ -172,7 +180,7 @@ func TestLogPipelinesForInstalledSignozIntegrations(t *testing.T) {
require.Equal(testIntegration.Id, *integrations.IntegrationIdForPipeline(lastPipeline)) require.Equal(testIntegration.Id, *integrations.IntegrationIdForPipeline(lastPipeline))
pipelinesTB.assertPipelinesSentToOpampClient(getPipelinesResp.Pipelines) pipelinesTB.assertPipelinesSentToOpampClient(getPipelinesResp.Pipelines)
pipelinesTB.assertNewAgentGetsPipelinesOnConnection(getPipelinesResp.Pipelines) pipelinesTB.assertNewAgentGetsPipelinesOnConnection(orgID, getPipelinesResp.Pipelines)
// After saving a user created pipeline, pipelines response should include // After saving a user created pipeline, pipelines response should include
// both user created pipelines and pipelines for installed integrations. // both user created pipelines and pipelines for installed integrations.
@@ -217,7 +225,7 @@ func TestLogPipelinesForInstalledSignozIntegrations(t *testing.T) {
getPipelinesResp = pipelinesTB.GetPipelinesFromQS() getPipelinesResp = pipelinesTB.GetPipelinesFromQS()
require.Equal(1+len(testIntegrationPipelines), len(getPipelinesResp.Pipelines)) require.Equal(1+len(testIntegrationPipelines), len(getPipelinesResp.Pipelines))
pipelinesTB.assertPipelinesSentToOpampClient(getPipelinesResp.Pipelines) pipelinesTB.assertPipelinesSentToOpampClient(getPipelinesResp.Pipelines)
pipelinesTB.assertNewAgentGetsPipelinesOnConnection(getPipelinesResp.Pipelines) pipelinesTB.assertNewAgentGetsPipelinesOnConnection(orgID, getPipelinesResp.Pipelines)
// Reordering integration pipelines should be possible. // Reordering integration pipelines should be possible.
postable := postableFromPipelines(getPipelinesResp.Pipelines) postable := postableFromPipelines(getPipelinesResp.Pipelines)
@@ -234,7 +242,7 @@ func TestLogPipelinesForInstalledSignozIntegrations(t *testing.T) {
require.Equal(testIntegration.Id, *integrations.IntegrationIdForPipeline(firstPipeline)) require.Equal(testIntegration.Id, *integrations.IntegrationIdForPipeline(firstPipeline))
pipelinesTB.assertPipelinesSentToOpampClient(getPipelinesResp.Pipelines) pipelinesTB.assertPipelinesSentToOpampClient(getPipelinesResp.Pipelines)
pipelinesTB.assertNewAgentGetsPipelinesOnConnection(getPipelinesResp.Pipelines) pipelinesTB.assertNewAgentGetsPipelinesOnConnection(orgID, getPipelinesResp.Pipelines)
// enabling/disabling integration pipelines should be possible. // enabling/disabling integration pipelines should be possible.
require.True(firstPipeline.Enabled) require.True(firstPipeline.Enabled)
@@ -252,7 +260,7 @@ func TestLogPipelinesForInstalledSignozIntegrations(t *testing.T) {
require.False(firstPipeline.Enabled) require.False(firstPipeline.Enabled)
pipelinesTB.assertPipelinesSentToOpampClient(getPipelinesResp.Pipelines) pipelinesTB.assertPipelinesSentToOpampClient(getPipelinesResp.Pipelines)
pipelinesTB.assertNewAgentGetsPipelinesOnConnection(getPipelinesResp.Pipelines) pipelinesTB.assertNewAgentGetsPipelinesOnConnection(orgID, getPipelinesResp.Pipelines)
// should not be able to edit integrations pipeline. // should not be able to edit integrations pipeline.
require.Greater(len(postable.Pipelines[0].Config), 0) require.Greater(len(postable.Pipelines[0].Config), 0)
@@ -291,7 +299,7 @@ func TestLogPipelinesForInstalledSignozIntegrations(t *testing.T) {
"Pipelines for uninstalled integrations should get removed from pipelines list", "Pipelines for uninstalled integrations should get removed from pipelines list",
) )
pipelinesTB.assertPipelinesSentToOpampClient(getPipelinesResp.Pipelines) pipelinesTB.assertPipelinesSentToOpampClient(getPipelinesResp.Pipelines)
pipelinesTB.assertNewAgentGetsPipelinesOnConnection(getPipelinesResp.Pipelines) pipelinesTB.assertNewAgentGetsPipelinesOnConnection(orgID, getPipelinesResp.Pipelines)
} }
func TestDashboardsForInstalledIntegrationDashboards(t *testing.T) { func TestDashboardsForInstalledIntegrationDashboards(t *testing.T) {
@@ -370,6 +378,7 @@ type IntegrationsTestBed struct {
testUser *types.User testUser *types.User
qsHttpHandler http.Handler qsHttpHandler http.Handler
mockClickhouse mockhouse.ClickConnMockCommon mockClickhouse mockhouse.ClickConnMockCommon
store sqlstore.SQLStore
} }
func (tb *IntegrationsTestBed) GetAvailableIntegrationsFromQS() *integrations.IntegrationsListResponse { func (tb *IntegrationsTestBed) GetAvailableIntegrationsFromQS() *integrations.IntegrationsListResponse {
@@ -557,7 +566,7 @@ func NewIntegrationsTestBed(t *testing.T, testDB sqlstore.SQLStore) *Integration
} }
fm := featureManager.StartManager() fm := featureManager.StartManager()
reader, mockClickhouse := NewMockClickhouseReader(t, testDB.SQLxDB(), fm) reader, mockClickhouse := NewMockClickhouseReader(t, testDB, fm)
mockClickhouse.MatchExpectationsInOrder(false) mockClickhouse.MatchExpectationsInOrder(false)
cloudIntegrationsController, err := cloudintegrations.NewController(testDB) cloudIntegrationsController, err := cloudintegrations.NewController(testDB)
@@ -593,6 +602,7 @@ func NewIntegrationsTestBed(t *testing.T, testDB sqlstore.SQLStore) *Integration
testUser: user, testUser: user,
qsHttpHandler: router, qsHttpHandler: router,
mockClickhouse: mockClickhouse, mockClickhouse: mockClickhouse,
store: testDB,
} }
} }

View File

@@ -20,10 +20,10 @@ import (
	"github.com/SigNoz/signoz/pkg/query-service/dao"
	"github.com/SigNoz/signoz/pkg/query-service/interfaces"
	"github.com/SigNoz/signoz/pkg/query-service/model"
+	"github.com/SigNoz/signoz/pkg/sqlstore"
	"github.com/SigNoz/signoz/pkg/types"
	"github.com/SigNoz/signoz/pkg/types/authtypes"
	"github.com/google/uuid"
-	"github.com/jmoiron/sqlx"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
	mockhouse "github.com/srikanthccv/ClickHouse-go-mock"
	"github.com/stretchr/testify/require"
@@ -33,7 +33,7 @@ import (
var jwt = authtypes.NewJWT("secret", 1*time.Hour, 2*time.Hour)
func NewMockClickhouseReader(
-	t *testing.T, testDB *sqlx.DB, featureFlags interfaces.FeatureLookup,
+	t *testing.T, testDB sqlstore.SQLStore, featureFlags interfaces.FeatureLookup,
) (
	*clickhouseReader.ClickHouseReader, mockhouse.ClickConnMockCommon,
) {

View File

@@ -13,6 +13,8 @@ import (
	"github.com/SigNoz/signoz/pkg/sqlmigrator"
	"github.com/SigNoz/signoz/pkg/sqlstore"
	"github.com/SigNoz/signoz/pkg/sqlstore/sqlitesqlstore"
+	"github.com/SigNoz/signoz/pkg/types"
+	"github.com/google/uuid"
	_ "github.com/mattn/go-sqlite3"
)
@@ -51,6 +53,7 @@ func NewTestSqliteDB(t *testing.T) (sqlStore sqlstore.SQLStore, testDBFilePath s
		sqlmigration.NewUpdateDashboardAndSavedViewsFactory(sqlStore),
		sqlmigration.NewUpdatePatAndOrgDomainsFactory(sqlStore),
		sqlmigration.NewUpdatePipelines(sqlStore),
+		sqlmigration.NewUpdateAgentsFactory(sqlStore),
	),
)
if err != nil {
@@ -76,3 +79,28 @@ func NewQueryServiceDBForTests(t *testing.T) sqlstore.SQLStore {
	return sqlStore
}
func CreateTestOrg(t *testing.T, store sqlstore.SQLStore) error {
org := &types.Organization{
ID: uuid.NewString(),
Name: "testOrg",
}
_, err := store.BunDB().NewInsert().Model(org).Exec(context.Background())
if err != nil {
return err
}
return nil
}
func GetTestOrgId(store sqlstore.SQLStore) (string, error) {
var orgID string
err := store.BunDB().NewSelect().
Model(&types.Organization{}).
Column("id").
Limit(1).
Scan(context.Background(), &orgID)
if err != nil {
return "", err
}
return orgID, nil
}
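
The two helpers added above are meant to be used together in the integration tests touched by this change: seed exactly one organization, then read back its id for org-scoped assertions such as assertNewAgentGetsPipelinesOnConnection. A hedged sketch of a caller (test name illustrative, package clause and imports omitted):

func TestSomethingOrgScoped(t *testing.T) {
	store := utils.NewQueryServiceDBForTests(t)

	// seed a single test org, then fetch its id for org-scoped calls
	require.NoError(t, utils.CreateTestOrg(t, store))

	orgID, err := utils.GetTestOrgId(store)
	require.Nil(t, err)
	require.NotEmpty(t, orgID)
}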

View File

@@ -62,6 +62,11 @@ func NewSQLMigrationProviderFactories(sqlstore sqlstore.SQLStore) factory.NamedM
		sqlmigration.NewUpdatePipelines(sqlstore),
		sqlmigration.NewDropLicensesSitesFactory(sqlstore),
		sqlmigration.NewUpdateInvitesFactory(sqlstore),
+		sqlmigration.NewUpdateAgentsFactory(sqlstore),
+		sqlmigration.NewUpdateAlertmanagerFactory(sqlstore),
+		sqlmigration.NewUpdatePreferencesFactory(sqlstore),
+		sqlmigration.NewUpdateApdexTtlFactory(sqlstore),
+		sqlmigration.NewUpdateResetPasswordFactory(sqlstore),
	),
}

View File

@@ -5,7 +5,6 @@ import (
	"database/sql"
	"encoding/json"
	"strconv"
-	"strings"
	"time"
	"github.com/SigNoz/signoz/pkg/alertmanager/alertmanagerserver"
@@ -50,12 +49,15 @@ func (migration *addAlertmanager) Up(ctx context.Context, db *bun.DB) error {
	defer tx.Rollback() //nolint:errcheck
-	if _, err := tx.
-		NewDropColumn().
-		Table("notification_channels").
-		ColumnExpr("deleted").
-		Exec(ctx); err != nil {
-		if !strings.Contains(err.Error(), "no such column") {
+	// check if column exists
+	if exists, err := migration.store.Dialect().ColumnExists(ctx, tx, "notification_channels", "deleted"); err != nil {
+		return err
+	} else if exists {
+		if _, err := tx.
+			NewDropColumn().
+			Table("notification_channels").
+			ColumnExpr("deleted").
+			Exec(ctx); err != nil {
			return err
		}
	}

View File

@@ -115,7 +115,7 @@ func updateOrgId(ctx context.Context, tx bun.Tx, table string) error {
	}
	// copy data from org_domains to org_domains_new
-	if _, err := tx.ExecContext(ctx, `INSERT INTO org_domains_new (id, org_id, name, created_at, updated_at, data) SELECT id, org_id, name, created_at, updated_at, data FROM org_domains`); err != nil {
+	if _, err := tx.ExecContext(ctx, `INSERT INTO org_domains_new (id, org_id, name, data) SELECT id, org_id, name, data FROM org_domains`); err != nil {
		return err
	}
	// delete old table

View File

@@ -75,7 +75,7 @@ func (migration *updateInvites) Up(ctx context.Context, db *bun.DB) error {
	err = migration.
		store.
		Dialect().
-		RenameTableAndModifyModel(ctx, tx, new(existingInvite), new(newInvite), func(ctx context.Context) error {
+		RenameTableAndModifyModel(ctx, tx, new(existingInvite), new(newInvite), OrgReference, func(ctx context.Context) error {
			existingInvites := make([]*existingInvite, 0)
			err = tx.
				NewSelect().

View File

@@ -0,0 +1,95 @@
package sqlmigration
import (
"context"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types"
"github.com/uptrace/bun"
"github.com/uptrace/bun/migrate"
)
type updateAgents struct {
store sqlstore.SQLStore
}
func NewUpdateAgentsFactory(sqlstore sqlstore.SQLStore) factory.ProviderFactory[SQLMigration, Config] {
return factory.NewProviderFactory(factory.MustNewName("update_agents"), func(ctx context.Context, ps factory.ProviderSettings, c Config) (SQLMigration, error) {
return newUpdateAgents(ctx, ps, c, sqlstore)
})
}
func newUpdateAgents(_ context.Context, _ factory.ProviderSettings, _ Config, store sqlstore.SQLStore) (SQLMigration, error) {
return &updateAgents{
store: store,
}, nil
}
func (migration *updateAgents) Register(migrations *migrate.Migrations) error {
if err := migrations.Register(migration.Up, migration.Down); err != nil {
return err
}
return nil
}
func (migration *updateAgents) Up(ctx context.Context, db *bun.DB) error {
// begin transaction
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
// get all org ids
var orgIDs []string
if err := migration.store.BunDB().NewSelect().Model((*types.Organization)(nil)).Column("id").Scan(ctx, &orgIDs); err != nil {
return err
}
// add org_id to the agent tables
for _, table := range []string{"agents", "agent_config_versions", "agent_config_elements"} {
if exists, err := migration.store.Dialect().ColumnExists(ctx, tx, table, "org_id"); err != nil {
return err
} else if !exists {
if _, err := tx.NewAddColumn().Table(table).ColumnExpr("org_id TEXT REFERENCES organizations(id) ON DELETE CASCADE").Exec(ctx); err != nil {
return err
}
// if there is exactly one org ID, backfill it on all existing rows
if len(orgIDs) == 1 {
orgID := orgIDs[0]
if _, err := tx.NewUpdate().Table(table).Set("org_id = ?", orgID).Where("org_id IS NULL").Exec(ctx); err != nil {
return err
}
}
}
}
// add a unique index on (org_id, agent_id) to the agents table
if exists, err := migration.store.Dialect().IndexExists(ctx, tx, "agents", "idx_agents_org_id_agent_id"); err != nil {
return err
} else if !exists {
if _, err := tx.NewCreateIndex().Table("agents").Index("idx_agents_org_id_agent_id").Column("org_id", "agent_id").Unique().Exec(ctx); err != nil {
return err
}
}
// rename agent_id to id
_, err = migration.store.Dialect().RenameColumn(ctx, tx, "agents", "agent_id", "id")
if err != nil {
return err
}
if err := tx.Commit(); err != nil {
return err
}
return nil
}
func (migration *updateAgents) Down(ctx context.Context, db *bun.DB) error {
return nil
}
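
For a single-tenant database, the Up step above boils down to a guarded add-column, a backfill, a unique index, and a column rename for each agent table. A rough database/sql equivalent for the agents table, assuming the standard context and database/sql imports (illustration only; the real migration goes through bun and the dialect helpers, so the generated SQL can differ):

func backfillAgentsOrgID(ctx context.Context, db *sql.DB, orgID string) error {
	// add the org_id column with the same FK expression the migration declares
	if _, err := db.ExecContext(ctx, `ALTER TABLE agents ADD COLUMN org_id TEXT REFERENCES organizations(id) ON DELETE CASCADE`); err != nil {
		return err
	}
	// backfill the single org id, as the migration does when len(orgIDs) == 1
	if _, err := db.ExecContext(ctx, `UPDATE agents SET org_id = ? WHERE org_id IS NULL`, orgID); err != nil {
		return err
	}
	// enforce uniqueness of (org_id, agent_id)
	if _, err := db.ExecContext(ctx, `CREATE UNIQUE INDEX IF NOT EXISTS idx_agents_org_id_agent_id ON agents (org_id, agent_id)`); err != nil {
		return err
	}
	// finally rename agent_id to id
	_, err := db.ExecContext(ctx, `ALTER TABLE agents RENAME COLUMN agent_id TO id`)
	return err
}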

View File

@@ -0,0 +1,277 @@
package sqlmigration
import (
"context"
"database/sql"
"time"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/uptrace/bun"
"github.com/uptrace/bun/migrate"
)
type updateAlertmanager struct {
store sqlstore.SQLStore
}
type existingChannel struct {
bun.BaseModel `bun:"table:notification_channels"`
ID int `json:"id" bun:"id,pk,autoincrement"`
Name string `json:"name" bun:"name"`
Type string `json:"type" bun:"type"`
Data string `json:"data" bun:"data"`
CreatedAt time.Time `json:"created_at" bun:"created_at"`
UpdatedAt time.Time `json:"updated_at" bun:"updated_at"`
OrgID string `json:"org_id" bun:"org_id"`
}
type newChannel struct {
bun.BaseModel `bun:"table:notification_channel"`
types.Identifiable
types.TimeAuditable
Name string `json:"name" bun:"name"`
Type string `json:"type" bun:"type"`
Data string `json:"data" bun:"data"`
OrgID string `json:"org_id" bun:"org_id"`
}
type existingAlertmanagerConfig struct {
bun.BaseModel `bun:"table:alertmanager_config"`
ID uint64 `bun:"id,pk,autoincrement"`
Config string `bun:"config,notnull,type:text"`
Hash string `bun:"hash,notnull,type:text"`
CreatedAt time.Time `bun:"created_at,notnull"`
UpdatedAt time.Time `bun:"updated_at,notnull"`
OrgID string `bun:"org_id,notnull,unique"`
}
type newAlertmanagerConfig struct {
bun.BaseModel `bun:"table:alertmanager_config_new"`
types.Identifiable
types.TimeAuditable
Config string `bun:"config,notnull,type:text"`
Hash string `bun:"hash,notnull,type:text"`
OrgID string `bun:"org_id,notnull,unique"`
}
type existingAlertmanagerState struct {
bun.BaseModel `bun:"table:alertmanager_state"`
ID uint64 `bun:"id,pk,autoincrement"`
Silences string `bun:"silences,nullzero,type:text"`
NFLog string `bun:"nflog,nullzero,type:text"`
CreatedAt time.Time `bun:"created_at,notnull"`
UpdatedAt time.Time `bun:"updated_at,notnull"`
OrgID string `bun:"org_id,notnull,unique"`
}
type newAlertmanagerState struct {
bun.BaseModel `bun:"table:alertmanager_state_new"`
types.Identifiable
types.TimeAuditable
Silences string `bun:"silences,nullzero,type:text"`
NFLog string `bun:"nflog,nullzero,type:text"`
OrgID string `bun:"org_id,notnull,unique"`
}
func NewUpdateAlertmanagerFactory(sqlstore sqlstore.SQLStore) factory.ProviderFactory[SQLMigration, Config] {
return factory.
NewProviderFactory(
factory.MustNewName("update_alertmanager"),
func(ctx context.Context, ps factory.ProviderSettings, c Config) (SQLMigration, error) {
return newUpdateAlertmanager(ctx, ps, c, sqlstore)
})
}
func newUpdateAlertmanager(_ context.Context, _ factory.ProviderSettings, _ Config, store sqlstore.SQLStore) (SQLMigration, error) {
return &updateAlertmanager{store: store}, nil
}
func (migration *updateAlertmanager) Register(migrations *migrate.Migrations) error {
if err := migrations.
Register(migration.Up, migration.Down); err != nil {
return err
}
return nil
}
func (migration *updateAlertmanager) Up(ctx context.Context, db *bun.DB) error {
tx, err := db.
BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
err = migration.
store.
Dialect().
RenameTableAndModifyModel(ctx, tx, new(existingChannel), new(newChannel), OrgReference, func(ctx context.Context) error {
existingChannels := make([]*existingChannel, 0)
err = tx.
NewSelect().
Model(&existingChannels).
Scan(ctx)
if err != nil {
if err != sql.ErrNoRows {
return err
}
}
if err == nil && len(existingChannels) > 0 {
newChannels := migration.
CopyOldChannelToNewChannel(existingChannels)
_, err = tx.
NewInsert().
Model(&newChannels).
Exec(ctx)
if err != nil {
return err
}
}
return nil
})
if err != nil {
return err
}
err = migration.
store.
Dialect().
UpdatePrimaryKey(ctx, tx, new(existingAlertmanagerConfig), new(newAlertmanagerConfig), OrgReference, func(ctx context.Context) error {
existingAlertmanagerConfigs := make([]*existingAlertmanagerConfig, 0)
err = tx.
NewSelect().
Model(&existingAlertmanagerConfigs).
Scan(ctx)
if err != nil {
if err != sql.ErrNoRows {
return err
}
}
if err == nil && len(existingAlertmanagerConfigs) > 0 {
newAlertmanagerConfigs := migration.
CopyOldConfigToNewConfig(existingAlertmanagerConfigs)
_, err = tx.
NewInsert().
Model(&newAlertmanagerConfigs).
Exec(ctx)
if err != nil {
return err
}
}
return nil
})
if err != nil {
return err
}
err = migration.
store.
Dialect().
UpdatePrimaryKey(ctx, tx, new(existingAlertmanagerState), new(newAlertmanagerState), OrgReference, func(ctx context.Context) error {
existingAlertmanagerStates := make([]*existingAlertmanagerState, 0)
err = tx.
NewSelect().
Model(&existingAlertmanagerStates).
Scan(ctx)
if err != nil {
if err != sql.ErrNoRows {
return err
}
}
if err == nil && len(existingAlertmanagerStates) > 0 {
newAlertmanagerStates := migration.
CopyOldStateToNewState(existingAlertmanagerStates)
_, err = tx.
NewInsert().
Model(&newAlertmanagerStates).
Exec(ctx)
if err != nil {
return err
}
}
return nil
})
if err != nil {
return err
}
err = tx.Commit()
if err != nil {
return err
}
return nil
}
func (migration *updateAlertmanager) Down(context.Context, *bun.DB) error {
return nil
}
func (migration *updateAlertmanager) CopyOldChannelToNewChannel(existingChannels []*existingChannel) []*newChannel {
newChannels := make([]*newChannel, 0)
for _, channel := range existingChannels {
newChannels = append(newChannels, &newChannel{
Identifiable: types.Identifiable{
ID: valuer.GenerateUUID(),
},
TimeAuditable: types.TimeAuditable{
CreatedAt: channel.CreatedAt,
UpdatedAt: channel.UpdatedAt,
},
Name: channel.Name,
Type: channel.Type,
Data: channel.Data,
OrgID: channel.OrgID,
})
}
return newChannels
}
func (migration *updateAlertmanager) CopyOldConfigToNewConfig(existingAlertmanagerConfigs []*existingAlertmanagerConfig) []*newAlertmanagerConfig {
newAlertmanagerConfigs := make([]*newAlertmanagerConfig, 0)
for _, config := range existingAlertmanagerConfigs {
newAlertmanagerConfigs = append(newAlertmanagerConfigs, &newAlertmanagerConfig{
Identifiable: types.Identifiable{
ID: valuer.GenerateUUID(),
},
TimeAuditable: types.TimeAuditable{
CreatedAt: config.CreatedAt,
UpdatedAt: config.UpdatedAt,
},
Config: config.Config,
Hash: config.Hash,
OrgID: config.OrgID,
})
}
return newAlertmanagerConfigs
}
func (migration *updateAlertmanager) CopyOldStateToNewState(existingAlertmanagerStates []*existingAlertmanagerState) []*newAlertmanagerState {
newAlertmanagerStates := make([]*newAlertmanagerState, 0)
for _, state := range existingAlertmanagerStates {
newAlertmanagerStates = append(newAlertmanagerStates, &newAlertmanagerState{
Identifiable: types.Identifiable{
ID: valuer.GenerateUUID(),
},
TimeAuditable: types.TimeAuditable{
CreatedAt: state.CreatedAt,
UpdatedAt: state.UpdatedAt,
},
Silences: state.Silences,
NFLog: state.NFLog,
OrgID: state.OrgID,
})
}
return newAlertmanagerStates
}
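
The three Copy* helpers above all follow the same shape: drop the legacy autoincrement id, mint a UUID via valuer.GenerateUUID, and carry the remaining columns (including created_at/updated_at) over unchanged. A package-internal sketch for a single legacy channel row (field values are illustrative):

old := &existingChannel{
	ID:        1,
	Name:      "slack-alerts", // illustrative
	Type:      "slack",
	Data:      `{"channel": "#alerts"}`,
	CreatedAt: time.Now(),
	UpdatedAt: time.Now(),
	OrgID:     "some-org-id",
}
migrated := (&updateAlertmanager{}).CopyOldChannelToNewChannel([]*existingChannel{old})
// migrated[0].ID is a fresh UUID; Name, Type, Data, OrgID and the timestamps are copied as-is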

View File

@@ -0,0 +1,202 @@
package sqlmigration
import (
"context"
"database/sql"
"fmt"
"reflect"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/uptrace/bun"
"github.com/uptrace/bun/migrate"
)
type updatePreferences struct {
store sqlstore.SQLStore
}
type existingOrgPreference struct {
bun.BaseModel `bun:"table:org_preference"`
PreferenceID string `bun:"preference_id,pk,type:text,notnull"`
PreferenceValue string `bun:"preference_value,type:text,notnull"`
OrgID string `bun:"org_id,pk,type:text,notnull"`
}
type newOrgPreference struct {
bun.BaseModel `bun:"table:org_preference_new"`
types.Identifiable
PreferenceID string `bun:"preference_id,type:text,notnull"`
PreferenceValue string `bun:"preference_value,type:text,notnull"`
OrgID string `bun:"org_id,type:text,notnull"`
}
type existingUserPreference struct {
bun.BaseModel `bun:"table:user_preference"`
PreferenceID string `bun:"preference_id,type:text,pk"`
PreferenceValue string `bun:"preference_value,type:text"`
UserID string `bun:"user_id,type:text,pk"`
}
type newUserPreference struct {
bun.BaseModel `bun:"table:user_preference_new"`
types.Identifiable
PreferenceID string `bun:"preference_id,type:text,notnull"`
PreferenceValue string `bun:"preference_value,type:text,notnull"`
UserID string `bun:"user_id,type:text,notnull"`
}
func NewUpdatePreferencesFactory(sqlstore sqlstore.SQLStore) factory.ProviderFactory[SQLMigration, Config] {
return factory.
NewProviderFactory(
factory.MustNewName("update_preferences"),
func(ctx context.Context, ps factory.ProviderSettings, c Config) (SQLMigration, error) {
return newUpdatePreferences(ctx, ps, c, sqlstore)
})
}
func newUpdatePreferences(_ context.Context, _ factory.ProviderSettings, _ Config, store sqlstore.SQLStore) (SQLMigration, error) {
return &updatePreferences{store: store}, nil
}
func (migration *updatePreferences) Register(migrations *migrate.Migrations) error {
if err := migrations.
Register(migration.Up, migration.Down); err != nil {
return err
}
return nil
}
func (migration *updatePreferences) Up(ctx context.Context, db *bun.DB) error {
tx, err := db.
BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
err = migration.
store.
Dialect().
AddPrimaryKey(ctx, tx, new(existingOrgPreference), new(newOrgPreference), OrgReference, func(ctx context.Context) error {
existingOrgPreferences := make([]*existingOrgPreference, 0)
err = tx.
NewSelect().
Model(&existingOrgPreferences).
Scan(ctx)
if err != nil {
if err != sql.ErrNoRows {
return err
}
}
if err == nil && len(existingOrgPreferences) > 0 {
newOrgPreferences := migration.
CopyOldOrgPreferencesToNewOrgPreferences(existingOrgPreferences)
_, err = tx.
NewInsert().
Model(&newOrgPreferences).
Exec(ctx)
if err != nil {
return err
}
}
tableName := tx.Dialect().Tables().Get(reflect.TypeOf(new(existingOrgPreference))).Name
_, err = tx.
ExecContext(ctx, fmt.Sprintf("CREATE UNIQUE INDEX IF NOT EXISTS %s_unique_idx ON %s (preference_id, org_id)", tableName, fmt.Sprintf("%s_new", tableName)))
if err != nil {
return err
}
return nil
})
if err != nil {
return err
}
err = migration.
store.
Dialect().
AddPrimaryKey(ctx, tx, new(existingUserPreference), new(newUserPreference), UserReference, func(ctx context.Context) error {
existingUserPreferences := make([]*existingUserPreference, 0)
err = tx.
NewSelect().
Model(&existingUserPreferences).
Scan(ctx)
if err != nil {
if err != sql.ErrNoRows {
return err
}
}
if err == nil && len(existingUserPreferences) > 0 {
newUserPreferences := migration.
CopyOldUserPreferencesToNewUserPreferences(existingUserPreferences)
_, err = tx.
NewInsert().
Model(&newUserPreferences).
Exec(ctx)
if err != nil {
return err
}
}
tableName := tx.Dialect().Tables().Get(reflect.TypeOf(new(existingUserPreference))).Name
_, err = tx.
ExecContext(ctx, fmt.Sprintf("CREATE UNIQUE INDEX IF NOT EXISTS %s_unique_idx ON %s (preference_id, user_id)", tableName, fmt.Sprintf("%s_new", tableName)))
if err != nil {
return err
}
return nil
})
if err != nil {
return err
}
err = tx.Commit()
if err != nil {
return err
}
return nil
}
func (migration *updatePreferences) Down(context.Context, *bun.DB) error {
return nil
}
func (migration *updatePreferences) CopyOldOrgPreferencesToNewOrgPreferences(existingOrgPreferences []*existingOrgPreference) []*newOrgPreference {
newOrgPreferences := make([]*newOrgPreference, 0)
for _, preference := range existingOrgPreferences {
newOrgPreferences = append(newOrgPreferences, &newOrgPreference{
Identifiable: types.Identifiable{
ID: valuer.GenerateUUID(),
},
PreferenceID: preference.PreferenceID,
PreferenceValue: preference.PreferenceValue,
OrgID: preference.OrgID,
})
}
return newOrgPreferences
}
func (migration *updatePreferences) CopyOldUserPreferencesToNewUserPreferences(existingUserPreferences []*existingUserPreference) []*newUserPreference {
newUserPreferences := make([]*newUserPreference, 0)
for _, preference := range existingUserPreferences {
newUserPreferences = append(newUserPreferences, &newUserPreference{
Identifiable: types.Identifiable{
ID: valuer.GenerateUUID(),
},
PreferenceID: preference.PreferenceID,
PreferenceValue: preference.PreferenceValue,
UserID: preference.UserID,
})
}
return newUserPreferences
}
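
The AddPrimaryKey callbacks above also recreate the composite uniqueness that the old two-column primary keys provided. Assuming bun resolves the existing models to the table names org_preference and user_preference (which is what their struct tags declare), the two fmt.Sprintf calls expand to roughly:

// illustrative expansion of the index statements built in the callbacks above
const (
	orgPreferenceIndex  = `CREATE UNIQUE INDEX IF NOT EXISTS org_preference_unique_idx ON org_preference_new (preference_id, org_id)`
	userPreferenceIndex = `CREATE UNIQUE INDEX IF NOT EXISTS user_preference_unique_idx ON user_preference_new (preference_id, user_id)`
)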

View File

@@ -0,0 +1,232 @@
package sqlmigration
import (
"context"
"database/sql"
"fmt"
"reflect"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/uptrace/bun"
"github.com/uptrace/bun/migrate"
)
type updateApdexTtl struct {
store sqlstore.SQLStore
}
type existingApdexSettings struct {
bun.BaseModel `bun:"table:apdex_settings"`
OrgID string `bun:"org_id,pk,type:text" json:"orgId"`
ServiceName string `bun:"service_name,pk,type:text" json:"serviceName"`
Threshold float64 `bun:"threshold,type:float,notnull" json:"threshold"`
ExcludeStatusCodes string `bun:"exclude_status_codes,type:text,notnull" json:"excludeStatusCodes"`
}
type newApdexSettings struct {
bun.BaseModel `bun:"table:apdex_setting"`
types.Identifiable
OrgID string `bun:"org_id,type:text" json:"orgId"`
ServiceName string `bun:"service_name,type:text" json:"serviceName"`
Threshold float64 `bun:"threshold,type:float,notnull" json:"threshold"`
ExcludeStatusCodes string `bun:"exclude_status_codes,type:text,notnull" json:"excludeStatusCodes"`
}
type existingTTLStatus struct {
bun.BaseModel `bun:"table:ttl_status"`
ID int `bun:"id,pk,autoincrement"`
TransactionID string `bun:"transaction_id,type:text,notnull"`
CreatedAt time.Time `bun:"created_at,type:datetime,notnull"`
UpdatedAt time.Time `bun:"updated_at,type:datetime,notnull"`
TableName string `bun:"table_name,type:text,notnull"`
TTL int `bun:"ttl,notnull,default:0"`
ColdStorageTTL int `bun:"cold_storage_ttl,notnull,default:0"`
Status string `bun:"status,type:text,notnull"`
}
type newTTLStatus struct {
bun.BaseModel `bun:"table:ttl_setting"`
types.Identifiable
types.TimeAuditable
TransactionID string `bun:"transaction_id,type:text,notnull"`
TableName string `bun:"table_name,type:text,notnull"`
TTL int `bun:"ttl,notnull,default:0"`
ColdStorageTTL int `bun:"cold_storage_ttl,notnull,default:0"`
Status string `bun:"status,type:text,notnull"`
OrgID string `json:"-" bun:"org_id,notnull"`
}
func NewUpdateApdexTtlFactory(sqlstore sqlstore.SQLStore) factory.ProviderFactory[SQLMigration, Config] {
return factory.
NewProviderFactory(
factory.MustNewName("update_apdex_ttl"),
func(ctx context.Context, ps factory.ProviderSettings, c Config) (SQLMigration, error) {
return newUpdateApdexTtl(ctx, ps, c, sqlstore)
})
}
func newUpdateApdexTtl(_ context.Context, _ factory.ProviderSettings, _ Config, store sqlstore.SQLStore) (SQLMigration, error) {
return &updateApdexTtl{store: store}, nil
}
func (migration *updateApdexTtl) Register(migrations *migrate.Migrations) error {
if err := migrations.
Register(migration.Up, migration.Down); err != nil {
return err
}
return nil
}
func (migration *updateApdexTtl) Up(ctx context.Context, db *bun.DB) error {
tx, err := db.
BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
err = migration.
store.
Dialect().
RenameTableAndModifyModel(ctx, tx, new(existingApdexSettings), new(newApdexSettings), OrgReference, func(ctx context.Context) error {
existingApdexSettings := make([]*existingApdexSettings, 0)
err = tx.
NewSelect().
Model(&existingApdexSettings).
Scan(ctx)
if err != nil {
if err != sql.ErrNoRows {
return err
}
}
if err == nil && len(existingApdexSettings) > 0 {
newSettings := migration.
CopyExistingApdexSettingsToNewApdexSettings(existingApdexSettings)
_, err = tx.
NewInsert().
Model(&newSettings).
Exec(ctx)
if err != nil {
return err
}
}
tableName := tx.Dialect().Tables().Get(reflect.TypeOf(new(newApdexSettings))).Name
_, err = tx.
ExecContext(ctx, fmt.Sprintf("CREATE UNIQUE INDEX IF NOT EXISTS %s_unique_idx ON %s (service_name, org_id)", tableName, tableName))
if err != nil {
return err
}
return nil
})
if err != nil {
return err
}
err = migration.
store.
Dialect().
RenameTableAndModifyModel(ctx, tx, new(existingTTLStatus), new(newTTLStatus), OrgReference, func(ctx context.Context) error {
existingTTLStatus := make([]*existingTTLStatus, 0)
err = tx.
NewSelect().
Model(&existingTTLStatus).
Scan(ctx)
if err != nil {
if err != sql.ErrNoRows {
return err
}
}
var orgIDs []string
if err := migration.
store.
BunDB().
NewSelect().
Model((*types.Organization)(nil)).
Column("id").
Scan(ctx, &orgIDs); err != nil {
return err
}
if len(orgIDs) > 1 {
return errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "cannot have more than one org id")
}
if err == nil && len(existingTTLStatus) > 0 {
newTTLStatus := migration.
CopyExistingTTLStatusToNewTTLStatus(existingTTLStatus, orgIDs[0])
_, err = tx.
NewInsert().
Model(&newTTLStatus).
Exec(ctx)
if err != nil {
return err
}
}
return nil
})
if err != nil {
return err
}
err = tx.Commit()
if err != nil {
return err
}
return nil
}
func (migration *updateApdexTtl) Down(context.Context, *bun.DB) error {
return nil
}
func (migration *updateApdexTtl) CopyExistingApdexSettingsToNewApdexSettings(existingApdexSettings []*existingApdexSettings) []*newApdexSettings {
newSettings := make([]*newApdexSettings, 0)
for _, apdexSetting := range existingApdexSettings {
newSettings = append(newSettings, &newApdexSettings{
Identifiable: types.Identifiable{
ID: valuer.GenerateUUID(),
},
ServiceName: apdexSetting.ServiceName,
Threshold: apdexSetting.Threshold,
ExcludeStatusCodes: apdexSetting.ExcludeStatusCodes,
OrgID: apdexSetting.OrgID,
})
}
return newSettings
}
func (migration *updateApdexTtl) CopyExistingTTLStatusToNewTTLStatus(existingTTLStatus []*existingTTLStatus, orgID string) []*newTTLStatus {
newTTLStatuses := make([]*newTTLStatus, 0)
for _, ttl := range existingTTLStatus {
newTTLStatuses = append(newTTLStatuses, &newTTLStatus{
Identifiable: types.Identifiable{
ID: valuer.GenerateUUID(),
},
TimeAuditable: types.TimeAuditable{
CreatedAt: ttl.CreatedAt,
UpdatedAt: ttl.UpdatedAt,
},
TransactionID: ttl.TransactionID,
TTL: ttl.TTL,
TableName: ttl.TableName,
ColdStorageTTL: ttl.ColdStorageTTL,
Status: ttl.Status,
OrgID: orgID,
})
}
return newTTLStatuses
}
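Note: the copy helpers above only remap rows and stamp new UUIDs, so they can be exercised without a database. A minimal test-style sketch (not part of this changeset, assuming it lives in the same sqlmigration package and that valuer.UUID exposes StringValue, as used elsewhere in this diff):

package sqlmigration

import "testing"

// Sketch only: verifies the helper carries fields over and generates a new id.
func TestCopyExistingApdexSettingsToNewApdexSettings(t *testing.T) {
	migration := &updateApdexTtl{}
	existing := []*existingApdexSettings{
		{OrgID: "org-1", ServiceName: "frontend", Threshold: 0.5, ExcludeStatusCodes: "404,429"},
	}
	copied := migration.CopyExistingApdexSettingsToNewApdexSettings(existing)
	if len(copied) != 1 {
		t.Fatalf("expected 1 row, got %d", len(copied))
	}
	if copied[0].OrgID != "org-1" || copied[0].ID.StringValue() == "" {
		t.Fatalf("copied row is missing the org id or a generated id")
	}
}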

View File

@@ -0,0 +1,118 @@
package sqlmigration
import (
"context"
"database/sql"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/uptrace/bun"
"github.com/uptrace/bun/migrate"
)
type updateResetPassword struct {
store sqlstore.SQLStore
}
type existingResetPasswordRequest struct {
bun.BaseModel `bun:"table:reset_password_request"`
ID int `bun:"id,pk,autoincrement" json:"id"`
Token string `bun:"token,type:text,notnull" json:"token"`
UserID string `bun:"user_id,type:text,notnull" json:"userId"`
}
type newResetPasswordRequest struct {
bun.BaseModel `bun:"table:reset_password_request_new"`
types.Identifiable
Token string `bun:"token,type:text,notnull" json:"token"`
UserID string `bun:"user_id,type:text,notnull" json:"userId"`
}
func NewUpdateResetPasswordFactory(sqlstore sqlstore.SQLStore) factory.ProviderFactory[SQLMigration, Config] {
return factory.
NewProviderFactory(
factory.MustNewName("update_reset_password"),
func(ctx context.Context, ps factory.ProviderSettings, c Config) (SQLMigration, error) {
return newUpdateResetPassword(ctx, ps, c, sqlstore)
})
}
func newUpdateResetPassword(_ context.Context, _ factory.ProviderSettings, _ Config, store sqlstore.SQLStore) (SQLMigration, error) {
return &updateResetPassword{store: store}, nil
}
func (migration *updateResetPassword) Register(migrations *migrate.Migrations) error {
if err := migrations.
Register(migration.Up, migration.Down); err != nil {
return err
}
return nil
}
func (migration *updateResetPassword) Up(ctx context.Context, db *bun.DB) error {
tx, err := db.
BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
err = migration.store.Dialect().UpdatePrimaryKey(ctx, tx, new(existingResetPasswordRequest), new(newResetPasswordRequest), UserReference, func(ctx context.Context) error {
existingResetPasswordRequests := make([]*existingResetPasswordRequest, 0)
err = tx.
NewSelect().
Model(&existingResetPasswordRequests).
Scan(ctx)
if err != nil {
if err != sql.ErrNoRows {
return err
}
}
if err == nil && len(existingResetPasswordRequests) > 0 {
newResetPasswordRequests := migration.
CopyExistingResetPasswordRequestsToNewResetPasswordRequests(existingResetPasswordRequests)
_, err = tx.
NewInsert().
Model(&newResetPasswordRequests).
Exec(ctx)
if err != nil {
return err
}
}
return nil
})
if err != nil {
return err
}
err = tx.Commit()
if err != nil {
return err
}
return nil
}
func (migration *updateResetPassword) Down(context.Context, *bun.DB) error {
return nil
}
func (migration *updateResetPassword) CopyExistingResetPasswordRequestsToNewResetPasswordRequests(existingPasswordRequests []*existingResetPasswordRequest) []*newResetPasswordRequest {
newResetPasswordRequests := make([]*newResetPasswordRequest, 0)
for _, request := range existingPasswordRequests {
newResetPasswordRequests = append(newResetPasswordRequests, &newResetPasswordRequest{
Identifiable: types.Identifiable{
ID: valuer.GenerateUUID(),
},
Token: request.Token,
UserID: request.UserID,
})
}
return newResetPasswordRequests
}

View File

@@ -25,6 +25,11 @@ var (
ErrNoExecute = errors.New("no execute")
)
var (
OrgReference = "org"
UserReference = "user"
)
func New(
ctx context.Context,
settings factory.ProviderSettings,

View File

@@ -2,11 +2,29 @@ package sqlitesqlstore
import (
"context"
"fmt"
"reflect"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/uptrace/bun"
)
var (
Identity = "id"
Integer = "INTEGER"
Text = "TEXT"
)
var (
Org = "org"
User = "user"
)
var (
OrgReference = `("org_id") REFERENCES "organizations" ("id")`
UserReference = `("user_id") REFERENCES "users" ("id") ON DELETE CASCADE ON UPDATE CASCADE`
)
type dialect struct {
}
@@ -120,6 +138,22 @@ func (dialect *dialect) ColumnExists(ctx context.Context, bun bun.IDB, table str
return count > 0, nil
}
func (dialect *dialect) IndexExists(ctx context.Context, bun bun.IDB, table string, index string) (bool, error) {
var count int
err := bun.NewSelect().
ColumnExpr("COUNT(*)").
TableExpr("sqlite_master").
Where("type = ?", "index").
Where("name = ?", index).
Scan(ctx, &count)
if err != nil {
return false, err
}
return count > 0, nil
}
func (dialect *dialect) RenameColumn(ctx context.Context, bun bun.IDB, table string, oldColumnName string, newColumnName string) (bool, error) {
oldColumnExists, err := dialect.ColumnExists(ctx, bun, table, oldColumnName)
if err != nil {
@@ -165,7 +199,10 @@ func (dialect *dialect) TableExists(ctx context.Context, bun bun.IDB, table inte
return true, nil
}
- func (dialect *dialect) RenameTableAndModifyModel(ctx context.Context, bun bun.IDB, oldModel interface{}, newModel interface{}, cb func(context.Context) error) error {
func (dialect *dialect) RenameTableAndModifyModel(ctx context.Context, bun bun.IDB, oldModel interface{}, newModel interface{}, reference string, cb func(context.Context) error) error {
if reference == "" {
return errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "cannot run migration without reference")
}
exists, err := dialect.TableExists(ctx, bun, newModel)
if err != nil {
return err
@@ -174,11 +211,18 @@ func (dialect *dialect) RenameTableAndModifyModel(ctx context.Context, bun bun.I
return nil
}
fkReference := ""
if reference == Org {
fkReference = OrgReference
} else if reference == User {
fkReference = UserReference
}
_, err = bun.
NewCreateTable().
IfNotExists().
Model(newModel).
- ForeignKey(`("org_id") REFERENCES "organizations" ("id")`).
ForeignKey(fkReference).
Exec(ctx)
if err != nil {
@@ -201,3 +245,115 @@ func (dialect *dialect) RenameTableAndModifyModel(ctx context.Context, bun bun.I
return nil
}
func (dialect *dialect) UpdatePrimaryKey(ctx context.Context, bun bun.IDB, oldModel interface{}, newModel interface{}, reference string, cb func(context.Context) error) error {
if reference == "" {
return errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "cannot run migration without reference")
}
oldTableName := bun.Dialect().Tables().Get(reflect.TypeOf(oldModel)).Name
newTableName := bun.Dialect().Tables().Get(reflect.TypeOf(newModel)).Name
columnType, err := dialect.GetColumnType(ctx, bun, oldTableName, Identity)
if err != nil {
return err
}
if columnType == Text {
return nil
}
fkReference := ""
if reference == Org {
fkReference = OrgReference
} else if reference == User {
fkReference = UserReference
}
_, err = bun.
NewCreateTable().
IfNotExists().
Model(newModel).
ForeignKey(fkReference).
Exec(ctx)
if err != nil {
return err
}
err = cb(ctx)
if err != nil {
return err
}
_, err = bun.
NewDropTable().
IfExists().
Model(oldModel).
Exec(ctx)
if err != nil {
return err
}
_, err = bun.
ExecContext(ctx, fmt.Sprintf("ALTER TABLE %s RENAME TO %s", newTableName, oldTableName))
if err != nil {
return err
}
return nil
}
func (dialect *dialect) AddPrimaryKey(ctx context.Context, bun bun.IDB, oldModel interface{}, newModel interface{}, reference string, cb func(context.Context) error) error {
if reference == "" {
return errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "cannot run migration without reference")
}
oldTableName := bun.Dialect().Tables().Get(reflect.TypeOf(oldModel)).Name
newTableName := bun.Dialect().Tables().Get(reflect.TypeOf(newModel)).Name
identityExists, err := dialect.ColumnExists(ctx, bun, oldTableName, Identity)
if err != nil {
return err
}
if identityExists {
return nil
}
fkReference := ""
if reference == Org {
fkReference = OrgReference
} else if reference == User {
fkReference = UserReference
}
_, err = bun.
NewCreateTable().
IfNotExists().
Model(newModel).
ForeignKey(fkReference).
Exec(ctx)
if err != nil {
return err
}
err = cb(ctx)
if err != nil {
return err
}
_, err = bun.
NewDropTable().
IfExists().
Model(oldModel).
Exec(ctx)
if err != nil {
return err
}
_, err = bun.
ExecContext(ctx, fmt.Sprintf("ALTER TABLE %s RENAME TO %s", newTableName, oldTableName))
if err != nil {
return err
}
return nil
}
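Note: for callers inside the sqlmigration package, the new helpers follow the same create-new-table, copy, drop-old, rename flow as RenameTableAndModifyModel. A minimal caller-side sketch (not part of this changeset; it reuses the TTL models and the OrgReference constant defined earlier in this diff):

package sqlmigration

import (
	"context"

	"github.com/SigNoz/signoz/pkg/sqlstore"
	"github.com/uptrace/bun"
)

// Sketch only: shows the calling convention of the new AddPrimaryKey dialect helper.
func addPrimaryKeyExample(ctx context.Context, store sqlstore.SQLStore, tx bun.IDB) error {
	return store.Dialect().AddPrimaryKey(ctx, tx, new(existingTTLStatus), new(newTTLStatus), OrgReference, func(ctx context.Context) error {
		// rows would be copied from the old table into the new one here,
		// exactly like the callbacks used in the migrations above
		return nil
	})
}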

View File

@@ -42,5 +42,8 @@ type SQLDialect interface {
GetColumnType(context.Context, bun.IDB, string, string) (string, error)
ColumnExists(context.Context, bun.IDB, string, string) (bool, error)
RenameColumn(context.Context, bun.IDB, string, string, string) (bool, error)
- RenameTableAndModifyModel(context.Context, bun.IDB, interface{}, interface{}, func(context.Context) error) error
RenameTableAndModifyModel(context.Context, bun.IDB, interface{}, interface{}, string, func(context.Context) error) error
UpdatePrimaryKey(context.Context, bun.IDB, interface{}, interface{}, string, func(context.Context) error) error
AddPrimaryKey(context.Context, bun.IDB, interface{}, interface{}, string, func(context.Context) error) error
IndexExists(context.Context, bun.IDB, string, string) (bool, error)
}

View File

@@ -29,6 +29,18 @@ func (dialect *dialect) RenameColumn(ctx context.Context, bun bun.IDB, table str
return true, nil
}
- func (dialect *dialect) RenameTableAndModifyModel(ctx context.Context, bun bun.IDB, oldModel interface{}, newModel interface{}, cb func(context.Context) error) error {
func (dialect *dialect) RenameTableAndModifyModel(ctx context.Context, bun bun.IDB, oldModel interface{}, newModel interface{}, reference string, cb func(context.Context) error) error {
return nil
}
func (dialect *dialect) UpdatePrimaryKey(ctx context.Context, bun bun.IDB, oldModel interface{}, newModel interface{}, reference string, cb func(context.Context) error) error {
return nil
}
func (dialect *dialect) AddPrimaryKey(ctx context.Context, bun bun.IDB, oldModel interface{}, newModel interface{}, reference string, cb func(context.Context) error) error {
return nil
}
func (dialect *dialect) IndexExists(ctx context.Context, bun bun.IDB, table string, index string) (bool, error) {
return false, nil
}

View File

@@ -3,42 +3,94 @@ package types
import (
"time"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/uptrace/bun"
)
- type Agent struct {
- bun.BaseModel `bun:"table:agents"`
- AgentID string `bun:"agent_id,pk,type:text"`
- StartedAt time.Time `bun:"started_at,type:datetime,notnull"`
- TerminatedAt time.Time `bun:"terminated_at,type:datetime"`
- CurrentStatus string `bun:"current_status,type:text,notnull"`
- EffectiveConfig string `bun:"effective_config,type:text,notnull"`
- }
type AgentStatus int
const (
AgentStatusUnknown AgentStatus = iota
AgentStatusConnected
AgentStatusDisconnected
)
type StorableAgent struct {
bun.BaseModel `bun:"table:agents"`
Identifiable
OrgID string `json:"orgId" yaml:"orgId" bun:"org_id,type:text"`
StartedAt time.Time `json:"startedAt" yaml:"startedAt" bun:"started_at,type:datetime,notnull"`
TerminatedAt time.Time `json:"terminatedAt" yaml:"terminatedAt" bun:"terminated_at,type:datetime"`
CurrentStatus AgentStatus `json:"currentStatus" yaml:"currentStatus" bun:"current_status,type:text,notnull"`
EffectiveConfig string `bun:"effective_config,type:text,notnull"`
}
- type AgentConfigVersion struct {
- bun.BaseModel `bun:"table:agent_config_versions"`
- ID string `bun:"id,pk,type:text"`
- CreatedBy string `bun:"created_by,type:text"`
- CreatedAt time.Time `bun:"created_at,default:CURRENT_TIMESTAMP"`
- UpdatedBy string `bun:"updated_by,type:text"`
- UpdatedAt time.Time `bun:"updated_at,default:CURRENT_TIMESTAMP"`
- Version int `bun:"version,default:1,unique:element_version_idx"`
- Active int `bun:"active"`
- IsValid int `bun:"is_valid"`
- Disabled int `bun:"disabled"`
- ElementType string `bun:"element_type,notnull,type:varchar(120),unique:element_version_idx"`
- DeployStatus string `bun:"deploy_status,notnull,type:varchar(80),default:'DIRTY'"`
- DeploySequence int `bun:"deploy_sequence"`
- DeployResult string `bun:"deploy_result,type:text"`
- LastHash string `bun:"last_hash,type:text"`
- LastConfig string `bun:"last_config,type:text"`
- }
type ElementTypeDef string
const (
ElementTypeSamplingRules ElementTypeDef = "sampling_rules"
ElementTypeDropRules ElementTypeDef = "drop_rules"
ElementTypeLogPipelines ElementTypeDef = "log_pipelines"
ElementTypeLbExporter ElementTypeDef = "lb_exporter"
)
type DeployStatus string
const (
PendingDeploy DeployStatus = "DIRTY"
Deploying DeployStatus = "DEPLOYING"
Deployed DeployStatus = "DEPLOYED"
DeployInitiated DeployStatus = "IN_PROGRESS"
DeployFailed DeployStatus = "FAILED"
DeployStatusUnknown DeployStatus = "UNKNOWN"
)
type AgentConfigVersion struct {
bun.BaseModel `bun:"table:agent_config_versions,alias:acv"`
TimeAuditable
UserAuditable
CreatedByName string `json:"createdByName" bun:"created_by_name,scanonly"`
Identifiable
OrgID string `json:"orgId" bun:"org_id,type:text"`
Version int `json:"version" bun:"version,default:1,unique:element_version_idx"`
Active bool `json:"active" bun:"active"`
IsValid bool `json:"is_valid" bun:"is_valid"`
Disabled bool `json:"disabled" bun:"disabled"`
ElementType ElementTypeDef `json:"elementType" bun:"element_type,notnull,type:varchar(120),unique:element_version_idx"`
DeployStatus DeployStatus `json:"deployStatus" bun:"deploy_status,notnull,type:varchar(80),default:'DIRTY'"`
DeploySequence int `json:"deploySequence" bun:"deploy_sequence"`
DeployResult string `json:"deployResult" bun:"deploy_result,type:text"`
LastHash string `json:"lastHash" bun:"last_hash,type:text"`
LastConfig string `json:"lastConfig" bun:"last_config,type:text"`
}
func NewAgentConfigVersion(orgId string, typeDef ElementTypeDef) *AgentConfigVersion {
return &AgentConfigVersion{
OrgID: orgId,
Identifiable: Identifiable{ID: valuer.GenerateUUID()},
ElementType: typeDef,
Active: false,
IsValid: false,
Disabled: false,
DeployStatus: PendingDeploy,
LastHash: "",
LastConfig: "{}",
}
}
func UpdateVersion(v int) int {
return v + 1
}
type AgentConfigElement struct {
bun.BaseModel `bun:"table:agent_config_elements"`
- ID string `bun:"id,pk,type:text"`
Identifiable
OrgID string `bun:"org_id,type:text"`
CreatedBy string `bun:"created_by,type:text"`
CreatedAt time.Time `bun:"created_at,default:CURRENT_TIMESTAMP"`
UpdatedBy string `bun:"updated_by,type:text"`
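Note: the constructor above seeds a config version in the PendingDeploy ("DIRTY") state with a fresh UUID. A small usage sketch (not part of this changeset; the org id is a placeholder):

package types

import "fmt"

// Sketch only: create a version for log pipelines and bump its version counter.
func agentConfigVersionSketch() {
	v := NewAgentConfigVersion("org-1", ElementTypeLogPipelines)
	v.Version = UpdateVersion(v.Version)
	fmt.Println(v.ElementType, v.Version, v.DeployStatus) // log_pipelines 1 DIRTY
}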

View File

@@ -7,6 +7,8 @@ import (
"time" "time"
"github.com/SigNoz/signoz/pkg/errors" "github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/prometheus/alertmanager/config" "github.com/prometheus/alertmanager/config"
"github.com/uptrace/bun" "github.com/uptrace/bun"
) )
@@ -27,15 +29,14 @@ type GettableChannels = []*Channel
// Channel represents a single receiver of the alertmanager config. // Channel represents a single receiver of the alertmanager config.
type Channel struct { type Channel struct {
bun.BaseModel `bun:"table:notification_channels"` bun.BaseModel `bun:"table:notification_channel"`
ID int `json:"id" bun:"id,pk,autoincrement"` types.Identifiable
Name string `json:"name" bun:"name"` types.TimeAuditable
Type string `json:"type" bun:"type"` Name string `json:"name" bun:"name"`
Data string `json:"data" bun:"data"` Type string `json:"type" bun:"type"`
CreatedAt time.Time `json:"created_at" bun:"created_at"` Data string `json:"data" bun:"data"`
UpdatedAt time.Time `json:"updated_at" bun:"updated_at"` OrgID string `json:"org_id" bun:"org_id"`
OrgID string `json:"org_id" bun:"org_id"`
} }
// NewChannelFromReceiver creates a new Channel from a Receiver. // NewChannelFromReceiver creates a new Channel from a Receiver.
@@ -47,10 +48,15 @@ func NewChannelFromReceiver(receiver config.Receiver, orgID string) *Channel {
// Initialize channel with common fields // Initialize channel with common fields
channel := Channel{ channel := Channel{
Name: receiver.Name, Identifiable: types.Identifiable{
CreatedAt: time.Now(), ID: valuer.GenerateUUID(),
UpdatedAt: time.Now(), },
OrgID: orgID, TimeAuditable: types.TimeAuditable{
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
},
Name: receiver.Name,
OrgID: orgID,
} }
// Use reflection to examine receiver struct fields // Use reflection to examine receiver struct fields
@@ -120,14 +126,14 @@ func NewConfigFromChannels(globalConfig GlobalConfig, routeConfig RouteConfig, c
return cfg, nil return cfg, nil
} }
func GetChannelByID(channels Channels, id int) (int, *Channel, error) { func GetChannelByID(channels Channels, id valuer.UUID) (int, *Channel, error) {
for i, channel := range channels { for i, channel := range channels {
if channel.ID == id { if channel.ID == id {
return i, channel, nil return i, channel, nil
} }
} }
return 0, nil, errors.Newf(errors.TypeNotFound, ErrCodeAlertmanagerChannelNotFound, "cannot find channel with id %d", id) return 0, nil, errors.Newf(errors.TypeNotFound, ErrCodeAlertmanagerChannelNotFound, "cannot find channel with id %s", id.StringValue())
} }
func GetChannelByName(channels Channels, name string) (int, *Channel, error) { func GetChannelByName(channels Channels, name string) (int, *Channel, error) {
@@ -143,7 +149,7 @@ func GetChannelByName(channels Channels, name string) (int, *Channel, error) {
func (c *Channel) Update(receiver Receiver) error { func (c *Channel) Update(receiver Receiver) error {
channel := NewChannelFromReceiver(receiver, c.OrgID) channel := NewChannelFromReceiver(receiver, c.OrgID)
if channel == nil { if channel == nil {
return errors.Newf(errors.TypeInvalidInput, ErrCodeAlertmanagerChannelNotFound, "cannot find channel with id %d", c.ID) return errors.Newf(errors.TypeInvalidInput, ErrCodeAlertmanagerChannelNotFound, "cannot find channel with id %s", c.ID.StringValue())
} }
if c.Name != channel.Name { if c.Name != channel.Name {
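Note: channels are now keyed by valuer.UUID instead of an auto-increment integer, so lookups take the parsed UUID. A minimal sketch (not part of this changeset; the package name alertmanagertypes is assumed from the types in this file):

package alertmanagertypes

import "github.com/SigNoz/signoz/pkg/valuer"

// Sketch only: find a channel by its UUID using the updated helper.
func findChannel(channels Channels, id valuer.UUID) (*Channel, error) {
	_, channel, err := GetChannelByID(channels, id)
	return channel, err
}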

View File

@@ -10,6 +10,8 @@ import (
"dario.cat/mergo" "dario.cat/mergo"
"github.com/SigNoz/signoz/pkg/errors" "github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/prometheus/alertmanager/config" "github.com/prometheus/alertmanager/config"
commoncfg "github.com/prometheus/common/config" commoncfg "github.com/prometheus/common/config"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
@@ -41,12 +43,11 @@ type RouteConfig struct {
type StoreableConfig struct { type StoreableConfig struct {
bun.BaseModel `bun:"table:alertmanager_config"` bun.BaseModel `bun:"table:alertmanager_config"`
ID uint64 `bun:"id,pk,autoincrement"` types.Identifiable
Config string `bun:"config"` types.TimeAuditable
Hash string `bun:"hash"` Config string `bun:"config"`
CreatedAt time.Time `bun:"created_at"` Hash string `bun:"hash"`
UpdatedAt time.Time `bun:"updated_at"` OrgID string `bun:"org_id"`
OrgID string `bun:"org_id"`
} }
// Config is the type for the entire alertmanager configuration // Config is the type for the entire alertmanager configuration
@@ -63,11 +64,16 @@ func NewConfig(c *config.Config, orgID string) *Config {
return &Config{ return &Config{
alertmanagerConfig: c, alertmanagerConfig: c,
storeableConfig: &StoreableConfig{ storeableConfig: &StoreableConfig{
Config: raw, Identifiable: types.Identifiable{
Hash: fmt.Sprintf("%x", newConfigHash(raw)), ID: valuer.GenerateUUID(),
CreatedAt: time.Now(), },
UpdatedAt: time.Now(), TimeAuditable: types.TimeAuditable{
OrgID: orgID, CreatedAt: time.Now(),
UpdatedAt: time.Now(),
},
Config: raw,
Hash: fmt.Sprintf("%x", newConfigHash(raw)),
OrgID: orgID,
}, },
} }
} }
@@ -370,13 +376,13 @@ type ConfigStore interface {
CreateChannel(context.Context, *Channel, ...StoreOption) error CreateChannel(context.Context, *Channel, ...StoreOption) error
// GetChannelByID returns the channel for the given id. // GetChannelByID returns the channel for the given id.
GetChannelByID(context.Context, string, int) (*Channel, error) GetChannelByID(context.Context, string, valuer.UUID) (*Channel, error)
// UpdateChannel updates a channel. // UpdateChannel updates a channel.
UpdateChannel(context.Context, string, *Channel, ...StoreOption) error UpdateChannel(context.Context, string, *Channel, ...StoreOption) error
// DeleteChannelByID deletes a channel. // DeleteChannelByID deletes a channel.
DeleteChannelByID(context.Context, string, int, ...StoreOption) error DeleteChannelByID(context.Context, string, valuer.UUID, ...StoreOption) error
// ListChannels returns the list of channels. // ListChannels returns the list of channels.
ListChannels(context.Context, string) ([]*Channel, error) ListChannels(context.Context, string) ([]*Channel, error)

View File

@@ -6,6 +6,8 @@ import (
"time" "time"
"github.com/SigNoz/signoz/pkg/errors" "github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/prometheus/alertmanager/cluster" "github.com/prometheus/alertmanager/cluster"
"github.com/uptrace/bun" "github.com/uptrace/bun"
) )
@@ -28,19 +30,23 @@ var (
type StoreableState struct { type StoreableState struct {
bun.BaseModel `bun:"table:alertmanager_state"` bun.BaseModel `bun:"table:alertmanager_state"`
ID uint64 `bun:"id,pk,autoincrement"` types.Identifiable
Silences string `bun:"silences,nullzero"` types.TimeAuditable
NFLog string `bun:"nflog,nullzero"` Silences string `bun:"silences,nullzero"`
CreatedAt time.Time `bun:"created_at"` NFLog string `bun:"nflog,nullzero"`
UpdatedAt time.Time `bun:"updated_at"` OrgID string `bun:"org_id"`
OrgID string `bun:"org_id"`
} }
func NewStoreableState(orgID string) *StoreableState { func NewStoreableState(orgID string) *StoreableState {
return &StoreableState{ return &StoreableState{
OrgID: orgID, Identifiable: types.Identifiable{
CreatedAt: time.Now(), ID: valuer.GenerateUUID(),
UpdatedAt: time.Now(), },
TimeAuditable: types.TimeAuditable{
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
},
OrgID: orgID,
} }
} }

View File

@@ -94,15 +94,14 @@ type PlannedMaintenance struct {
UpdatedBy string `bun:"updated_by,type:text,notnull"`
}
- type TTLStatus struct {
- bun.BaseModel `bun:"table:ttl_status"`
- ID int `bun:"id,pk,autoincrement"`
- TransactionID string `bun:"transaction_id,type:text,notnull"`
- CreatedAt time.Time `bun:"created_at,type:datetime,notnull"`
- UpdatedAt time.Time `bun:"updated_at,type:datetime,notnull"`
- TableName string `bun:"table_name,type:text,notnull"`
- TTL int `bun:"ttl,notnull,default:0"`
- ColdStorageTTL int `bun:"cold_storage_ttl,notnull,default:0"`
- Status string `bun:"status,type:text,notnull"`
- }
type TTLSetting struct {
bun.BaseModel `bun:"table:ttl_setting"`
Identifiable
TimeAuditable
TransactionID string `bun:"transaction_id,type:text,notnull"`
TableName string `bun:"table_name,type:text,notnull"`
TTL int `bun:"ttl,notnull,default:0"`
ColdStorageTTL int `bun:"cold_storage_ttl,notnull,default:0"`
Status string `bun:"status,type:text,notnull"`
OrgID string `json:"-" bun:"org_id,notnull"`
}

View File

@@ -7,7 +7,6 @@ import (
// TODO: check constraints are not working
type Organization struct {
bun.BaseModel `bun:"table:organizations"`
TimeAuditable
ID string `bun:"id,pk,type:text" json:"id"`
Name string `bun:"name,type:text,notnull" json:"name"`
@@ -16,8 +15,10 @@ type Organization struct {
}
type ApdexSettings struct {
- OrgID string `bun:"org_id,pk,type:text" json:"orgId"`
- ServiceName string `bun:"service_name,pk,type:text" json:"serviceName"`
bun.BaseModel `bun:"table:apdex_setting"`
Identifiable
OrgID string `bun:"org_id,type:text" json:"orgId"`
ServiceName string `bun:"service_name,type:text" json:"serviceName"`
Threshold float64 `bun:"threshold,type:float,notnull" json:"threshold"`
ExcludeStatusCodes string `bun:"exclude_status_codes,type:text,notnull" json:"excludeStatusCodes"`
}

View File

@@ -1,21 +0,0 @@
package types
import "github.com/uptrace/bun"
// on_delete:CASCADE,on_update:CASCADE not working
type UserPreference struct {
bun.BaseModel `bun:"table:user_preference"`
PreferenceID string `bun:"preference_id,type:text,pk"`
PreferenceValue string `bun:"preference_value,type:text"`
UserID string `bun:"user_id,type:text,pk"`
}
// on_delete:CASCADE,on_update:CASCADE not working
type OrgPreference struct {
bun.BaseModel `bun:"table:org_preference"`
PreferenceID string `bun:"preference_id,pk,type:text,notnull"`
PreferenceValue string `bun:"preference_value,type:text,notnull"`
OrgID string `bun:"org_id,pk,type:text,notnull"`
}

View File

@@ -0,0 +1,290 @@
package preferencetypes
import (
"context"
"fmt"
"strings"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/types"
"github.com/uptrace/bun"
)
type GettablePreference struct {
PreferenceID string `json:"preference_id" db:"preference_id"`
PreferenceValue interface{} `json:"preference_value" db:"preference_value"`
}
type UpdatablePreference struct {
PreferenceValue interface{} `json:"preference_value" db:"preference_value"`
}
type StorableOrgPreference struct {
bun.BaseModel `bun:"table:org_preference"`
types.Identifiable
PreferenceID string `bun:"preference_id,type:text,notnull"`
PreferenceValue string `bun:"preference_value,type:text,notnull"`
OrgID string `bun:"org_id,type:text,notnull"`
}
type StorableUserPreference struct {
bun.BaseModel `bun:"table:user_preference"`
types.Identifiable
PreferenceID string `bun:"preference_id,type:text,notnull"`
PreferenceValue string `bun:"preference_value,type:text,notnull"`
UserID string `bun:"user_id,type:text,notnull"`
}
type Preference struct {
Key string `json:"key"`
Name string `json:"name"`
Description string `json:"description"`
ValueType string `json:"valueType"`
DefaultValue interface{} `json:"defaultValue"`
AllowedValues []interface{} `json:"allowedValues"`
IsDiscreteValues bool `json:"isDiscreteValues"`
Range Range `json:"range"`
AllowedScopes []string `json:"allowedScopes"`
}
func NewDefaultPreferenceMap() map[string]Preference {
return map[string]Preference{
"ORG_ONBOARDING": {
Key: "ORG_ONBOARDING",
Name: "Organisation Onboarding",
Description: "Organisation Onboarding",
ValueType: "boolean",
DefaultValue: false,
AllowedValues: []interface{}{true, false},
IsDiscreteValues: true,
AllowedScopes: []string{"org"},
},
"WELCOME_CHECKLIST_DO_LATER": {
Key: "WELCOME_CHECKLIST_DO_LATER",
Name: "Welcome Checklist Do Later",
Description: "Welcome Checklist Do Later",
ValueType: "boolean",
DefaultValue: false,
AllowedValues: []interface{}{true, false},
IsDiscreteValues: true,
AllowedScopes: []string{"user"},
},
"WELCOME_CHECKLIST_SEND_LOGS_SKIPPED": {
Key: "WELCOME_CHECKLIST_SEND_LOGS_SKIPPED",
Name: "Welcome Checklist Send Logs Skipped",
Description: "Welcome Checklist Send Logs Skipped",
ValueType: "boolean",
DefaultValue: false,
AllowedValues: []interface{}{true, false},
IsDiscreteValues: true,
AllowedScopes: []string{"user"},
},
"WELCOME_CHECKLIST_SEND_TRACES_SKIPPED": {
Key: "WELCOME_CHECKLIST_SEND_TRACES_SKIPPED",
Name: "Welcome Checklist Send Traces Skipped",
Description: "Welcome Checklist Send Traces Skipped",
ValueType: "boolean",
DefaultValue: false,
AllowedValues: []interface{}{true, false},
IsDiscreteValues: true,
AllowedScopes: []string{"user"},
},
"WELCOME_CHECKLIST_SEND_INFRA_METRICS_SKIPPED": {
Key: "WELCOME_CHECKLIST_SEND_INFRA_METRICS_SKIPPED",
Name: "Welcome Checklist Send Infra Metrics Skipped",
Description: "Welcome Checklist Send Infra Metrics Skipped",
ValueType: "boolean",
DefaultValue: false,
AllowedValues: []interface{}{true, false},
IsDiscreteValues: true,
AllowedScopes: []string{"user"},
},
"WELCOME_CHECKLIST_SETUP_DASHBOARDS_SKIPPED": {
Key: "WELCOME_CHECKLIST_SETUP_DASHBOARDS_SKIPPED",
Name: "Welcome Checklist Setup Dashboards Skipped",
Description: "Welcome Checklist Setup Dashboards Skipped",
ValueType: "boolean",
DefaultValue: false,
AllowedValues: []interface{}{true, false},
IsDiscreteValues: true,
AllowedScopes: []string{"user"},
},
"WELCOME_CHECKLIST_SETUP_ALERTS_SKIPPED": {
Key: "WELCOME_CHECKLIST_SETUP_ALERTS_SKIPPED",
Name: "Welcome Checklist Setup Alerts Skipped",
Description: "Welcome Checklist Setup Alerts Skipped",
ValueType: "boolean",
DefaultValue: false,
AllowedValues: []interface{}{true, false},
IsDiscreteValues: true,
AllowedScopes: []string{"user"},
},
"WELCOME_CHECKLIST_SETUP_SAVED_VIEW_SKIPPED": {
Key: "WELCOME_CHECKLIST_SETUP_SAVED_VIEW_SKIPPED",
Name: "Welcome Checklist Setup Saved View Skipped",
Description: "Welcome Checklist Setup Saved View Skipped",
ValueType: "boolean",
DefaultValue: false,
AllowedValues: []interface{}{true, false},
IsDiscreteValues: true,
AllowedScopes: []string{"user"},
},
}
}
func (p *Preference) ErrorValueTypeMismatch() error {
return errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, fmt.Sprintf("the preference value is not of expected type: %s", p.ValueType))
}
func (p *Preference) checkIfInAllowedValues(preferenceValue interface{}) (bool, error) {
switch p.ValueType {
case PreferenceValueTypeInteger:
_, ok := preferenceValue.(int64)
if !ok {
return false, p.ErrorValueTypeMismatch()
}
case PreferenceValueTypeFloat:
_, ok := preferenceValue.(float64)
if !ok {
return false, p.ErrorValueTypeMismatch()
}
case PreferenceValueTypeString:
_, ok := preferenceValue.(string)
if !ok {
return false, p.ErrorValueTypeMismatch()
}
case PreferenceValueTypeBoolean:
_, ok := preferenceValue.(bool)
if !ok {
return false, p.ErrorValueTypeMismatch()
}
}
isInAllowedValues := false
for _, value := range p.AllowedValues {
switch p.ValueType {
case PreferenceValueTypeInteger:
allowedValue, ok := value.(int64)
if !ok {
return false, p.ErrorValueTypeMismatch()
}
if allowedValue == preferenceValue {
isInAllowedValues = true
}
case PreferenceValueTypeFloat:
allowedValue, ok := value.(float64)
if !ok {
return false, p.ErrorValueTypeMismatch()
}
if allowedValue == preferenceValue {
isInAllowedValues = true
}
case PreferenceValueTypeString:
allowedValue, ok := value.(string)
if !ok {
return false, p.ErrorValueTypeMismatch()
}
if allowedValue == preferenceValue {
isInAllowedValues = true
}
case PreferenceValueTypeBoolean:
allowedValue, ok := value.(bool)
if !ok {
return false, p.ErrorValueTypeMismatch()
}
if allowedValue == preferenceValue {
isInAllowedValues = true
}
}
}
return isInAllowedValues, nil
}
func (p *Preference) IsValidValue(preferenceValue interface{}) error {
typeSafeValue := preferenceValue
switch p.ValueType {
case PreferenceValueTypeInteger:
val, ok := preferenceValue.(int64)
if !ok {
floatVal, ok := preferenceValue.(float64)
if !ok || floatVal != float64(int64(floatVal)) {
return p.ErrorValueTypeMismatch()
}
val = int64(floatVal)
typeSafeValue = val
}
if !p.IsDiscreteValues {
if val < p.Range.Min || val > p.Range.Max {
return errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, fmt.Sprintf("the preference value is not in the range specified, min: %v , max:%v", p.Range.Min, p.Range.Max))
}
}
case PreferenceValueTypeString:
_, ok := preferenceValue.(string)
if !ok {
return p.ErrorValueTypeMismatch()
}
case PreferenceValueTypeFloat:
_, ok := preferenceValue.(float64)
if !ok {
return p.ErrorValueTypeMismatch()
}
case PreferenceValueTypeBoolean:
_, ok := preferenceValue.(bool)
if !ok {
return p.ErrorValueTypeMismatch()
}
}
// check the validity of the value being part of allowed values or the range specified if any
if p.IsDiscreteValues {
if p.AllowedValues != nil {
isInAllowedValues, valueMisMatchErr := p.checkIfInAllowedValues(typeSafeValue)
if valueMisMatchErr != nil {
return valueMisMatchErr
}
if !isInAllowedValues {
return errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, fmt.Sprintf("the preference value is not in the list of allowedValues: %v", p.AllowedValues))
}
}
}
return nil
}
func (p *Preference) IsEnabledForScope(scope string) bool {
isPreferenceEnabledForGivenScope := false
if p.AllowedScopes != nil {
for _, allowedScope := range p.AllowedScopes {
if allowedScope == strings.ToLower(scope) {
isPreferenceEnabledForGivenScope = true
}
}
}
return isPreferenceEnabledForGivenScope
}
func (p *Preference) SanitizeValue(preferenceValue interface{}) interface{} {
switch p.ValueType {
case PreferenceValueTypeBoolean:
if preferenceValue == "1" || preferenceValue == true || preferenceValue == "true" {
return true
} else {
return false
}
default:
return preferenceValue
}
}
type PreferenceStore interface {
GetOrgPreference(context.Context, string, string) (*StorableOrgPreference, error)
GetAllOrgPreferences(context.Context, string) ([]*StorableOrgPreference, error)
UpsertOrgPreference(context.Context, *StorableOrgPreference) error
GetUserPreference(context.Context, string, string) (*StorableUserPreference, error)
GetAllUserPreferences(context.Context, string) ([]*StorableUserPreference, error)
UpsertUserPreference(context.Context, *StorableUserPreference) error
}
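Note: the default preference map plus the validation helpers are enough to vet an incoming update end to end. A minimal in-package sketch (not part of this changeset):

package preferencetypes

import "fmt"

// Sketch only: look up a preference, check its scope, validate a new value and
// normalise a raw stored value.
func validateOrgOnboarding(raw interface{}) error {
	pref, ok := NewDefaultPreferenceMap()["ORG_ONBOARDING"]
	if !ok {
		return fmt.Errorf("preference is not registered")
	}
	if !pref.IsEnabledForScope(OrgAllowedScope) {
		return fmt.Errorf("preference is not enabled for the org scope")
	}
	if err := pref.IsValidValue(raw); err != nil {
		return err
	}
	_ = pref.SanitizeValue("1") // stored "1"/"true" strings normalise to true
	return nil
}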

View File

@@ -0,0 +1,23 @@
package preferencetypes
const (
PreferenceValueTypeInteger string = "integer"
PreferenceValueTypeFloat string = "float"
PreferenceValueTypeString string = "string"
PreferenceValueTypeBoolean string = "boolean"
)
const (
OrgAllowedScope string = "org"
UserAllowedScope string = "user"
)
type Range struct {
Min int64 `json:"min"`
Max int64 `json:"max"`
}
type PreferenceWithValue struct {
Preference
Value interface{} `json:"value"`
}
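Note: Range only applies when IsDiscreteValues is false; IsValidValue then converts whole JSON floats to int64 and checks the bounds. A hypothetical sketch (not part of this changeset; the preference key is made up):

package preferencetypes

import "fmt"

// Sketch only: a non-discrete integer preference bounded by Range.
func rangeCheckSketch() error {
	p := Preference{
		Key:              "RETENTION_DAYS", // hypothetical key, not in the default map
		ValueType:        PreferenceValueTypeInteger,
		DefaultValue:     int64(30),
		IsDiscreteValues: false,
		Range:            Range{Min: 1, Max: 365},
		AllowedScopes:    []string{OrgAllowedScope},
	}
	if err := p.IsValidValue(float64(400)); err == nil {
		return fmt.Errorf("expected an out-of-range value to be rejected")
	}
	return p.IsValidValue(float64(90)) // accepted: whole float within the range
}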

View File

@@ -46,7 +46,7 @@ type User struct {
type ResetPasswordRequest struct {
bun.BaseModel `bun:"table:reset_password_request"`
- ID int `bun:"id,pk,autoincrement" json:"id"`
Identifiable
Token string `bun:"token,type:text,notnull" json:"token"`
UserID string `bun:"user_id,type:text,notnull" json:"userId"`
}