Files
kami_itunes_third_api/src/integrations/itunes/api.py
danial dc34e8646e fix(itunes): 删除敏感信息的日志打印
- 移除输出POST请求数据的日志,避免泄露账号密码信息
- 删除接口响应内容的日志打印,增强安全性
- 保持功能逻辑不变,提升代码安全性
2025-11-17 20:06:18 +08:00

352 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import pickle
import random
import traceback
import urllib3
import requests
from loguru import logger
from urllib3.util import Retry
from requests.adapters import HTTPAdapter
from src.integrations.itunes.models.login import (
ItunesLoginResponse,
ItunesFailLoginPlistData,
ItunesSuccessLoginPlistData,
ItunesAccountInfo,
)
from src.integrations.itunes.models.redeem import (
RedeemFailResponseModel,
RedeemSuccessResponse,
)
from src.integrations.itunes.utils import parse_xml
from src.integrations.june.models.login import (
ItunesLoginModel,
AppleAccountModel,
)
from src.integrations.june.models.redeem import AuthenticateModel, ItunesRedeemModel
from src.service.proxy import ProxyService
from src.utils.crypto import AESKey, decrypt_with_aes
# Silence urllib3's InsecureRequestWarning (SSL warnings).
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class AppleClient:
    """HTTP client for the iTunes ``MZFinance`` authenticate and redeem endpoints.

    One ``requests.Session`` is shared by every call so that cookies set
    during :meth:`login` are reused by :meth:`redeem`; the cookie jar can be
    saved and restored with :meth:`export_cookies` / :meth:`import_cookies`.
    """

    # Seconds allowed for any single HTTP call. ``redeem`` always used 30;
    # the other calls had no timeout and could block forever.
    _REQUEST_TIMEOUT = 30

    # Maps the ``errorMessageKey`` of a failed redeem to the status code
    # reported to callers.
    _REDEEM_ERROR_STATUS = {
        # Code has already been redeemed.
        "MZCommerce.GiftCertificateAlreadyRedeemed": 12,
        # No such code.
        "MZFreeProductCode.NoSuch": 11,
        # Yearly cap exceeded.
        "MZCommerce.NatIdYearlyCapExceededException": 31,
        # Daily cap exceeded.
        "MZCommerce.NatIdDailyCapExceededException": 31,
        # Storefront (country/region) mismatch.
        "MZCommerce.GiftCertRedeemStoreFrontMismatch": 15,
        "MZCommerce.GiftCertificateDisabled": 11,
    }

    def __init__(self):
        """Create the shared session with retry policy and default headers."""
        self.__session = requests.Session()
        # NOTE(review): TLS certificate verification is disabled for every
        # request made through this session.
        self.__session.verify = False

        # Retry policy applied by the transport adapter.
        # NOTE(review): POST is retryable here, which includes the redeem
        # call -- confirm a replayed redeem request is acceptable.
        retry_strategy = Retry(
            total=3,  # total number of retries
            backoff_factor=1,  # back-off factor between retries
            status_forcelist=[429, 500, 502, 503, 504],  # statuses to retry
            allowed_methods=["GET", "POST"],  # methods allowed to retry
            raise_on_status=False,  # do not raise on HTTP error statuses
            raise_on_redirect=False,  # do not raise on redirects
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.__session.mount("http://", adapter)
        self.__session.mount("https://", adapter)

        # Default request headers.
        self.__session.headers.update(
            {
                "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
                "Accept": "*/*",
                "Accept-Encoding": "gzip, deflate",
                "Connection": "keep-alive",
            }
        )

    def _get_sign(self, post_data: str):
        """Return the action signature for *post_data*.

        The plist is posted as ``{"data": post_data}`` to the signing
        service and the ``result`` field of its JSON reply is returned
        (``""`` when the field is absent).
        """
        # NOTE(review): this goes over plain HTTP to a hard-coded address and
        # the payload contains the decrypted account password.
        response = self.__session.post(
            "http://43.136.20.129:8880/iTunes/ac",
            json={"data": post_data},
            timeout=self._REQUEST_TIMEOUT,
        )
        result = json.loads(response.text)
        return result.get("result", "")

    def _generate_guid(self):
        """Return a random GUID: seven dot-separated groups of 8 uppercase hex digits."""
        return ".".join(f"{random.getrandbits(32):08X}" for _ in range(7))

    @staticmethod
    def _build_login_plist(apple_id, guid, password):
        """Render the authenticate plist body as a single-line XML string.

        Values are XML-escaped so that ``&``, ``<`` or ``>`` in an Apple ID
        or password cannot produce a malformed document. Values without
        those characters render exactly as plain interpolation would.
        """
        from xml.sax.saxutils import escape

        return (
            '<?xml version="1.0" encoding="UTF-8"?>'
            '<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" '
            '"http://www.apple.com/DTDs/PropertyList-1.0.dtd">'
            '<plist version="1.0"><dict>'
            f"<key>appleId</key><string>{escape(str(apple_id))}</string>"
            "<key>attempt</key><string>1</string>"
            f"<key>guid</key><string>{escape(str(guid))}</string>"
            f"<key>password</key><string>{escape(str(password))}</string>"
            "<key>rmp</key><string>0</string>"
            "<key>createSession</key><string>true</string>"
            "<key>why</key><string>purchase</string>"
            "</dict></plist>"
        )

    def _get_post_data(self, guid: str, account: AppleAccountModel):
        """Decrypt the account password and build the authenticate plist body."""
        # NOTE(review): the AES key and IV are hard-coded in source.
        password = decrypt_with_aes(
            account.password,
            AESKey.load_from_base64(
                "P0x6Gy6dXIpPbhE7PHxaHbfZHhsbT2qNPlx3qbHTP1o="
            ),
            AESKey.load_from_base64("nywao1XkDXeYwbPeWh+SxA=="),
        )
        return self._build_login_plist(account.account, guid, password)

    def login(
        self,
        account: AppleAccountModel,
    ) -> ItunesLoginResponse:
        """Authenticate *account* and return the parsed login result.

        The plist is first posted to the generic ``buy.itunes.apple.com``
        host (up to three attempts). If that reply carries a ``failureType``
        a failure response is returned; otherwise the ``pod`` response header
        selects the ``p{pod}-buy`` host and the same body is posted again.

        Failure statuses set here:
            30 - no usable response after the retry loop
            14 - account disabled / disabled-and-fraud-locked
            13 - wrong password (``failureType == "-5000"``)
            31 - any other failure

        Raises:
            KeyError: if the first response has no ``pod`` header.
            requests.exceptions.RequestException: from the signing call or
                the second authenticate call, which are not retried here.
        """
        guid = self._generate_guid()
        post_data = self._get_post_data(guid, account)
        action_signature = self._get_sign(post_data)
        headers = {
            "Host": "buy.itunes.apple.com",
            "X-Apple-ActionSignature": action_signature,
            "X-Apple-Store-Front": "143465-19,17",
            "X-Apple-Partner": "origin.0",
            "X-Apple-Client-Application": "Software",
            "X-Apple-Connection-Type": "WiFi",
            "X-Token-T": "M",
            "X-Apple-Client-Versions": "iBooks/??; iTunesU/??; GameCenter/??; Podcasts/3.9",
            "Accept-Encoding": "gzip, deflate",
            "Accept": "*/*",
            "X-Apple-Software-Guid": "8dc642820fc6e1ec0cc40188b7d90f2c620de391",
            "X-Apple-Tz": "28800",
            "User-Agent": "MacAppStore/2.0 (Macintosh; OS X 12.6) AppleWebKit/613.3.9.1.16 build/7 (dt:1)",
            "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
            "Referer": "https://buy.itunes.apple.com/WebObjects/MZFinance.woa/wa/authenticate",
        }

        response = None
        for _ in range(3):
            try:
                response = self.__session.post(
                    "https://buy.itunes.apple.com/WebObjects/MZFinance.woa/wa/authenticate",
                    data=post_data,
                    headers=headers,
                    verify=False,
                    proxies=ProxyService().get_wrap_proxy(account.account),
                    allow_redirects=False,
                    timeout=self._REQUEST_TIMEOUT,
                )
                break
            except (requests.exceptions.SSLError, requests.exceptions.ProxyError) as e:
                logger.error(f"SSL/代理错误: {e}")
                # On a proxy error, expire the current proxy and try again.
                ProxyService().set_expire_strategy()
            except requests.exceptions.RequestException as e:
                logger.error(f"请求错误: {e}")

        # Test for None explicitly: a Response is falsy whenever it is not ok,
        # so "not response" would hide which of the two cases applied.
        if response is None or not response.ok:
            return ItunesLoginResponse(
                serverId="",
                response=ItunesFailLoginPlistData(
                    status=30,
                    failureType="MAX_RETRIES_EXCEEDED",
                    customerMessage="登录重试次数已用完",
                ),
                guid="",
                originLog="登录重试次数已用完",
            )

        response_dict_data = parse_xml(response.text)
        if "failureType" in response_dict_data:
            status = 31
            dialog_id = response_dict_data.get("metrics", {}).get("dialogId")
            # Account disabled, or disabled and fraud-locked.
            if dialog_id in (
                "MZFinance.AccountDisabled",
                "MZFinance.DisabledAndFraudLocked",
            ):
                status = 14
            # Wrong password.
            if response_dict_data.get("failureType") == "-5000":
                status = 13
            if status == 31:
                # loguru formats with "{}" placeholders; without one the
                # extra argument is silently dropped from the message.
                logger.warning("登录状态未知:{}", response_dict_data)
            response_model = ItunesFailLoginPlistData(
                **{"status": status, **response_dict_data}
            )
            return ItunesLoginResponse(
                serverId="",
                response=response_model,
                originLog=response.text,
            )

        pod = response.headers["pod"]
        # NOTE(review): X-Apple-Software-Guid below is shorter than the value
        # sent in the first request -- confirm the truncation is intended.
        headers = {
            "User-Agent": "MacAppStore/2.0 (Macintosh; OS X 12.6) AppleWebKit/613.3.9.1.16 build/7 (dt:1)",
            "X-Apple-Store-Front": response.headers.get("x-apple-store-front"),
            "X-Apple-ActionSignature": action_signature,
            "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
            "X-Apple-Software-Guid": "8dc642820fc6e1ec0cc40188",
            "X-Apple-Client-Application": "Software",
            "X-Apple-Connection-Type": "WiFi",
            "X-Apple-Client-Versions": "iBooks/??; iTunesU/??; GameCenter/??; Podcasts/3.9",
            "X-Apple-Tz": "28800",
            "Accept-Language": "zh-CN, en;q=0.9, *;q=0.1",
            "Accept": "*/*",
        }
        response = self.__session.post(
            f"https://p{pod}-buy.itunes.apple.com/WebObjects/MZFinance.woa/wa/authenticate?Pod={pod}&PRH={pod}",
            data=post_data,
            headers=headers,
            verify=False,
            proxies=ProxyService().get_wrap_proxy(account.account),
            allow_redirects=False,
            timeout=self._REQUEST_TIMEOUT,
        )
        response_dict_data = parse_xml(response.text)
        return ItunesLoginResponse(
            serverId=pod,
            guid=guid,
            originLog=response.text,
            response=ItunesSuccessLoginPlistData(**response_dict_data),
        )

    def redeem(
        self,
        code: str,
        itunes: ItunesLoginModel,
        account_info: ItunesAccountInfo,
        reties=5,
    ) -> RedeemSuccessResponse | RedeemFailResponseModel:
        """Redeem *code* on the account described by *itunes*.

        Args:
            code: the code to redeem.
            itunes: login data (``server_id``, ``dsis``, ``password_token``,
                ``guid``) used for the URL, headers and body.
            account_info: its ``account_name`` selects the proxy.
            reties: remaining attempts; one is consumed each time sending
                the request raises. (Spelling kept for caller compatibility.)

        Returns:
            ``RedeemSuccessResponse`` when the JSON reply has ``status == 0``,
            otherwise a ``RedeemFailResponseModel`` whose status is:
                0  - login required
                11 - no such code / code disabled
                12 - already redeemed
                15 - storefront mismatch
                30 - retries exhausted, unknown or unparseable reply
                31 - yearly or daily cap exceeded
                32 - HTTP 429
                40 - empty response body
        """
        if reties <= 0:
            logger.error("兑换失败,兑换重试次数已用完")
            return RedeemFailResponseModel(
                status=30,
                errorMessageKey="",
                errorMessage="兑换失败,兑换重试次数已用完",
                origin_log="兑换失败,兑换重试次数已用完",
                userPresentableErrorMessage="兑换失败,兑换重试次数已用完",
            )

        url = f"https://p{itunes.server_id}-buy.itunes.apple.com/WebObjects/MZFinance.woa/wa/redeemCodeSrv"
        headers = {
            "X-Apple-Store-Front": "143465-19,13",
            "X-Dsid": str(itunes.dsis),
            "X-Token": itunes.password_token,
            "X-Apple-Software-Guid": itunes.guid,
            "X-Apple-Partner": "origin.0",
            "X-Apple-Client-Application": "Software",
            "X-Apple-Connection-Type": "WiFi",
            "X-Apple-Client-Versions": "GameCenter/2.0",
            "X-Token-T": "M",
            "X-Apple-Tz": "28800",
            "Accept-Encoding": "gzip, deflate",
            "Content-Type": "application/x-apple-plist; Charset=UTF-8",
            "User-Agent": "MacAppStore/2.0 (Macintosh; OS X 12.10) AppleWebKit/600.1.3.41",
            "Referer": f"https://p{itunes.server_id}-buy.itunes.apple.com/WebObjects/MZFinance.woa/wa/com.apple"
            f".jingle.app.finance.DirectAction/redeemCode?cl=iTunes&pg=Music",
        }
        try:
            response = self.__session.post(
                url,
                data=ItunesRedeemModel(
                    attempt_count=1,
                    camera_recognized_code=False,
                    cl="iTunes",
                    code=code,
                    ds_personId=itunes.dsis,
                    guid=itunes.guid,
                    kbsync="",
                    pg="Music",
                    has_4gb_limit=False,
                ).to_xml(),
                headers=headers,
                timeout=self._REQUEST_TIMEOUT,
                proxies=ProxyService().get_wrap_proxy(account_info.account_name),
            )
        except Exception as e:
            logger.warning(f"兑换连接错误,重试:{e}\t{traceback.format_exc()}")
            return self.redeem(code, itunes, account_info, reties - 1)

        response.encoding = "utf-8"
        try:
            logger.info(f"返回状态码:{response.status_code}")
            if response.status_code == 429:
                return RedeemFailResponseModel(
                    status=32,
                    originStatusCode=response.status_code,
                    errorMessageKey="",
                    errorMessage="触发充值次数限制",
                    origin_log=response.text,
                    userPresentableErrorMessage="触发充值次数限制",
                )
            if not response.text.strip():
                return RedeemFailResponseModel(
                    status=40,
                    errorMessageKey="",
                    originStatusCode=response.status_code,
                    errorMessage="兑换1分钟限制",
                    origin_log=response.text,
                    userPresentableErrorMessage="兑换1分钟限制",
                )
            if "xml" in response.text:
                if "MZFinance.RedeemCodeSrvLoginRequired" in response.text:
                    return RedeemFailResponseModel(
                        status=0,
                        errorMessageKey="",
                        originStatusCode=response.status_code,
                        errorMessage="需要登录",
                        origin_log=response.text,
                        userPresentableErrorMessage="需要登录",
                    )
                # Any other XML reply falls through to the "unknown" result
                # at the end of the method instead of returning None.
            else:
                payload = response.json()  # parse the body once
                if payload.get("status") == 0:
                    result = RedeemSuccessResponse.model_validate(payload)
                    result.origin_log = response.text
                    return result

                result = RedeemFailResponseModel.model_validate(payload)
                result.origin_log = response.text
                mapped_status = self._REDEEM_ERROR_STATUS.get(result.errorMessageKey)
                if mapped_status is not None:
                    result.status = mapped_status
                else:
                    logger.error(f"失败状态未知:{payload}")
                if result.status == -1 or result.status == 0:
                    result.status = 30
                    # "{}" placeholder is required for loguru to include the
                    # response body in the message.
                    logger.warning("兑换状态未知:{}", response.text)
                return result
        except Exception as e:
            logger.error(f"json格式化失败{e}\t返回值{response.text}")

        # Reached after a parse/validation error or an unrecognised XML reply.
        return RedeemFailResponseModel(
            status=30,
            errorMessageKey="",
            originStatusCode=response.status_code,
            errorMessage="状态未知",
            origin_log=response.text,
            userPresentableErrorMessage="状态未知",
        )

    def export_cookies(self) -> bytes:
        """Return the session's cookie jar serialized with ``pickle``."""
        return pickle.dumps(self.__session.cookies)

    def import_cookies(self, cookies: bytes):
        """Merge cookies previously produced by :meth:`export_cookies`.

        SECURITY: ``pickle.loads`` can execute arbitrary code; only pass
        bytes that came from a trusted :meth:`export_cookies` call.
        """
        self.__session.cookies.update(pickle.loads(cookies))