How to create topics in Apache Kafka?

When you start your Kafka broker you can define a set of properties in the conf/server.properties file. This file is a plain key-value properties file. One of those properties is auto.create.topics.enable; if it is set to true (the default), Kafka will create topics automatically when you send messages to non-existing topics. All config options you can find … Read more
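As a sketch of the two approaches (the topic name and counts below are illustrative, not from the answer): you can either rely on auto-creation via the broker property, or create the topic explicitly with the bundled CLI. Note that the --bootstrap-server flag assumes a reasonably recent Kafka release; older versions used --zookeeper instead.

```shell
# In conf/server.properties (this is the default, shown for clarity):
# auto.create.topics.enable=true

# Or create the topic explicitly, controlling partitions and replication:
bin/kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --topic my-topic \
  --partitions 3 \
  --replication-factor 1
```

Explicit creation is generally preferred in production, since auto-created topics get the broker-wide default partition and replication settings.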

Can I have 100s of thousands of topics in a Kafka Cluster?

Update March 2021: With Kafka’s new KRaft mode (short for “Kafka Raft Metadata mode”; in Early Access as of Kafka v2.8), which entirely removes ZooKeeper from Kafka’s architecture, a Kafka cluster can handle millions of topics/partitions. See https://www.confluent.io/blog/kafka-without-zookeeper-a-sneak-peek/ for details. Update September 2018: As of Kafka v2.0, a Kafka cluster can have hundreds of thousands … Read more

How do I delete/clean Kafka queued messages without deleting Topic

In 0.11 or higher you can run the bin/kafka-delete-records.sh command to mark messages for deletion. https://github.com/apache/kafka/blob/trunk/bin/kafka-delete-records.sh For example, publish 100 messages seq 100 | ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytest then delete 90 of those 100 messages with the new kafka-delete-records.sh command line tool ./bin/kafka-delete-records.sh --bootstrap-server localhost:9092 --offset-json-file ./offsetfile.json where offsetfile.json contains {"partitions": [{"topic": "mytest", "partition": … Read more
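A sketch of what a complete offsets file and invocation might look like, assuming a single-partition topic named mytest (the offset 90 means records below offset 90 are marked for deletion, leaving the last 10 of the 100 published messages):

```shell
# Write the offsets file; format is the documented kafka-delete-records.sh JSON
cat > offsetfile.json <<'EOF'
{
  "partitions": [
    {"topic": "mytest", "partition": 0, "offset": 90}
  ],
  "version": 1
}
EOF

# Mark everything before offset 90 in partition 0 for deletion
./bin/kafka-delete-records.sh --bootstrap-server localhost:9092 \
  --offset-json-file ./offsetfile.json
```

Deletion is per partition, so a multi-partition topic needs one entry per partition in the "partitions" array.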

How to send final kafka-streams aggregation result of a time windowed KTable?

Update 2: With KIP-825 (Apache Kafka 3.3), you can specify an “emit strategy” via windowedBy(…). The default is EMIT_EAGER, but you can also specify EMIT_FINAL to get only a single result per key and window when a window closes (i.e., at point window-end + grace-period). Update 1: With KIP-328 (Apache Kafka 2.1), a KTable#suppress() operator is … Read more

In Kafka is each message replicated across all partitions of a topic?

Partitioning and replication are two different things. Partitioning is for scalability. A topic is partitioned into one or more partitions distributed across different brokers, so that more consumers can connect to those brokers in order to receive messages sent to the same topic but from different partitions. Increasing partitions increases scalability and the possibility to … Read more
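The distinction can be made concrete with the topic-creation CLI, where the two concerns are separate flags (topic name and counts below are illustrative):

```shell
# 6 partitions spread the topic across brokers for parallelism;
# replication-factor 3 copies EACH partition to 3 brokers for fault tolerance.
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 \
  --topic orders --partitions 6 --replication-factor 3

# Inspect which broker leads each partition and where its replicas live
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic orders
```

So a message is written to exactly one partition (chosen by key or round-robin), and only that partition's replicas hold copies of it; messages are never replicated across all partitions of a topic.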

getting “org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 1195725856 larger than 104857600)”

1195725856 is GET[space] encoded as a big-endian, four-byte integer (see here for more information on how that works). This indicates that HTTP traffic is being sent to Kafka port 9092, but Kafka doesn’t accept HTTP traffic, it only accepts its own protocol (which takes the first four bytes as the receive size, hence the error). … Read more
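The decoding described above can be verified directly in a shell, without Kafka at all:

```shell
# "GET " is the four ASCII bytes 0x47 0x45 0x54 0x20
printf 'GET ' | od -An -tx1

# Read as a single big-endian four-byte integer, those bytes are:
printf '%d\n' 0x47455420   # prints 1195725856
```

That is why the number in the error looks so oddly specific: Kafka read the first four bytes of an HTTP request ("GET " of "GET / HTTP/1.1") and interpreted them as the length prefix of a Kafka protocol message, which vastly exceeds the 104857600-byte (100 MB) default limit.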

Consumer not receiving messages, kafka console, new consumer api, Kafka 0.9

On my Mac I was facing the same issue of the console consumer not consuming any messages when using the command kafka-console-consumer --bootstrap-server localhost:9095 --from-beginning --topic my-replicated-topic But when I tried with kafka-console-consumer --bootstrap-server localhost:9095 --from-beginning --topic my-replicated-topic --partition 0 it happily lists the messages sent. Is this a bug in Kafka 1.10.11?

Increase the number of messages read by a Kafka consumer in a single poll

You can increase the consumer poll() batch size by increasing max.partition.fetch.bytes, but per the documentation it is still limited by fetch.max.bytes, which also needs to be increased to accommodate the required batch size. The documentation also mentions another property, message.max.bytes, in the topic config and broker config that restricts the batch size. So one way … Read more
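A sketch of how those settings might be passed to the console consumer (the values are illustrative, and max.poll.records is the additional consumer setting that directly caps the record count, not just the bytes, returned per poll):

```shell
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytest \
  --consumer-property max.poll.records=2000 \
  --consumer-property max.partition.fetch.bytes=10485760 \
  --consumer-property fetch.max.bytes=52428800
```

The same keys go in a consumer properties file or the client config map when using the Java consumer; the byte limits only help if the broker/topic side (message.max.bytes) allows batches that large in the first place.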