import re
import traceback
from hashlib import md5
import time
import string
import PIL
from PIL import Image
from base64 import b64decode
from io import BytesIO
import numpy as np
from pathlib import Path
import random
import cv2
import base64
import json
import requests
from itertools import chain
from logger import get_logger
import execjs
import secrets
import subprocess
from functools import partial

logger = get_logger()
# Make every subprocess run in UTF-8 text mode (presumably so that output from
# the JavaScript runtime started by execjs is decoded as UTF-8).
subprocess.Popen = partial(subprocess.Popen, encoding="utf-8")


def pil_to_cv2(img):
    """
    Convert a PIL image to an OpenCV image.
    :param img: PIL image
    :return: OpenCV image (BGR)
    """
    img = cv2.cvtColor(np.asarray(img), cv2.COLOR_RGB2BGR)
    return img


def bytes_to_cv2(img):
    """
    Convert encoded image bytes to an OpenCV image.
    :param img: encoded image bytes
    :return: OpenCV image
    """
    # View the image bytes as a one-dimensional uint8 NumPy array
    img_buffer_np = np.frombuffer(img, dtype=np.uint8)
    # Decode the in-memory buffer into an image matrix
    img_np = cv2.imdecode(img_buffer_np, 1)
    return img_np


def cv2_open(img, flag=None):
    """
    Normalise the input into an OpenCV image.
    :param img: image bytes, file path (str or Path), NumPy array, or PIL image
    :param flag: colour-space conversion code, default: None
        eg: cv2.COLOR_BGR2GRAY (grayscale)
    :return: OpenCV image
    """
    if isinstance(img, bytes):
        img = bytes_to_cv2(img)
    elif isinstance(img, (str, Path)):
        img = cv2.imread(str(img))
    elif isinstance(img, np.ndarray):
        pass  # already an OpenCV image
    elif isinstance(img, Image.Image):
        # PIL.Image is a module; the image class is PIL.Image.Image
        img = pil_to_cv2(img)
    else:
        raise ValueError(f"Unsupported image input type: {type(img)}")
    if flag is not None:
        img = cv2.cvtColor(img, flag)
    return img


def imshow(img, winname="test", delay=0):
    """Display an image with OpenCV."""
    cv2.imshow(winname, img)
    cv2.waitKey(delay)
    cv2.destroyAllWindows()


def get_distance(bg, tp, im_show=False, save_path=None):
    """
    :param bg: background image as a path, Path object, or image bytes
        eg: 'assets/bg.jpg' Path('assets/bg.jpg')
    :param tp: puzzle-piece image as a path, Path object, or image bytes
        eg: 'assets/tp.jpg' Path('assets/tp.jpg')
    :param im_show: whether to display the result; default: False
    :param save_path: where to save the annotated result; default: None
    :return: x offset of the gap
    """
    # Load the images
    bg_img = cv2_open(bg)
    tp_gray = cv2_open(tp, flag=cv2.COLOR_BGR2GRAY)
    # Pyramid mean-shift filtering
    bg_shift = cv2.pyrMeanShiftFiltering(bg_img, 5, 50)
    # Edge detection
    tp_gray = cv2.Canny(tp_gray, 255, 255)
    bg_gray = cv2.Canny(bg_shift, 255, 255)
    # Template matching
    result = cv2.matchTemplate(bg_gray, tp_gray, cv2.TM_CCOEFF_NORMED)
    # Parse the match result
    min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
    distance = max_loc[0]
    if save_path or im_show:
        # Height and width of the rectangle to draw
        tp_height, tp_width = tp_gray.shape[:2]
        # Top-left corner of the rectangle
        x, y = max_loc
        # Bottom-right corner of the rectangle
        _x, _y = x + tp_width, y + tp_height
        # Draw the rectangle
        bg_img = cv2_open(bg)
        cv2.rectangle(bg_img, (x, y), (_x, _y), (0, 0, 255), 2)
        # Save the detection result drawn on the background image
        if save_path:
            save_path = Path(save_path).resolve()
            save_path = (
                save_path.parent / f"{save_path.stem}.{distance}{save_path.suffix}"
            )
            save_path = str(save_path)
            cv2.imwrite(save_path, bg_img)
        # Display the detection result
        if im_show:
            imshow(bg_img)
    return distance


def generate_coordinate_data(t, include_duplicates=False, total_steps=30):
    """
    Generate simulated coordinate time-series data.
    :param t: horizontal offset added to the start of the sliding track to get its target x
    :param include_duplicates: whether to include duplicate coordinate points
    :param total_steps: total number of time steps
    :return: dict with the pre-slide track and the sliding track as lists of
        {"x", "y", "t"} points, plus the sliding track without timestamps
    """

    def gen_arr(current_x, current_y, end_x, end_y):
        arr = []
        arr.append({"x": int(current_x), "y": int(current_y), "t": int(time_base)})
        step_time = ((time_base + 3000) - time_base) / total_steps
        for step in range(total_steps):
            # Decide whether to add a duplicate point
            if include_duplicates and random.random() < 0.1:  # 10% chance
                arr.append(
                    {
                        "x": current_x,
                        "y": current_y,
                        "t": int(time_base + step * step_time),
                    }
                )
            # Compute the new coordinates, stepping gradually towards the end point
            if current_x > end_x:
                dx = -1 if random.random() < 0.8 else -2  # usually 1 px, occasionally 2
            elif current_x < end_x:
                dx = 1 if random.random() < 0.8 else 2
            else:
                dx = 0
            if current_y > end_y:
                dy = -1
            elif current_y < end_y:
                dy = 1
            else:
                dy = 0
            current_x += dx
            current_y += dy
            arr.append(
                {
                    "x": int(current_x),
                    "y": int(current_y),
                    "t": int(time_base + step * step_time),
                }
            )
        return arr

    def del_timestamp(arr):
        return [{"x": item["x"], "y": item["y"]} for item in arr]

    current_x, current_y = random.randint(200, 300), random.randint(500, 503)
    end_x, end_y = random.randint(700, 800), random.randint(500, 503)
    time_base = int(time.time() * 1000)  # assumed starting timestamp
    mouse_trajectory = gen_arr(current_x, current_y, end_x, end_y)
    dx, dy = mouse_trajectory[-1]["x"], mouse_trajectory[-1]["y"]
    end_dx = dx + t
    sliding_trajectory = gen_arr(dx, dy, end_dx, dy)
    return {
        "pre_jigsaw_sliding_track": mouse_trajectory,
        "jigsaw_sliding_track": sliding_trajectory,
        "sliding_track": del_timestamp(sliding_trajectory),
    }


class Show_xy:
    def __init__(self, bg_path, tg_path, save_path):
        # There is no local recognition model; an account registered at
        # ttshitu.com is needed to recognise click-selection captchas.
        self._user_name = ""
        self._pwd = ""
        self._typeid = 21
        self._save_path = save_path
        self.__join_img(bg_path, tg_path)

    def __join_img(self, bg_path, tg_path):
        # Load the image
        image_bottom_pil = Image.open(tg_path)
        # Assumed to be a PNG with an alpha channel
        image_bottom_pil = image_bottom_pil.convert("RGBA")
        # Replace the transparent parts with white
        datas = image_bottom_pil.getdata()
        new_data = []
        for item in datas:
            if item[3] == 0:  # alpha of 0 means fully transparent
                new_data.append((255, 255, 255, 255))  # replace with white
            else:
                new_data.append(item)
        image_bottom_pil.putdata(new_data)
        new_width = int(image_bottom_pil.width * 0.9)
        new_height = int(image_bottom_pil.height * 0.9)
        image_bottom_resized_pil = image_bottom_pil.resize((new_width, new_height))
        image_bottom = np.array(image_bottom_resized_pil)[:, :, :3][..., ::-1]
        image_top = cv2.imread(bg_path)
        height_top, width_top, _ = image_top.shape
        height_bottom, width_bottom, _ = image_bottom.shape
        new_height = height_top + height_bottom
        new_width = max(width_top, width_bottom)
        new_image = np.ones((new_height, new_width, 3), dtype=np.uint8) * 255
        new_image[:height_top, :width_top] = image_top
        new_image[height_top:, :width_bottom] = image_bottom
        cv2.imwrite(self._save_path, new_image, [cv2.IMWRITE_JPEG_QUALITY, 100])

    def show_xy(self):
        with open(self._save_path, "rb") as f:
            base64_data = base64.b64encode(f.read())
            b64 = base64_data.decode()
        data = {
            "username": self._user_name,
            "password": self._pwd,
            "typeid": self._typeid,
            "remark": "",
            "image": b64,
        }
        result = json.loads(
            requests.post("http://api.ttshitu.com/predict", json=data).text
        )
        if result["success"]:
            res = result["data"]["result"]
            return True, [i.split(",") for i in res.split("|")]
        else:
            # NOTE: errors such as "not enough human workers" land here; add
            # handling (for example a retry) so the script does not stall.
            return False, result["message"]


class CtripLogin(object):
    def __init__(self, card_num, card_pwd, price, order_num, cookies):
        self.headers = {
            "accept": "*/*",
            "accept-language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
            "cache-control": "no-cache",
            "content-type": "application/json;charset=UTF-8",
            "origin": "https://passport.ctrip.com",
            "pragma": "no-cache",
            "priority": "u=1, i",
            "referer": "https://passport.ctrip.com/",
            "sec-ch-ua": '"Chromium";v="124", "Microsoft Edge";v="124", "Not-A.Brand";v="99"',
            "sec-ch-ua-mobile": "?0",
            "sec-ch-ua-platform": '"Windows"',
            "sec-fetch-dest": "empty",
            "sec-fetch-mode": "cors",
            "sec-fetch-site": "same-site",
            "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36 Edg/124.0.0.0",
        }
        self.headers2 = {
            "accept": "*/*",
            "accept-language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
            "cache-control": "no-cache",
            "content-type": "text/plain",
            "cookieorigin": "https://passport.ctrip.com",
            "origin": "https://passport.ctrip.com",
            "pragma": "no-cache",
            "priority": "u=1, i",
            "referer": "https://passport.ctrip.com/",
            "sec-ch-ua": '"Chromium";v="130", "Microsoft Edge";v="130", "Not?A_Brand";v="99"',
            "sec-ch-ua-mobile": "?0",
            "sec-ch-ua-platform": '"Windows"',
            "sec-fetch-dest": "empty",
            "sec-fetch-mode": "cors",
            "sec-fetch-site": "same-site",
            "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36 Edg/130.0.0.0",
            "x-origin-ct": "application/json;charset=utf-8",
            "x-payload-accept": "camev1",
            "x-payload-encoding": "camev1",
        }
        self.headers3 = {
            "accept": "application/json",
            "accept-language": "zh-CN,zh;q=0.9",
            "cache-control": "no-cache",
            "content-type": "application/json",
            "cookieorigin": "https://3g.ctrip.com",
            "origin": "https://3g.ctrip.com",
            "pragma": "no-cache",
            "priority": "u=1, i",
            "referer": "https://3g.ctrip.com/",
            "sec-fetch-dest": "empty",
            "sec-fetch-mode": "cors",
            "sec-fetch-site": "same-site",
            "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36 Edg/130.0.0.0",
            "x-ctrip-pageid": "214402",
        }
        self.cookies = {}
        # Input parameters
        self.card_num = card_num
        self.card_pwd = card_pwd
        self.price = price
        self.order_num = order_num
        self.cookies2 = cookies
        # self.phone = phone
        self.token = None
        self.rid = None
        self.captcha_token = None
        self.dimensions = None
        self.client_id = None
        self.sequence_id = None
        self.display = None
        self.extend_param = None
        self.screen_size_info = None
        # Fingerprint parameters
        self.page_id = None
        self.fp = None
        self.vid = None
        self.r = None
        self.rt = None
        self.random_str_32 = None
        self.ip = "14.19.6.53"
        self.app_id = "100015783"
        self.business_site = "prepay_giftcardquery_h5_pic"
        self.version = "2.0.23"
        self.guid = None
        # self.__save_img_path = __file__.replace('\\', '/').rsplit('/', 1)[0] + '/images/'
        # if not os.path.isdir(self.__save_img_path):
        #     os.makedirs(self.__save_img_path)
        self.remarks = ""

    @staticmethod
    def generate_non_zero_starting_number(length):
        first_digit = random.randint(1, 9)
        remaining_length = length - 1
        remaining_digits = "".join(
            str(random.randint(0, 9)) for _ in range(remaining_length)
        )
        random_number_str = str(first_digit) + remaining_digits
        return random_number_str

    @staticmethod
    def generate_random_string(length):
        possible_characters = string.ascii_letters + string.digits
        random_string = "".join(
            secrets.choice(possible_characters) for _ in range(length)
        )
        return random_string

    @staticmethod
    def ctrip_aes_enc(data):
        with open("./enc1.js", "r", encoding="utf8") as f:
            js_code = f.read()
        ctx = execjs.compile(js_code)
        return ctx.call("enc", data)

    @staticmethod
    def login_crypto(med, text):
        with open("./passwordEnc.js", encoding="utf8") as f:
            js_code = f.read()
        ctx = execjs.compile(js_code).call(med, text)
        return ctx

    def generate_random_hex_string_lowercase(self, length=32):
        characters = string.hexdigits.lower()
        random_str = "".join(random.choice(characters) for _ in range(length))
        self.r = random_str

    def generate_vid(self, length=8):
        # Leading part: the current timestamp in milliseconds
        decimal_part = int(time.time() * 1000)
        # Random alphanumeric suffix of the given length
        suffix = "".join(
            random.choices(
                string.ascii_uppercase + string.ascii_lowercase + string.digits,
                k=length,
            )
        )
        # Join the two parts into the final string
        result_string = str(decimal_part) + "." + suffix
        self.vid = result_string

    def generate_screen_info(self):
        """
        Fingerprint-related parameters.
        :return:
        """
        screen_information = [
            {
                "display": "2560x1440",
                "screen_size": {
                    "resolution_width": 2260,
                    "resolution_height": 1360,
                    "language": "",
                },
            },
            {
                "display": "1920x1080",
                "screen_size": {
                    "resolution_width": 1536,
                    "resolution_height": 864,
                    "language": "",
                },
            },
            {
                "display": "3200x1680",
                "screen_size": {
                    "resolution_width": 2950,
                    "resolution_height": 1480,
                    "language": "",
                },
            },
            {
                "display": "720x480",
                "screen_size": {
                    "resolution_width": 640,
                    "resolution_height": 420,
                    "language": "",
                },
            },
        ]
        screen = random.choice(screen_information)
        self.display = screen["display"]
        self.screen_size_info = screen["screen_size"]
        self.extend_param = self.ctrip_aes_enc(screen["screen_size"])

    def generate_random_fp(self):
        def random_hex_segment_upper():
            return format(random.randint(0, 0xFFFFFF), "06X")

        fp = "-".join([random_hex_segment_upper() for _ in range(3)])
        self.fp = fp

    def generate_rt(self):
        data = {
            "fp": self.fp,
            "vid": self.vid,
            "pageId": self.page_id,
            "r": self.r,
            "ip": self.ip,
            "rg": "fin",
            "kpData": "0_0_0",
            "kpControl": "0_0_0-0_0_0",
            "kpEmp": "0_0_0_0_0_0_0_0_0_0-0_0_0_0_0_0_0_0_0_0-0_0_0_0_0_0_0_0_0_0",
            "screen": self.display,
            "tz": "+8",
            "blang": "zh-CN",
            "oslang": "zh-CN",
            "ua": "Mozilla%2F5.0%20(Windows%20NT%2010.0%3B%20Win64%3B%20x64)%20AppleWebKit%2F537.36%20(KHTML%2C%20like%20Gecko)%20Chrome%2F124.0.0.0%20Safari%2F537.36%20Edg%2F124.0.0.0",
            "d": "passport.ctrip.com",
            "v": "25",
            "kpg": "0_0_0_19_12161_43_0_0_0_0",
            "adblock": "F",
            "cck": "F",
            "GUID": self.guid,
        }
        rt = "&".join(f"{key}={value}" for key, value in data.items())
        self.rt = rt

    def set_client_id(self):
        url = "https://m.ctrip.com/restapi/soa2/10290/createclientid"
        params = {"systemcode": "09", "createtype": "3", "contentType": "json"}
        response = requests.get(url, headers=self.headers, params=params)
        self.client_id = response.json()["ClientID"]
        logger.info(
            f"Order ID: {self.order_num}, client_id initialised: {self.client_id}"
        )

    def set_sequence_id(self):
        time_ = time.time() * 1000
        url = "https://bbzutils.ctrip.com/ubtChecking"
        data = {
            "vid": f"{self.vid}",
            "sid": 1,
            "pvid": 1,
            "cid": None,
            "pageId": self.page_id,
            "env": "online",
            "ts": time_,
            "timezoneOffset": -480,
            "sendType": "single",
            "pageRef": self.page_id,
            "contextTs": time_,
            "sdkVer": "1.2.36/new/c",
            "url": "https://passport.ctrip.com/user/login?BackUrl=https%3A%2F%2Fwww.ctrip.com%2F%3Fsid%3D153507%26allianceid%3D5376%26ouid%3Dtitle%26sepopup%3D104#ctm_ref=c_ph_login_buttom",
            "contextTs_date": time_,
            "collectReq": "",
            "tag": "",
        }
        data = json.dumps(data, separators=(",", ":"))
        response = requests.post(
            url, headers=self.headers, cookies=self.cookies, data=data
        )
        self.sequence_id = response.json()["ResponseStatus"]["Extension"][0]["Value"]
        logger.info(
            f"Order ID: {self.order_num}, sequence_id initialised: {self.sequence_id}"
        )

    @staticmethod
    def base64_to_image(base64_string: str, output_file: str):
        """
        Decode a base64 string and save it as a PNG image.
        :param base64_string:
        :param output_file:
        :return:
        """
        encoded = base64_string
        decoded = b64decode(encoded)
        image_data = BytesIO(decoded)
        image = Image.open(image_data)
        image.save(output_file, "png")

    @staticmethod
    def merge_and_scale_arrays(arrays):
        merged_array = list(chain(*arrays))
        print(merged_array)
        return [int(element) for element in merged_array]

    def generate_11_digit_number(self):
        # Generate an 11-digit number starting with 1; digits 2 to 11 are random (0-9)
        number = "1" + "".join(str(random.randint(0, 9)) for _ in range(10))
        self.page_id = str(number)
        return number

    def risk_inspect(self):
        data_dict = {
            "rt": self.rt,
            "ua": self.headers["user-agent"],
            "p": "pc",
            "fp": self.fp,
            "vid": f"{self.vid}",
            "identify": self.fp,
            "guid": self.client_id,
            "h5_duid": None,
            "pc_duid": None,
            "hb_uid": None,
            "pc_uid": None,
            "h5_uid": None,
            "infosec_openid": None,
            "device_id": self.r,
            "client_id": self.generate_random_string(32),
            "pid": self.generate_non_zero_starting_number(16),
            "sid": self.generate_random_string(16),
            "login_uid": self.generate_non_zero_starting_number(10),
            "client_type": "PC",
            "site": {
                "type": "PC",
                "url": "https://m.ctrip.com/webapp/cw/giftcard/xct/login_bycode.html",
                "ref": "",
                # Page metadata is request payload, so it is left untranslated
                "title": "登录",
                "keywords": "携程通,携程通用车,携程,礼品卡,人力资源,员工福利,企业管理,旅游",
            },
            "device": {
                "width": self.screen_size_info["resolution_width"],
                "height": self.screen_size_info["resolution_height"],
                "os": "",
                "pixelRatio": 1.5,
                "did": "",
            },
            "user": {"tid": "", "uid": "", "vid": ""},
        }
        dimensions = self.ctrip_aes_enc(data_dict)
        self.dimensions = dimensions
        json_data = {
            "extend_param": self.extend_param,
            "appid": self.app_id,
            "business_site": self.business_site,
            "version": self.version,
            "dimensions": dimensions,
            "sign": None,
        }
        enc_str = f"appid={json_data['appid']}&business_site={json_data['business_site']}&version={json_data['version']}&dimensions={json_data['dimensions']}&extend_param={json_data['extend_param']}"
        sign = md5(enc_str.encode()).hexdigest()
        json_data["sign"] = sign
        response = requests.post(
            url="https://ic.ctrip.com/captcha/v4/risk_inspect",
            headers=self.headers,
            json=json_data,
        )
        ret = response.json()
        self.token = ret["result"]["token"]
        self.rid = ret["result"]["rid"]
        if ret["result"]["risk_info"]["process_type"] == "NONE":
            return None
        else:
            bg = ret["result"]["risk_info"]["process_value"]["processed_image"]
            slide = ret["result"]["risk_info"]["process_value"]["jigsaw_image"]
            # verify_bg_path = os.path.join(self.__save_img_path, f'bg_{int(time.time() * 1000)}.png')
            # verify_slide_path = os.path.join(self.__save_img_path, f'slide_{int(time.time() * 1000)}.png')
            # self.base64_to_image(bg, verify_bg_path)
            # self.base64_to_image(slide, verify_slide_path)
            distance = int(get_distance(base64.b64decode(bg), base64.b64decode(slide)))
            logger.info(f"Order ID: {self.order_num}, slide distance ---> {distance}")
            return distance

    def enc_params(self, t):
        data_dict = generate_coordinate_data(t)
        data = {
            "st": int(time.time()) * 1000,
            "slidingTime": random.randint(3000, 4000),
            "display": self.display,
            "keykoardTrack": [],
            "jigsawKeyboardEventExist": False,
            "jigsawPicWidth": 320,
            "jigsawPicHeight": 160,
            "jigsawViewDuration": random.randint(1000, 2000),
            "slidingTrack": data_dict["sliding_track"],
            "timezone": -480,
            "flashState": False,
            "language": "zh-CN",
            "platform": "Win32",
            "hasSessStorage": False,
            "hasLocalStorage": False,
            "hasIndexedDB": False,
            "hasDataBase": False,
            "doNotTrack": False,
            "touchSupport": False,
            "mediaStreamTrack": False,
            "value": t,
            "preJigsawSlidingTrack": data_dict["pre_jigsaw_sliding_track"],
            "jigsawSlidingTrack": data_dict["jigsaw_sliding_track"],
        }
        enc_text = self.ctrip_aes_enc(data)
        return enc_text

    def verify_jigsaw(self, verify_msg):
        json_data = {
            "appid": self.app_id,
            "business_site": self.business_site,
            "token": self.token,
            "rid": self.rid,
            "version": self.version,
            "verify_msg": verify_msg,
            "dimensions": self.dimensions,
            "extend_param": self.extend_param,
            "sign": None,
        }
        enc_str = f"appid={json_data['appid']}&business_site={json_data['business_site']}&version={json_data['version']}&verify_msg={json_data['verify_msg']}&dimensions={json_data['dimensions']}&extend_param={json_data['extend_param']}&token={json_data['token']}&captcha_type=JIGSAW"
        sign = md5(enc_str.encode()).hexdigest()
        json_data["sign"] = sign
        response = requests.post(
            "https://ic.ctrip.com/captcha/v4/verify_jigsaw",
            headers=self.headers,
            json=json_data,
        )
        ret = response.json()
        if "ICON" in response.text:
            logger.info(
                f"Order ID: {self.order_num}, click-selection captcha triggered, please retry the request"
            )
            # self.token = ret["result"]["token"]
            # self.rid = ret["result"]["rid"]
            # bg = ret["result"]["risk_info"]["process_value"]["big_image"]
            # icon = ret["result"]["risk_info"]["process_value"]["small_image"]
            # verify_bg_path = os.path.join(self.__save_img_path, f'bg2_{int(time.time() * 1000)}.png')
            # verify_slide_path = os.path.join(self.__save_img_path, f'icon{int(time.time() * 1000)}.png')
            # self.base64_to_image(bg, verify_bg_path)
            # self.base64_to_image(icon, verify_slide_path)
            # save_path = os.path.join(self.__save_img_path, f'{int(time.time() * 1000)}.png')
            # coordinate = Show_xy(verify_bg_path, verify_slide_path, save_path).show_xy()
            # new_list = self.merge_and_scale_arrays(coordinate[1])
            # logger.info(f'click coordinates ---> {new_list}')
            # return new_list
            return {}
        else:
            logger.info(
                f'Order ID: {self.order_num}, slider captcha passed, token ---> {ret["result"]["token"]} rid ---> {ret["result"]["rid"]}'
            )
            return {"token": ret["result"]["token"], "rid": ret["result"]["rid"]}

    def enc_icon_params(self, new_list):
        t = random.randint(460, 500)
        data_dict = generate_coordinate_data(t)
        time_ = int(time.time() * 1000)
        difference = int(random.randint(3000, 4000))
        tiem_2 = time_ - difference
        json_str1 = json.dumps(
            data_dict["jigsaw_sliding_track"], ensure_ascii=False, separators=(",", ":")
        )
        json_str2 = json.dumps(
            data_dict["sliding_track"], ensure_ascii=False, separators=(",", ":")
        )
        data = {
            "st": time_,
            # The screen size is checked here
            "display": self.display,
            "keykoardTrack": [],
            "iconKeyboardEventExist": True,
            "iconViewDuration": 2237,
            "timezone": -480,
            "flashState": False,
            "language": "zh-CN",
            "platform": "Win32",
            "hasSessStorage": True,
            "hasLocalStorage": True,
            "hasIndexedDB": True,
            "hasDataBase": False,
            "doNotTrack": False,
            "touchSupport": False,
            "mediaStreamTrack": True,
            "preIconClickTrack": data_dict["pre_jigsaw_sliding_track"],
            "iconClickTrack": json_str1,
            "inputStartTs": time_,
            "inputEndTs": tiem_2,
            "inputTime": difference,
            "selectMoveTrace": json_str2,
            "selectMoveTime": f"{random.randint(400, 500)}&{random.randint(200, 300)}",
            "selectCancelCount": 0,
            "selectIsTruncation": False,
            "value": new_list,
        }
        enc_text = self.ctrip_aes_enc(data)
        return enc_text

    def verify_icon(self, verify_msg):
        url = "https://ic.ctrip.com/captcha/v4/verify_icon"
        data = {
            "appid": self.app_id,
            "token": self.token,
            "rid": self.rid,
            "business_site": self.business_site,
            "version": self.version,
            "verify_msg": verify_msg,
            "dimensions": self.dimensions,
            "extend_param": self.extend_param,
            "sign": None,
        }
        enc_str = f"appid={data['appid']}&business_site={data['business_site']}&version={data['version']}&verify_msg={data['verify_msg']}&dimensions={data['dimensions']}&extend_param={data['extend_param']}&token={data['token']}&captcha_type=ICON"
        sign = md5(enc_str.encode()).hexdigest()
        data["sign"] = sign
        response = requests.post(url, headers=self.headers, json=data)
        ret = response.json()
        logger.info("Second verification passed")
        logger.info(
            f'token ---> {ret["result"]["token"]} rid ---> {ret["result"]["rid"]}'
        )
        return {"token": ret["result"]["token"], "rid": ret["result"]["rid"]}

    def check_captcha(self):
        url = "https://m.ctrip.com/restapi/giftcard/lipinService/CheckCaptcha"
        params = {
            "_fxpcqlniredt": "09031096417206203360",
            "__gw_appid": "99999999",
            "__gw_ver": "1.0",
            "__gw_from": "214402",
            "__gw_platform": "H5",
        }
        data = {
            "captcha": {
                "rid": self.rid,
                "token": self.token,
                "version": "2.0.20",
                "checkState": "hidden",
                "message": "",
            },
            "size": 1,
            "PlatForm": "H5",
            "AppVersion": "",
            "AppType": "",
            "LipinVersion": "8.62",
            "newBound": True,
            "supportWebp": True,
            "head": {
                "cid": self.client_id,
                "ctok": "",
                "cver": "1.0",
                "lang": "01",
                "sid": "8888",
                "syscode": "09",
                "auth": None,
                "extension": [{"name": "protocal", "value": "https"}],
            },
            "contentType": "json",
        }
        data = json.dumps(data, separators=(",", ":"))
        response = requests.post(url, headers=self.headers3, params=params, data=data)
        logger.info(
            f"Order ID: {self.order_num}, captcha type: NONE, captcha_res response: {response.text}"
        )
        return response.json()

    def get_card_info(self):
        url = "https://m.ctrip.com/restapi/giftcard/lipinService/GetRechargeCardInfo"
        params = {
            "_fxpcqlniredt": "09031096417206203360",
            "__gw_appid": "99999999",
            "__gw_ver": "1.0",
            "__gw_from": "214402",
            "__gw_platform": "H5",
        }
        data = {
            "SerialNumber": None,
            "CardCode": self.card_num,
            "Password": self.card_pwd,
            "URL": "https://3g.ctrip.com/webapp/lipin/receiveticket?from=money",
            "URLParam": "/webapp/lipin/receiveticket",
            "captcha": {
                "rid": self.rid,
                "token": self.token,
                "version": "2.0.20",
                "checkState": "hidden",
                "message": "",
            },
            "token": self.captcha_token,
            "PlatForm": "H5",
            "AppVersion": "",
            "AppType": "",
            "LipinVersion": "8.62",
            "newBound": True,
            "supportWebp": True,
            "head": {
                "cid": "09031096417206203360",
                "ctok": "",
                "cver": "1.0",
                "lang": "01",
                "sid": "8888",
                "syscode": "09",
                "auth": None,
                "extension": [{"name": "protocal", "value": "https"}],
            },
            "contentType": "json",
        }
        data = json.dumps(data, separators=(",", ":"))
        cookies = {
            "cticket": self.cookies2,
        }
        response = requests.post(
            url, headers=self.headers3, cookies=cookies, params=params, data=data
        )
        logger.info(f"Order ID: {self.order_num}, card lookup response: {response.text}")
        return response.json()

    def process_cookies(self):
        if "cticket" in self.cookies2:
            p_ck = re.findall(r"cticket=(.*?);", self.cookies2)
            if p_ck:
                self.cookies2 = p_ck[0]
                return True
            else:
                return False
        elif len(self.cookies2) == 64:
            return True
        else:
            return False

    def create_response(self, status_code, data=None):
        """Build the unified response structure."""
        # Use None as the default to avoid sharing one mutable dict between calls
        if data is None:
            data = {}
        return {"code": status_code, "msg": self.remarks, "data": data}

    def format_number(self, number):
        if not number:
            number = 0
        number = float(number)
        formatted_number = "{:.2f}".format(number)
        return formatted_number

    def get_status_code(self, res):
        try:
            result_code = res["ResultInfo"]["ResultCode"]
            # Card number does not exist or the card password is wrong
            if result_code == 1005:
                self.remarks = "Card number does not exist or the card password is wrong"
                return 105
            # Five consecutive wrong attempts; retry after 30 minutes
            if result_code == 10004:
                self.remarks = (
                    "Five consecutive wrong attempts, please retry after 30 minutes"
                )
                return 115
            # Unable to obtain a valid user ID
            if result_code == 1003:
                self.remarks = "Unable to obtain a valid user ID!"
                return 113
            # Card number [cardCode] must be 12 digits
            if result_code == 1002:
                self.remarks = "Card number [cardCode] must be 12 digits!"
                return 105
            # Slider verification failed
            if result_code == -10001:
                self.remarks = "Slider verification failed"
                return 115
            card_status_code = res["RechargeCardList"][0]["CardStatusCode"]
            face_value = self.format_number(res["RechargeCardList"][0]["FaceValue"])
            money = self.format_number(self.price)
            # Not activated
            if card_status_code == 1001:
                self.remarks = "Not activated"
                return 104
            # Already claimed
            if card_status_code == 4001:
                self.remarks = "Already claimed"
                return 104
            # Voided
            if card_status_code == 6001:
                self.remarks = "Voided"
                return 104
            # Top-up amount differs from the card face value
            if face_value != money:
                self.remarks = "Top-up amount does not match the card face value"
                return 114
            # Frozen
            if card_status_code == 7001:
                self.remarks = "Frozen"
                return 104
            # Activated but not yet claimed
            if card_status_code == 2001:
                self.remarks = "Activated, not yet claimed"
                return 2001
            self.remarks = "check_card_status: unknown status code"
            return 111
        except Exception as e:
            self.remarks = "check_card_status: exception, unknown status code"
            logger.info(
                f"Order number: {self.order_num}, check_card_status exception, unknown status code: {traceback.format_exc()}"
            )
            return 111

    def do_bind_card(self):
        url = "https://m.ctrip.com/restapi/giftcard/lipinService/ReceiveGiftCard"
        params = {
            "_fxpcqlniredt": "09031096417206203360",
            "__gw_appid": "99999999",
            "__gw_ver": "1.0",
            "__gw_from": "214402",
            "__gw_platform": "H5",
        }
        data = {
            "VipGate": None,
            "IsConfirmedCharge": False,
            "ReceiveMethod": "CP",
            "ReceiveCards": [
                {"Code": self.card_num, "Password": self.card_pwd, "IsHasPoint": False}
            ],
            "DeviceID": "09031096417206203360",
            "captcha": {
                "rid": self.rid,
                "token": self.token,
                "version": "2.0.20",
                "checkState": "success",
                "message": "",
            },
            "token": self.captcha_token,
            "referer": "https://3g.ctrip.com/webapp/lipin/receiveticket?from=money",
            "clientInfo": {
                "pageId": "214402",
                "rmsToken": "fp=2E679A-AF69ED-585793&vid=1717491186803.5a9al5OuAK71&pageId=214402&r=70cfa46f6ec44ce08c51db48a73eb62a&ip=14.19.6.53&rg=fin&kpData=0_0_0&kpControl=0_0_0-0_0_0&kpEmp=0_0_0_0_0_0_0_0_0_0-0_0_0_0_0_0_0_0_0_0-0_0_0_0_0_0_0_0_0_0&screen=414x896&tz=+8&blang=zh-CN&oslang=zh-CN&ua=Mozilla%2F5.0%20(iPhone%3B%20CPU%20iPhone%20OS%2016_6%20like%20Mac%20OS%20X)%20AppleWebKit%2F605.1.15%20(KHTML%2C%20like%20Gecko)%20Version%2F16.6%20Mobile%2F15E148%20Safari%2F604.1&d=3g.ctrip.com&v=25&kpg=0_11_0_0_1890117_65_16_0_0_0&adblock=F&cck=F",
                "serverFrom": "H5",
                "url": "https://3g.ctrip.com/webapp/lipin/receiveticket?from=money",
                "uRLParameter": "from=money",
            },
            "PlatForm": "H5",
            "AppVersion": "",
            "AppType": "",
            "LipinVersion": "8.62",
            "newBound": True,
            "supportWebp": True,
            "head": {
                "cid": self.client_id,
                "ctok": "",
                "cver": "1.0",
                "lang": "01",
                "sid": "55551825",
                "syscode": "09",
                "auth": None,
                "extension": [{"name": "protocal", "value": "https"}],
            },
            "contentType": "json",
        }
        cookies = {
            "cticket": self.cookies2,
        }
        data = json.dumps(data, separators=(",", ":"))
        response = requests.post(
            url, headers=self.headers3, cookies=cookies, params=params, data=data
        )
        logger.info(f"Order ID: {self.order_num}, bind response: {response.text}")
        return response.json()

    def verify_code(self):
        for i in range(5):
            self.guid = (
                self.generate_11_digit_number() + self.generate_11_digit_number()
            )[0:20]
            self.generate_11_digit_number()
            self.generate_screen_info()
            self.generate_random_hex_string_lowercase()
            self.set_sequence_id()
            self.set_client_id()
            self.generate_random_fp()
            self.generate_vid()
            self.generate_rt()
            t = self.risk_inspect()
            if t:
                verify_msg = self.enc_params(t)
                print(f"verify_msg: {verify_msg}")
                first_verify_ret = self.verify_jigsaw(verify_msg)
                if not first_verify_ret:
                    continue
                if isinstance(first_verify_ret, list):
                    enc_icon = self.enc_icon_params(first_verify_ret)
                    verify_info = self.verify_icon(enc_icon)
                    self.rid = verify_info["rid"]
                    self.token = verify_info["token"]
                else:
                    self.rid = first_verify_ret["rid"]
                    self.token = first_verify_ret["token"]
            captcha_res = self.check_captcha()
            self.captcha_token = captcha_res["tokens"][0]
            return True
        return False

    def check_bind_status(self, res):
        try:
            result_code = res["ResultInfo"]["ResultCode"]
            # Voucher number and password have already been claimed
            if result_code == 1:
                self.remarks = "Voucher number and password have already been claimed"
                return 104
            # Card bound successfully
            elif result_code == 0:
                self.remarks = "Card bound successfully"
                return 100
            # Slider captcha appeared
            elif result_code == -10001:
                self.remarks = "Bind request failed slider verification"
                return 111
            else:
                self.remarks = "check_bind_status: unknown status code"
                return 111
        except Exception as e:
            self.remarks = "check_bind_status: exception, unknown status code"
            return 111

    def check_card(self):
        try:
            verify_status = self.verify_code()
            if not verify_status:
                self.remarks = "Slider verification failed during card lookup"
                # Return the unified structure; run() calls .get("code") on it
                return self.create_response(status_code=110)
            card_info_res = self.get_card_info()
            code = self.get_status_code(card_info_res)
            return self.create_response(status_code=code)
        except Exception:
            self.remarks = "check_card: unexpected exception"
            logger.info(
                f"Order ID: {self.order_num}, card lookup exception: {traceback.format_exc()}"
            )
            return self.create_response(status_code=111)

    def run(self):
        try:
            logger.info(
                f"Order number: {self.order_num}, card number: {self.card_num}, card password: {self.card_pwd}, top-up amount: {self.price}, ck: {self.cookies2}"
            )
            # Validate the cookies
            if not self.process_cookies():
                self.remarks = "Cookie check failed"
                return self.create_response(status_code=113)
            # Look up the card
            check_card_res = self.check_card()
            code = check_card_res.get("code")
            # Bind the card
            if code == 2001:
                # Obtain a fresh captcha result before binding
                verify_status = self.verify_code()
                if not verify_status:
                    self.remarks = "Slider verification failed before binding"
                    return self.create_response(status_code=110)
                bind_res = self.do_bind_card()
                code = self.check_bind_status(bind_res)
            return self.create_response(status_code=code)
        except Exception as e:
            self.remarks = "run: unexpected exception"
            logger.info(
                f"Order ID: {self.order_num}, run exception: {traceback.format_exc()}"
            )
            return self.create_response(status_code=111)


if __name__ == "__main__":
    cookies = "1E0855FE0BB5344CE91251B384CC4E2A7ADDE4239841FDCBF09DC09144BC7668"
    card_num = "319935633479"
    card_pwd = "553831"
    price = "100"
    order_num = "123456"
    ctrip = CtripLogin(
        cookies=cookies,
        card_num=card_num,
        card_pwd=card_pwd,
        price=price,
        order_num=order_num,
    )
    # for i in range(10):
    res = ctrip.check_card()
    print(res)
    print("=" * 200)