refactor(camel_oil): 优化登录流程并添加链路追踪
- 删除了 LoginAccount 中的轮询逻辑,改为单次获取手机号 - 当手机号已存在时返回错误,提示重新获取手机号 - BatchLoginAccounts 函数中固定并发数为1,移除依赖的配置项 - 添加调用链的追踪 span,覆盖 LoginAccount、BatchLoginAccounts 及 pig 包的 GetAccountInfo 方法 - CronAccountPrefetchTask 中新增链路追踪并传递新的上下文 span - 统一日志上下文使用新的 gtrace span 追踪ctx,提升可观测性 - 移除部分无用导入,整理代码结构
This commit is contained in:
@@ -2,7 +2,11 @@ package camel_oil
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/gogf/gf/v2/errors/gerror"
|
||||
"github.com/gogf/gf/v2/net/gtrace"
|
||||
"github.com/gogf/gf/v2/os/glog"
|
||||
"kami/internal/consts"
|
||||
"kami/internal/dao"
|
||||
"kami/internal/model/do"
|
||||
@@ -11,50 +15,32 @@ import (
|
||||
"kami/utility/integration/pig"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/gogf/gf/v2/errors/gerror"
|
||||
"github.com/gogf/gf/v2/os/glog"
|
||||
)
|
||||
|
||||
// LoginAccount 执行账号登录流程
|
||||
// 注意:当前使用假数据,实际应对接骆驼加油平台和接码平台
|
||||
func (s *sCamelOil) LoginAccount(ctx context.Context) (err error) {
|
||||
// 获取设置
|
||||
settings, err := s.GetCamelOilSettings(ctx)
|
||||
if err != nil {
|
||||
glog.Errorf(ctx, "获取骆驼模块设置失败: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
// 如果不使用豪猪平台,直接返回错误
|
||||
if !settings.UseHaozhuPlatform {
|
||||
return gerror.New("未启用豪猪平台,无法获取手机号")
|
||||
}
|
||||
ctx, span := gtrace.NewSpan(ctx, "sCamelOil.LoginAccount")
|
||||
defer span.End()
|
||||
|
||||
// 对接接码平台,获取手机号并检查是否已存在
|
||||
var phoneNumber string
|
||||
ticker := time.NewTicker(time.Second)
|
||||
for range ticker.C {
|
||||
phoneNumber, err = pig.NewClient().GetAccountInfo(ctx)
|
||||
if err != nil {
|
||||
return gerror.Wrap(err, "从豪猪平台获取手机号失败")
|
||||
}
|
||||
phoneNumber, err := pig.NewClient().GetAccountInfo(ctx)
|
||||
if err != nil {
|
||||
return gerror.Wrap(err, "从豪猪平台获取手机号失败")
|
||||
}
|
||||
|
||||
// 检查手机号是否已存在
|
||||
existingAccount, checkErr := dao.V1CamelOilAccount.Ctx(ctx).DB(config.GetDatabaseV1()).
|
||||
Where(dao.V1CamelOilAccount.Columns().Phone, phoneNumber).
|
||||
One()
|
||||
if checkErr != nil {
|
||||
return gerror.Wrap(checkErr, "检查手机号是否存在失败")
|
||||
}
|
||||
// 检查手机号是否已存在
|
||||
existingAccount, checkErr := dao.V1CamelOilAccount.Ctx(ctx).DB(config.GetDatabaseV1()).
|
||||
Where(dao.V1CamelOilAccount.Columns().Phone, phoneNumber).
|
||||
One()
|
||||
if checkErr != nil {
|
||||
return gerror.Wrap(checkErr, "检查手机号是否存在失败")
|
||||
}
|
||||
|
||||
// 如果手机号已存在,继续获取新的手机号
|
||||
if existingAccount == nil {
|
||||
// 手机号不存在,可以使用
|
||||
break
|
||||
}
|
||||
glog.Infof(ctx, "手机号已存在,重新获取: %s", phoneNumber)
|
||||
// 如果手机号已存在,继续获取新的手机号
|
||||
if existingAccount == nil {
|
||||
// 手机号不存在,可以使用
|
||||
return errors.New("手机号已存在,重新获取")
|
||||
}
|
||||
|
||||
isOk, err := camel_oil_api.NewClient().SendCaptcha(ctx, phoneNumber)
|
||||
@@ -76,28 +62,20 @@ func (s *sCamelOil) LoginAccount(ctx context.Context) (err error) {
|
||||
|
||||
// BatchLoginAccounts 批量登录账号
|
||||
func (s *sCamelOil) BatchLoginAccounts(ctx context.Context, count int64) (successCount int64, err error) {
|
||||
ctx, span := gtrace.NewSpan(ctx, "sCamelOil.BatchLoginAccounts")
|
||||
defer span.End()
|
||||
|
||||
if count <= 0 {
|
||||
return 0, gerror.New("登录数量必须大于0")
|
||||
}
|
||||
|
||||
// 获取设置
|
||||
settings, err := s.GetCamelOilSettings(ctx)
|
||||
if err != nil {
|
||||
glog.Errorf(ctx, "获取骆驼模块设置失败: %v", err)
|
||||
return 0, err
|
||||
}
|
||||
if !settings.UseHaozhuPlatform {
|
||||
return 0, nil
|
||||
}
|
||||
// 使用设置中的并发数量控制登录
|
||||
semaphore := make(chan struct{}, settings.SingleAccountConcurrency)
|
||||
semaphore := make(chan struct{}, 1)
|
||||
var wg sync.WaitGroup
|
||||
var successCounter int64
|
||||
|
||||
for i := 0; i < int(count); i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
wg.Go(func() {
|
||||
semaphore <- struct{}{} // 获取信号量
|
||||
defer func() { <-semaphore }() // 释放信号量
|
||||
|
||||
@@ -107,7 +85,7 @@ func (s *sCamelOil) BatchLoginAccounts(ctx context.Context, count int64) (succes
|
||||
return
|
||||
}
|
||||
atomic.AddInt64(&successCounter, 1)
|
||||
}()
|
||||
})
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
@@ -4,6 +4,9 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/gogf/gf/v2/errors/gerror"
|
||||
"github.com/gogf/gf/v2/net/gtrace"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"time"
|
||||
|
||||
"kami/internal/consts"
|
||||
@@ -43,16 +46,22 @@ func (s *sCamelOil) CronAccountPrefetchTask(ctx context.Context) error {
|
||||
|
||||
// 2. 如果在线账号少于目标数,触发并发登录
|
||||
if onlineCount < targetOnlineAccounts {
|
||||
ctx2, span := gtrace.NewTracer("CronAccountPrefetchTask").
|
||||
Start(ctx, "sCamelOil.CronAccountPrefetchTask",
|
||||
trace.WithNewRoot(),
|
||||
)
|
||||
defer span.End()
|
||||
|
||||
needCount := targetOnlineAccounts - onlineCount
|
||||
glog.Infof(ctx, "在线账号不足,需要登录 %d 个账号", needCount)
|
||||
span.SetAttributes(attribute.Int("needCount", needCount))
|
||||
|
||||
// 使用并发登录提高效率
|
||||
successCount, err2 := s.BatchLoginAccounts(ctx, int64(needCount))
|
||||
successCount, err2 := s.BatchLoginAccounts(ctx2, int64(needCount))
|
||||
if err2 != nil {
|
||||
glog.Errorf(ctx, "批量登录账号失败: %v", err2)
|
||||
glog.Errorf(ctx2, "批量登录账号失败: %v", err2)
|
||||
// 不返回错误,继续执行
|
||||
} else {
|
||||
glog.Infof(ctx, "成功登录 %d 个账号", successCount)
|
||||
glog.Infof(ctx2, "成功登录 %d 个账号", successCount)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/gogf/gf/v2/net/gtrace"
|
||||
"github.com/gogf/gf/v2/os/glog"
|
||||
"github.com/gogf/gf/v2/os/gtime"
|
||||
"kami/internal/service"
|
||||
@@ -75,33 +76,33 @@ func (c *InternalClient) getToken(ctx context.Context) (string, error) {
|
||||
|
||||
// GetAccountInfo 获取账号信息
|
||||
func (c *InternalClient) GetAccountInfo(ctx context.Context) (string, error) {
|
||||
ctx, span := gtrace.NewSpan(ctx, "pig.GetAccountInfo")
|
||||
defer span.End()
|
||||
//尝试100次,直到获取到号码为止
|
||||
for range 100 {
|
||||
token, err := c.getToken(ctx)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
respBody := struct {
|
||||
Token string `json:"token"`
|
||||
SID int `json:"sid"`
|
||||
}{
|
||||
Token: token,
|
||||
SID: 21108,
|
||||
}
|
||||
token, err := c.getToken(ctx)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
respBody := struct {
|
||||
Token string `json:"token"`
|
||||
SID int `json:"sid"`
|
||||
}{
|
||||
Token: token,
|
||||
SID: 21108,
|
||||
}
|
||||
|
||||
resp, err := c.Client.Post(ctx, fmt.Sprintf("https://api.haozhuyun.com/sms?api=getPhone&token=%s&sid=%d&Province=&ascription=&isp=", respBody.Token, respBody.SID))
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
respStr := resp.ReadAllString()
|
||||
glog.Info(ctx, "获取信息", respStr)
|
||||
respStruct := struct {
|
||||
Phone string `json:"phone"`
|
||||
}{}
|
||||
err = json.Unmarshal([]byte(respStr), &respStruct)
|
||||
if respStruct.Phone != "" {
|
||||
return respStruct.Phone, nil
|
||||
}
|
||||
resp, err := c.Client.Post(ctx, fmt.Sprintf("https://api.haozhuyun.com/sms?api=getPhone&token=%s&sid=%d&Province=&ascription=&isp=", respBody.Token, respBody.SID))
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
respStr := resp.ReadAllString()
|
||||
glog.Info(ctx, "获取信息", respStr)
|
||||
respStruct := struct {
|
||||
Phone string `json:"phone"`
|
||||
}{}
|
||||
err = json.Unmarshal([]byte(respStr), &respStruct)
|
||||
if respStruct.Phone != "" {
|
||||
return respStruct.Phone, nil
|
||||
}
|
||||
return "", fmt.Errorf("获取账号失败")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user