Compare commits

...

3 Commits

Author SHA1 Message Date
nityanandagohain
e4d9c4e239 fix: error check 2024-03-30 19:39:52 +05:30
nityanandagohain
781732f25a Merge remote-tracking branch 'origin/develop' into issue_4777 2024-03-30 19:22:20 +05:30
nityanandagohain
77e55a0ec9 feat: allow query restrictions for log queries 2024-03-30 18:41:53 +05:30
2 changed files with 72 additions and 17 deletions

View File

@@ -162,7 +162,14 @@ func NewReaderFromClickhouseConnection(
os.Exit(1) os.Exit(1)
} }
wrap := clickhouseConnWrapper{conn: db} wrap := clickhouseConnWrapper{
conn: db,
settings: ClickhouseQuerySettings{
MaxExecutionTimeLeaf: os.Getenv("ClickHouseMaxExecutionTimeLeaf"),
TimeoutBeforeCheckingExecutionSpeed: os.Getenv("ClickHouseTimeoutBeforeCheckingExecutionSpeed"),
MaxBytesToRead: os.Getenv("ClickHouseMaxBytesToRead"),
},
}
return &ClickHouseReader{ return &ClickHouseReader{
db: wrap, db: wrap,
@@ -4732,7 +4739,7 @@ func readRowsForTimeSeriesResult(rows driver.Rows, vars []interface{}, columnNam
series := v3.Series{Labels: seriesToAttrs[key], Points: points, GroupingSetsPoint: groupingSetsPoint, LabelsArray: labelsArray[key]} series := v3.Series{Labels: seriesToAttrs[key], Points: points, GroupingSetsPoint: groupingSetsPoint, LabelsArray: labelsArray[key]}
seriesList = append(seriesList, &series) seriesList = append(seriesList, &series)
} }
return seriesList, nil return seriesList, getPersonalisedError(rows.Err())
} }
func logComment(ctx context.Context) string { func logComment(ctx context.Context) string {
@@ -4823,10 +4830,24 @@ func (r *ClickHouseReader) GetListResultV3(ctx context.Context, query string) ([
rowList = append(rowList, &v3.Row{Timestamp: t, Data: row}) rowList = append(rowList, &v3.Row{Timestamp: t, Data: row})
} }
return rowList, nil return rowList, getPersonalisedError(rows.Err())
} }
func getPersonalisedError(err error) error {
if err == nil {
return nil
}
if strings.Contains(err.Error(), "code: 307") {
return errors.New("query is consuming too much resources, please reach out to the team")
}
if strings.Contains(err.Error(), "code: 159") {
return errors.New("Query is taking too long to run, please reach out to the team")
}
return err
}
func removeDuplicateUnderscoreAttributes(row map[string]interface{}) { func removeDuplicateUnderscoreAttributes(row map[string]interface{}) {
if val, ok := row["attributes_int64"]; ok { if val, ok := row["attributes_int64"]; ok {
attributes := val.(*map[string]int64) attributes := val.(*map[string]int64)

View File

@@ -9,8 +9,15 @@ import (
"github.com/ClickHouse/clickhouse-go/v2/lib/driver" "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
) )
type ClickhouseQuerySettings struct {
MaxExecutionTimeLeaf string
TimeoutBeforeCheckingExecutionSpeed string
MaxBytesToRead string
}
type clickhouseConnWrapper struct { type clickhouseConnWrapper struct {
conn clickhouse.Conn conn clickhouse.Conn
settings ClickhouseQuerySettings
} }
func (c clickhouseConnWrapper) Close() error { func (c clickhouseConnWrapper) Close() error {
@@ -25,16 +32,46 @@ func (c clickhouseConnWrapper) Stats() driver.Stats {
return c.conn.Stats() return c.conn.Stats()
} }
func (c clickhouseConnWrapper) logComment(ctx context.Context) context.Context { func (c clickhouseConnWrapper) addClickHouseSettings(ctx context.Context, query string) context.Context {
settings := clickhouse.Settings{}
logComment := c.getLogComment(ctx)
if logComment != "" {
settings["log_comment"] = logComment
}
// don't add resource restrictions for metrics and traces
if !strings.Contains(query, "signoz_logs") {
ctx = clickhouse.Context(ctx, clickhouse.WithSettings(settings))
return ctx
}
if c.settings.MaxBytesToRead != "" {
settings["max_bytes_to_read"] = c.settings.MaxBytesToRead
}
if c.settings.MaxExecutionTimeLeaf != "" {
settings["max_execution_time_leaf"] = c.settings.MaxExecutionTimeLeaf
}
if c.settings.TimeoutBeforeCheckingExecutionSpeed != "" {
settings["timeout_before_checking_execution_speed"] = c.settings.TimeoutBeforeCheckingExecutionSpeed
}
ctx = clickhouse.Context(ctx, clickhouse.WithSettings(settings))
return ctx
}
func (c clickhouseConnWrapper) getLogComment(ctx context.Context) string {
// Get the key-value pairs from context for log comment // Get the key-value pairs from context for log comment
kv := ctx.Value("log_comment") kv := ctx.Value("log_comment")
if kv == nil { if kv == nil {
return ctx return ""
} }
logCommentKVs, ok := kv.(map[string]string) logCommentKVs, ok := kv.(map[string]string)
if !ok { if !ok {
return ctx return ""
} }
logComment := "" logComment := ""
@@ -43,34 +80,31 @@ func (c clickhouseConnWrapper) logComment(ctx context.Context) context.Context {
} }
logComment = strings.TrimSuffix(logComment, ", ") logComment = strings.TrimSuffix(logComment, ", ")
ctx = clickhouse.Context(ctx, clickhouse.WithSettings(clickhouse.Settings{ return logComment
"log_comment": logComment,
}))
return ctx
} }
func (c clickhouseConnWrapper) Query(ctx context.Context, query string, args ...interface{}) (driver.Rows, error) { func (c clickhouseConnWrapper) Query(ctx context.Context, query string, args ...interface{}) (driver.Rows, error) {
return c.conn.Query(c.logComment(ctx), query, args...) return c.conn.Query(c.addClickHouseSettings(ctx, query), query, args...)
} }
func (c clickhouseConnWrapper) QueryRow(ctx context.Context, query string, args ...interface{}) driver.Row { func (c clickhouseConnWrapper) QueryRow(ctx context.Context, query string, args ...interface{}) driver.Row {
return c.conn.QueryRow(c.logComment(ctx), query, args...) return c.conn.QueryRow(c.addClickHouseSettings(ctx, query), query, args...)
} }
func (c clickhouseConnWrapper) Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error { func (c clickhouseConnWrapper) Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error {
return c.conn.Select(c.logComment(ctx), dest, query, args...) return c.conn.Select(c.addClickHouseSettings(ctx, query), dest, query, args...)
} }
func (c clickhouseConnWrapper) Exec(ctx context.Context, query string, args ...interface{}) error { func (c clickhouseConnWrapper) Exec(ctx context.Context, query string, args ...interface{}) error {
return c.conn.Exec(c.logComment(ctx), query, args...) return c.conn.Exec(c.addClickHouseSettings(ctx, query), query, args...)
} }
func (c clickhouseConnWrapper) AsyncInsert(ctx context.Context, query string, wait bool, args ...interface{}) error { func (c clickhouseConnWrapper) AsyncInsert(ctx context.Context, query string, wait bool, args ...interface{}) error {
return c.conn.AsyncInsert(c.logComment(ctx), query, wait, args...) return c.conn.AsyncInsert(c.addClickHouseSettings(ctx, query), query, wait, args...)
} }
func (c clickhouseConnWrapper) PrepareBatch(ctx context.Context, query string, opts ...driver.PrepareBatchOption) (driver.Batch, error) { func (c clickhouseConnWrapper) PrepareBatch(ctx context.Context, query string, opts ...driver.PrepareBatchOption) (driver.Batch, error) {
return c.conn.PrepareBatch(c.logComment(ctx), query, opts...) return c.conn.PrepareBatch(c.addClickHouseSettings(ctx, query), query, opts...)
} }
func (c clickhouseConnWrapper) ServerVersion() (*driver.ServerVersion, error) { func (c clickhouseConnWrapper) ServerVersion() (*driver.ServerVersion, error) {