Kafka Basics: Consumers and Consumer Groups
In this lesson, you will learn how to read data from Kafka.
Key points:
- Consumer has a position in each topic partition, which is an offset
poll()returns records starting at that position, and updates the consumer’s position- You can seek the consumer’s position to any offset
- Consumer can commit its current positions (as records sent to the
__consumer_offsetstopic), and resume reading from those offsets later - You can either manually assign topic partitions to a consumer, or dynamically subscribe a consumer to topics
- Partitions of subscribed topics are balanced across consumers in the same group
Like Lesson 2, this lesson’s code examples will use the impure Kafka Java API to demonstrate Kafka concepts. Again, please just focus on working through the code examples and learning the concepts, with the understanding that we’ll see real, production-ready code in future lessons.
If you need help with anything in this lesson, please ask in #sig-kafka.
Review

- A topic partition is an ordered log of records
- Each record in a topic partition has an offset
- A topic consists of one or more partitions
Reading Records from Topic Partitions
Let’s jump right into code! Make sure you have everything running in Minikube, and records in lesson2-topic1.
import org.apache.kafka.clients.consumer._
import org.apache.kafka.common._
import org.apache.kafka.common.serialization._
import scala.collection.JavaConverters._
import scala.concurrent.duration._
val consumerConfig = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "kafka-0.broker.kafka.svc.cluster.local:9092",
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer].getName,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer].getName
)
val consumer = new KafkaConsumer[String, String](consumerConfig.asJava)
def consume(topicPartitions: TopicPartition*): Unit = {
println()
println(s"beginning offsets: ${consumer.beginningOffsets(topicPartitions.asJava)}")
println(s"end offsets: ${consumer.endOffsets(topicPartitions.asJava)}")
consumer.assign(topicPartitions.asJava)
consumer.seekToBeginning(topicPartitions.asJava)
@scala.annotation.tailrec
def pollConsumer(): Unit = {
for (tp <- topicPartitions) println(s"position in $tp=${consumer.position(tp)}")
val records = consumer.poll(1.second.toMillis).asScala
println(s"poll() returned ${records.size} records")
records.foreach(println)
if (records.nonEmpty) pollConsumer()
}
pollConsumer()
}
consume(new TopicPartition("lesson2-topic1", 0))
consume(new TopicPartition("lesson2-topic1", 1))
consumer.close()
As with AdminClient and KafkaProducer in the last lesson, first we define configs and then create a KafkaConsumer.
Let’s skip over the consume function for a moment. Next we consume topic partition 0, and then consume topic partition 1. Finally we close the consumer.
The consume function is where all of the interesting parts are. Keep in mind that this is rather low-level, verbose usage of the consumer client API. We’ll see simpler ways to read data from Kafka later on in this lesson. We’re just learning the basics here.

We start off by printing the beginning and end offsets of all topic partitions we’re going to read. The beginning offset of a topic partition is the offset of its oldest (or earliest) record. (Why wouldn’t this always be zero? We’ll answer that in a future lesson.) The end offset of a topic partition is one more than the offset of its newest (or latest) record; it’s the offset that will be assigned to the next record written to the topic partition. Is it absolutely necessary to get these offsets (and print them) in order to read records? No. Just helping you understand more about offsets, explore the consumer API, and it’s useful to see these offsets when you run your program.
Next, we assign the specified topic partitions to the consumer. This tells the consumer which topic partitions to use in subsequent operations.
The consumer has a position for each assigned topic partition. The position is the offset of the next record that the consumer will read. We can set the consumer’s position for one of its topic partitions at any time; the API calls this seeking. Here, we seek the consumer’s position to the topic partition’s beginning offset. There are also methods to seek to the end offset and an arbitrary offset. Since our consumer’s position is the beginning offset, it will start reading at the oldest record in the topic partition, and move forward to higher offsets, in order.

Next we print the consumer’s current positions, and then call poll(), which reads and returns a list of records starting at those positions. The argument to poll() is a timeout; if we already read all records in the topic partition up to the end offset, the consumer will wait this long for new records to be written to the topic partition. We print the received records, and continue polling for more records until we’ve read them all. Each time we call poll() the consumer automatically advances its position in the topic partition to one more than the last received record’s offset.
Run your program now and examine the output. Note the beginning and end offsets, and the consumer’s position as it reads records from the topic.
Instead of seekToBeginning, call seekToEnd. What happens now?
Instead of seekToEnd, try something like for (tp <- topicPartitions) consumer.seek(tp, 2). What happens now? What if you try 10 instead of 2?
Next, replace the two separate calls to consume with this:
consume(new TopicPartition("lesson2-topic1", 0), new TopicPartition("lesson2-topic1", 1))
The consumer can read records from multiple topic partitions at the same time! Run your program and examine the output. How is the behavior different than before? Can the consumer read partitions of different topics at the same time? Try it out!
This program will stop running as soon as poll() returns zero records, which happens once we’ve read all of the records in the topic (i.e. the consumer’s position has reached the topic partition’s end offset). This may be what we want in some cases, but in other cases we want our program to keep trying to read new records “forever”. Producers may continue writing new records forever, and we may want our consumer to continue receiving those new records and processing them in some way. How could we accomplish this? Basically, just keep calling poll() until some other stopping criteria is met (e.g. process receives a SIGTERM).
To recap, you now know how Kafka consumers read records from topic partitions, starting from certain offsets.
Committing Offsets
Each time we run the program in the previous section, it starts reading records from the topic partition’s beginning offset (or the offsets we explicitly seek to). This is perfect if we want our program to read and process all records in the topic partition every time we run it.
In some cases though, we might not want to start from the beginning of the topic partition every time. Instead, we might want to start reading from the last offsets we processed in the previous run of the program, so that our program only processes each record in the topic partition once, across multiple runs of our program. Programs stop running for many reasons, e.g. a crash, a purposeful stop to deploy a new version, etc.
How would we accomplish this? We’ve already seen that we can seek a consumer to any offset in the topic partition. All we need to do now is store the consumer’s positions somewhere, so that the next run of the program can read them. We can store those offsets anywhere we want, but it turns out Kafka can help us store them.
Check out this code:
val consumerConfig = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "kafka-0.broker.kafka.svc.cluster.local:9092",
ConsumerConfig.GROUP_ID_CONFIG -> "Lesson3b",
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer].getName,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer].getName
)
val consumer = new KafkaConsumer[String, String](consumerConfig.asJava)
def consume(topicPartitions: TopicPartition*): Unit = {
println()
println(s"beginning offsets: ${consumer.beginningOffsets(topicPartitions.asJava)}")
println(s"end offsets: ${consumer.endOffsets(topicPartitions.asJava)}")
consumer.assign(topicPartitions.asJava)
consumer.seekToBeginning(topicPartitions.asJava)
@scala.annotation.tailrec
def pollConsumer(): Unit = {
for (tp <- topicPartitions) {
println(s"committed in $tp=${consumer.committed(tp)}")
println(s"position in $tp=${consumer.position(tp)}")
}
val records = consumer.poll(1.second.toMillis).asScala
println(s"poll() returned ${records.size} records")
records.foreach(println)
consumer.commitSync()
if (records.nonEmpty) pollConsumer()
}
pollConsumer()
}
consume(new TopicPartition("lesson2-topic1", 0))
consume(new TopicPartition("lesson2-topic1", 1))
consumer.close()
There are three new lines:
ConsumerConfig.GROUP_ID_CONFIG -> "Lesson3b",println(s"committed in $tp=${consumer.committed(tp)}")consumer.commitSync()
The call to commitSync() will actually store the consumer’s current positions in all assigned topic partitions as records in a special Kafka topic named __consumer_offsets. We can call committed(partition) to retrieve the last offsets that our consumer stored. Since there can be many Kafka consumers, the GROUP_ID_CONFIG specifies a unique name for our consumer used in storing and looking up offsets.
Run this program now and examine the output. You can see that we’re able to get the last committed offsets, but we’re not actually using them yet.
Replace the consumer.seekToBeginning(topicPartitions.asJava) line with the following:
for (tp <- topicPartitions) {
Option(consumer.committed(tp)) match {
case Some(om) =>
println(s"Seeking $tp to $om")
consumer.seek(tp, om.offset)
case _ =>
println(s"No offsets committed for $tp, so seeking to beginning")
consumer.seekToBeginning(List(tp).asJava)
}
}
Now we seek to the last committed offsets, if any. Run the program again with this new code. The previous run of the program read up to the end offsets and committed those; this run seeked to those end offsets, and then ended because there were no new records to consume. How can we “start over” without any committed offsets to restore from? Change the value used in GROUP_ID_CONFIG and run again. Now you should see the consumer seek to beginning, consume all records, and end. If you run again, you’ll see it seek to the end offsets and finish. It’s also fun to write more records to this topic (run a program from Lesson 2 again), run this consumer program again, and watch it read only the new records.
Our program now initially sets the consumer’s positions in its assigned topic partitions to the last committed offsets if they exist, and to something else (e.g. beginning) if this consumer has not committed offsets before. This is such commonly desired functionality that KafkaConsumer will actually do it for us - we don’t even need any of this code!
Delete the for-comprehension that calls seek and seekToBeginning and run your program again. It will initially seek to last committed offsets. Change GROUP_ID_CONFIG and run again. You should notice the consumer initializing to the end offsets when none have been committed before. There’s another config, AUTO_OFFSET_RESET_CONFIG that controls what to do when there are no committed offsets. Setting it to "earliest" will seek to beginning, and setting it to "latest" will seek to end, if there are no committed offsets; otherwise it will seek to last committed offsets. If unspecified, it will default to "latest". Add this config to your program and experiment with its different values to see how it works.
Another thing KafkaConsumer can do for you is periodically commit its current positions, so you don’t have to manually call commitSync(). If you set ENABLE_AUTO_COMMIT_CONFIG -> "true" then you don’t need to explicitly commit offsets. This is actually the default value; we’ve been setting it to "false" this whole time to teach you about committing offsets. You can control how often offsets are committed using AUTO_COMMIT_INTERVAL_MS_CONFIG which defaults to 5 seconds.
When we write programs that read records from Kafka, we first need to consider if we always want our program to start at the beginning or end of topic partitions, or if we want it to resume from where it left off last time. If we want it to resume, we then need to decide if we’ll use Kafka to remember the offsets, or if we’ll store them ourselves. And if we use Kafka to commit offsets, will we let the consumer periodically auto-commit them, or manually commit them ourselves? Usually we store committed offsets in Kafka. There is an important (advanced) case where we need to store offsets outside Kafka but that’s beyond the scope of this lesson. We’re actually going to punt on the auto vs manual offset commit question for now, and revisit it later when we talk about exactly-once processing and transactions.
To recap, you now know all about committing offsets so consumers can resume reading records where they left off. Even though KafkaConsumer can handle all of this for you, it’s extremely important to understand how it all works, and to know about the ways you can take over more manual control.
Subscribing to Topics
So far, we’ve been calling assign() to tell the consumer which topic partitions to read from. If a topic has multiple partitions, could we create multiple threads, each with its own KafkaConsumer, and assign different topic partitions to each thread’s consumer, so we can read from the different partitions in parallel? Yes! This is a key reason to create topics with multiple partitions: so the partitions can be read from and processed in parallel. You should spend a few minutes and see if you can add multiple threads and consumers to our previous code.
If multiple threads can consume different topic partitions concurrently, then surely we could run multiple instances of our program (in separate processes, potentially on different computers), with each instance assigned different topic partitions, right? Yes! This is actually more common than multiple threads of the same process consuming different partitions concurrently. If you have a topic with a very high rate of new records, or if each record takes a long time for a consumer to process, this is how you “scale out” to process topic partitions in parallel on multiple machines. Again, it would be super fun if you spent some time and implemented this idea in code.
Knowing that we can scale out and process topic partitions in parallel is great news. But when implementing this idea, the different consumers need to coordinate somehow. Each consumer needs to be assigned different partitions of the topic. That sounds like some static assignment from environment variables, or some complex distributed coordination. What if we wanted to start with a single thread/process reading all partitions of the topic, and scale out to more threads/processes only when needed?


This is such a common problem that Kafka actually solves it for us! Instead of calling consumer.assign(partitions) we can instead call consumer.subscribe(topics). When we run the first instance of our program, KafkaConsumer will discover all partitions of those topics, and assign them all to itself. If we run a 2nd instance of our program, the two consumers will coordinate behind-the-scenes, and split up the topics equally. We can keep running more instances of our program, up to the number of topic partitions, where each consumer will be reading from a single partition. And, if we start terminating program instances, partitions consumed by terminated instances will get assigned to still-running instances, all the way back down to a single instance. Thanks to offset commits, whenever a partition is assigned to a new consumer, the consumer can resume reading records where the previous consumer left off.

Note that all of the above applies to consumers that use the same value for GROUP_ID_CONFIG. We say that those consumers are in the same consumer group. Consumers within the same consumer group split up a topic’s partitions, so each partition is consumed by a single consumer. On the other hand, consumers in different consumer groups don’t coordinate, so they consume topic partitions independently. This behavior allows Kafka topics to work somewhat like both traditional messaging queues and pub-sub topics: each record in a topic is only read by a single consumer in a group, which is similar to multiple workers consuming the same message queue. Each record in a topic is read by every consumer group though, which is similar to a pub-sub topic.
The code for using subscribe is similar to assign:
val consumerConfig = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "kafka-0.broker.kafka.svc.cluster.local:9092",
ConsumerConfig.GROUP_ID_CONFIG -> "Lesson3c",
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer].getName,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer].getName
)
val consumer = new KafkaConsumer[String, String](consumerConfig.asJava)
def consume(topics: String*): Unit = {
val topicPartitions = topics.flatMap(t => consumer.partitionsFor(t).asScala).map(p => new TopicPartition(p.topic, p.partition))
println()
println(s"beginning offsets: ${consumer.beginningOffsets(topicPartitions.asJava)}")
println(s"end offsets: ${consumer.endOffsets(topicPartitions.asJava)}")
consumer.subscribe(topics.asJava)
@scala.annotation.tailrec
def pollConsumer(): Unit = {
for (tp <- topicPartitions) {
println(s"committed in $tp=${consumer.committed(tp)}")
Try(println(s"position in $tp=${consumer.position(tp)}"))
}
val records = consumer.poll(1.second.toMillis).asScala
println(s"poll() returned ${records.size} records")
records.foreach(println)
consumer.commitSync()
if (records.nonEmpty) pollConsumer()
}
pollConsumer()
}
consume("lesson2-topic1")
consumer.close()
We do a little extra work now to get the partitions for the consumed topics, but only because we’re printing info like beginning, end, committed, and position offsets. But we still call poll() and commitSync() just like before. KafkaConsumer gives you a lot of control over how exactly you use it to read records, but in a surprisingly consistent, small API.
To recap, you now know the difference between assign and subscribe, and how consumers in the same group (dynamically, automatically) split up topic partitions to read from them concurrently.
Consuming Multiple Topics
We know that a topic’s partitions are balanced across consumers in the same group. What if multiple consumers in the same group subscribe to multiple topics? Let’s write some code and find out.
val bootstrapServers = "kafka-0.broker.kafka.svc.cluster.local:9092"
val topic1 = "lesson3-topic1a"
val topic2 = "lesson3-topic1b"
val adminClientConfig = Map[String, Object](
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers
)
val adminClient = AdminClient.create(adminClientConfig.asJava)
def createTopic(name: String, partitions: Int): Unit =
if (!adminClient.listTopics().names().get().asScala.contains(name))
adminClient.createTopics(List(new NewTopic(name, partitions, 1)).asJava)
createTopic(topic1, 2)
createTopic(topic2, 2)
adminClient.close()
val consumerConfig = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers,
ConsumerConfig.GROUP_ID_CONFIG -> "Lesson3e",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer].getName,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer].getName
)
val consumer = new KafkaConsumer[String, String](consumerConfig.asJava)
consumer.subscribe(List(topic1, topic2).asJava, new ConsumerRebalanceListener() {
def onPartitionsAssigned(partitions: java.util.Collection[TopicPartition]): Unit =
println(s"Partitions assigned: ${partitions.asScala.toList.map(_.toString).sorted}")
def onPartitionsRevoked(partitions: java.util.Collection[TopicPartition]): Unit =
println(s"Partitions revoked: ${partitions.asScala.toList.map(_.toString).sorted}")
})
while (true) {
consumer.poll(1.second.toMillis)
}
First, we create two topics, each with two partitions. Then we subscribe to both topics, and poll forever (so the program keeps running).
When calling subscribe we also provide a ConsumerRebalanceListener that prints the assigned or revoked partitions. This lets us see exactly which partitions are assigned to the consumer, and shows us when those assignments change.
Run this program. When it starts, notice that all partitions of both topics are assigned to this consumer. This makes sense: there’s only one consumer in the group, so it should read from all topic partitions.
Now, in another terminal, run a second instance of the program. Notice that the second consumer is only assigned one partition from each topic. Look back in the first terminal, the first consumer shows that all of its partitions were revoked, and then it was re-assigned the other partition of each topic that the second consumer is not assigned. So the partitions of both topics are balanced across both consumers in the group, with each consumer assigned some partitions from each topic. Also note that partitions were rebalanced automatically when the second consumer started - this allows easy scale out to achieve more parallelism in our programs.

Stop one of the programs, and watch the other terminal. After a few seconds you should see another rebalance, with all partitions assigned to the remaining consumer. Again, this happened automatically, and allows easy “scale in”. You can stop the remaining program now.
Now create a third topic (lesson3-topic1c) with two partitions, add topic3 to the list in subscribe, and run the program twice. As you probably expected, one partition of each of the three topics is assigned to each consumer.
Now create new topics, each with three partitions (remember to change the topic names, e.g. lesson3-topic2a, lesson3-topic2b, lesson3-topic2c) and run the program twice. Three partitions can’t be balanced evenly across two consumers, but Kafka does its best: one consumer gets two partitions of each topic, while the other consumer gets one partition of each topic. What would happen if you ran a third instance of the program? Try it!
Another thing to notice is that in every case above, the same partition number of each topic was always assigned to the same consumer. If consumer 1 was assigned lesson3-topic1a-0 then it was also assigned lesson3-topic1b-0. Even with three partitions and two consumers, the partition 0s were assigned to the same consumer, the partition 1s were assigned to the same consumer, and the partition 2s were assigned to the same consumer. This may just seem like a coincidence, but it is actually intentional and deterministic. Can you guess why? Think back to Lesson 2 where you learned that the producer will choose the partition to write a record to based on the record’s key. Records with the same key are stored in the same partition, and this applies even across different topics. As long as two topics contain the same number of partitions, records stored in both topics with the same keys will end up in the same partition numbers. This turns out to be an extremely important property when doing joins across multiple topics in stream processing, as we’ll see in a future lesson. Choosing the number of topic partitions and the fields to use as record keys is very important in systems based on Kafka.
Consumer Considerations
All we wanted to do was read records from topics, but this seems so complicated! There are indeed important things to consider when consuming records from topics. Fortunately, KafkaConsumer is able to handle a lot of common cases for you. As you begin to incorporate Kafka into your systems, here is a review of things to consider when you use consumers.
Same Group or Different Groups?
TODO
Subscribe or Assign?
TODO
Commit Offsets and Resume, or Always Start Over?
TODO
Auto or Manual Commits?
TODO
Without Committed Offsets, Start at Earliest or Latest?
TODO
How Many Topic Partitions? How Many Consumer Instances?
TODO
Summary
In this lesson, you wrote code that used KafkaConsumer to read records from Kafka topics. You learned how the consumer uses its position to read records, how to commit offsets, and how to use Kafka’s dynamic subscription and group membership functionality to do things for you.
Key points:
- Consumer has a position in each topic partition, which is an offset
poll()returns records starting at that position, and updates the consumer’s position- You can seek the consumer’s position to any offset
- Consumer can commit its current positions (as records sent to the
__consumer_offsetstopic), and resume reading from those offsets later - You can either manually assign topic partitions to a consumer, or dynamically subscribe a consumer to topics
- Partitions of subscribed topics are balanced across consumers in the same group
In the next lesson, we’ll learn how to configure Kafka topics to have different behavior.