Compare commits
7 Commits
main
...
v0.77.0-51
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
51ecd97b70 | ||
|
|
fdf3331c0f | ||
|
|
fb2210d1e0 | ||
|
|
6d5d047162 | ||
|
|
eea07211e0 | ||
|
|
386363b93e | ||
|
|
ec3604c7d5 |
4
Makefile
4
Makefile
@@ -74,6 +74,10 @@ go-run-enterprise: ## Runs the enterprise go backend server
|
||||
--use-logs-new-schema true \
|
||||
--use-trace-new-schema true
|
||||
|
||||
.PHONY: go-test
|
||||
go-test: ## Runs go unit tests
|
||||
@go test -race ./...
|
||||
|
||||
.PHONY: go-run-community
|
||||
go-run-community: ## Runs the community go backend server
|
||||
@SIGNOZ_INSTRUMENTATION_LOGS_LEVEL=debug \
|
||||
|
||||
@@ -72,7 +72,6 @@ sqlstore:
|
||||
# The path to the SQLite database file.
|
||||
path: /var/lib/signoz/signoz.db
|
||||
|
||||
|
||||
##################### APIServer #####################
|
||||
apiserver:
|
||||
timeout:
|
||||
@@ -91,20 +90,30 @@ apiserver:
|
||||
- /api/v1/version
|
||||
- /
|
||||
|
||||
|
||||
##################### TelemetryStore #####################
|
||||
telemetrystore:
|
||||
# Specifies the telemetrystore provider to use.
|
||||
provider: clickhouse
|
||||
# Maximum number of idle connections in the connection pool.
|
||||
max_idle_conns: 50
|
||||
# Maximum number of open connections to the database.
|
||||
max_open_conns: 100
|
||||
# Maximum time to wait for a connection to be established.
|
||||
dial_timeout: 5s
|
||||
# Specifies the telemetrystore provider to use.
|
||||
provider: clickhouse
|
||||
clickhouse:
|
||||
# The DSN to use for ClickHouse.
|
||||
dsn: http://localhost:9000
|
||||
# The DSN to use for clickhouse.
|
||||
dsn: tcp://localhost:9000
|
||||
prometheus:
|
||||
remote_read:
|
||||
# The URL to use for remote read. It must be a clickhouse dsn pointing to the database where metrics is stored.
|
||||
url: tcp://localhost:9000/signoz_metrics
|
||||
active_query_tracker:
|
||||
# Whether to enable the active query tracker.
|
||||
enabled: true
|
||||
# The path to use for the active query tracker.
|
||||
path: ""
|
||||
# The maximum number of concurrent queries.
|
||||
max_concurrent: 20
|
||||
|
||||
##################### Alertmanager #####################
|
||||
alertmanager:
|
||||
@@ -117,7 +126,7 @@ alertmanager:
|
||||
# The poll interval for periodically syncing the alertmanager with the config in the store.
|
||||
poll_interval: 1m
|
||||
# The URL under which Alertmanager is externally reachable (for example, if Alertmanager is served via a reverse proxy). Used for generating relative and absolute links back to Alertmanager itself.
|
||||
external_url: http://localhost:9093
|
||||
external_url: http://localhost:8080
|
||||
# The global configuration for the alertmanager. All the exahustive fields can be found in the upstream: https://github.com/prometheus/alertmanager/blob/efa05feffd644ba4accb526e98a8c6545d26a783/config/config.go#L833
|
||||
global:
|
||||
# ResolveTimeout is the time after which an alert is declared resolved if it has not been updated.
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/cache"
|
||||
basechr "github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||
)
|
||||
|
||||
type ClickhouseReader struct {
|
||||
@@ -20,8 +21,7 @@ type ClickhouseReader struct {
|
||||
|
||||
func NewDataConnector(
|
||||
localDB *sqlx.DB,
|
||||
ch clickhouse.Conn,
|
||||
promConfigPath string,
|
||||
telemetryStore telemetrystore.TelemetryStore,
|
||||
lm interfaces.FeatureLookup,
|
||||
cluster string,
|
||||
useLogsNewSchema bool,
|
||||
@@ -29,14 +29,10 @@ func NewDataConnector(
|
||||
fluxIntervalForTraceDetail time.Duration,
|
||||
cache cache.Cache,
|
||||
) *ClickhouseReader {
|
||||
chReader := basechr.NewReader(localDB, ch, promConfigPath, lm, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache)
|
||||
chReader := basechr.NewReader(localDB, telemetryStore, lm, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache)
|
||||
return &ClickhouseReader{
|
||||
conn: ch,
|
||||
conn: telemetryStore.ClickhouseDB(),
|
||||
appdb: localDB,
|
||||
ClickHouseReader: chReader,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *ClickhouseReader) Start(readerReady chan bool) {
|
||||
r.ClickHouseReader.Start(readerReady)
|
||||
}
|
||||
|
||||
@@ -18,13 +18,13 @@ import (
|
||||
"github.com/SigNoz/signoz/ee/query-service/constants"
|
||||
"github.com/SigNoz/signoz/ee/query-service/dao"
|
||||
"github.com/SigNoz/signoz/ee/query-service/integrations/gateway"
|
||||
"github.com/SigNoz/signoz/ee/query-service/interfaces"
|
||||
"github.com/SigNoz/signoz/ee/query-service/rules"
|
||||
"github.com/SigNoz/signoz/pkg/alertmanager"
|
||||
"github.com/SigNoz/signoz/pkg/http/middleware"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/auth"
|
||||
"github.com/SigNoz/signoz/pkg/signoz"
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||
"github.com/SigNoz/signoz/pkg/types"
|
||||
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
||||
"github.com/SigNoz/signoz/pkg/web"
|
||||
@@ -49,7 +49,6 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/query-service/healthcheck"
|
||||
baseint "github.com/SigNoz/signoz/pkg/query-service/interfaces"
|
||||
basemodel "github.com/SigNoz/signoz/pkg/query-service/model"
|
||||
pqle "github.com/SigNoz/signoz/pkg/query-service/pqlEngine"
|
||||
baserules "github.com/SigNoz/signoz/pkg/query-service/rules"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/telemetry"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/utils"
|
||||
@@ -137,18 +136,15 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
||||
|
||||
// set license manager as feature flag provider in dao
|
||||
modelDao.SetFlagProvider(lm)
|
||||
readerReady := make(chan bool)
|
||||
|
||||
fluxIntervalForTraceDetail, err := time.ParseDuration(serverOptions.FluxIntervalForTraceDetail)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var reader interfaces.DataConnector
|
||||
qb := db.NewDataConnector(
|
||||
reader := db.NewDataConnector(
|
||||
serverOptions.SigNoz.SQLStore.SQLxDB(),
|
||||
serverOptions.SigNoz.TelemetryStore.ClickHouseDB(),
|
||||
serverOptions.PromConfigPath,
|
||||
serverOptions.SigNoz.TelemetryStore,
|
||||
lm,
|
||||
serverOptions.Cluster,
|
||||
serverOptions.UseLogsNewSchema,
|
||||
@@ -156,8 +152,6 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
||||
fluxIntervalForTraceDetail,
|
||||
serverOptions.SigNoz.Cache,
|
||||
)
|
||||
go qb.Start(readerReady)
|
||||
reader = qb
|
||||
|
||||
skipConfig := &basemodel.SkipConfig{}
|
||||
if serverOptions.SkipTopLvlOpsPath != "" {
|
||||
@@ -176,9 +170,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
||||
c = cache.NewCache(cacheOpts)
|
||||
}
|
||||
|
||||
<-readerReady
|
||||
rm, err := makeRulesManager(
|
||||
serverOptions.PromConfigPath,
|
||||
serverOptions.RuleRepoURL,
|
||||
serverOptions.SigNoz.SQLStore.SQLxDB(),
|
||||
reader,
|
||||
@@ -189,6 +181,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
||||
serverOptions.UseTraceNewSchema,
|
||||
serverOptions.SigNoz.Alertmanager,
|
||||
serverOptions.SigNoz.SQLStore,
|
||||
serverOptions.SigNoz.TelemetryStore,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
@@ -233,7 +226,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
||||
}
|
||||
|
||||
// start the usagemanager
|
||||
usageManager, err := usage.New(modelDao, lm.GetRepo(), serverOptions.SigNoz.TelemetryStore.ClickHouseDB(), serverOptions.Config.TelemetryStore.ClickHouse.DSN)
|
||||
usageManager, err := usage.New(modelDao, lm.GetRepo(), serverOptions.SigNoz.TelemetryStore.ClickhouseDB(), serverOptions.Config.TelemetryStore.Clickhouse.DSN)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -304,7 +297,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
||||
&opAmpModel.AllAgents, agentConfMgr,
|
||||
)
|
||||
|
||||
errorList := qb.PreloadMetricsMetadata(context.Background())
|
||||
errorList := reader.PreloadMetricsMetadata(context.Background())
|
||||
for _, er := range errorList {
|
||||
zap.L().Error("failed to preload metrics metadata", zap.Error(er))
|
||||
}
|
||||
@@ -538,7 +531,6 @@ func (s *Server) Stop() error {
|
||||
}
|
||||
|
||||
func makeRulesManager(
|
||||
promConfigPath,
|
||||
ruleRepoURL string,
|
||||
db *sqlx.DB,
|
||||
ch baseint.Reader,
|
||||
@@ -549,16 +541,11 @@ func makeRulesManager(
|
||||
useTraceNewSchema bool,
|
||||
alertmanager alertmanager.Alertmanager,
|
||||
sqlstore sqlstore.SQLStore,
|
||||
telemetryStore telemetrystore.TelemetryStore,
|
||||
) (*baserules.Manager, error) {
|
||||
// create engine
|
||||
pqle, err := pqle.FromConfigPath(promConfigPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create pql engine : %v", err)
|
||||
}
|
||||
|
||||
// create manager opts
|
||||
managerOpts := &baserules.ManagerOptions{
|
||||
PqlEngine: pqle,
|
||||
TelemetryStore: telemetryStore,
|
||||
RepoURL: ruleRepoURL,
|
||||
DBConn: db,
|
||||
Context: context.Background(),
|
||||
|
||||
@@ -7,6 +7,5 @@ import (
|
||||
// Connector defines methods for interaction
|
||||
// with o11y data. for example - clickhouse
|
||||
type DataConnector interface {
|
||||
Start(readerReady chan bool)
|
||||
baseint.Reader
|
||||
}
|
||||
|
||||
@@ -88,6 +88,7 @@ func main() {
|
||||
MaxIdleConns: maxIdleConns,
|
||||
MaxOpenConns: maxOpenConns,
|
||||
DialTimeout: dialTimeout,
|
||||
Config: promConfigPath,
|
||||
})
|
||||
if err != nil {
|
||||
zap.L().Fatal("Failed to create config", zap.Error(err))
|
||||
@@ -104,7 +105,7 @@ func main() {
|
||||
signoz.NewTelemetryStoreProviderFactories(),
|
||||
)
|
||||
if err != nil {
|
||||
zap.L().Fatal("Failed to create signoz struct", zap.Error(err))
|
||||
zap.L().Fatal("Failed to create signoz", zap.Error(err))
|
||||
}
|
||||
|
||||
jwtSecret := os.Getenv("SIGNOZ_JWT_SECRET")
|
||||
|
||||
@@ -48,7 +48,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
|
||||
opts.Rule,
|
||||
opts.Logger,
|
||||
opts.Reader,
|
||||
opts.ManagerOpts.PqlEngine,
|
||||
opts.ManagerOpts.TelemetryStore.PrometheusEngine(),
|
||||
baserules.WithSQLStore(opts.SQLStore),
|
||||
)
|
||||
|
||||
@@ -145,7 +145,7 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, *basemodel.Ap
|
||||
parsedRule,
|
||||
opts.Logger,
|
||||
opts.Reader,
|
||||
opts.ManagerOpts.PqlEngine,
|
||||
opts.ManagerOpts.TelemetryStore.PrometheusEngine(),
|
||||
baserules.WithSendAlways(),
|
||||
baserules.WithSendUnmatched(),
|
||||
baserules.WithSQLStore(opts.SQLStore),
|
||||
|
||||
4
go.mod
4
go.mod
@@ -34,7 +34,6 @@ require (
|
||||
github.com/knadh/koanf/v2 v2.1.1
|
||||
github.com/mailru/easyjson v0.7.7
|
||||
github.com/mattn/go-sqlite3 v1.14.24
|
||||
github.com/oklog/oklog v0.3.2
|
||||
github.com/open-telemetry/opamp-go v0.5.0
|
||||
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.111.0
|
||||
github.com/opentracing/opentracing-go v1.2.0
|
||||
@@ -93,6 +92,7 @@ require (
|
||||
github.com/armon/go-metrics v0.4.1 // indirect
|
||||
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect
|
||||
github.com/aws/aws-sdk-go v1.55.5 // indirect
|
||||
github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3 // indirect
|
||||
github.com/beevik/etree v1.1.0 // indirect
|
||||
github.com/beorn7/perks v1.0.1 // indirect
|
||||
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
|
||||
@@ -132,6 +132,7 @@ require (
|
||||
github.com/golang/protobuf v1.5.4 // indirect
|
||||
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
|
||||
github.com/google/btree v1.0.1 // indirect
|
||||
github.com/google/go-cmp v0.6.0 // indirect
|
||||
github.com/google/s2a-go v0.1.8 // indirect
|
||||
github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect
|
||||
github.com/gopherjs/gopherjs v1.17.2 // indirect
|
||||
@@ -262,6 +263,7 @@ require (
|
||||
go.opentelemetry.io/otel/sdk/metric v1.30.0 // indirect
|
||||
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
|
||||
go.uber.org/atomic v1.11.0 // indirect
|
||||
go.uber.org/goleak v1.3.0 // indirect
|
||||
golang.org/x/mod v0.22.0 // indirect
|
||||
golang.org/x/net v0.33.0 // indirect
|
||||
golang.org/x/sys v0.29.0 // indirect
|
||||
|
||||
2
go.sum
2
go.sum
@@ -690,8 +690,6 @@ github.com/npillmayer/nestext v0.1.3/go.mod h1:h2lrijH8jpicr25dFY+oAJLyzlya6jhnu
|
||||
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
|
||||
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
|
||||
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
|
||||
github.com/oklog/oklog v0.3.2 h1:wVfs8F+in6nTBMkA7CbRw+zZMIB7nNM825cM1wuzoTk=
|
||||
github.com/oklog/oklog v0.3.2/go.mod h1:FCV+B7mhrz4o+ueLpx+KqkyXRGMWOYEvfiXtdGtbWGs=
|
||||
github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA=
|
||||
github.com/oklog/run v1.1.0 h1:GEenZ1cK0+q0+wsJew9qUg/DyD8k3JzYsZAi5gYi2mA=
|
||||
github.com/oklog/run v1.1.0/go.mod h1:sVPdnTZT1zYwAJeCMu2Th4T21pA3FPOQRfWjQlk7DVU=
|
||||
|
||||
@@ -1,10 +1,15 @@
|
||||
package instrumentation
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/instrumentation/loghandler"
|
||||
"github.com/go-kit/log"
|
||||
"github.com/go-kit/log/level"
|
||||
)
|
||||
|
||||
func NewLogger(config Config, wrappers ...loghandler.Wrapper) *slog.Logger {
|
||||
@@ -33,3 +38,65 @@ func NewLogger(config Config, wrappers ...loghandler.Wrapper) *slog.Logger {
|
||||
|
||||
return logger
|
||||
}
|
||||
|
||||
var _ log.Logger = (*gokitLogger)(nil)
|
||||
|
||||
type gokitLogger struct {
|
||||
handler slog.Handler
|
||||
messageKey string
|
||||
}
|
||||
|
||||
func NewGoKitLoggerFromSlogHandler(handler slog.Handler, messageKey string) log.Logger {
|
||||
return &gokitLogger{handler, messageKey}
|
||||
}
|
||||
|
||||
func (l *gokitLogger) Log(keyvals ...any) error {
|
||||
var (
|
||||
attrs []slog.Attr
|
||||
message string
|
||||
gkl level.Value
|
||||
)
|
||||
|
||||
for i := 1; i < len(keyvals); i += 2 {
|
||||
// go-kit/log keys don't have to be strings, but slog keys do.
|
||||
// Convert the go-kit key to a string with fmt.Sprint.
|
||||
key, ok := keyvals[i-1].(string)
|
||||
if !ok {
|
||||
key = fmt.Sprint(keyvals[i-1])
|
||||
}
|
||||
|
||||
if l.messageKey != "" && key == l.messageKey {
|
||||
message = fmt.Sprint(keyvals[i])
|
||||
continue
|
||||
}
|
||||
|
||||
if l, ok := keyvals[i].(level.Value); ok {
|
||||
gkl = l
|
||||
continue
|
||||
}
|
||||
|
||||
attrs = append(attrs, slog.Any(key, keyvals[i]))
|
||||
}
|
||||
|
||||
var sl slog.Level
|
||||
if gkl != nil {
|
||||
switch gkl {
|
||||
case level.DebugValue():
|
||||
sl = slog.LevelDebug
|
||||
case level.InfoValue():
|
||||
sl = slog.LevelInfo
|
||||
case level.WarnValue():
|
||||
sl = slog.LevelWarn
|
||||
case level.ErrorValue():
|
||||
sl = slog.LevelError
|
||||
}
|
||||
}
|
||||
|
||||
if !l.handler.Enabled(context.Background(), sl) {
|
||||
return nil
|
||||
}
|
||||
|
||||
r := slog.NewRecord(time.Now(), sl, message, 0)
|
||||
r.AddAttrs(attrs...)
|
||||
return l.handler.Handle(context.Background(), r)
|
||||
}
|
||||
|
||||
18
pkg/promengine/config.go
Normal file
18
pkg/promengine/config.go
Normal file
@@ -0,0 +1,18 @@
|
||||
package promengine
|
||||
|
||||
import "net/url"
|
||||
|
||||
type ActiveQueryTrackerConfig struct {
|
||||
Enabled bool `mapstructure:"enabled"`
|
||||
Path string `mapstructure:"path"`
|
||||
MaxConcurrent int `mapstructure:"max_concurrent"`
|
||||
}
|
||||
|
||||
type RemoteReadConfig struct {
|
||||
URL *url.URL `mapstructure:"url"`
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
RemoteReadConfig RemoteReadConfig `mapstructure:"remote_read"`
|
||||
ActiveQueryTrackerConfig ActiveQueryTrackerConfig `mapstructure:"active_query_tracker"`
|
||||
}
|
||||
87
pkg/promengine/ext.go
Normal file
87
pkg/promengine/ext.go
Normal file
@@ -0,0 +1,87 @@
|
||||
package promengine
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/instrumentation"
|
||||
commoncfg "github.com/prometheus/common/config"
|
||||
"github.com/prometheus/common/model"
|
||||
"github.com/prometheus/prometheus/config"
|
||||
"github.com/prometheus/prometheus/promql"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/storage/remote"
|
||||
)
|
||||
|
||||
var _ PromEngine = (*ExtEngine)(nil)
|
||||
|
||||
type ExtEngine struct {
|
||||
engine *promql.Engine
|
||||
fanoutStorage storage.Storage
|
||||
}
|
||||
|
||||
func New(logger *slog.Logger, cfg Config) (*ExtEngine, error) {
|
||||
if logger == nil {
|
||||
return nil, fmt.Errorf("logger is required")
|
||||
}
|
||||
|
||||
gokitLogger := instrumentation.NewGoKitLoggerFromSlogHandler(logger.Handler(), "msg")
|
||||
|
||||
var activeQueryTracker promql.QueryTracker
|
||||
if cfg.ActiveQueryTrackerConfig.Enabled {
|
||||
activeQueryTracker = promql.NewActiveQueryTracker(
|
||||
cfg.ActiveQueryTrackerConfig.Path,
|
||||
cfg.ActiveQueryTrackerConfig.MaxConcurrent,
|
||||
gokitLogger,
|
||||
)
|
||||
}
|
||||
|
||||
engine := promql.NewEngine(promql.EngineOpts{
|
||||
Logger: gokitLogger,
|
||||
Reg: nil,
|
||||
MaxSamples: 50000000,
|
||||
Timeout: time.Duration(2 * time.Minute),
|
||||
ActiveQueryTracker: activeQueryTracker,
|
||||
})
|
||||
|
||||
remoteStorage := remote.NewStorage(
|
||||
gokitLogger,
|
||||
nil,
|
||||
func() (int64, error) { return int64(model.Latest), nil },
|
||||
"",
|
||||
time.Duration(1*time.Minute),
|
||||
nil,
|
||||
false,
|
||||
)
|
||||
fanoutStorage := storage.NewFanout(gokitLogger, remoteStorage)
|
||||
|
||||
config := &config.Config{
|
||||
RemoteReadConfigs: []*config.RemoteReadConfig{
|
||||
{
|
||||
URL: &commoncfg.URL{URL: cfg.RemoteReadConfig.URL},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
if err := config.UnmarshalYAML(func(i interface{}) error { return nil }); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := remoteStorage.ApplyConfig(config); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &ExtEngine{
|
||||
engine: engine,
|
||||
fanoutStorage: fanoutStorage,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (engine *ExtEngine) Engine() *promql.Engine {
|
||||
return engine.engine
|
||||
}
|
||||
|
||||
func (engine *ExtEngine) Storage() storage.Storage {
|
||||
return engine.fanoutStorage
|
||||
}
|
||||
20
pkg/promengine/promengine.go
Normal file
20
pkg/promengine/promengine.go
Normal file
@@ -0,0 +1,20 @@
|
||||
package promengine
|
||||
|
||||
import (
|
||||
"github.com/prometheus/common/model"
|
||||
"github.com/prometheus/prometheus/promql"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
)
|
||||
|
||||
type PromEngine interface {
|
||||
// Engine returns the underlying promql engine
|
||||
Engine() *promql.Engine
|
||||
|
||||
// Storage returns the underlying storage
|
||||
Storage() storage.Storage
|
||||
}
|
||||
|
||||
// init initializes the prometheus model with UTF8 validation
|
||||
func init() {
|
||||
model.NameValidationScheme = model.UTF8Validation
|
||||
}
|
||||
46
pkg/promengine/promenginetest/promengine.go
Normal file
46
pkg/promengine/promenginetest/promengine.go
Normal file
@@ -0,0 +1,46 @@
|
||||
package promenginetest
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/promengine"
|
||||
"github.com/prometheus/prometheus/promql"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
)
|
||||
|
||||
var _ promengine.PromEngine = (*Engine)(nil)
|
||||
|
||||
type Engine struct {
|
||||
engine *promql.Engine
|
||||
storage storage.Storage
|
||||
}
|
||||
|
||||
func New(outOfOrderTimeWindow ...int64) (*Engine, error) {
|
||||
engine := promql.NewEngine(promql.EngineOpts{
|
||||
Logger: nil,
|
||||
Reg: nil,
|
||||
MaxSamples: 50000000,
|
||||
Timeout: time.Duration(2 * time.Minute),
|
||||
ActiveQueryTracker: nil,
|
||||
})
|
||||
|
||||
testStorage, err := NewStorageWithError(outOfOrderTimeWindow...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
fanoutStorage := storage.NewFanout(nil, testStorage)
|
||||
|
||||
return &Engine{
|
||||
engine: engine,
|
||||
storage: fanoutStorage,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (e *Engine) Engine() *promql.Engine {
|
||||
return e.engine
|
||||
}
|
||||
|
||||
func (e *Engine) Storage() storage.Storage {
|
||||
return e.storage
|
||||
}
|
||||
76
pkg/promengine/promenginetest/storage.go
Normal file
76
pkg/promengine/promenginetest/storage.go
Normal file
@@ -0,0 +1,76 @@
|
||||
package promenginetest
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/prometheus/model/exemplar"
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/tsdb"
|
||||
)
|
||||
|
||||
type TestStorage struct {
|
||||
*tsdb.DB
|
||||
exemplarStorage tsdb.ExemplarStorage
|
||||
dir string
|
||||
}
|
||||
|
||||
// NewStorageWithError returns a new TestStorage for user facing tests, which reports
|
||||
// errors directly.
|
||||
func NewStorageWithError(outOfOrderTimeWindow ...int64) (*TestStorage, error) {
|
||||
dir, err := os.MkdirTemp("", "test_storage")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("opening test directory: %w", err)
|
||||
}
|
||||
|
||||
// Tests just load data for a series sequentially. Thus we
|
||||
// need a long appendable window.
|
||||
opts := tsdb.DefaultOptions()
|
||||
opts.MinBlockDuration = int64(24 * time.Hour / time.Millisecond)
|
||||
opts.MaxBlockDuration = int64(24 * time.Hour / time.Millisecond)
|
||||
opts.RetentionDuration = 0
|
||||
opts.EnableNativeHistograms = true
|
||||
|
||||
// Set OutOfOrderTimeWindow if provided, otherwise use default (0)
|
||||
if len(outOfOrderTimeWindow) > 0 {
|
||||
opts.OutOfOrderTimeWindow = outOfOrderTimeWindow[0]
|
||||
} else {
|
||||
opts.OutOfOrderTimeWindow = 0 // Default value is zero
|
||||
}
|
||||
|
||||
db, err := tsdb.Open(dir, nil, nil, opts, tsdb.NewDBStats())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("opening test storage: %w", err)
|
||||
}
|
||||
reg := prometheus.NewRegistry()
|
||||
eMetrics := tsdb.NewExemplarMetrics(reg)
|
||||
|
||||
es, err := tsdb.NewCircularExemplarStorage(10, eMetrics)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("opening test exemplar storage: %w", err)
|
||||
}
|
||||
|
||||
return &TestStorage{DB: db, exemplarStorage: es, dir: dir}, nil
|
||||
}
|
||||
|
||||
func (s TestStorage) Close() error {
|
||||
if err := s.DB.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
return os.RemoveAll(s.dir)
|
||||
}
|
||||
|
||||
func (s TestStorage) ExemplarAppender() storage.ExemplarAppender {
|
||||
return s
|
||||
}
|
||||
|
||||
func (s TestStorage) ExemplarQueryable() storage.ExemplarQueryable {
|
||||
return s.exemplarStorage
|
||||
}
|
||||
|
||||
func (s TestStorage) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
|
||||
return ref, s.exemplarStorage.AddExemplar(l, e)
|
||||
}
|
||||
@@ -7,7 +7,6 @@ import (
|
||||
"fmt"
|
||||
"math"
|
||||
"math/rand"
|
||||
"os"
|
||||
"reflect"
|
||||
"regexp"
|
||||
"sort"
|
||||
@@ -16,20 +15,16 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/promengine"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/model/metrics_explorer"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||
|
||||
"github.com/go-kit/log"
|
||||
"github.com/go-kit/log/level"
|
||||
"github.com/google/uuid"
|
||||
"github.com/mailru/easyjson"
|
||||
"github.com/oklog/oklog/pkg/group"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prometheus/common/promlog"
|
||||
"github.com/prometheus/prometheus/config"
|
||||
"github.com/prometheus/prometheus/promql"
|
||||
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/storage/remote"
|
||||
"github.com/prometheus/prometheus/util/stats"
|
||||
|
||||
"github.com/ClickHouse/clickhouse-go/v2"
|
||||
@@ -38,7 +33,6 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
||||
"github.com/jmoiron/sqlx"
|
||||
|
||||
promModel "github.com/prometheus/common/model"
|
||||
"go.uber.org/zap"
|
||||
|
||||
queryprogress "github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader/query_progress"
|
||||
@@ -120,6 +114,7 @@ var (
|
||||
// SpanWriter for reading spans from ClickHouse
|
||||
type ClickHouseReader struct {
|
||||
db clickhouse.Conn
|
||||
engine promengine.PromEngine
|
||||
localDB *sqlx.DB
|
||||
TraceDB string
|
||||
operationsTable string
|
||||
@@ -138,9 +133,6 @@ type ClickHouseReader struct {
|
||||
logsAttributeKeys string
|
||||
logsResourceKeys string
|
||||
logsTagAttributeTableV2 string
|
||||
queryEngine *promql.Engine
|
||||
remoteStorage *remote.Storage
|
||||
fanoutStorage *storage.Storage
|
||||
queryProgressTracker queryprogress.QueryProgressTracker
|
||||
|
||||
logsTableV2 string
|
||||
@@ -175,8 +167,7 @@ type ClickHouseReader struct {
|
||||
// NewTraceReader returns a TraceReader for the database
|
||||
func NewReader(
|
||||
localDB *sqlx.DB,
|
||||
db driver.Conn,
|
||||
configFile string,
|
||||
telemetryStore telemetrystore.TelemetryStore,
|
||||
featureFlag interfaces.FeatureLookup,
|
||||
cluster string,
|
||||
useLogsNewSchema bool,
|
||||
@@ -185,14 +176,13 @@ func NewReader(
|
||||
cache cache.Cache,
|
||||
) *ClickHouseReader {
|
||||
options := NewOptions(primaryNamespace, archiveNamespace)
|
||||
return NewReaderFromClickhouseConnection(db, options, localDB, configFile, featureFlag, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache)
|
||||
return NewReaderFromClickhouseConnection(options, localDB, telemetryStore, featureFlag, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache)
|
||||
}
|
||||
|
||||
func NewReaderFromClickhouseConnection(
|
||||
db driver.Conn,
|
||||
options *Options,
|
||||
localDB *sqlx.DB,
|
||||
configFile string,
|
||||
telemetryStore telemetrystore.TelemetryStore,
|
||||
featureFlag interfaces.FeatureLookup,
|
||||
cluster string,
|
||||
useLogsNewSchema bool,
|
||||
@@ -215,7 +205,8 @@ func NewReaderFromClickhouseConnection(
|
||||
}
|
||||
|
||||
return &ClickHouseReader{
|
||||
db: db,
|
||||
db: telemetryStore.ClickhouseDB(),
|
||||
engine: telemetryStore.PrometheusEngine(),
|
||||
localDB: localDB,
|
||||
TraceDB: options.primary.TraceDB,
|
||||
operationsTable: options.primary.OperationsTable,
|
||||
@@ -235,7 +226,6 @@ func NewReaderFromClickhouseConnection(
|
||||
logsResourceKeys: options.primary.LogsResourceKeysTable,
|
||||
logsTagAttributeTableV2: options.primary.LogsTagAttributeTableV2,
|
||||
liveTailRefreshSeconds: options.primary.LiveTailRefreshSeconds,
|
||||
promConfigFile: configFile,
|
||||
featureFlags: featureFlag,
|
||||
cluster: cluster,
|
||||
queryProgressTracker: queryprogress.NewQueryProgressTracker(),
|
||||
@@ -262,154 +252,8 @@ func NewReaderFromClickhouseConnection(
|
||||
}
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) Start(readerReady chan bool) {
|
||||
logLevel := promlog.AllowedLevel{}
|
||||
logLevel.Set("debug")
|
||||
allowedFormat := promlog.AllowedFormat{}
|
||||
allowedFormat.Set("logfmt")
|
||||
|
||||
promlogConfig := promlog.Config{
|
||||
Level: &logLevel,
|
||||
Format: &allowedFormat,
|
||||
}
|
||||
|
||||
logger := promlog.New(&promlogConfig)
|
||||
|
||||
startTime := func() (int64, error) {
|
||||
return int64(promModel.Latest), nil
|
||||
}
|
||||
|
||||
remoteStorage := remote.NewStorage(
|
||||
log.With(logger, "component", "remote"),
|
||||
nil,
|
||||
startTime,
|
||||
"",
|
||||
time.Duration(1*time.Minute),
|
||||
nil,
|
||||
false,
|
||||
)
|
||||
|
||||
cfg := struct {
|
||||
configFile string
|
||||
|
||||
localStoragePath string
|
||||
lookbackDelta promModel.Duration
|
||||
webTimeout promModel.Duration
|
||||
queryTimeout promModel.Duration
|
||||
queryConcurrency int
|
||||
queryMaxSamples int
|
||||
RemoteFlushDeadline promModel.Duration
|
||||
|
||||
prometheusURL string
|
||||
|
||||
logLevel promlog.AllowedLevel
|
||||
}{
|
||||
configFile: r.promConfigFile,
|
||||
}
|
||||
|
||||
fanoutStorage := storage.NewFanout(logger, remoteStorage)
|
||||
|
||||
opts := promql.EngineOpts{
|
||||
Logger: log.With(logger, "component", "query engine"),
|
||||
Reg: nil,
|
||||
MaxSamples: 50000000,
|
||||
Timeout: time.Duration(2 * time.Minute),
|
||||
ActiveQueryTracker: promql.NewActiveQueryTracker(
|
||||
"",
|
||||
20,
|
||||
log.With(logger, "component", "activeQueryTracker"),
|
||||
),
|
||||
}
|
||||
|
||||
queryEngine := promql.NewEngine(opts)
|
||||
|
||||
reloaders := []func(cfg *config.Config) error{
|
||||
remoteStorage.ApplyConfig,
|
||||
}
|
||||
|
||||
// sync.Once is used to make sure we can close the channel at different execution stages(SIGTERM or when the config is loaded).
|
||||
type closeOnce struct {
|
||||
C chan struct{}
|
||||
once sync.Once
|
||||
Close func()
|
||||
}
|
||||
// Wait until the server is ready to handle reloading.
|
||||
reloadReady := &closeOnce{
|
||||
C: make(chan struct{}),
|
||||
}
|
||||
reloadReady.Close = func() {
|
||||
reloadReady.once.Do(func() {
|
||||
close(reloadReady.C)
|
||||
})
|
||||
}
|
||||
|
||||
var g group.Group
|
||||
{
|
||||
// Initial configuration loading.
|
||||
cancel := make(chan struct{})
|
||||
g.Add(
|
||||
func() error {
|
||||
var err error
|
||||
r.promConfig, err = reloadConfig(cfg.configFile, logger, reloaders...)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error loading config from %q: %s", cfg.configFile, err)
|
||||
}
|
||||
|
||||
reloadReady.Close()
|
||||
|
||||
<-cancel
|
||||
|
||||
return nil
|
||||
},
|
||||
func(err error) {
|
||||
close(cancel)
|
||||
},
|
||||
)
|
||||
}
|
||||
r.queryEngine = queryEngine
|
||||
r.remoteStorage = remoteStorage
|
||||
r.fanoutStorage = &fanoutStorage
|
||||
readerReady <- true
|
||||
|
||||
if err := g.Run(); err != nil {
|
||||
level.Error(logger).Log("err", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetQueryEngine() *promql.Engine {
|
||||
return r.queryEngine
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetFanoutStorage() *storage.Storage {
|
||||
return r.fanoutStorage
|
||||
}
|
||||
|
||||
func reloadConfig(filename string, logger log.Logger, rls ...func(*config.Config) error) (promConfig *config.Config, err error) {
|
||||
level.Info(logger).Log("msg", "Loading configuration file", "filename", filename)
|
||||
|
||||
conf, err := config.LoadFile(filename, false, false, logger)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("couldn't load configuration (--config.file=%q): %v", filename, err)
|
||||
}
|
||||
|
||||
failed := false
|
||||
for _, rl := range rls {
|
||||
if err := rl(conf); err != nil {
|
||||
level.Error(logger).Log("msg", "Failed to apply configuration", "err", err)
|
||||
failed = true
|
||||
}
|
||||
}
|
||||
if failed {
|
||||
return nil, fmt.Errorf("one or more errors occurred while applying the new configuration (--config.file=%q)", filename)
|
||||
}
|
||||
level.Info(logger).Log("msg", "Completed loading of configuration file", "filename", filename)
|
||||
return conf, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetInstantQueryMetricsResult(ctx context.Context, queryParams *model.InstantQueryMetricsParams) (*promql.Result, *stats.QueryStats, *model.ApiError) {
|
||||
qry, err := r.queryEngine.NewInstantQuery(ctx, r.remoteStorage, nil, queryParams.Query, queryParams.Time)
|
||||
qry, err := r.engine.Engine().NewInstantQuery(ctx, r.engine.Storage(), nil, queryParams.Query, queryParams.Time)
|
||||
if err != nil {
|
||||
return nil, nil, &model.ApiError{Typ: model.ErrorBadData, Err: err}
|
||||
}
|
||||
@@ -428,7 +272,7 @@ func (r *ClickHouseReader) GetInstantQueryMetricsResult(ctx context.Context, que
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetQueryRangeResult(ctx context.Context, query *model.QueryRangeParams) (*promql.Result, *stats.QueryStats, *model.ApiError) {
|
||||
qry, err := r.queryEngine.NewRangeQuery(ctx, r.remoteStorage, nil, query.Query, query.Start, query.End, query.Step)
|
||||
qry, err := r.engine.Engine().NewRangeQuery(ctx, r.engine.Storage(), nil, query.Query, query.Start, query.End, query.Step)
|
||||
|
||||
if err != nil {
|
||||
return nil, nil, &model.ApiError{Typ: model.ErrorBadData, Err: err}
|
||||
|
||||
@@ -5,11 +5,11 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"math"
|
||||
"regexp"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/app/queryBuilder"
|
||||
tracesV3 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v3"
|
||||
@@ -18,6 +18,7 @@ import (
|
||||
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/querycache"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/utils"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
|
||||
cmock "github.com/srikanthccv/ClickHouse-go-mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
@@ -1131,20 +1132,6 @@ func TestQueryRangeValueTypePromQL(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
type regexMatcher struct {
|
||||
}
|
||||
|
||||
func (m *regexMatcher) Match(expectedSQL, actualSQL string) error {
|
||||
re, err := regexp.Compile(expectedSQL)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !re.MatchString(actualSQL) {
|
||||
return fmt.Errorf("expected query to contain %s, got %s", expectedSQL, actualSQL)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func Test_querier_runWindowBasedListQuery(t *testing.T) {
|
||||
params := &v3.QueryRangeParamsV3{
|
||||
Start: 1722171576000000000, // July 28, 2024 6:29:36 PM
|
||||
@@ -1358,8 +1345,8 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
// Setup mock
|
||||
mock, err := cmock.NewClickHouseWithQueryMatcher(nil, ®exMatcher{})
|
||||
require.NoError(t, err, "Failed to create ClickHouse mock")
|
||||
telemetryStore, err := telemetrystoretest.New(sqlmock.QueryMatcherRegexp)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Configure mock responses
|
||||
for _, response := range tc.queryResponses {
|
||||
@@ -1368,7 +1355,7 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
|
||||
values = append(values, []any{&ts, &testName})
|
||||
}
|
||||
// if len(values) > 0 {
|
||||
mock.ExpectQuery(response.expectedQuery).WillReturnRows(
|
||||
telemetryStore.Mock().ExpectQuery(response.expectedQuery).WillReturnRows(
|
||||
cmock.NewRows(cols, values),
|
||||
)
|
||||
// }
|
||||
@@ -1376,10 +1363,9 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
|
||||
|
||||
// Create reader and querier
|
||||
reader := clickhouseReader.NewReaderFromClickhouseConnection(
|
||||
mock,
|
||||
options,
|
||||
nil,
|
||||
"",
|
||||
telemetryStore,
|
||||
featureManager.StartManager(),
|
||||
"",
|
||||
true,
|
||||
@@ -1429,7 +1415,7 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
|
||||
}
|
||||
|
||||
// Verify mock expectations
|
||||
err = mock.ExpectationsWereMet()
|
||||
err = telemetryStore.Mock().ExpectationsWereMet()
|
||||
require.NoError(t, err, "Mock expectations were not met")
|
||||
})
|
||||
}
|
||||
|
||||
@@ -5,11 +5,11 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"math"
|
||||
"regexp"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/app/queryBuilder"
|
||||
tracesV3 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v3"
|
||||
@@ -18,6 +18,7 @@ import (
|
||||
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/querycache"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/utils"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
|
||||
cmock "github.com/srikanthccv/ClickHouse-go-mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
@@ -1185,20 +1186,6 @@ func TestV2QueryRangeValueTypePromQL(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
type regexMatcher struct {
|
||||
}
|
||||
|
||||
func (m *regexMatcher) Match(expectedSQL, actualSQL string) error {
|
||||
re, err := regexp.Compile(expectedSQL)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !re.MatchString(actualSQL) {
|
||||
return fmt.Errorf("expected query to contain %s, got %s", expectedSQL, actualSQL)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func Test_querier_runWindowBasedListQuery(t *testing.T) {
|
||||
params := &v3.QueryRangeParamsV3{
|
||||
Start: 1722171576000000000, // July 28, 2024 6:29:36 PM
|
||||
@@ -1412,8 +1399,8 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
// Setup mock
|
||||
mock, err := cmock.NewClickHouseWithQueryMatcher(nil, ®exMatcher{})
|
||||
require.NoError(t, err, "Failed to create ClickHouse mock")
|
||||
telemetryStore, err := telemetrystoretest.New(sqlmock.QueryMatcherRegexp)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Configure mock responses
|
||||
for _, response := range tc.queryResponses {
|
||||
@@ -1422,7 +1409,7 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
|
||||
values = append(values, []any{&ts, &testName})
|
||||
}
|
||||
// if len(values) > 0 {
|
||||
mock.ExpectQuery(response.expectedQuery).WillReturnRows(
|
||||
telemetryStore.Mock().ExpectQuery(response.expectedQuery).WillReturnRows(
|
||||
cmock.NewRows(cols, values),
|
||||
)
|
||||
// }
|
||||
@@ -1430,10 +1417,9 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
|
||||
|
||||
// Create reader and querier
|
||||
reader := clickhouseReader.NewReaderFromClickhouseConnection(
|
||||
mock,
|
||||
options,
|
||||
nil,
|
||||
"",
|
||||
telemetryStore,
|
||||
featureManager.StartManager(),
|
||||
"",
|
||||
true,
|
||||
@@ -1483,7 +1469,7 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
|
||||
}
|
||||
|
||||
// Verify mock expectations
|
||||
err = mock.ExpectationsWereMet()
|
||||
err = telemetryStore.Mock().ExpectationsWereMet()
|
||||
require.NoError(t, err, "Mock expectations were not met")
|
||||
})
|
||||
}
|
||||
|
||||
@@ -25,6 +25,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/query-service/app/preferences"
|
||||
"github.com/SigNoz/signoz/pkg/signoz"
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||
"github.com/SigNoz/signoz/pkg/types"
|
||||
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
||||
"github.com/SigNoz/signoz/pkg/web"
|
||||
@@ -40,7 +41,6 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/query-service/healthcheck"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/model"
|
||||
pqle "github.com/SigNoz/signoz/pkg/query-service/pqlEngine"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/rules"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/telemetry"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/utils"
|
||||
@@ -119,10 +119,9 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
clickhouseReader := clickhouseReader.NewReader(
|
||||
reader := clickhouseReader.NewReader(
|
||||
serverOptions.SigNoz.SQLStore.SQLxDB(),
|
||||
serverOptions.SigNoz.TelemetryStore.ClickHouseDB(),
|
||||
serverOptions.PromConfigPath,
|
||||
serverOptions.SigNoz.TelemetryStore,
|
||||
fm,
|
||||
serverOptions.Cluster,
|
||||
serverOptions.UseLogsNewSchema,
|
||||
@@ -130,8 +129,6 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
||||
fluxIntervalForTraceDetail,
|
||||
serverOptions.SigNoz.Cache,
|
||||
)
|
||||
go clickhouseReader.Start(readerReady)
|
||||
reader := clickhouseReader
|
||||
|
||||
skipConfig := &model.SkipConfig{}
|
||||
if serverOptions.SkipTopLvlOpsPath != "" {
|
||||
@@ -162,6 +159,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
||||
serverOptions.UseLogsNewSchema,
|
||||
serverOptions.UseTraceNewSchema,
|
||||
serverOptions.SigNoz.SQLStore,
|
||||
serverOptions.SigNoz.TelemetryStore,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -491,17 +489,11 @@ func makeRulesManager(
|
||||
useLogsNewSchema bool,
|
||||
useTraceNewSchema bool,
|
||||
sqlstore sqlstore.SQLStore,
|
||||
telemetryStore telemetrystore.TelemetryStore,
|
||||
) (*rules.Manager, error) {
|
||||
|
||||
// create engine
|
||||
pqle, err := pqle.FromReader(ch)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create pql engine : %v", err)
|
||||
}
|
||||
|
||||
// create manager opts
|
||||
managerOpts := &rules.ManagerOptions{
|
||||
PqlEngine: pqle,
|
||||
TelemetryStore: telemetryStore,
|
||||
RepoURL: ruleRepoURL,
|
||||
DBConn: db,
|
||||
Context: context.Background(),
|
||||
|
||||
@@ -9,7 +9,6 @@ import (
|
||||
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/querycache"
|
||||
"github.com/prometheus/prometheus/promql"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/util/stats"
|
||||
)
|
||||
|
||||
@@ -86,10 +85,6 @@ type Reader interface {
|
||||
req *v3.QBFilterSuggestionsRequest,
|
||||
) (*v3.QBFilterSuggestionsResponse, *model.ApiError)
|
||||
|
||||
// Connection needed for rules, not ideal but required
|
||||
GetQueryEngine() *promql.Engine
|
||||
GetFanoutStorage() *storage.Storage
|
||||
|
||||
QueryDashboardVars(ctx context.Context, query string) (*model.DashboardVar, error)
|
||||
CheckClickHouse(ctx context.Context) error
|
||||
|
||||
|
||||
@@ -86,6 +86,7 @@ func main() {
|
||||
MaxIdleConns: maxIdleConns,
|
||||
MaxOpenConns: maxOpenConns,
|
||||
DialTimeout: dialTimeout,
|
||||
Config: promConfigPath,
|
||||
})
|
||||
if err != nil {
|
||||
zap.L().Fatal("Failed to create config", zap.Error(err))
|
||||
@@ -102,7 +103,7 @@ func main() {
|
||||
signoz.NewTelemetryStoreProviderFactories(),
|
||||
)
|
||||
if err != nil {
|
||||
zap.L().Fatal("Failed to create signoz struct", zap.Error(err))
|
||||
zap.L().Fatal("Failed to create signoz", zap.Error(err))
|
||||
}
|
||||
|
||||
// Read the jwt secret key
|
||||
|
||||
@@ -1,126 +0,0 @@
|
||||
package promql
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
|
||||
"github.com/go-kit/log"
|
||||
pmodel "github.com/prometheus/common/model"
|
||||
"github.com/prometheus/common/promlog"
|
||||
pconfig "github.com/prometheus/prometheus/config"
|
||||
pql "github.com/prometheus/prometheus/promql"
|
||||
pstorage "github.com/prometheus/prometheus/storage"
|
||||
premote "github.com/prometheus/prometheus/storage/remote"
|
||||
)
|
||||
|
||||
type PqlEngine struct {
|
||||
engine *pql.Engine
|
||||
fanoutStorage pstorage.Storage
|
||||
}
|
||||
|
||||
func FromConfigPath(promConfigPath string) (*PqlEngine, error) {
|
||||
// load storage path
|
||||
c, err := pconfig.LoadFile(promConfigPath, false, false, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("couldn't load configuration (--config.file=%q): %v", promConfigPath, err)
|
||||
}
|
||||
|
||||
return NewPqlEngine(c)
|
||||
}
|
||||
|
||||
func FromReader(ch interfaces.Reader) (*PqlEngine, error) {
|
||||
return &PqlEngine{
|
||||
engine: ch.GetQueryEngine(),
|
||||
fanoutStorage: *ch.GetFanoutStorage(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func NewPqlEngine(config *pconfig.Config) (*PqlEngine, error) {
|
||||
|
||||
logLevel := promlog.AllowedLevel{}
|
||||
_ = logLevel.Set("debug")
|
||||
|
||||
allowedFormat := promlog.AllowedFormat{}
|
||||
_ = allowedFormat.Set("logfmt")
|
||||
|
||||
promlogConfig := promlog.Config{
|
||||
Level: &logLevel,
|
||||
Format: &allowedFormat,
|
||||
}
|
||||
|
||||
logger := promlog.New(&promlogConfig)
|
||||
|
||||
opts := pql.EngineOpts{
|
||||
Logger: log.With(logger, "component", "promql evaluator"),
|
||||
Reg: nil,
|
||||
MaxSamples: 50000000,
|
||||
Timeout: time.Duration(2 * time.Minute),
|
||||
ActiveQueryTracker: pql.NewActiveQueryTracker(
|
||||
"",
|
||||
20,
|
||||
logger,
|
||||
),
|
||||
}
|
||||
|
||||
e := pql.NewEngine(opts)
|
||||
startTime := func() (int64, error) {
|
||||
return int64(pmodel.Latest), nil
|
||||
}
|
||||
|
||||
remoteStorage := premote.NewStorage(
|
||||
log.With(logger, "component", "remote"),
|
||||
nil,
|
||||
startTime,
|
||||
"",
|
||||
time.Duration(1*time.Minute),
|
||||
nil,
|
||||
false,
|
||||
)
|
||||
fanoutStorage := pstorage.NewFanout(logger, remoteStorage)
|
||||
|
||||
_ = remoteStorage.ApplyConfig(config)
|
||||
|
||||
return &PqlEngine{
|
||||
engine: e,
|
||||
fanoutStorage: fanoutStorage,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (p *PqlEngine) RunAlertQuery(ctx context.Context, qs string, start, end time.Time, interval time.Duration) (pql.Matrix, error) {
|
||||
q, err := p.engine.NewRangeQuery(ctx, p.fanoutStorage, nil, qs, start, end, interval)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
res := q.Exec(ctx)
|
||||
|
||||
if res.Err != nil {
|
||||
return nil, res.Err
|
||||
}
|
||||
|
||||
switch typ := res.Value.(type) {
|
||||
case pql.Vector:
|
||||
series := make([]pql.Series, 0, len(typ))
|
||||
value := res.Value.(pql.Vector)
|
||||
for _, smpl := range value {
|
||||
series = append(series, pql.Series{
|
||||
Metric: smpl.Metric,
|
||||
Floats: []pql.FPoint{{T: smpl.T, F: smpl.F}},
|
||||
})
|
||||
}
|
||||
return series, nil
|
||||
case pql.Scalar:
|
||||
value := res.Value.(pql.Scalar)
|
||||
series := make([]pql.Series, 0, 1)
|
||||
series = append(series, pql.Series{
|
||||
Floats: []pql.FPoint{{T: value.T, F: value.V}},
|
||||
})
|
||||
return series, nil
|
||||
case pql.Matrix:
|
||||
return res.Value.(pql.Matrix), nil
|
||||
default:
|
||||
return nil, fmt.Errorf("rule result is not a vector or scalar")
|
||||
}
|
||||
}
|
||||
@@ -21,9 +21,9 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/query-service/cache"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/model"
|
||||
pqle "github.com/SigNoz/signoz/pkg/query-service/pqlEngine"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/telemetry"
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||
"github.com/SigNoz/signoz/pkg/types/alertmanagertypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
||||
)
|
||||
@@ -76,8 +76,7 @@ func prepareTaskName(ruleId interface{}) string {
|
||||
|
||||
// ManagerOptions bundles options for the Manager.
|
||||
type ManagerOptions struct {
|
||||
PqlEngine *pqle.PqlEngine
|
||||
|
||||
TelemetryStore telemetrystore.TelemetryStore
|
||||
// RepoURL is used to generate a backlink in sent alert messages
|
||||
RepoURL string
|
||||
|
||||
@@ -180,7 +179,7 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
|
||||
opts.Rule,
|
||||
opts.Logger,
|
||||
opts.Reader,
|
||||
opts.ManagerOpts.PqlEngine,
|
||||
opts.ManagerOpts.TelemetryStore.PrometheusEngine(),
|
||||
WithSQLStore(opts.SQLStore),
|
||||
)
|
||||
|
||||
|
||||
@@ -8,21 +8,22 @@ import (
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/promengine"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/formatter"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/model"
|
||||
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
|
||||
pqle "github.com/SigNoz/signoz/pkg/query-service/pqlEngine"
|
||||
qslabels "github.com/SigNoz/signoz/pkg/query-service/utils/labels"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/utils/times"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/utils/timestamp"
|
||||
"github.com/prometheus/prometheus/promql"
|
||||
pql "github.com/prometheus/prometheus/promql"
|
||||
yaml "gopkg.in/yaml.v2"
|
||||
)
|
||||
|
||||
type PromRule struct {
|
||||
*BaseRule
|
||||
pqlEngine *pqle.PqlEngine
|
||||
pqlEngine promengine.PromEngine
|
||||
}
|
||||
|
||||
func NewPromRule(
|
||||
@@ -30,7 +31,7 @@ func NewPromRule(
|
||||
postableRule *PostableRule,
|
||||
logger *zap.Logger,
|
||||
reader interfaces.Reader,
|
||||
pqlEngine *pqle.PqlEngine,
|
||||
pqlEngine promengine.PromEngine,
|
||||
opts ...RuleOption,
|
||||
) (*PromRule, error) {
|
||||
|
||||
@@ -108,7 +109,7 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time) (interface{}, error)
|
||||
return nil, err
|
||||
}
|
||||
zap.L().Info("evaluating promql query", zap.String("name", r.Name()), zap.String("query", q))
|
||||
res, err := r.pqlEngine.RunAlertQuery(ctx, q, start, end, interval)
|
||||
res, err := r.RunAlertQuery(ctx, q, start, end, interval)
|
||||
if err != nil {
|
||||
r.SetHealth(HealthBad)
|
||||
r.SetLastError(err)
|
||||
@@ -306,6 +307,43 @@ func (r *PromRule) String() string {
|
||||
return string(byt)
|
||||
}
|
||||
|
||||
func (r *PromRule) RunAlertQuery(ctx context.Context, qs string, start, end time.Time, interval time.Duration) (pql.Matrix, error) {
|
||||
q, err := r.pqlEngine.Engine().NewRangeQuery(ctx, r.pqlEngine.Storage(), nil, qs, start, end, interval)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
res := q.Exec(ctx)
|
||||
|
||||
if res.Err != nil {
|
||||
return nil, res.Err
|
||||
}
|
||||
|
||||
switch typ := res.Value.(type) {
|
||||
case pql.Vector:
|
||||
series := make([]pql.Series, 0, len(typ))
|
||||
value := res.Value.(pql.Vector)
|
||||
for _, smpl := range value {
|
||||
series = append(series, pql.Series{
|
||||
Metric: smpl.Metric,
|
||||
Floats: []pql.FPoint{{T: smpl.T, F: smpl.F}},
|
||||
})
|
||||
}
|
||||
return series, nil
|
||||
case pql.Scalar:
|
||||
value := res.Value.(pql.Scalar)
|
||||
series := make([]pql.Series, 0, 1)
|
||||
series = append(series, pql.Series{
|
||||
Floats: []pql.FPoint{{T: value.T, F: value.V}},
|
||||
})
|
||||
return series, nil
|
||||
case pql.Matrix:
|
||||
return res.Value.(pql.Matrix), nil
|
||||
default:
|
||||
return nil, fmt.Errorf("rule result is not a vector or scalar")
|
||||
}
|
||||
}
|
||||
|
||||
func toCommonSeries(series promql.Series) v3.Series {
|
||||
commonSeries := v3.Series{
|
||||
Labels: make(map[string]string),
|
||||
|
||||
@@ -68,7 +68,7 @@ func defaultTestNotification(opts PrepareTestRuleOptions) (int, *model.ApiError)
|
||||
parsedRule,
|
||||
opts.Logger,
|
||||
opts.Reader,
|
||||
opts.ManagerOpts.PqlEngine,
|
||||
opts.ManagerOpts.TelemetryStore.PrometheusEngine(),
|
||||
WithSendAlways(),
|
||||
WithSendUnmatched(),
|
||||
WithSQLStore(opts.SQLStore),
|
||||
|
||||
@@ -3,19 +3,22 @@ package rules
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/SigNoz/signoz/pkg/cache"
|
||||
"github.com/SigNoz/signoz/pkg/cache/memorycache"
|
||||
"github.com/SigNoz/signoz/pkg/factory/factorytest"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/cache"
|
||||
"github.com/SigNoz/signoz/pkg/cache/memorycache"
|
||||
"github.com/SigNoz/signoz/pkg/factory/factorytest"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/common"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/featureManager"
|
||||
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/utils/labels"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
cmock "github.com/srikanthccv/ClickHouse-go-mock"
|
||||
)
|
||||
@@ -1152,10 +1155,8 @@ func TestThresholdRuleUnitCombinations(t *testing.T) {
|
||||
},
|
||||
}
|
||||
fm := featureManager.StartManager()
|
||||
mock, err := cmock.NewClickHouseWithQueryMatcher(nil, &queryMatcherAny{})
|
||||
if err != nil {
|
||||
t.Errorf("an error '%s' was not expected when opening a stub database connection", err)
|
||||
}
|
||||
telemetryStore, err := telemetrystoretest.New(&queryMatcherAny{})
|
||||
require.NoError(t, err)
|
||||
|
||||
cols := make([]cmock.ColumnType, 0)
|
||||
cols = append(cols, cmock.ColumnType{Name: "value", Type: "Float64"})
|
||||
@@ -1227,11 +1228,11 @@ func TestThresholdRuleUnitCombinations(t *testing.T) {
|
||||
|
||||
for idx, c := range cases {
|
||||
rows := cmock.NewRows(cols, c.values)
|
||||
mock.ExpectQuery(".*").WillReturnError(fmt.Errorf("error"))
|
||||
telemetryStore.Mock().ExpectQuery(".*").WillReturnError(fmt.Errorf("error"))
|
||||
// We are testing the eval logic after the query is run
|
||||
// so we don't care about the query string here
|
||||
queryString := "SELECT any"
|
||||
mock.
|
||||
telemetryStore.Mock().
|
||||
ExpectQuery(queryString).
|
||||
WillReturnRows(rows)
|
||||
postableRule.RuleCondition.CompareOp = CompareOp(c.compareOp)
|
||||
@@ -1246,7 +1247,8 @@ func TestThresholdRuleUnitCombinations(t *testing.T) {
|
||||
|
||||
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
|
||||
readerCache, err := memorycache.New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: cache.Memory{TTL: DefaultFrequency}})
|
||||
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, time.Duration(time.Second), readerCache)
|
||||
require.NoError(t, err)
|
||||
reader := clickhouseReader.NewReaderFromClickhouseConnection(options, nil, telemetryStore, fm, "", true, true, time.Duration(time.Second), readerCache)
|
||||
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
|
||||
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
|
||||
"signoz_calls_total": {
|
||||
@@ -1305,10 +1307,8 @@ func TestThresholdRuleNoData(t *testing.T) {
|
||||
},
|
||||
}
|
||||
fm := featureManager.StartManager()
|
||||
mock, err := cmock.NewClickHouseWithQueryMatcher(nil, &queryMatcherAny{})
|
||||
if err != nil {
|
||||
t.Errorf("an error '%s' was not expected when opening a stub database connection", err)
|
||||
}
|
||||
telemetryStore, err := telemetrystoretest.New(&queryMatcherAny{})
|
||||
require.NoError(t, err)
|
||||
|
||||
cols := make([]cmock.ColumnType, 0)
|
||||
cols = append(cols, cmock.ColumnType{Name: "value", Type: "Float64"})
|
||||
@@ -1328,12 +1328,12 @@ func TestThresholdRuleNoData(t *testing.T) {
|
||||
for idx, c := range cases {
|
||||
rows := cmock.NewRows(cols, c.values)
|
||||
|
||||
mock.ExpectQuery(".*").WillReturnError(fmt.Errorf("error"))
|
||||
telemetryStore.Mock().ExpectQuery(".*").WillReturnError(fmt.Errorf("error"))
|
||||
|
||||
// We are testing the eval logic after the query is run
|
||||
// so we don't care about the query string here
|
||||
queryString := "SELECT any"
|
||||
mock.
|
||||
telemetryStore.Mock().
|
||||
ExpectQuery(queryString).
|
||||
WillReturnRows(rows)
|
||||
var target float64 = 0
|
||||
@@ -1346,7 +1346,7 @@ func TestThresholdRuleNoData(t *testing.T) {
|
||||
}
|
||||
readerCache, err := memorycache.New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: cache.Memory{TTL: DefaultFrequency}})
|
||||
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
|
||||
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, time.Duration(time.Second), readerCache)
|
||||
reader := clickhouseReader.NewReaderFromClickhouseConnection(options, nil, telemetryStore, fm, "", true, true, time.Duration(time.Second), readerCache)
|
||||
|
||||
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
|
||||
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
|
||||
@@ -1410,10 +1410,8 @@ func TestThresholdRuleTracesLink(t *testing.T) {
|
||||
},
|
||||
}
|
||||
fm := featureManager.StartManager()
|
||||
mock, err := cmock.NewClickHouseWithQueryMatcher(nil, &queryMatcherAny{})
|
||||
if err != nil {
|
||||
t.Errorf("an error '%s' was not expected when opening a stub database connection", err)
|
||||
}
|
||||
telemetryStore, err := telemetrystoretest.New(&queryMatcherAny{})
|
||||
require.NoError(t, err)
|
||||
|
||||
metaCols := make([]cmock.ColumnType, 0)
|
||||
metaCols = append(metaCols, cmock.ColumnType{Name: "DISTINCT(tagKey)", Type: "String"})
|
||||
@@ -1428,11 +1426,11 @@ func TestThresholdRuleTracesLink(t *testing.T) {
|
||||
|
||||
for idx, c := range testCases {
|
||||
metaRows := cmock.NewRows(metaCols, c.metaValues)
|
||||
mock.
|
||||
telemetryStore.Mock().
|
||||
ExpectQuery("SELECT DISTINCT(tagKey), tagType, dataType FROM archiveNamespace.span_attributes_keys").
|
||||
WillReturnRows(metaRows)
|
||||
|
||||
mock.
|
||||
telemetryStore.Mock().
|
||||
ExpectSelect("SHOW CREATE TABLE signoz_traces.distributed_signoz_index_v3").WillReturnRows(&cmock.Rows{})
|
||||
|
||||
rows := cmock.NewRows(cols, c.values)
|
||||
@@ -1440,7 +1438,7 @@ func TestThresholdRuleTracesLink(t *testing.T) {
|
||||
// We are testing the eval logic after the query is run
|
||||
// so we don't care about the query string here
|
||||
queryString := "SELECT any"
|
||||
mock.
|
||||
telemetryStore.Mock().
|
||||
ExpectQuery(queryString).
|
||||
WillReturnRows(rows)
|
||||
postableRule.RuleCondition.CompareOp = CompareOp(c.compareOp)
|
||||
@@ -1454,7 +1452,7 @@ func TestThresholdRuleTracesLink(t *testing.T) {
|
||||
}
|
||||
|
||||
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
|
||||
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, time.Duration(time.Second), nil)
|
||||
reader := clickhouseReader.NewReaderFromClickhouseConnection(options, nil, telemetryStore, fm, "", true, true, time.Duration(time.Second), nil)
|
||||
|
||||
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
|
||||
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
|
||||
@@ -1523,10 +1521,8 @@ func TestThresholdRuleLogsLink(t *testing.T) {
|
||||
},
|
||||
}
|
||||
fm := featureManager.StartManager()
|
||||
mock, err := cmock.NewClickHouseWithQueryMatcher(nil, &queryMatcherAny{})
|
||||
if err != nil {
|
||||
t.Errorf("an error '%s' was not expected when opening a stub database connection", err)
|
||||
}
|
||||
telemetryStore, err := telemetrystoretest.New(&queryMatcherAny{})
|
||||
require.NoError(t, err)
|
||||
|
||||
attrMetaCols := make([]cmock.ColumnType, 0)
|
||||
attrMetaCols = append(attrMetaCols, cmock.ColumnType{Name: "name", Type: "String"})
|
||||
@@ -1546,17 +1542,17 @@ func TestThresholdRuleLogsLink(t *testing.T) {
|
||||
|
||||
for idx, c := range testCases {
|
||||
attrMetaRows := cmock.NewRows(attrMetaCols, c.attrMetaValues)
|
||||
mock.
|
||||
telemetryStore.Mock().
|
||||
ExpectSelect("SELECT DISTINCT name, datatype from signoz_logs.distributed_logs_attribute_keys group by name, datatype").
|
||||
WillReturnRows(attrMetaRows)
|
||||
|
||||
resourceMetaRows := cmock.NewRows(resourceMetaCols, c.resourceMetaValues)
|
||||
mock.
|
||||
telemetryStore.Mock().
|
||||
ExpectSelect("SELECT DISTINCT name, datatype from signoz_logs.distributed_logs_resource_keys group by name, datatype").
|
||||
WillReturnRows(resourceMetaRows)
|
||||
|
||||
createTableRows := cmock.NewRows(createTableCols, c.createTableValues)
|
||||
mock.
|
||||
telemetryStore.Mock().
|
||||
ExpectSelect("SHOW CREATE TABLE signoz_logs.logs").
|
||||
WillReturnRows(createTableRows)
|
||||
|
||||
@@ -1565,7 +1561,7 @@ func TestThresholdRuleLogsLink(t *testing.T) {
|
||||
// We are testing the eval logic after the query is run
|
||||
// so we don't care about the query string here
|
||||
queryString := "SELECT any"
|
||||
mock.
|
||||
telemetryStore.Mock().
|
||||
ExpectQuery(queryString).
|
||||
WillReturnRows(rows)
|
||||
postableRule.RuleCondition.CompareOp = CompareOp(c.compareOp)
|
||||
@@ -1579,7 +1575,7 @@ func TestThresholdRuleLogsLink(t *testing.T) {
|
||||
}
|
||||
|
||||
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
|
||||
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, time.Duration(time.Second), nil)
|
||||
reader := clickhouseReader.NewReaderFromClickhouseConnection(options, nil, telemetryStore, fm, "", true, true, time.Duration(time.Second), nil)
|
||||
|
||||
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
|
||||
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
|
||||
|
||||
@@ -20,6 +20,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/query-service/dao"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/model"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
|
||||
"github.com/SigNoz/signoz/pkg/types"
|
||||
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
||||
"github.com/google/uuid"
|
||||
@@ -39,14 +40,14 @@ func NewMockClickhouseReader(
|
||||
) {
|
||||
require.NotNil(t, testDB)
|
||||
|
||||
mockDB, err := mockhouse.NewClickHouseWithQueryMatcher(nil, sqlmock.QueryMatcherRegexp)
|
||||
telemetryStore, err := telemetrystoretest.New(sqlmock.QueryMatcherRegexp)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Nil(t, err, "could not init mock clickhouse")
|
||||
reader := clickhouseReader.NewReaderFromClickhouseConnection(
|
||||
mockDB,
|
||||
clickhouseReader.NewOptions("", ""),
|
||||
testDB,
|
||||
"",
|
||||
telemetryStore,
|
||||
featureFlags,
|
||||
"",
|
||||
true,
|
||||
@@ -55,7 +56,7 @@ func NewMockClickhouseReader(
|
||||
nil,
|
||||
)
|
||||
|
||||
return reader, mockDB
|
||||
return reader, telemetryStore.Mock()
|
||||
}
|
||||
|
||||
func addLogsQueryExpectation(
|
||||
|
||||
@@ -20,6 +20,8 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||
"github.com/SigNoz/signoz/pkg/version"
|
||||
"github.com/SigNoz/signoz/pkg/web"
|
||||
"github.com/go-kit/log"
|
||||
promconfig "github.com/prometheus/prometheus/config"
|
||||
)
|
||||
|
||||
// Config defines the entire input configuration of signoz.
|
||||
@@ -49,7 +51,7 @@ type Config struct {
|
||||
APIServer apiserver.Config `mapstructure:"apiserver"`
|
||||
|
||||
// TelemetryStore config
|
||||
TelemetryStore telemetrystore.Config `mapstructure:"telemetrystore"`
|
||||
TelemetryStore telemetrystore.Config `mapstructure:"telemetrystore" yaml:"telemetrystore"`
|
||||
|
||||
// Alertmanager config
|
||||
Alertmanager alertmanager.Config `mapstructure:"alertmanager" yaml:"alertmanager"`
|
||||
@@ -61,6 +63,7 @@ type DeprecatedFlags struct {
|
||||
MaxIdleConns int
|
||||
MaxOpenConns int
|
||||
DialTimeout time.Duration
|
||||
Config string
|
||||
}
|
||||
|
||||
func NewConfig(ctx context.Context, resolverConfig config.ResolverConfig, deprecatedFlags DeprecatedFlags) (Config, error) {
|
||||
@@ -145,7 +148,7 @@ func mergeAndEnsureBackwardCompatibility(config *Config, deprecatedFlags Depreca
|
||||
|
||||
if os.Getenv("ClickHouseUrl") != "" {
|
||||
fmt.Println("[Deprecated] env ClickHouseUrl is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_DSN instead.")
|
||||
config.TelemetryStore.ClickHouse.DSN = os.Getenv("ClickHouseUrl")
|
||||
config.TelemetryStore.Clickhouse.DSN = os.Getenv("ClickHouseUrl")
|
||||
}
|
||||
|
||||
if deprecatedFlags.MaxIdleConns != 50 {
|
||||
@@ -176,4 +179,18 @@ func mergeAndEnsureBackwardCompatibility(config *Config, deprecatedFlags Depreca
|
||||
if os.Getenv("ALERTMANAGER_API_CHANNEL_PATH") != "" {
|
||||
fmt.Println("[Deprecated] env ALERTMANAGER_API_CHANNEL_PATH is deprecated and scheduled for complete removal.")
|
||||
}
|
||||
|
||||
if deprecatedFlags.Config != "" {
|
||||
fmt.Println("[Deprecated] flag --config is deprecated for passing prometheus config. Please use SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_PROMETHEUS_REMOTE__READ_URL instead. The flag will be used for passing the entire SigNoz config. More details can be found at https://github.com/SigNoz/signoz/issues/6805.")
|
||||
cfg, err := promconfig.LoadFile(deprecatedFlags.Config, false, false, log.NewNopLogger())
|
||||
if err != nil {
|
||||
fmt.Println("Error parsing config, using value set in SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_PROMETHEUS_REMOTE__READ_URL")
|
||||
} else {
|
||||
if len(cfg.RemoteReadConfigs) != 1 {
|
||||
fmt.Println("Error finding remote read config, using value set in SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_PROMETHEUS_REMOTE__READ_URL")
|
||||
} else {
|
||||
config.TelemetryStore.Clickhouse.Prometheus.RemoteReadConfig.URL = cfg.RemoteReadConfigs[0].URL.URL
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -67,7 +67,7 @@ func NewSQLMigrationProviderFactories(sqlstore sqlstore.SQLStore) factory.NamedM
|
||||
|
||||
func NewTelemetryStoreProviderFactories() factory.NamedMap[factory.ProviderFactory[telemetrystore.TelemetryStore, telemetrystore.Config]] {
|
||||
return factory.MustNewNamedMap(
|
||||
clickhousetelemetrystore.NewFactory(telemetrystorehook.NewFactory()),
|
||||
clickhousetelemetrystore.NewFactory(telemetrystorehook.NewSettingsFactory()),
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
@@ -6,13 +6,15 @@ import (
|
||||
"github.com/ClickHouse/clickhouse-go/v2"
|
||||
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
"github.com/SigNoz/signoz/pkg/promengine"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||
)
|
||||
|
||||
type provider struct {
|
||||
settings factory.ScopedProviderSettings
|
||||
clickHouseConn clickhouse.Conn
|
||||
hooks []telemetrystore.TelemetryStoreHook
|
||||
settings factory.ScopedProviderSettings
|
||||
clickHouseConn clickhouse.Conn
|
||||
prometheusEngine promengine.PromEngine
|
||||
hooks []telemetrystore.TelemetryStoreHook
|
||||
}
|
||||
|
||||
func NewFactory(hookFactories ...factory.ProviderFactory[telemetrystore.TelemetryStoreHook, telemetrystore.Config]) factory.ProviderFactory[telemetrystore.TelemetryStore, telemetrystore.Config] {
|
||||
@@ -33,7 +35,7 @@ func NewFactory(hookFactories ...factory.ProviderFactory[telemetrystore.Telemetr
|
||||
func New(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config, hooks ...telemetrystore.TelemetryStoreHook) (telemetrystore.TelemetryStore, error) {
|
||||
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/telemetrystore/clickhousetelemetrystore")
|
||||
|
||||
options, err := clickhouse.ParseDSN(config.ClickHouse.DSN)
|
||||
options, err := clickhouse.ParseDSN(config.Clickhouse.DSN)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -46,75 +48,85 @@ func New(ctx context.Context, providerSettings factory.ProviderSettings, config
|
||||
return nil, err
|
||||
}
|
||||
|
||||
engine, err := promengine.New(settings.Logger(), config.Clickhouse.Prometheus)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &provider{
|
||||
settings: settings,
|
||||
clickHouseConn: chConn,
|
||||
hooks: hooks,
|
||||
settings: settings,
|
||||
clickHouseConn: chConn,
|
||||
hooks: hooks,
|
||||
prometheusEngine: engine,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (p *provider) ClickHouseDB() clickhouse.Conn {
|
||||
func (p *provider) ClickhouseDB() clickhouse.Conn {
|
||||
return p
|
||||
}
|
||||
|
||||
func (p provider) Close() error {
|
||||
func (p *provider) PrometheusEngine() promengine.PromEngine {
|
||||
return p.prometheusEngine
|
||||
}
|
||||
|
||||
func (p *provider) Close() error {
|
||||
return p.clickHouseConn.Close()
|
||||
}
|
||||
|
||||
func (p provider) Ping(ctx context.Context) error {
|
||||
func (p *provider) Ping(ctx context.Context) error {
|
||||
return p.clickHouseConn.Ping(ctx)
|
||||
}
|
||||
|
||||
func (p provider) Stats() driver.Stats {
|
||||
func (p *provider) Stats() driver.Stats {
|
||||
return p.clickHouseConn.Stats()
|
||||
}
|
||||
|
||||
func (p provider) Query(ctx context.Context, query string, args ...interface{}) (driver.Rows, error) {
|
||||
func (p *provider) Query(ctx context.Context, query string, args ...interface{}) (driver.Rows, error) {
|
||||
ctx, query, args = telemetrystore.WrapBeforeQuery(p.hooks, ctx, query, args...)
|
||||
rows, err := p.clickHouseConn.Query(ctx, query, args...)
|
||||
telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, rows, err)
|
||||
return rows, err
|
||||
}
|
||||
|
||||
func (p provider) QueryRow(ctx context.Context, query string, args ...interface{}) driver.Row {
|
||||
func (p *provider) QueryRow(ctx context.Context, query string, args ...interface{}) driver.Row {
|
||||
ctx, query, args = telemetrystore.WrapBeforeQuery(p.hooks, ctx, query, args...)
|
||||
row := p.clickHouseConn.QueryRow(ctx, query, args...)
|
||||
telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, nil, nil)
|
||||
return row
|
||||
}
|
||||
|
||||
func (p provider) Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error {
|
||||
func (p *provider) Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error {
|
||||
ctx, query, args = telemetrystore.WrapBeforeQuery(p.hooks, ctx, query, args...)
|
||||
err := p.clickHouseConn.Select(ctx, dest, query, args...)
|
||||
telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, nil, err)
|
||||
return err
|
||||
}
|
||||
|
||||
func (p provider) Exec(ctx context.Context, query string, args ...interface{}) error {
|
||||
func (p *provider) Exec(ctx context.Context, query string, args ...interface{}) error {
|
||||
ctx, query, args = telemetrystore.WrapBeforeQuery(p.hooks, ctx, query, args...)
|
||||
err := p.clickHouseConn.Exec(ctx, query, args...)
|
||||
telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, nil, err)
|
||||
return err
|
||||
}
|
||||
|
||||
func (p provider) AsyncInsert(ctx context.Context, query string, wait bool, args ...interface{}) error {
|
||||
func (p *provider) AsyncInsert(ctx context.Context, query string, wait bool, args ...interface{}) error {
|
||||
ctx, query, args = telemetrystore.WrapBeforeQuery(p.hooks, ctx, query, args...)
|
||||
err := p.clickHouseConn.AsyncInsert(ctx, query, wait, args...)
|
||||
telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, nil, err)
|
||||
return err
|
||||
}
|
||||
|
||||
func (p provider) PrepareBatch(ctx context.Context, query string, opts ...driver.PrepareBatchOption) (driver.Batch, error) {
|
||||
func (p *provider) PrepareBatch(ctx context.Context, query string, opts ...driver.PrepareBatchOption) (driver.Batch, error) {
|
||||
ctx, query, args := telemetrystore.WrapBeforeQuery(p.hooks, ctx, query)
|
||||
batch, err := p.clickHouseConn.PrepareBatch(ctx, query, opts...)
|
||||
telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, nil, err)
|
||||
return batch, err
|
||||
}
|
||||
|
||||
func (p provider) ServerVersion() (*driver.ServerVersion, error) {
|
||||
func (p *provider) ServerVersion() (*driver.ServerVersion, error) {
|
||||
return p.clickHouseConn.ServerVersion()
|
||||
}
|
||||
|
||||
func (p provider) Contributors() []string {
|
||||
func (p *provider) Contributors() []string {
|
||||
return p.clickHouseConn.Contributors()
|
||||
}
|
||||
|
||||
@@ -2,28 +2,36 @@ package telemetrystore
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
"github.com/SigNoz/signoz/pkg/promengine"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
// Provider is the provider to use
|
||||
Provider string `mapstructure:"provider"`
|
||||
|
||||
// Connection is the connection configuration
|
||||
Connection ConnectionConfig `mapstructure:",squash"`
|
||||
|
||||
// Clickhouse is the clickhouse configuration
|
||||
ClickHouse ClickHouseConfig `mapstructure:"clickhouse"`
|
||||
Clickhouse ClickhouseConfig `mapstructure:"clickhouse"`
|
||||
}
|
||||
|
||||
type ConnectionConfig struct {
|
||||
// MaxOpenConns is the maximum number of open connections to the database.
|
||||
MaxOpenConns int `mapstructure:"max_open_conns"`
|
||||
MaxIdleConns int `mapstructure:"max_idle_conns"`
|
||||
DialTimeout time.Duration `mapstructure:"dial_timeout"`
|
||||
MaxOpenConns int `mapstructure:"max_open_conns"`
|
||||
|
||||
// MaxIdleConns is the maximum number of connections in the idle connection pool.
|
||||
MaxIdleConns int `mapstructure:"max_idle_conns"`
|
||||
|
||||
// DialTimeout is the timeout for dialing a new connection.
|
||||
DialTimeout time.Duration `mapstructure:"dial_timeout"`
|
||||
}
|
||||
|
||||
type ClickHouseQuerySettings struct {
|
||||
type QuerySettings struct {
|
||||
MaxExecutionTime int `mapstructure:"max_execution_time"`
|
||||
MaxExecutionTimeLeaf int `mapstructure:"max_execution_time_leaf"`
|
||||
TimeoutBeforeCheckingExecutionSpeed int `mapstructure:"timeout_before_checking_execution_speed"`
|
||||
@@ -31,15 +39,19 @@ type ClickHouseQuerySettings struct {
|
||||
MaxResultRowsForCHQuery int `mapstructure:"max_result_rows_for_ch_query"`
|
||||
}
|
||||
|
||||
type ClickHouseConfig struct {
|
||||
type ClickhouseConfig struct {
|
||||
// DSN is the database source name.
|
||||
DSN string `mapstructure:"dsn"`
|
||||
|
||||
QuerySettings ClickHouseQuerySettings `mapstructure:"settings"`
|
||||
// QuerySettings is the query settings for clickhouse.
|
||||
QuerySettings QuerySettings `mapstructure:"settings"`
|
||||
|
||||
// Prometheus is the prometheus configuration
|
||||
Prometheus promengine.Config `mapstructure:"prometheus"`
|
||||
}
|
||||
|
||||
func NewConfigFactory() factory.ConfigFactory {
|
||||
return factory.NewConfigFactory(factory.MustNewName("telemetrystore"), newConfig)
|
||||
|
||||
}
|
||||
|
||||
func newConfig() factory.Config {
|
||||
@@ -50,15 +62,39 @@ func newConfig() factory.Config {
|
||||
MaxIdleConns: 50,
|
||||
DialTimeout: 5 * time.Second,
|
||||
},
|
||||
ClickHouse: ClickHouseConfig{
|
||||
Clickhouse: ClickhouseConfig{
|
||||
DSN: "tcp://localhost:9000",
|
||||
Prometheus: promengine.Config{
|
||||
RemoteReadConfig: promengine.RemoteReadConfig{
|
||||
URL: &url.URL{
|
||||
Scheme: "tcp",
|
||||
Host: "localhost:9000",
|
||||
Path: "/signoz_metrics",
|
||||
},
|
||||
},
|
||||
ActiveQueryTrackerConfig: promengine.ActiveQueryTrackerConfig{
|
||||
Enabled: true,
|
||||
Path: "",
|
||||
MaxConcurrent: 20,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (c Config) Validate() error {
|
||||
if c.Provider != "clickhouse" {
|
||||
return fmt.Errorf("provider: %q is not supported", c.Provider)
|
||||
dsn, err := url.Parse(c.Clickhouse.DSN)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if c.Clickhouse.Prometheus.RemoteReadConfig.URL.Host != dsn.Host {
|
||||
return fmt.Errorf("mismatch between host in prometheus.remote_read.url %q and clickhouse.dsn %q", c.Clickhouse.Prometheus.RemoteReadConfig.URL.Host, dsn.Host)
|
||||
}
|
||||
|
||||
if c.Clickhouse.Prometheus.RemoteReadConfig.URL.Scheme != dsn.Scheme {
|
||||
return fmt.Errorf("mismatch between scheme in prometheus.remote_read.url %q and clickhouse.dsn %q", c.Clickhouse.Prometheus.RemoteReadConfig.URL.Scheme, dsn.Scheme)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
@@ -13,7 +13,7 @@ import (
|
||||
)
|
||||
|
||||
func TestNewWithEnvProvider(t *testing.T) {
|
||||
t.Setenv("SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_DSN", "http://localhost:9000")
|
||||
t.Setenv("SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_DSN", "tcp://localhost:9000")
|
||||
t.Setenv("SIGNOZ_TELEMETRYSTORE_MAX__IDLE__CONNS", "60")
|
||||
t.Setenv("SIGNOZ_TELEMETRYSTORE_MAX__OPEN__CONNS", "150")
|
||||
t.Setenv("SIGNOZ_TELEMETRYSTORE_DIAL__TIMEOUT", "5s")
|
||||
@@ -33,22 +33,18 @@ func TestNewWithEnvProvider(t *testing.T) {
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
actual := &Config{}
|
||||
err = conf.Unmarshal("telemetrystore", actual)
|
||||
|
||||
actual := Config{}
|
||||
err = conf.Unmarshal("telemetrystore", &actual)
|
||||
require.NoError(t, err)
|
||||
|
||||
expected := &Config{
|
||||
Provider: "clickhouse",
|
||||
Connection: ConnectionConfig{
|
||||
MaxOpenConns: 150,
|
||||
MaxIdleConns: 60,
|
||||
DialTimeout: 5 * time.Second,
|
||||
},
|
||||
ClickHouse: ClickHouseConfig{
|
||||
DSN: "http://localhost:9000",
|
||||
},
|
||||
}
|
||||
assert.NoError(t, actual.Validate())
|
||||
|
||||
expected := NewConfigFactory().New().(Config)
|
||||
expected.Provider = "clickhouse"
|
||||
expected.Connection.MaxOpenConns = 150
|
||||
expected.Connection.MaxIdleConns = 60
|
||||
expected.Connection.DialTimeout = 5 * time.Second
|
||||
expected.Clickhouse.DSN = "tcp://localhost:9000"
|
||||
|
||||
assert.Equal(t, expected, actual)
|
||||
}
|
||||
@@ -74,14 +70,14 @@ func TestNewWithEnvProviderWithQuerySettings(t *testing.T) {
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
actual := &Config{}
|
||||
err = conf.Unmarshal("telemetrystore", actual)
|
||||
actual := Config{}
|
||||
err = conf.Unmarshal("telemetrystore", &actual)
|
||||
|
||||
require.NoError(t, err)
|
||||
|
||||
expected := &Config{
|
||||
ClickHouse: ClickHouseConfig{
|
||||
QuerySettings: ClickHouseQuerySettings{
|
||||
expected := Config{
|
||||
Clickhouse: ClickhouseConfig{
|
||||
QuerySettings: QuerySettings{
|
||||
MaxExecutionTime: 10,
|
||||
MaxExecutionTimeLeaf: 10,
|
||||
TimeoutBeforeCheckingExecutionSpeed: 10,
|
||||
@@ -91,5 +87,5 @@ func TestNewWithEnvProviderWithQuerySettings(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
assert.Equal(t, expected.ClickHouse.QuerySettings, actual.ClickHouse.QuerySettings)
|
||||
assert.Equal(t, expected.Clickhouse.QuerySettings, actual.Clickhouse.QuerySettings)
|
||||
}
|
||||
|
||||
@@ -5,10 +5,12 @@ import (
|
||||
|
||||
"github.com/ClickHouse/clickhouse-go/v2"
|
||||
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
|
||||
"github.com/SigNoz/signoz/pkg/promengine"
|
||||
)
|
||||
|
||||
type TelemetryStore interface {
|
||||
ClickHouseDB() clickhouse.Conn
|
||||
ClickhouseDB() clickhouse.Conn
|
||||
PrometheusEngine() promengine.PromEngine
|
||||
}
|
||||
|
||||
type TelemetryStoreHook interface {
|
||||
|
||||
@@ -12,29 +12,20 @@ import (
|
||||
)
|
||||
|
||||
type provider struct {
|
||||
settings telemetrystore.ClickHouseQuerySettings
|
||||
settings telemetrystore.QuerySettings
|
||||
}
|
||||
|
||||
func NewFactory() factory.ProviderFactory[telemetrystore.TelemetryStoreHook, telemetrystore.Config] {
|
||||
return factory.NewProviderFactory(factory.MustNewName("clickhousesettings"), New)
|
||||
func NewSettingsFactory() factory.ProviderFactory[telemetrystore.TelemetryStoreHook, telemetrystore.Config] {
|
||||
return factory.NewProviderFactory(factory.MustNewName("settings"), NewSettings)
|
||||
}
|
||||
|
||||
func New(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config) (telemetrystore.TelemetryStoreHook, error) {
|
||||
func NewSettings(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config) (telemetrystore.TelemetryStoreHook, error) {
|
||||
return &provider{
|
||||
settings: config.ClickHouse.QuerySettings,
|
||||
settings: config.Clickhouse.QuerySettings,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (h *provider) BeforeQuery(ctx context.Context, query string, args ...interface{}) (context.Context, string, []interface{}) {
|
||||
return h.clickHouseSettings(ctx, query, args...)
|
||||
}
|
||||
|
||||
func (h *provider) AfterQuery(ctx context.Context, query string, args []interface{}, rows driver.Rows, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
// clickHouseSettings adds clickhouse settings to queries
|
||||
func (h *provider) clickHouseSettings(ctx context.Context, query string, args ...interface{}) (context.Context, string, []interface{}) {
|
||||
settings := clickhouse.Settings{}
|
||||
|
||||
// Apply default settings
|
||||
@@ -73,6 +64,9 @@ func (h *provider) clickHouseSettings(ctx context.Context, query string, args ..
|
||||
return ctx, query, args
|
||||
}
|
||||
|
||||
func (h *provider) AfterQuery(ctx context.Context, query string, args []interface{}, rows driver.Rows, err error) {
|
||||
}
|
||||
|
||||
func (h *provider) getLogComment(ctx context.Context) string {
|
||||
// Get the key-value pairs from context for log comment
|
||||
kv := ctx.Value(common.LogCommentKey)
|
||||
|
||||
@@ -2,6 +2,9 @@ package telemetrystoretest
|
||||
|
||||
import (
|
||||
"github.com/ClickHouse/clickhouse-go/v2"
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
"github.com/SigNoz/signoz/pkg/promengine"
|
||||
"github.com/SigNoz/signoz/pkg/promengine/promenginetest"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||
cmock "github.com/srikanthccv/ClickHouse-go-mock"
|
||||
)
|
||||
@@ -10,28 +13,38 @@ var _ telemetrystore.TelemetryStore = (*Provider)(nil)
|
||||
|
||||
// Provider represents a mock telemetry store provider for testing
|
||||
type Provider struct {
|
||||
mock cmock.ClickConnMockCommon
|
||||
clickhouseDB cmock.ClickConnMockCommon
|
||||
engine promengine.PromEngine
|
||||
}
|
||||
|
||||
// New creates a new mock telemetry store provider
|
||||
func New() (*Provider, error) {
|
||||
options := &clickhouse.Options{} // Default options
|
||||
mock, err := cmock.NewClickHouseNative(options)
|
||||
func New(matcher sqlmock.QueryMatcher) (*Provider, error) {
|
||||
clickhouseDB, err := cmock.NewClickHouseWithQueryMatcher(&clickhouse.Options{}, matcher)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
engine, err := promenginetest.New()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Provider{
|
||||
mock: mock,
|
||||
clickhouseDB: clickhouseDB,
|
||||
engine: engine,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (p *Provider) PrometheusEngine() promengine.PromEngine {
|
||||
return p.engine
|
||||
}
|
||||
|
||||
// ClickhouseDB returns the mock Clickhouse connection
|
||||
func (p *Provider) ClickHouseDB() clickhouse.Conn {
|
||||
return p.mock.(clickhouse.Conn)
|
||||
func (p *Provider) ClickhouseDB() clickhouse.Conn {
|
||||
return p.clickhouseDB.(clickhouse.Conn)
|
||||
}
|
||||
|
||||
// Mock returns the underlying Clickhouse mock instance for setting expectations
|
||||
func (p *Provider) Mock() cmock.ClickConnMockCommon {
|
||||
return p.mock
|
||||
return p.clickhouseDB
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package telemetrystoretest
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
@@ -19,7 +20,7 @@ func TestNew(t *testing.T) {
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
provider, err := New()
|
||||
provider, err := New(sqlmock.QueryMatcherRegexp)
|
||||
if tt.wantErr {
|
||||
assert.Error(t, err)
|
||||
assert.Nil(t, provider)
|
||||
@@ -29,7 +30,8 @@ func TestNew(t *testing.T) {
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, provider)
|
||||
assert.NotNil(t, provider.Mock())
|
||||
assert.NotNil(t, provider.ClickHouseDB())
|
||||
assert.NotNil(t, provider.ClickhouseDB())
|
||||
assert.NotNil(t, provider.PrometheusEngine())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user