From 8495c453f3cb0e43caad0ae7a5c0f140b9f83d9c Mon Sep 17 00:00:00 2001 From: danial Date: Wed, 3 Dec 2025 21:17:56 +0800 Subject: [PATCH] =?UTF-8?q?feat(camel=5Foil):=20=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?=E9=AA=86=E9=A9=BC=E6=A8=A1=E5=9D=97=E8=AE=BE=E7=BD=AE=E5=92=8C?= =?UTF-8?q?=E9=A2=84=E6=8B=89=E5=8F=96=E8=AE=A2=E5=8D=95=E6=97=A5=E5=BF=97?= =?UTF-8?q?=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 增加骆驼模块设置接口支持获取和更新配置 - 使用Redis缓存设置数据,实现模块配置的持久化管理 - 引入预拉取订单日志功能,支持日志的保存和按时间范围查询 - 预拉取订单请求响应数据记录到Redis,方便问题追踪 - 根据模块设置动态调整账号登录、预拉取订单并发数量 - 调整账号登录逻辑以支持配置的并发控制 - 优化预拉取订单补充流程,支持多面额库存管理 - 修正集成API请求函数名及调用,记录详细调用日志数据 - 调整定时任务调度频率,增加预拉取订单补充任务的执行频率 - 升级golang版本到1.25.5,保持开发环境最新状态 --- .tool-versions | 2 +- api/camel_oil/camel_oil.go | 2 + api/camel_oil/v1/order_logs.go | 33 ++++ api/camel_oil/v1/settings.go | 50 ++++++ internal/consts/camel_oil.go | 15 -- .../camel_oil_v1_get_prefetch_order_logs.go | 12 ++ .../camel_oil/camel_oil_v1_get_settings.go | 12 ++ .../camel_oil/camel_oil_v1_update_settings.go | 12 ++ internal/logic/camel_oil/account_login.go | 48 +++-- internal/logic/camel_oil/cron_tasks.go | 14 +- internal/logic/camel_oil/prefetch_order.go | 71 +++++--- .../logic/camel_oil/prefetch_order_logs.go | 165 ++++++++++++++++++ internal/logic/camel_oil/settings.go | 97 ++++++++++ internal/service/camel_oil.go | 8 + utility/cache/consts.go | 1 + utility/cron/cron.go | 6 +- utility/integration/camel_oil_api/api.go | 35 +++- utility/integration/camel_oil_api/api_test.go | 2 +- 18 files changed, 515 insertions(+), 70 deletions(-) create mode 100644 api/camel_oil/v1/order_logs.go create mode 100644 api/camel_oil/v1/settings.go create mode 100644 internal/controller/camel_oil/camel_oil_v1_get_prefetch_order_logs.go create mode 100644 internal/controller/camel_oil/camel_oil_v1_get_settings.go create mode 100644 internal/controller/camel_oil/camel_oil_v1_update_settings.go create mode 100644 internal/logic/camel_oil/prefetch_order_logs.go create mode 100644 internal/logic/camel_oil/settings.go diff --git a/.tool-versions b/.tool-versions index 05bd69d5..653ac4e3 100644 --- a/.tool-versions +++ b/.tool-versions @@ -1,2 +1,2 @@ -golang 1.25.3 +golang 1.25.5 python 3.13.9 diff --git a/api/camel_oil/camel_oil.go b/api/camel_oil/camel_oil.go index 6cc310cf..4d507bc7 100644 --- a/api/camel_oil/camel_oil.go +++ b/api/camel_oil/camel_oil.go @@ -21,6 +21,8 @@ type ICamelOilV1 interface { OrderHistory(ctx context.Context, req *v1.OrderHistoryReq) (res *v1.OrderHistoryRes, err error) AccountOrderList(ctx context.Context, req *v1.AccountOrderListReq) (res *v1.AccountOrderListRes, err error) OrderCallback(ctx context.Context, req *v1.OrderCallbackReq) (res *v1.OrderCallbackRes, err error) + GetSettings(ctx context.Context, req *v1.GetSettingsReq) (res *v1.GetSettingsRes, err error) + UpdateSettings(ctx context.Context, req *v1.UpdateSettingsReq) (res *v1.UpdateSettingsRes, err error) CreateToken(ctx context.Context, req *v1.CreateTokenReq) (res *v1.CreateTokenRes, err error) GetToken(ctx context.Context, req *v1.GetTokenReq) (res *v1.GetTokenRes, err error) ListTokens(ctx context.Context, req *v1.ListTokensReq) (res *v1.ListTokensRes, err error) diff --git a/api/camel_oil/v1/order_logs.go b/api/camel_oil/v1/order_logs.go new file mode 100644 index 00000000..4771cc48 --- /dev/null +++ b/api/camel_oil/v1/order_logs.go @@ -0,0 +1,33 @@ +package v1 + +import ( + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/os/gtime" +) + +// GetPrefetchOrderLogsReq 获取预拉取订单日志请求 +type GetPrefetchOrderLogsReq struct { + g.Meta `path:"/jd-v2/prefetch/logs" tags:"JD V2 Prefetch" method:"get" summary:"获取预拉取订单日志"` + // 开始时间 + StartTime *gtime.Time `json:"startTime" v:"required#开始时间不能为空" description:"开始时间"` + // 结束时间 + EndTime *gtime.Time `json:"endTime" v:"required#结束时间不能为空" description:"结束时间"` +} + +// PrefetchOrderLogItem 预拉取订单日志项 +type PrefetchOrderLogItem struct { + // 请求时间戳 + Timestamp string `json:"timestamp" description:"请求时间戳"` + // 手机号(脱敏) + Phone string `json:"phone" description:"手机号(脱敏)"` + // 订单面额 + Amount float64 `json:"amount" description:"订单面额"` + // API响应数据 + ResponseData string `json:"responseData" description:"API响应原始数据"` +} + +// GetPrefetchOrderLogsRes 获取预拉取订单日志响应 +type GetPrefetchOrderLogsRes struct { + // 日志列表 + Logs []PrefetchOrderLogItem `json:"logs" description:"预拉取订单日志列表"` +} \ No newline at end of file diff --git a/api/camel_oil/v1/settings.go b/api/camel_oil/v1/settings.go new file mode 100644 index 00000000..ee22a7a7 --- /dev/null +++ b/api/camel_oil/v1/settings.go @@ -0,0 +1,50 @@ +package v1 + +import ( + "github.com/gogf/gf/v2/frame/g" +) + +// GetSettingsReq 获取骆驼模块设置 +type GetSettingsReq struct { + g.Meta `path:"/jd-v2/settings/get" tags:"JD V2 Settings" method:"get" summary:"获取骆驼模块设置"` +} + +type GetSettingsRes struct { + g.Meta `mime:"application/json"` + CamelOilSettings +} + +// UpdateSettingsReq 更新骆驼模块设置 +type UpdateSettingsReq struct { + g.Meta `path:"/jd-v2/settings/update" tags:"JD V2 Settings" method:"post" summary:"更新骆驼模块设置"` + CamelOilSettings +} + +type UpdateSettingsRes struct { + g.Meta `mime:"application/json"` +} + +// DenominationSetting 单个面额设置 +type DenominationSetting struct { + Denomination int `json:"denomination" description:"面额值,如100、200、500等"` + MinCapacity int `json:"minCapacity" description:"该面额预拉取订单最小库存阈值(当库存低于此值时触发补充)"` + TargetCapacity int `json:"targetCapacity" description:"该面额预拉取订单目标库存(补充时的目标数量)"` +} + +// CamelOilSettings 骆驼模块设置 +type CamelOilSettings struct { + // 豪猪平台相关设置 + UseHaozhuPlatform bool `json:"useHaozhuPlatform" description:"是否从豪猪平台获取手机号登录"` + + // 账号登录数量设置 + LoginAccountCount int `json:"loginAccountCount" description:"要登录的手机号数量"` + + // 提前拉单并发设置 + PrefetchConcurrencyAccounts int `json:"prefetchConcurrencyAccounts" description:"提前拉单并发的账号数量"` + + // 单账号并发设置 + SingleAccountConcurrency int `json:"singleAccountConcurrency" description:"单个账号的并发数量"` + + // 面额相关设置 + TargetDenominations []DenominationSetting `json:"targetDenominations" description:"要获取的面额和对应库存设置列表"` +} \ No newline at end of file diff --git a/internal/consts/camel_oil.go b/internal/consts/camel_oil.go index dbad3a56..2fe07210 100644 --- a/internal/consts/camel_oil.go +++ b/internal/consts/camel_oil.go @@ -210,18 +210,9 @@ var CamelOilPrefetchOrderChangeTypeText = map[CamelOilPrefetchOrderChangeType]st // ==================================================================================== const ( - // CamelOilPrefetchOrderMinCapacity 预拉取订单最小库存阈值(当库存低于此值时触发补充) - CamelOilPrefetchOrderMinCapacity = 1 - - // CamelOilPrefetchOrderTargetCapacity 预拉取订单目标库存(补充时的目标数量) - CamelOilPrefetchOrderTargetCapacity = 5 - // CamelOilPrefetchOrderExpireDuration 预拉取订单过期时间(小时) CamelOilPrefetchOrderExpireDuration = time.Hour * 24 - // CamelOilPrefetchMaxConcurrency 预拉取最大并发账号数量 - CamelOilPrefetchMaxConcurrency = 10 - // CamelOilPrefetchOrderLockKey Redis中预拉取订单的分布式锁键名前缀 CamelOilPrefetchOrderLockKey = "camel_oil_api:prefetch:order:lock:" @@ -237,18 +228,12 @@ const ( // CamelOilAccountDailyOrderLimit 账号每日订单上限 CamelOilAccountDailyOrderLimit = 10 - // CamelOilTargetOnlineAccounts 目标在线账号数量 - CamelOilTargetOnlineAccounts = 10 - // CamelOilOrderExpireDuration 订单支付超时时间(小时) CamelOilOrderExpireDuration = gtime.H // CamelOilMaxCallbackRetry 回调最大重试次数 CamelOilMaxCallbackRetry = 3 - // CamelOilMaxLoginConcurrency 最大并发登录数量 - CamelOilMaxLoginConcurrency = 3 - // CamelOilTokenExpireDuration Token过期时间(天) CamelOilTokenExpireDuration = 30 ) diff --git a/internal/controller/camel_oil/camel_oil_v1_get_prefetch_order_logs.go b/internal/controller/camel_oil/camel_oil_v1_get_prefetch_order_logs.go new file mode 100644 index 00000000..0971cd58 --- /dev/null +++ b/internal/controller/camel_oil/camel_oil_v1_get_prefetch_order_logs.go @@ -0,0 +1,12 @@ +package camel_oil + +import ( + "context" + + v1 "kami/api/camel_oil/v1" + "kami/internal/service" +) + +func (c *ControllerV1) GetPrefetchOrderLogs(ctx context.Context, req *v1.GetPrefetchOrderLogsReq) (res *v1.GetPrefetchOrderLogsRes, err error) { + return service.CamelOil().GetPrefetchOrderLogs(ctx, req) +} \ No newline at end of file diff --git a/internal/controller/camel_oil/camel_oil_v1_get_settings.go b/internal/controller/camel_oil/camel_oil_v1_get_settings.go new file mode 100644 index 00000000..24118cea --- /dev/null +++ b/internal/controller/camel_oil/camel_oil_v1_get_settings.go @@ -0,0 +1,12 @@ +package camel_oil + +import ( + "context" + + "kami/api/camel_oil/v1" + "kami/internal/service" +) + +func (c *ControllerV1) GetSettings(ctx context.Context, req *v1.GetSettingsReq) (res *v1.GetSettingsRes, err error) { + return service.CamelOil().GetSettings(ctx, req) +} diff --git a/internal/controller/camel_oil/camel_oil_v1_update_settings.go b/internal/controller/camel_oil/camel_oil_v1_update_settings.go new file mode 100644 index 00000000..fd9e0dfd --- /dev/null +++ b/internal/controller/camel_oil/camel_oil_v1_update_settings.go @@ -0,0 +1,12 @@ +package camel_oil + +import ( + "context" + + "kami/api/camel_oil/v1" + "kami/internal/service" +) + +func (c *ControllerV1) UpdateSettings(ctx context.Context, req *v1.UpdateSettingsReq) (res *v1.UpdateSettingsRes, err error) { + return service.CamelOil().UpdateSettings(ctx, req) +} diff --git a/internal/logic/camel_oil/account_login.go b/internal/logic/camel_oil/account_login.go index 9440fbee..22d1f713 100644 --- a/internal/logic/camel_oil/account_login.go +++ b/internal/logic/camel_oil/account_login.go @@ -9,6 +9,7 @@ import ( "kami/utility/config" "kami/utility/integration/camel_oil_api" "kami/utility/integration/pig" + "sync" "sync/atomic" "time" @@ -19,13 +20,25 @@ import ( // LoginAccount 执行账号登录流程 // 注意:当前使用假数据,实际应对接骆驼加油平台和接码平台 func (s *sCamelOil) LoginAccount(ctx context.Context) (err error) { + // 获取设置 + settings, err := GetCamelOilSettings(ctx) + if err != nil { + glog.Errorf(ctx, "获取骆驼模块设置失败: %v", err) + return err + } + + // 如果不使用豪猪平台,直接返回错误 + if !settings.UseHaozhuPlatform { + return gerror.New("未启用豪猪平台,无法获取手机号") + } + // 对接接码平台,获取手机号并检查是否已存在 var phoneNumber string ticker := time.NewTicker(time.Second) for range ticker.C { phoneNumber, err = pig.NewClient().GetAccountInfo(ctx) if err != nil { - return gerror.Wrap(err, "获取手机号失败") + return gerror.Wrap(err, "从豪猪平台获取手机号失败") } // 检查手机号是否已存在 @@ -67,21 +80,36 @@ func (s *sCamelOil) BatchLoginAccounts(ctx context.Context, count int64) (succes return 0, gerror.New("登录数量必须大于0") } - // 逐个登录账号 - successCount = 0 - for range 10 { - if successCount >= count { - break - } - for i := 0; i < int(count-successCount); i++ { + // 获取设置 + settings, err := GetCamelOilSettings(ctx) + if err != nil { + glog.Errorf(ctx, "获取骆驼模块设置失败: %v", err) + return 0, err + } + + // 使用设置中的并发数量控制登录 + semaphore := make(chan struct{}, settings.SingleAccountConcurrency) + var wg sync.WaitGroup + var successCounter int64 + + for i := 0; i < int(count); i++ { + wg.Add(1) + go func() { + defer wg.Done() + semaphore <- struct{}{} // 获取信号量 + defer func() { <-semaphore }() // 释放信号量 + loginErr := s.LoginAccount(ctx) if loginErr != nil { glog.Errorf(ctx, "账号登录失败,错误: %v", loginErr) return } - atomic.AddInt64(&successCount, 1) - } + atomic.AddInt64(&successCounter, 1) + }() } + + wg.Wait() + successCount = successCounter glog.Infof(ctx, "批量登录完成,成功: %d", successCount) return successCount, nil } diff --git a/internal/logic/camel_oil/cron_tasks.go b/internal/logic/camel_oil/cron_tasks.go index e1888044..f77f4877 100644 --- a/internal/logic/camel_oil/cron_tasks.go +++ b/internal/logic/camel_oil/cron_tasks.go @@ -26,6 +26,13 @@ func (s *sCamelOil) CronAccountPrefetchTask(ctx context.Context) error { return nil } + // 获取设置 + settings, err := GetCamelOilSettings(ctx) + if err != nil { + glog.Errorf(ctx, "获取骆驼模块设置失败: %v", err) + return err + } + // 1. 获取当前在线账号数量 m := dao.V1CamelOilAccount.Ctx(ctx).DB(config.GetDatabaseV1()) onlineCount, err := m.Where(dao.V1CamelOilAccount.Columns().Status, consts.CamelOilAccountStatusOnline). @@ -36,11 +43,12 @@ func (s *sCamelOil) CronAccountPrefetchTask(ctx context.Context) error { onlineCount = 0 } - glog.Infof(ctx, "当前在线账号数量: %d, 目标数量: %d", onlineCount, consts.CamelOilTargetOnlineAccounts) + targetOnlineAccounts := settings.LoginAccountCount + glog.Infof(ctx, "当前在线账号数量: %d, 目标数量: %d", onlineCount, targetOnlineAccounts) // 2. 如果在线账号少于目标数,触发并发登录 - if onlineCount < consts.CamelOilTargetOnlineAccounts { - needCount := consts.CamelOilTargetOnlineAccounts - onlineCount + if onlineCount < targetOnlineAccounts { + needCount := targetOnlineAccounts - onlineCount glog.Infof(ctx, "在线账号不足,需要登录 %d 个账号", needCount) // 使用并发登录提高效率 diff --git a/internal/logic/camel_oil/prefetch_order.go b/internal/logic/camel_oil/prefetch_order.go index d1b1c754..750b8cea 100644 --- a/internal/logic/camel_oil/prefetch_order.go +++ b/internal/logic/camel_oil/prefetch_order.go @@ -44,6 +44,13 @@ func (s *sCamelOil) GetPrefetchOrderCapacity(ctx context.Context, amount float64 // PrefetchOrderConcurrently 使用所有可用账号并发拉取订单,直到获取到可用订单为止 func (s *sCamelOil) PrefetchOrderConcurrently(ctx context.Context, amount float64) (result *model.PrefetchOrderResult, err error) { + // 获取设置 + settings, err := GetCamelOilSettings(ctx) + if err != nil { + glog.Errorf(ctx, "获取骆驼模块设置失败: %v", err) + return nil, err + } + // 1. 获取所有在线账号 m := dao.V1CamelOilAccount.Ctx(ctx).DB(config.GetDatabaseV1()) var onlineAccounts []*entity.V1CamelOilAccount @@ -59,8 +66,8 @@ func (s *sCamelOil) PrefetchOrderConcurrently(ctx context.Context, amount float6 return nil, gerror.New("暂无在线账号可用") } - // 2. 使用控制并发量的信信道控制并发 - concurrencyLimit := min(len(onlineAccounts), consts.CamelOilPrefetchMaxConcurrency) + // 2. 使用设置中的并发数量限制并发 + concurrencyLimit := min(len(onlineAccounts), settings.PrefetchConcurrencyAccounts) var ( resultChan = make(chan *model.PrefetchOrderResult, 1) @@ -88,7 +95,7 @@ func (s *sCamelOil) PrefetchOrderConcurrently(ctx context.Context, amount float6 mu.Unlock() // 拉取订单 - platformOrderId, payUrl, err2 := camel_oil_api.NewClient().CreateOrder(ctx, acc.Phone, acc.Token, amount) + platformOrderId, payUrl, err2 := camel_oil_api.NewClient().CreateCamelOilOrder(ctx, acc.Phone, acc.Token, amount) if err2 != nil { if err2.Error() == "auth_error" { _ = s.UpdateAccountStatus(ctx, acc.Id, consts.CamelOilAccountStatusInvalid, consts.CamelOilAccountChangeTypeInvalidate, "账号token失效") @@ -138,7 +145,7 @@ func (s *sCamelOil) PrefetchOrderConcurrently(ctx context.Context, amount float6 // PrefetchOrder 拉取单个订单(用于单个账号) func (s *sCamelOil) PrefetchOrder(ctx context.Context, account *entity.V1CamelOilAccount, amount float64) (prefetchId int64, err error) { // 1. 从骆驼平台拉取订单 - platformOrderId, payUrl, err := camel_oil_api.NewClient().CreateOrder(ctx, account.Phone, account.Token, amount) + platformOrderId, payUrl, err := camel_oil_api.NewClient().CreateCamelOilOrder(ctx, account.Phone, account.Token, amount) if err != nil { if err.Error() == "auth_error" { _ = s.UpdateAccountStatus(ctx, account.Id, consts.CamelOilAccountStatusInvalid, consts.CamelOilAccountChangeTypeInvalidate, "账号token失效") @@ -178,6 +185,13 @@ func (s *sCamelOil) PrefetchOrder(ctx context.Context, account *entity.V1CamelOi // ConcurrentPrefetchOrders 使用多个账号并发拉取订单 func (s *sCamelOil) ConcurrentPrefetchOrders(ctx context.Context, amount float64, targetCount int) (successCount int, err error) { + // 获取设置 + settings, err := GetCamelOilSettings(ctx) + if err != nil { + glog.Errorf(ctx, "获取骆驼模块设置失败: %v", err) + return 0, err + } + // 1. 获取所有在线账号 m := dao.V1CamelOilAccount.Ctx(ctx).DB(config.GetDatabaseV1()) var onlineAccounts []*entity.V1CamelOilAccount @@ -193,11 +207,8 @@ func (s *sCamelOil) ConcurrentPrefetchOrders(ctx context.Context, amount float64 return 0, gerror.New("暂无在线账号可用于拉取订单") } - // 2. 使用协程池并发拉取 - concurrencyLimit := consts.CamelOilPrefetchMaxConcurrency - if len(onlineAccounts) < concurrencyLimit { - concurrencyLimit = len(onlineAccounts) - } + // 2. 使用设置中的并发数量限制并发 + concurrencyLimit := settings.PrefetchConcurrencyAccounts var ( wg sync.WaitGroup @@ -274,42 +285,46 @@ func (s *sCamelOil) SupplementPrefetchOrders(ctx context.Context) (supplementedC gmlock.Lock(consts.CamelOilPrefetchTaskLockKey) defer gmlock.Unlock(consts.CamelOilPrefetchTaskLockKey) - //找到一个可用账户 - account := &entity.V1CamelOilAccount{} - _ = dao.V1CamelOilAccount.Ctx(ctx).DB(config.GetDatabaseV1()). - Where(dao.V1CamelOilAccount.Columns().Status, consts.CamelOilAccountStatusOnline). - OrderRandom(). - Scan(&account) - - goods, err := camel_oil_api.NewClient().QueryAvailableDenominations(ctx, account.Token) + // 获取设置 + settings, err := GetCamelOilSettings(ctx) if err != nil { - return 0, gerror.Wrap(err, "查询可用面额失败") + glog.Errorf(ctx, "获取骆驼模块设置失败: %v", err) + return 0, err } + + // 如果没有设置面额配置,直接返回 + if len(settings.TargetDenominations) == 0 { + glog.Infof(ctx, "未配置面额设置,无需补充预拉取订单") + return 0, nil + } + successCount := 0 - for _, good := range goods { + for _, denom := range settings.TargetDenominations { // 1. 获取当前库存 - capacity, err2 := s.GetPrefetchOrderCapacity(ctx, good.GoodPrice) + capacity, err2 := s.GetPrefetchOrderCapacity(ctx, float64(denom.Denomination)) if err2 != nil { return 0, gerror.Wrap(err2, "获取预拉取订单库存失败") } - glog.Infof(ctx, "当前预拉取订单库存: %d", capacity) + glog.Infof(ctx, "当前预拉取订单库存 (面额 %d): %d", denom.Denomination, capacity) // 2. 如果库存充足,无需补充 - if capacity >= consts.CamelOilPrefetchOrderMinCapacity { - glog.Infof(ctx, "预拉取订单库存充足 (%d >= %d),无需补充", capacity, consts.CamelOilPrefetchOrderMinCapacity) - return 0, nil + if capacity >= denom.MinCapacity { + glog.Infof(ctx, "面额 %d 预拉取订单库存充足 (%d >= %d),无需补充", denom.Denomination, capacity, denom.MinCapacity) + continue } // 3. 计算需要补充的数量 - needCount := consts.CamelOilPrefetchOrderTargetCapacity - capacity - glog.Infof(ctx, "预拉取订单库存不足,需要补充 %d 单,金额: 100元", needCount) + needCount := denom.TargetCapacity - capacity + glog.Infof(ctx, "面额 %d 预拉取订单库存不足,需要补充 %d 单", denom.Denomination, needCount) // 4. 并发拉取订单 - successCount, err = s.ConcurrentPrefetchOrders(ctx, good.GoodPrice, needCount) + success, err := s.ConcurrentPrefetchOrders(ctx, float64(denom.Denomination), needCount) if err != nil { - return 0, gerror.Wrap(err, "并发拉取订单失败") + glog.Errorf(ctx, "面额 %d 并发拉取订单失败: %v", denom.Denomination, err) + continue } + successCount += success } return successCount, nil } diff --git a/internal/logic/camel_oil/prefetch_order_logs.go b/internal/logic/camel_oil/prefetch_order_logs.go new file mode 100644 index 00000000..9049fd1c --- /dev/null +++ b/internal/logic/camel_oil/prefetch_order_logs.go @@ -0,0 +1,165 @@ +package camel_oil + +import ( + "context" + "encoding/json" + "fmt" + "time" + + v1 "kami/api/camel_oil/v1" + "kami/utility/cache" + + "github.com/gogf/gf/v2/errors/gerror" + "github.com/gogf/gf/v2/os/glog" + "github.com/gogf/gf/v2/os/gtime" +) + +// GetPrefetchOrderLogs 获取预拉取订单日志 +func (s *sCamelOil) GetPrefetchOrderLogs(ctx context.Context, req *v1.GetPrefetchOrderLogsReq) (res *v1.GetPrefetchOrderLogsRes, err error) { + // 计算时间范围跨度 + duration := req.EndTime.Time.Sub(req.StartTime.Time) + if duration <= 0 { + return nil, gerror.New("结束时间必须大于开始时间") + } + + // 限制查询时间范围不超过24小时 + if duration.Hours() > 24 { + return nil, gerror.New("查询时间范围不能超过24小时") + } + + glog.Infof(ctx, "获取预拉取订单日志,时间范围: %s - %s", req.StartTime.Format("Y-m-d H:i:s"), req.EndTime.Format("Y-m-d H:i:s")) + + // Redis key 前缀 + redisKeyPrefix := "camel_oil:prefetch:logs:" + + var logs []v1.PrefetchOrderLogItem + + // 遍历时间范围内的每一分钟 + currentTime := req.StartTime.Time + for currentTime.Before(req.EndTime.Time) || currentTime.Equal(req.EndTime.Time) { + timeKey := currentTime.Format("2006-01-02_15:04") + redisKey := redisKeyPrefix + timeKey + + // 从Redis获取日志数据 + logData, err := cache.NewCache().Get(ctx, redisKey) + if err != nil { + glog.Warningf(ctx, "获取Redis日志失败,key: %s, error: %v", redisKey, err) + currentTime = currentTime.Add(time.Minute) + continue + } + + if logData.IsEmpty() { + currentTime = currentTime.Add(time.Minute) + continue + } + + // 解析日志数据 + var minuteLogs []map[string]interface{} + if err := json.Unmarshal([]byte(logData.String()), &minuteLogs); err != nil { + glog.Warningf(ctx, "解析日志数据失败,key: %s, error: %v", redisKey, err) + currentTime = currentTime.Add(time.Minute) + continue + } + + // 处理每条日志 + for _, log := range minuteLogs { + // 提取面额 + amount, ok := log["amount"].(float64) + if !ok { + continue + } + + // 提取手机号 + phone, ok := log["phone"].(string) + if !ok { + continue + } + + // 提取时间戳 + timestamp, ok := log["timestamp"].(string) + if !ok { + continue + } + + // 提取响应数据 + respStr, ok := log["resp_str"].(string) + if !ok { + respStr = "" + } + + // 添加到结果列表,直接使用结构化字段 + logs = append(logs, v1.PrefetchOrderLogItem{ + Timestamp: timestamp, + Phone: phone, + Amount: amount, + ResponseData: respStr, + }) + } + + currentTime = currentTime.Add(time.Minute) + } + + glog.Infof(ctx, "获取到预拉取订单日志 %d 条", len(logs)) + + // 返回结果 + res = &v1.GetPrefetchOrderLogsRes{ + Logs: logs, + } + + return res, nil +} + +// SavePrefetchOrderLog 保存预拉取订单请求日志到Redis +func (s *sCamelOil) SavePrefetchOrderLog(ctx context.Context, phone string, amount float64, respStr string) { + // 构建日志数据 + logEntry := map[string]interface{}{ + "timestamp": gtime.Now().Format("Y-m-d H:i:s"), + "phone": phone, // 实际使用时应该脱敏处理 + "amount": amount, + "resp_str": respStr, // 保存响应数据 + } + + // 将日志数据序列化为JSON + logData, jsonErr := json.Marshal(logEntry) + if jsonErr != nil { + glog.Errorf(ctx, "序列化预拉取订单日志失败: %v", jsonErr) + return + } + + // 生成Redis key (按分钟级别) + now := gtime.Now() + timeKey := now.Format("2006-01-02_15:04") + redisKey := fmt.Sprintf("camel_oil:prefetch:logs:%s", timeKey) + + // 获取当前分钟已有的日志 + var logs []map[string]interface{} + existingData, cacheErr := cache.NewCache().Get(ctx, redisKey) + if cacheErr == nil && !existingData.IsEmpty() { + if err := json.Unmarshal([]byte(existingData.String()), &logs); err != nil { + // 如果解析失败,创建新的日志数组 + logs = []map[string]interface{}{} + } + } + + // 添加新的日志 + var newLog map[string]interface{} + if err := json.Unmarshal(logData, &newLog); err == nil { + logs = append(logs, newLog) + } + + // 重新序列化并保存到Redis,设置1小时过期时间 + updatedLogData, marshalErr := json.Marshal(logs) + if marshalErr != nil { + glog.Errorf(ctx, "重新序列化日志失败: %v", marshalErr) + return + } + + // 使用cache包保存到Redis + if cacheErr := cache.NewCache().Set(ctx, redisKey, string(updatedLogData), time.Hour); cacheErr != nil { + glog.Errorf(ctx, "保存预拉取订单日志到Redis失败: %v", cacheErr) + return + } + + // 记录到应用日志 + glog.Infof(ctx, "保存预拉取订单日志 - 手机号: %s, 金额: %.2f", phone, amount) +} \ No newline at end of file diff --git a/internal/logic/camel_oil/settings.go b/internal/logic/camel_oil/settings.go new file mode 100644 index 00000000..a63d44e4 --- /dev/null +++ b/internal/logic/camel_oil/settings.go @@ -0,0 +1,97 @@ +package camel_oil + +import ( + "context" + "encoding/json" + + "github.com/gogf/gf/v2/frame/g" + + "kami/api/camel_oil/v1" + "kami/utility/cache" +) + +// GetSettings 获取骆驼模块设置 +func (s *sCamelOil) GetSettings(ctx context.Context, req *v1.GetSettingsReq) (res *v1.GetSettingsRes, err error) { + // 从Redis获取设置 + settingsKey := cache.CamelOilSettings.Key("default") + c := cache.NewCache() + + settingsData, err := c.Get(ctx, settingsKey) + if err != nil { + return nil, err + } + + settings := &v1.CamelOilSettings{ + UseHaozhuPlatform: false, // 默认不使用豪猪平台拉取手机号 + LoginAccountCount: 0, + PrefetchConcurrencyAccounts: 0, + SingleAccountConcurrency: 1, + TargetDenominations: []v1.DenominationSetting{}, + } + + if settingsData != nil && !settingsData.IsNil() { + // 如果有缓存数据,解析它 + err = json.Unmarshal([]byte(settingsData.String()), settings) + if err != nil { + g.Log().Error(ctx, "解析骆驼模块设置失败", err) + // 解析失败则使用默认值 + } + } + + res = &v1.GetSettingsRes{ + CamelOilSettings: *settings, + } + return res, nil +} + +// UpdateSettings 更新骆驼模块设置 +func (s *sCamelOil) UpdateSettings(ctx context.Context, req *v1.UpdateSettingsReq) (res *v1.UpdateSettingsRes, err error) { + // 将设置序列化为JSON + settingsJSON, err := json.Marshal(req.CamelOilSettings) + if err != nil { + return nil, err + } + + // 保存到Redis,永不过期(设置0表示永不过期) + settingsKey := cache.CamelOilSettings.Key("default") + c := cache.NewCache() + err = c.Set(ctx, settingsKey, string(settingsJSON), 0) + if err != nil { + return nil, err + } + + g.Log().Info(ctx, "骆驼模块设置已更新", string(settingsJSON)) + + res = &v1.UpdateSettingsRes{} + return res, nil +} + +// GetCamelOilSettings 获取骆驼模块设置的辅助函数 +func GetCamelOilSettings(ctx context.Context) (*v1.CamelOilSettings, error) { + settingsKey := cache.CamelOilSettings.Key("default") + c := cache.NewCache() + + settingsData, err := c.Get(ctx, settingsKey) + if err != nil { + return nil, err + } + + // 默认设置 + settings := &v1.CamelOilSettings{ + UseHaozhuPlatform: false, // 默认不使用豪猪平台拉取手机号 + LoginAccountCount: 10, // 默认10个账号 + PrefetchConcurrencyAccounts: 10, // 默认10个并发 + SingleAccountConcurrency: 3, // 默认3个并发 + TargetDenominations: []v1.DenominationSetting{}, + } + + if settingsData != nil && !settingsData.IsNil() { + err = json.Unmarshal([]byte(settingsData.String()), settings) + if err != nil { + g.Log().Error(ctx, "解析骆驼模块设置失败", err) + // 解析失败则使用默认值 + } + } + + return settings, nil +} \ No newline at end of file diff --git a/internal/service/camel_oil.go b/internal/service/camel_oil.go index 47967056..6304e7d9 100644 --- a/internal/service/camel_oil.go +++ b/internal/service/camel_oil.go @@ -105,6 +105,14 @@ type ( SupplementPrefetchOrders(ctx context.Context) (supplementedCount int, err error) // MatchPrefetchOrder 将预拉取订单与用户订单进行匹配 MatchPrefetchOrder(ctx context.Context, orderId string, amount float64) (result *model.PrefetchOrderResult, err error) + // GetPrefetchOrderLogs 获取预拉取订单日志 + GetPrefetchOrderLogs(ctx context.Context, req *v1.GetPrefetchOrderLogsReq) (res *v1.GetPrefetchOrderLogsRes, err error) + // SavePrefetchOrderLog 保存预拉取订单请求日志到Redis + SavePrefetchOrderLog(ctx context.Context, phone string, amount float64, respStr string) + // GetSettings 获取骆驼模块设置 + GetSettings(ctx context.Context, req *v1.GetSettingsReq) (res *v1.GetSettingsRes, err error) + // UpdateSettings 更新骆驼模块设置 + UpdateSettings(ctx context.Context, req *v1.UpdateSettingsReq) (res *v1.UpdateSettingsRes, err error) // CreateToken 创建 Token CreateToken(ctx context.Context, tokenName string, tokenValue string, phone string, remark string, rechargeLimitAmount float64, rechargeLimitCount int) (tokenId int64, err error) // GetTokenInfo 获取 Token 信息 diff --git a/utility/cache/consts.go b/utility/cache/consts.go index 105aaa90..4696bc4b 100644 --- a/utility/cache/consts.go +++ b/utility/cache/consts.go @@ -18,6 +18,7 @@ const ( RedeemAccountTargetIDByUser CachedEnum = "redeem_account_target_id_by_user" RedeemAccountTargetIDByCKAndUser CachedEnum = "redeem_account_target_account_id_by_ck_and_user" RedeemAccountTmpStopped CachedEnum = "redeem_account_tmp_stopped" + CamelOilSettings CachedEnum = "camel_oil_settings" ) func (e CachedEnum) Key(key interface{}) string { diff --git a/utility/cron/cron.go b/utility/cron/cron.go index bdd28add..1232a03c 100644 --- a/utility/cron/cron.go +++ b/utility/cron/cron.go @@ -97,9 +97,9 @@ func registerCamelOilTasks(ctx context.Context) { _ = service.CamelOil().CronAccountDailyResetTask(ctx) }, "CamelOilAccountDailyReset") - _, _ = gcron.AddSingleton(ctx, "@every 60m", func(ctx context.Context) { - _, _ = service.CamelOil().CronCleanExpiredPrefetchOrders(ctx) - }, "CamelOilCleanExpiredPrefetchOrders") + //_, _ = gcron.AddSingleton(ctx, "@every 60m", func(ctx context.Context) { + // _, _ = service.CamelOil().CronCleanExpiredPrefetchOrders(ctx) + //}, "CamelOilCleanExpiredPrefetchOrders") _, _ = gcron.AddSingleton(ctx, "@every 1s", func(ctx context.Context) { _ = service.CamelOil().CronPrefetchOrderSupplementTask(ctx) diff --git a/utility/integration/camel_oil_api/api.go b/utility/integration/camel_oil_api/api.go index 6d9440b4..bc58cee9 100644 --- a/utility/integration/camel_oil_api/api.go +++ b/utility/integration/camel_oil_api/api.go @@ -6,6 +6,8 @@ import ( "errors" "github.com/gogf/gf/v2/net/gclient" "github.com/gogf/gf/v2/os/glog" + "kami/api/camel_oil/v1" + "kami/internal/service" "math" "math/rand" "strings" @@ -165,8 +167,8 @@ func (c *Client) getAuth(ctx context.Context, auth string) string { return authRes } -// QueryAvailableDenominations 查询所有可用面额 -func (c *Client) QueryAvailableDenominations(ctx context.Context, token string) ([]Good, error) { +// QueryCamelOilCardAvailableDenominations 查询所有可用面额 +func (c *Client) QueryCamelOilCardAvailableDenominations(ctx context.Context, token string) ([]Good, error) { c.Client.SetHeader("authorization", "Bearer "+c.getAuth(ctx, token)) resp, err := c.Client.ContentJson().Post(ctx, "https://recharge3.bac365.com/camel_wechat_mini_oil_server/eCardMall/wechatCardGoods", struct { Channel string `json:"channel"` @@ -180,15 +182,17 @@ func (c *Client) QueryAvailableDenominations(ctx context.Context, token string) Code string `json:"code"` Goods []Good `json:"goods"` }{} - if err = json.Unmarshal(resp.ReadAll(), &queryRespStruct); err != nil { + respStr := resp.ReadAllString() + glog.Info(ctx, "查询面额", respStr) + if err = json.Unmarshal([]byte(respStr), &queryRespStruct); err != nil { return nil, err } return queryRespStruct.Goods, nil } -func (c *Client) CreateOrder(ctx context.Context, phone, token string, amount float64) (orderId string, payUrl string, err error) { +func (c *Client) CreateCamelOilOrder(ctx context.Context, phone, token string, amount float64) (orderId string, payUrl string, err error) { c.Client.SetHeader("Authorization", "Bearer "+c.getAuth(ctx, token)) - goods, err := c.QueryAvailableDenominations(ctx, token) + goods, err := c.QueryCamelOilCardAvailableDenominations(ctx, token) if err != nil { return "", "", err } @@ -203,9 +207,18 @@ func (c *Client) CreateOrder(ctx context.Context, phone, token string, amount fl return "", "", errors.New("当前金额不支持") } - // 协程池参数 - const maxConcurrency = 5 - const maxRetries = 50 + const maxRetries = 10 + + // 获取骆驼模块设置 + settingsRes, err := service.CamelOil().GetSettings(ctx, &v1.GetSettingsReq{}) + var maxConcurrency int + if err != nil { + glog.Error(ctx, "获取骆驼模块设置失败,使用默认并发数", err) + // 使用默认值继续执行 + maxConcurrency = 5 + } else { + maxConcurrency = settingsRes.PrefetchConcurrencyAccounts + } // 结果存储 var resultMutex sync.Mutex @@ -220,7 +233,7 @@ func (c *Client) CreateOrder(ctx context.Context, phone, token string, amount fl var wg sync.WaitGroup semaphore := make(chan struct{}, maxConcurrency) - for i := 0; i < maxRetries; i++ { + for range maxRetries { // 检查是否已经有结果 if completed.Load() { break @@ -306,6 +319,10 @@ func (c *Client) CreateOrder(ctx context.Context, phone, token string, amount fl return } respStr := resp.ReadAllString() + + // 记录响应数据到日志 + service.CamelOil().SavePrefetchOrderLog(ctx, phone, amount, respStr) + respStruct := struct { Code string `json:"code"` Message string `json:"message"` diff --git a/utility/integration/camel_oil_api/api_test.go b/utility/integration/camel_oil_api/api_test.go index bbc96a33..5db4f365 100644 --- a/utility/integration/camel_oil_api/api_test.go +++ b/utility/integration/camel_oil_api/api_test.go @@ -48,7 +48,7 @@ func TestClient_CreateOrder(t *testing.T) { for t2 := range ticker.C { glog.Info(t.Context(), t2) client := NewClient() - orderId, payUrl, err := client.CreateOrder(t.Context(), "13966750117", "buOSl900L1o6htbHZ6ou32NGtyEsuLu3TeJJlqEZNAvfPzlRk/OqkYm7rMh0X+otku80Jz+sjIlfnf8JXUIjH4NkTRgX92w2knTEjqIc92MSnEi9qyV0lTKue/ycVD1INIGJGBn3vJopJrcb8eupKUjVhFXvONAW2RQ7atAeANc=", 1000) + orderId, payUrl, err := client.CreateCamelOilOrder(t.Context(), "13966750117", "buOSl900L1o6htbHZ6ou32NGtyEsuLu3TeJJlqEZNAvfPzlRk/OqkYm7rMh0X+otku80Jz+sjIlfnf8JXUIjH4NkTRgX92w2knTEjqIc92MSnEi9qyV0lTKue/ycVD1INIGJGBn3vJopJrcb8eupKUjVhFXvONAW2RQ7atAeANc=", 100) if err == nil { glog.Info(t.Context(), orderId, payUrl, err) break