summaryrefslogtreecommitdiff
path: root/kafka-sandbox/src/main/java/examples/ConsumerExample.java
diff options
context:
space:
mode:
Diffstat (limited to 'kafka-sandbox/src/main/java/examples/ConsumerExample.java')
-rw-r--r--kafka-sandbox/src/main/java/examples/ConsumerExample.java71
1 files changed, 59 insertions, 12 deletions
diff --git a/kafka-sandbox/src/main/java/examples/ConsumerExample.java b/kafka-sandbox/src/main/java/examples/ConsumerExample.java
index e491035..6672166 100644
--- a/kafka-sandbox/src/main/java/examples/ConsumerExample.java
+++ b/kafka-sandbox/src/main/java/examples/ConsumerExample.java
@@ -1,38 +1,85 @@
package examples;
-import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
-import java.util.Arrays;
-import java.util.Properties;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
public class ConsumerExample {
+ private static final Map<String, String> mappings = new TreeMap<>();
+
public static void main(final String[] args) throws Exception {
if (args.length != 1) {
System.out.println("Please provide the configuration file path as a command line argument");
System.exit(1);
}
- final String topic = "purchases";
+ var topic = "purchases";
// Load consumer configuration settings from a local file
// Reusing the loadConfig method from the ProducerExample class
- final Properties props = ProducerExample.loadConfig(args[0]);
+ var props = ProducerExample.loadConfig(args[0]);
// Add additional properties.
props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-java-getting-started");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- try (final Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
- consumer.subscribe(Arrays.asList(topic));
+ var first = true;
+
+ try (var consumer = new KafkaConsumer<String, String>(props)) {
+ consumer.subscribe(List.of(topic), new ConsumerRebalanceListener() {
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+ System.out.println("ConsumerExample.onPartitionsRevoked");
+ partitions.forEach(p -> System.out.println("p = " + p));
+ }
+
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+ System.out.println("ConsumerExample.onPartitionsAssigned");
+ partitions.forEach(p -> System.out.println("p = " + p));
+ consumer.seekToBeginning(partitions);
+ }
+ });
while (true) {
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
- for (ConsumerRecord<String, String> record : records) {
- String key = record.key();
- String value = record.value();
- System.out.println(String.format("Consumed event from topic %s: key = %-10s value = %s", topic, key, value));
+ var records = consumer.poll(Duration.ofMillis(1000));
+ for (var record : records) {
+ var username = record.key();
+ var email = record.value();
+ if (!first) {
+ System.out.printf("Consumed event from topic %s: username = %-10s email = %s%n", topic, username, email);
+ }
+
+ if (email == null) {
+ mappings.remove(username);
+ } else {
+ mappings.put(username, email);
+ }
}
+
+ if (!records.isEmpty()) {
+ if (first) {
+ System.out.println("FIRST");
+ }
+ System.out.println("Got " + records.count() + " records!");
+ showMappings();
+ }
+
+ first = false;
}
}
}
+
+ private static void showMappings() {
+ System.out.println("----------");
+ for (var mapping : mappings.entrySet()) {
+ System.out.printf("%-10s = %s%n", mapping.getKey(), mapping.getValue());
+ }
+ }
}