Compare commits
4 Commits
main ... query-limi

| Author | SHA1 | Date |
|---|---|---|
| | ec21d1bca1 | |
| | 52e98bcddf | |
| | a8b7e9582a | |
| | 4a042fa413 | |
@@ -150,6 +150,7 @@ func (ah *APIHandler) RegisterRoutes(router *mux.Router, am *baseapp.AuthMiddlew
 	router.HandleFunc("/api/v1/login", am.OpenAccess(ah.loginUser)).Methods(http.MethodPost)
 	router.HandleFunc("/api/v1/traces/{traceId}", am.ViewAccess(ah.searchTraces)).Methods(http.MethodGet)
 	router.HandleFunc("/api/v2/metrics/query_range", am.ViewAccess(ah.queryRangeMetricsV2)).Methods(http.MethodPost)
+	router.HandleFunc("/api/v3/query_range", am.ViewAccess(ah.QueryRangeV3)).Methods(http.MethodPost)
 
 	// PAT APIs
 	router.HandleFunc("/api/v1/pat", am.OpenAccess(ah.createPAT)).Methods(http.MethodPost)
@@ -163,6 +164,10 @@ func (ah *APIHandler) RegisterRoutes(router *mux.Router, am *baseapp.AuthMiddlew
 	router.HandleFunc("/api/v1/dashboards/{uuid}/lock", am.EditAccess(ah.lockDashboard)).Methods(http.MethodPut)
 	router.HandleFunc("/api/v1/dashboards/{uuid}/unlock", am.EditAccess(ah.unlockDashboard)).Methods(http.MethodPut)
 
+	router.HandleFunc("/api/v1/limits", am.AdminAccess(ah.getQueryLimits)).Methods(http.MethodGet)
+	router.HandleFunc("/api/v1/limits", am.AdminAccess(ah.addQueryLimits)).Methods(http.MethodPost)
+	router.HandleFunc("/api/v1/limits", am.AdminAccess(ah.updateQueryLimits)).Methods(http.MethodPut)
+
 	router.HandleFunc("/api/v2/licenses",
 		am.ViewAccess(ah.listLicensesV2)).
 		Methods(http.MethodGet)
ee/query-service/app/api/limits.go (new file, 51 lines)
@@ -0,0 +1,51 @@
+package api
+
+import (
+	"encoding/json"
+	"net/http"
+
+	"go.signoz.io/signoz/ee/query-service/model"
+	basemodel "go.signoz.io/signoz/pkg/query-service/model"
+)
+
+func (aH *APIHandler) getQueryLimits(w http.ResponseWriter, r *http.Request) {
+	queryLimits, err := aH.AppDao().GetQueryLimits(r.Context())
+	if err != nil {
+		RespondError(w, &basemodel.ApiError{Typ: basemodel.ErrorInternal, Err: err}, nil)
+		return
+	}
+	aH.Respond(w, queryLimits)
+}
+
+func (aH *APIHandler) addQueryLimits(w http.ResponseWriter, r *http.Request) {
+	var queryLimits []*model.QueryLimit
+
+	if err := json.NewDecoder(r.Body).Decode(&queryLimits); err != nil {
+		RespondError(w, &basemodel.ApiError{Typ: basemodel.ErrorBadData, Err: err}, nil)
+		return
+	}
+
+	err := aH.AppDao().AddQueryLimits(r.Context(), queryLimits)
+	if err != nil {
+		RespondError(w, &basemodel.ApiError{Typ: basemodel.ErrorInternal, Err: err}, nil)
+		return
+	}
+	aH.Respond(w, nil)
+
+}
+
+func (aH *APIHandler) updateQueryLimits(w http.ResponseWriter, r *http.Request) {
+	var queryLimits []*model.QueryLimit
+
+	if err := json.NewDecoder(r.Body).Decode(&queryLimits); err != nil {
+		RespondError(w, &basemodel.ApiError{Typ: basemodel.ErrorBadData, Err: err}, nil)
+		return
+	}
+
+	err := aH.AppDao().UpdateQueryLimits(r.Context(), queryLimits)
+	if err != nil {
+		RespondError(w, &basemodel.ApiError{Typ: basemodel.ErrorInternal, Err: err}, nil)
+		return
+	}
+	aH.Respond(w, nil)
+}
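For reference, a minimal client sketch for the new limits endpoints. It is an illustration, not part of the diff: the host, port, bearer token, and `Title` value are placeholders, and the slice payload shape follows the handlers above, which decode a JSON array into `[]*model.QueryLimit`.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// QueryLimit mirrors the JSON shape of ee/query-service/model.QueryLimit.
type QueryLimit struct {
	Name       string `json:"name"`
	Title      string `json:"title"`
	UsageLimit int    `json:"usage_limit"`
}

func main() {
	// The handlers decode a JSON array, so even a single limit is sent as a slice.
	limits := []QueryLimit{
		{Name: "time_series_limit", Title: "Max time series per query", UsageLimit: 100000},
	}
	body, _ := json.Marshal(limits)

	// POST creates the limits; PUT against the same path updates them.
	req, _ := http.NewRequest(http.MethodPost, "http://localhost:8080/api/v1/limits", bytes.NewReader(body))
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Authorization", "Bearer <admin-jwt>") // route is AdminAccess-protected

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status)
}
```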
@@ -4,14 +4,17 @@ import (
 	"bytes"
 	"fmt"
 	"net/http"
+	"strings"
 	"sync"
 	"text/template"
 	"time"
 
+	"go.signoz.io/signoz/pkg/query-service/app"
 	"go.signoz.io/signoz/pkg/query-service/app/metrics"
 	"go.signoz.io/signoz/pkg/query-service/app/parser"
 	"go.signoz.io/signoz/pkg/query-service/constants"
 	basemodel "go.signoz.io/signoz/pkg/query-service/model"
+	v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
 	querytemplate "go.signoz.io/signoz/pkg/query-service/utils/queryTemplate"
 	"go.uber.org/zap"
 )
@@ -234,3 +237,52 @@ func (ah *APIHandler) queryRangeMetricsV2(w http.ResponseWriter, r *http.Request
 	resp := ResponseFormat{ResultType: "matrix", Result: seriesList, TableName: tableName}
 	ah.Respond(w, resp)
 }
+
+func (aH *APIHandler) QueryRangeV3(w http.ResponseWriter, r *http.Request) {
+	queryRangeParams, apiErrorObj := app.ParseQueryRangeParams(r)
+
+	if apiErrorObj != nil {
+		zap.S().Errorf(apiErrorObj.Err.Error())
+		RespondError(w, apiErrorObj, nil)
+		return
+	}
+
+	limits, err := aH.AppDao().GetQueryLimits(r.Context())
+	if err != nil {
+		zap.S().Errorf("Error while getting query limits: %v", err)
+		// We don't want to fail the request if we can't get the limits.
+		// Iterating over the nil slice will not execute the loop, so the
+		// query will run without any limits. This shouldn't happen ideally,
+		// but we don't want to fail the request.
+	}
+
+	var timeSeriesLimit int
+	for _, limit := range limits {
+		if limit.Name == "time_series_limit" {
+			timeSeriesLimit = limit.UsageLimit
+		}
+	}
+
+	if len(queryRangeParams.CompositeQuery.BuilderQueries) > 0 {
+		for idx := range queryRangeParams.CompositeQuery.BuilderQueries {
+			queryRangeParams.CompositeQuery.BuilderQueries[idx].QueryLimits = v3.QueryLimits{
+				MaxTimeSeries: timeSeriesLimit,
+			}
+			// TODO(srikanthccv): should apply to explore page also
+			if !strings.Contains(queryRangeParams.SourcePage, "dashboard") {
+				queryRangeParams.CompositeQuery.BuilderQueries[idx].QueryLimits.MaxTimeSeries = 0
+			}
+		}
+	}
+
+	// add temporality for each metric
+
+	temporalityErr := aH.APIHandler.AddTemporality(r.Context(), queryRangeParams)
+	if temporalityErr != nil {
+		zap.S().Errorf("Error while adding temporality for metrics: %v", temporalityErr)
+		RespondError(w, &basemodel.ApiError{Typ: basemodel.ErrorInternal, Err: temporalityErr}, nil)
+		return
+	}
+
+	aH.APIHandler.ExecQueryRangeV3(r.Context(), queryRangeParams, w, r)
+}
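The `SourcePage` check above means the configured limit is currently enforced only for dashboard-originated queries (the TODO notes that the explore page should follow). A standalone sketch of that decision rule, with simplified names; it assumes `sourcePage` values like "dashboard" or "explorer" sent by the frontend:

```go
package main

import (
	"fmt"
	"strings"
)

// effectiveLimit reproduces the rule from QueryRangeV3 above: the configured
// time-series limit is enforced only for dashboard queries; everywhere else
// the limit is reset to 0, which means "no limit".
func effectiveLimit(configured int, sourcePage string) int {
	if strings.Contains(sourcePage, "dashboard") {
		return configured
	}
	return 0
}

func main() {
	fmt.Println(effectiveLimit(100000, "dashboard")) // 100000: limit enforced
	fmt.Println(effectiveLimit(100000, "explorer"))  // 0: limit disabled
}
```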
@@ -42,7 +42,6 @@ import (
 	baseconst "go.signoz.io/signoz/pkg/query-service/constants"
 	"go.signoz.io/signoz/pkg/query-service/healthcheck"
 	basealm "go.signoz.io/signoz/pkg/query-service/integrations/alertManager"
 	baseint "go.signoz.io/signoz/pkg/query-service/interfaces"
 	basemodel "go.signoz.io/signoz/pkg/query-service/model"
 	pqle "go.signoz.io/signoz/pkg/query-service/pqlEngine"
 	rules "go.signoz.io/signoz/pkg/query-service/rules"
@@ -87,7 +86,7 @@ type Server struct {
 	privateHTTP *http.Server
 
 	// feature flags
-	featureLookup baseint.FeatureLookup
+	featureLookup baseInterface.FeatureLookup
 
 	// Usage manager
 	usageManager *usage.Manager
@@ -641,7 +640,7 @@ func makeRulesManager(
 	alertManagerURL string,
 	ruleRepoURL string,
 	db *sqlx.DB,
-	ch baseint.Reader,
+	ch baseInterface.Reader,
 	disableRules bool,
 	fm baseInterface.FeatureLookup) (*rules.Manager, error) {
@@ -39,4 +39,8 @@ type ModelDao interface {
 	GetUserByPAT(ctx context.Context, token string) (*basemodel.UserPayload, basemodel.BaseApiError)
 	ListPATs(ctx context.Context, userID string) ([]model.PAT, basemodel.BaseApiError)
 	DeletePAT(ctx context.Context, id string) basemodel.BaseApiError
+
+	GetQueryLimits(ctx context.Context) ([]*model.QueryLimit, error)
+	AddQueryLimits(ctx context.Context, queryLimits []*model.QueryLimit) error
+	UpdateQueryLimits(ctx context.Context, queryLimits []*model.QueryLimit) error
 }
ee/query-service/dao/sqlite/limits.go (new file, 54 lines)
@@ -0,0 +1,54 @@
+package sqlite
+
+import (
+	"context"
+
+	"go.signoz.io/signoz/ee/query-service/model"
+)
+
+// GetQueryLimits returns the query limits
+func (m *modelDao) GetQueryLimits(ctx context.Context) ([]*model.QueryLimit, error) {
+	query := `SELECT name, title, usage_limit FROM resource_usage_limits`
+	var queryLimits []*model.QueryLimit
+	err := m.DB().Select(&queryLimits, query)
+	if err != nil {
+		return nil, err
+	}
+	return queryLimits, nil
+}
+
+// AddQueryLimits adds the query limits
+func (m *modelDao) AddQueryLimits(ctx context.Context, queryLimits []*model.QueryLimit) error {
+	tx := m.DB().MustBegin()
+	for _, queryLimit := range queryLimits {
+		query := `INSERT INTO resource_usage_limits (name, title, usage_limit) VALUES (?, ?, ?)`
+		_, err := tx.Exec(query, queryLimit.Name, queryLimit.Title, queryLimit.UsageLimit)
+		if err != nil {
+			tx.Rollback()
+			return err
+		}
+	}
+	err := tx.Commit()
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+// UpdateQueryLimits updates the query limits
+func (m *modelDao) UpdateQueryLimits(ctx context.Context, queryLimits []*model.QueryLimit) error {
+	tx := m.DB().MustBegin()
+	for _, queryLimit := range queryLimits {
+		query := `UPDATE resource_usage_limits SET usage_limit = ? WHERE name = ?`
+		_, err := tx.Exec(query, queryLimit.UsageLimit, queryLimit.Name)
+		if err != nil {
+			tx.Rollback()
+			return err
+		}
+	}
+	err := tx.Commit()
+	if err != nil {
+		return err
+	}
+	return nil
+}
@@ -38,7 +38,7 @@ func InitDB(dataSourceName string) (*modelDao, error) {
 	basedao.SetDB(dao)
 	m := &modelDao{ModelDaoSqlite: dao}
 
-	table_schema := `
+	tableSchema := `
 	PRAGMA foreign_keys = ON;
 	CREATE TABLE IF NOT EXISTS org_domains(
 		id TEXT PRIMARY KEY,
@@ -60,11 +60,23 @@ func InitDB(dataSourceName string) (*modelDao, error) {
 	);
 	`
 
-	_, err = m.DB().Exec(table_schema)
+	_, err = m.DB().Exec(tableSchema)
 	if err != nil {
 		return nil, fmt.Errorf("error in creating tables: %v", err.Error())
 	}
 
+	// create resource_usage_limits table
+	tableSchema = `CREATE TABLE IF NOT EXISTS resource_usage_limits (
+		name TEXT PRIMARY KEY,
+		title TEXT,
+		usage_limit INTEGER DEFAULT 0
+	);`
+
+	_, err = m.DB().Exec(tableSchema)
+	if err != nil {
+		return nil, fmt.Errorf("Error in creating resource_usage_limits table: %s", err.Error())
+	}
+
 	return m, nil
 }
@@ -9,8 +9,8 @@ import (
 	"github.com/google/uuid"
 	"github.com/pkg/errors"
 	saml2 "github.com/russellhaering/gosaml2"
-	"go.signoz.io/signoz/ee/query-service/sso/saml"
+	"go.signoz.io/signoz/ee/query-service/sso"
 	"go.signoz.io/signoz/ee/query-service/sso/saml"
 	basemodel "go.signoz.io/signoz/pkg/query-service/model"
 	"go.uber.org/zap"
 )
@@ -24,16 +24,16 @@ const (
 
 // OrgDomain identifies org-owned web domains for auth and other purposes
 type OrgDomain struct {
 	Id         uuid.UUID `json:"id"`
 	Name       string    `json:"name"`
 	OrgId      string    `json:"orgId"`
 	SsoEnabled bool      `json:"ssoEnabled"`
 	SsoType    SSOType   `json:"ssoType"`
 
 	SamlConfig       *SamlConfig        `json:"samlConfig"`
 	GoogleAuthConfig *GoogleOAuthConfig `json:"googleAuthConfig"`
 
 	Org *basemodel.Organization
 }
 
 func (od *OrgDomain) String() string {
@@ -100,8 +100,8 @@ func (od *OrgDomain) GetSAMLCert() string {
 	return ""
 }
 
 // PrepareGoogleOAuthProvider creates GoogleProvider that is used in
 // requesting OAuth and also used in processing response from google
 func (od *OrgDomain) PrepareGoogleOAuthProvider(siteUrl *url.URL) (sso.OAuthCallbackProvider, error) {
 	if od.GoogleAuthConfig == nil {
 		return nil, fmt.Errorf("Google auth is not setup correctly for this domain")
@@ -137,38 +137,36 @@ func (od *OrgDomain) PrepareSamlRequest(siteUrl *url.URL) (*saml2.SAMLServicePro
 }
 
 func (od *OrgDomain) BuildSsoUrl(siteUrl *url.URL) (ssoUrl string, err error) {
 
 	fmtDomainId := strings.Replace(od.Id.String(), "-", ":", -1)
 
 	// build redirect url from window.location sent by frontend
 	redirectURL := fmt.Sprintf("%s://%s%s", siteUrl.Scheme, siteUrl.Host, siteUrl.Path)
 
 	// prepare state that gets relayed back when the auth provider
 	// calls back our url. here we pass the app url (where signoz runs)
 	// and the domain Id. The domain Id helps in identifying sso config
 	// when the call back occurs and the app url is useful in redirecting user
 	// back to the right path.
 	// why do we need to pass app url? the callback typically is handled by backend
 	// and sometimes backend might run at a different port or is unaware of frontend
 	// endpoint (unless SITE_URL param is set). hence, we receive this build sso request
 	// along with frontend window.location and use it to relay the information through
 	// auth provider to the backend (HandleCallback or HandleSSO method).
 	relayState := fmt.Sprintf("%s?domainId=%s", redirectURL, fmtDomainId)
 
-	switch (od.SsoType) {
+	switch od.SsoType {
 	case SAML:
 		sp, err := od.PrepareSamlRequest(siteUrl)
 		if err != nil {
 			return "", err
 		}
 		return sp.BuildAuthURL(relayState)
 
 	case GoogleAuth:
 		googleProvider, err := od.PrepareGoogleOAuthProvider(siteUrl)
 		if err != nil {
 			return "", err
@@ -177,8 +175,7 @@ func (od *OrgDomain) BuildSsoUrl(siteUrl *url.URL) (ssoUrl string, err error) {
 
 	default:
 		zap.S().Errorf("found unsupported SSO config for the org domain", zap.String("orgDomain", od.Name))
 		return "", fmt.Errorf("unsupported SSO config for the domain")
 	}
 
 }
ee/query-service/model/limits.go (new file, 12 lines)
@@ -0,0 +1,12 @@
+package model
+
+type QueryLimit struct {
+	// Name of the query limit
+	Name string `db:"name" json:"name"`
+
+	// Title of the query limit
+	Title string `db:"title" json:"title"`
+	// UsageLimit indicates the usage limit of the query
+	// If the usage limit is 0, then there is no limit
+	UsageLimit int `db:"usage_limit" json:"usage_limit"`
+}
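To illustrate the struct tags, a small round-trip example (the values are made up; the `db` tags are what sqlx uses in the DAO above, the `json` tags what the HTTP handlers use):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Copy of the QueryLimit struct above, for a self-contained example.
type QueryLimit struct {
	Name       string `db:"name" json:"name"`
	Title      string `db:"title" json:"title"`
	UsageLimit int    `db:"usage_limit" json:"usage_limit"`
}

func main() {
	// The same array shape the limits API accepts; usage_limit = 0 would mean "no limit".
	raw := `[{"name": "time_series_limit", "title": "Time series per query", "usage_limit": 50000}]`

	var limits []*QueryLimit
	if err := json.Unmarshal([]byte(raw), &limits); err != nil {
		panic(err)
	}
	fmt.Println(limits[0].Name, limits[0].UsageLimit) // time_series_limit 50000
}
```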
@@ -4491,6 +4491,14 @@ func readRowsForTimeSeriesResult(rows driver.Rows, vars []interface{}, columnNam
 		seriesToPoints[key] = append(seriesToPoints[key], metricPoint)
 	}
 
+	if rows.Err() != nil {
+		zap.S().Errorf("error while reading time series result %v", rows.Err())
+		if strings.Contains(rows.Err().Error(), "code: 191") {
+			return nil, fmt.Errorf(`Exceeded max number of series per query. Please reduce the number of series by applying filters.`)
+		}
+		return nil, rows.Err()
+	}
+
 	var seriesList []*v3.Series
 	for _, key := range keys {
 		points := seriesToPoints[key]
@@ -514,7 +514,7 @@ func (aH *APIHandler) metricAutocompleteTagValue(w http.ResponseWriter, r *http.
 	aH.Respond(w, tagValueList)
 }
 
-func (aH *APIHandler) addTemporality(ctx context.Context, qp *v3.QueryRangeParamsV3) error {
+func (aH *APIHandler) AddTemporality(ctx context.Context, qp *v3.QueryRangeParamsV3) error {
 
 	metricNames := make([]string, 0)
 	metricNameToTemporality := make(map[string]map[v3.Temporality]bool)
@@ -3014,7 +3014,7 @@ func (aH *APIHandler) QueryRangeV3Format(w http.ResponseWriter, r *http.Request)
 	aH.Respond(w, queryRangeParams)
 }
 
-func (aH *APIHandler) queryRangeV3(ctx context.Context, queryRangeParams *v3.QueryRangeParamsV3, w http.ResponseWriter, r *http.Request) {
+func (aH *APIHandler) ExecQueryRangeV3(ctx context.Context, queryRangeParams *v3.QueryRangeParamsV3, w http.ResponseWriter, r *http.Request) {
 
 	var result []*v3.Result
 	var err error
@@ -3080,14 +3080,14 @@ func (aH *APIHandler) QueryRangeV3(w http.ResponseWriter, r *http.Request) {
 
 	// add temporality for each metric
 
-	temporalityErr := aH.addTemporality(r.Context(), queryRangeParams)
+	temporalityErr := aH.AddTemporality(r.Context(), queryRangeParams)
 	if temporalityErr != nil {
 		zap.S().Errorf("Error while adding temporality for metrics: %v", temporalityErr)
 		RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: temporalityErr}, nil)
 		return
 	}
 
-	aH.queryRangeV3(r.Context(), queryRangeParams, w, r)
+	aH.ExecQueryRangeV3(r.Context(), queryRangeParams, w, r)
 }
 
 func applyMetricLimit(results []*v3.Result, queryRangeParams *v3.QueryRangeParamsV3) {
@@ -38,7 +38,7 @@ func TestPanelTableForCumulative(t *testing.T) {
 			},
 			Expression: "A",
 		},
-		expected: "SELECT toStartOfHour(now()) as ts, sum(rate_value)/29 as value FROM (SELECT ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality IN ['Cumulative', 'Unspecified'] AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(rate_value) = 0 GROUP BY ts ORDER BY ts",
+		expected: "SELECT toStartOfHour(now()) as ts, sum(rate_value)/29 as value FROM (SELECT ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality IN ['Cumulative', 'Unspecified'] AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(rate_value) = 0 GROUP BY ts ORDER BY ts",
 	},
 	{
 		name: "latency p50",
@@ -61,7 +61,7 @@ func TestPanelTableForCumulative(t *testing.T) {
 			},
 			Expression: "A",
 		},
-		expected: "SELECT toStartOfHour(now()) as ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.500) as value FROM (SELECT le, toStartOfHour(now()) as ts, sum(rate_value)/29 as value FROM (SELECT le, ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality IN ['Cumulative', 'Unspecified'] AND JSONExtractString(labels, 'service_name') = 'frontend') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, le,ts ORDER BY fingerprint, le ASC, ts) WINDOW rate_window as (PARTITION BY fingerprint, le ORDER BY fingerprint, le ASC, ts)) WHERE isNaN(rate_value) = 0 GROUP BY le,ts ORDER BY le ASC, ts) GROUP BY ts ORDER BY ts",
+		expected: "SELECT toStartOfHour(now()) as ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.500) as value FROM (SELECT le, toStartOfHour(now()) as ts, sum(rate_value)/29 as value FROM (SELECT le, ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality IN ['Cumulative', 'Unspecified'] AND JSONExtractString(labels, 'service_name') = 'frontend') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, le,ts ORDER BY fingerprint, le ASC, ts) WINDOW rate_window as (PARTITION BY fingerprint, le ORDER BY fingerprint, le ASC, ts)) WHERE isNaN(rate_value) = 0 GROUP BY le,ts ORDER BY le ASC, ts) GROUP BY ts ORDER BY ts",
 	},
 	{
 		name: "latency p99 with group by",
@@ -80,7 +80,7 @@ func TestPanelTableForCumulative(t *testing.T) {
 			},
 			Expression: "A",
 		},
-		expected: "SELECT service_name, toStartOfHour(now()) as ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name,le, toStartOfHour(now()) as ts, sum(rate_value)/29 as value FROM (SELECT service_name,le, ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, service_name,le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality IN ['Cumulative', 'Unspecified']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, service_name,le,ts ORDER BY fingerprint, service_name ASC,le ASC, ts) WINDOW rate_window as (PARTITION BY fingerprint, service_name,le ORDER BY fingerprint, service_name ASC,le ASC, ts)) WHERE isNaN(rate_value) = 0 GROUP BY service_name,le,ts ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts",
+		expected: "SELECT service_name, toStartOfHour(now()) as ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name,le, toStartOfHour(now()) as ts, sum(rate_value)/29 as value FROM (SELECT service_name,le, ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, service_name,le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality IN ['Cumulative', 'Unspecified']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, service_name,le,ts ORDER BY fingerprint, service_name ASC,le ASC, ts) WINDOW rate_window as (PARTITION BY fingerprint, service_name,le ORDER BY fingerprint, service_name ASC,le ASC, ts)) WHERE isNaN(rate_value) = 0 GROUP BY service_name,le,ts ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts",
 	},
 }
@@ -38,7 +38,7 @@ func TestPanelTableForDelta(t *testing.T) {
 			},
 			Expression: "A",
 		},
-		expected: "SELECT toStartOfHour(now()) as ts, sum(value)/1800 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY ts ORDER BY ts",
+		expected: "SELECT toStartOfHour(now()) as ts, sum(value)/1800 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY ts ORDER BY ts",
 	},
 	{
 		name: "latency p50",
@@ -61,7 +61,7 @@ func TestPanelTableForDelta(t *testing.T) {
 			},
 			Expression: "A",
 		},
-		expected: "SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.500) as value FROM (SELECT le, toStartOfHour(now()) as ts, sum(value)/1800 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') = 'frontend') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY le,ts ORDER BY le ASC, ts) GROUP BY ts ORDER BY ts",
+		expected: "SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.500) as value FROM (SELECT le, toStartOfHour(now()) as ts, sum(value)/1800 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') = 'frontend') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY le,ts ORDER BY le ASC, ts) GROUP BY ts ORDER BY ts",
 	},
 	{
 		name: "latency p99 with group by",
@@ -80,7 +80,7 @@ func TestPanelTableForDelta(t *testing.T) {
 			},
 			Expression: "A",
 		},
-		expected: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name,le, toStartOfHour(now()) as ts, sum(value)/1800 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' ) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY service_name,le,ts ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts",
+		expected: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name,le, toStartOfHour(now()) as ts, sum(value)/1800 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' ) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY service_name,le,ts ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts",
 	},
 }
@@ -132,7 +132,7 @@ func buildMetricsTimeSeriesFilterQuery(fs *v3.FilterSet, groupTags []v3.Attribut
 		}
 	}
 
-	filterSubQuery := fmt.Sprintf("SELECT %s fingerprint FROM %s.%s WHERE %s", selectLabels, constants.SIGNOZ_METRIC_DBNAME, constants.SIGNOZ_TIMESERIES_LOCAL_TABLENAME, queryString)
+	filterSubQuery := fmt.Sprintf("SELECT DISTINCT %s fingerprint FROM %s.%s WHERE %s", selectLabels, constants.SIGNOZ_METRIC_DBNAME, constants.SIGNOZ_TIMESERIES_LOCAL_TABLENAME, queryString)
 
 	return filterSubQuery, nil
 }
@@ -502,6 +502,18 @@ func PrepareMetricQuery(start, end int64, queryType v3.QueryType, panelType v3.P
 	if panelType == v3.PanelTypeValue {
 		query, err = reduceQuery(query, mq.ReduceTo, mq.AggregateOperator)
 	}
+	// We want to prevent the query from returning too many time series, which can cause
+	// excessive resource usage and slow query times. This usually happens when there are
+	// no filters on the query.
+	//
+	// We use DISTINCT to remove duplicate time series, and use the max_rows_in_distinct
+	// setting to throw an error if the number of time series is too high.
+	// See https://clickhouse.com/docs/en/operations/settings/query-complexity#max-rows-in-distinct
+	//
+	// This can be disabled by setting the limit to 0.
+	if mq.QueryLimits.MaxTimeSeries != 0 {
+		query = query + " SETTINGS max_rows_in_distinct = " + fmt.Sprintf("%d", mq.QueryLimits.MaxTimeSeries)
+	}
 	return query, err
 }
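End to end, the clause appended here is what makes ClickHouse enforce the limit: the `DISTINCT` fingerprint subquery produces one row per time series, and `max_rows_in_distinct` aborts the query with error code 191 once that count exceeds the setting, which the `readRowsForTimeSeriesResult` change above translates into a user-facing message. A small sketch of the string handling (abbreviated query text; not the repository's code):

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// Abbreviated stand-in for the query built by PrepareMetricQuery.
	query := "SELECT ... INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE ...) ..."
	maxTimeSeries := 100000

	// A limit of 0 means "no limit", so the clause is only appended when set.
	if maxTimeSeries != 0 {
		query = query + " SETTINGS max_rows_in_distinct = " + fmt.Sprintf("%d", maxTimeSeries)
	}
	fmt.Println(query)

	// ClickHouse reports error code 191 when max_rows_in_distinct is exceeded;
	// the row reader maps that onto a friendlier error for the user.
	serverErr := "code: 191, message: Limit for rows or bytes to read exceeded"
	if strings.Contains(serverErr, "code: 191") {
		fmt.Println("Exceeded max number of series per query. Please reduce the number of series by applying filters.")
	}
}
```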
@@ -245,7 +245,7 @@ func TestBuildQueryOperators(t *testing.T) {
 
 func TestBuildQueryXRate(t *testing.T) {
 	t.Run("TestBuildQueryXRate", func(t *testing.T) {
 
-		tmpl := `SELECT ts, %s(rate_value) as value FROM (SELECT ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'name' AND temporality IN ['Cumulative', 'Unspecified']) as filtered_time_series USING fingerprint WHERE metric_name = 'name' AND timestamp_ms >= 1650991920000 AND timestamp_ms < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts) ) WHERE isNaN(rate_value) = 0 GROUP BY ts ORDER BY ts`
+		tmpl := `SELECT ts, %s(rate_value) as value FROM (SELECT ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'name' AND temporality IN ['Cumulative', 'Unspecified']) as filtered_time_series USING fingerprint WHERE metric_name = 'name' AND timestamp_ms >= 1650991920000 AND timestamp_ms < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts) ) WHERE isNaN(rate_value) = 0 GROUP BY ts ORDER BY ts`
 
 		cases := []struct {
 			aggregateOperator v3.AggregateOperator
@@ -298,7 +298,7 @@ func TestBuildQueryXRate(t *testing.T) {
 
 func TestBuildQueryRPM(t *testing.T) {
 	t.Run("TestBuildQueryXRate", func(t *testing.T) {
 
-		tmpl := `SELECT ts, ceil(value * 60) as value FROM (SELECT ts, %s(rate_value) as value FROM (SELECT ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'name' AND temporality IN ['Cumulative', 'Unspecified']) as filtered_time_series USING fingerprint WHERE metric_name = 'name' AND timestamp_ms >= 1650991920000 AND timestamp_ms < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts) ) WHERE isNaN(rate_value) = 0 GROUP BY ts ORDER BY ts)`
+		tmpl := `SELECT ts, ceil(value * 60) as value FROM (SELECT ts, %s(rate_value) as value FROM (SELECT ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'name' AND temporality IN ['Cumulative', 'Unspecified']) as filtered_time_series USING fingerprint WHERE metric_name = 'name' AND timestamp_ms >= 1650991920000 AND timestamp_ms < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts) ) WHERE isNaN(rate_value) = 0 GROUP BY ts ORDER BY ts)`
 
 		cases := []struct {
 			aggregateOperator v3.AggregateOperator
@@ -520,3 +520,34 @@ func TestBuildQueryAdjustedTimes(t *testing.T) {
 		})
 	}
 }
+
+func TestBuildQueryLimits(t *testing.T) {
+	t.Run("TestBuildQueryLimits", func(t *testing.T) {
+		q := &v3.QueryRangeParamsV3{
+			Start: 1650991982000,
+			End:   1651078382000,
+			CompositeQuery: &v3.CompositeQuery{
+				BuilderQueries: map[string]*v3.BuilderQuery{
+					"A": {
+						QueryName:          "A",
+						StepInterval:       60,
+						AggregateAttribute: v3.AttributeKey{Key: "name"},
+						Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{
+							{Key: v3.AttributeKey{Key: "a"}, Value: "b", Operator: v3.FilterOperatorNotEqual},
+							{Key: v3.AttributeKey{Key: "code"}, Value: "ERROR_*", Operator: v3.FilterOperatorNotRegex},
+						}},
+						AggregateOperator: v3.AggregateOperatorRateMax,
+						Expression:        "A",
+						QueryLimits: v3.QueryLimits{
+							MaxTimeSeries: 100,
+						},
+					},
+				},
+			},
+		}
+		query, err := PrepareMetricQuery(q.Start, q.End, q.CompositeQuery.QueryType, q.CompositeQuery.PanelType, q.CompositeQuery.BuilderQueries["A"], Options{PreferRPM: false})
+		require.NoError(t, err)
+
+		require.Contains(t, query, "SETTINGS max_rows_in_distinct = 100")
+	})
+}
@@ -145,7 +145,14 @@ func (q *querier) runBuilderQuery(
 	// We are only caching the graph panel queries. A non-existent cache key means that the query is not cached.
 	// If the query is not cached, we execute the query and return the result without caching it.
 	if _, ok := cacheKeys[queryName]; !ok {
-		query, err := metricsV3.PrepareMetricQuery(params.Start, params.End, params.CompositeQuery.QueryType, params.CompositeQuery.PanelType, builderQuery, metricsV3.Options{PreferRPM: preferRPM})
+		query, err := metricsV3.PrepareMetricQuery(
+			params.Start,
+			params.End,
+			params.CompositeQuery.QueryType,
+			params.CompositeQuery.PanelType,
+			builderQuery,
+			metricsV3.Options{PreferRPM: preferRPM},
+		)
 		if err != nil {
 			ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil}
 			return
@@ -175,7 +182,7 @@ func (q *querier) runBuilderQuery(
 			params.CompositeQuery.QueryType,
 			params.CompositeQuery.PanelType,
 			builderQuery,
-			metricsV3.Options{},
+			metricsV3.Options{PreferRPM: preferRPM},
 		)
 		if err != nil {
 			ch <- channelResult{
@@ -233,7 +233,8 @@ func (qb *QueryBuilder) PrepareQueries(params *v3.QueryRangeParamsV3, args ...in
 			queries[queryName] = queryString
 		}
 	case v3.DataSourceMetrics:
-		queryString, err := qb.options.BuildMetricQuery(params.Start, params.End, compositeQuery.QueryType, compositeQuery.PanelType, query, metricsV3.Options{PreferRPM: PreferRPMFeatureEnabled})
+		metricsOptions := metricsV3.Options{PreferRPM: PreferRPMFeatureEnabled}
+		queryString, err := qb.options.BuildMetricQuery(params.Start, params.End, compositeQuery.QueryType, compositeQuery.PanelType, query, metricsOptions)
 		if err != nil {
 			return nil, err
 		}
@@ -257,7 +257,7 @@ func TestDeltaQueryBuilder(t *testing.T) {
 			},
 		},
 		queryToTest: "A",
-		expected: "SELECT toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY ts ORDER BY ts",
+		expected: "SELECT toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY ts ORDER BY ts",
 	},
 	{
 		name: "TestQueryWithExpression - Error rate",
@@ -327,7 +327,7 @@ func TestDeltaQueryBuilder(t *testing.T) {
 			},
 		},
 		queryToTest: "C",
-		expected: "SELECT A.`ts` as `ts`, A.value * 100 / B.value as value FROM (SELECT toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, 'status_code') IN ['STATUS_CODE_ERROR'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY ts ORDER BY ts) as A INNER JOIN (SELECT toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY ts ORDER BY ts) as B ON A.`ts` = B.`ts`",
+		expected: "SELECT A.`ts` as `ts`, A.value * 100 / B.value as value FROM (SELECT toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, 'status_code') IN ['STATUS_CODE_ERROR'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY ts ORDER BY ts) as A INNER JOIN (SELECT toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY ts ORDER BY ts) as B ON A.`ts` = B.`ts`",
 	},
 	{
 		name: "TestQuery - Quantile",
@@ -354,7 +354,7 @@ func TestDeltaQueryBuilder(t *testing.T) {
 			},
 		},
 		queryToTest: "A",
-		expected: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) as value FROM (SELECT service_name,le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' ) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY service_name,le,ts ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts",
+		expected: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) as value FROM (SELECT service_name,le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' ) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY service_name,le,ts ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts",
 	},
 }
@@ -353,6 +353,7 @@ type QueryRangeParamsV3 struct {
 	CompositeQuery *CompositeQuery        `json:"compositeQuery"`
 	Variables      map[string]interface{} `json:"variables,omitempty"`
 	NoCache        bool                   `json:"noCache"`
+	SourcePage     string                 `json:"sourcePage"`
 }
 
 type PromQuery struct {
@@ -501,6 +502,11 @@ type BuilderQuery struct {
 	TimeAggregation  TimeAggregation  `json:"timeAggregation,omitempty"`
 	SpaceAggregation SpaceAggregation `json:"spaceAggregation,omitempty"`
 	Functions        []Function       `json:"functions,omitempty"`
+	QueryLimits      QueryLimits
 }
 
+type QueryLimits struct {
+	MaxTimeSeries int
+}
+
 func (b *BuilderQuery) Validate() error {