Compare commits
46 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a431b53af4 | ||
|
|
574cb5a531 | ||
|
|
37554c8c29 | ||
|
|
35b12bacbc | ||
|
|
a70cf322c7 | ||
|
|
baebb13d0b | ||
|
|
843d9e1a8c | ||
|
|
d91e7c1e80 | ||
|
|
4809dc064f | ||
|
|
e2ccc5cf26 | ||
|
|
5f35528780 | ||
|
|
a5bff98837 | ||
|
|
17646957dc | ||
|
|
767b9bfb39 | ||
|
|
0d152b0159 | ||
|
|
c5459f3456 | ||
|
|
ccd2ccacb9 | ||
|
|
097a561b38 | ||
|
|
7c07f599db | ||
|
|
556370263b | ||
|
|
bc61850c43 | ||
|
|
c1f86b17ab | ||
|
|
ea4c7ac51f | ||
|
|
e7269bb14b | ||
|
|
45901951fb | ||
|
|
08b9e9b9fa | ||
|
|
a41d413203 | ||
|
|
d83daa6085 | ||
|
|
8149bb5664 | ||
|
|
43a31221a7 | ||
|
|
7394c06fdf | ||
|
|
3e43a966c2 | ||
|
|
bba49c1b24 | ||
|
|
03d9c620f2 | ||
|
|
d4bdcb1f2d | ||
|
|
51794bae10 | ||
|
|
2a53b953a1 | ||
|
|
969d6b00e7 | ||
|
|
ab0ae8e6ad | ||
|
|
9676b7e068 | ||
|
|
8215cabf71 | ||
|
|
bab1399680 | ||
|
|
0ca886e213 | ||
|
|
9385029f5c | ||
|
|
f6ac729e70 | ||
|
|
59d3198b80 |
@@ -374,6 +374,7 @@ func (s *Server) createPublicServer(apiHandler *api.APIHandler, web web.Web) (*h
|
||||
apiHandler.RegisterQueryRangeV4Routes(r, am)
|
||||
apiHandler.RegisterWebSocketPaths(r, am)
|
||||
apiHandler.RegisterMessagingQueuesRoutes(r, am)
|
||||
apiHandler.MetricExplorerRoutes(r, am)
|
||||
|
||||
c := cors.New(cors.Options{
|
||||
AllowedOrigins: []string{"*"},
|
||||
|
||||
2
go.mod
2
go.mod
@@ -72,6 +72,7 @@ require (
|
||||
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842
|
||||
golang.org/x/net v0.33.0
|
||||
golang.org/x/oauth2 v0.24.0
|
||||
golang.org/x/sync v0.10.0
|
||||
golang.org/x/text v0.21.0
|
||||
google.golang.org/grpc v1.67.1
|
||||
google.golang.org/protobuf v1.35.2
|
||||
@@ -263,7 +264,6 @@ require (
|
||||
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
|
||||
go.uber.org/atomic v1.11.0 // indirect
|
||||
golang.org/x/mod v0.22.0 // indirect
|
||||
golang.org/x/sync v0.10.0 // indirect
|
||||
golang.org/x/sys v0.29.0 // indirect
|
||||
golang.org/x/time v0.6.0 // indirect
|
||||
golang.org/x/tools v0.28.0 // indirect
|
||||
|
||||
@@ -16,6 +16,8 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"go.signoz.io/signoz/pkg/query-service/model/metrics_explorer"
|
||||
|
||||
"github.com/go-kit/log"
|
||||
"github.com/go-kit/log/level"
|
||||
"github.com/google/uuid"
|
||||
@@ -1141,7 +1143,7 @@ func (r *ClickHouseReader) GetUsage(ctx context.Context, queryParams *model.GetU
|
||||
|
||||
func (r *ClickHouseReader) SearchTracesV2(ctx context.Context, params *model.SearchTracesParams,
|
||||
smartTraceAlgorithm func(payload []model.SearchSpanResponseItem, targetSpanId string,
|
||||
levelUp int, levelDown int, spanLimit int) ([]model.SearchSpansResult, error)) (*[]model.SearchSpansResult, error) {
|
||||
levelUp int, levelDown int, spanLimit int) ([]model.SearchSpansResult, error)) (*[]model.SearchSpansResult, error) {
|
||||
searchSpansResult := []model.SearchSpansResult{
|
||||
{
|
||||
Columns: []string{"__time", "SpanId", "TraceId", "ServiceName", "Name", "Kind", "DurationNano", "TagsKeys", "TagsValues", "References", "Events", "HasError", "StatusMessage", "StatusCodeString", "SpanKind"},
|
||||
@@ -1289,7 +1291,7 @@ func (r *ClickHouseReader) SearchTracesV2(ctx context.Context, params *model.Sea
|
||||
|
||||
func (r *ClickHouseReader) SearchTraces(ctx context.Context, params *model.SearchTracesParams,
|
||||
smartTraceAlgorithm func(payload []model.SearchSpanResponseItem, targetSpanId string,
|
||||
levelUp int, levelDown int, spanLimit int) ([]model.SearchSpansResult, error)) (*[]model.SearchSpansResult, error) {
|
||||
levelUp int, levelDown int, spanLimit int) ([]model.SearchSpansResult, error)) (*[]model.SearchSpansResult, error) {
|
||||
|
||||
if r.useTraceNewSchema {
|
||||
return r.SearchTracesV2(ctx, params, smartTraceAlgorithm)
|
||||
@@ -5594,3 +5596,828 @@ func (r *ClickHouseReader) SubscribeToQueryProgress(
|
||||
) (<-chan model.QueryProgress, func(), *model.ApiError) {
|
||||
return r.queryProgressTracker.SubscribeToQueryProgress(queryId)
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetAllMetricFilterAttributeKeys(ctx context.Context, req *metrics_explorer.FilterKeyRequest, skipDotNames bool) (*[]v3.AttributeKey, *model.ApiError) {
|
||||
var rows driver.Rows
|
||||
var response []v3.AttributeKey
|
||||
query := fmt.Sprintf("SELECT arrayJoin(tagKeys) AS distinctTagKey FROM (SELECT JSONExtractKeys(labels) AS tagKeys FROM %s.%s WHERE unix_milli >= $1 GROUP BY tagKeys) WHERE distinctTagKey ILIKE $2 AND distinctTagKey NOT LIKE '\\_\\_%%' GROUP BY distinctTagKey", signozMetricDBName, signozTSTableNameV41Day)
|
||||
if req.Limit != 0 {
|
||||
query = query + fmt.Sprintf(" LIMIT %d;", req.Limit)
|
||||
}
|
||||
rows, err := r.db.Query(ctx, query, common.PastDayRoundOff(), fmt.Sprintf("%%%s%%", req.SearchText))
|
||||
if err != nil {
|
||||
zap.L().Error("Error while executing query", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
|
||||
}
|
||||
|
||||
var attributeKey string
|
||||
for rows.Next() {
|
||||
if err := rows.Scan(&attributeKey); err != nil {
|
||||
return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
|
||||
}
|
||||
if skipDotNames && strings.Contains(attributeKey, ".") {
|
||||
continue
|
||||
}
|
||||
key := v3.AttributeKey{
|
||||
Key: attributeKey,
|
||||
DataType: v3.AttributeKeyDataTypeString, // https://github.com/OpenObservability/OpenMetrics/blob/main/proto/openmetrics_data_model.proto#L64-L72.
|
||||
Type: v3.AttributeKeyTypeTag,
|
||||
IsColumn: false,
|
||||
}
|
||||
response = append(response, key)
|
||||
}
|
||||
return &response, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetAllMetricFilterAttributeValues(ctx context.Context, req *metrics_explorer.FilterValueRequest) ([]string, *model.ApiError) {
|
||||
var query string
|
||||
var err error
|
||||
var rows driver.Rows
|
||||
var attributeValues []string
|
||||
|
||||
query = fmt.Sprintf("SELECT JSONExtractString(labels, $1) AS tagValue FROM %s.%s WHERE JSONExtractString(labels, $2) ILIKE $3 AND unix_milli >= $4 GROUP BY tagValue", signozMetricDBName, signozTSTableNameV41Day)
|
||||
if req.Limit != 0 {
|
||||
query = query + fmt.Sprintf(" LIMIT %d;", req.Limit)
|
||||
}
|
||||
rows, err = r.db.Query(ctx, query, req.FilterKey, req.FilterKey, fmt.Sprintf("%%%s%%", req.SearchText), common.PastDayRoundOff())
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error while executing query", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var atrributeValue string
|
||||
for rows.Next() {
|
||||
if err := rows.Scan(&atrributeValue); err != nil {
|
||||
return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
|
||||
}
|
||||
attributeValues = append(attributeValues, atrributeValue)
|
||||
}
|
||||
return attributeValues, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetAllMetricFilterUnits(ctx context.Context, req *metrics_explorer.FilterValueRequest) ([]string, *model.ApiError) {
|
||||
var rows driver.Rows
|
||||
var response []string
|
||||
query := fmt.Sprintf("SELECT DISTINCT unit FROM %s.%s WHERE unit ILIKE $1 AND unit IS NOT NULL ORDER BY unit", signozMetricDBName, signozTSTableNameV41Day)
|
||||
if req.Limit != 0 {
|
||||
query = query + fmt.Sprintf(" LIMIT %d;", req.Limit)
|
||||
}
|
||||
|
||||
rows, err := r.db.Query(ctx, query, fmt.Sprintf("%%%s%%", req.SearchText))
|
||||
if err != nil {
|
||||
zap.L().Error("Error while executing query", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
|
||||
}
|
||||
|
||||
var attributeKey string
|
||||
for rows.Next() {
|
||||
if err := rows.Scan(&attributeKey); err != nil {
|
||||
return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
|
||||
}
|
||||
response = append(response, attributeKey)
|
||||
}
|
||||
return response, nil
|
||||
}
|
||||
func (r *ClickHouseReader) GetAllMetricFilterTypes(ctx context.Context, req *metrics_explorer.FilterValueRequest) ([]string, *model.ApiError) {
|
||||
var rows driver.Rows
|
||||
var response []string
|
||||
query := fmt.Sprintf("SELECT DISTINCT type FROM %s.%s WHERE type ILIKE $1 AND type IS NOT NULL ORDER BY type", signozMetricDBName, signozTSTableNameV41Day)
|
||||
if req.Limit != 0 {
|
||||
query = query + fmt.Sprintf(" LIMIT %d;", req.Limit)
|
||||
}
|
||||
|
||||
rows, err := r.db.Query(ctx, query, fmt.Sprintf("%%%s%%", req.SearchText))
|
||||
if err != nil {
|
||||
zap.L().Error("Error while executing query", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
|
||||
}
|
||||
|
||||
var attributeKey string
|
||||
for rows.Next() {
|
||||
if err := rows.Scan(&attributeKey); err != nil {
|
||||
return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
|
||||
}
|
||||
response = append(response, attributeKey)
|
||||
}
|
||||
return response, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetMetricsDataPointsAndLastReceived(ctx context.Context, metricName string) (uint64, uint64, *model.ApiError) {
|
||||
query := fmt.Sprintf("SELECT COUNT(*) AS data_points, MAX(unix_milli) AS last_received_time FROM %s.%s WHERE metric_name = ?", signozMetricDBName, signozSampleTableName)
|
||||
var lastRecievedTimestamp int64 // Changed from uint64 to int64
|
||||
var dataPoints uint64
|
||||
err := r.db.QueryRow(ctx, query, metricName).Scan(&dataPoints, &lastRecievedTimestamp)
|
||||
if err != nil {
|
||||
return 0, 0, &model.ApiError{Typ: "ClickHouseError", Err: err}
|
||||
}
|
||||
return dataPoints, uint64(lastRecievedTimestamp), nil // Convert to uint64 before returning
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetTotalTimeSeriesForMetricName(ctx context.Context, metricName string) (uint64, *model.ApiError) {
|
||||
query := fmt.Sprintf(`SELECT
|
||||
uniq(fingerprint) AS timeSeriesCount
|
||||
FROM %s.%s
|
||||
WHERE metric_name = ?;`, signozMetricDBName, signozTSTableNameV41Week)
|
||||
var timeSeriesCount uint64
|
||||
err := r.db.QueryRow(ctx, query, metricName).Scan(&timeSeriesCount)
|
||||
if err != nil {
|
||||
return 0, &model.ApiError{Typ: "ClickHouseError", Err: err}
|
||||
}
|
||||
return timeSeriesCount, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetAttributesForMetricName(ctx context.Context, metricName string) (*[]metrics_explorer.Attribute, *model.ApiError) {
|
||||
query := fmt.Sprintf(`
|
||||
SELECT
|
||||
kv.1 AS key,
|
||||
arrayMap(x -> trim(BOTH '\"' FROM x), groupUniqArray(10000)(kv.2)) AS values,
|
||||
length(groupUniqArray(10000)(kv.2)) AS valueCount
|
||||
FROM %s.%s
|
||||
ARRAY JOIN arrayFilter(x -> NOT startsWith(x.1, '__'), JSONExtractKeysAndValuesRaw(labels)) AS kv
|
||||
WHERE metric_name = ?
|
||||
GROUP BY kv.1
|
||||
ORDER BY valueCount DESC;
|
||||
`, signozMetricDBName, signozTSTableNameV41Week)
|
||||
|
||||
rows, err := r.db.Query(ctx, query, metricName)
|
||||
if err != nil {
|
||||
return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
|
||||
}
|
||||
defer rows.Close() // Ensure the rows are closed
|
||||
|
||||
var attributesList []metrics_explorer.Attribute
|
||||
for rows.Next() {
|
||||
var key string
|
||||
var values []string
|
||||
var valueCount uint64
|
||||
|
||||
// Manually scan each value into its corresponding variable
|
||||
if err := rows.Scan(&key, &values, &valueCount); err != nil {
|
||||
return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
|
||||
}
|
||||
|
||||
// Append the scanned values into the struct
|
||||
attributesList = append(attributesList, metrics_explorer.Attribute{
|
||||
Key: key,
|
||||
Value: values,
|
||||
ValueCount: valueCount,
|
||||
})
|
||||
}
|
||||
|
||||
// Handle any errors encountered while scanning rows
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
|
||||
}
|
||||
|
||||
return &attributesList, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetActiveTimeSeriesForMetricName(ctx context.Context, metricName string, duration time.Duration) (uint64, *model.ApiError) {
|
||||
milli := time.Now().Add(-duration).UnixMilli()
|
||||
query := fmt.Sprintf("SELECT uniq(fingerprint) FROM %s.%s WHERE metric_name = '%s' and unix_milli >= ?", signozMetricDBName, signozTSTableNameV4, metricName)
|
||||
var timeSeries uint64
|
||||
// Using QueryRow instead of Select since we're only expecting a single value
|
||||
err := r.db.QueryRow(ctx, query, milli).Scan(&timeSeries)
|
||||
if err != nil {
|
||||
return 0, &model.ApiError{Typ: "ClickHouseError", Err: err}
|
||||
}
|
||||
return timeSeries, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) ListSummaryMetrics(ctx context.Context, req *metrics_explorer.SummaryListMetricsRequest) (*metrics_explorer.SummaryListMetricsResponse, *model.ApiError) {
|
||||
var args []interface{}
|
||||
|
||||
conditions, _ := utils.BuildFilterConditions(&req.Filters, "t")
|
||||
whereClause := ""
|
||||
if conditions != nil {
|
||||
whereClause = "AND " + strings.Join(conditions, " AND ")
|
||||
}
|
||||
|
||||
firstQueryLimit := req.Limit
|
||||
dataPointsOrder := false
|
||||
var orderByClauseFirstQuery string
|
||||
if req.OrderBy.ColumnName == "samples" {
|
||||
dataPointsOrder = true
|
||||
orderByClauseFirstQuery = fmt.Sprintf("ORDER BY timeseries %s", req.OrderBy.Order)
|
||||
if req.Limit < 50 {
|
||||
firstQueryLimit = 50
|
||||
}
|
||||
} else if req.OrderBy.ColumnName == "metric_type" {
|
||||
orderByClauseFirstQuery = fmt.Sprintf("ORDER BY type %s", req.OrderBy.Order)
|
||||
} else {
|
||||
orderByClauseFirstQuery = fmt.Sprintf("ORDER BY %s %s", req.OrderBy.ColumnName, req.OrderBy.Order)
|
||||
}
|
||||
|
||||
start, end, tsTable, localTsTable := utils.WhichTSTableToUse(req.Start, req.EndD)
|
||||
sampleTable, countExp := utils.WhichSampleTableToUse(req.Start, req.EndD)
|
||||
|
||||
metricsQuery := fmt.Sprintf(
|
||||
`SELECT
|
||||
t.metric_name AS metric_name,
|
||||
ANY_VALUE(t.description) AS description,
|
||||
ANY_VALUE(t.type) AS type,
|
||||
ANY_VALUE(t.unit),
|
||||
uniq(t.fingerprint) AS timeseries,
|
||||
uniq(metric_name) OVER() AS total
|
||||
FROM %s.%s AS t
|
||||
WHERE unix_milli BETWEEN ? AND ?
|
||||
%s
|
||||
GROUP BY t.metric_name
|
||||
%s
|
||||
LIMIT %d OFFSET %d;`,
|
||||
signozMetricDBName, tsTable, whereClause, orderByClauseFirstQuery, firstQueryLimit, req.Offset)
|
||||
|
||||
args = append(args, start, end)
|
||||
valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
|
||||
rows, err := r.db.Query(valueCtx, metricsQuery, args...)
|
||||
if err != nil {
|
||||
zap.L().Error("Error executing metrics query", zap.Error(err))
|
||||
return &metrics_explorer.SummaryListMetricsResponse{}, &model.ApiError{Typ: "ClickHouseError", Err: err}
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var response metrics_explorer.SummaryListMetricsResponse
|
||||
var metricNames []string
|
||||
|
||||
for rows.Next() {
|
||||
var metric metrics_explorer.MetricDetail
|
||||
if err := rows.Scan(&metric.MetricName, &metric.Description, &metric.Type, &metric.Unit, &metric.TimeSeries, &response.Total); err != nil {
|
||||
zap.L().Error("Error scanning metric row", zap.Error(err))
|
||||
return &response, &model.ApiError{Typ: "ClickHouseError", Err: err}
|
||||
}
|
||||
metricNames = append(metricNames, metric.MetricName)
|
||||
response.Metrics = append(response.Metrics, metric)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
zap.L().Error("Error iterating over metric rows", zap.Error(err))
|
||||
return &response, &model.ApiError{Typ: "ClickHouseError", Err: err}
|
||||
}
|
||||
|
||||
if len(metricNames) == 0 {
|
||||
return &response, nil
|
||||
}
|
||||
|
||||
metricsList := "'" + strings.Join(metricNames, "', '") + "'"
|
||||
if dataPointsOrder {
|
||||
orderByClauseFirstQuery = fmt.Sprintf("ORDER BY s.samples %s", req.OrderBy.Order)
|
||||
} else {
|
||||
orderByClauseFirstQuery = ""
|
||||
}
|
||||
|
||||
sampleQuery := fmt.Sprintf(
|
||||
`SELECT
|
||||
s.samples,
|
||||
s.metric_name,
|
||||
s.unix_milli AS lastReceived
|
||||
FROM (
|
||||
SELECT
|
||||
metric_name,
|
||||
%s AS samples,
|
||||
max(unix_milli) as unix_milli
|
||||
FROM %s.%s
|
||||
WHERE fingerprint IN (
|
||||
SELECT fingerprint
|
||||
FROM %s.%s
|
||||
WHERE unix_milli BETWEEN ? AND ?
|
||||
%s
|
||||
AND metric_name IN (%s)
|
||||
GROUP BY fingerprint
|
||||
)
|
||||
AND metric_name in (%s)
|
||||
GROUP BY metric_name
|
||||
) AS s
|
||||
%s
|
||||
LIMIT %d OFFSET %d;`,
|
||||
countExp, signozMetricDBName, sampleTable, signozMetricDBName, localTsTable,
|
||||
whereClause, metricsList, metricsList, orderByClauseFirstQuery,
|
||||
req.Limit, req.Offset)
|
||||
|
||||
args = append(args, start, end)
|
||||
rows, err = r.db.Query(valueCtx, sampleQuery, args...)
|
||||
if err != nil {
|
||||
zap.L().Error("Error executing samples query", zap.Error(err))
|
||||
return &response, &model.ApiError{Typ: "ClickHouseError", Err: err}
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
samplesMap := make(map[string]uint64)
|
||||
lastReceivedMap := make(map[string]int64)
|
||||
|
||||
for rows.Next() {
|
||||
var samples uint64
|
||||
var metricName string
|
||||
var lastReceived int64
|
||||
if err := rows.Scan(&samples, &metricName, &lastReceived); err != nil {
|
||||
zap.L().Error("Error scanning sample row", zap.Error(err))
|
||||
return &response, &model.ApiError{Typ: "ClickHouseError", Err: err}
|
||||
}
|
||||
samplesMap[metricName] = samples
|
||||
lastReceivedMap[metricName] = lastReceived
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
zap.L().Error("Error iterating over sample rows", zap.Error(err))
|
||||
return &response, &model.ApiError{Typ: "ClickHouseError", Err: err}
|
||||
}
|
||||
|
||||
var filteredMetrics []metrics_explorer.MetricDetail
|
||||
for i := range response.Metrics {
|
||||
if samples, exists := samplesMap[response.Metrics[i].MetricName]; exists {
|
||||
response.Metrics[i].Samples = samples
|
||||
if lastReceived, exists := lastReceivedMap[response.Metrics[i].MetricName]; exists {
|
||||
response.Metrics[i].LastReceived = lastReceived
|
||||
}
|
||||
filteredMetrics = append(filteredMetrics, response.Metrics[i])
|
||||
}
|
||||
}
|
||||
response.Metrics = filteredMetrics
|
||||
|
||||
if dataPointsOrder {
|
||||
sort.Slice(response.Metrics, func(i, j int) bool {
|
||||
return response.Metrics[i].Samples > response.Metrics[j].Samples
|
||||
})
|
||||
}
|
||||
|
||||
return &response, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetMetricsTimeSeriesPercentage(ctx context.Context, req *metrics_explorer.TreeMapMetricsRequest) (*[]metrics_explorer.TreeMapResponseItem, *model.ApiError) {
|
||||
var args []interface{}
|
||||
|
||||
// Build filters dynamically
|
||||
conditions, _ := utils.BuildFilterConditions(&req.Filters, "")
|
||||
whereClause := ""
|
||||
if len(conditions) > 0 {
|
||||
whereClause = "AND " + strings.Join(conditions, " AND ")
|
||||
}
|
||||
start, end, tsTable, _ := utils.WhichTSTableToUse(req.Start, req.EndD)
|
||||
|
||||
// Construct the query without backticks
|
||||
query := fmt.Sprintf(`
|
||||
SELECT
|
||||
metric_name,
|
||||
total_value,
|
||||
(total_value * 100.0 / total_time_series) AS percentage
|
||||
FROM (
|
||||
SELECT
|
||||
metric_name,
|
||||
uniq(fingerprint) AS total_value,
|
||||
(SELECT uniq(fingerprint)
|
||||
FROM %s.%s
|
||||
WHERE unix_milli BETWEEN ? AND ?) AS total_time_series
|
||||
FROM %s.%s
|
||||
WHERE unix_milli BETWEEN ? AND ? %s
|
||||
GROUP BY metric_name
|
||||
)
|
||||
ORDER BY percentage DESC
|
||||
LIMIT %d;`,
|
||||
signozMetricDBName,
|
||||
tsTable,
|
||||
signozMetricDBName,
|
||||
tsTable,
|
||||
whereClause,
|
||||
req.Limit,
|
||||
)
|
||||
|
||||
args = append(args,
|
||||
start, end, // For total_cardinality subquery
|
||||
start, end, // For main query
|
||||
)
|
||||
|
||||
valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
|
||||
rows, err := r.db.Query(valueCtx, query, args...)
|
||||
if err != nil {
|
||||
zap.L().Error("Error executing cardinality query", zap.Error(err), zap.String("query", query))
|
||||
return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var heatmap []metrics_explorer.TreeMapResponseItem
|
||||
for rows.Next() {
|
||||
var item metrics_explorer.TreeMapResponseItem
|
||||
if err := rows.Scan(&item.MetricName, &item.TotalValue, &item.Percentage); err != nil {
|
||||
zap.L().Error("Error scanning row", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
|
||||
}
|
||||
heatmap = append(heatmap, item)
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
zap.L().Error("Error iterating over rows", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
|
||||
}
|
||||
|
||||
return &heatmap, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetMetricsSamplesPercentage(ctx context.Context, req *metrics_explorer.TreeMapMetricsRequest) (*[]metrics_explorer.TreeMapResponseItem, *model.ApiError) {
|
||||
var args []interface{}
|
||||
|
||||
// Build the filter conditions
|
||||
conditions, _ := utils.BuildFilterConditions(&req.Filters, "t")
|
||||
whereClause := ""
|
||||
if conditions != nil {
|
||||
whereClause = "AND " + strings.Join(conditions, " AND ")
|
||||
}
|
||||
|
||||
// Determine time range and tables to use
|
||||
start, end, tsTable, localTsTable := utils.WhichTSTableToUse(req.Start, req.EndD)
|
||||
sampleTable, countExp := utils.WhichSampleTableToUse(req.Start, req.EndD)
|
||||
|
||||
// Construct the metrics query
|
||||
queryLimit := 50 + req.Limit
|
||||
metricsQuery := fmt.Sprintf(
|
||||
`SELECT
|
||||
t.metric_name AS metric_name,
|
||||
uniq(t.fingerprint) AS timeSeries
|
||||
FROM %s.%s AS t
|
||||
WHERE unix_milli BETWEEN ? AND ?
|
||||
%s
|
||||
GROUP BY t.metric_name
|
||||
ORDER BY timeSeries DESC
|
||||
LIMIT %d;`,
|
||||
signozMetricDBName, tsTable, whereClause, queryLimit,
|
||||
)
|
||||
|
||||
args = append(args, start, end)
|
||||
valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
|
||||
|
||||
// Execute the metrics query
|
||||
rows, err := r.db.Query(valueCtx, metricsQuery, args...)
|
||||
if err != nil {
|
||||
zap.L().Error("Error executing metrics query", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
// Process the query results
|
||||
var metricNames []string
|
||||
for rows.Next() {
|
||||
var metricName string
|
||||
var timeSeries uint64
|
||||
if err := rows.Scan(&metricName, &timeSeries); err != nil {
|
||||
zap.L().Error("Error scanning metric row", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
|
||||
}
|
||||
metricNames = append(metricNames, metricName)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
zap.L().Error("Error iterating over metric rows", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
|
||||
}
|
||||
|
||||
// If no metrics found, return early
|
||||
if len(metricNames) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Format metric names for query
|
||||
metricsList := "'" + strings.Join(metricNames, "', '") + "'"
|
||||
|
||||
// Construct the sample percentage query
|
||||
sampleQuery := fmt.Sprintf(
|
||||
`WITH TotalSamples AS (
|
||||
SELECT %s AS total_samples
|
||||
FROM %s.%s
|
||||
WHERE unix_milli BETWEEN ? AND ?
|
||||
)
|
||||
SELECT
|
||||
s.samples,
|
||||
s.metric_name,
|
||||
COALESCE((s.samples * 100.0 / t.total_samples), 0) AS percentage
|
||||
FROM
|
||||
(
|
||||
SELECT
|
||||
metric_name,
|
||||
%s AS samples
|
||||
FROM %s.%s
|
||||
WHERE fingerprint IN
|
||||
(
|
||||
SELECT fingerprint
|
||||
FROM %s.%s
|
||||
WHERE unix_milli BETWEEN ? AND ?
|
||||
%s
|
||||
AND metric_name IN (%s)
|
||||
GROUP BY fingerprint
|
||||
)
|
||||
AND metric_name IN (%s)
|
||||
GROUP BY metric_name
|
||||
) AS s
|
||||
JOIN TotalSamples t ON 1 = 1
|
||||
ORDER BY percentage DESC
|
||||
LIMIT %d;`,
|
||||
countExp, signozMetricDBName, sampleTable, // Total samples
|
||||
countExp, signozMetricDBName, sampleTable, // Inner select samples
|
||||
signozMetricDBName, localTsTable, whereClause, metricsList, // Subquery conditions
|
||||
metricsList, req.Limit, // Final conditions
|
||||
)
|
||||
|
||||
args = append(args, start, end)
|
||||
|
||||
// Execute the sample percentage query
|
||||
rows, err = r.db.Query(valueCtx, sampleQuery, args...)
|
||||
if err != nil {
|
||||
zap.L().Error("Error executing samples query", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
// Process the results into a response slice
|
||||
var heatmap []metrics_explorer.TreeMapResponseItem
|
||||
for rows.Next() {
|
||||
var item metrics_explorer.TreeMapResponseItem
|
||||
if err := rows.Scan(&item.TotalValue, &item.MetricName, &item.Percentage); err != nil {
|
||||
zap.L().Error("Error scanning row", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
|
||||
}
|
||||
heatmap = append(heatmap, item)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
zap.L().Error("Error iterating over sample rows", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
|
||||
}
|
||||
|
||||
return &heatmap, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetRelatedMetrics(ctx context.Context, req *metrics_explorer.RelatedMetricsRequest) (map[string]metrics_explorer.RelatedMetricsScore, *model.ApiError) {
|
||||
// First query: Compute name similarity and get candidate metric names.
|
||||
query := fmt.Sprintf(`
|
||||
SELECT
|
||||
metric_name,
|
||||
any(type) as type,
|
||||
1 - (levenshteinDistance(?, metric_name) / greatest(NULLIF(length(?), 0), NULLIF(length(metric_name), 0))) AS name_similarity
|
||||
FROM %s.%s
|
||||
WHERE metric_name != ?
|
||||
AND unix_milli BETWEEN ? AND ?
|
||||
GROUP BY metric_name
|
||||
ORDER BY name_similarity DESC
|
||||
LIMIT 30;
|
||||
`, signozMetricDBName, signozTSTableNameV41Week)
|
||||
|
||||
rows, err := r.db.Query(ctx, query, req.CurrentMetricName, req.CurrentMetricName, req.CurrentMetricName, req.Start, req.End)
|
||||
if err != nil {
|
||||
return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
result := make(map[string]metrics_explorer.RelatedMetricsScore)
|
||||
var metricNames []string
|
||||
for rows.Next() {
|
||||
var metric string
|
||||
var sim float64
|
||||
var metricType string
|
||||
if err := rows.Scan(&metric, &metricType, &sim); err != nil {
|
||||
return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
|
||||
}
|
||||
result[metric] = metrics_explorer.RelatedMetricsScore{
|
||||
NameSimilarity: sim,
|
||||
MetricType: metricType,
|
||||
}
|
||||
metricNames = append(metricNames, metric)
|
||||
}
|
||||
|
||||
if len(metricNames) == 0 {
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// --- STEP 1: Get the extracted labels for the target metric ---
|
||||
extractedLabelsQuery := fmt.Sprintf(`
|
||||
SELECT
|
||||
kv.1 AS label_key,
|
||||
arraySlice(
|
||||
arrayDistinct(
|
||||
groupArray(replaceRegexpAll(kv.2, '^"(.*)"$', '\\1'))
|
||||
),
|
||||
1,
|
||||
10
|
||||
) AS label_values
|
||||
FROM %s.%s
|
||||
ARRAY JOIN JSONExtractKeysAndValuesRaw(labels) AS kv
|
||||
WHERE metric_name = ?
|
||||
AND NOT startsWith(kv.1, '__')
|
||||
AND unix_milli between ? and ?
|
||||
GROUP BY label_key
|
||||
LIMIT 50
|
||||
`, signozMetricDBName, signozTSTableNameV41Week)
|
||||
|
||||
var targetKeys []string
|
||||
var targetValues []string
|
||||
rows, err = r.db.Query(ctx, extractedLabelsQuery, req.CurrentMetricName, req.Start, req.End)
|
||||
if err != nil {
|
||||
return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
|
||||
}
|
||||
defer rows.Close()
|
||||
for rows.Next() {
|
||||
var key string
|
||||
var value []string
|
||||
if err := rows.Scan(&key, &value); err != nil {
|
||||
return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
|
||||
}
|
||||
targetKeys = append(targetKeys, key)
|
||||
targetValues = append(targetValues, value...)
|
||||
}
|
||||
|
||||
targetKeysList := "'" + strings.Join(targetKeys, "', '") + "'"
|
||||
targetValuesList := "'" + strings.Join(targetValues, "', '") + "'"
|
||||
|
||||
var priorityList []string
|
||||
for _, f := range req.Filters.Items {
|
||||
priorityList = append(priorityList, fmt.Sprintf("tuple('%s', '%s')", f.Key.Key, f.Value))
|
||||
}
|
||||
priorityListString := strings.Join(priorityList, ", ")
|
||||
|
||||
// --- STEP 2: Get labels for candidate metrics ---
|
||||
candidateLabelsQuery := fmt.Sprintf(`
|
||||
WITH
|
||||
arrayDistinct([%s]) AS filter_keys,
|
||||
arrayDistinct([%s]) AS filter_values,
|
||||
[%s] AS priority_pairs_input,
|
||||
%d AS priority_multiplier
|
||||
SELECT
|
||||
metric_name,
|
||||
any(type) as type,
|
||||
SUM(
|
||||
arraySum(
|
||||
kv -> if(has(filter_keys, kv.1) AND has(filter_values, kv.2), 1, 0),
|
||||
JSONExtractKeysAndValues(labels, 'String')
|
||||
)
|
||||
)::UInt64 AS raw_match_count,
|
||||
SUM(
|
||||
arraySum(
|
||||
kv ->
|
||||
if(
|
||||
arrayExists(pr -> pr.1 = kv.1 AND pr.2 = kv.2, priority_pairs_input),
|
||||
priority_multiplier,
|
||||
0
|
||||
),
|
||||
JSONExtractKeysAndValues(labels, 'String')
|
||||
)
|
||||
)::UInt64 AS weighted_match_count,
|
||||
toJSONString(
|
||||
arrayDistinct(
|
||||
arrayFlatten(
|
||||
groupArray(
|
||||
arrayFilter(
|
||||
kv -> arrayExists(pr -> pr.1 = kv.1 AND pr.2 = kv.2, priority_pairs_input),
|
||||
JSONExtractKeysAndValues(labels, 'String')
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
) AS priority_pairs
|
||||
|
||||
FROM %s.%s
|
||||
WHERE rand() %% 100 < 50
|
||||
AND unix_milli between ? and ?
|
||||
GROUP BY metric_name
|
||||
ORDER BY weighted_match_count DESC, raw_match_count DESC
|
||||
LIMIT 30
|
||||
`, targetKeysList, targetValuesList, priorityListString, 2,
|
||||
signozMetricDBName, signozTSTableNameV41Week)
|
||||
|
||||
rows, err = r.db.Query(ctx, candidateLabelsQuery, req.Start, req.End)
|
||||
if err != nil {
|
||||
return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
attributeMap := make(map[string]uint64)
|
||||
for rows.Next() {
|
||||
var metric string
|
||||
var metricType string
|
||||
var weightedMatchCount, rawMatchCount uint64
|
||||
var priorityPairsJSON string // Scan into a string
|
||||
|
||||
if err := rows.Scan(&metric, &metricType, &rawMatchCount, &weightedMatchCount, &priorityPairsJSON); err != nil {
|
||||
return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
|
||||
}
|
||||
attributeMap[metric] = weightedMatchCount + (rawMatchCount)/10
|
||||
var priorityPairs [][]string
|
||||
if err := json.Unmarshal([]byte(priorityPairsJSON), &priorityPairs); err != nil {
|
||||
priorityPairs = [][]string{}
|
||||
}
|
||||
if _, ok := result[metric]; ok {
|
||||
result[metric] = metrics_explorer.RelatedMetricsScore{
|
||||
NameSimilarity: result[metric].NameSimilarity,
|
||||
Filters: priorityPairs,
|
||||
MetricType: metricType,
|
||||
}
|
||||
} else {
|
||||
result[metric] = metrics_explorer.RelatedMetricsScore{
|
||||
Filters: priorityPairs,
|
||||
MetricType: metricType,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
normalizeMap := utils.NormalizeMap(attributeMap)
|
||||
|
||||
// --- STEP 4: Compute similarity scores and update result map ---
|
||||
for metric, data := range normalizeMap {
|
||||
if _, ok := result[metric]; ok {
|
||||
result[metric] = metrics_explorer.RelatedMetricsScore{
|
||||
NameSimilarity: result[metric].NameSimilarity,
|
||||
AttributeSimilarity: data,
|
||||
Filters: result[metric].Filters,
|
||||
MetricType: result[metric].MetricType,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetInspectMetrics(ctx context.Context, req *metrics_explorer.InspectMetricsRequest) (*metrics_explorer.InspectMetricsResponse, *model.ApiError) {
|
||||
start, end, _, localTsTable := utils.WhichTSTableToUse(req.Start, req.End)
|
||||
query := fmt.Sprintf(`SELECT
|
||||
fingerprint,
|
||||
labels,
|
||||
unix_milli,
|
||||
value as per_series_value
|
||||
FROM
|
||||
signoz_metrics.distributed_samples_v4
|
||||
INNER JOIN (
|
||||
SELECT DISTINCT
|
||||
fingerprint,
|
||||
labels
|
||||
FROM
|
||||
%s.%s
|
||||
WHERE
|
||||
metric_name = ?
|
||||
AND unix_milli >= ?
|
||||
AND unix_milli < ? LIMIT 20) as filtered_time_series
|
||||
USING fingerprint
|
||||
WHERE
|
||||
metric_name = ?
|
||||
AND unix_milli >= ?
|
||||
AND unix_milli < ?
|
||||
ORDER BY fingerprint DESC, unix_milli DESC`, signozMetricDBName, localTsTable)
|
||||
|
||||
rows, err := r.db.Query(ctx, query, req.MetricName, start, end, req.MetricName, start, end)
|
||||
if err != nil {
|
||||
return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
seriesMap := make(map[uint64]*v3.Series)
|
||||
|
||||
for rows.Next() {
|
||||
var fingerprint uint64
|
||||
var labelsJSON string
|
||||
var unixMilli int64
|
||||
var perSeriesValue float64
|
||||
|
||||
if err := rows.Scan(&fingerprint, &labelsJSON, &unixMilli, &perSeriesValue); err != nil {
|
||||
return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
|
||||
}
|
||||
|
||||
var labelsMap map[string]string
|
||||
if err := json.Unmarshal([]byte(labelsJSON), &labelsMap); err != nil {
|
||||
return nil, &model.ApiError{Typ: "JsonUnmarshalError", Err: err}
|
||||
}
|
||||
|
||||
// Filter out keys starting with "__"
|
||||
filteredLabelsMap := make(map[string]string)
|
||||
for k, v := range labelsMap {
|
||||
if !strings.HasPrefix(k, "__") {
|
||||
filteredLabelsMap[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
var labelsArray []map[string]string
|
||||
for k, v := range filteredLabelsMap {
|
||||
labelsArray = append(labelsArray, map[string]string{k: v})
|
||||
}
|
||||
|
||||
// Check if we already have a Series for this fingerprint.
|
||||
series, exists := seriesMap[fingerprint]
|
||||
if !exists {
|
||||
series = &v3.Series{
|
||||
Labels: filteredLabelsMap,
|
||||
LabelsArray: labelsArray,
|
||||
Points: []v3.Point{},
|
||||
}
|
||||
seriesMap[fingerprint] = series
|
||||
}
|
||||
|
||||
series.Points = append(series.Points, v3.Point{
|
||||
Timestamp: unixMilli,
|
||||
Value: perSeriesValue,
|
||||
})
|
||||
}
|
||||
|
||||
if err = rows.Err(); err != nil {
|
||||
return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
|
||||
}
|
||||
|
||||
var seriesList []v3.Series
|
||||
for _, s := range seriesMap {
|
||||
seriesList = append(seriesList, *s)
|
||||
}
|
||||
|
||||
return &metrics_explorer.InspectMetricsResponse{
|
||||
Series: &seriesList,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -537,3 +537,181 @@ func countPanelsInDashboard(inputData map[string]interface{}) model.DashboardsIn
|
||||
LogsPanelsWithAttrContainsOp: logsPanelsWithAttrContains,
|
||||
}
|
||||
}
|
||||
|
||||
func GetDashboardsWithMetricName(ctx context.Context, metricName string) ([]map[string]string, *model.ApiError) {
|
||||
// Get all dashboards first
|
||||
query := `SELECT uuid, data FROM dashboards`
|
||||
|
||||
type dashboardRow struct {
|
||||
Uuid string `db:"uuid"`
|
||||
Data json.RawMessage `db:"data"`
|
||||
}
|
||||
|
||||
var dashboards []dashboardRow
|
||||
err := db.Select(&dashboards, query)
|
||||
if err != nil {
|
||||
zap.L().Error("Error in getting dashboards", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: err}
|
||||
}
|
||||
|
||||
// Process the JSON data in Go
|
||||
var result []map[string]string
|
||||
for _, dashboard := range dashboards {
|
||||
var dashData map[string]interface{}
|
||||
if err := json.Unmarshal(dashboard.Data, &dashData); err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
dashTitle, _ := dashData["title"].(string)
|
||||
widgets, ok := dashData["widgets"].([]interface{})
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, w := range widgets {
|
||||
widget, ok := w.(map[string]interface{})
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
widgetTitle, _ := widget["title"].(string)
|
||||
widgetID, _ := widget["id"].(string)
|
||||
|
||||
query, ok := widget["query"].(map[string]interface{})
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
builder, ok := query["builder"].(map[string]interface{})
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
queryData, ok := builder["queryData"].([]interface{})
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, qd := range queryData {
|
||||
data, ok := qd.(map[string]interface{})
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
if dataSource, ok := data["dataSource"].(string); !ok || dataSource != "metrics" {
|
||||
continue
|
||||
}
|
||||
|
||||
aggregateAttr, ok := data["aggregateAttribute"].(map[string]interface{})
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
if key, ok := aggregateAttr["key"].(string); ok && strings.TrimSpace(key) == metricName {
|
||||
result = append(result, map[string]string{
|
||||
"dashboard_id": dashboard.Uuid,
|
||||
"widget_title": widgetTitle,
|
||||
"widget_id": widgetID,
|
||||
"dashboard_title": dashTitle,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func GetDashboardsWithMetricNames(ctx context.Context, metricNames []string) (map[string][]map[string]string, *model.ApiError) {
|
||||
// Get all dashboards first
|
||||
query := `SELECT uuid, data FROM dashboards`
|
||||
|
||||
type dashboardRow struct {
|
||||
Uuid string `db:"uuid"`
|
||||
Data json.RawMessage `db:"data"`
|
||||
}
|
||||
|
||||
var dashboards []dashboardRow
|
||||
err := db.Select(&dashboards, query)
|
||||
if err != nil {
|
||||
zap.L().Error("Error in getting dashboards", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: err}
|
||||
}
|
||||
|
||||
// Initialize result map for each metric
|
||||
result := make(map[string][]map[string]string)
|
||||
for _, metricName := range metricNames {
|
||||
result[metricName] = []map[string]string{}
|
||||
}
|
||||
|
||||
// Process the JSON data in Go
|
||||
for _, dashboard := range dashboards {
|
||||
var dashData map[string]interface{}
|
||||
if err := json.Unmarshal(dashboard.Data, &dashData); err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
dashTitle, _ := dashData["title"].(string)
|
||||
widgets, ok := dashData["widgets"].([]interface{})
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, w := range widgets {
|
||||
widget, ok := w.(map[string]interface{})
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
widgetTitle, _ := widget["title"].(string)
|
||||
widgetID, _ := widget["id"].(string)
|
||||
|
||||
query, ok := widget["query"].(map[string]interface{})
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
builder, ok := query["builder"].(map[string]interface{})
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
queryData, ok := builder["queryData"].([]interface{})
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, qd := range queryData {
|
||||
data, ok := qd.(map[string]interface{})
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
if dataSource, ok := data["dataSource"].(string); !ok || dataSource != "metrics" {
|
||||
continue
|
||||
}
|
||||
|
||||
aggregateAttr, ok := data["aggregateAttribute"].(map[string]interface{})
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
if key, ok := aggregateAttr["key"].(string); ok {
|
||||
// Check if this metric is in our list of interest
|
||||
for _, metricName := range metricNames {
|
||||
if strings.TrimSpace(key) == metricName {
|
||||
result[metricName] = append(result[metricName], map[string]string{
|
||||
"dashboard_id": dashboard.Uuid,
|
||||
"widget_title": widgetTitle,
|
||||
"widget_id": widgetID,
|
||||
"dashboard_title": dashTitle,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/metricsexplorer"
|
||||
"io"
|
||||
"math"
|
||||
"net/http"
|
||||
@@ -126,6 +127,8 @@ type APIHandler struct {
|
||||
statefulsetsRepo *inframetrics.StatefulSetsRepo
|
||||
jobsRepo *inframetrics.JobsRepo
|
||||
|
||||
SummaryService *metricsexplorer.SummaryService
|
||||
|
||||
pvcsRepo *inframetrics.PvcsRepo
|
||||
|
||||
JWT *authtypes.JWT
|
||||
@@ -214,6 +217,8 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
|
||||
statefulsetsRepo := inframetrics.NewStatefulSetsRepo(opts.Reader, querierv2)
|
||||
jobsRepo := inframetrics.NewJobsRepo(opts.Reader, querierv2)
|
||||
pvcsRepo := inframetrics.NewPvcsRepo(opts.Reader, querierv2)
|
||||
//explorerCache := metricsexplorer.NewExplorerCache(metricsexplorer.WithCache(opts.Cache))
|
||||
summaryService := metricsexplorer.NewSummaryService(opts.Reader, querierv2)
|
||||
|
||||
aH := &APIHandler{
|
||||
reader: opts.Reader,
|
||||
@@ -243,6 +248,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
|
||||
jobsRepo: jobsRepo,
|
||||
pvcsRepo: pvcsRepo,
|
||||
JWT: opts.JWT,
|
||||
SummaryService: summaryService,
|
||||
}
|
||||
|
||||
logsQueryBuilder := logsv3.PrepareLogsQuery
|
||||
@@ -605,6 +611,30 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router, am *AuthMiddleware) {
|
||||
router.HandleFunc("/api/v1/changePassword/{id}", am.SelfAccess(aH.changePassword)).Methods(http.MethodPost)
|
||||
}
|
||||
|
||||
func (ah *APIHandler) MetricExplorerRoutes(router *mux.Router, am *AuthMiddleware) {
|
||||
router.HandleFunc("/api/v1/metrics/filters/keys",
|
||||
am.ViewAccess(ah.FilterKeysSuggestion)).
|
||||
Methods(http.MethodGet)
|
||||
router.HandleFunc("/api/v1/metrics/filters/values",
|
||||
am.ViewAccess(ah.FilterValuesSuggestion)).
|
||||
Methods(http.MethodPost)
|
||||
router.HandleFunc("/api/v1/metrics/{metric_name}/metadata",
|
||||
am.ViewAccess(ah.GetMetricsDetails)).
|
||||
Methods(http.MethodGet)
|
||||
router.HandleFunc("/api/v1/metrics",
|
||||
am.ViewAccess(ah.ListMetrics)).
|
||||
Methods(http.MethodPost)
|
||||
router.HandleFunc("/api/v1/metrics/treemap",
|
||||
am.ViewAccess(ah.GetTreeMap)).
|
||||
Methods(http.MethodPost)
|
||||
router.HandleFunc("/api/v1/metrics/related",
|
||||
am.ViewAccess(ah.GetRelatedMetrics)).
|
||||
Methods(http.MethodPost)
|
||||
router.HandleFunc("/api/v1/metrics/inspect",
|
||||
am.ViewAccess(ah.GetInspectMetricsData)).
|
||||
Methods(http.MethodPost)
|
||||
}
|
||||
|
||||
func Intersection(a, b []int) (c []int) {
|
||||
m := make(map[int]bool)
|
||||
|
||||
|
||||
89
pkg/query-service/app/metricsexplorer/parser.go
Normal file
89
pkg/query-service/app/metricsexplorer/parser.go
Normal file
@@ -0,0 +1,89 @@
|
||||
package metricsexplorer
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strconv"
|
||||
|
||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||
|
||||
"go.signoz.io/signoz/pkg/query-service/model"
|
||||
"go.signoz.io/signoz/pkg/query-service/model/metrics_explorer"
|
||||
)
|
||||
|
||||
func ParseFilterKeySuggestions(r *http.Request) (*metrics_explorer.FilterKeyRequest, *model.ApiError) {
|
||||
|
||||
searchText := r.URL.Query().Get("searchText")
|
||||
limit, err := strconv.Atoi(r.URL.Query().Get("limit"))
|
||||
if err != nil {
|
||||
limit = 50
|
||||
}
|
||||
|
||||
return &metrics_explorer.FilterKeyRequest{Limit: limit, SearchText: searchText}, nil
|
||||
}
|
||||
|
||||
func ParseFilterValueSuggestions(r *http.Request) (*metrics_explorer.FilterValueRequest, *model.ApiError) {
|
||||
var filterValueRequest metrics_explorer.FilterValueRequest
|
||||
|
||||
// parse the request body
|
||||
if err := json.NewDecoder(r.Body).Decode(&filterValueRequest); err != nil {
|
||||
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("cannot parse the request body: %v", err)}
|
||||
}
|
||||
|
||||
return &filterValueRequest, nil
|
||||
}
|
||||
|
||||
func ParseSummaryListMetricsParams(r *http.Request) (*metrics_explorer.SummaryListMetricsRequest, *model.ApiError) {
|
||||
var listMetricsParams *metrics_explorer.SummaryListMetricsRequest
|
||||
|
||||
// parse the request body
|
||||
if err := json.NewDecoder(r.Body).Decode(&listMetricsParams); err != nil {
|
||||
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("cannot parse the request body: %v", err)}
|
||||
}
|
||||
|
||||
if listMetricsParams.OrderBy.ColumnName == "" || listMetricsParams.OrderBy.Order == "" {
|
||||
listMetricsParams.OrderBy.ColumnName = "timeseries" // DEFAULT ORDER BY
|
||||
listMetricsParams.OrderBy.Order = v3.DirectionDesc
|
||||
}
|
||||
|
||||
if listMetricsParams.Limit == 0 {
|
||||
listMetricsParams.Limit = 10 // DEFAULT LIMIT
|
||||
}
|
||||
|
||||
return listMetricsParams, nil
|
||||
}
|
||||
|
||||
func ParseTreeMapMetricsParams(r *http.Request) (*metrics_explorer.TreeMapMetricsRequest, *model.ApiError) {
|
||||
var treeMapMetricParams *metrics_explorer.TreeMapMetricsRequest
|
||||
|
||||
// parse the request body
|
||||
if err := json.NewDecoder(r.Body).Decode(&treeMapMetricParams); err != nil {
|
||||
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("cannot parse the request body: %v", err)}
|
||||
}
|
||||
|
||||
if treeMapMetricParams.Limit == 0 {
|
||||
treeMapMetricParams.Limit = 10
|
||||
}
|
||||
|
||||
return treeMapMetricParams, nil
|
||||
}
|
||||
|
||||
func ParseRelatedMetricsParams(r *http.Request) (*metrics_explorer.RelatedMetricsRequest, *model.ApiError) {
|
||||
var relatedMetricParams metrics_explorer.RelatedMetricsRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&relatedMetricParams); err != nil {
|
||||
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("cannot parse the request body: %v", err)}
|
||||
}
|
||||
return &relatedMetricParams, nil
|
||||
}
|
||||
|
||||
func ParseInspectMetricsParams(r *http.Request) (*metrics_explorer.InspectMetricsRequest, *model.ApiError) {
|
||||
var inspectMetricParams metrics_explorer.InspectMetricsRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&inspectMetricParams); err != nil {
|
||||
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("cannot parse the request body: %v", err)}
|
||||
}
|
||||
if inspectMetricParams.End-inspectMetricParams.Start > 1800000 { // half hour only
|
||||
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("time duration shouldn't be more than 30 mins")}
|
||||
}
|
||||
return &inspectMetricParams, nil
|
||||
}
|
||||
335
pkg/query-service/app/metricsexplorer/summary.go
Normal file
335
pkg/query-service/app/metricsexplorer/summary.go
Normal file
@@ -0,0 +1,335 @@
|
||||
package metricsexplorer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"go.signoz.io/signoz/pkg/query-service/app/dashboards"
|
||||
"go.signoz.io/signoz/pkg/query-service/interfaces"
|
||||
"go.signoz.io/signoz/pkg/query-service/model"
|
||||
"go.signoz.io/signoz/pkg/query-service/model/metrics_explorer"
|
||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
type SummaryService struct {
|
||||
reader interfaces.Reader
|
||||
querierV2 interfaces.Querier
|
||||
}
|
||||
|
||||
func NewSummaryService(reader interfaces.Reader, querierV2 interfaces.Querier) *SummaryService {
|
||||
return &SummaryService{reader: reader, querierV2: querierV2}
|
||||
}
|
||||
|
||||
func (receiver *SummaryService) FilterKeys(ctx context.Context, params *metrics_explorer.FilterKeyRequest) (*metrics_explorer.FilterKeyResponse, *model.ApiError) {
|
||||
var response metrics_explorer.FilterKeyResponse
|
||||
keys, apiError := receiver.reader.GetAllMetricFilterAttributeKeys(
|
||||
ctx,
|
||||
params,
|
||||
true,
|
||||
)
|
||||
if apiError != nil {
|
||||
return nil, apiError
|
||||
}
|
||||
response.AttributeKeys = *keys
|
||||
var availableColumnFilter []string
|
||||
for key := range metrics_explorer.AvailableColumnFilterMap {
|
||||
availableColumnFilter = append(availableColumnFilter, key)
|
||||
}
|
||||
response.MetricColumns = availableColumnFilter
|
||||
return &response, nil
|
||||
}
|
||||
|
||||
func (receiver *SummaryService) FilterValues(ctx context.Context, params *metrics_explorer.FilterValueRequest) (*metrics_explorer.FilterValueResponse, *model.ApiError) {
|
||||
var response metrics_explorer.FilterValueResponse
|
||||
switch params.FilterKey {
|
||||
case "metric_name":
|
||||
var filterValues []string
|
||||
request := v3.AggregateAttributeRequest{DataSource: v3.DataSourceMetrics, SearchText: params.SearchText, Limit: params.Limit}
|
||||
attributes, err := receiver.reader.GetMetricAggregateAttributes(ctx, &request, true)
|
||||
if err != nil {
|
||||
return nil, model.InternalError(err)
|
||||
}
|
||||
for _, item := range attributes.AttributeKeys {
|
||||
filterValues = append(filterValues, item.Key)
|
||||
}
|
||||
response.FilterValues = filterValues
|
||||
return &response, nil
|
||||
case "metric_unit":
|
||||
attributes, err := receiver.reader.GetAllMetricFilterUnits(ctx, params)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
response.FilterValues = attributes
|
||||
return &response, nil
|
||||
case "metric_type":
|
||||
attributes, err := receiver.reader.GetAllMetricFilterTypes(ctx, params)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
response.FilterValues = attributes
|
||||
return &response, nil
|
||||
default:
|
||||
attributes, err := receiver.reader.GetAllMetricFilterAttributeValues(ctx, params)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
response.FilterValues = attributes
|
||||
return &response, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (receiver *SummaryService) GetMetricsSummary(ctx context.Context, metricName string) (metrics_explorer.MetricDetailsDTO, *model.ApiError) {
|
||||
var metricDetailsDTO metrics_explorer.MetricDetailsDTO
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
|
||||
// Call 1: GetMetricMetadata
|
||||
g.Go(func() error {
|
||||
metadata, err := receiver.reader.GetMetricMetadata(ctx, metricName, metricName)
|
||||
if err != nil {
|
||||
return &model.ApiError{Typ: "ClickHouseError", Err: err}
|
||||
}
|
||||
metricDetailsDTO.Name = metricName
|
||||
metricDetailsDTO.Unit = metadata.Unit
|
||||
metricDetailsDTO.Description = metadata.Description
|
||||
metricDetailsDTO.Type = metadata.Type
|
||||
metricDetailsDTO.Metadata.MetricType = metadata.Type
|
||||
metricDetailsDTO.Metadata.Description = metadata.Description
|
||||
metricDetailsDTO.Metadata.Unit = metadata.Unit
|
||||
return nil
|
||||
})
|
||||
|
||||
// Call 2: GetMetricsDataPointsAndLastReceived
|
||||
g.Go(func() error {
|
||||
dataPoints, lastReceived, err := receiver.reader.GetMetricsDataPointsAndLastReceived(ctx, metricName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
metricDetailsDTO.Samples = dataPoints
|
||||
metricDetailsDTO.LastReceived = lastReceived
|
||||
return nil
|
||||
})
|
||||
|
||||
// Call 3: GetTotalTimeSeriesForMetricName
|
||||
g.Go(func() error {
|
||||
totalSeries, err := receiver.reader.GetTotalTimeSeriesForMetricName(ctx, metricName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
metricDetailsDTO.TimeSeriesTotal = totalSeries
|
||||
return nil
|
||||
})
|
||||
|
||||
// Call 4: GetActiveTimeSeriesForMetricName
|
||||
g.Go(func() error {
|
||||
activeSeries, err := receiver.reader.GetActiveTimeSeriesForMetricName(ctx, metricName, 120*time.Minute)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
metricDetailsDTO.TimeSeriesActive = activeSeries
|
||||
return nil
|
||||
})
|
||||
|
||||
// Call 5: GetAttributesForMetricName
|
||||
g.Go(func() error {
|
||||
attributes, err := receiver.reader.GetAttributesForMetricName(ctx, metricName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if attributes != nil {
|
||||
metricDetailsDTO.Attributes = *attributes
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
// Call 6: GetDashboardsWithMetricName
|
||||
g.Go(func() error {
|
||||
data, err := dashboards.GetDashboardsWithMetricName(ctx, metricName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if data != nil {
|
||||
jsonData, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
zap.L().Error("Error marshalling data:", zap.Error(err))
|
||||
return &model.ApiError{Typ: "MarshallingErr", Err: err}
|
||||
}
|
||||
|
||||
var dashboards []metrics_explorer.Dashboard
|
||||
err = json.Unmarshal(jsonData, &dashboards)
|
||||
if err != nil {
|
||||
zap.L().Error("Error unmarshalling data:", zap.Error(err))
|
||||
return &model.ApiError{Typ: "UnMarshallingErr", Err: err}
|
||||
}
|
||||
metricDetailsDTO.Dashboards = dashboards
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
// Wait for all goroutines and handle any errors
|
||||
if err := g.Wait(); err != nil {
|
||||
// Type assert to check if it's already an ApiError
|
||||
if apiErr, ok := err.(*model.ApiError); ok {
|
||||
return metrics_explorer.MetricDetailsDTO{}, apiErr
|
||||
}
|
||||
// If it's not an ApiError, wrap it in one
|
||||
return metrics_explorer.MetricDetailsDTO{}, &model.ApiError{Typ: "InternalError", Err: err}
|
||||
}
|
||||
|
||||
return metricDetailsDTO, nil
|
||||
}
|
||||
|
||||
func (receiver *SummaryService) ListMetricsWithSummary(ctx context.Context, params *metrics_explorer.SummaryListMetricsRequest) (*metrics_explorer.SummaryListMetricsResponse, *model.ApiError) {
|
||||
return receiver.reader.ListSummaryMetrics(ctx, params)
|
||||
}
|
||||
|
||||
func (receiver *SummaryService) GetMetricsTreemap(ctx context.Context, params *metrics_explorer.TreeMapMetricsRequest) (*metrics_explorer.TreeMap, *model.ApiError) {
|
||||
var response metrics_explorer.TreeMap
|
||||
switch params.Treemap {
|
||||
case metrics_explorer.TimeSeriesTeeMap:
|
||||
cardinality, apiError := receiver.reader.GetMetricsTimeSeriesPercentage(ctx, params)
|
||||
if apiError != nil {
|
||||
return nil, apiError
|
||||
}
|
||||
response.TimeSeries = *cardinality
|
||||
return &response, nil
|
||||
case metrics_explorer.SamplesTreeMap:
|
||||
dataPoints, apiError := receiver.reader.GetMetricsSamplesPercentage(ctx, params)
|
||||
if apiError != nil {
|
||||
return nil, apiError
|
||||
}
|
||||
response.Samples = *dataPoints
|
||||
return &response, nil
|
||||
default:
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (receiver *SummaryService) GetRelatedMetrics(ctx context.Context, params *metrics_explorer.RelatedMetricsRequest) (*metrics_explorer.RelatedMetricsResponse, *model.ApiError) {
|
||||
var relatedMetricsResponse metrics_explorer.RelatedMetricsResponse
|
||||
|
||||
relatedMetricsMap, err := receiver.reader.GetRelatedMetrics(ctx, params)
|
||||
if err != nil {
|
||||
return nil, &model.ApiError{Typ: "Error", Err: err}
|
||||
}
|
||||
|
||||
finalScores := make(map[string]float64)
|
||||
for metric, scores := range relatedMetricsMap {
|
||||
finalScores[metric] = scores.NameSimilarity*0.5 + scores.AttributeSimilarity*0.5
|
||||
}
|
||||
|
||||
type metricScore struct {
|
||||
Name string
|
||||
Score float64
|
||||
}
|
||||
var sortedScores []metricScore
|
||||
for metric, score := range finalScores {
|
||||
sortedScores = append(sortedScores, metricScore{
|
||||
Name: metric,
|
||||
Score: score,
|
||||
})
|
||||
}
|
||||
|
||||
sort.Slice(sortedScores, func(i, j int) bool {
|
||||
return sortedScores[i].Score > sortedScores[j].Score
|
||||
})
|
||||
|
||||
// Extract metric names for retrieving dashboard information
|
||||
var metricNames []string
|
||||
for _, ms := range sortedScores {
|
||||
metricNames = append(metricNames, ms.Name)
|
||||
}
|
||||
|
||||
dashboardsInfo, err := dashboards.GetDashboardsWithMetricNames(ctx, metricNames)
|
||||
if err != nil {
|
||||
return nil, &model.ApiError{Typ: "Error", Err: err}
|
||||
}
|
||||
|
||||
// Build the final response using the sorted order
|
||||
for _, ms := range sortedScores {
|
||||
var dashboardsList []metrics_explorer.Dashboard
|
||||
|
||||
if dashEntries, ok := dashboardsInfo[ms.Name]; ok {
|
||||
for _, dashInfo := range dashEntries {
|
||||
dashboardsList = append(dashboardsList, metrics_explorer.Dashboard{
|
||||
DashboardName: dashInfo["dashboard_title"],
|
||||
DashboardID: dashInfo["dashboard_id"],
|
||||
WidgetID: dashInfo["widget_id"],
|
||||
WidgetName: dashInfo["widget_title"],
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
relatedMetric := metrics_explorer.RelatedMetrics{
|
||||
Name: ms.Name,
|
||||
Dashboards: dashboardsList,
|
||||
Query: GetQueryRangeForRelateMetricsList(ms.Name, relatedMetricsMap[ms.Name]),
|
||||
}
|
||||
|
||||
relatedMetricsResponse.RelatedMetrics = append(relatedMetricsResponse.RelatedMetrics, relatedMetric)
|
||||
}
|
||||
|
||||
return &relatedMetricsResponse, nil
|
||||
}
|
||||
|
||||
func GetQueryRangeForRelateMetricsList(metricName string, scores metrics_explorer.RelatedMetricsScore) *v3.BuilderQuery {
|
||||
var filterItems []v3.FilterItem
|
||||
for _, pair := range scores.Filters {
|
||||
if len(pair) < 2 {
|
||||
continue // Skip invalid filter pairs.
|
||||
}
|
||||
filterItem := v3.FilterItem{
|
||||
Key: v3.AttributeKey{
|
||||
Key: pair[0], // Default type, or you can use v3.AttributeKeyTypeUnspecified.
|
||||
IsColumn: false,
|
||||
IsJSON: false,
|
||||
},
|
||||
Value: pair[1],
|
||||
Operator: v3.FilterOperatorEqual, // Using "=" as the operator.
|
||||
}
|
||||
filterItems = append(filterItems, filterItem)
|
||||
}
|
||||
|
||||
// If there are any filters, combine them with an "AND" operator.
|
||||
var filters *v3.FilterSet
|
||||
if len(filterItems) > 0 {
|
||||
filters = &v3.FilterSet{
|
||||
Operator: "AND",
|
||||
Items: filterItems,
|
||||
}
|
||||
}
|
||||
|
||||
// Create the BuilderQuery. Here we set the QueryName to the metric name.
|
||||
query := v3.BuilderQuery{
|
||||
QueryName: metricName,
|
||||
DataSource: v3.DataSourceMetrics, // Assuming the data source is metrics.
|
||||
Expression: metricName, // Using metric name as expression (can be adjusted as needed).
|
||||
Filters: filters,
|
||||
}
|
||||
|
||||
if strings.EqualFold(scores.MetricType, "Gauge") {
|
||||
query.TimeAggregation = v3.TimeAggregationAvg
|
||||
query.SpaceAggregation = v3.SpaceAggregationAvg
|
||||
} else if strings.EqualFold(scores.MetricType, "Sum") {
|
||||
query.TimeAggregation = v3.TimeAggregationRate
|
||||
query.SpaceAggregation = v3.SpaceAggregationSum
|
||||
}
|
||||
|
||||
query.AggregateAttribute = v3.AttributeKey{
|
||||
Key: metricName, // Use the metric name as the attribute key. // Assuming a numeric value.
|
||||
Type: v3.AttributeKeyType(scores.MetricType), // "Gauge" or "Sum" as provided.
|
||||
}
|
||||
|
||||
query.StepInterval = 60
|
||||
|
||||
return &query
|
||||
}
|
||||
|
||||
func (receiver *SummaryService) GetInspectMetrics(ctx context.Context, params *metrics_explorer.InspectMetricsRequest) (*metrics_explorer.InspectMetricsResponse, *model.ApiError) {
|
||||
return receiver.reader.GetInspectMetrics(ctx, params)
|
||||
}
|
||||
@@ -315,6 +315,7 @@ func (s *Server) createPublicServer(api *APIHandler, web web.Web) (*http.Server,
|
||||
api.RegisterWebSocketPaths(r, am)
|
||||
api.RegisterQueryRangeV4Routes(r, am)
|
||||
api.RegisterMessagingQueuesRoutes(r, am)
|
||||
api.MetricExplorerRoutes(r, am)
|
||||
|
||||
c := cors.New(cors.Options{
|
||||
AllowedOrigins: []string{"*"},
|
||||
|
||||
144
pkg/query-service/app/summary.go
Normal file
144
pkg/query-service/app/summary.go
Normal file
@@ -0,0 +1,144 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
"net/http"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"go.signoz.io/signoz/pkg/query-service/model"
|
||||
|
||||
explorer "go.signoz.io/signoz/pkg/query-service/app/metricsexplorer"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (aH *APIHandler) FilterKeysSuggestion(w http.ResponseWriter, r *http.Request) {
|
||||
bodyBytes, _ := io.ReadAll(r.Body)
|
||||
r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
|
||||
ctx := r.Context()
|
||||
params, apiError := explorer.ParseFilterKeySuggestions(r)
|
||||
if apiError != nil {
|
||||
zap.L().Error("error parsing summary filter keys request", zap.Error(apiError.Err))
|
||||
RespondError(w, apiError, nil)
|
||||
return
|
||||
}
|
||||
keys, apiError := aH.SummaryService.FilterKeys(ctx, params)
|
||||
if apiError != nil {
|
||||
zap.L().Error("error getting filter keys", zap.Error(apiError.Err))
|
||||
RespondError(w, apiError, nil)
|
||||
return
|
||||
}
|
||||
aH.Respond(w, keys)
|
||||
}
|
||||
|
||||
func (aH *APIHandler) FilterValuesSuggestion(w http.ResponseWriter, r *http.Request) {
|
||||
bodyBytes, _ := io.ReadAll(r.Body)
|
||||
r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
|
||||
ctx := r.Context()
|
||||
params, apiError := explorer.ParseFilterValueSuggestions(r)
|
||||
if apiError != nil {
|
||||
zap.L().Error("error parsing summary filter values request", zap.Error(apiError.Err))
|
||||
RespondError(w, apiError, nil)
|
||||
return
|
||||
}
|
||||
|
||||
values, apiError := aH.SummaryService.FilterValues(ctx, params)
|
||||
if apiError != nil {
|
||||
zap.L().Error("error getting filter values", zap.Error(apiError.Err))
|
||||
RespondError(w, apiError, nil)
|
||||
return
|
||||
}
|
||||
aH.Respond(w, values)
|
||||
}
|
||||
|
||||
func (aH *APIHandler) GetMetricsDetails(w http.ResponseWriter, r *http.Request) {
|
||||
metricName := mux.Vars(r)["metric_name"]
|
||||
ctx := r.Context()
|
||||
metricsDetail, apiError := aH.SummaryService.GetMetricsSummary(ctx, metricName)
|
||||
if apiError != nil {
|
||||
zap.L().Error("error parsing metric query range params", zap.Error(apiError.Err))
|
||||
RespondError(w, apiError, nil)
|
||||
return
|
||||
}
|
||||
aH.Respond(w, metricsDetail)
|
||||
}
|
||||
|
||||
func (aH *APIHandler) ListMetrics(w http.ResponseWriter, r *http.Request) {
|
||||
bodyBytes, _ := io.ReadAll(r.Body)
|
||||
r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
|
||||
ctx := r.Context()
|
||||
params, apiError := explorer.ParseSummaryListMetricsParams(r)
|
||||
if apiError != nil {
|
||||
zap.L().Error("error parsing metric list metric summary api request", zap.Error(apiError.Err))
|
||||
RespondError(w, model.BadRequest(apiError), nil)
|
||||
return
|
||||
}
|
||||
|
||||
slmr, apiErr := aH.SummaryService.ListMetricsWithSummary(ctx, params)
|
||||
if apiErr != nil {
|
||||
zap.L().Error("error parsing metric query range params", zap.Error(apiErr.Err))
|
||||
RespondError(w, apiError, nil)
|
||||
return
|
||||
}
|
||||
aH.Respond(w, slmr)
|
||||
}
|
||||
|
||||
func (aH *APIHandler) GetTreeMap(w http.ResponseWriter, r *http.Request) {
|
||||
bodyBytes, _ := io.ReadAll(r.Body)
|
||||
r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
|
||||
ctx := r.Context()
|
||||
params, apiError := explorer.ParseTreeMapMetricsParams(r)
|
||||
if apiError != nil {
|
||||
zap.L().Error("error parsing metric query range params", zap.Error(apiError.Err))
|
||||
RespondError(w, apiError, nil)
|
||||
return
|
||||
}
|
||||
result, apiError := aH.SummaryService.GetMetricsTreemap(ctx, params)
|
||||
if apiError != nil {
|
||||
zap.L().Error("error getting heatmap data", zap.Error(apiError.Err))
|
||||
RespondError(w, apiError, nil)
|
||||
return
|
||||
}
|
||||
aH.Respond(w, result)
|
||||
|
||||
}
|
||||
|
||||
func (aH *APIHandler) GetRelatedMetrics(w http.ResponseWriter, r *http.Request) {
|
||||
bodyBytes, _ := io.ReadAll(r.Body)
|
||||
r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
|
||||
ctx := r.Context()
|
||||
params, apiError := explorer.ParseRelatedMetricsParams(r)
|
||||
if apiError != nil {
|
||||
zap.L().Error("error parsing metric query range params", zap.Error(apiError.Err))
|
||||
RespondError(w, apiError, nil)
|
||||
return
|
||||
}
|
||||
result, apiError := aH.SummaryService.GetRelatedMetrics(ctx, params)
|
||||
if apiError != nil {
|
||||
zap.L().Error("error getting heatmap data", zap.Error(apiError.Err))
|
||||
RespondError(w, apiError, nil)
|
||||
return
|
||||
}
|
||||
aH.Respond(w, result)
|
||||
|
||||
}
|
||||
|
||||
func (aH *APIHandler) GetInspectMetricsData(w http.ResponseWriter, r *http.Request) {
|
||||
bodyBytes, _ := io.ReadAll(r.Body)
|
||||
r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
|
||||
ctx := r.Context()
|
||||
params, apiError := explorer.ParseInspectMetricsParams(r)
|
||||
if apiError != nil {
|
||||
zap.L().Error("error parsing metric query range params", zap.Error(apiError.Err))
|
||||
RespondError(w, apiError, nil)
|
||||
return
|
||||
}
|
||||
result, apiError := aH.SummaryService.GetInspectMetrics(ctx, params)
|
||||
if apiError != nil {
|
||||
zap.L().Error("error getting inspect metrics data", zap.Error(apiError.Err))
|
||||
RespondError(w, apiError, nil)
|
||||
return
|
||||
}
|
||||
aH.Respond(w, result)
|
||||
|
||||
}
|
||||
@@ -85,6 +85,8 @@ var TimestampSortFeature = GetOrDefaultEnv("TIMESTAMP_SORT_FEATURE", "true")
|
||||
|
||||
var PreferRPMFeature = GetOrDefaultEnv("PREFER_RPM_FEATURE", "false")
|
||||
|
||||
var MetricsExplorerClickhouseThreads = GetOrDefaultEnvInt("METRICS_EXPLORER_CLICKHOUSE_THREADS", 8)
|
||||
|
||||
// TODO(srikanthccv): remove after backfilling is done
|
||||
func UseMetricsPreAggregation() bool {
|
||||
return GetOrDefaultEnv("USE_METRICS_PRE_AGGREGATION", "true") == "true"
|
||||
@@ -231,6 +233,9 @@ const (
|
||||
SIGNOZ_TIMESERIES_v4_1WEEK_LOCAL_TABLENAME = "time_series_v4_1week"
|
||||
SIGNOZ_TIMESERIES_v4_1DAY_TABLENAME = "distributed_time_series_v4_1day"
|
||||
SIGNOZ_TOP_LEVEL_OPERATIONS_TABLENAME = "distributed_top_level_operations"
|
||||
SIGNOZ_TIMESERIES_v4_TABLENAME = "distributed_time_series_v4"
|
||||
SIGNOZ_TIMESERIES_v4_1WEEK_TABLENAME = "distributed_time_series_v4_1week"
|
||||
SIGNOZ_TIMESERIES_v4_6HRS_TABLENAME = "distributed_time_series_v4_6hrs"
|
||||
)
|
||||
|
||||
// alert related constants
|
||||
|
||||
@@ -4,6 +4,8 @@ import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"go.signoz.io/signoz/pkg/query-service/model/metrics_explorer"
|
||||
|
||||
"github.com/prometheus/prometheus/promql"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/util/stats"
|
||||
@@ -115,6 +117,23 @@ type Reader interface {
|
||||
//trace
|
||||
GetTraceFields(ctx context.Context) (*model.GetFieldsResponse, *model.ApiError)
|
||||
UpdateTraceField(ctx context.Context, field *model.UpdateField) *model.ApiError
|
||||
|
||||
GetAllMetricFilterAttributeValues(ctx context.Context, req *metrics_explorer.FilterValueRequest) ([]string, *model.ApiError)
|
||||
GetAllMetricFilterUnits(ctx context.Context, req *metrics_explorer.FilterValueRequest) ([]string, *model.ApiError)
|
||||
GetAllMetricFilterTypes(ctx context.Context, req *metrics_explorer.FilterValueRequest) ([]string, *model.ApiError)
|
||||
GetAllMetricFilterAttributeKeys(ctx context.Context, req *metrics_explorer.FilterKeyRequest, skipDotNames bool) (*[]v3.AttributeKey, *model.ApiError)
|
||||
|
||||
GetMetricsDataPointsAndLastReceived(ctx context.Context, metricName string) (uint64, uint64, *model.ApiError)
|
||||
GetTotalTimeSeriesForMetricName(ctx context.Context, metricName string) (uint64, *model.ApiError)
|
||||
GetActiveTimeSeriesForMetricName(ctx context.Context, metricName string, duration time.Duration) (uint64, *model.ApiError)
|
||||
GetAttributesForMetricName(ctx context.Context, metricName string) (*[]metrics_explorer.Attribute, *model.ApiError)
|
||||
|
||||
ListSummaryMetrics(ctx context.Context, req *metrics_explorer.SummaryListMetricsRequest) (*metrics_explorer.SummaryListMetricsResponse, *model.ApiError)
|
||||
|
||||
GetMetricsTimeSeriesPercentage(ctx context.Context, request *metrics_explorer.TreeMapMetricsRequest) (*[]metrics_explorer.TreeMapResponseItem, *model.ApiError)
|
||||
GetMetricsSamplesPercentage(ctx context.Context, req *metrics_explorer.TreeMapMetricsRequest) (*[]metrics_explorer.TreeMapResponseItem, *model.ApiError)
|
||||
GetRelatedMetrics(ctx context.Context, req *metrics_explorer.RelatedMetricsRequest) (map[string]metrics_explorer.RelatedMetricsScore, *model.ApiError)
|
||||
GetInspectMetrics(ctx context.Context, req *metrics_explorer.InspectMetricsRequest) (*metrics_explorer.InspectMetricsResponse, *model.ApiError)
|
||||
}
|
||||
|
||||
type Querier interface {
|
||||
|
||||
160
pkg/query-service/model/metrics_explorer/summary.go
Normal file
160
pkg/query-service/model/metrics_explorer/summary.go
Normal file
@@ -0,0 +1,160 @@
|
||||
package metrics_explorer
|
||||
|
||||
import (
|
||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||
)
|
||||
|
||||
type SummaryListMetricsRequest struct {
|
||||
Offset int `json:"offset"`
|
||||
Limit int `json:"limit"`
|
||||
OrderBy v3.OrderBy `json:"orderBy"`
|
||||
Start int64 `json:"start"`
|
||||
EndD int64 `json:"end"`
|
||||
Filters v3.FilterSet `json:"filters"`
|
||||
}
|
||||
|
||||
type TreeMapType string
|
||||
|
||||
const (
|
||||
TimeSeriesTeeMap TreeMapType = "timeseries"
|
||||
SamplesTreeMap TreeMapType = "samples"
|
||||
)
|
||||
|
||||
type TreeMapMetricsRequest struct {
|
||||
Limit int `json:"limit"`
|
||||
Treemap TreeMapType `json:"treemap"`
|
||||
Start int64 `json:"start"`
|
||||
EndD int64 `json:"end"`
|
||||
Filters v3.FilterSet `json:"filters"`
|
||||
}
|
||||
|
||||
type MetricDetail struct {
|
||||
MetricName string `json:"metric_name"`
|
||||
Description string `json:"description"`
|
||||
Type string `json:"type"`
|
||||
Unit string `json:"unit"`
|
||||
TimeSeries uint64 `json:"timeseries"`
|
||||
Samples uint64 `json:"samples"`
|
||||
LastReceived int64 `json:"lastReceived"`
|
||||
}
|
||||
|
||||
type TreeMapResponseItem struct {
|
||||
Percentage float64 `json:"percentage"`
|
||||
TotalValue uint64 `json:"total_value"`
|
||||
MetricName string `json:"metric_name"`
|
||||
}
|
||||
|
||||
type TreeMap struct {
|
||||
TimeSeries []TreeMapResponseItem `json:"timeseries"`
|
||||
Samples []TreeMapResponseItem `json:"samples"`
|
||||
}
|
||||
|
||||
type SummaryListMetricsResponse struct {
|
||||
Metrics []MetricDetail `json:"metrics"`
|
||||
Total uint64 `json:"total"`
|
||||
}
|
||||
|
||||
type Attribute struct {
|
||||
Key string `json:"key" db:"key"`
|
||||
Value []string `json:"value" db:"value"`
|
||||
ValueCount uint64 `json:"valueCount" db:"valueCount"`
|
||||
}
|
||||
|
||||
// Metadata holds additional information about the metric.
|
||||
type Metadata struct {
|
||||
MetricType string `json:"metric_type"`
|
||||
Description string `json:"description"`
|
||||
Unit string `json:"unit"`
|
||||
}
|
||||
|
||||
// Alert represents individual alerts associated with the metric.
|
||||
type Alert struct {
|
||||
AlertName string `json:"alert_name"`
|
||||
AlertID string `json:"alert_id"`
|
||||
}
|
||||
|
||||
// Dashboard represents individual dashboards associated with the metric.
|
||||
type Dashboard struct {
|
||||
DashboardName string `json:"dashboard_name"`
|
||||
DashboardID string `json:"dashboard_id"`
|
||||
WidgetID string `json:"widget_id"`
|
||||
WidgetName string `json:"widget_name"`
|
||||
}
|
||||
|
||||
type MetricDetailsDTO struct {
|
||||
Name string `json:"name"`
|
||||
Description string `json:"description"`
|
||||
Type string `json:"type"`
|
||||
Unit string `json:"unit"`
|
||||
Samples uint64 `json:"samples"`
|
||||
TimeSeriesTotal uint64 `json:"timeSeriesTotal"`
|
||||
TimeSeriesActive uint64 `json:"timeSeriesActive"`
|
||||
LastReceived uint64 `json:"lastReceived"`
|
||||
Attributes []Attribute `json:"attributes"`
|
||||
Metadata Metadata `json:"metadata"`
|
||||
Alerts []Alert `json:"alerts"`
|
||||
Dashboards []Dashboard `json:"dashboards"`
|
||||
}
|
||||
|
||||
type FilterKeyRequest struct {
|
||||
SearchText string `json:"searchText"`
|
||||
Limit int `json:"limit"`
|
||||
}
|
||||
|
||||
type FilterValueRequest struct {
|
||||
FilterKey string `json:"filterKey"`
|
||||
FilterAttributeKeyDataType v3.AttributeKeyDataType `json:"filterAttributeKeyDataType"`
|
||||
SearchText string `json:"searchText"`
|
||||
Limit int `json:"limit"`
|
||||
}
|
||||
|
||||
type FilterValueResponse struct {
|
||||
FilterValues []string `json:"filterValues"`
|
||||
}
|
||||
|
||||
type FilterKeyResponse struct {
|
||||
MetricColumns []string `json:"metricColumns"`
|
||||
AttributeKeys []v3.AttributeKey `json:"attributeKeys"`
|
||||
}
|
||||
|
||||
var AvailableColumnFilterMap = map[string]bool{
|
||||
"metric_name": true,
|
||||
"metric_unit": true,
|
||||
"metric_type": true,
|
||||
}
|
||||
|
||||
type RelatedMetricsScore struct {
|
||||
AttributeSimilarity float64
|
||||
NameSimilarity float64
|
||||
Filters [][]string
|
||||
MetricType string
|
||||
}
|
||||
|
||||
type RelatedMetricsRequest struct {
|
||||
CurrentMetricName string `json:"currentMetricName"`
|
||||
Start int64 `json:"start"`
|
||||
End int64 `json:"end"`
|
||||
Filters v3.FilterSet `json:"filters"`
|
||||
}
|
||||
|
||||
type RelatedMetricsResponse struct {
|
||||
RelatedMetrics []RelatedMetrics `json:"related_metrics"`
|
||||
}
|
||||
|
||||
type RelatedMetrics struct {
|
||||
Name string `json:"name"`
|
||||
Dashboards []Dashboard `json:"dashboards"`
|
||||
Alerts []Alert `json:"alerts"`
|
||||
Query *v3.BuilderQuery `json:"query"`
|
||||
}
|
||||
|
||||
type InspectMetricsRequest struct {
|
||||
MetricName string `json:"metricName"`
|
||||
Filters v3.FilterSet `json:"filters"`
|
||||
Start int64 `json:"start"`
|
||||
End int64 `json:"end"`
|
||||
}
|
||||
|
||||
type InspectMetricsResponse struct {
|
||||
Series *[]v3.Series `json:"series,omitempty"`
|
||||
}
|
||||
153
pkg/query-service/utils/filter_conditions.go
Normal file
153
pkg/query-service/utils/filter_conditions.go
Normal file
@@ -0,0 +1,153 @@
|
||||
package utils
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"go.signoz.io/signoz/pkg/query-service/constants"
|
||||
"go.signoz.io/signoz/pkg/query-service/model/metrics_explorer"
|
||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||
)
|
||||
|
||||
// skipKey is an optional parameter to skip processing of a specific key
|
||||
func BuildFilterConditions(fs *v3.FilterSet, skipKey string) ([]string, error) {
|
||||
if fs == nil || len(fs.Items) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
var conditions []string
|
||||
|
||||
for _, item := range fs.Items {
|
||||
if skipKey != "" && item.Key.Key == skipKey {
|
||||
continue
|
||||
}
|
||||
|
||||
toFormat := item.Value
|
||||
op := v3.FilterOperator(strings.ToLower(strings.TrimSpace(string(item.Operator))))
|
||||
if op == v3.FilterOperatorContains || op == v3.FilterOperatorNotContains {
|
||||
toFormat = fmt.Sprintf("%%%s%%", toFormat)
|
||||
}
|
||||
fmtVal := ClickHouseFormattedValue(toFormat)
|
||||
|
||||
// Determine if the key is a JSON key or a normal column
|
||||
isJSONKey := false
|
||||
if _, exists := metrics_explorer.AvailableColumnFilterMap[item.Key.Key]; exists {
|
||||
isJSONKey = false
|
||||
} else {
|
||||
isJSONKey = true
|
||||
}
|
||||
|
||||
condition, err := buildSingleFilterCondition(item.Key.Key, op, fmtVal, isJSONKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
conditions = append(conditions, condition)
|
||||
}
|
||||
|
||||
return conditions, nil
|
||||
}
|
||||
|
||||
func buildSingleFilterCondition(key string, op v3.FilterOperator, fmtVal string, isJSONKey bool) (string, error) {
|
||||
var keyCondition string
|
||||
if isJSONKey {
|
||||
keyCondition = fmt.Sprintf("JSONExtractString(labels, '%s')", key)
|
||||
} else { // Assuming normal column access
|
||||
if key == "metric_unit" {
|
||||
key = "unit"
|
||||
}
|
||||
if key == "metric_type" {
|
||||
key = "type"
|
||||
}
|
||||
keyCondition = key
|
||||
}
|
||||
|
||||
switch op {
|
||||
case v3.FilterOperatorEqual:
|
||||
return fmt.Sprintf("%s = %s", keyCondition, fmtVal), nil
|
||||
case v3.FilterOperatorNotEqual:
|
||||
return fmt.Sprintf("%s != %s", keyCondition, fmtVal), nil
|
||||
case v3.FilterOperatorIn:
|
||||
return fmt.Sprintf("%s IN %s", keyCondition, fmtVal), nil
|
||||
case v3.FilterOperatorNotIn:
|
||||
return fmt.Sprintf("%s NOT IN %s", keyCondition, fmtVal), nil
|
||||
case v3.FilterOperatorLike:
|
||||
return fmt.Sprintf("like(%s, %s)", keyCondition, fmtVal), nil
|
||||
case v3.FilterOperatorNotLike:
|
||||
return fmt.Sprintf("notLike(%s, %s)", keyCondition, fmtVal), nil
|
||||
case v3.FilterOperatorRegex:
|
||||
return fmt.Sprintf("match(%s, %s)", keyCondition, fmtVal), nil
|
||||
case v3.FilterOperatorNotRegex:
|
||||
return fmt.Sprintf("not match(%s, %s)", keyCondition, fmtVal), nil
|
||||
case v3.FilterOperatorGreaterThan:
|
||||
return fmt.Sprintf("%s > %s", keyCondition, fmtVal), nil
|
||||
case v3.FilterOperatorGreaterThanOrEq:
|
||||
return fmt.Sprintf("%s >= %s", keyCondition, fmtVal), nil
|
||||
case v3.FilterOperatorLessThan:
|
||||
return fmt.Sprintf("%s < %s", keyCondition, fmtVal), nil
|
||||
case v3.FilterOperatorLessThanOrEq:
|
||||
return fmt.Sprintf("%s <= %s", keyCondition, fmtVal), nil
|
||||
case v3.FilterOperatorContains:
|
||||
return fmt.Sprintf("like(%s, %s)", keyCondition, fmtVal), nil
|
||||
case v3.FilterOperatorNotContains:
|
||||
return fmt.Sprintf("notLike(%s, %s)", keyCondition, fmtVal), nil
|
||||
case v3.FilterOperatorExists:
|
||||
return fmt.Sprintf("has(JSONExtractKeys(labels), '%s')", key), nil
|
||||
case v3.FilterOperatorNotExists:
|
||||
return fmt.Sprintf("not has(JSONExtractKeys(labels), '%s')", key), nil
|
||||
default:
|
||||
return "", fmt.Errorf("unsupported filter operator: %s", op)
|
||||
}
|
||||
}
|
||||
|
||||
var (
|
||||
sixHoursInMilliseconds = time.Hour.Milliseconds() * 6
|
||||
oneDayInMilliseconds = time.Hour.Milliseconds() * 24
|
||||
oneWeekInMilliseconds = oneDayInMilliseconds * 7
|
||||
)
|
||||
|
||||
func WhichTSTableToUse(start, end int64) (int64, int64, string, string) {
|
||||
|
||||
var tableName string
|
||||
var localTableName string
|
||||
if end-start < sixHoursInMilliseconds {
|
||||
// adjust the start time to nearest 1 hour
|
||||
start = start - (start % (time.Hour.Milliseconds() * 1))
|
||||
tableName = constants.SIGNOZ_TIMESERIES_v4_TABLENAME
|
||||
localTableName = constants.SIGNOZ_TIMESERIES_v4_LOCAL_TABLENAME
|
||||
} else if end-start < oneDayInMilliseconds {
|
||||
// adjust the start time to nearest 6 hours
|
||||
start = start - (start % (time.Hour.Milliseconds() * 6))
|
||||
tableName = constants.SIGNOZ_TIMESERIES_v4_6HRS_TABLENAME
|
||||
localTableName = constants.SIGNOZ_TIMESERIES_v4_6HRS_LOCAL_TABLENAME
|
||||
} else if end-start < oneWeekInMilliseconds {
|
||||
// adjust the start time to nearest 1 day
|
||||
start = start - (start % (time.Hour.Milliseconds() * 24))
|
||||
tableName = constants.SIGNOZ_TIMESERIES_v4_1DAY_TABLENAME
|
||||
localTableName = constants.SIGNOZ_TIMESERIES_v4_1DAY_LOCAL_TABLENAME
|
||||
} else {
|
||||
if constants.UseMetricsPreAggregation() {
|
||||
// adjust the start time to nearest 1 week
|
||||
start = start - (start % (time.Hour.Milliseconds() * 24 * 7))
|
||||
tableName = constants.SIGNOZ_TIMESERIES_v4_1WEEK_TABLENAME
|
||||
localTableName = constants.SIGNOZ_TIMESERIES_v4_1WEEK_LOCAL_TABLENAME
|
||||
} else {
|
||||
// continue to use the 1 day table
|
||||
start = start - (start % (time.Hour.Milliseconds() * 24))
|
||||
tableName = constants.SIGNOZ_TIMESERIES_v4_1DAY_TABLENAME
|
||||
localTableName = constants.SIGNOZ_TIMESERIES_v4_1DAY_LOCAL_TABLENAME
|
||||
}
|
||||
}
|
||||
|
||||
return start, end, tableName, localTableName
|
||||
}
|
||||
|
||||
func WhichSampleTableToUse(start, end int64) (string, string) {
|
||||
if end-start < oneDayInMilliseconds {
|
||||
return constants.SIGNOZ_SAMPLES_V4_TABLENAME, "count(*)"
|
||||
} else if end-start < oneWeekInMilliseconds {
|
||||
return constants.SIGNOZ_SAMPLES_V4_AGG_5M_TABLENAME, "sum(count)"
|
||||
} else {
|
||||
return constants.SIGNOZ_SAMPLES_V4_AGG_30M_TABLENAME, "sum(count)"
|
||||
}
|
||||
}
|
||||
@@ -350,3 +350,42 @@ func GetEpochNanoSecs(epoch int64) int64 {
|
||||
}
|
||||
return temp * int64(math.Pow(10, float64(19-count)))
|
||||
}
|
||||
|
||||
func NormalizeMap(data map[string]uint64) map[string]float64 {
|
||||
if len(data) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
var minVal, maxVal uint64
|
||||
first := true
|
||||
for _, v := range data {
|
||||
if first {
|
||||
minVal, maxVal = v, v
|
||||
first = false
|
||||
} else {
|
||||
if v < minVal {
|
||||
minVal = v
|
||||
}
|
||||
if v > maxVal {
|
||||
maxVal = v
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If all values are the same, avoid division by zero
|
||||
if minVal == maxVal {
|
||||
normalized := make(map[string]float64)
|
||||
for k := range data {
|
||||
normalized[k] = 1.0 // or 0.0, depending on the convention
|
||||
}
|
||||
return normalized
|
||||
}
|
||||
|
||||
// Normalize the values using min-max normalization
|
||||
normalized := make(map[string]float64)
|
||||
for k, v := range data {
|
||||
normalized[k] = float64(v-minVal) / float64(maxVal-minVal)
|
||||
}
|
||||
|
||||
return normalized
|
||||
}
|
||||
|
||||
@@ -63,6 +63,10 @@ func (h *provider) clickHouseSettings(ctx context.Context, query string, args ..
|
||||
settings["timeout_before_checking_execution_speed"] = h.settings.TimeoutBeforeCheckingExecutionSpeed
|
||||
}
|
||||
|
||||
if ctx.Value("clickhouse_max_threads") != nil {
|
||||
if maxThreads, ok := ctx.Value("clickhouse_max_threads").(int); ok { settings["max_threads"] = maxThreads }
|
||||
}
|
||||
|
||||
ctx = clickhouse.Context(ctx, clickhouse.WithSettings(settings))
|
||||
return ctx, query, args
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user