并发超时处理

对于这种并发查询的超时处理,通常有以下几种处理策略,各有利弊:

1. 全部成功或全部失败策略

// 设置一个合理的超时时间
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancel()

// 如果任何一个查询失败或超时,整个任务失败
for i := 0; i < totalQueries; i++ {
    select {
    case result := <-resultChan:
        if result.err != nil {
            return nil, fmt.Errorf("查询失败: %v", result.err)
        }
        // 处理成功的结果
    case <-ctx.Done():
        return nil, fmt.Errorf("查询超时")
    }
}
  • 优点:数据一致性好,不会出现部分数据的情况
  • 缺点:一个查询失败就导致整个任务失败

2. 降级策略

type QueryResult struct {
    Data interface{}
    Err  error
}

// 为每个查询设置默认值
defaultResults := map[string]interface{}{
    "order":   make(map[string]param.OrderStats),
    "problem": make(map[string]param.ProblemStats),
    // ... 其他默认值
}

results := make(map[string]interface{})
timeout := time.After(10 * time.Minute)

for i := 0; i < totalQueries; i++ {
    select {
    case result := <-resultChan:
        if result.err != nil {
            ecomm.Zlog.Error("查询失败,使用默认值",
                zap.String("type", result.name),
                zap.Error(result.err))
            results[result.name] = defaultResults[result.name]
        } else {
            results[result.name] = result.data
        }
    case <-timeout:
        // 超时后,未完成的查询使用默认值
        ecomm.Zlog.Warn("部分查询超时,使用默认值")
        for name, defaultValue := range defaultResults {
            if _, exists := results[name]; !exists {
                results[name] = defaultValue
            }
        }
        goto ProcessResults
    }
}
  • 优点:系统可以继续运行,不会完全阻塞
  • 缺点:数据可能不完整,需要明确标记哪些是默认值

3. 重试机制

func queryWithRetry(ctx context.Context, queryFunc func() (interface{}, error)) (interface{}, error) {
    retries := 3
    for i := 0; i < retries; i++ {
        result, err := queryFunc()
        if err == nil {
            return result, nil
        }

        // 指数退避
        backoff := time.Duration(math.Pow(2, float64(i))) * time.Second
        timer := time.NewTimer(backoff)

        select {
        case <-ctx.Done():
            return nil, ctx.Err()
        case <-timer.C:
            continue
        }
    }
    return nil, fmt.Errorf("达到最大重试次数")
}

// 使用
go func() {
    result, err := queryWithRetry(ctx, func() (interface{}, error) {
        return dbcpm.QueryOrderStatsRt(startTime, endTime)
    })
    resultChan <- queryResult{"order", result, err}
}()
  • 优点:增加了成功的机会
  • 缺点:可能会增加总体执行时间

4. 分批执行策略

// 将查询分组,每组执行完再执行下一组
queryGroups := [][]queryFunc{
    {queryOrderStats, queryProblemStats},    // 优先级高的查询
    {queryCompensStats, queryAppealStats},   // 次优先级查询
    {queryLossStats, queryWyueStats},        // 其他查询
}

for _, group := range queryGroups {
    results := make([]interface{}, len(group))
    errChan := make(chan error, len(group))

    // 执行当前组的所有查询
    for i, query := range group {
        go func(i int, q queryFunc) {
            result, err := q()
            if err != nil {
                errChan <- err
                return
            }
            results[i] = result
        }(i, query)
    }

    // 等待当前组完成或超时
    select {
    case err := <-errChan:
        // 处理错误
    case <-time.After(2 * time.Minute):
        // 处理超时
    }
}
  • 优点:可以控制并发量,优先保证重要数据的查询
  • 缺点:总体执行时间可能会更长

5. 混合策略(推荐)

func (s *StDashboardRtTask) AssembleData() (map[string]param.ResDashboard, error) {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
    defer cancel()

    // 定义查询的优先级和重要性
    queries := []struct {
        name     string
        critical bool  // 是否是关键查询
        query    func() (interface{}, error)
    }{
        {"order", true, func() (interface{}, error) {
            return queryWithRetry(ctx, func() (interface{}, error) {
                return dbcpm.QueryOrderStatsRt(s.StartHour, s.EndHour)
            })
        }},
        {"problem", false, func() (interface{}, error) {
            return dbcpm.QueryProblemStatsRt(s.StartHour, s.EndHour)
        }},
        // ... 其他查询
    }

    results := make(map[string]interface{})
    var criticalError error

    for _, q := range queries {
        select {
        case <-ctx.Done():
            if len(results) == 0 {
                return nil, fmt.Errorf("所有查询都超时")
            }
            goto ProcessResults
        default:
            result, err := q.query()
            if err != nil {
                if q.critical {
                    criticalError = err
                    break
                }
                // 非关键查询失败,使用默认值
                results[q.name] = getDefaultValue(q.name)
                continue
            }
            results[q.name] = result
        }
    }

    if criticalError != nil {
        return nil, criticalError
    }

ProcessResults:
    return buildResult(results), nil
}

这个混合策略的特点是:

  1. 区分关键查询和非关键查询
  2. 关键查询失败会导致整个任务失败
  3. 非关键查询失败会使用默认值
  4. 为关键查询添加重试机制
  5. 设置合理的超时时间
  6. 提供清晰的日志记录和监控点

这样既保证了核心数据的可靠性,又不会因为非核心数据的问题导致整个任务失败。同时,通过日志和监控,我们可以及时发现和处理问题。

你觉得这些方案中哪个更适合你的场景?我们可以根据你的具体需求来调整和优化。