Compare commits

...

4 Commits

Author SHA1 Message Date
Tushar Vats
616481d426 fix: fixed table names and clickhouse queries 2025-10-17 17:46:28 +05:30
Tushar Vats
6765f049a2 feat: updated integration tests 2025-10-17 17:46:28 +05:30
Tushar Vats
206a896f93 feat: fix table names 2025-10-17 17:45:28 +05:30
Tushar Vats
406cad5be5 feat: added custom retention for attributes 2025-10-17 17:45:28 +05:30
2 changed files with 167 additions and 185 deletions

View File

@@ -1428,23 +1428,67 @@ func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, param
// uuid is used as transaction id
uuidWithHyphen := uuid.New()
uuid := strings.Replace(uuidWithHyphen.String(), "-", "", -1)
tableNames := []string{
r.TraceDB + "." + r.traceTableName,
r.TraceDB + "." + r.traceResourceTableV3,
r.TraceDB + "." + signozErrorIndexTable,
r.TraceDB + "." + signozUsageExplorerTable,
r.TraceDB + "." + defaultDependencyGraphTable,
r.TraceDB + "." + r.traceSummaryTable,
}
coldStorageDuration := -1
if len(params.ColdStorageVolume) > 0 {
coldStorageDuration = int(params.ToColdStorageDuration)
}
type ttlConfig struct {
TTLQuery string
TTLColumn string
ColdStorageQuery string
}
// TTL query
ttlV2 := "ALTER TABLE %s ON CLUSTER %s MODIFY TTL toDateTime(%s) + INTERVAL " + fmt.Sprint(params.DelDuration) + " SECOND DELETE"
ttlV2ColdStorage := ", toDateTime(%s) + INTERVAL " + fmt.Sprint(params.ToColdStorageDuration) + " SECOND TO VOLUME '%s'"
// TTL query for resource table
ttlV2Resource := "ALTER TABLE %s ON CLUSTER %s MODIFY TTL toDateTime(%s) + toIntervalSecond(1800) + INTERVAL " + fmt.Sprint(params.DelDuration) + " SECOND DELETE"
ttlTracesV2ResourceColdStorage := ", toDateTime(%s) + toIntervalSecond(1800) + INTERVAL " + fmt.Sprint(params.ToColdStorageDuration) + " SECOND TO VOLUME '%s'"
traceTTLConfigs := map[string]ttlConfig{
r.TraceDB + "." + r.traceTableName: {
TTLQuery: ttlV2,
TTLColumn: "timestamp",
ColdStorageQuery: ttlV2ColdStorage,
},
r.TraceDB + "." + r.traceResourceTableV3: {
TTLQuery: ttlV2Resource,
TTLColumn: "seen_at_ts_bucket_start",
ColdStorageQuery: ttlTracesV2ResourceColdStorage,
},
r.TraceDB + "." + signozErrorIndexTable: {
TTLQuery: ttlV2,
TTLColumn: "timestamp",
ColdStorageQuery: ttlV2ColdStorage,
},
r.TraceDB + "." + signozUsageExplorerTable: {
TTLQuery: ttlV2,
TTLColumn: "timestamp",
ColdStorageQuery: ttlV2ColdStorage,
},
r.TraceDB + "." + defaultDependencyGraphTable: {
TTLQuery: ttlV2,
TTLColumn: "timestamp",
ColdStorageQuery: ttlV2ColdStorage,
},
r.TraceDB + "." + r.traceSummaryTable: {
TTLQuery: ttlV2,
TTLColumn: "end",
ColdStorageQuery: ttlV2ColdStorage,
},
r.TraceDB + "." + r.spanAttributesKeysTable: {
TTLQuery: ttlV2,
TTLColumn: "timestamp",
},
}
// check if there are existing TTL operations still pending for any of these tables
for _, tableName := range tableNames {
statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName)
for tableName := range traceTTLConfigs {
localTableName := getLocalTableName(tableName)
statusItem, err := r.checkTTLStatusItem(ctx, orgID, localTableName)
if err != nil {
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")}
}
@@ -1453,23 +1497,14 @@ func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, param
}
}
// TTL query
ttlV2 := "ALTER TABLE %s ON CLUSTER %s MODIFY TTL toDateTime(%s) + INTERVAL %v SECOND DELETE"
ttlV2ColdStorage := ", toDateTime(%s) + INTERVAL %v SECOND TO VOLUME '%s'"
// TTL query for resource table
ttlV2Resource := "ALTER TABLE %s ON CLUSTER %s MODIFY TTL toDateTime(seen_at_ts_bucket_start) + toIntervalSecond(1800) + INTERVAL %v SECOND DELETE"
ttlTracesV2ResourceColdStorage := ", toDateTime(seen_at_ts_bucket_start) + toIntervalSecond(1800) + INTERVAL %v SECOND TO VOLUME '%s'"
for _, distributedTableName := range tableNames {
for distributedTableName := range traceTTLConfigs {
go func(distributedTableName string) {
tableName := getLocalTableName(distributedTableName)
// for trace summary table, we need to use end instead of timestamp
timestamp := "timestamp"
if strings.HasSuffix(distributedTableName, r.traceSummaryTable) {
timestamp = "end"
}
timestamp := traceTTLConfigs[distributedTableName].TTLColumn
ttlV2 := traceTTLConfigs[distributedTableName].TTLQuery
ttlV2ColdStorage := traceTTLConfigs[distributedTableName].ColdStorageQuery
ttl := types.TTLSetting{
Identifiable: types.Identifiable{
@@ -1486,88 +1521,39 @@ func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, param
ColdStorageTTL: coldStorageDuration,
OrgID: orgID,
}
_, dbErr := r.
sqlDB.
BunDB().
NewInsert().
Model(&ttl).
Exec(ctx)
_, dbErr := r.sqlDB.BunDB().NewInsert().Model(&ttl).Exec(ctx)
if dbErr != nil {
zap.L().Error("error in inserting to ttl_status table", zap.Error(dbErr))
return
}
req := fmt.Sprintf(ttlV2, tableName, r.cluster, timestamp, params.DelDuration)
if strings.HasSuffix(distributedTableName, r.traceResourceTableV3) {
req = fmt.Sprintf(ttlV2Resource, tableName, r.cluster, params.DelDuration)
}
req := fmt.Sprintf(ttlV2, tableName, r.cluster, timestamp)
if len(params.ColdStorageVolume) > 0 {
if strings.HasSuffix(distributedTableName, r.traceResourceTableV3) {
req += fmt.Sprintf(ttlTracesV2ResourceColdStorage, params.ToColdStorageDuration, params.ColdStorageVolume)
} else {
req += fmt.Sprintf(ttlV2ColdStorage, timestamp, params.ToColdStorageDuration, params.ColdStorageVolume)
if len(params.ColdStorageVolume) > 0 && len(ttlV2ColdStorage) > 0 {
req += fmt.Sprintf(ttlV2ColdStorage, timestamp, params.ColdStorageVolume)
// set the cold storage volume if not already set
err := r.setColdStorage(context.Background(), tableName, params.ColdStorageVolume)
if err != nil {
zap.L().Error("Error in setting cold storage", zap.Error(err))
r.updateCustomRetentionTTLStatus(ctx, orgID, tableName, constants.StatusFailed)
return
}
}
err := r.setColdStorage(context.Background(), tableName, params.ColdStorageVolume)
if err != nil {
zap.L().Error("Error in setting cold storage", zap.Error(err))
statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName)
if err == nil {
_, dbErr := r.
sqlDB.
BunDB().
NewUpdate().
Model(new(types.TTLSetting)).
Set("updated_at = ?", time.Now()).
Set("status = ?", constants.StatusFailed).
Where("id = ?", statusItem.ID.StringValue()).
Exec(ctx)
if dbErr != nil {
zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
return
}
}
return
}
req += " SETTINGS materialize_ttl_after_modify=0;"
zap.L().Error(" ExecutingTTL request: ", zap.String("request", req))
statusItem, _ := r.checkTTLStatusItem(ctx, orgID, tableName)
if err := r.db.Exec(ctx, req); err != nil {
zap.L().Error("Error in executing set TTL query", zap.Error(err))
_, dbErr := r.
sqlDB.
BunDB().
NewUpdate().
Model(new(types.TTLSetting)).
Set("updated_at = ?", time.Now()).
Set("status = ?", constants.StatusFailed).
Where("id = ?", statusItem.ID.StringValue()).
Exec(ctx)
if dbErr != nil {
zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
return
}
return
}
_, dbErr = r.
sqlDB.
BunDB().
NewUpdate().
Model(new(types.TTLSetting)).
Set("updated_at = ?", time.Now()).
Set("status = ?", constants.StatusSuccess).
Where("id = ?", statusItem.ID.StringValue()).
Exec(ctx)
if dbErr != nil {
zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
r.updateCustomRetentionTTLStatus(ctx, orgID, tableName, constants.StatusFailed)
return
}
r.updateCustomRetentionTTLStatus(ctx, orgID, tableName, constants.StatusSuccess)
}(distributedTableName)
}
return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil
}
func (r *ClickHouseReader) hasCustomRetentionColumn(ctx context.Context) (bool, error) {
// Directly query for the _retention_days column existence
query := fmt.Sprintf("SELECT 1 FROM system.columns WHERE database = '%s' AND table = '%s' AND name = '_retention_days' LIMIT 1", r.logsDB, r.logsLocalTableV2)
@@ -1646,13 +1632,42 @@ func (r *ClickHouseReader) SetTTLV2(ctx context.Context, orgID string, params *m
coldStorageDuration = int(params.ToColdStorageDuration) // Already in days
}
tableNames := []string{
r.logsDB + "." + r.logsLocalTableV2,
r.logsDB + "." + r.logsResourceLocalTableV2,
multiIfExpr := r.buildMultiIfExpression(params.TTLConditions, params.DefaultTTLDays, false)
resourceMultiIfExpr := r.buildMultiIfExpression(params.TTLConditions, params.DefaultTTLDays, true)
type ttlConfig struct {
CustomRetentionQuery string
TTLQuery string
ColdStorageQuery string
}
for _, tableName := range tableNames {
statusItem, err := r.checkCustomRetentionTTLStatusItem(ctx, orgID, tableName)
maxTTLDays := params.DefaultTTLDays
for _, condition := range params.TTLConditions {
maxTTLDays = max(maxTTLDays, condition.TTLDays)
}
logsTTLConfigs := map[string]ttlConfig{
r.logsDB + "." + r.logsTableV2: {
CustomRetentionQuery: "ALTER TABLE %s ON CLUSTER %s MODIFY COLUMN _retention_days UInt16 DEFAULT " + multiIfExpr,
TTLQuery: "ALTER TABLE %v ON CLUSTER %s MODIFY TTL toDateTime(toUInt32(timestamp / 1000), 'UTC') + INTERVAL " + fmt.Sprint(params.DefaultTTLDays) + " DAY DELETE",
ColdStorageQuery: ", toDateTime(toUInt32(timestamp / 1000), 'UTC') + INTERVAL " + fmt.Sprint(params.ToColdStorageDuration) + " DAY TO VOLUME '%s'",
},
r.logsDB + "." + r.logsResourceTableV2: {
CustomRetentionQuery: "ALTER TABLE %s ON CLUSTER %s MODIFY COLUMN _retention_days UInt16 DEFAULT " + resourceMultiIfExpr,
TTLQuery: "ALTER TABLE %v ON CLUSTER %s MODIFY TTL toDateTime(toUInt32(seen_at_ts_bucket_start), 'UTC') + INTERVAL 1800 SECOND + INTERVAL " + fmt.Sprint(params.DefaultTTLDays) + " DAY DELETE",
ColdStorageQuery: ", toDateTime(toUInt32(seen_at_ts_bucket_start), 'UTC') + INTERVAL 1800 SECOND + INTERVAL " + fmt.Sprint(params.ToColdStorageDuration) + " DAY TO VOLUME '%s'",
},
r.logsDB + "." + r.logsAttributeKeys: {
TTLQuery: "ALTER TABLE %v ON CLUSTER %s MODIFY TTL timestamp + INTERVAL " + fmt.Sprint(maxTTLDays) + " DAY DELETE",
},
r.logsDB + "." + r.logsResourceKeys: {
TTLQuery: "ALTER TABLE %v ON CLUSTER %s MODIFY TTL timestamp + INTERVAL " + fmt.Sprint(maxTTLDays) + " DAY DELETE",
},
}
for tableName := range logsTTLConfigs {
localTableName := getLocalTableName(tableName)
statusItem, err := r.checkCustomRetentionTTLStatusItem(ctx, orgID, localTableName)
if err != nil {
return nil, errorsV2.Newf(errorsV2.TypeInternal, errorsV2.CodeInternal, "error in processing custom_retention_ttl_status check sql query")
}
@@ -1661,47 +1676,14 @@ func (r *ClickHouseReader) SetTTLV2(ctx context.Context, orgID string, params *m
}
}
multiIfExpr := r.buildMultiIfExpression(params.TTLConditions, params.DefaultTTLDays, false)
resourceMultiIfExpr := r.buildMultiIfExpression(params.TTLConditions, params.DefaultTTLDays, true)
ttlPayload := make(map[string][]string)
queries := []string{
fmt.Sprintf(`ALTER TABLE %s ON CLUSTER %s MODIFY COLUMN _retention_days UInt16 DEFAULT %s`,
tableNames[0], r.cluster, multiIfExpr),
}
if len(params.ColdStorageVolume) > 0 && coldStorageDuration > 0 {
queries = append(queries, fmt.Sprintf(`ALTER TABLE %s ON CLUSTER %s MODIFY COLUMN _retention_days_cold UInt16 DEFAULT %d`,
tableNames[0], r.cluster, coldStorageDuration))
queries = append(queries, fmt.Sprintf(`ALTER TABLE %s ON CLUSTER %s MODIFY TTL toDateTime(timestamp / 1000000000) + toIntervalDay(_retention_days) DELETE, toDateTime(timestamp / 1000000000) + toIntervalDay(_retention_days_cold) TO VOLUME '%s' SETTINGS materialize_ttl_after_modify=0`,
tableNames[0], r.cluster, params.ColdStorageVolume))
}
ttlPayload[tableNames[0]] = queries
resourceQueries := []string{
fmt.Sprintf(`ALTER TABLE %s ON CLUSTER %s MODIFY COLUMN _retention_days UInt16 DEFAULT %s`,
tableNames[1], r.cluster, resourceMultiIfExpr),
}
if len(params.ColdStorageVolume) > 0 && coldStorageDuration > 0 {
resourceQueries = append(resourceQueries, fmt.Sprintf(`ALTER TABLE %s ON CLUSTER %s MODIFY COLUMN _retention_days_cold UInt16 DEFAULT %d`,
tableNames[1], r.cluster, coldStorageDuration))
resourceQueries = append(resourceQueries, fmt.Sprintf(`ALTER TABLE %s ON CLUSTER %s MODIFY TTL toDateTime(seen_at_ts_bucket_start) + toIntervalSecond(1800) + toIntervalDay(_retention_days) DELETE, toDateTime(seen_at_ts_bucket_start) + toIntervalSecond(1800) + toIntervalDay(_retention_days_cold) TO VOLUME '%s' SETTINGS materialize_ttl_after_modify=0`,
tableNames[1], r.cluster, params.ColdStorageVolume))
}
ttlPayload[tableNames[1]] = resourceQueries
ttlConditionsJSON, err := json.Marshal(params.TTLConditions)
if err != nil {
return nil, errorsV2.Wrapf(err, errorsV2.TypeInternal, errorsV2.CodeInternal, "error marshalling TTL condition")
}
for tableName, queries := range ttlPayload {
for tableName, ttlConfig := range logsTTLConfigs {
localTableName := getLocalTableName(tableName)
customTTL := types.TTLSetting{
Identifiable: types.Identifiable{
ID: valuer.GenerateUUID(),
@@ -1711,7 +1693,7 @@ func (r *ClickHouseReader) SetTTLV2(ctx context.Context, orgID string, params *m
UpdatedAt: time.Now(),
},
TransactionID: uuid,
TableName: tableName,
TableName: localTableName,
TTL: params.DefaultTTLDays,
Condition: string(ttlConditionsJSON),
Status: constants.StatusPending,
@@ -1726,25 +1708,41 @@ func (r *ClickHouseReader) SetTTLV2(ctx context.Context, orgID string, params *m
return nil, errorsV2.Wrapf(dbErr, errorsV2.TypeInternal, errorsV2.CodeInternal, "error inserting TTL settings")
}
if len(params.ColdStorageVolume) > 0 && coldStorageDuration > 0 {
err := r.setColdStorage(ctx, tableName, params.ColdStorageVolume)
// Enable custom retention column with default value
if ttlConfig.CustomRetentionQuery != "" {
customRetentionQuery := fmt.Sprintf(ttlConfig.CustomRetentionQuery, localTableName, r.cluster)
zap.L().Debug("Executing custom retention TTL request: ", zap.String("request", customRetentionQuery))
if err := r.db.Exec(ctx, customRetentionQuery); err != nil {
zap.L().Error("error while setting custom retention ttl", zap.Error(err))
r.updateCustomRetentionTTLStatus(ctx, orgID, localTableName, constants.StatusFailed)
return nil, errorsV2.Wrapf(err, errorsV2.TypeInternal, errorsV2.CodeInternal, "error setting custom retention TTL for table %s, query: %s", tableName, ttlConfig.CustomRetentionQuery)
}
}
// Set TTL based on custom retention column
ttlQuery := fmt.Sprintf(ttlConfig.TTLQuery, localTableName, r.cluster)
if len(params.ColdStorageVolume) > 0 && coldStorageDuration > 0 && ttlConfig.ColdStorageQuery != "" {
ttlQuery += fmt.Sprintf(ttlConfig.ColdStorageQuery, localTableName, r.cluster, params.ColdStorageVolume)
// Enable cold storage volume
err := r.setColdStorage(ctx, localTableName, params.ColdStorageVolume)
if err != nil {
zap.L().Error("error in setting cold storage", zap.Error(err))
r.updateCustomRetentionTTLStatus(ctx, orgID, tableName, constants.StatusFailed)
r.updateCustomRetentionTTLStatus(ctx, orgID, localTableName, constants.StatusFailed)
return nil, errorsV2.Wrapf(err.Err, errorsV2.TypeInternal, errorsV2.CodeInternal, "error setting cold storage for table %s", tableName)
}
}
for i, query := range queries {
zap.L().Debug("Executing custom retention TTL request: ", zap.String("request", query), zap.Int("step", i+1))
if err := r.db.Exec(ctx, query); err != nil {
zap.L().Error("error while setting custom retention ttl", zap.Error(err))
r.updateCustomRetentionTTLStatus(ctx, orgID, tableName, constants.StatusFailed)
return nil, errorsV2.Wrapf(err, errorsV2.TypeInternal, errorsV2.CodeInternal, "error setting custom retention TTL for table %s, query: %s", tableName, query)
}
}
ttlQuery += " SETTINGS materialize_ttl_after_modify=0"
zap.L().Info("Executing TTL request: ", zap.String("request", ttlQuery))
if err := r.db.Exec(ctx, ttlQuery); err != nil {
zap.L().Error("error while setting ttl.", zap.Error(err))
r.updateCustomRetentionTTLStatus(ctx, orgID, localTableName, constants.StatusFailed)
return nil, errorsV2.Wrapf(err, errorsV2.TypeInternal, errorsV2.CodeInternal, "error setting TTL for table %s, query: %s", tableName, ttlQuery)
}
r.updateCustomRetentionTTLStatus(ctx, orgID, localTableName, constants.StatusSuccess)
r.updateCustomRetentionTTLStatus(ctx, orgID, tableName, constants.StatusSuccess)
}
return &model.CustomRetentionTTLResponse{
@@ -2066,6 +2064,7 @@ func (r *ClickHouseReader) setTTLMetrics(ctx context.Context, orgID string, para
if len(params.ColdStorageVolume) > 0 {
coldStorageDuration = int(params.ToColdStorageDuration)
}
tableNames := []string{
signozMetricDBName + "." + signozSampleLocalTableName,
signozMetricDBName + "." + signozSamplesAgg5mLocalTableName,
@@ -2127,57 +2126,17 @@ func (r *ClickHouseReader) setTTLMetrics(ctx context.Context, orgID string, para
err := r.setColdStorage(context.Background(), tableName, params.ColdStorageVolume)
if err != nil {
zap.L().Error("Error in setting cold storage", zap.Error(err))
statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName)
if err == nil {
_, dbErr := r.
sqlDB.
BunDB().
NewUpdate().
Model(new(types.TTLSetting)).
Set("updated_at = ?", time.Now()).
Set("status = ?", constants.StatusFailed).
Where("id = ?", statusItem.ID.StringValue()).
Exec(ctx)
if dbErr != nil {
zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
return
}
}
r.updateCustomRetentionTTLStatus(ctx, orgID, tableName, constants.StatusFailed)
return
}
req += " SETTINGS materialize_ttl_after_modify=0"
zap.L().Info("Executing TTL request: ", zap.String("request", req))
statusItem, _ := r.checkTTLStatusItem(ctx, orgID, tableName)
if err := r.db.Exec(ctx, req); err != nil {
zap.L().Error("error while setting ttl.", zap.Error(err))
_, dbErr := r.
sqlDB.
BunDB().
NewUpdate().
Model(new(types.TTLSetting)).
Set("updated_at = ?", time.Now()).
Set("status = ?", constants.StatusFailed).
Where("id = ?", statusItem.ID.StringValue()).
Exec(ctx)
if dbErr != nil {
zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
return
}
return
}
_, dbErr = r.
sqlDB.
BunDB().
NewUpdate().
Model(new(types.TTLSetting)).
Set("updated_at = ?", time.Now()).
Set("status = ?", constants.StatusSuccess).
Where("id = ?", statusItem.ID.StringValue()).
Exec(ctx)
if dbErr != nil {
zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
r.updateCustomRetentionTTLStatus(ctx, orgID, tableName, constants.StatusFailed)
return
}
r.updateCustomRetentionTTLStatus(ctx, orgID, tableName, constants.StatusSuccess)
}
for _, tableName := range tableNames {
go metricTTL(tableName)

View File

@@ -64,6 +64,7 @@ def test_set_ttl_traces_success(
"usage_explorer",
"dependency_graph_minutes_v2",
"trace_summary",
"span_attributes_keys"
]
# Query to get table engine info which includes TTL
@@ -252,13 +253,18 @@ def test_set_custom_retention_ttl_basic(
time.sleep(2)
# Check TTL settings on relevant tables
tables_to_check = [
tables_to_check_with_custom_retention = [
"logs_v2",
"logs_v2_resource",
]
tables_to_check_with_basic_retention = [
"logs_attribute_keys",
"logs_resource_keys"
]
# Query to get table engine info which includes TTL
table_list = "', '".join(tables_to_check)
table_list = "', '".join(tables_to_check_with_custom_retention)
query = f"SELECT engine_full FROM system.tables WHERE table in ['{table_list}']"
result = signoz.telemetrystore.conn.query(query).result_rows
@@ -292,6 +298,23 @@ def test_set_custom_retention_ttl_basic(
), f"Expected default value of _retention_days to be 100 in table {table}, but got {retention_col[3]}"
# Query to get table engine info which includes TTL
table_list = ", ".join(f"'{table}'" for table in tables_to_check_with_basic_retention)
query = f"SELECT engine_full FROM system.tables WHERE table in [{table_list}]"
result = signoz.telemetrystore.conn.query(query).result_rows
# Verify TTL exists in all table definitions
assert all("TTL" in r[0] for r in result)
assert all(" SETTINGS" in r[0] for r in result)
ttl_parts = [r[0].split("TTL ")[1].split(" SETTINGS")[0] for r in result]
# All TTLs should include the 100-day max retention (maxTTLDays) interval
assert all("100 Day" in ttl_part for ttl_part in ttl_parts)
def test_set_custom_retention_ttl_basic_fallback(
signoz: types.SigNoz,
get_token,