更新代理池实现,优化代理管理逻辑

- 在 proxy_pool.py 中引入了代理池类型和工厂类,支持不同类型的代理池
- 修改 WalMartSpiderV3 类,使用工厂类获取代理池实例
- 在 config.py 中添加获取代理类型的方法
- 更新 .gitignore 文件,增加忽略项
- 优化 Dockerfile,简化项目复制和工作目录设置
This commit is contained in:
danial
2025-04-30 12:07:25 +08:00
parent 0fee89a585
commit 8a21c45d19
5 changed files with 321 additions and 101 deletions

10
.gitignore vendored
View File

@@ -1,2 +1,10 @@
/.vscode/
/.idea/
/.idea/
.DS_Store
/__pycache__/
*.pyc
*.pyo
*.pyd
*.pyw
*.pyz
*.pywz

View File

@@ -1,21 +1,18 @@
# 基于 Python 3.8.6 镜像
FROM python:3.8.6
# 设置工作目录
WORKDIR /app
# 复制项目
COPY . .
# 修改apt-get源地址为阿里云镜像
RUN echo "" > /etc/apt/sources.list && \
echo "deb http://mirrors.aliyun.com/debian buster main" >> /etc/apt/sources.list && \
echo "deb http://mirrors.aliyun.com/debian-security buster/updates main" >> /etc/apt/sources.list && \
echo "deb http://mirrors.aliyun.com/debian buster-updates main" >> /etc/apt/sources.list
# 复制项目
ADD . /app
# 设置工作目录
WORKDIR /app
# 暴露容器端口
EXPOSE 5007
# 安装cv2依赖和其他依赖
RUN apt-get update && apt-get install -y libgl1-mesa-glx libglib2.0-0 curl
@@ -29,7 +26,10 @@ ENV PROXY_PASS=""
# 添加健康检查
HEALTHCHECK --interval=30s --timeout=10s --start-period=30s --retries=3 \
CMD curl -f http://localhost:5007/health || exit 1
CMD curl -f http://localhost:5007/health || exit 1
# 运行应用
# 暴露容器端口
EXPOSE 5007
CMD ["gunicorn", "-c", "gun.conf", "app:app"]

29
app.py
View File

@@ -10,9 +10,11 @@ import trace
from pathlib import Path
import PIL
from config import Config
import cv2
import requests
import traceback
from proxy_pool import ProxyPoolFactory
import numpy as np
@@ -299,15 +301,7 @@ class WalMartSpiderV3:
self.session = requests.Session()
# 初始化代理池
from proxy_pool import ProxyPool
current_os = platform.system()
logger.info(f"当前系统:{current_os}")
if current_os == "Linux":
redis_url = "redis://:jd2024@10.0.0.211:6379/0"
else:
redis_url = "redis://:jd2024@120.79.27.250:6379/0"
self.proxy_pool = ProxyPool(redis_url)
self.proxy_pool = ProxyPoolFactory.get_proxy_pool(Config.get_proxy_type())
self.proxies = {}
# 请求超时时间
@@ -414,7 +408,9 @@ class WalMartSpiderV3:
# 记录异常
add_span_attribute(span, "error", str(e))
span.record_exception(e)
# 切换代理
if self.proxies:
self.proxy_pool.mark_proxy_invalid(self.proxies["http"])
# 记录度量
if meter:
record_metric(
@@ -485,7 +481,9 @@ class WalMartSpiderV3:
# 记录异常
add_span_attribute(span, "error", str(e))
span.record_exception(e)
# 切换代理
if self.proxies:
self.proxy_pool.mark_proxy_invalid(self.proxies["http"])
# 记录度量
if meter:
record_metric(
@@ -575,7 +573,9 @@ class WalMartSpiderV3:
# 记录异常
add_span_attribute(span, "error", str(e))
span.record_exception(e)
# 切换代理
if self.proxies:
self.proxy_pool.mark_proxy_invalid(self.proxies["http"])
# 记录度量
if meter:
record_metric(
@@ -663,6 +663,9 @@ class WalMartSpiderV3:
return result
except Exception as e:
# 切换代理
if self.proxies:
self.proxy_pool.mark_proxy_invalid(self.proxies["http"])
# 记录异常
add_span_attribute(span, "error", str(e))
span.record_exception(e)
@@ -717,6 +720,8 @@ class WalMartSpiderV3:
# 记录异常
add_span_attribute(span, "error", str(e))
span.record_exception(e)
if self.proxies:
self.proxy_pool.mark_proxy_invalid(self.proxies["http"])
# 记录度量
if meter:

View File

@@ -1,17 +1,26 @@
import os
from proxy_pool import ProxyPoolType
class Config:
@staticmethod
def get_proxy_user() -> str:
env = os.getenv("PROXY_USER", "AZ7L3BH2")
env = os.getenv("PROXY_USER", "6CF4CD53")
if not env:
env = "AZ7L3BH2"
env = "6CF4CD53"
return env
@staticmethod
def get_proxy_pass() -> str:
env = os.getenv("PROXY_PASS", "EB4E494E55E8")
env = os.getenv("PROXY_PASS", "0E03825E822B")
if not env:
env = "EB4E494E55E8"
env = "0E03825E822B"
return env
@staticmethod
def get_proxy_type() -> ProxyPoolType:
env = os.getenv("PROXY_TYPE", "default")
if not env:
env = ProxyPoolType.DEFAULT
return ProxyPoolType(env)

View File

@@ -1,94 +1,55 @@
import logging
import threading
from typing import Optional, Dict
import time
from typing import Optional, Dict, Tuple
import urllib3
from logger import get_logger
from abc import ABC, abstractmethod
from enum import Enum
import requests
from config import Config
logger = logging.getLogger(__name__)
# 禁用 HTTPS 请求的警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
logger = get_logger()
class ProxyPool:
def __init__(self, redis_url: str = None):
"""初始化代理池
Args:
redis_url: 已废弃参数,保留仅为兼容性
"""
self.order_proxy_map: Dict[str, str] = {}
class ProxyPoolType(Enum):
"""代理池类型枚举"""
DEFAULT = "default" # 默认代理池
EXPIRING = "expiring" # 带有效期的代理池
class BaseProxyPool(ABC):
"""代理池抽象基类"""
def __init__(self):
self.proxy_timeout = 3 # 代理超时时间,单位秒
self.test_url = "https://www.baidu.com"
self.max_retries = 3 # 最大重试次数
self.lock = threading.Lock() # 添加线程锁以确保并发安全
def get_proxy(self, order_id: str) -> Optional[str]:
"""获取指定订单的代理
Args:
order_id: 订单ID
Returns:
代理地址,格式为 http://user:pass@ip:port如果获取失败则返回None
"""
with self.lock: # 使用线程锁确保并发安全
# 检查是否已有分配的代理
if order_id in self.order_proxy_map:
proxy = self.order_proxy_map[order_id]
if self._validate_proxy_with_auth(proxy):
return proxy
@abstractmethod
def get_proxy(self, *args, **kwargs) -> Optional[str]:
"""获取代理的抽象方法"""
pass
# 获取新代理
try:
proxy = self._get_new_proxy()
self.order_proxy_map[order_id] = proxy
return proxy
except Exception as e:
logger.error(f"获取代理失败: {str(e)}")
return None
def release_proxy(self, order_id: str):
"""释放指定订单的代理
Args:
order_id: 订单ID
"""
with self.lock: # 使用线程锁确保并发安全
if order_id in self.order_proxy_map:
del self.order_proxy_map[order_id]
@abstractmethod
def release_proxy(self, *args, **kwargs):
"""释放代理的抽象方法"""
pass
@abstractmethod
def _get_new_proxy(self) -> Optional[str]:
"""获取新的代理最多尝试3次
Returns:
代理地址,格式为 http://user:pass@ip:port
Raises:
Exception: 连续3次获取代理失败时抛出异常
"""
max_attempts = 3 # 最大尝试次数
"""获取新的代理的抽象方法"""
pass
for attempt in range(max_attempts):
try:
res = requests.get(
f"https://overseas.proxy.qg.net/get?key={Config.get_proxy_user()}&num=1&area=&isp=&format=txt&seq=\n&distinct=false",
timeout=self.proxy_timeout,
)
ip = res.text.strip()
if ip and self._validate_proxy_with_auth(ip):
# 构建带认证信息的代理地址
proxyMeta = "http://%(user)s:%(pass)s@%(host)s" % {
"host": ip,
"user": Config.get_proxy_user(),
"pass": Config.get_proxy_pass(),
}
return proxyMeta
logger.warning(
f"获取代理失败或代理验证失败,尝试次数: {attempt + 1}/{max_attempts}"
)
except Exception as e:
logger.error(
f"获取代理出错: {str(e)},尝试次数: {attempt + 1}/{max_attempts}"
)
# 直接进行下一次尝试,不等待
# 所有尝试都失败,抛出异常
raise Exception(f"连续{max_attempts}次获取代理失败")
@abstractmethod
def mark_proxy_invalid(self, proxy: str):
"""标记代理为无效的抽象方法"""
pass
def _validate_proxy_with_auth(self, proxy: str) -> bool:
"""验证带认证信息的代理是否可用
@@ -104,7 +65,6 @@ class ProxyPool:
# 如果是纯IP地址添加认证信息
proxyMeta = "http://%(user)s:%(pass)s@%(host)s" % {
"host": proxy,
# 下值修改为订单中的用户名
"user": Config.get_proxy_user(),
"pass": Config.get_proxy_pass(),
}
@@ -130,6 +90,91 @@ class ProxyPool:
)
return False
class DefaultProxyPool(BaseProxyPool):
"""默认代理池实现"""
def __init__(self):
super().__init__()
self.order_proxy_map: Dict[str, str] = {}
self.proxy_api_url = "https://overseas.proxy.qg.net/get"
def get_proxy(self, order_id: str = "") -> Optional[str]:
"""获取指定订单的代理
Args:
order_id: 订单ID
Returns:
代理地址,格式为 http://user:pass@ip:port如果获取失败则返回None
"""
with self.lock: # 使用线程锁确保并发安全
# 检查是否已有分配的代理
if order_id in self.order_proxy_map:
proxy = self.order_proxy_map[order_id]
if self._validate_proxy_with_auth(proxy):
return proxy
# 获取新代理
try:
proxy = self._get_new_proxy()
if proxy is not None:
self.order_proxy_map[order_id] = proxy
return proxy
except Exception as e:
logger.error(f"获取代理失败: {str(e)}")
return None
def release_proxy(self, order_id: str):
"""释放指定订单的代理
Args:
order_id: 订单ID
"""
with self.lock: # 使用线程锁确保并发安全
if order_id in self.order_proxy_map:
del self.order_proxy_map[order_id]
def mark_proxy_invalid(self, proxy: str):
"""标记代理为无效
Args:
proxy: 要标记为无效的代理地址
"""
with self.lock:
if proxy in self.order_proxy_map:
del self.order_proxy_map[proxy]
def _get_new_proxy(self) -> Optional[str]:
"""获取新的代理最多尝试3次
Returns:
代理地址,格式为 http://user:pass@ip:port
Raises:
Exception: 连续3次获取代理失败时抛出异常
"""
max_attempts = 5 # 最大尝试次数
for attempt in range(max_attempts):
try:
res = requests.get(
f"{self.proxy_api_url}?key={Config.get_proxy_user()}&num=1&area=&isp=&format=txt&seq=\n&distinct=false",
timeout=self.proxy_timeout,
)
ip = res.text.strip()
if ip and self._validate_proxy_with_auth(ip):
# 构建带认证信息的代理地址
proxyMeta = "http://%(user)s:%(pass)s@%(host)s" % {
"host": ip,
"user": Config.get_proxy_user(),
"pass": Config.get_proxy_pass(),
}
return proxyMeta
logger.warning(
f"获取代理失败或代理验证失败,尝试次数: {attempt + 1}/{max_attempts}"
)
except Exception as e:
logger.error(
f"获取代理出错: {str(e)},尝试次数: {attempt + 1}/{max_attempts}"
)
# 所有尝试都失败,抛出异常
raise Exception(f"连续{max_attempts}次获取代理失败")
def get_all_proxies(self) -> Dict[str, str]:
"""获取所有订单的代理映射
Returns:
@@ -137,3 +182,156 @@ class ProxyPool:
"""
with self.lock: # 使用线程锁确保并发安全
return self.order_proxy_map.copy()
class ExpiringProxyPool(BaseProxyPool):
"""带有效期的代理池实现"""
def __init__(self, expire_time: int = 60):
super().__init__()
self.current_proxy: Optional[Tuple[str, float]] = None # 当前代理及其过期时间
self.expire_time = expire_time # 代理有效期
self.invalid_proxies: set = set() # 存储无效的代理
self.proxy_api_url = "https://share.proxy.qg.net/get"
def get_proxy(self) -> Optional[str]:
"""获取当前有效的代理
Returns:
代理地址,格式为 http://user:pass@ip:port如果获取失败则返回None
"""
with self.lock: # 使用线程锁确保并发安全
# 检查当前代理是否有效且未过期
if self.current_proxy:
proxy, expire_time = self.current_proxy
# 如果代理未过期且不在无效列表中,则返回
if time.time() < expire_time and proxy not in self.invalid_proxies and self._validate_proxy_with_auth(proxy):
return proxy
# 获取新代理
try:
proxy = self._get_new_proxy()
if proxy:
# 设置代理过期时间
expire_time = time.time() + self.expire_time
self.current_proxy = (proxy, expire_time)
return proxy
except Exception as e:
logger.error(f"获取代理失败: {str(e)}")
return None
return None
def release_proxy(self, *args, **kwargs):
"""释放代理(此实现不需要)"""
pass
def _get_new_proxy(self) -> Optional[str]:
"""获取新的代理最多尝试3次
Returns:
代理地址,格式为 http://user:pass@ip:port
Raises:
Exception: 连续3次获取代理失败时抛出异常
"""
max_attempts = 5 # 最大尝试次数
for attempt in range(max_attempts):
try:
res = requests.get(
self.proxy_api_url,
params={
"key": Config.get_proxy_user(),
"num": 1,
"area": "",
"isp": 0,
"format": "txt",
"seq": "\r\n",
"distinct": "false",
},
timeout=self.proxy_timeout,
)
ip = res.text.strip()
if ip and self._validate_proxy_with_auth(ip):
# 构建带认证信息的代理地址
proxyMeta = "http://%(user)s:%(pass)s@%(host)s" % {
"host": ip,
"user": Config.get_proxy_user(),
"pass": Config.get_proxy_pass(),
}
# 检查新获取的代理是否在无效列表中
if proxyMeta not in self.invalid_proxies:
return proxyMeta
logger.warning(f"获取的代理 {proxyMeta} 在无效列表中,继续尝试获取新代理")
logger.warning(
f"获取代理失败或代理验证失败,尝试次数: {attempt + 1}/{max_attempts}"
)
except Exception as e:
logger.error(
f"获取代理出错: {str(e)},尝试次数: {attempt + 1}/{max_attempts}"
)
# 所有尝试都失败,抛出异常
raise Exception(f"连续{max_attempts}次获取代理失败")
def mark_proxy_invalid(self, proxy: str):
"""标记代理为无效
Args:
proxy: 要标记为无效的代理地址
"""
with self.lock:
self.invalid_proxies.add(proxy)
# 如果当前代理被标记为无效,立即清除
if self.current_proxy and self.current_proxy[0] == proxy:
self.current_proxy = None
logger.info(f"代理 {proxy} 已被标记为无效")
def clear_invalid_proxies(self):
"""清除无效代理列表
用于定期清理无效代理列表,避免内存占用过大
"""
with self.lock:
self.invalid_proxies.clear()
logger.info("无效代理列表已清除")
def set_expire_time(self, seconds: int):
"""设置代理有效期
Args:
seconds: 代理有效期,单位秒
"""
with self.lock:
self.expire_time = seconds
class ProxyPoolFactory:
"""代理池工厂类"""
_instances: Dict[ProxyPoolType, BaseProxyPool] = {}
_lock = threading.Lock()
@classmethod
def get_proxy_pool(cls, pool_type: ProxyPoolType = ProxyPoolType.DEFAULT, **kwargs) -> BaseProxyPool:
"""获取代理池实例
Args:
pool_type: 代理池类型
**kwargs: 代理池初始化参数
Returns:
BaseProxyPool: 代理池实例
"""
with cls._lock:
if pool_type not in cls._instances:
if pool_type == ProxyPoolType.DEFAULT:
cls._instances[pool_type] = DefaultProxyPool()
elif pool_type == ProxyPoolType.EXPIRING:
expire_time = kwargs.get('expire_time', 60)
cls._instances[pool_type] = ExpiringProxyPool(expire_time=expire_time)
return cls._instances[pool_type]
# 使用示例
if __name__ == "__main__":
# 获取默认代理池
# default_pool = ProxyPoolFactory.get_proxy_pool()
# print(default_pool.get_proxy("test_order"))
# 获取带有效期的代理池
expiring_pool = ProxyPoolFactory.get_proxy_pool(
ProxyPoolType.EXPIRING,
expire_time=60
)
print(expiring_pool.get_proxy())