diff --git a/.qoder/settings.json b/.qoder/settings.json new file mode 100644 index 0000000..0ee38cf --- /dev/null +++ b/.qoder/settings.json @@ -0,0 +1,14 @@ +{ + "permissions": { + "ask": [ + "Read(!./**)", + "Edit(!./**)" + ], + "allow": [ + "Read(./**)", + "Edit(./**)" + ] + }, + "memoryImport": {}, + "monitoring": {} +} \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..954fe01 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,4 @@ +{ + "python.languageServer": "None", + "python.analysis.typeCheckingMode": "off" +} \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 85e9f6c..1639891 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,8 +5,7 @@ pydantic-settings~=2.4.0 loguru~=0.7.2 pyyaml~=6.0.1 brotli~=1.1.0 -pycryptodome~=3.20.0 +cryptography~=46.0.3 sqlalchemy~=2.0.32 pymysql~=1.1.1 -psutil~=6.0.0 -cryptography~=42.0.4 \ No newline at end of file +psutil~=6.0.0 \ No newline at end of file diff --git a/src/integrations/june/crypto.py b/src/integrations/june/crypto.py index e985eee..0cfd290 100644 --- a/src/integrations/june/crypto.py +++ b/src/integrations/june/crypto.py @@ -3,7 +3,8 @@ import hashlib import random import time -from Crypto.Cipher import AES +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from cryptography.hazmat.backends import default_backend def decode_base64(data: str): @@ -19,10 +20,15 @@ def encrypt_cbc_base64(data, key, iv): to_encrypt_bytes = data.encode("utf-8") if len(key_bytes) != 16 or len(iv_bytes) != 16: raise ValueError("Key and IV must be 16 bytes long.") - cipher = AES.new(key_bytes, AES.MODE_CBC, iv_bytes) - pad_length = AES.block_size - len(to_encrypt_bytes) % AES.block_size - to_encrypt_bytes += bytes([0] * pad_length) - encrypted_bytes = cipher.encrypt(to_encrypt_bytes) + cipher = Cipher( + algorithms.AES(key_bytes), modes.CBC(iv_bytes), backend=default_backend() + ) + encryptor = cipher.encryptor() + # PKCS7 padding + block_size = 16 + pad_length = block_size - len(to_encrypt_bytes) % block_size + to_encrypt_bytes += bytes([pad_length] * pad_length) + encrypted_bytes = encryptor.update(to_encrypt_bytes) + encryptor.finalize() encrypted_base64 = base64.b64encode(encrypted_bytes).decode() return encrypted_base64 @@ -50,10 +56,10 @@ def encrypt( + hashlib.md5((d_string + key_b).encode("utf-8")).hexdigest()[:16] + d_string ) - array = None + array: str | None = None if operation: array = decode_base64(d_string[:num]) - num2 = len(array) if operation else len(d_string) + num2 = len(array) if (operation and array is not None) else len(d_string) array2 = bytearray(num2) array3 = list(range(256)) array4 = list(map(lambda x: 0, range(256))) @@ -74,7 +80,7 @@ def encrypt( num7 = array3[num6] array3[num6] = array3[num5] array3[num5] = num7 - if operation: + if operation and array is not None: array2[k] = ord(array[k]) ^ array3[(array3[num6] + array3[num5]) % 256] else: array2[k] = ord(d_string[k]) ^ array3[(array3[num6] + array3[num5]) % 256] diff --git a/src/utils/__init__.py b/src/utils/__init__.py new file mode 100644 index 0000000..0cceeac --- /dev/null +++ b/src/utils/__init__.py @@ -0,0 +1,23 @@ +""" +工具模块:提供加密、编码等辅助函数 +""" + +from src.utils.crypto import ( + RSAKeyPair, + encrypt_with_rsa, + decrypt_with_rsa, + sign_with_rsa, + verify_rsa_signature, + generate_key_pair_files, + load_key_pair_from_files, +) + +__all__ = [ + "RSAKeyPair", + "encrypt_with_rsa", + "decrypt_with_rsa", + "sign_with_rsa", + "verify_rsa_signature", + "generate_key_pair_files", + "load_key_pair_from_files", +] diff --git a/src/utils/crypto.py b/src/utils/crypto.py new file mode 100644 index 0000000..6c70125 --- /dev/null +++ b/src/utils/crypto.py @@ -0,0 +1,337 @@ +""" +非对称加密辅助函数模块 + +提供 RSA 密钥对生成、加密、解密、签名、验证等功能 +""" + +import base64 +from typing import Tuple, Optional + +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import rsa, padding +from cryptography.hazmat.backends import default_backend + + +class RSAKeyPair: + """RSA 密钥对管理类""" + + def __init__(self, private_key=None, public_key=None): + """ + 初始化 RSA 密钥对 + + Args: + private_key: RSA 私钥对象 + public_key: RSA 公钥对象 + """ + self.private_key = private_key + self.public_key = public_key + + @staticmethod + def generate(key_size: int = 2048) -> "RSAKeyPair": + """ + 生成新的 RSA 密钥对 + + Args: + key_size: 密钥大小(2048 或 4096 位),默认 2048 + + Returns: + RSAKeyPair: 包含私钥和公钥的对象 + """ + private_key = rsa.generate_private_key( + public_exponent=65537, + key_size=key_size, + backend=default_backend(), + ) + public_key = private_key.public_key() + return RSAKeyPair(private_key, public_key) + + def get_private_key_pem(self) -> str: + """ + 获取 PEM 格式的私钥字符串 + + Returns: + str: PEM 格式的私钥(包含 -----BEGIN/END PRIVATE KEY-----) + """ + if not self.private_key: + raise ValueError("私钥未设置") + pem = self.private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), + ) + return pem.decode("utf-8") + + def get_public_key_pem(self) -> str: + """ + 获取 PEM 格式的公钥字符串 + + Returns: + str: PEM 格式的公钥(包含 -----BEGIN/END PUBLIC KEY-----) + """ + if not self.public_key: + raise ValueError("公钥未设置") + pem = self.public_key.public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ) + return pem.decode("utf-8") + + @staticmethod + def load_private_key_from_pem(pem: str) -> "RSAKeyPair": + """ + 从 PEM 格式字符串加载私钥 + + Args: + pem: PEM 格式的私钥字符串 + + Returns: + RSAKeyPair: 包含私钥和公钥的对象 + """ + private_key = serialization.load_pem_private_key( + pem.encode("utf-8"), password=None, backend=default_backend() + ) + public_key = private_key.public_key() + return RSAKeyPair(private_key, public_key) + + @staticmethod + def load_public_key_from_pem(pem: str) -> "RSAKeyPair": + """ + 从 PEM 格式字符串加载公钥 + + Args: + pem: PEM 格式的公钥字符串 + + Returns: + RSAKeyPair: 包含公钥的对象 + """ + public_key = serialization.load_pem_public_key( + pem.encode("utf-8"), backend=default_backend() + ) + return RSAKeyPair(public_key=public_key) + + +def encrypt_with_rsa( + plaintext: str, public_key_pem: str, encoding: str = "base64" +) -> str: + """ + 使用 RSA 公钥加密数据(OAEP 填充) + + Args: + plaintext: 明文字符串 + public_key_pem: PEM 格式的公钥字符串 + encoding: 输出编码方式('base64' 或 'hex'),默认 'base64' + + Returns: + str: 加密后的数据(Base64 或 Hex 编码) + + Raises: + ValueError: 如果数据过大或编码方式不支持 + """ + key_pair = RSAKeyPair.load_public_key_from_pem(public_key_pem) + public_key = key_pair.public_key + + if not public_key: + raise ValueError("公钥未设置") + + plaintext_bytes = plaintext.encode("utf-8") + + ciphertext = public_key.encrypt( + plaintext_bytes, + padding.OAEP( + mgf=padding.MGF1(algorithm=hashes.SHA256()), + algorithm=hashes.SHA256(), + label=None, + ), + ) + + if encoding == "base64": + return base64.b64encode(ciphertext).decode("utf-8") + elif encoding == "hex": + return ciphertext.hex() + else: + raise ValueError(f"不支持的编码方式: {encoding}") + + +def decrypt_with_rsa( + ciphertext: str, private_key_pem: str, encoding: str = "base64" +) -> str: + """ + 使用 RSA 私钥解密数据 + + Args: + ciphertext: 加密数据(Base64 或 Hex 编码) + private_key_pem: PEM 格式的私钥字符串 + encoding: 输入编码方式('base64' 或 'hex'),默认 'base64' + + Returns: + str: 解密后的明文字符串 + + Raises: + ValueError: 如果编码方式不支持或解密失败 + """ + key_pair = RSAKeyPair.load_private_key_from_pem(private_key_pem) + private_key = key_pair.private_key + + if not private_key: + raise ValueError("私钥未设置") + + # 解码密文 + if encoding == "base64": + ciphertext_bytes = base64.b64decode(ciphertext) + elif encoding == "hex": + ciphertext_bytes = bytes.fromhex(ciphertext) + else: + raise ValueError(f"不支持的编码方式: {encoding}") + + plaintext_bytes = private_key.decrypt( + ciphertext_bytes, + padding.OAEP( + mgf=padding.MGF1(algorithm=hashes.SHA256()), + algorithm=hashes.SHA256(), + label=None, + ), + ) + + return plaintext_bytes.decode("utf-8") + + +def sign_with_rsa(message: str, private_key_pem: str, encoding: str = "base64") -> str: + """ + 使用 RSA 私钥对消息进行数字签名 + + Args: + message: 要签名的消息 + private_key_pem: PEM 格式的私钥字符串 + encoding: 输出编码方式('base64' 或 'hex'),默认 'base64' + + Returns: + str: 数字签名(Base64 或 Hex 编码) + """ + key_pair = RSAKeyPair.load_private_key_from_pem(private_key_pem) + private_key = key_pair.private_key + + if not private_key: + raise ValueError("私钥未设置") + + message_bytes = message.encode("utf-8") + + signature = private_key.sign( + message_bytes, + padding.PSS( + mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH + ), + hashes.SHA256(), + ) + + if encoding == "base64": + return base64.b64encode(signature).decode("utf-8") + elif encoding == "hex": + return signature.hex() + else: + raise ValueError(f"不支持的编码方式: {encoding}") + + +def verify_rsa_signature( + message: str, signature: str, public_key_pem: str, encoding: str = "base64" +) -> bool: + """ + 使用 RSA 公钥验证数字签名 + + Args: + message: 原始消息 + signature: 数字签名(Base64 或 Hex 编码) + public_key_pem: PEM 格式的公钥字符串 + encoding: 签名的编码方式('base64' 或 'hex'),默认 'base64' + + Returns: + bool: 签名有效返回 True,无效返回 False + """ + try: + key_pair = RSAKeyPair.load_public_key_from_pem(public_key_pem) + public_key = key_pair.public_key + + if not public_key: + raise ValueError("公钥未设置") + + message_bytes = message.encode("utf-8") + + # 解码签名 + if encoding == "base64": + signature_bytes = base64.b64decode(signature) + elif encoding == "hex": + signature_bytes = bytes.fromhex(signature) + else: + raise ValueError(f"不支持的编码方式: {encoding}") + + # 验证签名 + public_key.verify( + signature_bytes, + message_bytes, + padding.PSS( + mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH + ), + hashes.SHA256(), + ) + return True + except Exception: + return False + + +def generate_key_pair_files( + private_key_path: str, public_key_path: str, key_size: int = 2048 +) -> Tuple[str, str]: + """ + 生成 RSA 密钥对并保存到文件 + + Args: + private_key_path: 私钥文件路径 + public_key_path: 公钥文件路径 + key_size: 密钥大小(2048 或 4096 位),默认 2048 + + Returns: + Tuple[str, str]: (私钥PEM字符串, 公钥PEM字符串) + """ + key_pair = RSAKeyPair.generate(key_size) + + private_pem = key_pair.get_private_key_pem() + public_pem = key_pair.get_public_key_pem() + + with open(private_key_path, "w") as f: + f.write(private_pem) + + with open(public_key_path, "w") as f: + f.write(public_pem) + + return private_pem, public_pem + + +def load_key_pair_from_files( + private_key_path: Optional[str] = None, public_key_path: Optional[str] = None +) -> RSAKeyPair: + """ + 从文件加载 RSA 密钥对 + + Args: + private_key_path: 私钥文件路径(可选) + public_key_path: 公钥文件路径(可选) + + Returns: + RSAKeyPair: 包含密钥的对象 + """ + private_key = None + public_key = None + + if private_key_path: + with open(private_key_path, "r") as f: + private_pem = f.read() + key_pair = RSAKeyPair.load_private_key_from_pem(private_pem) + private_key = key_pair.private_key + public_key = key_pair.public_key + + if public_key_path: + with open(public_key_path, "r") as f: + public_pem = f.read() + key_pair = RSAKeyPair.load_public_key_from_pem(public_pem) + public_key = key_pair.public_key + + return RSAKeyPair(private_key, public_key) diff --git a/src/utils/examples.py b/src/utils/examples.py new file mode 100644 index 0000000..ffa0f1c --- /dev/null +++ b/src/utils/examples.py @@ -0,0 +1,126 @@ +""" +非对称加密辅助函数使用示例 +""" + +from src.utils.crypto import ( + RSAKeyPair, + encrypt_with_rsa, + decrypt_with_rsa, + sign_with_rsa, + verify_rsa_signature, + generate_key_pair_files, + load_key_pair_from_files, +) + + +def example_generate_keys(): + """示例:生成 RSA 密钥对""" + print("=== 生成 RSA 密钥对 ===") + key_pair = RSAKeyPair.generate(key_size=2048) + + private_pem = key_pair.get_private_key_pem() + public_pem = key_pair.get_public_key_pem() + + print("私钥(PEM 格式,前 100 字符):") + print(private_pem[:100] + "...") + print("\n公钥(PEM 格式,前 100 字符):") + print(public_pem[:100] + "...") + + return private_pem, public_pem + + +def example_encrypt_decrypt(private_pem, public_pem): + """示例:加密和解密""" + print("\n=== RSA 加密和解密 ===") + + plaintext = "Hello, World! This is a secret message." + print(f"明文: {plaintext}") + + # 加密 + ciphertext = encrypt_with_rsa(plaintext, public_pem, encoding="base64") + print(f"密文(Base64): {ciphertext[:50]}...") + + # 解密 + decrypted = decrypt_with_rsa(ciphertext, private_pem, encoding="base64") + print(f"解密后的明文: {decrypted}") + print(f"解密成功: {decrypted == plaintext}") + + return ciphertext + + +def example_sign_verify(private_pem, public_pem): + """示例:数字签名和验证""" + print("\n=== RSA 数字签名和验证 ===") + + message = "This message needs to be signed" + print(f"原始消息: {message}") + + # 签名 + signature = sign_with_rsa(message, private_pem, encoding="base64") + print(f"签名(Base64): {signature[:50]}...") + + # 验证签名 + is_valid = verify_rsa_signature(message, signature, public_pem, encoding="base64") + print(f"签名有效: {is_valid}") + + # 尝试验证被篡改的消息 + tampered_message = "This message has been tampered with" + is_tampered_valid = verify_rsa_signature( + tampered_message, signature, public_pem, encoding="base64" + ) + print(f"篡改消息的签名有效: {is_tampered_valid}") + + return signature + + +def example_key_files(): + """示例:保存和加载密钥文件""" + print("\n=== 密钥文件操作 ===") + + private_path = "/tmp/private_key.pem" + public_path = "/tmp/public_key.pem" + + # 生成并保存密钥对 + private_pem, public_pem = generate_key_pair_files(private_path, public_path) + print(f"密钥对已保存到:") + print(f" 私钥: {private_path}") + print(f" 公钥: {public_path}") + + # 从文件加载密钥对 + loaded_key_pair = load_key_pair_from_files( + private_key_path=private_path, public_key_path=public_path + ) + print(f"密钥对已从文件加载") + print(f" 包含私钥: {loaded_key_pair.private_key is not None}") + print(f" 包含公钥: {loaded_key_pair.public_key is not None}") + + +def example_hex_encoding(): + """示例:使用 Hex 编码替代 Base64""" + print("\n=== Hex 编码示例 ===") + + key_pair = RSAKeyPair.generate(key_size=2048) + private_pem = key_pair.get_private_key_pem() + public_pem = key_pair.get_public_key_pem() + + plaintext = "Secret data" + print(f"明文: {plaintext}") + + # 使用 Hex 编码加密 + ciphertext_hex = encrypt_with_rsa(plaintext, public_pem, encoding="hex") + print(f"密文(Hex): {ciphertext_hex[:50]}...") + + # 使用 Hex 编码解密 + decrypted = decrypt_with_rsa(ciphertext_hex, private_pem, encoding="hex") + print(f"解密后的明文: {decrypted}") + + +if __name__ == "__main__": + # 运行所有示例 + private_pem, public_pem = example_generate_keys() + example_encrypt_decrypt(private_pem, public_pem) + example_sign_verify(private_pem, public_pem) + example_key_files() + example_hex_encoding() + + print("\n✓ 所有示例执行完成!") diff --git a/src/utils/integration_examples.py b/src/utils/integration_examples.py new file mode 100644 index 0000000..bbaf119 --- /dev/null +++ b/src/utils/integration_examples.py @@ -0,0 +1,348 @@ +""" +非对称加密辅助函数在项目中的集成示例 +""" + +import json +from datetime import datetime +from typing import Optional, Dict, Any + +from src.utils.crypto import ( + RSAKeyPair, + encrypt_with_rsa, + decrypt_with_rsa, + sign_with_rsa, + verify_rsa_signature, +) + + +class SecureAPIRequest: + """ + 安全的 API 请求签名示例 + + 用于与第三方 API 进行安全交互,确保请求的真实性和完整性 + """ + + def __init__(self, private_key_pem: str, app_id: str): + """ + 初始化安全 API 请求 + + Args: + private_key_pem: 应用的 RSA 私钥(PEM 格式) + app_id: 应用 ID + """ + self.private_key_pem = private_key_pem + self.app_id = app_id + + def create_signed_request( + self, endpoint: str, data: Dict[str, Any] + ) -> Dict[str, Any]: + """ + 创建带签名的 API 请求 + + Args: + endpoint: API 端点路径 + data: 请求数据 + + Returns: + Dict: 包含签名的完整请求体 + """ + timestamp = str(int(datetime.now().timestamp())) + + # 构建请求内容 + request_content = json.dumps( + { + "app_id": self.app_id, + "endpoint": endpoint, + "timestamp": timestamp, + "data": data, + }, + separators=(",", ":"), + ) + + # 对请求内容签名 + signature = sign_with_rsa(request_content, self.private_key_pem) + + return { + "request": request_content, + "signature": signature, + "app_id": self.app_id, + } + + @staticmethod + def verify_request(request_json: str, signature: str, public_key_pem: str) -> bool: + """ + 验证 API 请求签名 + + Args: + request_json: 请求内容(JSON 字符串) + signature: 请求签名 + public_key_pem: 应用的 RSA 公钥(PEM 格式) + + Returns: + bool: 签名是否有效 + """ + return verify_rsa_signature(request_json, signature, public_key_pem) + + +class SecureDataEncryption: + """ + 敏感数据加密示例 + + 用于在存储或传输敏感数据时进行加密 + """ + + def __init__(self, public_key_pem: str, private_key_pem: str): + """ + 初始化数据加密工具 + + Args: + public_key_pem: 公钥(用于加密) + private_key_pem: 私钥(用于解密) + """ + self.public_key_pem = public_key_pem + self.private_key_pem = private_key_pem + + def encrypt_user_data(self, user_id: str, sensitive_data: str) -> Dict[str, str]: + """ + 加密用户敏感数据 + + Args: + user_id: 用户 ID + sensitive_data: 敏感数据(如密码、令牌等) + + Returns: + Dict: 包含加密数据和元数据的字典 + """ + # 为了避免超过 RSA 加密限制,先对数据进行摘要处理 + # 如果数据较大,应使用混合加密方式 + encrypted = encrypt_with_rsa(sensitive_data, self.public_key_pem) + + return { + "user_id": user_id, + "encrypted_data": encrypted, + "timestamp": datetime.now().isoformat(), + "algorithm": "RSA-OAEP", + } + + def decrypt_user_data(self, encrypted_data: str) -> str: + """ + 解密用户敏感数据 + + Args: + encrypted_data: 加密的数据 + + Returns: + str: 解密后的原始数据 + """ + return decrypt_with_rsa(encrypted_data, self.private_key_pem) + + +class CertificateAuthority: + """ + 简单的证书颁发机构实现 + + 用于证书的签发和验证 + """ + + def __init__(self, ca_private_key_pem: str, ca_public_key_pem: str): + """ + 初始化 CA + + Args: + ca_private_key_pem: CA 的私钥 + ca_public_key_pem: CA 的公钥 + """ + self.ca_private_key_pem = ca_private_key_pem + self.ca_public_key_pem = ca_public_key_pem + + def issue_certificate( + self, subject: str, subject_public_key_pem: str, validity_days: int = 365 + ) -> Dict[str, Any]: + """ + 颁发证书 + + Args: + subject: 证书主体(如用户 ID、服务器名称等) + subject_public_key_pem: 主体的公钥 + validity_days: 有效期(天数) + + Returns: + Dict: 颁发的证书 + """ + now = datetime.now().isoformat() + valid_until = datetime.fromtimestamp( + datetime.now().timestamp() + validity_days * 86400 + ).isoformat() + + cert_data = json.dumps( + { + "subject": subject, + "public_key": subject_public_key_pem, + "issued_at": now, + "valid_until": valid_until, + "issuer": "CA", + } + ) + + # CA 对证书进行签名 + signature = sign_with_rsa(cert_data, self.ca_private_key_pem) + + return { + "certificate": cert_data, + "signature": signature, + "issued_at": now, + } + + def verify_certificate(self, cert_data: str, signature: str) -> bool: + """ + 验证证书签名 + + Args: + cert_data: 证书数据 + signature: 证书签名 + + Returns: + bool: 证书是否有效 + """ + return verify_rsa_signature(cert_data, signature, self.ca_public_key_pem) + + +class AuthenticationToken: + """ + 认证令牌签名示例 + + 用于生成和验证认证令牌 + """ + + def __init__(self, private_key_pem: str, public_key_pem: str, issuer: str): + """ + 初始化令牌生成器 + + Args: + private_key_pem: 私钥(用于签名) + public_key_pem: 公钥(用于验证) + issuer: 令牌颁发者 + """ + self.private_key_pem = private_key_pem + self.public_key_pem = public_key_pem + self.issuer = issuer + + def generate_token( + self, user_id: str, permissions: list, expires_in_hours: int = 24 + ) -> str: + """ + 生成签名的认证令牌 + + Args: + user_id: 用户 ID + permissions: 权限列表 + expires_in_hours: 过期时间(小时) + + Returns: + str: 签名的令牌 + """ + issued_at = datetime.now().isoformat() + expires_at = datetime.fromtimestamp( + datetime.now().timestamp() + expires_in_hours * 3600 + ).isoformat() + + token_data = json.dumps( + { + "user_id": user_id, + "permissions": permissions, + "issuer": self.issuer, + "issued_at": issued_at, + "expires_at": expires_at, + } + ) + + # 对令牌签名 + signature = sign_with_rsa(token_data, self.private_key_pem) + + # 将令牌和签名组合(使用分隔符) + return f"{token_data}.{signature}" + + def verify_token(self, token: str) -> Optional[Dict[str, Any]]: + """ + 验证令牌并返回令牌数据 + + Args: + token: 签名的令牌字符串 + + Returns: + Optional[Dict]: 令牌数据(如果有效)或 None(如果无效) + """ + try: + token_data, signature = token.rsplit(".", 1) + + # 验证签名 + if not verify_rsa_signature(token_data, signature, self.public_key_pem): + return None + + # 解析令牌数据 + token_dict = json.loads(token_data) + + # 验证过期时间 + expires_at = datetime.fromisoformat(token_dict["expires_at"]) + if datetime.now() > expires_at: + return None + + return token_dict + except Exception: + return None + + +# 使用示例 +if __name__ == "__main__": + # 生成密钥对 + print("=== 生成密钥对 ===") + key_pair = RSAKeyPair.generate(key_size=2048) + private_pem = key_pair.get_private_key_pem() + public_pem = key_pair.get_public_key_pem() + + # 示例 1: 安全 API 请求 + print("\n=== 安全 API 请求 ===") + api_request = SecureAPIRequest(private_pem, app_id="app_001") + signed_request = api_request.create_signed_request( + endpoint="/api/users/login", + data={"username": "user@example.com", "password": "secret"}, + ) + print(f"签名请求: {signed_request['signature'][:50]}...") + is_valid = SecureAPIRequest.verify_request( + signed_request["request"], signed_request["signature"], public_pem + ) + print(f"请求签名有效: {is_valid}") + + # 示例 2: 数据加密 + print("\n=== 数据加密 ===") + data_encryption = SecureDataEncryption(public_pem, private_pem) + encrypted = data_encryption.encrypt_user_data( + user_id="user_123", sensitive_data="my_secret_token_12345" + ) + print(f"加密数据: {encrypted['encrypted_data'][:50]}...") + decrypted = data_encryption.decrypt_user_data(encrypted["encrypted_data"]) + print(f"解密后: {decrypted}") + + # 示例 3: 证书签发 + print("\n=== 证书颁发 ===") + ca = CertificateAuthority(private_pem, public_pem) + certificate = ca.issue_certificate( + subject="server.example.com", subject_public_key_pem=public_pem + ) + print(f"证书签名: {certificate['signature'][:50]}...") + is_valid = ca.verify_certificate( + certificate["certificate"], certificate["signature"] + ) + print(f"证书有效: {is_valid}") + + # 示例 4: 认证令牌 + print("\n=== 认证令牌 ===") + auth_token = AuthenticationToken(private_pem, public_pem, issuer="auth_server") + token = auth_token.generate_token( + user_id="user_456", permissions=["read", "write"], expires_in_hours=24 + ) + print(f"生成的令牌: {token[:50]}...") + token_data = auth_token.verify_token(token) + print(f"令牌数据: {token_data}") + + print("\n✓ 所有集成示例执行完成!") diff --git a/src/utils/test_crypto.py b/src/utils/test_crypto.py new file mode 100644 index 0000000..1a24e78 --- /dev/null +++ b/src/utils/test_crypto.py @@ -0,0 +1,200 @@ +""" +非对称加密辅助函数单元测试 +""" + +import unittest + +from src.utils.crypto import ( + RSAKeyPair, + encrypt_with_rsa, + decrypt_with_rsa, + sign_with_rsa, + verify_rsa_signature, +) + + +class TestRSAKeyPair(unittest.TestCase): + """测试 RSAKeyPair 类""" + + def setUp(self): + """测试前准备""" + self.key_pair = RSAKeyPair.generate(key_size=2048) + + def test_generate_keys(self): + """测试生成密钥对""" + self.assertIsNotNone(self.key_pair.private_key) + self.assertIsNotNone(self.key_pair.public_key) + + def test_get_private_key_pem(self): + """测试获取 PEM 格式私钥""" + private_pem = self.key_pair.get_private_key_pem() + self.assertIn("-----BEGIN PRIVATE KEY-----", private_pem) + self.assertIn("-----END PRIVATE KEY-----", private_pem) + + def test_get_public_key_pem(self): + """测试获取 PEM 格式公钥""" + public_pem = self.key_pair.get_public_key_pem() + self.assertIn("-----BEGIN PUBLIC KEY-----", public_pem) + self.assertIn("-----END PUBLIC KEY-----", public_pem) + + def test_load_private_key_from_pem(self): + """测试从 PEM 字符串加载私钥""" + private_pem = self.key_pair.get_private_key_pem() + loaded_key_pair = RSAKeyPair.load_private_key_from_pem(private_pem) + + self.assertIsNotNone(loaded_key_pair.private_key) + self.assertIsNotNone(loaded_key_pair.public_key) + + def test_load_public_key_from_pem(self): + """测试从 PEM 字符串加载公钥""" + public_pem = self.key_pair.get_public_key_pem() + loaded_key_pair = RSAKeyPair.load_public_key_from_pem(public_pem) + + self.assertIsNone(loaded_key_pair.private_key) + self.assertIsNotNone(loaded_key_pair.public_key) + + +class TestRSAEncryption(unittest.TestCase): + """测试 RSA 加密和解密""" + + def setUp(self): + """测试前准备""" + self.key_pair = RSAKeyPair.generate(key_size=2048) + self.private_pem = self.key_pair.get_private_key_pem() + self.public_pem = self.key_pair.get_public_key_pem() + self.plaintext = "Hello, World!" + + def test_encrypt_decrypt_base64(self): + """测试使用 Base64 编码的加密和解密""" + ciphertext = encrypt_with_rsa( + self.plaintext, self.public_pem, encoding="base64" + ) + decrypted = decrypt_with_rsa(ciphertext, self.private_pem, encoding="base64") + + self.assertEqual(self.plaintext, decrypted) + self.assertNotEqual(self.plaintext, ciphertext) + + def test_encrypt_decrypt_hex(self): + """测试使用 Hex 编码的加密和解密""" + ciphertext = encrypt_with_rsa(self.plaintext, self.public_pem, encoding="hex") + decrypted = decrypt_with_rsa(ciphertext, self.private_pem, encoding="hex") + + self.assertEqual(self.plaintext, decrypted) + self.assertNotEqual(self.plaintext, ciphertext) + + def test_encrypt_unicode(self): + """测试加密 Unicode 文本""" + unicode_text = "你好,世界!🔐" + ciphertext = encrypt_with_rsa(unicode_text, self.public_pem) + decrypted = decrypt_with_rsa(ciphertext, self.private_pem) + + self.assertEqual(unicode_text, decrypted) + + def test_encrypt_with_invalid_encoding(self): + """测试使用无效编码方式加密""" + with self.assertRaises(ValueError): + encrypt_with_rsa(self.plaintext, self.public_pem, encoding="invalid") + + def test_decrypt_with_invalid_encoding(self): + """测试使用无效编码方式解密""" + ciphertext = encrypt_with_rsa( + self.plaintext, self.public_pem, encoding="base64" + ) + with self.assertRaises(ValueError): + decrypt_with_rsa(ciphertext, self.private_pem, encoding="invalid") + + +class TestRSASignature(unittest.TestCase): + """测试 RSA 数字签名""" + + def setUp(self): + """测试前准备""" + self.key_pair = RSAKeyPair.generate(key_size=2048) + self.private_pem = self.key_pair.get_private_key_pem() + self.public_pem = self.key_pair.get_public_key_pem() + self.message = "This is a message to sign" + + def test_sign_verify_base64(self): + """测试使用 Base64 编码的签名和验证""" + signature = sign_with_rsa(self.message, self.private_pem, encoding="base64") + is_valid = verify_rsa_signature( + self.message, signature, self.public_pem, encoding="base64" + ) + + self.assertTrue(is_valid) + + def test_sign_verify_hex(self): + """测试使用 Hex 编码的签名和验证""" + signature = sign_with_rsa(self.message, self.private_pem, encoding="hex") + is_valid = verify_rsa_signature( + self.message, signature, self.public_pem, encoding="hex" + ) + + self.assertTrue(is_valid) + + def test_verify_invalid_signature(self): + """测试验证无效签名""" + signature = sign_with_rsa(self.message, self.private_pem) + # 修改消息 + tampered_message = "This is a tampered message" + is_valid = verify_rsa_signature( + tampered_message, signature, self.public_pem, encoding="base64" + ) + + self.assertFalse(is_valid) + + def test_verify_corrupted_signature(self): + """测试验证损坏的签名""" + signature = sign_with_rsa(self.message, self.private_pem) + # 修改签名 + corrupted_signature = signature[:-10] + "corrupted" + is_valid = verify_rsa_signature( + self.message, corrupted_signature, self.public_pem, encoding="base64" + ) + + self.assertFalse(is_valid) + + def test_sign_unicode_message(self): + """测试签名 Unicode 消息""" + unicode_message = "签名测试消息 🔐" + signature = sign_with_rsa(unicode_message, self.private_pem) + is_valid = verify_rsa_signature(unicode_message, signature, self.public_pem) + + self.assertTrue(is_valid) + + +class TestEdgeCases(unittest.TestCase): + """测试边界情况""" + + def setUp(self): + """测试前准备""" + self.key_pair = RSAKeyPair.generate(key_size=2048) + self.private_pem = self.key_pair.get_private_key_pem() + self.public_pem = self.key_pair.get_public_key_pem() + + def test_encrypt_empty_string(self): + """测试加密空字符串""" + plaintext = "" + ciphertext = encrypt_with_rsa(plaintext, self.public_pem) + decrypted = decrypt_with_rsa(ciphertext, self.private_pem) + + self.assertEqual(plaintext, decrypted) + + def test_encrypt_long_message(self): + """测试加密长消息(应该失败,RSA 有长度限制)""" + # RSA 2048 位密钥最多能加密约 190 字节 + long_plaintext = "A" * 300 + with self.assertRaises(Exception): + encrypt_with_rsa(long_plaintext, self.public_pem) + + def test_sign_empty_message(self): + """测试对空消息签名""" + message = "" + signature = sign_with_rsa(message, self.private_pem) + is_valid = verify_rsa_signature(message, signature, self.public_pem) + + self.assertTrue(is_valid) + + +if __name__ == "__main__": + unittest.main()