Compare commits

...

7 Commits

Author SHA1 Message Date
Prashant Shahi
b3665ab0bd Merge branch 'develop' into feat/optimize-s3-archive 2024-04-19 10:19:28 +05:30
makeavish
394b30b0e9 chore: address review comments
Allow cluster to be a variable
2024-04-16 10:04:41 +05:30
Vishal Sharma
cf8cc5df4b Merge branch 'develop' into feat/optimize-s3-archive 2024-04-16 09:40:17 +05:30
Vishal Sharma
ce7fdacdc0 Merge branch 'develop' into feat/optimize-s3-archive 2024-04-12 12:10:22 +05:30
Prashant Shahi
982f4535e7 Merge branch 'develop' into feat/optimize-s3-archive 2024-04-10 15:17:58 +05:45
Prashant Shahi
8bc18646e2 Merge branch 'develop' into feat/optimize-s3-archive 2024-04-09 13:53:29 +05:45
makeavish
19d4c34143 feat: clickhouseOptimizeS3
Co-authored-by: Prashant Shahi <prashant@signoz.io>
2024-04-07 00:36:05 +05:30
4 changed files with 149 additions and 1 deletions

View File

@@ -13,6 +13,7 @@ import (
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.signoz.io/signoz/ee/query-service/app"
"go.signoz.io/signoz/pkg/query-service/app/clickhouseOptimizeS3"
"go.signoz.io/signoz/pkg/query-service/auth"
baseconst "go.signoz.io/signoz/pkg/query-service/constants"
"go.signoz.io/signoz/pkg/query-service/version"
@@ -143,6 +144,8 @@ func main() {
zap.L().Info("JWT secret key set successfully.")
}
clickhouseOptimizeS3.InitChOptimizer(cluster)
server, err := app.NewServer(serverOptions)
if err != nil {
zap.L().Fatal("Failed to create server", zap.Error(err))

View File

@@ -0,0 +1,142 @@
package clickhouseOptimizeS3
import (
"context"
"fmt"
"log"
"os"
"time"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/go-co-op/gocron"
"go.uber.org/zap"
)
// General settings for the ClickHouse S3 optimize job.
const (
// CH_OPTIMIZE_INTERVAL_IN_HOURS is the number of hours between two runs of
// the scheduled optimize job.
CH_OPTIMIZE_INTERVAL_IN_HOURS = 24
// CH_TIMEOUT_WAIT_IN_MINUTES is how many minutes the job sleeps after an
// OPTIMIZE statement fails with ClickHouse error code 159 (TIMEOUT_EXCEEDED).
CH_TIMEOUT_WAIT_IN_MINUTES = 30
// S3_DISK_TYPE is the value compared against the type column of
// system.disks when checking whether an S3 disk is present.
S3_DISK_TYPE = "s3"
)
// Error message templates
const (
// ERROR_RUNNING_CRON_JOB is the message logged by InitChOptimizer when the
// optimize cron job could not be started.
ERROR_RUNNING_CRON_JOB = "error running ch optimize cron job"
// ERROR_SCHEDULING_CRON_JOB is a format string with a single %s placeholder,
// used to build the error returned when scheduling the cron job fails.
ERROR_SCHEDULING_CRON_JOB = "error scheduling cron job for %s"
)
// InitChOptimizer connects to ClickHouse and, when at least one S3 disk is
// configured, schedules the recurring OPTIMIZE job for the given cluster.
//
// It returns nil both when the job was scheduled and when S3 is not enabled
// (in which case nothing is scheduled). Connection, detection and scheduling
// failures are logged and returned.
//
// The connection is handed over to the scheduled job only on success; on
// every other path it is closed before returning so that it is not leaked.
func InitChOptimizer(cluster string) error {
	chConn, err := initClickhouse()
	if err != nil {
		zap.L().Error("failed to initialize ClickHouse", zap.Error(err))
		return err
	}

	// The scheduled job keeps using chConn for the lifetime of the process,
	// so it must stay open only when scheduling actually succeeded.
	scheduled := false
	defer func() {
		if scheduled {
			return
		}
		if closeErr := chConn.Close(); closeErr != nil {
			zap.L().Warn("failed to close ClickHouse connection", zap.Error(closeErr))
		}
	}()

	s3Enabled, err := checkS3Enabled(chConn)
	if err != nil {
		zap.L().Error("failed to check if S3 is enabled", zap.Error(err))
		return err
	}
	if !s3Enabled {
		zap.L().Info("S3 is not enabled, skipping clickhouse s3 optimization")
		return nil
	}

	if err := runCronJobAsync(chConn, cluster); err != nil {
		zap.L().Error(ERROR_RUNNING_CRON_JOB, zap.Error(err))
		return err
	}

	scheduled = true
	zap.L().Info("clickhouseOptimizeS3 cron job started successfully")
	return nil
}
// initClickhouse opens a ClickHouse connection using the DSN stored in the
// ClickHouseUrl environment variable and verifies it with a ping.
//
// It returns the open connection, or a nil connection and the error when the
// DSN cannot be parsed, the connection cannot be opened, or the ping fails.
// Each failure is also logged here. The caller owns the returned connection
// and is responsible for closing it.
func initClickhouse() (driver.Conn, error) {
	datasource := os.Getenv("ClickHouseUrl")

	options, err := clickhouse.ParseDSN(datasource)
	if err != nil {
		zap.L().Error("failed to parse DSN", zap.Error(err))
		return nil, err
	}

	// Guard the index so an address-less DSN cannot panic on a log line.
	if len(options.Addr) > 0 {
		zap.L().Info("Connecting to Clickhouse", zap.String("at", options.Addr[0]))
	}

	db, err := clickhouse.Open(options)
	if err != nil {
		zap.L().Error("failed to initialize ClickHouse", zap.Error(err))
		return nil, err
	}

	if err := db.Ping(context.Background()); err != nil {
		zap.L().Error("failed to ping ClickHouse", zap.Error(err))
		// The connection is not returned to the caller on this path, so it
		// has to be released here.
		if closeErr := db.Close(); closeErr != nil {
			zap.L().Warn("failed to close ClickHouse connection", zap.Error(closeErr))
		}
		return nil, err
	}

	return db, nil
}
// runCronJobAsync registers optimizeTables on a UTC gocron scheduler to run
// every CH_OPTIMIZE_INTERVAL_IN_HOURS hours with the given connection and
// cluster name, then starts the scheduler without blocking the caller.
//
// It returns a wrapped error when the job cannot be registered; in that case
// the scheduler is not started. The scheduler is never stopped by this
// package.
func runCronJobAsync(chConn driver.Conn, cluster string) error {
	scheduler := gocron.NewScheduler(time.UTC)

	_, err := scheduler.Every(CH_OPTIMIZE_INTERVAL_IN_HOURS).Hour().Do(optimizeTables, chConn, cluster)
	if err != nil {
		// The template's %s names the job; the cause is wrapped with %w so
		// callers can still inspect it with errors.Is / errors.As.
		return fmt.Errorf(ERROR_SCHEDULING_CRON_JOB+": %w", "clickhouse s3 optimize", err)
	}

	scheduler.StartAsync()
	return nil
}
// optimizeTables runs
//
//	OPTIMIZE TABLE db.table ON CLUSTER cluster FINAL SETTINGS optimize_skip_merged_partitions=1;
//
// once for each table in a fixed list of SigNoz logs, metrics and traces
// tables, using conn.
//
// Failures never abort the run: every error is logged and the loop moves on
// to the next table. When ClickHouse reports TIMEOUT_EXCEEDED (code 159) the
// function additionally sleeps for CH_TIMEOUT_WAIT_IN_MINUTES minutes before
// continuing with the next table.
func optimizeTables(conn driver.Conn, cluster string) {
	// ClickHouse exception code for TIMEOUT_EXCEEDED.
	const timeoutExceededCode = 159

	// Each entry is db_name.table_name.
	tables := []string{
		"signoz_logs.logs",
		"signoz_metrics.samples_v2",
		"signoz_metrics.time_series_v4",
		"signoz_metrics.time_series_v3",
		"signoz_metrics.time_series_v2",
		"signoz_traces.usage_explorer",
		"signoz_traces.span_attributes",
		"signoz_traces.dependency_graph_minutes",
		"signoz_traces.dependency_graph_minutes_v2",
		"signoz_traces.signoz_error_index_v2",
		"signoz_traces.signoz_index_v2",
		"signoz_traces.signoz_spans",
		"signoz_traces.durationSort",
	}

	for _, table := range tables {
		query := "OPTIMIZE TABLE " + table + " ON CLUSTER " + cluster + " FINAL SETTINGS optimize_skip_merged_partitions=1;"

		err := conn.Exec(context.Background(), query)
		if err == nil {
			continue
		}

		if exception, ok := err.(*clickhouse.Exception); ok && exception.Code == timeoutExceededCode {
			// Make the long pause visible in the logs before sleeping.
			log.Println("Timeout while optimizing table, waiting before continuing: ", table, err)
			time.Sleep(CH_TIMEOUT_WAIT_IN_MINUTES * time.Minute)
			continue
		}

		// Covers both other ClickHouse exceptions and errors that are not
		// exceptions at all (e.g. connection failures), so nothing is dropped
		// silently.
		log.Println("Error while optimizing table: ", table, err)
	}
}
// checkS3Enabled reports whether system.disks contains at least one row whose
// type equals S3_DISK_TYPE.
//
// It returns false together with the error when the count query cannot be
// scanned into the result.
func checkS3Enabled(conn clickhouse.Conn) (bool, error) {
	countQuery := fmt.Sprintf("SELECT count() FROM system.disks where type='%v'", S3_DISK_TYPE)

	var diskCount uint64
	if err := conn.QueryRow(context.Background(), countQuery).Scan(&diskCount); err != nil {
		return false, err
	}

	return diskCount > 0, nil
}

View File

@@ -85,7 +85,7 @@ type namespaceConfig struct {
Connector Connector
}
// Connecto defines how to connect to the database
// Connector defines how to connect to the database
type Connector func(cfg *namespaceConfig) (clickhouse.Conn, error)
func defaultConnector(cfg *namespaceConfig) (clickhouse.Conn, error) {

View File

@@ -9,6 +9,7 @@ import (
"time"
"go.signoz.io/signoz/pkg/query-service/app"
"go.signoz.io/signoz/pkg/query-service/app/clickhouseOptimizeS3"
"go.signoz.io/signoz/pkg/query-service/auth"
"go.signoz.io/signoz/pkg/query-service/constants"
"go.signoz.io/signoz/pkg/query-service/version"
@@ -90,6 +91,8 @@ func main() {
zap.L().Info("JWT secret key set successfully.")
}
clickhouseOptimizeS3.InitChOptimizer(cluster)
server, err := app.NewServer(serverOptions)
if err != nil {
logger.Fatal("Failed to create server", zap.Error(err))