Compare commits

...

1 Commits

Author SHA1 Message Date
nityanandagohain
277419fad2 fix: use middleware from common middleware package 2025-01-21 11:37:19 +05:30
6 changed files with 335 additions and 579 deletions

View File

@@ -1,22 +1,16 @@
package app
import (
"bufio"
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"net/http"
_ "net/http/pprof" // http profiler
"os"
"regexp"
"time"
"github.com/gorilla/handlers"
"github.com/gorilla/mux"
"github.com/jmoiron/sqlx"
"github.com/rs/cors"
@@ -29,14 +23,13 @@ import (
"go.signoz.io/signoz/ee/query-service/integrations/gateway"
"go.signoz.io/signoz/ee/query-service/interfaces"
"go.signoz.io/signoz/ee/query-service/rules"
baseauth "go.signoz.io/signoz/pkg/query-service/auth"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/signoz"
"go.signoz.io/signoz/pkg/web"
licensepkg "go.signoz.io/signoz/ee/query-service/license"
"go.signoz.io/signoz/ee/query-service/usage"
"go.signoz.io/signoz/pkg/http/middleware"
"go.signoz.io/signoz/pkg/query-service/agentConf"
baseapp "go.signoz.io/signoz/pkg/query-service/app"
"go.signoz.io/signoz/pkg/query-service/app/cloudintegrations"
@@ -319,10 +312,17 @@ func (s *Server) createPrivateServer(apiHandler *api.APIHandler) (*http.Server,
r := baseapp.NewRouter()
r.Use(setTimeoutMiddleware)
r.Use(s.analyticsMiddleware)
r.Use(loggingMiddlewarePrivate)
r.Use(baseapp.LogCommentEnricher)
timeoutMiddleware := middleware.NewTimeout(zap.L(), baseconst.TimeoutExcludedRoutes, 60*time.Second, 600*time.Second)
r.Use(timeoutMiddleware.Wrap)
analyticsMiddleware := middleware.NewAnalytics(zap.L())
r.Use(analyticsMiddleware.Wrap)
loggingMiddleware := middleware.NewLogging(zap.L())
r.Use(loggingMiddleware.Wrap)
logCommentMiddleware := middleware.NewLogComment(zap.L())
r.Use(logCommentMiddleware.Wrap)
apiHandler.RegisterPrivateRoutes(r)
@@ -362,10 +362,17 @@ func (s *Server) createPublicServer(apiHandler *api.APIHandler, web web.Web) (*h
}
am := baseapp.NewAuthMiddleware(getUserFromRequest)
r.Use(setTimeoutMiddleware)
r.Use(s.analyticsMiddleware)
r.Use(loggingMiddleware)
r.Use(baseapp.LogCommentEnricher)
timeoutMiddleware := middleware.NewTimeout(zap.L(), baseconst.TimeoutExcludedRoutes, 60*time.Second, 600*time.Second)
r.Use(timeoutMiddleware.Wrap)
analyticsMiddleware := middleware.NewAnalytics(zap.L())
r.Use(analyticsMiddleware.Wrap)
loggingMiddleware := middleware.NewLogging(zap.L())
r.Use(loggingMiddleware.Wrap)
logCommentMiddleware := middleware.NewLogComment(zap.L())
r.Use(logCommentMiddleware.Wrap)
apiHandler.RegisterRoutes(r, am)
apiHandler.RegisterLogsRoutes(r, am)
@@ -397,216 +404,6 @@ func (s *Server) createPublicServer(apiHandler *api.APIHandler, web web.Web) (*h
}, nil
}
// TODO(remove): Implemented at pkg/http/middleware/logging.go
// loggingMiddleware is used for logging public api calls
func loggingMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
route := mux.CurrentRoute(r)
path, _ := route.GetPathTemplate()
startTime := time.Now()
next.ServeHTTP(w, r)
zap.L().Info(path, zap.Duration("timeTaken", time.Since(startTime)), zap.String("path", path))
})
}
// TODO(remove): Implemented at pkg/http/middleware/logging.go
// loggingMiddlewarePrivate is used for logging private api calls
// from internal services like alert manager
func loggingMiddlewarePrivate(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
route := mux.CurrentRoute(r)
path, _ := route.GetPathTemplate()
startTime := time.Now()
next.ServeHTTP(w, r)
zap.L().Info(path, zap.Duration("timeTaken", time.Since(startTime)), zap.String("path", path), zap.Bool("tprivatePort", true))
})
}
// TODO(remove): Implemented at pkg/http/middleware/logging.go
type loggingResponseWriter struct {
http.ResponseWriter
statusCode int
}
// TODO(remove): Implemented at pkg/http/middleware/logging.go
func NewLoggingResponseWriter(w http.ResponseWriter) *loggingResponseWriter {
// WriteHeader(int) is not called if our response implicitly returns 200 OK, so
// we default to that status code.
return &loggingResponseWriter{w, http.StatusOK}
}
// TODO(remove): Implemented at pkg/http/middleware/logging.go
func (lrw *loggingResponseWriter) WriteHeader(code int) {
lrw.statusCode = code
lrw.ResponseWriter.WriteHeader(code)
}
// TODO(remove): Implemented at pkg/http/middleware/logging.go
// Flush implements the http.Flush interface.
func (lrw *loggingResponseWriter) Flush() {
lrw.ResponseWriter.(http.Flusher).Flush()
}
// TODO(remove): Implemented at pkg/http/middleware/logging.go
// Support websockets
func (lrw *loggingResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
h, ok := lrw.ResponseWriter.(http.Hijacker)
if !ok {
return nil, nil, errors.New("hijack not supported")
}
return h.Hijack()
}
func extractQueryRangeData(path string, r *http.Request) (map[string]interface{}, bool) {
pathToExtractBodyFromV3 := "/api/v3/query_range"
pathToExtractBodyFromV4 := "/api/v4/query_range"
data := map[string]interface{}{}
var postData *v3.QueryRangeParamsV3
if (r.Method == "POST") && ((path == pathToExtractBodyFromV3) || (path == pathToExtractBodyFromV4)) {
if r.Body != nil {
bodyBytes, err := io.ReadAll(r.Body)
if err != nil {
return nil, false
}
r.Body.Close() // must close
r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
json.Unmarshal(bodyBytes, &postData)
} else {
return nil, false
}
} else {
return nil, false
}
referrer := r.Header.Get("Referer")
dashboardMatched, err := regexp.MatchString(`/dashboard/[a-zA-Z0-9\-]+/(new|edit)(?:\?.*)?$`, referrer)
if err != nil {
zap.L().Error("error while matching the referrer", zap.Error(err))
}
alertMatched, err := regexp.MatchString(`/alerts/(new|edit)(?:\?.*)?$`, referrer)
if err != nil {
zap.L().Error("error while matching the alert: ", zap.Error(err))
}
logsExplorerMatched, err := regexp.MatchString(`/logs/logs-explorer(?:\?.*)?$`, referrer)
if err != nil {
zap.L().Error("error while matching the logs explorer: ", zap.Error(err))
}
traceExplorerMatched, err := regexp.MatchString(`/traces-explorer(?:\?.*)?$`, referrer)
if err != nil {
zap.L().Error("error while matching the trace explorer: ", zap.Error(err))
}
queryInfoResult := telemetry.GetInstance().CheckQueryInfo(postData)
if (queryInfoResult.MetricsUsed || queryInfoResult.LogsUsed || queryInfoResult.TracesUsed) && (queryInfoResult.FilterApplied) {
if queryInfoResult.MetricsUsed {
telemetry.GetInstance().AddActiveMetricsUser()
}
if queryInfoResult.LogsUsed {
telemetry.GetInstance().AddActiveLogsUser()
}
if queryInfoResult.TracesUsed {
telemetry.GetInstance().AddActiveTracesUser()
}
data["metricsUsed"] = queryInfoResult.MetricsUsed
data["logsUsed"] = queryInfoResult.LogsUsed
data["tracesUsed"] = queryInfoResult.TracesUsed
data["filterApplied"] = queryInfoResult.FilterApplied
data["groupByApplied"] = queryInfoResult.GroupByApplied
data["aggregateOperator"] = queryInfoResult.AggregateOperator
data["aggregateAttributeKey"] = queryInfoResult.AggregateAttributeKey
data["numberOfQueries"] = queryInfoResult.NumberOfQueries
data["queryType"] = queryInfoResult.QueryType
data["panelType"] = queryInfoResult.PanelType
userEmail, err := baseauth.GetEmailFromJwt(r.Context())
if err == nil {
// switch case to set data["screen"] based on the referrer
switch {
case dashboardMatched:
data["screen"] = "panel"
case alertMatched:
data["screen"] = "alert"
case logsExplorerMatched:
data["screen"] = "logs-explorer"
case traceExplorerMatched:
data["screen"] = "traces-explorer"
default:
data["screen"] = "unknown"
return data, true
}
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_QUERY_RANGE_API, data, userEmail, true, false)
}
}
return data, true
}
func getActiveLogs(path string, r *http.Request) {
// if path == "/api/v1/dashboards/{uuid}" {
// telemetry.GetInstance().AddActiveMetricsUser()
// }
if path == "/api/v1/logs" {
hasFilters := len(r.URL.Query().Get("q"))
if hasFilters > 0 {
telemetry.GetInstance().AddActiveLogsUser()
}
}
}
func (s *Server) analyticsMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := baseauth.AttachJwtToContext(r.Context(), r)
r = r.WithContext(ctx)
route := mux.CurrentRoute(r)
path, _ := route.GetPathTemplate()
queryRangeData, metadataExists := extractQueryRangeData(path, r)
getActiveLogs(path, r)
lrw := NewLoggingResponseWriter(w)
next.ServeHTTP(lrw, r)
data := map[string]interface{}{"path": path, "statusCode": lrw.statusCode}
if metadataExists {
for key, value := range queryRangeData {
data[key] = value
}
}
if _, ok := telemetry.EnabledPaths()[path]; ok {
userEmail, err := baseauth.GetEmailFromJwt(r.Context())
if err == nil {
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_PATH, data, userEmail, true, false)
}
}
})
}
// TODO(remove): Implemented at pkg/http/middleware/timeout.go
func setTimeoutMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
var cancel context.CancelFunc
// check if route is not excluded
url := r.URL.Path
if _, ok := baseconst.TimeoutExcludedRoutes[url]; !ok {
ctx, cancel = context.WithTimeout(r.Context(), baseconst.ContextTimeout)
defer cancel()
}
r = r.WithContext(ctx)
next.ServeHTTP(w, r)
})
}
// initListeners initialises listeners of the server
func (s *Server) initListeners() error {
// listen on public port

View File

@@ -0,0 +1,198 @@
package middleware
import (
"bufio"
"bytes"
"encoding/json"
"errors"
"io"
"net"
"net/http"
"regexp"
"github.com/gorilla/mux"
"go.signoz.io/signoz/pkg/query-service/auth"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/telemetry"
"go.uber.org/zap"
)
type Analytics struct {
logger *zap.Logger
}
func NewAnalytics(logger *zap.Logger) *Analytics {
if logger == nil {
panic("cannot build analytics, logger is empty")
}
return &Analytics{
logger: logger.Named(pkgname),
}
}
func (middleware *Analytics) Wrap(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := auth.AttachJwtToContext(r.Context(), r)
r = r.WithContext(ctx)
route := mux.CurrentRoute(r)
path, _ := route.GetPathTemplate()
queryRangeData, metadataExists := extractQueryRangeData(path, r)
getActiveLogs(path, r)
lrw := NewLoggingResponseWriter(w)
next.ServeHTTP(lrw, r)
data := map[string]interface{}{"path": path, "statusCode": lrw.statusCode}
if metadataExists {
for key, value := range queryRangeData {
data[key] = value
}
}
if _, ok := telemetry.EnabledPaths()[path]; ok {
userEmail, err := auth.GetEmailFromJwt(r.Context())
if err == nil {
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_PATH, data, userEmail, true, false)
}
}
})
}
type loggingResponseWriter struct {
http.ResponseWriter
statusCode int
}
func NewLoggingResponseWriter(w http.ResponseWriter) *loggingResponseWriter {
// WriteHeader(int) is not called if our response implicitly returns 200 OK, so
// we default to that status code.
return &loggingResponseWriter{w, http.StatusOK}
}
// TODO(remove): Implemented at pkg/http/middleware/logging.go
func (lrw *loggingResponseWriter) WriteHeader(code int) {
lrw.statusCode = code
lrw.ResponseWriter.WriteHeader(code)
}
// TODO(remove): Implemented at pkg/http/middleware/logging.go
// Flush implements the http.Flush interface.
func (lrw *loggingResponseWriter) Flush() {
lrw.ResponseWriter.(http.Flusher).Flush()
}
// TODO(remove): Implemented at pkg/http/middleware/logging.go
// Support websockets
func (lrw *loggingResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
h, ok := lrw.ResponseWriter.(http.Hijacker)
if !ok {
return nil, nil, errors.New("hijack not supported")
}
return h.Hijack()
}
func getActiveLogs(path string, r *http.Request) {
// if path == "/api/v1/dashboards/{uuid}" {
// telemetry.GetInstance().AddActiveMetricsUser()
// }
if path == "/api/v1/logs" {
hasFilters := len(r.URL.Query().Get("q"))
if hasFilters > 0 {
telemetry.GetInstance().AddActiveLogsUser()
}
}
}
func extractQueryRangeData(path string, r *http.Request) (map[string]interface{}, bool) {
pathToExtractBodyFromV3 := "/api/v3/query_range"
pathToExtractBodyFromV4 := "/api/v4/query_range"
data := map[string]interface{}{}
var postData *v3.QueryRangeParamsV3
if (r.Method == "POST") && ((path == pathToExtractBodyFromV3) || (path == pathToExtractBodyFromV4)) {
if r.Body != nil {
bodyBytes, err := io.ReadAll(r.Body)
if err != nil {
return nil, false
}
r.Body.Close() // must close
r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
json.Unmarshal(bodyBytes, &postData)
} else {
return nil, false
}
} else {
return nil, false
}
referrer := r.Header.Get("Referer")
dashboardMatched, err := regexp.MatchString(`/dashboard/[a-zA-Z0-9\-]+/(new|edit)(?:\?.*)?$`, referrer)
if err != nil {
zap.L().Error("error while matching the referrer", zap.Error(err))
}
alertMatched, err := regexp.MatchString(`/alerts/(new|edit)(?:\?.*)?$`, referrer)
if err != nil {
zap.L().Error("error while matching the alert: ", zap.Error(err))
}
logsExplorerMatched, err := regexp.MatchString(`/logs/logs-explorer(?:\?.*)?$`, referrer)
if err != nil {
zap.L().Error("error while matching the logs explorer: ", zap.Error(err))
}
traceExplorerMatched, err := regexp.MatchString(`/traces-explorer(?:\?.*)?$`, referrer)
if err != nil {
zap.L().Error("error while matching the trace explorer: ", zap.Error(err))
}
queryInfoResult := telemetry.GetInstance().CheckQueryInfo(postData)
if (queryInfoResult.MetricsUsed || queryInfoResult.LogsUsed || queryInfoResult.TracesUsed) && (queryInfoResult.FilterApplied) {
if queryInfoResult.MetricsUsed {
telemetry.GetInstance().AddActiveMetricsUser()
}
if queryInfoResult.LogsUsed {
telemetry.GetInstance().AddActiveLogsUser()
}
if queryInfoResult.TracesUsed {
telemetry.GetInstance().AddActiveTracesUser()
}
data["metricsUsed"] = queryInfoResult.MetricsUsed
data["logsUsed"] = queryInfoResult.LogsUsed
data["tracesUsed"] = queryInfoResult.TracesUsed
data["filterApplied"] = queryInfoResult.FilterApplied
data["groupByApplied"] = queryInfoResult.GroupByApplied
data["aggregateOperator"] = queryInfoResult.AggregateOperator
data["aggregateAttributeKey"] = queryInfoResult.AggregateAttributeKey
data["numberOfQueries"] = queryInfoResult.NumberOfQueries
data["queryType"] = queryInfoResult.QueryType
data["panelType"] = queryInfoResult.PanelType
userEmail, err := auth.GetEmailFromJwt(r.Context())
if err == nil {
// switch case to set data["screen"] based on the referrer
switch {
case dashboardMatched:
data["screen"] = "panel"
case alertMatched:
data["screen"] = "alert"
case logsExplorerMatched:
data["screen"] = "logs-explorer"
case traceExplorerMatched:
data["screen"] = "traces-explorer"
default:
data["screen"] = "unknown"
return data, true
}
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_QUERY_RANGE_API, data, userEmail, true, false)
}
}
return data, true
}

View File

@@ -0,0 +1,88 @@
package middleware
import (
"context"
"net/http"
"net/url"
"strings"
"go.signoz.io/signoz/pkg/query-service/auth"
"go.signoz.io/signoz/pkg/query-service/common"
"go.uber.org/zap"
)
type LogComment struct {
logger *zap.Logger
}
func NewLogComment(logger *zap.Logger) *LogComment {
if logger == nil {
panic("cannot build log enrichment, logger is empty")
}
return &LogComment{
logger: logger.Named(pkgname),
}
}
func (middleware *LogComment) Wrap(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
referrer := r.Header.Get("Referer")
var path, dashboardID, alertID, page, client, viewName, tab string
if referrer != "" {
referrerURL, _ := url.Parse(referrer)
client = "browser"
path = referrerURL.Path
if strings.Contains(path, "/dashboard") {
// Split the path into segments
pathSegments := strings.Split(referrerURL.Path, "/")
// The dashboard ID should be the segment after "/dashboard/"
// Loop through pathSegments to find "dashboard" and then take the next segment as the ID
for i, segment := range pathSegments {
if segment == "dashboard" && i < len(pathSegments)-1 {
// Return the next segment, which should be the dashboard ID
dashboardID = pathSegments[i+1]
}
}
page = "dashboards"
} else if strings.Contains(path, "/alerts") {
urlParams := referrerURL.Query()
alertID = urlParams.Get("ruleId")
page = "alerts"
} else if strings.Contains(path, "logs") && strings.Contains(path, "explorer") {
page = "logs-explorer"
viewName = referrerURL.Query().Get("viewName")
} else if strings.Contains(path, "/trace") || strings.Contains(path, "traces-explorer") {
page = "traces-explorer"
viewName = referrerURL.Query().Get("viewName")
} else if strings.Contains(path, "/services") {
page = "services"
tab = referrerURL.Query().Get("tab")
if tab == "" {
tab = "OVER_METRICS"
}
}
} else {
client = "api"
}
email, _ := auth.GetEmailFromJwt(r.Context())
kvs := map[string]string{
"path": path,
"dashboardID": dashboardID,
"alertID": alertID,
"source": page,
"client": client,
"viewName": viewName,
"servicesTab": tab,
"email": email,
}
r = r.WithContext(context.WithValue(r.Context(), common.LogCommentKey, kvs))
next.ServeHTTP(w, r)
})
}

View File

@@ -1,28 +1,21 @@
package app
import (
"bufio"
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"net/http"
_ "net/http/pprof" // http profiler
"net/url"
"os"
"regexp"
"strings"
"time"
"github.com/gorilla/handlers"
"github.com/gorilla/mux"
"github.com/jmoiron/sqlx"
"github.com/rs/cors"
"github.com/soheilhy/cmux"
"go.signoz.io/signoz/pkg/http/middleware"
"go.signoz.io/signoz/pkg/query-service/agentConf"
"go.signoz.io/signoz/pkg/query-service/app/clickhouseReader"
"go.signoz.io/signoz/pkg/query-service/app/cloudintegrations"
@@ -32,8 +25,6 @@ import (
"go.signoz.io/signoz/pkg/query-service/app/opamp"
opAmpModel "go.signoz.io/signoz/pkg/query-service/app/opamp/model"
"go.signoz.io/signoz/pkg/query-service/app/preferences"
"go.signoz.io/signoz/pkg/query-service/common"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/signoz"
"go.signoz.io/signoz/pkg/web"
@@ -263,9 +254,17 @@ func (s *Server) createPrivateServer(api *APIHandler) (*http.Server, error) {
r := NewRouter()
r.Use(setTimeoutMiddleware)
r.Use(s.analyticsMiddleware)
r.Use(loggingMiddlewarePrivate)
timeoutMiddleware := middleware.NewTimeout(zap.L(), constants.TimeoutExcludedRoutes, 60*time.Second, 600*time.Second)
r.Use(timeoutMiddleware.Wrap)
analyticsMiddleware := middleware.NewAnalytics(zap.L())
r.Use(analyticsMiddleware.Wrap)
loggingMiddleware := middleware.NewLogging(zap.L())
r.Use(loggingMiddleware.Wrap)
logCommentMiddleware := middleware.NewLogComment(zap.L())
r.Use(logCommentMiddleware.Wrap)
api.RegisterPrivateRoutes(r)
@@ -289,10 +288,17 @@ func (s *Server) createPublicServer(api *APIHandler, web web.Web) (*http.Server,
r := NewRouter()
r.Use(setTimeoutMiddleware)
r.Use(s.analyticsMiddleware)
r.Use(loggingMiddleware)
r.Use(LogCommentEnricher)
timeoutMiddleware := middleware.NewTimeout(zap.L(), constants.TimeoutExcludedRoutes, 60*time.Second, 600*time.Second)
r.Use(timeoutMiddleware.Wrap)
analyticsMiddleware := middleware.NewAnalytics(zap.L())
r.Use(analyticsMiddleware.Wrap)
loggingMiddleware := middleware.NewLogging(zap.L())
r.Use(loggingMiddleware.Wrap)
logCommentMiddleware := middleware.NewLogComment(zap.L())
r.Use(logCommentMiddleware.Wrap)
// add auth middleware
getUserFromRequest := func(r *http.Request) (*model.UserPayload, error) {
@@ -340,297 +346,6 @@ func (s *Server) createPublicServer(api *APIHandler, web web.Web) (*http.Server,
}, nil
}
// TODO(remove): Implemented at pkg/http/middleware/logging.go
// loggingMiddleware is used for logging public api calls
func loggingMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
route := mux.CurrentRoute(r)
path, _ := route.GetPathTemplate()
startTime := time.Now()
next.ServeHTTP(w, r)
zap.L().Info(path, zap.Duration("timeTaken", time.Since(startTime)), zap.String("path", path))
})
}
func LogCommentEnricher(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
referrer := r.Header.Get("Referer")
var path, dashboardID, alertID, page, client, viewName, tab string
if referrer != "" {
referrerURL, _ := url.Parse(referrer)
client = "browser"
path = referrerURL.Path
if strings.Contains(path, "/dashboard") {
// Split the path into segments
pathSegments := strings.Split(referrerURL.Path, "/")
// The dashboard ID should be the segment after "/dashboard/"
// Loop through pathSegments to find "dashboard" and then take the next segment as the ID
for i, segment := range pathSegments {
if segment == "dashboard" && i < len(pathSegments)-1 {
// Return the next segment, which should be the dashboard ID
dashboardID = pathSegments[i+1]
}
}
page = "dashboards"
} else if strings.Contains(path, "/alerts") {
urlParams := referrerURL.Query()
alertID = urlParams.Get("ruleId")
page = "alerts"
} else if strings.Contains(path, "logs") && strings.Contains(path, "explorer") {
page = "logs-explorer"
viewName = referrerURL.Query().Get("viewName")
} else if strings.Contains(path, "/trace") || strings.Contains(path, "traces-explorer") {
page = "traces-explorer"
viewName = referrerURL.Query().Get("viewName")
} else if strings.Contains(path, "/services") {
page = "services"
tab = referrerURL.Query().Get("tab")
if tab == "" {
tab = "OVER_METRICS"
}
}
} else {
client = "api"
}
email, _ := auth.GetEmailFromJwt(r.Context())
kvs := map[string]string{
"path": path,
"dashboardID": dashboardID,
"alertID": alertID,
"source": page,
"client": client,
"viewName": viewName,
"servicesTab": tab,
"email": email,
}
r = r.WithContext(context.WithValue(r.Context(), common.LogCommentKey, kvs))
next.ServeHTTP(w, r)
})
}
// TODO(remove): Implemented at pkg/http/middleware/logging.go
// loggingMiddlewarePrivate is used for logging private api calls
// from internal services like alert manager
func loggingMiddlewarePrivate(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
route := mux.CurrentRoute(r)
path, _ := route.GetPathTemplate()
startTime := time.Now()
next.ServeHTTP(w, r)
zap.L().Info(path, zap.Duration("timeTaken", time.Since(startTime)), zap.String("path", path), zap.Bool("privatePort", true))
})
}
// TODO(remove): Implemented at pkg/http/middleware/logging.go
type loggingResponseWriter struct {
http.ResponseWriter
statusCode int
}
// TODO(remove): Implemented at pkg/http/middleware/logging.go
func NewLoggingResponseWriter(w http.ResponseWriter) *loggingResponseWriter {
// WriteHeader(int) is not called if our response implicitly returns 200 OK, so
// we default to that status code.
return &loggingResponseWriter{w, http.StatusOK}
}
// TODO(remove): Implemented at pkg/http/middleware/logging.go
func (lrw *loggingResponseWriter) WriteHeader(code int) {
lrw.statusCode = code
lrw.ResponseWriter.WriteHeader(code)
}
// TODO(remove): Implemented at pkg/http/middleware/logging.go
// Flush implements the http.Flush interface.
func (lrw *loggingResponseWriter) Flush() {
lrw.ResponseWriter.(http.Flusher).Flush()
}
// TODO(remove): Implemented at pkg/http/middleware/logging.go
// Support websockets
func (lrw *loggingResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
h, ok := lrw.ResponseWriter.(http.Hijacker)
if !ok {
return nil, nil, errors.New("hijack not supported")
}
return h.Hijack()
}
func extractQueryRangeV3Data(path string, r *http.Request) (map[string]interface{}, bool) {
pathToExtractBodyFromV3 := "/api/v3/query_range"
pathToExtractBodyFromV4 := "/api/v4/query_range"
data := map[string]interface{}{}
var postData *v3.QueryRangeParamsV3
if (r.Method == "POST") && ((path == pathToExtractBodyFromV3) || (path == pathToExtractBodyFromV4)) {
if r.Body != nil {
bodyBytes, err := io.ReadAll(r.Body)
if err != nil {
return nil, false
}
r.Body.Close() // must close
r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
json.Unmarshal(bodyBytes, &postData)
} else {
return nil, false
}
} else {
return nil, false
}
referrer := r.Header.Get("Referer")
dashboardMatched, err := regexp.MatchString(`/dashboard/[a-zA-Z0-9\-]+/(new|edit)(?:\?.*)?$`, referrer)
if err != nil {
zap.L().Error("error while matching the referrer", zap.Error(err))
}
alertMatched, err := regexp.MatchString(`/alerts/(new|edit)(?:\?.*)?$`, referrer)
if err != nil {
zap.L().Error("error while matching the alert: ", zap.Error(err))
}
logsExplorerMatched, err := regexp.MatchString(`/logs/logs-explorer(?:\?.*)?$`, referrer)
if err != nil {
zap.L().Error("error while matching the logs explorer: ", zap.Error(err))
}
traceExplorerMatched, err := regexp.MatchString(`/traces-explorer(?:\?.*)?$`, referrer)
if err != nil {
zap.L().Error("error while matching the trace explorer: ", zap.Error(err))
}
queryInfoResult := telemetry.GetInstance().CheckQueryInfo(postData)
if (queryInfoResult.MetricsUsed || queryInfoResult.LogsUsed || queryInfoResult.TracesUsed) && (queryInfoResult.FilterApplied) {
if queryInfoResult.MetricsUsed {
telemetry.GetInstance().AddActiveMetricsUser()
}
if queryInfoResult.LogsUsed {
telemetry.GetInstance().AddActiveLogsUser()
}
if queryInfoResult.TracesUsed {
telemetry.GetInstance().AddActiveTracesUser()
}
data["metricsUsed"] = queryInfoResult.MetricsUsed
data["logsUsed"] = queryInfoResult.LogsUsed
data["tracesUsed"] = queryInfoResult.TracesUsed
data["filterApplied"] = queryInfoResult.FilterApplied
data["groupByApplied"] = queryInfoResult.GroupByApplied
data["aggregateOperator"] = queryInfoResult.AggregateOperator
data["aggregateAttributeKey"] = queryInfoResult.AggregateAttributeKey
data["numberOfQueries"] = queryInfoResult.NumberOfQueries
data["queryType"] = queryInfoResult.QueryType
data["panelType"] = queryInfoResult.PanelType
userEmail, err := auth.GetEmailFromJwt(r.Context())
if err == nil {
// switch case to set data["screen"] based on the referrer
switch {
case dashboardMatched:
data["screen"] = "panel"
case alertMatched:
data["screen"] = "alert"
case logsExplorerMatched:
data["screen"] = "logs-explorer"
case traceExplorerMatched:
data["screen"] = "traces-explorer"
default:
data["screen"] = "unknown"
return data, true
}
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_QUERY_RANGE_API, data, userEmail, true, false)
}
}
return data, true
}
func getActiveLogs(path string, r *http.Request) {
// if path == "/api/v1/dashboards/{uuid}" {
// telemetry.GetInstance().AddActiveMetricsUser()
// }
if path == "/api/v1/logs" {
hasFilters := len(r.URL.Query().Get("q"))
if hasFilters > 0 {
telemetry.GetInstance().AddActiveLogsUser()
}
}
}
func (s *Server) analyticsMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := auth.AttachJwtToContext(r.Context(), r)
r = r.WithContext(ctx)
route := mux.CurrentRoute(r)
path, _ := route.GetPathTemplate()
queryRangeV3data, metadataExists := extractQueryRangeV3Data(path, r)
getActiveLogs(path, r)
lrw := NewLoggingResponseWriter(w)
next.ServeHTTP(lrw, r)
data := map[string]interface{}{"path": path, "statusCode": lrw.statusCode}
if metadataExists {
for key, value := range queryRangeV3data {
data[key] = value
}
}
// if telemetry.GetInstance().IsSampled() {
if _, ok := telemetry.EnabledPaths()[path]; ok {
userEmail, err := auth.GetEmailFromJwt(r.Context())
if err == nil {
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_PATH, data, userEmail, true, false)
}
}
// }
})
}
// TODO(remove): Implemented at pkg/http/middleware/timeout.go
func getRouteContextTimeout(overrideTimeout string) time.Duration {
var timeout time.Duration
var err error
if overrideTimeout != "" {
timeout, err = time.ParseDuration(overrideTimeout + "s")
if err != nil {
timeout = constants.ContextTimeout
}
if timeout > constants.ContextTimeoutMaxAllowed {
timeout = constants.ContextTimeoutMaxAllowed
}
return timeout
}
return constants.ContextTimeout
}
// TODO(remove): Implemented at pkg/http/middleware/timeout.go
func setTimeoutMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
var cancel context.CancelFunc
// check if route is not excluded
url := r.URL.Path
if _, ok := constants.TimeoutExcludedRoutes[url]; !ok {
ctx, cancel = context.WithTimeout(r.Context(), getRouteContextTimeout(r.Header.Get("timeout")))
defer cancel()
}
r = r.WithContext(ctx)
next.ServeHTTP(w, r)
})
}
// initListeners initialises listeners of the server
func (s *Server) initListeners() error {
// listen on public port

View File

@@ -1,42 +0,0 @@
package app
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
// TODO(remove): Implemented at pkg/http/middleware/timeout_test.go
func TestGetRouteContextTimeout(t *testing.T) {
var testGetRouteContextTimeoutData = []struct {
Name string
OverrideValue string
timeout time.Duration
}{
{
Name: "default",
OverrideValue: "",
timeout: 60 * time.Second,
},
{
Name: "override",
OverrideValue: "180",
timeout: 180 * time.Second,
},
{
Name: "override more than max",
OverrideValue: "610",
timeout: 600 * time.Second,
},
}
t.Parallel()
for _, test := range testGetRouteContextTimeoutData {
t.Run(test.Name, func(t *testing.T) {
res := getRouteContextTimeout(test.OverrideValue)
assert.Equal(t, test.timeout, res)
})
}
}

View File

@@ -254,9 +254,9 @@ const (
SIGNOZ_TOP_LEVEL_OPERATIONS_TABLENAME = "distributed_top_level_operations"
)
var TimeoutExcludedRoutes = map[string]bool{
"/api/v1/logs/tail": true,
"/api/v3/logs/livetail": true,
var TimeoutExcludedRoutes = map[string]struct{}{
"/api/v1/logs/tail": {},
"/api/v3/logs/livetail": {},
}
// alert related constants