Go 并发编程模式:Goroutine 与 Channel 的七种用法

引言

Go 语言对并发编程的一等支持是其最突出的特性。Goroutine 和 Channel 的设计深受 CSP (Communicating Sequential Processes)模型的影响——不要通过共享内存来通信, 而要通过通信来共享内存。

本文总结了我在实际项目中常用的七种并发模式,并附上可运行代码示例和适用场景分析。

模式一:Pipeline(流水线)

Pipeline 是最基础的 Channel 使用模式。每个阶段接收输入、处理、输出,通过 Channel 串联起来。

func pipeline() {
    // Stage 1: 生成数据
    gen := func(nums ...int) <-chan int {
        out := make(chan int)
        go func() {
            for _, n := range nums {
                out <- n
            }
            close(out)
        }()
        return out
    }

    // Stage 2: 平方计算
    sq := func(in <-chan int) <-chan int {
        out := make(chan int)
        go func() {
            for n := range in {
                out <- n * n
            }
            close(out)
        }()
        return out
    }

    // 组装流水线
    for result := range sq(gen(2, 3, 4, 5)) {
        fmt.Println(result) // 4, 9, 16, 25
    }
}

模式二:Fan-Out / Fan-In

当一个上游处理速度跟不上时,可以将任务分发到多个 Goroutine 并行处理(Fan-Out), 再将结果合并到一个 Channel(Fan-In)。

func fanIn(chs ...<-chan int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup

    for _, ch := range chs {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for v := range c {
                out <- v
            }
        }(ch)
    }

    go func() {
        wg.Wait()
        close(out)
    }()

    return out
}

模式三:超时控制(Timeout)

网络请求、数据库查询等 I/O 操作必须设置超时,否则可能导致 Goroutine 泄漏。 用 select + time.After 实现简洁的超时控制:

func fetchWithTimeout(url string, timeout time.Duration) (string, error) {
    result := make(chan string, 1)
    errCh := make(chan error, 1)

    go func() {
        data, err := httpGet(url)
        if err != nil {
            errCh <- err
            return
        }
        result <- data
    }()

    select {
    case data := <-result:
        return data, nil
    case err := <-errCh:
        return "", err
    case <-time.After(timeout):
        return "", fmt.Errorf("request timed out after %v", timeout)
    }
}

注意 Channel 的 buffer 大小。设置为 1 可以避免 Goroutine 泄漏—— 即使没有人读取 result,发送操作也不会阻塞。

模式四:并发 Map-Reduce

对于大批量数据处理任务,可以结合 Pipeline 和 Fan-Out/Fan-In 实现并发 Map-Reduce:

// Map 阶段:并行处理
func parallelMap(input []int, workers int, fn func(int) int) []int {
    jobs := make(chan int, len(input))
    results := make(chan int, len(input))

    // 启动 Worker
    var wg sync.WaitGroup
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for job := range jobs {
                results <- fn(job)
            }
        }()
    }

    // 提交任务
    for _, v := range input {
        jobs <- v
    }
    close(jobs)
    wg.Wait()
    close(results)

    // 收集结果
    var output []int
    for r := range results {
        output = append(output, r)
    }
    return output
}

// Reduce 阶段:归并结果
func reduce(values []int, fn func(int, int) int) int {
    result := values[0]
    for _, v := range values[1:] {
        result = fn(result, v)
    }
    return result
}

最佳实践总结

模式 适用场景 注意事项
Pipeline 串行数据处理 确保所有 Channel 正确关闭
Fan-Out/Fan-In 并行加速密集型任务 控制 Worker 数量避免过载
Timeout I/O 操作 使用 buffer Channel 避免泄漏
Map-Reduce 批处理数据 注意 Worker 数的权衡
Or-Done 多路取消 综合使用 Context
Tee-Channel 广播分发 注意背压
Bridge Channel of Channels 递归合并

小结

这七种模式覆盖了 Go 并发编程中 80% 以上的日常使用场景。 理解这些模式的关键不在于记住代码模板,而在于理解 Channel 的本质——Goroutine 之间的同步通信机制。

并发编程的核心原则永远不会变:避免竞态、防止死锁、控制 Goroutine 生命周期。 Go 的哲学不是消灭复杂性,而是用简单的方式表达并发逻辑。