mirror of
https://git.oceanpay.cc/danial/kami_ticket_slide_server.git
synced 2025-12-18 22:42:40 +00:00
604 lines
22 KiB
Python
604 lines
22 KiB
Python
# -*- coding: utf-8 -*-
|
||
# @Time: 2024/9/16 13:42
|
||
# @Author:GuangXu-Pan
|
||
# @Software:Pycharm
|
||
# @File:tx_slide_ai_word.py
|
||
import base64
|
||
import io
|
||
import json
|
||
import os
|
||
import re
|
||
import sys
|
||
import time
|
||
import cv2
|
||
import execjs
|
||
import numpy as np
|
||
# from curl_cffi import requests
|
||
import requests
|
||
from PIL import Image
|
||
from flask import Flask, request
|
||
from loguru import logger
|
||
|
||
|
||
def get_resource_path(relative_path):
|
||
""" 获取打包后资源文件的路径 """
|
||
if hasattr(sys, '_MEIPASS'):
|
||
# 如果程序是打包状态,使用 _MEIPASS 目录
|
||
return os.path.join(sys._MEIPASS, relative_path)
|
||
# 如果是未打包状态,使用当前目录
|
||
return os.path.join(os.path.abspath("."), relative_path)
|
||
|
||
|
||
class TX_new:
|
||
def __init__(self, aid, host, proxy=None):
|
||
self.requests = requests.session()
|
||
if proxy:
|
||
self.requests.proxies = proxy
|
||
self.aid = aid
|
||
self.host = host
|
||
self.ua = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
|
||
self.jiami = base64.b64encode(self.ua.encode()).decode(encoding='utf-8')
|
||
try:
|
||
ip_text = proxy['https'].split('://')[-1].split(':')[0]
|
||
self.ip_v4 = ip_text
|
||
except:
|
||
try:
|
||
ip_text = self.get_current_ip()
|
||
except:
|
||
ip_text = self.get_current_ip()
|
||
self.ip_v4 = re.findall('IP:(.*)来自于', ip_text)[0]
|
||
# logger.debug("当前ip:{}".format(self.ip_v4))
|
||
self.headers = {
|
||
"Accept": "*/*",
|
||
"Accept-Language": "zh-CN,zh;q=0.9",
|
||
"Cache-Control": "no-cache",
|
||
"Connection": "keep-alive",
|
||
"Pragma": "no-cache",
|
||
"Referer": "https://cloud.tencent.com/",
|
||
"Sec-Fetch-Dest": "script",
|
||
"Sec-Fetch-Mode": "no-cors",
|
||
"Sec-Fetch-Site": "cross-site",
|
||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
|
||
"sec-ch-ua": "\"Google Chrome\";v=\"124\", \"Not:A-Brand\";v=\"8\", \"Chromium\";v=\"124\"",
|
||
"sec-ch-ua-mobile": "?0",
|
||
"sec-ch-ua-platform": "\"Windows\""
|
||
}
|
||
self.response = None
|
||
|
||
def get_current_ip(self) -> str:
|
||
headers = {
|
||
"user-agent": self.ua
|
||
}
|
||
url = "https://myip.ipip.net"
|
||
response = requests.get(url, headers=headers)
|
||
if response.status_code == 200:
|
||
return response.text
|
||
|
||
def get_image(self, img_url, sprite_url=None):
|
||
bg_url = self.host + img_url
|
||
if sprite_url:
|
||
fg_url = self.host + sprite_url
|
||
fg_content = self.requests.get(fg_url, headers=self.headers).content
|
||
bg_content = self.requests.get(bg_url, headers=self.headers).content
|
||
return bg_content, fg_content
|
||
else:
|
||
content = self.requests.get(bg_url, headers=self.headers).content
|
||
return content
|
||
# with open('bg.png', 'wb') as file:
|
||
# file.write(bg_content)
|
||
# with open('fg.png', 'wb') as files:
|
||
# files.write(fg_content)
|
||
|
||
def bytes_to_cv2(self, img):
|
||
"""
|
||
二进制图片转cv2
|
||
:param img: 二进制图片数据, <type 'bytes'>
|
||
:return: cv2图像, <type 'numpy.ndarray'>
|
||
"""
|
||
# 将图片字节码bytes, 转换成一维的numpy数组到缓存中
|
||
img_buffer_np = np.frombuffer(img, dtype=np.uint8)
|
||
# 从指定的内存缓存中读取一维numpy数据, 并把数据转换(解码)成图像矩阵格式
|
||
img_np = cv2.imdecode(img_buffer_np, 1)
|
||
return img_np
|
||
|
||
def cv2_open(self, img, flag=None):
|
||
if isinstance(img, bytes):
|
||
img = self.bytes_to_cv2(img)
|
||
elif isinstance(img, str):
|
||
img = cv2.imread(str(img))
|
||
elif isinstance(img, np.ndarray):
|
||
img = img
|
||
else:
|
||
raise ValueError(f'输入的图片类型无法解析: {type(img)}')
|
||
if flag is not None:
|
||
img = cv2.cvtColor(img, flag)
|
||
return img
|
||
|
||
def get_distance(self, bg, tp):
|
||
"""
|
||
:param bg: 背景图路径
|
||
:param tp: 缺口图路径
|
||
:return: 缺口位置
|
||
"""
|
||
# 读取图片
|
||
bg_img = self.cv2_open(bg)
|
||
tp_gray = self.cv2_open(tp, flag=cv2.COLOR_BGR2GRAY)
|
||
|
||
# 金字塔均值漂移
|
||
bg_shift = cv2.pyrMeanShiftFiltering(bg_img, 5, 50)
|
||
|
||
# 边缘检测
|
||
tp_gray = cv2.Canny(tp_gray, 255, 255)
|
||
bg_gray = cv2.Canny(bg_shift, 255, 255)
|
||
|
||
# 目标匹配
|
||
result = cv2.matchTemplate(bg_gray, tp_gray, cv2.TM_CCOEFF_NORMED)
|
||
# 解析匹配结果
|
||
min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
|
||
|
||
# distance = max_loc[0]
|
||
return max_loc
|
||
|
||
def crop_img(self, bg_content, fg_content):
|
||
# 打开图像文件
|
||
image = Image.open(io.BytesIO(fg_content))
|
||
x1, y1, x2, y2 = 142, 492, 260, 610
|
||
cropped_image = image.crop((x1, y1, x2, y2))
|
||
# 将裁剪后的图像转换回字节数据
|
||
cropped_bytes_io = io.BytesIO()
|
||
cropped_image.save(cropped_bytes_io, format='PNG')
|
||
cropped_bytes = cropped_bytes_io.getvalue()
|
||
# 缺口距离
|
||
distance = self.get_distance(bg_content, cropped_bytes)
|
||
logger.debug(f"缺口位置为{distance}")
|
||
# 坐标
|
||
x = str(distance[0])
|
||
y = str(distance[1])
|
||
return x, y
|
||
|
||
def get_tx_verify(self, tdc_path):
|
||
capurl = self.host + tdc_path
|
||
headers = {
|
||
"Accept": "*/*",
|
||
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,zh-TW;q=0.7",
|
||
"Cache-Control": "no-cache",
|
||
"Connection": "keep-alive",
|
||
"Pragma": "no-cache",
|
||
"Referer": "https://turing.captcha.gtimg.com/",
|
||
"Sec-Fetch-Dest": "script",
|
||
"Sec-Fetch-Mode": "no-cors",
|
||
"Sec-Fetch-Site": "cross-site",
|
||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
|
||
"sec-ch-ua": "\"Not_A Brand\";v=\"8\", \"Chromium\";v=\"124\", \"Google Chrome\";v=\"124\"",
|
||
"sec-ch-ua-mobile": "?0",
|
||
"sec-ch-ua-platform": "\"Windows\""
|
||
}
|
||
response = self.requests.get(capurl, headers=headers)
|
||
cap_ = response.text
|
||
tx = get_resource_path('tx.js')
|
||
with open(tx, 'r', encoding="utf-8") as f:
|
||
jscode = f.read().replace('"vmp"', cap_)
|
||
collect_dict = execjs.compile(jscode).call('get_data', self.ip_v4)
|
||
eks = collect_dict["eks"]
|
||
tlg = collect_dict["tlg"]
|
||
collect = collect_dict["collect"]
|
||
return eks, tlg, collect
|
||
|
||
def get_slide_ticket(self, eks, tlg, collect, x, y, sess, pow_answer, pow_calc_time):
|
||
url = self.host + "/cap_union_new_verify"
|
||
data = {
|
||
"collect": collect,
|
||
"tlg": tlg,
|
||
"eks": eks,
|
||
"sess": sess,
|
||
"ans": "[{\"elem_id\":1,\"type\":\"DynAnswerType_POS\",\"data\":\"" + x + "," + y + "\"}]",
|
||
"pow_answer": pow_answer,
|
||
"pow_calc_time": pow_calc_time
|
||
}
|
||
response = self.requests.post(url, headers=self.headers, data=data).json()
|
||
# logger.info(response)
|
||
|
||
return response
|
||
|
||
def slide(self, img_url, sess, tdc_path):
|
||
sprite_url = json.loads(self.response.text.replace("(", "").replace(")", ""))["data"]["dyn_show_info"][
|
||
"sprite_url"]
|
||
prefix = \
|
||
json.loads(self.response.text.replace("(", "").replace(")", ""))["data"]["comm_captcha_cfg"]["pow_cfg"][
|
||
"prefix"]
|
||
md5 = json.loads(self.response.text.replace("(", "").replace(")", ""))["data"]["comm_captcha_cfg"]["pow_cfg"][
|
||
"md5"]
|
||
ans = get_resource_path('ans.js')
|
||
with open(ans, 'r', encoding='utf-8') as frd:
|
||
jscode = frd.read()
|
||
hhhh = {"target": md5, "nonce": prefix}
|
||
pow_an = execjs.compile(jscode).call('getWorkloadResult', hhhh)
|
||
pow = pow_an["ans"]
|
||
pow_calc_time = pow_an["duration"]
|
||
pow_answer = f"{prefix}{str(pow)}"
|
||
bg_content, fg_content = self.get_image(img_url, sprite_url)
|
||
x, y = self.crop_img(bg_content, fg_content)
|
||
eks, tlg, collect = self.get_tx_verify(tdc_path)
|
||
result = self.get_slide_ticket(eks, tlg, collect, x, y, sess, pow_answer, pow_calc_time)
|
||
if result['errorCode'] == '0':
|
||
return {'ticket': result['ticket'], 'randstr': result['randstr']}
|
||
if result['errorCode'] == '50':
|
||
logger.error('识别错误')
|
||
return self.main()
|
||
else:
|
||
logger.error('失败')
|
||
logger.error(result['errorCode'])
|
||
return '失败请重试'
|
||
|
||
def main(self):
|
||
url = self.host + "/cap_union_prehandle"
|
||
params = {
|
||
"aid": self.aid, # 小程序id
|
||
"protocol": "https",
|
||
"accver": "1",
|
||
"showtype": "popup",
|
||
"ua": self.jiami,
|
||
"noheader": "1",
|
||
"fb": "1",
|
||
"aged": "0",
|
||
"enableAged": "0",
|
||
"enableDarkMode": "0",
|
||
"grayscale": "1",
|
||
"clientype": "2",
|
||
"cap_cd": "",
|
||
"uid": "",
|
||
"lang": "zh-cn",
|
||
# "entry_url": "https://cloud.tencent.com/product/captcha",
|
||
"entry_url": "https://vip.tulingpyton.cn/",
|
||
"elder_captcha": "0",
|
||
"js": "/tcaptcha-frame.cc3d815a.js",
|
||
"login_appid": "",
|
||
"wb": "2",
|
||
"subsid": "3",
|
||
# "callback": "_aq_612456",
|
||
"sess": ""
|
||
}
|
||
self.response = self.requests.get(url, headers=self.headers, params=params)
|
||
sess = json.loads(self.response.text.replace("(", "").replace(")", ""))["sess"]
|
||
sid = json.loads(self.response.text.replace("(", "").replace(")", ""))["sid"]
|
||
tdc_path = json.loads(self.response.text.replace("(", "").replace(")", ""))["data"]["comm_captcha_cfg"][
|
||
"tdc_path"]
|
||
if json.loads(self.response.text.replace("(", "").replace(")", "")).get("data").get("dyn_show_info").get(
|
||
"instruction"):
|
||
if '拼图' in json.loads(self.response.text.replace("(", "").replace(")", "")).get("data").get(
|
||
"dyn_show_info").get("instruction"):
|
||
img_url = \
|
||
json.loads(self.response.text.replace("(", "").replace(")", ""))["data"]["dyn_show_info"][
|
||
"bg_elem_cfg"][
|
||
"img_url"]
|
||
logger.info('当前需要滑动验证')
|
||
result = self.slide(img_url, sess, tdc_path)
|
||
# logger.success(result)
|
||
return result
|
||
else:
|
||
logger.error('不支持类型')
|
||
return '不支持类型'
|
||
|
||
else:
|
||
logger.error('不支持类型')
|
||
return '不支持类型'
|
||
|
||
|
||
class Tx_old:
|
||
def __init__(self, aid, host, proxy=None):
|
||
self.session = requests.session()
|
||
self.host = host
|
||
if proxy:
|
||
self.session.proxies = proxy
|
||
self.aid = aid
|
||
try:
|
||
ip_text = proxy['https'].split('://')[-1].split(':')[0]
|
||
self.ip_v4 = ip_text
|
||
except:
|
||
try:
|
||
ip_text = self.get_current_ip()
|
||
except:
|
||
ip_text = self.get_current_ip()
|
||
self.ip_v4 = re.findall('IP:(.*)来自于', ip_text)[0]
|
||
# logger.debug("当前ip:{}".format(self.ip_v4))
|
||
self.ua = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
|
||
self.jiami = base64.b64encode(self.ua.encode()).decode(encoding='utf-8')
|
||
self.createIframeStart = str(int(time.time() * 1000))
|
||
self.headers = {
|
||
"Accept": "*/*",
|
||
"Accept-Language": "zh-CN,zh;q=0.9",
|
||
"Cache-Control": "no-cache",
|
||
"Connection": "keep-alive",
|
||
"Pragma": "no-cache",
|
||
"Referer": "https://cloud.tencent.com/",
|
||
"Sec-Fetch-Dest": "script",
|
||
"Sec-Fetch-Mode": "no-cors",
|
||
"Sec-Fetch-Site": "cross-site",
|
||
"User-Agent": self.ua,
|
||
"sec-ch-ua": "\"Google Chrome\";v=\"124\", \"Not:A-Brand\";v=\"8\", \"Chromium\";v=\"124\"",
|
||
"sec-ch-ua-mobile": "?0",
|
||
"sec-ch-ua-platform": "\"Windows\""
|
||
}
|
||
|
||
def get_current_ip(self) -> str:
|
||
headers = {
|
||
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
|
||
}
|
||
url = "https://myip.ipip.net"
|
||
response = self.session.get(url, headers=headers)
|
||
if response.status_code == 200:
|
||
return response.text
|
||
|
||
def get_sid_sess(self):
|
||
url = self.host + '/cap_union_prehandle'
|
||
data = {
|
||
'aid': self.aid,
|
||
'protocol': 'https',
|
||
'accver': '1',
|
||
'showtype': 'popup',
|
||
'ua': 'TW96aWxsYS81LjAgKFdpbmRvd3MgTlQgMTAuMDsgV2luNjQ7IHg2NCkgQXBwbGVXZWJLaXQvNTM3LjM2IChLSFRNTCwgbGlrZSBHZWNrbykgQ2hyb21lLzExNi4wLjAuMCBTYWZhcmkvNTM3LjM2',
|
||
'noheader': '1',
|
||
'fb': '0',
|
||
'aged': '0',
|
||
'enableAged': '0',
|
||
'enableDarkMode': '1',
|
||
'grayscale': '1',
|
||
'clientype': '2',
|
||
'cap_cd': '',
|
||
'uid': '',
|
||
'lang': 'zh-cn',
|
||
'entry_url': 'https://www.szwego.com/static/index.html#/password_login',
|
||
'elder_captcha': '0',
|
||
'js': '/tcaptcha-frame.22125576.js',
|
||
'login_appid': '',
|
||
'wb': '1',
|
||
'subsid': '3',
|
||
'callback': '_aq_373409',
|
||
'sess': '',
|
||
}
|
||
res = self.session.get(url, params=data).text
|
||
return re.findall('"sess":"(.*?)"', res)[0], re.findall('"sid":"(.*?)"', res)[0]
|
||
|
||
def bytes_to_cv2(self, img):
|
||
"""
|
||
二进制图片转cv2
|
||
:param img: 二进制图片数据, <type 'bytes'>
|
||
:return: cv2图像, <type 'numpy.ndarray'>
|
||
"""
|
||
# 将图片字节码bytes, 转换成一维的numpy数组到缓存中
|
||
img_buffer_np = np.frombuffer(img, dtype=np.uint8)
|
||
# 从指定的内存缓存中读取一维numpy数据, 并把数据转换(解码)成图像矩阵格式
|
||
img_np = cv2.imdecode(img_buffer_np, 1)
|
||
return img_np
|
||
|
||
def cv2_open(self, img, flag=None):
|
||
"""
|
||
统一输出图片格式为cv2图像, <type 'numpy.ndarray'>
|
||
:param img: <type 'bytes'/'numpy.ndarray'/'str'/'Path'/'PIL.JpegImagePlugin.JpegImageFile'>
|
||
:param flag: 颜色空间转换类型, default: None
|
||
eg: cv2.COLOR_BGR2GRAY(灰度图)
|
||
:return: cv2图像, <numpy.ndarray>
|
||
"""
|
||
if isinstance(img, bytes):
|
||
img = self.bytes_to_cv2(img)
|
||
else:
|
||
raise ValueError(f'输入的图片类型无法解析: {type(img)}')
|
||
if flag is not None:
|
||
img = cv2.cvtColor(img, flag)
|
||
return img
|
||
|
||
def get_distance(self, bg, tp):
|
||
# 读取图片
|
||
bg_img = self.cv2_open(bg)
|
||
tp_gray = self.cv2_open(tp, flag=cv2.COLOR_BGR2GRAY)
|
||
|
||
# 金字塔均值漂移
|
||
bg_shift = cv2.pyrMeanShiftFiltering(bg_img, 5, 50)
|
||
|
||
# 边缘检测
|
||
tp_gray = cv2.Canny(tp_gray, 255, 255)
|
||
bg_gray = cv2.Canny(bg_shift, 255, 255)
|
||
|
||
# 目标匹配
|
||
result = cv2.matchTemplate(bg_gray, tp_gray, cv2.TM_CCOEFF_NORMED)
|
||
# 解析匹配结果
|
||
min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
|
||
logger.info(f'坐标:{min_loc}')
|
||
return max_loc
|
||
|
||
def verify(self, sess2, sid, ans, pow_answer, collect, tlg, eks, nonce):
|
||
verify_url = self.host + '/cap_union_new_verify'
|
||
data = {
|
||
'aid': self.aid,
|
||
'protocol': 'https',
|
||
'accver': '1',
|
||
'showtype': 'popup',
|
||
'ua': self.jiami,
|
||
'noheader': '1',
|
||
'fb': '0',
|
||
'aged': '0',
|
||
'enableAged': '0',
|
||
'enableDarkMode': '1',
|
||
'grayscale': '1',
|
||
'clientype': '2',
|
||
'sess': sess2,
|
||
'fwidth': '0',
|
||
'sid': str(sid),
|
||
'wxLang': '',
|
||
'tcScale': '1',
|
||
'uid': '',
|
||
'cap_cd': '',
|
||
'rnd': '415581',
|
||
'prehandleLoadTime': '83',
|
||
'createIframeStart': self.createIframeStart,
|
||
'global': '0',
|
||
'subsid': '26',
|
||
'cdata': '0',
|
||
'ans': ans,
|
||
'vsig': '',
|
||
'websig': '',
|
||
'subcapclass': '',
|
||
'pow_answer': pow_answer,
|
||
'pow_calc_time': '10',
|
||
'collect': collect,
|
||
'tlg': tlg,
|
||
'fpinfo': '',
|
||
'eks': eks,
|
||
'nonce': nonce,
|
||
'vlg': '0_0_1',
|
||
'vData': 'ed0FNEDmGsgjfhnCwiBah1gi1hqqagtoj88Pd0lL95PbaFK-LucV4iDyjLi*iCQ7h9V04rTubXsbuSpp9H1MctodkEOQDPiacJr1PBkrwfPse3PZPmgfP3daO0*a34PJrD1ShB32X_jt0PAD8MoHriYY',
|
||
}
|
||
response = self.session.post(verify_url, data=data, headers=self.headers).json()
|
||
return response
|
||
|
||
def get_image_str(self, sess, sid):
|
||
get_url = self.host + '/cap_union_new_show'
|
||
get_data = {
|
||
'aid': self.aid,
|
||
'protocol': 'https',
|
||
'accver': '1',
|
||
'showtype': 'popup',
|
||
'ua': self.jiami,
|
||
'noheader': '1',
|
||
'fb': '0',
|
||
'aged': '0',
|
||
'enableAged': '0',
|
||
'enableDarkMode': '1',
|
||
'grayscale': '1',
|
||
'clientype': '2',
|
||
'sess': f'{sess}',
|
||
'fwidth': '0',
|
||
'sid': f'{sid}',
|
||
'wxLang': '',
|
||
'tcScale': '1',
|
||
'uid': '',
|
||
'cap_cd': '',
|
||
'rnd': '204910',
|
||
'prehandleLoadTime': '84',
|
||
'createIframeStart': f'{int(time.time() * 1000)}',
|
||
'global': '0',
|
||
'subsid': '6',
|
||
}
|
||
respon = self.session.get(get_url, params=get_data)
|
||
respon.encoding = 'utf-8'
|
||
image_str = re.findall('cdnPic1:"/hycdn\?index=1&image=(.*?)",', respon.text)[0]
|
||
# prefix = re.findall('prefix:"(.*?)"', respon.text)[0]
|
||
# md5 = re.findall('md5:"(.*?)"', respon.text)[0]
|
||
# with open('ans.js', 'r', encoding='utf-8') as frd:
|
||
# jscode = frd.read()
|
||
# hhhh = {"target": md5, "nonce": prefix}
|
||
# pow_an = execjs.compile(jscode).call('getWorkloadResult', hhhh)
|
||
# pow = pow_an["ans"]
|
||
# pow_answer = f"{prefix}{str(pow)}"
|
||
pow_answer = ""
|
||
nonce = re.findall('nonce:"(.*?)"', respon.text)[0]
|
||
sess2 = re.findall('sess:"(.*?)"', respon.text)[0]
|
||
js_path = re.findall('dcFileName:"(.*?)"', respon.text)[0]
|
||
return pow_answer, nonce, sess2, js_path, image_str
|
||
|
||
def slide(self, image_str, sess2, sid, js_path):
|
||
p1 = {
|
||
'index': '1',
|
||
'image': f'{image_str}?aid={self.aid}',
|
||
'sess': sess2,
|
||
'sid': sid,
|
||
'img_index': '1',
|
||
'subsid': '7',
|
||
}
|
||
p2 = {
|
||
'index': '2',
|
||
'image': f'{image_str}?aid={self.aid}',
|
||
'sess': sess2,
|
||
'sid': sid,
|
||
'img_index': '2',
|
||
'subsid': '8',
|
||
}
|
||
|
||
distance = self.get_distance(
|
||
self.session.get(self.host + '/hycdn', params=p1).content,
|
||
self.session.get(self.host + '/hycdn', params=p2).content,
|
||
)
|
||
ans = str(distance[0]) + ',' + str(distance[1]) + ';'
|
||
js_url = f'{self.host}/{js_path}'
|
||
cap_ = self.session.get(js_url).text
|
||
tx = get_resource_path('tx.js')
|
||
with open(tx, 'r', encoding="utf-8") as f:
|
||
jscode = f.read().replace('"vmp"', cap_)
|
||
collect_dict = execjs.compile(jscode).call('get_data', self.ip_v4)
|
||
eks = collect_dict["eks"]
|
||
tlg = collect_dict["tlg"]
|
||
collect = collect_dict["collect"]
|
||
|
||
return ans, eks, tlg, collect
|
||
|
||
def main(self):
|
||
sess, sid = self.get_sid_sess()
|
||
pow_answer, nonce, sess2, js_path, image_str = self.get_image_str(sess, sid)
|
||
|
||
ans, eks, tlg, collect = self.slide(image_str, sess2, sid, js_path)
|
||
|
||
response = self.verify(sess2, sid, ans, pow_answer, collect, tlg, eks, nonce)
|
||
if response['errorCode'] == '0':
|
||
return {'ticket': response['ticket'], 'randstr': response['randstr']}
|
||
if response['errorCode'] == '50':
|
||
logger.error('识别错误')
|
||
return self.main()
|
||
else:
|
||
logger.error('失败')
|
||
logger.error(response['errorCode'])
|
||
return '失败请重试'
|
||
|
||
|
||
def read_port_from_config():
|
||
config_path = 'config.txt'
|
||
with open(config_path, 'r') as file:
|
||
data = file.readlines()
|
||
port = int(data[0].strip())
|
||
xieyi = data[1]
|
||
return port, xieyi
|
||
|
||
|
||
port, xieyi = read_port_from_config()
|
||
|
||
app = Flask(__name__)
|
||
|
||
|
||
@app.route('/api/TX', methods=['GET'])
|
||
def handle_request():
|
||
aid = request.args.get('aid')
|
||
host = request.args.get('host')
|
||
try:
|
||
if request.args.get('ip'):
|
||
ip = request.args.get('ip')
|
||
proxies = {
|
||
'https': f'{xieyi}://' + ip,
|
||
'http': f'{xieyi}://' + ip
|
||
}
|
||
result = TX_new(aid, host, proxies).main()
|
||
logger.success(result)
|
||
return result
|
||
else:
|
||
result = TX_new(aid, host).main()
|
||
logger.success(result)
|
||
return result
|
||
except:
|
||
if request.args.get('ip'):
|
||
ip = request.args.get('ip')
|
||
proxies = {
|
||
'https': f'{xieyi}://' + ip,
|
||
'http': f'{xieyi}://' + ip
|
||
}
|
||
result = Tx_old(aid, host, proxies).main()
|
||
logger.success(result)
|
||
return result
|
||
else:
|
||
result = Tx_old(aid, host).main()
|
||
logger.success(result)
|
||
return result
|
||
|
||
|
||
if __name__ == '__main__':
|
||
logger.info(f'代理协议-->{xieyi}')
|
||
logger.debug(f'端口-->{port}')
|
||
app.run(host='0.0.0.0', port=port)
|