请提供文件的di

This commit is contained in:
danial
2025-04-13 19:21:12 +08:00
parent 2b28c002d4
commit 83c8082bfe
14 changed files with 2838 additions and 0 deletions

View File

@@ -0,0 +1,16 @@
__pycache__
*.pyc
*.pyo
*.pyd
.Python
.git
.gitignore
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
*.spec
.DS_Store

View File

@@ -0,0 +1,50 @@
---
kind: pipeline
type: ssh
name: 香港服务器
server:
host: 38.38.251.113:31245
user: root
password:
from_secret: www_password
clone:
depth: 1
steps:
- name: build new image
environment:
DOCKER_LOGIN:
from_secret: docker_login
DOCKER_TOKEN:
from_secret: docker_token
DOCKER_PASSWORD:
from_secret: docker_password
commands:
- docker login git.kkknametrans.buzz -u $DOCKER_LOGIN -p $DOCKER_TOKEN
- docker build -t git.kkknametrans.buzz/danial/kami-slide-server-${DRONE_BRANCH}:${DRONE_BUILD_NUMBER} -f deploy/Dockerfile . --build-arg USE_PROXY=0
- docker tag git.kkknametrans.buzz/danial/kami-slide-server-${DRONE_BRANCH}:${DRONE_BUILD_NUMBER} git.kkknametrans.buzz/danial/kami-slide-server-${DRONE_BRANCH}:latest
- docker push git.kkknametrans.buzz/danial/kami-slide-server-${DRONE_BRANCH}:${DRONE_BUILD_NUMBER}
- docker push git.kkknametrans.buzz/danial/kami-slide-server-${DRONE_BRANCH}:latest
- docker logout git.kkknametrans.buzz
- name: deploy to docker compose
environment:
DOCKER_LOGIN:
from_secret: docker_login
DOCKER_TOKEN:
from_secret: docker_token
DOCKER_PASSWORD:
from_secret: docker_password
commands:
- docker login git.kkknametrans.buzz -u $DOCKER_LOGIN -p $DOCKER_TOKEN
- BRANCH=${DRONE_BRANCH} VERSION=${DRONE_BUILD_NUMBER} docker compose -f /data/kami/docker-compose.yaml --profile jd_bind_card up -d
- docker logout git.kkknametrans.buzz
trigger:
branch:
- develop
- production
when:
event:
- push

View File

@@ -0,0 +1,33 @@
FROM python:3.9
WORKDIR /app
# 安装Node.js环境和OpenCV依赖
RUN apt-get update && apt-get install -y \
curl \
gnupg \
libgl1-mesa-glx \
libglib2.0-0 \
libsm6 \
libxrender1 \
libxext6 && \
curl -fsSL https://deb.nodesource.com/setup_18.x | bash - && \
apt-get install -y nodejs && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*
# 验证Node.js和npm安装
RUN node --version && npm --version
# 复制依赖文件并安装依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 复制应用代码和资源文件
COPY . .
# 暴露配置文件中定义的端口默认99
EXPOSE 99
# 启动应用
CMD ["python", "app.py"]

View File

@@ -0,0 +1,51 @@
# Ticket Slide Server
## Docker部署说明
### 前提条件
- 安装 [Docker](https://www.docker.com/get-started)
- Install Docker Compose (https://docs.docker.com/compose/install/)
### 部署步骤
1. 克隆或下载项目代码到服务器
2. 进入项目目录
```bash
cd ticket_slide_server
```
3. 构建并启动Docker容器
```bash
docker-compose up -d
```
4. 查看容器运行状态
```bash
docker-compose ps
```
### 配置说明
- 服务默认在99端口运行可以通过修改`config.txt`文件更改端口
- 修改配置后需要重启容器:`docker-compose restart`
### API使用
访问API`http://服务器IP:99/api/TX?aid=<aid>&host=<host>`
可选参数:
- `ip`: 代理IP地址
### 日志查看
```bash
docker-compose logs -f
```
### 停止服务
```bash
docker-compose down
```

101
ticket_slide_server/ans.js Normal file
View File

@@ -0,0 +1,101 @@
function r(t, n) {
var e = (65535 & t) + (65535 & n);
return (t >> 16) + (n >> 16) + (e >> 16) << 16 | 65535 & e
}
function o(t, n, e, o, u, f) {
return r(function(t, n) {
return t << n | t >>> 32 - n
}(r(r(n, t), r(o, f)), u), e)
}
function u(t, n, e, r, u, f, a) {
return o(n & e | ~n & r, t, n, u, f, a)
}
function f(t, n, e, r, u, f, a) {
return o(n & r | e & ~r, t, n, u, f, a)
}
function a(t, n, e, r, u, f, a) {
return o(n ^ e ^ r, t, n, u, f, a)
}
function i(t, n, e, r, u, f, a) {
return o(e ^ (n | ~r), t, n, u, f, a)
}
function c(t, n) {
var e, o, c, l, s;
t[n >> 5] |= 128 << n % 32,
t[14 + (n + 64 >>> 9 << 4)] = n;
var d = 1732584193
, p = -271733879
, g = -1732584194
, _ = 271733878;
for (e = 0; e < t.length; e += 16)
o = d,
c = p,
l = g,
s = _,
p = i(p = i(p = i(p = i(p = a(p = a(p = a(p = a(p = f(p = f(p = f(p = f(p = u(p = u(p = u(p = u(p, g = u(g, _ = u(_, d = u(d, p, g, _, t[e], 7, -680876936), p, g, t[e + 1], 12, -389564586), d, p, t[e + 2], 17, 606105819), _, d, t[e + 3], 22, -1044525330), g = u(g, _ = u(_, d = u(d, p, g, _, t[e + 4], 7, -176418897), p, g, t[e + 5], 12, 1200080426), d, p, t[e + 6], 17, -1473231341), _, d, t[e + 7], 22, -45705983), g = u(g, _ = u(_, d = u(d, p, g, _, t[e + 8], 7, 1770035416), p, g, t[e + 9], 12, -1958414417), d, p, t[e + 10], 17, -42063), _, d, t[e + 11], 22, -1990404162), g = u(g, _ = u(_, d = u(d, p, g, _, t[e + 12], 7, 1804603682), p, g, t[e + 13], 12, -40341101), d, p, t[e + 14], 17, -1502002290), _, d, t[e + 15], 22, 1236535329), g = f(g, _ = f(_, d = f(d, p, g, _, t[e + 1], 5, -165796510), p, g, t[e + 6], 9, -1069501632), d, p, t[e + 11], 14, 643717713), _, d, t[e], 20, -373897302), g = f(g, _ = f(_, d = f(d, p, g, _, t[e + 5], 5, -701558691), p, g, t[e + 10], 9, 38016083), d, p, t[e + 15], 14, -660478335), _, d, t[e + 4], 20, -405537848), g = f(g, _ = f(_, d = f(d, p, g, _, t[e + 9], 5, 568446438), p, g, t[e + 14], 9, -1019803690), d, p, t[e + 3], 14, -187363961), _, d, t[e + 8], 20, 1163531501), g = f(g, _ = f(_, d = f(d, p, g, _, t[e + 13], 5, -1444681467), p, g, t[e + 2], 9, -51403784), d, p, t[e + 7], 14, 1735328473), _, d, t[e + 12], 20, -1926607734), g = a(g, _ = a(_, d = a(d, p, g, _, t[e + 5], 4, -378558), p, g, t[e + 8], 11, -2022574463), d, p, t[e + 11], 16, 1839030562), _, d, t[e + 14], 23, -35309556), g = a(g, _ = a(_, d = a(d, p, g, _, t[e + 1], 4, -1530992060), p, g, t[e + 4], 11, 1272893353), d, p, t[e + 7], 16, -155497632), _, d, t[e + 10], 23, -1094730640), g = a(g, _ = a(_, d = a(d, p, g, _, t[e + 13], 4, 681279174), p, g, t[e], 11, -358537222), d, p, t[e + 3], 16, -722521979), _, d, t[e + 6], 23, 76029189), g = a(g, _ = a(_, d = a(d, p, g, _, t[e + 9], 4, -640364487), p, g, t[e + 12], 11, -421815835), d, p, t[e + 15], 16, 530742520), _, d, t[e + 2], 23, -995338651), g = i(g, _ = i(_, d = i(d, p, g, _, t[e], 6, -198630844), p, g, t[e + 7], 10, 1126891415), d, p, t[e + 14], 15, -1416354905), _, d, t[e + 5], 21, -57434055), g = i(g, _ = i(_, d = i(d, p, g, _, t[e + 12], 6, 1700485571), p, g, t[e + 3], 10, -1894986606), d, p, t[e + 10], 15, -1051523), _, d, t[e + 1], 21, -2054922799), g = i(g, _ = i(_, d = i(d, p, g, _, t[e + 8], 6, 1873313359), p, g, t[e + 15], 10, -30611744), d, p, t[e + 6], 15, -1560198380), _, d, t[e + 13], 21, 1309151649), g = i(g, _ = i(_, d = i(d, p, g, _, t[e + 4], 6, -145523070), p, g, t[e + 11], 10, -1120210379), d, p, t[e + 2], 15, 718787259), _, d, t[e + 9], 21, -343485551),
d = r(d, o),
p = r(p, c),
g = r(g, l),
_ = r(_, s);
return [d, p, g, _]
}
function l(t) {
var n, e = "";
for (n = 0; n < 32 * t.length; n += 8)
e += String.fromCharCode(t[n >> 5] >>> n % 32 & 255);
return e
}
function s(t) {
var n, e = [];
for (e[(t.length >> 2) - 1] = void 0,
n = 0; n < e.length; n += 1)
e[n] = 0;
for (n = 0; n < 8 * t.length; n += 8)
e[n >> 5] |= (255 & t.charCodeAt(n / 8)) << n % 32;
return e
}
function d(t) {
var n, e, r = "0123456789abcdef", o = "";
for (e = 0; e < t.length; e += 1)
n = t.charCodeAt(e),
o += r.charAt(n >>> 4 & 15) + r.charAt(15 & n);
return o
}
function p(t) {
return unescape(encodeURIComponent(t))
}
function g(t) {
return function(t) {
return l(c(s(t), 8 * t.length))
}(p(t))
}
function _(t, n) {
return function(t, n) {
var e, r, o = s(t), u = [], f = [];
for (u[15] = f[15] = void 0,
o.length > 16 && (o = c(o, 8 * t.length)),
e = 0; e < 16; e += 1)
u[e] = 909522486 ^ o[e],
f[e] = 1549556828 ^ o[e];
return r = c(u.concat(s(n)), 512 + 8 * n.length),
l(c(f.concat(r), 640))
}(p(t), p(n))
}
function v(t, n, e) {
return n ? e ? _(n, t) : function(t, n) {
return d(_(t, n))
}(n, t) : e ? g(t) : function(t) {
return d(g(t))
}(t)
}
function getWorkloadResult(t, n) {
for (var e = t.nonce, r = t.target, o = +new Date, u = 0, f = "number" == typeof n ? n : 3e4; v("" + e + u) !== r && (u += 1,
!(+new Date - o > f)); )
;
return {
ans: u,
duration: +new Date - o
}
}
console.log(getWorkloadResult({"target":"5422d8a4362d3fcd5aac564fd894164b","nonce":"30d191a2da1b377d#"}))

603
ticket_slide_server/app.py Normal file
View File

@@ -0,0 +1,603 @@
# -*- coding: utf-8 -*-
# @Time: 2024/9/16 13:42
# @AuthorGuangXu-Pan
# @SoftwarePycharm
# @Filetx_slide_ai_word.py
import base64
import io
import json
import os
import re
import sys
import time
import cv2
import execjs
import numpy as np
# from curl_cffi import requests
import requests
from PIL import Image
from flask import Flask, request
from loguru import logger
def get_resource_path(relative_path):
""" 获取打包后资源文件的路径 """
if hasattr(sys, '_MEIPASS'):
# 如果程序是打包状态,使用 _MEIPASS 目录
return os.path.join(sys._MEIPASS, relative_path)
# 如果是未打包状态,使用当前目录
return os.path.join(os.path.abspath("."), relative_path)
class TX_new:
def __init__(self, aid, host, proxy=None):
self.requests = requests.session()
if proxy:
self.requests.proxies = proxy
self.aid = aid
self.host = host
self.ua = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
self.jiami = base64.b64encode(self.ua.encode()).decode(encoding='utf-8')
try:
ip_text = proxy['https'].split('://')[-1].split(':')[0]
self.ip_v4 = ip_text
except:
try:
ip_text = self.get_current_ip()
except:
ip_text = self.get_current_ip()
self.ip_v4 = re.findall('IP(.*)来自于', ip_text)[0]
# logger.debug("当前ip:{}".format(self.ip_v4))
self.headers = {
"Accept": "*/*",
"Accept-Language": "zh-CN,zh;q=0.9",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Pragma": "no-cache",
"Referer": "https://cloud.tencent.com/",
"Sec-Fetch-Dest": "script",
"Sec-Fetch-Mode": "no-cors",
"Sec-Fetch-Site": "cross-site",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
"sec-ch-ua": "\"Google Chrome\";v=\"124\", \"Not:A-Brand\";v=\"8\", \"Chromium\";v=\"124\"",
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "\"Windows\""
}
self.response = None
def get_current_ip(self) -> str:
headers = {
"user-agent": self.ua
}
url = "https://myip.ipip.net"
response = requests.get(url, headers=headers)
if response.status_code == 200:
return response.text
def get_image(self, img_url, sprite_url=None):
bg_url = self.host + img_url
if sprite_url:
fg_url = self.host + sprite_url
fg_content = self.requests.get(fg_url, headers=self.headers).content
bg_content = self.requests.get(bg_url, headers=self.headers).content
return bg_content, fg_content
else:
content = self.requests.get(bg_url, headers=self.headers).content
return content
# with open('bg.png', 'wb') as file:
# file.write(bg_content)
# with open('fg.png', 'wb') as files:
# files.write(fg_content)
def bytes_to_cv2(self, img):
"""
二进制图片转cv2
:param img: 二进制图片数据, <type 'bytes'>
:return: cv2图像, <type 'numpy.ndarray'>
"""
# 将图片字节码bytes, 转换成一维的numpy数组到缓存中
img_buffer_np = np.frombuffer(img, dtype=np.uint8)
# 从指定的内存缓存中读取一维numpy数据, 并把数据转换(解码)成图像矩阵格式
img_np = cv2.imdecode(img_buffer_np, 1)
return img_np
def cv2_open(self, img, flag=None):
if isinstance(img, bytes):
img = self.bytes_to_cv2(img)
elif isinstance(img, str):
img = cv2.imread(str(img))
elif isinstance(img, np.ndarray):
img = img
else:
raise ValueError(f'输入的图片类型无法解析: {type(img)}')
if flag is not None:
img = cv2.cvtColor(img, flag)
return img
def get_distance(self, bg, tp):
"""
:param bg: 背景图路径
:param tp: 缺口图路径
:return: 缺口位置
"""
# 读取图片
bg_img = self.cv2_open(bg)
tp_gray = self.cv2_open(tp, flag=cv2.COLOR_BGR2GRAY)
# 金字塔均值漂移
bg_shift = cv2.pyrMeanShiftFiltering(bg_img, 5, 50)
# 边缘检测
tp_gray = cv2.Canny(tp_gray, 255, 255)
bg_gray = cv2.Canny(bg_shift, 255, 255)
# 目标匹配
result = cv2.matchTemplate(bg_gray, tp_gray, cv2.TM_CCOEFF_NORMED)
# 解析匹配结果
min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
# distance = max_loc[0]
return max_loc
def crop_img(self, bg_content, fg_content):
# 打开图像文件
image = Image.open(io.BytesIO(fg_content))
x1, y1, x2, y2 = 142, 492, 260, 610
cropped_image = image.crop((x1, y1, x2, y2))
# 将裁剪后的图像转换回字节数据
cropped_bytes_io = io.BytesIO()
cropped_image.save(cropped_bytes_io, format='PNG')
cropped_bytes = cropped_bytes_io.getvalue()
# 缺口距离
distance = self.get_distance(bg_content, cropped_bytes)
logger.debug(f"缺口位置为{distance}")
# 坐标
x = str(distance[0])
y = str(distance[1])
return x, y
def get_tx_verify(self, tdc_path):
capurl = self.host + tdc_path
headers = {
"Accept": "*/*",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,zh-TW;q=0.7",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Pragma": "no-cache",
"Referer": "https://turing.captcha.gtimg.com/",
"Sec-Fetch-Dest": "script",
"Sec-Fetch-Mode": "no-cors",
"Sec-Fetch-Site": "cross-site",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
"sec-ch-ua": "\"Not_A Brand\";v=\"8\", \"Chromium\";v=\"124\", \"Google Chrome\";v=\"124\"",
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "\"Windows\""
}
response = self.requests.get(capurl, headers=headers)
cap_ = response.text
tx = get_resource_path('tx.js')
with open(tx, 'r', encoding="utf-8") as f:
jscode = f.read().replace('"vmp"', cap_)
collect_dict = execjs.compile(jscode).call('get_data', self.ip_v4)
eks = collect_dict["eks"]
tlg = collect_dict["tlg"]
collect = collect_dict["collect"]
return eks, tlg, collect
def get_slide_ticket(self, eks, tlg, collect, x, y, sess, pow_answer, pow_calc_time):
url = self.host + "/cap_union_new_verify"
data = {
"collect": collect,
"tlg": tlg,
"eks": eks,
"sess": sess,
"ans": "[{\"elem_id\":1,\"type\":\"DynAnswerType_POS\",\"data\":\"" + x + "," + y + "\"}]",
"pow_answer": pow_answer,
"pow_calc_time": pow_calc_time
}
response = self.requests.post(url, headers=self.headers, data=data).json()
# logger.info(response)
return response
def slide(self, img_url, sess, tdc_path):
sprite_url = json.loads(self.response.text.replace("(", "").replace(")", ""))["data"]["dyn_show_info"][
"sprite_url"]
prefix = \
json.loads(self.response.text.replace("(", "").replace(")", ""))["data"]["comm_captcha_cfg"]["pow_cfg"][
"prefix"]
md5 = json.loads(self.response.text.replace("(", "").replace(")", ""))["data"]["comm_captcha_cfg"]["pow_cfg"][
"md5"]
ans = get_resource_path('ans.js')
with open(ans, 'r', encoding='utf-8') as frd:
jscode = frd.read()
hhhh = {"target": md5, "nonce": prefix}
pow_an = execjs.compile(jscode).call('getWorkloadResult', hhhh)
pow = pow_an["ans"]
pow_calc_time = pow_an["duration"]
pow_answer = f"{prefix}{str(pow)}"
bg_content, fg_content = self.get_image(img_url, sprite_url)
x, y = self.crop_img(bg_content, fg_content)
eks, tlg, collect = self.get_tx_verify(tdc_path)
result = self.get_slide_ticket(eks, tlg, collect, x, y, sess, pow_answer, pow_calc_time)
if result['errorCode'] == '0':
return {'ticket': result['ticket'], 'randstr': result['randstr']}
if result['errorCode'] == '50':
logger.error('识别错误')
return self.main()
else:
logger.error('失败')
logger.error(result['errorCode'])
return '失败请重试'
def main(self):
url = self.host + "/cap_union_prehandle"
params = {
"aid": self.aid, # 小程序id
"protocol": "https",
"accver": "1",
"showtype": "popup",
"ua": self.jiami,
"noheader": "1",
"fb": "1",
"aged": "0",
"enableAged": "0",
"enableDarkMode": "0",
"grayscale": "1",
"clientype": "2",
"cap_cd": "",
"uid": "",
"lang": "zh-cn",
# "entry_url": "https://cloud.tencent.com/product/captcha",
"entry_url": "https://vip.tulingpyton.cn/",
"elder_captcha": "0",
"js": "/tcaptcha-frame.cc3d815a.js",
"login_appid": "",
"wb": "2",
"subsid": "3",
# "callback": "_aq_612456",
"sess": ""
}
self.response = self.requests.get(url, headers=self.headers, params=params)
sess = json.loads(self.response.text.replace("(", "").replace(")", ""))["sess"]
sid = json.loads(self.response.text.replace("(", "").replace(")", ""))["sid"]
tdc_path = json.loads(self.response.text.replace("(", "").replace(")", ""))["data"]["comm_captcha_cfg"][
"tdc_path"]
if json.loads(self.response.text.replace("(", "").replace(")", "")).get("data").get("dyn_show_info").get(
"instruction"):
if '拼图' in json.loads(self.response.text.replace("(", "").replace(")", "")).get("data").get(
"dyn_show_info").get("instruction"):
img_url = \
json.loads(self.response.text.replace("(", "").replace(")", ""))["data"]["dyn_show_info"][
"bg_elem_cfg"][
"img_url"]
logger.info('当前需要滑动验证')
result = self.slide(img_url, sess, tdc_path)
# logger.success(result)
return result
else:
logger.error('不支持类型')
return '不支持类型'
else:
logger.error('不支持类型')
return '不支持类型'
class Tx_old:
def __init__(self, aid, host, proxy=None):
self.session = requests.session()
self.host = host
if proxy:
self.session.proxies = proxy
self.aid = aid
try:
ip_text = proxy['https'].split('://')[-1].split(':')[0]
self.ip_v4 = ip_text
except:
try:
ip_text = self.get_current_ip()
except:
ip_text = self.get_current_ip()
self.ip_v4 = re.findall('IP(.*)来自于', ip_text)[0]
# logger.debug("当前ip:{}".format(self.ip_v4))
self.ua = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
self.jiami = base64.b64encode(self.ua.encode()).decode(encoding='utf-8')
self.createIframeStart = str(int(time.time() * 1000))
self.headers = {
"Accept": "*/*",
"Accept-Language": "zh-CN,zh;q=0.9",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Pragma": "no-cache",
"Referer": "https://cloud.tencent.com/",
"Sec-Fetch-Dest": "script",
"Sec-Fetch-Mode": "no-cors",
"Sec-Fetch-Site": "cross-site",
"User-Agent": self.ua,
"sec-ch-ua": "\"Google Chrome\";v=\"124\", \"Not:A-Brand\";v=\"8\", \"Chromium\";v=\"124\"",
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "\"Windows\""
}
def get_current_ip(self) -> str:
headers = {
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
}
url = "https://myip.ipip.net"
response = self.session.get(url, headers=headers)
if response.status_code == 200:
return response.text
def get_sid_sess(self):
url = self.host + '/cap_union_prehandle'
data = {
'aid': self.aid,
'protocol': 'https',
'accver': '1',
'showtype': 'popup',
'ua': 'TW96aWxsYS81LjAgKFdpbmRvd3MgTlQgMTAuMDsgV2luNjQ7IHg2NCkgQXBwbGVXZWJLaXQvNTM3LjM2IChLSFRNTCwgbGlrZSBHZWNrbykgQ2hyb21lLzExNi4wLjAuMCBTYWZhcmkvNTM3LjM2',
'noheader': '1',
'fb': '0',
'aged': '0',
'enableAged': '0',
'enableDarkMode': '1',
'grayscale': '1',
'clientype': '2',
'cap_cd': '',
'uid': '',
'lang': 'zh-cn',
'entry_url': 'https://www.szwego.com/static/index.html#/password_login',
'elder_captcha': '0',
'js': '/tcaptcha-frame.22125576.js',
'login_appid': '',
'wb': '1',
'subsid': '3',
'callback': '_aq_373409',
'sess': '',
}
res = self.session.get(url, params=data).text
return re.findall('"sess":"(.*?)"', res)[0], re.findall('"sid":"(.*?)"', res)[0]
def bytes_to_cv2(self, img):
"""
二进制图片转cv2
:param img: 二进制图片数据, <type 'bytes'>
:return: cv2图像, <type 'numpy.ndarray'>
"""
# 将图片字节码bytes, 转换成一维的numpy数组到缓存中
img_buffer_np = np.frombuffer(img, dtype=np.uint8)
# 从指定的内存缓存中读取一维numpy数据, 并把数据转换(解码)成图像矩阵格式
img_np = cv2.imdecode(img_buffer_np, 1)
return img_np
def cv2_open(self, img, flag=None):
"""
统一输出图片格式为cv2图像, <type 'numpy.ndarray'>
:param img: <type 'bytes'/'numpy.ndarray'/'str'/'Path'/'PIL.JpegImagePlugin.JpegImageFile'>
:param flag: 颜色空间转换类型, default: None
eg: cv2.COLOR_BGR2GRAY灰度图
:return: cv2图像, <numpy.ndarray>
"""
if isinstance(img, bytes):
img = self.bytes_to_cv2(img)
else:
raise ValueError(f'输入的图片类型无法解析: {type(img)}')
if flag is not None:
img = cv2.cvtColor(img, flag)
return img
def get_distance(self, bg, tp):
# 读取图片
bg_img = self.cv2_open(bg)
tp_gray = self.cv2_open(tp, flag=cv2.COLOR_BGR2GRAY)
# 金字塔均值漂移
bg_shift = cv2.pyrMeanShiftFiltering(bg_img, 5, 50)
# 边缘检测
tp_gray = cv2.Canny(tp_gray, 255, 255)
bg_gray = cv2.Canny(bg_shift, 255, 255)
# 目标匹配
result = cv2.matchTemplate(bg_gray, tp_gray, cv2.TM_CCOEFF_NORMED)
# 解析匹配结果
min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
logger.info(f'坐标:{min_loc}')
return max_loc
def verify(self, sess2, sid, ans, pow_answer, collect, tlg, eks, nonce):
verify_url = self.host + '/cap_union_new_verify'
data = {
'aid': self.aid,
'protocol': 'https',
'accver': '1',
'showtype': 'popup',
'ua': self.jiami,
'noheader': '1',
'fb': '0',
'aged': '0',
'enableAged': '0',
'enableDarkMode': '1',
'grayscale': '1',
'clientype': '2',
'sess': sess2,
'fwidth': '0',
'sid': str(sid),
'wxLang': '',
'tcScale': '1',
'uid': '',
'cap_cd': '',
'rnd': '415581',
'prehandleLoadTime': '83',
'createIframeStart': self.createIframeStart,
'global': '0',
'subsid': '26',
'cdata': '0',
'ans': ans,
'vsig': '',
'websig': '',
'subcapclass': '',
'pow_answer': pow_answer,
'pow_calc_time': '10',
'collect': collect,
'tlg': tlg,
'fpinfo': '',
'eks': eks,
'nonce': nonce,
'vlg': '0_0_1',
'vData': 'ed0FNEDmGsgjfhnCwiBah1gi1hqqagtoj88Pd0lL95PbaFK-LucV4iDyjLi*iCQ7h9V04rTubXsbuSpp9H1MctodkEOQDPiacJr1PBkrwfPse3PZPmgfP3daO0*a34PJrD1ShB32X_jt0PAD8MoHriYY',
}
response = self.session.post(verify_url, data=data, headers=self.headers).json()
return response
def get_image_str(self, sess, sid):
get_url = self.host + '/cap_union_new_show'
get_data = {
'aid': self.aid,
'protocol': 'https',
'accver': '1',
'showtype': 'popup',
'ua': self.jiami,
'noheader': '1',
'fb': '0',
'aged': '0',
'enableAged': '0',
'enableDarkMode': '1',
'grayscale': '1',
'clientype': '2',
'sess': f'{sess}',
'fwidth': '0',
'sid': f'{sid}',
'wxLang': '',
'tcScale': '1',
'uid': '',
'cap_cd': '',
'rnd': '204910',
'prehandleLoadTime': '84',
'createIframeStart': f'{int(time.time() * 1000)}',
'global': '0',
'subsid': '6',
}
respon = self.session.get(get_url, params=get_data)
respon.encoding = 'utf-8'
image_str = re.findall('cdnPic1:"/hycdn\?index=1&image=(.*?)",', respon.text)[0]
# prefix = re.findall('prefix:"(.*?)"', respon.text)[0]
# md5 = re.findall('md5:"(.*?)"', respon.text)[0]
# with open('ans.js', 'r', encoding='utf-8') as frd:
# jscode = frd.read()
# hhhh = {"target": md5, "nonce": prefix}
# pow_an = execjs.compile(jscode).call('getWorkloadResult', hhhh)
# pow = pow_an["ans"]
# pow_answer = f"{prefix}{str(pow)}"
pow_answer = ""
nonce = re.findall('nonce:"(.*?)"', respon.text)[0]
sess2 = re.findall('sess:"(.*?)"', respon.text)[0]
js_path = re.findall('dcFileName:"(.*?)"', respon.text)[0]
return pow_answer, nonce, sess2, js_path, image_str
def slide(self, image_str, sess2, sid, js_path):
p1 = {
'index': '1',
'image': f'{image_str}?aid={self.aid}',
'sess': sess2,
'sid': sid,
'img_index': '1',
'subsid': '7',
}
p2 = {
'index': '2',
'image': f'{image_str}?aid={self.aid}',
'sess': sess2,
'sid': sid,
'img_index': '2',
'subsid': '8',
}
distance = self.get_distance(
self.session.get(self.host + '/hycdn', params=p1).content,
self.session.get(self.host + '/hycdn', params=p2).content,
)
ans = str(distance[0]) + ',' + str(distance[1]) + ';'
js_url = f'{self.host}/{js_path}'
cap_ = self.session.get(js_url).text
tx = get_resource_path('tx.js')
with open(tx, 'r', encoding="utf-8") as f:
jscode = f.read().replace('"vmp"', cap_)
collect_dict = execjs.compile(jscode).call('get_data', self.ip_v4)
eks = collect_dict["eks"]
tlg = collect_dict["tlg"]
collect = collect_dict["collect"]
return ans, eks, tlg, collect
def main(self):
sess, sid = self.get_sid_sess()
pow_answer, nonce, sess2, js_path, image_str = self.get_image_str(sess, sid)
ans, eks, tlg, collect = self.slide(image_str, sess2, sid, js_path)
response = self.verify(sess2, sid, ans, pow_answer, collect, tlg, eks, nonce)
if response['errorCode'] == '0':
return {'ticket': response['ticket'], 'randstr': response['randstr']}
if response['errorCode'] == '50':
logger.error('识别错误')
return self.main()
else:
logger.error('失败')
logger.error(response['errorCode'])
return '失败请重试'
def read_port_from_config():
config_path = 'config.txt'
with open(config_path, 'r') as file:
data = file.readlines()
port = int(data[0].strip())
xieyi = data[1]
return port, xieyi
port, xieyi = read_port_from_config()
app = Flask(__name__)
@app.route('/api/TX', methods=['GET'])
def handle_request():
aid = request.args.get('aid')
host = request.args.get('host')
try:
if request.args.get('ip'):
ip = request.args.get('ip')
proxies = {
'https': f'{xieyi}://' + ip,
'http': f'{xieyi}://' + ip
}
result = TX_new(aid, host, proxies).main()
logger.success(result)
return result
else:
result = TX_new(aid, host).main()
logger.success(result)
return result
except:
if request.args.get('ip'):
ip = request.args.get('ip')
proxies = {
'https': f'{xieyi}://' + ip,
'http': f'{xieyi}://' + ip
}
result = Tx_old(aid, host, proxies).main()
logger.success(result)
return result
else:
result = Tx_old(aid, host).main()
logger.success(result)
return result
if __name__ == '__main__':
logger.info(f'代理协议-->{xieyi}')
logger.debug(f'端口-->{port}')
app.run(host='0.0.0.0', port=port)

View File

@@ -0,0 +1,2 @@
99
http

View File

@@ -0,0 +1,10 @@
version: '3'
services:
ticket-slide-server:
build: .
container_name: ticket-slide-server
ports:
# 将容器内部端口映射到主机使用配置文件中的端口默认99
- "99:99"
restart: unless-stopped

View File

@@ -0,0 +1,9 @@
numpy==1.26.2
opencv-python-headless==4.9.0.80
flask==3.0.3
flask_cors==4.0.1
pillow==10.3.0
requests==2.32.3
pyexecjs==1.5.1
loguru==0.7.2
pytz==2024.2

View File

View File

1316
ticket_slide_server/tx.js Normal file

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,609 @@
# -*- coding: utf-8 -*-
# @Time: 2024/9/16 13:42
# @AuthorGuangXu-Pan
# @SoftwarePycharm
# @Filetx_slide_ai_word.py
import base64
import io
import json
import os
import re
import sys
import time
import cv2
import execjs
import numpy as np
# from curl_cffi import requests
import requests
from PIL import Image
from flask import Flask, request
from loguru import logger
def get_resource_path(relative_path):
""" 获取打包后资源文件的路径 """
if hasattr(sys, '_MEIPASS'):
# 如果程序是打包状态,使用 _MEIPASS 目录
return os.path.join(sys._MEIPASS, relative_path)
# 如果是未打包状态,使用当前目录
return os.path.join(os.path.abspath("."), relative_path)
class TX_new:
def __init__(self, aid, host, proxy=None):
self.requests = requests.session()
if proxy:
self.requests.proxies = proxy
self.aid = aid
self.host = host
self.ua = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
self.jiami = base64.b64encode(self.ua.encode()).decode(encoding='utf-8')
try:
ip_text = proxy['https'].split('://')[-1].split(':')[0]
self.ip_v4 = ip_text
except:
try:
ip_text = self.get_current_ip()
except:
ip_text = self.get_current_ip()
self.ip_v4 = re.findall('IP(.*)来自于', ip_text)[0]
# logger.debug("当前ip:{}".format(self.ip_v4))
self.headers = {
"Accept": "*/*",
"Accept-Language": "zh-CN,zh;q=0.9",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Pragma": "no-cache",
"Referer": "https://cloud.tencent.com/",
"Sec-Fetch-Dest": "script",
"Sec-Fetch-Mode": "no-cors",
"Sec-Fetch-Site": "cross-site",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
"sec-ch-ua": "\"Google Chrome\";v=\"124\", \"Not:A-Brand\";v=\"8\", \"Chromium\";v=\"124\"",
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "\"Windows\""
}
self.response = None
def get_current_ip(self) -> str:
headers = {
"user-agent": self.ua
}
url = "https://myip.ipip.net"
response = requests.get(url, headers=headers)
if response.status_code == 200:
return response.text
def get_image(self, img_url, sprite_url=None):
bg_url = self.host + img_url
if sprite_url:
fg_url = self.host + sprite_url
fg_content = self.requests.get(fg_url, headers=self.headers).content
bg_content = self.requests.get(bg_url, headers=self.headers).content
return bg_content, fg_content
else:
content = self.requests.get(bg_url, headers=self.headers).content
return content
# with open('bg.png', 'wb') as file:
# file.write(bg_content)
# with open('fg.png', 'wb') as files:
# files.write(fg_content)
def bytes_to_cv2(self, img):
"""
二进制图片转cv2
:param img: 二进制图片数据, <type 'bytes'>
:return: cv2图像, <type 'numpy.ndarray'>
"""
# 将图片字节码bytes, 转换成一维的numpy数组到缓存中
img_buffer_np = np.frombuffer(img, dtype=np.uint8)
# 从指定的内存缓存中读取一维numpy数据, 并把数据转换(解码)成图像矩阵格式
img_np = cv2.imdecode(img_buffer_np, 1)
return img_np
def cv2_open(self, img, flag=None):
if isinstance(img, bytes):
img = self.bytes_to_cv2(img)
elif isinstance(img, str):
img = cv2.imread(str(img))
elif isinstance(img, np.ndarray):
img = img
else:
raise ValueError(f'输入的图片类型无法解析: {type(img)}')
if flag is not None:
img = cv2.cvtColor(img, flag)
return img
def get_distance(self, bg, tp):
"""
:param bg: 背景图路径
:param tp: 缺口图路径
:return: 缺口位置
"""
# 读取图片
bg_img = self.cv2_open(bg)
tp_gray = self.cv2_open(tp, flag=cv2.COLOR_BGR2GRAY)
# 金字塔均值漂移
bg_shift = cv2.pyrMeanShiftFiltering(bg_img, 5, 50)
# 边缘检测
tp_gray = cv2.Canny(tp_gray, 255, 255)
bg_gray = cv2.Canny(bg_shift, 255, 255)
# 目标匹配
result = cv2.matchTemplate(bg_gray, tp_gray, cv2.TM_CCOEFF_NORMED)
# 解析匹配结果
min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
# distance = max_loc[0]
return max_loc
def crop_img(self, bg_content, fg_content):
# 打开图像文件
image = Image.open(io.BytesIO(fg_content))
x1, y1, x2, y2 = 142, 492, 260, 610
cropped_image = image.crop((x1, y1, x2, y2))
# 将裁剪后的图像转换回字节数据
cropped_bytes_io = io.BytesIO()
cropped_image.save(cropped_bytes_io, format='PNG')
cropped_bytes = cropped_bytes_io.getvalue()
# 缺口距离
distance = self.get_distance(bg_content, cropped_bytes)
logger.debug(f"缺口位置为{distance}")
# 坐标
x = str(distance[0])
y = str(distance[1])
return x, y
def get_tx_verify(self, tdc_path):
capurl = self.host + tdc_path
headers = {
"Accept": "*/*",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,zh-TW;q=0.7",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Pragma": "no-cache",
"Referer": "https://turing.captcha.gtimg.com/",
"Sec-Fetch-Dest": "script",
"Sec-Fetch-Mode": "no-cors",
"Sec-Fetch-Site": "cross-site",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
"sec-ch-ua": "\"Not_A Brand\";v=\"8\", \"Chromium\";v=\"124\", \"Google Chrome\";v=\"124\"",
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "\"Windows\""
}
response = self.requests.get(capurl, headers=headers)
cap_ = response.text
tx = get_resource_path('tx.js')
with open(tx, 'r', encoding="utf-8") as f:
jscode = f.read().replace('"vmp"', cap_)
collect_dict = execjs.compile(jscode).call('get_data', self.ip_v4)
eks = collect_dict["eks"]
tlg = collect_dict["tlg"]
collect = collect_dict["collect"]
return eks, tlg, collect
def get_slide_ticket(self, eks, tlg, collect, x, y, sess, pow_answer,pow_calc_time):
url = self.host + "/cap_union_new_verify"
data = {
"collect": collect,
"tlg": tlg,
"eks": eks,
"sess": sess,
"ans": "[{\"elem_id\":1,\"type\":\"DynAnswerType_POS\",\"data\":\"" + x + "," + y + "\"}]",
"pow_answer": pow_answer,
"pow_calc_time": pow_calc_time
}
response = self.requests.post(url, headers=self.headers, data=data).json()
# logger.info(response)
return response
def slide(self, img_url, sess, tdc_path):
sprite_url = json.loads(self.response.text.replace("(", "").replace(")", ""))["data"]["dyn_show_info"][
"sprite_url"]
prefix = \
json.loads(self.response.text.replace("(", "").replace(")", ""))["data"]["comm_captcha_cfg"]["pow_cfg"][
"prefix"]
md5 = json.loads(self.response.text.replace("(", "").replace(")", ""))["data"]["comm_captcha_cfg"]["pow_cfg"][
"md5"]
ans = get_resource_path('ans.js')
with open(ans, 'r', encoding='utf-8') as frd:
jscode = frd.read()
hhhh = {"target": md5, "nonce": prefix}
pow_an = execjs.compile(jscode).call('getWorkloadResult', hhhh)
pow = pow_an["ans"]
pow_calc_time = pow_an["duration"]
pow_answer = f"{prefix}{str(pow)}"
bg_content, fg_content = self.get_image(img_url, sprite_url)
x, y = self.crop_img(bg_content, fg_content)
eks, tlg, collect = self.get_tx_verify(tdc_path)
result = self.get_slide_ticket(eks, tlg, collect, x, y, sess, pow_answer,pow_calc_time)
if result['errorCode'] == '0':
return {'ticket': result['ticket'], 'randstr': result['randstr']}
if result['errorCode'] == '50':
logger.error('识别错误')
return self.main()
else:
logger.error('失败')
logger.error(result['errorCode'])
return '失败请重试'
def main(self):
url = self.host + "/cap_union_prehandle"
params = {
"aid": self.aid, # 小程序id
"protocol": "https",
"accver": "1",
"showtype": "popup",
"ua": self.jiami,
"noheader": "1",
"fb": "1",
"aged": "0",
"enableAged": "0",
"enableDarkMode": "0",
"grayscale": "1",
"clientype": "2",
"cap_cd": "",
"uid": "",
"lang": "zh-cn",
# "entry_url": "https://cloud.tencent.com/product/captcha",
"entry_url": "https://vip.tulingpyton.cn/",
"elder_captcha": "0",
"js": "/tcaptcha-frame.cc3d815a.js",
"login_appid": "",
"wb": "2",
"subsid": "3",
# "callback": "_aq_612456",
"sess": ""
}
self.response = self.requests.get(url, headers=self.headers, params=params)
sess = json.loads(self.response.text.replace("(", "").replace(")", ""))["sess"]
sid = json.loads(self.response.text.replace("(", "").replace(")", ""))["sid"]
tdc_path = json.loads(self.response.text.replace("(", "").replace(")", ""))["data"]["comm_captcha_cfg"][
"tdc_path"]
if json.loads(self.response.text.replace("(", "").replace(")", "")).get("data").get("dyn_show_info").get("instruction"):
if '拼图' in json.loads(self.response.text.replace("(", "").replace(")", "")).get("data").get(
"dyn_show_info").get("instruction"):
img_url = \
json.loads(self.response.text.replace("(", "").replace(")", ""))["data"]["dyn_show_info"]["bg_elem_cfg"][
"img_url"]
logger.info('当前需要滑动验证')
result = self.slide(img_url, sess, tdc_path)
# logger.success(result)
return result
else:
logger.error('不支持类型')
return '不支持类型'
else:
logger.error('不支持类型')
return '不支持类型'
class Tx_old:
def __init__(self, aid, host, proxy=None):
self.session = requests.session()
self.host = host
if proxy:
self.session.proxies = proxy
self.aid = aid
try:
ip_text = proxy['https'].split('://')[-1].split(':')[0]
self.ip_v4 = ip_text
except:
try:
ip_text = self.get_current_ip()
except:
ip_text = self.get_current_ip()
self.ip_v4 = re.findall('IP(.*)来自于', ip_text)[0]
# logger.debug("当前ip:{}".format(self.ip_v4))
self.ua = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
self.jiami = base64.b64encode(self.ua.encode()).decode(encoding='utf-8')
self.createIframeStart = str(int(time.time() * 1000))
self.headers = {
"Accept": "*/*",
"Accept-Language": "zh-CN,zh;q=0.9",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Pragma": "no-cache",
"Referer": "https://cloud.tencent.com/",
"Sec-Fetch-Dest": "script",
"Sec-Fetch-Mode": "no-cors",
"Sec-Fetch-Site": "cross-site",
"User-Agent": self.ua,
"sec-ch-ua": "\"Google Chrome\";v=\"124\", \"Not:A-Brand\";v=\"8\", \"Chromium\";v=\"124\"",
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "\"Windows\""
}
def get_current_ip(self) -> str:
headers = {
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
}
url = "https://myip.ipip.net"
response = self.session.get(url, headers=headers)
if response.status_code == 200:
return response.text
def get_sid_sess(self):
url = self.host+'/cap_union_prehandle'
data = {
'aid': self.aid,
'protocol': 'https',
'accver': '1',
'showtype': 'popup',
'ua': 'TW96aWxsYS81LjAgKFdpbmRvd3MgTlQgMTAuMDsgV2luNjQ7IHg2NCkgQXBwbGVXZWJLaXQvNTM3LjM2IChLSFRNTCwgbGlrZSBHZWNrbykgQ2hyb21lLzExNi4wLjAuMCBTYWZhcmkvNTM3LjM2',
'noheader': '1',
'fb': '0',
'aged': '0',
'enableAged': '0',
'enableDarkMode': '1',
'grayscale': '1',
'clientype': '2',
'cap_cd': '',
'uid': '',
'lang': 'zh-cn',
'entry_url': 'https://www.szwego.com/static/index.html#/password_login',
'elder_captcha': '0',
'js': '/tcaptcha-frame.22125576.js',
'login_appid': '',
'wb': '1',
'subsid': '3',
'callback': '_aq_373409',
'sess': '',
}
res = self.session.get(url, params=data).text
return re.findall('"sess":"(.*?)"', res)[0], re.findall('"sid":"(.*?)"', res)[0]
def bytes_to_cv2(self, img):
"""
二进制图片转cv2
:param img: 二进制图片数据, <type 'bytes'>
:return: cv2图像, <type 'numpy.ndarray'>
"""
# 将图片字节码bytes, 转换成一维的numpy数组到缓存中
img_buffer_np = np.frombuffer(img, dtype=np.uint8)
# 从指定的内存缓存中读取一维numpy数据, 并把数据转换(解码)成图像矩阵格式
img_np = cv2.imdecode(img_buffer_np, 1)
return img_np
def cv2_open(self, img, flag=None):
"""
统一输出图片格式为cv2图像, <type 'numpy.ndarray'>
:param img: <type 'bytes'/'numpy.ndarray'/'str'/'Path'/'PIL.JpegImagePlugin.JpegImageFile'>
:param flag: 颜色空间转换类型, default: None
eg: cv2.COLOR_BGR2GRAY灰度图
:return: cv2图像, <numpy.ndarray>
"""
if isinstance(img, bytes):
img = self.bytes_to_cv2(img)
else:
raise ValueError(f'输入的图片类型无法解析: {type(img)}')
if flag is not None:
img = cv2.cvtColor(img, flag)
return img
def get_distance(self, bg, tp):
# 读取图片
bg_img = self.cv2_open(bg)
tp_gray = self.cv2_open(tp, flag=cv2.COLOR_BGR2GRAY)
# 金字塔均值漂移
bg_shift = cv2.pyrMeanShiftFiltering(bg_img, 5, 50)
# 边缘检测
tp_gray = cv2.Canny(tp_gray, 255, 255)
bg_gray = cv2.Canny(bg_shift, 255, 255)
# 目标匹配
result = cv2.matchTemplate(bg_gray, tp_gray, cv2.TM_CCOEFF_NORMED)
# 解析匹配结果
min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
logger.info(f'坐标:{min_loc}')
return max_loc
def verify(self,sess2,sid,ans,pow_answer,collect,tlg,eks,nonce):
verify_url = self.host + '/cap_union_new_verify'
data = {
'aid': self.aid,
'protocol': 'https',
'accver': '1',
'showtype': 'popup',
'ua': self.jiami,
'noheader': '1',
'fb': '0',
'aged': '0',
'enableAged': '0',
'enableDarkMode': '1',
'grayscale': '1',
'clientype': '2',
'sess': sess2,
'fwidth': '0',
'sid': str(sid),
'wxLang': '',
'tcScale': '1',
'uid': '',
'cap_cd': '',
'rnd': '415581',
'prehandleLoadTime': '83',
'createIframeStart': self.createIframeStart,
'global': '0',
'subsid': '26',
'cdata': '0',
'ans': ans,
'vsig': '',
'websig': '',
'subcapclass': '',
'pow_answer': pow_answer,
'pow_calc_time': '10',
'collect': collect,
'tlg': tlg,
'fpinfo': '',
'eks': eks,
'nonce': nonce,
'vlg': '0_0_1',
'vData': 'ed0FNEDmGsgjfhnCwiBah1gi1hqqagtoj88Pd0lL95PbaFK-LucV4iDyjLi*iCQ7h9V04rTubXsbuSpp9H1MctodkEOQDPiacJr1PBkrwfPse3PZPmgfP3daO0*a34PJrD1ShB32X_jt0PAD8MoHriYY',
}
response = self.session.post(verify_url, data=data, headers=self.headers).json()
return response
def get_image_str(self, sess, sid):
get_url = self.host + '/cap_union_new_show'
get_data = {
'aid': self.aid,
'protocol': 'https',
'accver': '1',
'showtype': 'popup',
'ua': self.jiami,
'noheader': '1',
'fb': '0',
'aged': '0',
'enableAged': '0',
'enableDarkMode': '1',
'grayscale': '1',
'clientype': '2',
'sess': f'{sess}',
'fwidth': '0',
'sid': f'{sid}',
'wxLang': '',
'tcScale': '1',
'uid': '',
'cap_cd': '',
'rnd': '204910',
'prehandleLoadTime': '84',
'createIframeStart': f'{int(time.time() * 1000)}',
'global': '0',
'subsid': '6',
}
respon = self.session.get(get_url, params=get_data)
respon.encoding = 'utf-8'
image_str = re.findall('cdnPic1:"/hycdn\?index=1&image=(.*?)",', respon.text)[0]
# prefix = re.findall('prefix:"(.*?)"', respon.text)[0]
# md5 = re.findall('md5:"(.*?)"', respon.text)[0]
# with open('ans.js', 'r', encoding='utf-8') as frd:
# jscode = frd.read()
# hhhh = {"target": md5, "nonce": prefix}
# pow_an = execjs.compile(jscode).call('getWorkloadResult', hhhh)
# pow = pow_an["ans"]
# pow_answer = f"{prefix}{str(pow)}"
pow_answer = ""
nonce = re.findall('nonce:"(.*?)"', respon.text)[0]
sess2 = re.findall('sess:"(.*?)"', respon.text)[0]
js_path = re.findall('dcFileName:"(.*?)"', respon.text)[0]
return pow_answer, nonce, sess2, js_path, image_str
def slide(self,image_str,sess2,sid,js_path):
p1 = {
'index': '1',
'image': f'{image_str}?aid={self.aid}',
'sess': sess2,
'sid': sid,
'img_index': '1',
'subsid': '7',
}
p2 = {
'index': '2',
'image': f'{image_str}?aid={self.aid}',
'sess': sess2,
'sid': sid,
'img_index': '2',
'subsid': '8',
}
distance = self.get_distance(
self.session.get(self.host + '/hycdn', params=p1).content,
self.session.get(self.host + '/hycdn', params=p2).content,
)
ans = str(distance[0]) + ',' + str(distance[1]) + ';'
js_url = f'{self.host}/{js_path}'
cap_ = self.session.get(js_url).text
tx = get_resource_path('tx.js')
with open(tx, 'r', encoding="utf-8") as f:
jscode = f.read().replace('"vmp"', cap_)
collect_dict = execjs.compile(jscode).call('get_data', self.ip_v4)
eks = collect_dict["eks"]
tlg = collect_dict["tlg"]
collect = collect_dict["collect"]
return ans,eks,tlg,collect
def main(self):
sess, sid = self.get_sid_sess()
pow_answer, nonce, sess2, js_path,image_str = self.get_image_str(sess, sid)
ans, eks, tlg, collect = self.slide(image_str,sess2,sid,js_path)
response = self.verify(sess2, sid, ans, pow_answer, collect, tlg, eks, nonce)
if response['errorCode'] == '0':
return {'ticket': response['ticket'], 'randstr': response['randstr']}
if response['errorCode'] == '50':
logger.error('识别错误')
return self.main()
else:
logger.error('失败')
logger.error(response['errorCode'])
return '失败请重试'
def read_port_from_config():
config_path = 'config.txt'
with open(config_path, 'r') as file:
data = file.readlines()
port = int(data[0].strip())
xieyi = data[1]
return port, xieyi
port, xieyi = read_port_from_config()
app = Flask(__name__)
@app.route('/api/TX', methods=['GET'])
def handle_request():
aid = request.args.get('aid')
host = request.args.get('host')
try:
if request.args.get('ip'):
ip = request.args.get('ip')
proxies = {
'https': f'{xieyi}://' + ip,
'http': f'{xieyi}://' + ip
}
result = TX_new(aid,host,proxies).main()
logger.success(result)
return result
else:
result = TX_new(aid, host).main()
logger.success(result)
return result
except:
if request.args.get('ip'):
ip = request.args.get('ip')
proxies = {
'https': f'{xieyi}://' + ip,
'http': f'{xieyi}://' + ip
}
result = Tx_old(aid,host, proxies).main()
logger.success(result)
return result
else:
result = Tx_old(aid, host).main()
logger.success(result)
return result
if __name__ == '__main__':
logger.info(f'代理协议-->{xieyi}')
logger.debug(f'端口-->{port}')
app.run(host='0.0.0.0', port=port)

View File

@@ -0,0 +1,38 @@
# -*- mode: python ; coding: utf-8 -*-
a = Analysis(
['tx_slide.py'],
pathex=[],
binaries=[],
datas=[('tx.js', '.'),('ans.js', '.')],
hiddenimports=[],
hookspath=[],
hooksconfig={},
runtime_hooks=[],
excludes=[],
noarchive=False,
optimize=0,
)
pyz = PYZ(a.pure)
exe = EXE(
pyz,
a.scripts,
a.binaries,
a.datas,
[],
name='tx_slide',
debug=False,
bootloader_ignore_signals=False,
strip=False,
upx=True,
upx_exclude=[],
runtime_tmpdir=None,
console=True,
disable_windowed_traceback=False,
argv_emulation=False,
target_arch=None,
codesign_identity=None,
entitlements_file=None,
)