Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fc1483c56a |
@@ -337,6 +337,7 @@ func (s *Server) createPublicServer(apiHandler *api.APIHandler, web web.Web) (*h
|
|||||||
apiHandler.RegisterMessagingQueuesRoutes(r, am)
|
apiHandler.RegisterMessagingQueuesRoutes(r, am)
|
||||||
apiHandler.RegisterThirdPartyApiRoutes(r, am)
|
apiHandler.RegisterThirdPartyApiRoutes(r, am)
|
||||||
apiHandler.MetricExplorerRoutes(r, am)
|
apiHandler.MetricExplorerRoutes(r, am)
|
||||||
|
apiHandler.RegisterTraceFunnelsRoutes(r, am)
|
||||||
|
|
||||||
c := cors.New(cors.Options{
|
c := cors.New(cors.Options{
|
||||||
AllowedOrigins: []string{"*"},
|
AllowedOrigins: []string{"*"},
|
||||||
|
|||||||
450
pkg/modules/tracefunnel/impltracefunnel/handler.go
Normal file
450
pkg/modules/tracefunnel/impltracefunnel/handler.go
Normal file
@@ -0,0 +1,450 @@
|
|||||||
|
package impltracefunnel
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"net/http"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/SigNoz/signoz/pkg/errors"
|
||||||
|
"github.com/SigNoz/signoz/pkg/http/render"
|
||||||
|
"github.com/SigNoz/signoz/pkg/modules/tracefunnel"
|
||||||
|
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
||||||
|
tf "github.com/SigNoz/signoz/pkg/types/tracefunnel"
|
||||||
|
"github.com/SigNoz/signoz/pkg/valuer"
|
||||||
|
"github.com/gorilla/mux"
|
||||||
|
)
|
||||||
|
|
||||||
|
type handler struct {
|
||||||
|
module tracefunnel.Module
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewHandler(module tracefunnel.Module) tracefunnel.Handler {
|
||||||
|
return &handler{module: module}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (handler *handler) New(rw http.ResponseWriter, r *http.Request) {
|
||||||
|
var req tf.FunnelRequest
|
||||||
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||||
|
render.Error(rw, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
claims, err := authtypes.ClaimsFromContext(r.Context())
|
||||||
|
if err != nil {
|
||||||
|
render.Error(rw, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
userID := claims.UserID
|
||||||
|
orgID := claims.OrgID
|
||||||
|
|
||||||
|
funnels, err := handler.module.List(r.Context(), orgID)
|
||||||
|
if err != nil {
|
||||||
|
render.Error(rw, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, f := range funnels {
|
||||||
|
if f.Name == req.Name {
|
||||||
|
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "a funnel with name '%s' already exists in this organization", req.Name))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
funnel, err := handler.module.Create(r.Context(), req.Timestamp, req.Name, userID, orgID)
|
||||||
|
if err != nil {
|
||||||
|
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "failed to create funnel"))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
response := tf.FunnelResponse{
|
||||||
|
FunnelID: funnel.ID.String(),
|
||||||
|
FunnelName: funnel.Name,
|
||||||
|
CreatedAt: req.Timestamp,
|
||||||
|
UserEmail: claims.Email,
|
||||||
|
OrgID: orgID,
|
||||||
|
}
|
||||||
|
|
||||||
|
render.Success(rw, http.StatusOK, response)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (handler *handler) Update(rw http.ResponseWriter, r *http.Request) {
|
||||||
|
var req tf.FunnelRequest
|
||||||
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||||
|
render.Error(rw, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
claims, err := authtypes.ClaimsFromContext(r.Context())
|
||||||
|
if err != nil {
|
||||||
|
render.Error(rw, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
userID := claims.UserID
|
||||||
|
orgID := claims.OrgID
|
||||||
|
|
||||||
|
if err := tracefunnel.ValidateTimestamp(req.Timestamp, "timestamp"); err != nil {
|
||||||
|
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "timestamp is invalid: %v", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
funnel, err := handler.module.Get(r.Context(), req.FunnelID.String())
|
||||||
|
if err != nil {
|
||||||
|
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "funnel not found: %v", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if name is being updated and if it already exists
|
||||||
|
if req.Name != "" && req.Name != funnel.Name {
|
||||||
|
funnels, err := handler.module.List(r.Context(), orgID)
|
||||||
|
if err != nil {
|
||||||
|
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "failed to list funnels: %v", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, f := range funnels {
|
||||||
|
if f.Name == req.Name {
|
||||||
|
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "a funnel with name '%s' already exists in this organization", req.Name))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Process each step in the request
|
||||||
|
for i := range req.Steps {
|
||||||
|
if req.Steps[i].Order < 1 {
|
||||||
|
req.Steps[i].Order = int64(i + 1) // Default to sequential ordering if not specified
|
||||||
|
}
|
||||||
|
// Generate a new UUID for the step if it doesn't have one
|
||||||
|
if req.Steps[i].Id.IsZero() {
|
||||||
|
newUUID := valuer.GenerateUUID()
|
||||||
|
req.Steps[i].Id = newUUID
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := tracefunnel.ValidateFunnelSteps(req.Steps); err != nil {
|
||||||
|
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "invalid funnel steps: %v", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Normalize step orders
|
||||||
|
req.Steps = tracefunnel.NormalizeFunnelSteps(req.Steps)
|
||||||
|
|
||||||
|
// Update the funnel with new steps
|
||||||
|
funnel.Steps = req.Steps
|
||||||
|
funnel.UpdatedAt = time.Unix(0, req.Timestamp*1000000) // Convert to nanoseconds
|
||||||
|
funnel.UpdatedBy = userID
|
||||||
|
|
||||||
|
if req.Name != "" {
|
||||||
|
funnel.Name = req.Name
|
||||||
|
}
|
||||||
|
if req.Description != "" {
|
||||||
|
funnel.Description = req.Description
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update funnel in database
|
||||||
|
err = handler.module.Update(r.Context(), funnel, userID)
|
||||||
|
if err != nil {
|
||||||
|
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "failed to update funnel in database: %v", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
//// Update name and description if provided
|
||||||
|
//if req.Name != "" || req.Description != "" {
|
||||||
|
// name := req.Name
|
||||||
|
//
|
||||||
|
// description := req.Description
|
||||||
|
//
|
||||||
|
// err = handler.module.UpdateMetadata(r.Context(), funnel.ID, name, description, userID)
|
||||||
|
// if err != nil {
|
||||||
|
// render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "failed to update funnel metadata: %v", err))
|
||||||
|
// return
|
||||||
|
// }
|
||||||
|
//}
|
||||||
|
|
||||||
|
// Get the updated funnel to return in response
|
||||||
|
updatedFunnel, err := handler.module.Get(r.Context(), funnel.ID.String())
|
||||||
|
if err != nil {
|
||||||
|
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "failed to get updated funnel: %v", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
response := tf.FunnelResponse{
|
||||||
|
FunnelName: updatedFunnel.Name,
|
||||||
|
FunnelID: updatedFunnel.ID.String(),
|
||||||
|
Steps: updatedFunnel.Steps,
|
||||||
|
CreatedAt: updatedFunnel.CreatedAt.UnixNano() / 1000000,
|
||||||
|
CreatedBy: updatedFunnel.CreatedBy,
|
||||||
|
OrgID: updatedFunnel.OrgID.String(),
|
||||||
|
UpdatedBy: userID,
|
||||||
|
UpdatedAt: updatedFunnel.UpdatedAt.UnixNano() / 1000000,
|
||||||
|
Description: updatedFunnel.Description,
|
||||||
|
}
|
||||||
|
|
||||||
|
render.Success(rw, http.StatusOK, response)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (handler *handler) List(rw http.ResponseWriter, r *http.Request) {
|
||||||
|
claims, err := authtypes.ClaimsFromContext(r.Context())
|
||||||
|
if err != nil {
|
||||||
|
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "unauthenticated"))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
orgID := claims.OrgID
|
||||||
|
funnels, err := handler.module.List(r.Context(), orgID)
|
||||||
|
if err != nil {
|
||||||
|
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "failed to list funnels: %v", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var response []tf.FunnelResponse
|
||||||
|
for _, f := range funnels {
|
||||||
|
funnelResp := tf.FunnelResponse{
|
||||||
|
FunnelName: f.Name,
|
||||||
|
FunnelID: f.ID.String(),
|
||||||
|
CreatedAt: f.CreatedAt.UnixNano() / 1000000,
|
||||||
|
CreatedBy: f.CreatedBy,
|
||||||
|
OrgID: f.OrgID.String(),
|
||||||
|
UpdatedAt: f.UpdatedAt.UnixNano() / 1000000,
|
||||||
|
UpdatedBy: f.UpdatedBy,
|
||||||
|
Description: f.Description,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get user email if available
|
||||||
|
if f.CreatedByUser != nil {
|
||||||
|
funnelResp.UserEmail = f.CreatedByUser.Email
|
||||||
|
}
|
||||||
|
|
||||||
|
response = append(response, funnelResp)
|
||||||
|
}
|
||||||
|
|
||||||
|
render.Success(rw, http.StatusOK, response)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (handler *handler) Get(rw http.ResponseWriter, r *http.Request) {
|
||||||
|
vars := mux.Vars(r)
|
||||||
|
funnelID := vars["funnel_id"]
|
||||||
|
|
||||||
|
funnel, err := handler.module.Get(r.Context(), funnelID)
|
||||||
|
if err != nil {
|
||||||
|
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "funnel not found: %v", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a response with all funnel details including step IDs
|
||||||
|
response := tf.FunnelResponse{
|
||||||
|
FunnelID: funnel.ID.String(),
|
||||||
|
FunnelName: funnel.Name,
|
||||||
|
Description: funnel.Description,
|
||||||
|
CreatedAt: funnel.CreatedAt.UnixNano() / 1000000,
|
||||||
|
UpdatedAt: funnel.UpdatedAt.UnixNano() / 1000000,
|
||||||
|
CreatedBy: funnel.CreatedBy,
|
||||||
|
UpdatedBy: funnel.UpdatedBy,
|
||||||
|
OrgID: funnel.OrgID.String(),
|
||||||
|
Steps: funnel.Steps,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add user email if available
|
||||||
|
if funnel.CreatedByUser != nil {
|
||||||
|
response.UserEmail = funnel.CreatedByUser.Email
|
||||||
|
}
|
||||||
|
|
||||||
|
render.Success(rw, http.StatusOK, response)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (handler *handler) Delete(rw http.ResponseWriter, r *http.Request) {
|
||||||
|
vars := mux.Vars(r)
|
||||||
|
funnelID := vars["funnel_id"]
|
||||||
|
|
||||||
|
err := handler.module.Delete(r.Context(), funnelID)
|
||||||
|
if err != nil {
|
||||||
|
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "failed to delete funnel: %v", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
render.Success(rw, http.StatusOK, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (handler *handler) Save(rw http.ResponseWriter, r *http.Request) {
|
||||||
|
var req tf.FunnelRequest
|
||||||
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||||
|
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "invalid request: %v", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
claims, err := authtypes.ClaimsFromContext(r.Context())
|
||||||
|
if err != nil {
|
||||||
|
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "unauthenticated"))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
orgID := claims.OrgID
|
||||||
|
usrID := claims.UserID
|
||||||
|
|
||||||
|
funnel, err := handler.module.Get(r.Context(), req.FunnelID.String())
|
||||||
|
if err != nil {
|
||||||
|
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "funnel not found: %v", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
updateTimestamp := req.Timestamp
|
||||||
|
if updateTimestamp == 0 {
|
||||||
|
updateTimestamp = time.Now().UnixMilli()
|
||||||
|
} else if !tracefunnel.ValidateTimestampIsMilliseconds(updateTimestamp) {
|
||||||
|
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "timestamp must be in milliseconds format (13 digits)"))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
funnel.UpdatedAt = time.Unix(0, updateTimestamp*1000000) // Convert to nanoseconds
|
||||||
|
|
||||||
|
if req.UserID != "" {
|
||||||
|
funnel.UpdatedBy = usrID
|
||||||
|
}
|
||||||
|
|
||||||
|
funnel.Description = req.Description
|
||||||
|
|
||||||
|
if err := handler.module.Save(r.Context(), funnel, funnel.UpdatedBy, orgID); err != nil {
|
||||||
|
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "failed to save funnel: %v", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try to fetch metadata from DB
|
||||||
|
createdAt, updatedAt, extraDataFromDB, err := handler.module.GetFunnelMetadata(r.Context(), funnel.ID.String())
|
||||||
|
if err != nil {
|
||||||
|
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "failed to get funnel metadata: %v", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
resp := tf.FunnelResponse{
|
||||||
|
FunnelName: funnel.Name,
|
||||||
|
CreatedAt: createdAt,
|
||||||
|
UpdatedAt: updatedAt,
|
||||||
|
CreatedBy: funnel.CreatedBy,
|
||||||
|
UpdatedBy: funnel.UpdatedBy,
|
||||||
|
OrgID: funnel.OrgID.String(),
|
||||||
|
Description: extraDataFromDB,
|
||||||
|
}
|
||||||
|
|
||||||
|
render.Success(rw, http.StatusOK, resp)
|
||||||
|
}
|
||||||
|
|
||||||
|
//func (handler *handler) ValidateTraces(rw http.ResponseWriter, r *http.Request) {
|
||||||
|
// vars := mux.Vars(r)
|
||||||
|
// funnelID := vars["funnel_id"]
|
||||||
|
//
|
||||||
|
// funnel, err := handler.module.Get(r.Context(), funnelID)
|
||||||
|
// if err != nil {
|
||||||
|
// render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "funnel not found: %v", err))
|
||||||
|
// return
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// var timeRange tf.TimeRange
|
||||||
|
// if err := json.NewDecoder(r.Body).Decode(&timeRange); err != nil {
|
||||||
|
// render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "error decoding time range: %v", err))
|
||||||
|
// return
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// response, err := handler.module.ValidateTraces(r.Context(), funnel, timeRange)
|
||||||
|
// if err != nil {
|
||||||
|
// render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "error validating traces: %v", err))
|
||||||
|
// return
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// render.Success(rw, http.StatusOK, response)
|
||||||
|
//}
|
||||||
|
//
|
||||||
|
//func (handler *handler) FunnelAnalytics(rw http.ResponseWriter, r *http.Request) {
|
||||||
|
// vars := mux.Vars(r)
|
||||||
|
// funnelID := vars["funnel_id"]
|
||||||
|
//
|
||||||
|
// funnel, err := handler.module.Get(r.Context(), funnelID)
|
||||||
|
// if err != nil {
|
||||||
|
// render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "funnel not found: %v", err))
|
||||||
|
// return
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// var timeRange tf.TimeRange
|
||||||
|
// if err := json.NewDecoder(r.Body).Decode(&timeRange); err != nil {
|
||||||
|
// render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "error decoding time range: %v", err))
|
||||||
|
// return
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// response, err := handler.module.GetFunnelAnalytics(r.Context(), funnel, timeRange)
|
||||||
|
// if err != nil {
|
||||||
|
// render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "error getting funnel analytics: %v", err))
|
||||||
|
// return
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// render.Success(rw, http.StatusOK, response)
|
||||||
|
//}
|
||||||
|
//
|
||||||
|
//func (handler *handler) StepAnalytics(rw http.ResponseWriter, r *http.Request) {
|
||||||
|
// vars := mux.Vars(r)
|
||||||
|
// funnelID := vars["funnel_id"]
|
||||||
|
//
|
||||||
|
// funnel, err := handler.module.Get(r.Context(), funnelID)
|
||||||
|
// if err != nil {
|
||||||
|
// render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "funnel not found: %v", err))
|
||||||
|
// return
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// var timeRange tf.TimeRange
|
||||||
|
// if err := json.NewDecoder(r.Body).Decode(&timeRange); err != nil {
|
||||||
|
// render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "error decoding time range: %v", err))
|
||||||
|
// return
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// response, err := handler.module.GetStepAnalytics(r.Context(), funnel, timeRange)
|
||||||
|
// if err != nil {
|
||||||
|
// render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "error getting step analytics: %v", err))
|
||||||
|
// return
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// render.Success(rw, http.StatusOK, response)
|
||||||
|
//}
|
||||||
|
//
|
||||||
|
//func (handler *handler) SlowestTraces(rw http.ResponseWriter, r *http.Request) {
|
||||||
|
// handler.handleTracesWithLatency(rw, r, false)
|
||||||
|
//}
|
||||||
|
//
|
||||||
|
//func (handler *handler) ErrorTraces(rw http.ResponseWriter, r *http.Request) {
|
||||||
|
// handler.handleTracesWithLatency(rw, r, true)
|
||||||
|
//}
|
||||||
|
//
|
||||||
|
//// handleTracesWithLatency handles both slow and error traces with common logic
|
||||||
|
//func (handler *handler) handleTracesWithLatency(rw http.ResponseWriter, r *http.Request, isError bool) {
|
||||||
|
// funnel, req, err := handler.validateTracesRequest(r)
|
||||||
|
// if err != nil {
|
||||||
|
// render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "%v", err))
|
||||||
|
// return
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// if err := tracefunnel.ValidateSteps(funnel, req.StepAOrder, req.StepBOrder); err != nil {
|
||||||
|
// render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "%v", err))
|
||||||
|
// return
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// response, err := handler.module.GetSlowestTraces(r.Context(), funnel, req.StepAOrder, req.StepBOrder, req.TimeRange, isError)
|
||||||
|
// if err != nil {
|
||||||
|
// render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "error getting traces: %v", err))
|
||||||
|
// return
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// render.Success(rw, http.StatusOK, response)
|
||||||
|
//}
|
||||||
|
//
|
||||||
|
//// validateTracesRequest validates and extracts the request parameters
|
||||||
|
//func (handler *handler) validateTracesRequest(r *http.Request) (*tf.Funnel, *tf.StepTransitionRequest, error) {
|
||||||
|
// vars := mux.Vars(r)
|
||||||
|
// funnelID := vars["funnel_id"]
|
||||||
|
//
|
||||||
|
// funnel, err := handler.module.Get(r.Context(), funnelID)
|
||||||
|
// if err != nil {
|
||||||
|
// return nil, nil, fmt.Errorf("funnel not found: %v", err)
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// var req tf.StepTransitionRequest
|
||||||
|
// if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||||
|
// return nil, nil, fmt.Errorf("invalid request body: %v", err)
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// return funnel, &req, nil
|
||||||
|
//}
|
||||||
220
pkg/modules/tracefunnel/impltracefunnel/module.go
Normal file
220
pkg/modules/tracefunnel/impltracefunnel/module.go
Normal file
@@ -0,0 +1,220 @@
|
|||||||
|
package impltracefunnel
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/SigNoz/signoz/pkg/modules/tracefunnel"
|
||||||
|
"github.com/SigNoz/signoz/pkg/types"
|
||||||
|
traceFunnels "github.com/SigNoz/signoz/pkg/types/tracefunnel"
|
||||||
|
"github.com/SigNoz/signoz/pkg/valuer"
|
||||||
|
)
|
||||||
|
|
||||||
|
type module struct {
|
||||||
|
store traceFunnels.TraceFunnelStore
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewModule(store traceFunnels.TraceFunnelStore) tracefunnel.Module {
|
||||||
|
return &module{
|
||||||
|
store: store,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (module *module) Create(ctx context.Context, timestamp int64, name string, userID string, orgID string) (*traceFunnels.Funnel, error) {
|
||||||
|
orgUUID, err := valuer.NewUUID(orgID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("invalid org ID: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
funnel := &traceFunnels.Funnel{
|
||||||
|
BaseMetadata: traceFunnels.BaseMetadata{
|
||||||
|
Name: name,
|
||||||
|
OrgID: orgUUID,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
funnel.CreatedAt = time.Unix(0, timestamp*1000000) // Convert to nanoseconds
|
||||||
|
funnel.CreatedBy = userID
|
||||||
|
|
||||||
|
// Set up the user relationship
|
||||||
|
funnel.CreatedByUser = &types.User{
|
||||||
|
ID: userID,
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := module.store.Create(ctx, funnel); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to create funnel: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return funnel, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get gets a funnel by ID
|
||||||
|
func (module *module) Get(ctx context.Context, funnelID string) (*traceFunnels.Funnel, error) {
|
||||||
|
uuid, err := valuer.NewUUID(funnelID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("invalid funnel ID: %v", err)
|
||||||
|
}
|
||||||
|
return module.store.Get(ctx, uuid)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update updates a funnel
|
||||||
|
func (module *module) Update(ctx context.Context, funnel *traceFunnels.Funnel, userID string) error {
|
||||||
|
funnel.UpdatedBy = userID
|
||||||
|
return module.store.Update(ctx, funnel)
|
||||||
|
}
|
||||||
|
|
||||||
|
// List lists all funnels for an organization
|
||||||
|
func (module *module) List(ctx context.Context, orgID string) ([]*traceFunnels.Funnel, error) {
|
||||||
|
orgUUID, err := valuer.NewUUID(orgID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("invalid org ID: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
funnels, err := module.store.List(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Filter by orgID
|
||||||
|
var orgFunnels []*traceFunnels.Funnel
|
||||||
|
for _, f := range funnels {
|
||||||
|
if f.OrgID == orgUUID {
|
||||||
|
orgFunnels = append(orgFunnels, f)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return orgFunnels, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete deletes a funnel
|
||||||
|
func (module *module) Delete(ctx context.Context, funnelID string) error {
|
||||||
|
uuid, err := valuer.NewUUID(funnelID)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("invalid funnel ID: %v", err)
|
||||||
|
}
|
||||||
|
return module.store.Delete(ctx, uuid)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Save saves a funnel
|
||||||
|
func (module *module) Save(ctx context.Context, funnel *traceFunnels.Funnel, userID string, orgID string) error {
|
||||||
|
orgUUID, err := valuer.NewUUID(orgID)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("invalid org ID: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
funnel.UpdatedBy = userID
|
||||||
|
funnel.OrgID = orgUUID
|
||||||
|
return module.store.Update(ctx, funnel)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetFunnelMetadata gets metadata for a funnel
|
||||||
|
func (module *module) GetFunnelMetadata(ctx context.Context, funnelID string) (int64, int64, string, error) {
|
||||||
|
uuid, err := valuer.NewUUID(funnelID)
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, "", fmt.Errorf("invalid funnel ID: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
funnel, err := module.store.Get(ctx, uuid)
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
return funnel.CreatedAt.UnixNano() / 1000000, funnel.UpdatedAt.UnixNano() / 1000000, funnel.Description, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ValidateTraces validates traces in a funnel
|
||||||
|
//func (module *module) ValidateTraces(ctx context.Context, funnel *traceFunnels.Funnel, timeRange traceFunnels.TimeRange) ([]*v3.Row, error) {
|
||||||
|
// chq, err := tracefunnel.ValidateTraces(funnel, timeRange)
|
||||||
|
// if err != nil {
|
||||||
|
// RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error building clickhouse query: %v", err)}, nil)
|
||||||
|
// return
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// results, err := aH.reader. GetListResultV3(r.Context(), chq.Query)
|
||||||
|
// if err != nil {
|
||||||
|
// RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error converting clickhouse results to list: %v", err)}, nil)
|
||||||
|
// return
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
//}
|
||||||
|
|
||||||
|
// GetFunnelAnalytics gets analytics for a funnel
|
||||||
|
//func (module *module) GetFunnelAnalytics(ctx context.Context, funnel *traceFunnels.Funnel, timeRange traceFunnels.TimeRange) (*traceFunnels.FunnelAnalytics, error) {
|
||||||
|
// if err := tracefunnel.ValidateFunnel(funnel); err != nil {
|
||||||
|
// return nil, fmt.Errorf("invalid funnel: %v", err)
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// if err := tracefunnel.ValidateTimeRange(timeRange); err != nil {
|
||||||
|
// return nil, fmt.Errorf("invalid time range: %v", err)
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// _, err := tracefunnel.ValidateTracesWithLatency(funnel, timeRange)
|
||||||
|
// if err != nil {
|
||||||
|
// return nil, fmt.Errorf("error building clickhouse query: %v", err)
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// // TODO: Execute query and return results
|
||||||
|
// // For now, return empty analytics
|
||||||
|
// return &traceFunnels.FunnelAnalytics{
|
||||||
|
// TotalStart: 0,
|
||||||
|
// TotalComplete: 0,
|
||||||
|
// ErrorCount: 0,
|
||||||
|
// AvgDurationMs: 0,
|
||||||
|
// P99LatencyMs: 0,
|
||||||
|
// ConversionRate: 0,
|
||||||
|
// }, nil
|
||||||
|
//}
|
||||||
|
|
||||||
|
// GetStepAnalytics gets analytics for each step
|
||||||
|
//func (module *module) GetStepAnalytics(ctx context.Context, funnel *traceFunnels.Funnel, timeRange traceFunnels.TimeRange) (*traceFunnels.FunnelAnalytics, error) {
|
||||||
|
// if err := tracefunnel.ValidateFunnel(funnel); err != nil {
|
||||||
|
// return nil, fmt.Errorf("invalid funnel: %v", err)
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// if err := tracefunnel.ValidateTimeRange(timeRange); err != nil {
|
||||||
|
// return nil, fmt.Errorf("invalid time range: %v", err)
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// _, err := tracefunnel.GetStepAnalytics(funnel, timeRange)
|
||||||
|
// if err != nil {
|
||||||
|
// return nil, fmt.Errorf("error building clickhouse query: %v", err)
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// // TODO: Execute query and return results
|
||||||
|
// // For now, return empty analytics
|
||||||
|
// return &traceFunnels.FunnelAnalytics{
|
||||||
|
// TotalStart: 0,
|
||||||
|
// TotalComplete: 0,
|
||||||
|
// ErrorCount: 0,
|
||||||
|
// AvgDurationMs: 0,
|
||||||
|
// P99LatencyMs: 0,
|
||||||
|
// ConversionRate: 0,
|
||||||
|
// }, nil
|
||||||
|
//}
|
||||||
|
|
||||||
|
// GetSlowestTraces gets the slowest traces between two steps
|
||||||
|
//func (module *module) GetSlowestTraces(ctx context.Context, funnel *traceFunnels.Funnel, stepAOrder, stepBOrder int64, timeRange traceFunnels.TimeRange, isError bool) (*traceFunnels.ValidTracesResponse, error) {
|
||||||
|
// if err := tracefunnel.ValidateFunnel(funnel); err != nil {
|
||||||
|
// return nil, fmt.Errorf("invalid funnel: %v", err)
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// if err := tracefunnel.ValidateTimeRange(timeRange); err != nil {
|
||||||
|
// return nil, fmt.Errorf("invalid time range: %v", err)
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// _, err := tracefunnel.GetSlowestTraces(funnel, stepAOrder, stepBOrder, timeRange, isError)
|
||||||
|
// if err != nil {
|
||||||
|
// return nil, fmt.Errorf("error building clickhouse query: %v", err)
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// // TODO: Execute query and return results
|
||||||
|
// // For now, return empty response
|
||||||
|
// return &traceFunnels.ValidTracesResponse{
|
||||||
|
// TraceIDs: []string{},
|
||||||
|
// }, nil
|
||||||
|
//}
|
||||||
|
|
||||||
|
//UpdateMetadata updates the metadata of a funnel
|
||||||
|
//func (module *module) UpdateMetadata(ctx context.Context, funnelID valuer.UUID, name, description string, userID string) error {
|
||||||
|
// return module.store.UpdateMetadata(ctx, funnelID, name, description, userID)
|
||||||
|
//}
|
||||||
220
pkg/modules/tracefunnel/impltracefunnel/store.go
Normal file
220
pkg/modules/tracefunnel/impltracefunnel/store.go
Normal file
@@ -0,0 +1,220 @@
|
|||||||
|
package impltracefunnel
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||||
|
traceFunnels "github.com/SigNoz/signoz/pkg/types/tracefunnel"
|
||||||
|
"github.com/SigNoz/signoz/pkg/valuer"
|
||||||
|
)
|
||||||
|
|
||||||
|
type store struct {
|
||||||
|
sqlstore sqlstore.SQLStore
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewStore(sqlstore sqlstore.SQLStore) traceFunnels.TraceFunnelStore {
|
||||||
|
return &store{sqlstore: sqlstore}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (store *store) Create(ctx context.Context, funnel *traceFunnels.Funnel) error {
|
||||||
|
if funnel.ID.IsZero() {
|
||||||
|
funnel.ID = valuer.GenerateUUID()
|
||||||
|
}
|
||||||
|
|
||||||
|
if funnel.CreatedAt.IsZero() {
|
||||||
|
funnel.CreatedAt = time.Now()
|
||||||
|
}
|
||||||
|
if funnel.UpdatedAt.IsZero() {
|
||||||
|
funnel.UpdatedAt = time.Now()
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := store.
|
||||||
|
sqlstore.
|
||||||
|
BunDB().
|
||||||
|
NewInsert().
|
||||||
|
Model(funnel).
|
||||||
|
Exec(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to create funnel: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if funnel.CreatedByUser != nil {
|
||||||
|
_, err = store.sqlstore.BunDB().NewUpdate().
|
||||||
|
Model(funnel).
|
||||||
|
Set("created_by = ?", funnel.CreatedByUser.ID).
|
||||||
|
Where("id = ?", funnel.ID).
|
||||||
|
Exec(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to update funnel user relationship: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get retrieves a funnel by ID
|
||||||
|
func (store *store) Get(ctx context.Context, uuid valuer.UUID) (*traceFunnels.Funnel, error) {
|
||||||
|
funnel := &traceFunnels.Funnel{}
|
||||||
|
err := store.
|
||||||
|
sqlstore.
|
||||||
|
BunDB().
|
||||||
|
NewSelect().
|
||||||
|
Model(funnel).
|
||||||
|
Relation("CreatedByUser").
|
||||||
|
Where("?TableAlias.id = ?", uuid).
|
||||||
|
Scan(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to get funnel: %v", err)
|
||||||
|
}
|
||||||
|
return funnel, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update updates an existing funnel
|
||||||
|
func (store *store) Update(ctx context.Context, funnel *traceFunnels.Funnel) error {
|
||||||
|
// Update the updated_at timestamp
|
||||||
|
funnel.UpdatedAt = time.Now()
|
||||||
|
|
||||||
|
_, err := store.
|
||||||
|
sqlstore.
|
||||||
|
BunDB().
|
||||||
|
NewUpdate().
|
||||||
|
Model(funnel).
|
||||||
|
WherePK().
|
||||||
|
Exec(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to update funnel: %v", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// List retrieves all funnels
|
||||||
|
func (store *store) List(ctx context.Context) ([]*traceFunnels.Funnel, error) {
|
||||||
|
var funnels []*traceFunnels.Funnel
|
||||||
|
err := store.
|
||||||
|
sqlstore.
|
||||||
|
BunDB().
|
||||||
|
NewSelect().
|
||||||
|
Model(&funnels).
|
||||||
|
Relation("CreatedByUser").
|
||||||
|
Scan(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to list funnels: %v", err)
|
||||||
|
}
|
||||||
|
return funnels, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete removes a funnel by ID
|
||||||
|
func (store *store) Delete(ctx context.Context, uuid valuer.UUID) error {
|
||||||
|
_, err := store.
|
||||||
|
sqlstore.
|
||||||
|
BunDB().
|
||||||
|
NewDelete().
|
||||||
|
Model((*traceFunnels.Funnel)(nil)).
|
||||||
|
Where("id = ?", uuid).Exec(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to delete funnel: %v", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ListByOrg retrieves all funnels for a specific organization
|
||||||
|
//func (store *store) ListByOrg(ctx context.Context, orgID valuer.UUID) ([]*traceFunnels.Funnel, error) {
|
||||||
|
// var funnels []*traceFunnels.Funnel
|
||||||
|
// err := store.
|
||||||
|
// sqlstore.
|
||||||
|
// BunDB().
|
||||||
|
// NewSelect().
|
||||||
|
// Model(&funnels).
|
||||||
|
// Relation("CreatedByUser").
|
||||||
|
// Where("org_id = ?", orgID).
|
||||||
|
// Scan(ctx)
|
||||||
|
// if err != nil {
|
||||||
|
// return nil, fmt.Errorf("failed to list funnels by org: %v", err)
|
||||||
|
// }
|
||||||
|
// return funnels, nil
|
||||||
|
//}
|
||||||
|
|
||||||
|
// GetByIDAndOrg retrieves a funnel by ID and organization ID
|
||||||
|
//func (store *store) GetByIDAndOrg(ctx context.Context, id, orgID valuer.UUID) (*traceFunnels.Funnel, error) {
|
||||||
|
// funnel := &traceFunnels.Funnel{}
|
||||||
|
// err := store.
|
||||||
|
// sqlstore.
|
||||||
|
// BunDB().
|
||||||
|
// NewSelect().
|
||||||
|
// Model(funnel).
|
||||||
|
// Relation("CreatedByUser").
|
||||||
|
// Where("?TableAlias.id = ? AND ?TableAlias.org_id = ?", id, orgID).
|
||||||
|
// Scan(ctx)
|
||||||
|
// if err != nil {
|
||||||
|
// return nil, fmt.Errorf("failed to get funnel by ID and org: %v", err)
|
||||||
|
// }
|
||||||
|
// return funnel, nil
|
||||||
|
//}
|
||||||
|
|
||||||
|
// UpdateSteps updates the steps of a funnel
|
||||||
|
//func (store *store) UpdateSteps(ctx context.Context, funnelID valuer.UUID, steps []traceFunnels.FunnelStep) error {
|
||||||
|
// _, err := store.
|
||||||
|
// sqlstore.
|
||||||
|
// BunDB().
|
||||||
|
// NewUpdate().
|
||||||
|
// Model((*traceFunnels.Funnel)(nil)).
|
||||||
|
// Set("steps = ?", steps).
|
||||||
|
// Where("id = ?", funnelID).
|
||||||
|
// Exec(ctx)
|
||||||
|
// if err != nil {
|
||||||
|
// return fmt.Errorf("failed to update funnel steps: %v", err)
|
||||||
|
// }
|
||||||
|
// return nil
|
||||||
|
//}
|
||||||
|
|
||||||
|
// UpdateMetadata updates the metadata of a funnel
|
||||||
|
//func (store *store) UpdateMetadata(ctx context.Context, funnelID valuer.UUID, name, description string, userID string) error {
|
||||||
|
//
|
||||||
|
// // First get the current funnel to preserve other fields
|
||||||
|
// funnel := &traceFunnels.Funnel{}
|
||||||
|
// err := store.
|
||||||
|
// sqlstore.
|
||||||
|
// BunDB().
|
||||||
|
// NewSelect().
|
||||||
|
// Model(funnel).
|
||||||
|
// Where("id = ?", funnelID).
|
||||||
|
// Scan(ctx)
|
||||||
|
// if err != nil {
|
||||||
|
// return fmt.Errorf("failed to get funnel: %v", err)
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// // Update the fields
|
||||||
|
// funnel.Name = name
|
||||||
|
// funnel.Description = description
|
||||||
|
// funnel.UpdatedAt = time.Now()
|
||||||
|
// funnel.UpdatedBy = userID
|
||||||
|
//
|
||||||
|
// // Save the updated funnel
|
||||||
|
// _, err = store.
|
||||||
|
// sqlstore.
|
||||||
|
// BunDB().
|
||||||
|
// NewUpdate().
|
||||||
|
// Model(funnel).
|
||||||
|
// WherePK().
|
||||||
|
// Exec(ctx)
|
||||||
|
// if err != nil {
|
||||||
|
// return fmt.Errorf("failed to update funnel metadata: %v", err)
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// // Verify the update
|
||||||
|
// updatedFunnel := &traceFunnels.Funnel{}
|
||||||
|
// err = store.
|
||||||
|
// sqlstore.
|
||||||
|
// BunDB().
|
||||||
|
// NewSelect().
|
||||||
|
// Model(updatedFunnel).
|
||||||
|
// Where("id = ?", funnelID).
|
||||||
|
// Scan(ctx)
|
||||||
|
// if err != nil {
|
||||||
|
// return fmt.Errorf("failed to verify update: %v", err)
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// return nil
|
||||||
|
//}
|
||||||
442
pkg/modules/tracefunnel/query.go
Normal file
442
pkg/modules/tracefunnel/query.go
Normal file
@@ -0,0 +1,442 @@
|
|||||||
|
package tracefunnel
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
|
||||||
|
tracefunnel "github.com/SigNoz/signoz/pkg/types/tracefunnel"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
// GetSlowestTraces builds a ClickHouse query to get the slowest traces between two steps
|
||||||
|
func GetSlowestTraces(funnel *tracefunnel.Funnel, stepAOrder, stepBOrder int64, timeRange tracefunnel.TimeRange, withErrors bool) (*v3.ClickHouseQuery, error) {
|
||||||
|
// Find steps by order
|
||||||
|
var stepA, stepB *tracefunnel.FunnelStep
|
||||||
|
for i := range funnel.Steps {
|
||||||
|
if funnel.Steps[i].Order == stepAOrder {
|
||||||
|
stepA = &funnel.Steps[i]
|
||||||
|
}
|
||||||
|
if funnel.Steps[i].Order == stepBOrder {
|
||||||
|
stepB = &funnel.Steps[i]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if stepA == nil || stepB == nil {
|
||||||
|
return nil, fmt.Errorf("step not found")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build having clause based on withErrors flag
|
||||||
|
havingClause := ""
|
||||||
|
if withErrors {
|
||||||
|
havingClause = "HAVING has_error = 1"
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build filter strings for each step
|
||||||
|
stepAFilters := ""
|
||||||
|
if stepA.Filters != nil && len(stepA.Filters.Items) > 0 {
|
||||||
|
// ToDO: need to implement where clause filtering with minimal code duplication
|
||||||
|
stepAFilters = "/* Custom filters for step A would be applied here */"
|
||||||
|
}
|
||||||
|
|
||||||
|
stepBFilters := ""
|
||||||
|
if stepB.Filters != nil && len(stepB.Filters.Items) > 0 {
|
||||||
|
// ToDO: need to implement where clause filtering with minimal code duplication
|
||||||
|
stepBFilters = "/* Custom filters for step B would be applied here */"
|
||||||
|
}
|
||||||
|
|
||||||
|
query := fmt.Sprintf(`
|
||||||
|
WITH
|
||||||
|
toUInt64(%d) AS start_time,
|
||||||
|
toUInt64(%d) AS end_time,
|
||||||
|
toString(intDiv(start_time, 1000000000) - 1800) AS tsBucketStart,
|
||||||
|
toString(intDiv(end_time, 1000000000)) AS tsBucketEnd
|
||||||
|
SELECT
|
||||||
|
trace_id,
|
||||||
|
concat(toString((max_end_time_ns - min_start_time_ns) / 1e6), ' ms') AS duration_ms,
|
||||||
|
COUNT(*) AS span_count
|
||||||
|
FROM (
|
||||||
|
SELECT
|
||||||
|
s1.trace_id,
|
||||||
|
MIN(toUnixTimestamp64Nano(s1.timestamp)) AS min_start_time_ns,
|
||||||
|
MAX(toUnixTimestamp64Nano(s2.timestamp) + s2.duration_nano) AS max_end_time_ns,
|
||||||
|
MAX(s1.has_error OR s2.has_error) AS has_error
|
||||||
|
FROM %s AS s1
|
||||||
|
JOIN %s AS s2
|
||||||
|
ON s1.trace_id = s2.trace_id
|
||||||
|
WHERE s1.resource_string_service$$name = '%s'
|
||||||
|
AND s1.name = '%s'
|
||||||
|
AND s2.resource_string_service$$name = '%s'
|
||||||
|
AND s2.name = '%s'
|
||||||
|
AND s1.timestamp BETWEEN toString(start_time) AND toString(end_time)
|
||||||
|
AND s1.ts_bucket_start BETWEEN tsBucketStart AND tsBucketEnd
|
||||||
|
AND s2.timestamp BETWEEN toString(start_time) AND toString(end_time)
|
||||||
|
AND s2.ts_bucket_start BETWEEN tsBucketStart AND tsBucketEnd
|
||||||
|
%s
|
||||||
|
%s
|
||||||
|
GROUP BY s1.trace_id
|
||||||
|
%s
|
||||||
|
) AS trace_durations
|
||||||
|
JOIN %s AS spans
|
||||||
|
ON spans.trace_id = trace_durations.trace_id
|
||||||
|
WHERE spans.timestamp BETWEEN toString(start_time) AND toString(end_time)
|
||||||
|
AND spans.ts_bucket_start BETWEEN tsBucketStart AND tsBucketEnd
|
||||||
|
GROUP BY trace_id, duration_ms
|
||||||
|
ORDER BY CAST(replaceRegexpAll(duration_ms, ' ms$', '') AS Float64) DESC
|
||||||
|
LIMIT 5`,
|
||||||
|
timeRange.StartTime,
|
||||||
|
timeRange.EndTime,
|
||||||
|
TracesTable,
|
||||||
|
TracesTable,
|
||||||
|
escapeString(stepA.ServiceName),
|
||||||
|
escapeString(stepA.SpanName),
|
||||||
|
escapeString(stepB.ServiceName),
|
||||||
|
escapeString(stepB.SpanName),
|
||||||
|
stepAFilters,
|
||||||
|
stepBFilters,
|
||||||
|
havingClause,
|
||||||
|
TracesTable,
|
||||||
|
)
|
||||||
|
|
||||||
|
return &v3.ClickHouseQuery{
|
||||||
|
Query: query,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetStepAnalytics builds a ClickHouse query to get analytics for each step
|
||||||
|
func GetStepAnalytics(funnel *tracefunnel.Funnel, timeRange tracefunnel.TimeRange) (*v3.ClickHouseQuery, error) {
|
||||||
|
if len(funnel.Steps) == 0 {
|
||||||
|
return nil, fmt.Errorf("funnel has no steps")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build funnel steps array
|
||||||
|
var steps []string
|
||||||
|
for _, step := range funnel.Steps {
|
||||||
|
steps = append(steps, fmt.Sprintf("('%s', '%s')",
|
||||||
|
escapeString(step.ServiceName), escapeString(step.SpanName)))
|
||||||
|
}
|
||||||
|
stepsArray := fmt.Sprintf("array(%s)", strings.Join(steps, ","))
|
||||||
|
|
||||||
|
// Build step CTEs
|
||||||
|
var stepCTEs []string
|
||||||
|
for i, step := range funnel.Steps {
|
||||||
|
filterStr := ""
|
||||||
|
if step.Filters != nil && len(step.Filters.Items) > 0 {
|
||||||
|
// ToDO: need to implement where clause filtering with minimal code duplication
|
||||||
|
filterStr = "/* Custom filters would be applied here */"
|
||||||
|
}
|
||||||
|
|
||||||
|
cte := fmt.Sprintf(`
|
||||||
|
step%d_traces AS (
|
||||||
|
SELECT DISTINCT trace_id
|
||||||
|
FROM %s
|
||||||
|
WHERE resource_string_service$$name = '%s'
|
||||||
|
AND name = '%s'
|
||||||
|
AND timestamp BETWEEN toString(start_time) AND toString(end_time)
|
||||||
|
AND ts_bucket_start BETWEEN tsBucketStart AND tsBucketEnd
|
||||||
|
%s
|
||||||
|
)`,
|
||||||
|
i+1,
|
||||||
|
TracesTable,
|
||||||
|
escapeString(step.ServiceName),
|
||||||
|
escapeString(step.SpanName),
|
||||||
|
filterStr,
|
||||||
|
)
|
||||||
|
stepCTEs = append(stepCTEs, cte)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build intersecting traces CTE
|
||||||
|
var intersections []string
|
||||||
|
for i := 1; i <= len(funnel.Steps); i++ {
|
||||||
|
intersections = append(intersections, fmt.Sprintf("SELECT trace_id FROM step%d_traces", i))
|
||||||
|
}
|
||||||
|
intersectingTracesCTE := fmt.Sprintf(`
|
||||||
|
intersecting_traces AS (
|
||||||
|
%s
|
||||||
|
)`,
|
||||||
|
strings.Join(intersections, "\nINTERSECT\n"),
|
||||||
|
)
|
||||||
|
|
||||||
|
// Build CASE expressions for each step
|
||||||
|
var caseExpressions []string
|
||||||
|
for i, step := range funnel.Steps {
|
||||||
|
totalSpansExpr := fmt.Sprintf(`
|
||||||
|
COUNT(CASE WHEN resource_string_service$$name = '%s'
|
||||||
|
AND name = '%s'
|
||||||
|
THEN trace_id END) AS total_s%d_spans`,
|
||||||
|
escapeString(step.ServiceName), escapeString(step.SpanName), i+1)
|
||||||
|
|
||||||
|
erroredSpansExpr := fmt.Sprintf(`
|
||||||
|
COUNT(CASE WHEN resource_string_service$$name = '%s'
|
||||||
|
AND name = '%s'
|
||||||
|
AND has_error = true
|
||||||
|
THEN trace_id END) AS total_s%d_errored_spans`,
|
||||||
|
escapeString(step.ServiceName), escapeString(step.SpanName), i+1)
|
||||||
|
|
||||||
|
caseExpressions = append(caseExpressions, totalSpansExpr, erroredSpansExpr)
|
||||||
|
}
|
||||||
|
|
||||||
|
query := fmt.Sprintf(`
|
||||||
|
WITH
|
||||||
|
toUInt64(%d) AS start_time,
|
||||||
|
toUInt64(%d) AS end_time,
|
||||||
|
toString(intDiv(start_time, 1000000000) - 1800) AS tsBucketStart,
|
||||||
|
toString(intDiv(end_time, 1000000000)) AS tsBucketEnd,
|
||||||
|
%s AS funnel_steps,
|
||||||
|
%s,
|
||||||
|
%s
|
||||||
|
SELECT
|
||||||
|
%s
|
||||||
|
FROM %s
|
||||||
|
WHERE trace_id IN (SELECT trace_id FROM intersecting_traces)
|
||||||
|
AND timestamp BETWEEN toString(start_time) AND toString(end_time)
|
||||||
|
AND ts_bucket_start BETWEEN tsBucketStart AND tsBucketEnd`,
|
||||||
|
timeRange.StartTime,
|
||||||
|
timeRange.EndTime,
|
||||||
|
stepsArray,
|
||||||
|
strings.Join(stepCTEs, ",\n"),
|
||||||
|
intersectingTracesCTE,
|
||||||
|
strings.Join(caseExpressions, ",\n "),
|
||||||
|
TracesTable,
|
||||||
|
)
|
||||||
|
|
||||||
|
return &v3.ClickHouseQuery{
|
||||||
|
Query: query,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ValidateTracesWithLatency builds a ClickHouse query to validate traces with latency information
|
||||||
|
func ValidateTracesWithLatency(funnel *tracefunnel.Funnel, timeRange tracefunnel.TimeRange) (*v3.ClickHouseQuery, error) {
|
||||||
|
filters, err := buildFunnelFiltersWithLatency(funnel)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("error building funnel filters with latency: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
query := generateFunnelSQLWithLatency(timeRange.StartTime, timeRange.EndTime, filters)
|
||||||
|
|
||||||
|
return &v3.ClickHouseQuery{
|
||||||
|
Query: query,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func generateFunnelSQLWithLatency(start, end int64, filters []tracefunnel.FunnelStepFilter) string {
|
||||||
|
var expressions []string
|
||||||
|
|
||||||
|
// Convert timestamps to nanoseconds
|
||||||
|
startTime := fmt.Sprintf("toUInt64(%d)", start)
|
||||||
|
endTime := fmt.Sprintf("toUInt64(%d)", end)
|
||||||
|
|
||||||
|
expressions = append(expressions, fmt.Sprintf("%s AS start_time", startTime))
|
||||||
|
expressions = append(expressions, fmt.Sprintf("%s AS end_time", endTime))
|
||||||
|
expressions = append(expressions, "toString(intDiv(start_time, 1000000000) - 1800) AS tsBucketStart")
|
||||||
|
expressions = append(expressions, "toString(intDiv(end_time, 1000000000)) AS tsBucketEnd")
|
||||||
|
expressions = append(expressions, "(end_time - start_time) / 1e9 AS total_time_seconds")
|
||||||
|
|
||||||
|
// Define step configurations dynamically
|
||||||
|
for _, f := range filters {
|
||||||
|
expressions = append(expressions, fmt.Sprintf("('%s', '%s') AS s%d_config",
|
||||||
|
escapeString(f.ServiceName),
|
||||||
|
escapeString(f.SpanName),
|
||||||
|
f.StepNumber))
|
||||||
|
}
|
||||||
|
|
||||||
|
withClause := "WITH \n" + strings.Join(expressions, ",\n") + "\n"
|
||||||
|
|
||||||
|
// Build step raw expressions and cumulative logic
|
||||||
|
var stepRaws []string
|
||||||
|
var cumulativeLogic []string
|
||||||
|
var filterConditions []string
|
||||||
|
|
||||||
|
stepCount := len(filters)
|
||||||
|
|
||||||
|
// Build raw step detection
|
||||||
|
for i := 1; i <= stepCount; i++ {
|
||||||
|
stepRaws = append(stepRaws, fmt.Sprintf(
|
||||||
|
"MAX(CASE WHEN (resource_string_service$$name, name) = s%d_config THEN 1 ELSE 0 END) AS has_s%d_raw", i, i))
|
||||||
|
filterConditions = append(filterConditions, fmt.Sprintf("s%d_config", i))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build cumulative IF logic
|
||||||
|
for i := 1; i <= stepCount; i++ {
|
||||||
|
if i == 1 {
|
||||||
|
cumulativeLogic = append(cumulativeLogic, fmt.Sprintf(`
|
||||||
|
IF(MAX(CASE WHEN (resource_string_service$$name, name) = s1_config THEN 1 ELSE 0 END) = 1, 1, 0) AS has_s1`))
|
||||||
|
} else {
|
||||||
|
innerIf := "IF(MAX(CASE WHEN (resource_string_service$$name, name) = s1_config THEN 1 ELSE 0 END) = 1, 1, 0)"
|
||||||
|
for j := 2; j < i; j++ {
|
||||||
|
innerIf = fmt.Sprintf(`IF(%s = 1 AND MAX(CASE WHEN (resource_string_service$$name, name) = s%d_config THEN 1 ELSE 0 END) = 1, 1, 0)`, innerIf, j)
|
||||||
|
}
|
||||||
|
cumulativeLogic = append(cumulativeLogic, fmt.Sprintf(`
|
||||||
|
IF(
|
||||||
|
%s = 1 AND MAX(CASE WHEN (resource_string_service$$name, name) = s%d_config THEN 1 ELSE 0 END) = 1,
|
||||||
|
1, 0
|
||||||
|
) AS has_s%d`, innerIf, i, i))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Final SELECT counts using FILTER clauses
|
||||||
|
var stepCounts []string
|
||||||
|
for i := 1; i <= stepCount; i++ {
|
||||||
|
stepCounts = append(stepCounts, fmt.Sprintf("COUNT(DISTINCT trace_id) FILTER (WHERE has_s%d = 1) AS step%d_count", i, i))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Final query assembly
|
||||||
|
lastStep := fmt.Sprint(stepCount)
|
||||||
|
query := withClause + `
|
||||||
|
SELECT
|
||||||
|
` + strings.Join(stepCounts, ",\n ") + `,
|
||||||
|
|
||||||
|
IF(total_time_seconds = 0 OR COUNT(DISTINCT trace_id) FILTER (WHERE has_s` + lastStep + ` = 1) = 0, 0,
|
||||||
|
COUNT(DISTINCT trace_id) FILTER (WHERE has_s` + lastStep + ` = 1) / total_time_seconds
|
||||||
|
) AS avg_rate,
|
||||||
|
|
||||||
|
COUNT(DISTINCT trace_id) FILTER (WHERE has_s` + lastStep + ` = 1 AND has_error = true) AS errors,
|
||||||
|
|
||||||
|
IF(COUNT(*) = 0, 0, avg(trace_duration)) AS avg_duration,
|
||||||
|
|
||||||
|
IF(COUNT(*) = 0, 0, quantile(0.99)(trace_duration)) AS p99_latency,
|
||||||
|
|
||||||
|
IF(COUNT(DISTINCT trace_id) FILTER (WHERE has_s1 = 1) = 0, 0,
|
||||||
|
100.0 * COUNT(DISTINCT trace_id) FILTER (WHERE has_s` + lastStep + ` = 1) /
|
||||||
|
COUNT(DISTINCT trace_id) FILTER (WHERE has_s1 = 1)
|
||||||
|
) AS conversion_rate
|
||||||
|
|
||||||
|
FROM (
|
||||||
|
SELECT
|
||||||
|
trace_id,
|
||||||
|
MAX(has_error) AS has_error,
|
||||||
|
` + strings.Join(stepRaws, ",\n ") + `,
|
||||||
|
MAX(toUnixTimestamp64Nano(timestamp) + duration_nano) - MIN(toUnixTimestamp64Nano(timestamp)) AS trace_duration,
|
||||||
|
` + strings.Join(cumulativeLogic, ",\n ") + `
|
||||||
|
FROM ` + TracesTable + `
|
||||||
|
WHERE
|
||||||
|
timestamp BETWEEN toString(start_time) AND toString(end_time)
|
||||||
|
AND ts_bucket_start BETWEEN tsBucketStart AND tsBucketEnd
|
||||||
|
AND (resource_string_service$$name, name) IN (` + strings.Join(filterConditions, ", ") + `)
|
||||||
|
GROUP BY trace_id
|
||||||
|
) AS funnel_data;`
|
||||||
|
|
||||||
|
return query
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildFunnelFiltersWithLatency(funnel *tracefunnel.Funnel) ([]tracefunnel.FunnelStepFilter, error) {
|
||||||
|
if funnel == nil {
|
||||||
|
return nil, fmt.Errorf("funnel cannot be nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(funnel.Steps) == 0 {
|
||||||
|
return nil, fmt.Errorf("funnel must have at least one step")
|
||||||
|
}
|
||||||
|
|
||||||
|
filters := make([]tracefunnel.FunnelStepFilter, len(funnel.Steps))
|
||||||
|
|
||||||
|
for i, step := range funnel.Steps {
|
||||||
|
latencyPointer := "start" // Default value
|
||||||
|
if step.LatencyPointer != "" {
|
||||||
|
latencyPointer = step.LatencyPointer
|
||||||
|
}
|
||||||
|
|
||||||
|
filters[i] = tracefunnel.FunnelStepFilter{
|
||||||
|
StepNumber: i + 1,
|
||||||
|
ServiceName: step.ServiceName,
|
||||||
|
SpanName: step.SpanName,
|
||||||
|
LatencyPointer: latencyPointer,
|
||||||
|
CustomFilters: step.Filters,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return filters, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildFunnelFilters(funnel *tracefunnel.Funnel) ([]tracefunnel.FunnelStepFilter, error) {
|
||||||
|
if funnel == nil {
|
||||||
|
return nil, fmt.Errorf("funnel cannot be nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(funnel.Steps) == 0 {
|
||||||
|
return nil, fmt.Errorf("funnel must have at least one step")
|
||||||
|
}
|
||||||
|
|
||||||
|
filters := make([]tracefunnel.FunnelStepFilter, len(funnel.Steps))
|
||||||
|
|
||||||
|
for i, step := range funnel.Steps {
|
||||||
|
filters[i] = tracefunnel.FunnelStepFilter{
|
||||||
|
StepNumber: i + 1,
|
||||||
|
ServiceName: step.ServiceName,
|
||||||
|
SpanName: step.SpanName,
|
||||||
|
CustomFilters: step.Filters,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return filters, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func escapeString(s string) string {
|
||||||
|
// Replace single quotes with double single quotes to escape them in SQL
|
||||||
|
return strings.ReplaceAll(s, "'", "''")
|
||||||
|
}
|
||||||
|
|
||||||
|
const TracesTable = "signoz_traces.signoz_index_v3"
|
||||||
|
|
||||||
|
func generateFunnelSQL(start, end int64, filters []tracefunnel.FunnelStepFilter) string {
|
||||||
|
var expressions []string
|
||||||
|
|
||||||
|
// Basic time expressions.
|
||||||
|
expressions = append(expressions, fmt.Sprintf("toUInt64(%d) AS start_time", start))
|
||||||
|
expressions = append(expressions, fmt.Sprintf("toUInt64(%d) AS end_time", end))
|
||||||
|
expressions = append(expressions, "toString(intDiv(start_time, 1000000000) - 1800) AS tsBucketStart")
|
||||||
|
expressions = append(expressions, "toString(intDiv(end_time, 1000000000)) AS tsBucketEnd")
|
||||||
|
|
||||||
|
// Add service and span alias definitions from each filter.
|
||||||
|
for _, f := range filters {
|
||||||
|
expressions = append(expressions, fmt.Sprintf("'%s' AS service_%d", escapeString(f.ServiceName), f.StepNumber))
|
||||||
|
expressions = append(expressions, fmt.Sprintf("'%s' AS span_%d", escapeString(f.SpanName), f.StepNumber))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add the CTE for each step.
|
||||||
|
for _, f := range filters {
|
||||||
|
cte := fmt.Sprintf(`step%d_traces AS (
|
||||||
|
SELECT DISTINCT trace_id
|
||||||
|
FROM %s
|
||||||
|
WHERE serviceName = service_%d
|
||||||
|
AND name = span_%d
|
||||||
|
AND timestamp BETWEEN toString(start_time) AND toString(end_time)
|
||||||
|
AND ts_bucket_start BETWEEN tsBucketStart AND tsBucketEnd
|
||||||
|
)`, f.StepNumber, TracesTable, f.StepNumber, f.StepNumber)
|
||||||
|
expressions = append(expressions, cte)
|
||||||
|
}
|
||||||
|
|
||||||
|
withClause := "WITH \n" + strings.Join(expressions, ",\n") + "\n"
|
||||||
|
|
||||||
|
// Build the intersect clause for each step.
|
||||||
|
var intersectQueries []string
|
||||||
|
for _, f := range filters {
|
||||||
|
intersectQueries = append(intersectQueries, fmt.Sprintf("SELECT trace_id FROM step%d_traces", f.StepNumber))
|
||||||
|
}
|
||||||
|
intersectClause := strings.Join(intersectQueries, "\nINTERSECT\n")
|
||||||
|
|
||||||
|
query := withClause + `
|
||||||
|
SELECT trace_id
|
||||||
|
FROM ` + TracesTable + `
|
||||||
|
WHERE trace_id IN (
|
||||||
|
` + intersectClause + `
|
||||||
|
)
|
||||||
|
AND timestamp BETWEEN toString(start_time) AND toString(end_time)
|
||||||
|
AND ts_bucket_start BETWEEN tsBucketStart AND tsBucketEnd
|
||||||
|
GROUP BY trace_id
|
||||||
|
LIMIT 5
|
||||||
|
`
|
||||||
|
return query
|
||||||
|
}
|
||||||
|
|
||||||
|
// ValidateTraces builds a ClickHouse query to validate traces in a funnel
|
||||||
|
func ValidateTraces(funnel *tracefunnel.Funnel, timeRange tracefunnel.TimeRange) (*v3.ClickHouseQuery, error) {
|
||||||
|
filters, err := buildFunnelFilters(funnel)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("error building funnel filters: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
query := generateFunnelSQL(timeRange.StartTime, timeRange.EndTime, filters)
|
||||||
|
|
||||||
|
return &v3.ClickHouseQuery{
|
||||||
|
Query: query,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
65
pkg/modules/tracefunnel/tracefunnel.go
Normal file
65
pkg/modules/tracefunnel/tracefunnel.go
Normal file
@@ -0,0 +1,65 @@
|
|||||||
|
package tracefunnel
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"net/http"
|
||||||
|
|
||||||
|
traceFunnels "github.com/SigNoz/signoz/pkg/types/tracefunnel"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Module defines the interface for trace funnel operations
|
||||||
|
type Module interface {
|
||||||
|
// operations on funnel
|
||||||
|
Create(ctx context.Context, timestamp int64, name string, userID string, orgID string) (*traceFunnels.Funnel, error)
|
||||||
|
|
||||||
|
Get(ctx context.Context, funnelID string) (*traceFunnels.Funnel, error)
|
||||||
|
|
||||||
|
Update(ctx context.Context, funnel *traceFunnels.Funnel, userID string) error
|
||||||
|
|
||||||
|
List(ctx context.Context, orgID string) ([]*traceFunnels.Funnel, error)
|
||||||
|
|
||||||
|
Delete(ctx context.Context, funnelID string) error
|
||||||
|
|
||||||
|
Save(ctx context.Context, funnel *traceFunnels.Funnel, userID string, orgID string) error
|
||||||
|
|
||||||
|
GetFunnelMetadata(ctx context.Context, funnelID string) (int64, int64, string, error)
|
||||||
|
//
|
||||||
|
//GetFunnelAnalytics(ctx context.Context, funnel *traceFunnels.Funnel, timeRange traceFunnels.TimeRange) (*traceFunnels.FunnelAnalytics, error)
|
||||||
|
//
|
||||||
|
//GetStepAnalytics(ctx context.Context, funnel *traceFunnels.Funnel, timeRange traceFunnels.TimeRange) (*traceFunnels.FunnelAnalytics, error)
|
||||||
|
//
|
||||||
|
//GetSlowestTraces(ctx context.Context, funnel *traceFunnels.Funnel, stepAOrder, stepBOrder int64, timeRange traceFunnels.TimeRange, isError bool) (*traceFunnels.ValidTracesResponse, error)
|
||||||
|
|
||||||
|
// updates funnel metadata
|
||||||
|
//UpdateMetadata(ctx context.Context, funnelID valuer.UUID, name, description string, userID string) error
|
||||||
|
|
||||||
|
// validates funnel
|
||||||
|
//ValidateTraces(ctx context.Context, funnel *traceFunnels.Funnel, timeRange traceFunnels.TimeRange) ([]*v3.Row, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type Handler interface {
|
||||||
|
// CRUD on funnel
|
||||||
|
New(http.ResponseWriter, *http.Request)
|
||||||
|
|
||||||
|
Update(http.ResponseWriter, *http.Request)
|
||||||
|
|
||||||
|
List(http.ResponseWriter, *http.Request)
|
||||||
|
|
||||||
|
Get(http.ResponseWriter, *http.Request)
|
||||||
|
|
||||||
|
Delete(http.ResponseWriter, *http.Request)
|
||||||
|
|
||||||
|
Save(http.ResponseWriter, *http.Request)
|
||||||
|
|
||||||
|
// validator handlers
|
||||||
|
//ValidateTraces(http.ResponseWriter, *http.Request)
|
||||||
|
//
|
||||||
|
//// Analytics handlers
|
||||||
|
//FunnelAnalytics(http.ResponseWriter, *http.Request)
|
||||||
|
//
|
||||||
|
//StepAnalytics(http.ResponseWriter, *http.Request)
|
||||||
|
//
|
||||||
|
//SlowestTraces(http.ResponseWriter, *http.Request)
|
||||||
|
//
|
||||||
|
//ErrorTraces(http.ResponseWriter, *http.Request)
|
||||||
|
}
|
||||||
171
pkg/modules/tracefunnel/utils.go
Normal file
171
pkg/modules/tracefunnel/utils.go
Normal file
@@ -0,0 +1,171 @@
|
|||||||
|
package tracefunnel
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
tracefunnel "github.com/SigNoz/signoz/pkg/types/tracefunnel"
|
||||||
|
"sort"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ValidateTimestamp validates a timestamp
|
||||||
|
func ValidateTimestamp(timestamp int64, fieldName string) error {
|
||||||
|
if timestamp == 0 {
|
||||||
|
return fmt.Errorf("%s is required", fieldName)
|
||||||
|
}
|
||||||
|
if timestamp < 0 {
|
||||||
|
return fmt.Errorf("%s must be positive", fieldName)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ValidateTimestampIsMilliseconds validates that a timestamp is in milliseconds
|
||||||
|
func ValidateTimestampIsMilliseconds(timestamp int64) bool {
|
||||||
|
// Check if timestamp is in milliseconds (13 digits)
|
||||||
|
return timestamp >= 1000000000000 && timestamp <= 9999999999999
|
||||||
|
}
|
||||||
|
|
||||||
|
// ValidateFunnelSteps validates funnel steps
|
||||||
|
func ValidateFunnelSteps(steps []tracefunnel.FunnelStep) error {
|
||||||
|
if len(steps) < 2 {
|
||||||
|
return fmt.Errorf("funnel must have at least 2 steps")
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, step := range steps {
|
||||||
|
if step.ServiceName == "" {
|
||||||
|
return fmt.Errorf("step %d: service name is required", i+1)
|
||||||
|
}
|
||||||
|
if step.SpanName == "" {
|
||||||
|
return fmt.Errorf("step %d: span name is required", i+1)
|
||||||
|
}
|
||||||
|
if step.Order < 0 {
|
||||||
|
return fmt.Errorf("step %d: order must be non-negative", i+1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// NormalizeFunnelSteps normalizes step orders to be sequential
|
||||||
|
func NormalizeFunnelSteps(steps []tracefunnel.FunnelStep) []tracefunnel.FunnelStep {
|
||||||
|
// Sort steps by order
|
||||||
|
sort.Slice(steps, func(i, j int) bool {
|
||||||
|
return steps[i].Order < steps[j].Order
|
||||||
|
})
|
||||||
|
|
||||||
|
// Normalize orders to be sequential
|
||||||
|
for i := range steps {
|
||||||
|
steps[i].Order = int64(i + 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
return steps
|
||||||
|
}
|
||||||
|
|
||||||
|
//// ValidateSteps checks if the requested steps exist in the funnel
|
||||||
|
//func ValidateSteps(funnel *tracefunnel.Funnel, stepAOrder, stepBOrder int64) error {
|
||||||
|
// stepAExists, stepBExists := false, false
|
||||||
|
// for _, step := range funnel.Steps {
|
||||||
|
// if step.Order == stepAOrder {
|
||||||
|
// stepAExists = true
|
||||||
|
// }
|
||||||
|
// if step.Order == stepBOrder {
|
||||||
|
// stepBExists = true
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// if !stepAExists || !stepBExists {
|
||||||
|
// return fmt.Errorf("one or both steps not found. Step A Order: %d, Step B Order: %d", stepAOrder, stepBOrder)
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// return nil
|
||||||
|
//}
|
||||||
|
|
||||||
|
//// ValidateFunnel validates a funnel's data
|
||||||
|
//func ValidateFunnel(funnel *tracefunnel.Funnel) error {
|
||||||
|
// if funnel == nil {
|
||||||
|
// return fmt.Errorf("funnel cannot be nil")
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// if len(funnel.Steps) < 2 {
|
||||||
|
// return fmt.Errorf("funnel must have at least 2 steps")
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// // Validate each step
|
||||||
|
// for i, step := range funnel.Steps {
|
||||||
|
// if err := ValidateStep(step, i+1); err != nil {
|
||||||
|
// return err
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// return nil
|
||||||
|
//}
|
||||||
|
|
||||||
|
// ValidateStep validates a single funnel step
|
||||||
|
//func ValidateStep(step tracefunnel.FunnelStep, stepNum int) error {
|
||||||
|
// if step.ServiceName == "" {
|
||||||
|
// return fmt.Errorf("step %d: service name is required", stepNum)
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// if step.SpanName == "" {
|
||||||
|
// return fmt.Errorf("step %d: span name is required", stepNum)
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// if step.Order < 0 {
|
||||||
|
// return fmt.Errorf("step %d: order must be non-negative", stepNum)
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// return nil
|
||||||
|
//}
|
||||||
|
//
|
||||||
|
//// ValidateTimeRange validates a time range
|
||||||
|
//func ValidateTimeRange(timeRange tracefunnel.TimeRange) error {
|
||||||
|
// if timeRange.StartTime <= 0 {
|
||||||
|
// return fmt.Errorf("start time must be positive")
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// if timeRange.EndTime <= 0 {
|
||||||
|
// return fmt.Errorf("end time must be positive")
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// if timeRange.EndTime < timeRange.StartTime {
|
||||||
|
// return fmt.Errorf("end time must be after start time")
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// // Check if the time range is not too far in the future
|
||||||
|
// now := time.Now().UnixNano() / 1000000 // Convert to milliseconds
|
||||||
|
// if timeRange.EndTime > now {
|
||||||
|
// return fmt.Errorf("end time cannot be in the future")
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// // Check if the time range is not too old (e.g., more than 30 days)
|
||||||
|
// maxAge := int64(30 * 24 * 60 * 60 * 1000) // 30 days in milliseconds
|
||||||
|
// if now-timeRange.StartTime > maxAge {
|
||||||
|
// return fmt.Errorf("time range cannot be older than 30 days")
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// return nil
|
||||||
|
//}
|
||||||
|
//
|
||||||
|
//// ValidateStepOrder validates that step orders are sequential
|
||||||
|
//func ValidateStepOrder(steps []tracefunnel.FunnelStep) error {
|
||||||
|
// if len(steps) < 2 {
|
||||||
|
// return nil
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// // Create a map to track used orders
|
||||||
|
// usedOrders := make(map[int64]bool)
|
||||||
|
//
|
||||||
|
// for i, step := range steps {
|
||||||
|
// if usedOrders[step.Order] {
|
||||||
|
// return fmt.Errorf("duplicate step order %d at step %d", step.Order, i+1)
|
||||||
|
// }
|
||||||
|
// usedOrders[step.Order] = true
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// // Check if orders are sequential
|
||||||
|
// for i := 0; i < len(steps)-1; i++ {
|
||||||
|
// if steps[i+1].Order != steps[i].Order+1 {
|
||||||
|
// return fmt.Errorf("step orders must be sequential")
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// return nil
|
||||||
|
//}
|
||||||
@@ -23,9 +23,11 @@ import (
|
|||||||
errorsV2 "github.com/SigNoz/signoz/pkg/errors"
|
errorsV2 "github.com/SigNoz/signoz/pkg/errors"
|
||||||
"github.com/SigNoz/signoz/pkg/http/middleware"
|
"github.com/SigNoz/signoz/pkg/http/middleware"
|
||||||
"github.com/SigNoz/signoz/pkg/http/render"
|
"github.com/SigNoz/signoz/pkg/http/render"
|
||||||
|
tracefunnels "github.com/SigNoz/signoz/pkg/modules/tracefunnel"
|
||||||
"github.com/SigNoz/signoz/pkg/query-service/app/integrations"
|
"github.com/SigNoz/signoz/pkg/query-service/app/integrations"
|
||||||
"github.com/SigNoz/signoz/pkg/query-service/app/metricsexplorer"
|
"github.com/SigNoz/signoz/pkg/query-service/app/metricsexplorer"
|
||||||
"github.com/SigNoz/signoz/pkg/signoz"
|
"github.com/SigNoz/signoz/pkg/signoz"
|
||||||
|
traceFunnels "github.com/SigNoz/signoz/pkg/types/tracefunnel"
|
||||||
"github.com/SigNoz/signoz/pkg/valuer"
|
"github.com/SigNoz/signoz/pkg/valuer"
|
||||||
"github.com/prometheus/prometheus/promql"
|
"github.com/prometheus/prometheus/promql"
|
||||||
|
|
||||||
@@ -5535,3 +5537,207 @@ func (aH *APIHandler) getDomainInfo(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
aH.Respond(w, resp)
|
aH.Respond(w, resp)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RegisterTraceFunnelsRoutes adds trace funnels routes
|
||||||
|
func (aH *APIHandler) RegisterTraceFunnelsRoutes(router *mux.Router, am *middleware.AuthZ) {
|
||||||
|
// Main trace funnels router
|
||||||
|
traceFunnelsRouter := router.PathPrefix("/api/v1/trace-funnels").Subrouter()
|
||||||
|
|
||||||
|
// API endpoints
|
||||||
|
traceFunnelsRouter.HandleFunc("/new",
|
||||||
|
am.ViewAccess(aH.Signoz.Handlers.TraceFunnel.New)).
|
||||||
|
Methods(http.MethodPost)
|
||||||
|
traceFunnelsRouter.HandleFunc("/list",
|
||||||
|
am.ViewAccess(aH.Signoz.Handlers.TraceFunnel.List)).
|
||||||
|
Methods(http.MethodGet)
|
||||||
|
traceFunnelsRouter.HandleFunc("/steps/update",
|
||||||
|
am.ViewAccess(aH.Signoz.Handlers.TraceFunnel.Update)).
|
||||||
|
Methods(http.MethodPut)
|
||||||
|
|
||||||
|
traceFunnelsRouter.HandleFunc("/{funnel_id}",
|
||||||
|
am.ViewAccess(aH.Signoz.Handlers.TraceFunnel.Get)).
|
||||||
|
Methods(http.MethodGet)
|
||||||
|
traceFunnelsRouter.HandleFunc("/{funnel_id}",
|
||||||
|
am.ViewAccess(aH.Signoz.Handlers.TraceFunnel.Delete)).
|
||||||
|
Methods(http.MethodDelete)
|
||||||
|
traceFunnelsRouter.HandleFunc("/save",
|
||||||
|
am.ViewAccess(aH.Signoz.Handlers.TraceFunnel.Save)).
|
||||||
|
Methods(http.MethodPost)
|
||||||
|
|
||||||
|
// Analytics endpoints
|
||||||
|
traceFunnelsRouter.HandleFunc("/{funnel_id}/analytics/validate", aH.handleValidateTraces).Methods("POST")
|
||||||
|
traceFunnelsRouter.HandleFunc("/{funnel_id}/analytics/overview", aH.handleFunnelAnalytics).Methods("POST")
|
||||||
|
traceFunnelsRouter.HandleFunc("/{funnel_id}/analytics/steps", aH.handleStepAnalytics).Methods("POST")
|
||||||
|
traceFunnelsRouter.HandleFunc("/{funnel_id}/analytics/slow-traces", aH.handleFunnelSlowTraces).Methods("POST")
|
||||||
|
traceFunnelsRouter.HandleFunc("/{funnel_id}/analytics/error-traces", aH.handleFunnelErrorTraces).Methods("POST")
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (aH *APIHandler) handleValidateTraces(w http.ResponseWriter, r *http.Request) {
|
||||||
|
vars := mux.Vars(r)
|
||||||
|
funnelID := vars["funnel_id"]
|
||||||
|
|
||||||
|
funnel, err := aH.Signoz.Modules.TraceFunnel.Get(r.Context(), funnelID)
|
||||||
|
if err != nil {
|
||||||
|
RespondError(w, &model.ApiError{Typ: model.ErrorNotFound, Err: fmt.Errorf("funnel not found: %v", err)}, nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var timeRange traceFunnels.TimeRange
|
||||||
|
if err := json.NewDecoder(r.Body).Decode(&timeRange); err != nil {
|
||||||
|
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("error decoding time range: %v", err)}, nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(funnel.Steps) < 2 {
|
||||||
|
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("funnel must have at least 2 steps")}, nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
chq, err := tracefunnels.ValidateTraces(funnel, timeRange)
|
||||||
|
if err != nil {
|
||||||
|
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error building clickhouse query: %v", err)}, nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
results, err := aH.reader.GetListResultV3(r.Context(), chq.Query)
|
||||||
|
if err != nil {
|
||||||
|
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error converting clickhouse results to list: %v", err)}, nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
aH.Respond(w, results)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (aH *APIHandler) handleFunnelAnalytics(w http.ResponseWriter, r *http.Request) {
|
||||||
|
vars := mux.Vars(r)
|
||||||
|
funnelID := vars["funnel_id"]
|
||||||
|
|
||||||
|
funnel, err := aH.Signoz.Modules.TraceFunnel.Get(r.Context(), funnelID)
|
||||||
|
if err != nil {
|
||||||
|
RespondError(w, &model.ApiError{Typ: model.ErrorNotFound, Err: fmt.Errorf("funnel not found: %v", err)}, nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var timeRange traceFunnels.TimeRange
|
||||||
|
if err := json.NewDecoder(r.Body).Decode(&timeRange); err != nil {
|
||||||
|
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("error decoding time range: %v", err)}, nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
chq, err := tracefunnels.ValidateTracesWithLatency(funnel, timeRange)
|
||||||
|
if err != nil {
|
||||||
|
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error building clickhouse query: %v", err)}, nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
results, err := aH.reader.GetListResultV3(r.Context(), chq.Query)
|
||||||
|
if err != nil {
|
||||||
|
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error converting clickhouse results to list: %v", err)}, nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
aH.Respond(w, results)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (aH *APIHandler) handleStepAnalytics(w http.ResponseWriter, r *http.Request) {
|
||||||
|
vars := mux.Vars(r)
|
||||||
|
funnelID := vars["funnel_id"]
|
||||||
|
|
||||||
|
funnel, err := aH.Signoz.Modules.TraceFunnel.Get(r.Context(), funnelID)
|
||||||
|
if err != nil {
|
||||||
|
RespondError(w, &model.ApiError{Typ: model.ErrorNotFound, Err: fmt.Errorf("funnel not found: %v", err)}, nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var timeRange traceFunnels.TimeRange
|
||||||
|
if err := json.NewDecoder(r.Body).Decode(&timeRange); err != nil {
|
||||||
|
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("error decoding time range: %v", err)}, nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
chq, err := tracefunnels.GetStepAnalytics(funnel, timeRange)
|
||||||
|
if err != nil {
|
||||||
|
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error building clickhouse query: %v", err)}, nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
results, err := aH.reader.GetListResultV3(r.Context(), chq.Query)
|
||||||
|
if err != nil {
|
||||||
|
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error converting clickhouse results to list: %v", err)}, nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
aH.Respond(w, results)
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleFunnelSlowTraces handles requests for slow traces in a funnel
|
||||||
|
func (aH *APIHandler) handleFunnelSlowTraces(w http.ResponseWriter, r *http.Request) {
|
||||||
|
aH.handleTracesWithLatency(w, r, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleFunnelErrorTraces handles requests for error traces in a funnel
|
||||||
|
func (aH *APIHandler) handleFunnelErrorTraces(w http.ResponseWriter, r *http.Request) {
|
||||||
|
aH.handleTracesWithLatency(w, r, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleTracesWithLatency handles both slow and error traces with common logic
|
||||||
|
func (aH *APIHandler) handleTracesWithLatency(w http.ResponseWriter, r *http.Request, isError bool) {
|
||||||
|
funnel, req, err := aH.validateTracesRequest(r)
|
||||||
|
if err != nil {
|
||||||
|
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := aH.validateSteps(funnel, req.StepAOrder, req.StepBOrder); err != nil {
|
||||||
|
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
chq, err := tracefunnels.GetSlowestTraces(funnel, req.StepAOrder, req.StepBOrder, req.TimeRange, isError)
|
||||||
|
if err != nil {
|
||||||
|
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error building clickhouse query: %v", err)}, nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
results, err := aH.reader.GetListResultV3(r.Context(), chq.Query)
|
||||||
|
if err != nil {
|
||||||
|
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error converting clickhouse results to list: %v", err)}, nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
aH.Respond(w, results)
|
||||||
|
}
|
||||||
|
|
||||||
|
// validateTracesRequest validates and extracts the request parameters
|
||||||
|
func (aH *APIHandler) validateTracesRequest(r *http.Request) (*traceFunnels.Funnel, *traceFunnels.StepTransitionRequest, error) {
|
||||||
|
vars := mux.Vars(r)
|
||||||
|
funnelID := vars["funnel_id"]
|
||||||
|
|
||||||
|
funnel, err := aH.Signoz.Modules.TraceFunnel.Get(r.Context(), funnelID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("funnel not found: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var req traceFunnels.StepTransitionRequest
|
||||||
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("invalid request body: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return funnel, &req, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// validateSteps checks if the requested steps exist in the funnel
|
||||||
|
func (aH *APIHandler) validateSteps(funnel *traceFunnels.Funnel, stepAOrder, stepBOrder int64) error {
|
||||||
|
stepAExists, stepBExists := false, false
|
||||||
|
for _, step := range funnel.Steps {
|
||||||
|
if step.Order == stepAOrder {
|
||||||
|
stepAExists = true
|
||||||
|
}
|
||||||
|
if step.Order == stepBOrder {
|
||||||
|
stepBExists = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !stepAExists || !stepBExists {
|
||||||
|
return fmt.Errorf("one or both steps not found. Step A Order: %d, Step B Order: %d", stepAOrder, stepBOrder)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|||||||
@@ -289,6 +289,7 @@ func (s *Server) createPublicServer(api *APIHandler, web web.Web) (*http.Server,
|
|||||||
api.RegisterMessagingQueuesRoutes(r, am)
|
api.RegisterMessagingQueuesRoutes(r, am)
|
||||||
api.RegisterThirdPartyApiRoutes(r, am)
|
api.RegisterThirdPartyApiRoutes(r, am)
|
||||||
api.MetricExplorerRoutes(r, am)
|
api.MetricExplorerRoutes(r, am)
|
||||||
|
api.RegisterTraceFunnelsRoutes(r, am)
|
||||||
|
|
||||||
c := cors.New(cors.Options{
|
c := cors.New(cors.Options{
|
||||||
AllowedOrigins: []string{"*"},
|
AllowedOrigins: []string{"*"},
|
||||||
|
|||||||
@@ -5,16 +5,20 @@ import (
|
|||||||
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
|
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
|
||||||
"github.com/SigNoz/signoz/pkg/modules/preference"
|
"github.com/SigNoz/signoz/pkg/modules/preference"
|
||||||
"github.com/SigNoz/signoz/pkg/modules/preference/implpreference"
|
"github.com/SigNoz/signoz/pkg/modules/preference/implpreference"
|
||||||
|
"github.com/SigNoz/signoz/pkg/modules/tracefunnel"
|
||||||
|
"github.com/SigNoz/signoz/pkg/modules/tracefunnel/impltracefunnel"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Handlers struct {
|
type Handlers struct {
|
||||||
Organization organization.Handler
|
Organization organization.Handler
|
||||||
Preference preference.Handler
|
Preference preference.Handler
|
||||||
|
TraceFunnel tracefunnel.Handler
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewHandlers(modules Modules) Handlers {
|
func NewHandlers(modules Modules) Handlers {
|
||||||
return Handlers{
|
return Handlers{
|
||||||
Organization: implorganization.NewHandler(modules.Organization),
|
Organization: implorganization.NewHandler(modules.Organization),
|
||||||
Preference: implpreference.NewHandler(modules.Preference),
|
Preference: implpreference.NewHandler(modules.Preference),
|
||||||
|
TraceFunnel: impltracefunnel.NewHandler(modules.TraceFunnel),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,6 +5,8 @@ import (
|
|||||||
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
|
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
|
||||||
"github.com/SigNoz/signoz/pkg/modules/preference"
|
"github.com/SigNoz/signoz/pkg/modules/preference"
|
||||||
"github.com/SigNoz/signoz/pkg/modules/preference/implpreference"
|
"github.com/SigNoz/signoz/pkg/modules/preference/implpreference"
|
||||||
|
"github.com/SigNoz/signoz/pkg/modules/tracefunnel"
|
||||||
|
"github.com/SigNoz/signoz/pkg/modules/tracefunnel/impltracefunnel"
|
||||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||||
"github.com/SigNoz/signoz/pkg/types/preferencetypes"
|
"github.com/SigNoz/signoz/pkg/types/preferencetypes"
|
||||||
)
|
)
|
||||||
@@ -12,11 +14,13 @@ import (
|
|||||||
type Modules struct {
|
type Modules struct {
|
||||||
Organization organization.Module
|
Organization organization.Module
|
||||||
Preference preference.Module
|
Preference preference.Module
|
||||||
|
TraceFunnel tracefunnel.Module
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewModules(sqlstore sqlstore.SQLStore) Modules {
|
func NewModules(sqlstore sqlstore.SQLStore) Modules {
|
||||||
return Modules{
|
return Modules{
|
||||||
Organization: implorganization.NewModule(implorganization.NewStore(sqlstore)),
|
Organization: implorganization.NewModule(implorganization.NewStore(sqlstore)),
|
||||||
Preference: implpreference.NewModule(implpreference.NewStore(sqlstore), preferencetypes.NewDefaultPreferenceMap()),
|
Preference: implpreference.NewModule(implpreference.NewStore(sqlstore), preferencetypes.NewDefaultPreferenceMap()),
|
||||||
|
TraceFunnel: impltracefunnel.NewModule(impltracefunnel.NewStore(sqlstore)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -74,6 +74,7 @@ func NewSQLMigrationProviderFactories(sqlstore sqlstore.SQLStore) factory.NamedM
|
|||||||
sqlmigration.NewUpdateIntegrationsFactory(sqlstore),
|
sqlmigration.NewUpdateIntegrationsFactory(sqlstore),
|
||||||
sqlmigration.NewUpdateOrganizationsFactory(sqlstore),
|
sqlmigration.NewUpdateOrganizationsFactory(sqlstore),
|
||||||
sqlmigration.NewDropGroupsFactory(sqlstore),
|
sqlmigration.NewDropGroupsFactory(sqlstore),
|
||||||
|
sqlmigration.NewAddTraceFunnelsFactory(sqlstore),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
96
pkg/sqlmigration/030_add_trace_funnels.go
Normal file
96
pkg/sqlmigration/030_add_trace_funnels.go
Normal file
@@ -0,0 +1,96 @@
|
|||||||
|
package sqlmigration
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/SigNoz/signoz/pkg/factory"
|
||||||
|
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||||
|
traceFunnels "github.com/SigNoz/signoz/pkg/types/tracefunnel"
|
||||||
|
"github.com/uptrace/bun"
|
||||||
|
"github.com/uptrace/bun/migrate"
|
||||||
|
)
|
||||||
|
|
||||||
|
type addTraceFunnels struct {
|
||||||
|
sqlstore sqlstore.SQLStore
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewAddTraceFunnelsFactory(sqlstore sqlstore.SQLStore) factory.ProviderFactory[SQLMigration, Config] {
|
||||||
|
return factory.NewProviderFactory(factory.MustNewName("add_trace_funnels"), func(ctx context.Context, providerSettings factory.ProviderSettings, config Config) (SQLMigration, error) {
|
||||||
|
return newAddTraceFunnels(ctx, providerSettings, config, sqlstore)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func newAddTraceFunnels(_ context.Context, _ factory.ProviderSettings, _ Config, sqlstore sqlstore.SQLStore) (SQLMigration, error) {
|
||||||
|
return &addTraceFunnels{sqlstore: sqlstore}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (migration *addTraceFunnels) Register(migrations *migrate.Migrations) error {
|
||||||
|
if err := migrations.Register(migration.Up, migration.Down); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (migration *addTraceFunnels) Up(ctx context.Context, db *bun.DB) error {
|
||||||
|
tx, err := db.BeginTx(ctx, nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer tx.Rollback()
|
||||||
|
|
||||||
|
// Create trace_funnel table with foreign key constraint inline
|
||||||
|
_, err = tx.NewCreateTable().Model((*traceFunnels.Funnel)(nil)).
|
||||||
|
ForeignKey(`("org_id") REFERENCES "organizations" ("id") ON DELETE CASCADE`).
|
||||||
|
IfNotExists().
|
||||||
|
Exec(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to create trace_funnel table: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add unique constraint for org_id and name
|
||||||
|
_, err = tx.NewRaw(`
|
||||||
|
CREATE UNIQUE INDEX IF NOT EXISTS idx_trace_funnel_org_id_name
|
||||||
|
ON trace_funnel (org_id, name)
|
||||||
|
`).Exec(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to create unique constraint: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create indexes
|
||||||
|
_, err = tx.NewCreateIndex().Model((*traceFunnels.Funnel)(nil)).Index("idx_trace_funnel_org_id").Column("org_id").Exec(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to create org_id index: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = tx.NewCreateIndex().Model((*traceFunnels.Funnel)(nil)).Index("idx_trace_funnel_created_at").Column("created_at").Exec(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to create created_at index: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := tx.Commit(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (migration *addTraceFunnels) Down(ctx context.Context, db *bun.DB) error {
|
||||||
|
tx, err := db.BeginTx(ctx, nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer tx.Rollback()
|
||||||
|
|
||||||
|
// Drop trace_funnel table
|
||||||
|
_, err = tx.NewDropTable().Model((*traceFunnels.Funnel)(nil)).IfExists().Exec(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to drop trace_funnel table: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := tx.Commit(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
15
pkg/types/tracefunnel/store.go
Normal file
15
pkg/types/tracefunnel/store.go
Normal file
@@ -0,0 +1,15 @@
|
|||||||
|
package traceFunnels
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/SigNoz/signoz/pkg/valuer"
|
||||||
|
)
|
||||||
|
|
||||||
|
type TraceFunnelStore interface {
|
||||||
|
Create(context.Context, *Funnel) error
|
||||||
|
Get(context.Context, valuer.UUID) (*Funnel, error)
|
||||||
|
List(context.Context) ([]*Funnel, error)
|
||||||
|
Update(context.Context, *Funnel) error
|
||||||
|
Delete(context.Context, valuer.UUID) error
|
||||||
|
}
|
||||||
113
pkg/types/tracefunnel/tracefunnel.go
Normal file
113
pkg/types/tracefunnel/tracefunnel.go
Normal file
@@ -0,0 +1,113 @@
|
|||||||
|
package traceFunnels
|
||||||
|
|
||||||
|
import (
|
||||||
|
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
|
||||||
|
"github.com/SigNoz/signoz/pkg/types"
|
||||||
|
"github.com/SigNoz/signoz/pkg/valuer"
|
||||||
|
"github.com/uptrace/bun"
|
||||||
|
)
|
||||||
|
|
||||||
|
// metadata for funnels
|
||||||
|
|
||||||
|
type BaseMetadata struct {
|
||||||
|
types.Identifiable // funnel id
|
||||||
|
types.TimeAuditable
|
||||||
|
types.UserAuditable
|
||||||
|
Name string `json:"funnel_name" bun:"name,type:text,notnull"` // funnel name
|
||||||
|
Description string `json:"description" bun:"description,type:text"` // funnel description
|
||||||
|
OrgID valuer.UUID `json:"org_id" bun:"org_id,type:varchar,notnull"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Funnel Core Data Structure (Funnel and FunnelStep)
|
||||||
|
type Funnel struct {
|
||||||
|
bun.BaseModel `bun:"table:trace_funnel"`
|
||||||
|
BaseMetadata
|
||||||
|
Steps []FunnelStep `json:"steps" bun:"steps,type:text,notnull"`
|
||||||
|
Tags string `json:"tags" bun:"tags,type:text"`
|
||||||
|
CreatedByUser *types.User `json:"user" bun:"rel:belongs-to,join:created_by=id"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type FunnelStep struct {
|
||||||
|
Id valuer.UUID `json:"id,omitempty"`
|
||||||
|
Name string `json:"name,omitempty"` // step name
|
||||||
|
Description string `json:"description,omitempty"` // step description
|
||||||
|
Order int64 `json:"step_order"`
|
||||||
|
ServiceName string `json:"service_name"`
|
||||||
|
SpanName string `json:"span_name"`
|
||||||
|
Filters *v3.FilterSet `json:"filters,omitempty"`
|
||||||
|
LatencyPointer string `json:"latency_pointer,omitempty"`
|
||||||
|
LatencyType string `json:"latency_type,omitempty"`
|
||||||
|
HasErrors bool `json:"has_errors"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// FunnelRequest represents all possible funnel-related requests
|
||||||
|
type FunnelRequest struct {
|
||||||
|
FunnelID valuer.UUID `json:"funnel_id,omitempty"`
|
||||||
|
Name string `json:"funnel_name,omitempty"`
|
||||||
|
Timestamp int64 `json:"timestamp,omitempty"`
|
||||||
|
Description string `json:"description,omitempty"`
|
||||||
|
Steps []FunnelStep `json:"steps,omitempty"`
|
||||||
|
UserID string `json:"user_id,omitempty"`
|
||||||
|
|
||||||
|
// Analytics specific fields
|
||||||
|
StartTime int64 `json:"start_time,omitempty"`
|
||||||
|
EndTime int64 `json:"end_time,omitempty"`
|
||||||
|
StepAOrder int64 `json:"step_a_order,omitempty"`
|
||||||
|
StepBOrder int64 `json:"step_b_order,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// FunnelResponse represents all possible funnel-related responses
|
||||||
|
type FunnelResponse struct {
|
||||||
|
FunnelID string `json:"funnel_id,omitempty"`
|
||||||
|
FunnelName string `json:"funnel_name,omitempty"`
|
||||||
|
Description string `json:"description,omitempty"`
|
||||||
|
CreatedAt int64 `json:"created_at,omitempty"`
|
||||||
|
CreatedBy string `json:"created_by,omitempty"`
|
||||||
|
UpdatedAt int64 `json:"updated_at,omitempty"`
|
||||||
|
UpdatedBy string `json:"updated_by,omitempty"`
|
||||||
|
OrgID string `json:"org_id,omitempty"`
|
||||||
|
UserEmail string `json:"user_email,omitempty"`
|
||||||
|
Funnel *Funnel `json:"funnel,omitempty"`
|
||||||
|
Steps []FunnelStep `json:"steps,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// TimeRange represents a time range for analytics
|
||||||
|
type TimeRange struct {
|
||||||
|
StartTime int64 `json:"start_time"`
|
||||||
|
EndTime int64 `json:"end_time"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// StepTransitionRequest represents a request for step transition analytics
|
||||||
|
type StepTransitionRequest struct {
|
||||||
|
TimeRange
|
||||||
|
StepAOrder int64 `json:"step_a_order"`
|
||||||
|
StepBOrder int64 `json:"step_b_order"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// UserInfo represents basic user information
|
||||||
|
type UserInfo struct {
|
||||||
|
ID string `json:"id"`
|
||||||
|
Email string `json:"email"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Analytics on traces
|
||||||
|
//type FunnelAnalytics struct {
|
||||||
|
// TotalStart int64 `json:"total_start"`
|
||||||
|
// TotalComplete int64 `json:"total_complete"`
|
||||||
|
// ErrorCount int64 `json:"error_count"`
|
||||||
|
// AvgDurationMs float64 `json:"avg_duration_ms"`
|
||||||
|
// P99LatencyMs float64 `json:"p99_latency_ms"`
|
||||||
|
// ConversionRate float64 `json:"conversion_rate"`
|
||||||
|
//}
|
||||||
|
|
||||||
|
//type ValidTracesResponse struct {
|
||||||
|
// TraceIDs []string `json:"trace_ids"`
|
||||||
|
//}
|
||||||
|
|
||||||
|
type FunnelStepFilter struct {
|
||||||
|
StepNumber int
|
||||||
|
ServiceName string
|
||||||
|
SpanName string
|
||||||
|
LatencyPointer string // "start" or "end"
|
||||||
|
CustomFilters *v3.FilterSet
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user