sync包
锁
共享内存的并发问题
在很多语言中并发编程都有对一个内存进行读写的情况,比如下面的测试:
func test() {
count++
println(count)
}
var count int
func main() {
for i := 0; i < 30; i++ {
go test()
}
time.Sleep(3 * time.Second)
}
很简单,我们开启30个协程对count变量进行增加写操作,而后又进行读取打印操作。 结果并不是顺序的打出1-30,因为协程的执行时机是随机的。但是有重复的结果打印出,那就是多协程并发读写的原因了。 java语言有锁,golang中也有,有了锁,就可以安全的读写了。
互斥锁 sync.Mutex
sync.Mutex是一个结构体,它是开箱即用的。 它实现了Locker接口:
type Locker interface {
Lock()
Unlock()
}
通过sync.Mutex来对上面示例进行改造:
func test(mutex *sync.Mutex) {
mutex.Lock()
defer mutex.Unlock()//lock unlock要成对出现,直接使用defer更安全
count++
println(count)
}
var count int
func main() {
mutex := sync.Mutex{}
for i := 0; i < 30; i++ {
go test(&mutex)//这里要使用指针,若发生复制便不再是一个锁了
}
time.Sleep(3 * time.Second)
}
注意:
- 未lock直接unlock报错
读写锁 sync.RWMutex
sync.RWMutex也是一个结构体,内部嵌入sync.Mutex,也是开箱即用的。 读写锁顾名思义就是可以加只读锁和写锁
注意:
- RWMutex 是单写多读锁,该锁可以加多个读锁或者一个写锁
- 读锁占用的情况下会阻止写,不会阻止读,多个 goroutine 可以同时获取读锁
- 写锁会阻止其他 goroutine(无论读和写)进来,整个锁由该 goroutine 独占适用于读多写少的场景
总结起来就是读和写是互斥的,但是读之间是不互斥的。 读写锁使用的例子:
func testRead(group *sync.WaitGroup, mutex *sync.RWMutex) {
mutex.RLock()
defer func() {
mutex.RUnlock()
group.Done()
}()
fmt.Println("读取开始")
time.Sleep(time.Second)
println(count)
time.Sleep(time.Second)
fmt.Println("读取结束")
}
func testWrite(group *sync.WaitGroup, mutex *sync.RWMutex) {
mutex.Lock()
defer func() {
mutex.Unlock()
group.Done()
}()
fmt.Println("写开始")
time.Sleep(time.Second)
count++
time.Sleep(time.Second)
fmt.Println("写结束")
}
var count int
func main() {
mutex := sync.RWMutex{}
group := sync.WaitGroup{}
group.Add(10)
for i := 0; i < 5; i++ {
go testRead(&group, &mutex) //这里要使用指针,若发生复制便不再是一个锁了
go testWrite(&group, &mutex)
}
group.Wait()
}
打印结果:
可以看到,读操作是有重叠的,即他们之间不互斥,但是写操作之间,读写操作之间都没有交叉,他们是互斥的。
条件 sync.Cond
条件变量的作用并不是保证在同一时刻仅有一个线程访问某一个共享数据,而是在对应的共享数据的状态发生变化时,通知其他因此而被阻塞的线程。
条件变量要和配合锁来使用
unc testWait(group *sync.WaitGroup, mutex *sync.Cond) {
mutex.L.Lock()//wait前必须先加锁
defer func() {
mutex.L.Unlock()
group.Done()
}()
fmt.Println("读取开始")
mutex.Wait()//内部先解锁,再加锁
time.Sleep(time.Second)
fmt.Println("读取结束")
}
func testSignal(group *sync.WaitGroup, mutex *sync.Cond) {
fmt.Println("唤醒其他协程")
mutex.Signal()//唤醒一个
mutex.Broadcast()//唤醒所有等待该条件变量的协程
group.Done()
}
func main() {
mutex := sync.Mutex{}
cond := sync.NewCond(&mutex)
group := sync.WaitGroup{}
group.Add(2)
go testWait(&group, cond)
time.Sleep(time.Second)
go testSignal(&group, cond)
group.Wait()
}
wait会阻塞当前goroutine,内部会先解锁,所在协程进入阻塞等待状态,等到有人发通知恢复,再加锁,所以在wait之前要先加锁,否则会报错。
sync.Once
这个相对就比较简单了,直接看示例:
func testOnce() {
once := sync.Once{}
for i := 0; i < 5; i++ {
once.Do(func() {
fmt.Println("once do")
})
}
time.Sleep(3 * time.Second)
}
once.do中的函数只会执行一次,其实原理也很简单,once结构体中有个互斥锁及标签位
type Once struct {
m Mutex
done uint32
}
原子操作
通过对互斥锁的合理使用,我们可以使一个 goroutine 在执行临界区中的代码时,不被其他的goroutine 打扰。不过,虽然不会被打扰,但是它仍然可能会被中断(interruption)。
当协程正执行临界区的时候还是会可能失去CPU的。换句话说,互斥锁可以保证串行执行,但是不能保证原子执行。
在底层,由 CPU 提供芯片级别的支持,原子操作可以实现。这使得原子操作可以完全地消除竞态条件,并能够绝对地保证并发安全性。并且,它的执行速度要比其他的同步工具快得多,通常会高出好几个数量级。不过,它的缺点也很明显。 更具体地说,正是因为原子操作不能被中断,所以它需要足够简单,并且要求快速。
Go 语言的原子操作当然是基于 CPU 和操作系统的,所以它也只针对少数数据类型的值提供了原子 操作函数。这些函数都存在于标准库代码包sync/atomic中。
sync/atomic包中的函数可以做的原子操作有:
- 加法(add)
- 比较并交换(compare andswap,简称 CAS)
- 加载(load)
- 存储(store)
- 交换(swap)
这些函数针对的数据类型并不多。但是,对这些类型中的每一个,sync/atomic包都会有一套函 数给予支持。这些数据类型有:int32、int64、uint32、uint64、uintptr,以及unsafe包中的Pointer。 示例:
func testAtmic() {
atomic.AddInt32(&count, 10)
val := atomic.LoadInt32(&count)
fmt.Println(val)
atomic.AddInt32(&count, -1)
fmt.Println(count)
atomic.SwapInt32(&count, 33)
fmt.Println(count)
atomic.StoreInt32(&count, 11)
fmt.Println(count)
atomic.CompareAndSwapInt32(&count, 11, 22)
fmt.Println(count)
}
原子操作任意类型
此外,sync/atomic包还提供了一个名为Value的类型,它可以被用来存储任意类型的值
func testAtmicValue() {
value := atomic.Value{}
u := user{
name: "tom",
age: 12,
}
value.Store(u)
u2 := value.Load().(user)
fmt.Println(u2)
}
对象缓存池 sync.Pool
func testPool() {
pool := sync.Pool{
New: func() interface{} {//生成新的对象
return 0
},
}
value1 := pool.Get()//取
fmt.Println(value1)
pool.Put(1)//存
value2 := pool.Get()
fmt.Println(value2)
}
对象缓存池和普通意义的缓存池作用一样,我们可以指定生成新对象的方式,get时,缓存池中没有数据便调用改函数生成。
缓存池并没有数量的限制,每次gc时都会清除缓存池中的数据,所以要看清使用场景。比如当做数据连接池显然就不合适。
安全的map sync.Map
普通的map并不是并发安全的,于是提供了一个并发安全的。 简单使用示例:
func testMap(){
m := sync.Map{}
u := user{
name: "tom",
age: 12,
}
m.Store("zhangsan",u)
value, _ := m.Load("zhangsan")
u2 := value.(user)
fmt.Println(u2)
}
sync.Map主要使用方法: m.Load() m.Store() m.Delete() m.LoadOrStore() m.Range()
sync.map的操作方法key,value都是interface{}格式的,所以要自己进行类型判断,这显然有点麻烦。这里有个方案:
type mapWrapper struct {
M sync.Map
keyType reflect.Type
valType reflect.Type
}
func (m *mapWrapper) load(key interface{}) (value interface{}, ok bool) {
if reflect.TypeOf(key) != m.keyType {
return nil, false
}
return m.M.Load(key)
}
func (m *mapWrapper) store(key interface{}, value interface{}) (ok bool) {
if reflect.TypeOf(key) != m.keyType { //校验key和value类型
return false
}
if reflect.TypeOf(value) != m.valType {
return false
}
m.M.Store(key, value)
return true
}
func testMap() {
m := mapWrapper{
keyType: reflect.TypeOf(""),
valType: reflect.TypeOf(user{}),
}
u := user{
name: "tom",
age: 12,
}
ok := m.store(1, 2)
fmt.Println(ok)
ok = m.store("zhangsan", u)
fmt.Println(ok)
value, ok := m.load("zhangsan")
fmt.Println(value, ok)
}
主要思路是对sync.Map封装,操作的时候校验类型,类型由初始化的时候传入。
注意: 和原来的map一样,虽然key是interface{}类型,但是一定是可以比较类型的,func,map等作为key就不别想了
总结
-
互斥锁是一个很有用的同步工具,它可以保证每一时刻进入临界区的 goroutine 只有一个。读写锁对共享资源的写操作和读操作则区别看待,并消除了读操作之间的互斥。
-
条件变量主要是用于协调想要访问共享资源的那些线程。 当共享资源的状态发生变化时,它可以被用来通知被互斥锁阻塞的线程,它既可以基于互斥锁,也可以基于读写锁。当然了,读写锁也是一种互斥锁,前者是对后者的扩展。
-
原子操作不光能保证并发安全(不使用锁),还能保证临界区操作不被中断而原子执行,但是只支持少数的数据类型。当然也有atomic.Value支持任意类型
-
缓存池提供了一个api,可以提高重复使用率,但是要注意gc时会清空。