Save the Date!!! P99 CONF 2024 is Oct 23-24 | Call for Speakers Now Open

lines masked in circle
purple circle at half opacity

Static Streams for Faster Async Proxies (In Rust)

grid of red dots
white dots in concentric circle

Share This Post

P99 CONF 2023 is now a wrap! You can (re)watch all videos and access the decks now.

ACCESS ALL THE VIDEOS AND DECKS NOW

Editor’s note: The following is a post from KittyCAD Systems Engineer Adam Chalmers. He spoke at P99 CONF 23 on “High-Level Rust for Backend Programming”: exploring why Rust is also a great language for writing API servers and backends, including two very different use cases for Rust on the backend. This article was originally published on his blog

Watch Adam’s Talk

A little while back in Tokio’s Discord chat server, someone asked a really interesting question: how can you stream a multipart body from an incoming request into an outgoing request? My struggle to answer this really helped me understand Rust async lifetimes. We’ll explore why this is tricky, then design and benchmark a solution.

Note: all the code is in a full GitHub example.

The question was:

I’m rather new to Rust, so perhaps I’m trying to bite off more than I can chew, but I can’t find any way to stream an axum::extract::multipart::Field to the body of a PUT request made with reqwest. Obviously, waiting for all the bytes works just fine, as in this test handler:

Everything I’ve found for using a stream as the request body requires that the stream be ‘static, which of course the fields I’m getting from multipart aren’t.

The problem sounded very simple — the server should stream a request in, and stream a request out — so I figured I’d help this person out, for two reasons:

  • It’s nice to help people and I am Very Nice
  • My dayjob is writing a streaming HTTP proxy in Rust, so this would be good practice.

But it turned out to be harder than I thought.

Why streaming instead of buffering?

Why not just buffer the whole body into memory? Is streaming really that important? Well, yeah — if you’re building a proxy, streaming is worth the extra headache, for two reasons:

First, latency is worse with buffering. Your proxy has to buffer the entire request from the client before sending it to the server. This doubles the latency of requests (assuming all three hosts are equidistant). You have to wait n seconds for the request to reach the proxy, then n seconds to transmit it to the server. But if your proxy used streams, you could get the first few bytes from the client, and send them to the server while you waited for the next few bytes from the client.

Second, memory overhead is huge with buffering. Say you’re proxying a 10gb MacOS system update file. If you buffer every request into memory, your server can’t handle many parallel requests (probably <10). But with streaming, you can get the first n bytes the client, proxy them to the server, and then free them. No more crashing the process because it ran out of memory.

So we really want our proxy server to stream requests. There’s just one problem. Our HTTP server and HTTP client seem to have contradictory lifetimes.

The problem

Axum’s Multipart extractor owns the incoming request body. It has a method next_field that returns a Field type. That Field is a view into the data from the main Multipart body. It just borrows part of the multipart data. This means the Field is actually Field<'a> where ‘a is the lifetime of the Multipart that created it.

Field borrows from Multipart

Field borrows from Multipart

Clearly the field can’t outlive the Multipart, because then Field would be pointing at data that is no longer there. Rust borrow checker stops us from doing that. So far so good.

You can read data from a stream, either by reading the whole thing into memory (as above), or using the Stream trait. This trait is defined by the Futures crate here and implemented here. Streams are basically asynchronous iterators. When you try to get the next value from them, it might not be ready yet, so you have to .await. This is perfect for reading HTTP request bodies, because you can start processing the body as it comes in piece-by-piece. If the next bytes aren’t available, your runtime (e.g. Tokio) will switch to a different task until the data arrives.

This means you don’t have to wait for the entire body to be available, saving time and maybe RAM, if you don’t actually need to store the entire body.

So, now we know how to stream data out of the body. What about streaming data into a new request’s body?

We’ll use reqwest as the HTTP client. Reqwest can send various things as HTTP bodies — strings, vectors of bytes, and even streams, using the wrap_stream method. The problem is, the stream has to be ‘static, a special lifetime which means “either it’s not borrowed, or it’s borrowed for the entire length of the program”. This means we can’t use Field as a reqwest body, because it’s not ‘static. It’s borrowed, and it’s only valid for as long as the Multipart that owns it.

Why does reqwest only allow static streams to be bodies? I asked its creator Sean McArthur.

Sean: Internally the connection is in a separate task, to manage connection-level stuff (keep alive, if http2: pings, settings, and flow control). The body gets sent to that connection task

Adam: So basically the connection task takes ownership of all request bodies? And therefore bodies have to be ‘static because they need to be moved into that task?

Sean: Exactly

But there is a solution. The field type is Field<'a>, so it’s generic over various possible lifetimes. We just need to make sure that the specific lifetime our server uses is ‘static.

The solution

Note that the Multipart object itself is ‘static. Why? Because it’s not borrowed. The next_field type signature is next_field(&mut self) -> Field<'_>. What’s that '_? That’s an elided lifetime. Basically these two signatures are equivalent:

fn next_field(&'a mut self) -> Field<'a>
fn next_field(&mut self) -> Field<'_>

So, the field type is Field<'a>, but that ‘a is generic. Its definition works for any possible lifetime. We just have to make sure that when our server’s handler is compiled, it knows that ‘a is ‘static.

The key insight is that, if you want to use a stream as a reqwest body, it can’t borrow anything. Because if it borrowed data from some other thread which owns data, what happens if the owning thread dies? Your body would keep reading from the freed memory, and Rust won’t let that happen. This means the stream has to own all the data being streamed.

So, the stream needs to own the Multipart! After all, the multipart owns the data, and the stream has to own the data. So the stream has to own the multipart.

Once I realized that, the solution emerged. We’ll define a new type of Stream that owns a Multipart and streams data out of its fields.

We make a newtype for converting a Multipart into a Stream. I’m Very Creative so I chose the imaginative name MultipartStream. It owns the Multipart, and its into_stream method consumes (aka takes ownership of) self, so that method also owns the Multipart. This means the Multipart is ‘static. The method creates a stream, using the stream! macro from the async-stream crate. And that stream then takes ownership of self and therefore the Multipart.

All this means that the stream doesn’t borrow anything. It owns all its data — both the multipart and its fields — so the stream is ‘static. Now you can pass it to reqweset::Body::wrap_stream.

Note: the into_stream example is very rudimentary — it concatenates all the fields together. This probably wouldn’t be useful. In a real server you might want to only stream certain fields, or maybe filter out fields, or maybe only stream the nth field.

The last thing to do is actually use this MultipartStream wrapper in our endpoint.

Note, this example uses Axum 0.6.0-rc.1 with its new State types. It’s possible that it might change a little before the final 0.6 release. See the announcement for more. State is basically like an Axum extension where the compiler can guarantee it’s always set. This perfectly solves the problem my previous post about Axum complained about, where I know the extension is always set, but the compiler doesn’t, so I need a dubious .unwrap()

Benchmarks

I ran some benchmarks on the repo. Basically, I used curl to send the proxy server a Multipart body with 20 copies of the Unix wordlist. Then the server proxied it to a second server, which prints it. I compared the streaming proxy above, with a proxy that buffers everything. You can see the full setup in the GitHub example.

Because I ran this all locally, I don’t expect much difference in total time. After all, the latency between processes running on my Macbook is pretty low. So doubling the latency won’t matter much, because the latency is nearly zero anyway. But I expect the RAM usage to be very different.

Yep, streaming saves a lot of memory.

Takeaways

    • reqwest connections are handled in their own thread, so they need to own their bodies.
      • This is just a design choice — other HTTP libraries could work differently, although tide and actix web client also require streaming bodies be ‘static.
      • I think this is partly because of the Tokio runtime, and other runtimes might not require ‘static, see this P99 CONF talk from Glauber Costa, the creator of glommio (and another P99 CONF 23 speaker).
  • static means “this value either isn’t borrowed, or is borrowed for the entire runtime of the program”
  • A static stream can’t borrow any data
  • Implementing your own streams with async-stream is pretty easy

Watch Adam’s talk from P99 CONF

Here’s a look at what Adam presented at P99 CONF 23…

Some people say you should only use Rust where you can’t afford to use garbage collection. I disagree — Rust is a great language for writing API servers and backends, due to its high-level libraries, type system, and macros. Having used Rust at both Cloudflare and KittyCAD, I have multiple perspectives to share about iterating quickly with Rust.  For example, we use Dropshot, a server library from Oxide Computer which generates OpenAPI specs from your server’s endpoint functions. Then we generate API clients for that server, in many different languages.

Watch Adam’s Talk

About Adam Chalmers

Adam is a Systems Engineer at KittyCAD.
He started learning Rust for fun in 2017. He's now worked full-time with Rust for five years at Cloudflare and KittyCAD, writing high-performance and high-availability API servers and internet proxies.

More To Explore

Bun, Tokio, Turso Creators on Rust vs Zig

What transpired when Glauber Costa (Turso co-founder), Jarred Sumner (developer of Bun.js and CEO of Oven) and Carl Lerche (developer of Tokio and major Rust

P99 CONF OCT. 18 + 19, 2023

Register for Your Free Ticket