Processing guarantees in Kafka

Ref: https://medium.com/@andy.bryant/processing-guarantees-in-kafka-12dd2e30be0e


Each of the projects I’ve worked on in the last few years has involved a distributed messaging system such as AWS SQS, AWS Kinesis or, more often than not, Apache Kafka. In designing these systems, we invariably need to consider questions such as:

How do we guarantee all messages are processed?

How do we avoid or handle duplicate messages?

These simple questions are surprisingly hard to answer. To do so, we need to delve into how producers and consumers interact with distributed messaging systems. In this post I’ll be looking at message processing guarantees and the implications these have when designing and building systems around distributed messaging systems. I will be specifically concentrating on the Apache Kafka platform as it’s such a popular choice and the one I am most familiar with.

Basic Architecture

A producer process reads from some data source which may or may not be local, then writes to the messaging system over the network. The messaging system persists the message, typically in multiple locations for redundancy. One or more consumers poll the messaging system over the network, receive batches of new messages and perform some action on these messages, often transforming the data and writing to some other remote data store, possibly back to the messaging system. This basic design applies to Apache Kafka, Apache Pulsar, AWS Kinesis, AWS SQS, Google Cloud Pub/Sub and Azure Event Hubs among others.


Different categories

  • No guarantee — No explicit guarantee is provided, so consumers may process messages once, multiple times or never at all.

  • At most once — This is “best effort” delivery semantics. Consumers will receive and process messages exactly once or not at all.

  • At least once — Consumers will receive and process every message, but they may process the same message more than once.

  • Effectively once — Also contentiously known as exactly once, this promises consumers will process every message once.

At this point, you may be asking why is this so complicated? Why isn’t it always effectively once? What’s causing messages to go missing or appear more than once? The short answer to this is system behaviour in the face of failure. The key word in describing these architectures is distributed. Here is a small subset of failure scenarios that you will need to consider:

  • Producer failure

  • Consumer publish remote call failure

  • Messaging system failure

  • Consumer processing failure

Your consumer process could run out of memory and crash while writing to a downstream database; your broker could run out of disk space; a network partition may form between ZooKeeper instances; a timeout could occur publishing messages to Kafka. These types of failures are not just hypothetical — they can and will happen with any non-trivial system in any and all environments including production. How these failures are handled determines the processing guarantee of the system as a whole.

Before looking at the different types of guarantees in detail, we’ll have a quick look at the Kafka consumer API as the implementation is relevant to many of the examples given below.


Kafka Consumer API

Processes pull data from Kafka using the consumer API. When creating a consumer, a client may specify a consumer group, which identifies a collection of consumers that coordinate to read data from a set of topic partitions. The partitions of any topics subscribed to by consumers in a consumer group are guaranteed to be assigned to at most one individual consumer in that group at any time. The messages from each topic partition are delivered to the assigned consumer strictly in the order they are stored in the log.

To save its progress reading from Kafka, a consumer needs to record the offset of the next message it will read in each topic partition it is assigned to. Consumers are free to store their offsets wherever they want, but by default, and for all Kafka Streams applications, they are stored back in Kafka itself in an internal topic called __consumer_offsets. To use this mechanism, consumers either enable automatic periodic commits of offsets back to Kafka by setting the configuration flag enable.auto.commit to true, or make an explicit call to commit the offsets. For example, a consumer that has read the first five messages of topic-a partition-0 and the first seven messages of topic-a partition-1 would store offset 5 for topic-a partition-0 and offset 7 for topic-a partition-1.
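To make this concrete, here is a minimal sketch of a Java consumer that joins a consumer group, polls a topic and explicitly commits its offsets back to Kafka. The broker address, group id and topic name are placeholder values, not anything from the article.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class BasicConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("group.id", "example-group");           // the consumer group this consumer joins
        props.put("enable.auto.commit", "false");         // we will commit offsets explicitly
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("topic-a"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // Process the record. The offset of the next message to read from this
                    // partition is record.offset() + 1.
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
                // Explicitly save progress back to Kafka (the __consumer_offsets topic).
                consumer.commitSync();
            }
        }
    }
}
```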

Let’s now look at each of these guarantees in detail and see some real-world examples of systems that provide each guarantee.


No guarantee

A system that provides no guarantee means any given message could be processed once, multiple times or not at all. In Kafka, a simple scenario where you end up with these semantics is a consumer with enable.auto.commit set to true (this is the default) that, for each batch of messages, asynchronously processes and saves the results to a database.

With auto commit enabled, the consumer will save offsets back to Kafka periodically at the start of subsequent poll calls. The frequency of these commits is determined by the configuration parameter auto.commit.interval.ms. If you save the messages to the database and the application then crashes before the offsets are committed, you will reprocess those messages on the next run and save them to the database twice. If the offsets are committed before the results reach the database and the program then crashes, those messages will not be reprocessed on the next run, meaning you have data loss.

So the data might be saved twice, or it might not be saved at all.
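A sketch of this failure mode, assuming the same consumer configuration as the earlier sketch but with enable.auto.commit left at its default of true, and a hypothetical saveToDatabaseAsync helper that writes the batch to the database on a background thread:

```java
// enable.auto.commit is true (the default), so poll() periodically commits the offsets of
// records returned by earlier polls, whether or not they have actually been processed.
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
    // Hypothetical asynchronous database write. If the process crashes after the offsets
    // are auto-committed but before this write finishes, the records are lost; if it
    // crashes after the write but before the commit, the records are written twice.
    saveToDatabaseAsync(records);
}
```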


At most once

An at most once guarantee means a message will be processed either exactly once or not at all. This guarantee is often known as “best effort” semantics.

A common example that results in at most once semantics is a producer that takes a ‘fire-and-forget’ approach, sending messages to Kafka with no retries and ignoring any response from the broker. This approach is useful where progress is a higher priority than completeness.
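A sketch of such a fire-and-forget producer, with placeholder broker and topic names: acknowledgements and retries are turned off, idempotence is explicitly disabled, and the Future returned by send() is ignored, so a failed publish is simply lost.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class FireAndForgetProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder broker address
        props.put("acks", "0");                             // do not wait for a broker response
        props.put("retries", "0");                          // never retry a failed send
        props.put("enable.idempotence", "false");           // idempotence requires acks=all
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The returned Future is ignored, so a dropped or failed send goes unnoticed
            // and the message may never reach Kafka: at most once.
            producer.send(new ProducerRecord<>("topic-a", "key", "value"));
        }
    }
}
```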

A producer saves its progress reading from a source system first, then writes the data into Kafka. If the producer crashes after saving its progress but before the write to Kafka completes, it will skip past that data on restart, so the data is never delivered to Kafka.

The producer first saves its read position, then delivers the data to Kafka.

A consumer receives a batch of messages from Kafka, transforms these and writes the results to a database. The consumer application has enable.auto.commit set to false and is programmed to commit its offsets back to Kafka prior to writing to the database. If the consumer fails after saving the offsets back to Kafka but before writing the data to the database, it will skip these records the next time it runs and the data will be lost.

If the consumer fails after committing the offsets, the next run skips those records, so that data is lost.
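A sketch of this at-most-once consumer, assuming the same setup as the earlier consumer sketch (enable.auto.commit set to false) and a hypothetical writeToDatabase helper: offsets are committed before the results are written.

```java
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
    // Save progress first. If the process crashes after this commit but before the write
    // below, the next run starts after these records and they are never written: data loss.
    consumer.commitSync();
    writeToDatabase(records); // hypothetical synchronous database write
}
```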


At least once

An at least once guarantee means you will definitely receive and process every message, but you may process some messages additional times in the face of a failure. Here are a few examples of failure scenarios that can lead to at-least-once semantics:

An application sends a batch of messages to Kafka. The application never receives a response, so it sends the batch again. In this case it may be that the first batch was successfully saved but the acknowledgement was lost, so the messages end up being appended twice.

If the acknowledgement is lost, the messages may be sent twice.
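On the producer side, at-least-once behaviour comes from waiting for acknowledgements and retrying. A rough sketch, with placeholder names and error handling omitted:

```java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");  // placeholder broker address
props.put("acks", "all");                           // wait until the brokers acknowledge the write
props.put("retries", Integer.toString(Integer.MAX_VALUE)); // resend if no acknowledgement arrives
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// Block until the send is acknowledged. If the acknowledgement is lost in transit the client
// retries, so the same message can be appended to the log twice: at least once.
producer.send(new ProducerRecord<>("topic-a", "key", "value")).get();
```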

An application processes a large file containing events. It starts processing the file, sending a message to Kafka for each event. Halfway through processing the file the process dies and is restarted. It then starts processing the file again from the start, and only marks the file as processed once the whole file has been read. In this case the events from the first half of the file will be in Kafka twice.

If the process dies partway through processing, events may also be sent twice.

A consumer receives a batch of messages from Kafka, transforms these and writes the results to a database. The consumer application has enable.auto.commit set to false and is programmed to commit its offsets back to Kafka once the database write succeeds. If the consumer fails after writing the data to the database but before saving the offsets back to Kafka, it will reprocess the same records the next time it runs and save them to the database once more.
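A sketch of this at-least-once consumer, again assuming the earlier consumer setup (enable.auto.commit set to false) and a hypothetical writeToDatabase helper: offsets are committed only after the database write succeeds.

```java
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
    writeToDatabase(records); // hypothetical synchronous database write
    // Commit only after the write succeeds. If the process crashes before this line, the
    // same records are polled and written to the database again on the next run: duplicates.
    consumer.commitSync();
}
```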


Effectively once

Many distributed messaging systems such as Pulsar and Pravega, as well as data processing systems such as Kafka Streams, Spark, Flink, Delta Lake and Cloud Dataflow, claim exactly-once or effectively-once semantics in certain scenarios. In 2017 Confluent introduced exactly-once semantics in Apache Kafka 0.11. Achieving exactly-once, or as many prefer to call it, effectively-once was a multi-year effort involving a detailed public specification, extensive real world testing, changes in the wire protocol and two new low level features to make it all work — idempotent writes and transactions. We’ll start by looking at these two features to see what they are and why they’re necessary for effectively-once support.
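As a preview of how these features surface in the producer API, here is a rough sketch, with placeholder names and simplified error handling, of enabling idempotent writes and wrapping sends in a transaction:

```java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");  // placeholder broker address
props.put("enable.idempotence", "true");            // idempotent writes: broker de-duplicates producer retries
props.put("transactional.id", "example-tx-id");     // a stable id enables the transactions API
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("topic-b", "key", "transformed-value"));
    // In a consume-transform-produce loop, the consumed offsets can be committed in the same
    // transaction via producer.sendOffsetsToTransaction(...), so input progress and output
    // records become visible atomically.
    producer.commitTransaction();
} catch (Exception e) {
    producer.abortTransaction(); // either all messages in the transaction become visible, or none do
}
```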
