Compare commits

...

7 Commits

Author SHA1 Message Date
grandwizard28
51ecd97b70 feat(promengine): move directories 2025-03-24 18:12:38 +05:30
grandwizard28
fdf3331c0f feat(promengine): move directories 2025-03-24 18:12:15 +05:30
grandwizard28
fb2210d1e0 feat(promengine): add utf-8 validation 2025-03-24 18:05:24 +05:30
grandwizard28
6d5d047162 refactor: better readability 2025-03-24 18:05:24 +05:30
grandwizard28
eea07211e0 fix: fix a typo in telemetrystore 2025-03-24 18:05:23 +05:30
grandwizard28
386363b93e feat(promengine): add the deprecated flag 2025-03-24 18:05:23 +05:30
grandwizard28
ec3604c7d5 feat(promengine): create a dedicated promengine package 2025-03-24 18:05:21 +05:30
36 changed files with 617 additions and 523 deletions

View File

@@ -74,6 +74,10 @@ go-run-enterprise: ## Runs the enterprise go backend server
--use-logs-new-schema true \
--use-trace-new-schema true
.PHONY: go-test
go-test: ## Runs go unit tests
@go test -race ./...
.PHONY: go-run-community
go-run-community: ## Runs the community go backend server
@SIGNOZ_INSTRUMENTATION_LOGS_LEVEL=debug \

View File

@@ -72,7 +72,6 @@ sqlstore:
# The path to the SQLite database file.
path: /var/lib/signoz/signoz.db
##################### APIServer #####################
apiserver:
timeout:
@@ -91,20 +90,30 @@ apiserver:
- /api/v1/version
- /
##################### TelemetryStore #####################
telemetrystore:
# Specifies the telemetrystore provider to use.
provider: clickhouse
# Maximum number of idle connections in the connection pool.
max_idle_conns: 50
# Maximum number of open connections to the database.
max_open_conns: 100
# Maximum time to wait for a connection to be established.
dial_timeout: 5s
# Specifies the telemetrystore provider to use.
provider: clickhouse
clickhouse:
# The DSN to use for ClickHouse.
dsn: http://localhost:9000
# The DSN to use for clickhouse.
dsn: tcp://localhost:9000
prometheus:
remote_read:
# The URL to use for remote read. It must be a ClickHouse DSN pointing to the database where metrics are stored.
url: tcp://localhost:9000/signoz_metrics
active_query_tracker:
# Whether to enable the active query tracker.
enabled: true
# The path to use for the active query tracker.
path: ""
# The maximum number of concurrent queries.
max_concurrent: 20
##################### Alertmanager #####################
alertmanager:
@@ -117,7 +126,7 @@ alertmanager:
# The poll interval for periodically syncing the alertmanager with the config in the store.
poll_interval: 1m
# The URL under which Alertmanager is externally reachable (for example, if Alertmanager is served via a reverse proxy). Used for generating relative and absolute links back to Alertmanager itself.
external_url: http://localhost:9093
external_url: http://localhost:8080
# The global configuration for the alertmanager. The exhaustive list of fields can be found in the upstream: https://github.com/prometheus/alertmanager/blob/efa05feffd644ba4accb526e98a8c6545d26a783/config/config.go#L833
global:
# ResolveTimeout is the time after which an alert is declared resolved if it has not been updated.

View File

@@ -10,6 +10,7 @@ import (
"github.com/SigNoz/signoz/pkg/cache"
basechr "github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader"
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
"github.com/SigNoz/signoz/pkg/telemetrystore"
)
type ClickhouseReader struct {
@@ -20,8 +21,7 @@ type ClickhouseReader struct {
func NewDataConnector(
localDB *sqlx.DB,
ch clickhouse.Conn,
promConfigPath string,
telemetryStore telemetrystore.TelemetryStore,
lm interfaces.FeatureLookup,
cluster string,
useLogsNewSchema bool,
@@ -29,14 +29,10 @@ func NewDataConnector(
fluxIntervalForTraceDetail time.Duration,
cache cache.Cache,
) *ClickhouseReader {
chReader := basechr.NewReader(localDB, ch, promConfigPath, lm, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache)
chReader := basechr.NewReader(localDB, telemetryStore, lm, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache)
return &ClickhouseReader{
conn: ch,
conn: telemetryStore.ClickhouseDB(),
appdb: localDB,
ClickHouseReader: chReader,
}
}
func (r *ClickhouseReader) Start(readerReady chan bool) {
r.ClickHouseReader.Start(readerReady)
}

View File

@@ -18,13 +18,13 @@ import (
"github.com/SigNoz/signoz/ee/query-service/constants"
"github.com/SigNoz/signoz/ee/query-service/dao"
"github.com/SigNoz/signoz/ee/query-service/integrations/gateway"
"github.com/SigNoz/signoz/ee/query-service/interfaces"
"github.com/SigNoz/signoz/ee/query-service/rules"
"github.com/SigNoz/signoz/pkg/alertmanager"
"github.com/SigNoz/signoz/pkg/http/middleware"
"github.com/SigNoz/signoz/pkg/query-service/auth"
"github.com/SigNoz/signoz/pkg/signoz"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/web"
@@ -49,7 +49,6 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/healthcheck"
baseint "github.com/SigNoz/signoz/pkg/query-service/interfaces"
basemodel "github.com/SigNoz/signoz/pkg/query-service/model"
pqle "github.com/SigNoz/signoz/pkg/query-service/pqlEngine"
baserules "github.com/SigNoz/signoz/pkg/query-service/rules"
"github.com/SigNoz/signoz/pkg/query-service/telemetry"
"github.com/SigNoz/signoz/pkg/query-service/utils"
@@ -137,18 +136,15 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
// set license manager as feature flag provider in dao
modelDao.SetFlagProvider(lm)
readerReady := make(chan bool)
fluxIntervalForTraceDetail, err := time.ParseDuration(serverOptions.FluxIntervalForTraceDetail)
if err != nil {
return nil, err
}
var reader interfaces.DataConnector
qb := db.NewDataConnector(
reader := db.NewDataConnector(
serverOptions.SigNoz.SQLStore.SQLxDB(),
serverOptions.SigNoz.TelemetryStore.ClickHouseDB(),
serverOptions.PromConfigPath,
serverOptions.SigNoz.TelemetryStore,
lm,
serverOptions.Cluster,
serverOptions.UseLogsNewSchema,
@@ -156,8 +152,6 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
fluxIntervalForTraceDetail,
serverOptions.SigNoz.Cache,
)
go qb.Start(readerReady)
reader = qb
skipConfig := &basemodel.SkipConfig{}
if serverOptions.SkipTopLvlOpsPath != "" {
@@ -176,9 +170,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
c = cache.NewCache(cacheOpts)
}
<-readerReady
rm, err := makeRulesManager(
serverOptions.PromConfigPath,
serverOptions.RuleRepoURL,
serverOptions.SigNoz.SQLStore.SQLxDB(),
reader,
@@ -189,6 +181,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
serverOptions.UseTraceNewSchema,
serverOptions.SigNoz.Alertmanager,
serverOptions.SigNoz.SQLStore,
serverOptions.SigNoz.TelemetryStore,
)
if err != nil {
@@ -233,7 +226,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
}
// start the usagemanager
usageManager, err := usage.New(modelDao, lm.GetRepo(), serverOptions.SigNoz.TelemetryStore.ClickHouseDB(), serverOptions.Config.TelemetryStore.ClickHouse.DSN)
usageManager, err := usage.New(modelDao, lm.GetRepo(), serverOptions.SigNoz.TelemetryStore.ClickhouseDB(), serverOptions.Config.TelemetryStore.Clickhouse.DSN)
if err != nil {
return nil, err
}
@@ -304,7 +297,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
&opAmpModel.AllAgents, agentConfMgr,
)
errorList := qb.PreloadMetricsMetadata(context.Background())
errorList := reader.PreloadMetricsMetadata(context.Background())
for _, er := range errorList {
zap.L().Error("failed to preload metrics metadata", zap.Error(er))
}
@@ -538,7 +531,6 @@ func (s *Server) Stop() error {
}
func makeRulesManager(
promConfigPath,
ruleRepoURL string,
db *sqlx.DB,
ch baseint.Reader,
@@ -549,16 +541,11 @@ func makeRulesManager(
useTraceNewSchema bool,
alertmanager alertmanager.Alertmanager,
sqlstore sqlstore.SQLStore,
telemetryStore telemetrystore.TelemetryStore,
) (*baserules.Manager, error) {
// create engine
pqle, err := pqle.FromConfigPath(promConfigPath)
if err != nil {
return nil, fmt.Errorf("failed to create pql engine : %v", err)
}
// create manager opts
managerOpts := &baserules.ManagerOptions{
PqlEngine: pqle,
TelemetryStore: telemetryStore,
RepoURL: ruleRepoURL,
DBConn: db,
Context: context.Background(),

View File

@@ -7,6 +7,5 @@ import (
// Connector defines methods for interaction
// with o11y data, for example ClickHouse.
type DataConnector interface {
Start(readerReady chan bool)
baseint.Reader
}

View File

@@ -88,6 +88,7 @@ func main() {
MaxIdleConns: maxIdleConns,
MaxOpenConns: maxOpenConns,
DialTimeout: dialTimeout,
Config: promConfigPath,
})
if err != nil {
zap.L().Fatal("Failed to create config", zap.Error(err))
@@ -104,7 +105,7 @@ func main() {
signoz.NewTelemetryStoreProviderFactories(),
)
if err != nil {
zap.L().Fatal("Failed to create signoz struct", zap.Error(err))
zap.L().Fatal("Failed to create signoz", zap.Error(err))
}
jwtSecret := os.Getenv("SIGNOZ_JWT_SECRET")

View File

@@ -48,7 +48,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
opts.Rule,
opts.Logger,
opts.Reader,
opts.ManagerOpts.PqlEngine,
opts.ManagerOpts.TelemetryStore.PrometheusEngine(),
baserules.WithSQLStore(opts.SQLStore),
)
@@ -145,7 +145,7 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, *basemodel.Ap
parsedRule,
opts.Logger,
opts.Reader,
opts.ManagerOpts.PqlEngine,
opts.ManagerOpts.TelemetryStore.PrometheusEngine(),
baserules.WithSendAlways(),
baserules.WithSendUnmatched(),
baserules.WithSQLStore(opts.SQLStore),

4
go.mod
View File

@@ -34,7 +34,6 @@ require (
github.com/knadh/koanf/v2 v2.1.1
github.com/mailru/easyjson v0.7.7
github.com/mattn/go-sqlite3 v1.14.24
github.com/oklog/oklog v0.3.2
github.com/open-telemetry/opamp-go v0.5.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.111.0
github.com/opentracing/opentracing-go v1.2.0
@@ -93,6 +92,7 @@ require (
github.com/armon/go-metrics v0.4.1 // indirect
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect
github.com/aws/aws-sdk-go v1.55.5 // indirect
github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3 // indirect
github.com/beevik/etree v1.1.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
@@ -132,6 +132,7 @@ require (
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
github.com/google/btree v1.0.1 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/s2a-go v0.1.8 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect
github.com/gopherjs/gopherjs v1.17.2 // indirect
@@ -262,6 +263,7 @@ require (
go.opentelemetry.io/otel/sdk/metric v1.30.0 // indirect
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/goleak v1.3.0 // indirect
golang.org/x/mod v0.22.0 // indirect
golang.org/x/net v0.33.0 // indirect
golang.org/x/sys v0.29.0 // indirect

2
go.sum
View File

@@ -690,8 +690,6 @@ github.com/npillmayer/nestext v0.1.3/go.mod h1:h2lrijH8jpicr25dFY+oAJLyzlya6jhnu
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/oklog/oklog v0.3.2 h1:wVfs8F+in6nTBMkA7CbRw+zZMIB7nNM825cM1wuzoTk=
github.com/oklog/oklog v0.3.2/go.mod h1:FCV+B7mhrz4o+ueLpx+KqkyXRGMWOYEvfiXtdGtbWGs=
github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA=
github.com/oklog/run v1.1.0 h1:GEenZ1cK0+q0+wsJew9qUg/DyD8k3JzYsZAi5gYi2mA=
github.com/oklog/run v1.1.0/go.mod h1:sVPdnTZT1zYwAJeCMu2Th4T21pA3FPOQRfWjQlk7DVU=

View File

@@ -1,10 +1,15 @@
package instrumentation
import (
"context"
"fmt"
"log/slog"
"os"
"time"
"github.com/SigNoz/signoz/pkg/instrumentation/loghandler"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
)
func NewLogger(config Config, wrappers ...loghandler.Wrapper) *slog.Logger {
@@ -33,3 +38,65 @@ func NewLogger(config Config, wrappers ...loghandler.Wrapper) *slog.Logger {
return logger
}
// gokitLogger adapts a slog.Handler to the go-kit log.Logger interface so
// that components expecting go-kit logging can write through the
// application's slog pipeline.
var _ log.Logger = (*gokitLogger)(nil)

type gokitLogger struct {
	handler slog.Handler
	// messageKey is the go-kit key whose value becomes the slog record
	// message instead of an attribute (typically "msg").
	messageKey string
}

// NewGoKitLoggerFromSlogHandler returns a go-kit log.Logger that forwards
// every Log call to handler. Pairs whose key equals messageKey are used as
// the record message rather than emitted as attributes.
func NewGoKitLoggerFromSlogHandler(handler slog.Handler, messageKey string) log.Logger {
	return &gokitLogger{handler, messageKey}
}

// Log implements log.Logger. It converts the go-kit key/value pairs into a
// slog.Record: the messageKey pair becomes the record message, a go-kit
// level.Value pair selects the slog level, and all remaining pairs become
// attributes. A trailing key without a value is dropped.
func (l *gokitLogger) Log(keyvals ...any) error {
	var (
		attrs   []slog.Attr
		message string
		gkl     level.Value
	)

	for i := 1; i < len(keyvals); i += 2 {
		// go-kit/log keys don't have to be strings, but slog keys do.
		// Convert the go-kit key to a string with fmt.Sprint.
		key, ok := keyvals[i-1].(string)
		if !ok {
			key = fmt.Sprint(keyvals[i-1])
		}

		if l.messageKey != "" && key == l.messageKey {
			message = fmt.Sprint(keyvals[i])
			continue
		}

		// Named lv (not l) to avoid shadowing the receiver.
		if lv, ok := keyvals[i].(level.Value); ok {
			gkl = lv
			continue
		}

		attrs = append(attrs, slog.Any(key, keyvals[i]))
	}

	// Map the go-kit level onto slog; when no level pair was present the
	// zero value (slog.LevelInfo) is used.
	var sl slog.Level
	if gkl != nil {
		switch gkl {
		case level.DebugValue():
			sl = slog.LevelDebug
		case level.InfoValue():
			sl = slog.LevelInfo
		case level.WarnValue():
			sl = slog.LevelWarn
		case level.ErrorValue():
			sl = slog.LevelError
		}
	}

	if !l.handler.Enabled(context.Background(), sl) {
		return nil
	}

	r := slog.NewRecord(time.Now(), sl, message, 0)
	r.AddAttrs(attrs...)
	return l.handler.Handle(context.Background(), r)
}

18
pkg/promengine/config.go Normal file
View File

@@ -0,0 +1,18 @@
package promengine
import "net/url"
// ActiveQueryTrackerConfig configures the promql active query tracker
// (see promql.NewActiveQueryTracker).
type ActiveQueryTrackerConfig struct {
	// Enabled turns the active query tracker on or off.
	Enabled bool `mapstructure:"enabled"`
	// Path is the directory handed to the tracker.
	Path string `mapstructure:"path"`
	// MaxConcurrent is the maximum number of concurrent queries.
	MaxConcurrent int `mapstructure:"max_concurrent"`
}

// RemoteReadConfig configures the prometheus remote-read storage.
type RemoteReadConfig struct {
	// URL is the remote read endpoint.
	URL *url.URL `mapstructure:"url"`
}

// Config is the promengine configuration, decoded via mapstructure keys
// "remote_read" and "active_query_tracker".
type Config struct {
	RemoteReadConfig         RemoteReadConfig         `mapstructure:"remote_read"`
	ActiveQueryTrackerConfig ActiveQueryTrackerConfig `mapstructure:"active_query_tracker"`
}

87
pkg/promengine/ext.go Normal file
View File

@@ -0,0 +1,87 @@
package promengine
import (
"fmt"
"log/slog"
"time"
"github.com/SigNoz/signoz/pkg/instrumentation"
commoncfg "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/remote"
)
var _ PromEngine = (*ExtEngine)(nil)
type ExtEngine struct {
engine *promql.Engine
fanoutStorage storage.Storage
}
// New builds an ExtEngine from cfg: a promql engine (optionally wired to an
// active query tracker) and a fanout storage backed by a prometheus
// remote-read storage configured with cfg.RemoteReadConfig.URL.
// logger must be non-nil; it is bridged to go-kit for the prometheus
// components.
func New(logger *slog.Logger, cfg Config) (*ExtEngine, error) {
	if logger == nil {
		return nil, fmt.Errorf("logger is required")
	}

	gokitLogger := instrumentation.NewGoKitLoggerFromSlogHandler(logger.Handler(), "msg")

	var activeQueryTracker promql.QueryTracker
	if cfg.ActiveQueryTrackerConfig.Enabled {
		activeQueryTracker = promql.NewActiveQueryTracker(
			cfg.ActiveQueryTrackerConfig.Path,
			cfg.ActiveQueryTrackerConfig.MaxConcurrent,
			gokitLogger,
		)
	}

	engine := promql.NewEngine(promql.EngineOpts{
		Logger:             gokitLogger,
		Reg:                nil,
		MaxSamples:         50000000,
		Timeout:            2 * time.Minute,
		ActiveQueryTracker: activeQueryTracker,
	})

	remoteStorage := remote.NewStorage(
		gokitLogger,
		nil,
		// startTime callback for the remote storage: always reports
		// model.Latest.
		func() (int64, error) { return int64(model.Latest), nil },
		"",
		1*time.Minute,
		nil,
		false,
	)
	fanoutStorage := storage.NewFanout(gokitLogger, remoteStorage)

	// promCfg (renamed from `config`, which shadowed the imported
	// prometheus config package) carries only the remote-read section.
	promCfg := &config.Config{
		RemoteReadConfigs: []*config.RemoteReadConfig{
			{
				URL: &commoncfg.URL{URL: cfg.RemoteReadConfig.URL},
			},
		},
	}

	// UnmarshalYAML is invoked with a no-op unmarshal function —
	// presumably to populate the config's defaults before it is applied;
	// TODO confirm against prometheus config.Config.UnmarshalYAML.
	if err := promCfg.UnmarshalYAML(func(i interface{}) error { return nil }); err != nil {
		return nil, err
	}

	if err := remoteStorage.ApplyConfig(promCfg); err != nil {
		return nil, err
	}

	return &ExtEngine{
		engine:        engine,
		fanoutStorage: fanoutStorage,
	}, nil
}
// Engine returns the underlying promql engine.
func (e *ExtEngine) Engine() *promql.Engine {
	return e.engine
}
// Storage returns the underlying fanout storage.
func (e *ExtEngine) Storage() storage.Storage {
	return e.fanoutStorage
}

View File

@@ -0,0 +1,20 @@
package promengine
import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"
)
// PromEngine bundles the two prometheus building blocks needed to run
// promql queries: the query engine and the storage it reads from.
type PromEngine interface {
	// Engine returns the underlying promql engine
	Engine() *promql.Engine
	// Storage returns the underlying storage
	Storage() storage.Storage
}
// init initializes the prometheus model with UTF8 validation, so that
// metric and label name validation accepts any valid UTF-8 name.
func init() {
	model.NameValidationScheme = model.UTF8Validation
}

View File

@@ -0,0 +1,46 @@
package promenginetest
import (
"time"
"github.com/SigNoz/signoz/pkg/promengine"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"
)
// Engine is a promengine.PromEngine implementation for tests, backed by a
// local tsdb-based test storage instead of remote read.
var _ promengine.PromEngine = (*Engine)(nil)

type Engine struct {
	engine  *promql.Engine
	storage storage.Storage
}
// New returns a test Engine whose storage is a fresh TestStorage wrapped in
// a fanout. The optional outOfOrderTimeWindow (first value only) is
// forwarded to the underlying tsdb options.
func New(outOfOrderTimeWindow ...int64) (*Engine, error) {
	engine := promql.NewEngine(promql.EngineOpts{
		Logger:             nil,
		Reg:                nil,
		MaxSamples:         50000000,
		Timeout:            2 * time.Minute,
		ActiveQueryTracker: nil,
	})

	testStorage, err := NewStorageWithError(outOfOrderTimeWindow...)
	if err != nil {
		return nil, err
	}
	fanoutStorage := storage.NewFanout(nil, testStorage)

	return &Engine{
		engine:  engine,
		storage: fanoutStorage,
	}, nil
}
// Engine returns the underlying promql engine.
func (e *Engine) Engine() *promql.Engine {
	engine := e.engine
	return engine
}
// Storage returns the underlying (fanout-wrapped) test storage.
func (e *Engine) Storage() storage.Storage {
	st := e.storage
	return st
}

View File

@@ -0,0 +1,76 @@
package promenginetest
import (
"fmt"
"os"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
)
// TestStorage is a tsdb-backed storage for tests, combining a throwaway
// on-disk DB with a circular exemplar storage.
type TestStorage struct {
	*tsdb.DB
	exemplarStorage tsdb.ExemplarStorage
	// dir is the temporary directory holding the tsdb; removed by Close.
	dir string
}
// NewStorageWithError returns a new TestStorage for user facing tests, which reports
// errors directly. The optional outOfOrderTimeWindow (first value only) is
// applied to the tsdb options. The temporary directory is cleaned up on
// every error path; on success it is removed by Close.
func NewStorageWithError(outOfOrderTimeWindow ...int64) (*TestStorage, error) {
	dir, err := os.MkdirTemp("", "test_storage")
	if err != nil {
		return nil, fmt.Errorf("opening test directory: %w", err)
	}

	// Tests just load data for a series sequentially. Thus we
	// need a long appendable window.
	opts := tsdb.DefaultOptions()
	opts.MinBlockDuration = int64(24 * time.Hour / time.Millisecond)
	opts.MaxBlockDuration = int64(24 * time.Hour / time.Millisecond)
	opts.RetentionDuration = 0
	opts.EnableNativeHistograms = true

	// Set OutOfOrderTimeWindow if provided, otherwise use default (0)
	if len(outOfOrderTimeWindow) > 0 {
		opts.OutOfOrderTimeWindow = outOfOrderTimeWindow[0]
	} else {
		opts.OutOfOrderTimeWindow = 0 // Default value is zero
	}

	db, err := tsdb.Open(dir, nil, nil, opts, tsdb.NewDBStats())
	if err != nil {
		// Don't leak the temp directory when the storage cannot be opened.
		_ = os.RemoveAll(dir)
		return nil, fmt.Errorf("opening test storage: %w", err)
	}

	reg := prometheus.NewRegistry()
	eMetrics := tsdb.NewExemplarMetrics(reg)

	es, err := tsdb.NewCircularExemplarStorage(10, eMetrics)
	if err != nil {
		// Best-effort cleanup of the DB and temp directory on failure.
		_ = db.Close()
		_ = os.RemoveAll(dir)
		return nil, fmt.Errorf("opening test exemplar storage: %w", err)
	}
	return &TestStorage{DB: db, exemplarStorage: es, dir: dir}, nil
}
// Close closes the underlying tsdb and removes its temporary directory.
// The directory is removed even when closing the DB fails; the DB close
// error takes precedence over any removal error.
func (s TestStorage) Close() error {
	err := s.DB.Close()
	if rmErr := os.RemoveAll(s.dir); err == nil {
		err = rmErr
	}
	return err
}
// ExemplarAppender returns the storage itself, which implements
// storage.ExemplarAppender via AppendExemplar.
func (s TestStorage) ExemplarAppender() storage.ExemplarAppender {
	return s
}
// ExemplarQueryable exposes the underlying exemplar storage for queries.
func (s TestStorage) ExemplarQueryable() storage.ExemplarQueryable {
	return s.exemplarStorage
}
// AppendExemplar adds e for the series labelled l to the exemplar storage
// and echoes back ref.
func (s TestStorage) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
	return ref, s.exemplarStorage.AddExemplar(l, e)
}

View File

@@ -7,7 +7,6 @@ import (
"fmt"
"math"
"math/rand"
"os"
"reflect"
"regexp"
"sort"
@@ -16,20 +15,16 @@ import (
"sync"
"time"
"github.com/SigNoz/signoz/pkg/promengine"
"github.com/SigNoz/signoz/pkg/query-service/model/metrics_explorer"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/google/uuid"
"github.com/mailru/easyjson"
"github.com/oklog/oklog/pkg/group"
"github.com/pkg/errors"
"github.com/prometheus/common/promlog"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/util/stats"
"github.com/ClickHouse/clickhouse-go/v2"
@@ -38,7 +33,6 @@ import (
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/jmoiron/sqlx"
promModel "github.com/prometheus/common/model"
"go.uber.org/zap"
queryprogress "github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader/query_progress"
@@ -120,6 +114,7 @@ var (
// SpanWriter for reading spans from ClickHouse
type ClickHouseReader struct {
db clickhouse.Conn
engine promengine.PromEngine
localDB *sqlx.DB
TraceDB string
operationsTable string
@@ -138,9 +133,6 @@ type ClickHouseReader struct {
logsAttributeKeys string
logsResourceKeys string
logsTagAttributeTableV2 string
queryEngine *promql.Engine
remoteStorage *remote.Storage
fanoutStorage *storage.Storage
queryProgressTracker queryprogress.QueryProgressTracker
logsTableV2 string
@@ -175,8 +167,7 @@ type ClickHouseReader struct {
// NewTraceReader returns a TraceReader for the database
func NewReader(
localDB *sqlx.DB,
db driver.Conn,
configFile string,
telemetryStore telemetrystore.TelemetryStore,
featureFlag interfaces.FeatureLookup,
cluster string,
useLogsNewSchema bool,
@@ -185,14 +176,13 @@ func NewReader(
cache cache.Cache,
) *ClickHouseReader {
options := NewOptions(primaryNamespace, archiveNamespace)
return NewReaderFromClickhouseConnection(db, options, localDB, configFile, featureFlag, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache)
return NewReaderFromClickhouseConnection(options, localDB, telemetryStore, featureFlag, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache)
}
func NewReaderFromClickhouseConnection(
db driver.Conn,
options *Options,
localDB *sqlx.DB,
configFile string,
telemetryStore telemetrystore.TelemetryStore,
featureFlag interfaces.FeatureLookup,
cluster string,
useLogsNewSchema bool,
@@ -215,7 +205,8 @@ func NewReaderFromClickhouseConnection(
}
return &ClickHouseReader{
db: db,
db: telemetryStore.ClickhouseDB(),
engine: telemetryStore.PrometheusEngine(),
localDB: localDB,
TraceDB: options.primary.TraceDB,
operationsTable: options.primary.OperationsTable,
@@ -235,7 +226,6 @@ func NewReaderFromClickhouseConnection(
logsResourceKeys: options.primary.LogsResourceKeysTable,
logsTagAttributeTableV2: options.primary.LogsTagAttributeTableV2,
liveTailRefreshSeconds: options.primary.LiveTailRefreshSeconds,
promConfigFile: configFile,
featureFlags: featureFlag,
cluster: cluster,
queryProgressTracker: queryprogress.NewQueryProgressTracker(),
@@ -262,154 +252,8 @@ func NewReaderFromClickhouseConnection(
}
}
func (r *ClickHouseReader) Start(readerReady chan bool) {
logLevel := promlog.AllowedLevel{}
logLevel.Set("debug")
allowedFormat := promlog.AllowedFormat{}
allowedFormat.Set("logfmt")
promlogConfig := promlog.Config{
Level: &logLevel,
Format: &allowedFormat,
}
logger := promlog.New(&promlogConfig)
startTime := func() (int64, error) {
return int64(promModel.Latest), nil
}
remoteStorage := remote.NewStorage(
log.With(logger, "component", "remote"),
nil,
startTime,
"",
time.Duration(1*time.Minute),
nil,
false,
)
cfg := struct {
configFile string
localStoragePath string
lookbackDelta promModel.Duration
webTimeout promModel.Duration
queryTimeout promModel.Duration
queryConcurrency int
queryMaxSamples int
RemoteFlushDeadline promModel.Duration
prometheusURL string
logLevel promlog.AllowedLevel
}{
configFile: r.promConfigFile,
}
fanoutStorage := storage.NewFanout(logger, remoteStorage)
opts := promql.EngineOpts{
Logger: log.With(logger, "component", "query engine"),
Reg: nil,
MaxSamples: 50000000,
Timeout: time.Duration(2 * time.Minute),
ActiveQueryTracker: promql.NewActiveQueryTracker(
"",
20,
log.With(logger, "component", "activeQueryTracker"),
),
}
queryEngine := promql.NewEngine(opts)
reloaders := []func(cfg *config.Config) error{
remoteStorage.ApplyConfig,
}
// sync.Once is used to make sure we can close the channel at different execution stages(SIGTERM or when the config is loaded).
type closeOnce struct {
C chan struct{}
once sync.Once
Close func()
}
// Wait until the server is ready to handle reloading.
reloadReady := &closeOnce{
C: make(chan struct{}),
}
reloadReady.Close = func() {
reloadReady.once.Do(func() {
close(reloadReady.C)
})
}
var g group.Group
{
// Initial configuration loading.
cancel := make(chan struct{})
g.Add(
func() error {
var err error
r.promConfig, err = reloadConfig(cfg.configFile, logger, reloaders...)
if err != nil {
return fmt.Errorf("error loading config from %q: %s", cfg.configFile, err)
}
reloadReady.Close()
<-cancel
return nil
},
func(err error) {
close(cancel)
},
)
}
r.queryEngine = queryEngine
r.remoteStorage = remoteStorage
r.fanoutStorage = &fanoutStorage
readerReady <- true
if err := g.Run(); err != nil {
level.Error(logger).Log("err", err)
os.Exit(1)
}
}
func (r *ClickHouseReader) GetQueryEngine() *promql.Engine {
return r.queryEngine
}
func (r *ClickHouseReader) GetFanoutStorage() *storage.Storage {
return r.fanoutStorage
}
func reloadConfig(filename string, logger log.Logger, rls ...func(*config.Config) error) (promConfig *config.Config, err error) {
level.Info(logger).Log("msg", "Loading configuration file", "filename", filename)
conf, err := config.LoadFile(filename, false, false, logger)
if err != nil {
return nil, fmt.Errorf("couldn't load configuration (--config.file=%q): %v", filename, err)
}
failed := false
for _, rl := range rls {
if err := rl(conf); err != nil {
level.Error(logger).Log("msg", "Failed to apply configuration", "err", err)
failed = true
}
}
if failed {
return nil, fmt.Errorf("one or more errors occurred while applying the new configuration (--config.file=%q)", filename)
}
level.Info(logger).Log("msg", "Completed loading of configuration file", "filename", filename)
return conf, nil
}
func (r *ClickHouseReader) GetInstantQueryMetricsResult(ctx context.Context, queryParams *model.InstantQueryMetricsParams) (*promql.Result, *stats.QueryStats, *model.ApiError) {
qry, err := r.queryEngine.NewInstantQuery(ctx, r.remoteStorage, nil, queryParams.Query, queryParams.Time)
qry, err := r.engine.Engine().NewInstantQuery(ctx, r.engine.Storage(), nil, queryParams.Query, queryParams.Time)
if err != nil {
return nil, nil, &model.ApiError{Typ: model.ErrorBadData, Err: err}
}
@@ -428,7 +272,7 @@ func (r *ClickHouseReader) GetInstantQueryMetricsResult(ctx context.Context, que
}
func (r *ClickHouseReader) GetQueryRangeResult(ctx context.Context, query *model.QueryRangeParams) (*promql.Result, *stats.QueryStats, *model.ApiError) {
qry, err := r.queryEngine.NewRangeQuery(ctx, r.remoteStorage, nil, query.Query, query.Start, query.End, query.Step)
qry, err := r.engine.Engine().NewRangeQuery(ctx, r.engine.Storage(), nil, query.Query, query.Start, query.End, query.Step)
if err != nil {
return nil, nil, &model.ApiError{Typ: model.ErrorBadData, Err: err}

View File

@@ -5,11 +5,11 @@ import (
"encoding/json"
"fmt"
"math"
"regexp"
"strings"
"testing"
"time"
"github.com/DATA-DOG/go-sqlmock"
"github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader"
"github.com/SigNoz/signoz/pkg/query-service/app/queryBuilder"
tracesV3 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v3"
@@ -18,6 +18,7 @@ import (
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/querycache"
"github.com/SigNoz/signoz/pkg/query-service/utils"
"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
cmock "github.com/srikanthccv/ClickHouse-go-mock"
"github.com/stretchr/testify/require"
)
@@ -1131,20 +1132,6 @@ func TestQueryRangeValueTypePromQL(t *testing.T) {
}
}
type regexMatcher struct {
}
func (m *regexMatcher) Match(expectedSQL, actualSQL string) error {
re, err := regexp.Compile(expectedSQL)
if err != nil {
return err
}
if !re.MatchString(actualSQL) {
return fmt.Errorf("expected query to contain %s, got %s", expectedSQL, actualSQL)
}
return nil
}
func Test_querier_runWindowBasedListQuery(t *testing.T) {
params := &v3.QueryRangeParamsV3{
Start: 1722171576000000000, // July 28, 2024 6:29:36 PM
@@ -1358,8 +1345,8 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Setup mock
mock, err := cmock.NewClickHouseWithQueryMatcher(nil, &regexMatcher{})
require.NoError(t, err, "Failed to create ClickHouse mock")
telemetryStore, err := telemetrystoretest.New(sqlmock.QueryMatcherRegexp)
require.NoError(t, err)
// Configure mock responses
for _, response := range tc.queryResponses {
@@ -1368,7 +1355,7 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
values = append(values, []any{&ts, &testName})
}
// if len(values) > 0 {
mock.ExpectQuery(response.expectedQuery).WillReturnRows(
telemetryStore.Mock().ExpectQuery(response.expectedQuery).WillReturnRows(
cmock.NewRows(cols, values),
)
// }
@@ -1376,10 +1363,9 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
// Create reader and querier
reader := clickhouseReader.NewReaderFromClickhouseConnection(
mock,
options,
nil,
"",
telemetryStore,
featureManager.StartManager(),
"",
true,
@@ -1429,7 +1415,7 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
}
// Verify mock expectations
err = mock.ExpectationsWereMet()
err = telemetryStore.Mock().ExpectationsWereMet()
require.NoError(t, err, "Mock expectations were not met")
})
}

View File

@@ -5,11 +5,11 @@ import (
"encoding/json"
"fmt"
"math"
"regexp"
"strings"
"testing"
"time"
"github.com/DATA-DOG/go-sqlmock"
"github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader"
"github.com/SigNoz/signoz/pkg/query-service/app/queryBuilder"
tracesV3 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v3"
@@ -18,6 +18,7 @@ import (
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/querycache"
"github.com/SigNoz/signoz/pkg/query-service/utils"
"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
cmock "github.com/srikanthccv/ClickHouse-go-mock"
"github.com/stretchr/testify/require"
)
@@ -1185,20 +1186,6 @@ func TestV2QueryRangeValueTypePromQL(t *testing.T) {
}
}
type regexMatcher struct {
}
func (m *regexMatcher) Match(expectedSQL, actualSQL string) error {
re, err := regexp.Compile(expectedSQL)
if err != nil {
return err
}
if !re.MatchString(actualSQL) {
return fmt.Errorf("expected query to contain %s, got %s", expectedSQL, actualSQL)
}
return nil
}
func Test_querier_runWindowBasedListQuery(t *testing.T) {
params := &v3.QueryRangeParamsV3{
Start: 1722171576000000000, // July 28, 2024 6:29:36 PM
@@ -1412,8 +1399,8 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Setup mock
mock, err := cmock.NewClickHouseWithQueryMatcher(nil, &regexMatcher{})
require.NoError(t, err, "Failed to create ClickHouse mock")
telemetryStore, err := telemetrystoretest.New(sqlmock.QueryMatcherRegexp)
require.NoError(t, err)
// Configure mock responses
for _, response := range tc.queryResponses {
@@ -1422,7 +1409,7 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
values = append(values, []any{&ts, &testName})
}
// if len(values) > 0 {
mock.ExpectQuery(response.expectedQuery).WillReturnRows(
telemetryStore.Mock().ExpectQuery(response.expectedQuery).WillReturnRows(
cmock.NewRows(cols, values),
)
// }
@@ -1430,10 +1417,9 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
// Create reader and querier
reader := clickhouseReader.NewReaderFromClickhouseConnection(
mock,
options,
nil,
"",
telemetryStore,
featureManager.StartManager(),
"",
true,
@@ -1483,7 +1469,7 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
}
// Verify mock expectations
err = mock.ExpectationsWereMet()
err = telemetryStore.Mock().ExpectationsWereMet()
require.NoError(t, err, "Mock expectations were not met")
})
}

View File

@@ -25,6 +25,7 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/app/preferences"
"github.com/SigNoz/signoz/pkg/signoz"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/web"
@@ -40,7 +41,6 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/healthcheck"
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
"github.com/SigNoz/signoz/pkg/query-service/model"
pqle "github.com/SigNoz/signoz/pkg/query-service/pqlEngine"
"github.com/SigNoz/signoz/pkg/query-service/rules"
"github.com/SigNoz/signoz/pkg/query-service/telemetry"
"github.com/SigNoz/signoz/pkg/query-service/utils"
@@ -119,10 +119,9 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
return nil, err
}
clickhouseReader := clickhouseReader.NewReader(
reader := clickhouseReader.NewReader(
serverOptions.SigNoz.SQLStore.SQLxDB(),
serverOptions.SigNoz.TelemetryStore.ClickHouseDB(),
serverOptions.PromConfigPath,
serverOptions.SigNoz.TelemetryStore,
fm,
serverOptions.Cluster,
serverOptions.UseLogsNewSchema,
@@ -130,8 +129,6 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
fluxIntervalForTraceDetail,
serverOptions.SigNoz.Cache,
)
go clickhouseReader.Start(readerReady)
reader := clickhouseReader
skipConfig := &model.SkipConfig{}
if serverOptions.SkipTopLvlOpsPath != "" {
@@ -162,6 +159,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
serverOptions.UseLogsNewSchema,
serverOptions.UseTraceNewSchema,
serverOptions.SigNoz.SQLStore,
serverOptions.SigNoz.TelemetryStore,
)
if err != nil {
return nil, err
@@ -491,17 +489,11 @@ func makeRulesManager(
useLogsNewSchema bool,
useTraceNewSchema bool,
sqlstore sqlstore.SQLStore,
telemetryStore telemetrystore.TelemetryStore,
) (*rules.Manager, error) {
// create engine
pqle, err := pqle.FromReader(ch)
if err != nil {
return nil, fmt.Errorf("failed to create pql engine : %v", err)
}
// create manager opts
managerOpts := &rules.ManagerOptions{
PqlEngine: pqle,
TelemetryStore: telemetryStore,
RepoURL: ruleRepoURL,
DBConn: db,
Context: context.Background(),

View File

@@ -9,7 +9,6 @@ import (
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/querycache"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/stats"
)
@@ -86,10 +85,6 @@ type Reader interface {
req *v3.QBFilterSuggestionsRequest,
) (*v3.QBFilterSuggestionsResponse, *model.ApiError)
// Connection needed for rules, not ideal but required
GetQueryEngine() *promql.Engine
GetFanoutStorage() *storage.Storage
QueryDashboardVars(ctx context.Context, query string) (*model.DashboardVar, error)
CheckClickHouse(ctx context.Context) error

View File

@@ -86,6 +86,7 @@ func main() {
MaxIdleConns: maxIdleConns,
MaxOpenConns: maxOpenConns,
DialTimeout: dialTimeout,
Config: promConfigPath,
})
if err != nil {
zap.L().Fatal("Failed to create config", zap.Error(err))
@@ -102,7 +103,7 @@ func main() {
signoz.NewTelemetryStoreProviderFactories(),
)
if err != nil {
zap.L().Fatal("Failed to create signoz struct", zap.Error(err))
zap.L().Fatal("Failed to create signoz", zap.Error(err))
}
// Read the jwt secret key

View File

@@ -1,126 +0,0 @@
package promql
import (
"context"
"fmt"
"time"
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
"github.com/go-kit/log"
pmodel "github.com/prometheus/common/model"
"github.com/prometheus/common/promlog"
pconfig "github.com/prometheus/prometheus/config"
pql "github.com/prometheus/prometheus/promql"
pstorage "github.com/prometheus/prometheus/storage"
premote "github.com/prometheus/prometheus/storage/remote"
)
// PqlEngine bundles a PromQL evaluation engine with the fanout storage
// it evaluates queries against.
type PqlEngine struct {
	// engine executes PromQL queries.
	engine *pql.Engine
	// fanoutStorage is the storage queried by engine.
	fanoutStorage pstorage.Storage
}
// FromConfigPath loads the Prometheus configuration file at promConfigPath
// and builds a PqlEngine from it. It returns an error if the file cannot
// be loaded or parsed.
func FromConfigPath(promConfigPath string) (*PqlEngine, error) {
	// load storage path
	c, err := pconfig.LoadFile(promConfigPath, false, false, nil)
	if err != nil {
		return nil, fmt.Errorf("couldn't load configuration (--config.file=%q): %v", promConfigPath, err)
	}

	return NewPqlEngine(c)
}
// FromReader builds a PqlEngine that reuses the PromQL engine and fanout
// storage already owned by the given Reader. It never returns an error.
// NOTE(review): *ch.GetFanoutStorage() dereferences the returned pointer;
// a nil return value would panic here — confirm callers guarantee non-nil.
func FromReader(ch interfaces.Reader) (*PqlEngine, error) {
	return &PqlEngine{
		engine:        ch.GetQueryEngine(),
		fanoutStorage: *ch.GetFanoutStorage(),
	}, nil
}
// NewPqlEngine constructs a PromQL engine and a fanout storage backed by
// Prometheus remote-read storage, applying the remote-read settings from
// the given Prometheus configuration.
func NewPqlEngine(config *pconfig.Config) (*PqlEngine, error) {
	// Build a debug-level logfmt logger for the engine and storage.
	logLevel := promlog.AllowedLevel{}
	_ = logLevel.Set("debug")
	allowedFormat := promlog.AllowedFormat{}
	_ = allowedFormat.Set("logfmt")

	promlogConfig := promlog.Config{
		Level:  &logLevel,
		Format: &allowedFormat,
	}

	logger := promlog.New(&promlogConfig)

	opts := pql.EngineOpts{
		Logger:     log.With(logger, "component", "promql evaluator"),
		Reg:        nil,
		MaxSamples: 50000000,
		Timeout:    time.Duration(2 * time.Minute),
		// Track up to 20 concurrent queries; empty path keeps the tracker
		// log in the default location.
		ActiveQueryTracker: pql.NewActiveQueryTracker(
			"",
			20,
			logger,
		),
	}

	e := pql.NewEngine(opts)
	// startTime is required by remote storage; pmodel.Latest means there is
	// no lower time bound on readable samples.
	startTime := func() (int64, error) {
		return int64(pmodel.Latest), nil
	}

	remoteStorage := premote.NewStorage(
		log.With(logger, "component", "remote"),
		nil,
		startTime,
		"",
		time.Duration(1*time.Minute),
		nil,
		false,
	)
	fanoutStorage := pstorage.NewFanout(logger, remoteStorage)
	// NOTE(review): ApplyConfig error is discarded — confirm a failed config
	// application (leaving remote storage unconfigured) is acceptable here.
	_ = remoteStorage.ApplyConfig(config)

	return &PqlEngine{
		engine:        e,
		fanoutStorage: fanoutStorage,
	}, nil
}
// RunAlertQuery executes the PromQL range query qs over [start, end] at the
// given step interval and normalizes the result to a pql.Matrix: vector and
// scalar results are converted to single-point series, matrix results pass
// through unchanged, and any other result type is an error.
func (p *PqlEngine) RunAlertQuery(ctx context.Context, qs string, start, end time.Time, interval time.Duration) (pql.Matrix, error) {
	q, err := p.engine.NewRangeQuery(ctx, p.fanoutStorage, nil, qs, start, end, interval)
	if err != nil {
		return nil, err
	}
	res := q.Exec(ctx)

	if res.Err != nil {
		return nil, res.Err
	}

	switch typ := res.Value.(type) {
	case pql.Vector:
		// One single-sample series per vector element.
		series := make([]pql.Series, 0, len(typ))
		value := res.Value.(pql.Vector)
		for _, smpl := range value {
			series = append(series, pql.Series{
				Metric: smpl.Metric,
				Floats: []pql.FPoint{{T: smpl.T, F: smpl.F}},
			})
		}
		return series, nil
	case pql.Scalar:
		// A scalar becomes a single unlabeled series with one point.
		value := res.Value.(pql.Scalar)
		series := make([]pql.Series, 0, 1)
		series = append(series, pql.Series{
			Floats: []pql.FPoint{{T: value.T, F: value.V}},
		})
		return series, nil
	case pql.Matrix:
		return res.Value.(pql.Matrix), nil
	default:
		return nil, fmt.Errorf("rule result is not a vector or scalar")
	}
}

View File

@@ -21,9 +21,9 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/cache"
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
"github.com/SigNoz/signoz/pkg/query-service/model"
pqle "github.com/SigNoz/signoz/pkg/query-service/pqlEngine"
"github.com/SigNoz/signoz/pkg/query-service/telemetry"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/types/alertmanagertypes"
"github.com/SigNoz/signoz/pkg/types/authtypes"
)
@@ -76,8 +76,7 @@ func prepareTaskName(ruleId interface{}) string {
// ManagerOptions bundles options for the Manager.
type ManagerOptions struct {
PqlEngine *pqle.PqlEngine
TelemetryStore telemetrystore.TelemetryStore
// RepoURL is used to generate a backlink in sent alert messages
RepoURL string
@@ -180,7 +179,7 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
opts.Rule,
opts.Logger,
opts.Reader,
opts.ManagerOpts.PqlEngine,
opts.ManagerOpts.TelemetryStore.PrometheusEngine(),
WithSQLStore(opts.SQLStore),
)

View File

@@ -8,21 +8,22 @@ import (
"go.uber.org/zap"
"github.com/SigNoz/signoz/pkg/promengine"
"github.com/SigNoz/signoz/pkg/query-service/formatter"
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
"github.com/SigNoz/signoz/pkg/query-service/model"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
pqle "github.com/SigNoz/signoz/pkg/query-service/pqlEngine"
qslabels "github.com/SigNoz/signoz/pkg/query-service/utils/labels"
"github.com/SigNoz/signoz/pkg/query-service/utils/times"
"github.com/SigNoz/signoz/pkg/query-service/utils/timestamp"
"github.com/prometheus/prometheus/promql"
pql "github.com/prometheus/prometheus/promql"
yaml "gopkg.in/yaml.v2"
)
type PromRule struct {
*BaseRule
pqlEngine *pqle.PqlEngine
pqlEngine promengine.PromEngine
}
func NewPromRule(
@@ -30,7 +31,7 @@ func NewPromRule(
postableRule *PostableRule,
logger *zap.Logger,
reader interfaces.Reader,
pqlEngine *pqle.PqlEngine,
pqlEngine promengine.PromEngine,
opts ...RuleOption,
) (*PromRule, error) {
@@ -108,7 +109,7 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time) (interface{}, error)
return nil, err
}
zap.L().Info("evaluating promql query", zap.String("name", r.Name()), zap.String("query", q))
res, err := r.pqlEngine.RunAlertQuery(ctx, q, start, end, interval)
res, err := r.RunAlertQuery(ctx, q, start, end, interval)
if err != nil {
r.SetHealth(HealthBad)
r.SetLastError(err)
@@ -306,6 +307,43 @@ func (r *PromRule) String() string {
return string(byt)
}
// RunAlertQuery executes the PromQL range query qs over [start, end] at the
// given step interval against the rule's engine and storage, normalizing
// the result to a pql.Matrix: vector and scalar results are converted to
// single-point series, matrix results pass through unchanged, and any other
// result type is an error.
func (r *PromRule) RunAlertQuery(ctx context.Context, qs string, start, end time.Time, interval time.Duration) (pql.Matrix, error) {
	q, err := r.pqlEngine.Engine().NewRangeQuery(ctx, r.pqlEngine.Storage(), nil, qs, start, end, interval)
	if err != nil {
		return nil, err
	}

	res := q.Exec(ctx)
	if res.Err != nil {
		return nil, res.Err
	}

	// Use the type-switch variable directly instead of re-asserting
	// res.Value inside each case (staticcheck S1034).
	switch value := res.Value.(type) {
	case pql.Vector:
		// One single-sample series per vector element.
		series := make([]pql.Series, 0, len(value))
		for _, smpl := range value {
			series = append(series, pql.Series{
				Metric: smpl.Metric,
				Floats: []pql.FPoint{{T: smpl.T, F: smpl.F}},
			})
		}
		return series, nil
	case pql.Scalar:
		// A scalar becomes a single unlabeled series with one point.
		series := make([]pql.Series, 0, 1)
		series = append(series, pql.Series{
			Floats: []pql.FPoint{{T: value.T, F: value.V}},
		})
		return series, nil
	case pql.Matrix:
		return value, nil
	default:
		return nil, fmt.Errorf("rule result is not a vector or scalar")
	}
}
func toCommonSeries(series promql.Series) v3.Series {
commonSeries := v3.Series{
Labels: make(map[string]string),

View File

@@ -68,7 +68,7 @@ func defaultTestNotification(opts PrepareTestRuleOptions) (int, *model.ApiError)
parsedRule,
opts.Logger,
opts.Reader,
opts.ManagerOpts.PqlEngine,
opts.ManagerOpts.TelemetryStore.PrometheusEngine(),
WithSendAlways(),
WithSendUnmatched(),
WithSQLStore(opts.SQLStore),

View File

@@ -3,19 +3,22 @@ package rules
import (
"context"
"fmt"
"github.com/SigNoz/signoz/pkg/cache"
"github.com/SigNoz/signoz/pkg/cache/memorycache"
"github.com/SigNoz/signoz/pkg/factory/factorytest"
"strings"
"testing"
"time"
"github.com/SigNoz/signoz/pkg/cache"
"github.com/SigNoz/signoz/pkg/cache/memorycache"
"github.com/SigNoz/signoz/pkg/factory/factorytest"
"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
"github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader"
"github.com/SigNoz/signoz/pkg/query-service/common"
"github.com/SigNoz/signoz/pkg/query-service/featureManager"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/utils/labels"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
cmock "github.com/srikanthccv/ClickHouse-go-mock"
)
@@ -1152,10 +1155,8 @@ func TestThresholdRuleUnitCombinations(t *testing.T) {
},
}
fm := featureManager.StartManager()
mock, err := cmock.NewClickHouseWithQueryMatcher(nil, &queryMatcherAny{})
if err != nil {
t.Errorf("an error '%s' was not expected when opening a stub database connection", err)
}
telemetryStore, err := telemetrystoretest.New(&queryMatcherAny{})
require.NoError(t, err)
cols := make([]cmock.ColumnType, 0)
cols = append(cols, cmock.ColumnType{Name: "value", Type: "Float64"})
@@ -1227,11 +1228,11 @@ func TestThresholdRuleUnitCombinations(t *testing.T) {
for idx, c := range cases {
rows := cmock.NewRows(cols, c.values)
mock.ExpectQuery(".*").WillReturnError(fmt.Errorf("error"))
telemetryStore.Mock().ExpectQuery(".*").WillReturnError(fmt.Errorf("error"))
// We are testing the eval logic after the query is run
// so we don't care about the query string here
queryString := "SELECT any"
mock.
telemetryStore.Mock().
ExpectQuery(queryString).
WillReturnRows(rows)
postableRule.RuleCondition.CompareOp = CompareOp(c.compareOp)
@@ -1246,7 +1247,8 @@ func TestThresholdRuleUnitCombinations(t *testing.T) {
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
readerCache, err := memorycache.New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: cache.Memory{TTL: DefaultFrequency}})
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, time.Duration(time.Second), readerCache)
require.NoError(t, err)
reader := clickhouseReader.NewReaderFromClickhouseConnection(options, nil, telemetryStore, fm, "", true, true, time.Duration(time.Second), readerCache)
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
"signoz_calls_total": {
@@ -1305,10 +1307,8 @@ func TestThresholdRuleNoData(t *testing.T) {
},
}
fm := featureManager.StartManager()
mock, err := cmock.NewClickHouseWithQueryMatcher(nil, &queryMatcherAny{})
if err != nil {
t.Errorf("an error '%s' was not expected when opening a stub database connection", err)
}
telemetryStore, err := telemetrystoretest.New(&queryMatcherAny{})
require.NoError(t, err)
cols := make([]cmock.ColumnType, 0)
cols = append(cols, cmock.ColumnType{Name: "value", Type: "Float64"})
@@ -1328,12 +1328,12 @@ func TestThresholdRuleNoData(t *testing.T) {
for idx, c := range cases {
rows := cmock.NewRows(cols, c.values)
mock.ExpectQuery(".*").WillReturnError(fmt.Errorf("error"))
telemetryStore.Mock().ExpectQuery(".*").WillReturnError(fmt.Errorf("error"))
// We are testing the eval logic after the query is run
// so we don't care about the query string here
queryString := "SELECT any"
mock.
telemetryStore.Mock().
ExpectQuery(queryString).
WillReturnRows(rows)
var target float64 = 0
@@ -1346,7 +1346,7 @@ func TestThresholdRuleNoData(t *testing.T) {
}
readerCache, err := memorycache.New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: cache.Memory{TTL: DefaultFrequency}})
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, time.Duration(time.Second), readerCache)
reader := clickhouseReader.NewReaderFromClickhouseConnection(options, nil, telemetryStore, fm, "", true, true, time.Duration(time.Second), readerCache)
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
@@ -1410,10 +1410,8 @@ func TestThresholdRuleTracesLink(t *testing.T) {
},
}
fm := featureManager.StartManager()
mock, err := cmock.NewClickHouseWithQueryMatcher(nil, &queryMatcherAny{})
if err != nil {
t.Errorf("an error '%s' was not expected when opening a stub database connection", err)
}
telemetryStore, err := telemetrystoretest.New(&queryMatcherAny{})
require.NoError(t, err)
metaCols := make([]cmock.ColumnType, 0)
metaCols = append(metaCols, cmock.ColumnType{Name: "DISTINCT(tagKey)", Type: "String"})
@@ -1428,11 +1426,11 @@ func TestThresholdRuleTracesLink(t *testing.T) {
for idx, c := range testCases {
metaRows := cmock.NewRows(metaCols, c.metaValues)
mock.
telemetryStore.Mock().
ExpectQuery("SELECT DISTINCT(tagKey), tagType, dataType FROM archiveNamespace.span_attributes_keys").
WillReturnRows(metaRows)
mock.
telemetryStore.Mock().
ExpectSelect("SHOW CREATE TABLE signoz_traces.distributed_signoz_index_v3").WillReturnRows(&cmock.Rows{})
rows := cmock.NewRows(cols, c.values)
@@ -1440,7 +1438,7 @@ func TestThresholdRuleTracesLink(t *testing.T) {
// We are testing the eval logic after the query is run
// so we don't care about the query string here
queryString := "SELECT any"
mock.
telemetryStore.Mock().
ExpectQuery(queryString).
WillReturnRows(rows)
postableRule.RuleCondition.CompareOp = CompareOp(c.compareOp)
@@ -1454,7 +1452,7 @@ func TestThresholdRuleTracesLink(t *testing.T) {
}
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, time.Duration(time.Second), nil)
reader := clickhouseReader.NewReaderFromClickhouseConnection(options, nil, telemetryStore, fm, "", true, true, time.Duration(time.Second), nil)
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
@@ -1523,10 +1521,8 @@ func TestThresholdRuleLogsLink(t *testing.T) {
},
}
fm := featureManager.StartManager()
mock, err := cmock.NewClickHouseWithQueryMatcher(nil, &queryMatcherAny{})
if err != nil {
t.Errorf("an error '%s' was not expected when opening a stub database connection", err)
}
telemetryStore, err := telemetrystoretest.New(&queryMatcherAny{})
require.NoError(t, err)
attrMetaCols := make([]cmock.ColumnType, 0)
attrMetaCols = append(attrMetaCols, cmock.ColumnType{Name: "name", Type: "String"})
@@ -1546,17 +1542,17 @@ func TestThresholdRuleLogsLink(t *testing.T) {
for idx, c := range testCases {
attrMetaRows := cmock.NewRows(attrMetaCols, c.attrMetaValues)
mock.
telemetryStore.Mock().
ExpectSelect("SELECT DISTINCT name, datatype from signoz_logs.distributed_logs_attribute_keys group by name, datatype").
WillReturnRows(attrMetaRows)
resourceMetaRows := cmock.NewRows(resourceMetaCols, c.resourceMetaValues)
mock.
telemetryStore.Mock().
ExpectSelect("SELECT DISTINCT name, datatype from signoz_logs.distributed_logs_resource_keys group by name, datatype").
WillReturnRows(resourceMetaRows)
createTableRows := cmock.NewRows(createTableCols, c.createTableValues)
mock.
telemetryStore.Mock().
ExpectSelect("SHOW CREATE TABLE signoz_logs.logs").
WillReturnRows(createTableRows)
@@ -1565,7 +1561,7 @@ func TestThresholdRuleLogsLink(t *testing.T) {
// We are testing the eval logic after the query is run
// so we don't care about the query string here
queryString := "SELECT any"
mock.
telemetryStore.Mock().
ExpectQuery(queryString).
WillReturnRows(rows)
postableRule.RuleCondition.CompareOp = CompareOp(c.compareOp)
@@ -1579,7 +1575,7 @@ func TestThresholdRuleLogsLink(t *testing.T) {
}
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, time.Duration(time.Second), nil)
reader := clickhouseReader.NewReaderFromClickhouseConnection(options, nil, telemetryStore, fm, "", true, true, time.Duration(time.Second), nil)
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
rule.TemporalityMap = map[string]map[v3.Temporality]bool{

View File

@@ -20,6 +20,7 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/dao"
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
"github.com/SigNoz/signoz/pkg/query-service/model"
"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/google/uuid"
@@ -39,14 +40,14 @@ func NewMockClickhouseReader(
) {
require.NotNil(t, testDB)
mockDB, err := mockhouse.NewClickHouseWithQueryMatcher(nil, sqlmock.QueryMatcherRegexp)
telemetryStore, err := telemetrystoretest.New(sqlmock.QueryMatcherRegexp)
require.NoError(t, err)
require.Nil(t, err, "could not init mock clickhouse")
reader := clickhouseReader.NewReaderFromClickhouseConnection(
mockDB,
clickhouseReader.NewOptions("", ""),
testDB,
"",
telemetryStore,
featureFlags,
"",
true,
@@ -55,7 +56,7 @@ func NewMockClickhouseReader(
nil,
)
return reader, mockDB
return reader, telemetryStore.Mock()
}
func addLogsQueryExpectation(

View File

@@ -20,6 +20,8 @@ import (
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/version"
"github.com/SigNoz/signoz/pkg/web"
"github.com/go-kit/log"
promconfig "github.com/prometheus/prometheus/config"
)
// Config defines the entire input configuration of signoz.
@@ -49,7 +51,7 @@ type Config struct {
APIServer apiserver.Config `mapstructure:"apiserver"`
// TelemetryStore config
TelemetryStore telemetrystore.Config `mapstructure:"telemetrystore"`
TelemetryStore telemetrystore.Config `mapstructure:"telemetrystore" yaml:"telemetrystore"`
// Alertmanager config
Alertmanager alertmanager.Config `mapstructure:"alertmanager" yaml:"alertmanager"`
@@ -61,6 +63,7 @@ type DeprecatedFlags struct {
MaxIdleConns int
MaxOpenConns int
DialTimeout time.Duration
Config string
}
func NewConfig(ctx context.Context, resolverConfig config.ResolverConfig, deprecatedFlags DeprecatedFlags) (Config, error) {
@@ -145,7 +148,7 @@ func mergeAndEnsureBackwardCompatibility(config *Config, deprecatedFlags Depreca
if os.Getenv("ClickHouseUrl") != "" {
fmt.Println("[Deprecated] env ClickHouseUrl is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_DSN instead.")
config.TelemetryStore.ClickHouse.DSN = os.Getenv("ClickHouseUrl")
config.TelemetryStore.Clickhouse.DSN = os.Getenv("ClickHouseUrl")
}
if deprecatedFlags.MaxIdleConns != 50 {
@@ -176,4 +179,18 @@ func mergeAndEnsureBackwardCompatibility(config *Config, deprecatedFlags Depreca
if os.Getenv("ALERTMANAGER_API_CHANNEL_PATH") != "" {
fmt.Println("[Deprecated] env ALERTMANAGER_API_CHANNEL_PATH is deprecated and scheduled for complete removal.")
}
if deprecatedFlags.Config != "" {
fmt.Println("[Deprecated] flag --config is deprecated for passing prometheus config. Please use SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_PROMETHEUS_REMOTE__READ_URL instead. The flag will be used for passing the entire SigNoz config. More details can be found at https://github.com/SigNoz/signoz/issues/6805.")
cfg, err := promconfig.LoadFile(deprecatedFlags.Config, false, false, log.NewNopLogger())
if err != nil {
fmt.Println("Error parsing config, using value set in SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_PROMETHEUS_REMOTE__READ_URL")
} else {
if len(cfg.RemoteReadConfigs) != 1 {
fmt.Println("Error finding remote read config, using value set in SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_PROMETHEUS_REMOTE__READ_URL")
} else {
config.TelemetryStore.Clickhouse.Prometheus.RemoteReadConfig.URL = cfg.RemoteReadConfigs[0].URL.URL
}
}
}
}

View File

@@ -67,7 +67,7 @@ func NewSQLMigrationProviderFactories(sqlstore sqlstore.SQLStore) factory.NamedM
func NewTelemetryStoreProviderFactories() factory.NamedMap[factory.ProviderFactory[telemetrystore.TelemetryStore, telemetrystore.Config]] {
return factory.MustNewNamedMap(
clickhousetelemetrystore.NewFactory(telemetrystorehook.NewFactory()),
clickhousetelemetrystore.NewFactory(telemetrystorehook.NewSettingsFactory()),
)
}

View File

@@ -6,13 +6,15 @@ import (
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/promengine"
"github.com/SigNoz/signoz/pkg/telemetrystore"
)
type provider struct {
settings factory.ScopedProviderSettings
clickHouseConn clickhouse.Conn
hooks []telemetrystore.TelemetryStoreHook
settings factory.ScopedProviderSettings
clickHouseConn clickhouse.Conn
prometheusEngine promengine.PromEngine
hooks []telemetrystore.TelemetryStoreHook
}
func NewFactory(hookFactories ...factory.ProviderFactory[telemetrystore.TelemetryStoreHook, telemetrystore.Config]) factory.ProviderFactory[telemetrystore.TelemetryStore, telemetrystore.Config] {
@@ -33,7 +35,7 @@ func NewFactory(hookFactories ...factory.ProviderFactory[telemetrystore.Telemetr
func New(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config, hooks ...telemetrystore.TelemetryStoreHook) (telemetrystore.TelemetryStore, error) {
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/telemetrystore/clickhousetelemetrystore")
options, err := clickhouse.ParseDSN(config.ClickHouse.DSN)
options, err := clickhouse.ParseDSN(config.Clickhouse.DSN)
if err != nil {
return nil, err
}
@@ -46,75 +48,85 @@ func New(ctx context.Context, providerSettings factory.ProviderSettings, config
return nil, err
}
engine, err := promengine.New(settings.Logger(), config.Clickhouse.Prometheus)
if err != nil {
return nil, err
}
return &provider{
settings: settings,
clickHouseConn: chConn,
hooks: hooks,
settings: settings,
clickHouseConn: chConn,
hooks: hooks,
prometheusEngine: engine,
}, nil
}
func (p *provider) ClickHouseDB() clickhouse.Conn {
func (p *provider) ClickhouseDB() clickhouse.Conn {
return p
}
func (p provider) Close() error {
func (p *provider) PrometheusEngine() promengine.PromEngine {
return p.prometheusEngine
}
func (p *provider) Close() error {
return p.clickHouseConn.Close()
}
func (p provider) Ping(ctx context.Context) error {
func (p *provider) Ping(ctx context.Context) error {
return p.clickHouseConn.Ping(ctx)
}
func (p provider) Stats() driver.Stats {
func (p *provider) Stats() driver.Stats {
return p.clickHouseConn.Stats()
}
func (p provider) Query(ctx context.Context, query string, args ...interface{}) (driver.Rows, error) {
func (p *provider) Query(ctx context.Context, query string, args ...interface{}) (driver.Rows, error) {
ctx, query, args = telemetrystore.WrapBeforeQuery(p.hooks, ctx, query, args...)
rows, err := p.clickHouseConn.Query(ctx, query, args...)
telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, rows, err)
return rows, err
}
func (p provider) QueryRow(ctx context.Context, query string, args ...interface{}) driver.Row {
func (p *provider) QueryRow(ctx context.Context, query string, args ...interface{}) driver.Row {
ctx, query, args = telemetrystore.WrapBeforeQuery(p.hooks, ctx, query, args...)
row := p.clickHouseConn.QueryRow(ctx, query, args...)
telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, nil, nil)
return row
}
func (p provider) Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error {
func (p *provider) Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error {
ctx, query, args = telemetrystore.WrapBeforeQuery(p.hooks, ctx, query, args...)
err := p.clickHouseConn.Select(ctx, dest, query, args...)
telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, nil, err)
return err
}
func (p provider) Exec(ctx context.Context, query string, args ...interface{}) error {
func (p *provider) Exec(ctx context.Context, query string, args ...interface{}) error {
ctx, query, args = telemetrystore.WrapBeforeQuery(p.hooks, ctx, query, args...)
err := p.clickHouseConn.Exec(ctx, query, args...)
telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, nil, err)
return err
}
func (p provider) AsyncInsert(ctx context.Context, query string, wait bool, args ...interface{}) error {
func (p *provider) AsyncInsert(ctx context.Context, query string, wait bool, args ...interface{}) error {
ctx, query, args = telemetrystore.WrapBeforeQuery(p.hooks, ctx, query, args...)
err := p.clickHouseConn.AsyncInsert(ctx, query, wait, args...)
telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, nil, err)
return err
}
func (p provider) PrepareBatch(ctx context.Context, query string, opts ...driver.PrepareBatchOption) (driver.Batch, error) {
func (p *provider) PrepareBatch(ctx context.Context, query string, opts ...driver.PrepareBatchOption) (driver.Batch, error) {
ctx, query, args := telemetrystore.WrapBeforeQuery(p.hooks, ctx, query)
batch, err := p.clickHouseConn.PrepareBatch(ctx, query, opts...)
telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, nil, err)
return batch, err
}
func (p provider) ServerVersion() (*driver.ServerVersion, error) {
func (p *provider) ServerVersion() (*driver.ServerVersion, error) {
return p.clickHouseConn.ServerVersion()
}
func (p provider) Contributors() []string {
func (p *provider) Contributors() []string {
return p.clickHouseConn.Contributors()
}

View File

@@ -2,28 +2,36 @@ package telemetrystore
import (
"fmt"
"net/url"
"time"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/promengine"
)
type Config struct {
// Provider is the provider to use
Provider string `mapstructure:"provider"`
// Connection is the connection configuration
Connection ConnectionConfig `mapstructure:",squash"`
// Clickhouse is the clickhouse configuration
ClickHouse ClickHouseConfig `mapstructure:"clickhouse"`
Clickhouse ClickhouseConfig `mapstructure:"clickhouse"`
}
type ConnectionConfig struct {
// MaxOpenConns is the maximum number of open connections to the database.
MaxOpenConns int `mapstructure:"max_open_conns"`
MaxIdleConns int `mapstructure:"max_idle_conns"`
DialTimeout time.Duration `mapstructure:"dial_timeout"`
MaxOpenConns int `mapstructure:"max_open_conns"`
// MaxIdleConns is the maximum number of connections in the idle connection pool.
MaxIdleConns int `mapstructure:"max_idle_conns"`
// DialTimeout is the timeout for dialing a new connection.
DialTimeout time.Duration `mapstructure:"dial_timeout"`
}
type ClickHouseQuerySettings struct {
type QuerySettings struct {
MaxExecutionTime int `mapstructure:"max_execution_time"`
MaxExecutionTimeLeaf int `mapstructure:"max_execution_time_leaf"`
TimeoutBeforeCheckingExecutionSpeed int `mapstructure:"timeout_before_checking_execution_speed"`
@@ -31,15 +39,19 @@ type ClickHouseQuerySettings struct {
MaxResultRowsForCHQuery int `mapstructure:"max_result_rows_for_ch_query"`
}
type ClickHouseConfig struct {
type ClickhouseConfig struct {
// DSN is the database source name.
DSN string `mapstructure:"dsn"`
QuerySettings ClickHouseQuerySettings `mapstructure:"settings"`
// QuerySettings is the query settings for clickhouse.
QuerySettings QuerySettings `mapstructure:"settings"`
// Prometheus is the prometheus configuration
Prometheus promengine.Config `mapstructure:"prometheus"`
}
func NewConfigFactory() factory.ConfigFactory {
return factory.NewConfigFactory(factory.MustNewName("telemetrystore"), newConfig)
}
func newConfig() factory.Config {
@@ -50,15 +62,39 @@ func newConfig() factory.Config {
MaxIdleConns: 50,
DialTimeout: 5 * time.Second,
},
ClickHouse: ClickHouseConfig{
Clickhouse: ClickhouseConfig{
DSN: "tcp://localhost:9000",
Prometheus: promengine.Config{
RemoteReadConfig: promengine.RemoteReadConfig{
URL: &url.URL{
Scheme: "tcp",
Host: "localhost:9000",
Path: "/signoz_metrics",
},
},
ActiveQueryTrackerConfig: promengine.ActiveQueryTrackerConfig{
Enabled: true,
Path: "",
MaxConcurrent: 20,
},
},
},
}
}
func (c Config) Validate() error {
if c.Provider != "clickhouse" {
return fmt.Errorf("provider: %q is not supported", c.Provider)
dsn, err := url.Parse(c.Clickhouse.DSN)
if err != nil {
return err
}
if c.Clickhouse.Prometheus.RemoteReadConfig.URL.Host != dsn.Host {
return fmt.Errorf("mismatch between host in prometheus.remote_read.url %q and clickhouse.dsn %q", c.Clickhouse.Prometheus.RemoteReadConfig.URL.Host, dsn.Host)
}
if c.Clickhouse.Prometheus.RemoteReadConfig.URL.Scheme != dsn.Scheme {
return fmt.Errorf("mismatch between scheme in prometheus.remote_read.url %q and clickhouse.dsn %q", c.Clickhouse.Prometheus.RemoteReadConfig.URL.Scheme, dsn.Scheme)
}
return nil

View File

@@ -13,7 +13,7 @@ import (
)
func TestNewWithEnvProvider(t *testing.T) {
t.Setenv("SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_DSN", "http://localhost:9000")
t.Setenv("SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_DSN", "tcp://localhost:9000")
t.Setenv("SIGNOZ_TELEMETRYSTORE_MAX__IDLE__CONNS", "60")
t.Setenv("SIGNOZ_TELEMETRYSTORE_MAX__OPEN__CONNS", "150")
t.Setenv("SIGNOZ_TELEMETRYSTORE_DIAL__TIMEOUT", "5s")
@@ -33,22 +33,18 @@ func TestNewWithEnvProvider(t *testing.T) {
)
require.NoError(t, err)
actual := &Config{}
err = conf.Unmarshal("telemetrystore", actual)
actual := Config{}
err = conf.Unmarshal("telemetrystore", &actual)
require.NoError(t, err)
expected := &Config{
Provider: "clickhouse",
Connection: ConnectionConfig{
MaxOpenConns: 150,
MaxIdleConns: 60,
DialTimeout: 5 * time.Second,
},
ClickHouse: ClickHouseConfig{
DSN: "http://localhost:9000",
},
}
assert.NoError(t, actual.Validate())
expected := NewConfigFactory().New().(Config)
expected.Provider = "clickhouse"
expected.Connection.MaxOpenConns = 150
expected.Connection.MaxIdleConns = 60
expected.Connection.DialTimeout = 5 * time.Second
expected.Clickhouse.DSN = "tcp://localhost:9000"
assert.Equal(t, expected, actual)
}
@@ -74,14 +70,14 @@ func TestNewWithEnvProviderWithQuerySettings(t *testing.T) {
)
require.NoError(t, err)
actual := &Config{}
err = conf.Unmarshal("telemetrystore", actual)
actual := Config{}
err = conf.Unmarshal("telemetrystore", &actual)
require.NoError(t, err)
expected := &Config{
ClickHouse: ClickHouseConfig{
QuerySettings: ClickHouseQuerySettings{
expected := Config{
Clickhouse: ClickhouseConfig{
QuerySettings: QuerySettings{
MaxExecutionTime: 10,
MaxExecutionTimeLeaf: 10,
TimeoutBeforeCheckingExecutionSpeed: 10,
@@ -91,5 +87,5 @@ func TestNewWithEnvProviderWithQuerySettings(t *testing.T) {
},
}
assert.Equal(t, expected.ClickHouse.QuerySettings, actual.ClickHouse.QuerySettings)
assert.Equal(t, expected.Clickhouse.QuerySettings, actual.Clickhouse.QuerySettings)
}

View File

@@ -5,10 +5,12 @@ import (
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/SigNoz/signoz/pkg/promengine"
)
type TelemetryStore interface {
ClickHouseDB() clickhouse.Conn
ClickhouseDB() clickhouse.Conn
PrometheusEngine() promengine.PromEngine
}
type TelemetryStoreHook interface {

View File

@@ -12,29 +12,20 @@ import (
)
type provider struct {
settings telemetrystore.ClickHouseQuerySettings
settings telemetrystore.QuerySettings
}
func NewFactory() factory.ProviderFactory[telemetrystore.TelemetryStoreHook, telemetrystore.Config] {
return factory.NewProviderFactory(factory.MustNewName("clickhousesettings"), New)
func NewSettingsFactory() factory.ProviderFactory[telemetrystore.TelemetryStoreHook, telemetrystore.Config] {
return factory.NewProviderFactory(factory.MustNewName("settings"), NewSettings)
}
func New(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config) (telemetrystore.TelemetryStoreHook, error) {
func NewSettings(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config) (telemetrystore.TelemetryStoreHook, error) {
return &provider{
settings: config.ClickHouse.QuerySettings,
settings: config.Clickhouse.QuerySettings,
}, nil
}
func (h *provider) BeforeQuery(ctx context.Context, query string, args ...interface{}) (context.Context, string, []interface{}) {
return h.clickHouseSettings(ctx, query, args...)
}
func (h *provider) AfterQuery(ctx context.Context, query string, args []interface{}, rows driver.Rows, err error) {
return
}
// clickHouseSettings adds clickhouse settings to queries
func (h *provider) clickHouseSettings(ctx context.Context, query string, args ...interface{}) (context.Context, string, []interface{}) {
settings := clickhouse.Settings{}
// Apply default settings
@@ -73,6 +64,9 @@ func (h *provider) clickHouseSettings(ctx context.Context, query string, args ..
return ctx, query, args
}
func (h *provider) AfterQuery(ctx context.Context, query string, args []interface{}, rows driver.Rows, err error) {
}
func (h *provider) getLogComment(ctx context.Context) string {
// Get the key-value pairs from context for log comment
kv := ctx.Value(common.LogCommentKey)

View File

@@ -2,6 +2,9 @@ package telemetrystoretest
import (
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/DATA-DOG/go-sqlmock"
"github.com/SigNoz/signoz/pkg/promengine"
"github.com/SigNoz/signoz/pkg/promengine/promenginetest"
"github.com/SigNoz/signoz/pkg/telemetrystore"
cmock "github.com/srikanthccv/ClickHouse-go-mock"
)
@@ -10,28 +13,38 @@ var _ telemetrystore.TelemetryStore = (*Provider)(nil)
// Provider represents a mock telemetry store provider for testing
type Provider struct {
mock cmock.ClickConnMockCommon
clickhouseDB cmock.ClickConnMockCommon
engine promengine.PromEngine
}
// New creates a new mock telemetry store provider
func New() (*Provider, error) {
options := &clickhouse.Options{} // Default options
mock, err := cmock.NewClickHouseNative(options)
func New(matcher sqlmock.QueryMatcher) (*Provider, error) {
clickhouseDB, err := cmock.NewClickHouseWithQueryMatcher(&clickhouse.Options{}, matcher)
if err != nil {
return nil, err
}
engine, err := promenginetest.New()
if err != nil {
return nil, err
}
return &Provider{
mock: mock,
clickhouseDB: clickhouseDB,
engine: engine,
}, nil
}
func (p *Provider) PrometheusEngine() promengine.PromEngine {
return p.engine
}
// ClickhouseDB returns the mock Clickhouse connection
func (p *Provider) ClickHouseDB() clickhouse.Conn {
return p.mock.(clickhouse.Conn)
func (p *Provider) ClickhouseDB() clickhouse.Conn {
return p.clickhouseDB.(clickhouse.Conn)
}
// Mock returns the underlying Clickhouse mock instance for setting expectations
func (p *Provider) Mock() cmock.ClickConnMockCommon {
return p.mock
return p.clickhouseDB
}

View File

@@ -3,6 +3,7 @@ package telemetrystoretest
import (
"testing"
"github.com/DATA-DOG/go-sqlmock"
"github.com/stretchr/testify/assert"
)
@@ -19,7 +20,7 @@ func TestNew(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
provider, err := New()
provider, err := New(sqlmock.QueryMatcherRegexp)
if tt.wantErr {
assert.Error(t, err)
assert.Nil(t, provider)
@@ -29,7 +30,8 @@ func TestNew(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, provider)
assert.NotNil(t, provider.Mock())
assert.NotNil(t, provider.ClickHouseDB())
assert.NotNil(t, provider.ClickhouseDB())
assert.NotNil(t, provider.PrometheusEngine())
})
}
}