大家好,今天将梳理出的 Go语言并发知识内容,分享给大家。 请多多指教,谢谢。
本文主要介绍 sync 标准库中基本同步原语 sync.Cond
、sync.Once
、sync.Pool
介绍及使用。
本章节内容
sync.Cond
sync.Once
sync.Pool
sync.Cond
条件变量
Cond 类型原型
type Cond struct { // L 是在观察或改变状态时保持的 L Locker // 包含过滤或未导出的字段}func NewCond(l Locker) *Condfunc (c *Cond) Broadcast()func (c *Cond) Signal()func (c *Cond) Wait()
Cond 实现了一个条件变量,用于等待或宣布事件发生时 goroutine 的交汇点。 在这个定义中,“事件”是指两个或更多的goroutine之间的任何信号,仅指事件发生了,不包含其他任何信息。 通常,你可能想要在收到某个 goroutine 信号前令其处于等待状态。
每个 Cond 都有一个相关联的Locker L (通常是 Mutex
或 RWMutex
),当改变条件和调用 Wait()
方法时必须持有它。Cond在第一次使用后不得复制。
NewCond()
函数: 返回一个新的Cond与Locker l
func (*Cond) Broadcast()
方法:广播会唤醒所有等待c的goroutine
func (*Cond) Signal()
方法:信号唤醒了一个等待c的goroutine
func (*Cond) Wait()
方法:Wait自动解锁c.L并暂停调用goroutine的执行。在稍后恢复执行后,在返回之前等待锁定c.L。(主要为等待信号通知)
补充:
Wait()
方法会自动的对该条件变量关联的那个锁进行解锁,并且使它所在的 goroutine 阻塞。 一旦接收到通知,该方法所在的 goroutine 就会被唤醒,并且该方法会立即尝试锁定该锁。
Singnal()
和 Broadcast()
方法的作用都是发送通知,以唤醒正在为此阻塞的 goroutine。Singnal 的目标只有一个,Broadcast 的目标则是所有。
Cond 使用
举例1:假设我们有一个固定长度为2的队列,并且我们要将10个元素放入队列中。希望一有空间就能放入,所以在队列中有空间时需要立即通知。
package mainimport ( "fmt" "sync" "time")func main() { c := sync.NewCond(&sync.Mutex{}) // 1 queue := make([]interface{}, 0, 10) // 2 removeFromQueue := func(delay time.Duration) { time.Sleep(delay) c.L.Lock() // 8 queue = queue[1:] // 9 fmt.Println("Remove from queue") c.L.Unlock() // 10 c.Signal() // 11 } for i := 0; i < 10; i++ { c.L.Lock() // 3 for len(queue) == 2 { // 4 c.Wait() // 5 } fmt.Println("Adding to queue") queue = append(queue, struct{}{}) go removeFromQueue(1 * time.Second) // 6 c.L.Unlock() //7 }}
输出
Adding to queueAdding to queueRemove from queueAdding to queueRemove from queueAdding to queueRemove from queueRemove from queueAdding to queueAdding to queueRemove from queueAdding to queueRemove from queueAdding to queueRemove from queueAdding to queueRemove from queueAdding to queue
首先,我们使用一个标准的 sync.Mutex 作为Locker来创建Cond
然后创建一个长度为零的切片,最终会添加10个元素,因此将其容量设为10
在进入关键的部分前调用Lock来锁定c.L
检查队列的长度,确认什么时候需要等待。由于removeFromQueue是异步的,for不满足时才会跳出,而 if 做不到重复判断,这一点很重要。
调用Wait,将阻塞 main() goroutine
,直到接受到信号。
在这里创建一个新的 goroutine,它会在1秒后将元素移除队列。
退出条件的关键部分,因为已经成功加入了一个元素。
进入并发条件下的关键部分,修改与并发条件判断直接相关的数据。
移除切片的头部并重新分配给第二个元素,这一步模拟了元素出列。
退出操作关键部分
发出信号,通知处于等待装的 goroutine 可以进行下一步了。
举例2:介绍另一个方法 Broadcast()
, 它提供了一种同时与多个 goroutine 进行通信的解决方案。假设创建了一个带有按钮的GUI程序,该程序需要注册任意数量的函数,当点击按钮时运行这些函数,可以使用 Cond的 Broadcast
来通知所有已注册的函数。
package mainimport ( "fmt" "sync")func main() { type Button struct { Clicked *sync.Cond // 1 } button := Button{Clicked: sync.NewCond(&sync.Mutex{})} subscribe := func(c *sync.Cond, fn func()) { // 2 var tempwg sync.WaitGroup tempwg.Add(1) go func() { tempwg.Done() c.L.Lock() defer c.L.Unlock() c.Wait() fn() }() tempwg.Wait() } var wg sync.WaitGroup // 3 wg.Add(3) subscribe(button.Clicked, func(){ // 4 fmt.Println("Clicked 1") wg.Done() }) subscribe(button.Clicked, func(){ // 5 fmt.Println("Clicked 2") wg.Done() }) subscribe(button.Clicked, func(){ // 6 fmt.Println("Clicked 3") wg.Done() }) button.Clicked.Broadcast() // 7 wg.Wait()}
输出
Clicked 3Clicked 1Clicked 2
定义了一个 Button 类型,包含了 sync.Cond
指针类型的 Clicked 属性,这是 goroutine 接收通知的关键条件。
定义了一个注册函数来处理信号,每个注册的函数都在自己的 goroutine 上运行,并且在该 goroutine 不会退出,知道接到通知。
创建一个 WaitGroup 用来确保程序写入标准输出之前不会退出。
模拟注册一个处理函数
模拟注册一个处理函数
模拟注册一个处理函数
在按钮点击设置了一个处理程序,让所有注册的函数知道按钮已被点击。
sync.Once
一次性执行
Once 类型原型
type Once struct { // contains filtered or unexported fields}func (o *Once) Do(f func())
Once 主要作用是只执行一次处理,在第一次使用后将不可复制。
func (o *Once) Do(f func())
方法:当 Once 的这个实例第一次调用Do时,Do将调用函数f。换句话说,如果once.Do(f)被多次调用,只有第一个调用会调用f,即使f在每次调用中具有不同的值。每个函数执行时都需要一个Once的新实例。
Once 使用
package mainimport ( "fmt" "sync")func main() { var count int increment := func() { count++ } var once sync.Once var wg sync.WaitGroup wg.Add(100) for i := 0; i < 100; i++ { go func() { defer wg.Done() once.Do(increment) }() } wg.Wait() fmt.Printf("Count is %d\n", count)}
输出
Count is 1
sync.Once
确保了即使在不同的 goroutine上,调用 Do 传入的函数只执行一次。
sync.Pool
临时对象池
Pool 类型原型
type Pool struct { // New 可选指定要生成的函数 // 一个值,否则Get将返回nil // 它可能不会在调用Get时并发更改 New func() any // contains filtered or unexported fields}func (p *Pool) Get() anyfunc (p *Pool) Put(x any)
Pool 是一组可以单独保存和检索的临时对象。存储在池中的任何项目可以在任何时候自动删除而不通知。如果发生这种情况时Pool持有唯一的引用,则可能会释放该项。
Pool 可以安全的被多个 goroutine 同时使用。
Pool 的目的是缓存已分配但未使用的项,以便稍后重用,减轻垃圾收集器的压力。也就是说,它使构建高效的、线程安全的空闲列表变得容易。然而,它并不适用于所有的免费列表。
Pool 适当使用是管理一组临时项,这些临时项在包的并发独立客户端之间共享,并可能被包的并发独立客户端重用。Pool 提供了一种在多个客户端之间摊销分配开销的方法。
在较高的层次上,池模式是一种创建和提供固定数量可用对象的方式。它通常用于约束创建资源昂贵的事物(例如数据库连接)。Go的 sync.Pool
可以被多个例程安全地使用。
一个很好 Pool 的例子是在 fmt 包中,它维护一个动态大小的临时输出缓冲区存储。存储在负载下伸缩(当许多goroutines正在积极地打印时),在静默时收缩。
注意,作为生存期较短的对象一部分维护的空闲列表不适合用于Pool,因为在这种情况下,开销不会很好地摊销。让这些对象实现它们自己的空闲列表会更有效率。
func (p *Pool) Get() any
方法:Get()
从池中选择任意项,将其从池中移除,并将其返回给调用者。Get()
可以选择忽略池并将其视为空的。调用方不应假定传递给 Put()
的值与 Get()
返回的值之间存在任何关系。
如果Get将返回nil而p.New
是非nil,则Get将返回调用p.New
的结果。
func (p *Pool) Put(x any)
方法:Put将x添加到 Pool 中。
Pool 使用
举例1:Pool的主要接口是它的Get方法。 被调用时,Get将首先检查池中是否有可用实例返回给调用者,如果没有,则创建一个新成员变量。使用完成后,调用者调用Put将正在使用的实例放回池中供其他进程使用。
package mainimport ( "fmt" "sync")func main() { myPool := &sync.Pool{ New: func() interface{} { fmt.Println("create new instance") return struct{}{} }, } myPool.Get() // 1 instance := myPool.Get() // 1 myPool.Put(instance) // 2 myPool.Get() // 3}
输出
package mainimport ( "fmt" "sync" "time")func main() { c := sync.NewCond(&sync.Mutex{}) // 1 queue := make([]interface{}, 0, 10) // 2 removeFromQueue := func(delay time.Duration) { time.Sleep(delay) c.L.Lock() // 8 queue = queue[1:] // 9 fmt.Println("Remove from queue") c.L.Unlock() // 10 c.Signal() // 11 } for i := 0; i < 10; i++ { c.L.Lock() // 3 for len(queue) == 2 { // 4 c.Wait() // 5 } fmt.Println("Adding to queue") queue = append(queue, struct{}{}) go removeFromQueue(1 * time.Second) // 6 c.L.Unlock() //7 }}0
调用 Get 方法,将在池中定义 New 函数,因为实例尚未实例化
将先前检索到的实例放回池中,这时实例的可用数量为1个
执行此调用时,将重新使用先前分配的实例。 New函数不会被调用
举例2:指定分配内存
package mainimport ( "fmt" "sync" "time")func main() { c := sync.NewCond(&sync.Mutex{}) // 1 queue := make([]interface{}, 0, 10) // 2 removeFromQueue := func(delay time.Duration) { time.Sleep(delay) c.L.Lock() // 8 queue = queue[1:] // 9 fmt.Println("Remove from queue") c.L.Unlock() // 10 c.Signal() // 11 } for i := 0; i < 10; i++ { c.L.Lock() // 3 for len(queue) == 2 { // 4 c.Wait() // 5 } fmt.Println("Adding to queue") queue = append(queue, struct{}{}) go removeFromQueue(1 * time.Second) // 6 c.L.Unlock() //7 }}1
输出
package mainimport ( "fmt" "sync" "time")func main() { c := sync.NewCond(&sync.Mutex{}) // 1 queue := make([]interface{}, 0, 10) // 2 removeFromQueue := func(delay time.Duration) { time.Sleep(delay) c.L.Lock() // 8 queue = queue[1:] // 9 fmt.Println("Remove from queue") c.L.Unlock() // 10 c.Signal() // 11 } for i := 0; i < 10; i++ { c.L.Lock() // 3 for len(queue) == 2 { // 4 c.Wait() // 5 } fmt.Println("Adding to queue") queue = append(queue, struct{}{}) go removeFromQueue(1 * time.Second) // 6 c.L.Unlock() //7 }}2
储存了字节切片的指针
指向了字节切片的指针
案例中,如果不使用 sync.Pool
方式,可能需要分配千兆字节的内存。目前使用 sync.Pool
设置对象池方式只分配了 4KB。
举例3:Pool 另一种常见情况是预热分配对象缓存,用于必须尽快运行的操作。通过预先加载获取对另一个对象的引用来减少消费者的时间消耗。
package mainimport ( "fmt" "sync" "time")func main() { c := sync.NewCond(&sync.Mutex{}) // 1 queue := make([]interface{}, 0, 10) // 2 removeFromQueue := func(delay time.Duration) { time.Sleep(delay) c.L.Lock() // 8 queue = queue[1:] // 9 fmt.Println("Remove from queue") c.L.Unlock() // 10 c.Signal() // 11 } for i := 0; i < 10; i++ { c.L.Lock() // 3 for len(queue) == 2 { // 4 c.Wait() // 5 } fmt.Println("Adding to queue") queue = append(queue, struct{}{}) go removeFromQueue(1 * time.Second) // 6 c.L.Unlock() //7 }}3
正如这个例子所展现的,池模式非常适合于这种需要并发进程,或者构建这些对象可能会对内存产生负面影响的应用程序。
但是,在确定是否应该使用池时有一点需要注意:如果使用池子里东西在内存上不是大致均匀的,则会花更多时间将从池中检索,这比首先实例化它要耗费更多的资源。
因此,在使用 Pool 时,请记住以下几点:
实例化 sync.Pool
时,给它一个新元素,该元素应该是线程安全的。
当你从 Get 获得一个实例时,不要假设你接收到的对象状态。
当你从池中取得实例时,请务必不要忘记调用Put。否则池的优越性就体现不出来了。这通常用defer来执行延迟操作。
池中的元素必须大致上是均匀的。
Pool 特性
最后总结下临时对象池的特性:
临时对象池可以把由其中的对象值产生的存储压力进行分摊,它会专门为每一个与操作它的 goroutine 相关联的P建立本地池。 在临时对象池的 Get()
方法被调用时,它一般会先尝试从与本地 P 对应的本地池和本地共享池中获取一个对象值。 如果获取失败,就会试图从其他 P 共享池中取走一个对象值并直接返回给调用方。 注意,这个对象值生成函数产生的对象值永远不会被放置到池中,而是会被直接返回给调用方。 另外,临时对象池的 Put()
方法会把它的参数存放到本地 P 的本地池中,每个相关的 P 的本地共享池中的所有对象值,都是在当前临时对象池的范围内共享的。
垃圾回收的执行一般会使临时对象池中的对象值全部被移除。 即使永远不会显式地从临时对象池取走某个对象值,该对象值也不会永远待在临时对象池中,它的生命周期取决于垃圾回收任务下一次的执行时间。
技术文章持续更新,请大家多多关注呀~~
搜索微信公众号,关注我【 帽儿山的枪手 】
参考材料
[1] "《Go并发编程实战》书籍"
[2]: "《Concurrency in Go》书籍"
[3] https://pkg.go.dev/sync sync标准库
原文:https://juejin.cn/post/7100370980509319175