新增 Redis 支持,优化分布式锁实现,调整配置管理,提升代码可读性和一致性。同时,更新支付逻辑,增强错误处理,确保在订单处理过程中更稳定。

This commit is contained in:
danial
2025-05-29 16:35:06 +08:00
parent e1878233e0
commit 2d85c0e612
19 changed files with 999 additions and 178 deletions

View File

@@ -15,7 +15,6 @@ level =7
#日志保存路径
filepath= ./logs/jhmerchant.log
#需要显示的日志信息
#separate="["emergency", "alert", "critical", "error", "warning", "notice", "info", "debug"]"
separate="["emergency","alert","critical","error","warning","notice","info","debug"]"
#日志保存最大天数
maxdays=10
@@ -28,6 +27,12 @@ dbpasswd = 123456
dbbase = kami
debug = true
[redis]
host = 127.0.0.1
port = 6379
db = 0
password = 123456
[mq]
host = 127.0.0.1
port = 61613

2
go.mod
View File

@@ -19,6 +19,7 @@ require (
github.com/google/uuid v1.6.0
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826
github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/redis/go-redis/v9 v9.8.0
github.com/rs/xid v1.6.0
github.com/shopspring/decimal v1.4.0
github.com/stretchr/testify v1.10.0
@@ -49,6 +50,7 @@ require (
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cloudwego/base64x v0.1.5 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect

8
go.sum
View File

@@ -10,6 +10,10 @@ github.com/beego/beego/v2 v2.3.4 h1:HurQEOGIEhLlPFCTR6ZDuQkybrUl2Ag2i6CdVD2rGiI=
github.com/beego/beego/v2 v2.3.4/go.mod h1:5cqHsOHJIxkq44tBpRvtDe59GuVRVv/9/tyVDxd5ce4=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/bytedance/gopkg v0.1.2-0.20240828084325-780ca9ee70fb h1:glte+Ka6C5efXn/QlEAE/wwNrvE+3mYo/ce69fpvtrE=
github.com/bytedance/gopkg v0.1.2-0.20240828084325-780ca9ee70fb/go.mod h1:576VvJ+eJgyCzdjS+c4+77QF3p7ubbtiKARP3TxducM=
github.com/bytedance/sonic v1.13.2 h1:8/H1FempDZqC4VqjptGo14QQlJx8VdZJegxs6wwfqpQ=
@@ -30,6 +34,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/duke-git/lancet/v2 v2.3.5 h1:vb49UWkkdyu2eewilZbl0L3X3T133znSQG0FaeJIBMg=
github.com/duke-git/lancet/v2 v2.3.5/go.mod h1:zGa2R4xswg6EG9I6WnyubDbFO/+A/RROxIbXcwryTsc=
github.com/elazarl/go-bindata-assetfs v1.0.1 h1:m0kkaHRKEu7tUIUFVwhGGGYClXvyl4RE03qmvRTNfbw=
@@ -100,6 +106,8 @@ github.com/prometheus/common v0.62.0 h1:xasJaQlnWAeyHdUBeGjXmutelfJHWMRr+Fg4QszZ
github.com/prometheus/common v0.62.0/go.mod h1:vyBcEuLSvWos9B1+CyL7JZ2up+uFzXhkqml0W5zIY1I=
github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
github.com/redis/go-redis/v9 v9.8.0 h1:q3nRvjrlge/6UD7eTu/DSg2uYiU2mCL0G/uzBWqhicI=
github.com/redis/go-redis/v9 v9.8.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw=
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
github.com/rs/xid v1.6.0 h1:fV591PaemRlL6JfRxGDEPl69wICngIQ3shQtzfy2gxU=

77
internal/cache/lock.go vendored Normal file
View File

@@ -0,0 +1,77 @@
package cache
import (
"fmt"
"time"
)
// Lock 分布式锁结构
type Lock struct {
client *RedisClient
key string
value string
expiration time.Duration
}
// NewLock 创建新的分布式锁
func (r *RedisClient) NewLock(key string, expiration time.Duration) *Lock {
return &Lock{
client: r,
key: fmt.Sprintf("lock:%s", key),
value: fmt.Sprintf("%d", time.Now().UnixNano()),
expiration: expiration,
}
}
// TryLock 尝试获取锁
func (l *Lock) TryLock() (bool, error) {
success, err := l.client.client.SetNX(l.client.ctx, l.key, l.value, l.expiration).Result()
if err != nil {
return false, fmt.Errorf("try lock failed: %v", err)
}
return success, nil
}
// Lock 获取锁(带重试)
func (l *Lock) Lock(timeout time.Duration) error {
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
success, err := l.TryLock()
if err != nil {
return err
}
if success {
return nil
}
time.Sleep(100 * time.Millisecond)
}
return fmt.Errorf("lock timeout")
}
// Unlock 释放锁
func (l *Lock) Unlock() error {
script := `
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
`
result, err := l.client.client.Eval(l.client.ctx, script, []string{l.key}, l.value).Result()
if err != nil {
return fmt.Errorf("unlock failed: %v", err)
}
if result.(int64) == 0 {
return fmt.Errorf("lock not held")
}
return nil
}
// AutoLock 自动获取和释放锁
func (l *Lock) AutoLock(timeout time.Duration, fn func() error) error {
if err := l.Lock(timeout); err != nil {
return err
}
defer l.Unlock()
return fn()
}

72
internal/cache/options.go vendored Normal file
View File

@@ -0,0 +1,72 @@
package cache
import (
"time"
"github.com/redis/go-redis/v9"
)
// Options Redis 配置选项
type Options struct {
// 基本配置
Addr string
Password string
DB int
// 连接池配置
PoolSize int
MinIdleConns int
MaxConnAge time.Duration
PoolTimeout time.Duration
IdleTimeout time.Duration
IdleCheckFrequency time.Duration
// 重试配置
MaxRetries int
MinRetryBackoff time.Duration
MaxRetryBackoff time.Duration
// 超时配置
DialTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
}
// DefaultOptions 返回默认配置
func DefaultOptions() *Options {
return &Options{
Addr: "localhost:6379",
Password: "",
DB: 0,
PoolSize: 10,
MinIdleConns: 5,
MaxConnAge: 30 * time.Minute,
PoolTimeout: 4 * time.Second,
IdleTimeout: 5 * time.Minute,
IdleCheckFrequency: 1 * time.Minute,
MaxRetries: 3,
MinRetryBackoff: 8 * time.Millisecond,
MaxRetryBackoff: 512 * time.Millisecond,
DialTimeout: 5 * time.Second,
ReadTimeout: 3 * time.Second,
WriteTimeout: 3 * time.Second,
}
}
// ToRedisOptions 转换为 go-redis 选项
func (o *Options) ToRedisOptions() *redis.Options {
return &redis.Options{
Addr: o.Addr,
Password: o.Password,
DB: o.DB,
PoolSize: o.PoolSize,
MinIdleConns: o.MinIdleConns,
PoolTimeout: o.PoolTimeout,
MaxRetries: o.MaxRetries,
MinRetryBackoff: o.MinRetryBackoff,
MaxRetryBackoff: o.MaxRetryBackoff,
DialTimeout: o.DialTimeout,
ReadTimeout: o.ReadTimeout,
WriteTimeout: o.WriteTimeout,
}
}

299
internal/cache/redis.go vendored Normal file
View File

@@ -0,0 +1,299 @@
package cache
import (
"context"
"encoding/json"
"fmt"
"gateway/internal/config"
"gateway/internal/otelTrace"
"sync"
"time"
"github.com/redis/go-redis/v9"
"go.uber.org/zap"
)
var redisClientInstance *RedisClient
func init() {
sync.OnceFunc(func() {
redisConfig := config.GetConfig().GetRedisConfig()
var err error
redisClientInstance, err = NewRedisClient(redisConfig.Host, redisConfig.Password, redisConfig.DB)
if err != nil {
otelTrace.Logger.WithContext(context.Background()).Error("redis 连接失败", zap.Error(err))
return
}
})
}
func GetRedisClient() *RedisClient {
return redisClientInstance
}
// RedisClient Redis 客户端封装
type RedisClient struct {
client *redis.Client
ctx context.Context
}
// NewRedisClient 创建新的 Redis 客户端实例
func NewRedisClient(addr, password string, db int) (*RedisClient, error) {
client := redis.NewClient(&redis.Options{
Addr: addr,
Password: password,
DB: db,
})
ctx := context.Background()
// 测试连接
if err := client.Ping(ctx).Err(); err != nil {
return nil, fmt.Errorf("redis connection failed: %v", err)
}
return &RedisClient{
client: client,
ctx: ctx,
}, nil
}
// Set 设置键值对(支持任意类型,自动序列化)
func (r *RedisClient) Set(key string, value interface{}, expiration time.Duration) error {
bytes, err := json.Marshal(value)
if err != nil {
return fmt.Errorf("marshal value failed: %v", err)
}
return r.client.Set(r.ctx, key, bytes, expiration).Err()
}
// Get 获取键值(自动反序列化到 value 指针)
func (r *RedisClient) Get(key string, value interface{}) error {
bytes, err := r.client.Get(r.ctx, key).Bytes()
if err != nil {
return err
}
return json.Unmarshal(bytes, value)
}
// Delete 删除键
func (r *RedisClient) Delete(key string) error {
return r.client.Del(r.ctx, key).Err()
}
// GetSize 获取键值数量
func (r *RedisClient) GetSize(key string) (int64, error) {
return r.client.DBSize(r.ctx).Result()
}
// 获取以 prefix 为前缀的键值数量
func (r *RedisClient) GetSizeByPrefix(prefix string) (int64, error) {
keys, err := r.client.Keys(r.ctx, prefix+"*").Result()
if err != nil {
return 0, err
}
return int64(len(keys)), nil
}
// Exists 检查键是否存在
func (r *RedisClient) Exists(key string) (bool, error) {
n, err := r.client.Exists(r.ctx, key).Result()
return n > 0, err
}
// Expire 设置过期时间
func (r *RedisClient) Expire(key string, expiration time.Duration) error {
return r.client.Expire(r.ctx, key, expiration).Err()
}
// Close 关闭连接
func (r *RedisClient) Close() error {
return r.client.Close()
}
// Pipeline 执行管道操作
func (r *RedisClient) Pipeline() redis.Pipeliner {
return r.client.Pipeline()
}
// TxPipeline 执行事务管道操作
func (r *RedisClient) TxPipeline() redis.Pipeliner {
return r.client.TxPipeline()
}
// Publish 发布消息到频道
func (r *RedisClient) Publish(channel string, message any) error {
msg, err := json.Marshal(message)
if err != nil {
return fmt.Errorf("marshal publish message failed: %v", err)
}
return r.client.Publish(r.ctx, channel, msg).Err()
}
// Subscribe 订阅频道,返回 go-redis 的 PubSub 对象
func (r *RedisClient) Subscribe(channel string) *redis.PubSub {
return r.client.Subscribe(r.ctx, channel)
}
// SubscribeHandler 订阅并处理消息(阻塞式)
func (r *RedisClient) SubscribeHandler(channel string, handler func(msg string)) error {
pubsub := r.client.Subscribe(r.ctx, channel)
ch := pubsub.Channel()
for m := range ch {
handler(m.Payload)
}
return nil
}
// ================= Redis List 操作 =================
// LPush 支持结构体、bool等类型
func (r *RedisClient) LPush(key string, values ...interface{}) error {
var serialized []interface{}
for _, v := range values {
b, err := json.Marshal(v)
if err != nil {
return err
}
serialized = append(serialized, b)
}
return r.client.LPush(r.ctx, key, serialized...).Err()
}
// RPush 支持结构体、bool等类型
func (r *RedisClient) RPush(key string, values ...interface{}) error {
var serialized []interface{}
for _, v := range values {
b, err := json.Marshal(v)
if err != nil {
return err
}
serialized = append(serialized, b)
}
return r.client.RPush(r.ctx, key, serialized...).Err()
}
// LPop 自动反序列化到 value 指针
func (r *RedisClient) LPopUnmarshal(key string, value interface{}) error {
res, err := r.client.LPop(r.ctx, key).Bytes()
if err != nil {
return err
}
return json.Unmarshal(res, value)
}
// RPop 自动反序列化到 value 指针
func (r *RedisClient) RPopUnmarshal(key string, value interface{}) error {
res, err := r.client.RPop(r.ctx, key).Bytes()
if err != nil {
return err
}
return json.Unmarshal(res, value)
}
// LRange 获取列表区间元素
func (r *RedisClient) LRange(key string, start, stop int64) ([]string, error) {
return r.client.LRange(r.ctx, key, start, stop).Result()
}
// LLen 获取列表长度
func (r *RedisClient) LLen(key string) (int64, error) {
return r.client.LLen(r.ctx, key).Result()
}
// LRem 移除列表中指定值的元素
func (r *RedisClient) LRem(key string, count int64, value interface{}) (int64, error) {
return r.client.LRem(r.ctx, key, count, value).Result()
}
// LSet 设置列表指定下标的元素
func (r *RedisClient) LSet(key string, index int64, value interface{}) error {
return r.client.LSet(r.ctx, key, index, value).Err()
}
// LTrim 截取列表
func (r *RedisClient) LTrim(key string, start, stop int64) error {
return r.client.LTrim(r.ctx, key, start, stop).Err()
}
// BLPop 阻塞式左弹出,支持超时
func (r *RedisClient) BLPop(timeout time.Duration, key string) (string, error) {
res, err := r.client.BLPop(r.ctx, timeout, key).Result()
if err != nil || len(res) < 2 {
return "", err
}
return res[1], nil
}
// BRPop 阻塞式右弹出,支持超时
func (r *RedisClient) BRPop(timeout time.Duration, key string) (string, error) {
res, err := r.client.BRPop(r.ctx, timeout, key).Result()
if err != nil || len(res) < 2 {
return "", err
}
return res[1], nil
}
// ================= Redis Stream 操作 =================
// XAdd 支持结构体、bool等类型自动序列化
func (r *RedisClient) XAdd(key string, values map[string]interface{}) (string, error) {
serialized := make(map[string]interface{}, len(values))
for k, v := range values {
b, err := json.Marshal(v)
if err != nil {
return "", err
}
serialized[k] = string(b)
}
args := &redis.XAddArgs{
Stream: key,
Values: serialized,
}
return r.client.XAdd(r.ctx, args).Result()
}
// XDel 删除 Stream 中的消息
func (r *RedisClient) XDel(key string, ids ...string) (int64, error) {
return r.client.XDel(r.ctx, key, ids...).Result()
}
// XRead 自动反序列化到 map[string]interface{}
func (r *RedisClient) XReadUnmarshal(key, start string, count int64, block time.Duration) ([]map[string]interface{}, error) {
args := &redis.XReadArgs{
Streams: []string{key, start},
Count: count,
Block: block,
}
res, err := r.client.XRead(r.ctx, args).Result()
if err != nil || len(res) == 0 {
return nil, err
}
var result []map[string]interface{}
for _, msg := range res[0].Messages {
m := make(map[string]interface{})
for k, v := range msg.Values {
var val interface{}
_ = json.Unmarshal([]byte(v.(string)), &val)
m[k] = val
}
result = append(result, m)
}
return result, nil
}
// XRange 区间查询 Stream
func (r *RedisClient) XRange(key, start, end string, count int64) ([]redis.XMessage, error) {
msgs, err := r.client.XRangeN(r.ctx, key, start, end, count).Result()
return msgs, err
}
// XLen 获取 Stream 长度
func (r *RedisClient) XLen(key string) (int64, error) {
return r.client.XLen(r.ctx, key).Result()
}
// XTrim 按最大长度裁剪 Stream
func (r *RedisClient) XTrim(key string, maxLen int64) (int64, error) {
return r.client.XTrimMaxLen(r.ctx, key, maxLen).Result()
}

View File

@@ -118,6 +118,22 @@ func (c *Config) GetSixFatNotifyUrl() string {
return web.AppConfig.DefaultString("sixFat::notify_url", "http://127.0.0.1:12309/sixFat/notify")
}
type RedisConfig struct {
Host string
Port string
Password string
DB int
}
func (c *Config) GetRedisConfig() RedisConfig {
return RedisConfig{
Host: web.AppConfig.DefaultString("redis::host", "127.0.0.1"),
Port: web.AppConfig.DefaultString("redis::port", "6379"),
Password: web.AppConfig.DefaultString("redis::password", "123456"),
DB: web.AppConfig.DefaultInt("redis::db", 0),
}
}
var config = new(Config)
func GetConfig() *Config {

View File

@@ -7,6 +7,7 @@ import (
"time"
"github.com/beego/beego/v2/client/orm"
"github.com/shopspring/decimal"
"go.uber.org/zap"
)
@@ -18,6 +19,7 @@ type AccountHistoryInfo struct {
OrderId string
Amount float64
Balance float64
FeeAmount decimal.Decimal
CreateTime time.Time
UpdateTime time.Time
}

View File

@@ -71,6 +71,18 @@ func GetRoadInfoByProductCode(ctx context.Context, productCode string) RoadInfo
return roadInfo
}
func GetListByProductCode(ctx context.Context, productUid string) []RoadInfo {
o := orm.NewOrm()
var roadInfoList []RoadInfo
err := o.QueryTable(ROAD_INFO).Exclude("status", "delete").
Filter("product_uid", productUid).
OneWithCtx(ctx, &roadInfoList)
if err != nil {
otelTrace.Logger.WithContext(ctx).Error("get road info by road uid fail: ", zap.Error(err))
}
return roadInfoList
}
func GetRoadInfosByRoadUids(ctx context.Context, roadUids []string) []RoadInfo {
o := orm.NewOrm()
var roadInfoList []RoadInfo

View File

@@ -170,12 +170,14 @@ func CreateOrderNotifyConsumer(ctx context.Context) {
otelTrace.Logger.WithContext(ctx).Error("启动消息队列消费者失败....")
os.Exit(1)
}
logs.Notice("订单回调消息队列启动成功......")
orderNotify, err := conn.Subscribe(config.MqOrderNotify, stomp.AckClient)
if err != nil {
otelTrace.Logger.WithContext(ctx).Error("订阅订单回调失败......")
os.Exit(1)
}
for v := range orderNotify.C {
if v != nil {
bankOrderId := string(v.Body)

View File

@@ -21,6 +21,7 @@ import (
"github.com/duke-git/lancet/v2/convertor"
"github.com/duke-git/lancet/v2/pointer"
"go.uber.org/zap"
"github.com/beego/beego/v2/core/logs"
"github.com/shopspring/decimal"
@@ -188,9 +189,11 @@ func GenerateOrderProfit(ctx context.Context, orderInfo order.OrderInfo, c *resp
supplierProfit := decimal.NewFromFloat(orderInfo.OrderAmount).
Div(decimal.NewFromInt(100)).
Mul(decimal.NewFromFloat(c.RoadInfo.BasicFee))
platformProfit := decimal.NewFromFloat(orderInfo.OrderAmount).
Div(decimal.NewFromInt(100)).
Mul(decimal.NewFromFloat(c.PlatformRate))
agentProfit := decimal.NewFromFloat(orderInfo.OrderAmount).
Div(decimal.NewFromInt(100)).
Mul(decimal.NewFromFloat(c.AgentRate))
@@ -303,47 +306,56 @@ func CreateOrderProfitInfo(ctx context.Context, createdOrder request.CreatedOrde
func GenerateRecord(ctx context.Context, c *response.PayBaseResp) (order.OrderInfo, order.OrderProfitInfo, error) {
// 获取订单和订单利润表
orderInfo := order.GetOrderByMerchantOrderId(ctx, convertor.ToString(c.Params["orderNo"]))
orderProfitInfo := order.GetOrderProfitByMerchantOrderId(ctx, convertor.ToString(c.Params["orderNo"]))
if orderProfitInfo.Id == 0 || orderInfo.MerchantOrderId == "" {
//生成订单记录,订单利润利润
//生成订单记录,订单利润
orderInfo = GenerateOrderInfo(c)
orderProfit := GenerateOrderProfit(ctx, orderInfo, c)
if c.Code == -1 {
return orderInfo, orderProfit, errors.New("订单数据插入失败")
}
//插入订单记录和订单利润记录
if !order.InsertOrderAndOrderProfit(ctx, orderInfo, orderProfit) {
c.Code = -1
return orderInfo, orderProfit, errors.New("订单数据插入失败")
}
orderInfo = order.GetOrderByMerchantOrderId(ctx, convertor.ToString(c.Params["orderNo"]))
orderProfitInfo = order.GetOrderProfitByBankOrderId(ctx, orderProfitInfo.BankOrderId)
otelTrace.Logger.WithContext(ctx).Info("插入支付订单记录和支付利润记录成功")
return orderInfo, orderProfit, nil
}
if orderInfo.Status != config.Created {
err := errors.New("当前订单已存在,请等待处理结果或手动查询")
return orderInfo, orderProfitInfo, err
}
if !order.InsertOrderExValue(ctx, convertor.ToString(c.Params["orderNo"]), convertor.ToString(c.Params["exValue"])) {
err := errors.New("订单数据插入失败")
return orderInfo, orderProfitInfo, err
}
if !order.InsertClientIP(ctx, convertor.ToString(c.Params["orderNo"]), c.ClientIP) {
err := errors.New("订单数据插入失败")
return orderInfo, orderProfitInfo, err
}
if !order.InsertPayTime(ctx, convertor.ToString(c.Params["orderNo"])) {
err := errors.New("订单数据插入失败")
return orderInfo, orderProfitInfo, err
}
if !order.SwitchOrderAndOrderProfitStatus(ctx, convertor.ToString(c.Params["orderNo"]), config.WAIT) {
err := errors.New("订单状态转换失败")
return orderInfo, orderProfitInfo, err
}
orderInfo = order.GetOrderByMerchantOrderId(ctx, convertor.ToString(c.Params["orderNo"]))
orderProfitInfo = order.GetOrderProfitByBankOrderId(ctx, orderProfitInfo.BankOrderId)
otelTrace.Logger.WithContext(ctx).Info("插入支付订单记录和支付利润记录成功")
otelTrace.Logger.WithContext(ctx).Info("插入支付订单记录和支付利润记录成功", zap.Any("orderInfo", orderInfo), zap.Any("orderProfitInfo", orderProfitInfo))
return orderInfo, orderProfitInfo, nil
}

View File

@@ -22,6 +22,7 @@ import (
"github.com/bytedance/gopkg/util/gopool"
"github.com/duke-git/lancet/v2/convertor"
"github.com/duke-git/lancet/v2/pointer"
"github.com/shopspring/decimal"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
@@ -41,6 +42,7 @@ func SolvePaySuccess(ctx context.Context, bankOrderId string, factAmount float64
attribute.String("cardReturnData", cardReturnData),
))
defer cancel()
o := orm.NewOrm()
err := o.DoTxWithCtx(ctx, func(ctx context.Context, txOrm orm.TxOrmer) error {
// 查找订单
@@ -50,6 +52,7 @@ func SolvePaySuccess(ctx context.Context, bankOrderId string, factAmount float64
otelTrace.Logger.WithContext(ctx).Error("不存在该订单或者select for update出错")
return err
}
if orderInfo.Status == config.SUCCESS {
otelTrace.Logger.WithContext(ctx).Error("该订单已经处理,订单号=", zap.String("bankOrderId", bankOrderId))
return fmt.Errorf("该订单已经处理,订单号= %s", bankOrderId)
@@ -72,6 +75,7 @@ func SolvePaySuccess(ctx context.Context, bankOrderId string, factAmount float64
}
orderInfo.CardReturnData += ",偷卡成功"
}
orderInfo.FactAmount = factAmount
orderInfo.Status = config.SUCCESS
orderInfo.BankTransId = trxNo
@@ -94,10 +98,12 @@ func SolvePaySuccess(ctx context.Context, bankOrderId string, factAmount float64
otelTrace.Logger.WithContext(ctx).Error("solve pay success, get orderProfit fail, bankOrderId = ", zap.String("bankOrderId", bankOrderId))
return fmt.Errorf("solve pay success, get orderProfit fail, bankOrderId = %s", bankOrderId)
}
if isStealCard {
return nil
}
orderSettleInfo := order.OrderSettleInfo{
if _, err := txOrm.InsertWithCtx(ctx, order.OrderSettleInfo{
PayTypeCode: orderInfo.PayTypeCode,
PayProductCode: orderInfo.PayProductCode,
RoadUid: orderInfo.RoadUid,
@@ -112,46 +118,48 @@ func SolvePaySuccess(ctx context.Context, bankOrderId string, factAmount float64
IsCompleteSettle: config.NO,
CreateTime: time.Now(),
UpdateTime: time.Now(),
}
if _, err := txOrm.InsertWithCtx(ctx, &orderSettleInfo); err != nil {
}); err != nil {
otelTrace.Logger.WithContext(ctx).Error(
fmt.Sprintf("solve pay successinsert order settle info fail: %s, bankOrderId = %s",
err, bankOrderId),
)
return err
}
// 做账户的加款操作,最重要的一步
var accountInfo accounts.AccountInfo
if err := txOrm.RawWithCtx(ctx, "select * from account_info where account_uid = ? for update", orderInfo.MerchantUid).QueryRow(&accountInfo); err != nil || accountInfo.AccountUid == "" {
otelTrace.Logger.WithContext(ctx).Error(fmt.Sprintf("solve pay success, raw account info fail: %s, bankOrderId = %s", err, bankOrderId))
return err
}
if _, err := txOrm.QueryTable(accounts.ACCOUNT_INFO).Filter("account_uid", orderInfo.MerchantUid).
UpdateWithCtx(ctx, orm.Params{
"balance": factAmount + accountInfo.Balance,
"wait_amount": factAmount + accountInfo.WaitAmount,
"balance": factAmount + accountInfo.Balance - orderProfitInfo.AllProfit,
"wait_amount": factAmount + accountInfo.WaitAmount - orderProfitInfo.AllProfit,
}); err != nil {
otelTrace.Logger.WithContext(ctx).Error(fmt.Sprintf("solve pay success, update account info fail: %s, bankOrderId = %s", err, bankOrderId))
return err
}
// 添加一条动账记录
accountHistory := accounts.AccountHistoryInfo{
if _, err := txOrm.InsertWithCtx(ctx, &accounts.AccountHistoryInfo{
AccountUid: orderInfo.MerchantUid,
AccountName: orderInfo.MerchantName,
Type: config.PLUS_AMOUNT,
OrderId: orderInfo.MerchantOrderId,
Amount: factAmount,
Balance: factAmount + accountInfo.Balance,
Balance: factAmount + accountInfo.Balance - orderProfitInfo.AllProfit,
FeeAmount: decimal.NewFromFloat(orderProfitInfo.AllProfit),
CreateTime: time.Now(),
UpdateTime: time.Now(),
}
if _, err := txOrm.InsertWithCtx(ctx, &accountHistory); err != nil {
}); err != nil {
otelTrace.Logger.WithContext(ctx).Error(fmt.Sprintf("solve pay successinsert account history fail%s, bankOrderId = %s", err, bankOrderId))
return err
}
// 更新通道信息
roadInfo := road.GetRoadInfoByRoadUid(ctx, orderInfo.RoadUid)
roadInfo.RequestSuccess += 1
roadInfo.TodayRequestSuccess += 1 // 今日成功
roadInfo.TotalIncome += orderInfo.FactAmount
@@ -163,6 +171,7 @@ func SolvePaySuccess(ctx context.Context, bankOrderId string, factAmount float64
otelTrace.Logger.WithContext(ctx).Error(fmt.Sprintf("solve pay success, update road info fail: %s, bankOrderId = %s", err, bankOrderId))
return err
}
// 更新订单利润表
orderProfitInfo.Status = config.SUCCESS
orderProfitInfo.UpdateTime = time.Now()
@@ -170,9 +179,9 @@ func SolvePaySuccess(ctx context.Context, bankOrderId string, factAmount float64
otelTrace.Logger.WithContext(ctx).Error(fmt.Sprintf("solve pay success, update order profit info fail: %s, bankOrderId = %s", err, bankOrderId))
return err
}
// 给下游发送回调通知
return nil
})
orderNotify.Go(func() {
// 创建一个5分钟超时的上下文
ctx2, span := otelTrace.NewSchedulerTrace().Start(context.Background(), "SolvePaySuccess", trace.WithNewRoot())
@@ -195,7 +204,6 @@ func SolvePayFail(ctx context.Context, bankOrderId, transId string, cardReturnDa
attribute.String("transId", transId),
attribute.String("cardReturnData", cardReturnData),
))
defer cancel()
o := orm.NewOrm()
@@ -398,7 +406,7 @@ func SolveRefund(ctx context.Context, bankOrderId string) bool {
account.UpdateTime = time.Now()
account.SettleAmount = account.SettleAmount - orderProfitInfo.UserInAmount
account.Balance = account.Balance - orderProfitInfo.UserInAmount
account.Balance = account.Balance - orderProfitInfo.UserInAmount - orderProfitInfo.AllProfit
if orderInfo.Freeze == config.YES {
account.FreezeAmount = account.FreezeAmount - orderProfitInfo.UserInAmount

View File

@@ -29,6 +29,7 @@ func OrderSettle(ctx context.Context) {
params["is_allow_settle"] = config.YES
params["is_complete_settle"] = config.NO
orderSettleList := order.GetOrderSettleListByParams(ctx, params)
for _, orderSettle := range orderSettleList {
orderProfitInfo := order.GetOrderProfitByBankOrderId(ctx, orderSettle.BankOrderId)
if !settle(ctx, orderSettle, orderProfitInfo) {
@@ -43,6 +44,7 @@ func settle(ctx context.Context, orderSettle order.OrderSettleInfo, orderProfit
o := orm.NewOrm()
if err := o.DoTxWithCtx(ctx, func(ctx context.Context, txOrm orm.TxOrmer) error {
tmpSettle := new(order.OrderSettleInfo)
if err := txOrm.RawWithCtx(ctx, "select * from order_settle_info where bank_order_id=? for update", orderSettle.BankOrderId).QueryRow(tmpSettle); err != nil || tmpSettle.BankOrderId == "" {
otelTrace.Logger.WithContext(ctx).Error(fmt.Sprintf("获取tmpSettle失败bankOrderId=%s", orderSettle.BankOrderId), zap.String("BankOrderId", orderSettle.BankOrderId))
@@ -62,6 +64,7 @@ func settle(ctx context.Context, orderSettle order.OrderSettleInfo, orderProfit
otelTrace.Logger.WithContext(ctx).Error("结算select account info失败错误信息", zap.Error(err))
return err
}
accountInfo.UpdateTime = time.Now()
// 商户有押款操作
@@ -72,7 +75,9 @@ func settle(ctx context.Context, orderSettle order.OrderSettleInfo, orderProfit
orderSettle.RoadUid,
)
if merchantDeployInfo.IsLoan == config.YES {
loadAmount = merchantDeployInfo.LoanRate * 0.01 * orderProfit.FactAmount
date := utils.GetDate()
params := make(map[string]string)
params["merchant_uid"] = tmpSettle.MerchantUid
@@ -115,12 +120,11 @@ func settle(ctx context.Context, orderSettle order.OrderSettleInfo, orderProfit
}
if accountInfo.WaitAmount < orderProfit.UserInAmount {
otelTrace.Logger.WithContext(ctx).Error("系统出现严重故障,账户的结算金额小于订单结算金额")
return errors.New("系统出现严重故障,账户的结算金额小于订单结算金额, 账户 = " + accountInfo.AccountName + "订单id = " + orderProfit.BankOrderId)
otelTrace.Logger.WithContext(ctx).Error("系统出现严重故障,账户的结算金额小于订单结算金额")
return errors.New("系统出现严重故障,账户的结算金额小于订单结算金额, 账户 = " + accountInfo.AccountName + "订单id = " + orderProfit.BankOrderId)
}
needAmount := orderProfit.UserInAmount - loadAmount
accountInfo.SettleAmount = accountInfo.SettleAmount + needAmount
accountInfo.WaitAmount = accountInfo.WaitAmount - orderProfit.UserInAmount
accountInfo.LoanAmount = accountInfo.LoanAmount + loadAmount

View File

@@ -590,18 +590,8 @@ func submitOrder(ctx context.Context, jsonStr string, orderInfo order.OrderInfo,
req := httplib.NewBeegoRequestWithCtx(ctx, "http://e0s3e4e1a4i0a8a4c2a6.itxitong.net/api/services/app/Api_PayOrder/CreateOrderPay", "POST").
SetTransport(otelhttp.NewTransport(http.DefaultTransport)).
SetTimeout(time.Second*3, time.Second*3).Retries(3).
Header("Accept-Charset", "utf-8").SetProxy(func(req *http.Request) (*url.URL, error) {
proxy, err := utils.OrderBasedProxyStrategyInstance.GetProxy(ctx, utils.ProxyOrderInfo{
OrderNo: orderInfo.BankOrderId,
OrderPerIP: 1,
})
if err != nil {
otelTrace.Logger.WithContext(ctx).Error("获取代理失败", zap.Error(err))
return nil, nil
}
return url.Parse(proxy)
})
SetTimeout(time.Second*10, time.Second*10).Retries(3).
Header("Accept-Charset", "utf-8")
otelTrace.Logger.WithContext(ctx).Info("请求参数:", zap.Any("params", params))
@@ -751,7 +741,6 @@ func (c *FavorableCloudsCardImpl) Scan(ctx context.Context, orderInfo order.Orde
scanData.BankNo = orderInfo.MerchantOrderId
scanData.OrderPrice = strconv.FormatFloat(orderInfo.OrderAmount, 'f', 2, 64)
scanData.ReturnData = str
// c.delayQuery(ctx, orderInfo.BankOrderId, gojson.Json(roadInfo.Params).Get("mid").Tostring(), gojson.Json(roadInfo.Params).Get("key").Tostring())
return scanData
}

View File

@@ -7,6 +7,7 @@ import (
"encoding/base64"
"encoding/json"
"fmt"
"gateway/internal/cache"
"gateway/internal/config"
"gateway/internal/models/merchant"
"gateway/internal/models/order"
@@ -18,13 +19,13 @@ import (
"gateway/internal/service/client"
"gateway/internal/service/supplier"
"gateway/internal/utils"
"net/http"
"net/url"
"regexp"
"slices"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/beego/beego/v2/client/httplib"
@@ -32,23 +33,171 @@ import (
"github.com/bytedance/sonic"
"github.com/duke-git/lancet/v2/convertor"
"github.com/duke-git/lancet/v2/maputil"
"github.com/duke-git/lancet/v2/pointer"
"github.com/duke-git/lancet/v2/slice"
"github.com/widuu/gojson"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)
const (
// 订单池相关常量
initialPoolSize int64 = 10 // 初始订单池大小
minPoolSize = 1 // 最小订单池大小
orderMinInterval = 10 * time.Second // 订单创建和提交最小间隔
orderCheckInterval = 1 * time.Minute // 订单数量检查间隔
orderReduceInterval = 5 * time.Minute // 订单数量减少检查间隔
orderInactiveTime = 30 * time.Minute // 订单池不活跃时间
customerOrderPoolKey = "shanfu:customer_order_pool" // Redis 订单池键名
produceOrderPoolKey = "shanfu:produce_order_pool" // Redis 生产订单池键名
orderBindKey = "shanfu:order_bind" // Redis 订单绑定键名
)
// OrderPoolItem 订单池项目
type OrderPoolItem struct {
OrderID string `json:"order_id"`
CreateTime time.Time `json:"create_time"`
PayURL string `json:"pay_url"`
RoadUid string `json:"bank_code"`
Params string `json:"params"`
}
// OrderStats 订单统计
type OrderStats struct {
LastOrderTime time.Time `json:"last_order_time"`
OrderCount int64 `json:"order_count"`
PoolSize int `json:"pool_size"`
}
// OrderBind 订单绑定关系
type OrderBind struct {
UpstreamOrderID string `json:"upstream_order_id"`
LocalOrderID string `json:"local_order_id"`
}
// SendCardTask 发送卡密任务
type SendCardTask struct {
RoadUid string
CardInfo supplier.RedeemCardInfo
LocalOrderID string
JsonStr string
PayKey string
PaySecret string
}
// ========== Worker池优雅关闭 ==========
type SendCardPool struct {
MaxWorker int
wg sync.WaitGroup
quit chan struct{}
}
func NewSendCardPool(maxWorker int) *SendCardPool {
pool := &SendCardPool{
MaxWorker: maxWorker,
quit: make(chan struct{}),
}
for range maxWorker {
pool.wg.Add(1)
go pool.worker()
}
return pool
}
func (p *SendCardPool) worker() {
defer p.wg.Done()
for {
select {
case <-p.quit:
return
default:
ctx := context.Background()
var task SendCardTask
err := globalInstanceImpl.redisClient.LPopUnmarshal(customerOrderPoolKey, &task)
if err != nil {
otelTrace.Logger.WithContext(ctx).Error("获取订单池消息失败", zap.Error(err))
continue
}
handleSendCardTask(task)
}
}
}
// Close 优雅关闭worker池
func (p *SendCardPool) Close() {
close(p.quit)
p.wg.Wait()
}
// ========== 通道热更新 ==========
// 启动通道热更新定时任务
func (g *globalInstance) StartChannelHotUpdate(ctx context.Context, interval time.Duration) {
go func() {
for {
select {
case <-ctx.Done():
return
case <-time.After(interval):
g.updateChannels(ctx)
}
}
}()
}
// 动态更新通道池
func (g *globalInstance) updateChannels(ctx context.Context) {
currentRoadUids := g.getAllRoadUids()
codeSet := make(map[string]struct{}, len(currentRoadUids))
for _, roadUid := range currentRoadUids {
codeSet[roadUid] = struct{}{}
// 新增通道
if _, ok := globalInstanceImpl.poolLocks[roadUid]; !ok {
globalInstanceImpl.poolLocks[roadUid] = &sync.Mutex{}
go globalInstanceImpl.initOrderPool(ctx, roadUid)
}
}
// 删除已不存在的通道
for roadUid := range globalInstanceImpl.poolLocks {
if _, ok := codeSet[roadUid]; !ok {
delete(globalInstanceImpl.poolLocks, roadUid)
// 清理 Redis 订单池
key := fmt.Sprintf("%s:%s", produceOrderPoolKey, roadUid)
_ = globalInstanceImpl.redisClient.Delete(key)
}
}
}
// ShanFuImpl 闪付实现
type ShanFuImpl struct {
web.Controller
}
type globalInstance struct {
redisClient *cache.RedisClient
// 多通道订单池管理
poolLocks map[string]*sync.Mutex
}
var globalInstanceImpl *globalInstance
func init() {
sync.OnceFunc(func() {
globalInstanceImpl = &globalInstance{
poolLocks: make(map[string]*sync.Mutex),
redisClient: cache.GetRedisClient(),
}
globalInstanceImpl.StartChannelHotUpdate(context.Background(), 10*time.Second)
})
}
// HasDependencyHTML 是否有单独的支付页面
func (c *ShanFuImpl) HasDependencyHTML() bool {
return false
}
// 生成md5加密
func (c *ShanFuImpl) generateSign(ctx context.Context, params map[string]any, key string) string {
func (g *globalInstance) generateSign(ctx context.Context, params map[string]any, key string) string {
keys := maputil.Keys(params)
sort.Strings(keys)
sign_ := ""
@@ -62,127 +211,73 @@ func (c *ShanFuImpl) generateSign(ctx context.Context, params map[string]any, ke
return strings.ToUpper(utils.EncodeMd5Str(sign_))
}
func (c *ShanFuImpl) SendCard(ctx context.Context, jsonStr string, cardInfo supplier.RedeemCardInfo, attach string) (bool, string) {
cfg := new(config.Config)
params := map[string]any{
"pay_productname": gojson.Json(jsonStr).Get("pay_productname").Tostring(),
"pay_memberid": gojson.Json(jsonStr).Get("pay_memberid").Tostring(),
"pay_orderid": attach,
"pay_bankcode": gojson.Json(jsonStr).Get("pay_bankcode").Tostring(),
"pay_amount": cardInfo.FaceType,
"pay_notifyurl": fmt.Sprintf("%s%s", cfg.GatewayAddr(), "/shanfu/notify"),
"pay_callbackurl": "",
"pay_ip": utils.GenerateIpv4(),
"pay_applydate": time.Now().Format("2006-01-02 15:04:05"),
// SendCard 异步投递任务到线程池
func (c *ShanFuImpl) SendCard(ctx context.Context, jsonStr string, cardInfo supplier.RedeemCardInfo, attach string, merchantInfo merchant.MerchantInfo, roadInfo road.RoadInfo) (bool, string) {
task := SendCardTask{
RoadUid: roadInfo.RoadUid,
CardInfo: cardInfo,
LocalOrderID: attach,
JsonStr: jsonStr,
PayKey: merchantInfo.MerchantKey,
PaySecret: merchantInfo.MerchantSecret,
}
params["pay_md5sign"] = c.generateSign(ctx, params, gojson.Json(jsonStr).Get("key").Tostring())
req := httplib.NewBeegoRequestWithCtx(ctx, "http://ubna.epay.sgbyzf.com/Pay_Index.html", "POST").
SetTimeout(time.Second*30, time.Second*30).Retries(3).RetryDelay(time.Second * 3).
SetProxy(func(req *http.Request) (*url.URL, error) {
proxy, err := utils.OrderBasedProxyStrategyInstance.GetProxy(ctx, utils.ProxyOrderInfo{
OrderNo: attach,
OrderPerIP: 1,
})
if err != nil {
otelTrace.Logger.WithContext(ctx).Error("闪付获取代理失败", zap.Error(err), zap.Any("params", params))
return nil, nil
}
return url.Parse(proxy)
})
otelTrace.Logger.WithContext(ctx).Info("请求参数:", zap.Any("params", params))
for key, value := range params {
req.Param(key, convertor.ToString(value))
}
response, err := req.String()
err := globalInstanceImpl.redisClient.RPush(customerOrderPoolKey, task)
if err != nil {
otelTrace.Logger.WithContext(ctx).Error("闪付 请求失败", zap.Error(err))
return false, "上游数据发送失败"
otelTrace.Logger.WithContext(ctx).Error("添加订单失败", zap.Error(err))
return false, "添加订单失败"
}
resp, _ := unescapeUnicode([]byte(response))
type AutoGenerated struct {
Msg string `json:"msg"`
Status string `json:"status"`
OrderId string `json:"order_id"`
MchOrderId string `json:"mch_order_id"`
PayUrl string `json:"pay_url"`
H5Url string `json:"h5_url"`
SdkUrl string `json:"sdk_url"`
}
resData := AutoGenerated{}
err = json.Unmarshal([]byte(response), &resData)
otelTrace.Logger.WithContext(ctx).Info("远端请求返回数据:", zap.Any("response", string(resp)), zap.Any("解析结果", resData))
if err != nil {
otelTrace.Logger.WithContext(ctx).Error("json解析失败", zap.Error(err), zap.String("response", response))
return false, "内部数据处理失败"
}
if resData.Status != "1" {
return false, resData.Msg
}
req = httplib.NewBeegoRequestWithCtx(ctx, resData.PayUrl, "POST").
SetTimeout(time.Second*30, time.Second*30).RetryDelay(time.Second * 3).
Retries(3).SetProxy(func(req *http.Request) (*url.URL, error) {
proxy, err := utils.OrderBasedProxyStrategyInstance.GetProxy(ctx, utils.ProxyOrderInfo{
OrderNo: attach,
OrderPerIP: 1,
})
if err != nil {
otelTrace.Logger.WithContext(ctx).Error("闪付获取代理失败", zap.Error(err), zap.Any("params", params))
return nil, nil
}
return url.Parse(proxy)
})
parse, err := url.Parse(resData.PayUrl)
if err != nil {
otelTrace.Logger.WithContext(ctx).Error("闪付 请求失败:", zap.Error(err))
return false, ""
}
params = map[string]any{
"order_no": parse.Query().Get("order_no"),
"card_no": cardInfo.CardNo,
"pass": cardInfo.Data,
}
for key, value := range params {
req.Param(key, convertor.ToString(value))
}
response, err = req.String()
if err != nil {
otelTrace.Logger.WithContext(ctx).Error("闪付 请求失败:", zap.Error(err))
return false, "上游数据发送失败"
}
submitResp := struct {
Code int64 `json:"code"`
Msg string `json:"msg"`
}{
Code: 1,
Msg: "失败",
}
err = json.Unmarshal([]byte(response), &submitResp)
otelTrace.Logger.WithContext(ctx).Info("闪付 请求返回数据:", zap.String("response", response), zap.Any("submitResp", submitResp))
if err != nil {
otelTrace.Logger.WithContext(ctx).Error("闪付 请求失败:", zap.Error(err))
return false, "上游数据发送失败"
}
if submitResp.Code != 0 {
return false, submitResp.Msg
}
return true, submitResp.Msg
return true, "已提交到处理队列"
}
// getOrderFromPool 多通道支持
func (g *globalInstance) getOrderFromPool(ctx context.Context, roadUid string) (OrderPoolItem, error) {
lock := g.poolLocks[roadUid]
lock.Lock()
defer lock.Unlock()
key := fmt.Sprintf("%s:%s", produceOrderPoolKey, roadUid)
var order OrderPoolItem
// 判断有多少个元素
length, err := globalInstanceImpl.redisClient.LLen(key)
if err != nil {
return OrderPoolItem{}, fmt.Errorf("获取订单池失败: %v", err)
}
if length == 0 {
return OrderPoolItem{}, fmt.Errorf("订单池为空")
}
for range length {
err := globalInstanceImpl.redisClient.LPopUnmarshal(key, &order)
if err != nil {
return OrderPoolItem{}, fmt.Errorf("获取订单池失败: %v", err)
}
if pointer.IsNil(order) || order.OrderID == "" {
return OrderPoolItem{}, fmt.Errorf("订单池为空")
}
if time.Since(order.CreateTime) > orderInactiveTime {
continue
}
return order, nil
}
// 异步补充新订单
go func() {
newOrder, err := g.createOrder(ctx, roadUid)
if err != nil {
otelTrace.Logger.WithContext(ctx).Error("补充订单失败", zap.Error(err))
return
}
lock.Lock()
defer lock.Unlock()
err = globalInstanceImpl.redisClient.RPush(key, newOrder)
if err != nil {
otelTrace.Logger.WithContext(ctx).Error("补充订单失败", zap.Error(err))
return
}
}()
return order, nil
}
func (c *ShanFuImpl) Scan(ctx context.Context, orderInfo order.OrderInfo, roadInfo road.RoadInfo, merchantInfo merchant.MerchantInfo) supplier.ScanData {
ctx, cancel := otelTrace.Span(ctx, "闪付卡", "Scan", trace.WithAttributes(
attribute.String("BankOrderId", orderInfo.BankOrderId),
@@ -190,37 +285,16 @@ func (c *ShanFuImpl) Scan(ctx context.Context, orderInfo order.OrderInfo, roadIn
attribute.String("ExValue", orderInfo.ExValue),
))
defer cancel()
cdata := supplier.RedeemCardInfo{}
err := json.Unmarshal([]byte(orderInfo.ExValue), &cdata)
if err != nil {
otelTrace.Logger.WithContext(ctx).Error("格式化数据失败", zap.String("ExValue", orderInfo.ExValue))
return supplier.ScanData{Status: "-1", Msg: "订单有误,请稍后再试"}
}
ok, str := c.SendCard(ctx, roadInfo.Params, cdata, orderInfo.BankOrderId)
var scanData supplier.ScanData
if strings.Contains(str, "该卡余额不足") {
re := regexp.MustCompile(`余额:(\d+\.\d+)`)
match := re.FindStringSubmatch(str)
if len(match) > 1 {
balance, _ := strconv.ParseFloat(match[1], 64)
if balance > 0 {
if _, err2 := client.SubmitOrder(ctx, &client.SubmitOrderReq{
OrderPeriod: 24,
NotifyUrl: "http://kami_shop:12305/shop/notify",
OrderPrice: strconv.FormatFloat(balance, 'f', 2, 64),
OrderNo: orderInfo.BankOrderId,
ProductCode: roadInfo.ProductCode,
ExValue: orderInfo.ExValue,
Ip: utils.GenerateIpv4(),
PayKey: merchantInfo.MerchantKey,
PaySecret: merchantInfo.MerchantSecret,
}); err2 != nil {
otelTrace.Logger.WithContext(ctx).Error("闪付 提交订单失败", zap.Error(err2))
}
}
}
}
ok, str := c.SendCard(ctx, roadInfo.Params, cdata, orderInfo.BankOrderId, merchantInfo, roadInfo)
var scanData supplier.ScanData
if !ok {
scanData = supplier.ScanData{
@@ -439,7 +513,6 @@ func (c *ShanFuImpl) PayQueryV2(orderInfo order.OrderInfo, roadInfo road.RoadInf
marshal, err := json.Marshal(params)
if err != nil {
otelTrace.Logger.WithContext(ctx).Error("Map转化为byte数组失败,异常。", zap.Error(err))
// fmt.Printf("Map转化为byte数组失败,异常:%s\n",zap.Error(err))
return supply_model.DataErr
}
@@ -570,3 +643,221 @@ func (c *ShanFuImpl) BalanceQuery(roadInfo road.RoadInfo) float64 {
func (c *ShanFuImpl) PayForNotify() string {
return ""
}
// 获取所有可用通道的 bank_code需根据实际通道管理实现
func (g *globalInstance) getAllRoadUids() []string {
roadInfoList := road.GetListByProductCode(context.Background(), "SHANFU")
return slice.Map(roadInfoList, func(index int, item road.RoadInfo) string {
return item.RoadUid
})
}
// 初始化所有通道的订单池
func (g *globalInstance) InitAllOrderPools(ctx context.Context) {
if g.poolLocks == nil {
g.poolLocks = make(map[string]*sync.Mutex)
}
roadUids := g.getAllRoadUids()
slice.ForEach(roadUids, func(index int, item string) {
if _, ok := g.poolLocks[item]; !ok {
g.poolLocks[item] = &sync.Mutex{}
}
go g.initOrderPool(ctx, item)
})
}
// 初始化单个通道的订单池
func (g *globalInstance) initOrderPool(ctx context.Context, roadUid string) {
lock := g.poolLocks[roadUid]
lock.Lock()
defer lock.Unlock()
key := fmt.Sprintf("%s:%s", produceOrderPoolKey, roadUid)
produceLength, err := g.redisClient.LLen(key)
if err != nil {
otelTrace.Logger.WithContext(ctx).Error("获取订单池长度失败", zap.Error(err), zap.String("roadUid", roadUid))
return
}
customerLength, err := g.redisClient.LLen(fmt.Sprintf("%s:%s", customerOrderPoolKey, roadUid))
if err != nil {
otelTrace.Logger.WithContext(ctx).Error("获取订单池长度失败", zap.Error(err), zap.String("roadUid", roadUid))
return
}
poolSize := initialPoolSize
// 实现动态扩容
if customerLength > produceLength/2 {
poolSize = produceLength + produceLength/4
}
if produceLength < poolSize {
// 补充订单
for i := produceLength; i < poolSize; i++ {
order, err := g.createOrder(ctx, roadUid)
if err != nil {
otelTrace.Logger.WithContext(ctx).Error("创建订单失败", zap.Error(err), zap.String("roadUid", roadUid))
continue
}
err = g.redisClient.RPush(key, order)
if err != nil {
otelTrace.Logger.WithContext(ctx).Error("添加订单失败", zap.Error(err), zap.String("roadUid", roadUid))
continue
}
}
}
}
// 创建订单,支持 bank_code
func (g *globalInstance) createOrder(ctx context.Context, roadUid string) (OrderPoolItem, error) {
roadInfo := road.GetRoadInfoByRoadUid(ctx, roadUid)
cfg := new(config.Config)
params := map[string]any{
"pay_productname": gojson.Json(roadInfo.Params).Get("pay_productname").Tostring(),
"pay_memberid": gojson.Json(roadInfo.Params).Get("pay_memberid").Tostring(),
"pay_orderid": utils.GenerateOrderID(),
"pay_bankcode": gojson.Json(roadInfo.Params).Get("pay_bankcode").Tostring(),
"pay_amount": "100.00",
"pay_notifyurl": fmt.Sprintf("%s%s", cfg.GatewayAddr(), "/shanfu/notify"),
"pay_callbackurl": "",
"pay_ip": utils.GenerateIpv4(),
"pay_applydate": time.Now().Format("2006-01-02 15:04:05"),
}
params["pay_md5sign"] = g.generateSign(ctx, params, gojson.Json(roadInfo.Params).Get("pay_md5sign").Tostring())
req := httplib.NewBeegoRequestWithCtx(ctx, "http://ubna.epay.sgbyzf.com/Pay_Index.html", "POST").
SetTimeout(time.Second*30, time.Second*30)
for key, value := range params {
req.Param(key, convertor.ToString(value))
}
response, err := req.String()
if err != nil {
return OrderPoolItem{}, fmt.Errorf("创建订单请求失败: %v", err)
}
var resData struct {
Status string `json:"status"`
Msg string `json:"msg"`
OrderId string `json:"order_id"`
PayUrl string `json:"pay_url"`
}
err = json.Unmarshal([]byte(response), &resData)
if err != nil {
return OrderPoolItem{}, fmt.Errorf("解析响应失败: %v", err)
}
if resData.Status != "1" {
return OrderPoolItem{}, fmt.Errorf("创建订单失败: %s", resData.Msg)
}
return OrderPoolItem{
OrderID: resData.OrderId,
CreateTime: time.Now(),
PayURL: resData.PayUrl,
RoadUid: roadUid,
Params: response,
}, nil
}
// 线程池任务处理函数异步订单提交、ID绑定
func handleSendCardTask(task SendCardTask) {
ctx := context.Background()
roadUid := task.RoadUid
orderItem, err := globalInstanceImpl.getOrderFromPool(ctx, roadUid)
if err != nil {
otelTrace.Logger.WithContext(ctx).Error("获取订单失败", zap.Error(err), zap.String("roadUid", roadUid))
// 如果获取不到订单,再给订单加回去
err = globalInstanceImpl.redisClient.RPush(customerOrderPoolKey, task)
if err != nil {
otelTrace.Logger.WithContext(ctx).Error("添加订单失败", zap.Error(err), zap.String("roadUid", roadUid))
}
return
}
// 检查订单创建时间是否满足最小间隔要求
if time.Since(orderItem.CreateTime) < orderMinInterval {
time.Sleep(orderMinInterval - time.Since(orderItem.CreateTime))
}
// 提交卡密
req := httplib.NewBeegoRequestWithCtx(ctx, orderItem.PayURL, "POST").
SetTimeout(time.Second*30, time.Second*30).RetryDelay(time.Second * 3).
Retries(3)
parse, err := url.Parse(orderItem.PayURL)
if err != nil {
otelTrace.Logger.WithContext(ctx).Error("解析支付URL失败", zap.Error(err))
return
}
params := map[string]any{
"order_no": parse.Query().Get("order_no"),
"card_no": task.CardInfo.CardNo,
"pass": task.CardInfo.Data,
}
for key, value := range params {
req.Param(key, convertor.ToString(value))
}
response, err := req.String()
if err != nil {
service.SolvePayFail(ctx, task.LocalOrderID, orderItem.OrderID, "提交订单失败")
otelTrace.Logger.WithContext(ctx).Error("提交卡密失败", zap.Error(err))
return
}
submitResp := struct {
Code int64 `json:"code"`
Msg string `json:"msg"`
}{
Code: 1,
Msg: "失败",
}
err = json.Unmarshal([]byte(response), &submitResp)
otelTrace.Logger.WithContext(ctx).Info("闪付请求返回数据", zap.String("response", response), zap.Any("submitResp", submitResp))
if err != nil {
service.SolvePayFail(ctx, task.LocalOrderID, orderItem.OrderID, "返回数据解析失败")
otelTrace.Logger.WithContext(ctx).Error("解析响应失败", zap.Error(err))
return
}
if strings.Contains(submitResp.Msg, "该卡余额不足") {
re := regexp.MustCompile(`余额:(\d+\.\d+)`)
match := re.FindStringSubmatch(submitResp.Msg)
if len(match) > 1 {
orderEntity := order.GetOrderByBankOrderId(ctx, task.LocalOrderID)
roadEntity := road.GetRoadInfoByRoadUid(ctx, orderEntity.RoadUid)
balance, _ := strconv.ParseFloat(match[1], 64)
if balance > 0 {
if _, err2 := client.SubmitOrder(ctx, &client.SubmitOrderReq{
OrderPeriod: 24,
NotifyUrl: "http://kami_shop:12305/shop/notify",
OrderPrice: strconv.FormatFloat(balance, 'f', 2, 64),
OrderNo: task.LocalOrderID,
ProductCode: roadEntity.ProductCode,
ExValue: orderEntity.ExValue,
Ip: utils.GenerateIpv4(),
PayKey: task.PayKey,
PaySecret: task.PaySecret,
}); err2 != nil {
otelTrace.Logger.WithContext(ctx).Error("闪付 提交订单失败", zap.Error(err2))
}
}
}
}
if submitResp.Code != 0 {
service.SolvePayFail(ctx, task.LocalOrderID, orderItem.OrderID, submitResp.Msg)
otelTrace.Logger.WithContext(ctx).Error("订单提交失败", zap.String("msg", submitResp.Msg))
return
}
// 绑定上游订单ID与本地订单ID只有成功订单才有绑定
bindKey := fmt.Sprintf("%s:%s", orderBindKey, orderItem.OrderID)
_ = globalInstanceImpl.redisClient.Set(bindKey, task.LocalOrderID, time.Hour)
}

View File

@@ -5,9 +5,10 @@ import (
"fmt"
"gateway/internal/otelTrace"
"github.com/duke-git/lancet/v2/cryptor"
"strings"
"testing"
"github.com/duke-git/lancet/v2/cryptor"
)
func TestStarSilenceImpl_PayNotify(t *testing.T) {

11
internal/utils/order.go Normal file
View File

@@ -0,0 +1,11 @@
package utils
import (
"fmt"
"time"
)
// GenerateOrderID 生成订单ID
func GenerateOrderID() string {
return fmt.Sprintf("%d%d", time.Now().UnixNano(), RandInt(1000, 9999))
}

10
internal/utils/random.go Normal file
View File

@@ -0,0 +1,10 @@
package utils
import (
"math/rand"
)
// RandInt 生成指定范围内的随机整数
func RandInt(min, max int) int {
return min + rand.Intn(max-min+1)
}

View File

@@ -39,9 +39,9 @@ func main() {
// }
// }()
go notify.CreateOrderNotifyConsumer(otelTrace.InitCtx)
go query.CreateSupplierOrderQueryCuConsumer(otelTrace.InitCtx)
// go query.CreatePayForQueryConsumer(otelTrace.InitCtx)
go service.OrderSettleInit(otelTrace.InitCtx)
go query.CreateSupplierOrderQueryCuConsumer(otelTrace.InitCtx)
otelTrace.Logger.WithContext(otelTrace.InitCtx).Info("gateway start")
web.Run()
}