# -*- coding: utf-8 -*- # @Time: 2024/9/16 13:42 # @Author:GuangXu-Pan # @Software:Pycharm # @File:tx_slide_ai_word.py import base64 import io import json import os import re import sys import time import cv2 import execjs import numpy as np # from curl_cffi import requests import requests from PIL import Image from flask import Flask, request from loguru import logger def get_resource_path(relative_path): """ 获取打包后资源文件的路径 """ if hasattr(sys, '_MEIPASS'): # 如果程序是打包状态,使用 _MEIPASS 目录 return os.path.join(sys._MEIPASS, relative_path) # 如果是未打包状态,使用当前目录 return os.path.join(os.path.abspath("."), relative_path) class TX_new: def __init__(self, aid, host, proxy=None): self.requests = requests.session() if proxy: self.requests.proxies = proxy self.aid = aid self.host = host self.ua = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36" self.jiami = base64.b64encode(self.ua.encode()).decode(encoding='utf-8') try: ip_text = proxy['https'].split('://')[-1].split(':')[0] self.ip_v4 = ip_text except: try: ip_text = self.get_current_ip() except: ip_text = self.get_current_ip() self.ip_v4 = re.findall('IP:(.*)来自于', ip_text)[0] # logger.debug("当前ip:{}".format(self.ip_v4)) self.headers = { "Accept": "*/*", "Accept-Language": "zh-CN,zh;q=0.9", "Cache-Control": "no-cache", "Connection": "keep-alive", "Pragma": "no-cache", "Referer": "https://cloud.tencent.com/", "Sec-Fetch-Dest": "script", "Sec-Fetch-Mode": "no-cors", "Sec-Fetch-Site": "cross-site", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36", "sec-ch-ua": "\"Google Chrome\";v=\"124\", \"Not:A-Brand\";v=\"8\", \"Chromium\";v=\"124\"", "sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": "\"Windows\"" } self.response = None def get_current_ip(self) -> str: headers = { "user-agent": self.ua } url = "https://myip.ipip.net" response = requests.get(url, headers=headers) if response.status_code == 200: return response.text def get_image(self, img_url, sprite_url=None): bg_url = self.host + img_url if sprite_url: fg_url = self.host + sprite_url fg_content = self.requests.get(fg_url, headers=self.headers).content bg_content = self.requests.get(bg_url, headers=self.headers).content return bg_content, fg_content else: content = self.requests.get(bg_url, headers=self.headers).content return content # with open('bg.png', 'wb') as file: # file.write(bg_content) # with open('fg.png', 'wb') as files: # files.write(fg_content) def bytes_to_cv2(self, img): """ 二进制图片转cv2 :param img: 二进制图片数据, :return: cv2图像, """ # 将图片字节码bytes, 转换成一维的numpy数组到缓存中 img_buffer_np = np.frombuffer(img, dtype=np.uint8) # 从指定的内存缓存中读取一维numpy数据, 并把数据转换(解码)成图像矩阵格式 img_np = cv2.imdecode(img_buffer_np, 1) return img_np def cv2_open(self, img, flag=None): if isinstance(img, bytes): img = self.bytes_to_cv2(img) elif isinstance(img, str): img = cv2.imread(str(img)) elif isinstance(img, np.ndarray): img = img else: raise ValueError(f'输入的图片类型无法解析: {type(img)}') if flag is not None: img = cv2.cvtColor(img, flag) return img def get_distance(self, bg, tp): """ :param bg: 背景图路径 :param tp: 缺口图路径 :return: 缺口位置 """ # 读取图片 bg_img = self.cv2_open(bg) tp_gray = self.cv2_open(tp, flag=cv2.COLOR_BGR2GRAY) # 金字塔均值漂移 bg_shift = cv2.pyrMeanShiftFiltering(bg_img, 5, 50) # 边缘检测 tp_gray = cv2.Canny(tp_gray, 255, 255) bg_gray = cv2.Canny(bg_shift, 255, 255) # 目标匹配 result = cv2.matchTemplate(bg_gray, tp_gray, cv2.TM_CCOEFF_NORMED) # 解析匹配结果 min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result) # distance = max_loc[0] return max_loc def crop_img(self, bg_content, fg_content): # 打开图像文件 image = Image.open(io.BytesIO(fg_content)) x1, y1, x2, y2 = 142, 492, 260, 610 cropped_image = image.crop((x1, y1, x2, y2)) # 将裁剪后的图像转换回字节数据 cropped_bytes_io = io.BytesIO() cropped_image.save(cropped_bytes_io, format='PNG') cropped_bytes = cropped_bytes_io.getvalue() # 缺口距离 distance = self.get_distance(bg_content, cropped_bytes) logger.debug(f"缺口位置为{distance}") # 坐标 x = str(distance[0]) y = str(distance[1]) return x, y def get_tx_verify(self, tdc_path): capurl = self.host + tdc_path headers = { "Accept": "*/*", "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,zh-TW;q=0.7", "Cache-Control": "no-cache", "Connection": "keep-alive", "Pragma": "no-cache", "Referer": "https://turing.captcha.gtimg.com/", "Sec-Fetch-Dest": "script", "Sec-Fetch-Mode": "no-cors", "Sec-Fetch-Site": "cross-site", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36", "sec-ch-ua": "\"Not_A Brand\";v=\"8\", \"Chromium\";v=\"124\", \"Google Chrome\";v=\"124\"", "sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": "\"Windows\"" } response = self.requests.get(capurl, headers=headers) cap_ = response.text tx = get_resource_path('tx.js') with open(tx, 'r', encoding="utf-8") as f: jscode = f.read().replace('"vmp"', cap_) collect_dict = execjs.compile(jscode).call('get_data', self.ip_v4) eks = collect_dict["eks"] tlg = collect_dict["tlg"] collect = collect_dict["collect"] return eks, tlg, collect def get_slide_ticket(self, eks, tlg, collect, x, y, sess, pow_answer, pow_calc_time): url = self.host + "/cap_union_new_verify" data = { "collect": collect, "tlg": tlg, "eks": eks, "sess": sess, "ans": "[{\"elem_id\":1,\"type\":\"DynAnswerType_POS\",\"data\":\"" + x + "," + y + "\"}]", "pow_answer": pow_answer, "pow_calc_time": pow_calc_time } response = self.requests.post(url, headers=self.headers, data=data).json() # logger.info(response) return response def slide(self, img_url, sess, tdc_path): sprite_url = json.loads(self.response.text.replace("(", "").replace(")", ""))["data"]["dyn_show_info"][ "sprite_url"] prefix = \ json.loads(self.response.text.replace("(", "").replace(")", ""))["data"]["comm_captcha_cfg"]["pow_cfg"][ "prefix"] md5 = json.loads(self.response.text.replace("(", "").replace(")", ""))["data"]["comm_captcha_cfg"]["pow_cfg"][ "md5"] ans = get_resource_path('ans.js') with open(ans, 'r', encoding='utf-8') as frd: jscode = frd.read() hhhh = {"target": md5, "nonce": prefix} pow_an = execjs.compile(jscode).call('getWorkloadResult', hhhh) pow = pow_an["ans"] pow_calc_time = pow_an["duration"] pow_answer = f"{prefix}{str(pow)}" bg_content, fg_content = self.get_image(img_url, sprite_url) x, y = self.crop_img(bg_content, fg_content) eks, tlg, collect = self.get_tx_verify(tdc_path) result = self.get_slide_ticket(eks, tlg, collect, x, y, sess, pow_answer, pow_calc_time) if result['errorCode'] == '0': return {'ticket': result['ticket'], 'randstr': result['randstr']} if result['errorCode'] == '50': logger.error('识别错误') return self.main() else: logger.error('失败') logger.error(result['errorCode']) return '失败请重试' def main(self): url = self.host + "/cap_union_prehandle" params = { "aid": self.aid, # 小程序id "protocol": "https", "accver": "1", "showtype": "popup", "ua": self.jiami, "noheader": "1", "fb": "1", "aged": "0", "enableAged": "0", "enableDarkMode": "0", "grayscale": "1", "clientype": "2", "cap_cd": "", "uid": "", "lang": "zh-cn", # "entry_url": "https://cloud.tencent.com/product/captcha", "entry_url": "https://vip.tulingpyton.cn/", "elder_captcha": "0", "js": "/tcaptcha-frame.cc3d815a.js", "login_appid": "", "wb": "2", "subsid": "3", # "callback": "_aq_612456", "sess": "" } self.response = self.requests.get(url, headers=self.headers, params=params) sess = json.loads(self.response.text.replace("(", "").replace(")", ""))["sess"] sid = json.loads(self.response.text.replace("(", "").replace(")", ""))["sid"] tdc_path = json.loads(self.response.text.replace("(", "").replace(")", ""))["data"]["comm_captcha_cfg"][ "tdc_path"] if json.loads(self.response.text.replace("(", "").replace(")", "")).get("data").get("dyn_show_info").get( "instruction"): if '拼图' in json.loads(self.response.text.replace("(", "").replace(")", "")).get("data").get( "dyn_show_info").get("instruction"): img_url = \ json.loads(self.response.text.replace("(", "").replace(")", ""))["data"]["dyn_show_info"][ "bg_elem_cfg"][ "img_url"] logger.info('当前需要滑动验证') result = self.slide(img_url, sess, tdc_path) # logger.success(result) return result else: logger.error('不支持类型') return '不支持类型' else: logger.error('不支持类型') return '不支持类型' class Tx_old: def __init__(self, aid, host, proxy=None): self.session = requests.session() self.host = host if proxy: self.session.proxies = proxy self.aid = aid try: ip_text = proxy['https'].split('://')[-1].split(':')[0] self.ip_v4 = ip_text except: try: ip_text = self.get_current_ip() except: ip_text = self.get_current_ip() self.ip_v4 = re.findall('IP:(.*)来自于', ip_text)[0] # logger.debug("当前ip:{}".format(self.ip_v4)) self.ua = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36" self.jiami = base64.b64encode(self.ua.encode()).decode(encoding='utf-8') self.createIframeStart = str(int(time.time() * 1000)) self.headers = { "Accept": "*/*", "Accept-Language": "zh-CN,zh;q=0.9", "Cache-Control": "no-cache", "Connection": "keep-alive", "Pragma": "no-cache", "Referer": "https://cloud.tencent.com/", "Sec-Fetch-Dest": "script", "Sec-Fetch-Mode": "no-cors", "Sec-Fetch-Site": "cross-site", "User-Agent": self.ua, "sec-ch-ua": "\"Google Chrome\";v=\"124\", \"Not:A-Brand\";v=\"8\", \"Chromium\";v=\"124\"", "sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": "\"Windows\"" } def get_current_ip(self) -> str: headers = { "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36" } url = "https://myip.ipip.net" response = self.session.get(url, headers=headers) if response.status_code == 200: return response.text def get_sid_sess(self): url = self.host + '/cap_union_prehandle' data = { 'aid': self.aid, 'protocol': 'https', 'accver': '1', 'showtype': 'popup', 'ua': 'TW96aWxsYS81LjAgKFdpbmRvd3MgTlQgMTAuMDsgV2luNjQ7IHg2NCkgQXBwbGVXZWJLaXQvNTM3LjM2IChLSFRNTCwgbGlrZSBHZWNrbykgQ2hyb21lLzExNi4wLjAuMCBTYWZhcmkvNTM3LjM2', 'noheader': '1', 'fb': '0', 'aged': '0', 'enableAged': '0', 'enableDarkMode': '1', 'grayscale': '1', 'clientype': '2', 'cap_cd': '', 'uid': '', 'lang': 'zh-cn', 'entry_url': 'https://www.szwego.com/static/index.html#/password_login', 'elder_captcha': '0', 'js': '/tcaptcha-frame.22125576.js', 'login_appid': '', 'wb': '1', 'subsid': '3', 'callback': '_aq_373409', 'sess': '', } res = self.session.get(url, params=data).text return re.findall('"sess":"(.*?)"', res)[0], re.findall('"sid":"(.*?)"', res)[0] def bytes_to_cv2(self, img): """ 二进制图片转cv2 :param img: 二进制图片数据, :return: cv2图像, """ # 将图片字节码bytes, 转换成一维的numpy数组到缓存中 img_buffer_np = np.frombuffer(img, dtype=np.uint8) # 从指定的内存缓存中读取一维numpy数据, 并把数据转换(解码)成图像矩阵格式 img_np = cv2.imdecode(img_buffer_np, 1) return img_np def cv2_open(self, img, flag=None): """ 统一输出图片格式为cv2图像, :param img: :param flag: 颜色空间转换类型, default: None eg: cv2.COLOR_BGR2GRAY(灰度图) :return: cv2图像, """ if isinstance(img, bytes): img = self.bytes_to_cv2(img) else: raise ValueError(f'输入的图片类型无法解析: {type(img)}') if flag is not None: img = cv2.cvtColor(img, flag) return img def get_distance(self, bg, tp): # 读取图片 bg_img = self.cv2_open(bg) tp_gray = self.cv2_open(tp, flag=cv2.COLOR_BGR2GRAY) # 金字塔均值漂移 bg_shift = cv2.pyrMeanShiftFiltering(bg_img, 5, 50) # 边缘检测 tp_gray = cv2.Canny(tp_gray, 255, 255) bg_gray = cv2.Canny(bg_shift, 255, 255) # 目标匹配 result = cv2.matchTemplate(bg_gray, tp_gray, cv2.TM_CCOEFF_NORMED) # 解析匹配结果 min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result) logger.info(f'坐标:{min_loc}') return max_loc def verify(self, sess2, sid, ans, pow_answer, collect, tlg, eks, nonce): verify_url = self.host + '/cap_union_new_verify' data = { 'aid': self.aid, 'protocol': 'https', 'accver': '1', 'showtype': 'popup', 'ua': self.jiami, 'noheader': '1', 'fb': '0', 'aged': '0', 'enableAged': '0', 'enableDarkMode': '1', 'grayscale': '1', 'clientype': '2', 'sess': sess2, 'fwidth': '0', 'sid': str(sid), 'wxLang': '', 'tcScale': '1', 'uid': '', 'cap_cd': '', 'rnd': '415581', 'prehandleLoadTime': '83', 'createIframeStart': self.createIframeStart, 'global': '0', 'subsid': '26', 'cdata': '0', 'ans': ans, 'vsig': '', 'websig': '', 'subcapclass': '', 'pow_answer': pow_answer, 'pow_calc_time': '10', 'collect': collect, 'tlg': tlg, 'fpinfo': '', 'eks': eks, 'nonce': nonce, 'vlg': '0_0_1', 'vData': 'ed0FNEDmGsgjfhnCwiBah1gi1hqqagtoj88Pd0lL95PbaFK-LucV4iDyjLi*iCQ7h9V04rTubXsbuSpp9H1MctodkEOQDPiacJr1PBkrwfPse3PZPmgfP3daO0*a34PJrD1ShB32X_jt0PAD8MoHriYY', } response = self.session.post(verify_url, data=data, headers=self.headers).json() return response def get_image_str(self, sess, sid): get_url = self.host + '/cap_union_new_show' get_data = { 'aid': self.aid, 'protocol': 'https', 'accver': '1', 'showtype': 'popup', 'ua': self.jiami, 'noheader': '1', 'fb': '0', 'aged': '0', 'enableAged': '0', 'enableDarkMode': '1', 'grayscale': '1', 'clientype': '2', 'sess': f'{sess}', 'fwidth': '0', 'sid': f'{sid}', 'wxLang': '', 'tcScale': '1', 'uid': '', 'cap_cd': '', 'rnd': '204910', 'prehandleLoadTime': '84', 'createIframeStart': f'{int(time.time() * 1000)}', 'global': '0', 'subsid': '6', } respon = self.session.get(get_url, params=get_data) respon.encoding = 'utf-8' image_str = re.findall('cdnPic1:"/hycdn\?index=1&image=(.*?)",', respon.text)[0] # prefix = re.findall('prefix:"(.*?)"', respon.text)[0] # md5 = re.findall('md5:"(.*?)"', respon.text)[0] # with open('ans.js', 'r', encoding='utf-8') as frd: # jscode = frd.read() # hhhh = {"target": md5, "nonce": prefix} # pow_an = execjs.compile(jscode).call('getWorkloadResult', hhhh) # pow = pow_an["ans"] # pow_answer = f"{prefix}{str(pow)}" pow_answer = "" nonce = re.findall('nonce:"(.*?)"', respon.text)[0] sess2 = re.findall('sess:"(.*?)"', respon.text)[0] js_path = re.findall('dcFileName:"(.*?)"', respon.text)[0] return pow_answer, nonce, sess2, js_path, image_str def slide(self, image_str, sess2, sid, js_path): p1 = { 'index': '1', 'image': f'{image_str}?aid={self.aid}', 'sess': sess2, 'sid': sid, 'img_index': '1', 'subsid': '7', } p2 = { 'index': '2', 'image': f'{image_str}?aid={self.aid}', 'sess': sess2, 'sid': sid, 'img_index': '2', 'subsid': '8', } distance = self.get_distance( self.session.get(self.host + '/hycdn', params=p1).content, self.session.get(self.host + '/hycdn', params=p2).content, ) ans = str(distance[0]) + ',' + str(distance[1]) + ';' js_url = f'{self.host}/{js_path}' cap_ = self.session.get(js_url).text tx = get_resource_path('tx.js') with open(tx, 'r', encoding="utf-8") as f: jscode = f.read().replace('"vmp"', cap_) collect_dict = execjs.compile(jscode).call('get_data', self.ip_v4) eks = collect_dict["eks"] tlg = collect_dict["tlg"] collect = collect_dict["collect"] return ans, eks, tlg, collect def main(self): sess, sid = self.get_sid_sess() pow_answer, nonce, sess2, js_path, image_str = self.get_image_str(sess, sid) ans, eks, tlg, collect = self.slide(image_str, sess2, sid, js_path) response = self.verify(sess2, sid, ans, pow_answer, collect, tlg, eks, nonce) if response['errorCode'] == '0': return {'ticket': response['ticket'], 'randstr': response['randstr']} if response['errorCode'] == '50': logger.error('识别错误') return self.main() else: logger.error('失败') logger.error(response['errorCode']) return '失败请重试' def read_port_from_config(): config_path = 'config.txt' with open(config_path, 'r') as file: data = file.readlines() port = int(data[0].strip()) xieyi = data[1] return port, xieyi port, xieyi = read_port_from_config() app = Flask(__name__) @app.route('/api/TX', methods=['GET']) def handle_request(): aid = request.args.get('aid') host = request.args.get('host') try: if request.args.get('ip'): ip = request.args.get('ip') proxies = { 'https': f'{xieyi}://' + ip, 'http': f'{xieyi}://' + ip } result = TX_new(aid, host, proxies).main() logger.success(result) return result else: result = TX_new(aid, host).main() logger.success(result) return result except: if request.args.get('ip'): ip = request.args.get('ip') proxies = { 'https': f'{xieyi}://' + ip, 'http': f'{xieyi}://' + ip } result = Tx_old(aid, host, proxies).main() logger.success(result) return result else: result = Tx_old(aid, host).main() logger.success(result) return result if __name__ == '__main__': logger.info(f'代理协议-->{xieyi}') logger.debug(f'端口-->{port}') app.run(host='0.0.0.0', port=port)