Compare commits
8 Commits
fix/query-
...
v0.77.0-ec
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ecbe0b82de | ||
|
|
4085558281 | ||
|
|
93428970ab | ||
|
|
293047b9e6 | ||
|
|
9272d2bb6d | ||
|
|
22ecdc9a0c | ||
|
|
3b1db1f6ea | ||
|
|
0a1338bca9 |
4
Makefile
4
Makefile
@@ -74,6 +74,10 @@ go-run-enterprise: ## Runs the enterprise go backend server
|
|||||||
--use-logs-new-schema true \
|
--use-logs-new-schema true \
|
||||||
--use-trace-new-schema true
|
--use-trace-new-schema true
|
||||||
|
|
||||||
|
.PHONY: go-test
|
||||||
|
go-test: ## Runs go unit tests
|
||||||
|
@go test -race ./...
|
||||||
|
|
||||||
.PHONY: go-run-community
|
.PHONY: go-run-community
|
||||||
go-run-community: ## Runs the community go backend server
|
go-run-community: ## Runs the community go backend server
|
||||||
@SIGNOZ_INSTRUMENTATION_LOGS_LEVEL=debug \
|
@SIGNOZ_INSTRUMENTATION_LOGS_LEVEL=debug \
|
||||||
|
|||||||
@@ -72,7 +72,6 @@ sqlstore:
|
|||||||
# The path to the SQLite database file.
|
# The path to the SQLite database file.
|
||||||
path: /var/lib/signoz/signoz.db
|
path: /var/lib/signoz/signoz.db
|
||||||
|
|
||||||
|
|
||||||
##################### APIServer #####################
|
##################### APIServer #####################
|
||||||
apiserver:
|
apiserver:
|
||||||
timeout:
|
timeout:
|
||||||
@@ -91,20 +90,30 @@ apiserver:
|
|||||||
- /api/v1/version
|
- /api/v1/version
|
||||||
- /
|
- /
|
||||||
|
|
||||||
|
|
||||||
##################### TelemetryStore #####################
|
##################### TelemetryStore #####################
|
||||||
telemetrystore:
|
telemetrystore:
|
||||||
# Specifies the telemetrystore provider to use.
|
|
||||||
provider: clickhouse
|
|
||||||
# Maximum number of idle connections in the connection pool.
|
# Maximum number of idle connections in the connection pool.
|
||||||
max_idle_conns: 50
|
max_idle_conns: 50
|
||||||
# Maximum number of open connections to the database.
|
# Maximum number of open connections to the database.
|
||||||
max_open_conns: 100
|
max_open_conns: 100
|
||||||
# Maximum time to wait for a connection to be established.
|
# Maximum time to wait for a connection to be established.
|
||||||
dial_timeout: 5s
|
dial_timeout: 5s
|
||||||
|
# Specifies the telemetrystore provider to use.
|
||||||
|
provider: clickhouse
|
||||||
clickhouse:
|
clickhouse:
|
||||||
# The DSN to use for ClickHouse.
|
# The DSN to use for clickhouse.
|
||||||
dsn: http://localhost:9000
|
dsn: tcp://localhost:9000
|
||||||
|
prometheus:
|
||||||
|
remote_read:
|
||||||
|
# The URL to use for remote read. It must be a clickhouse dsn pointing to the database where metrics is stored.
|
||||||
|
url: tcp://localhost:9000/signoz_metrics
|
||||||
|
active_query_tracker:
|
||||||
|
# Whether to enable the active query tracker.
|
||||||
|
enabled: true
|
||||||
|
# The path to use for the active query tracker.
|
||||||
|
path: ""
|
||||||
|
# The maximum number of concurrent queries.
|
||||||
|
max_concurrent: 20
|
||||||
|
|
||||||
##################### Alertmanager #####################
|
##################### Alertmanager #####################
|
||||||
alertmanager:
|
alertmanager:
|
||||||
@@ -117,7 +126,7 @@ alertmanager:
|
|||||||
# The poll interval for periodically syncing the alertmanager with the config in the store.
|
# The poll interval for periodically syncing the alertmanager with the config in the store.
|
||||||
poll_interval: 1m
|
poll_interval: 1m
|
||||||
# The URL under which Alertmanager is externally reachable (for example, if Alertmanager is served via a reverse proxy). Used for generating relative and absolute links back to Alertmanager itself.
|
# The URL under which Alertmanager is externally reachable (for example, if Alertmanager is served via a reverse proxy). Used for generating relative and absolute links back to Alertmanager itself.
|
||||||
external_url: http://localhost:9093
|
external_url: http://localhost:8080
|
||||||
# The global configuration for the alertmanager. All the exahustive fields can be found in the upstream: https://github.com/prometheus/alertmanager/blob/efa05feffd644ba4accb526e98a8c6545d26a783/config/config.go#L833
|
# The global configuration for the alertmanager. All the exahustive fields can be found in the upstream: https://github.com/prometheus/alertmanager/blob/efa05feffd644ba4accb526e98a8c6545d26a783/config/config.go#L833
|
||||||
global:
|
global:
|
||||||
# ResolveTimeout is the time after which an alert is declared resolved if it has not been updated.
|
# ResolveTimeout is the time after which an alert is declared resolved if it has not been updated.
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ import (
|
|||||||
"github.com/SigNoz/signoz/pkg/cache"
|
"github.com/SigNoz/signoz/pkg/cache"
|
||||||
basechr "github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader"
|
basechr "github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader"
|
||||||
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
|
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
|
||||||
|
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||||
)
|
)
|
||||||
|
|
||||||
type ClickhouseReader struct {
|
type ClickhouseReader struct {
|
||||||
@@ -20,8 +21,7 @@ type ClickhouseReader struct {
|
|||||||
|
|
||||||
func NewDataConnector(
|
func NewDataConnector(
|
||||||
localDB *sqlx.DB,
|
localDB *sqlx.DB,
|
||||||
ch clickhouse.Conn,
|
telemetryStore telemetrystore.TelemetryStore,
|
||||||
promConfigPath string,
|
|
||||||
lm interfaces.FeatureLookup,
|
lm interfaces.FeatureLookup,
|
||||||
cluster string,
|
cluster string,
|
||||||
useLogsNewSchema bool,
|
useLogsNewSchema bool,
|
||||||
@@ -29,14 +29,10 @@ func NewDataConnector(
|
|||||||
fluxIntervalForTraceDetail time.Duration,
|
fluxIntervalForTraceDetail time.Duration,
|
||||||
cache cache.Cache,
|
cache cache.Cache,
|
||||||
) *ClickhouseReader {
|
) *ClickhouseReader {
|
||||||
chReader := basechr.NewReader(localDB, ch, promConfigPath, lm, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache)
|
chReader := basechr.NewReader(localDB, telemetryStore, lm, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache)
|
||||||
return &ClickhouseReader{
|
return &ClickhouseReader{
|
||||||
conn: ch,
|
conn: telemetryStore.ClickhouseDB(),
|
||||||
appdb: localDB,
|
appdb: localDB,
|
||||||
ClickHouseReader: chReader,
|
ClickHouseReader: chReader,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *ClickhouseReader) Start(readerReady chan bool) {
|
|
||||||
r.ClickHouseReader.Start(readerReady)
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -18,13 +18,13 @@ import (
|
|||||||
"github.com/SigNoz/signoz/ee/query-service/constants"
|
"github.com/SigNoz/signoz/ee/query-service/constants"
|
||||||
"github.com/SigNoz/signoz/ee/query-service/dao"
|
"github.com/SigNoz/signoz/ee/query-service/dao"
|
||||||
"github.com/SigNoz/signoz/ee/query-service/integrations/gateway"
|
"github.com/SigNoz/signoz/ee/query-service/integrations/gateway"
|
||||||
"github.com/SigNoz/signoz/ee/query-service/interfaces"
|
|
||||||
"github.com/SigNoz/signoz/ee/query-service/rules"
|
"github.com/SigNoz/signoz/ee/query-service/rules"
|
||||||
"github.com/SigNoz/signoz/pkg/alertmanager"
|
"github.com/SigNoz/signoz/pkg/alertmanager"
|
||||||
"github.com/SigNoz/signoz/pkg/http/middleware"
|
"github.com/SigNoz/signoz/pkg/http/middleware"
|
||||||
"github.com/SigNoz/signoz/pkg/query-service/auth"
|
"github.com/SigNoz/signoz/pkg/query-service/auth"
|
||||||
"github.com/SigNoz/signoz/pkg/signoz"
|
"github.com/SigNoz/signoz/pkg/signoz"
|
||||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||||
|
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||||
"github.com/SigNoz/signoz/pkg/types"
|
"github.com/SigNoz/signoz/pkg/types"
|
||||||
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
||||||
"github.com/SigNoz/signoz/pkg/web"
|
"github.com/SigNoz/signoz/pkg/web"
|
||||||
@@ -49,7 +49,6 @@ import (
|
|||||||
"github.com/SigNoz/signoz/pkg/query-service/healthcheck"
|
"github.com/SigNoz/signoz/pkg/query-service/healthcheck"
|
||||||
baseint "github.com/SigNoz/signoz/pkg/query-service/interfaces"
|
baseint "github.com/SigNoz/signoz/pkg/query-service/interfaces"
|
||||||
basemodel "github.com/SigNoz/signoz/pkg/query-service/model"
|
basemodel "github.com/SigNoz/signoz/pkg/query-service/model"
|
||||||
pqle "github.com/SigNoz/signoz/pkg/query-service/pqlEngine"
|
|
||||||
baserules "github.com/SigNoz/signoz/pkg/query-service/rules"
|
baserules "github.com/SigNoz/signoz/pkg/query-service/rules"
|
||||||
"github.com/SigNoz/signoz/pkg/query-service/telemetry"
|
"github.com/SigNoz/signoz/pkg/query-service/telemetry"
|
||||||
"github.com/SigNoz/signoz/pkg/query-service/utils"
|
"github.com/SigNoz/signoz/pkg/query-service/utils"
|
||||||
@@ -137,18 +136,15 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
|||||||
|
|
||||||
// set license manager as feature flag provider in dao
|
// set license manager as feature flag provider in dao
|
||||||
modelDao.SetFlagProvider(lm)
|
modelDao.SetFlagProvider(lm)
|
||||||
readerReady := make(chan bool)
|
|
||||||
|
|
||||||
fluxIntervalForTraceDetail, err := time.ParseDuration(serverOptions.FluxIntervalForTraceDetail)
|
fluxIntervalForTraceDetail, err := time.ParseDuration(serverOptions.FluxIntervalForTraceDetail)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var reader interfaces.DataConnector
|
reader := db.NewDataConnector(
|
||||||
qb := db.NewDataConnector(
|
|
||||||
serverOptions.SigNoz.SQLStore.SQLxDB(),
|
serverOptions.SigNoz.SQLStore.SQLxDB(),
|
||||||
serverOptions.SigNoz.TelemetryStore.ClickHouseDB(),
|
serverOptions.SigNoz.TelemetryStore,
|
||||||
serverOptions.PromConfigPath,
|
|
||||||
lm,
|
lm,
|
||||||
serverOptions.Cluster,
|
serverOptions.Cluster,
|
||||||
serverOptions.UseLogsNewSchema,
|
serverOptions.UseLogsNewSchema,
|
||||||
@@ -156,8 +152,6 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
|||||||
fluxIntervalForTraceDetail,
|
fluxIntervalForTraceDetail,
|
||||||
serverOptions.SigNoz.Cache,
|
serverOptions.SigNoz.Cache,
|
||||||
)
|
)
|
||||||
go qb.Start(readerReady)
|
|
||||||
reader = qb
|
|
||||||
|
|
||||||
skipConfig := &basemodel.SkipConfig{}
|
skipConfig := &basemodel.SkipConfig{}
|
||||||
if serverOptions.SkipTopLvlOpsPath != "" {
|
if serverOptions.SkipTopLvlOpsPath != "" {
|
||||||
@@ -176,9 +170,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
|||||||
c = cache.NewCache(cacheOpts)
|
c = cache.NewCache(cacheOpts)
|
||||||
}
|
}
|
||||||
|
|
||||||
<-readerReady
|
|
||||||
rm, err := makeRulesManager(
|
rm, err := makeRulesManager(
|
||||||
serverOptions.PromConfigPath,
|
|
||||||
serverOptions.RuleRepoURL,
|
serverOptions.RuleRepoURL,
|
||||||
serverOptions.SigNoz.SQLStore.SQLxDB(),
|
serverOptions.SigNoz.SQLStore.SQLxDB(),
|
||||||
reader,
|
reader,
|
||||||
@@ -189,6 +181,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
|||||||
serverOptions.UseTraceNewSchema,
|
serverOptions.UseTraceNewSchema,
|
||||||
serverOptions.SigNoz.Alertmanager,
|
serverOptions.SigNoz.Alertmanager,
|
||||||
serverOptions.SigNoz.SQLStore,
|
serverOptions.SigNoz.SQLStore,
|
||||||
|
serverOptions.SigNoz.TelemetryStore,
|
||||||
)
|
)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -233,7 +226,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// start the usagemanager
|
// start the usagemanager
|
||||||
usageManager, err := usage.New(modelDao, lm.GetRepo(), serverOptions.SigNoz.TelemetryStore.ClickHouseDB(), serverOptions.Config.TelemetryStore.ClickHouse.DSN)
|
usageManager, err := usage.New(modelDao, lm.GetRepo(), serverOptions.SigNoz.TelemetryStore.ClickhouseDB(), serverOptions.Config.TelemetryStore.Clickhouse.DSN)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -304,7 +297,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
|||||||
&opAmpModel.AllAgents, agentConfMgr,
|
&opAmpModel.AllAgents, agentConfMgr,
|
||||||
)
|
)
|
||||||
|
|
||||||
errorList := qb.PreloadMetricsMetadata(context.Background())
|
errorList := reader.PreloadMetricsMetadata(context.Background())
|
||||||
for _, er := range errorList {
|
for _, er := range errorList {
|
||||||
zap.L().Error("failed to preload metrics metadata", zap.Error(er))
|
zap.L().Error("failed to preload metrics metadata", zap.Error(er))
|
||||||
}
|
}
|
||||||
@@ -538,7 +531,6 @@ func (s *Server) Stop() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func makeRulesManager(
|
func makeRulesManager(
|
||||||
promConfigPath,
|
|
||||||
ruleRepoURL string,
|
ruleRepoURL string,
|
||||||
db *sqlx.DB,
|
db *sqlx.DB,
|
||||||
ch baseint.Reader,
|
ch baseint.Reader,
|
||||||
@@ -549,16 +541,11 @@ func makeRulesManager(
|
|||||||
useTraceNewSchema bool,
|
useTraceNewSchema bool,
|
||||||
alertmanager alertmanager.Alertmanager,
|
alertmanager alertmanager.Alertmanager,
|
||||||
sqlstore sqlstore.SQLStore,
|
sqlstore sqlstore.SQLStore,
|
||||||
|
telemetryStore telemetrystore.TelemetryStore,
|
||||||
) (*baserules.Manager, error) {
|
) (*baserules.Manager, error) {
|
||||||
// create engine
|
|
||||||
pqle, err := pqle.FromConfigPath(promConfigPath)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to create pql engine : %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// create manager opts
|
// create manager opts
|
||||||
managerOpts := &baserules.ManagerOptions{
|
managerOpts := &baserules.ManagerOptions{
|
||||||
PqlEngine: pqle,
|
TelemetryStore: telemetryStore,
|
||||||
RepoURL: ruleRepoURL,
|
RepoURL: ruleRepoURL,
|
||||||
DBConn: db,
|
DBConn: db,
|
||||||
Context: context.Background(),
|
Context: context.Background(),
|
||||||
|
|||||||
@@ -7,6 +7,5 @@ import (
|
|||||||
// Connector defines methods for interaction
|
// Connector defines methods for interaction
|
||||||
// with o11y data. for example - clickhouse
|
// with o11y data. for example - clickhouse
|
||||||
type DataConnector interface {
|
type DataConnector interface {
|
||||||
Start(readerReady chan bool)
|
|
||||||
baseint.Reader
|
baseint.Reader
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -16,8 +16,6 @@ import (
|
|||||||
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
||||||
"github.com/SigNoz/signoz/pkg/version"
|
"github.com/SigNoz/signoz/pkg/version"
|
||||||
|
|
||||||
prommodel "github.com/prometheus/common/model"
|
|
||||||
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"go.uber.org/zap/zapcore"
|
"go.uber.org/zap/zapcore"
|
||||||
)
|
)
|
||||||
@@ -30,10 +28,6 @@ func initZapLog() *zap.Logger {
|
|||||||
return logger
|
return logger
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
|
||||||
prommodel.NameValidationScheme = prommodel.UTF8Validation
|
|
||||||
}
|
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
var promConfigPath, skipTopLvlOpsPath string
|
var promConfigPath, skipTopLvlOpsPath string
|
||||||
|
|
||||||
@@ -87,6 +81,7 @@ func main() {
|
|||||||
MaxIdleConns: maxIdleConns,
|
MaxIdleConns: maxIdleConns,
|
||||||
MaxOpenConns: maxOpenConns,
|
MaxOpenConns: maxOpenConns,
|
||||||
DialTimeout: dialTimeout,
|
DialTimeout: dialTimeout,
|
||||||
|
Config: promConfigPath,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
zap.L().Fatal("Failed to create config", zap.Error(err))
|
zap.L().Fatal("Failed to create config", zap.Error(err))
|
||||||
@@ -103,7 +98,7 @@ func main() {
|
|||||||
signoz.NewTelemetryStoreProviderFactories(),
|
signoz.NewTelemetryStoreProviderFactories(),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
zap.L().Fatal("Failed to create signoz struct", zap.Error(err))
|
zap.L().Fatal("Failed to create signoz", zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
jwtSecret := os.Getenv("SIGNOZ_JWT_SECRET")
|
jwtSecret := os.Getenv("SIGNOZ_JWT_SECRET")
|
||||||
|
|||||||
@@ -48,7 +48,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
|
|||||||
opts.Rule,
|
opts.Rule,
|
||||||
opts.Logger,
|
opts.Logger,
|
||||||
opts.Reader,
|
opts.Reader,
|
||||||
opts.ManagerOpts.PqlEngine,
|
opts.ManagerOpts.TelemetryStore.PrometheusEngine(),
|
||||||
baserules.WithSQLStore(opts.SQLStore),
|
baserules.WithSQLStore(opts.SQLStore),
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -145,7 +145,7 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, *basemodel.Ap
|
|||||||
parsedRule,
|
parsedRule,
|
||||||
opts.Logger,
|
opts.Logger,
|
||||||
opts.Reader,
|
opts.Reader,
|
||||||
opts.ManagerOpts.PqlEngine,
|
opts.ManagerOpts.TelemetryStore.PrometheusEngine(),
|
||||||
baserules.WithSendAlways(),
|
baserules.WithSendAlways(),
|
||||||
baserules.WithSendUnmatched(),
|
baserules.WithSendUnmatched(),
|
||||||
baserules.WithSQLStore(opts.SQLStore),
|
baserules.WithSQLStore(opts.SQLStore),
|
||||||
|
|||||||
4
go.mod
4
go.mod
@@ -34,7 +34,6 @@ require (
|
|||||||
github.com/knadh/koanf/v2 v2.1.1
|
github.com/knadh/koanf/v2 v2.1.1
|
||||||
github.com/mailru/easyjson v0.7.7
|
github.com/mailru/easyjson v0.7.7
|
||||||
github.com/mattn/go-sqlite3 v1.14.24
|
github.com/mattn/go-sqlite3 v1.14.24
|
||||||
github.com/oklog/oklog v0.3.2
|
|
||||||
github.com/open-telemetry/opamp-go v0.5.0
|
github.com/open-telemetry/opamp-go v0.5.0
|
||||||
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.111.0
|
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.111.0
|
||||||
github.com/opentracing/opentracing-go v1.2.0
|
github.com/opentracing/opentracing-go v1.2.0
|
||||||
@@ -93,6 +92,7 @@ require (
|
|||||||
github.com/armon/go-metrics v0.4.1 // indirect
|
github.com/armon/go-metrics v0.4.1 // indirect
|
||||||
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect
|
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect
|
||||||
github.com/aws/aws-sdk-go v1.55.5 // indirect
|
github.com/aws/aws-sdk-go v1.55.5 // indirect
|
||||||
|
github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3 // indirect
|
||||||
github.com/beevik/etree v1.1.0 // indirect
|
github.com/beevik/etree v1.1.0 // indirect
|
||||||
github.com/beorn7/perks v1.0.1 // indirect
|
github.com/beorn7/perks v1.0.1 // indirect
|
||||||
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
|
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
|
||||||
@@ -132,6 +132,7 @@ require (
|
|||||||
github.com/golang/protobuf v1.5.4 // indirect
|
github.com/golang/protobuf v1.5.4 // indirect
|
||||||
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
|
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
|
||||||
github.com/google/btree v1.0.1 // indirect
|
github.com/google/btree v1.0.1 // indirect
|
||||||
|
github.com/google/go-cmp v0.6.0 // indirect
|
||||||
github.com/google/s2a-go v0.1.8 // indirect
|
github.com/google/s2a-go v0.1.8 // indirect
|
||||||
github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect
|
github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect
|
||||||
github.com/gopherjs/gopherjs v1.17.2 // indirect
|
github.com/gopherjs/gopherjs v1.17.2 // indirect
|
||||||
@@ -262,6 +263,7 @@ require (
|
|||||||
go.opentelemetry.io/otel/sdk/metric v1.30.0 // indirect
|
go.opentelemetry.io/otel/sdk/metric v1.30.0 // indirect
|
||||||
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
|
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
|
||||||
go.uber.org/atomic v1.11.0 // indirect
|
go.uber.org/atomic v1.11.0 // indirect
|
||||||
|
go.uber.org/goleak v1.3.0 // indirect
|
||||||
golang.org/x/mod v0.22.0 // indirect
|
golang.org/x/mod v0.22.0 // indirect
|
||||||
golang.org/x/net v0.33.0 // indirect
|
golang.org/x/net v0.33.0 // indirect
|
||||||
golang.org/x/sys v0.29.0 // indirect
|
golang.org/x/sys v0.29.0 // indirect
|
||||||
|
|||||||
2
go.sum
2
go.sum
@@ -690,8 +690,6 @@ github.com/npillmayer/nestext v0.1.3/go.mod h1:h2lrijH8jpicr25dFY+oAJLyzlya6jhnu
|
|||||||
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
|
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
|
||||||
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
|
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
|
||||||
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
|
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
|
||||||
github.com/oklog/oklog v0.3.2 h1:wVfs8F+in6nTBMkA7CbRw+zZMIB7nNM825cM1wuzoTk=
|
|
||||||
github.com/oklog/oklog v0.3.2/go.mod h1:FCV+B7mhrz4o+ueLpx+KqkyXRGMWOYEvfiXtdGtbWGs=
|
|
||||||
github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA=
|
github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA=
|
||||||
github.com/oklog/run v1.1.0 h1:GEenZ1cK0+q0+wsJew9qUg/DyD8k3JzYsZAi5gYi2mA=
|
github.com/oklog/run v1.1.0 h1:GEenZ1cK0+q0+wsJew9qUg/DyD8k3JzYsZAi5gYi2mA=
|
||||||
github.com/oklog/run v1.1.0/go.mod h1:sVPdnTZT1zYwAJeCMu2Th4T21pA3FPOQRfWjQlk7DVU=
|
github.com/oklog/run v1.1.0/go.mod h1:sVPdnTZT1zYwAJeCMu2Th4T21pA3FPOQRfWjQlk7DVU=
|
||||||
|
|||||||
@@ -1,10 +1,15 @@
|
|||||||
package instrumentation
|
package instrumentation
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"os"
|
"os"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/SigNoz/signoz/pkg/instrumentation/loghandler"
|
"github.com/SigNoz/signoz/pkg/instrumentation/loghandler"
|
||||||
|
"github.com/go-kit/log"
|
||||||
|
"github.com/go-kit/log/level"
|
||||||
)
|
)
|
||||||
|
|
||||||
func NewLogger(config Config, wrappers ...loghandler.Wrapper) *slog.Logger {
|
func NewLogger(config Config, wrappers ...loghandler.Wrapper) *slog.Logger {
|
||||||
@@ -33,3 +38,65 @@ func NewLogger(config Config, wrappers ...loghandler.Wrapper) *slog.Logger {
|
|||||||
|
|
||||||
return logger
|
return logger
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var _ log.Logger = (*gokitLogger)(nil)
|
||||||
|
|
||||||
|
type gokitLogger struct {
|
||||||
|
handler slog.Handler
|
||||||
|
messageKey string
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewGoKitLoggerFromSlogHandler(handler slog.Handler, messageKey string) log.Logger {
|
||||||
|
return &gokitLogger{handler, messageKey}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *gokitLogger) Log(keyvals ...any) error {
|
||||||
|
var (
|
||||||
|
attrs []slog.Attr
|
||||||
|
message string
|
||||||
|
gkl level.Value
|
||||||
|
)
|
||||||
|
|
||||||
|
for i := 1; i < len(keyvals); i += 2 {
|
||||||
|
// go-kit/log keys don't have to be strings, but slog keys do.
|
||||||
|
// Convert the go-kit key to a string with fmt.Sprint.
|
||||||
|
key, ok := keyvals[i-1].(string)
|
||||||
|
if !ok {
|
||||||
|
key = fmt.Sprint(keyvals[i-1])
|
||||||
|
}
|
||||||
|
|
||||||
|
if l.messageKey != "" && key == l.messageKey {
|
||||||
|
message = fmt.Sprint(keyvals[i])
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if l, ok := keyvals[i].(level.Value); ok {
|
||||||
|
gkl = l
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
attrs = append(attrs, slog.Any(key, keyvals[i]))
|
||||||
|
}
|
||||||
|
|
||||||
|
var sl slog.Level
|
||||||
|
if gkl != nil {
|
||||||
|
switch gkl {
|
||||||
|
case level.DebugValue():
|
||||||
|
sl = slog.LevelDebug
|
||||||
|
case level.InfoValue():
|
||||||
|
sl = slog.LevelInfo
|
||||||
|
case level.WarnValue():
|
||||||
|
sl = slog.LevelWarn
|
||||||
|
case level.ErrorValue():
|
||||||
|
sl = slog.LevelError
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !l.handler.Enabled(context.Background(), sl) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
r := slog.NewRecord(time.Now(), sl, message, 0)
|
||||||
|
r.AddAttrs(attrs...)
|
||||||
|
return l.handler.Handle(context.Background(), r)
|
||||||
|
}
|
||||||
|
|||||||
18
pkg/promengine/config.go
Normal file
18
pkg/promengine/config.go
Normal file
@@ -0,0 +1,18 @@
|
|||||||
|
package promengine
|
||||||
|
|
||||||
|
import "net/url"
|
||||||
|
|
||||||
|
type ActiveQueryTrackerConfig struct {
|
||||||
|
Enabled bool `mapstructure:"enabled"`
|
||||||
|
Path string `mapstructure:"path"`
|
||||||
|
MaxConcurrent int `mapstructure:"max_concurrent"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type RemoteReadConfig struct {
|
||||||
|
URL *url.URL `mapstructure:"url"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type Config struct {
|
||||||
|
RemoteReadConfig RemoteReadConfig `mapstructure:"remote_read"`
|
||||||
|
ActiveQueryTrackerConfig ActiveQueryTrackerConfig `mapstructure:"active_query_tracker"`
|
||||||
|
}
|
||||||
87
pkg/promengine/ext.go
Normal file
87
pkg/promengine/ext.go
Normal file
@@ -0,0 +1,87 @@
|
|||||||
|
package promengine
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"log/slog"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/SigNoz/signoz/pkg/instrumentation"
|
||||||
|
commoncfg "github.com/prometheus/common/config"
|
||||||
|
"github.com/prometheus/common/model"
|
||||||
|
"github.com/prometheus/prometheus/config"
|
||||||
|
"github.com/prometheus/prometheus/promql"
|
||||||
|
"github.com/prometheus/prometheus/storage"
|
||||||
|
"github.com/prometheus/prometheus/storage/remote"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ PromEngine = (*ExtEngine)(nil)
|
||||||
|
|
||||||
|
type ExtEngine struct {
|
||||||
|
engine *promql.Engine
|
||||||
|
fanoutStorage storage.Storage
|
||||||
|
}
|
||||||
|
|
||||||
|
func New(logger *slog.Logger, cfg Config) (*ExtEngine, error) {
|
||||||
|
if logger == nil {
|
||||||
|
return nil, fmt.Errorf("logger is required")
|
||||||
|
}
|
||||||
|
|
||||||
|
gokitLogger := instrumentation.NewGoKitLoggerFromSlogHandler(logger.Handler(), "msg")
|
||||||
|
|
||||||
|
var activeQueryTracker promql.QueryTracker
|
||||||
|
if cfg.ActiveQueryTrackerConfig.Enabled {
|
||||||
|
activeQueryTracker = promql.NewActiveQueryTracker(
|
||||||
|
cfg.ActiveQueryTrackerConfig.Path,
|
||||||
|
cfg.ActiveQueryTrackerConfig.MaxConcurrent,
|
||||||
|
gokitLogger,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
engine := promql.NewEngine(promql.EngineOpts{
|
||||||
|
Logger: gokitLogger,
|
||||||
|
Reg: nil,
|
||||||
|
MaxSamples: 50000000,
|
||||||
|
Timeout: time.Duration(2 * time.Minute),
|
||||||
|
ActiveQueryTracker: activeQueryTracker,
|
||||||
|
})
|
||||||
|
|
||||||
|
remoteStorage := remote.NewStorage(
|
||||||
|
gokitLogger,
|
||||||
|
nil,
|
||||||
|
func() (int64, error) { return int64(model.Latest), nil },
|
||||||
|
"",
|
||||||
|
time.Duration(1*time.Minute),
|
||||||
|
nil,
|
||||||
|
false,
|
||||||
|
)
|
||||||
|
fanoutStorage := storage.NewFanout(gokitLogger, remoteStorage)
|
||||||
|
|
||||||
|
config := &config.Config{
|
||||||
|
RemoteReadConfigs: []*config.RemoteReadConfig{
|
||||||
|
{
|
||||||
|
URL: &commoncfg.URL{URL: cfg.RemoteReadConfig.URL},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := config.UnmarshalYAML(func(i interface{}) error { return nil }); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := remoteStorage.ApplyConfig(config); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &ExtEngine{
|
||||||
|
engine: engine,
|
||||||
|
fanoutStorage: fanoutStorage,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (engine *ExtEngine) Engine() *promql.Engine {
|
||||||
|
return engine.engine
|
||||||
|
}
|
||||||
|
|
||||||
|
func (engine *ExtEngine) Storage() storage.Storage {
|
||||||
|
return engine.fanoutStorage
|
||||||
|
}
|
||||||
20
pkg/promengine/promengine.go
Normal file
20
pkg/promengine/promengine.go
Normal file
@@ -0,0 +1,20 @@
|
|||||||
|
package promengine
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/prometheus/common/model"
|
||||||
|
"github.com/prometheus/prometheus/promql"
|
||||||
|
"github.com/prometheus/prometheus/storage"
|
||||||
|
)
|
||||||
|
|
||||||
|
type PromEngine interface {
|
||||||
|
// Engine returns the underlying promql engine
|
||||||
|
Engine() *promql.Engine
|
||||||
|
|
||||||
|
// Storage returns the underlying storage
|
||||||
|
Storage() storage.Storage
|
||||||
|
}
|
||||||
|
|
||||||
|
// init initializes the prometheus model with UTF8 validation
|
||||||
|
func init() {
|
||||||
|
model.NameValidationScheme = model.UTF8Validation
|
||||||
|
}
|
||||||
46
pkg/promengine/promenginetest/promengine.go
Normal file
46
pkg/promengine/promenginetest/promengine.go
Normal file
@@ -0,0 +1,46 @@
|
|||||||
|
package promenginetest
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/SigNoz/signoz/pkg/promengine"
|
||||||
|
"github.com/prometheus/prometheus/promql"
|
||||||
|
"github.com/prometheus/prometheus/storage"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ promengine.PromEngine = (*Engine)(nil)
|
||||||
|
|
||||||
|
type Engine struct {
|
||||||
|
engine *promql.Engine
|
||||||
|
storage storage.Storage
|
||||||
|
}
|
||||||
|
|
||||||
|
func New(outOfOrderTimeWindow ...int64) (*Engine, error) {
|
||||||
|
engine := promql.NewEngine(promql.EngineOpts{
|
||||||
|
Logger: nil,
|
||||||
|
Reg: nil,
|
||||||
|
MaxSamples: 50000000,
|
||||||
|
Timeout: time.Duration(2 * time.Minute),
|
||||||
|
ActiveQueryTracker: nil,
|
||||||
|
})
|
||||||
|
|
||||||
|
testStorage, err := NewStorageWithError(outOfOrderTimeWindow...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
fanoutStorage := storage.NewFanout(nil, testStorage)
|
||||||
|
|
||||||
|
return &Engine{
|
||||||
|
engine: engine,
|
||||||
|
storage: fanoutStorage,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *Engine) Engine() *promql.Engine {
|
||||||
|
return e.engine
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *Engine) Storage() storage.Storage {
|
||||||
|
return e.storage
|
||||||
|
}
|
||||||
76
pkg/promengine/promenginetest/storage.go
Normal file
76
pkg/promengine/promenginetest/storage.go
Normal file
@@ -0,0 +1,76 @@
|
|||||||
|
package promenginetest
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
"github.com/prometheus/prometheus/model/exemplar"
|
||||||
|
"github.com/prometheus/prometheus/model/labels"
|
||||||
|
"github.com/prometheus/prometheus/storage"
|
||||||
|
"github.com/prometheus/prometheus/tsdb"
|
||||||
|
)
|
||||||
|
|
||||||
|
type TestStorage struct {
|
||||||
|
*tsdb.DB
|
||||||
|
exemplarStorage tsdb.ExemplarStorage
|
||||||
|
dir string
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewStorageWithError returns a new TestStorage for user facing tests, which reports
|
||||||
|
// errors directly.
|
||||||
|
func NewStorageWithError(outOfOrderTimeWindow ...int64) (*TestStorage, error) {
|
||||||
|
dir, err := os.MkdirTemp("", "test_storage")
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("opening test directory: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Tests just load data for a series sequentially. Thus we
|
||||||
|
// need a long appendable window.
|
||||||
|
opts := tsdb.DefaultOptions()
|
||||||
|
opts.MinBlockDuration = int64(24 * time.Hour / time.Millisecond)
|
||||||
|
opts.MaxBlockDuration = int64(24 * time.Hour / time.Millisecond)
|
||||||
|
opts.RetentionDuration = 0
|
||||||
|
opts.EnableNativeHistograms = true
|
||||||
|
|
||||||
|
// Set OutOfOrderTimeWindow if provided, otherwise use default (0)
|
||||||
|
if len(outOfOrderTimeWindow) > 0 {
|
||||||
|
opts.OutOfOrderTimeWindow = outOfOrderTimeWindow[0]
|
||||||
|
} else {
|
||||||
|
opts.OutOfOrderTimeWindow = 0 // Default value is zero
|
||||||
|
}
|
||||||
|
|
||||||
|
db, err := tsdb.Open(dir, nil, nil, opts, tsdb.NewDBStats())
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("opening test storage: %w", err)
|
||||||
|
}
|
||||||
|
reg := prometheus.NewRegistry()
|
||||||
|
eMetrics := tsdb.NewExemplarMetrics(reg)
|
||||||
|
|
||||||
|
es, err := tsdb.NewCircularExemplarStorage(10, eMetrics)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("opening test exemplar storage: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &TestStorage{DB: db, exemplarStorage: es, dir: dir}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s TestStorage) Close() error {
|
||||||
|
if err := s.DB.Close(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return os.RemoveAll(s.dir)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s TestStorage) ExemplarAppender() storage.ExemplarAppender {
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s TestStorage) ExemplarQueryable() storage.ExemplarQueryable {
|
||||||
|
return s.exemplarStorage
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s TestStorage) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
|
||||||
|
return ref, s.exemplarStorage.AddExemplar(l, e)
|
||||||
|
}
|
||||||
@@ -7,7 +7,6 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"os"
|
|
||||||
"reflect"
|
"reflect"
|
||||||
"regexp"
|
"regexp"
|
||||||
"sort"
|
"sort"
|
||||||
@@ -16,20 +15,16 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/SigNoz/signoz/pkg/promengine"
|
||||||
"github.com/SigNoz/signoz/pkg/query-service/model/metrics_explorer"
|
"github.com/SigNoz/signoz/pkg/query-service/model/metrics_explorer"
|
||||||
|
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||||
|
|
||||||
"github.com/go-kit/log"
|
|
||||||
"github.com/go-kit/log/level"
|
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
"github.com/mailru/easyjson"
|
"github.com/mailru/easyjson"
|
||||||
"github.com/oklog/oklog/pkg/group"
|
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"github.com/prometheus/common/promlog"
|
|
||||||
"github.com/prometheus/prometheus/config"
|
"github.com/prometheus/prometheus/config"
|
||||||
"github.com/prometheus/prometheus/promql"
|
"github.com/prometheus/prometheus/promql"
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/storage"
|
|
||||||
"github.com/prometheus/prometheus/storage/remote"
|
|
||||||
"github.com/prometheus/prometheus/util/stats"
|
"github.com/prometheus/prometheus/util/stats"
|
||||||
|
|
||||||
"github.com/ClickHouse/clickhouse-go/v2"
|
"github.com/ClickHouse/clickhouse-go/v2"
|
||||||
@@ -38,7 +33,6 @@ import (
|
|||||||
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
||||||
"github.com/jmoiron/sqlx"
|
"github.com/jmoiron/sqlx"
|
||||||
|
|
||||||
promModel "github.com/prometheus/common/model"
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
queryprogress "github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader/query_progress"
|
queryprogress "github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader/query_progress"
|
||||||
@@ -120,6 +114,7 @@ var (
|
|||||||
// SpanWriter for reading spans from ClickHouse
|
// SpanWriter for reading spans from ClickHouse
|
||||||
type ClickHouseReader struct {
|
type ClickHouseReader struct {
|
||||||
db clickhouse.Conn
|
db clickhouse.Conn
|
||||||
|
engine promengine.PromEngine
|
||||||
localDB *sqlx.DB
|
localDB *sqlx.DB
|
||||||
TraceDB string
|
TraceDB string
|
||||||
operationsTable string
|
operationsTable string
|
||||||
@@ -138,9 +133,6 @@ type ClickHouseReader struct {
|
|||||||
logsAttributeKeys string
|
logsAttributeKeys string
|
||||||
logsResourceKeys string
|
logsResourceKeys string
|
||||||
logsTagAttributeTableV2 string
|
logsTagAttributeTableV2 string
|
||||||
queryEngine *promql.Engine
|
|
||||||
remoteStorage *remote.Storage
|
|
||||||
fanoutStorage *storage.Storage
|
|
||||||
queryProgressTracker queryprogress.QueryProgressTracker
|
queryProgressTracker queryprogress.QueryProgressTracker
|
||||||
|
|
||||||
logsTableV2 string
|
logsTableV2 string
|
||||||
@@ -175,8 +167,7 @@ type ClickHouseReader struct {
|
|||||||
// NewTraceReader returns a TraceReader for the database
|
// NewTraceReader returns a TraceReader for the database
|
||||||
func NewReader(
|
func NewReader(
|
||||||
localDB *sqlx.DB,
|
localDB *sqlx.DB,
|
||||||
db driver.Conn,
|
telemetryStore telemetrystore.TelemetryStore,
|
||||||
configFile string,
|
|
||||||
featureFlag interfaces.FeatureLookup,
|
featureFlag interfaces.FeatureLookup,
|
||||||
cluster string,
|
cluster string,
|
||||||
useLogsNewSchema bool,
|
useLogsNewSchema bool,
|
||||||
@@ -185,14 +176,13 @@ func NewReader(
|
|||||||
cache cache.Cache,
|
cache cache.Cache,
|
||||||
) *ClickHouseReader {
|
) *ClickHouseReader {
|
||||||
options := NewOptions(primaryNamespace, archiveNamespace)
|
options := NewOptions(primaryNamespace, archiveNamespace)
|
||||||
return NewReaderFromClickhouseConnection(db, options, localDB, configFile, featureFlag, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache)
|
return NewReaderFromClickhouseConnection(options, localDB, telemetryStore, featureFlag, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewReaderFromClickhouseConnection(
|
func NewReaderFromClickhouseConnection(
|
||||||
db driver.Conn,
|
|
||||||
options *Options,
|
options *Options,
|
||||||
localDB *sqlx.DB,
|
localDB *sqlx.DB,
|
||||||
configFile string,
|
telemetryStore telemetrystore.TelemetryStore,
|
||||||
featureFlag interfaces.FeatureLookup,
|
featureFlag interfaces.FeatureLookup,
|
||||||
cluster string,
|
cluster string,
|
||||||
useLogsNewSchema bool,
|
useLogsNewSchema bool,
|
||||||
@@ -215,7 +205,8 @@ func NewReaderFromClickhouseConnection(
|
|||||||
}
|
}
|
||||||
|
|
||||||
return &ClickHouseReader{
|
return &ClickHouseReader{
|
||||||
db: db,
|
db: telemetryStore.ClickhouseDB(),
|
||||||
|
engine: telemetryStore.PrometheusEngine(),
|
||||||
localDB: localDB,
|
localDB: localDB,
|
||||||
TraceDB: options.primary.TraceDB,
|
TraceDB: options.primary.TraceDB,
|
||||||
operationsTable: options.primary.OperationsTable,
|
operationsTable: options.primary.OperationsTable,
|
||||||
@@ -235,7 +226,6 @@ func NewReaderFromClickhouseConnection(
|
|||||||
logsResourceKeys: options.primary.LogsResourceKeysTable,
|
logsResourceKeys: options.primary.LogsResourceKeysTable,
|
||||||
logsTagAttributeTableV2: options.primary.LogsTagAttributeTableV2,
|
logsTagAttributeTableV2: options.primary.LogsTagAttributeTableV2,
|
||||||
liveTailRefreshSeconds: options.primary.LiveTailRefreshSeconds,
|
liveTailRefreshSeconds: options.primary.LiveTailRefreshSeconds,
|
||||||
promConfigFile: configFile,
|
|
||||||
featureFlags: featureFlag,
|
featureFlags: featureFlag,
|
||||||
cluster: cluster,
|
cluster: cluster,
|
||||||
queryProgressTracker: queryprogress.NewQueryProgressTracker(),
|
queryProgressTracker: queryprogress.NewQueryProgressTracker(),
|
||||||
@@ -262,154 +252,8 @@ func NewReaderFromClickhouseConnection(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *ClickHouseReader) Start(readerReady chan bool) {
|
|
||||||
logLevel := promlog.AllowedLevel{}
|
|
||||||
logLevel.Set("debug")
|
|
||||||
allowedFormat := promlog.AllowedFormat{}
|
|
||||||
allowedFormat.Set("logfmt")
|
|
||||||
|
|
||||||
promlogConfig := promlog.Config{
|
|
||||||
Level: &logLevel,
|
|
||||||
Format: &allowedFormat,
|
|
||||||
}
|
|
||||||
|
|
||||||
logger := promlog.New(&promlogConfig)
|
|
||||||
|
|
||||||
startTime := func() (int64, error) {
|
|
||||||
return int64(promModel.Latest), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
remoteStorage := remote.NewStorage(
|
|
||||||
log.With(logger, "component", "remote"),
|
|
||||||
nil,
|
|
||||||
startTime,
|
|
||||||
"",
|
|
||||||
time.Duration(1*time.Minute),
|
|
||||||
nil,
|
|
||||||
false,
|
|
||||||
)
|
|
||||||
|
|
||||||
cfg := struct {
|
|
||||||
configFile string
|
|
||||||
|
|
||||||
localStoragePath string
|
|
||||||
lookbackDelta promModel.Duration
|
|
||||||
webTimeout promModel.Duration
|
|
||||||
queryTimeout promModel.Duration
|
|
||||||
queryConcurrency int
|
|
||||||
queryMaxSamples int
|
|
||||||
RemoteFlushDeadline promModel.Duration
|
|
||||||
|
|
||||||
prometheusURL string
|
|
||||||
|
|
||||||
logLevel promlog.AllowedLevel
|
|
||||||
}{
|
|
||||||
configFile: r.promConfigFile,
|
|
||||||
}
|
|
||||||
|
|
||||||
fanoutStorage := storage.NewFanout(logger, remoteStorage)
|
|
||||||
|
|
||||||
opts := promql.EngineOpts{
|
|
||||||
Logger: log.With(logger, "component", "query engine"),
|
|
||||||
Reg: nil,
|
|
||||||
MaxSamples: 50000000,
|
|
||||||
Timeout: time.Duration(2 * time.Minute),
|
|
||||||
ActiveQueryTracker: promql.NewActiveQueryTracker(
|
|
||||||
"",
|
|
||||||
20,
|
|
||||||
log.With(logger, "component", "activeQueryTracker"),
|
|
||||||
),
|
|
||||||
}
|
|
||||||
|
|
||||||
queryEngine := promql.NewEngine(opts)
|
|
||||||
|
|
||||||
reloaders := []func(cfg *config.Config) error{
|
|
||||||
remoteStorage.ApplyConfig,
|
|
||||||
}
|
|
||||||
|
|
||||||
// sync.Once is used to make sure we can close the channel at different execution stages(SIGTERM or when the config is loaded).
|
|
||||||
type closeOnce struct {
|
|
||||||
C chan struct{}
|
|
||||||
once sync.Once
|
|
||||||
Close func()
|
|
||||||
}
|
|
||||||
// Wait until the server is ready to handle reloading.
|
|
||||||
reloadReady := &closeOnce{
|
|
||||||
C: make(chan struct{}),
|
|
||||||
}
|
|
||||||
reloadReady.Close = func() {
|
|
||||||
reloadReady.once.Do(func() {
|
|
||||||
close(reloadReady.C)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
var g group.Group
|
|
||||||
{
|
|
||||||
// Initial configuration loading.
|
|
||||||
cancel := make(chan struct{})
|
|
||||||
g.Add(
|
|
||||||
func() error {
|
|
||||||
var err error
|
|
||||||
r.promConfig, err = reloadConfig(cfg.configFile, logger, reloaders...)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("error loading config from %q: %s", cfg.configFile, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
reloadReady.Close()
|
|
||||||
|
|
||||||
<-cancel
|
|
||||||
|
|
||||||
return nil
|
|
||||||
},
|
|
||||||
func(err error) {
|
|
||||||
close(cancel)
|
|
||||||
},
|
|
||||||
)
|
|
||||||
}
|
|
||||||
r.queryEngine = queryEngine
|
|
||||||
r.remoteStorage = remoteStorage
|
|
||||||
r.fanoutStorage = &fanoutStorage
|
|
||||||
readerReady <- true
|
|
||||||
|
|
||||||
if err := g.Run(); err != nil {
|
|
||||||
level.Error(logger).Log("err", err)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *ClickHouseReader) GetQueryEngine() *promql.Engine {
|
|
||||||
return r.queryEngine
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *ClickHouseReader) GetFanoutStorage() *storage.Storage {
|
|
||||||
return r.fanoutStorage
|
|
||||||
}
|
|
||||||
|
|
||||||
func reloadConfig(filename string, logger log.Logger, rls ...func(*config.Config) error) (promConfig *config.Config, err error) {
|
|
||||||
level.Info(logger).Log("msg", "Loading configuration file", "filename", filename)
|
|
||||||
|
|
||||||
conf, err := config.LoadFile(filename, false, false, logger)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("couldn't load configuration (--config.file=%q): %v", filename, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
failed := false
|
|
||||||
for _, rl := range rls {
|
|
||||||
if err := rl(conf); err != nil {
|
|
||||||
level.Error(logger).Log("msg", "Failed to apply configuration", "err", err)
|
|
||||||
failed = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if failed {
|
|
||||||
return nil, fmt.Errorf("one or more errors occurred while applying the new configuration (--config.file=%q)", filename)
|
|
||||||
}
|
|
||||||
level.Info(logger).Log("msg", "Completed loading of configuration file", "filename", filename)
|
|
||||||
return conf, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *ClickHouseReader) GetInstantQueryMetricsResult(ctx context.Context, queryParams *model.InstantQueryMetricsParams) (*promql.Result, *stats.QueryStats, *model.ApiError) {
|
func (r *ClickHouseReader) GetInstantQueryMetricsResult(ctx context.Context, queryParams *model.InstantQueryMetricsParams) (*promql.Result, *stats.QueryStats, *model.ApiError) {
|
||||||
qry, err := r.queryEngine.NewInstantQuery(ctx, r.remoteStorage, nil, queryParams.Query, queryParams.Time)
|
qry, err := r.engine.Engine().NewInstantQuery(ctx, r.engine.Storage(), nil, queryParams.Query, queryParams.Time)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, &model.ApiError{Typ: model.ErrorBadData, Err: err}
|
return nil, nil, &model.ApiError{Typ: model.ErrorBadData, Err: err}
|
||||||
}
|
}
|
||||||
@@ -428,7 +272,7 @@ func (r *ClickHouseReader) GetInstantQueryMetricsResult(ctx context.Context, que
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (r *ClickHouseReader) GetQueryRangeResult(ctx context.Context, query *model.QueryRangeParams) (*promql.Result, *stats.QueryStats, *model.ApiError) {
|
func (r *ClickHouseReader) GetQueryRangeResult(ctx context.Context, query *model.QueryRangeParams) (*promql.Result, *stats.QueryStats, *model.ApiError) {
|
||||||
qry, err := r.queryEngine.NewRangeQuery(ctx, r.remoteStorage, nil, query.Query, query.Start, query.End, query.Step)
|
qry, err := r.engine.Engine().NewRangeQuery(ctx, r.engine.Storage(), nil, query.Query, query.Start, query.End, query.Step)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, &model.ApiError{Typ: model.ErrorBadData, Err: err}
|
return nil, nil, &model.ApiError{Typ: model.ErrorBadData, Err: err}
|
||||||
|
|||||||
@@ -5,11 +5,11 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
"regexp"
|
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/DATA-DOG/go-sqlmock"
|
||||||
"github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader"
|
"github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader"
|
||||||
"github.com/SigNoz/signoz/pkg/query-service/app/queryBuilder"
|
"github.com/SigNoz/signoz/pkg/query-service/app/queryBuilder"
|
||||||
tracesV3 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v3"
|
tracesV3 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v3"
|
||||||
@@ -18,6 +18,7 @@ import (
|
|||||||
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
|
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
|
||||||
"github.com/SigNoz/signoz/pkg/query-service/querycache"
|
"github.com/SigNoz/signoz/pkg/query-service/querycache"
|
||||||
"github.com/SigNoz/signoz/pkg/query-service/utils"
|
"github.com/SigNoz/signoz/pkg/query-service/utils"
|
||||||
|
"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
|
||||||
cmock "github.com/srikanthccv/ClickHouse-go-mock"
|
cmock "github.com/srikanthccv/ClickHouse-go-mock"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
@@ -1131,20 +1132,6 @@ func TestQueryRangeValueTypePromQL(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type regexMatcher struct {
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *regexMatcher) Match(expectedSQL, actualSQL string) error {
|
|
||||||
re, err := regexp.Compile(expectedSQL)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if !re.MatchString(actualSQL) {
|
|
||||||
return fmt.Errorf("expected query to contain %s, got %s", expectedSQL, actualSQL)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func Test_querier_runWindowBasedListQuery(t *testing.T) {
|
func Test_querier_runWindowBasedListQuery(t *testing.T) {
|
||||||
params := &v3.QueryRangeParamsV3{
|
params := &v3.QueryRangeParamsV3{
|
||||||
Start: 1722171576000000000, // July 28, 2024 6:29:36 PM
|
Start: 1722171576000000000, // July 28, 2024 6:29:36 PM
|
||||||
@@ -1358,8 +1345,8 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
|
|||||||
for _, tc := range testCases {
|
for _, tc := range testCases {
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
// Setup mock
|
// Setup mock
|
||||||
mock, err := cmock.NewClickHouseWithQueryMatcher(nil, ®exMatcher{})
|
telemetryStore, err := telemetrystoretest.New(sqlmock.QueryMatcherRegexp)
|
||||||
require.NoError(t, err, "Failed to create ClickHouse mock")
|
require.NoError(t, err)
|
||||||
|
|
||||||
// Configure mock responses
|
// Configure mock responses
|
||||||
for _, response := range tc.queryResponses {
|
for _, response := range tc.queryResponses {
|
||||||
@@ -1368,7 +1355,7 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
|
|||||||
values = append(values, []any{&ts, &testName})
|
values = append(values, []any{&ts, &testName})
|
||||||
}
|
}
|
||||||
// if len(values) > 0 {
|
// if len(values) > 0 {
|
||||||
mock.ExpectQuery(response.expectedQuery).WillReturnRows(
|
telemetryStore.Mock().ExpectQuery(response.expectedQuery).WillReturnRows(
|
||||||
cmock.NewRows(cols, values),
|
cmock.NewRows(cols, values),
|
||||||
)
|
)
|
||||||
// }
|
// }
|
||||||
@@ -1376,10 +1363,9 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
|
|||||||
|
|
||||||
// Create reader and querier
|
// Create reader and querier
|
||||||
reader := clickhouseReader.NewReaderFromClickhouseConnection(
|
reader := clickhouseReader.NewReaderFromClickhouseConnection(
|
||||||
mock,
|
|
||||||
options,
|
options,
|
||||||
nil,
|
nil,
|
||||||
"",
|
telemetryStore,
|
||||||
featureManager.StartManager(),
|
featureManager.StartManager(),
|
||||||
"",
|
"",
|
||||||
true,
|
true,
|
||||||
@@ -1429,7 +1415,7 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Verify mock expectations
|
// Verify mock expectations
|
||||||
err = mock.ExpectationsWereMet()
|
err = telemetryStore.Mock().ExpectationsWereMet()
|
||||||
require.NoError(t, err, "Mock expectations were not met")
|
require.NoError(t, err, "Mock expectations were not met")
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,11 +5,11 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
"regexp"
|
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/DATA-DOG/go-sqlmock"
|
||||||
"github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader"
|
"github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader"
|
||||||
"github.com/SigNoz/signoz/pkg/query-service/app/queryBuilder"
|
"github.com/SigNoz/signoz/pkg/query-service/app/queryBuilder"
|
||||||
tracesV3 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v3"
|
tracesV3 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v3"
|
||||||
@@ -18,6 +18,7 @@ import (
|
|||||||
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
|
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
|
||||||
"github.com/SigNoz/signoz/pkg/query-service/querycache"
|
"github.com/SigNoz/signoz/pkg/query-service/querycache"
|
||||||
"github.com/SigNoz/signoz/pkg/query-service/utils"
|
"github.com/SigNoz/signoz/pkg/query-service/utils"
|
||||||
|
"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
|
||||||
cmock "github.com/srikanthccv/ClickHouse-go-mock"
|
cmock "github.com/srikanthccv/ClickHouse-go-mock"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
@@ -1185,20 +1186,6 @@ func TestV2QueryRangeValueTypePromQL(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type regexMatcher struct {
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *regexMatcher) Match(expectedSQL, actualSQL string) error {
|
|
||||||
re, err := regexp.Compile(expectedSQL)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if !re.MatchString(actualSQL) {
|
|
||||||
return fmt.Errorf("expected query to contain %s, got %s", expectedSQL, actualSQL)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func Test_querier_runWindowBasedListQuery(t *testing.T) {
|
func Test_querier_runWindowBasedListQuery(t *testing.T) {
|
||||||
params := &v3.QueryRangeParamsV3{
|
params := &v3.QueryRangeParamsV3{
|
||||||
Start: 1722171576000000000, // July 28, 2024 6:29:36 PM
|
Start: 1722171576000000000, // July 28, 2024 6:29:36 PM
|
||||||
@@ -1412,8 +1399,8 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
|
|||||||
for _, tc := range testCases {
|
for _, tc := range testCases {
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
// Setup mock
|
// Setup mock
|
||||||
mock, err := cmock.NewClickHouseWithQueryMatcher(nil, ®exMatcher{})
|
telemetryStore, err := telemetrystoretest.New(sqlmock.QueryMatcherRegexp)
|
||||||
require.NoError(t, err, "Failed to create ClickHouse mock")
|
require.NoError(t, err)
|
||||||
|
|
||||||
// Configure mock responses
|
// Configure mock responses
|
||||||
for _, response := range tc.queryResponses {
|
for _, response := range tc.queryResponses {
|
||||||
@@ -1422,7 +1409,7 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
|
|||||||
values = append(values, []any{&ts, &testName})
|
values = append(values, []any{&ts, &testName})
|
||||||
}
|
}
|
||||||
// if len(values) > 0 {
|
// if len(values) > 0 {
|
||||||
mock.ExpectQuery(response.expectedQuery).WillReturnRows(
|
telemetryStore.Mock().ExpectQuery(response.expectedQuery).WillReturnRows(
|
||||||
cmock.NewRows(cols, values),
|
cmock.NewRows(cols, values),
|
||||||
)
|
)
|
||||||
// }
|
// }
|
||||||
@@ -1430,10 +1417,9 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
|
|||||||
|
|
||||||
// Create reader and querier
|
// Create reader and querier
|
||||||
reader := clickhouseReader.NewReaderFromClickhouseConnection(
|
reader := clickhouseReader.NewReaderFromClickhouseConnection(
|
||||||
mock,
|
|
||||||
options,
|
options,
|
||||||
nil,
|
nil,
|
||||||
"",
|
telemetryStore,
|
||||||
featureManager.StartManager(),
|
featureManager.StartManager(),
|
||||||
"",
|
"",
|
||||||
true,
|
true,
|
||||||
@@ -1483,7 +1469,7 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Verify mock expectations
|
// Verify mock expectations
|
||||||
err = mock.ExpectationsWereMet()
|
err = telemetryStore.Mock().ExpectationsWereMet()
|
||||||
require.NoError(t, err, "Mock expectations were not met")
|
require.NoError(t, err, "Mock expectations were not met")
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -25,6 +25,7 @@ import (
|
|||||||
"github.com/SigNoz/signoz/pkg/query-service/app/preferences"
|
"github.com/SigNoz/signoz/pkg/query-service/app/preferences"
|
||||||
"github.com/SigNoz/signoz/pkg/signoz"
|
"github.com/SigNoz/signoz/pkg/signoz"
|
||||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||||
|
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||||
"github.com/SigNoz/signoz/pkg/types"
|
"github.com/SigNoz/signoz/pkg/types"
|
||||||
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
||||||
"github.com/SigNoz/signoz/pkg/web"
|
"github.com/SigNoz/signoz/pkg/web"
|
||||||
@@ -40,7 +41,6 @@ import (
|
|||||||
"github.com/SigNoz/signoz/pkg/query-service/healthcheck"
|
"github.com/SigNoz/signoz/pkg/query-service/healthcheck"
|
||||||
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
|
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
|
||||||
"github.com/SigNoz/signoz/pkg/query-service/model"
|
"github.com/SigNoz/signoz/pkg/query-service/model"
|
||||||
pqle "github.com/SigNoz/signoz/pkg/query-service/pqlEngine"
|
|
||||||
"github.com/SigNoz/signoz/pkg/query-service/rules"
|
"github.com/SigNoz/signoz/pkg/query-service/rules"
|
||||||
"github.com/SigNoz/signoz/pkg/query-service/telemetry"
|
"github.com/SigNoz/signoz/pkg/query-service/telemetry"
|
||||||
"github.com/SigNoz/signoz/pkg/query-service/utils"
|
"github.com/SigNoz/signoz/pkg/query-service/utils"
|
||||||
@@ -119,10 +119,9 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
clickhouseReader := clickhouseReader.NewReader(
|
reader := clickhouseReader.NewReader(
|
||||||
serverOptions.SigNoz.SQLStore.SQLxDB(),
|
serverOptions.SigNoz.SQLStore.SQLxDB(),
|
||||||
serverOptions.SigNoz.TelemetryStore.ClickHouseDB(),
|
serverOptions.SigNoz.TelemetryStore,
|
||||||
serverOptions.PromConfigPath,
|
|
||||||
fm,
|
fm,
|
||||||
serverOptions.Cluster,
|
serverOptions.Cluster,
|
||||||
serverOptions.UseLogsNewSchema,
|
serverOptions.UseLogsNewSchema,
|
||||||
@@ -130,8 +129,6 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
|||||||
fluxIntervalForTraceDetail,
|
fluxIntervalForTraceDetail,
|
||||||
serverOptions.SigNoz.Cache,
|
serverOptions.SigNoz.Cache,
|
||||||
)
|
)
|
||||||
go clickhouseReader.Start(readerReady)
|
|
||||||
reader := clickhouseReader
|
|
||||||
|
|
||||||
skipConfig := &model.SkipConfig{}
|
skipConfig := &model.SkipConfig{}
|
||||||
if serverOptions.SkipTopLvlOpsPath != "" {
|
if serverOptions.SkipTopLvlOpsPath != "" {
|
||||||
@@ -162,6 +159,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
|||||||
serverOptions.UseLogsNewSchema,
|
serverOptions.UseLogsNewSchema,
|
||||||
serverOptions.UseTraceNewSchema,
|
serverOptions.UseTraceNewSchema,
|
||||||
serverOptions.SigNoz.SQLStore,
|
serverOptions.SigNoz.SQLStore,
|
||||||
|
serverOptions.SigNoz.TelemetryStore,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -491,17 +489,11 @@ func makeRulesManager(
|
|||||||
useLogsNewSchema bool,
|
useLogsNewSchema bool,
|
||||||
useTraceNewSchema bool,
|
useTraceNewSchema bool,
|
||||||
sqlstore sqlstore.SQLStore,
|
sqlstore sqlstore.SQLStore,
|
||||||
|
telemetryStore telemetrystore.TelemetryStore,
|
||||||
) (*rules.Manager, error) {
|
) (*rules.Manager, error) {
|
||||||
|
|
||||||
// create engine
|
|
||||||
pqle, err := pqle.FromReader(ch)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to create pql engine : %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// create manager opts
|
// create manager opts
|
||||||
managerOpts := &rules.ManagerOptions{
|
managerOpts := &rules.ManagerOptions{
|
||||||
PqlEngine: pqle,
|
TelemetryStore: telemetryStore,
|
||||||
RepoURL: ruleRepoURL,
|
RepoURL: ruleRepoURL,
|
||||||
DBConn: db,
|
DBConn: db,
|
||||||
Context: context.Background(),
|
Context: context.Background(),
|
||||||
|
|||||||
@@ -9,7 +9,6 @@ import (
|
|||||||
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
|
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
|
||||||
"github.com/SigNoz/signoz/pkg/query-service/querycache"
|
"github.com/SigNoz/signoz/pkg/query-service/querycache"
|
||||||
"github.com/prometheus/prometheus/promql"
|
"github.com/prometheus/prometheus/promql"
|
||||||
"github.com/prometheus/prometheus/storage"
|
|
||||||
"github.com/prometheus/prometheus/util/stats"
|
"github.com/prometheus/prometheus/util/stats"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -86,10 +85,6 @@ type Reader interface {
|
|||||||
req *v3.QBFilterSuggestionsRequest,
|
req *v3.QBFilterSuggestionsRequest,
|
||||||
) (*v3.QBFilterSuggestionsResponse, *model.ApiError)
|
) (*v3.QBFilterSuggestionsResponse, *model.ApiError)
|
||||||
|
|
||||||
// Connection needed for rules, not ideal but required
|
|
||||||
GetQueryEngine() *promql.Engine
|
|
||||||
GetFanoutStorage() *storage.Storage
|
|
||||||
|
|
||||||
QueryDashboardVars(ctx context.Context, query string) (*model.DashboardVar, error)
|
QueryDashboardVars(ctx context.Context, query string) (*model.DashboardVar, error)
|
||||||
CheckClickHouse(ctx context.Context) error
|
CheckClickHouse(ctx context.Context) error
|
||||||
|
|
||||||
|
|||||||
@@ -15,7 +15,6 @@ import (
|
|||||||
"github.com/SigNoz/signoz/pkg/signoz"
|
"github.com/SigNoz/signoz/pkg/signoz"
|
||||||
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
||||||
"github.com/SigNoz/signoz/pkg/version"
|
"github.com/SigNoz/signoz/pkg/version"
|
||||||
prommodel "github.com/prometheus/common/model"
|
|
||||||
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"go.uber.org/zap/zapcore"
|
"go.uber.org/zap/zapcore"
|
||||||
@@ -29,10 +28,6 @@ func initZapLog() *zap.Logger {
|
|||||||
return logger
|
return logger
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
|
||||||
prommodel.NameValidationScheme = prommodel.UTF8Validation
|
|
||||||
}
|
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
var promConfigPath, skipTopLvlOpsPath string
|
var promConfigPath, skipTopLvlOpsPath string
|
||||||
|
|
||||||
@@ -85,6 +80,7 @@ func main() {
|
|||||||
MaxIdleConns: maxIdleConns,
|
MaxIdleConns: maxIdleConns,
|
||||||
MaxOpenConns: maxOpenConns,
|
MaxOpenConns: maxOpenConns,
|
||||||
DialTimeout: dialTimeout,
|
DialTimeout: dialTimeout,
|
||||||
|
Config: promConfigPath,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
zap.L().Fatal("Failed to create config", zap.Error(err))
|
zap.L().Fatal("Failed to create config", zap.Error(err))
|
||||||
@@ -101,7 +97,7 @@ func main() {
|
|||||||
signoz.NewTelemetryStoreProviderFactories(),
|
signoz.NewTelemetryStoreProviderFactories(),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
zap.L().Fatal("Failed to create signoz struct", zap.Error(err))
|
zap.L().Fatal("Failed to create signoz", zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read the jwt secret key
|
// Read the jwt secret key
|
||||||
|
|||||||
@@ -1,126 +0,0 @@
|
|||||||
package promql
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
|
|
||||||
"github.com/go-kit/log"
|
|
||||||
pmodel "github.com/prometheus/common/model"
|
|
||||||
"github.com/prometheus/common/promlog"
|
|
||||||
pconfig "github.com/prometheus/prometheus/config"
|
|
||||||
pql "github.com/prometheus/prometheus/promql"
|
|
||||||
pstorage "github.com/prometheus/prometheus/storage"
|
|
||||||
premote "github.com/prometheus/prometheus/storage/remote"
|
|
||||||
)
|
|
||||||
|
|
||||||
type PqlEngine struct {
|
|
||||||
engine *pql.Engine
|
|
||||||
fanoutStorage pstorage.Storage
|
|
||||||
}
|
|
||||||
|
|
||||||
func FromConfigPath(promConfigPath string) (*PqlEngine, error) {
|
|
||||||
// load storage path
|
|
||||||
c, err := pconfig.LoadFile(promConfigPath, false, false, nil)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("couldn't load configuration (--config.file=%q): %v", promConfigPath, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return NewPqlEngine(c)
|
|
||||||
}
|
|
||||||
|
|
||||||
func FromReader(ch interfaces.Reader) (*PqlEngine, error) {
|
|
||||||
return &PqlEngine{
|
|
||||||
engine: ch.GetQueryEngine(),
|
|
||||||
fanoutStorage: *ch.GetFanoutStorage(),
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewPqlEngine(config *pconfig.Config) (*PqlEngine, error) {
|
|
||||||
|
|
||||||
logLevel := promlog.AllowedLevel{}
|
|
||||||
_ = logLevel.Set("debug")
|
|
||||||
|
|
||||||
allowedFormat := promlog.AllowedFormat{}
|
|
||||||
_ = allowedFormat.Set("logfmt")
|
|
||||||
|
|
||||||
promlogConfig := promlog.Config{
|
|
||||||
Level: &logLevel,
|
|
||||||
Format: &allowedFormat,
|
|
||||||
}
|
|
||||||
|
|
||||||
logger := promlog.New(&promlogConfig)
|
|
||||||
|
|
||||||
opts := pql.EngineOpts{
|
|
||||||
Logger: log.With(logger, "component", "promql evaluator"),
|
|
||||||
Reg: nil,
|
|
||||||
MaxSamples: 50000000,
|
|
||||||
Timeout: time.Duration(2 * time.Minute),
|
|
||||||
ActiveQueryTracker: pql.NewActiveQueryTracker(
|
|
||||||
"",
|
|
||||||
20,
|
|
||||||
logger,
|
|
||||||
),
|
|
||||||
}
|
|
||||||
|
|
||||||
e := pql.NewEngine(opts)
|
|
||||||
startTime := func() (int64, error) {
|
|
||||||
return int64(pmodel.Latest), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
remoteStorage := premote.NewStorage(
|
|
||||||
log.With(logger, "component", "remote"),
|
|
||||||
nil,
|
|
||||||
startTime,
|
|
||||||
"",
|
|
||||||
time.Duration(1*time.Minute),
|
|
||||||
nil,
|
|
||||||
false,
|
|
||||||
)
|
|
||||||
fanoutStorage := pstorage.NewFanout(logger, remoteStorage)
|
|
||||||
|
|
||||||
_ = remoteStorage.ApplyConfig(config)
|
|
||||||
|
|
||||||
return &PqlEngine{
|
|
||||||
engine: e,
|
|
||||||
fanoutStorage: fanoutStorage,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *PqlEngine) RunAlertQuery(ctx context.Context, qs string, start, end time.Time, interval time.Duration) (pql.Matrix, error) {
|
|
||||||
q, err := p.engine.NewRangeQuery(ctx, p.fanoutStorage, nil, qs, start, end, interval)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
res := q.Exec(ctx)
|
|
||||||
|
|
||||||
if res.Err != nil {
|
|
||||||
return nil, res.Err
|
|
||||||
}
|
|
||||||
|
|
||||||
switch typ := res.Value.(type) {
|
|
||||||
case pql.Vector:
|
|
||||||
series := make([]pql.Series, 0, len(typ))
|
|
||||||
value := res.Value.(pql.Vector)
|
|
||||||
for _, smpl := range value {
|
|
||||||
series = append(series, pql.Series{
|
|
||||||
Metric: smpl.Metric,
|
|
||||||
Floats: []pql.FPoint{{T: smpl.T, F: smpl.F}},
|
|
||||||
})
|
|
||||||
}
|
|
||||||
return series, nil
|
|
||||||
case pql.Scalar:
|
|
||||||
value := res.Value.(pql.Scalar)
|
|
||||||
series := make([]pql.Series, 0, 1)
|
|
||||||
series = append(series, pql.Series{
|
|
||||||
Floats: []pql.FPoint{{T: value.T, F: value.V}},
|
|
||||||
})
|
|
||||||
return series, nil
|
|
||||||
case pql.Matrix:
|
|
||||||
return res.Value.(pql.Matrix), nil
|
|
||||||
default:
|
|
||||||
return nil, fmt.Errorf("rule result is not a vector or scalar")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -21,9 +21,9 @@ import (
|
|||||||
"github.com/SigNoz/signoz/pkg/query-service/cache"
|
"github.com/SigNoz/signoz/pkg/query-service/cache"
|
||||||
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
|
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
|
||||||
"github.com/SigNoz/signoz/pkg/query-service/model"
|
"github.com/SigNoz/signoz/pkg/query-service/model"
|
||||||
pqle "github.com/SigNoz/signoz/pkg/query-service/pqlEngine"
|
|
||||||
"github.com/SigNoz/signoz/pkg/query-service/telemetry"
|
"github.com/SigNoz/signoz/pkg/query-service/telemetry"
|
||||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||||
|
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||||
"github.com/SigNoz/signoz/pkg/types/alertmanagertypes"
|
"github.com/SigNoz/signoz/pkg/types/alertmanagertypes"
|
||||||
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
||||||
)
|
)
|
||||||
@@ -76,8 +76,7 @@ func prepareTaskName(ruleId interface{}) string {
|
|||||||
|
|
||||||
// ManagerOptions bundles options for the Manager.
|
// ManagerOptions bundles options for the Manager.
|
||||||
type ManagerOptions struct {
|
type ManagerOptions struct {
|
||||||
PqlEngine *pqle.PqlEngine
|
TelemetryStore telemetrystore.TelemetryStore
|
||||||
|
|
||||||
// RepoURL is used to generate a backlink in sent alert messages
|
// RepoURL is used to generate a backlink in sent alert messages
|
||||||
RepoURL string
|
RepoURL string
|
||||||
|
|
||||||
@@ -180,7 +179,7 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
|
|||||||
opts.Rule,
|
opts.Rule,
|
||||||
opts.Logger,
|
opts.Logger,
|
||||||
opts.Reader,
|
opts.Reader,
|
||||||
opts.ManagerOpts.PqlEngine,
|
opts.ManagerOpts.TelemetryStore.PrometheusEngine(),
|
||||||
WithSQLStore(opts.SQLStore),
|
WithSQLStore(opts.SQLStore),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@@ -8,21 +8,22 @@ import (
|
|||||||
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
|
"github.com/SigNoz/signoz/pkg/promengine"
|
||||||
"github.com/SigNoz/signoz/pkg/query-service/formatter"
|
"github.com/SigNoz/signoz/pkg/query-service/formatter"
|
||||||
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
|
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
|
||||||
"github.com/SigNoz/signoz/pkg/query-service/model"
|
"github.com/SigNoz/signoz/pkg/query-service/model"
|
||||||
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
|
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
|
||||||
pqle "github.com/SigNoz/signoz/pkg/query-service/pqlEngine"
|
|
||||||
qslabels "github.com/SigNoz/signoz/pkg/query-service/utils/labels"
|
qslabels "github.com/SigNoz/signoz/pkg/query-service/utils/labels"
|
||||||
"github.com/SigNoz/signoz/pkg/query-service/utils/times"
|
"github.com/SigNoz/signoz/pkg/query-service/utils/times"
|
||||||
"github.com/SigNoz/signoz/pkg/query-service/utils/timestamp"
|
"github.com/SigNoz/signoz/pkg/query-service/utils/timestamp"
|
||||||
"github.com/prometheus/prometheus/promql"
|
"github.com/prometheus/prometheus/promql"
|
||||||
|
pql "github.com/prometheus/prometheus/promql"
|
||||||
yaml "gopkg.in/yaml.v2"
|
yaml "gopkg.in/yaml.v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
type PromRule struct {
|
type PromRule struct {
|
||||||
*BaseRule
|
*BaseRule
|
||||||
pqlEngine *pqle.PqlEngine
|
pqlEngine promengine.PromEngine
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPromRule(
|
func NewPromRule(
|
||||||
@@ -30,7 +31,7 @@ func NewPromRule(
|
|||||||
postableRule *PostableRule,
|
postableRule *PostableRule,
|
||||||
logger *zap.Logger,
|
logger *zap.Logger,
|
||||||
reader interfaces.Reader,
|
reader interfaces.Reader,
|
||||||
pqlEngine *pqle.PqlEngine,
|
pqlEngine promengine.PromEngine,
|
||||||
opts ...RuleOption,
|
opts ...RuleOption,
|
||||||
) (*PromRule, error) {
|
) (*PromRule, error) {
|
||||||
|
|
||||||
@@ -108,7 +109,7 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time) (interface{}, error)
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
zap.L().Info("evaluating promql query", zap.String("name", r.Name()), zap.String("query", q))
|
zap.L().Info("evaluating promql query", zap.String("name", r.Name()), zap.String("query", q))
|
||||||
res, err := r.pqlEngine.RunAlertQuery(ctx, q, start, end, interval)
|
res, err := r.RunAlertQuery(ctx, q, start, end, interval)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.SetHealth(HealthBad)
|
r.SetHealth(HealthBad)
|
||||||
r.SetLastError(err)
|
r.SetLastError(err)
|
||||||
@@ -306,6 +307,43 @@ func (r *PromRule) String() string {
|
|||||||
return string(byt)
|
return string(byt)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *PromRule) RunAlertQuery(ctx context.Context, qs string, start, end time.Time, interval time.Duration) (pql.Matrix, error) {
|
||||||
|
q, err := r.pqlEngine.Engine().NewRangeQuery(ctx, r.pqlEngine.Storage(), nil, qs, start, end, interval)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
res := q.Exec(ctx)
|
||||||
|
|
||||||
|
if res.Err != nil {
|
||||||
|
return nil, res.Err
|
||||||
|
}
|
||||||
|
|
||||||
|
switch typ := res.Value.(type) {
|
||||||
|
case pql.Vector:
|
||||||
|
series := make([]pql.Series, 0, len(typ))
|
||||||
|
value := res.Value.(pql.Vector)
|
||||||
|
for _, smpl := range value {
|
||||||
|
series = append(series, pql.Series{
|
||||||
|
Metric: smpl.Metric,
|
||||||
|
Floats: []pql.FPoint{{T: smpl.T, F: smpl.F}},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return series, nil
|
||||||
|
case pql.Scalar:
|
||||||
|
value := res.Value.(pql.Scalar)
|
||||||
|
series := make([]pql.Series, 0, 1)
|
||||||
|
series = append(series, pql.Series{
|
||||||
|
Floats: []pql.FPoint{{T: value.T, F: value.V}},
|
||||||
|
})
|
||||||
|
return series, nil
|
||||||
|
case pql.Matrix:
|
||||||
|
return res.Value.(pql.Matrix), nil
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("rule result is not a vector or scalar")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func toCommonSeries(series promql.Series) v3.Series {
|
func toCommonSeries(series promql.Series) v3.Series {
|
||||||
commonSeries := v3.Series{
|
commonSeries := v3.Series{
|
||||||
Labels: make(map[string]string),
|
Labels: make(map[string]string),
|
||||||
|
|||||||
@@ -68,7 +68,7 @@ func defaultTestNotification(opts PrepareTestRuleOptions) (int, *model.ApiError)
|
|||||||
parsedRule,
|
parsedRule,
|
||||||
opts.Logger,
|
opts.Logger,
|
||||||
opts.Reader,
|
opts.Reader,
|
||||||
opts.ManagerOpts.PqlEngine,
|
opts.ManagerOpts.TelemetryStore.PrometheusEngine(),
|
||||||
WithSendAlways(),
|
WithSendAlways(),
|
||||||
WithSendUnmatched(),
|
WithSendUnmatched(),
|
||||||
WithSQLStore(opts.SQLStore),
|
WithSQLStore(opts.SQLStore),
|
||||||
|
|||||||
@@ -3,19 +3,22 @@ package rules
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/SigNoz/signoz/pkg/cache"
|
|
||||||
"github.com/SigNoz/signoz/pkg/cache/memorycache"
|
|
||||||
"github.com/SigNoz/signoz/pkg/factory/factorytest"
|
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/SigNoz/signoz/pkg/cache"
|
||||||
|
"github.com/SigNoz/signoz/pkg/cache/memorycache"
|
||||||
|
"github.com/SigNoz/signoz/pkg/factory/factorytest"
|
||||||
|
"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
|
||||||
|
|
||||||
"github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader"
|
"github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader"
|
||||||
"github.com/SigNoz/signoz/pkg/query-service/common"
|
"github.com/SigNoz/signoz/pkg/query-service/common"
|
||||||
"github.com/SigNoz/signoz/pkg/query-service/featureManager"
|
"github.com/SigNoz/signoz/pkg/query-service/featureManager"
|
||||||
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
|
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
|
||||||
"github.com/SigNoz/signoz/pkg/query-service/utils/labels"
|
"github.com/SigNoz/signoz/pkg/query-service/utils/labels"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
cmock "github.com/srikanthccv/ClickHouse-go-mock"
|
cmock "github.com/srikanthccv/ClickHouse-go-mock"
|
||||||
)
|
)
|
||||||
@@ -1152,10 +1155,8 @@ func TestThresholdRuleUnitCombinations(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
fm := featureManager.StartManager()
|
fm := featureManager.StartManager()
|
||||||
mock, err := cmock.NewClickHouseWithQueryMatcher(nil, &queryMatcherAny{})
|
telemetryStore, err := telemetrystoretest.New(&queryMatcherAny{})
|
||||||
if err != nil {
|
require.NoError(t, err)
|
||||||
t.Errorf("an error '%s' was not expected when opening a stub database connection", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
cols := make([]cmock.ColumnType, 0)
|
cols := make([]cmock.ColumnType, 0)
|
||||||
cols = append(cols, cmock.ColumnType{Name: "value", Type: "Float64"})
|
cols = append(cols, cmock.ColumnType{Name: "value", Type: "Float64"})
|
||||||
@@ -1227,11 +1228,11 @@ func TestThresholdRuleUnitCombinations(t *testing.T) {
|
|||||||
|
|
||||||
for idx, c := range cases {
|
for idx, c := range cases {
|
||||||
rows := cmock.NewRows(cols, c.values)
|
rows := cmock.NewRows(cols, c.values)
|
||||||
mock.ExpectQuery(".*").WillReturnError(fmt.Errorf("error"))
|
telemetryStore.Mock().ExpectQuery(".*").WillReturnError(fmt.Errorf("error"))
|
||||||
// We are testing the eval logic after the query is run
|
// We are testing the eval logic after the query is run
|
||||||
// so we don't care about the query string here
|
// so we don't care about the query string here
|
||||||
queryString := "SELECT any"
|
queryString := "SELECT any"
|
||||||
mock.
|
telemetryStore.Mock().
|
||||||
ExpectQuery(queryString).
|
ExpectQuery(queryString).
|
||||||
WillReturnRows(rows)
|
WillReturnRows(rows)
|
||||||
postableRule.RuleCondition.CompareOp = CompareOp(c.compareOp)
|
postableRule.RuleCondition.CompareOp = CompareOp(c.compareOp)
|
||||||
@@ -1246,7 +1247,8 @@ func TestThresholdRuleUnitCombinations(t *testing.T) {
|
|||||||
|
|
||||||
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
|
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
|
||||||
readerCache, err := memorycache.New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: cache.Memory{TTL: DefaultFrequency}})
|
readerCache, err := memorycache.New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: cache.Memory{TTL: DefaultFrequency}})
|
||||||
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, time.Duration(time.Second), readerCache)
|
require.NoError(t, err)
|
||||||
|
reader := clickhouseReader.NewReaderFromClickhouseConnection(options, nil, telemetryStore, fm, "", true, true, time.Duration(time.Second), readerCache)
|
||||||
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
|
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
|
||||||
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
|
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
|
||||||
"signoz_calls_total": {
|
"signoz_calls_total": {
|
||||||
@@ -1305,10 +1307,8 @@ func TestThresholdRuleNoData(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
fm := featureManager.StartManager()
|
fm := featureManager.StartManager()
|
||||||
mock, err := cmock.NewClickHouseWithQueryMatcher(nil, &queryMatcherAny{})
|
telemetryStore, err := telemetrystoretest.New(&queryMatcherAny{})
|
||||||
if err != nil {
|
require.NoError(t, err)
|
||||||
t.Errorf("an error '%s' was not expected when opening a stub database connection", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
cols := make([]cmock.ColumnType, 0)
|
cols := make([]cmock.ColumnType, 0)
|
||||||
cols = append(cols, cmock.ColumnType{Name: "value", Type: "Float64"})
|
cols = append(cols, cmock.ColumnType{Name: "value", Type: "Float64"})
|
||||||
@@ -1328,12 +1328,12 @@ func TestThresholdRuleNoData(t *testing.T) {
|
|||||||
for idx, c := range cases {
|
for idx, c := range cases {
|
||||||
rows := cmock.NewRows(cols, c.values)
|
rows := cmock.NewRows(cols, c.values)
|
||||||
|
|
||||||
mock.ExpectQuery(".*").WillReturnError(fmt.Errorf("error"))
|
telemetryStore.Mock().ExpectQuery(".*").WillReturnError(fmt.Errorf("error"))
|
||||||
|
|
||||||
// We are testing the eval logic after the query is run
|
// We are testing the eval logic after the query is run
|
||||||
// so we don't care about the query string here
|
// so we don't care about the query string here
|
||||||
queryString := "SELECT any"
|
queryString := "SELECT any"
|
||||||
mock.
|
telemetryStore.Mock().
|
||||||
ExpectQuery(queryString).
|
ExpectQuery(queryString).
|
||||||
WillReturnRows(rows)
|
WillReturnRows(rows)
|
||||||
var target float64 = 0
|
var target float64 = 0
|
||||||
@@ -1346,7 +1346,7 @@ func TestThresholdRuleNoData(t *testing.T) {
|
|||||||
}
|
}
|
||||||
readerCache, err := memorycache.New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: cache.Memory{TTL: DefaultFrequency}})
|
readerCache, err := memorycache.New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: cache.Memory{TTL: DefaultFrequency}})
|
||||||
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
|
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
|
||||||
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, time.Duration(time.Second), readerCache)
|
reader := clickhouseReader.NewReaderFromClickhouseConnection(options, nil, telemetryStore, fm, "", true, true, time.Duration(time.Second), readerCache)
|
||||||
|
|
||||||
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
|
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
|
||||||
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
|
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
|
||||||
@@ -1410,10 +1410,8 @@ func TestThresholdRuleTracesLink(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
fm := featureManager.StartManager()
|
fm := featureManager.StartManager()
|
||||||
mock, err := cmock.NewClickHouseWithQueryMatcher(nil, &queryMatcherAny{})
|
telemetryStore, err := telemetrystoretest.New(&queryMatcherAny{})
|
||||||
if err != nil {
|
require.NoError(t, err)
|
||||||
t.Errorf("an error '%s' was not expected when opening a stub database connection", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
metaCols := make([]cmock.ColumnType, 0)
|
metaCols := make([]cmock.ColumnType, 0)
|
||||||
metaCols = append(metaCols, cmock.ColumnType{Name: "DISTINCT(tagKey)", Type: "String"})
|
metaCols = append(metaCols, cmock.ColumnType{Name: "DISTINCT(tagKey)", Type: "String"})
|
||||||
@@ -1428,11 +1426,11 @@ func TestThresholdRuleTracesLink(t *testing.T) {
|
|||||||
|
|
||||||
for idx, c := range testCases {
|
for idx, c := range testCases {
|
||||||
metaRows := cmock.NewRows(metaCols, c.metaValues)
|
metaRows := cmock.NewRows(metaCols, c.metaValues)
|
||||||
mock.
|
telemetryStore.Mock().
|
||||||
ExpectQuery("SELECT DISTINCT(tagKey), tagType, dataType FROM archiveNamespace.span_attributes_keys").
|
ExpectQuery("SELECT DISTINCT(tagKey), tagType, dataType FROM archiveNamespace.span_attributes_keys").
|
||||||
WillReturnRows(metaRows)
|
WillReturnRows(metaRows)
|
||||||
|
|
||||||
mock.
|
telemetryStore.Mock().
|
||||||
ExpectSelect("SHOW CREATE TABLE signoz_traces.distributed_signoz_index_v3").WillReturnRows(&cmock.Rows{})
|
ExpectSelect("SHOW CREATE TABLE signoz_traces.distributed_signoz_index_v3").WillReturnRows(&cmock.Rows{})
|
||||||
|
|
||||||
rows := cmock.NewRows(cols, c.values)
|
rows := cmock.NewRows(cols, c.values)
|
||||||
@@ -1440,7 +1438,7 @@ func TestThresholdRuleTracesLink(t *testing.T) {
|
|||||||
// We are testing the eval logic after the query is run
|
// We are testing the eval logic after the query is run
|
||||||
// so we don't care about the query string here
|
// so we don't care about the query string here
|
||||||
queryString := "SELECT any"
|
queryString := "SELECT any"
|
||||||
mock.
|
telemetryStore.Mock().
|
||||||
ExpectQuery(queryString).
|
ExpectQuery(queryString).
|
||||||
WillReturnRows(rows)
|
WillReturnRows(rows)
|
||||||
postableRule.RuleCondition.CompareOp = CompareOp(c.compareOp)
|
postableRule.RuleCondition.CompareOp = CompareOp(c.compareOp)
|
||||||
@@ -1454,7 +1452,7 @@ func TestThresholdRuleTracesLink(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
|
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
|
||||||
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, time.Duration(time.Second), nil)
|
reader := clickhouseReader.NewReaderFromClickhouseConnection(options, nil, telemetryStore, fm, "", true, true, time.Duration(time.Second), nil)
|
||||||
|
|
||||||
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
|
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
|
||||||
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
|
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
|
||||||
@@ -1523,10 +1521,8 @@ func TestThresholdRuleLogsLink(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
fm := featureManager.StartManager()
|
fm := featureManager.StartManager()
|
||||||
mock, err := cmock.NewClickHouseWithQueryMatcher(nil, &queryMatcherAny{})
|
telemetryStore, err := telemetrystoretest.New(&queryMatcherAny{})
|
||||||
if err != nil {
|
require.NoError(t, err)
|
||||||
t.Errorf("an error '%s' was not expected when opening a stub database connection", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
attrMetaCols := make([]cmock.ColumnType, 0)
|
attrMetaCols := make([]cmock.ColumnType, 0)
|
||||||
attrMetaCols = append(attrMetaCols, cmock.ColumnType{Name: "name", Type: "String"})
|
attrMetaCols = append(attrMetaCols, cmock.ColumnType{Name: "name", Type: "String"})
|
||||||
@@ -1546,17 +1542,17 @@ func TestThresholdRuleLogsLink(t *testing.T) {
|
|||||||
|
|
||||||
for idx, c := range testCases {
|
for idx, c := range testCases {
|
||||||
attrMetaRows := cmock.NewRows(attrMetaCols, c.attrMetaValues)
|
attrMetaRows := cmock.NewRows(attrMetaCols, c.attrMetaValues)
|
||||||
mock.
|
telemetryStore.Mock().
|
||||||
ExpectSelect("SELECT DISTINCT name, datatype from signoz_logs.distributed_logs_attribute_keys group by name, datatype").
|
ExpectSelect("SELECT DISTINCT name, datatype from signoz_logs.distributed_logs_attribute_keys group by name, datatype").
|
||||||
WillReturnRows(attrMetaRows)
|
WillReturnRows(attrMetaRows)
|
||||||
|
|
||||||
resourceMetaRows := cmock.NewRows(resourceMetaCols, c.resourceMetaValues)
|
resourceMetaRows := cmock.NewRows(resourceMetaCols, c.resourceMetaValues)
|
||||||
mock.
|
telemetryStore.Mock().
|
||||||
ExpectSelect("SELECT DISTINCT name, datatype from signoz_logs.distributed_logs_resource_keys group by name, datatype").
|
ExpectSelect("SELECT DISTINCT name, datatype from signoz_logs.distributed_logs_resource_keys group by name, datatype").
|
||||||
WillReturnRows(resourceMetaRows)
|
WillReturnRows(resourceMetaRows)
|
||||||
|
|
||||||
createTableRows := cmock.NewRows(createTableCols, c.createTableValues)
|
createTableRows := cmock.NewRows(createTableCols, c.createTableValues)
|
||||||
mock.
|
telemetryStore.Mock().
|
||||||
ExpectSelect("SHOW CREATE TABLE signoz_logs.logs").
|
ExpectSelect("SHOW CREATE TABLE signoz_logs.logs").
|
||||||
WillReturnRows(createTableRows)
|
WillReturnRows(createTableRows)
|
||||||
|
|
||||||
@@ -1565,7 +1561,7 @@ func TestThresholdRuleLogsLink(t *testing.T) {
|
|||||||
// We are testing the eval logic after the query is run
|
// We are testing the eval logic after the query is run
|
||||||
// so we don't care about the query string here
|
// so we don't care about the query string here
|
||||||
queryString := "SELECT any"
|
queryString := "SELECT any"
|
||||||
mock.
|
telemetryStore.Mock().
|
||||||
ExpectQuery(queryString).
|
ExpectQuery(queryString).
|
||||||
WillReturnRows(rows)
|
WillReturnRows(rows)
|
||||||
postableRule.RuleCondition.CompareOp = CompareOp(c.compareOp)
|
postableRule.RuleCondition.CompareOp = CompareOp(c.compareOp)
|
||||||
@@ -1579,7 +1575,7 @@ func TestThresholdRuleLogsLink(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
|
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
|
||||||
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, time.Duration(time.Second), nil)
|
reader := clickhouseReader.NewReaderFromClickhouseConnection(options, nil, telemetryStore, fm, "", true, true, time.Duration(time.Second), nil)
|
||||||
|
|
||||||
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
|
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
|
||||||
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
|
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
|
||||||
|
|||||||
@@ -20,6 +20,7 @@ import (
|
|||||||
"github.com/SigNoz/signoz/pkg/query-service/dao"
|
"github.com/SigNoz/signoz/pkg/query-service/dao"
|
||||||
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
|
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
|
||||||
"github.com/SigNoz/signoz/pkg/query-service/model"
|
"github.com/SigNoz/signoz/pkg/query-service/model"
|
||||||
|
"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
|
||||||
"github.com/SigNoz/signoz/pkg/types"
|
"github.com/SigNoz/signoz/pkg/types"
|
||||||
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
@@ -39,14 +40,14 @@ func NewMockClickhouseReader(
|
|||||||
) {
|
) {
|
||||||
require.NotNil(t, testDB)
|
require.NotNil(t, testDB)
|
||||||
|
|
||||||
mockDB, err := mockhouse.NewClickHouseWithQueryMatcher(nil, sqlmock.QueryMatcherRegexp)
|
telemetryStore, err := telemetrystoretest.New(sqlmock.QueryMatcherRegexp)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Nil(t, err, "could not init mock clickhouse")
|
require.Nil(t, err, "could not init mock clickhouse")
|
||||||
reader := clickhouseReader.NewReaderFromClickhouseConnection(
|
reader := clickhouseReader.NewReaderFromClickhouseConnection(
|
||||||
mockDB,
|
|
||||||
clickhouseReader.NewOptions("", ""),
|
clickhouseReader.NewOptions("", ""),
|
||||||
testDB,
|
testDB,
|
||||||
"",
|
telemetryStore,
|
||||||
featureFlags,
|
featureFlags,
|
||||||
"",
|
"",
|
||||||
true,
|
true,
|
||||||
@@ -55,7 +56,7 @@ func NewMockClickhouseReader(
|
|||||||
nil,
|
nil,
|
||||||
)
|
)
|
||||||
|
|
||||||
return reader, mockDB
|
return reader, telemetryStore.Mock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func addLogsQueryExpectation(
|
func addLogsQueryExpectation(
|
||||||
|
|||||||
@@ -20,6 +20,8 @@ import (
|
|||||||
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||||
"github.com/SigNoz/signoz/pkg/version"
|
"github.com/SigNoz/signoz/pkg/version"
|
||||||
"github.com/SigNoz/signoz/pkg/web"
|
"github.com/SigNoz/signoz/pkg/web"
|
||||||
|
"github.com/go-kit/log"
|
||||||
|
promconfig "github.com/prometheus/prometheus/config"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Config defines the entire input configuration of signoz.
|
// Config defines the entire input configuration of signoz.
|
||||||
@@ -49,7 +51,7 @@ type Config struct {
|
|||||||
APIServer apiserver.Config `mapstructure:"apiserver"`
|
APIServer apiserver.Config `mapstructure:"apiserver"`
|
||||||
|
|
||||||
// TelemetryStore config
|
// TelemetryStore config
|
||||||
TelemetryStore telemetrystore.Config `mapstructure:"telemetrystore"`
|
TelemetryStore telemetrystore.Config `mapstructure:"telemetrystore" yaml:"telemetrystore"`
|
||||||
|
|
||||||
// Alertmanager config
|
// Alertmanager config
|
||||||
Alertmanager alertmanager.Config `mapstructure:"alertmanager" yaml:"alertmanager"`
|
Alertmanager alertmanager.Config `mapstructure:"alertmanager" yaml:"alertmanager"`
|
||||||
@@ -61,6 +63,7 @@ type DeprecatedFlags struct {
|
|||||||
MaxIdleConns int
|
MaxIdleConns int
|
||||||
MaxOpenConns int
|
MaxOpenConns int
|
||||||
DialTimeout time.Duration
|
DialTimeout time.Duration
|
||||||
|
Config string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewConfig(ctx context.Context, resolverConfig config.ResolverConfig, deprecatedFlags DeprecatedFlags) (Config, error) {
|
func NewConfig(ctx context.Context, resolverConfig config.ResolverConfig, deprecatedFlags DeprecatedFlags) (Config, error) {
|
||||||
@@ -145,7 +148,7 @@ func mergeAndEnsureBackwardCompatibility(config *Config, deprecatedFlags Depreca
|
|||||||
|
|
||||||
if os.Getenv("ClickHouseUrl") != "" {
|
if os.Getenv("ClickHouseUrl") != "" {
|
||||||
fmt.Println("[Deprecated] env ClickHouseUrl is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_DSN instead.")
|
fmt.Println("[Deprecated] env ClickHouseUrl is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_DSN instead.")
|
||||||
config.TelemetryStore.ClickHouse.DSN = os.Getenv("ClickHouseUrl")
|
config.TelemetryStore.Clickhouse.DSN = os.Getenv("ClickHouseUrl")
|
||||||
}
|
}
|
||||||
|
|
||||||
if deprecatedFlags.MaxIdleConns != 50 {
|
if deprecatedFlags.MaxIdleConns != 50 {
|
||||||
@@ -176,4 +179,18 @@ func mergeAndEnsureBackwardCompatibility(config *Config, deprecatedFlags Depreca
|
|||||||
if os.Getenv("ALERTMANAGER_API_CHANNEL_PATH") != "" {
|
if os.Getenv("ALERTMANAGER_API_CHANNEL_PATH") != "" {
|
||||||
fmt.Println("[Deprecated] env ALERTMANAGER_API_CHANNEL_PATH is deprecated and scheduled for complete removal.")
|
fmt.Println("[Deprecated] env ALERTMANAGER_API_CHANNEL_PATH is deprecated and scheduled for complete removal.")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if deprecatedFlags.Config != "" {
|
||||||
|
fmt.Println("[Deprecated] flag --config is deprecated for passing prometheus config. Please use SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_PROMETHEUS_REMOTE__READ_URL instead. The flag will be used for passing the entire SigNoz config. More details can be found at https://github.com/SigNoz/signoz/issues/6805.")
|
||||||
|
cfg, err := promconfig.LoadFile(deprecatedFlags.Config, false, false, log.NewNopLogger())
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("Error parsing config, using value set in SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_PROMETHEUS_REMOTE__READ_URL")
|
||||||
|
} else {
|
||||||
|
if len(cfg.RemoteReadConfigs) != 1 {
|
||||||
|
fmt.Println("Error finding remote read config, using value set in SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_PROMETHEUS_REMOTE__READ_URL")
|
||||||
|
} else {
|
||||||
|
config.TelemetryStore.Clickhouse.Prometheus.RemoteReadConfig.URL = cfg.RemoteReadConfigs[0].URL.URL
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -67,7 +67,7 @@ func NewSQLMigrationProviderFactories(sqlstore sqlstore.SQLStore) factory.NamedM
|
|||||||
|
|
||||||
func NewTelemetryStoreProviderFactories() factory.NamedMap[factory.ProviderFactory[telemetrystore.TelemetryStore, telemetrystore.Config]] {
|
func NewTelemetryStoreProviderFactories() factory.NamedMap[factory.ProviderFactory[telemetrystore.TelemetryStore, telemetrystore.Config]] {
|
||||||
return factory.MustNewNamedMap(
|
return factory.MustNewNamedMap(
|
||||||
clickhousetelemetrystore.NewFactory(telemetrystorehook.NewFactory()),
|
clickhousetelemetrystore.NewFactory(telemetrystorehook.NewSettingsFactory()),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -6,13 +6,15 @@ import (
|
|||||||
"github.com/ClickHouse/clickhouse-go/v2"
|
"github.com/ClickHouse/clickhouse-go/v2"
|
||||||
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
|
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
|
||||||
"github.com/SigNoz/signoz/pkg/factory"
|
"github.com/SigNoz/signoz/pkg/factory"
|
||||||
|
"github.com/SigNoz/signoz/pkg/promengine"
|
||||||
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||||
)
|
)
|
||||||
|
|
||||||
type provider struct {
|
type provider struct {
|
||||||
settings factory.ScopedProviderSettings
|
settings factory.ScopedProviderSettings
|
||||||
clickHouseConn clickhouse.Conn
|
clickHouseConn clickhouse.Conn
|
||||||
hooks []telemetrystore.TelemetryStoreHook
|
prometheusEngine promengine.PromEngine
|
||||||
|
hooks []telemetrystore.TelemetryStoreHook
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewFactory(hookFactories ...factory.ProviderFactory[telemetrystore.TelemetryStoreHook, telemetrystore.Config]) factory.ProviderFactory[telemetrystore.TelemetryStore, telemetrystore.Config] {
|
func NewFactory(hookFactories ...factory.ProviderFactory[telemetrystore.TelemetryStoreHook, telemetrystore.Config]) factory.ProviderFactory[telemetrystore.TelemetryStore, telemetrystore.Config] {
|
||||||
@@ -33,7 +35,7 @@ func NewFactory(hookFactories ...factory.ProviderFactory[telemetrystore.Telemetr
|
|||||||
func New(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config, hooks ...telemetrystore.TelemetryStoreHook) (telemetrystore.TelemetryStore, error) {
|
func New(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config, hooks ...telemetrystore.TelemetryStoreHook) (telemetrystore.TelemetryStore, error) {
|
||||||
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/telemetrystore/clickhousetelemetrystore")
|
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/telemetrystore/clickhousetelemetrystore")
|
||||||
|
|
||||||
options, err := clickhouse.ParseDSN(config.ClickHouse.DSN)
|
options, err := clickhouse.ParseDSN(config.Clickhouse.DSN)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -46,75 +48,85 @@ func New(ctx context.Context, providerSettings factory.ProviderSettings, config
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
engine, err := promengine.New(settings.Logger(), config.Clickhouse.Prometheus)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
return &provider{
|
return &provider{
|
||||||
settings: settings,
|
settings: settings,
|
||||||
clickHouseConn: chConn,
|
clickHouseConn: chConn,
|
||||||
hooks: hooks,
|
hooks: hooks,
|
||||||
|
prometheusEngine: engine,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *provider) ClickHouseDB() clickhouse.Conn {
|
func (p *provider) ClickhouseDB() clickhouse.Conn {
|
||||||
return p
|
return p
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p provider) Close() error {
|
func (p *provider) PrometheusEngine() promengine.PromEngine {
|
||||||
|
return p.prometheusEngine
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *provider) Close() error {
|
||||||
return p.clickHouseConn.Close()
|
return p.clickHouseConn.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p provider) Ping(ctx context.Context) error {
|
func (p *provider) Ping(ctx context.Context) error {
|
||||||
return p.clickHouseConn.Ping(ctx)
|
return p.clickHouseConn.Ping(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p provider) Stats() driver.Stats {
|
func (p *provider) Stats() driver.Stats {
|
||||||
return p.clickHouseConn.Stats()
|
return p.clickHouseConn.Stats()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p provider) Query(ctx context.Context, query string, args ...interface{}) (driver.Rows, error) {
|
func (p *provider) Query(ctx context.Context, query string, args ...interface{}) (driver.Rows, error) {
|
||||||
ctx, query, args = telemetrystore.WrapBeforeQuery(p.hooks, ctx, query, args...)
|
ctx, query, args = telemetrystore.WrapBeforeQuery(p.hooks, ctx, query, args...)
|
||||||
rows, err := p.clickHouseConn.Query(ctx, query, args...)
|
rows, err := p.clickHouseConn.Query(ctx, query, args...)
|
||||||
telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, rows, err)
|
telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, rows, err)
|
||||||
return rows, err
|
return rows, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p provider) QueryRow(ctx context.Context, query string, args ...interface{}) driver.Row {
|
func (p *provider) QueryRow(ctx context.Context, query string, args ...interface{}) driver.Row {
|
||||||
ctx, query, args = telemetrystore.WrapBeforeQuery(p.hooks, ctx, query, args...)
|
ctx, query, args = telemetrystore.WrapBeforeQuery(p.hooks, ctx, query, args...)
|
||||||
row := p.clickHouseConn.QueryRow(ctx, query, args...)
|
row := p.clickHouseConn.QueryRow(ctx, query, args...)
|
||||||
telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, nil, nil)
|
telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, nil, nil)
|
||||||
return row
|
return row
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p provider) Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error {
|
func (p *provider) Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error {
|
||||||
ctx, query, args = telemetrystore.WrapBeforeQuery(p.hooks, ctx, query, args...)
|
ctx, query, args = telemetrystore.WrapBeforeQuery(p.hooks, ctx, query, args...)
|
||||||
err := p.clickHouseConn.Select(ctx, dest, query, args...)
|
err := p.clickHouseConn.Select(ctx, dest, query, args...)
|
||||||
telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, nil, err)
|
telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, nil, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p provider) Exec(ctx context.Context, query string, args ...interface{}) error {
|
func (p *provider) Exec(ctx context.Context, query string, args ...interface{}) error {
|
||||||
ctx, query, args = telemetrystore.WrapBeforeQuery(p.hooks, ctx, query, args...)
|
ctx, query, args = telemetrystore.WrapBeforeQuery(p.hooks, ctx, query, args...)
|
||||||
err := p.clickHouseConn.Exec(ctx, query, args...)
|
err := p.clickHouseConn.Exec(ctx, query, args...)
|
||||||
telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, nil, err)
|
telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, nil, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p provider) AsyncInsert(ctx context.Context, query string, wait bool, args ...interface{}) error {
|
func (p *provider) AsyncInsert(ctx context.Context, query string, wait bool, args ...interface{}) error {
|
||||||
ctx, query, args = telemetrystore.WrapBeforeQuery(p.hooks, ctx, query, args...)
|
ctx, query, args = telemetrystore.WrapBeforeQuery(p.hooks, ctx, query, args...)
|
||||||
err := p.clickHouseConn.AsyncInsert(ctx, query, wait, args...)
|
err := p.clickHouseConn.AsyncInsert(ctx, query, wait, args...)
|
||||||
telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, nil, err)
|
telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, nil, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p provider) PrepareBatch(ctx context.Context, query string, opts ...driver.PrepareBatchOption) (driver.Batch, error) {
|
func (p *provider) PrepareBatch(ctx context.Context, query string, opts ...driver.PrepareBatchOption) (driver.Batch, error) {
|
||||||
ctx, query, args := telemetrystore.WrapBeforeQuery(p.hooks, ctx, query)
|
ctx, query, args := telemetrystore.WrapBeforeQuery(p.hooks, ctx, query)
|
||||||
batch, err := p.clickHouseConn.PrepareBatch(ctx, query, opts...)
|
batch, err := p.clickHouseConn.PrepareBatch(ctx, query, opts...)
|
||||||
telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, nil, err)
|
telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, nil, err)
|
||||||
return batch, err
|
return batch, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p provider) ServerVersion() (*driver.ServerVersion, error) {
|
func (p *provider) ServerVersion() (*driver.ServerVersion, error) {
|
||||||
return p.clickHouseConn.ServerVersion()
|
return p.clickHouseConn.ServerVersion()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p provider) Contributors() []string {
|
func (p *provider) Contributors() []string {
|
||||||
return p.clickHouseConn.Contributors()
|
return p.clickHouseConn.Contributors()
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,28 +2,36 @@ package telemetrystore
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net/url"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/SigNoz/signoz/pkg/factory"
|
"github.com/SigNoz/signoz/pkg/factory"
|
||||||
|
"github.com/SigNoz/signoz/pkg/promengine"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Config struct {
|
type Config struct {
|
||||||
// Provider is the provider to use
|
// Provider is the provider to use
|
||||||
Provider string `mapstructure:"provider"`
|
Provider string `mapstructure:"provider"`
|
||||||
|
|
||||||
// Connection is the connection configuration
|
// Connection is the connection configuration
|
||||||
Connection ConnectionConfig `mapstructure:",squash"`
|
Connection ConnectionConfig `mapstructure:",squash"`
|
||||||
|
|
||||||
// Clickhouse is the clickhouse configuration
|
// Clickhouse is the clickhouse configuration
|
||||||
ClickHouse ClickHouseConfig `mapstructure:"clickhouse"`
|
Clickhouse ClickhouseConfig `mapstructure:"clickhouse"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type ConnectionConfig struct {
|
type ConnectionConfig struct {
|
||||||
// MaxOpenConns is the maximum number of open connections to the database.
|
// MaxOpenConns is the maximum number of open connections to the database.
|
||||||
MaxOpenConns int `mapstructure:"max_open_conns"`
|
MaxOpenConns int `mapstructure:"max_open_conns"`
|
||||||
MaxIdleConns int `mapstructure:"max_idle_conns"`
|
|
||||||
DialTimeout time.Duration `mapstructure:"dial_timeout"`
|
// MaxIdleConns is the maximum number of connections in the idle connection pool.
|
||||||
|
MaxIdleConns int `mapstructure:"max_idle_conns"`
|
||||||
|
|
||||||
|
// DialTimeout is the timeout for dialing a new connection.
|
||||||
|
DialTimeout time.Duration `mapstructure:"dial_timeout"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type ClickHouseQuerySettings struct {
|
type QuerySettings struct {
|
||||||
MaxExecutionTime int `mapstructure:"max_execution_time"`
|
MaxExecutionTime int `mapstructure:"max_execution_time"`
|
||||||
MaxExecutionTimeLeaf int `mapstructure:"max_execution_time_leaf"`
|
MaxExecutionTimeLeaf int `mapstructure:"max_execution_time_leaf"`
|
||||||
TimeoutBeforeCheckingExecutionSpeed int `mapstructure:"timeout_before_checking_execution_speed"`
|
TimeoutBeforeCheckingExecutionSpeed int `mapstructure:"timeout_before_checking_execution_speed"`
|
||||||
@@ -31,15 +39,19 @@ type ClickHouseQuerySettings struct {
|
|||||||
MaxResultRowsForCHQuery int `mapstructure:"max_result_rows_for_ch_query"`
|
MaxResultRowsForCHQuery int `mapstructure:"max_result_rows_for_ch_query"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type ClickHouseConfig struct {
|
type ClickhouseConfig struct {
|
||||||
|
// DSN is the database source name.
|
||||||
DSN string `mapstructure:"dsn"`
|
DSN string `mapstructure:"dsn"`
|
||||||
|
|
||||||
QuerySettings ClickHouseQuerySettings `mapstructure:"settings"`
|
// QuerySettings is the query settings for clickhouse.
|
||||||
|
QuerySettings QuerySettings `mapstructure:"settings"`
|
||||||
|
|
||||||
|
// Prometheus is the prometheus configuration
|
||||||
|
Prometheus promengine.Config `mapstructure:"prometheus"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewConfigFactory() factory.ConfigFactory {
|
func NewConfigFactory() factory.ConfigFactory {
|
||||||
return factory.NewConfigFactory(factory.MustNewName("telemetrystore"), newConfig)
|
return factory.NewConfigFactory(factory.MustNewName("telemetrystore"), newConfig)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func newConfig() factory.Config {
|
func newConfig() factory.Config {
|
||||||
@@ -50,15 +62,39 @@ func newConfig() factory.Config {
|
|||||||
MaxIdleConns: 50,
|
MaxIdleConns: 50,
|
||||||
DialTimeout: 5 * time.Second,
|
DialTimeout: 5 * time.Second,
|
||||||
},
|
},
|
||||||
ClickHouse: ClickHouseConfig{
|
Clickhouse: ClickhouseConfig{
|
||||||
DSN: "tcp://localhost:9000",
|
DSN: "tcp://localhost:9000",
|
||||||
|
Prometheus: promengine.Config{
|
||||||
|
RemoteReadConfig: promengine.RemoteReadConfig{
|
||||||
|
URL: &url.URL{
|
||||||
|
Scheme: "tcp",
|
||||||
|
Host: "localhost:9000",
|
||||||
|
Path: "/signoz_metrics",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
ActiveQueryTrackerConfig: promengine.ActiveQueryTrackerConfig{
|
||||||
|
Enabled: true,
|
||||||
|
Path: "",
|
||||||
|
MaxConcurrent: 20,
|
||||||
|
},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c Config) Validate() error {
|
func (c Config) Validate() error {
|
||||||
if c.Provider != "clickhouse" {
|
dsn, err := url.Parse(c.Clickhouse.DSN)
|
||||||
return fmt.Errorf("provider: %q is not supported", c.Provider)
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if c.Clickhouse.Prometheus.RemoteReadConfig.URL.Host != dsn.Host {
|
||||||
|
return fmt.Errorf("mismatch between host in prometheus.remote_read.url %q and clickhouse.dsn %q", c.Clickhouse.Prometheus.RemoteReadConfig.URL.Host, dsn.Host)
|
||||||
|
}
|
||||||
|
|
||||||
|
if c.Clickhouse.Prometheus.RemoteReadConfig.URL.Scheme != dsn.Scheme {
|
||||||
|
return fmt.Errorf("mismatch between scheme in prometheus.remote_read.url %q and clickhouse.dsn %q", c.Clickhouse.Prometheus.RemoteReadConfig.URL.Scheme, dsn.Scheme)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
@@ -13,7 +13,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func TestNewWithEnvProvider(t *testing.T) {
|
func TestNewWithEnvProvider(t *testing.T) {
|
||||||
t.Setenv("SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_DSN", "http://localhost:9000")
|
t.Setenv("SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_DSN", "tcp://localhost:9000")
|
||||||
t.Setenv("SIGNOZ_TELEMETRYSTORE_MAX__IDLE__CONNS", "60")
|
t.Setenv("SIGNOZ_TELEMETRYSTORE_MAX__IDLE__CONNS", "60")
|
||||||
t.Setenv("SIGNOZ_TELEMETRYSTORE_MAX__OPEN__CONNS", "150")
|
t.Setenv("SIGNOZ_TELEMETRYSTORE_MAX__OPEN__CONNS", "150")
|
||||||
t.Setenv("SIGNOZ_TELEMETRYSTORE_DIAL__TIMEOUT", "5s")
|
t.Setenv("SIGNOZ_TELEMETRYSTORE_DIAL__TIMEOUT", "5s")
|
||||||
@@ -33,22 +33,18 @@ func TestNewWithEnvProvider(t *testing.T) {
|
|||||||
)
|
)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
actual := &Config{}
|
actual := Config{}
|
||||||
err = conf.Unmarshal("telemetrystore", actual)
|
err = conf.Unmarshal("telemetrystore", &actual)
|
||||||
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
expected := &Config{
|
assert.NoError(t, actual.Validate())
|
||||||
Provider: "clickhouse",
|
|
||||||
Connection: ConnectionConfig{
|
expected := NewConfigFactory().New().(Config)
|
||||||
MaxOpenConns: 150,
|
expected.Provider = "clickhouse"
|
||||||
MaxIdleConns: 60,
|
expected.Connection.MaxOpenConns = 150
|
||||||
DialTimeout: 5 * time.Second,
|
expected.Connection.MaxIdleConns = 60
|
||||||
},
|
expected.Connection.DialTimeout = 5 * time.Second
|
||||||
ClickHouse: ClickHouseConfig{
|
expected.Clickhouse.DSN = "tcp://localhost:9000"
|
||||||
DSN: "http://localhost:9000",
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
assert.Equal(t, expected, actual)
|
assert.Equal(t, expected, actual)
|
||||||
}
|
}
|
||||||
@@ -74,14 +70,14 @@ func TestNewWithEnvProviderWithQuerySettings(t *testing.T) {
|
|||||||
)
|
)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
actual := &Config{}
|
actual := Config{}
|
||||||
err = conf.Unmarshal("telemetrystore", actual)
|
err = conf.Unmarshal("telemetrystore", &actual)
|
||||||
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
expected := &Config{
|
expected := Config{
|
||||||
ClickHouse: ClickHouseConfig{
|
Clickhouse: ClickhouseConfig{
|
||||||
QuerySettings: ClickHouseQuerySettings{
|
QuerySettings: QuerySettings{
|
||||||
MaxExecutionTime: 10,
|
MaxExecutionTime: 10,
|
||||||
MaxExecutionTimeLeaf: 10,
|
MaxExecutionTimeLeaf: 10,
|
||||||
TimeoutBeforeCheckingExecutionSpeed: 10,
|
TimeoutBeforeCheckingExecutionSpeed: 10,
|
||||||
@@ -91,5 +87,5 @@ func TestNewWithEnvProviderWithQuerySettings(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
assert.Equal(t, expected.ClickHouse.QuerySettings, actual.ClickHouse.QuerySettings)
|
assert.Equal(t, expected.Clickhouse.QuerySettings, actual.Clickhouse.QuerySettings)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,10 +5,12 @@ import (
|
|||||||
|
|
||||||
"github.com/ClickHouse/clickhouse-go/v2"
|
"github.com/ClickHouse/clickhouse-go/v2"
|
||||||
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
|
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
|
||||||
|
"github.com/SigNoz/signoz/pkg/promengine"
|
||||||
)
|
)
|
||||||
|
|
||||||
type TelemetryStore interface {
|
type TelemetryStore interface {
|
||||||
ClickHouseDB() clickhouse.Conn
|
ClickhouseDB() clickhouse.Conn
|
||||||
|
PrometheusEngine() promengine.PromEngine
|
||||||
}
|
}
|
||||||
|
|
||||||
type TelemetryStoreHook interface {
|
type TelemetryStoreHook interface {
|
||||||
|
|||||||
@@ -12,29 +12,20 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type provider struct {
|
type provider struct {
|
||||||
settings telemetrystore.ClickHouseQuerySettings
|
settings telemetrystore.QuerySettings
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewFactory() factory.ProviderFactory[telemetrystore.TelemetryStoreHook, telemetrystore.Config] {
|
func NewSettingsFactory() factory.ProviderFactory[telemetrystore.TelemetryStoreHook, telemetrystore.Config] {
|
||||||
return factory.NewProviderFactory(factory.MustNewName("clickhousesettings"), New)
|
return factory.NewProviderFactory(factory.MustNewName("settings"), NewSettings)
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config) (telemetrystore.TelemetryStoreHook, error) {
|
func NewSettings(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config) (telemetrystore.TelemetryStoreHook, error) {
|
||||||
return &provider{
|
return &provider{
|
||||||
settings: config.ClickHouse.QuerySettings,
|
settings: config.Clickhouse.QuerySettings,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *provider) BeforeQuery(ctx context.Context, query string, args ...interface{}) (context.Context, string, []interface{}) {
|
func (h *provider) BeforeQuery(ctx context.Context, query string, args ...interface{}) (context.Context, string, []interface{}) {
|
||||||
return h.clickHouseSettings(ctx, query, args...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *provider) AfterQuery(ctx context.Context, query string, args []interface{}, rows driver.Rows, err error) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// clickHouseSettings adds clickhouse settings to queries
|
|
||||||
func (h *provider) clickHouseSettings(ctx context.Context, query string, args ...interface{}) (context.Context, string, []interface{}) {
|
|
||||||
settings := clickhouse.Settings{}
|
settings := clickhouse.Settings{}
|
||||||
|
|
||||||
// Apply default settings
|
// Apply default settings
|
||||||
@@ -73,6 +64,9 @@ func (h *provider) clickHouseSettings(ctx context.Context, query string, args ..
|
|||||||
return ctx, query, args
|
return ctx, query, args
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h *provider) AfterQuery(ctx context.Context, query string, args []interface{}, rows driver.Rows, err error) {
|
||||||
|
}
|
||||||
|
|
||||||
func (h *provider) getLogComment(ctx context.Context) string {
|
func (h *provider) getLogComment(ctx context.Context) string {
|
||||||
// Get the key-value pairs from context for log comment
|
// Get the key-value pairs from context for log comment
|
||||||
kv := ctx.Value(common.LogCommentKey)
|
kv := ctx.Value(common.LogCommentKey)
|
||||||
|
|||||||
@@ -2,6 +2,9 @@ package telemetrystoretest
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/ClickHouse/clickhouse-go/v2"
|
"github.com/ClickHouse/clickhouse-go/v2"
|
||||||
|
"github.com/DATA-DOG/go-sqlmock"
|
||||||
|
"github.com/SigNoz/signoz/pkg/promengine"
|
||||||
|
"github.com/SigNoz/signoz/pkg/promengine/promenginetest"
|
||||||
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||||
cmock "github.com/srikanthccv/ClickHouse-go-mock"
|
cmock "github.com/srikanthccv/ClickHouse-go-mock"
|
||||||
)
|
)
|
||||||
@@ -10,28 +13,38 @@ var _ telemetrystore.TelemetryStore = (*Provider)(nil)
|
|||||||
|
|
||||||
// Provider represents a mock telemetry store provider for testing
|
// Provider represents a mock telemetry store provider for testing
|
||||||
type Provider struct {
|
type Provider struct {
|
||||||
mock cmock.ClickConnMockCommon
|
clickhouseDB cmock.ClickConnMockCommon
|
||||||
|
engine promengine.PromEngine
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates a new mock telemetry store provider
|
// New creates a new mock telemetry store provider
|
||||||
func New() (*Provider, error) {
|
func New(matcher sqlmock.QueryMatcher) (*Provider, error) {
|
||||||
options := &clickhouse.Options{} // Default options
|
clickhouseDB, err := cmock.NewClickHouseWithQueryMatcher(&clickhouse.Options{}, matcher)
|
||||||
mock, err := cmock.NewClickHouseNative(options)
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
engine, err := promenginetest.New()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return &Provider{
|
return &Provider{
|
||||||
mock: mock,
|
clickhouseDB: clickhouseDB,
|
||||||
|
engine: engine,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *Provider) PrometheusEngine() promengine.PromEngine {
|
||||||
|
return p.engine
|
||||||
|
}
|
||||||
|
|
||||||
// ClickhouseDB returns the mock Clickhouse connection
|
// ClickhouseDB returns the mock Clickhouse connection
|
||||||
func (p *Provider) ClickHouseDB() clickhouse.Conn {
|
func (p *Provider) ClickhouseDB() clickhouse.Conn {
|
||||||
return p.mock.(clickhouse.Conn)
|
return p.clickhouseDB.(clickhouse.Conn)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Mock returns the underlying Clickhouse mock instance for setting expectations
|
// Mock returns the underlying Clickhouse mock instance for setting expectations
|
||||||
func (p *Provider) Mock() cmock.ClickConnMockCommon {
|
func (p *Provider) Mock() cmock.ClickConnMockCommon {
|
||||||
return p.mock
|
return p.clickhouseDB
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ package telemetrystoretest
|
|||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/DATA-DOG/go-sqlmock"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -19,7 +20,7 @@ func TestNew(t *testing.T) {
|
|||||||
|
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
provider, err := New()
|
provider, err := New(sqlmock.QueryMatcherRegexp)
|
||||||
if tt.wantErr {
|
if tt.wantErr {
|
||||||
assert.Error(t, err)
|
assert.Error(t, err)
|
||||||
assert.Nil(t, provider)
|
assert.Nil(t, provider)
|
||||||
@@ -29,7 +30,8 @@ func TestNew(t *testing.T) {
|
|||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.NotNil(t, provider)
|
assert.NotNil(t, provider)
|
||||||
assert.NotNil(t, provider.Mock())
|
assert.NotNil(t, provider.Mock())
|
||||||
assert.NotNil(t, provider.ClickHouseDB())
|
assert.NotNil(t, provider.ClickhouseDB())
|
||||||
|
assert.NotNil(t, provider.PrometheusEngine())
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user