Files
kami_backend/internal/logic/camel_oil/cron_tasks.go
danial 3588bf9af6 feat(camel_oil): 支持Token管理与卡密绑定功能
- 新增CamelOilToken和CamelOilCardBinding数据库表,实现Token及卡密绑定记录管理
- 在service层增加Token的创建、查询、更新、删除及分页功能
- 实现卡密与Token绑定的业务逻辑,支持基于Token的卡密管理
- 在API层新增Token和卡密绑定相关接口:创建Token、获取Token详情、删除Token、列出Token及根据Token查询绑定卡密
- camel_oil_api新增绑卡接口,支持绑卡状态分类及错误处理
- 在定时任务中增加卡密绑定任务,实现自动处理已支付订单的卡密绑定
- 优化订单提交及支付流程,包含日志调整和请求参数随机扰动
- 统一调整camel_oil模块多控制器实现,完成账号状态查询及订单相关接口实现
- 注册更多camel_oil定时任务,包括订单支付检查、账号日重置和待回调订单处理任务
2025-11-23 00:08:35 +08:00

376 lines
13 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package camel_oil
import (
"context"
"fmt"
"time"
"kami/internal/consts"
"kami/internal/dao"
"kami/internal/model/do"
"kami/internal/model/entity"
"kami/utility/config"
"kami/utility/integration/camel_oil_api"
"kami/utility/integration/pig"
"github.com/gogf/gf/v2/os/glog"
"github.com/gogf/gf/v2/os/gtime"
)
// CronAccountPrefetchTask 账户预拉取定时任务 - 由cron调度器定期调用
// 流程:并发拉取账户到指定数量
func (s *sCamelOil) CronAccountPrefetchTask(ctx context.Context) error {
glog.Info(ctx, "开始执行账户预拉取任务")
// 1. 获取当前在线账号数量
m := dao.V1CamelOilAccount.Ctx(ctx).DB(config.GetDatabaseV1())
onlineCount, err := m.Where(dao.V1CamelOilAccount.Columns().Status, consts.CamelOilAccountStatusOnline).
WhereOr(dao.V1CamelOilAccount.Columns().Status, consts.CamelOilAccountStatusSendCode).
Count()
if err != nil {
glog.Errorf(ctx, "获取在线账号数量失败: %v", err)
onlineCount = 0
}
glog.Infof(ctx, "当前在线账号数量: %d, 目标数量: %d", onlineCount, consts.CamelOilTargetOnlineAccounts)
// 2. 如果在线账号少于目标数,触发并发登录
if onlineCount < consts.CamelOilTargetOnlineAccounts {
needCount := consts.CamelOilTargetOnlineAccounts - onlineCount
glog.Infof(ctx, "在线账号不足,需要登录 %d 个账号", needCount)
// 使用并发登录提高效率
successCount, err := s.BatchLoginAccounts(ctx, int64(needCount))
if err != nil {
glog.Errorf(ctx, "批量登录账号失败: %v", err)
// 不返回错误,继续执行
} else {
glog.Infof(ctx, "成功登录 %d 个账号", successCount)
}
}
return nil
}
// CronPrefetchOrderSupplementTask 预拉取订单补充定时任务 - 由cron调度器定期调用
// 流程:检查预拉取订单库存,不足时补充
func (s *sCamelOil) CronPrefetchOrderSupplementTask(ctx context.Context) error {
glog.Info(ctx, "开始执行预拉取订单补充任务")
// 1. 检查预拉取订单库存
capacity, err := s.GetPrefetchOrderCapacity(ctx)
if err != nil {
glog.Errorf(ctx, "获取预拉取订单库存失败: %v", err)
return err
}
glog.Infof(ctx, "当前预拉取订单库存: %d", capacity)
// 2. 如果库存不足则补充
if capacity < consts.CamelOilPrefetchOrderMinCapacity {
glog.Infof(ctx, "预拉取订单库存不足,开始补充任务")
supplementedCount, err := s.SupplementPrefetchOrders(ctx)
if err != nil {
glog.Errorf(ctx, "预拉取订单补充失败: %v", err)
return err
}
glog.Infof(ctx, "预拉取订单补充完成,补充数量: %d", supplementedCount)
} else {
glog.Infof(ctx, "预拉取订单库存充足 (%d >= %d),无需补充", capacity, consts.CamelOilPrefetchOrderMinCapacity)
}
glog.Info(ctx, "预拉取订单补充任务完成")
return nil
}
// CronOrderPaymentCheckTask 订单支付状态检测任务 - 由cron调度器定期调用
func (s *sCamelOil) CronOrderPaymentCheckTask(ctx context.Context) error {
// 查询待支付订单创建时间在24小时内
var orders []*entity.V1CamelOilOrder
err := dao.V1CamelOilOrder.Ctx(ctx).DB(config.GetDatabaseV1()).
Where(dao.V1CamelOilOrder.Columns().PayStatus, consts.CamelOilPaymentStatusUnpaid).
WhereGTE(dao.V1CamelOilOrder.Columns().CreatedAt, gtime.Now().Add(time.Hour*6)).
Scan(&orders)
if err != nil {
glog.Error(ctx, "查询待支付订单失败:", err)
return err
}
if len(orders) == 0 {
glog.Debug(ctx, "无待支付订单")
return nil
}
glog.Infof(ctx, "查询到 %d 个待支付订单", len(orders))
paidCount := 0
timeoutCount := 0
for _, order := range orders {
accountInfo, err2 := s.GetAccountInfo(ctx, order.AccountId)
if err2 != nil {
glog.Error(ctx, "获取账号信息失败:", err2)
// 记录该订单检查失败
_ = s.UpdateOrderStatus(ctx, order.Id, consts.CamelOilOrderStatusFailed, consts.CamelOilOrderChangeTypeFail, "", fmt.Sprintf("查询账户失败: %v", err2))
continue
}
// 查询订单状态
queryResult, err2 := camel_oil_api.NewClient().QueryOrder(ctx, accountInfo.Phone, accountInfo.Token, order.PlatformOrderNo)
if err2 != nil {
glog.Error(ctx, "查询订单状态失败:", err2)
_ = s.RecordOrderHistory(ctx, order.OrderNo, consts.CamelOilOrderChangeType("query_failed"), "", fmt.Sprintf("查询订单失败: %v", err))
continue
}
// 订单已支付
if queryResult != nil {
_ = s.fillOrderCard(ctx, order.OrderNo, queryResult.CardNumber, queryResult.CardNumber)
if order.PayStatus != int(consts.CamelOilPaymentStatusPaid) {
glog.Infof(ctx, "订单%s已支付金额: %.2f", order.OrderNo, queryResult.Balance)
_, _ = dao.V1CamelOilOrder.Ctx(ctx).DB(config.GetDatabaseV1()).
Where(dao.V1CamelOilOrder.Columns().Id, order.Id).
Update(&do.V1CamelOilOrder{
CardNumber: queryResult.CardNumber,
CardPassword: queryResult.CardPassword,
PaidAt: gtime.Now(),
PayStatus: consts.CamelOilPaymentStatusPaid,
})
_ = s.RecordOrderHistory(ctx, order.OrderNo, consts.CamelOilOrderChangeTypePaid, "", fmt.Sprintf("支付成功,金额: %.2f", queryResult.Balance))
paidCount++
}
continue
}
// 订单未支付检查是否超时24小时
if gtime.Now().Sub(order.CreatedAt).Hours() >= 1 {
glog.Warningf(ctx, "订单%s支付超时创建时间: %v", order.OrderNo, order.CreatedAt)
_, _ = dao.V1CamelOilOrder.Ctx(ctx).DB(config.GetDatabaseV1()).
Where(dao.V1CamelOilOrder.Columns().Id, order.Id).
Update(&do.V1CamelOilOrder{
Status: consts.CamelOilOrderStatusFailed,
PayStatus: consts.CamelOilPaymentStatusTimeout,
FailureReason: "支付时间超过24小时支付超时",
})
_ = s.RecordOrderHistory(ctx, order.OrderNo, consts.CamelOilOrderChangeTypeTimeout, "", "订单支付超时")
timeoutCount++
}
}
glog.Infof(ctx, "订单支付状态检测完成: 已支付=%d, 超时=%d", paidCount, timeoutCount)
return nil
}
// CronAccountDailyResetTask 账号日重置任务 - 由cron调度器在每日00:00调用
func (s *sCamelOil) CronAccountDailyResetTask(ctx context.Context) error {
glog.Info(ctx, "开始执行账号日重置任务")
yesterday := gtime.Now().AddDate(0, 0, -1)
_, _ = dao.V1CamelOilAccount.Ctx(ctx).DB(config.GetDatabaseV1()).
Where(dao.V1CamelOilAccount.Columns().Status, consts.CamelOilAccountStatusOnline).
Update(&do.V1CamelOilAccount{
DailyOrderCount: 0,
DailyOrderDate: gtime.Now(),
})
// 查询所有暂停的账号
var accounts []*entity.V1CamelOilAccount
err := dao.V1CamelOilAccount.Ctx(ctx).DB(config.GetDatabaseV1()).
Where(dao.V1CamelOilAccount.Columns().Status, consts.CamelOilAccountStatusPaused).
Scan(&accounts)
if err != nil {
glog.Error(ctx, "查询暂停账号失败:", err)
return err
}
glog.Infof(ctx, "查询到 %d 个暂停账号", len(accounts))
resumedCount := 0
invalidCount := 0
for _, account := range accounts {
// 检查是否是昨日的记录
if account.DailyOrderDate == nil || account.DailyOrderDate.Format("Y-m-d") != yesterday.Format("Y-m-d") {
continue
}
if account.DailyOrderCount >= 10 {
// 正常完成,重置为在线
_ = s.UpdateAccountStatus(ctx, account.Id, consts.CamelOilAccountStatusOnline,
consts.CamelOilAccountChangeTypeResume, "次日重置,恢复使用")
// 同时重置日算信息
_, _ = dao.V1CamelOilAccount.Ctx(ctx).DB(config.GetDatabaseV1()).
Where(dao.V1CamelOilAccount.Columns().Id, account.Id).
Update(&do.V1CamelOilAccount{
DailyOrderCount: 0,
DailyOrderDate: gtime.Now(),
})
resumedCount++
} else {
// 单日下单不足10个标记为失效
_ = s.UpdateAccountStatus(ctx, account.Id, consts.CamelOilAccountStatusInvalid,
consts.CamelOilAccountChangeTypeInvalidate, "单日下单不足10个账号失效")
invalidCount++
}
}
glog.Infof(ctx, "账号日重置任务完成: 恢复=%d, 失效=%d", resumedCount, invalidCount)
return nil
}
func (s *sCamelOil) CronVerifyCodeCheckTask(ctx context.Context) error {
glog.Info(ctx, "开始执行验证码检测任务")
var accounts []*entity.V1CamelOilAccount
err := dao.V1CamelOilAccount.Ctx(ctx).DB(config.GetDatabaseV1()).
Where(dao.V1CamelOilAccount.Columns().Status, consts.CamelOilAccountStatusSendCode).
Scan(&accounts)
if err != nil {
glog.Error(ctx, "查询待验证码账号失败:", err)
return err
}
glog.Infof(ctx, "查询到 %d 个待验证码账号", len(accounts))
successCount := 0
failCount := 0
pigClient := pig.NewClient()
camelClient := camel_oil_api.NewClient()
for _, account := range accounts {
//如果时间超过 1 分钟,就是过期
if gtime.Now().Sub(account.CreatedAt).Minutes() > 1 {
glog.Warningf(ctx, "验证码已过期账号ID: %d, 手机号: %s", account.Id, account.Phone)
_ = s.UpdateAccountStatus(ctx, account.Id, consts.CamelOilAccountStatusInvalid,
consts.CamelOilAccountChangeTypeLoginFail, "验证码已过期")
failCount++
continue
}
// 从野猪平台检测验证码是否已接收
verifyCode, received, err := pigClient.CheckVerifyCode(ctx, account.Phone)
if err != nil {
glog.Warningf(ctx, "检测验证码失败账号ID: %d, 手机号: %s, 错误: %v", account.Id, account.Phone, err)
failCount++
continue
}
// 验证码未接收,继续等待
if !received {
continue
}
// 验证码已接收,执行登录
glog.Infof(ctx, "验证码已接收开始执行登录账号ID: %d, 手机号: %s", account.Id, account.Phone)
// 调用骆驼加油平台执行登录
token, err := camelClient.LoginWithCaptcha(ctx, account.Phone, verifyCode)
if err != nil {
glog.Errorf(ctx, "登录失败账号ID: %d, 手机号: %s, 错误: %v", account.Id, account.Phone, err)
// 标记账号失效
_ = s.UpdateAccountStatus(ctx, account.Id, consts.CamelOilAccountStatusInvalid,
consts.CamelOilAccountChangeTypeLoginFail, fmt.Sprintf("登录失败: %v", err))
failCount++
continue
}
// 保存 token
_, err = dao.V1CamelOilAccount.Ctx(ctx).DB(config.GetDatabaseV1()).
Where(dao.V1CamelOilAccount.Columns().Id, account.Id).
Update(&do.V1CamelOilAccount{
Token: token,
LastLoginAt: gtime.Now(),
})
if err != nil {
glog.Errorf(ctx, "保存 token 失败账号ID: %d, 错误: %v", account.Id, err)
failCount++
continue
}
// 调用 UpdateAccountStatus 更新账号状态为在线
_ = s.UpdateAccountStatus(ctx, account.Id, consts.CamelOilAccountStatusOnline,
consts.CamelOilAccountChangeTypeLogin, fmt.Sprintf("登录成功,手机号: %s", account.Phone))
glog.Infof(ctx, "账号登录成功ID: %d, 手机号: %s, Token: %s", account.Id, account.Phone, token)
successCount++
}
glog.Infof(ctx, "验证码检测任务完成: 成功=%d, 失败=%d", successCount, failCount)
return nil
}
// CronCardBindingTask 卡密绑定定时任务 - 由cron调度器定期调用
// 流程:处理已支付但未绑定 Token 的订单,进行卡密绑定
func (s *sCamelOil) CronCardBindingTask(ctx context.Context) error {
glog.Info(ctx, "开始执行卡密绑定任务")
// 查询已支付但未绑定 Token 的订单
var orders []*entity.V1CamelOilOrder
err := dao.V1CamelOilOrder.Ctx(ctx).DB(config.GetDatabaseV1()).
Where(dao.V1CamelOilOrder.Columns().PayStatus, consts.CamelOilPaymentStatusPaid).
WhereNotIn(dao.V1CamelOilOrder.Columns().Id,
dao.V1CamelOilCardBinding.Ctx(ctx).DB(config.GetDatabaseV1()).
Fields(dao.V1CamelOilCardBinding.Columns().OrderId)).
Limit(50).
Scan(&orders)
if err != nil {
glog.Error(ctx, "查询待绑定订单失败", err)
return err
}
if len(orders) == 0 {
glog.Debug(ctx, "无待绑定订单")
return nil
}
glog.Infof(ctx, "查询到 %d 个待绑定订单", len(orders))
successCount := 0
failCount := 0
for _, order := range orders {
// 检查是否有可用的 Token
availableTokenCount, err := dao.V1CamelOilToken.Ctx(ctx).DB(config.GetDatabaseV1()).
Where(dao.V1CamelOilToken.Columns().Status, consts.CamelOilTokenStatusAvailable).
Count()
if err != nil || availableTokenCount == 0 {
glog.Warningf(ctx, "无可用 Token订单 ID: %d 无法绑定", order.Id)
failCount++
continue
}
// 检查卡号和卡密是否存在
if order.CardNumber == "" || order.CardPassword == "" {
glog.Warningf(ctx, "订单 %d 卡号或卡密未填写,无法绑定", order.Id)
failCount++
continue
}
// 尝试绑定卡密到 Token
_, err = s.BindCardToToken(ctx, order.Id, order.CardNumber, order.CardPassword, order.Amount)
if err != nil {
glog.Errorf(ctx, "绑定卡密到 Token 失败,订单 ID: %d, 错误: %v", order.Id, err)
failCount++
continue
}
glog.Infof(ctx, "订单 %d 卡密绑定成功", order.Id)
successCount++
}
glog.Infof(ctx, "卡密绑定任务完成: 成功=%d, 失败=%d", successCount, failCount)
return nil
}