refactor(proxy): 优化基于订单号的代理池管理

- 将代理缓存结构由map[string][]*ProxyInfo改为map[string]*ProxyInfo,简化数据结构
- 统一缓存键格式为channel:orderID,避免混淆和错误
- 优化跨通道代理复用逻辑,排除指定通道代理并筛选使用次数合适的代理
- 实现清理不可用代理功能,及时从缓存和代理列表中剔除失效代理
- 优化代理使用记录,更新最后使用时间和使用次数
- 精简清理协程逻辑,删除过期代理,避免内存泄漏
- 修正测试用例,增加更多通道模拟,验证代理获取与使用状况
This commit is contained in:
danial
2025-12-09 02:38:38 +08:00
parent 7bfb3195e1
commit 4fac3e2336
2 changed files with 82 additions and 95 deletions

View File

@@ -7,10 +7,10 @@ import (
"github.com/dubonzi/otelresty"
"github.com/go-resty/resty/v2"
"maps"
"math/rand/v2"
"net/http"
"net/url"
"slices"
"strings"
"sync"
"time"
@@ -61,14 +61,15 @@ type DefaultProxyStrategy struct {
// OrderBasedProxyStrategy 基于订单号和通道的代理策略
type OrderBasedProxyStrategy struct {
Id string
authKey string
proxyURL string
authPwd string
OrderPerIP int
mu sync.RWMutex
proxies map[string][]*ProxyInfo // key: channel_orderID
stopCh chan struct{} // 用于停止清理协程
Id string
authKey string
proxyURL string
authPwd string
OrderPerIP int
mu sync.RWMutex
totalProxies []*ProxyInfo
proxies map[string]*ProxyInfo // key: channel_orderID
stopCh chan struct{} // 用于停止清理协程
}
// NewDefaultProxyStrategy 创建默认代理策略
@@ -83,11 +84,12 @@ func NewDefaultProxyStrategy(proxyURL, authKey, authPwd string) *DefaultProxyStr
// NewOrderBasedProxyStrategy 创建基于订单号和通道的代理策略
func NewOrderBasedProxyStrategy(proxyURL, authKey, authPwd string) *OrderBasedProxyStrategy {
strategy := &OrderBasedProxyStrategy{
proxyURL: proxyURL,
authKey: authKey,
authPwd: authPwd,
proxies: make(map[string][]*ProxyInfo),
stopCh: make(chan struct{}),
proxyURL: proxyURL,
authKey: authKey,
authPwd: authPwd,
totalProxies: make([]*ProxyInfo, 0),
proxies: make(map[string]*ProxyInfo),
stopCh: make(chan struct{}),
}
// 启动清理协程
@@ -134,27 +136,26 @@ func (p *OrderBasedProxyStrategy) getInnerProxy(ctx context.Context, proxyReques
if proxyRequest.OrderPerIP == 0 {
proxyRequest.OrderPerIP = 1
}
// 生成代理缓存键channel_orderID
cacheKey := fmt.Sprintf("%s_%s", proxyRequest.Channel, proxyRequest.OrderNo)
p.mu.RLock()
if proxies, exists := p.proxies[cacheKey]; exists {
p.mu.RUnlock()
for _, proxy := range proxies {
if err := p.checkProxyAvailable(ctx, proxy.proxy); err == nil {
// 更新最后使用时间和使用次数
p.mu.Lock()
proxy.lastUsedAt = time.Now()
proxy.usageCount++
p.mu.Unlock()
// 生成代理缓存键channel_orderID
cacheKey := fmt.Sprintf("%s:%s", proxyRequest.Channel, proxyRequest.OrderNo)
otelTrace.Logger.WithContext(ctx).Info("使用订单缓存代理",
zap.String("proxy", proxy.proxy),
zap.String("orderID", proxyRequest.OrderNo),
zap.String("channel", proxyRequest.Channel),
zap.Int("usageCount", proxy.usageCount))
return proxy.proxy, nil
}
if proxy, exists := p.proxies[cacheKey]; exists {
p.mu.RUnlock()
if err := p.checkProxyAvailable(ctx, proxy.proxy); err == nil {
// 更新最后使用时间和使用次数
p.mu.Lock()
proxy.lastUsedAt = time.Now()
proxy.usageCount++
p.mu.Unlock()
otelTrace.Logger.WithContext(ctx).Info("使用订单缓存代理",
zap.String("proxy", proxy.proxy),
zap.String("orderID", proxyRequest.OrderNo),
zap.String("channel", proxyRequest.Channel),
zap.Int("usageCount", proxy.usageCount))
return proxy.proxy, nil
}
} else {
p.mu.RUnlock()
@@ -195,43 +196,39 @@ func (p *DefaultProxyStrategy) getNewProxy(ctx context.Context) (string, error)
}
// 查找使用次数少于指定次数的 ip跨通道查找
func (p *OrderBasedProxyStrategy) getUnderLimitedProxy(ctx context.Context, perIP int, excludeChannels []string) (string, error) {
var userCount []string
func (p *OrderBasedProxyStrategy) getUnderLimitedProxy(ctx context.Context, perIP int, excludeChannels string) (string, error) {
p.mu.RLock()
for _, proxies := range p.proxies {
for _, proxy := range proxies {
if proxy == nil {
continue
}
// 排除指定通道的代理
excluded := slices.Contains(excludeChannels, proxy.channel)
if !excluded {
userCount = append(userCount, proxy.proxy)
}
//需要找到指定通道所有的代理
counts := map[*ProxyInfo]int{}
for channel, info := range p.proxies {
if strings.Contains(channel, excludeChannels) && info.usageCount <= perIP {
counts[info]++
}
}
p.mu.RUnlock()
userCount = slice.Unique(userCount)
rand.Shuffle(len(userCount), func(i, j int) {
userCount[i], userCount[j] = userCount[j], userCount[i]
proxies := slice.Filter(p.totalProxies, func(index int, item *ProxyInfo) bool {
for info := range counts {
if info.proxy == item.proxy {
return false
}
}
return true
})
for _, proxy := range userCount {
if err := p.checkProxyAvailable(ctx, proxy); err == nil {
return proxy, nil
p.mu.RUnlock()
for _, proxy := range proxies {
if err := p.checkProxyAvailable(ctx, proxy.proxy); err == nil {
return proxy.proxy, nil
} else {
// 清理不可用的代理
func() {
p.mu.Lock()
defer p.mu.Unlock()
for index, infos := range p.proxies {
p.proxies[index] = slices.DeleteFunc(infos, func(info *ProxyInfo) bool {
return (info != nil && info.proxy == proxy) || info == nil
})
}
maps.DeleteFunc(p.proxies, func(key string, value []*ProxyInfo) bool {
return len(value) == 0
maps.DeleteFunc(p.proxies, func(key string, value *ProxyInfo) bool {
return value.proxy == proxy.proxy
})
otelTrace.Logger.WithContext(ctx).Warn("代理IP不可用", zap.String("proxy", proxy), zap.Error(err))
p.totalProxies = slices.DeleteFunc(p.totalProxies, func(info *ProxyInfo) bool {
return info.proxy == proxy.proxy
})
otelTrace.Logger.WithContext(ctx).Warn("代理IP不可用", zap.String("proxy", proxy.proxy), zap.Error(err))
}()
}
}
@@ -245,12 +242,12 @@ func (p *OrderBasedProxyStrategy) getNewProxy(ctx context.Context, proxyRequest
// 如果订单 ip设置使用次数大于1则获取使用次数少于指定次数的 ip
if proxyRequest.OrderPerIP >= 1 {
// 排除当前通道,允许跨通道复用代理
excludeChannels := []string{proxyRequest.Channel}
excludeChannels := proxyRequest.Channel
proxy, err := p.getUnderLimitedProxy(ctx, proxyRequest.OrderPerIP, excludeChannels)
if err == nil {
cacheKey := fmt.Sprintf("%s_%s", proxyRequest.Channel, proxyRequest.OrderNo)
cacheKey := fmt.Sprintf("%s:%s", proxyRequest.Channel, proxyRequest.OrderNo)
p.mu.Lock()
p.proxies[cacheKey] = append(p.proxies[cacheKey], &ProxyInfo{
p.proxies[cacheKey] = &ProxyInfo{
proxy: proxy,
expireAt: time.Now().Add(50 * time.Second),
authKey: p.authKey,
@@ -259,28 +256,27 @@ func (p *OrderBasedProxyStrategy) getNewProxy(ctx context.Context, proxyRequest
orderID: proxyRequest.OrderNo,
lastUsedAt: time.Now(),
usageCount: 1,
})
}
p.mu.Unlock()
return proxy, nil
}
}
proxies := make([]string, 0)
tmpProxies := make([]*ProxyInfo, 0)
var err error
for i := range 3 {
proxies, err = p.tryGetProxy(ctx)
if err != nil {
proxies, err2 := p.tryGetProxy(ctx)
if err2 != nil {
otelTrace.Logger.WithContext(ctx).Warn("获取代理失败,准备重试",
zap.Int("retryCount", i+1),
zap.Error(err))
zap.Error(err2))
time.Sleep(time.Second * 2)
lastErr = err
lastErr = err2
continue
}
for _, proxy := range proxies {
cacheKey := fmt.Sprintf("%s_%s", proxyRequest.Channel, proxyRequest.OrderNo)
p.mu.Lock()
proxyInfo := &ProxyInfo{
proxy: proxy,
@@ -292,12 +288,7 @@ func (p *OrderBasedProxyStrategy) getNewProxy(ctx context.Context, proxyRequest
lastUsedAt: time.Now(),
usageCount: 1,
}
_, exists := p.proxies[cacheKey]
if exists {
p.proxies[cacheKey] = append(p.proxies[cacheKey], proxyInfo)
} else {
p.proxies[cacheKey] = []*ProxyInfo{proxyInfo}
}
tmpProxies = append(tmpProxies, proxyInfo)
p.mu.Unlock()
otelTrace.Logger.WithContext(ctx).Info("获取新代理",
@@ -309,9 +300,13 @@ func (p *OrderBasedProxyStrategy) getNewProxy(ctx context.Context, proxyRequest
break
}
for _, proxy := range proxies {
if err2 := p.checkProxyAvailable(ctx, proxy); err2 == nil {
return proxy, nil
p.totalProxies = append(p.totalProxies, tmpProxies...)
for _, proxy := range tmpProxies {
if err2 := p.checkProxyAvailable(ctx, proxy.proxy); err2 == nil {
cacheKey := fmt.Sprintf("%s:%s", proxyRequest.Channel, proxyRequest.OrderNo)
p.proxies[cacheKey] = proxy
return proxy.proxy, nil
}
}
@@ -446,21 +441,13 @@ func (p *OrderBasedProxyStrategy) cleanupUnusedProxies() {
defer p.mu.Unlock()
now := time.Now()
unusedThreshold := now.Add(-1 * time.Minute)
maps.DeleteFunc(p.proxies, func(s string, info *ProxyInfo) bool {
return info.expireAt.Before(now)
})
for cacheKey, proxies := range p.proxies {
for _, proxy := range proxies {
// 清理超过10分钟未使用的代理
if proxy.lastUsedAt.Before(unusedThreshold) {
delete(p.proxies, cacheKey)
otelTrace.Logger.WithContext(context.Background()).Info("清理未使用的代理",
zap.String("cacheKey", cacheKey),
zap.String("channel", proxy.channel),
zap.String("orderID", proxy.orderID),
zap.Time("lastUsedAt", proxy.lastUsedAt))
}
}
}
slices.DeleteFunc(p.totalProxies, func(info *ProxyInfo) bool {
return info.expireAt.Before(now)
})
}
// Stop 停止清理协程

View File

@@ -9,10 +9,10 @@ import (
func TestGetProxy(t *testing.T) {
cache.Start()
StartProxyPool()
channel := []string{"channel_four", "channel_five", "channel_six"}
channel := []string{"channel_four", "channel_five", "channel_six", "channel_seven", "channel_eight", "channel_nine", "channel_ten", "channel_eleven", "channel_twelve", "channel_thirteen", "channel_fourteen", "channel_fifteen", "channel_sixteen", "channel_seventeen", "channel_eighteen", "channel_nineteen", "channel_twenty"}
for _, c := range channel {
proxy, _ := GetProxy(t.Context(), "2434534543", c)
proxy, _ := GetProxy(t.Context(), GenerateId(), c)
resp, err := resty.New().SetProxy(proxy).R().Get("https://www.qq.com")
t.Log(resp.StatusCode(), err)
t.Log(resp.StatusCode(), c, err)
}
}