feat(supplier): 实现666通道队列处理器

- 新增666通道的队列处理器实现,包括登录、提交卡密和查询卡密功能
- 添加基础通道处理器和登录结果、卡密提交结果、卡密查询结果等数据结构
- 实现通道会话管理,支持会话存储和获取
- 添加相关单元测试
This commit is contained in:
danial
2025-06-14 19:08:35 +08:00
parent 9bd09638c6
commit abc54c259e
18 changed files with 3085 additions and 965 deletions

2
go.mod
View File

@@ -62,6 +62,7 @@ require (
github.com/mattn/go-sqlite3 v2.0.3+incompatible // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/panjf2000/ants/v2 v2.11.3 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.64.0 // indirect
@@ -76,6 +77,7 @@ require (
golang.org/x/arch v0.17.0 // indirect
golang.org/x/crypto v0.38.0 // indirect
golang.org/x/net v0.40.0 // indirect
golang.org/x/sync v0.14.0 // indirect
golang.org/x/sys v0.33.0 // indirect
golang.org/x/text v0.25.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250603155806-513f23925822 // indirect

4
go.sum
View File

@@ -95,6 +95,8 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/natefinch/lumberjack v2.0.0+incompatible h1:4QJd3OLAMgj7ph+yZTuX13Ld4UpgHp07nNdFX7mqFfM=
github.com/natefinch/lumberjack v2.0.0+incompatible/go.mod h1:Wi9p2TTF5DG5oU+6YfsmYQpsTIOm0B1VNzQg9Mw6nPk=
github.com/panjf2000/ants/v2 v2.11.3 h1:AfI0ngBoXJmYOpDh9m516vjqoUu2sLrIVgppI9TZVpg=
github.com/panjf2000/ants/v2 v2.11.3/go.mod h1:8u92CYMUc6gyvTIw8Ru7Mt7+/ESnJahz5EVtqfrilek=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
@@ -214,6 +216,8 @@ golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.14.0 h1:woo0S4Yywslg6hp4eUFjTVOyKt0RookbpAHG4c1HmhQ=
golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=

View File

@@ -11,6 +11,7 @@ import (
"time"
"gateway/internal/service/supplier"
"gateway/internal/service/supplier/third_party/queue"
"gateway/internal/models/merchant"
"gateway/internal/models/order"
@@ -105,7 +106,51 @@ func (c *BatchSixImpl) login(ctx context.Context, jsonStr string) (bool, string,
}
func (c *BatchSixImpl) SendCard(ctx context.Context, jsonStr string, cardInfo supplier.RedeemCardInfo, attach string) (bool, string) {
for i := 0; i < 3; i++ {
// 尝试使用新的队列系统
queueService := queue.GetQueueService()
if queueService != nil {
// 解析通道ID
id := gojson.Json(jsonStr).Get("id").Tostring()
if id == "" {
otelTrace.Logger.WithContext(ctx).Error("id为空")
return false, "id为空"
}
idInt, err := strconv.ParseInt(id, 10, 64)
if err != nil {
otelTrace.Logger.WithContext(ctx).Error("解析失败", zap.Error(err))
return false, "卡编码设置错误"
}
// 解析面值
faceType, err := strconv.ParseFloat(cardInfo.FaceType, 64)
if err != nil {
otelTrace.Logger.WithContext(ctx).Error("解析失败", zap.Error(err))
return false, "卡面值格式错误"
}
// 提交卡密任务
err = queueService.SubmitCardTask(
ctx,
queue.ChannelBatchSix,
idInt,
int64(faceType),
cardInfo.CardNo,
cardInfo.Data,
gojson.Json(jsonStr).Get("username").Tostring(),
gojson.Json(jsonStr).Get("password").Tostring(),
attach,
)
if err != nil {
otelTrace.Logger.WithContext(ctx).Error("【666】提交卡密任务失败", zap.Error(err))
return false, "提交卡密任务失败:" + err.Error()
}
return true, "等待兑换"
}
// 如果队列系统未初始化,使用原来的方式
for range 3 {
ok, data, loginCookie := c.login(ctx, jsonStr)
if !ok {
return false, data
@@ -124,11 +169,6 @@ func (c *BatchSixImpl) SendCard(ctx context.Context, jsonStr string, cardInfo su
}
})
faceType, err := strconv.ParseFloat(cardInfo.FaceType, 64)
if err != nil {
otelTrace.Logger.WithContext(ctx).Error("解析失败", zap.Error(err))
return false, "卡面值格式错误"
}
id := gojson.Json(jsonStr).Get("id").Tostring()
if id == "" {
otelTrace.Logger.WithContext(ctx).Error("id为空")
@@ -154,7 +194,7 @@ func (c *BatchSixImpl) SendCard(ctx context.Context, jsonStr string, cardInfo su
Cardlist: []string{},
Cardno: cardInfo.CardNo,
Cardpwd: cardInfo.Data,
Cardprice: strconv.Itoa(int(faceType)),
Cardprice: cardInfo.FaceType,
Type: 0,
Urgent: 0,
Feilv: "",

View File

@@ -0,0 +1,308 @@
// Package channel 提供666通道处理器实现
package channel
import (
"context"
"encoding/json"
"errors"
"io"
"net/http"
"strconv"
"time"
"github.com/beego/beego/v2/client/httplib"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.uber.org/zap"
)
// BatchSixChannelHandler 666通道处理器
type BatchSixChannelHandler struct {
*BaseChannelHandler
apiBaseURL string
}
// NewBatchSixChannelHandler 创建666通道处理器
func NewBatchSixChannelHandler() *BatchSixChannelHandler {
return &BatchSixChannelHandler{
BaseChannelHandler: NewBaseChannelHandler(ChannelTypeBatchSix),
apiBaseURL: "https://duika.666dkw.com",
}
}
// Login 登录666通道
func (h *BatchSixChannelHandler) Login(ctx context.Context, username, password string, params map[string]string) (*LoginResult, error) {
req := httplib.NewBeegoRequestWithCtx(ctx, h.apiBaseURL+"/getLogin", "POST").
SetTransport(otelhttp.NewTransport(http.DefaultTransport)).
SetTimeout(time.Second*3, time.Second*30).Retries(30)
loginReq := struct {
Type int `json:"type"`
Username string `json:"username"`
Password string `json:"password"`
}{
Type: 0,
Username: username,
Password: password,
}
req, err := req.JSONBody(loginReq)
if err != nil {
h.logger.Error("【666】JSONBody失败", zap.Error(err))
return &LoginResult{Success: false, Message: "登录请求格式失败"}, err
}
response, err := req.Response()
if err != nil {
h.logger.Error("【666】请求失败", zap.Error(err))
return &LoginResult{Success: false, Message: "登录返回数据错误"}, err
}
var loginResp struct {
Code int `json:"code"`
Msg string `json:"msg"`
Data string `json:"data"`
}
body, err := io.ReadAll(response.Body)
if err != nil {
h.logger.Error("【666】读取响应体失败", zap.Error(err))
return &LoginResult{Success: false, Message: "登录返回数据错误"}, err
}
h.logger.Info("【666】登录响应", zap.Any("loginReq", loginReq), zap.String("body", string(body)))
err = json.Unmarshal(body, &loginResp)
if err != nil {
h.logger.Error("【666】解析响应体失败", zap.Error(err))
return &LoginResult{Success: false, Message: "登录返回数据错误"}, err
}
if loginResp.Code != 200 {
h.logger.Error("【666】登录失败", zap.String("msg", loginResp.Msg))
return &LoginResult{Success: false, Message: "登录失败:" + loginResp.Data}, nil
}
// 获取并转换 Cookie
cookieStrings := make([]string, 0)
for _, cookie := range response.Cookies() {
cookieStrings = append(cookieStrings, cookie.String())
}
return &LoginResult{
Success: true,
AuthType: AuthTypeCookie,
AccessToken: "",
Message: "登录成功",
Cookies: cookieStrings,
Data: map[string]any{"code": loginResp.Code},
}, nil
}
func (h *BatchSixChannelHandler) SubmitCard(ctx context.Context, channelCode int64, faceValue float64, cardNo, cardPwd string, authData AuthData, params map[string]string) (*CardSubmitResult, error) {
// 验证认证类型
if authData.AuthType != AuthTypeCookie || len(authData.Cookies) == 0 {
return &CardSubmitResult{Success: false, Message: "认证信息无效", NeedRetry: true}, errors.New("认证信息无效")
}
req := httplib.NewBeegoRequestWithCtx(ctx, h.apiBaseURL+"/apitocard", "POST").
SetTransport(otelhttp.NewTransport(http.DefaultTransport)).
SetTimeout(time.Second*3, time.Second*3).Retries(3)
// 设置Cookie
for _, cookieStr := range authData.Cookies {
cookie, err := http.ParseSetCookie(cookieStr)
if err != nil {
h.logger.Error("【666】解析Cookie失败", zap.Error(err))
return &CardSubmitResult{Success: false, Message: "解析Cookie失败", NeedRetry: true}, err
}
if cookie.Name == "PHPSESSID" {
req = req.SetCookie(cookie)
}
}
cardReq := struct {
Id int64 `json:"id"`
Cardlist []string `json:"cardlist"`
Cardno string `json:"cardno"`
Cardpwd string `json:"cardpwd"`
Cardprice string `json:"cardprice"`
Type int `json:"type"`
Urgent int `json:"urgent"`
Feilv string `json:"feilv"`
}{
Id: channelCode,
Cardlist: []string{},
Cardno: cardNo,
Cardpwd: cardPwd,
Cardprice: strconv.Itoa(int(faceValue)),
Type: 0,
Urgent: 0,
Feilv: "",
}
req, err := req.JSONBody(cardReq)
if err != nil {
h.logger.Error("【666】JSONBody失败", zap.Error(err))
return &CardSubmitResult{Success: false, Message: "兑卡请求格式失败", NeedRetry: false}, err
}
responseStr, err := req.String()
if err != nil {
h.logger.Error("【666】请求失败", zap.Error(err))
return &CardSubmitResult{Success: false, Message: "兑卡请求失败", NeedRetry: true}, err
}
h.logger.Info("【666】请求结果", zap.String("response", responseStr))
var resp struct {
Code json.Number `json:"code"`
Msg string `json:"msg"`
Data string `json:"data"`
OrderNo string `json:"orderNo,omitempty"`
}
err = json.Unmarshal([]byte(responseStr), &resp)
if err != nil {
h.logger.Error("【666】解析失败", zap.Error(err))
return &CardSubmitResult{Success: false, Message: "兑卡返回数据错误:" + responseStr, NeedRetry: false}, err
}
if resp.Code == "201" && resp.Data == "请先登录后再来提交" {
return &CardSubmitResult{Success: false, Message: resp.Data, NeedRetry: true}, errors.New(resp.Data)
}
if resp.Code != "200" {
return &CardSubmitResult{Success: false, Message: resp.Data, NeedRetry: false}, errors.New(resp.Data)
}
return &CardSubmitResult{
Success: true,
OrderID: resp.OrderNo,
Message: resp.Data,
Data: map[string]any{"code": resp.Code},
}, nil
}
// QueryCard 查询卡密
func (h *BatchSixChannelHandler) QueryCard(ctx context.Context, channelCode int64, faceValue float64, cardNo, cardPwd, orderID string, authData AuthData, params map[string]string) (*CardQueryResult, error) {
// 验证认证类型
if authData.AuthType != AuthTypeCookie || len(authData.Cookies) == 0 {
return &CardQueryResult{Success: false, Message: "认证信息无效"}, errors.New("认证信息无效")
}
req := httplib.NewBeegoRequestWithCtx(ctx, h.apiBaseURL+"/getSellCard", "GET").
SetTransport(otelhttp.NewTransport(http.DefaultTransport)).
SetTimeout(time.Second*3, time.Second*3).Retries(3)
req.Param("limit", "1")
req.Param("page", "1")
req.Param("day", "0")
req.Param("rekey", "3423423")
req.Param("setype", "card_no")
req.Param("cardype", strconv.Itoa(int(channelCode)))
req.Param("status", "0")
// 设置Cookie
for _, cookieStr := range authData.Cookies {
cookie, err := http.ParseSetCookie(cookieStr)
if err != nil {
h.logger.Error("【666】解析Cookie失败", zap.Error(err))
return &CardQueryResult{Success: false, Message: "解析Cookie失败"}, err
}
if cookie.Name == "PHPSESSID" {
req = req.SetCookie(cookie)
}
}
response, err := req.Response()
if err != nil {
h.logger.Error("【666】JSONBody失败", zap.Error(err))
return &CardQueryResult{Success: false, Message: "查询请求格式失败"}, err
}
if response.StatusCode == 302 {
return &CardQueryResult{Success: false, Message: "登录已过期"}, errors.New("登录已过期")
}
responseStr, err := io.ReadAll(response.Body)
if err != nil {
h.logger.Error("【666】读取响应体失败", zap.Error(err))
return &CardQueryResult{Success: false, Message: "查询返回数据错误"}, err
}
h.logger.Info("【666】查询结果", zap.String("response", string(responseStr)))
var resp struct {
Code json.Number `json:"code"`
Msg string `json:"msg"`
Data struct {
List struct {
Data []struct {
Id int `json:"id"`
Class string `json:"class"`
Orderno string `json:"orderno"`
Tmporder string `json:"tmporder"`
Money int `json:"money"`
Xitmoney string `json:"xitmoney"`
SettleAmt string `json:"settle_amt"`
State string `json:"state"`
CardNo string `json:"card_no"`
CardKey string `json:"card_key"`
Remarks string `json:"remarks"`
CreateTime string `json:"create_time"`
Chulitime string `json:"chulitime"`
Title string `json:"title"`
PhoneRecycleIcon string `json:"phoneRecycleIcon"`
Tid int `json:"tid"`
Iscode int `json:"iscode"`
Isyzm int `json:"isyzm"`
Cid int `json:"cid"`
Card string `json:"card"`
Order string `json:"order"`
} `json:"data,omitempty"`
} `json:"list,omitempty"`
} `json:"data,omitempty"`
}
err = json.Unmarshal([]byte(responseStr), &resp)
if err != nil {
h.logger.Error("【666】解析失败", zap.Error(err))
return &CardQueryResult{Success: false, Message: "查询返回数据错误:" + string(responseStr)}, err
}
if resp.Code != "200" {
return &CardQueryResult{Success: false, Message: resp.Msg}, errors.New(resp.Msg)
}
for _, result := range resp.Data.List.Data {
stateCode := 0
switch result.State {
case "处理成功":
stateCode = 1
case "处理失败":
stateCode = -1
case "等待受理":
stateCode = 0
default:
stateCode = -1
}
if result.CardNo == cardNo {
return &CardQueryResult{
Success: true,
Status: stateCode,
Message: result.State,
FaceValue: faceValue,
SettledAmount: faceValue,
Data: map[string]any{"code": resp.Code},
}, nil
}
}
return &CardQueryResult{
Success: false,
Status: -1,
Message: "卡密不存在",
FaceValue: faceValue,
SettledAmount: faceValue,
Data: map[string]any{"code": resp.Code},
}, nil
}

View File

@@ -0,0 +1,112 @@
// Package channel 提供通道处理器接口和实现
package channel
import (
"context"
"gateway/internal/otelTrace"
"go.uber.org/zap"
)
// ChannelType 通道类型
type ChannelType string
const (
// ChannelTypeYuhv 宇辉通道
ChannelTypeYuhv ChannelType = "yuhv"
// ChannelTypeBatchSix 666通道
ChannelTypeBatchSix ChannelType = "batch_six"
)
// AuthType 认证类型
type AuthType string
const (
// AuthTypeToken 令牌认证
AuthTypeToken AuthType = "token"
// AuthTypeCookie Cookie认证
AuthTypeCookie AuthType = "cookie"
)
// AuthData 认证数据
type AuthData struct {
AuthType AuthType // 认证类型
AccessToken string // 访问令牌
Cookies []string // Cookie信息
}
// LoginResult 登录结果
type LoginResult struct {
Success bool // 是否成功
AuthType AuthType // 认证类型
AccessToken string // 访问令牌
Message string // 错误消息
Cookies []string // Cookie信息
Data map[string]any // 其他数据
}
// CardSubmitResult 卡密提交结果
type CardSubmitResult struct {
Success bool // 是否成功
OrderID string // 订单ID
Message string // 错误消息
NeedRetry bool // 是否需要重试
Data map[string]any // 其他数据
}
// CardQueryResult 卡密查询结果
type CardQueryResult struct {
Success bool // 是否成功
Status int // 状态0-处理中1-成功,-1-失败
Message string // 状态消息
FaceValue float64 // 面值
SettledAmount float64 // 结算金额
Data map[string]any // 其他数据
}
// ChannelHandler 通道处理器接口
type ChannelHandler interface {
// Type 返回通道类型
Type() ChannelType
// Login 登录
Login(ctx context.Context, username, password string, params map[string]string) (*LoginResult, error)
// SubmitCard 提交卡密
SubmitCard(ctx context.Context, channelCode int64, faceValue float64, cardNo, cardPwd string, authData AuthData, params map[string]string) (*CardSubmitResult, error)
// QueryCard 查询卡密
QueryCard(ctx context.Context, channelCode int64, faceValue float64, cardNo, cardPwd, orderID string, authData AuthData, params map[string]string) (*CardQueryResult, error)
}
// BaseChannelHandler 基础通道处理器
type BaseChannelHandler struct {
channelType ChannelType
logger *zap.Logger
}
// NewBaseChannelHandler 创建基础通道处理器
func NewBaseChannelHandler(channelType ChannelType) *BaseChannelHandler {
return &BaseChannelHandler{
channelType: channelType,
logger: otelTrace.Logger.WithContext(context.Background()),
}
}
// Type 返回通道类型
func (h *BaseChannelHandler) Type() ChannelType {
return h.channelType
}
// 通道处理器注册表
var (
handlers = make(map[ChannelType]ChannelHandler)
)
// RegisterHandler 注册通道处理器
func RegisterHandler(handler ChannelHandler) {
handlers[handler.Type()] = handler
}
// GetHandler 获取通道处理器
func GetHandler(channelType ChannelType) (ChannelHandler, bool) {
handler, ok := handlers[channelType]
return handler, ok
}

View File

@@ -0,0 +1,20 @@
// Package channel 提供通道处理器初始化
package channel
import (
"context"
"gateway/internal/otelTrace"
)
// InitChannelHandlers 初始化通道处理器
func InitChannelHandlers() {
// 注册宇辉通道处理器
// RegisterHandler(NewYuhvChannelHandler())
// otelTrace.Logger.WithContext(context.Background()).Info("宇辉通道处理器注册成功")
// 注册666通道处理器
RegisterHandler(NewBatchSixChannelHandler())
otelTrace.Logger.WithContext(context.Background()).Info("666通道处理器注册成功")
otelTrace.Logger.WithContext(context.Background()).Info("通道处理器初始化完成")
}

View File

@@ -0,0 +1,305 @@
// Package channel 提供宇辉通道处理器实现
package channel
import (
"context"
"encoding/json"
"errors"
"net/http"
"time"
"github.com/beego/beego/v2/client/httplib"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.uber.org/zap"
)
// YuhvChannelHandler 宇辉通道处理器
type YuhvChannelHandler struct {
*BaseChannelHandler
apiBaseURL string
}
// NewYuhvChannelHandler 创建宇辉通道处理器
func NewYuhvChannelHandler() *YuhvChannelHandler {
return &YuhvChannelHandler{
BaseChannelHandler: NewBaseChannelHandler(ChannelTypeYuhv),
apiBaseURL: "http://www.yh0968.com",
}
}
// Login 登录宇辉通道
func (h *YuhvChannelHandler) Login(ctx context.Context, username, password string, params map[string]string) (*LoginResult, error) {
request := httplib.NewBeegoRequestWithCtx(ctx, h.apiBaseURL+"/user/auth/login", "POST").
SetTransport(otelhttp.NewTransport(http.DefaultTransport)).
SetTimeout(time.Second*3, time.Second*30).Retries(30)
loginReq := struct {
Mobile string `json:"mobile"`
Password string `json:"password"`
Key string `json:"key"`
Captcha string `json:"captcha"`
}{
Mobile: username,
Password: password,
Key: "",
Captcha: "",
}
request, err := request.JSONBody(loginReq)
if err != nil {
h.logger.Error("【宇辉】JSONBody失败", zap.Error(err))
return &LoginResult{Success: false, Message: "登录请求格式失败"}, err
}
response, err := request.String()
if err != nil {
h.logger.Error("【宇辉】请求失败", zap.Error(err))
return &LoginResult{Success: false, Message: "登录返回数据错误"}, err
}
type responseStruct struct {
Code int `json:"code"`
Msg string `json:"msg"`
Data struct {
AccessToken string `json:"access_token"`
} `json:"data"`
}
var resData responseStruct
err = json.Unmarshal([]byte(response), &resData)
h.logger.Info("【宇辉】登录返回", zap.Any("resData", resData), zap.String("response", response))
if err != nil {
return &LoginResult{Success: false, Message: "登录返回数据解析失败"}, err
}
if resData.Code != 0 {
return &LoginResult{Success: false, Message: resData.Msg}, errors.New(resData.Msg)
}
return &LoginResult{
Success: true,
AuthType: AuthTypeToken,
AccessToken: resData.Data.AccessToken,
Message: "登录成功",
Data: map[string]any{"code": resData.Code},
}, nil
}
// SubmitCard 提交卡密到宇辉通道
func (h *YuhvChannelHandler) SubmitCard(ctx context.Context, channelCode int64, faceValue float64, cardNo, cardPwd string, authData AuthData, params map[string]string) (*CardSubmitResult, error) {
// 宇辉通道只支持token认证
if authData.AuthType != AuthTypeToken || authData.AccessToken == "" {
return &CardSubmitResult{
Success: false,
Message: "认证信息无效",
NeedRetry: true,
}, errors.New("认证信息无效")
}
// 获取面值ID
faceValueID := h.getChannelInfo(ctx, channelCode, int64(faceValue))
if faceValueID == 0 {
return &CardSubmitResult{
Success: false,
Message: "未找到对应面值",
NeedRetry: false,
}, errors.New("未找到对应面值")
}
request := httplib.NewBeegoRequestWithCtx(ctx, h.apiBaseURL+"/user/card/recover", "POST").
SetTransport(otelhttp.NewTransport(http.DefaultTransport)).
SetTimeout(time.Second*3, time.Second*30).Retries(30).
Header("Authorization", authData.AccessToken)
cardReq := struct {
CardId int64 `json:"cardId"`
CardNo string `json:"cardNo"`
CardPwd string `json:"cardPwd"`
Quantity int `json:"quantity"`
}{
CardId: faceValueID,
CardNo: cardNo,
CardPwd: cardPwd,
Quantity: 1,
}
request, err := request.JSONBody(cardReq)
if err != nil {
h.logger.Error("【宇辉】JSONBody失败", zap.Error(err))
return &CardSubmitResult{
Success: false,
Message: "提交卡密请求格式失败",
NeedRetry: false,
}, err
}
response, err := request.String()
if err != nil {
h.logger.Error("【宇辉】请求失败", zap.Error(err))
return &CardSubmitResult{
Success: false,
Message: "提交卡密请求失败",
NeedRetry: true,
}, err
}
h.logger.Info("【宇辉】提交卡密返回", zap.String("response", response))
type responseStruct struct {
Code int `json:"code"`
Msg string `json:"msg"`
Data struct {
OrderNo string `json:"orderNo"`
} `json:"data"`
}
var resData responseStruct
err = json.Unmarshal([]byte(response), &resData)
if err != nil {
h.logger.Error("【宇辉】解析失败", zap.Error(err))
return &CardSubmitResult{
Success: false,
Message: "提交卡密返回数据解析失败",
NeedRetry: false,
}, err
}
// 如果返回码不为0表示提交失败
if resData.Code != 0 {
// 如果是登录失效,需要重新登录
if resData.Code == 401 {
return &CardSubmitResult{
Success: false,
Message: "登录已过期,请重新登录",
NeedRetry: true,
}, errors.New(resData.Msg)
}
return &CardSubmitResult{
Success: false,
Message: resData.Msg,
NeedRetry: false,
}, errors.New(resData.Msg)
}
return &CardSubmitResult{
Success: true,
OrderID: resData.Data.OrderNo,
Message: "提交成功",
Data: map[string]interface{}{"code": resData.Code},
}, nil
}
// QueryCard 查询卡密
func (h *YuhvChannelHandler) QueryCard(ctx context.Context, channelCode int64, faceValue float64, cardNo, cardPwd, orderID string, authData AuthData, params map[string]string) (*CardQueryResult, error) {
// 宇辉通道只支持token认证
if authData.AuthType != AuthTypeToken || authData.AccessToken == "" {
return &CardQueryResult{
Success: false,
Status: 0,
Message: "认证信息无效",
}, errors.New("认证信息无效")
}
// 如果没有订单号,无法查询
if orderID == "" {
return &CardQueryResult{
Success: false,
Status: 0,
Message: "订单号为空",
}, errors.New("订单号为空")
}
request := httplib.NewBeegoRequestWithCtx(ctx, h.apiBaseURL+"/user/card/recoveryOrder", "POST").
SetTransport(otelhttp.NewTransport(http.DefaultTransport)).
SetTimeout(time.Second*3, time.Second*30).Retries(30).
Header("Authorization", authData.AccessToken)
queryReq := struct {
OrderNo string `json:"orderNo"`
}{
OrderNo: orderID,
}
request, err := request.JSONBody(queryReq)
if err != nil {
h.logger.Error("【宇辉】JSONBody失败", zap.Error(err))
return &CardQueryResult{
Success: false,
Status: 0,
Message: "查询请求格式失败",
}, err
}
response, err := request.String()
if err != nil {
h.logger.Error("【宇辉】请求失败", zap.Error(err))
return &CardQueryResult{
Success: false,
Status: 0,
Message: "查询请求失败",
}, err
}
type responseStruct struct {
Code int `json:"code"`
Msg string `json:"msg"`
Data struct {
List []struct {
OrderNo string `json:"orderNo"`
RecoveryStatus int `json:"recoveryStatus"`
FaceValue float64 `json:"faceValue"`
SettlementFace float64 `json:"settlementFace"`
RecoveryDisount float64 `json:"recoveryDisount"`
Reason string `json:"reason"`
} `json:"list"`
} `json:"data"`
}
var resData responseStruct
err = json.Unmarshal([]byte(response), &resData)
h.logger.Info("【宇辉】查询订单返回", zap.Any("resData", resData))
if err != nil || len(resData.Data.List) == 0 {
return &CardQueryResult{
Success: false,
Status: 0,
Message: "未查询到订单信息",
}, err
}
targetOrder := resData.Data.List[0]
status := 0
switch targetOrder.RecoveryStatus {
case 2:
// 成功
status = 1
case 3:
status = -1
}
return &CardQueryResult{
Success: true,
Status: status,
Message: targetOrder.Reason,
FaceValue: targetOrder.FaceValue,
SettledAmount: targetOrder.SettlementFace,
Data: map[string]interface{}{
"recovery_status": targetOrder.RecoveryStatus,
"order_no": targetOrder.OrderNo,
},
}, nil
}
// getChannelInfo 获取通道信息
func (h *YuhvChannelHandler) getChannelInfo(ctx context.Context, channelCode, faceValue int64) int64 {
// 这里简化实现,实际应该查询通道信息
// 由于原始代码中依赖了全局变量和复杂的查询逻辑,这里简化返回
h.logger.Info("获取通道信息",
zap.Int64("channelCode", channelCode),
zap.Int64("faceValue", faceValue))
// 在实际实现中这里应该查询通道信息并返回正确的面值ID
return faceValue
}

View File

@@ -0,0 +1,85 @@
// Package queue 提供队列单元测试
package queue
import (
"context"
"gateway/internal/cache"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestChannelSessionManager(t *testing.T) {
// 跳过测试如果Redis不可用
redisClient := cache.GetRedisClient()
if redisClient == nil {
t.Skip("Redis client not available, skipping test")
return
}
ctx := context.Background()
manager := GetSessionManager()
// 测试数据
channelID := "test_channel"
username := "test_user"
session := &ChannelSession{
ChannelID: channelID,
Username: username,
Password: "test_password",
AccessToken: "test_token",
Cookies: []string{"cookie1=value1", "cookie2=value2"},
LoginTime: time.Now(),
ExpiresIn: time.Now().Add(1 * time.Hour),
Data: map[string]interface{}{"key": "value"},
}
// 清理可能存在的旧数据
_ = manager.RemoveSession(ctx, channelID, username)
// 测试设置会话
err := manager.SetSession(ctx, session)
assert.NoError(t, err, "SetSession should not return error")
// 测试获取会话
retrievedSession, ok := manager.GetSession(ctx, channelID, username)
assert.True(t, ok, "GetSession should return true")
assert.NotNil(t, retrievedSession, "GetSession should return non-nil session")
assert.Equal(t, session.ChannelID, retrievedSession.ChannelID, "ChannelID should match")
assert.Equal(t, session.Username, retrievedSession.Username, "Username should match")
assert.Equal(t, session.Password, retrievedSession.Password, "Password should match")
assert.Equal(t, session.AccessToken, retrievedSession.AccessToken, "AccessToken should match")
assert.Equal(t, len(session.Cookies), len(retrievedSession.Cookies), "Cookies length should match")
assert.Equal(t, session.Data["key"], retrievedSession.Data["key"], "Data should match")
// 测试删除会话
err = manager.RemoveSession(ctx, channelID, username)
assert.NoError(t, err, "RemoveSession should not return error")
// 确认会话已删除
_, ok = manager.GetSession(ctx, channelID, username)
assert.False(t, ok, "GetSession should return false after removal")
// 测试过期会话
expiredSession := &ChannelSession{
ChannelID: channelID,
Username: username + "_expired",
Password: "test_password",
AccessToken: "test_token",
Cookies: []string{"cookie1=value1", "cookie2=value2"},
LoginTime: time.Now().Add(-2 * time.Hour),
ExpiresIn: time.Now().Add(-1 * time.Hour), // 已过期
Data: map[string]interface{}{"key": "value"},
}
err = manager.SetSession(ctx, expiredSession)
assert.NoError(t, err, "SetSession should not return error for expired session")
// 获取过期会话应该返回false
_, ok = manager.GetSession(ctx, channelID, username+"_expired")
assert.False(t, ok, "GetSession should return false for expired session")
// 清理测试数据
_ = manager.RemoveSession(ctx, channelID, username+"_expired")
}

View File

@@ -0,0 +1,422 @@
// Package queue 提供通道任务处理器实现
package queue
import (
"context"
"errors"
"fmt"
"gateway/internal/cache"
"gateway/internal/otelTrace"
"gateway/internal/service/supplier/third_party/queue/channel"
"strconv"
"sync"
"time"
"go.uber.org/zap"
)
// ChannelSessionManager 通道会话管理器
type ChannelSessionManager struct {
// Redis客户端
redisClient *cache.RedisClient
// 会话过期时间
sessionTTL time.Duration
logger *zap.Logger
}
// ChannelSession 通道会话
type ChannelSession struct {
ChannelID string // 通道ID
Username string // 用户名
Password string // 密码
AuthType channel.AuthType // 认证类型
AccessToken string // 访问令牌
Cookies []string // Cookie信息
LoginTime time.Time // 登录时间
ExpiresIn time.Time // 过期时间
Data map[string]any // 其他数据
}
// 获取会话键
func getSessionKey(channelID, username string) string {
return fmt.Sprintf("channel:session:%s:%s", channelID, username)
}
// 单例模式实现
var (
sessionManager *ChannelSessionManager
sessionManagerOnce sync.Once
)
// GetSessionManager 获取会话管理器单例
func GetSessionManager() *ChannelSessionManager {
sessionManagerOnce.Do(func() {
sessionManager = &ChannelSessionManager{
redisClient: cache.GetRedisClient(),
sessionTTL: 24 * time.Hour, // 默认24小时过期
logger: otelTrace.Logger.WithContext(context.Background()),
}
})
return sessionManager
}
// GetSession 获取会话
func (m *ChannelSessionManager) GetSession(ctx context.Context, channelID, username string) (*ChannelSession, bool) {
key := getSessionKey(channelID, username)
// 从Redis获取会话数据
var session ChannelSession
err := m.redisClient.Get(key, &session)
if err != nil {
m.logger.Error("从Redis获取会话失败",
zap.String("channelID", channelID),
zap.String("username", username),
zap.Error(err))
return nil, false
}
// 检查会话是否过期
if time.Now().After(session.ExpiresIn) {
_ = m.RemoveSession(ctx, channelID, username)
return nil, false
}
return &session, true
}
// SetSession 设置会话
func (m *ChannelSessionManager) SetSession(ctx context.Context, session *ChannelSession) error {
key := getSessionKey(session.ChannelID, session.Username)
// 存储到Redis设置过期时间
ttl := session.ExpiresIn.Sub(time.Now())
if ttl <= 0 {
ttl = m.sessionTTL
}
if err := m.redisClient.Set(key, session, ttl); err != nil {
m.logger.Error("保存会话到Redis失败",
zap.String("channelID", session.ChannelID),
zap.String("username", session.Username),
zap.Error(err))
return err
}
return nil
}
// RemoveSession 移除会话
func (m *ChannelSessionManager) RemoveSession(ctx context.Context, channelID, username string) error {
key := getSessionKey(channelID, username)
if err := m.redisClient.Delete(key); err != nil {
m.logger.Error("从Redis删除会话失败",
zap.String("channelID", channelID),
zap.String("username", username),
zap.Error(err))
return err
}
return nil
}
// HandleLoginTask 登录任务处理器
func HandleLoginTask(ctx context.Context, task Task) error {
loginTask, ok := task.(*LoginTask)
if !ok {
return fmt.Errorf("任务类型错误: %T", task)
}
// 获取通道处理器
handler, ok := channel.GetHandler(channel.ChannelType(loginTask.Channel()))
if !ok {
return fmt.Errorf("未找到通道处理器: %s", loginTask.Channel())
}
// 执行登录
result, err := handler.Login(ctx, loginTask.Mobile, loginTask.Password, loginTask.Params)
if err != nil {
otelTrace.Logger.WithContext(ctx).Error("登录失败",
zap.String("channel", loginTask.Channel()),
zap.String("mobile", loginTask.Mobile),
zap.Error(err),
)
return err
}
if !result.Success {
return errors.New(result.Message)
}
// 保存会话
session := &ChannelSession{
ChannelID: loginTask.Channel(),
Username: loginTask.Mobile,
Password: loginTask.Password,
AuthType: result.AuthType,
AccessToken: result.AccessToken,
Cookies: result.Cookies,
LoginTime: time.Now(),
ExpiresIn: time.Now().Add(24 * time.Hour), // 默认24小时过期
Data: result.Data,
}
if err2 := GetSessionManager().SetSession(ctx, session); err2 != nil {
otelTrace.Logger.WithContext(ctx).Error("保存会话失败",
zap.String("channel", loginTask.Channel()),
zap.String("mobile", loginTask.Mobile),
zap.Error(err2),
)
return err2
}
otelTrace.Logger.WithContext(ctx).Info("登录成功",
zap.String("channel", loginTask.Channel()),
zap.String("mobile", loginTask.Mobile),
zap.String("authType", string(result.AuthType)),
)
return nil
}
// HandleCardSubmitTask 卡密提交任务处理器
func HandleCardSubmitTask(ctx context.Context, task Task) error {
submitTask, ok := task.(*CardSubmitTask)
if !ok {
return fmt.Errorf("任务类型错误: %T", task)
}
// 获取通道处理器
handler, ok := channel.GetHandler(channel.ChannelType(submitTask.Channel()))
if !ok {
return fmt.Errorf("未找到通道处理器: %s", submitTask.Channel())
}
// 获取会话
session, ok := GetSessionManager().GetSession(ctx, submitTask.Channel(), submitTask.Mobile)
if !ok {
// 如果没有会话,先登录
loginTask := NewLoginTask(
submitTask.Channel(),
submitTask.Mobile,
submitTask.Password,
make(map[string]string),
)
if err := HandleLoginTask(ctx, loginTask); err != nil {
return fmt.Errorf("登录失败: %w", err)
}
session, ok = GetSessionManager().GetSession(ctx, submitTask.Channel(), submitTask.Mobile)
if !ok {
return errors.New("登录后未找到会话")
}
}
// 构建认证数据
authData := channel.AuthData{
AuthType: session.AuthType,
AccessToken: session.AccessToken,
Cookies: session.Cookies,
}
// 设置参数
submitTask.Params = map[string]string{
"username": submitTask.Mobile,
"password": submitTask.Password,
"channelCode": strconv.FormatInt(submitTask.ChannelCode, 10),
"faceValue": strconv.FormatFloat(float64(submitTask.FaceValue), 'f', 2, 64),
}
// 执行卡密提交
result, err := handler.SubmitCard(ctx, submitTask.ChannelCode, float64(submitTask.FaceValue), submitTask.CardNo, submitTask.CardPwd, authData, submitTask.Params)
if err != nil {
otelTrace.Logger.WithContext(ctx).Error("提交卡密失败",
zap.String("channel", submitTask.Channel()),
zap.String("cardNo", submitTask.CardNo),
zap.Error(err),
)
// 如果需要重试且重试次数未达到上限
if result != nil && result.NeedRetry && submitTask.DispatchCount < 5 {
// 如果需要重新登录
if result.Message == "请先登录后再来提交" || result.Message == "未登录,请先登录" || result.Message == "认证信息无效" {
// 移除会话
if err := GetSessionManager().RemoveSession(ctx, submitTask.Channel(), submitTask.Mobile); err != nil {
otelTrace.Logger.WithContext(ctx).Error("移除会话失败",
zap.String("channel", submitTask.Channel()),
zap.String("mobile", submitTask.Mobile),
zap.Error(err),
)
}
// 重新登录
loginTask := NewLoginTask(
submitTask.Channel(),
submitTask.Mobile,
submitTask.Password,
make(map[string]string),
)
if err := HandleLoginTask(ctx, loginTask); err != nil {
return fmt.Errorf("重新登录失败: %w", err)
}
}
// 重新入队
newTask := NewCardSubmitTask(
submitTask.Channel(),
submitTask.ChannelCode,
submitTask.FaceValue,
submitTask.CardNo,
submitTask.CardPwd,
submitTask.Mobile,
submitTask.Password,
submitTask.BankOrderId,
submitTask.DispatchCount+1,
)
newTask.Params = submitTask.Params
// 延迟一秒后重试
time.Sleep(time.Second)
return GetQueueManager().EnqueueTask(ctx, QueueNameCardSubmit, newTask)
}
return err
}
if !result.Success {
return errors.New(result.Message)
}
// 提交成功后,添加查询任务
queryTask := NewCardQueryTask(
submitTask.Channel(),
submitTask.BankOrderId,
submitTask.CardNo,
submitTask.CardPwd,
submitTask.Mobile,
submitTask.Password,
submitTask.Params,
)
// 将查询任务加入队列
_ = GetQueueManager().EnqueueTask(ctx, QueueNameCardQuery, queryTask)
otelTrace.Logger.WithContext(ctx).Info("提交卡密成功",
zap.String("channel", submitTask.Channel()),
zap.String("cardNo", submitTask.CardNo),
zap.String("bankOrderId", submitTask.BankOrderId),
)
return nil
}
// HandleCardQueryTask 卡密查询任务处理器
func HandleCardQueryTask(ctx context.Context, task Task) error {
queryTask, ok := task.(*CardQueryTask)
if !ok {
return fmt.Errorf("任务类型错误: %T", task)
}
// 获取通道处理器
handler, ok := channel.GetHandler(channel.ChannelType(queryTask.Channel()))
if !ok {
return fmt.Errorf("未找到通道处理器: %s", queryTask.Channel())
}
// 获取会话
session, ok := GetSessionManager().GetSession(ctx, queryTask.Channel(), queryTask.Mobile)
if !ok {
return errors.New("未找到会话")
}
// 构建认证数据
authData := channel.AuthData{
AuthType: session.AuthType,
AccessToken: session.AccessToken,
Cookies: session.Cookies,
}
// 如果查询次数超过最大次数,不再重试
if queryTask.RetryCount >= 10 {
otelTrace.Logger.WithContext(ctx).Warn("查询卡密已达到最大重试次数",
zap.String("channel", queryTask.Channel()),
zap.String("cardNo", queryTask.CardNo),
zap.Int("retryCount", queryTask.RetryCount),
)
return nil
}
// 执行卡密查询
result, err := handler.QueryCard(ctx, queryTask.ChannelCode, float64(queryTask.FaceValue), queryTask.CardNo, queryTask.CardPwd, queryTask.BankOrderId, authData, queryTask.Params)
if err != nil {
// 如果是不支持的操作,不记录错误
if err.Error() == "不支持的操作" {
return nil
}
otelTrace.Logger.WithContext(ctx).Error("查询卡密失败",
zap.String("channel", queryTask.Channel()),
zap.String("cardNo", queryTask.CardNo),
zap.Error(err),
)
// 如果查询失败,稍后重试
return GetQueueManager().EnqueueTask(ctx, QueueNameCardQuery, queryTask)
}
// 如果查询结果是处理中增加查询次数并在1分钟后重试
if result.Status == 0 {
// 增加查询次数
queryTask.RetryCount++
otelTrace.Logger.WithContext(ctx).Info("卡密核销处理中,稍后重试",
zap.String("channel", queryTask.Channel()),
zap.String("cardNo", queryTask.CardNo),
zap.Int("retryCount", queryTask.RetryCount),
)
// 一分钟后重试
time.Sleep(time.Second * 10)
return GetQueueManager().EnqueueTask(ctx, QueueNameCardQuery, queryTask)
}
// 处理查询结果
otelTrace.Logger.WithContext(ctx).Info("查询卡密结果",
zap.String("channel", queryTask.Channel()),
zap.String("cardNo", queryTask.CardNo),
zap.Int("status", result.Status),
zap.String("message", result.Message),
zap.Float64("faceValue", result.FaceValue),
zap.Float64("settledAmount", result.SettledAmount),
zap.Int("retryCount", queryTask.RetryCount),
)
// TODO: 处理订单结果,更新订单状态等
return nil
}
// RegisterChannelTaskHandlers 注册通道任务处理器
func RegisterChannelTaskHandlers() {
registry := NewDefaultHandlerRegistry()
// 注册登录任务处理器
// registry.RegisterHandler(TaskTypeLogin, ChannelYuhv, HandleLoginTask)
registry.RegisterHandler(TaskTypeLogin, ChannelBatchSix, HandleLoginTask)
// 注册卡密提交任务处理器
// registry.RegisterHandler(TaskTypeCardSubmit, ChannelYuhv, HandleCardSubmitTask)
registry.RegisterHandler(TaskTypeCardSubmit, ChannelBatchSix, HandleCardSubmitTask)
// 注册卡密查询任务处理器
// registry.RegisterHandler(TaskTypeCardQuery, ChannelYuhv, HandleCardQueryTask)
registry.RegisterHandler(TaskTypeCardQuery, ChannelBatchSix, HandleCardQueryTask)
// 创建任务处理器
taskHandler := NewDefaultTaskHandler(registry)
// 设置任务处理器
GetQueueManager().SetTaskHandler(taskHandler)
}

View File

@@ -0,0 +1,83 @@
// Package queue 提供队列任务处理器实现
package queue
import (
"context"
"fmt"
"gateway/internal/otelTrace"
"sync"
"go.uber.org/zap"
)
// HandlerFunc 任务处理函数类型
type HandlerFunc func(ctx context.Context, task Task) error
// HandlerRegistry 处理器注册表接口
type HandlerRegistry interface {
// RegisterHandler 注册任务处理器
RegisterHandler(taskType TaskType, channelID string, handler HandlerFunc)
// GetHandler 获取任务处理器
GetHandler(taskType TaskType, channelID string) (HandlerFunc, bool)
}
// DefaultHandlerRegistry 默认处理器注册表实现
type DefaultHandlerRegistry struct {
handlers map[string]HandlerFunc
mu sync.RWMutex
}
// NewDefaultHandlerRegistry 创建默认处理器注册表
func NewDefaultHandlerRegistry() *DefaultHandlerRegistry {
return &DefaultHandlerRegistry{
handlers: make(map[string]HandlerFunc),
}
}
// handlerKey 生成处理器键
func handlerKey(taskType TaskType, channelID string) string {
return fmt.Sprintf("%s:%s", string(taskType), channelID)
}
// RegisterHandler 注册任务处理器
func (r *DefaultHandlerRegistry) RegisterHandler(taskType TaskType, channelID string, handler HandlerFunc) {
r.mu.Lock()
defer r.mu.Unlock()
key := handlerKey(taskType, channelID)
r.handlers[key] = handler
}
// GetHandler 获取任务处理器
func (r *DefaultHandlerRegistry) GetHandler(taskType TaskType, channelID string) (HandlerFunc, bool) {
r.mu.RLock()
defer r.mu.RUnlock()
key := handlerKey(taskType, channelID)
handler, ok := r.handlers[key]
return handler, ok
}
// DefaultTaskHandler 默认任务处理器实现
type DefaultTaskHandler struct {
registry HandlerRegistry
logger *zap.Logger
}
// NewDefaultTaskHandler 创建默认任务处理器
func NewDefaultTaskHandler(registry HandlerRegistry) *DefaultTaskHandler {
return &DefaultTaskHandler{
registry: registry,
logger: otelTrace.Logger.WithContext(context.Background()),
}
}
// HandleTask 处理任务
func (h *DefaultTaskHandler) HandleTask(ctx context.Context, task Task) error {
handler, ok := h.registry.GetHandler(task.Type(), task.Channel())
if !ok {
return fmt.Errorf("未找到任务处理器: %s:%s", task.Type(), task.Channel())
}
return handler(ctx, task)
}

View File

@@ -0,0 +1,36 @@
// Package queue 提供队列初始化模块
package queue
import (
"context"
"gateway/internal/cache"
"gateway/internal/otelTrace"
"gateway/internal/service/supplier/third_party/queue/channel"
"go.uber.org/zap"
)
// DefaultWorkerCount 默认工作协程数量
const DefaultWorkerCount = 30
// Init 初始化队列系统
func Init(ctx context.Context) {
// 初始化通道处理器
channel.InitChannelHandlers()
// 初始化队列管理器
InitQueueManager(cache.GetRedisClient(), DefaultWorkerCount)
// 注册通道任务处理器
RegisterChannelTaskHandlers()
// 初始化队列服务
InitQueueService()
// 启动所有队列
if err := GetQueueManager().StartAll(ctx); err != nil {
otelTrace.Logger.WithContext(ctx).Error("启动队列失败", zap.Error(err))
} else {
otelTrace.Logger.WithContext(ctx).Info("队列系统初始化成功")
}
}

View File

@@ -0,0 +1,138 @@
// Package queue 提供队列管理器实现
package queue
import (
"context"
"fmt"
"gateway/internal/cache"
"gateway/internal/otelTrace"
"sync"
"go.uber.org/zap"
)
// QueueManager 队列管理器,负责创建和管理队列
type QueueManager struct {
redisClient *cache.RedisClient
queues map[string]*RedisQueue
registry HandlerRegistry
taskFactory TaskFactory
taskHandler TaskHandler
defaultWorker int
logger *zap.Logger
mu sync.RWMutex
}
// NewQueueManager 创建队列管理器
func NewQueueManager(redisClient *cache.RedisClient, registry HandlerRegistry, taskFactory TaskFactory, defaultWorker int) *QueueManager {
return &QueueManager{
redisClient: redisClient,
queues: make(map[string]*RedisQueue),
registry: registry,
taskFactory: taskFactory,
defaultWorker: defaultWorker,
logger: otelTrace.Logger.WithContext(context.Background()),
}
}
// queueKey 生成队列键
func queueKey(queueName, channelID string) string {
return fmt.Sprintf("%s:%s", channelID, queueName)
}
// GetOrCreateQueue 获取或创建队列
func (m *QueueManager) GetOrCreateQueue(queueName, channelID string, workerCount int) *RedisQueue {
m.mu.Lock()
defer m.mu.Unlock()
key := queueKey(queueName, channelID)
if queue, ok := m.queues[key]; ok {
return queue
}
if workerCount <= 0 {
workerCount = m.defaultWorker
}
handler := m.taskHandler
if handler == nil {
handler = NewDefaultTaskHandler(m.registry)
}
queue := NewRedisQueue(queueName, channelID, m.redisClient, handler, workerCount, m.taskFactory)
m.queues[key] = queue
m.logger.Info("创建队列",
zap.String("queue", queueName),
zap.String("channel", channelID),
zap.Int("workerCount", workerCount),
)
return queue
}
// StartAll 启动所有队列
func (m *QueueManager) StartAll(ctx context.Context) error {
for key, queue := range m.queues {
if err := queue.Start(ctx); err != nil {
m.logger.Error("启动队列失败", zap.String("queue", key), zap.Error(err))
return err
}
}
return nil
}
// StopAll 停止所有队列
func (m *QueueManager) StopAll() error {
for key, queue := range m.queues {
if err := queue.Stop(); err != nil {
m.logger.Error("停止队列失败", zap.String("queue", key), zap.Error(err))
return err
}
}
return nil
}
// RegisterHandler 注册任务处理器
func (m *QueueManager) RegisterHandler(taskType TaskType, channelID string, handler HandlerFunc) {
m.registry.RegisterHandler(taskType, channelID, handler)
}
// EnqueueTask 将任务加入队列
func (m *QueueManager) EnqueueTask(ctx context.Context, queueName string, task Task) error {
queue := m.GetOrCreateQueue(queueName, task.Channel(), m.defaultWorker)
return queue.Enqueue(ctx, task)
}
// SetTaskHandler 设置任务处理器
func (m *QueueManager) SetTaskHandler(handler TaskHandler) {
m.taskHandler = handler
}
// 单例模式实现
var (
manager *QueueManager
managerOnce sync.Once
)
// GetQueueManager 获取队列管理器单例
func GetQueueManager() *QueueManager {
return manager
}
// InitQueueManager 初始化队列管理器
func InitQueueManager(redisClient *cache.RedisClient, defaultWorker int) {
managerOnce.Do(func() {
registry := NewDefaultHandlerRegistry()
taskFactory := NewDefaultTaskFactory()
manager = NewQueueManager(redisClient, registry, taskFactory, defaultWorker)
})
}
// 通道ID常量
const (
ChannelYuhv = "yuhv"
ChannelBatchSix = "batch_six"
)

View File

@@ -0,0 +1,385 @@
// Package queue 提供队列抽象层,用于处理卡密提交、登录和查询等操作
// 该模块抽象了队列操作,使其可以被不同通道复用
package queue
import (
"context"
"encoding/json"
"fmt"
"gateway/internal/cache"
"gateway/internal/otelTrace"
"sync"
"time"
"github.com/bytedance/gopkg/util/gopool"
"go.uber.org/zap"
)
// TaskType 定义队列任务类型
type TaskType string
// 预定义的队列名称和任务类型常量
const (
// 任务类型
TaskTypeCardSubmit TaskType = "card_submit" // 卡密提交任务
TaskTypeLogin TaskType = "login" // 登录任务
TaskTypeCardQuery TaskType = "card_query" // 卡密查询任务
// 队列名称,与任务类型对应
QueueNameCardSubmit = "card_submit" // 卡密提交队列
QueueNameLogin = "login" // 登录队列
QueueNameCardQuery = "card_query" // 卡密查询队列
)
// Task 队列任务接口
type Task interface {
// Type 返回任务类型
Type() TaskType
// Channel 返回任务所属通道
Channel() string
// MarshalJSON 序列化任务
MarshalJSON() ([]byte, error)
}
// BaseTask 基础任务
type BaseTask struct {
TaskType TaskType `json:"type"` // 任务类型
ChannelID string `json:"channel_id"` // 通道类型
CreateTime time.Time `json:"create_time"` // 创建时间
DispatchCount int `json:"dispatch_count"` // 派发次数
}
// Type 返回任务类型
func (t *BaseTask) Type() TaskType {
return t.TaskType
}
// Channel 返回通道类型
func (t *BaseTask) Channel() string {
return t.ChannelID
}
// MarshalJSON 序列化任务
func (t *BaseTask) MarshalJSON() ([]byte, error) {
return json.Marshal(t)
}
// NewBaseTask 创建基础任务
func NewBaseTask(taskType TaskType, channelType string) *BaseTask {
return &BaseTask{
TaskType: taskType,
ChannelID: channelType,
CreateTime: time.Now(),
DispatchCount: 0,
}
}
// LoginTask 登录任务
type LoginTask struct {
*BaseTask
Mobile string `json:"mobile"` // 手机号
Password string `json:"password"` // 密码
Params map[string]string `json:"params"` // 其他参数
}
// MarshalJSON 序列化任务
func (t *LoginTask) MarshalJSON() ([]byte, error) {
return json.Marshal(t)
}
// NewLoginTask 创建登录任务
func NewLoginTask(channelType, mobile, password string, params map[string]string) *LoginTask {
return &LoginTask{
BaseTask: NewBaseTask(TaskTypeLogin, channelType),
Mobile: mobile,
Password: password,
Params: params,
}
}
// CardSubmitTask 卡密提交任务
type CardSubmitTask struct {
*BaseTask
ChannelCode int64 `json:"channel_code"` // 通道编码
FaceValue int64 `json:"face_value"` // 面值
CardNo string `json:"card_no"` // 卡号
CardPwd string `json:"card_pwd"` // 卡密
Mobile string `json:"mobile"` // 手机号
Password string `json:"password"` // 密码
BankOrderId string `json:"bank_order_id"` // 银行订单号
Params map[string]string `json:"params"` // 其他参数
}
// MarshalJSON 序列化任务
func (t *CardSubmitTask) MarshalJSON() ([]byte, error) {
return json.Marshal(t)
}
// NewCardSubmitTask 创建卡密提交任务
func NewCardSubmitTask(channelType string, channelCode, faceValue int64, cardNo, cardPwd, mobile, password, bankOrderId string, dispatchCount int) *CardSubmitTask {
task := &CardSubmitTask{
BaseTask: NewBaseTask(TaskTypeCardSubmit, channelType),
ChannelCode: channelCode,
FaceValue: faceValue,
CardNo: cardNo,
CardPwd: cardPwd,
Mobile: mobile,
Password: password,
BankOrderId: bankOrderId,
Params: make(map[string]string),
}
task.DispatchCount = dispatchCount
return task
}
// CardQueryTask 卡密查询任务
type CardQueryTask struct {
*BaseTask
ChannelCode int64 `json:"channel_code"` // 通道编码
FaceValue int64 `json:"face_value"` // 面值
BankOrderId string `json:"bank_order_id"` // 银行订单号
CardNo string `json:"card_no"` // 卡号
CardPwd string `json:"card_pwd"` // 卡密
Params map[string]string `json:"params"` // 其他参数
Mobile string `json:"mobile"` // 手机号
Password string `json:"password"` // 密码
RetryCount int `json:"retry_count"` // 重试次数
}
// MarshalJSON 序列化任务
func (t *CardQueryTask) MarshalJSON() ([]byte, error) {
return json.Marshal(t)
}
// NewCardQueryTask 创建卡密查询任务
func NewCardQueryTask(channelType, bankOrderId, cardNo, cardPwd string, mobile, password string, params map[string]string) *CardQueryTask {
return &CardQueryTask{
BaseTask: NewBaseTask(TaskTypeCardQuery, channelType),
BankOrderId: bankOrderId,
CardNo: cardNo,
CardPwd: cardPwd,
Params: params,
Mobile: mobile,
Password: password,
RetryCount: 0,
}
}
// DefaultTaskFactory 默认任务工厂实现
type DefaultTaskFactory struct{}
// CreateTask 根据任务类型创建任务实例
func (f *DefaultTaskFactory) CreateTask(taskType TaskType) Task {
switch taskType {
case TaskTypeCardSubmit:
return &CardSubmitTask{
BaseTask: &BaseTask{
TaskType: TaskTypeCardSubmit,
},
}
case TaskTypeLogin:
return &LoginTask{
BaseTask: &BaseTask{
TaskType: TaskTypeLogin,
},
}
case TaskTypeCardQuery:
return &CardQueryTask{
BaseTask: &BaseTask{
TaskType: TaskTypeCardQuery,
},
}
default:
return nil
}
}
// NewDefaultTaskFactory 创建默认任务工厂
func NewDefaultTaskFactory() TaskFactory {
return &DefaultTaskFactory{}
}
// TaskHandler 任务处理器接口
type TaskHandler interface {
// HandleTask 处理任务
HandleTask(ctx context.Context, task Task) error
}
// Queue 队列接口
type Queue interface {
// Enqueue 将任务加入队列
Enqueue(ctx context.Context, task Task) error
// Dequeue 从队列中取出任务
Dequeue(ctx context.Context) (Task, error)
// Size 返回队列大小
Size(ctx context.Context) (int64, error)
// Start 启动队列处理
Start(ctx context.Context) error
// Stop 停止队列处理
Stop() error
}
// RedisQueue Redis队列实现
type RedisQueue struct {
name string
channel string
redisClient *cache.RedisClient
handler TaskHandler
pool gopool.Pool
workerCount int
running bool
stopChan chan struct{}
logger *zap.Logger
taskFactory TaskFactory
mu sync.Mutex
}
// TaskFactory 任务工厂接口,用于创建具体的任务实例
type TaskFactory interface {
// CreateTask 根据任务类型创建任务实例
CreateTask(taskType TaskType) Task
}
// NewRedisQueue 创建新的Redis队列
func NewRedisQueue(name string, channel string, redisClient *cache.RedisClient, handler TaskHandler, workerCount int, taskFactory TaskFactory) *RedisQueue {
return &RedisQueue{
name: name,
channel: channel,
redisClient: redisClient,
handler: handler,
pool: gopool.NewPool(name, int32(workerCount), gopool.NewConfig()),
workerCount: workerCount,
stopChan: make(chan struct{}),
logger: otelTrace.Logger.WithContext(context.Background()),
taskFactory: taskFactory,
}
}
// queueKey 生成队列的Redis键
func (q *RedisQueue) queueKey() string {
return fmt.Sprintf("queue:%s:%s", q.channel, q.name)
}
// Enqueue 将任务加入队列
func (q *RedisQueue) Enqueue(ctx context.Context, task Task) error {
data, err := task.MarshalJSON()
if err != nil {
return fmt.Errorf("序列化任务失败: %w", err)
}
err = q.redisClient.RPush(q.queueKey(), string(data))
if err != nil {
return fmt.Errorf("添加任务到队列失败: %w", err)
}
q.logger.Debug("任务已入队",
zap.String("queue", q.name),
zap.String("channel", q.channel),
zap.String("type", string(task.Type())),
)
return nil
}
// Dequeue 从队列中取出任务
func (q *RedisQueue) Dequeue(ctx context.Context) (Task, error) {
var taskData string
err := q.redisClient.LPopUnmarshal(q.queueKey(), &taskData)
if err != nil {
return nil, err
}
// 根据任务类型反序列化
var taskType struct {
Type TaskType `json:"type"`
}
if err := json.Unmarshal([]byte(taskData), &taskType); err != nil {
return nil, fmt.Errorf("解析任务类型失败: %w", err)
}
task := q.taskFactory.CreateTask(taskType.Type)
if task == nil {
return nil, fmt.Errorf("未知任务类型: %s", taskType.Type)
}
if err := json.Unmarshal([]byte(taskData), task); err != nil {
return nil, fmt.Errorf("反序列化任务失败: %w", err)
}
return task, nil
}
// Size 返回队列大小
func (q *RedisQueue) Size(ctx context.Context) (int64, error) {
return q.redisClient.LLen(q.queueKey())
}
// Start 启动队列处理
func (q *RedisQueue) Start(ctx context.Context) error {
q.mu.Lock()
defer q.mu.Unlock()
if q.running {
return nil
}
q.running = true
q.logger.Info("队列处理启动",
zap.String("queue", q.name),
zap.String("channel", q.channel),
zap.Int("workerCount", q.workerCount),
)
for range q.workerCount {
q.pool.Go(func() {
q.processQueue(ctx)
})
}
return nil
}
// Stop 停止队列处理
func (q *RedisQueue) Stop() error {
q.mu.Lock()
defer q.mu.Unlock()
if !q.running {
return nil
}
q.running = false
close(q.stopChan)
q.logger.Info("队列处理停止", zap.String("queue", q.name), zap.String("channel", q.channel))
return nil
}
// processQueue 处理队列中的任务
func (q *RedisQueue) processQueue(ctx context.Context) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-q.stopChan:
return
case <-ticker.C:
task, err := q.Dequeue(ctx)
if err != nil {
// 队列为空或其他错误,继续等待
continue
}
// 处理任务
if err := q.handler.HandleTask(ctx, task); err != nil {
q.logger.Error("处理任务失败",
zap.String("queue", q.name),
zap.String("channel", q.channel),
zap.String("type", string(task.Type())),
zap.Error(err),
)
}
}
}
}

View File

@@ -0,0 +1,91 @@
// Package queue 提供队列服务接口
package queue
import (
"context"
"gateway/internal/cache"
"gateway/internal/otelTrace"
"sync"
"go.uber.org/zap"
)
// QueueService 队列服务接口
type QueueService interface {
// Start 启动服务
Start(ctx context.Context) error
// Stop 停止服务
Stop() error
// LoginTask 提交登录任务
LoginTask(ctx context.Context, channelID, mobile, password string, params map[string]string) error
// SubmitCardTask 提交卡密任务
SubmitCardTask(ctx context.Context, channelID string, channelCode, faceValue int64, cardNo, cardPwd, mobile, password, bankOrderId string) error
// QueryCardTask 提交查询任务
QueryCardTask(ctx context.Context, channelID, bankOrderId, cardNo, cardPwd, mobile, password string, params map[string]string) error
}
// DefaultQueueService 默认队列服务实现
type DefaultQueueService struct {
manager *QueueManager
redisClient *cache.RedisClient
logger *zap.Logger
}
// NewQueueService 创建新的队列服务
func NewQueueService(redisClient *cache.RedisClient) QueueService {
return &DefaultQueueService{
manager: GetQueueManager(),
redisClient: redisClient,
logger: otelTrace.Logger.WithContext(context.Background()),
}
}
// Start 启动服务
func (s *DefaultQueueService) Start(ctx context.Context) error {
return s.manager.StartAll(ctx)
}
// Stop 停止服务
func (s *DefaultQueueService) Stop() error {
return s.manager.StopAll()
}
// LoginTask 提交登录任务
func (s *DefaultQueueService) LoginTask(ctx context.Context, channelID, mobile, password string, params map[string]string) error {
task := NewLoginTask(channelID, mobile, password, params)
return s.manager.EnqueueTask(ctx, QueueNameLogin, task)
}
// SubmitCardTask 提交卡密任务
func (s *DefaultQueueService) SubmitCardTask(ctx context.Context, channelID string, channelCode, faceValue int64, cardNo, cardPwd, mobile, password, bankOrderId string) error {
task := NewCardSubmitTask(channelID, channelCode, faceValue, cardNo, cardPwd, mobile, password, bankOrderId, 0)
return s.manager.EnqueueTask(ctx, QueueNameCardSubmit, task)
}
// QueryCardTask 提交查询任务
func (s *DefaultQueueService) QueryCardTask(ctx context.Context, channelID, bankOrderId, cardNo, cardPwd, mobile, password string, params map[string]string) error {
task := NewCardQueryTask(channelID, bankOrderId, cardNo, cardPwd, mobile, password, params)
return s.manager.EnqueueTask(ctx, QueueNameCardQuery, task)
}
// 单例模式实现
var (
queueService QueueService
queueServiceOnce sync.Once
)
// GetQueueService 获取队列服务单例
func GetQueueService() QueueService {
queueServiceOnce.Do(func() {
queueService = NewQueueService(cache.GetRedisClient())
})
return queueService
}
// StartQueueService 启动队列服务
func StartQueueService(ctx context.Context) error {
return GetQueueService().Start(ctx)
}

View File

@@ -0,0 +1,28 @@
// Package queue 提供队列服务初始化
package queue
import (
"context"
"gateway/internal/otelTrace"
)
// InitQueueService 初始化队列服务
func InitQueueService() {
// 获取队列管理器
manager := GetQueueManager()
if manager == nil {
otelTrace.Logger.WithContext(context.Background()).Error("队列管理器未初始化")
return
}
// 创建队列服务实例
service := &DefaultQueueService{
manager: manager,
logger: otelTrace.Logger.WithContext(context.Background()),
}
// 设置队列服务
queueService = service
otelTrace.Logger.WithContext(context.Background()).Info("队列服务初始化成功")
}

File diff suppressed because it is too large Load Diff

20
internal/utils/wait.go Normal file
View File

@@ -0,0 +1,20 @@
// Package utils 提供工具函数
package utils
import (
"time"
)
// WaitUntil 等待直到条件满足或超时
func WaitUntil(condition func() bool, interval time.Duration, timeout time.Duration) bool {
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
if condition() {
return true
}
time.Sleep(interval)
}
return condition()
}

View File

@@ -11,6 +11,7 @@ import (
_ "gateway/internal/service/message"
"gateway/internal/service/notify"
_ "gateway/internal/service/supplier/third_party"
"gateway/internal/service/supplier/third_party/queue"
"log"
_ "net/http/pprof"
@@ -38,6 +39,10 @@ func main() {
// _ = cleanup3(otelTrace.InitCtx)
// }
// }()
// 初始化队列系统
queue.Init(otelTrace.InitCtx)
go notify.CreateOrderNotifyConsumer(otelTrace.InitCtx)
go query.CreateSupplierOrderQueryCuConsumer(otelTrace.InitCtx)
// go query.CreatePayForQueryConsumer(otelTrace.InitCtx)