Compare commits
1 Commits
main
...
v0.80.0-74
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
749532a038 |
@@ -356,6 +356,7 @@ func (s *Server) createPublicServer(apiHandler *api.APIHandler, web web.Web) (*h
|
||||
apiHandler.RegisterMessagingQueuesRoutes(r, am)
|
||||
apiHandler.RegisterThirdPartyApiRoutes(r, am)
|
||||
apiHandler.MetricExplorerRoutes(r, am)
|
||||
apiHandler.RegisterTraceFunnelsRoutes(r, am)
|
||||
|
||||
c := cors.New(cors.Options{
|
||||
AllowedOrigins: []string{"*"},
|
||||
|
||||
249
pkg/modules/tracefunnel/api.go
Normal file
249
pkg/modules/tracefunnel/api.go
Normal file
@@ -0,0 +1,249 @@
|
||||
package tracefunnel
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
tracefunnels "github.com/SigNoz/signoz/pkg/modules/tracefunnel/core"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/types"
|
||||
traceFunnels "github.com/SigNoz/signoz/pkg/types/tracefunnel"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
// Api defines the interface for trace funnel operations
|
||||
type Api interface {
|
||||
// Funnel Management
|
||||
CreateFunnel(ctx context.Context, timestamp int64, name string, userID string, orgID string) (*traceFunnels.Funnel, error)
|
||||
GetFunnel(ctx context.Context, funnelID string) (*traceFunnels.Funnel, error)
|
||||
UpdateFunnel(ctx context.Context, funnel *traceFunnels.Funnel, userID string) error
|
||||
ListFunnels(ctx context.Context, orgID string) ([]*traceFunnels.Funnel, error)
|
||||
DeleteFunnel(ctx context.Context, funnelID string) error
|
||||
SaveFunnel(ctx context.Context, funnel *traceFunnels.Funnel, userID string, orgID string) error
|
||||
GetFunnelMetadata(ctx context.Context, funnelID string) (int64, int64, string, error)
|
||||
UpdateMetadata(ctx context.Context, funnelID valuer.UUID, name, description string, userID string) error
|
||||
|
||||
// Analytics
|
||||
ValidateTraces(ctx context.Context, funnel *traceFunnels.Funnel, timeRange traceFunnels.TimeRange) (*traceFunnels.ValidTracesResponse, error)
|
||||
GetFunnelAnalytics(ctx context.Context, funnel *traceFunnels.Funnel, timeRange traceFunnels.TimeRange) (*traceFunnels.FunnelAnalytics, error)
|
||||
GetStepAnalytics(ctx context.Context, funnel *traceFunnels.Funnel, timeRange traceFunnels.TimeRange) (*traceFunnels.FunnelAnalytics, error)
|
||||
GetSlowestTraces(ctx context.Context, funnel *traceFunnels.Funnel, stepAOrder, stepBOrder int64, timeRange traceFunnels.TimeRange, isError bool) (*traceFunnels.ValidTracesResponse, error)
|
||||
}
|
||||
|
||||
// tracefunnel implements the Api interface
|
||||
type tracefunnel struct {
|
||||
store traceFunnels.TraceFunnelStore
|
||||
}
|
||||
|
||||
// Newtracefunnel creates a new trace funnel service
|
||||
func NewAPI(store traceFunnels.TraceFunnelStore) Api {
|
||||
return &tracefunnel{
|
||||
store: store,
|
||||
}
|
||||
}
|
||||
|
||||
// CreateFunnel creates a new funnel
|
||||
func (tf *tracefunnel) CreateFunnel(ctx context.Context, timestamp int64, name string, userID string, orgID string) (*traceFunnels.Funnel, error) {
|
||||
orgUUID, err := valuer.NewUUID(orgID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid org ID: %v", err)
|
||||
}
|
||||
|
||||
funnel := &traceFunnels.Funnel{
|
||||
BaseMetadata: traceFunnels.BaseMetadata{
|
||||
Name: name,
|
||||
OrgID: orgUUID,
|
||||
},
|
||||
}
|
||||
funnel.CreatedAt = time.Unix(0, timestamp*1000000) // Convert to nanoseconds
|
||||
funnel.CreatedBy = userID
|
||||
|
||||
// Set up the user relationship
|
||||
funnel.CreatedByUser = &types.User{
|
||||
ID: userID,
|
||||
}
|
||||
|
||||
if err := tf.store.Create(ctx, funnel); err != nil {
|
||||
return nil, fmt.Errorf("failed to create funnel: %v", err)
|
||||
}
|
||||
|
||||
return funnel, nil
|
||||
}
|
||||
|
||||
// GetFunnel gets a funnel by ID
|
||||
func (tf *tracefunnel) GetFunnel(ctx context.Context, funnelID string) (*traceFunnels.Funnel, error) {
|
||||
uuid, err := valuer.NewUUID(funnelID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid funnel ID: %v", err)
|
||||
}
|
||||
return tf.store.Get(ctx, uuid)
|
||||
}
|
||||
|
||||
// UpdateFunnel updates a funnel
|
||||
func (tf *tracefunnel) UpdateFunnel(ctx context.Context, funnel *traceFunnels.Funnel, userID string) error {
|
||||
funnel.UpdatedBy = userID
|
||||
return tf.store.Update(ctx, funnel)
|
||||
}
|
||||
|
||||
// ListFunnels lists all funnels for an organization
|
||||
func (tf *tracefunnel) ListFunnels(ctx context.Context, orgID string) ([]*traceFunnels.Funnel, error) {
|
||||
orgUUID, err := valuer.NewUUID(orgID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid org ID: %v", err)
|
||||
}
|
||||
|
||||
funnels, err := tf.store.List(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Filter by orgID
|
||||
var orgFunnels []*traceFunnels.Funnel
|
||||
for _, f := range funnels {
|
||||
if f.OrgID == orgUUID {
|
||||
orgFunnels = append(orgFunnels, f)
|
||||
}
|
||||
}
|
||||
|
||||
return orgFunnels, nil
|
||||
}
|
||||
|
||||
// DeleteFunnel deletes a funnel
|
||||
func (tf *tracefunnel) DeleteFunnel(ctx context.Context, funnelID string) error {
|
||||
uuid, err := valuer.NewUUID(funnelID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid funnel ID: %v", err)
|
||||
}
|
||||
return tf.store.Delete(ctx, uuid)
|
||||
}
|
||||
|
||||
// SaveFunnel saves a funnel
|
||||
func (tf *tracefunnel) SaveFunnel(ctx context.Context, funnel *traceFunnels.Funnel, userID string, orgID string) error {
|
||||
orgUUID, err := valuer.NewUUID(orgID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid org ID: %v", err)
|
||||
}
|
||||
|
||||
funnel.UpdatedBy = userID
|
||||
funnel.OrgID = orgUUID
|
||||
return tf.store.Update(ctx, funnel)
|
||||
}
|
||||
|
||||
// GetFunnelMetadata gets metadata for a funnel
|
||||
func (tf *tracefunnel) GetFunnelMetadata(ctx context.Context, funnelID string) (int64, int64, string, error) {
|
||||
uuid, err := valuer.NewUUID(funnelID)
|
||||
if err != nil {
|
||||
return 0, 0, "", fmt.Errorf("invalid funnel ID: %v", err)
|
||||
}
|
||||
|
||||
funnel, err := tf.store.Get(ctx, uuid)
|
||||
if err != nil {
|
||||
return 0, 0, "", err
|
||||
}
|
||||
|
||||
return funnel.CreatedAt.UnixNano() / 1000000, funnel.UpdatedAt.UnixNano() / 1000000, funnel.Description, nil
|
||||
}
|
||||
|
||||
// ValidateTraces validates traces in a funnel
|
||||
func (tf *tracefunnel) ValidateTraces(ctx context.Context, funnel *traceFunnels.Funnel, timeRange traceFunnels.TimeRange) (*traceFunnels.ValidTracesResponse, error) {
|
||||
if err := tracefunnels.ValidateFunnel(funnel); err != nil {
|
||||
return nil, fmt.Errorf("invalid funnel: %v", err)
|
||||
}
|
||||
|
||||
if err := tracefunnels.ValidateTimeRange(timeRange); err != nil {
|
||||
return nil, fmt.Errorf("invalid time range: %v", err)
|
||||
}
|
||||
|
||||
_, err := tracefunnels.ValidateTraces(funnel, timeRange)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error building clickhouse query: %v", err)
|
||||
}
|
||||
|
||||
// TODO: Execute query and return results
|
||||
// For now, return empty response
|
||||
return &traceFunnels.ValidTracesResponse{
|
||||
TraceIDs: []string{},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// GetFunnelAnalytics gets analytics for a funnel
|
||||
func (tf *tracefunnel) GetFunnelAnalytics(ctx context.Context, funnel *traceFunnels.Funnel, timeRange traceFunnels.TimeRange) (*traceFunnels.FunnelAnalytics, error) {
|
||||
if err := tracefunnels.ValidateFunnel(funnel); err != nil {
|
||||
return nil, fmt.Errorf("invalid funnel: %v", err)
|
||||
}
|
||||
|
||||
if err := tracefunnels.ValidateTimeRange(timeRange); err != nil {
|
||||
return nil, fmt.Errorf("invalid time range: %v", err)
|
||||
}
|
||||
|
||||
_, err := tracefunnels.ValidateTracesWithLatency(funnel, timeRange)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error building clickhouse query: %v", err)
|
||||
}
|
||||
|
||||
// TODO: Execute query and return results
|
||||
// For now, return empty analytics
|
||||
return &traceFunnels.FunnelAnalytics{
|
||||
TotalStart: 0,
|
||||
TotalComplete: 0,
|
||||
ErrorCount: 0,
|
||||
AvgDurationMs: 0,
|
||||
P99LatencyMs: 0,
|
||||
ConversionRate: 0,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// GetStepAnalytics gets analytics for each step
|
||||
func (tf *tracefunnel) GetStepAnalytics(ctx context.Context, funnel *traceFunnels.Funnel, timeRange traceFunnels.TimeRange) (*traceFunnels.FunnelAnalytics, error) {
|
||||
if err := tracefunnels.ValidateFunnel(funnel); err != nil {
|
||||
return nil, fmt.Errorf("invalid funnel: %v", err)
|
||||
}
|
||||
|
||||
if err := tracefunnels.ValidateTimeRange(timeRange); err != nil {
|
||||
return nil, fmt.Errorf("invalid time range: %v", err)
|
||||
}
|
||||
|
||||
_, err := tracefunnels.GetStepAnalytics(funnel, timeRange)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error building clickhouse query: %v", err)
|
||||
}
|
||||
|
||||
// TODO: Execute query and return results
|
||||
// For now, return empty analytics
|
||||
return &traceFunnels.FunnelAnalytics{
|
||||
TotalStart: 0,
|
||||
TotalComplete: 0,
|
||||
ErrorCount: 0,
|
||||
AvgDurationMs: 0,
|
||||
P99LatencyMs: 0,
|
||||
ConversionRate: 0,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// GetSlowestTraces gets the slowest traces between two steps
|
||||
func (tf *tracefunnel) GetSlowestTraces(ctx context.Context, funnel *traceFunnels.Funnel, stepAOrder, stepBOrder int64, timeRange traceFunnels.TimeRange, isError bool) (*traceFunnels.ValidTracesResponse, error) {
|
||||
if err := tracefunnels.ValidateFunnel(funnel); err != nil {
|
||||
return nil, fmt.Errorf("invalid funnel: %v", err)
|
||||
}
|
||||
|
||||
if err := tracefunnels.ValidateTimeRange(timeRange); err != nil {
|
||||
return nil, fmt.Errorf("invalid time range: %v", err)
|
||||
}
|
||||
|
||||
_, err := tracefunnels.GetSlowestTraces(funnel, stepAOrder, stepBOrder, timeRange, isError)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error building clickhouse query: %v", err)
|
||||
}
|
||||
|
||||
// TODO: Execute query and return results
|
||||
// For now, return empty response
|
||||
return &traceFunnels.ValidTracesResponse{
|
||||
TraceIDs: []string{},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// UpdateMetadata updates the metadata of a funnel
|
||||
func (tf *tracefunnel) UpdateMetadata(ctx context.Context, funnelID valuer.UUID, name, description string, userID string) error {
|
||||
return tf.store.UpdateMetadata(ctx, funnelID, name, description, userID)
|
||||
}
|
||||
247
pkg/modules/tracefunnel/core/clickhouse.go
Normal file
247
pkg/modules/tracefunnel/core/clickhouse.go
Normal file
@@ -0,0 +1,247 @@
|
||||
package tracefunnels
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
|
||||
traceFunnels "github.com/SigNoz/signoz/pkg/types/tracefunnel"
|
||||
)
|
||||
|
||||
// QueryBuilder helps build ClickHouse queries for funnel analysis
|
||||
type QueryBuilder struct {
|
||||
steps []traceFunnels.FunnelStep
|
||||
timeRange traceFunnels.TimeRange
|
||||
limit int
|
||||
}
|
||||
|
||||
// NewQueryBuilder creates a new query builder
|
||||
func NewQueryBuilder(funnel *traceFunnels.Funnel, timeRange traceFunnels.TimeRange) *QueryBuilder {
|
||||
return &QueryBuilder{
|
||||
steps: funnel.Steps,
|
||||
timeRange: timeRange,
|
||||
limit: 100,
|
||||
}
|
||||
}
|
||||
|
||||
// WithLimit sets the limit for the query
|
||||
func (qb *QueryBuilder) WithLimit(limit int) *QueryBuilder {
|
||||
qb.limit = limit
|
||||
return qb
|
||||
}
|
||||
|
||||
// buildStepCTE builds a CTE for a single step
|
||||
func (qb *QueryBuilder) buildStepCTE(step traceFunnels.FunnelStep, stepNum int) string {
|
||||
return fmt.Sprintf(`
|
||||
step%d AS (
|
||||
SELECT
|
||||
trace_id,
|
||||
service_name,
|
||||
name as span_name,
|
||||
timestamp,
|
||||
duration_ns,
|
||||
status_code
|
||||
FROM signoz_traces.signoz_index_v2
|
||||
WHERE service_name = '%s'
|
||||
AND name = '%s'
|
||||
AND timestamp >= %d
|
||||
AND timestamp <= %d
|
||||
)`,
|
||||
stepNum,
|
||||
step.ServiceName,
|
||||
step.SpanName,
|
||||
qb.timeRange.StartTime,
|
||||
qb.timeRange.EndTime,
|
||||
)
|
||||
}
|
||||
|
||||
// buildStepCTEs builds all step CTEs
|
||||
func (qb *QueryBuilder) buildStepCTEs() string {
|
||||
var ctes []string
|
||||
for i, step := range qb.steps {
|
||||
ctes = append(ctes, qb.buildStepCTE(step, i+1))
|
||||
}
|
||||
return strings.Join(ctes, ",\n")
|
||||
}
|
||||
|
||||
// buildStepJoins builds the JOIN clauses between steps
|
||||
func (qb *QueryBuilder) buildStepJoins() string {
|
||||
var joins []string
|
||||
for i := 1; i < len(qb.steps); i++ {
|
||||
joins = append(joins, fmt.Sprintf("JOIN step%d s%d ON s%d.trace_id = s%d.trace_id AND s%d.timestamp > s%d.timestamp",
|
||||
i+1, i+1, i, i+1, i+1, i))
|
||||
}
|
||||
return strings.Join(joins, "\n")
|
||||
}
|
||||
|
||||
// buildStepSelects builds the SELECT clauses for each step
|
||||
func (qb *QueryBuilder) buildStepSelects() string {
|
||||
var selects []string
|
||||
for i := range qb.steps {
|
||||
selects = append(selects, fmt.Sprintf("s%d.timestamp as step%d_timestamp", i+1, i+1))
|
||||
selects = append(selects, fmt.Sprintf("s%d.duration_ns / 1000000 as step%d_duration_ms", i+1, i+1))
|
||||
}
|
||||
return strings.Join(selects, ",\n")
|
||||
}
|
||||
|
||||
// ValidateTraces builds a ClickHouse query to validate traces in a funnel
|
||||
func ValidateTraces(funnel *traceFunnels.Funnel, timeRange traceFunnels.TimeRange) (*v3.ClickHouseQuery, error) {
|
||||
if err := ValidateFunnel(funnel); err != nil {
|
||||
return nil, fmt.Errorf("invalid funnel: %v", err)
|
||||
}
|
||||
|
||||
if err := ValidateTimeRange(timeRange); err != nil {
|
||||
return nil, fmt.Errorf("invalid time range: %v", err)
|
||||
}
|
||||
|
||||
qb := NewQueryBuilder(funnel, timeRange)
|
||||
query := fmt.Sprintf(`
|
||||
WITH %s
|
||||
SELECT
|
||||
s1.trace_id,
|
||||
%s,
|
||||
s%d.timestamp - s1.timestamp as total_duration_ms
|
||||
FROM step1 s1
|
||||
%s
|
||||
ORDER BY total_duration_ms DESC
|
||||
LIMIT %d
|
||||
`,
|
||||
qb.buildStepCTEs(),
|
||||
qb.buildStepSelects(),
|
||||
len(funnel.Steps),
|
||||
qb.buildStepJoins(),
|
||||
qb.limit,
|
||||
)
|
||||
|
||||
return &v3.ClickHouseQuery{
|
||||
Query: query,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// ValidateTracesWithLatency builds a ClickHouse query to validate traces with latency information
|
||||
func ValidateTracesWithLatency(funnel *traceFunnels.Funnel, timeRange traceFunnels.TimeRange) (*v3.ClickHouseQuery, error) {
|
||||
if err := ValidateFunnel(funnel); err != nil {
|
||||
return nil, fmt.Errorf("invalid funnel: %v", err)
|
||||
}
|
||||
|
||||
if err := ValidateTimeRange(timeRange); err != nil {
|
||||
return nil, fmt.Errorf("invalid time range: %v", err)
|
||||
}
|
||||
|
||||
qb := NewQueryBuilder(funnel, timeRange)
|
||||
query := fmt.Sprintf(`
|
||||
WITH %s
|
||||
SELECT
|
||||
s1.trace_id,
|
||||
%s,
|
||||
s%d.timestamp - s1.timestamp as transition_duration_ms
|
||||
FROM step1 s1
|
||||
%s
|
||||
ORDER BY transition_duration_ms DESC
|
||||
LIMIT %d
|
||||
`,
|
||||
qb.buildStepCTEs(),
|
||||
qb.buildStepSelects(),
|
||||
len(funnel.Steps),
|
||||
qb.buildStepJoins(),
|
||||
qb.limit,
|
||||
)
|
||||
|
||||
return &v3.ClickHouseQuery{
|
||||
Query: query,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// GetStepAnalytics builds a ClickHouse query to get analytics for each step
|
||||
func GetStepAnalytics(funnel *traceFunnels.Funnel, timeRange traceFunnels.TimeRange) (*v3.ClickHouseQuery, error) {
|
||||
if err := ValidateFunnel(funnel); err != nil {
|
||||
return nil, fmt.Errorf("invalid funnel: %v", err)
|
||||
}
|
||||
|
||||
if err := ValidateTimeRange(timeRange); err != nil {
|
||||
return nil, fmt.Errorf("invalid time range: %v", err)
|
||||
}
|
||||
|
||||
qb := NewQueryBuilder(funnel, timeRange)
|
||||
query := fmt.Sprintf(`
|
||||
WITH %s
|
||||
SELECT
|
||||
COUNT(DISTINCT s1.trace_id) as total_start,
|
||||
COUNT(DISTINCT s%d.trace_id) as total_complete,
|
||||
COUNT(DISTINCT CASE WHEN s%d.status_code >= 400 THEN s%d.trace_id END) as error_count,
|
||||
AVG(s%d.timestamp - s1.timestamp) as avg_duration_ms,
|
||||
quantile(0.99)(s%d.timestamp - s1.timestamp) as p99_latency_ms,
|
||||
COUNT(DISTINCT s%d.trace_id) * 100.0 / COUNT(DISTINCT s1.trace_id) as conversion_rate
|
||||
FROM step1 s1
|
||||
%s
|
||||
`,
|
||||
qb.buildStepCTEs(),
|
||||
len(funnel.Steps),
|
||||
len(funnel.Steps),
|
||||
len(funnel.Steps),
|
||||
len(funnel.Steps),
|
||||
len(funnel.Steps),
|
||||
len(funnel.Steps),
|
||||
qb.buildStepJoins(),
|
||||
)
|
||||
|
||||
return &v3.ClickHouseQuery{
|
||||
Query: query,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// GetSlowestTraces builds a ClickHouse query to get the slowest traces between two steps
|
||||
func GetSlowestTraces(funnel *traceFunnels.Funnel, stepAOrder, stepBOrder int64, timeRange traceFunnels.TimeRange, isError bool) (*v3.ClickHouseQuery, error) {
|
||||
if err := ValidateFunnel(funnel); err != nil {
|
||||
return nil, fmt.Errorf("invalid funnel: %v", err)
|
||||
}
|
||||
|
||||
if err := ValidateTimeRange(timeRange); err != nil {
|
||||
return nil, fmt.Errorf("invalid time range: %v", err)
|
||||
}
|
||||
|
||||
// Find the steps by order
|
||||
var stepA, stepB *traceFunnels.FunnelStep
|
||||
for _, step := range funnel.Steps {
|
||||
if step.Order == stepAOrder {
|
||||
stepA = &step
|
||||
}
|
||||
if step.Order == stepBOrder {
|
||||
stepB = &step
|
||||
}
|
||||
}
|
||||
|
||||
if stepA == nil || stepB == nil {
|
||||
return nil, fmt.Errorf("steps not found")
|
||||
}
|
||||
|
||||
// Create a new funnel with just the two steps we want to analyze
|
||||
analysisFunnel := &traceFunnels.Funnel{
|
||||
Steps: []traceFunnels.FunnelStep{*stepA, *stepB},
|
||||
}
|
||||
|
||||
qb := NewQueryBuilder(analysisFunnel, timeRange)
|
||||
query := fmt.Sprintf(`
|
||||
WITH %s
|
||||
SELECT
|
||||
s1.trace_id,
|
||||
%s,
|
||||
s2.timestamp - s1.timestamp as transition_duration_ms,
|
||||
s2.status_code
|
||||
FROM step1 s1
|
||||
%s
|
||||
WHERE CASE WHEN %t THEN s2.status_code >= 400 ELSE 1=1 END
|
||||
ORDER BY transition_duration_ms DESC
|
||||
LIMIT %d
|
||||
`,
|
||||
qb.buildStepCTEs(),
|
||||
qb.buildStepSelects(),
|
||||
qb.buildStepJoins(),
|
||||
isError,
|
||||
qb.limit,
|
||||
)
|
||||
|
||||
return &v3.ClickHouseQuery{
|
||||
Query: query,
|
||||
}, nil
|
||||
}
|
||||
177
pkg/modules/tracefunnel/core/store.go
Normal file
177
pkg/modules/tracefunnel/core/store.go
Normal file
@@ -0,0 +1,177 @@
|
||||
package tracefunnels
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
traceFunnels "github.com/SigNoz/signoz/pkg/types/tracefunnel"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
// store implements the TraceFunnelStore interface
|
||||
type store struct {
|
||||
db sqlstore.SQLStore
|
||||
}
|
||||
|
||||
// NewStore creates a new trace funnel store
|
||||
func NewStore(db sqlstore.SQLStore) traceFunnels.TraceFunnelStore {
|
||||
return &store{db: db}
|
||||
}
|
||||
|
||||
// Create creates a new funnel in the database
|
||||
func (s *store) Create(ctx context.Context, funnel *traceFunnels.Funnel) error {
|
||||
// Generate a new UUID for the funnel if not already set
|
||||
if funnel.ID.IsZero() {
|
||||
funnel.ID = valuer.GenerateUUID()
|
||||
}
|
||||
|
||||
// Set timestamps if not set
|
||||
if funnel.CreatedAt.IsZero() {
|
||||
funnel.CreatedAt = time.Now()
|
||||
}
|
||||
if funnel.UpdatedAt.IsZero() {
|
||||
funnel.UpdatedAt = time.Now()
|
||||
}
|
||||
|
||||
// Insert the funnel
|
||||
_, err := s.db.BunDB().NewInsert().Model(funnel).Exec(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create funnel: %v", err)
|
||||
}
|
||||
|
||||
// Update the user relationship
|
||||
if funnel.CreatedByUser != nil {
|
||||
_, err = s.db.BunDB().NewUpdate().
|
||||
Model(funnel).
|
||||
Set("created_by = ?", funnel.CreatedByUser.ID).
|
||||
Where("id = ?", funnel.ID).
|
||||
Exec(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update funnel user relationship: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get retrieves a funnel by ID
|
||||
func (s *store) Get(ctx context.Context, uuid valuer.UUID) (*traceFunnels.Funnel, error) {
|
||||
funnel := &traceFunnels.Funnel{}
|
||||
err := s.db.BunDB().NewSelect().
|
||||
Model(funnel).
|
||||
Relation("CreatedByUser").
|
||||
Where("?TableAlias.id = ?", uuid).
|
||||
Scan(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get funnel: %v", err)
|
||||
}
|
||||
return funnel, nil
|
||||
}
|
||||
|
||||
// Update updates an existing funnel
|
||||
func (s *store) Update(ctx context.Context, funnel *traceFunnels.Funnel) error {
|
||||
// Update the updated_at timestamp
|
||||
funnel.UpdatedAt = time.Now()
|
||||
|
||||
_, err := s.db.BunDB().NewUpdate().Model(funnel).WherePK().Exec(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update funnel: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// List retrieves all funnels
|
||||
func (s *store) List(ctx context.Context) ([]*traceFunnels.Funnel, error) {
|
||||
var funnels []*traceFunnels.Funnel
|
||||
err := s.db.BunDB().NewSelect().
|
||||
Model(&funnels).
|
||||
Relation("CreatedByUser").
|
||||
Scan(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to list funnels: %v", err)
|
||||
}
|
||||
return funnels, nil
|
||||
}
|
||||
|
||||
// Delete removes a funnel by ID
|
||||
func (s *store) Delete(ctx context.Context, uuid valuer.UUID) error {
|
||||
_, err := s.db.BunDB().NewDelete().Model((*traceFunnels.Funnel)(nil)).Where("id = ?", uuid).Exec(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to delete funnel: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ListByOrg retrieves all funnels for a specific organization
|
||||
func (s *store) ListByOrg(ctx context.Context, orgID valuer.UUID) ([]*traceFunnels.Funnel, error) {
|
||||
var funnels []*traceFunnels.Funnel
|
||||
err := s.db.BunDB().NewSelect().
|
||||
Model(&funnels).
|
||||
Relation("CreatedByUser").
|
||||
Where("org_id = ?", orgID).
|
||||
Scan(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to list funnels by org: %v", err)
|
||||
}
|
||||
return funnels, nil
|
||||
}
|
||||
|
||||
// GetByIDAndOrg retrieves a funnel by ID and organization ID
|
||||
func (s *store) GetByIDAndOrg(ctx context.Context, id, orgID valuer.UUID) (*traceFunnels.Funnel, error) {
|
||||
funnel := &traceFunnels.Funnel{}
|
||||
err := s.db.BunDB().NewSelect().
|
||||
Model(funnel).
|
||||
Relation("CreatedByUser").
|
||||
Where("?TableAlias.id = ? AND ?TableAlias.org_id = ?", id, orgID).
|
||||
Scan(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get funnel by ID and org: %v", err)
|
||||
}
|
||||
return funnel, nil
|
||||
}
|
||||
|
||||
// UpdateSteps updates the steps of a funnel
|
||||
func (s *store) UpdateSteps(ctx context.Context, funnelID valuer.UUID, steps []traceFunnels.FunnelStep) error {
|
||||
_, err := s.db.BunDB().NewUpdate().Model((*traceFunnels.Funnel)(nil)).
|
||||
Set("steps = ?", steps).
|
||||
Where("id = ?", funnelID).
|
||||
Exec(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update funnel steps: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateMetadata updates the metadata of a funnel
|
||||
func (s *store) UpdateMetadata(ctx context.Context, funnelID valuer.UUID, name, description string, userID string) error {
|
||||
|
||||
// First get the current funnel to preserve other fields
|
||||
funnel := &traceFunnels.Funnel{}
|
||||
err := s.db.BunDB().NewSelect().Model(funnel).Where("id = ?", funnelID).Scan(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get funnel: %v", err)
|
||||
}
|
||||
|
||||
// Update the fields
|
||||
funnel.Name = name
|
||||
funnel.Description = description
|
||||
funnel.UpdatedAt = time.Now()
|
||||
funnel.UpdatedBy = userID
|
||||
|
||||
// Save the updated funnel
|
||||
_, err = s.db.BunDB().NewUpdate().Model(funnel).WherePK().Exec(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update funnel metadata: %v", err)
|
||||
}
|
||||
|
||||
// Verify the update
|
||||
updatedFunnel := &traceFunnels.Funnel{}
|
||||
err = s.db.BunDB().NewSelect().Model(updatedFunnel).Where("id = ?", funnelID).Scan(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to verify update: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
100
pkg/modules/tracefunnel/core/validation.go
Normal file
100
pkg/modules/tracefunnel/core/validation.go
Normal file
@@ -0,0 +1,100 @@
|
||||
package tracefunnels
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
traceFunnels "github.com/SigNoz/signoz/pkg/types/tracefunnel"
|
||||
)
|
||||
|
||||
// ValidateFunnel validates a funnel's data
|
||||
func ValidateFunnel(funnel *traceFunnels.Funnel) error {
|
||||
if funnel == nil {
|
||||
return fmt.Errorf("funnel cannot be nil")
|
||||
}
|
||||
|
||||
if len(funnel.Steps) < 2 {
|
||||
return fmt.Errorf("funnel must have at least 2 steps")
|
||||
}
|
||||
|
||||
// Validate each step
|
||||
for i, step := range funnel.Steps {
|
||||
if err := ValidateStep(step, i+1); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ValidateStep validates a single funnel step
|
||||
func ValidateStep(step traceFunnels.FunnelStep, stepNum int) error {
|
||||
if step.ServiceName == "" {
|
||||
return fmt.Errorf("step %d: service name is required", stepNum)
|
||||
}
|
||||
|
||||
if step.SpanName == "" {
|
||||
return fmt.Errorf("step %d: span name is required", stepNum)
|
||||
}
|
||||
|
||||
if step.Order < 0 {
|
||||
return fmt.Errorf("step %d: order must be non-negative", stepNum)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ValidateTimeRange validates a time range
|
||||
func ValidateTimeRange(timeRange traceFunnels.TimeRange) error {
|
||||
if timeRange.StartTime <= 0 {
|
||||
return fmt.Errorf("start time must be positive")
|
||||
}
|
||||
|
||||
if timeRange.EndTime <= 0 {
|
||||
return fmt.Errorf("end time must be positive")
|
||||
}
|
||||
|
||||
if timeRange.EndTime < timeRange.StartTime {
|
||||
return fmt.Errorf("end time must be after start time")
|
||||
}
|
||||
|
||||
// Check if the time range is not too far in the future
|
||||
now := time.Now().UnixNano() / 1000000 // Convert to milliseconds
|
||||
if timeRange.EndTime > now {
|
||||
return fmt.Errorf("end time cannot be in the future")
|
||||
}
|
||||
|
||||
// Check if the time range is not too old (e.g., more than 30 days)
|
||||
maxAge := int64(30 * 24 * 60 * 60 * 1000) // 30 days in milliseconds
|
||||
if now-timeRange.StartTime > maxAge {
|
||||
return fmt.Errorf("time range cannot be older than 30 days")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ValidateStepOrder validates that step orders are sequential
|
||||
func ValidateStepOrder(steps []traceFunnels.FunnelStep) error {
|
||||
if len(steps) < 2 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Create a map to track used orders
|
||||
usedOrders := make(map[int64]bool)
|
||||
|
||||
for i, step := range steps {
|
||||
if usedOrders[step.Order] {
|
||||
return fmt.Errorf("duplicate step order %d at step %d", step.Order, i+1)
|
||||
}
|
||||
usedOrders[step.Order] = true
|
||||
}
|
||||
|
||||
// Check if orders are sequential
|
||||
for i := 0; i < len(steps)-1; i++ {
|
||||
if steps[i+1].Order != steps[i].Order+1 {
|
||||
return fmt.Errorf("step orders must be sequential")
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
79
pkg/modules/tracefunnel/utils.go
Normal file
79
pkg/modules/tracefunnel/utils.go
Normal file
@@ -0,0 +1,79 @@
|
||||
package tracefunnel
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
tf "github.com/SigNoz/signoz/pkg/types/tracefunnel"
|
||||
"sort"
|
||||
)
|
||||
|
||||
// ValidateSteps checks if the requested steps exist in the funnel
|
||||
func ValidateSteps(funnel *tf.Funnel, stepAOrder, stepBOrder int64) error {
|
||||
stepAExists, stepBExists := false, false
|
||||
for _, step := range funnel.Steps {
|
||||
if step.Order == stepAOrder {
|
||||
stepAExists = true
|
||||
}
|
||||
if step.Order == stepBOrder {
|
||||
stepBExists = true
|
||||
}
|
||||
}
|
||||
|
||||
if !stepAExists || !stepBExists {
|
||||
return fmt.Errorf("one or both steps not found. Step A Order: %d, Step B Order: %d", stepAOrder, stepBOrder)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ValidateTimestamp validates a timestamp
|
||||
func ValidateTimestamp(timestamp int64, fieldName string) error {
|
||||
if timestamp == 0 {
|
||||
return fmt.Errorf("%s is required", fieldName)
|
||||
}
|
||||
if timestamp < 0 {
|
||||
return fmt.Errorf("%s must be positive", fieldName)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ValidateTimestampIsMilliseconds validates that a timestamp is in milliseconds
|
||||
func ValidateTimestampIsMilliseconds(timestamp int64) bool {
|
||||
// Check if timestamp is in milliseconds (13 digits)
|
||||
return timestamp >= 1000000000000 && timestamp <= 9999999999999
|
||||
}
|
||||
|
||||
// ValidateFunnelSteps validates funnel steps
|
||||
func ValidateFunnelSteps(steps []tf.FunnelStep) error {
|
||||
if len(steps) < 2 {
|
||||
return fmt.Errorf("funnel must have at least 2 steps")
|
||||
}
|
||||
|
||||
for i, step := range steps {
|
||||
if step.ServiceName == "" {
|
||||
return fmt.Errorf("step %d: service name is required", i+1)
|
||||
}
|
||||
if step.SpanName == "" {
|
||||
return fmt.Errorf("step %d: span name is required", i+1)
|
||||
}
|
||||
if step.Order < 0 {
|
||||
return fmt.Errorf("step %d: order must be non-negative", i+1)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// NormalizeFunnelSteps normalizes step orders to be sequential
|
||||
func NormalizeFunnelSteps(steps []tf.FunnelStep) []tf.FunnelStep {
|
||||
// Sort steps by order
|
||||
sort.Slice(steps, func(i, j int) bool {
|
||||
return steps[i].Order < steps[j].Order
|
||||
})
|
||||
|
||||
// Normalize orders to be sequential
|
||||
for i := range steps {
|
||||
steps[i].Order = int64(i + 1)
|
||||
}
|
||||
|
||||
return steps
|
||||
}
|
||||
@@ -25,6 +25,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/http/render"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization"
|
||||
"github.com/SigNoz/signoz/pkg/modules/preference"
|
||||
traceFunnel "github.com/SigNoz/signoz/pkg/modules/tracefunnel"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/app/integrations"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/app/metricsexplorer"
|
||||
"github.com/SigNoz/signoz/pkg/signoz"
|
||||
@@ -61,7 +62,8 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/types"
|
||||
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/pipelinetypes"
|
||||
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/ruletypes"
|
||||
tracefunnel "github.com/SigNoz/signoz/pkg/types/tracefunnel"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
@@ -152,6 +154,8 @@ type APIHandler struct {
|
||||
|
||||
OrganizationAPI organization.API
|
||||
OrganizationModule organization.Module
|
||||
|
||||
TraceFunnels traceFunnel.Api
|
||||
}
|
||||
|
||||
type APIHandlerOpts struct {
|
||||
@@ -203,6 +207,7 @@ type APIHandlerOpts struct {
|
||||
Preference preference.API
|
||||
OrganizationAPI organization.API
|
||||
OrganizationModule organization.Module
|
||||
TraceFunnels traceFunnel.Api
|
||||
}
|
||||
|
||||
// NewAPIHandler returns an APIHandler
|
||||
@@ -275,6 +280,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
|
||||
FieldsAPI: opts.FieldsAPI,
|
||||
OrganizationAPI: opts.OrganizationAPI,
|
||||
OrganizationModule: opts.OrganizationModule,
|
||||
TraceFunnels: opts.TraceFunnels,
|
||||
}
|
||||
|
||||
logsQueryBuilder := logsv3.PrepareLogsQuery
|
||||
@@ -5615,3 +5621,463 @@ func (aH *APIHandler) getDomainInfo(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
aH.Respond(w, resp)
|
||||
}
|
||||
|
||||
// RegisterTraceFunnelsRoutes adds trace funnels routes
|
||||
func (aH *APIHandler) RegisterTraceFunnelsRoutes(router *mux.Router, am *middleware.AuthZ) {
|
||||
// Main trace funnels router
|
||||
traceFunnelsRouter := router.PathPrefix("/api/v1/trace-funnels").Subrouter()
|
||||
|
||||
// API endpoints
|
||||
traceFunnelsRouter.HandleFunc("/new-funnel", aH.handleNewFunnel).Methods("POST")
|
||||
traceFunnelsRouter.HandleFunc("/steps/update", aH.handleUpdateFunnelStep).Methods("PUT")
|
||||
traceFunnelsRouter.HandleFunc("/list", aH.handleListFunnels).Methods("GET")
|
||||
|
||||
// Standard RESTful endpoints for funnel resource
|
||||
traceFunnelsRouter.HandleFunc("/{funnel_id}", aH.handleGetFunnel).Methods("GET")
|
||||
traceFunnelsRouter.HandleFunc("/{funnel_id}", aH.handleDeleteFunnel).Methods("DELETE")
|
||||
traceFunnelsRouter.HandleFunc("/save", aH.handleSaveFunnel).Methods("POST")
|
||||
|
||||
// Analytics endpoints
|
||||
traceFunnelsRouter.HandleFunc("/{funnel_id}/analytics/validate", aH.handleValidateTraces).Methods("POST")
|
||||
traceFunnelsRouter.HandleFunc("/{funnel_id}/analytics/overview", aH.handleFunnelAnalytics).Methods("POST")
|
||||
traceFunnelsRouter.HandleFunc("/{funnel_id}/analytics/steps", aH.handleStepAnalytics).Methods("POST")
|
||||
traceFunnelsRouter.HandleFunc("/{funnel_id}/analytics/slow-traces", aH.handleFunnelSlowTraces).Methods("POST")
|
||||
traceFunnelsRouter.HandleFunc("/{funnel_id}/analytics/error-traces", aH.handleFunnelErrorTraces).Methods("POST")
|
||||
}
|
||||
|
||||
// handleNewFunnel handles the creation of a new funnel
|
||||
func (aH *APIHandler) handleNewFunnel(w http.ResponseWriter, r *http.Request) {
|
||||
var req tracefunnel.FunnelRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
claims, err := authtypes.ClaimsFromContext(r.Context())
|
||||
if err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorUnauthorized, Err: fmt.Errorf("unauthenticated")}, nil)
|
||||
return
|
||||
}
|
||||
userID := claims.UserID
|
||||
orgID := claims.OrgID
|
||||
|
||||
// Check if a funnel with the same name already exists for this org
|
||||
funnels, err := aH.TraceFunnels.ListFunnels(r.Context(), orgID)
|
||||
if err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("failed to list funnels: %v", err)}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
for _, f := range funnels {
|
||||
if f.Name == req.Name {
|
||||
RespondError(w, &model.ApiError{
|
||||
Typ: model.ErrorBadData,
|
||||
Err: fmt.Errorf("a funnel with name '%s' already exists in this organization", req.Name),
|
||||
}, nil)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
funnel, err := aH.TraceFunnels.CreateFunnel(r.Context(), req.Timestamp, req.Name, userID, orgID)
|
||||
if err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("failed to create funnel: %v", err)}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
response := tracefunnel.FunnelResponse{
|
||||
FunnelID: funnel.ID.String(),
|
||||
FunnelName: funnel.Name,
|
||||
CreationTimestamp: req.Timestamp,
|
||||
UserEmail: claims.Email,
|
||||
OrgID: orgID,
|
||||
}
|
||||
|
||||
aH.Respond(w, response)
|
||||
}
|
||||
|
||||
// handleUpdateFunnelStep adds or updates steps for an existing funnel
|
||||
func (aH *APIHandler) handleUpdateFunnelStep(w http.ResponseWriter, r *http.Request) {
|
||||
var req tracefunnel.FunnelRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
claims, err := authtypes.ClaimsFromContext(r.Context())
|
||||
if err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorUnauthorized, Err: fmt.Errorf("unauthenticated")}, nil)
|
||||
return
|
||||
}
|
||||
userID := claims.UserID
|
||||
orgID := claims.OrgID
|
||||
|
||||
if err := traceFunnel.ValidateTimestamp(req.Timestamp, "timestamp"); err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
funnel, err := aH.TraceFunnels.GetFunnel(r.Context(), req.FunnelID.String())
|
||||
if err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorNotFound, Err: fmt.Errorf("funnel not found: %v", err)}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
// Check if name is being updated and if it already exists
|
||||
if req.Name != "" && req.Name != funnel.Name {
|
||||
funnels, err := aH.TraceFunnels.ListFunnels(r.Context(), orgID)
|
||||
if err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("failed to list funnels: %v", err)}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
for _, f := range funnels {
|
||||
if f.Name == req.Name {
|
||||
RespondError(w, &model.ApiError{
|
||||
Typ: model.ErrorBadData,
|
||||
Err: fmt.Errorf("a funnel with name '%s' already exists in this organization", req.Name),
|
||||
}, nil)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Process each step in the request
|
||||
for i := range req.Steps {
|
||||
if req.Steps[i].Order < 1 {
|
||||
req.Steps[i].Order = int64(i + 1) // Default to sequential ordering if not specified
|
||||
}
|
||||
}
|
||||
|
||||
if err := traceFunnel.ValidateFunnelSteps(req.Steps); err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("invalid funnel steps: %v", err)}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
// Normalize step orders
|
||||
req.Steps = traceFunnel.NormalizeFunnelSteps(req.Steps)
|
||||
|
||||
// Update the funnel with new steps
|
||||
funnel.Steps = req.Steps
|
||||
funnel.UpdatedAt = time.Unix(0, req.Timestamp*1000000) // Convert to nanoseconds
|
||||
funnel.UpdatedBy = userID
|
||||
|
||||
// Update funnel in database
|
||||
err = aH.TraceFunnels.UpdateFunnel(r.Context(), funnel, userID)
|
||||
if err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("failed to update funnel in database: %v", err)}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
// Update name and description if provided
|
||||
if req.Name != "" || req.Description != "" {
|
||||
// If name is not provided, use the existing name
|
||||
name := req.Name
|
||||
if name == "" {
|
||||
name = funnel.Name
|
||||
}
|
||||
// If description is not provided, use the existing description
|
||||
description := req.Description
|
||||
if description == "" {
|
||||
description = funnel.Description
|
||||
}
|
||||
|
||||
err = aH.TraceFunnels.UpdateMetadata(r.Context(), funnel.ID, name, description, userID)
|
||||
if err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("failed to update funnel metadata: %v", err)}, nil)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Get the updated funnel to return in response
|
||||
updatedFunnel, err := aH.TraceFunnels.GetFunnel(r.Context(), funnel.ID.String())
|
||||
if err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("failed to get updated funnel: %v", err)}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
response := tracefunnel.FunnelResponse{
|
||||
FunnelName: updatedFunnel.Name,
|
||||
FunnelID: updatedFunnel.ID.String(),
|
||||
Steps: updatedFunnel.Steps,
|
||||
CreatedAt: updatedFunnel.CreatedAt.UnixNano() / 1000000,
|
||||
CreatedBy: updatedFunnel.CreatedBy,
|
||||
OrgID: updatedFunnel.OrgID.String(),
|
||||
UpdatedBy: userID,
|
||||
UpdatedAt: updatedFunnel.UpdatedAt.UnixNano() / 1000000,
|
||||
Description: updatedFunnel.Description,
|
||||
}
|
||||
|
||||
aH.Respond(w, response)
|
||||
}
|
||||
|
||||
// handleListFunnels handles listing all funnels for a user
|
||||
func (aH *APIHandler) handleListFunnels(w http.ResponseWriter, r *http.Request) {
|
||||
claims, err := authtypes.ClaimsFromContext(r.Context())
|
||||
if err != nil {
|
||||
RespondError(w, &model.ApiError{
|
||||
Typ: model.ErrorUnauthorized,
|
||||
Err: fmt.Errorf("unauthenticated"),
|
||||
}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
orgID := claims.OrgID
|
||||
funnels, err := aH.TraceFunnels.ListFunnels(r.Context(), orgID)
|
||||
if err != nil {
|
||||
RespondError(w, &model.ApiError{
|
||||
Typ: model.ErrorInternal,
|
||||
Err: fmt.Errorf("failed to list funnels: %v", err),
|
||||
}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
var response []tracefunnel.FunnelResponse
|
||||
for _, f := range funnels {
|
||||
funnelResp := tracefunnel.FunnelResponse{
|
||||
FunnelName: f.Name,
|
||||
FunnelID: f.ID.String(),
|
||||
CreatedAt: f.CreatedAt.UnixNano() / 1000000,
|
||||
CreatedBy: f.CreatedBy,
|
||||
OrgID: f.OrgID.String(),
|
||||
UpdatedAt: f.UpdatedAt.UnixNano() / 1000000,
|
||||
UpdatedBy: f.UpdatedBy,
|
||||
Description: f.Description,
|
||||
}
|
||||
|
||||
// Get user email if available
|
||||
if f.CreatedByUser != nil {
|
||||
funnelResp.UserEmail = f.CreatedByUser.Email
|
||||
}
|
||||
|
||||
response = append(response, funnelResp)
|
||||
}
|
||||
|
||||
aH.Respond(w, response)
|
||||
}
|
||||
|
||||
// handleGetFunnel handles getting a single funnel
|
||||
func (aH *APIHandler) handleGetFunnel(w http.ResponseWriter, r *http.Request) {
|
||||
vars := mux.Vars(r)
|
||||
funnelID := vars["funnel_id"]
|
||||
|
||||
funnel, err := aH.TraceFunnels.GetFunnel(r.Context(), funnelID)
|
||||
if err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorNotFound, Err: fmt.Errorf("funnel not found: %v", err)}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
// Create a response with only the email from CreatedByUser
|
||||
response := tracefunnel.FunnelResponse{
|
||||
FunnelID: funnel.ID.String(),
|
||||
FunnelName: funnel.Name,
|
||||
Description: funnel.Description,
|
||||
CreatedAt: funnel.CreatedAt.UnixNano() / 1000000,
|
||||
UpdatedAt: funnel.UpdatedAt.UnixNano() / 1000000,
|
||||
CreatedBy: funnel.CreatedBy,
|
||||
UpdatedBy: funnel.UpdatedBy,
|
||||
OrgID: funnel.OrgID.String(),
|
||||
Steps: funnel.Steps,
|
||||
UserEmail: funnel.CreatedByUser.Email,
|
||||
}
|
||||
|
||||
aH.Respond(w, response)
|
||||
}
|
||||
|
||||
// handleDeleteFunnel handles deleting a funnel
|
||||
func (aH *APIHandler) handleDeleteFunnel(w http.ResponseWriter, r *http.Request) {
|
||||
vars := mux.Vars(r)
|
||||
funnelID := vars["funnel_id"]
|
||||
|
||||
err := aH.TraceFunnels.DeleteFunnel(r.Context(), funnelID)
|
||||
if err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("failed to delete funnel: %v", err)}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
aH.Respond(w, nil)
|
||||
}
|
||||
|
||||
// handleSaveFunnel saves a funnel to the SQLite database
|
||||
func (aH *APIHandler) handleSaveFunnel(w http.ResponseWriter, r *http.Request) {
|
||||
var req tracefunnel.FunnelRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
claims, err := authtypes.ClaimsFromContext(r.Context())
|
||||
if err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorUnauthorized, Err: fmt.Errorf("unauthenticated")}, nil)
|
||||
return
|
||||
}
|
||||
orgID := claims.OrgID
|
||||
usrID := claims.UserID
|
||||
|
||||
funnel, err := aH.TraceFunnels.GetFunnel(r.Context(), req.FunnelID.String())
|
||||
if err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorNotFound, Err: fmt.Errorf("funnel not found: %v", err)}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
updateTimestamp := req.Timestamp
|
||||
if updateTimestamp == 0 {
|
||||
updateTimestamp = time.Now().UnixMilli()
|
||||
} else if !traceFunnel.ValidateTimestampIsMilliseconds(updateTimestamp) {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("timestamp must be in milliseconds format (13 digits)")}, nil)
|
||||
return
|
||||
}
|
||||
funnel.UpdatedAt = time.Unix(0, updateTimestamp*1000000) // Convert to nanoseconds
|
||||
|
||||
if req.UserID != "" {
|
||||
funnel.UpdatedBy = usrID
|
||||
}
|
||||
|
||||
funnel.Description = req.Description
|
||||
|
||||
if err := aH.TraceFunnels.SaveFunnel(r.Context(), funnel, funnel.UpdatedBy, orgID); err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("failed to save funnel: %v", err)}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
// Try to fetch metadata from DB
|
||||
createdAt, updatedAt, extraDataFromDB, err := aH.TraceFunnels.GetFunnelMetadata(r.Context(), funnel.ID.String())
|
||||
resp := tracefunnel.FunnelResponse{
|
||||
FunnelName: funnel.Name,
|
||||
CreatedAt: createdAt,
|
||||
UpdatedAt: updatedAt,
|
||||
CreatedBy: funnel.CreatedBy,
|
||||
UpdatedBy: funnel.UpdatedBy,
|
||||
OrgID: funnel.OrgID.String(),
|
||||
Description: extraDataFromDB,
|
||||
}
|
||||
|
||||
aH.Respond(w, resp)
|
||||
}
|
||||
|
||||
// handleValidateTraces handles validating traces in a funnel
|
||||
func (aH *APIHandler) handleValidateTraces(w http.ResponseWriter, r *http.Request) {
|
||||
vars := mux.Vars(r)
|
||||
funnelID := vars["funnel_id"]
|
||||
|
||||
funnel, err := aH.TraceFunnels.GetFunnel(r.Context(), funnelID)
|
||||
if err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorNotFound, Err: fmt.Errorf("funnel not found: %v", err)}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
var timeRange tracefunnel.TimeRange
|
||||
if err := json.NewDecoder(r.Body).Decode(&timeRange); err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("error decoding time range: %v", err)}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
response, err := aH.TraceFunnels.ValidateTraces(r.Context(), funnel, timeRange)
|
||||
if err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error validating traces: %v", err)}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
aH.Respond(w, response)
|
||||
}
|
||||
|
||||
// handleFunnelAnalytics handles getting analytics for a funnel
|
||||
func (aH *APIHandler) handleFunnelAnalytics(w http.ResponseWriter, r *http.Request) {
|
||||
vars := mux.Vars(r)
|
||||
funnelID := vars["funnel_id"]
|
||||
|
||||
funnel, err := aH.TraceFunnels.GetFunnel(r.Context(), funnelID)
|
||||
if err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorNotFound, Err: fmt.Errorf("funnel not found: %v", err)}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
var timeRange tracefunnel.TimeRange
|
||||
if err := json.NewDecoder(r.Body).Decode(&timeRange); err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("error decoding time range: %v", err)}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
response, err := aH.TraceFunnels.GetFunnelAnalytics(r.Context(), funnel, timeRange)
|
||||
if err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error getting funnel analytics: %v", err)}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
aH.Respond(w, response)
|
||||
}
|
||||
|
||||
// handleStepAnalytics handles getting analytics for each step
|
||||
func (aH *APIHandler) handleStepAnalytics(w http.ResponseWriter, r *http.Request) {
|
||||
vars := mux.Vars(r)
|
||||
funnelID := vars["funnel_id"]
|
||||
|
||||
funnel, err := aH.TraceFunnels.GetFunnel(r.Context(), funnelID)
|
||||
if err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorNotFound, Err: fmt.Errorf("funnel not found: %v", err)}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
var timeRange tracefunnel.TimeRange
|
||||
if err := json.NewDecoder(r.Body).Decode(&timeRange); err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("error decoding time range: %v", err)}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
response, err := aH.TraceFunnels.GetStepAnalytics(r.Context(), funnel, timeRange)
|
||||
if err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error getting step analytics: %v", err)}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
aH.Respond(w, response)
|
||||
}
|
||||
|
||||
// handleFunnelSlowTraces handles requests for slow traces in a funnel
|
||||
func (aH *APIHandler) handleFunnelSlowTraces(w http.ResponseWriter, r *http.Request) {
|
||||
aH.handleTracesWithLatency(w, r, false)
|
||||
}
|
||||
|
||||
// handleFunnelErrorTraces handles requests for error traces in a funnel
|
||||
func (aH *APIHandler) handleFunnelErrorTraces(w http.ResponseWriter, r *http.Request) {
|
||||
aH.handleTracesWithLatency(w, r, true)
|
||||
}
|
||||
|
||||
// handleTracesWithLatency handles both slow and error traces with common logic
|
||||
func (aH *APIHandler) handleTracesWithLatency(w http.ResponseWriter, r *http.Request, isError bool) {
|
||||
funnel, req, err := aH.validateTracesRequest(r)
|
||||
if err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
if err := traceFunnel.ValidateSteps(funnel, req.StepAOrder, req.StepBOrder); err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
response, err := aH.TraceFunnels.GetSlowestTraces(r.Context(), funnel, req.StepAOrder, req.StepBOrder, req.TimeRange, isError)
|
||||
if err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error getting traces: %v", err)}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
aH.Respond(w, response)
|
||||
}
|
||||
|
||||
// validateTracesRequest validates and extracts the request parameters
|
||||
func (aH *APIHandler) validateTracesRequest(r *http.Request) (*tracefunnel.Funnel, *tracefunnel.StepTransitionRequest, error) {
|
||||
vars := mux.Vars(r)
|
||||
funnelID := vars["funnel_id"]
|
||||
|
||||
funnel, err := aH.TraceFunnels.GetFunnel(r.Context(), funnelID)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("funnel not found: %v", err)
|
||||
}
|
||||
|
||||
var req tracefunnel.StepTransitionRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
return nil, nil, fmt.Errorf("invalid request body: %v", err)
|
||||
}
|
||||
|
||||
return funnel, &req, nil
|
||||
}
|
||||
|
||||
@@ -17,6 +17,8 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
|
||||
"github.com/SigNoz/signoz/pkg/modules/preference"
|
||||
preferencecore "github.com/SigNoz/signoz/pkg/modules/preference/core"
|
||||
"github.com/SigNoz/signoz/pkg/modules/tracefunnel"
|
||||
tfCore "github.com/SigNoz/signoz/pkg/modules/tracefunnel/core"
|
||||
"github.com/SigNoz/signoz/pkg/prometheus"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader"
|
||||
@@ -186,6 +188,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
||||
preferenceAPI := preference.NewAPI(preferencecore.NewPreference(preferencecore.NewStore(serverOptions.SigNoz.SQLStore), preferencetypes.NewDefaultPreferenceMap()))
|
||||
organizationAPI := implorganization.NewAPI(implorganization.NewModule(implorganization.NewStore(serverOptions.SigNoz.SQLStore)))
|
||||
organizationModule := implorganization.NewModule(implorganization.NewStore(serverOptions.SigNoz.SQLStore))
|
||||
tracefunnels := tracefunnel.NewAPI(tfCore.NewStore(serverOptions.SigNoz.SQLStore))
|
||||
apiHandler, err := NewAPIHandler(APIHandlerOpts{
|
||||
Reader: reader,
|
||||
SkipConfig: skipConfig,
|
||||
@@ -207,6 +210,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
||||
Preference: preferenceAPI,
|
||||
OrganizationAPI: organizationAPI,
|
||||
OrganizationModule: organizationModule,
|
||||
TraceFunnels: tracefunnels,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -319,6 +323,7 @@ func (s *Server) createPublicServer(api *APIHandler, web web.Web) (*http.Server,
|
||||
api.RegisterMessagingQueuesRoutes(r, am)
|
||||
api.RegisterThirdPartyApiRoutes(r, am)
|
||||
api.MetricExplorerRoutes(r, am)
|
||||
api.RegisterTraceFunnelsRoutes(r, am)
|
||||
|
||||
c := cors.New(cors.Options{
|
||||
AllowedOrigins: []string{"*"},
|
||||
@@ -417,7 +422,6 @@ func (s *Server) Start(ctx context.Context) error {
|
||||
if port, err := utils.GetPort(s.privateConn.Addr()); err == nil {
|
||||
privatePort = port
|
||||
}
|
||||
fmt.Println("starting private http")
|
||||
go func() {
|
||||
zap.L().Info("Starting Private HTTP server", zap.Int("port", privatePort), zap.String("addr", s.serverOptions.PrivateHostPort))
|
||||
|
||||
|
||||
@@ -74,6 +74,7 @@ func NewSQLMigrationProviderFactories(sqlstore sqlstore.SQLStore) factory.NamedM
|
||||
sqlmigration.NewUpdateIntegrationsFactory(sqlstore),
|
||||
sqlmigration.NewUpdateOrganizationsFactory(sqlstore),
|
||||
sqlmigration.NewDropGroupsFactory(sqlstore),
|
||||
sqlmigration.NewAddTraceFunnelsFactory(sqlstore),
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
96
pkg/sqlmigration/030_add_trace_funnels.go
Normal file
96
pkg/sqlmigration/030_add_trace_funnels.go
Normal file
@@ -0,0 +1,96 @@
|
||||
package sqlmigration
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
traceFunnels "github.com/SigNoz/signoz/pkg/types/tracefunnel"
|
||||
"github.com/uptrace/bun"
|
||||
"github.com/uptrace/bun/migrate"
|
||||
)
|
||||
|
||||
type addTraceFunnels struct {
|
||||
sqlstore sqlstore.SQLStore
|
||||
}
|
||||
|
||||
func NewAddTraceFunnelsFactory(sqlstore sqlstore.SQLStore) factory.ProviderFactory[SQLMigration, Config] {
|
||||
return factory.NewProviderFactory(factory.MustNewName("add_trace_funnels"), func(ctx context.Context, providerSettings factory.ProviderSettings, config Config) (SQLMigration, error) {
|
||||
return newAddTraceFunnels(ctx, providerSettings, config, sqlstore)
|
||||
})
|
||||
}
|
||||
|
||||
func newAddTraceFunnels(_ context.Context, _ factory.ProviderSettings, _ Config, sqlstore sqlstore.SQLStore) (SQLMigration, error) {
|
||||
return &addTraceFunnels{sqlstore: sqlstore}, nil
|
||||
}
|
||||
|
||||
func (migration *addTraceFunnels) Register(migrations *migrate.Migrations) error {
|
||||
if err := migrations.Register(migration.Up, migration.Down); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (migration *addTraceFunnels) Up(ctx context.Context, db *bun.DB) error {
|
||||
tx, err := db.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
// Create trace_funnel table with foreign key constraint inline
|
||||
_, err = tx.NewCreateTable().Model((*traceFunnels.Funnel)(nil)).
|
||||
ForeignKey(`("org_id") REFERENCES "organizations" ("id") ON DELETE CASCADE`).
|
||||
IfNotExists().
|
||||
Exec(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create trace_funnel table: %v", err)
|
||||
}
|
||||
|
||||
// Add unique constraint for org_id and name
|
||||
_, err = tx.NewRaw(`
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS idx_trace_funnel_org_id_name
|
||||
ON trace_funnel (org_id, name)
|
||||
`).Exec(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create unique constraint: %v", err)
|
||||
}
|
||||
|
||||
// Create indexes
|
||||
_, err = tx.NewCreateIndex().Model((*traceFunnels.Funnel)(nil)).Index("idx_trace_funnel_org_id").Column("org_id").Exec(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create org_id index: %v", err)
|
||||
}
|
||||
|
||||
_, err = tx.NewCreateIndex().Model((*traceFunnels.Funnel)(nil)).Index("idx_trace_funnel_created_at").Column("created_at").Exec(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create created_at index: %v", err)
|
||||
}
|
||||
|
||||
if err := tx.Commit(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (migration *addTraceFunnels) Down(ctx context.Context, db *bun.DB) error {
|
||||
tx, err := db.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
// Drop trace_funnel table
|
||||
_, err = tx.NewDropTable().Model((*traceFunnels.Funnel)(nil)).IfExists().Exec(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to drop trace_funnel table: %v", err)
|
||||
}
|
||||
|
||||
if err := tx.Commit(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
109
pkg/types/tracefunnel/funnel.go
Normal file
109
pkg/types/tracefunnel/funnel.go
Normal file
@@ -0,0 +1,109 @@
|
||||
package traceFunnels
|
||||
|
||||
import (
|
||||
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
|
||||
"github.com/SigNoz/signoz/pkg/types"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/uptrace/bun"
|
||||
)
|
||||
|
||||
// metadata for funnels
|
||||
|
||||
type BaseMetadata struct {
|
||||
types.Identifiable // funnel id
|
||||
types.TimeAuditable
|
||||
types.UserAuditable
|
||||
Name string `json:"funnel_name" bun:"name,type:text,notnull"` // funnel name
|
||||
Description string `json:"description" bun:"description,type:text"` // funnel description
|
||||
OrgID valuer.UUID `json:"org_id" bun:"org_id,type:varchar,notnull"`
|
||||
}
|
||||
|
||||
// Funnel Core Data Structure (Funnel and FunnelStep)
|
||||
type Funnel struct {
|
||||
bun.BaseModel `bun:"table:trace_funnel"`
|
||||
BaseMetadata
|
||||
Steps []FunnelStep `json:"steps" bun:"steps,type:text,notnull"`
|
||||
Tags string `json:"tags" bun:"tags,type:text"`
|
||||
CreatedByUser *types.User `json:"user" bun:"rel:belongs-to,join:created_by=id"`
|
||||
}
|
||||
|
||||
type FunnelStep struct {
|
||||
Name string `json:"name,omitempty"` // step name
|
||||
Description string `json:"description,omitempty"` // step description
|
||||
Order int64 `json:"step_order"`
|
||||
ServiceName string `json:"service_name"`
|
||||
SpanName string `json:"span_name"`
|
||||
Filters *v3.FilterSet `json:"filters,omitempty"`
|
||||
LatencyPointer string `json:"latency_pointer,omitempty"`
|
||||
LatencyType string `json:"latency_type,omitempty"`
|
||||
HasErrors bool `json:"has_errors"`
|
||||
}
|
||||
|
||||
// FunnelRequest represents all possible funnel-related requests
|
||||
type FunnelRequest struct {
|
||||
// Common fields
|
||||
FunnelID valuer.UUID `json:"funnel_id,omitempty"`
|
||||
Steps []FunnelStep `json:"steps,omitempty"`
|
||||
Timestamp int64 `json:"timestamp,omitempty"`
|
||||
Description string `json:"description,omitempty"`
|
||||
UserID string `json:"user_id,omitempty"`
|
||||
Name string `json:"funnel_name,omitempty"`
|
||||
|
||||
// Analytics specific fields
|
||||
StartTime int64 `json:"start_time,omitempty"`
|
||||
EndTime int64 `json:"end_time,omitempty"`
|
||||
StepAOrder int64 `json:"step_a_order,omitempty"`
|
||||
StepBOrder int64 `json:"step_b_order,omitempty"`
|
||||
}
|
||||
|
||||
// FunnelResponse represents all possible funnel-related responses
|
||||
type FunnelResponse struct {
|
||||
// Common fields
|
||||
Description string `json:"description,omitempty"`
|
||||
CreatedAt int64 `json:"created_at,omitempty"`
|
||||
UpdatedAt int64 `json:"updated_at,omitempty"`
|
||||
CreatedBy string `json:"created_by,omitempty"`
|
||||
UpdatedBy string `json:"updated_by,omitempty"`
|
||||
OrgID string `json:"org_id,omitempty"`
|
||||
Steps []FunnelStep `json:"steps,omitempty"`
|
||||
|
||||
// Special fields
|
||||
FunnelID string `json:"funnel_id,omitempty"`
|
||||
FunnelName string `json:"funnel_name,omitempty"`
|
||||
CreationTimestamp int64 `json:"creation_timestamp,omitempty"`
|
||||
UserEmail string `json:"user_email,omitempty"`
|
||||
Funnel *Funnel `json:"funnel,omitempty"`
|
||||
}
|
||||
|
||||
// TimeRange represents a time range for analytics
|
||||
type TimeRange struct {
|
||||
StartTime int64 `json:"start_time"`
|
||||
EndTime int64 `json:"end_time"`
|
||||
}
|
||||
|
||||
// StepTransitionRequest represents a request for step transition analytics
|
||||
type StepTransitionRequest struct {
|
||||
TimeRange
|
||||
StepAOrder int64 `json:"step_a_order"`
|
||||
StepBOrder int64 `json:"step_b_order"`
|
||||
}
|
||||
|
||||
// UserInfo represents basic user information
|
||||
type UserInfo struct {
|
||||
ID string `json:"id"`
|
||||
Email string `json:"email"`
|
||||
}
|
||||
|
||||
// Analytics on traces
|
||||
type FunnelAnalytics struct {
|
||||
TotalStart int64 `json:"total_start"`
|
||||
TotalComplete int64 `json:"total_complete"`
|
||||
ErrorCount int64 `json:"error_count"`
|
||||
AvgDurationMs float64 `json:"avg_duration_ms"`
|
||||
P99LatencyMs float64 `json:"p99_latency_ms"`
|
||||
ConversionRate float64 `json:"conversion_rate"`
|
||||
}
|
||||
|
||||
type ValidTracesResponse struct {
|
||||
TraceIDs []string `json:"trace_ids"`
|
||||
}
|
||||
16
pkg/types/tracefunnel/store.go
Normal file
16
pkg/types/tracefunnel/store.go
Normal file
@@ -0,0 +1,16 @@
|
||||
package traceFunnels
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
type TraceFunnelStore interface {
|
||||
Create(context.Context, *Funnel) error
|
||||
Get(context.Context, valuer.UUID) (*Funnel, error)
|
||||
List(context.Context) ([]*Funnel, error)
|
||||
Update(context.Context, *Funnel) error
|
||||
Delete(context.Context, valuer.UUID) error
|
||||
UpdateMetadata(context.Context, valuer.UUID, string, string, string) error
|
||||
}
|
||||
Reference in New Issue
Block a user