fix: added ttl for logs_attribute_keys, logs_resource_keys and span_attributes_keys (#9545)
* fix: added ttl for logs_attribute_keys, logs_resource_keys and span_attributes_keys * fix: table name consitent * fix: table name Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * fix: typo Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * fix: ttl query for retention --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
@@ -1293,7 +1293,12 @@ func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params
|
||||
coldStorageDuration = int(params.ToColdStorageDuration)
|
||||
}
|
||||
|
||||
tableNameArray := []string{r.logsDB + "." + r.logsLocalTableV2, r.logsDB + "." + r.logsResourceLocalTableV2}
|
||||
tableNameArray := []string{
|
||||
r.logsDB + "." + r.logsLocalTableV2,
|
||||
r.logsDB + "." + r.logsResourceLocalTableV2,
|
||||
getLocalTableName(r.logsDB + "." + r.logsAttributeKeys),
|
||||
getLocalTableName(r.logsDB + "." + r.logsResourceKeys),
|
||||
}
|
||||
|
||||
// check if there is existing things to be done
|
||||
for _, tableName := range tableNameArray {
|
||||
@@ -1327,9 +1332,19 @@ func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params
|
||||
params.ToColdStorageDuration, params.ColdStorageVolume)
|
||||
}
|
||||
|
||||
ttlLogsV2AttributeKeys := fmt.Sprintf(
|
||||
"ALTER TABLE %v ON CLUSTER %s MODIFY TTL timestamp + "+
|
||||
"INTERVAL %v SECOND DELETE", tableNameArray[2], r.cluster, params.DelDuration)
|
||||
|
||||
ttlLogsV2ResourceKeys := fmt.Sprintf(
|
||||
"ALTER TABLE %v ON CLUSTER %s MODIFY TTL timestamp + "+
|
||||
"INTERVAL %v SECOND DELETE", tableNameArray[3], r.cluster, params.DelDuration)
|
||||
|
||||
ttlPayload := map[string]string{
|
||||
tableNameArray[0]: ttlLogsV2,
|
||||
tableNameArray[1]: ttlLogsV2Resource,
|
||||
tableNameArray[2]: ttlLogsV2AttributeKeys,
|
||||
tableNameArray[3]: ttlLogsV2ResourceKeys,
|
||||
}
|
||||
|
||||
// set the ttl if nothing is pending/ no errors
|
||||
@@ -1435,6 +1450,7 @@ func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, param
|
||||
r.TraceDB + "." + signozUsageExplorerTable,
|
||||
r.TraceDB + "." + defaultDependencyGraphTable,
|
||||
r.TraceDB + "." + r.traceSummaryTable,
|
||||
r.TraceDB + "." + r.spanAttributesKeysTable,
|
||||
}
|
||||
|
||||
coldStorageDuration := -1
|
||||
@@ -1502,7 +1518,7 @@ func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, param
|
||||
req = fmt.Sprintf(ttlV2Resource, tableName, r.cluster, params.DelDuration)
|
||||
}
|
||||
|
||||
if len(params.ColdStorageVolume) > 0 {
|
||||
if len(params.ColdStorageVolume) > 0 && !strings.HasSuffix(distributedTableName, r.spanAttributesKeysTable) {
|
||||
if strings.HasSuffix(distributedTableName, r.traceResourceTableV3) {
|
||||
req += fmt.Sprintf(ttlTracesV2ResourceColdStorage, params.ToColdStorageDuration, params.ColdStorageVolume)
|
||||
} else {
|
||||
@@ -1649,6 +1665,8 @@ func (r *ClickHouseReader) SetTTLV2(ctx context.Context, orgID string, params *m
|
||||
tableNames := []string{
|
||||
r.logsDB + "." + r.logsLocalTableV2,
|
||||
r.logsDB + "." + r.logsResourceLocalTableV2,
|
||||
getLocalTableName(r.logsDB + "." + r.logsAttributeKeys),
|
||||
getLocalTableName(r.logsDB + "." + r.logsResourceKeys),
|
||||
}
|
||||
|
||||
for _, tableName := range tableNames {
|
||||
@@ -1696,6 +1714,23 @@ func (r *ClickHouseReader) SetTTLV2(ctx context.Context, orgID string, params *m
|
||||
|
||||
ttlPayload[tableNames[1]] = resourceQueries
|
||||
|
||||
// NOTE: Since logs support custom rule based retention, that makes it difficult to identify which attributes, resource keys
|
||||
// we need to keep, hence choosing MAX for safe side and not to create any complex solution for this.
|
||||
maxRetentionTTL := params.DefaultTTLDays
|
||||
for _, rule := range params.TTLConditions {
|
||||
maxRetentionTTL = max(maxRetentionTTL, rule.TTLDays)
|
||||
}
|
||||
|
||||
ttlPayload[tableNames[2]] = []string{
|
||||
fmt.Sprintf("ALTER TABLE %s ON CLUSTER %s MODIFY TTL timestamp + toIntervalDay(%d) DELETE SETTINGS materialize_ttl_after_modify=0",
|
||||
tableNames[2], r.cluster, maxRetentionTTL),
|
||||
}
|
||||
|
||||
ttlPayload[tableNames[3]] = []string{
|
||||
fmt.Sprintf("ALTER TABLE %s ON CLUSTER %s MODIFY TTL timestamp + toIntervalDay(%d) DELETE SETTINGS materialize_ttl_after_modify=0",
|
||||
tableNames[3], r.cluster, maxRetentionTTL),
|
||||
}
|
||||
|
||||
ttlConditionsJSON, err := json.Marshal(params.TTLConditions)
|
||||
if err != nil {
|
||||
return nil, errorsV2.Wrapf(err, errorsV2.TypeInternal, errorsV2.CodeInternal, "error marshalling TTL condition")
|
||||
|
||||
@@ -60,6 +60,6 @@ def pytest_addoption(parser: pytest.Parser):
|
||||
parser.addoption(
|
||||
"--schema-migrator-version",
|
||||
action="store",
|
||||
default="v0.129.6",
|
||||
default="v0.129.7",
|
||||
help="schema migrator version",
|
||||
)
|
||||
|
||||
@@ -37,7 +37,7 @@ def test_set_ttl_traces_success(
|
||||
}
|
||||
|
||||
headers = {
|
||||
"Authorization": f"Bearer {get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)}"
|
||||
"Authorization": f"Bearer {get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)}"
|
||||
}
|
||||
|
||||
response = requests.post(
|
||||
@@ -64,6 +64,7 @@ def test_set_ttl_traces_success(
|
||||
"usage_explorer",
|
||||
"dependency_graph_minutes_v2",
|
||||
"trace_summary",
|
||||
"span_attributes_keys",
|
||||
]
|
||||
|
||||
# Query to get table engine info which includes TTL
|
||||
@@ -92,7 +93,7 @@ def test_set_ttl_traces_with_cold_storage(signoz: types.SigNoz, get_token: Calla
|
||||
}
|
||||
|
||||
headers = {
|
||||
"Authorization": f"Bearer {get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)}"
|
||||
"Authorization": f"Bearer {get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)}"
|
||||
}
|
||||
|
||||
response = requests.post(
|
||||
@@ -120,7 +121,7 @@ def test_set_ttl_metrics_success(
|
||||
}
|
||||
|
||||
headers = {
|
||||
"Authorization": f"Bearer {get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)}"
|
||||
"Authorization": f"Bearer {get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)}"
|
||||
}
|
||||
|
||||
response = requests.post(
|
||||
@@ -179,7 +180,7 @@ def test_set_ttl_metrics_with_cold_storage(
|
||||
}
|
||||
|
||||
headers = {
|
||||
"Authorization": f"Bearer {get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)}"
|
||||
"Authorization": f"Bearer {get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)}"
|
||||
}
|
||||
|
||||
response = requests.post(
|
||||
@@ -207,7 +208,7 @@ def test_set_ttl_invalid_type(
|
||||
}
|
||||
|
||||
headers = {
|
||||
"Authorization": f"Bearer {get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)}"
|
||||
"Authorization": f"Bearer {get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)}"
|
||||
}
|
||||
|
||||
response = requests.post(
|
||||
@@ -233,7 +234,7 @@ def test_set_custom_retention_ttl_basic(
|
||||
}
|
||||
|
||||
headers = {
|
||||
"Authorization": f"Bearer {get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)}"
|
||||
"Authorization": f"Bearer {get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)}"
|
||||
}
|
||||
|
||||
response = requests.post(
|
||||
@@ -291,6 +292,26 @@ def test_set_custom_retention_ttl_basic(
|
||||
retention_col[3] == "100"
|
||||
), f"Expected default value of _retention_days to be 100 in table {table}, but got {retention_col[3]}"
|
||||
|
||||
tables_to_check = [
|
||||
"logs_attribute_keys",
|
||||
"logs_resource_keys"
|
||||
]
|
||||
|
||||
# Query to get table engine info which includes TTL
|
||||
table_list = "', '".join(tables_to_check)
|
||||
query = f"SELECT engine_full FROM system.tables WHERE table in ['{table_list}']"
|
||||
result = signoz.telemetrystore.conn.query(query).result_rows
|
||||
|
||||
# Verify TTL exists in all table definitions
|
||||
assert all("TTL" in r[0] for r in result)
|
||||
|
||||
assert all(" SETTINGS" in r[0] for r in result)
|
||||
|
||||
ttl_parts = [r[0].split("TTL ")[1].split(" SETTINGS")[0] for r in result]
|
||||
|
||||
# Also verify the TTL parts contain retention_days
|
||||
assert all("toIntervalDay(100)" in ttl_part for ttl_part in ttl_parts)
|
||||
|
||||
|
||||
def test_set_custom_retention_ttl_basic_fallback(
|
||||
signoz: types.SigNoz,
|
||||
@@ -309,7 +330,7 @@ def test_set_custom_retention_ttl_basic_fallback(
|
||||
}
|
||||
|
||||
headers = {
|
||||
"Authorization": f"Bearer {get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)}"
|
||||
"Authorization": f"Bearer {get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)}"
|
||||
}
|
||||
|
||||
response = requests.post(
|
||||
@@ -332,6 +353,8 @@ def test_set_custom_retention_ttl_basic_fallback(
|
||||
tables_to_check = [
|
||||
"logs_v2",
|
||||
"logs_v2_resource",
|
||||
"logs_attribute_keys",
|
||||
"logs_resource_keys"
|
||||
]
|
||||
|
||||
# Query to get table engine info which includes TTL
|
||||
@@ -364,7 +387,7 @@ def test_set_custom_retention_ttl_basic_101_times(signoz: types.SigNoz, get_toke
|
||||
}
|
||||
|
||||
headers = {
|
||||
"Authorization": f"Bearer {get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)}"
|
||||
"Authorization": f"Bearer {get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)}"
|
||||
}
|
||||
|
||||
response = requests.post(
|
||||
@@ -440,7 +463,7 @@ def test_set_custom_retention_ttl_with_conditions(
|
||||
}
|
||||
|
||||
headers = {
|
||||
"Authorization": f"Bearer {get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)}"
|
||||
"Authorization": f"Bearer {get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)}"
|
||||
}
|
||||
|
||||
response = requests.post(
|
||||
@@ -494,7 +517,7 @@ def test_set_custom_retention_ttl_with_cold_storage(
|
||||
insert_logs(logs)
|
||||
|
||||
headers = {
|
||||
"Authorization": f"Bearer {get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)}"
|
||||
"Authorization": f"Bearer {get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)}"
|
||||
}
|
||||
|
||||
response = requests.post(
|
||||
@@ -538,7 +561,7 @@ def test_set_custom_retention_ttl_duplicate_conditions(
|
||||
}
|
||||
|
||||
headers = {
|
||||
"Authorization": f"Bearer {get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)}"
|
||||
"Authorization": f"Bearer {get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)}"
|
||||
}
|
||||
|
||||
response = requests.post(
|
||||
@@ -575,7 +598,7 @@ def test_set_custom_retention_ttl_invalid_condition(
|
||||
}
|
||||
|
||||
headers = {
|
||||
"Authorization": f"Bearer {get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)}"
|
||||
"Authorization": f"Bearer {get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)}"
|
||||
}
|
||||
|
||||
response = requests.post(
|
||||
@@ -614,7 +637,7 @@ def test_get_custom_retention_ttl(
|
||||
insert_logs(logs)
|
||||
|
||||
headers = {
|
||||
"Authorization": f"Bearer {get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)}"
|
||||
"Authorization": f"Bearer {get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)}"
|
||||
}
|
||||
set_response = requests.post(
|
||||
signoz.self.host_configs["8080"].get("/api/v2/settings/ttl"),
|
||||
@@ -629,7 +652,7 @@ def test_get_custom_retention_ttl(
|
||||
|
||||
# Now get the TTL configuration
|
||||
headers = {
|
||||
"Authorization": f"Bearer {get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)}"
|
||||
"Authorization": f"Bearer {get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)}"
|
||||
}
|
||||
|
||||
get_response = requests.get(
|
||||
@@ -666,7 +689,7 @@ def test_set_ttl_logs_success(
|
||||
}
|
||||
|
||||
headers = {
|
||||
"Authorization": f"Bearer {get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)}"
|
||||
"Authorization": f"Bearer {get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)}"
|
||||
}
|
||||
|
||||
response = requests.post(
|
||||
@@ -683,7 +706,7 @@ def test_set_ttl_logs_success(
|
||||
|
||||
# Verify TTL settings in Clickhouse
|
||||
# Allow some time for the TTL to be applied
|
||||
time.sleep(2)
|
||||
time.sleep(5)
|
||||
|
||||
# Check TTL settings on relevant logs tables
|
||||
tables_to_check = [
|
||||
@@ -719,7 +742,7 @@ def test_get_ttl_traces_success(
|
||||
}
|
||||
|
||||
headers = {
|
||||
"Authorization": f"Bearer {get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)}"
|
||||
"Authorization": f"Bearer {get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)}"
|
||||
}
|
||||
|
||||
set_response = requests.post(
|
||||
@@ -786,7 +809,7 @@ def test_large_ttl_conditions_list(
|
||||
}
|
||||
|
||||
headers = {
|
||||
"Authorization": f"Bearer {get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)}"
|
||||
"Authorization": f"Bearer {get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)}"
|
||||
}
|
||||
|
||||
response = requests.post(
|
||||
|
||||
Reference in New Issue
Block a user