Go之所以这么火,很大程度上就是因为它在并发编程这块儿做得太出色了,简洁又高效。刚接触Go的时候,我就是被它处理并发的优雅姿态给深深吸引了。
如果你也想让你的Go程序跑得更快、响应更及时,那并发编程就是你绕不开的一环。别担心,Go已经为我们准备好了一堆"神兵利器"。这篇文章呢,我就带大家一起看看Go里面有哪些常用的并发模式,分享一些我的实战经验和心得,希望能帮你轻松上手,写出丝滑的并发程序。
首先登场的必须是Goroutine!你可以把它想象成Go语言里的"轻量级线程"。为什么说它轻量呢?因为它占用的资源非常少,启动一个Goroutine大概也就几KB的内存开销,比传统线程动辄MB级别的消耗小多了。这意味着你可以轻松创建成千上万个Goroutine,而不用太担心系统扛不住。
用起来也超级简单,就是在普通函数调用前加个go
关键字,嗖的一下,这个函数就在一个新的Goroutine里跑起来了,而且完全不会阻塞你当前的主流程。
import (
"fmt"
"time"
)
func main() {
// 看,就这么简单,hello函数就在另一个"线程"里跑起来了
go hello("你好呀,并发的世界!")
fmt.Println("main函数继续执行自己的任务...")
// 这里我们稍微等一下,不然main函数跑完了,hello可能还没来得及打印
time.Sleep(time.Second)
fmt.Println("main函数执行完毕。")
}
func hello(msg string) {
fmt.Println(msg)
}
几个小贴士:
go
一下,Goroutine立马就去执行了,你的主程序会继续往下走,不会傻等。
- 要注意的是,如果主函数(
main
Goroutine)执行完了退出了,那么它启动的其他Goroutine也会被强制"团灭",不管它们干完活没有。
- 所以,实际项目中,我们不会用
time.Sleep
这种简单粗暴的方式来等待Goroutine,而是会用更优雅的同步机制,比如后面要讲到的Channel或者sync.WaitGroup
。
光让Goroutine自己跑自己的还不够,它们之间总得交流信息吧?这时候就轮到Channel出场了。Channel是Go里面Goroutine之间通信的桥梁,有点像一个安全的管道,你可以往里面塞数据,另一个Goroutine再从里面取数据。这种"通过通信来共享内存"的设计理念,是Go并发编程的一大特色,能有效避免多线程共享数据时常见的竞态条件问题。
创建一个Channel很简单,用make
函数:
import (
"fmt"
)
func main() {
// 创建一个传递字符串类型的Channel
ch := make(chan string)
go func() {
// 往Channel里发送一条消息
fmt.Println("Goroutine准备发送消息...")
ch <- "嘿,消息来啦!"
fmt.Println("Goroutine消息已发送!")
}()
fmt.Println("Main Goroutine准备接收消息...")
// 从Channel里接收消息,如果Channel是空的,这里会阻塞等待
msg := <-ch
fmt.Println("Main Goroutine收到消息:", msg)
}
在这个例子里,主Goroutine会等着从ch
里接收到消息后,才会打印出来。这种阻塞等待的特性,使得Channel天然就具备了同步的功能。
默认创建的Channel是无缓冲的,也就是说发送方发送数据时,必须要有接收方准备好接收,否则发送操作就会阻塞。有时候我们不希望发送方被阻塞,或者想批量发送一些数据,就可以用带缓冲的Channel。
// 创建一个缓冲大小为3的Channel,可以存3个int类型的消息
bufCh := make(chan int, 3)
bufCh <- 1 // 不会阻塞
bufCh <- 2 // 不会阻塞
bufCh <- 3 // 不会阻塞
// 如果再发送 bufCh <- 4 就会阻塞了,因为缓冲区满了
val1 := <-bufCh // 取出1
带缓冲的Channel就像一个有固定容量的邮箱,只要邮箱没满,你就可以一直往里投信,不用等收信人。满了之后,就得等收信人取走一些信件才能继续投了。
有时候,我们希望一个函数只能往Channel里发数据(生产者),或者只能从Channel里收数据(消费者)。这时候就可以用单向Channel来限制Channel的使用方向,让代码意图更清晰,也更安全。
import (
"fmt"
"time"
)
// producer函数只能向out这个Channel发送数据
func producer(out chan<- int) {
defer close(out) // 完成后记得关闭Channel
for i := 0; i < 5; i++ {
fmt.Printf("生产者: 发送 %d
", i)
out <- i
time.Sleep(time.Millisecond * 100) // 模拟生产耗时
}
fmt.Println("生产者: 所有数据发送完毕!")
}
// consumer函数只能从in这个Channel接收数据
func consumer(in <-chan int) {
fmt.Println("消费者: 开始接收数据...")
// 使用range来遍历Channel,直到Channel被关闭
for num := range in {
fmt.Printf("消费者: 收到 %d
", num)
}
fmt.Println("消费者: Channel已关闭,接收完毕!")
}
func main() {
ch := make(chan int, 3) // 带缓冲的Channel
go producer(ch)
consumer(ch) // 主Goroutine作为消费者
fmt.Println("所有任务完成!")
}
注意 chan<- int
表示只写Channel,<-chan int
表示只读Channel。这种写法能在编译期就帮你发现一些潜在的错误,挺不错的。
掌握了Goroutine和Channel这两个基本功,我们就可以组合出很多强大的并发模式了。
想象一下,你有一大堆任务要处理,如果一个一个来,那得等到猴年马月。工作池模式就是找一堆"工人"(Goroutine)来同时处理这些任务。
import (
"fmt"
"time"
"sync"
)
// worker是我们的"工人"
func worker(id int, jobs <-chan int, results chan<- string, wg *sync.WaitGroup) {
defer wg.Done() // 确保每个worker完成后都通知WaitGroup
for j := range jobs { // 从任务Channel里取任务
fmt.Printf("工人 %d 号:开始处理任务 %d
", id, j)
time.Sleep(time.Second) // 模拟干活需要时间
resultMsg := fmt.Sprintf("工人 %d 号:搞定任务 %d,结果是 %d", id, j, j*2)
fmt.Println(resultMsg)
results <- resultMsg // 把处理结果发到结果Channel
}
fmt.Printf("工人 %d 号:任务队列空了,下班!
", id)
}
func main() {
numJobs := 10
jobs := make(chan int, numJobs)
results := make(chan string, numJobs)
var wg sync.WaitGroup // 用WaitGroup来等待所有工人下班
// 启动3个"工人" Goroutine
numWorkers := 3
for w := 1; w <= numWorkers; w++ {
wg.Add(1) // 每启动一个工人,计数器加1
go worker(w, jobs, results, &wg)
}
// 分配任务
fmt.Println("老板:开始分配任务啦!")
for j := 1; j <= numJobs; j++ {
jobs <- j
}
close(jobs) // 所有任务都分配完了,关闭任务Channel,工人们就知道没活干了
fmt.Println("老板:任务分配完毕!")
// 等待所有工人完成任务
go func() { // 需要一个单独的goroutine来等待,防止阻塞main
wg.Wait()
close(results) // 所有工人都下班了,关闭结果Channel
fmt.Println("老板:所有工人都下班了,可以关结果通道了。")
}()
// 收集并打印所有结果
fmt.Println("
开始收集工作成果:")
for res := range results {
fmt.Println(res)
}
fmt.Println("
所有成果收集完毕!程序结束。")
}
这个模式非常实用,比如处理HTTP请求、图片处理、数据转换等等,只要是有大量独立任务的场景,都可以用它来提速。
有时候我们调用一个函数或者等待一个操作,不能让它无限期地等下去。比如调用一个外部API,如果对方半天没响应,我们总不能一直卡在那儿吧?这时候就需要超时控制了。Go的select
语句配合time.After
可以很优雅地实现这个功能。
import (
"fmt"
"time"
)
func main() {
c := make(chan string)
go func() {
// 模拟一个耗时操作,比如2秒后才有结果
time.Sleep(2 * time.Second)
c <- "操作成功,结果来啦!"
}()
select {
case res := <-c:
fmt.Println("太棒了,接收到结果:", res)
case <-time.After(1 * time.Second): // 我们只等1秒
fmt.Println("唉,等了1秒还没结果,不等了,超时啦!")
}
// 如果操作需要更长时间
go func() {
time.Sleep(500 * time.Millisecond)
c <- "第二次操作结果"
}()
select {
case res := <-c:
fmt.Println("第二次操作:", res)
case <-time.After(1 * time.Second):
fmt.Println("第二次操作超时了")
}
}
select
语句会等待多个Channel操作,哪个先准备好就执行哪个。time.After(duration)
会在指定的duration
之后向一个Channel发送一个值。所以,上面的代码就是:要么在1秒内从c
收到结果,要么1秒后time.After
的Channel就绪了,触发超时逻辑。是不是很巧妙?
- 扇出 (Fan-out):一个数据源,分发给多个处理单元(Goroutine)并行处理。就像一个水龙头的水流出来,分成好几股给不同的人用。
- 扇入 (Fan-in):多个数据源(通常是多个Goroutine的处理结果),汇聚到一个输出Channel。就像多条小溪汇入一条大河。
这两个模式通常结合使用,构成一个处理流水线。
import (
"fmt"
"sync"
"time"
)
// producer: 模拟一个数据源,产生数据
func producer(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
time.Sleep(10 * time.Millisecond) // 模拟生产间隔
}
}()
return out
}
// square: 处理器,接收一个数字,返回它的平方
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n * n
time.Sleep(50 * time.Millisecond) // 模拟处理耗时
}
}()
return out
}
// fanIn: 扇入,合并多个channel的数据到一个channel
func fanIn(channels ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// 为每个输入channel启动一个goroutine
output := func(c <-chan int) {
defer wg.Done()
for n := range c {
out <- n
}
}
wg.Add(len(channels))
for _, c := range channels {
go output(c)
}
// 启动一个goroutine,在所有数据都合并完成后关闭out
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
// 1. 数据源
source := producer(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
// 2. 扇出:启动两个worker来处理数据(计算平方)
// 我们把同一个source分发给两个square worker
worker1 := square(source)
worker2 := square(source)
// 注意:这里实际上两个worker会竞争同一个source channel的数据。
// 更典型的扇出可能是一个dispatcher将数据分发到不同的channel,再给worker。
// 或者,更简单的方式是为每个worker创建一个数据源的副本,或者将数据源的输出显式分发。
// 为了演示扇出更清晰的场景,我们创建两个独立的处理流程
sourceA := producer(1, 2, 3, 4, 5)
sourceB := producer(6, 7, 8, 9, 10)
processedA := square(sourceA)
processedB := square(sourceB)
fmt.Println("使用扇入模式聚合结果:")
// 3. 扇入:将两个worker的处理结果合并
mergedResults := fanIn(processedA, processedB)
// 4. 消费最终结果
for res := range mergedResults {
fmt.Printf("扇入聚合结果: %d
", res)
}
fmt.Println("所有数据处理完成。")
}
上面的fanIn
函数展示了如何把多个输入Channel的数据汇总到一个输出Channel。扇出则可以理解为producer
将任务分发给多个square
这样的Goroutine。这种流水线式的处理方式,能充分利用多核CPU的性能。
Channel虽好,但也不是万能的。Go还在sync
包里给我们提供了一些传统的同步工具,关键时刻也能派上大用场。
前面例子里其实已经用到了。如果你启动了一堆Goroutine,想等它们全都执行完毕再继续下一步,sync.WaitGroup
就是你的好帮手。它内部有个计数器:
- 调用
Add(n)
把计数器加n
。
- 每个Goroutine完成任务后调用
Done()
把计数器减1。
- 主Goroutine调用
Wait()
,它会阻塞,直到计数器归零。
import (
"fmt"
"sync"
"time"
)
func doWork(id int, wg *sync.WaitGroup) {
defer wg.Done() // 确保任务完成时调用Done
fmt.Printf("工人 %d:开始干活...
", id)
time.Sleep(time.Duration(id) * time.Second) // 模拟不同工人干活时间不同
fmt.Printf("工人 %d:活干完了!
", id)
}
func main() {
var wg sync.WaitGroup
numWorkers := 3
for i := 1; i <= numWorkers; i++ {
wg.Add(1) // 每启动一个工人,计数器加1
go doWork(i, &wg)
}
fmt.Println("老板:等着工人们下班...")
wg.Wait() // 等待所有工人(计数器清零)
fmt.Println("老板:所有工人都下班啦,收工!")
}
当多个Goroutine需要访问同一个共享变量,并且至少有一个会修改它时,为了防止数据错乱(竞态条件),就需要用互斥锁(sync.Mutex
)来保护这段"临界区"代码。
拿到锁的Goroutine才能执行临界区代码,执行完释放锁,其他等待的Goroutine才能抢锁。
import (
"fmt"
"sync"
"time"
)
var (
counter int
mutex sync.Mutex // 我们的守护神:互斥锁
)
// 增加counter的值,这个操作需要被保护
func increment(wg *sync.WaitGroup) {
defer wg.Done()
mutex.Lock() // 在访问counter前,加锁!
// ---- 临界区开始 ----
c := counter
time.Sleep(10 * time.Millisecond) // 模拟一些操作,增加竞态条件发生的概率(如果不加锁)
c++
counter = c
// ---- 临界区结束 ----
mutex.Unlock() // 访问完毕,解锁!让别人用
}
func main() {
var wg sync.WaitGroup
numIncrements := 100
fmt.Printf("初始Counter: %d
", counter)
for i := 0; i < numIncrements; i++ {
wg.Add(1)
go increment(&wg)
}
wg.Wait() // 等待所有increment操作完成
fmt.Printf("经过 %d 次增加后,Counter: %d (期望值: %d)
", numIncrements, counter, numIncrements)
// 如果没有锁,这个counter的值很可能不是100
}
记得,锁的粒度要尽可能小,只锁住必要的部分,并且一定要用defer mutex.Unlock()
来确保锁在函数退出时(无论是正常结束还是panic)都能被释放,不然就可能死锁啦!
context
包是Go 1.7引入的,现在已经是并发控制的标配了。它主要用来在Goroutine之间传递取消信号、超时信息、截止时间以及其他请求范围的值。
想象一下,你发起一个请求,这个请求可能会触发一系列的Goroutine去执行不同的子任务。如果用户取消了这个请求,或者请求超时了,我们就需要一种机制来通知所有相关的Goroutine:"嘿,别干了,收工吧!" context.Context
就是干这个的。
import (
"context"
"fmt"
"time"
)
func workerWithContext(ctx context.Context, id int) {
for {
select {
case <-ctx.Done(): // 监听Context的取消信号
fmt.Printf("工人 %d:收到取消信号 (原因: %v),摸鱼了...
", id, ctx.Err())
return // 收到信号,退出
default:
fmt.Printf("工人 %d:努力工作中...
", id)
time.Sleep(time.Second) // 模拟干活
}
}
}
func main() {
// 创建一个5秒后会自动取消的Context
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
// cancel函数也很重要,当我们想主动取消时可以调用它
defer cancel() // 无论如何,最后调用cancel释放资源
fmt.Println("启动工人,给他们3秒钟工作时间...")
go workerWithContext(ctx, 1)
go workerWithContext(ctx, 2)
// 让主Goroutine等待足够长的时间,以便观察worker的取消行为
// 比如等6秒,这样就能看到Context超时后worker的反应
time.Sleep(6 * time.Second)
fmt.Println("主程序:时间差不多了,准备退出。")
// 如果想在超时前手动取消,可以调用 cancel()
// cancel()
// time.Sleep(1 * time.Second) // 给点时间让worker响应取消
}
使用Context
可以让你的并发代码更加健壮,特别是在构建大型的、涉及多个微服务的系统时,用它来控制请求的生命周期简直太香了。
虽然 Goroutine 轻量,但也不是说可以毫无节制地创建。每个 Goroutine 还是会消耗一点内存(主要是栈空间,初始约2KB),如果失控地创建太多,内存还是会被吃光的。所以,对于可以复用的场景,考虑用工作池模式。
并且如果涉及到 channel 的使用,一定要记得关闭 channel,否则会导致 goroutine 泄漏。当需要管理多个 channel 的读写,或者需要超时控制时,可以使用 select 语句。