Compare commits

...

27 Commits

Author SHA1 Message Date
Srikanth Chekuri
ec21d1bca1 fix: tests 2024-01-04 07:53:11 +05:30
Srikanth Chekuri
52e98bcddf chore: add ee query limits 2024-01-04 00:55:58 +05:30
Srikanth Chekuri
a8b7e9582a Merge branch 'develop' of github.com:SigNoz/signoz into query-limits 2024-01-02 20:53:51 +05:30
Yunus M
bdd7778e58 update readme.md (#3814)
* chore: update read.me - fe maintainers

* chore: update code owner for frontend codebase
2024-01-02 17:42:36 +05:30
Srikanth Chekuri
105216de3e chore: add prepare query for delta/unspecified timeseries (#4167)
* chore: update BuilderQuery struct and add PrepareTimeseriesFilterQuery

* chore: add prepare query for cumulative/unspecified timeseries

* chore: add prepare query for delta/unspecified timeseries

* chore: update group by to work with 23.11+

* chore: fix test

---------

Co-authored-by: Nityananda Gohain <nityanandagohain@gmail.com>
2023-12-30 22:53:09 +05:30
Ankit Nayan
3072b7eb01 merging main 2023-12-29 22:29:15 +05:30
Raj Kamal Singh
fd9a502012 fix: opamp server: do not panic if config generation fails (#4307) 2023-12-29 21:55:38 +05:30
Prashant Shahi
cf6dc827cc Merge pull request #4306 from SigNoz/release/v0.36.2
Release/v0.36.2
2023-12-29 19:21:12 +05:30
Prashant Shahi
6530873994 Merge branch 'main' into release/v0.36.2 2023-12-29 19:12:41 +05:30
Prashant Shahi
c9c0bd38be chore(signoz): 📌 pin versions: SigNoz 0.36.2
Signed-off-by: Prashant Shahi <prashant@signoz.io>
2023-12-29 19:24:12 +05:45
Prashant Shahi
9ac22fcb10 Merge branch 'develop' into release/v0.36.1 2023-12-29 19:06:53 +05:45
Raj Kamal Singh
86ff865842 Fix: logs pipelines: ignore coalesce op when generating nil check for add value expressions (#4305)
* chore: panic if agent config recommendation can't be generated

* chore: add case with coalesce op in field nil check generation tests

* fix: ignore expr ast member nodes that contain coalesce op in them
2023-12-29 18:21:01 +05:30
Rajat Dabade
e792c48f6d [Refactor]: css fixes (#4248) 2023-12-29 16:05:46 +05:30
Rajat Dabade
8ee92516ca [Refactor]: updated css for height of Panel in LeftContainer (#4030)
* refactor: updated css

* refactor: updated the css

* refactor: removed overflow hidden
2023-12-29 15:52:35 +05:30
Prashant Shahi
79c05d8fa8 Merge pull request #4304 from SigNoz/release/v0.36.1
Release/v0.36.1
2023-12-29 15:39:38 +05:30
Prashant Shahi
019bc8c1df chore(signoz): 📌 pin versions: SigNoz 0.36.1, SigNoz OtelCollector 0.88.6
Signed-off-by: Prashant Shahi <prashant@signoz.io>
2023-12-29 15:44:27 +05:45
Yunus M
d688399b91 fix: overflow issue in service page graphs (#4300) 2023-12-29 13:11:23 +05:30
Rajat Dabade
cfc239e3c9 refactor: added 3 days global timestamp (#4290)
* refactor: added 3 days global timestamp

* refactor: updated 3 days data in right container

* refactor: common function for calculate start and end time
2023-12-29 12:54:02 +05:30
Srikanth Chekuri
3572baa5eb fix: adjust the start and end more accurately (#4263)
* fix: adjust the start and end more accurately
Part of https://github.com/SigNoz/signoz/issues/1327

* chore: cache friendly timestamps
2023-12-29 12:35:22 +05:30
Rajat Dabade
ff26c5f69c fix: make fill gap persistent (#4302) 2023-12-29 12:23:27 +05:30
Srikanth Chekuri
9230f2442f fix: normalize label name to follow prometheus spec (#4264) 2023-12-28 20:22:42 +05:30
Raj Kamal Singh
7fed80b145 Fix: log pipelines contains and ncontains filters should be case insensitive (#4299)
* chore: add test validating contains and ncontains in pipeline filter are case insensitive

* chore: qb2expr: translate contains and ncontains to case insensitive comparison

* chore: minor cleanup
2023-12-28 19:44:17 +05:30
Yunus M
a268bb910c fix: update logic to handle step paths in Kubernetes APM flow (#4297)
* fix: update logic to handle step paths in kubernetes APM flow

* fix: don't reset service name on data source component mount
2023-12-28 18:30:41 +05:30
Rajat Dabade
fbbe0bef86 [Fix]: live view details modal disappear bug (#4249) 2023-12-28 16:02:55 +05:30
Raj Kamal Singh
bcd6ac47f7 Fix: Logs: Pipelines: add nil check for grok parser parseFrom field in generated collector config (#4286)
* chore: add test validating grok parser doesn't spam logs if parse from is missing

* chore: add nil check for grok parser parseFrom
2023-12-28 11:03:31 +05:30
Raj Kamal Singh
ec27916fa5 Fix: QS: Log Pipelines: generate correct nil checks for operators referencing fields like attributes["http.status.code"] (#4284)
* chore: add test validating that using paths like attributes["http.method"] works

* chore: refactor nil checks on processor fields generated for pipelines

* chore: get nil checks working on paths like attributes["http.method"]

* chore: use parsed AST for generating nil checks for add operator value expressions

* chore: some cleanup

* chore: some more cleanup

* chore: some more cleanup

* chore: some more cleanup

---------

Co-authored-by: Nityananda Gohain <nityanandagohain@gmail.com>
2023-12-28 10:31:36 +05:30
Srikanth Chekuri
4a042fa413 feat: add time-series-limit config option 2023-11-14 02:58:59 +05:30
57 changed files with 1441 additions and 245 deletions

2
.github/CODEOWNERS vendored
View File

@@ -2,7 +2,7 @@
# Owners are automatically requested for review for PRs that changes code
# that they own.
/frontend/ @palashgdev @YounixM
/frontend/ @YounixM
/frontend/src/container/MetricsApplication @srikanthccv
/frontend/src/container/NewWidget/RightContainer/types.ts @srikanthccv
/deploy/ @prashant-shahi

View File

@@ -11,7 +11,6 @@
<img alt="tweet" src="https://img.shields.io/twitter/url/http/shields.io.svg?style=social"> </a>
</p>
<h3 align="center">
<a href="https://signoz.io/docs"><b>Dokumentation</b></a> &bull;
<a href="https://github.com/SigNoz/signoz/blob/develop/README.md"><b>Readme auf Englisch </b></a> &bull;
@@ -40,12 +39,13 @@ SigNoz hilft Entwicklern, Anwendungen zu überwachen und Probleme in ihren berei
👉 Einfache Einrichtung von Benachrichtigungen mit dem selbst erstellbaren Abfrage-Builder.
##
### Anwendung Metriken
![application_metrics](https://user-images.githubusercontent.com/83692067/226637410-900dbc5e-6705-4b11-a10c-bd0faeb2a92f.png)
### Verteiltes Tracing
<img width="2068" alt="distributed_tracing_2 2" src="https://user-images.githubusercontent.com/83692067/226536447-bae58321-6a22-4ed3-af80-e3e964cb3489.png">
<img width="2068" alt="distributed_tracing_1" src="https://user-images.githubusercontent.com/83692067/226536462-939745b6-4f9d-45a6-8016-814837e7f7b4.png">
@@ -62,22 +62,18 @@ SigNoz hilft Entwicklern, Anwendungen zu überwachen und Probleme in ihren berei
![exceptions_light](https://user-images.githubusercontent.com/83692067/226637967-4188d024-3ac9-4799-be95-f5ea9c45436f.png)
### Alarme
<img width="2068" alt="alerts_management" src="https://user-images.githubusercontent.com/83692067/226536548-2c81e2e8-c12d-47e8-bad7-c6be79055def.png">
<br /><br />
## Werde Teil unserer Slack Community
Sag Hi zu uns auf [Slack](https://signoz.io/slack) 👋
<br /><br />
## Funktionen:
- Einheitliche Benutzeroberfläche für Metriken, Traces und Logs. Keine Notwendigkeit, zwischen Prometheus und Jaeger zu wechseln, um Probleme zu debuggen oder ein separates Log-Tool wie Elastic neben Ihrer Metriken- und Traces-Stack zu verwenden.
@@ -93,7 +89,6 @@ Sag Hi zu uns auf [Slack](https://signoz.io/slack) 👋
<br /><br />
## Wieso SigNoz?
Als Entwickler fanden wir es anstrengend, uns für jede kleine Funktion, die wir haben wollten, auf Closed Source SaaS Anbieter verlassen zu müssen. Closed Source Anbieter überraschen ihre Kunden zum Monatsende oft mit hohen Rechnungen, die keine Transparenz bzgl. der Kostenaufteilung bieten.
@@ -116,12 +111,10 @@ Wir unterstützen [OpenTelemetry](https://opentelemetry.io) als Bibliothek, mit
- Elixir
- Rust
Hier findest du die vollständige Liste von unterstützten Programmiersprachen - https://opentelemetry.io/docs/
<br /><br />
## Erste Schritte mit SigNoz
### Bereitstellung mit Docker
@@ -138,7 +131,6 @@ Bitte folge den [hier](https://signoz.io/docs/deployment/helm_chart) aufgelistet
<br /><br />
## Vergleiche mit bekannten Tools
### SigNoz vs Prometheus
@@ -179,7 +171,6 @@ Wir haben Benchmarks veröffentlicht, die Loki mit SigNoz vergleichen. Schauen S
<br /><br />
## Zum Projekt beitragen
Wir ❤️ Beiträge zum Projekt, egal ob große oder kleine. Bitte lies dir zuerst die [CONTRIBUTING.md](CONTRIBUTING.md), durch, bevor du anfängst, Beiträge zu SigNoz zu machen.
@@ -197,6 +188,8 @@ Du bist dir nicht sicher, wie du anfangen sollst? Schreib uns einfach auf dem #c
#### Frontend
- [Palash Gupta](https://github.com/palashgdev)
- [Yunus M](https://github.com/YounixM)
- [Rajat Dabade](https://github.com/Rajat-Dabade)
#### DevOps
@@ -204,16 +197,12 @@ Du bist dir nicht sicher, wie du anfangen sollst? Schreib uns einfach auf dem #c
<br /><br />
## Dokumentation
Du findest unsere Dokumentation unter https://signoz.io/docs/. Falls etwas unverständlich ist oder fehlt, öffne gerne ein Github Issue mit dem Label `documentation` oder schreib uns über den Community Slack Channel.
<br /><br />
## Gemeinschaft
Werde Teil der [slack community](https://signoz.io/slack) um mehr über verteilte Einzelschritt-Fehlersuche, Messung von Systemzuständen oder SigNoz zu erfahren und sich mit anderen Nutzern und Mitwirkenden in Verbindung zu setzen.

View File

@@ -19,7 +19,7 @@
<a href="https://twitter.com/SigNozHq"><b>Twitter</b></a>
</h3>
##
##
SigNoz 帮助开发人员监控应用并排查已部署应用的问题。你可以使用 SigNoz 实现如下能力:
@@ -67,7 +67,7 @@ SigNoz 帮助开发人员监控应用并排查已部署应用的问题。你可
## 加入我们 Slack 社区
来 [Slack](https://signoz.io/slack) 和我们打招呼吧 👋
来 [Slack](https://signoz.io/slack) 和我们打招呼吧 👋
<br /><br />
@@ -83,7 +83,7 @@ SigNoz 帮助开发人员监控应用并排查已部署应用的问题。你可
- 通过 服务名、操作方式、延迟、错误、标签/注释 过滤 traces 数据
- 通过聚合 trace 数据而获得业务相关的 metrics。 比如你可以通过 `customer_type: gold` 或者 `deployment_version: v2` 或者 `external_call: paypal` 获取错误率和 P99 延迟数据
- 通过聚合 trace 数据而获得业务相关的 metrics。 比如你可以通过 `customer_type: gold` 或者 `deployment_version: v2` 或者 `external_call: paypal` 获取错误率和 P99 延迟数据
- 原生支持 OpenTelemetry 日志,高级日志查询,自动收集 k8s 相关日志
@@ -101,7 +101,7 @@ SigNoz 帮助开发人员监控应用并排查已部署应用的问题。你可
我们想做一个自托管并且可开源的工具,像 DataDog 和 NewRelic 那样, 为那些担心数据隐私和安全的公司提供第三方服务。
作为开源的项目,你完全可以自己掌控你的配置、样本和更新。你同样可以基于 SigNoz 拓展特定的业务模块。
作为开源的项目,你完全可以自己掌控你的配置、样本和更新。你同样可以基于 SigNoz 拓展特定的业务模块。
### 支持的编程语言:
@@ -153,9 +153,9 @@ Jaeger 仅仅是一个分布式追踪系统。 但是 SigNoz 可以提供 metric
而且, SigNoz 相较于 Jaeger 拥有更对的高级功能:
- Jaegar UI 不能提供任何基于 traces 的 metrics 查询和过滤。
- Jaegar UI 不能提供任何基于 traces 的 metrics 查询和过滤。
- Jaeger 不能针对过滤的 traces 做聚合。 比如, p99 延迟的请求有个标签是 customer_type='premium'。 而这些在 SigNoz 可以轻松做到。
- Jaeger 不能针对过滤的 traces 做聚合。 比如, p99 延迟的请求有个标签是 customer_type='premium'。 而这些在 SigNoz 可以轻松做到。
<p>&nbsp </p>
@@ -185,7 +185,7 @@ Jaeger 仅仅是一个分布式追踪系统。 但是 SigNoz 可以提供 metric
我们 ❤️ 你的贡献,无论大小。 请先阅读 [CONTRIBUTING.md](CONTRIBUTING.md) 再开始给 SigNoz 做贡献。
如果你不知道如何开始? 只需要在 [slack 社区](https://signoz.io/slack) 通过 `#contributing` 频道联系我们。
如果你不知道如何开始? 只需要在 [slack 社区](https://signoz.io/slack) 通过 `#contributing` 频道联系我们。
### 项目维护人员
@@ -199,6 +199,8 @@ Jaeger 仅仅是一个分布式追踪系统。 但是 SigNoz 可以提供 metric
#### 前端
- [Palash Gupta](https://github.com/palashgdev)
- [Yunus M](https://github.com/YounixM)
- [Rajat Dabade](https://github.com/Rajat-Dabade)
#### 运维开发

View File

@@ -146,7 +146,7 @@ services:
condition: on-failure
query-service:
image: signoz/query-service:0.36.0
image: signoz/query-service:0.36.2
command:
[
"-config=/root/config/prometheus.yml",
@@ -186,7 +186,7 @@ services:
<<: *db-depend
frontend:
image: signoz/frontend:0.36.0
image: signoz/frontend:0.36.2
deploy:
restart_policy:
condition: on-failure
@@ -199,7 +199,7 @@ services:
- ../common/nginx-config.conf:/etc/nginx/conf.d/default.conf
otel-collector:
image: signoz/signoz-otel-collector:0.88.4
image: signoz/signoz-otel-collector:0.88.6
command:
[
"--config=/etc/otel-collector-config.yaml",
@@ -237,7 +237,7 @@ services:
- query-service
otel-collector-migrator:
image: signoz/signoz-schema-migrator:0.88.4
image: signoz/signoz-schema-migrator:0.88.6
deploy:
restart_policy:
condition: on-failure
@@ -250,7 +250,7 @@ services:
# - clickhouse-3
otel-collector-metrics:
image: signoz/signoz-otel-collector:0.88.4
image: signoz/signoz-otel-collector:0.88.6
command:
[
"--config=/etc/otel-collector-metrics-config.yaml",

View File

@@ -66,7 +66,7 @@ services:
- --storage.path=/data
otel-collector-migrator:
image: signoz/signoz-schema-migrator:${OTELCOL_TAG:-0.88.4}
image: signoz/signoz-schema-migrator:${OTELCOL_TAG:-0.88.6}
container_name: otel-migrator
command:
- "--dsn=tcp://clickhouse:9000"
@@ -81,7 +81,7 @@ services:
# Notes for Maintainers/Contributors who will change Line Numbers of Frontend & Query-Section. Please Update Line Numbers in `./scripts/commentLinesForSetup.sh` & `./CONTRIBUTING.md`
otel-collector:
container_name: signoz-otel-collector
image: signoz/signoz-otel-collector:0.88.4
image: signoz/signoz-otel-collector:0.88.6
command:
[
"--config=/etc/otel-collector-config.yaml",
@@ -118,7 +118,7 @@ services:
otel-collector-metrics:
container_name: signoz-otel-collector-metrics
image: signoz/signoz-otel-collector:0.88.4
image: signoz/signoz-otel-collector:0.88.6
command:
[
"--config=/etc/otel-collector-metrics-config.yaml",

View File

@@ -164,7 +164,7 @@ services:
# Notes for Maintainers/Contributors who will change Line Numbers of Frontend & Query-Section. Please Update Line Numbers in `./scripts/commentLinesForSetup.sh` & `./CONTRIBUTING.md`
query-service:
image: signoz/query-service:${DOCKER_TAG:-0.36.0}
image: signoz/query-service:${DOCKER_TAG:-0.36.2}
container_name: signoz-query-service
command:
[
@@ -203,7 +203,7 @@ services:
<<: *db-depend
frontend:
image: signoz/frontend:${DOCKER_TAG:-0.36.0}
image: signoz/frontend:${DOCKER_TAG:-0.36.2}
container_name: signoz-frontend
restart: on-failure
depends_on:
@@ -215,7 +215,7 @@ services:
- ../common/nginx-config.conf:/etc/nginx/conf.d/default.conf
otel-collector-migrator:
image: signoz/signoz-schema-migrator:${OTELCOL_TAG:-0.88.4}
image: signoz/signoz-schema-migrator:${OTELCOL_TAG:-0.88.6}
container_name: otel-migrator
command:
- "--dsn=tcp://clickhouse:9000"
@@ -229,7 +229,7 @@ services:
otel-collector:
image: signoz/signoz-otel-collector:${OTELCOL_TAG:-0.88.4}
image: signoz/signoz-otel-collector:${OTELCOL_TAG:-0.88.6}
container_name: signoz-otel-collector
command:
[
@@ -269,7 +269,7 @@ services:
condition: service_healthy
otel-collector-metrics:
image: signoz/signoz-otel-collector:${OTELCOL_TAG:-0.88.4}
image: signoz/signoz-otel-collector:${OTELCOL_TAG:-0.88.6}
container_name: signoz-otel-collector-metrics
command:
[

View File

@@ -150,6 +150,7 @@ func (ah *APIHandler) RegisterRoutes(router *mux.Router, am *baseapp.AuthMiddlew
router.HandleFunc("/api/v1/login", am.OpenAccess(ah.loginUser)).Methods(http.MethodPost)
router.HandleFunc("/api/v1/traces/{traceId}", am.ViewAccess(ah.searchTraces)).Methods(http.MethodGet)
router.HandleFunc("/api/v2/metrics/query_range", am.ViewAccess(ah.queryRangeMetricsV2)).Methods(http.MethodPost)
router.HandleFunc("/api/v3/query_range", am.ViewAccess(ah.QueryRangeV3)).Methods(http.MethodPost)
// PAT APIs
router.HandleFunc("/api/v1/pat", am.OpenAccess(ah.createPAT)).Methods(http.MethodPost)
@@ -163,6 +164,10 @@ func (ah *APIHandler) RegisterRoutes(router *mux.Router, am *baseapp.AuthMiddlew
router.HandleFunc("/api/v1/dashboards/{uuid}/lock", am.EditAccess(ah.lockDashboard)).Methods(http.MethodPut)
router.HandleFunc("/api/v1/dashboards/{uuid}/unlock", am.EditAccess(ah.unlockDashboard)).Methods(http.MethodPut)
router.HandleFunc("/api/v1/limits", am.AdminAccess(ah.getQueryLimits)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/limits", am.AdminAccess(ah.addQueryLimits)).Methods(http.MethodPost)
router.HandleFunc("/api/v1/limits", am.AdminAccess(ah.updateQueryLimits)).Methods(http.MethodPut)
router.HandleFunc("/api/v2/licenses",
am.ViewAccess(ah.listLicensesV2)).
Methods(http.MethodGet)

View File

@@ -0,0 +1,51 @@
package api
import (
"encoding/json"
"net/http"
"go.signoz.io/signoz/ee/query-service/model"
basemodel "go.signoz.io/signoz/pkg/query-service/model"
)
func (aH *APIHandler) getQueryLimits(w http.ResponseWriter, r *http.Request) {
queryLimits, err := aH.AppDao().GetQueryLimits(r.Context())
if err != nil {
RespondError(w, &basemodel.ApiError{Typ: basemodel.ErrorInternal, Err: err}, nil)
return
}
aH.Respond(w, queryLimits)
}
func (aH *APIHandler) addQueryLimits(w http.ResponseWriter, r *http.Request) {
var queryLimits []*model.QueryLimit
if err := json.NewDecoder(r.Body).Decode(&queryLimits); err != nil {
RespondError(w, &basemodel.ApiError{Typ: basemodel.ErrorBadData, Err: err}, nil)
return
}
err := aH.AppDao().AddQueryLimits(r.Context(), queryLimits)
if err != nil {
RespondError(w, &basemodel.ApiError{Typ: basemodel.ErrorInternal, Err: err}, nil)
return
}
aH.Respond(w, nil)
}
func (aH *APIHandler) updateQueryLimits(w http.ResponseWriter, r *http.Request) {
var queryLimits []*model.QueryLimit
if err := json.NewDecoder(r.Body).Decode(&queryLimits); err != nil {
RespondError(w, &basemodel.ApiError{Typ: basemodel.ErrorBadData, Err: err}, nil)
return
}
err := aH.AppDao().UpdateQueryLimits(r.Context(), queryLimits)
if err != nil {
RespondError(w, &basemodel.ApiError{Typ: basemodel.ErrorInternal, Err: err}, nil)
return
}
aH.Respond(w, nil)
}

View File

@@ -4,14 +4,17 @@ import (
"bytes"
"fmt"
"net/http"
"strings"
"sync"
"text/template"
"time"
"go.signoz.io/signoz/pkg/query-service/app"
"go.signoz.io/signoz/pkg/query-service/app/metrics"
"go.signoz.io/signoz/pkg/query-service/app/parser"
"go.signoz.io/signoz/pkg/query-service/constants"
basemodel "go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
querytemplate "go.signoz.io/signoz/pkg/query-service/utils/queryTemplate"
"go.uber.org/zap"
)
@@ -234,3 +237,52 @@ func (ah *APIHandler) queryRangeMetricsV2(w http.ResponseWriter, r *http.Request
resp := ResponseFormat{ResultType: "matrix", Result: seriesList, TableName: tableName}
ah.Respond(w, resp)
}
func (aH *APIHandler) QueryRangeV3(w http.ResponseWriter, r *http.Request) {
queryRangeParams, apiErrorObj := app.ParseQueryRangeParams(r)
if apiErrorObj != nil {
zap.S().Errorf(apiErrorObj.Err.Error())
RespondError(w, apiErrorObj, nil)
return
}
limits, err := aH.AppDao().GetQueryLimits(r.Context())
if err != nil {
zap.S().Errorf("Error while getting query limits: %v", err)
// We don't want to fail the request if we can't get the limits
// iterate over the nil slice will not execute the loop
// and the query will be executed without any limits
// this shouldn't happen ideally but we don't want to fail the request
}
var timeSeriesLimt int
for _, limit := range limits {
if limit.Name == "time_series_limit" {
timeSeriesLimt = limit.UsageLimit
}
}
if len(queryRangeParams.CompositeQuery.BuilderQueries) > 0 {
for idx := range queryRangeParams.CompositeQuery.BuilderQueries {
queryRangeParams.CompositeQuery.BuilderQueries[idx].QueryLimits = v3.QueryLimits{
MaxTimeSeries: timeSeriesLimt,
}
// TODO(srikanthccv): should apply to explore page also
if !strings.Contains(queryRangeParams.SourcePage, "dashboard") {
queryRangeParams.CompositeQuery.BuilderQueries[idx].QueryLimits.MaxTimeSeries = 0
}
}
}
// add temporality for each metric
temporalityErr := aH.APIHandler.AddTemporality(r.Context(), queryRangeParams)
if temporalityErr != nil {
zap.S().Errorf("Error while adding temporality for metrics: %v", temporalityErr)
RespondError(w, &basemodel.ApiError{Typ: basemodel.ErrorInternal, Err: temporalityErr}, nil)
return
}
aH.APIHandler.ExecQueryRangeV3(r.Context(), queryRangeParams, w, r)
}

View File

@@ -42,7 +42,6 @@ import (
baseconst "go.signoz.io/signoz/pkg/query-service/constants"
"go.signoz.io/signoz/pkg/query-service/healthcheck"
basealm "go.signoz.io/signoz/pkg/query-service/integrations/alertManager"
baseint "go.signoz.io/signoz/pkg/query-service/interfaces"
basemodel "go.signoz.io/signoz/pkg/query-service/model"
pqle "go.signoz.io/signoz/pkg/query-service/pqlEngine"
rules "go.signoz.io/signoz/pkg/query-service/rules"
@@ -87,7 +86,7 @@ type Server struct {
privateHTTP *http.Server
// feature flags
featureLookup baseint.FeatureLookup
featureLookup baseInterface.FeatureLookup
// Usage manager
usageManager *usage.Manager
@@ -641,7 +640,7 @@ func makeRulesManager(
alertManagerURL string,
ruleRepoURL string,
db *sqlx.DB,
ch baseint.Reader,
ch baseInterface.Reader,
disableRules bool,
fm baseInterface.FeatureLookup) (*rules.Manager, error) {

View File

@@ -39,4 +39,8 @@ type ModelDao interface {
GetUserByPAT(ctx context.Context, token string) (*basemodel.UserPayload, basemodel.BaseApiError)
ListPATs(ctx context.Context, userID string) ([]model.PAT, basemodel.BaseApiError)
DeletePAT(ctx context.Context, id string) basemodel.BaseApiError
GetQueryLimits(ctx context.Context) ([]*model.QueryLimit, error)
AddQueryLimits(ctx context.Context, queryLimits []*model.QueryLimit) error
UpdateQueryLimits(ctx context.Context, queryLimits []*model.QueryLimit) error
}

View File

@@ -0,0 +1,54 @@
package sqlite
import (
"context"
"go.signoz.io/signoz/ee/query-service/model"
)
// GetQueryLimits returns the query limits
func (m *modelDao) GetQueryLimits(ctx context.Context) ([]*model.QueryLimit, error) {
query := `SELECT name, title, usage_limit FROM resource_usage_limits`
var queryLimits []*model.QueryLimit
err := m.DB().Select(&queryLimits, query)
if err != nil {
return nil, err
}
return queryLimits, nil
}
// AddQueryLimits adds the query limits
func (m *modelDao) AddQueryLimits(ctx context.Context, queryLimits []*model.QueryLimit) error {
tx := m.DB().MustBegin()
for _, queryLimit := range queryLimits {
query := `INSERT INTO resource_usage_limits (name, title, usage_limit) VALUES (?, ?, ?)`
_, err := tx.Exec(query, queryLimit.Name, queryLimit.Title, queryLimit.UsageLimit)
if err != nil {
tx.Rollback()
return err
}
}
err := tx.Commit()
if err != nil {
return err
}
return nil
}
// UpdateQueryLimits updates the query limits
func (m *modelDao) UpdateQueryLimits(ctx context.Context, queryLimits []*model.QueryLimit) error {
tx := m.DB().MustBegin()
for _, queryLimit := range queryLimits {
query := `UPDATE resource_usage_limits SET usage_limit = ? WHERE name = ?`
_, err := tx.Exec(query, queryLimit.UsageLimit, queryLimit.Name)
if err != nil {
tx.Rollback()
return err
}
}
err := tx.Commit()
if err != nil {
return err
}
return nil
}

View File

@@ -38,7 +38,7 @@ func InitDB(dataSourceName string) (*modelDao, error) {
basedao.SetDB(dao)
m := &modelDao{ModelDaoSqlite: dao}
table_schema := `
tableSchema := `
PRAGMA foreign_keys = ON;
CREATE TABLE IF NOT EXISTS org_domains(
id TEXT PRIMARY KEY,
@@ -60,11 +60,23 @@ func InitDB(dataSourceName string) (*modelDao, error) {
);
`
_, err = m.DB().Exec(table_schema)
_, err = m.DB().Exec(tableSchema)
if err != nil {
return nil, fmt.Errorf("error in creating tables: %v", err.Error())
}
// create resource_usage_limits table
tableSchema = `CREATE TABLE IF NOT EXISTS resource_usage_limits (
name TEXT PRIMARY KEY,
title TEXT,
usage_limit INTEGER DEFAULT 0
);`
_, err = m.DB().Exec(tableSchema)
if err != nil {
return nil, fmt.Errorf("Error in creating resource_usage_limits table: %s", err.Error())
}
return m, nil
}

View File

@@ -9,8 +9,8 @@ import (
"github.com/google/uuid"
"github.com/pkg/errors"
saml2 "github.com/russellhaering/gosaml2"
"go.signoz.io/signoz/ee/query-service/sso/saml"
"go.signoz.io/signoz/ee/query-service/sso"
"go.signoz.io/signoz/ee/query-service/sso/saml"
basemodel "go.signoz.io/signoz/pkg/query-service/model"
"go.uber.org/zap"
)
@@ -24,16 +24,16 @@ const (
// OrgDomain identify org owned web domains for auth and other purposes
type OrgDomain struct {
Id uuid.UUID `json:"id"`
Name string `json:"name"`
OrgId string `json:"orgId"`
SsoEnabled bool `json:"ssoEnabled"`
SsoType SSOType `json:"ssoType"`
Id uuid.UUID `json:"id"`
Name string `json:"name"`
OrgId string `json:"orgId"`
SsoEnabled bool `json:"ssoEnabled"`
SsoType SSOType `json:"ssoType"`
SamlConfig *SamlConfig `json:"samlConfig"`
SamlConfig *SamlConfig `json:"samlConfig"`
GoogleAuthConfig *GoogleOAuthConfig `json:"googleAuthConfig"`
Org *basemodel.Organization
Org *basemodel.Organization
}
func (od *OrgDomain) String() string {
@@ -100,8 +100,8 @@ func (od *OrgDomain) GetSAMLCert() string {
return ""
}
// PrepareGoogleOAuthProvider creates GoogleProvider that is used in
// requesting OAuth and also used in processing response from google
// PrepareGoogleOAuthProvider creates GoogleProvider that is used in
// requesting OAuth and also used in processing response from google
func (od *OrgDomain) PrepareGoogleOAuthProvider(siteUrl *url.URL) (sso.OAuthCallbackProvider, error) {
if od.GoogleAuthConfig == nil {
return nil, fmt.Errorf("Google auth is not setup correctly for this domain")
@@ -137,38 +137,36 @@ func (od *OrgDomain) PrepareSamlRequest(siteUrl *url.URL) (*saml2.SAMLServicePro
}
func (od *OrgDomain) BuildSsoUrl(siteUrl *url.URL) (ssoUrl string, err error) {
fmtDomainId := strings.Replace(od.Id.String(), "-", ":", -1)
// build redirect url from window.location sent by frontend
redirectURL := fmt.Sprintf("%s://%s%s", siteUrl.Scheme, siteUrl.Host, siteUrl.Path)
// prepare state that gets relayed back when the auth provider
// calls back our url. here we pass the app url (where signoz runs)
// and the domain Id. The domain Id helps in identifying sso config
// when the call back occurs and the app url is useful in redirecting user
// back to the right path.
// when the call back occurs and the app url is useful in redirecting user
// back to the right path.
// why do we need to pass app url? the callback typically is handled by backend
// and sometimes backend might right at a different port or is unaware of frontend
// endpoint (unless SITE_URL param is set). hence, we receive this build sso request
// along with frontend window.location and use it to relay the information through
// auth provider to the backend (HandleCallback or HandleSSO method).
// along with frontend window.location and use it to relay the information through
// auth provider to the backend (HandleCallback or HandleSSO method).
relayState := fmt.Sprintf("%s?domainId=%s", redirectURL, fmtDomainId)
switch (od.SsoType) {
switch od.SsoType {
case SAML:
sp, err := od.PrepareSamlRequest(siteUrl)
if err != nil {
return "", err
}
return sp.BuildAuthURL(relayState)
case GoogleAuth:
googleProvider, err := od.PrepareGoogleOAuthProvider(siteUrl)
if err != nil {
return "", err
@@ -177,8 +175,7 @@ func (od *OrgDomain) BuildSsoUrl(siteUrl *url.URL) (ssoUrl string, err error) {
default:
zap.S().Errorf("found unsupported SSO config for the org domain", zap.String("orgDomain", od.Name))
return "", fmt.Errorf("unsupported SSO config for the domain")
return "", fmt.Errorf("unsupported SSO config for the domain")
}
}

View File

@@ -0,0 +1,12 @@
package model
type QueryLimit struct {
// Name of the query limit
Name string `db:"name" json:"name"`
// Title of the query limit
Title string `db:"title" json:"title"`
// UsageLimit indicates the usage limit of the query
// If the usage limit is 0, then there is no limit
UsageLimit int `db:"usage_limit" json:"usage_limit"`
}

View File

@@ -7,7 +7,6 @@ import {
} from '@ant-design/icons';
import Convert from 'ansi-to-html';
import { Button, Divider, Row, Typography } from 'antd';
import LogDetail from 'components/LogDetail';
import LogsExplorerContext from 'container/LogsExplorerContext';
import dayjs from 'dayjs';
import dompurify from 'dompurify';
@@ -95,11 +94,15 @@ function LogSelectedField({
type ListLogViewProps = {
logData: ILog;
selectedFields: IField[];
onSetActiveLog: (log: ILog) => void;
onAddToQuery: AddToQueryHOCProps['onAddToQuery'];
};
function ListLogView({
logData,
selectedFields,
onSetActiveLog,
onAddToQuery,
}: ListLogViewProps): JSX.Element {
const flattenLogData = useMemo(() => FlatLogData(logData), [logData]);
@@ -113,12 +116,6 @@ function ListLogView({
onSetActiveLog: handleSetActiveContextLog,
onClearActiveLog: handleClearActiveContextLog,
} = useActiveLog();
const {
activeLog,
onSetActiveLog,
onClearActiveLog,
onAddToQuery,
} = useActiveLog();
const handleDetailedView = useCallback(() => {
onSetActiveLog(logData);
@@ -223,12 +220,6 @@ function ListLogView({
onClose={handleClearActiveContextLog}
/>
)}
<LogDetail
log={activeLog}
onClose={onClearActiveLog}
onAddToQuery={onAddToQuery}
onClickActionItem={onAddToQuery}
/>
</Row>
</Container>
);

View File

@@ -1,3 +1,5 @@
import '../GridCardLayout.styles.scss';
import { Skeleton, Typography } from 'antd';
import cx from 'classnames';
import { ToggleGraphProps } from 'components/Graph/types';

View File

@@ -1,7 +1,7 @@
import styled from 'styled-components';
export const WrapperStyled = styled.div`
height: 100%;
height: 95%;
overflow: hidden;
& .ant-table-wrapper {

View File

@@ -1,4 +1,5 @@
import { Card, Typography } from 'antd';
import LogDetail from 'components/LogDetail';
import ListLogView from 'components/Logs/ListLogView';
import RawLogView from 'components/Logs/RawLogView';
import Spinner from 'components/Spinner';
@@ -10,6 +11,7 @@ import { InfinityWrapperStyled } from 'container/LogsExplorerList/styles';
import { convertKeysToColumnFields } from 'container/LogsExplorerList/utils';
import { Heading } from 'container/LogsTable/styles';
import { useOptionsMenu } from 'container/OptionsMenu';
import { useActiveLog } from 'hooks/logs/useActiveLog';
import { useCopyLogLink } from 'hooks/logs/useCopyLogLink';
import useFontFaceObserver from 'hooks/useFontObserver';
import { useEventSource } from 'providers/EventSource';
@@ -31,6 +33,13 @@ function LiveLogsList({ logs }: LiveLogsListProps): JSX.Element {
const { activeLogId } = useCopyLogLink();
const {
activeLog,
onClearActiveLog,
onAddToQuery,
onSetActiveLog,
} = useActiveLog();
const { options } = useOptionsMenu({
storageKey: LOCALSTORAGE.LOGS_LIST_OPTIONS,
dataSource: DataSource.LOGS,
@@ -66,10 +75,22 @@ function LiveLogsList({ logs }: LiveLogsListProps): JSX.Element {
}
return (
<ListLogView key={log.id} logData={log} selectedFields={selectedFields} />
<ListLogView
key={log.id}
logData={log}
selectedFields={selectedFields}
onAddToQuery={onAddToQuery}
onSetActiveLog={onSetActiveLog}
/>
);
},
[options.format, options.maxLines, selectedFields],
[
onAddToQuery,
onSetActiveLog,
options.format,
options.maxLines,
selectedFields,
],
);
useEffect(() => {
@@ -123,6 +144,12 @@ function LiveLogsList({ logs }: LiveLogsListProps): JSX.Element {
)}
</InfinityWrapperStyled>
)}
<LogDetail
log={activeLog}
onClose={onClearActiveLog}
onAddToQuery={onAddToQuery}
onClickActionItem={onAddToQuery}
/>
</>
);
}

View File

@@ -1,4 +1,5 @@
import { Card, Typography } from 'antd';
import LogDetail from 'components/LogDetail';
// components
import ListLogView from 'components/Logs/ListLogView';
import RawLogView from 'components/Logs/RawLogView';
@@ -8,6 +9,7 @@ import { LOCALSTORAGE } from 'constants/localStorage';
import ExplorerControlPanel from 'container/ExplorerControlPanel';
import { Heading } from 'container/LogsTable/styles';
import { useOptionsMenu } from 'container/OptionsMenu';
import { useActiveLog } from 'hooks/logs/useActiveLog';
import { useCopyLogLink } from 'hooks/logs/useCopyLogLink';
import { useQueryBuilder } from 'hooks/queryBuilder/useQueryBuilder';
import useFontFaceObserver from 'hooks/useFontObserver';
@@ -37,6 +39,13 @@ function LogsExplorerList({
const { activeLogId } = useCopyLogLink();
const {
activeLog,
onClearActiveLog,
onAddToQuery,
onSetActiveLog,
} = useActiveLog();
const { options, config } = useOptionsMenu({
storageKey: LOCALSTORAGE.LOGS_LIST_OPTIONS,
dataSource: initialDataSource || DataSource.METRICS,
@@ -76,10 +85,22 @@ function LogsExplorerList({
}
return (
<ListLogView key={log.id} logData={log} selectedFields={selectedFields} />
<ListLogView
key={log.id}
logData={log}
selectedFields={selectedFields}
onAddToQuery={onAddToQuery}
onSetActiveLog={onSetActiveLog}
/>
);
},
[options.format, options.maxLines, selectedFields],
[
onAddToQuery,
onSetActiveLog,
options.format,
options.maxLines,
selectedFields,
],
);
useEffect(() => {
@@ -149,6 +170,13 @@ function LogsExplorerList({
)}
<InfinityWrapperStyled>{renderContent}</InfinityWrapperStyled>
<LogDetail
log={activeLog}
onClose={onClearActiveLog}
onAddToQuery={onAddToQuery}
onClickActionItem={onAddToQuery}
/>
</>
);
}

View File

@@ -1,6 +1,7 @@
import './logsTable.styles.scss';
import { Card, Typography } from 'antd';
import LogDetail from 'components/LogDetail';
// components
import ListLogView from 'components/Logs/ListLogView';
import RawLogView from 'components/Logs/RawLogView';
@@ -29,7 +30,12 @@ type LogsTableProps = {
function LogsTable(props: LogsTableProps): JSX.Element {
const { viewMode, linesPerRow } = props;
const { onSetActiveLog } = useActiveLog();
const {
activeLog,
onClearActiveLog,
onAddToQuery,
onSetActiveLog,
} = useActiveLog();
useFontFaceObserver(
[
@@ -69,9 +75,17 @@ function LogsTable(props: LogsTableProps): JSX.Element {
return <RawLogView key={log.id} data={log} linesPerRow={linesPerRow} />;
}
return <ListLogView key={log.id} logData={log} selectedFields={selected} />;
return (
<ListLogView
key={log.id}
logData={log}
selectedFields={selected}
onAddToQuery={onAddToQuery}
onSetActiveLog={onSetActiveLog}
/>
);
},
[logs, viewMode, selected, linesPerRow],
[logs, viewMode, selected, onAddToQuery, onSetActiveLog, linesPerRow],
);
const renderContent = useMemo(() => {
@@ -110,6 +124,12 @@ function LogsTable(props: LogsTableProps): JSX.Element {
{isNoLogs && <Typography>No logs lines found</Typography>}
{renderContent}
<LogDetail
log={activeLog}
onClose={onClearActiveLog}
onAddToQuery={onAddToQuery}
onClickActionItem={onAddToQuery}
/>
</Container>
);
}

View File

@@ -32,19 +32,13 @@ import {
errorPercentage,
operationPerSec,
} from '../MetricsPageQueries/OverviewQueries';
import {
Card,
Col,
ColApDexContainer,
ColErrorContainer,
Row,
} from '../styles';
import { Col, ColApDexContainer, ColErrorContainer, Row } from '../styles';
import ApDex from './Overview/ApDex';
import ServiceOverview from './Overview/ServiceOverview';
import TopLevelOperation from './Overview/TopLevelOperations';
import TopOperation from './Overview/TopOperation';
import TopOperationMetrics from './Overview/TopOperationMetrics';
import { Button } from './styles';
import { Button, Card } from './styles';
import { IServiceName } from './types';
import {
handleNonInQueryRange,
@@ -276,7 +270,7 @@ function Application(): JSX.Element {
<Col span={12}>
<Card>
{isSpanMetricEnabled ? <TopOperationMetrics /> : <TopOperation />}
{isSpanMetricEnabled ? <TopOperationMetrics /> : <TopOperation />}{' '}
</Card>
</Col>
</Row>

View File

@@ -1,4 +1,4 @@
import { Button as ButtonComponent } from 'antd';
import { Button as ButtonComponent, Card as CardComponent } from 'antd';
import styled from 'styled-components';
export const Button = styled(ButtonComponent)`
@@ -8,3 +8,9 @@ export const Button = styled(ButtonComponent)`
display: none;
}
`;
export const Card = styled(CardComponent)`
.ant-card-body {
padding: 10px;
}
`;

View File

@@ -8,12 +8,13 @@ import styled from 'styled-components';
export const Card = styled(CardComponent)`
&&& {
padding: 10px;
height: 40vh;
overflow: hidden;
}
.ant-card-body {
height: calc(100% - 40px);
padding: 0;
min-height: 40vh;
}
`;
@@ -38,7 +39,8 @@ export const ColErrorContainer = styled(ColComponent)`
`;
export const GraphContainer = styled.div`
height: 40vh;
min-height: calc(40vh - 40px);
height: calc(100% - 40px);
`;
export const GraphTitle = styled(Typography)`

View File

@@ -12,9 +12,9 @@ export const Container = styled(Card)<Props>`
}
.ant-card-body {
padding: 8px;
padding: ${({ $panelType }): string =>
$panelType === PANEL_TYPES.TABLE ? '0 0' : '1.5rem 0'};
height: 57vh;
overflow: auto;
display: flex;
flex-direction: column;
}

View File

@@ -27,6 +27,10 @@ import CustomColor from './CustomColor';
import ShowCaseValue from './ShowCaseValue';
import { ThresholdProps } from './types';
const wrapStyle = {
flexWrap: 'wrap',
} as React.CSSProperties;
function Threshold({
index,
thresholdOperator = '>',
@@ -220,7 +224,7 @@ function Threshold({
}
>
{selectedGraph === PANEL_TYPES.TIME_SERIES && (
<>
<Space style={wrapStyle}>
<Typography.Text>Label</Typography.Text>
{isEditMode ? (
<Input
@@ -232,7 +236,7 @@ function Threshold({
) : (
<ShowCaseValue width="180px" value={label || 'none'} />
)}
</>
</Space>
)}
{(selectedGraph === PANEL_TYPES.VALUE ||
selectedGraph === PANEL_TYPES.TABLE) && (
@@ -243,7 +247,7 @@ function Threshold({
{isEditMode ? (
<>
{selectedGraph === PANEL_TYPES.TABLE && (
<Space>
<Space style={wrapStyle}>
<Select
style={{
minWidth: '150px',
@@ -270,7 +274,7 @@ function Threshold({
) : (
<>
{selectedGraph === PANEL_TYPES.TABLE && (
<Space>
<Space style={wrapStyle}>
<ShowCaseValue width="150px" value={tableSelectedOption} />
<Typography.Text>is</Typography.Text>
</Space>
@@ -283,7 +287,7 @@ function Threshold({
</Space>
</div>
<div className="threshold-units-selector">
<Space>
<Space style={wrapStyle}>
{isEditMode ? (
<InputNumber
style={{ backgroundColor }}
@@ -311,7 +315,7 @@ function Threshold({
<div>
<Space direction="vertical">
<Typography.Text>Show with</Typography.Text>
<Space>
<Space style={wrapStyle}>
{isEditMode ? (
<>
<ColorSelector setColor={setColor} thresholdColor={color} />

View File

@@ -28,6 +28,10 @@ export const timeItems: timePreferance[] = [
name: 'Last 1 day',
enum: 'LAST_1_DAY',
},
{
name: 'Last 3 days',
enum: 'LAST_3_DAYS',
},
{
name: 'Last 1 week',
enum: 'LAST_1_WEEK',
@@ -47,6 +51,7 @@ export type timePreferenceType =
| LAST_1_HR
| LAST_6_HR
| LAST_1_DAY
| LAST_3_DAYS
| LAST_1_WEEK;
type GLOBAL_TIME = 'GLOBAL_TIME';
@@ -56,6 +61,7 @@ type LAST_30_MIN = 'LAST_30_MIN';
type LAST_1_HR = 'LAST_1_HR';
type LAST_6_HR = 'LAST_6_HR';
type LAST_1_DAY = 'LAST_1_DAY';
type LAST_3_DAYS = 'LAST_3_DAYS';
type LAST_1_WEEK = 'LAST_1_WEEK';
export default timeItems;

View File

@@ -178,6 +178,7 @@ function NewWidget({ selectedGraph }: NewWidgetProps): JSX.Element {
yAxisUnit,
panelTypes: graphType,
thresholds,
fillSpans: isFillSpans,
},
...afterWidgets,
],
@@ -212,6 +213,7 @@ function NewWidget({ selectedGraph }: NewWidgetProps): JSX.Element {
yAxisUnit,
graphType,
thresholds,
isFillSpans,
afterWidgets,
updateDashboardMutation,
setSelectedDashboard,

View File

@@ -35,9 +35,7 @@ export default function DataSource(): JSX.Element {
selectedFramework,
updateSelectedDataSource,
updateServiceName,
updateSelectedEnvironment,
updateSelectedFramework,
updateErrorDetails,
} = useOnboardingContext();
const [supportedDataSources, setSupportedDataSources] = useState<
@@ -55,11 +53,6 @@ export default function DataSource(): JSX.Element {
setSupportedDataSources(dataSource);
}
updateSelectedEnvironment('');
updateErrorDetails('');
updateServiceName('');
updateSelectedFramework('');
// eslint-disable-next-line react-hooks/exhaustive-deps
}, []);

View File

@@ -3,7 +3,10 @@ import { MarkdownRenderer } from 'components/MarkdownRenderer/MarkdownRenderer';
import { ApmDocFilePaths } from 'container/OnboardingContainer/constants/apmDocFilePaths';
import { InfraMonitoringDocFilePaths } from 'container/OnboardingContainer/constants/infraMonitoringDocFilePaths';
import { LogsManagementDocFilePaths } from 'container/OnboardingContainer/constants/logsManagementDocFilePaths';
import { useOnboardingContext } from 'container/OnboardingContainer/context/OnboardingContext';
import {
OnboardingMethods,
useOnboardingContext,
} from 'container/OnboardingContainer/context/OnboardingContext';
import { ModulesMap } from 'container/OnboardingContainer/OnboardingContainer';
import useAnalytics from 'hooks/analytics/useAnalytics';
import { useEffect, useState } from 'react';
@@ -42,12 +45,12 @@ export default function MarkdownStep(): JSX.Element {
path += `_${selectedEnvironment}`;
}
if (
selectedModule?.id === ModulesMap.APM &&
selectedDataSource?.id !== 'kubernetes' &&
selectedMethod
) {
path += `_${selectedMethod}`;
if (selectedModule?.id === ModulesMap.APM) {
if (selectedEnvironment === 'kubernetes') {
path += `_${OnboardingMethods.RECOMMENDED_STEPS}`;
} else if (selectedEnvironment !== 'kubernetes' && selectedMethod) {
path += `_${selectedMethod}`;
}
}
path += `_${step?.id}`;

View File

@@ -104,7 +104,7 @@ function OnboardingContextProvider({
setSelectedDataSource(defaultApplicationDataSource);
setSelectedEnvironment('');
setSelectedFramework('');
setSelectedMethod(OnboardingMethods.RECOMMENDED_STEPS);
setSelectedMethod(OnboardingMethods.QUICK_START);
updateActiveStep(null);
};

View File

@@ -9,6 +9,7 @@ type SixHour = '6hr';
type OneHour = '1hr';
type FourHour = '4hr';
type OneDay = '1day';
type ThreeDay = '3days';
type OneWeek = '1week';
type Custom = 'custom';
@@ -23,7 +24,8 @@ export type Time =
| OneHour
| Custom
| OneWeek
| OneDay;
| OneDay
| ThreeDay;
export const Options: Option[] = [
{ value: '5min', label: 'Last 5 min' },
@@ -32,6 +34,7 @@ export const Options: Option[] = [
{ value: '1hr', label: 'Last 1 hour' },
{ value: '6hr', label: 'Last 6 hour' },
{ value: '1day', label: 'Last 1 day' },
{ value: '3days', label: 'Last 3 days' },
{ value: '1week', label: 'Last 1 week' },
{ value: 'custom', label: 'Custom' },
];
@@ -48,6 +51,7 @@ export const RelativeDurationOptions: Option[] = [
{ value: '1hr', label: 'Last 1 hour' },
{ value: '6hr', label: 'Last 6 hour' },
{ value: '1day', label: 'Last 1 day' },
{ value: '3days', label: 'Last 3 days' },
{ value: '1week', label: 'Last 1 week' },
];

View File

@@ -6,6 +6,7 @@ import getMinAgo from './getStartAndEndTime/getMinAgo';
const GetMinMax = (
interval: Time,
dateTimeRange?: [number, number],
// eslint-disable-next-line sonarjs/cognitive-complexity
): GetMinMaxPayload => {
let maxTime = new Date().getTime();
let minTime = 0;
@@ -32,6 +33,10 @@ const GetMinMax = (
// one day = 24*60(min)
const minTimeAgo = getMinAgo({ minutes: 24 * 60 }).getTime();
minTime = minTimeAgo;
} else if (interval === '3days') {
// three day = one day * 3
const minTimeAgo = getMinAgo({ minutes: 24 * 60 * 3 }).getTime();
minTime = minTimeAgo;
} else if (interval === '1week') {
// one week = one day * 7
const minTimeAgo = getMinAgo({ minutes: 24 * 60 * 7 }).getTime();

View File

@@ -3,6 +3,19 @@ import { timePreferenceType } from 'container/NewWidget/RightContainer/timeItems
import getMicroSeconds from './getMicroSeconds';
import getMinAgo from './getMinAgo';
const calculateStartAndEndTime = (
minutes: number,
endString: string,
): Payload => {
const agodate = getMinAgo({ minutes }).getTime();
const agoString = getMicroSeconds({ time: agodate });
return {
start: agoString,
end: endString,
};
};
const GetStartAndEndTime = ({
type,
minTime,
@@ -12,73 +25,35 @@ const GetStartAndEndTime = ({
const endString = getMicroSeconds({ time: end });
if (type === 'LAST_5_MIN') {
const agodate = getMinAgo({ minutes: 5 }).getTime();
const agoString = getMicroSeconds({ time: agodate });
return {
start: agoString,
end: endString,
};
return calculateStartAndEndTime(5, endString);
}
if (type === 'LAST_30_MIN') {
const agodate = getMinAgo({ minutes: 30 }).getTime();
const agoString = getMicroSeconds({ time: agodate });
return {
start: agoString,
end: endString,
};
return calculateStartAndEndTime(30, endString);
}
if (type === 'LAST_1_HR') {
const agodate = getMinAgo({ minutes: 60 }).getTime();
const agoString = getMicroSeconds({ time: agodate });
return {
start: agoString,
end: endString,
};
return calculateStartAndEndTime(60, endString);
}
if (type === 'LAST_15_MIN') {
const agodate = getMinAgo({ minutes: 15 }).getTime();
const agoString = getMicroSeconds({ time: agodate });
return {
start: agoString,
end: endString,
};
return calculateStartAndEndTime(15, endString);
}
if (type === 'LAST_6_HR') {
const agoDate = getMinAgo({ minutes: 6 * 60 }).getTime();
const agoString = getMicroSeconds({ time: agoDate });
return {
start: agoString,
end: endString,
};
return calculateStartAndEndTime(6 * 60, endString);
}
if (type === 'LAST_1_DAY') {
const agoDate = getMinAgo({ minutes: 24 * 60 }).getTime();
const agoString = getMicroSeconds({ time: agoDate });
return calculateStartAndEndTime(24 * 60, endString);
}
return {
start: agoString,
end: endString,
};
if (type === 'LAST_3_DAYS') {
return calculateStartAndEndTime(24 * 60 * 3, endString);
}
if (type === 'LAST_1_WEEK') {
const agoDate = getMinAgo({ minutes: 24 * 60 * 7 }).getTime();
const agoString = getMicroSeconds({ time: agoDate });
return {
start: agoString,
end: endString,
};
return calculateStartAndEndTime(24 * 60 * 7, endString);
}
return {

2
go.mod
View File

@@ -5,7 +5,7 @@ go 1.21
require (
github.com/ClickHouse/clickhouse-go/v2 v2.15.0
github.com/SigNoz/govaluate v0.0.0-20220522085550-d19c08c206cb
github.com/SigNoz/signoz-otel-collector v0.88.4
github.com/SigNoz/signoz-otel-collector v0.88.6
github.com/SigNoz/zap_otlp/zap_otlp_encoder v0.0.0-20230822164844-1b861a431974
github.com/SigNoz/zap_otlp/zap_otlp_sync v0.0.0-20230822164844-1b861a431974
github.com/antonmedv/expr v1.15.3

4
go.sum
View File

@@ -98,8 +98,8 @@ github.com/SigNoz/govaluate v0.0.0-20220522085550-d19c08c206cb h1:bneLSKPf9YUSFm
github.com/SigNoz/govaluate v0.0.0-20220522085550-d19c08c206cb/go.mod h1:JznGDNg9x1cujDKa22RaQOimOvvEfy3nxzDGd8XDgmA=
github.com/SigNoz/prometheus v1.9.78 h1:bB3yuDrRzi/Mv00kWayR9DZbyjTuGfendSqISyDcXiY=
github.com/SigNoz/prometheus v1.9.78/go.mod h1:MffmFu2qFILQrOHehx3D0XjYtaZMVfI+Ppeiv98x4Ww=
github.com/SigNoz/signoz-otel-collector v0.88.4 h1:vwyr26Iz1IkjTJQvcTk6E0StSg8pZGoz6aqXCAJjU8w=
github.com/SigNoz/signoz-otel-collector v0.88.4/go.mod h1:AkN5EPLaFn9TRS5303LzOEjiApld7TBoMImiRTXAvs8=
github.com/SigNoz/signoz-otel-collector v0.88.6 h1:rvXm9bz4b9GsYeT8c3+F/g56DHPf0IN8mK8tUfZfnw8=
github.com/SigNoz/signoz-otel-collector v0.88.6/go.mod h1:6lR8Uy99zBd0JGPg9zt0aEBW4A4GpblUtpcbszGmg8E=
github.com/SigNoz/zap_otlp v0.1.0 h1:T7rRcFN87GavY8lDGZj0Z3Xv6OhJA6Pj3I9dNPmqvRc=
github.com/SigNoz/zap_otlp v0.1.0/go.mod h1:lcHvbDbRgvDnPxo9lDlaL1JK2PyOyouP/C3ynnYIvyo=
github.com/SigNoz/zap_otlp/zap_otlp_encoder v0.0.0-20230822164844-1b861a431974 h1:PKVgdf83Yw+lZJbFtNGBgqXiXNf3+kOXW2qZ7Ms7OaY=

View File

@@ -4491,6 +4491,14 @@ func readRowsForTimeSeriesResult(rows driver.Rows, vars []interface{}, columnNam
seriesToPoints[key] = append(seriesToPoints[key], metricPoint)
}
if rows.Err() != nil {
zap.S().Errorf("error while reading time series result %v", rows.Err())
if strings.Contains(rows.Err().Error(), "code: 191") {
return nil, fmt.Errorf(`Exceeded max number of series per query. Please reduce the number of series by applying filters.`)
}
return nil, rows.Err()
}
var seriesList []*v3.Series
for _, key := range keys {
points := seriesToPoints[key]

View File

@@ -514,7 +514,7 @@ func (aH *APIHandler) metricAutocompleteTagValue(w http.ResponseWriter, r *http.
aH.Respond(w, tagValueList)
}
func (aH *APIHandler) addTemporality(ctx context.Context, qp *v3.QueryRangeParamsV3) error {
func (aH *APIHandler) AddTemporality(ctx context.Context, qp *v3.QueryRangeParamsV3) error {
metricNames := make([]string, 0)
metricNameToTemporality := make(map[string]map[v3.Temporality]bool)
@@ -3014,7 +3014,7 @@ func (aH *APIHandler) QueryRangeV3Format(w http.ResponseWriter, r *http.Request)
aH.Respond(w, queryRangeParams)
}
func (aH *APIHandler) queryRangeV3(ctx context.Context, queryRangeParams *v3.QueryRangeParamsV3, w http.ResponseWriter, r *http.Request) {
func (aH *APIHandler) ExecQueryRangeV3(ctx context.Context, queryRangeParams *v3.QueryRangeParamsV3, w http.ResponseWriter, r *http.Request) {
var result []*v3.Result
var err error
@@ -3080,14 +3080,14 @@ func (aH *APIHandler) QueryRangeV3(w http.ResponseWriter, r *http.Request) {
// add temporality for each metric
temporalityErr := aH.addTemporality(r.Context(), queryRangeParams)
temporalityErr := aH.AddTemporality(r.Context(), queryRangeParams)
if temporalityErr != nil {
zap.S().Errorf("Error while adding temporality for metrics: %v", temporalityErr)
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: temporalityErr}, nil)
return
}
aH.queryRangeV3(r.Context(), queryRangeParams, w, r)
aH.ExecQueryRangeV3(r.Context(), queryRangeParams, w, r)
}
func applyMetricLimit(results []*v3.Result, queryRangeParams *v3.QueryRangeParamsV3) {

View File

@@ -2,8 +2,12 @@ package logparsingpipeline
import (
"fmt"
"slices"
"strings"
"github.com/antonmedv/expr"
"github.com/antonmedv/expr/ast"
"github.com/antonmedv/expr/parser"
"github.com/pkg/errors"
"go.signoz.io/signoz/pkg/query-service/constants"
"go.signoz.io/signoz/pkg/query-service/queryBuilderToExpr"
@@ -81,50 +85,97 @@ func getOperators(ops []PipelineOperator) ([]PipelineOperator, error) {
}
if operator.Type == "regex_parser" {
parseFromParts := strings.Split(operator.ParseFrom, ".")
parseFromPath := strings.Join(parseFromParts, "?.")
parseFromNotNilCheck, err := fieldNotNilCheck(operator.ParseFrom)
if err != nil {
return nil, fmt.Errorf(
"couldn't generate nil check for parseFrom of regex op %s: %w", operator.Name, err,
)
}
operator.If = fmt.Sprintf(
`%s != nil && %s matches "%s"`,
parseFromPath,
parseFromPath,
`%s && %s matches "%s"`,
parseFromNotNilCheck,
operator.ParseFrom,
strings.ReplaceAll(
strings.ReplaceAll(operator.Regex, `\`, `\\`),
`"`, `\"`,
),
)
} else if operator.Type == "grok_parser" {
parseFromNotNilCheck, err := fieldNotNilCheck(operator.ParseFrom)
if err != nil {
return nil, fmt.Errorf(
"couldn't generate nil check for parseFrom of grok op %s: %w", operator.Name, err,
)
}
operator.If = parseFromNotNilCheck
} else if operator.Type == "json_parser" {
parseFromParts := strings.Split(operator.ParseFrom, ".")
parseFromPath := strings.Join(parseFromParts, "?.")
operator.If = fmt.Sprintf(`%s != nil && %s matches "^\\s*{.*}\\s*$"`, parseFromPath, parseFromPath)
parseFromNotNilCheck, err := fieldNotNilCheck(operator.ParseFrom)
if err != nil {
return nil, fmt.Errorf(
"couldn't generate nil check for parseFrom of json parser op %s: %w", operator.Name, err,
)
}
operator.If = fmt.Sprintf(
`%s && %s matches "^\\s*{.*}\\s*$"`, parseFromNotNilCheck, operator.ParseFrom,
)
} else if operator.Type == "add" {
if strings.HasPrefix(operator.Value, "EXPR(") && strings.HasSuffix(operator.Value, ")") {
expression := strings.TrimSuffix(strings.TrimPrefix(operator.Value, "EXPR("), ")")
fieldsNotNilCheck, err := fieldsReferencedInExprNotNilCheck(expression)
if err != nil {
return nil, fmt.Errorf(
"could'nt generate nil check for fields referenced in value expr of add operator %s: %w",
operator.Name, err,
)
}
if fieldsNotNilCheck != "" {
operator.If = fieldsNotNilCheck
}
}
} else if operator.Type == "move" || operator.Type == "copy" {
fromParts := strings.Split(operator.From, ".")
fromPath := strings.Join(fromParts, "?.")
operator.If = fmt.Sprintf(`%s != nil`, fromPath)
fromNotNilCheck, err := fieldNotNilCheck(operator.From)
if err != nil {
return nil, fmt.Errorf(
"couldn't generate nil check for From field of %s op %s: %w", operator.Type, operator.Name, err,
)
}
operator.If = fromNotNilCheck
} else if operator.Type == "remove" {
fieldParts := strings.Split(operator.Field, ".")
fieldPath := strings.Join(fieldParts, "?.")
operator.If = fmt.Sprintf(`%s != nil`, fieldPath)
fieldNotNilCheck, err := fieldNotNilCheck(operator.Field)
if err != nil {
return nil, fmt.Errorf(
"couldn't generate nil check for field to be removed by op %s: %w", operator.Name, err,
)
}
operator.If = fieldNotNilCheck
} else if operator.Type == "trace_parser" {
cleanTraceParser(&operator)
} else if operator.Type == "time_parser" {
parseFromParts := strings.Split(operator.ParseFrom, ".")
parseFromPath := strings.Join(parseFromParts, "?.")
operator.If = fmt.Sprintf(`%s != nil`, parseFromPath)
parseFromNotNilCheck, err := fieldNotNilCheck(operator.ParseFrom)
if err != nil {
return nil, fmt.Errorf(
"couldn't generate nil check for parseFrom of time parser op %s: %w", operator.Name, err,
)
}
operator.If = parseFromNotNilCheck
if operator.LayoutType == "strptime" {
regex, err := RegexForStrptimeLayout(operator.Layout)
if err != nil {
return nil, fmt.Errorf("could not generate time_parser processor: %w", err)
return nil, fmt.Errorf(
"couldn't generate layout regex for time_parser %s: %w", operator.Name, err,
)
}
operator.If = fmt.Sprintf(
`%s && %s matches "%s"`, operator.If, parseFromPath, regex,
`%s && %s matches "%s"`, operator.If, operator.ParseFrom, regex,
)
} else if operator.LayoutType == "epoch" {
valueRegex := `^\\s*[0-9]+\\s*$`
@@ -133,19 +184,22 @@ func getOperators(ops []PipelineOperator) ([]PipelineOperator, error) {
}
operator.If = fmt.Sprintf(
`%s && string(%s) matches "%s"`, operator.If, parseFromPath, valueRegex,
`%s && string(%s) matches "%s"`, operator.If, operator.ParseFrom, valueRegex,
)
}
// TODO(Raj): Maybe add support for gotime too eventually
} else if operator.Type == "severity_parser" {
parseFromParts := strings.Split(operator.ParseFrom, ".")
parseFromPath := strings.Join(parseFromParts, "?.")
parseFromNotNilCheck, err := fieldNotNilCheck(operator.ParseFrom)
if err != nil {
return nil, fmt.Errorf(
"couldn't generate nil check for parseFrom of severity parser %s: %w", operator.Name, err,
)
}
operator.If = fmt.Sprintf(
`%s != nil && ( type(%s) == "string" || ( type(%s) in ["int", "float"] && %s == float(int(%s)) ) )`,
parseFromPath, parseFromPath, parseFromPath, parseFromPath, parseFromPath,
`%s && ( type(%s) == "string" || ( type(%s) in ["int", "float"] && %s == float(int(%s)) ) )`,
parseFromNotNilCheck, operator.ParseFrom, operator.ParseFrom, operator.ParseFrom, operator.ParseFrom,
)
}
@@ -169,3 +223,158 @@ func cleanTraceParser(operator *PipelineOperator) {
operator.TraceFlags = nil
}
}
// Generates an expression checking that `fieldPath` has a non-nil value in a log record.
func fieldNotNilCheck(fieldPath string) (string, error) {
_, err := expr.Compile(fieldPath)
if err != nil {
return "", fmt.Errorf("invalid fieldPath %s: %w", fieldPath, err)
}
// helper for turning `.` into `?.` in field paths.
// Eg: a.b?.c.d -> a?.b?.c?.d
optionalChainedPath := func(path string) string {
return strings.ReplaceAll(
strings.ReplaceAll(path, "?.", "."), ".", "?.",
)
}
// Optional chaining before membership ops is not supported by expr.
// Eg: The field `attributes.test["a.b"].value["c.d"].e` can't be checked using
// the nil check `attributes.test?.["a.b"]?.value?.["c.d"]?.e != nil`
// This needs to be worked around by checking that the target of membership op is not nil first.
// Eg: attributes.test != nil && attributes.test["a.b"]?.value != nil && attributes.test["a.b"].value["c.d"]?.e != nil
// Split once from the right to include the rightmost membership op and everything after it.
// Eg: `attributes.test["a.b"].value["c.d"].e` would result in `attributes.test["a.b"].value` and `["c.d"].e`
parts := rSplitAfterN(fieldPath, "[", 2)
if len(parts) < 2 {
// there is no [] access in fieldPath
return fmt.Sprintf("%s != nil", optionalChainedPath(fieldPath)), nil
}
// recursively generate nil check for target of the rightmost membership op (attributes.test["a.b"].value)
// should come out to be (attributes.test != nil && attributes.test["a.b"]?.value != nil)
collectionNotNilCheck, err := fieldNotNilCheck(parts[0])
if err != nil {
return "", fmt.Errorf("couldn't generate nil check for %s: %w", parts[0], err)
}
// generate nil check for entire path.
suffixParts := strings.SplitAfter(parts[1], "]") // ["c.d"], ".e"
fullPath := parts[0] + suffixParts[0]
if len(suffixParts) > 1 {
// attributes.test["a.b"].value["c.d"]?.e
fullPath += optionalChainedPath(suffixParts[1])
}
fullPathCheck := fmt.Sprintf("%s != nil", fullPath)
// If the membership op is for array/slice indexing, add check ensuring array is long enough
// attributes.test[3] -> len(attributes.test) > 3 && attributes.test[3] != nil
if !(strings.Contains(suffixParts[0], "'") || strings.Contains(suffixParts[0], `"`)) {
fullPathCheck = fmt.Sprintf(
"len(%s) > %s && %s",
parts[0], suffixParts[0][1:len(suffixParts[0])-1], fullPathCheck,
)
}
// If prefix is `attributes` or `resource` there is no need to add a nil check for
// the prefix since all log records have non nil `attributes` and `resource` fields.
if slices.Contains([]string{"attributes", "resource"}, parts[0]) {
return fullPathCheck, nil
}
return fmt.Sprintf("%s && %s", collectionNotNilCheck, fullPathCheck), nil
}
// Split `str` after `sep` from the right to create up to `n` parts.
// rSplitAfterN("a.b.c.d", ".", 3) -> ["a.b", ".c", ".d"]
func rSplitAfterN(str string, sep string, n int) []string {
reversedStr := reverseString(str)
parts := strings.SplitAfterN(reversedStr, sep, n)
slices.Reverse(parts)
result := []string{}
for _, p := range parts {
result = append(result, reverseString(p))
}
return result
}
func reverseString(s string) string {
r := []rune(s)
for i := 0; i < len(r)/2; i++ {
j := len(s) - 1 - i
r[i], r[j] = r[j], r[i]
}
return string(r)
}
// Generate expression for checking that all fields referenced in `expr` have a non nil value in log record.
// Eg: `attributes.x + len(resource.y)` will return the expression `attributes.x != nil && resource.y != nil`
func fieldsReferencedInExprNotNilCheck(expr string) (string, error) {
referencedFields, err := logFieldsReferencedInExpr(expr)
if err != nil {
return "", fmt.Errorf("couldn't extract log fields referenced in expr %s: %w", expr, err)
}
// Generating nil check for deepest fields takes care of their prefixes too.
// Eg: `attributes.test.value + len(attributes.test)` needs a nil check only for `attributes.test.value`
deepestFieldRefs := []string{}
for _, field := range referencedFields {
isPrefixOfAnotherReferencedField := slices.ContainsFunc(
referencedFields, func(e string) bool {
return len(e) > len(field) && strings.HasPrefix(e, field)
},
)
if !isPrefixOfAnotherReferencedField {
deepestFieldRefs = append(deepestFieldRefs, field)
}
}
fieldExprChecks := []string{}
for _, field := range deepestFieldRefs {
checkExpr, err := fieldNotNilCheck(field)
if err != nil {
return "", fmt.Errorf("could not create nil check for %s: %w", field, err)
}
fieldExprChecks = append(fieldExprChecks, fmt.Sprintf("(%s)", checkExpr))
}
return strings.Join(fieldExprChecks, " && "), nil
}
// Expr AST visitor for extracting referenced log fields
// See more at https://github.com/expr-lang/expr/blob/master/ast/visitor.go
type logFieldsInExprExtractor struct {
referencedFields []string
}
func (v *logFieldsInExprExtractor) Visit(node *ast.Node) {
if n, ok := (*node).(*ast.MemberNode); ok {
memberRef := n.String()
// coalesce ops end up as MemberNode right now for some reason.
// ignore such member nodes.
if strings.Contains(memberRef, "??") {
return
}
if strings.HasPrefix(memberRef, "attributes") || strings.HasPrefix(memberRef, "resource") {
v.referencedFields = append(v.referencedFields, memberRef)
}
}
}
func logFieldsReferencedInExpr(expr string) ([]string, error) {
// parse abstract syntax tree for expr
exprAst, err := parser.Parse(expr)
if err != nil {
return nil, fmt.Errorf("could not parse expr: %w", err)
}
// walk ast for expr to collect all member references.
v := &logFieldsInExprExtractor{}
ast.Walk(&exprAst.Node, v)
return v.referencedFields, nil
}

View File

@@ -386,8 +386,19 @@ func TestNoCollectorErrorsFromProcessorsForMismatchedLogs(t *testing.T) {
makeTestLog("mismatching log", map[string]string{
"test_timestamp": "not-an-epoch",
}),
}, {
"grok parser should ignore logs with missing parse from field",
PipelineOperator{
ID: "grok",
Type: "grok_parser",
Enabled: true,
Name: "grok parser",
ParseFrom: "attributes.test",
Pattern: "%{GREEDYDATA}",
ParseTo: "attributes.test_parsed",
},
makeTestLog("test log with missing parse from field", map[string]string{}),
},
// TODO(Raj): see if there is an error scenario for grok parser.
// TODO(Raj): see if there is an error scenario for trace parser.
// TODO(Raj): see if there is an error scenario for Add operator.
}
@@ -608,6 +619,191 @@ func TestAttributePathsContainingDollarDoNotBreakCollector(t *testing.T) {
require.Equal("test", result[0].Attributes_string["$test1"])
}
func TestMembershipOpInProcessorFieldExpressions(t *testing.T) {
require := require.New(t)
testLogs := []model.SignozLog{
makeTestSignozLog("test log", map[string]interface{}{
"http.method": "GET",
"order.products": `{"ids": ["pid0", "pid1"]}`,
}),
}
testPipeline := Pipeline{
OrderId: 1,
Name: "pipeline1",
Alias: "pipeline1",
Enabled: true,
Filter: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{
Key: "http.method",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
},
Operator: "=",
Value: "GET",
},
},
},
Config: []PipelineOperator{
{
ID: "move",
Type: "move",
Enabled: true,
Name: "move",
From: `attributes["http.method"]`,
To: `attributes["test.http.method"]`,
}, {
ID: "json",
Type: "json_parser",
Enabled: true,
Name: "json",
ParseFrom: `attributes["order.products"]`,
ParseTo: `attributes["order.products"]`,
}, {
ID: "move1",
Type: "move",
Enabled: true,
Name: "move1",
From: `attributes["order.products"].ids`,
To: `attributes["order.product_ids"]`,
}, {
ID: "move2",
Type: "move",
Enabled: true,
Name: "move2",
From: `attributes.test?.doesnt_exist`,
To: `attributes["test.doesnt_exist"]`,
}, {
ID: "add",
Type: "add",
Enabled: true,
Name: "add",
Field: `attributes["order.pids"].missing_field`,
Value: `EXPR(attributes.a["b.c"].d[4].e + resource.f)`,
}, {
ID: "add2",
Type: "add",
Enabled: true,
Name: "add2",
Field: `attributes["order.pids.pid0"]`,
Value: `EXPR(attributes["order.product_ids"][0])`,
}, {
ID: "add3",
Type: "add",
Enabled: true,
Name: "add3",
Field: `attributes["attrs.test.value"]`,
Value: `EXPR(attributes.test?.value)`,
}, {
ID: "add4",
Type: "add",
Enabled: true,
Name: "add4",
Field: `attributes["attrs.test.value"]`,
Value: `EXPR((attributes.temp?.request_context?.scraper ?? [nil])[0])`,
},
},
}
result, collectorWarnAndErrorLogs, err := SimulatePipelinesProcessing(
context.Background(),
[]Pipeline{testPipeline},
testLogs,
)
require.Nil(err)
require.Equal(0, len(collectorWarnAndErrorLogs), strings.Join(collectorWarnAndErrorLogs, "\n"))
require.Equal(1, len(result))
_, methodAttrExists := result[0].Attributes_string["http.method"]
require.False(methodAttrExists)
require.Equal("GET", result[0].Attributes_string["test.http.method"])
require.Equal("pid0", result[0].Attributes_string["order.pids.pid0"])
}
func TestContainsFilterIsCaseInsensitive(t *testing.T) {
// The contains and ncontains query builder filters are case insensitive when querying logs.
// Pipeline filter should also behave in the same way.
require := require.New(t)
testLogs := []model.SignozLog{
makeTestSignozLog("test Ecom Log", map[string]interface{}{}),
}
testPipelines := []Pipeline{{
OrderId: 1,
Name: "pipeline1",
Alias: "pipeline1",
Enabled: true,
Filter: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{{
Key: v3.AttributeKey{
Key: "body",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeUnspecified,
IsColumn: true,
},
Operator: "contains",
Value: "log",
}},
},
Config: []PipelineOperator{
{
ID: "add",
Type: "add",
Enabled: true,
Name: "add",
Field: "attributes.test1",
Value: "value1",
},
},
}, {
OrderId: 2,
Name: "pipeline2",
Alias: "pipeline2",
Enabled: true,
Filter: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{{
Key: v3.AttributeKey{
Key: "body",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeUnspecified,
IsColumn: true,
},
Operator: "ncontains",
Value: "ecom",
}},
},
Config: []PipelineOperator{
{
ID: "add",
Type: "add",
Enabled: true,
Name: "add",
Field: "attributes.test2",
Value: "value2",
},
},
}}
result, collectorWarnAndErrorLogs, err := SimulatePipelinesProcessing(
context.Background(), testPipelines, testLogs,
)
require.Nil(err)
require.Equal(0, len(collectorWarnAndErrorLogs), strings.Join(collectorWarnAndErrorLogs, "\n"))
require.Equal(1, len(result))
require.Equal(result[0].Attributes_string["test1"], "value1")
_, test2Exists := result[0].Attributes_string["test2"]
require.False(test2Exists)
}
func TestTemporaryWorkaroundForSupportingAttribsContainingDots(t *testing.T) {
// TODO(Raj): Remove this after dots are supported

View File

@@ -38,7 +38,7 @@ func TestPanelTableForCumulative(t *testing.T) {
},
Expression: "A",
},
expected: "SELECT toStartOfHour(now()) as ts, sum(rate_value)/29 as value FROM (SELECT ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality IN ['Cumulative', 'Unspecified'] AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(rate_value) = 0 GROUP BY ts ORDER BY ts",
expected: "SELECT toStartOfHour(now()) as ts, sum(rate_value)/29 as value FROM (SELECT ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality IN ['Cumulative', 'Unspecified'] AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(rate_value) = 0 GROUP BY ts ORDER BY ts",
},
{
name: "latency p50",
@@ -61,7 +61,7 @@ func TestPanelTableForCumulative(t *testing.T) {
},
Expression: "A",
},
expected: "SELECT toStartOfHour(now()) as ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.500) as value FROM (SELECT le, toStartOfHour(now()) as ts, sum(rate_value)/29 as value FROM (SELECT le, ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality IN ['Cumulative', 'Unspecified'] AND JSONExtractString(labels, 'service_name') = 'frontend') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, le,ts ORDER BY fingerprint, le ASC, ts) WINDOW rate_window as (PARTITION BY fingerprint, le ORDER BY fingerprint, le ASC, ts)) WHERE isNaN(rate_value) = 0 GROUP BY le,ts ORDER BY le ASC, ts) GROUP BY ts ORDER BY ts",
expected: "SELECT toStartOfHour(now()) as ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.500) as value FROM (SELECT le, toStartOfHour(now()) as ts, sum(rate_value)/29 as value FROM (SELECT le, ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality IN ['Cumulative', 'Unspecified'] AND JSONExtractString(labels, 'service_name') = 'frontend') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, le,ts ORDER BY fingerprint, le ASC, ts) WINDOW rate_window as (PARTITION BY fingerprint, le ORDER BY fingerprint, le ASC, ts)) WHERE isNaN(rate_value) = 0 GROUP BY le,ts ORDER BY le ASC, ts) GROUP BY ts ORDER BY ts",
},
{
name: "latency p99 with group by",
@@ -80,7 +80,7 @@ func TestPanelTableForCumulative(t *testing.T) {
},
Expression: "A",
},
expected: "SELECT service_name, toStartOfHour(now()) as ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name,le, toStartOfHour(now()) as ts, sum(rate_value)/29 as value FROM (SELECT service_name,le, ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, service_name,le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality IN ['Cumulative', 'Unspecified']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, service_name,le,ts ORDER BY fingerprint, service_name ASC,le ASC, ts) WINDOW rate_window as (PARTITION BY fingerprint, service_name,le ORDER BY fingerprint, service_name ASC,le ASC, ts)) WHERE isNaN(rate_value) = 0 GROUP BY service_name,le,ts ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts",
expected: "SELECT service_name, toStartOfHour(now()) as ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name,le, toStartOfHour(now()) as ts, sum(rate_value)/29 as value FROM (SELECT service_name,le, ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, service_name,le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality IN ['Cumulative', 'Unspecified']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, service_name,le,ts ORDER BY fingerprint, service_name ASC,le ASC, ts) WINDOW rate_window as (PARTITION BY fingerprint, service_name,le ORDER BY fingerprint, service_name ASC,le ASC, ts)) WHERE isNaN(rate_value) = 0 GROUP BY service_name,le,ts ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts",
},
}

View File

@@ -38,7 +38,7 @@ func TestPanelTableForDelta(t *testing.T) {
},
Expression: "A",
},
expected: "SELECT toStartOfHour(now()) as ts, sum(value)/1800 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY ts ORDER BY ts",
expected: "SELECT toStartOfHour(now()) as ts, sum(value)/1800 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY ts ORDER BY ts",
},
{
name: "latency p50",
@@ -61,7 +61,7 @@ func TestPanelTableForDelta(t *testing.T) {
},
Expression: "A",
},
expected: "SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.500) as value FROM (SELECT le, toStartOfHour(now()) as ts, sum(value)/1800 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') = 'frontend') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY le,ts ORDER BY le ASC, ts) GROUP BY ts ORDER BY ts",
expected: "SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.500) as value FROM (SELECT le, toStartOfHour(now()) as ts, sum(value)/1800 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') = 'frontend') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY le,ts ORDER BY le ASC, ts) GROUP BY ts ORDER BY ts",
},
{
name: "latency p99 with group by",
@@ -80,7 +80,7 @@ func TestPanelTableForDelta(t *testing.T) {
},
Expression: "A",
},
expected: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name,le, toStartOfHour(now()) as ts, sum(value)/1800 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' ) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY service_name,le,ts ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts",
expected: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name,le, toStartOfHour(now()) as ts, sum(value)/1800 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' ) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY service_name,le,ts ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts",
},
}

View File

@@ -2,6 +2,7 @@ package v3
import (
"fmt"
"math"
"strings"
"time"
@@ -131,7 +132,7 @@ func buildMetricsTimeSeriesFilterQuery(fs *v3.FilterSet, groupTags []v3.Attribut
}
}
filterSubQuery := fmt.Sprintf("SELECT %s fingerprint FROM %s.%s WHERE %s", selectLabels, constants.SIGNOZ_METRIC_DBNAME, constants.SIGNOZ_TIMESERIES_LOCAL_TABLENAME, queryString)
filterSubQuery := fmt.Sprintf("SELECT DISTINCT %s fingerprint FROM %s.%s WHERE %s", selectLabels, constants.SIGNOZ_METRIC_DBNAME, constants.SIGNOZ_TIMESERIES_LOCAL_TABLENAME, queryString)
return filterSubQuery, nil
}
@@ -172,7 +173,7 @@ func buildMetricQuery(start, end, step int64, mq *v3.BuilderQuery, tableName str
return "", err
}
samplesTableTimeFilter := fmt.Sprintf("metric_name = %s AND timestamp_ms >= %d AND timestamp_ms <= %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end)
samplesTableTimeFilter := fmt.Sprintf("metric_name = %s AND timestamp_ms >= %d AND timestamp_ms < %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end)
// Select the aggregate value for interval
queryTmpl :=
@@ -447,10 +448,14 @@ func reduceQuery(query string, reduceTo v3.ReduceToOperator, aggregateOperator v
// start and end are in milliseconds
// step is in seconds
func PrepareMetricQuery(start, end int64, queryType v3.QueryType, panelType v3.PanelType, mq *v3.BuilderQuery, options Options) (string, error) {
// adjust the start and end time to be aligned with the step interval
start = start - (start % (mq.StepInterval * 1000))
end = end - (end % (mq.StepInterval * 1000))
// if the query is a rate query, we adjust the start time by one more step
// so that we can calculate the rate for the first data point
if mq.AggregateOperator.IsRateOperator() && mq.Temporality != v3.Delta {
start -= mq.StepInterval * 1000
}
adjustStep := int64(math.Min(float64(mq.StepInterval), 60))
end = end - (end % (adjustStep * 1000))
var query string
var err error
@@ -497,6 +502,18 @@ func PrepareMetricQuery(start, end int64, queryType v3.QueryType, panelType v3.P
if panelType == v3.PanelTypeValue {
query, err = reduceQuery(query, mq.ReduceTo, mq.AggregateOperator)
}
// We want to prevent the query from returning too many time series, which can cause
// excessive resource usage and slow query times. This usually happens when there are
// no filters on the query.
//
// We use the DISTINCT to remove duplicate time series, and use the max_rows_in_distinct
// setting to throw an error if the number of time series is too high.
// See https://clickhouse.com/docs/en/operations/settings/query-complexity#max-rows-in-distinct
//
// This can be disabled by setting the limit to 0.
if mq.QueryLimits.MaxTimeSeries != 0 {
query = query + " SETTINGS max_rows_in_distinct = " + fmt.Sprintf("%d", mq.QueryLimits.MaxTimeSeries)
}
return query, err
}

View File

@@ -245,7 +245,7 @@ func TestBuildQueryOperators(t *testing.T) {
func TestBuildQueryXRate(t *testing.T) {
t.Run("TestBuildQueryXRate", func(t *testing.T) {
tmpl := `SELECT ts, %s(rate_value) as value FROM (SELECT ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'name' AND temporality IN ['Cumulative', 'Unspecified']) as filtered_time_series USING fingerprint WHERE metric_name = 'name' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts) ) WHERE isNaN(rate_value) = 0 GROUP BY ts ORDER BY ts`
tmpl := `SELECT ts, %s(rate_value) as value FROM (SELECT ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'name' AND temporality IN ['Cumulative', 'Unspecified']) as filtered_time_series USING fingerprint WHERE metric_name = 'name' AND timestamp_ms >= 1650991920000 AND timestamp_ms < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts) ) WHERE isNaN(rate_value) = 0 GROUP BY ts ORDER BY ts`
cases := []struct {
aggregateOperator v3.AggregateOperator
@@ -298,7 +298,7 @@ func TestBuildQueryXRate(t *testing.T) {
func TestBuildQueryRPM(t *testing.T) {
t.Run("TestBuildQueryXRate", func(t *testing.T) {
tmpl := `SELECT ts, ceil(value * 60) as value FROM (SELECT ts, %s(rate_value) as value FROM (SELECT ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'name' AND temporality IN ['Cumulative', 'Unspecified']) as filtered_time_series USING fingerprint WHERE metric_name = 'name' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts) ) WHERE isNaN(rate_value) = 0 GROUP BY ts ORDER BY ts)`
tmpl := `SELECT ts, ceil(value * 60) as value FROM (SELECT ts, %s(rate_value) as value FROM (SELECT ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'name' AND temporality IN ['Cumulative', 'Unspecified']) as filtered_time_series USING fingerprint WHERE metric_name = 'name' AND timestamp_ms >= 1650991920000 AND timestamp_ms < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts) ) WHERE isNaN(rate_value) = 0 GROUP BY ts ORDER BY ts)`
cases := []struct {
aggregateOperator v3.AggregateOperator
@@ -376,8 +376,8 @@ func TestBuildQueryAdjustedTimes(t *testing.T) {
},
},
},
// 20:11:00 - 20:41:00
expected: "timestamp_ms >= 1686082260000 AND timestamp_ms <= 1686084060000",
// 20:10:00 - 20:41:00
expected: "timestamp_ms >= 1686082200000 AND timestamp_ms < 1686084060000",
},
{
name: "TestBuildQueryAdjustedTimes start close to 50 seconds",
@@ -401,8 +401,8 @@ func TestBuildQueryAdjustedTimes(t *testing.T) {
},
},
},
// 20:11:00 - 20:41:00
expected: "timestamp_ms >= 1686082260000 AND timestamp_ms <= 1686084060000",
// 20:10:00 - 20:41:00
expected: "timestamp_ms >= 1686082200000 AND timestamp_ms < 1686084060000",
},
{
name: "TestBuildQueryAdjustedTimes start close to 42 seconds with step 30 seconds",
@@ -426,8 +426,8 @@ func TestBuildQueryAdjustedTimes(t *testing.T) {
},
},
},
// 20:11:30 - 20:41:00
expected: "timestamp_ms >= 1686082290000 AND timestamp_ms <= 1686084060000",
// 20:11:00 - 20:41:00
expected: "timestamp_ms >= 1686082260000 AND timestamp_ms < 1686084060000",
},
{
name: "TestBuildQueryAdjustedTimes start close to 42 seconds with step 30 seconds and end close to 30 seconds",
@@ -451,8 +451,8 @@ func TestBuildQueryAdjustedTimes(t *testing.T) {
},
},
},
// 20:11:30 - 20:41:00
expected: "timestamp_ms >= 1686082290000 AND timestamp_ms <= 1686084060000",
// 20:11:00 - 20:41:00
expected: "timestamp_ms >= 1686082260000 AND timestamp_ms < 1686084060000",
},
{
name: "TestBuildQueryAdjustedTimes start close to 42 seconds with step 300 seconds and end close to 30 seconds",
@@ -476,8 +476,10 @@ func TestBuildQueryAdjustedTimes(t *testing.T) {
},
},
},
// 20:10:00 - 20:40:00
expected: "timestamp_ms >= 1686082200000 AND timestamp_ms <= 1686084000000",
// 20:05:00 - 20:41:00
// 20:10:00 is the nearest 5 minute interval, but we round down to 20:05:00
// as this is a rate query and we want to include the previous value for the first interval
expected: "timestamp_ms >= 1686081900000 AND timestamp_ms < 1686084060000",
},
{
name: "TestBuildQueryAdjustedTimes start close to 42 seconds with step 180 seconds and end close to 30 seconds",
@@ -501,8 +503,10 @@ func TestBuildQueryAdjustedTimes(t *testing.T) {
},
},
},
// 20:09:00 - 20:39:00
expected: "timestamp_ms >= 1686082140000 AND timestamp_ms <= 1686083940000",
// 20:06:00 - 20:39:00
// 20:09:00 is the nearest 3 minute interval, but we round down to 20:06:00
// as this is a rate query and we want to include the previous value for the first interval
expected: "timestamp_ms >= 1686081960000 AND timestamp_ms < 1686084060000",
},
}
@@ -516,3 +520,34 @@ func TestBuildQueryAdjustedTimes(t *testing.T) {
})
}
}
func TestBuildQueryLimits(t *testing.T) {
t.Run("TestBuildQueryLimits", func(t *testing.T) {
q := &v3.QueryRangeParamsV3{
Start: 1650991982000,
End: 1651078382000,
CompositeQuery: &v3.CompositeQuery{
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
StepInterval: 60,
AggregateAttribute: v3.AttributeKey{Key: "name"},
Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "a"}, Value: "b", Operator: v3.FilterOperatorNotEqual},
{Key: v3.AttributeKey{Key: "code"}, Value: "ERROR_*", Operator: v3.FilterOperatorNotRegex},
}},
AggregateOperator: v3.AggregateOperatorRateMax,
Expression: "A",
QueryLimits: v3.QueryLimits{
MaxTimeSeries: 100,
},
},
},
},
}
query, err := PrepareMetricQuery(q.Start, q.End, q.CompositeQuery.QueryType, q.CompositeQuery.PanelType, q.CompositeQuery.BuilderQueries["A"], Options{PreferRPM: false})
require.NoError(t, err)
require.Contains(t, query, "SETTINGS max_rows_in_distinct = 100")
})
}

View File

@@ -0,0 +1,61 @@
package delta
import (
"fmt"
"strings"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)
// groupingSets returns a string of comma separated tags for group by clause
// `ts` is always added to the group by clause
func groupingSets(tags ...string) string {
withTs := append(tags, "ts")
if len(withTs) > 1 {
return fmt.Sprintf(`GROUPING SETS ( (%s), (%s) )`, strings.Join(withTs, ", "), strings.Join(tags, ", "))
} else {
return strings.Join(withTs, ", ")
}
}
// groupingSetsByAttributeKeyTags returns a string of comma separated tags for group by clause
func groupingSetsByAttributeKeyTags(tags ...v3.AttributeKey) string {
groupTags := []string{}
for _, tag := range tags {
groupTags = append(groupTags, tag.Key)
}
return groupingSets(groupTags...)
}
// groupBy returns a string of comma separated tags for group by clause
func groupByAttributeKeyTags(tags ...v3.AttributeKey) string {
groupTags := []string{}
for _, tag := range tags {
groupTags = append(groupTags, tag.Key)
}
groupTags = append(groupTags, "ts")
return strings.Join(groupTags, ", ")
}
// orderBy returns a string of comma separated tags for order by clause
// if the order is not specified, it defaults to ASC
func orderByAttributeKeyTags(items []v3.OrderBy, tags []v3.AttributeKey) string {
var orderBy []string
for _, tag := range tags {
found := false
for _, item := range items {
if item.ColumnName == tag.Key {
found = true
orderBy = append(orderBy, fmt.Sprintf("%s %s", item.ColumnName, item.Order))
break
}
}
if !found {
orderBy = append(orderBy, fmt.Sprintf("%s ASC", tag.Key))
}
}
orderBy = append(orderBy, "ts ASC")
return strings.Join(orderBy, ", ")
}

View File

@@ -0,0 +1,229 @@
package delta
import (
"testing"
"github.com/stretchr/testify/assert"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)
func TestPrepareTimeAggregationSubQuery(t *testing.T) {
// The time aggregation is performed for each unique series - since the fingerprint represents the
// unique hash of label set, we always group by fingerprint regardless of the GroupBy
// This sub result is then aggregated on dimensions using the provided GroupBy clause keys
testCases := []struct {
name string
builderQuery *v3.BuilderQuery
start int64
end int64
expectedQueryContains string
}{
{
name: "test time aggregation = avg, temporality = delta",
builderQuery: &v3.BuilderQuery{
QueryName: "A",
StepInterval: 60,
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: "http_requests",
DataType: v3.AttributeKeyDataTypeFloat64,
Type: v3.AttributeKeyTypeUnspecified,
IsColumn: true,
IsJSON: false,
},
Temporality: v3.Delta,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{
Key: "service_name",
Type: v3.AttributeKeyTypeTag,
DataType: v3.AttributeKeyDataTypeString,
},
Operator: v3.FilterOperatorNotEqual,
Value: "payment_service",
},
{
Key: v3.AttributeKey{
Key: "endpoint",
Type: v3.AttributeKeyTypeTag,
DataType: v3.AttributeKeyDataTypeString,
},
Operator: v3.FilterOperatorIn,
Value: []interface{}{"/paycallback", "/payme", "/paypal"},
},
},
},
GroupBy: []v3.AttributeKey{{
Key: "service_name",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
}},
Expression: "A",
Disabled: false,
TimeAggregation: v3.TimeAggregationAvg,
},
start: 1701794980000,
end: 1701796780000,
expectedQueryContains: "SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'http_requests' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') != 'payment_service' AND JSONExtractString(labels, 'endpoint') IN ['/paycallback','/payme','/paypal']) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts",
},
{
name: "test time aggregation = rate, temporality = delta",
builderQuery: &v3.BuilderQuery{
QueryName: "A",
StepInterval: 60,
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: "http_requests",
DataType: v3.AttributeKeyDataTypeFloat64,
Type: v3.AttributeKeyTypeUnspecified,
IsColumn: true,
IsJSON: false,
},
Temporality: v3.Delta,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{
Key: "service_name",
Type: v3.AttributeKeyTypeTag,
DataType: v3.AttributeKeyDataTypeString,
},
Operator: v3.FilterOperatorContains,
Value: "payment_service",
},
},
},
GroupBy: []v3.AttributeKey{{
Key: "service_name",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
}},
Expression: "A",
Disabled: false,
TimeAggregation: v3.TimeAggregationRate,
},
start: 1701794980000,
end: 1701796780000,
expectedQueryContains: "SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'http_requests' AND temporality = 'Delta' AND like(JSONExtractString(labels, 'service_name'), '%payment_service%')) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts",
},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
query, err := prepareTimeAggregationSubQueryTimeSeries(
testCase.start,
testCase.end,
testCase.builderQuery.StepInterval,
testCase.builderQuery,
)
assert.Nil(t, err)
assert.Contains(t, query, testCase.expectedQueryContains)
})
}
}
func TestPrepareTimeseriesQuery(t *testing.T) {
testCases := []struct {
name string
builderQuery *v3.BuilderQuery
start int64
end int64
expectedQueryContains string
}{
{
name: "test time aggregation = avg, space aggregation = sum, temporality = unspecified",
builderQuery: &v3.BuilderQuery{
QueryName: "A",
StepInterval: 60,
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: "system_memory_usage",
DataType: v3.AttributeKeyDataTypeFloat64,
Type: v3.AttributeKeyTypeUnspecified,
IsColumn: true,
IsJSON: false,
},
Temporality: v3.Unspecified,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{
Key: "state",
Type: v3.AttributeKeyTypeTag,
DataType: v3.AttributeKeyDataTypeString,
},
Operator: v3.FilterOperatorNotEqual,
Value: "idle",
},
},
},
GroupBy: []v3.AttributeKey{},
Expression: "A",
Disabled: false,
TimeAggregation: v3.TimeAggregationAvg,
SpaceAggregation: v3.SpaceAggregationSum,
},
start: 1701794980000,
end: 1701796780000,
expectedQueryContains: "SELECT ts, sum(per_series_value) as value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'system_memory_usage' AND temporality = 'Unspecified' AND JSONExtractString(labels, 'state') != 'idle') as filtered_time_series USING fingerprint WHERE metric_name = 'system_memory_usage' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY ts ORDER BY ts ASC",
},
{
name: "test time aggregation = rate, space aggregation = sum, temporality = delta",
builderQuery: &v3.BuilderQuery{
QueryName: "A",
StepInterval: 60,
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: "http_requests",
DataType: v3.AttributeKeyDataTypeFloat64,
Type: v3.AttributeKeyTypeUnspecified,
IsColumn: true,
IsJSON: false,
},
Temporality: v3.Delta,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{
Key: "service_name",
Type: v3.AttributeKeyTypeTag,
DataType: v3.AttributeKeyDataTypeString,
},
Operator: v3.FilterOperatorContains,
Value: "payment_service",
},
},
},
GroupBy: []v3.AttributeKey{{
Key: "service_name",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
}},
Expression: "A",
Disabled: false,
TimeAggregation: v3.TimeAggregationRate,
SpaceAggregation: v3.SpaceAggregationSum,
},
start: 1701794980000,
end: 1701796780000,
expectedQueryContains: "SELECT service_name, ts, sum(per_series_value) as value FROM (SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'http_requests' AND temporality = 'Delta' AND like(JSONExtractString(labels, 'service_name'), '%payment_service%')) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts ASC",
},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
query, err := prepareMetricQueryDeltaTimeSeries(
testCase.start,
testCase.end,
testCase.builderQuery.StepInterval,
testCase.builderQuery,
)
assert.Nil(t, err)
assert.Contains(t, query, testCase.expectedQueryContains)
})
}
}

View File

@@ -0,0 +1,120 @@
package delta
import (
"fmt"
v4 "go.signoz.io/signoz/pkg/query-service/app/metrics/v4"
"go.signoz.io/signoz/pkg/query-service/constants"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/utils"
)
// prepareTimeAggregationSubQueryTimeSeries builds the sub-query to be used for temporal aggregation
func prepareTimeAggregationSubQueryTimeSeries(start, end, step int64, mq *v3.BuilderQuery) (string, error) {
var subQuery string
timeSeriesSubQuery, err := v4.PrepareTimeseriesFilterQuery(mq)
if err != nil {
return "", err
}
samplesTableFilter := fmt.Sprintf("metric_name = %s AND timestamp_ms >= %d AND timestamp_ms <= %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end)
// Select the aggregate value for interval
queryTmpl :=
"SELECT fingerprint, %s" +
" toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL %d SECOND) as ts," +
" %s as per_series_value" +
" FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME +
" INNER JOIN" +
" (%s) as filtered_time_series" +
" USING fingerprint" +
" WHERE " + samplesTableFilter +
" GROUP BY fingerprint, ts" +
" ORDER BY fingerprint, ts"
var selectLabelsAny string
for _, tag := range mq.GroupBy {
selectLabelsAny += fmt.Sprintf("any(%s) as %s,", tag.Key, tag.Key)
}
var selectLabels string
for _, tag := range mq.GroupBy {
selectLabels += tag.Key + ","
}
switch mq.TimeAggregation {
case v3.TimeAggregationAvg:
op := "avg(value)"
subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
case v3.TimeAggregationSum:
op := "sum(value)"
subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
case v3.TimeAggregationMin:
op := "min(value)"
subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
case v3.TimeAggregationMax:
op := "max(value)"
subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
case v3.TimeAggregationCount:
op := "count(value)"
subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
case v3.TimeAggregationCountDistinct:
op := "count(distinct(value))"
subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
case v3.TimeAggregationAnyLast:
op := "anyLast(value)"
subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
case v3.TimeAggregationRate:
op := fmt.Sprintf("sum(value)/%d", step)
subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
case v3.TimeAggregationIncrease:
op := "sum(value)"
subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
}
return subQuery, nil
}
// prepareMetricQueryDeltaTimeSeries builds the query to be used for fetching metrics
func prepareMetricQueryDeltaTimeSeries(start, end, step int64, mq *v3.BuilderQuery) (string, error) {
var query string
temporalAggSubQuery, err := prepareTimeAggregationSubQueryTimeSeries(start, end, step, mq)
if err != nil {
return "", err
}
groupBy := groupingSetsByAttributeKeyTags(mq.GroupBy...)
orderBy := orderByAttributeKeyTags(mq.OrderBy, mq.GroupBy)
selectLabels := groupByAttributeKeyTags(mq.GroupBy...)
queryTmpl :=
"SELECT %s," +
" %s as value" +
" FROM (%s)" +
" WHERE isNaN(per_series_value) = 0" +
" GROUP BY %s" +
" ORDER BY %s"
switch mq.SpaceAggregation {
case v3.SpaceAggregationAvg:
op := "avg(per_series_value)"
query = fmt.Sprintf(queryTmpl, selectLabels, op, temporalAggSubQuery, groupBy, orderBy)
case v3.SpaceAggregationSum:
op := "sum(per_series_value)"
query = fmt.Sprintf(queryTmpl, selectLabels, op, temporalAggSubQuery, groupBy, orderBy)
case v3.SpaceAggregationMin:
op := "min(per_series_value)"
query = fmt.Sprintf(queryTmpl, selectLabels, op, temporalAggSubQuery, groupBy, orderBy)
case v3.SpaceAggregationMax:
op := "max(per_series_value)"
query = fmt.Sprintf(queryTmpl, selectLabels, op, temporalAggSubQuery, groupBy, orderBy)
case v3.SpaceAggregationCount:
op := "count(per_series_value)"
query = fmt.Sprintf(queryTmpl, selectLabels, op, temporalAggSubQuery, groupBy, orderBy)
}
return query, nil
}

View File

@@ -258,7 +258,7 @@ func (agent *Agent) processStatusUpdate(
// If remote config is changed and different from what the Agent has then
// send the new remote config to the Agent.
if configChanged ||
(agent.Status.RemoteConfigStatus != nil &&
(agent.Status.RemoteConfigStatus != nil && agent.remoteConfig != nil &&
!bytes.Equal(agent.Status.RemoteConfigStatus.LastRemoteConfigHash, agent.remoteConfig.ConfigHash)) {
// The new status resulted in a change in the config of the Agent or the Agent
// does not have this config (hash is different). Send the new config the Agent.
@@ -276,7 +276,7 @@ func (agent *Agent) processStatusUpdate(
func (agent *Agent) updateRemoteConfig(configProvider AgentConfigProvider) bool {
recommendedConfig, confId, err := configProvider.RecommendAgentConfig([]byte(agent.EffectiveConfig))
if err != nil {
zap.S().Errorf("could not generate config recommendation for agent %d: %w", agent.ID, err)
zap.S().Error("could not generate config recommendation for agent:", agent.ID, err)
return false
}

View File

@@ -145,7 +145,14 @@ func (q *querier) runBuilderQuery(
// We are only caching the graph panel queries. A non-existant cache key means that the query is not cached.
// If the query is not cached, we execute the query and return the result without caching it.
if _, ok := cacheKeys[queryName]; !ok {
query, err := metricsV3.PrepareMetricQuery(params.Start, params.End, params.CompositeQuery.QueryType, params.CompositeQuery.PanelType, builderQuery, metricsV3.Options{PreferRPM: preferRPM})
query, err := metricsV3.PrepareMetricQuery(
params.Start,
params.End,
params.CompositeQuery.QueryType,
params.CompositeQuery.PanelType,
builderQuery,
metricsV3.Options{PreferRPM: preferRPM},
)
if err != nil {
ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil}
return
@@ -175,7 +182,7 @@ func (q *querier) runBuilderQuery(
params.CompositeQuery.QueryType,
params.CompositeQuery.PanelType,
builderQuery,
metricsV3.Options{},
metricsV3.Options{PreferRPM: preferRPM},
)
if err != nil {
ch <- channelResult{

View File

@@ -558,8 +558,8 @@ func TestQueryRange(t *testing.T) {
}
q := NewQuerier(opts)
expectedTimeRangeInQueryString := []string{
fmt.Sprintf("timestamp_ms >= %d AND timestamp_ms <= %d", 1675115580000, 1675115580000+120*60*1000),
fmt.Sprintf("timestamp_ms >= %d AND timestamp_ms <= %d", 1675115580000+120*60*1000, 1675115580000+180*60*1000),
fmt.Sprintf("timestamp_ms >= %d AND timestamp_ms < %d", 1675115520000, 1675115580000+120*60*1000),
fmt.Sprintf("timestamp_ms >= %d AND timestamp_ms < %d", 1675115520000+120*60*1000, 1675115580000+180*60*1000),
fmt.Sprintf("timestamp >= '%d' AND timestamp <= '%d'", 1675115580000*1000000, (1675115580000+120*60*1000)*int64(1000000)),
fmt.Sprintf("timestamp >= '%d' AND timestamp <= '%d'", (1675115580000+60*60*1000)*int64(1000000), (1675115580000+180*60*1000)*int64(1000000)),
}
@@ -669,7 +669,7 @@ func TestQueryRangeValueType(t *testing.T) {
q := NewQuerier(opts)
// No caching
expectedTimeRangeInQueryString := []string{
fmt.Sprintf("timestamp_ms >= %d AND timestamp_ms <= %d", 1675115580000, 1675115580000+120*60*1000),
fmt.Sprintf("timestamp_ms >= %d AND timestamp_ms < %d", 1675115520000, 1675115580000+120*60*1000),
fmt.Sprintf("timestamp >= '%d' AND timestamp <= '%d'", (1675115580000+60*60*1000)*int64(1000000), (1675115580000+180*60*1000)*int64(1000000)),
}

View File

@@ -233,7 +233,8 @@ func (qb *QueryBuilder) PrepareQueries(params *v3.QueryRangeParamsV3, args ...in
queries[queryName] = queryString
}
case v3.DataSourceMetrics:
queryString, err := qb.options.BuildMetricQuery(params.Start, params.End, compositeQuery.QueryType, compositeQuery.PanelType, query, metricsV3.Options{PreferRPM: PreferRPMFeatureEnabled})
metricsOptions := metricsV3.Options{PreferRPM: PreferRPMFeatureEnabled}
queryString, err := qb.options.BuildMetricQuery(params.Start, params.End, compositeQuery.QueryType, compositeQuery.PanelType, query, metricsOptions)
if err != nil {
return nil, err
}

View File

@@ -257,7 +257,7 @@ func TestDeltaQueryBuilder(t *testing.T) {
},
},
queryToTest: "A",
expected: "SELECT toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY ts ORDER BY ts",
expected: "SELECT toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY ts ORDER BY ts",
},
{
name: "TestQueryWithExpression - Error rate",
@@ -327,7 +327,7 @@ func TestDeltaQueryBuilder(t *testing.T) {
},
},
queryToTest: "C",
expected: "SELECT A.`ts` as `ts`, A.value * 100 / B.value as value FROM (SELECT toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, 'status_code') IN ['STATUS_CODE_ERROR'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY ts ORDER BY ts) as A INNER JOIN (SELECT toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY ts ORDER BY ts) as B ON A.`ts` = B.`ts`",
expected: "SELECT A.`ts` as `ts`, A.value * 100 / B.value as value FROM (SELECT toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, 'status_code') IN ['STATUS_CODE_ERROR'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY ts ORDER BY ts) as A INNER JOIN (SELECT toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY ts ORDER BY ts) as B ON A.`ts` = B.`ts`",
},
{
name: "TestQuery - Quantile",
@@ -354,7 +354,7 @@ func TestDeltaQueryBuilder(t *testing.T) {
},
},
queryToTest: "A",
expected: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) as value FROM (SELECT service_name,le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' ) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY service_name,le,ts ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts",
expected: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) as value FROM (SELECT service_name,le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' ) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY service_name,le,ts ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts",
},
}

View File

@@ -353,6 +353,7 @@ type QueryRangeParamsV3 struct {
CompositeQuery *CompositeQuery `json:"compositeQuery"`
Variables map[string]interface{} `json:"variables,omitempty"`
NoCache bool `json:"noCache"`
SourcePage string `json:"sourcePage"`
}
type PromQuery struct {
@@ -501,6 +502,11 @@ type BuilderQuery struct {
TimeAggregation TimeAggregation `json:"timeAggregation,omitempty"`
SpaceAggregation SpaceAggregation `json:"spaceAggregation,omitempty"`
Functions []Function `json:"functions,omitempty"`
QueryLimits QueryLimits
}
type QueryLimits struct {
MaxTimeSeries int
}
func (b *BuilderQuery) Validate() error {

View File

@@ -73,9 +73,18 @@ func Parse(filters *v3.FilterSet) (string, error) {
case v3.FilterOperatorExists, v3.FilterOperatorNotExists:
filter = fmt.Sprintf("%s %s %s", exprFormattedValue(v.Key.Key), logOperatorsToExpr[v.Operator], getTypeName(v.Key.Type))
default:
filter = fmt.Sprintf("%s %s %s", name, logOperatorsToExpr[v.Operator], exprFormattedValue(v.Value))
if v.Operator == v3.FilterOperatorContains || v.Operator == v3.FilterOperatorNotContains {
// `contains` and `ncontains` should be case insensitive to match how they work when querying logs.
filter = fmt.Sprintf(
"lower(%s) %s lower(%s)",
name, logOperatorsToExpr[v.Operator], exprFormattedValue(v.Value),
)
}
// Avoid running operators on nil values
if v.Operator != v3.FilterOperatorEqual && v.Operator != v3.FilterOperatorNotEqual {
filter = fmt.Sprintf("%s != nil && %s", name, filter)

View File

@@ -6,10 +6,12 @@ import (
"fmt"
"math"
"reflect"
"regexp"
"sort"
"sync"
"text/template"
"time"
"unicode"
"go.uber.org/zap"
@@ -435,7 +437,7 @@ func (r *ThresholdRule) runChQuery(ctx context.Context, db clickhouse.Conn, quer
for i, v := range vars {
colName := columnNames[i]
colName := normalizeLabelName(columnNames[i])
switch v := v.(type) {
case *string:
@@ -764,6 +766,23 @@ func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, ts time.Time, ch c
return nil, fmt.Errorf("this is unexpected, invalid query label")
}
func normalizeLabelName(name string) string {
// See https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels
// Regular expression to match non-alphanumeric characters except underscores
reg := regexp.MustCompile(`[^a-zA-Z0-9_]`)
// Replace all non-alphanumeric characters except underscores with underscores
normalized := reg.ReplaceAllString(name, "_")
// If the first character is not a letter or an underscore, prepend an underscore
if len(normalized) > 0 && !unicode.IsLetter(rune(normalized[0])) && normalized[0] != '_' {
normalized = "_" + normalized
}
return normalized
}
func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) (interface{}, error) {
valueFormatter := formatter.FromUnit(r.Unit())
@@ -829,7 +848,7 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time, queriers *Querie
annotations := make(labels.Labels, 0, len(r.annotations))
for _, a := range r.annotations {
annotations = append(annotations, labels.Label{Name: a.Name, Value: expand(a.Value)})
annotations = append(annotations, labels.Label{Name: normalizeLabelName(a.Name), Value: expand(a.Value)})
}
lbs := lb.Labels()

View File

@@ -295,3 +295,43 @@ func TestThresholdRuleCombinations(t *testing.T) {
}
}
}
func TestNormalizeLabelName(t *testing.T) {
cases := []struct {
labelName string
expected string
}{
{
labelName: "label",
expected: "label",
},
{
labelName: "label.with.dots",
expected: "label_with_dots",
},
{
labelName: "label-with-dashes",
expected: "label_with_dashes",
},
{
labelName: "labelwithnospaces",
expected: "labelwithnospaces",
},
{
labelName: "label with spaces",
expected: "label_with_spaces",
},
{
labelName: "label with spaces and .dots",
expected: "label_with_spaces_and__dots",
},
{
labelName: "label with spaces and -dashes",
expected: "label_with_spaces_and__dashes",
},
}
for _, c := range cases {
assert.Equal(t, c.expected, normalizeLabelName(c.labelName))
}
}

View File

@@ -192,7 +192,7 @@ services:
<<: *db-depend
otel-collector-migrator:
image: signoz/signoz-schema-migrator:${OTELCOL_TAG:-0.88.4}
image: signoz/signoz-schema-migrator:${OTELCOL_TAG:-0.88.6}
container_name: otel-migrator
command:
- "--dsn=tcp://clickhouse:9000"
@@ -205,7 +205,7 @@ services:
# condition: service_healthy
otel-collector:
image: signoz/signoz-otel-collector:0.88.4
image: signoz/signoz-otel-collector:0.88.6
container_name: signoz-otel-collector
command:
[
@@ -245,7 +245,7 @@ services:
condition: service_healthy
otel-collector-metrics:
image: signoz/signoz-otel-collector:0.88.4
image: signoz/signoz-otel-collector:0.88.6
container_name: signoz-otel-collector-metrics
command:
[