|
|
|
|
@@ -1428,23 +1428,67 @@ func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, param
|
|
|
|
|
// uuid is used as transaction id
|
|
|
|
|
uuidWithHyphen := uuid.New()
|
|
|
|
|
uuid := strings.Replace(uuidWithHyphen.String(), "-", "", -1)
|
|
|
|
|
tableNames := []string{
|
|
|
|
|
r.TraceDB + "." + r.traceTableName,
|
|
|
|
|
r.TraceDB + "." + r.traceResourceTableV3,
|
|
|
|
|
r.TraceDB + "." + signozErrorIndexTable,
|
|
|
|
|
r.TraceDB + "." + signozUsageExplorerTable,
|
|
|
|
|
r.TraceDB + "." + defaultDependencyGraphTable,
|
|
|
|
|
r.TraceDB + "." + r.traceSummaryTable,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
coldStorageDuration := -1
|
|
|
|
|
if len(params.ColdStorageVolume) > 0 {
|
|
|
|
|
coldStorageDuration = int(params.ToColdStorageDuration)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type ttlConfig struct {
|
|
|
|
|
TTLQuery string
|
|
|
|
|
TTLColumn string
|
|
|
|
|
ColdStorageQuery string
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// TTL query
|
|
|
|
|
ttlV2 := "ALTER TABLE %s ON CLUSTER %s MODIFY TTL toDateTime(%s) + INTERVAL " + fmt.Sprint(params.DelDuration) + " SECOND DELETE"
|
|
|
|
|
ttlV2ColdStorage := ", toDateTime(%s) + INTERVAL " + fmt.Sprint(params.ToColdStorageDuration) + " SECOND TO VOLUME '%s'"
|
|
|
|
|
|
|
|
|
|
// TTL query for resource table
|
|
|
|
|
ttlV2Resource := "ALTER TABLE %s ON CLUSTER %s MODIFY TTL toDateTime(%s) + toIntervalSecond(1800) + INTERVAL " + fmt.Sprint(params.DelDuration) + " SECOND DELETE"
|
|
|
|
|
ttlTracesV2ResourceColdStorage := ", toDateTime(%s) + toIntervalSecond(1800) + INTERVAL " + fmt.Sprint(params.ToColdStorageDuration) + " SECOND TO VOLUME '%s'"
|
|
|
|
|
|
|
|
|
|
traceTTLConfigs := map[string]ttlConfig{
|
|
|
|
|
r.TraceDB + "." + r.traceTableName: {
|
|
|
|
|
TTLQuery: ttlV2,
|
|
|
|
|
TTLColumn: "timestamp",
|
|
|
|
|
ColdStorageQuery: ttlV2ColdStorage,
|
|
|
|
|
},
|
|
|
|
|
r.TraceDB + "." + r.traceResourceTableV3: {
|
|
|
|
|
TTLQuery: ttlV2Resource,
|
|
|
|
|
TTLColumn: "seen_at_ts_bucket_start",
|
|
|
|
|
ColdStorageQuery: ttlTracesV2ResourceColdStorage,
|
|
|
|
|
},
|
|
|
|
|
r.TraceDB + "." + signozErrorIndexTable: {
|
|
|
|
|
TTLQuery: ttlV2,
|
|
|
|
|
TTLColumn: "timestamp",
|
|
|
|
|
ColdStorageQuery: ttlV2ColdStorage,
|
|
|
|
|
},
|
|
|
|
|
r.TraceDB + "." + signozUsageExplorerTable: {
|
|
|
|
|
TTLQuery: ttlV2,
|
|
|
|
|
TTLColumn: "timestamp",
|
|
|
|
|
ColdStorageQuery: ttlV2ColdStorage,
|
|
|
|
|
},
|
|
|
|
|
r.TraceDB + "." + defaultDependencyGraphTable: {
|
|
|
|
|
TTLQuery: ttlV2,
|
|
|
|
|
TTLColumn: "timestamp",
|
|
|
|
|
ColdStorageQuery: ttlV2ColdStorage,
|
|
|
|
|
},
|
|
|
|
|
r.TraceDB + "." + r.traceSummaryTable: {
|
|
|
|
|
TTLQuery: ttlV2,
|
|
|
|
|
TTLColumn: "end",
|
|
|
|
|
ColdStorageQuery: ttlV2ColdStorage,
|
|
|
|
|
},
|
|
|
|
|
r.TraceDB + "." + r.spanAttributesKeysTable: {
|
|
|
|
|
TTLQuery: ttlV2,
|
|
|
|
|
TTLColumn: "timestamp",
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// check if there is existing things to be done
|
|
|
|
|
for _, tableName := range tableNames {
|
|
|
|
|
statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName)
|
|
|
|
|
for tableName := range traceTTLConfigs {
|
|
|
|
|
localTableName := getLocalTableName(tableName)
|
|
|
|
|
statusItem, err := r.checkTTLStatusItem(ctx, orgID, localTableName)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")}
|
|
|
|
|
}
|
|
|
|
|
@@ -1453,23 +1497,14 @@ func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, param
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// TTL query
|
|
|
|
|
ttlV2 := "ALTER TABLE %s ON CLUSTER %s MODIFY TTL toDateTime(%s) + INTERVAL %v SECOND DELETE"
|
|
|
|
|
ttlV2ColdStorage := ", toDateTime(%s) + INTERVAL %v SECOND TO VOLUME '%s'"
|
|
|
|
|
|
|
|
|
|
// TTL query for resource table
|
|
|
|
|
ttlV2Resource := "ALTER TABLE %s ON CLUSTER %s MODIFY TTL toDateTime(seen_at_ts_bucket_start) + toIntervalSecond(1800) + INTERVAL %v SECOND DELETE"
|
|
|
|
|
ttlTracesV2ResourceColdStorage := ", toDateTime(seen_at_ts_bucket_start) + toIntervalSecond(1800) + INTERVAL %v SECOND TO VOLUME '%s'"
|
|
|
|
|
|
|
|
|
|
for _, distributedTableName := range tableNames {
|
|
|
|
|
for distributedTableName := range traceTTLConfigs {
|
|
|
|
|
go func(distributedTableName string) {
|
|
|
|
|
tableName := getLocalTableName(distributedTableName)
|
|
|
|
|
|
|
|
|
|
// for trace summary table, we need to use end instead of timestamp
|
|
|
|
|
timestamp := "timestamp"
|
|
|
|
|
if strings.HasSuffix(distributedTableName, r.traceSummaryTable) {
|
|
|
|
|
timestamp = "end"
|
|
|
|
|
}
|
|
|
|
|
timestamp := traceTTLConfigs[distributedTableName].TTLColumn
|
|
|
|
|
ttlV2 := traceTTLConfigs[distributedTableName].TTLQuery
|
|
|
|
|
ttlV2ColdStorage := traceTTLConfigs[distributedTableName].ColdStorageQuery
|
|
|
|
|
|
|
|
|
|
ttl := types.TTLSetting{
|
|
|
|
|
Identifiable: types.Identifiable{
|
|
|
|
|
@@ -1486,88 +1521,39 @@ func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, param
|
|
|
|
|
ColdStorageTTL: coldStorageDuration,
|
|
|
|
|
OrgID: orgID,
|
|
|
|
|
}
|
|
|
|
|
_, dbErr := r.
|
|
|
|
|
sqlDB.
|
|
|
|
|
BunDB().
|
|
|
|
|
NewInsert().
|
|
|
|
|
Model(&ttl).
|
|
|
|
|
Exec(ctx)
|
|
|
|
|
_, dbErr := r.sqlDB.BunDB().NewInsert().Model(&ttl).Exec(ctx)
|
|
|
|
|
if dbErr != nil {
|
|
|
|
|
zap.L().Error("error in inserting to ttl_status table", zap.Error(dbErr))
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
req := fmt.Sprintf(ttlV2, tableName, r.cluster, timestamp, params.DelDuration)
|
|
|
|
|
if strings.HasSuffix(distributedTableName, r.traceResourceTableV3) {
|
|
|
|
|
req = fmt.Sprintf(ttlV2Resource, tableName, r.cluster, params.DelDuration)
|
|
|
|
|
}
|
|
|
|
|
req := fmt.Sprintf(ttlV2, tableName, r.cluster, timestamp)
|
|
|
|
|
|
|
|
|
|
if len(params.ColdStorageVolume) > 0 {
|
|
|
|
|
if strings.HasSuffix(distributedTableName, r.traceResourceTableV3) {
|
|
|
|
|
req += fmt.Sprintf(ttlTracesV2ResourceColdStorage, params.ToColdStorageDuration, params.ColdStorageVolume)
|
|
|
|
|
} else {
|
|
|
|
|
req += fmt.Sprintf(ttlV2ColdStorage, timestamp, params.ToColdStorageDuration, params.ColdStorageVolume)
|
|
|
|
|
if len(params.ColdStorageVolume) > 0 && len(ttlV2ColdStorage) > 0 {
|
|
|
|
|
req += fmt.Sprintf(ttlV2ColdStorage, timestamp, params.ColdStorageVolume)
|
|
|
|
|
|
|
|
|
|
// set the cold storage volume if not already set
|
|
|
|
|
err := r.setColdStorage(context.Background(), tableName, params.ColdStorageVolume)
|
|
|
|
|
if err != nil {
|
|
|
|
|
zap.L().Error("Error in setting cold storage", zap.Error(err))
|
|
|
|
|
r.updateCustomRetentionTTLStatus(ctx, orgID, tableName, constants.StatusFailed)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
err := r.setColdStorage(context.Background(), tableName, params.ColdStorageVolume)
|
|
|
|
|
if err != nil {
|
|
|
|
|
zap.L().Error("Error in setting cold storage", zap.Error(err))
|
|
|
|
|
statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName)
|
|
|
|
|
if err == nil {
|
|
|
|
|
_, dbErr := r.
|
|
|
|
|
sqlDB.
|
|
|
|
|
BunDB().
|
|
|
|
|
NewUpdate().
|
|
|
|
|
Model(new(types.TTLSetting)).
|
|
|
|
|
Set("updated_at = ?", time.Now()).
|
|
|
|
|
Set("status = ?", constants.StatusFailed).
|
|
|
|
|
Where("id = ?", statusItem.ID.StringValue()).
|
|
|
|
|
Exec(ctx)
|
|
|
|
|
if dbErr != nil {
|
|
|
|
|
zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
req += " SETTINGS materialize_ttl_after_modify=0;"
|
|
|
|
|
zap.L().Error(" ExecutingTTL request: ", zap.String("request", req))
|
|
|
|
|
statusItem, _ := r.checkTTLStatusItem(ctx, orgID, tableName)
|
|
|
|
|
if err := r.db.Exec(ctx, req); err != nil {
|
|
|
|
|
zap.L().Error("Error in executing set TTL query", zap.Error(err))
|
|
|
|
|
_, dbErr := r.
|
|
|
|
|
sqlDB.
|
|
|
|
|
BunDB().
|
|
|
|
|
NewUpdate().
|
|
|
|
|
Model(new(types.TTLSetting)).
|
|
|
|
|
Set("updated_at = ?", time.Now()).
|
|
|
|
|
Set("status = ?", constants.StatusFailed).
|
|
|
|
|
Where("id = ?", statusItem.ID.StringValue()).
|
|
|
|
|
Exec(ctx)
|
|
|
|
|
if dbErr != nil {
|
|
|
|
|
zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
_, dbErr = r.
|
|
|
|
|
sqlDB.
|
|
|
|
|
BunDB().
|
|
|
|
|
NewUpdate().
|
|
|
|
|
Model(new(types.TTLSetting)).
|
|
|
|
|
Set("updated_at = ?", time.Now()).
|
|
|
|
|
Set("status = ?", constants.StatusSuccess).
|
|
|
|
|
Where("id = ?", statusItem.ID.StringValue()).
|
|
|
|
|
Exec(ctx)
|
|
|
|
|
if dbErr != nil {
|
|
|
|
|
zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
|
|
|
|
|
r.updateCustomRetentionTTLStatus(ctx, orgID, tableName, constants.StatusFailed)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
r.updateCustomRetentionTTLStatus(ctx, orgID, tableName, constants.StatusSuccess)
|
|
|
|
|
}(distributedTableName)
|
|
|
|
|
}
|
|
|
|
|
return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
func (r *ClickHouseReader) hasCustomRetentionColumn(ctx context.Context) (bool, error) {
|
|
|
|
|
// Directly query for the _retention_days column existence
|
|
|
|
|
query := fmt.Sprintf("SELECT 1 FROM system.columns WHERE database = '%s' AND table = '%s' AND name = '_retention_days' LIMIT 1", r.logsDB, r.logsLocalTableV2)
|
|
|
|
|
@@ -1646,13 +1632,42 @@ func (r *ClickHouseReader) SetTTLV2(ctx context.Context, orgID string, params *m
|
|
|
|
|
coldStorageDuration = int(params.ToColdStorageDuration) // Already in days
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
tableNames := []string{
|
|
|
|
|
r.logsDB + "." + r.logsLocalTableV2,
|
|
|
|
|
r.logsDB + "." + r.logsResourceLocalTableV2,
|
|
|
|
|
multiIfExpr := r.buildMultiIfExpression(params.TTLConditions, params.DefaultTTLDays, false)
|
|
|
|
|
resourceMultiIfExpr := r.buildMultiIfExpression(params.TTLConditions, params.DefaultTTLDays, true)
|
|
|
|
|
|
|
|
|
|
type ttlConfig struct {
|
|
|
|
|
CustomRetentionQuery string
|
|
|
|
|
TTLQuery string
|
|
|
|
|
ColdStorageQuery string
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for _, tableName := range tableNames {
|
|
|
|
|
statusItem, err := r.checkCustomRetentionTTLStatusItem(ctx, orgID, tableName)
|
|
|
|
|
maxTTLDays := params.DefaultTTLDays
|
|
|
|
|
for _, condition := range params.TTLConditions {
|
|
|
|
|
maxTTLDays = max(maxTTLDays, condition.TTLDays)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
logsTTLConfigs := map[string]ttlConfig{
|
|
|
|
|
r.logsDB + "." + r.logsTableV2: {
|
|
|
|
|
CustomRetentionQuery: "ALTER TABLE %s ON CLUSTER %s MODIFY COLUMN _retention_days UInt16 DEFAULT " + multiIfExpr,
|
|
|
|
|
TTLQuery: "ALTER TABLE %v ON CLUSTER %s MODIFY TTL toDateTime(toUInt32(timestamp / 1000), 'UTC') + INTERVAL " + fmt.Sprint(params.DefaultTTLDays) + " DAY DELETE",
|
|
|
|
|
ColdStorageQuery: ", toDateTime(toUInt32(timestamp / 1000), 'UTC') + INTERVAL " + fmt.Sprint(params.ToColdStorageDuration) + " DAY TO VOLUME '%s'",
|
|
|
|
|
},
|
|
|
|
|
r.logsDB + "." + r.logsResourceTableV2: {
|
|
|
|
|
CustomRetentionQuery: "ALTER TABLE %s ON CLUSTER %s MODIFY COLUMN _retention_days UInt16 DEFAULT " + resourceMultiIfExpr,
|
|
|
|
|
TTLQuery: "ALTER TABLE %v ON CLUSTER %s MODIFY TTL toDateTime(toUInt32(seen_at_ts_bucket_start), 'UTC') + INTERVAL 1800 SECOND + INTERVAL " + fmt.Sprint(params.DefaultTTLDays) + " DAY DELETE",
|
|
|
|
|
ColdStorageQuery: ", toDateTime(toUInt32(seen_at_ts_bucket_start), 'UTC') + INTERVAL 1800 SECOND + INTERVAL " + fmt.Sprint(params.ToColdStorageDuration) + " DAY TO VOLUME '%s'",
|
|
|
|
|
},
|
|
|
|
|
r.logsDB + "." + r.logsAttributeKeys: {
|
|
|
|
|
TTLQuery: "ALTER TABLE %v ON CLUSTER %s MODIFY TTL timestamp + INTERVAL " + fmt.Sprint(maxTTLDays) + " DAY DELETE",
|
|
|
|
|
},
|
|
|
|
|
r.logsDB + "." + r.logsResourceKeys: {
|
|
|
|
|
TTLQuery: "ALTER TABLE %v ON CLUSTER %s MODIFY TTL timestamp + INTERVAL " + fmt.Sprint(maxTTLDays) + " DAY DELETE",
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for tableName := range logsTTLConfigs {
|
|
|
|
|
localTableName := getLocalTableName(tableName)
|
|
|
|
|
statusItem, err := r.checkCustomRetentionTTLStatusItem(ctx, orgID, localTableName)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, errorsV2.Newf(errorsV2.TypeInternal, errorsV2.CodeInternal, "error in processing custom_retention_ttl_status check sql query")
|
|
|
|
|
}
|
|
|
|
|
@@ -1661,47 +1676,14 @@ func (r *ClickHouseReader) SetTTLV2(ctx context.Context, orgID string, params *m
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
multiIfExpr := r.buildMultiIfExpression(params.TTLConditions, params.DefaultTTLDays, false)
|
|
|
|
|
resourceMultiIfExpr := r.buildMultiIfExpression(params.TTLConditions, params.DefaultTTLDays, true)
|
|
|
|
|
|
|
|
|
|
ttlPayload := make(map[string][]string)
|
|
|
|
|
|
|
|
|
|
queries := []string{
|
|
|
|
|
fmt.Sprintf(`ALTER TABLE %s ON CLUSTER %s MODIFY COLUMN _retention_days UInt16 DEFAULT %s`,
|
|
|
|
|
tableNames[0], r.cluster, multiIfExpr),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if len(params.ColdStorageVolume) > 0 && coldStorageDuration > 0 {
|
|
|
|
|
queries = append(queries, fmt.Sprintf(`ALTER TABLE %s ON CLUSTER %s MODIFY COLUMN _retention_days_cold UInt16 DEFAULT %d`,
|
|
|
|
|
tableNames[0], r.cluster, coldStorageDuration))
|
|
|
|
|
|
|
|
|
|
queries = append(queries, fmt.Sprintf(`ALTER TABLE %s ON CLUSTER %s MODIFY TTL toDateTime(timestamp / 1000000000) + toIntervalDay(_retention_days) DELETE, toDateTime(timestamp / 1000000000) + toIntervalDay(_retention_days_cold) TO VOLUME '%s' SETTINGS materialize_ttl_after_modify=0`,
|
|
|
|
|
tableNames[0], r.cluster, params.ColdStorageVolume))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ttlPayload[tableNames[0]] = queries
|
|
|
|
|
|
|
|
|
|
resourceQueries := []string{
|
|
|
|
|
fmt.Sprintf(`ALTER TABLE %s ON CLUSTER %s MODIFY COLUMN _retention_days UInt16 DEFAULT %s`,
|
|
|
|
|
tableNames[1], r.cluster, resourceMultiIfExpr),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if len(params.ColdStorageVolume) > 0 && coldStorageDuration > 0 {
|
|
|
|
|
resourceQueries = append(resourceQueries, fmt.Sprintf(`ALTER TABLE %s ON CLUSTER %s MODIFY COLUMN _retention_days_cold UInt16 DEFAULT %d`,
|
|
|
|
|
tableNames[1], r.cluster, coldStorageDuration))
|
|
|
|
|
|
|
|
|
|
resourceQueries = append(resourceQueries, fmt.Sprintf(`ALTER TABLE %s ON CLUSTER %s MODIFY TTL toDateTime(seen_at_ts_bucket_start) + toIntervalSecond(1800) + toIntervalDay(_retention_days) DELETE, toDateTime(seen_at_ts_bucket_start) + toIntervalSecond(1800) + toIntervalDay(_retention_days_cold) TO VOLUME '%s' SETTINGS materialize_ttl_after_modify=0`,
|
|
|
|
|
tableNames[1], r.cluster, params.ColdStorageVolume))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ttlPayload[tableNames[1]] = resourceQueries
|
|
|
|
|
|
|
|
|
|
ttlConditionsJSON, err := json.Marshal(params.TTLConditions)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, errorsV2.Wrapf(err, errorsV2.TypeInternal, errorsV2.CodeInternal, "error marshalling TTL condition")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for tableName, queries := range ttlPayload {
|
|
|
|
|
for tableName, ttlConfig := range logsTTLConfigs {
|
|
|
|
|
|
|
|
|
|
localTableName := getLocalTableName(tableName)
|
|
|
|
|
customTTL := types.TTLSetting{
|
|
|
|
|
Identifiable: types.Identifiable{
|
|
|
|
|
ID: valuer.GenerateUUID(),
|
|
|
|
|
@@ -1711,7 +1693,7 @@ func (r *ClickHouseReader) SetTTLV2(ctx context.Context, orgID string, params *m
|
|
|
|
|
UpdatedAt: time.Now(),
|
|
|
|
|
},
|
|
|
|
|
TransactionID: uuid,
|
|
|
|
|
TableName: tableName,
|
|
|
|
|
TableName: localTableName,
|
|
|
|
|
TTL: params.DefaultTTLDays,
|
|
|
|
|
Condition: string(ttlConditionsJSON),
|
|
|
|
|
Status: constants.StatusPending,
|
|
|
|
|
@@ -1726,25 +1708,41 @@ func (r *ClickHouseReader) SetTTLV2(ctx context.Context, orgID string, params *m
|
|
|
|
|
return nil, errorsV2.Wrapf(dbErr, errorsV2.TypeInternal, errorsV2.CodeInternal, "error inserting TTL settings")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if len(params.ColdStorageVolume) > 0 && coldStorageDuration > 0 {
|
|
|
|
|
err := r.setColdStorage(ctx, tableName, params.ColdStorageVolume)
|
|
|
|
|
// Enable custom retention column with default value
|
|
|
|
|
if ttlConfig.CustomRetentionQuery != "" {
|
|
|
|
|
customRetentionQuery := fmt.Sprintf(ttlConfig.CustomRetentionQuery, localTableName, r.cluster)
|
|
|
|
|
zap.L().Debug("Executing custom retention TTL request: ", zap.String("request", customRetentionQuery))
|
|
|
|
|
if err := r.db.Exec(ctx, customRetentionQuery); err != nil {
|
|
|
|
|
zap.L().Error("error while setting custom retention ttl", zap.Error(err))
|
|
|
|
|
r.updateCustomRetentionTTLStatus(ctx, orgID, localTableName, constants.StatusFailed)
|
|
|
|
|
return nil, errorsV2.Wrapf(err, errorsV2.TypeInternal, errorsV2.CodeInternal, "error setting custom retention TTL for table %s, query: %s", tableName, ttlConfig.CustomRetentionQuery)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
// Set TTL based on custom retention column
|
|
|
|
|
ttlQuery := fmt.Sprintf(ttlConfig.TTLQuery, localTableName, r.cluster)
|
|
|
|
|
|
|
|
|
|
if len(params.ColdStorageVolume) > 0 && coldStorageDuration > 0 && ttlConfig.ColdStorageQuery != "" {
|
|
|
|
|
ttlQuery += fmt.Sprintf(ttlConfig.ColdStorageQuery, localTableName, r.cluster, params.ColdStorageVolume)
|
|
|
|
|
|
|
|
|
|
// Enable cold storage volume
|
|
|
|
|
err := r.setColdStorage(ctx, localTableName, params.ColdStorageVolume)
|
|
|
|
|
if err != nil {
|
|
|
|
|
zap.L().Error("error in setting cold storage", zap.Error(err))
|
|
|
|
|
r.updateCustomRetentionTTLStatus(ctx, orgID, tableName, constants.StatusFailed)
|
|
|
|
|
r.updateCustomRetentionTTLStatus(ctx, orgID, localTableName, constants.StatusFailed)
|
|
|
|
|
return nil, errorsV2.Wrapf(err.Err, errorsV2.TypeInternal, errorsV2.CodeInternal, "error setting cold storage for table %s", tableName)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for i, query := range queries {
|
|
|
|
|
zap.L().Debug("Executing custom retention TTL request: ", zap.String("request", query), zap.Int("step", i+1))
|
|
|
|
|
if err := r.db.Exec(ctx, query); err != nil {
|
|
|
|
|
zap.L().Error("error while setting custom retention ttl", zap.Error(err))
|
|
|
|
|
r.updateCustomRetentionTTLStatus(ctx, orgID, tableName, constants.StatusFailed)
|
|
|
|
|
return nil, errorsV2.Wrapf(err, errorsV2.TypeInternal, errorsV2.CodeInternal, "error setting custom retention TTL for table %s, query: %s", tableName, query)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
ttlQuery += " SETTINGS materialize_ttl_after_modify=0"
|
|
|
|
|
zap.L().Info("Executing TTL request: ", zap.String("request", ttlQuery))
|
|
|
|
|
|
|
|
|
|
if err := r.db.Exec(ctx, ttlQuery); err != nil {
|
|
|
|
|
zap.L().Error("error while setting ttl.", zap.Error(err))
|
|
|
|
|
r.updateCustomRetentionTTLStatus(ctx, orgID, localTableName, constants.StatusFailed)
|
|
|
|
|
return nil, errorsV2.Wrapf(err, errorsV2.TypeInternal, errorsV2.CodeInternal, "error setting TTL for table %s, query: %s", tableName, ttlQuery)
|
|
|
|
|
}
|
|
|
|
|
r.updateCustomRetentionTTLStatus(ctx, orgID, localTableName, constants.StatusSuccess)
|
|
|
|
|
|
|
|
|
|
r.updateCustomRetentionTTLStatus(ctx, orgID, tableName, constants.StatusSuccess)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return &model.CustomRetentionTTLResponse{
|
|
|
|
|
@@ -2066,6 +2064,7 @@ func (r *ClickHouseReader) setTTLMetrics(ctx context.Context, orgID string, para
|
|
|
|
|
if len(params.ColdStorageVolume) > 0 {
|
|
|
|
|
coldStorageDuration = int(params.ToColdStorageDuration)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
tableNames := []string{
|
|
|
|
|
signozMetricDBName + "." + signozSampleLocalTableName,
|
|
|
|
|
signozMetricDBName + "." + signozSamplesAgg5mLocalTableName,
|
|
|
|
|
@@ -2127,57 +2126,17 @@ func (r *ClickHouseReader) setTTLMetrics(ctx context.Context, orgID string, para
|
|
|
|
|
err := r.setColdStorage(context.Background(), tableName, params.ColdStorageVolume)
|
|
|
|
|
if err != nil {
|
|
|
|
|
zap.L().Error("Error in setting cold storage", zap.Error(err))
|
|
|
|
|
statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName)
|
|
|
|
|
if err == nil {
|
|
|
|
|
_, dbErr := r.
|
|
|
|
|
sqlDB.
|
|
|
|
|
BunDB().
|
|
|
|
|
NewUpdate().
|
|
|
|
|
Model(new(types.TTLSetting)).
|
|
|
|
|
Set("updated_at = ?", time.Now()).
|
|
|
|
|
Set("status = ?", constants.StatusFailed).
|
|
|
|
|
Where("id = ?", statusItem.ID.StringValue()).
|
|
|
|
|
Exec(ctx)
|
|
|
|
|
if dbErr != nil {
|
|
|
|
|
zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
r.updateCustomRetentionTTLStatus(ctx, orgID, tableName, constants.StatusFailed)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
req += " SETTINGS materialize_ttl_after_modify=0"
|
|
|
|
|
zap.L().Info("Executing TTL request: ", zap.String("request", req))
|
|
|
|
|
statusItem, _ := r.checkTTLStatusItem(ctx, orgID, tableName)
|
|
|
|
|
if err := r.db.Exec(ctx, req); err != nil {
|
|
|
|
|
zap.L().Error("error while setting ttl.", zap.Error(err))
|
|
|
|
|
_, dbErr := r.
|
|
|
|
|
sqlDB.
|
|
|
|
|
BunDB().
|
|
|
|
|
NewUpdate().
|
|
|
|
|
Model(new(types.TTLSetting)).
|
|
|
|
|
Set("updated_at = ?", time.Now()).
|
|
|
|
|
Set("status = ?", constants.StatusFailed).
|
|
|
|
|
Where("id = ?", statusItem.ID.StringValue()).
|
|
|
|
|
Exec(ctx)
|
|
|
|
|
if dbErr != nil {
|
|
|
|
|
zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
_, dbErr = r.
|
|
|
|
|
sqlDB.
|
|
|
|
|
BunDB().
|
|
|
|
|
NewUpdate().
|
|
|
|
|
Model(new(types.TTLSetting)).
|
|
|
|
|
Set("updated_at = ?", time.Now()).
|
|
|
|
|
Set("status = ?", constants.StatusSuccess).
|
|
|
|
|
Where("id = ?", statusItem.ID.StringValue()).
|
|
|
|
|
Exec(ctx)
|
|
|
|
|
if dbErr != nil {
|
|
|
|
|
zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
|
|
|
|
|
r.updateCustomRetentionTTLStatus(ctx, orgID, tableName, constants.StatusFailed)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
r.updateCustomRetentionTTLStatus(ctx, orgID, tableName, constants.StatusSuccess)
|
|
|
|
|
}
|
|
|
|
|
for _, tableName := range tableNames {
|
|
|
|
|
go metricTTL(tableName)
|
|
|
|
|
|