feat(proxy): 添加代理池功能并集成到系统中

- 新增代理池配置和初始化逻辑
- 实现代理池核心功能,包括随机获取代理、测试代理有效性等
- 在支付请求中集成代理功能
- 添加代理池状态更新和统计功能
This commit is contained in:
danial
2025-04-12 00:24:47 +08:00
parent 363d7ee571
commit b70616c3c3
10 changed files with 321 additions and 3 deletions

View File

@@ -68,4 +68,7 @@ key = thisis32bitlongpassphraseimusing
iv = 1234567890123456
[selfThird]
notify_url = http://kami_gateway:12309/selfThird/notify
notify_url = http://kami_gateway:12309/selfThird/notify
[proxy]
proxies = []

1
go.mod
View File

@@ -34,6 +34,7 @@ require (
go.opentelemetry.io/otel/sdk/metric v1.35.0
go.opentelemetry.io/otel/trace v1.35.0
go.uber.org/zap v1.27.0
gopkg.in/yaml.v2 v2.4.0
)
require (

1
go.sum
View File

@@ -198,6 +198,7 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@@ -26,3 +26,10 @@ func GetMQAddress() string {
return net.JoinHostPort(mqHost, mqPort)
}
func GetProxyConfig() []string {
if proxy, err := web.AppConfig.Strings("proxy::proxies"); err == nil {
return proxy
}
return []string{}
}

17
internal/proxy/config.go Normal file
View File

@@ -0,0 +1,17 @@
package proxy
// ProxyConfig 代理配置
type ProxyConfig struct {
Proxies []string
TestURL string
Timeout int `yaml:"timeout"` // 超时时间(秒)
}
// DefaultConfig 默认配置
func DefaultConfig() *ProxyConfig {
return &ProxyConfig{
Proxies: []string{},
TestURL: "https://www.baidu.com",
Timeout: 5,
}
}

53
internal/proxy/example.go Normal file
View File

@@ -0,0 +1,53 @@
package proxy
import (
"fmt"
"time"
)
func Example() {
// 获取全局代理池实例
pool := GetGlobalProxyPool()
// 创建配置
config := &ProxyConfig{
Proxies: []string{
"http://proxy1.example.com:8080",
"http://proxy2.example.com:8080",
"http://proxy3.example.com:8080",
},
TestURL: "https://www.baidu.com",
Timeout: 5,
}
// 使用配置初始化代理池
pool.InitWithConfig(config)
// 启动定期更新代理状态的goroutine
go func() {
ticker := time.NewTicker(time.Duration(config.Timeout) * time.Second)
for range ticker.C {
pool.UpdateProxies()
}
}()
// 使用代理的示例
startTime := time.Now()
proxy, err := pool.GetRandomProxy()
if err != nil {
fmt.Printf("获取代理失败: %v\n", err)
return
}
// 模拟使用代理
time.Sleep(100 * time.Millisecond)
// 标记代理使用完成
pool.MarkProxyUsed(proxy, time.Since(startTime))
// 获取代理统计信息
stats := pool.GetProxyStats()
for _, stat := range stats {
fmt.Printf("代理统计: %+v\n", stat)
}
}

17
internal/proxy/init.go Normal file
View File

@@ -0,0 +1,17 @@
package proxy
import "gateway/internal/config"
// InitProxyPool 初始化代理池
func InitProxyPool() error {
// 获取全局代理池实例
pool := GetGlobalProxyPool()
// 初始化代理池
pool.InitWithConfig(&ProxyConfig{
Proxies: config.GetProxyConfig(),
TestURL: "https://www.baidu.com",
Timeout: 30,
})
return nil
}

View File

@@ -0,0 +1,199 @@
package proxy
import (
"context"
"fmt"
"net/http"
"net/url"
"sync"
"time"
)
// Proxy 表示一个代理服务器
type Proxy struct {
URL string
LastTest time.Time
IsValid bool
UseCount int64 // 使用次数
LastUseTime time.Time // 最后使用时间
TotalTime int64 // 总使用时间(毫秒)
}
// ProxyPool 代理池
type ProxyPool struct {
proxies []*Proxy
mu sync.RWMutex
config *ProxyConfig
}
var (
globalProxyPool *ProxyPool
once sync.Once
)
// GetGlobalProxyPool 获取全局代理池实例
func GetGlobalProxyPool() *ProxyPool {
once.Do(func() {
globalProxyPool = NewProxyPool()
})
return globalProxyPool
}
// NewProxyPool 创建一个新的代理池
func NewProxyPool() *ProxyPool {
return &ProxyPool{
proxies: make([]*Proxy, 0),
config: DefaultConfig(),
}
}
// InitWithConfig 使用配置初始化代理池
func (p *ProxyPool) InitWithConfig(config *ProxyConfig) {
p.mu.Lock()
defer p.mu.Unlock()
p.config = config
p.proxies = make([]*Proxy, 0, len(config.Proxies))
for _, proxyURL := range config.Proxies {
p.proxies = append(p.proxies, &Proxy{
URL: proxyURL,
LastTest: time.Now(),
IsValid: false,
UseCount: 0,
LastUseTime: time.Time{},
TotalTime: 0,
})
}
}
// AddProxy 添加代理到代理池
func (p *ProxyPool) AddProxy(proxyURL string) {
p.mu.Lock()
defer p.mu.Unlock()
p.proxies = append(p.proxies, &Proxy{
URL: proxyURL,
LastTest: time.Now(),
IsValid: false,
UseCount: 0,
LastUseTime: time.Time{},
TotalTime: 0,
})
}
// GetRandomProxy 随机获取一个可用的代理
func (p *ProxyPool) GetRandomProxy() (*Proxy, error) {
p.mu.RLock()
defer p.mu.RUnlock()
if len(p.proxies) == 0 {
return nil, fmt.Errorf("no proxies available")
}
// 过滤出有效的代理
validProxies := make([]*Proxy, 0)
for _, proxy := range p.proxies {
if proxy.IsValid && time.Since(proxy.LastTest) < time.Duration(p.config.Timeout)*time.Second {
validProxies = append(validProxies, proxy)
}
}
if len(validProxies) == 0 {
return nil, fmt.Errorf("no valid proxies available")
}
// 选择使用次数最少的代理
var selectedProxy *Proxy
minUseCount := int64(1 << 62)
for _, proxy := range validProxies {
if proxy.UseCount < minUseCount {
minUseCount = proxy.UseCount
selectedProxy = proxy
}
}
return selectedProxy, nil
}
// MarkProxyUsed 标记代理被使用
func (p *ProxyPool) MarkProxyUsed(proxy *Proxy, duration time.Duration) {
p.mu.Lock()
defer p.mu.Unlock()
proxy.UseCount++
proxy.LastUseTime = time.Now()
proxy.TotalTime += duration.Milliseconds()
}
// TestProxy 测试代理是否可用
func (p *ProxyPool) TestProxy(proxy *Proxy) error {
proxyURL, err := url.Parse(proxy.URL)
if err != nil {
return err
}
client := &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyURL(proxyURL),
},
Timeout: time.Duration(p.config.Timeout) * time.Second,
}
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(p.config.Timeout)*time.Second)
defer cancel()
req, err := http.NewRequestWithContext(ctx, "GET", p.config.TestURL, nil)
if err != nil {
return err
}
resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("proxy test failed with status code: %d", resp.StatusCode)
}
proxy.LastTest = time.Now()
proxy.IsValid = true
return nil
}
// UpdateProxies 更新代理池中所有代理的状态
func (p *ProxyPool) UpdateProxies() {
p.mu.Lock()
defer p.mu.Unlock()
for _, proxy := range p.proxies {
if time.Since(proxy.LastTest) >= time.Duration(p.config.Timeout)*time.Second {
err := p.TestProxy(proxy)
if err != nil {
proxy.IsValid = false
}
}
}
}
// GetProxyStats 获取代理统计信息
func (p *ProxyPool) GetProxyStats() []map[string]interface{} {
p.mu.RLock()
defer p.mu.RUnlock()
stats := make([]map[string]interface{}, 0, len(p.proxies))
for _, proxy := range p.proxies {
stat := map[string]interface{}{
"url": proxy.URL,
"is_valid": proxy.IsValid,
"use_count": proxy.UseCount,
"last_use_time": proxy.LastUseTime,
"total_time_ms": proxy.TotalTime,
"last_test": proxy.LastTest,
}
stats = append(stats, stat)
}
return stats
}

View File

@@ -10,6 +10,7 @@ import (
"fmt"
"gateway/internal/config"
"gateway/internal/otelTrace"
"gateway/internal/proxy"
"github.com/forgoer/openssl"
@@ -90,7 +91,7 @@ func (c *HeepayImpl) SendCard(ctx context.Context, jsonStr string, cardInfo supp
"card_type": int64(10),
"bill_id": attach,
"bill_time": time.Now().Format("20060102150405"),
"card_data": "e193a5fb74d23dd8db4640de5d5f7340f44342553870589c2d8860c16f4b13af611d999d8aeef889",
"card_data": TripleDesEncrypt(fmt.Sprintf("%s,%s,%s", cardInfo.CardNo, cardInfo.Data, cardInfo.FaceType), gojson.Json(jsonStr).Get("3ds_key").Tostring()),
"pay_amt": int64(payAmt),
"notify_url": fmt.Sprintf("%s%s", config.GetConfig().GatewayAddr(), "/notify/Heepay"),
"client_ip": strings.ReplaceAll("127.0.0.1", ".", "_"),
@@ -124,6 +125,16 @@ func (c *HeepayImpl) SendCard(ctx context.Context, jsonStr string, cardInfo supp
for s, a := range params {
req.Param(s, convertor.ToString(a))
}
currentProxy, err := proxy.GetGlobalProxyPool().GetRandomProxy()
if err == nil && currentProxy != nil {
req.SetProxy(func(r *http.Request) (*url.URL, error) {
return url.Parse(currentProxy.URL)
})
} else {
otelTrace.Logger.WithContext(ctx).Error("获取代理失败")
}
response, err := req.String()
if err != nil {
otelTrace.Logger.WithContext(ctx).Error("请求失败:", zap.Error(err))
@@ -178,7 +189,7 @@ func (c *HeepayImpl) Scan(ctx context.Context, orderInfo order.OrderInfo, roadIn
scanData.OrderPrice = strconv.FormatFloat(orderInfo.OrderAmount, 'f', 2, 64)
scanData.ReturnData = response
scanData.Msg = msg
// scanData.Msg =
return scanData
}

View File

@@ -1,8 +1,10 @@
package main
import (
"gateway/internal/config"
_ "gateway/internal/models"
"gateway/internal/otelTrace"
"gateway/internal/proxy"
_ "gateway/internal/routers"
"gateway/internal/schema/query"
"gateway/internal/service"
@@ -23,6 +25,13 @@ func logRequest(ctx *context.Context) {
}
func main() {
config.GetMQAddress()
// 初始化代理池
if err := proxy.InitProxyPool(); err != nil {
log.Printf("初始化代理池失败: %v", err)
return
}
// cleanup1, cleanup2, cleanup3 := otelTrace.InitTracer()
// defer func() {
// if cleanup1 != nil {