#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Apple ID Balance Query via Privacy Center Python implementation of applyQueryAuthBalance function """ import re import json import base64 import requests from typing import Dict, Optional, Tuple from dataclasses import dataclass from enum import Enum class DisplayType(Enum): """Display message types""" INFO = "xinxi" BALANCE = "balance" ERROR = "error" @dataclass class AppleAccount: """Apple account credentials""" apple_id: str apple_pwd: str @dataclass class HttpResult: """HTTP response wrapper""" status_code: int html: str headers: Dict[str, str] redirect_url: Optional[str] = None class AppleBalanceQuery: """Apple ID balance query via privacy.apple.com""" def __init__(self, callback_func=None): """ Initialize the balance query Args: callback_func: Callback function for status updates Function signature: func(key, display_type, message) """ self.apple_widget_key = "04659e25236376d440c224638c1cdd6a001abdd7f186cdcfa120abf35417efab" self.callback = callback_func self.session = requests.Session() self.cookies = {} self.headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' } self.handel_dic = {} # Configuration - should match AppSysConfig in original code self.csrf_token_regex = r'name="csrf_token"\s+value="([^"]+)"' self.balance_regex = r'"store_balance":\s*\{([^}]+)\}' def _notify(self, message: str, display_type: DisplayType = DisplayType.INFO): """Send notification via callback if available""" if self.callback: self.callback("balance_query", display_type, message) else: print(f"[{display_type.value}] {message}") def _get_web_content(self, url: str, referer: str = "") -> HttpResult: """Make HTTP GET request""" headers = self.headers.copy() if referer: headers['Referer'] = referer try: response = self.session.get( url, headers=headers, cookies=self.cookies, allow_redirects=False ) # Update cookies from response if response.cookies: for cookie in response.cookies: self.cookies[cookie.name] = cookie.value return HttpResult( status_code=response.status_code, html=response.text, headers=dict(response.headers), redirect_url=response.headers.get('Location') ) except Exception as e: self._notify(f"请求失败: {str(e)}", DisplayType.ERROR) return HttpResult(status_code=500, html="", headers={}) def _get_web_json_content(self, url: str, referer: str = "", extra_headers: Dict[str, str] = None) -> HttpResult: """Make HTTP GET request for JSON content""" headers = self.headers.copy() headers['Accept'] = 'application/json, text/plain, */*' if referer: headers['Referer'] = referer if extra_headers: headers.update(extra_headers) try: response = self.session.get( url, headers=headers, cookies=self.cookies, allow_redirects=False ) # Update cookies from response if response.cookies: for cookie in response.cookies: self.cookies[cookie.name] = cookie.value return HttpResult( status_code=response.status_code, html=response.text, headers=dict(response.headers), redirect_url=response.headers.get('Location') ) except Exception as e: self._notify(f"JSON请求失败: {str(e)}", DisplayType.ERROR) return HttpResult(status_code=500, html="", headers={}) def _post_web_content(self, url: str, data: str, referer: str = "", extra_headers: Dict[str, str] = None) -> HttpResult: """Make HTTP POST request""" headers = self.headers.copy() headers['Content-Type'] = 'application/x-www-form-urlencoded' if referer: headers['Referer'] = referer if extra_headers: headers.update(extra_headers) try: response = self.session.post( url, data=data, headers=headers, cookies=self.cookies, allow_redirects=False ) # Update cookies from response if response.cookies: for cookie in response.cookies: self.cookies[cookie.name] = cookie.value return HttpResult( status_code=response.status_code, html=response.text, headers=dict(response.headers), redirect_url=response.headers.get('Location') ) except Exception as e: self._notify(f"POST请求失败: {str(e)}", DisplayType.ERROR) return HttpResult(status_code=500, html="", headers={}) def _authsignin(self, apple_id: str, apple_pwd: str) -> Tuple[bool, str]: """ Authenticate with Apple ID Note: This is a simplified implementation. The original code has a much more complex authentication flow with multiple steps and 2FA support. """ self._notify("开始身份验证...") # This would need to implement the full Apple ID authentication flow # including captcha, 2FA, etc. For demonstration purposes: # Step 1: Get initial auth page auth_url = "https://idmsa.apple.com/appleauth/auth/signin" init_response = self._get_web_content(auth_url) if init_response.status_code != 200: return False, "认证页面加载失败" # Step 2: Submit credentials (simplified) login_data = { 'accountName': apple_id, 'password': apple_pwd, 'rememberMe': 'false' } # The actual implementation would need to handle: # - CSRF tokens # - Two-factor authentication # - CAPTCHA challenges # - Session management # - Redirect handling # For now, return a placeholder result return True, "" def _apple_auth_auth(self, account: AppleAccount) -> bool: """ Handle additional authentication steps Note: This would implement 2FA handling if required """ self._notify("进行额外身份验证...") # Placeholder for 2FA handling # Real implementation would need to: # - Check if 2FA is required # - Handle SMS/phone verification # - Handle device trust # - Extract OAuth grant codes # Store OAuth grant code for session creation self.handel_dic['X-Apple-OAuth-Grant-Code'] = 'placeholder_grant_code' return True def _priv_signout(self): """Sign out from privacy center""" try: logout_url = "https://privacy.apple.com/logout" self._get_web_content(logout_url) except: pass # Ignore signout errors def query_balance(self, account: AppleAccount, use_proxy: bool = False) -> bool: """ Query Apple ID balance via privacy.apple.com Args: account: Apple account credentials use_proxy: Whether to use proxy (not implemented in this version) Returns: bool: True if query was successful, False otherwise """ try: # Add id client cookie self.cookies['idclient'] = 'web' self._notify("开始查询余额...") # Step 1: Load privacy account page privacy_url = "https://privacy.apple.com/account" web_content = self._get_web_content(privacy_url) if web_content.status_code != 200: self._notify("页面加载失败#1", DisplayType.ERROR) return False # Extract CSRF token csrf_match = re.search(self.csrf_token_regex, web_content.html) if not csrf_match: self._notify("无法获取CSRF令牌", DisplayType.ERROR) return False csrf_token = csrf_match.group(1).strip() # Step 2: Authenticate err_msg = "" if not self._authsignin(account.apple_id, account.apple_pwd)[0]: return False if not self._apple_auth_auth(account): return False # Step 3: Create authenticated session grant_code = self.handel_dic.get('X-Apple-OAuth-Grant-Code', '') if not grant_code: self._notify("无法获取授权码", DisplayType.ERROR) return False # Create authentication header auth_string = f"{self.apple_widget_key}:{grant_code}" auth_header = base64.b64encode(auth_string.encode()).decode() session_headers = { 'x-csrf-token': csrf_token, 'x-apple-authentication': auth_header } # Create session session_url = "https://privacy.apple.com/session/create" self._get_web_json_content(session_url, privacy_url, session_headers) # Step 4: Query balance from delete-account section self._notify("正在获取余额") delete_account_url = "https://privacy.apple.com/section/delete-account" balance_response = self._get_web_json_content( delete_account_url, privacy_url, session_headers ) if balance_response.status_code == 200: return self._parse_balance_response(balance_response) # Step 5: Try deactivate-account section as fallback self._notify("正在重新获取余额") deactivate_url = "https://privacy.apple.com/section/deactivate-account" fallback_response = self._get_web_json_content(deactivate_url, privacy_url) if fallback_response.status_code == 200: return self._parse_balance_response(fallback_response) self._notify("余额获取失败", DisplayType.ERROR) return False except Exception as e: self._notify(f"查询过程中发生错误: {str(e)}", DisplayType.ERROR) return False finally: # Always try to sign out self._priv_signout() def _parse_balance_response(self, response: HttpResult) -> bool: """Parse balance information from response""" try: if 'store_balance' not in response.html: self._notify("0", DisplayType.BALANCE) self._notify("查询完成") return True # Extract balance data using regex balance_match = re.search(self.balance_regex, response.html) if not balance_match: self._notify("余额解析失败", DisplayType.ERROR) return False balance_data = "{" + balance_match.group(1).strip().rstrip(',') + "}" balance_dict = json.loads(balance_data) balance_amount = balance_dict.get('balance', 0) currency = balance_dict.get('currency', 'USD') # Display balance balance_text = f"{balance_amount} {currency}" self._notify(balance_text, DisplayType.BALANCE) self._notify("查询完成") return True except json.JSONDecodeError as e: self._notify(f"余额数据解析失败: {str(e)}", DisplayType.ERROR) return False except Exception as e: self._notify(f"余额解析错误: {str(e)}", DisplayType.ERROR) return False def sample_callback(key: str, display_type: DisplayType, message: str): """Sample callback function for status updates""" print(f"[{key}] [{display_type.value}] {message}") def main(): """Example usage""" # Create balance query instance query = AppleBalanceQuery(callback_func=sample_callback) # Example account (replace with real credentials) account = AppleAccount( apple_id="your_apple_id@example.com", apple_pwd="your_password" ) # Query balance success = query.query_balance(account) if success: print("余额查询成功完成") else: print("余额查询失败") if __name__ == "__main__": main()