Files
kami_gateway/internal/utils/proxy_pool.go
danial d5d681ea52 fix(proxy): 修复代理地址换行符问题并优化代理池初始化
- 将代理URL中的换行符由\r\n修改为\n,避免解析错误
- 代理相关配置文件及Dockerfile中统一调整换行符格式
- flyfishv2卡片发送模块设置正确的User-Agent头部
- 使用strutil.SplitAndTrim代替strings.Split优化代理IP列表处理
- 修正全局代理池单例初始化方式,确保线程安全
- 调整main.go中包引入顺序,提升代码规范性
2025-12-07 23:47:16 +08:00

603 lines
16 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package utils
import (
"context"
"fmt"
"io"
"maps"
"math/rand/v2"
"net/http"
"net/url"
"slices"
"strings"
"sync"
"time"
"gateway/internal/cache"
"gateway/internal/config"
"gateway/internal/otelTrace"
"github.com/duke-git/lancet/v2/netutil"
"github.com/duke-git/lancet/v2/slice"
"github.com/duke-git/lancet/v2/strutil"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)
// ProxyStrategy 代理策略接口
type ProxyStrategy interface {
GetProxy(ctx context.Context, proxyRequest ProxyRequest) (string, error)
GetHTTPClient(ctx context.Context, proxyRequest ProxyRequest) (*http.Client, error)
}
// ProxyInfo 代理信息
type ProxyInfo struct {
proxy string
expireAt time.Time
authKey string
authPwd string
channel string // 添加通道字段
orderID string
lastUsedAt time.Time // 添加最后使用时间字段
usageCount int // 添加使用次数字段
}
// ProxyRequest 代理请求信息
type ProxyRequest struct {
OrderNo string
Channel string // 添加通道字段
OrderPerIP int
}
// DefaultProxyStrategy 默认代理策略
type DefaultProxyStrategy struct {
proxyURL string
authKey string
authPwd string
mu sync.RWMutex
current *ProxyInfo
}
// OrderBasedProxyStrategy 基于订单号和通道的代理策略
type OrderBasedProxyStrategy struct {
Id string
authKey string
proxyURL string
authPwd string
OrderPerIP int
mu sync.RWMutex
proxies map[string][]*ProxyInfo // key: channel_orderID
stopCh chan struct{} // 用于停止清理协程
}
// NewDefaultProxyStrategy 创建默认代理策略
func NewDefaultProxyStrategy(proxyURL, authKey, authPwd string) *DefaultProxyStrategy {
return &DefaultProxyStrategy{
proxyURL: proxyURL,
authKey: authKey,
authPwd: authPwd,
}
}
// NewOrderBasedProxyStrategy 创建基于订单号和通道的代理策略
func NewOrderBasedProxyStrategy(proxyURL, authKey, authPwd string) *OrderBasedProxyStrategy {
strategy := &OrderBasedProxyStrategy{
proxyURL: proxyURL,
authKey: authKey,
authPwd: authPwd,
proxies: make(map[string][]*ProxyInfo),
stopCh: make(chan struct{}),
}
// 启动清理协程
go strategy.startCleanupRoutine()
return strategy
}
// GetProxy 获取代理(默认策略)
func (p *DefaultProxyStrategy) GetProxy(ctx context.Context, _ ProxyRequest) (string, error) {
proxy, err := p.innerGetProxy(ctx, ProxyRequest{})
if err != nil {
return "", err
}
return fmt.Sprintf("http://%s:%s@%s", p.authKey, p.authPwd, proxy), nil
}
// GetProxy 获取代理(默认策略)
func (p *DefaultProxyStrategy) innerGetProxy(ctx context.Context, _ ProxyRequest) (string, error) {
p.mu.RLock()
if p.current != nil && time.Now().Before(p.current.expireAt) {
if err := p.checkProxyAvailable(ctx, p.current.proxy); err == nil {
proxy := p.current.proxy
p.mu.RUnlock()
otelTrace.Logger.WithContext(ctx).Info("使用缓存代理", zap.String("proxy", proxy))
return proxy, nil
}
}
p.mu.RUnlock()
return p.getNewProxy(ctx)
}
// GetProxy 获取代理(基于订单号和通道策略)
func (p *OrderBasedProxyStrategy) GetProxy(ctx context.Context, proxyRequest ProxyRequest) (string, error) {
proxy, err := p.getInnerProxy(ctx, proxyRequest)
if err != nil {
return "", err
}
return fmt.Sprintf("socks5://%s:%s@%s", p.authKey, p.authPwd, proxy), nil
}
// GetProxy 获取代理(基于订单号和通道策略)
func (p *OrderBasedProxyStrategy) getInnerProxy(ctx context.Context, proxyRequest ProxyRequest) (string, error) {
if proxyRequest.OrderPerIP == 0 {
proxyRequest.OrderPerIP = 1
}
// 生成代理缓存键channel_orderID
cacheKey := fmt.Sprintf("%s_%s", proxyRequest.Channel, proxyRequest.OrderNo)
p.mu.RLock()
if proxies, exists := p.proxies[cacheKey]; exists {
p.mu.RUnlock()
for _, proxy := range proxies {
if err := p.checkProxyAvailable(ctx, proxy.proxy); err == nil {
// 更新最后使用时间和使用次数
p.mu.Lock()
proxy.lastUsedAt = time.Now()
proxy.usageCount++
p.mu.Unlock()
otelTrace.Logger.WithContext(ctx).Info("使用订单缓存代理",
zap.String("proxy", proxy.proxy),
zap.String("orderID", proxyRequest.OrderNo),
zap.String("channel", proxyRequest.Channel),
zap.Int("usageCount", proxy.usageCount))
return proxy.proxy, nil
}
}
} else {
p.mu.RUnlock()
}
return p.getNewProxy(ctx, proxyRequest)
}
// getNewProxy 获取新代理(默认策略)
func (p *DefaultProxyStrategy) getNewProxy(ctx context.Context) (string, error) {
var lastErr error
for i := range 3 {
proxy, err := p.tryGetProxy(ctx)
if err == nil {
p.mu.Lock()
p.current = &ProxyInfo{
proxy: proxy,
expireAt: time.Now().Add(50 * time.Second),
authKey: p.authKey,
authPwd: p.authPwd,
}
p.mu.Unlock()
otelTrace.Logger.WithContext(ctx).Info("获取新代理", zap.String("proxy", proxy))
return proxy, nil
}
lastErr = err
otelTrace.Logger.WithContext(ctx).Warn("获取代理失败,准备重试",
zap.Int("重试次数", i+1),
zap.Error(err))
select {
case <-ctx.Done():
return "", ctx.Err()
case <-time.After(time.Second * time.Duration(i+1)):
continue
}
}
return "", fmt.Errorf("获取代理失败已重试3次: %v", lastErr)
}
// 查找使用次数少于指定次数的 ip跨通道查找
func (p *OrderBasedProxyStrategy) getUnderLimitedProxy(ctx context.Context, perIP int, excludeChannels []string) (string, error) {
var userCount []string
p.mu.RLock()
for _, proxies := range p.proxies {
for _, proxy := range proxies {
if proxy == nil {
continue
}
// 排除指定通道的代理
excluded := slices.Contains(excludeChannels, proxy.channel)
if !excluded {
userCount = append(userCount, proxy.proxy)
}
}
}
p.mu.RUnlock()
userCount = slice.Unique(userCount)
rand.Shuffle(len(userCount), func(i, j int) {
userCount[i], userCount[j] = userCount[j], userCount[i]
})
for _, proxy := range userCount {
if err := p.checkProxyAvailable(ctx, proxy); err == nil {
return proxy, nil
} else {
// 清理不可用的代理
func() {
p.mu.Lock()
defer p.mu.Unlock()
for index, infos := range p.proxies {
p.proxies[index] = slices.DeleteFunc(infos, func(info *ProxyInfo) bool {
return (info != nil && info.proxy == proxy) || info == nil
})
}
maps.DeleteFunc(p.proxies, func(key string, value []*ProxyInfo) bool {
return len(value) == 0
})
otelTrace.Logger.WithContext(ctx).Warn("代理IP不可用", zap.String("proxy", proxy), zap.Error(err))
}()
}
}
return "", fmt.Errorf("没有找到使用次数少于指定次数的 ip")
}
// getNewProxy 获取新代理(基于订单号和通道策略)
func (p *OrderBasedProxyStrategy) getNewProxy(ctx context.Context, proxyRequest ProxyRequest) (string, error) {
var lastErr error
// 如果订单 ip设置使用次数大于1则获取使用次数少于指定次数的 ip
if proxyRequest.OrderPerIP >= 1 {
// 排除当前通道,允许跨通道复用代理
excludeChannels := []string{proxyRequest.Channel}
proxy, err := p.getUnderLimitedProxy(ctx, proxyRequest.OrderPerIP, excludeChannels)
if err == nil {
cacheKey := fmt.Sprintf("%s_%s", proxyRequest.Channel, proxyRequest.OrderNo)
p.mu.Lock()
p.proxies[cacheKey] = append(p.proxies[cacheKey], &ProxyInfo{
proxy: proxy,
expireAt: time.Now().Add(50 * time.Second),
authKey: p.authKey,
authPwd: p.authPwd,
channel: proxyRequest.Channel,
orderID: proxyRequest.OrderNo,
lastUsedAt: time.Now(),
usageCount: 1,
})
p.mu.Unlock()
return proxy, nil
}
}
proxies := make([]string, 0)
var err error
for i := range 3 {
proxies, err = p.tryGetProxy(ctx)
if err != nil {
otelTrace.Logger.WithContext(ctx).Warn("获取代理失败,准备重试",
zap.Int("retryCount", i+1),
zap.Error(err))
time.Sleep(time.Second * 2)
lastErr = err
continue
}
for _, proxy := range proxies {
cacheKey := fmt.Sprintf("%s_%s", proxyRequest.Channel, proxyRequest.OrderNo)
p.mu.Lock()
proxyInfo := &ProxyInfo{
proxy: proxy,
expireAt: time.Now().Add(55 * time.Second),
authKey: p.authKey,
authPwd: p.authPwd,
channel: proxyRequest.Channel,
orderID: proxyRequest.OrderNo,
lastUsedAt: time.Now(),
usageCount: 1,
}
_, exists := p.proxies[cacheKey]
if exists {
p.proxies[cacheKey] = append(p.proxies[cacheKey], proxyInfo)
} else {
p.proxies[cacheKey] = []*ProxyInfo{proxyInfo}
}
p.mu.Unlock()
otelTrace.Logger.WithContext(ctx).Info("获取新代理",
zap.String("proxy", proxy),
zap.String("orderID", proxyRequest.OrderNo),
zap.String("channel", proxyRequest.Channel))
}
lastErr = err
break
}
for _, proxy := range proxies {
if err2 := p.checkProxyAvailable(ctx, proxy); err2 == nil {
return proxy, nil
}
}
return "", fmt.Errorf("获取代理失败已重试2次: %v", lastErr)
}
// GetHTTPClient 获取HTTP客户端默认策略
func (p *DefaultProxyStrategy) GetHTTPClient(ctx context.Context, _ ProxyRequest) (*http.Client, error) {
proxy, err := p.GetProxy(ctx, ProxyRequest{})
if err != nil {
return nil, err
}
return p.createHTTPClient(proxy)
}
// GetHTTPClient 获取HTTP客户端基于订单号和通道策略
func (p *OrderBasedProxyStrategy) GetHTTPClient(ctx context.Context, proxyRequest ProxyRequest) (*http.Client, error) {
proxy, err := p.GetProxy(ctx, proxyRequest)
if err != nil {
return nil, err
}
return p.createHTTPClient(proxy)
}
// createHTTPClient 创建HTTP客户端
func (p *DefaultProxyStrategy) createHTTPClient(proxy string) (*http.Client, error) {
proxyURL, err := url.Parse(proxy)
if err != nil {
return nil, fmt.Errorf("解析代理URL失败: %v", err)
}
transport := &http.Transport{
Proxy: http.ProxyURL(proxyURL),
}
return &http.Client{
Transport: transport,
Timeout: 30 * time.Second,
}, nil
}
// createHTTPClient 创建HTTP客户端
func (p *OrderBasedProxyStrategy) createHTTPClient(proxy string) (*http.Client, error) {
proxyURL, err := url.Parse(proxy)
if err != nil {
return nil, fmt.Errorf("解析代理URL失败: %v", err)
}
return &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyURL(proxyURL),
},
Timeout: 2 * time.Second,
}, nil
}
// tryGetProxy 尝试获取代理IP
func (p *DefaultProxyStrategy) tryGetProxy(ctx context.Context) (string, error) {
client := &http.Client{
Transport: otelhttp.NewTransport(http.DefaultTransport),
Timeout: 2 * time.Second,
}
encodedUrl, err := netutil.EncodeUrl(p.proxyURL)
if err != nil {
return "", fmt.Errorf("编码代理URL失败: %v", err)
}
req, err := http.NewRequestWithContext(ctx, "GET", encodedUrl, nil)
if err != nil {
return "", fmt.Errorf("创建请求失败: %v", err)
}
resp, err := client.Do(req)
if err != nil {
return "", fmt.Errorf("请求代理服务失败: %v", err)
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return "", fmt.Errorf("读取响应失败: %v", err)
}
proxyIP := strings.TrimSpace(string(body))
otelTrace.Logger.WithContext(ctx).Info("获取代理IP", zap.String("proxyIP", proxyIP))
if proxyIP == "" {
return "", fmt.Errorf("获取代理IP失败")
}
if err = p.checkProxyAvailable(ctx, proxyIP); err != nil {
return "", fmt.Errorf("代理IP不可用: %v", err)
}
return proxyIP, nil
}
// tryGetProxy 尝试获取代理IP
func (p *OrderBasedProxyStrategy) tryGetProxy(ctx context.Context) ([]string, error) {
client := &http.Client{
Timeout: 5 * time.Second,
}
proxyURL, err := netutil.EncodeUrl(p.proxyURL)
if err != nil {
return []string{}, fmt.Errorf("解析代理URL失败: %v", err)
}
req, err := http.NewRequestWithContext(ctx, "GET", proxyURL, nil)
if err != nil {
return []string{}, fmt.Errorf("创建请求失败: %v", err)
}
resp, err := client.Do(req)
if err != nil {
return []string{}, fmt.Errorf("请求代理服务失败: %v", err)
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return []string{}, fmt.Errorf("读取响应失败: %v", err)
}
proxyIP := strings.TrimSpace(string(body))
otelTrace.Logger.WithContext(ctx).Info("获取代理IP", zap.String("proxyIP", proxyIP))
if proxyIP == "" {
return []string{}, fmt.Errorf("获取代理IP失败")
}
proxyIPs := strutil.SplitAndTrim(proxyIP, "\n")
slice.ForEach(proxyIPs, func(index int, item string) {
proxyIPs[index] = strutil.Trim(item)
})
return proxyIPs, nil
}
// checkProxyAvailable 检查代理IP是否可用
func (p *DefaultProxyStrategy) checkProxyAvailable(ctx context.Context, proxyIP string) error {
proxyURL, err := url.Parse(fmt.Sprintf("socks5://%s:%s@%s", p.authKey, p.authPwd, proxyIP))
if err != nil {
return fmt.Errorf("解析代理URL失败: %v", err)
}
transport := &http.Transport{
Proxy: http.ProxyURL(proxyURL),
}
client := &http.Client{
Transport: transport,
Timeout: 2 * time.Second,
}
req, err := http.NewRequestWithContext(ctx, "GET", "https://www.baidu.com", nil)
if err != nil {
return fmt.Errorf("创建测试请求失败: %v", err)
}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("代理连接测试失败: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("代理响应状态码异常: %d", resp.StatusCode)
}
return nil
}
// checkProxyAvailable 检查代理IP是否可用
func (p *OrderBasedProxyStrategy) checkProxyAvailable(ctx context.Context, proxyIP string) error {
proxyURL, err := url.Parse(fmt.Sprintf("socks5://%s:%s@%s", p.authKey, p.authPwd, proxyIP))
if err != nil {
return fmt.Errorf("解析代理URL失败: %v", err)
}
client := http.Client{
Transport: &http.Transport{
Proxy: http.ProxyURL(proxyURL),
},
Timeout: 2 * time.Second,
}
req, err := http.NewRequestWithContext(ctx, "GET", "https://www.baidu.com", nil)
if err != nil {
return fmt.Errorf("创建测试请求失败: %v", err)
}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("代理连接测试失败: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("代理响应状态码异常: %d", resp.StatusCode)
}
return nil
}
// startCleanupRoutine 启动清理协程
func (p *OrderBasedProxyStrategy) startCleanupRoutine() {
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
p.cleanupUnusedProxies()
case <-p.stopCh:
return
}
}
}
// cleanupUnusedProxies 清理未使用的代理
func (p *OrderBasedProxyStrategy) cleanupUnusedProxies() {
p.mu.Lock()
defer p.mu.Unlock()
now := time.Now()
unusedThreshold := now.Add(-1 * time.Minute)
for cacheKey, proxies := range p.proxies {
for _, proxy := range proxies {
// 清理超过10分钟未使用的代理
if proxy.lastUsedAt.Before(unusedThreshold) {
delete(p.proxies, cacheKey)
otelTrace.Logger.WithContext(context.Background()).Info("清理未使用的代理",
zap.String("cacheKey", cacheKey),
zap.String("channel", proxy.channel),
zap.String("orderID", proxy.orderID),
zap.Time("lastUsedAt", proxy.lastUsedAt))
}
}
}
}
// Stop 停止清理协程
func (p *OrderBasedProxyStrategy) Stop() {
close(p.stopCh)
}
var OrderBasedProxyStrategyInstance *OrderBasedProxyStrategy
var DMProxyStrategyInstance *DMProxyStrategy
func StartProxyPool() {
proxyConfig := config.GetProxyInfo()
otelTrace.Logger.WithContext(context.Background()).Info("proxyConfig", zap.Any("proxyConfig", proxyConfig))
//// 初始化订单代理策略
OrderBasedProxyStrategyInstance = NewOrderBasedProxyStrategy(
proxyConfig.Url,
proxyConfig.AuthKey,
proxyConfig.AuthPwd,
)
// 初始化大漠代理策略
DMProxyStrategyInstance = NewDMProxyStrategy(
"http://need1.dmdaili.com:7771/dmgetip.asp?apikey=42bab0ac&pwd=1b567c2f286a08e391b5805565fa0882&getnum=20&httptype=1&geshi=&fenge=1&fengefu=&operate=all",
cache.GetRedisClient().Client,
)
}
func GetProxy(ctx context.Context, orderId string, channel string) (string, error) {
ctx, span := otelTrace.Span(ctx, "GetProxy", "GetProxy", trace.WithAttributes(
attribute.String("orderId", orderId),
attribute.String("channel", channel),
))
defer span.End()
proxyConfig := config.GetProxy()
if proxyConfig == "dm" {
return DMProxyStrategyInstance.GetProxy(ctx, ProxyRequest{
OrderNo: orderId,
Channel: channel,
OrderPerIP: 1,
})
}
return OrderBasedProxyStrategyInstance.GetProxy(ctx, ProxyRequest{
OrderNo: orderId,
Channel: channel,
OrderPerIP: 1,
})
}