Compare commits
28 Commits
main
...
feat/sync-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
db50b3f952 | ||
|
|
cf507dd88e | ||
|
|
6db30764d2 | ||
|
|
d47df57339 | ||
|
|
23741983e6 | ||
|
|
d654099311 | ||
|
|
d59edde634 | ||
|
|
136c23bd63 | ||
|
|
453b8d56fc | ||
|
|
0678bd59e1 | ||
|
|
426c7fd075 | ||
|
|
162239bec5 | ||
|
|
7648c7084a | ||
|
|
c79e90231f | ||
|
|
a2a97e9d5c | ||
|
|
c5e1172863 | ||
|
|
cb97e05c24 | ||
|
|
c43a77621a | ||
|
|
c2b8ff0c67 | ||
|
|
8494be2fe8 | ||
|
|
c874e731cc | ||
|
|
9c4e9db665 | ||
|
|
0f48f90e8d | ||
|
|
bbdeeca2e0 | ||
|
|
7c79f57dee | ||
|
|
d1857177d1 | ||
|
|
1909f25931 | ||
|
|
1b88fa6549 |
4
go.mod
4
go.mod
@@ -84,6 +84,7 @@ require (
|
||||
github.com/Azure/azure-sdk-for-go/sdk/internal v1.8.0 // indirect
|
||||
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 // indirect
|
||||
github.com/ClickHouse/ch-go v0.61.3 // indirect
|
||||
github.com/alecthomas/participle/v2 v2.1.1 // indirect
|
||||
github.com/alecthomas/units v0.0.0-20231202071711-9a357b53e9c9 // indirect
|
||||
github.com/andybalholm/brotli v1.1.0 // indirect
|
||||
github.com/aws/aws-sdk-go v1.53.16 // indirect
|
||||
@@ -108,6 +109,7 @@ require (
|
||||
github.com/go-logr/stdr v1.2.2 // indirect
|
||||
github.com/go-ole/go-ole v1.2.6 // indirect
|
||||
github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1 // indirect
|
||||
github.com/gobwas/glob v0.2.3 // indirect
|
||||
github.com/gogo/protobuf v1.3.2 // indirect
|
||||
github.com/golang-jwt/jwt/v5 v5.2.1 // indirect
|
||||
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
|
||||
@@ -118,6 +120,7 @@ require (
|
||||
github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc // indirect
|
||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect
|
||||
github.com/hashicorp/go-version v1.7.0 // indirect
|
||||
github.com/iancoleman/strcase v0.3.0 // indirect
|
||||
github.com/inconshreveable/mousetrap v1.1.0 // indirect
|
||||
github.com/jmespath/go-jmespath v0.4.0 // indirect
|
||||
github.com/jonboulle/clockwork v0.2.2 // indirect
|
||||
@@ -142,6 +145,7 @@ require (
|
||||
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
|
||||
github.com/oklog/run v1.1.0 // indirect
|
||||
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.102.0 // indirect
|
||||
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.102.0 // indirect
|
||||
github.com/paulmach/orb v0.11.1 // indirect
|
||||
github.com/pierrec/lz4/v4 v4.1.21 // indirect
|
||||
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect
|
||||
|
||||
8
go.sum
8
go.sum
@@ -72,6 +72,8 @@ github.com/SigNoz/zap_otlp/zap_otlp_encoder v0.0.0-20230822164844-1b861a431974 h
|
||||
github.com/SigNoz/zap_otlp/zap_otlp_encoder v0.0.0-20230822164844-1b861a431974/go.mod h1:fpiHtiboLJpIE5TtkQfiWx6xtnlA+uWmv+N9opETqKY=
|
||||
github.com/SigNoz/zap_otlp/zap_otlp_sync v0.0.0-20230822164844-1b861a431974 h1:G2JzCrqdeOTtAn4tDFZEg5gCAEYVRXcddG3ZlrFMumo=
|
||||
github.com/SigNoz/zap_otlp/zap_otlp_sync v0.0.0-20230822164844-1b861a431974/go.mod h1:YtDal1xBRQfPRNo7iSU3W37RGT0jMW7Rnzk6EON3a4M=
|
||||
github.com/alecthomas/participle/v2 v2.1.1 h1:hrjKESvSqGHzRb4yW1ciisFJ4p3MGYih6icjJvbsmV8=
|
||||
github.com/alecthomas/participle/v2 v2.1.1/go.mod h1:Y1+hAs8DHPmc3YUFzqllV+eSQ9ljPTk0ZkPMtEdAx2c=
|
||||
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
|
||||
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
|
||||
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
|
||||
@@ -244,6 +246,8 @@ github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1 h1:TQcrn6Wq+sKGkpyPvppOz99zsM
|
||||
github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM=
|
||||
github.com/go-zookeeper/zk v1.0.3 h1:7M2kwOsc//9VeeFiPtf+uSJlVpU66x9Ba5+8XK7/TDg=
|
||||
github.com/go-zookeeper/zk v1.0.3/go.mod h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw=
|
||||
github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y=
|
||||
github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8=
|
||||
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
|
||||
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
|
||||
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
|
||||
@@ -414,6 +418,8 @@ github.com/hetznercloud/hcloud-go/v2 v2.9.0/go.mod h1:qtW/TuU7Bs16ibXl/ktJarWqU2
|
||||
github.com/hjson/hjson-go/v4 v4.0.0 h1:wlm6IYYqHjOdXH1gHev4VoXCaW20HdQAGCxdOEEg2cs=
|
||||
github.com/hjson/hjson-go/v4 v4.0.0/go.mod h1:KaYt3bTw3zhBjYqnXkYywcYctk0A2nxeEFTse3rH13E=
|
||||
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
|
||||
github.com/iancoleman/strcase v0.3.0 h1:nTXanmYxhfFAMjZL34Ov6gkzEsSJZ5DbhxWjvSASxEI=
|
||||
github.com/iancoleman/strcase v0.3.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho=
|
||||
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
|
||||
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
|
||||
github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4=
|
||||
@@ -583,6 +589,8 @@ github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.102
|
||||
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.102.0/go.mod h1:cBbjwd8m4rBVgCQksUbAVQX1EoM5IuCyNQw2mzvibEM=
|
||||
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.102.0 h1:qsM5HhWpAfIMg8LdO4u+CHofu4UuCuJwg/M+ySO9uZA=
|
||||
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.102.0/go.mod h1:wBJlGy9Wx6s7AxIMcSne2sGw73e5ZUy1AQ/duYwpFf8=
|
||||
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.102.0 h1:EPmEtTgrlNzriEYZpkVOVDWlqWTUHoEqmM8oU/EpdkA=
|
||||
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.102.0/go.mod h1:qnLc/+jOVcsL1dF17ztBcf3juQ3f9bt6Wuf+Xxbrd9w=
|
||||
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.102.0 h1:vJL6lDaeI3pVA7ADnWKD3HMpI80BSrZ2UnGc+qkwqoY=
|
||||
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.102.0/go.mod h1:xtE7tds5j8PtI/wMuGb+Em5K9rJH8hm6t28Qe4QrpoU=
|
||||
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.102.0 h1:TvJYcU/DLRFCgHr7nT98k5D+qkZ4syKVxc8OJjv+K4c=
|
||||
|
||||
@@ -21,7 +21,9 @@ func CollectorConfProcessorName(p Pipeline) string {
|
||||
return constants.LogsPPLPfx + p.Alias
|
||||
}
|
||||
|
||||
func PreparePipelineProcessor(pipelines []Pipeline) (map[string]interface{}, []string, error) {
|
||||
// Old proc generation logic
|
||||
|
||||
func PreparePipelineProcessorOld(pipelines []Pipeline) (map[string]interface{}, []string, error) {
|
||||
processors := map[string]interface{}{}
|
||||
names := []string{}
|
||||
for pipelineIdx, v := range pipelines {
|
||||
@@ -230,6 +232,8 @@ func cleanTraceParser(operator *PipelineOperator) {
|
||||
}
|
||||
}
|
||||
|
||||
// End of old stuff
|
||||
|
||||
// Generates an expression checking that `fieldPath` has a non-nil value in a log record.
|
||||
func fieldNotNilCheck(fieldPath string) (string, error) {
|
||||
_, err := expr.Compile(fieldPath)
|
||||
|
||||
654
pkg/query-service/app/logparsingpipeline/pipelineBuilderV2.go
Normal file
654
pkg/query-service/app/logparsingpipeline/pipelineBuilderV2.go
Normal file
@@ -0,0 +1,654 @@
|
||||
// Generate collector config for log pipelines
|
||||
// using ottl targeting signoztransform processor.
|
||||
|
||||
package logparsingpipeline
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"regexp"
|
||||
"strings"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"go.signoz.io/signoz/pkg/query-service/queryBuilderToExpr"
|
||||
)
|
||||
|
||||
func PreparePipelineProcessor(pipelines []Pipeline) (
|
||||
map[string]interface{}, []string, error,
|
||||
) {
|
||||
processors := map[string]interface{}{}
|
||||
names := []string{}
|
||||
|
||||
ottlStatements := []string{}
|
||||
|
||||
for _, pipeline := range pipelines {
|
||||
if pipeline.Enabled {
|
||||
pipelineOttlStatements, err := ottlStatementsForPipeline(pipeline)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("couldn't generate ottl statements for pipeline %s: %w", pipeline.Alias, err)
|
||||
}
|
||||
|
||||
ottlStatements = append(ottlStatements, pipelineOttlStatements...)
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(Raj): Maybe validate ottl statements
|
||||
if len(ottlStatements) > 0 {
|
||||
pipelinesProcessorName := "signoztransform/logs-pipelines"
|
||||
names = append(names, pipelinesProcessorName)
|
||||
processors[pipelinesProcessorName] = map[string]interface{}{
|
||||
"error_mode": "ignore",
|
||||
"log_statements": []map[string]interface{}{
|
||||
{
|
||||
"context": "log",
|
||||
"statements": ottlStatements,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(Raj): with error_mode: ignore, errors in ottl statements get logged and processing
|
||||
// continues on to the next statement.
|
||||
// So operators that get translated to multiple ottl statements must behave atomically
|
||||
// as much as possible - if a statement for the op fails, there is no point running the
|
||||
// statements that follow.
|
||||
|
||||
return processors, names, nil
|
||||
}
|
||||
|
||||
func ottlStatementsForPipeline(pipeline Pipeline) ([]string, error) {
|
||||
enabledOperators := []PipelineOperator{}
|
||||
for _, op := range pipeline.Config {
|
||||
if op.Enabled {
|
||||
enabledOperators = append(enabledOperators, op)
|
||||
}
|
||||
}
|
||||
if len(enabledOperators) < 1 {
|
||||
return []string{}, nil
|
||||
}
|
||||
|
||||
// We are generating one or more ottl statements per pipeline operator.
|
||||
// ottl statements have individual where conditions per statement
|
||||
// The simplest path is to add where clause for pipeline filter to each statement.
|
||||
// However, this breaks if an early operator statement in the pipeline ends up
|
||||
// modifying the fields referenced in the pipeline filter.
|
||||
// To work around this, we add statements before and after the actual pipeline
|
||||
// operator statements, that add and remove a pipeline specific marker, ensuring
|
||||
// all operators in a pipeline get to act on the log even if an op changes the filter referenced fields.
|
||||
pipelineMarker := fmt.Sprintf(
|
||||
"%s-%s", pipeline.Alias, pipeline.Id, // pipeline.Id is guaranteed to be unique by DB.
|
||||
)
|
||||
|
||||
addPipelineMarkerOttlStmt := fmt.Sprintf(
|
||||
`set(attributes["__matched-log-pipeline__"], "%s")`, pipelineMarker,
|
||||
)
|
||||
|
||||
filterExpr, err := queryBuilderToExpr.Parse(pipeline.Filter)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to parse pipeline filter: %w", err)
|
||||
}
|
||||
if len(filterExpr) > 0 {
|
||||
// TODO(Raj): Update qb2Expr logic to work directly with ottl
|
||||
filterOttl := exprToOttl(filterExpr)
|
||||
addPipelineMarkerOttlStmt += fmt.Sprintf(" where %s", filterOttl)
|
||||
}
|
||||
|
||||
pipelineOttlStatements := []string{addPipelineMarkerOttlStmt}
|
||||
|
||||
// Add ottl statements for implementing enabled pipeline operators
|
||||
|
||||
logMatchesPipeline := fmt.Sprintf(
|
||||
`attributes["__matched-log-pipeline__"] == "%s"`, pipelineMarker,
|
||||
)
|
||||
|
||||
for _, operator := range enabledOperators {
|
||||
stmts, err := ottlStatementsForPipelineOperator(operator)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf(
|
||||
"couldn't generate ottl for %s operator: %w", operator.Type, err,
|
||||
)
|
||||
}
|
||||
|
||||
for _, s := range stmts {
|
||||
// prepend pipeline marker check to operator ottl statement conditions
|
||||
s.conditions = append([]string{logMatchesPipeline}, s.conditions...)
|
||||
pipelineOttlStatements = append(pipelineOttlStatements, s.toString())
|
||||
}
|
||||
}
|
||||
|
||||
// Add a final ottl statement for the pipeline for removing pipeline marker
|
||||
removePipelineMarkerFromMatchingLogs := fmt.Sprintf(
|
||||
`delete_key(attributes, "__matched-log-pipeline__") where %s`, logMatchesPipeline,
|
||||
)
|
||||
pipelineOttlStatements = append(
|
||||
pipelineOttlStatements, removePipelineMarkerFromMatchingLogs,
|
||||
)
|
||||
|
||||
return pipelineOttlStatements, nil
|
||||
}
|
||||
|
||||
// Operator specific ottl generation helpers follow
|
||||
|
||||
// struct for helping put ottl statements together
|
||||
type ottlStatement struct {
|
||||
// All ottl statements have exactly 1 "editor" for transforming log
|
||||
editor string
|
||||
// editor only gets applied if a log matches the condition
|
||||
// `conditions` get joined with `AND` when being rendered to final ottl statements
|
||||
conditions []string
|
||||
}
|
||||
|
||||
func (s *ottlStatement) toString() string {
|
||||
if len(s.conditions) < 1 {
|
||||
return s.editor
|
||||
}
|
||||
|
||||
conditions := []string{}
|
||||
for _, c := range s.conditions {
|
||||
if len(c) > 0 {
|
||||
conditions = append(conditions, c)
|
||||
}
|
||||
}
|
||||
|
||||
return fmt.Sprintf(
|
||||
"%s where %s", s.editor, strings.Join(conditions, " and "),
|
||||
)
|
||||
}
|
||||
|
||||
func ottlStatementsForPipelineOperator(operator PipelineOperator) (
|
||||
[]ottlStatement, error,
|
||||
) {
|
||||
|
||||
if operator.Type == "add" {
|
||||
return ottlStatementsForAddOperator((operator))
|
||||
|
||||
} else if operator.Type == "remove" {
|
||||
return ottlStatementsForRemoveOperator((operator))
|
||||
|
||||
} else if operator.Type == "copy" {
|
||||
return ottlStatementsForCopyOperator((operator))
|
||||
|
||||
} else if operator.Type == "move" {
|
||||
return ottlStatementsForMoveOperator((operator))
|
||||
|
||||
} else if operator.Type == "regex_parser" {
|
||||
return ottlStatementsForRegexParser(operator)
|
||||
|
||||
} else if operator.Type == "grok_parser" {
|
||||
return ottlStatementsForGrokParser(operator)
|
||||
|
||||
} else if operator.Type == "json_parser" {
|
||||
return ottlStatementsForJsonParser(operator)
|
||||
|
||||
} else if operator.Type == "time_parser" {
|
||||
return ottlStatementsForTimeParser(operator)
|
||||
|
||||
} else if operator.Type == "severity_parser" {
|
||||
return ottlStatementsForSeverityParser(operator)
|
||||
|
||||
} else if operator.Type == "trace_parser" {
|
||||
return ottlStatementsForTraceParser(operator)
|
||||
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("unsupported pipeline operator type: %s", operator.Type)
|
||||
}
|
||||
|
||||
func ottlStatementsForAddOperator(
|
||||
operator PipelineOperator,
|
||||
) ([]ottlStatement, error) {
|
||||
conditions := []string{}
|
||||
value := fmt.Sprintf(`"%s"`, operator.Value)
|
||||
|
||||
// Handling for adding dynamic values using golang expr as allowed by logstransform add operator
|
||||
// See https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/stanza/operator/transformer/add/transformer.go#L49
|
||||
// Expression values are enclosed in `EXPR(...)`. Note that only uppercase `EXPR(...)` is supported
|
||||
if strings.HasPrefix(operator.Value, "EXPR(") {
|
||||
expression := strings.TrimSuffix(strings.TrimPrefix(operator.Value, "EXPR("), ")")
|
||||
value = exprToOttl(expression)
|
||||
|
||||
// Also add non-nil check condition for fields referenced in the value expression
|
||||
fieldsNotNilCheck, err := fieldsReferencedInExprNotNilCheck(expression)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf(
|
||||
"couldn't generate nil check for fields referenced in value expr of add operator %s: %w",
|
||||
operator.Name, err,
|
||||
)
|
||||
}
|
||||
if fieldsNotNilCheck != "" {
|
||||
conditions = append(conditions, exprToOttl(fieldsNotNilCheck))
|
||||
}
|
||||
}
|
||||
|
||||
return []ottlStatement{{
|
||||
editor: fmt.Sprintf(`set(%s, %s)`, logTransformPathToOttlPath(operator.Field), value),
|
||||
conditions: conditions,
|
||||
}}, nil
|
||||
}
|
||||
|
||||
func ottlStatementsForRemoveOperator(operator PipelineOperator) (
|
||||
[]ottlStatement, error,
|
||||
) {
|
||||
stmt, err := ottlStatementForDeletingField(operator.Field)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("couldn't generate ottl for remove operator: %w", err)
|
||||
}
|
||||
return []ottlStatement{*stmt}, nil
|
||||
}
|
||||
|
||||
func ottlStatementsForCopyOperator(operator PipelineOperator) (
|
||||
[]ottlStatement, error,
|
||||
) {
|
||||
return []ottlStatement{{
|
||||
editor: fmt.Sprintf(
|
||||
`set(%s, %s)`,
|
||||
logTransformPathToOttlPath(operator.To),
|
||||
logTransformPathToOttlPath(operator.From),
|
||||
),
|
||||
// TODO(Raj): What if operator.From is nil? Add a test
|
||||
conditions: []string{},
|
||||
}}, nil
|
||||
}
|
||||
|
||||
func ottlStatementsForMoveOperator(operator PipelineOperator) (
|
||||
[]ottlStatement, error,
|
||||
) {
|
||||
stmts := []ottlStatement{{
|
||||
editor: fmt.Sprintf(
|
||||
`set(%s, %s)`,
|
||||
logTransformPathToOttlPath(operator.To),
|
||||
logTransformPathToOttlPath(operator.From),
|
||||
),
|
||||
// TODO(Raj): What happens if operatore.From is nil here.
|
||||
conditions: []string{},
|
||||
}}
|
||||
|
||||
deleteStmt, err := ottlStatementForDeletingField(operator.From)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("couldn't generate delete stmt for move op: %w", err)
|
||||
}
|
||||
stmts = append(stmts, *deleteStmt)
|
||||
|
||||
return stmts, nil
|
||||
}
|
||||
|
||||
func ottlStatementsForRegexParser(operator PipelineOperator) (
|
||||
[]ottlStatement, error,
|
||||
) {
|
||||
parseFromNotNilCheck, err := fieldNotNilCheck(operator.ParseFrom)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf(
|
||||
"couldn't generate nil check for parseFrom of regex op %s: %w", operator.Name, err,
|
||||
)
|
||||
}
|
||||
|
||||
// TODO(Raj): What happens if ParseTo is not a map? Add a test for this
|
||||
return []ottlStatement{{
|
||||
editor: fmt.Sprintf(
|
||||
`merge_maps(%s, ExtractPatterns(%s, "%s"), "upsert")`,
|
||||
logTransformPathToOttlPath(operator.ParseTo),
|
||||
logTransformPathToOttlPath(operator.ParseFrom),
|
||||
escapeDoubleQuotesForOttl(operator.Regex),
|
||||
),
|
||||
conditions: []string{exprToOttl(parseFromNotNilCheck)},
|
||||
}}, nil
|
||||
|
||||
}
|
||||
|
||||
func ottlStatementsForGrokParser(operator PipelineOperator) (
|
||||
[]ottlStatement, error,
|
||||
) {
|
||||
parseFromNotNilCheck, err := fieldNotNilCheck(operator.ParseFrom)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf(
|
||||
"couldn't generate nil check for parseFrom of grok op %s: %w", operator.Name, err,
|
||||
)
|
||||
}
|
||||
// TODO(Raj): What happens if ParseTo is not a map? Add a test for this
|
||||
return []ottlStatement{{
|
||||
editor: fmt.Sprintf(
|
||||
`merge_maps(%s, GrokParse(%s, "%s"), "upsert")`,
|
||||
logTransformPathToOttlPath(operator.ParseTo),
|
||||
logTransformPathToOttlPath(operator.ParseFrom),
|
||||
escapeDoubleQuotesForOttl(operator.Pattern),
|
||||
),
|
||||
conditions: []string{exprToOttl(parseFromNotNilCheck)},
|
||||
}}, nil
|
||||
}
|
||||
|
||||
func ottlStatementsForJsonParser(operator PipelineOperator) (
|
||||
[]ottlStatement, error,
|
||||
) {
|
||||
parseFromNotNilCheck, err := fieldNotNilCheck(operator.ParseFrom)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf(
|
||||
"couldn't generate nil check for parseFrom of json parser op %s: %w", operator.Name, err,
|
||||
)
|
||||
}
|
||||
|
||||
mapExtractStmts := ottlStatementsForExtractingMapValue(
|
||||
fmt.Sprintf("ParseJSON(%s)", logTransformPathToOttlPath(operator.ParseFrom)),
|
||||
logTransformPathToOttlPath(operator.ParseTo),
|
||||
)
|
||||
|
||||
stmts := ottlStatementsWithPrependedConditions(
|
||||
mapExtractStmts,
|
||||
exprToOttl(parseFromNotNilCheck),
|
||||
fmt.Sprintf(
|
||||
`IsMatch(%s, "^\\s*{.*}\\s*$")`,
|
||||
logTransformPathToOttlPath(operator.ParseFrom),
|
||||
),
|
||||
)
|
||||
|
||||
return stmts, nil
|
||||
}
|
||||
|
||||
func ottlStatementsForTimeParser(operator PipelineOperator) (
|
||||
[]ottlStatement, error,
|
||||
) {
|
||||
stmts := []ottlStatement{}
|
||||
|
||||
parseFromNotNilCheck, err := fieldNotNilCheck(operator.ParseFrom)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf(
|
||||
"couldn't generate nil check for parseFrom of json parser op %s: %w", operator.Name, err,
|
||||
)
|
||||
}
|
||||
|
||||
whereClauseParts := []string{exprToOttl(parseFromNotNilCheck)}
|
||||
|
||||
if operator.LayoutType == "strptime" {
|
||||
regex, err := RegexForStrptimeLayout(operator.Layout)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf(
|
||||
"couldn't generate layout regex for time_parser %s: %w", operator.Name, err,
|
||||
)
|
||||
}
|
||||
whereClauseParts = append(whereClauseParts,
|
||||
fmt.Sprintf(`IsMatch(%s, "%s")`, logTransformPathToOttlPath(operator.ParseFrom), regex),
|
||||
)
|
||||
|
||||
stmts = append(stmts, ottlStatement{
|
||||
editor: fmt.Sprintf(
|
||||
`set(time, Time(%s, "%s"))`,
|
||||
logTransformPathToOttlPath(operator.ParseFrom),
|
||||
operator.Layout,
|
||||
),
|
||||
conditions: whereClauseParts,
|
||||
})
|
||||
|
||||
} else if operator.LayoutType == "epoch" {
|
||||
valueRegex := `^\\s*[0-9]+\\s*$`
|
||||
if strings.Contains(operator.Layout, ".") {
|
||||
valueRegex = `^\\s*[0-9]+\\.[0-9]+\\s*$`
|
||||
}
|
||||
|
||||
whereClauseParts = append(whereClauseParts,
|
||||
exprToOttl(fmt.Sprintf(
|
||||
`string(%s) matches "%s"`, operator.ParseFrom, valueRegex,
|
||||
)),
|
||||
)
|
||||
|
||||
timeValue := fmt.Sprintf("Double(%s)", logTransformPathToOttlPath(operator.ParseFrom))
|
||||
if strings.HasPrefix(operator.Layout, "seconds") {
|
||||
timeValue = fmt.Sprintf("%s * 1000000000", timeValue)
|
||||
} else if operator.Layout == "milliseconds" {
|
||||
timeValue = fmt.Sprintf("%s * 1000000", timeValue)
|
||||
} else if operator.Layout == "microseconds" {
|
||||
timeValue = fmt.Sprintf("%s * 1000", timeValue)
|
||||
}
|
||||
stmts = append(stmts, ottlStatement{
|
||||
editor: fmt.Sprintf(`set(time_unix_nano, %s)`, timeValue),
|
||||
conditions: whereClauseParts,
|
||||
})
|
||||
} else {
|
||||
return nil, fmt.Errorf("unsupported time layout %s", operator.LayoutType)
|
||||
}
|
||||
|
||||
return stmts, nil
|
||||
}
|
||||
|
||||
func ottlStatementsForSeverityParser(operator PipelineOperator) (
|
||||
[]ottlStatement, error,
|
||||
) {
|
||||
stmts := []ottlStatement{}
|
||||
|
||||
parseFromNotNilCheck, err := fieldNotNilCheck(operator.ParseFrom)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf(
|
||||
"couldn't generate nil check for parseFrom of severity parser %s: %w", operator.Name, err,
|
||||
)
|
||||
}
|
||||
|
||||
for severity, valuesToMap := range operator.SeverityMapping {
|
||||
for _, value := range valuesToMap {
|
||||
// Special case for 2xx 3xx 4xx and 5xx
|
||||
isSpecialValue, err := regexp.MatchString(`^\s*[2|3|4|5]xx\s*$`, strings.ToLower(value))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("couldn't regex match for wildcard severity values: %w", err)
|
||||
}
|
||||
if isSpecialValue {
|
||||
whereClause := strings.Join([]string{
|
||||
exprToOttl(parseFromNotNilCheck),
|
||||
exprToOttl(fmt.Sprintf(`type(%s) in ["int", "float"] && %s == float(int(%s))`, operator.ParseFrom, operator.ParseFrom, operator.ParseFrom)),
|
||||
exprToOttl(fmt.Sprintf(`string(int(%s)) matches "^%s$"`, operator.ParseFrom, fmt.Sprintf("%s[0-9]{2}", value[0:1]))),
|
||||
}, " and ")
|
||||
stmts = append(stmts, ottlStatement{
|
||||
editor: fmt.Sprintf("set(severity_number, SEVERITY_NUMBER_%s)", strings.ToUpper(severity)),
|
||||
conditions: []string{whereClause},
|
||||
})
|
||||
stmts = append(stmts, ottlStatement{
|
||||
editor: fmt.Sprintf(`set(severity_text, "%s")`, strings.ToUpper(severity)),
|
||||
conditions: []string{whereClause},
|
||||
})
|
||||
} else {
|
||||
whereClause := strings.Join([]string{
|
||||
exprToOttl(parseFromNotNilCheck),
|
||||
fmt.Sprintf(
|
||||
`IsString(%s)`,
|
||||
logTransformPathToOttlPath(operator.ParseFrom),
|
||||
),
|
||||
fmt.Sprintf(
|
||||
`IsMatch(%s, "^\\s*%s\\s*$")`,
|
||||
logTransformPathToOttlPath(operator.ParseFrom), value,
|
||||
),
|
||||
}, " and ")
|
||||
|
||||
stmts = append(stmts, ottlStatement{
|
||||
editor: fmt.Sprintf("set(severity_number, SEVERITY_NUMBER_%s)", strings.ToUpper(severity)),
|
||||
conditions: []string{whereClause},
|
||||
})
|
||||
stmts = append(stmts, ottlStatement{
|
||||
editor: fmt.Sprintf(`set(severity_text, "%s")`, strings.ToUpper(severity)),
|
||||
conditions: []string{whereClause},
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return stmts, nil
|
||||
}
|
||||
|
||||
func ottlStatementsForTraceParser(operator PipelineOperator) (
|
||||
[]ottlStatement, error,
|
||||
) {
|
||||
stmts := []ottlStatement{}
|
||||
|
||||
if operator.TraceId != nil && len(operator.TraceId.ParseFrom) > 0 {
|
||||
parseFromNotNilCheck, err := fieldNotNilCheck(operator.TraceId.ParseFrom)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf(
|
||||
"couldn't generate nil check for TraceId.parseFrom %s: %w", operator.Name, err,
|
||||
)
|
||||
}
|
||||
// TODO(Raj): Also check for trace id regex pattern
|
||||
stmts = append(stmts, ottlStatement{
|
||||
editor: fmt.Sprintf(`set(trace_id.string, %s)`, logTransformPathToOttlPath(operator.TraceId.ParseFrom)),
|
||||
conditions: []string{exprToOttl(parseFromNotNilCheck)},
|
||||
})
|
||||
}
|
||||
|
||||
if operator.SpanId != nil && len(operator.SpanId.ParseFrom) > 0 {
|
||||
parseFromNotNilCheck, err := fieldNotNilCheck(operator.SpanId.ParseFrom)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf(
|
||||
"couldn't generate nil check for TraceId.parseFrom %s: %w", operator.Name, err,
|
||||
)
|
||||
}
|
||||
// TODO(Raj): Also check for span id regex pattern
|
||||
stmts = append(stmts, ottlStatement{
|
||||
editor: fmt.Sprintf("set(span_id.string, %s)", logTransformPathToOttlPath(operator.SpanId.ParseFrom)),
|
||||
conditions: []string{exprToOttl(parseFromNotNilCheck)},
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
if operator.TraceFlags != nil && len(operator.TraceFlags.ParseFrom) > 0 {
|
||||
|
||||
parseFromNotNilCheck, err := fieldNotNilCheck(operator.TraceFlags.ParseFrom)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf(
|
||||
"couldn't generate nil check for TraceId.parseFrom %s: %w", operator.Name, err,
|
||||
)
|
||||
}
|
||||
// TODO(Raj): Also check for trace flags hex regex pattern
|
||||
stmts = append(stmts, ottlStatement{
|
||||
editor: fmt.Sprintf(`set(flags, HexToInt(%s))`, logTransformPathToOttlPath(operator.TraceFlags.ParseFrom)),
|
||||
conditions: []string{exprToOttl(parseFromNotNilCheck)},
|
||||
})
|
||||
}
|
||||
return stmts, nil
|
||||
}
|
||||
|
||||
// TODO(Raj): This should be used in regex and grok parser too?
|
||||
func ottlStatementsForExtractingMapValue(
|
||||
mapGenerator string,
|
||||
target string,
|
||||
) []ottlStatement {
|
||||
stmts := []ottlStatement{}
|
||||
|
||||
cacheKey := uuid.NewString()
|
||||
|
||||
// Extract parsed map to cache.
|
||||
stmts = append(stmts, ottlStatement{
|
||||
editor: fmt.Sprintf(
|
||||
`set(cache["%s"], %s)`, cacheKey, mapGenerator,
|
||||
),
|
||||
conditions: []string{},
|
||||
})
|
||||
|
||||
// Set target to a map if not already one.
|
||||
stmts = append(stmts, ottlStatement{
|
||||
editor: fmt.Sprintf(
|
||||
`set(%s, ParseJSON("{}"))`, logTransformPathToOttlPath(target),
|
||||
),
|
||||
conditions: []string{
|
||||
fmt.Sprintf(`cache["%s"] != nil`, cacheKey),
|
||||
fmt.Sprintf("not IsMap(%s)", logTransformPathToOttlPath(target)),
|
||||
},
|
||||
})
|
||||
|
||||
stmts = append(stmts, ottlStatement{
|
||||
editor: fmt.Sprintf(
|
||||
`merge_maps(%s, cache["%s"], "upsert")`,
|
||||
logTransformPathToOttlPath(target), cacheKey,
|
||||
),
|
||||
conditions: []string{fmt.Sprintf(`cache["%s"] != nil`, cacheKey)},
|
||||
})
|
||||
|
||||
return stmts
|
||||
}
|
||||
|
||||
func ottlStatementForDeletingField(fieldPath string) (*ottlStatement, error) {
|
||||
ottlPath := logTransformPathToOttlPath(fieldPath)
|
||||
fieldPathParts := rSplitAfterN(ottlPath, "[", 2)
|
||||
target := fieldPathParts[0]
|
||||
key := fieldPathParts[1][1 : len(fieldPathParts[1])-1]
|
||||
|
||||
pathNotNilCheck, err := fieldNotNilCheck(fieldPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("couldn't generate nil check for path %s: %w", fieldPath, err)
|
||||
}
|
||||
|
||||
return &ottlStatement{
|
||||
editor: fmt.Sprintf(`delete_key(%s, %s)`, target, key),
|
||||
conditions: []string{exprToOttl(pathNotNilCheck)},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// a.b?.c -> ["a", "b", "c"]
|
||||
// a.b["c.d"].e -> ["a", "b", "c.d", "e"]
|
||||
func pathParts(path string) []string {
|
||||
path = strings.ReplaceAll(path, "?.", ".")
|
||||
|
||||
// Split once from the right to include the rightmost membership op and everything after it.
|
||||
// Eg: `attributes.test["a.b"].value["c.d"].e` would result in `attributes.test["a.b"].value` and `["c.d"].e`
|
||||
memberOpParts := rSplitAfterN(path, "[", 2)
|
||||
|
||||
if len(memberOpParts) < 2 {
|
||||
// there is no [] access in fieldPath
|
||||
return strings.Split(path, ".")
|
||||
}
|
||||
|
||||
// recursively get parts for path prefix before rightmost membership op (`attributes.test["a.b"].value`)
|
||||
parts := pathParts(memberOpParts[0])
|
||||
|
||||
suffixParts := strings.SplitAfter(memberOpParts[1], "]") // ["c.d"].e -> `["c.d"]`, `.e`
|
||||
|
||||
// add key used in membership op ("c.d")
|
||||
parts = append(parts, suffixParts[0][2:len(suffixParts[0])-2])
|
||||
|
||||
// add parts for path after the membership op ("e")
|
||||
if len(suffixParts[1]) > 0 {
|
||||
parts = append(parts, strings.Split(suffixParts[1][1:], ".")...)
|
||||
}
|
||||
|
||||
return parts
|
||||
}
|
||||
|
||||
// converts a logtransform path to an equivalent ottl path
|
||||
// For details, see https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl/contexts/ottllog#paths
|
||||
func logTransformPathToOttlPath(path string) string {
|
||||
if !(strings.HasPrefix(path, "attributes") || strings.HasPrefix(path, "resource")) {
|
||||
return path
|
||||
}
|
||||
|
||||
parts := pathParts(path)
|
||||
|
||||
ottlPathParts := []string{parts[0]}
|
||||
|
||||
if ottlPathParts[0] == "resource" {
|
||||
ottlPathParts[0] = "resource.attributes"
|
||||
}
|
||||
|
||||
for _, p := range parts[1:] {
|
||||
ottlPathParts = append(ottlPathParts, fmt.Sprintf(`["%s"]`, p))
|
||||
}
|
||||
|
||||
return strings.Join(ottlPathParts, "")
|
||||
}
|
||||
|
||||
func escapeDoubleQuotesForOttl(str string) string {
|
||||
return strings.ReplaceAll(
|
||||
strings.ReplaceAll(str, `\`, `\\`), `"`, `\"`,
|
||||
)
|
||||
}
|
||||
|
||||
func exprToOttl(expr string) string {
|
||||
return fmt.Sprintf(`EXPR("%s")`, escapeDoubleQuotesForOttl(expr))
|
||||
}
|
||||
|
||||
func ottlStatementsWithPrependedConditions(
|
||||
statements []ottlStatement,
|
||||
conditionsToPrepend ...string,
|
||||
) []ottlStatement {
|
||||
stmts := []ottlStatement{}
|
||||
for _, s := range statements {
|
||||
stmts = append(stmts, ottlStatement{
|
||||
editor: s.editor,
|
||||
conditions: append(
|
||||
conditionsToPrepend, s.conditions...,
|
||||
),
|
||||
})
|
||||
}
|
||||
return stmts
|
||||
}
|
||||
@@ -199,9 +199,10 @@ var prepareProcessorTestData = []struct {
|
||||
func TestPreparePipelineProcessor(t *testing.T) {
|
||||
for _, test := range prepareProcessorTestData {
|
||||
Convey(test.Name, t, func() {
|
||||
res, err := getOperators(test.Operators)
|
||||
So(err, ShouldBeNil)
|
||||
So(res, ShouldResemble, test.Output)
|
||||
require.NotNil(t, nil, "TODO(Raj): maybe use these tests in new config generation")
|
||||
// res, err := getOperators(test.Operators)
|
||||
// So(err, ShouldBeNil)
|
||||
// So(res, ShouldResemble, test.Output)
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -656,14 +657,24 @@ func TestMembershipOpInProcessorFieldExpressions(t *testing.T) {
|
||||
Name: "move",
|
||||
From: `attributes["http.method"]`,
|
||||
To: `attributes["test.http.method"]`,
|
||||
}, {
|
||||
},
|
||||
{
|
||||
ID: "json",
|
||||
Type: "json_parser",
|
||||
Enabled: true,
|
||||
Name: "json",
|
||||
ParseFrom: `attributes["order.products"]`,
|
||||
ParseTo: `attributes["some"].missing.target`,
|
||||
},
|
||||
{
|
||||
ID: "json",
|
||||
Type: "json_parser",
|
||||
Enabled: true,
|
||||
Name: "json",
|
||||
ParseFrom: `attributes["order.products"]`,
|
||||
ParseTo: `attributes["order.products"]`,
|
||||
}, {
|
||||
},
|
||||
{
|
||||
ID: "move1",
|
||||
Type: "move",
|
||||
Enabled: true,
|
||||
@@ -678,6 +689,14 @@ func TestMembershipOpInProcessorFieldExpressions(t *testing.T) {
|
||||
From: `attributes.test?.doesnt_exist`,
|
||||
To: `attributes["test.doesnt_exist"]`,
|
||||
}, {
|
||||
ID: "addToMissingField",
|
||||
Type: "add",
|
||||
Enabled: true,
|
||||
Name: "add",
|
||||
Field: `attributes["another"].new.missing_field`,
|
||||
Value: `2`,
|
||||
},
|
||||
{
|
||||
ID: "add",
|
||||
Type: "add",
|
||||
Enabled: true,
|
||||
@@ -722,6 +741,7 @@ func TestMembershipOpInProcessorFieldExpressions(t *testing.T) {
|
||||
require.False(methodAttrExists)
|
||||
require.Equal("GET", result[0].Attributes_string["test.http.method"])
|
||||
require.Equal("pid0", result[0].Attributes_string["order.pids.pid0"])
|
||||
require.Equal(`{"new":{"missing_field":"2"}}`, result[0].Attributes_string["another"])
|
||||
}
|
||||
|
||||
func TestContainsFilterIsCaseInsensitive(t *testing.T) {
|
||||
@@ -803,3 +823,69 @@ func TestContainsFilterIsCaseInsensitive(t *testing.T) {
|
||||
_, test2Exists := result[0].Attributes_string["test2"]
|
||||
require.False(test2Exists)
|
||||
}
|
||||
|
||||
func TestAllOpsAppliedEvenIfFirstOpAltersAttribsReferencedInPipelineFilter(t *testing.T) {
|
||||
// If the first op in a pipeline alters the log record in such a way that
|
||||
// it would not match the pipeline filter anymore, the rest of the ops
|
||||
// should still be applied to the log record
|
||||
|
||||
require := require.New(t)
|
||||
|
||||
testLogs := []model.SignozLog{
|
||||
makeTestSignozLog("test Ecom Log", map[string]interface{}{
|
||||
"http.method": "GET",
|
||||
}),
|
||||
}
|
||||
|
||||
testPipelines := []Pipeline{{
|
||||
OrderId: 1,
|
||||
Name: "pipeline1",
|
||||
Alias: "pipeline1",
|
||||
Enabled: true,
|
||||
Filter: &v3.FilterSet{
|
||||
Operator: "AND",
|
||||
Items: []v3.FilterItem{
|
||||
{
|
||||
Key: v3.AttributeKey{
|
||||
Key: "http.method",
|
||||
DataType: v3.AttributeKeyDataTypeString,
|
||||
Type: v3.AttributeKeyTypeTag,
|
||||
},
|
||||
Operator: "=",
|
||||
Value: "GET",
|
||||
},
|
||||
},
|
||||
},
|
||||
Config: []PipelineOperator{
|
||||
{
|
||||
ID: "move",
|
||||
Type: "move",
|
||||
Enabled: true,
|
||||
Name: "move",
|
||||
From: `attributes["http.method"]`,
|
||||
To: `attributes["test.http.method"]`,
|
||||
},
|
||||
{
|
||||
ID: "add",
|
||||
Type: "add",
|
||||
Enabled: true,
|
||||
Name: "add",
|
||||
Field: "attributes.test2",
|
||||
Value: "test2",
|
||||
},
|
||||
},
|
||||
}}
|
||||
|
||||
result, collectorWarnAndErrorLogs, err := SimulatePipelinesProcessing(
|
||||
context.Background(), testPipelines, testLogs,
|
||||
)
|
||||
require.Nil(err)
|
||||
require.Equal(0, len(collectorWarnAndErrorLogs), strings.Join(collectorWarnAndErrorLogs, "\n"))
|
||||
require.Equal(1, len(result))
|
||||
|
||||
_, filterAttribExists := result[0].Attributes_string["http.method"]
|
||||
require.False(filterAttribExists)
|
||||
|
||||
require.Equal(result[0].Attributes_string["test.http.method"], "GET")
|
||||
require.Equal(result[0].Attributes_string["test2"], "test2")
|
||||
}
|
||||
|
||||
@@ -7,7 +7,7 @@ import (
|
||||
"time"
|
||||
|
||||
_ "github.com/SigNoz/signoz-otel-collector/pkg/parser/grok"
|
||||
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/logstransformprocessor"
|
||||
"github.com/SigNoz/signoz-otel-collector/processor/signoztransformprocessor"
|
||||
"github.com/pkg/errors"
|
||||
"go.opentelemetry.io/collector/pdata/pcommon"
|
||||
"go.opentelemetry.io/collector/pdata/plog"
|
||||
@@ -42,7 +42,7 @@ func SimulatePipelinesProcessing(
|
||||
simulatorInputPLogs := SignozLogsToPLogs(logs)
|
||||
|
||||
processorFactories, err := processor.MakeFactoryMap(
|
||||
logstransformprocessor.NewFactory(),
|
||||
signoztransformprocessor.NewFactory(),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, nil, model.InternalError(errors.Wrap(
|
||||
|
||||
@@ -292,9 +292,10 @@ func TestTraceParsingProcessor(t *testing.T) {
|
||||
testLog,
|
||||
},
|
||||
)
|
||||
|
||||
require.Nil(err)
|
||||
require.Equal(1, len(result))
|
||||
require.Equal(0, len(collectorWarnAndErrorLogs))
|
||||
require.Equal(0, len(collectorWarnAndErrorLogs), strings.Join(collectorWarnAndErrorLogs, "\n"))
|
||||
processed := result[0]
|
||||
|
||||
require.Equal(testTraceId, processed.TraceID)
|
||||
@@ -346,20 +347,38 @@ func TestAddProcessor(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
var parserOp PipelineOperator
|
||||
err := json.Unmarshal([]byte(`
|
||||
{
|
||||
for _, procJson := range []string{
|
||||
`{
|
||||
"orderId": 1,
|
||||
"enabled": true,
|
||||
"type": "add",
|
||||
"name": "Test add parser",
|
||||
"id": "test-add-parser",
|
||||
"field": "resource.test",
|
||||
"value": "1"
|
||||
}`, `{
|
||||
"orderId": 2,
|
||||
"enabled": true,
|
||||
"type": "add",
|
||||
"name": "Test enabled add resource parser",
|
||||
"id": "test-add-parser",
|
||||
"field": "attributes.test",
|
||||
"value": "EXPR(int(resource.test) + 1)"
|
||||
}`, `{
|
||||
"orderId": 3,
|
||||
"enabled": false,
|
||||
"type": "add",
|
||||
"name": "Test disabled add parser",
|
||||
"id": "test-add-parser",
|
||||
"field": "attributes.testMissing",
|
||||
"value": "test"
|
||||
}
|
||||
`), &parserOp)
|
||||
require.Nil(err)
|
||||
testPipelines[0].Config = append(testPipelines[0].Config, parserOp)
|
||||
}`,
|
||||
} {
|
||||
var parserOp PipelineOperator
|
||||
err := json.Unmarshal([]byte(procJson), &parserOp)
|
||||
require.Nil(err)
|
||||
testPipelines[0].Config = append(testPipelines[0].Config, parserOp)
|
||||
}
|
||||
|
||||
testLog := makeTestSignozLog(
|
||||
"test log",
|
||||
@@ -379,7 +398,9 @@ func TestAddProcessor(t *testing.T) {
|
||||
require.Equal(1, len(result))
|
||||
require.Equal(0, len(collectorWarnAndErrorLogs), strings.Join(collectorWarnAndErrorLogs, "\n"))
|
||||
processed := result[0]
|
||||
require.Equal("test", processed.Attributes_string["test"])
|
||||
require.Equal("1", processed.Resources_string["test"])
|
||||
require.Equal(int64(2), processed.Attributes_int64["test"])
|
||||
require.Equal("", processed.Attributes_string["testMissing"])
|
||||
}
|
||||
|
||||
func TestRemoveProcessor(t *testing.T) {
|
||||
|
||||
@@ -305,7 +305,7 @@ var ReservedColumnTargetAliases = map[string]struct{}{
|
||||
}
|
||||
|
||||
// logsPPLPfx is a short constant for logsPipelinePrefix
|
||||
const LogsPPLPfx = "logstransform/pipeline_"
|
||||
const LogsPPLPfx = "signoztransform/logs-pipeline"
|
||||
|
||||
const IntegrationPipelineIdPrefix = "integration"
|
||||
|
||||
|
||||
@@ -27,10 +27,8 @@ import (
|
||||
"go.signoz.io/signoz/pkg/query-service/dao"
|
||||
"go.signoz.io/signoz/pkg/query-service/model"
|
||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||
"go.signoz.io/signoz/pkg/query-service/queryBuilderToExpr"
|
||||
"go.signoz.io/signoz/pkg/query-service/utils"
|
||||
"golang.org/x/exp/maps"
|
||||
"golang.org/x/exp/slices"
|
||||
)
|
||||
|
||||
func TestLogPipelinesLifecycle(t *testing.T) {
|
||||
@@ -628,10 +626,14 @@ func assertPipelinesRecommendedInRemoteConfig(
|
||||
t.Fatalf("could not unmarshal config file sent to opamp client: %v", err)
|
||||
}
|
||||
|
||||
// Each pipeline is expected to become its own processor
|
||||
// in the logs service in otel collector config.
|
||||
// Validate expected collector config processors for log pipelines
|
||||
// are present in config recommended to opamp client
|
||||
|
||||
expectedProcessors, expectedProcNames, err := logparsingpipeline.PreparePipelineProcessor(pipelines)
|
||||
|
||||
collectorConfSvcs := collectorConfSentToClient["service"].(map[string]interface{})
|
||||
collectorConfLogsSvc := collectorConfSvcs["pipelines"].(map[string]interface{})["logs"].(map[string]interface{})
|
||||
|
||||
collectorConfLogsSvcProcessorNames := collectorConfLogsSvc["processors"].([]interface{})
|
||||
collectorConfLogsPipelineProcNames := []string{}
|
||||
for _, procNameVal := range collectorConfLogsSvcProcessorNames {
|
||||
@@ -644,43 +646,28 @@ func assertPipelinesRecommendedInRemoteConfig(
|
||||
}
|
||||
}
|
||||
|
||||
_, expectedLogProcessorNames, err := logparsingpipeline.PreparePipelineProcessor(pipelines)
|
||||
_, expectedProcNames, err = logparsingpipeline.PreparePipelineProcessor(pipelines)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(
|
||||
t, expectedLogProcessorNames, collectorConfLogsPipelineProcNames,
|
||||
t, expectedProcNames, collectorConfLogsPipelineProcNames,
|
||||
"config sent to opamp client doesn't contain expected log pipelines",
|
||||
)
|
||||
|
||||
collectorConfProcessors := collectorConfSentToClient["processors"].(map[string]interface{})
|
||||
for _, procName := range expectedLogProcessorNames {
|
||||
for _, procName := range expectedProcNames {
|
||||
pipelineProcessorInConf, procExists := collectorConfProcessors[procName]
|
||||
require.True(t, procExists, fmt.Sprintf(
|
||||
"%s processor not found in config sent to opamp client", procName,
|
||||
))
|
||||
|
||||
// Validate that filter expr in collector conf is as expected.
|
||||
|
||||
// extract expr present in collector conf processor
|
||||
pipelineProcOps := pipelineProcessorInConf.(map[string]interface{})["operators"].([]interface{})
|
||||
|
||||
routerOpIdx := slices.IndexFunc(
|
||||
pipelineProcOps,
|
||||
func(op interface{}) bool { return op.(map[string]interface{})["id"] == "router_signoz" },
|
||||
)
|
||||
require.GreaterOrEqual(t, routerOpIdx, 0)
|
||||
routerOproutes := pipelineProcOps[routerOpIdx].(map[string]interface{})["routes"].([]interface{})
|
||||
pipelineFilterExpr := routerOproutes[0].(map[string]interface{})["expr"].(string)
|
||||
|
||||
// find logparsingpipeline.Pipeline whose processor is being validated here
|
||||
pipelineIdx := slices.IndexFunc(
|
||||
pipelines, func(p logparsingpipeline.Pipeline) bool {
|
||||
return logparsingpipeline.CollectorConfProcessorName(p) == procName
|
||||
},
|
||||
)
|
||||
require.GreaterOrEqual(t, pipelineIdx, 0)
|
||||
expectedExpr, err := queryBuilderToExpr.Parse(pipelines[pipelineIdx].Filter)
|
||||
procInConfYaml, err := yaml.Parser().Marshal(pipelineProcessorInConf.(map[string]interface{}))
|
||||
require.Nil(t, err)
|
||||
require.Equal(t, expectedExpr, pipelineFilterExpr)
|
||||
|
||||
expectedProcYaml, err := yaml.Parser().Marshal(expectedProcessors[procName].(map[string]interface{}))
|
||||
require.Nil(t, err)
|
||||
|
||||
require.Equal(t, procInConfYaml, expectedProcYaml)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user