Files
kami_spider_monorepo/apps/apple/clients/june/api.py
danial ba558b94ad refactor(june): 优化SixClient异步请求和异常处理
- SixClient中引入异步请求方法try_get_url获取并验证备用主机地址
- 初始化__base_url时支持异步调用,提升启动时灵活性
- _do_post与相关异步请求方法改用异常抛出替代返回None规范错误处理
- 修正部分解码逻辑,确保签名等字段的正确Base64和URL解码
- 代码中添加日志和重试机制以提升异常处理和调试能力
- 删除june模块中遗留的单元测试代码
- 调整june模型中字段类型,支持可选字符串以增强健壮性
- apps/apple/clients/itunes中添加Field默认值,防止字段缺失错误
- itunes解析XML相关函数增强健壮性,避免空指针异常
- core.clients.http_client支持完整http路径调用,注释掉自动抛错逻辑以兼容特殊响应
- base.py中retry_backoff类型显式定义为float,提高代码类型安全性
2025-11-01 23:52:30 +08:00

288 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import base64
import hashlib
import json
import random
import time
import traceback
from typing import Any, Optional
from urllib import parse
from observability.logging import get_logger
from apps.apple.clients.june.config import Config
from apps.apple.clients.june.crypto import encrypt_cbc_base64, encrypt
from apps.apple.clients.june.models.login import (
AppleSixResponseModel,
LoginUserInfo,
AppleAccountModel,
LoginSignatureModel,
)
from apps.apple.clients.june.models.redeem import AuthenticateModel
from apps.apple.clients.june.utils.utils import ShareCodeUtils, decode_and_decompress
from core.clients.http_client import HTTPClient
# Module-level logger, named after this module.
logger = get_logger(__name__)
async def try_get_url() -> str:
    """Discover a usable API host, falling back to a hard-coded address.

    Each host-list document is downloaded and its lines are collected as
    candidate hosts.  Every candidate is then probed with a GET on
    ``/AppleClientApi/requestApi``; the first one answering with HTTP 404 is
    returned.

    Returns:
        The selected host base URL, or ``"http://43.240.73.119:6113"`` when no
        candidate qualifies.
    """

    async def _close_quietly(http_client) -> None:
        # Closing is best-effort: a failure here must not abort discovery.
        try:
            await http_client.close()
        except Exception as close_error:
            logger.warning(close_error)

    host_items = ["https://gitee.com/liuyueapp/blogsLiuyue/raw/master/assets/aphos.css", "https://meixi2.oss-us-west-1.aliyuncs.com/host.txt", "https://zjzhuanfa.oss-cn-shenzhen.aliyuncs.com/host.txt"]
    collected_lines: list[str] = []
    for list_url in host_items:
        client = HTTPClient(
            base_url=list_url,
            timeout=2.0,
            max_retries=1,
        )
        try:
            response = await client.get(list_url)
            if response.status_code == 200:
                collected_lines.extend(response.text.splitlines())
        except Exception as e:
            logger.error(e)
        finally:
            # Release the per-request client instead of leaking it.
            await _close_quietly(client)

    # De-duplicate while keeping first-seen order, and drop blank lines so an
    # empty string is never probed as a host.
    target_host_urls = list(
        dict.fromkeys(line.strip() for line in collected_lines if line.strip())
    )

    for host in target_host_urls:
        client = HTTPClient(
            base_url=host,
            timeout=5.0,
            max_retries=1,
        )
        try:
            response = await client.get(host + "/AppleClientApi/requestApi")
            # NOTE(review): a 404 is what marks a host as usable here —
            # presumably the endpoint rejects GET; confirm against the server.
            if response.status_code == 404:
                return host
        except Exception as e:
            logger.error(e)
        finally:
            await _close_quietly(client)

    return "http://43.240.73.119:6113"
# Runs host discovery once, at import time.
# NOTE(review): asyncio.run() raises RuntimeError when called while an event
# loop is already running, so importing this module from async code will fail.
# NOTE(review): this module-level name is not the same attribute as the
# name-mangled ``SixClient.__base_url`` — confirm the value is used anywhere.
__base_url = asyncio.run(main=try_get_url())
class SixClient:
    """Async client for the remote ``/AppleClientApi/requestApi`` endpoint.

    Every request goes through :meth:`_do_post`, which encrypts the payload,
    signs it and retries on failure.  The underlying ``HTTPClient`` is created
    lazily and released with :meth:`close`.
    """

    # Default service address used when building the HTTP client.
    __base_url = "http://43.240.73.119:6113"

    def __init__(self):
        """Set up the instance; the HTTP client is created on first use."""
        # The class-level default is non-empty, so discovery only runs if that
        # attribute has been cleared.
        # NOTE(review): asyncio.run() cannot be called from a running event
        # loop — confirm this branch is never reached from async code.
        if not self.__base_url:
            self.__base_url = asyncio.run(try_get_url())
        self._client: Optional[HTTPClient] = None

    async def _get_client(self) -> HTTPClient:
        """Return the shared HTTP client, creating it on first use."""
        if self._client is None:
            self._client = HTTPClient(
                base_url=self.__base_url,
                timeout=30.0,
                max_retries=3,
            )
        return self._client

    async def close(self):
        """Close the HTTP client connection, if one was opened."""
        if self._client:
            await self._client.close()
            self._client = None

    async def _do_post(
        self, post_data: Any, type_: str, start_now_fun: str = "0", reties: int = 3
    ) -> AppleSixResponseModel:
        """Encrypt, sign and POST ``post_data`` to ``/AppleClientApi/requestApi``.

        Args:
            post_data: JSON-serialisable payload to encrypt and send.
            type_: Value of the ``type`` form field; also part of the signature.
            start_now_fun: Value of the ``startNowFun`` request header.
            reties: Remaining attempts; decremented on every retry.

        Returns:
            The JSON response body parsed into ``AppleSixResponseModel``.

        Raises:
            Exception: ``"请求超时"`` once the attempts are exhausted, or
                ``"签名错误"`` when the response ``sign`` header does not match
                the locally computed digest.
        """
        if reties <= 0:
            raise Exception("请求超时")
        client = await self._get_client()

        # Build the 16-character value sent as the "timestamp" header; it is
        # also passed to the encryption helpers below.
        req_count = random.randint(0, 90) + 1
        text = (
            str(int(time.time()) + req_count)
            + str(Config.user_info.uid).zfill(4)
            + str(req_count)
        ).zfill(16)
        if len(text) > 16:
            text = text[:16]

        # 32-character hex MD5 digest, sent as the "mac" header.
        md5_ = hashlib.md5((Config.saff_mac + "by六月的风").encode()).hexdigest()
        key = "7a588e60045849a1"
        text2 = "90e7b0dc3ef2134c"
        text3 = encrypt_cbc_base64(
            encrypt(
                json.dumps(post_data, separators=(",", ":")), key, text, False, text2
            ),
            text,
            text2,
        ).strip()
        # Request signature over the encrypted payload, request type and mac.
        md5_2 = hashlib.md5(
            (text3 + type_ + md5_ + "by六月的风_联系qq:1023092054").encode("utf-8")
        ).hexdigest()

        headers = {
            "timestamp": text,
            "mac": md5_,
            "startNowFun": start_now_fun,
            "version": "5.1.9",
            "User-Agent": "liuyeu_AppleBatch_June",
            "Content-Type": "application/x-www-form-urlencoded",
        }
        if Config.user_info.token:
            headers["token"] = base64.b64encode(
                encrypt_cbc_base64(Config.user_info.token, text, text2).encode("utf-8")
            ).decode("utf-8")

        try:
            response = await client.post(
                "/AppleClientApi/requestApi",
                data={
                    "authentiString": base64.b64encode(text3.encode("utf-8")).decode(
                        "utf-8"
                    ),
                    "sign": md5_2,
                    "type": type_,
                },
                headers=headers,
            )
        except Exception as e:
            logger.error(e)
            logger.warning("请求错误,重试~")
            return await self._do_post(post_data, type_, start_now_fun, reties - 1)

        if response.status_code == 200:
            # Only verify the signature when the server actually sent one.
            if (
                response.headers.get("sign")
                and hashlib.md5(
                    (
                        "abc_" + response.text + text + "by六月的风_联系qq:1023092054"
                    ).encode("utf-8")
                ).hexdigest()
                != response.headers.get("sign")
            ):
                raise Exception("签名错误")
            return AppleSixResponseModel(**response.json())

        logger.warning("请求错误,重试~")
        return await self._do_post(post_data, type_, start_now_fun, reties - 1)

    async def login(self):
        """Log in and store the returned user info on ``Config.user_info``."""
        # NOTE(review): credentials are hard-coded in source; consider moving
        # them to configuration.
        response = await self._do_post(
            {"account": "q905262752", "pwd": "Aa112211"},
            "ApiLogin",
        )
        if response and response.Data:
            Config.user_info = LoginUserInfo(**response.Data)
            Config.user_info.uid = str(ShareCodeUtils.code_to_id(Config.user_info.userNumber))

    async def check_is_login(self) -> bool:
        """Return ``True`` when the service reports the stored token as logged in."""
        response = await self._do_post(
            {"token": Config.user_info.token, "type": 9},
            "ApiIsLogin",
        )
        if response and response.Code == "0000" and response.Message == "已经登录":
            return True
        return False

    async def login_remote_apple_account(
        self, account: AppleAccountModel
    ) -> AppleSixResponseModel[dict] | None:
        """Send an ``ApiItunesLogin`` request for ``account``.

        Returns:
            The response with its ``Data`` JSON string parsed into a dict.

        Raises:
            Exception: ``"远程登录失败"`` when the response carries no ``Data``.
        """
        response = await self._do_post(
            {
                "token": Config.user_info.token,
                "account": account.account,
                "pwd": account.password,
                "isStore": 2,
                "guid": "",
                "serviceIpIndex": -1,
            },
            "ApiItunesLogin",
        )
        if response and response.Data:
            response.Data = json.loads(response.Data)
            return AppleSixResponseModel[dict].model_validate(response.model_dump())
        raise Exception("远程登录失败")

    async def get_sign_sap_setup(
        self, reties: int = 3
    ) -> AppleSixResponseModel[LoginSignatureModel]:
        """Request the ``GetSignsapsetup`` payload and decode its signature.

        Args:
            reties: Remaining retries for the ``"请重试"`` server answer.

        Returns:
            The response with ``Data`` validated as ``LoginSignatureModel`` and
            ``Data.signature`` URL-decoded then Base64-decoded.

        Raises:
            Exception: ``"获取 sign 失败"`` when retries run out or the response
                has no ``Data``.
        """
        if reties < 0:
            raise Exception("获取 sign 失败")
        response: AppleSixResponseModel[str] = await self._do_post(
            json.dumps(
                {
                    "gZip": "1",
                    "type": "GetSignsapsetup",
                    "userAgent": "MacAppStore/2.0 (Macintosh; OS X 12.10) AppleWebKit/600.1.3.41",
                },
                separators=(",", ":"),
            ),
            "ApiServiceSend",
            start_now_fun="9",
        )
        if not response or not response.Data:
            raise Exception("获取 sign 失败")
        response_data = json.loads(decode_and_decompress(response.Data))
        if response_data.get("msg") == "请重试":
            logger.info(f"重试六月登录,{response}")
            # Non-blocking pause: time.sleep() here would stall the event loop.
            await asyncio.sleep(1)
            return await self.get_sign_sap_setup(reties - 1)

        # Swap the raw Data string for the decoded payload before validating.
        target_response_data = response.model_dump()
        target_response_data["Data"] = response_data
        target_response = AppleSixResponseModel[LoginSignatureModel].model_validate(
            target_response_data
        )
        # The signature is URL-encoded Base64: undo both layers.
        target_response.Data.signature = base64.b64decode(
            parse.unquote_plus(target_response.Data.signature)
        ).decode()
        return target_response

    async def get_sign_sap_setup_cert(
        self,
        account: AppleAccountModel,
        sign: AppleSixResponseModel[LoginSignatureModel],
        sign_sap_setup: str,
        reties: int = 3,
    ) -> AppleSixResponseModel[AuthenticateModel]:
        """Request the ``GetSignsapsetupCert`` payload for ``account``.

        Args:
            account: Apple account whose id and password are sent.
            sign: Earlier signature response; supplies ``adder1``, ``adder2``
                and ``serverIndex``.
            sign_sap_setup: Value that is Base64- and URL-encoded into the
                ``signSap`` field.
            reties: Remaining retries for decode failures and the ``"请重试"``
                server answer.

        Returns:
            The response with ``Data`` validated as ``AuthenticateModel`` and
            its ``signature``, ``guid`` and ``post`` fields URL-decoded.

        Raises:
            Exception: ``"获取 sign 失败"`` when retries run out or the response
                has no ``Data``.
        """
        if reties < 0:
            raise Exception("获取 sign 失败")
        response: AppleSixResponseModel[str] = await self._do_post(
            json.dumps(
                {
                    "signSap": parse.quote_plus(
                        base64.b64encode(sign_sap_setup.encode()).decode()
                    ),
                    "appleId": account.account,
                    "guid": "",
                    "applePwd": account.password,
                    "intptr_": sign.Data.adder1,
                    "intPtr": sign.Data.adder2,
                    "idType": 1,
                    "gZip": "1",
                    "type": "GetSignsapsetupCert",
                    "serviceIndex": sign.serverIndex,
                },
                separators=(",", ":"),
            ),
            "ApiServiceSend",
            start_now_fun="9",
        )
        if not response or not response.Data:
            raise Exception("获取 sign 失败")
        try:
            response_data = json.loads(decode_and_decompress(response.Data))
        except AttributeError:
            logger.error(msg=f"获取cert失败{response},错误信息:{traceback.format_exc()}")
            return await self.get_sign_sap_setup_cert(account, sign, sign_sap_setup, reties - 1)
        if response_data.get("msg") == "请重试":
            logger.info(f"重试六月登录,{response}")
            # Non-blocking pause: time.sleep() here would stall the event loop.
            await asyncio.sleep(1)
            return await self.get_sign_sap_setup_cert(
                account, sign, sign_sap_setup, reties - 1
            )

        # Validate the dump that carries the decoded payload; validating the
        # untouched response dump would discard the decoded Data.
        new_target_response_data = response.model_dump()
        new_target_response_data["Data"] = response_data
        target_response_data = AppleSixResponseModel[AuthenticateModel].model_validate(
            new_target_response_data
        )
        # URL-decode the returned fields.
        target_response_data.Data.signature = parse.unquote_plus(target_response_data.Data.signature)
        target_response_data.Data.guid = parse.unquote_plus(target_response_data.Data.guid)
        target_response_data.Data.post = parse.unquote_plus(target_response_data.Data.post)
        return target_response_data