Recently I read through a few Rust implementations of async network protocols, including tokio-yamux, hyper, and h2. I found that their async networking code shares a couple of traits:

1. The order of the poll operations poll_write and poll_read doesn't actually matter in itself; either order can work. The key is whether the chosen ordering can deadlock or starve one side. For example, if my read is ready, but I call poll_write first, the write returns Pending, and I return early because of that, then the ready read never gets executed. The recommended order is write first, then read (see the sketch below), which also fits the logic of most network interactions: they are client/server style, so you speak first, then listen.

2. There is one main async task that performs the real reads and writes, with a number of associated Streams that each keep their own separate buffer.
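To make the first point concrete, here is a minimal sketch of a combined poll in the write-first order. The Driver type and its fields are hypothetical and not taken from tokio-yamux, hyper, or h2; the important part is that a Pending write only stops the write loop, never the whole function, so a read that is already ready still gets polled:

use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};

// Hypothetical driver owning the raw connection; not from any of the crates above.
struct Driver<T> {
    io: T,
    write_buf: Vec<u8>, // frames already queued for sending
    read_buf: Vec<u8>,  // raw inbound bytes, not yet parsed
}

impl<T: AsyncRead + AsyncWrite + Unpin> Driver<T> {
    fn poll_drive(&mut self, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        // Write first: flush whatever we have already decided to send.
        // A Pending here must NOT make us return before the read below is
        // polled, otherwise a read that is already ready would never run.
        while !self.write_buf.is_empty() {
            match Pin::new(&mut self.io).poll_write(cx, &self.write_buf) {
                Poll::Ready(Ok(n)) => { self.write_buf.drain(..n); }
                Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
                Poll::Pending => break, // give up on writing for now, but keep going
            }
        }

        // Then read: even if the write side is blocked, inbound data still
        // gets consumed. (A real implementation would loop here until Pending
        // so that a waker is always registered before returning.)
        let mut tmp = [0u8; 4096];
        let mut buf = ReadBuf::new(&mut tmp);
        match Pin::new(&mut self.io).poll_read(cx, &mut buf) {
            Poll::Ready(Ok(())) => self.read_buf.extend_from_slice(buf.filled()),
            Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
            Poll::Pending => {}
        }

        Poll::Pending
    }
}

The same shape also works read-first, as hyper shows below, as long as one side's Pending never short-circuits the other.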
Regarding the first point, in tokio-yamux it looks like this:
impl<T> Stream for Session<T>
where
    T: AsyncRead + AsyncWrite + Unpin,
{
    type Item = Result<StreamHandle, io::Error>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        // ...
    }
}
Overall this is a write-first, then-read pattern. Now let's look at hyper, which reads first and then writes:
fn poll_loop(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
    // Limit the looping on this connection, in case it is ready far too
    // often, so that other futures don't starve.
    //
    // 16 was chosen arbitrarily, as that is number of pipelined requests
    // benchmarks often use. Perhaps it should be a config option instead.
    for _ in 0..16 {
        let _ = self.poll_read(cx)?;
        let _ = self.poll_write(cx)?;
        let _ = self.poll_flush(cx)?;

        // This could happen if reading paused before blocking on IO,
        // such as getting to the end of a framed message, but then
        // writing/flushing set the state back to Init. In that case,
        // if the read buffer still had bytes, we'd want to try poll_read
        // again, or else we wouldn't ever be woken up again.
        //
        // Using this instead of task::current() and notify() inside
        // the Conn is noticeably faster in pipelined benchmarks.
        if !self.conn.wants_read_again() {
            //break;
            return Poll::Ready(Ok(()));
        }
    }

    trace!("poll_loop yielding (self = {:p})", self);

    task::yield_now(cx).map(|never| match never {})
}
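A side note on the loop above: the bound of 16 iterations plus the final yield exists so that one busy connection cannot starve other futures on the same executor. hyper calls an internal helper for the yield; the underlying trick, shown here as a generic sketch rather than hyper's actual code, is simply to wake the current task again and return Pending:

use std::task::{Context, Poll};

// Generic "cooperative yield" at the poll level: wake the current task again
// and return Pending, so the executor requeues it instead of letting one busy
// connection monopolize the thread.
fn yield_now<T>(cx: &mut Context<'_>) -> Poll<T> {
    cx.waker().wake_by_ref();
    Poll::Pending
}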
Why is it fine for hyper to read first and then write? My rough understanding is that before hyper performs any real I/O, it checks whether that operation is actually needed in the current state; if the state doesn't call for it, it never issues the call that could return Pending, so the ordering stops mattering. For example:
fn poll_read(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
    loop {
        if self.is_closing {
            return Poll::Ready(Ok(()));
        } else if self.conn.can_read_head() {
            ready!(self.poll_read_head(cx))?;
        } else if let Some(mut body) = self.body_tx.take() {
            if self.conn.can_read_body() {
                match body.poll_ready(cx) {
                    Poll::Ready(Ok(())) => (),
                    Poll::Pending => {
                        self.body_tx = Some(body);
                        return Poll::Pending;
                    }
                    Poll::Ready(Err(_canceled)) => {
                        // user doesn't care about the body
                        // so we should stop reading
                        trace!("body receiver dropped before eof, draining or closing");
                        self.conn.poll_drain_or_close_read(cx);
                        continue;
                    }
                }
                match self.conn.poll_read_body(cx) {
                    Poll::Ready(Some(Ok(chunk))) => match body.try_send_data(chunk) {
                        Ok(()) => {
                            self.body_tx = Some(body);
                        }
                        Err(_canceled) => {
                            if self.conn.can_read_body() {
                                trace!("body receiver dropped before eof, closing");
                                self.conn.close_read();
                            }
                        }
                    },
                    Poll::Ready(None) => {
                        // just drop, the body will close automatically
                    }
                    Poll::Pending => {
                        self.body_tx = Some(body);
                        return Poll::Pending;
                    }
                    Poll::Ready(Some(Err(e))) => {
                        body.send_error(crate::Error::new_body(e));
                    }
                }
            } else {
                // just drop, the body will close automatically
            }
        } else {
            return self.conn.poll_read_keep_alive(cx);
        }
    }
}
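The write path is guarded in the same way before any I/O call is made. The following is a rough, hypothetical simplification of that guard pattern; none of these names are hyper's real fields or methods:

use std::io;
use std::task::{Context, Poll};

// Hypothetical connection state, just to illustrate the guard.
struct Conn { has_queued_frames: bool }

impl Conn {
    fn can_write(&self) -> bool { self.has_queued_frames }
    fn poll_flush_pending_frames(&mut self, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        // ... actually drive the underlying socket here ...
        Poll::Ready(Ok(()))
    }
}

struct Dispatcher { conn: Conn }

impl Dispatcher {
    fn poll_write(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        if !self.conn.can_write() {
            // Nothing to send in the current state, so no write that could
            // return Pending is ever issued; the read side cannot be held up
            // by a write we did not need to make.
            return Poll::Ready(Ok(()));
        }
        self.conn.poll_flush_pending_frames(cx)
    }
}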
So before executing a read, the state is checked first, and the write side works the same way. Now let's look at h2:
fn poll2(&mut self, cx: &mut Context) -> Poll<Result<(), Error>> {
    // This happens outside of the loop to prevent needing to do a clock
    // check and then comparison of the queue possibly multiple times a
    // second (and thus, the clock wouldn't have changed enough to matter).
    self.clear_expired_reset_streams();

    loop {
        // First, ensure that the `Connection` is able to receive a frame
        //
        // The order here matters:
        // - poll_go_away may buffer a graceful shutdown GOAWAY frame
        // - If it has, we've also added a PING to be sent in poll_ready
        if let Some(reason) = ready!(self.poll_go_away(cx)?) {
            if self.inner.go_away.should_close_now() {
                if self.inner.go_away.is_user_initiated() {
                    // A user initiated abrupt shutdown shouldn't return
                    // the same error back to the user.
                    return Poll::Ready(Ok(()));
                } else {
                    return Poll::Ready(Err(Error::library_go_away(reason)));
                }
            }
            // Only NO_ERROR should be waiting for idle
            debug_assert_eq!(
                reason,
                Reason::NO_ERROR,
                "graceful GOAWAY should be NO_ERROR"
            );
        }

        // This is where PING/PONG, SETTINGS and other pending frames are written out
        ready!(self.poll_ready(cx))?;

        // Then the read is performed
        match self
            .inner
            .as_dyn()
            .recv_frame(ready!(Pin::new(&mut self.codec).poll_next(cx)?))?
        {
            ReceivedFrame::Settings(frame) => {
                self.inner.settings.recv_settings(
                    frame,
                    &mut self.codec,
                    &mut self.inner.streams,
                )?;
            }
            ReceivedFrame::Continue => (),
            ReceivedFrame::Done => {
                return Poll::Ready(Ok(()));
            }
        }
    }
}
The overall structure is still mainly write first, then read.
As for the second point, my guess is that reads and writes could be decoupled completely, which would improve performance (a stall on the read side would no longer block writes). But the complexity of these protocols would probably make such an implementation considerably harder, and the gain is likely small anyway, since TCP itself is not especially fast to begin with, unlike UDP.
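To make the second point concrete, here is a minimal sketch of that shape, assuming tokio and using purely illustrative names (not how yamux, hyper, or h2 actually spell it): a single driver task owns the socket and does all the real I/O, while each logical stream keeps its own buffer and only talks to the driver through channels:

use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::sync::mpsc;

// Each logical stream keeps its own buffer and only talks to the driver task
// through channels; the driver task is the only place real socket I/O happens.
struct LogicalStream {
    id: u32,
    to_driver: mpsc::Sender<(u32, Vec<u8>)>,  // outbound frames for this stream
    from_driver: mpsc::Receiver<Vec<u8>>,     // inbound data already demultiplexed
}

async fn driver(mut socket: TcpStream, mut outbound: mpsc::Receiver<(u32, Vec<u8>)>) {
    let mut read_buf = vec![0u8; 4096];
    loop {
        tokio::select! {
            // Write side: frames queued by any LogicalStream handle.
            Some((_stream_id, frame)) = outbound.recv() => {
                if socket.write_all(&frame).await.is_err() {
                    break;
                }
            }
            // Read side: pull bytes off the socket; frame decoding and routing
            // to the matching stream's channel is elided here.
            res = socket.read(&mut read_buf) => {
                match res {
                    Ok(0) | Err(_) => break,
                    Ok(_n) => { /* decode frame, forward it to the right stream */ }
                }
            }
        }
    }
}

Going further and decoupling the two directions, for example by splitting the socket with tokio::io::split (or TcpStream::into_split) and driving the read and write halves from two separate tasks, is exactly the kind of change speculated about above: a stalled write would no longer hold up reads, at the cost of having to share the per-stream and protocol state between two tasks.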