I recently read through a few async network protocol implementations in Rust, including tokio-yamux, hyper, and h2. Their async networking code shares two traits:

1. The order of the poll operations, poll_write and poll_read, does not matter by itself; what matters is whether the ordering can deadlock. For example, if a read is already ready, but I poll_write first and bail out with Pending because the write is blocked, that ready read never gets executed. The recommended order is still write first, then read, which also matches how most network exchanges work: in a typical client/server interaction you speak first, then listen. (A minimal sketch of this follows the list.)

2. There is one main async task that performs the real reads and writes, with the associated Streams each doing their own buffering.
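To make the deadlock point concrete, here is a minimal sketch (all names are hypothetical, not from any of the three crates): the write side is polled first, but a Pending write falls through to the read instead of returning early, so a read that is already ready is never starved.

use std::pin::Pin;
use std::task::{Context, Poll};

use futures::Stream;

struct Conn {
    // transport and outgoing frame buffer omitted in this sketch
}

impl Conn {
    fn poll_write_queued(&mut self, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        // sketch: flush buffered frames; a real impl registers the waker
        Poll::Pending
    }

    fn poll_read_frame(&mut self, _cx: &mut Context<'_>) -> Poll<std::io::Result<Vec<u8>>> {
        // sketch: decode one frame from the socket
        Poll::Pending
    }
}

impl Stream for Conn {
    type Item = std::io::Result<Vec<u8>>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Write first, but do NOT `return Poll::Pending` here when the write
        // blocks -- that is exactly the ordering deadlock described above.
        if let Poll::Ready(Err(e)) = self.poll_write_queued(cx) {
            return Poll::Ready(Some(Err(e)));
        }

        // Because we fell through, a read that is already ready still runs.
        match self.poll_read_frame(cx) {
            Poll::Ready(Ok(frame)) => Poll::Ready(Some(Ok(frame))),
            Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
            Poll::Pending => Poll::Pending,
        }
    }
}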

On the first point, here is tokio-yamux:

impl<T> Stream for Session<T>
where
    T: AsyncRead + AsyncWrite + Unpin,
{
    type Item = Result<StreamHandle, io::Error>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        // ...
        // First, check whether a keepalive needs to be sent -- a write operation
        if let Some(ref mut interval) = self.keepalive {
            match Pin::new(interval).as_mut().poll_next(cx) {
                Poll::Ready(Some(_)) => {
                    if self.local_go_away {
                        // The remote peer has not responded to our sent go away code.
                        // Assume that remote peer has gone away and this session should be closed.
                        self.remote_go_away = true;
                    } else {
                        self.keep_alive(cx, Instant::now())?;
                    }
                }
                Poll::Ready(None) => {
                    debug!("yamux::Session poll keepalive interval finished");
                }
                Poll::Pending => (),
            }
        }

        let mut need_wake = false;
        // This tries to send and flush every stream's pending frames
        self.flush(cx)?;

        // Everything below handles reads. The author loops 16 times, reading
        // first on each pass; if there is nothing left to read, flush and stop,
        // since a completed read may itself produce more work. If there is
        // always more data to read, yield voluntarily so that other async
        // tasks get a chance to run.
        for _ in 0..16 {
            if self.is_dead() {
                debug!("yamux::Session finished because is_dead, end");
                return Poll::Ready(None);
            }
            // Reset initial value
            need_wake = false;

            // Open stream as soon as possible
            if let Some(stream) = self.pending_streams.pop_front() {
                debug!("yamux::Session [{:?}] A stream is ready", self.ty);
                return Poll::Ready(Some(Ok(stream)));
            }

            let mut is_pending = self.control_poll(cx)?.is_pending();
            is_pending &= self.recv_frames(cx)?.is_pending();
            is_pending &= self.recv_events(cx)?.is_pending();

            if is_pending {
                self.flush(cx)?;
                break;
            } else {
                need_wake = true;
            }
        }

        if need_wake {
            // To ensure we do not starve other tasks waiting on the executor,
            // we yield here, but immediately wake ourselves up to continue.
            cx.waker().wake_by_ref()
        }

        Poll::Pending
    }
}

Overall, then, the pattern is: write first, then read.
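Point 2 is also visible here: the Session task owns the socket and does the real IO, while each substream only talks to that task's buffers. Driving it looks roughly like this (a sketch; constructor names and module paths may vary across tokio-yamux versions):

use futures::StreamExt;
use tokio::net::TcpStream;
use tokio_yamux::{config::Config, session::Session};

async fn serve(socket: TcpStream) {
    let mut session = Session::new_server(socket, Config::default());

    // One main task polls the Session (running the poll_next above and
    // doing the actual socket reads and writes)...
    tokio::spawn(async move {
        while let Some(Ok(sub_stream)) = session.next().await {
            // ...and every accepted substream gets its own task; its data
            // is routed through the session task via per-stream buffers.
            tokio::spawn(async move {
                let _ = sub_stream; // read/write on the substream here
            });
        }
    });
}

Now let's look at hyper, which takes the opposite order, reading first and then writing: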

fn poll_loop(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
        // Limit the looping on this connection, in case it is ready far too
        // often, so that other futures don't starve.
        //
        // 16 was chosen arbitrarily, as that is number of pipelined requests
        // benchmarks often use. Perhaps it should be a config option instead.
        for _ in 0..16 {
            let _ = self.poll_read(cx)?;
            let _ = self.poll_write(cx)?;
            let _ = self.poll_flush(cx)?;

            // This could happen if reading paused before blocking on IO,
            // such as getting to the end of a framed message, but then
            // writing/flushing set the state back to Init. In that case,
            // if the read buffer still had bytes, we'd want to try poll_read
            // again, or else we wouldn't ever be woken up again.
            //
            // Using this instead of task::current() and notify() inside
            // the Conn is noticeably faster in pipelined benchmarks.
            if !self.conn.wants_read_again() {
                //break;
                return Poll::Ready(Ok(()));
            }
        }

        trace!("poll_loop yielding (self = {:p})", self);

        task::yield_now(cx).map(|never| match never {})
    }

Why is it fine for hyper to read first and then write? My rough understanding is that before hyper performs any real IO, it checks whether that IO is actually called for; if the state is not right, it never executes an operation that could leave the task Pending, so the order stops mattering. For example:

fn poll_read(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
        loop {
            if self.is_closing {
                return Poll::Ready(Ok(()));
            } else if self.conn.can_read_head() {
                ready!(self.poll_read_head(cx))?;
            } else if let Some(mut body) = self.body_tx.take() {
                if self.conn.can_read_body() {
                    match body.poll_ready(cx) {
                        Poll::Ready(Ok(())) => (),
                        Poll::Pending => {
                            self.body_tx = Some(body);
                            return Poll::Pending;
                        }
                        Poll::Ready(Err(_canceled)) => {
                            // user doesn't care about the body
                            // so we should stop reading
                            trace!("body receiver dropped before eof, draining or closing");
                            self.conn.poll_drain_or_close_read(cx);
                            continue;
                        }
                    }
                    match self.conn.poll_read_body(cx) {
                        Poll::Ready(Some(Ok(chunk))) => match body.try_send_data(chunk) {
                            Ok(()) => {
                                self.body_tx = Some(body);
                            }
                            Err(_canceled) => {
                                if self.conn.can_read_body() {
                                    trace!("body receiver dropped before eof, closing");
                                    self.conn.close_read();
                                }
                            }
                        },
                        Poll::Ready(None) => {
                            // just drop, the body will close automatically
                        }
                        Poll::Pending => {
                            self.body_tx = Some(body);
                            return Poll::Pending;
                        }
                        Poll::Ready(Some(Err(e))) => {
                            body.send_error(crate::Error::new_body(e));
                        }
                    }
                } else {
                    // just drop, the body will close automatically
                }
            } else {
                return self.conn.poll_read_keep_alive(cx);
            }
        }
    }

Before executing a read, the state is checked first, and writes work the same way.
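As a distilled sketch of that guard pattern (all names hypothetical): each poll_* consults protocol state first and only touches IO the state actually calls for, returning Ready instead of Pending when it has nothing to do, so neither direction can park the task in front of the other.

use std::task::{Context, Poll};

struct Dispatcher {
    can_read: bool,
    has_queued_writes: bool,
}

impl Dispatcher {
    fn poll_read(&mut self, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        if !self.can_read {
            // Nothing to read in this state: report Ready instead of
            // polling the socket, so poll_write below still runs.
            return Poll::Ready(Ok(()));
        }
        // sketch: the real read happens here
        Poll::Ready(Ok(()))
    }

    fn poll_write(&mut self, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        if !self.has_queued_writes {
            return Poll::Ready(Ok(()));
        }
        // sketch: the real write/flush happens here
        Poll::Ready(Ok(()))
    }

    fn poll_loop(&mut self, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        // Read first, like hyper; the state guards make the order safe.
        let _ = self.poll_read(cx)?;
        let _ = self.poll_write(cx)?;
        Poll::Pending
    }
}

Now let's look at h2: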

fn poll2(&mut self, cx: &mut Context) -> Poll<Result<(), Error>> {
        // This happens outside of the loop to prevent needing to do a clock
        // check and then comparison of the queue possibly multiple times a
        // second (and thus, the clock wouldn't have changed enough to matter).
        self.clear_expired_reset_streams();

        loop {
            // First, ensure that the `Connection` is able to receive a frame
            //
            // The order here matters:
            // - poll_go_away may buffer a graceful shutdown GOAWAY frame
            // - If it has, we've also added a PING to be sent in poll_ready
            if let Some(reason) = ready!(self.poll_go_away(cx)?) {
                if self.inner.go_away.should_close_now() {
                    if self.inner.go_away.is_user_initiated() {
                        // A user initiated abrupt shutdown shouldn't return
                        // the same error back to the user.
                        return Poll::Ready(Ok(()));
                    } else {
                        return Poll::Ready(Err(Error::library_go_away(reason)));
                    }
                }
                // Only NO_ERROR should be waiting for idle
                debug_assert_eq!(
                    reason,
                    Reason::NO_ERROR,
                    "graceful GOAWAY should be NO_ERROR"
                );
            }
            // This is where ping/pong, settings and other frames get written out
            ready!(self.poll_ready(cx))?;
            // Then perform the read
            match self
                .inner
                .as_dyn()
                .recv_frame(ready!(Pin::new(&mut self.codec).poll_next(cx)?))?
            {
                ReceivedFrame::Settings(frame) => {
                    self.inner.settings.recv_settings(
                        frame,
                        &mut self.codec,
                        &mut self.inner.streams,
                    )?;
                }
                ReceivedFrame::Continue => (),
                ReceivedFrame::Done => {
                    return Poll::Ready(Ok(()));
                }
            }
        }
    }

The overall structure is still write first, then read.
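The mechanism enforcing that order is the ready! macro: because ready! returns Pending from the whole function, the codec is never polled for a new frame until the write side has reported Ready. A minimal sketch of that gating (Conn2 and its methods are made up for illustration):

use std::task::{ready, Context, Poll};

struct Conn2;

impl Conn2 {
    fn poll_flush_control_frames(&mut self, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        // sketch: write out queued SETTINGS / PING / GOAWAY frames
        Poll::Ready(Ok(()))
    }

    fn poll_next_frame(&mut self, _cx: &mut Context<'_>) -> Poll<std::io::Result<Vec<u8>>> {
        // sketch: decode the next incoming frame
        Poll::Pending
    }
}

fn poll_step(conn: &mut Conn2, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
    // Write side first: if it cannot flush yet, `ready!` propagates
    // Pending and the read below is never reached on this poll.
    ready!(conn.poll_flush_control_frames(cx))?;

    // Read side runs only after the write side reported Ready.
    let _frame = ready!(conn.poll_next_frame(cx))?;
    Poll::Ready(Ok(()))
}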


On the second point: I suspect the reads and writes could be decoupled entirely, which should improve performance, since blocking on a read would no longer block writes as well. But given how complex these protocols are, decoupling would probably make the implementations noticeably harder, and my guess is the gain would be small anyway, because TCP itself is not especially fast, unlike UDP.
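For what that decoupling could look like, here is a rough sketch (a toy byte protocol, not any of the three crates): the socket is split into owned halves, each driven by its own task, so a stalled read never blocks writes, and vice versa.

use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::sync::mpsc;

async fn run(socket: TcpStream) {
    let (mut rd, mut wr) = socket.into_split();
    let (tx, mut rx) = mpsc::channel::<Vec<u8>>(64);

    // Write task: drains outgoing messages independently of the reader.
    tokio::spawn(async move {
        while let Some(msg) = rx.recv().await {
            if wr.write_all(&msg).await.is_err() {
                break;
            }
        }
    });

    // Read task: only reads; anything it wants sent goes through `tx`,
    // so it never waits on socket writability itself.
    tokio::spawn(async move {
        let mut buf = vec![0u8; 4096];
        loop {
            match rd.read(&mut buf).await {
                Ok(0) | Err(_) => break,
                Ok(n) => {
                    let _ = tx.send(buf[..n].to_vec()).await;
                }
            }
        }
    });
}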