Files
kami_backend/internal/logic/limiter/rate.go
danial 15e2426e85 feat(camel_oil): 新增骆驼加油账号管理模块
- 实现账号增删改查接口和逻辑
- 支持账号状态更新及状态历史记录功能
- 提供账号列表、历史和统计信息查询API
- 实现账号轮询机制,支持按使用时间轮询获取账号
- 增加账号登录流程及批量登录功能,集成接码平台和平台API
- 管理账号订单容量,支持容量检查与账号登录触发
- 提供账号池状态统计接口
- 账号历史记录查询支持多种变更类型文本展示
- 密码等敏感信息采用脱敏展示
- 完善日志记录和错误处理机制,保证业务稳定运行
2025-11-21 00:49:50 +08:00

219 lines
6.2 KiB
Go

package limiter
import (
"context"
"fmt"
"kami/internal/model"
"kami/internal/service"
"kami/utility/cache"
"time"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/gmutex"
)
func init() {
service.RegisterRate(New())
}
func New() *sRate {
return &sRate{
mu: gmutex.Mutex{},
configs: make(map[model.LimiterType]*model.LimiterConfig),
cache: cache.NewCache(),
}
}
type sRate struct {
mu gmutex.Mutex
configs map[model.LimiterType]*model.LimiterConfig
cache *cache.Cache
}
// Check 检查限流
func (r *sRate) Check(ctx context.Context, limiterType model.LimiterType, key string) *model.LimiterResult {
config := r.GetLimiterConfig(limiterType)
if config == nil {
// 如果没有配置,使用默认配置
config = r.getDefaultConfig(limiterType)
}
return r.CheckWithConfig(ctx, config, key)
}
// CheckWithConfig 使用自定义配置检查限流
func (r *sRate) CheckWithConfig(ctx context.Context, config *model.LimiterConfig, key string) *model.LimiterResult {
if config == nil {
return &model.LimiterResult{
Allowed: true,
Remaining: 0,
RetryAfter: 0,
}
}
cacheKey := r.buildCacheKey(config.Type, key)
switch config.Strategy {
case model.StrategySlidingWindow:
return r.slidingWindowCheck(ctx, cacheKey, config)
case model.StrategyFixedWindow:
return r.fixedWindowCheck(ctx, cacheKey, config)
case model.StrategyTokenBucket:
return r.tokenBucketCheck(ctx, cacheKey, config)
default:
// 默认使用滑动窗口
return r.slidingWindowCheck(ctx, cacheKey, config)
}
}
// Allow 简单的限流检查,返回是否允许通过
func (r *sRate) Allow(ctx context.Context, limiterType model.LimiterType, key string) bool {
result := r.Check(ctx, limiterType, key)
return result.Allowed
}
// GetLimiterConfig 获取限流器配置
func (r *sRate) GetLimiterConfig(limiterType model.LimiterType) *model.LimiterConfig {
r.mu.LockFunc(func() {
if _, ok := r.configs[limiterType]; !ok {
r.configs[limiterType] = r.getDefaultConfig(limiterType)
}
})
return r.configs[limiterType]
}
// SetLimiterConfig 设置限流器配置(运行时修改)
func (r *sRate) SetLimiterConfig(limiterType model.LimiterType, config *model.LimiterConfig) error {
r.mu.LockFunc(func() {
r.configs[limiterType] = config
})
return nil
}
// Reset 重置指定key的限流计数
func (r *sRate) Reset(ctx context.Context, limiterType model.LimiterType, key string) error {
cacheKey := r.buildCacheKey(limiterType, key)
_, err := r.cache.Remove(ctx, cacheKey)
return err
}
// GetRemaining 获取剩余可用次数
func (r *sRate) GetRemaining(ctx context.Context, limiterType model.LimiterType, key string) int {
result := r.Check(ctx, limiterType, key)
return result.Remaining
}
// buildCacheKey 构建缓存键名
func (r *sRate) buildCacheKey(limiterType model.LimiterType, key string) string {
return fmt.Sprintf("rate_limiter:%s:%s", limiterType, key)
}
// slidingWindowCheck 滑动窗口限流检查(简化版本,使用固定窗口近似实现)
func (r *sRate) slidingWindowCheck(ctx context.Context, cacheKey string, config *model.LimiterConfig) *model.LimiterResult {
return r.fixedWindowCheck(ctx, cacheKey, config)
}
// fixedWindowCheck 固定窗口限流检查
func (r *sRate) fixedWindowCheck(ctx context.Context, cacheKey string, config *model.LimiterConfig) *model.LimiterResult {
now := time.Now()
windowStart := now.Truncate(config.Window).Unix()
windowKey := fmt.Sprintf("%s:%d", cacheKey, windowStart)
// 先获取当前计数
currentValue, err := r.cache.Get(ctx, windowKey)
var count int64 = 0
if err == nil && currentValue != nil && !currentValue.IsNil() {
count = currentValue.Int64()
}
// 检查是否超过限制
if count >= int64(config.Capacity) {
remaining := 0
nextWindow := now.Truncate(config.Window).Add(config.Window)
retryAfter := time.Until(nextWindow)
if retryAfter < 0 {
retryAfter = 0
}
return &model.LimiterResult{
Allowed: false,
Remaining: remaining,
ResetAt: nextWindow,
RetryAfter: retryAfter,
}
}
// 增加计数
err = r.cache.Incr(ctx, windowKey, config.Window)
if err != nil {
g.Log().Errorf(ctx, "fixed window limiter incr failed: %v", err)
// 降级处理:出错时允许通过
return &model.LimiterResult{
Allowed: true,
Remaining: config.Capacity - int(count) - 1,
RetryAfter: 0,
}
}
count++
remaining := config.Capacity - int(count)
return &model.LimiterResult{
Allowed: true,
Remaining: remaining,
ResetAt: now.Truncate(config.Window).Add(config.Window),
RetryAfter: 0,
}
}
// tokenBucketCheck 令牌桶限流检查(简化版本,使用固定窗口近似实现)
func (r *sRate) tokenBucketCheck(ctx context.Context, cacheKey string, config *model.LimiterConfig) *model.LimiterResult {
return r.fixedWindowCheck(ctx, cacheKey, config)
}
// getDefaultConfig 获取默认配置
func (r *sRate) getDefaultConfig(limiterType model.LimiterType) *model.LimiterConfig {
switch limiterType {
case model.LimiterTypeSysUserLogin:
return &model.LimiterConfig{
Type: model.LimiterTypeSysUserLogin,
Strategy: model.StrategySlidingWindow,
Capacity: 10,
Window: time.Minute,
}
case model.LimiterTypeCardInfoJdAccountCookieChecker:
return &model.LimiterConfig{
Type: model.LimiterTypeCardInfoJdAccountCookieChecker,
Strategy: model.StrategyFixedWindow,
Capacity: 30,
Window: time.Minute,
}
case model.LimiterTypeCardInfoJdAccountCookieSet:
return &model.LimiterConfig{
Type: model.LimiterTypeCardInfoJdAccountCookieSet,
Strategy: model.StrategyFixedWindow,
Capacity: 10,
Window: time.Minute,
}
case model.LimiterTypeCardInfoRedeemAccountCookieChecker:
return &model.LimiterConfig{
Type: model.LimiterTypeCardInfoRedeemAccountCookieChecker,
Strategy: model.StrategyFixedWindow,
Capacity: 30,
Window: time.Minute,
}
case model.LimiterTypeCardInfoRedeemAccountCookieSet:
return &model.LimiterConfig{
Type: model.LimiterTypeCardInfoRedeemAccountCookieSet,
Strategy: model.StrategyFixedWindow,
Capacity: 10,
Window: time.Minute,
}
default:
return &model.LimiterConfig{
Type: limiterType,
Strategy: model.StrategySlidingWindow,
Capacity: 100,
Window: time.Minute,
}
}
}