Files
kami_walmart_slide/app.py
danial 4f2b08ffd1 重构代理池实现,优化模块导入和结构
- 在 app.py 中更新代理池工厂的导入路径
- 在 config.py 中修改代理池类型的导入路径
- 新增 enums.py 文件,定义代理池类型枚举
- 新增 proxy_pool.py 文件,包含代理池的基本实现和逻辑
2025-04-30 14:22:39 +08:00

1341 lines
52 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import base64
import json
import random
import re
import time
import datetime
import os
from pathlib import Path
import PIL
from config import Config
import cv2
import requests
import traceback
from proxy_pool.proxy_pool import ProxyPoolFactory
import numpy as np
from flask import Flask, request
from flask_cors import CORS
from logger import get_logger
from spiders import BalanceSpider
from telemetry import (
setup_telemetry,
get_tracer,
add_span_attribute,
add_span_event,
record_metric,
)
# Flask application object shared by every route and hook in this module.
app = Flask(__name__)
# Enable cross-origin requests (CORS) for the application.
CORS(app)
# Module logger (original note: the log level can be adjusted as needed).
logger = get_logger()
# Initialise OpenTelemetry.
# The OTLP endpoint comes from the environment; it is None when the variable is unset.
otlp_endpoint = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT")
# Console export is on only when OTEL_CONSOLE_EXPORT equals "true" (case-insensitive).
enable_console_export = os.getenv("OTEL_CONSOLE_EXPORT", "False").lower() == "true"
# `meter` is checked for truthiness throughout this module before metrics are recorded.
tracer, meter = setup_telemetry(
    enable_console_export=enable_console_export, otlp_endpoint=otlp_endpoint
)
@app.before_request
def log_request_info():
    """Log the method, path and parameters of an incoming request.

    Requests to ``/health`` are skipped. For every other request the start
    time is stored on ``request.start_time`` so the ``after_request`` hook can
    report the elapsed time, and any query, form or JSON parameters are logged.

    :return: None, so Flask continues with normal request handling
    """
    # Skip logging for the health-check endpoint.
    if request.path == "/health":
        return
    # Remember when handling started.
    request.start_time = time.time()
    with get_tracer().start_as_current_span("request.log"):
        logger.info(f"请求开始: {request.method} {request.path}")
        params = {}
        if request.args:
            params["args"] = dict(request.args)
        if request.form:
            params["form"] = dict(request.form)
        # `request.json` aborts the request (415 / 400) when the body is not
        # valid JSON with a JSON content type; a logging hook must never do
        # that, so parse leniently and simply skip the body when it is not JSON.
        json_body = request.get_json(silent=True)
        if json_body:
            params["json"] = json_body
        if params:
            logger.info(f"请求参数: {params}")
@app.after_request
def log_response_info(response):
    """Log status, headers, JSON body and timing of an outgoing response.

    Responses for ``/health`` are passed through without logging.

    :param response: the Flask response about to be sent
    :return: the same response object, unmodified
    """
    if request.path != "/health":
        with get_tracer().start_as_current_span("response.log"):
            status = response.status_code
            logger.info(f"响应状态码: {status}")
            header_map = dict(response.headers)
            logger.info(f"响应头: {header_map}")
            # Only JSON bodies are logged; failures here must not break the response.
            try:
                if response.content_type == "application/json":
                    body_text = response.get_data(as_text=True)
                    logger.info(f"响应内容: {body_text}")
            except Exception as exc:
                logger.error(f"记录响应内容时出错: {exc!s}")
            # The start time is absent if the before_request hook did not set it.
            if hasattr(request, "start_time"):
                took = time.time() - request.start_time
                logger.info(f"请求处理时间: {took:.6f}")
            logger.info(f"请求结束: {request.method} {request.path}")
    return response
@app.route("/health", methods=["GET"])
def health_check():
    """Health-check endpoint intended for Kubernetes liveness/readiness probes.

    Performs one outbound HTTP GET (5 second timeout) to check external
    connectivity.

    :return: ``(status_dict, http_status)``; 200 when the check succeeds,
        503 with status ``"degraded"`` when it raises
    """
    checked_at = datetime.datetime.now().isoformat()
    # Simple outbound connectivity probe.
    try:
        requests.get("https://www.baidu.com", timeout=5)
    except Exception as exc:
        overall = "degraded"
        connectivity = {
            "status": "error",
            "message": f"外部网络连接异常: {str(exc)}",
        }
    else:
        overall = "ok"
        connectivity = {
            "status": "ok",
            "message": "外部网络连接正常",
        }
    health_status = {
        "status": overall,
        "timestamp": checked_at,
        "service": "walmart-card-service",
        "components": {"external_connectivity": connectivity},
    }
    if overall == "ok":
        return health_status, 200
    return health_status, 503
def imshow(img, winname="test", delay=0):
    """Show an image in an OpenCV window, wait for a key, then close all windows.

    :param img: image to display (cv2 image)
    :param winname: window title
    :param delay: value passed to ``cv2.waitKey``
    """
    cv2.imshow(winname, img)
    cv2.waitKey(delay)
    cv2.destroyAllWindows()
def pil_to_cv2(img):
    """
    Convert a PIL image into a cv2 image.

    :param img: PIL image, e.g. <type 'PIL.JpegImagePlugin.JpegImageFile'>
    :return: cv2 image, <type 'numpy.ndarray'>, with channels reordered RGB -> BGR
    """
    rgb_pixels = np.asarray(img)
    return cv2.cvtColor(rgb_pixels, cv2.COLOR_RGB2BGR)
def bytes_to_cv2(img):
    """
    Decode encoded image bytes into a cv2 image.

    :param img: binary image data, <type 'bytes'>
    :return: cv2 image, <type 'numpy.ndarray'>
    """
    # View the raw bytes as a flat uint8 array without copying.
    raw = np.frombuffer(img, dtype=np.uint8)
    # Decode the in-memory buffer into an image matrix (flag 1 is passed to imdecode).
    decoded = cv2.imdecode(raw, 1)
    logger.info("bytes_to_cv2: 转换二进制图片为cv2图像完成")
    return decoded
def cv2_open(img, flag=None):
    """
    Normalise any supported image input into a cv2 image (<numpy.ndarray>).

    :param img: <type 'bytes'/'numpy.ndarray'/'str'/'Path'/PIL image>
    :param flag: optional colour-space conversion code applied with
        ``cv2.cvtColor``, default: None
        eg: cv2.COLOR_BGR2GRAY for grayscale
    :return: cv2 image, <numpy.ndarray>
    :raises ValueError: when the input type is not one of the supported kinds
    """
    if isinstance(img, bytes):
        img = bytes_to_cv2(img)
    elif isinstance(img, (str, Path)):
        img = cv2.imread(str(img))
    elif not isinstance(img, np.ndarray):
        # `PIL.Image` is a module, not a class, so it cannot be used with
        # isinstance(); the image class is `PIL.Image.Image`. Import the
        # submodule explicitly because a bare `import PIL` does not load it.
        try:
            from PIL import Image as pil_image
        except ImportError:
            pil_image = None
        if pil_image is not None and isinstance(img, pil_image.Image):
            img = pil_to_cv2(img)
        else:
            raise ValueError(f"输入的图片类型无法解析: {type(img)}")
    # numpy arrays are already in the target format and pass through unchanged.
    if flag is not None:
        img = cv2.cvtColor(img, flag)
    return img
def get_distances(bg, tp, im_show=False, save_path=None):
    """
    Find the x position of the slider gap in a captcha background image.

    Both images are edge-detected and the slider template is matched against
    the background; the x coordinate of the best match is returned.

    :param bg: base64-encoded background image (decoded with ``base64.b64decode``)
    :param tp: base64-encoded slider/template image
    :param im_show: show the annotated result in a window, <type 'bool'>; default: False
    :param save_path: where to save the annotated result, <type 'str'/'Path'>;
        the distance is inserted before the suffix; default: None
    :return: x coordinate of the gap
    """
    with get_tracer().start_as_current_span("get_distances") as span:
        try:
            add_span_attribute(span, "im_show", im_show)
            if save_path:
                add_span_attribute(span, "save_path", str(save_path))
            bg = base64.b64decode(bg)
            tp = base64.b64decode(tp)
            # Load the images: background in colour, template as grayscale.
            background = cv2_open(bg)
            template_edges = cv2_open(tp, flag=cv2.COLOR_BGR2GRAY)
            # Pyramid mean-shift filtering of the background.
            add_span_event(span, "start_image_processing")
            smoothed = cv2.pyrMeanShiftFiltering(background, 5, 50)
            # Edge detection on both images.
            template_edges = cv2.Canny(template_edges, 255, 255)
            background_edges = cv2.Canny(smoothed, 255, 255)
            # Template matching.
            add_span_event(span, "start_template_matching")
            scores = cv2.matchTemplate(
                background_edges, template_edges, cv2.TM_CCOEFF_NORMED
            )
            # The location of the highest score is the best match.
            best_loc = cv2.minMaxLoc(scores)[3]
            distance = best_loc[0]
            add_span_attribute(span, "distance_result", distance)
            if save_path or im_show:
                # Size of the rectangle to draw.
                box_h, box_w = template_edges.shape[:2]
                left, top = best_loc
                right, bottom = left + box_w, top + box_h
                # Draw on a freshly decoded copy of the background.
                background = cv2_open(bg)
                cv2.rectangle(
                    background, (left, top), (right, bottom), (0, 0, 255), 2
                )
                # Save the annotated background.
                if save_path:
                    resolved = Path(save_path).resolve()
                    target = (
                        resolved.parent
                        / f"{resolved.stem}.{distance}{resolved.suffix}"
                    )
                    save_path = str(target)
                    cv2.imwrite(save_path, background)
                # Display the annotated background.
                if im_show:
                    imshow(background)
            if meter:
                record_metric(
                    "walmart.captcha.distance",
                    distance,
                    description="滑块验证码缺口距离",
                    attributes={"success": True},
                )
            return distance
        except Exception as exc:
            # Record the failure on the span and as a metric, then re-raise.
            add_span_attribute(span, "error", str(exc))
            span.record_exception(exc)
            if meter:
                record_metric(
                    "walmart.captcha.error",
                    1,
                    description="滑块验证码处理错误",
                    attributes={"error_type": type(exc).__name__},
                )
            raise
class WalMartSpiderV3:
    """
    Query a Walmart card balance through the upcard.com.cn web flow.

    https://www.upcard.com.cn:8091/chinaloyalty/walmart/qrybaltxn.html?link=next

    :meth:`run` drives the flow: obtain a captcha token, fetch the slider
    captcha, locate the gap, submit a synthetic drag track, then query the
    card using the returned captcha check id.
    """

    # Values returned by run() in place of a balance string.
    CODE_FAILED = 110
    CODE_CARD_NOT_EXIST = 105
    CODE_CARD_FORMAT_ERROR = 103

    _MONEY_PATTERN = r'<span class="money01">¥(.*?)</span>'
    _TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"

    def __init__(self, card_num):
        """
        :param card_num: card number to query
        """
        self.card_num = card_num
        # User agent with a randomised Chrome major version (100-121).
        self.user_agent = "Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Mobile Safari/537.36".replace(
            "121", str(random.randint(100, 121))
        )
        # Both are set at the start of run(); the track and check payload use them.
        self.start_timestamp = None
        self.start_formatted_time = None
        self.session = requests.Session()
        # Proxy pool selected by configuration.
        self.proxy_pool = ProxyPoolFactory.get_proxy_pool(Config.get_proxy_type())
        self.proxies = {}
        # Request timeout in seconds.
        self.timeout = 2
        # Poor-quality proxies (not used within this class).
        self.unable_proxies = []

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _emit_metric(name, value, description, attributes=None):
        """Record a metric when the module-level ``meter`` is available."""
        if not meter:
            return
        if attributes is None:
            record_metric(name, value, description=description)
        else:
            record_metric(
                name, value, description=description, attributes=attributes
            )

    @classmethod
    def _utc_stamp(cls):
        """Return the current UTC time formatted the way the captcha API expects."""
        now = datetime.datetime.now(datetime.timezone.utc)
        return now.strftime(cls._TIME_FORMAT)

    def _invalidate_current_proxy(self):
        """Report the proxy currently in use (if any) as invalid to the pool."""
        if self.proxies:
            self.proxy_pool.mark_proxy_invalid(self.proxies["http"])

    def _on_request_error(self, span, exc, endpoint):
        """Record a failed API request on the span/metrics and drop the proxy."""
        add_span_attribute(span, "error", str(exc))
        span.record_exception(exc)
        self._invalidate_current_proxy()
        self._emit_metric(
            "walmart.api.request",
            1,
            "API请求",
            {
                "endpoint": endpoint,
                "success": False,
                "error_type": type(exc).__name__,
            },
        )

    def _stage_failed(self, span, stage, reason):
        """Record that a stage of run() failed and return the failure code."""
        add_span_attribute(span, "failure_reason", reason)
        self._emit_metric(
            "walmart.process.failure",
            1,
            "处理失败",
            {"stage": stage, "card_num": self.card_num},
        )
        return self.CODE_FAILED

    def _query_outcome(self, span, label):
        """Record the classified query result on the span and as a metric."""
        add_span_attribute(span, "result", label)
        self._emit_metric(
            "walmart.query.result",
            1,
            "查询结果",
            {"result": label, "card_num": self.card_num},
        )

    def __ease_out_expo(self, sep):
        """Exponential ease-out curve mapping progress ``sep`` in [0, 1] to [0, 1]."""
        if sep == 1:
            return 1
        return 1 - pow(2, -10 * sep)

    # ------------------------------------------------------------------
    # Track generation
    # ------------------------------------------------------------------
    def get_track_list(self, distance):
        """
        Build a synthetic slider drag track ending at ``distance``.

        The track starts with a "down" event at x=0, continues with "move"
        events following an ease-out curve, and ends with an "up" event at
        ``distance``. Timestamps are milliseconds relative to
        ``self.start_timestamp``, which must be set beforehand (run() does so).

        :param distance: target x offset, a non-negative int
        :return: list of dicts with keys ``x``, ``y``, ``type`` and ``t``
        :raises ValueError: when ``distance`` is not a non-negative int
        """
        if not isinstance(distance, int) or distance < 0:
            raise ValueError(
                f"distance类型必须是大于等于0的整数: distance: {distance}, type: {type(distance)}"
            )
        slide_track = [
            {
                "x": 0,
                "y": 0,
                "type": "down",
                "t": int(time.time() * 1000) - self.start_timestamp,
            }
        ]
        # Number of sampled slider positions.
        count = 30 + distance // 2
        # Simulated elapsed sliding time in milliseconds.
        t = random.randint(50, 100)
        # Last emitted x; samples that do not advance the slider are skipped.
        last_x = 0
        for i in range(count):
            x = round(self.__ease_out_expo(i / count) * distance)
            # Time keeps passing even when the position does not change.
            t += random.randint(10, 20)
            if x == last_x:
                continue
            # The original never updated this marker, so the filter above only
            # ever skipped x == 0 and repeated positions were emitted.
            last_x = x
            slide_track.append(
                {
                    "x": x,
                    "y": 0,
                    "type": "move",
                    "t": int(time.time() * 1000) + t - self.start_timestamp,
                }
            )
        # Release exactly at the target. The original evaluated the curve at
        # the magic value 279 / count, which only reaches `distance` for small
        # distances and falls short for larger ones.
        slide_track.append(
            {
                "x": distance,
                "y": 1,
                "type": "up",
                "t": int(time.time() * 1000) + t - self.start_timestamp,
            }
        )
        return slide_track

    # ------------------------------------------------------------------
    # Remote calls
    # ------------------------------------------------------------------
    def get_token(self):
        """
        Request a captcha token.

        :return: the response body text
        :raises Exception: any request error is recorded, the current proxy is
            marked invalid, and the error is re-raised
        """
        with get_tracer().start_as_current_span("walmart_spider.get_token") as span:
            headers = {
                "Accept": "*/*",
                "Accept-Language": "zh-CN,zh;q=0.9",
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
                "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
                "Origin": "http://vpay.upcard.com.cn",
                "Pragma": "no-cache",
                "Referer": "http://vpay.upcard.com.cn/vcweixin/commercial/walm/query",
                "User-Agent": self.user_agent,
                "X-Requested-With": "XMLHttpRequest",
            }
            url = (
                "https://vpay.upcard.com.cn/vcweixinwm/commercial/walm/getCaptchaToken"
            )
            data = {"company": "walm", "businesstype": "WALMQRYCARD"}
            add_span_attribute(span, "url", url)
            if self.proxies:
                add_span_attribute(span, "proxy", str(self.proxies))
            try:
                response = self.session.post(
                    url,
                    headers=headers,
                    data=data,
                    proxies=self.proxies,
                    timeout=self.timeout,
                )
                add_span_attribute(span, "status_code", response.status_code)
                self._emit_metric(
                    "walmart.api.request",
                    1,
                    "API请求",
                    {"endpoint": "get_token", "success": True},
                )
                return response.text
            except Exception as e:
                self._on_request_error(span, e, "get_token")
                raise

    def get_captcha(self, token):
        """
        Fetch a slider captcha for ``token``.

        :param token: token returned by :meth:`get_token`
        :return: the decoded JSON response
        :raises Exception: request/decoding errors are recorded, the current
            proxy is marked invalid, and the error is re-raised
        """
        with get_tracer().start_as_current_span("walmart_spider.get_captcha") as span:
            add_span_attribute(span, "token", token)
            headers = {
                "Accept": "application/json, text/javascript, */*; q=0.01",
                "Accept-Language": "zh-CN,zh;q=0.9",
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
                "Content-Type": "application/json;charset=UTF-8",
                "Origin": "https://vpay.upcard.com.cn",
                "Pragma": "no-cache",
                "Referer": "https://vpay.upcard.com.cn/",
                "Sec-Fetch-Dest": "empty",
                "Sec-Fetch-Mode": "cors",
                "Sec-Fetch-Site": "cross-site",
                "User-Agent": self.user_agent,
            }
            url = f"https://www.culdata.com/captcha/gen/20213997/WALMQRYCARD/{token}"
            params = {"type": "SLIDER"}
            add_span_attribute(span, "url", url)
            if self.proxies:
                add_span_attribute(span, "proxy", str(self.proxies))
            try:
                response = self.session.post(
                    url,
                    headers=headers,
                    params=params,
                    proxies=self.proxies,
                    timeout=self.timeout,
                )
                add_span_attribute(span, "status_code", response.status_code)
                self._emit_metric(
                    "walmart.api.request",
                    1,
                    "API请求",
                    {"endpoint": "get_captcha", "success": True},
                )
                result = response.json()
                if "id" in result:
                    add_span_attribute(span, "captcha_id", result["id"])
                return result
            except Exception as e:
                self._on_request_error(span, e, "get_captcha")
                raise

    def check_captcha(self, id, token, track_list):
        """
        Submit the drag track for verification.

        :param id: captcha id returned by :meth:`get_captcha`
        :param token: token returned by :meth:`get_token`
        :param track_list: track produced by :meth:`get_track_list`
        :return: the decoded JSON response
        :raises Exception: request/decoding errors are recorded, the current
            proxy is marked invalid, and the error is re-raised
        """
        with get_tracer().start_as_current_span("walmart_spider.check_captcha") as span:
            add_span_attribute(span, "captcha_id", id)
            add_span_attribute(span, "token", token)
            add_span_attribute(span, "track_list_length", len(track_list))
            url = f"https://www.culdata.com/captcha/check/20213997/WALMQRYCARD/{token}"
            headers = {
                "Accept": "application/json, text/javascript, */*; q=0.01",
                "Accept-Language": "zh-CN,zh;q=0.9",
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
                "Content-Type": "application/json;charset=UTF-8",
                "Origin": "http://vpay.upcard.com.cn",
                "Pragma": "no-cache",
                "Referer": "http://vpay.upcard.com.cn/",
                "Sec-Fetch-Dest": "empty",
                "Sec-Fetch-Mode": "cors",
                "Sec-Fetch-Site": "cross-site",
                "User-Agent": self.user_agent,
            }
            data = {
                "id": id,
                "data": {
                    "bgImageWidth": 300,
                    "bgImageHeight": 180,
                    "sliderImageWidth": 55,
                    "sliderImageHeight": 180,
                    "startSlidingTime": self.start_formatted_time,
                    "endSlidingTime": self._utc_stamp(),
                    "trackList": track_list,
                },
            }
            add_span_attribute(span, "url", url)
            if self.proxies:
                add_span_attribute(span, "proxy", str(self.proxies))
            # Bound before the try so the error log below can always format it;
            # previously a failed request raised UnboundLocalError inside the
            # handler, hiding the real error and skipping proxy invalidation.
            result = None
            try:
                data_json = json.dumps(data, separators=(",", ":"))
                response = self.session.post(
                    url,
                    headers=headers,
                    data=data_json,
                    proxies=self.proxies,
                    timeout=self.timeout,
                )
                add_span_attribute(span, "status_code", response.status_code)
                result = response.json()
                add_span_attribute(span, "result_code", result.get("code", 0))
                logger.info(f"请求获取captcha返回值{result} 轨迹:{track_list} proxies {self.proxies}")
                self._emit_metric(
                    "walmart.api.request",
                    1,
                    "API请求",
                    {
                        "endpoint": "check_captcha",
                        "success": True,
                        "result_code": result.get("code", 0),
                    },
                )
                return result
            except Exception as e:
                logger.error(f"请求获取check_captcha返回值{result} 轨迹:{track_list} proxies {self.proxies}")
                self._on_request_error(span, e, "check_captcha")
                raise

    def query_card(self, captcha_check_id, card_num):
        """
        Query the card page using a verified captcha check id.

        :param captcha_check_id: value returned in ``data`` by :meth:`check_captcha`
        :param card_num: card number to query
        :return: the response body text (HTML)
        :raises Exception: request errors are recorded, the current proxy is
            marked invalid, and the error is re-raised
        """
        with get_tracer().start_as_current_span("walmart_spider.query_card") as span:
            add_span_attribute(span, "captcha_check_id", captcha_check_id)
            add_span_attribute(span, "card_num", card_num)
            headers = {
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
                "Accept-Language": "zh-CN,zh;q=0.9",
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
                "Content-Type": "application/x-www-form-urlencoded",
                "Origin": "https://vpay.upcard.com.cn",
                "Pragma": "no-cache",
                "Referer": "https://vpay.upcard.com.cn/vcweixinwm/commercial/walm/query",
                "Sec-Fetch-Dest": "document",
                "Sec-Fetch-Mode": "navigate",
                "Sec-Fetch-Site": "same-origin",
                "Sec-Fetch-User": "?1",
                "Upgrade-Insecure-Requests": "1",
                "User-Agent": self.user_agent,
            }
            url = "http://vpay.upcard.com.cn/vcweixin/commercial/walm/query"
            data = {
                "openId": "xxx",
                "company": "walm",
                "captchaCheckId": captcha_check_id,
                "cardNo": card_num,
            }
            add_span_attribute(span, "url", url)
            if self.proxies:
                add_span_attribute(span, "proxy", str(self.proxies))
            try:
                # NOTE(review): this call bypasses self.session, unlike the
                # other requests in this class; kept as-is - confirm whether
                # that is intentional.
                response = requests.post(
                    url,
                    headers=headers,
                    data=data,
                    proxies=self.proxies,
                    timeout=self.timeout,
                )
                add_span_attribute(span, "status_code", response.status_code)
                result = response.text
                logger.info(f"请求获取query_res返回值{result} proxies {self.proxies}")
                # Classify the response for tracing only; run() decides the outcome.
                if "卡号不存在" in result:
                    add_span_attribute(span, "result_type", "card_not_exist")
                elif "卡号格式错误" in result:
                    add_span_attribute(span, "result_type", "card_format_error")
                elif "money01" in result:
                    money_match = re.findall(self._MONEY_PATTERN, result)
                    if money_match:
                        add_span_attribute(span, "result_type", "success")
                        add_span_attribute(span, "balance", money_match[0])
                else:
                    add_span_attribute(span, "result_type", "unknown_error")
                self._emit_metric(
                    "walmart.api.request",
                    1,
                    "API请求",
                    {"endpoint": "query_card", "success": True},
                )
                return result
            except Exception as e:
                self._on_request_error(span, e, "query_card")
                raise

    def get_proxies(self):
        """
        Fetch a proxy for this card from the pool and store it in ``self.proxies``.

        ``self.proxies`` is reset to ``{}`` when the pool returns nothing.

        :raises Exception: pool errors are recorded, ``self.proxies`` is reset,
            and the error is re-raised
        """
        with get_tracer().start_as_current_span("walmart_spider.get_proxies") as span:
            add_span_attribute(span, "card_num", self.card_num)
            try:
                proxy = self.proxy_pool.get_proxy(self.card_num)
                if proxy:
                    self.proxies = {"http": proxy, "https": proxy}
                    logger.info(
                        f"订单号:{self.card_num},从代理池获取代理:{self.proxies}"
                    )
                    add_span_attribute(span, "proxy_obtained", True)
                    add_span_attribute(span, "proxy", str(self.proxies))
                    self._emit_metric("walmart.proxy.success", 1, "代理获取成功")
                else:
                    logger.warning(f"订单号:{self.card_num},无法获取可用代理")
                    self.proxies = {}
                    add_span_attribute(span, "proxy_obtained", False)
                    self._emit_metric("walmart.proxy.failure", 1, "代理获取失败")
            except Exception as e:
                add_span_attribute(span, "error", str(e))
                span.record_exception(e)
                self._invalidate_current_proxy()
                self._emit_metric(
                    "walmart.proxy.error",
                    1,
                    "代理获取错误",
                    {"error_type": type(e).__name__},
                )
                # Reset so later requests do not reuse a stale proxy.
                self.proxies = {}
                raise

    # ------------------------------------------------------------------
    # Orchestration
    # ------------------------------------------------------------------
    def _obtain_token(self, span):
        """Try up to 5 times (fresh proxy each time) to get a non-empty token."""
        token = None
        for attempt in range(1, 6):
            add_span_event(span, f"get_token_attempt_{attempt}")
            self.get_proxies()
            try:
                token = self.get_token()
                logger.info(f"请求获取token返回值{token}")
                if token:
                    add_span_attribute(span, "token_obtained", True)
                    break
            except Exception:
                logger.info(f"请求获取token返回值{traceback.format_exc()}")
                logger.info("请求获取token异常正在重试...")
        return token

    def _obtain_captcha(self, span, token):
        """Try up to 3 times to fetch a captcha that carries an ``id``; None on failure."""
        for attempt in range(1, 4):
            add_span_event(span, f"get_captcha_attempt_{attempt}")
            try:
                captcha_res = self.get_captcha(token)
                # Raises when the response has no usable "id", which triggers a retry.
                logger.info(f"请求获取captcha_res返回值{captcha_res['id']}")
                if captcha_res:
                    add_span_attribute(span, "captcha_obtained", True)
                    add_span_attribute(span, "captcha_id", captcha_res["id"])
                    return captcha_res
            except Exception:
                logger.info(
                    f"请求获取captcha_res返回值{traceback.format_exc()}"
                )
                logger.info("请求获取captcha_res异常正在重试...")
                self.get_proxies()
        return None

    def _verify_captcha(self, span, captcha_id, token, track_list):
        """Try up to 3 times to submit the track; None when every attempt raised."""
        for attempt in range(1, 4):
            add_span_event(span, f"check_captcha_attempt_{attempt}")
            try:
                return self.check_captcha(
                    id=captcha_id, token=token, track_list=track_list
                )
            except Exception:
                logger.info(
                    f"请求获取check_captcha_res、获取验证码校验id返回值{traceback.format_exc()}"
                )
                logger.info(
                    "请求获取check_captcha_res获取验证码校验id异常正在重试..."
                )
                self.get_proxies()
        return None

    def _fetch_query_page(self, span, captcha_check_id):
        """Try up to 3 times to fetch the card query page; None when every attempt raised."""
        for attempt in range(1, 4):
            add_span_event(span, f"query_card_attempt_{attempt}")
            try:
                return self.query_card(
                    captcha_check_id=captcha_check_id, card_num=self.card_num
                )
            except Exception:
                logger.error(
                    f"请求获取query_res返回值{traceback.format_exc()}"
                )
                logger.error("请求获取query_res异常正在重试...")
                self.get_proxies()
        return None

    def _interpret_query(self, span, query_res):
        """Map the query page HTML to a balance string or a result code."""
        if "卡号不存在" in query_res:
            self._query_outcome(span, "card_not_exist")
            return self.CODE_CARD_NOT_EXIST
        if "卡片和商户不匹配" in query_res:
            # The span previously said "card_not_exist" while the metric said
            # "card_not_match"; both now use the metric's label.
            self._query_outcome(span, "card_not_match")
            return self.CODE_CARD_NOT_EXIST
        if "卡号格式错误" in query_res:
            self._query_outcome(span, "card_format_error")
            return self.CODE_CARD_FORMAT_ERROR
        matches = re.findall(self._MONEY_PATTERN, query_res)
        if matches:
            res = matches[0]
            logger.info(f"匹配出来的money:{res}")
            add_span_attribute(span, "balance", res)
            self._query_outcome(span, "success")
            # The balance metric is best-effort: skip it when the text is not numeric.
            try:
                balance_value = float(res)
            except ValueError:
                balance_value = None
            if balance_value is not None:
                self._emit_metric(
                    "walmart.card.balance",
                    balance_value,
                    "卡余额",
                    {"card_num": self.card_num},
                )
            return res
        # Keep a prefix of the response to help diagnose unknown pages.
        add_span_attribute(span, "query_response", query_res[:200])
        self._query_outcome(span, "unknown_error")
        return self.CODE_FAILED

    def _run_flow(self, span):
        """Execute the token -> captcha -> verify -> query sequence once."""
        # Start of the simulated slide; used by the track and the check payload.
        self.start_timestamp = int(time.time() * 1000)
        self.start_formatted_time = self._utc_stamp()
        started = time.time()

        token = self._obtain_token(span)
        if not token:
            return self._stage_failed(span, "get_token", "token_not_obtained")

        captcha_res = self._obtain_captcha(span, token)
        if not captcha_res:
            return self._stage_failed(span, "get_captcha", "captcha_not_obtained")

        captcha_id = captcha_res["id"]
        bg_str = captcha_res["captcha"]["backgroundImage"].split("base64,")[-1]
        slice_str = captcha_res["captcha"]["templateImage"].split("base64,")[-1]

        add_span_event(span, "start_distance_detection")
        try:
            distance = get_distances(tp=slice_str, bg=bg_str)
            # Scale the detected offset by 300/600 - presumably the image is
            # 600px wide and rendered at the 300px reported in the check
            # payload; TODO confirm.
            distance = int(distance * 300 / 600)
            add_span_attribute(span, "distance", distance)
        except Exception as e:
            add_span_attribute(span, "error", str(e))
            return self._stage_failed(
                span, "distance_detection", "distance_detection_failed"
            )

        add_span_event(span, "generate_track_list")
        track_list = self.get_track_list(distance)
        add_span_attribute(span, "track_list_length", len(track_list))

        check_captcha_res = self._verify_captcha(
            span, captcha_id, token, track_list
        )
        if check_captcha_res is None:
            # Every attempt raised; previously this crashed on `.get` of None
            # and was reported as an unexpected exception.
            return self._stage_failed(
                span, "check_captcha", "captcha_check_no_response"
            )
        # Any code other than 200 means verification failed and the captcha
        # is no longer usable; the caller has to start over.
        if check_captcha_res.get("code") != 200:
            add_span_attribute(
                span, "captcha_check_code", check_captcha_res.get("code")
            )
            return self._stage_failed(
                span, "check_captcha", "captcha_check_failed"
            )
        captcha_check_id = check_captcha_res["data"]
        add_span_attribute(span, "captcha_check_id", captcha_check_id)

        query_res = self._fetch_query_page(span, captcha_check_id)
        add_span_attribute(span, "total_processing_time", time.time() - started)
        if not query_res:
            return self._stage_failed(span, "query_card", "query_result_empty")
        return self._interpret_query(span, query_res)

    def run(self):
        """
        Run one complete balance query for ``self.card_num``.

        :return: the balance text on success; otherwise 105 (card does not
            exist / does not match the merchant), 103 (card number format
            error) or 110 (any other failure, safe to retry)
        """
        with get_tracer().start_as_current_span("walmart_spider.run") as main_span:
            add_span_attribute(main_span, "card_num", self.card_num)
            try:
                return self._run_flow(main_span)
            except Exception as e:
                logger.info(f"run方法异常:{traceback.format_exc()}")
                add_span_attribute(main_span, "error", str(e))
                add_span_attribute(main_span, "failure_reason", "unexpected_exception")
                main_span.record_exception(e)
                self._emit_metric(
                    "walmart.process.error",
                    1,
                    "处理错误",
                    {
                        "error_type": type(e).__name__,
                        "card_num": self.card_num,
                    },
                )
                return self.CODE_FAILED
def my_json(code, data, msg):
    """Build the standard API response envelope.

    :param code: business status code
    :param data: response payload
    :param msg: human-readable message
    :return: dict with the keys ``code``, ``data`` and ``msg``
    """
    envelope = dict(code=code, data=data, msg=msg)
    return envelope
@app.route("/api/v2/walmart/card/query", methods=["GET", "POST"], strict_slashes=False)
def check_money():
    """
    v2 card balance query endpoint.

    Non-POST requests return the plain text ``"okk"``. POST expects a JSON
    object body with ``card_num`` and runs ``WalMartSpiderV3`` up to 3 times
    (a new spider per attempt) until it yields something other than the
    retryable code 110.

    Response envelope codes: 2000 success, 20017 card does not exist,
    20018 card number format error, 20016 validation failed / all attempts
    failed, 100000 unknown error.
    """
    endpoint = "/api/v2/walmart/card/query"
    with get_tracer().start_as_current_span("api.check_money") as span:
        if request.method != "POST":
            # GET (and the HEAD Flask routes to the same view) is a plain ping;
            # HEAD previously fell through and returned None.
            add_span_attribute(span, "method", request.method)
            return "okk"
        add_span_attribute(span, "method", "POST")

        # Parse the body defensively: an unparsable or non-object body used to
        # raise out of the view as an unhandled error.
        try:
            data = json.loads(request.get_data())
        except ValueError:
            data = None
        card_num = data.get("card_num") if isinstance(data, dict) else None
        add_span_attribute(span, "card_num", card_num)
        if meter:
            record_metric(
                "walmart.api.request_count",
                1,
                description="API请求计数",
                attributes={"endpoint": endpoint},
            )
        start_time = time.time()

        def finish(result, payload):
            """Record the outcome on the span and the response-time metric."""
            add_span_attribute(span, "result", result)
            if meter:
                record_metric(
                    "walmart.api.response_time",
                    time.time() - start_time,
                    description="API响应时间",
                    unit="s",
                    attributes={"endpoint": endpoint, "result": result},
                )
            return payload

        # Without a card number there is nothing to query; do not spend
        # captcha attempts on it.
        if card_num is None or card_num == "":
            return finish(
                "validation_failed",
                my_json(code=20016, data={}, msg="基础校验失败"),
            )

        try:
            for attempt in range(1, 4):
                add_span_event(span, f"attempt_{attempt}")
                res = WalMartSpiderV3(
                    card_num=card_num,
                ).run()
                # 110: retryable failure, start over with a new spider.
                if res == 110:
                    logger.info(f"请求获取token返回值{res}")
                    continue
                # 105: card does not exist.
                if res == 105:
                    return finish(
                        "card_not_exist",
                        my_json(code=20017, data={}, msg="卡号不存在"),
                    )
                # 103: card number format error.
                if res == 103:
                    return finish(
                        "card_format_error",
                        my_json(code=20018, data={}, msg="卡号格式错误"),
                    )
                # Empty result: unknown spider error.
                if not res:
                    return finish(
                        "unknown_error",
                        my_json(code=100000, data={}, msg="未知错误"),
                    )
                add_span_attribute(span, "balance", res)
                return finish(
                    "success",
                    my_json(code=2000, data={"money": res}, msg="请求成功"),
                )
        except Exception as e:
            # Same error boundary as the v3 endpoint: report and answer with
            # the generic error envelope instead of an unhandled error.
            logger.error(f"卡号:{card_num},报错:{traceback.format_exc()}")
            add_span_attribute(span, "error", str(e))
            span.record_exception(e)
            if meter:
                record_metric(
                    "walmart.api.error",
                    1,
                    description="API错误",
                    attributes={
                        "endpoint": endpoint,
                        "error_type": type(e).__name__,
                    },
                )
            return finish("error", my_json(code=100000, data={}, msg="未知错误"))

        # Every attempt returned the retryable failure code.
        return finish(
            "validation_failed",
            my_json(code=20016, data={}, msg="基础校验失败"),
        )
@app.route("/api/v3/walmart/card/query", methods=["GET", "POST"], strict_slashes=False)
def check_money_pc():
    """
    v3 card balance query endpoint.

    Non-POST requests return the plain text ``"okk"``. POST expects a JSON
    object body with ``card_num``. Card numbers starting with "23" or "60"
    are handled by ``BalanceSpider``; all others by a single
    ``WalMartSpiderV3`` instance that is run up to 5 times until it yields
    something other than the retryable code 110.

    Response envelope codes: 2000 success, 20017 card does not exist,
    20018 card number format error, 20016 validation failed / all attempts
    failed, 100000 unknown error. The ``BalanceSpider`` branch returns the
    code produced by that spider.
    """
    endpoint = "/api/v3/walmart/card/query"
    with get_tracer().start_as_current_span("api.check_money_pc") as span:
        if request.method != "POST":
            # GET (and the HEAD Flask routes to the same view) is a plain ping;
            # HEAD previously fell through and returned None.
            add_span_attribute(span, "method", request.method)
            return "okk"
        add_span_attribute(span, "method", "POST")

        # Parse the body defensively: this used to happen outside the error
        # handling below, so a bad body escaped as an unhandled error.
        try:
            data = json.loads(request.get_data())
        except ValueError:
            data = None
        card_num = data.get("card_num") if isinstance(data, dict) else None
        add_span_attribute(span, "card_num", card_num)
        logger.info(f"当前操作卡号:{card_num}")
        if meter:
            record_metric(
                "walmart.api.request_count",
                1,
                description="API请求计数",
                attributes={"endpoint": endpoint},
            )
        start_time = time.time()

        def record_response_time(attributes):
            """Emit the response-time metric with the given attributes."""
            if meter:
                record_metric(
                    "walmart.api.response_time",
                    time.time() - start_time,
                    description="API响应时间",
                    unit="s",
                    attributes=attributes,
                )

        def finish(result, payload):
            """Record the outcome on the span and the response-time metric."""
            add_span_attribute(span, "result", result)
            record_response_time({"endpoint": endpoint, "result": result})
            return payload

        # Without a card number there is nothing to query.
        if card_num is None or card_num == "":
            return finish(
                "validation_failed",
                my_json(code=20016, data={}, msg="基础校验失败"),
            )

        try:
            if str(card_num).startswith(("23", "60")):
                add_span_attribute(span, "card_type", "balance_spider")
                add_span_event(span, "using_balance_spider")
                code, balance = BalanceSpider(card_num=card_num).run()
                logger.info(f"卡号:{card_num},响应码:{code},卡余额:{balance}")
                add_span_attribute(span, "response_code", code)
                add_span_attribute(span, "balance", balance)
                record_response_time(
                    {"endpoint": endpoint, "card_type": "balance_spider"}
                )
                return my_json(code=code, data={"money": balance}, msg="请求成功")

            add_span_attribute(span, "card_type", "walmart_spider")
            add_span_event(span, "using_walmart_spider")
            # One spider (session, proxy pool handle) is reused across attempts.
            wal = WalMartSpiderV3(
                card_num=card_num,
            )
            for attempt in range(1, 6):
                add_span_event(span, f"attempt_{attempt}")
                res = wal.run()
                # 110: retryable failure.
                if res == 110:
                    continue
                # 105: card does not exist.
                if res == 105:
                    return finish(
                        "card_not_exist",
                        my_json(code=20017, data={}, msg="卡号不存在"),
                    )
                # 103: card number format error.
                if res == 103:
                    return finish(
                        "card_format_error",
                        my_json(code=20018, data={}, msg="卡号格式错误"),
                    )
                # Empty result: unknown spider error.
                if not res:
                    return finish(
                        "unknown_error",
                        my_json(code=100000, data={}, msg="未知错误"),
                    )
                add_span_attribute(span, "balance", res)
                return finish(
                    "success",
                    my_json(code=2000, data={"money": res}, msg="请求成功"),
                )
            # Every attempt returned the retryable failure code.
            return finish(
                "all_attempts_failed",
                my_json(code=20016, data={}, msg="基础校验失败"),
            )
        except Exception as e:
            # Failures are logged at error level so they are not lost among
            # routine info output.
            logger.error(f"卡号:{card_num},报错:{traceback.format_exc()}")
            add_span_attribute(span, "error", str(e))
            span.record_exception(e)
            if meter:
                record_metric(
                    "walmart.api.error",
                    1,
                    description="API错误",
                    attributes={
                        "endpoint": endpoint,
                        "error_type": type(e).__name__,
                    },
                )
            record_response_time({"endpoint": endpoint, "result": "error"})
            return my_json(code=100000, data={}, msg="未知错误")
# Script entry point: when this file is executed directly, start the Flask
# server bound to all interfaces (0.0.0.0) on port 5007.
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5007)