引言
Go 语言对并发编程的一等支持是其最突出的特性。Goroutine 和 Channel 的设计深受 CSP (Communicating Sequential Processes)模型的影响——不要通过共享内存来通信, 而要通过通信来共享内存。
本文总结了我在实际项目中常用的七种并发模式,并附上可运行代码示例和适用场景分析。
模式一:Pipeline(流水线)
Pipeline 是最基础的 Channel 使用模式。每个阶段接收输入、处理、输出,通过 Channel 串联起来。
func pipeline() {
// Stage 1: 生成数据
gen := func(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
// Stage 2: 平方计算
sq := func(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
// 组装流水线
for result := range sq(gen(2, 3, 4, 5)) {
fmt.Println(result) // 4, 9, 16, 25
}
}
模式二:Fan-Out / Fan-In
当一个上游处理速度跟不上时,可以将任务分发到多个 Goroutine 并行处理(Fan-Out), 再将结果合并到一个 Channel(Fan-In)。
func fanIn(chs ...<-chan int) <-chan int {
out := make(chan int)
var wg sync.WaitGroup
for _, ch := range chs {
wg.Add(1)
go func(c <-chan int) {
defer wg.Done()
for v := range c {
out <- v
}
}(ch)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
模式三:超时控制(Timeout)
网络请求、数据库查询等 I/O 操作必须设置超时,否则可能导致 Goroutine 泄漏。
用 select + time.After 实现简洁的超时控制:
func fetchWithTimeout(url string, timeout time.Duration) (string, error) {
result := make(chan string, 1)
errCh := make(chan error, 1)
go func() {
data, err := httpGet(url)
if err != nil {
errCh <- err
return
}
result <- data
}()
select {
case data := <-result:
return data, nil
case err := <-errCh:
return "", err
case <-time.After(timeout):
return "", fmt.Errorf("request timed out after %v", timeout)
}
}
注意 Channel 的 buffer 大小。设置为 1 可以避免 Goroutine 泄漏—— 即使没有人读取 result,发送操作也不会阻塞。
模式四:并发 Map-Reduce
对于大批量数据处理任务,可以结合 Pipeline 和 Fan-Out/Fan-In 实现并发 Map-Reduce:
// Map 阶段:并行处理
func parallelMap(input []int, workers int, fn func(int) int) []int {
jobs := make(chan int, len(input))
results := make(chan int, len(input))
// 启动 Worker
var wg sync.WaitGroup
for i := 0; i < workers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for job := range jobs {
results <- fn(job)
}
}()
}
// 提交任务
for _, v := range input {
jobs <- v
}
close(jobs)
wg.Wait()
close(results)
// 收集结果
var output []int
for r := range results {
output = append(output, r)
}
return output
}
// Reduce 阶段:归并结果
func reduce(values []int, fn func(int, int) int) int {
result := values[0]
for _, v := range values[1:] {
result = fn(result, v)
}
return result
}
最佳实践总结
| 模式 | 适用场景 | 注意事项 |
|---|---|---|
| Pipeline | 串行数据处理 | 确保所有 Channel 正确关闭 |
| Fan-Out/Fan-In | 并行加速密集型任务 | 控制 Worker 数量避免过载 |
| Timeout | I/O 操作 | 使用 buffer Channel 避免泄漏 |
| Map-Reduce | 批处理数据 | 注意 Worker 数的权衡 |
| Or-Done | 多路取消 | 综合使用 Context |
| Tee-Channel | 广播分发 | 注意背压 |
| Bridge | Channel of Channels | 递归合并 |
小结
这七种模式覆盖了 Go 并发编程中 80% 以上的日常使用场景。 理解这些模式的关键不在于记住代码模板,而在于理解 Channel 的本质——Goroutine 之间的同步通信机制。
并发编程的核心原则永远不会变:避免竞态、防止死锁、控制 Goroutine 生命周期。 Go 的哲学不是消灭复杂性,而是用简单的方式表达并发逻辑。