How to get data from old offset point in Kafka?

Consumers always belong to a group, and with the older ZooKeeper-based consumer, ZooKeeper keeps track of that consumer group's progress in each partition. To fetch from the beginning, you can delete all the data associated with that progress, as Hussain mentioned: ZkUtils.maybeDeletePath("${zkhost:zkport}", "/consumers/${group.id}"); You can also specify the offset of the partition you want, as …
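To make the effect of deleting a group's progress concrete, here is a minimal Python model. It assumes a consumer that, when it finds no committed position, falls back to a reset policy (the role Kafka's auto.offset.reset setting plays: smallest/largest in the old consumer, earliest/latest in the new one). The dictionary layout and all function names are hypothetical; this is an illustration, not Kafka's implementation.

```python
from typing import Dict, Tuple

# Hypothetical model: committed offsets keyed by (group, topic, partition).
Committed = Dict[Tuple[str, str, int], int]


def start_offset(committed: Committed, group: str, topic: str, partition: int,
                 log_start: int, log_end: int, reset: str = "earliest") -> int:
    """Return the offset a consumer in `group` would start fetching from."""
    key = (group, topic, partition)
    if key in committed:
        # Resume from the group's recorded progress.
        return committed[key]
    # No recorded progress: apply the reset policy.
    if reset == "earliest":
        return log_start
    if reset == "latest":
        return log_end
    raise ValueError(f"unknown reset policy: {reset!r}")


def delete_group_progress(committed: Committed, group: str) -> None:
    """Drop every committed offset of `group` (analogous to deleting /consumers/<group>)."""
    for key in [k for k in committed if k[0] == group]:
        del committed[key]


def set_offset(committed: Committed, group: str, topic: str, partition: int,
               offset: int) -> None:
    """Record an explicit starting offset for one partition."""
    committed[(group, topic, partition)] = offset


demo: Committed = {("g1", "events", 0): 42}
print(start_offset(demo, "g1", "events", 0, log_start=0, log_end=100))  # prints 42
delete_group_progress(demo, "g1")
print(start_offset(demo, "g1", "events", 0, log_start=0, log_end=100))  # prints 0
```

Deleting the progress and pinning an explicit offset are two routes to the same goal: controlling where the next fetch begins.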

How many Kafka controllers are there in a cluster and what is the purpose of a controller?

The controller is one of the Kafka brokers that, in addition to the usual broker functionality, is responsible for electing partition leaders. Is the controller just one broker? Yes: there is only one controller at a time. Internally, each broker tries to create an ephemeral node in ZooKeeper at /controller. The …
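The election rule can be illustrated with a toy model: creating a node is atomic and fails if the node already exists, so exactly one broker wins, and an ephemeral node disappears when its owner's session ends, which lets another broker take over. This is a simplified sketch under those assumptions; FakeZooKeeper and elect are hypothetical names, and real brokers learn about the deletion through a ZooKeeper watch rather than by polling.

```python
from typing import Dict, List, Optional


class FakeZooKeeper:
    """Toy stand-in for ZooKeeper: ephemeral nodes vanish when their owner's session ends."""

    def __init__(self) -> None:
        self.nodes: Dict[str, int] = {}  # path -> owning broker id

    def create_ephemeral(self, path: str, owner: int) -> bool:
        # Creation fails if the node already exists, so only one creator can win.
        if path in self.nodes:
            return False
        self.nodes[path] = owner
        return True

    def expire_session(self, owner: int) -> None:
        # Remove every ephemeral node owned by the expired session.
        for path in [p for p, o in self.nodes.items() if o == owner]:
            del self.nodes[path]


def elect(zk: FakeZooKeeper, live_brokers: List[int]) -> Optional[int]:
    """Every live broker tries to create /controller; return whoever owns it afterwards."""
    for broker_id in live_brokers:
        if zk.create_ephemeral("/controller", broker_id):
            break
    return zk.nodes.get("/controller")
```

Calling elect(zk, [1, 2, 3]) on a fresh FakeZooKeeper makes broker 1 the controller; later attempts by brokers 2 and 3 fail until broker 1's session expires.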

Kafka how to read from __consumer_offsets topic

I came across this question when trying to consume from the __consumer_offsets topic myself. I managed to figure it out for different Kafka versions and thought I'd share what I found.

For Kafka 0.8.2.x (note: this uses a ZooKeeper connection):

    # Create consumer config
    echo "exclude.internal.topics=false" > /tmp/consumer.config
    # Consume all offsets
    ./kafka-console-consumer.sh --consumer.config /tmp/consumer.config \
      --formatter "kafka.server.OffsetManager\$OffsetsMessageFormatter" …
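The formatter decodes the records for you. If you read the raw records yourself, the following minimal Python sketch decodes the key of an offset-commit record. It assumes the key layout used for key versions 0 and 1: a big-endian int16 version, then the group id, the topic, and an int32 partition, where each string is an int16 length followed by UTF-8 bytes. Key version 2 carries group metadata instead and is rejected here. The function names are hypothetical, and the value layout (which changed across Kafka versions) is deliberately not decoded.

```python
import struct
from typing import NamedTuple, Tuple


class OffsetKey(NamedTuple):
    group: str
    topic: str
    partition: int


def _read_string(buf: bytes, pos: int) -> Tuple[str, int]:
    # Kafka STRING: int16 length, then that many UTF-8 bytes.
    (length,) = struct.unpack_from(">h", buf, pos)
    pos += 2
    if length < 0 or pos + length > len(buf):
        raise ValueError("null or truncated string in offset key")
    return buf[pos:pos + length].decode("utf-8"), pos + length


def decode_offset_key(buf: bytes) -> OffsetKey:
    """Decode an offset-commit key (versions 0 and 1) read from __consumer_offsets."""
    (version,) = struct.unpack_from(">h", buf, 0)
    if version not in (0, 1):
        raise ValueError(f"key version {version} is not an offset-commit key")
    group, pos = _read_string(buf, 2)
    topic, pos = _read_string(buf, pos)
    (partition,) = struct.unpack_from(">i", buf, pos)
    return OffsetKey(group, topic, partition)


def encode_offset_key(key: OffsetKey, version: int = 1) -> bytes:
    """Inverse of decode_offset_key; handy for building test input."""
    out = struct.pack(">h", version)
    for s in (key.group, key.topic):
        raw = s.encode("utf-8")
        out += struct.pack(">h", len(raw)) + raw
    return out + struct.pack(">i", key.partition)
```

Keeping an encoder next to the decoder makes it easy to check the assumed layout with a round trip before pointing the decoder at real data.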

how do you get default Kafka configs global and per topic from command line?

Since Kafka 2.5.0 (see https://issues.apache.org/jira/browse/KAFKA-9040), you can use the --all option to see all (default and overridden) topic configuration:

    % kafka-configs --bootstrap-server <KAFKA_SERVERS> --entity-type topics --entity-name <TOPIC_NAME> --describe --all
    All configs for topic <TOPIC_NAME> are:
      compression.type=producer sensitive=false synonyms={}
      message.format.version=1.0-IV0 sensitive=false synonyms={}
      file.delete.delay.ms=60000 sensitive=false synonyms={}
      leader.replication.throttled.replicas= sensitive=false synonyms={}
      max.message.bytes=1000012 sensitive=false synonyms={}
      min.compaction.lag.ms=0 sensitive=false synonyms={}
      message.timestamp.type=CreateTime …
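For quick scripting, the describe output can be turned into a dictionary. This is a minimal Python sketch, assuming each config is printed on its own line in the name=value sensitive=... synonyms={...} form shown above; parse_describe_all and ConfigEntry are hypothetical names. For programmatic access, the Java Admin client's describeConfigs call is the sturdier route, since it does not depend on the CLI's text format.

```python
import re
from typing import Dict, NamedTuple

# Matches one config line, e.g. "  compression.type=producer sensitive=false synonyms={}".
# The value is matched lazily so that empty values ("name= sensitive=...") also work.
_LINE = re.compile(
    r"^\s*(?P<name>[^=\s]+)=(?P<value>.*?)"
    r" sensitive=(?P<sensitive>true|false)"
    r" synonyms=\{(?P<synonyms>.*)\}\s*$"
)


class ConfigEntry(NamedTuple):
    value: str
    sensitive: bool
    synonyms: str  # raw text between the braces, left unparsed


def parse_describe_all(output: str) -> Dict[str, ConfigEntry]:
    """Map each config name to its value, sensitivity flag, and raw synonyms text."""
    configs: Dict[str, ConfigEntry] = {}
    for line in output.splitlines():
        m = _LINE.match(line)
        if m is None:
            continue  # e.g. the "All configs for topic ... are:" header
        configs[m["name"]] = ConfigEntry(m["value"], m["sensitive"] == "true", m["synonyms"])
    return configs
```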

How to expose a headless Kafka service for a StatefulSet externally in Kubernetes

We solved this in Kubernetes 1.7 by changing the headless service to type NodePort and setting externalTrafficPolicy to Local. This bypasses the Service's internal load balancing: traffic sent to a specific node on that node port only works if a Kafka pod is running on that node.

    apiVersion: v1
    kind: Service
    metadata:
      name: broker
    …
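A minimal Python sketch that generates Services of this kind, one per broker, assuming each broker pod can be selected by a unique label value. The label key, the kafka-N values, the ports, and the broker-N naming scheme are hypothetical placeholders (on Kubernetes versions that label StatefulSet pods with statefulset.kubernetes.io/pod-name, that label could play this role). The output is JSON, which kubectl apply -f accepts as well as YAML.

```python
import json
from typing import Any, Dict, List


def broker_service(index: int, pod_label: str, port: int = 9092,
                   node_port_base: int = 30092) -> Dict[str, Any]:
    """Build a NodePort Service that targets a single broker pod."""
    return {
        "apiVersion": "v1",
        "kind": "Service",
        "metadata": {"name": f"broker-{index}"},  # hypothetical naming scheme
        "spec": {
            "type": "NodePort",
            # Local: the node port only answers on nodes that run a matching pod,
            # and traffic is not load-balanced across nodes.
            "externalTrafficPolicy": "Local",
            # Hypothetical label; it must match exactly one broker pod.
            "selector": {pod_label: f"kafka-{index}"},
            "ports": [{
                "port": port,
                "targetPort": port,
                # Must fall in the cluster's NodePort range (30000-32767 by default).
                "nodePort": node_port_base + index,
            }],
        },
    }


def broker_services(count: int, pod_label: str) -> List[Dict[str, Any]]:
    return [broker_service(i, pod_label) for i in range(count)]


if __name__ == "__main__":
    # Wrap the Services in a v1 List so the output can be piped to `kubectl apply -f -`.
    print(json.dumps({"apiVersion": "v1", "kind": "List",
                      "items": broker_services(3, "example.com/broker")}, indent=2))
```

One Service per broker gives each broker its own externally reachable port; clients still need each broker to advertise a host and port they can reach.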

Kafka consumer fetching metadata for topics failed

The broker tells the client which hostname to use for producing and consuming messages. By default, Kafka uses the hostname of the machine it runs on. If the client cannot resolve this hostname, you get this exception. You can try setting advertised.host.name in the Kafka configuration to a hostname/address which the clients …
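Because the failure comes from the client being unable to resolve the hostname the broker advertises, a quick check can be run on the client machine. This is a minimal Python sketch, assuming the advertised addresses are written in the NAME://host:port form used by advertised.listeners (the setting that superseded advertised.host.name in later Kafka releases); the function names are hypothetical. It only checks name resolution, not whether the port is reachable.

```python
import socket
from typing import List, Tuple


def _host_port(listener: str) -> Tuple[str, int]:
    """Split "NAME://host:port" into (host, port); brackets around IPv6 hosts are dropped."""
    _, sep, hostport = listener.strip().partition("://")
    if not sep:
        raise ValueError(f"expected NAME://host:port, got {listener!r}")
    host, _, port = hostport.rpartition(":")
    return host.strip("[]"), int(port)


def unresolvable_hosts(advertised: str) -> List[str]:
    """Return the advertised hosts that this machine cannot resolve."""
    bad: List[str] = []
    for listener in advertised.split(","):
        host, port = _host_port(listener)
        if not host:
            continue  # empty host: nothing to resolve
        try:
            socket.getaddrinfo(host, port)
        except OSError:  # socket.gaierror is a subclass of OSError
            bad.append(host)
    return bad
```

Run it on the client with the value the broker advertises; any host it returns is one the client will fail to connect to in exactly the way described above.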

Kafka – difference between Log end offset(LEO) vs High Watermark(HW)

The high watermark marks how far messages are fully replicated (to all in-sync replicas), while the log end offset may be larger if newly appended records on the leader partition have not been replicated yet. Consumers can only consume messages below the high watermark. See this blog post for more details: http://www.confluent.io/blog/hands-free-kafka-replication-a-lesson-in-operational-simplicity/
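A simplified Python model of the relationship, assuming the classic rule that the leader sets the high watermark to the smallest log end offset among the in-sync replicas (leader included). Real brokers learn follower progress from fetch requests and handle further details such as leader epochs, which are ignored here; the function names are hypothetical.

```python
from typing import Dict, List


def high_watermark(isr_leos: Dict[int, int]) -> int:
    """HW = smallest log end offset among the in-sync replicas (broker id -> LEO)."""
    return min(isr_leos.values())


def readable_offsets(log_start: int, isr_leos: Dict[int, int]) -> List[int]:
    """Offsets a consumer may read: from the log start up to, but excluding, the HW."""
    return list(range(log_start, high_watermark(isr_leos)))


# Leader (broker 1) has appended offsets 0-9, so its LEO is 10;
# followers 2 and 3 have replicated up to LEO 8 and 9.
isr = {1: 10, 2: 8, 3: 9}
print(high_watermark(isr))       # prints 8
print(readable_offsets(0, isr))  # prints [0, 1, 2, 3, 4, 5, 6, 7]
```

Offsets 8 and 9 exist on the leader (below its LEO) but stay invisible to consumers until every in-sync follower has them.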