"""Helpers for building request parameters.

Contains a custom-alphabet Base64 encoder, the ``ep`` cipher payload builder,
the pay-sign MD5 helper, a client for the remote signing service and a small
response-envelope helper.
"""

import base64
import ctypes  # noqa: F401  (not used in this file any more; kept in case other code imports it from here)
import hashlib
import json
from urllib.parse import parse_qs

from curl_cffi import requests
from tenacity import retry, stop_after_attempt, wait_exponential

from observability.logging import get_logger_with_trace

logger = get_logger_with_trace(__name__)

# Standard Base64 alphabet and the permuted alphabet used by ``r``.
# Position i of the standard alphabet maps to position i of the custom one.
_STD_B64_ALPHABET = (
    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"
)
_CUSTOM_B64_ALPHABET = (
    "KLMNOPQRSTABCDEFGHIJUVWXYZabcdopqrstuvwxefghijklmnyz0123456789+/"
)
_B64_TRANSLATION = str.maketrans(_STD_B64_ALPHABET, _CUSTOM_B64_ALPHABET)


def r(data_string: str) -> str:
    """Encode *data_string* as Base64 using the custom permuted alphabet.

    The text is UTF-8 encoded, Base64 encoded with ``=`` padding, and every
    output symbol is then remapped from the standard alphabet to
    ``_CUSTOM_B64_ALPHABET``. The padding character ``=`` is left untouched.

    Args:
        data_string: Text to encode. Must be a ``str``.

    Returns:
        The encoded text; an empty string for empty input.
    """
    standard = base64.b64encode(data_string.encode("utf-8")).decode("ascii")
    return standard.translate(_B64_TRANSLATION)


def encode_cipher(cipher_dict: dict) -> None:
    """Replace every value of *cipher_dict* with its ``r``-encoded form.

    The dictionary is modified in place and nothing is returned. All values
    are encoded before any is written back, so if one value cannot be encoded
    the dictionary is left unchanged.

    Args:
        cipher_dict: Mapping whose values are all ``str``.
    """
    encoded = {key: r(value) for key, value in cipher_dict.items()}
    cipher_dict.update(encoded)


def gen_cipher_ep(uuid: str, ts) -> str:
    """Build the compact JSON ``ep`` payload for the given device id.

    Args:
        uuid: Device identifier; used for both the ``uuid`` and ``aid``
            cipher fields.
        ts: Timestamp value, embedded as-is (must be JSON serializable).

    Returns:
        A JSON string without whitespace between separators.
    """
    cipher_dict = {
        "d_model": "SM-N9760",
        "wifiBssid": "unknown",
        "osVersion": "9",
        "d_brand": "samsung",
        "screen": "960*540",
        "uuid": uuid,
        "aid": uuid,
    }
    encode_cipher(cipher_dict)
    data_dict = {
        "hdid": "JM9F1ywUPwflvMIpYPok0tt5k9kW4ArJEU3lfLhxBqw=",
        "ts": ts,
        "ridx": -1,
        "cipher": cipher_dict,
        "ciphertype": 5,
        "version": "1.2.0",
        "appname": "com.jingdong.app.mall",
    }
    return json.dumps(data_dict, separators=(",", ":"))


def get_pay_sign(order_id, face_price) -> str:
    """Return the hex MD5 digest used as the pay signature.

    The digest is computed over
    ``"{app_id};{order_id};37;{face_price};{pay_app_key}"`` encoded as GBK.

    Args:
        order_id: Order identifier, interpolated with ``str()`` formatting.
        face_price: Price value, interpolated with ``str()`` formatting.

    Returns:
        The 32-character lowercase hexadecimal MD5 digest.
    """
    app_id = "jd_android_app4"
    pay_app_key = "e53jfgRgd7Hk"
    input_str = f"{app_id};{order_id};37;{face_price};{pay_app_key}"
    return hashlib.md5(input_str.encode("GBK")).hexdigest()


@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=15))
def get_sign(func, body, uuid_, version) -> dict:
    """Request signature parameters from the signing service.

    Posts the arguments as JSON to the service, parses the ``data`` field of
    the JSON reply as a URL query string and keeps the first value of every
    key.

    Any exception (connection error, non-JSON reply, missing ``data`` key)
    triggers a retry: at most 3 attempts with an exponential wait bounded
    between 4 and 15 seconds. With tenacity's defaults the final failure is
    surfaced as ``tenacity.RetryError``.

    Args:
        func: Sent as the ``func`` field of the request.
        body: Sent as the ``body`` field of the request.
        uuid_: Sent as the ``uid`` field of the request.
        version: Sent as the ``version`` field of the request.

    Returns:
        A dict mapping each query-string key to its first value.
    """
    data = {
        "func": func,
        "body": body,
        "uid": uuid_,
        "platform": "android",
        "version": version,
    }
    logger.info(f"请求参数:{data}")
    response_jd = requests.post(
        url="http://unidbg-boot-server:9999/api/jd/encrypt",
        json=data,
        headers={"Content-Type": "application/json"},
    )
    logger.info(f"请求结果:{response_jd.text}")
    res = response_jd.json()
    params = parse_qs(res["data"])
    # parse_qs yields a list per key; only the first value is kept.
    return {key: value[0] for key, value in params.items()}


def my_json(code, data, msg) -> dict:
    """Return a response envelope with ``code``, ``data`` and ``msg`` keys.

    Args:
        code: Status code. If the object has a ``value`` attribute (for
            example an ``Enum`` member), that attribute is used instead.
        data: Payload, stored unchanged.
        msg: Message, stored unchanged.

    Returns:
        ``{"code": code, "data": data, "msg": msg}``.
    """
    # Unwrap enum-like codes so the envelope holds the plain value.
    if hasattr(code, "value"):
        code = code.value
    return {"code": code, "data": data, "msg": msg}