Files
kami_apple_exchage/backend/test_tasks.py
danial 48cdcb6140 feat: Add deployment scripts and configuration for Apple Gift Card Exchange Platform
- Create README.md for deployment instructions including environment requirements and setup steps.
- Implement deploy.sh script for automated deployment of development and production environments.
- Add combined Docker Compose configuration for frontend and backend services.
- Include Redis configuration file for optimized memory management and persistence.
- Update frontend Dockerfile to handle Next.js asset paths and static files.
- Remove obsolete deployment files and configurations from frontend directory.
2025-09-11 17:57:18 +08:00

57 lines
1.6 KiB
Python

#!/usr/bin/env python3
"""
测试 Celery 任务注册
"""
import sys
from pathlib import Path
# 添加项目根目录到Python路径
project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))
def test_task_registration():
"""测试任务注册"""
try:
print("正在导入 Celery 应用...")
from app.core.celery_app import celery_app
print("正在导入任务模块...")
from app.tasks import crawler_tasks
print("检查已注册的任务...")
registered_tasks = list(celery_app.tasks.keys())
print(f"已注册的任务数量: {len(registered_tasks)}")
print("已注册的任务:")
for task_name in sorted(registered_tasks):
if not task_name.startswith('celery.'):
print(f"{task_name}")
# 检查特定任务
target_tasks = [
'app.tasks.crawler_tasks.batch_process_orders',
'app.tasks.crawler_tasks.process_apple_order'
]
print("\n检查目标任务:")
for task_name in target_tasks:
if task_name in celery_app.tasks:
print(f"{task_name} - 已注册")
else:
print(f"{task_name} - 未注册")
return False
print("\n✅ 所有任务都已正确注册!")
return True
except Exception as e:
print(f"❌ 任务注册测试失败: {e}")
import traceback
traceback.print_exc()
return False
if __name__ == "__main__":
success = test_task_registration()
sys.exit(0 if success else 1)