Compare commits

..

18 Commits

Author SHA1 Message Date
grandwizard28
b10a9061ec refactor(.): final refactor 2025-01-16 17:34:03 +05:30
grandwizard28
fae0581d83 refactor(.): final refactor 2025-01-16 17:29:37 +05:30
grandwizard28
9d6928b3e6 feat(config): add tests for conf 2025-01-16 15:10:21 +05:30
grandwizard28
baaffd79de test(uri): add tests for uri 2025-01-16 14:11:07 +05:30
grandwizard28
59c416c222 refactor(config): move to top level 2025-01-16 14:05:21 +05:30
grandwizard28
63a7ae586b ci(go): run go mod tidy 2025-01-15 17:17:26 +05:30
grandwizard28
f0cef2d9d5 feat(instrumentation): initialize instrumentation 2025-01-15 17:16:46 +05:30
grandwizard28
616ada91dd feat(instrumentation): add a test instrumentation package 2025-01-15 17:16:46 +05:30
grandwizard28
aab6a1c914 feat(instrumentation): move to instrumentation package 2025-01-15 17:16:45 +05:30
grandwizard28
bc708cd891 feat(factory): embrace the factory pattern 2025-01-15 17:16:45 +05:30
grandwizard28
c2a11960c1 feat(migrations): add cloud integrations 2025-01-15 17:16:45 +05:30
grandwizard28
6d76d56dbd ci(git): rebase with main 2025-01-15 17:16:45 +05:30
grandwizard28
1977756591 feat(signoz): refactor connection config struct 2025-01-15 17:16:45 +05:30
grandwizard28
1d9c10a214 feat(signoz): make mattn the permanent driver 2025-01-15 17:16:45 +05:30
grandwizard28
57a25bf98f feat(signoz): remove references to sqlite 2025-01-15 17:16:45 +05:30
grandwizard28
35c310aa9d fix(tests): fix unit tests 2025-01-15 17:16:44 +05:30
grandwizard28
7c0481de7d refactor(migrations): move migrations into a single package 2025-01-15 17:16:42 +05:30
grandwizard28
147cf28024 refactor(config): refactor config package 2025-01-15 17:16:10 +05:30
124 changed files with 1945 additions and 3736 deletions

View File

@@ -3,8 +3,36 @@
# Do not modify this file
#
##################### Instrumentation #####################
instrumentation:
logs:
level: info
enabled: false
processors:
batch:
exporter:
otlp:
endpoint: localhost:4317
traces:
enabled: false
processors:
batch:
exporter:
otlp:
endpoint: localhost:4317
metrics:
enabled: true
readers:
pull:
exporter:
prometheus:
host: "0.0.0.0"
port: 9090
##################### Web #####################
web:
# Whether to enable the web frontend
enabled: true
# The prefix to serve web on
prefix: /
# The directory containing the static build files.
@@ -29,4 +57,14 @@ cache:
# The password for authenticating with the Redis server, if required.
password:
# The Redis database number to use
db: 0
db: 0
##################### SQLStore #####################
sqlstore:
# specifies the SQLStore provider to use.
provider: sqlite
# The maximum number of open connections to the database.
max_open_conns: 100
sqlite:
# The path to the SQLite database file.
path: /var/lib/signoz/signoz.db

View File

@@ -64,7 +64,6 @@ import (
const AppDbEngine = "sqlite"
type ServerOptions struct {
SigNoz *signoz.SigNoz
PromConfigPath string
SkipTopLvlOpsPath string
HTTPHostPort string
@@ -82,7 +81,6 @@ type ServerOptions struct {
GatewayUrl string
UseLogsNewSchema bool
UseTraceNewSchema bool
SkipWebFrontend bool
}
// Server runs HTTP api service
@@ -112,26 +110,15 @@ func (s Server) HealthCheckStatus() chan healthcheck.Status {
}
// NewServer creates and initializes Server
func NewServer(serverOptions *ServerOptions) (*Server, error) {
modelDao, err := dao.InitDao("sqlite", baseconst.RELATIONAL_DATASOURCE_PATH)
func NewServer(serverOptions *ServerOptions, config signoz.Config, signoz *signoz.SigNoz) (*Server, error) {
modelDao, err := dao.InitDao(signoz.SQLStore.SQLxDB())
if err != nil {
return nil, err
}
baseexplorer.InitWithDSN(baseconst.RELATIONAL_DATASOURCE_PATH)
if err := preferences.InitDB(baseconst.RELATIONAL_DATASOURCE_PATH); err != nil {
return nil, err
}
localDB, err := dashboards.InitDB(baseconst.RELATIONAL_DATASOURCE_PATH)
if err != nil {
return nil, err
}
localDB.SetMaxOpenConns(10)
baseexplorer.InitWithDB(signoz.SQLStore.SQLxDB())
preferences.InitDB(signoz.SQLStore.SQLxDB())
dashboards.InitDB(signoz.SQLStore.SQLxDB())
gatewayProxy, err := gateway.NewProxy(serverOptions.GatewayUrl, gateway.RoutePrefix)
if err != nil {
@@ -139,7 +126,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
}
// initiate license manager
lm, err := licensepkg.StartManager("sqlite", localDB)
lm, err := licensepkg.StartManager("sqlite", signoz.SQLStore.SQLxDB())
if err != nil {
return nil, err
}
@@ -153,7 +140,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
if storage == "clickhouse" {
zap.L().Info("Using ClickHouse as datastore ...")
qb := db.NewDataConnector(
localDB,
signoz.SQLStore.SQLxDB(),
serverOptions.PromConfigPath,
lm,
serverOptions.MaxIdleConns,
@@ -189,7 +176,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
rm, err := makeRulesManager(serverOptions.PromConfigPath,
baseconst.GetAlertManagerApiPrefix(),
serverOptions.RuleRepoURL,
localDB,
signoz.SQLStore.SQLxDB(),
reader,
c,
serverOptions.DisableRules,
@@ -210,19 +197,16 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
}()
// initiate opamp
_, err = opAmpModel.InitDB(localDB)
if err != nil {
return nil, err
}
_ = opAmpModel.InitDB(signoz.SQLStore.SQLxDB())
integrationsController, err := integrations.NewController(localDB)
integrationsController, err := integrations.NewController(signoz.SQLStore.SQLxDB())
if err != nil {
return nil, fmt.Errorf(
"couldn't create integrations controller: %w", err,
)
}
cloudIntegrationsController, err := cloudintegrations.NewController(localDB)
cloudIntegrationsController, err := cloudintegrations.NewController(signoz.SQLStore.SQLxDB())
if err != nil {
return nil, fmt.Errorf(
"couldn't create cloud provider integrations controller: %w", err,
@@ -231,7 +215,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
// ingestion pipelines manager
logParsingPipelineController, err := logparsingpipeline.NewLogParsingPipelinesController(
localDB, "sqlite", integrationsController.GetPipelinesForInstalledIntegrations,
signoz.SQLStore.SQLxDB(), integrationsController.GetPipelinesForInstalledIntegrations,
)
if err != nil {
return nil, err
@@ -239,8 +223,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
// initiate agent config handler
agentConfMgr, err := agentConf.Initiate(&agentConf.ManagerOptions{
DB: localDB,
DBEngine: AppDbEngine,
DB: signoz.SQLStore.SQLxDB(),
AgentFeatures: []agentConf.AgentFeature{logParsingPipelineController},
})
if err != nil {
@@ -302,7 +285,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
usageManager: usageManager,
}
httpServer, err := s.createPublicServer(apiHandler, serverOptions.SigNoz.Web)
httpServer, err := s.createPublicServer(apiHandler, signoz.Web)
if err != nil {
return nil, err
@@ -396,11 +379,9 @@ func (s *Server) createPublicServer(apiHandler *api.APIHandler, web web.Web) (*h
handler = handlers.CompressHandler(handler)
if !s.serverOptions.SkipWebFrontend {
err := web.AddToRouter(r)
if err != nil {
return nil, err
}
err := web.AddToRouter(r)
if err != nil {
return nil, err
}
return &http.Server{

View File

@@ -1,18 +1,11 @@
package dao
import (
"fmt"
"github.com/jmoiron/sqlx"
"go.signoz.io/signoz/ee/query-service/dao/sqlite"
)
func InitDao(engine, path string) (ModelDao, error) {
switch engine {
case "sqlite":
return sqlite.InitDB(path)
default:
return nil, fmt.Errorf("qsdb type: %s is not supported in query service", engine)
}
func InitDao(inputDB *sqlx.DB) (ModelDao, error) {
return sqlite.InitDB(inputDB)
}

View File

@@ -65,8 +65,8 @@ func columnExists(db *sqlx.DB, tableName, columnName string) bool {
}
// InitDB creates and extends base model DB repository
func InitDB(dataSourceName string) (*modelDao, error) {
dao, err := basedsql.InitDB(dataSourceName)
func InitDB(inputDB *sqlx.DB) (*modelDao, error) {
dao, err := basedsql.InitDB(inputDB)
if err != nil {
return nil, err
}

View File

@@ -3,86 +3,28 @@ package main
import (
"context"
"flag"
"log"
"fmt"
"os"
"os/signal"
"strconv"
"syscall"
"time"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.signoz.io/signoz/ee/query-service/app"
"go.signoz.io/signoz/pkg/config"
signozconfig "go.signoz.io/signoz/pkg/config"
"go.signoz.io/signoz/pkg/confmap/provider/signozenvprovider"
"go.signoz.io/signoz/pkg/config/envprovider"
"go.signoz.io/signoz/pkg/instrumentation"
"go.signoz.io/signoz/pkg/query-service/auth"
baseconst "go.signoz.io/signoz/pkg/query-service/constants"
"go.signoz.io/signoz/pkg/query-service/migrate"
"go.signoz.io/signoz/pkg/query-service/version"
"go.signoz.io/signoz/pkg/signoz"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
pkgversion "go.signoz.io/signoz/pkg/version"
prommodel "github.com/prometheus/common/model"
zapotlpencoder "github.com/SigNoz/zap_otlp/zap_otlp_encoder"
zapotlpsync "github.com/SigNoz/zap_otlp/zap_otlp_sync"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
func initZapLog(enableQueryServiceLogOTLPExport bool) *zap.Logger {
config := zap.NewProductionConfig()
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
defer stop()
config.EncoderConfig.EncodeDuration = zapcore.MillisDurationEncoder
config.EncoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder
config.EncoderConfig.TimeKey = "timestamp"
config.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
otlpEncoder := zapotlpencoder.NewOTLPEncoder(config.EncoderConfig)
consoleEncoder := zapcore.NewJSONEncoder(config.EncoderConfig)
defaultLogLevel := zapcore.InfoLevel
res := resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String("query-service"),
)
core := zapcore.NewTee(
zapcore.NewCore(consoleEncoder, os.Stdout, defaultLogLevel),
)
if enableQueryServiceLogOTLPExport {
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
conn, err := grpc.DialContext(ctx, baseconst.OTLPTarget, grpc.WithBlock(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("failed to establish connection: %v", err)
} else {
logExportBatchSizeInt, err := strconv.Atoi(baseconst.LogExportBatchSize)
if err != nil {
logExportBatchSizeInt = 512
}
ws := zapcore.AddSync(zapotlpsync.NewOtlpSyncer(conn, zapotlpsync.Options{
BatchSize: logExportBatchSizeInt,
ResourceSchema: semconv.SchemaURL,
Resource: res,
}))
core = zapcore.NewTee(
zapcore.NewCore(consoleEncoder, os.Stdout, defaultLogLevel),
zapcore.NewCore(otlpEncoder, zapcore.NewMultiWriteSyncer(ws), defaultLogLevel),
)
}
}
logger := zap.New(core, zap.AddCaller(), zap.AddStacktrace(zapcore.ErrorLevel))
return logger
}
func init() {
prommodel.NameValidationScheme = prommodel.UTF8Validation
}
@@ -100,7 +42,6 @@ func main() {
var useLogsNewSchema bool
var useTraceNewSchema bool
var cacheConfigPath, fluxInterval string
var enableQueryServiceLogOTLPExport bool
var preferSpanMetrics bool
var maxIdleConns int
@@ -108,7 +49,6 @@ func main() {
var dialTimeout time.Duration
var gatewayUrl string
var useLicensesV3 bool
var skipWebFrontend bool
flag.BoolVar(&useLogsNewSchema, "use-logs-new-schema", false, "use logs_v2 schema for logs")
flag.BoolVar(&useTraceNewSchema, "use-trace-new-schema", false, "use new schema for traces")
@@ -122,39 +62,39 @@ func main() {
flag.StringVar(&ruleRepoURL, "rules.repo-url", baseconst.AlertHelpPage, "(host address used to build rule link in alert messages)")
flag.StringVar(&cacheConfigPath, "experimental.cache-config", "", "(cache config to use)")
flag.StringVar(&fluxInterval, "flux-interval", "5m", "(the interval to exclude data from being cached to avoid incorrect cache for data in motion)")
flag.BoolVar(&enableQueryServiceLogOTLPExport, "enable.query.service.log.otlp.export", false, "(enable query service log otlp export)")
flag.StringVar(&cluster, "cluster", "cluster", "(cluster name - defaults to 'cluster')")
flag.StringVar(&gatewayUrl, "gateway-url", "", "(url to the gateway)")
flag.BoolVar(&useLicensesV3, "use-licenses-v3", false, "use licenses_v3 schema for licenses")
flag.BoolVar(&skipWebFrontend, "skip-web-frontend", false, "skip web frontend")
flag.Parse()
loggerMgr := initZapLog(enableQueryServiceLogOTLPExport)
zap.ReplaceGlobals(loggerMgr)
defer loggerMgr.Sync() // flushes buffer, if any
version.PrintVersion()
config, err := signozconfig.New(context.Background(), signozconfig.ProviderSettings{
ResolverSettings: confmap.ResolverSettings{
URIs: []string{"signozenv:"},
ProviderFactories: []confmap.ProviderFactory{
signozenvprovider.NewFactory(),
},
config, err := signoz.NewConfig(context.Background(), signozconfig.ResolverConfig{
Uris: []string{"env:"},
ProviderFactories: []config.ProviderFactory{
envprovider.NewFactory(),
},
})
if err != nil {
zap.L().Fatal("Failed to create config", zap.Error(err))
}
signoz, err := signoz.New(config, skipWebFrontend)
instrumentation, err := instrumentation.New(context.Background(), pkgversion.Build{}, config.Instrumentation)
if err != nil {
fmt.Println(err, err.Error())
zap.L().Fatal("Failed to create instrumentation", zap.Error(err))
}
defer instrumentation.Stop(context.Background())
zap.ReplaceGlobals(instrumentation.Logger())
defer instrumentation.Logger().Sync() // flushes buffer, if any
signoz, err := signoz.New(context.Background(), instrumentation, config, signoz.NewProviderFactories())
if err != nil {
zap.L().Fatal("Failed to create signoz struct", zap.Error(err))
}
serverOptions := &app.ServerOptions{
SigNoz: signoz,
HTTPHostPort: baseconst.HTTPHostPort,
PromConfigPath: promConfigPath,
SkipTopLvlOpsPath: skipTopLvlOpsPath,
@@ -171,7 +111,6 @@ func main() {
GatewayUrl: gatewayUrl,
UseLogsNewSchema: useLogsNewSchema,
UseTraceNewSchema: useTraceNewSchema,
SkipWebFrontend: skipWebFrontend,
}
// Read the jwt secret key
@@ -183,13 +122,7 @@ func main() {
zap.L().Info("JWT secret key set successfully.")
}
if err := migrate.Migrate(baseconst.RELATIONAL_DATASOURCE_PATH); err != nil {
zap.L().Error("Failed to migrate", zap.Error(err))
} else {
zap.L().Info("Migration successful")
}
server, err := app.NewServer(serverOptions)
server, err := app.NewServer(serverOptions, config, signoz)
if err != nil {
zap.L().Fatal("Failed to create server", zap.Error(err))
}

View File

@@ -1,4 +1,4 @@
import axios from 'api';
import { ApiBaseInstance } from 'api';
import { ErrorResponseHandler } from 'api/ErrorResponseHandler';
import { AxiosError } from 'axios';
import { ErrorResponse, SuccessResponse } from 'types/api';
@@ -59,7 +59,7 @@ export const getHostLists = async (
headers?: Record<string, string>,
): Promise<SuccessResponse<HostListResponse> | ErrorResponse> => {
try {
const response = await axios.post('/hosts/list', props, {
const response = await ApiBaseInstance.post('/hosts/list', props, {
signal,
headers,
});

View File

@@ -58,11 +58,7 @@ import { useTranslation } from 'react-i18next';
import { useMutation } from 'react-query';
import { useCopyToClipboard } from 'react-use';
import { ErrorResponse } from 'types/api';
import {
AddLimitProps,
LimitProps,
UpdateLimitProps,
} from 'types/api/ingestionKeys/limits/types';
import { LimitProps } from 'types/api/ingestionKeys/limits/types';
import {
IngestionKeyProps,
PaginationProps,
@@ -73,18 +69,6 @@ const { Option } = Select;
const BYTES = 1073741824;
const COUNT_MULTIPLIER = {
thousand: 1000,
million: 1000000,
billion: 1000000000,
};
const SIGNALS_CONFIG = [
{ name: 'logs', usesSize: true, usesCount: false },
{ name: 'traces', usesSize: true, usesCount: false },
{ name: 'metrics', usesSize: false, usesCount: true },
];
// Using any type here because antd's DatePicker expects its own internal Dayjs type
// which conflicts with our project's Dayjs type that has additional plugins (tz, utc etc).
// eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/explicit-module-boundary-types
@@ -92,6 +76,8 @@ export const disabledDate = (current: any): boolean =>
// Disable all dates before today
current && current < dayjs().endOf('day');
const SIGNALS = ['logs', 'traces', 'metrics'];
export const showErrorNotification = (
notifications: NotificationInstance,
err: Error,
@@ -115,31 +101,6 @@ export const API_KEY_EXPIRY_OPTIONS: ExpiryOption[] = [
{ value: '0', label: 'No Expiry' },
];
const countToUnit = (count: number): { value: number; unit: string } => {
if (
count >= COUNT_MULTIPLIER.billion ||
count / COUNT_MULTIPLIER.million >= 1000
) {
return { value: count / COUNT_MULTIPLIER.billion, unit: 'billion' };
}
if (
count >= COUNT_MULTIPLIER.million ||
count / COUNT_MULTIPLIER.thousand >= 1000
) {
return { value: count / COUNT_MULTIPLIER.million, unit: 'million' };
}
if (count >= COUNT_MULTIPLIER.thousand) {
return { value: count / COUNT_MULTIPLIER.thousand, unit: 'thousand' };
}
// Default to million for small numbers
return { value: count / COUNT_MULTIPLIER.million, unit: 'million' };
};
const countFromUnit = (value: number, unit: string): number =>
value *
(COUNT_MULTIPLIER[unit as keyof typeof COUNT_MULTIPLIER] ||
COUNT_MULTIPLIER.million);
function MultiIngestionSettings(): JSX.Element {
const { user } = useAppContext();
const { notifications } = useNotifications();
@@ -220,6 +181,7 @@ function MultiIngestionSettings(): JSX.Element {
const showEditModal = (apiKey: IngestionKeyProps): void => {
setActiveAPIKey(apiKey);
handleFormReset();
setUpdatedTags(apiKey.tags || []);
@@ -462,90 +424,44 @@ function MultiIngestionSettings(): JSX.Element {
addEditLimitForm.resetFields();
};
/* eslint-disable sonarjs/cognitive-complexity */
const handleAddLimit = (
APIKey: IngestionKeyProps,
signalName: string,
): void => {
const {
dailyLimit,
secondsLimit,
dailyCount,
dailyCountUnit,
secondsCount,
secondsCountUnit,
} = addEditLimitForm.getFieldsValue();
const { dailyLimit, secondsLimit } = addEditLimitForm.getFieldsValue();
const payload: AddLimitProps = {
const payload = {
keyID: APIKey.id,
signal: signalName,
config: {},
};
const signalCfg = SIGNALS_CONFIG.find((cfg) => cfg.name === signalName);
if (!signalCfg) return;
// Only set size if usesSize is true
if (signalCfg.usesSize) {
if (!isUndefined(dailyLimit)) {
payload.config.day = {
...payload.config.day,
if (!isUndefined(dailyLimit)) {
payload.config = {
day: {
size: gbToBytes(dailyLimit),
};
}
if (!isUndefined(secondsLimit)) {
payload.config.second = {
...payload.config.second,
},
};
}
if (!isUndefined(secondsLimit)) {
payload.config = {
...payload.config,
second: {
size: gbToBytes(secondsLimit),
};
}
},
};
}
// Only set count if usesCount is true
if (signalCfg.usesCount) {
if (!isUndefined(dailyCount)) {
payload.config.day = {
...payload.config.day,
count: countFromUnit(dailyCount, dailyCountUnit || 'million'),
};
}
if (!isUndefined(secondsCount)) {
payload.config.second = {
...payload.config.second,
count: countFromUnit(secondsCount, secondsCountUnit || 'million'),
};
}
}
// If neither size nor count was given, skip
const noSizeProvided =
isUndefined(dailyLimit) && isUndefined(secondsLimit) && signalCfg.usesSize;
const noCountProvided =
isUndefined(dailyCount) && isUndefined(secondsCount) && signalCfg.usesCount;
if (
signalCfg.usesSize &&
signalCfg.usesCount &&
noSizeProvided &&
noCountProvided
) {
// Both size and count are effectively empty
if (isUndefined(dailyLimit) && isUndefined(secondsLimit)) {
// No need to save as no limit is provided, close the edit view and reset active signal and api key
setActiveSignal(null);
setActiveAPIKey(null);
setIsEditAddLimitOpen(false);
setUpdatedTags([]);
hideAddViewModal();
setHasCreateLimitForIngestionKeyError(false);
return;
}
if (!signalCfg.usesSize && !signalCfg.usesCount) {
// Edge case: If there's no count or size usage at all
setActiveSignal(null);
setActiveAPIKey(null);
setIsEditAddLimitOpen(false);
setUpdatedTags([]);
hideAddViewModal();
return;
}
@@ -556,73 +472,44 @@ function MultiIngestionSettings(): JSX.Element {
APIKey: IngestionKeyProps,
signal: LimitProps,
): void => {
const {
dailyLimit,
secondsLimit,
dailyCount,
dailyCountUnit,
secondsCount,
secondsCountUnit,
} = addEditLimitForm.getFieldsValue();
const payload: UpdateLimitProps = {
const { dailyLimit, secondsLimit } = addEditLimitForm.getFieldsValue();
const payload = {
limitID: signal.id,
signal: signal.signal,
config: {},
};
const signalCfg = SIGNALS_CONFIG.find((cfg) => cfg.name === signal.signal);
if (!signalCfg) return;
const noSizeProvided =
isUndefined(dailyLimit) && isUndefined(secondsLimit) && signalCfg.usesSize;
const noCountProvided =
isUndefined(dailyCount) && isUndefined(secondsCount) && signalCfg.usesCount;
// If the user cleared out all fields, remove the limit
if (noSizeProvided && noCountProvided) {
if (isUndefined(dailyLimit) && isUndefined(secondsLimit)) {
showDeleteLimitModal(APIKey, signal);
return;
}
if (signalCfg.usesSize) {
if (!isUndefined(dailyLimit)) {
payload.config.day = {
...payload.config.day,
if (!isUndefined(dailyLimit)) {
payload.config = {
day: {
size: gbToBytes(dailyLimit),
};
}
if (!isUndefined(secondsLimit)) {
payload.config.second = {
...payload.config.second,
size: gbToBytes(secondsLimit),
};
}
},
};
}
if (signalCfg.usesCount) {
if (!isUndefined(dailyCount)) {
payload.config.day = {
...payload.config.day,
count: countFromUnit(dailyCount, dailyCountUnit || 'million'),
};
}
if (!isUndefined(secondsCount)) {
payload.config.second = {
...payload.config.second,
count: countFromUnit(secondsCount, secondsCountUnit || 'million'),
};
}
if (!isUndefined(secondsLimit)) {
payload.config = {
...payload.config,
second: {
size: gbToBytes(secondsLimit),
},
};
}
updateLimitForIngestionKey(payload);
};
/* eslint-enable sonarjs/cognitive-complexity */
const bytesToGb = (size: number | undefined): number => {
if (!size) {
return 0;
}
return size / BYTES;
};
@@ -630,12 +517,6 @@ function MultiIngestionSettings(): JSX.Element {
APIKey: IngestionKeyProps,
signal: LimitProps,
): void => {
const dayCount = signal?.config?.day?.count;
const secondCount = signal?.config?.second?.count;
const dayCountConverted = countToUnit(dayCount || 0);
const secondCountConverted = countToUnit(secondCount || 0);
setActiveAPIKey(APIKey);
setActiveSignal({
...signal,
@@ -643,14 +524,11 @@ function MultiIngestionSettings(): JSX.Element {
...signal.config,
day: {
...signal.config?.day,
enabled:
!isNil(signal?.config?.day?.size) || !isNil(signal?.config?.day?.count),
enabled: !isNil(signal?.config?.day?.size),
},
second: {
...signal.config?.second,
enabled:
!isNil(signal?.config?.second?.size) ||
!isNil(signal?.config?.second?.count),
enabled: !isNil(signal?.config?.second?.size),
},
},
});
@@ -658,22 +536,15 @@ function MultiIngestionSettings(): JSX.Element {
addEditLimitForm.setFieldsValue({
dailyLimit: bytesToGb(signal?.config?.day?.size || 0),
secondsLimit: bytesToGb(signal?.config?.second?.size || 0),
enableDailyLimit:
!isNil(signal?.config?.day?.size) || !isNil(signal?.config?.day?.count),
enableSecondLimit:
!isNil(signal?.config?.second?.size) ||
!isNil(signal?.config?.second?.count),
dailyCount: dayCountConverted.value,
dailyCountUnit: dayCountConverted.unit,
secondsCount: secondCountConverted.value,
secondsCountUnit: secondCountConverted.unit,
enableDailyLimit: !isNil(signal?.config?.day?.size),
enableSecondLimit: !isNil(signal?.config?.second?.size),
});
setIsEditAddLimitOpen(true);
};
const onDeleteLimitHandler = (): void => {
if (activeSignal && activeSignal.id) {
if (activeSignal && activeSignal?.id) {
deleteLimitForKey(activeSignal.id);
}
};
@@ -701,13 +572,13 @@ function MultiIngestionSettings(): JSX.Element {
formatTimezoneAdjustedTimestamp,
);
// Convert array of limits to a dictionary for quick access
const limitsDict: Record<string, LimitProps> = {};
APIKey.limits?.forEach((limitItem: LimitProps) => {
limitsDict[limitItem.signal] = limitItem;
const limits: { [key: string]: LimitProps } = {};
APIKey.limits?.forEach((limit: LimitProps) => {
limits[limit.signal] = limit;
});
const hasLimits = (signalName: string): boolean => !!limitsDict[signalName];
const hasLimits = (signal: string): boolean => !!limits[signal];
const items: CollapseProps['items'] = [
{
@@ -743,9 +614,11 @@ function MultiIngestionSettings(): JSX.Element {
onClick={(e): void => {
e.stopPropagation();
e.preventDefault();
showEditModal(APIKey);
}}
/>
<Button
className="periscope-btn ghost"
icon={<Trash2 color={Color.BG_CHERRY_500} size={14} />}
@@ -797,23 +670,18 @@ function MultiIngestionSettings(): JSX.Element {
<div className="limits-data">
<div className="signals">
{SIGNALS_CONFIG.map((signalCfg) => {
const signalName = signalCfg.name;
const limit = limitsDict[signalName];
const hasValidDayLimit =
limit?.config?.day?.size !== undefined ||
limit?.config?.day?.count !== undefined;
const hasValidSecondLimit =
limit?.config?.second?.size !== undefined ||
limit?.config?.second?.count !== undefined;
{SIGNALS.map((signal) => {
const hasValidDayLimit = !isNil(limits[signal]?.config?.day?.size);
const hasValidSecondLimit = !isNil(
limits[signal]?.config?.second?.size,
);
return (
<div className="signal" key={signalName}>
<div className="signal" key={signal}>
<div className="header">
<div className="signal-name">{signalName}</div>
<div className="signal-name">{signal}</div>
<div className="actions">
{hasLimits(signalName) ? (
{hasLimits(signal) ? (
<>
<Button
className="periscope-btn ghost"
@@ -822,9 +690,10 @@ function MultiIngestionSettings(): JSX.Element {
onClick={(e): void => {
e.stopPropagation();
e.preventDefault();
enableEditLimitMode(APIKey, limit);
enableEditLimitMode(APIKey, limits[signal]);
}}
/>
<Button
className="periscope-btn ghost"
icon={<Trash2 color={Color.BG_CHERRY_500} size={14} />}
@@ -832,7 +701,7 @@ function MultiIngestionSettings(): JSX.Element {
onClick={(e): void => {
e.stopPropagation();
e.preventDefault();
showDeleteLimitModal(APIKey, limit);
showDeleteLimitModal(APIKey, limits[signal]);
}}
/>
</>
@@ -843,12 +712,14 @@ function MultiIngestionSettings(): JSX.Element {
shape="round"
icon={<PlusIcon size={14} />}
disabled={!!(activeAPIKey?.id === APIKey.id && activeSignal)}
// eslint-disable-next-line sonarjs/no-identical-functions
onClick={(e): void => {
e.stopPropagation();
e.preventDefault();
enableEditLimitMode(APIKey, {
id: signalName,
signal: signalName,
id: signal,
signal,
config: {},
});
}}
@@ -861,7 +732,7 @@ function MultiIngestionSettings(): JSX.Element {
<div className="signal-limit-values">
{activeAPIKey?.id === APIKey.id &&
activeSignal?.signal === signalName &&
activeSignal?.signal === signal &&
isEditAddLimitOpen ? (
<Form
name="edit-ingestion-key-limit-form"
@@ -869,8 +740,8 @@ function MultiIngestionSettings(): JSX.Element {
form={addEditLimitForm}
autoComplete="off"
initialValues={{
dailyLimit: bytesToGb(limit?.config?.day?.size || 0),
secondsLimit: bytesToGb(limit?.config?.second?.size || 0),
dailyLimit: bytesToGb(limits[signal]?.config?.day?.size),
secondsLimit: bytesToGb(limits[signal]?.config?.second?.size),
}}
className="edit-ingestion-key-limit-form"
>
@@ -885,20 +756,16 @@ function MultiIngestionSettings(): JSX.Element {
size="small"
checked={activeSignal?.config?.day?.enabled}
onChange={(value): void => {
setActiveSignal((prev) =>
prev
? {
...prev,
config: {
...prev.config,
day: {
...prev.config?.day,
enabled: value,
},
},
}
: null,
);
setActiveSignal({
...activeSignal,
config: {
...activeSignal.config,
day: {
...activeSignal.config?.day,
enabled: value,
},
},
});
}}
/>
</Form.Item>
@@ -908,87 +775,50 @@ function MultiIngestionSettings(): JSX.Element {
Add a limit for data ingested daily
</div>
</div>
{signalCfg.usesSize && (
<div className="size">
{activeSignal?.config?.day?.enabled ? (
<Form.Item name="dailyLimit" key="dailyLimit">
<InputNumber
disabled={!activeSignal?.config?.day?.enabled}
addonAfter={
<Select defaultValue="GiB" disabled>
<Option value="TiB">TiB</Option>
<Option value="GiB">GiB</Option>
<Option value="MiB">MiB</Option>
<Option value="KiB">KiB</Option>
</Select>
}
/>
</Form.Item>
) : (
<div className="no-limit">
<Infinity size={16} /> NO LIMIT
</div>
)}
</div>
)}
{signalCfg.usesCount && (
<div className="count">
{activeSignal?.config?.day?.enabled ? (
<Form.Item name="dailyCount" key="dailyCount">
<InputNumber
placeholder="Enter max # of samples/day"
addonAfter={
<Form.Item
name="dailyCountUnit"
noStyle
initialValue="million"
>
<Select
style={{
width: 90,
}}
>
<Option value="thousand">Thousand</Option>
<Option value="million">Million</Option>
<Option value="billion">Billion</Option>
</Select>
</Form.Item>
}
/>
</Form.Item>
) : (
<div className="no-limit">
<Infinity size={16} /> NO LIMIT
</div>
)}
</div>
)}
<div className="size">
{activeSignal?.config?.day?.enabled ? (
<Form.Item name="dailyLimit" key="dailyLimit">
<InputNumber
disabled={!activeSignal?.config?.day?.enabled}
key="dailyLimit"
addonAfter={
<Select defaultValue="GiB" disabled>
<Option value="TiB"> TiB</Option>
<Option value="GiB"> GiB</Option>
<Option value="MiB"> MiB </Option>
<Option value="KiB"> KiB </Option>
</Select>
}
/>
</Form.Item>
) : (
<div className="no-limit">
<Infinity size={16} /> NO LIMIT
</div>
)}
</div>
</div>
<div className="second-limit">
<div className="heading">
<div className="title">
Per Second limit
Per Second limit{' '}
<div className="limit-enable-disable-toggle">
<Form.Item name="enableSecondLimit">
<Switch
size="small"
checked={activeSignal?.config?.second?.enabled}
onChange={(value): void => {
setActiveSignal((prev) =>
prev
? {
...prev,
config: {
...prev.config,
second: {
...prev.config?.second,
enabled: value,
},
},
}
: null,
);
setActiveSignal({
...activeSignal,
config: {
...activeSignal.config,
second: {
...activeSignal.config?.second,
enabled: value,
},
},
});
}}
/>
</Form.Item>
@@ -998,68 +828,37 @@ function MultiIngestionSettings(): JSX.Element {
Add a limit for data ingested every second
</div>
</div>
{signalCfg.usesSize && (
<div className="size">
{activeSignal?.config?.second?.enabled ? (
<Form.Item name="secondsLimit" key="secondsLimit">
<InputNumber
disabled={!activeSignal?.config?.second?.enabled}
addonAfter={
<Select defaultValue="GiB" disabled>
<Option value="TiB">TiB</Option>
<Option value="GiB">GiB</Option>
<Option value="MiB">MiB</Option>
<Option value="KiB">KiB</Option>
</Select>
}
/>
</Form.Item>
) : (
<div className="no-limit">
<Infinity size={16} /> NO LIMIT
</div>
)}
</div>
)}
{signalCfg.usesCount && (
<div className="count">
{activeSignal?.config?.second?.enabled ? (
<Form.Item name="secondsCount" key="secondsCount">
<InputNumber
placeholder="Enter max # of samples/s"
addonAfter={
<Form.Item
name="secondsCountUnit"
noStyle
initialValue="million"
>
<Select
style={{
width: 90,
}}
>
<Option value="thousand">Thousand</Option>
<Option value="million">Million</Option>
<Option value="billion">Billion</Option>
</Select>
</Form.Item>
}
/>
</Form.Item>
) : (
<div className="no-limit">
<Infinity size={16} /> NO LIMIT
</div>
)}
</div>
)}
<div className="size">
{activeSignal?.config?.second?.enabled ? (
<Form.Item name="secondsLimit" key="secondsLimit">
<InputNumber
key="secondsLimit"
disabled={!activeSignal?.config?.second?.enabled}
addonAfter={
<Select defaultValue="GiB" disabled>
<Option value="TiB"> TiB</Option>
<Option value="GiB"> GiB</Option>
<Option value="MiB"> MiB </Option>
<Option value="KiB"> KiB </Option>
</Select>
}
/>
</Form.Item>
) : (
<div className="no-limit">
<Infinity size={16} /> NO LIMIT
</div>
)}
</div>
</div>
</div>
{activeAPIKey?.id === APIKey.id &&
activeSignal.signal === signalName &&
activeSignal.signal === signal &&
!isLoadingLimitForKey &&
hasCreateLimitForIngestionKeyError &&
createLimitForIngestionKeyError &&
createLimitForIngestionKeyError?.error && (
<div className="error">
{createLimitForIngestionKeyError?.error}
@@ -1067,17 +866,17 @@ function MultiIngestionSettings(): JSX.Element {
)}
{activeAPIKey?.id === APIKey.id &&
activeSignal.signal === signalName &&
activeSignal.signal === signal &&
!isLoadingLimitForKey &&
hasUpdateLimitForIngestionKeyError &&
updateLimitForIngestionKeyError?.error && (
updateLimitForIngestionKeyError && (
<div className="error">
{updateLimitForIngestionKeyError?.error}
</div>
)}
{activeAPIKey?.id === APIKey.id &&
activeSignal.signal === signalName &&
activeSignal.signal === signal &&
isEditAddLimitOpen && (
<div className="signal-limit-save-discard">
<Button
@@ -1091,10 +890,10 @@ function MultiIngestionSettings(): JSX.Element {
isLoadingLimitForKey || isLoadingUpdatedLimitForKey
}
onClick={(): void => {
if (!hasLimits(signalName)) {
handleAddLimit(APIKey, signalName);
if (!hasLimits(signal)) {
handleAddLimit(APIKey, signal);
} else {
handleUpdateLimit(APIKey, limitsDict[signalName]);
handleUpdateLimit(APIKey, limits[signal]);
}
}}
>
@@ -1116,99 +915,55 @@ function MultiIngestionSettings(): JSX.Element {
</Form>
) : (
<div className="signal-limit-view-mode">
{/* DAILY limit usage/limit */}
<div className="signal-limit-value">
<div className="limit-type">
Daily <Minus size={16} />
Daily <Minus size={16} />{' '}
</div>
<div className="limit-value">
{/* Size (if usesSize) */}
{signalCfg.usesSize &&
(hasValidDayLimit &&
limit?.config?.day?.size !== undefined ? (
<>
{getYAxisFormattedValue(
(limit?.metric?.day?.size || 0).toString(),
'bytes',
)}{' '}
/{' '}
{getYAxisFormattedValue(
(limit?.config?.day?.size || 0).toString(),
'bytes',
)}
</>
) : (
<>
<Infinity size={16} /> NO LIMIT
</>
))}
{/* Count (if usesCount) */}
{signalCfg.usesCount &&
(limit?.config?.day?.count !== undefined ? (
<div style={{ marginTop: 4 }}>
{countToUnit(
limit?.metric?.day?.count || 0,
).value.toFixed(2)}{' '}
{countToUnit(limit?.metric?.day?.count || 0).unit} /{' '}
{countToUnit(
limit?.config?.day?.count || 0,
).value.toFixed(2)}{' '}
{countToUnit(limit?.config?.day?.count || 0).unit}
</div>
) : (
<>
<Infinity size={16} /> NO LIMIT
</>
))}
<div className="limit-value">
{hasValidDayLimit ? (
<>
{getYAxisFormattedValue(
(limits[signal]?.metric?.day?.size || 0).toString(),
'bytes',
)}{' '}
/{' '}
{getYAxisFormattedValue(
(limits[signal]?.config?.day?.size || 0).toString(),
'bytes',
)}
</>
) : (
<>
<Infinity size={16} /> NO LIMIT
</>
)}
</div>
</div>
{/* SECOND limit usage/limit */}
<div className="signal-limit-value">
<div className="limit-type">
Seconds <Minus size={16} />
</div>
<div className="limit-value">
{/* Size (if usesSize) */}
{signalCfg.usesSize &&
(hasValidSecondLimit &&
limit?.config?.second?.size !== undefined ? (
<>
{getYAxisFormattedValue(
(limit?.metric?.second?.size || 0).toString(),
'bytes',
)}{' '}
/{' '}
{getYAxisFormattedValue(
(limit?.config?.second?.size || 0).toString(),
'bytes',
)}
</>
) : (
<>
<Infinity size={16} /> NO LIMIT
</>
))}
{/* Count (if usesCount) */}
{signalCfg.usesCount &&
(limit?.config?.second?.count !== undefined ? (
<div style={{ marginTop: 4 }}>
{countToUnit(
limit?.metric?.second?.count || 0,
).value.toFixed(2)}{' '}
{countToUnit(limit?.metric?.second?.count || 0).unit} /{' '}
{countToUnit(
limit?.config?.second?.count || 0,
).value.toFixed(2)}{' '}
{countToUnit(limit?.config?.second?.count || 0).unit}
</div>
) : (
<>
<Infinity size={16} /> NO LIMIT
</>
))}
<div className="limit-value">
{hasValidSecondLimit ? (
<>
{getYAxisFormattedValue(
(limits[signal]?.metric?.second?.size || 0).toString(),
'bytes',
)}{' '}
/{' '}
{getYAxisFormattedValue(
(limits[signal]?.config?.second?.size || 0).toString(),
'bytes',
)}
</>
) : (
<>
<Infinity size={16} /> NO LIMIT
</>
)}
</div>
</div>
</div>
@@ -1278,6 +1033,7 @@ function MultiIngestionSettings(): JSX.Element {
className="learn-more"
rel="noreferrer"
>
{' '}
Learn more <ArrowUpRight size={14} />
</a>
</Typography.Text>

View File

@@ -692,7 +692,7 @@ function QueryBuilderSearchV2(
operatorOptions = QUERY_BUILDER_OPERATORS_BY_TYPES[
currentFilterItem.key
.dataType as keyof typeof QUERY_BUILDER_OPERATORS_BY_TYPES
]?.map((operator) => ({
].map((operator) => ({
label: operator,
value: operator,
}));

View File

@@ -8,7 +8,7 @@ import RouteTab from 'components/RouteTab';
import Spinner from 'components/Spinner';
import ROUTES from 'constants/routes';
import history from 'lib/history';
import { useEffect, useMemo } from 'react';
import { useMemo } from 'react';
import { useTranslation } from 'react-i18next';
import { useLocation } from 'react-router-dom';
@@ -80,11 +80,6 @@ function AlertDetails(): JSX.Element {
alertDetailsResponse,
} = useGetAlertRuleDetails();
useEffect(() => {
const alertTitle = alertDetailsResponse?.payload?.data.alert;
document.title = alertTitle || document.title;
}, [alertDetailsResponse?.payload?.data.alert, isRefetching]);
if (
isError ||
!isValidRuleId ||

View File

@@ -4,7 +4,6 @@ import NotFound from 'components/NotFound';
import Spinner from 'components/Spinner';
import NewDashboard from 'container/NewDashboard';
import { useDashboard } from 'providers/Dashboard/Dashboard';
import { useEffect } from 'react';
import { ErrorType } from 'types/common';
function DashboardPage(): JSX.Element {
@@ -18,11 +17,6 @@ function DashboardPage(): JSX.Element {
(dashboardResponse?.error as AxiosError)?.response?.data?.errorType
: 'Something went wrong';
useEffect(() => {
const dashboardTitle = dashboardResponse.data?.data.title;
document.title = dashboardTitle || document.title;
}, [dashboardResponse.data?.data.title, isFetching]);
if (isError && !isFetching && errorMessage === ErrorType.NotFound) {
return <NotFound />;
}

View File

@@ -1,14 +1,3 @@
export interface LimitConfig {
size?: number;
count?: number; // mainly used for metrics
enabled?: boolean;
}
export interface LimitSettings {
day?: LimitConfig;
second?: LimitConfig;
}
export interface LimitProps {
id: string;
signal: string;
@@ -16,20 +5,56 @@ export interface LimitProps {
key_id?: string;
created_at?: string;
updated_at?: string;
config?: LimitSettings;
metric?: LimitSettings;
config?: {
day?: {
size?: number;
enabled?: boolean;
};
second?: {
size?: number;
enabled?: boolean;
};
};
metric?: {
day?: {
size?: number;
enabled?: boolean;
};
second?: {
size?: number;
enabled?: boolean;
};
};
}
export interface AddLimitProps {
keyID: string;
signal: string;
config: LimitSettings;
config: {
day?: {
size?: number;
enabled?: boolean;
};
second?: {
size?: number;
enabled?: boolean;
};
};
}
export interface UpdateLimitProps {
limitID: string;
signal: string;
config: LimitSettings;
config: {
day?: {
size?: number;
enabled?: boolean;
};
second?: {
size?: number;
enabled?: boolean;
};
};
}
export interface LimitSuccessProps {

11
go.mod
View File

@@ -9,8 +9,6 @@ require (
github.com/DATA-DOG/go-sqlmock v1.5.2
github.com/SigNoz/govaluate v0.0.0-20240203125216-988004ccc7fd
github.com/SigNoz/signoz-otel-collector v0.111.16
github.com/SigNoz/zap_otlp/zap_otlp_encoder v0.0.0-20230822164844-1b861a431974
github.com/SigNoz/zap_otlp/zap_otlp_sync v0.0.0-20230822164844-1b861a431974
github.com/antonmedv/expr v1.15.3
github.com/auth0/go-jwt-middleware v1.0.1
github.com/cespare/xxhash/v2 v2.3.0
@@ -20,6 +18,7 @@ require (
github.com/go-kit/log v0.2.1
github.com/go-redis/redis/v8 v8.11.5
github.com/go-redis/redismock/v8 v8.11.5
github.com/go-viper/mapstructure/v2 v2.1.0
github.com/golang-jwt/jwt v3.2.2+incompatible
github.com/google/uuid v1.6.0
github.com/gorilla/handlers v1.5.1
@@ -29,8 +28,9 @@ require (
github.com/jmoiron/sqlx v1.3.4
github.com/json-iterator/go v1.1.12
github.com/knadh/koanf v1.5.0
github.com/knadh/koanf/v2 v2.1.1
github.com/mailru/easyjson v0.7.7
github.com/mattn/go-sqlite3 v2.0.3+incompatible
github.com/mattn/go-sqlite3 v1.14.24
github.com/oklog/oklog v0.3.2
github.com/open-telemetry/opamp-go v0.5.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.111.0
@@ -67,7 +67,6 @@ require (
golang.org/x/net v0.33.0
golang.org/x/oauth2 v0.23.0
golang.org/x/text v0.21.0
google.golang.org/grpc v1.67.1
google.golang.org/protobuf v1.34.2
gopkg.in/segmentio/analytics-go.v3 v3.1.0
gopkg.in/yaml.v2 v2.4.0
@@ -101,6 +100,7 @@ require (
github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/form3tech-oss/jwt-go v3.2.5+incompatible // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-faster/city v1.0.1 // indirect
github.com/go-faster/errors v0.7.1 // indirect
github.com/go-jose/go-jose/v4 v4.0.2 // indirect
@@ -108,7 +108,6 @@ require (
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-ole/go-ole v1.3.0 // indirect
github.com/go-viper/mapstructure/v2 v2.1.0 // indirect
github.com/goccy/go-json v0.10.3 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt/v5 v5.2.1 // indirect
@@ -129,7 +128,6 @@ require (
github.com/jpillora/backoff v1.0.0 // indirect
github.com/jtolds/gls v4.20.0+incompatible // indirect
github.com/klauspost/compress v1.17.10 // indirect
github.com/knadh/koanf/v2 v2.1.1 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/leodido/go-syslog/v4 v4.2.0 // indirect
github.com/leodido/ragel-machinery v0.0.0-20190525184631-5f46317e436b // indirect
@@ -225,6 +223,7 @@ require (
google.golang.org/api v0.199.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240903143218-8af14fe29dc1 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
google.golang.org/grpc v1.67.1 // indirect
k8s.io/client-go v0.31.1 // indirect
k8s.io/klog/v2 v2.130.1 // indirect
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect

10
go.sum
View File

@@ -70,12 +70,6 @@ github.com/SigNoz/prometheus v1.12.0 h1:+BXeIHyMOOWWa+xjhJ+x80JFva7r1WzWIfIhQ5PU
github.com/SigNoz/prometheus v1.12.0/go.mod h1:EqNM27OwmPfqMUk+E+XG1L9rfDFcyXnzzDrg0EPOfxA=
github.com/SigNoz/signoz-otel-collector v0.111.16 h1:535uKH5Oux+35EsI+L3C6pnAP/Ye0PTCbVizXoL+VqE=
github.com/SigNoz/signoz-otel-collector v0.111.16/go.mod h1:HJ4m0LY1MPsuZmuRF7Ixb+bY8rxgRzI0VXzOedESsjg=
github.com/SigNoz/zap_otlp v0.1.0 h1:T7rRcFN87GavY8lDGZj0Z3Xv6OhJA6Pj3I9dNPmqvRc=
github.com/SigNoz/zap_otlp v0.1.0/go.mod h1:lcHvbDbRgvDnPxo9lDlaL1JK2PyOyouP/C3ynnYIvyo=
github.com/SigNoz/zap_otlp/zap_otlp_encoder v0.0.0-20230822164844-1b861a431974 h1:PKVgdf83Yw+lZJbFtNGBgqXiXNf3+kOXW2qZ7Ms7OaY=
github.com/SigNoz/zap_otlp/zap_otlp_encoder v0.0.0-20230822164844-1b861a431974/go.mod h1:fpiHtiboLJpIE5TtkQfiWx6xtnlA+uWmv+N9opETqKY=
github.com/SigNoz/zap_otlp/zap_otlp_sync v0.0.0-20230822164844-1b861a431974 h1:G2JzCrqdeOTtAn4tDFZEg5gCAEYVRXcddG3ZlrFMumo=
github.com/SigNoz/zap_otlp/zap_otlp_sync v0.0.0-20230822164844-1b861a431974/go.mod h1:YtDal1xBRQfPRNo7iSU3W37RGT0jMW7Rnzk6EON3a4M=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
@@ -521,8 +515,8 @@ github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Ky
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/mattn/go-sqlite3 v2.0.3+incompatible h1:gXHsfypPkaMZrKbD5209QV9jbUTJKjyR5WD3HYQSd+U=
github.com/mattn/go-sqlite3 v2.0.3+incompatible/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/mattn/go-sqlite3 v1.14.24 h1:tpSp2G2KyMnnQu99ngJ47EIkWVmliIizyZBfPrBWDRM=
github.com/mattn/go-sqlite3 v1.14.24/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/miekg/dns v1.1.26/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso=
github.com/miekg/dns v1.1.41/go.mod h1:p6aan82bvRIyn+zDIv9xYNUpwa73JcSh9BKwknJysuI=

14
pkg/cache/config.go vendored
View File

@@ -4,12 +4,9 @@ import (
"time"
go_cache "github.com/patrickmn/go-cache"
"go.signoz.io/signoz/pkg/confmap"
"go.signoz.io/signoz/pkg/factory"
)
// Config satisfies the confmap.Config interface
var _ confmap.Config = (*Config)(nil)
type Memory struct {
TTL time.Duration `mapstructure:"ttl"`
CleanupInterval time.Duration `mapstructure:"cleanupInterval"`
@@ -28,7 +25,11 @@ type Config struct {
Redis Redis `mapstructure:"redis"`
}
func (c *Config) NewWithDefaults() confmap.Config {
func NewConfigFactory() factory.ConfigFactory {
return factory.NewConfigFactory(factory.MustNewName("cache"), newConfig)
}
func newConfig() factory.Config {
return &Config{
Provider: "memory",
Memory: Memory{
@@ -42,8 +43,9 @@ func (c *Config) NewWithDefaults() confmap.Config {
DB: 0,
},
}
}
func (c *Config) Validate() error {
func (c Config) Validate() error {
return nil
}

101
pkg/cache/memorycache/memory.go vendored Normal file
View File

@@ -0,0 +1,101 @@
package memorycache
import (
"context"
"fmt"
"reflect"
"time"
gocache "github.com/patrickmn/go-cache"
"go.signoz.io/signoz/pkg/cache"
"go.signoz.io/signoz/pkg/factory"
)
type memory struct {
cc *gocache.Cache
}
func NewFactory() factory.ProviderFactory[cache.Cache, cache.Config] {
return factory.NewProviderFactory(factory.MustNewName("memory"), New)
}
func New(ctx context.Context, settings factory.ProviderSettings, config cache.Config) (cache.Cache, error) {
return &memory{cc: gocache.New(config.Memory.TTL, config.Memory.CleanupInterval)}, nil
}
// Connect does nothing
func (c *memory) Connect(_ context.Context) error {
return nil
}
// Store stores the data in the cache
func (c *memory) Store(_ context.Context, cacheKey string, data cache.CacheableEntity, ttl time.Duration) error {
// check if the data being passed is a pointer and is not nil
rv := reflect.ValueOf(data)
if rv.Kind() != reflect.Pointer || rv.IsNil() {
return cache.WrapCacheableEntityErrors(reflect.TypeOf(data), "inmemory")
}
c.cc.Set(cacheKey, data, ttl)
return nil
}
// Retrieve retrieves the data from the cache
func (c *memory) Retrieve(_ context.Context, cacheKey string, dest cache.CacheableEntity, allowExpired bool) (cache.RetrieveStatus, error) {
// check if the destination being passed is a pointer and is not nil
dstv := reflect.ValueOf(dest)
if dstv.Kind() != reflect.Pointer || dstv.IsNil() {
return cache.RetrieveStatusError, cache.WrapCacheableEntityErrors(reflect.TypeOf(dest), "inmemory")
}
// check if the destination value is settable
if !dstv.Elem().CanSet() {
return cache.RetrieveStatusError, fmt.Errorf("destination value is not settable, %s", dstv.Elem())
}
data, found := c.cc.Get(cacheKey)
if !found {
return cache.RetrieveStatusKeyMiss, nil
}
// check the type compatbility between the src and dest
srcv := reflect.ValueOf(data)
if !srcv.Type().AssignableTo(dstv.Type()) {
return cache.RetrieveStatusError, fmt.Errorf("src type is not assignable to dst type")
}
// set the value to from src to dest
dstv.Elem().Set(srcv.Elem())
return cache.RetrieveStatusHit, nil
}
// SetTTL sets the TTL for the cache entry
func (c *memory) SetTTL(_ context.Context, cacheKey string, ttl time.Duration) {
item, found := c.cc.Get(cacheKey)
if !found {
return
}
c.cc.Replace(cacheKey, item, ttl)
}
// Remove removes the cache entry
func (c *memory) Remove(_ context.Context, cacheKey string) {
c.cc.Delete(cacheKey)
}
// BulkRemove removes the cache entries
func (c *memory) BulkRemove(_ context.Context, cacheKeys []string) {
for _, cacheKey := range cacheKeys {
c.cc.Delete(cacheKey)
}
}
// Close does nothing
func (c *memory) Close(_ context.Context) error {
return nil
}
// Configuration returns the cache configuration
func (c *memory) Configuration() *cache.Memory {
return nil
}

View File

@@ -7,18 +7,21 @@ import (
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
_cache "go.signoz.io/signoz/pkg/cache"
"go.signoz.io/signoz/pkg/factory"
)
// TestNew tests the New function
func TestNew(t *testing.T) {
opts := &_cache.Memory{
opts := _cache.Memory{
TTL: 10 * time.Second,
CleanupInterval: 10 * time.Second,
}
c := New(opts)
c, err := New(context.Background(), factory.ProviderSettings{}, _cache.Config{Provider: "memory", Memory: opts})
require.NoError(t, err)
assert.NotNil(t, c)
assert.NotNil(t, c.cc)
assert.NotNil(t, c.(*memory).cc)
assert.NoError(t, c.Connect(context.Background()))
}
@@ -53,32 +56,35 @@ func (dce DCacheableEntity) UnmarshalBinary(data []byte) error {
// TestStore tests the Store function
// this should fail because of nil pointer error
func TestStoreWithNilPointer(t *testing.T) {
opts := &_cache.Memory{
opts := _cache.Memory{
TTL: 10 * time.Second,
CleanupInterval: 10 * time.Second,
}
c := New(opts)
c, err := New(context.Background(), factory.ProviderSettings{}, _cache.Config{Provider: "memory", Memory: opts})
require.NoError(t, err)
var storeCacheableEntity *CacheableEntity
assert.Error(t, c.Store(context.Background(), "key", storeCacheableEntity, 10*time.Second))
}
// this should fail because of no pointer error
func TestStoreWithStruct(t *testing.T) {
opts := &_cache.Memory{
opts := _cache.Memory{
TTL: 10 * time.Second,
CleanupInterval: 10 * time.Second,
}
c := New(opts)
c, err := New(context.Background(), factory.ProviderSettings{}, _cache.Config{Provider: "memory", Memory: opts})
require.NoError(t, err)
var storeCacheableEntity CacheableEntity
assert.Error(t, c.Store(context.Background(), "key", storeCacheableEntity, 10*time.Second))
}
func TestStoreWithNonNilPointer(t *testing.T) {
opts := &_cache.Memory{
opts := _cache.Memory{
TTL: 10 * time.Second,
CleanupInterval: 10 * time.Second,
}
c := New(opts)
c, err := New(context.Background(), factory.ProviderSettings{}, _cache.Config{Provider: "memory", Memory: opts})
require.NoError(t, err)
storeCacheableEntity := &CacheableEntity{
Key: "some-random-key",
Value: 1,
@@ -89,11 +95,12 @@ func TestStoreWithNonNilPointer(t *testing.T) {
// TestRetrieve tests the Retrieve function
func TestRetrieveWithNilPointer(t *testing.T) {
opts := &_cache.Memory{
opts := _cache.Memory{
TTL: 10 * time.Second,
CleanupInterval: 10 * time.Second,
}
c := New(opts)
c, err := New(context.Background(), factory.ProviderSettings{}, _cache.Config{Provider: "memory", Memory: opts})
require.NoError(t, err)
storeCacheableEntity := &CacheableEntity{
Key: "some-random-key",
Value: 1,
@@ -109,11 +116,12 @@ func TestRetrieveWithNilPointer(t *testing.T) {
}
func TestRetrieveWitNonPointer(t *testing.T) {
opts := &_cache.Memory{
opts := _cache.Memory{
TTL: 10 * time.Second,
CleanupInterval: 10 * time.Second,
}
c := New(opts)
c, err := New(context.Background(), factory.ProviderSettings{}, _cache.Config{Provider: "memory", Memory: opts})
require.NoError(t, err)
storeCacheableEntity := &CacheableEntity{
Key: "some-random-key",
Value: 1,
@@ -129,11 +137,12 @@ func TestRetrieveWitNonPointer(t *testing.T) {
}
func TestRetrieveWithDifferentTypes(t *testing.T) {
opts := &_cache.Memory{
opts := _cache.Memory{
TTL: 10 * time.Second,
CleanupInterval: 10 * time.Second,
}
c := New(opts)
c, err := New(context.Background(), factory.ProviderSettings{}, _cache.Config{Provider: "memory", Memory: opts})
require.NoError(t, err)
storeCacheableEntity := &CacheableEntity{
Key: "some-random-key",
Value: 1,
@@ -148,11 +157,8 @@ func TestRetrieveWithDifferentTypes(t *testing.T) {
}
func TestRetrieveWithSameTypes(t *testing.T) {
opts := &_cache.Memory{
TTL: 10 * time.Second,
CleanupInterval: 10 * time.Second,
}
c := New(opts)
c, err := New(context.Background(), factory.ProviderSettings{}, _cache.Config{Provider: "memory", Memory: _cache.Memory{TTL: 10 * time.Second, CleanupInterval: 10 * time.Second}})
require.NoError(t, err)
storeCacheableEntity := &CacheableEntity{
Key: "some-random-key",
Value: 1,
@@ -169,7 +175,8 @@ func TestRetrieveWithSameTypes(t *testing.T) {
// TestSetTTL tests the SetTTL function
func TestSetTTL(t *testing.T) {
c := New(&_cache.Memory{TTL: 10 * time.Second, CleanupInterval: 1 * time.Second})
c, err := New(context.Background(), factory.ProviderSettings{}, _cache.Config{Provider: "memory", Memory: _cache.Memory{TTL: 10 * time.Second, CleanupInterval: 1 * time.Second}})
require.NoError(t, err)
storeCacheableEntity := &CacheableEntity{
Key: "some-random-key",
Value: 1,
@@ -194,11 +201,11 @@ func TestSetTTL(t *testing.T) {
// TestRemove tests the Remove function
func TestRemove(t *testing.T) {
opts := &_cache.Memory{
opts := _cache.Memory{
TTL: 10 * time.Second,
CleanupInterval: 10 * time.Second,
}
c := New(opts)
c, err := New(context.Background(), factory.ProviderSettings{}, _cache.Config{Provider: "memory", Memory: opts})
storeCacheableEntity := &CacheableEntity{
Key: "some-random-key",
Value: 1,
@@ -216,11 +223,12 @@ func TestRemove(t *testing.T) {
// TestBulkRemove tests the BulkRemove function
func TestBulkRemove(t *testing.T) {
opts := &_cache.Memory{
opts := _cache.Memory{
TTL: 10 * time.Second,
CleanupInterval: 10 * time.Second,
}
c := New(opts)
c, err := New(context.Background(), factory.ProviderSettings{}, _cache.Config{Provider: "memory", Memory: opts})
require.NoError(t, err)
storeCacheableEntity := &CacheableEntity{
Key: "some-random-key",
Value: 1,
@@ -244,11 +252,12 @@ func TestBulkRemove(t *testing.T) {
// TestCache tests the cache
func TestCache(t *testing.T) {
opts := &_cache.Memory{
opts := _cache.Memory{
TTL: 10 * time.Second,
CleanupInterval: 10 * time.Second,
}
c := New(opts)
c, err := New(context.Background(), factory.ProviderSettings{}, _cache.Config{Provider: "memory", Memory: opts})
require.NoError(t, err)
storeCacheableEntity := &CacheableEntity{
Key: "some-random-key",
Value: 1,

View File

@@ -1,96 +0,0 @@
package memorycache
import (
"context"
"fmt"
"reflect"
"time"
go_cache "github.com/patrickmn/go-cache"
_cache "go.signoz.io/signoz/pkg/cache"
)
type provider struct {
cc *go_cache.Cache
}
func New(opts *_cache.Memory) *provider {
return &provider{cc: go_cache.New(opts.TTL, opts.CleanupInterval)}
}
// Connect does nothing
func (c *provider) Connect(_ context.Context) error {
return nil
}
// Store stores the data in the cache
func (c *provider) Store(_ context.Context, cacheKey string, data _cache.CacheableEntity, ttl time.Duration) error {
// check if the data being passed is a pointer and is not nil
rv := reflect.ValueOf(data)
if rv.Kind() != reflect.Pointer || rv.IsNil() {
return _cache.WrapCacheableEntityErrors(reflect.TypeOf(data), "inmemory")
}
c.cc.Set(cacheKey, data, ttl)
return nil
}
// Retrieve retrieves the data from the cache
func (c *provider) Retrieve(_ context.Context, cacheKey string, dest _cache.CacheableEntity, allowExpired bool) (_cache.RetrieveStatus, error) {
// check if the destination being passed is a pointer and is not nil
dstv := reflect.ValueOf(dest)
if dstv.Kind() != reflect.Pointer || dstv.IsNil() {
return _cache.RetrieveStatusError, _cache.WrapCacheableEntityErrors(reflect.TypeOf(dest), "inmemory")
}
// check if the destination value is settable
if !dstv.Elem().CanSet() {
return _cache.RetrieveStatusError, fmt.Errorf("destination value is not settable, %s", dstv.Elem())
}
data, found := c.cc.Get(cacheKey)
if !found {
return _cache.RetrieveStatusKeyMiss, nil
}
// check the type compatbility between the src and dest
srcv := reflect.ValueOf(data)
if !srcv.Type().AssignableTo(dstv.Type()) {
return _cache.RetrieveStatusError, fmt.Errorf("src type is not assignable to dst type")
}
// set the value to from src to dest
dstv.Elem().Set(srcv.Elem())
return _cache.RetrieveStatusHit, nil
}
// SetTTL sets the TTL for the cache entry
func (c *provider) SetTTL(_ context.Context, cacheKey string, ttl time.Duration) {
item, found := c.cc.Get(cacheKey)
if !found {
return
}
c.cc.Replace(cacheKey, item, ttl)
}
// Remove removes the cache entry
func (c *provider) Remove(_ context.Context, cacheKey string) {
c.cc.Delete(cacheKey)
}
// BulkRemove removes the cache entries
func (c *provider) BulkRemove(_ context.Context, cacheKeys []string) {
for _, cacheKey := range cacheKeys {
c.cc.Delete(cacheKey)
}
}
// Close does nothing
func (c *provider) Close(_ context.Context) error {
return nil
}
// Configuration returns the cache configuration
func (c *provider) Configuration() *_cache.Memory {
return nil
}

View File

@@ -8,25 +8,30 @@ import (
"github.com/go-redis/redis/v8"
_cache "go.signoz.io/signoz/pkg/cache"
"go.signoz.io/signoz/pkg/factory"
"go.uber.org/zap"
)
type provider struct {
type cache struct {
client *redis.Client
opts *_cache.Redis
opts _cache.Redis
}
func New(opts *_cache.Redis) *provider {
return &provider{opts: opts}
func NewFactory() factory.ProviderFactory[_cache.Cache, _cache.Config] {
return factory.NewProviderFactory(factory.MustNewName("redis"), New)
}
func New(ctx context.Context, settings factory.ProviderSettings, config _cache.Config) (_cache.Cache, error) {
return &cache{opts: config.Redis}, nil
}
// WithClient creates a new cache with the given client
func WithClient(client *redis.Client) *provider {
return &provider{client: client}
func WithClient(client *redis.Client) *cache {
return &cache{client: client}
}
// Connect connects to the redis server
func (c *provider) Connect(_ context.Context) error {
func (c *cache) Connect(_ context.Context) error {
c.client = redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%s:%d", c.opts.Host, c.opts.Port),
Password: c.opts.Password,
@@ -36,12 +41,12 @@ func (c *provider) Connect(_ context.Context) error {
}
// Store stores the data in the cache
func (c *provider) Store(ctx context.Context, cacheKey string, data _cache.CacheableEntity, ttl time.Duration) error {
func (c *cache) Store(ctx context.Context, cacheKey string, data _cache.CacheableEntity, ttl time.Duration) error {
return c.client.Set(ctx, cacheKey, data, ttl).Err()
}
// Retrieve retrieves the data from the cache
func (c *provider) Retrieve(ctx context.Context, cacheKey string, dest _cache.CacheableEntity, allowExpired bool) (_cache.RetrieveStatus, error) {
func (c *cache) Retrieve(ctx context.Context, cacheKey string, dest _cache.CacheableEntity, allowExpired bool) (_cache.RetrieveStatus, error) {
err := c.client.Get(ctx, cacheKey).Scan(dest)
if err != nil {
if errors.Is(err, redis.Nil) {
@@ -53,7 +58,7 @@ func (c *provider) Retrieve(ctx context.Context, cacheKey string, dest _cache.Ca
}
// SetTTL sets the TTL for the cache entry
func (c *provider) SetTTL(ctx context.Context, cacheKey string, ttl time.Duration) {
func (c *cache) SetTTL(ctx context.Context, cacheKey string, ttl time.Duration) {
err := c.client.Expire(ctx, cacheKey, ttl).Err()
if err != nil {
zap.L().Error("error setting TTL for cache key", zap.String("cacheKey", cacheKey), zap.Duration("ttl", ttl), zap.Error(err))
@@ -61,39 +66,34 @@ func (c *provider) SetTTL(ctx context.Context, cacheKey string, ttl time.Duratio
}
// Remove removes the cache entry
func (c *provider) Remove(ctx context.Context, cacheKey string) {
func (c *cache) Remove(ctx context.Context, cacheKey string) {
c.BulkRemove(ctx, []string{cacheKey})
}
// BulkRemove removes the cache entries
func (c *provider) BulkRemove(ctx context.Context, cacheKeys []string) {
func (c *cache) BulkRemove(ctx context.Context, cacheKeys []string) {
if err := c.client.Del(ctx, cacheKeys...).Err(); err != nil {
zap.L().Error("error deleting cache keys", zap.Strings("cacheKeys", cacheKeys), zap.Error(err))
}
}
// Close closes the connection to the redis server
func (c *provider) Close(_ context.Context) error {
func (c *cache) Close(_ context.Context) error {
return c.client.Close()
}
// Ping pings the redis server
func (c *provider) Ping(ctx context.Context) error {
func (c *cache) Ping(ctx context.Context) error {
return c.client.Ping(ctx).Err()
}
// GetClient returns the redis client
func (c *provider) GetClient() *redis.Client {
func (c *cache) GetClient() *redis.Client {
return c.client
}
// GetOptions returns the options
func (c *provider) GetOptions() *_cache.Redis {
return c.opts
}
// GetTTL returns the TTL for the cache entry
func (c *provider) GetTTL(ctx context.Context, cacheKey string) time.Duration {
func (c *cache) GetTTL(ctx context.Context, cacheKey string) time.Duration {
ttl, err := c.client.TTL(ctx, cacheKey).Result()
if err != nil {
zap.L().Error("error getting TTL for cache key", zap.String("cacheKey", cacheKey), zap.Error(err))
@@ -102,12 +102,12 @@ func (c *provider) GetTTL(ctx context.Context, cacheKey string) time.Duration {
}
// GetKeys returns the keys matching the pattern
func (c *provider) GetKeys(ctx context.Context, pattern string) ([]string, error) {
func (c *cache) GetKeys(ctx context.Context, pattern string) ([]string, error) {
return c.client.Keys(ctx, pattern).Result()
}
// GetKeysWithTTL returns the keys matching the pattern with their TTL
func (c *provider) GetKeysWithTTL(ctx context.Context, pattern string) (map[string]time.Duration, error) {
func (c *cache) GetKeysWithTTL(ctx context.Context, pattern string) (map[string]time.Duration, error) {
keys, err := c.GetKeys(ctx, pattern)
if err != nil {
return nil, err

90
pkg/config/conf.go Normal file
View File

@@ -0,0 +1,90 @@
package config
import (
"github.com/go-viper/mapstructure/v2"
"github.com/knadh/koanf/providers/confmap"
"github.com/knadh/koanf/v2"
)
const (
KoanfDelimiter string = "::"
)
// Conf is a wrapper around the koanf library.
type Conf struct {
*koanf.Koanf
}
// NewConf creates a new Conf instance.
func NewConf() *Conf {
return &Conf{koanf.New(KoanfDelimiter)}
}
// NewConfFromMap creates a new Conf instance from a map.
func NewConfFromMap(m map[string]any) (*Conf, error) {
conf := NewConf()
if err := conf.Koanf.Load(confmap.Provider(m, KoanfDelimiter), nil); err != nil {
return nil, err
}
return conf, nil
}
// MustNewConfFromMap creates a new Conf instance from a map.
// It panics if the conf cannot be created.
func MustNewConfFromMap(m map[string]any) *Conf {
conf, err := NewConfFromMap(m)
if err != nil {
panic(err)
}
return conf
}
// Merge merges the current configuration with the input configuration.
func (conf *Conf) Merge(input *Conf) error {
return conf.Koanf.Merge(input.Koanf)
}
// Merge merges the current configuration with the input configuration.
func (conf *Conf) MergeAt(input *Conf, path string) error {
return conf.Koanf.MergeAt(input.Koanf, path)
}
// Unmarshal unmarshals the configuration at the given path into the input.
// It uses a WeaklyTypedInput to allow for more flexible unmarshalling.
func (conf *Conf) Unmarshal(path string, input any) error {
dc := &mapstructure.DecoderConfig{
TagName: "mapstructure",
WeaklyTypedInput: true,
DecodeHook: mapstructure.ComposeDecodeHookFunc(
mapstructure.StringToSliceHookFunc(","),
mapstructure.StringToTimeDurationHookFunc(),
mapstructure.TextUnmarshallerHookFunc(),
),
Result: input,
}
return conf.Koanf.UnmarshalWithConf(path, input, koanf.UnmarshalConf{Tag: "mapstructure", DecoderConfig: dc})
}
// Set sets the configuration at the given key.
// It decodes the input into a map as per mapstructure.Decode and then merges it into the configuration.
func (conf *Conf) Set(key string, input any) error {
m := map[string]any{}
err := mapstructure.Decode(input, &m)
if err != nil {
return err
}
newConf := NewConf()
if err := newConf.Koanf.Load(confmap.Provider(m, KoanfDelimiter), nil); err != nil {
return err
}
if err := conf.Koanf.MergeAt(newConf.Koanf, key); err != nil {
return err
}
return nil
}

38
pkg/config/conf_test.go Normal file
View File

@@ -0,0 +1,38 @@
package config
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestConfMerge(t *testing.T) {
testCases := []struct {
name string
conf *Conf
input *Conf
expected *Conf
pass bool
}{
{name: "Empty", conf: NewConf(), input: NewConf(), expected: NewConf(), pass: true},
{name: "Merge", conf: MustNewConfFromMap(map[string]any{"a": "b"}), input: MustNewConfFromMap(map[string]any{"c": "d"}), expected: MustNewConfFromMap(map[string]any{"a": "b", "c": "d"}), pass: true},
{name: "NestedMerge", conf: MustNewConfFromMap(map[string]any{"a": map[string]any{"b": "v1", "c": "v2"}}), input: MustNewConfFromMap(map[string]any{"a": map[string]any{"d": "v1", "e": "v2"}}), expected: MustNewConfFromMap(map[string]any{"a": map[string]any{"b": "v1", "c": "v2", "d": "v1", "e": "v2"}}), pass: true},
{name: "Override", conf: MustNewConfFromMap(map[string]any{"a": "b"}), input: MustNewConfFromMap(map[string]any{"a": "c"}), expected: MustNewConfFromMap(map[string]any{"a": "c"}), pass: true},
}
t.Parallel()
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
err := tc.conf.Merge(tc.input)
if !tc.pass {
assert.Error(t, err)
return
}
require.NoError(t, err)
assert.Equal(t, tc.expected.Raw(), tc.conf.Raw())
assert.Equal(t, tc.expected.Raw(), tc.conf.Raw())
})
}
}

View File

@@ -3,32 +3,34 @@ package config
import (
"context"
"go.signoz.io/signoz/pkg/cache"
signozconfmap "go.signoz.io/signoz/pkg/confmap"
"go.signoz.io/signoz/pkg/instrumentation"
"go.signoz.io/signoz/pkg/web"
"go.signoz.io/signoz/pkg/factory"
)
// This map contains the default values of all config structs
var (
defaults = map[string]signozconfmap.Config{
"web": &web.Config{},
"cache": &cache.Config{},
}
)
// Config defines the entire configuration of signoz.
type Config struct {
Instrumentation instrumentation.Config `mapstructure:"instrumentation"`
Web web.Config `mapstructure:"web"`
Cache cache.Config `mapstructure:"cache"`
}
func New(ctx context.Context, settings ProviderSettings) (*Config, error) {
provider, err := NewProvider(settings)
func New(ctx context.Context, resolverConfig ResolverConfig, configFactories []factory.ConfigFactory) (*Conf, error) {
// Get the config from the resolver
resolver, err := NewResolver(resolverConfig)
if err != nil {
return nil, err
}
return provider.Get(ctx)
resolvedConf, err := resolver.Do(ctx)
if err != nil {
return nil, err
}
conf := NewConf()
// Set the default configs
for _, factory := range configFactories {
c := factory.New()
if err := conf.Set(factory.Name().String(), c); err != nil {
return nil, err
}
}
err = conf.Merge(resolvedConf)
if err != nil {
return nil, err
}
return conf, nil
}

View File

@@ -1,54 +0,0 @@
package config
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/confmap"
"go.signoz.io/signoz/pkg/cache"
"go.signoz.io/signoz/pkg/confmap/provider/signozenvprovider"
"go.signoz.io/signoz/pkg/web"
)
func TestNewWithSignozEnvProvider(t *testing.T) {
t.Setenv("SIGNOZ__WEB__PREFIX", "/web")
t.Setenv("SIGNOZ__WEB__DIRECTORY", "/build")
t.Setenv("SIGNOZ__CACHE__PROVIDER", "redis")
t.Setenv("SIGNOZ__CACHE__REDIS__HOST", "127.0.0.1")
config, err := New(context.Background(), ProviderSettings{
ResolverSettings: confmap.ResolverSettings{
URIs: []string{"signozenv:"},
ProviderFactories: []confmap.ProviderFactory{
signozenvprovider.NewFactory(),
},
},
})
require.NoError(t, err)
expected := &Config{
Web: web.Config{
Prefix: "/web",
Directory: "/build",
},
Cache: cache.Config{
Provider: "redis",
Memory: cache.Memory{
TTL: time.Duration(-1),
CleanupInterval: 1 * time.Minute,
},
Redis: cache.Redis{
Host: "127.0.0.1",
Port: 6379,
Password: "",
DB: 0,
},
},
}
assert.Equal(t, expected, config)
}

View File

@@ -0,0 +1,71 @@
package envprovider
import (
"context"
"strings"
koanfenv "github.com/knadh/koanf/providers/env"
"go.signoz.io/signoz/pkg/config"
)
const (
prefix string = "SIGNOZ_"
scheme string = "env"
)
type provider struct{}
func NewFactory() config.ProviderFactory {
return config.NewProviderFactory(New)
}
func New(config config.ProviderConfig) config.Provider {
return &provider{}
}
func (provider *provider) Scheme() string {
return scheme
}
func (provider *provider) Get(ctx context.Context, uri config.Uri) (*config.Conf, error) {
conf := config.NewConf()
err := conf.Load(
koanfenv.Provider(
prefix,
// Do not set this to `_`. The correct delimiter is being set by the custom callback provided below.
// Since this had to be passed, using `config.KoanfDelimiter` eliminates any possible side effect.
config.KoanfDelimiter,
func(s string) string {
s = strings.ToLower(strings.TrimPrefix(s, prefix))
return provider.cb(s, config.KoanfDelimiter)
},
),
nil,
)
return conf, err
}
func (provider *provider) cb(s string, delim string) string {
delims := []rune(delim)
runes := []rune(s)
result := make([]rune, 0, len(runes))
for i := 0; i < len(runes); i++ {
// Check for double underscore pattern
if i < len(runes)-1 && runes[i] == '_' && runes[i+1] == '_' {
result = append(result, '_')
i++ // Skip next underscore
continue
}
if runes[i] == '_' {
result = append(result, delims...)
continue
}
result = append(result, runes[i])
}
return string(result)
}

View File

@@ -0,0 +1,78 @@
package envprovider
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.signoz.io/signoz/pkg/config"
)
func TestGetWithStrings(t *testing.T) {
t.Setenv("SIGNOZ_K1_K2", "string")
t.Setenv("SIGNOZ_K3__K4", "string")
t.Setenv("SIGNOZ_K5__K6_K7__K8", "string")
t.Setenv("SIGNOZ_K9___K10", "string")
t.Setenv("SIGNOZ_K11____K12", "string")
expected := map[string]any{
"k1::k2": "string",
"k3_k4": "string",
"k5_k6::k7_k8": "string",
"k9_::k10": "string",
"k11__k12": "string",
}
provider := New(config.ProviderConfig{})
actual, err := provider.Get(context.Background(), config.MustNewUri("env:"))
require.NoError(t, err)
assert.Equal(t, expected, actual.All())
}
func TestGetWithGoTypes(t *testing.T) {
t.Setenv("SIGNOZ_BOOL", "true")
t.Setenv("SIGNOZ_STRING", "string")
t.Setenv("SIGNOZ_INT", "1")
t.Setenv("SIGNOZ_SLICE", "[1,2]")
expected := map[string]any{
"bool": "true",
"int": "1",
"slice": "[1,2]",
"string": "string",
}
provider := New(config.ProviderConfig{})
actual, err := provider.Get(context.Background(), config.MustNewUri("env:"))
require.NoError(t, err)
assert.Equal(t, expected, actual.All())
}
func TestGetWithGoTypesWithUnmarshal(t *testing.T) {
t.Setenv("SIGNOZ_BOOL", "true")
t.Setenv("SIGNOZ_STRING", "string")
t.Setenv("SIGNOZ_INT", "1")
type test struct {
Bool bool `mapstructure:"bool"`
String string `mapstructure:"string"`
Int int `mapstructure:"int"`
}
expected := test{
Bool: true,
String: "string",
Int: 1,
}
provider := New(config.ProviderConfig{})
conf, err := provider.Get(context.Background(), config.MustNewUri("env:"))
require.NoError(t, err)
actual := test{}
err = conf.Unmarshal("", &actual)
require.NoError(t, err)
assert.Equal(t, expected, actual)
}

View File

@@ -0,0 +1,34 @@
package fileprovider
import (
"context"
koanfyaml "github.com/knadh/koanf/parsers/yaml"
koanffile "github.com/knadh/koanf/providers/file"
"go.signoz.io/signoz/pkg/config"
)
const (
scheme string = "file"
)
type provider struct{}
func NewFactory() config.ProviderFactory {
return config.NewProviderFactory(New)
}
func New(config config.ProviderConfig) config.Provider {
return &provider{}
}
func (provider *provider) Scheme() string {
return scheme
}
func (provider *provider) Get(ctx context.Context, uri config.Uri) (*config.Conf, error) {
conf := config.NewConf()
err := conf.Load(koanffile.Provider(uri.Value()), koanfyaml.Parser())
return conf, err
}

View File

@@ -0,0 +1,68 @@
package fileprovider
import (
"context"
"path/filepath"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.signoz.io/signoz/pkg/config"
)
func TestGetWithStrings(t *testing.T) {
expected := map[string]any{
"k1::k2": "string",
"k3_k4": "string",
"k5_k6::k7_k8": "string",
"k9_::k10": "string",
"k11__k12": "string",
}
provider := New(config.ProviderConfig{})
actual, err := provider.Get(context.Background(), config.MustNewUri("file:"+filepath.Join("testdata", "strings.yaml")))
require.NoError(t, err)
assert.Equal(t, expected, actual.All())
}
func TestGetWithGoTypes(t *testing.T) {
expected := map[string]any{
"bool": true,
"int": 1,
"slice": []any{1, 2},
"string": "string",
}
provider := New(config.ProviderConfig{})
actual, err := provider.Get(context.Background(), config.MustNewUri("file:"+filepath.Join("testdata", "gotypes.yaml")))
require.NoError(t, err)
assert.Equal(t, expected, actual.All())
}
func TestGetWithGoTypesWithUnmarshal(t *testing.T) {
type test struct {
Bool bool `mapstructure:"bool"`
String string `mapstructure:"string"`
Int int `mapstructure:"int"`
Slice []any `mapstructure:"slice"`
}
expected := test{
Bool: true,
String: "string",
Int: 1,
Slice: []any{1, 2},
}
provider := New(config.ProviderConfig{})
conf, err := provider.Get(context.Background(), config.MustNewUri("file:"+filepath.Join("testdata", "gotypes.yaml")))
require.NoError(t, err)
actual := test{}
err = conf.Unmarshal("", &actual)
require.NoError(t, err)
assert.Equal(t, expected, actual)
}

View File

@@ -0,0 +1,6 @@
bool: true
string: string
int: 1
slice:
- 1
- 2

View File

@@ -0,0 +1,8 @@
k1:
k2: string
k3_k4: string
k5_k6:
k7_k8: string
k9_:
k10: string
k11__k12: string

View File

@@ -2,51 +2,38 @@ package config
import (
"context"
"fmt"
"go.opentelemetry.io/collector/confmap"
)
// Provides the configuration for signoz.
// NewProviderFunc is a function that creates a new provider.
type NewProviderFunc = func(ProviderConfig) Provider
// ProviderFactory is a factory that creates a new provider.
type ProviderFactory interface {
New(ProviderConfig) Provider
}
// NewProviderFactory creates a new provider factory.
func NewProviderFactory(f NewProviderFunc) ProviderFactory {
return &providerFactory{f: f}
}
// providerFactory is a factory that implements the ProviderFactory interface.
type providerFactory struct {
f NewProviderFunc
}
// New creates a new provider.
func (factory *providerFactory) New(config ProviderConfig) Provider {
return factory.f(config)
}
// ProviderConfig is the configuration for a provider.
type ProviderConfig struct{}
// Provider is an interface that represents a provider.
type Provider interface {
// Get returns the configuration, or error otherwise.
Get(ctx context.Context) (*Config, error)
}
type provider struct {
resolver *confmap.Resolver
}
// ProviderSettings are the settings to configure the behavior of the Provider.
type ProviderSettings struct {
// ResolverSettings are the settings to configure the behavior of the confmap.Resolver.
ResolverSettings confmap.ResolverSettings
}
// NewProvider returns a new Provider that provides the entire configuration.
// See https://github.com/open-telemetry/opentelemetry-collector/blob/main/otelcol/configprovider.go for
// more details
func NewProvider(settings ProviderSettings) (Provider, error) {
resolver, err := confmap.NewResolver(settings.ResolverSettings)
if err != nil {
return nil, err
}
return &provider{
resolver: resolver,
}, nil
}
func (provider *provider) Get(ctx context.Context) (*Config, error) {
conf, err := provider.resolver.Resolve(ctx)
if err != nil {
return nil, fmt.Errorf("cannot resolve configuration: %w", err)
}
config, err := unmarshal(conf)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal configuration: %w", err)
}
return config, nil
// Get returns the configuration for the given URI.
Get(context.Context, Uri) (*Conf, error)
// Scheme returns the scheme of the provider.
Scheme() string
}

87
pkg/config/resolver.go Normal file
View File

@@ -0,0 +1,87 @@
package config
import (
"context"
"errors"
"fmt"
)
type ResolverConfig struct {
// Each string or `uri` must follow "<scheme>:<value>" format. This format is compatible with the URI definition
// defined at https://datatracker.ietf.org/doc/html/rfc3986".
// It is required to have at least one uri.
Uris []string
// ProviderFactories is a slice of Provider factories.
// It is required to have at least one factory.
ProviderFactories []ProviderFactory
}
type Resolver struct {
uris []Uri
providers map[string]Provider
}
func NewResolver(config ResolverConfig) (*Resolver, error) {
if len(config.Uris) == 0 {
return nil, errors.New("cannot build resolver, no uris have been provided")
}
if len(config.ProviderFactories) == 0 {
return nil, errors.New("cannot build resolver, no providers have been provided")
}
uris := make([]Uri, len(config.Uris))
for i, inputUri := range config.Uris {
uri, err := NewUri(inputUri)
if err != nil {
return nil, err
}
uris[i] = uri
}
providers := make(map[string]Provider, len(config.ProviderFactories))
for _, factory := range config.ProviderFactories {
provider := factory.New(ProviderConfig{})
scheme := provider.Scheme()
// Check that the scheme is unique.
if _, ok := providers[scheme]; ok {
return nil, fmt.Errorf("cannot build resolver, duplicate scheme %q found", scheme)
}
providers[provider.Scheme()] = provider
}
return &Resolver{
uris: uris,
providers: providers,
}, nil
}
func (resolver *Resolver) Do(ctx context.Context) (*Conf, error) {
conf := NewConf()
for _, uri := range resolver.uris {
currentConf, err := resolver.get(ctx, uri)
if err != nil {
return nil, err
}
if err = conf.Merge(currentConf); err != nil {
return nil, fmt.Errorf("cannot merge config: %w", err)
}
}
return conf, nil
}
func (resolver *Resolver) get(ctx context.Context, uri Uri) (*Conf, error) {
provider, ok := resolver.providers[uri.scheme]
if !ok {
return nil, fmt.Errorf("cannot find provider with schema %q", uri.scheme)
}
return provider.Get(ctx, uri)
}

View File

@@ -1,49 +0,0 @@
package config
import (
"fmt"
"go.opentelemetry.io/collector/confmap"
)
// unmarshal converts a confmap.Conf into a Config struct.
// It splits the input confmap into a map of key-value pairs, fetches the corresponding
// signozconfmap.Config interface by name, merges it with the default config, validates it,
// and then creates a new confmap from the parsed map to unmarshal into the Config struct.
func unmarshal(conf *confmap.Conf) (*Config, error) {
raw := make(map[string]any)
if err := conf.Unmarshal(&raw); err != nil {
return nil, err
}
parsed := make(map[string]any)
// To help the defaults kick in, we need iterate over the default map instead of the raw values
for k, v := range defaults {
sub, err := conf.Sub(k)
if err != nil {
return nil, fmt.Errorf("cannot read config for %q: %w", k, err)
}
d := v.NewWithDefaults()
if err := sub.Unmarshal(&d); err != nil {
return nil, fmt.Errorf("cannot merge config for %q: %w", k, err)
}
err = d.Validate()
if err != nil {
return nil, fmt.Errorf("failed to validate config for for %q: %w", k, err)
}
parsed[k] = d
}
parsedConf := confmap.NewFromStringMap(parsed)
config := new(Config)
err := parsedConf.Unmarshal(config)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal config: %w", err)
}
return config, nil
}

46
pkg/config/uri.go Normal file
View File

@@ -0,0 +1,46 @@
package config
import (
"fmt"
"regexp"
)
var (
// uriRegex is a regex that matches the URI format. It complies with the URI definition defined at https://datatracker.ietf.org/doc/html/rfc3986.
// The format is "<scheme>:<value>".
uriRegex = regexp.MustCompile(`(?s:^(?P<Scheme>[A-Za-z][A-Za-z0-9+.-]+):(?P<Value>.*)$)`)
)
type Uri struct {
scheme string
value string
}
func NewUri(input string) (Uri, error) {
submatches := uriRegex.FindStringSubmatch(input)
if len(submatches) != 3 {
return Uri{}, fmt.Errorf("invalid uri: %q", input)
}
return Uri{
scheme: submatches[1],
value: submatches[2],
}, nil
}
func MustNewUri(input string) Uri {
uri, err := NewUri(input)
if err != nil {
panic(err)
}
return uri
}
func (uri Uri) Scheme() string {
return uri.scheme
}
func (uri Uri) Value() string {
return uri.value
}

35
pkg/config/uri_test.go Normal file
View File

@@ -0,0 +1,35 @@
package config
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestNewUri(t *testing.T) {
testCases := []struct {
input string
expected Uri
pass bool
}{
{input: "file:/path/1", expected: Uri{scheme: "file", value: "/path/1"}, pass: true},
{input: "file:", expected: Uri{scheme: "file", value: ""}, pass: true},
{input: "env:", expected: Uri{scheme: "env", value: ""}, pass: true},
{input: "scheme", expected: Uri{}, pass: false},
}
for _, tc := range testCases {
uri, err := NewUri(tc.input)
if !tc.pass {
assert.Error(t, err)
continue
}
require.NoError(t, err)
assert.NotPanics(t, func() { MustNewUri(tc.input) })
assert.Equal(t, tc.expected, uri)
assert.Equal(t, tc.expected.Scheme(), uri.scheme)
assert.Equal(t, tc.expected.Value(), uri.value)
}
}

View File

@@ -1,9 +0,0 @@
package confmap
// Config is an interface that defines methods for creating and validating configurations.
type Config interface {
// New creates a new instance of the configuration with default values.
NewWithDefaults() Config
// Validate the configuration and returns an error if invalid.
Validate() error
}

View File

@@ -1,3 +0,0 @@
// Package confmap is a wrapper on top of the confmap defined here:
// https://github.com/open-telemetry/opentelemetry-collector/blob/main/otelcol/configprovider.go/
package confmap

View File

@@ -1,94 +0,0 @@
package signozenvprovider
import (
"context"
"fmt"
"os"
"regexp"
"sort"
"strings"
"go.opentelemetry.io/collector/confmap"
"go.uber.org/zap"
"gopkg.in/yaml.v3"
)
const (
schemeName string = "signozenv"
envPrefix string = "signoz"
separator string = "__"
envPrefixWithOneSeparator string = "signoz_"
envRegexString string = `^[a-zA-Z][a-zA-Z0-9_]*$`
)
var (
envRegex = regexp.MustCompile(envRegexString)
)
type provider struct {
logger *zap.Logger
}
// NewFactory returns a factory for a confmap.Provider that reads the configuration from the environment.
// All variables starting with `SIGNOZ__` are read from the environment.
// The separator is `__` (2 underscores) in order to incorporate env variables having keys with a single `_`
func NewFactory() confmap.ProviderFactory {
return confmap.NewProviderFactory(newProvider)
}
func newProvider(settings confmap.ProviderSettings) confmap.Provider {
return &provider{
logger: settings.Logger,
}
}
func (provider *provider) Retrieve(_ context.Context, uri string, _ confmap.WatcherFunc) (*confmap.Retrieved, error) {
if !strings.HasPrefix(uri, schemeName+":") {
return nil, fmt.Errorf("%q uri is not supported by %q provider", uri, schemeName)
}
// Read and Sort environment variables for consistent output
envvars := os.Environ()
sort.Strings(envvars)
// Create a map m containing key value pairs
m := make(map[string]any)
for _, envvar := range envvars {
parts := strings.SplitN(envvar, "=", 2)
if len(parts) != 2 {
continue
}
key := strings.ToLower(parts[0])
val := parts[1]
if strings.HasPrefix(key, envPrefixWithOneSeparator) {
// Remove the envPrefix from the key
key = strings.Replace(key, envPrefix+separator, "", 1)
// Check whether the resulting key matches with the regex
if !envRegex.MatchString(key) {
provider.logger.Warn("Configuration references invalid environment variable key", zap.String("key", key))
continue
}
// Convert key into yaml format
key = strings.ToLower(strings.ReplaceAll(key, separator, confmap.KeyDelimiter))
m[key] = val
}
}
out, err := yaml.Marshal(m)
if err != nil {
return nil, err
}
return confmap.NewRetrievedFromYAML(out)
}
func (*provider) Scheme() string {
return schemeName
}
func (*provider) Shutdown(context.Context) error {
return nil
}

View File

@@ -1,40 +0,0 @@
package signozenvprovider
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/confmap/confmaptest"
)
func createProvider() confmap.Provider {
return NewFactory().Create(confmaptest.NewNopProviderSettings())
}
func TestValidateProviderScheme(t *testing.T) {
assert.NoError(t, confmaptest.ValidateProviderScheme(createProvider()))
}
func TestRetrieve(t *testing.T) {
t.Setenv("SIGNOZ__STORAGE__DSN", "localhost:9000")
t.Setenv("SIGNOZ__SIGNOZ_ENABLED", "true")
t.Setenv("SIGNOZ__INSTRUMENTATION__LOGS__ENABLED", "true")
expected := confmap.NewFromStringMap(map[string]any{
"storage::dsn": "localhost:9000",
"signoz_enabled": "true",
"instrumentation::logs::enabled": "true",
})
signoz := createProvider()
retrieved, err := signoz.Retrieve(context.Background(), schemeName+":", nil)
require.NoError(t, err)
actual, err := retrieved.AsConf()
require.NoError(t, err)
assert.Equal(t, expected.ToStringMap(), actual.ToStringMap())
assert.NoError(t, signoz.Shutdown(context.Background()))
}

View File

@@ -21,7 +21,7 @@ type configFactory struct {
newConfigFunc NewConfigFunc
}
// Name returns the name of the factory.
// New creates a new config.
func (factory *configFactory) Name() Name {
return factory.name
}

View File

@@ -1,29 +0,0 @@
package factory
import (
"testing"
"github.com/stretchr/testify/assert"
)
type c1 struct{}
func (c1) Validate() error {
return nil
}
func TestNewConfigFactory(t *testing.T) {
cf := NewConfigFactory(MustNewName("c1"), func() Config {
return c1{}
})
assert.Equal(t, MustNewName("c1"), cf.Name())
assert.IsType(t, c1{}, cf.New())
}
func TestNewConfigFactoryWithPointer(t *testing.T) {
cfp := NewConfigFactory(MustNewName("c1"), func() Config {
return &c1{}
})
assert.Equal(t, MustNewName("c1"), cfp.Name())
assert.IsType(t, &c1{}, cfp.New())
}

View File

@@ -1,4 +1,4 @@
package servicetest
package factorytest
import (
"context"

View File

@@ -1,20 +0,0 @@
package factory
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestName(t *testing.T) {
assert.Equal(t, Name{name: "c1"}, MustNewName("c1"))
}
func TestNameWithInvalidCharacters(t *testing.T) {
_, err := NewName("c1%")
assert.Error(t, err)
assert.Panics(t, func() {
MustNewName("c1%")
})
}

View File

@@ -12,8 +12,6 @@ type NamedMap[T Named] struct {
factoriesInOrder []T
}
// NewNamedMap creates a new NamedMap from a list of factories.
// It returns an error if the factories have duplicate names.
func NewNamedMap[T Named](factories ...T) (NamedMap[T], error) {
fmap := make(map[Name]T)
for _, factory := range factories {
@@ -27,8 +25,6 @@ func NewNamedMap[T Named](factories ...T) (NamedMap[T], error) {
return NamedMap[T]{factories: fmap, factoriesInOrder: factories}, nil
}
// MustNewNamedMap creates a new NamedMap from a list of factories.
// It panics if the factories have duplicate names.
func MustNewNamedMap[T Named](factories ...T) NamedMap[T] {
nm, err := NewNamedMap(factories...)
if err != nil {
@@ -37,9 +33,7 @@ func MustNewNamedMap[T Named](factories ...T) NamedMap[T] {
return nm
}
// Get returns the factory for the given name by string.
// It returns an error if the factory is not found or the name is invalid.
func (n *NamedMap[T]) Get(namestr string) (t T, err error) {
func (n NamedMap[T]) Get(namestr string) (t T, err error) {
name, err := NewName(namestr)
if err != nil {
return
@@ -55,20 +49,16 @@ func (n *NamedMap[T]) Get(namestr string) (t T, err error) {
return
}
// Add adds a factory to the NamedMap.
// It returns an error if the factory already exists.
func (n *NamedMap[T]) Add(factory T) (err error) {
func (n NamedMap[T]) Add(factory T) (err error) {
name := factory.Name()
if _, ok := n.factories[name]; ok {
return fmt.Errorf("factory %q already exists", name)
}
n.factories[name] = factory
n.factoriesInOrder = append(n.factoriesInOrder, factory)
return nil
}
// GetInOrder returns the factories in the order they were added.
func (n *NamedMap[T]) GetInOrder() []T {
func (n NamedMap[T]) GetInOrder() []T {
return n.factoriesInOrder
}

View File

@@ -1,72 +0,0 @@
package factory
import (
"testing"
"github.com/stretchr/testify/assert"
)
type f1 struct{}
func (*f1) Name() Name {
return MustNewName("f1")
}
type f2 struct{}
func (*f2) Name() Name {
return MustNewName("f2")
}
func TestNewNamedMap(t *testing.T) {
nm, err := NewNamedMap[Named](&f1{}, &f2{})
assert.NoError(t, err)
assert.Equal(t, map[Name]Named{
MustNewName("f1"): &f1{},
MustNewName("f2"): &f2{},
}, nm.factories)
assert.Equal(t, []Named{&f1{}, &f2{}}, nm.GetInOrder())
}
func TestNewNamedMapWithDuplicateNames(t *testing.T) {
_, err := NewNamedMap[Named](&f1{}, &f1{})
assert.Error(t, err)
}
func TestMustNewNamedMap(t *testing.T) {
nm := MustNewNamedMap[Named](&f1{}, &f2{})
assert.Equal(t, map[Name]Named{
MustNewName("f1"): &f1{},
MustNewName("f2"): &f2{},
}, nm.factories)
assert.Equal(t, []Named{&f1{}, &f2{}}, nm.GetInOrder())
}
func TestMustNewNamedMapDuplicateNames(t *testing.T) {
assert.Panics(t, func() {
MustNewNamedMap[Named](&f1{}, &f1{})
})
}
func TestNamedMapGet(t *testing.T) {
nm := MustNewNamedMap[Named](&f1{}, &f2{})
nf1, err := nm.Get("f1")
assert.NoError(t, err)
assert.IsType(t, &f1{}, nf1)
_, err = nm.Get("f3")
assert.Error(t, err)
}
func TestNamedMapAdd(t *testing.T) {
nm := MustNewNamedMap[Named](&f1{})
err := nm.Add(&f2{})
assert.NoError(t, err)
assert.Equal(t, map[Name]Named{
MustNewName("f1"): &f1{},
MustNewName("f2"): &f2{},
}, nm.factories)
assert.Equal(t, []Named{&f1{}, &f2{}}, nm.GetInOrder())
}

View File

@@ -1,41 +0,0 @@
package factory
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
)
type p1 struct{}
type pc1 struct{}
func (pc1) Validate() error {
return nil
}
func TestNewProviderFactory(t *testing.T) {
pf := NewProviderFactory(MustNewName("p1"), func(ctx context.Context, settings ProviderSettings, config pc1) (p1, error) {
return p1{}, nil
})
assert.Equal(t, MustNewName("p1"), pf.Name())
p, err := pf.New(context.Background(), ProviderSettings{}, pc1{})
assert.NoError(t, err)
assert.IsType(t, p1{}, p)
}
func TestNewProviderFactoryFromFactory(t *testing.T) {
pf := NewProviderFactory(MustNewName("p1"), func(ctx context.Context, settings ProviderSettings, config pc1) (p1, error) {
return p1{}, nil
})
m := MustNewNamedMap(pf)
assert.Equal(t, MustNewName("p1"), pf.Name())
p, err := NewFromFactory(context.Background(), ProviderSettings{}, pc1{}, m, "p1")
assert.NoError(t, err)
assert.IsType(t, p1{}, p)
_, err = NewFromFactory(context.Background(), ProviderSettings{}, pc1{}, m, "p2")
assert.Error(t, err)
}

View File

@@ -1,12 +1,5 @@
package server
import (
"go.signoz.io/signoz/pkg/confmap"
)
// Config satisfies the confmap.Config interface
var _ confmap.Config = (*Config)(nil)
// Config holds the configuration for http.
type Config struct {
//Address specifies the TCP address for the server to listen on, in the form "host:port".
@@ -15,7 +8,7 @@ type Config struct {
Address string `mapstructure:"address"`
}
func (c *Config) NewWithDefaults() confmap.Config {
func (c *Config) NewWithDefaults() *Config {
return &Config{
Address: "0.0.0.0:8080",
}

View File

@@ -6,21 +6,21 @@ import (
"net/http"
"time"
"go.signoz.io/signoz/pkg/registry"
"go.signoz.io/signoz/pkg/factory"
"go.uber.org/zap"
)
var _ registry.NamedService = (*Server)(nil)
var _ factory.Service = (*Server)(nil)
type Server struct {
srv *http.Server
logger *zap.Logger
handler http.Handler
cfg Config
name string
name factory.Name
}
func New(logger *zap.Logger, name string, cfg Config, handler http.Handler) (*Server, error) {
func New(logger *zap.Logger, name factory.Name, cfg Config, handler http.Handler) (*Server, error) {
if handler == nil {
return nil, fmt.Errorf("cannot build http server, handler is required")
}
@@ -29,10 +29,6 @@ func New(logger *zap.Logger, name string, cfg Config, handler http.Handler) (*Se
return nil, fmt.Errorf("cannot build http server, logger is required")
}
if name == "" {
return nil, fmt.Errorf("cannot build http server, name is required")
}
srv := &http.Server{
Addr: cfg.Address,
Handler: handler,
@@ -50,7 +46,7 @@ func New(logger *zap.Logger, name string, cfg Config, handler http.Handler) (*Se
}, nil
}
func (server *Server) Name() string {
func (server *Server) Name() factory.Name {
return server.name
}

View File

@@ -1,34 +1,151 @@
package instrumentation
import (
"os"
"context"
"go.opentelemetry.io/contrib/bridges/otelzap"
contribsdkconfig "go.opentelemetry.io/contrib/config"
sdklog "go.opentelemetry.io/otel/log"
sdkmetric "go.opentelemetry.io/otel/metric"
sdkresource "go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
sdktrace "go.opentelemetry.io/otel/trace"
"go.signoz.io/signoz/pkg/factory"
"go.signoz.io/signoz/pkg/version"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
// Instrumentation provides the core components for application instrumentation.
var _ factory.Service = (*SDK)(nil)
var _ Instrumentation = (*SDK)(nil)
type Instrumentation interface {
// LoggerProvider returns the OpenTelemetry logger provider.
LoggerProvider() sdklog.LoggerProvider
// Logger returns the Zap logger.
Logger() *zap.Logger
// MeterProvider returns the OpenTelemetry meter provider.
MeterProvider() sdkmetric.MeterProvider
// TracerProvider returns the OpenTelemetry tracer provider.
TracerProvider() sdktrace.TracerProvider
// ToProviderSettings converts instrumentation to provider settings.
ToProviderSettings() factory.ProviderSettings
}
// Merges the input attributes with the resource attributes.
func mergeAttributes(input map[string]any, resource *sdkresource.Resource) map[string]any {
// SDK holds the core components for application instrumentation.
type SDK struct {
sdk contribsdkconfig.SDK
logger *zap.Logger
}
// New creates a new Instrumentation instance with configured providers.
// It sets up logging, tracing, and metrics based on the provided configuration.
func New(ctx context.Context, build version.Build, cfg Config) (*SDK, error) {
// Set default resource attributes if not provided
if cfg.Resource.Attributes == nil {
cfg.Resource.Attributes = map[string]any{
string(semconv.ServiceNameKey): build.Name,
string(semconv.ServiceVersionKey): build.Version,
}
}
// Create a new resource with default detectors.
// The upstream contrib repository is not taking detectors into account.
// We are, therefore, using some sensible defaults here.
resource, err := sdkresource.New(
ctx,
sdkresource.WithContainer(),
sdkresource.WithFromEnv(),
sdkresource.WithHost(),
)
if err != nil {
return nil, err
}
// Prepare the resource configuration by merging
// resource and attributes.
sch := semconv.SchemaURL
configResource := contribsdkconfig.Resource{
Attributes: attributes(cfg.Resource.Attributes, resource),
Detectors: nil,
SchemaUrl: &sch,
}
var loggerProvider *contribsdkconfig.LoggerProvider
if cfg.Logs.Enabled {
loggerProvider = &contribsdkconfig.LoggerProvider{
Processors: []contribsdkconfig.LogRecordProcessor{
{Batch: &cfg.Logs.Processors.Batch},
},
}
}
var tracerProvider *contribsdkconfig.TracerProvider
if cfg.Traces.Enabled {
tracerProvider = &contribsdkconfig.TracerProvider{
Processors: []contribsdkconfig.SpanProcessor{
{Batch: &cfg.Traces.Processors.Batch},
},
Sampler: &cfg.Traces.Sampler,
}
}
var meterProvider *contribsdkconfig.MeterProvider
if cfg.Metrics.Enabled {
meterProvider = &contribsdkconfig.MeterProvider{
Readers: []contribsdkconfig.MetricReader{
{Pull: &cfg.Metrics.Readers.Pull},
},
}
}
sdk, err := contribsdkconfig.NewSDK(
contribsdkconfig.WithContext(ctx),
contribsdkconfig.WithOpenTelemetryConfiguration(contribsdkconfig.OpenTelemetryConfiguration{
LoggerProvider: loggerProvider,
TracerProvider: tracerProvider,
MeterProvider: meterProvider,
Resource: &configResource,
}),
)
if err != nil {
return nil, err
}
return &SDK{
sdk: sdk,
logger: newLogger(cfg, sdk.LoggerProvider()),
}, nil
}
func (i *SDK) Start(ctx context.Context) error {
return nil
}
func (i *SDK) Stop(ctx context.Context) error {
return i.sdk.Shutdown(ctx)
}
func (i *SDK) LoggerProvider() sdklog.LoggerProvider {
return i.sdk.LoggerProvider()
}
func (i *SDK) Logger() *zap.Logger {
return i.logger
}
func (i *SDK) MeterProvider() sdkmetric.MeterProvider {
return i.sdk.MeterProvider()
}
func (i *SDK) TracerProvider() sdktrace.TracerProvider {
return i.sdk.TracerProvider()
}
func (i *SDK) ToProviderSettings() factory.ProviderSettings {
return factory.ProviderSettings{
LoggerProvider: i.LoggerProvider(),
ZapLogger: i.Logger(),
MeterProvider: i.MeterProvider(),
TracerProvider: i.TracerProvider(),
}
}
// attributes merges the input attributes with the resource attributes.
func attributes(input map[string]any, resource *sdkresource.Resource) map[string]any {
output := make(map[string]any)
for k, v := range input {
@@ -42,14 +159,3 @@ func mergeAttributes(input map[string]any, resource *sdkresource.Resource) map[s
return output
}
// newLogger creates a new Zap logger with the configured level and output.
// It combines a JSON encoder for stdout and an OpenTelemetry bridge.
func newLogger(cfg Config, provider sdklog.LoggerProvider) *zap.Logger {
core := zapcore.NewTee(
zapcore.NewCore(zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig()), zapcore.AddSync(os.Stdout), cfg.Logs.Level),
otelzap.NewCore("go.signoz.io/pkg/instrumentation", otelzap.WithLoggerProvider(provider)),
)
return zap.New(core, zap.AddCaller(), zap.AddStacktrace(zap.ErrorLevel))
}

View File

@@ -0,0 +1,21 @@
package instrumentation
import (
"os"
"go.opentelemetry.io/contrib/bridges/otelzap"
sdklog "go.opentelemetry.io/otel/log"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
// newLogger creates a new Zap logger with the configured level and output.
// It combines a JSON encoder for stdout and an OpenTelemetry bridge.
func newLogger(cfg Config, provider sdklog.LoggerProvider) *zap.Logger {
core := zapcore.NewTee(
zapcore.NewCore(zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig()), zapcore.AddSync(os.Stdout), cfg.Logs.Level),
otelzap.NewCore("go.signoz.io/pkg/instrumentation", otelzap.WithLoggerProvider(provider)),
)
return zap.New(core, zap.AddCaller(), zap.AddStacktrace(zap.ErrorLevel))
}

View File

@@ -1,137 +0,0 @@
package instrumentation
import (
"context"
contribsdkconfig "go.opentelemetry.io/contrib/config"
sdklog "go.opentelemetry.io/otel/log"
sdkmetric "go.opentelemetry.io/otel/metric"
sdkresource "go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
sdktrace "go.opentelemetry.io/otel/trace"
"go.signoz.io/signoz/pkg/factory"
"go.signoz.io/signoz/pkg/version"
"go.uber.org/zap"
)
var _ factory.Service = (*SDK)(nil)
var _ Instrumentation = (*SDK)(nil)
// SDK holds the core components for application instrumentation.
type SDK struct {
sdk contribsdkconfig.SDK
logger *zap.Logger
}
// New creates a new Instrumentation instance with configured providers.
// It sets up logging, tracing, and metrics based on the provided configuration.
func New(ctx context.Context, build version.Build, cfg Config) (*SDK, error) {
// Set default resource attributes if not provided
if cfg.Resource.Attributes == nil {
cfg.Resource.Attributes = map[string]any{
string(semconv.ServiceNameKey): build.Name,
string(semconv.ServiceVersionKey): build.Version,
}
}
// Create a new resource with default detectors.
// The upstream contrib repository is not taking detectors into account.
// We are, therefore, using some sensible defaults here.
resource, err := sdkresource.New(
ctx,
sdkresource.WithContainer(),
sdkresource.WithFromEnv(),
sdkresource.WithHost(),
)
if err != nil {
return nil, err
}
// Prepare the resource configuration by merging
// resource and attributes.
sch := semconv.SchemaURL
configResource := contribsdkconfig.Resource{
Attributes: mergeAttributes(cfg.Resource.Attributes, resource),
Detectors: nil,
SchemaUrl: &sch,
}
var loggerProvider *contribsdkconfig.LoggerProvider
if cfg.Logs.Enabled {
loggerProvider = &contribsdkconfig.LoggerProvider{
Processors: []contribsdkconfig.LogRecordProcessor{
{Batch: &cfg.Logs.Processors.Batch},
},
}
}
var tracerProvider *contribsdkconfig.TracerProvider
if cfg.Traces.Enabled {
tracerProvider = &contribsdkconfig.TracerProvider{
Processors: []contribsdkconfig.SpanProcessor{
{Batch: &cfg.Traces.Processors.Batch},
},
Sampler: &cfg.Traces.Sampler,
}
}
var meterProvider *contribsdkconfig.MeterProvider
if cfg.Metrics.Enabled {
meterProvider = &contribsdkconfig.MeterProvider{
Readers: []contribsdkconfig.MetricReader{
{Pull: &cfg.Metrics.Readers.Pull},
},
}
}
sdk, err := contribsdkconfig.NewSDK(
contribsdkconfig.WithContext(ctx),
contribsdkconfig.WithOpenTelemetryConfiguration(contribsdkconfig.OpenTelemetryConfiguration{
LoggerProvider: loggerProvider,
TracerProvider: tracerProvider,
MeterProvider: meterProvider,
Resource: &configResource,
}),
)
if err != nil {
return nil, err
}
return &SDK{
sdk: sdk,
logger: newLogger(cfg, sdk.LoggerProvider()),
}, nil
}
func (i *SDK) Start(ctx context.Context) error {
return nil
}
func (i *SDK) Stop(ctx context.Context) error {
return i.sdk.Shutdown(ctx)
}
func (i *SDK) LoggerProvider() sdklog.LoggerProvider {
return i.sdk.LoggerProvider()
}
func (i *SDK) Logger() *zap.Logger {
return i.logger
}
func (i *SDK) MeterProvider() sdkmetric.MeterProvider {
return i.sdk.MeterProvider()
}
func (i *SDK) TracerProvider() sdktrace.TracerProvider {
return i.sdk.TracerProvider()
}
func (i *SDK) ToProviderSettings() factory.ProviderSettings {
return factory.ProviderSettings{
LoggerProvider: i.LoggerProvider(),
ZapLogger: i.Logger(),
MeterProvider: i.MeterProvider(),
TracerProvider: i.TracerProvider(),
}
}

View File

@@ -8,7 +8,6 @@ import (
"github.com/google/uuid"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
"go.signoz.io/signoz/pkg/query-service/agentConf/sqlite"
"go.signoz.io/signoz/pkg/query-service/model"
"go.uber.org/zap"
"golang.org/x/exp/slices"
@@ -19,15 +18,6 @@ type Repo struct {
db *sqlx.DB
}
func (r *Repo) initDB(engine string) error {
switch engine {
case "sqlite3", "sqlite":
return sqlite.InitDB(r.db)
default:
return fmt.Errorf("unsupported db")
}
}
func (r *Repo) GetConfigHistory(
ctx context.Context, typ ElementTypeDef, limit int,
) ([]ConfigVersion, *model.ApiError) {

View File

@@ -39,8 +39,7 @@ type Manager struct {
}
type ManagerOptions struct {
DB *sqlx.DB
DBEngine string
DB *sqlx.DB
// When acting as opamp.AgentConfigProvider, agent conf recommendations are
// applied to the base conf in the order the features have been specified here.
@@ -66,10 +65,6 @@ func Initiate(options *ManagerOptions) (*Manager, error) {
configSubscribers: map[string]func(){},
}
err := m.initDB(options.DBEngine)
if err != nil {
return nil, errors.Wrap(err, "could not init agentConf db")
}
return m, nil
}

View File

@@ -1,65 +0,0 @@
package sqlite
import (
"fmt"
"github.com/pkg/errors"
"github.com/jmoiron/sqlx"
)
func InitDB(db *sqlx.DB) error {
var err error
if db == nil {
return fmt.Errorf("invalid db connection")
}
table_schema := `CREATE TABLE IF NOT EXISTS agent_config_versions(
id TEXT PRIMARY KEY,
created_by TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_by TEXT,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
version INTEGER DEFAULT 1,
active int,
is_valid int,
disabled int,
element_type VARCHAR(120) NOT NULL,
deploy_status VARCHAR(80) NOT NULL DEFAULT 'DIRTY',
deploy_sequence INTEGER,
deploy_result TEXT,
last_hash TEXT,
last_config TEXT,
UNIQUE(element_type, version)
);
CREATE UNIQUE INDEX IF NOT EXISTS agent_config_versions_u1
ON agent_config_versions(element_type, version);
CREATE INDEX IF NOT EXISTS agent_config_versions_nu1
ON agent_config_versions(last_hash);
CREATE TABLE IF NOT EXISTS agent_config_elements(
id TEXT PRIMARY KEY,
created_by TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_by TEXT,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
element_id TEXT NOT NULL,
element_type VARCHAR(120) NOT NULL,
version_id TEXT NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS agent_config_elements_u1
ON agent_config_elements(version_id, element_id, element_type);
`
_, err = db.Exec(table_schema)
if err != nil {
return errors.Wrap(err, "Error in creating agent config tables")
}
return nil
}

View File

@@ -1,217 +0,0 @@
package cloudintegrations
import (
"bytes"
"embed"
"encoding/json"
"fmt"
"io/fs"
"path"
"sort"
koanfJson "github.com/knadh/koanf/parsers/json"
"go.signoz.io/signoz/pkg/query-service/app/integrations"
"go.signoz.io/signoz/pkg/query-service/model"
"golang.org/x/exp/maps"
)
func listCloudProviderServices(
cloudProvider string,
) ([]CloudServiceDetails, *model.ApiError) {
cloudServices := availableServices[cloudProvider]
if cloudServices == nil {
return nil, model.NotFoundError(fmt.Errorf(
"unsupported cloud provider: %s", cloudProvider,
))
}
services := maps.Values(cloudServices)
sort.Slice(services, func(i, j int) bool {
return services[i].Id < services[j].Id
})
return services, nil
}
func getCloudProviderService(
cloudProvider string, serviceId string,
) (*CloudServiceDetails, *model.ApiError) {
cloudServices := availableServices[cloudProvider]
if cloudServices == nil {
return nil, model.NotFoundError(fmt.Errorf(
"unsupported cloud provider: %s", cloudProvider,
))
}
svc, exists := cloudServices[serviceId]
if !exists {
return nil, model.NotFoundError(fmt.Errorf(
"%s service not found: %s", cloudProvider, serviceId,
))
}
return &svc, nil
}
// End of API. Logic for reading service definition files follows
// Service details read from ./serviceDefinitions
// { "providerName": { "service_id": {...}} }
var availableServices map[string]map[string]CloudServiceDetails
func init() {
err := readAllServiceDefinitions()
if err != nil {
panic(fmt.Errorf(
"couldn't read cloud service definitions: %w", err,
))
}
}
//go:embed serviceDefinitions/*
var serviceDefinitionFiles embed.FS
func readAllServiceDefinitions() error {
availableServices = map[string]map[string]CloudServiceDetails{}
rootDirName := "serviceDefinitions"
cloudProviderDirs, err := fs.ReadDir(serviceDefinitionFiles, rootDirName)
if err != nil {
return fmt.Errorf("couldn't read dirs in %s: %w", rootDirName, err)
}
for _, d := range cloudProviderDirs {
if !d.IsDir() {
continue
}
cloudProviderDirPath := path.Join(rootDirName, d.Name())
cloudServices, err := readServiceDefinitionsFromDir(cloudProviderDirPath)
if err != nil {
return fmt.Errorf("couldn't read %s service definitions", d.Name())
}
if len(cloudServices) < 1 {
return fmt.Errorf("no %s services could be read", d.Name())
}
availableServices[d.Name()] = cloudServices
}
return nil
}
func readServiceDefinitionsFromDir(cloudProviderDirPath string) (
map[string]CloudServiceDetails, error,
) {
svcDefDirs, err := fs.ReadDir(serviceDefinitionFiles, cloudProviderDirPath)
if err != nil {
return nil, fmt.Errorf("couldn't list integrations dirs: %w", err)
}
svcDefs := map[string]CloudServiceDetails{}
for _, d := range svcDefDirs {
if !d.IsDir() {
continue
}
svcDirPath := path.Join(cloudProviderDirPath, d.Name())
s, err := readServiceDefinition(svcDirPath)
if err != nil {
return nil, fmt.Errorf("couldn't read svc definition for %s: %w", d.Name(), err)
}
_, exists := svcDefs[s.Id]
if exists {
return nil, fmt.Errorf(
"duplicate service definition for id %s at %s", s.Id, d.Name(),
)
}
svcDefs[s.Id] = *s
}
return svcDefs, nil
}
func readServiceDefinition(dirpath string) (*CloudServiceDetails, error) {
integrationJsonPath := path.Join(dirpath, "integration.json")
serializedSpec, err := serviceDefinitionFiles.ReadFile(integrationJsonPath)
if err != nil {
return nil, fmt.Errorf(
"couldn't find integration.json in %s: %w",
dirpath, err,
)
}
integrationSpec, err := koanfJson.Parser().Unmarshal(serializedSpec)
if err != nil {
return nil, fmt.Errorf(
"couldn't parse integration.json from %s: %w",
integrationJsonPath, err,
)
}
hydrated, err := integrations.HydrateFileUris(
integrationSpec, serviceDefinitionFiles, dirpath,
)
if err != nil {
return nil, fmt.Errorf(
"couldn't hydrate files referenced in service definition %s: %w",
integrationJsonPath, err,
)
}
hydratedSpec := hydrated.(map[string]interface{})
hydratedSpecJson, err := koanfJson.Parser().Marshal(hydratedSpec)
if err != nil {
return nil, fmt.Errorf(
"couldn't serialize hydrated integration spec back to JSON %s: %w",
integrationJsonPath, err,
)
}
var serviceDef CloudServiceDetails
decoder := json.NewDecoder(bytes.NewReader(hydratedSpecJson))
decoder.DisallowUnknownFields()
err = decoder.Decode(&serviceDef)
if err != nil {
return nil, fmt.Errorf(
"couldn't parse hydrated JSON spec read from %s: %w",
integrationJsonPath, err,
)
}
err = validateServiceDefinition(serviceDef)
if err != nil {
return nil, fmt.Errorf("invalid service definition %s: %w", serviceDef.Id, err)
}
return &serviceDef, nil
}
func validateServiceDefinition(s CloudServiceDetails) error {
// Validate dashboard data
seenDashboardIds := map[string]interface{}{}
for _, dd := range s.Assets.Dashboards {
did, exists := dd["id"]
if !exists {
return fmt.Errorf("id is required. not specified in dashboard titled %v", dd["title"])
}
dashboardId, ok := did.(string)
if !ok {
return fmt.Errorf("id must be string in dashboard titled %v", dd["title"])
}
if _, seen := seenDashboardIds[dashboardId]; seen {
return fmt.Errorf("multiple dashboards found with id %s", dashboardId)
}
seenDashboardIds[dashboardId] = nil
}
// potentially more to follow
return nil
}

View File

@@ -1,34 +0,0 @@
package cloudintegrations
import (
"testing"
"github.com/stretchr/testify/require"
"go.signoz.io/signoz/pkg/query-service/model"
)
func TestAvailableServices(t *testing.T) {
require := require.New(t)
// should be able to list available services.
_, apiErr := listCloudProviderServices("bad-cloud-provider")
require.NotNil(apiErr)
require.Equal(model.ErrorNotFound, apiErr.Type())
awsSvcs, apiErr := listCloudProviderServices("aws")
require.Nil(apiErr)
require.Greater(len(awsSvcs), 0)
// should be able to get details of a service
_, apiErr = getCloudProviderService(
"aws", "bad-service-id",
)
require.NotNil(apiErr)
require.Equal(model.ErrorNotFound, apiErr.Type())
svc, apiErr := getCloudProviderService(
"aws", awsSvcs[0].Id,
)
require.Nil(apiErr)
require.Equal(*svc, awsSvcs[0])
}

View File

@@ -22,26 +22,19 @@ func validateCloudProviderName(name string) *model.ApiError {
}
type Controller struct {
accountsRepo cloudProviderAccountsRepository
serviceConfigRepo serviceConfigRepository
repo cloudProviderAccountsRepository
}
func NewController(db *sqlx.DB) (
*Controller, error,
) {
accountsRepo, err := newCloudProviderAccountsRepository(db)
repo, err := newCloudProviderAccountsRepository(db)
if err != nil {
return nil, fmt.Errorf("couldn't create cloud provider accounts repo: %w", err)
}
serviceConfigRepo, err := newServiceConfigRepository(db)
if err != nil {
return nil, fmt.Errorf("couldn't create cloud provider service config repo: %w", err)
}
return &Controller{
accountsRepo: accountsRepo,
serviceConfigRepo: serviceConfigRepo,
repo: repo,
}, nil
}
@@ -65,7 +58,7 @@ func (c *Controller) ListConnectedAccounts(
return nil, apiErr
}
accountRecords, apiErr := c.accountsRepo.listConnected(ctx, cloudProvider)
accountRecords, apiErr := c.repo.listConnected(ctx, cloudProvider)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "couldn't list cloud accounts")
}
@@ -107,7 +100,7 @@ func (c *Controller) GenerateConnectionUrl(
return nil, model.BadRequest(fmt.Errorf("unsupported cloud provider: %s", cloudProvider))
}
account, apiErr := c.accountsRepo.upsert(
account, apiErr := c.repo.upsert(
ctx, cloudProvider, req.AccountId, &req.AccountConfig, nil, nil, nil,
)
if apiErr != nil {
@@ -127,9 +120,8 @@ func (c *Controller) GenerateConnectionUrl(
}
type AccountStatusResponse struct {
Id string `json:"id"`
CloudAccountId *string `json:"cloud_account_id,omitempty"`
Status AccountStatus `json:"status"`
Id string `json:"id"`
Status AccountStatus `json:"status"`
}
func (c *Controller) GetAccountStatus(
@@ -141,15 +133,14 @@ func (c *Controller) GetAccountStatus(
return nil, apiErr
}
account, apiErr := c.accountsRepo.get(ctx, cloudProvider, accountId)
account, apiErr := c.repo.get(ctx, cloudProvider, accountId)
if apiErr != nil {
return nil, apiErr
}
resp := AccountStatusResponse{
Id: account.Id,
CloudAccountId: account.CloudAccountId,
Status: account.status(),
Id: account.Id,
Status: account.status(),
}
return &resp, nil
@@ -173,7 +164,7 @@ func (c *Controller) CheckInAsAgent(
return nil, apiErr
}
existingAccount, apiErr := c.accountsRepo.get(ctx, cloudProvider, req.AccountId)
existingAccount, apiErr := c.repo.get(ctx, cloudProvider, req.AccountId)
if existingAccount != nil && existingAccount.CloudAccountId != nil && *existingAccount.CloudAccountId != req.CloudAccountId {
return nil, model.BadRequest(fmt.Errorf(
"can't check in with new %s account id %s for account %s with existing %s id %s",
@@ -181,7 +172,7 @@ func (c *Controller) CheckInAsAgent(
))
}
existingAccount, apiErr = c.accountsRepo.getConnectedCloudAccount(ctx, cloudProvider, req.CloudAccountId)
existingAccount, apiErr = c.repo.getConnectedCloudAccount(ctx, cloudProvider, req.CloudAccountId)
if existingAccount != nil && existingAccount.Id != req.AccountId {
return nil, model.BadRequest(fmt.Errorf(
"can't check in to %s account %s with id %s. already connected with id %s",
@@ -194,7 +185,7 @@ func (c *Controller) CheckInAsAgent(
Data: req.Data,
}
account, apiErr := c.accountsRepo.upsert(
account, apiErr := c.repo.upsert(
ctx, cloudProvider, &req.AccountId, nil, &req.CloudAccountId, &agentReport, nil,
)
if apiErr != nil {
@@ -220,7 +211,7 @@ func (c *Controller) UpdateAccountConfig(
return nil, apiErr
}
accountRecord, apiErr := c.accountsRepo.upsert(
accountRecord, apiErr := c.repo.upsert(
ctx, cloudProvider, &accountId, &req.Config, nil, nil, nil,
)
if apiErr != nil {
@@ -239,13 +230,13 @@ func (c *Controller) DisconnectAccount(
return nil, apiErr
}
account, apiErr := c.accountsRepo.get(ctx, cloudProvider, accountId)
account, apiErr := c.repo.get(ctx, cloudProvider, accountId)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "couldn't disconnect account")
}
tsNow := time.Now()
account, apiErr = c.accountsRepo.upsert(
account, apiErr = c.repo.upsert(
ctx, cloudProvider, &accountId, nil, nil, nil, &tsNow,
)
if apiErr != nil {
@@ -254,127 +245,3 @@ func (c *Controller) DisconnectAccount(
return account, nil
}
type ListServicesResponse struct {
Services []CloudServiceSummary `json:"services"`
}
func (c *Controller) ListServices(
ctx context.Context,
cloudProvider string,
cloudAccountId *string,
) (*ListServicesResponse, *model.ApiError) {
if apiErr := validateCloudProviderName(cloudProvider); apiErr != nil {
return nil, apiErr
}
services, apiErr := listCloudProviderServices(cloudProvider)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "couldn't list cloud services")
}
svcConfigs := map[string]*CloudServiceConfig{}
if cloudAccountId != nil {
svcConfigs, apiErr = c.serviceConfigRepo.getAllForAccount(
ctx, cloudProvider, *cloudAccountId,
)
if apiErr != nil {
return nil, model.WrapApiError(
apiErr, "couldn't get service configs for cloud account",
)
}
}
summaries := []CloudServiceSummary{}
for _, s := range services {
summary := s.CloudServiceSummary
summary.Config = svcConfigs[summary.Id]
summaries = append(summaries, summary)
}
return &ListServicesResponse{
Services: summaries,
}, nil
}
func (c *Controller) GetServiceDetails(
ctx context.Context,
cloudProvider string,
serviceId string,
cloudAccountId *string,
) (*CloudServiceDetails, *model.ApiError) {
if apiErr := validateCloudProviderName(cloudProvider); apiErr != nil {
return nil, apiErr
}
service, apiErr := getCloudProviderService(cloudProvider, serviceId)
if apiErr != nil {
return nil, apiErr
}
if cloudAccountId != nil {
config, apiErr := c.serviceConfigRepo.get(
ctx, cloudProvider, *cloudAccountId, serviceId,
)
if apiErr != nil && apiErr.Type() != model.ErrorNotFound {
return nil, model.WrapApiError(apiErr, "couldn't fetch service config")
}
if config != nil {
service.Config = config
}
}
return service, nil
}
type UpdateServiceConfigRequest struct {
CloudAccountId string `json:"cloud_account_id"`
Config CloudServiceConfig `json:"config"`
}
type UpdateServiceConfigResponse struct {
Id string `json:"id"`
Config CloudServiceConfig `json:"config"`
}
func (c *Controller) UpdateServiceConfig(
ctx context.Context,
cloudProvider string,
serviceId string,
req UpdateServiceConfigRequest,
) (*UpdateServiceConfigResponse, *model.ApiError) {
if apiErr := validateCloudProviderName(cloudProvider); apiErr != nil {
return nil, apiErr
}
// can only update config for a connected cloud account id
_, apiErr := c.accountsRepo.getConnectedCloudAccount(
ctx, cloudProvider, req.CloudAccountId,
)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "couldn't find connected cloud account")
}
// can only update config for a valid service.
_, apiErr = getCloudProviderService(cloudProvider, serviceId)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "unsupported service")
}
updatedConfig, apiErr := c.serviceConfigRepo.upsert(
ctx, cloudProvider, req.CloudAccountId, serviceId, req.Config,
)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "couldn't update service config")
}
return &UpdateServiceConfigResponse{
Id: serviceId,
Config: *updatedConfig,
}, nil
}

View File

@@ -12,7 +12,7 @@ import (
func TestRegenerateConnectionUrlWithUpdatedConfig(t *testing.T) {
require := require.New(t)
testDB, _ := utils.NewTestSqliteDB(t)
testDB := utils.NewQueryServiceDBForTests(t)
controller, err := NewController(testDB)
require.NoError(err)
@@ -30,7 +30,7 @@ func TestRegenerateConnectionUrlWithUpdatedConfig(t *testing.T) {
require.NotEmpty(resp1.AccountId)
testAccountId := resp1.AccountId
account, apiErr := controller.accountsRepo.get(
account, apiErr := controller.repo.get(
context.TODO(), "aws", testAccountId,
)
require.Nil(apiErr)
@@ -47,7 +47,7 @@ func TestRegenerateConnectionUrlWithUpdatedConfig(t *testing.T) {
require.Nil(apiErr)
require.Equal(testAccountId, resp2.AccountId)
account, apiErr = controller.accountsRepo.get(
account, apiErr = controller.repo.get(
context.TODO(), "aws", testAccountId,
)
require.Nil(apiErr)
@@ -56,7 +56,7 @@ func TestRegenerateConnectionUrlWithUpdatedConfig(t *testing.T) {
func TestAgentCheckIns(t *testing.T) {
require := require.New(t)
testDB, _ := utils.NewTestSqliteDB(t)
testDB := utils.NewQueryServiceDBForTests(t)
controller, err := NewController(testDB)
require.NoError(err)
@@ -89,7 +89,7 @@ func TestAgentCheckIns(t *testing.T) {
// if another connected AccountRecord exists for same cloud account
// i.e. there can't be 2 connected account records for the same cloud account id
// at any point in time.
existingConnected, apiErr := controller.accountsRepo.getConnectedCloudAccount(
existingConnected, apiErr := controller.repo.getConnectedCloudAccount(
context.TODO(), "aws", testCloudAccountId1,
)
require.Nil(apiErr)
@@ -112,7 +112,7 @@ func TestAgentCheckIns(t *testing.T) {
context.TODO(), "aws", testAccountId1,
)
existingConnected, apiErr = controller.accountsRepo.getConnectedCloudAccount(
existingConnected, apiErr = controller.repo.getConnectedCloudAccount(
context.TODO(), "aws", testCloudAccountId1,
)
require.Nil(existingConnected)
@@ -139,7 +139,7 @@ func TestAgentCheckIns(t *testing.T) {
func TestCantDisconnectNonExistentAccount(t *testing.T) {
require := require.New(t)
testDB, _ := utils.NewTestSqliteDB(t)
testDB := utils.NewQueryServiceDBForTests(t)
controller, err := NewController(testDB)
require.NoError(err)
@@ -151,120 +151,3 @@ func TestCantDisconnectNonExistentAccount(t *testing.T) {
require.Equal(model.ErrorNotFound, apiErr.Type())
require.Nil(account)
}
func TestConfigureService(t *testing.T) {
require := require.New(t)
testDB, _ := utils.NewTestSqliteDB(t)
controller, err := NewController(testDB)
require.NoError(err)
testCloudAccountId := "546311234"
// should start out without any service config
svcListResp, apiErr := controller.ListServices(
context.TODO(), "aws", &testCloudAccountId,
)
require.Nil(apiErr)
testSvcId := svcListResp.Services[0].Id
require.Nil(svcListResp.Services[0].Config)
svcDetails, apiErr := controller.GetServiceDetails(
context.TODO(), "aws", testSvcId, &testCloudAccountId,
)
require.Nil(apiErr)
require.Equal(testSvcId, svcDetails.Id)
require.Nil(svcDetails.Config)
// should be able to configure a service for a connected account
testConnectedAccount := makeTestConnectedAccount(t, controller, testCloudAccountId)
require.Nil(testConnectedAccount.RemovedAt)
require.NotNil(testConnectedAccount.CloudAccountId)
require.Equal(testCloudAccountId, *testConnectedAccount.CloudAccountId)
testSvcConfig := CloudServiceConfig{
Metrics: &CloudServiceMetricsConfig{
Enabled: true,
},
}
updateSvcConfigResp, apiErr := controller.UpdateServiceConfig(
context.TODO(), "aws", testSvcId, UpdateServiceConfigRequest{
CloudAccountId: testCloudAccountId,
Config: testSvcConfig,
},
)
require.Nil(apiErr)
require.Equal(testSvcId, updateSvcConfigResp.Id)
require.Equal(testSvcConfig, updateSvcConfigResp.Config)
svcDetails, apiErr = controller.GetServiceDetails(
context.TODO(), "aws", testSvcId, &testCloudAccountId,
)
require.Nil(apiErr)
require.Equal(testSvcId, svcDetails.Id)
require.Equal(testSvcConfig, *svcDetails.Config)
svcListResp, apiErr = controller.ListServices(
context.TODO(), "aws", &testCloudAccountId,
)
require.Nil(apiErr)
for _, svc := range svcListResp.Services {
if svc.Id == testSvcId {
require.Equal(testSvcConfig, *svc.Config)
}
}
// should not be able to configure service after cloud account has been disconnected
_, apiErr = controller.DisconnectAccount(
context.TODO(), "aws", testConnectedAccount.Id,
)
require.Nil(apiErr)
_, apiErr = controller.UpdateServiceConfig(
context.TODO(), "aws", testSvcId,
UpdateServiceConfigRequest{
CloudAccountId: testCloudAccountId,
Config: testSvcConfig,
},
)
require.NotNil(apiErr)
// should not be able to configure a service for a cloud account id that is not connected yet
_, apiErr = controller.UpdateServiceConfig(
context.TODO(), "aws", testSvcId,
UpdateServiceConfigRequest{
CloudAccountId: "9999999999",
Config: testSvcConfig,
},
)
require.NotNil(apiErr)
// should not be able to set config for an unsupported service
_, apiErr = controller.UpdateServiceConfig(
context.TODO(), "aws", "bad-service", UpdateServiceConfigRequest{
CloudAccountId: testCloudAccountId,
Config: testSvcConfig,
},
)
require.NotNil(apiErr)
}
func makeTestConnectedAccount(t *testing.T, controller *Controller, cloudAccountId string) *AccountRecord {
require := require.New(t)
// a check in from SigNoz agent creates or updates a connected account.
testAccountId := uuid.NewString()
resp, apiErr := controller.CheckInAsAgent(
context.TODO(), "aws", AgentCheckInRequest{
AccountId: testAccountId,
CloudAccountId: cloudAccountId,
},
)
require.Nil(apiErr)
require.Equal(testAccountId, resp.Account.Id)
require.Equal(cloudAccountId, *resp.Account.CloudAccountId)
return &resp.Account
}

View File

@@ -5,8 +5,6 @@ import (
"encoding/json"
"fmt"
"time"
"go.signoz.io/signoz/pkg/query-service/app/dashboards"
)
// Represents a cloud provider account for cloud integrations
@@ -117,102 +115,3 @@ func (a *AccountRecord) account() Account {
return ca
}
type CloudServiceSummary struct {
Id string `json:"id"`
Title string `json:"title"`
Icon string `json:"icon"`
// Present only if the service has been configured in the
// context of a cloud provider account.
Config *CloudServiceConfig `json:"config,omitempty"`
}
type CloudServiceDetails struct {
CloudServiceSummary
Overview string `json:"overview"` // markdown
Assets CloudServiceAssets `json:"assets"`
SupportedSignals SupportedSignals `json:"supported_signals"`
DataCollected DataCollectedForService `json:"data_collected"`
ConnectionStatus *CloudServiceConnectionStatus `json:"status,omitempty"`
}
type CloudServiceConfig struct {
Logs *CloudServiceLogsConfig `json:"logs,omitempty"`
Metrics *CloudServiceMetricsConfig `json:"metrics,omitempty"`
}
// For serializing from db
func (c *CloudServiceConfig) Scan(src any) error {
data, ok := src.([]byte)
if !ok {
return fmt.Errorf("tried to scan from %T instead of bytes", src)
}
return json.Unmarshal(data, &c)
}
// For serializing to db
func (c *CloudServiceConfig) Value() (driver.Value, error) {
if c == nil {
return nil, nil
}
serialized, err := json.Marshal(c)
if err != nil {
return nil, fmt.Errorf(
"couldn't serialize cloud service config to JSON: %w", err,
)
}
return serialized, nil
}
type CloudServiceLogsConfig struct {
Enabled bool `json:"enabled"`
}
type CloudServiceMetricsConfig struct {
Enabled bool `json:"enabled"`
}
type CloudServiceAssets struct {
Dashboards []dashboards.Data `json:"dashboards"`
}
type SupportedSignals struct {
Logs bool `json:"logs"`
Metrics bool `json:"metrics"`
}
type DataCollectedForService struct {
Logs []CollectedLogAttribute `json:"logs"`
Metrics []CollectedMetric `json:"metrics"`
}
type CollectedLogAttribute struct {
Name string `json:"name"`
Path string `json:"path"`
Type string `json:"type"`
}
type CollectedMetric struct {
Name string `json:"name"`
Type string `json:"type"`
Unit string `json:"unit"`
Description string `json:"description"`
}
type CloudServiceConnectionStatus struct {
Logs *SignalConnectionStatus `json:"logs"`
Metrics *SignalConnectionStatus `json:"metrics"`
}
type SignalConnectionStatus struct {
LastReceivedTsMillis int64 `json:"last_received_ts_ms"` // epoch milliseconds
LastReceivedFrom string `json:"last_received_from"` // resource identifier
}

View File

@@ -37,42 +37,11 @@ type cloudProviderAccountsRepository interface {
func newCloudProviderAccountsRepository(db *sqlx.DB) (
*cloudProviderAccountsSQLRepository, error,
) {
if err := initAccountsSqliteDBIfNeeded(db); err != nil {
return nil, fmt.Errorf("could not init sqlite DB for cloudintegrations accounts: %w", err)
}
return &cloudProviderAccountsSQLRepository{
db: db,
}, nil
}
func initAccountsSqliteDBIfNeeded(db *sqlx.DB) error {
if db == nil {
return fmt.Errorf("db is required")
}
createTablesStatements := `
CREATE TABLE IF NOT EXISTS cloud_integrations_accounts(
cloud_provider TEXT NOT NULL,
id TEXT NOT NULL,
config_json TEXT,
cloud_account_id TEXT,
last_agent_report_json TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
removed_at TIMESTAMP,
UNIQUE(cloud_provider, id)
)
`
_, err := db.Exec(createTablesStatements)
if err != nil {
return fmt.Errorf(
"could not ensure cloud provider accounts schema in sqlite DB: %w", err,
)
}
return nil
}
type cloudProviderAccountsSQLRepository struct {
db *sqlx.DB
}

View File

@@ -1,198 +0,0 @@
package cloudintegrations
import (
"context"
"database/sql"
"fmt"
"github.com/jmoiron/sqlx"
"go.signoz.io/signoz/pkg/query-service/model"
)
type serviceConfigRepository interface {
get(
ctx context.Context,
cloudProvider string,
cloudAccountId string,
serviceId string,
) (*CloudServiceConfig, *model.ApiError)
upsert(
ctx context.Context,
cloudProvider string,
cloudAccountId string,
serviceId string,
config CloudServiceConfig,
) (*CloudServiceConfig, *model.ApiError)
getAllForAccount(
ctx context.Context,
cloudProvider string,
cloudAccountId string,
) (
configsBySvcId map[string]*CloudServiceConfig,
apiErr *model.ApiError,
)
}
func newServiceConfigRepository(db *sqlx.DB) (
*serviceConfigSQLRepository, error,
) {
if err := initServiceConfigSqliteDBIfNeeded(db); err != nil {
return nil, fmt.Errorf(
"could not init sqlite DB for cloudintegrations service configs: %w", err,
)
}
return &serviceConfigSQLRepository{
db: db,
}, nil
}
func initServiceConfigSqliteDBIfNeeded(db *sqlx.DB) error {
if db == nil {
return fmt.Errorf("db is required")
}
createTableStatement := `
CREATE TABLE IF NOT EXISTS cloud_integrations_service_configs(
cloud_provider TEXT NOT NULL,
cloud_account_id TEXT NOT NULL,
service_id TEXT NOT NULL,
config_json TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
UNIQUE(cloud_provider, cloud_account_id, service_id)
)
`
_, err := db.Exec(createTableStatement)
if err != nil {
return fmt.Errorf(
"could not ensure cloud provider service configs schema in sqlite DB: %w", err,
)
}
return nil
}
type serviceConfigSQLRepository struct {
db *sqlx.DB
}
func (r *serviceConfigSQLRepository) get(
ctx context.Context,
cloudProvider string,
cloudAccountId string,
serviceId string,
) (*CloudServiceConfig, *model.ApiError) {
var result CloudServiceConfig
err := r.db.GetContext(
ctx, &result, `
select
config_json
from cloud_integrations_service_configs
where
cloud_provider=$1
and cloud_account_id=$2
and service_id=$3
`,
cloudProvider, cloudAccountId, serviceId,
)
if err == sql.ErrNoRows {
return nil, model.NotFoundError(fmt.Errorf(
"couldn't find %s %s config for %s",
cloudProvider, serviceId, cloudAccountId,
))
} else if err != nil {
return nil, model.InternalError(fmt.Errorf(
"couldn't query cloud service config: %w", err,
))
}
return &result, nil
}
func (r *serviceConfigSQLRepository) upsert(
ctx context.Context,
cloudProvider string,
cloudAccountId string,
serviceId string,
config CloudServiceConfig,
) (*CloudServiceConfig, *model.ApiError) {
query := `
INSERT INTO cloud_integrations_service_configs (
cloud_provider,
cloud_account_id,
service_id,
config_json
) values ($1, $2, $3, $4)
on conflict(cloud_provider, cloud_account_id, service_id)
do update set config_json=excluded.config_json
`
_, dbErr := r.db.ExecContext(
ctx, query,
cloudProvider, cloudAccountId, serviceId, &config,
)
if dbErr != nil {
return nil, model.InternalError(fmt.Errorf(
"could not upsert cloud service config: %w", dbErr,
))
}
upsertedConfig, apiErr := r.get(ctx, cloudProvider, cloudAccountId, serviceId)
if apiErr != nil {
return nil, model.InternalError(fmt.Errorf(
"couldn't fetch upserted service config: %w", apiErr.ToError(),
))
}
return upsertedConfig, nil
}
func (r *serviceConfigSQLRepository) getAllForAccount(
ctx context.Context,
cloudProvider string,
cloudAccountId string,
) (map[string]*CloudServiceConfig, *model.ApiError) {
type ScannedServiceConfigRecord struct {
ServiceId string `db:"service_id"`
Config CloudServiceConfig `db:"config_json"`
}
records := []ScannedServiceConfigRecord{}
err := r.db.SelectContext(
ctx, &records, `
select
service_id,
config_json
from cloud_integrations_service_configs
where
cloud_provider=$1
and cloud_account_id=$2
`,
cloudProvider, cloudAccountId,
)
if err != nil {
return nil, model.InternalError(fmt.Errorf(
"could not query service configs from db: %w", err,
))
}
result := map[string]*CloudServiceConfig{}
for _, r := range records {
result[r.ServiceId] = &r.Config
}
return result, nil
}

View File

@@ -1,11 +0,0 @@
<?xml version="1.0" encoding="utf-8"?>
<svg width="800px" height="800px" viewBox="0 0 16 16" xmlns="http://www.w3.org/2000/svg" fill="none">
<path fill="#9D5025" d="M1.702 2.98L1 3.312v9.376l.702.332 2.842-4.777L1.702 2.98z" />
<path fill="#F58536" d="M3.339 12.657l-1.637.363V2.98l1.637.353v9.324z" />
<path fill="#9D5025" d="M2.476 2.612l.863-.406 4.096 6.216-4.096 5.372-.863-.406V2.612z" />
<path fill="#F58536" d="M5.38 13.248l-2.041.546V2.206l2.04.548v10.494z" />
<path fill="#9D5025" d="M4.3 1.75l1.08-.512 6.043 7.864-6.043 5.66-1.08-.511V1.749z" />
<path fill="#F58536" d="M7.998 13.856l-2.618.906V1.238l2.618.908v11.71z" />
<path fill="#9D5025" d="M6.602.66L7.998 0l6.538 8.453L7.998 16l-1.396-.66V.66z" />
<path fill="#F58536" d="M15 12.686L7.998 16V0L15 3.314v9.372z" />
</svg>

Before

Width:  |  Height:  |  Size: 805 B

View File

@@ -1,30 +0,0 @@
{
"id": "ec2",
"title": "EC2",
"icon": "file://icon.svg",
"overview": "file://overview.md",
"assets": {
"dashboards": []
},
"supported_signals": {
"metrics": true,
"logs": false
},
"data_collected": {
"metrics": [
{
"name": "ec2_cpuutilization_average",
"type": "Gauge",
"unit": "number",
"description": "CloudWatch metric CPUUtilization"
},
{
"name": "ec2_cpuutilization_maximum",
"type": "Gauge",
"unit": "number",
"description": "CloudWatch metric CPUUtilization"
}
],
"logs": []
}
}

View File

@@ -1,3 +0,0 @@
### Monitor EC2 with SigNoz
Collect key EC2 metrics and view them with an out of the box dashboard.

View File

@@ -1,21 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<svg width="80px" height="80px" viewBox="0 0 80 80" version="1.1" xmlns="http://www.w3.org/2000/svg"
xmlns:xlink="http://www.w3.org/1999/xlink">
<title>Icon-Architecture/64/Arch_Amazon-RDS_64</title>
<desc>Created with Sketch.</desc>
<defs>
<linearGradient x1="0%" y1="100%" x2="100%" y2="0%" id="linearGradient-1">
<stop stop-color="#2E27AD" offset="0%"></stop>
<stop stop-color="#527FFF" offset="100%"></stop>
</linearGradient>
</defs>
<g id="Icon-Architecture/64/Arch_Amazon-RDS_64" stroke="none" stroke-width="1" fill="none"
fill-rule="evenodd">
<g id="Icon-Architecture-BG/64/Database" fill="url(#linearGradient-1)">
<rect id="Rectangle" x="0" y="0" width="80" height="80"></rect>
</g>
<path
d="M15.414,14 L24.707,23.293 L23.293,24.707 L14,15.414 L14,23 L12,23 L12,13 C12,12.448 12.447,12 13,12 L23,12 L23,14 L15.414,14 Z M68,13 L68,23 L66,23 L66,15.414 L56.707,24.707 L55.293,23.293 L64.586,14 L57,14 L57,12 L67,12 C67.553,12 68,12.448 68,13 L68,13 Z M66,57 L68,57 L68,67 C68,67.552 67.553,68 67,68 L57,68 L57,66 L64.586,66 L55.293,56.707 L56.707,55.293 L66,64.586 L66,57 Z M65.5,39.213 C65.5,35.894 61.668,32.615 55.25,30.442 L55.891,28.548 C63.268,31.045 67.5,34.932 67.5,39.213 C67.5,43.495 63.268,47.383 55.89,49.879 L55.249,47.984 C61.668,45.812 65.5,42.534 65.5,39.213 L65.5,39.213 Z M14.556,39.213 C14.556,42.393 18.143,45.585 24.152,47.753 L23.473,49.634 C16.535,47.131 12.556,43.333 12.556,39.213 C12.556,35.094 16.535,31.296 23.473,28.792 L24.152,30.673 C18.143,32.842 14.556,36.034 14.556,39.213 L14.556,39.213 Z M24.707,56.707 L15.414,66 L23,66 L23,68 L13,68 C12.447,68 12,67.552 12,67 L12,57 L14,57 L14,64.586 L23.293,55.293 L24.707,56.707 Z M40,31.286 C32.854,31.286 29,29.44 29,28.686 C29,27.931 32.854,26.086 40,26.086 C47.145,26.086 51,27.931 51,28.686 C51,29.44 47.145,31.286 40,31.286 L40,31.286 Z M40.029,39.031 C33.187,39.031 29,37.162 29,36.145 L29,31.284 C31.463,32.643 35.832,33.286 40,33.286 C44.168,33.286 48.537,32.643 51,31.284 L51,36.145 C51,37.163 46.835,39.031 40.029,39.031 L40.029,39.031 Z M40.029,46.667 C33.187,46.667 29,44.798 29,43.781 L29,38.862 C31.431,40.291 35.742,41.031 40.029,41.031 C44.292,41.031 48.578,40.292 51,38.867 L51,43.781 C51,44.799 46.835,46.667 40.029,46.667 L40.029,46.667 Z M40,53.518 C32.883,53.518 29,51.605 29,50.622 L29,46.498 C31.431,47.927 35.742,48.667 40.029,48.667 C44.292,48.667 48.578,47.929 51,46.503 L51,50.622 C51,51.605 47.117,53.518 40,53.518 L40,53.518 Z M40,24.086 C33.739,24.086 27,25.525 27,28.686 L27,50.622 C27,53.836 33.54,55.518 40,55.518 C46.46,55.518 53,53.836 53,50.622 L53,28.686 C53,25.525 46.261,24.086 40,24.086 L40,24.086 Z"
id="Amazon-RDS_Icon_64_Squid" fill="#FFFFFF"></path>
</g>
</svg>

Before

Width:  |  Height:  |  Size: 2.7 KiB

View File

@@ -1,30 +0,0 @@
{
"id": "rds-postgres",
"title": "RDS Postgres",
"icon": "file://icon.svg",
"overview": "file://overview.md",
"assets": {
"dashboards": []
},
"supported_signals": {
"metrics": true,
"logs": true
},
"data_collected": {
"metrics": [
{
"name": "rds_postgres_cpuutilization_average",
"type": "Gauge",
"unit": "number",
"description": "CloudWatch metric CPUUtilization"
},
{
"name": "rds_postgres_cpuutilization_maximum",
"type": "Gauge",
"unit": "number",
"description": "CloudWatch metric CPUUtilization"
}
],
"logs": []
}
}

View File

@@ -1,3 +0,0 @@
### Monitor RDS Postgres with SigNoz
Collect key RDS Postgres metrics and view them with an out of the box dashboard.

View File

@@ -35,126 +35,10 @@ var (
)
// InitDB sets up setting up the connection pool global variable.
func InitDB(dataSourceName string) (*sqlx.DB, error) {
var err error
db, err = sqlx.Open("sqlite3", dataSourceName)
if err != nil {
return nil, err
}
table_schema := `CREATE TABLE IF NOT EXISTS dashboards (
id INTEGER PRIMARY KEY AUTOINCREMENT,
uuid TEXT NOT NULL UNIQUE,
created_at datetime NOT NULL,
updated_at datetime NOT NULL,
data TEXT NOT NULL
);`
_, err = db.Exec(table_schema)
if err != nil {
return nil, fmt.Errorf("error in creating dashboard table: %s", err.Error())
}
table_schema = `CREATE TABLE IF NOT EXISTS rules (
id INTEGER PRIMARY KEY AUTOINCREMENT,
updated_at datetime NOT NULL,
deleted INTEGER DEFAULT 0,
data TEXT NOT NULL
);`
_, err = db.Exec(table_schema)
if err != nil {
return nil, fmt.Errorf("error in creating rules table: %s", err.Error())
}
table_schema = `CREATE TABLE IF NOT EXISTS notification_channels (
id INTEGER PRIMARY KEY AUTOINCREMENT,
created_at datetime NOT NULL,
updated_at datetime NOT NULL,
name TEXT NOT NULL UNIQUE,
type TEXT NOT NULL,
deleted INTEGER DEFAULT 0,
data TEXT NOT NULL
);`
_, err = db.Exec(table_schema)
if err != nil {
return nil, fmt.Errorf("error in creating notification_channles table: %s", err.Error())
}
tableSchema := `CREATE TABLE IF NOT EXISTS planned_maintenance (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
description TEXT,
alert_ids TEXT,
schedule TEXT NOT NULL,
created_at datetime NOT NULL,
created_by TEXT NOT NULL,
updated_at datetime NOT NULL,
updated_by TEXT NOT NULL
);`
_, err = db.Exec(tableSchema)
if err != nil {
return nil, fmt.Errorf("error in creating planned_maintenance table: %s", err.Error())
}
table_schema = `CREATE TABLE IF NOT EXISTS ttl_status (
id INTEGER PRIMARY KEY AUTOINCREMENT,
transaction_id TEXT NOT NULL,
created_at datetime NOT NULL,
updated_at datetime NOT NULL,
table_name TEXT NOT NULL,
ttl INTEGER DEFAULT 0,
cold_storage_ttl INTEGER DEFAULT 0,
status TEXT NOT NULL
);`
_, err = db.Exec(table_schema)
if err != nil {
return nil, fmt.Errorf("error in creating ttl_status table: %s", err.Error())
}
// sqlite does not support "IF NOT EXISTS"
createdAt := `ALTER TABLE rules ADD COLUMN created_at datetime;`
_, err = db.Exec(createdAt)
if err != nil && !strings.Contains(err.Error(), "duplicate column name") {
return nil, fmt.Errorf("error in adding column created_at to rules table: %s", err.Error())
}
createdBy := `ALTER TABLE rules ADD COLUMN created_by TEXT;`
_, err = db.Exec(createdBy)
if err != nil && !strings.Contains(err.Error(), "duplicate column name") {
return nil, fmt.Errorf("error in adding column created_by to rules table: %s", err.Error())
}
updatedBy := `ALTER TABLE rules ADD COLUMN updated_by TEXT;`
_, err = db.Exec(updatedBy)
if err != nil && !strings.Contains(err.Error(), "duplicate column name") {
return nil, fmt.Errorf("error in adding column updated_by to rules table: %s", err.Error())
}
createdBy = `ALTER TABLE dashboards ADD COLUMN created_by TEXT;`
_, err = db.Exec(createdBy)
if err != nil && !strings.Contains(err.Error(), "duplicate column name") {
return nil, fmt.Errorf("error in adding column created_by to dashboards table: %s", err.Error())
}
updatedBy = `ALTER TABLE dashboards ADD COLUMN updated_by TEXT;`
_, err = db.Exec(updatedBy)
if err != nil && !strings.Contains(err.Error(), "duplicate column name") {
return nil, fmt.Errorf("error in adding column updated_by to dashboards table: %s", err.Error())
}
locked := `ALTER TABLE dashboards ADD COLUMN locked INTEGER DEFAULT 0;`
_, err = db.Exec(locked)
if err != nil && !strings.Contains(err.Error(), "duplicate column name") {
return nil, fmt.Errorf("error in adding column locked to dashboards table: %s", err.Error())
}
// @deprecated
func InitDB(inputDB *sqlx.DB) {
db = inputDB
telemetry.GetInstance().SetDashboardsInfoCallback(GetDashboardsInfo)
return db, nil
}
type Dashboard struct {

View File

@@ -33,41 +33,9 @@ type SavedView struct {
ExtraData string `json:"extra_data" db:"extra_data"`
}
// InitWithDSN sets up setting up the connection pool global variable.
func InitWithDSN(dataSourceName string) (*sqlx.DB, error) {
var err error
db, err = sqlx.Open("sqlite3", dataSourceName)
if err != nil {
return nil, err
}
tableSchema := `CREATE TABLE IF NOT EXISTS saved_views (
uuid TEXT PRIMARY KEY,
name TEXT NOT NULL,
category TEXT NOT NULL,
created_at datetime NOT NULL,
created_by TEXT,
updated_at datetime NOT NULL,
updated_by TEXT,
source_page TEXT NOT NULL,
tags TEXT,
data TEXT NOT NULL,
extra_data TEXT
);`
_, err = db.Exec(tableSchema)
if err != nil {
return nil, fmt.Errorf("error in creating saved views table: %s", err.Error())
}
telemetry.GetInstance().SetSavedViewsInfoCallback(GetSavedViewsInfo)
return db, nil
}
func InitWithDB(sqlDB *sqlx.DB) {
db = sqlDB
telemetry.GetInstance().SetSavedViewsInfoCallback(GetSavedViewsInfo)
}
func GetViews() ([]*v3.SavedView, error) {

View File

@@ -3902,18 +3902,6 @@ func (aH *APIHandler) RegisterCloudIntegrationsRoutes(router *mux.Router, am *Au
"/{cloudProvider}/agent-check-in", am.EditAccess(aH.CloudIntegrationsAgentCheckIn),
).Methods(http.MethodPost)
subRouter.HandleFunc(
"/{cloudProvider}/services", am.ViewAccess(aH.CloudIntegrationsListServices),
).Methods(http.MethodGet)
subRouter.HandleFunc(
"/{cloudProvider}/services/{serviceId}", am.ViewAccess(aH.CloudIntegrationsGetServiceDetails),
).Methods(http.MethodGet)
subRouter.HandleFunc(
"/{cloudProvider}/services/{serviceId}/config", am.EditAccess(aH.CloudIntegrationsUpdateServiceConfig),
).Methods(http.MethodPost)
}
func (aH *APIHandler) CloudIntegrationsListConnectedAccounts(
@@ -4037,77 +4025,6 @@ func (aH *APIHandler) CloudIntegrationsDisconnectAccount(
aH.Respond(w, result)
}
func (aH *APIHandler) CloudIntegrationsListServices(
w http.ResponseWriter, r *http.Request,
) {
cloudProvider := mux.Vars(r)["cloudProvider"]
var cloudAccountId *string
cloudAccountIdQP := r.URL.Query().Get("cloud_account_id")
if len(cloudAccountIdQP) > 0 {
cloudAccountId = &cloudAccountIdQP
}
resp, apiErr := aH.CloudIntegrationsController.ListServices(
r.Context(), cloudProvider, cloudAccountId,
)
if apiErr != nil {
RespondError(w, apiErr, nil)
return
}
aH.Respond(w, resp)
}
func (aH *APIHandler) CloudIntegrationsGetServiceDetails(
w http.ResponseWriter, r *http.Request,
) {
cloudProvider := mux.Vars(r)["cloudProvider"]
serviceId := mux.Vars(r)["serviceId"]
var cloudAccountId *string
cloudAccountIdQP := r.URL.Query().Get("cloud_account_id")
if len(cloudAccountIdQP) > 0 {
cloudAccountId = &cloudAccountIdQP
}
resp, apiErr := aH.CloudIntegrationsController.GetServiceDetails(
r.Context(), cloudProvider, serviceId, cloudAccountId,
)
if apiErr != nil {
RespondError(w, apiErr, nil)
return
}
aH.Respond(w, resp)
}
func (aH *APIHandler) CloudIntegrationsUpdateServiceConfig(
w http.ResponseWriter, r *http.Request,
) {
cloudProvider := mux.Vars(r)["cloudProvider"]
serviceId := mux.Vars(r)["serviceId"]
req := cloudintegrations.UpdateServiceConfigRequest{}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
RespondError(w, model.BadRequest(err), nil)
return
}
result, apiErr := aH.CloudIntegrationsController.UpdateServiceConfig(
r.Context(), cloudProvider, serviceId, req,
)
if apiErr != nil {
RespondError(w, apiErr, nil)
return
}
aH.Respond(w, result)
}
// logs
func (aH *APIHandler) RegisterLogsRoutes(router *mux.Router, am *AuthMiddleware) {
subRouter := router.PathPrefix("/api/v1/logs").Subrouter()

View File

@@ -105,7 +105,7 @@ func readBuiltInIntegration(dirpath string) (
)
}
hydrated, err := HydrateFileUris(integrationSpec, integrationFiles, dirpath)
hydrated, err := hydrateFileUris(integrationSpec, dirpath)
if err != nil {
return nil, fmt.Errorf(
"couldn't hydrate files referenced in integration %s: %w", integrationJsonPath, err,
@@ -172,11 +172,11 @@ func validateIntegration(i IntegrationDetails) error {
return nil
}
func HydrateFileUris(spec interface{}, fs embed.FS, basedir string) (interface{}, error) {
func hydrateFileUris(spec interface{}, basedir string) (interface{}, error) {
if specMap, ok := spec.(map[string]interface{}); ok {
result := map[string]interface{}{}
for k, v := range specMap {
hydrated, err := HydrateFileUris(v, fs, basedir)
hydrated, err := hydrateFileUris(v, basedir)
if err != nil {
return nil, err
}
@@ -187,7 +187,7 @@ func HydrateFileUris(spec interface{}, fs embed.FS, basedir string) (interface{}
} else if specSlice, ok := spec.([]interface{}); ok {
result := []interface{}{}
for _, v := range specSlice {
hydrated, err := HydrateFileUris(v, fs, basedir)
hydrated, err := hydrateFileUris(v, basedir)
if err != nil {
return nil, err
}
@@ -196,14 +196,14 @@ func HydrateFileUris(spec interface{}, fs embed.FS, basedir string) (interface{}
return result, nil
} else if maybeFileUri, ok := spec.(string); ok {
return readFileIfUri(fs, maybeFileUri, basedir)
return readFileIfUri(maybeFileUri, basedir)
}
return spec, nil
}
func readFileIfUri(fs embed.FS, maybeFileUri string, basedir string) (interface{}, error) {
func readFileIfUri(maybeFileUri string, basedir string) (interface{}, error) {
fileUriPrefix := "file://"
if !strings.HasPrefix(maybeFileUri, fileUriPrefix) {
return maybeFileUri, nil
@@ -212,7 +212,7 @@ func readFileIfUri(fs embed.FS, maybeFileUri string, basedir string) (interface{
relativePath := maybeFileUri[len(fileUriPrefix):]
fullPath := path.Join(basedir, relativePath)
fileContents, err := fs.ReadFile(fullPath)
fileContents, err := integrationFiles.ReadFile(fullPath)
if err != nil {
return nil, fmt.Errorf("couldn't read referenced file: %w", err)
}

View File

@@ -123,12 +123,7 @@ type Manager struct {
}
func NewManager(db *sqlx.DB) (*Manager, error) {
iiRepo, err := NewInstalledIntegrationsSqliteRepo(db)
if err != nil {
return nil, fmt.Errorf(
"could not init sqlite DB for installed integrations: %w", err,
)
}
iiRepo := NewInstalledIntegrationsSqliteRepo(db)
return &Manager{
availableIntegrationsRepo: &BuiltInIntegrations{},

View File

@@ -9,45 +9,14 @@ import (
"go.signoz.io/signoz/pkg/query-service/model"
)
func InitSqliteDBIfNeeded(db *sqlx.DB) error {
if db == nil {
return fmt.Errorf("db is required")
}
createTablesStatements := `
CREATE TABLE IF NOT EXISTS integrations_installed(
integration_id TEXT PRIMARY KEY,
config_json TEXT,
installed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
`
_, err := db.Exec(createTablesStatements)
if err != nil {
return fmt.Errorf(
"could not ensure integrations schema in sqlite DB: %w", err,
)
}
return nil
}
type InstalledIntegrationsSqliteRepo struct {
db *sqlx.DB
}
func NewInstalledIntegrationsSqliteRepo(db *sqlx.DB) (
*InstalledIntegrationsSqliteRepo, error,
) {
err := InitSqliteDBIfNeeded(db)
if err != nil {
return nil, fmt.Errorf(
"couldn't ensure sqlite schema for installed integrations: %w", err,
)
}
func NewInstalledIntegrationsSqliteRepo(db *sqlx.DB) *InstalledIntegrationsSqliteRepo {
return &InstalledIntegrationsSqliteRepo{
db: db,
}, nil
}
}
func (r *InstalledIntegrationsSqliteRepo) list(

View File

@@ -15,11 +15,7 @@ import (
func NewTestIntegrationsManager(t *testing.T) *Manager {
testDB := utils.NewQueryServiceDBForTests(t)
installedIntegrationsRepo, err := NewInstalledIntegrationsSqliteRepo(testDB)
if err != nil {
t.Fatalf("could not init sqlite DB for installed integrations: %v", err)
}
installedIntegrationsRepo := NewInstalledIntegrationsSqliteRepo(testDB)
return &Manager{
availableIntegrationsRepo: &TestAvailableIntegrationsRepo{},

View File

@@ -27,15 +27,13 @@ type LogParsingPipelineController struct {
func NewLogParsingPipelinesController(
db *sqlx.DB,
engine string,
getIntegrationPipelines func(context.Context) ([]Pipeline, *model.ApiError),
) (*LogParsingPipelineController, error) {
repo := NewRepo(db)
err := repo.InitDB(engine)
return &LogParsingPipelineController{
Repo: repo,
GetIntegrationPipelines: getIntegrationPipelines,
}, err
}, nil
}
// PipelinesResponse is used to prepare http response for pipelines config related requests

View File

@@ -9,7 +9,6 @@ import (
"github.com/google/uuid"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
"go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline/sqlite"
"go.signoz.io/signoz/pkg/query-service/auth"
"go.signoz.io/signoz/pkg/query-service/model"
"go.uber.org/zap"
@@ -29,15 +28,6 @@ func NewRepo(db *sqlx.DB) Repo {
}
}
func (r *Repo) InitDB(engine string) error {
switch engine {
case "sqlite3", "sqlite":
return sqlite.InitDB(r.db)
default:
return fmt.Errorf("unsupported db")
}
}
// insertPipeline stores a given postable pipeline to database
func (r *Repo) insertPipeline(
ctx context.Context, postable *PostablePipeline,

View File

@@ -1,35 +0,0 @@
package sqlite
import (
"fmt"
"github.com/pkg/errors"
"github.com/jmoiron/sqlx"
)
func InitDB(db *sqlx.DB) error {
var err error
if db == nil {
return fmt.Errorf("invalid db connection")
}
table_schema := `CREATE TABLE IF NOT EXISTS pipelines(
id TEXT PRIMARY KEY,
order_id INTEGER,
enabled BOOLEAN,
created_by TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
name VARCHAR(400) NOT NULL,
alias VARCHAR(20) NOT NULL,
description TEXT,
filter TEXT NOT NULL,
config_json TEXT
);
`
_, err = db.Exec(table_schema)
if err != nil {
return errors.Wrap(err, "Error in creating pipelines table")
}
return nil
}

View File

@@ -167,22 +167,6 @@ func jsonFilterEnrich(filter v3.FilterItem) v3.FilterItem {
// check if the value is a int, float, string, bool
valueType := ""
switch filter.Value.(type) {
// even the filter value is an array the actual type of the value is string.
case []interface{}:
// check first value type in array and use that
if len(filter.Value.([]interface{})) > 0 {
firstVal := filter.Value.([]interface{})[0]
switch firstVal.(type) {
case uint8, uint16, uint32, uint64, int, int8, int16, int32, int64:
valueType = "int64"
case float32, float64:
valueType = "float64"
case bool:
valueType = "bool"
default:
valueType = "string"
}
}
case uint8, uint16, uint32, uint64, int, int8, int16, int32, int64:
valueType = "int64"
case float32, float64:

View File

@@ -563,50 +563,6 @@ var testJSONFilterEnrichData = []struct {
Value: 10.0,
},
},
{
Name: "check IN",
Filter: v3.FilterItem{
Key: v3.AttributeKey{
Key: "body.attr",
DataType: v3.AttributeKeyDataTypeUnspecified,
Type: v3.AttributeKeyTypeUnspecified,
},
Operator: "IN",
Value: []interface{}{"hello", "world"},
},
Result: v3.FilterItem{
Key: v3.AttributeKey{
Key: "body.attr",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeUnspecified,
IsJSON: true,
},
Operator: "IN",
Value: []interface{}{"hello", "world"},
},
},
{
Name: "check NOT_IN",
Filter: v3.FilterItem{
Key: v3.AttributeKey{
Key: "body.attr",
DataType: v3.AttributeKeyDataTypeUnspecified,
Type: v3.AttributeKeyTypeUnspecified,
},
Operator: "NOT_IN",
Value: []interface{}{10, 20},
},
Result: v3.FilterItem{
Key: v3.AttributeKey{
Key: "body.attr",
DataType: v3.AttributeKeyDataTypeInt64,
Type: v3.AttributeKeyTypeUnspecified,
IsJSON: true,
},
Operator: "NOT_IN",
Value: []interface{}{10, 20},
},
},
}
func TestJsonEnrich(t *testing.T) {

View File

@@ -183,71 +183,6 @@ var testGetJSONFilterData = []struct {
},
Filter: "lower(body) like lower('%message%') AND JSON_EXISTS(body, '$.\"message\"')",
},
{
Name: "test json in array string",
FilterItem: v3.FilterItem{
Key: v3.AttributeKey{
Key: "body.name",
DataType: "string",
IsJSON: true,
},
Operator: "in",
Value: []interface{}{"hello", "world"},
},
Filter: "lower(body) like lower('%name%') AND JSON_EXISTS(body, '$.\"name\"') AND JSON_VALUE(body, '$.\"name\"') IN ['hello','world']",
},
{
Name: "test json in array number",
FilterItem: v3.FilterItem{
Key: v3.AttributeKey{
Key: "body.value",
DataType: "int64",
IsJSON: true,
},
Operator: "in",
Value: []interface{}{10, 11},
},
Filter: "lower(body) like lower('%value%') AND JSON_EXISTS(body, '$.\"value\"') AND JSONExtract(JSON_VALUE(body, '$.\"value\"'), 'Int64') IN [10,11]",
},
{
Name: "test json in array mixed data- allow",
FilterItem: v3.FilterItem{
Key: v3.AttributeKey{
Key: "body.value",
DataType: "int64",
IsJSON: true,
},
Operator: "in",
Value: []interface{}{11, "11"},
},
Filter: "lower(body) like lower('%value%') AND JSON_EXISTS(body, '$.\"value\"') AND JSONExtract(JSON_VALUE(body, '$.\"value\"'), 'Int64') IN [11,11]",
},
{
Name: "test json in array mixed data- fail",
FilterItem: v3.FilterItem{
Key: v3.AttributeKey{
Key: "body.value",
DataType: "int64",
IsJSON: true,
},
Operator: "in",
Value: []interface{}{11, "11", "hello"},
},
Error: true,
},
{
Name: "test json in array mixed data- allow",
FilterItem: v3.FilterItem{
Key: v3.AttributeKey{
Key: "body.value",
DataType: "string",
IsJSON: true,
},
Operator: "in",
Value: []interface{}{"hello", 11},
},
Filter: "lower(body) like lower('%value%') AND JSON_EXISTS(body, '$.\"value\"') AND JSON_VALUE(body, '$.\"value\"') IN ['hello','11']",
},
}
func TestGetJSONFilter(t *testing.T) {

View File

@@ -166,10 +166,7 @@ type testbed struct {
func newTestbed(t *testing.T) *testbed {
testDB := utils.NewQueryServiceDBForTests(t)
_, err := model.InitDB(testDB)
if err != nil {
t.Fatalf("could not init opamp model: %v", err)
}
model.InitDB(testDB)
testConfigProvider := NewMockAgentConfigProvider()
opampServer := InitializeServer(nil, testConfigProvider)

View File

@@ -30,28 +30,15 @@ func (a *Agents) Count() int {
}
// Initialize the database and create schema if needed
func InitDB(qsDB *sqlx.DB) (*sqlx.DB, error) {
func InitDB(qsDB *sqlx.DB) *sqlx.DB {
db = qsDB
tableSchema := `CREATE TABLE IF NOT EXISTS agents (
agent_id TEXT PRIMARY KEY UNIQUE,
started_at datetime NOT NULL,
terminated_at datetime,
current_status TEXT NOT NULL,
effective_config TEXT NOT NULL
);`
_, err := db.Exec(tableSchema)
if err != nil {
return nil, fmt.Errorf("error in creating agents table: %s", err.Error())
}
AllAgents = Agents{
agentsById: make(map[string]*Agent),
connections: make(map[types.Connection]map[string]bool),
mux: sync.RWMutex{},
}
return db, nil
return qsDB
}
// RemoveConnection removes the connection all Agent instances associated with the

View File

@@ -203,53 +203,8 @@ type UpdatePreference struct {
var db *sqlx.DB
func InitDB(datasourceName string) error {
var err error
db, err = sqlx.Open("sqlite3", datasourceName)
if err != nil {
return err
}
// create the user preference table
tableSchema := `
PRAGMA foreign_keys = ON;
CREATE TABLE IF NOT EXISTS user_preference(
preference_id TEXT NOT NULL,
preference_value TEXT,
user_id TEXT NOT NULL,
PRIMARY KEY (preference_id,user_id),
FOREIGN KEY (user_id)
REFERENCES users(id)
ON UPDATE CASCADE
ON DELETE CASCADE
);`
_, err = db.Exec(tableSchema)
if err != nil {
return fmt.Errorf("error in creating user_preference table: %s", err.Error())
}
// create the org preference table
tableSchema = `
PRAGMA foreign_keys = ON;
CREATE TABLE IF NOT EXISTS org_preference(
preference_id TEXT NOT NULL,
preference_value TEXT,
org_id TEXT NOT NULL,
PRIMARY KEY (preference_id,org_id),
FOREIGN KEY (org_id)
REFERENCES organizations(id)
ON UPDATE CASCADE
ON DELETE CASCADE
);`
_, err = db.Exec(tableSchema)
if err != nil {
return fmt.Errorf("error in creating org_preference table: %s", err.Error())
}
return nil
func InitDB(inputDB *sqlx.DB) {
db = inputDB
}
// org preference functions

View File

@@ -27,6 +27,7 @@ import (
"go.signoz.io/signoz/pkg/query-service/app/clickhouseReader"
"go.signoz.io/signoz/pkg/query-service/app/cloudintegrations"
"go.signoz.io/signoz/pkg/query-service/app/dashboards"
"go.signoz.io/signoz/pkg/query-service/app/explorer"
"go.signoz.io/signoz/pkg/query-service/app/integrations"
"go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline"
"go.signoz.io/signoz/pkg/query-service/app/opamp"
@@ -35,8 +36,8 @@ import (
"go.signoz.io/signoz/pkg/query-service/common"
"go.signoz.io/signoz/pkg/query-service/migrate"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/signoz"
"go.signoz.io/signoz/pkg/query-service/app/explorer"
"go.signoz.io/signoz/pkg/query-service/auth"
"go.signoz.io/signoz/pkg/query-service/cache"
"go.signoz.io/signoz/pkg/query-service/constants"
@@ -96,24 +97,13 @@ func (s Server) HealthCheckStatus() chan healthcheck.Status {
}
// NewServer creates and initializes Server
func NewServer(serverOptions *ServerOptions) (*Server, error) {
if err := dao.InitDao("sqlite", constants.RELATIONAL_DATASOURCE_PATH); err != nil {
func NewServer(serverOptions *ServerOptions, config signoz.Config, signoz *signoz.SigNoz) (*Server, error) {
if err := dao.InitDao(signoz.SQLStore.SQLxDB()); err != nil {
return nil, err
}
if err := preferences.InitDB(constants.RELATIONAL_DATASOURCE_PATH); err != nil {
return nil, err
}
localDB, err := dashboards.InitDB(constants.RELATIONAL_DATASOURCE_PATH)
explorer.InitWithDSN(constants.RELATIONAL_DATASOURCE_PATH)
if err != nil {
return nil, err
}
localDB.SetMaxOpenConns(10)
preferences.InitDB(signoz.SQLStore.SQLxDB())
dashboards.InitDB(signoz.SQLStore.SQLxDB())
explorer.InitWithDB(signoz.SQLStore.SQLxDB())
// initiate feature manager
fm := featureManager.StartManager()
@@ -125,7 +115,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
if storage == "clickhouse" {
zap.L().Info("Using ClickHouse as datastore ...")
clickhouseReader := clickhouseReader.NewReader(
localDB,
signoz.SQLStore.SQLxDB(),
serverOptions.PromConfigPath,
fm,
serverOptions.MaxIdleConns,
@@ -140,7 +130,9 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
} else {
return nil, fmt.Errorf("storage type: %s is not supported in query service", storage)
}
skipConfig := &model.SkipConfig{}
var err error
if serverOptions.SkipTopLvlOpsPath != "" {
// read skip config
skipConfig, err = model.ReadSkipConfig(serverOptions.SkipTopLvlOpsPath)
@@ -161,7 +153,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
rm, err := makeRulesManager(
serverOptions.PromConfigPath,
constants.GetAlertManagerApiPrefix(),
serverOptions.RuleRepoURL, localDB, reader, c, serverOptions.DisableRules, fm, serverOptions.UseLogsNewSchema, serverOptions.UseTraceNewSchema)
serverOptions.RuleRepoURL, signoz.SQLStore.SQLxDB(), reader, c, serverOptions.DisableRules, fm, serverOptions.UseLogsNewSchema, serverOptions.UseTraceNewSchema)
if err != nil {
return nil, err
}
@@ -178,18 +170,18 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
return nil, err
}
integrationsController, err := integrations.NewController(localDB)
integrationsController, err := integrations.NewController(signoz.SQLStore.SQLxDB())
if err != nil {
return nil, fmt.Errorf("couldn't create integrations controller: %w", err)
}
cloudIntegrationsController, err := cloudintegrations.NewController(localDB)
cloudIntegrationsController, err := cloudintegrations.NewController(signoz.SQLStore.SQLxDB())
if err != nil {
return nil, fmt.Errorf("couldn't create cloud provider integrations controller: %w", err)
}
logParsingPipelineController, err := logparsingpipeline.NewLogParsingPipelinesController(
localDB, "sqlite", integrationsController.GetPipelinesForInstalledIntegrations,
signoz.SQLStore.SQLxDB(), integrationsController.GetPipelinesForInstalledIntegrations,
)
if err != nil {
return nil, err
@@ -241,14 +233,10 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
s.privateHTTP = privateServer
_, err = opAmpModel.InitDB(localDB)
if err != nil {
return nil, err
}
opAmpModel.InitDB(signoz.SQLStore.SQLxDB())
agentConfMgr, err := agentConf.Initiate(&agentConf.ManagerOptions{
DB: localDB,
DBEngine: "sqlite",
DB: signoz.SQLStore.SQLxDB(),
AgentFeatures: []agentConf.AgentFeature{
logParsingPipelineController,
},

View File

@@ -1,26 +1,20 @@
package dao
import (
"fmt"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
"go.signoz.io/signoz/pkg/query-service/dao/sqlite"
)
var db ModelDao
func InitDao(engine, path string) error {
func InitDao(inputDB *sqlx.DB) error {
var err error
switch engine {
case "sqlite":
db, err = sqlite.InitDB(path)
if err != nil {
return errors.Wrap(err, "failed to initialize DB")
}
default:
return fmt.Errorf("RelationalDB type: %s is not supported in query service", engine)
db, err = sqlite.InitDB(inputDB)
if err != nil {
return errors.Wrap(err, "failed to initialize DB")
}
return nil
}

View File

@@ -2,7 +2,6 @@ package sqlite
import (
"context"
"fmt"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
@@ -17,83 +16,8 @@ type ModelDaoSqlite struct {
}
// InitDB sets up setting up the connection pool global variable.
func InitDB(dataSourceName string) (*ModelDaoSqlite, error) {
var err error
db, err := sqlx.Open("sqlite3", dataSourceName)
if err != nil {
return nil, errors.Wrap(err, "failed to Open sqlite3 DB")
}
db.SetMaxOpenConns(10)
table_schema := `
PRAGMA foreign_keys = ON;
CREATE TABLE IF NOT EXISTS invites (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
email TEXT NOT NULL UNIQUE,
token TEXT NOT NULL,
created_at INTEGER NOT NULL,
role TEXT NOT NULL,
org_id TEXT NOT NULL,
FOREIGN KEY(org_id) REFERENCES organizations(id)
);
CREATE TABLE IF NOT EXISTS organizations (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
created_at INTEGER NOT NULL,
is_anonymous INTEGER NOT NULL DEFAULT 0 CHECK(is_anonymous IN (0,1)),
has_opted_updates INTEGER NOT NULL DEFAULT 1 CHECK(has_opted_updates IN (0,1))
);
CREATE TABLE IF NOT EXISTS users (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
email TEXT NOT NULL UNIQUE,
password TEXT NOT NULL,
created_at INTEGER NOT NULL,
profile_picture_url TEXT,
group_id TEXT NOT NULL,
org_id TEXT NOT NULL,
FOREIGN KEY(group_id) REFERENCES groups(id),
FOREIGN KEY(org_id) REFERENCES organizations(id)
);
CREATE TABLE IF NOT EXISTS groups (
id TEXT PRIMARY KEY,
name TEXT NOT NULL UNIQUE
);
CREATE TABLE IF NOT EXISTS reset_password_request (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id TEXT NOT NULL,
token TEXT NOT NULL,
FOREIGN KEY(user_id) REFERENCES users(id)
);
CREATE TABLE IF NOT EXISTS user_flags (
user_id TEXT PRIMARY KEY,
flags TEXT,
FOREIGN KEY(user_id) REFERENCES users(id)
);
CREATE TABLE IF NOT EXISTS apdex_settings (
service_name TEXT PRIMARY KEY,
threshold FLOAT NOT NULL,
exclude_status_codes TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS ingestion_keys (
key_id TEXT PRIMARY KEY,
name TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
ingestion_key TEXT NOT NULL,
ingestion_url TEXT NOT NULL,
data_region TEXT NOT NULL
);
`
_, err = db.Exec(table_schema)
if err != nil {
return nil, fmt.Errorf("error in creating tables: %v", err.Error())
}
mds := &ModelDaoSqlite{db: db}
func InitDB(inputDB *sqlx.DB) (*ModelDaoSqlite, error) {
mds := &ModelDaoSqlite{db: inputDB}
ctx := context.Background()
if err := mds.initializeOrgPreferences(ctx); err != nil {

View File

@@ -9,25 +9,19 @@ import (
"time"
prommodel "github.com/prometheus/common/model"
"go.signoz.io/signoz/pkg/config"
"go.signoz.io/signoz/pkg/config/envprovider"
"go.signoz.io/signoz/pkg/instrumentation"
"go.signoz.io/signoz/pkg/query-service/app"
"go.signoz.io/signoz/pkg/query-service/auth"
"go.signoz.io/signoz/pkg/query-service/constants"
"go.signoz.io/signoz/pkg/query-service/migrate"
"go.signoz.io/signoz/pkg/query-service/version"
"go.signoz.io/signoz/pkg/signoz"
pkgversion "go.signoz.io/signoz/pkg/version"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
func initZapLog() *zap.Logger {
config := zap.NewProductionConfig()
config.EncoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder
config.EncoderConfig.TimeKey = "timestamp"
config.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
logger, _ := config.Build()
return logger
}
func init() {
prommodel.NameValidationScheme = prommodel.UTF8Validation
}
@@ -67,13 +61,33 @@ func main() {
flag.DurationVar(&dialTimeout, "dial-timeout", 5*time.Second, "(the maximum time to establish a connection, only used with clickhouse if not set in ClickHouseUrl env var DSN.)")
flag.Parse()
loggerMgr := initZapLog()
zap.ReplaceGlobals(loggerMgr)
defer loggerMgr.Sync() // flushes buffer, if any
config, err := signoz.NewConfig(context.Background(), config.ResolverConfig{
Uris: []string{"env:"},
ProviderFactories: []config.ProviderFactory{
envprovider.NewFactory(),
},
})
if err != nil {
zap.L().Fatal("Failed to create config", zap.Error(err))
}
logger := loggerMgr.Sugar()
instrumentation, err := instrumentation.New(context.Background(), pkgversion.Build{}, config.Instrumentation)
if err != nil {
zap.L().Fatal("Failed to create instrumentation", zap.Error(err))
}
defer instrumentation.Stop(context.Background())
zap.ReplaceGlobals(instrumentation.Logger())
defer instrumentation.Logger().Sync() // flushes buffer, if any
logger := instrumentation.Logger().Sugar()
version.PrintVersion()
signoz, err := signoz.New(context.Background(), instrumentation, config, signoz.NewProviderFactories())
if err != nil {
zap.L().Fatal("Failed to create signoz struct", zap.Error(err))
}
serverOptions := &app.ServerOptions{
HTTPHostPort: constants.HTTPHostPort,
PromConfigPath: promConfigPath,
@@ -101,13 +115,7 @@ func main() {
zap.L().Info("JWT secret key set successfully.")
}
if err := migrate.Migrate(constants.RELATIONAL_DATASOURCE_PATH); err != nil {
zap.L().Error("Failed to migrate", zap.Error(err))
} else {
zap.L().Info("Migration successful")
}
server, err := app.NewServer(serverOptions)
server, err := app.NewServer(serverOptions, config, signoz)
if err != nil {
logger.Fatal("Failed to create server", zap.Error(err))
}

View File

@@ -16,22 +16,6 @@ type DataMigration struct {
Succeeded bool `db:"succeeded"`
}
func initSchema(conn *sqlx.DB) error {
tableSchema := `
CREATE TABLE IF NOT EXISTS data_migrations (
id SERIAL PRIMARY KEY,
version VARCHAR(255) NOT NULL UNIQUE,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
succeeded BOOLEAN NOT NULL DEFAULT FALSE
);
`
_, err := conn.Exec(tableSchema)
if err != nil {
return err
}
return nil
}
func getMigrationVersion(conn *sqlx.DB, version string) (*DataMigration, error) {
var migration DataMigration
err := conn.Get(&migration, "SELECT * FROM data_migrations WHERE version = $1", version)
@@ -44,18 +28,6 @@ func getMigrationVersion(conn *sqlx.DB, version string) (*DataMigration, error)
return &migration, nil
}
func Migrate(dsn string) error {
conn, err := sqlx.Connect("sqlite3", dsn)
if err != nil {
return err
}
if err := initSchema(conn); err != nil {
return err
}
return nil
}
func ClickHouseMigrate(conn driver.Conn, cluster string) error {
database := "CREATE DATABASE IF NOT EXISTS signoz_analytics ON CLUSTER %s"

View File

@@ -34,7 +34,7 @@ import (
)
func TestLogPipelinesLifecycle(t *testing.T) {
testbed := NewLogPipelinesTestBed(t, nil)
testbed := NewLogPipelinesTestBed(t, utils.NewQueryServiceDBForTests(t))
require := require.New(t)
getPipelinesResp := testbed.GetPipelinesFromQS()
@@ -461,7 +461,7 @@ func NewTestbedWithoutOpamp(t *testing.T, testDB *sqlx.DB) *LogPipelinesTestBed
}
controller, err := logparsingpipeline.NewLogParsingPipelinesController(
testDB, "sqlite", ic.GetPipelinesForInstalledIntegrations,
testDB, ic.GetPipelinesForInstalledIntegrations,
)
if err != nil {
t.Fatalf("could not create a logparsingpipelines controller: %v", err)
@@ -481,12 +481,10 @@ func NewTestbedWithoutOpamp(t *testing.T, testDB *sqlx.DB) *LogPipelinesTestBed
}
// Mock an available opamp agent
testDB, err = opampModel.InitDB(testDB)
require.Nil(t, err, "failed to init opamp model")
_ = opampModel.InitDB(testDB)
agentConfMgr, err := agentConf.Initiate(&agentConf.ManagerOptions{
DB: testDB,
DBEngine: "sqlite",
DB: testDB,
AgentFeatures: []agentConf.AgentFeature{
apiHandler.LogsParsingPipelineController,
}})

View File

@@ -7,7 +7,6 @@ import (
"testing"
"time"
"github.com/google/uuid"
"github.com/jmoiron/sqlx"
mockhouse "github.com/srikanthccv/ClickHouse-go-mock"
"github.com/stretchr/testify/require"
@@ -20,7 +19,7 @@ import (
"go.signoz.io/signoz/pkg/query-service/utils"
)
func TestAWSIntegrationAccountLifecycle(t *testing.T) {
func TestAWSIntegrationLifecycle(t *testing.T) {
// Test for happy path of connecting and managing AWS integration accounts
t0 := time.Now()
@@ -52,7 +51,6 @@ func TestAWSIntegrationAccountLifecycle(t *testing.T) {
accountStatusResp := testbed.GetAccountStatusFromQS("aws", testAccountId)
require.Equal(testAccountId, accountStatusResp.Id)
require.Nil(accountStatusResp.Status.Integration.LastHeartbeatTsMillis)
require.Nil(accountStatusResp.CloudAccountId)
// The unconnected account should not show up in connected accounts list yet
accountsListResp1 := testbed.GetConnectedAccountsListFromQS("aws")
@@ -76,8 +74,6 @@ func TestAWSIntegrationAccountLifecycle(t *testing.T) {
// Polling for connection status from UI should now return latest status
accountStatusResp1 := testbed.GetAccountStatusFromQS("aws", testAccountId)
require.Equal(testAccountId, accountStatusResp1.Id)
require.NotNil(accountStatusResp1.CloudAccountId)
require.Equal(testAWSAccountId, *accountStatusResp1.CloudAccountId)
require.NotNil(accountStatusResp1.Status.Integration.LastHeartbeatTsMillis)
require.LessOrEqual(
tsMillisBeforeAgentCheckIn,
@@ -130,70 +126,6 @@ func TestAWSIntegrationAccountLifecycle(t *testing.T) {
require.LessOrEqual(tsBeforeDisconnect, *agentCheckInResp2.Account.RemovedAt)
}
func TestAWSIntegrationServices(t *testing.T) {
require := require.New(t)
testbed := NewCloudIntegrationsTestBed(t, nil)
// should be able to list available cloud services.
svcListResp := testbed.GetServicesFromQS("aws", nil)
require.Greater(len(svcListResp.Services), 0)
for _, svc := range svcListResp.Services {
require.NotEmpty(svc.Id)
require.Nil(svc.Config)
}
// should be able to get details of a particular service.
svcId := svcListResp.Services[0].Id
svcDetailResp := testbed.GetServiceDetailFromQS("aws", svcId, nil)
require.Equal(svcId, svcDetailResp.Id)
require.NotEmpty(svcDetailResp.Overview)
require.Nil(svcDetailResp.Config)
require.Nil(svcDetailResp.ConnectionStatus)
// should be able to configure a service in the ctx of a connected account
// create a connected account
testAccountId := uuid.NewString()
testAWSAccountId := "389389489489"
testbed.CheckInAsAgentWithQS(
"aws", cloudintegrations.AgentCheckInRequest{
AccountId: testAccountId,
CloudAccountId: testAWSAccountId,
},
)
testSvcConfig := cloudintegrations.CloudServiceConfig{
Metrics: &cloudintegrations.CloudServiceMetricsConfig{
Enabled: true,
},
}
updateSvcConfigResp := testbed.UpdateServiceConfigWithQS("aws", svcId, cloudintegrations.UpdateServiceConfigRequest{
CloudAccountId: testAWSAccountId,
Config: testSvcConfig,
})
require.Equal(svcId, updateSvcConfigResp.Id)
require.Equal(testSvcConfig, updateSvcConfigResp.Config)
// service list should include config when queried in the ctx of an account
svcListResp = testbed.GetServicesFromQS("aws", &testAWSAccountId)
require.Greater(len(svcListResp.Services), 0)
for _, svc := range svcListResp.Services {
if svc.Id == svcId {
require.NotNil(svc.Config)
require.Equal(testSvcConfig, *svc.Config)
}
}
// service detail should include config and status info when
// queried in the ctx of an account
svcDetailResp = testbed.GetServiceDetailFromQS("aws", svcId, &testAWSAccountId)
require.Equal(svcId, svcDetailResp.Id)
require.NotNil(svcDetailResp.Config)
require.Equal(testSvcConfig, *svcDetailResp.Config)
}
type CloudIntegrationsTestBed struct {
t *testing.T
testUser *model.User
@@ -343,41 +275,6 @@ func (tb *CloudIntegrationsTestBed) DisconnectAccountWithQS(
return &resp
}
func (tb *CloudIntegrationsTestBed) GetServicesFromQS(
cloudProvider string, cloudAccountId *string,
) *cloudintegrations.ListServicesResponse {
path := fmt.Sprintf("/api/v1/cloud-integrations/%s/services", cloudProvider)
if cloudAccountId != nil {
path = fmt.Sprintf("%s?cloud_account_id=%s", path, *cloudAccountId)
}
return RequestQSAndParseResp[cloudintegrations.ListServicesResponse](
tb, path, nil,
)
}
func (tb *CloudIntegrationsTestBed) GetServiceDetailFromQS(
cloudProvider string, serviceId string, cloudAccountId *string,
) *cloudintegrations.CloudServiceDetails {
path := fmt.Sprintf("/api/v1/cloud-integrations/%s/services/%s", cloudProvider, serviceId)
if cloudAccountId != nil {
path = fmt.Sprintf("%s?cloud_account_id=%s", path, *cloudAccountId)
}
return RequestQSAndParseResp[cloudintegrations.CloudServiceDetails](
tb, path, nil,
)
}
func (tb *CloudIntegrationsTestBed) UpdateServiceConfigWithQS(
cloudProvider string, serviceId string, req any,
) *cloudintegrations.UpdateServiceConfigResponse {
path := fmt.Sprintf("/api/v1/cloud-integrations/%s/services/%s/config", cloudProvider, serviceId)
return RequestQSAndParseResp[cloudintegrations.UpdateServiceConfigResponse](
tb, path, req,
)
}
func (tb *CloudIntegrationsTestBed) RequestQS(
path string,
postData interface{},
@@ -400,20 +297,3 @@ func (tb *CloudIntegrationsTestBed) RequestQS(
}
return dataJson
}
func RequestQSAndParseResp[ResponseType any](
tb *CloudIntegrationsTestBed,
path string,
postData interface{},
) *ResponseType {
respDataJson := tb.RequestQS(path, postData)
var resp ResponseType
err := json.Unmarshal(respDataJson, &resp)
if err != nil {
tb.t.Fatalf("could not unmarshal apiResponse.Data json into %T", resp)
}
return &resp
}

View File

@@ -1,38 +1,70 @@
package utils
import (
"context"
"os"
"testing"
"github.com/jmoiron/sqlx"
_ "github.com/mattn/go-sqlite3"
"go.signoz.io/signoz/pkg/factory"
"go.signoz.io/signoz/pkg/instrumentation/instrumentationtest"
"go.signoz.io/signoz/pkg/query-service/app/dashboards"
"go.signoz.io/signoz/pkg/query-service/dao"
"go.signoz.io/signoz/pkg/sqlstore"
"go.signoz.io/signoz/pkg/sqlstore/sqlitesqlstore"
"go.signoz.io/signoz/pkg/sqlstoremigrator"
"go.signoz.io/signoz/pkg/sqlstoremigrator/migrations"
)
func NewTestSqliteDB(t *testing.T) (testDB *sqlx.DB, testDBFilePath string) {
func NewQueryServiceDBForTests(t *testing.T) (testDB *sqlx.DB) {
testDBFile, err := os.CreateTemp("", "test-signoz-db-*")
if err != nil {
t.Fatalf("could not create temp file for test db: %v", err)
}
testDBFilePath = testDBFile.Name()
testDBFilePath := testDBFile.Name()
t.Cleanup(func() { os.Remove(testDBFilePath) })
testDBFile.Close()
testDB, err = sqlx.Open("sqlite3", testDBFilePath)
if err != nil {
t.Fatalf("could not open test db sqlite file: %v", err)
config := sqlstore.Config{
Provider: "sqlite",
Sqlite: sqlstore.SqliteConfig{
Path: testDBFilePath,
},
}
return testDB, testDBFilePath
}
func NewQueryServiceDBForTests(t *testing.T) *sqlx.DB {
testDB, testDBFilePath := NewTestSqliteDB(t)
// TODO(Raj): This should not require passing in the DB file path
dao.InitDao("sqlite", testDBFilePath)
dashboards.InitDB(testDBFilePath)
return testDB
sqlStore, err := factory.NewFromFactory(context.Background(), instrumentationtest.New().ToProviderSettings(), config, factory.MustNewNamedMap(sqlitesqlstore.NewFactory()), "sqlite")
if err != nil {
t.Fatalf("could not create sqlite provider: %v", err)
}
migrations, err := sqlstoremigrator.NewMigrations(context.Background(), instrumentationtest.New().ToProviderSettings(), config, factory.MustNewNamedMap(
migrations.NewAddDataMigrationsFactory(),
migrations.NewAddOrganizationFactory(),
migrations.NewAddPreferencesFactory(),
migrations.NewAddDashboardsFactory(),
migrations.NewAddSavedViewsFactory(),
migrations.NewAddAgentsFactory(),
migrations.NewAddPipelinesFactory(),
migrations.NewAddIntegrationsFactory(),
))
if err != nil {
t.Fatalf("could not create migrations: %v", err)
}
sqlStoreMigrator := sqlstoremigrator.New(context.Background(), instrumentationtest.New().ToProviderSettings(), sqlStore, migrations, config)
err = sqlStoreMigrator.Migrate(context.Background())
if err != nil {
t.Fatalf("could not run migrations: %v", err)
}
err = dao.InitDao(sqlStore.SQLxDB())
if err != nil {
t.Fatalf("could not init dao: %v", err)
}
dashboards.InitDB(sqlStore.SQLxDB())
return sqlStore.SQLxDB()
}

View File

@@ -8,18 +8,19 @@ import (
"os/signal"
"syscall"
"go.signoz.io/signoz/pkg/factory"
"go.uber.org/zap"
)
type Registry struct {
services []NamedService
services []factory.Service
logger *zap.Logger
startCh chan error
stopCh chan error
}
// New creates a new registry of services. It needs at least one service in the input.
func New(logger *zap.Logger, services ...NamedService) (*Registry, error) {
func New(logger *zap.Logger, services ...factory.Service) (*Registry, error) {
if logger == nil {
return nil, fmt.Errorf("cannot build registry, logger is required")
}
@@ -38,7 +39,7 @@ func New(logger *zap.Logger, services ...NamedService) (*Registry, error) {
func (r *Registry) Start(ctx context.Context) error {
for _, s := range r.services {
go func(s Service) {
go func(s factory.Service) {
err := s.Start(ctx)
r.startCh <- err
}(s)
@@ -66,7 +67,7 @@ func (r *Registry) Wait(ctx context.Context) error {
func (r *Registry) Stop(ctx context.Context) error {
for _, s := range r.services {
go func(s Service) {
go func(s factory.Service) {
err := s.Stop(ctx)
r.stopCh <- err
}(s)

View File

@@ -6,14 +6,15 @@ import (
"testing"
"github.com/stretchr/testify/require"
"go.signoz.io/signoz/pkg/factory/factorytest"
"go.uber.org/zap"
)
func TestRegistryWith2HttpServers(t *testing.T) {
http1, err := newHttpService("http1")
http1, err := factorytest.NewHttpService("http1")
require.NoError(t, err)
http2, err := newHttpService("http2")
http2, err := factorytest.NewHttpService("http2")
require.NoError(t, err)
registry, err := New(zap.NewNop(), http1, http2)
@@ -34,10 +35,10 @@ func TestRegistryWith2HttpServers(t *testing.T) {
}
func TestRegistryWith2HttpServersWithoutWait(t *testing.T) {
http1, err := newHttpService("http1")
http1, err := factorytest.NewHttpService("http1")
require.NoError(t, err)
http2, err := newHttpService("http2")
http2, err := factorytest.NewHttpService("http2")
require.NoError(t, err)
registry, err := New(zap.NewNop(), http1, http2)

View File

@@ -1,16 +0,0 @@
package registry
import "context"
type Service interface {
// Starts a service. The service should return an error if it cannot be started.
Start(context.Context) error
// Stops a service.
Stop(context.Context) error
}
type NamedService interface {
// Identifier of a service. It should be unique across all services.
Name() string
Service
}

View File

@@ -1,49 +0,0 @@
package registry
import (
"context"
"net"
"net/http"
)
var _ NamedService = (*httpService)(nil)
type httpService struct {
Listener net.Listener
Server *http.Server
name string
}
func newHttpService(name string) (*httpService, error) {
return &httpService{
name: name,
Server: &http.Server{},
}, nil
}
func (service *httpService) Name() string {
return service.name
}
func (service *httpService) Start(ctx context.Context) error {
listener, err := net.Listen("tcp", "localhost:0")
if err != nil {
return err
}
service.Listener = listener
if err := service.Server.Serve(service.Listener); err != nil {
if err != http.ErrServerClosed {
return err
}
}
return nil
}
func (service *httpService) Stop(ctx context.Context) error {
if err := service.Server.Shutdown(ctx); err != nil {
return err
}
return nil
}

41
pkg/signoz/config.go Normal file
View File

@@ -0,0 +1,41 @@
package signoz
import (
"context"
"go.signoz.io/signoz/pkg/cache"
"go.signoz.io/signoz/pkg/config"
"go.signoz.io/signoz/pkg/factory"
"go.signoz.io/signoz/pkg/instrumentation"
"go.signoz.io/signoz/pkg/sqlstore"
"go.signoz.io/signoz/pkg/web"
)
// Config defines the entire configuration of signoz.
type Config struct {
Instrumentation instrumentation.Config `mapstructure:"instrumentation"`
Web web.Config `mapstructure:"web"`
Cache cache.Config `mapstructure:"cache"`
SQLStore sqlstore.Config `mapstructure:"sqlstore"`
}
func NewConfig(ctx context.Context, resolverConfig config.ResolverConfig) (Config, error) {
configFactories := []factory.ConfigFactory{
instrumentation.NewConfigFactory(),
web.NewConfigFactory(),
sqlstore.NewConfigFactory(),
cache.NewConfigFactory(),
}
conf, err := config.New(ctx, resolverConfig, configFactories)
if err != nil {
return Config{}, err
}
var config Config
if err := conf.Unmarshal("", &config); err != nil {
return Config{}, err
}
return config, nil
}

47
pkg/signoz/provider.go Normal file
View File

@@ -0,0 +1,47 @@
package signoz
import (
"go.signoz.io/signoz/pkg/cache"
"go.signoz.io/signoz/pkg/cache/memorycache"
"go.signoz.io/signoz/pkg/cache/rediscache"
"go.signoz.io/signoz/pkg/factory"
"go.signoz.io/signoz/pkg/sqlstore"
"go.signoz.io/signoz/pkg/sqlstore/sqlitesqlstore"
"go.signoz.io/signoz/pkg/sqlstoremigrator/migrations"
"go.signoz.io/signoz/pkg/web"
"go.signoz.io/signoz/pkg/web/noopweb"
"go.signoz.io/signoz/pkg/web/routerweb"
)
type ProviderFactories struct {
SQLStoreMigrationFactories factory.NamedMap[factory.ProviderFactory[sqlstore.SQLStoreMigration, sqlstore.Config]]
SQLStoreProviderFactories factory.NamedMap[factory.ProviderFactory[sqlstore.SQLStore, sqlstore.Config]]
WebProviderFactories factory.NamedMap[factory.ProviderFactory[web.Web, web.Config]]
CacheProviderFactories factory.NamedMap[factory.ProviderFactory[cache.Cache, cache.Config]]
}
func NewProviderFactories() ProviderFactories {
return ProviderFactories{
SQLStoreMigrationFactories: factory.MustNewNamedMap(
migrations.NewAddDataMigrationsFactory(),
migrations.NewAddOrganizationFactory(),
migrations.NewAddPreferencesFactory(),
migrations.NewAddDashboardsFactory(),
migrations.NewAddSavedViewsFactory(),
migrations.NewAddAgentsFactory(),
migrations.NewAddPipelinesFactory(),
migrations.NewAddIntegrationsFactory(),
),
SQLStoreProviderFactories: factory.MustNewNamedMap(
sqlitesqlstore.NewFactory(),
),
WebProviderFactories: factory.MustNewNamedMap(
routerweb.NewFactory(),
noopweb.NewFactory(),
),
CacheProviderFactories: factory.MustNewNamedMap(
memorycache.NewFactory(),
rediscache.NewFactory(),
),
}
}

View File

@@ -1,38 +1,57 @@
package signoz
import (
"context"
"go.signoz.io/signoz/pkg/cache"
"go.signoz.io/signoz/pkg/cache/memorycache"
"go.signoz.io/signoz/pkg/cache/rediscache"
"go.signoz.io/signoz/pkg/config"
"go.signoz.io/signoz/pkg/factory"
"go.signoz.io/signoz/pkg/instrumentation"
"go.signoz.io/signoz/pkg/sqlstoremigrator"
"go.signoz.io/signoz/pkg/sqlstore"
"go.signoz.io/signoz/pkg/web"
"go.signoz.io/signoz/pkg/web/routerweb"
"go.uber.org/zap"
)
type SigNoz struct {
Cache cache.Cache
Web web.Web
Cache cache.Cache
Web web.Web
SQLStore sqlstore.SQLStore
SQLStoreMigrator sqlstore.SQLStoreMigrator
}
func New(config *config.Config, skipWebFrontend bool) (*SigNoz, error) {
var cache cache.Cache
func New(ctx context.Context, instrumentation instrumentation.Instrumentation, config Config, factories ProviderFactories) (*SigNoz, error) {
providerSettings := instrumentation.ToProviderSettings()
// init for the cache
switch config.Cache.Provider {
case "memory":
cache = memorycache.New(&config.Cache.Memory)
case "redis":
cache = rediscache.New(&config.Cache.Redis)
cache, err := factory.NewFromFactory(ctx, providerSettings, config.Cache, factories.CacheProviderFactories, config.Cache.Provider)
if err != nil {
return nil, err
}
web, err := routerweb.New(zap.L(), config.Web)
if err != nil && !skipWebFrontend {
web, err := factory.NewFromFactory(ctx, providerSettings, config.Web, factories.WebProviderFactories, config.Web.GetProvider())
if err != nil {
return nil, err
}
sqlStore, err := factory.NewFromFactory(ctx, providerSettings, config.SQLStore, factories.SQLStoreProviderFactories, config.SQLStore.Provider)
if err != nil {
return nil, err
}
migrations, err := sqlstoremigrator.NewMigrations(ctx, providerSettings, config.SQLStore, factories.SQLStoreMigrationFactories)
if err != nil {
return nil, err
}
sqlStoreMigrator := sqlstoremigrator.New(ctx, providerSettings, sqlStore, migrations, config.SQLStore)
err = sqlStoreMigrator.Migrate(ctx)
if err != nil {
return nil, err
}
return &SigNoz{
Cache: cache,
Web: web,
Cache: cache,
Web: web,
SQLStore: sqlStore,
}, nil
}

View File

@@ -1,41 +0,0 @@
package sqlmigrator
import (
"errors"
"time"
"go.signoz.io/signoz/pkg/factory"
)
type Config struct {
// Lock is the lock configuration.
Lock Lock `mapstructure:"lock"`
}
type Lock struct {
// Timeout is the time to wait for the migration lock.
Timeout time.Duration `mapstructure:"timeout"`
// Interval is the interval to try to acquire the migration lock.
Interval time.Duration `mapstructure:"interval"`
}
func NewConfigFactory() factory.ConfigFactory {
return factory.NewConfigFactory(factory.MustNewName("sqlmigrator"), newConfig)
}
func newConfig() factory.Config {
return Config{
Lock: Lock{
Timeout: 2 * time.Minute,
Interval: 10 * time.Second,
},
}
}
func (c Config) Validate() error {
if c.Lock.Timeout < c.Lock.Interval {
return errors.New("lock_timeout must be greater than lock_interval")
}
return nil
}

View File

@@ -1,48 +0,0 @@
package sqlmigrator
import (
"context"
"database/sql/driver"
"testing"
"time"
"github.com/DATA-DOG/go-sqlmock"
"github.com/stretchr/testify/require"
"go.signoz.io/signoz/pkg/factory"
"go.signoz.io/signoz/pkg/instrumentation/instrumentationtest"
"go.signoz.io/signoz/pkg/sqlstore"
"go.signoz.io/signoz/pkg/sqlstore/sqlstoretest"
)
func TestMigratorWithSqliteAndNoopMigration(t *testing.T) {
ctx := context.Background()
sqlstoreConfig := sqlstore.Config{
Provider: "sqlite",
}
migrationConfig := Config{
Lock: Lock{
Timeout: 10 * time.Second,
Interval: 1 * time.Second,
},
}
providerSettings := instrumentationtest.New().ToProviderSettings()
sqlstore := sqlstoretest.New(sqlstoreConfig, sqlmock.QueryMatcherRegexp)
migrator := New(
ctx,
providerSettings,
sqlstore,
MustNewMigrations(ctx, providerSettings, migrationConfig, factory.MustNewNamedMap(NoopMigrationFactory())),
migrationConfig,
)
sqlstore.Mock().ExpectExec("CREATE TABLE IF NOT EXISTS migration (.+)").WillReturnResult(driver.ResultNoRows)
sqlstore.Mock().ExpectExec("CREATE TABLE IF NOT EXISTS migration_lock (.+)").WillReturnResult(driver.ResultNoRows)
sqlstore.Mock().ExpectQuery("INSERT INTO migration_lock (.+)").WillReturnRows(sqlstore.Mock().NewRows([]string{"id"}).AddRow(1))
sqlstore.Mock().ExpectQuery("(.+) FROM migration").WillReturnRows(sqlstore.Mock().NewRows([]string{"id"}).AddRow(1))
sqlstore.Mock().ExpectQuery("INSERT INTO migration (.+)").WillReturnRows(sqlstore.Mock().NewRows([]string{"id", "migrated_at"}).AddRow(1, time.Now()))
err := migrator.Migrate(ctx)
require.NoError(t, err)
}

View File

@@ -1,27 +0,0 @@
package sqlmigrator
import (
"context"
"github.com/uptrace/bun"
"github.com/uptrace/bun/migrate"
)
// SQLMigrator is the interface for the SQLMigrator.
type SQLMigrator interface {
// Migrate migrates the database. Migrate acquires a lock on the database and runs the migrations.
Migrate(context.Context) error
// Rollback rolls back the database. Rollback acquires a lock on the database and rolls back the migrations.
Rollback(context.Context) error
}
// SQLMigration is the interface for a single migration.
type SQLMigration interface {
// Register registers the migration with the given migrations. Each migration needs to be registered
//in a dedicated `*.go` file so that the correct migration semantics can be detected.
Register(*migrate.Migrations) error
// Up runs the migration.
Up(context.Context, *bun.DB) error
// Down rolls back the migration.
Down(context.Context, *bun.DB) error
}

Some files were not shown because too many files have changed in this diff Show More