Files
kami_backend/internal/logic/camel_oil/prefetch_order.go
danial 8495c453f3 feat(camel_oil): 添加骆驼模块设置和预拉取订单日志功能
- 增加骆驼模块设置接口支持获取和更新配置
- 使用Redis缓存设置数据,实现模块配置的持久化管理
- 引入预拉取订单日志功能,支持日志的保存和按时间范围查询
- 预拉取订单请求响应数据记录到Redis,方便问题追踪
- 根据模块设置动态调整账号登录、预拉取订单并发数量
- 调整账号登录逻辑以支持配置的并发控制
- 优化预拉取订单补充流程,支持多面额库存管理
- 修正集成API请求函数名及调用,记录详细调用日志数据
- 调整定时任务调度频率,增加预拉取订单补充任务的执行频率
- 升级golang版本到1.25.5,保持开发环境最新状态
2025-12-03 21:17:56 +08:00

416 lines
13 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package camel_oil
import (
"context"
"fmt"
"sync"
"sync/atomic"
"github.com/gogf/gf/v2/os/glog"
"github.com/gogf/gf/v2/database/gdb"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/os/gmlock"
"github.com/gogf/gf/v2/os/gtime"
"github.com/shopspring/decimal"
"kami/internal/consts"
"kami/internal/dao"
"kami/internal/model"
"kami/internal/model/do"
"kami/internal/model/entity"
"kami/utility/config"
"kami/utility/integration/camel_oil_api"
)
// ====================================================================================
// 预拉取订单管理相关方法
// ====================================================================================
// GetPrefetchOrderCapacity 获取当前可用订单容量
func (s *sCamelOil) GetPrefetchOrderCapacity(ctx context.Context, amount float64) (capacity int, err error) {
m := dao.V1CamelOilPrefetchOrder.Ctx(ctx).DB(config.GetDatabaseV1())
count, err := m.Where(dao.V1CamelOilPrefetchOrder.Columns().Status, consts.CamelOilPrefetchOrderStatusPending).
WhereGTE(dao.V1CamelOilPrefetchOrder.Columns().ExpireAt, gtime.Now()).
Where(dao.V1CamelOilPrefetchOrder.Columns().Amount, amount).
Count()
if err != nil {
return 0, gerror.Wrap(err, "查询预拉取订单库存失败")
}
return count, nil
}
// PrefetchOrderConcurrently 使用所有可用账号并发拉取订单,直到获取到可用订单为止
func (s *sCamelOil) PrefetchOrderConcurrently(ctx context.Context, amount float64) (result *model.PrefetchOrderResult, err error) {
// 获取设置
settings, err := GetCamelOilSettings(ctx)
if err != nil {
glog.Errorf(ctx, "获取骆驼模块设置失败: %v", err)
return nil, err
}
// 1. 获取所有在线账号
m := dao.V1CamelOilAccount.Ctx(ctx).DB(config.GetDatabaseV1())
var onlineAccounts []*entity.V1CamelOilAccount
err = m.Where(dao.V1CamelOilAccount.Columns().Status, consts.CamelOilAccountStatusOnline).
WhereLT(dao.V1CamelOilAccount.Columns().DailyOrderCount, consts.CamelOilAccountDailyOrderLimit).
OrderRandom().
Scan(&onlineAccounts)
if err != nil {
return nil, gerror.Wrap(err, "查询在线账号失败")
}
if len(onlineAccounts) == 0 {
return nil, gerror.New("暂无在线账号可用")
}
// 2. 使用设置中的并发数量限制并发
concurrencyLimit := min(len(onlineAccounts), settings.PrefetchConcurrencyAccounts)
var (
resultChan = make(chan *model.PrefetchOrderResult, 1)
errorChan = make(chan error, len(onlineAccounts))
semaphore = make(chan struct{}, concurrencyLimit) // 信信道控制并发量
wg sync.WaitGroup
mu sync.Mutex
found = false
)
// 3. 每个账号起一个协程尝试拉取,控制并发量
for _, account := range onlineAccounts {
acc := account // 避免闭包陷阱
wg.Go(func() {
// 获取信信道控制
semaphore <- struct{}{}
defer func() { <-semaphore }()
// 检查是否已经找到订单
mu.Lock()
if found {
mu.Unlock()
return
}
mu.Unlock()
// 拉取订单
platformOrderId, payUrl, err2 := camel_oil_api.NewClient().CreateCamelOilOrder(ctx, acc.Phone, acc.Token, amount)
if err2 != nil {
if err2.Error() == "auth_error" {
_ = s.UpdateAccountStatus(ctx, acc.Id, consts.CamelOilAccountStatusInvalid, consts.CamelOilAccountChangeTypeInvalidate, "账号token失效")
}
errorChan <- err2
return
}
// 成功拉取订单,返回结果
mu.Lock()
if !found {
found = true
resultChan <- &model.PrefetchOrderResult{
PlatformOrderNo: platformOrderId,
AlipayUrl: payUrl,
AccountId: acc.Id,
AccountName: acc.AccountName,
Amount: amount,
}
}
mu.Unlock()
})
}
// 4. 等待所有协程完成並关闭Channel
go func() {
wg.Wait()
close(resultChan)
close(errorChan)
}()
// 5. 等待第一个成功的订单
if res := <-resultChan; res != nil {
glog.Infof(ctx, "并发拉取订单成功,账号=%s, 金额=%.2f", res.AccountName, res.Amount)
return res, nil
}
// 6. 所有账号都拉取失败
var lastErr error
for err = range errorChan {
lastErr = err
}
return nil, gerror.Wrap(lastErr, "所有账号拉取订单失败")
}
// PrefetchOrder 拉取单个订单(用于单个账号)
func (s *sCamelOil) PrefetchOrder(ctx context.Context, account *entity.V1CamelOilAccount, amount float64) (prefetchId int64, err error) {
// 1. 从骆驼平台拉取订单
platformOrderId, payUrl, err := camel_oil_api.NewClient().CreateCamelOilOrder(ctx, account.Phone, account.Token, amount)
if err != nil {
if err.Error() == "auth_error" {
_ = s.UpdateAccountStatus(ctx, account.Id, consts.CamelOilAccountStatusInvalid, consts.CamelOilAccountChangeTypeInvalidate, "账号token失效")
return 0, gerror.Wrap(err, "账号token失效拉取订单失败")
}
return 0, gerror.Wrap(err, "从骆驼平台拉取订单失败")
}
// 3. 保存预拉取订单记录
result, err := dao.V1CamelOilPrefetchOrder.Ctx(ctx).DB(config.GetDatabaseV1()).Insert(&do.V1CamelOilPrefetchOrder{
AccountId: account.Id,
AccountName: account.AccountName,
Amount: decimal.NewFromFloat(amount),
PlatformOrderNo: platformOrderId,
AlipayUrl: payUrl,
Status: consts.CamelOilPrefetchOrderStatusPending, // 待匹配
ExpireAt: gtime.Now().Add(consts.CamelOilPrefetchOrderExpireDuration),
})
if err != nil {
return 0, gerror.Wrap(err, "保存预拉取订单失败")
}
prefetchId, err = result.LastInsertId()
if err != nil {
return 0, gerror.Wrap(err, "获取预拉取订单ID失败")
}
// 4. 记录预拉取订单历史
_ = s.recordPrefetchOrderHistory(ctx, prefetchId, consts.CamelOilPrefetchOrderChangeTypeFetch,
account.Id, account.AccountName, fmt.Sprintf("从骆驼平台拉取订单,平台订单号: %s", platformOrderId), nil)
glog.Infof(ctx, "预拉取订单创建成功ID=%d, 账号=%s, 平台订单号=%s", prefetchId, account.Phone, platformOrderId)
return prefetchId, nil
}
// ConcurrentPrefetchOrders 使用多个账号并发拉取订单
func (s *sCamelOil) ConcurrentPrefetchOrders(ctx context.Context, amount float64, targetCount int) (successCount int, err error) {
// 获取设置
settings, err := GetCamelOilSettings(ctx)
if err != nil {
glog.Errorf(ctx, "获取骆驼模块设置失败: %v", err)
return 0, err
}
// 1. 获取所有在线账号
m := dao.V1CamelOilAccount.Ctx(ctx).DB(config.GetDatabaseV1())
var onlineAccounts []*entity.V1CamelOilAccount
err = m.Where(dao.V1CamelOilAccount.Columns().Status, consts.CamelOilAccountStatusOnline).
WhereLT(dao.V1CamelOilAccount.Columns().DailyOrderCount, consts.CamelOilAccountDailyOrderLimit).
Scan(&onlineAccounts)
if err != nil {
return 0, gerror.Wrap(err, "查询在线账号失败")
}
if len(onlineAccounts) == 0 {
return 0, gerror.New("暂无在线账号可用于拉取订单")
}
// 2. 使用设置中的并发数量限制并发
concurrencyLimit := settings.PrefetchConcurrencyAccounts
var (
wg sync.WaitGroup
semaphore = make(chan struct{}, concurrencyLimit)
successCounter int32
failureCounter int32
accountIndex int32
)
targetRemaining := int32(targetCount)
accountIndex = 0
const maxRetries = 10 // 失败计数是成功计数的10倍每个循环重试10次
for {
// 原子方式检查并减少目标数量
current := atomic.LoadInt32(&targetRemaining)
if current <= 0 {
break
}
account := onlineAccounts[int(atomic.LoadInt32(&accountIndex))%len(onlineAccounts)]
atomic.AddInt32(&accountIndex, 1)
semaphore <- struct{}{} // 获取信号量
wg.Go(func() {
defer wg.Done()
defer func() { <-semaphore }() // 释放信号量
// 为了提高效率每个账号可以拉取多单每单重试10次
for i := 0; i < 2; i++ {
// 原子方式检查是否还需要继续拉取
if atomic.LoadInt32(&targetRemaining) <= 0 {
break
}
// 重试机制每个订单最多重试10次
var success bool
for retry := 0; retry < maxRetries; retry++ {
_, err = s.PrefetchOrder(ctx, account, amount)
if err == nil {
success = true
break
}
atomic.AddInt32(&failureCounter, 1)
if retry < maxRetries-1 {
glog.Debugf(ctx, "账号%s拉取订单失败(第%d次重试): %v", account.Phone, retry+1, err)
} else {
glog.Warningf(ctx, "账号%s拉取订单失败(已重试%d次): %v", account.Phone, maxRetries, err)
}
}
if success {
atomic.AddInt32(&successCounter, 1)
}
atomic.AddInt32(&targetRemaining, -1)
}
})
}
wg.Wait()
close(semaphore) // 关闭信号量通道,允许 GC 回收
successCount = int(atomic.LoadInt32(&successCounter))
failureCount := int(atomic.LoadInt32(&failureCounter))
glog.Infof(ctx, "并发拉取订单完成,目标: %d, 成功: %d, 失败: %d", targetCount, successCount, failureCount)
return successCount, nil
}
// SupplementPrefetchOrders 补充预拉取订单,当库存不足时调用
func (s *sCamelOil) SupplementPrefetchOrders(ctx context.Context) (supplementedCount int, err error) {
gmlock.Lock(consts.CamelOilPrefetchTaskLockKey)
defer gmlock.Unlock(consts.CamelOilPrefetchTaskLockKey)
// 获取设置
settings, err := GetCamelOilSettings(ctx)
if err != nil {
glog.Errorf(ctx, "获取骆驼模块设置失败: %v", err)
return 0, err
}
// 如果没有设置面额配置,直接返回
if len(settings.TargetDenominations) == 0 {
glog.Infof(ctx, "未配置面额设置,无需补充预拉取订单")
return 0, nil
}
successCount := 0
for _, denom := range settings.TargetDenominations {
// 1. 获取当前库存
capacity, err2 := s.GetPrefetchOrderCapacity(ctx, float64(denom.Denomination))
if err2 != nil {
return 0, gerror.Wrap(err2, "获取预拉取订单库存失败")
}
glog.Infof(ctx, "当前预拉取订单库存 (面额 %d): %d", denom.Denomination, capacity)
// 2. 如果库存充足,无需补充
if capacity >= denom.MinCapacity {
glog.Infof(ctx, "面额 %d 预拉取订单库存充足 (%d >= %d),无需补充", denom.Denomination, capacity, denom.MinCapacity)
continue
}
// 3. 计算需要补充的数量
needCount := denom.TargetCapacity - capacity
glog.Infof(ctx, "面额 %d 预拉取订单库存不足,需要补充 %d 单", denom.Denomination, needCount)
// 4. 并发拉取订单
success, err := s.ConcurrentPrefetchOrders(ctx, float64(denom.Denomination), needCount)
if err != nil {
glog.Errorf(ctx, "面额 %d 并发拉取订单失败: %v", denom.Denomination, err)
continue
}
successCount += success
}
return successCount, nil
}
// MatchPrefetchOrder 将预拉取订单与用户订单进行匹配
func (s *sCamelOil) MatchPrefetchOrder(ctx context.Context, orderId string, amount float64) (result *model.PrefetchOrderResult, err error) {
// 2. 查询待匹配的预拉取订单(同金额)
var prefetchOrder *entity.V1CamelOilPrefetchOrder
err = dao.V1CamelOilPrefetchOrder.Ctx(ctx).DB(config.GetDatabaseV1()).
Where(dao.V1CamelOilPrefetchOrder.Columns().Status, consts.CamelOilPrefetchOrderStatusPending).
Where(dao.V1CamelOilPrefetchOrder.Columns().Amount, amount).
WhereGTE(dao.V1CamelOilPrefetchOrder.Columns().ExpireAt, gtime.Now()).
OrderAsc(dao.V1CamelOilPrefetchOrder.Columns().CreatedAt).
Scan(&prefetchOrder)
if err != nil {
return nil, gerror.Wrap(err, "查询预拉取订单失败")
}
if prefetchOrder == nil {
return nil, gerror.New("暂无匹配的预拉取订单,请稍后重试")
}
// 3. 使用事务更新预拉取订单和用户订单
err = config.GetDatabaseV1().Ctx(ctx).Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
// 3.1 更新预拉取订单:标记为已匹配
_, err = dao.V1CamelOilPrefetchOrder.Ctx(ctx).DB(config.GetDatabaseV1()).TX(tx).
Where(dao.V1CamelOilPrefetchOrder.Columns().Id, prefetchOrder.Id).
Update(&do.V1CamelOilPrefetchOrder{
Status: consts.CamelOilPrefetchOrderStatusMatched,
OrderNo: orderId,
MatchedAt: gtime.Now(),
})
if err != nil {
return gerror.Wrap(err, "更新预拉取订单状态失败")
}
if err != nil {
return gerror.Wrap(err, "更新用户订单失败")
}
// 3.3 记录预拉取订单历史
_ = s.recordPrefetchOrderHistory(ctx, prefetchOrder.Id, consts.CamelOilPrefetchOrderChangeTypeMatch,
prefetchOrder.AccountId, prefetchOrder.AccountName,
fmt.Sprintf("与用户订单匹配订单ID=%s", orderId), tx)
return nil
})
if err != nil {
return nil, err
}
result = &model.PrefetchOrderResult{
PlatformOrderNo: prefetchOrder.PlatformOrderNo,
AlipayUrl: prefetchOrder.AlipayUrl,
AccountId: prefetchOrder.AccountId,
AccountName: prefetchOrder.AccountName,
Amount: prefetchOrder.Amount.InexactFloat64(),
}
glog.Infof(ctx, "预拉取订单匹配成功预拉取ID=%d, 用户订单ID=%s, 平台订单号=%s",
prefetchOrder.Id, orderId, prefetchOrder.PlatformOrderNo)
return result, nil
}
// recordPrefetchOrderHistory 记录预拉取订单历史
func (s *sCamelOil) recordPrefetchOrderHistory(ctx context.Context, prefetchId int64,
changeType consts.CamelOilPrefetchOrderChangeType, accountId int64, accountName, remark string, tx gdb.TX) error {
m := dao.V1CamelOilPrefetchOrderHistory.Ctx(ctx).DB(config.GetDatabaseV1())
if tx != nil {
m = m.TX(tx)
}
_, err := m.Insert(&do.V1CamelOilPrefetchOrderHistory{
PrefetchId: prefetchId,
ChangeType: string(changeType),
AccountId: accountId,
AccountName: accountName,
Remark: remark,
})
if err != nil {
glog.Errorf(ctx, "记录预拉取订单历史失败: %v", err)
}
return err
}