Files
kami_itunes_june/apple_balance_query.py
danial cb905409f8 Refactor AppleBatch_June project:
- Removed DotRas library dependency in RasTools.cs, providing empty implementations for Connect and Disconnect methods.
- Updated context menu implementation in ReddemHelp.cs to use ToolStripMenuItem and ContextMenuStrip.
- Replaced caching mechanism in SiteHelper.cs with a custom dictionary-based implementation, removing reliance on HttpRuntime.Cache.
- Switched from JavaScriptSerializer to Newtonsoft.Json for JSON serialization/deserialization in multiple files (Tools.cs, addMaterial.cs).
- Added WebHeaderCollection property to HttpItem.cs for better header management.
- Deleted obsolete AssemblyInfo.cs file.
- Introduced apple_balance_query.py for querying Apple ID balance via Privacy Center, implementing authentication and balance retrieval logic.
2025-11-10 17:38:18 +08:00

380 lines
13 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Apple ID Balance Query via Privacy Center
Python implementation of applyQueryAuthBalance function
"""
import re
import json
import base64
import requests
from typing import Dict, Optional, Tuple
from dataclasses import dataclass
from enum import Enum
class DisplayType(Enum):
    """Categories of messages passed to the status callback.

    The member values are the tag strings shown in printed output and
    handed to callbacks via ``display_type.value``.
    """

    # General progress / informational message.
    INFO = "xinxi"
    # Message whose text is the balance itself.
    BALANCE = "balance"
    # Failure message.
    ERROR = "error"
@dataclass
class AppleAccount:
    """Apple account credentials.

    Construction, field access and equality are the standard dataclass
    behavior. Only ``repr()`` is customized so the password never ends up in
    logs, tracebacks or debugger output.
    """

    # Apple ID (account name) used to sign in.
    apple_id: str
    # Plain-text password for the account; masked in ``repr()``.
    apple_pwd: str

    def __repr__(self) -> str:
        """Return a debug representation with the password masked."""
        # An explicitly defined __repr__ takes precedence over the one
        # @dataclass would otherwise generate, which would print apple_pwd.
        return (
            f"{type(self).__name__}"
            f"(apple_id={self.apple_id!r}, apple_pwd='***')"
        )
@dataclass
class HttpResult:
    """Plain container for the parts of an HTTP response the caller uses."""

    # Numeric HTTP status code of the response.
    status_code: int
    # Response body as text.
    html: str
    # Response headers as a plain dictionary.
    headers: Dict[str, str]
    # Redirect target, when one is known; None otherwise.
    redirect_url: Optional[str] = None
class AppleBalanceQuery:
    """Apple ID balance query via privacy.apple.com.

    :meth:`query_balance` loads the privacy account page, extracts a CSRF
    token, runs the (currently placeholder) authentication steps, creates a
    privacy-center session and then reads the ``store_balance`` object from
    the delete-account section, falling back to the deactivate-account
    section.

    Status, balance and error messages are delivered through the optional
    callback given to the constructor, or printed when there is none.

    The instance owns a ``requests.Session``; call :meth:`close` (or use the
    instance as a context manager) to release it.
    """

    # Seconds passed as ``timeout`` to every HTTP request so that a stalled
    # connection cannot block the caller forever.
    REQUEST_TIMEOUT = 30

    def __init__(self, callback_func=None):
        """Initialize the balance query.

        Args:
            callback_func: Optional callback for status updates, invoked as
                ``callback_func(key, display_type, message)``. When falsy,
                messages are printed to stdout instead.
        """
        self.apple_widget_key = "04659e25236376d440c224638c1cdd6a001abdd7f186cdcfa120abf35417efab"
        self.callback = callback_func
        self.session = requests.Session()
        # Cookies collected from responses; sent explicitly on each request.
        self.cookies = {}
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        # Values captured during authentication (e.g. the OAuth grant code).
        self.handel_dic = {}
        # Configuration - should match AppSysConfig in original code
        self.csrf_token_regex = r'name="csrf_token"\s+value="([^"]+)"'
        self.balance_regex = r'"store_balance":\s*\{([^}]+)\}'

    def close(self) -> None:
        """Close the underlying HTTP session and its pooled connections."""
        self.session.close()

    def __enter__(self):
        """Return the instance itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Close the HTTP session when leaving the ``with`` block."""
        self.close()

    def _notify(self, message: str, display_type: DisplayType = DisplayType.INFO):
        """Send a notification via the callback, or print it if there is none.

        Args:
            message: Text to report.
            display_type: Category of the message; defaults to INFO.
        """
        if self.callback:
            self.callback("balance_query", display_type, message)
        else:
            print(f"[{display_type.value}] {message}")

    def _build_headers(self, referer: str = "",
                       extra_headers: Optional[Dict[str, str]] = None,
                       base: Optional[Dict[str, str]] = None) -> Dict[str, str]:
        """Return a copy of the default headers with per-request additions.

        Later sources override earlier ones: defaults, then ``base``, then
        the ``Referer`` header, then ``extra_headers``.
        """
        headers = self.headers.copy()
        if base:
            headers.update(base)
        if referer:
            headers['Referer'] = referer
        if extra_headers:
            headers.update(extra_headers)
        return headers

    def _send(self, send_func, url: str, headers: Dict[str, str],
              error_label: str, **request_kwargs) -> HttpResult:
        """Perform one request and wrap the outcome in an HttpResult.

        Args:
            send_func: Bound session method to call (``session.get`` or
                ``session.post``).
            url: Target URL.
            headers: Complete header set for this request.
            error_label: Prefix of the error notification on failure.
            **request_kwargs: Extra keyword arguments for ``send_func``
                (for example ``data`` for a POST).

        Returns:
            The wrapped response. Any exception is reported through
            ``_notify`` and turned into a result with status code 500 and an
            empty body, so callers only need to check ``status_code``.
        """
        try:
            response = send_func(
                url,
                headers=headers,
                cookies=self.cookies,
                # Redirects are not followed; the target is exposed instead.
                allow_redirects=False,
                timeout=self.REQUEST_TIMEOUT,
                **request_kwargs
            )
            # Update cookies from response
            for cookie in response.cookies:
                self.cookies[cookie.name] = cookie.value
            return HttpResult(
                status_code=response.status_code,
                html=response.text,
                headers=dict(response.headers),
                redirect_url=response.headers.get('Location')
            )
        except Exception as e:
            self._notify(f"{error_label}: {str(e)}", DisplayType.ERROR)
            return HttpResult(status_code=500, html="", headers={})

    def _get_web_content(self, url: str, referer: str = "") -> HttpResult:
        """Make an HTTP GET request.

        Args:
            url: Target URL.
            referer: Value for the ``Referer`` header; omitted when empty.

        Returns:
            The wrapped response, or a status-500 result on any failure.
        """
        headers = self._build_headers(referer)
        return self._send(self.session.get, url, headers, "请求失败")

    def _get_web_json_content(self, url: str, referer: str = "",
                              extra_headers: Optional[Dict[str, str]] = None) -> HttpResult:
        """Make an HTTP GET request that asks for JSON content.

        Args:
            url: Target URL.
            referer: Value for the ``Referer`` header; omitted when empty.
            extra_headers: Additional headers; they override the defaults.

        Returns:
            The wrapped response, or a status-500 result on any failure.
        """
        headers = self._build_headers(
            referer,
            extra_headers,
            base={'Accept': 'application/json, text/plain, */*'},
        )
        return self._send(self.session.get, url, headers, "JSON请求失败")

    def _post_web_content(self, url: str, data: str, referer: str = "",
                          extra_headers: Optional[Dict[str, str]] = None) -> HttpResult:
        """Make a form-encoded HTTP POST request.

        Args:
            url: Target URL.
            data: Request body.
            referer: Value for the ``Referer`` header; omitted when empty.
            extra_headers: Additional headers; they override the defaults.

        Returns:
            The wrapped response, or a status-500 result on any failure.
        """
        headers = self._build_headers(
            referer,
            extra_headers,
            base={'Content-Type': 'application/x-www-form-urlencoded'},
        )
        return self._send(self.session.post, url, headers, "POST请求失败", data=data)

    def _authsignin(self, apple_id: str, apple_pwd: str) -> Tuple[bool, str]:
        """Authenticate with Apple ID (placeholder implementation).

        Only the initial sign-in page is requested; the credentials are not
        submitted. A real implementation would POST ``accountName``,
        ``password`` and ``rememberMe`` and additionally handle CSRF tokens,
        two-factor authentication, CAPTCHA challenges, session management
        and redirects.

        Args:
            apple_id: Account name (unused by the placeholder).
            apple_pwd: Account password (unused by the placeholder).

        Returns:
            ``(True, "")`` when the sign-in page answered with status 200,
            otherwise ``(False, <error message>)``.
        """
        self._notify("开始身份验证...")
        # Step 1: Get initial auth page
        auth_url = "https://idmsa.apple.com/appleauth/auth/signin"
        init_response = self._get_web_content(auth_url)
        if init_response.status_code != 200:
            return False, "认证页面加载失败"
        # Step 2 (submitting credentials) is not implemented yet.
        # For now, return a placeholder result
        return True, ""

    def _apple_auth_auth(self, account: AppleAccount) -> bool:
        """Handle additional authentication steps (placeholder).

        A real implementation would check whether 2FA is required, handle
        SMS/phone verification and device trust, and extract the OAuth grant
        code. This version only stores a fixed placeholder grant code.

        Args:
            account: Account being authenticated (unused by the placeholder).

        Returns:
            Always True.
        """
        self._notify("进行额外身份验证...")
        # Store OAuth grant code for session creation
        self.handel_dic['X-Apple-OAuth-Grant-Code'] = 'placeholder_grant_code'
        return True

    def _priv_signout(self):
        """Sign out from the privacy center; failures are ignored."""
        try:
            logout_url = "https://privacy.apple.com/logout"
            self._get_web_content(logout_url)
        except Exception:
            # Best effort: a failed sign-out must not mask the query result.
            # KeyboardInterrupt/SystemExit are deliberately not swallowed.
            pass

    def query_balance(self, account: AppleAccount, use_proxy: bool = False) -> bool:
        """Query the Apple ID balance via privacy.apple.com.

        Args:
            account: Apple account credentials.
            use_proxy: Whether to use a proxy (not implemented in this
                version; the value is ignored).

        Returns:
            True if the query was successful, False otherwise. Failures are
            reported through the notification callback rather than raised.
            A sign-out is attempted in every case.
        """
        try:
            # Add id client cookie
            self.cookies['idclient'] = 'web'
            self._notify("开始查询余额...")

            # Step 1: Load privacy account page
            privacy_url = "https://privacy.apple.com/account"
            web_content = self._get_web_content(privacy_url)
            if web_content.status_code != 200:
                self._notify("页面加载失败#1", DisplayType.ERROR)
                return False

            # Extract CSRF token
            csrf_match = re.search(self.csrf_token_regex, web_content.html)
            if not csrf_match:
                self._notify("无法获取CSRF令牌", DisplayType.ERROR)
                return False
            csrf_token = csrf_match.group(1).strip()

            # Step 2: Authenticate
            signed_in, err_msg = self._authsignin(account.apple_id, account.apple_pwd)
            if not signed_in:
                # Surface the reason instead of failing silently.
                if err_msg:
                    self._notify(err_msg, DisplayType.ERROR)
                return False
            if not self._apple_auth_auth(account):
                return False

            # Step 3: Create authenticated session
            grant_code = self.handel_dic.get('X-Apple-OAuth-Grant-Code', '')
            if not grant_code:
                self._notify("无法获取授权码", DisplayType.ERROR)
                return False

            # Create authentication header: base64("<widget key>:<grant code>")
            auth_string = f"{self.apple_widget_key}:{grant_code}"
            auth_header = base64.b64encode(auth_string.encode()).decode()
            session_headers = {
                'x-csrf-token': csrf_token,
                'x-apple-authentication': auth_header
            }

            # Create session. The response is not inspected; the request is
            # made for its effect on the collected cookies.
            session_url = "https://privacy.apple.com/session/create"
            self._get_web_json_content(session_url, privacy_url, session_headers)

            # Step 4: Query balance from delete-account section
            self._notify("正在获取余额")
            delete_account_url = "https://privacy.apple.com/section/delete-account"
            balance_response = self._get_web_json_content(
                delete_account_url, privacy_url, session_headers
            )
            if balance_response.status_code == 200:
                return self._parse_balance_response(balance_response)

            # Step 5: Try deactivate-account section as fallback
            # NOTE(review): unlike step 4 this request is sent without
            # session_headers - confirm whether that is intentional.
            self._notify("正在重新获取余额")
            deactivate_url = "https://privacy.apple.com/section/deactivate-account"
            fallback_response = self._get_web_json_content(deactivate_url, privacy_url)
            if fallback_response.status_code == 200:
                return self._parse_balance_response(fallback_response)

            self._notify("余额获取失败", DisplayType.ERROR)
            return False
        except Exception as e:
            self._notify(f"查询过程中发生错误: {str(e)}", DisplayType.ERROR)
            return False
        finally:
            # Always try to sign out
            self._priv_signout()

    def _parse_balance_response(self, response: HttpResult) -> bool:
        """Parse balance information from a response and report it.

        A body without the text ``store_balance`` is reported as a balance
        of ``"0"``. Otherwise the object matched by ``self.balance_regex`` is
        decoded as JSON and reported as ``"<balance> <currency>"``, with
        defaults of ``0`` and ``USD`` for missing keys.

        Args:
            response: Response whose ``html`` holds the section payload.

        Returns:
            True when a balance was reported, False when parsing failed.
        """
        try:
            if 'store_balance' not in response.html:
                self._notify("0", DisplayType.BALANCE)
                self._notify("查询完成")
                return True

            # Extract balance data using regex
            balance_match = re.search(self.balance_regex, response.html)
            if not balance_match:
                self._notify("余额解析失败", DisplayType.ERROR)
                return False

            # The pattern captures only the inside of the braces; re-wrap it
            # (minus any trailing comma) so it decodes as a JSON object.
            balance_data = "{" + balance_match.group(1).strip().rstrip(',') + "}"
            balance_dict = json.loads(balance_data)
            balance_amount = balance_dict.get('balance', 0)
            currency = balance_dict.get('currency', 'USD')

            # Display balance
            balance_text = f"{balance_amount} {currency}"
            self._notify(balance_text, DisplayType.BALANCE)
            self._notify("查询完成")
            return True
        except json.JSONDecodeError as e:
            self._notify(f"余额数据解析失败: {str(e)}", DisplayType.ERROR)
            return False
        except Exception as e:
            self._notify(f"余额解析错误: {str(e)}", DisplayType.ERROR)
            return False
def sample_callback(key: str, display_type: DisplayType, message: str):
    """Print one status update as ``[key] [type] message``.

    Args:
        key: Identifier of the operation that produced the update.
        display_type: Category of the update; its ``value`` is printed.
        message: Text of the update.
    """
    formatted = f"[{key}] [{display_type.value}] {message}"
    print(formatted)
def main():
    """Run one example balance query and print whether it succeeded."""
    # Example account (replace with real credentials)
    credentials = AppleAccount(
        apple_id="your_apple_id@example.com",
        apple_pwd="your_password"
    )
    # Status updates from the query are printed by sample_callback.
    balance_query = AppleBalanceQuery(callback_func=sample_callback)
    succeeded = balance_query.query_balance(credentials)
    print("余额查询成功完成" if succeeded else "余额查询失败")
# Run the example only when executed as a script, not when imported.
if __name__ == "__main__":
    main()