Merge branch 'production' into develop

This commit is contained in:
danial
2025-11-12 13:46:14 +08:00
9 changed files with 1068 additions and 11 deletions

14
.qoder/settings.json Normal file
View File

@@ -0,0 +1,14 @@
{
"permissions": {
"ask": [
"Read(!./**)",
"Edit(!./**)"
],
"allow": [
"Read(./**)",
"Edit(./**)"
]
},
"memoryImport": {},
"monitoring": {}
}

4
.vscode/settings.json vendored Normal file
View File

@@ -0,0 +1,4 @@
{
"python.languageServer": "None",
"python.analysis.typeCheckingMode": "off"
}

View File

@@ -5,8 +5,7 @@ pydantic-settings~=2.4.0
loguru~=0.7.2
pyyaml~=6.0.1
brotli~=1.1.0
pycryptodome~=3.20.0
cryptography~=46.0.3
sqlalchemy~=2.0.32
pymysql~=1.1.1
psutil~=6.0.0
cryptography~=42.0.4
psutil~=6.0.0

View File

@@ -3,7 +3,8 @@ import hashlib
import random
import time
from Crypto.Cipher import AES
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
def decode_base64(data: str):
@@ -19,10 +20,15 @@ def encrypt_cbc_base64(data, key, iv):
to_encrypt_bytes = data.encode("utf-8")
if len(key_bytes) != 16 or len(iv_bytes) != 16:
raise ValueError("Key and IV must be 16 bytes long.")
cipher = AES.new(key_bytes, AES.MODE_CBC, iv_bytes)
pad_length = AES.block_size - len(to_encrypt_bytes) % AES.block_size
to_encrypt_bytes += bytes([0] * pad_length)
encrypted_bytes = cipher.encrypt(to_encrypt_bytes)
cipher = Cipher(
algorithms.AES(key_bytes), modes.CBC(iv_bytes), backend=default_backend()
)
encryptor = cipher.encryptor()
# PKCS7 padding
block_size = 16
pad_length = block_size - len(to_encrypt_bytes) % block_size
to_encrypt_bytes += bytes([pad_length] * pad_length)
encrypted_bytes = encryptor.update(to_encrypt_bytes) + encryptor.finalize()
encrypted_base64 = base64.b64encode(encrypted_bytes).decode()
return encrypted_base64
@@ -50,10 +56,10 @@ def encrypt(
+ hashlib.md5((d_string + key_b).encode("utf-8")).hexdigest()[:16]
+ d_string
)
array = None
array: str | None = None
if operation:
array = decode_base64(d_string[:num])
num2 = len(array) if operation else len(d_string)
num2 = len(array) if (operation and array is not None) else len(d_string)
array2 = bytearray(num2)
array3 = list(range(256))
array4 = list(map(lambda x: 0, range(256)))
@@ -74,7 +80,7 @@ def encrypt(
num7 = array3[num6]
array3[num6] = array3[num5]
array3[num5] = num7
if operation:
if operation and array is not None:
array2[k] = ord(array[k]) ^ array3[(array3[num6] + array3[num5]) % 256]
else:
array2[k] = ord(d_string[k]) ^ array3[(array3[num6] + array3[num5]) % 256]

23
src/utils/__init__.py Normal file
View File

@@ -0,0 +1,23 @@
"""
工具模块:提供加密、编码等辅助函数
"""
from src.utils.crypto import (
RSAKeyPair,
encrypt_with_rsa,
decrypt_with_rsa,
sign_with_rsa,
verify_rsa_signature,
generate_key_pair_files,
load_key_pair_from_files,
)
__all__ = [
"RSAKeyPair",
"encrypt_with_rsa",
"decrypt_with_rsa",
"sign_with_rsa",
"verify_rsa_signature",
"generate_key_pair_files",
"load_key_pair_from_files",
]

337
src/utils/crypto.py Normal file
View File

@@ -0,0 +1,337 @@
"""
非对称加密辅助函数模块
提供 RSA 密钥对生成、加密、解密、签名、验证等功能
"""
import base64
from typing import Tuple, Optional
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.backends import default_backend
class RSAKeyPair:
"""RSA 密钥对管理类"""
def __init__(self, private_key=None, public_key=None):
"""
初始化 RSA 密钥对
Args:
private_key: RSA 私钥对象
public_key: RSA 公钥对象
"""
self.private_key = private_key
self.public_key = public_key
@staticmethod
def generate(key_size: int = 2048) -> "RSAKeyPair":
"""
生成新的 RSA 密钥对
Args:
key_size: 密钥大小2048 或 4096 位),默认 2048
Returns:
RSAKeyPair: 包含私钥和公钥的对象
"""
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=key_size,
backend=default_backend(),
)
public_key = private_key.public_key()
return RSAKeyPair(private_key, public_key)
def get_private_key_pem(self) -> str:
"""
获取 PEM 格式的私钥字符串
Returns:
str: PEM 格式的私钥(包含 -----BEGIN/END PRIVATE KEY-----
"""
if not self.private_key:
raise ValueError("私钥未设置")
pem = self.private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(),
)
return pem.decode("utf-8")
def get_public_key_pem(self) -> str:
"""
获取 PEM 格式的公钥字符串
Returns:
str: PEM 格式的公钥(包含 -----BEGIN/END PUBLIC KEY-----
"""
if not self.public_key:
raise ValueError("公钥未设置")
pem = self.public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
)
return pem.decode("utf-8")
@staticmethod
def load_private_key_from_pem(pem: str) -> "RSAKeyPair":
"""
从 PEM 格式字符串加载私钥
Args:
pem: PEM 格式的私钥字符串
Returns:
RSAKeyPair: 包含私钥和公钥的对象
"""
private_key = serialization.load_pem_private_key(
pem.encode("utf-8"), password=None, backend=default_backend()
)
public_key = private_key.public_key()
return RSAKeyPair(private_key, public_key)
@staticmethod
def load_public_key_from_pem(pem: str) -> "RSAKeyPair":
"""
从 PEM 格式字符串加载公钥
Args:
pem: PEM 格式的公钥字符串
Returns:
RSAKeyPair: 包含公钥的对象
"""
public_key = serialization.load_pem_public_key(
pem.encode("utf-8"), backend=default_backend()
)
return RSAKeyPair(public_key=public_key)
def encrypt_with_rsa(
plaintext: str, public_key_pem: str, encoding: str = "base64"
) -> str:
"""
使用 RSA 公钥加密数据OAEP 填充)
Args:
plaintext: 明文字符串
public_key_pem: PEM 格式的公钥字符串
encoding: 输出编码方式('base64''hex'),默认 'base64'
Returns:
str: 加密后的数据Base64 或 Hex 编码)
Raises:
ValueError: 如果数据过大或编码方式不支持
"""
key_pair = RSAKeyPair.load_public_key_from_pem(public_key_pem)
public_key = key_pair.public_key
if not public_key:
raise ValueError("公钥未设置")
plaintext_bytes = plaintext.encode("utf-8")
ciphertext = public_key.encrypt(
plaintext_bytes,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None,
),
)
if encoding == "base64":
return base64.b64encode(ciphertext).decode("utf-8")
elif encoding == "hex":
return ciphertext.hex()
else:
raise ValueError(f"不支持的编码方式: {encoding}")
def decrypt_with_rsa(
ciphertext: str, private_key_pem: str, encoding: str = "base64"
) -> str:
"""
使用 RSA 私钥解密数据
Args:
ciphertext: 加密数据Base64 或 Hex 编码)
private_key_pem: PEM 格式的私钥字符串
encoding: 输入编码方式('base64''hex'),默认 'base64'
Returns:
str: 解密后的明文字符串
Raises:
ValueError: 如果编码方式不支持或解密失败
"""
key_pair = RSAKeyPair.load_private_key_from_pem(private_key_pem)
private_key = key_pair.private_key
if not private_key:
raise ValueError("私钥未设置")
# 解码密文
if encoding == "base64":
ciphertext_bytes = base64.b64decode(ciphertext)
elif encoding == "hex":
ciphertext_bytes = bytes.fromhex(ciphertext)
else:
raise ValueError(f"不支持的编码方式: {encoding}")
plaintext_bytes = private_key.decrypt(
ciphertext_bytes,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None,
),
)
return plaintext_bytes.decode("utf-8")
def sign_with_rsa(message: str, private_key_pem: str, encoding: str = "base64") -> str:
"""
使用 RSA 私钥对消息进行数字签名
Args:
message: 要签名的消息
private_key_pem: PEM 格式的私钥字符串
encoding: 输出编码方式('base64''hex'),默认 'base64'
Returns:
str: 数字签名Base64 或 Hex 编码)
"""
key_pair = RSAKeyPair.load_private_key_from_pem(private_key_pem)
private_key = key_pair.private_key
if not private_key:
raise ValueError("私钥未设置")
message_bytes = message.encode("utf-8")
signature = private_key.sign(
message_bytes,
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH
),
hashes.SHA256(),
)
if encoding == "base64":
return base64.b64encode(signature).decode("utf-8")
elif encoding == "hex":
return signature.hex()
else:
raise ValueError(f"不支持的编码方式: {encoding}")
def verify_rsa_signature(
message: str, signature: str, public_key_pem: str, encoding: str = "base64"
) -> bool:
"""
使用 RSA 公钥验证数字签名
Args:
message: 原始消息
signature: 数字签名Base64 或 Hex 编码)
public_key_pem: PEM 格式的公钥字符串
encoding: 签名的编码方式('base64''hex'),默认 'base64'
Returns:
bool: 签名有效返回 True无效返回 False
"""
try:
key_pair = RSAKeyPair.load_public_key_from_pem(public_key_pem)
public_key = key_pair.public_key
if not public_key:
raise ValueError("公钥未设置")
message_bytes = message.encode("utf-8")
# 解码签名
if encoding == "base64":
signature_bytes = base64.b64decode(signature)
elif encoding == "hex":
signature_bytes = bytes.fromhex(signature)
else:
raise ValueError(f"不支持的编码方式: {encoding}")
# 验证签名
public_key.verify(
signature_bytes,
message_bytes,
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH
),
hashes.SHA256(),
)
return True
except Exception:
return False
def generate_key_pair_files(
private_key_path: str, public_key_path: str, key_size: int = 2048
) -> Tuple[str, str]:
"""
生成 RSA 密钥对并保存到文件
Args:
private_key_path: 私钥文件路径
public_key_path: 公钥文件路径
key_size: 密钥大小2048 或 4096 位),默认 2048
Returns:
Tuple[str, str]: (私钥PEM字符串, 公钥PEM字符串)
"""
key_pair = RSAKeyPair.generate(key_size)
private_pem = key_pair.get_private_key_pem()
public_pem = key_pair.get_public_key_pem()
with open(private_key_path, "w") as f:
f.write(private_pem)
with open(public_key_path, "w") as f:
f.write(public_pem)
return private_pem, public_pem
def load_key_pair_from_files(
private_key_path: Optional[str] = None, public_key_path: Optional[str] = None
) -> RSAKeyPair:
"""
从文件加载 RSA 密钥对
Args:
private_key_path: 私钥文件路径(可选)
public_key_path: 公钥文件路径(可选)
Returns:
RSAKeyPair: 包含密钥的对象
"""
private_key = None
public_key = None
if private_key_path:
with open(private_key_path, "r") as f:
private_pem = f.read()
key_pair = RSAKeyPair.load_private_key_from_pem(private_pem)
private_key = key_pair.private_key
public_key = key_pair.public_key
if public_key_path:
with open(public_key_path, "r") as f:
public_pem = f.read()
key_pair = RSAKeyPair.load_public_key_from_pem(public_pem)
public_key = key_pair.public_key
return RSAKeyPair(private_key, public_key)

126
src/utils/examples.py Normal file
View File

@@ -0,0 +1,126 @@
"""
非对称加密辅助函数使用示例
"""
from src.utils.crypto import (
RSAKeyPair,
encrypt_with_rsa,
decrypt_with_rsa,
sign_with_rsa,
verify_rsa_signature,
generate_key_pair_files,
load_key_pair_from_files,
)
def example_generate_keys():
"""示例:生成 RSA 密钥对"""
print("=== 生成 RSA 密钥对 ===")
key_pair = RSAKeyPair.generate(key_size=2048)
private_pem = key_pair.get_private_key_pem()
public_pem = key_pair.get_public_key_pem()
print("私钥PEM 格式,前 100 字符):")
print(private_pem[:100] + "...")
print("\n公钥PEM 格式,前 100 字符):")
print(public_pem[:100] + "...")
return private_pem, public_pem
def example_encrypt_decrypt(private_pem, public_pem):
"""示例:加密和解密"""
print("\n=== RSA 加密和解密 ===")
plaintext = "Hello, World! This is a secret message."
print(f"明文: {plaintext}")
# 加密
ciphertext = encrypt_with_rsa(plaintext, public_pem, encoding="base64")
print(f"密文Base64: {ciphertext[:50]}...")
# 解密
decrypted = decrypt_with_rsa(ciphertext, private_pem, encoding="base64")
print(f"解密后的明文: {decrypted}")
print(f"解密成功: {decrypted == plaintext}")
return ciphertext
def example_sign_verify(private_pem, public_pem):
"""示例:数字签名和验证"""
print("\n=== RSA 数字签名和验证 ===")
message = "This message needs to be signed"
print(f"原始消息: {message}")
# 签名
signature = sign_with_rsa(message, private_pem, encoding="base64")
print(f"签名Base64: {signature[:50]}...")
# 验证签名
is_valid = verify_rsa_signature(message, signature, public_pem, encoding="base64")
print(f"签名有效: {is_valid}")
# 尝试验证被篡改的消息
tampered_message = "This message has been tampered with"
is_tampered_valid = verify_rsa_signature(
tampered_message, signature, public_pem, encoding="base64"
)
print(f"篡改消息的签名有效: {is_tampered_valid}")
return signature
def example_key_files():
"""示例:保存和加载密钥文件"""
print("\n=== 密钥文件操作 ===")
private_path = "/tmp/private_key.pem"
public_path = "/tmp/public_key.pem"
# 生成并保存密钥对
private_pem, public_pem = generate_key_pair_files(private_path, public_path)
print(f"密钥对已保存到:")
print(f" 私钥: {private_path}")
print(f" 公钥: {public_path}")
# 从文件加载密钥对
loaded_key_pair = load_key_pair_from_files(
private_key_path=private_path, public_key_path=public_path
)
print(f"密钥对已从文件加载")
print(f" 包含私钥: {loaded_key_pair.private_key is not None}")
print(f" 包含公钥: {loaded_key_pair.public_key is not None}")
def example_hex_encoding():
"""示例:使用 Hex 编码替代 Base64"""
print("\n=== Hex 编码示例 ===")
key_pair = RSAKeyPair.generate(key_size=2048)
private_pem = key_pair.get_private_key_pem()
public_pem = key_pair.get_public_key_pem()
plaintext = "Secret data"
print(f"明文: {plaintext}")
# 使用 Hex 编码加密
ciphertext_hex = encrypt_with_rsa(plaintext, public_pem, encoding="hex")
print(f"密文Hex: {ciphertext_hex[:50]}...")
# 使用 Hex 编码解密
decrypted = decrypt_with_rsa(ciphertext_hex, private_pem, encoding="hex")
print(f"解密后的明文: {decrypted}")
if __name__ == "__main__":
# 运行所有示例
private_pem, public_pem = example_generate_keys()
example_encrypt_decrypt(private_pem, public_pem)
example_sign_verify(private_pem, public_pem)
example_key_files()
example_hex_encoding()
print("\n✓ 所有示例执行完成!")

View File

@@ -0,0 +1,348 @@
"""
非对称加密辅助函数在项目中的集成示例
"""
import json
from datetime import datetime
from typing import Optional, Dict, Any
from src.utils.crypto import (
RSAKeyPair,
encrypt_with_rsa,
decrypt_with_rsa,
sign_with_rsa,
verify_rsa_signature,
)
class SecureAPIRequest:
"""
安全的 API 请求签名示例
用于与第三方 API 进行安全交互,确保请求的真实性和完整性
"""
def __init__(self, private_key_pem: str, app_id: str):
"""
初始化安全 API 请求
Args:
private_key_pem: 应用的 RSA 私钥PEM 格式)
app_id: 应用 ID
"""
self.private_key_pem = private_key_pem
self.app_id = app_id
def create_signed_request(
self, endpoint: str, data: Dict[str, Any]
) -> Dict[str, Any]:
"""
创建带签名的 API 请求
Args:
endpoint: API 端点路径
data: 请求数据
Returns:
Dict: 包含签名的完整请求体
"""
timestamp = str(int(datetime.now().timestamp()))
# 构建请求内容
request_content = json.dumps(
{
"app_id": self.app_id,
"endpoint": endpoint,
"timestamp": timestamp,
"data": data,
},
separators=(",", ":"),
)
# 对请求内容签名
signature = sign_with_rsa(request_content, self.private_key_pem)
return {
"request": request_content,
"signature": signature,
"app_id": self.app_id,
}
@staticmethod
def verify_request(request_json: str, signature: str, public_key_pem: str) -> bool:
"""
验证 API 请求签名
Args:
request_json: 请求内容JSON 字符串)
signature: 请求签名
public_key_pem: 应用的 RSA 公钥PEM 格式)
Returns:
bool: 签名是否有效
"""
return verify_rsa_signature(request_json, signature, public_key_pem)
class SecureDataEncryption:
"""
敏感数据加密示例
用于在存储或传输敏感数据时进行加密
"""
def __init__(self, public_key_pem: str, private_key_pem: str):
"""
初始化数据加密工具
Args:
public_key_pem: 公钥(用于加密)
private_key_pem: 私钥(用于解密)
"""
self.public_key_pem = public_key_pem
self.private_key_pem = private_key_pem
def encrypt_user_data(self, user_id: str, sensitive_data: str) -> Dict[str, str]:
"""
加密用户敏感数据
Args:
user_id: 用户 ID
sensitive_data: 敏感数据(如密码、令牌等)
Returns:
Dict: 包含加密数据和元数据的字典
"""
# 为了避免超过 RSA 加密限制,先对数据进行摘要处理
# 如果数据较大,应使用混合加密方式
encrypted = encrypt_with_rsa(sensitive_data, self.public_key_pem)
return {
"user_id": user_id,
"encrypted_data": encrypted,
"timestamp": datetime.now().isoformat(),
"algorithm": "RSA-OAEP",
}
def decrypt_user_data(self, encrypted_data: str) -> str:
"""
解密用户敏感数据
Args:
encrypted_data: 加密的数据
Returns:
str: 解密后的原始数据
"""
return decrypt_with_rsa(encrypted_data, self.private_key_pem)
class CertificateAuthority:
"""
简单的证书颁发机构实现
用于证书的签发和验证
"""
def __init__(self, ca_private_key_pem: str, ca_public_key_pem: str):
"""
初始化 CA
Args:
ca_private_key_pem: CA 的私钥
ca_public_key_pem: CA 的公钥
"""
self.ca_private_key_pem = ca_private_key_pem
self.ca_public_key_pem = ca_public_key_pem
def issue_certificate(
self, subject: str, subject_public_key_pem: str, validity_days: int = 365
) -> Dict[str, Any]:
"""
颁发证书
Args:
subject: 证书主体(如用户 ID、服务器名称等
subject_public_key_pem: 主体的公钥
validity_days: 有效期(天数)
Returns:
Dict: 颁发的证书
"""
now = datetime.now().isoformat()
valid_until = datetime.fromtimestamp(
datetime.now().timestamp() + validity_days * 86400
).isoformat()
cert_data = json.dumps(
{
"subject": subject,
"public_key": subject_public_key_pem,
"issued_at": now,
"valid_until": valid_until,
"issuer": "CA",
}
)
# CA 对证书进行签名
signature = sign_with_rsa(cert_data, self.ca_private_key_pem)
return {
"certificate": cert_data,
"signature": signature,
"issued_at": now,
}
def verify_certificate(self, cert_data: str, signature: str) -> bool:
"""
验证证书签名
Args:
cert_data: 证书数据
signature: 证书签名
Returns:
bool: 证书是否有效
"""
return verify_rsa_signature(cert_data, signature, self.ca_public_key_pem)
class AuthenticationToken:
"""
认证令牌签名示例
用于生成和验证认证令牌
"""
def __init__(self, private_key_pem: str, public_key_pem: str, issuer: str):
"""
初始化令牌生成器
Args:
private_key_pem: 私钥(用于签名)
public_key_pem: 公钥(用于验证)
issuer: 令牌颁发者
"""
self.private_key_pem = private_key_pem
self.public_key_pem = public_key_pem
self.issuer = issuer
def generate_token(
self, user_id: str, permissions: list, expires_in_hours: int = 24
) -> str:
"""
生成签名的认证令牌
Args:
user_id: 用户 ID
permissions: 权限列表
expires_in_hours: 过期时间(小时)
Returns:
str: 签名的令牌
"""
issued_at = datetime.now().isoformat()
expires_at = datetime.fromtimestamp(
datetime.now().timestamp() + expires_in_hours * 3600
).isoformat()
token_data = json.dumps(
{
"user_id": user_id,
"permissions": permissions,
"issuer": self.issuer,
"issued_at": issued_at,
"expires_at": expires_at,
}
)
# 对令牌签名
signature = sign_with_rsa(token_data, self.private_key_pem)
# 将令牌和签名组合(使用分隔符)
return f"{token_data}.{signature}"
def verify_token(self, token: str) -> Optional[Dict[str, Any]]:
"""
验证令牌并返回令牌数据
Args:
token: 签名的令牌字符串
Returns:
Optional[Dict]: 令牌数据(如果有效)或 None如果无效
"""
try:
token_data, signature = token.rsplit(".", 1)
# 验证签名
if not verify_rsa_signature(token_data, signature, self.public_key_pem):
return None
# 解析令牌数据
token_dict = json.loads(token_data)
# 验证过期时间
expires_at = datetime.fromisoformat(token_dict["expires_at"])
if datetime.now() > expires_at:
return None
return token_dict
except Exception:
return None
# 使用示例
if __name__ == "__main__":
# 生成密钥对
print("=== 生成密钥对 ===")
key_pair = RSAKeyPair.generate(key_size=2048)
private_pem = key_pair.get_private_key_pem()
public_pem = key_pair.get_public_key_pem()
# 示例 1: 安全 API 请求
print("\n=== 安全 API 请求 ===")
api_request = SecureAPIRequest(private_pem, app_id="app_001")
signed_request = api_request.create_signed_request(
endpoint="/api/users/login",
data={"username": "user@example.com", "password": "secret"},
)
print(f"签名请求: {signed_request['signature'][:50]}...")
is_valid = SecureAPIRequest.verify_request(
signed_request["request"], signed_request["signature"], public_pem
)
print(f"请求签名有效: {is_valid}")
# 示例 2: 数据加密
print("\n=== 数据加密 ===")
data_encryption = SecureDataEncryption(public_pem, private_pem)
encrypted = data_encryption.encrypt_user_data(
user_id="user_123", sensitive_data="my_secret_token_12345"
)
print(f"加密数据: {encrypted['encrypted_data'][:50]}...")
decrypted = data_encryption.decrypt_user_data(encrypted["encrypted_data"])
print(f"解密后: {decrypted}")
# 示例 3: 证书签发
print("\n=== 证书颁发 ===")
ca = CertificateAuthority(private_pem, public_pem)
certificate = ca.issue_certificate(
subject="server.example.com", subject_public_key_pem=public_pem
)
print(f"证书签名: {certificate['signature'][:50]}...")
is_valid = ca.verify_certificate(
certificate["certificate"], certificate["signature"]
)
print(f"证书有效: {is_valid}")
# 示例 4: 认证令牌
print("\n=== 认证令牌 ===")
auth_token = AuthenticationToken(private_pem, public_pem, issuer="auth_server")
token = auth_token.generate_token(
user_id="user_456", permissions=["read", "write"], expires_in_hours=24
)
print(f"生成的令牌: {token[:50]}...")
token_data = auth_token.verify_token(token)
print(f"令牌数据: {token_data}")
print("\n✓ 所有集成示例执行完成!")

200
src/utils/test_crypto.py Normal file
View File

@@ -0,0 +1,200 @@
"""
非对称加密辅助函数单元测试
"""
import unittest
from src.utils.crypto import (
RSAKeyPair,
encrypt_with_rsa,
decrypt_with_rsa,
sign_with_rsa,
verify_rsa_signature,
)
class TestRSAKeyPair(unittest.TestCase):
"""测试 RSAKeyPair 类"""
def setUp(self):
"""测试前准备"""
self.key_pair = RSAKeyPair.generate(key_size=2048)
def test_generate_keys(self):
"""测试生成密钥对"""
self.assertIsNotNone(self.key_pair.private_key)
self.assertIsNotNone(self.key_pair.public_key)
def test_get_private_key_pem(self):
"""测试获取 PEM 格式私钥"""
private_pem = self.key_pair.get_private_key_pem()
self.assertIn("-----BEGIN PRIVATE KEY-----", private_pem)
self.assertIn("-----END PRIVATE KEY-----", private_pem)
def test_get_public_key_pem(self):
"""测试获取 PEM 格式公钥"""
public_pem = self.key_pair.get_public_key_pem()
self.assertIn("-----BEGIN PUBLIC KEY-----", public_pem)
self.assertIn("-----END PUBLIC KEY-----", public_pem)
def test_load_private_key_from_pem(self):
"""测试从 PEM 字符串加载私钥"""
private_pem = self.key_pair.get_private_key_pem()
loaded_key_pair = RSAKeyPair.load_private_key_from_pem(private_pem)
self.assertIsNotNone(loaded_key_pair.private_key)
self.assertIsNotNone(loaded_key_pair.public_key)
def test_load_public_key_from_pem(self):
"""测试从 PEM 字符串加载公钥"""
public_pem = self.key_pair.get_public_key_pem()
loaded_key_pair = RSAKeyPair.load_public_key_from_pem(public_pem)
self.assertIsNone(loaded_key_pair.private_key)
self.assertIsNotNone(loaded_key_pair.public_key)
class TestRSAEncryption(unittest.TestCase):
"""测试 RSA 加密和解密"""
def setUp(self):
"""测试前准备"""
self.key_pair = RSAKeyPair.generate(key_size=2048)
self.private_pem = self.key_pair.get_private_key_pem()
self.public_pem = self.key_pair.get_public_key_pem()
self.plaintext = "Hello, World!"
def test_encrypt_decrypt_base64(self):
"""测试使用 Base64 编码的加密和解密"""
ciphertext = encrypt_with_rsa(
self.plaintext, self.public_pem, encoding="base64"
)
decrypted = decrypt_with_rsa(ciphertext, self.private_pem, encoding="base64")
self.assertEqual(self.plaintext, decrypted)
self.assertNotEqual(self.plaintext, ciphertext)
def test_encrypt_decrypt_hex(self):
"""测试使用 Hex 编码的加密和解密"""
ciphertext = encrypt_with_rsa(self.plaintext, self.public_pem, encoding="hex")
decrypted = decrypt_with_rsa(ciphertext, self.private_pem, encoding="hex")
self.assertEqual(self.plaintext, decrypted)
self.assertNotEqual(self.plaintext, ciphertext)
def test_encrypt_unicode(self):
"""测试加密 Unicode 文本"""
unicode_text = "你好,世界!🔐"
ciphertext = encrypt_with_rsa(unicode_text, self.public_pem)
decrypted = decrypt_with_rsa(ciphertext, self.private_pem)
self.assertEqual(unicode_text, decrypted)
def test_encrypt_with_invalid_encoding(self):
"""测试使用无效编码方式加密"""
with self.assertRaises(ValueError):
encrypt_with_rsa(self.plaintext, self.public_pem, encoding="invalid")
def test_decrypt_with_invalid_encoding(self):
"""测试使用无效编码方式解密"""
ciphertext = encrypt_with_rsa(
self.plaintext, self.public_pem, encoding="base64"
)
with self.assertRaises(ValueError):
decrypt_with_rsa(ciphertext, self.private_pem, encoding="invalid")
class TestRSASignature(unittest.TestCase):
"""测试 RSA 数字签名"""
def setUp(self):
"""测试前准备"""
self.key_pair = RSAKeyPair.generate(key_size=2048)
self.private_pem = self.key_pair.get_private_key_pem()
self.public_pem = self.key_pair.get_public_key_pem()
self.message = "This is a message to sign"
def test_sign_verify_base64(self):
"""测试使用 Base64 编码的签名和验证"""
signature = sign_with_rsa(self.message, self.private_pem, encoding="base64")
is_valid = verify_rsa_signature(
self.message, signature, self.public_pem, encoding="base64"
)
self.assertTrue(is_valid)
def test_sign_verify_hex(self):
"""测试使用 Hex 编码的签名和验证"""
signature = sign_with_rsa(self.message, self.private_pem, encoding="hex")
is_valid = verify_rsa_signature(
self.message, signature, self.public_pem, encoding="hex"
)
self.assertTrue(is_valid)
def test_verify_invalid_signature(self):
"""测试验证无效签名"""
signature = sign_with_rsa(self.message, self.private_pem)
# 修改消息
tampered_message = "This is a tampered message"
is_valid = verify_rsa_signature(
tampered_message, signature, self.public_pem, encoding="base64"
)
self.assertFalse(is_valid)
def test_verify_corrupted_signature(self):
"""测试验证损坏的签名"""
signature = sign_with_rsa(self.message, self.private_pem)
# 修改签名
corrupted_signature = signature[:-10] + "corrupted"
is_valid = verify_rsa_signature(
self.message, corrupted_signature, self.public_pem, encoding="base64"
)
self.assertFalse(is_valid)
def test_sign_unicode_message(self):
"""测试签名 Unicode 消息"""
unicode_message = "签名测试消息 🔐"
signature = sign_with_rsa(unicode_message, self.private_pem)
is_valid = verify_rsa_signature(unicode_message, signature, self.public_pem)
self.assertTrue(is_valid)
class TestEdgeCases(unittest.TestCase):
"""测试边界情况"""
def setUp(self):
"""测试前准备"""
self.key_pair = RSAKeyPair.generate(key_size=2048)
self.private_pem = self.key_pair.get_private_key_pem()
self.public_pem = self.key_pair.get_public_key_pem()
def test_encrypt_empty_string(self):
"""测试加密空字符串"""
plaintext = ""
ciphertext = encrypt_with_rsa(plaintext, self.public_pem)
decrypted = decrypt_with_rsa(ciphertext, self.private_pem)
self.assertEqual(plaintext, decrypted)
def test_encrypt_long_message(self):
"""测试加密长消息应该失败RSA 有长度限制)"""
# RSA 2048 位密钥最多能加密约 190 字节
long_plaintext = "A" * 300
with self.assertRaises(Exception):
encrypt_with_rsa(long_plaintext, self.public_pem)
def test_sign_empty_message(self):
"""测试对空消息签名"""
message = ""
signature = sign_with_rsa(message, self.private_pem)
is_valid = verify_rsa_signature(message, signature, self.public_pem)
self.assertTrue(is_valid)
if __name__ == "__main__":
unittest.main()