golang中的并发同步

2018-03-10 fishedee 后端

1 概述

golang中的并发同步,前置知识——Go的内存模型

2 状态

routine都是主动拉取数据的,例如,单一计数器,共享缓存等等,routine之间共享的数据就是状态。

2.1 非依赖状态

同步的状态,写入的数据不来源于自身。

  • sync.Value,在一个routine直接Store,而另外routine直接Load。sync.Value保证读写操作是原子的,也就是不可能出现Value的中间状态。
  • sync.Pointer,其实和sync.Value的用法是一样的,只是sync.Value操作的是一个值,而sync.Pointer操作的是一个引用。另外,就绝大部份的情况下,原子读写引用的性能比原子读写值的性能要快得多。

2.2 依赖状态

同步的状态,写入的数据来源于自身

  • atomic,在sync.atomic库中,提供了对int32,int64,uint32,uint64的原子add操作(计数器),原子cas操作(乐观锁)。
  • 容器,在sync中,目前只提供了Map容器的原子操作,并没有提供像java中的丰富的无锁数据结构,arraylist,linkedlist,queue等等。
  • 互斥锁,对多个变量的组合读写操作提供互斥块,只要进入互斥块就会被限制为单一进入,这也被称为悲观锁。
  • 读写锁,跟互斥锁类似,只是进一步区分了读和写的操作。只有读时多个读可以同时进入,只要含有写时就只能单一进入。当业务是写少读多的环境时,读写锁的效率就会快得多。

3 通知

routine之间存在着主动提醒和被动通知关系的,例如,生产者消费者模型,订阅发布模型,连接池模型,routine之间共享的提醒管道就是通知。

在普通的java,c模型中,这种同步的通知结构只能通过互斥锁,信号量和条件变量来实现。但是,golang中还额外提供了channel和select的工具来实现,这种工具比起传统的工具简单直观很多。在绝大部份的需要通知的情况下,我们都应该只使用channel和select。

3.1 一次提醒一次通知

一个routine的提醒,对应的只有一个routine的通知。

  • 同步:routine的提醒必须存在着另外一个的routine正在等待通知,否则该routine就会一直等待直到。方法就是channel unbuffered。
  • 半异步:routine的提醒允许此时没有一个的routine不在等待通知,另外一个routine下次等待通知时直接从channel中获取就可以了。但是,这种方法是大小限制的,当buffer溢出时,routine的提醒就必须要等待。方法就是channel buffered。
  • 全异步:routine的提醒允许此时没有一个的routine不在等待通知,而且可以假设buffer的大小是无限大的。routine的提醒在任何时候都是不需要等待。方法就是infinite channel,可以看这里这里

3.2 多次提醒一次通知

将多次提醒聚合为一次通知,方法其实很简单,将多次通知计数然后输出到另外一个提醒就可以了。

func do(ch chan bool) {
    time.Sleep(time.Second * 1)
    ch <- true
}
func main() {
    ch := make(chan bool)
    exit := make(chan bool)
    for i := 0; i != 4; i++ {
        go do(ch)
    }
    go func() {
        for i := 0; i != 4; i++ {
            <-ch
        }
        exit <- true
    }()

    fmt.Println(time.Now())
    <-exit
    fmt.Println(time.Now())
}

ch就是多次提醒的写入channel,exit就是一次通知的读取channel。在这个例子中,你也可以使用sync.Waitgroup来完成同样的操作。

3.3 一次提醒多次通知

一次提醒多次通知是channel中比较巧妙的一个特性,它使用的是close channel的特性。

func do(ch chan bool) {
    _, isOk := <-ch
    fmt.Println("exit!", isOk)
}
func main() {
    ch := make(chan bool)
    for i := 0; i != 4; i++ {
        go do(ch)
    }
    close(ch)

    time.Sleep(time.Second * 1)
}

对channel进行close操作后,这个channel进行读取操作时就会非阻塞地马上返回,它返回的第二个参数为false,代表channel已经被关闭了。这种一次提醒多次通知的功能常常用于通知关闭多个goroutine,这也是Context中Cancel的实现原理。

但是,要注意的是,对一个已经close的channel进行写入操作时会发生panic。

3.4 select

channel加上select后就能实现很多巧妙的功能

3.4.1 通知组合

func main() {
    readMsg := make(chan bool)
    writeMsg := make(chan bool)
    closeMsg := make(chan bool)

    for {
        select {
        case msg := <-readMsg:
            fmt.Println("websocket read msg", msg)
        case msg := <-writeMsg:
            fmt.Println("websocket write msg", msg)
        case <-closeMsg:
            break
        }
    }
}

在websocket的场景中,我们希望可以在一个routine中同时处理读消息,写消息,和主动踢下线的能力,select加channel就能很简洁的解决这个问题。

3.4.2 通知转换为channel

func (r *messageReader) Read(b []byte) (int, error) 

但是,在绝大部份的sdk中,我们routine接收提醒的方法都不是通过channel,而是阻塞方式的Read操作。例如在websocket库的接收消息方法Read。这样的Read方法是阻塞方式的,我们根本就不能用select将它和closeMsg,writeMsg组合起来。

for {
    select {
    case msg := websocket.Read(nil):
        fmt.Println("websocket read msg", msg)
    case msg := <-writeMsg:
        fmt.Println("websocket write msg", msg)
    case <-closeMsg:
        break
    }
}

也就是说,我们不能像上面这样写,因为select并不允许对非channel进行通知组合。

readMsg := make(chan bool)
writeMsg := make(chan bool)
closeMsg := make(chan bool)
var readErr error

go func() {
    for {
        data := make([]byte, 1024)
        size, err := websocket.Read(data)
        if err != nil {
            close(readMsg)
            readErr = err
            return
        }
        readMsg <- data[0:size]
    }
}()

for {
    select {
    case msg, isOk := <-readMsg:
        if isOk == false {
            close(websocket)
            fmt.Println("websocket read error", readErr)
            break
        } else {
            fmt.Println("websocket read msg", msg)
        }
    case msg := <-writeMsg:
        fmt.Println("websocket write msg", msg)
    case <-closeMsg:
        close(websocket)
        break
    }
}

解决方法很简单,开启一个单独的routine,将阻塞的读操作转换为readMsg的channel,我们就能重新用回原来的通知组合方法了。要注意的是,上面的代码有两个routine,当阻塞读取发生意外时,和,当主动踢下线时,这两种情况下,上面代码是如何退出程序的,是如何有序地退出两个routine的。并且,它保证了close websocket有且只有发生一次。

select {
    case msg, isOk := <-readMsg:
        if isOk == false {
            close(websocket)
            fmt.Println("websocket read error", readErr)
            break
        } else {
            fmt.Println("websocket read msg", msg)
            go this.msgListener(msg)
        }
    }

要注意的是,业务经常是收到消息后回调listener接口。为了避免listener接口返回来调用send或者close导致死锁的问题,在触发listener时都需要用单独的go routine来做。

3.4.3 占位通知

select的写法十分灵活,但它有一点不好,就是需要同时组合的channel数量在编译时就已经被固定下来了。无法实现,在运行时组合任意数量的channel。

解决办法是:

  • 对于在运行时可能需要减少组合的channel,你可以使用nil channel来代替。golang保证nil channel无论是读操作还是写操作都不会panic,它在select分支下更是永远都不会触发。具体可以看这里,一个很实用的例子是在这里中的infiniteBuffer函数。
  • 对于在运行时可能需要增加组合的channel,你可以使用reflect.Select,这可是最灵活的组合通知的方式。

3.4.4 非阻塞测试

readMsg := make(chan bool, 16)
writeMsg := make(chan bool, 16)

for i := 0; i != 16; i++ {
    writeMsg <- true
}

select {
case msg := <-readMsg:
    fmt.Println("read Msg", msg)
default:
    fmt.Println("read msg empty!")
}

select {
case writeMsg <- true:
    fmt.Println("write msg", true)
default:
    fmt.Println("write msg full!")
}

在select中加入default,我们就能实现测试通知的目的。read channel与default的组合,我们能实现测试channel是否有信息可读。write channel与default的组合,我们能实现测试channel是否已经满了,不能再写入了。

这个非阻塞测试的方法,被nsq消息队列使用来这样的场景。当对write msg进行写入测试失败时,就把msg写入文件中。这样就能同时兼顾内存有限但是速度快,文件速度慢但空间很大的平衡。

另外,你也可以用在reproxy的实现中,将所有的输入请求都丢入到一个较大的buffer channel中,然后后端服务不断从buffer channel中取出数据来处理。当请求突然增大,甚至远远超出后端服务的计划承受能力时,reproxy作为转发层就可以通过检查buffer channel是否已经满了,来提前把后面的所有请求都拒绝掉,以保护后端服务,避免雪崩。也就是传说中的fail-fast机制

4 总结

golang中的channel与select是处理并发同步的一大神器。

相关文章