|
|
|
|
@@ -9,8 +9,15 @@ import (
|
|
|
|
|
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
type ClickhouseQuerySettings struct {
|
|
|
|
|
MaxExecutionTimeLeaf string
|
|
|
|
|
TimeoutBeforeCheckingExecutionSpeed string
|
|
|
|
|
MaxBytesToRead string
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type clickhouseConnWrapper struct {
|
|
|
|
|
conn clickhouse.Conn
|
|
|
|
|
conn clickhouse.Conn
|
|
|
|
|
settings ClickhouseQuerySettings
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c clickhouseConnWrapper) Close() error {
|
|
|
|
|
@@ -25,16 +32,46 @@ func (c clickhouseConnWrapper) Stats() driver.Stats {
|
|
|
|
|
return c.conn.Stats()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c clickhouseConnWrapper) logComment(ctx context.Context) context.Context {
|
|
|
|
|
func (c clickhouseConnWrapper) addClickHouseSettings(ctx context.Context, query string) context.Context {
|
|
|
|
|
settings := clickhouse.Settings{}
|
|
|
|
|
|
|
|
|
|
logComment := c.getLogComment(ctx)
|
|
|
|
|
if logComment != "" {
|
|
|
|
|
settings["log_comment"] = logComment
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// don't add resource restrictions for metrics and traces
|
|
|
|
|
if !strings.Contains(query, "signoz_logs") {
|
|
|
|
|
ctx = clickhouse.Context(ctx, clickhouse.WithSettings(settings))
|
|
|
|
|
return ctx
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if c.settings.MaxBytesToRead != "" {
|
|
|
|
|
settings["max_bytes_to_read"] = c.settings.MaxBytesToRead
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if c.settings.MaxExecutionTimeLeaf != "" {
|
|
|
|
|
settings["max_execution_time_leaf"] = c.settings.MaxExecutionTimeLeaf
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if c.settings.TimeoutBeforeCheckingExecutionSpeed != "" {
|
|
|
|
|
settings["timeout_before_checking_execution_speed"] = c.settings.TimeoutBeforeCheckingExecutionSpeed
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ctx = clickhouse.Context(ctx, clickhouse.WithSettings(settings))
|
|
|
|
|
return ctx
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c clickhouseConnWrapper) getLogComment(ctx context.Context) string {
|
|
|
|
|
// Get the key-value pairs from context for log comment
|
|
|
|
|
kv := ctx.Value("log_comment")
|
|
|
|
|
if kv == nil {
|
|
|
|
|
return ctx
|
|
|
|
|
return ""
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
logCommentKVs, ok := kv.(map[string]string)
|
|
|
|
|
if !ok {
|
|
|
|
|
return ctx
|
|
|
|
|
return ""
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
logComment := ""
|
|
|
|
|
@@ -43,34 +80,31 @@ func (c clickhouseConnWrapper) logComment(ctx context.Context) context.Context {
|
|
|
|
|
}
|
|
|
|
|
logComment = strings.TrimSuffix(logComment, ", ")
|
|
|
|
|
|
|
|
|
|
ctx = clickhouse.Context(ctx, clickhouse.WithSettings(clickhouse.Settings{
|
|
|
|
|
"log_comment": logComment,
|
|
|
|
|
}))
|
|
|
|
|
return ctx
|
|
|
|
|
return logComment
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c clickhouseConnWrapper) Query(ctx context.Context, query string, args ...interface{}) (driver.Rows, error) {
|
|
|
|
|
return c.conn.Query(c.logComment(ctx), query, args...)
|
|
|
|
|
return c.conn.Query(c.addClickHouseSettings(ctx, query), query, args...)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c clickhouseConnWrapper) QueryRow(ctx context.Context, query string, args ...interface{}) driver.Row {
|
|
|
|
|
return c.conn.QueryRow(c.logComment(ctx), query, args...)
|
|
|
|
|
return c.conn.QueryRow(c.addClickHouseSettings(ctx, query), query, args...)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c clickhouseConnWrapper) Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error {
|
|
|
|
|
return c.conn.Select(c.logComment(ctx), dest, query, args...)
|
|
|
|
|
return c.conn.Select(c.addClickHouseSettings(ctx, query), dest, query, args...)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c clickhouseConnWrapper) Exec(ctx context.Context, query string, args ...interface{}) error {
|
|
|
|
|
return c.conn.Exec(c.logComment(ctx), query, args...)
|
|
|
|
|
return c.conn.Exec(c.addClickHouseSettings(ctx, query), query, args...)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c clickhouseConnWrapper) AsyncInsert(ctx context.Context, query string, wait bool, args ...interface{}) error {
|
|
|
|
|
return c.conn.AsyncInsert(c.logComment(ctx), query, wait, args...)
|
|
|
|
|
return c.conn.AsyncInsert(c.addClickHouseSettings(ctx, query), query, wait, args...)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c clickhouseConnWrapper) PrepareBatch(ctx context.Context, query string, opts ...driver.PrepareBatchOption) (driver.Batch, error) {
|
|
|
|
|
return c.conn.PrepareBatch(c.logComment(ctx), query, opts...)
|
|
|
|
|
return c.conn.PrepareBatch(c.addClickHouseSettings(ctx, query), query, opts...)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c clickhouseConnWrapper) ServerVersion() (*driver.ServerVersion, error) {
|
|
|
|
|
|