feat: JSON Flattening in logs pipelines (#8227)

* feat: introducing JSON Flattening

* fix: fixed a bug and verified with tests

* test: removed a temporary test

* feat: additional severity levels and some cleanup

* chore: minor changes

* test: added tests for processJSONParser

* test: added check for OnError

* fix: review from ellipsis

* fix: variablise max flattening depth

* Update pkg/query-service/app/logparsingpipeline/pipelineBuilder.go

Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com>

* Update pkg/errors/errors.go

Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com>

* fix: handle quoted JSON strings

* test: updating otel collector for testing

* test: update collector's reference

* chore: migrate to the new errors package

* chore: set flattening depth equal to 1

* fix: fallback for depth

* fix: change in errors package

* fix: tests

* fix: test

* chore: update collector version

* fix: go.sum

---------

Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com>
Co-authored-by: Nityananda Gohain <nityanandagohain@gmail.com>
Author: Piyush Singariya
Date: 2025-07-14 18:48:01 +05:30
Committed by: GitHub
Parent: 6137740907
Commit: d6eed8e79d
11 changed files with 473 additions and 43 deletions

go.mod

@@ -8,7 +8,7 @@ require (
github.com/ClickHouse/clickhouse-go/v2 v2.36.0
github.com/DATA-DOG/go-sqlmock v1.5.2
github.com/SigNoz/govaluate v0.0.0-20240203125216-988004ccc7fd
-	github.com/SigNoz/signoz-otel-collector v0.111.43-aded056
+	github.com/SigNoz/signoz-otel-collector v0.128.1
github.com/antlr4-go/antlr/v4 v4.13.1
github.com/antonmedv/expr v1.15.3
github.com/cespare/xxhash/v2 v2.3.0

go.sum

@@ -98,8 +98,8 @@ github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/SigNoz/govaluate v0.0.0-20240203125216-988004ccc7fd h1:Bk43AsDYe0fhkbj57eGXx8H3ZJ4zhmQXBnrW523ktj8=
github.com/SigNoz/govaluate v0.0.0-20240203125216-988004ccc7fd/go.mod h1:nxRcH/OEdM8QxzH37xkGzomr1O0JpYBRS6pwjsWW6Pc=
-github.com/SigNoz/signoz-otel-collector v0.111.43-aded056 h1:lJ7262JHZlHX7KuUlQa8vpWCdgZKwlZ2P6sUmZEqNLE=
-github.com/SigNoz/signoz-otel-collector v0.111.43-aded056/go.mod h1:AHfJ2N/74IXsrbYEPAlqfJeKg006VTt63vBZglUK3jY=
+github.com/SigNoz/signoz-otel-collector v0.128.1 h1:D0bKMrRNgcKreKKYoakCr5jTWj1srupbNwGIvpHMihw=
+github.com/SigNoz/signoz-otel-collector v0.128.1/go.mod h1:vFQLsJFzQwVkO1ltIMH+z9KKuTZTn/P0lKu2mNYDBpE=
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=

pkg/errors/errors.go

@@ -79,6 +79,22 @@ func Wrapf(cause error, t typ, code Code, format string, args ...interface{}) *b
}
}
// WithAdditional wraps an existing base error with a new formatted message.
// It is used when the original error already contains type and code.
func WithAdditional(cause error, format string, args ...interface{}) *base {
t, c, m, e, u, a := Unwrapb(cause)
b := &base{
t: t,
c: c,
m: m,
e: e,
u: u,
a: a,
}
return b.WithAdditional(append(a, fmt.Sprintf(format, args...))...)
}
// WithUrl adds a url to the base error and returns a new base error.
func (b *base) WithUrl(u string) *base {
return &base{
@@ -169,3 +185,11 @@ func WrapInvalidInputf(cause error, code Code, format string, args ...interface{
func NewInvalidInputf(code Code, format string, args ...interface{}) *base {
return Newf(TypeInvalidInput, code, format, args...)
}
func WrapUnexpectedf(cause error, code Code, format string, args ...interface{}) *base {
	return Wrapf(cause, TypeUnexpected, code, format, args...)
}
func NewUnexpectedf(code Code, format string, args ...interface{}) *base {
	return Newf(TypeUnexpected, code, format, args...)
}
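Note: a minimal usage sketch of the helpers added above (the call site, op, and pipelineName are hypothetical):

	// Build an "unexpected" error with a code, then append context;
	// WithAdditional preserves the original type and code via Unwrapb.
	err := errors.NewUnexpectedf(CodeInvalidOperatorType, "operator type received %s", op.Type)
	err = errors.WithAdditional(err, "while building pipeline %q", pipelineName)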


@@ -11,6 +11,7 @@ var (
TypeForbidden = typ{"forbidden"}
TypeCanceled = typ{"canceled"}
TypeTimeout = typ{"timeout"}
TypeUnexpected = typ{"unexpected"} // Generic mismatch of expectations
)
// Defines custom error types


@@ -0,0 +1,8 @@
package logparsingpipeline
import "github.com/SigNoz/signoz/pkg/errors"
var (
CodeInvalidOperatorType = errors.MustNewCode("operator_type_mismatch")
CodeFieldNilCheckType = errors.MustNewCode("operator_field_nil_check")
)

pkg/query-service/app/logparsingpipeline/pipelineBuilder.go

@@ -6,13 +6,15 @@ import (
"slices"
"strings"
signozstanzahelper "github.com/SigNoz/signoz-otel-collector/processor/signozlogspipelineprocessor/stanza/operator/helper"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/query-service/constants"
"github.com/SigNoz/signoz/pkg/query-service/queryBuilderToExpr"
"github.com/SigNoz/signoz/pkg/types/pipelinetypes"
"github.com/antonmedv/expr"
"github.com/antonmedv/expr/ast"
"github.com/antonmedv/expr/parser"
"github.com/pkg/errors"
"github.com/google/uuid"
)
const (
@@ -38,7 +40,7 @@ func PreparePipelineProcessor(gettablePipelines []pipelinetypes.GettablePipeline
operators, err := getOperators(v.Config)
if err != nil {
-			return nil, nil, errors.Wrap(err, "failed to prepare operators")
+			return nil, nil, err
}
if len(operators) == 0 {
@@ -47,7 +49,7 @@ func PreparePipelineProcessor(gettablePipelines []pipelinetypes.GettablePipeline
filterExpr, err := queryBuilderToExpr.Parse(v.Filter)
if err != nil {
-			return nil, nil, errors.Wrap(err, "failed to parse pipeline filter")
+			return nil, nil, err
}
router := []pipelinetypes.PipelineOperator{
@@ -93,10 +95,6 @@ func getOperators(ops []pipelinetypes.PipelineOperator) ([]pipelinetypes.Pipelin
filteredOp := []pipelinetypes.PipelineOperator{}
for i, operator := range ops {
if operator.Enabled {
-			if len(filteredOp) > 0 {
-				filteredOp[len(filteredOp)-1].Output = operator.ID
-			}
if operator.Type == "regex_parser" {
parseFromNotNilCheck, err := fieldNotNilCheck(operator.ParseFrom)
if err != nil {
@@ -124,16 +122,13 @@ func getOperators(ops []pipelinetypes.PipelineOperator) ([]pipelinetypes.Pipelin
operator.If = parseFromNotNilCheck
} else if operator.Type == "json_parser" {
-				parseFromNotNilCheck, err := fieldNotNilCheck(operator.ParseFrom)
+				operators, err := processJSONParser(&operator)
				if err != nil {
-					return nil, fmt.Errorf(
-						"couldn't generate nil check for parseFrom of json parser op %s: %w", operator.Name, err,
-					)
+					return nil, fmt.Errorf("couldn't process json_parser op %s: %w", operator.Name, err)
				}
-				operator.If = fmt.Sprintf(
-					`%s && ((type(%s) == "string" && %s matches "^\\s*{.*}\\s*$" ) || type(%s) == "map")`,
-					parseFromNotNilCheck, operator.ParseFrom, operator.ParseFrom, operator.ParseFrom,
-				)
+				filteredOp = append(filteredOp, operators...)
+				continue // Continue here to skip deduplication of json_parser operator
} else if operator.Type == "add" {
if strings.HasPrefix(operator.Value, "EXPR(") && strings.HasSuffix(operator.Value, ")") {
expression := strings.TrimSuffix(strings.TrimPrefix(operator.Value, "EXPR("), ")")
@@ -148,7 +143,6 @@ func getOperators(ops []pipelinetypes.PipelineOperator) ([]pipelinetypes.Pipelin
operator.If = fieldsNotNilCheck
}
}
} else if operator.Type == "move" || operator.Type == "copy" {
fromNotNilCheck, err := fieldNotNilCheck(operator.From)
if err != nil {
@@ -157,7 +151,6 @@ func getOperators(ops []pipelinetypes.PipelineOperator) ([]pipelinetypes.Pipelin
)
}
operator.If = fromNotNilCheck
} else if operator.Type == "remove" {
fieldNotNilCheck, err := fieldNotNilCheck(operator.Field)
if err != nil {
@@ -166,10 +159,8 @@ func getOperators(ops []pipelinetypes.PipelineOperator) ([]pipelinetypes.Pipelin
)
}
operator.If = fieldNotNilCheck
} else if operator.Type == "trace_parser" {
cleanTraceParser(&operator)
} else if operator.Type == "time_parser" {
parseFromNotNilCheck, err := fieldNotNilCheck(operator.ParseFrom)
if err != nil {
@@ -202,19 +193,11 @@ func getOperators(ops []pipelinetypes.PipelineOperator) ([]pipelinetypes.Pipelin
}
// TODO(Raj): Maybe add support for gotime too eventually
} else if operator.Type == "severity_parser" {
-			parseFromNotNilCheck, err := fieldNotNilCheck(operator.ParseFrom)
+			err := processSeverityParser(&operator)
			if err != nil {
-				return nil, fmt.Errorf(
-					"couldn't generate nil check for parseFrom of severity parser %s: %w", operator.Name, err,
-				)
+				return nil, err
			}
-			operator.If = fmt.Sprintf(
-				`%s && ( type(%s) == "string" || ( type(%s) in ["int", "float"] && %s == float(int(%s)) ) )`,
-				parseFromNotNilCheck, operator.ParseFrom, operator.ParseFrom, operator.ParseFrom, operator.ParseFrom,
-			)
}
filteredOp = append(filteredOp, operator)
@@ -222,9 +205,193 @@ func getOperators(ops []pipelinetypes.PipelineOperator) ([]pipelinetypes.Pipelin
filteredOp[len(filteredOp)-1].Output = ""
}
}
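	// Re-chain outputs so each operator's Output points at the next operator's ID.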
for idx := range filteredOp {
if idx > 0 {
filteredOp[idx-1].Output = filteredOp[idx].ID
}
}
return filteredOp, nil
}
func processSeverityParser(operator *pipelinetypes.PipelineOperator) error {
if operator.Type != "severity_parser" {
return errors.NewUnexpectedf(CodeInvalidOperatorType, "operator type received %s", operator.Type)
}
parseFromNotNilCheck, err := fieldNotNilCheck(operator.ParseFrom)
if err != nil {
return errors.WrapInvalidInputf(err, CodeFieldNilCheckType,
"couldn't generate nil check for parseFrom of severity parser %s", operator.Name,
)
}
operator.If = fmt.Sprintf(
`%s && ( type(%s) == "string" || ( type(%s) in ["int", "float"] && %s == float(int(%s)) ) )`,
parseFromNotNilCheck, operator.ParseFrom, operator.ParseFrom, operator.ParseFrom, operator.ParseFrom,
)
return nil
}
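// Illustrative example for processSeverityParser (assumed shape of the
// fieldNotNilCheck output): for ParseFrom = "attributes.level" the generated
// condition is roughly:
//   attributes.level != nil && ( type(attributes.level) == "string" || ( type(attributes.level) in ["int", "float"] && attributes.level == float(int(attributes.level)) ) )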
// processJSONParser expands a simple json_parser operator into multiple operators based on the JSON mapping of default variables
func processJSONParser(parent *pipelinetypes.PipelineOperator) ([]pipelinetypes.PipelineOperator, error) {
if parent.Type != "json_parser" {
return nil, errors.NewUnexpectedf(CodeInvalidOperatorType, "operator type received %s", parent.Type)
}
parseFromNotNilCheck, err := fieldNotNilCheck(parent.ParseFrom)
if err != nil {
return nil, errors.WrapInvalidInputf(err, CodeFieldNilCheckType,
"couldn't generate nil check for parseFrom of json parser op %s: %s", parent.Name, err,
)
}
parent.If = fmt.Sprintf(
`%s && ((type(%s) == "string" && isJSON(%s) && type(fromJSON(unquote(%s))) == "map" ) || type(%s) == "map")`,
parseFromNotNilCheck, parent.ParseFrom, parent.ParseFrom, parent.ParseFrom, parent.ParseFrom,
)
if parent.EnableFlattening {
parent.MaxFlatteningDepth = constants.MaxJSONFlatteningDepth
}
// return if no mapping available
if parent.Mapping == nil {
return []pipelinetypes.PipelineOperator{*parent}, nil
}
mapping := parent.Mapping
children := []pipelinetypes.PipelineOperator{}
	// Clone before reversing: this function also runs when saving pipelines
	// (POST request), so reversing the array in place would persist the
	// mapping in reversed order in the database.
cloneAndReverse := func(input []string) []string {
cloned := slices.Clone(input)
slices.Reverse(cloned)
return cloned
}
generateCustomID := func() string {
return fmt.Sprintf("%s-json-parser", uuid.NewString()) // json-parser helps in identifying processors part of JSON Parser
}
	// helper that emits a move operator for each mapped keyword
generateMoveOperators := func(keywords []string, to string) error {
for _, keyword := range cloneAndReverse(keywords) {
operator := pipelinetypes.PipelineOperator{
Type: "move",
ID: generateCustomID(),
OnError: signozstanzahelper.SendOnErrorQuiet,
From: fmt.Sprintf(`%s["%s"]`, parent.ParseTo, keyword),
To: to,
}
fromNotNilCheck, err := fieldNotNilCheck(operator.From)
if err != nil {
return err
}
operator.If = fromNotNilCheck
children = append(children, operator)
}
return nil
}
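	// Illustrative example for generateMoveOperators (assumed values): with
	// ParseTo = "attributes" and keyword "hostname", the emitted operator
	// moves attributes["hostname"] into the target, e.g. resource["host.name"].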
// JSONMapping: host
err = generateMoveOperators(mapping[pipelinetypes.Host], `resource["host.name"]`)
if err != nil {
return nil, err
}
// JSONMapping: service
err = generateMoveOperators(mapping[pipelinetypes.Service], `resource["service.name"]`)
if err != nil {
return nil, err
}
// JSONMapping: trace_id
for _, keyword := range cloneAndReverse(mapping[pipelinetypes.TraceID]) {
operator := pipelinetypes.PipelineOperator{
Type: "trace_parser",
ID: generateCustomID(),
OnError: signozstanzahelper.SendOnErrorQuiet,
TraceParser: &pipelinetypes.TraceParser{
TraceId: &pipelinetypes.ParseFrom{
ParseFrom: fmt.Sprintf(`%s["%s"]`, parent.ParseTo, keyword),
},
},
}
children = append(children, operator)
}
// JSONMapping: span_id
for _, keyword := range cloneAndReverse(mapping[pipelinetypes.SpanID]) {
operator := pipelinetypes.PipelineOperator{
Type: "trace_parser",
ID: generateCustomID(),
OnError: signozstanzahelper.SendOnErrorQuiet,
TraceParser: &pipelinetypes.TraceParser{
SpanId: &pipelinetypes.ParseFrom{
ParseFrom: fmt.Sprintf(`%s["%s"]`, parent.ParseTo, keyword),
},
},
}
children = append(children, operator)
}
// JSONMapping: trace_flags
for _, keyword := range cloneAndReverse(mapping[pipelinetypes.TraceFlags]) {
operator := pipelinetypes.PipelineOperator{
Type: "trace_parser",
ID: generateCustomID(),
OnError: signozstanzahelper.SendOnErrorQuiet,
TraceParser: &pipelinetypes.TraceParser{
TraceFlags: &pipelinetypes.ParseFrom{
ParseFrom: fmt.Sprintf(`%s["%s"]`, parent.ParseTo, keyword),
},
},
}
children = append(children, operator)
}
// JSONMapping: severity
for _, keyword := range cloneAndReverse(mapping[pipelinetypes.Severity]) {
operator := pipelinetypes.PipelineOperator{
Type: "severity_parser",
ID: generateCustomID(),
OnError: signozstanzahelper.SendOnErrorQuiet,
ParseFrom: fmt.Sprintf(`%s["%s"]`, parent.ParseTo, keyword),
}
err := processSeverityParser(&operator)
if err != nil {
return nil, err
}
operator.Mapping = pipelinetypes.DefaultSeverityMapping
children = append(children, operator)
}
// JSONMapping: environment
err = generateMoveOperators(mapping[pipelinetypes.Environment], `resource["deployment.environment.name"]`)
if err != nil {
return nil, err
}
// JSONMapping: body
err = generateMoveOperators(mapping[pipelinetypes.Message], `body`)
if err != nil {
return nil, err
}
	// remove the mapping reference so it doesn't appear in the Collector's config
parent.Mapping = nil
	return append([]pipelinetypes.PipelineOperator{*parent}, children...), nil
}
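// Illustrative sketch of processJSONParser's expansion (hypothetical input):
// a json_parser with Mapping{Host: {"hostname"}, Severity: {"level"}} yields
//   json_parser -> move(attributes["hostname"] -> resource["host.name"])
//               -> severity_parser(parse_from: attributes["level"])
// with the parent's Mapping cleared before it is serialized into the
// Collector config.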
// TODO: (Piyush) remove this in future
func cleanTraceParser(operator *pipelinetypes.PipelineOperator) {
if operator.TraceId != nil && len(operator.TraceId.ParseFrom) < 1 {
operator.TraceId = nil
@@ -241,7 +408,7 @@ func cleanTraceParser(operator *pipelinetypes.PipelineOperator) {
func fieldNotNilCheck(fieldPath string) (string, error) {
_, err := expr.Compile(fieldPath)
if err != nil {
return "", fmt.Errorf("invalid fieldPath %s: %w", fieldPath, err)
return "", errors.WrapInvalidInputf(err, CodeFieldNilCheckType, "invalid fieldPath %s", fieldPath)
}
// helper for turning `.` into `?.` in field paths.
@@ -270,7 +437,7 @@ func fieldNotNilCheck(fieldPath string) (string, error) {
// should come out to be (attributes.test != nil && attributes.test["a.b"]?.value != nil)
collectionNotNilCheck, err := fieldNotNilCheck(parts[0])
if err != nil {
return "", fmt.Errorf("couldn't generate nil check for %s: %w", parts[0], err)
return "", errors.WithAdditional(err, "couldn't generate nil check for %s", parts[0])
}
// generate nil check for entire path.


@@ -7,12 +7,14 @@ import (
"testing"
"time"
signozstanzahelper "github.com/SigNoz/signoz-otel-collector/processor/signozlogspipelineprocessor/stanza/operator/helper"
"github.com/SigNoz/signoz/pkg/query-service/model"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/utils"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/pipelinetypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/google/uuid"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
. "github.com/smartystreets/goconvey/convey"
"github.com/stretchr/testify/require"
@@ -841,3 +843,147 @@ func TestContainsFilterIsCaseInsensitive(t *testing.T) {
_, test2Exists := result[0].Attributes_string["test2"]
require.False(test2Exists)
}
func TestProcessJSONParser_WithFlatteningAndMapping(t *testing.T) {
parserID := uuid.NewString()
outputID := uuid.NewString()
parent := &pipelinetypes.PipelineOperator{
Type: "json_parser",
ID: parserID,
Name: "Parse JSON",
OrderId: 1,
Enabled: true,
ParseFrom: "body",
ParseTo: "attributes",
Output: outputID,
EnableFlattening: true,
EnablePaths: false,
PathPrefix: "",
Mapping: map[string][]string{
pipelinetypes.Host: {"host", "hostname"},
pipelinetypes.Service: {"service", "syslog.appname"},
pipelinetypes.Severity: {"status", "severity", "level", "syslog.severity"},
pipelinetypes.TraceID: {"trace_id"},
pipelinetypes.SpanID: {"span_id"},
pipelinetypes.Message: {"message", "msg", "log"},
pipelinetypes.TraceFlags: {"flags"},
pipelinetypes.Environment: {"service.env"},
},
}
	// Total children generated = sum of mapping value counts: move ops + trace_parser ops + severity_parser ops
expectedMoveOps := len(parent.Mapping[pipelinetypes.Host]) +
len(parent.Mapping[pipelinetypes.Service]) +
len(parent.Mapping[pipelinetypes.Message]) +
len(parent.Mapping[pipelinetypes.Environment])
expectedTraceOps := len(parent.Mapping[pipelinetypes.TraceID]) +
len(parent.Mapping[pipelinetypes.SpanID]) +
len(parent.Mapping[pipelinetypes.TraceFlags])
expectedSeverityOps := len(parent.Mapping[pipelinetypes.Severity]) // severity_parser
totalOps := expectedMoveOps + expectedTraceOps + expectedSeverityOps
ops, err := processJSONParser(parent)
require.NoError(t, err)
require.NotEmpty(t, ops)
// Parent is always first
parentOp := ops[0]
require.Equal(t, "json_parser", parentOp.Type)
require.Equal(t, 1, parentOp.MaxFlatteningDepth)
require.Nil(t, parentOp.Mapping) // Mapping should be removed
require.Nil(t, parent.Mapping) // Mapping should be removed
require.Contains(t, parentOp.If, `isJSON(body)`)
require.Contains(t, parentOp.If, `type(body)`)
require.Equal(t, 1+totalOps, len(ops))
var traceParserCount, moveCount, severityParserCount int
for _, op := range ops[1:] {
require.NotEmpty(t, op.ID)
require.Equal(t, op.OnError, signozstanzahelper.SendOnErrorQuiet)
switch op.Type {
case "move":
require.NotEmpty(t, op.From)
require.NotEmpty(t, op.To)
moveCount++
case "trace_parser":
require.NotNil(t, op.TraceParser)
traceParserCount++
case "severity_parser":
require.NotEmpty(t, op.ParseFrom)
require.NotEmpty(t, op.If)
severityParserCount++
default:
t.Errorf("unexpected operator type: %s", op.Type)
}
}
require.Equal(t, expectedMoveOps, moveCount)
require.Equal(t, expectedTraceOps, traceParserCount)
require.Equal(t, expectedSeverityOps, severityParserCount)
}
func TestProcessJSONParser_WithoutMapping(t *testing.T) {
parent := &pipelinetypes.PipelineOperator{
Type: "json_parser",
ID: uuid.NewString(),
Name: "Parse JSON",
OrderId: 1,
Enabled: true,
ParseFrom: "body",
ParseTo: "attributes",
EnableFlattening: true,
EnablePaths: true,
PathPrefix: "parsed",
Mapping: nil, // No mapping
}
ops, err := processJSONParser(parent)
require.NoError(t, err)
require.Len(t, ops, 1) // Only the parent operator should exist
op := ops[0]
require.Equal(t, "json_parser", op.Type)
require.Equal(t, 1, op.MaxFlatteningDepth)
require.True(t, op.EnableFlattening)
require.True(t, op.EnablePaths)
require.Equal(t, "parsed", op.PathPrefix)
require.Contains(t, op.If, `isJSON(body)`)
}
func TestProcessJSONParser_Simple(t *testing.T) {
parent := &pipelinetypes.PipelineOperator{
Type: "json_parser",
ID: uuid.NewString(),
Name: "Parse JSON",
OrderId: 1,
Enabled: true,
ParseFrom: "body",
ParseTo: "attributes",
}
ops, err := processJSONParser(parent)
require.NoError(t, err)
require.Len(t, ops, 1) // Only the parent operator should exist
op := ops[0]
require.Equal(t, "json_parser", op.Type)
require.Equal(t, 0, op.MaxFlatteningDepth)
require.False(t, op.EnableFlattening)
require.False(t, op.EnablePaths)
require.Equal(t, "", op.PathPrefix)
require.Contains(t, op.If, `isJSON(body)`)
}
func TestProcessJSONParser_InvalidType(t *testing.T) {
parent := &pipelinetypes.PipelineOperator{
Type: "copy", // Invalid type
}
_, err := processJSONParser(parent)
require.Error(t, err)
require.Contains(t, err.Error(), "operator type received copy")
}


@@ -183,7 +183,7 @@ func TestNoCollectorErrorsFromSeverityParserForMismatchedLogs(t *testing.T) {
Enabled: true,
Name: "severity parser",
ParseFrom: "attributes.test_severity",
-				SeverityMapping: map[string][]string{
+				Mapping: map[string][]string{
"debug": {"debug"},
},
OverwriteSeverityText: true,
@@ -199,7 +199,7 @@ func TestNoCollectorErrorsFromSeverityParserForMismatchedLogs(t *testing.T) {
Enabled: true,
Name: "severity parser",
ParseFrom: "attributes.test_severity",
-				SeverityMapping: map[string][]string{
+				Mapping: map[string][]string{
"debug": {"debug"},
},
OverwriteSeverityText: true,


@@ -604,6 +604,7 @@ var StaticFieldsTraces = map[string]v3.AttributeKey{}
var IsDotMetricsEnabled = false
var PreferSpanMetrics = false
var MaxJSONFlatteningDepth = 1
func init() {
StaticFieldsTraces = maps.Clone(NewStaticFieldsTraces)
@@ -614,6 +615,12 @@ func init() {
if GetOrDefaultEnv("USE_SPAN_METRICS", "false") == "true" {
PreferSpanMetrics = true
}
// set max flattening depth
depth, err := strconv.Atoi(GetOrDefaultEnv(maxJSONFlatteningDepth, "1"))
if err == nil {
MaxJSONFlatteningDepth = depth
}
}
const TRACE_V4_MAX_PAGINATION_LIMIT = 10000
@@ -641,3 +648,4 @@ func GetDefaultSiteURL() string {
}
const DotMetricsEnabled = "DOT_METRICS_ENABLED"
const maxJSONFlatteningDepth = "MAX_JSON_FLATTENING_DEPTH"
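Note: a hedged usage sketch — the flattening depth can be overridden at startup by exporting MAX_JSON_FLATTENING_DEPTH (e.g. MAX_JSON_FLATTENING_DEPTH=3 in the query-service environment); values that fail strconv.Atoi are ignored and the default of 1 is kept.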


@@ -14,6 +14,32 @@ import (
"github.com/uptrace/bun"
)
type JSONMappingType = string
const (
Host JSONMappingType = "host"
Service JSONMappingType = "service"
Environment JSONMappingType = "environment"
Severity JSONMappingType = "severity"
TraceID JSONMappingType = "trace_id"
SpanID JSONMappingType = "span_id"
TraceFlags JSONMappingType = "trace_flags"
Message JSONMappingType = "message"
)
var DefaultSeverityMapping = map[string][]string{
"trace": {"TRACE", "Trace", "trace", "trc", "Trc"},
"debug": {"DEBUG", "Debug", "debug", "dbg", "Dbg"},
"info": {"INFO", "Info", "info"},
"warn": {"WARN", "Warn", "warn", "warning", "Warning", "wrn", "Wrn"},
"error": {"ERROR", "Error", "error", "err", "Err", "ERR", "fail", "Fail", "FAIL"},
"fatal": {"FATAL", "Fatal", "fatal", "critical", "Critical", "CRITICAL", "crit", "Crit", "CRIT",
"panic", "Panic", "PANIC"},
}
var validMappingLevels = []string{"trace", "debug", "info", "warn", "error", "fatal"}
var validMappingVariableTypes = []string{Host, Service, Environment, Severity, TraceID, SpanID, TraceFlags, Message}
type StoreablePipeline struct {
bun.BaseModel `bun:"table:pipelines,alias:p"`
@@ -91,9 +117,54 @@ type PipelineOperator struct {
Layout string `json:"layout,omitempty" yaml:"layout,omitempty"`
LayoutType string `json:"layout_type,omitempty" yaml:"layout_type,omitempty"`
// json_parser fields
EnableFlattening bool `json:"enable_flattening,omitempty" yaml:"enable_flattening,omitempty"`
MaxFlatteningDepth int `json:"-" yaml:"max_flattening_depth,omitempty"` // MaxFlatteningDepth is not configurable from User's side
EnablePaths bool `json:"enable_paths,omitempty" yaml:"enable_paths,omitempty"`
PathPrefix string `json:"path_prefix,omitempty" yaml:"path_prefix,omitempty"`
// Used in Severity Parsing and JSON Flattening mapping
Mapping map[string][]string `json:"mapping,omitempty" yaml:"mapping,omitempty"`
// severity parser fields
-	SeverityMapping       map[string][]string `json:"mapping,omitempty" yaml:"mapping,omitempty"`
	OverwriteSeverityText bool `json:"overwrite_text,omitempty" yaml:"overwrite_text,omitempty"`
}
func (op PipelineOperator) MarshalJSON() ([]byte, error) {
type Alias PipelineOperator
p := Alias(op)
if p.TraceParser != nil {
if p.TraceId != nil && len(p.TraceId.ParseFrom) < 1 {
p.TraceId = nil
}
if p.SpanId != nil && len(p.SpanId.ParseFrom) < 1 {
p.SpanId = nil
}
if p.TraceFlags != nil && len(p.TraceFlags.ParseFrom) < 1 {
p.TraceFlags = nil
}
}
return json.Marshal(p)
}
func (op PipelineOperator) MarshalYAML() (interface{}, error) {
type Alias PipelineOperator
alias := Alias(op)
if alias.TraceParser != nil {
if alias.TraceParser.TraceId != nil && len(alias.TraceParser.TraceId.ParseFrom) < 1 {
alias.TraceParser.TraceId = nil
}
if alias.TraceParser.SpanId != nil && len(alias.TraceParser.SpanId.ParseFrom) < 1 {
alias.TraceParser.SpanId = nil
}
if alias.TraceParser.TraceFlags != nil && len(alias.TraceParser.TraceFlags.ParseFrom) < 1 {
alias.TraceParser.TraceFlags = nil
}
}
return alias, nil
}
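// Illustrative note on the marshalers above: both drop trace parser
// sub-fields whose ParseFrom is empty, so a trace_parser that only sets
// trace_id serializes without empty span_id/trace_flags entries.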
type TimestampParser struct {
@@ -206,6 +277,12 @@ func isValidOperator(op PipelineOperator) error {
if op.ParseFrom == "" && op.ParseTo == "" {
return fmt.Errorf("parse from and parse to of %s json operator cannot be empty", op.ID)
}
for k := range op.Mapping {
if !slices.Contains(validMappingVariableTypes, strings.ToLower(k)) {
return fmt.Errorf("%s is not a valid mapping type in processor %s", k, op.ID)
}
}
case "grok_parser":
if op.Pattern == "" {
return fmt.Errorf("pattern of %s grok operator cannot be empty", op.ID)
@@ -306,8 +383,7 @@ func isValidOperator(op PipelineOperator) error {
return fmt.Errorf("parse from of severity parsing processor %s cannot be empty", op.ID)
}
-	validMappingLevels := []string{"trace", "debug", "info", "warn", "error", "fatal"}
-	for k := range op.SeverityMapping {
+	for k := range op.Mapping {
if !slices.Contains(validMappingLevels, strings.ToLower(k)) {
return fmt.Errorf("%s is not a valid severity in processor %s", k, op.ID)
}


@@ -332,7 +332,7 @@ var operatorTest = []struct {
ID: "severity",
Type: "severity_parser",
ParseFrom: "attributes.test_severity",
-			SeverityMapping: map[string][]string{
+			Mapping: map[string][]string{
"trace": {"test_trace"},
"fatal": {"test_fatal"},
},
@@ -344,7 +344,7 @@ var operatorTest = []struct {
Operator: PipelineOperator{
ID: "severity",
Type: "severity_parser",
-			SeverityMapping: map[string][]string{},
+			Mapping: map[string][]string{},
OverwriteSeverityText: true,
},
IsValid: false,
@@ -354,7 +354,7 @@ var operatorTest = []struct {
ID: "severity",
Type: "severity_parser",
ParseFrom: "attributes.test",
-			SeverityMapping: map[string][]string{
+			Mapping: map[string][]string{
"not-a-level": {"bad-level"},
},
OverwriteSeverityText: true,