Compare commits

...

4 Commits

Author SHA1 Message Date
Srikanth Chekuri
ec21d1bca1 fix: tests 2024-01-04 07:53:11 +05:30
Srikanth Chekuri
52e98bcddf chore: add ee query limits 2024-01-04 00:55:58 +05:30
Srikanth Chekuri
a8b7e9582a Merge branch 'develop' of github.com:SigNoz/signoz into query-limits 2024-01-02 20:53:51 +05:30
Srikanth Chekuri
4a042fa413 feat: add time-series-limit config option 2023-11-14 02:58:59 +05:30
19 changed files with 298 additions and 47 deletions

View File

@@ -150,6 +150,7 @@ func (ah *APIHandler) RegisterRoutes(router *mux.Router, am *baseapp.AuthMiddlew
router.HandleFunc("/api/v1/login", am.OpenAccess(ah.loginUser)).Methods(http.MethodPost) router.HandleFunc("/api/v1/login", am.OpenAccess(ah.loginUser)).Methods(http.MethodPost)
router.HandleFunc("/api/v1/traces/{traceId}", am.ViewAccess(ah.searchTraces)).Methods(http.MethodGet) router.HandleFunc("/api/v1/traces/{traceId}", am.ViewAccess(ah.searchTraces)).Methods(http.MethodGet)
router.HandleFunc("/api/v2/metrics/query_range", am.ViewAccess(ah.queryRangeMetricsV2)).Methods(http.MethodPost) router.HandleFunc("/api/v2/metrics/query_range", am.ViewAccess(ah.queryRangeMetricsV2)).Methods(http.MethodPost)
router.HandleFunc("/api/v3/query_range", am.ViewAccess(ah.QueryRangeV3)).Methods(http.MethodPost)
// PAT APIs // PAT APIs
router.HandleFunc("/api/v1/pat", am.OpenAccess(ah.createPAT)).Methods(http.MethodPost) router.HandleFunc("/api/v1/pat", am.OpenAccess(ah.createPAT)).Methods(http.MethodPost)
@@ -163,6 +164,10 @@ func (ah *APIHandler) RegisterRoutes(router *mux.Router, am *baseapp.AuthMiddlew
router.HandleFunc("/api/v1/dashboards/{uuid}/lock", am.EditAccess(ah.lockDashboard)).Methods(http.MethodPut) router.HandleFunc("/api/v1/dashboards/{uuid}/lock", am.EditAccess(ah.lockDashboard)).Methods(http.MethodPut)
router.HandleFunc("/api/v1/dashboards/{uuid}/unlock", am.EditAccess(ah.unlockDashboard)).Methods(http.MethodPut) router.HandleFunc("/api/v1/dashboards/{uuid}/unlock", am.EditAccess(ah.unlockDashboard)).Methods(http.MethodPut)
router.HandleFunc("/api/v1/limits", am.AdminAccess(ah.getQueryLimits)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/limits", am.AdminAccess(ah.addQueryLimits)).Methods(http.MethodPost)
router.HandleFunc("/api/v1/limits", am.AdminAccess(ah.updateQueryLimits)).Methods(http.MethodPut)
router.HandleFunc("/api/v2/licenses", router.HandleFunc("/api/v2/licenses",
am.ViewAccess(ah.listLicensesV2)). am.ViewAccess(ah.listLicensesV2)).
Methods(http.MethodGet) Methods(http.MethodGet)

View File

@@ -0,0 +1,51 @@
package api
import (
"encoding/json"
"net/http"
"go.signoz.io/signoz/ee/query-service/model"
basemodel "go.signoz.io/signoz/pkg/query-service/model"
)
func (aH *APIHandler) getQueryLimits(w http.ResponseWriter, r *http.Request) {
queryLimits, err := aH.AppDao().GetQueryLimits(r.Context())
if err != nil {
RespondError(w, &basemodel.ApiError{Typ: basemodel.ErrorInternal, Err: err}, nil)
return
}
aH.Respond(w, queryLimits)
}
func (aH *APIHandler) addQueryLimits(w http.ResponseWriter, r *http.Request) {
var queryLimits []*model.QueryLimit
if err := json.NewDecoder(r.Body).Decode(&queryLimits); err != nil {
RespondError(w, &basemodel.ApiError{Typ: basemodel.ErrorBadData, Err: err}, nil)
return
}
err := aH.AppDao().AddQueryLimits(r.Context(), queryLimits)
if err != nil {
RespondError(w, &basemodel.ApiError{Typ: basemodel.ErrorInternal, Err: err}, nil)
return
}
aH.Respond(w, nil)
}
func (aH *APIHandler) updateQueryLimits(w http.ResponseWriter, r *http.Request) {
var queryLimits []*model.QueryLimit
if err := json.NewDecoder(r.Body).Decode(&queryLimits); err != nil {
RespondError(w, &basemodel.ApiError{Typ: basemodel.ErrorBadData, Err: err}, nil)
return
}
err := aH.AppDao().UpdateQueryLimits(r.Context(), queryLimits)
if err != nil {
RespondError(w, &basemodel.ApiError{Typ: basemodel.ErrorInternal, Err: err}, nil)
return
}
aH.Respond(w, nil)
}

View File

@@ -4,14 +4,17 @@ import (
"bytes" "bytes"
"fmt" "fmt"
"net/http" "net/http"
"strings"
"sync" "sync"
"text/template" "text/template"
"time" "time"
"go.signoz.io/signoz/pkg/query-service/app"
"go.signoz.io/signoz/pkg/query-service/app/metrics" "go.signoz.io/signoz/pkg/query-service/app/metrics"
"go.signoz.io/signoz/pkg/query-service/app/parser" "go.signoz.io/signoz/pkg/query-service/app/parser"
"go.signoz.io/signoz/pkg/query-service/constants" "go.signoz.io/signoz/pkg/query-service/constants"
basemodel "go.signoz.io/signoz/pkg/query-service/model" basemodel "go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
querytemplate "go.signoz.io/signoz/pkg/query-service/utils/queryTemplate" querytemplate "go.signoz.io/signoz/pkg/query-service/utils/queryTemplate"
"go.uber.org/zap" "go.uber.org/zap"
) )
@@ -234,3 +237,52 @@ func (ah *APIHandler) queryRangeMetricsV2(w http.ResponseWriter, r *http.Request
resp := ResponseFormat{ResultType: "matrix", Result: seriesList, TableName: tableName} resp := ResponseFormat{ResultType: "matrix", Result: seriesList, TableName: tableName}
ah.Respond(w, resp) ah.Respond(w, resp)
} }
func (aH *APIHandler) QueryRangeV3(w http.ResponseWriter, r *http.Request) {
queryRangeParams, apiErrorObj := app.ParseQueryRangeParams(r)
if apiErrorObj != nil {
zap.S().Errorf(apiErrorObj.Err.Error())
RespondError(w, apiErrorObj, nil)
return
}
limits, err := aH.AppDao().GetQueryLimits(r.Context())
if err != nil {
zap.S().Errorf("Error while getting query limits: %v", err)
// We don't want to fail the request if we can't get the limits
// iterate over the nil slice will not execute the loop
// and the query will be executed without any limits
// this shouldn't happen ideally but we don't want to fail the request
}
var timeSeriesLimt int
for _, limit := range limits {
if limit.Name == "time_series_limit" {
timeSeriesLimt = limit.UsageLimit
}
}
if len(queryRangeParams.CompositeQuery.BuilderQueries) > 0 {
for idx := range queryRangeParams.CompositeQuery.BuilderQueries {
queryRangeParams.CompositeQuery.BuilderQueries[idx].QueryLimits = v3.QueryLimits{
MaxTimeSeries: timeSeriesLimt,
}
// TODO(srikanthccv): should apply to explore page also
if !strings.Contains(queryRangeParams.SourcePage, "dashboard") {
queryRangeParams.CompositeQuery.BuilderQueries[idx].QueryLimits.MaxTimeSeries = 0
}
}
}
// add temporality for each metric
temporalityErr := aH.APIHandler.AddTemporality(r.Context(), queryRangeParams)
if temporalityErr != nil {
zap.S().Errorf("Error while adding temporality for metrics: %v", temporalityErr)
RespondError(w, &basemodel.ApiError{Typ: basemodel.ErrorInternal, Err: temporalityErr}, nil)
return
}
aH.APIHandler.ExecQueryRangeV3(r.Context(), queryRangeParams, w, r)
}

View File

@@ -42,7 +42,6 @@ import (
baseconst "go.signoz.io/signoz/pkg/query-service/constants" baseconst "go.signoz.io/signoz/pkg/query-service/constants"
"go.signoz.io/signoz/pkg/query-service/healthcheck" "go.signoz.io/signoz/pkg/query-service/healthcheck"
basealm "go.signoz.io/signoz/pkg/query-service/integrations/alertManager" basealm "go.signoz.io/signoz/pkg/query-service/integrations/alertManager"
baseint "go.signoz.io/signoz/pkg/query-service/interfaces"
basemodel "go.signoz.io/signoz/pkg/query-service/model" basemodel "go.signoz.io/signoz/pkg/query-service/model"
pqle "go.signoz.io/signoz/pkg/query-service/pqlEngine" pqle "go.signoz.io/signoz/pkg/query-service/pqlEngine"
rules "go.signoz.io/signoz/pkg/query-service/rules" rules "go.signoz.io/signoz/pkg/query-service/rules"
@@ -87,7 +86,7 @@ type Server struct {
privateHTTP *http.Server privateHTTP *http.Server
// feature flags // feature flags
featureLookup baseint.FeatureLookup featureLookup baseInterface.FeatureLookup
// Usage manager // Usage manager
usageManager *usage.Manager usageManager *usage.Manager
@@ -641,7 +640,7 @@ func makeRulesManager(
alertManagerURL string, alertManagerURL string,
ruleRepoURL string, ruleRepoURL string,
db *sqlx.DB, db *sqlx.DB,
ch baseint.Reader, ch baseInterface.Reader,
disableRules bool, disableRules bool,
fm baseInterface.FeatureLookup) (*rules.Manager, error) { fm baseInterface.FeatureLookup) (*rules.Manager, error) {

View File

@@ -39,4 +39,8 @@ type ModelDao interface {
GetUserByPAT(ctx context.Context, token string) (*basemodel.UserPayload, basemodel.BaseApiError) GetUserByPAT(ctx context.Context, token string) (*basemodel.UserPayload, basemodel.BaseApiError)
ListPATs(ctx context.Context, userID string) ([]model.PAT, basemodel.BaseApiError) ListPATs(ctx context.Context, userID string) ([]model.PAT, basemodel.BaseApiError)
DeletePAT(ctx context.Context, id string) basemodel.BaseApiError DeletePAT(ctx context.Context, id string) basemodel.BaseApiError
GetQueryLimits(ctx context.Context) ([]*model.QueryLimit, error)
AddQueryLimits(ctx context.Context, queryLimits []*model.QueryLimit) error
UpdateQueryLimits(ctx context.Context, queryLimits []*model.QueryLimit) error
} }

View File

@@ -0,0 +1,54 @@
package sqlite
import (
"context"
"go.signoz.io/signoz/ee/query-service/model"
)
// GetQueryLimits returns the query limits
func (m *modelDao) GetQueryLimits(ctx context.Context) ([]*model.QueryLimit, error) {
query := `SELECT name, title, usage_limit FROM resource_usage_limits`
var queryLimits []*model.QueryLimit
err := m.DB().Select(&queryLimits, query)
if err != nil {
return nil, err
}
return queryLimits, nil
}
// AddQueryLimits adds the query limits
func (m *modelDao) AddQueryLimits(ctx context.Context, queryLimits []*model.QueryLimit) error {
tx := m.DB().MustBegin()
for _, queryLimit := range queryLimits {
query := `INSERT INTO resource_usage_limits (name, title, usage_limit) VALUES (?, ?, ?)`
_, err := tx.Exec(query, queryLimit.Name, queryLimit.Title, queryLimit.UsageLimit)
if err != nil {
tx.Rollback()
return err
}
}
err := tx.Commit()
if err != nil {
return err
}
return nil
}
// UpdateQueryLimits updates the query limits
func (m *modelDao) UpdateQueryLimits(ctx context.Context, queryLimits []*model.QueryLimit) error {
tx := m.DB().MustBegin()
for _, queryLimit := range queryLimits {
query := `UPDATE resource_usage_limits SET usage_limit = ? WHERE name = ?`
_, err := tx.Exec(query, queryLimit.UsageLimit, queryLimit.Name)
if err != nil {
tx.Rollback()
return err
}
}
err := tx.Commit()
if err != nil {
return err
}
return nil
}

View File

@@ -38,7 +38,7 @@ func InitDB(dataSourceName string) (*modelDao, error) {
basedao.SetDB(dao) basedao.SetDB(dao)
m := &modelDao{ModelDaoSqlite: dao} m := &modelDao{ModelDaoSqlite: dao}
table_schema := ` tableSchema := `
PRAGMA foreign_keys = ON; PRAGMA foreign_keys = ON;
CREATE TABLE IF NOT EXISTS org_domains( CREATE TABLE IF NOT EXISTS org_domains(
id TEXT PRIMARY KEY, id TEXT PRIMARY KEY,
@@ -60,11 +60,23 @@ func InitDB(dataSourceName string) (*modelDao, error) {
); );
` `
_, err = m.DB().Exec(table_schema) _, err = m.DB().Exec(tableSchema)
if err != nil { if err != nil {
return nil, fmt.Errorf("error in creating tables: %v", err.Error()) return nil, fmt.Errorf("error in creating tables: %v", err.Error())
} }
// create resource_usage_limits table
tableSchema = `CREATE TABLE IF NOT EXISTS resource_usage_limits (
name TEXT PRIMARY KEY,
title TEXT,
usage_limit INTEGER DEFAULT 0
);`
_, err = m.DB().Exec(tableSchema)
if err != nil {
return nil, fmt.Errorf("Error in creating resource_usage_limits table: %s", err.Error())
}
return m, nil return m, nil
} }

View File

@@ -9,8 +9,8 @@ import (
"github.com/google/uuid" "github.com/google/uuid"
"github.com/pkg/errors" "github.com/pkg/errors"
saml2 "github.com/russellhaering/gosaml2" saml2 "github.com/russellhaering/gosaml2"
"go.signoz.io/signoz/ee/query-service/sso/saml"
"go.signoz.io/signoz/ee/query-service/sso" "go.signoz.io/signoz/ee/query-service/sso"
"go.signoz.io/signoz/ee/query-service/sso/saml"
basemodel "go.signoz.io/signoz/pkg/query-service/model" basemodel "go.signoz.io/signoz/pkg/query-service/model"
"go.uber.org/zap" "go.uber.org/zap"
) )
@@ -24,16 +24,16 @@ const (
// OrgDomain identify org owned web domains for auth and other purposes // OrgDomain identify org owned web domains for auth and other purposes
type OrgDomain struct { type OrgDomain struct {
Id uuid.UUID `json:"id"` Id uuid.UUID `json:"id"`
Name string `json:"name"` Name string `json:"name"`
OrgId string `json:"orgId"` OrgId string `json:"orgId"`
SsoEnabled bool `json:"ssoEnabled"` SsoEnabled bool `json:"ssoEnabled"`
SsoType SSOType `json:"ssoType"` SsoType SSOType `json:"ssoType"`
SamlConfig *SamlConfig `json:"samlConfig"` SamlConfig *SamlConfig `json:"samlConfig"`
GoogleAuthConfig *GoogleOAuthConfig `json:"googleAuthConfig"` GoogleAuthConfig *GoogleOAuthConfig `json:"googleAuthConfig"`
Org *basemodel.Organization Org *basemodel.Organization
} }
func (od *OrgDomain) String() string { func (od *OrgDomain) String() string {
@@ -100,8 +100,8 @@ func (od *OrgDomain) GetSAMLCert() string {
return "" return ""
} }
// PrepareGoogleOAuthProvider creates GoogleProvider that is used in // PrepareGoogleOAuthProvider creates GoogleProvider that is used in
// requesting OAuth and also used in processing response from google // requesting OAuth and also used in processing response from google
func (od *OrgDomain) PrepareGoogleOAuthProvider(siteUrl *url.URL) (sso.OAuthCallbackProvider, error) { func (od *OrgDomain) PrepareGoogleOAuthProvider(siteUrl *url.URL) (sso.OAuthCallbackProvider, error) {
if od.GoogleAuthConfig == nil { if od.GoogleAuthConfig == nil {
return nil, fmt.Errorf("Google auth is not setup correctly for this domain") return nil, fmt.Errorf("Google auth is not setup correctly for this domain")
@@ -137,38 +137,36 @@ func (od *OrgDomain) PrepareSamlRequest(siteUrl *url.URL) (*saml2.SAMLServicePro
} }
func (od *OrgDomain) BuildSsoUrl(siteUrl *url.URL) (ssoUrl string, err error) { func (od *OrgDomain) BuildSsoUrl(siteUrl *url.URL) (ssoUrl string, err error) {
fmtDomainId := strings.Replace(od.Id.String(), "-", ":", -1) fmtDomainId := strings.Replace(od.Id.String(), "-", ":", -1)
// build redirect url from window.location sent by frontend // build redirect url from window.location sent by frontend
redirectURL := fmt.Sprintf("%s://%s%s", siteUrl.Scheme, siteUrl.Host, siteUrl.Path) redirectURL := fmt.Sprintf("%s://%s%s", siteUrl.Scheme, siteUrl.Host, siteUrl.Path)
// prepare state that gets relayed back when the auth provider // prepare state that gets relayed back when the auth provider
// calls back our url. here we pass the app url (where signoz runs) // calls back our url. here we pass the app url (where signoz runs)
// and the domain Id. The domain Id helps in identifying sso config // and the domain Id. The domain Id helps in identifying sso config
// when the call back occurs and the app url is useful in redirecting user // when the call back occurs and the app url is useful in redirecting user
// back to the right path. // back to the right path.
// why do we need to pass app url? the callback typically is handled by backend // why do we need to pass app url? the callback typically is handled by backend
// and sometimes backend might right at a different port or is unaware of frontend // and sometimes backend might right at a different port or is unaware of frontend
// endpoint (unless SITE_URL param is set). hence, we receive this build sso request // endpoint (unless SITE_URL param is set). hence, we receive this build sso request
// along with frontend window.location and use it to relay the information through // along with frontend window.location and use it to relay the information through
// auth provider to the backend (HandleCallback or HandleSSO method). // auth provider to the backend (HandleCallback or HandleSSO method).
relayState := fmt.Sprintf("%s?domainId=%s", redirectURL, fmtDomainId) relayState := fmt.Sprintf("%s?domainId=%s", redirectURL, fmtDomainId)
switch (od.SsoType) { switch od.SsoType {
case SAML: case SAML:
sp, err := od.PrepareSamlRequest(siteUrl) sp, err := od.PrepareSamlRequest(siteUrl)
if err != nil { if err != nil {
return "", err return "", err
} }
return sp.BuildAuthURL(relayState) return sp.BuildAuthURL(relayState)
case GoogleAuth: case GoogleAuth:
googleProvider, err := od.PrepareGoogleOAuthProvider(siteUrl) googleProvider, err := od.PrepareGoogleOAuthProvider(siteUrl)
if err != nil { if err != nil {
return "", err return "", err
@@ -177,8 +175,7 @@ func (od *OrgDomain) BuildSsoUrl(siteUrl *url.URL) (ssoUrl string, err error) {
default: default:
zap.S().Errorf("found unsupported SSO config for the org domain", zap.String("orgDomain", od.Name)) zap.S().Errorf("found unsupported SSO config for the org domain", zap.String("orgDomain", od.Name))
return "", fmt.Errorf("unsupported SSO config for the domain") return "", fmt.Errorf("unsupported SSO config for the domain")
} }
} }

View File

@@ -0,0 +1,12 @@
package model
type QueryLimit struct {
// Name of the query limit
Name string `db:"name" json:"name"`
// Title of the query limit
Title string `db:"title" json:"title"`
// UsageLimit indicates the usage limit of the query
// If the usage limit is 0, then there is no limit
UsageLimit int `db:"usage_limit" json:"usage_limit"`
}

View File

@@ -4491,6 +4491,14 @@ func readRowsForTimeSeriesResult(rows driver.Rows, vars []interface{}, columnNam
seriesToPoints[key] = append(seriesToPoints[key], metricPoint) seriesToPoints[key] = append(seriesToPoints[key], metricPoint)
} }
if rows.Err() != nil {
zap.S().Errorf("error while reading time series result %v", rows.Err())
if strings.Contains(rows.Err().Error(), "code: 191") {
return nil, fmt.Errorf(`Exceeded max number of series per query. Please reduce the number of series by applying filters.`)
}
return nil, rows.Err()
}
var seriesList []*v3.Series var seriesList []*v3.Series
for _, key := range keys { for _, key := range keys {
points := seriesToPoints[key] points := seriesToPoints[key]

View File

@@ -514,7 +514,7 @@ func (aH *APIHandler) metricAutocompleteTagValue(w http.ResponseWriter, r *http.
aH.Respond(w, tagValueList) aH.Respond(w, tagValueList)
} }
func (aH *APIHandler) addTemporality(ctx context.Context, qp *v3.QueryRangeParamsV3) error { func (aH *APIHandler) AddTemporality(ctx context.Context, qp *v3.QueryRangeParamsV3) error {
metricNames := make([]string, 0) metricNames := make([]string, 0)
metricNameToTemporality := make(map[string]map[v3.Temporality]bool) metricNameToTemporality := make(map[string]map[v3.Temporality]bool)
@@ -3014,7 +3014,7 @@ func (aH *APIHandler) QueryRangeV3Format(w http.ResponseWriter, r *http.Request)
aH.Respond(w, queryRangeParams) aH.Respond(w, queryRangeParams)
} }
func (aH *APIHandler) queryRangeV3(ctx context.Context, queryRangeParams *v3.QueryRangeParamsV3, w http.ResponseWriter, r *http.Request) { func (aH *APIHandler) ExecQueryRangeV3(ctx context.Context, queryRangeParams *v3.QueryRangeParamsV3, w http.ResponseWriter, r *http.Request) {
var result []*v3.Result var result []*v3.Result
var err error var err error
@@ -3080,14 +3080,14 @@ func (aH *APIHandler) QueryRangeV3(w http.ResponseWriter, r *http.Request) {
// add temporality for each metric // add temporality for each metric
temporalityErr := aH.addTemporality(r.Context(), queryRangeParams) temporalityErr := aH.AddTemporality(r.Context(), queryRangeParams)
if temporalityErr != nil { if temporalityErr != nil {
zap.S().Errorf("Error while adding temporality for metrics: %v", temporalityErr) zap.S().Errorf("Error while adding temporality for metrics: %v", temporalityErr)
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: temporalityErr}, nil) RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: temporalityErr}, nil)
return return
} }
aH.queryRangeV3(r.Context(), queryRangeParams, w, r) aH.ExecQueryRangeV3(r.Context(), queryRangeParams, w, r)
} }
func applyMetricLimit(results []*v3.Result, queryRangeParams *v3.QueryRangeParamsV3) { func applyMetricLimit(results []*v3.Result, queryRangeParams *v3.QueryRangeParamsV3) {

View File

@@ -38,7 +38,7 @@ func TestPanelTableForCumulative(t *testing.T) {
}, },
Expression: "A", Expression: "A",
}, },
expected: "SELECT toStartOfHour(now()) as ts, sum(rate_value)/29 as value FROM (SELECT ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality IN ['Cumulative', 'Unspecified'] AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(rate_value) = 0 GROUP BY ts ORDER BY ts", expected: "SELECT toStartOfHour(now()) as ts, sum(rate_value)/29 as value FROM (SELECT ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality IN ['Cumulative', 'Unspecified'] AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(rate_value) = 0 GROUP BY ts ORDER BY ts",
}, },
{ {
name: "latency p50", name: "latency p50",
@@ -61,7 +61,7 @@ func TestPanelTableForCumulative(t *testing.T) {
}, },
Expression: "A", Expression: "A",
}, },
expected: "SELECT toStartOfHour(now()) as ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.500) as value FROM (SELECT le, toStartOfHour(now()) as ts, sum(rate_value)/29 as value FROM (SELECT le, ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality IN ['Cumulative', 'Unspecified'] AND JSONExtractString(labels, 'service_name') = 'frontend') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, le,ts ORDER BY fingerprint, le ASC, ts) WINDOW rate_window as (PARTITION BY fingerprint, le ORDER BY fingerprint, le ASC, ts)) WHERE isNaN(rate_value) = 0 GROUP BY le,ts ORDER BY le ASC, ts) GROUP BY ts ORDER BY ts", expected: "SELECT toStartOfHour(now()) as ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.500) as value FROM (SELECT le, toStartOfHour(now()) as ts, sum(rate_value)/29 as value FROM (SELECT le, ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality IN ['Cumulative', 'Unspecified'] AND JSONExtractString(labels, 'service_name') = 'frontend') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, le,ts ORDER BY fingerprint, le ASC, ts) WINDOW rate_window as (PARTITION BY fingerprint, le ORDER BY fingerprint, le ASC, ts)) WHERE isNaN(rate_value) = 0 GROUP BY le,ts ORDER BY le ASC, ts) GROUP BY ts ORDER BY ts",
}, },
{ {
name: "latency p99 with group by", name: "latency p99 with group by",
@@ -80,7 +80,7 @@ func TestPanelTableForCumulative(t *testing.T) {
}, },
Expression: "A", Expression: "A",
}, },
expected: "SELECT service_name, toStartOfHour(now()) as ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name,le, toStartOfHour(now()) as ts, sum(rate_value)/29 as value FROM (SELECT service_name,le, ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, service_name,le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality IN ['Cumulative', 'Unspecified']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, service_name,le,ts ORDER BY fingerprint, service_name ASC,le ASC, ts) WINDOW rate_window as (PARTITION BY fingerprint, service_name,le ORDER BY fingerprint, service_name ASC,le ASC, ts)) WHERE isNaN(rate_value) = 0 GROUP BY service_name,le,ts ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts", expected: "SELECT service_name, toStartOfHour(now()) as ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name,le, toStartOfHour(now()) as ts, sum(rate_value)/29 as value FROM (SELECT service_name,le, ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, service_name,le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality IN ['Cumulative', 'Unspecified']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, service_name,le,ts ORDER BY fingerprint, service_name ASC,le ASC, ts) WINDOW rate_window as (PARTITION BY fingerprint, service_name,le ORDER BY fingerprint, service_name ASC,le ASC, ts)) WHERE isNaN(rate_value) = 0 GROUP BY service_name,le,ts ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts",
}, },
} }

View File

@@ -38,7 +38,7 @@ func TestPanelTableForDelta(t *testing.T) {
}, },
Expression: "A", Expression: "A",
}, },
expected: "SELECT toStartOfHour(now()) as ts, sum(value)/1800 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY ts ORDER BY ts", expected: "SELECT toStartOfHour(now()) as ts, sum(value)/1800 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY ts ORDER BY ts",
}, },
{ {
name: "latency p50", name: "latency p50",
@@ -61,7 +61,7 @@ func TestPanelTableForDelta(t *testing.T) {
}, },
Expression: "A", Expression: "A",
}, },
expected: "SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.500) as value FROM (SELECT le, toStartOfHour(now()) as ts, sum(value)/1800 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') = 'frontend') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY le,ts ORDER BY le ASC, ts) GROUP BY ts ORDER BY ts", expected: "SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.500) as value FROM (SELECT le, toStartOfHour(now()) as ts, sum(value)/1800 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') = 'frontend') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY le,ts ORDER BY le ASC, ts) GROUP BY ts ORDER BY ts",
}, },
{ {
name: "latency p99 with group by", name: "latency p99 with group by",
@@ -80,7 +80,7 @@ func TestPanelTableForDelta(t *testing.T) {
}, },
Expression: "A", Expression: "A",
}, },
expected: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name,le, toStartOfHour(now()) as ts, sum(value)/1800 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' ) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY service_name,le,ts ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts", expected: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name,le, toStartOfHour(now()) as ts, sum(value)/1800 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' ) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY service_name,le,ts ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts",
}, },
} }

View File

@@ -132,7 +132,7 @@ func buildMetricsTimeSeriesFilterQuery(fs *v3.FilterSet, groupTags []v3.Attribut
} }
} }
filterSubQuery := fmt.Sprintf("SELECT %s fingerprint FROM %s.%s WHERE %s", selectLabels, constants.SIGNOZ_METRIC_DBNAME, constants.SIGNOZ_TIMESERIES_LOCAL_TABLENAME, queryString) filterSubQuery := fmt.Sprintf("SELECT DISTINCT %s fingerprint FROM %s.%s WHERE %s", selectLabels, constants.SIGNOZ_METRIC_DBNAME, constants.SIGNOZ_TIMESERIES_LOCAL_TABLENAME, queryString)
return filterSubQuery, nil return filterSubQuery, nil
} }
@@ -502,6 +502,18 @@ func PrepareMetricQuery(start, end int64, queryType v3.QueryType, panelType v3.P
if panelType == v3.PanelTypeValue { if panelType == v3.PanelTypeValue {
query, err = reduceQuery(query, mq.ReduceTo, mq.AggregateOperator) query, err = reduceQuery(query, mq.ReduceTo, mq.AggregateOperator)
} }
// We want to prevent the query from returning too many time series, which can cause
// excessive resource usage and slow query times. This usually happens when there are
// no filters on the query.
//
// We use the DISTINCT to remove duplicate time series, and use the max_rows_in_distinct
// setting to throw an error if the number of time series is too high.
// See https://clickhouse.com/docs/en/operations/settings/query-complexity#max-rows-in-distinct
//
// This can be disabled by setting the limit to 0.
if mq.QueryLimits.MaxTimeSeries != 0 {
query = query + " SETTINGS max_rows_in_distinct = " + fmt.Sprintf("%d", mq.QueryLimits.MaxTimeSeries)
}
return query, err return query, err
} }

View File

@@ -245,7 +245,7 @@ func TestBuildQueryOperators(t *testing.T) {
func TestBuildQueryXRate(t *testing.T) { func TestBuildQueryXRate(t *testing.T) {
t.Run("TestBuildQueryXRate", func(t *testing.T) { t.Run("TestBuildQueryXRate", func(t *testing.T) {
tmpl := `SELECT ts, %s(rate_value) as value FROM (SELECT ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'name' AND temporality IN ['Cumulative', 'Unspecified']) as filtered_time_series USING fingerprint WHERE metric_name = 'name' AND timestamp_ms >= 1650991920000 AND timestamp_ms < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts) ) WHERE isNaN(rate_value) = 0 GROUP BY ts ORDER BY ts` tmpl := `SELECT ts, %s(rate_value) as value FROM (SELECT ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'name' AND temporality IN ['Cumulative', 'Unspecified']) as filtered_time_series USING fingerprint WHERE metric_name = 'name' AND timestamp_ms >= 1650991920000 AND timestamp_ms < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts) ) WHERE isNaN(rate_value) = 0 GROUP BY ts ORDER BY ts`
cases := []struct { cases := []struct {
aggregateOperator v3.AggregateOperator aggregateOperator v3.AggregateOperator
@@ -298,7 +298,7 @@ func TestBuildQueryXRate(t *testing.T) {
func TestBuildQueryRPM(t *testing.T) { func TestBuildQueryRPM(t *testing.T) {
t.Run("TestBuildQueryXRate", func(t *testing.T) { t.Run("TestBuildQueryXRate", func(t *testing.T) {
tmpl := `SELECT ts, ceil(value * 60) as value FROM (SELECT ts, %s(rate_value) as value FROM (SELECT ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'name' AND temporality IN ['Cumulative', 'Unspecified']) as filtered_time_series USING fingerprint WHERE metric_name = 'name' AND timestamp_ms >= 1650991920000 AND timestamp_ms < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts) ) WHERE isNaN(rate_value) = 0 GROUP BY ts ORDER BY ts)` tmpl := `SELECT ts, ceil(value * 60) as value FROM (SELECT ts, %s(rate_value) as value FROM (SELECT ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'name' AND temporality IN ['Cumulative', 'Unspecified']) as filtered_time_series USING fingerprint WHERE metric_name = 'name' AND timestamp_ms >= 1650991920000 AND timestamp_ms < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts) ) WHERE isNaN(rate_value) = 0 GROUP BY ts ORDER BY ts)`
cases := []struct { cases := []struct {
aggregateOperator v3.AggregateOperator aggregateOperator v3.AggregateOperator
@@ -520,3 +520,34 @@ func TestBuildQueryAdjustedTimes(t *testing.T) {
}) })
} }
} }
func TestBuildQueryLimits(t *testing.T) {
t.Run("TestBuildQueryLimits", func(t *testing.T) {
q := &v3.QueryRangeParamsV3{
Start: 1650991982000,
End: 1651078382000,
CompositeQuery: &v3.CompositeQuery{
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
StepInterval: 60,
AggregateAttribute: v3.AttributeKey{Key: "name"},
Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "a"}, Value: "b", Operator: v3.FilterOperatorNotEqual},
{Key: v3.AttributeKey{Key: "code"}, Value: "ERROR_*", Operator: v3.FilterOperatorNotRegex},
}},
AggregateOperator: v3.AggregateOperatorRateMax,
Expression: "A",
QueryLimits: v3.QueryLimits{
MaxTimeSeries: 100,
},
},
},
},
}
query, err := PrepareMetricQuery(q.Start, q.End, q.CompositeQuery.QueryType, q.CompositeQuery.PanelType, q.CompositeQuery.BuilderQueries["A"], Options{PreferRPM: false})
require.NoError(t, err)
require.Contains(t, query, "SETTINGS max_rows_in_distinct = 100")
})
}

View File

@@ -145,7 +145,14 @@ func (q *querier) runBuilderQuery(
// We are only caching the graph panel queries. A non-existant cache key means that the query is not cached. // We are only caching the graph panel queries. A non-existant cache key means that the query is not cached.
// If the query is not cached, we execute the query and return the result without caching it. // If the query is not cached, we execute the query and return the result without caching it.
if _, ok := cacheKeys[queryName]; !ok { if _, ok := cacheKeys[queryName]; !ok {
query, err := metricsV3.PrepareMetricQuery(params.Start, params.End, params.CompositeQuery.QueryType, params.CompositeQuery.PanelType, builderQuery, metricsV3.Options{PreferRPM: preferRPM}) query, err := metricsV3.PrepareMetricQuery(
params.Start,
params.End,
params.CompositeQuery.QueryType,
params.CompositeQuery.PanelType,
builderQuery,
metricsV3.Options{PreferRPM: preferRPM},
)
if err != nil { if err != nil {
ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil} ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil}
return return
@@ -175,7 +182,7 @@ func (q *querier) runBuilderQuery(
params.CompositeQuery.QueryType, params.CompositeQuery.QueryType,
params.CompositeQuery.PanelType, params.CompositeQuery.PanelType,
builderQuery, builderQuery,
metricsV3.Options{}, metricsV3.Options{PreferRPM: preferRPM},
) )
if err != nil { if err != nil {
ch <- channelResult{ ch <- channelResult{

View File

@@ -233,7 +233,8 @@ func (qb *QueryBuilder) PrepareQueries(params *v3.QueryRangeParamsV3, args ...in
queries[queryName] = queryString queries[queryName] = queryString
} }
case v3.DataSourceMetrics: case v3.DataSourceMetrics:
queryString, err := qb.options.BuildMetricQuery(params.Start, params.End, compositeQuery.QueryType, compositeQuery.PanelType, query, metricsV3.Options{PreferRPM: PreferRPMFeatureEnabled}) metricsOptions := metricsV3.Options{PreferRPM: PreferRPMFeatureEnabled}
queryString, err := qb.options.BuildMetricQuery(params.Start, params.End, compositeQuery.QueryType, compositeQuery.PanelType, query, metricsOptions)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -257,7 +257,7 @@ func TestDeltaQueryBuilder(t *testing.T) {
}, },
}, },
queryToTest: "A", queryToTest: "A",
expected: "SELECT toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY ts ORDER BY ts", expected: "SELECT toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY ts ORDER BY ts",
}, },
{ {
name: "TestQueryWithExpression - Error rate", name: "TestQueryWithExpression - Error rate",
@@ -327,7 +327,7 @@ func TestDeltaQueryBuilder(t *testing.T) {
}, },
}, },
queryToTest: "C", queryToTest: "C",
expected: "SELECT A.`ts` as `ts`, A.value * 100 / B.value as value FROM (SELECT toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, 'status_code') IN ['STATUS_CODE_ERROR'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY ts ORDER BY ts) as A INNER JOIN (SELECT toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY ts ORDER BY ts) as B ON A.`ts` = B.`ts`", expected: "SELECT A.`ts` as `ts`, A.value * 100 / B.value as value FROM (SELECT toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, 'status_code') IN ['STATUS_CODE_ERROR'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY ts ORDER BY ts) as A INNER JOIN (SELECT toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY ts ORDER BY ts) as B ON A.`ts` = B.`ts`",
}, },
{ {
name: "TestQuery - Quantile", name: "TestQuery - Quantile",
@@ -354,7 +354,7 @@ func TestDeltaQueryBuilder(t *testing.T) {
}, },
}, },
queryToTest: "A", queryToTest: "A",
expected: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) as value FROM (SELECT service_name,le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' ) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY service_name,le,ts ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts", expected: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) as value FROM (SELECT service_name,le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' ) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY service_name,le,ts ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts",
}, },
} }

View File

@@ -353,6 +353,7 @@ type QueryRangeParamsV3 struct {
CompositeQuery *CompositeQuery `json:"compositeQuery"` CompositeQuery *CompositeQuery `json:"compositeQuery"`
Variables map[string]interface{} `json:"variables,omitempty"` Variables map[string]interface{} `json:"variables,omitempty"`
NoCache bool `json:"noCache"` NoCache bool `json:"noCache"`
SourcePage string `json:"sourcePage"`
} }
type PromQuery struct { type PromQuery struct {
@@ -501,6 +502,11 @@ type BuilderQuery struct {
TimeAggregation TimeAggregation `json:"timeAggregation,omitempty"` TimeAggregation TimeAggregation `json:"timeAggregation,omitempty"`
SpaceAggregation SpaceAggregation `json:"spaceAggregation,omitempty"` SpaceAggregation SpaceAggregation `json:"spaceAggregation,omitempty"`
Functions []Function `json:"functions,omitempty"` Functions []Function `json:"functions,omitempty"`
QueryLimits QueryLimits
}
type QueryLimits struct {
MaxTimeSeries int
} }
func (b *BuilderQuery) Validate() error { func (b *BuilderQuery) Validate() error {