refactor(gateway): 使用线程池替换裸协程

- 在 notify、pay_solve、payfor_query 和 settle_service 模块中引入 gopool
- 创建多个线程池实例,用于不同场景的异步任务处理
- 用线程池执行异步任务,以替代直接使用 go 关键字启动协程
- 调整部分代码结构,以适应线程池的使用
This commit is contained in:
danial
2024-12-08 21:32:08 +08:00
parent a790730fc8
commit 2770fb80c5
6 changed files with 61 additions and 28 deletions

1
go.mod
View File

@@ -19,6 +19,7 @@ require (
require (
filippo.io/edwards25519 v1.1.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bytedance/gopkg v0.1.2-0.20240828084325-780ca9ee70fb // indirect
github.com/bytedance/sonic/loader v0.1.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cloudwego/base64x v0.1.4 // indirect

2
go.sum
View File

@@ -4,6 +4,8 @@ github.com/beego/beego/v2 v2.3.2-0.20241006064559-d5830a0fc2ee h1:j+mbzD7idTH1kt
github.com/beego/beego/v2 v2.3.2-0.20241006064559-d5830a0fc2ee/go.mod h1:5cqHsOHJIxkq44tBpRvtDe59GuVRVv/9/tyVDxd5ce4=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bytedance/gopkg v0.1.2-0.20240828084325-780ca9ee70fb h1:glte+Ka6C5efXn/QlEAE/wwNrvE+3mYo/ce69fpvtrE=
github.com/bytedance/gopkg v0.1.2-0.20240828084325-780ca9ee70fb/go.mod h1:576VvJ+eJgyCzdjS+c4+77QF3p7ubbtiKARP3TxducM=
github.com/bytedance/sonic v1.11.7 h1:k/l9p1hZpNIMJSk37wL9ltkcpqLfIho1vYthi4xT2t4=
github.com/bytedance/sonic v1.11.7/go.mod h1:LysEHSvpvDySVdC2f87zGWf6CIKJcAvqab1ZaiQtds4=
github.com/bytedance/sonic/loader v0.1.1 h1:c+e5Pt1k/cy5wMveRDyk2X4B9hF4g7an8N3zCYjJFNM=

View File

@@ -6,6 +6,7 @@ import (
"gateway/internal/entities/message"
"gateway/internal/models/notify"
"gateway/internal/utils"
"github.com/bytedance/gopkg/util/gopool"
"os"
"strings"
"time"
@@ -29,6 +30,11 @@ const (
LimitTimes = 5 // 最多通知5次
)
var (
notifyPool = gopool.NewPool("notify", 20, gopool.NewConfig())
sendNotifyPool = gopool.NewPool("sendNotify", 20, gopool.NewConfig())
)
// SendOrderNotify 给商户发送订单结果
func SendOrderNotify(bankOrderId string) {
if !notify.NotifyInfoExistByBankOrderId(bankOrderId) {
@@ -73,7 +79,9 @@ func SendOrderNotify(bankOrderId string) {
FirstNotifyTime: notifyInfo.CreateTime.Format("2006-12-20 12:43:34"),
NotifyTimes: notifyInfo.Times, LimitTimes: LimitTimes, Status: notifyInfo.Status,
}
go OrderNotifyTimer(task)
notifyPool.Go(func() {
OrderNotifyTimer(task)
})
if !notify.UpdateNotifyInfo(notifyInfo) {
logs.Error("订单回调失败,数据库更新失败:" + bankOrderId)
}
@@ -139,7 +147,9 @@ func CreateOrderDelayQueue() {
LimitTimes: LimitTimes,
Status: nf.Status,
}
go OrderNotifyTimer(task)
notifyPool.Go(func() {
OrderNotifyTimer(task)
})
}
}
@@ -164,7 +174,9 @@ func CreateOrderNotifyConsumer() {
case v := <-orderNotify.C:
if v != nil {
bankOrderId := string(v.Body)
go SendOrderNotify(bankOrderId)
sendNotifyPool.Go(func() {
SendOrderNotify(bankOrderId)
})
// 应答,重要
err := conn.Ack(v)
if err != nil {

View File

@@ -9,6 +9,7 @@ import (
"gateway/internal/models/road"
"gateway/internal/service/pay_for"
"gateway/internal/utils"
"github.com/bytedance/gopkg/util/gopool"
"os"
"time"
@@ -31,6 +32,11 @@ const (
PayForQueryInterval = 5 // 时间间隔为5分钟
)
var (
PayForQueryPool = gopool.NewPool("PayForQueryPool", 20, gopool.NewConfig())
payForQueryConsumerPool = gopool.NewPool("PayForQueryConsumer", 20, gopool.NewConfig())
)
func PayForQueryTimer(task PayForQueryTask) {
for {
select {
@@ -67,7 +73,9 @@ func PayForSupplier(task PayForQueryTask) {
if task.QueryTimes <= task.LimitTimes {
task.QueryTimes += 1
task.Delay = time.NewTimer(time.Duration(PayForQueryInterval) * time.Minute)
go PayForQueryTimer(task)
PayForQueryPool.Go(func() {
PayForQueryTimer(task)
})
} else {
logs.Info(fmt.Sprintf("该代付订单已经超过最大查询次数bankOrderId = %s", task.BankOrderId))
}
@@ -97,8 +105,9 @@ func payForQueryConsumer(bankOrderId string) {
LimitTimes: PayForLimitTimes,
Status: payFor.Status,
}
go PayForQueryTimer(payForQueryTask)
PayForQueryPool.Go(func() {
PayForQueryTimer(payForQueryTask)
})
}
// CreatePayForQueryConsumer 创建代付查询的消费者
@@ -109,9 +118,6 @@ func CreatePayForQueryConsumer() {
logs.Error("启动消息队列消费者失败....")
os.Exit(1)
}
logs.Notice("代付查询消费启动成功......")
payForQuery, err := conn.Subscribe(config.MQ_PAYFOR_QUERY, stomp.AckClient)
if err != nil {
logs.Error("订阅代付查询失败......")
@@ -123,7 +129,9 @@ func CreatePayForQueryConsumer() {
case v := <-payForQuery.C:
if v != nil {
bankOrderId := string(v.Body)
go payForQueryConsumer(bankOrderId)
payForQueryConsumerPool.Go(func() {
payForQueryConsumer(bankOrderId)
})
// 应答,重要
err := conn.Ack(v)
if err != nil {

View File

@@ -12,6 +12,7 @@ import (
"gateway/internal/models/order"
"gateway/internal/models/road"
"gateway/internal/utils"
"github.com/bytedance/gopkg/util/gopool"
"net/url"
"strconv"
"time"
@@ -20,6 +21,11 @@ import (
"github.com/beego/beego/v2/core/logs"
)
var (
orderNotify = gopool.NewPool("orderNotify", 20, gopool.NewConfig())
sendMessageNotify = gopool.NewPool("sendMessageNotify", 20, gopool.NewConfig())
)
// SolvePaySuccess 处理支付成功的加款等各项操作
func SolvePaySuccess(bankOrderId string, factAmount float64, trxNo string) bool {
o := orm.NewOrm()
@@ -130,7 +136,9 @@ func SolvePaySuccess(bankOrderId string, factAmount float64, trxNo string) bool
return err
}
// 给下游发送回调通知
go CreateOrderNotifyInfo(orderInfo, config.SUCCESS)
orderNotify.Go(func() {
CreateOrderNotifyInfo(orderInfo, config.SUCCESS)
})
return nil
})
if err != nil {
@@ -162,7 +170,9 @@ func SolvePayFail(bankOrderId, transId string) bool {
logs.Error("更改订单状态失败:", err)
return err
}
go CreateOrderNotifyInfo(orderTmp, config.FAIL)
orderNotify.Go(func() {
CreateOrderNotifyInfo(orderTmp, config.FAIL)
})
return nil
})
if err != nil {
@@ -490,5 +500,7 @@ func CreateOrderNotifyInfo(orderInfo order.OrderInfo, tradeStatus string) {
logs.Error(fmt.Sprintf("订单bankOrderId=%s插入回调数据库失败", orderInfo.BankOrderId))
}
// 将订单发送到消息队列,给下面的商户进行回调
go message.SendMessage(config.MqOrderNotify, orderInfo.BankOrderId)
sendMessageNotify.Go(func() {
message.SendMessage(config.MqOrderNotify, orderInfo.BankOrderId)
})
}

View File

@@ -212,20 +212,18 @@ func MerchantAbleAmount(merchantLoad merchant.MerchantLoadInfo) bool {
func OrderSettleInit() {
// 每隔5分钟巡查有没有可以进行结算的订单
go func() {
settleTimer := time.NewTimer(time.Duration(Interval) * time.Minute)
oneMinuteTimer := time.NewTimer(time.Duration(Minutes) * time.Minute)
for {
select {
case <-settleTimer.C:
settleTimer = time.NewTimer(time.Duration(Interval) * time.Minute)
logs.Info("开始对商户进行支付订单结算>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
OrderSettle()
case <-oneMinuteTimer.C:
oneMinuteTimer = time.NewTimer(time.Duration(Minutes) * time.Minute)
logs.Info("开始执行商户的解款操作>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
MerchantLoadSolve()
}
settleTimer := time.NewTimer(time.Duration(Interval) * time.Minute)
oneMinuteTimer := time.NewTimer(time.Duration(Minutes) * time.Minute)
for {
select {
case <-settleTimer.C:
settleTimer = time.NewTimer(time.Duration(Interval) * time.Minute)
logs.Info("开始对商户进行支付订单结算>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
OrderSettle()
case <-oneMinuteTimer.C:
oneMinuteTimer = time.NewTimer(time.Duration(Minutes) * time.Minute)
logs.Info("开始执行商户的解款操作>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
MerchantLoadSolve()
}
}()
}
}