Files
kami_walmart_slide/logger.py
danial 4f2b08ffd1 重构代理池实现,优化模块导入和结构
- 在 app.py 中更新代理池工厂的导入路径
- 在 config.py 中修改代理池类型的导入路径
- 新增 enums.py 文件,定义代理池类型枚举
- 新增 proxy_pool.py 文件,包含代理池的基本实现和逻辑
2025-04-30 14:22:39 +08:00

180 lines
5.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import os
import gzip
import shutil
import pytz
from datetime import datetime
from logging.handlers import TimedRotatingFileHandler
# 确保logs目录存在
logs_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "logs")
os.makedirs(logs_dir, exist_ok=True)
# 设置中国时区
china_tz = pytz.timezone("Asia/Shanghai")
# 定义颜色
class Colors:
"""ANSI颜色代码"""
RESET = '\033[0m'
RED = '\033[31m'
GREEN = '\033[32m'
YELLOW = '\033[33m'
BLUE = '\033[34m'
MAGENTA = '\033[35m'
CYAN = '\033[36m'
WHITE = '\033[37m'
BOLD = '\033[1m'
# 日志级别对应的颜色
LOG_COLORS = {
'DEBUG': Colors.BLUE,
'INFO': Colors.GREEN,
'WARNING': Colors.YELLOW,
'ERROR': Colors.RED,
'CRITICAL': Colors.MAGENTA
}
# 自定义的日志格式化器,添加中国时区的时间和行号
class ChinaTimeFormatter(logging.Formatter):
def formatTime(self, record, datefmt=None):
dt = datetime.fromtimestamp(record.created).astimezone(china_tz)
if datefmt:
return dt.strftime(datefmt)
else:
return dt.strftime("%Y-%m-%d %H:%M:%S")
def format(self, record):
# 添加行号信息
if record.pathname:
record.lineno_str = f"{record.pathname}:{record.lineno}"
else:
record.lineno_str = "unknown"
# 调用父类的format方法
return super().format(record)
# 自定义的彩色控制台处理器
class ColoredConsoleHandler(logging.StreamHandler):
def emit(self, record):
try:
# 获取原始消息
msg = self.format(record)
# 添加颜色
if record.levelname in LOG_COLORS:
color = LOG_COLORS[record.levelname]
msg = f"{color}{msg}{Colors.RESET}"
# 输出到控制台
self.stream.write(msg + self.terminator)
self.flush()
except Exception:
self.handleError(record)
# 自定义的TimedRotatingFileHandler添加压缩功能
class CompressedTimedRotatingFileHandler(TimedRotatingFileHandler):
def __init__(
self,
filename,
when="midnight",
interval=1,
backupCount=0,
encoding=None,
delay=False,
utc=False,
atTime=None,
):
super().__init__(
filename, when, interval, backupCount, encoding, delay, utc, atTime
)
def doRollover(self):
# 执行基类的日志轮转
super().doRollover()
# 获取刚刚轮转的日志文件名
# TimedRotatingFileHandler会将文件重命名为 filename.YYYY-MM-DD
if self.backupCount > 0:
for i in range(self.backupCount - 1, 0, -1):
sfn = self.rotation_filename("%s.%d" % (self.baseFilename, i))
dfn = self.rotation_filename("%s.%d" % (self.baseFilename, i + 1))
if os.path.exists(sfn):
if os.path.exists(dfn):
os.remove(dfn)
os.rename(sfn, dfn)
dfn = self.rotation_filename(self.baseFilename + ".1")
if os.path.exists(dfn):
os.remove(dfn)
self.rotate(self.baseFilename, dfn)
# 压缩最新的日志文件
for filename in os.listdir(os.path.dirname(self.baseFilename)):
if filename.startswith(
os.path.basename(self.baseFilename)
) and filename != os.path.basename(self.baseFilename):
log_file_path = os.path.join(
os.path.dirname(self.baseFilename), filename
)
# 检查文件是否已经被压缩
if not filename.endswith(".gz") and os.path.isfile(log_file_path):
with open(log_file_path, "rb") as f_in:
with gzip.open(log_file_path + ".gz", "wb") as f_out:
shutil.copyfileobj(f_in, f_out)
os.remove(log_file_path) # 删除原始文件
# 全局logger实例确保只初始化一次
_logger_instance = None
def get_logger():
global _logger_instance
# 如果logger已经初始化过直接返回实例
if _logger_instance is not None:
return _logger_instance
# 创建logger
logger = logging.getLogger("walmart")
logger.setLevel(logging.INFO)
# 确保没有重复的处理器
if logger.handlers:
# 如果已经有处理器说明logger已经被初始化过直接返回
_logger_instance = logger
return logger
# 创建彩色控制台处理器
console_handler = ColoredConsoleHandler()
console_handler.setLevel(logging.INFO)
# 创建文件处理器,按天轮转并压缩
log_file = os.path.join(logs_dir, "walmart.log")
file_handler = CompressedTimedRotatingFileHandler(
filename=log_file,
when="midnight", # 每天午夜轮转
interval=1, # 每1天轮转一次
backupCount=30, # 保留30个备份
encoding="utf-8",
)
file_handler.setLevel(logging.INFO)
# 设置日志格式
log_format = "%(asctime)s [%(levelname)s] [%(lineno_str)s] %(message)s"
formatter = ChinaTimeFormatter(log_format)
console_handler.setFormatter(formatter)
file_handler.setFormatter(formatter)
# 添加处理器到logger
logger.addHandler(console_handler)
logger.addHandler(file_handler)
# 保存实例到全局变量
_logger_instance = logger
return logger