Compare commits

...

4 Commits

Author SHA1 Message Date
Srikanth Chekuri
ec21d1bca1 fix: tests 2024-01-04 07:53:11 +05:30
Srikanth Chekuri
52e98bcddf chore: add ee query limits 2024-01-04 00:55:58 +05:30
Srikanth Chekuri
a8b7e9582a Merge branch 'develop' of github.com:SigNoz/signoz into query-limits 2024-01-02 20:53:51 +05:30
Srikanth Chekuri
4a042fa413 feat: add time-series-limit config option 2023-11-14 02:58:59 +05:30
19 changed files with 298 additions and 47 deletions

View File

@@ -150,6 +150,7 @@ func (ah *APIHandler) RegisterRoutes(router *mux.Router, am *baseapp.AuthMiddlew
router.HandleFunc("/api/v1/login", am.OpenAccess(ah.loginUser)).Methods(http.MethodPost)
router.HandleFunc("/api/v1/traces/{traceId}", am.ViewAccess(ah.searchTraces)).Methods(http.MethodGet)
router.HandleFunc("/api/v2/metrics/query_range", am.ViewAccess(ah.queryRangeMetricsV2)).Methods(http.MethodPost)
router.HandleFunc("/api/v3/query_range", am.ViewAccess(ah.QueryRangeV3)).Methods(http.MethodPost)
// PAT APIs
router.HandleFunc("/api/v1/pat", am.OpenAccess(ah.createPAT)).Methods(http.MethodPost)
@@ -163,6 +164,10 @@ func (ah *APIHandler) RegisterRoutes(router *mux.Router, am *baseapp.AuthMiddlew
router.HandleFunc("/api/v1/dashboards/{uuid}/lock", am.EditAccess(ah.lockDashboard)).Methods(http.MethodPut)
router.HandleFunc("/api/v1/dashboards/{uuid}/unlock", am.EditAccess(ah.unlockDashboard)).Methods(http.MethodPut)
router.HandleFunc("/api/v1/limits", am.AdminAccess(ah.getQueryLimits)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/limits", am.AdminAccess(ah.addQueryLimits)).Methods(http.MethodPost)
router.HandleFunc("/api/v1/limits", am.AdminAccess(ah.updateQueryLimits)).Methods(http.MethodPut)
router.HandleFunc("/api/v2/licenses",
am.ViewAccess(ah.listLicensesV2)).
Methods(http.MethodGet)

View File

@@ -0,0 +1,51 @@
package api
import (
"encoding/json"
"net/http"
"go.signoz.io/signoz/ee/query-service/model"
basemodel "go.signoz.io/signoz/pkg/query-service/model"
)
func (aH *APIHandler) getQueryLimits(w http.ResponseWriter, r *http.Request) {
queryLimits, err := aH.AppDao().GetQueryLimits(r.Context())
if err != nil {
RespondError(w, &basemodel.ApiError{Typ: basemodel.ErrorInternal, Err: err}, nil)
return
}
aH.Respond(w, queryLimits)
}
func (aH *APIHandler) addQueryLimits(w http.ResponseWriter, r *http.Request) {
var queryLimits []*model.QueryLimit
if err := json.NewDecoder(r.Body).Decode(&queryLimits); err != nil {
RespondError(w, &basemodel.ApiError{Typ: basemodel.ErrorBadData, Err: err}, nil)
return
}
err := aH.AppDao().AddQueryLimits(r.Context(), queryLimits)
if err != nil {
RespondError(w, &basemodel.ApiError{Typ: basemodel.ErrorInternal, Err: err}, nil)
return
}
aH.Respond(w, nil)
}
func (aH *APIHandler) updateQueryLimits(w http.ResponseWriter, r *http.Request) {
var queryLimits []*model.QueryLimit
if err := json.NewDecoder(r.Body).Decode(&queryLimits); err != nil {
RespondError(w, &basemodel.ApiError{Typ: basemodel.ErrorBadData, Err: err}, nil)
return
}
err := aH.AppDao().UpdateQueryLimits(r.Context(), queryLimits)
if err != nil {
RespondError(w, &basemodel.ApiError{Typ: basemodel.ErrorInternal, Err: err}, nil)
return
}
aH.Respond(w, nil)
}

View File

@@ -4,14 +4,17 @@ import (
"bytes"
"fmt"
"net/http"
"strings"
"sync"
"text/template"
"time"
"go.signoz.io/signoz/pkg/query-service/app"
"go.signoz.io/signoz/pkg/query-service/app/metrics"
"go.signoz.io/signoz/pkg/query-service/app/parser"
"go.signoz.io/signoz/pkg/query-service/constants"
basemodel "go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
querytemplate "go.signoz.io/signoz/pkg/query-service/utils/queryTemplate"
"go.uber.org/zap"
)
@@ -234,3 +237,52 @@ func (ah *APIHandler) queryRangeMetricsV2(w http.ResponseWriter, r *http.Request
resp := ResponseFormat{ResultType: "matrix", Result: seriesList, TableName: tableName}
ah.Respond(w, resp)
}
func (aH *APIHandler) QueryRangeV3(w http.ResponseWriter, r *http.Request) {
queryRangeParams, apiErrorObj := app.ParseQueryRangeParams(r)
if apiErrorObj != nil {
zap.S().Errorf(apiErrorObj.Err.Error())
RespondError(w, apiErrorObj, nil)
return
}
limits, err := aH.AppDao().GetQueryLimits(r.Context())
if err != nil {
zap.S().Errorf("Error while getting query limits: %v", err)
// We don't want to fail the request if we can't get the limits
// iterate over the nil slice will not execute the loop
// and the query will be executed without any limits
// this shouldn't happen ideally but we don't want to fail the request
}
var timeSeriesLimt int
for _, limit := range limits {
if limit.Name == "time_series_limit" {
timeSeriesLimt = limit.UsageLimit
}
}
if len(queryRangeParams.CompositeQuery.BuilderQueries) > 0 {
for idx := range queryRangeParams.CompositeQuery.BuilderQueries {
queryRangeParams.CompositeQuery.BuilderQueries[idx].QueryLimits = v3.QueryLimits{
MaxTimeSeries: timeSeriesLimt,
}
// TODO(srikanthccv): should apply to explore page also
if !strings.Contains(queryRangeParams.SourcePage, "dashboard") {
queryRangeParams.CompositeQuery.BuilderQueries[idx].QueryLimits.MaxTimeSeries = 0
}
}
}
// add temporality for each metric
temporalityErr := aH.APIHandler.AddTemporality(r.Context(), queryRangeParams)
if temporalityErr != nil {
zap.S().Errorf("Error while adding temporality for metrics: %v", temporalityErr)
RespondError(w, &basemodel.ApiError{Typ: basemodel.ErrorInternal, Err: temporalityErr}, nil)
return
}
aH.APIHandler.ExecQueryRangeV3(r.Context(), queryRangeParams, w, r)
}

View File

@@ -42,7 +42,6 @@ import (
baseconst "go.signoz.io/signoz/pkg/query-service/constants"
"go.signoz.io/signoz/pkg/query-service/healthcheck"
basealm "go.signoz.io/signoz/pkg/query-service/integrations/alertManager"
baseint "go.signoz.io/signoz/pkg/query-service/interfaces"
basemodel "go.signoz.io/signoz/pkg/query-service/model"
pqle "go.signoz.io/signoz/pkg/query-service/pqlEngine"
rules "go.signoz.io/signoz/pkg/query-service/rules"
@@ -87,7 +86,7 @@ type Server struct {
privateHTTP *http.Server
// feature flags
featureLookup baseint.FeatureLookup
featureLookup baseInterface.FeatureLookup
// Usage manager
usageManager *usage.Manager
@@ -641,7 +640,7 @@ func makeRulesManager(
alertManagerURL string,
ruleRepoURL string,
db *sqlx.DB,
ch baseint.Reader,
ch baseInterface.Reader,
disableRules bool,
fm baseInterface.FeatureLookup) (*rules.Manager, error) {

View File

@@ -39,4 +39,8 @@ type ModelDao interface {
GetUserByPAT(ctx context.Context, token string) (*basemodel.UserPayload, basemodel.BaseApiError)
ListPATs(ctx context.Context, userID string) ([]model.PAT, basemodel.BaseApiError)
DeletePAT(ctx context.Context, id string) basemodel.BaseApiError
GetQueryLimits(ctx context.Context) ([]*model.QueryLimit, error)
AddQueryLimits(ctx context.Context, queryLimits []*model.QueryLimit) error
UpdateQueryLimits(ctx context.Context, queryLimits []*model.QueryLimit) error
}

View File

@@ -0,0 +1,54 @@
package sqlite
import (
"context"
"go.signoz.io/signoz/ee/query-service/model"
)
// GetQueryLimits returns the query limits
func (m *modelDao) GetQueryLimits(ctx context.Context) ([]*model.QueryLimit, error) {
query := `SELECT name, title, usage_limit FROM resource_usage_limits`
var queryLimits []*model.QueryLimit
err := m.DB().Select(&queryLimits, query)
if err != nil {
return nil, err
}
return queryLimits, nil
}
// AddQueryLimits adds the query limits
func (m *modelDao) AddQueryLimits(ctx context.Context, queryLimits []*model.QueryLimit) error {
tx := m.DB().MustBegin()
for _, queryLimit := range queryLimits {
query := `INSERT INTO resource_usage_limits (name, title, usage_limit) VALUES (?, ?, ?)`
_, err := tx.Exec(query, queryLimit.Name, queryLimit.Title, queryLimit.UsageLimit)
if err != nil {
tx.Rollback()
return err
}
}
err := tx.Commit()
if err != nil {
return err
}
return nil
}
// UpdateQueryLimits updates the query limits
func (m *modelDao) UpdateQueryLimits(ctx context.Context, queryLimits []*model.QueryLimit) error {
tx := m.DB().MustBegin()
for _, queryLimit := range queryLimits {
query := `UPDATE resource_usage_limits SET usage_limit = ? WHERE name = ?`
_, err := tx.Exec(query, queryLimit.UsageLimit, queryLimit.Name)
if err != nil {
tx.Rollback()
return err
}
}
err := tx.Commit()
if err != nil {
return err
}
return nil
}

View File

@@ -38,7 +38,7 @@ func InitDB(dataSourceName string) (*modelDao, error) {
basedao.SetDB(dao)
m := &modelDao{ModelDaoSqlite: dao}
table_schema := `
tableSchema := `
PRAGMA foreign_keys = ON;
CREATE TABLE IF NOT EXISTS org_domains(
id TEXT PRIMARY KEY,
@@ -60,11 +60,23 @@ func InitDB(dataSourceName string) (*modelDao, error) {
);
`
_, err = m.DB().Exec(table_schema)
_, err = m.DB().Exec(tableSchema)
if err != nil {
return nil, fmt.Errorf("error in creating tables: %v", err.Error())
}
// create resource_usage_limits table
tableSchema = `CREATE TABLE IF NOT EXISTS resource_usage_limits (
name TEXT PRIMARY KEY,
title TEXT,
usage_limit INTEGER DEFAULT 0
);`
_, err = m.DB().Exec(tableSchema)
if err != nil {
return nil, fmt.Errorf("Error in creating resource_usage_limits table: %s", err.Error())
}
return m, nil
}

View File

@@ -9,8 +9,8 @@ import (
"github.com/google/uuid"
"github.com/pkg/errors"
saml2 "github.com/russellhaering/gosaml2"
"go.signoz.io/signoz/ee/query-service/sso/saml"
"go.signoz.io/signoz/ee/query-service/sso"
"go.signoz.io/signoz/ee/query-service/sso/saml"
basemodel "go.signoz.io/signoz/pkg/query-service/model"
"go.uber.org/zap"
)
@@ -24,16 +24,16 @@ const (
// OrgDomain identify org owned web domains for auth and other purposes
type OrgDomain struct {
Id uuid.UUID `json:"id"`
Name string `json:"name"`
OrgId string `json:"orgId"`
SsoEnabled bool `json:"ssoEnabled"`
SsoType SSOType `json:"ssoType"`
Id uuid.UUID `json:"id"`
Name string `json:"name"`
OrgId string `json:"orgId"`
SsoEnabled bool `json:"ssoEnabled"`
SsoType SSOType `json:"ssoType"`
SamlConfig *SamlConfig `json:"samlConfig"`
SamlConfig *SamlConfig `json:"samlConfig"`
GoogleAuthConfig *GoogleOAuthConfig `json:"googleAuthConfig"`
Org *basemodel.Organization
Org *basemodel.Organization
}
func (od *OrgDomain) String() string {
@@ -100,8 +100,8 @@ func (od *OrgDomain) GetSAMLCert() string {
return ""
}
// PrepareGoogleOAuthProvider creates GoogleProvider that is used in
// requesting OAuth and also used in processing response from google
// PrepareGoogleOAuthProvider creates GoogleProvider that is used in
// requesting OAuth and also used in processing response from google
func (od *OrgDomain) PrepareGoogleOAuthProvider(siteUrl *url.URL) (sso.OAuthCallbackProvider, error) {
if od.GoogleAuthConfig == nil {
return nil, fmt.Errorf("Google auth is not setup correctly for this domain")
@@ -137,38 +137,36 @@ func (od *OrgDomain) PrepareSamlRequest(siteUrl *url.URL) (*saml2.SAMLServicePro
}
func (od *OrgDomain) BuildSsoUrl(siteUrl *url.URL) (ssoUrl string, err error) {
fmtDomainId := strings.Replace(od.Id.String(), "-", ":", -1)
// build redirect url from window.location sent by frontend
redirectURL := fmt.Sprintf("%s://%s%s", siteUrl.Scheme, siteUrl.Host, siteUrl.Path)
// prepare state that gets relayed back when the auth provider
// calls back our url. here we pass the app url (where signoz runs)
// and the domain Id. The domain Id helps in identifying sso config
// when the call back occurs and the app url is useful in redirecting user
// back to the right path.
// when the call back occurs and the app url is useful in redirecting user
// back to the right path.
// why do we need to pass app url? the callback typically is handled by backend
// and sometimes backend might right at a different port or is unaware of frontend
// endpoint (unless SITE_URL param is set). hence, we receive this build sso request
// along with frontend window.location and use it to relay the information through
// auth provider to the backend (HandleCallback or HandleSSO method).
// along with frontend window.location and use it to relay the information through
// auth provider to the backend (HandleCallback or HandleSSO method).
relayState := fmt.Sprintf("%s?domainId=%s", redirectURL, fmtDomainId)
switch (od.SsoType) {
switch od.SsoType {
case SAML:
sp, err := od.PrepareSamlRequest(siteUrl)
if err != nil {
return "", err
}
return sp.BuildAuthURL(relayState)
case GoogleAuth:
googleProvider, err := od.PrepareGoogleOAuthProvider(siteUrl)
if err != nil {
return "", err
@@ -177,8 +175,7 @@ func (od *OrgDomain) BuildSsoUrl(siteUrl *url.URL) (ssoUrl string, err error) {
default:
zap.S().Errorf("found unsupported SSO config for the org domain", zap.String("orgDomain", od.Name))
return "", fmt.Errorf("unsupported SSO config for the domain")
return "", fmt.Errorf("unsupported SSO config for the domain")
}
}

View File

@@ -0,0 +1,12 @@
package model
type QueryLimit struct {
// Name of the query limit
Name string `db:"name" json:"name"`
// Title of the query limit
Title string `db:"title" json:"title"`
// UsageLimit indicates the usage limit of the query
// If the usage limit is 0, then there is no limit
UsageLimit int `db:"usage_limit" json:"usage_limit"`
}

View File

@@ -4491,6 +4491,14 @@ func readRowsForTimeSeriesResult(rows driver.Rows, vars []interface{}, columnNam
seriesToPoints[key] = append(seriesToPoints[key], metricPoint)
}
if rows.Err() != nil {
zap.S().Errorf("error while reading time series result %v", rows.Err())
if strings.Contains(rows.Err().Error(), "code: 191") {
return nil, fmt.Errorf(`Exceeded max number of series per query. Please reduce the number of series by applying filters.`)
}
return nil, rows.Err()
}
var seriesList []*v3.Series
for _, key := range keys {
points := seriesToPoints[key]

View File

@@ -514,7 +514,7 @@ func (aH *APIHandler) metricAutocompleteTagValue(w http.ResponseWriter, r *http.
aH.Respond(w, tagValueList)
}
func (aH *APIHandler) addTemporality(ctx context.Context, qp *v3.QueryRangeParamsV3) error {
func (aH *APIHandler) AddTemporality(ctx context.Context, qp *v3.QueryRangeParamsV3) error {
metricNames := make([]string, 0)
metricNameToTemporality := make(map[string]map[v3.Temporality]bool)
@@ -3014,7 +3014,7 @@ func (aH *APIHandler) QueryRangeV3Format(w http.ResponseWriter, r *http.Request)
aH.Respond(w, queryRangeParams)
}
func (aH *APIHandler) queryRangeV3(ctx context.Context, queryRangeParams *v3.QueryRangeParamsV3, w http.ResponseWriter, r *http.Request) {
func (aH *APIHandler) ExecQueryRangeV3(ctx context.Context, queryRangeParams *v3.QueryRangeParamsV3, w http.ResponseWriter, r *http.Request) {
var result []*v3.Result
var err error
@@ -3080,14 +3080,14 @@ func (aH *APIHandler) QueryRangeV3(w http.ResponseWriter, r *http.Request) {
// add temporality for each metric
temporalityErr := aH.addTemporality(r.Context(), queryRangeParams)
temporalityErr := aH.AddTemporality(r.Context(), queryRangeParams)
if temporalityErr != nil {
zap.S().Errorf("Error while adding temporality for metrics: %v", temporalityErr)
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: temporalityErr}, nil)
return
}
aH.queryRangeV3(r.Context(), queryRangeParams, w, r)
aH.ExecQueryRangeV3(r.Context(), queryRangeParams, w, r)
}
func applyMetricLimit(results []*v3.Result, queryRangeParams *v3.QueryRangeParamsV3) {

View File

@@ -38,7 +38,7 @@ func TestPanelTableForCumulative(t *testing.T) {
},
Expression: "A",
},
expected: "SELECT toStartOfHour(now()) as ts, sum(rate_value)/29 as value FROM (SELECT ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality IN ['Cumulative', 'Unspecified'] AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(rate_value) = 0 GROUP BY ts ORDER BY ts",
expected: "SELECT toStartOfHour(now()) as ts, sum(rate_value)/29 as value FROM (SELECT ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality IN ['Cumulative', 'Unspecified'] AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(rate_value) = 0 GROUP BY ts ORDER BY ts",
},
{
name: "latency p50",
@@ -61,7 +61,7 @@ func TestPanelTableForCumulative(t *testing.T) {
},
Expression: "A",
},
expected: "SELECT toStartOfHour(now()) as ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.500) as value FROM (SELECT le, toStartOfHour(now()) as ts, sum(rate_value)/29 as value FROM (SELECT le, ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality IN ['Cumulative', 'Unspecified'] AND JSONExtractString(labels, 'service_name') = 'frontend') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, le,ts ORDER BY fingerprint, le ASC, ts) WINDOW rate_window as (PARTITION BY fingerprint, le ORDER BY fingerprint, le ASC, ts)) WHERE isNaN(rate_value) = 0 GROUP BY le,ts ORDER BY le ASC, ts) GROUP BY ts ORDER BY ts",
expected: "SELECT toStartOfHour(now()) as ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.500) as value FROM (SELECT le, toStartOfHour(now()) as ts, sum(rate_value)/29 as value FROM (SELECT le, ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality IN ['Cumulative', 'Unspecified'] AND JSONExtractString(labels, 'service_name') = 'frontend') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, le,ts ORDER BY fingerprint, le ASC, ts) WINDOW rate_window as (PARTITION BY fingerprint, le ORDER BY fingerprint, le ASC, ts)) WHERE isNaN(rate_value) = 0 GROUP BY le,ts ORDER BY le ASC, ts) GROUP BY ts ORDER BY ts",
},
{
name: "latency p99 with group by",
@@ -80,7 +80,7 @@ func TestPanelTableForCumulative(t *testing.T) {
},
Expression: "A",
},
expected: "SELECT service_name, toStartOfHour(now()) as ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name,le, toStartOfHour(now()) as ts, sum(rate_value)/29 as value FROM (SELECT service_name,le, ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, service_name,le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality IN ['Cumulative', 'Unspecified']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, service_name,le,ts ORDER BY fingerprint, service_name ASC,le ASC, ts) WINDOW rate_window as (PARTITION BY fingerprint, service_name,le ORDER BY fingerprint, service_name ASC,le ASC, ts)) WHERE isNaN(rate_value) = 0 GROUP BY service_name,le,ts ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts",
expected: "SELECT service_name, toStartOfHour(now()) as ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name,le, toStartOfHour(now()) as ts, sum(rate_value)/29 as value FROM (SELECT service_name,le, ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, service_name,le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality IN ['Cumulative', 'Unspecified']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, service_name,le,ts ORDER BY fingerprint, service_name ASC,le ASC, ts) WINDOW rate_window as (PARTITION BY fingerprint, service_name,le ORDER BY fingerprint, service_name ASC,le ASC, ts)) WHERE isNaN(rate_value) = 0 GROUP BY service_name,le,ts ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts",
},
}

View File

@@ -38,7 +38,7 @@ func TestPanelTableForDelta(t *testing.T) {
},
Expression: "A",
},
expected: "SELECT toStartOfHour(now()) as ts, sum(value)/1800 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY ts ORDER BY ts",
expected: "SELECT toStartOfHour(now()) as ts, sum(value)/1800 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY ts ORDER BY ts",
},
{
name: "latency p50",
@@ -61,7 +61,7 @@ func TestPanelTableForDelta(t *testing.T) {
},
Expression: "A",
},
expected: "SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.500) as value FROM (SELECT le, toStartOfHour(now()) as ts, sum(value)/1800 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') = 'frontend') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY le,ts ORDER BY le ASC, ts) GROUP BY ts ORDER BY ts",
expected: "SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.500) as value FROM (SELECT le, toStartOfHour(now()) as ts, sum(value)/1800 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') = 'frontend') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY le,ts ORDER BY le ASC, ts) GROUP BY ts ORDER BY ts",
},
{
name: "latency p99 with group by",
@@ -80,7 +80,7 @@ func TestPanelTableForDelta(t *testing.T) {
},
Expression: "A",
},
expected: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name,le, toStartOfHour(now()) as ts, sum(value)/1800 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' ) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY service_name,le,ts ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts",
expected: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name,le, toStartOfHour(now()) as ts, sum(value)/1800 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' ) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY service_name,le,ts ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts",
},
}

View File

@@ -132,7 +132,7 @@ func buildMetricsTimeSeriesFilterQuery(fs *v3.FilterSet, groupTags []v3.Attribut
}
}
filterSubQuery := fmt.Sprintf("SELECT %s fingerprint FROM %s.%s WHERE %s", selectLabels, constants.SIGNOZ_METRIC_DBNAME, constants.SIGNOZ_TIMESERIES_LOCAL_TABLENAME, queryString)
filterSubQuery := fmt.Sprintf("SELECT DISTINCT %s fingerprint FROM %s.%s WHERE %s", selectLabels, constants.SIGNOZ_METRIC_DBNAME, constants.SIGNOZ_TIMESERIES_LOCAL_TABLENAME, queryString)
return filterSubQuery, nil
}
@@ -502,6 +502,18 @@ func PrepareMetricQuery(start, end int64, queryType v3.QueryType, panelType v3.P
if panelType == v3.PanelTypeValue {
query, err = reduceQuery(query, mq.ReduceTo, mq.AggregateOperator)
}
// We want to prevent the query from returning too many time series, which can cause
// excessive resource usage and slow query times. This usually happens when there are
// no filters on the query.
//
// We use the DISTINCT to remove duplicate time series, and use the max_rows_in_distinct
// setting to throw an error if the number of time series is too high.
// See https://clickhouse.com/docs/en/operations/settings/query-complexity#max-rows-in-distinct
//
// This can be disabled by setting the limit to 0.
if mq.QueryLimits.MaxTimeSeries != 0 {
query = query + " SETTINGS max_rows_in_distinct = " + fmt.Sprintf("%d", mq.QueryLimits.MaxTimeSeries)
}
return query, err
}

View File

@@ -245,7 +245,7 @@ func TestBuildQueryOperators(t *testing.T) {
func TestBuildQueryXRate(t *testing.T) {
t.Run("TestBuildQueryXRate", func(t *testing.T) {
tmpl := `SELECT ts, %s(rate_value) as value FROM (SELECT ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'name' AND temporality IN ['Cumulative', 'Unspecified']) as filtered_time_series USING fingerprint WHERE metric_name = 'name' AND timestamp_ms >= 1650991920000 AND timestamp_ms < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts) ) WHERE isNaN(rate_value) = 0 GROUP BY ts ORDER BY ts`
tmpl := `SELECT ts, %s(rate_value) as value FROM (SELECT ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'name' AND temporality IN ['Cumulative', 'Unspecified']) as filtered_time_series USING fingerprint WHERE metric_name = 'name' AND timestamp_ms >= 1650991920000 AND timestamp_ms < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts) ) WHERE isNaN(rate_value) = 0 GROUP BY ts ORDER BY ts`
cases := []struct {
aggregateOperator v3.AggregateOperator
@@ -298,7 +298,7 @@ func TestBuildQueryXRate(t *testing.T) {
func TestBuildQueryRPM(t *testing.T) {
t.Run("TestBuildQueryXRate", func(t *testing.T) {
tmpl := `SELECT ts, ceil(value * 60) as value FROM (SELECT ts, %s(rate_value) as value FROM (SELECT ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'name' AND temporality IN ['Cumulative', 'Unspecified']) as filtered_time_series USING fingerprint WHERE metric_name = 'name' AND timestamp_ms >= 1650991920000 AND timestamp_ms < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts) ) WHERE isNaN(rate_value) = 0 GROUP BY ts ORDER BY ts)`
tmpl := `SELECT ts, ceil(value * 60) as value FROM (SELECT ts, %s(rate_value) as value FROM (SELECT ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'name' AND temporality IN ['Cumulative', 'Unspecified']) as filtered_time_series USING fingerprint WHERE metric_name = 'name' AND timestamp_ms >= 1650991920000 AND timestamp_ms < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts) ) WHERE isNaN(rate_value) = 0 GROUP BY ts ORDER BY ts)`
cases := []struct {
aggregateOperator v3.AggregateOperator
@@ -520,3 +520,34 @@ func TestBuildQueryAdjustedTimes(t *testing.T) {
})
}
}
func TestBuildQueryLimits(t *testing.T) {
t.Run("TestBuildQueryLimits", func(t *testing.T) {
q := &v3.QueryRangeParamsV3{
Start: 1650991982000,
End: 1651078382000,
CompositeQuery: &v3.CompositeQuery{
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
StepInterval: 60,
AggregateAttribute: v3.AttributeKey{Key: "name"},
Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "a"}, Value: "b", Operator: v3.FilterOperatorNotEqual},
{Key: v3.AttributeKey{Key: "code"}, Value: "ERROR_*", Operator: v3.FilterOperatorNotRegex},
}},
AggregateOperator: v3.AggregateOperatorRateMax,
Expression: "A",
QueryLimits: v3.QueryLimits{
MaxTimeSeries: 100,
},
},
},
},
}
query, err := PrepareMetricQuery(q.Start, q.End, q.CompositeQuery.QueryType, q.CompositeQuery.PanelType, q.CompositeQuery.BuilderQueries["A"], Options{PreferRPM: false})
require.NoError(t, err)
require.Contains(t, query, "SETTINGS max_rows_in_distinct = 100")
})
}

View File

@@ -145,7 +145,14 @@ func (q *querier) runBuilderQuery(
// We are only caching the graph panel queries. A non-existant cache key means that the query is not cached.
// If the query is not cached, we execute the query and return the result without caching it.
if _, ok := cacheKeys[queryName]; !ok {
query, err := metricsV3.PrepareMetricQuery(params.Start, params.End, params.CompositeQuery.QueryType, params.CompositeQuery.PanelType, builderQuery, metricsV3.Options{PreferRPM: preferRPM})
query, err := metricsV3.PrepareMetricQuery(
params.Start,
params.End,
params.CompositeQuery.QueryType,
params.CompositeQuery.PanelType,
builderQuery,
metricsV3.Options{PreferRPM: preferRPM},
)
if err != nil {
ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil}
return
@@ -175,7 +182,7 @@ func (q *querier) runBuilderQuery(
params.CompositeQuery.QueryType,
params.CompositeQuery.PanelType,
builderQuery,
metricsV3.Options{},
metricsV3.Options{PreferRPM: preferRPM},
)
if err != nil {
ch <- channelResult{

View File

@@ -233,7 +233,8 @@ func (qb *QueryBuilder) PrepareQueries(params *v3.QueryRangeParamsV3, args ...in
queries[queryName] = queryString
}
case v3.DataSourceMetrics:
queryString, err := qb.options.BuildMetricQuery(params.Start, params.End, compositeQuery.QueryType, compositeQuery.PanelType, query, metricsV3.Options{PreferRPM: PreferRPMFeatureEnabled})
metricsOptions := metricsV3.Options{PreferRPM: PreferRPMFeatureEnabled}
queryString, err := qb.options.BuildMetricQuery(params.Start, params.End, compositeQuery.QueryType, compositeQuery.PanelType, query, metricsOptions)
if err != nil {
return nil, err
}

View File

@@ -257,7 +257,7 @@ func TestDeltaQueryBuilder(t *testing.T) {
},
},
queryToTest: "A",
expected: "SELECT toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY ts ORDER BY ts",
expected: "SELECT toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY ts ORDER BY ts",
},
{
name: "TestQueryWithExpression - Error rate",
@@ -327,7 +327,7 @@ func TestDeltaQueryBuilder(t *testing.T) {
},
},
queryToTest: "C",
expected: "SELECT A.`ts` as `ts`, A.value * 100 / B.value as value FROM (SELECT toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, 'status_code') IN ['STATUS_CODE_ERROR'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY ts ORDER BY ts) as A INNER JOIN (SELECT toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY ts ORDER BY ts) as B ON A.`ts` = B.`ts`",
expected: "SELECT A.`ts` as `ts`, A.value * 100 / B.value as value FROM (SELECT toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, 'status_code') IN ['STATUS_CODE_ERROR'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY ts ORDER BY ts) as A INNER JOIN (SELECT toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY ts ORDER BY ts) as B ON A.`ts` = B.`ts`",
},
{
name: "TestQuery - Quantile",
@@ -354,7 +354,7 @@ func TestDeltaQueryBuilder(t *testing.T) {
},
},
queryToTest: "A",
expected: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) as value FROM (SELECT service_name,le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' ) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY service_name,le,ts ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts",
expected: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) as value FROM (SELECT service_name,le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' ) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY service_name,le,ts ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts",
},
}

View File

@@ -353,6 +353,7 @@ type QueryRangeParamsV3 struct {
CompositeQuery *CompositeQuery `json:"compositeQuery"`
Variables map[string]interface{} `json:"variables,omitempty"`
NoCache bool `json:"noCache"`
SourcePage string `json:"sourcePage"`
}
type PromQuery struct {
@@ -501,6 +502,11 @@ type BuilderQuery struct {
TimeAggregation TimeAggregation `json:"timeAggregation,omitempty"`
SpaceAggregation SpaceAggregation `json:"spaceAggregation,omitempty"`
Functions []Function `json:"functions,omitempty"`
QueryLimits QueryLimits
}
type QueryLimits struct {
MaxTimeSeries int
}
func (b *BuilderQuery) Validate() error {