Files
kami_apple_exchage/backend/test_tasks.py
danial 5c486e34d3 docs(项目): 添加项目文档并进行代码调整
- 新增 CODEBUDDY.md、GEMINI.md、GEMINI_CN.md 等项目文档
- 更新 Dockerfile 和其他配置文件
- 优化部分代码结构,如 orders.py、tasks.py 等
- 新增 .dockerignore 文件
2025-09-12 19:38:24 +08:00

60 lines
1.6 KiB
Python

#!/usr/bin/env python3
"""
测试 Celery 任务注册
"""
import sys
from pathlib import Path
# 添加项目根目录到Python路径
project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))
def test_task_registration():
"""测试任务注册"""
try:
print("正在导入 Celery 应用...")
from app.core.celery_app import celery_app
print("正在导入任务模块...")
from app.tasks import crawler_tasks
print("检查已注册的任务...")
registered_tasks = list(celery_app.tasks.keys())
print(f"已注册的任务数量: {len(registered_tasks)}")
print("已注册的任务:")
for task_name in sorted(registered_tasks):
if not task_name.startswith("celery."):
print(f"{task_name}")
# 检查特定任务
target_tasks = [
"app.tasks.crawler_tasks.batch_process_orders",
"app.tasks.crawler_tasks.process_apple_order",
]
print("\n检查目标任务:")
for task_name in target_tasks:
if task_name in celery_app.tasks:
print(f"{task_name} - 已注册")
else:
print(f"{task_name} - 未注册")
return False
print("\n✅ 所有任务都已正确注册!")
return True
except Exception as e:
print(f"❌ 任务注册测试失败: {e}")
import traceback
traceback.print_exc()
return False
if __name__ == "__main__":
success = test_task_registration()
sys.exit(0 if success else 1)