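1. BufReader
In one sentence: BufReader reads from the underlying IO into one large internal buffer, which the caller then consumes piece by piece.
Note that if the caller's buffer is at least as large as BufReader's internal buffer and BufReader has no buffered data at that moment, it skips its own buffer and reads directly into the caller's buffer.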
impl<R: AsyncRead> AsyncRead for BufReader<R> {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        // If we don't have any buffered data and we're doing a massive read
        // (larger than our internal buffer), bypass our internal buffer
        // entirely.
        if self.pos == self.cap && buf.remaining() >= self.buf.len() {
            let res = ready!(self.as_mut().get_pin_mut().poll_read(cx, buf));
            self.discard_buffer();
            return Poll::Ready(res);
        }
        let rem = ready!(self.as_mut().poll_fill_buf(cx))?;
        let amt = std::cmp::min(rem.len(), buf.remaining());
        buf.put_slice(&rem[..amt]);
        self.consume(amt);
        Poll::Ready(Ok(()))
    }
}
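You can try the branch logic of poll_read without an async runtime. What follows is a minimal synchronous sketch that uses std::io::Read as a stand-in for AsyncRead. MiniBufReader and CountingReader are hypothetical names, not tokio types, and the model leaves out tokio's pinning and Poll handling.

```rust
use std::io::{self, Read};

/// Hypothetical inner reader that records the size of every read request.
pub struct CountingReader<R> {
    pub inner: R,
    pub requests: Vec<usize>,
}

impl<R: Read> Read for CountingReader<R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.requests.push(buf.len());
        self.inner.read(buf)
    }
}

/// Hypothetical synchronous model of the branches in BufReader::poll_read.
pub struct MiniBufReader<R> {
    pub inner: R,
    buf: Vec<u8>, // internal buffer; its length plays the role of the capacity
    pos: usize,   // next unread byte in `buf`
    cap: usize,   // number of valid bytes in `buf`
}

impl<R: Read> MiniBufReader<R> {
    pub fn with_capacity(capacity: usize, inner: R) -> Self {
        MiniBufReader { inner, buf: vec![0; capacity], pos: 0, cap: 0 }
    }
}

impl<R: Read> Read for MiniBufReader<R> {
    fn read(&mut self, out: &mut [u8]) -> io::Result<usize> {
        // Nothing buffered and the caller's buffer is at least as big as ours:
        // discard the (empty) buffer state and read straight into `out`.
        if self.pos == self.cap && out.len() >= self.buf.len() {
            self.pos = 0;
            self.cap = 0;
            return self.inner.read(out);
        }
        // Otherwise refill the internal buffer only when it is empty (poll_fill_buf) ...
        if self.pos == self.cap {
            self.cap = self.inner.read(&mut self.buf)?;
            self.pos = 0;
        }
        // ... then hand out as much as fits and mark it consumed (consume(amt)).
        let amt = (self.cap - self.pos).min(out.len());
        out[..amt].copy_from_slice(&self.buf[self.pos..self.pos + amt]);
        self.pos += amt;
        Ok(amt)
    }
}
```

Take a 16-byte internal buffer as an example. A 4-byte read triggers one 16-byte inner read. The next 12 bytes come from the buffer without touching the inner reader. A following 64-byte read then goes straight to the inner reader as a single 64-byte request.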
BufReader is a good fit when the caller reads the data gradually, in small pieces.
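Here is a small sketch that shows why gradual reads benefit. It uses the standard library's synchronous std::io::BufReader instead of tokio's so that it runs without a runtime; the buffering idea is the same. CountingReader and read_byte_by_byte are hypothetical helpers that count how many calls reach the inner reader.

```rust
use std::io::{self, BufReader, Read};

/// Hypothetical helper: forwards reads and counts how often the inner source is hit.
pub struct CountingReader<R> {
    pub inner: R,
    pub calls: usize,
}

impl<R: Read> Read for CountingReader<R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.calls += 1;
        self.inner.read(buf)
    }
}

/// Hypothetical helper: reads `src` one byte at a time through a BufReader with the
/// given capacity and returns (bytes read, calls that reached the inner reader).
pub fn read_byte_by_byte(src: Vec<u8>, capacity: usize) -> io::Result<(usize, usize)> {
    let counting = CountingReader { inner: io::Cursor::new(src), calls: 0 };
    let mut reader = BufReader::with_capacity(capacity, counting);
    let mut byte = [0u8; 1];
    let mut total = 0;
    loop {
        match reader.read(&mut byte)? {
            0 => break, // EOF
            n => total += n,
        }
    }
    Ok((total, reader.get_ref().calls))
}
```

Reading 100 bytes one byte at a time through a 16-byte buffer should hit the inner reader only a handful of times: about 100 / 16 refills plus the final EOF read, rather than once per byte.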
2. BufWriter
In one sentence: small writes are buffered; large writes go straight to the underlying writer.
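fn poll_write(
    mut self: Pin<&mut Self>,
    cx: &mut Context<'_>,
    buf: &[u8],
) -> Poll<io::Result<usize>> {
    if self.buf.len() + buf.len() > self.buf.capacity() {
        ready!(self.as_mut().flush_buf(cx))?;
    }
    let me = self.project();
    // A small trick: if this condition is true, either the flush above has already
    // happened or self.buf was empty to begin with. Either way self.buf is empty here,
    // so the caller's data can be written directly to the inner writer.
    if buf.len() >= me.buf.capacity() {
        me.inner.poll_write(cx, buf)
    } else {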
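        // Note: Vec's Write impl can grow the Vec in general, but it never needs to here.
        // Either the flush above emptied self.buf, or self.buf.len() + buf.len() already
        // fits within the capacity, so the buffer stays within the capacity passed to
        // BufWriter::with_capacity.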
        Poll::Ready(me.buf.write(buf))
    }
}
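The same kind of synchronous sketch works for poll_write. MiniBufWriter and RecordingWriter are hypothetical names, and std::io::Write stands in for AsyncWrite. The model also simplifies flushing by using write_all, so it does not track partially flushed data the way tokio's `written` counter does.

```rust
use std::io::{self, Write};

/// Hypothetical inner writer that records the length of every write call.
pub struct RecordingWriter {
    pub data: Vec<u8>,
    pub writes: Vec<usize>,
}

impl Write for RecordingWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.writes.push(buf.len());
        self.data.extend_from_slice(buf);
        Ok(buf.len())
    }
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Hypothetical synchronous model of BufWriter::poll_write.
pub struct MiniBufWriter<W: Write> {
    pub inner: W,
    buf: Vec<u8>,
}

impl<W: Write> MiniBufWriter<W> {
    pub fn with_capacity(capacity: usize, inner: W) -> Self {
        MiniBufWriter { inner, buf: Vec::with_capacity(capacity) }
    }

    pub fn buffer_capacity(&self) -> usize {
        self.buf.capacity()
    }

    /// Push everything buffered to the inner writer (the flush_buf step).
    fn flush_buf(&mut self) -> io::Result<()> {
        self.inner.write_all(&self.buf)?;
        self.buf.clear(); // clear() keeps the allocation, so capacity is unchanged
        Ok(())
    }
}

impl<W: Write> Write for MiniBufWriter<W> {
    fn write(&mut self, data: &[u8]) -> io::Result<usize> {
        // The new data would not fit next to what is already buffered: flush first.
        if self.buf.len() + data.len() > self.buf.capacity() {
            self.flush_buf()?;
        }
        if data.len() >= self.buf.capacity() {
            // Large write: the buffer is empty at this point, so bypass it.
            self.inner.write(data)
        } else {
            // Small write: it fits in the remaining capacity, so just buffer it.
            self.buf.extend_from_slice(data);
            Ok(data.len())
        }
    }

    fn flush(&mut self) -> io::Result<()> {
        self.flush_buf()?;
        self.inner.flush()
    }
}
```

With a 16-byte capacity in this model, ten 3-byte writes reach the inner writer as a single 15-byte write, and the rest stays buffered. A 64-byte write then first flushes the buffer and passes through unbuffered. Data is only appended when it fits, so the Vec's capacity never changes.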
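If you keep writing large chunks (at least as large as the configured capacity), BufWriter effectively does nothing: every write goes straight to the underlying writer.
BufWriter pays off when the caller writes lots of fairly short pieces of data; that is where buffering helps most.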
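You can check both claims with the standard library's synchronous std::io::BufWriter. It is used here instead of tokio's so the sketch runs without a runtime. In current std versions, its BufWriter likewise passes writes that are at least as large as its capacity straight to the inner writer. CountingWriter and sink_calls are hypothetical helpers.

```rust
use std::io::{self, BufWriter, Write};

/// Hypothetical helper: a sink that counts how many write calls reach it.
pub struct CountingWriter {
    pub data: Vec<u8>,
    pub calls: usize,
}

impl Write for CountingWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.calls += 1;
        self.data.extend_from_slice(buf);
        Ok(buf.len())
    }
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Hypothetical helper: writes `chunks` through a BufWriter with the given capacity,
/// flushes, and returns how many write calls reached the sink.
pub fn sink_calls(chunks: &[&[u8]], capacity: usize) -> io::Result<usize> {
    let sink = CountingWriter { data: Vec::new(), calls: 0 };
    let mut w = BufWriter::with_capacity(capacity, sink);
    for chunk in chunks {
        w.write_all(chunk)?;
    }
    w.flush()?;
    Ok(w.get_ref().calls)
}
```

With a 64-byte buffer, 100 one-byte writes should reach the sink in only a couple of calls. Four 256-byte writes, by contrast, reach it as four separate calls, just as if there were no BufWriter.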
3. tokio::io::copy
pub async fn copy<'a, R, W>(reader: &'a mut R, writer: &'a mut W) -> io::Result<u64>
where
    R: AsyncRead + Unpin + ?Sized,
    W: AsyncWrite + Unpin + ?Sized,
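The official tokio::io::copy documentation says it keeps reading from reader until EOF or an error, writing everything it reads to writer.
In more detail, it uses an internal 8192-byte buffer and reads at most 8192 bytes from reader at a time, then writes them to writer. Whenever reader has no more data available for now (its poll returns Pending) and something has been written since the last flush, copy flushes writer. When reader hits EOF, copy finishes writing the remaining data to writer and then flushes it.
Note that flushing is triggered only when reader has nothing more to give right now or has reached EOF. In other words, the reader alone decides when the writer gets flushed.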
pub(super) fn poll_copy<R, W>(
    &mut self,
    cx: &mut Context<'_>,
    mut reader: Pin<&mut R>,
    mut writer: Pin<&mut W>,
) -> Poll<io::Result<u64>>
where
    R: AsyncRead + ?Sized,
    W: AsyncWrite + ?Sized,
{
    loop {
        // If our buffer is empty, then we need to read some data to
        // continue.
        if self.pos == self.cap && !self.read_done {
            self.pos = 0;
            self.cap = 0;
            match self.poll_fill_buf(cx, reader.as_mut()) {
                Poll::Ready(Ok(_)) => (),
                Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
                Poll::Pending => {
                    // Try flushing when the reader has no progress to avoid deadlock
                    // when the reader depends on buffered writer.
                    if self.need_flush {
                        ready!(writer.as_mut().poll_flush(cx))?;
                        self.need_flush = false;
                    }
                    return Poll::Pending;
                }
            }
        }
        // If our buffer has some data, let's write it out!
        while self.pos < self.cap {
            let i = ready!(self.poll_write_buf(cx, reader.as_mut(), writer.as_mut()))?;
            if i == 0 {
                return Poll::Ready(Err(io::Error::new(
                    io::ErrorKind::WriteZero,
                    "write zero byte into writer",
                )));
            } else {
                self.pos += i;
                self.amt += i as u64;
                self.need_flush = true;
            }
        }
        // If pos larger than cap, this loop will never stop.
        // In particular, user's wrong poll_write implementation returning
        // incorrect written length may lead to thread blocking.
        debug_assert!(
            self.pos <= self.cap,
            "writer returned length larger than input slice"
        );
        // If we've written all the data and we've seen EOF, flush out the
        // data and finish the transfer.
        if self.pos == self.cap && self.read_done {
            ready!(writer.as_mut().poll_flush(cx))?;
            return Poll::Ready(Ok(self.amt));
        }
    }
}
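A synchronous model of this loop makes the flush rule easy to explore. It is a sketch under explicit assumptions. Script, Sink, Step and CopyModel are hypothetical names. A WouldBlock error from std::io::Read stands in for Poll::Pending. The sink only makes data visible on flush(), as a buffered writer would.

```rust
use std::collections::VecDeque;
use std::io::{self, ErrorKind, Read, Write};

/// Hypothetical scripted reader: Some(bytes) = data, None = WouldBlock ("Pending"), empty = EOF.
pub struct Script(pub VecDeque<Option<Vec<u8>>>);

impl Read for Script {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        match self.0.pop_front() {
            None => Ok(0),
            Some(None) => Err(ErrorKind::WouldBlock.into()),
            Some(Some(data)) => {
                let n = data.len().min(buf.len());
                buf[..n].copy_from_slice(&data[..n]);
                if n < data.len() {
                    // Keep what did not fit for the next read.
                    self.0.push_front(Some(data[n..].to_vec()));
                }
                Ok(n)
            }
        }
    }
}

/// Hypothetical buffered sink: written bytes only reach `flushed` on flush().
#[derive(Default)]
pub struct Sink {
    pub pending: Vec<u8>,
    pub flushed: Vec<u8>,
}

impl Write for Sink {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.pending.extend_from_slice(buf);
        Ok(buf.len())
    }
    fn flush(&mut self) -> io::Result<()> {
        self.flushed.append(&mut self.pending);
        Ok(())
    }
}

#[derive(Debug, PartialEq)]
pub enum Step { Pending, Done(u64) }

/// Hypothetical synchronous model of the state that poll_copy works on.
pub struct CopyModel {
    buf: Vec<u8>,
    pos: usize, cap: usize, // buf[pos..cap] has been read but not yet written
    amt: u64,
    read_done: bool, need_flush: bool,
}

impl CopyModel {
    pub fn new() -> Self {
        // 8192-byte buffer, as in tokio::io::copy.
        CopyModel { buf: vec![0; 8192], pos: 0, cap: 0, amt: 0, read_done: false, need_flush: false }
    }

    pub fn poll_copy<R: Read, W: Write>(&mut self, reader: &mut R, writer: &mut W) -> io::Result<Step> {
        loop {
            // Buffer drained and no EOF yet: read more.
            if self.pos == self.cap && !self.read_done {
                self.pos = 0;
                self.cap = 0;
                match reader.read(&mut self.buf) {
                    Ok(0) => self.read_done = true,
                    Ok(n) => self.cap = n,
                    Err(e) if e.kind() == ErrorKind::WouldBlock => {
                        // Reader made no progress: flush what was written so far, then "pend".
                        if self.need_flush {
                            writer.flush()?;
                            self.need_flush = false;
                        }
                        return Ok(Step::Pending);
                    }
                    Err(e) => return Err(e),
                }
            }
            // Write out everything currently buffered.
            while self.pos < self.cap {
                let i = writer.write(&self.buf[self.pos..self.cap])?;
                if i == 0 {
                    return Err(io::Error::new(ErrorKind::WriteZero, "write zero byte into writer"));
                }
                self.pos += i;
                self.amt += i as u64;
                self.need_flush = true;
            }
            // All data written and EOF seen: final flush.
            if self.pos == self.cap && self.read_done {
                writer.flush()?;
                return Ok(Step::Done(self.amt));
            }
        }
    }
}
```

Take the script "hello", pending, "world". The first call returns Pending, but "hello" has already been flushed to the sink. The second call reaches EOF and returns Done(10).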
4. tokio::io::copy_buf
pub async fn copy_buf<'a, R, W>(
    reader: &'a mut R,
    writer: &'a mut W
) -> io::Result<u64>
where
    R: AsyncBufRead + Unpin + ?Sized,
    W: AsyncWrite + Unpin + ?Sized,
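According to the official docs this function is similar to copy, but in my opinion it is a bit of a trap: it flushes only once during the whole transfer, namely after reader has been read to the end and returned EOF.
In other words, if reader is Pending, copy_buf simply returns Pending and does nothing else. In particular it does not flush writer, so data already written into a buffered writer can sit there until EOF.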
impl<R, W> Future for CopyBuf<'_, R, W>
where
    R: AsyncBufRead + Unpin + ?Sized,
    W: AsyncWrite + Unpin + ?Sized,
{
    type Output = io::Result<u64>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        loop {
            let me = &mut *self;
            // If the reader is Pending, ready! returns Pending here without flushing the writer.
            let buffer = ready!(Pin::new(&mut *me.reader).poll_fill_buf(cx))?;
            if buffer.is_empty() {
                // EOF: the only place this future flushes the writer.
                ready!(Pin::new(&mut self.writer).poll_flush(cx))?;
                return Poll::Ready(Ok(self.amt));
            }
            let i = ready!(Pin::new(&mut *me.writer).poll_write(cx, buffer))?;
            if i == 0 {
                return Poll::Ready(Err(std::io::ErrorKind::WriteZero.into()));
            }
            self.amt += i as u64;
            Pin::new(&mut *self.reader).consume(i);
        }
    }
}
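For contrast, here is the same kind of synchronous model for CopyBuf::poll. It is again a sketch. ScriptBuf, Sink, Step and poll_copy_buf are hypothetical names, and a WouldBlock error from BufRead::fill_buf stands in for Poll::Pending.

```rust
use std::collections::VecDeque;
use std::io::{self, BufRead, ErrorKind, Read, Write};

/// Hypothetical scripted buffered reader: Some(bytes) is a chunk, None yields
/// WouldBlock once (our stand-in for Poll::Pending), and an empty script is EOF.
pub struct ScriptBuf(pub VecDeque<Option<Vec<u8>>>);

impl Read for ScriptBuf {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let n = {
            let chunk = self.fill_buf()?;
            let n = chunk.len().min(buf.len());
            buf[..n].copy_from_slice(&chunk[..n]);
            n
        };
        self.consume(n);
        Ok(n)
    }
}

impl BufRead for ScriptBuf {
    fn fill_buf(&mut self) -> io::Result<&[u8]> {
        if matches!(self.0.front(), Some(None)) {
            self.0.pop_front();
            return Err(ErrorKind::WouldBlock.into());
        }
        match self.0.front() {
            Some(Some(chunk)) => Ok(chunk.as_slice()),
            _ => Ok(&[]), // EOF
        }
    }
    fn consume(&mut self, amt: usize) {
        if let Some(Some(chunk)) = self.0.front_mut() {
            chunk.drain(..amt);
            if chunk.is_empty() {
                self.0.pop_front();
            }
        }
    }
}

/// Hypothetical buffered sink: written bytes only reach `flushed` on flush().
#[derive(Default)]
pub struct Sink {
    pub pending: Vec<u8>,
    pub flushed: Vec<u8>,
}

impl Write for Sink {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.pending.extend_from_slice(buf);
        Ok(buf.len())
    }
    fn flush(&mut self) -> io::Result<()> {
        self.flushed.append(&mut self.pending);
        Ok(())
    }
}

#[derive(Debug, PartialEq)]
pub enum Step { Pending, Done(u64) }

/// Hypothetical synchronous model of CopyBuf::poll; `amt` persists across calls.
pub fn poll_copy_buf<R: BufRead, W: Write>(
    reader: &mut R,
    writer: &mut W,
    amt: &mut u64,
) -> io::Result<Step> {
    loop {
        let buffer = match reader.fill_buf() {
            Ok(buffer) => buffer,
            // Like ready! in the real code: bail out without flushing the writer.
            Err(e) if e.kind() == ErrorKind::WouldBlock => return Ok(Step::Pending),
            Err(e) => return Err(e),
        };
        if buffer.is_empty() {
            // EOF is the only point where the writer gets flushed.
            writer.flush()?;
            return Ok(Step::Done(*amt));
        }
        let i = writer.write(buffer)?;
        if i == 0 {
            return Err(ErrorKind::WriteZero.into());
        }
        *amt += i as u64;
        reader.consume(i);
    }
}
```

Run it on the same script: "hello", pending, "world". The first call returns Pending while "hello" still sits unflushed in the sink. That data only becomes visible when the second call reaches EOF and flushes.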
This function is best suited to reading a large amount of data in one go and then writing it in one go, for example copying a file.