diff --git a/backend/.env b/backend/.env index f552be7..856a358 100644 --- a/backend/.env +++ b/backend/.env @@ -54,15 +54,15 @@ LOG_MAX_SIZE=10485760 LOG_BACKUP_COUNT=5 # OpenTelemetry简化配置 -OTEL_ENABLED=true -OTEL_SERVICE_NAME=apple-exchange-backend +OTEL_ENABLED=false +OTEL_SERVICE_NAME=苹果官网下单 OTEL_SERVICE_VERSION=2.0.0 OTEL_EXPORTER_ENDPOINT=http://38.38.251.113:31547 OTEL_EXPORTER_PROTOCOL=grpc OTEL_EXPORTER_TIMEOUT=30 -OTEL_TRACES_ENABLED=true -OTEL_METRICS_ENABLED=true -OTEL_LOGS_ENABLED=true +OTEL_TRACES_ENABLED=false +OTEL_METRICS_ENABLED=false +OTEL_LOGS_ENABLED=false OTEL_SAMPLER_RATIO=1.0 OTEL_BATCH_SIZE=512 OTEL_EXPORT_INTERVAL=5000 diff --git a/backend/app/api/v1/links.py b/backend/app/api/v1/links.py index a569a37..309ba9b 100644 --- a/backend/app/api/v1/links.py +++ b/backend/app/api/v1/links.py @@ -9,9 +9,12 @@ from app.core.database import get_async_db from app.core.log import get_logger from app.schemas.link import ( LinkCreate, - LinkListResponse, - LinkResponse, LinkStatus, + LinkUpdate, +) +from app.schemas.task import ( + LinkInfo, + PaginatedResponse, ) from app.services.link_service import LinksService @@ -24,10 +27,10 @@ def get_link_service(db: AsyncSession = Depends(get_async_db)) -> LinksService: return LinksService(db) -@router.post("/", response_model=LinkResponse) +@router.post("/", response_model=LinkInfo) async def create_link( link_data: LinkCreate, link_service: LinksService = Depends(get_link_service) -): +) -> LinkInfo: """创建新链接""" try: return await link_service.create_link(link_data) @@ -41,7 +44,7 @@ async def create_link( raise HTTPException(status_code=500, detail="创建链接失败") -@router.get("/list", response_model=LinkListResponse) +@router.get("/list", response_model=PaginatedResponse[LinkInfo]) async def get_links( page: int = Query(1, ge=1, description="页码"), size: int = Query(20, ge=1, le=1000, description="每页大小"), @@ -49,7 +52,7 @@ async def get_links( max_amount: float | None = Query(None, description="最大金额"), url_pattern: str | None = Query(None, description="URL模式"), link_service: LinksService = Depends(get_link_service), -): +) -> PaginatedResponse[LinkInfo]: """获取链接列表""" try: result = await link_service.get_links( @@ -66,10 +69,10 @@ async def get_links( raise HTTPException(status_code=500, detail="获取链接列表失败") -@router.get("/{link_id}", response_model=LinkResponse) +@router.get("/{link_id}", response_model=LinkInfo) async def get_link( link_id: int, link_service: LinksService = Depends(get_link_service) -): +) -> LinkInfo: """获取单个链接详情""" try: link = await link_service.get_link(str(link_id)) @@ -87,7 +90,7 @@ async def get_link( @router.delete("/{link_id}") async def delete_link( link_id: str, link_service: LinksService = Depends(get_link_service) -): +) -> dict[str, str]: """删除链接""" try: success = await link_service.delete_link(link_id) @@ -108,7 +111,7 @@ async def toggle_link_status( link_id: str, status: LinkStatus, link_service: LinksService = Depends(get_link_service), -): +) -> LinkInfo: """切换链接状态""" try: updated_link = await link_service.update_link_status(link_id, status) @@ -122,3 +125,25 @@ async def toggle_link_status( except Exception as e: logger.error(f"切换链接状态失败: {str(e)}", link_id=link_id, exc_info=True) raise HTTPException(status_code=500, detail="切换链接状态失败") + + +@router.patch("/{link_id}/weight") +async def update_link_weight( + link_id: str, + weight: int = Query(..., ge=1, le=100, description="权重值(1-100)"), + link_service: LinksService = Depends(get_link_service), +) -> LinkInfo: + """更新链接权重""" + try: + link_update = LinkUpdate(url=None, amount=None, weight=weight, status=None) + updated_link = await link_service.update_link(link_id, link_update) + if not updated_link: + logger.warning(f"更新链接权重失败 - 链接不存在: {link_id}") + raise HTTPException(status_code=404, detail="链接不存在") + logger.info(f"链接权重更新成功: {link_id} -> {weight}") + return updated_link + except HTTPException: + raise + except Exception as e: + logger.error(f"更新链接权重失败: {str(e)}", link_id=link_id, exc_info=True) + raise HTTPException(status_code=500, detail="更新链接权重失败") diff --git a/backend/app/api/v1/orders.py b/backend/app/api/v1/orders.py index ad06df7..0b0684e 100644 --- a/backend/app/api/v1/orders.py +++ b/backend/app/api/v1/orders.py @@ -12,13 +12,11 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.core.database import get_async_db from app.core.log import get_logger from app.models.orders import OrderStatus -from app.schemas.link import LinkResponse from app.schemas.order import ( OrderDetailResponse, OrderStatsResponse, ) from app.schemas.task import CardInfo, UserInfo, LinkInfo -from app.schemas.user_data import UserInfoResponse from app.services.order_business_service import OrderService router = APIRouter() @@ -56,7 +54,8 @@ async def get_orders( # Convert SQLAlchemy models to Pydantic response schemas result = [] for order in orders: - link_response = LinkResponse( + link_response = LinkInfo( + weight=order.links.weight, id=order.links.id, url=order.links.url, amount=order.links.amount, @@ -79,7 +78,7 @@ async def get_orders( ) ) - user_data = UserInfoResponse( + user_data = UserInfo( id=order.user_data.id, email=order.user_data.email, phone=order.user_data.phone, diff --git a/backend/app/api/v1/user_data.py b/backend/app/api/v1/user_data.py index a551804..f718bf3 100644 --- a/backend/app/api/v1/user_data.py +++ b/backend/app/api/v1/user_data.py @@ -9,10 +9,13 @@ from app.core.database import get_async_db from app.core.log import get_logger from app.schemas.user_data import ( UserDataCreate, - UserDataListResponse, UserDataResponse, UserDataUploadResponse, ) +from app.schemas.task import ( + BulkDeleteUserDataResponse, + PaginatedResponse, +) from app.services.user_data_service import UserDataService logger = get_logger(__name__) @@ -79,6 +82,30 @@ async def get_user_data( raise HTTPException(status_code=500, detail="获取用户数据详情失败") +@router.delete("/all", response_model=BulkDeleteUserDataResponse, summary="批量删除所有用户数据") +async def bulk_delete_all_user_data( + skip_orders: bool = Query(False, description="是否跳过有关联订单的用户数据"), + service: UserDataService = Depends(get_user_data_service), +): + """ + 批量软删除所有用户数据 + + - **skip_orders**: 是否跳过有关联订单的用户数据(默认false,会删除包括有关联订单的所有数据) + + 返回删除统计信息,包括总用户数、删除用户数和跳过用户数 + """ + try: + result = await service.delete_all_user_data(skip_orders=skip_orders) + logger.info( + f"批量删除用户数据完成: total={result['total_users']}, " + f"deleted={result['deleted_users']}, skipped={result['skipped_users']}" + ) + return result + except Exception as e: + logger.error(f"批量删除用户数据失败: {str(e)}", exc_info=True) + raise HTTPException(status_code=500, detail="批量删除用户数据失败") + + @router.delete("/{user_id}", summary="删除用户数据") async def delete_user_data( user_id: str, service: UserDataService = Depends(get_user_data_service) @@ -105,7 +132,7 @@ async def delete_user_data( raise HTTPException(status_code=500, detail="删除用户数据失败") -@router.get("/list", response_model=UserDataListResponse, summary="获取用户数据列表") +@router.get("/list", response_model=PaginatedResponse[UserDataResponse], summary="获取用户数据列表") async def get_user_data_list( page: int = Query(1, ge=1, description="页码"), size: int = Query(20, ge=1, le=100, description="每页大小"), @@ -202,3 +229,5 @@ async def batch_upload_user_data( except Exception as e: logger.error(f"批量上传用户数据失败: {str(e)}", exc_info=True) raise HTTPException(status_code=500, detail="批量上传用户数据失败") + + diff --git a/backend/app/core/database.py b/backend/app/core/database.py index f030745..939f665 100644 --- a/backend/app/core/database.py +++ b/backend/app/core/database.py @@ -13,7 +13,7 @@ from sqlalchemy.ext.asyncio import ( async_sessionmaker, create_async_engine, ) -from sqlalchemy.orm import Session, sessionmaker +from sqlalchemy.orm import Session from sqlalchemy.pool import StaticPool from app.core.config import get_settings diff --git a/backend/app/models/links.py b/backend/app/models/links.py index 4dc5adc..bbd6edd 100644 --- a/backend/app/models/links.py +++ b/backend/app/models/links.py @@ -5,7 +5,7 @@ from typing import TYPE_CHECKING, Any from enum import Enum -from sqlalchemy import Boolean, Float, Index, String +from sqlalchemy import Boolean, Float, Index, Integer, String from sqlalchemy.orm import Mapped, mapped_column, relationship from .base import BaseModel @@ -33,6 +33,7 @@ class Links(BaseModel): url: Mapped[str] = mapped_column(String(255), nullable=False, comment="链接地址") amount: Mapped[float] = mapped_column(Float, nullable=False, comment="金额") + weight: Mapped[int] = mapped_column(Integer, default=1, nullable=False, comment="权重(1-100)") status: Mapped[LinkStatus] = mapped_column( default=LinkStatus.ACTIVE, comment="链接状态" ) diff --git a/backend/app/repositories/link_repository.py b/backend/app/repositories/link_repository.py index f7933f1..944571b 100644 --- a/backend/app/repositories/link_repository.py +++ b/backend/app/repositories/link_repository.py @@ -7,7 +7,7 @@ from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession from app.core.log import get_logger -from app.models.links import Links +from app.models.links import LinkStatus, Links from app.repositories.base_repository import BaseRepository logger = get_logger(__name__) @@ -142,28 +142,55 @@ class LinkRepository(BaseRepository[Links]): self, current_position: int = 0 ) -> tuple[Links, int] | None: """ - 从轮询池中获取下一个链接 + 从轮询池中获取下一个链接(基于权重的轮询算法) Args: - current_position: 当前位置 + current_position: 当前轮询位置 Returns: (链接实例, 新位置) 或 None """ - # 获取所有链接按ID排序(确保顺序一致) - query = select(Links).order_by(Links.id) + # 获取所有激活状态且未软删除的链接,按权重降序排列 + query = select(Links).where( + Links.status == LinkStatus.ACTIVE.value, + Links.is_deleted == False + ).order_by(Links.weight.desc()) result = await self.db_session.execute(query) links = list(result.scalars().all()) if not links: return None - # 计算下一个位置 - next_position = (current_position + 1) % len(links) - next_link = links[next_position] + # 计算总权重 + total_weight = sum(link.weight for link in links) + if total_weight == 0: + return None - logger.info(f"从轮询池获取链接: {next_link.id}, 位置: {next_position}") - return next_link, next_position + # 使用基于权重的轮询算法 + # current_position 在总权重范围内循环,确保权重越高的链接被选中的频率越高 + position_in_weight_cycle = current_position % total_weight + + # 遍历链接,累积权重直到找到应该被选中的链接 + accumulated_weight = 0 + + for link in links: + accumulated_weight += link.weight + if position_in_weight_cycle < accumulated_weight: + # 计算下一个位置,确保在总权重范围内循环 + next_position = (current_position + 1) % total_weight if total_weight > 0 else 0 + logger.info( + f"从权重轮询池获取链接: {link.id}, 权重: {link.weight}, " + f"累积权重: {accumulated_weight}, 位置: {next_position}" + ) + return link, next_position + + # 理论上不应该到达这里,但作为保险措施 + next_position = (current_position + 1) % total_weight if total_weight > 0 else 0 + selected_link = links[0] + logger.info( + f"从权重轮询池获取链接(默认): {selected_link.id}, 权重: {selected_link.weight}, 位置: {next_position}" + ) + return selected_link, next_position async def get_link_by_pool_position(self, position: int) -> Links | None: """ @@ -181,9 +208,14 @@ class LinkRepository(BaseRepository[Links]): async def get_pool_size(self) -> int: """ - 获取轮询池大小 + 获取轮询池大小(仅包含激活状态且未软删除的链接) Returns: 池中链接总数 """ - return await self.count() + query = select(func.count()).where( + Links.status == LinkStatus.ACTIVE.value, + Links.is_deleted == False + ) + result = await self.db_session.execute(query) + return result.scalar() or 0 diff --git a/backend/app/repositories/user_data_repository.py b/backend/app/repositories/user_data_repository.py index 8030f4a..866630c 100644 --- a/backend/app/repositories/user_data_repository.py +++ b/backend/app/repositories/user_data_repository.py @@ -130,3 +130,61 @@ class UserDataRepository(BaseRepository[UserData]): # 创建新用户 new_user = await self.create(**user_data) return new_user, True + + async def bulk_soft_delete(self, skip_orders: bool = False) -> dict[str, any]: + """ + 批量软删除用户数据 + + Args: + skip_orders: 是否跳过有关联订单的用户数据 + + Returns: + 删除统计信息 + """ + try: + # 获取所有未删除的用户数据 + all_users = await self.get_all(include_deleted=False) + + if not all_users: + return { + "total_users": 0, + "deleted_users": 0, + "skipped_users": 0, + "message": "没有找到需要删除的用户数据" + } + + deleted_count = 0 + skipped_count = 0 + user_ids_to_delete = [] + + for user in all_users: + # 检查是否有关联订单 + if skip_orders: + user_with_orders = await self.get_user_with_orders(user.id) + if user_with_orders and user_with_orders.orders: + skipped_count += 1 + continue + + user_ids_to_delete.append(user.id) + deleted_count += 1 + + # 批量执行软删除 + if user_ids_to_delete: + await self.bulk_delete(user_ids_to_delete) + + logger.info( + f"批量软删除用户数据完成: total={len(all_users)}, " + f"deleted={deleted_count}, skipped={skipped_count}" + ) + + return { + "total_users": len(all_users), + "deleted_users": deleted_count, + "skipped_users": skipped_count, + "message": f"成功软删除 {deleted_count} 个用户数据" + + (f",跳过 {skipped_count} 个有关联订单的用户数据" if skipped_count > 0 else "") + } + + except Exception as e: + logger.error(f"批量软删除用户数据失败: {str(e)}", exc_info=True) + raise diff --git a/backend/app/schemas/__init__.py b/backend/app/schemas/__init__.py index 4ecb15c..92846d2 100644 --- a/backend/app/schemas/__init__.py +++ b/backend/app/schemas/__init__.py @@ -1,5 +1,6 @@ """ Pydantic schemas for API request/response models +统一管理的所有 Pydantic 模型 """ from .health import ( @@ -8,39 +9,45 @@ from .health import ( LivenessCheckResponse, ReadinessCheckResponse, ) -from .link import ( - LinkCreate, - LinkListResponse, - LinkPoolResponse, - LinkResponse, - LinkStatsResponse, - LinkUpdate, -) -from .order import ( - OrderDetailResponse, - OrderStatsResponse, - UploadUrlRequest, - UploadUrlResponse, -) from .task import ( BatchProcessRequest, CardInfo, + DeleteAllDataResponse, + GiftCardDetailCreate, + GiftCardInfoCreate, + GiftCardInfoResponse, + GiftCardRequest, GiftCardResponse, + GiftCardSubmissionRequest, + GiftCardSubmissionResponse, + LinkBase, + LinkCreate, LinkInfo, + LinkPoolResponse, + LinkStatsResponse, + LinkStatus, + LinkUpdate, + OrderDetailResponse, + OrderStatsResponse, PaginatedResponse, ProcessOrderRequest, QueueStatsResponse, + TaskControlRequest, + TaskControlResponse, + TaskListResponse, + TaskListItem, + TaskStateResponse, TaskStatusResponse, + UploadUrlRequest, + UploadUrlResponse, UserInfo, - WorkerStatusResponse, -) -from .user_data import ( + UserDataBase, UserDataCreate, - UserDataListResponse, UserDataResponse, UserDataStatsResponse, UserDataUpdate, UserDataUploadResponse, + WorkerStatusResponse, ) __all__ = [ @@ -49,32 +56,45 @@ __all__ = [ "LinkInfo", "CardInfo", "PaginatedResponse", + "LinkStatus", + # User data schemas + "UserDataBase", + "UserDataCreate", + "UserDataUpdate", + "UserDataResponse", + "UserDataUploadResponse", + "UserDataStatsResponse", + # Link schemas + "LinkBase", + "LinkCreate", + "LinkUpdate", + "LinkPoolResponse", + "LinkStatsResponse", # Order schemas "OrderStatsResponse", "OrderDetailResponse", "UploadUrlRequest", "UploadUrlResponse", + # Gift card schemas + "GiftCardRequest", "GiftCardResponse", + "GiftCardSubmissionRequest", + "GiftCardSubmissionResponse", + "GiftCardInfoCreate", + "GiftCardInfoResponse", + "GiftCardDetailCreate", # Task schemas "ProcessOrderRequest", "BatchProcessRequest", "TaskStatusResponse", "WorkerStatusResponse", "QueueStatsResponse", - # Link schemas - "LinkCreate", - "LinkUpdate", - "LinkResponse", - "LinkListResponse", - "LinkPoolResponse", - "LinkStatsResponse", - # User data schemas - "UserDataCreate", - "UserDataUpdate", - "UserDataResponse", - "UserDataListResponse", - "UserDataUploadResponse", - "UserDataStatsResponse", + "TaskControlRequest", + "TaskControlResponse", + "TaskStateResponse", + "TaskListResponse", + "TaskListItem", + "DeleteAllDataResponse", # Health schemas "HealthCheckResponse", "DetailedHealthCheckResponse", diff --git a/backend/app/schemas/gift_card.py b/backend/app/schemas/gift_card.py index 590f857..687d1ed 100644 --- a/backend/app/schemas/gift_card.py +++ b/backend/app/schemas/gift_card.py @@ -21,5 +21,3 @@ __all__ = [ "GiftCardDetailCreate", ] -# 为了向后兼容,保留别名 -GiftCardInfoResponse = CardInfo diff --git a/backend/app/schemas/link.py b/backend/app/schemas/link.py index 102886f..b552c6e 100644 --- a/backend/app/schemas/link.py +++ b/backend/app/schemas/link.py @@ -1,103 +1,27 @@ """ 链接相关的Pydantic模型 +已迁移到 app.schemas.task 模块中统一管理 """ -from datetime import datetime -from enum import Enum +# 从统一schema导入所有链接相关模型 +from app.schemas.task import ( + LinkBase, + LinkCreate, + LinkInfo, + LinkPoolResponse, + LinkStatsResponse, + LinkUpdate, + LinkStatus, + PaginatedResponse, +) -from pydantic import BaseModel, ConfigDict, Field - - -class LinkStatus(str, Enum): - """链接状态枚举""" - - ACTIVE = "active" - INACTIVE = "inactive" - - -class LinkBase(BaseModel): - """链接基础模型""" - - url: str = Field(..., description="链接URL", max_length=255) - amount: float = Field(..., description="金额", gt=0) - status: LinkStatus = Field(LinkStatus.ACTIVE, description="链接状态") - - -class LinkCreate(LinkBase): - """创建链接请求模型""" - - pass - - -class LinkUpdate(BaseModel): - """更新链接请求模型""" - - url: str | None = Field(None, description="链接URL", max_length=255) - amount: float | None = Field(None, description="金额", gt=0) - status: LinkStatus | None = Field(None, description="链接状态") - - -# 从统一schema导入LinkInfo(延迟导入避免循环导入) -try: - from app.schemas.task import LinkInfo - - # 为了向后兼容,保留别名 - LinkResponse = LinkInfo -except ImportError: - # 如果导入失败,保持原有的LinkResponse - class LinkResponse(LinkBase): - """链接响应模型""" - - id: str = Field(..., description="链接ID") - created_at: str = Field(..., description="创建时间") - updated_at: str = Field(..., description="更新时间") - - model_config = ConfigDict(from_attributes=True) - - @classmethod - def from_orm(cls, obj): - """Custom ORM conversion to handle datetime serialization""" - data = {} - for field in cls.model_fields: - value = getattr(obj, field, None) - if isinstance(value, datetime): - data[field] = value.isoformat() - else: - data[field] = value - - return cls(**data) - - -# 从统一schema导入PaginatedResponse(延迟导入避免循环导入) -try: - from app.schemas.task import PaginatedResponse - - # 为了向后兼容,保留别名 - LinkListResponse = PaginatedResponse[LinkResponse] -except ImportError: - # 如果导入失败,保持原有的LinkListResponse - class LinkListResponse(BaseModel): - """链接列表响应模型""" - - items: list[LinkResponse] - total: int - page: int - size: int - pages: int - - -class LinkPoolResponse(BaseModel): - """轮询池响应模型""" - - link: LinkResponse - pool_position: int = Field(..., description="在轮询池中的位置") - - -class LinkStatsResponse(BaseModel): - """链接统计响应模型""" - - total_links: int = Field(..., description="总链接数") - total_orders: int = Field(..., description="总订单数") - average_amount: float = Field(..., description="平均金额") - min_amount: float = Field(..., description="最小金额") - max_amount: float = Field(..., description="最大金额") +__all__ = [ + "LinkBase", + "LinkCreate", + "LinkInfo", + "LinkPoolResponse", + "LinkStatsResponse", + "LinkUpdate", + "LinkStatus", + "PaginatedResponse", +] diff --git a/backend/app/schemas/order.py b/backend/app/schemas/order.py index 7dcf9ad..899d443 100644 --- a/backend/app/schemas/order.py +++ b/backend/app/schemas/order.py @@ -1,56 +1,20 @@ """ 订单相关的Pydantic模型 +已迁移到 app.schemas.task 模块中统一管理 """ -from pydantic import BaseModel, ConfigDict, Field +# 从统一schema导入所有订单相关模型 +from app.schemas.task import ( + OrderDetailResponse, + OrderStatsResponse, + UploadUrlRequest, + UploadUrlResponse, + UserInfo, +) -from app.models.orders import OrderStatus -from app.schemas.task import CardInfo, LinkInfo, UserInfo - - -class OrderStatsResponse(BaseModel): - """订单统计响应""" - - total: int - pending: int - processing: int - success: int - failed: int - last_update: str - - -class OrderDetailResponse(BaseModel): - """订单详情响应 - 与数据库结构完全一致""" - - id: str = Field(..., description="订单ID") - status: OrderStatus = Field(..., description="订单状态") - created_at: str = Field(..., description="创建时间") - updated_at: str = Field(..., description="更新时间") - final_order_url: str | None = Field(None, description="最终订单URL") - final_order_id: str | None = Field(None, description="最终苹果订单ID") - failure_reason: str | None = Field(None, description="失败原因") - user_data_id: str = Field(..., description="用户数据ID") - links_id: str = Field(..., description="链接ID") - - # 关联关系 - user_data: UserInfo = Field(description="用户数据") - links: LinkInfo = Field(description="链接信息") - gift_cards: list[CardInfo] = Field(default_factory=list, description="礼品卡列表") - - model_config = ConfigDict(from_attributes=True) - - -class UploadUrlRequest(BaseModel): - """上传URL请求""" - - url: str = Field(..., min_length=1, description="上传URL") - thread_id: str | None = Field(None, description="线程ID") - - -class UploadUrlResponse(BaseModel): - """上传URL响应""" - - success: bool - message: str - upload_config_id: str - url: str +__all__ = [ + "OrderDetailResponse", + "OrderStatsResponse", + "UploadUrlRequest", + "UploadUrlResponse", +] diff --git a/backend/app/schemas/task.py b/backend/app/schemas/task.py index 5875f01..43c9214 100644 --- a/backend/app/schemas/task.py +++ b/backend/app/schemas/task.py @@ -4,14 +4,21 @@ """ from datetime import datetime +from enum import Enum from typing import Any, Generic, List, TypeVar -from pydantic import BaseModel, ConfigDict, Field +from pydantic import BaseModel, ConfigDict, Field, field_validator from app.enums.task import OrderTaskStatus from app.models.orders import OrderStatus from app.models.giftcards import GiftCardStatus -from app.models.links import LinkStatus + + +class LinkStatus(str, Enum): + """链接状态枚举""" + + ACTIVE = "active" + INACTIVE = "inactive" T = TypeVar("T") @@ -101,8 +108,6 @@ class UserInfo(BaseModel): model_config = ConfigDict(from_attributes=True) -# 为了向后兼容,保留 TaskUserInfo 别名 -TaskUserInfo = UserInfo class LinkInfo(BaseModel): @@ -111,6 +116,7 @@ class LinkInfo(BaseModel): id: str = Field(..., description="链接ID") url: str = Field(..., description="链接地址") amount: float = Field(..., description="金额") + weight: int = Field(..., description="权重(1-100)") status: LinkStatus = Field(..., description="链接状态") created_at: str = Field(description="创建时间") updated_at: str = Field(description="更新时间") @@ -118,8 +124,6 @@ class LinkInfo(BaseModel): model_config = ConfigDict(from_attributes=True) -# 为了向后兼容,保留 TaskLinkInfo 别名 -TaskLinkInfo = LinkInfo class CardInfo(BaseModel): @@ -137,8 +141,6 @@ class CardInfo(BaseModel): model_config = ConfigDict(from_attributes=True) -# 为了向后兼容,保留 TaskCardInfo 别名 -TaskCardInfo = CardInfo class TaskListItem(BaseModel): @@ -268,3 +270,173 @@ class PaginatedResponse(BaseModel, Generic[T]): pages: int = Field(..., description="总页数") model_config = ConfigDict(from_attributes=True, arbitrary_types_allowed=True) + + +# 用户数据相关模型 +class UserDataBase(BaseModel): + """用户数据基础模型""" + + first_name: str = Field(..., description="名字", max_length=255) + last_name: str = Field(..., description="姓氏", max_length=255) + email: str = Field(..., description="邮箱", max_length=255) + phone: str = Field(..., description="电话", max_length=50) + street_address: str = Field(..., description="街道地址", max_length=500) + city: str = Field(..., description="城市", max_length=255) + state: str = Field(..., description="州/省", max_length=255) + zip_code: str = Field(..., description="邮编", max_length=20) + + @field_validator("email") + @classmethod + def validate_email(cls, v): + """验证邮箱格式""" + import re + + pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$" + if not re.match(pattern, v): + raise ValueError("邮箱格式不正确") + return v + + +class UserDataCreate(UserDataBase): + """创建用户数据请求模型""" + pass + + +class UserDataUpdate(BaseModel): + """更新用户数据请求模型""" + + first_name: str | None = Field(None, description="名字", max_length=255) + last_name: str | None = Field(None, description="姓氏", max_length=255) + email: str | None = Field(None, description="邮箱", max_length=255) + phone: str | None = Field(None, description="电话", max_length=50) + street_address: str | None = Field(None, description="街道地址", max_length=500) + city: str | None = Field(None, description="城市", max_length=255) + state: str | None = Field(None, description="州/省", max_length=255) + zip_code: str | None = Field(None, description="邮编", max_length=20) + + +class UserDataResponse(UserDataBase): + """用户数据响应模型""" + + id: str = Field(..., description="用户数据ID") + created_at: str = Field(..., description="创建时间") + updated_at: str = Field(..., description="更新时间") + + model_config = ConfigDict(from_attributes=True) + + +class UserDataUploadResponse(BaseModel): + """用户数据上传响应模型""" + + user_data: UserDataResponse + message: str = Field(..., description="响应消息") + + +class UserDataStatsResponse(BaseModel): + """用户数据统计响应模型""" + + total_users: int = Field(..., description="总用户数") + total_orders: int = Field(..., description="总订单数") + recent_uploads: int = Field(..., description="最近上传数量") + success_rate: float = Field(..., description="成功率") + + +class BulkDeleteUserDataResponse(BaseModel): + """批量删除用户数据响应模型""" + + total_users: int = Field(..., description="总用户数") + deleted_users: int = Field(..., description="已删除用户数") + skipped_users: int = Field(..., description="跳过的用户数") + message: str = Field(..., description="响应消息") + + +# 链接相关模型 +class LinkBase(BaseModel): + """链接基础模型""" + + url: str = Field(..., description="链接URL", max_length=255) + amount: float = Field(..., description="金额", gt=0) + weight: int = Field(1, description="权重(1-100)", ge=1, le=100) + status: LinkStatus = Field(LinkStatus.ACTIVE, description="链接状态") + + +class LinkCreate(LinkBase): + """创建链接请求模型""" + pass + + +class LinkUpdate(BaseModel): + """更新链接请求模型""" + + url: str | None = Field(None, description="链接URL", max_length=255) + amount: float | None = Field(None, description="金额", gt=0) + weight: int | None = Field(None, description="权重(1-100)", ge=1, le=100) + status: LinkStatus | None = Field(None, description="链接状态") + + model_config = ConfigDict(use_enum_values=True) + + +class LinkPoolResponse(BaseModel): + """轮询池响应模型""" + + link: LinkInfo + pool_position: int = Field(..., description="在轮询池中的位置") + + +class LinkStatsResponse(BaseModel): + """链接统计响应模型""" + + total_links: int = Field(..., description="总链接数") + total_orders: int = Field(..., description="总订单数") + average_amount: float = Field(..., description="平均金额") + min_amount: float = Field(..., description="最小金额") + max_amount: float = Field(..., description="最大金额") + + +# 订单相关模型 +class OrderStatsResponse(BaseModel): + """订单统计响应""" + + total: int + pending: int + processing: int + success: int + failed: int + last_update: str + + +class OrderDetailResponse(BaseModel): + """订单详情响应 - 与数据库结构完全一致""" + + id: str = Field(..., description="订单ID") + status: OrderStatus = Field(..., description="订单状态") + created_at: str = Field(..., description="创建时间") + updated_at: str = Field(..., description="更新时间") + final_order_url: str | None = Field(None, description="最终订单URL") + final_order_id: str | None = Field(None, description="最终苹果订单ID") + failure_reason: str | None = Field(None, description="失败原因") + user_data_id: str = Field(..., description="用户数据ID") + links_id: str = Field(..., description="链接ID") + + # 关联关系 + user_data: UserInfo = Field(description="用户数据") + links: LinkInfo = Field(description="链接信息") + gift_cards: list[CardInfo] = Field(default_factory=list, description="礼品卡列表") + + model_config = ConfigDict(from_attributes=True) + + +class UploadUrlRequest(BaseModel): + """上传URL请求""" + + url: str = Field(..., min_length=1, description="上传URL") + thread_id: str | None = Field(None, description="线程ID") + + +class UploadUrlResponse(BaseModel): + """上传URL响应""" + + success: bool + message: str + upload_config_id: str + url: str diff --git a/backend/app/schemas/user_data.py b/backend/app/schemas/user_data.py index 4b97a1b..feaea9d 100644 --- a/backend/app/schemas/user_data.py +++ b/backend/app/schemas/user_data.py @@ -1,104 +1,27 @@ """ 用户数据相关的Pydantic模型 +已迁移到 app.schemas.task 模块中统一管理 """ -from datetime import datetime +# 从统一schema导入所有用户数据相关模型 +from app.schemas.task import ( + UserDataBase, + UserDataCreate, + UserDataUpdate, + UserDataResponse, + UserDataUploadResponse, + UserDataStatsResponse, + PaginatedResponse, + UserInfo, +) -from pydantic import BaseModel, ConfigDict, Field, field_validator - - -class UserDataBase(BaseModel): - """用户数据基础模型""" - - first_name: str = Field(..., description="名字", max_length=255) - last_name: str = Field(..., description="姓氏", max_length=255) - email: str = Field(..., description="邮箱", max_length=255) - phone: str = Field(..., description="电话", max_length=50) - street_address: str = Field(..., description="街道地址", max_length=500) - city: str = Field(..., description="城市", max_length=255) - state: str = Field(..., description="州/省", max_length=255) - zip_code: str = Field(..., description="邮编", max_length=20) - - @field_validator("email") - @classmethod - def validate_email(cls, v): - """验证邮箱格式""" - import re - - pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$" - if not re.match(pattern, v): - raise ValueError("邮箱格式不正确") - return v - - -class UserDataCreate(UserDataBase): - """创建用户数据请求模型""" - - pass - - -class UserDataUpdate(BaseModel): - """更新用户数据请求模型""" - - first_name: str | None = Field(None, description="名字", max_length=255) - last_name: str | None = Field(None, description="姓氏", max_length=255) - email: str | None = Field(None, description="邮箱", max_length=255) - phone: str | None = Field(None, description="电话", max_length=50) - street_address: str | None = Field(None, description="街道地址", max_length=500) - city: str | None = Field(None, description="城市", max_length=255) - state: str | None = Field(None, description="州/省", max_length=255) - zip_code: str | None = Field(None, description="邮编", max_length=20) - - -class UserDataResponse(UserDataBase): - """用户数据响应模型""" - - id: str = Field(..., description="用户数据ID") - created_at: str = Field(..., description="创建时间") - updated_at: str = Field(..., description="更新时间") - - model_config = ConfigDict(from_attributes=True) - - -# 从统一schema导入PaginatedResponse -from app.schemas.task import PaginatedResponse - -# 为了向后兼容,保留别名 -UserDataListResponse = PaginatedResponse[UserDataResponse] - - -class UserDataUploadResponse(BaseModel): - """用户数据上传响应模型""" - - user_data: UserDataResponse - message: str = Field(..., description="响应消息") - - -class UserDataResponse(UserDataBase): - """用户数据响应模型""" - - id: str = Field(..., description="用户数据ID") - created_at: str = Field(..., description="创建时间") - updated_at: str = Field(..., description="更新时间") - - model_config = ConfigDict(from_attributes=True) - - -class UserDataStatsResponse(BaseModel): - """用户数据统计响应模型""" - - total_users: int = Field(..., description="总用户数") - total_orders: int = Field(..., description="总订单数") - recent_uploads: int = Field(..., description="最近上传数量") - success_rate: float = Field(..., description="成功率") - - -# 从统一schema导入UserInfo(延迟导入避免循环导入) -try: - from app.schemas.task import UserInfo - - # 为了向后兼容,保留别名 - UserInfoResponse = UserInfo -except ImportError: - # 如果导入失败,定义一个临时的UserInfoResponse - UserInfoResponse = UserDataResponse +__all__ = [ + "UserDataBase", + "UserDataCreate", + "UserDataUpdate", + "UserDataResponse", + "UserDataUploadResponse", + "UserDataStatsResponse", + "PaginatedResponse", + "UserInfo", +] diff --git a/backend/app/services/link_service.py b/backend/app/services/link_service.py index 8bb3925..8e07610 100644 --- a/backend/app/services/link_service.py +++ b/backend/app/services/link_service.py @@ -16,13 +16,15 @@ from app.models.links import Links from app.repositories.repository_factory import RepositoryFactory from app.schemas.link import ( LinkCreate, - LinkListResponse, LinkPoolResponse, - LinkResponse, LinkStatsResponse, LinkUpdate, LinkStatus, ) +from app.schemas.task import ( + LinkInfo, + PaginatedResponse, +) logger = get_logger(__name__) @@ -39,7 +41,7 @@ class LinksService: self.repo_factory = RepositoryFactory(db) # 注意:这里不再直接获取redis客户端,而是在需要时调用get_redis() - async def create_link(self, link_data: LinkCreate) -> LinkResponse: + async def create_link(self, link_data: LinkCreate) -> LinkInfo: """ 创建新链接 @@ -56,7 +58,7 @@ class LinksService: # 创建链接 link = await self.repo_factory.links.create( - url=link_data.url, amount=link_data.amount + url=link_data.url, amount=link_data.amount, weight=link_data.weight ) logger.info(f"创建链接成功: {link.id}") @@ -64,7 +66,7 @@ class LinksService: async def update_link_status( self, link_id: str, status: LinkStatus - ) -> LinkResponse | None: + ) -> LinkInfo | None: """ 更新链接状态 @@ -83,7 +85,7 @@ class LinksService: return self._convert_to_response(updated_link) return None - async def get_link(self, link_id: str) -> LinkResponse | None: + async def get_link(self, link_id: str) -> LinkInfo | None: """ 获取单个链接 @@ -100,7 +102,7 @@ class LinksService: async def update_link( self, link_id: str, link_data: LinkUpdate - ) -> LinkResponse | None: + ) -> LinkInfo | None: """ 更新链接 @@ -164,7 +166,7 @@ class LinksService: min_amount: float | None = None, max_amount: float | None = None, url_pattern: str | None = None, - ) -> LinkListResponse: + ) -> PaginatedResponse[LinkInfo]: """ 获取链接列表 @@ -198,7 +200,7 @@ class LinksService: total = result.total pages = result.pages - return LinkListResponse( + return PaginatedResponse[LinkInfo]( items=[self._convert_to_response(link) for link in links], total=total, page=page, @@ -215,7 +217,7 @@ class LinksService: page_links = sorted(page_links, key=lambda x: x.created_at, reverse=True) - return LinkListResponse( + return PaginatedResponse[LinkInfo]( items=[self._convert_to_response(link) for link in page_links], total=total, page=page, @@ -297,7 +299,7 @@ class LinksService: pool_size = await self.repo_factory.links.get_pool_size() await redis_client.set(self.POOL_SIZE_KEY, str(pool_size)) - def _convert_to_response(self, link: Links) -> LinkResponse: + def _convert_to_response(self, link: Links) -> LinkInfo: """ 将链接模型转换为响应模型 @@ -307,10 +309,11 @@ class LinksService: Returns: 链接响应模型 """ - return LinkResponse( + return LinkInfo( id=link.id, url=link.url, amount=link.amount, + weight=link.weight, status=link.status, created_at=link.created_at.isoformat(), updated_at=link.updated_at.isoformat(), diff --git a/backend/app/services/task_service.py b/backend/app/services/task_service.py index a848fbe..3de4895 100644 --- a/backend/app/services/task_service.py +++ b/backend/app/services/task_service.py @@ -14,14 +14,14 @@ from app.core.state_manager import StateType, TaskState, task_state_manager from app.enums.task import OrderTaskStatus from app.repositories.task_repository import TaskRepository from app.schemas.task import ( + CardInfo, GiftCardSubmissionRequest, GiftCardSubmissionResponse, - TaskCardInfo, - TaskLinkInfo, + GiftCardDetailCreate, + LinkInfo, TaskListItem, TaskListResponse, - TaskUserInfo, - GiftCardDetailCreate, + UserInfo, ) from app.services.gift_card_service import GiftCardService @@ -132,7 +132,7 @@ class TaskService: user_info = None if order.user_data: user_data = order.user_data - user_info = TaskUserInfo( + user_info = UserInfo( id=user_data.id, first_name=user_data.first_name, last_name=user_data.last_name, @@ -150,7 +150,8 @@ class TaskService: link_info = None if order.links: link = order.links - link_info = TaskLinkInfo( + link_info = LinkInfo( + weight=link.weight, id=link.id, url=link.url, amount=link.amount, @@ -170,7 +171,7 @@ class TaskService: if gift_card_list: card_info = [] for gift_card in gift_card_list: - card_info_item = TaskCardInfo( + card_info_item = CardInfo( id=gift_card.id, card_code=gift_card.card_code, card_value=gift_card.card_value, diff --git a/backend/app/services/user_data_service.py b/backend/app/services/user_data_service.py index c24bf9a..3d62271 100644 --- a/backend/app/services/user_data_service.py +++ b/backend/app/services/user_data_service.py @@ -16,12 +16,14 @@ from app.repositories.repository_factory import RepositoryFactory from app.schemas.user_data import ( UserDataBase, UserDataCreate, - UserDataListResponse, UserDataResponse, UserDataStatsResponse, UserDataUpdate, UserDataUploadResponse, - UserInfoResponse, +) +from app.schemas.task import ( + PaginatedResponse, + UserInfo, ) @@ -93,7 +95,7 @@ class UserDataService: return None return self._convert_to_response(user) - async def get_user_info(self, user_id: str) -> UserInfoResponse | None: + async def get_user_info(self, user_id: str) -> UserInfo | None: """ 获取用户完整信息(包含所有数据库字段) @@ -177,7 +179,7 @@ class UserDataService: state: str | None = None, country: str | None = None, name_pattern: str | None = None, - ) -> UserDataListResponse: + ) -> PaginatedResponse[UserDataResponse]: """ 获取用户数据列表 @@ -213,7 +215,7 @@ class UserDataService: total = result.total pages = result.pages - return UserDataListResponse( + return PaginatedResponse[UserDataResponse]( items=[self._convert_to_response(user) for user in users], total=total, page=page, @@ -228,7 +230,7 @@ class UserDataService: page_users = users[start_idx:end_idx] pages = (total + size - 1) // size - return UserDataListResponse( + return PaginatedResponse[UserDataResponse]( items=[self._convert_to_response(user) for user in page_users], total=total, page=page, @@ -282,7 +284,7 @@ class UserDataService: updated_at=user.updated_at.isoformat(), ) - def _convert_to_info_response(self, user: UserData) -> UserInfoResponse: + def _convert_to_info_response(self, user: UserData) -> UserInfo: """ 将用户数据模型转换为完整信息响应模型 @@ -292,7 +294,7 @@ class UserDataService: Returns: 用户完整信息响应模型 """ - return UserInfoResponse( + return UserInfo( id=user.id, first_name=user.first_name, last_name=user.last_name, @@ -307,3 +309,15 @@ class UserDataService: full_name=user.full_name, full_address=user.full_address, ) + + async def delete_all_user_data(self, skip_orders: bool = False) -> dict[str, Any]: + """ + 软删除所有用户数据 + + Args: + skip_orders: 是否跳过有关联订单的用户数据 + + Returns: + 删除统计信息 + """ + return await self.repo_factory.user_data.bulk_soft_delete(skip_orders=skip_orders) diff --git a/backend/app/tasks/arq_tasks.py b/backend/app/tasks/arq_tasks.py index d7a0ea5..f4c9696 100644 --- a/backend/app/tasks/arq_tasks.py +++ b/backend/app/tasks/arq_tasks.py @@ -70,6 +70,43 @@ async def _process_apple_order_async( "order_id": order_id, } + # 检查关联的用户数据是否已被软删除 + async with db_manager.get_async_session() as session: + order_repo = OrderRepository(session) + order = await order_repo.get_by_id(order_id, relations=["user_data"]) + + if not order: + logger.error(f"订单不存在: {order_id}") + await task_state_manager.fail_task( + task_id, order_id, f"订单 {order_id} 不存在" + ) + return { + "success": False, + "error": f"订单 {order_id} 不存在", + "order_id": order_id, + } + + # 检查用户数据是否已被软删除 + if order.user_data.is_deleted: + logger.warning(f"用户数据已被软删除,终止订单处理: {order_id}, user_data_id={order.user_data_id}") + await task_state_manager.fail_task( + task_id, order_id, f"用户数据 {order.user_data_id} 已被删除" + ) + + # 更新订单状态为失败 + await order_repo.update_by_id( + order_id, + status=OrderStatus.FAILURE, + failure_reason=f"用户数据 {order.user_data_id} 已被删除", + completed_at=datetime.now(), + ) + + return { + "success": False, + "error": f"用户数据 {order.user_data_id} 已被删除", + "order_id": order_id, + } + # 获取分布式锁 lock_key = f"apple_order_processing:{order_id}" lock = get_lock( diff --git a/frontend/.hintrc b/frontend/.hintrc new file mode 100644 index 0000000..ddfb452 --- /dev/null +++ b/frontend/.hintrc @@ -0,0 +1,20 @@ +{ + "extends": [ + "development" + ], + "hints": { + "axe/forms": [ + "default", + { + "label": "off" + } + ], + "axe/name-role-value": [ + "default", + { + "button-name": "off" + } + ], + "button-type": "off" + } +} \ No newline at end of file diff --git a/frontend/src/components/dashboard/link-item.tsx b/frontend/src/components/dashboard/link-item.tsx index dfef6ff..af1dae4 100644 --- a/frontend/src/components/dashboard/link-item.tsx +++ b/frontend/src/components/dashboard/link-item.tsx @@ -1,6 +1,8 @@ "use client"; -import { Trash2, ExternalLink, Calendar, DollarSign, Copy, Loader2, Pause, Play } from "lucide-react"; +import { useState } from "react"; + +import { Trash2, ExternalLink, Calendar, DollarSign, Copy, Loader2, Pause, Play, Edit3, Check, X } from "lucide-react"; import { toast } from "sonner"; // 导入 Tooltip 组件 import { Tooltip, TooltipContent, TooltipProvider, TooltipTrigger } from "@/components/animate-ui/base/tooltip"; @@ -11,11 +13,15 @@ interface LinkItemProps { link: LinkInfo; onDelete: (linkId: string) => void; onToggleStatus?: (linkId: string) => void; + onUpdateWeight?: (linkId: string, weight: number) => void; isDeleting?: boolean; isTogglingStatus?: boolean; + isUpdatingWeight?: boolean; } -export function LinkItem({ link, onDelete, onToggleStatus, isDeleting = false, isTogglingStatus = false }: LinkItemProps) { +export function LinkItem({ link, onDelete, onToggleStatus, onUpdateWeight, isDeleting = false, isTogglingStatus = false, isUpdatingWeight = false }: LinkItemProps) { + const [isEditingWeight, setIsEditingWeight] = useState(false); + const [tempWeight, setTempWeight] = useState(link.weight); // 截断URL显示 const truncateUrl = (url: string, maxLength: number = 30) => { if (url.length <= maxLength) return url; @@ -92,6 +98,31 @@ export function LinkItem({ link, onDelete, onToggleStatus, isDeleting = false, i } }; + // 处理权重编辑 + const handleWeightEdit = () => { + setTempWeight(link.weight); + setIsEditingWeight(true); + }; + + // 保存权重 + const handleSaveWeight = async () => { + if (onUpdateWeight) { + try { + await onUpdateWeight(link.id, tempWeight); + setIsEditingWeight(false); + toast.success("权重更新成功"); + } catch { + toast.error("权重更新失败"); + } + } + }; + + // 取消权重编辑 + const handleCancelWeightEdit = () => { + setTempWeight(link.weight); + setIsEditingWeight(false); + }; + const statusInfo = getStatusInfo(link.status); const StatusIcon = statusInfo.icon; @@ -130,6 +161,54 @@ export function LinkItem({ link, onDelete, onToggleStatus, isDeleting = false, i {/* 第二行:金额、创建时间和操作按钮 */}
+ 权重范围0-100,用于控制链接的优先级 +
+{task.error_message}
-{task.error_message}
+{item.first_name} {item.last_name} {item.street_address} {item.city} {item.state} {item.zip_code} {item.email} {item.phone}