Go语言并发模式实战

34 Views
聊聊Go语言里那些好用的并发小技巧,从goroutine、channel到互斥锁,带你轻松玩转并发编程。

Go语言并发模式实战

Go之所以这么火,很大程度上就是因为它在并发编程这块儿做得太出色了,简洁又高效。刚接触Go的时候,我就是被它处理并发的优雅姿态给深深吸引了。

如果你也想让你的Go程序跑得更快、响应更及时,那并发编程就是你绕不开的一环。别担心,Go已经为我们准备好了一堆"神兵利器"。这篇文章呢,我就带大家一起看看Go里面有哪些常用的并发模式,分享一些我的实战经验和心得,希望能帮你轻松上手,写出丝滑的并发程序。

Goroutine:并发界的"轻骑兵"

首先登场的必须是Goroutine!你可以把它想象成Go语言里的"轻量级线程"。为什么说它轻量呢?因为它占用的资源非常少,启动一个Goroutine大概也就几KB的内存开销,比传统线程动辄MB级别的消耗小多了。这意味着你可以轻松创建成千上万个Goroutine,而不用太担心系统扛不住。

用起来也超级简单,就是在普通函数调用前加个go关键字,嗖的一下,这个函数就在一个新的Goroutine里跑起来了,而且完全不会阻塞你当前的主流程。

import (
	"fmt"
	"time"
)
 
func main() {
    // 看,就这么简单,hello函数就在另一个"线程"里跑起来了
    go hello("你好呀,并发的世界!")
    
    fmt.Println("main函数继续执行自己的任务...")
    // 这里我们稍微等一下,不然main函数跑完了,hello可能还没来得及打印
    time.Sleep(time.Second)
    fmt.Println("main函数执行完毕。")
}
 
func hello(msg string) {
    fmt.Println(msg)
}

几个小贴士

  • go一下,Goroutine立马就去执行了,你的主程序会继续往下走,不会傻等。
  • 要注意的是,如果主函数(main Goroutine)执行完了退出了,那么它启动的其他Goroutine也会被强制"团灭",不管它们干完活没有。
  • 所以,实际项目中,我们不会用time.Sleep这种简单粗暴的方式来等待Goroutine,而是会用更优雅的同步机制,比如后面要讲到的Channel或者sync.WaitGroup

Channel:Goroutine们的"传话筒"

光让Goroutine自己跑自己的还不够,它们之间总得交流信息吧?这时候就轮到Channel出场了。Channel是Go里面Goroutine之间通信的桥梁,有点像一个安全的管道,你可以往里面塞数据,另一个Goroutine再从里面取数据。这种"通过通信来共享内存"的设计理念,是Go并发编程的一大特色,能有效避免多线程共享数据时常见的竞态条件问题。

基础用法

创建一个Channel很简单,用make函数:

import (
	"fmt"
)
 
func main() {
    // 创建一个传递字符串类型的Channel
    ch := make(chan string)
    
    go func() {
        // 往Channel里发送一条消息
        fmt.Println("Goroutine准备发送消息...")
        ch <- "嘿,消息来啦!"
        fmt.Println("Goroutine消息已发送!")
    }()
    
    fmt.Println("Main Goroutine准备接收消息...")
    // 从Channel里接收消息,如果Channel是空的,这里会阻塞等待
    msg := <-ch
    fmt.Println("Main Goroutine收到消息:", msg)
}

在这个例子里,主Goroutine会等着从ch里接收到消息后,才会打印出来。这种阻塞等待的特性,使得Channel天然就具备了同步的功能。

带缓冲的Channel:给消息一点"喘息空间"

默认创建的Channel是无缓冲的,也就是说发送方发送数据时,必须要有接收方准备好接收,否则发送操作就会阻塞。有时候我们不希望发送方被阻塞,或者想批量发送一些数据,就可以用带缓冲的Channel。

// 创建一个缓冲大小为3的Channel,可以存3个int类型的消息
bufCh := make(chan int, 3)
 
bufCh <- 1 // 不会阻塞
bufCh <- 2 // 不会阻塞
bufCh <- 3 // 不会阻塞
// 如果再发送 bufCh <- 4 就会阻塞了,因为缓冲区满了
 
val1 := <-bufCh // 取出1

带缓冲的Channel就像一个有固定容量的邮箱,只要邮箱没满,你就可以一直往里投信,不用等收信人。满了之后,就得等收信人取走一些信件才能继续投了。

单向Channel:明确职责,防止误操作

有时候,我们希望一个函数只能往Channel里发数据(生产者),或者只能从Channel里收数据(消费者)。这时候就可以用单向Channel来限制Channel的使用方向,让代码意图更清晰,也更安全。

import (
	"fmt"
	"time"
)
 
// producer函数只能向out这个Channel发送数据
func producer(out chan<- int) {
    defer close(out) // 完成后记得关闭Channel
    for i := 0; i < 5; i++ {
        fmt.Printf("生产者: 发送 %d
", i)
        out <- i
        time.Sleep(time.Millisecond * 100) // 模拟生产耗时
    }
    fmt.Println("生产者: 所有数据发送完毕!")
}
 
// consumer函数只能从in这个Channel接收数据
func consumer(in <-chan int) {
    fmt.Println("消费者: 开始接收数据...")
    // 使用range来遍历Channel,直到Channel被关闭
    for num := range in {
        fmt.Printf("消费者: 收到 %d
", num)
    }
    fmt.Println("消费者: Channel已关闭,接收完毕!")
}
 
func main() {
    ch := make(chan int, 3) // 带缓冲的Channel
    go producer(ch)
    consumer(ch) // 主Goroutine作为消费者
    fmt.Println("所有任务完成!")
}

注意 chan<- int 表示只写Channel,<-chan int 表示只读Channel。这种写法能在编译期就帮你发现一些潜在的错误,挺不错的。

常见的并发"姿势"

掌握了Goroutine和Channel这两个基本功,我们就可以组合出很多强大的并发模式了。

工作池模式 (Worker Pool):人多力量大

想象一下,你有一大堆任务要处理,如果一个一个来,那得等到猴年马月。工作池模式就是找一堆"工人"(Goroutine)来同时处理这些任务。

import (
	"fmt"
	"time"
	"sync"
)
 
// worker是我们的"工人"
func worker(id int, jobs <-chan int, results chan<- string, wg *sync.WaitGroup) {
    defer wg.Done() // 确保每个worker完成后都通知WaitGroup
    for j := range jobs { // 从任务Channel里取任务
        fmt.Printf("工人 %d 号:开始处理任务 %d
", id, j)
        time.Sleep(time.Second) // 模拟干活需要时间
        resultMsg := fmt.Sprintf("工人 %d 号:搞定任务 %d,结果是 %d", id, j, j*2)
        fmt.Println(resultMsg)
        results <- resultMsg // 把处理结果发到结果Channel
    }
    fmt.Printf("工人 %d 号:任务队列空了,下班!
", id)
}
 
func main() {
    numJobs := 10
    jobs := make(chan int, numJobs)
    results := make(chan string, numJobs)
    
    var wg sync.WaitGroup // 用WaitGroup来等待所有工人下班
 
    // 启动3个"工人" Goroutine
    numWorkers := 3
    for w := 1; w <= numWorkers; w++ {
        wg.Add(1) // 每启动一个工人,计数器加1
        go worker(w, jobs, results, &wg)
    }
    
    // 分配任务
    fmt.Println("老板:开始分配任务啦!")
    for j := 1; j <= numJobs; j++ {
        jobs <- j
    }
    close(jobs) // 所有任务都分配完了,关闭任务Channel,工人们就知道没活干了
    fmt.Println("老板:任务分配完毕!")
    
    // 等待所有工人完成任务
    go func() { // 需要一个单独的goroutine来等待,防止阻塞main
        wg.Wait()
        close(results) // 所有工人都下班了,关闭结果Channel
        fmt.Println("老板:所有工人都下班了,可以关结果通道了。")
    }()
 
    // 收集并打印所有结果
    fmt.Println("
开始收集工作成果:")
    for res := range results {
        fmt.Println(res)
    }
    fmt.Println("
所有成果收集完毕!程序结束。")
}

这个模式非常实用,比如处理HTTP请求、图片处理、数据转换等等,只要是有大量独立任务的场景,都可以用它来提速。

超时控制 (Timeout):别让等待遥遥无期

有时候我们调用一个函数或者等待一个操作,不能让它无限期地等下去。比如调用一个外部API,如果对方半天没响应,我们总不能一直卡在那儿吧?这时候就需要超时控制了。Go的select语句配合time.After可以很优雅地实现这个功能。

import (
	"fmt"
	"time"
)
 
func main() {
    c := make(chan string)
    
    go func() {
        // 模拟一个耗时操作,比如2秒后才有结果
        time.Sleep(2 * time.Second)
        c <- "操作成功,结果来啦!"
    }()
    
    select {
    case res := <-c:
        fmt.Println("太棒了,接收到结果:", res)
    case <-time.After(1 * time.Second): // 我们只等1秒
        fmt.Println("唉,等了1秒还没结果,不等了,超时啦!")
    }
 
    // 如果操作需要更长时间
    go func() {
        time.Sleep(500 * time.Millisecond)
        c <- "第二次操作结果"
    }()
 
    select {
    case res := <-c:
        fmt.Println("第二次操作:", res)
    case <-time.After(1 * time.Second):
        fmt.Println("第二次操作超时了")
    }
}

select语句会等待多个Channel操作,哪个先准备好就执行哪个。time.After(duration)会在指定的duration之后向一个Channel发送一个值。所以,上面的代码就是:要么在1秒内从c收到结果,要么1秒后time.After的Channel就绪了,触发超时逻辑。是不是很巧妙?

扇入 (Fan-in) / 扇出 (Fan-out):分发与聚合

  • 扇出 (Fan-out):一个数据源,分发给多个处理单元(Goroutine)并行处理。就像一个水龙头的水流出来,分成好几股给不同的人用。
  • 扇入 (Fan-in):多个数据源(通常是多个Goroutine的处理结果),汇聚到一个输出Channel。就像多条小溪汇入一条大河。

这两个模式通常结合使用,构成一个处理流水线。

import (
	"fmt"
	"sync"
	"time"
)
 
// producer: 模拟一个数据源,产生数据
func producer(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
            time.Sleep(10 * time.Millisecond) // 模拟生产间隔
        }
    }()
    return out
}
 
// square: 处理器,接收一个数字,返回它的平方
func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * n
            time.Sleep(50 * time.Millisecond) // 模拟处理耗时
        }
    }()
    return out
}
 
// fanIn: 扇入,合并多个channel的数据到一个channel
func fanIn(channels ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)
 
    // 为每个输入channel启动一个goroutine
    output := func(c <-chan int) {
        defer wg.Done()
        for n := range c {
            out <- n
        }
    }
 
    wg.Add(len(channels))
    for _, c := range channels {
        go output(c)
    }
 
    // 启动一个goroutine,在所有数据都合并完成后关闭out
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}
 
func main() {
    // 1. 数据源
    source := producer(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
 
    // 2. 扇出:启动两个worker来处理数据(计算平方)
    // 我们把同一个source分发给两个square worker
    worker1 := square(source)
    worker2 := square(source)
    // 注意:这里实际上两个worker会竞争同一个source channel的数据。
    // 更典型的扇出可能是一个dispatcher将数据分发到不同的channel,再给worker。
    // 或者,更简单的方式是为每个worker创建一个数据源的副本,或者将数据源的输出显式分发。
 
    // 为了演示扇出更清晰的场景,我们创建两个独立的处理流程
    sourceA := producer(1, 2, 3, 4, 5)
    sourceB := producer(6, 7, 8, 9, 10)
 
    processedA := square(sourceA)
    processedB := square(sourceB)
 
 
    fmt.Println("使用扇入模式聚合结果:")
    // 3. 扇入:将两个worker的处理结果合并
    mergedResults := fanIn(processedA, processedB)
 
    // 4. 消费最终结果
    for res := range mergedResults {
        fmt.Printf("扇入聚合结果: %d
", res)
    }
 
    fmt.Println("所有数据处理完成。")
}

上面的fanIn函数展示了如何把多个输入Channel的数据汇总到一个输出Channel。扇出则可以理解为producer将任务分发给多个square这样的Goroutine。这种流水线式的处理方式,能充分利用多核CPU的性能。

同步原语:不止有Channel

Channel虽好,但也不是万能的。Go还在sync包里给我们提供了一些传统的同步工具,关键时刻也能派上大用场。

WaitGroup:等待一组Goroutine完工

前面例子里其实已经用到了。如果你启动了一堆Goroutine,想等它们全都执行完毕再继续下一步,sync.WaitGroup就是你的好帮手。它内部有个计数器:

  • 调用Add(n)把计数器加n
  • 每个Goroutine完成任务后调用Done()把计数器减1。
  • 主Goroutine调用Wait(),它会阻塞,直到计数器归零。
import (
	"fmt"
	"sync"
	"time"
)
 
func doWork(id int, wg *sync.WaitGroup) {
    defer wg.Done() // 确保任务完成时调用Done
    fmt.Printf("工人 %d:开始干活...
", id)
    time.Sleep(time.Duration(id) * time.Second) // 模拟不同工人干活时间不同
    fmt.Printf("工人 %d:活干完了!
", id)
}
 
func main() {
    var wg sync.WaitGroup
    
    numWorkers := 3
    for i := 1; i <= numWorkers; i++ {
        wg.Add(1) // 每启动一个工人,计数器加1
        go doWork(i, &wg)
    }
    
    fmt.Println("老板:等着工人们下班...")
    wg.Wait() // 等待所有工人(计数器清零)
    fmt.Println("老板:所有工人都下班啦,收工!")
}

互斥锁 (Mutex):保护共享资源

当多个Goroutine需要访问同一个共享变量,并且至少有一个会修改它时,为了防止数据错乱(竞态条件),就需要用互斥锁(sync.Mutex)来保护这段"临界区"代码。 拿到锁的Goroutine才能执行临界区代码,执行完释放锁,其他等待的Goroutine才能抢锁。

import (
	"fmt"
	"sync"
	"time"
)
 
var (
    counter int
    mutex   sync.Mutex // 我们的守护神:互斥锁
)
 
// 增加counter的值,这个操作需要被保护
func increment(wg *sync.WaitGroup) {
    defer wg.Done()
    mutex.Lock()   // 在访问counter前,加锁!
    // ---- 临界区开始 ----
    c := counter
    time.Sleep(10 * time.Millisecond) // 模拟一些操作,增加竞态条件发生的概率(如果不加锁)
    c++
    counter = c
    // ---- 临界区结束 ----
    mutex.Unlock() // 访问完毕,解锁!让别人用
}
 
func main() {
    var wg sync.WaitGroup
    numIncrements := 100
    
    fmt.Printf("初始Counter: %d
", counter)
 
    for i := 0; i < numIncrements; i++ {
        wg.Add(1)
        go increment(&wg)
    }
    
    wg.Wait() // 等待所有increment操作完成
    fmt.Printf("经过 %d 次增加后,Counter: %d (期望值: %d)
", numIncrements, counter, numIncrements)
    // 如果没有锁,这个counter的值很可能不是100
}

记得,锁的粒度要尽可能小,只锁住必要的部分,并且一定要用defer mutex.Unlock()来确保锁在函数退出时(无论是正常结束还是panic)都能被释放,不然就可能死锁啦!

Context:优雅地控制Goroutine的生命周期

context包是Go 1.7引入的,现在已经是并发控制的标配了。它主要用来在Goroutine之间传递取消信号、超时信息、截止时间以及其他请求范围的值。 想象一下,你发起一个请求,这个请求可能会触发一系列的Goroutine去执行不同的子任务。如果用户取消了这个请求,或者请求超时了,我们就需要一种机制来通知所有相关的Goroutine:"嘿,别干了,收工吧!" context.Context就是干这个的。

import (
	"context"
	"fmt"
	"time"
)
 
func workerWithContext(ctx context.Context, id int) {
    for {
        select {
        case <-ctx.Done(): // 监听Context的取消信号
            fmt.Printf("工人 %d:收到取消信号 (原因: %v),摸鱼了...
", id, ctx.Err())
            return // 收到信号,退出
        default:
            fmt.Printf("工人 %d:努力工作中...
", id)
            time.Sleep(time.Second) // 模拟干活
        }
    }
}
 
func main() {
    // 创建一个5秒后会自动取消的Context
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    // cancel函数也很重要,当我们想主动取消时可以调用它
    defer cancel() // 无论如何,最后调用cancel释放资源
 
    fmt.Println("启动工人,给他们3秒钟工作时间...")
    go workerWithContext(ctx, 1)
    go workerWithContext(ctx, 2)
    
    // 让主Goroutine等待足够长的时间,以便观察worker的取消行为
    // 比如等6秒,这样就能看到Context超时后worker的反应
    time.Sleep(6 * time.Second) 
    
    fmt.Println("主程序:时间差不多了,准备退出。")
    // 如果想在超时前手动取消,可以调用 cancel()
    // cancel() 
    // time.Sleep(1 * time.Second) // 给点时间让worker响应取消
}

使用Context可以让你的并发代码更加健壮,特别是在构建大型的、涉及多个微服务的系统时,用它来控制请求的生命周期简直太香了。

总结

虽然 Goroutine 轻量,但也不是说可以毫无节制地创建。每个 Goroutine 还是会消耗一点内存(主要是栈空间,初始约2KB),如果失控地创建太多,内存还是会被吃光的。所以,对于可以复用的场景,考虑用工作池模式。

并且如果涉及到 channel 的使用,一定要记得关闭 channel,否则会导致 goroutine 泄漏。当需要管理多个 channel 的读写,或者需要超时控制时,可以使用 select 语句。

34 Views