Kafka-Compatible Fast Distributed Transactions

First published on vectorized.io on September 14, 2021.

Transactions bring exactly-once semantics (EOS) to stream processing. They help developers avoid the anomalies of at-most-once or at-least-once processing (lost or duplicated events, respectively) and focus only on the essential logic.

It’s a rare opportunity to get to work on negative-cost abstractions, where you improve a product in all major pillars: stronger data safety guarantees, compatibility with existing applications, and higher throughput, all in one feature! We’re excited to share with you Redpanda Transactions, the culmination of a year’s work that is finally here.

Redpanda’s transactions are compatible with the Kafka API, so developers can use their favorite client libraries to work with them. For example, we may fetch messages starting from the last consumed offset and process them transactionally, updating the last consumed offset and producing new events in a single atomic step.

Streaming Example
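
As a rough sketch, such a consume-process-produce loop might look like the following with the Kafka Java client. The topic names, group id, broker address, and processing step are illustrative, not taken from the original benchmark; the older sendOffsetsToTransaction signature is used since KIP-447 isn’t required here.

// A minimal sketch of a consume-process-produce transactional loop.
// Topic/group names and the broker address are illustrative placeholders.
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

public class StreamingExample {
  public static void main(String[] args) {
    var consumerProps = new Properties();
    consumerProps.put("bootstrap.servers", "localhost:9092");
    consumerProps.put("group.id", "processor-group");
    consumerProps.put("enable.auto.commit", "false");       // offsets are committed inside the transaction
    consumerProps.put("isolation.level", "read_committed"); // only read committed transactional data
    consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    var producerProps = new Properties();
    producerProps.put("bootstrap.servers", "localhost:9092");
    producerProps.put("transactional.id", "processor-1");   // required to use transactions
    producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    try (var consumer = new KafkaConsumer<String, String>(consumerProps);
         var producer = new KafkaProducer<String, String>(producerProps)) {
      consumer.subscribe(List.of("source-topic"));
      producer.initTransactions();

      while (true) {
        var records = consumer.poll(Duration.ofMillis(100));
        if (records.isEmpty()) continue;

        producer.beginTransaction();
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (ConsumerRecord<String, String> record : records) {
          // Process the event and produce the result atomically with the offset update.
          producer.send(new ProducerRecord<>("sink-topic", record.key(), record.value()));
          offsets.put(new TopicPartition(record.topic(), record.partition()),
                      new OffsetAndMetadata(record.offset() + 1));
        }
        // Pre-KIP-447 API: commit the consumed offsets as part of the transaction.
        producer.sendOffsetsToTransaction(offsets, "processor-group");
        producer.commitTransaction();
      }
    }
  }
}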

Data-intensive applications tend to be IO bound, so we decided to step away from KIP-98 and design transactions from scratch to aggressively minimize disk IO. This decision helped us build an efficient transactional system.

In this post we’ll investigate the efficiency of Redpanda transactions and show how transactions can even increase (!) throughput.

Systems with strict consistency models tend to have poorer performance than their weaker counterparts. For example, linearizable systems are slower than ones based on CRDTs. The whole point of isolation levels is to trade isolation for performance. Redpanda is no exception: its transactional stream processing (send and sendOffsetsToTransaction wrapped in a transaction) is more expensive than sequential send and commitSync.

In a 3-node cluster (i3.large, NVMe SSD, same availability zone), Redpanda’s transactional processing has 4.6x higher median latency compared to naive streaming (2.8 ms / 0.6 ms ≈ 4.6).

Kafka has 6.3x transactional overhead.

Since the overhead is normalized (lower is better), the difference between 6.3 (Kafka) and 4.6 (Redpanda) reflects the difference between the transaction protocols.

What makes Redpanda transactions fast?

Let’s take a look at the following transactional workload and measure each step to understand what makes Redpanda transactions so fast.

var producer = new KafkaProducer(...);
producer.initTransactions();

while (true) {
  producer.beginTransaction();
  var f1 = producer.send(new ProducerRecord<String, String>(TARGET_TOPIC1, "key1", "value1"));
  var f2 = producer.send(new ProducerRecord<String, String>(TARGET_TOPIC2, "key1", "value1"));
  f1.get();
  f2.get();
  producer.commitTransaction();
}

Just like with streaming, we measured the workload in a 3-node cluster (i3.large, NVMe SSD, same availability zone) and ran it for 15 minutes. The median latency of a whole Redpanda transaction is half that of Kafka.

Let’s measure how each part of the transaction (begin, send, resolve and commit) contributes to those numbers to see where the improvement comes from.

The beginTransaction() method should be called before the start of each new transaction. It doesn’t invoke any server-side API, so it’s incredibly fast and there is no significant difference between Redpanda and Kafka.

send() asynchronously sends a record to a topic. Depending on the values of batch.size and linger.ms, it may or may not invoke an RPC. In the next figure both systems go toe to toe.
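
For reference, these are ordinary producer properties; a hypothetical fragment that enlarges batches is shown below (the broker address and values are illustrative, not the benchmark configuration):

// Illustrative producer batching settings; values are examples, not the benchmark configuration.
var props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("batch.size", "65536"); // accumulate up to 64 KiB per partition before sending
props.put("linger.ms", "5");      // wait up to 5 ms for more records to join a batch
var producer = new KafkaProducer<String, String>(props);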

We aren’t obligated to wait for the send() futures to resolve, but commitTransaction waits for them anyway, so measuring the resolving phase doesn’t increase the overall latency. This data is very useful because it highlights the key difference between the Redpanda and Kafka transaction protocols.

Redpanda uses a transaction to scope a unit of work and optimize it as a whole instead of processing send() requests one by one. It overrides the acks setting from acks=all to acks=1 so that send() uses asynchronous replication, which is much faster than the default synchronous replication. Since asynchronous replication isn’t reliable on its own, at commit time Redpanda waits until the follower nodes have caught up with the leader, or aborts the transaction when replication isn’t possible.

Another optimization is in the AddPartitionsToTxn area. When a client starts writing to a new partition, it issues an AddPartitionsToTxn call to the transaction coordinator. Unlike Kafka’s, Redpanda’s coordinator relies on in-memory state and writes to disk only during the EndTxn call (commit or abort).

Still, 1.5 ms looks like a pretty high latency for a workflow that doesn’t write to disk. Where does it come from?

Transactions in Kafka are strongly eventually consistent (SEC). The coordinator acks a commit request after it passes the point of no return (the coordinator has written its state to the log) but before the transaction is fully committed (the coordinator has written the commit markers). Eventual consistency is known to cause various anomalies, but SEC is a different beast: since the coordinator replicates its state before confirming a request, the commit can’t be lost and will eventually be applied.

It’s a good optimization, and Redpanda does the same. As a result, the next transaction may collide with the previous one and wait (hence the 1.5 ms) until it writes the markers. The good news: those actions don’t need to be sequential, so there is an opportunity to shave off another millisecond.

Finally, let’s look at the commitTransaction latency:

We see that Kafka has slightly better latency than Redpanda here.

In Kafka a transaction is considered committed as soon as the client reaches the coordinator and the coordinator marks it as committed. Since Redpanda overrides the consistency level, its coordinator communicates with the data partitions and writes a special marker to make sure the writes are fully replicated.

In principle this should add more than 0.2 ms of latency, since it involves an intra-cluster round trip and disk writes. However, Redpanda uses the Parallel Commits optimization and performs this work in parallel with updating the coordinator’s state, so the increase is insignificant.

Is It Faster to Go with Redpanda Transactions than Without Them?!

Now we get close to answering why a transactional bulk import is faster than the non-transactional workload. N.B. We tested the latter both with the default settings and with settings optimized for high throughput (increased batch.size).

Consider the following workload:

var futures = new Future[N];
for (int i = 0; i < N; i++) {
  futures[i] = producer.send(new ProducerRecord<String, String>(TARGET_TOPIC, "key1", "value1"));
}
for (var future : futures) {
  future.get();
}

Even when we issue all the send() requests at the same time, the Kafka client maintains at most five concurrent requests. With acks=all, Redpanda synchronously replicates messages to the majority of the replicas and waits until they persist them to disk before acknowledging the request. So for N sends there are roughly O(N) fsyncs.

producer.beginTransaction();
for (int i = 0; i < N; i++) {
  producer.send(new ProducerRecord<String, String>(TARGET_TOPIC, "key1", "value1"));
}
producer.commitTransaction();

When we wrap the requests in a transaction, we override the consistency level, write N requests without fsyncs, and then do a single fsync at the commit phase. This gives Redpanda a much better disk access pattern.

What’s next

Once again, we’re thrilled to announce that Redpanda transactions are here. They improve latency and throughput, and they are available via the classic Kafka API. Please try them out; we’d love your feedback!

This post is only the first in a series dedicated to Redpanda transactions, so stay tuned. In the next posts we’ll:

  • dive into the internals of Redpanda transactions
  • learn how to operate them
  • compare Redpanda with other transactional streaming solutions and open-source the benchmarks

Q&A

Is Redpanda transaction support complete?

No, it isn’t. Redpanda transactions don’t cover compacted topics, and we don’t support KIP-447 or KIP-360 yet. Overall, transactional support is at the Kafka 2.4 level, but we’re working on catching up.

Any caveats?

Redpanda optimizes transactions for the happy path without sacrificing correctness in more complex situations. Compared to Kafka, Redpanda transactions are more sensitive to intra-cluster disturbances such as data partition or transaction coordinator re-elections. When that happens, Redpanda aborts the affected transactions and returns an error.

When a Kafka client encounters an error during transactional processing, it should close the current producer, create a new one, and retry the transaction. Aborted transactions are always a possibility in Kafka; Redpanda merely increases how often they occur.
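
A rough sketch of that close-and-retry pattern, where createProducer() stands for a hypothetical helper that builds a fresh transactional KafkaProducer:

// Sketch of the close-and-recreate retry pattern described above.
// createProducer() is a hypothetical helper that builds a new transactional producer.
var producer = createProducer();
producer.initTransactions();
while (true) {
  try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<String, String>(TARGET_TOPIC1, "key1", "value1"));
    producer.send(new ProducerRecord<String, String>(TARGET_TOPIC2, "key1", "value1"));
    producer.commitTransaction();
  } catch (KafkaException e) {
    // The transaction may have been aborted (e.g. after a coordinator re-election):
    // discard the producer and start over with a fresh one.
    producer.close();
    producer = createProducer();
    producer.initTransactions();
  }
}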

What machines did you use to run the tests?

I tested streaming and multi-partition writes on i3.large nodes with NVMe SSDs. For the transactional bulk insert tests, I used c5.large instances with a 14 TB st1 disk.

How much load was used to get the results?

We used an infinitesimal amount of load: a single thread in a closed loop. The whole idea behind this blog post is to use latency distributions to highlight the architectural choices in the Redpanda transaction protocol. If we used a stress load, we could hide the signal behind the noise. But we’re going to write about transactional stress testing soon. Stay tuned!

What Kafka settings did you use?

We used Kafka 2.8.0 and tuned it to use a replication factor of 3, flush on every write, and use zero retry backoff:

retry.backoff.ms=0
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
log.flush.interval.messages=1
log.flush.interval.ms=0
min.insync.replicas=2

Why did you set retry.backoff.ms to 0?

Kafka transactions are strongly eventually consistent. The coordinator acks a transaction after it passes the point of no return (an update is made to the transaction log) but before the transaction is fully executed.

We can see this behavior in the intercepted traffic: a transaction coordinator acks a commit request (look for EndTxn v1 Response) before it sends WriteTxnMarkers.

It’s a very clever optimization. Unfortunately, with the default producer settings this optimization backfires and makes latency a hundredfold higher.

When we run back-to-back transactions, the current transaction collides with the writing of the previous transaction’s markers. This results in a concurrent-transactions error, which causes the client to wait for retry.backoff.ms before retrying. By setting it to zero we reduce the latency impact.
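
On the client this is an ordinary producer property; a hypothetical one-liner, assuming a props object like the one above:

// Hypothetical client-side override: retry immediately instead of backing off.
props.put("retry.backoff.ms", "0");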

Why did you change log.flush.interval.*?

With the default log.flush.interval.* settings, Kafka confirms requests before writing them to disk. This may lead to silent data loss and spontaneous, silent transaction rollbacks.

What are the production settings for Redpanda transactions?

You need to update the Redpanda configuration (a sketch follows the list below) to:

  • Enable transactions and idempotent producers (enable_idempotence: true & enable_transactions: true)
  • Increase replication factor of the internal topic (id_allocator_replication: 3 & transaction_coordinator_replication: 3)
  • Make sure transaction_coordinator_delete_retention_ms is at least as high as transactional_id_expiration_ms
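
For example, a sketch of how these settings might look under the redpanda section of redpanda.yaml (the exact placement of these properties may vary between Redpanda versions):

redpanda:
  enable_idempotence: true
  enable_transactions: true
  id_allocator_replication: 3
  transaction_coordinator_replication: 3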
