Python - Read Messages in Kafka Topic

Kontext, 8/22/2022

Code description

This code snippet provides an example of reading messages from a Kafka topic. In common practice, one Kafka consumer reads from one partition of a topic; if the topic has multiple partitions, the consumers can run on different servers or in different containers. The consumers that read the different partitions of the same topic are configured in one consumer group.
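
To make the consumer group idea concrete, here is a simplified sketch of how the partitions of a topic could be divided among the consumers of one group. It is plain Python for illustration only: the function `assign_round_robin` and the consumer names are hypothetical, and in a real deployment the assignment is negotiated by the Kafka client and the group coordinator, not by user code.

```python
from collections import defaultdict


def assign_round_robin(partitions, consumers):
    """Distribute partition ids across consumers in round-robin order.

    Simplified illustration only; this is not the assignor that
    kafka-python or the Kafka broker actually runs.
    """
    if not consumers:
        raise ValueError("at least one consumer is required")
    members = sorted(consumers)
    assignment = defaultdict(list)
    for i, partition in enumerate(sorted(partitions)):
        # Hand out partitions one at a time, cycling through the members.
        assignment[members[i % len(members)]].append(partition)
    return dict(assignment)


# Three partitions, three consumers: one partition each.
print(assign_round_robin({0, 1, 2}, ["consumer-a", "consumer-b", "consumer-c"]))
# Three partitions, one consumer: it reads all of them.
print(assign_round_robin({0, 1, 2}, ["consumer-a"]))
```

With a single consumer, every partition ends up on that consumer, which is the situation the example on this page uses.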

In this example, all partitions are assigned to the same consumer. The code snippet uses the Python package kafka-python. It can be installed with the following command if you have pip installed:

    pip install kafka-python

Alternatively, run the pip module that ships with Python 3:

    python -m pip install kafka-python

References

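[Apache Kafka Python Producer and Consumer Clients Introduction](https://kontext.tech/article/473/apache-kafka-python-clients-introduction)

[Install and Run Kafka 3.2.0 On WSL](https://kontext.tech/article/1047/install-and-run-kafka-320-on-wsl)

[Install and Run Kafka 2.6.0 On Windows 10](https://kontext.tech/article/470/install-and-run-kafka-260-on-windows-10)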

Code snippet

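    from kafka import KafkaConsumer
    from kafka.structs import TopicPartition
    
    topic = 'kontext-kafka-5'
    bootstrap_servers = 'localhost:9092'
    
    # Create the consumer without subscribing to the topic: partitions are
    # assigned manually below, and seek() only works on assigned partitions.
    consumer = KafkaConsumer(
        bootstrap_servers=bootstrap_servers, auto_offset_reset='earliest')
    
    # Assign every partition of the topic to this single consumer.
    # partitions_for_topic() returns None when the topic is unknown.
    partitions = consumer.partitions_for_topic(topic) or set()
    topic_partitions = [TopicPartition(topic, p) for p in partitions]
    consumer.assign(topic_partitions)
    
    # Seek to offset 0 in each assigned partition.
    for topic_partition in topic_partitions:
        consumer.seek(partition=topic_partition, offset=0)
    
    # Read from all assigned partitions; this loop blocks while waiting
    # for new messages.
    for msg in consumer:
        print(msg.value.decode("utf-8"))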
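
Because iterating over the consumer blocks indefinitely, it can help to stop after the topic has been idle for a while and to print which partition each message came from. The following is a minimal sketch under those assumptions: `format_record`, `drain`, and `main` are hypothetical helper names, the 5000 ms timeout is an arbitrary choice, and `consumer_timeout_ms` is the kafka-python option that ends iteration when no message arrives within that time.

```python
def format_record(msg):
    """Return 'partition:offset value' for one consumed record."""
    return f"{msg.partition}:{msg.offset} {msg.value.decode('utf-8')}"


def drain(consumer):
    """Collect formatted records until the consumer stops iterating."""
    return [format_record(msg) for msg in consumer]


def main():
    # Imported here so the helpers above can be used without kafka-python
    # installed or a broker running.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        'kontext-kafka-5',
        bootstrap_servers='localhost:9092',
        auto_offset_reset='earliest',
        # Assumption: stop iterating after 5 seconds without new messages.
        consumer_timeout_ms=5000,
    )
    try:
        for line in drain(consumer):
            print(line)
    finally:
        consumer.close()
```

Calling `main()` requires a Kafka broker reachable at `localhost:9092` and an existing `kontext-kafka-5` topic, as in the snippet above.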