1、BufReader

一句话总结:拿一大块buff去尝试读取底层IO的数据,然后逐步被上层读取;

需要注意的是,如果上层来读取时的buff比BufReader自己的buf还要大,并且自己并没有缓存数据时,会直接进行读取,读取到上层的buf中去;

impl AsyncRead for BufReader {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll> {
        // If we don't have any buffered data and we're doing a massive read
        // (larger than our internal buffer), bypass our internal buffer
        // entirely.
        if self.pos == self.cap && buf.remaining() >= self.buf.len() {
            let res = ready!(self.as_mut().get_pin_mut().poll_read(cx, buf));
            self.discard_buffer();
            return Poll::Ready(res);
        }
        let rem = ready!(self.as_mut().poll_fill_buf(cx))?;
        let amt = std::cmp::min(rem.len(), buf.remaining());
        buf.put_slice(&rem[..amt]);
        self.consume(amt);
        Poll::Ready(Ok(()))
    }
}

BufReader适合上层慢慢读取的场景

2、BufWriter

一句话总结: 如果要写入的数据小,则缓存,如果很大,直接写。

    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll> {
        if self.buf.len() + buf.len() > self.buf.capacity() {
            ready!(self.as_mut().flush_buf(cx))?;
        }

        let me = self.project();
        // 这儿又个小trick,如果为true,那么必然已经flush了,也就是self.buf肯定为空,所以可以直接写入上层的buf数据。
        if buf.len() >= me.buf.capacity() {
            me.inner.poll_write(cx, buf)
        } else {
            // 这儿注意,me.buf.write可能会extence Vec的cap,因为self.buf里面可能还有一些数据,然后加上buff.len()可能超过self.buf的cap,所以BufWriter::with_capacity的只是初始cap,并不是一直不变的。
            Poll::Ready(me.buf.write(buf))
        }
    }

如果长期写入大块的数据(大于设置的capacity),BufWriter相当于啥事儿没用。

BufWriter适合上层写入大量比较短的数据,缓存效果比较好。

3.tokio::io::copy

pub async fn copy<'a, R, W>(reader: &'a mut R, writer: &'a mut W) -> Resultwhere
    R: AsyncRead + Unpin + ?Sized,
    W: AsyncWrite + Unpin + ?Sized,

Tokio::io::copy参见官方文档的说明:从reader中一直读取,直到eof或者error,然后写入到writer中。

具体细节是,内部有一个8192Byte的缓存buff,每次从reader中读取最大8192字节,然后写入到writer中,如果reader没有更多数据能读取了,就flush writer,当然,如果reader eof了,也要写到writer完并且flush。

这里需要注意flush的条件是reader没有更多的数据可以读,或者reader eof,也就是完全由reader控制。

pub(super) fn poll_copy(
        &mut self,
        cx: &mut Context<'_>,
        mut reader: Pin<&mut R>,
        mut writer: Pin<&mut W>,
    ) -> Poll>
    where
        R: AsyncRead + ?Sized,
        W: AsyncWrite + ?Sized,
    {
        loop {
            // If our buffer is empty, then we need to read some data to
            // continue.
            if self.pos == self.cap && !self.read_done {
                self.pos = 0;
                self.cap = 0;

                match self.poll_fill_buf(cx, reader.as_mut()) {
                    Poll::Ready(Ok(_)) => (),
                    Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
                    Poll::Pending => {
                        // Try flushing when the reader has no progress to avoid deadlock
                        // when the reader depends on buffered writer.
                        if self.need_flush {
                            ready!(writer.as_mut().poll_flush(cx))?;
                            self.need_flush = false;
                        }

                        return Poll::Pending;
                    }
                }
            }

            // If our buffer has some data, let's write it out!
            while self.pos < self.cap {
                let i = ready!(self.poll_write_buf(cx, reader.as_mut(), writer.as_mut()))?;
                if i == 0 {
                    return Poll::Ready(Err(io::Error::new(
                        io::ErrorKind::WriteZero,
                        "write zero byte into writer",
                    )));
                } else {
                    self.pos += i;
                    self.amt += i as u64;
                    self.need_flush = true;
                }
            }

            // If pos larger than cap, this loop will never stop.
            // In particular, user's wrong poll_write implementation returning
            // incorrect written length may lead to thread blocking.
            debug_assert!(
                self.pos <= self.cap,
                "writer returned length larger than input slice"
            );

            // If we've written all the data and we've seen EOF, flush out the
            // data and finish the transfer.
            if self.pos == self.cap && self.read_done {
                ready!(writer.as_mut().poll_flush(cx))?;
                return Poll::Ready(Ok(self.amt));
            }
        }
    }

4、tokio::io::copy_buf

pub async fn copy_buf<'a, R, W>(
    reader: &'a mut R,
    writer: &'a mut W
) -> Resultwhere
    R: AsyncBufRead + Unpin + ?Sized,
    W: AsyncWrite + Unpin + ?Sized,

按照官方的说明,这个接口和copy比较像,但是,这个接口个人认为比较坑,因为整个过程只会flush一次,也就是把reader读完了,返回eof了,才flush一次。

也就是说如果reader是pending的,啥事儿都不会做

impl Future for CopyBuf<'_, R, W>
where
    R: AsyncBufRead + Unpin + ?Sized,
    W: AsyncWrite + Unpin + ?Sized,
{
    type Output = io::Result;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll {
        loop {
            let me = &mut *self;
            let buffer = ready!(Pin::new(&mut *me.reader).poll_fill_buf(cx))?;
            if buffer.is_empty() {
                ready!(Pin::new(&mut self.writer).poll_flush(cx))?;
                return Poll::Ready(Ok(self.amt));
            }

            let i = ready!(Pin::new(&mut *me.writer).poll_write(cx, buffer))?;
            if i == 0 {
                return Poll::Ready(Err(std::io::ErrorKind::WriteZero.into()));
            }
            self.amt += i as u64;
            Pin::new(&mut *self.reader).consume(i);
        }
    }
}

这个接口比较适合: 一把读取大量数据,然后一把写入的场景,比如拷贝文件之类的