Golang io.Pipe 源码阅读

2018/03/02 Golang

最近在看Golang源码,看到了io模块,这是对Pipe模块的阅读记录

// Pipe adapter to connect code expecting an io.Reader
// with code expecting an io.Writer.

package io

import (
	"errors"
	"sync"
)

// ErrClosedPipe is the error used for read or write operations on a closed pipe.
var ErrClosedPipe = errors.New("io: read/write on closed pipe")

// A pipe is the shared pipe structure underlying PipeReader and PipeWriter.
type pipe struct {
	rl    sync.Mutex // gates readers one at a time
	wl    sync.Mutex // gates writers one at a time
	l     sync.Mutex // protects remaining fields
	data  []byte     // data remaining in pending write
	rwait sync.Cond  // waiting reader
	wwait sync.Cond  // waiting writer
	rerr  error      // if reader closed, error to give writes
	werr  error      // if writer closed, error to give reads
}

func (p *pipe) read(b []byte) (n int, err error) {
	// One reader at a time.
	p.rl.Lock()         // 加读锁
	defer p.rl.Unlock() // 解读锁

	p.l.Lock()
	defer p.l.Unlock()
	for { // 循环检查错误 和 数据情况
		if p.rerr != nil {
			return 0, ErrClosedPipe
		}
		if p.data != nil { // data 有数据了,则 break 循环, 进行下面的copy 读取数据的操作
			break
		}
		if p.werr != nil {
			return 0, p.werr
		}
		p.rwait.Wait() // 读的等待, 等待 p.rwait.Signal
	}
	n = copy(b, p.data) // 一下把所有数据都copy 到 b中
	p.data = p.data[n:]
	if len(p.data) == 0 { // 没有data了, 告知 写goroutine,可以继续写了
		p.data = nil
		p.wwait.Signal() // 通知 p.wwait.Wait  释放等待, 准备进行下一次的写操作
	}
	return
}

var zero [0]byte

func (p *pipe) write(b []byte) (n int, err error) {
	// pipe uses nil to mean not available
	if b == nil {
		b = zero[:]
	}

	// One writer at a time.
	p.wl.Lock()
	defer p.wl.Unlock()

	p.l.Lock()
	defer p.l.Unlock()
	if p.werr != nil {
		err = ErrClosedPipe
		return
	}
	p.data = b  // 这是写操作, 将 b 的数据全部写入到data
	p.rwait.Signal() // 通知 p.rwait.Wait, 释放p.rwait.Wait 等待, 可以开始读数据了. 让读操作把data都读完,读完之后即可等待准备下一次写操作
	for {
		if p.data == nil {
			break // 有数据来了, break 等待循环, 进行写操作
		}
		if p.rerr != nil {
			err = p.rerr
			break
		}
		if p.werr != nil {
			err = ErrClosedPipe
			break
		}
		p.wwait.Wait() // 写的等待, 等待 p.wwait.Signal
	}
	n = len(b) - len(p.data)
	p.data = nil // in case of rerr or werr  // 将data置为nil,等待下一次的写操作
	return
}

func (p *pipe) rclose(err error) {
	if err == nil {
		err = ErrClosedPipe
	}
	p.l.Lock()
	defer p.l.Unlock()
	p.rerr = err
	p.rwait.Signal() // 释放读等待,避免阻塞
	p.wwait.Signal() // 释放写等待,避免阻塞
}

func (p *pipe) wclose(err error) {
	if err == nil {
		err = EOF
	}
	p.l.Lock()
	defer p.l.Unlock()
	p.werr = err
	p.rwait.Signal() // 释放读等待,避免阻塞
	p.wwait.Signal() // 释放写等待,避免阻塞
}

// 之后剩下的代码是具体定义了PipeReader和PipeWriter, 然后封装了read 和 write  close 实现方法方法, 也就顺畅的实现了io.Writer io.Reader 接口.

// A PipeReader is the read half of a pipe.
type PipeReader struct {
	p *pipe
}
func (r *PipeReader) Read(data []byte) (n int, err error) {...}
func (r *PipeReader) Close() error {...}
func (r *PipeReader) CloseWithError(err error) error {...}
// A PipeWriter is the write half of a pipe.
type PipeWriter struct {
	p *pipe
}
func (w *PipeWriter) Write(data []byte) (n int, err error) {...}
func (w *PipeWriter) Close() error {...}
func (w *PipeWriter) CloseWithError(err error) error {...}

func Pipe() (*PipeReader, *PipeWriter) { //  使用pipe 的第一步就是 调用  io.Pipe()  实际是返回两个 pipe, 一个封装为PipeReader, 专门用于读操作, 一个封装PipeWriter, 专门用于写操作
	p := new(pipe)
	p.rwait.L = &p.l
	p.wwait.L = &p.l
	r := &PipeReader{p}
	w := &PipeWriter{p}
	return r, w
}

这个逻辑,我觉得《简书-理解golang io.Pipe》讲的挺生动的.

Pipe的使用场景, 官方的说明:

Reads and Writes on the pipe are matched one to one except when multiple Reads are needed to consume a single Write

我的个人总结是, pipe 定义了 一个data []byte, 用于存数据. 定义了PipeReader 和 PipeWriter, 分别对data进行操作,读和写严格加锁,这样就能在多个goroutine中分别进行读写. 一次写操作或一次读操作都会把当前data的所有数据进行读写.读操作和写操作互相等待,读完数据了,读操作等待新的数据到来,就告诉在等待中的写goroutine,可以写数据了,写操作结束了,写操作等待数据被读取,就告诉等待中的读goroutine,可以进行读操作了. 这样就相当于data 的一端是进行读操作,一端进行写操作,写和读直接是串行的不是并发的.

官方文档也提了,Pipe适用有多个Read和一个Write的适用场景,所以,建议具体使用的时候,遵循官方的建议.当然,用Pipe还是可以用在一些有趣的地方.

参考网上的例子: https://github.com/pathbox/learning-go/tree/master/src/io/pipe

Search

    Table of Contents