Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
277419fad2 |
@@ -1,22 +1,16 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
_ "net/http/pprof" // http profiler
|
||||
"os"
|
||||
"regexp"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/handlers"
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/jmoiron/sqlx"
|
||||
|
||||
"github.com/rs/cors"
|
||||
@@ -29,14 +23,13 @@ import (
|
||||
"go.signoz.io/signoz/ee/query-service/integrations/gateway"
|
||||
"go.signoz.io/signoz/ee/query-service/interfaces"
|
||||
"go.signoz.io/signoz/ee/query-service/rules"
|
||||
baseauth "go.signoz.io/signoz/pkg/query-service/auth"
|
||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||
"go.signoz.io/signoz/pkg/signoz"
|
||||
"go.signoz.io/signoz/pkg/web"
|
||||
|
||||
licensepkg "go.signoz.io/signoz/ee/query-service/license"
|
||||
"go.signoz.io/signoz/ee/query-service/usage"
|
||||
|
||||
"go.signoz.io/signoz/pkg/http/middleware"
|
||||
"go.signoz.io/signoz/pkg/query-service/agentConf"
|
||||
baseapp "go.signoz.io/signoz/pkg/query-service/app"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/cloudintegrations"
|
||||
@@ -319,10 +312,17 @@ func (s *Server) createPrivateServer(apiHandler *api.APIHandler) (*http.Server,
|
||||
|
||||
r := baseapp.NewRouter()
|
||||
|
||||
r.Use(setTimeoutMiddleware)
|
||||
r.Use(s.analyticsMiddleware)
|
||||
r.Use(loggingMiddlewarePrivate)
|
||||
r.Use(baseapp.LogCommentEnricher)
|
||||
timeoutMiddleware := middleware.NewTimeout(zap.L(), baseconst.TimeoutExcludedRoutes, 60*time.Second, 600*time.Second)
|
||||
r.Use(timeoutMiddleware.Wrap)
|
||||
|
||||
analyticsMiddleware := middleware.NewAnalytics(zap.L())
|
||||
r.Use(analyticsMiddleware.Wrap)
|
||||
|
||||
loggingMiddleware := middleware.NewLogging(zap.L())
|
||||
r.Use(loggingMiddleware.Wrap)
|
||||
|
||||
logCommentMiddleware := middleware.NewLogComment(zap.L())
|
||||
r.Use(logCommentMiddleware.Wrap)
|
||||
|
||||
apiHandler.RegisterPrivateRoutes(r)
|
||||
|
||||
@@ -362,10 +362,17 @@ func (s *Server) createPublicServer(apiHandler *api.APIHandler, web web.Web) (*h
|
||||
}
|
||||
am := baseapp.NewAuthMiddleware(getUserFromRequest)
|
||||
|
||||
r.Use(setTimeoutMiddleware)
|
||||
r.Use(s.analyticsMiddleware)
|
||||
r.Use(loggingMiddleware)
|
||||
r.Use(baseapp.LogCommentEnricher)
|
||||
timeoutMiddleware := middleware.NewTimeout(zap.L(), baseconst.TimeoutExcludedRoutes, 60*time.Second, 600*time.Second)
|
||||
r.Use(timeoutMiddleware.Wrap)
|
||||
|
||||
analyticsMiddleware := middleware.NewAnalytics(zap.L())
|
||||
r.Use(analyticsMiddleware.Wrap)
|
||||
|
||||
loggingMiddleware := middleware.NewLogging(zap.L())
|
||||
r.Use(loggingMiddleware.Wrap)
|
||||
|
||||
logCommentMiddleware := middleware.NewLogComment(zap.L())
|
||||
r.Use(logCommentMiddleware.Wrap)
|
||||
|
||||
apiHandler.RegisterRoutes(r, am)
|
||||
apiHandler.RegisterLogsRoutes(r, am)
|
||||
@@ -397,216 +404,6 @@ func (s *Server) createPublicServer(apiHandler *api.APIHandler, web web.Web) (*h
|
||||
}, nil
|
||||
}
|
||||
|
||||
// TODO(remove): Implemented at pkg/http/middleware/logging.go
|
||||
// loggingMiddleware is used for logging public api calls
|
||||
func loggingMiddleware(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
route := mux.CurrentRoute(r)
|
||||
path, _ := route.GetPathTemplate()
|
||||
startTime := time.Now()
|
||||
next.ServeHTTP(w, r)
|
||||
zap.L().Info(path, zap.Duration("timeTaken", time.Since(startTime)), zap.String("path", path))
|
||||
})
|
||||
}
|
||||
|
||||
// TODO(remove): Implemented at pkg/http/middleware/logging.go
|
||||
// loggingMiddlewarePrivate is used for logging private api calls
|
||||
// from internal services like alert manager
|
||||
func loggingMiddlewarePrivate(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
route := mux.CurrentRoute(r)
|
||||
path, _ := route.GetPathTemplate()
|
||||
startTime := time.Now()
|
||||
next.ServeHTTP(w, r)
|
||||
zap.L().Info(path, zap.Duration("timeTaken", time.Since(startTime)), zap.String("path", path), zap.Bool("tprivatePort", true))
|
||||
})
|
||||
}
|
||||
|
||||
// TODO(remove): Implemented at pkg/http/middleware/logging.go
|
||||
type loggingResponseWriter struct {
|
||||
http.ResponseWriter
|
||||
statusCode int
|
||||
}
|
||||
|
||||
// TODO(remove): Implemented at pkg/http/middleware/logging.go
|
||||
func NewLoggingResponseWriter(w http.ResponseWriter) *loggingResponseWriter {
|
||||
// WriteHeader(int) is not called if our response implicitly returns 200 OK, so
|
||||
// we default to that status code.
|
||||
return &loggingResponseWriter{w, http.StatusOK}
|
||||
}
|
||||
|
||||
// TODO(remove): Implemented at pkg/http/middleware/logging.go
|
||||
func (lrw *loggingResponseWriter) WriteHeader(code int) {
|
||||
lrw.statusCode = code
|
||||
lrw.ResponseWriter.WriteHeader(code)
|
||||
}
|
||||
|
||||
// TODO(remove): Implemented at pkg/http/middleware/logging.go
|
||||
// Flush implements the http.Flush interface.
|
||||
func (lrw *loggingResponseWriter) Flush() {
|
||||
lrw.ResponseWriter.(http.Flusher).Flush()
|
||||
}
|
||||
|
||||
// TODO(remove): Implemented at pkg/http/middleware/logging.go
|
||||
// Support websockets
|
||||
func (lrw *loggingResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
|
||||
h, ok := lrw.ResponseWriter.(http.Hijacker)
|
||||
if !ok {
|
||||
return nil, nil, errors.New("hijack not supported")
|
||||
}
|
||||
return h.Hijack()
|
||||
}
|
||||
|
||||
func extractQueryRangeData(path string, r *http.Request) (map[string]interface{}, bool) {
|
||||
pathToExtractBodyFromV3 := "/api/v3/query_range"
|
||||
pathToExtractBodyFromV4 := "/api/v4/query_range"
|
||||
|
||||
data := map[string]interface{}{}
|
||||
var postData *v3.QueryRangeParamsV3
|
||||
|
||||
if (r.Method == "POST") && ((path == pathToExtractBodyFromV3) || (path == pathToExtractBodyFromV4)) {
|
||||
if r.Body != nil {
|
||||
bodyBytes, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
return nil, false
|
||||
}
|
||||
r.Body.Close() // must close
|
||||
r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
|
||||
json.Unmarshal(bodyBytes, &postData)
|
||||
|
||||
} else {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
} else {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
referrer := r.Header.Get("Referer")
|
||||
|
||||
dashboardMatched, err := regexp.MatchString(`/dashboard/[a-zA-Z0-9\-]+/(new|edit)(?:\?.*)?$`, referrer)
|
||||
if err != nil {
|
||||
zap.L().Error("error while matching the referrer", zap.Error(err))
|
||||
}
|
||||
alertMatched, err := regexp.MatchString(`/alerts/(new|edit)(?:\?.*)?$`, referrer)
|
||||
if err != nil {
|
||||
zap.L().Error("error while matching the alert: ", zap.Error(err))
|
||||
}
|
||||
logsExplorerMatched, err := regexp.MatchString(`/logs/logs-explorer(?:\?.*)?$`, referrer)
|
||||
if err != nil {
|
||||
zap.L().Error("error while matching the logs explorer: ", zap.Error(err))
|
||||
}
|
||||
traceExplorerMatched, err := regexp.MatchString(`/traces-explorer(?:\?.*)?$`, referrer)
|
||||
if err != nil {
|
||||
zap.L().Error("error while matching the trace explorer: ", zap.Error(err))
|
||||
}
|
||||
|
||||
queryInfoResult := telemetry.GetInstance().CheckQueryInfo(postData)
|
||||
|
||||
if (queryInfoResult.MetricsUsed || queryInfoResult.LogsUsed || queryInfoResult.TracesUsed) && (queryInfoResult.FilterApplied) {
|
||||
if queryInfoResult.MetricsUsed {
|
||||
telemetry.GetInstance().AddActiveMetricsUser()
|
||||
}
|
||||
if queryInfoResult.LogsUsed {
|
||||
telemetry.GetInstance().AddActiveLogsUser()
|
||||
}
|
||||
if queryInfoResult.TracesUsed {
|
||||
telemetry.GetInstance().AddActiveTracesUser()
|
||||
}
|
||||
data["metricsUsed"] = queryInfoResult.MetricsUsed
|
||||
data["logsUsed"] = queryInfoResult.LogsUsed
|
||||
data["tracesUsed"] = queryInfoResult.TracesUsed
|
||||
data["filterApplied"] = queryInfoResult.FilterApplied
|
||||
data["groupByApplied"] = queryInfoResult.GroupByApplied
|
||||
data["aggregateOperator"] = queryInfoResult.AggregateOperator
|
||||
data["aggregateAttributeKey"] = queryInfoResult.AggregateAttributeKey
|
||||
data["numberOfQueries"] = queryInfoResult.NumberOfQueries
|
||||
data["queryType"] = queryInfoResult.QueryType
|
||||
data["panelType"] = queryInfoResult.PanelType
|
||||
|
||||
userEmail, err := baseauth.GetEmailFromJwt(r.Context())
|
||||
if err == nil {
|
||||
// switch case to set data["screen"] based on the referrer
|
||||
switch {
|
||||
case dashboardMatched:
|
||||
data["screen"] = "panel"
|
||||
case alertMatched:
|
||||
data["screen"] = "alert"
|
||||
case logsExplorerMatched:
|
||||
data["screen"] = "logs-explorer"
|
||||
case traceExplorerMatched:
|
||||
data["screen"] = "traces-explorer"
|
||||
default:
|
||||
data["screen"] = "unknown"
|
||||
return data, true
|
||||
}
|
||||
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_QUERY_RANGE_API, data, userEmail, true, false)
|
||||
}
|
||||
}
|
||||
return data, true
|
||||
}
|
||||
|
||||
func getActiveLogs(path string, r *http.Request) {
|
||||
// if path == "/api/v1/dashboards/{uuid}" {
|
||||
// telemetry.GetInstance().AddActiveMetricsUser()
|
||||
// }
|
||||
if path == "/api/v1/logs" {
|
||||
hasFilters := len(r.URL.Query().Get("q"))
|
||||
if hasFilters > 0 {
|
||||
telemetry.GetInstance().AddActiveLogsUser()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (s *Server) analyticsMiddleware(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := baseauth.AttachJwtToContext(r.Context(), r)
|
||||
r = r.WithContext(ctx)
|
||||
route := mux.CurrentRoute(r)
|
||||
path, _ := route.GetPathTemplate()
|
||||
|
||||
queryRangeData, metadataExists := extractQueryRangeData(path, r)
|
||||
getActiveLogs(path, r)
|
||||
|
||||
lrw := NewLoggingResponseWriter(w)
|
||||
next.ServeHTTP(lrw, r)
|
||||
|
||||
data := map[string]interface{}{"path": path, "statusCode": lrw.statusCode}
|
||||
if metadataExists {
|
||||
for key, value := range queryRangeData {
|
||||
data[key] = value
|
||||
}
|
||||
}
|
||||
|
||||
if _, ok := telemetry.EnabledPaths()[path]; ok {
|
||||
userEmail, err := baseauth.GetEmailFromJwt(r.Context())
|
||||
if err == nil {
|
||||
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_PATH, data, userEmail, true, false)
|
||||
}
|
||||
}
|
||||
|
||||
})
|
||||
}
|
||||
|
||||
// TODO(remove): Implemented at pkg/http/middleware/timeout.go
|
||||
func setTimeoutMiddleware(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
var cancel context.CancelFunc
|
||||
// check if route is not excluded
|
||||
url := r.URL.Path
|
||||
if _, ok := baseconst.TimeoutExcludedRoutes[url]; !ok {
|
||||
ctx, cancel = context.WithTimeout(r.Context(), baseconst.ContextTimeout)
|
||||
defer cancel()
|
||||
}
|
||||
|
||||
r = r.WithContext(ctx)
|
||||
next.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
|
||||
// initListeners initialises listeners of the server
|
||||
func (s *Server) initListeners() error {
|
||||
// listen on public port
|
||||
|
||||
198
pkg/http/middleware/analytics.go
Normal file
198
pkg/http/middleware/analytics.go
Normal file
@@ -0,0 +1,198 @@
|
||||
package middleware
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"regexp"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"go.signoz.io/signoz/pkg/query-service/auth"
|
||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||
"go.signoz.io/signoz/pkg/query-service/telemetry"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type Analytics struct {
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
func NewAnalytics(logger *zap.Logger) *Analytics {
|
||||
if logger == nil {
|
||||
panic("cannot build analytics, logger is empty")
|
||||
}
|
||||
|
||||
return &Analytics{
|
||||
logger: logger.Named(pkgname),
|
||||
}
|
||||
}
|
||||
|
||||
func (middleware *Analytics) Wrap(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := auth.AttachJwtToContext(r.Context(), r)
|
||||
r = r.WithContext(ctx)
|
||||
route := mux.CurrentRoute(r)
|
||||
path, _ := route.GetPathTemplate()
|
||||
|
||||
queryRangeData, metadataExists := extractQueryRangeData(path, r)
|
||||
getActiveLogs(path, r)
|
||||
|
||||
lrw := NewLoggingResponseWriter(w)
|
||||
next.ServeHTTP(lrw, r)
|
||||
|
||||
data := map[string]interface{}{"path": path, "statusCode": lrw.statusCode}
|
||||
if metadataExists {
|
||||
for key, value := range queryRangeData {
|
||||
data[key] = value
|
||||
}
|
||||
}
|
||||
|
||||
if _, ok := telemetry.EnabledPaths()[path]; ok {
|
||||
userEmail, err := auth.GetEmailFromJwt(r.Context())
|
||||
if err == nil {
|
||||
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_PATH, data, userEmail, true, false)
|
||||
}
|
||||
}
|
||||
|
||||
})
|
||||
}
|
||||
|
||||
type loggingResponseWriter struct {
|
||||
http.ResponseWriter
|
||||
statusCode int
|
||||
}
|
||||
|
||||
func NewLoggingResponseWriter(w http.ResponseWriter) *loggingResponseWriter {
|
||||
// WriteHeader(int) is not called if our response implicitly returns 200 OK, so
|
||||
// we default to that status code.
|
||||
return &loggingResponseWriter{w, http.StatusOK}
|
||||
}
|
||||
|
||||
// TODO(remove): Implemented at pkg/http/middleware/logging.go
|
||||
func (lrw *loggingResponseWriter) WriteHeader(code int) {
|
||||
lrw.statusCode = code
|
||||
lrw.ResponseWriter.WriteHeader(code)
|
||||
}
|
||||
|
||||
// TODO(remove): Implemented at pkg/http/middleware/logging.go
|
||||
// Flush implements the http.Flush interface.
|
||||
func (lrw *loggingResponseWriter) Flush() {
|
||||
lrw.ResponseWriter.(http.Flusher).Flush()
|
||||
}
|
||||
|
||||
// TODO(remove): Implemented at pkg/http/middleware/logging.go
|
||||
// Support websockets
|
||||
func (lrw *loggingResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
|
||||
h, ok := lrw.ResponseWriter.(http.Hijacker)
|
||||
if !ok {
|
||||
return nil, nil, errors.New("hijack not supported")
|
||||
}
|
||||
return h.Hijack()
|
||||
}
|
||||
|
||||
func getActiveLogs(path string, r *http.Request) {
|
||||
// if path == "/api/v1/dashboards/{uuid}" {
|
||||
// telemetry.GetInstance().AddActiveMetricsUser()
|
||||
// }
|
||||
if path == "/api/v1/logs" {
|
||||
hasFilters := len(r.URL.Query().Get("q"))
|
||||
if hasFilters > 0 {
|
||||
telemetry.GetInstance().AddActiveLogsUser()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func extractQueryRangeData(path string, r *http.Request) (map[string]interface{}, bool) {
|
||||
pathToExtractBodyFromV3 := "/api/v3/query_range"
|
||||
pathToExtractBodyFromV4 := "/api/v4/query_range"
|
||||
|
||||
data := map[string]interface{}{}
|
||||
var postData *v3.QueryRangeParamsV3
|
||||
|
||||
if (r.Method == "POST") && ((path == pathToExtractBodyFromV3) || (path == pathToExtractBodyFromV4)) {
|
||||
if r.Body != nil {
|
||||
bodyBytes, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
return nil, false
|
||||
}
|
||||
r.Body.Close() // must close
|
||||
r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
|
||||
json.Unmarshal(bodyBytes, &postData)
|
||||
|
||||
} else {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
} else {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
referrer := r.Header.Get("Referer")
|
||||
|
||||
dashboardMatched, err := regexp.MatchString(`/dashboard/[a-zA-Z0-9\-]+/(new|edit)(?:\?.*)?$`, referrer)
|
||||
if err != nil {
|
||||
zap.L().Error("error while matching the referrer", zap.Error(err))
|
||||
}
|
||||
alertMatched, err := regexp.MatchString(`/alerts/(new|edit)(?:\?.*)?$`, referrer)
|
||||
if err != nil {
|
||||
zap.L().Error("error while matching the alert: ", zap.Error(err))
|
||||
}
|
||||
logsExplorerMatched, err := regexp.MatchString(`/logs/logs-explorer(?:\?.*)?$`, referrer)
|
||||
if err != nil {
|
||||
zap.L().Error("error while matching the logs explorer: ", zap.Error(err))
|
||||
}
|
||||
traceExplorerMatched, err := regexp.MatchString(`/traces-explorer(?:\?.*)?$`, referrer)
|
||||
if err != nil {
|
||||
zap.L().Error("error while matching the trace explorer: ", zap.Error(err))
|
||||
}
|
||||
|
||||
queryInfoResult := telemetry.GetInstance().CheckQueryInfo(postData)
|
||||
|
||||
if (queryInfoResult.MetricsUsed || queryInfoResult.LogsUsed || queryInfoResult.TracesUsed) && (queryInfoResult.FilterApplied) {
|
||||
if queryInfoResult.MetricsUsed {
|
||||
telemetry.GetInstance().AddActiveMetricsUser()
|
||||
}
|
||||
if queryInfoResult.LogsUsed {
|
||||
telemetry.GetInstance().AddActiveLogsUser()
|
||||
}
|
||||
if queryInfoResult.TracesUsed {
|
||||
telemetry.GetInstance().AddActiveTracesUser()
|
||||
}
|
||||
data["metricsUsed"] = queryInfoResult.MetricsUsed
|
||||
data["logsUsed"] = queryInfoResult.LogsUsed
|
||||
data["tracesUsed"] = queryInfoResult.TracesUsed
|
||||
data["filterApplied"] = queryInfoResult.FilterApplied
|
||||
data["groupByApplied"] = queryInfoResult.GroupByApplied
|
||||
data["aggregateOperator"] = queryInfoResult.AggregateOperator
|
||||
data["aggregateAttributeKey"] = queryInfoResult.AggregateAttributeKey
|
||||
data["numberOfQueries"] = queryInfoResult.NumberOfQueries
|
||||
data["queryType"] = queryInfoResult.QueryType
|
||||
data["panelType"] = queryInfoResult.PanelType
|
||||
|
||||
userEmail, err := auth.GetEmailFromJwt(r.Context())
|
||||
if err == nil {
|
||||
// switch case to set data["screen"] based on the referrer
|
||||
switch {
|
||||
case dashboardMatched:
|
||||
data["screen"] = "panel"
|
||||
case alertMatched:
|
||||
data["screen"] = "alert"
|
||||
case logsExplorerMatched:
|
||||
data["screen"] = "logs-explorer"
|
||||
case traceExplorerMatched:
|
||||
data["screen"] = "traces-explorer"
|
||||
default:
|
||||
data["screen"] = "unknown"
|
||||
return data, true
|
||||
}
|
||||
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_QUERY_RANGE_API, data, userEmail, true, false)
|
||||
}
|
||||
}
|
||||
return data, true
|
||||
}
|
||||
88
pkg/http/middleware/log_comment.go
Normal file
88
pkg/http/middleware/log_comment.go
Normal file
@@ -0,0 +1,88 @@
|
||||
package middleware
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
|
||||
"go.signoz.io/signoz/pkg/query-service/auth"
|
||||
"go.signoz.io/signoz/pkg/query-service/common"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type LogComment struct {
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
func NewLogComment(logger *zap.Logger) *LogComment {
|
||||
if logger == nil {
|
||||
panic("cannot build log enrichment, logger is empty")
|
||||
}
|
||||
|
||||
return &LogComment{
|
||||
logger: logger.Named(pkgname),
|
||||
}
|
||||
}
|
||||
|
||||
func (middleware *LogComment) Wrap(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
referrer := r.Header.Get("Referer")
|
||||
|
||||
var path, dashboardID, alertID, page, client, viewName, tab string
|
||||
|
||||
if referrer != "" {
|
||||
referrerURL, _ := url.Parse(referrer)
|
||||
client = "browser"
|
||||
path = referrerURL.Path
|
||||
|
||||
if strings.Contains(path, "/dashboard") {
|
||||
// Split the path into segments
|
||||
pathSegments := strings.Split(referrerURL.Path, "/")
|
||||
// The dashboard ID should be the segment after "/dashboard/"
|
||||
// Loop through pathSegments to find "dashboard" and then take the next segment as the ID
|
||||
for i, segment := range pathSegments {
|
||||
if segment == "dashboard" && i < len(pathSegments)-1 {
|
||||
// Return the next segment, which should be the dashboard ID
|
||||
dashboardID = pathSegments[i+1]
|
||||
}
|
||||
}
|
||||
page = "dashboards"
|
||||
} else if strings.Contains(path, "/alerts") {
|
||||
urlParams := referrerURL.Query()
|
||||
alertID = urlParams.Get("ruleId")
|
||||
page = "alerts"
|
||||
} else if strings.Contains(path, "logs") && strings.Contains(path, "explorer") {
|
||||
page = "logs-explorer"
|
||||
viewName = referrerURL.Query().Get("viewName")
|
||||
} else if strings.Contains(path, "/trace") || strings.Contains(path, "traces-explorer") {
|
||||
page = "traces-explorer"
|
||||
viewName = referrerURL.Query().Get("viewName")
|
||||
} else if strings.Contains(path, "/services") {
|
||||
page = "services"
|
||||
tab = referrerURL.Query().Get("tab")
|
||||
if tab == "" {
|
||||
tab = "OVER_METRICS"
|
||||
}
|
||||
}
|
||||
} else {
|
||||
client = "api"
|
||||
}
|
||||
|
||||
email, _ := auth.GetEmailFromJwt(r.Context())
|
||||
|
||||
kvs := map[string]string{
|
||||
"path": path,
|
||||
"dashboardID": dashboardID,
|
||||
"alertID": alertID,
|
||||
"source": page,
|
||||
"client": client,
|
||||
"viewName": viewName,
|
||||
"servicesTab": tab,
|
||||
"email": email,
|
||||
}
|
||||
|
||||
r = r.WithContext(context.WithValue(r.Context(), common.LogCommentKey, kvs))
|
||||
next.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
@@ -1,28 +1,21 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
_ "net/http/pprof" // http profiler
|
||||
"net/url"
|
||||
"os"
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/handlers"
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/jmoiron/sqlx"
|
||||
|
||||
"github.com/rs/cors"
|
||||
"github.com/soheilhy/cmux"
|
||||
"go.signoz.io/signoz/pkg/http/middleware"
|
||||
"go.signoz.io/signoz/pkg/query-service/agentConf"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/clickhouseReader"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/cloudintegrations"
|
||||
@@ -32,8 +25,6 @@ import (
|
||||
"go.signoz.io/signoz/pkg/query-service/app/opamp"
|
||||
opAmpModel "go.signoz.io/signoz/pkg/query-service/app/opamp/model"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/preferences"
|
||||
"go.signoz.io/signoz/pkg/query-service/common"
|
||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||
"go.signoz.io/signoz/pkg/signoz"
|
||||
"go.signoz.io/signoz/pkg/web"
|
||||
|
||||
@@ -263,9 +254,17 @@ func (s *Server) createPrivateServer(api *APIHandler) (*http.Server, error) {
|
||||
|
||||
r := NewRouter()
|
||||
|
||||
r.Use(setTimeoutMiddleware)
|
||||
r.Use(s.analyticsMiddleware)
|
||||
r.Use(loggingMiddlewarePrivate)
|
||||
timeoutMiddleware := middleware.NewTimeout(zap.L(), constants.TimeoutExcludedRoutes, 60*time.Second, 600*time.Second)
|
||||
r.Use(timeoutMiddleware.Wrap)
|
||||
|
||||
analyticsMiddleware := middleware.NewAnalytics(zap.L())
|
||||
r.Use(analyticsMiddleware.Wrap)
|
||||
|
||||
loggingMiddleware := middleware.NewLogging(zap.L())
|
||||
r.Use(loggingMiddleware.Wrap)
|
||||
|
||||
logCommentMiddleware := middleware.NewLogComment(zap.L())
|
||||
r.Use(logCommentMiddleware.Wrap)
|
||||
|
||||
api.RegisterPrivateRoutes(r)
|
||||
|
||||
@@ -289,10 +288,17 @@ func (s *Server) createPublicServer(api *APIHandler, web web.Web) (*http.Server,
|
||||
|
||||
r := NewRouter()
|
||||
|
||||
r.Use(setTimeoutMiddleware)
|
||||
r.Use(s.analyticsMiddleware)
|
||||
r.Use(loggingMiddleware)
|
||||
r.Use(LogCommentEnricher)
|
||||
timeoutMiddleware := middleware.NewTimeout(zap.L(), constants.TimeoutExcludedRoutes, 60*time.Second, 600*time.Second)
|
||||
r.Use(timeoutMiddleware.Wrap)
|
||||
|
||||
analyticsMiddleware := middleware.NewAnalytics(zap.L())
|
||||
r.Use(analyticsMiddleware.Wrap)
|
||||
|
||||
loggingMiddleware := middleware.NewLogging(zap.L())
|
||||
r.Use(loggingMiddleware.Wrap)
|
||||
|
||||
logCommentMiddleware := middleware.NewLogComment(zap.L())
|
||||
r.Use(logCommentMiddleware.Wrap)
|
||||
|
||||
// add auth middleware
|
||||
getUserFromRequest := func(r *http.Request) (*model.UserPayload, error) {
|
||||
@@ -340,297 +346,6 @@ func (s *Server) createPublicServer(api *APIHandler, web web.Web) (*http.Server,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// TODO(remove): Implemented at pkg/http/middleware/logging.go
|
||||
// loggingMiddleware is used for logging public api calls
|
||||
func loggingMiddleware(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
route := mux.CurrentRoute(r)
|
||||
path, _ := route.GetPathTemplate()
|
||||
startTime := time.Now()
|
||||
next.ServeHTTP(w, r)
|
||||
zap.L().Info(path, zap.Duration("timeTaken", time.Since(startTime)), zap.String("path", path))
|
||||
})
|
||||
}
|
||||
|
||||
func LogCommentEnricher(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
referrer := r.Header.Get("Referer")
|
||||
|
||||
var path, dashboardID, alertID, page, client, viewName, tab string
|
||||
|
||||
if referrer != "" {
|
||||
referrerURL, _ := url.Parse(referrer)
|
||||
client = "browser"
|
||||
path = referrerURL.Path
|
||||
|
||||
if strings.Contains(path, "/dashboard") {
|
||||
// Split the path into segments
|
||||
pathSegments := strings.Split(referrerURL.Path, "/")
|
||||
// The dashboard ID should be the segment after "/dashboard/"
|
||||
// Loop through pathSegments to find "dashboard" and then take the next segment as the ID
|
||||
for i, segment := range pathSegments {
|
||||
if segment == "dashboard" && i < len(pathSegments)-1 {
|
||||
// Return the next segment, which should be the dashboard ID
|
||||
dashboardID = pathSegments[i+1]
|
||||
}
|
||||
}
|
||||
page = "dashboards"
|
||||
} else if strings.Contains(path, "/alerts") {
|
||||
urlParams := referrerURL.Query()
|
||||
alertID = urlParams.Get("ruleId")
|
||||
page = "alerts"
|
||||
} else if strings.Contains(path, "logs") && strings.Contains(path, "explorer") {
|
||||
page = "logs-explorer"
|
||||
viewName = referrerURL.Query().Get("viewName")
|
||||
} else if strings.Contains(path, "/trace") || strings.Contains(path, "traces-explorer") {
|
||||
page = "traces-explorer"
|
||||
viewName = referrerURL.Query().Get("viewName")
|
||||
} else if strings.Contains(path, "/services") {
|
||||
page = "services"
|
||||
tab = referrerURL.Query().Get("tab")
|
||||
if tab == "" {
|
||||
tab = "OVER_METRICS"
|
||||
}
|
||||
}
|
||||
} else {
|
||||
client = "api"
|
||||
}
|
||||
|
||||
email, _ := auth.GetEmailFromJwt(r.Context())
|
||||
|
||||
kvs := map[string]string{
|
||||
"path": path,
|
||||
"dashboardID": dashboardID,
|
||||
"alertID": alertID,
|
||||
"source": page,
|
||||
"client": client,
|
||||
"viewName": viewName,
|
||||
"servicesTab": tab,
|
||||
"email": email,
|
||||
}
|
||||
|
||||
r = r.WithContext(context.WithValue(r.Context(), common.LogCommentKey, kvs))
|
||||
next.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
|
||||
// TODO(remove): Implemented at pkg/http/middleware/logging.go
|
||||
// loggingMiddlewarePrivate is used for logging private api calls
|
||||
// from internal services like alert manager
|
||||
func loggingMiddlewarePrivate(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
route := mux.CurrentRoute(r)
|
||||
path, _ := route.GetPathTemplate()
|
||||
startTime := time.Now()
|
||||
next.ServeHTTP(w, r)
|
||||
zap.L().Info(path, zap.Duration("timeTaken", time.Since(startTime)), zap.String("path", path), zap.Bool("privatePort", true))
|
||||
})
|
||||
}
|
||||
|
||||
// TODO(remove): Implemented at pkg/http/middleware/logging.go
|
||||
type loggingResponseWriter struct {
|
||||
http.ResponseWriter
|
||||
statusCode int
|
||||
}
|
||||
|
||||
// TODO(remove): Implemented at pkg/http/middleware/logging.go
|
||||
func NewLoggingResponseWriter(w http.ResponseWriter) *loggingResponseWriter {
|
||||
// WriteHeader(int) is not called if our response implicitly returns 200 OK, so
|
||||
// we default to that status code.
|
||||
return &loggingResponseWriter{w, http.StatusOK}
|
||||
}
|
||||
|
||||
// TODO(remove): Implemented at pkg/http/middleware/logging.go
|
||||
func (lrw *loggingResponseWriter) WriteHeader(code int) {
|
||||
lrw.statusCode = code
|
||||
lrw.ResponseWriter.WriteHeader(code)
|
||||
}
|
||||
|
||||
// TODO(remove): Implemented at pkg/http/middleware/logging.go
|
||||
// Flush implements the http.Flush interface.
|
||||
func (lrw *loggingResponseWriter) Flush() {
|
||||
lrw.ResponseWriter.(http.Flusher).Flush()
|
||||
}
|
||||
|
||||
// TODO(remove): Implemented at pkg/http/middleware/logging.go
|
||||
// Support websockets
|
||||
func (lrw *loggingResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
|
||||
h, ok := lrw.ResponseWriter.(http.Hijacker)
|
||||
if !ok {
|
||||
return nil, nil, errors.New("hijack not supported")
|
||||
}
|
||||
return h.Hijack()
|
||||
}
|
||||
|
||||
func extractQueryRangeV3Data(path string, r *http.Request) (map[string]interface{}, bool) {
|
||||
pathToExtractBodyFromV3 := "/api/v3/query_range"
|
||||
pathToExtractBodyFromV4 := "/api/v4/query_range"
|
||||
|
||||
data := map[string]interface{}{}
|
||||
var postData *v3.QueryRangeParamsV3
|
||||
|
||||
if (r.Method == "POST") && ((path == pathToExtractBodyFromV3) || (path == pathToExtractBodyFromV4)) {
|
||||
if r.Body != nil {
|
||||
bodyBytes, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
return nil, false
|
||||
}
|
||||
r.Body.Close() // must close
|
||||
r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
|
||||
json.Unmarshal(bodyBytes, &postData)
|
||||
|
||||
} else {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
} else {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
referrer := r.Header.Get("Referer")
|
||||
|
||||
dashboardMatched, err := regexp.MatchString(`/dashboard/[a-zA-Z0-9\-]+/(new|edit)(?:\?.*)?$`, referrer)
|
||||
if err != nil {
|
||||
zap.L().Error("error while matching the referrer", zap.Error(err))
|
||||
}
|
||||
alertMatched, err := regexp.MatchString(`/alerts/(new|edit)(?:\?.*)?$`, referrer)
|
||||
if err != nil {
|
||||
zap.L().Error("error while matching the alert: ", zap.Error(err))
|
||||
}
|
||||
logsExplorerMatched, err := regexp.MatchString(`/logs/logs-explorer(?:\?.*)?$`, referrer)
|
||||
if err != nil {
|
||||
zap.L().Error("error while matching the logs explorer: ", zap.Error(err))
|
||||
}
|
||||
traceExplorerMatched, err := regexp.MatchString(`/traces-explorer(?:\?.*)?$`, referrer)
|
||||
if err != nil {
|
||||
zap.L().Error("error while matching the trace explorer: ", zap.Error(err))
|
||||
}
|
||||
|
||||
queryInfoResult := telemetry.GetInstance().CheckQueryInfo(postData)
|
||||
|
||||
if (queryInfoResult.MetricsUsed || queryInfoResult.LogsUsed || queryInfoResult.TracesUsed) && (queryInfoResult.FilterApplied) {
|
||||
if queryInfoResult.MetricsUsed {
|
||||
telemetry.GetInstance().AddActiveMetricsUser()
|
||||
}
|
||||
if queryInfoResult.LogsUsed {
|
||||
telemetry.GetInstance().AddActiveLogsUser()
|
||||
}
|
||||
if queryInfoResult.TracesUsed {
|
||||
telemetry.GetInstance().AddActiveTracesUser()
|
||||
}
|
||||
data["metricsUsed"] = queryInfoResult.MetricsUsed
|
||||
data["logsUsed"] = queryInfoResult.LogsUsed
|
||||
data["tracesUsed"] = queryInfoResult.TracesUsed
|
||||
data["filterApplied"] = queryInfoResult.FilterApplied
|
||||
data["groupByApplied"] = queryInfoResult.GroupByApplied
|
||||
data["aggregateOperator"] = queryInfoResult.AggregateOperator
|
||||
data["aggregateAttributeKey"] = queryInfoResult.AggregateAttributeKey
|
||||
data["numberOfQueries"] = queryInfoResult.NumberOfQueries
|
||||
data["queryType"] = queryInfoResult.QueryType
|
||||
data["panelType"] = queryInfoResult.PanelType
|
||||
|
||||
userEmail, err := auth.GetEmailFromJwt(r.Context())
|
||||
if err == nil {
|
||||
// switch case to set data["screen"] based on the referrer
|
||||
switch {
|
||||
case dashboardMatched:
|
||||
data["screen"] = "panel"
|
||||
case alertMatched:
|
||||
data["screen"] = "alert"
|
||||
case logsExplorerMatched:
|
||||
data["screen"] = "logs-explorer"
|
||||
case traceExplorerMatched:
|
||||
data["screen"] = "traces-explorer"
|
||||
default:
|
||||
data["screen"] = "unknown"
|
||||
return data, true
|
||||
}
|
||||
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_QUERY_RANGE_API, data, userEmail, true, false)
|
||||
}
|
||||
}
|
||||
return data, true
|
||||
}
|
||||
|
||||
func getActiveLogs(path string, r *http.Request) {
|
||||
// if path == "/api/v1/dashboards/{uuid}" {
|
||||
// telemetry.GetInstance().AddActiveMetricsUser()
|
||||
// }
|
||||
if path == "/api/v1/logs" {
|
||||
hasFilters := len(r.URL.Query().Get("q"))
|
||||
if hasFilters > 0 {
|
||||
telemetry.GetInstance().AddActiveLogsUser()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (s *Server) analyticsMiddleware(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := auth.AttachJwtToContext(r.Context(), r)
|
||||
r = r.WithContext(ctx)
|
||||
route := mux.CurrentRoute(r)
|
||||
path, _ := route.GetPathTemplate()
|
||||
|
||||
queryRangeV3data, metadataExists := extractQueryRangeV3Data(path, r)
|
||||
getActiveLogs(path, r)
|
||||
|
||||
lrw := NewLoggingResponseWriter(w)
|
||||
next.ServeHTTP(lrw, r)
|
||||
|
||||
data := map[string]interface{}{"path": path, "statusCode": lrw.statusCode}
|
||||
if metadataExists {
|
||||
for key, value := range queryRangeV3data {
|
||||
data[key] = value
|
||||
}
|
||||
}
|
||||
|
||||
// if telemetry.GetInstance().IsSampled() {
|
||||
if _, ok := telemetry.EnabledPaths()[path]; ok {
|
||||
userEmail, err := auth.GetEmailFromJwt(r.Context())
|
||||
if err == nil {
|
||||
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_PATH, data, userEmail, true, false)
|
||||
}
|
||||
}
|
||||
// }
|
||||
|
||||
})
|
||||
}
|
||||
|
||||
// TODO(remove): Implemented at pkg/http/middleware/timeout.go
|
||||
func getRouteContextTimeout(overrideTimeout string) time.Duration {
|
||||
var timeout time.Duration
|
||||
var err error
|
||||
if overrideTimeout != "" {
|
||||
timeout, err = time.ParseDuration(overrideTimeout + "s")
|
||||
if err != nil {
|
||||
timeout = constants.ContextTimeout
|
||||
}
|
||||
if timeout > constants.ContextTimeoutMaxAllowed {
|
||||
timeout = constants.ContextTimeoutMaxAllowed
|
||||
}
|
||||
return timeout
|
||||
}
|
||||
return constants.ContextTimeout
|
||||
}
|
||||
|
||||
// TODO(remove): Implemented at pkg/http/middleware/timeout.go
|
||||
func setTimeoutMiddleware(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
var cancel context.CancelFunc
|
||||
// check if route is not excluded
|
||||
url := r.URL.Path
|
||||
if _, ok := constants.TimeoutExcludedRoutes[url]; !ok {
|
||||
ctx, cancel = context.WithTimeout(r.Context(), getRouteContextTimeout(r.Header.Get("timeout")))
|
||||
defer cancel()
|
||||
}
|
||||
|
||||
r = r.WithContext(ctx)
|
||||
next.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
|
||||
// initListeners initialises listeners of the server
|
||||
func (s *Server) initListeners() error {
|
||||
// listen on public port
|
||||
|
||||
@@ -1,42 +0,0 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
// TODO(remove): Implemented at pkg/http/middleware/timeout_test.go
|
||||
func TestGetRouteContextTimeout(t *testing.T) {
|
||||
var testGetRouteContextTimeoutData = []struct {
|
||||
Name string
|
||||
OverrideValue string
|
||||
timeout time.Duration
|
||||
}{
|
||||
{
|
||||
Name: "default",
|
||||
OverrideValue: "",
|
||||
timeout: 60 * time.Second,
|
||||
},
|
||||
{
|
||||
Name: "override",
|
||||
OverrideValue: "180",
|
||||
timeout: 180 * time.Second,
|
||||
},
|
||||
{
|
||||
Name: "override more than max",
|
||||
OverrideValue: "610",
|
||||
timeout: 600 * time.Second,
|
||||
},
|
||||
}
|
||||
|
||||
t.Parallel()
|
||||
|
||||
for _, test := range testGetRouteContextTimeoutData {
|
||||
t.Run(test.Name, func(t *testing.T) {
|
||||
res := getRouteContextTimeout(test.OverrideValue)
|
||||
assert.Equal(t, test.timeout, res)
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -254,9 +254,9 @@ const (
|
||||
SIGNOZ_TOP_LEVEL_OPERATIONS_TABLENAME = "distributed_top_level_operations"
|
||||
)
|
||||
|
||||
var TimeoutExcludedRoutes = map[string]bool{
|
||||
"/api/v1/logs/tail": true,
|
||||
"/api/v3/logs/livetail": true,
|
||||
var TimeoutExcludedRoutes = map[string]struct{}{
|
||||
"/api/v1/logs/tail": {},
|
||||
"/api/v3/logs/livetail": {},
|
||||
}
|
||||
|
||||
// alert related constants
|
||||
|
||||
Reference in New Issue
Block a user