diff options
Diffstat (limited to 'kafka-sandbox/src/main/java/examples/ConsumerExample.java')
-rw-r--r-- | kafka-sandbox/src/main/java/examples/ConsumerExample.java | 71 |
1 files changed, 59 insertions, 12 deletions
diff --git a/kafka-sandbox/src/main/java/examples/ConsumerExample.java b/kafka-sandbox/src/main/java/examples/ConsumerExample.java index e491035..6672166 100644 --- a/kafka-sandbox/src/main/java/examples/ConsumerExample.java +++ b/kafka-sandbox/src/main/java/examples/ConsumerExample.java @@ -1,38 +1,85 @@ package examples; -import org.apache.kafka.clients.consumer.*; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; import java.time.Duration; -import java.util.Arrays; -import java.util.Properties; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; public class ConsumerExample { + private static final Map<String, String> mappings = new TreeMap<>(); + public static void main(final String[] args) throws Exception { if (args.length != 1) { System.out.println("Please provide the configuration file path as a command line argument"); System.exit(1); } - final String topic = "purchases"; + var topic = "purchases"; // Load consumer configuration settings from a local file // Reusing the loadConfig method from the ProducerExample class - final Properties props = ProducerExample.loadConfig(args[0]); + var props = ProducerExample.loadConfig(args[0]); // Add additional properties. props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-java-getting-started"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - try (final Consumer<String, String> consumer = new KafkaConsumer<>(props)) { - consumer.subscribe(Arrays.asList(topic)); + var first = true; + + try (var consumer = new KafkaConsumer<String, String>(props)) { + consumer.subscribe(List.of(topic), new ConsumerRebalanceListener() { + @Override + public void onPartitionsRevoked(Collection<TopicPartition> partitions) { + System.out.println("ConsumerExample.onPartitionsRevoked"); + partitions.forEach(p -> System.out.println("p = " + p)); + } + + @Override + public void onPartitionsAssigned(Collection<TopicPartition> partitions) { + System.out.println("ConsumerExample.onPartitionsAssigned"); + partitions.forEach(p -> System.out.println("p = " + p)); + consumer.seekToBeginning(partitions); + } + }); while (true) { - ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); - for (ConsumerRecord<String, String> record : records) { - String key = record.key(); - String value = record.value(); - System.out.println(String.format("Consumed event from topic %s: key = %-10s value = %s", topic, key, value)); + var records = consumer.poll(Duration.ofMillis(1000)); + for (var record : records) { + var username = record.key(); + var email = record.value(); + if (!first) { + System.out.printf("Consumed event from topic %s: username = %-10s email = %s%n", topic, username, email); + } + + if (email == null) { + mappings.remove(username); + } else { + mappings.put(username, email); + } } + + if (!records.isEmpty()) { + if (first) { + System.out.println("FIRST"); + } + System.out.println("Got " + records.count() + " records!"); + showMappings(); + } + + first = false; } } } + + private static void showMappings() { + System.out.println("----------"); + for (var mapping : mappings.entrySet()) { + System.out.printf("%-10s = %s%n", mapping.getKey(), mapping.getValue()); + } + } } |