并发超时处理
Jun 9, 2025 - ⧖ 7 min对于这种并发查询的超时处理,通常有以下几种处理策略,各有利弊:
1. 全部成功或全部失败策略
// 设置一个合理的超时时间
ctx , cancel := context .WithTimeout (context .Background (), 10 * time .Minute )
defer cancel ()
// 如果任何一个查询失败或超时,整个任务失败
for i := 0 ; i < totalQueries ; i ++ {
select {
case result := <- resultChan :
if result .err != nil {
return nil , fmt .Errorf ("查询失败: %v" , result .err )
}
// 处理成功的结果
case <- ctx .Done ():
return nil , fmt .Errorf ("查询超时" )
}
}
- 优点:数据一致性好,不会出现部分数据的情况
- 缺点:一个查询失败就导致整个任务失败
2. 降级策略
type QueryResult struct {
Data interface {}
Err error
}
// 为每个查询设置默认值
defaultResults := map [string ]interface {}{
"order" : make (map [string ]param.OrderStats ),
"problem" : make (map [string ]param.ProblemStats ),
// ... 其他默认值
}
results := make (map [string ]interface {})
timeout := time .After (10 * time .Minute )
for i := 0 ; i < totalQueries ; i ++ {
select {
case result := <- resultChan :
if result .err != nil {
ecomm .Zlog .Error ("查询失败,使用默认值" ,
zap .String ("type" , result .name ),
zap .Error (result .err ))
results [result .name ] = defaultResults [result .name ]
} else {
results [result .name ] = result .data
}
case <- timeout :
// 超时后,未完成的查询使用默认值
ecomm .Zlog .Warn ("部分查询超时,使用默认值" )
for name , defaultValue := range defaultResults {
if _ , exists := results [name ]; ! exists {
results [name ] = defaultValue
}
}
goto ProcessResults
}
}
- 优点:系统可以继续运行,不会完全阻塞
- 缺点:数据可能不完整,需要明确标记哪些是默认值
3. 重试机制
func queryWithRetry (ctx context.Context , queryFunc func () (interface {}, error )) (interface {}, error ) {
retries := 3
for i := 0 ; i < retries ; i ++ {
result , err := queryFunc ()
if err == nil {
return result , nil
}
// 指数退避
backoff := time .Duration (math .Pow (2 , float64 (i ))) * time .Second
timer := time .NewTimer (backoff )
select {
case <- ctx .Done ():
return nil , ctx .Err ()
case <- timer .C :
continue
}
}
return nil , fmt .Errorf ("达到最大重试次数" )
}
// 使用
go func () {
result , err := queryWithRetry (ctx , func () (interface {}, error ) {
return dbcpm .QueryOrderStatsRt (startTime , endTime )
})
resultChan <- queryResult {"order" , result , err }
}()
- 优点:增加了成功的机会
- 缺点:可能会增加总体执行时间
4. 分批执行策略
// 将查询分组,每组执行完再执行下一组
queryGroups := [][]queryFunc {
{queryOrderStats , queryProblemStats }, // 优先级高的查询
{queryCompensStats , queryAppealStats }, // 次优先级查询
{queryLossStats , queryWyueStats }, // 其他查询
}
for _ , group := range queryGroups {
results := make ([]interface {}, len (group ))
errChan := make (chan error , len (group ))
// 执行当前组的所有查询
for i , query := range group {
go func (i int , q queryFunc ) {
result , err := q ()
if err != nil {
errChan <- err
return
}
results [i ] = result
}(i , query )
}
// 等待当前组完成或超时
select {
case err := <- errChan :
// 处理错误
case <- time .After (2 * time .Minute ):
// 处理超时
}
}
- 优点:可以控制并发量,优先保证重要数据的查询
- 缺点:总体执行时间可能会更长
5. 混合策略(推荐)
func (s * StDashboardRtTask ) AssembleData () (map [string ]param.ResDashboard , error ) {
ctx , cancel := context .WithTimeout (context .Background (), 10 * time .Minute )
defer cancel ()
// 定义查询的优先级和重要性
queries := []struct {
name string
critical bool // 是否是关键查询
query func () (interface {}, error )
}{
{"order" , true , func () (interface {}, error ) {
return queryWithRetry (ctx , func () (interface {}, error ) {
return dbcpm .QueryOrderStatsRt (s .StartHour , s .EndHour )
})
}},
{"problem" , false , func () (interface {}, error ) {
return dbcpm .QueryProblemStatsRt (s .StartHour , s .EndHour )
}},
// ... 其他查询
}
results := make (map [string ]interface {})
var criticalError error
for _ , q := range queries {
select {
case <- ctx .Done ():
if len (results ) == 0 {
return nil , fmt .Errorf ("所有查询都超时" )
}
goto ProcessResults
default :
result , err := q .query ()
if err != nil {
if q .critical {
criticalError = err
break
}
// 非关键查询失败,使用默认值
results [q .name ] = getDefaultValue (q .name )
continue
}
results [q .name ] = result
}
}
if criticalError != nil {
return nil , criticalError
}
ProcessResults:
return buildResult (results ), nil
}
这个混合策略的特点是:
- 区分关键查询和非关键查询
- 关键查询失败会导致整个任务失败
- 非关键查询失败会使用默认值
- 为关键查询添加重试机制
- 设置合理的超时时间
- 提供清晰的日志记录和监控点
这样既保证了核心数据的可靠性,又不会因为非核心数据的问题导致整个任务失败。同时,通过日志和监控,我们可以及时发现和处理问题。
你觉得这些方案中哪个更适合你的场景?我们可以根据你的具体需求来调整和优化。