diff --git a/.github/workflows/integrationci.yaml b/.github/workflows/integrationci.yaml
index a1f5e7a1e7..ee29f663d3 100644
--- a/.github/workflows/integrationci.yaml
+++ b/.github/workflows/integrationci.yaml
@@ -25,7 +25,7 @@ jobs:
         clickhouse-version:
           - 25.5.6
         schema-migrator-version:
-          - v0.129.6
+          - v0.129.7
         postgres-version:
           - 15
     if: |
diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go
index 3369262eaf..f31a359b0b 100644
--- a/pkg/query-service/app/clickhouseReader/reader.go
+++ b/pkg/query-service/app/clickhouseReader/reader.go
@@ -1276,6 +1276,154 @@ func getLocalTableName(tableName string) string {
 
 }
 
+func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
+	hasCustomRetention, err := r.hasCustomRetentionColumn(ctx)
+	if err != nil {
+		return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing TTL")}
+	}
+	if hasCustomRetention {
+		return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("only SetTTLV2 is supported when the custom retention column exists")}
+	}
+	// uuid is used as transaction id
+	uuidWithHyphen := uuid.New()
+	uuid := strings.Replace(uuidWithHyphen.String(), "-", "", -1)
+
+	coldStorageDuration := -1
+	if len(params.ColdStorageVolume) > 0 {
+		coldStorageDuration = int(params.ToColdStorageDuration)
+	}
+
+	tableNameArray := []string{r.logsDB + "." + r.logsLocalTableV2, r.logsDB + "." + r.logsResourceLocalTableV2}
+
+	// bail out early if a TTL change is already pending for any of the tables
+	for _, tableName := range tableNameArray {
+		statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName)
+		if err != nil {
+			return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")}
+		}
+		if statusItem.Status == constants.StatusPending {
+			return nil, &model.ApiError{Typ: model.ErrorConflict, Err: fmt.Errorf("TTL is already running")}
+		}
+	}
+
+	// TTL query for logs_v2 table
+	ttlLogsV2 := fmt.Sprintf(
+		"ALTER TABLE %v ON CLUSTER %s MODIFY TTL toDateTime(timestamp / 1000000000) + "+
+			"INTERVAL %v SECOND DELETE", tableNameArray[0], r.cluster, params.DelDuration)
+	if len(params.ColdStorageVolume) > 0 {
+		ttlLogsV2 += fmt.Sprintf(", toDateTime(timestamp / 1000000000)"+
+			" + INTERVAL %v SECOND TO VOLUME '%s'",
+			params.ToColdStorageDuration, params.ColdStorageVolume)
+	}
+
+	// TTL query for logs_v2_resource table
+	// add 1800 seconds because rows are bucketed into 1800-second windows, so a
+	// bucket may only expire once its newest possible row has aged out
+	ttlLogsV2Resource := fmt.Sprintf(
+		"ALTER TABLE %v ON CLUSTER %s MODIFY TTL toDateTime(seen_at_ts_bucket_start) + toIntervalSecond(1800) + "+
+			"INTERVAL %v SECOND DELETE", tableNameArray[1], r.cluster, params.DelDuration)
+	if len(params.ColdStorageVolume) > 0 {
+		ttlLogsV2Resource += fmt.Sprintf(", toDateTime(seen_at_ts_bucket_start) + toIntervalSecond(1800) + "+
+			"INTERVAL %v SECOND TO VOLUME '%s'",
+			params.ToColdStorageDuration, params.ColdStorageVolume)
+	}
+
+	ttlPayload := map[string]string{
+		tableNameArray[0]: ttlLogsV2,
+		tableNameArray[1]: ttlLogsV2Resource,
+	}
+
+	// nothing is pending and there were no errors, so apply the TTL asynchronously
+	go func(ttlPayload map[string]string) {
+		// the request context is cancelled once the response below is returned,
+		// so use a background context for the async work
+		ctx := context.Background()
+		for tableName, query := range ttlPayload {
+			// https://github.com/SigNoz/signoz/issues/5470
+			// change the TTL only for new parts, not the existing ones
+			query += " SETTINGS materialize_ttl_after_modify=0"
+
+			ttl := types.TTLSetting{
+				Identifiable: types.Identifiable{
+					ID: valuer.GenerateUUID(),
+				},
+				TimeAuditable: types.TimeAuditable{
+					CreatedAt: time.Now(),
+					UpdatedAt: time.Now(),
+				},
+				TransactionID:  uuid,
+				TableName:      tableName,
+				TTL:            int(params.DelDuration),
+				Status:         constants.StatusPending,
+				ColdStorageTTL: coldStorageDuration,
+				OrgID:          orgID,
+			}
+			_, dbErr := r.
+				sqlDB.
+				BunDB().
+				NewInsert().
+				Model(&ttl).
+				Exec(ctx)
+			if dbErr != nil {
+				zap.L().Error("error in inserting to ttl_status table", zap.Error(dbErr))
+				return
+			}
+
+			err := r.setColdStorage(ctx, tableName, params.ColdStorageVolume)
+			if err != nil {
+				zap.L().Error("error in setting cold storage", zap.Error(err))
+				statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName)
+				if err == nil {
+					_, dbErr := r.
+						sqlDB.
+						BunDB().
+						NewUpdate().
+						Model(new(types.TTLSetting)).
+						Set("updated_at = ?", time.Now()).
+						Set("status = ?", constants.StatusFailed).
+						Where("id = ?", statusItem.ID.StringValue()).
+						Exec(ctx)
+					if dbErr != nil {
+						zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
+						return
+					}
+				}
+				return
+			}
+			zap.L().Info("Executing TTL request: ", zap.String("request", query))
+			statusItem, _ := r.checkTTLStatusItem(ctx, orgID, tableName)
+			if err := r.db.Exec(ctx, query); err != nil {
+				zap.L().Error("error while setting ttl", zap.Error(err))
+				_, dbErr := r.
+					sqlDB.
+					BunDB().
+					NewUpdate().
+					Model(new(types.TTLSetting)).
+					Set("updated_at = ?", time.Now()).
+					Set("status = ?", constants.StatusFailed).
+					Where("id = ?", statusItem.ID.StringValue()).
+					Exec(ctx)
+				if dbErr != nil {
+					zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
+					return
+				}
+				return
+			}
+			_, dbErr = r.
+				sqlDB.
+				BunDB().
+				NewUpdate().
+				Model(new(types.TTLSetting)).
+				Set("updated_at = ?", time.Now()).
+				Set("status = ?", constants.StatusSuccess).
+				Where("id = ?", statusItem.ID.StringValue()).
+				Exec(ctx)
+			if dbErr != nil {
+				zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
+				return
+			}
+		}
+	}(ttlPayload)
+	return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil
+}
+
 func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
 	// uuid is used as transaction id
 	uuidWithHyphen := uuid.New()
@@ -1901,6 +2049,8 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context, orgID string, params *mod
 		return r.setTTLTraces(ctx, orgID, params)
 	case constants.MetricsTTL:
 		return r.setTTLMetrics(ctx, orgID, params)
+	case constants.LogsTTL:
+		return r.setTTLLogs(ctx, orgID, params)
 	default:
 		return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error while setting ttl. ttl type should be <metrics|traces|logs>, got %v", params.Type)}
 	}
@@ -2041,10 +2191,11 @@ func (r *ClickHouseReader) deleteTtlTransactions(ctx context.Context, orgID stri
 		sqlDB.
 		BunDB().
 		NewSelect().
-		ColumnExpr("distinct(transaction_id)").
+		Column("transaction_id").
 		Model(new(types.TTLSetting)).
 		Where("org_id = ?", orgID).
-		OrderExpr("created_at DESC").
+		Group("transaction_id").
+		OrderExpr("MAX(created_at) DESC").
 		Limit(numberOfTransactionsStore).
 		Scan(ctx, &limitTransactions)
diff --git a/tests/integration/fixtures/logs.py b/tests/integration/fixtures/logs.py
index 4b246f3cd0..1d7477cbe9 100644
--- a/tests/integration/fixtures/logs.py
+++ b/tests/integration/fixtures/logs.py
@@ -413,3 +413,71 @@ def insert_logs(
     clickhouse.conn.query(
         f"TRUNCATE TABLE signoz_logs.logs_resource_keys ON CLUSTER '{clickhouse.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}' SYNC"
     )
+
+
+@pytest.fixture(name="ttl_legacy_logs_v2_table_setup", scope="function")
+def ttl_legacy_logs_v2_table_setup(request, signoz: types.SigNoz):
+    """
+    Fixture to set up and tear down a legacy TTL test environment.
+    It renames the existing logs_v2 table to a backup name and creates a new
+    empty table for testing. After the test, it restores the original table.
+    """
+
+    # Setup: move the real table out of the way
+    result = signoz.telemetrystore.conn.query(
+        "RENAME TABLE signoz_logs.logs_v2 TO signoz_logs.logs_v2_backup;"
+    ).result_rows
+    assert result is not None
+    # Add cleanup to restore the original table
+    request.addfinalizer(lambda: signoz.telemetrystore.conn.query("RENAME TABLE signoz_logs.logs_v2_backup TO signoz_logs.logs_v2;"))
+
+    # Create a new empty test table
+    result = signoz.telemetrystore.conn.query(
+        """CREATE TABLE signoz_logs.logs_v2
+        (
+            `id` String,
+            `timestamp` UInt64 CODEC(DoubleDelta, LZ4)
+        )
+        ENGINE = MergeTree()
+        ORDER BY id;"""
+    ).result_rows
+
+    assert result is not None
+    # Drop the test table; finalizers run LIFO, so this runs before the rename-back
+    request.addfinalizer(lambda: signoz.telemetrystore.conn.query("DROP TABLE IF EXISTS signoz_logs.logs_v2;"))
+
+    yield  # Test runs here
+
+
+@pytest.fixture(name="ttl_legacy_logs_v2_resource_table_setup", scope="function")
+def ttl_legacy_logs_v2_resource_table_setup(request, signoz: types.SigNoz):
+    """
+    Fixture to set up and tear down a legacy TTL test environment.
+    It renames the existing logs_v2_resource table to a backup name and creates
+    a new empty table for testing. After the test, it restores the original table.
+ """ + + # Setup code + result = signoz.telemetrystore.conn.query( + "RENAME TABLE signoz_logs.logs_v2_resource TO signoz_logs.logs_v2_resource_backup;" + ).result_rows + assert result is not None + # Add cleanup to restore original table + request.addfinalizer(lambda: signoz.telemetrystore.conn.query("RENAME TABLE signoz_logs.logs_v2_resource_backup TO signoz_logs.logs_v2_resource;")) + + # Create new test tables + result = signoz.telemetrystore.conn.query( + """CREATE TABLE signoz_logs.logs_v2_resource + ( + `id` String, + `seen_at_ts_bucket_start` Int64 CODEC(Delta(8), ZSTD(1)) + ) + ENGINE = MergeTree() + ORDER BY id;""" + ).result_rows + + assert result is not None + # Add cleanup to drop test table + request.addfinalizer(lambda: signoz.telemetrystore.conn.query("DROP TABLE IF EXISTS signoz_logs.logs_v2_resource;")) + + yield # Test runs here \ No newline at end of file diff --git a/tests/integration/src/ttl/a_ttl.py b/tests/integration/src/ttl/a_ttl.py index 8f754da131..23c4b02394 100644 --- a/tests/integration/src/ttl/a_ttl.py +++ b/tests/integration/src/ttl/a_ttl.py @@ -47,7 +47,6 @@ def test_set_ttl_traces_success( timeout=30, ) - print(response.text) assert response.status_code == HTTPStatus.OK response_data = response.json() assert "message" in response_data @@ -293,6 +292,133 @@ def test_set_custom_retention_ttl_basic( ), f"Expected default value of _retention_days to be 100 in table {table}, but got {retention_col[3]}" +def test_set_custom_retention_ttl_basic_fallback( + signoz: types.SigNoz, + get_token, + ttl_legacy_logs_v2_table_setup, # pylint: disable=unused-argument + ttl_legacy_logs_v2_resource_table_setup, # pylint: disable=unused-argument +): + """Test setting TTL for logs using the new setTTLLogs method.""" + + payload = { + "type": "logs", + "defaultTTLDays": 100, + "ttlConditions": [], + "coldStorageVolume": "", + "coldStorageDuration": 0, + } + + headers = { + "Authorization": f"Bearer {get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)}" + } + + response = requests.post( + signoz.self.host_configs["8080"].get("/api/v2/settings/ttl"), + json=payload, + headers=headers, + timeout=30, + ) + + assert response.status_code == HTTPStatus.OK + response_data = response.json() + assert "message" in response_data + assert "successfully set up" in response_data["message"].lower() + + # Verify TTL settings in Clickhouse + # Allow some time for the TTL to be applied + time.sleep(2) + + # Check TTL settings on relevant logs tables + tables_to_check = [ + "logs_v2", + "logs_v2_resource", + ] + + # Query to get table engine info which includes TTL + table_list = "', '".join(tables_to_check) + query = f"SELECT engine_full FROM system.tables WHERE table in ['{table_list}']" + + result = signoz.telemetrystore.conn.query(query).result_rows + + # Verify TTL exists in all table definitions + assert all("TTL" in r[0] for r in result) + + assert all(" SETTINGS" in r[0] for r in result) + + ttl_parts = [r[0].split("TTL ")[1].split(" SETTINGS")[0] for r in result] + + # All TTLs should include toIntervalSecond(8640000) which is 100 days + assert all("toIntervalSecond(8640000)" in ttl_part for ttl_part in ttl_parts) + + +def test_set_custom_retention_ttl_basic_101_times(signoz: types.SigNoz, get_token): + """Test setting custom retention TTL with basic configuration to trigger housekeeping.""" + + for _ in range(101): + payload = { + "type": "logs", + "defaultTTLDays": 100, + "ttlConditions": [], + "coldStorageVolume": "", + "coldStorageDuration": 0, + } + + headers = { + 
"Authorization": f"Bearer {get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)}" + } + + response = requests.post( + signoz.self.host_configs["8080"].get("/api/v2/settings/ttl"), + json=payload, + headers=headers, + timeout=30, + ) + + assert response.status_code == HTTPStatus.OK + response_data = response.json() + assert "message" in response_data + + # Check TTL settings on relevant tables + tables_to_check = [ + "logs_v2", + "logs_v2_resource", + ] + + # Query to get table engine info which includes TTL + table_list = "', '".join(tables_to_check) + query = f"SELECT engine_full FROM system.tables WHERE table in ['{table_list}']" + result = signoz.telemetrystore.conn.query(query).result_rows + + # Verify TTL exists in all table definitions + assert all("TTL" in r[0] for r in result) + + assert all(" SETTINGS" in r[0] for r in result) + + ttl_parts = [r[0].split("TTL ")[1].split(" SETTINGS")[0] for r in result] + + # Also verify the TTL parts contain retention_days + assert all("_retention_days" in ttl_part for ttl_part in ttl_parts) + + # Query to describe tables and check retention_days column + for table in tables_to_check: + describe_query = f"DESCRIBE TABLE signoz_logs.{table}" + describe_result = signoz.telemetrystore.conn.query(describe_query).result_rows + + # Find the _retention_days column + retention_col = next( + (row for row in describe_result if row[0] == "_retention_days"), None + ) + assert ( + retention_col is not None + ), f"_retention_days column not found in table {table}" + assert ( + retention_col[1] == "UInt16" + ), f"Expected _retention_days to be UInt16 in table {table}, but got {retention_col[1]}" + assert ( + retention_col[3] == "100" + ), f"Expected default value of _retention_days to be 100 in table {table}, but got {retention_col[3]}" + + def test_set_custom_retention_ttl_with_conditions( signoz: types.SigNoz, get_token: Callable[[str, str], str], insert_logs ): @@ -526,6 +652,62 @@ def test_get_custom_retention_ttl( ] +def test_set_ttl_logs_success( + signoz: types.SigNoz, + get_token, + ttl_legacy_logs_v2_table_setup,# pylint: disable=unused-argument + ttl_legacy_logs_v2_resource_table_setup,# pylint: disable=unused-argument +): + """Test setting TTL for logs using the new setTTLLogs method.""" + + payload = { + "type": "logs", + "duration": "3600h", + } + + headers = { + "Authorization": f"Bearer {get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)}" + } + + response = requests.post( + signoz.self.host_configs["8080"].get("/api/v1/settings/ttl"), + params=payload, + headers=headers, + timeout=30, + ) + + assert response.status_code == HTTPStatus.OK + response_data = response.json() + assert "message" in response_data + assert "successfully set up" in response_data["message"].lower() + + # Verify TTL settings in Clickhouse + # Allow some time for the TTL to be applied + time.sleep(2) + + # Check TTL settings on relevant logs tables + tables_to_check = [ + "logs_v2", + "logs_v2_resource", + ] + + # Query to get table engine info which includes TTL + table_list = "', '".join(tables_to_check) + query = f"SELECT engine_full FROM system.tables WHERE table in ['{table_list}']" + + result = signoz.telemetrystore.conn.query(query).result_rows + + # Verify TTL exists in all table definitions + assert all("TTL" in r[0] for r in result) + + assert all(" SETTINGS" in r[0] for r in result) + + ttl_parts = [r[0].split("TTL ")[1].split(" SETTINGS")[0] for r in result] + + # All TTLs should include toIntervalSecond(12960000) which is 3600h + assert 
all("toIntervalSecond(12960000)" in ttl_part for ttl_part in ttl_parts) + + def test_get_ttl_traces_success( signoz: types.SigNoz, get_token: Callable[[str, str], str] ): @@ -547,7 +729,6 @@ def test_get_ttl_traces_success( timeout=30, ) - print(set_response.text) assert set_response.status_code == HTTPStatus.OK # Allow some time for the TTL to be processed