Files
2025-04-13 19:21:12 +08:00

604 lines
22 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
# @Time: 2024/9/16 13:42
# @AuthorGuangXu-Pan
# @SoftwarePycharm
# @Filetx_slide_ai_word.py
import base64
import io
import json
import os
import re
import sys
import time
import cv2
import execjs
import numpy as np
# from curl_cffi import requests
import requests
from PIL import Image
from flask import Flask, request
from loguru import logger
def get_resource_path(relative_path):
""" 获取打包后资源文件的路径 """
if hasattr(sys, '_MEIPASS'):
# 如果程序是打包状态,使用 _MEIPASS 目录
return os.path.join(sys._MEIPASS, relative_path)
# 如果是未打包状态,使用当前目录
return os.path.join(os.path.abspath("."), relative_path)
class TX_new:
def __init__(self, aid, host, proxy=None):
self.requests = requests.session()
if proxy:
self.requests.proxies = proxy
self.aid = aid
self.host = host
self.ua = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
self.jiami = base64.b64encode(self.ua.encode()).decode(encoding='utf-8')
try:
ip_text = proxy['https'].split('://')[-1].split(':')[0]
self.ip_v4 = ip_text
except:
try:
ip_text = self.get_current_ip()
except:
ip_text = self.get_current_ip()
self.ip_v4 = re.findall('IP(.*)来自于', ip_text)[0]
# logger.debug("当前ip:{}".format(self.ip_v4))
self.headers = {
"Accept": "*/*",
"Accept-Language": "zh-CN,zh;q=0.9",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Pragma": "no-cache",
"Referer": "https://cloud.tencent.com/",
"Sec-Fetch-Dest": "script",
"Sec-Fetch-Mode": "no-cors",
"Sec-Fetch-Site": "cross-site",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
"sec-ch-ua": "\"Google Chrome\";v=\"124\", \"Not:A-Brand\";v=\"8\", \"Chromium\";v=\"124\"",
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "\"Windows\""
}
self.response = None
def get_current_ip(self) -> str:
headers = {
"user-agent": self.ua
}
url = "https://myip.ipip.net"
response = requests.get(url, headers=headers)
if response.status_code == 200:
return response.text
def get_image(self, img_url, sprite_url=None):
bg_url = self.host + img_url
if sprite_url:
fg_url = self.host + sprite_url
fg_content = self.requests.get(fg_url, headers=self.headers).content
bg_content = self.requests.get(bg_url, headers=self.headers).content
return bg_content, fg_content
else:
content = self.requests.get(bg_url, headers=self.headers).content
return content
# with open('bg.png', 'wb') as file:
# file.write(bg_content)
# with open('fg.png', 'wb') as files:
# files.write(fg_content)
def bytes_to_cv2(self, img):
"""
二进制图片转cv2
:param img: 二进制图片数据, <type 'bytes'>
:return: cv2图像, <type 'numpy.ndarray'>
"""
# 将图片字节码bytes, 转换成一维的numpy数组到缓存中
img_buffer_np = np.frombuffer(img, dtype=np.uint8)
# 从指定的内存缓存中读取一维numpy数据, 并把数据转换(解码)成图像矩阵格式
img_np = cv2.imdecode(img_buffer_np, 1)
return img_np
def cv2_open(self, img, flag=None):
if isinstance(img, bytes):
img = self.bytes_to_cv2(img)
elif isinstance(img, str):
img = cv2.imread(str(img))
elif isinstance(img, np.ndarray):
img = img
else:
raise ValueError(f'输入的图片类型无法解析: {type(img)}')
if flag is not None:
img = cv2.cvtColor(img, flag)
return img
def get_distance(self, bg, tp):
"""
:param bg: 背景图路径
:param tp: 缺口图路径
:return: 缺口位置
"""
# 读取图片
bg_img = self.cv2_open(bg)
tp_gray = self.cv2_open(tp, flag=cv2.COLOR_BGR2GRAY)
# 金字塔均值漂移
bg_shift = cv2.pyrMeanShiftFiltering(bg_img, 5, 50)
# 边缘检测
tp_gray = cv2.Canny(tp_gray, 255, 255)
bg_gray = cv2.Canny(bg_shift, 255, 255)
# 目标匹配
result = cv2.matchTemplate(bg_gray, tp_gray, cv2.TM_CCOEFF_NORMED)
# 解析匹配结果
min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
# distance = max_loc[0]
return max_loc
def crop_img(self, bg_content, fg_content):
# 打开图像文件
image = Image.open(io.BytesIO(fg_content))
x1, y1, x2, y2 = 142, 492, 260, 610
cropped_image = image.crop((x1, y1, x2, y2))
# 将裁剪后的图像转换回字节数据
cropped_bytes_io = io.BytesIO()
cropped_image.save(cropped_bytes_io, format='PNG')
cropped_bytes = cropped_bytes_io.getvalue()
# 缺口距离
distance = self.get_distance(bg_content, cropped_bytes)
logger.debug(f"缺口位置为{distance}")
# 坐标
x = str(distance[0])
y = str(distance[1])
return x, y
def get_tx_verify(self, tdc_path):
capurl = self.host + tdc_path
headers = {
"Accept": "*/*",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,zh-TW;q=0.7",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Pragma": "no-cache",
"Referer": "https://turing.captcha.gtimg.com/",
"Sec-Fetch-Dest": "script",
"Sec-Fetch-Mode": "no-cors",
"Sec-Fetch-Site": "cross-site",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
"sec-ch-ua": "\"Not_A Brand\";v=\"8\", \"Chromium\";v=\"124\", \"Google Chrome\";v=\"124\"",
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "\"Windows\""
}
response = self.requests.get(capurl, headers=headers)
cap_ = response.text
tx = get_resource_path('tx.js')
with open(tx, 'r', encoding="utf-8") as f:
jscode = f.read().replace('"vmp"', cap_)
collect_dict = execjs.compile(jscode).call('get_data', self.ip_v4)
eks = collect_dict["eks"]
tlg = collect_dict["tlg"]
collect = collect_dict["collect"]
return eks, tlg, collect
def get_slide_ticket(self, eks, tlg, collect, x, y, sess, pow_answer, pow_calc_time):
url = self.host + "/cap_union_new_verify"
data = {
"collect": collect,
"tlg": tlg,
"eks": eks,
"sess": sess,
"ans": "[{\"elem_id\":1,\"type\":\"DynAnswerType_POS\",\"data\":\"" + x + "," + y + "\"}]",
"pow_answer": pow_answer,
"pow_calc_time": pow_calc_time
}
response = self.requests.post(url, headers=self.headers, data=data).json()
# logger.info(response)
return response
def slide(self, img_url, sess, tdc_path):
sprite_url = json.loads(self.response.text.replace("(", "").replace(")", ""))["data"]["dyn_show_info"][
"sprite_url"]
prefix = \
json.loads(self.response.text.replace("(", "").replace(")", ""))["data"]["comm_captcha_cfg"]["pow_cfg"][
"prefix"]
md5 = json.loads(self.response.text.replace("(", "").replace(")", ""))["data"]["comm_captcha_cfg"]["pow_cfg"][
"md5"]
ans = get_resource_path('ans.js')
with open(ans, 'r', encoding='utf-8') as frd:
jscode = frd.read()
hhhh = {"target": md5, "nonce": prefix}
pow_an = execjs.compile(jscode).call('getWorkloadResult', hhhh)
pow = pow_an["ans"]
pow_calc_time = pow_an["duration"]
pow_answer = f"{prefix}{str(pow)}"
bg_content, fg_content = self.get_image(img_url, sprite_url)
x, y = self.crop_img(bg_content, fg_content)
eks, tlg, collect = self.get_tx_verify(tdc_path)
result = self.get_slide_ticket(eks, tlg, collect, x, y, sess, pow_answer, pow_calc_time)
if result['errorCode'] == '0':
return {'ticket': result['ticket'], 'randstr': result['randstr']}
if result['errorCode'] == '50':
logger.error('识别错误')
return self.main()
else:
logger.error('失败')
logger.error(result['errorCode'])
return '失败请重试'
def main(self):
url = self.host + "/cap_union_prehandle"
params = {
"aid": self.aid, # 小程序id
"protocol": "https",
"accver": "1",
"showtype": "popup",
"ua": self.jiami,
"noheader": "1",
"fb": "1",
"aged": "0",
"enableAged": "0",
"enableDarkMode": "0",
"grayscale": "1",
"clientype": "2",
"cap_cd": "",
"uid": "",
"lang": "zh-cn",
# "entry_url": "https://cloud.tencent.com/product/captcha",
"entry_url": "https://vip.tulingpyton.cn/",
"elder_captcha": "0",
"js": "/tcaptcha-frame.cc3d815a.js",
"login_appid": "",
"wb": "2",
"subsid": "3",
# "callback": "_aq_612456",
"sess": ""
}
self.response = self.requests.get(url, headers=self.headers, params=params)
sess = json.loads(self.response.text.replace("(", "").replace(")", ""))["sess"]
sid = json.loads(self.response.text.replace("(", "").replace(")", ""))["sid"]
tdc_path = json.loads(self.response.text.replace("(", "").replace(")", ""))["data"]["comm_captcha_cfg"][
"tdc_path"]
if json.loads(self.response.text.replace("(", "").replace(")", "")).get("data").get("dyn_show_info").get(
"instruction"):
if '拼图' in json.loads(self.response.text.replace("(", "").replace(")", "")).get("data").get(
"dyn_show_info").get("instruction"):
img_url = \
json.loads(self.response.text.replace("(", "").replace(")", ""))["data"]["dyn_show_info"][
"bg_elem_cfg"][
"img_url"]
logger.info('当前需要滑动验证')
result = self.slide(img_url, sess, tdc_path)
# logger.success(result)
return result
else:
logger.error('不支持类型')
return '不支持类型'
else:
logger.error('不支持类型')
return '不支持类型'
class Tx_old:
def __init__(self, aid, host, proxy=None):
self.session = requests.session()
self.host = host
if proxy:
self.session.proxies = proxy
self.aid = aid
try:
ip_text = proxy['https'].split('://')[-1].split(':')[0]
self.ip_v4 = ip_text
except:
try:
ip_text = self.get_current_ip()
except:
ip_text = self.get_current_ip()
self.ip_v4 = re.findall('IP(.*)来自于', ip_text)[0]
# logger.debug("当前ip:{}".format(self.ip_v4))
self.ua = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
self.jiami = base64.b64encode(self.ua.encode()).decode(encoding='utf-8')
self.createIframeStart = str(int(time.time() * 1000))
self.headers = {
"Accept": "*/*",
"Accept-Language": "zh-CN,zh;q=0.9",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Pragma": "no-cache",
"Referer": "https://cloud.tencent.com/",
"Sec-Fetch-Dest": "script",
"Sec-Fetch-Mode": "no-cors",
"Sec-Fetch-Site": "cross-site",
"User-Agent": self.ua,
"sec-ch-ua": "\"Google Chrome\";v=\"124\", \"Not:A-Brand\";v=\"8\", \"Chromium\";v=\"124\"",
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "\"Windows\""
}
def get_current_ip(self) -> str:
headers = {
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
}
url = "https://myip.ipip.net"
response = self.session.get(url, headers=headers)
if response.status_code == 200:
return response.text
def get_sid_sess(self):
url = self.host + '/cap_union_prehandle'
data = {
'aid': self.aid,
'protocol': 'https',
'accver': '1',
'showtype': 'popup',
'ua': 'TW96aWxsYS81LjAgKFdpbmRvd3MgTlQgMTAuMDsgV2luNjQ7IHg2NCkgQXBwbGVXZWJLaXQvNTM3LjM2IChLSFRNTCwgbGlrZSBHZWNrbykgQ2hyb21lLzExNi4wLjAuMCBTYWZhcmkvNTM3LjM2',
'noheader': '1',
'fb': '0',
'aged': '0',
'enableAged': '0',
'enableDarkMode': '1',
'grayscale': '1',
'clientype': '2',
'cap_cd': '',
'uid': '',
'lang': 'zh-cn',
'entry_url': 'https://www.szwego.com/static/index.html#/password_login',
'elder_captcha': '0',
'js': '/tcaptcha-frame.22125576.js',
'login_appid': '',
'wb': '1',
'subsid': '3',
'callback': '_aq_373409',
'sess': '',
}
res = self.session.get(url, params=data).text
return re.findall('"sess":"(.*?)"', res)[0], re.findall('"sid":"(.*?)"', res)[0]
def bytes_to_cv2(self, img):
"""
二进制图片转cv2
:param img: 二进制图片数据, <type 'bytes'>
:return: cv2图像, <type 'numpy.ndarray'>
"""
# 将图片字节码bytes, 转换成一维的numpy数组到缓存中
img_buffer_np = np.frombuffer(img, dtype=np.uint8)
# 从指定的内存缓存中读取一维numpy数据, 并把数据转换(解码)成图像矩阵格式
img_np = cv2.imdecode(img_buffer_np, 1)
return img_np
def cv2_open(self, img, flag=None):
"""
统一输出图片格式为cv2图像, <type 'numpy.ndarray'>
:param img: <type 'bytes'/'numpy.ndarray'/'str'/'Path'/'PIL.JpegImagePlugin.JpegImageFile'>
:param flag: 颜色空间转换类型, default: None
eg: cv2.COLOR_BGR2GRAY灰度图
:return: cv2图像, <numpy.ndarray>
"""
if isinstance(img, bytes):
img = self.bytes_to_cv2(img)
else:
raise ValueError(f'输入的图片类型无法解析: {type(img)}')
if flag is not None:
img = cv2.cvtColor(img, flag)
return img
def get_distance(self, bg, tp):
# 读取图片
bg_img = self.cv2_open(bg)
tp_gray = self.cv2_open(tp, flag=cv2.COLOR_BGR2GRAY)
# 金字塔均值漂移
bg_shift = cv2.pyrMeanShiftFiltering(bg_img, 5, 50)
# 边缘检测
tp_gray = cv2.Canny(tp_gray, 255, 255)
bg_gray = cv2.Canny(bg_shift, 255, 255)
# 目标匹配
result = cv2.matchTemplate(bg_gray, tp_gray, cv2.TM_CCOEFF_NORMED)
# 解析匹配结果
min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
logger.info(f'坐标:{min_loc}')
return max_loc
def verify(self, sess2, sid, ans, pow_answer, collect, tlg, eks, nonce):
verify_url = self.host + '/cap_union_new_verify'
data = {
'aid': self.aid,
'protocol': 'https',
'accver': '1',
'showtype': 'popup',
'ua': self.jiami,
'noheader': '1',
'fb': '0',
'aged': '0',
'enableAged': '0',
'enableDarkMode': '1',
'grayscale': '1',
'clientype': '2',
'sess': sess2,
'fwidth': '0',
'sid': str(sid),
'wxLang': '',
'tcScale': '1',
'uid': '',
'cap_cd': '',
'rnd': '415581',
'prehandleLoadTime': '83',
'createIframeStart': self.createIframeStart,
'global': '0',
'subsid': '26',
'cdata': '0',
'ans': ans,
'vsig': '',
'websig': '',
'subcapclass': '',
'pow_answer': pow_answer,
'pow_calc_time': '10',
'collect': collect,
'tlg': tlg,
'fpinfo': '',
'eks': eks,
'nonce': nonce,
'vlg': '0_0_1',
'vData': 'ed0FNEDmGsgjfhnCwiBah1gi1hqqagtoj88Pd0lL95PbaFK-LucV4iDyjLi*iCQ7h9V04rTubXsbuSpp9H1MctodkEOQDPiacJr1PBkrwfPse3PZPmgfP3daO0*a34PJrD1ShB32X_jt0PAD8MoHriYY',
}
response = self.session.post(verify_url, data=data, headers=self.headers).json()
return response
def get_image_str(self, sess, sid):
get_url = self.host + '/cap_union_new_show'
get_data = {
'aid': self.aid,
'protocol': 'https',
'accver': '1',
'showtype': 'popup',
'ua': self.jiami,
'noheader': '1',
'fb': '0',
'aged': '0',
'enableAged': '0',
'enableDarkMode': '1',
'grayscale': '1',
'clientype': '2',
'sess': f'{sess}',
'fwidth': '0',
'sid': f'{sid}',
'wxLang': '',
'tcScale': '1',
'uid': '',
'cap_cd': '',
'rnd': '204910',
'prehandleLoadTime': '84',
'createIframeStart': f'{int(time.time() * 1000)}',
'global': '0',
'subsid': '6',
}
respon = self.session.get(get_url, params=get_data)
respon.encoding = 'utf-8'
image_str = re.findall('cdnPic1:"/hycdn\?index=1&image=(.*?)",', respon.text)[0]
# prefix = re.findall('prefix:"(.*?)"', respon.text)[0]
# md5 = re.findall('md5:"(.*?)"', respon.text)[0]
# with open('ans.js', 'r', encoding='utf-8') as frd:
# jscode = frd.read()
# hhhh = {"target": md5, "nonce": prefix}
# pow_an = execjs.compile(jscode).call('getWorkloadResult', hhhh)
# pow = pow_an["ans"]
# pow_answer = f"{prefix}{str(pow)}"
pow_answer = ""
nonce = re.findall('nonce:"(.*?)"', respon.text)[0]
sess2 = re.findall('sess:"(.*?)"', respon.text)[0]
js_path = re.findall('dcFileName:"(.*?)"', respon.text)[0]
return pow_answer, nonce, sess2, js_path, image_str
def slide(self, image_str, sess2, sid, js_path):
p1 = {
'index': '1',
'image': f'{image_str}?aid={self.aid}',
'sess': sess2,
'sid': sid,
'img_index': '1',
'subsid': '7',
}
p2 = {
'index': '2',
'image': f'{image_str}?aid={self.aid}',
'sess': sess2,
'sid': sid,
'img_index': '2',
'subsid': '8',
}
distance = self.get_distance(
self.session.get(self.host + '/hycdn', params=p1).content,
self.session.get(self.host + '/hycdn', params=p2).content,
)
ans = str(distance[0]) + ',' + str(distance[1]) + ';'
js_url = f'{self.host}/{js_path}'
cap_ = self.session.get(js_url).text
tx = get_resource_path('tx.js')
with open(tx, 'r', encoding="utf-8") as f:
jscode = f.read().replace('"vmp"', cap_)
collect_dict = execjs.compile(jscode).call('get_data', self.ip_v4)
eks = collect_dict["eks"]
tlg = collect_dict["tlg"]
collect = collect_dict["collect"]
return ans, eks, tlg, collect
def main(self):
sess, sid = self.get_sid_sess()
pow_answer, nonce, sess2, js_path, image_str = self.get_image_str(sess, sid)
ans, eks, tlg, collect = self.slide(image_str, sess2, sid, js_path)
response = self.verify(sess2, sid, ans, pow_answer, collect, tlg, eks, nonce)
if response['errorCode'] == '0':
return {'ticket': response['ticket'], 'randstr': response['randstr']}
if response['errorCode'] == '50':
logger.error('识别错误')
return self.main()
else:
logger.error('失败')
logger.error(response['errorCode'])
return '失败请重试'
def read_port_from_config():
config_path = 'config.txt'
with open(config_path, 'r') as file:
data = file.readlines()
port = int(data[0].strip())
xieyi = data[1]
return port, xieyi
port, xieyi = read_port_from_config()
app = Flask(__name__)
@app.route('/api/TX', methods=['GET'])
def handle_request():
aid = request.args.get('aid')
host = request.args.get('host')
try:
if request.args.get('ip'):
ip = request.args.get('ip')
proxies = {
'https': f'{xieyi}://' + ip,
'http': f'{xieyi}://' + ip
}
result = TX_new(aid, host, proxies).main()
logger.success(result)
return result
else:
result = TX_new(aid, host).main()
logger.success(result)
return result
except:
if request.args.get('ip'):
ip = request.args.get('ip')
proxies = {
'https': f'{xieyi}://' + ip,
'http': f'{xieyi}://' + ip
}
result = Tx_old(aid, host, proxies).main()
logger.success(result)
return result
else:
result = Tx_old(aid, host).main()
logger.success(result)
return result
if __name__ == '__main__':
logger.info(f'代理协议-->{xieyi}')
logger.debug(f'端口-->{port}')
app.run(host='0.0.0.0', port=port)