Compare commits
32 Commits
main ... v0.50.0-lo

| Author | SHA1 | Date |
|---|---|---|
| | ead1015ec1 | |
| | 6a9fe62fd8 | |
| | 437fd24b42 | |
| | e5b362bd92 | |
| | 2e162a8794 | |
| | 5b0f024ff9 | |
| | 28c33ca006 | |
| | 59d05e702b | |
| | 1930c029f2 | |
| | 23c4745512 | |
| | d50047a6e7 | |
| | 379b49a0d0 | |
| | 243fd76a1a | |
| | 3d8e9d918c | |
| | 91948f3991 | |
| | 27190c10ee | |
| | 88daeb19bc | |
| | ed61db52cc | |
| | e4374d1115 | |
| | ce3d9a850e | |
| | dc16457b47 | |
| | 99e80e52ca | |
| | b9373a0e41 | |
| | e4c6f402b1 | |
| | ac9396bfdb | |
| | cbd5922fe5 | |
| | 4cb5e06402 | |
| | 890d4a53e5 | |
| | c8d5ddfbe0 | |
| | afcb64281c | |
| | cf43d6a3c6 | |
| | f863b5d8aa | |
@@ -134,6 +134,9 @@ function LogsExplorerViews({
const [logs, setLogs] = useState<ILog[]>([]);
const [requestData, setRequestData] = useState<Query | null>(null);
const [showFormatMenuItems, setShowFormatMenuItems] = useState(false);
const [lastLogLineTimestamp, setLastLogLineTimestamp] = useState<
string | number
>();
const [queryId, setQueryId] = useState<string>(v4());
const [queryStats, setQueryStats] = useState<WsDataEvent>();

@@ -260,6 +263,12 @@ function LogsExplorerViews({
start: timeRange.start,
end: timeRange.end,
}),
lastLogLineTimestamp:
requestData?.builder?.queryData[0]?.orderBy?.[0]?.columnName ===
'timestamp' &&
requestData?.builder?.queryData[0]?.orderBy?.[0]?.order === 'desc'
? lastLogLineTimestamp
: undefined,
},
undefined,
listQueryKeyRef,
@@ -269,6 +278,10 @@ function LogsExplorerViews({
},
);

useEffect(() => {
setLastLogLineTimestamp(undefined);
}, [data]);

const getRequestData = useCallback(
(
query: Query | null,
@@ -337,6 +350,8 @@ function LogsExplorerViews({
pageSize: nextPageSize,
});

setLastLogLineTimestamp(lastLog?.timestamp);

setPage((prevPage) => prevPage + 1);

setRequestData(newRequestData);
@@ -1,6 +1,7 @@
import getStartEndRangeTime from 'lib/getStartEndRangeTime';
import getStep from 'lib/getStep';
import { mapQueryDataToApi } from 'lib/newQueryBuilder/queryBuilderMappers/mapQueryDataToApi';
import { isUndefined } from 'lodash-es';
import store from 'store';
import { QueryRangePayload } from 'types/api/metrics/getQueryRange';
import { EQueryType } from 'types/common/dashboard';
@@ -24,7 +25,11 @@ export const prepareQueryRangePayload = ({
fillGaps = false,
}: GetQueryResultsProps): PrepareQueryRangePayload => {
let legendMap: Record<string, string> = {};
const { allowSelectedIntervalForStepGen, ...restParams } = params;
const {
allowSelectedIntervalForStepGen,
lastLogLineTimestamp,
...restParams
} = params;

const compositeQuery: QueryRangePayload['compositeQuery'] = {
queryType: query.queryType,
@@ -90,9 +95,13 @@ export const prepareQueryRangePayload = ({
interval: globalSelectedInterval,
});

const endLogTimeStamp = !isUndefined(lastLogLineTimestamp)
? new Date(lastLogLineTimestamp as any)?.getTime()
: undefined;

const queryPayload: QueryRangePayload = {
start: parseInt(start, 10) * 1e3,
end: parseInt(end, 10) * 1e3,
end: endLogTimeStamp || parseInt(end, 10) * 1e3,
step: getStep({
start: allowSelectedIntervalForStepGen
? start
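These frontend hunks pin the end of a paginated logs query to the last rendered log line: when the list is ordered by `timestamp` desc, `lastLogLineTimestamp` is threaded into the payload and used as the effective `end`, so later pages are not shifted by logs that arrive while the user scrolls. Below is a minimal sketch of that end-capping rule, written in Go for consistency with the backend code in this compare; all names are illustrative, the real logic lives in the TypeScript files above.

```go
package main

import (
	"fmt"
	"time"
)

// effectiveEnd returns the timestamp to use as the query window's end:
// the last log line rendered on the previous page if one is known,
// otherwise the end of the selected time range.
func effectiveEnd(selectedEnd time.Time, lastLogLine *time.Time) int64 {
	if lastLogLine != nil {
		return lastLogLine.UnixMilli()
	}
	return selectedEnd.UnixMilli()
}

func main() {
	end := time.Now()
	last := end.Add(-30 * time.Second)
	fmt.Println(effectiveEnd(end, nil))   // first page: selected range end
	fmt.Println(effectiveEnd(end, &last)) // later pages: pinned to last log line
}
```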
@@ -32,7 +32,9 @@ const (
defaultSpanAttributeKeysTable string = "distributed_span_attributes_keys"
defaultLogsDB string = "signoz_logs"
defaultLogsTable string = "distributed_logs"
defaultLogsTableV2 string = "distributed_logs_v2"
defaultLogsLocalTable string = "logs"
defaultLogsLocalTableV2 string = "logs_v2"
defaultLogAttributeKeysTable string = "distributed_logs_attribute_keys"
defaultLogResourceKeysTable string = "distributed_logs_resource_keys"
defaultLogTagAttributeTable string = "distributed_tag_attributes"
@@ -63,7 +65,9 @@ type namespaceConfig struct {
TopLevelOperationsTable string
LogsDB string
LogsTable string
LogsTableV2 string
LogsLocalTable string
logsLocalTableV2 string
LogsAttributeKeysTable string
LogsResourceKeysTable string
LogsTagAttributeTable string
@@ -150,7 +154,9 @@ func NewOptions(
TopLevelOperationsTable: defaultTopLevelOperationsTable,
LogsDB: defaultLogsDB,
LogsTable: defaultLogsTable,
LogsTableV2: defaultLogsTableV2,
LogsLocalTable: defaultLogsLocalTable,
logsLocalTableV2: defaultLogsLocalTableV2,
LogsAttributeKeysTable: defaultLogAttributeKeysTable,
LogsResourceKeysTable: defaultLogResourceKeysTable,
LogsTagAttributeTable: defaultLogTagAttributeTable,
@@ -116,7 +116,9 @@ type ClickHouseReader struct {
topLevelOperationsTable string
logsDB string
logsTable string
logsTableV2 string
logsLocalTable string
logsLocalTableV2 string
logsAttributeKeys string
logsResourceKeys string
logsTagAttributeTable string
@@ -209,7 +211,9 @@ func NewReaderFromClickhouseConnection(
topLevelOperationsTable: options.primary.TopLevelOperationsTable,
logsDB: options.primary.LogsDB,
logsTable: options.primary.LogsTable,
logsTableV2: options.primary.LogsTableV2,
logsLocalTable: options.primary.LogsLocalTable,
logsLocalTableV2: options.primary.logsLocalTableV2,
logsAttributeKeys: options.primary.LogsAttributeKeysTable,
logsResourceKeys: options.primary.LogsResourceKeysTable,
logsTagAttributeTable: options.primary.LogsTagAttributeTable,
@@ -3512,7 +3516,7 @@ func (r *ClickHouseReader) GetLogFields(ctx context.Context) (*model.GetFieldsRe
resources = removeUnderscoreDuplicateFields(resources)

statements := []model.ShowCreateTableStatement{}
query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsLocalTable)
query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsLocalTableV2)
err = r.db.Select(ctx, &statements, query)
if err != nil {
return nil, &model.ApiError{Err: err, Typ: model.ErrorInternal}
@@ -3552,21 +3556,23 @@ func (r *ClickHouseReader) UpdateLogField(ctx context.Context, field *model.Upda

colname := utils.GetClickhouseColumnName(field.Type, field.DataType, field.Name)

dataType := strings.ToLower(field.DataType)
if dataType == "int64" || dataType == "float64" {
dataType = "number"
}
attrColName := fmt.Sprintf("%s_%s", field.Type, dataType)

// if a field is selected it means that the field needs to be indexed
if field.Selected {
keyColName := fmt.Sprintf("%s_%s_key", field.Type, strings.ToLower(field.DataType))
valueColName := fmt.Sprintf("%s_%s_value", field.Type, strings.ToLower(field.DataType))

// create materialized column

for _, table := range []string{r.logsLocalTable, r.logsTable} {
q := "ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS %s %s DEFAULT %s[indexOf(%s, '%s')] CODEC(ZSTD(1))"
for _, table := range []string{r.logsLocalTableV2, r.logsTableV2} {
q := "ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS %s %s DEFAULT %s['%s'] CODEC(ZSTD(1))"
query := fmt.Sprintf(q,
r.logsDB, table,
r.cluster,
colname, field.DataType,
valueColName,
keyColName,
attrColName,
field.Name,
)
err := r.db.Exec(ctx, query)
@@ -3574,11 +3580,11 @@ func (r *ClickHouseReader) UpdateLogField(ctx context.Context, field *model.Upda
return &model.ApiError{Err: err, Typ: model.ErrorInternal}
}

query = fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS %s_exists` bool DEFAULT if(indexOf(%s, '%s') != 0, true, false) CODEC(ZSTD(1))",
query = fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS %s_exists` bool DEFAULT if(mapContains(%s, '%s') != 0, true, false) CODEC(ZSTD(1))",
r.logsDB, table,
r.cluster,
strings.TrimSuffix(colname, "`"),
keyColName,
attrColName,
field.Name,
)
err = r.db.Exec(ctx, query)
@@ -3600,7 +3606,7 @@ func (r *ClickHouseReader) UpdateLogField(ctx context.Context, field *model.Upda
field.IndexGranularity = constants.DefaultLogSkipIndexGranularity
}
query := fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD INDEX IF NOT EXISTS %s_idx` (%s) TYPE %s GRANULARITY %d",
r.logsDB, r.logsLocalTable,
r.logsDB, r.logsLocalTableV2,
r.cluster,
strings.TrimSuffix(colname, "`"),
colname,
@@ -4203,7 +4209,7 @@ func (r *ClickHouseReader) GetLogAggregateAttributes(ctx context.Context, req *v
defer rows.Close()

statements := []model.ShowCreateTableStatement{}
query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsLocalTable)
query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsLocalTableV2)
err = r.db.Select(ctx, &statements, query)
if err != nil {
return nil, fmt.Errorf("error while fetching logs schema: %s", err.Error())
@@ -4257,7 +4263,7 @@ func (r *ClickHouseReader) GetLogAttributeKeys(ctx context.Context, req *v3.Filt
defer rows.Close()

statements := []model.ShowCreateTableStatement{}
query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsLocalTable)
query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsLocalTableV2)
err = r.db.Select(ctx, &statements, query)
if err != nil {
return nil, fmt.Errorf("error while fetching logs schema: %s", err.Error())
@@ -4309,7 +4315,7 @@ func (r *ClickHouseReader) GetLogAttributeValues(ctx context.Context, req *v3.Fi
}

// ignore autocomplete request for body
if req.FilterAttributeKey == "body" {
if req.FilterAttributeKey == "body" || req.FilterAttributeKey == "__attrs" {
return &v3.FilterAttributeValueResponse{}, nil
}

@@ -4344,10 +4350,10 @@ func (r *ClickHouseReader) GetLogAttributeValues(ctx context.Context, req *v3.Fi

// prepare the query and run
if len(req.SearchText) != 0 {
query = fmt.Sprintf("select distinct %s from %s.%s where timestamp >= toInt64(toUnixTimestamp(now() - INTERVAL 48 HOUR)*1000000000) and %s ILIKE $1 limit $2", selectKey, r.logsDB, r.logsTable, filterValueColumnWhere)
query = fmt.Sprintf("select distinct %s from %s.%s where timestamp >= toInt64(toUnixTimestamp(now() - INTERVAL 48 HOUR)*1000000000) and %s ILIKE $1 limit $2", selectKey, r.logsDB, r.logsLocalTableV2, filterValueColumnWhere)
rows, err = r.db.Query(ctx, query, searchText, req.Limit)
} else {
query = fmt.Sprintf("select distinct %s from %s.%s where timestamp >= toInt64(toUnixTimestamp(now() - INTERVAL 48 HOUR)*1000000000) limit $1", selectKey, r.logsDB, r.logsTable)
query = fmt.Sprintf("select distinct %s from %s.%s where timestamp >= toInt64(toUnixTimestamp(now() - INTERVAL 48 HOUR)*1000000000) limit $1", selectKey, r.logsDB, r.logsLocalTableV2)
rows, err = r.db.Query(ctx, query, req.Limit)
}
} else if len(req.SearchText) != 0 {
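The `UpdateLogField` hunks above move materialized columns from the old paired-array schema (`attributes_*_key` / `attributes_*_value` addressed via `indexOf`) to the v2 map columns, so the generated DDL now defaults from a map lookup and the `_exists` helper column from `mapContains`. The sketch below only illustrates the shape of the statements this would produce for a selected string attribute; the database, cluster, table, and attribute names are made up.

```go
package main

import "fmt"

func main() {
	// Illustrative DDL shape after the change; the real code builds these with fmt.Sprintf
	// from the field definition and the configured cluster/table names.
	addColumn := "ALTER TABLE signoz_logs.logs_v2 ON CLUSTER cluster " +
		"ADD COLUMN IF NOT EXISTS `attribute_string_service$$name` String " +
		"DEFAULT attributes_string['service.name'] CODEC(ZSTD(1))"
	addExists := "ALTER TABLE signoz_logs.logs_v2 ON CLUSTER cluster " +
		"ADD COLUMN IF NOT EXISTS `attribute_string_service$$name_exists` bool " +
		"DEFAULT if(mapContains(attributes_string, 'service.name') != 0, true, false) CODEC(ZSTD(1))"

	fmt.Println(addColumn)
	fmt.Println(addExists)
}
```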
@@ -254,7 +254,7 @@ func TestReplaceInterestingFields(t *testing.T) {
},
}

expectedTokens := []string{"attributes_int64_value[indexOf(attributes_int64_key, 'id.userid')] IN (100) ", "and `attribute_int64_id_key` >= 50 ", `AND body ILIKE '%searchstring%'`}
expectedTokens := []string{"attributes_int64_value[indexOf(attributes_int64_key, 'id.userid')] IN (100) ", "and `attribute_number_id_key` >= 50 ", `AND body ILIKE '%searchstring%'`}
Convey("testInterestingFields", t, func() {
tokens, err := replaceInterestingFields(&allFields, queryTokens)
So(err, ShouldBeNil)
@@ -376,7 +376,7 @@ var generateSQLQueryTestCases = []struct {
IdGt: "2BsKLKv8cZrLCn6rkOcRGkdjBdM",
IdLT: "2BsKG6tRpFWjYMcWsAGKfSxoQdU",
},
SqlFilter: "( timestamp >= '1657689292000' and timestamp <= '1657689294000' and id > '2BsKLKv8cZrLCn6rkOcRGkdjBdM' and id < '2BsKG6tRpFWjYMcWsAGKfSxoQdU' ) and ( `attribute_int64_field1` < 100 and `attribute_int64_field1` > 50 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] <= 500 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] >= 400 ) ",
SqlFilter: "( timestamp >= '1657689292000' and timestamp <= '1657689294000' and id > '2BsKLKv8cZrLCn6rkOcRGkdjBdM' and id < '2BsKG6tRpFWjYMcWsAGKfSxoQdU' ) and ( `attribute_number_field1` < 100 and `attribute_number_field1` > 50 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] <= 500 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] >= 400 ) ",
},
{
Name: "second query with only timestamp range",
@@ -385,7 +385,7 @@ var generateSQLQueryTestCases = []struct {
TimestampStart: uint64(1657689292000),
TimestampEnd: uint64(1657689294000),
},
SqlFilter: "( timestamp >= '1657689292000' and timestamp <= '1657689294000' ) and ( `attribute_int64_field1` < 100 and `attribute_int64_field1` > 50 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] <= 500 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] >= 400 ) ",
SqlFilter: "( timestamp >= '1657689292000' and timestamp <= '1657689294000' ) and ( `attribute_number_field1` < 100 and `attribute_number_field1` > 50 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] <= 500 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] >= 400 ) ",
},
{
Name: "generate case sensitive query",
@@ -394,7 +394,7 @@ var generateSQLQueryTestCases = []struct {
TimestampStart: uint64(1657689292000),
TimestampEnd: uint64(1657689294000),
},
SqlFilter: "( timestamp >= '1657689292000' and timestamp <= '1657689294000' ) and ( `attribute_int64_field1` < 100 and attributes_int64_value[indexOf(attributes_int64_key, 'FielD1')] > 50 and `attribute_double64_Field2` > 10 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] <= 500 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] >= 400 ) ",
SqlFilter: "( timestamp >= '1657689292000' and timestamp <= '1657689294000' ) and ( `attribute_number_field1` < 100 and attributes_int64_value[indexOf(attributes_int64_key, 'FielD1')] > 50 and `attribute_double64_Field2` > 10 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] <= 500 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] >= 400 ) ",
},
{
Name: "Check exists and not exists",
@@ -45,10 +45,10 @@ var jsonLogOperators = map[v3.FilterOperator]string{
v3.FilterOperatorLessThanOrEq: "<=",
v3.FilterOperatorGreaterThan: ">",
v3.FilterOperatorGreaterThanOrEq: ">=",
v3.FilterOperatorLike: "ILIKE",
v3.FilterOperatorNotLike: "NOT ILIKE",
v3.FilterOperatorContains: "ILIKE",
v3.FilterOperatorNotContains: "NOT ILIKE",
v3.FilterOperatorLike: "LIKE",
v3.FilterOperatorNotLike: "NOT LIKE",
v3.FilterOperatorContains: "LIKE",
v3.FilterOperatorNotContains: "NOT LIKE",
v3.FilterOperatorRegex: "match(%s, %s)",
v3.FilterOperatorNotRegex: "NOT match(%s, %s)",
v3.FilterOperatorIn: "IN",

@@ -298,7 +298,7 @@ var testGetJSONFilterData = []struct {
Operator: "contains",
Value: "a",
},
Filter: "lower(body) like lower('%message%') AND JSON_EXISTS(body, '$.\"message\"') AND JSON_VALUE(body, '$.\"message\"') ILIKE '%a%'",
Filter: "lower(body) like lower('%message%') AND JSON_EXISTS(body, '$.\"message\"') AND JSON_VALUE(body, '$.\"message\"') LIKE '%a%'",
},
{
Name: "contains operator with quotes",
@@ -311,7 +311,7 @@ var testGetJSONFilterData = []struct {
Operator: "contains",
Value: "hello 'world'",
},
Filter: "lower(body) like lower('%message%') AND lower(body) like lower('%hello \\'world\\'%') AND JSON_EXISTS(body, '$.\"message\"') AND JSON_VALUE(body, '$.\"message\"') ILIKE '%hello \\'world\\'%'",
Filter: "lower(body) like lower('%message%') AND lower(body) like lower('%hello \\'world\\'%') AND JSON_EXISTS(body, '$.\"message\"') AND JSON_VALUE(body, '$.\"message\"') LIKE '%hello \\'world\\'%'",
},
{
Name: "exists",
@@ -39,19 +39,23 @@ var logOperators = map[v3.FilterOperator]string{
v3.FilterOperatorLessThanOrEq: "<=",
v3.FilterOperatorGreaterThan: ">",
v3.FilterOperatorGreaterThanOrEq: ">=",
v3.FilterOperatorLike: "ILIKE",
v3.FilterOperatorNotLike: "NOT ILIKE",
v3.FilterOperatorContains: "ILIKE",
v3.FilterOperatorNotContains: "NOT ILIKE",
v3.FilterOperatorLike: "LIKE",
v3.FilterOperatorNotLike: "NOT LIKE",
v3.FilterOperatorContains: "LIKE",
v3.FilterOperatorNotContains: "NOT LIKE",
v3.FilterOperatorRegex: "match(%s, %s)",
v3.FilterOperatorNotRegex: "NOT match(%s, %s)",
v3.FilterOperatorIn: "IN",
v3.FilterOperatorNotIn: "NOT IN",
v3.FilterOperatorExists: "has(%s_%s_key, '%s')",
v3.FilterOperatorNotExists: "not has(%s_%s_key, '%s')",
v3.FilterOperatorExists: "mapContains(%s_%s, '%s')",
v3.FilterOperatorNotExists: "not mapContains(%s_%s, '%s')",
}

const BODY = "body"
const (
BODY = "body"
DISTRIBUTED_LOGS_V2 = "distributed_logs_v2"
DISTRIBUTED_LOGS_V2_RESOURCE = "distributed_logs_v2_resource"
)

func getClickhouseLogsColumnType(columnType v3.AttributeKeyType) string {
if columnType == v3.AttributeKeyTypeTag {
@@ -61,11 +65,8 @@ func getClickhouseLogsColumnType(columnType v3.AttributeKeyType) string {
}

func getClickhouseLogsColumnDataType(columnDataType v3.AttributeKeyDataType) string {
if columnDataType == v3.AttributeKeyDataTypeFloat64 {
return "float64"
}
if columnDataType == v3.AttributeKeyDataTypeInt64 {
return "int64"
if columnDataType == v3.AttributeKeyDataTypeFloat64 || columnDataType == v3.AttributeKeyDataTypeInt64 {
return "number"
}
if columnDataType == v3.AttributeKeyDataTypeBool {
return "bool"
@@ -85,7 +86,7 @@ func getClickhouseColumnName(key v3.AttributeKey) string {
if !key.IsColumn {
columnType := getClickhouseLogsColumnType(key.Type)
columnDataType := getClickhouseLogsColumnDataType(key.DataType)
clickhouseColumn = fmt.Sprintf("%s_%s_value[indexOf(%s_%s_key, '%s')]", columnType, columnDataType, columnType, columnDataType, key.Key)
clickhouseColumn = fmt.Sprintf("%s_%s['%s']", columnType, columnDataType, key.Key)
return clickhouseColumn
}
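With the v2 Map columns, `getClickhouseColumnName` renders a non-materialized attribute as a direct map access instead of the paired `indexOf` lookup, and the exists/not-exists operators become `mapContains`. A small sketch of the before/after rendering follows; the attribute key is purely illustrative.

```go
package main

import "fmt"

func main() {
	columnType, columnDataType, key := "attributes", "string", "service.name"

	// v1 (paired-array) addressing
	oldCol := fmt.Sprintf("%s_%s_value[indexOf(%s_%s_key, '%s')]",
		columnType, columnDataType, columnType, columnDataType, key)
	// v2 (map) addressing
	newCol := fmt.Sprintf("%s_%s['%s']", columnType, columnDataType, key)

	fmt.Println(oldCol) // attributes_string_value[indexOf(attributes_string_key, 'service.name')]
	fmt.Println(newCol) // attributes_string['service.name']
}
```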
@@ -164,6 +165,12 @@ func buildLogsTimeSeriesFilterQuery(fs *v3.FilterSet, groupBy []v3.AttributeKey,

if fs != nil && len(fs.Items) != 0 {
for _, item := range fs.Items {

// skip if it's a resource attribute
if item.Key.Type == v3.AttributeKeyTypeResource {
continue
}

if item.Key.IsJSON {
filter, err := GetJSONFilter(item)
if err != nil {
@@ -173,6 +180,16 @@ func buildLogsTimeSeriesFilterQuery(fs *v3.FilterSet, groupBy []v3.AttributeKey,
continue
}

// check if the user is searching for all attributes
if item.Key.Key == "__attrs" {
if (item.Operator != v3.FilterOperatorEqual && item.Operator != v3.FilterOperatorContains) || item.Key.DataType != v3.AttributeKeyDataTypeString {
return "", fmt.Errorf("only = operator and string data type is supported for __attrs")
}
val := utils.QuoteEscapedString(fmt.Sprintf("%v", item.Value))
conditions = append(conditions, fmt.Sprintf("has(mapValues(attributes_string), '%v')", val))
continue
}

op := v3.FilterOperator(strings.ToLower(strings.TrimSpace(string(item.Operator))))

var value interface{}
@@ -196,7 +213,6 @@ func buildLogsTimeSeriesFilterQuery(fs *v3.FilterSet, groupBy []v3.AttributeKey,
columnName := getClickhouseColumnName(item.Key)
val := utils.QuoteEscapedString(fmt.Sprintf("%v", item.Value))
if columnName == BODY {
logsOp = strings.Replace(logsOp, "ILIKE", "LIKE", 1) // removing i from ilike and not ilike
conditions = append(conditions, fmt.Sprintf("lower(%s) %s lower('%%%s%%')", columnName, logsOp, val))
} else {
conditions = append(conditions, fmt.Sprintf("%s %s '%%%s%%'", columnName, logsOp, val))
@@ -208,7 +224,6 @@ func buildLogsTimeSeriesFilterQuery(fs *v3.FilterSet, groupBy []v3.AttributeKey,
// for use lower for like and ilike
if op == v3.FilterOperatorLike || op == v3.FilterOperatorNotLike {
if columnName == BODY {
logsOp = strings.Replace(logsOp, "ILIKE", "LIKE", 1) // removing i from ilike and not ilike
columnName = fmt.Sprintf("lower(%s)", columnName)
fmtVal = fmt.Sprintf("lower(%s)", fmtVal)
}
@@ -218,15 +233,27 @@ func buildLogsTimeSeriesFilterQuery(fs *v3.FilterSet, groupBy []v3.AttributeKey,
} else {
return "", fmt.Errorf("unsupported operator: %s", op)
}

// add extra condition for map contains
// by default clickhouse is not able to utilize indexes for keys with all operators.
// mapContains forces the use of index.
if item.Key.IsColumn == false && op != v3.FilterOperatorExists && op != v3.FilterOperatorNotExists {
conditions = append(conditions, GetExistsNexistsFilter(v3.FilterOperatorExists, item))
}
}
}

// add group by conditions to filter out log lines which doesn't have the key
for _, attr := range groupBy {
// skip if it's a resource attribute
if attr.Type == v3.AttributeKeyTypeResource {
continue
}

if !attr.IsColumn {
columnType := getClickhouseLogsColumnType(attr.Type)
columnDataType := getClickhouseLogsColumnDataType(attr.DataType)
conditions = append(conditions, fmt.Sprintf("has(%s_%s_key, '%s')", columnType, columnDataType, attr.Key))
conditions = append(conditions, fmt.Sprintf("mapContains(%s_%s, '%s')", columnType, columnDataType, attr.Key))
} else if attr.Type != v3.AttributeKeyTypeUnspecified {
// for materialzied columns
conditions = append(conditions, fmt.Sprintf("%s_exists`=true", strings.TrimSuffix(getClickhouseColumnName(attr), "`")))
@@ -234,7 +261,7 @@ func buildLogsTimeSeriesFilterQuery(fs *v3.FilterSet, groupBy []v3.AttributeKey,
}

// add conditions for aggregate attribute
if aggregateAttribute.Key != "" {
if aggregateAttribute.Key != "" && aggregateAttribute.Type != v3.AttributeKeyTypeResource {
existsFilter := GetExistsNexistsFilter(v3.FilterOperatorExists, v3.FilterItem{Key: aggregateAttribute})
conditions = append(conditions, existsFilter)
}
@@ -243,6 +270,148 @@ func buildLogsTimeSeriesFilterQuery(fs *v3.FilterSet, groupBy []v3.AttributeKey,
return queryString, nil
}
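Two additions in the filter builder above are easy to miss: a filter on the pseudo-key `__attrs` searches across all string attribute values via `has(mapValues(attributes_string), ...)`, and every filter on a non-materialized key gets an extra `mapContains` condition appended so ClickHouse can use the map key index. The sketch below shows the conditions this would produce for a simple equality filter; the key and value are illustrative.

```go
package main

import "fmt"

func main() {
	key, val := "service.name", "api"

	// condition for the comparison itself, using the v2 map column
	cmp := fmt.Sprintf("attributes_string['%s'] = '%s'", key, val)
	// extra condition appended for non-materialized keys to force index usage
	exists := fmt.Sprintf("mapContains(attributes_string, '%s')", key)
	// __attrs search across all string attribute values
	allAttrs := fmt.Sprintf("has(mapValues(attributes_string), '%s')", val)

	fmt.Println(cmp + " AND " + exists)
	fmt.Println(allAttrs)
}
```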
func buildResourceBucketFilters(fs *v3.FilterSet, groupBy []v3.AttributeKey, orderBy []v3.OrderBy, aggregateAttribute v3.AttributeKey) (string, error) {
var andConditions []string
var orConditions []string
// only add the resource attributes to the filters here
if fs != nil && len(fs.Items) != 0 {
for _, item := range fs.Items {
// skip anything other than resource attribute
if item.Key.Type != v3.AttributeKeyTypeResource {
continue
}

op := v3.FilterOperator(strings.ToLower(strings.TrimSpace(string(item.Operator))))
var value interface{}
var err error
if op != v3.FilterOperatorExists && op != v3.FilterOperatorNotExists {
value, err = utils.ValidateAndCastValue(item.Value, item.Key.DataType)
if err != nil {
return "", fmt.Errorf("failed to validate and cast value for %s: %v", item.Key.Key, err)
}
}

columnName := fmt.Sprintf("simpleJSONExtractString(lower(labels), '%s')", item.Key.Key)

if logsOp, ok := logOperators[op]; ok {
switch op {
case v3.FilterOperatorExists:
andConditions = append(andConditions, fmt.Sprintf("simpleJSONHas(labels, '%s')", item.Key.Key))
case v3.FilterOperatorNotExists:
andConditions = append(andConditions, fmt.Sprintf("not simpleJSONHas(labels, '%s')", item.Key.Key))
case v3.FilterOperatorRegex, v3.FilterOperatorNotRegex:
fmtVal := utils.ClickHouseFormattedValue(value)
// for regex don't lowercase it as it will result in something else
andConditions = append(andConditions, fmt.Sprintf(logsOp, columnName, fmtVal))
case v3.FilterOperatorContains, v3.FilterOperatorNotContains:
val := utils.QuoteEscapedString(fmt.Sprintf("%v", item.Value))
andConditions = append(andConditions, fmt.Sprintf("%s %s '%%%s%%'", columnName, logsOp, strings.ToLower(val)))
default:
fmtVal := utils.ClickHouseFormattedValue(value)
andConditions = append(andConditions, fmt.Sprintf("%s %s %s", columnName, logsOp, strings.ToLower(fmtVal)))
}

// add index filters
switch op {
case v3.FilterOperatorContains, v3.FilterOperatorEqual, v3.FilterOperatorLike:
val := utils.QuoteEscapedString(fmt.Sprintf("%v", item.Value))
andConditions = append(andConditions, fmt.Sprintf("lower(labels) like '%%%s%%%s%%'", strings.ToLower(item.Key.Key), strings.ToLower(val)))
case v3.FilterOperatorNotContains, v3.FilterOperatorNotEqual, v3.FilterOperatorNotLike:
val := utils.QuoteEscapedString(fmt.Sprintf("%v", item.Value))
andConditions = append(andConditions, fmt.Sprintf("lower(labels) not like '%%%s%%%s%%'", strings.ToLower(item.Key.Key), strings.ToLower(val)))
case v3.FilterOperatorNotRegex:
andConditions = append(andConditions, fmt.Sprintf("lower(labels) not like '%%%s%%'", strings.ToLower(item.Key.Key)))
case v3.FilterOperatorIn, v3.FilterOperatorNotIn:

// for in operator value needs to used for indexing in a different way.
// eg1:= x in a,b,c = (labels like '%x%a%' or labels like '%"x":"b"%' or labels like '%"x"="c"%')
// eg1:= x nin a,b,c = (labels nlike '%x%a%' AND labels nlike '%"x"="b"' AND labels nlike '%"x"="c"%')
tCondition := []string{}

separator := " OR "
sqlOp := "like"
if op == v3.FilterOperatorNotIn {
separator = " AND "
sqlOp = "not like"
}

values := []string{}

switch item.Value.(type) {
case string:
values = append(values, item.Value.(string))
case []interface{}:
for _, v := range (item.Value).([]interface{}) {
// also resources attributes are always string values
strV, ok := v.(string)
if !ok {
continue
}
values = append(values, strV)
}
}

if len(values) > 0 {
for _, v := range values {
tCondition = append(tCondition, fmt.Sprintf("lower(labels) %s '%%\"%s\":\"%s\"%%'", sqlOp, strings.ToLower(item.Key.Key), strings.ToLower(v)))
}
andConditions = append(andConditions, "("+strings.Join(tCondition, separator)+")")
}

default:
andConditions = append(andConditions, fmt.Sprintf("lower(labels) like '%%%s%%'", strings.ToLower(item.Key.Key)))
}

} else {
return "", fmt.Errorf("unsupported operator: %s", op)
}

}
}

// for group by add exists check in resources
if aggregateAttribute.Key != "" && aggregateAttribute.Type == v3.AttributeKeyTypeResource {
andConditions = append(andConditions, fmt.Sprintf("(simpleJSONHas(labels, '%s') AND lower(labels) like '%%%s%%')", aggregateAttribute.Key, strings.ToLower(aggregateAttribute.Key)))
}

// for aggregate attribute add exists check in resources
// we are using OR for groupBy and orderby as we just want to filter out the logs using fingerprint
for _, attr := range groupBy {
if attr.Type != v3.AttributeKeyTypeResource {
continue
}
orConditions = append(orConditions, fmt.Sprintf("(simpleJSONHas(labels, '%s') AND lower(labels) like '%%%s%%')", attr.Key, strings.ToLower(attr.Key)))
}

// for orderBy it's not required as the keys will be already present in the group by
// so no point in adding them

if len(orConditions) > 0 {
// TODO: change OR to AND once we know how to solve for group by
orConditionStr := "( " + strings.Join(orConditions, " AND ") + " )"
andConditions = append(andConditions, orConditionStr)
}

if len(andConditions) == 0 {
return "", nil
}

conditionStr := strings.Join(andConditions, " AND ")
return conditionStr, nil
}

func filtersUseNewTable(filters *v3.FilterSet) bool {
if filters == nil {
return false
}
for _, item := range filters.Items {
if item.Key.Key != "id" {
return true
}
}
return false
}
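The new `buildResourceBucketFilters` narrows candidate resource fingerprints before the main logs query runs: each resource attribute is matched exactly with `simpleJSONExtractString` over the `labels` JSON, and a coarse `lower(labels) LIKE` condition is added alongside so an index over `labels` can be used. A hedged sketch of the pair of conditions produced for a simple equality filter follows; the key and value are illustrative.

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// Sketch of the two conditions built for a resource filter like
	// k8s.namespace.name = 'prod'.
	key, val := "k8s.namespace.name", "prod"

	// exact match on the extracted label value
	exact := fmt.Sprintf("simpleJSONExtractString(lower(labels), '%s') = '%s'", key, strings.ToLower(val))
	// coarse index-friendly condition over the raw labels string
	index := fmt.Sprintf("lower(labels) like '%%%s%%%s%%'", strings.ToLower(key), strings.ToLower(val))

	fmt.Println(exact + " AND " + index)
}
```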
func buildLogsQuery(panelType v3.PanelType, start, end, step int64, mq *v3.BuilderQuery, graphLimitQtype string, preferRPM bool) (string, error) {

filterSubQuery, err := buildLogsTimeSeriesFilterQuery(mq.Filters, mq.GroupBy, mq.AggregateAttribute)
@@ -254,7 +423,31 @@ func buildLogsQuery(panelType v3.PanelType, start, end, step int64, mq *v3.Build
}

// timerange will be sent in epoch millisecond
timeFilter := fmt.Sprintf("(timestamp >= %d AND timestamp <= %d)", utils.GetEpochNanoSecs(start), utils.GetEpochNanoSecs(end))
logsStart := utils.GetEpochNanoSecs(start)
logsEnd := utils.GetEpochNanoSecs(end)

// -1800 this is added so that the bucket start considers all the fingerprints.
bucketStart := logsStart/1000000000 - 1800
bucketEnd := logsEnd / 1000000000

timeFilter := fmt.Sprintf("(timestamp >= %d AND timestamp <= %d)", logsStart, logsEnd)

resourceBucketFilters, err := buildResourceBucketFilters(mq.Filters, mq.GroupBy, mq.OrderBy, mq.AggregateAttribute)
if err != nil {
return "", err
}

tableName := DISTRIBUTED_LOGS_V2
timeFilter = timeFilter + fmt.Sprintf(" AND (ts_bucket_start >= %d AND ts_bucket_start <= %d)", bucketStart, bucketEnd)

if len(resourceBucketFilters) > 0 {
filter := " AND (resource_fingerprint GLOBAL IN (" +
"SELECT fingerprint FROM signoz_logs.%s " +
"WHERE (seen_at_ts_bucket_start >= %d) AND (seen_at_ts_bucket_start <= %d) AND "
filter = fmt.Sprintf(filter, DISTRIBUTED_LOGS_V2_RESOURCE, bucketStart, bucketEnd)

filterSubQuery = filterSubQuery + filter + resourceBucketFilters + "))"
}

selectLabels := getSelectLabels(mq.AggregateOperator, mq.GroupBy)

@@ -280,8 +473,8 @@ func buildLogsQuery(panelType v3.PanelType, start, end, step int64, mq *v3.Build
queryTmpl =
queryTmpl + selectLabels +
" %s as value " +
"from signoz_logs.distributed_logs " +
"where " + timeFilter + "%s" +
"from signoz_logs." + tableName +
" where " + timeFilter + "%s" +
"%s%s" +
"%s"

@@ -358,8 +551,12 @@ func buildLogsQuery(panelType v3.PanelType, start, end, step int64, mq *v3.Build
query := fmt.Sprintf(queryTmpl, op, filterSubQuery, groupBy, having, orderBy)
return query, nil
case v3.AggregateOperatorNoOp:
queryTmpl := constants.LogsSQLSelect + "from signoz_logs.distributed_logs where %s%s order by %s"
query := fmt.Sprintf(queryTmpl, timeFilter, filterSubQuery, orderBy)
// sqlSelect := constants.LogsSQLSelect
// with noop any filter or different order by other than ts will use new table
sqlSelect := constants.LogsSQLSelectV2
tableName = DISTRIBUTED_LOGS_V2
queryTmpl := sqlSelect + "from signoz_logs.%s where %s%s order by %s"
query := fmt.Sprintf(queryTmpl, tableName, timeFilter, filterSubQuery, orderBy)
return query, nil
default:
return "", fmt.Errorf("unsupported aggregate operator")
@@ -374,7 +571,7 @@ func buildLogsLiveTailQuery(mq *v3.BuilderQuery) (string, error) {

switch mq.AggregateOperator {
case v3.AggregateOperatorNoOp:
query := constants.LogsSQLSelect + "from signoz_logs.distributed_logs where "
query := constants.LogsSQLSelectV2 + "from signoz_logs." + DISTRIBUTED_LOGS_V2 + " where "
if len(filterSubQuery) > 0 {
query = query + filterSubQuery + " AND "
}
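`buildLogsQuery` now targets `distributed_logs_v2`, adds a `ts_bucket_start` range on top of the nanosecond timestamp filter, and, when resource filters exist, restricts rows to fingerprints returned by a subquery against `distributed_logs_v2_resource`. The sketch below only reproduces the bucket-bound arithmetic from the hunk above; the timestamps are illustrative.

```go
package main

import "fmt"

func main() {
	// Nanosecond timestamps truncated to seconds; the start is pulled back 1800s
	// so a bucket that began before the window but still overlaps it is kept.
	logsStart := int64(1722255800000000000)
	logsEnd := int64(1722262800000000000)

	bucketStart := logsStart/1000000000 - 1800
	bucketEnd := logsEnd / 1000000000

	timeFilter := fmt.Sprintf(
		"(timestamp >= %d AND timestamp <= %d) AND (ts_bucket_start >= %d AND ts_bucket_start <= %d)",
		logsStart, logsEnd, bucketStart, bucketEnd)
	fmt.Println(timeFilter)
}
```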
File diff suppressed because it is too large
@@ -15,6 +15,7 @@ import (
"go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
chErrors "go.signoz.io/signoz/pkg/query-service/errors"
"go.signoz.io/signoz/pkg/query-service/utils"

"go.signoz.io/signoz/pkg/query-service/cache"
"go.signoz.io/signoz/pkg/query-service/interfaces"
@@ -457,7 +458,118 @@ func (q *querier) runClickHouseQueries(ctx context.Context, params *v3.QueryRang
return results, errQueriesByName, err
}

type startEndArr struct {
Start int64
End int64
}

func getTsStartEndArray(start, end int64) []startEndArr {
startNano := utils.GetEpochNanoSecs(start)
endNano := utils.GetEpochNanoSecs(end)
result := []startEndArr{}

hourNano := int64(3600000000000)
if endNano-startNano > hourNano {
bucket := hourNano
tStartNano := endNano - bucket

complete := false
for {
result = append(result, startEndArr{Start: tStartNano, End: endNano})
if complete {
break
}

bucket = bucket * 2
endNano = tStartNano
tStartNano = tStartNano - bucket

// break condition
if tStartNano <= startNano {
complete = true
tStartNano = startNano
}
}
}
return result
}
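`getTsStartEndArray` splits a descending-by-timestamp list query into windows that double in size walking backwards from `end` (1h, then 2h, 4h, ...), so the most recent window, which usually already satisfies the page size, is queried first; ranges of one hour or less are left alone. The following runnable sketch reproduces the same doubling under the assumption that timestamps are already in nanoseconds.

```go
package main

import (
	"fmt"
	"time"
)

type window struct{ start, end int64 }

// splitBackwards mirrors the doubling strategy above: 1h, 2h, 4h, ...
// walking back from end until start is covered.
func splitBackwards(start, end int64) []window {
	out := []window{}
	bucket := int64(time.Hour) // one hour in nanoseconds
	if end-start <= bucket {
		return out
	}
	cur := end - bucket
	done := false
	for {
		out = append(out, window{cur, end})
		if done {
			break
		}
		bucket *= 2
		end = cur
		cur -= bucket
		if cur <= start {
			done = true
			cur = start
		}
	}
	return out
}

func main() {
	now := time.Now().UnixNano()
	for _, w := range splitBackwards(now-int64(24*time.Hour), now) {
		fmt.Println(w.start, w.end)
	}
}
```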
func (q *querier) runLogsListQuery(ctx context.Context, params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey, startEndArr []startEndArr) ([]*v3.Result, map[string]error, error) {
// check the timerange
res := make([]*v3.Result, 0)
qName := ""
pageSize := uint64(0)

// it will run only once
for name, v := range params.CompositeQuery.BuilderQueries {
qName = name
pageSize = v.PageSize
}
data := []*v3.Row{}

for _, v := range startEndArr {
params.Start = v.Start
params.End = v.End

params.CompositeQuery.BuilderQueries[qName].PageSize = pageSize - uint64(len(data))
queries, err := q.builder.PrepareQueries(params, keys)
if err != nil {
return nil, nil, err
}

// only one query
for name, query := range queries {
rowList, err := q.reader.GetListResultV3(ctx, query)
if err != nil {
errs := []error{err}
errQuriesByName := map[string]error{
name: err,
}
return nil, errQuriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...))
}
data = append(data, rowList...)
}

// append a filter to the params
if len(data) > 0 {
params.CompositeQuery.BuilderQueries[qName].Filters.Items = append(params.CompositeQuery.BuilderQueries[qName].Filters.Items, v3.FilterItem{
Key: v3.AttributeKey{
Key: "id",
IsColumn: true,
DataType: "string",
},
Operator: v3.FilterOperatorLessThan,
Value: data[len(data)-1].Data["id"],
})
}

if uint64(len(data)) >= pageSize {
fmt.Println(len(data), pageSize)
break
}
}
res = append(res, &v3.Result{
QueryName: qName,
List: data,
})
return res, nil, nil
}

func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey) ([]*v3.Result, map[string]error, error) {
// Note: as of now only supporting one list query
if params.CompositeQuery != nil {
if len(params.CompositeQuery.BuilderQueries) == 1 {
for _, v := range params.CompositeQuery.BuilderQueries {
// only allow of logs queries with timestamp ordering desc
if v.DataSource == v3.DataSourceLogs && len(v.OrderBy) == 1 && v.OrderBy[0].ColumnName == "timestamp" && v.OrderBy[0].Order == "desc" {
startEndArr := getTsStartEndArray(params.Start, params.End)
if len(startEndArr) > 0 {
return q.runLogsListQuery(ctx, params, keys, startEndArr)
}
}
}
}
}

queries, err := q.builder.PrepareQueries(params, keys)

@@ -1050,3 +1050,49 @@ func TestQueryRangeValueTypePromQL(t *testing.T) {
}
}
}

func TestTsStartEnd(t *testing.T) {
startEndData := []struct {
name string
start int64
end int64
res []startEndArr
}{
{
name: "testing for less then one hour",
start: 1722262800000000000, // July 29, 2024 7:50:00 PM
end: 1722263800000000000, // July 29, 2024 8:06:40 PM
res: []startEndArr{},
},
{
name: "testing for more than one hour",
start: 1722255800000000000, // July 29, 2024 5:53:20 PM
end: 1722262800000000000, // July 29, 2024 8:06:40 PM
res: []startEndArr{
{1722259200000000000, 1722262800000000000}, // July 29, 2024 6:50:00 PM - July 29, 2024 7:50:00 PM
{1722255800000000000, 1722259200000000000}, // July 29, 2024 5:53:20 PM - July 29, 2024 6:50:00 PM
},
},
{
name: "testing for 1 day",
start: 1722171576000000000,
end: 1722262800000000000,
res: []startEndArr{
{1722259200000000000, 1722262800000000000}, // July 29, 2024 6:50:00 PM - July 29, 2024 7:50:00 PM
{1722252000000000000, 1722259200000000000}, // July 29, 2024 4:50:00 PM - July 29, 2024 6:50:00 PM
{1722237600000000000, 1722252000000000000}, // July 29, 2024 12:50:00 PM - July 29, 2024 4:50:00 PM
{1722208800000000000, 1722237600000000000}, // July 29, 2024 4:50:00 AM - July 29, 2024 12:50:00 PM
{1722171576000000000, 1722208800000000000}, // July 28, 2024 6:29:36 PM - July 29, 2024 4:50:00 AM
},
},
}

for _, test := range startEndData {
res := getTsStartEndArray(test.start, test.end)
for i, v := range res {
if test.res[i].Start != v.Start || test.res[i].End != v.End {
t.Errorf("expected range was %v - %v, got %v - %v", v.Start, v.End, test.res[i].Start, test.res[i].End)
}
}
}
}
@@ -15,6 +15,7 @@ import (
"go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
chErrors "go.signoz.io/signoz/pkg/query-service/errors"
"go.signoz.io/signoz/pkg/query-service/utils"

"go.signoz.io/signoz/pkg/query-service/cache"
"go.signoz.io/signoz/pkg/query-service/interfaces"
@@ -465,14 +466,127 @@ func (q *querier) runClickHouseQueries(ctx context.Context, params *v3.QueryRang
return results, errQueriesByName, err
}

type startEndArr struct {
Start int64
End int64
}

func getTsStartEndArray(start, end int64) []startEndArr {
startNano := utils.GetEpochNanoSecs(start)
endNano := utils.GetEpochNanoSecs(end)
result := []startEndArr{}

hourNano := int64(3600000000000)
if endNano-startNano > hourNano {
bucket := hourNano
tStartNano := endNano - bucket

complete := false
for {
result = append(result, startEndArr{Start: tStartNano, End: endNano})
if complete {
break
}

bucket = bucket * 2
endNano = tStartNano
tStartNano = tStartNano - bucket

// break condition
if tStartNano <= startNano {
complete = true
tStartNano = startNano
}
}
}
return result
}

func (q *querier) runLogsListQuery(ctx context.Context, params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey, startEndArr []startEndArr) ([]*v3.Result, map[string]error, error) {
// check the timerange
res := make([]*v3.Result, 0)
qName := ""
pageSize := uint64(0)

// it will run only once
for name, v := range params.CompositeQuery.BuilderQueries {
qName = name
pageSize = v.PageSize
}
data := []*v3.Row{}

for _, v := range startEndArr {
params.Start = v.Start
params.End = v.End

params.CompositeQuery.BuilderQueries[qName].PageSize = pageSize - uint64(len(data))
queries, err := q.builder.PrepareQueries(params, keys)
if err != nil {
return nil, nil, err
}

// only one query
for name, query := range queries {
rowList, err := q.reader.GetListResultV3(ctx, query)
if err != nil {
errs := []error{err}
errQuriesByName := map[string]error{
name: err,
}
return nil, errQuriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...))
}
data = append(data, rowList...)
}

// append a filter to the params
if len(data) > 0 {
params.CompositeQuery.BuilderQueries[qName].Filters.Items = append(params.CompositeQuery.BuilderQueries[qName].Filters.Items, v3.FilterItem{
Key: v3.AttributeKey{
Key: "id",
IsColumn: true,
DataType: "string",
},
Operator: v3.FilterOperatorLessThan,
Value: data[len(data)-1].Data["id"],
})
}

if uint64(len(data)) >= pageSize {
fmt.Println(len(data), pageSize)
break
}
}
res = append(res, &v3.Result{
QueryName: qName,
List: data,
})
return res, nil, nil
}

func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey) ([]*v3.Result, map[string]error, error) {

// Note: as of now only supporting one list query
if params.CompositeQuery != nil {
if len(params.CompositeQuery.BuilderQueries) == 1 {
for _, v := range params.CompositeQuery.BuilderQueries {
// only allow of logs queries with timestamp ordering desc
if v.DataSource == v3.DataSourceLogs && len(v.OrderBy) == 1 && v.OrderBy[0].ColumnName == "timestamp" && v.OrderBy[0].Order == "desc" {
startEndArr := getTsStartEndArray(params.Start, params.End)
if len(startEndArr) > 0 {
return q.runLogsListQuery(ctx, params, keys, startEndArr)
}
}
}
}
}

queries, err := q.builder.PrepareQueries(params, keys)

if err != nil {
return nil, nil, err
}

// change for list is need here for faster listing for logs
ch := make(chan channelResult, len(queries))
var wg sync.WaitGroup

@@ -1102,3 +1102,49 @@ func TestV2QueryRangeValueTypePromQL(t *testing.T) {
}
}
}

func TestTsStartEnd(t *testing.T) {
startEndData := []struct {
name string
start int64
end int64
res []startEndArr
}{
{
name: "testing for less then one hour",
start: 1722262800000000000, // July 29, 2024 7:50:00 PM
end: 1722263800000000000, // July 29, 2024 8:06:40 PM
res: []startEndArr{},
},
{
name: "testing for more than one hour",
start: 1722255800000000000, // July 29, 2024 5:53:20 PM
end: 1722262800000000000, // July 29, 2024 8:06:40 PM
res: []startEndArr{
{1722259200000000000, 1722262800000000000}, // July 29, 2024 6:50:00 PM - July 29, 2024 7:50:00 PM
{1722255800000000000, 1722259200000000000}, // July 29, 2024 5:53:20 PM - July 29, 2024 6:50:00 PM
},
},
{
name: "testing for 1 day",
start: 1722171576000000000,
end: 1722262800000000000,
res: []startEndArr{
{1722259200000000000, 1722262800000000000}, // July 29, 2024 6:50:00 PM - July 29, 2024 7:50:00 PM
{1722252000000000000, 1722259200000000000}, // July 29, 2024 4:50:00 PM - July 29, 2024 6:50:00 PM
{1722237600000000000, 1722252000000000000}, // July 29, 2024 12:50:00 PM - July 29, 2024 4:50:00 PM
{1722208800000000000, 1722237600000000000}, // July 29, 2024 4:50:00 AM - July 29, 2024 12:50:00 PM
{1722171576000000000, 1722208800000000000}, // July 28, 2024 6:29:36 PM - July 29, 2024 4:50:00 AM
},
},
}

for _, test := range startEndData {
res := getTsStartEndArray(test.start, test.end)
for i, v := range res {
if test.res[i].Start != v.Start || test.res[i].End != v.End {
t.Errorf("expected range was %v - %v, got %v - %v", v.Start, v.End, test.res[i].Start, test.res[i].End)
}
}
}
}
@@ -437,11 +437,12 @@ var testLogsWithFormula = []struct {
},
},
ExpectedQuery: "SELECT A.`key_1` as `key_1`, A.`ts` as `ts`, A.value + B.value as value FROM " +
"(SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts, attributes_bool_value[indexOf(attributes_bool_key, 'key_1')] as `key_1`, toFloat64(count(*)) as value from " +
"signoz_logs.distributed_logs where (timestamp >= 1702979275000000000 AND timestamp <= 1702981075000000000) AND attributes_bool_value[indexOf(attributes_bool_key, 'key_1')] = true AND " +
"has(attributes_bool_key, 'key_1') group by `key_1`,ts order by value DESC) as A INNER JOIN (SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts, " +
"attributes_bool_value[indexOf(attributes_bool_key, 'key_1')] as `key_1`, toFloat64(count(*)) as value from signoz_logs.distributed_logs where (timestamp >= 1702979275000000000 AND timestamp <= 1702981075000000000) " +
"AND attributes_bool_value[indexOf(attributes_bool_key, 'key_2')] = true AND has(attributes_bool_key, 'key_1') group by `key_1`,ts order by value DESC) as B ON A.`key_1` = B.`key_1` AND A.`ts` = B.`ts`",
"(SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts, attributes_bool['key_1'] as `key_1`, toFloat64(count(*)) as value from " +
"signoz_logs.distributed_logs_v2 where (timestamp >= 1702979275000000000 AND timestamp <= 1702981075000000000) AND (ts_bucket_start >= 1702977475 AND ts_bucket_start <= 1702981075) " +
"AND attributes_bool['key_1'] = true AND mapContains(attributes_bool, 'key_1') AND mapContains(attributes_bool, 'key_1') group by `key_1`,ts order by value DESC) as A INNER JOIN (SELECT " +
"toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts, attributes_bool['key_1'] as `key_1`, toFloat64(count(*)) as value " +
"from signoz_logs.distributed_logs_v2 where (timestamp >= 1702979275000000000 AND timestamp <= 1702981075000000000) AND (ts_bucket_start >= 1702977475 AND ts_bucket_start <= 1702981075) " +
"AND attributes_bool['key_2'] = true AND mapContains(attributes_bool, 'key_2') AND mapContains(attributes_bool, 'key_1') group by `key_1`,ts order by value DESC) as B ON A.`key_1` = B.`key_1` AND A.`ts` = B.`ts`",
},
{
Name: "test formula with dot in filter and group by attribute",
@@ -497,11 +498,12 @@ var testLogsWithFormula = []struct {
},
},
},
ExpectedQuery: "SELECT A.`key1.1` as `key1.1`, A.`ts` as `ts`, A.value + B.value as value FROM (SELECT now() as ts, attributes_bool_value[indexOf(attributes_bool_key, 'key1.1')] as `key1.1`, " +
"toFloat64(count(*)) as value from signoz_logs.distributed_logs where (timestamp >= 1702979056000000000 AND timestamp <= 1702982656000000000) AND attributes_bool_value[indexOf(attributes_bool_key, 'key1.1')] = true AND " +
"has(attributes_bool_key, 'key1.1') group by `key1.1` order by value DESC) as A INNER JOIN (SELECT now() as ts, attributes_bool_value[indexOf(attributes_bool_key, 'key1.1')] as `key1.1`, " +
"toFloat64(count(*)) as value from signoz_logs.distributed_logs where (timestamp >= 1702979056000000000 AND timestamp <= 1702982656000000000) AND attributes_bool_value[indexOf(attributes_bool_key, 'key1.2')] = true AND " +
"has(attributes_bool_key, 'key1.1') group by `key1.1` order by value DESC) as B ON A.`key1.1` = B.`key1.1` AND A.`ts` = B.`ts`",
ExpectedQuery: "SELECT A.`key1.1` as `key1.1`, A.`ts` as `ts`, A.value + B.value as value FROM (SELECT now() as ts, attributes_bool['key1.1'] as `key1.1`, " +
"toFloat64(count(*)) as value from signoz_logs.distributed_logs_v2 where (timestamp >= 1702979056000000000 AND timestamp <= 1702982656000000000) AND (ts_bucket_start >= 1702977256 AND ts_bucket_start <= 1702982656) " +
"AND attributes_bool['key1.1'] = true AND mapContains(attributes_bool, 'key1.1') AND mapContains(attributes_bool, 'key1.1') group by `key1.1` order by value DESC) as A INNER JOIN (SELECT now() as ts, " +
"attributes_bool['key1.1'] as `key1.1`, toFloat64(count(*)) as value from signoz_logs.distributed_logs_v2 where (timestamp >= 1702979056000000000 AND timestamp <= 1702982656000000000) " +
"AND (ts_bucket_start >= 1702977256 AND ts_bucket_start <= 1702982656) AND attributes_bool['key1.2'] = true AND mapContains(attributes_bool, 'key1.2') AND " +
"mapContains(attributes_bool, 'key1.1') group by `key1.1` order by value DESC) as B ON A.`key1.1` = B.`key1.1` AND A.`ts` = B.`ts`",
},
{
Name: "test formula with dot in filter and group by materialized attribute",
@@ -558,10 +560,11 @@ var testLogsWithFormula = []struct {
},
},
ExpectedQuery: "SELECT A.`key1.1` as `key1.1`, A.`ts` as `ts`, A.value - B.value as value FROM (SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts, " +
"`attribute_bool_key1$$1` as `key1.1`, toFloat64(count(*)) as value from signoz_logs.distributed_logs where (timestamp >= 1702980884000000000 AND timestamp <= 1702984484000000000) AND " +
"`attribute_bool_key_2` = true AND `attribute_bool_key1$$1_exists`=true group by `key1.1`,ts order by value DESC) as A INNER JOIN (SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), " +
"INTERVAL 60 SECOND) AS ts, `attribute_bool_key1$$1` as `key1.1`, toFloat64(count(*)) as value from signoz_logs.distributed_logs where (timestamp >= 1702980884000000000 AND " +
"timestamp <= 1702984484000000000) AND attributes_bool_value[indexOf(attributes_bool_key, 'key_1')] = true AND `attribute_bool_key1$$1_exists`=true group by `key1.1`,ts order by value DESC) as B " +
"`attribute_bool_key1$$1` as `key1.1`, toFloat64(count(*)) as value from signoz_logs.distributed_logs_v2 where (timestamp >= 1702980884000000000 AND timestamp <= 1702984484000000000) AND " +
"(ts_bucket_start >= 1702979084 AND ts_bucket_start <= 1702984484) AND `attribute_bool_key_2` = true AND `attribute_bool_key1$$1_exists`=true group by `key1.1`,ts order by value DESC) as " +
"A INNER JOIN (SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts, `attribute_bool_key1$$1` as `key1.1`, toFloat64(count(*)) as value from " +
"signoz_logs.distributed_logs_v2 where (timestamp >= 1702980884000000000 AND timestamp <= 1702984484000000000) AND (ts_bucket_start >= 1702979084 AND ts_bucket_start <= 1702984484) AND " +
"attributes_bool['key_1'] = true AND mapContains(attributes_bool, 'key_1') AND `attribute_bool_key1$$1_exists`=true group by `key1.1`,ts order by value DESC) as B " +
"ON A.`key1.1` = B.`key1.1` AND A.`ts` = B.`ts`",
},
}
@@ -311,6 +311,12 @@ const (
"CAST((attributes_float64_key, attributes_float64_value), 'Map(String, Float64)') as attributes_float64," +
"CAST((attributes_bool_key, attributes_bool_value), 'Map(String, Bool)') as attributes_bool," +
"CAST((resources_string_key, resources_string_value), 'Map(String, String)') as resources_string "
LogsSQLSelectV2 = "SELECT " +
"timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, body," +
"attributes_string," +
"attributes_number," +
"attributes_bool," +
"resources_string "
TracesExplorerViewSQLSelectWithSubQuery = "WITH subQuery AS (SELECT distinct on (traceID) traceID, durationNano, " +
"serviceName, name FROM %s.%s WHERE parentSpanID = '' AND %s %s ORDER BY durationNano DESC "
TracesExplorerViewSQLSelectQuery = "SELECT subQuery.serviceName, subQuery.name, count() AS " +
@@ -375,6 +381,12 @@ var StaticFieldsLogsV3 = map[string]v3.AttributeKey{
Type: v3.AttributeKeyTypeUnspecified,
IsColumn: true,
},
"__attrs": {
Key: "__attrs",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeUnspecified,
IsColumn: true,
},
}

const SigNozOrderByValue = "#SIGNOZ_VALUE"
@@ -14,6 +14,9 @@ import (

// ValidateAndCastValue validates and casts the value of a key to the corresponding data type of the key
func ValidateAndCastValue(v interface{}, dataType v3.AttributeKeyDataType) (interface{}, error) {
// if it's pointer convert it to a value
v = getPointerValue(v)

switch dataType {
case v3.AttributeKeyDataTypeString:
switch x := v.(type) {
@@ -244,10 +247,16 @@ func GetClickhouseColumnName(typeName string, dataType, field string) string {
typeName = typeName[:len(typeName)-1]
}

dataType = strings.ToLower(dataType)

if dataType == "int64" || dataType == "float64" {
dataType = "number"
}

// if name contains . replace it with `$$`
field = strings.ReplaceAll(field, ".", "$$")

colName := fmt.Sprintf("`%s_%s_%s`", strings.ToLower(typeName), strings.ToLower(dataType), field)
colName := fmt.Sprintf("`%s_%s_%s`", strings.ToLower(typeName), dataType, field)
return colName
}
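`GetClickhouseColumnName` now lowercases the data type once, folds int64 and float64 into a single number bucket, and composes a backtick-quoted materialized column name. A small sketch of that normalization follows; the real helper also trims a trailing 's' from the type name, and the field name here is illustrative.

```go
package main

import (
	"fmt"
	"strings"
)

// columnName is a simplified version of the normalization described above.
func columnName(typeName, dataType, field string) string {
	dataType = strings.ToLower(dataType)
	if dataType == "int64" || dataType == "float64" {
		dataType = "number"
	}
	// dots in the field name are replaced with $$
	field = strings.ReplaceAll(field, ".", "$$")
	return fmt.Sprintf("`%s_%s_%s`", strings.ToLower(typeName), dataType, field)
}

func main() {
	fmt.Println(columnName("attribute", "Int64", "http.status_code")) // `attribute_number_http$$status_code`
}
```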
@@ -419,28 +419,28 @@ var testGetClickhouseColumnName = []struct {
typeName: string(v3.AttributeKeyTypeTag),
dataType: string(v3.AttributeKeyDataTypeInt64),
field: "tag1",
want: "attribute_int64_tag1",
want: "`attribute_int64_tag1`",
},
{
name: "resource",
typeName: string(v3.AttributeKeyTypeResource),
dataType: string(v3.AttributeKeyDataTypeInt64),
field: "tag1",
want: "resource_int64_tag1",
want: "`resource_int64_tag1`",
},
{
name: "attribute old parser",
typeName: constants.Attributes,
dataType: string(v3.AttributeKeyDataTypeInt64),
field: "tag1",
want: "attribute_int64_tag1",
want: "`attribute_int64_tag1`",
},
{
name: "resource old parser",
typeName: constants.Resources,
dataType: string(v3.AttributeKeyDataTypeInt64),
field: "tag1",
want: "resource_int64_tag1",
want: "`resource_int64_tag1`",
},
}