P99 CONF 2023 is now a wrap! You can (re)watch all videos and access the decks now.
Editor’s note: The following is a post from KittyCAD Systems Engineer Adam Chalmers. He spoke at P99 CONF 23 on “High-Level Rust for Backend Programming”: exploring why Rust is also a great language for writing API servers and backends, including two very different use cases for Rust on the backend. This article was originally published on his blog.
A little while back in Tokio’s Discord chat server, someone asked a really interesting question: how can you stream a multipart body from an incoming request into an outgoing request? My struggle to answer this really helped me understand Rust async lifetimes. We’ll explore why this is tricky, then design and benchmark a solution.
Note: all the code is in a full GitHub example.
The question was:
I’m rather new to Rust, so perhaps I’m trying to bite off more than I can chew, but I can’t find any way to stream an axum::extract::multipart::Field to the body of a PUT request made with reqwest. Obviously, waiting for all the bytes works just fine, as in this test handler:
Everything I’ve found for using a stream as the request body requires that the stream be 'static, which of course the fields I’m getting from multipart aren’t.
The problem sounded very simple — the server should stream a request in, and stream a request out — so I figured I’d help this person out, for two reasons:
- It’s nice to help people and I am Very Nice
- My day job is writing a streaming HTTP proxy in Rust, so this would be good practice.
But it turned out to be harder than I thought.
Why streaming instead of buffering?
Why not just buffer the whole body into memory? Is streaming really that important? Well, yeah — if you’re building a proxy, streaming is worth the extra headache, for two reasons:
First, latency is worse with buffering. Your proxy has to buffer the entire request from the client before sending it to the server. This doubles the latency of requests (assuming all three hosts are equidistant). You have to wait n seconds for the request to reach the proxy, then n seconds to transmit it to the server. But if your proxy used streams, you could get the first few bytes from the client, and send them to the server while you waited for the next few bytes from the client.
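To put rough numbers on this, here is a small store-and-forward versus pipelined latency model. It is a sketch under assumptions the post doesn’t state: both hops have the same bandwidth, each takes hop_secs to carry the whole body, propagation delay is ignored, and the streaming proxy forwards the body in `chunks` equal pieces the moment each one arrives. Both function names are hypothetical.

```rust
/// Store-and-forward: the whole body must reach the proxy (first hop)
/// before the proxy starts sending it to the server (second hop).
fn buffered_latency(hop_secs: f64) -> f64 {
    hop_secs + hop_secs
}

/// Pipelined: each chunk is forwarded as soon as it arrives, so the second
/// hop overlaps the first and only the final chunk's transfer is added.
/// Assumes chunks >= 1, equal-sized chunks, and equal bandwidth on both hops.
fn streamed_latency(hop_secs: f64, chunks: u32) -> f64 {
    let chunk_secs = hop_secs / f64::from(chunks);
    // The last chunk reaches the proxy at `hop_secs`, then needs
    // `chunk_secs` more to reach the server.
    hop_secs + chunk_secs
}
```

With one chunk, streaming degenerates into buffering (2n); with 10 chunks the modelled latency drops to 1.1n, and it approaches n as the chunks get smaller.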
Second, memory overhead is huge with buffering. Say you’re proxying a 10 GB macOS system update file. If you buffer every request into memory, your server can’t handle many parallel requests (probably <10). But with streaming, you can get the first n bytes from the client, proxy them to the server, and then free them. No more crashing the process because it ran out of memory.
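The memory difference can be sketched with nothing but std::io. This is not the post’s proxy code: the two hypothetical functions below copy from any Read to any Write and report how many body bytes each one holds in memory at once. The buffering version holds the whole body; the streaming version reuses one fixed-size buffer (the chunk_size value is an arbitrary choice).

```rust
use std::io::{self, Read, Write};

/// Buffering proxy: read the whole body into memory, then write it out.
/// Returns the peak number of body bytes held in memory at once.
fn proxy_buffered<R: Read, W: Write>(mut src: R, mut dst: W) -> io::Result<usize> {
    let mut body = Vec::new();
    src.read_to_end(&mut body)?; // holds the entire body
    dst.write_all(&body)?;
    Ok(body.len())
}

/// Streaming proxy: forward each chunk as soon as it is read, reusing one
/// fixed-size buffer. Returns the peak number of body bytes held at once.
fn proxy_streaming<R: Read, W: Write>(
    mut src: R,
    mut dst: W,
    chunk_size: usize,
) -> io::Result<usize> {
    let mut buf = vec![0u8; chunk_size];
    let mut peak = 0;
    loop {
        let n = match src.read(&mut buf) {
            Ok(0) => break, // end of body
            Ok(n) => n,
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        };
        peak = peak.max(n);
        dst.write_all(&buf[..n])?;
        // `buf` is overwritten on the next read, so this chunk is effectively freed.
    }
    Ok(peak)
}
```

Proxying a 1,000,000-byte body with an 8 KiB chunk_size, the buffered version holds all 1,000,000 bytes at once, while the streaming one never holds more than 8,192.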
So we really want our proxy server to stream requests. There’s just one problem. Our HTTP server and HTTP client seem to have contradictory lifetimes.
The problem
Axum’s Multipart extractor owns the incoming request body. It has a method next_field that returns a Field type. That Field is a view into the data from the main Multipart body: it just borrows part of the multipart data. This means the Field is actually Field<'a>, where 'a is the lifetime of the borrow of the Multipart that created it.
Field borrows from Multipart
Clearly the field can’t outlive the Multipart, because then Field would be pointing at data that is no longer there. Rust’s borrow checker stops us from doing that. So far so good.
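Here is a std-only sketch of that relationship, using hypothetical, heavily simplified stand-ins for axum’s Multipart and Field (the real types parse an HTTP body asynchronously; these just hold bytes). The point is the signature of next_field: the Field it returns borrows from the Multipart, so the compiler rejects any attempt to keep a Field after its Multipart is gone.

```rust
/// Hypothetical stand-in for axum's Multipart: it owns every field's bytes.
struct Multipart {
    fields: Vec<Vec<u8>>,
    next: usize,
}

/// Hypothetical stand-in for axum's Field<'a>: a view borrowed from a Multipart.
struct Field<'a> {
    bytes: &'a [u8],
}

impl Multipart {
    fn new(fields: Vec<Vec<u8>>) -> Self {
        Multipart { fields, next: 0 }
    }

    /// The returned Field borrows from `self`, so it can't outlive it.
    fn next_field(&mut self) -> Option<Field<'_>> {
        let bytes = self.fields.get(self.next)?.as_slice();
        self.next += 1;
        Some(Field { bytes })
    }
}

/// Reads every field, one borrowed Field at a time.
fn total_len(multipart: &mut Multipart) -> usize {
    let mut total = 0;
    while let Some(field) = multipart.next_field() {
        total += field.bytes.len();
    }
    total
}

// Does not compile: the returned Field would still borrow the local `m`,
// which is dropped when the function returns.
// fn dangling() -> Field<'static> {
//     let mut m = Multipart::new(vec![b"hi".to_vec()]);
//     m.next_field().unwrap()
// }
```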
You can read data from a Field either by reading the whole thing into memory (as above), or by using the Stream trait. This trait is defined by the futures crate, and Field implements it. Streams are basically asynchronous iterators. When you try to get the next value from them, it might not be ready yet, so you have to .await. This is perfect for reading HTTP request bodies, because you can start processing the body as it comes in piece by piece. If the next bytes aren’t available, your runtime (e.g. Tokio) will switch to a different task until the data arrives.
This means you don’t have to wait for the entire body to be available, saving time and maybe RAM, if you don’t actually need to store the entire body.
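To make the "asynchronous iterator" idea concrete, the sketch below mirrors the shape of the Stream trait from futures-core (poll_next returning Poll<Option<Item>>) as a local trait, so it runs without any crates. ChunkedBody is a hypothetical body that pretends every other poll finds no data yet, and block_on_collect is a deliberately naive driver that spins on Pending; a real runtime such as Tokio would instead park the task and run other tasks until the waker fires.

```rust
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

/// Local mirror of futures::Stream's shape: like Iterator::next,
/// except the next item may not be ready yet (Poll::Pending).
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Hypothetical request body whose chunks "arrive" on every second poll.
struct ChunkedBody {
    chunks: Vec<Vec<u8>>,
    ready: bool,
}

impl Stream for ChunkedBody {
    type Item = Vec<u8>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Vec<u8>>> {
        if !self.ready {
            // No data yet: ask to be polled again, then yield to the runtime.
            self.ready = true;
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        self.ready = false;
        if self.chunks.is_empty() {
            Poll::Ready(None) // end of body
        } else {
            Poll::Ready(Some(self.chunks.remove(0)))
        }
    }
}

/// A waker that does nothing, good enough for a busy-polling driver.
fn noop_waker() -> Waker {
    unsafe fn noop_clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    unsafe fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(noop_clone, noop, noop, noop);
    // SAFETY: every vtable function ignores the data pointer, so null is fine.
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

/// Polls until the stream ends; returns the items and how often it was Pending.
fn block_on_collect<S: Stream + Unpin>(mut stream: S) -> (Vec<S::Item>, usize) {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut items = Vec::new();
    let mut pendings = 0;
    loop {
        match Pin::new(&mut stream).poll_next(&mut cx) {
            Poll::Ready(Some(item)) => items.push(item),
            Poll::Ready(None) => return (items, pendings),
            Poll::Pending => pendings += 1, // a runtime would run other tasks here
        }
    }
}
```

For a two-chunk body, the driver sees three Pending results (one before each chunk and one before the end of the stream) and collects both chunks in order.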
So, now we know how to stream data out of the body. What about streaming data into a new request’s body?
We’ll use reqwest as the HTTP client. Reqwest can send various things as HTTP bodies: strings, vectors of bytes, and even streams, using the wrap_stream method. The problem is, the stream has to be 'static, a special lifetime which means “either it’s not borrowed, or it’s borrowed for the entire length of the program”. This means we can’t use Field as a reqwest body, because it’s not 'static. It’s borrowed, and it’s only valid for as long as the Multipart it borrows from.
Why does reqwest only allow static streams to be bodies? I asked its creator Sean McArthur.
Sean: Internally the connection is in a separate task, to manage connection-level stuff (keep alive, if http2: pings, settings, and flow control). The body gets sent to that connection task
Adam: So basically the connection task takes ownership of all request bodies? And therefore bodies have to be ‘static because they need to be moved into that task?
Sean: Exactly
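Sean’s explanation can be mimicked with std::thread::spawn, which has the same requirement for the same reason: the closure is moved into another thread that may outlive the caller, so it must be Send + 'static. send_body below is a hypothetical stand-in for reqwest’s connection task, not reqwest’s API. An iterator that owns its chunks is accepted; one that borrows a local Vec is rejected at compile time.

```rust
use std::thread::{self, JoinHandle};

/// Hypothetical "connection task": the body is moved into a separately
/// spawned thread, so it must be Send + 'static, mirroring the bound
/// reqwest puts on stream bodies. Returns the number of bytes "sent".
fn send_body<B>(body: B) -> JoinHandle<usize>
where
    B: Iterator<Item = Vec<u8>> + Send + 'static,
{
    // The connection thread now owns the body and drains it chunk by chunk.
    thread::spawn(move || body.map(|chunk| chunk.len()).sum::<usize>())
}

/// Compiles: vec::IntoIter<Vec<u8>> owns its chunks, so it is 'static.
fn send_owned() -> usize {
    let chunks = vec![b"owned ".to_vec(), b"body".to_vec()];
    send_body(chunks.into_iter())
        .join()
        .expect("connection thread panicked")
}

// Does not compile: the iterator borrows `chunks`, a local that is dropped
// when this function returns, while the spawned thread might still run.
// fn send_borrowed() -> usize {
//     let chunks = vec![b"borrowed".to_vec()];
//     send_body(chunks.iter().cloned()).join().unwrap()
// }
```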
But there is a solution. The field type is Field<'a>, so it’s generic over various possible lifetimes. We just need to make sure that the specific lifetime our server uses is 'static.
The solution
Note that the Multipart object itself is 'static. Why? Because it’s not borrowed. The next_field type signature is next_field(&mut self) -> Field<'_> (slightly simplified: the real method is async and returns Result<Option<Field<'_>>, MultipartError>). What’s that '_? That’s an elided lifetime. Basically these two signatures are equivalent:
fn next_field<'a>(&'a mut self) -> Field<'a>
fn next_field(&mut self) -> Field<'_>
So, the field type is Field<'a>, but that 'a is generic. Its definition works for any possible lifetime. We just have to make sure that when our server’s handler is compiled, it knows that 'a is 'static.
The key insight is that if you want to use a stream as a reqwest body, it can’t borrow anything. If it borrowed data owned by some other task, what would happen when that task finished and freed the data? The body would keep reading from freed memory, and Rust won’t let that happen. This means the stream has to own all the data being streamed.
So, the stream needs to own the Multipart! After all, the Multipart owns the data, and the stream has to own the data. So the stream has to own the Multipart.
Once I realized that, the solution emerged. We’ll define a new type of Stream that owns a Multipart and streams data out of its fields.
We make a newtype for converting a Multipart into a Stream. I’m Very Creative, so I chose the imaginative name MultipartStream. It owns the Multipart, and its into_stream method consumes (aka takes ownership of) self, so that method also owns the Multipart. This means the Multipart is 'static. The method creates a stream using the stream! macro from the async-stream crate, and that stream then takes ownership of self and therefore the Multipart.

All this means that the stream doesn’t borrow anything. It owns all its data (both the multipart and its fields), so the stream is 'static. Now you can pass it to reqwest::Body::wrap_stream.
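The post’s MultipartStream uses axum and the async-stream crate. As a std-only approximation of the same ownership trick, the sketch below swaps the async Stream for a synchronous Iterator built with std::iter::from_fn: into_stream takes self by value and moves a hypothetical, simplified Multipart into a move closure, so the resulting iterator borrows nothing and is 'static. Each field is a single chunk here, and flattening the iterator concatenates all the fields, just as the post’s version does. A spawned thread stands in for reqwest’s connection task.

```rust
use std::thread;

/// Hypothetical, simplified Multipart: owns every field's bytes.
struct Multipart {
    fields: Vec<Vec<u8>>,
    next: usize,
}

impl Multipart {
    /// The returned slice borrows from `self`, like axum's Field<'_>.
    fn next_field(&mut self) -> Option<&[u8]> {
        let field = self.fields.get(self.next)?;
        self.next += 1;
        Some(field.as_slice())
    }
}

/// Newtype that owns the Multipart, mirroring the post's MultipartStream.
struct MultipartStream(Multipart);

impl MultipartStream {
    /// Consumes `self`, so the iterator owns the Multipart and is 'static.
    /// (A sync Iterator stands in for the async Stream built by stream!.)
    fn into_stream(self) -> impl Iterator<Item = Vec<u8>> + Send + 'static {
        let mut multipart = self.0; // moved into the closure below
        std::iter::from_fn(move || {
            // Borrow each field only long enough to copy its bytes out.
            multipart.next_field().map(|bytes| bytes.to_vec())
        })
    }
}

/// Moves the 'static stream into another thread, the way reqwest moves a
/// body into its connection task. Returns everything the "server" received.
fn proxy(multipart: Multipart) -> Vec<u8> {
    let body = MultipartStream(multipart).into_stream();
    thread::spawn(move || body.flatten().collect::<Vec<u8>>())
        .join()
        .expect("connection thread panicked")
}
```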
Note: the into_stream example is very rudimentary: it concatenates all the fields together, which probably wouldn’t be useful. In a real server you might want to only stream certain fields, filter out some fields, or only stream the nth field.
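Under the same std-only assumptions, filtering is just more iterator adapters, as long as everything the closures capture is owned. In this hypothetical sketch each Part carries a name (standing in for the part’s form-field name); wanted is taken as a String rather than a &str, so the returned iterator still borrows nothing and stays 'static.

```rust
/// Hypothetical named part of a multipart body.
struct Part {
    name: String,
    bytes: Vec<u8>,
}

/// Hypothetical owned Multipart, simplified to a list of parts.
struct Multipart {
    parts: Vec<Part>,
}

/// Streams only the parts whose name matches `wanted`. The closure owns
/// `wanted`, so the iterator is still Send + 'static.
fn stream_field(
    multipart: Multipart,
    wanted: String,
) -> impl Iterator<Item = Vec<u8>> + Send + 'static {
    multipart
        .parts
        .into_iter()
        .filter(move |part| part.name == wanted)
        .map(|part| part.bytes)
}

/// Streams only the nth part (0-based), or nothing if there are fewer parts.
fn stream_nth_field(multipart: Multipart, n: usize) -> impl Iterator<Item = Vec<u8>> + Send + 'static {
    multipart.parts.into_iter().skip(n).take(1).map(|part| part.bytes)
}
```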
The last thing to do is actually use this MultipartStream wrapper in our endpoint.
Note: this example uses Axum 0.6.0-rc.1 with its new State types. It’s possible that it might change a little before the final 0.6 release; see the announcement for more. State is basically like an Axum extension where the compiler can guarantee it’s always set. This neatly solves the problem I complained about in my previous post about Axum, where I know the extension is always set but the compiler doesn’t, so I need a dubious .unwrap().
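The difference between an extension and State comes down to what the type system can see. The std-only sketch below is a loose analog, not axum’s API: Extensions is a hypothetical type map in which any type may or may not be present, so reading from it yields an Option and the handler ends up calling unwrap. AppState is a plain typed argument, so a caller that can’t supply it fails to compile and no unwrap is needed.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;

/// Hypothetical HTTP client handle shared by all requests.
struct Client {
    upstream: String,
}

/// Simplified type map: nothing forces a given type to be present.
#[derive(Default)]
struct Extensions {
    map: HashMap<TypeId, Box<dyn Any>>,
}

impl Extensions {
    fn insert<T: Any>(&mut self, value: T) {
        self.map.insert(TypeId::of::<T>(), Box::new(value));
    }

    fn get<T: Any>(&self) -> Option<&T> {
        self.map.get(&TypeId::of::<T>())?.downcast_ref::<T>()
    }
}

/// Extension-style handler: the compiler can't prove a Client was inserted.
fn handler_with_extension(ext: &Extensions) -> String {
    let client = ext.get::<Client>().unwrap(); // panics if it was never inserted
    client.upstream.clone()
}

/// State-style handler: the state is a typed argument, so it is always there.
struct AppState {
    client: Client,
}

fn handler_with_state(state: &AppState) -> String {
    state.client.upstream.clone()
}
```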
Benchmarks
I ran some benchmarks on the repo. Basically, I used curl to send the proxy server a Multipart body with 20 copies of the Unix wordlist. Then the server proxied it to a second server, which prints it. I compared the streaming proxy above with a proxy that buffers everything. You can see the full setup in the GitHub example.
Because I ran this all locally, I don’t expect much difference in total time. After all, the latency between processes running on my MacBook is pretty low. So doubling the latency won’t matter much, because the latency is nearly zero anyway. But I expect the RAM usage to be very different.
Yep, streaming saves a lot of memory.
Takeaways
- reqwest connections are handled in their own task, so they need to own their bodies.
  - This is just a design choice: other HTTP libraries could work differently, although tide and the actix web client also require streaming bodies to be 'static.
  - I think this is partly because of the Tokio runtime; other runtimes might not require 'static. See the P99 CONF talk from Glauber Costa, the creator of glommio (and another P99 CONF 23 speaker).
- 'static means “this value either isn’t borrowed, or is borrowed for the entire runtime of the program”.
- A 'static stream can’t borrow any data.
- Implementing your own streams with async-stream is pretty easy.
Watch Adam’s talk from P99 CONF
Here’s a look at what Adam presented at P99 CONF 23…
Some people say you should only use Rust where you can’t afford to use garbage collection. I disagree — Rust is a great language for writing API servers and backends, due to its high-level libraries, type system, and macros. Having used Rust at both Cloudflare and KittyCAD, I have multiple perspectives to share about iterating quickly with Rust. For example, we use Dropshot, a server library from Oxide Computer which generates OpenAPI specs from your server’s endpoint functions. Then we generate API clients for that server, in many different languages.