Compare commits
9 Commits
main
...
v0.45.0-al
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5627172c2c | ||
|
|
dab4ccc5eb | ||
|
|
b3665ab0bd | ||
|
|
394b30b0e9 | ||
|
|
cf8cc5df4b | ||
|
|
ce7fdacdc0 | ||
|
|
982f4535e7 | ||
|
|
8bc18646e2 | ||
|
|
19d4c34143 |
@@ -13,6 +13,7 @@ import (
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
|
||||
"go.signoz.io/signoz/ee/query-service/app"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/clickhouseOptimizeS3"
|
||||
"go.signoz.io/signoz/pkg/query-service/auth"
|
||||
baseconst "go.signoz.io/signoz/pkg/query-service/constants"
|
||||
"go.signoz.io/signoz/pkg/query-service/version"
|
||||
@@ -143,6 +144,8 @@ func main() {
|
||||
zap.L().Info("JWT secret key set successfully.")
|
||||
}
|
||||
|
||||
clickhouseOptimizeS3.InitChOptimizer(cluster)
|
||||
|
||||
server, err := app.NewServer(serverOptions)
|
||||
if err != nil {
|
||||
zap.L().Fatal("Failed to create server", zap.Error(err))
|
||||
|
||||
142
pkg/query-service/app/clickhouseOptimizeS3/optimize.go
Normal file
142
pkg/query-service/app/clickhouseOptimizeS3/optimize.go
Normal file
@@ -0,0 +1,142 @@
|
||||
package clickhouseOptimizeS3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/ClickHouse/clickhouse-go/v2"
|
||||
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
|
||||
"github.com/go-co-op/gocron"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// General
|
||||
const (
|
||||
CH_OPTIMIZE_INTERVAL_IN_HOURS = 24
|
||||
CH_TIMEOUT_WAIT_IN_MINUTES = 30
|
||||
S3_DISK_TYPE = "s3"
|
||||
)
|
||||
|
||||
// Error message templates
|
||||
const (
|
||||
ERROR_RUNNING_CRON_JOB = "error running ch optimize cron job"
|
||||
ERROR_SCHEDULING_CRON_JOB = "error scheduling cron job for %s"
|
||||
)
|
||||
|
||||
func InitChOptimizer(cluster string) error {
|
||||
|
||||
chConn, err := initClickhouse()
|
||||
if err != nil {
|
||||
zap.L().Error("failed to initialize ClickHouse", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
if s3Enabled, err := checkS3Enabled(chConn); err != nil {
|
||||
zap.L().Error("failed to check if S3 is enabled", zap.Error(err))
|
||||
return err
|
||||
} else if !s3Enabled {
|
||||
zap.L().Info("S3 is not enabled, skipping clickhouse s3 optimization")
|
||||
return nil
|
||||
}
|
||||
|
||||
err = runCronJobAsync(chConn, cluster)
|
||||
if err != nil {
|
||||
zap.L().Error(ERROR_RUNNING_CRON_JOB, zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
zap.L().Info("clickhouseOptimizeS3 cron job started successfully")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func initClickhouse() (driver.Conn, error) {
|
||||
datasource := os.Getenv("ClickHouseUrl")
|
||||
ctx := context.Background()
|
||||
options, err := clickhouse.ParseDSN(datasource)
|
||||
if err != nil {
|
||||
zap.L().Error("failed to parse DSN", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
zap.L().Info("Connecting to Clickhouse", zap.String("at", options.Addr[0]))
|
||||
db, err := clickhouse.Open(options)
|
||||
if err != nil {
|
||||
zap.L().Error("failed to initialize ClickHouse", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := db.Ping(ctx); err != nil {
|
||||
zap.L().Error("failed to ping ClickHouse", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return db, nil
|
||||
}
|
||||
|
||||
func runCronJobAsync(chConn driver.Conn, cluster string) error {
|
||||
var s *gocron.Scheduler
|
||||
var err error
|
||||
|
||||
s = gocron.NewScheduler(time.UTC)
|
||||
_, err = s.Every(CH_OPTIMIZE_INTERVAL_IN_HOURS).Hour().Do(optimizeTables, chConn, cluster)
|
||||
if err != nil {
|
||||
return fmt.Errorf(ERROR_SCHEDULING_CRON_JOB, err)
|
||||
}
|
||||
|
||||
s.StartAsync()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func optimizeTables(conn driver.Conn, cluster string) {
|
||||
|
||||
// Array of db_name.table_name
|
||||
tables := []string{
|
||||
"signoz_logs.logs",
|
||||
"signoz_metrics.samples_v2",
|
||||
"signoz_metrics.time_series_v4",
|
||||
"signoz_metrics.time_series_v3",
|
||||
"signoz_metrics.time_series_v2",
|
||||
"signoz_traces.usage_explorer",
|
||||
"signoz_traces.span_attributes",
|
||||
"signoz_traces.dependency_graph_minutes",
|
||||
"signoz_traces.dependency_graph_minutes_v2",
|
||||
"signoz_traces.signoz_error_index_v2",
|
||||
"signoz_traces.signoz_index_v2",
|
||||
"signoz_traces.signoz_spans",
|
||||
"signoz_traces.durationSort",
|
||||
}
|
||||
for _, table := range tables {
|
||||
// run OPTIMIZE TABLE db_name.table_name ON CLUSTER cluster FINAL SETTINGS optimize_skip_merged_partitions=1;
|
||||
err := conn.Exec(context.Background(), "OPTIMIZE TABLE "+table+" ON CLUSTER "+cluster+" FINAL SETTINGS optimize_skip_merged_partitions=1;")
|
||||
|
||||
if err != nil {
|
||||
if exception, ok := err.(*clickhouse.Exception); ok {
|
||||
if exception.Code == 159 {
|
||||
// sleep for CH_TIMEOUT_WAIT_IN_MINUTES if there's TIMEOUT_EXCEEDED - 159 error
|
||||
time.Sleep(CH_TIMEOUT_WAIT_IN_MINUTES * time.Minute)
|
||||
} else {
|
||||
log.Println("Error while optimizing table: ", table, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func checkS3Enabled(conn clickhouse.Conn) (bool, error) {
|
||||
var s3DiskCount uint64
|
||||
ctx := context.Background()
|
||||
query := fmt.Sprintf("SELECT count() FROM system.disks where type='%v'", S3_DISK_TYPE)
|
||||
row := conn.QueryRow(ctx, query)
|
||||
if err := row.Scan(&s3DiskCount); err != nil {
|
||||
return false, err
|
||||
}
|
||||
if s3DiskCount > 0 {
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
@@ -85,7 +85,7 @@ type namespaceConfig struct {
|
||||
Connector Connector
|
||||
}
|
||||
|
||||
// Connecto defines how to connect to the database
|
||||
// Connector defines how to connect to the database
|
||||
type Connector func(cfg *namespaceConfig) (clickhouse.Conn, error)
|
||||
|
||||
func defaultConnector(cfg *namespaceConfig) (clickhouse.Conn, error) {
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"time"
|
||||
|
||||
"go.signoz.io/signoz/pkg/query-service/app"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/clickhouseOptimizeS3"
|
||||
"go.signoz.io/signoz/pkg/query-service/auth"
|
||||
"go.signoz.io/signoz/pkg/query-service/constants"
|
||||
"go.signoz.io/signoz/pkg/query-service/version"
|
||||
@@ -90,6 +91,8 @@ func main() {
|
||||
zap.L().Info("JWT secret key set successfully.")
|
||||
}
|
||||
|
||||
clickhouseOptimizeS3.InitChOptimizer(cluster)
|
||||
|
||||
server, err := app.NewServer(serverOptions)
|
||||
if err != nil {
|
||||
logger.Fatal("Failed to create server", zap.Error(err))
|
||||
|
||||
Reference in New Issue
Block a user