Kafka consumer for multiple topic

We can subscribe for multiple topic using following API :

consumer.subscribe(Arrays.asList(topic1,topic2), ConsumerRebalanceListener obj)

Consumer has the topic info and we can commit using consumer.commitAsync or consumer.commitSync() by creating OffsetAndMetadata object as follows.

ConsumerRecords<String, String> records = consumer.poll(long value);

for (TopicPartition partition : records.partitions()) {
    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
    
    for (ConsumerRecord<String, String> record : partitionRecords) {
        System.out.println(record.offset() + ": " + record.value());
    }
    
    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();

    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}

Leave a Comment