feat(apple): 重构并实现Apple iTunes和June客户端API模块

- 重命名apps/app_b为apps/apple,调整项目结构
- 新增apps.apple.clients.itunes模块,实现iTunes API客户端功能
- 实现iTunes登录、兑换和查询接口,支持错误重试和状态处理
- 设计解析Apple XML响应的工具函数,提升数据处理能力
- 定义iTunes登录和兑换相关数据模型,基于Pydantic提升数据校验
- 新增apps.apple.clients.june模块,实现June API客户端功能
- 实现六月客户端登录、状态检测、签名获取及远程账户登录
- 设计June客户端请求加密与签名机制,保障接口安全通信
- 增加六月客户端配置、加密工具和辅助函数支持
- 完善模块__init__.py文件,明确导出API客户端类
This commit is contained in:
danial
2025-11-01 15:22:21 +08:00
parent aebc83edc9
commit 949a0d4e61
28 changed files with 1384 additions and 18 deletions

View File

@@ -0,0 +1 @@
"""Apple clients module."""

View File

@@ -0,0 +1,5 @@
"""iTunes API client module."""
from apps.apple.clients.itunes.api import AppleClient
__all__ = ["AppleClient"]

View File

@@ -0,0 +1,361 @@
import pickle
import re
import traceback
from typing import Optional
import httpx
from observability.logging import get_logger
from apps.apple.clients.itunes.models.login import (
ItunesLoginResponse,
ItunesSuccessLoginPlistData,
ItunesFailLoginPlistData,
ItunesAccountInfo,
)
from apps.apple.clients.itunes.models.redeem import (
RedeemSuccessResponse,
RedeemFailResponseModel,
)
from apps.apple.clients.june.models.login import (
LoginSignatureModel,
ItunesLoginModel,
)
from apps.apple.clients.june.models.redeem import AuthenticateModel
from apps.apple.clients.itunes.utils import parse_xml
from core.clients.http_client import HTTPClient
logger = get_logger(__name__)
class AppleClient:
def __init__(self):
self._client: Optional[HTTPClient] = None
async def _get_client(self) -> HTTPClient:
"""获取或创建HTTP客户端"""
if self._client is None:
self._client = HTTPClient(
base_url="https://play.itunes.apple.com",
timeout=30.0,
max_retries=5,
max_connections=10,
max_keepalive_connections=10,
)
return self._client
async def close(self):
"""关闭客户端连接"""
if self._client:
await self._client.close()
self._client = None
async def query_sign_sap_setup(self, signature: LoginSignatureModel, retries=3) -> str:
if retries <= 0:
return ""
client = await self._get_client()
# 构建Cookie字符串
cookies_dict = {
"mzf_in": "07281",
"s_vi": "",
"itsMetricsR": "Genre-CN-Mobile Software Applications-29099@@Mobile Software Applications-main@@@@",
"s_vnum_n2_us": "0|1",
}
if signature.serverId != "0":
cookies_dict["pod"] = signature.serverId
cookies_dict["itspod"] = signature.serverId
try:
response = await client.post(
"/WebObjects/MZPlay.woa/wa/signSapSetup",
data=signature.signature,
headers={
"X-Apple-Store-Front": "1433465-19,17",
"X-Apple-Partner": "origin.0",
"X-Apple-Client-Application": "Software",
"X-Apple-Connection-Type": "WiFi",
"X-Apple-Client-Versions": "GameCenter/2.0",
"X-Token-T": "M",
"X-Apple-Tz": "28800",
"Content-Type": "application/x-apple-plist; Charset=UTF-8",
"Cookie": "; ".join([f"{k}={v}" for k, v in cookies_dict.items()]),
},
)
return response.text
except Exception as e:
logger.info(f"请求发生错误: {e}")
return await self.query_sign_sap_setup(signature, retries - 1)
async def login(
self, sign_map: AuthenticateModel, account_info: ItunesAccountInfo, server_id: str = "", retries: int = 5
) -> ItunesLoginResponse:
headers = {
"X-Apple-ActionSignature": sign_map.signature,
"X-Apple-Store-Front": "143465-19,17",
"X-Apple-Partner": "origin.0",
"X-Apple-Client-Application": "Software",
"X-Apple-Connection-Type": "WiFi",
"X-Apple-Client-Versions": "GameCenter/2.0",
"X-Token-T": "M",
"Accept-Encoding": "gzip, deflate",
"Accept": "*/*",
"X-Apple-Tz": "28800",
"User-Agent": sign_map.userAgent,
"Content-Type": "application/x-apple-plist; Charset=UTF-8",
"Referer": "https://buy.itunes.apple.com/WebObjects/MZFinance.woa/wa/authenticate",
}
# 构建Cookie字符串
cookies_dict = {
"mzf_in": "07281",
"s_vi": "",
"itsMetricsR": "Genre-CN-Mobile Software Applications-29099@@Mobile Software Applications-main@@@@",
"s_vnum_n2_us": "0|1",
}
headers["Cookie"] = "; ".join([f"{k}={v}" for k, v in cookies_dict.items()])
params = {}
base_url = "https://buy.itunes.apple.com"
path = "/WebObjects/MZFinance.woa/wa/authenticate"
if server_id != "":
base_url = f"https://p{server_id}-buy.itunes.apple.com"
params = {"Pod": server_id, "PRH": server_id}
# 创建临时客户端用于登录请求
login_client = HTTPClient(
base_url=base_url,
timeout=30.0,
headers={"X-Apple-Store-Front": "143465-19,12"},
)
try:
response = await login_client.request(
"POST",
path,
headers=headers,
params=params,
data=sign_map.post,
)
except httpx.HTTPStatusError as e:
response = e.response
finally:
await login_client.close()
if response.status_code in (301, 302, 303, 307, 308):
redirect_url = response.headers.get("Location", "")
groups = re.search(r"https://p(\d+)-buy.itunes.apple.com", redirect_url)
if groups:
server_id = groups.group(1)
return await self.login(sign_map, account_info, server_id, retries - 1)
response_dict_data = parse_xml(response.text)
if "failureType" in response_dict_data:
status = 31
# 账户被禁用
if (
response_dict_data.get("metrics", {}).get("dialogId")
== "MZFinance.AccountDisabled"
):
status = 14
# 账户被锁定
if (
response_dict_data.get("metrics", {}).get("dialogId")
== "MZFinance.DisabledAndFraudLocked"
):
status = 14
# 密码错误
if response_dict_data.get("failureType") == "-5000":
status = 13
if status == 31:
logger.warning("登录状态未知:", response_dict_data)
response_model = ItunesFailLoginPlistData(
**{"status": status, **response_dict_data}
)
else:
response_model = ItunesSuccessLoginPlistData(**response_dict_data)
return ItunesLoginResponse(
serverId=server_id, response=response_model, originLog=response.text
)
async def redeem(
self,
code: str,
itunes: ItunesLoginModel,
account_info: ItunesAccountInfo,
reties=5,
) -> RedeemSuccessResponse | RedeemFailResponseModel:
if reties <= 0:
logger.error("兑换失败,兑换重试次数已用完")
return RedeemFailResponseModel(
status=30,
errorMessageKey="",
errorMessage="兑换失败,兑换重试次数已用完",
origin_log="兑换失败,兑换重试次数已用完",
userPresentableErrorMessage="兑换失败,兑换重试次数已用完",
)
base_url = f"https://p{itunes.server_id}-buy.itunes.apple.com"
path = "/WebObjects/MZFinance.woa/wa/redeemCodeSrv"
headers = {
"X-Apple-Store-Front": "143465-19,13",
"X-Dsid": str(itunes.dsis),
"X-Token": itunes.password_token,
"X-Apple-Software-Guid": itunes.guid,
"X-Apple-Partner": "origin.0",
"X-Apple-Client-Application": "Software",
"X-Apple-Connection-Type": "WiFi",
"X-Apple-Client-Versions": "GameCenter/2.0",
"X-Token-T": "M",
"X-Apple-Tz": "28800",
"Accept-Encoding": "gzip, deflate",
"Content-Type": "application/x-apple-plist; Charset=UTF-8",
"User-Agent": "MacAppStore/2.0 (Macintosh; OS X 12.10) AppleWebKit/600.1.3.41",
"Referer": f"{base_url}/WebObjects/MZFinance.woa/wa/com.apple.jingle.app.finance.DirectAction/redeemCode?cl=iTunes&pg=Music",
}
redeem_client = HTTPClient(
base_url=base_url,
timeout=30.0,
)
try:
response = await redeem_client.post(
path,
data=self._build_redeem_xml(
code=code,
ds_personId=itunes.dsis,
guid=itunes.guid,
),
headers=headers,
)
except Exception as e:
logger.warning(f"兑换连接错误,重试:{e}\t{traceback.format_exc()}")
await redeem_client.close()
return await self.redeem(code, itunes, account_info, reties - 1)
finally:
await redeem_client.close()
logger.info(f"返回状态码:{response.status_code}")
try:
if response.status_code == 429:
return RedeemFailResponseModel(
status=32,
originStatusCode=response.status_code,
errorMessageKey="",
errorMessage="触发充值次数限制",
origin_log=response.text,
userPresentableErrorMessage="触发充值次数限制",
)
if not response.text.strip():
return RedeemFailResponseModel(
status=40,
errorMessageKey="",
originStatusCode=response.status_code,
errorMessage="兑换1分钟限制",
origin_log=response.text,
userPresentableErrorMessage="兑换1分钟限制",
)
if "xml" in response.text:
if "MZFinance.RedeemCodeSrvLoginRequired" in response.text:
return RedeemFailResponseModel(
status=0,
errorMessageKey="",
originStatusCode=response.status_code,
errorMessage="需要登录",
origin_log=response.text,
userPresentableErrorMessage="需要登录",
)
else:
if response.json().get("status") == 0:
result = RedeemSuccessResponse.model_validate(response.json())
result.origin_log = response.text
return result
result = RedeemFailResponseModel.model_validate(response.json())
result.origin_log = response.text
if (
result.errorMessageKey
== "MZCommerce.GiftCertificateAlreadyRedeemed"
):
# 已经被兑换
result.status = 12
elif result.errorMessageKey == "MZFreeProductCode.NoSuch":
# 没有这个卡密
result.status = 11
elif (
result.errorMessageKey
== "MZCommerce.NatIdYearlyCapExceededException"
):
# 年限额
result.status = 31
elif (
result.errorMessageKey
== "MZCommerce.NatIdDailyCapExceededException"
):
# 日限额
result.status = 31
# 国籍问题
elif (
result.errorMessageKey
== "MZCommerce.GiftCertRedeemStoreFrontMismatch"
):
result.status = 15
elif (
result.errorMessageKey
== "MZCommerce.GiftCertificateDisabled"
):
result.status = 11
else:
logger.error(f"失败状态未知:{response.json()}")
if result.status == -1 or result.status == 0:
result.status = 30
logger.warning("兑换状态未知:", response.text)
return result
except Exception as e:
logger.error(f"json格式化失败{e}\t返回值{response.text}")
return RedeemFailResponseModel(
status=30,
errorMessageKey="",
originStatusCode=response.status_code,
errorMessage="状态未知",
origin_log=response.text,
userPresentableErrorMessage="状态未知",
)
# 注意由于使用了HTTPClientcookie管理方式已改变
# 如需持久化cookie需要自行管理httpx.Cookies对象
def _build_redeem_xml(self, code: str, ds_personId: int, guid: str) -> str:
"""构建兑换请求的XML数据"""
from xml.etree import ElementTree
plist = ElementTree.Element("plist", version="1.0")
dict_elem = ElementTree.SubElement(plist, "dict")
data = {
"attempt_count": "1",
"camera_recognized_code": "false",
"cl": "iTunes",
"code": code,
"ds_personId": str(ds_personId),
"guid": guid,
"has_4gb_limit": "false",
"kbsync": "",
"pg": "Music",
"response-content-type": "application/json",
}
for key, value in data.items():
key_elem = ElementTree.SubElement(dict_elem, "key")
key_elem.text = key
if key == "kbsync":
value_elem = ElementTree.SubElement(dict_elem, "data")
value_elem.text = value
else:
value_elem = ElementTree.SubElement(dict_elem, "string")
value_elem.text = value
xml_str = ElementTree.tostring(plist, encoding="utf-8", method="xml").decode()
xml_declaration = '<?xml version="1.0" encoding="UTF-8"?>\n'
return xml_declaration + xml_str

View File

@@ -0,0 +1 @@
"""iTunes API models."""

View File

@@ -0,0 +1,119 @@
from typing import Optional
from pydantic import BaseModel, Field
class ItunesLoginAddress(BaseModel):
first_name: str = Field(..., alias="firstName")
last_name: str = Field(..., alias="lastName")
class ItunesLoginAccountInfo(BaseModel):
apple_id: str = Field(..., alias="appleId")
address: ItunesLoginAddress
class ItunesLoginSubscriptionTerms(BaseModel):
type: str = Field(..., alias="type")
latest_terms: int = Field(..., alias="latestTerms")
agreed_to_terms: int = Field(..., alias="agreedToTerms")
source: str = Field(..., alias="source")
class ItunesLoginSubscriptionAccount(BaseModel):
is_minor: bool = Field(..., alias="isMinor")
suspect_underage: bool = Field(..., alias="suspectUnderage")
class ItunesLoginSubscriptionFamily(BaseModel):
has_family: bool = Field(..., alias="hasFamily")
class ItunesLoginSubscriptionStatus(BaseModel):
terms: list[ItunesLoginSubscriptionTerms]
account: ItunesLoginSubscriptionAccount
family: ItunesLoginSubscriptionFamily
class ItunesLoginAccountFlags(BaseModel):
personalization: bool = Field(..., alias="underThirteen")
under_thirteen: bool = Field(..., alias="underThirteen")
identity_last_verified: Optional[int] = Field(None, alias="identityLastVerified")
verified_expiration_date: Optional[int] = Field(
None, alias="verifiedExpirationDate"
)
retail_demo: bool = Field(..., alias="retailDemo")
auto_play: bool = Field(..., alias="autoPlay")
is_disabled_account: bool = Field(..., alias="isDisabledAccount")
is_restricted_account: bool = Field(..., alias="isRestrictedAccount")
is_managed_account: bool = Field(..., alias="isManagedAccount")
is_in_restricted_region: bool = Field(..., alias="isInRestrictedRegion")
account_flags_version: int = Field(..., alias="accountFlagsVersion")
is_in_bad_credit: bool = Field(..., alias="isInBadCredit")
has_agreed_to_terms: bool = Field(..., alias="hasAgreedToTerms")
has_agreed_to_app_clip_terms: bool = Field(..., alias="hasAgreedToAppClipTerms")
has_watch_hardware_offer: bool = Field(..., alias="hasWatchHardwareOffer")
is_in_family: bool = Field(..., alias="isInFamily")
has_subscription_family_sharing_enabled: bool = Field(
..., alias="hasSubscriptionFamilySharingEnabled"
)
class ItunesLoginPrivacyAcknowledgement(BaseModel):
com_apple_onboarding_ibooks: int = Field(..., alias="com.apple.onboarding.ibooks")
# com_apple_onboarding_appstore: int = Field(
# ..., alias="com.apple.onboarding.appstore"
# )
# com_apple_onboarding_applemusic: int = Field(
# ..., alias="com.apple.onboarding.applemusic"
# )
# com_apple_onboarding_itunesstore: int = Field(
# ..., alias="com.apple.onboarding.itunesstore"
# )
# com_apple_onboarding_itunesu: int = Field(..., alias="com.apple.onboarding.itunesu")
# com_apple_onboarding_applearcade: int = Field(
# ..., alias="com.apple.onboarding.applearcade"
# )
class ItunesSuccessLoginPlistData(BaseModel):
pings: list[str] = Field([], alias="pings")
# account_info: ItunesLoginAccountInfo = Field(..., alias="accountInfo")
alt_dsid: str = Field(..., alias="altDsid")
password_token: str = Field(..., alias="passwordToken")
clear_token: str = Field(..., alias="clearToken")
m_allowed: bool = Field(..., alias="m-allowed")
is_cloud_enabled: bool = Field(..., alias="is-cloud-enabled")
ds_person_id: str = Field(..., alias="dsPersonId")
credit_display: str = Field(default="", alias="creditDisplay")
credit_balance: str = Field(..., alias="creditBalance")
free_song_balance: str = Field(..., alias="freeSongBalance")
is_managed_student: bool = Field(..., alias="isManagedStudent")
# subscription_status: ItunesLoginSubscriptionStatus = Field(
# ..., alias="subscriptionStatus"
# )
# account_flags: ItunesLoginAccountFlags = Field(..., alias="accountFlags")
status: int = Field(..., alias="status")
# privacy_acknowledgement: ItunesLoginPrivacyAcknowledgement = Field(
# ..., alias="privacyAcknowledgement"
# )
class ItunesFailLoginPlistData(BaseModel):
pings: list = Field([], alias="pings")
status: int = Field(..., alias="status")
failureType: str = Field(..., alias="failureType")
customerMessage: str = Field(..., alias="customerMessage")
m_allowed: bool = Field(..., alias="m-allowed")
class ItunesLoginResponse(BaseModel):
server_id: str = Field(..., alias="serverId")
response: ItunesSuccessLoginPlistData | ItunesFailLoginPlistData = Field(
..., alias="response"
)
origin_log: str = Field(default="", alias="originLog")
class ItunesAccountInfo(BaseModel):
account_name: str = Field(..., description="账号")

View File

@@ -0,0 +1,60 @@
from pydantic import BaseModel, Field, ConfigDict
class RedeemFailResponseModel(BaseModel):
model_config = ConfigDict(extra="ignore")
errorMessageKey: str = Field(default="", alias="errorMessageKey")
errorMessage: str = Field(default="", alias="errorMessage")
userPresentableErrorMessage: str = Field(
default="", alias="userPresentableErrorMessage"
)
origin_log: str = Field(default="")
origin_status_code: int = Field(default=0, alias="originStatusCode", description="原始状态码")
status: int = Field(..., alias="status", description="0.需要登录 1.正常")
class TotalCreditModel(BaseModel):
# movieRentalBalance: int
# songBalance: int
# videoBalance: int
money: str = Field(default="")
totalCredit: str = Field(default="")
moneyRaw: float = Field(default=0)
# gameBalance: int
# tvRentalBalance: int
class CustomSuccessData(BaseModel):
isAmplifyRedemption: bool
isUpsell: bool
class RedeemedCredit(BaseModel):
movieRentalBalance: int
songBalance: int
videoBalance: int
money: str
totalCredit: str
moneyRaw: float = Field(default=0)
gameBalance: int
tvRentalBalance: int
class RedeemSuccessResponse(BaseModel):
# woinst: int = Field(default=0)
creditDisplay: str = Field(default="")
totalCredit: TotalCreditModel = Field(default_factory=TotalCreditModel)
# giftCertificateBonusId: int
# dsPersonId: str
# redeemCount: int
# isOptedInForPersonalizedRecommendations: bool
# isSubscription: bool
# triggerDownload: bool
# customSuccessData: CustomSuccessData
redeemedCredit: RedeemedCredit = Field(default_factory=RedeemedCredit)
# wosid: str
# protocolVersion: str
email: str = Field(default="")
status: int = Field(default=0)
origin_log: str = Field(default="", alias="originLog")

View File

@@ -0,0 +1,38 @@
from xml.etree import ElementTree
def parse_xml(xml_str: str) -> dict:
root = ElementTree.fromstring(xml_str)
dict_data = {}
key = ""
for child in root.find("dict"):
if child.tag == "key":
key = child.text
else:
if child.tag == "integer":
dict_data[key] = int(child.text)
elif child.tag == "string":
dict_data[key] = child.text if child.text else ""
elif child.tag == "true":
dict_data[key] = True
elif child.tag == "false":
dict_data[key] = False
elif child.tag == "array":
dict_data[key] = []
elif child.tag == "dict":
dict_data[key] = parse_xml_tree([x for x in child])
return dict_data
def parse_xml_tree(tree_list: list[ElementTree]):
dict_data = {}
key = ""
for child in tree_list:
if child.tag == "key":
key = child.text
else:
if child.tag == "string":
dict_data[key] = child.text if child.text else ""
if child.tag == "dict":
dict_data[key] = parse_xml_tree([x for x in child])
return dict_data

View File

@@ -0,0 +1,5 @@
"""June API client module."""
from apps.apple.clients.june.api import SixClient
__all__ = ["SixClient"]

View File

@@ -0,0 +1,242 @@
import base64
import hashlib
import json
import random
import time
import traceback
from typing import Any, Optional
from urllib import parse
from observability.logging import get_logger
from apps.apple.clients.june.config import Config
from apps.apple.clients.june.crypto import encrypt_cbc_base64, encrypt
from apps.apple.clients.june.models.login import (
AppleSixResponseModel,
LoginUserInfo,
AppleAccountModel,
LoginSignatureModel,
)
from apps.apple.clients.june.models.redeem import AuthenticateModel
from apps.apple.clients.june.utils.utils import ShareCodeUtils, decode_and_decompress
from core.clients.http_client import HTTPClient
logger = get_logger(__name__)
class SixClient:
def __init__(self):
self.__base_url = "http://43.240.73.119:6113"
self._client: Optional[HTTPClient] = None
async def _get_client(self) -> HTTPClient:
"""获取或创建HTTP客户端"""
if self._client is None:
self._client = HTTPClient(
base_url=self.__base_url,
timeout=30.0,
max_retries=3,
)
return self._client
async def close(self):
"""关闭客户端连接"""
if self._client:
await self._client.close()
self._client = None
async def _do_post(
self, post_data: Any, type_: str, start_now_fun: str = "0", reties: int = 3
) -> AppleSixResponseModel | None:
if reties <= 0:
return None
client = await self._get_client()
req_count = random.randint(0, 90) + 1
text = (
str(int(time.time()) + req_count)
+ str(Config.user_info.uid).zfill(4)
+ str(req_count)
).zfill(16)
if len(text) > 16:
text = text[:16]
# 生成32位md5
md5_ = hashlib.md5((Config.saff_mac + "by六月的风").encode()).hexdigest()
key = "7a588e60045849a1"
text2 = "90e7b0dc3ef2134c"
text3 = encrypt_cbc_base64(
encrypt(
json.dumps(post_data, separators=(",", ":")), key, text, False, text2
),
text,
text2,
).strip()
md5_2 = hashlib.md5(
(text3 + type_ + md5_ + "by六月的风_联系qq:1023092054").encode("utf-8")
).hexdigest()
headers = {
"timestamp": text,
"mac": md5_,
"startNowFun": start_now_fun,
"version": "5.1.9",
"User-Agent": "liuyeu_AppleBatch_June",
"Content-Type": "application/x-www-form-urlencoded",
}
if Config.user_info.token:
headers["token"] = base64.b64encode(
encrypt_cbc_base64(Config.user_info.token, text, text2).encode("utf-8")
).decode("utf-8")
try:
response = await client.post(
"/AppleClientApi/requestApi",
data={
"authentiString": base64.b64encode(text3.encode("utf-8")).decode(
"utf-8"
),
"sign": md5_2,
"type": type_,
},
headers=headers,
)
except Exception as e:
logger.error(e)
logger.warning("请求错误,重试~")
return await self._do_post(post_data, type_, start_now_fun, reties - 1)
if response.status_code == 200:
if (
response.headers.get("sign")
and hashlib.md5(
(
"abc_" + response.text + text + "by六月的风_联系qq:1023092054"
).encode("utf-8")
).hexdigest()
!= response.headers.get("sign")
):
raise Exception("签名错误")
return AppleSixResponseModel(**response.json())
logger.warning("请求错误,重试~")
return await self._do_post(post_data, type_, start_now_fun, reties - 1)
async def login(self):
response = await self._do_post(
{"account": "q905262752", "pwd": "Aa112211"},
"ApiLogin",
)
if response and response.Data:
Config.user_info = LoginUserInfo(**response.Data)
Config.user_info.uid = str(ShareCodeUtils.code_to_id(Config.user_info.userNumber))
async def check_is_login(self) -> bool:
response = await self._do_post(
{"token": Config.user_info.token, "type": 9},
"ApiIsLogin",
)
if response and response.Code == "0000" and response.Message == "已经登录":
return True
return False
async def login_remote_apple_account(
self, account: AppleAccountModel
) -> AppleSixResponseModel[dict] | None:
response = await self._do_post(
{
"token": Config.user_info.token,
"account": account.account,
"pwd": account.password,
"isStore": 2,
"guid": "",
"serviceIpIndex": -1,
},
"ApiItunesLogin",
)
if response and response.Data:
response.Data = json.loads(response.Data)
return AppleSixResponseModel[dict].model_validate(response.model_dump())
return None
async def get_sign_sap_setup(
self, reties: int = 3
) -> AppleSixResponseModel[LoginSignatureModel] | None:
if reties < 0:
return None
response = await self._do_post(
json.dumps(
{
"gZip": "1",
"type": "GetSignsapsetup",
"userAgent": "MacAppStore/2.0 (Macintosh; OS X 12.10) AppleWebKit/600.1.3.41",
},
separators=(",", ":"),
),
"ApiServiceSend",
start_now_fun="9",
)
if not response or not response.Data:
return None
response.Data = json.loads(decode_and_decompress(response.Data))
if response.Data.get("msg") == "请重试":
logger.info(f"重试六月登录,{response}")
time.sleep(1)
return await self.get_sign_sap_setup(reties - 1)
response = AppleSixResponseModel[LoginSignatureModel].model_validate(
response.model_dump()
)
# 处理signature编码问题和Base64转码问题
response.Data.signature = base64.b64decode(
parse.unquote_plus(response.Data.signature)
).decode()
return response
async def get_sign_sap_setup_cert(
self,
account: AppleAccountModel,
sign: AppleSixResponseModel[LoginSignatureModel],
sign_sap_setup: str,
reties: int = 3,
) -> AppleSixResponseModel[AuthenticateModel] | None:
if reties < 0:
return None
response = await self._do_post(
json.dumps(
{
"signSap": parse.quote_plus(
base64.b64encode(sign_sap_setup.encode()).decode()
),
"appleId": account.account,
"guid": "",
"applePwd": account.password,
"intptr_": sign.Data.adder1,
"intPtr": sign.Data.adder2,
"idType": 1,
"gZip": "1",
"type": "GetSignsapsetupCert",
"serviceIndex": sign.serverIndex,
},
separators=(",", ":"),
),
"ApiServiceSend",
start_now_fun="9",
)
if not response or not response.Data:
return None
try:
response.Data = json.loads(decode_and_decompress(response.Data))
except AttributeError as e:
logger.error(f"获取cert失败{response},错误信息:{traceback.format_exc()}")
return await self.get_sign_sap_setup_cert(account, sign, sign_sap_setup, reties - 1)
if response.Data.get("msg") == "请重试":
logger.info(f"重试六月登录,{response}")
time.sleep(1)
return await self.get_sign_sap_setup_cert(
account, sign, sign_sap_setup, reties - 1
)
response = AppleSixResponseModel[AuthenticateModel].model_validate(
response.model_dump()
)
# 解码数据
response.Data.signature = parse.unquote_plus(response.Data.signature)
response.Data.guid = parse.unquote_plus(response.Data.guid)
response.Data.post = parse.unquote_plus(response.Data.post)
return response

View File

@@ -0,0 +1,14 @@
import hashlib
from apps.apple.clients.june.models.login import LoginUserInfo
from apps.apple.clients.june.utils.utils import MachineCode
class Config:
saff_mac = hashlib.md5(
(MachineCode().get_machine_code_string() + "巴拉拉小魔仙").encode("utf-8")
).hexdigest()
new_saff_mac = hashlib.md5(
(MachineCode().get_machine_code_string(True) + "LiuYue_Acbse").encode("utf-8")
).hexdigest()
user_info = LoginUserInfo()

View File

@@ -0,0 +1,113 @@
import base64
import hashlib
import random
import time
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
def decode_base64(data: str):
text = data.replace("%", "").replace(",", "").replace(" ", "+")
if len(text) % 4 > 0:
text.ljust(len(text) + 4 - len(text) % 4, "=")
return base64.decodebytes(text.encode()).decode("utf-8")
def encrypt_cbc_base64(data: str, key: str, iv: str) -> str:
"""使用AES CBC模式加密并返回Base64编码的结果"""
key_bytes = key.encode("utf-8")
iv_bytes = iv.encode("utf-8")
to_encrypt_bytes = data.encode("utf-8")
if len(key_bytes) != 16 or len(iv_bytes) != 16:
raise ValueError("Key and IV must be 16 bytes long.")
# Pad data to AES block size (16 bytes)
pad_length = 16 - len(to_encrypt_bytes) % 16
to_encrypt_bytes += bytes([0] * pad_length)
# Create cipher and encrypt
cipher = Cipher(
algorithms.AES(key_bytes),
modes.CBC(iv_bytes),
backend=default_backend()
)
encryptor = cipher.encryptor()
encrypted_bytes = encryptor.update(to_encrypt_bytes) + encryptor.finalize()
# Return base64 encoded result
encrypted_base64 = base64.b64encode(encrypted_bytes).decode()
return encrypted_base64
def encrypt(
d_string: str, key_a: str, key_b: str, operation: bool, key: str = "", expiry: int = 0
) -> str:
"""自定义加密/解密算法"""
num = 4
key = hashlib.md5(key.encode("utf-8")).hexdigest()
if operation:
text = d_string[0:num]
else:
text = hashlib.md5(
(str(random.random())[:8] + "00 " + str(int(time.time()))).encode("utf-8")
).hexdigest()[-num:]
text2 = key_a + hashlib.md5((key_a + text).encode("utf-8")).hexdigest()
if not operation:
if expiry != 0:
tmp_text = str(expiry) + str(int(time.time()))
else:
tmp_text = "0"
d_string = (
tmp_text
+ tmp_text.ljust(10 - len(tmp_text), "0")
+ hashlib.md5((d_string + key_b).encode("utf-8")).hexdigest()[:16]
+ d_string
)
array: str | None = None
if operation:
array = decode_base64(d_string[num:])
num2 = len(array) if (operation and array is not None) else len(d_string)
array2 = bytearray(num2)
array3 = list(range(256))
array4 = list(map(lambda x: 0, range(256)))
for i in range(256):
array4[i] = ord(text2[i % len(text2)])
num3 = 0
for j in range(256):
num3 = (num3 + array3[j] + array4[j]) % 256
num4 = array3[j]
array3[j] = array3[num3]
array3[num3] = num4
k = 0
num5 = 0
num6 = 0
while k < num2:
num6 = (num6 + 1) % 256
num5 = (num5 + array3[num6]) % 256
num7 = array3[num6]
array3[num6] = array3[num5]
array3[num5] = num7
if operation and array is not None:
array2[k] = ord(array[k]) ^ array3[(array3[num6] + array3[num5]) % 256]
else:
array2[k] = ord(d_string[k]) ^ array3[(array3[num6] + array3[num5]) % 256]
k += 1
if operation:
# array2转str
new_str = array2.decode("utf-8")
if (
int(new_str[:10]) != 0
and (float(new_str[:10]) - time.time()) <= 0
and not (
new_str[10:16]
== hashlib.md5((new_str[26:] + key_b).encode("utf-8")).hexdigest()[0:16]
)
):
return ""
return new_str[26:]
return text + base64.b64encode(array2).decode().replace("=", "")

View File

@@ -0,0 +1 @@
"""June API models."""

View File

@@ -0,0 +1,94 @@
from datetime import datetime
from typing import Any, TypeVar, Generic
from pydantic import BaseModel, Field
class LoginUserInfo(BaseModel):
token: str = Field(default="")
userName: str = Field(default="")
userPwd: str = Field(default="")
integral: int = Field(default=0)
freezeIntegral: int = Field(default=0)
userNumber: str = Field(default="")
guid: str = Field(default="")
uid: str = Field(default="")
class LoginSessionInfo(BaseModel):
integral: int = Field(default=0)
userType: str = Field(default="")
userNumber: str = Field(default="")
userTypeName: str = Field(default="")
expirationTime: datetime = Field(default=datetime.now())
freezeIntegral: int = Field(default=0)
serverIndex: int = Field(default=0)
class Cookies(BaseModel):
wosid: str = Field(None, alias="wosid")
woinst: str = Field(None, alias="woinst")
ns_mzf_inst: str = Field(..., alias="ns-mzf-inst")
mzf_in: str = Field(None, alias="mzf_in")
mzf_dr: str = Field(None, alias="mzf_dr")
hsaccnt: str = Field(None, alias="hsaccnt")
session_store_id: str = Field(..., alias="session-store-id")
X_Dsid: str = Field(..., alias="X-Dsid")
mz_at0_135096725: str = Field(..., alias="mz_at0-135096725")
ampsc: str = Field(None, alias="ampsc")
mz_at_ssl_135096725: str = Field(..., alias="mz_at_ssl-135096725")
mz_at_mau_135096725: str = Field(..., alias="mz_at_mau-135096725")
pldfltcid: str = Field(None, alias="pldfltcid")
tv_pldfltcid: str = Field(..., alias="tv-pldfltcid")
wosid_lite: str = Field(..., alias="wosid-lite")
itspod: str = Field(None, alias="itspod")
class RemoteCookieModel(BaseModel):
msg: str = Field(..., alias="msg")
Balance: str = Field(..., alias="Balance")
Area: str = Field(..., alias="Area")
UserAgent: str = Field(..., alias="UserAgent")
cookis: Cookies = Field(..., alias="cookis")
xtoken: str = Field(..., alias="xtoken")
dsis: str = Field(..., alias="dsis")
Kbsync: Any = Field(..., alias="Kbsync")
software: str = Field(..., alias="software")
Guid: str = Field(..., alias="Guid")
ServerId: str = Field(..., alias="ServerId")
isDisabledAccount: str = Field(..., alias="isDisabledAccount")
class LoginSignatureModel(BaseModel):
msg: str = Field(..., alias="msg")
signature: str = Field(..., alias="signature")
serverId: str = Field(..., alias="serverId")
adder1: int = Field(..., alias="adder1")
adder2: int = Field(..., alias="adder2")
userAgent: str = Field(..., alias="userAgent")
class AppleAccountModel(BaseModel):
account: str = Field(default="")
password: str = Field(default="")
class ItunesLoginModel(BaseModel):
server_id: str = Field(default="")
dsis: int = Field(default=0)
guid: str = Field(default="")
password_token: str = Field(default="", alias="passwordToken")
# 泛型
DataT = TypeVar("DataT")
class AppleSixResponseModel(BaseModel, Generic[DataT]):
Code: str = Field(default="")
Message: str = Field(default="")
Data: DataT = Field(default="")
serverIndex: int = Field(default=0)
extend: str = Field(default="")
authenUserInfo: str = Field(default="")
platformName: str = Field(default="")

View File

@@ -0,0 +1,44 @@
from xml.etree import ElementTree
from pydantic import BaseModel, Field
class AuthenticateModel(BaseModel):
msg: str = Field(..., alias="msg")
post: str = Field(..., alias="post")
signature: str = Field(..., alias="signature")
userAgent: str = Field(..., alias="userAgent")
guid: str = Field(..., alias="guid")
class ItunesRedeemModel(BaseModel):
attempt_count: int = Field(default=0)
camera_recognized_code: bool = Field(default=False)
cl: str = Field(default="iTunes")
code: str = Field(default="")
ds_personId: int = Field(default=0)
guid: str = Field(default="")
has_4gb_limit: bool = Field(default=False)
kbsync: str = Field(default="")
pg: str = Field(default="")
response_content_type: str = Field(
alias="response-content-type", default="application/json"
)
def to_xml(self):
plist = ElementTree.Element("plist", version="1.0")
dict_elem = ElementTree.SubElement(plist, "dict")
for key, value in self.model_dump(by_alias=True).items():
key_elem = ElementTree.SubElement(dict_elem, "key")
key_elem.text = key
if key == "kbsync":
value_elem = ElementTree.SubElement(dict_elem, "data")
value_elem.text = str(value)
else:
value_elem = ElementTree.SubElement(dict_elem, "string")
value_elem.text = str(value)
xml_str = ElementTree.tostring(plist, encoding="utf-8", method="xml").decode()
# 添加XML声明
xml_declaration = '<?xml version="1.0" encoding="UTF-8"?>\n'
xml_str = xml_declaration + xml_str
return xml_str

View File

@@ -0,0 +1,9 @@
from unittest import TestCase
from src.integrations.june.api import SixClient
class TestSixClient(TestCase):
def test_get_sign_sap_setup(self):
result = SixClient().get_sign_sap_setup()
print(result)

View File

@@ -0,0 +1,13 @@
from unittest import TestCase
from src.integrations.june.utils.utils import ShareCodeUtils, MachineCode
class TestShareCodeUtils(TestCase):
def test_code_to_id(self):
print(ShareCodeUtils.code_to_id("2JPA"))
def test_get_cpu_info(self):
print(MachineCode().get_cpu_info())
print(MachineCode().get_hd_id())
print(MachineCode().get_mo_address())

View File

@@ -0,0 +1 @@
"""June utilities."""

View File

View File

@@ -0,0 +1,236 @@
import base64
import gzip
import os
import platform
import random
import zlib
from xml.etree import ElementTree
from typing import Optional
from observability.logging import get_logger
logger = get_logger(__name__)
class ShareCodeUtils:
BASE = [
"H",
"V",
"E",
"8",
"S",
"2",
"D",
"Z",
"X",
"9",
"C",
"7",
"P",
"5",
"I",
"K",
"3",
"M",
"J",
"U",
"F",
"R",
"4",
"W",
"Y",
"L",
"T",
"N",
"6",
"B",
"G",
"Q",
]
SUFFIX_CHAR = "A"
BIN_LEN = len(BASE)
CODE_LEN = 4
@staticmethod
def id_to_code(id_: int):
array = [""] * ShareCodeUtils.BIN_LEN
num = ShareCodeUtils.BIN_LEN
while id_ // ShareCodeUtils.BIN_LEN > 0:
num2 = int(id_ % ShareCodeUtils.BIN_LEN)
num -= 1
array[num] = ShareCodeUtils.BASE[num2]
id_ //= ShareCodeUtils.BIN_LEN
num -= 1
array[num] = ShareCodeUtils.BASE[int(id_ % ShareCodeUtils.BIN_LEN)]
text = "".join(array[num : ShareCodeUtils.BIN_LEN])
length = len(text)
if length < ShareCodeUtils.CODE_LEN:
string_builder = [ShareCodeUtils.SUFFIX_CHAR]
for _ in range(ShareCodeUtils.CODE_LEN - length - 1):
string_builder.append(random.choice(ShareCodeUtils.BASE))
text += "".join(string_builder)
return text
@staticmethod
def id_to_number(id_: int, length: int):
array = [""] * ShareCodeUtils.BIN_LEN
num = ShareCodeUtils.BIN_LEN
while id_ // ShareCodeUtils.BIN_LEN > 0:
num2 = int(id_ % ShareCodeUtils.BIN_LEN)
num -= 1
array[num] = ShareCodeUtils.BASE[num2]
id_ //= ShareCodeUtils.BIN_LEN
num -= 1
array[num] = ShareCodeUtils.BASE[int(id_ % ShareCodeUtils.BIN_LEN)]
text = "".join(array[num : ShareCodeUtils.BIN_LEN])
length = len(text)
if length < length:
string_builder = [ShareCodeUtils.SUFFIX_CHAR]
for _ in range(length - length - 1):
string_builder.append(random.choice(ShareCodeUtils.BASE))
text += "".join(string_builder)
text2 = "".join(str(ord(c)) for c in text)
return text2
@staticmethod
def code_to_id(code: str):
try:
array = list(code)
num = 0
for i, char in enumerate(array):
num2 = 0
for j in range(ShareCodeUtils.BIN_LEN):
if char == ShareCodeUtils.BASE[j]:
num2 = j
break
if char == ShareCodeUtils.SUFFIX_CHAR:
break
num = num2 if i == 0 else num * ShareCodeUtils.BIN_LEN + num2
return num
except Exception:
return 0
class MachineCode:
"""
机器码生成器
注意由于移除了psutil和wmi依赖此类返回默认值
"""
def __init__(self):
self.machineCodeString = ""
def get_machine_code_string(self, mo_address: bool = False) -> str:
"""获取机器码字符串"""
try:
if not self.machineCodeString:
cpu_info = self.get_cpu_info() or "default_cpu"
hd_id = self.get_hd_id() or "default_hd"
self.machineCodeString = f"PC.{cpu_info}.{hd_id}."
mac_address = self.get_mo_address() if mo_address else ""
return self.machineCodeString + mac_address
except Exception as e:
logger.error(f"获取机器码失败: {e}")
return "teluns."
@classmethod
def get_os_type(cls) -> str:
system_name = platform.system()
if system_name == "Linux":
return "Linux"
elif system_name == "Windows":
return "Windows"
else:
return "Other"
def get_cpu_info(self) -> Optional[str]:
"""获取CPU信息 - 返回默认值"""
try:
if self.get_os_type() == "Windows":
return self.get_cpu_info_from_windows()
return self.get_cpu_info_from_linux()
except Exception as e:
logger.error(f"获取CPU错误{e}")
return "default_cpu"
def get_hd_id(self) -> Optional[str]:
"""获取硬盘ID - 返回默认值"""
try:
if self.get_os_type() == "Windows":
return self.get_hd_id_from_windows()
return self.get_hd_id_from_linux()
except Exception as e:
logger.error(f"获取HD错误{e}")
return "default_hd"
def get_cpu_info_from_windows(self) -> str:
"""Windows CPU信息 - 返回默认值"""
return "windows_cpu"
def get_cpu_info_from_linux(self) -> str:
"""Linux CPU信息 - 尝试读取/proc/cpuinfo"""
try:
with open("/proc/cpuinfo", "r") as f:
for line in f:
if line.strip() and line.rstrip("\n").startswith("processor"):
return line.split(":")[-1].strip()
except Exception as e:
logger.error(f"Error reading CPU info: {e}")
return "linux_cpu"
def get_hd_id_from_windows(self) -> str:
"""Windows硬盘ID - 返回默认值"""
return "WDC PC SN730 SDBPNTY-512G-1101"
def get_hd_id_from_linux(self) -> str:
"""Linux硬盘ID - 返回默认值"""
return "WDC PC SN730 SDBPNTY-512G-1101"
def get_mo_address(self) -> str:
"""MAC地址 - 返回默认值"""
return "00:00:00:00:00:00"
def get_disk_serial_numbers(self) -> str:
"""获取磁盘序列号 - 返回默认值"""
try:
if self.get_os_type() == "Windows":
return self.get_disk_serial_numbers_from_windows()
return self.get_disk_serial_numbers_from_linux()
except Exception as e:
logger.error(f"Get Disk Serial Number Error: {e}")
return "default_serial"
def get_disk_serial_numbers_from_windows(self) -> str:
"""Windows磁盘序列号 - 返回默认值"""
return "default_serial"
def get_disk_serial_numbers_from_linux(self) -> str:
"""Linux磁盘序列号 - 返回默认值"""
return "default_serial"
def decode_and_decompress(encoded_data: str, method: str = "gzip") -> str:
"""解码并解压缩数据"""
decoded_data = base64.b64decode(encoded_data)
if method == "gzip":
decompressed_data = gzip.decompress(decoded_data)
elif method == "deflate":
decompressed_data = zlib.decompress(decoded_data, -zlib.MAX_WBITS)
elif method == "brotli":
raise ValueError("Brotli compression not supported - library not available")
else:
raise ValueError("Unsupported compression method.")
return decompressed_data.decode("utf-8")
def xml_to_dict(element: ElementTree.Element) -> dict:
result = {}
key = None
for child in element:
if child.tag == "key":
key = child.text
elif child.tag == "data" and key:
result[key] = child.text
key = None
return result

View File

View File

@@ -7,8 +7,8 @@ from sqlalchemy.ext.asyncio import AsyncSession
from core.database import get_session
from core.responses import ApiResponse, success, paginated, ERROR_RESPONSES
from core.dependencies import get_trace_id
from apps.app_b.schemas import ProductCreate, ProductResponse
from apps.app_b.services import ProductService
from apps.apple.schemas import ProductCreate, ProductResponse
from apps.apple.services import ProductService
router = APIRouter(
prefix="/app-b",

View File

@@ -5,8 +5,8 @@ App B Services - Business logic for products.
from datetime import datetime
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from apps.app_b.models import Product
from apps.app_b.schemas import ProductCreate, ProductUpdate
from apps.apple.models import Product
from apps.apple.schemas import ProductCreate, ProductUpdate
from core.exceptions import NotFoundException, AlreadyExistsException