Files
kami_gateway/internal/service/notify/order_notify.go
danial e88ff05a14 refactor(trace): 重命名 otel 包为 otelTrace并更新相关引用
- 将内部使用的 otel 包重命名为 otelTrace
- 更新了所有引用该包的文件中的导入路径
- 修改了部分函数和变量名称以适应新的包名
2025-02-23 21:56:29 +08:00

196 lines
5.8 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package notify
import (
"context"
"fmt"
"gateway/internal/config"
"gateway/internal/consts"
"gateway/internal/models/notify"
"gateway/internal/otelTrace"
"gateway/internal/service/message"
"gateway/internal/utils"
"os"
"strings"
"time"
"github.com/bytedance/gopkg/util/gopool"
"go.uber.org/zap"
"github.com/beego/beego/v2/client/httplib"
"github.com/beego/beego/v2/core/logs"
"github.com/go-stomp/stomp/v3"
)
type OrderNotifyTask struct {
Delay *time.Timer
MerchantOrderId string
BankOrderId string
FirstNotifyTime string
NotifyTimes int
LimitTimes int
Status string // success-通知成功,其余的为待通知或者通知未完成
}
var (
notifyPool = gopool.NewPool("notify", 20, gopool.NewConfig())
sendNotifyPool = gopool.NewPool("sendNotify", 20, gopool.NewConfig())
)
// SendOrderNotify 给商户发送订单结果
func SendOrderNotify(ctx context.Context, bankOrderId string) {
if !notify.NotifyInfoExistByBankOrderId(bankOrderId) {
otelTrace.Logger.WithContext(ctx).Error("该订单不存在回调内容bankOrderId= " + bankOrderId)
return
}
notifyInfo := notify.GetNotifyInfoByBankOrderId(ctx, bankOrderId)
if notifyInfo.Status == "success" {
otelTrace.Logger.WithContext(ctx).Info(fmt.Sprintf("该订单= %s,已经回调", bankOrderId))
return
}
notifyInfo.Times += 1
notifyInfo.UpdateTime = time.Now()
req := httplib.Get(notifyInfo.Url)
response, err := req.String()
otelTrace.Logger.WithContext(ctx).Info(fmt.Sprintf("bankOrderId = %s, 回调结果为:%s", bankOrderId, response))
if err == nil && ("success" == response || "SUCCESS" == response) {
if strings.Contains(strings.ToLower(response), "success") {
notifyInfo.Status = "success"
if notify.UpdateNotifyInfo(ctx, notifyInfo) {
otelTrace.Logger.WithContext(ctx).Info("订单回调成功, bankOrderId=", zap.String("bankOrderId", bankOrderId))
} else {
otelTrace.Logger.WithContext(ctx).Error("订单回调成功,但是更新数据库失败, bankOrderId=", zap.String("bankOrderId", bankOrderId))
}
} else {
logs.Notice("订单已经回调,商户已经收到了回调通知,但是返回值错误: ", response)
}
} else {
if notifyInfo.Times > consts.LimitTimes {
logs.Notice(fmt.Sprintf("该订单= %s已经超过了回调次数", bankOrderId))
} else {
minute := GetOrderNotifyMinute(notifyInfo.Times)
otelTrace.Logger.WithContext(ctx).Info(fmt.Sprintf("bankOrderId = %s, 进行第 %d 次回调,本次延时时间为:%d", notifyInfo.BankOrderId, notifyInfo.Times, minute))
task := OrderNotifyTask{
Delay: time.NewTimer(time.Duration(minute) * time.Minute),
MerchantOrderId: notifyInfo.MerchantOrderId,
BankOrderId: notifyInfo.BankOrderId,
FirstNotifyTime: notifyInfo.CreateTime.Format("2006-12-20 12:43:34"),
NotifyTimes: notifyInfo.Times, LimitTimes: consts.LimitTimes, Status: notifyInfo.Status,
}
notifyPool.Go(func() {
// 创建一个5分钟超时的上下文
ctx2, span := otelTrace.NewSchedulerTrace().Start(otelTrace.InitCtx, "sendOrderNotify")
defer span.End()
OrderNotifyTimer(ctx2, task)
})
if !notify.UpdateNotifyInfo(ctx, notifyInfo) {
otelTrace.Logger.WithContext(ctx).Error("订单回调失败,数据库更新失败:" + bankOrderId)
}
}
}
}
func GetOrderNotifyMinute(times int) int {
cur := 0
switch times {
case 0:
cur = 0
break
case 1:
cur = 1
break
case 2:
cur = 2
break
case 3:
cur = 5
break
case 4:
cur = 15
break
case 5:
cur = 30
break
default:
cur = 45
break
}
return cur
}
func OrderNotifyTimer(ctx context.Context, task OrderNotifyTask) {
for {
select {
case <-task.Delay.C:
SendOrderNotify(ctx, task.BankOrderId)
return
case <-time.After(time.Minute * 70):
logs.Notice("订单回调延时执行70分钟没有执行")
return
}
}
}
// CreateOrderDelayQueue 读取一小时之内,未发送成功,并且还没有到达回调限制次数的记录读取,存入延迟队列
func CreateOrderDelayQueue(ctx context.Context) {
params := make(map[string]interface{})
params["times__lte"] = consts.LimitTimes
params["create_time__gte"] = utils.GetDateTimeBeforeHours(48)
notifyList := notify.GetNotifyInfosNotSuccess(ctx, params)
for _, nf := range notifyList {
minute := GetOrderNotifyMinute(nf.Times)
task := OrderNotifyTask{
Delay: time.NewTimer(time.Duration(minute) * time.Minute),
MerchantOrderId: nf.MerchantOrderId,
BankOrderId: nf.BankOrderId,
FirstNotifyTime: nf.CreateTime.Format("2006-01-02 15:04:05"),
NotifyTimes: nf.Times,
LimitTimes: consts.LimitTimes,
Status: nf.Status,
}
notifyPool.Go(func() {
OrderNotifyTimer(ctx, task)
})
}
}
// CreateOrderNotifyConsumer 创建订单回调消费者
func CreateOrderNotifyConsumer(ctx context.Context) {
ctx, cancel := otelTrace.Span(ctx, "CreateOrderNotifyConsumer", "CreateOrderNotifyConsumer")
defer cancel()
CreateOrderDelayQueue(ctx)
// 启动定时任务
conn := message.GetActiveMQConn()
if conn == nil {
otelTrace.Logger.WithContext(ctx).Error("启动消息队列消费者失败....")
os.Exit(1)
}
logs.Notice("订单回调消息队列启动成功......")
orderNotify, err := conn.Subscribe(config.MqOrderNotify, stomp.AckClient)
if err != nil {
otelTrace.Logger.WithContext(ctx).Error("订阅订单回调失败......")
os.Exit(1)
}
for {
select {
case v := <-orderNotify.C:
if v != nil {
bankOrderId := string(v.Body)
sendNotifyPool.CtxGo(ctx, func() {
ctx2, span := otelTrace.NewSchedulerTrace().Start(otelTrace.InitCtx, "Span")
defer span.End()
SendOrderNotify(ctx2, bankOrderId)
})
// 应答,重要
err := conn.Ack(v)
if err != nil {
otelTrace.Logger.WithContext(ctx).Error("消息应答失败!")
}
}
}
}
}