Merge branch 'develop' into production

This commit is contained in:
danial
2025-11-17 20:02:02 +08:00
4 changed files with 134 additions and 166 deletions

View File

@@ -1,5 +1,6 @@
import json
import pickle
import re
import random
import traceback
import urllib3
import requests
@@ -18,9 +19,13 @@ from src.integrations.itunes.models.redeem import (
RedeemSuccessResponse,
)
from src.integrations.itunes.utils import parse_xml
from src.integrations.june.models.login import LoginSignatureModel, ItunesLoginModel
from src.integrations.june.models.login import (
ItunesLoginModel,
AppleAccountModel,
)
from src.integrations.june.models.redeem import AuthenticateModel, ItunesRedeemModel
from src.service.proxy import ProxyService
from src.utils.crypto import AESKey, decrypt_with_aes
# 禁用 SSL 警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
@@ -54,51 +59,81 @@ class AppleClient:
}
)
def query_sign_sap_setup(self, signature: LoginSignatureModel, retries=3) -> str:
if retries <= 0:
return ""
# 生成cookie
cookies = {
"mzf_in": "07281",
"s_vi": "",
"itsMetricsR": "Genre-CN-Mobile Software Applications-29099@@Mobile Software Applications-main@@@@",
"s_vnum_n2_us": "0|1",
}
# base64解码
if signature.serverId != "0":
cookies["pod"] = signature.serverId
cookies["itspod"] = signature.serverId
try:
response = self.__session.post(
"https://play.itunes.apple.com/WebObjects/MZPlay.woa/wa/signSapSetup",
data=signature.signature,
headers={
"X-Apple-Store-Front": "1433465-19,17",
"X-Apple-Partner": "origin.0",
"X-Apple-Client-Application": "Software",
"X-Apple-Connection-Type": "WiFi",
"X-Apple-Client-Versions": "GameCenter/2.0",
"X-Token-T": "M",
"X-Apple-Tz": "28800",
"Content-Type": "application/x-apple-plist; Charset=UTF-8",
},
timeout=30,
cookies=cookies,
)
except Exception as e:
logger.info("这是发生了错误")
return self.query_sign_sap_setup(signature, retries - 1)
return response.text
def _get_sign(self, post_data: str):
post_data = {"data": post_data}
response = self.__session.post(
"http://43.136.20.129:8880/iTunes/ac", json=post_data
)
result = json.loads(response.text)
return result.get("result", "")
def _generate_guid(self):
segments = []
for _ in range(7):
segment = f"{random.getrandbits(32):08X}"
segments.append(segment)
return ".".join(segments)
def _get_post_data(self, guid: str, account: AppleAccountModel):
password = decrypt_with_aes(
account.password,
AESKey.load_from_base64(
"P0x6Gy6dXIpPbhE7PHxaHbfZHhsbT2qNPlx3qbHTP1o="
),
AESKey.load_from_base64("nywao1XkDXeYwbPeWh+SxA=="),
)
post_data = f"""<?xml version="1.0" encoding="UTF-8"?><!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd"><plist version="1.0"><dict><key>appleId</key><string>{account.account}</string><key>attempt</key><string>1</string><key>guid</key><string>{guid}</string><key>password</key><string>{password}</string><key>rmp</key><string>0</string><key>createSession</key><string>true</string><key>why</key><string>purchase</string></dict></plist>"""
logger.info(f"POST DATA: {post_data}")
return post_data
def login(
self,
sign_map: AuthenticateModel,
account_info: ItunesAccountInfo,
server_id: str = "",
retries: int = 5,
account: AppleAccountModel,
) -> ItunesLoginResponse:
if retries <= 0:
logger.error("登录重试次数已用完")
guid = self._generate_guid()
post_data = self._get_post_data(guid, account)
action_signature = self._get_sign(post_data)
headers = {
"Host": "buy.itunes.apple.com",
"X-Apple-ActionSignature": action_signature,
"X-Apple-Store-Front": "143465-19,17",
"X-Apple-Partner": "origin.0",
"X-Apple-Client-Application": "Software",
"X-Apple-Connection-Type": "WiFi",
"X-Token-T": "M",
"X-Apple-Client-Versions": "iBooks/??; iTunesU/??; GameCenter/??; Podcasts/3.9",
"Accept-Encoding": "gzip, deflate",
"Accept": "*/*",
"X-Apple-Software-Guid": "8dc642820fc6e1ec0cc40188b7d90f2c620de391",
"X-Apple-Tz": "28800",
"User-Agent": "MacAppStore/2.0 (Macintosh; OS X 12.6) AppleWebKit/613.3.9.1.16 build/7 (dt:1)",
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
"Referer": "https://buy.itunes.apple.com/WebObjects/MZFinance.woa/wa/authenticate",
}
response = None
for i in range(3):
try:
response = self.__session.post(
"https://buy.itunes.apple.com/WebObjects/MZFinance.woa/wa/authenticate",
data=post_data,
headers=headers,
verify=False,
proxies=ProxyService().get_wrap_proxy(account.account),
allow_redirects=False,
)
break
except (requests.exceptions.SSLError, requests.exceptions.ProxyError) as e:
logger.error(f"SSL/代理错误: {str(e)}")
# 代理错误时,清除当前代理并重试
ProxyService().set_expire_strategy()
continue
except requests.exceptions.RequestException as e:
logger.error(f"请求错误: {str(e)}")
continue
if not response or not response.ok:
return ItunesLoginResponse(
serverId="",
response=ItunesFailLoginPlistData(
@@ -106,105 +141,27 @@ class AppleClient:
failureType="MAX_RETRIES_EXCEEDED",
customerMessage="登录重试次数已用完",
),
guid="",
originLog="登录重试次数已用完",
)
headers = {
"X-Apple-ActionSignature": sign_map.signature,
"X-Apple-Store-Front": "143465-19,17",
"X-Apple-Partner": "origin.0",
"X-Apple-Client-Application": "Software",
"X-Apple-Connection-Type": "WiFi",
"X-Apple-Client-Versions": "GameCenter/2.0",
"X-Token-T": "M",
"Accept-Encoding": "gzip, deflate",
"Accept": "*/*",
"X-Apple-Tz": "28800",
"User-Agent": sign_map.userAgent,
"Content-Type": "application/x-apple-plist; Charset=UTF-8",
"Referer": "https://buy.itunes.apple.com/WebObjects/MZFinance.woa/wa/authenticate",
}
cookies = {
"mzf_in": "07281",
"s_vi": "",
"itsMetricsR": "Genre-CN-Mobile Software Applications-29099@@Mobile Software Applications-main@@@@",
"s_vnum_n2_us": "0|1",
}
params = {}
if server_id != "":
url = f"https://p{server_id}-buy.itunes.apple.com/WebObjects/MZFinance.woa/wa/authenticate"
params = {"Pod": server_id, "PRH": server_id}
else:
url = (
"https://buy.itunes.apple.com/WebObjects/MZFinance.woa/wa/authenticate"
)
self.__session.headers["X-Apple-Store-Front"] = "143465-19,12"
try:
response = self.__session.post(
url,
headers=headers,
cookies=cookies,
params=params,
data=sign_map.post,
allow_redirects=False,
timeout=30,
proxies=ProxyService().get_wrap_proxy(account_info.account_name),
)
except (requests.exceptions.SSLError, requests.exceptions.ProxyError) as e:
logger.error(f"SSL/代理错误: {str(e)}")
# 代理错误时,清除当前代理并重试
ProxyService().set_expire_strategy()
return self.login(sign_map, account_info, server_id, retries - 1)
except requests.exceptions.RequestException as e:
logger.error(f"请求错误: {str(e)}")
return self.login(sign_map, account_info, server_id, retries - 1)
if response.is_redirect:
redirect_url = response.headers["Location"]
groups = re.search(r"https://p(\d+)-buy.itunes.apple.com", redirect_url)
if groups:
server_id = groups.group(1)
return self.login(sign_map, account_info, server_id, retries - 1)
else:
logger.error(f"无法解析重定向URL: {redirect_url}")
return ItunesLoginResponse(
serverId="",
response=ItunesFailLoginPlistData(
status=30,
failureType="INVALID_REDIRECT",
customerMessage="无法解析重定向URL",
),
originLog=response.text,
)
try:
response_dict_data = parse_xml(response.text)
except Exception as e:
logger.error(f"解析响应XML失败: {str(e)}")
return ItunesLoginResponse(
serverId=server_id,
response=ItunesFailLoginPlistData(
status=30, failureType="PARSE_ERROR", customerMessage="解析响应失败"
),
originLog=response.text,
)
logger.info(f"<UNK>: {response.text}")
response_dict_data = parse_xml(response.text)
if "failureType" in response_dict_data:
status = 31
# 账户被禁用
if (
response_dict_data.get("metrics", {}).get("dialogId")
== "MZFinance.AccountDisabled"
):
status = 14
# 账户被锁定
if (
response_dict_data.get("metrics", {}).get("dialogId")
== "MZFinance.DisabledAndFraudLocked"
):
status = 14
# 密码错误
if response_dict_data.get("failureType") == "-5000":
status = 13
if status == 31:
@@ -212,11 +169,40 @@ class AppleClient:
response_model = ItunesFailLoginPlistData(
**{"status": status, **response_dict_data}
)
else:
response_model = ItunesSuccessLoginPlistData(**response_dict_data)
return ItunesLoginResponse(
serverId="",
response=response_model,
originLog=response.text,
)
pod = response.headers["pod"]
headers = {
"User-Agent": "MacAppStore/2.0 (Macintosh; OS X 12.6) AppleWebKit/613.3.9.1.16 build/7 (dt:1)",
"X-Apple-Store-Front": response.headers.get("x-apple-store-front"),
"X-Apple-ActionSignature": action_signature,
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
"X-Apple-Software-Guid": "8dc642820fc6e1ec0cc40188",
"X-Apple-Client-Application": "Software",
"X-Apple-Connection-Type": "WiFi",
"X-Apple-Client-Versions": "iBooks/??; iTunesU/??; GameCenter/??; Podcasts/3.9",
"X-Apple-Tz": "28800",
"Accept-Language": "zh-CN, en;q=0.9, *;q=0.1",
"Accept": "*/*",
}
response = self.__session.post(
f"https://p{pod}-buy.itunes.apple.com/WebObjects/MZFinance.woa/wa/authenticate?Pod={pod}&PRH={pod}",
data=post_data,
headers=headers,
verify=False,
proxies=ProxyService().get_wrap_proxy(account.account),
allow_redirects=False,
)
response_dict_data = parse_xml(response.text)
return ItunesLoginResponse(
serverId=server_id, response=response_model, originLog=response.text
serverId=pod,
guid=guid,
originLog=response.text,
response=ItunesSuccessLoginPlistData(**response_dict_data),
)
def redeem(

View File

@@ -100,11 +100,11 @@ class ItunesSuccessLoginPlistData(BaseModel):
class ItunesFailLoginPlistData(BaseModel):
pings: list = Field([], alias="pings")
status: int = Field(..., alias="status")
failureType: str = Field(..., alias="failureType")
customerMessage: str = Field(..., alias="customerMessage")
m_allowed: bool = Field(..., alias="m-allowed")
pings: list = Field(default=[], alias="pings")
status: int = Field(default=0, alias="status")
failureType: str = Field(default="", alias="failureType")
customerMessage: str = Field(default="", alias="customerMessage")
m_allowed: bool = Field(default=False, alias="m-allowed")
class ItunesLoginResponse(BaseModel):
@@ -112,6 +112,7 @@ class ItunesLoginResponse(BaseModel):
response: ItunesSuccessLoginPlistData | ItunesFailLoginPlistData = Field(
..., alias="response"
)
guid: str = Field(default="", alias="guid")
origin_log: str = Field(default="", alias="originLog")

View File

@@ -1,7 +1,5 @@
import time
import uuid
from loguru import logger
from src.integrations.itunes.api import AppleClient
from src.integrations.itunes.models.login import (
@@ -11,7 +9,6 @@ from src.integrations.itunes.models.login import (
from src.integrations.itunes.models.redeem import (
RedeemSuccessResponse,
)
from src.integrations.june.api import SixClient
from src.integrations.june.models.login import AppleAccountModel, ItunesLoginModel
from src.models.model import (
LoginSuccessResponse,
@@ -28,43 +25,19 @@ from src.schema.models import (
class ItunesService:
def __init__(self):
self.june_client_service = SixClient()
self.apple_client_service = AppleClient()
def login(
self,
account: AppleAccountModel,
retries: int = 3,
) -> LoginSuccessResponse | LoginFailureResponse:
"""
登录itunes
:param retries:
:param account:
:return:
"""
start_time = time.time()
sign_sap_from_june = self.june_client_service.get_sign_sap_setup()
middle_time_1 = time.time()
logger.info(f"[+] 获取签到参数耗时(六月): {middle_time_1 - start_time}")
sign_sap_setup_buffer = self.apple_client_service.query_sign_sap_setup(
sign_sap_from_june.Data
)
middle_time_2 = time.time()
logger.info(f"[+] 获取签到证书耗时(苹果): {middle_time_2 - middle_time_1}")
sign_sap_cert = self.june_client_service.get_sign_sap_setup_cert(
account, sign_sap_from_june, sign_sap_setup_buffer
)
middle_time_3 = time.time()
logger.info(f"[+] 获取签到证书耗时(六月): {middle_time_3 - middle_time_2}")
login_schema = self.apple_client_service.login(
sign_sap_cert.Data,
ItunesAccountInfo(
account_name=account.account,
),
)
logger.info(f"[+] 登录耗时(苹果): {time.time() - middle_time_3}")
logger.info(f"[+] 登录耗时合计: {time.time() - start_time}")
login_schema = self.apple_client_service.login(account)
session = get_session()()
db_id = uuid.uuid4().hex
@@ -82,7 +55,7 @@ class ItunesService:
response_result = LoginSuccessResponse(
login_schema=ItunesLoginModel(
server_id=login_schema.server_id,
guid=sign_sap_cert.Data.guid,
guid=login_schema.guid,
dsis=int(login_schema.response.ds_person_id),
passwordToken=login_schema.response.password_token,
),
@@ -128,7 +101,6 @@ class ItunesService:
result = self.apple_client_service.redeem(
code,
ItunesLoginModel(
account_name=item.account_name,
server_id=item.login_schema.server_id,
guid=item.login_schema.guid,
dsis=int(item.login_schema.dsis),

View File

@@ -1,6 +1,6 @@
from unittest import TestCase
from src.utils.crypto import encrypt_with_aes, AESKey
from src.utils.crypto import encrypt_with_aes, AESKey, decrypt_with_aes
class Test(TestCase):
@@ -11,3 +11,12 @@ class Test(TestCase):
AESKey.load_from_base64("nywao1XkDXeYwbPeWh+SxA=="),
)
print(result)
def test_decrypt_with_aes(self):
result = decrypt_with_aes(
"uDdGDAizNq7EuF5TH163PA==",
AESKey.load_from_base64(
"P0x6Gy6dXIpPbhE7PHxaHbfZHhsbT2qNPlx3qbHTP1o="
),
AESKey.load_from_base64("nywao1XkDXeYwbPeWh+SxA=="),
)
print(result)