Compare commits

...

28 Commits

Author  SHA1  Message  Date
Raj  db50b3f952  chore: some more cleanup  2024-07-14 20:01:44 +05:30
Raj  cf507dd88e  chore: some more cleanup  2024-07-14 19:35:16 +05:30
Raj  6db30764d2  chore: some more cleanup  2024-07-14 12:59:56 +05:30
Raj  d47df57339  chore: some more cleanup  2024-07-14 12:05:31 +05:30
Raj  23741983e6  chore: some cleanup  2024-07-12 16:00:49 +05:30
Raj  d654099311  chore: move new pipeline collector config generation logic to its own file  2024-07-11 20:46:38 +05:30
Raj  d59edde634  chore: add dependency for signoztransformprocessor  2024-07-11 19:17:04 +05:30
Raj  136c23bd63  chore: add explicit test validating pipeline processing when first op affects filter expr result  2024-07-11 19:15:10 +05:30
Raj  453b8d56fc  chore: some more cleanup and update pipelines e2e/integration test  2024-07-11 19:13:43 +05:30
Raj  0678bd59e1  chore: some more cleanup  2024-07-11 19:11:59 +05:30
Raj  426c7fd075  chore: some cleanup  2024-07-11 19:11:57 +05:30
Raj  162239bec5  chore: rename Hex2Int -> HexToInt  2024-07-11 19:08:53 +05:30
Raj  7648c7084a  chore: transformprocessor -> signoztransformprocessor  2024-07-11 19:08:53 +05:30
Raj  c79e90231f  chore: stash latest state of changes  2024-07-11 19:08:53 +05:30
Raj  a2a97e9d5c  chore: get all tests passing  2024-07-11 19:08:53 +05:30
Raj  c5e1172863  chore: stash current state of things  2024-07-11 19:08:53 +05:30
Raj  cb97e05c24  chore: sev parser translations to ottl  2024-07-11 19:08:53 +05:30
Raj  c43a77621a  chore: get sev parser started  2024-07-11 19:08:53 +05:30
Raj  c2b8ff0c67  feat: add ottl translation for grok processors in pipelines  2024-07-11 19:08:53 +05:30
Raj  8494be2fe8  chore: add translation for time parser  2024-07-11 19:08:53 +05:30
Raj  c874e731cc  chore: add support for JSON parser  2024-07-11 19:08:53 +05:30
Raj  9c4e9db665  chore: translate regex_parser to ottl  2024-07-11 19:08:53 +05:30
Raj  0f48f90e8d  chore: get resource filters working too  2024-07-11 19:08:53 +05:30
Raj  bbdeeca2e0  chore: update add processor happy test case to check for setting resource attribs and EXPR values  2024-07-11 19:08:53 +05:30
Raj  7c79f57dee  chore: add ottl translation for remove processor  2024-07-11 19:08:53 +05:30
Raj  d1857177d1  chore: make add processor test more nuanced and get it passed  2024-07-11 19:08:53 +05:30
Raj  1909f25931  chore: working experiment for add operator  2024-07-11 19:08:50 +05:30
Raj  1b88fa6549  chore: add transformprocessor factory to logs preview simulator  2024-07-11 19:04:21 +05:30
9 changed files with 811 additions and 47 deletions

go.mod (+4)

@@ -84,6 +84,7 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/internal v1.8.0 // indirect
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 // indirect
github.com/ClickHouse/ch-go v0.61.3 // indirect
github.com/alecthomas/participle/v2 v2.1.1 // indirect
github.com/alecthomas/units v0.0.0-20231202071711-9a357b53e9c9 // indirect
github.com/andybalholm/brotli v1.1.0 // indirect
github.com/aws/aws-sdk-go v1.53.16 // indirect
@@ -108,6 +109,7 @@ require (
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1 // indirect
github.com/gobwas/glob v0.2.3 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt/v5 v5.2.1 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
@@ -118,6 +120,7 @@ require (
github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect
github.com/hashicorp/go-version v1.7.0 // indirect
github.com/iancoleman/strcase v0.3.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/jonboulle/clockwork v0.2.2 // indirect
@@ -142,6 +145,7 @@ require (
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
github.com/oklog/run v1.1.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.102.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.102.0 // indirect
github.com/paulmach/orb v0.11.1 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect

go.sum (+8)

@@ -72,6 +72,8 @@ github.com/SigNoz/zap_otlp/zap_otlp_encoder v0.0.0-20230822164844-1b861a431974 h
github.com/SigNoz/zap_otlp/zap_otlp_encoder v0.0.0-20230822164844-1b861a431974/go.mod h1:fpiHtiboLJpIE5TtkQfiWx6xtnlA+uWmv+N9opETqKY=
github.com/SigNoz/zap_otlp/zap_otlp_sync v0.0.0-20230822164844-1b861a431974 h1:G2JzCrqdeOTtAn4tDFZEg5gCAEYVRXcddG3ZlrFMumo=
github.com/SigNoz/zap_otlp/zap_otlp_sync v0.0.0-20230822164844-1b861a431974/go.mod h1:YtDal1xBRQfPRNo7iSU3W37RGT0jMW7Rnzk6EON3a4M=
github.com/alecthomas/participle/v2 v2.1.1 h1:hrjKESvSqGHzRb4yW1ciisFJ4p3MGYih6icjJvbsmV8=
github.com/alecthomas/participle/v2 v2.1.1/go.mod h1:Y1+hAs8DHPmc3YUFzqllV+eSQ9ljPTk0ZkPMtEdAx2c=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
@@ -244,6 +246,8 @@ github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1 h1:TQcrn6Wq+sKGkpyPvppOz99zsM
github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM=
github.com/go-zookeeper/zk v1.0.3 h1:7M2kwOsc//9VeeFiPtf+uSJlVpU66x9Ba5+8XK7/TDg=
github.com/go-zookeeper/zk v1.0.3/go.mod h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw=
github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y=
github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
@@ -414,6 +418,8 @@ github.com/hetznercloud/hcloud-go/v2 v2.9.0/go.mod h1:qtW/TuU7Bs16ibXl/ktJarWqU2
github.com/hjson/hjson-go/v4 v4.0.0 h1:wlm6IYYqHjOdXH1gHev4VoXCaW20HdQAGCxdOEEg2cs=
github.com/hjson/hjson-go/v4 v4.0.0/go.mod h1:KaYt3bTw3zhBjYqnXkYywcYctk0A2nxeEFTse3rH13E=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/iancoleman/strcase v0.3.0 h1:nTXanmYxhfFAMjZL34Ov6gkzEsSJZ5DbhxWjvSASxEI=
github.com/iancoleman/strcase v0.3.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4=
@@ -583,6 +589,8 @@ github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.102
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.102.0/go.mod h1:cBbjwd8m4rBVgCQksUbAVQX1EoM5IuCyNQw2mzvibEM=
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.102.0 h1:qsM5HhWpAfIMg8LdO4u+CHofu4UuCuJwg/M+ySO9uZA=
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.102.0/go.mod h1:wBJlGy9Wx6s7AxIMcSne2sGw73e5ZUy1AQ/duYwpFf8=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.102.0 h1:EPmEtTgrlNzriEYZpkVOVDWlqWTUHoEqmM8oU/EpdkA=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.102.0/go.mod h1:qnLc/+jOVcsL1dF17ztBcf3juQ3f9bt6Wuf+Xxbrd9w=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.102.0 h1:vJL6lDaeI3pVA7ADnWKD3HMpI80BSrZ2UnGc+qkwqoY=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.102.0/go.mod h1:xtE7tds5j8PtI/wMuGb+Em5K9rJH8hm6t28Qe4QrpoU=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.102.0 h1:TvJYcU/DLRFCgHr7nT98k5D+qkZ4syKVxc8OJjv+K4c=

(file name not shown)

@@ -21,7 +21,9 @@ func CollectorConfProcessorName(p Pipeline) string {
return constants.LogsPPLPfx + p.Alias
}
-func PreparePipelineProcessor(pipelines []Pipeline) (map[string]interface{}, []string, error) {
+// Old proc generation logic
+func PreparePipelineProcessorOld(pipelines []Pipeline) (map[string]interface{}, []string, error) {
processors := map[string]interface{}{}
names := []string{}
for pipelineIdx, v := range pipelines {
@@ -230,6 +232,8 @@ func cleanTraceParser(operator *PipelineOperator) {
}
}
// End of old stuff
// Generates an expression checking that `fieldPath` has a non-nil value in a log record.
func fieldNotNilCheck(fieldPath string) (string, error) {
_, err := expr.Compile(fieldPath)

(file name not shown)

@@ -0,0 +1,654 @@
// Generate collector config for log pipelines
// using ottl targeting signoztransform processor.
package logparsingpipeline
import (
"fmt"
"regexp"
"strings"
"github.com/google/uuid"
"go.signoz.io/signoz/pkg/query-service/queryBuilderToExpr"
)
func PreparePipelineProcessor(pipelines []Pipeline) (
map[string]interface{}, []string, error,
) {
processors := map[string]interface{}{}
names := []string{}
ottlStatements := []string{}
for _, pipeline := range pipelines {
if pipeline.Enabled {
pipelineOttlStatements, err := ottlStatementsForPipeline(pipeline)
if err != nil {
return nil, nil, fmt.Errorf("couldn't generate ottl statements for pipeline %s: %w", pipeline.Alias, err)
}
ottlStatements = append(ottlStatements, pipelineOttlStatements...)
}
}
// TODO(Raj): Maybe validate ottl statements
if len(ottlStatements) > 0 {
pipelinesProcessorName := "signoztransform/logs-pipelines"
names = append(names, pipelinesProcessorName)
processors[pipelinesProcessorName] = map[string]interface{}{
"error_mode": "ignore",
"log_statements": []map[string]interface{}{
{
"context": "log",
"statements": ottlStatements,
},
},
}
}
// TODO(Raj): with error_mode: ignore, errors in ottl statements get logged and processing
// continues on to the next statement.
// So operators that get translated to multiple ottl statements must behave atomically
// as much as possible - if a statement for the op fails, there is no point running the
// statements that follow.
return processors, names, nil
}
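// For a single enabled pipeline, the processor entry assembled above renders
// to collector config shaped roughly like this (alias, id and statements are
// hypothetical):
//
//   signoztransform/logs-pipelines:
//     error_mode: ignore
//     log_statements:
//       - context: log
//         statements:
//           - set(attributes["__matched-log-pipeline__"], "pipeline1-<id>") where <filter>
//           - <operator statements gated on the marker>
//           - delete_key(attributes, "__matched-log-pipeline__") where <marker check>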
func ottlStatementsForPipeline(pipeline Pipeline) ([]string, error) {
enabledOperators := []PipelineOperator{}
for _, op := range pipeline.Config {
if op.Enabled {
enabledOperators = append(enabledOperators, op)
}
}
if len(enabledOperators) < 1 {
return []string{}, nil
}
// We generate one or more ottl statements per pipeline operator.
// Each ottl statement carries its own where condition, so the simplest
// approach is to add a where clause for the pipeline filter to each statement.
// However, this breaks if an early operator statement in the pipeline ends up
// modifying the fields referenced in the pipeline filter.
// To work around this, we add statements before and after the actual pipeline
// operator statements that add and remove a pipeline-specific marker, ensuring
// all operators in a pipeline get to act on the log even if an op changes the fields referenced in the filter.
pipelineMarker := fmt.Sprintf(
"%s-%s", pipeline.Alias, pipeline.Id, // pipeline.Id is guaranteed to be unique by DB.
)
addPipelineMarkerOttlStmt := fmt.Sprintf(
`set(attributes["__matched-log-pipeline__"], "%s")`, pipelineMarker,
)
filterExpr, err := queryBuilderToExpr.Parse(pipeline.Filter)
if err != nil {
return nil, fmt.Errorf("failed to parse pipeline filter: %w", err)
}
if len(filterExpr) > 0 {
// TODO(Raj): Update qb2Expr logic to work directly with ottl
filterOttl := exprToOttl(filterExpr)
addPipelineMarkerOttlStmt += fmt.Sprintf(" where %s", filterOttl)
}
pipelineOttlStatements := []string{addPipelineMarkerOttlStmt}
// Add ottl statements for implementing enabled pipeline operators
logMatchesPipeline := fmt.Sprintf(
`attributes["__matched-log-pipeline__"] == "%s"`, pipelineMarker,
)
for _, operator := range enabledOperators {
stmts, err := ottlStatementsForPipelineOperator(operator)
if err != nil {
return nil, fmt.Errorf(
"couldn't generate ottl for %s operator: %w", operator.Type, err,
)
}
for _, s := range stmts {
// prepend pipeline marker check to operator ottl statement conditions
s.conditions = append([]string{logMatchesPipeline}, s.conditions...)
pipelineOttlStatements = append(pipelineOttlStatements, s.toString())
}
}
// Add a final ottl statement for the pipeline for removing pipeline marker
removePipelineMarkerFromMatchingLogs := fmt.Sprintf(
`delete_key(attributes, "__matched-log-pipeline__") where %s`, logMatchesPipeline,
)
pipelineOttlStatements = append(
pipelineOttlStatements, removePipelineMarkerFromMatchingLogs,
)
return pipelineOttlStatements, nil
}
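// As a sketch, a pipeline with alias "pipeline1", a hypothetical id "id1", a
// filter that parses to the expr `attributes.method == "GET"`, and a single
// remove operator on attributes.tmp would produce statements like these
// (assuming fieldNotNilCheck yields `attributes.tmp != nil`):
//
//   set(attributes["__matched-log-pipeline__"], "pipeline1-id1") where EXPR("attributes.method == \"GET\"")
//   delete_key(attributes, "tmp") where attributes["__matched-log-pipeline__"] == "pipeline1-id1" and EXPR("attributes.tmp != nil")
//   delete_key(attributes, "__matched-log-pipeline__") where attributes["__matched-log-pipeline__"] == "pipeline1-id1"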
// Operator specific ottl generation helpers follow
// struct for helping put ottl statements together
type ottlStatement struct {
// All ottl statements have exactly 1 "editor" for transforming log
editor string
// editor only gets applied if a log matches the conditions
// `conditions` get joined with `and` when rendering the final ottl statement
conditions []string
}
func (s *ottlStatement) toString() string {
if len(s.conditions) < 1 {
return s.editor
}
conditions := []string{}
for _, c := range s.conditions {
if len(c) > 0 {
conditions = append(conditions, c)
}
}
return fmt.Sprintf(
"%s where %s", s.editor, strings.Join(conditions, " and "),
)
}
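// For example, an ottlStatement with editor `set(attributes["x"], "1")` and
// conditions [cond1, cond2] renders as:
//
//   set(attributes["x"], "1") where cond1 and cond2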
func ottlStatementsForPipelineOperator(operator PipelineOperator) (
[]ottlStatement, error,
) {
switch operator.Type {
case "add":
return ottlStatementsForAddOperator(operator)
case "remove":
return ottlStatementsForRemoveOperator(operator)
case "copy":
return ottlStatementsForCopyOperator(operator)
case "move":
return ottlStatementsForMoveOperator(operator)
case "regex_parser":
return ottlStatementsForRegexParser(operator)
case "grok_parser":
return ottlStatementsForGrokParser(operator)
case "json_parser":
return ottlStatementsForJsonParser(operator)
case "time_parser":
return ottlStatementsForTimeParser(operator)
case "severity_parser":
return ottlStatementsForSeverityParser(operator)
case "trace_parser":
return ottlStatementsForTraceParser(operator)
default:
return nil, fmt.Errorf("unsupported pipeline operator type: %s", operator.Type)
}
}
func ottlStatementsForAddOperator(
operator PipelineOperator,
) ([]ottlStatement, error) {
conditions := []string{}
value := fmt.Sprintf(`"%s"`, operator.Value)
// Handle adding dynamic values specified as golang expr expressions, as allowed by the logstransform add operator.
// See https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/stanza/operator/transformer/add/transformer.go#L49
// Expression values are enclosed in `EXPR(...)`. Note that only uppercase `EXPR(...)` is supported
if strings.HasPrefix(operator.Value, "EXPR(") {
expression := strings.TrimSuffix(strings.TrimPrefix(operator.Value, "EXPR("), ")")
value = exprToOttl(expression)
// Also add non-nil check condition for fields referenced in the value expression
fieldsNotNilCheck, err := fieldsReferencedInExprNotNilCheck(expression)
if err != nil {
return nil, fmt.Errorf(
"couldn't generate nil check for fields referenced in value expr of add operator %s: %w",
operator.Name, err,
)
}
if fieldsNotNilCheck != "" {
conditions = append(conditions, exprToOttl(fieldsNotNilCheck))
}
}
return []ottlStatement{{
editor: fmt.Sprintf(`set(%s, %s)`, logTransformPathToOttlPath(operator.Field), value),
conditions: conditions,
}}, nil
}
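// Example (hypothetical fields; assumes fieldsReferencedInExprNotNilCheck
// yields `body.user != nil` here): Field "attributes.user" with Value
// "EXPR(body.user)" renders as
//
//   set(attributes["user"], EXPR("body.user")) where EXPR("body.user != nil")
//
// while a literal Value "prod" for Field "attributes.env" renders as
//
//   set(attributes["env"], "prod")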
func ottlStatementsForRemoveOperator(operator PipelineOperator) (
[]ottlStatement, error,
) {
stmt, err := ottlStatementForDeletingField(operator.Field)
if err != nil {
return nil, fmt.Errorf("couldn't generate ottl for remove operator: %w", err)
}
return []ottlStatement{*stmt}, nil
}
func ottlStatementsForCopyOperator(operator PipelineOperator) (
[]ottlStatement, error,
) {
return []ottlStatement{{
editor: fmt.Sprintf(
`set(%s, %s)`,
logTransformPathToOttlPath(operator.To),
logTransformPathToOttlPath(operator.From),
),
// TODO(Raj): What if operator.From is nil? Add a test
conditions: []string{},
}}, nil
}
func ottlStatementsForMoveOperator(operator PipelineOperator) (
[]ottlStatement, error,
) {
stmts := []ottlStatement{{
editor: fmt.Sprintf(
`set(%s, %s)`,
logTransformPathToOttlPath(operator.To),
logTransformPathToOttlPath(operator.From),
),
// TODO(Raj): What happens if operator.From is nil here?
conditions: []string{},
}}
deleteStmt, err := ottlStatementForDeletingField(operator.From)
if err != nil {
return nil, fmt.Errorf("couldn't generate delete stmt for move op: %w", err)
}
stmts = append(stmts, *deleteStmt)
return stmts, nil
}
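// Example (hypothetical fields; assumes fieldNotNilCheck yields
// `attributes.src != nil`): From "attributes.src", To "attributes.dst"
// renders as two statements:
//
//   set(attributes["dst"], attributes["src"])
//   delete_key(attributes, "src") where EXPR("attributes.src != nil")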
func ottlStatementsForRegexParser(operator PipelineOperator) (
[]ottlStatement, error,
) {
parseFromNotNilCheck, err := fieldNotNilCheck(operator.ParseFrom)
if err != nil {
return nil, fmt.Errorf(
"couldn't generate nil check for parseFrom of regex op %s: %w", operator.Name, err,
)
}
// TODO(Raj): What happens if ParseTo is not a map? Add a test for this
return []ottlStatement{{
editor: fmt.Sprintf(
`merge_maps(%s, ExtractPatterns(%s, "%s"), "upsert")`,
logTransformPathToOttlPath(operator.ParseTo),
logTransformPathToOttlPath(operator.ParseFrom),
escapeDoubleQuotesForOttl(operator.Regex),
),
conditions: []string{exprToOttl(parseFromNotNilCheck)},
}}, nil
}
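// Example (hypothetical; assumes fieldNotNilCheck yields `body != nil`):
// ParseFrom "body", ParseTo "attributes", Regex `(?P<method>\w+)` renders as
//
//   merge_maps(attributes, ExtractPatterns(body, "(?P<method>\\w+)"), "upsert") where EXPR("body != nil")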
func ottlStatementsForGrokParser(operator PipelineOperator) (
[]ottlStatement, error,
) {
parseFromNotNilCheck, err := fieldNotNilCheck(operator.ParseFrom)
if err != nil {
return nil, fmt.Errorf(
"couldn't generate nil check for parseFrom of grok op %s: %w", operator.Name, err,
)
}
// TODO(Raj): What happens if ParseTo is not a map? Add a test for this
return []ottlStatement{{
editor: fmt.Sprintf(
`merge_maps(%s, GrokParse(%s, "%s"), "upsert")`,
logTransformPathToOttlPath(operator.ParseTo),
logTransformPathToOttlPath(operator.ParseFrom),
escapeDoubleQuotesForOttl(operator.Pattern),
),
conditions: []string{exprToOttl(parseFromNotNilCheck)},
}}, nil
}
func ottlStatementsForJsonParser(operator PipelineOperator) (
[]ottlStatement, error,
) {
parseFromNotNilCheck, err := fieldNotNilCheck(operator.ParseFrom)
if err != nil {
return nil, fmt.Errorf(
"couldn't generate nil check for parseFrom of json parser op %s: %w", operator.Name, err,
)
}
mapExtractStmts := ottlStatementsForExtractingMapValue(
fmt.Sprintf("ParseJSON(%s)", logTransformPathToOttlPath(operator.ParseFrom)),
logTransformPathToOttlPath(operator.ParseTo),
)
stmts := ottlStatementsWithPrependedConditions(
mapExtractStmts,
exprToOttl(parseFromNotNilCheck),
fmt.Sprintf(
`IsMatch(%s, "^\\s*{.*}\\s*$")`,
logTransformPathToOttlPath(operator.ParseFrom),
),
)
return stmts, nil
}
func ottlStatementsForTimeParser(operator PipelineOperator) (
[]ottlStatement, error,
) {
stmts := []ottlStatement{}
parseFromNotNilCheck, err := fieldNotNilCheck(operator.ParseFrom)
if err != nil {
return nil, fmt.Errorf(
"couldn't generate nil check for parseFrom of json parser op %s: %w", operator.Name, err,
)
}
whereClauseParts := []string{exprToOttl(parseFromNotNilCheck)}
if operator.LayoutType == "strptime" {
regex, err := RegexForStrptimeLayout(operator.Layout)
if err != nil {
return nil, fmt.Errorf(
"couldn't generate layout regex for time_parser %s: %w", operator.Name, err,
)
}
whereClauseParts = append(whereClauseParts,
fmt.Sprintf(`IsMatch(%s, "%s")`, logTransformPathToOttlPath(operator.ParseFrom), regex),
)
stmts = append(stmts, ottlStatement{
editor: fmt.Sprintf(
`set(time, Time(%s, "%s"))`,
logTransformPathToOttlPath(operator.ParseFrom),
operator.Layout,
),
conditions: whereClauseParts,
})
} else if operator.LayoutType == "epoch" {
valueRegex := `^\\s*[0-9]+\\s*$`
if strings.Contains(operator.Layout, ".") {
valueRegex = `^\\s*[0-9]+\\.[0-9]+\\s*$`
}
whereClauseParts = append(whereClauseParts,
exprToOttl(fmt.Sprintf(
`string(%s) matches "%s"`, operator.ParseFrom, valueRegex,
)),
)
timeValue := fmt.Sprintf("Double(%s)", logTransformPathToOttlPath(operator.ParseFrom))
if strings.HasPrefix(operator.Layout, "seconds") {
timeValue = fmt.Sprintf("%s * 1000000000", timeValue)
} else if operator.Layout == "milliseconds" {
timeValue = fmt.Sprintf("%s * 1000000", timeValue)
} else if operator.Layout == "microseconds" {
timeValue = fmt.Sprintf("%s * 1000", timeValue)
}
stmts = append(stmts, ottlStatement{
editor: fmt.Sprintf(`set(time_unix_nano, %s)`, timeValue),
conditions: whereClauseParts,
})
} else {
return nil, fmt.Errorf("unsupported time layout %s", operator.LayoutType)
}
return stmts, nil
}
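// Example (hypothetical field): ParseFrom "attributes.ts" with LayoutType
// "epoch" and Layout "milliseconds" renders as (the not-nil and numeric
// string checks attached as where conditions are elided here):
//
//   set(time_unix_nano, Double(attributes["ts"]) * 1000000) where ...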
func ottlStatementsForSeverityParser(operator PipelineOperator) (
[]ottlStatement, error,
) {
stmts := []ottlStatement{}
parseFromNotNilCheck, err := fieldNotNilCheck(operator.ParseFrom)
if err != nil {
return nil, fmt.Errorf(
"couldn't generate nil check for parseFrom of severity parser %s: %w", operator.Name, err,
)
}
for severity, valuesToMap := range operator.SeverityMapping {
for _, value := range valuesToMap {
// Special case for 2xx 3xx 4xx and 5xx
isSpecialValue, err := regexp.MatchString(`^\s*[2345]xx\s*$`, strings.ToLower(value))
if err != nil {
return nil, fmt.Errorf("couldn't regex match for wildcard severity values: %w", err)
}
if isSpecialValue {
whereClause := strings.Join([]string{
exprToOttl(parseFromNotNilCheck),
exprToOttl(fmt.Sprintf(`type(%s) in ["int", "float"] && %s == float(int(%s))`, operator.ParseFrom, operator.ParseFrom, operator.ParseFrom)),
exprToOttl(fmt.Sprintf(`string(int(%s)) matches "^%s$"`, operator.ParseFrom, fmt.Sprintf("%s[0-9]{2}", value[0:1]))),
}, " and ")
stmts = append(stmts, ottlStatement{
editor: fmt.Sprintf("set(severity_number, SEVERITY_NUMBER_%s)", strings.ToUpper(severity)),
conditions: []string{whereClause},
})
stmts = append(stmts, ottlStatement{
editor: fmt.Sprintf(`set(severity_text, "%s")`, strings.ToUpper(severity)),
conditions: []string{whereClause},
})
} else {
whereClause := strings.Join([]string{
exprToOttl(parseFromNotNilCheck),
fmt.Sprintf(
`IsString(%s)`,
logTransformPathToOttlPath(operator.ParseFrom),
),
fmt.Sprintf(
`IsMatch(%s, "^\\s*%s\\s*$")`,
logTransformPathToOttlPath(operator.ParseFrom), value,
),
}, " and ")
stmts = append(stmts, ottlStatement{
editor: fmt.Sprintf("set(severity_number, SEVERITY_NUMBER_%s)", strings.ToUpper(severity)),
conditions: []string{whereClause},
})
stmts = append(stmts, ottlStatement{
editor: fmt.Sprintf(`set(severity_text, "%s")`, strings.ToUpper(severity)),
conditions: []string{whereClause},
})
}
}
}
return stmts, nil
}
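// Example (hypothetical; the not-nil condition is elided): a mapping of
// "error" -> ["oops"] with ParseFrom "attributes.sev" renders, per mapped
// value, as
//
//   set(severity_number, SEVERITY_NUMBER_ERROR) where IsString(attributes["sev"]) and IsMatch(attributes["sev"], "^\\s*oops\\s*$")
//   set(severity_text, "ERROR") where <same conditions>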
func ottlStatementsForTraceParser(operator PipelineOperator) (
[]ottlStatement, error,
) {
stmts := []ottlStatement{}
if operator.TraceId != nil && len(operator.TraceId.ParseFrom) > 0 {
parseFromNotNilCheck, err := fieldNotNilCheck(operator.TraceId.ParseFrom)
if err != nil {
return nil, fmt.Errorf(
"couldn't generate nil check for TraceId.parseFrom %s: %w", operator.Name, err,
)
}
// TODO(Raj): Also check for trace id regex pattern
stmts = append(stmts, ottlStatement{
editor: fmt.Sprintf(`set(trace_id.string, %s)`, logTransformPathToOttlPath(operator.TraceId.ParseFrom)),
conditions: []string{exprToOttl(parseFromNotNilCheck)},
})
}
if operator.SpanId != nil && len(operator.SpanId.ParseFrom) > 0 {
parseFromNotNilCheck, err := fieldNotNilCheck(operator.SpanId.ParseFrom)
if err != nil {
return nil, fmt.Errorf(
"couldn't generate nil check for TraceId.parseFrom %s: %w", operator.Name, err,
)
}
// TODO(Raj): Also check for span id regex pattern
stmts = append(stmts, ottlStatement{
editor: fmt.Sprintf("set(span_id.string, %s)", logTransformPathToOttlPath(operator.SpanId.ParseFrom)),
conditions: []string{exprToOttl(parseFromNotNilCheck)},
})
}
if operator.TraceFlags != nil && len(operator.TraceFlags.ParseFrom) > 0 {
parseFromNotNilCheck, err := fieldNotNilCheck(operator.TraceFlags.ParseFrom)
if err != nil {
return nil, fmt.Errorf(
"couldn't generate nil check for TraceId.parseFrom %s: %w", operator.Name, err,
)
}
// TODO(Raj): Also check for trace flags hex regex pattern
stmts = append(stmts, ottlStatement{
editor: fmt.Sprintf(`set(flags, HexToInt(%s))`, logTransformPathToOttlPath(operator.TraceFlags.ParseFrom)),
conditions: []string{exprToOttl(parseFromNotNilCheck)},
})
}
return stmts, nil
}
// TODO(Raj): Should this be used in regex and grok parsers too?
func ottlStatementsForExtractingMapValue(
mapGenerator string,
target string,
) []ottlStatement {
stmts := []ottlStatement{}
cacheKey := uuid.NewString()
// Extract parsed map to cache.
stmts = append(stmts, ottlStatement{
editor: fmt.Sprintf(
`set(cache["%s"], %s)`, cacheKey, mapGenerator,
),
conditions: []string{},
})
// Set target to a map if not already one.
stmts = append(stmts, ottlStatement{
editor: fmt.Sprintf(
`set(%s, ParseJSON("{}"))`, logTransformPathToOttlPath(target),
),
conditions: []string{
fmt.Sprintf(`cache["%s"] != nil`, cacheKey),
fmt.Sprintf("not IsMap(%s)", logTransformPathToOttlPath(target)),
},
})
stmts = append(stmts, ottlStatement{
editor: fmt.Sprintf(
`merge_maps(%s, cache["%s"], "upsert")`,
logTransformPathToOttlPath(target), cacheKey,
),
conditions: []string{fmt.Sprintf(`cache["%s"] != nil`, cacheKey)},
})
return stmts
}
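// Example: for mapGenerator `ParseJSON(body)`, target `attributes["parsed"]`
// and a generated cache key "k1", the statements are
//
//   set(cache["k1"], ParseJSON(body))
//   set(attributes["parsed"], ParseJSON("{}")) where cache["k1"] != nil and not IsMap(attributes["parsed"])
//   merge_maps(attributes["parsed"], cache["k1"], "upsert") where cache["k1"] != nil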
func ottlStatementForDeletingField(fieldPath string) (*ottlStatement, error) {
ottlPath := logTransformPathToOttlPath(fieldPath)
fieldPathParts := rSplitAfterN(ottlPath, "[", 2)
target := fieldPathParts[0]
key := fieldPathParts[1][1 : len(fieldPathParts[1])-1]
pathNotNilCheck, err := fieldNotNilCheck(fieldPath)
if err != nil {
return nil, fmt.Errorf("couldn't generate nil check for path %s: %w", fieldPath, err)
}
return &ottlStatement{
editor: fmt.Sprintf(`delete_key(%s, %s)`, target, key),
conditions: []string{exprToOttl(pathNotNilCheck)},
}, nil
}
// a.b?.c -> ["a", "b", "c"]
// a.b["c.d"].e -> ["a", "b", "c.d", "e"]
func pathParts(path string) []string {
path = strings.ReplaceAll(path, "?.", ".")
// Split once from the right to include the rightmost membership op and everything after it.
// Eg: `attributes.test["a.b"].value["c.d"].e` would result in `attributes.test["a.b"].value` and `["c.d"].e`
memberOpParts := rSplitAfterN(path, "[", 2)
if len(memberOpParts) < 2 {
// there is no [] access in fieldPath
return strings.Split(path, ".")
}
// recursively get parts for path prefix before rightmost membership op (`attributes.test["a.b"].value`)
parts := pathParts(memberOpParts[0])
suffixParts := strings.SplitAfter(memberOpParts[1], "]") // ["c.d"].e -> `["c.d"]`, `.e`
// add key used in membership op ("c.d")
parts = append(parts, suffixParts[0][2:len(suffixParts[0])-2])
// add parts for path after the membership op ("e")
if len(suffixParts[1]) > 0 {
parts = append(parts, strings.Split(suffixParts[1][1:], ".")...)
}
return parts
}
// converts a logtransform path to an equivalent ottl path
// For details, see https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl/contexts/ottllog#paths
func logTransformPathToOttlPath(path string) string {
if !(strings.HasPrefix(path, "attributes") || strings.HasPrefix(path, "resource")) {
return path
}
parts := pathParts(path)
ottlPathParts := []string{parts[0]}
if ottlPathParts[0] == "resource" {
ottlPathParts[0] = "resource.attributes"
}
for _, p := range parts[1:] {
ottlPathParts = append(ottlPathParts, fmt.Sprintf(`["%s"]`, p))
}
return strings.Join(ottlPathParts, "")
}
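// Examples:
//   logTransformPathToOttlPath("attributes.test")              -> attributes["test"]
//   logTransformPathToOttlPath(`attributes.test["a.b"].value`) -> attributes["test"]["a.b"]["value"]
//   logTransformPathToOttlPath("resource.service.name")        -> resource.attributes["service"]["name"]
//   logTransformPathToOttlPath("body.method")                  -> body.method (unchanged)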
func escapeDoubleQuotesForOttl(str string) string {
return strings.ReplaceAll(
strings.ReplaceAll(str, `\`, `\\`), `"`, `\"`,
)
}
func exprToOttl(expr string) string {
return fmt.Sprintf(`EXPR("%s")`, escapeDoubleQuotesForOttl(expr))
}
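// e.g. exprToOttl(`attributes.method == "GET"`) yields
//   EXPR("attributes.method == \"GET\"")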
func ottlStatementsWithPrependedConditions(
statements []ottlStatement,
conditionsToPrepend ...string,
) []ottlStatement {
stmts := []ottlStatement{}
for _, s := range statements {
stmts = append(stmts, ottlStatement{
editor: s.editor,
conditions: append(
conditionsToPrepend, s.conditions...,
),
})
}
return stmts
}

(file name not shown)

@@ -199,9 +199,10 @@ var prepareProcessorTestData = []struct {
func TestPreparePipelineProcessor(t *testing.T) {
for _, test := range prepareProcessorTestData {
Convey(test.Name, t, func() {
-res, err := getOperators(test.Operators)
-So(err, ShouldBeNil)
-So(res, ShouldResemble, test.Output)
+require.NotNil(t, nil, "TODO(Raj): maybe use these tests in new config generation")
+// res, err := getOperators(test.Operators)
+// So(err, ShouldBeNil)
+// So(res, ShouldResemble, test.Output)
})
}
}
@@ -656,14 +657,24 @@ func TestMembershipOpInProcessorFieldExpressions(t *testing.T) {
Name: "move",
From: `attributes["http.method"]`,
To: `attributes["test.http.method"]`,
-}, {
+},
+{
ID: "json",
Type: "json_parser",
Enabled: true,
Name: "json",
ParseFrom: `attributes["order.products"]`,
ParseTo: `attributes["some"].missing.target`,
},
{
ID: "json",
Type: "json_parser",
Enabled: true,
Name: "json",
ParseFrom: `attributes["order.products"]`,
ParseTo: `attributes["order.products"]`,
-}, {
+},
+{
ID: "move1",
Type: "move",
Enabled: true,
@@ -678,6 +689,14 @@ func TestMembershipOpInProcessorFieldExpressions(t *testing.T) {
From: `attributes.test?.doesnt_exist`,
To: `attributes["test.doesnt_exist"]`,
}, {
ID: "addToMissingField",
Type: "add",
Enabled: true,
Name: "add",
Field: `attributes["another"].new.missing_field`,
Value: `2`,
},
{
ID: "add",
Type: "add",
Enabled: true,
@@ -722,6 +741,7 @@ func TestMembershipOpInProcessorFieldExpressions(t *testing.T) {
require.False(methodAttrExists)
require.Equal("GET", result[0].Attributes_string["test.http.method"])
require.Equal("pid0", result[0].Attributes_string["order.pids.pid0"])
require.Equal(`{"new":{"missing_field":"2"}}`, result[0].Attributes_string["another"])
}
func TestContainsFilterIsCaseInsensitive(t *testing.T) {
@@ -803,3 +823,69 @@ func TestContainsFilterIsCaseInsensitive(t *testing.T) {
_, test2Exists := result[0].Attributes_string["test2"]
require.False(test2Exists)
}
func TestAllOpsAppliedEvenIfFirstOpAltersAttribsReferencedInPipelineFilter(t *testing.T) {
// If the first op in a pipeline alters the log record in such a way that
// it would not match the pipeline filter anymore, the rest of the ops
// should still be applied to the log record
require := require.New(t)
testLogs := []model.SignozLog{
makeTestSignozLog("test Ecom Log", map[string]interface{}{
"http.method": "GET",
}),
}
testPipelines := []Pipeline{{
OrderId: 1,
Name: "pipeline1",
Alias: "pipeline1",
Enabled: true,
Filter: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{
Key: "http.method",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
},
Operator: "=",
Value: "GET",
},
},
},
Config: []PipelineOperator{
{
ID: "move",
Type: "move",
Enabled: true,
Name: "move",
From: `attributes["http.method"]`,
To: `attributes["test.http.method"]`,
},
{
ID: "add",
Type: "add",
Enabled: true,
Name: "add",
Field: "attributes.test2",
Value: "test2",
},
},
}}
result, collectorWarnAndErrorLogs, err := SimulatePipelinesProcessing(
context.Background(), testPipelines, testLogs,
)
require.Nil(err)
require.Equal(0, len(collectorWarnAndErrorLogs), strings.Join(collectorWarnAndErrorLogs, "\n"))
require.Equal(1, len(result))
_, filterAttribExists := result[0].Attributes_string["http.method"]
require.False(filterAttribExists)
require.Equal(result[0].Attributes_string["test.http.method"], "GET")
require.Equal(result[0].Attributes_string["test2"], "test2")
}

(file name not shown)

@@ -7,7 +7,7 @@ import (
"time"
_ "github.com/SigNoz/signoz-otel-collector/pkg/parser/grok"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/logstransformprocessor"
"github.com/SigNoz/signoz-otel-collector/processor/signoztransformprocessor"
"github.com/pkg/errors"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
@@ -42,7 +42,7 @@ func SimulatePipelinesProcessing(
simulatorInputPLogs := SignozLogsToPLogs(logs)
processorFactories, err := processor.MakeFactoryMap(
-logstransformprocessor.NewFactory(),
+signoztransformprocessor.NewFactory(),
)
if err != nil {
return nil, nil, model.InternalError(errors.Wrap(

(file name not shown)

@@ -292,9 +292,10 @@ func TestTraceParsingProcessor(t *testing.T) {
testLog,
},
)
require.Nil(err)
require.Equal(1, len(result))
-require.Equal(0, len(collectorWarnAndErrorLogs))
+require.Equal(0, len(collectorWarnAndErrorLogs), strings.Join(collectorWarnAndErrorLogs, "\n"))
processed := result[0]
require.Equal(testTraceId, processed.TraceID)
@@ -346,20 +347,38 @@ func TestAddProcessor(t *testing.T) {
},
}
-var parserOp PipelineOperator
-err := json.Unmarshal([]byte(`
-{
+for _, procJson := range []string{
+`{
+"orderId": 1,
+"enabled": true,
+"type": "add",
+"name": "Test add parser",
+"id": "test-add-parser",
+"field": "resource.test",
+"value": "1"
+}`, `{
+"orderId": 2,
+"enabled": true,
+"type": "add",
+"name": "Test enabled add resource parser",
+"id": "test-add-parser",
+"field": "attributes.test",
+"value": "EXPR(int(resource.test) + 1)"
+}`, `{
+"orderId": 3,
+"enabled": false,
+"type": "add",
+"name": "Test disabled add parser",
+"id": "test-add-parser",
+"field": "attributes.testMissing",
+"value": "test"
-}
-`), &parserOp)
-require.Nil(err)
-testPipelines[0].Config = append(testPipelines[0].Config, parserOp)
+}`,
+} {
+var parserOp PipelineOperator
+err := json.Unmarshal([]byte(procJson), &parserOp)
+require.Nil(err)
+testPipelines[0].Config = append(testPipelines[0].Config, parserOp)
+}
testLog := makeTestSignozLog(
"test log",
@@ -379,7 +398,9 @@ func TestAddProcessor(t *testing.T) {
require.Equal(1, len(result))
require.Equal(0, len(collectorWarnAndErrorLogs), strings.Join(collectorWarnAndErrorLogs, "\n"))
processed := result[0]
require.Equal("test", processed.Attributes_string["test"])
require.Equal("1", processed.Resources_string["test"])
require.Equal(int64(2), processed.Attributes_int64["test"])
require.Equal("", processed.Attributes_string["testMissing"])
}
func TestRemoveProcessor(t *testing.T) {

(file name not shown)

@@ -305,7 +305,7 @@ var ReservedColumnTargetAliases = map[string]struct{}{
}
// logsPPLPfx is a short constant for logsPipelinePrefix
const LogsPPLPfx = "logstransform/pipeline_"
const LogsPPLPfx = "signoztransform/logs-pipeline"
const IntegrationPipelineIdPrefix = "integration"

(file name not shown)

@@ -27,10 +27,8 @@ import (
"go.signoz.io/signoz/pkg/query-service/dao"
"go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/queryBuilderToExpr"
"go.signoz.io/signoz/pkg/query-service/utils"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
)
func TestLogPipelinesLifecycle(t *testing.T) {
@@ -628,10 +626,14 @@ func assertPipelinesRecommendedInRemoteConfig(
t.Fatalf("could not unmarshal config file sent to opamp client: %v", err)
}
-// Each pipeline is expected to become its own processor
-// in the logs service in otel collector config.
+// Validate expected collector config processors for log pipelines
+// are present in config recommended to opamp client
+expectedProcessors, expectedProcNames, err := logparsingpipeline.PreparePipelineProcessor(pipelines)
collectorConfSvcs := collectorConfSentToClient["service"].(map[string]interface{})
collectorConfLogsSvc := collectorConfSvcs["pipelines"].(map[string]interface{})["logs"].(map[string]interface{})
collectorConfLogsSvcProcessorNames := collectorConfLogsSvc["processors"].([]interface{})
collectorConfLogsPipelineProcNames := []string{}
for _, procNameVal := range collectorConfLogsSvcProcessorNames {
@@ -644,43 +646,28 @@ func assertPipelinesRecommendedInRemoteConfig(
}
}
-_, expectedLogProcessorNames, err := logparsingpipeline.PreparePipelineProcessor(pipelines)
+_, expectedProcNames, err = logparsingpipeline.PreparePipelineProcessor(pipelines)
require.NoError(t, err)
require.Equal(
-t, expectedLogProcessorNames, collectorConfLogsPipelineProcNames,
+t, expectedProcNames, collectorConfLogsPipelineProcNames,
"config sent to opamp client doesn't contain expected log pipelines",
)
collectorConfProcessors := collectorConfSentToClient["processors"].(map[string]interface{})
-for _, procName := range expectedLogProcessorNames {
+for _, procName := range expectedProcNames {
pipelineProcessorInConf, procExists := collectorConfProcessors[procName]
require.True(t, procExists, fmt.Sprintf(
"%s processor not found in config sent to opamp client", procName,
))
-// Validate that filter expr in collector conf is as expected.
-// extract expr present in collector conf processor
-pipelineProcOps := pipelineProcessorInConf.(map[string]interface{})["operators"].([]interface{})
-routerOpIdx := slices.IndexFunc(
-pipelineProcOps,
-func(op interface{}) bool { return op.(map[string]interface{})["id"] == "router_signoz" },
-)
-require.GreaterOrEqual(t, routerOpIdx, 0)
-routerOproutes := pipelineProcOps[routerOpIdx].(map[string]interface{})["routes"].([]interface{})
-pipelineFilterExpr := routerOproutes[0].(map[string]interface{})["expr"].(string)
-// find logparsingpipeline.Pipeline whose processor is being validated here
-pipelineIdx := slices.IndexFunc(
-pipelines, func(p logparsingpipeline.Pipeline) bool {
-return logparsingpipeline.CollectorConfProcessorName(p) == procName
-},
-)
-require.GreaterOrEqual(t, pipelineIdx, 0)
-expectedExpr, err := queryBuilderToExpr.Parse(pipelines[pipelineIdx].Filter)
+procInConfYaml, err := yaml.Parser().Marshal(pipelineProcessorInConf.(map[string]interface{}))
+require.Nil(t, err)
-require.Equal(t, expectedExpr, pipelineFilterExpr)
+expectedProcYaml, err := yaml.Parser().Marshal(expectedProcessors[procName].(map[string]interface{}))
+require.Nil(t, err)
+require.Equal(t, procInConfYaml, expectedProcYaml)
}
}