Files
kami_backend/internal/logic/camel_oil/cron_tasks.go
danial 85b552eec3 feat(camel_oil): add order export to Excel functionality
- Add ExportOrder RPC method to camel_oil API and service interfaces
- Implement service logic to query orders and generate Excel file with order data
- Include card number and password fields in order export
- Create HTTP handler to stream Excel file with proper headers for download
- Handle token status update on frequent error ban (oneDay case)
- Fix order processing query to filter by status and pay status correctly
- Add new error code for one-day ban in camel_oil_api and handle in client logic
- Update order model and response to include card number and password
- Remove redundant logging of SendCaptcha request data in camel_oil_api client
- Add access control checks on ExportOrder endpoint for authorized users only
2025-12-11 20:13:52 +08:00

417 lines
14 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package camel_oil
import (
"context"
"fmt"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/net/gtrace"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"kami/internal/consts"
"kami/internal/dao"
"kami/internal/model"
"kami/internal/model/do"
"kami/internal/model/entity"
"kami/utility/config"
"kami/utility/integration/camel_oil_api"
"kami/utility/integration/pig"
"github.com/gogf/gf/v2/os/glog"
"github.com/gogf/gf/v2/os/gtime"
)
// CronAccountPrefetchTask 账户预拉取定时任务 - 由cron调度器定期调用
// 流程:并发拉取账户到指定数量
func (s *sCamelOil) CronAccountPrefetchTask(ctx context.Context) error {
// 获取设置
settings, err := s.GetCamelOilSettings(ctx)
if err != nil {
glog.Errorf(ctx, "获取骆驼模块设置失败: %v", err)
return err
}
// 1. 获取当前在线账号数量
m := dao.V1CamelOilAccount.Ctx(ctx).DB(config.GetDatabaseV1())
onlineCount, err := m.Where(dao.V1CamelOilAccount.Columns().Status, consts.CamelOilAccountStatusOnline).
WhereOr(dao.V1CamelOilAccount.Columns().Status, consts.CamelOilAccountStatusSendCode).
Count()
if err != nil {
glog.Errorf(ctx, "获取在线账号数量失败: %v", err)
onlineCount = 0
}
targetOnlineAccounts := settings.LoginAccountCount
// 2. 如果在线账号少于目标数,触发并发登录
if onlineCount < targetOnlineAccounts {
ctx2, span := gtrace.NewTracer("CronAccountPrefetchTask").
Start(ctx, "sCamelOil.CronAccountPrefetchTask",
trace.WithNewRoot(),
)
defer span.End()
needCount := targetOnlineAccounts - onlineCount
span.SetAttributes(attribute.Int("needCount", needCount))
// 使用并发登录提高效率
successCount, err2 := s.BatchLoginAccounts(ctx2, int64(needCount))
if err2 != nil {
glog.Errorf(ctx2, "批量登录账号失败: %v", err2)
// 不返回错误,继续执行
} else {
glog.Infof(ctx2, "成功登录 %d 个账号", successCount)
}
}
return nil
}
// CronPrefetchOrderSupplementTask 预拉取订单补充定时任务 - 由cron调度器定期调用
// 流程:检查预拉取订单库存,不足时补充
func (s *sCamelOil) CronPrefetchOrderSupplementTask(ctx context.Context) error {
// 2. 如果库存不足则补充
supplementedCount, err := s.SupplementPrefetchOrders(ctx)
if err != nil {
glog.Errorf(ctx, "预拉取订单补充失败: %v", err)
return err
}
if supplementedCount > 0 {
s.SavePrefetchOrderLog(ctx, fmt.Sprintf("批量拉取补充预拉取订单成功 - 补充数量: %d", supplementedCount))
glog.Infof(ctx, "预拉取订单补充完成,补充数量: %d", supplementedCount)
}
return nil
}
// CronOrderPaymentCheckTask 订单支付状态检测任务 - 由cron调度器定期调用
func (s *sCamelOil) CronOrderPaymentCheckTask(ctx context.Context) error {
// 查询待支付订单创建时间在24小时内
var orders []*entity.V1CamelOilOrder
err := dao.V1CamelOilOrder.Ctx(ctx).DB(config.GetDatabaseV1()).
Where(dao.V1CamelOilOrder.Columns().Status, consts.CamelOilOrderStatusProcessing).
Where(dao.V1CamelOilOrder.Columns().PayStatus, consts.CamelOilPaymentStatusUnpaid).
WhereGTE(dao.V1CamelOilOrder.Columns().CreatedAt, gtime.Now().Add(-gtime.D)).
Scan(&orders)
if err != nil {
glog.Error(ctx, "查询待支付订单失败:", err)
return err
}
if len(orders) == 0 {
return nil
}
glog.Infof(ctx, "查询到 %d 个待支付订单", len(orders))
paidCount := 0
timeoutCount := 0
for _, order := range orders {
accountInfo, err2 := s.GetAccountInfo(ctx, order.AccountId)
if err2 != nil {
glog.Error(ctx, "获取账号信息失败:", err2)
// 记录该订单检查失败
_ = s.UpdateOrderStatus(ctx, order.Id, consts.CamelOilOrderStatusFailed, consts.CamelOilOrderChangeTypeFail, "", fmt.Sprintf("查询账户失败: %v", err2))
continue
}
// 查询订单状态
queryResult, err2 := camel_oil_api.NewClient(ctx).QueryOrder(ctx, accountInfo.Phone, accountInfo.Token, order.PlatformOrderNo)
if err2 != nil {
glog.Error(ctx, "查询订单状态失败:", err2)
_ = s.RecordOrderHistory(ctx, order.OrderNo, "query_failed", "", fmt.Sprintf("查询订单失败: %v", err))
continue
}
// 订单已支付
if queryResult != nil && order.PayStatus != int(consts.CamelOilPaymentStatusPaid) {
glog.Infof(ctx, "订单%s已支付金额: %.2f", order.OrderNo, queryResult.Balance)
_ = s.fillOrderCard(ctx, order.OrderNo, queryResult.CardNumber, queryResult.CardPassword)
// 增加账户订单计数
_ = s.IncrementAccountOrderCount(ctx, order.AccountId)
paidCount++
continue
}
// 订单未支付检查是否超时1小时
if gtime.Now().Sub(order.CreatedAt) >= consts.CamelOilOrderExpireDuration {
glog.Warningf(ctx, "订单%s支付超时创建时间: %v", order.OrderNo, order.CreatedAt)
_, _ = dao.V1CamelOilOrder.Ctx(ctx).DB(config.GetDatabaseV1()).
Where(dao.V1CamelOilOrder.Columns().Id, order.Id).
Update(&do.V1CamelOilOrder{
Status: consts.CamelOilOrderStatusFailed,
PayStatus: consts.CamelOilPaymentStatusTimeout,
FailureReason: "支付时间超过24小时支付超时",
})
_ = s.RecordOrderHistory(ctx, order.OrderNo, consts.CamelOilOrderChangeTypeTimeout, "", "订单支付超时")
timeoutCount++
}
}
glog.Infof(ctx, "订单支付状态检测完成: 已支付=%d, 超时=%d", paidCount, timeoutCount)
return nil
}
// CronAccountDailyResetTask 账号日重置任务 - 由cron调度器在每日00:00调用
func (s *sCamelOil) CronAccountDailyResetTask(ctx context.Context) error {
glog.Info(ctx, "开始执行账号日重置任务")
_, _ = dao.V1CamelOilAccount.Ctx(ctx).DB(config.GetDatabaseV1()).
Where(dao.V1CamelOilAccount.Columns().Status, consts.CamelOilAccountStatusOnline).
WhereLTE(dao.V1CamelOilAccount.Columns().DailyOrderCount, consts.CamelOilAccountDailyOrderLimit).
Update(&do.V1CamelOilAccount{
DailyOrderCount: 0,
DailyOrderDate: gtime.Now(),
})
//// 查询所有暂停的账号
//var accounts []*entity.V1CamelOilAccount
//err := dao.V1CamelOilAccount.Ctx(ctx).DB(config.GetDatabaseV1()).
// Where(dao.V1CamelOilAccount.Columns().Status, consts.CamelOilAccountStatusPaused).
// Scan(&accounts)
//
//if err != nil {
// glog.Error(ctx, "查询暂停账号失败:", err)
// return err
//}
//
//glog.Infof(ctx, "查询到 %d 个暂停账号", len(accounts))
//
//resumedCount := 0
//invalidCount := 0
//
//for _, account := range accounts {
// // 检查是否是昨日的记录
// if account.DailyOrderDate == nil || account.DailyOrderDate.Format("Y-m-d") != yesterday.Format("Y-m-d") {
// continue
// }
//
// if account.DailyOrderCount >= consts.CamelOilAccountDailyOrderLimit {
// // 正常完成,重置为在线
// _ = s.UpdateAccountStatus(ctx, account.Id, consts.CamelOilAccountStatusOnline,
// consts.CamelOilAccountChangeTypeResume, "次日重置,恢复使用")
//
// // 同时重置日算信息
// _, _ = dao.V1CamelOilAccount.Ctx(ctx).DB(config.GetDatabaseV1()).
// Where(dao.V1CamelOilAccount.Columns().Id, account.Id).
// Update(&do.V1CamelOilAccount{
// DailyOrderCount: 0,
// DailyOrderDate: gtime.Now(),
// })
//
// resumedCount++
// } else {
// // 单日下单不足10个标记为失效
// _ = s.UpdateAccountStatus(ctx, account.Id, consts.CamelOilAccountStatusInvalid,
// consts.CamelOilAccountChangeTypeInvalidate, "单日下单不足10个账号失效")
//
// invalidCount++
// }
//}
//
//glog.Infof(ctx, "账号日重置任务完成: 恢复=%d, 失效=%d", resumedCount, invalidCount)
return nil
}
func (s *sCamelOil) CronVerifyCodeCheckTask(ctx context.Context) error {
var accounts []*entity.V1CamelOilAccount
err := dao.V1CamelOilAccount.Ctx(ctx).DB(config.GetDatabaseV1()).
Where(dao.V1CamelOilAccount.Columns().Status, consts.CamelOilAccountStatusSendCode).
Scan(&accounts)
if err != nil {
glog.Error(ctx, "查询待验证码账号失败:", err)
return err
}
if len(accounts) == 0 {
return nil
}
glog.Infof(ctx, "查询到 %d 个待验证码账号", len(accounts))
successCount := 0
failCount := 0
pigClient := pig.NewClient()
camelClient := camel_oil_api.NewClient(ctx)
for _, account := range accounts {
//如果时间超过 1 分钟,就是过期
if gtime.Now().Sub(account.CreatedAt).Minutes() > 1 {
glog.Warningf(ctx, "验证码已过期账号ID: %d, 手机号: %s", account.Id, account.Phone)
_ = s.UpdateAccountStatus(ctx, account.Id, consts.CamelOilAccountStatusInvalid,
consts.CamelOilAccountChangeTypeLoginFail, "验证码已过期")
failCount++
continue
}
// 从野猪平台检测验证码是否已接收
verifyCode, received, err := pigClient.CheckVerifyCode(ctx, account.Phone)
if err != nil {
glog.Warningf(ctx, "检测验证码失败账号ID: %d, 手机号: %s, 错误: %v", account.Id, account.Phone, err)
failCount++
continue
}
// 验证码未接收,继续等待
if !received {
continue
}
// 验证码已接收,执行登录
glog.Infof(ctx, "验证码已接收开始执行登录账号ID: %d, 手机号: %s", account.Id, account.Phone)
// 调用骆驼加油平台执行登录
token, err := camelClient.LoginWithCaptcha(ctx, account.Phone, verifyCode)
if err != nil {
glog.Errorf(ctx, "登录失败账号ID: %d, 手机号: %s, 错误: %v", account.Id, account.Phone, err)
// 标记账号失效
_ = s.UpdateAccountStatus(ctx, account.Id, consts.CamelOilAccountStatusInvalid,
consts.CamelOilAccountChangeTypeLoginFail, fmt.Sprintf("登录失败: %v", err))
failCount++
continue
}
// 保存 token
_, err = dao.V1CamelOilAccount.Ctx(ctx).DB(config.GetDatabaseV1()).
Where(dao.V1CamelOilAccount.Columns().Id, account.Id).
Update(&do.V1CamelOilAccount{
Token: token,
LastLoginAt: gtime.Now(),
})
if err != nil {
glog.Errorf(ctx, "保存 token 失败账号ID: %d, 错误: %v", account.Id, err)
failCount++
continue
}
// 调用 UpdateAccountStatus 更新账号状态为在线
_ = s.UpdateAccountStatus(ctx, account.Id, consts.CamelOilAccountStatusOnline,
consts.CamelOilAccountChangeTypeLogin, fmt.Sprintf("登录成功,手机号: %s", account.Phone))
glog.Infof(ctx, "账号登录成功ID: %d, 手机号: %s, Token: %s", account.Id, account.Phone, token)
successCount++
}
glog.Infof(ctx, "验证码检测任务完成: 成功=%d, 失败=%d", successCount, failCount)
return nil
}
// CronCardBindingTask 卡密绑定定时任务 - 由cron调度器定期调用
// 流程:处理已支付但未绑定 Token 的订单,进行卡密绑定
func (s *sCamelOil) CronCardBindingTask(ctx context.Context) error {
// 查询已支付但未绑定 Token 的订单
var orders []*entity.V1CamelOilOrder
err := dao.V1CamelOilOrder.Ctx(ctx).DB(config.GetDatabaseV1()).
Where(dao.V1CamelOilOrder.Columns().PayStatus, consts.CamelOilPaymentStatusPaid).
Where(dao.V1CamelOilOrder.Columns().Status, consts.CamelOilOrderStatusProcessing).
WhereNotIn(dao.V1CamelOilOrder.Columns().Id,
dao.V1CamelOilCardBinding.Ctx(ctx).DB(config.GetDatabaseV1()).
Fields(dao.V1CamelOilCardBinding.Columns().OrderId)).
Limit(50).
Scan(&orders)
if err != nil {
glog.Error(ctx, "查询待绑定订单失败", err)
return err
}
if len(orders) == 0 {
return nil
}
glog.Infof(ctx, "查询到 %d 个待绑定订单", len(orders))
successCount := 0
failCount := 0
for _, order := range orders {
// 检查是否有可用的 Token
availableTokenCount, err2 := dao.V1CamelOilToken.Ctx(ctx).DB(config.GetDatabaseV1()).
Where(dao.V1CamelOilToken.Columns().Status, consts.CamelOilTokenStatusAvailable).
Count()
if err2 != nil || availableTokenCount == 0 {
glog.Warningf(ctx, "无可用 Token订单 ID: %d 无法绑定", order.Id)
failCount++
continue
}
// 检查卡号和卡密是否存在
if order.CardNumber == "" || order.CardPassword == "" {
glog.Warningf(ctx, "订单 %d 卡号或卡密未填写,无法绑定", order.Id)
failCount++
continue
}
// 尝试绑定卡密到 Token
_, err = s.BindCardToToken(ctx, &model.CamelOilCardBindInput{
OrderId: order.Id,
CardNumber: order.CardNumber,
CardPassword: order.CardPassword,
Amount: order.Amount,
})
if err != nil {
glog.Errorf(ctx, "绑定卡密到 Token 失败,订单 ID: %d, 错误: %v", order.Id, err)
failCount++
continue
}
glog.Infof(ctx, "订单 %d 卡密绑定成功", order.Id)
successCount++
}
glog.Infof(ctx, "卡密绑定任务完成: 成功=%d, 失败=%d", successCount, failCount)
return nil
}
// CronCleanExpiredPrefetchOrders 清理过期的预拉取订单
func (s *sCamelOil) CronCleanExpiredPrefetchOrders(ctx context.Context) (cleanedCount int, err error) {
m := dao.V1CamelOilPrefetchOrder.Ctx(ctx).DB(config.GetDatabaseV1())
// 查询已过期的待匹配订单
var expiredOrders []*entity.V1CamelOilPrefetchOrder
err = m.Where(dao.V1CamelOilPrefetchOrder.Columns().Status, consts.CamelOilPrefetchOrderStatusPending).
WhereLT(dao.V1CamelOilPrefetchOrder.Columns().ExpireAt, gtime.Now()).
Scan(&expiredOrders)
if err != nil {
return 0, gerror.Wrap(err, "查询过期订单失败")
}
if len(expiredOrders) == 0 {
return 0, nil
}
// 标记为已过期
for _, order := range expiredOrders {
_, err = m.Where(dao.V1CamelOilPrefetchOrder.Columns().Id, order.Id).
Update(&do.V1CamelOilPrefetchOrder{
Status: int(consts.CamelOilPrefetchOrderStatusExpired),
})
if err != nil {
glog.Warningf(ctx, "标记预拉取订单为过期失败ID=%d: %v", order.Id, err)
continue
}
// 记录历史
_ = s.recordPrefetchOrderHistory(ctx, order.Id, consts.CamelOilPrefetchOrderChangeTypeExpire,
order.AccountId, order.AccountName, "订单已过期,自动标记失效", nil)
cleanedCount++
}
glog.Infof(ctx, "清理过期预拉取订单完成,清理数量: %d", cleanedCount)
return cleanedCount, nil
}
// CronExpiredTokensCode 获取需要发送验证码的 Token状态为待验证码
func (s *sCamelOil) CronExpiredTokensCode(ctx context.Context) error {
_, err := dao.V1CamelOilToken.Ctx(ctx).DB(config.GetDatabaseV1()).Where(dao.V1CamelOilToken.Columns().Status, consts.CamelOilTokenStatusCodeSent).
WhereLT(dao.V1CamelOilToken.Columns().UpdatedAt, gtime.Now().Add(-gtime.M*5)).
Update(do.V1CamelOilToken{Status: consts.CamelOilTokenStatusVerificationFailed})
return err
}