mirror of
https://git.oceanpay.cc/danial/kami_itunes_third_api.git
synced 2025-12-18 11:06:33 +00:00
feat(utils): 新增非对称加密RSA工具与示例
- 实现RSA密钥对的生成、导出、加载功能 - 新增RSA公钥加密与私钥解密接口,支持Base64和Hex编码 - 实现RSA私钥数字签名及公钥签名验证功能 - 添加RSA密钥对文件读写辅助函数 - 新增非对称加密使用示例,涵盖密钥生成、加解密、签名验证等场景 - 提供项目中集成非对称加密的应用示例,包括安全API请求、数据加密、证书签发与认证令牌 - 添加完善的单元测试覆盖密钥管理、加密解密、签名验证及边界情况 - 调整依赖版本,替换pycryptodome为cryptography包 - 优化部分已有加密代码,改用cryptography库实现AES加密,增强兼容性和安全性
This commit is contained in:
14
.qoder/settings.json
Normal file
14
.qoder/settings.json
Normal file
@@ -0,0 +1,14 @@
|
||||
{
|
||||
"permissions": {
|
||||
"ask": [
|
||||
"Read(!./**)",
|
||||
"Edit(!./**)"
|
||||
],
|
||||
"allow": [
|
||||
"Read(./**)",
|
||||
"Edit(./**)"
|
||||
]
|
||||
},
|
||||
"memoryImport": {},
|
||||
"monitoring": {}
|
||||
}
|
||||
4
.vscode/settings.json
vendored
Normal file
4
.vscode/settings.json
vendored
Normal file
@@ -0,0 +1,4 @@
|
||||
{
|
||||
"python.languageServer": "None",
|
||||
"python.analysis.typeCheckingMode": "off"
|
||||
}
|
||||
@@ -5,8 +5,7 @@ pydantic-settings~=2.4.0
|
||||
loguru~=0.7.2
|
||||
pyyaml~=6.0.1
|
||||
brotli~=1.1.0
|
||||
pycryptodome~=3.20.0
|
||||
cryptography~=46.0.3
|
||||
sqlalchemy~=2.0.32
|
||||
pymysql~=1.1.1
|
||||
psutil~=6.0.0
|
||||
cryptography~=42.0.4
|
||||
psutil~=6.0.0
|
||||
@@ -3,7 +3,8 @@ import hashlib
|
||||
import random
|
||||
import time
|
||||
|
||||
from Crypto.Cipher import AES
|
||||
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
|
||||
|
||||
def decode_base64(data: str):
|
||||
@@ -19,10 +20,15 @@ def encrypt_cbc_base64(data, key, iv):
|
||||
to_encrypt_bytes = data.encode("utf-8")
|
||||
if len(key_bytes) != 16 or len(iv_bytes) != 16:
|
||||
raise ValueError("Key and IV must be 16 bytes long.")
|
||||
cipher = AES.new(key_bytes, AES.MODE_CBC, iv_bytes)
|
||||
pad_length = AES.block_size - len(to_encrypt_bytes) % AES.block_size
|
||||
to_encrypt_bytes += bytes([0] * pad_length)
|
||||
encrypted_bytes = cipher.encrypt(to_encrypt_bytes)
|
||||
cipher = Cipher(
|
||||
algorithms.AES(key_bytes), modes.CBC(iv_bytes), backend=default_backend()
|
||||
)
|
||||
encryptor = cipher.encryptor()
|
||||
# PKCS7 padding
|
||||
block_size = 16
|
||||
pad_length = block_size - len(to_encrypt_bytes) % block_size
|
||||
to_encrypt_bytes += bytes([pad_length] * pad_length)
|
||||
encrypted_bytes = encryptor.update(to_encrypt_bytes) + encryptor.finalize()
|
||||
encrypted_base64 = base64.b64encode(encrypted_bytes).decode()
|
||||
return encrypted_base64
|
||||
|
||||
@@ -50,10 +56,10 @@ def encrypt(
|
||||
+ hashlib.md5((d_string + key_b).encode("utf-8")).hexdigest()[:16]
|
||||
+ d_string
|
||||
)
|
||||
array = None
|
||||
array: str | None = None
|
||||
if operation:
|
||||
array = decode_base64(d_string[:num])
|
||||
num2 = len(array) if operation else len(d_string)
|
||||
num2 = len(array) if (operation and array is not None) else len(d_string)
|
||||
array2 = bytearray(num2)
|
||||
array3 = list(range(256))
|
||||
array4 = list(map(lambda x: 0, range(256)))
|
||||
@@ -74,7 +80,7 @@ def encrypt(
|
||||
num7 = array3[num6]
|
||||
array3[num6] = array3[num5]
|
||||
array3[num5] = num7
|
||||
if operation:
|
||||
if operation and array is not None:
|
||||
array2[k] = ord(array[k]) ^ array3[(array3[num6] + array3[num5]) % 256]
|
||||
else:
|
||||
array2[k] = ord(d_string[k]) ^ array3[(array3[num6] + array3[num5]) % 256]
|
||||
|
||||
23
src/utils/__init__.py
Normal file
23
src/utils/__init__.py
Normal file
@@ -0,0 +1,23 @@
|
||||
"""
|
||||
工具模块:提供加密、编码等辅助函数
|
||||
"""
|
||||
|
||||
from src.utils.crypto import (
|
||||
RSAKeyPair,
|
||||
encrypt_with_rsa,
|
||||
decrypt_with_rsa,
|
||||
sign_with_rsa,
|
||||
verify_rsa_signature,
|
||||
generate_key_pair_files,
|
||||
load_key_pair_from_files,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"RSAKeyPair",
|
||||
"encrypt_with_rsa",
|
||||
"decrypt_with_rsa",
|
||||
"sign_with_rsa",
|
||||
"verify_rsa_signature",
|
||||
"generate_key_pair_files",
|
||||
"load_key_pair_from_files",
|
||||
]
|
||||
337
src/utils/crypto.py
Normal file
337
src/utils/crypto.py
Normal file
@@ -0,0 +1,337 @@
|
||||
"""
|
||||
非对称加密辅助函数模块
|
||||
|
||||
提供 RSA 密钥对生成、加密、解密、签名、验证等功能
|
||||
"""
|
||||
|
||||
import base64
|
||||
from typing import Tuple, Optional
|
||||
|
||||
from cryptography.hazmat.primitives import hashes, serialization
|
||||
from cryptography.hazmat.primitives.asymmetric import rsa, padding
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
|
||||
|
||||
class RSAKeyPair:
|
||||
"""RSA 密钥对管理类"""
|
||||
|
||||
def __init__(self, private_key=None, public_key=None):
|
||||
"""
|
||||
初始化 RSA 密钥对
|
||||
|
||||
Args:
|
||||
private_key: RSA 私钥对象
|
||||
public_key: RSA 公钥对象
|
||||
"""
|
||||
self.private_key = private_key
|
||||
self.public_key = public_key
|
||||
|
||||
@staticmethod
|
||||
def generate(key_size: int = 2048) -> "RSAKeyPair":
|
||||
"""
|
||||
生成新的 RSA 密钥对
|
||||
|
||||
Args:
|
||||
key_size: 密钥大小(2048 或 4096 位),默认 2048
|
||||
|
||||
Returns:
|
||||
RSAKeyPair: 包含私钥和公钥的对象
|
||||
"""
|
||||
private_key = rsa.generate_private_key(
|
||||
public_exponent=65537,
|
||||
key_size=key_size,
|
||||
backend=default_backend(),
|
||||
)
|
||||
public_key = private_key.public_key()
|
||||
return RSAKeyPair(private_key, public_key)
|
||||
|
||||
def get_private_key_pem(self) -> str:
|
||||
"""
|
||||
获取 PEM 格式的私钥字符串
|
||||
|
||||
Returns:
|
||||
str: PEM 格式的私钥(包含 -----BEGIN/END PRIVATE KEY-----)
|
||||
"""
|
||||
if not self.private_key:
|
||||
raise ValueError("私钥未设置")
|
||||
pem = self.private_key.private_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PrivateFormat.PKCS8,
|
||||
encryption_algorithm=serialization.NoEncryption(),
|
||||
)
|
||||
return pem.decode("utf-8")
|
||||
|
||||
def get_public_key_pem(self) -> str:
|
||||
"""
|
||||
获取 PEM 格式的公钥字符串
|
||||
|
||||
Returns:
|
||||
str: PEM 格式的公钥(包含 -----BEGIN/END PUBLIC KEY-----)
|
||||
"""
|
||||
if not self.public_key:
|
||||
raise ValueError("公钥未设置")
|
||||
pem = self.public_key.public_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PublicFormat.SubjectPublicKeyInfo,
|
||||
)
|
||||
return pem.decode("utf-8")
|
||||
|
||||
@staticmethod
|
||||
def load_private_key_from_pem(pem: str) -> "RSAKeyPair":
|
||||
"""
|
||||
从 PEM 格式字符串加载私钥
|
||||
|
||||
Args:
|
||||
pem: PEM 格式的私钥字符串
|
||||
|
||||
Returns:
|
||||
RSAKeyPair: 包含私钥和公钥的对象
|
||||
"""
|
||||
private_key = serialization.load_pem_private_key(
|
||||
pem.encode("utf-8"), password=None, backend=default_backend()
|
||||
)
|
||||
public_key = private_key.public_key()
|
||||
return RSAKeyPair(private_key, public_key)
|
||||
|
||||
@staticmethod
|
||||
def load_public_key_from_pem(pem: str) -> "RSAKeyPair":
|
||||
"""
|
||||
从 PEM 格式字符串加载公钥
|
||||
|
||||
Args:
|
||||
pem: PEM 格式的公钥字符串
|
||||
|
||||
Returns:
|
||||
RSAKeyPair: 包含公钥的对象
|
||||
"""
|
||||
public_key = serialization.load_pem_public_key(
|
||||
pem.encode("utf-8"), backend=default_backend()
|
||||
)
|
||||
return RSAKeyPair(public_key=public_key)
|
||||
|
||||
|
||||
def encrypt_with_rsa(
|
||||
plaintext: str, public_key_pem: str, encoding: str = "base64"
|
||||
) -> str:
|
||||
"""
|
||||
使用 RSA 公钥加密数据(OAEP 填充)
|
||||
|
||||
Args:
|
||||
plaintext: 明文字符串
|
||||
public_key_pem: PEM 格式的公钥字符串
|
||||
encoding: 输出编码方式('base64' 或 'hex'),默认 'base64'
|
||||
|
||||
Returns:
|
||||
str: 加密后的数据(Base64 或 Hex 编码)
|
||||
|
||||
Raises:
|
||||
ValueError: 如果数据过大或编码方式不支持
|
||||
"""
|
||||
key_pair = RSAKeyPair.load_public_key_from_pem(public_key_pem)
|
||||
public_key = key_pair.public_key
|
||||
|
||||
if not public_key:
|
||||
raise ValueError("公钥未设置")
|
||||
|
||||
plaintext_bytes = plaintext.encode("utf-8")
|
||||
|
||||
ciphertext = public_key.encrypt(
|
||||
plaintext_bytes,
|
||||
padding.OAEP(
|
||||
mgf=padding.MGF1(algorithm=hashes.SHA256()),
|
||||
algorithm=hashes.SHA256(),
|
||||
label=None,
|
||||
),
|
||||
)
|
||||
|
||||
if encoding == "base64":
|
||||
return base64.b64encode(ciphertext).decode("utf-8")
|
||||
elif encoding == "hex":
|
||||
return ciphertext.hex()
|
||||
else:
|
||||
raise ValueError(f"不支持的编码方式: {encoding}")
|
||||
|
||||
|
||||
def decrypt_with_rsa(
|
||||
ciphertext: str, private_key_pem: str, encoding: str = "base64"
|
||||
) -> str:
|
||||
"""
|
||||
使用 RSA 私钥解密数据
|
||||
|
||||
Args:
|
||||
ciphertext: 加密数据(Base64 或 Hex 编码)
|
||||
private_key_pem: PEM 格式的私钥字符串
|
||||
encoding: 输入编码方式('base64' 或 'hex'),默认 'base64'
|
||||
|
||||
Returns:
|
||||
str: 解密后的明文字符串
|
||||
|
||||
Raises:
|
||||
ValueError: 如果编码方式不支持或解密失败
|
||||
"""
|
||||
key_pair = RSAKeyPair.load_private_key_from_pem(private_key_pem)
|
||||
private_key = key_pair.private_key
|
||||
|
||||
if not private_key:
|
||||
raise ValueError("私钥未设置")
|
||||
|
||||
# 解码密文
|
||||
if encoding == "base64":
|
||||
ciphertext_bytes = base64.b64decode(ciphertext)
|
||||
elif encoding == "hex":
|
||||
ciphertext_bytes = bytes.fromhex(ciphertext)
|
||||
else:
|
||||
raise ValueError(f"不支持的编码方式: {encoding}")
|
||||
|
||||
plaintext_bytes = private_key.decrypt(
|
||||
ciphertext_bytes,
|
||||
padding.OAEP(
|
||||
mgf=padding.MGF1(algorithm=hashes.SHA256()),
|
||||
algorithm=hashes.SHA256(),
|
||||
label=None,
|
||||
),
|
||||
)
|
||||
|
||||
return plaintext_bytes.decode("utf-8")
|
||||
|
||||
|
||||
def sign_with_rsa(message: str, private_key_pem: str, encoding: str = "base64") -> str:
|
||||
"""
|
||||
使用 RSA 私钥对消息进行数字签名
|
||||
|
||||
Args:
|
||||
message: 要签名的消息
|
||||
private_key_pem: PEM 格式的私钥字符串
|
||||
encoding: 输出编码方式('base64' 或 'hex'),默认 'base64'
|
||||
|
||||
Returns:
|
||||
str: 数字签名(Base64 或 Hex 编码)
|
||||
"""
|
||||
key_pair = RSAKeyPair.load_private_key_from_pem(private_key_pem)
|
||||
private_key = key_pair.private_key
|
||||
|
||||
if not private_key:
|
||||
raise ValueError("私钥未设置")
|
||||
|
||||
message_bytes = message.encode("utf-8")
|
||||
|
||||
signature = private_key.sign(
|
||||
message_bytes,
|
||||
padding.PSS(
|
||||
mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH
|
||||
),
|
||||
hashes.SHA256(),
|
||||
)
|
||||
|
||||
if encoding == "base64":
|
||||
return base64.b64encode(signature).decode("utf-8")
|
||||
elif encoding == "hex":
|
||||
return signature.hex()
|
||||
else:
|
||||
raise ValueError(f"不支持的编码方式: {encoding}")
|
||||
|
||||
|
||||
def verify_rsa_signature(
|
||||
message: str, signature: str, public_key_pem: str, encoding: str = "base64"
|
||||
) -> bool:
|
||||
"""
|
||||
使用 RSA 公钥验证数字签名
|
||||
|
||||
Args:
|
||||
message: 原始消息
|
||||
signature: 数字签名(Base64 或 Hex 编码)
|
||||
public_key_pem: PEM 格式的公钥字符串
|
||||
encoding: 签名的编码方式('base64' 或 'hex'),默认 'base64'
|
||||
|
||||
Returns:
|
||||
bool: 签名有效返回 True,无效返回 False
|
||||
"""
|
||||
try:
|
||||
key_pair = RSAKeyPair.load_public_key_from_pem(public_key_pem)
|
||||
public_key = key_pair.public_key
|
||||
|
||||
if not public_key:
|
||||
raise ValueError("公钥未设置")
|
||||
|
||||
message_bytes = message.encode("utf-8")
|
||||
|
||||
# 解码签名
|
||||
if encoding == "base64":
|
||||
signature_bytes = base64.b64decode(signature)
|
||||
elif encoding == "hex":
|
||||
signature_bytes = bytes.fromhex(signature)
|
||||
else:
|
||||
raise ValueError(f"不支持的编码方式: {encoding}")
|
||||
|
||||
# 验证签名
|
||||
public_key.verify(
|
||||
signature_bytes,
|
||||
message_bytes,
|
||||
padding.PSS(
|
||||
mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH
|
||||
),
|
||||
hashes.SHA256(),
|
||||
)
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def generate_key_pair_files(
|
||||
private_key_path: str, public_key_path: str, key_size: int = 2048
|
||||
) -> Tuple[str, str]:
|
||||
"""
|
||||
生成 RSA 密钥对并保存到文件
|
||||
|
||||
Args:
|
||||
private_key_path: 私钥文件路径
|
||||
public_key_path: 公钥文件路径
|
||||
key_size: 密钥大小(2048 或 4096 位),默认 2048
|
||||
|
||||
Returns:
|
||||
Tuple[str, str]: (私钥PEM字符串, 公钥PEM字符串)
|
||||
"""
|
||||
key_pair = RSAKeyPair.generate(key_size)
|
||||
|
||||
private_pem = key_pair.get_private_key_pem()
|
||||
public_pem = key_pair.get_public_key_pem()
|
||||
|
||||
with open(private_key_path, "w") as f:
|
||||
f.write(private_pem)
|
||||
|
||||
with open(public_key_path, "w") as f:
|
||||
f.write(public_pem)
|
||||
|
||||
return private_pem, public_pem
|
||||
|
||||
|
||||
def load_key_pair_from_files(
|
||||
private_key_path: Optional[str] = None, public_key_path: Optional[str] = None
|
||||
) -> RSAKeyPair:
|
||||
"""
|
||||
从文件加载 RSA 密钥对
|
||||
|
||||
Args:
|
||||
private_key_path: 私钥文件路径(可选)
|
||||
public_key_path: 公钥文件路径(可选)
|
||||
|
||||
Returns:
|
||||
RSAKeyPair: 包含密钥的对象
|
||||
"""
|
||||
private_key = None
|
||||
public_key = None
|
||||
|
||||
if private_key_path:
|
||||
with open(private_key_path, "r") as f:
|
||||
private_pem = f.read()
|
||||
key_pair = RSAKeyPair.load_private_key_from_pem(private_pem)
|
||||
private_key = key_pair.private_key
|
||||
public_key = key_pair.public_key
|
||||
|
||||
if public_key_path:
|
||||
with open(public_key_path, "r") as f:
|
||||
public_pem = f.read()
|
||||
key_pair = RSAKeyPair.load_public_key_from_pem(public_pem)
|
||||
public_key = key_pair.public_key
|
||||
|
||||
return RSAKeyPair(private_key, public_key)
|
||||
126
src/utils/examples.py
Normal file
126
src/utils/examples.py
Normal file
@@ -0,0 +1,126 @@
|
||||
"""
|
||||
非对称加密辅助函数使用示例
|
||||
"""
|
||||
|
||||
from src.utils.crypto import (
|
||||
RSAKeyPair,
|
||||
encrypt_with_rsa,
|
||||
decrypt_with_rsa,
|
||||
sign_with_rsa,
|
||||
verify_rsa_signature,
|
||||
generate_key_pair_files,
|
||||
load_key_pair_from_files,
|
||||
)
|
||||
|
||||
|
||||
def example_generate_keys():
|
||||
"""示例:生成 RSA 密钥对"""
|
||||
print("=== 生成 RSA 密钥对 ===")
|
||||
key_pair = RSAKeyPair.generate(key_size=2048)
|
||||
|
||||
private_pem = key_pair.get_private_key_pem()
|
||||
public_pem = key_pair.get_public_key_pem()
|
||||
|
||||
print("私钥(PEM 格式,前 100 字符):")
|
||||
print(private_pem[:100] + "...")
|
||||
print("\n公钥(PEM 格式,前 100 字符):")
|
||||
print(public_pem[:100] + "...")
|
||||
|
||||
return private_pem, public_pem
|
||||
|
||||
|
||||
def example_encrypt_decrypt(private_pem, public_pem):
|
||||
"""示例:加密和解密"""
|
||||
print("\n=== RSA 加密和解密 ===")
|
||||
|
||||
plaintext = "Hello, World! This is a secret message."
|
||||
print(f"明文: {plaintext}")
|
||||
|
||||
# 加密
|
||||
ciphertext = encrypt_with_rsa(plaintext, public_pem, encoding="base64")
|
||||
print(f"密文(Base64): {ciphertext[:50]}...")
|
||||
|
||||
# 解密
|
||||
decrypted = decrypt_with_rsa(ciphertext, private_pem, encoding="base64")
|
||||
print(f"解密后的明文: {decrypted}")
|
||||
print(f"解密成功: {decrypted == plaintext}")
|
||||
|
||||
return ciphertext
|
||||
|
||||
|
||||
def example_sign_verify(private_pem, public_pem):
|
||||
"""示例:数字签名和验证"""
|
||||
print("\n=== RSA 数字签名和验证 ===")
|
||||
|
||||
message = "This message needs to be signed"
|
||||
print(f"原始消息: {message}")
|
||||
|
||||
# 签名
|
||||
signature = sign_with_rsa(message, private_pem, encoding="base64")
|
||||
print(f"签名(Base64): {signature[:50]}...")
|
||||
|
||||
# 验证签名
|
||||
is_valid = verify_rsa_signature(message, signature, public_pem, encoding="base64")
|
||||
print(f"签名有效: {is_valid}")
|
||||
|
||||
# 尝试验证被篡改的消息
|
||||
tampered_message = "This message has been tampered with"
|
||||
is_tampered_valid = verify_rsa_signature(
|
||||
tampered_message, signature, public_pem, encoding="base64"
|
||||
)
|
||||
print(f"篡改消息的签名有效: {is_tampered_valid}")
|
||||
|
||||
return signature
|
||||
|
||||
|
||||
def example_key_files():
|
||||
"""示例:保存和加载密钥文件"""
|
||||
print("\n=== 密钥文件操作 ===")
|
||||
|
||||
private_path = "/tmp/private_key.pem"
|
||||
public_path = "/tmp/public_key.pem"
|
||||
|
||||
# 生成并保存密钥对
|
||||
private_pem, public_pem = generate_key_pair_files(private_path, public_path)
|
||||
print(f"密钥对已保存到:")
|
||||
print(f" 私钥: {private_path}")
|
||||
print(f" 公钥: {public_path}")
|
||||
|
||||
# 从文件加载密钥对
|
||||
loaded_key_pair = load_key_pair_from_files(
|
||||
private_key_path=private_path, public_key_path=public_path
|
||||
)
|
||||
print(f"密钥对已从文件加载")
|
||||
print(f" 包含私钥: {loaded_key_pair.private_key is not None}")
|
||||
print(f" 包含公钥: {loaded_key_pair.public_key is not None}")
|
||||
|
||||
|
||||
def example_hex_encoding():
|
||||
"""示例:使用 Hex 编码替代 Base64"""
|
||||
print("\n=== Hex 编码示例 ===")
|
||||
|
||||
key_pair = RSAKeyPair.generate(key_size=2048)
|
||||
private_pem = key_pair.get_private_key_pem()
|
||||
public_pem = key_pair.get_public_key_pem()
|
||||
|
||||
plaintext = "Secret data"
|
||||
print(f"明文: {plaintext}")
|
||||
|
||||
# 使用 Hex 编码加密
|
||||
ciphertext_hex = encrypt_with_rsa(plaintext, public_pem, encoding="hex")
|
||||
print(f"密文(Hex): {ciphertext_hex[:50]}...")
|
||||
|
||||
# 使用 Hex 编码解密
|
||||
decrypted = decrypt_with_rsa(ciphertext_hex, private_pem, encoding="hex")
|
||||
print(f"解密后的明文: {decrypted}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# 运行所有示例
|
||||
private_pem, public_pem = example_generate_keys()
|
||||
example_encrypt_decrypt(private_pem, public_pem)
|
||||
example_sign_verify(private_pem, public_pem)
|
||||
example_key_files()
|
||||
example_hex_encoding()
|
||||
|
||||
print("\n✓ 所有示例执行完成!")
|
||||
348
src/utils/integration_examples.py
Normal file
348
src/utils/integration_examples.py
Normal file
@@ -0,0 +1,348 @@
|
||||
"""
|
||||
非对称加密辅助函数在项目中的集成示例
|
||||
"""
|
||||
|
||||
import json
|
||||
from datetime import datetime
|
||||
from typing import Optional, Dict, Any
|
||||
|
||||
from src.utils.crypto import (
|
||||
RSAKeyPair,
|
||||
encrypt_with_rsa,
|
||||
decrypt_with_rsa,
|
||||
sign_with_rsa,
|
||||
verify_rsa_signature,
|
||||
)
|
||||
|
||||
|
||||
class SecureAPIRequest:
|
||||
"""
|
||||
安全的 API 请求签名示例
|
||||
|
||||
用于与第三方 API 进行安全交互,确保请求的真实性和完整性
|
||||
"""
|
||||
|
||||
def __init__(self, private_key_pem: str, app_id: str):
|
||||
"""
|
||||
初始化安全 API 请求
|
||||
|
||||
Args:
|
||||
private_key_pem: 应用的 RSA 私钥(PEM 格式)
|
||||
app_id: 应用 ID
|
||||
"""
|
||||
self.private_key_pem = private_key_pem
|
||||
self.app_id = app_id
|
||||
|
||||
def create_signed_request(
|
||||
self, endpoint: str, data: Dict[str, Any]
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
创建带签名的 API 请求
|
||||
|
||||
Args:
|
||||
endpoint: API 端点路径
|
||||
data: 请求数据
|
||||
|
||||
Returns:
|
||||
Dict: 包含签名的完整请求体
|
||||
"""
|
||||
timestamp = str(int(datetime.now().timestamp()))
|
||||
|
||||
# 构建请求内容
|
||||
request_content = json.dumps(
|
||||
{
|
||||
"app_id": self.app_id,
|
||||
"endpoint": endpoint,
|
||||
"timestamp": timestamp,
|
||||
"data": data,
|
||||
},
|
||||
separators=(",", ":"),
|
||||
)
|
||||
|
||||
# 对请求内容签名
|
||||
signature = sign_with_rsa(request_content, self.private_key_pem)
|
||||
|
||||
return {
|
||||
"request": request_content,
|
||||
"signature": signature,
|
||||
"app_id": self.app_id,
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def verify_request(request_json: str, signature: str, public_key_pem: str) -> bool:
|
||||
"""
|
||||
验证 API 请求签名
|
||||
|
||||
Args:
|
||||
request_json: 请求内容(JSON 字符串)
|
||||
signature: 请求签名
|
||||
public_key_pem: 应用的 RSA 公钥(PEM 格式)
|
||||
|
||||
Returns:
|
||||
bool: 签名是否有效
|
||||
"""
|
||||
return verify_rsa_signature(request_json, signature, public_key_pem)
|
||||
|
||||
|
||||
class SecureDataEncryption:
|
||||
"""
|
||||
敏感数据加密示例
|
||||
|
||||
用于在存储或传输敏感数据时进行加密
|
||||
"""
|
||||
|
||||
def __init__(self, public_key_pem: str, private_key_pem: str):
|
||||
"""
|
||||
初始化数据加密工具
|
||||
|
||||
Args:
|
||||
public_key_pem: 公钥(用于加密)
|
||||
private_key_pem: 私钥(用于解密)
|
||||
"""
|
||||
self.public_key_pem = public_key_pem
|
||||
self.private_key_pem = private_key_pem
|
||||
|
||||
def encrypt_user_data(self, user_id: str, sensitive_data: str) -> Dict[str, str]:
|
||||
"""
|
||||
加密用户敏感数据
|
||||
|
||||
Args:
|
||||
user_id: 用户 ID
|
||||
sensitive_data: 敏感数据(如密码、令牌等)
|
||||
|
||||
Returns:
|
||||
Dict: 包含加密数据和元数据的字典
|
||||
"""
|
||||
# 为了避免超过 RSA 加密限制,先对数据进行摘要处理
|
||||
# 如果数据较大,应使用混合加密方式
|
||||
encrypted = encrypt_with_rsa(sensitive_data, self.public_key_pem)
|
||||
|
||||
return {
|
||||
"user_id": user_id,
|
||||
"encrypted_data": encrypted,
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
"algorithm": "RSA-OAEP",
|
||||
}
|
||||
|
||||
def decrypt_user_data(self, encrypted_data: str) -> str:
|
||||
"""
|
||||
解密用户敏感数据
|
||||
|
||||
Args:
|
||||
encrypted_data: 加密的数据
|
||||
|
||||
Returns:
|
||||
str: 解密后的原始数据
|
||||
"""
|
||||
return decrypt_with_rsa(encrypted_data, self.private_key_pem)
|
||||
|
||||
|
||||
class CertificateAuthority:
|
||||
"""
|
||||
简单的证书颁发机构实现
|
||||
|
||||
用于证书的签发和验证
|
||||
"""
|
||||
|
||||
def __init__(self, ca_private_key_pem: str, ca_public_key_pem: str):
|
||||
"""
|
||||
初始化 CA
|
||||
|
||||
Args:
|
||||
ca_private_key_pem: CA 的私钥
|
||||
ca_public_key_pem: CA 的公钥
|
||||
"""
|
||||
self.ca_private_key_pem = ca_private_key_pem
|
||||
self.ca_public_key_pem = ca_public_key_pem
|
||||
|
||||
def issue_certificate(
|
||||
self, subject: str, subject_public_key_pem: str, validity_days: int = 365
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
颁发证书
|
||||
|
||||
Args:
|
||||
subject: 证书主体(如用户 ID、服务器名称等)
|
||||
subject_public_key_pem: 主体的公钥
|
||||
validity_days: 有效期(天数)
|
||||
|
||||
Returns:
|
||||
Dict: 颁发的证书
|
||||
"""
|
||||
now = datetime.now().isoformat()
|
||||
valid_until = datetime.fromtimestamp(
|
||||
datetime.now().timestamp() + validity_days * 86400
|
||||
).isoformat()
|
||||
|
||||
cert_data = json.dumps(
|
||||
{
|
||||
"subject": subject,
|
||||
"public_key": subject_public_key_pem,
|
||||
"issued_at": now,
|
||||
"valid_until": valid_until,
|
||||
"issuer": "CA",
|
||||
}
|
||||
)
|
||||
|
||||
# CA 对证书进行签名
|
||||
signature = sign_with_rsa(cert_data, self.ca_private_key_pem)
|
||||
|
||||
return {
|
||||
"certificate": cert_data,
|
||||
"signature": signature,
|
||||
"issued_at": now,
|
||||
}
|
||||
|
||||
def verify_certificate(self, cert_data: str, signature: str) -> bool:
|
||||
"""
|
||||
验证证书签名
|
||||
|
||||
Args:
|
||||
cert_data: 证书数据
|
||||
signature: 证书签名
|
||||
|
||||
Returns:
|
||||
bool: 证书是否有效
|
||||
"""
|
||||
return verify_rsa_signature(cert_data, signature, self.ca_public_key_pem)
|
||||
|
||||
|
||||
class AuthenticationToken:
|
||||
"""
|
||||
认证令牌签名示例
|
||||
|
||||
用于生成和验证认证令牌
|
||||
"""
|
||||
|
||||
def __init__(self, private_key_pem: str, public_key_pem: str, issuer: str):
|
||||
"""
|
||||
初始化令牌生成器
|
||||
|
||||
Args:
|
||||
private_key_pem: 私钥(用于签名)
|
||||
public_key_pem: 公钥(用于验证)
|
||||
issuer: 令牌颁发者
|
||||
"""
|
||||
self.private_key_pem = private_key_pem
|
||||
self.public_key_pem = public_key_pem
|
||||
self.issuer = issuer
|
||||
|
||||
def generate_token(
|
||||
self, user_id: str, permissions: list, expires_in_hours: int = 24
|
||||
) -> str:
|
||||
"""
|
||||
生成签名的认证令牌
|
||||
|
||||
Args:
|
||||
user_id: 用户 ID
|
||||
permissions: 权限列表
|
||||
expires_in_hours: 过期时间(小时)
|
||||
|
||||
Returns:
|
||||
str: 签名的令牌
|
||||
"""
|
||||
issued_at = datetime.now().isoformat()
|
||||
expires_at = datetime.fromtimestamp(
|
||||
datetime.now().timestamp() + expires_in_hours * 3600
|
||||
).isoformat()
|
||||
|
||||
token_data = json.dumps(
|
||||
{
|
||||
"user_id": user_id,
|
||||
"permissions": permissions,
|
||||
"issuer": self.issuer,
|
||||
"issued_at": issued_at,
|
||||
"expires_at": expires_at,
|
||||
}
|
||||
)
|
||||
|
||||
# 对令牌签名
|
||||
signature = sign_with_rsa(token_data, self.private_key_pem)
|
||||
|
||||
# 将令牌和签名组合(使用分隔符)
|
||||
return f"{token_data}.{signature}"
|
||||
|
||||
def verify_token(self, token: str) -> Optional[Dict[str, Any]]:
|
||||
"""
|
||||
验证令牌并返回令牌数据
|
||||
|
||||
Args:
|
||||
token: 签名的令牌字符串
|
||||
|
||||
Returns:
|
||||
Optional[Dict]: 令牌数据(如果有效)或 None(如果无效)
|
||||
"""
|
||||
try:
|
||||
token_data, signature = token.rsplit(".", 1)
|
||||
|
||||
# 验证签名
|
||||
if not verify_rsa_signature(token_data, signature, self.public_key_pem):
|
||||
return None
|
||||
|
||||
# 解析令牌数据
|
||||
token_dict = json.loads(token_data)
|
||||
|
||||
# 验证过期时间
|
||||
expires_at = datetime.fromisoformat(token_dict["expires_at"])
|
||||
if datetime.now() > expires_at:
|
||||
return None
|
||||
|
||||
return token_dict
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
# 使用示例
|
||||
if __name__ == "__main__":
|
||||
# 生成密钥对
|
||||
print("=== 生成密钥对 ===")
|
||||
key_pair = RSAKeyPair.generate(key_size=2048)
|
||||
private_pem = key_pair.get_private_key_pem()
|
||||
public_pem = key_pair.get_public_key_pem()
|
||||
|
||||
# 示例 1: 安全 API 请求
|
||||
print("\n=== 安全 API 请求 ===")
|
||||
api_request = SecureAPIRequest(private_pem, app_id="app_001")
|
||||
signed_request = api_request.create_signed_request(
|
||||
endpoint="/api/users/login",
|
||||
data={"username": "user@example.com", "password": "secret"},
|
||||
)
|
||||
print(f"签名请求: {signed_request['signature'][:50]}...")
|
||||
is_valid = SecureAPIRequest.verify_request(
|
||||
signed_request["request"], signed_request["signature"], public_pem
|
||||
)
|
||||
print(f"请求签名有效: {is_valid}")
|
||||
|
||||
# 示例 2: 数据加密
|
||||
print("\n=== 数据加密 ===")
|
||||
data_encryption = SecureDataEncryption(public_pem, private_pem)
|
||||
encrypted = data_encryption.encrypt_user_data(
|
||||
user_id="user_123", sensitive_data="my_secret_token_12345"
|
||||
)
|
||||
print(f"加密数据: {encrypted['encrypted_data'][:50]}...")
|
||||
decrypted = data_encryption.decrypt_user_data(encrypted["encrypted_data"])
|
||||
print(f"解密后: {decrypted}")
|
||||
|
||||
# 示例 3: 证书签发
|
||||
print("\n=== 证书颁发 ===")
|
||||
ca = CertificateAuthority(private_pem, public_pem)
|
||||
certificate = ca.issue_certificate(
|
||||
subject="server.example.com", subject_public_key_pem=public_pem
|
||||
)
|
||||
print(f"证书签名: {certificate['signature'][:50]}...")
|
||||
is_valid = ca.verify_certificate(
|
||||
certificate["certificate"], certificate["signature"]
|
||||
)
|
||||
print(f"证书有效: {is_valid}")
|
||||
|
||||
# 示例 4: 认证令牌
|
||||
print("\n=== 认证令牌 ===")
|
||||
auth_token = AuthenticationToken(private_pem, public_pem, issuer="auth_server")
|
||||
token = auth_token.generate_token(
|
||||
user_id="user_456", permissions=["read", "write"], expires_in_hours=24
|
||||
)
|
||||
print(f"生成的令牌: {token[:50]}...")
|
||||
token_data = auth_token.verify_token(token)
|
||||
print(f"令牌数据: {token_data}")
|
||||
|
||||
print("\n✓ 所有集成示例执行完成!")
|
||||
200
src/utils/test_crypto.py
Normal file
200
src/utils/test_crypto.py
Normal file
@@ -0,0 +1,200 @@
|
||||
"""
|
||||
非对称加密辅助函数单元测试
|
||||
"""
|
||||
|
||||
import unittest
|
||||
|
||||
from src.utils.crypto import (
|
||||
RSAKeyPair,
|
||||
encrypt_with_rsa,
|
||||
decrypt_with_rsa,
|
||||
sign_with_rsa,
|
||||
verify_rsa_signature,
|
||||
)
|
||||
|
||||
|
||||
class TestRSAKeyPair(unittest.TestCase):
|
||||
"""测试 RSAKeyPair 类"""
|
||||
|
||||
def setUp(self):
|
||||
"""测试前准备"""
|
||||
self.key_pair = RSAKeyPair.generate(key_size=2048)
|
||||
|
||||
def test_generate_keys(self):
|
||||
"""测试生成密钥对"""
|
||||
self.assertIsNotNone(self.key_pair.private_key)
|
||||
self.assertIsNotNone(self.key_pair.public_key)
|
||||
|
||||
def test_get_private_key_pem(self):
|
||||
"""测试获取 PEM 格式私钥"""
|
||||
private_pem = self.key_pair.get_private_key_pem()
|
||||
self.assertIn("-----BEGIN PRIVATE KEY-----", private_pem)
|
||||
self.assertIn("-----END PRIVATE KEY-----", private_pem)
|
||||
|
||||
def test_get_public_key_pem(self):
|
||||
"""测试获取 PEM 格式公钥"""
|
||||
public_pem = self.key_pair.get_public_key_pem()
|
||||
self.assertIn("-----BEGIN PUBLIC KEY-----", public_pem)
|
||||
self.assertIn("-----END PUBLIC KEY-----", public_pem)
|
||||
|
||||
def test_load_private_key_from_pem(self):
|
||||
"""测试从 PEM 字符串加载私钥"""
|
||||
private_pem = self.key_pair.get_private_key_pem()
|
||||
loaded_key_pair = RSAKeyPair.load_private_key_from_pem(private_pem)
|
||||
|
||||
self.assertIsNotNone(loaded_key_pair.private_key)
|
||||
self.assertIsNotNone(loaded_key_pair.public_key)
|
||||
|
||||
def test_load_public_key_from_pem(self):
|
||||
"""测试从 PEM 字符串加载公钥"""
|
||||
public_pem = self.key_pair.get_public_key_pem()
|
||||
loaded_key_pair = RSAKeyPair.load_public_key_from_pem(public_pem)
|
||||
|
||||
self.assertIsNone(loaded_key_pair.private_key)
|
||||
self.assertIsNotNone(loaded_key_pair.public_key)
|
||||
|
||||
|
||||
class TestRSAEncryption(unittest.TestCase):
|
||||
"""测试 RSA 加密和解密"""
|
||||
|
||||
def setUp(self):
|
||||
"""测试前准备"""
|
||||
self.key_pair = RSAKeyPair.generate(key_size=2048)
|
||||
self.private_pem = self.key_pair.get_private_key_pem()
|
||||
self.public_pem = self.key_pair.get_public_key_pem()
|
||||
self.plaintext = "Hello, World!"
|
||||
|
||||
def test_encrypt_decrypt_base64(self):
|
||||
"""测试使用 Base64 编码的加密和解密"""
|
||||
ciphertext = encrypt_with_rsa(
|
||||
self.plaintext, self.public_pem, encoding="base64"
|
||||
)
|
||||
decrypted = decrypt_with_rsa(ciphertext, self.private_pem, encoding="base64")
|
||||
|
||||
self.assertEqual(self.plaintext, decrypted)
|
||||
self.assertNotEqual(self.plaintext, ciphertext)
|
||||
|
||||
def test_encrypt_decrypt_hex(self):
|
||||
"""测试使用 Hex 编码的加密和解密"""
|
||||
ciphertext = encrypt_with_rsa(self.plaintext, self.public_pem, encoding="hex")
|
||||
decrypted = decrypt_with_rsa(ciphertext, self.private_pem, encoding="hex")
|
||||
|
||||
self.assertEqual(self.plaintext, decrypted)
|
||||
self.assertNotEqual(self.plaintext, ciphertext)
|
||||
|
||||
def test_encrypt_unicode(self):
|
||||
"""测试加密 Unicode 文本"""
|
||||
unicode_text = "你好,世界!🔐"
|
||||
ciphertext = encrypt_with_rsa(unicode_text, self.public_pem)
|
||||
decrypted = decrypt_with_rsa(ciphertext, self.private_pem)
|
||||
|
||||
self.assertEqual(unicode_text, decrypted)
|
||||
|
||||
def test_encrypt_with_invalid_encoding(self):
|
||||
"""测试使用无效编码方式加密"""
|
||||
with self.assertRaises(ValueError):
|
||||
encrypt_with_rsa(self.plaintext, self.public_pem, encoding="invalid")
|
||||
|
||||
def test_decrypt_with_invalid_encoding(self):
|
||||
"""测试使用无效编码方式解密"""
|
||||
ciphertext = encrypt_with_rsa(
|
||||
self.plaintext, self.public_pem, encoding="base64"
|
||||
)
|
||||
with self.assertRaises(ValueError):
|
||||
decrypt_with_rsa(ciphertext, self.private_pem, encoding="invalid")
|
||||
|
||||
|
||||
class TestRSASignature(unittest.TestCase):
|
||||
"""测试 RSA 数字签名"""
|
||||
|
||||
def setUp(self):
|
||||
"""测试前准备"""
|
||||
self.key_pair = RSAKeyPair.generate(key_size=2048)
|
||||
self.private_pem = self.key_pair.get_private_key_pem()
|
||||
self.public_pem = self.key_pair.get_public_key_pem()
|
||||
self.message = "This is a message to sign"
|
||||
|
||||
def test_sign_verify_base64(self):
|
||||
"""测试使用 Base64 编码的签名和验证"""
|
||||
signature = sign_with_rsa(self.message, self.private_pem, encoding="base64")
|
||||
is_valid = verify_rsa_signature(
|
||||
self.message, signature, self.public_pem, encoding="base64"
|
||||
)
|
||||
|
||||
self.assertTrue(is_valid)
|
||||
|
||||
def test_sign_verify_hex(self):
|
||||
"""测试使用 Hex 编码的签名和验证"""
|
||||
signature = sign_with_rsa(self.message, self.private_pem, encoding="hex")
|
||||
is_valid = verify_rsa_signature(
|
||||
self.message, signature, self.public_pem, encoding="hex"
|
||||
)
|
||||
|
||||
self.assertTrue(is_valid)
|
||||
|
||||
def test_verify_invalid_signature(self):
|
||||
"""测试验证无效签名"""
|
||||
signature = sign_with_rsa(self.message, self.private_pem)
|
||||
# 修改消息
|
||||
tampered_message = "This is a tampered message"
|
||||
is_valid = verify_rsa_signature(
|
||||
tampered_message, signature, self.public_pem, encoding="base64"
|
||||
)
|
||||
|
||||
self.assertFalse(is_valid)
|
||||
|
||||
def test_verify_corrupted_signature(self):
|
||||
"""测试验证损坏的签名"""
|
||||
signature = sign_with_rsa(self.message, self.private_pem)
|
||||
# 修改签名
|
||||
corrupted_signature = signature[:-10] + "corrupted"
|
||||
is_valid = verify_rsa_signature(
|
||||
self.message, corrupted_signature, self.public_pem, encoding="base64"
|
||||
)
|
||||
|
||||
self.assertFalse(is_valid)
|
||||
|
||||
def test_sign_unicode_message(self):
|
||||
"""测试签名 Unicode 消息"""
|
||||
unicode_message = "签名测试消息 🔐"
|
||||
signature = sign_with_rsa(unicode_message, self.private_pem)
|
||||
is_valid = verify_rsa_signature(unicode_message, signature, self.public_pem)
|
||||
|
||||
self.assertTrue(is_valid)
|
||||
|
||||
|
||||
class TestEdgeCases(unittest.TestCase):
|
||||
"""测试边界情况"""
|
||||
|
||||
def setUp(self):
|
||||
"""测试前准备"""
|
||||
self.key_pair = RSAKeyPair.generate(key_size=2048)
|
||||
self.private_pem = self.key_pair.get_private_key_pem()
|
||||
self.public_pem = self.key_pair.get_public_key_pem()
|
||||
|
||||
def test_encrypt_empty_string(self):
|
||||
"""测试加密空字符串"""
|
||||
plaintext = ""
|
||||
ciphertext = encrypt_with_rsa(plaintext, self.public_pem)
|
||||
decrypted = decrypt_with_rsa(ciphertext, self.private_pem)
|
||||
|
||||
self.assertEqual(plaintext, decrypted)
|
||||
|
||||
def test_encrypt_long_message(self):
|
||||
"""测试加密长消息(应该失败,RSA 有长度限制)"""
|
||||
# RSA 2048 位密钥最多能加密约 190 字节
|
||||
long_plaintext = "A" * 300
|
||||
with self.assertRaises(Exception):
|
||||
encrypt_with_rsa(long_plaintext, self.public_pem)
|
||||
|
||||
def test_sign_empty_message(self):
|
||||
"""测试对空消息签名"""
|
||||
message = ""
|
||||
signature = sign_with_rsa(message, self.private_pem)
|
||||
is_valid = verify_rsa_signature(message, signature, self.public_pem)
|
||||
|
||||
self.assertTrue(is_valid)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user