mirror of
https://git.oceanpay.cc/danial/kami_ctrip.git
synced 2025-12-18 22:24:38 +00:00
- 新增 .drone.yml 文件,配置 Drone CI/CD 流程 - 添加 Dockerfile,定义应用的 Docker 镜像构建过程 - 新增 app.py 文件,实现携程卡绑定功能 - 添加 __pycache__ 目录,存放编译后的 Python 文件
1134 lines
42 KiB
Python
1134 lines
42 KiB
Python
import re
|
||
import traceback
|
||
from hashlib import md5
|
||
import time
|
||
import string
|
||
import PIL
|
||
from PIL import Image
|
||
from base64 import b64decode
|
||
from io import BytesIO
|
||
import numpy as np
|
||
from pathlib import Path
|
||
import random
|
||
import cv2
|
||
import base64
|
||
import json
|
||
import requests
|
||
from itertools import chain
|
||
from logger import get_logger
|
||
|
||
import execjs
|
||
import secrets
|
||
import subprocess
|
||
from functools import partial
|
||
|
||
|
||
logger = get_logger()
|
||
|
||
subprocess.Popen = partial(subprocess.Popen, encoding="utf-8")
|
||
|
||
|
||
def pil_to_cv2(img):
    """
    Convert a PIL image into an OpenCV image.

    :param img: PIL image, <type 'PIL.JpegImagePlugin.JpegImageFile'>;
        its pixel data is treated as RGB
    :return: OpenCV image with BGR channel order, <type 'numpy.ndarray'>
    """
    rgb_pixels = np.asarray(img)
    return cv2.cvtColor(rgb_pixels, cv2.COLOR_RGB2BGR)
|
||
|
||
|
||
def bytes_to_cv2(img):
    """
    Decode encoded image bytes into an OpenCV image.

    :param img: binary image data, <type 'bytes'>
    :return: OpenCV image, <type 'numpy.ndarray'>
    """
    # Expose the bytes as a flat uint8 array, then let OpenCV decode that
    # in-memory buffer into an image matrix (read flag 1).
    encoded = np.frombuffer(img, dtype=np.uint8)
    return cv2.imdecode(encoded, 1)
|
||
|
||
|
||
def cv2_open(img, flag=None):
    """
    Normalise any supported image input into an OpenCV image.

    :param img: image given as <type 'bytes'/'numpy.ndarray'/'str'/'Path'/PIL image>
    :param flag: optional colour-space conversion code applied with
        cv2.cvtColor, default: None
        eg: cv2.COLOR_BGR2GRAY (grayscale)
    :return: OpenCV image, <numpy.ndarray>
    :raises ValueError: if ``img`` is of an unsupported type
    """
    if isinstance(img, bytes):
        img = bytes_to_cv2(img)
    elif isinstance(img, (str, Path)):
        img = cv2.imread(str(img))
    elif isinstance(img, np.ndarray):
        # Already an OpenCV image; use it as is.
        pass
    elif isinstance(img, Image.Image):
        # ``PIL.Image`` is a module; the image class is ``PIL.Image.Image``.
        # Passing the module to isinstance() raises TypeError, which used to
        # break this branch and hide the ValueError below.
        img = pil_to_cv2(img)
    else:
        raise ValueError(f"输入的图片类型无法解析: {type(img)}")

    if flag is not None:
        img = cv2.cvtColor(img, flag)
    return img
|
||
|
||
|
||
def imshow(img, winname="test", delay=0):
    """
    Display an image in an OpenCV window, then close all OpenCV windows.

    :param img: image to display, forwarded to cv2.imshow
    :param winname: window name, default: "test"
    :param delay: value forwarded to cv2.waitKey, default: 0
    """
    cv2.imshow(winname, img)
    cv2.waitKey(delay)
    cv2.destroyAllWindows()
|
||
|
||
|
||
def get_distance(bg, tp, im_show=False, save_path=None):
|
||
"""
|
||
:param bg: 背景图路径或Path对象或图片二进制
|
||
eg: 'assets/bg.jpg'
|
||
Path('assets/bg.jpg')
|
||
:param tp: 缺口图路径或Path对象或图片二进制
|
||
eg: 'assets/tp.jpg'
|
||
Path('assets/tp.jpg')
|
||
:param im_show: 是否显示结果, <type 'bool'>; default: False
|
||
:param save_path: 保存路径, <type 'str'/'Path'>; default: None
|
||
:return: 缺口位置
|
||
"""
|
||
# 读取图片
|
||
bg_img = cv2_open(bg)
|
||
tp_gray = cv2_open(tp, flag=cv2.COLOR_BGR2GRAY)
|
||
|
||
# 金字塔均值漂移
|
||
bg_shift = cv2.pyrMeanShiftFiltering(bg_img, 5, 50)
|
||
|
||
# 边缘检测
|
||
tp_gray = cv2.Canny(tp_gray, 255, 255)
|
||
bg_gray = cv2.Canny(bg_shift, 255, 255)
|
||
|
||
# 目标匹配
|
||
result = cv2.matchTemplate(bg_gray, tp_gray, cv2.TM_CCOEFF_NORMED)
|
||
# 解析匹配结果
|
||
min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
|
||
|
||
distance = max_loc[0]
|
||
if save_path or im_show:
|
||
# 需要绘制的方框高度和宽度
|
||
tp_height, tp_width = tp_gray.shape[:2]
|
||
# 矩形左上角点位置
|
||
x, y = max_loc
|
||
# 矩形右下角点位置
|
||
_x, _y = x + tp_width, y + tp_height
|
||
# 绘制矩形
|
||
bg_img = cv2_open(bg)
|
||
cv2.rectangle(bg_img, (x, y), (_x, _y), (0, 0, 255), 2)
|
||
# 保存缺口识别结果到背景图
|
||
if save_path:
|
||
save_path = Path(save_path).resolve()
|
||
save_path = (
|
||
save_path.parent / f"{save_path.stem}.{distance}{save_path.suffix}"
|
||
)
|
||
save_path = save_path.__str__()
|
||
cv2.imwrite(save_path, bg_img)
|
||
# 显示缺口识别结果
|
||
if im_show:
|
||
imshow(bg_img)
|
||
return distance
|
||
|
||
|
||
def generate_coordinate_data(t, include_duplicates=False, total_steps=30):
|
||
"""
|
||
生成模拟的坐标时间序列数据。
|
||
|
||
:param start_x: 起始横坐标
|
||
:param start_y: 起始纵坐标
|
||
:param end_x: 结束横坐标
|
||
:param end_y: 结束纵坐标
|
||
:param include_duplicates: 是否包含重复坐标点
|
||
:param include_backtrack: 是否包含回溯点
|
||
:param total_steps: 总的时间步数
|
||
:return: 包含(x, y, 时间戳)的列表
|
||
"""
|
||
|
||
def gen_arr(current_x, current_y, end_x, end_y):
|
||
arr = []
|
||
arr.append({"x": int(current_x), "y": int(current_y), "t": int(time_base)})
|
||
step_time = ((time_base + 3000) - time_base) / total_steps
|
||
for step in range(total_steps):
|
||
# 决定是否添加重复点
|
||
if include_duplicates and random.random() < 0.1: # 10%概率添加重复点
|
||
arr.append(
|
||
{
|
||
"x": current_x,
|
||
"y": current_y,
|
||
"t": int(time_base + step * step_time),
|
||
}
|
||
)
|
||
|
||
# 计算新坐标,模拟逐步接近终点的趋势
|
||
if current_x > end_x:
|
||
dx = -1 if random.random() < 0.8 else -2 # 大部分时间递减,偶尔更快
|
||
elif current_x < end_x:
|
||
dx = 1 if random.random() < 0.8 else 2
|
||
else:
|
||
dx = 0
|
||
if current_y > end_y:
|
||
dy = -1
|
||
elif current_y < end_y:
|
||
dy = 1
|
||
else:
|
||
dy = 0
|
||
|
||
current_x += dx
|
||
current_y += dy
|
||
arr.append(
|
||
{
|
||
"x": int(current_x),
|
||
"y": int(current_y),
|
||
"t": int(time_base + step * step_time),
|
||
}
|
||
)
|
||
return arr
|
||
|
||
def del_timestamp(arr):
|
||
return [{"x": item["x"], "y": item["y"]} for item in arr]
|
||
|
||
current_x, current_y = random.randint(200, 300), random.randint(500, 503)
|
||
end_x, end_y = random.randint(700, 800), random.randint(500, 503)
|
||
time_base = int(time.time() * 1000) # 假定的起始时间戳
|
||
mouse_trajectory = gen_arr(current_x, current_y, end_x, end_y)
|
||
|
||
dx, dy = mouse_trajectory[-1]["x"], mouse_trajectory[-1]["y"]
|
||
end_dx = dx + t
|
||
sliding_trajectory = gen_arr(dx, dy, end_dx, dy)
|
||
return {
|
||
"pre_jigsaw_sliding_track": mouse_trajectory,
|
||
"jigsaw_sliding_track": sliding_trajectory,
|
||
"sliding_track": del_timestamp(sliding_trajectory),
|
||
}
|
||
|
||
|
||
class Show_xy:
|
||
def __init__(self, bg_path, tg_path, save_path):
|
||
# 由于没有识别模型, ttshitu.com 需要取这个网站注册一个账号来识别点选
|
||
self._user_name = ""
|
||
self._pwd = ""
|
||
self._typeid = 21
|
||
self._save_path = save_path
|
||
self.__join_img(bg_path, tg_path)
|
||
|
||
def __join_img(self, bg_path, tg_path):
|
||
# 读取图像
|
||
image_bottom_pil = Image.open(tg_path) # 假设是PNG格式,包含透明度
|
||
image_bottom_pil = image_bottom_pil.convert("RGBA")
|
||
|
||
# 将透明部分替换为白色
|
||
datas = image_bottom_pil.getdata()
|
||
new_data = []
|
||
for item in datas:
|
||
if item[3] == 0: # alpha通道值为0表示完全透明
|
||
new_data.append((255, 255, 255, 255)) # 替换为白色
|
||
else:
|
||
new_data.append(item)
|
||
image_bottom_pil.putdata(new_data)
|
||
|
||
new_width = int(image_bottom_pil.width * 0.9)
|
||
new_height = int(image_bottom_pil.height * 0.9)
|
||
image_bottom_resized_pil = image_bottom_pil.resize((new_width, new_height))
|
||
|
||
image_bottom = np.array(image_bottom_resized_pil)[:, :, :3][..., ::-1]
|
||
image_top = cv2.imread(bg_path)
|
||
height_top, width_top, _ = image_top.shape
|
||
height_bottom, width_bottom, _ = image_bottom.shape
|
||
|
||
new_height = height_top + height_bottom
|
||
new_width = max(width_top, width_bottom)
|
||
new_image = np.ones((new_height, new_width, 3), dtype=np.uint8) * 255
|
||
|
||
new_image[:height_top, :width_top] = image_top
|
||
new_image[height_top:, :width_bottom] = image_bottom
|
||
|
||
cv2.imwrite(self._save_path, new_image, [cv2.IMWRITE_JPEG_QUALITY, 100])
|
||
|
||
def show_xy(self):
|
||
with open(self._save_path, "rb") as f:
|
||
base64_data = base64.b64encode(f.read())
|
||
b64 = base64_data.decode()
|
||
data = {
|
||
"username": self._user_name,
|
||
"password": self._pwd,
|
||
"typeid": self._typeid,
|
||
"remark": "",
|
||
"image": b64,
|
||
}
|
||
result = json.loads(
|
||
requests.post("http://api.ttshitu.com/predict", json=data).text
|
||
)
|
||
if result["success"]:
|
||
res = result["data"]["result"]
|
||
res.split("|")
|
||
return True, [i.split(",") for i in res.split("|")]
|
||
else:
|
||
# !!!!!!!注意:返回 人工不足等 错误情况 请加逻辑处理防止脚本卡死 继续重新 识别
|
||
return False, result["message"]
|
||
|
||
|
||
class CtripLogin(object):
|
||
def __init__(self, card_num, card_pwd, price, order_num, cookies):
|
||
self.headers = {
|
||
"accept": "*/*",
|
||
"accept-language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
|
||
"cache-control": "no-cache",
|
||
"content-type": "application/json;charset=UTF-8",
|
||
"origin": "https://passport.ctrip.com",
|
||
"pragma": "no-cache",
|
||
"priority": "u=1, i",
|
||
"referer": "https://passport.ctrip.com/",
|
||
"sec-ch-ua": '"Chromium";v="124", "Microsoft Edge";v="124", "Not-A.Brand";v="99"',
|
||
"sec-ch-ua-mobile": "?0",
|
||
"sec-ch-ua-platform": '"Windows"',
|
||
"sec-fetch-dest": "empty",
|
||
"sec-fetch-mode": "cors",
|
||
"sec-fetch-site": "same-site",
|
||
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36 Edg/124.0.0.0",
|
||
}
|
||
self.headers2 = {
|
||
"accept": "*/*",
|
||
"accept-language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
|
||
"cache-control": "no-cache",
|
||
"content-type": "text/plain",
|
||
"cookieorigin": "https://passport.ctrip.com",
|
||
"origin": "https://passport.ctrip.com",
|
||
"pragma": "no-cache",
|
||
"priority": "u=1, i",
|
||
"referer": "https://passport.ctrip.com/",
|
||
"sec-ch-ua": '"Chromium";v="130", "Microsoft Edge";v="130", "Not?A_Brand";v="99"',
|
||
"sec-ch-ua-mobile": "?0",
|
||
"sec-ch-ua-platform": '"Windows"',
|
||
"sec-fetch-dest": "empty",
|
||
"sec-fetch-mode": "cors",
|
||
"sec-fetch-site": "same-site",
|
||
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36 Edg/130.0.0.0",
|
||
"x-origin-ct": "application/json;charset=utf-8",
|
||
"x-payload-accept": "camev1",
|
||
"x-payload-encoding": "camev1",
|
||
}
|
||
self.headers3 = {
|
||
"accept": "application/json",
|
||
"accept-language": "zh-CN,zh;q=0.9",
|
||
"cache-control": "no-cache",
|
||
"content-type": "application/json",
|
||
"cookieorigin": "https://3g.ctrip.com",
|
||
"origin": "https://3g.ctrip.com",
|
||
"pragma": "no-cache",
|
||
"priority": "u=1, i",
|
||
"referer": "https://3g.ctrip.com/",
|
||
"sec-fetch-dest": "empty",
|
||
"sec-fetch-mode": "cors",
|
||
"sec-fetch-site": "same-site",
|
||
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36 Edg/130.0.0.0",
|
||
"x-ctrip-pageid": "214402",
|
||
}
|
||
self.cookies = {}
|
||
# 接收参数
|
||
self.card_num = card_num
|
||
self.card_pwd = card_pwd
|
||
self.price = price
|
||
self.order_num = order_num
|
||
self.cookies2 = cookies
|
||
# self.phone = phone
|
||
self.token = None
|
||
self.rid = None
|
||
self.captcha_token = None
|
||
self.dimensions = None
|
||
self.client_id = None
|
||
self.sequence_id = None
|
||
self.display = None
|
||
self.extend_param = None
|
||
self.screen_size_info = None
|
||
|
||
# 指纹参数
|
||
self.page_id = None
|
||
self.fp = None
|
||
self.vid = None
|
||
self.r = None
|
||
self.rt = None
|
||
self.random_str_32 = None
|
||
self.ip = "14.19.6.53"
|
||
self.app_id = "100015783"
|
||
self.business_site = "prepay_giftcardquery_h5_pic"
|
||
self.version = "2.0.23"
|
||
self.guid = None
|
||
|
||
# self.__save_img_path = __file__.replace('\\', '/').rsplit('/', 1)[0] + '/images/'
|
||
# if not os.path.isdir(self.__save_img_path):
|
||
# os.makedirs(self.__save_img_path)
|
||
|
||
self.remarks = ""
|
||
|
||
@staticmethod
|
||
def generate_non_zero_starting_number(length):
|
||
first_digit = random.randint(1, 9)
|
||
remaining_length = length - 1
|
||
remaining_digits = "".join(
|
||
str(random.randint(0, 9)) for _ in range(remaining_length)
|
||
)
|
||
random_number_str = str(first_digit) + remaining_digits
|
||
return random_number_str
|
||
|
||
@staticmethod
|
||
def generate_random_string(length):
|
||
possible_characters = string.ascii_letters + string.digits
|
||
random_string = "".join(
|
||
secrets.choice(possible_characters) for _ in range(length)
|
||
)
|
||
return random_string
|
||
|
||
@staticmethod
|
||
def ctrip_aes_enc(data):
|
||
with open("./enc1.js", "r", encoding="utf8") as f:
|
||
js_code = f.read()
|
||
ctx = execjs.compile(js_code)
|
||
return ctx.call("enc", data)
|
||
|
||
@staticmethod
|
||
def login_crypto(med, text):
|
||
with open("./passwordEnc.js", encoding="utf8") as f:
|
||
js_code = f.read()
|
||
ctx = execjs.compile(js_code).call(med, text)
|
||
return ctx
|
||
|
||
def generate_random_hex_string_lowercase(self, length=32):
|
||
characters = string.hexdigits.lower()
|
||
random_str = "".join(random.choice(characters) for _ in range(length))
|
||
self.r = random_str
|
||
|
||
def generate_vid(self, length=8):
|
||
# 生成随机的小数部分,保留两位小数
|
||
decimal_part = int(time.time() * 1000)
|
||
# 生成随机的字母后缀,指定长度
|
||
suffix = "".join(
|
||
random.choices(
|
||
string.ascii_uppercase + string.ascii_lowercase + string.digits,
|
||
k=length,
|
||
)
|
||
)
|
||
# 拼接整个字符串
|
||
result_string = str(decimal_part) + "." + suffix
|
||
self.vid = result_string
|
||
|
||
def generate_screen_info(self):
|
||
"""
|
||
指纹相关参数
|
||
:return:
|
||
"""
|
||
screen_information = [
|
||
{
|
||
"display": "2560x1440",
|
||
"screen_size": {
|
||
"resolution_width": 2260,
|
||
"resolution_height": 1360,
|
||
"language": "",
|
||
},
|
||
},
|
||
{
|
||
"display": "1920x1080",
|
||
"screen_size": {
|
||
"resolution_width": 1536,
|
||
"resolution_height": 864,
|
||
"language": "",
|
||
},
|
||
},
|
||
{
|
||
"display": "3200x1680",
|
||
"screen_size": {
|
||
"resolution_width": 2950,
|
||
"resolution_height": 1480,
|
||
"language": "",
|
||
},
|
||
},
|
||
{
|
||
"display": "720x480",
|
||
"screen_size": {
|
||
"resolution_width": 640,
|
||
"resolution_height": 420,
|
||
"language": "",
|
||
},
|
||
},
|
||
]
|
||
screen = random.choice(screen_information)
|
||
self.display = screen["display"]
|
||
self.screen_size_info = screen["screen_size"]
|
||
self.extend_param = self.ctrip_aes_enc(screen["screen_size"])
|
||
|
||
def generate_random_fp(self):
|
||
def random_hex_segment_upper():
|
||
return format(random.randint(0, 0xFFFFFF), "06X")
|
||
|
||
fp = "-".join([random_hex_segment_upper() for _ in range(3)])
|
||
self.fp = fp
|
||
|
||
def generate_rt(self):
|
||
data = {
|
||
"fp": self.fp,
|
||
"vid": self.vid,
|
||
"pageId": self.page_id,
|
||
"r": self.r,
|
||
"ip": self.ip,
|
||
"rg": "fin",
|
||
"kpData": "0_0_0",
|
||
"kpControl": "0_0_0-0_0_0",
|
||
"kpEmp": "0_0_0_0_0_0_0_0_0_0-0_0_0_0_0_0_0_0_0_0-0_0_0_0_0_0_0_0_0_0",
|
||
"screen": self.display,
|
||
"tz": "+8",
|
||
"blang": "zh-CN",
|
||
"oslang": "zh-CN",
|
||
"ua": "Mozilla%2F5.0%20(Windows%20NT%2010.0%3B%20Win64%3B%20x64)%20AppleWebKit%2F537.36%20(KHTML%2C%20like%20Gecko)%20Chrome%2F124.0.0.0%20Safari%2F537.36%20Edg%2F124.0.0.0",
|
||
"d": "passport.ctrip.com",
|
||
"v": "25",
|
||
"kpg": "0_0_0_19_12161_43_0_0_0_0",
|
||
"adblock": "F",
|
||
"cck": "F",
|
||
"GUID": self.guid,
|
||
}
|
||
rt = "&".join(f"{key}={value}" for key, value in data.items())
|
||
self.rt = rt
|
||
|
||
def set_client_id(self):
|
||
url = "https://m.ctrip.com/restapi/soa2/10290/createclientid"
|
||
params = {"systemcode": "09", "createtype": "3", "contentType": "json"}
|
||
response = requests.get(url, headers=self.headers, params=params)
|
||
self.client_id = response.json()["ClientID"]
|
||
logger.info(f"订单ID:{self.order_num},初始化 client_id 成功 {self.client_id}")
|
||
|
||
def set_sequence_id(self):
|
||
time_ = time.time() * 1000
|
||
url = "https://bbzutils.ctrip.com/ubtChecking"
|
||
data = {
|
||
"vid": f"{self.vid}",
|
||
"sid": 1,
|
||
"pvid": 1,
|
||
"cid": None,
|
||
"pageId": self.page_id,
|
||
"env": "online",
|
||
"ts": time_,
|
||
"timezoneOffset": -480,
|
||
"sendType": "single",
|
||
"pageRef": self.page_id,
|
||
"contextTs": time_,
|
||
"sdkVer": "1.2.36/new/c",
|
||
"url": "https://passport.ctrip.com/user/login?BackUrl=https%3A%2F%2Fwww.ctrip.com%2F%3Fsid%3D153507%26allianceid%3D5376%26ouid%3Dtitle%26sepopup%3D104#ctm_ref=c_ph_login_buttom",
|
||
"contextTs_date": time_,
|
||
"collectReq": "",
|
||
"tag": "",
|
||
}
|
||
data = json.dumps(data, separators=(",", ":"))
|
||
response = requests.post(
|
||
url, headers=self.headers, cookies=self.cookies, data=data
|
||
)
|
||
self.sequence_id = response.json()["ResponseStatus"]["Extension"][0]["Value"]
|
||
logger.info(
|
||
f"订单ID:{self.order_num},初始化 sequence_id 成功 {self.sequence_id}"
|
||
)
|
||
|
||
@staticmethod
def base64_to_image(base64_string: str, output_file: str):
    """
    Decode a base64 string into an image and save it in PNG format.

    :param base64_string: base64-encoded image data
    :param output_file: destination path handed to PIL's ``save``
    :return: None
    """
    raw_bytes = b64decode(base64_string)
    picture = Image.open(BytesIO(raw_bytes))
    picture.save(output_file, "png")
|
||
|
||
@staticmethod
|
||
def merge_and_scale_arrays(arrays):
|
||
merged_array = list(chain(*arrays))
|
||
print(merged_array)
|
||
return [int(element) for element in merged_array]
|
||
|
||
def generate_11_digit_number(self):
|
||
# 生成一个1开头的11位数,第二位到第十一位是0-9之间的随机数
|
||
number = "1" + "".join(str(random.randint(0, 9)) for _ in range(10))
|
||
self.page_id = str(number)
|
||
return number
|
||
|
||
def risk_inspect(self):
|
||
data_dict = {
|
||
"rt": self.rt,
|
||
"ua": self.headers["user-agent"],
|
||
"p": "pc",
|
||
"fp": self.fp,
|
||
"vid": f"{self.vid}",
|
||
"identify": self.fp,
|
||
"guid": self.client_id,
|
||
"h5_duid": None,
|
||
"pc_duid": None,
|
||
"hb_uid": None,
|
||
"pc_uid": None,
|
||
"h5_uid": None,
|
||
"infosec_openid": None,
|
||
"device_id": self.r,
|
||
"client_id": self.generate_random_string(32),
|
||
"pid": self.generate_non_zero_starting_number(16),
|
||
"sid": self.generate_random_string(16),
|
||
"login_uid": self.generate_non_zero_starting_number(10),
|
||
"client_type": "PC",
|
||
"site": {
|
||
"type": "PC",
|
||
"url": "https://m.ctrip.com/webapp/cw/giftcard/xct/login_bycode.html",
|
||
"ref": "",
|
||
"title": "登录",
|
||
"keywords": "携程通,携程通用车,携程,礼品卡,人力资源,员工福利,企业管理,旅游",
|
||
},
|
||
"device": {
|
||
"width": self.screen_size_info["resolution_width"],
|
||
"height": self.screen_size_info["resolution_height"],
|
||
"os": "",
|
||
"pixelRatio": 1.5,
|
||
"did": "",
|
||
},
|
||
"user": {"tid": "", "uid": "", "vid": ""},
|
||
}
|
||
dimensions = self.ctrip_aes_enc(data_dict)
|
||
self.dimensions = dimensions
|
||
json_data = {
|
||
"extend_param": self.extend_param,
|
||
"appid": self.app_id,
|
||
"business_site": self.business_site,
|
||
"version": self.version,
|
||
"dimensions": dimensions,
|
||
"sign": None,
|
||
}
|
||
enc_str = f"appid={json_data['appid']}&business_site={json_data['business_site']}&version={json_data['version']}&dimensions={json_data['dimensions']}&extend_param={json_data['extend_param']}"
|
||
sign = md5(enc_str.encode()).hexdigest()
|
||
json_data["sign"] = sign
|
||
response = requests.post(
|
||
url="https://ic.ctrip.com/captcha/v4/risk_inspect",
|
||
headers=self.headers,
|
||
json=json_data,
|
||
)
|
||
ret = response.json()
|
||
self.token = ret["result"]["token"]
|
||
self.rid = ret["result"]["rid"]
|
||
if ret["result"]["risk_info"]["process_type"] == "NONE":
|
||
self.token = ret["result"]["token"]
|
||
self.rid = ret["result"]["rid"]
|
||
return None
|
||
else:
|
||
bg = ret["result"]["risk_info"]["process_value"]["processed_image"]
|
||
slide = ret["result"]["risk_info"]["process_value"]["jigsaw_image"]
|
||
# verify_bg_path = os.path.join(self.__save_img_path, f'bg_{int(time.time() * 1000)}.png')
|
||
# verify_slide_path = os.path.join(self.__save_img_path, f'slide_{int(time.time() * 1000)}.png')
|
||
# self.base64_to_image(bg, verify_bg_path)
|
||
# self.base64_to_image(slide, verify_slide_path)
|
||
|
||
distance = int(get_distance(base64.b64decode(bg), base64.b64decode(slide)))
|
||
logger.info(f"订单ID:{self.order_num},滑动距离 ---> {distance}")
|
||
return distance
|
||
|
||
def enc_params(self, t):
|
||
data_dict = generate_coordinate_data(t)
|
||
data = {
|
||
"st": int(time.time()) * 1000,
|
||
"slidingTime": random.randint(3000, 4000),
|
||
"display": self.display,
|
||
"keykoardTrack": [],
|
||
"jigsawKeyboardEventExist": False,
|
||
"jigsawPicWidth": 320,
|
||
"jigsawPicHeight": 160,
|
||
"jigsawViewDuration": random.randint(1000, 2000),
|
||
"slidingTrack": data_dict["sliding_track"],
|
||
"timezone": -480,
|
||
"flashState": False,
|
||
"language": "zh-CN",
|
||
"platform": "Win32",
|
||
"hasSessStorage": False,
|
||
"hasLocalStorage": False,
|
||
"hasIndexedDB": False,
|
||
"hasDataBase": False,
|
||
"doNotTrack": False,
|
||
"touchSupport": False,
|
||
"mediaStreamTrack": False,
|
||
"value": t,
|
||
"preJigsawSlidingTrack": data_dict["pre_jigsaw_sliding_track"],
|
||
"jigsawSlidingTrack": data_dict["jigsaw_sliding_track"],
|
||
}
|
||
enc_text = self.ctrip_aes_enc(data)
|
||
return enc_text
|
||
|
||
def verify_jigsaw(self, verify_msg):
|
||
json_data = {
|
||
"appid": self.app_id,
|
||
"business_site": self.business_site,
|
||
"token": self.token,
|
||
"rid": self.rid,
|
||
"version": self.version,
|
||
"verify_msg": verify_msg,
|
||
"dimensions": self.dimensions,
|
||
"extend_param": self.extend_param,
|
||
"sign": None,
|
||
}
|
||
enc_str = f"appid={json_data['appid']}&business_site={json_data['business_site']}&version={json_data['version']}&verify_msg={json_data['verify_msg']}&dimensions={json_data['dimensions']}&extend_param={json_data['extend_param']}&token={json_data['token']}&captcha_type=JIGSAW"
|
||
sign = md5(enc_str.encode()).hexdigest()
|
||
json_data["sign"] = sign
|
||
response = requests.post(
|
||
"https://ic.ctrip.com/captcha/v4/verify_jigsaw",
|
||
headers=self.headers,
|
||
json=json_data,
|
||
)
|
||
ret = response.json()
|
||
|
||
if "ICON" in response.text:
|
||
logger.info(f"订单ID:{self.order_num},出现点选验证,请重新请求")
|
||
# self.token = ret["result"]["token"]
|
||
# self.rid = ret["result"]["rid"]
|
||
# bg = ret["result"]["risk_info"]["process_value"]["big_image"]
|
||
# icon = ret["result"]["risk_info"]["process_value"]["small_image"]
|
||
# verify_bg_path = os.path.join(self.__save_img_path, f'bg2_{int(time.time() * 1000)}.png')
|
||
# verify_slide_path = os.path.join(self.__save_img_path, f'icon{int(time.time() * 1000)}.png')
|
||
# self.base64_to_image(bg, verify_bg_path)
|
||
# self.base64_to_image(icon, verify_slide_path)
|
||
# save_path = os.path.join(self.__save_img_path, f'{int(time.time() * 1000)}.png')
|
||
# coordinate = Show_xy(verify_bg_path, verify_slide_path, save_path).show_xy()
|
||
# new_list = self.merge_and_scale_arrays(coordinate[1])
|
||
# logger.info(f'点选坐标 ---> {new_list}')
|
||
# return new_list
|
||
return {}
|
||
else:
|
||
logger.info(
|
||
f'订单ID:{self.order_num},通过滑块验证码 token ---> {ret["result"]["token"]} rid ---> {ret["result"]["rid"]}'
|
||
)
|
||
return {"token": ret["result"]["token"], "rid": ret["result"]["rid"]}
|
||
|
||
def enc_icon_params(self, new_list):
|
||
t = random.randint(460, 500)
|
||
data_dict = generate_coordinate_data(t)
|
||
time_ = int(time.time() * 1000)
|
||
difference = int(random.randint(3000, 4000))
|
||
tiem_2 = time_ - difference
|
||
|
||
json_str1 = json.dumps(
|
||
data_dict["jigsaw_sliding_track"], ensure_ascii=False, separators=(",", ":")
|
||
)
|
||
json_str2 = json.dumps(
|
||
data_dict["sliding_track"], ensure_ascii=False, separators=(",", ":")
|
||
)
|
||
data = {
|
||
"st": time_,
|
||
# 这里是检测屏幕大小的位置
|
||
"display": self.display,
|
||
"keykoardTrack": [],
|
||
"iconKeyboardEventExist": True,
|
||
"iconViewDuration": 2237,
|
||
"timezone": -480,
|
||
"flashState": False,
|
||
"language": "zh-CN",
|
||
"platform": "Win32",
|
||
"hasSessStorage": True,
|
||
"hasLocalStorage": True,
|
||
"hasIndexedDB": True,
|
||
"hasDataBase": False,
|
||
"doNotTrack": False,
|
||
"touchSupport": False,
|
||
"mediaStreamTrack": True,
|
||
"preIconClickTrack": data_dict["pre_jigsaw_sliding_track"],
|
||
"iconClickTrack": json_str1,
|
||
"inputStartTs": time_,
|
||
"inputEndTs": tiem_2,
|
||
"inputTime": difference,
|
||
"selectMoveTrace": json_str2,
|
||
"selectMoveTime": f"{random.randint(400, 500)}&{random.randint(200, 300)}",
|
||
"selectCancelCount": 0,
|
||
"selectIsTruncation": False,
|
||
"value": new_list,
|
||
}
|
||
enc_text = self.ctrip_aes_enc(data)
|
||
return enc_text
|
||
|
||
def verify_icon(self, verify_msg):
|
||
url = "https://ic.ctrip.com/captcha/v4/verify_icon"
|
||
data = {
|
||
"appid": self.app_id,
|
||
"token": self.token,
|
||
"rid": self.rid,
|
||
"business_site": self.business_site,
|
||
"version": self.version,
|
||
"verify_msg": verify_msg,
|
||
"dimensions": self.dimensions,
|
||
"extend_param": self.extend_param,
|
||
"sign": None,
|
||
}
|
||
enc_str = f"appid={data['appid']}&business_site={data['business_site']}&version={data['version']}&verify_msg={data['verify_msg']}&dimensions={data['dimensions']}&extend_param={data['extend_param']}&token={data['token']}&captcha_type=ICON"
|
||
sign = md5(enc_str.encode()).hexdigest()
|
||
data["sign"] = sign
|
||
response = requests.post(url, headers=self.headers, json=data)
|
||
ret = response.json()
|
||
logger.info("第二次验证通过")
|
||
logger.info(
|
||
f'token ---> {ret["result"]["token"]} rid ---> {ret["result"]["rid"]}'
|
||
)
|
||
return {"token": ret["result"]["token"], "rid": ret["result"]["rid"]}
|
||
|
||
def check_captcha(self):
|
||
url = "https://m.ctrip.com/restapi/giftcard/lipinService/CheckCaptcha"
|
||
params = {
|
||
"_fxpcqlniredt": "09031096417206203360",
|
||
"__gw_appid": "99999999",
|
||
"__gw_ver": "1.0",
|
||
"__gw_from": "214402",
|
||
"__gw_platform": "H5",
|
||
}
|
||
data = {
|
||
"captcha": {
|
||
"rid": self.rid,
|
||
"token": self.token,
|
||
"version": "2.0.20",
|
||
"checkState": "hidden",
|
||
"message": "",
|
||
},
|
||
"size": 1,
|
||
"PlatForm": "H5",
|
||
"AppVersion": "",
|
||
"AppType": "",
|
||
"LipinVersion": "8.62",
|
||
"newBound": True,
|
||
"supportWebp": True,
|
||
"head": {
|
||
"cid": self.client_id,
|
||
"ctok": "",
|
||
"cver": "1.0",
|
||
"lang": "01",
|
||
"sid": "8888",
|
||
"syscode": "09",
|
||
"auth": None,
|
||
"extension": [{"name": "protocal", "value": "https"}],
|
||
},
|
||
"contentType": "json",
|
||
}
|
||
data = json.dumps(data, separators=(",", ":"))
|
||
response = requests.post(url, headers=self.headers3, params=params, data=data)
|
||
logger.info(
|
||
f"订单ID:{self.order_num},验证码类型:NONE,captcha_res返回:{response.text}"
|
||
)
|
||
return response.json()
|
||
|
||
def get_card_info(self):
|
||
url = "https://m.ctrip.com/restapi/giftcard/lipinService/GetRechargeCardInfo"
|
||
params = {
|
||
"_fxpcqlniredt": "09031096417206203360",
|
||
"__gw_appid": "99999999",
|
||
"__gw_ver": "1.0",
|
||
"__gw_from": "214402",
|
||
"__gw_platform": "H5",
|
||
}
|
||
data = {
|
||
"SerialNumber": None,
|
||
"CardCode": self.card_num,
|
||
"Password": self.card_pwd,
|
||
"URL": "https://3g.ctrip.com/webapp/lipin/receiveticket?from=money",
|
||
"URLParam": "/webapp/lipin/receiveticket",
|
||
"captcha": {
|
||
"rid": self.rid,
|
||
"token": self.token,
|
||
"version": "2.0.20",
|
||
"checkState": "hidden",
|
||
"message": "",
|
||
},
|
||
"token": self.captcha_token,
|
||
"PlatForm": "H5",
|
||
"AppVersion": "",
|
||
"AppType": "",
|
||
"LipinVersion": "8.62",
|
||
"newBound": True,
|
||
"supportWebp": True,
|
||
"head": {
|
||
"cid": "09031096417206203360",
|
||
"ctok": "",
|
||
"cver": "1.0",
|
||
"lang": "01",
|
||
"sid": "8888",
|
||
"syscode": "09",
|
||
"auth": None,
|
||
"extension": [{"name": "protocal", "value": "https"}],
|
||
},
|
||
"contentType": "json",
|
||
}
|
||
data = json.dumps(data, separators=(",", ":"))
|
||
cookies = {
|
||
"cticket": self.cookies2,
|
||
}
|
||
response = requests.post(
|
||
url, headers=self.headers3, cookies=cookies, params=params, data=data
|
||
)
|
||
logger.info(f"订单ID:{self.order_num},查卡返回:{response.text}")
|
||
return response.json()
|
||
|
||
def process_cookies(self):
|
||
if "cticket" in self.cookies2:
|
||
p_ck = re.findall(r"cticket=(.*?);", self.cookies2)
|
||
if p_ck:
|
||
self.cookies2 = p_ck[0]
|
||
return True
|
||
else:
|
||
return False
|
||
elif len(self.cookies2) == 64:
|
||
return True
|
||
else:
|
||
return False
|
||
|
||
def create_response(self, status_code, data={}):
|
||
"""创建统一的返回结构"""
|
||
return {"code": status_code, "msg": self.remarks, "data": data}
|
||
|
||
def format_number(self, number):
    """
    Format a numeric value as a string with two decimal places.

    :param number: value accepted by float(); any falsy value (None, "", 0)
        is treated as 0
    :return: the formatted string, e.g. "100.00"
    """
    value = float(number) if number else float(0)
    return f"{value:.2f}"
|
||
|
||
def get_status_code(self, res):
|
||
try:
|
||
result_code = res["ResultInfo"]["ResultCode"]
|
||
# 卡号不存在或卡密错误
|
||
if result_code == 1005:
|
||
self.remarks = "卡号不存在或卡密错误"
|
||
return 105
|
||
# 您已连续输错5次,请30分钟后再试
|
||
if result_code == 10004:
|
||
self.remarks = "您已连续输错5次,请30分钟后再试"
|
||
return 115
|
||
# 无法获取到有效的用户号!
|
||
if result_code == 1003:
|
||
self.remarks = "无法获取到有效的用户号!"
|
||
return 113
|
||
# 卡号[cardCode]只能输入12位数字
|
||
if result_code == 1002:
|
||
self.remarks = "卡号[cardCode]只能输入12位数字!"
|
||
return 105
|
||
# 滑块验证失败
|
||
if result_code == -10001:
|
||
self.remarks = "滑块验证失败"
|
||
return 115
|
||
|
||
card_status_code = res["RechargeCardList"][0]["CardStatusCode"]
|
||
face_value = self.format_number(res["RechargeCardList"][0]["FaceValue"])
|
||
money = self.format_number(self.price)
|
||
# 未激活
|
||
if card_status_code == 1001:
|
||
self.remarks = "未激活"
|
||
return 104
|
||
# 已领用
|
||
if card_status_code == 4001:
|
||
self.remarks = "已领用"
|
||
return 104
|
||
# 已作废
|
||
if card_status_code == 6001:
|
||
return 104
|
||
# 实际充值和卡面值不一致
|
||
if face_value != money:
|
||
self.remarks = "实际充值和卡面值不一致"
|
||
return 114
|
||
# 已冻结
|
||
if card_status_code == 7001:
|
||
self.remarks = "已冻结"
|
||
return 104
|
||
# 激活未领用
|
||
if card_status_code == 2001:
|
||
self.remarks = "激活未领用"
|
||
return 2001
|
||
|
||
self.remarks = "check_card_status未知状态码"
|
||
return 111
|
||
except Exception as e:
|
||
self.remarks = "check_card_status异常未知状态码"
|
||
logger.info(
|
||
f"订单号:{self.order_num},check_card_status异常未知状态码:{traceback.format_exc()}"
|
||
)
|
||
return 111
|
||
|
||
def do_bind_card(self):
|
||
url = "https://m.ctrip.com/restapi/giftcard/lipinService/ReceiveGiftCard"
|
||
params = {
|
||
"_fxpcqlniredt": "09031096417206203360",
|
||
"__gw_appid": "99999999",
|
||
"__gw_ver": "1.0",
|
||
"__gw_from": "214402",
|
||
"__gw_platform": "H5",
|
||
}
|
||
data = {
|
||
"VipGate": None,
|
||
"IsConfirmedCharge": False,
|
||
"ReceiveMethod": "CP",
|
||
"ReceiveCards": [
|
||
{"Code": self.card_num, "Password": self.card_pwd, "IsHasPoint": False}
|
||
],
|
||
"DeviceID": "09031096417206203360",
|
||
"captcha": {
|
||
"rid": self.rid,
|
||
"token": self.token,
|
||
"version": "2.0.20",
|
||
"checkState": "success",
|
||
"message": "",
|
||
},
|
||
"token": self.captcha_token,
|
||
"referer": "https://3g.ctrip.com/webapp/lipin/receiveticket?from=money",
|
||
"clientInfo": {
|
||
"pageId": "214402",
|
||
"rmsToken": "fp=2E679A-AF69ED-585793&vid=1717491186803.5a9al5OuAK71&pageId=214402&r=70cfa46f6ec44ce08c51db48a73eb62a&ip=14.19.6.53&rg=fin&kpData=0_0_0&kpControl=0_0_0-0_0_0&kpEmp=0_0_0_0_0_0_0_0_0_0-0_0_0_0_0_0_0_0_0_0-0_0_0_0_0_0_0_0_0_0&screen=414x896&tz=+8&blang=zh-CN&oslang=zh-CN&ua=Mozilla%2F5.0%20(iPhone%3B%20CPU%20iPhone%20OS%2016_6%20like%20Mac%20OS%20X)%20AppleWebKit%2F605.1.15%20(KHTML%2C%20like%20Gecko)%20Version%2F16.6%20Mobile%2F15E148%20Safari%2F604.1&d=3g.ctrip.com&v=25&kpg=0_11_0_0_1890117_65_16_0_0_0&adblock=F&cck=F",
|
||
"serverFrom": "H5",
|
||
"url": "https://3g.ctrip.com/webapp/lipin/receiveticket?from=money",
|
||
"uRLParameter": "from=money",
|
||
},
|
||
"PlatForm": "H5",
|
||
"AppVersion": "",
|
||
"AppType": "",
|
||
"LipinVersion": "8.62",
|
||
"newBound": True,
|
||
"supportWebp": True,
|
||
"head": {
|
||
"cid": self.client_id,
|
||
"ctok": "",
|
||
"cver": "1.0",
|
||
"lang": "01",
|
||
"sid": "55551825",
|
||
"syscode": "09",
|
||
"auth": None,
|
||
"extension": [{"name": "protocal", "value": "https"}],
|
||
},
|
||
"contentType": "json",
|
||
}
|
||
cookies = {
|
||
"cticket": self.cookies2,
|
||
}
|
||
data = json.dumps(data, separators=(",", ":"))
|
||
response = requests.post(
|
||
url, headers=self.headers3, cookies=cookies, params=params, data=data
|
||
)
|
||
logger.info(f"订单ID:{self.order_num},绑卡返回:{response.text}")
|
||
return response.json()
|
||
|
||
def verify_code(self):
|
||
for i in range(5):
|
||
self.guid = (
|
||
f"{self.generate_11_digit_number() + self.generate_11_digit_number()}"[
|
||
0:20
|
||
]
|
||
)
|
||
self.generate_11_digit_number()
|
||
self.generate_screen_info()
|
||
self.generate_random_hex_string_lowercase()
|
||
self.set_sequence_id()
|
||
self.set_client_id()
|
||
self.generate_random_fp()
|
||
self.generate_vid()
|
||
self.generate_rt()
|
||
t = self.risk_inspect()
|
||
if t:
|
||
verify_msg = self.enc_params(t)
|
||
print(f"verify_msg: {verify_msg}")
|
||
first_verify_ret = self.verify_jigsaw(verify_msg)
|
||
if not first_verify_ret:
|
||
continue
|
||
if isinstance(first_verify_ret, list):
|
||
enc_icon = self.enc_icon_params(first_verify_ret)
|
||
verify_info = self.verify_icon(enc_icon)
|
||
self.rid = verify_info["rid"]
|
||
self.token = verify_info["token"]
|
||
else:
|
||
self.rid = first_verify_ret["rid"]
|
||
self.token = first_verify_ret["token"]
|
||
captcha_res = self.check_captcha()
|
||
self.captcha_token = captcha_res["tokens"][0]
|
||
return True
|
||
return False
|
||
|
||
def check_bind_status(self, res):
|
||
try:
|
||
result_code = res["ResultInfo"]["ResultCode"]
|
||
# 券号密码已经被领用
|
||
if result_code == 1:
|
||
self.remarks = "券号密码已经被领用"
|
||
return 104
|
||
# 绑卡成功
|
||
elif result_code == 0:
|
||
self.remarks = "绑卡成功"
|
||
return 100
|
||
# 出现滑块
|
||
elif result_code == -10001:
|
||
self.remarks = "请求绑卡返回滑块验证失败"
|
||
return 111
|
||
else:
|
||
self.remarks = "绑卡check_bind_status未知状态码"
|
||
return 111
|
||
except Exception as e:
|
||
self.remarks = "绑卡check_bind_status异常未知状态码"
|
||
return 111
|
||
|
||
def check_card(self):
|
||
try:
|
||
verify_status = self.verify_code()
|
||
if not verify_status:
|
||
self.remarks = "查卡滑块验证失败"
|
||
return 110
|
||
card_info_res = self.get_card_info()
|
||
code = self.get_status_code(card_info_res)
|
||
return self.create_response(status_code=code)
|
||
except:
|
||
self.remarks = "check_card方法异常未知"
|
||
logger.info(
|
||
f"订单ID:{self.order_num},重新查卡返回:{traceback.format_exc()}"
|
||
)
|
||
return self.create_response(status_code=111)
|
||
|
||
def run(self):
    """
    Validate the cookie, query the card and, when it is claimable, bind it.

    :return: response dict from ``create_response``; code 113 when the
        cookie check fails, 110 when ``verify_code()`` fails before binding,
        111 when an unexpected exception occurs, otherwise the code produced
        by the card query or by ``check_bind_status``
    """
    try:
        # The card password and the session ticket are secrets; they are
        # masked so they never reach the log files.
        logger.info(
            f"订单号:{self.order_num},卡号:{self.card_num},卡密:***,充值金额:{self.price},ck:***"
        )
        # Check the cookie.
        if not self.process_cookies():
            self.remarks = "ck检测不通过"
            return self.create_response(status_code=113)

        # Query the card. A bare status code is tolerated as well as the
        # usual response dict.
        check_card_res = self.check_card()
        if isinstance(check_card_res, dict):
            code = check_card_res.get("code")
        else:
            code = check_card_res

        # Bind the card only when the query reported "activated, unclaimed".
        if code == 2001:
            # A fresh verification is required before binding.
            if not self.verify_code():
                self.remarks = "绑卡滑块验证失败"
                return self.create_response(status_code=110)
            bind_res = self.do_bind_card()
            code = self.check_bind_status(bind_res)
        return self.create_response(status_code=code)

    except Exception:
        self.remarks = "run方法异常未知"
        logger.info(f"订单ID:{self.order_num},绑卡返回:{traceback.format_exc()}")
        return self.create_response(status_code=111)
|
||
|
||
|
||
if __name__ == "__main__":
    # Manual smoke run. The session ticket and card secrets are read from
    # the environment so that no real credentials are committed to the
    # repository.
    import os

    ctrip = CtripLogin(
        cookies=os.environ.get("CTRIP_CTICKET", ""),
        card_num=os.environ.get("CTRIP_CARD_NUM", ""),
        card_pwd=os.environ.get("CTRIP_CARD_PWD", ""),
        price=os.environ.get("CTRIP_PRICE", "100"),
        order_num=os.environ.get("CTRIP_ORDER_NUM", "123456"),
    )
    res = ctrip.check_card()
    print(res)
    print("=" * 200)
|