Compare commits

...

32 Commits

Author SHA1 Message Date
nityanandagohain
ead1015ec1 Merge remote-tracking branch 'origin/develop' into fix/new_schema 2024-08-22 16:48:32 +05:30
nityanandagohain
6a9fe62fd8 fix: update table name 2024-08-22 13:55:41 +05:30
nityanandagohain
437fd24b42 Merge remote-tracking branch 'origin/develop' into fix/new_schema 2024-08-14 16:13:00 +05:30
nityanandagohain
e5b362bd92 fix: test for moving all way to new table 2024-08-07 15:51:26 +05:30
nityanandagohain
2e162a8794 Merge branch 'fix/remove_old_table' into fix/new_schema 2024-08-07 14:55:42 +05:30
nityanandagohain
5b0f024ff9 feat: add mapContains for attribute filters 2024-08-06 16:36:24 +05:30
nityanandagohain
28c33ca006 feat: use new table for all queries 2024-08-06 15:56:19 +05:30
vikrantgupta25
59d05e702b fix: lastlogtime should only be sent when orderby timestamp is desc 2024-07-31 18:54:57 +05:30
vikrantgupta25
1930c029f2 feat: send the end time stamp for the last log in pagination 2024-07-31 18:04:37 +05:30
nityanandagohain
23c4745512 fix: pagination conditions updated 2024-07-31 17:47:33 +05:30
nityanandagohain
d50047a6e7 fix: materialize number columns 2024-07-31 15:40:17 +05:30
nityanandagohain
379b49a0d0 feat: token search for all attributes 2024-07-30 21:39:22 +05:30
nityanandagohain
243fd76a1a fix: add changes to querier v2 2024-07-30 18:00:48 +05:30
nityanandagohain
3d8e9d918c feat: timrange modification for list queries 2024-07-30 17:34:48 +05:30
nityanandagohain
91948f3991 fix: minor fix with in operator 2024-07-29 18:09:50 +05:30
nityanandagohain
27190c10ee fix: fixes with resource attribute search 2024-07-29 17:26:37 +05:30
nityanandagohain
88daeb19bc fix: case insensitive search for resource attributes 2024-07-29 15:45:46 +05:30
nityanandagohain
ed61db52cc fix: dont add resource attribute filter in main table query 2024-07-27 17:55:18 +05:30
nityanandagohain
e4374d1115 fix: make attribute search case insensitive 2024-07-26 16:58:46 +05:30
nityanandagohain
ce3d9a850e feat: support for proper indexing 2024-07-26 16:47:19 +05:30
nityanandagohain
dc16457b47 feat: faster filtering for resource fingerprint 2024-07-25 18:36:35 +05:30
nityanandagohain
99e80e52ca feat: support for exists and nexists in resource fingerprint table 2024-07-25 17:54:02 +05:30
nityanandagohain
b9373a0e41 fix: use OR in group by keys for fingerprint filtering 2024-07-25 16:49:55 +05:30
nityanandagohain
e4c6f402b1 fix: order by changes added 2024-07-25 15:17:11 +05:30
nityanandagohain
ac9396bfdb fix: tests updated 2024-07-25 14:36:06 +05:30
nityanandagohain
cbd5922fe5 Merge remote-tracking branch 'origin/develop' into fix/new_schema 2024-07-25 14:32:09 +05:30
nityanandagohain
4cb5e06402 fix: update updateField function to handle new attribute type 2024-07-24 18:05:52 +05:30
nityanandagohain
890d4a53e5 fix: add support for maps and numbers 2024-07-24 16:38:22 +05:30
nityanandagohain
c8d5ddfbe0 fix: update distributed table name 2024-07-22 17:32:00 +05:30
nityanandagohain
afcb64281c feat: change with group by , aggregate attribute and fields api 2024-07-22 00:27:18 +05:30
nityanandagohain
cf43d6a3c6 feat: qs changes v1 changes for new schema 2024-07-19 17:43:35 +05:30
nityanandagohain
f863b5d8aa fix: temp commit for schema 2024-07-18 17:24:40 +05:30
17 changed files with 1052 additions and 262 deletions

View File

@@ -134,6 +134,9 @@ function LogsExplorerViews({
const [logs, setLogs] = useState<ILog[]>([]);
const [requestData, setRequestData] = useState<Query | null>(null);
const [showFormatMenuItems, setShowFormatMenuItems] = useState(false);
const [lastLogLineTimestamp, setLastLogLineTimestamp] = useState<
string | number
>();
const [queryId, setQueryId] = useState<string>(v4());
const [queryStats, setQueryStats] = useState<WsDataEvent>();
@@ -260,6 +263,12 @@ function LogsExplorerViews({
start: timeRange.start,
end: timeRange.end,
}),
lastLogLineTimestamp:
requestData?.builder?.queryData[0]?.orderBy?.[0]?.columnName ===
'timestamp' &&
requestData?.builder?.queryData[0]?.orderBy?.[0]?.order === 'desc'
? lastLogLineTimestamp
: undefined,
},
undefined,
listQueryKeyRef,
@@ -269,6 +278,10 @@ function LogsExplorerViews({
},
);
useEffect(() => {
setLastLogLineTimestamp(undefined);
}, [data]);
const getRequestData = useCallback(
(
query: Query | null,
@@ -337,6 +350,8 @@ function LogsExplorerViews({
pageSize: nextPageSize,
});
setLastLogLineTimestamp(lastLog?.timestamp);
setPage((prevPage) => prevPage + 1);
setRequestData(newRequestData);

View File

@@ -1,6 +1,7 @@
import getStartEndRangeTime from 'lib/getStartEndRangeTime';
import getStep from 'lib/getStep';
import { mapQueryDataToApi } from 'lib/newQueryBuilder/queryBuilderMappers/mapQueryDataToApi';
import { isUndefined } from 'lodash-es';
import store from 'store';
import { QueryRangePayload } from 'types/api/metrics/getQueryRange';
import { EQueryType } from 'types/common/dashboard';
@@ -24,7 +25,11 @@ export const prepareQueryRangePayload = ({
fillGaps = false,
}: GetQueryResultsProps): PrepareQueryRangePayload => {
let legendMap: Record<string, string> = {};
const { allowSelectedIntervalForStepGen, ...restParams } = params;
const {
allowSelectedIntervalForStepGen,
lastLogLineTimestamp,
...restParams
} = params;
const compositeQuery: QueryRangePayload['compositeQuery'] = {
queryType: query.queryType,
@@ -90,9 +95,13 @@ export const prepareQueryRangePayload = ({
interval: globalSelectedInterval,
});
const endLogTimeStamp = !isUndefined(lastLogLineTimestamp)
? new Date(lastLogLineTimestamp as any)?.getTime()
: undefined;
const queryPayload: QueryRangePayload = {
start: parseInt(start, 10) * 1e3,
end: parseInt(end, 10) * 1e3,
end: endLogTimeStamp || parseInt(end, 10) * 1e3,
step: getStep({
start: allowSelectedIntervalForStepGen
? start

View File

@@ -32,7 +32,9 @@ const (
defaultSpanAttributeKeysTable string = "distributed_span_attributes_keys"
defaultLogsDB string = "signoz_logs"
defaultLogsTable string = "distributed_logs"
defaultLogsTableV2 string = "distributed_logs_v2"
defaultLogsLocalTable string = "logs"
defaultLogsLocalTableV2 string = "logs_v2"
defaultLogAttributeKeysTable string = "distributed_logs_attribute_keys"
defaultLogResourceKeysTable string = "distributed_logs_resource_keys"
defaultLogTagAttributeTable string = "distributed_tag_attributes"
@@ -63,7 +65,9 @@ type namespaceConfig struct {
TopLevelOperationsTable string
LogsDB string
LogsTable string
LogsTableV2 string
LogsLocalTable string
logsLocalTableV2 string
LogsAttributeKeysTable string
LogsResourceKeysTable string
LogsTagAttributeTable string
@@ -150,7 +154,9 @@ func NewOptions(
TopLevelOperationsTable: defaultTopLevelOperationsTable,
LogsDB: defaultLogsDB,
LogsTable: defaultLogsTable,
LogsTableV2: defaultLogsTableV2,
LogsLocalTable: defaultLogsLocalTable,
logsLocalTableV2: defaultLogsLocalTableV2,
LogsAttributeKeysTable: defaultLogAttributeKeysTable,
LogsResourceKeysTable: defaultLogResourceKeysTable,
LogsTagAttributeTable: defaultLogTagAttributeTable,

View File

@@ -116,7 +116,9 @@ type ClickHouseReader struct {
topLevelOperationsTable string
logsDB string
logsTable string
logsTableV2 string
logsLocalTable string
logsLocalTableV2 string
logsAttributeKeys string
logsResourceKeys string
logsTagAttributeTable string
@@ -209,7 +211,9 @@ func NewReaderFromClickhouseConnection(
topLevelOperationsTable: options.primary.TopLevelOperationsTable,
logsDB: options.primary.LogsDB,
logsTable: options.primary.LogsTable,
logsTableV2: options.primary.LogsTableV2,
logsLocalTable: options.primary.LogsLocalTable,
logsLocalTableV2: options.primary.logsLocalTableV2,
logsAttributeKeys: options.primary.LogsAttributeKeysTable,
logsResourceKeys: options.primary.LogsResourceKeysTable,
logsTagAttributeTable: options.primary.LogsTagAttributeTable,
@@ -3512,7 +3516,7 @@ func (r *ClickHouseReader) GetLogFields(ctx context.Context) (*model.GetFieldsRe
resources = removeUnderscoreDuplicateFields(resources)
statements := []model.ShowCreateTableStatement{}
query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsLocalTable)
query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsLocalTableV2)
err = r.db.Select(ctx, &statements, query)
if err != nil {
return nil, &model.ApiError{Err: err, Typ: model.ErrorInternal}
@@ -3552,21 +3556,23 @@ func (r *ClickHouseReader) UpdateLogField(ctx context.Context, field *model.Upda
colname := utils.GetClickhouseColumnName(field.Type, field.DataType, field.Name)
dataType := strings.ToLower(field.DataType)
if dataType == "int64" || dataType == "float64" {
dataType = "number"
}
attrColName := fmt.Sprintf("%s_%s", field.Type, dataType)
// if a field is selected it means that the field needs to be indexed
if field.Selected {
keyColName := fmt.Sprintf("%s_%s_key", field.Type, strings.ToLower(field.DataType))
valueColName := fmt.Sprintf("%s_%s_value", field.Type, strings.ToLower(field.DataType))
// create materialized column
for _, table := range []string{r.logsLocalTable, r.logsTable} {
q := "ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS %s %s DEFAULT %s[indexOf(%s, '%s')] CODEC(ZSTD(1))"
for _, table := range []string{r.logsLocalTableV2, r.logsTableV2} {
q := "ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS %s %s DEFAULT %s['%s'] CODEC(ZSTD(1))"
query := fmt.Sprintf(q,
r.logsDB, table,
r.cluster,
colname, field.DataType,
valueColName,
keyColName,
attrColName,
field.Name,
)
err := r.db.Exec(ctx, query)
@@ -3574,11 +3580,11 @@ func (r *ClickHouseReader) UpdateLogField(ctx context.Context, field *model.Upda
return &model.ApiError{Err: err, Typ: model.ErrorInternal}
}
query = fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS %s_exists` bool DEFAULT if(indexOf(%s, '%s') != 0, true, false) CODEC(ZSTD(1))",
query = fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS %s_exists` bool DEFAULT if(mapContains(%s, '%s') != 0, true, false) CODEC(ZSTD(1))",
r.logsDB, table,
r.cluster,
strings.TrimSuffix(colname, "`"),
keyColName,
attrColName,
field.Name,
)
err = r.db.Exec(ctx, query)
@@ -3600,7 +3606,7 @@ func (r *ClickHouseReader) UpdateLogField(ctx context.Context, field *model.Upda
field.IndexGranularity = constants.DefaultLogSkipIndexGranularity
}
query := fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD INDEX IF NOT EXISTS %s_idx` (%s) TYPE %s GRANULARITY %d",
r.logsDB, r.logsLocalTable,
r.logsDB, r.logsLocalTableV2,
r.cluster,
strings.TrimSuffix(colname, "`"),
colname,
@@ -4203,7 +4209,7 @@ func (r *ClickHouseReader) GetLogAggregateAttributes(ctx context.Context, req *v
defer rows.Close()
statements := []model.ShowCreateTableStatement{}
query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsLocalTable)
query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsLocalTableV2)
err = r.db.Select(ctx, &statements, query)
if err != nil {
return nil, fmt.Errorf("error while fetching logs schema: %s", err.Error())
@@ -4257,7 +4263,7 @@ func (r *ClickHouseReader) GetLogAttributeKeys(ctx context.Context, req *v3.Filt
defer rows.Close()
statements := []model.ShowCreateTableStatement{}
query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsLocalTable)
query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsLocalTableV2)
err = r.db.Select(ctx, &statements, query)
if err != nil {
return nil, fmt.Errorf("error while fetching logs schema: %s", err.Error())
@@ -4309,7 +4315,7 @@ func (r *ClickHouseReader) GetLogAttributeValues(ctx context.Context, req *v3.Fi
}
// ignore autocomplete request for body
if req.FilterAttributeKey == "body" {
if req.FilterAttributeKey == "body" || req.FilterAttributeKey == "__attrs" {
return &v3.FilterAttributeValueResponse{}, nil
}
@@ -4344,10 +4350,10 @@ func (r *ClickHouseReader) GetLogAttributeValues(ctx context.Context, req *v3.Fi
// prepare the query and run
if len(req.SearchText) != 0 {
query = fmt.Sprintf("select distinct %s from %s.%s where timestamp >= toInt64(toUnixTimestamp(now() - INTERVAL 48 HOUR)*1000000000) and %s ILIKE $1 limit $2", selectKey, r.logsDB, r.logsTable, filterValueColumnWhere)
query = fmt.Sprintf("select distinct %s from %s.%s where timestamp >= toInt64(toUnixTimestamp(now() - INTERVAL 48 HOUR)*1000000000) and %s ILIKE $1 limit $2", selectKey, r.logsDB, r.logsLocalTableV2, filterValueColumnWhere)
rows, err = r.db.Query(ctx, query, searchText, req.Limit)
} else {
query = fmt.Sprintf("select distinct %s from %s.%s where timestamp >= toInt64(toUnixTimestamp(now() - INTERVAL 48 HOUR)*1000000000) limit $1", selectKey, r.logsDB, r.logsTable)
query = fmt.Sprintf("select distinct %s from %s.%s where timestamp >= toInt64(toUnixTimestamp(now() - INTERVAL 48 HOUR)*1000000000) limit $1", selectKey, r.logsDB, r.logsLocalTableV2)
rows, err = r.db.Query(ctx, query, req.Limit)
}
} else if len(req.SearchText) != 0 {

View File

@@ -254,7 +254,7 @@ func TestReplaceInterestingFields(t *testing.T) {
},
}
expectedTokens := []string{"attributes_int64_value[indexOf(attributes_int64_key, 'id.userid')] IN (100) ", "and `attribute_int64_id_key` >= 50 ", `AND body ILIKE '%searchstring%'`}
expectedTokens := []string{"attributes_int64_value[indexOf(attributes_int64_key, 'id.userid')] IN (100) ", "and `attribute_number_id_key` >= 50 ", `AND body ILIKE '%searchstring%'`}
Convey("testInterestingFields", t, func() {
tokens, err := replaceInterestingFields(&allFields, queryTokens)
So(err, ShouldBeNil)
@@ -376,7 +376,7 @@ var generateSQLQueryTestCases = []struct {
IdGt: "2BsKLKv8cZrLCn6rkOcRGkdjBdM",
IdLT: "2BsKG6tRpFWjYMcWsAGKfSxoQdU",
},
SqlFilter: "( timestamp >= '1657689292000' and timestamp <= '1657689294000' and id > '2BsKLKv8cZrLCn6rkOcRGkdjBdM' and id < '2BsKG6tRpFWjYMcWsAGKfSxoQdU' ) and ( `attribute_int64_field1` < 100 and `attribute_int64_field1` > 50 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] <= 500 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] >= 400 ) ",
SqlFilter: "( timestamp >= '1657689292000' and timestamp <= '1657689294000' and id > '2BsKLKv8cZrLCn6rkOcRGkdjBdM' and id < '2BsKG6tRpFWjYMcWsAGKfSxoQdU' ) and ( `attribute_number_field1` < 100 and `attribute_number_field1` > 50 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] <= 500 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] >= 400 ) ",
},
{
Name: "second query with only timestamp range",
@@ -385,7 +385,7 @@ var generateSQLQueryTestCases = []struct {
TimestampStart: uint64(1657689292000),
TimestampEnd: uint64(1657689294000),
},
SqlFilter: "( timestamp >= '1657689292000' and timestamp <= '1657689294000' ) and ( `attribute_int64_field1` < 100 and `attribute_int64_field1` > 50 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] <= 500 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] >= 400 ) ",
SqlFilter: "( timestamp >= '1657689292000' and timestamp <= '1657689294000' ) and ( `attribute_number_field1` < 100 and `attribute_number_field1` > 50 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] <= 500 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] >= 400 ) ",
},
{
Name: "generate case sensitive query",
@@ -394,7 +394,7 @@ var generateSQLQueryTestCases = []struct {
TimestampStart: uint64(1657689292000),
TimestampEnd: uint64(1657689294000),
},
SqlFilter: "( timestamp >= '1657689292000' and timestamp <= '1657689294000' ) and ( `attribute_int64_field1` < 100 and attributes_int64_value[indexOf(attributes_int64_key, 'FielD1')] > 50 and `attribute_double64_Field2` > 10 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] <= 500 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] >= 400 ) ",
SqlFilter: "( timestamp >= '1657689292000' and timestamp <= '1657689294000' ) and ( `attribute_number_field1` < 100 and attributes_int64_value[indexOf(attributes_int64_key, 'FielD1')] > 50 and `attribute_double64_Field2` > 10 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] <= 500 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] >= 400 ) ",
},
{
Name: "Check exists and not exists",

View File

@@ -45,10 +45,10 @@ var jsonLogOperators = map[v3.FilterOperator]string{
v3.FilterOperatorLessThanOrEq: "<=",
v3.FilterOperatorGreaterThan: ">",
v3.FilterOperatorGreaterThanOrEq: ">=",
v3.FilterOperatorLike: "ILIKE",
v3.FilterOperatorNotLike: "NOT ILIKE",
v3.FilterOperatorContains: "ILIKE",
v3.FilterOperatorNotContains: "NOT ILIKE",
v3.FilterOperatorLike: "LIKE",
v3.FilterOperatorNotLike: "NOT LIKE",
v3.FilterOperatorContains: "LIKE",
v3.FilterOperatorNotContains: "NOT LIKE",
v3.FilterOperatorRegex: "match(%s, %s)",
v3.FilterOperatorNotRegex: "NOT match(%s, %s)",
v3.FilterOperatorIn: "IN",

View File

@@ -298,7 +298,7 @@ var testGetJSONFilterData = []struct {
Operator: "contains",
Value: "a",
},
Filter: "lower(body) like lower('%message%') AND JSON_EXISTS(body, '$.\"message\"') AND JSON_VALUE(body, '$.\"message\"') ILIKE '%a%'",
Filter: "lower(body) like lower('%message%') AND JSON_EXISTS(body, '$.\"message\"') AND JSON_VALUE(body, '$.\"message\"') LIKE '%a%'",
},
{
Name: "contains operator with quotes",
@@ -311,7 +311,7 @@ var testGetJSONFilterData = []struct {
Operator: "contains",
Value: "hello 'world'",
},
Filter: "lower(body) like lower('%message%') AND lower(body) like lower('%hello \\'world\\'%') AND JSON_EXISTS(body, '$.\"message\"') AND JSON_VALUE(body, '$.\"message\"') ILIKE '%hello \\'world\\'%'",
Filter: "lower(body) like lower('%message%') AND lower(body) like lower('%hello \\'world\\'%') AND JSON_EXISTS(body, '$.\"message\"') AND JSON_VALUE(body, '$.\"message\"') LIKE '%hello \\'world\\'%'",
},
{
Name: "exists",

View File

@@ -39,19 +39,23 @@ var logOperators = map[v3.FilterOperator]string{
v3.FilterOperatorLessThanOrEq: "<=",
v3.FilterOperatorGreaterThan: ">",
v3.FilterOperatorGreaterThanOrEq: ">=",
v3.FilterOperatorLike: "ILIKE",
v3.FilterOperatorNotLike: "NOT ILIKE",
v3.FilterOperatorContains: "ILIKE",
v3.FilterOperatorNotContains: "NOT ILIKE",
v3.FilterOperatorLike: "LIKE",
v3.FilterOperatorNotLike: "NOT LIKE",
v3.FilterOperatorContains: "LIKE",
v3.FilterOperatorNotContains: "NOT LIKE",
v3.FilterOperatorRegex: "match(%s, %s)",
v3.FilterOperatorNotRegex: "NOT match(%s, %s)",
v3.FilterOperatorIn: "IN",
v3.FilterOperatorNotIn: "NOT IN",
v3.FilterOperatorExists: "has(%s_%s_key, '%s')",
v3.FilterOperatorNotExists: "not has(%s_%s_key, '%s')",
v3.FilterOperatorExists: "mapContains(%s_%s, '%s')",
v3.FilterOperatorNotExists: "not mapContains(%s_%s, '%s')",
}
const BODY = "body"
const (
BODY = "body"
DISTRIBUTED_LOGS_V2 = "distributed_logs_v2"
DISTRIBUTED_LOGS_V2_RESOURCE = "distributed_logs_v2_resource"
)
func getClickhouseLogsColumnType(columnType v3.AttributeKeyType) string {
if columnType == v3.AttributeKeyTypeTag {
@@ -61,11 +65,8 @@ func getClickhouseLogsColumnType(columnType v3.AttributeKeyType) string {
}
func getClickhouseLogsColumnDataType(columnDataType v3.AttributeKeyDataType) string {
if columnDataType == v3.AttributeKeyDataTypeFloat64 {
return "float64"
}
if columnDataType == v3.AttributeKeyDataTypeInt64 {
return "int64"
if columnDataType == v3.AttributeKeyDataTypeFloat64 || columnDataType == v3.AttributeKeyDataTypeInt64 {
return "number"
}
if columnDataType == v3.AttributeKeyDataTypeBool {
return "bool"
@@ -85,7 +86,7 @@ func getClickhouseColumnName(key v3.AttributeKey) string {
if !key.IsColumn {
columnType := getClickhouseLogsColumnType(key.Type)
columnDataType := getClickhouseLogsColumnDataType(key.DataType)
clickhouseColumn = fmt.Sprintf("%s_%s_value[indexOf(%s_%s_key, '%s')]", columnType, columnDataType, columnType, columnDataType, key.Key)
clickhouseColumn = fmt.Sprintf("%s_%s['%s']", columnType, columnDataType, key.Key)
return clickhouseColumn
}
@@ -164,6 +165,12 @@ func buildLogsTimeSeriesFilterQuery(fs *v3.FilterSet, groupBy []v3.AttributeKey,
if fs != nil && len(fs.Items) != 0 {
for _, item := range fs.Items {
// skip if it's a resource attribute
if item.Key.Type == v3.AttributeKeyTypeResource {
continue
}
if item.Key.IsJSON {
filter, err := GetJSONFilter(item)
if err != nil {
@@ -173,6 +180,16 @@ func buildLogsTimeSeriesFilterQuery(fs *v3.FilterSet, groupBy []v3.AttributeKey,
continue
}
// check if the user is searching for all attributes
if item.Key.Key == "__attrs" {
if (item.Operator != v3.FilterOperatorEqual && item.Operator != v3.FilterOperatorContains) || item.Key.DataType != v3.AttributeKeyDataTypeString {
return "", fmt.Errorf("only = operator and string data type is supported for __attrs")
}
val := utils.QuoteEscapedString(fmt.Sprintf("%v", item.Value))
conditions = append(conditions, fmt.Sprintf("has(mapValues(attributes_string), '%v')", val))
continue
}
op := v3.FilterOperator(strings.ToLower(strings.TrimSpace(string(item.Operator))))
var value interface{}
@@ -196,7 +213,6 @@ func buildLogsTimeSeriesFilterQuery(fs *v3.FilterSet, groupBy []v3.AttributeKey,
columnName := getClickhouseColumnName(item.Key)
val := utils.QuoteEscapedString(fmt.Sprintf("%v", item.Value))
if columnName == BODY {
logsOp = strings.Replace(logsOp, "ILIKE", "LIKE", 1) // removing i from ilike and not ilike
conditions = append(conditions, fmt.Sprintf("lower(%s) %s lower('%%%s%%')", columnName, logsOp, val))
} else {
conditions = append(conditions, fmt.Sprintf("%s %s '%%%s%%'", columnName, logsOp, val))
@@ -208,7 +224,6 @@ func buildLogsTimeSeriesFilterQuery(fs *v3.FilterSet, groupBy []v3.AttributeKey,
// for use lower for like and ilike
if op == v3.FilterOperatorLike || op == v3.FilterOperatorNotLike {
if columnName == BODY {
logsOp = strings.Replace(logsOp, "ILIKE", "LIKE", 1) // removing i from ilike and not ilike
columnName = fmt.Sprintf("lower(%s)", columnName)
fmtVal = fmt.Sprintf("lower(%s)", fmtVal)
}
@@ -218,15 +233,27 @@ func buildLogsTimeSeriesFilterQuery(fs *v3.FilterSet, groupBy []v3.AttributeKey,
} else {
return "", fmt.Errorf("unsupported operator: %s", op)
}
// add extra condition for map contains
// by default clickhouse is not able to utilize indexes for keys with all operators.
// mapContains forces the use of index.
if item.Key.IsColumn == false && op != v3.FilterOperatorExists && op != v3.FilterOperatorNotExists {
conditions = append(conditions, GetExistsNexistsFilter(v3.FilterOperatorExists, item))
}
}
}
// add group by conditions to filter out log lines which doesn't have the key
for _, attr := range groupBy {
// skip if it's a resource attribute
if attr.Type == v3.AttributeKeyTypeResource {
continue
}
if !attr.IsColumn {
columnType := getClickhouseLogsColumnType(attr.Type)
columnDataType := getClickhouseLogsColumnDataType(attr.DataType)
conditions = append(conditions, fmt.Sprintf("has(%s_%s_key, '%s')", columnType, columnDataType, attr.Key))
conditions = append(conditions, fmt.Sprintf("mapContains(%s_%s, '%s')", columnType, columnDataType, attr.Key))
} else if attr.Type != v3.AttributeKeyTypeUnspecified {
// for materialzied columns
conditions = append(conditions, fmt.Sprintf("%s_exists`=true", strings.TrimSuffix(getClickhouseColumnName(attr), "`")))
@@ -234,7 +261,7 @@ func buildLogsTimeSeriesFilterQuery(fs *v3.FilterSet, groupBy []v3.AttributeKey,
}
// add conditions for aggregate attribute
if aggregateAttribute.Key != "" {
if aggregateAttribute.Key != "" && aggregateAttribute.Type != v3.AttributeKeyTypeResource {
existsFilter := GetExistsNexistsFilter(v3.FilterOperatorExists, v3.FilterItem{Key: aggregateAttribute})
conditions = append(conditions, existsFilter)
}
@@ -243,6 +270,148 @@ func buildLogsTimeSeriesFilterQuery(fs *v3.FilterSet, groupBy []v3.AttributeKey,
return queryString, nil
}
func buildResourceBucketFilters(fs *v3.FilterSet, groupBy []v3.AttributeKey, orderBy []v3.OrderBy, aggregateAttribute v3.AttributeKey) (string, error) {
var andConditions []string
var orConditions []string
// only add the resource attributes to the filters here
if fs != nil && len(fs.Items) != 0 {
for _, item := range fs.Items {
// skip anything other than resource attribute
if item.Key.Type != v3.AttributeKeyTypeResource {
continue
}
op := v3.FilterOperator(strings.ToLower(strings.TrimSpace(string(item.Operator))))
var value interface{}
var err error
if op != v3.FilterOperatorExists && op != v3.FilterOperatorNotExists {
value, err = utils.ValidateAndCastValue(item.Value, item.Key.DataType)
if err != nil {
return "", fmt.Errorf("failed to validate and cast value for %s: %v", item.Key.Key, err)
}
}
columnName := fmt.Sprintf("simpleJSONExtractString(lower(labels), '%s')", item.Key.Key)
if logsOp, ok := logOperators[op]; ok {
switch op {
case v3.FilterOperatorExists:
andConditions = append(andConditions, fmt.Sprintf("simpleJSONHas(labels, '%s')", item.Key.Key))
case v3.FilterOperatorNotExists:
andConditions = append(andConditions, fmt.Sprintf("not simpleJSONHas(labels, '%s')", item.Key.Key))
case v3.FilterOperatorRegex, v3.FilterOperatorNotRegex:
fmtVal := utils.ClickHouseFormattedValue(value)
// for regex don't lowercase it as it will result in something else
andConditions = append(andConditions, fmt.Sprintf(logsOp, columnName, fmtVal))
case v3.FilterOperatorContains, v3.FilterOperatorNotContains:
val := utils.QuoteEscapedString(fmt.Sprintf("%v", item.Value))
andConditions = append(andConditions, fmt.Sprintf("%s %s '%%%s%%'", columnName, logsOp, strings.ToLower(val)))
default:
fmtVal := utils.ClickHouseFormattedValue(value)
andConditions = append(andConditions, fmt.Sprintf("%s %s %s", columnName, logsOp, strings.ToLower(fmtVal)))
}
// add index filters
switch op {
case v3.FilterOperatorContains, v3.FilterOperatorEqual, v3.FilterOperatorLike:
val := utils.QuoteEscapedString(fmt.Sprintf("%v", item.Value))
andConditions = append(andConditions, fmt.Sprintf("lower(labels) like '%%%s%%%s%%'", strings.ToLower(item.Key.Key), strings.ToLower(val)))
case v3.FilterOperatorNotContains, v3.FilterOperatorNotEqual, v3.FilterOperatorNotLike:
val := utils.QuoteEscapedString(fmt.Sprintf("%v", item.Value))
andConditions = append(andConditions, fmt.Sprintf("lower(labels) not like '%%%s%%%s%%'", strings.ToLower(item.Key.Key), strings.ToLower(val)))
case v3.FilterOperatorNotRegex:
andConditions = append(andConditions, fmt.Sprintf("lower(labels) not like '%%%s%%'", strings.ToLower(item.Key.Key)))
case v3.FilterOperatorIn, v3.FilterOperatorNotIn:
// for in operator value needs to used for indexing in a different way.
// eg1:= x in a,b,c = (labels like '%x%a%' or labels like '%"x":"b"%' or labels like '%"x"="c"%')
// eg1:= x nin a,b,c = (labels nlike '%x%a%' AND labels nlike '%"x"="b"' AND labels nlike '%"x"="c"%')
tCondition := []string{}
separator := " OR "
sqlOp := "like"
if op == v3.FilterOperatorNotIn {
separator = " AND "
sqlOp = "not like"
}
values := []string{}
switch item.Value.(type) {
case string:
values = append(values, item.Value.(string))
case []interface{}:
for _, v := range (item.Value).([]interface{}) {
// also resources attributes are always string values
strV, ok := v.(string)
if !ok {
continue
}
values = append(values, strV)
}
}
if len(values) > 0 {
for _, v := range values {
tCondition = append(tCondition, fmt.Sprintf("lower(labels) %s '%%\"%s\":\"%s\"%%'", sqlOp, strings.ToLower(item.Key.Key), strings.ToLower(v)))
}
andConditions = append(andConditions, "("+strings.Join(tCondition, separator)+")")
}
default:
andConditions = append(andConditions, fmt.Sprintf("lower(labels) like '%%%s%%'", strings.ToLower(item.Key.Key)))
}
} else {
return "", fmt.Errorf("unsupported operator: %s", op)
}
}
}
// for group by add exists check in resources
if aggregateAttribute.Key != "" && aggregateAttribute.Type == v3.AttributeKeyTypeResource {
andConditions = append(andConditions, fmt.Sprintf("(simpleJSONHas(labels, '%s') AND lower(labels) like '%%%s%%')", aggregateAttribute.Key, strings.ToLower(aggregateAttribute.Key)))
}
// for aggregate attribute add exists check in resources
// we are using OR for groupBy and orderby as we just want to filter out the logs using fingerprint
for _, attr := range groupBy {
if attr.Type != v3.AttributeKeyTypeResource {
continue
}
orConditions = append(orConditions, fmt.Sprintf("(simpleJSONHas(labels, '%s') AND lower(labels) like '%%%s%%')", attr.Key, strings.ToLower(attr.Key)))
}
// for orderBy it's not required as the keys will be already present in the group by
// so no point in adding them
if len(orConditions) > 0 {
// TODO: change OR to AND once we know how to solve for group by
orConditionStr := "( " + strings.Join(orConditions, " AND ") + " )"
andConditions = append(andConditions, orConditionStr)
}
if len(andConditions) == 0 {
return "", nil
}
conditionStr := strings.Join(andConditions, " AND ")
return conditionStr, nil
}
func filtersUseNewTable(filters *v3.FilterSet) bool {
if filters == nil {
return false
}
for _, item := range filters.Items {
if item.Key.Key != "id" {
return true
}
}
return false
}
func buildLogsQuery(panelType v3.PanelType, start, end, step int64, mq *v3.BuilderQuery, graphLimitQtype string, preferRPM bool) (string, error) {
filterSubQuery, err := buildLogsTimeSeriesFilterQuery(mq.Filters, mq.GroupBy, mq.AggregateAttribute)
@@ -254,7 +423,31 @@ func buildLogsQuery(panelType v3.PanelType, start, end, step int64, mq *v3.Build
}
// timerange will be sent in epoch millisecond
timeFilter := fmt.Sprintf("(timestamp >= %d AND timestamp <= %d)", utils.GetEpochNanoSecs(start), utils.GetEpochNanoSecs(end))
logsStart := utils.GetEpochNanoSecs(start)
logsEnd := utils.GetEpochNanoSecs(end)
// -1800 this is added so that the bucket start considers all the fingerprints.
bucketStart := logsStart/1000000000 - 1800
bucketEnd := logsEnd / 1000000000
timeFilter := fmt.Sprintf("(timestamp >= %d AND timestamp <= %d)", logsStart, logsEnd)
resourceBucketFilters, err := buildResourceBucketFilters(mq.Filters, mq.GroupBy, mq.OrderBy, mq.AggregateAttribute)
if err != nil {
return "", err
}
tableName := DISTRIBUTED_LOGS_V2
timeFilter = timeFilter + fmt.Sprintf(" AND (ts_bucket_start >= %d AND ts_bucket_start <= %d)", bucketStart, bucketEnd)
if len(resourceBucketFilters) > 0 {
filter := " AND (resource_fingerprint GLOBAL IN (" +
"SELECT fingerprint FROM signoz_logs.%s " +
"WHERE (seen_at_ts_bucket_start >= %d) AND (seen_at_ts_bucket_start <= %d) AND "
filter = fmt.Sprintf(filter, DISTRIBUTED_LOGS_V2_RESOURCE, bucketStart, bucketEnd)
filterSubQuery = filterSubQuery + filter + resourceBucketFilters + "))"
}
selectLabels := getSelectLabels(mq.AggregateOperator, mq.GroupBy)
@@ -280,8 +473,8 @@ func buildLogsQuery(panelType v3.PanelType, start, end, step int64, mq *v3.Build
queryTmpl =
queryTmpl + selectLabels +
" %s as value " +
"from signoz_logs.distributed_logs " +
"where " + timeFilter + "%s" +
"from signoz_logs." + tableName +
" where " + timeFilter + "%s" +
"%s%s" +
"%s"
@@ -358,8 +551,12 @@ func buildLogsQuery(panelType v3.PanelType, start, end, step int64, mq *v3.Build
query := fmt.Sprintf(queryTmpl, op, filterSubQuery, groupBy, having, orderBy)
return query, nil
case v3.AggregateOperatorNoOp:
queryTmpl := constants.LogsSQLSelect + "from signoz_logs.distributed_logs where %s%s order by %s"
query := fmt.Sprintf(queryTmpl, timeFilter, filterSubQuery, orderBy)
// sqlSelect := constants.LogsSQLSelect
// with noop any filter or different order by other than ts will use new table
sqlSelect := constants.LogsSQLSelectV2
tableName = DISTRIBUTED_LOGS_V2
queryTmpl := sqlSelect + "from signoz_logs.%s where %s%s order by %s"
query := fmt.Sprintf(queryTmpl, tableName, timeFilter, filterSubQuery, orderBy)
return query, nil
default:
return "", fmt.Errorf("unsupported aggregate operator")
@@ -374,7 +571,7 @@ func buildLogsLiveTailQuery(mq *v3.BuilderQuery) (string, error) {
switch mq.AggregateOperator {
case v3.AggregateOperatorNoOp:
query := constants.LogsSQLSelect + "from signoz_logs.distributed_logs where "
query := constants.LogsSQLSelectV2 + "from signoz_logs." + DISTRIBUTED_LOGS_V2 + " where "
if len(filterSubQuery) > 0 {
query = query + filterSubQuery + " AND "
}

File diff suppressed because it is too large Load Diff

View File

@@ -15,6 +15,7 @@ import (
"go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
chErrors "go.signoz.io/signoz/pkg/query-service/errors"
"go.signoz.io/signoz/pkg/query-service/utils"
"go.signoz.io/signoz/pkg/query-service/cache"
"go.signoz.io/signoz/pkg/query-service/interfaces"
@@ -457,7 +458,118 @@ func (q *querier) runClickHouseQueries(ctx context.Context, params *v3.QueryRang
return results, errQueriesByName, err
}
type startEndArr struct {
Start int64
End int64
}
func getTsStartEndArray(start, end int64) []startEndArr {
startNano := utils.GetEpochNanoSecs(start)
endNano := utils.GetEpochNanoSecs(end)
result := []startEndArr{}
hourNano := int64(3600000000000)
if endNano-startNano > hourNano {
bucket := hourNano
tStartNano := endNano - bucket
complete := false
for {
result = append(result, startEndArr{Start: tStartNano, End: endNano})
if complete {
break
}
bucket = bucket * 2
endNano = tStartNano
tStartNano = tStartNano - bucket
// break condition
if tStartNano <= startNano {
complete = true
tStartNano = startNano
}
}
}
return result
}
func (q *querier) runLogsListQuery(ctx context.Context, params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey, startEndArr []startEndArr) ([]*v3.Result, map[string]error, error) {
// check the timerange
res := make([]*v3.Result, 0)
qName := ""
pageSize := uint64(0)
// it will run only once
for name, v := range params.CompositeQuery.BuilderQueries {
qName = name
pageSize = v.PageSize
}
data := []*v3.Row{}
for _, v := range startEndArr {
params.Start = v.Start
params.End = v.End
params.CompositeQuery.BuilderQueries[qName].PageSize = pageSize - uint64(len(data))
queries, err := q.builder.PrepareQueries(params, keys)
if err != nil {
return nil, nil, err
}
// only one query
for name, query := range queries {
rowList, err := q.reader.GetListResultV3(ctx, query)
if err != nil {
errs := []error{err}
errQuriesByName := map[string]error{
name: err,
}
return nil, errQuriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...))
}
data = append(data, rowList...)
}
// append a filter to the params
if len(data) > 0 {
params.CompositeQuery.BuilderQueries[qName].Filters.Items = append(params.CompositeQuery.BuilderQueries[qName].Filters.Items, v3.FilterItem{
Key: v3.AttributeKey{
Key: "id",
IsColumn: true,
DataType: "string",
},
Operator: v3.FilterOperatorLessThan,
Value: data[len(data)-1].Data["id"],
})
}
if uint64(len(data)) >= pageSize {
fmt.Println(len(data), pageSize)
break
}
}
res = append(res, &v3.Result{
QueryName: qName,
List: data,
})
return res, nil, nil
}
func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey) ([]*v3.Result, map[string]error, error) {
// Note: as of now only supporting one list query
if params.CompositeQuery != nil {
if len(params.CompositeQuery.BuilderQueries) == 1 {
for _, v := range params.CompositeQuery.BuilderQueries {
// only allow of logs queries with timestamp ordering desc
if v.DataSource == v3.DataSourceLogs && len(v.OrderBy) == 1 && v.OrderBy[0].ColumnName == "timestamp" && v.OrderBy[0].Order == "desc" {
startEndArr := getTsStartEndArray(params.Start, params.End)
if len(startEndArr) > 0 {
return q.runLogsListQuery(ctx, params, keys, startEndArr)
}
}
}
}
}
queries, err := q.builder.PrepareQueries(params, keys)

View File

@@ -1050,3 +1050,49 @@ func TestQueryRangeValueTypePromQL(t *testing.T) {
}
}
}
func TestTsStartEnd(t *testing.T) {
startEndData := []struct {
name string
start int64
end int64
res []startEndArr
}{
{
name: "testing for less then one hour",
start: 1722262800000000000, // July 29, 2024 7:50:00 PM
end: 1722263800000000000, // July 29, 2024 8:06:40 PM
res: []startEndArr{},
},
{
name: "testing for more than one hour",
start: 1722255800000000000, // July 29, 2024 5:53:20 PM
end: 1722262800000000000, // July 29, 2024 8:06:40 PM
res: []startEndArr{
{1722259200000000000, 1722262800000000000}, // July 29, 2024 6:50:00 PM - July 29, 2024 7:50:00 PM
{1722255800000000000, 1722259200000000000}, // July 29, 2024 5:53:20 PM - July 29, 2024 6:50:00 PM
},
},
{
name: "testing for 1 day",
start: 1722171576000000000,
end: 1722262800000000000,
res: []startEndArr{
{1722259200000000000, 1722262800000000000}, // July 29, 2024 6:50:00 PM - July 29, 2024 7:50:00 PM
{1722252000000000000, 1722259200000000000}, // July 29, 2024 4:50:00 PM - July 29, 2024 6:50:00 PM
{1722237600000000000, 1722252000000000000}, // July 29, 2024 12:50:00 PM - July 29, 2024 4:50:00 PM
{1722208800000000000, 1722237600000000000}, // July 29, 2024 4:50:00 AM - July 29, 2024 12:50:00 PM
{1722171576000000000, 1722208800000000000}, // July 28, 2024 6:29:36 PM - July 29, 2024 4:50:00 AM
},
},
}
for _, test := range startEndData {
res := getTsStartEndArray(test.start, test.end)
for i, v := range res {
if test.res[i].Start != v.Start || test.res[i].End != v.End {
t.Errorf("expected range was %v - %v, got %v - %v", v.Start, v.End, test.res[i].Start, test.res[i].End)
}
}
}
}

View File

@@ -15,6 +15,7 @@ import (
"go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
chErrors "go.signoz.io/signoz/pkg/query-service/errors"
"go.signoz.io/signoz/pkg/query-service/utils"
"go.signoz.io/signoz/pkg/query-service/cache"
"go.signoz.io/signoz/pkg/query-service/interfaces"
@@ -465,14 +466,127 @@ func (q *querier) runClickHouseQueries(ctx context.Context, params *v3.QueryRang
return results, errQueriesByName, err
}
type startEndArr struct {
Start int64
End int64
}
func getTsStartEndArray(start, end int64) []startEndArr {
startNano := utils.GetEpochNanoSecs(start)
endNano := utils.GetEpochNanoSecs(end)
result := []startEndArr{}
hourNano := int64(3600000000000)
if endNano-startNano > hourNano {
bucket := hourNano
tStartNano := endNano - bucket
complete := false
for {
result = append(result, startEndArr{Start: tStartNano, End: endNano})
if complete {
break
}
bucket = bucket * 2
endNano = tStartNano
tStartNano = tStartNano - bucket
// break condition
if tStartNano <= startNano {
complete = true
tStartNano = startNano
}
}
}
return result
}
func (q *querier) runLogsListQuery(ctx context.Context, params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey, startEndArr []startEndArr) ([]*v3.Result, map[string]error, error) {
// check the timerange
res := make([]*v3.Result, 0)
qName := ""
pageSize := uint64(0)
// it will run only once
for name, v := range params.CompositeQuery.BuilderQueries {
qName = name
pageSize = v.PageSize
}
data := []*v3.Row{}
for _, v := range startEndArr {
params.Start = v.Start
params.End = v.End
params.CompositeQuery.BuilderQueries[qName].PageSize = pageSize - uint64(len(data))
queries, err := q.builder.PrepareQueries(params, keys)
if err != nil {
return nil, nil, err
}
// only one query
for name, query := range queries {
rowList, err := q.reader.GetListResultV3(ctx, query)
if err != nil {
errs := []error{err}
errQuriesByName := map[string]error{
name: err,
}
return nil, errQuriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...))
}
data = append(data, rowList...)
}
// append a filter to the params
if len(data) > 0 {
params.CompositeQuery.BuilderQueries[qName].Filters.Items = append(params.CompositeQuery.BuilderQueries[qName].Filters.Items, v3.FilterItem{
Key: v3.AttributeKey{
Key: "id",
IsColumn: true,
DataType: "string",
},
Operator: v3.FilterOperatorLessThan,
Value: data[len(data)-1].Data["id"],
})
}
if uint64(len(data)) >= pageSize {
fmt.Println(len(data), pageSize)
break
}
}
res = append(res, &v3.Result{
QueryName: qName,
List: data,
})
return res, nil, nil
}
func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey) ([]*v3.Result, map[string]error, error) {
// Note: as of now only supporting one list query
if params.CompositeQuery != nil {
if len(params.CompositeQuery.BuilderQueries) == 1 {
for _, v := range params.CompositeQuery.BuilderQueries {
// only allow of logs queries with timestamp ordering desc
if v.DataSource == v3.DataSourceLogs && len(v.OrderBy) == 1 && v.OrderBy[0].ColumnName == "timestamp" && v.OrderBy[0].Order == "desc" {
startEndArr := getTsStartEndArray(params.Start, params.End)
if len(startEndArr) > 0 {
return q.runLogsListQuery(ctx, params, keys, startEndArr)
}
}
}
}
}
queries, err := q.builder.PrepareQueries(params, keys)
if err != nil {
return nil, nil, err
}
// change for list is need here for faster listing for logs
ch := make(chan channelResult, len(queries))
var wg sync.WaitGroup

View File

@@ -1102,3 +1102,49 @@ func TestV2QueryRangeValueTypePromQL(t *testing.T) {
}
}
}
func TestTsStartEnd(t *testing.T) {
startEndData := []struct {
name string
start int64
end int64
res []startEndArr
}{
{
name: "testing for less then one hour",
start: 1722262800000000000, // July 29, 2024 7:50:00 PM
end: 1722263800000000000, // July 29, 2024 8:06:40 PM
res: []startEndArr{},
},
{
name: "testing for more than one hour",
start: 1722255800000000000, // July 29, 2024 5:53:20 PM
end: 1722262800000000000, // July 29, 2024 8:06:40 PM
res: []startEndArr{
{1722259200000000000, 1722262800000000000}, // July 29, 2024 6:50:00 PM - July 29, 2024 7:50:00 PM
{1722255800000000000, 1722259200000000000}, // July 29, 2024 5:53:20 PM - July 29, 2024 6:50:00 PM
},
},
{
name: "testing for 1 day",
start: 1722171576000000000,
end: 1722262800000000000,
res: []startEndArr{
{1722259200000000000, 1722262800000000000}, // July 29, 2024 6:50:00 PM - July 29, 2024 7:50:00 PM
{1722252000000000000, 1722259200000000000}, // July 29, 2024 4:50:00 PM - July 29, 2024 6:50:00 PM
{1722237600000000000, 1722252000000000000}, // July 29, 2024 12:50:00 PM - July 29, 2024 4:50:00 PM
{1722208800000000000, 1722237600000000000}, // July 29, 2024 4:50:00 AM - July 29, 2024 12:50:00 PM
{1722171576000000000, 1722208800000000000}, // July 28, 2024 6:29:36 PM - July 29, 2024 4:50:00 AM
},
},
}
for _, test := range startEndData {
res := getTsStartEndArray(test.start, test.end)
for i, v := range res {
if test.res[i].Start != v.Start || test.res[i].End != v.End {
t.Errorf("expected range was %v - %v, got %v - %v", v.Start, v.End, test.res[i].Start, test.res[i].End)
}
}
}
}

View File

@@ -437,11 +437,12 @@ var testLogsWithFormula = []struct {
},
},
ExpectedQuery: "SELECT A.`key_1` as `key_1`, A.`ts` as `ts`, A.value + B.value as value FROM " +
"(SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts, attributes_bool_value[indexOf(attributes_bool_key, 'key_1')] as `key_1`, toFloat64(count(*)) as value from " +
"signoz_logs.distributed_logs where (timestamp >= 1702979275000000000 AND timestamp <= 1702981075000000000) AND attributes_bool_value[indexOf(attributes_bool_key, 'key_1')] = true AND " +
"has(attributes_bool_key, 'key_1') group by `key_1`,ts order by value DESC) as A INNER JOIN (SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts, " +
"attributes_bool_value[indexOf(attributes_bool_key, 'key_1')] as `key_1`, toFloat64(count(*)) as value from signoz_logs.distributed_logs where (timestamp >= 1702979275000000000 AND timestamp <= 1702981075000000000) " +
"AND attributes_bool_value[indexOf(attributes_bool_key, 'key_2')] = true AND has(attributes_bool_key, 'key_1') group by `key_1`,ts order by value DESC) as B ON A.`key_1` = B.`key_1` AND A.`ts` = B.`ts`",
"(SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts, attributes_bool['key_1'] as `key_1`, toFloat64(count(*)) as value from " +
"signoz_logs.distributed_logs_v2 where (timestamp >= 1702979275000000000 AND timestamp <= 1702981075000000000) AND (ts_bucket_start >= 1702977475 AND ts_bucket_start <= 1702981075) " +
"AND attributes_bool['key_1'] = true AND mapContains(attributes_bool, 'key_1') AND mapContains(attributes_bool, 'key_1') group by `key_1`,ts order by value DESC) as A INNER JOIN (SELECT " +
"toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts, attributes_bool['key_1'] as `key_1`, toFloat64(count(*)) as value " +
"from signoz_logs.distributed_logs_v2 where (timestamp >= 1702979275000000000 AND timestamp <= 1702981075000000000) AND (ts_bucket_start >= 1702977475 AND ts_bucket_start <= 1702981075) " +
"AND attributes_bool['key_2'] = true AND mapContains(attributes_bool, 'key_2') AND mapContains(attributes_bool, 'key_1') group by `key_1`,ts order by value DESC) as B ON A.`key_1` = B.`key_1` AND A.`ts` = B.`ts`",
},
{
Name: "test formula with dot in filter and group by attribute",
@@ -497,11 +498,12 @@ var testLogsWithFormula = []struct {
},
},
},
ExpectedQuery: "SELECT A.`key1.1` as `key1.1`, A.`ts` as `ts`, A.value + B.value as value FROM (SELECT now() as ts, attributes_bool_value[indexOf(attributes_bool_key, 'key1.1')] as `key1.1`, " +
"toFloat64(count(*)) as value from signoz_logs.distributed_logs where (timestamp >= 1702979056000000000 AND timestamp <= 1702982656000000000) AND attributes_bool_value[indexOf(attributes_bool_key, 'key1.1')] = true AND " +
"has(attributes_bool_key, 'key1.1') group by `key1.1` order by value DESC) as A INNER JOIN (SELECT now() as ts, attributes_bool_value[indexOf(attributes_bool_key, 'key1.1')] as `key1.1`, " +
"toFloat64(count(*)) as value from signoz_logs.distributed_logs where (timestamp >= 1702979056000000000 AND timestamp <= 1702982656000000000) AND attributes_bool_value[indexOf(attributes_bool_key, 'key1.2')] = true AND " +
"has(attributes_bool_key, 'key1.1') group by `key1.1` order by value DESC) as B ON A.`key1.1` = B.`key1.1` AND A.`ts` = B.`ts`",
ExpectedQuery: "SELECT A.`key1.1` as `key1.1`, A.`ts` as `ts`, A.value + B.value as value FROM (SELECT now() as ts, attributes_bool['key1.1'] as `key1.1`, " +
"toFloat64(count(*)) as value from signoz_logs.distributed_logs_v2 where (timestamp >= 1702979056000000000 AND timestamp <= 1702982656000000000) AND (ts_bucket_start >= 1702977256 AND ts_bucket_start <= 1702982656) " +
"AND attributes_bool['key1.1'] = true AND mapContains(attributes_bool, 'key1.1') AND mapContains(attributes_bool, 'key1.1') group by `key1.1` order by value DESC) as A INNER JOIN (SELECT now() as ts, " +
"attributes_bool['key1.1'] as `key1.1`, toFloat64(count(*)) as value from signoz_logs.distributed_logs_v2 where (timestamp >= 1702979056000000000 AND timestamp <= 1702982656000000000) " +
"AND (ts_bucket_start >= 1702977256 AND ts_bucket_start <= 1702982656) AND attributes_bool['key1.2'] = true AND mapContains(attributes_bool, 'key1.2') AND " +
"mapContains(attributes_bool, 'key1.1') group by `key1.1` order by value DESC) as B ON A.`key1.1` = B.`key1.1` AND A.`ts` = B.`ts`",
},
{
Name: "test formula with dot in filter and group by materialized attribute",
@@ -558,10 +560,11 @@ var testLogsWithFormula = []struct {
},
},
ExpectedQuery: "SELECT A.`key1.1` as `key1.1`, A.`ts` as `ts`, A.value - B.value as value FROM (SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts, " +
"`attribute_bool_key1$$1` as `key1.1`, toFloat64(count(*)) as value from signoz_logs.distributed_logs where (timestamp >= 1702980884000000000 AND timestamp <= 1702984484000000000) AND " +
"`attribute_bool_key_2` = true AND `attribute_bool_key1$$1_exists`=true group by `key1.1`,ts order by value DESC) as A INNER JOIN (SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), " +
"INTERVAL 60 SECOND) AS ts, `attribute_bool_key1$$1` as `key1.1`, toFloat64(count(*)) as value from signoz_logs.distributed_logs where (timestamp >= 1702980884000000000 AND " +
"timestamp <= 1702984484000000000) AND attributes_bool_value[indexOf(attributes_bool_key, 'key_1')] = true AND `attribute_bool_key1$$1_exists`=true group by `key1.1`,ts order by value DESC) as B " +
"`attribute_bool_key1$$1` as `key1.1`, toFloat64(count(*)) as value from signoz_logs.distributed_logs_v2 where (timestamp >= 1702980884000000000 AND timestamp <= 1702984484000000000) AND " +
"(ts_bucket_start >= 1702979084 AND ts_bucket_start <= 1702984484) AND `attribute_bool_key_2` = true AND `attribute_bool_key1$$1_exists`=true group by `key1.1`,ts order by value DESC) as " +
"A INNER JOIN (SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts, `attribute_bool_key1$$1` as `key1.1`, toFloat64(count(*)) as value from " +
"signoz_logs.distributed_logs_v2 where (timestamp >= 1702980884000000000 AND timestamp <= 1702984484000000000) AND (ts_bucket_start >= 1702979084 AND ts_bucket_start <= 1702984484) AND " +
"attributes_bool['key_1'] = true AND mapContains(attributes_bool, 'key_1') AND `attribute_bool_key1$$1_exists`=true group by `key1.1`,ts order by value DESC) as B " +
"ON A.`key1.1` = B.`key1.1` AND A.`ts` = B.`ts`",
},
}

View File

@@ -311,6 +311,12 @@ const (
"CAST((attributes_float64_key, attributes_float64_value), 'Map(String, Float64)') as attributes_float64," +
"CAST((attributes_bool_key, attributes_bool_value), 'Map(String, Bool)') as attributes_bool," +
"CAST((resources_string_key, resources_string_value), 'Map(String, String)') as resources_string "
LogsSQLSelectV2 = "SELECT " +
"timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, body," +
"attributes_string," +
"attributes_number," +
"attributes_bool," +
"resources_string "
TracesExplorerViewSQLSelectWithSubQuery = "WITH subQuery AS (SELECT distinct on (traceID) traceID, durationNano, " +
"serviceName, name FROM %s.%s WHERE parentSpanID = '' AND %s %s ORDER BY durationNano DESC "
TracesExplorerViewSQLSelectQuery = "SELECT subQuery.serviceName, subQuery.name, count() AS " +
@@ -375,6 +381,12 @@ var StaticFieldsLogsV3 = map[string]v3.AttributeKey{
Type: v3.AttributeKeyTypeUnspecified,
IsColumn: true,
},
"__attrs": {
Key: "__attrs",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeUnspecified,
IsColumn: true,
},
}
const SigNozOrderByValue = "#SIGNOZ_VALUE"

View File

@@ -14,6 +14,9 @@ import (
// ValidateAndCastValue validates and casts the value of a key to the corresponding data type of the key
func ValidateAndCastValue(v interface{}, dataType v3.AttributeKeyDataType) (interface{}, error) {
// if it's pointer convert it to a value
v = getPointerValue(v)
switch dataType {
case v3.AttributeKeyDataTypeString:
switch x := v.(type) {
@@ -244,10 +247,16 @@ func GetClickhouseColumnName(typeName string, dataType, field string) string {
typeName = typeName[:len(typeName)-1]
}
dataType = strings.ToLower(dataType)
if dataType == "int64" || dataType == "float64" {
dataType = "number"
}
// if name contains . replace it with `$$`
field = strings.ReplaceAll(field, ".", "$$")
colName := fmt.Sprintf("`%s_%s_%s`", strings.ToLower(typeName), strings.ToLower(dataType), field)
colName := fmt.Sprintf("`%s_%s_%s`", strings.ToLower(typeName), dataType, field)
return colName
}

View File

@@ -419,28 +419,28 @@ var testGetClickhouseColumnName = []struct {
typeName: string(v3.AttributeKeyTypeTag),
dataType: string(v3.AttributeKeyDataTypeInt64),
field: "tag1",
want: "attribute_int64_tag1",
want: "`attribute_int64_tag1`",
},
{
name: "resource",
typeName: string(v3.AttributeKeyTypeResource),
dataType: string(v3.AttributeKeyDataTypeInt64),
field: "tag1",
want: "resource_int64_tag1",
want: "`resource_int64_tag1`",
},
{
name: "attribute old parser",
typeName: constants.Attributes,
dataType: string(v3.AttributeKeyDataTypeInt64),
field: "tag1",
want: "attribute_int64_tag1",
want: "`attribute_int64_tag1`",
},
{
name: "resource old parser",
typeName: constants.Resources,
dataType: string(v3.AttributeKeyDataTypeInt64),
field: "tag1",
want: "resource_int64_tag1",
want: "`resource_int64_tag1`",
},
}