Compare commits

...

8 Commits

Author SHA1 Message Date
grandwizard28
ecbe0b82de chore: remove utf-8 validation from main 2025-03-24 19:07:57 +05:30
grandwizard28
4085558281 feat(promengine): move directories 2025-03-24 18:53:25 +05:30
grandwizard28
93428970ab feat(promengine): move directories 2025-03-24 18:53:25 +05:30
grandwizard28
293047b9e6 feat(promengine): add utf-8 validation 2025-03-24 18:53:25 +05:30
grandwizard28
9272d2bb6d refactor: better readability 2025-03-24 18:53:25 +05:30
grandwizard28
22ecdc9a0c fix: fix a typo in telemetrystore 2025-03-24 18:53:25 +05:30
grandwizard28
3b1db1f6ea feat(promengine): add the deprecated flag 2025-03-24 18:53:24 +05:30
grandwizard28
0a1338bca9 feat(promengine): create a dedicated promengine package 2025-03-24 18:53:24 +05:30
36 changed files with 617 additions and 534 deletions

View File

@@ -74,6 +74,10 @@ go-run-enterprise: ## Runs the enterprise go backend server
--use-logs-new-schema true \ --use-logs-new-schema true \
--use-trace-new-schema true --use-trace-new-schema true
.PHONY: go-test
go-test: ## Runs go unit tests
@go test -race ./...
.PHONY: go-run-community .PHONY: go-run-community
go-run-community: ## Runs the community go backend server go-run-community: ## Runs the community go backend server
@SIGNOZ_INSTRUMENTATION_LOGS_LEVEL=debug \ @SIGNOZ_INSTRUMENTATION_LOGS_LEVEL=debug \

View File

@@ -72,7 +72,6 @@ sqlstore:
# The path to the SQLite database file. # The path to the SQLite database file.
path: /var/lib/signoz/signoz.db path: /var/lib/signoz/signoz.db
##################### APIServer ##################### ##################### APIServer #####################
apiserver: apiserver:
timeout: timeout:
@@ -91,20 +90,30 @@ apiserver:
- /api/v1/version - /api/v1/version
- / - /
##################### TelemetryStore ##################### ##################### TelemetryStore #####################
telemetrystore: telemetrystore:
# Specifies the telemetrystore provider to use.
provider: clickhouse
# Maximum number of idle connections in the connection pool. # Maximum number of idle connections in the connection pool.
max_idle_conns: 50 max_idle_conns: 50
# Maximum number of open connections to the database. # Maximum number of open connections to the database.
max_open_conns: 100 max_open_conns: 100
# Maximum time to wait for a connection to be established. # Maximum time to wait for a connection to be established.
dial_timeout: 5s dial_timeout: 5s
# Specifies the telemetrystore provider to use.
provider: clickhouse
clickhouse: clickhouse:
# The DSN to use for ClickHouse. # The DSN to use for clickhouse.
dsn: http://localhost:9000 dsn: tcp://localhost:9000
prometheus:
remote_read:
# The URL to use for remote read. It must be a clickhouse dsn pointing to the database where metrics are stored.
url: tcp://localhost:9000/signoz_metrics
active_query_tracker:
# Whether to enable the active query tracker.
enabled: true
# The path to use for the active query tracker.
path: ""
# The maximum number of concurrent queries.
max_concurrent: 20
##################### Alertmanager ##################### ##################### Alertmanager #####################
alertmanager: alertmanager:
@@ -117,7 +126,7 @@ alertmanager:
# The poll interval for periodically syncing the alertmanager with the config in the store. # The poll interval for periodically syncing the alertmanager with the config in the store.
poll_interval: 1m poll_interval: 1m
# The URL under which Alertmanager is externally reachable (for example, if Alertmanager is served via a reverse proxy). Used for generating relative and absolute links back to Alertmanager itself. # The URL under which Alertmanager is externally reachable (for example, if Alertmanager is served via a reverse proxy). Used for generating relative and absolute links back to Alertmanager itself.
external_url: http://localhost:9093 external_url: http://localhost:8080
# The global configuration for the alertmanager. All the exhaustive fields can be found in the upstream: https://github.com/prometheus/alertmanager/blob/efa05feffd644ba4accb526e98a8c6545d26a783/config/config.go#L833 # The global configuration for the alertmanager. All the exhaustive fields can be found in the upstream: https://github.com/prometheus/alertmanager/blob/efa05feffd644ba4accb526e98a8c6545d26a783/config/config.go#L833
global: global:
# ResolveTimeout is the time after which an alert is declared resolved if it has not been updated. # ResolveTimeout is the time after which an alert is declared resolved if it has not been updated.

View File

@@ -10,6 +10,7 @@ import (
"github.com/SigNoz/signoz/pkg/cache" "github.com/SigNoz/signoz/pkg/cache"
basechr "github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader" basechr "github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader"
"github.com/SigNoz/signoz/pkg/query-service/interfaces" "github.com/SigNoz/signoz/pkg/query-service/interfaces"
"github.com/SigNoz/signoz/pkg/telemetrystore"
) )
type ClickhouseReader struct { type ClickhouseReader struct {
@@ -20,8 +21,7 @@ type ClickhouseReader struct {
func NewDataConnector( func NewDataConnector(
localDB *sqlx.DB, localDB *sqlx.DB,
ch clickhouse.Conn, telemetryStore telemetrystore.TelemetryStore,
promConfigPath string,
lm interfaces.FeatureLookup, lm interfaces.FeatureLookup,
cluster string, cluster string,
useLogsNewSchema bool, useLogsNewSchema bool,
@@ -29,14 +29,10 @@ func NewDataConnector(
fluxIntervalForTraceDetail time.Duration, fluxIntervalForTraceDetail time.Duration,
cache cache.Cache, cache cache.Cache,
) *ClickhouseReader { ) *ClickhouseReader {
chReader := basechr.NewReader(localDB, ch, promConfigPath, lm, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache) chReader := basechr.NewReader(localDB, telemetryStore, lm, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache)
return &ClickhouseReader{ return &ClickhouseReader{
conn: ch, conn: telemetryStore.ClickhouseDB(),
appdb: localDB, appdb: localDB,
ClickHouseReader: chReader, ClickHouseReader: chReader,
} }
} }
func (r *ClickhouseReader) Start(readerReady chan bool) {
r.ClickHouseReader.Start(readerReady)
}

View File

@@ -18,13 +18,13 @@ import (
"github.com/SigNoz/signoz/ee/query-service/constants" "github.com/SigNoz/signoz/ee/query-service/constants"
"github.com/SigNoz/signoz/ee/query-service/dao" "github.com/SigNoz/signoz/ee/query-service/dao"
"github.com/SigNoz/signoz/ee/query-service/integrations/gateway" "github.com/SigNoz/signoz/ee/query-service/integrations/gateway"
"github.com/SigNoz/signoz/ee/query-service/interfaces"
"github.com/SigNoz/signoz/ee/query-service/rules" "github.com/SigNoz/signoz/ee/query-service/rules"
"github.com/SigNoz/signoz/pkg/alertmanager" "github.com/SigNoz/signoz/pkg/alertmanager"
"github.com/SigNoz/signoz/pkg/http/middleware" "github.com/SigNoz/signoz/pkg/http/middleware"
"github.com/SigNoz/signoz/pkg/query-service/auth" "github.com/SigNoz/signoz/pkg/query-service/auth"
"github.com/SigNoz/signoz/pkg/signoz" "github.com/SigNoz/signoz/pkg/signoz"
"github.com/SigNoz/signoz/pkg/sqlstore" "github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/types" "github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/authtypes" "github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/web" "github.com/SigNoz/signoz/pkg/web"
@@ -49,7 +49,6 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/healthcheck" "github.com/SigNoz/signoz/pkg/query-service/healthcheck"
baseint "github.com/SigNoz/signoz/pkg/query-service/interfaces" baseint "github.com/SigNoz/signoz/pkg/query-service/interfaces"
basemodel "github.com/SigNoz/signoz/pkg/query-service/model" basemodel "github.com/SigNoz/signoz/pkg/query-service/model"
pqle "github.com/SigNoz/signoz/pkg/query-service/pqlEngine"
baserules "github.com/SigNoz/signoz/pkg/query-service/rules" baserules "github.com/SigNoz/signoz/pkg/query-service/rules"
"github.com/SigNoz/signoz/pkg/query-service/telemetry" "github.com/SigNoz/signoz/pkg/query-service/telemetry"
"github.com/SigNoz/signoz/pkg/query-service/utils" "github.com/SigNoz/signoz/pkg/query-service/utils"
@@ -137,18 +136,15 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
// set license manager as feature flag provider in dao // set license manager as feature flag provider in dao
modelDao.SetFlagProvider(lm) modelDao.SetFlagProvider(lm)
readerReady := make(chan bool)
fluxIntervalForTraceDetail, err := time.ParseDuration(serverOptions.FluxIntervalForTraceDetail) fluxIntervalForTraceDetail, err := time.ParseDuration(serverOptions.FluxIntervalForTraceDetail)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var reader interfaces.DataConnector reader := db.NewDataConnector(
qb := db.NewDataConnector(
serverOptions.SigNoz.SQLStore.SQLxDB(), serverOptions.SigNoz.SQLStore.SQLxDB(),
serverOptions.SigNoz.TelemetryStore.ClickHouseDB(), serverOptions.SigNoz.TelemetryStore,
serverOptions.PromConfigPath,
lm, lm,
serverOptions.Cluster, serverOptions.Cluster,
serverOptions.UseLogsNewSchema, serverOptions.UseLogsNewSchema,
@@ -156,8 +152,6 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
fluxIntervalForTraceDetail, fluxIntervalForTraceDetail,
serverOptions.SigNoz.Cache, serverOptions.SigNoz.Cache,
) )
go qb.Start(readerReady)
reader = qb
skipConfig := &basemodel.SkipConfig{} skipConfig := &basemodel.SkipConfig{}
if serverOptions.SkipTopLvlOpsPath != "" { if serverOptions.SkipTopLvlOpsPath != "" {
@@ -176,9 +170,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
c = cache.NewCache(cacheOpts) c = cache.NewCache(cacheOpts)
} }
<-readerReady
rm, err := makeRulesManager( rm, err := makeRulesManager(
serverOptions.PromConfigPath,
serverOptions.RuleRepoURL, serverOptions.RuleRepoURL,
serverOptions.SigNoz.SQLStore.SQLxDB(), serverOptions.SigNoz.SQLStore.SQLxDB(),
reader, reader,
@@ -189,6 +181,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
serverOptions.UseTraceNewSchema, serverOptions.UseTraceNewSchema,
serverOptions.SigNoz.Alertmanager, serverOptions.SigNoz.Alertmanager,
serverOptions.SigNoz.SQLStore, serverOptions.SigNoz.SQLStore,
serverOptions.SigNoz.TelemetryStore,
) )
if err != nil { if err != nil {
@@ -233,7 +226,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
} }
// start the usagemanager // start the usagemanager
usageManager, err := usage.New(modelDao, lm.GetRepo(), serverOptions.SigNoz.TelemetryStore.ClickHouseDB(), serverOptions.Config.TelemetryStore.ClickHouse.DSN) usageManager, err := usage.New(modelDao, lm.GetRepo(), serverOptions.SigNoz.TelemetryStore.ClickhouseDB(), serverOptions.Config.TelemetryStore.Clickhouse.DSN)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -304,7 +297,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
&opAmpModel.AllAgents, agentConfMgr, &opAmpModel.AllAgents, agentConfMgr,
) )
errorList := qb.PreloadMetricsMetadata(context.Background()) errorList := reader.PreloadMetricsMetadata(context.Background())
for _, er := range errorList { for _, er := range errorList {
zap.L().Error("failed to preload metrics metadata", zap.Error(er)) zap.L().Error("failed to preload metrics metadata", zap.Error(er))
} }
@@ -538,7 +531,6 @@ func (s *Server) Stop() error {
} }
func makeRulesManager( func makeRulesManager(
promConfigPath,
ruleRepoURL string, ruleRepoURL string,
db *sqlx.DB, db *sqlx.DB,
ch baseint.Reader, ch baseint.Reader,
@@ -549,16 +541,11 @@ func makeRulesManager(
useTraceNewSchema bool, useTraceNewSchema bool,
alertmanager alertmanager.Alertmanager, alertmanager alertmanager.Alertmanager,
sqlstore sqlstore.SQLStore, sqlstore sqlstore.SQLStore,
telemetryStore telemetrystore.TelemetryStore,
) (*baserules.Manager, error) { ) (*baserules.Manager, error) {
// create engine
pqle, err := pqle.FromConfigPath(promConfigPath)
if err != nil {
return nil, fmt.Errorf("failed to create pql engine : %v", err)
}
// create manager opts // create manager opts
managerOpts := &baserules.ManagerOptions{ managerOpts := &baserules.ManagerOptions{
PqlEngine: pqle, TelemetryStore: telemetryStore,
RepoURL: ruleRepoURL, RepoURL: ruleRepoURL,
DBConn: db, DBConn: db,
Context: context.Background(), Context: context.Background(),

View File

@@ -7,6 +7,5 @@ import (
// Connector defines methods for interaction // Connector defines methods for interaction
// with o11y data. for example - clickhouse // with o11y data. for example - clickhouse
type DataConnector interface { type DataConnector interface {
Start(readerReady chan bool)
baseint.Reader baseint.Reader
} }

View File

@@ -16,8 +16,6 @@ import (
"github.com/SigNoz/signoz/pkg/types/authtypes" "github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/version" "github.com/SigNoz/signoz/pkg/version"
prommodel "github.com/prometheus/common/model"
"go.uber.org/zap" "go.uber.org/zap"
"go.uber.org/zap/zapcore" "go.uber.org/zap/zapcore"
) )
@@ -30,10 +28,6 @@ func initZapLog() *zap.Logger {
return logger return logger
} }
func init() {
prommodel.NameValidationScheme = prommodel.UTF8Validation
}
func main() { func main() {
var promConfigPath, skipTopLvlOpsPath string var promConfigPath, skipTopLvlOpsPath string
@@ -87,6 +81,7 @@ func main() {
MaxIdleConns: maxIdleConns, MaxIdleConns: maxIdleConns,
MaxOpenConns: maxOpenConns, MaxOpenConns: maxOpenConns,
DialTimeout: dialTimeout, DialTimeout: dialTimeout,
Config: promConfigPath,
}) })
if err != nil { if err != nil {
zap.L().Fatal("Failed to create config", zap.Error(err)) zap.L().Fatal("Failed to create config", zap.Error(err))
@@ -103,7 +98,7 @@ func main() {
signoz.NewTelemetryStoreProviderFactories(), signoz.NewTelemetryStoreProviderFactories(),
) )
if err != nil { if err != nil {
zap.L().Fatal("Failed to create signoz struct", zap.Error(err)) zap.L().Fatal("Failed to create signoz", zap.Error(err))
} }
jwtSecret := os.Getenv("SIGNOZ_JWT_SECRET") jwtSecret := os.Getenv("SIGNOZ_JWT_SECRET")

View File

@@ -48,7 +48,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
opts.Rule, opts.Rule,
opts.Logger, opts.Logger,
opts.Reader, opts.Reader,
opts.ManagerOpts.PqlEngine, opts.ManagerOpts.TelemetryStore.PrometheusEngine(),
baserules.WithSQLStore(opts.SQLStore), baserules.WithSQLStore(opts.SQLStore),
) )
@@ -145,7 +145,7 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, *basemodel.Ap
parsedRule, parsedRule,
opts.Logger, opts.Logger,
opts.Reader, opts.Reader,
opts.ManagerOpts.PqlEngine, opts.ManagerOpts.TelemetryStore.PrometheusEngine(),
baserules.WithSendAlways(), baserules.WithSendAlways(),
baserules.WithSendUnmatched(), baserules.WithSendUnmatched(),
baserules.WithSQLStore(opts.SQLStore), baserules.WithSQLStore(opts.SQLStore),

4
go.mod
View File

@@ -34,7 +34,6 @@ require (
github.com/knadh/koanf/v2 v2.1.1 github.com/knadh/koanf/v2 v2.1.1
github.com/mailru/easyjson v0.7.7 github.com/mailru/easyjson v0.7.7
github.com/mattn/go-sqlite3 v1.14.24 github.com/mattn/go-sqlite3 v1.14.24
github.com/oklog/oklog v0.3.2
github.com/open-telemetry/opamp-go v0.5.0 github.com/open-telemetry/opamp-go v0.5.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.111.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.111.0
github.com/opentracing/opentracing-go v1.2.0 github.com/opentracing/opentracing-go v1.2.0
@@ -93,6 +92,7 @@ require (
github.com/armon/go-metrics v0.4.1 // indirect github.com/armon/go-metrics v0.4.1 // indirect
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect
github.com/aws/aws-sdk-go v1.55.5 // indirect github.com/aws/aws-sdk-go v1.55.5 // indirect
github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3 // indirect
github.com/beevik/etree v1.1.0 // indirect github.com/beevik/etree v1.1.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect github.com/beorn7/perks v1.0.1 // indirect
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
@@ -132,6 +132,7 @@ require (
github.com/golang/protobuf v1.5.4 // indirect github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
github.com/google/btree v1.0.1 // indirect github.com/google/btree v1.0.1 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/s2a-go v0.1.8 // indirect github.com/google/s2a-go v0.1.8 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect
github.com/gopherjs/gopherjs v1.17.2 // indirect github.com/gopherjs/gopherjs v1.17.2 // indirect
@@ -262,6 +263,7 @@ require (
go.opentelemetry.io/otel/sdk/metric v1.30.0 // indirect go.opentelemetry.io/otel/sdk/metric v1.30.0 // indirect
go.opentelemetry.io/proto/otlp v1.3.1 // indirect go.opentelemetry.io/proto/otlp v1.3.1 // indirect
go.uber.org/atomic v1.11.0 // indirect go.uber.org/atomic v1.11.0 // indirect
go.uber.org/goleak v1.3.0 // indirect
golang.org/x/mod v0.22.0 // indirect golang.org/x/mod v0.22.0 // indirect
golang.org/x/net v0.33.0 // indirect golang.org/x/net v0.33.0 // indirect
golang.org/x/sys v0.29.0 // indirect golang.org/x/sys v0.29.0 // indirect

2
go.sum
View File

@@ -690,8 +690,6 @@ github.com/npillmayer/nestext v0.1.3/go.mod h1:h2lrijH8jpicr25dFY+oAJLyzlya6jhnu
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/oklog/oklog v0.3.2 h1:wVfs8F+in6nTBMkA7CbRw+zZMIB7nNM825cM1wuzoTk=
github.com/oklog/oklog v0.3.2/go.mod h1:FCV+B7mhrz4o+ueLpx+KqkyXRGMWOYEvfiXtdGtbWGs=
github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA=
github.com/oklog/run v1.1.0 h1:GEenZ1cK0+q0+wsJew9qUg/DyD8k3JzYsZAi5gYi2mA= github.com/oklog/run v1.1.0 h1:GEenZ1cK0+q0+wsJew9qUg/DyD8k3JzYsZAi5gYi2mA=
github.com/oklog/run v1.1.0/go.mod h1:sVPdnTZT1zYwAJeCMu2Th4T21pA3FPOQRfWjQlk7DVU= github.com/oklog/run v1.1.0/go.mod h1:sVPdnTZT1zYwAJeCMu2Th4T21pA3FPOQRfWjQlk7DVU=

View File

@@ -1,10 +1,15 @@
package instrumentation package instrumentation
import ( import (
"context"
"fmt"
"log/slog" "log/slog"
"os" "os"
"time"
"github.com/SigNoz/signoz/pkg/instrumentation/loghandler" "github.com/SigNoz/signoz/pkg/instrumentation/loghandler"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
) )
func NewLogger(config Config, wrappers ...loghandler.Wrapper) *slog.Logger { func NewLogger(config Config, wrappers ...loghandler.Wrapper) *slog.Logger {
@@ -33,3 +38,65 @@ func NewLogger(config Config, wrappers ...loghandler.Wrapper) *slog.Logger {
return logger return logger
} }
// Compile-time check that gokitLogger satisfies the go-kit log.Logger interface.
var _ log.Logger = (*gokitLogger)(nil)

// gokitLogger adapts a slog.Handler to the go-kit log.Logger interface.
type gokitLogger struct {
	handler slog.Handler
	// messageKey is the go-kit key whose value becomes the slog record message.
	messageKey string
}

// NewGoKitLoggerFromSlogHandler returns a go-kit log.Logger that forwards log
// calls to the given slog.Handler. Key/value pairs whose key equals messageKey
// are used as the record message rather than emitted as an attribute.
func NewGoKitLoggerFromSlogHandler(handler slog.Handler, messageKey string) log.Logger {
	return &gokitLogger{handler, messageKey}
}
// Log implements go-kit's log.Logger by translating keyvals into a slog.Record
// and handing it to the underlying slog.Handler.
//
// Translation rules:
//   - a pair whose key equals l.messageKey becomes the record message;
//   - a value of type level.Value sets the record's slog level;
//   - every other pair becomes a slog attribute (non-string keys are
//     stringified with fmt.Sprint);
//   - a dangling key (odd keyvals length) is kept with a "(MISSING)" value
//     instead of being silently dropped.
//
// If no level is supplied, the zero slog.Level (LevelInfo) is used. Records
// below the handler's enabled level are discarded.
func (l *gokitLogger) Log(keyvals ...any) error {
	var (
		attrs   []slog.Attr
		message string
		gkl     level.Value
	)

	for i := 0; i < len(keyvals); i += 2 {
		// go-kit/log keys don't have to be strings, but slog keys do.
		// Convert the go-kit key to a string with fmt.Sprint.
		key, ok := keyvals[i].(string)
		if !ok {
			key = fmt.Sprint(keyvals[i])
		}

		// A key without a value: keep it visible rather than dropping it.
		if i+1 >= len(keyvals) {
			attrs = append(attrs, slog.Any(key, "(MISSING)"))
			break
		}
		val := keyvals[i+1]

		if l.messageKey != "" && key == l.messageKey {
			message = fmt.Sprint(val)
			continue
		}
		// Avoid shadowing the receiver l here.
		if lv, ok := val.(level.Value); ok {
			gkl = lv
			continue
		}
		attrs = append(attrs, slog.Any(key, val))
	}

	// Map the go-kit level onto slog; a nil gkl matches no case, leaving the
	// zero value slog.LevelInfo in place.
	var sl slog.Level
	switch gkl {
	case level.DebugValue():
		sl = slog.LevelDebug
	case level.InfoValue():
		sl = slog.LevelInfo
	case level.WarnValue():
		sl = slog.LevelWarn
	case level.ErrorValue():
		sl = slog.LevelError
	}

	if !l.handler.Enabled(context.Background(), sl) {
		return nil
	}

	r := slog.NewRecord(time.Now(), sl, message, 0)
	r.AddAttrs(attrs...)
	return l.handler.Handle(context.Background(), r)
}

18
pkg/promengine/config.go Normal file
View File

@@ -0,0 +1,18 @@
package promengine
import "net/url"
// ActiveQueryTrackerConfig configures prometheus's active query tracker,
// which journals in-flight queries to disk so they can be identified after
// an unclean shutdown, and bounds query concurrency.
type ActiveQueryTrackerConfig struct {
	// Enabled turns the active query tracker on or off.
	Enabled bool `mapstructure:"enabled"`
	// Path is the directory where the tracker stores its query log.
	Path string `mapstructure:"path"`
	// MaxConcurrent is the maximum number of concurrent queries.
	MaxConcurrent int `mapstructure:"max_concurrent"`
}
// RemoteReadConfig configures prometheus remote read. Per the config file,
// URL must be a clickhouse DSN pointing to the database where metrics are
// stored.
type RemoteReadConfig struct {
	URL *url.URL `mapstructure:"url"`
}
// Config is the top-level promengine configuration, mirroring the
// `prometheus` section of the configuration file.
type Config struct {
	RemoteReadConfig RemoteReadConfig `mapstructure:"remote_read"`
	ActiveQueryTrackerConfig ActiveQueryTrackerConfig `mapstructure:"active_query_tracker"`
}

87
pkg/promengine/ext.go Normal file
View File

@@ -0,0 +1,87 @@
package promengine
import (
"fmt"
"log/slog"
"time"
"github.com/SigNoz/signoz/pkg/instrumentation"
commoncfg "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/remote"
)
// Compile-time check that ExtEngine implements PromEngine.
var _ PromEngine = (*ExtEngine)(nil)

// ExtEngine is a PromEngine that evaluates PromQL against samples fetched
// through prometheus remote read, via a fanout storage.
type ExtEngine struct {
	engine *promql.Engine
	fanoutStorage storage.Storage
}
// New constructs an ExtEngine from the given config.
//
// It wires a promql engine (optionally with an active query tracker) to a
// fanout storage whose only backend is prometheus remote read, pointed at
// cfg.RemoteReadConfig.URL. The logger is required because it is threaded
// into the engine, the query tracker and the remote storage.
func New(logger *slog.Logger, cfg Config) (*ExtEngine, error) {
	if logger == nil {
		return nil, fmt.Errorf("logger is required")
	}

	gokitLogger := instrumentation.NewGoKitLoggerFromSlogHandler(logger.Handler(), "msg")

	var activeQueryTracker promql.QueryTracker
	if cfg.ActiveQueryTrackerConfig.Enabled {
		activeQueryTracker = promql.NewActiveQueryTracker(
			cfg.ActiveQueryTrackerConfig.Path,
			cfg.ActiveQueryTrackerConfig.MaxConcurrent,
			gokitLogger,
		)
	}

	engine := promql.NewEngine(promql.EngineOpts{
		Logger:             gokitLogger,
		Reg:                nil,
		MaxSamples:         50000000,
		Timeout:            2 * time.Minute,
		ActiveQueryTracker: activeQueryTracker,
	})

	remoteStorage := remote.NewStorage(
		gokitLogger,
		nil,
		// Start-time callback: model.Latest means reads have no lower bound.
		func() (int64, error) { return int64(model.Latest), nil },
		"",
		1*time.Minute,
		nil,
		false,
	)
	fanoutStorage := storage.NewFanout(gokitLogger, remoteStorage)

	// Apply prometheus's defaults first: Config.UnmarshalYAML resets the
	// whole struct to DefaultConfig before invoking the unmarshal func, so
	// calling it with a no-op func yields a defaulted config. The remote read
	// section must be set AFTER this call — setting it before (as a literal)
	// would be wiped by the reset and remote read would never be configured.
	// Named promCfg so the imported config package is not shadowed.
	promCfg := &config.Config{}
	if err := promCfg.UnmarshalYAML(func(any) error { return nil }); err != nil {
		return nil, err
	}
	promCfg.RemoteReadConfigs = []*config.RemoteReadConfig{
		{
			URL: &commoncfg.URL{URL: cfg.RemoteReadConfig.URL},
		},
	}

	if err := remoteStorage.ApplyConfig(promCfg); err != nil {
		return nil, err
	}

	return &ExtEngine{
		engine:        engine,
		fanoutStorage: fanoutStorage,
	}, nil
}
// Engine returns the underlying promql engine.
func (engine *ExtEngine) Engine() *promql.Engine {
	return engine.engine
}

// Storage returns the remote-read fanout storage that queries run against.
func (engine *ExtEngine) Storage() storage.Storage {
	return engine.fanoutStorage
}

View File

@@ -0,0 +1,20 @@
package promengine
import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"
)
// PromEngine bundles the two pieces needed to evaluate PromQL: the query
// engine itself and the storage it reads samples from.
type PromEngine interface {
	// Engine returns the underlying promql engine
	Engine() *promql.Engine
	// Storage returns the underlying storage
	Storage() storage.Storage
}
// init initializes the prometheus model with UTF8 validation.
// NOTE: this mutates process-wide state in prometheus/common/model and
// therefore affects every user of that package in the binary; it was moved
// here from main (see the "remove utf-8 validation from main" commit) so any
// importer of promengine gets it automatically.
func init() {
	model.NameValidationScheme = model.UTF8Validation
}

View File

@@ -0,0 +1,46 @@
package promenginetest
import (
"time"
"github.com/SigNoz/signoz/pkg/promengine"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"
)
// Compile-time check that Engine implements promengine.PromEngine.
var _ promengine.PromEngine = (*Engine)(nil)

// Engine is an in-process PromEngine for tests, backed by a throwaway tsdb
// (see NewStorageWithError) instead of remote read.
type Engine struct {
	engine *promql.Engine
	storage storage.Storage
}
// New returns a test Engine backed by a fresh temporary tsdb. The optional
// outOfOrderTimeWindow is forwarded to NewStorageWithError to permit
// out-of-order sample ingestion.
func New(outOfOrderTimeWindow ...int64) (*Engine, error) {
	engine := promql.NewEngine(promql.EngineOpts{
		Logger:             nil,
		Reg:                nil,
		MaxSamples:         50000000,
		Timeout:            2 * time.Minute,
		ActiveQueryTracker: nil,
	})

	testStorage, err := NewStorageWithError(outOfOrderTimeWindow...)
	if err != nil {
		return nil, err
	}

	// NOTE(review): NewFanout receives a nil logger here; that is only safe
	// if the fanout never logs — confirm, or pass a nop logger.
	fanoutStorage := storage.NewFanout(nil, testStorage)

	return &Engine{
		engine:  engine,
		storage: fanoutStorage,
	}, nil
}
// Engine returns the underlying promql engine.
func (e *Engine) Engine() *promql.Engine {
	return e.engine
}

// Storage returns the fanout storage wrapping the test tsdb.
func (e *Engine) Storage() storage.Storage {
	return e.storage
}

View File

@@ -0,0 +1,76 @@
package promenginetest
import (
"fmt"
"os"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
)
// TestStorage wraps a throwaway on-disk tsdb together with an in-memory
// exemplar storage; it appears adapted from prometheus's util/teststorage —
// confirm against upstream.
type TestStorage struct {
	*tsdb.DB
	exemplarStorage tsdb.ExemplarStorage
	// dir is the temporary directory backing the tsdb; removed on Close.
	dir string
}
// NewStorageWithError returns a new TestStorage for user facing tests, which reports
// errors directly.
// NewStorageWithError returns a new TestStorage for user facing tests, which
// reports errors directly.
//
// The storage lives in a fresh temporary directory that is removed by Close.
// The optional outOfOrderTimeWindow enables out-of-order sample ingestion.
func NewStorageWithError(outOfOrderTimeWindow ...int64) (*TestStorage, error) {
	dir, err := os.MkdirTemp("", "test_storage")
	if err != nil {
		return nil, fmt.Errorf("opening test directory: %w", err)
	}

	// Tests just load data for a series sequentially. Thus we
	// need a long appendable window.
	opts := tsdb.DefaultOptions()
	opts.MinBlockDuration = int64(24 * time.Hour / time.Millisecond)
	opts.MaxBlockDuration = int64(24 * time.Hour / time.Millisecond)
	opts.RetentionDuration = 0
	opts.EnableNativeHistograms = true

	// Set OutOfOrderTimeWindow only when provided; the zero default from
	// DefaultOptions already disables out-of-order ingestion.
	if len(outOfOrderTimeWindow) > 0 {
		opts.OutOfOrderTimeWindow = outOfOrderTimeWindow[0]
	}

	db, err := tsdb.Open(dir, nil, nil, opts, tsdb.NewDBStats())
	if err != nil {
		os.RemoveAll(dir) // don't leak the temp directory on failure
		return nil, fmt.Errorf("opening test storage: %w", err)
	}

	reg := prometheus.NewRegistry()
	eMetrics := tsdb.NewExemplarMetrics(reg)
	es, err := tsdb.NewCircularExemplarStorage(10, eMetrics)
	if err != nil {
		db.Close()        // best-effort: release the opened tsdb
		os.RemoveAll(dir) // don't leak the temp directory on failure
		return nil, fmt.Errorf("opening test exemplar storage: %w", err)
	}

	return &TestStorage{DB: db, exemplarStorage: es, dir: dir}, nil
}
// Close shuts the tsdb down and then removes its temporary directory.
func (s TestStorage) Close() error {
	if err := s.DB.Close(); err != nil {
		return err
	}
	return os.RemoveAll(s.dir)
}

// ExemplarAppender returns the storage itself; exemplars appended through it
// land in the in-memory exemplar storage (see AppendExemplar).
func (s TestStorage) ExemplarAppender() storage.ExemplarAppender {
	return s
}

// ExemplarQueryable exposes the in-memory exemplar storage for queries.
func (s TestStorage) ExemplarQueryable() storage.ExemplarQueryable {
	return s.exemplarStorage
}

// AppendExemplar adds e for the series labelled l, returning ref unchanged.
func (s TestStorage) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
	return ref, s.exemplarStorage.AddExemplar(l, e)
}

View File

@@ -7,7 +7,6 @@ import (
"fmt" "fmt"
"math" "math"
"math/rand" "math/rand"
"os"
"reflect" "reflect"
"regexp" "regexp"
"sort" "sort"
@@ -16,20 +15,16 @@ import (
"sync" "sync"
"time" "time"
"github.com/SigNoz/signoz/pkg/promengine"
"github.com/SigNoz/signoz/pkg/query-service/model/metrics_explorer" "github.com/SigNoz/signoz/pkg/query-service/model/metrics_explorer"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/mailru/easyjson" "github.com/mailru/easyjson"
"github.com/oklog/oklog/pkg/group"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/prometheus/common/promlog"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/util/stats" "github.com/prometheus/prometheus/util/stats"
"github.com/ClickHouse/clickhouse-go/v2" "github.com/ClickHouse/clickhouse-go/v2"
@@ -38,7 +33,6 @@ import (
"github.com/SigNoz/signoz/pkg/types/authtypes" "github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
promModel "github.com/prometheus/common/model"
"go.uber.org/zap" "go.uber.org/zap"
queryprogress "github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader/query_progress" queryprogress "github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader/query_progress"
@@ -120,6 +114,7 @@ var (
// SpanWriter for reading spans from ClickHouse // SpanWriter for reading spans from ClickHouse
type ClickHouseReader struct { type ClickHouseReader struct {
db clickhouse.Conn db clickhouse.Conn
engine promengine.PromEngine
localDB *sqlx.DB localDB *sqlx.DB
TraceDB string TraceDB string
operationsTable string operationsTable string
@@ -138,9 +133,6 @@ type ClickHouseReader struct {
logsAttributeKeys string logsAttributeKeys string
logsResourceKeys string logsResourceKeys string
logsTagAttributeTableV2 string logsTagAttributeTableV2 string
queryEngine *promql.Engine
remoteStorage *remote.Storage
fanoutStorage *storage.Storage
queryProgressTracker queryprogress.QueryProgressTracker queryProgressTracker queryprogress.QueryProgressTracker
logsTableV2 string logsTableV2 string
@@ -175,8 +167,7 @@ type ClickHouseReader struct {
// NewTraceReader returns a TraceReader for the database // NewTraceReader returns a TraceReader for the database
func NewReader( func NewReader(
localDB *sqlx.DB, localDB *sqlx.DB,
db driver.Conn, telemetryStore telemetrystore.TelemetryStore,
configFile string,
featureFlag interfaces.FeatureLookup, featureFlag interfaces.FeatureLookup,
cluster string, cluster string,
useLogsNewSchema bool, useLogsNewSchema bool,
@@ -185,14 +176,13 @@ func NewReader(
cache cache.Cache, cache cache.Cache,
) *ClickHouseReader { ) *ClickHouseReader {
options := NewOptions(primaryNamespace, archiveNamespace) options := NewOptions(primaryNamespace, archiveNamespace)
return NewReaderFromClickhouseConnection(db, options, localDB, configFile, featureFlag, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache) return NewReaderFromClickhouseConnection(options, localDB, telemetryStore, featureFlag, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache)
} }
func NewReaderFromClickhouseConnection( func NewReaderFromClickhouseConnection(
db driver.Conn,
options *Options, options *Options,
localDB *sqlx.DB, localDB *sqlx.DB,
configFile string, telemetryStore telemetrystore.TelemetryStore,
featureFlag interfaces.FeatureLookup, featureFlag interfaces.FeatureLookup,
cluster string, cluster string,
useLogsNewSchema bool, useLogsNewSchema bool,
@@ -215,7 +205,8 @@ func NewReaderFromClickhouseConnection(
} }
return &ClickHouseReader{ return &ClickHouseReader{
db: db, db: telemetryStore.ClickhouseDB(),
engine: telemetryStore.PrometheusEngine(),
localDB: localDB, localDB: localDB,
TraceDB: options.primary.TraceDB, TraceDB: options.primary.TraceDB,
operationsTable: options.primary.OperationsTable, operationsTable: options.primary.OperationsTable,
@@ -235,7 +226,6 @@ func NewReaderFromClickhouseConnection(
logsResourceKeys: options.primary.LogsResourceKeysTable, logsResourceKeys: options.primary.LogsResourceKeysTable,
logsTagAttributeTableV2: options.primary.LogsTagAttributeTableV2, logsTagAttributeTableV2: options.primary.LogsTagAttributeTableV2,
liveTailRefreshSeconds: options.primary.LiveTailRefreshSeconds, liveTailRefreshSeconds: options.primary.LiveTailRefreshSeconds,
promConfigFile: configFile,
featureFlags: featureFlag, featureFlags: featureFlag,
cluster: cluster, cluster: cluster,
queryProgressTracker: queryprogress.NewQueryProgressTracker(), queryProgressTracker: queryprogress.NewQueryProgressTracker(),
@@ -262,154 +252,8 @@ func NewReaderFromClickhouseConnection(
} }
} }
func (r *ClickHouseReader) Start(readerReady chan bool) {
logLevel := promlog.AllowedLevel{}
logLevel.Set("debug")
allowedFormat := promlog.AllowedFormat{}
allowedFormat.Set("logfmt")
promlogConfig := promlog.Config{
Level: &logLevel,
Format: &allowedFormat,
}
logger := promlog.New(&promlogConfig)
startTime := func() (int64, error) {
return int64(promModel.Latest), nil
}
remoteStorage := remote.NewStorage(
log.With(logger, "component", "remote"),
nil,
startTime,
"",
time.Duration(1*time.Minute),
nil,
false,
)
cfg := struct {
configFile string
localStoragePath string
lookbackDelta promModel.Duration
webTimeout promModel.Duration
queryTimeout promModel.Duration
queryConcurrency int
queryMaxSamples int
RemoteFlushDeadline promModel.Duration
prometheusURL string
logLevel promlog.AllowedLevel
}{
configFile: r.promConfigFile,
}
fanoutStorage := storage.NewFanout(logger, remoteStorage)
opts := promql.EngineOpts{
Logger: log.With(logger, "component", "query engine"),
Reg: nil,
MaxSamples: 50000000,
Timeout: time.Duration(2 * time.Minute),
ActiveQueryTracker: promql.NewActiveQueryTracker(
"",
20,
log.With(logger, "component", "activeQueryTracker"),
),
}
queryEngine := promql.NewEngine(opts)
reloaders := []func(cfg *config.Config) error{
remoteStorage.ApplyConfig,
}
// sync.Once is used to make sure we can close the channel at different execution stages(SIGTERM or when the config is loaded).
type closeOnce struct {
C chan struct{}
once sync.Once
Close func()
}
// Wait until the server is ready to handle reloading.
reloadReady := &closeOnce{
C: make(chan struct{}),
}
reloadReady.Close = func() {
reloadReady.once.Do(func() {
close(reloadReady.C)
})
}
var g group.Group
{
// Initial configuration loading.
cancel := make(chan struct{})
g.Add(
func() error {
var err error
r.promConfig, err = reloadConfig(cfg.configFile, logger, reloaders...)
if err != nil {
return fmt.Errorf("error loading config from %q: %s", cfg.configFile, err)
}
reloadReady.Close()
<-cancel
return nil
},
func(err error) {
close(cancel)
},
)
}
r.queryEngine = queryEngine
r.remoteStorage = remoteStorage
r.fanoutStorage = &fanoutStorage
readerReady <- true
if err := g.Run(); err != nil {
level.Error(logger).Log("err", err)
os.Exit(1)
}
}
func (r *ClickHouseReader) GetQueryEngine() *promql.Engine {
return r.queryEngine
}
func (r *ClickHouseReader) GetFanoutStorage() *storage.Storage {
return r.fanoutStorage
}
func reloadConfig(filename string, logger log.Logger, rls ...func(*config.Config) error) (promConfig *config.Config, err error) {
level.Info(logger).Log("msg", "Loading configuration file", "filename", filename)
conf, err := config.LoadFile(filename, false, false, logger)
if err != nil {
return nil, fmt.Errorf("couldn't load configuration (--config.file=%q): %v", filename, err)
}
failed := false
for _, rl := range rls {
if err := rl(conf); err != nil {
level.Error(logger).Log("msg", "Failed to apply configuration", "err", err)
failed = true
}
}
if failed {
return nil, fmt.Errorf("one or more errors occurred while applying the new configuration (--config.file=%q)", filename)
}
level.Info(logger).Log("msg", "Completed loading of configuration file", "filename", filename)
return conf, nil
}
func (r *ClickHouseReader) GetInstantQueryMetricsResult(ctx context.Context, queryParams *model.InstantQueryMetricsParams) (*promql.Result, *stats.QueryStats, *model.ApiError) { func (r *ClickHouseReader) GetInstantQueryMetricsResult(ctx context.Context, queryParams *model.InstantQueryMetricsParams) (*promql.Result, *stats.QueryStats, *model.ApiError) {
qry, err := r.queryEngine.NewInstantQuery(ctx, r.remoteStorage, nil, queryParams.Query, queryParams.Time) qry, err := r.engine.Engine().NewInstantQuery(ctx, r.engine.Storage(), nil, queryParams.Query, queryParams.Time)
if err != nil { if err != nil {
return nil, nil, &model.ApiError{Typ: model.ErrorBadData, Err: err} return nil, nil, &model.ApiError{Typ: model.ErrorBadData, Err: err}
} }
@@ -428,7 +272,7 @@ func (r *ClickHouseReader) GetInstantQueryMetricsResult(ctx context.Context, que
} }
func (r *ClickHouseReader) GetQueryRangeResult(ctx context.Context, query *model.QueryRangeParams) (*promql.Result, *stats.QueryStats, *model.ApiError) { func (r *ClickHouseReader) GetQueryRangeResult(ctx context.Context, query *model.QueryRangeParams) (*promql.Result, *stats.QueryStats, *model.ApiError) {
qry, err := r.queryEngine.NewRangeQuery(ctx, r.remoteStorage, nil, query.Query, query.Start, query.End, query.Step) qry, err := r.engine.Engine().NewRangeQuery(ctx, r.engine.Storage(), nil, query.Query, query.Start, query.End, query.Step)
if err != nil { if err != nil {
return nil, nil, &model.ApiError{Typ: model.ErrorBadData, Err: err} return nil, nil, &model.ApiError{Typ: model.ErrorBadData, Err: err}

View File

@@ -5,11 +5,11 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"math" "math"
"regexp"
"strings" "strings"
"testing" "testing"
"time" "time"
"github.com/DATA-DOG/go-sqlmock"
"github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader" "github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader"
"github.com/SigNoz/signoz/pkg/query-service/app/queryBuilder" "github.com/SigNoz/signoz/pkg/query-service/app/queryBuilder"
tracesV3 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v3" tracesV3 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v3"
@@ -18,6 +18,7 @@ import (
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3" v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/querycache" "github.com/SigNoz/signoz/pkg/query-service/querycache"
"github.com/SigNoz/signoz/pkg/query-service/utils" "github.com/SigNoz/signoz/pkg/query-service/utils"
"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
cmock "github.com/srikanthccv/ClickHouse-go-mock" cmock "github.com/srikanthccv/ClickHouse-go-mock"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@@ -1131,20 +1132,6 @@ func TestQueryRangeValueTypePromQL(t *testing.T) {
} }
} }
type regexMatcher struct {
}
func (m *regexMatcher) Match(expectedSQL, actualSQL string) error {
re, err := regexp.Compile(expectedSQL)
if err != nil {
return err
}
if !re.MatchString(actualSQL) {
return fmt.Errorf("expected query to contain %s, got %s", expectedSQL, actualSQL)
}
return nil
}
func Test_querier_runWindowBasedListQuery(t *testing.T) { func Test_querier_runWindowBasedListQuery(t *testing.T) {
params := &v3.QueryRangeParamsV3{ params := &v3.QueryRangeParamsV3{
Start: 1722171576000000000, // July 28, 2024 6:29:36 PM Start: 1722171576000000000, // July 28, 2024 6:29:36 PM
@@ -1358,8 +1345,8 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
for _, tc := range testCases { for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
// Setup mock // Setup mock
mock, err := cmock.NewClickHouseWithQueryMatcher(nil, &regexMatcher{}) telemetryStore, err := telemetrystoretest.New(sqlmock.QueryMatcherRegexp)
require.NoError(t, err, "Failed to create ClickHouse mock") require.NoError(t, err)
// Configure mock responses // Configure mock responses
for _, response := range tc.queryResponses { for _, response := range tc.queryResponses {
@@ -1368,7 +1355,7 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
values = append(values, []any{&ts, &testName}) values = append(values, []any{&ts, &testName})
} }
// if len(values) > 0 { // if len(values) > 0 {
mock.ExpectQuery(response.expectedQuery).WillReturnRows( telemetryStore.Mock().ExpectQuery(response.expectedQuery).WillReturnRows(
cmock.NewRows(cols, values), cmock.NewRows(cols, values),
) )
// } // }
@@ -1376,10 +1363,9 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
// Create reader and querier // Create reader and querier
reader := clickhouseReader.NewReaderFromClickhouseConnection( reader := clickhouseReader.NewReaderFromClickhouseConnection(
mock,
options, options,
nil, nil,
"", telemetryStore,
featureManager.StartManager(), featureManager.StartManager(),
"", "",
true, true,
@@ -1429,7 +1415,7 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
} }
// Verify mock expectations // Verify mock expectations
err = mock.ExpectationsWereMet() err = telemetryStore.Mock().ExpectationsWereMet()
require.NoError(t, err, "Mock expectations were not met") require.NoError(t, err, "Mock expectations were not met")
}) })
} }

View File

@@ -5,11 +5,11 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"math" "math"
"regexp"
"strings" "strings"
"testing" "testing"
"time" "time"
"github.com/DATA-DOG/go-sqlmock"
"github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader" "github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader"
"github.com/SigNoz/signoz/pkg/query-service/app/queryBuilder" "github.com/SigNoz/signoz/pkg/query-service/app/queryBuilder"
tracesV3 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v3" tracesV3 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v3"
@@ -18,6 +18,7 @@ import (
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3" v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/querycache" "github.com/SigNoz/signoz/pkg/query-service/querycache"
"github.com/SigNoz/signoz/pkg/query-service/utils" "github.com/SigNoz/signoz/pkg/query-service/utils"
"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
cmock "github.com/srikanthccv/ClickHouse-go-mock" cmock "github.com/srikanthccv/ClickHouse-go-mock"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@@ -1185,20 +1186,6 @@ func TestV2QueryRangeValueTypePromQL(t *testing.T) {
} }
} }
type regexMatcher struct {
}
func (m *regexMatcher) Match(expectedSQL, actualSQL string) error {
re, err := regexp.Compile(expectedSQL)
if err != nil {
return err
}
if !re.MatchString(actualSQL) {
return fmt.Errorf("expected query to contain %s, got %s", expectedSQL, actualSQL)
}
return nil
}
func Test_querier_runWindowBasedListQuery(t *testing.T) { func Test_querier_runWindowBasedListQuery(t *testing.T) {
params := &v3.QueryRangeParamsV3{ params := &v3.QueryRangeParamsV3{
Start: 1722171576000000000, // July 28, 2024 6:29:36 PM Start: 1722171576000000000, // July 28, 2024 6:29:36 PM
@@ -1412,8 +1399,8 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
for _, tc := range testCases { for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
// Setup mock // Setup mock
mock, err := cmock.NewClickHouseWithQueryMatcher(nil, &regexMatcher{}) telemetryStore, err := telemetrystoretest.New(sqlmock.QueryMatcherRegexp)
require.NoError(t, err, "Failed to create ClickHouse mock") require.NoError(t, err)
// Configure mock responses // Configure mock responses
for _, response := range tc.queryResponses { for _, response := range tc.queryResponses {
@@ -1422,7 +1409,7 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
values = append(values, []any{&ts, &testName}) values = append(values, []any{&ts, &testName})
} }
// if len(values) > 0 { // if len(values) > 0 {
mock.ExpectQuery(response.expectedQuery).WillReturnRows( telemetryStore.Mock().ExpectQuery(response.expectedQuery).WillReturnRows(
cmock.NewRows(cols, values), cmock.NewRows(cols, values),
) )
// } // }
@@ -1430,10 +1417,9 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
// Create reader and querier // Create reader and querier
reader := clickhouseReader.NewReaderFromClickhouseConnection( reader := clickhouseReader.NewReaderFromClickhouseConnection(
mock,
options, options,
nil, nil,
"", telemetryStore,
featureManager.StartManager(), featureManager.StartManager(),
"", "",
true, true,
@@ -1483,7 +1469,7 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
} }
// Verify mock expectations // Verify mock expectations
err = mock.ExpectationsWereMet() err = telemetryStore.Mock().ExpectationsWereMet()
require.NoError(t, err, "Mock expectations were not met") require.NoError(t, err, "Mock expectations were not met")
}) })
} }

View File

@@ -25,6 +25,7 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/app/preferences" "github.com/SigNoz/signoz/pkg/query-service/app/preferences"
"github.com/SigNoz/signoz/pkg/signoz" "github.com/SigNoz/signoz/pkg/signoz"
"github.com/SigNoz/signoz/pkg/sqlstore" "github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/types" "github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/authtypes" "github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/web" "github.com/SigNoz/signoz/pkg/web"
@@ -40,7 +41,6 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/healthcheck" "github.com/SigNoz/signoz/pkg/query-service/healthcheck"
"github.com/SigNoz/signoz/pkg/query-service/interfaces" "github.com/SigNoz/signoz/pkg/query-service/interfaces"
"github.com/SigNoz/signoz/pkg/query-service/model" "github.com/SigNoz/signoz/pkg/query-service/model"
pqle "github.com/SigNoz/signoz/pkg/query-service/pqlEngine"
"github.com/SigNoz/signoz/pkg/query-service/rules" "github.com/SigNoz/signoz/pkg/query-service/rules"
"github.com/SigNoz/signoz/pkg/query-service/telemetry" "github.com/SigNoz/signoz/pkg/query-service/telemetry"
"github.com/SigNoz/signoz/pkg/query-service/utils" "github.com/SigNoz/signoz/pkg/query-service/utils"
@@ -119,10 +119,9 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
return nil, err return nil, err
} }
clickhouseReader := clickhouseReader.NewReader( reader := clickhouseReader.NewReader(
serverOptions.SigNoz.SQLStore.SQLxDB(), serverOptions.SigNoz.SQLStore.SQLxDB(),
serverOptions.SigNoz.TelemetryStore.ClickHouseDB(), serverOptions.SigNoz.TelemetryStore,
serverOptions.PromConfigPath,
fm, fm,
serverOptions.Cluster, serverOptions.Cluster,
serverOptions.UseLogsNewSchema, serverOptions.UseLogsNewSchema,
@@ -130,8 +129,6 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
fluxIntervalForTraceDetail, fluxIntervalForTraceDetail,
serverOptions.SigNoz.Cache, serverOptions.SigNoz.Cache,
) )
go clickhouseReader.Start(readerReady)
reader := clickhouseReader
skipConfig := &model.SkipConfig{} skipConfig := &model.SkipConfig{}
if serverOptions.SkipTopLvlOpsPath != "" { if serverOptions.SkipTopLvlOpsPath != "" {
@@ -162,6 +159,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
serverOptions.UseLogsNewSchema, serverOptions.UseLogsNewSchema,
serverOptions.UseTraceNewSchema, serverOptions.UseTraceNewSchema,
serverOptions.SigNoz.SQLStore, serverOptions.SigNoz.SQLStore,
serverOptions.SigNoz.TelemetryStore,
) )
if err != nil { if err != nil {
return nil, err return nil, err
@@ -491,17 +489,11 @@ func makeRulesManager(
useLogsNewSchema bool, useLogsNewSchema bool,
useTraceNewSchema bool, useTraceNewSchema bool,
sqlstore sqlstore.SQLStore, sqlstore sqlstore.SQLStore,
telemetryStore telemetrystore.TelemetryStore,
) (*rules.Manager, error) { ) (*rules.Manager, error) {
// create engine
pqle, err := pqle.FromReader(ch)
if err != nil {
return nil, fmt.Errorf("failed to create pql engine : %v", err)
}
// create manager opts // create manager opts
managerOpts := &rules.ManagerOptions{ managerOpts := &rules.ManagerOptions{
PqlEngine: pqle, TelemetryStore: telemetryStore,
RepoURL: ruleRepoURL, RepoURL: ruleRepoURL,
DBConn: db, DBConn: db,
Context: context.Background(), Context: context.Background(),

View File

@@ -9,7 +9,6 @@ import (
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3" v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/querycache" "github.com/SigNoz/signoz/pkg/query-service/querycache"
"github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/stats" "github.com/prometheus/prometheus/util/stats"
) )
@@ -86,10 +85,6 @@ type Reader interface {
req *v3.QBFilterSuggestionsRequest, req *v3.QBFilterSuggestionsRequest,
) (*v3.QBFilterSuggestionsResponse, *model.ApiError) ) (*v3.QBFilterSuggestionsResponse, *model.ApiError)
// Connection needed for rules, not ideal but required
GetQueryEngine() *promql.Engine
GetFanoutStorage() *storage.Storage
QueryDashboardVars(ctx context.Context, query string) (*model.DashboardVar, error) QueryDashboardVars(ctx context.Context, query string) (*model.DashboardVar, error)
CheckClickHouse(ctx context.Context) error CheckClickHouse(ctx context.Context) error

View File

@@ -15,7 +15,6 @@ import (
"github.com/SigNoz/signoz/pkg/signoz" "github.com/SigNoz/signoz/pkg/signoz"
"github.com/SigNoz/signoz/pkg/types/authtypes" "github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/version" "github.com/SigNoz/signoz/pkg/version"
prommodel "github.com/prometheus/common/model"
"go.uber.org/zap" "go.uber.org/zap"
"go.uber.org/zap/zapcore" "go.uber.org/zap/zapcore"
@@ -29,10 +28,6 @@ func initZapLog() *zap.Logger {
return logger return logger
} }
func init() {
prommodel.NameValidationScheme = prommodel.UTF8Validation
}
func main() { func main() {
var promConfigPath, skipTopLvlOpsPath string var promConfigPath, skipTopLvlOpsPath string
@@ -85,6 +80,7 @@ func main() {
MaxIdleConns: maxIdleConns, MaxIdleConns: maxIdleConns,
MaxOpenConns: maxOpenConns, MaxOpenConns: maxOpenConns,
DialTimeout: dialTimeout, DialTimeout: dialTimeout,
Config: promConfigPath,
}) })
if err != nil { if err != nil {
zap.L().Fatal("Failed to create config", zap.Error(err)) zap.L().Fatal("Failed to create config", zap.Error(err))
@@ -101,7 +97,7 @@ func main() {
signoz.NewTelemetryStoreProviderFactories(), signoz.NewTelemetryStoreProviderFactories(),
) )
if err != nil { if err != nil {
zap.L().Fatal("Failed to create signoz struct", zap.Error(err)) zap.L().Fatal("Failed to create signoz", zap.Error(err))
} }
// Read the jwt secret key // Read the jwt secret key

View File

@@ -1,126 +0,0 @@
package promql
import (
"context"
"fmt"
"time"
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
"github.com/go-kit/log"
pmodel "github.com/prometheus/common/model"
"github.com/prometheus/common/promlog"
pconfig "github.com/prometheus/prometheus/config"
pql "github.com/prometheus/prometheus/promql"
pstorage "github.com/prometheus/prometheus/storage"
premote "github.com/prometheus/prometheus/storage/remote"
)
type PqlEngine struct {
engine *pql.Engine
fanoutStorage pstorage.Storage
}
func FromConfigPath(promConfigPath string) (*PqlEngine, error) {
// load storage path
c, err := pconfig.LoadFile(promConfigPath, false, false, nil)
if err != nil {
return nil, fmt.Errorf("couldn't load configuration (--config.file=%q): %v", promConfigPath, err)
}
return NewPqlEngine(c)
}
func FromReader(ch interfaces.Reader) (*PqlEngine, error) {
return &PqlEngine{
engine: ch.GetQueryEngine(),
fanoutStorage: *ch.GetFanoutStorage(),
}, nil
}
func NewPqlEngine(config *pconfig.Config) (*PqlEngine, error) {
logLevel := promlog.AllowedLevel{}
_ = logLevel.Set("debug")
allowedFormat := promlog.AllowedFormat{}
_ = allowedFormat.Set("logfmt")
promlogConfig := promlog.Config{
Level: &logLevel,
Format: &allowedFormat,
}
logger := promlog.New(&promlogConfig)
opts := pql.EngineOpts{
Logger: log.With(logger, "component", "promql evaluator"),
Reg: nil,
MaxSamples: 50000000,
Timeout: time.Duration(2 * time.Minute),
ActiveQueryTracker: pql.NewActiveQueryTracker(
"",
20,
logger,
),
}
e := pql.NewEngine(opts)
startTime := func() (int64, error) {
return int64(pmodel.Latest), nil
}
remoteStorage := premote.NewStorage(
log.With(logger, "component", "remote"),
nil,
startTime,
"",
time.Duration(1*time.Minute),
nil,
false,
)
fanoutStorage := pstorage.NewFanout(logger, remoteStorage)
_ = remoteStorage.ApplyConfig(config)
return &PqlEngine{
engine: e,
fanoutStorage: fanoutStorage,
}, nil
}
func (p *PqlEngine) RunAlertQuery(ctx context.Context, qs string, start, end time.Time, interval time.Duration) (pql.Matrix, error) {
q, err := p.engine.NewRangeQuery(ctx, p.fanoutStorage, nil, qs, start, end, interval)
if err != nil {
return nil, err
}
res := q.Exec(ctx)
if res.Err != nil {
return nil, res.Err
}
switch typ := res.Value.(type) {
case pql.Vector:
series := make([]pql.Series, 0, len(typ))
value := res.Value.(pql.Vector)
for _, smpl := range value {
series = append(series, pql.Series{
Metric: smpl.Metric,
Floats: []pql.FPoint{{T: smpl.T, F: smpl.F}},
})
}
return series, nil
case pql.Scalar:
value := res.Value.(pql.Scalar)
series := make([]pql.Series, 0, 1)
series = append(series, pql.Series{
Floats: []pql.FPoint{{T: value.T, F: value.V}},
})
return series, nil
case pql.Matrix:
return res.Value.(pql.Matrix), nil
default:
return nil, fmt.Errorf("rule result is not a vector or scalar")
}
}

View File

@@ -21,9 +21,9 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/cache" "github.com/SigNoz/signoz/pkg/query-service/cache"
"github.com/SigNoz/signoz/pkg/query-service/interfaces" "github.com/SigNoz/signoz/pkg/query-service/interfaces"
"github.com/SigNoz/signoz/pkg/query-service/model" "github.com/SigNoz/signoz/pkg/query-service/model"
pqle "github.com/SigNoz/signoz/pkg/query-service/pqlEngine"
"github.com/SigNoz/signoz/pkg/query-service/telemetry" "github.com/SigNoz/signoz/pkg/query-service/telemetry"
"github.com/SigNoz/signoz/pkg/sqlstore" "github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/types/alertmanagertypes" "github.com/SigNoz/signoz/pkg/types/alertmanagertypes"
"github.com/SigNoz/signoz/pkg/types/authtypes" "github.com/SigNoz/signoz/pkg/types/authtypes"
) )
@@ -76,8 +76,7 @@ func prepareTaskName(ruleId interface{}) string {
// ManagerOptions bundles options for the Manager. // ManagerOptions bundles options for the Manager.
type ManagerOptions struct { type ManagerOptions struct {
PqlEngine *pqle.PqlEngine TelemetryStore telemetrystore.TelemetryStore
// RepoURL is used to generate a backlink in sent alert messages // RepoURL is used to generate a backlink in sent alert messages
RepoURL string RepoURL string
@@ -180,7 +179,7 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
opts.Rule, opts.Rule,
opts.Logger, opts.Logger,
opts.Reader, opts.Reader,
opts.ManagerOpts.PqlEngine, opts.ManagerOpts.TelemetryStore.PrometheusEngine(),
WithSQLStore(opts.SQLStore), WithSQLStore(opts.SQLStore),
) )

View File

@@ -8,21 +8,22 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
"github.com/SigNoz/signoz/pkg/promengine"
"github.com/SigNoz/signoz/pkg/query-service/formatter" "github.com/SigNoz/signoz/pkg/query-service/formatter"
"github.com/SigNoz/signoz/pkg/query-service/interfaces" "github.com/SigNoz/signoz/pkg/query-service/interfaces"
"github.com/SigNoz/signoz/pkg/query-service/model" "github.com/SigNoz/signoz/pkg/query-service/model"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3" v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
pqle "github.com/SigNoz/signoz/pkg/query-service/pqlEngine"
qslabels "github.com/SigNoz/signoz/pkg/query-service/utils/labels" qslabels "github.com/SigNoz/signoz/pkg/query-service/utils/labels"
"github.com/SigNoz/signoz/pkg/query-service/utils/times" "github.com/SigNoz/signoz/pkg/query-service/utils/times"
"github.com/SigNoz/signoz/pkg/query-service/utils/timestamp" "github.com/SigNoz/signoz/pkg/query-service/utils/timestamp"
"github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql"
pql "github.com/prometheus/prometheus/promql"
yaml "gopkg.in/yaml.v2" yaml "gopkg.in/yaml.v2"
) )
type PromRule struct { type PromRule struct {
*BaseRule *BaseRule
pqlEngine *pqle.PqlEngine pqlEngine promengine.PromEngine
} }
func NewPromRule( func NewPromRule(
@@ -30,7 +31,7 @@ func NewPromRule(
postableRule *PostableRule, postableRule *PostableRule,
logger *zap.Logger, logger *zap.Logger,
reader interfaces.Reader, reader interfaces.Reader,
pqlEngine *pqle.PqlEngine, pqlEngine promengine.PromEngine,
opts ...RuleOption, opts ...RuleOption,
) (*PromRule, error) { ) (*PromRule, error) {
@@ -108,7 +109,7 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time) (interface{}, error)
return nil, err return nil, err
} }
zap.L().Info("evaluating promql query", zap.String("name", r.Name()), zap.String("query", q)) zap.L().Info("evaluating promql query", zap.String("name", r.Name()), zap.String("query", q))
res, err := r.pqlEngine.RunAlertQuery(ctx, q, start, end, interval) res, err := r.RunAlertQuery(ctx, q, start, end, interval)
if err != nil { if err != nil {
r.SetHealth(HealthBad) r.SetHealth(HealthBad)
r.SetLastError(err) r.SetLastError(err)
@@ -306,6 +307,43 @@ func (r *PromRule) String() string {
return string(byt) return string(byt)
} }
func (r *PromRule) RunAlertQuery(ctx context.Context, qs string, start, end time.Time, interval time.Duration) (pql.Matrix, error) {
q, err := r.pqlEngine.Engine().NewRangeQuery(ctx, r.pqlEngine.Storage(), nil, qs, start, end, interval)
if err != nil {
return nil, err
}
res := q.Exec(ctx)
if res.Err != nil {
return nil, res.Err
}
switch typ := res.Value.(type) {
case pql.Vector:
series := make([]pql.Series, 0, len(typ))
value := res.Value.(pql.Vector)
for _, smpl := range value {
series = append(series, pql.Series{
Metric: smpl.Metric,
Floats: []pql.FPoint{{T: smpl.T, F: smpl.F}},
})
}
return series, nil
case pql.Scalar:
value := res.Value.(pql.Scalar)
series := make([]pql.Series, 0, 1)
series = append(series, pql.Series{
Floats: []pql.FPoint{{T: value.T, F: value.V}},
})
return series, nil
case pql.Matrix:
return res.Value.(pql.Matrix), nil
default:
return nil, fmt.Errorf("rule result is not a vector or scalar")
}
}
func toCommonSeries(series promql.Series) v3.Series { func toCommonSeries(series promql.Series) v3.Series {
commonSeries := v3.Series{ commonSeries := v3.Series{
Labels: make(map[string]string), Labels: make(map[string]string),

View File

@@ -68,7 +68,7 @@ func defaultTestNotification(opts PrepareTestRuleOptions) (int, *model.ApiError)
parsedRule, parsedRule,
opts.Logger, opts.Logger,
opts.Reader, opts.Reader,
opts.ManagerOpts.PqlEngine, opts.ManagerOpts.TelemetryStore.PrometheusEngine(),
WithSendAlways(), WithSendAlways(),
WithSendUnmatched(), WithSendUnmatched(),
WithSQLStore(opts.SQLStore), WithSQLStore(opts.SQLStore),

View File

@@ -3,19 +3,22 @@ package rules
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/SigNoz/signoz/pkg/cache"
"github.com/SigNoz/signoz/pkg/cache/memorycache"
"github.com/SigNoz/signoz/pkg/factory/factorytest"
"strings" "strings"
"testing" "testing"
"time" "time"
"github.com/SigNoz/signoz/pkg/cache"
"github.com/SigNoz/signoz/pkg/cache/memorycache"
"github.com/SigNoz/signoz/pkg/factory/factorytest"
"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
"github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader" "github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader"
"github.com/SigNoz/signoz/pkg/query-service/common" "github.com/SigNoz/signoz/pkg/query-service/common"
"github.com/SigNoz/signoz/pkg/query-service/featureManager" "github.com/SigNoz/signoz/pkg/query-service/featureManager"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3" v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/utils/labels" "github.com/SigNoz/signoz/pkg/query-service/utils/labels"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
cmock "github.com/srikanthccv/ClickHouse-go-mock" cmock "github.com/srikanthccv/ClickHouse-go-mock"
) )
@@ -1152,10 +1155,8 @@ func TestThresholdRuleUnitCombinations(t *testing.T) {
}, },
} }
fm := featureManager.StartManager() fm := featureManager.StartManager()
mock, err := cmock.NewClickHouseWithQueryMatcher(nil, &queryMatcherAny{}) telemetryStore, err := telemetrystoretest.New(&queryMatcherAny{})
if err != nil { require.NoError(t, err)
t.Errorf("an error '%s' was not expected when opening a stub database connection", err)
}
cols := make([]cmock.ColumnType, 0) cols := make([]cmock.ColumnType, 0)
cols = append(cols, cmock.ColumnType{Name: "value", Type: "Float64"}) cols = append(cols, cmock.ColumnType{Name: "value", Type: "Float64"})
@@ -1227,11 +1228,11 @@ func TestThresholdRuleUnitCombinations(t *testing.T) {
for idx, c := range cases { for idx, c := range cases {
rows := cmock.NewRows(cols, c.values) rows := cmock.NewRows(cols, c.values)
mock.ExpectQuery(".*").WillReturnError(fmt.Errorf("error")) telemetryStore.Mock().ExpectQuery(".*").WillReturnError(fmt.Errorf("error"))
// We are testing the eval logic after the query is run // We are testing the eval logic after the query is run
// so we don't care about the query string here // so we don't care about the query string here
queryString := "SELECT any" queryString := "SELECT any"
mock. telemetryStore.Mock().
ExpectQuery(queryString). ExpectQuery(queryString).
WillReturnRows(rows) WillReturnRows(rows)
postableRule.RuleCondition.CompareOp = CompareOp(c.compareOp) postableRule.RuleCondition.CompareOp = CompareOp(c.compareOp)
@@ -1246,7 +1247,8 @@ func TestThresholdRuleUnitCombinations(t *testing.T) {
options := clickhouseReader.NewOptions("", "", "archiveNamespace") options := clickhouseReader.NewOptions("", "", "archiveNamespace")
readerCache, err := memorycache.New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: cache.Memory{TTL: DefaultFrequency}}) readerCache, err := memorycache.New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: cache.Memory{TTL: DefaultFrequency}})
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, time.Duration(time.Second), readerCache) require.NoError(t, err)
reader := clickhouseReader.NewReaderFromClickhouseConnection(options, nil, telemetryStore, fm, "", true, true, time.Duration(time.Second), readerCache)
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true) rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
rule.TemporalityMap = map[string]map[v3.Temporality]bool{ rule.TemporalityMap = map[string]map[v3.Temporality]bool{
"signoz_calls_total": { "signoz_calls_total": {
@@ -1305,10 +1307,8 @@ func TestThresholdRuleNoData(t *testing.T) {
}, },
} }
fm := featureManager.StartManager() fm := featureManager.StartManager()
mock, err := cmock.NewClickHouseWithQueryMatcher(nil, &queryMatcherAny{}) telemetryStore, err := telemetrystoretest.New(&queryMatcherAny{})
if err != nil { require.NoError(t, err)
t.Errorf("an error '%s' was not expected when opening a stub database connection", err)
}
cols := make([]cmock.ColumnType, 0) cols := make([]cmock.ColumnType, 0)
cols = append(cols, cmock.ColumnType{Name: "value", Type: "Float64"}) cols = append(cols, cmock.ColumnType{Name: "value", Type: "Float64"})
@@ -1328,12 +1328,12 @@ func TestThresholdRuleNoData(t *testing.T) {
for idx, c := range cases { for idx, c := range cases {
rows := cmock.NewRows(cols, c.values) rows := cmock.NewRows(cols, c.values)
mock.ExpectQuery(".*").WillReturnError(fmt.Errorf("error")) telemetryStore.Mock().ExpectQuery(".*").WillReturnError(fmt.Errorf("error"))
// We are testing the eval logic after the query is run // We are testing the eval logic after the query is run
// so we don't care about the query string here // so we don't care about the query string here
queryString := "SELECT any" queryString := "SELECT any"
mock. telemetryStore.Mock().
ExpectQuery(queryString). ExpectQuery(queryString).
WillReturnRows(rows) WillReturnRows(rows)
var target float64 = 0 var target float64 = 0
@@ -1346,7 +1346,7 @@ func TestThresholdRuleNoData(t *testing.T) {
} }
readerCache, err := memorycache.New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: cache.Memory{TTL: DefaultFrequency}}) readerCache, err := memorycache.New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: cache.Memory{TTL: DefaultFrequency}})
options := clickhouseReader.NewOptions("", "", "archiveNamespace") options := clickhouseReader.NewOptions("", "", "archiveNamespace")
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, time.Duration(time.Second), readerCache) reader := clickhouseReader.NewReaderFromClickhouseConnection(options, nil, telemetryStore, fm, "", true, true, time.Duration(time.Second), readerCache)
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true) rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
rule.TemporalityMap = map[string]map[v3.Temporality]bool{ rule.TemporalityMap = map[string]map[v3.Temporality]bool{
@@ -1410,10 +1410,8 @@ func TestThresholdRuleTracesLink(t *testing.T) {
}, },
} }
fm := featureManager.StartManager() fm := featureManager.StartManager()
mock, err := cmock.NewClickHouseWithQueryMatcher(nil, &queryMatcherAny{}) telemetryStore, err := telemetrystoretest.New(&queryMatcherAny{})
if err != nil { require.NoError(t, err)
t.Errorf("an error '%s' was not expected when opening a stub database connection", err)
}
metaCols := make([]cmock.ColumnType, 0) metaCols := make([]cmock.ColumnType, 0)
metaCols = append(metaCols, cmock.ColumnType{Name: "DISTINCT(tagKey)", Type: "String"}) metaCols = append(metaCols, cmock.ColumnType{Name: "DISTINCT(tagKey)", Type: "String"})
@@ -1428,11 +1426,11 @@ func TestThresholdRuleTracesLink(t *testing.T) {
for idx, c := range testCases { for idx, c := range testCases {
metaRows := cmock.NewRows(metaCols, c.metaValues) metaRows := cmock.NewRows(metaCols, c.metaValues)
mock. telemetryStore.Mock().
ExpectQuery("SELECT DISTINCT(tagKey), tagType, dataType FROM archiveNamespace.span_attributes_keys"). ExpectQuery("SELECT DISTINCT(tagKey), tagType, dataType FROM archiveNamespace.span_attributes_keys").
WillReturnRows(metaRows) WillReturnRows(metaRows)
mock. telemetryStore.Mock().
ExpectSelect("SHOW CREATE TABLE signoz_traces.distributed_signoz_index_v3").WillReturnRows(&cmock.Rows{}) ExpectSelect("SHOW CREATE TABLE signoz_traces.distributed_signoz_index_v3").WillReturnRows(&cmock.Rows{})
rows := cmock.NewRows(cols, c.values) rows := cmock.NewRows(cols, c.values)
@@ -1440,7 +1438,7 @@ func TestThresholdRuleTracesLink(t *testing.T) {
// We are testing the eval logic after the query is run // We are testing the eval logic after the query is run
// so we don't care about the query string here // so we don't care about the query string here
queryString := "SELECT any" queryString := "SELECT any"
mock. telemetryStore.Mock().
ExpectQuery(queryString). ExpectQuery(queryString).
WillReturnRows(rows) WillReturnRows(rows)
postableRule.RuleCondition.CompareOp = CompareOp(c.compareOp) postableRule.RuleCondition.CompareOp = CompareOp(c.compareOp)
@@ -1454,7 +1452,7 @@ func TestThresholdRuleTracesLink(t *testing.T) {
} }
options := clickhouseReader.NewOptions("", "", "archiveNamespace") options := clickhouseReader.NewOptions("", "", "archiveNamespace")
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, time.Duration(time.Second), nil) reader := clickhouseReader.NewReaderFromClickhouseConnection(options, nil, telemetryStore, fm, "", true, true, time.Duration(time.Second), nil)
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true) rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
rule.TemporalityMap = map[string]map[v3.Temporality]bool{ rule.TemporalityMap = map[string]map[v3.Temporality]bool{
@@ -1523,10 +1521,8 @@ func TestThresholdRuleLogsLink(t *testing.T) {
}, },
} }
fm := featureManager.StartManager() fm := featureManager.StartManager()
mock, err := cmock.NewClickHouseWithQueryMatcher(nil, &queryMatcherAny{}) telemetryStore, err := telemetrystoretest.New(&queryMatcherAny{})
if err != nil { require.NoError(t, err)
t.Errorf("an error '%s' was not expected when opening a stub database connection", err)
}
attrMetaCols := make([]cmock.ColumnType, 0) attrMetaCols := make([]cmock.ColumnType, 0)
attrMetaCols = append(attrMetaCols, cmock.ColumnType{Name: "name", Type: "String"}) attrMetaCols = append(attrMetaCols, cmock.ColumnType{Name: "name", Type: "String"})
@@ -1546,17 +1542,17 @@ func TestThresholdRuleLogsLink(t *testing.T) {
for idx, c := range testCases { for idx, c := range testCases {
attrMetaRows := cmock.NewRows(attrMetaCols, c.attrMetaValues) attrMetaRows := cmock.NewRows(attrMetaCols, c.attrMetaValues)
mock. telemetryStore.Mock().
ExpectSelect("SELECT DISTINCT name, datatype from signoz_logs.distributed_logs_attribute_keys group by name, datatype"). ExpectSelect("SELECT DISTINCT name, datatype from signoz_logs.distributed_logs_attribute_keys group by name, datatype").
WillReturnRows(attrMetaRows) WillReturnRows(attrMetaRows)
resourceMetaRows := cmock.NewRows(resourceMetaCols, c.resourceMetaValues) resourceMetaRows := cmock.NewRows(resourceMetaCols, c.resourceMetaValues)
mock. telemetryStore.Mock().
ExpectSelect("SELECT DISTINCT name, datatype from signoz_logs.distributed_logs_resource_keys group by name, datatype"). ExpectSelect("SELECT DISTINCT name, datatype from signoz_logs.distributed_logs_resource_keys group by name, datatype").
WillReturnRows(resourceMetaRows) WillReturnRows(resourceMetaRows)
createTableRows := cmock.NewRows(createTableCols, c.createTableValues) createTableRows := cmock.NewRows(createTableCols, c.createTableValues)
mock. telemetryStore.Mock().
ExpectSelect("SHOW CREATE TABLE signoz_logs.logs"). ExpectSelect("SHOW CREATE TABLE signoz_logs.logs").
WillReturnRows(createTableRows) WillReturnRows(createTableRows)
@@ -1565,7 +1561,7 @@ func TestThresholdRuleLogsLink(t *testing.T) {
// We are testing the eval logic after the query is run // We are testing the eval logic after the query is run
// so we don't care about the query string here // so we don't care about the query string here
queryString := "SELECT any" queryString := "SELECT any"
mock. telemetryStore.Mock().
ExpectQuery(queryString). ExpectQuery(queryString).
WillReturnRows(rows) WillReturnRows(rows)
postableRule.RuleCondition.CompareOp = CompareOp(c.compareOp) postableRule.RuleCondition.CompareOp = CompareOp(c.compareOp)
@@ -1579,7 +1575,7 @@ func TestThresholdRuleLogsLink(t *testing.T) {
} }
options := clickhouseReader.NewOptions("", "", "archiveNamespace") options := clickhouseReader.NewOptions("", "", "archiveNamespace")
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, time.Duration(time.Second), nil) reader := clickhouseReader.NewReaderFromClickhouseConnection(options, nil, telemetryStore, fm, "", true, true, time.Duration(time.Second), nil)
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true) rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
rule.TemporalityMap = map[string]map[v3.Temporality]bool{ rule.TemporalityMap = map[string]map[v3.Temporality]bool{

View File

@@ -20,6 +20,7 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/dao" "github.com/SigNoz/signoz/pkg/query-service/dao"
"github.com/SigNoz/signoz/pkg/query-service/interfaces" "github.com/SigNoz/signoz/pkg/query-service/interfaces"
"github.com/SigNoz/signoz/pkg/query-service/model" "github.com/SigNoz/signoz/pkg/query-service/model"
"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
"github.com/SigNoz/signoz/pkg/types" "github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/authtypes" "github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/google/uuid" "github.com/google/uuid"
@@ -39,14 +40,14 @@ func NewMockClickhouseReader(
) { ) {
require.NotNil(t, testDB) require.NotNil(t, testDB)
mockDB, err := mockhouse.NewClickHouseWithQueryMatcher(nil, sqlmock.QueryMatcherRegexp) telemetryStore, err := telemetrystoretest.New(sqlmock.QueryMatcherRegexp)
require.NoError(t, err)
require.Nil(t, err, "could not init mock clickhouse") require.Nil(t, err, "could not init mock clickhouse")
reader := clickhouseReader.NewReaderFromClickhouseConnection( reader := clickhouseReader.NewReaderFromClickhouseConnection(
mockDB,
clickhouseReader.NewOptions("", ""), clickhouseReader.NewOptions("", ""),
testDB, testDB,
"", telemetryStore,
featureFlags, featureFlags,
"", "",
true, true,
@@ -55,7 +56,7 @@ func NewMockClickhouseReader(
nil, nil,
) )
return reader, mockDB return reader, telemetryStore.Mock()
} }
func addLogsQueryExpectation( func addLogsQueryExpectation(

View File

@@ -20,6 +20,8 @@ import (
"github.com/SigNoz/signoz/pkg/telemetrystore" "github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/version" "github.com/SigNoz/signoz/pkg/version"
"github.com/SigNoz/signoz/pkg/web" "github.com/SigNoz/signoz/pkg/web"
"github.com/go-kit/log"
promconfig "github.com/prometheus/prometheus/config"
) )
// Config defines the entire input configuration of signoz. // Config defines the entire input configuration of signoz.
@@ -49,7 +51,7 @@ type Config struct {
APIServer apiserver.Config `mapstructure:"apiserver"` APIServer apiserver.Config `mapstructure:"apiserver"`
// TelemetryStore config // TelemetryStore config
TelemetryStore telemetrystore.Config `mapstructure:"telemetrystore"` TelemetryStore telemetrystore.Config `mapstructure:"telemetrystore" yaml:"telemetrystore"`
// Alertmanager config // Alertmanager config
Alertmanager alertmanager.Config `mapstructure:"alertmanager" yaml:"alertmanager"` Alertmanager alertmanager.Config `mapstructure:"alertmanager" yaml:"alertmanager"`
@@ -61,6 +63,7 @@ type DeprecatedFlags struct {
MaxIdleConns int MaxIdleConns int
MaxOpenConns int MaxOpenConns int
DialTimeout time.Duration DialTimeout time.Duration
Config string
} }
func NewConfig(ctx context.Context, resolverConfig config.ResolverConfig, deprecatedFlags DeprecatedFlags) (Config, error) { func NewConfig(ctx context.Context, resolverConfig config.ResolverConfig, deprecatedFlags DeprecatedFlags) (Config, error) {
@@ -145,7 +148,7 @@ func mergeAndEnsureBackwardCompatibility(config *Config, deprecatedFlags Depreca
if os.Getenv("ClickHouseUrl") != "" { if os.Getenv("ClickHouseUrl") != "" {
fmt.Println("[Deprecated] env ClickHouseUrl is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_DSN instead.") fmt.Println("[Deprecated] env ClickHouseUrl is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_DSN instead.")
config.TelemetryStore.ClickHouse.DSN = os.Getenv("ClickHouseUrl") config.TelemetryStore.Clickhouse.DSN = os.Getenv("ClickHouseUrl")
} }
if deprecatedFlags.MaxIdleConns != 50 { if deprecatedFlags.MaxIdleConns != 50 {
@@ -176,4 +179,18 @@ func mergeAndEnsureBackwardCompatibility(config *Config, deprecatedFlags Depreca
if os.Getenv("ALERTMANAGER_API_CHANNEL_PATH") != "" { if os.Getenv("ALERTMANAGER_API_CHANNEL_PATH") != "" {
fmt.Println("[Deprecated] env ALERTMANAGER_API_CHANNEL_PATH is deprecated and scheduled for complete removal.") fmt.Println("[Deprecated] env ALERTMANAGER_API_CHANNEL_PATH is deprecated and scheduled for complete removal.")
} }
if deprecatedFlags.Config != "" {
fmt.Println("[Deprecated] flag --config is deprecated for passing prometheus config. Please use SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_PROMETHEUS_REMOTE__READ_URL instead. The flag will be used for passing the entire SigNoz config. More details can be found at https://github.com/SigNoz/signoz/issues/6805.")
cfg, err := promconfig.LoadFile(deprecatedFlags.Config, false, false, log.NewNopLogger())
if err != nil {
fmt.Println("Error parsing config, using value set in SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_PROMETHEUS_REMOTE__READ_URL")
} else {
if len(cfg.RemoteReadConfigs) != 1 {
fmt.Println("Error finding remote read config, using value set in SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_PROMETHEUS_REMOTE__READ_URL")
} else {
config.TelemetryStore.Clickhouse.Prometheus.RemoteReadConfig.URL = cfg.RemoteReadConfigs[0].URL.URL
}
}
}
} }

View File

@@ -67,7 +67,7 @@ func NewSQLMigrationProviderFactories(sqlstore sqlstore.SQLStore) factory.NamedM
func NewTelemetryStoreProviderFactories() factory.NamedMap[factory.ProviderFactory[telemetrystore.TelemetryStore, telemetrystore.Config]] { func NewTelemetryStoreProviderFactories() factory.NamedMap[factory.ProviderFactory[telemetrystore.TelemetryStore, telemetrystore.Config]] {
return factory.MustNewNamedMap( return factory.MustNewNamedMap(
clickhousetelemetrystore.NewFactory(telemetrystorehook.NewFactory()), clickhousetelemetrystore.NewFactory(telemetrystorehook.NewSettingsFactory()),
) )
} }

View File

@@ -6,13 +6,15 @@ import (
"github.com/ClickHouse/clickhouse-go/v2" "github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver" "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/SigNoz/signoz/pkg/factory" "github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/promengine"
"github.com/SigNoz/signoz/pkg/telemetrystore" "github.com/SigNoz/signoz/pkg/telemetrystore"
) )
type provider struct { type provider struct {
settings factory.ScopedProviderSettings settings factory.ScopedProviderSettings
clickHouseConn clickhouse.Conn clickHouseConn clickhouse.Conn
hooks []telemetrystore.TelemetryStoreHook prometheusEngine promengine.PromEngine
hooks []telemetrystore.TelemetryStoreHook
} }
func NewFactory(hookFactories ...factory.ProviderFactory[telemetrystore.TelemetryStoreHook, telemetrystore.Config]) factory.ProviderFactory[telemetrystore.TelemetryStore, telemetrystore.Config] { func NewFactory(hookFactories ...factory.ProviderFactory[telemetrystore.TelemetryStoreHook, telemetrystore.Config]) factory.ProviderFactory[telemetrystore.TelemetryStore, telemetrystore.Config] {
@@ -33,7 +35,7 @@ func NewFactory(hookFactories ...factory.ProviderFactory[telemetrystore.Telemetr
func New(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config, hooks ...telemetrystore.TelemetryStoreHook) (telemetrystore.TelemetryStore, error) { func New(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config, hooks ...telemetrystore.TelemetryStoreHook) (telemetrystore.TelemetryStore, error) {
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/telemetrystore/clickhousetelemetrystore") settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/telemetrystore/clickhousetelemetrystore")
options, err := clickhouse.ParseDSN(config.ClickHouse.DSN) options, err := clickhouse.ParseDSN(config.Clickhouse.DSN)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -46,75 +48,85 @@ func New(ctx context.Context, providerSettings factory.ProviderSettings, config
return nil, err return nil, err
} }
engine, err := promengine.New(settings.Logger(), config.Clickhouse.Prometheus)
if err != nil {
return nil, err
}
return &provider{ return &provider{
settings: settings, settings: settings,
clickHouseConn: chConn, clickHouseConn: chConn,
hooks: hooks, hooks: hooks,
prometheusEngine: engine,
}, nil }, nil
} }
func (p *provider) ClickHouseDB() clickhouse.Conn { func (p *provider) ClickhouseDB() clickhouse.Conn {
return p return p
} }
func (p provider) Close() error { func (p *provider) PrometheusEngine() promengine.PromEngine {
return p.prometheusEngine
}
func (p *provider) Close() error {
return p.clickHouseConn.Close() return p.clickHouseConn.Close()
} }
func (p provider) Ping(ctx context.Context) error { func (p *provider) Ping(ctx context.Context) error {
return p.clickHouseConn.Ping(ctx) return p.clickHouseConn.Ping(ctx)
} }
func (p provider) Stats() driver.Stats { func (p *provider) Stats() driver.Stats {
return p.clickHouseConn.Stats() return p.clickHouseConn.Stats()
} }
func (p provider) Query(ctx context.Context, query string, args ...interface{}) (driver.Rows, error) { func (p *provider) Query(ctx context.Context, query string, args ...interface{}) (driver.Rows, error) {
ctx, query, args = telemetrystore.WrapBeforeQuery(p.hooks, ctx, query, args...) ctx, query, args = telemetrystore.WrapBeforeQuery(p.hooks, ctx, query, args...)
rows, err := p.clickHouseConn.Query(ctx, query, args...) rows, err := p.clickHouseConn.Query(ctx, query, args...)
telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, rows, err) telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, rows, err)
return rows, err return rows, err
} }
func (p provider) QueryRow(ctx context.Context, query string, args ...interface{}) driver.Row { func (p *provider) QueryRow(ctx context.Context, query string, args ...interface{}) driver.Row {
ctx, query, args = telemetrystore.WrapBeforeQuery(p.hooks, ctx, query, args...) ctx, query, args = telemetrystore.WrapBeforeQuery(p.hooks, ctx, query, args...)
row := p.clickHouseConn.QueryRow(ctx, query, args...) row := p.clickHouseConn.QueryRow(ctx, query, args...)
telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, nil, nil) telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, nil, nil)
return row return row
} }
func (p provider) Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error { func (p *provider) Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error {
ctx, query, args = telemetrystore.WrapBeforeQuery(p.hooks, ctx, query, args...) ctx, query, args = telemetrystore.WrapBeforeQuery(p.hooks, ctx, query, args...)
err := p.clickHouseConn.Select(ctx, dest, query, args...) err := p.clickHouseConn.Select(ctx, dest, query, args...)
telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, nil, err) telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, nil, err)
return err return err
} }
func (p provider) Exec(ctx context.Context, query string, args ...interface{}) error { func (p *provider) Exec(ctx context.Context, query string, args ...interface{}) error {
ctx, query, args = telemetrystore.WrapBeforeQuery(p.hooks, ctx, query, args...) ctx, query, args = telemetrystore.WrapBeforeQuery(p.hooks, ctx, query, args...)
err := p.clickHouseConn.Exec(ctx, query, args...) err := p.clickHouseConn.Exec(ctx, query, args...)
telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, nil, err) telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, nil, err)
return err return err
} }
func (p provider) AsyncInsert(ctx context.Context, query string, wait bool, args ...interface{}) error { func (p *provider) AsyncInsert(ctx context.Context, query string, wait bool, args ...interface{}) error {
ctx, query, args = telemetrystore.WrapBeforeQuery(p.hooks, ctx, query, args...) ctx, query, args = telemetrystore.WrapBeforeQuery(p.hooks, ctx, query, args...)
err := p.clickHouseConn.AsyncInsert(ctx, query, wait, args...) err := p.clickHouseConn.AsyncInsert(ctx, query, wait, args...)
telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, nil, err) telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, nil, err)
return err return err
} }
func (p provider) PrepareBatch(ctx context.Context, query string, opts ...driver.PrepareBatchOption) (driver.Batch, error) { func (p *provider) PrepareBatch(ctx context.Context, query string, opts ...driver.PrepareBatchOption) (driver.Batch, error) {
ctx, query, args := telemetrystore.WrapBeforeQuery(p.hooks, ctx, query) ctx, query, args := telemetrystore.WrapBeforeQuery(p.hooks, ctx, query)
batch, err := p.clickHouseConn.PrepareBatch(ctx, query, opts...) batch, err := p.clickHouseConn.PrepareBatch(ctx, query, opts...)
telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, nil, err) telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, nil, err)
return batch, err return batch, err
} }
func (p provider) ServerVersion() (*driver.ServerVersion, error) { func (p *provider) ServerVersion() (*driver.ServerVersion, error) {
return p.clickHouseConn.ServerVersion() return p.clickHouseConn.ServerVersion()
} }
func (p provider) Contributors() []string { func (p *provider) Contributors() []string {
return p.clickHouseConn.Contributors() return p.clickHouseConn.Contributors()
} }

View File

@@ -2,28 +2,36 @@ package telemetrystore
import ( import (
"fmt" "fmt"
"net/url"
"time" "time"
"github.com/SigNoz/signoz/pkg/factory" "github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/promengine"
) )
type Config struct { type Config struct {
// Provider is the provider to use // Provider is the provider to use
Provider string `mapstructure:"provider"` Provider string `mapstructure:"provider"`
// Connection is the connection configuration // Connection is the connection configuration
Connection ConnectionConfig `mapstructure:",squash"` Connection ConnectionConfig `mapstructure:",squash"`
// Clickhouse is the clickhouse configuration // Clickhouse is the clickhouse configuration
ClickHouse ClickHouseConfig `mapstructure:"clickhouse"` Clickhouse ClickhouseConfig `mapstructure:"clickhouse"`
} }
type ConnectionConfig struct { type ConnectionConfig struct {
// MaxOpenConns is the maximum number of open connections to the database. // MaxOpenConns is the maximum number of open connections to the database.
MaxOpenConns int `mapstructure:"max_open_conns"` MaxOpenConns int `mapstructure:"max_open_conns"`
MaxIdleConns int `mapstructure:"max_idle_conns"`
DialTimeout time.Duration `mapstructure:"dial_timeout"` // MaxIdleConns is the maximum number of connections in the idle connection pool.
MaxIdleConns int `mapstructure:"max_idle_conns"`
// DialTimeout is the timeout for dialing a new connection.
DialTimeout time.Duration `mapstructure:"dial_timeout"`
} }
type ClickHouseQuerySettings struct { type QuerySettings struct {
MaxExecutionTime int `mapstructure:"max_execution_time"` MaxExecutionTime int `mapstructure:"max_execution_time"`
MaxExecutionTimeLeaf int `mapstructure:"max_execution_time_leaf"` MaxExecutionTimeLeaf int `mapstructure:"max_execution_time_leaf"`
TimeoutBeforeCheckingExecutionSpeed int `mapstructure:"timeout_before_checking_execution_speed"` TimeoutBeforeCheckingExecutionSpeed int `mapstructure:"timeout_before_checking_execution_speed"`
@@ -31,15 +39,19 @@ type ClickHouseQuerySettings struct {
MaxResultRowsForCHQuery int `mapstructure:"max_result_rows_for_ch_query"` MaxResultRowsForCHQuery int `mapstructure:"max_result_rows_for_ch_query"`
} }
type ClickHouseConfig struct { type ClickhouseConfig struct {
// DSN is the database source name.
DSN string `mapstructure:"dsn"` DSN string `mapstructure:"dsn"`
QuerySettings ClickHouseQuerySettings `mapstructure:"settings"` // QuerySettings is the query settings for clickhouse.
QuerySettings QuerySettings `mapstructure:"settings"`
// Prometheus is the prometheus configuration
Prometheus promengine.Config `mapstructure:"prometheus"`
} }
func NewConfigFactory() factory.ConfigFactory { func NewConfigFactory() factory.ConfigFactory {
return factory.NewConfigFactory(factory.MustNewName("telemetrystore"), newConfig) return factory.NewConfigFactory(factory.MustNewName("telemetrystore"), newConfig)
} }
func newConfig() factory.Config { func newConfig() factory.Config {
@@ -50,15 +62,39 @@ func newConfig() factory.Config {
MaxIdleConns: 50, MaxIdleConns: 50,
DialTimeout: 5 * time.Second, DialTimeout: 5 * time.Second,
}, },
ClickHouse: ClickHouseConfig{ Clickhouse: ClickhouseConfig{
DSN: "tcp://localhost:9000", DSN: "tcp://localhost:9000",
Prometheus: promengine.Config{
RemoteReadConfig: promengine.RemoteReadConfig{
URL: &url.URL{
Scheme: "tcp",
Host: "localhost:9000",
Path: "/signoz_metrics",
},
},
ActiveQueryTrackerConfig: promengine.ActiveQueryTrackerConfig{
Enabled: true,
Path: "",
MaxConcurrent: 20,
},
},
}, },
} }
} }
func (c Config) Validate() error { func (c Config) Validate() error {
if c.Provider != "clickhouse" { dsn, err := url.Parse(c.Clickhouse.DSN)
return fmt.Errorf("provider: %q is not supported", c.Provider) if err != nil {
return err
}
if c.Clickhouse.Prometheus.RemoteReadConfig.URL.Host != dsn.Host {
return fmt.Errorf("mismatch between host in prometheus.remote_read.url %q and clickhouse.dsn %q", c.Clickhouse.Prometheus.RemoteReadConfig.URL.Host, dsn.Host)
}
if c.Clickhouse.Prometheus.RemoteReadConfig.URL.Scheme != dsn.Scheme {
return fmt.Errorf("mismatch between scheme in prometheus.remote_read.url %q and clickhouse.dsn %q", c.Clickhouse.Prometheus.RemoteReadConfig.URL.Scheme, dsn.Scheme)
} }
return nil return nil

View File

@@ -13,7 +13,7 @@ import (
) )
func TestNewWithEnvProvider(t *testing.T) { func TestNewWithEnvProvider(t *testing.T) {
t.Setenv("SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_DSN", "http://localhost:9000") t.Setenv("SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_DSN", "tcp://localhost:9000")
t.Setenv("SIGNOZ_TELEMETRYSTORE_MAX__IDLE__CONNS", "60") t.Setenv("SIGNOZ_TELEMETRYSTORE_MAX__IDLE__CONNS", "60")
t.Setenv("SIGNOZ_TELEMETRYSTORE_MAX__OPEN__CONNS", "150") t.Setenv("SIGNOZ_TELEMETRYSTORE_MAX__OPEN__CONNS", "150")
t.Setenv("SIGNOZ_TELEMETRYSTORE_DIAL__TIMEOUT", "5s") t.Setenv("SIGNOZ_TELEMETRYSTORE_DIAL__TIMEOUT", "5s")
@@ -33,22 +33,18 @@ func TestNewWithEnvProvider(t *testing.T) {
) )
require.NoError(t, err) require.NoError(t, err)
actual := &Config{} actual := Config{}
err = conf.Unmarshal("telemetrystore", actual) err = conf.Unmarshal("telemetrystore", &actual)
require.NoError(t, err) require.NoError(t, err)
expected := &Config{ assert.NoError(t, actual.Validate())
Provider: "clickhouse",
Connection: ConnectionConfig{ expected := NewConfigFactory().New().(Config)
MaxOpenConns: 150, expected.Provider = "clickhouse"
MaxIdleConns: 60, expected.Connection.MaxOpenConns = 150
DialTimeout: 5 * time.Second, expected.Connection.MaxIdleConns = 60
}, expected.Connection.DialTimeout = 5 * time.Second
ClickHouse: ClickHouseConfig{ expected.Clickhouse.DSN = "tcp://localhost:9000"
DSN: "http://localhost:9000",
},
}
assert.Equal(t, expected, actual) assert.Equal(t, expected, actual)
} }
@@ -74,14 +70,14 @@ func TestNewWithEnvProviderWithQuerySettings(t *testing.T) {
) )
require.NoError(t, err) require.NoError(t, err)
actual := &Config{} actual := Config{}
err = conf.Unmarshal("telemetrystore", actual) err = conf.Unmarshal("telemetrystore", &actual)
require.NoError(t, err) require.NoError(t, err)
expected := &Config{ expected := Config{
ClickHouse: ClickHouseConfig{ Clickhouse: ClickhouseConfig{
QuerySettings: ClickHouseQuerySettings{ QuerySettings: QuerySettings{
MaxExecutionTime: 10, MaxExecutionTime: 10,
MaxExecutionTimeLeaf: 10, MaxExecutionTimeLeaf: 10,
TimeoutBeforeCheckingExecutionSpeed: 10, TimeoutBeforeCheckingExecutionSpeed: 10,
@@ -91,5 +87,5 @@ func TestNewWithEnvProviderWithQuerySettings(t *testing.T) {
}, },
} }
assert.Equal(t, expected.ClickHouse.QuerySettings, actual.ClickHouse.QuerySettings) assert.Equal(t, expected.Clickhouse.QuerySettings, actual.Clickhouse.QuerySettings)
} }

View File

@@ -5,10 +5,12 @@ import (
"github.com/ClickHouse/clickhouse-go/v2" "github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver" "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/SigNoz/signoz/pkg/promengine"
) )
type TelemetryStore interface { type TelemetryStore interface {
ClickHouseDB() clickhouse.Conn ClickhouseDB() clickhouse.Conn
PrometheusEngine() promengine.PromEngine
} }
type TelemetryStoreHook interface { type TelemetryStoreHook interface {

View File

@@ -12,29 +12,20 @@ import (
) )
type provider struct { type provider struct {
settings telemetrystore.ClickHouseQuerySettings settings telemetrystore.QuerySettings
} }
func NewFactory() factory.ProviderFactory[telemetrystore.TelemetryStoreHook, telemetrystore.Config] { func NewSettingsFactory() factory.ProviderFactory[telemetrystore.TelemetryStoreHook, telemetrystore.Config] {
return factory.NewProviderFactory(factory.MustNewName("clickhousesettings"), New) return factory.NewProviderFactory(factory.MustNewName("settings"), NewSettings)
} }
func New(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config) (telemetrystore.TelemetryStoreHook, error) { func NewSettings(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config) (telemetrystore.TelemetryStoreHook, error) {
return &provider{ return &provider{
settings: config.ClickHouse.QuerySettings, settings: config.Clickhouse.QuerySettings,
}, nil }, nil
} }
func (h *provider) BeforeQuery(ctx context.Context, query string, args ...interface{}) (context.Context, string, []interface{}) { func (h *provider) BeforeQuery(ctx context.Context, query string, args ...interface{}) (context.Context, string, []interface{}) {
return h.clickHouseSettings(ctx, query, args...)
}
func (h *provider) AfterQuery(ctx context.Context, query string, args []interface{}, rows driver.Rows, err error) {
return
}
// clickHouseSettings adds clickhouse settings to queries
func (h *provider) clickHouseSettings(ctx context.Context, query string, args ...interface{}) (context.Context, string, []interface{}) {
settings := clickhouse.Settings{} settings := clickhouse.Settings{}
// Apply default settings // Apply default settings
@@ -73,6 +64,9 @@ func (h *provider) clickHouseSettings(ctx context.Context, query string, args ..
return ctx, query, args return ctx, query, args
} }
func (h *provider) AfterQuery(ctx context.Context, query string, args []interface{}, rows driver.Rows, err error) {
}
func (h *provider) getLogComment(ctx context.Context) string { func (h *provider) getLogComment(ctx context.Context) string {
// Get the key-value pairs from context for log comment // Get the key-value pairs from context for log comment
kv := ctx.Value(common.LogCommentKey) kv := ctx.Value(common.LogCommentKey)

View File

@@ -2,6 +2,9 @@ package telemetrystoretest
import ( import (
"github.com/ClickHouse/clickhouse-go/v2" "github.com/ClickHouse/clickhouse-go/v2"
"github.com/DATA-DOG/go-sqlmock"
"github.com/SigNoz/signoz/pkg/promengine"
"github.com/SigNoz/signoz/pkg/promengine/promenginetest"
"github.com/SigNoz/signoz/pkg/telemetrystore" "github.com/SigNoz/signoz/pkg/telemetrystore"
cmock "github.com/srikanthccv/ClickHouse-go-mock" cmock "github.com/srikanthccv/ClickHouse-go-mock"
) )
@@ -10,28 +13,38 @@ var _ telemetrystore.TelemetryStore = (*Provider)(nil)
// Provider represents a mock telemetry store provider for testing // Provider represents a mock telemetry store provider for testing
type Provider struct { type Provider struct {
mock cmock.ClickConnMockCommon clickhouseDB cmock.ClickConnMockCommon
engine promengine.PromEngine
} }
// New creates a new mock telemetry store provider // New creates a new mock telemetry store provider
func New() (*Provider, error) { func New(matcher sqlmock.QueryMatcher) (*Provider, error) {
options := &clickhouse.Options{} // Default options clickhouseDB, err := cmock.NewClickHouseWithQueryMatcher(&clickhouse.Options{}, matcher)
mock, err := cmock.NewClickHouseNative(options) if err != nil {
return nil, err
}
engine, err := promenginetest.New()
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &Provider{ return &Provider{
mock: mock, clickhouseDB: clickhouseDB,
engine: engine,
}, nil }, nil
} }
func (p *Provider) PrometheusEngine() promengine.PromEngine {
return p.engine
}
// ClickhouseDB returns the mock Clickhouse connection // ClickhouseDB returns the mock Clickhouse connection
func (p *Provider) ClickHouseDB() clickhouse.Conn { func (p *Provider) ClickhouseDB() clickhouse.Conn {
return p.mock.(clickhouse.Conn) return p.clickhouseDB.(clickhouse.Conn)
} }
// Mock returns the underlying Clickhouse mock instance for setting expectations // Mock returns the underlying Clickhouse mock instance for setting expectations
func (p *Provider) Mock() cmock.ClickConnMockCommon { func (p *Provider) Mock() cmock.ClickConnMockCommon {
return p.mock return p.clickhouseDB
} }

View File

@@ -3,6 +3,7 @@ package telemetrystoretest
import ( import (
"testing" "testing"
"github.com/DATA-DOG/go-sqlmock"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
@@ -19,7 +20,7 @@ func TestNew(t *testing.T) {
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
provider, err := New() provider, err := New(sqlmock.QueryMatcherRegexp)
if tt.wantErr { if tt.wantErr {
assert.Error(t, err) assert.Error(t, err)
assert.Nil(t, provider) assert.Nil(t, provider)
@@ -29,7 +30,8 @@ func TestNew(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
assert.NotNil(t, provider) assert.NotNil(t, provider)
assert.NotNil(t, provider.Mock()) assert.NotNil(t, provider.Mock())
assert.NotNil(t, provider.ClickHouseDB()) assert.NotNil(t, provider.ClickhouseDB())
assert.NotNil(t, provider.PrometheusEngine())
}) })
} }
} }