mirror of
https://git.oceanpay.cc/danial/kami_scripts.git
synced 2025-12-18 22:13:23 +00:00
feat(order): 优化订单处理逻辑并添加新功能
- 更新 .gitignore 文件,添加日志和IDE相关目录 - 修改 config.yaml,更新提交URL和商户配置- 在 interfaces.go 中添加新方法并修改现有方法 - 优化 logger_adapter.go 中的日志记录功能 - 调整 main.go 中的定时任务间隔 - 在 order.go 中实现新的 FindRandomFailedOrders 方法 - 更新 order_service.go,添加 CSV 文件处理逻辑 - 新增 road.go 文件,实现 FindRoadByRoadUid 方法 - 修改 submit_order.go,更新订单提交逻辑
This commit is contained in:
8
.idea/.gitignore
generated
vendored
Normal file
8
.idea/.gitignore
generated
vendored
Normal file
@@ -0,0 +1,8 @@
|
||||
# Default ignored files
|
||||
/shelf/
|
||||
/workspace.xml
|
||||
# Editor-based HTTP Client requests
|
||||
/httpRequests/
|
||||
# Datasource local storage ignored files
|
||||
/dataSources/
|
||||
/dataSources.local.xml
|
||||
3
order/.gitignore
vendored
3
order/.gitignore
vendored
@@ -0,0 +1,3 @@
|
||||
/logs/*
|
||||
/.vscode/
|
||||
/.idea/
|
||||
@@ -5,21 +5,16 @@ database:
|
||||
password: Woaizixkie!123
|
||||
dbname: kami
|
||||
|
||||
submit_url: "http://127.0.0.1:8080/api/v1/submit"
|
||||
submit_url: "https://gateway.kkknametrans.buzz"
|
||||
|
||||
merchants:
|
||||
- name: "商户1"
|
||||
merchant_uid: "key1"
|
||||
road_uid: "road1"
|
||||
rate: 0.1 # 发送比例,10%
|
||||
- id: "M002"
|
||||
name: "商户2"
|
||||
merchant_uid: "key2"
|
||||
road_uid: "road2"
|
||||
rate: 0.1 # 发送比例,10%
|
||||
- name: "测试商户"
|
||||
merchant_uid: "8888c9kit6bimggos5kk0c8g"
|
||||
road_uid: "4444d10lepeqgjec73d5kcfg"
|
||||
rate: 0.01 # 发送比例,10%
|
||||
|
||||
schedule:
|
||||
interval: 60 # 运行间隔(秒)
|
||||
interval: 5 # 运行间隔(秒)
|
||||
|
||||
logging:
|
||||
level: "info"
|
||||
|
||||
@@ -22,7 +22,7 @@ func NewMockOrderSender(logger interfaces.Logger) interfaces.OrderSender {
|
||||
return &MockOrderSender{logger: logger}
|
||||
}
|
||||
|
||||
func (s *MockOrderSender) Send(ctx context.Context, order *model.OrderInfo, merchant *model.MerchantInfo, submitURL string) error {
|
||||
func (s *MockOrderSender) Send(ctx context.Context, order *model.OrderInfo, merchant *model.MerchantInfo, roadInfo *model.RoadInfo, submitURL string) error {
|
||||
// 这里实现实际的订单发送逻辑
|
||||
s.logger.Info("发送订单",
|
||||
"order_no", order.BankOrderID,
|
||||
@@ -33,10 +33,10 @@ func (s *MockOrderSender) Send(ctx context.Context, order *model.OrderInfo, merc
|
||||
NotifyUrl: "https://www.baidu.com",
|
||||
OrderPrice: strconv.FormatFloat(order.OrderAmount, 'f', -1, 64),
|
||||
OrderNo: order.BankOrderID,
|
||||
ProductCode: order.PayProductCode,
|
||||
ProductCode: roadInfo.ProductCode,
|
||||
ExValue: order.ExValue,
|
||||
Ip: "127.0.0.1",
|
||||
PayKey: merchant.MerchantUID,
|
||||
PayKey: merchant.MerchantKey,
|
||||
PaySecret: merchant.MerchantSecret,
|
||||
}, submitURL); err != nil {
|
||||
s.logger.Error("提交订单失败", "error", err)
|
||||
@@ -46,7 +46,8 @@ func (s *MockOrderSender) Send(ctx context.Context, order *model.OrderInfo, merc
|
||||
}
|
||||
|
||||
func submitOrder(ctx context.Context, input *SubmitOrder, submitURL string) (*SubmitOrderResponse, error) {
|
||||
req := httplib.NewBeegoRequestWithCtx(ctx, fmt.Sprintf("%s/gateway/scan", submitURL), "POST").SetTimeout(30*time.Second, 30*time.Second).Retries(3).RetryDelay(time.Second * 3)
|
||||
req := httplib.NewBeegoRequestWithCtx(ctx, fmt.Sprintf("%s/gateway/scan", submitURL), "POST").
|
||||
SetTimeout(30*time.Second, 30*time.Second).Retries(3).RetryDelay(time.Second * 3)
|
||||
|
||||
params := input.ToStrMap()
|
||||
params["sign"] = utils.GetMD5SignMF(params, input.PaySecret)
|
||||
|
||||
@@ -18,15 +18,17 @@ type OrderRepository interface {
|
||||
|
||||
// FindOrderByBankOrderID 根据银行订单ID查询订单
|
||||
FindOrderByBankOrderID(ctx context.Context, bankOrderID string) (model.OrderInfo, error)
|
||||
// 寻找失败的订单
|
||||
FindRandomFailedOrders(ctx context.Context) (*model.OrderInfo, error)
|
||||
// 寻找 merchant
|
||||
// FindRandomFailedOrders 寻找失败的订单
|
||||
FindRandomFailedOrders(ctx context.Context, roadUid string) (*model.OrderInfo, error)
|
||||
// FindMerchantByRoadID 寻找 merchant
|
||||
FindMerchantByRoadID(ctx context.Context, merchantUid string) (*model.MerchantInfo, error)
|
||||
// FindRoadByRoadUid 根据 roadUid 查找对应的通道信息
|
||||
FindRoadByRoadUid(ctx context.Context, roadUid string) (*model.RoadInfo, error)
|
||||
}
|
||||
|
||||
// OrderSender 订单发送接口
|
||||
type OrderSender interface {
|
||||
Send(ctx context.Context, order *model.OrderInfo, merchant *model.MerchantInfo, submitURL string) error
|
||||
Send(ctx context.Context, order *model.OrderInfo, merchant *model.MerchantInfo, roadInfo *model.RoadInfo, submitURL string) error
|
||||
}
|
||||
|
||||
// Logger 日志接口
|
||||
|
||||
@@ -48,8 +48,10 @@ func (r *OrderRepositoryImpl) FindOrdersByRoadIdAndAboveBankOrderId(ctx context.
|
||||
return orderInfo, nil
|
||||
}
|
||||
|
||||
func (r *OrderRepositoryImpl) FindRandomFailedOrders(ctx context.Context) (*model.OrderInfo, error) {
|
||||
orderInfo, err := query.Use(r.db).OrderInfo.WithContext(ctx).Where(query.Use(r.db).OrderInfo.Status.Eq("failed")).Order(field.Func.Rand()).First()
|
||||
func (r *OrderRepositoryImpl) FindRandomFailedOrders(ctx context.Context, roadUid string) (*model.OrderInfo, error) {
|
||||
orderInfo, err := query.Use(r.db).OrderInfo.WithContext(ctx).Where(query.Use(r.db).OrderInfo.Status.Eq("fail"),
|
||||
query.Use(r.db).OrderInfo.RoadUID.Eq(roadUid),
|
||||
).Order(field.Func.Rand()).First()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
12
order/internal/repositories/road.go
Normal file
12
order/internal/repositories/road.go
Normal file
@@ -0,0 +1,12 @@
|
||||
package repositories
|
||||
|
||||
import (
|
||||
"context"
|
||||
"order/internal/model"
|
||||
"order/internal/query"
|
||||
)
|
||||
|
||||
func (r *OrderRepositoryImpl) FindRoadByRoadUid(ctx context.Context, roadUid string) (*model.RoadInfo, error) {
|
||||
roadInfo, err := query.Use(r.db).RoadInfo.WithContext(ctx).Where(query.Use(r.db).RoadInfo.RoadUID.Eq(roadUid)).Take()
|
||||
return roadInfo, err
|
||||
}
|
||||
@@ -3,6 +3,7 @@ package services
|
||||
import (
|
||||
"context"
|
||||
"encoding/csv"
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"order/internal/config"
|
||||
@@ -37,6 +38,19 @@ func NewOrderService(
|
||||
|
||||
// ProcessOrders 处理订单
|
||||
func (s *OrderServiceImpl) ProcessOrders(ctx context.Context) error {
|
||||
// 判断这个文件有没有,如果有,打开,没有就创建一个
|
||||
filePath := fmt.Sprintf("order.csv")
|
||||
if _, err := os.Stat(filePath); os.IsNotExist(err) {
|
||||
_, _ = os.Create(filePath)
|
||||
}
|
||||
// 从 csv 中读取最后一个匹配的 road_uid 和 bank_order_id
|
||||
csvFile, err := os.OpenFile("order.csv", os.O_APPEND|os.O_CREATE|os.O_RDWR, 0644)
|
||||
if err != nil {
|
||||
s.logger.Error("打开文件失败", "error", err)
|
||||
return err
|
||||
}
|
||||
defer csvFile.Close()
|
||||
|
||||
for _, merchant := range s.config.Merchants {
|
||||
s.logger.Info("开始处理商户", "merchant", merchant)
|
||||
merchantInfo, err := s.repo.FindMerchantByRoadID(ctx, merchant.MerchantUid)
|
||||
@@ -44,13 +58,12 @@ func (s *OrderServiceImpl) ProcessOrders(ctx context.Context) error {
|
||||
s.logger.Error("获取商户失败", "error", err)
|
||||
continue
|
||||
}
|
||||
// 从 csv 中读取最后一个匹配的 road_uid 和 bank_order_id
|
||||
csvFile, err := os.Open("order.csv")
|
||||
if err != nil {
|
||||
s.logger.Error("打开文件失败", "error", err)
|
||||
roadInfo, err := s.repo.FindRoadByRoadUid(ctx, merchant.RoadUid)
|
||||
if err != nil || pointer.IsNil(roadInfo) {
|
||||
s.logger.Error("获取路经失败", "error", err)
|
||||
continue
|
||||
}
|
||||
defer csvFile.Close()
|
||||
|
||||
csvReader := csv.NewReader(csvFile)
|
||||
// 读取所有数据
|
||||
records, err := csvReader.ReadAll()
|
||||
@@ -74,22 +87,22 @@ func (s *OrderServiceImpl) ProcessOrders(ctx context.Context) error {
|
||||
// 判断有多少个订单
|
||||
totalOrders := len(orders)
|
||||
needProcessCount := int(100 * merchant.Rate)
|
||||
if totalOrders <= 100 || needProcessCount <= 0 {
|
||||
if totalOrders <= 10 || needProcessCount <= 0 {
|
||||
continue
|
||||
}
|
||||
for range needProcessCount {
|
||||
order, err := s.repo.FindRandomFailedOrders(ctx)
|
||||
order, err := s.repo.FindRandomFailedOrders(ctx, merchant.RoadUid)
|
||||
if err != nil || pointer.IsNil(order) {
|
||||
s.logger.Error("获取订单失败", "error", err)
|
||||
continue
|
||||
}
|
||||
if err := s.sender.Send(ctx, order, merchantInfo, s.config.SubmitURL); err != nil {
|
||||
if err := s.sender.Send(ctx, order, merchantInfo, roadInfo, s.config.SubmitURL); err != nil {
|
||||
s.logger.Error("发送订单失败", "error", err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
csvWriter := csv.NewWriter(csvFile)
|
||||
if err := csvWriter.Write([]string{merchant.RoadUid, orders[len(orders)-1].BankOrderID}); err != nil {
|
||||
if err = csvWriter.Write([]string{merchant.RoadUid, orders[len(orders)-1].BankOrderID}); err != nil {
|
||||
s.logger.Error("写入文件失败", "error", err)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -22,7 +22,7 @@ func (l *ZapLoggerAdapter) Info(msg string, fields ...any) {
|
||||
}
|
||||
|
||||
// Error 实现 Logger 接口
|
||||
func (l *ZapLoggerAdapter) Error(msg string, fields ...interface{}) {
|
||||
func (l *ZapLoggerAdapter) Error(msg string, fields ...any) {
|
||||
l.logger.Error(msg, convertFields(fields)...)
|
||||
}
|
||||
|
||||
|
||||
@@ -80,7 +80,7 @@ func main() {
|
||||
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
|
||||
|
||||
// 启动定时任务
|
||||
ticker := time.NewTicker(time.Duration(cfg.Schedule.Interval) / 12 * time.Second)
|
||||
ticker := time.NewTicker(time.Duration(cfg.Schedule.Interval) * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
go func() {
|
||||
|
||||
Reference in New Issue
Block a user