# Files
# kami_itunes_third_api/src/integrations/june/api.py
# 2024-08-19 00:29:51 +08:00
#
# 207 lines
# 7.2 KiB
# Python

import base64
import hashlib
import json
import random
import time
from typing import Any
from urllib import parse
import requests
from requests.exceptions import ConnectionError
from src.integrations.june.config import Config
from src.integrations.june.crypto import encrypt_cbc_base64, encrypt
from src.integrations.june.models.login import (
AppleSixResponseModel,
LoginUserInfo,
AppleAccountModel,
LoginSignatureModel,
)
from src.integrations.june.models.redeem import AuthenticateModel
from src.integrations.june.utils.utils import ShareCodeUtils, decode_and_decompress
class SixClient:
    """HTTP client for the remote ``AppleClientApi`` service.

    Every public method funnels through :meth:`_do_post`, which encrypts
    and signs the payload, posts it to ``/AppleClientApi/requestApi`` and,
    when the server sends a ``sign`` header, verifies it before parsing
    the reply into an ``AppleSixResponseModel``.
    """

    _REQUEST_PATH = "/AppleClientApi/requestApi"
    # Salts mixed into the MD5 digests exchanged with the server.
    _MAC_SALT = "by六月的风"
    _SIGN_SALT = "by六月的风_联系qq:1023092054"
    # Value of ``Data["msg"]`` with which the server asks for a retry.
    _RETRY_MESSAGE = "请重试"

    def __init__(self, base_url: str = "http://43.241.16.229:6113"):
        """Create a client.

        Args:
            base_url: Root URL of the service. Defaults to the address
                that used to be hard-coded here.
        """
        self.__base_url = base_url

    def _build_request(
        self, post_data: Any, type_: str, start_now_fun: str
    ) -> tuple[dict[str, Any], dict[str, str], str]:
        """Encrypt and sign ``post_data`` for a single request attempt.

        Returns:
            ``(headers, form, stamp)`` where ``stamp`` is the 16-character
            string sent in the ``timestamp`` header; it is needed again to
            verify the response signature.
        """
        req_count = random.randint(0, 90) + 1
        # epoch seconds (offset by req_count) + zero-padded uid + req_count,
        # left-padded with zeros and cut to exactly 16 characters.
        stamp = (
            str(int(time.time()) + req_count)
            + str(Config.user_info.uid).zfill(4)
            + str(req_count)
        ).zfill(16)[:16]

        # 32-character hex MD5 sent in the "mac" header.
        mac_digest = hashlib.md5(
            (Config.saff_mac + self._MAC_SALT).encode()
        ).hexdigest()

        # Fixed 16-character values handed to the crypto helpers.
        # NOTE(review): their exact roles (key / IV) are defined in
        # src.integrations.june.crypto - confirm there before changing.
        key = "7a588e60045849a1"
        fixed_block = "90e7b0dc3ef2134c"

        serialized = json.dumps(post_data, separators=(",", ":"))
        cipher_text = encrypt_cbc_base64(
            encrypt(serialized, key, stamp, False, fixed_block),
            stamp,
            fixed_block,
        ).strip()
        signature = hashlib.md5(
            (cipher_text + type_ + mac_digest + self._SIGN_SALT).encode("utf-8")
        ).hexdigest()

        headers: dict[str, Any] = {
            "timestamp": stamp,
            "mac": mac_digest,
            "startNowFun": start_now_fun,
            "version": "5.1.9",
            "User-Agent": "liuyeu_AppleBatch_June",
            "Content-Type": "application/x-www-form-urlencoded",
        }
        if Config.user_info.token:
            headers["token"] = base64.b64encode(
                encrypt_cbc_base64(
                    Config.user_info.token, stamp, fixed_block
                ).encode("utf-8")
            )

        form = {
            "authentiString": base64.b64encode(
                cipher_text.encode("utf-8")
            ).decode("utf-8"),
            "sign": signature,
            "type": type_,
        }
        return headers, form, stamp

    def _do_post(
        self, post_data: Any, type_: str, start_now_fun: str = "0", reties: int = 3
    ) -> AppleSixResponseModel | None:
        """Send one signed API call, retrying on failure.

        Args:
            post_data: JSON-serialisable payload to encrypt and send.
            type_: API name, sent as the ``type`` form field.
            start_now_fun: Value of the ``startNowFun`` header.
            reties: Maximum number of attempts; ``<= 0`` sends nothing.

        Returns:
            The parsed response, or ``None`` when every attempt ended in a
            connection error or a non-OK HTTP status.

        Raises:
            Exception: The server sent a ``sign`` header that does not
                match the digest computed over the response body.
        """
        url = parse.urljoin(self.__base_url, self._REQUEST_PATH)
        attempts_left = reties
        while attempts_left > 0:
            attempts_left -= 1
            # Rebuilt on every attempt so each request carries a fresh stamp.
            headers, form, stamp = self._build_request(
                post_data, type_, start_now_fun
            )
            try:
                response = requests.post(
                    url, data=form, headers=headers, timeout=30
                )
            except ConnectionError:
                continue
            if not response.ok:
                continue

            # The signature header is optional: verify it only when present.
            server_sign = response.headers.get("sign")
            if server_sign:
                expected = hashlib.md5(
                    ("abc_" + response.text + stamp + self._SIGN_SALT).encode(
                        "utf-8"
                    )
                ).hexdigest()
                if expected != server_sign:
                    raise Exception("签名错误")
            return AppleSixResponseModel(**response.json())
        return None

    def _service_send(
        self, payload: dict[str, Any], retry_label: str, reties: int
    ) -> AppleSixResponseModel | None:
        """Call ``ApiServiceSend`` and decode its compressed ``Data`` field.

        The request is repeated (after a 2 second pause) while the decoded
        data carries ``msg == "请重试"``, for at most ``reties + 1`` attempts.

        Returns:
            The response with ``Data`` replaced by the decoded object, or
            ``None`` when the request failed or retries ran out.
        """
        # The service expects the payload as a JSON *string*; _do_post
        # serialises that string once more before encrypting it.
        body = json.dumps(payload, separators=(",", ":"))
        attempts_left = reties
        while attempts_left >= 0:
            attempts_left -= 1
            response = self._do_post(body, "ApiServiceSend", start_now_fun="9")
            if response is None:
                return None
            response.Data = json.loads(decode_and_decompress(response.Data))
            if response.Data.get("msg") != self._RETRY_MESSAGE:
                return response
            print(retry_label, response.Data)
            time.sleep(2)
        return None

    def login(self, account: str = "q905262752", pwd: str = "Aa112211"):
        """Log in to the service and store the session in ``Config.user_info``.

        Args:
            account: Service account name.
            pwd: Service account password.

        Raises:
            RuntimeError: No response was obtained from the server.
        """
        # NOTE(review): the defaults keep the credentials that were embedded
        # in this file; prefer passing them in from configuration.
        response = self._do_post({"account": account, "pwd": pwd}, "ApiLogin")
        if response is None:
            raise RuntimeError("ApiLogin failed: no response from server")
        Config.user_info = LoginUserInfo(**response.Data)
        Config.user_info.uid = ShareCodeUtils.code_to_id(Config.user_info.userNumber)

    def check_is_login(self) -> bool:
        """Return ``True`` when the server confirms the stored token is logged in.

        A failed request (no response) is reported as ``False``.
        """
        response = self._do_post(
            {"token": Config.user_info.token, "type": 9},
            "ApiIsLogin",
        )
        if response is None:
            return False
        return response.Code == "0000" and response.Message == "已经登录"

    def login_remote_apple_account(
        self, account: AppleAccountModel
    ) -> AppleSixResponseModel[dict]:
        """Ask the service to log in the given Apple account.

        Returns:
            The response with its JSON ``Data`` string parsed into a dict.

        Raises:
            RuntimeError: No response was obtained from the server.
        """
        response = self._do_post(
            {
                "token": Config.user_info.token,
                "account": account.account,
                "pwd": account.password,
                "isStore": 2,
                "guid": "",
                "serviceIpIndex": -1,
            },
            "ApiItunesLogin",
        )
        if response is None:
            raise RuntimeError("ApiItunesLogin failed: no response from server")
        response.Data = json.loads(response.Data)
        return AppleSixResponseModel[dict].model_validate(response.model_dump())

    def get_sign_sap_setup(
        self, reties: int = 3
    ) -> AppleSixResponseModel[LoginSignatureModel] | None:
        """Request the ``GetSignsapsetup`` signature data.

        Args:
            reties: Extra attempts allowed while the server answers
                "请重试"; a negative value sends nothing.

        Returns:
            The validated response with ``Data.signature`` URL-unquoted and
            Base64-decoded, or ``None`` when no usable reply was obtained.
        """
        response = self._service_send(
            {
                "gZip": "1",
                "type": "GetSignsapsetup",
                "userAgent": "MacAppStore/2.0 (Macintosh; OS X 12.10) AppleWebKit/600.1.3.41",
            },
            "请重试",
            reties,
        )
        if response is None:
            return None
        result = AppleSixResponseModel[LoginSignatureModel].model_validate(
            response.model_dump()
        )
        # The signature arrives URL-quoted and Base64-encoded; undo both.
        result.Data.signature = base64.b64decode(
            parse.unquote_plus(result.Data.signature)
        ).decode()
        return result

    def get_sign_sap_setup_cert(
        self,
        account: AppleAccountModel,
        sign: AppleSixResponseModel[LoginSignatureModel],
        sign_sap_setup: str,
        reties: int = 3,
    ) -> AppleSixResponseModel[AuthenticateModel] | None:
        """Request the ``GetSignsapsetupCert`` authentication data.

        Args:
            account: Apple account whose id and password are sent.
            sign: Result of :meth:`get_sign_sap_setup`.
            sign_sap_setup: Value sent (Base64-encoded, then URL-quoted)
                as ``signSap``.
            reties: Extra attempts allowed while the server answers
                "请重试"; a negative value sends nothing.

        Returns:
            The validated response with ``signature``, ``guid`` and ``post``
            URL-unquoted, or ``None`` when no usable reply was obtained.
        """
        response = self._service_send(
            {
                "signSap": parse.quote_plus(
                    base64.b64encode(sign_sap_setup.encode()).decode()
                ),
                "appleId": account.account,
                "guid": "",
                "applePwd": account.password,
                "intptr_": sign.Data.adder1,
                "intPtr": sign.Data.adder2,
                "idType": 1,
                "gZip": "1",
                "type": "GetSignsapsetupCert",
                # NOTE(review): read from the response envelope, not from
                # sign.Data - confirm this is where the model exposes it.
                "serviceIndex": sign.serverIndex,
            },
            "请重试 get_sign_sap_setup_cert",
            reties,
        )
        if response is None:
            return None
        result = AppleSixResponseModel[AuthenticateModel].model_validate(
            response.model_dump()
        )
        # These fields arrive URL-quoted.
        result.Data.signature = parse.unquote_plus(result.Data.signature)
        result.Data.guid = parse.unquote_plus(result.Data.guid)
        result.Data.post = parse.unquote_plus(result.Data.post)
        return result