From 9ea81cc1d80c99969bd5fb3cb28622f58209b057 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Wed, 7 Feb 2024 20:26:22 +0100 Subject: wip --- kafka-sandbox/pom.xml | 10 +++ .../src/main/java/examples/ConsumerExample.java | 71 ++++++++++++++++++---- .../src/main/java/examples/ProducerExample.java | 49 +++++++-------- kafka-sandbox/src/main/resources/logback.xml | 16 +++++ 4 files changed, 108 insertions(+), 38 deletions(-) create mode 100644 kafka-sandbox/src/main/resources/logback.xml diff --git a/kafka-sandbox/pom.xml b/kafka-sandbox/pom.xml index eb33e8d..35fc41c 100644 --- a/kafka-sandbox/pom.xml +++ b/kafka-sandbox/pom.xml @@ -21,6 +21,16 @@ kafka-clients ${kafka-clients.version} + + ch.qos.logback + logback-classic + 1.4.14 + + + org.slf4j + slf4j-api + 2.0.9 + diff --git a/kafka-sandbox/src/main/java/examples/ConsumerExample.java b/kafka-sandbox/src/main/java/examples/ConsumerExample.java index e491035..6672166 100644 --- a/kafka-sandbox/src/main/java/examples/ConsumerExample.java +++ b/kafka-sandbox/src/main/java/examples/ConsumerExample.java @@ -1,38 +1,85 @@ package examples; -import org.apache.kafka.clients.consumer.*; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; import java.time.Duration; -import java.util.Arrays; -import java.util.Properties; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; public class ConsumerExample { + private static final Map mappings = new TreeMap<>(); + public static void main(final String[] args) throws Exception { if (args.length != 1) { System.out.println("Please provide the configuration file path as a command line argument"); System.exit(1); } - final String topic = "purchases"; + var topic = "purchases"; // Load consumer configuration settings from a local file // Reusing the loadConfig method from the ProducerExample class - final Properties props = ProducerExample.loadConfig(args[0]); + var props = ProducerExample.loadConfig(args[0]); // Add additional properties. props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-java-getting-started"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - try (final Consumer consumer = new KafkaConsumer<>(props)) { - consumer.subscribe(Arrays.asList(topic)); + var first = true; + + try (var consumer = new KafkaConsumer(props)) { + consumer.subscribe(List.of(topic), new ConsumerRebalanceListener() { + @Override + public void onPartitionsRevoked(Collection partitions) { + System.out.println("ConsumerExample.onPartitionsRevoked"); + partitions.forEach(p -> System.out.println("p = " + p)); + } + + @Override + public void onPartitionsAssigned(Collection partitions) { + System.out.println("ConsumerExample.onPartitionsAssigned"); + partitions.forEach(p -> System.out.println("p = " + p)); + consumer.seekToBeginning(partitions); + } + }); while (true) { - ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); - for (ConsumerRecord record : records) { - String key = record.key(); - String value = record.value(); - System.out.println(String.format("Consumed event from topic %s: key = %-10s value = %s", topic, key, value)); + var records = consumer.poll(Duration.ofMillis(1000)); + for (var record : records) { + var username = record.key(); + var email = record.value(); + if (!first) { + System.out.printf("Consumed event from topic %s: username = %-10s email = %s%n", topic, username, email); + } + + if (email == null) { + mappings.remove(username); + } else { + mappings.put(username, email); + } } + + if (!records.isEmpty()) { + if (first) { + System.out.println("FIRST"); + } + System.out.println("Got " + records.count() + " records!"); + showMappings(); + } + + first = false; } } } + + private static void showMappings() { + System.out.println("----------"); + for (var mapping : mappings.entrySet()) { + System.out.printf("%-10s = %s%n", mapping.getKey(), mapping.getValue()); + } + } } diff --git a/kafka-sandbox/src/main/java/examples/ProducerExample.java b/kafka-sandbox/src/main/java/examples/ProducerExample.java index 3d185cd..26879d3 100644 --- a/kafka-sandbox/src/main/java/examples/ProducerExample.java +++ b/kafka-sandbox/src/main/java/examples/ProducerExample.java @@ -1,55 +1,52 @@ package examples; -import org.apache.kafka.clients.producer.*; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; -import java.io.*; -import java.nio.file.*; -import java.util.*; +import java.io.FileInputStream; +import java.io.IOException; +import java.util.Properties; +import java.util.Random; public class ProducerExample { - - public static void main(final String[] args) throws IOException { + public static void main(String[] args) throws IOException { if (args.length != 1) { System.out.println("Please provide the configuration file path as a command line argument"); System.exit(1); } // Load producer configuration settings from a local file - final Properties props = loadConfig(args[0]); - final String topic = "purchases"; - - String[] users = {"eabara", "jsmith", "sgarcia", "jbernard", "htanaka", "awalther"}; - String[] items = {"book", "alarm clock", "t-shirts", "gift card", "batteries"}; - try (final Producer producer = new KafkaProducer<>(props)) { - final Random rnd = new Random(); - final var numMessages = 10L; + var props = loadConfig(args[0]); + var topic = "purchases"; + + var users = new String[]{"eabara", "jsmith", "sgarcia", "jbernard", "htanaka", "awalther"}; + var domains = new String[]{"gmail.com", "yahoo.com", "sol.no", "online.no", "hotmail.com"}; + try (var producer = new KafkaProducer<>(props)) { + var rnd = new Random(); + var numMessages = 10L; for (var i = 0L; i < numMessages; i++) { - String user = users[rnd.nextInt(users.length)]; - String item = items[rnd.nextInt(items.length)]; + var user = users[rnd.nextInt(users.length)]; + var domain = domains[rnd.nextInt(domains.length)]; producer.send( - new ProducerRecord<>(topic, user, item), + new ProducerRecord<>(topic, user, user + "@" + domain), (event, ex) -> { if (ex != null) ex.printStackTrace(); else - System.out.printf("Produced event to topic %s: key = %-10s value = %s%n", topic, user, item); + System.out.printf("Produced event to topic %s: key = %-10s value = %s%n", topic, user, domain); }); } System.out.printf("%s events were produced to topic %s%n", numMessages, topic); } - } // We'll reuse this function to load properties from the Consumer as well - public static Properties loadConfig(final String configFile) throws IOException { - if (!Files.exists(Paths.get(configFile))) { - throw new IOException(configFile + " not found."); - } - final Properties cfg = new Properties(); - try (InputStream inputStream = new FileInputStream(configFile)) { + public static Properties loadConfig(String configFile) throws IOException { + try (var inputStream = new FileInputStream(configFile)) { + var cfg = new Properties(); cfg.load(inputStream); + return cfg; } - return cfg; } } diff --git a/kafka-sandbox/src/main/resources/logback.xml b/kafka-sandbox/src/main/resources/logback.xml new file mode 100644 index 0000000..62333a0 --- /dev/null +++ b/kafka-sandbox/src/main/resources/logback.xml @@ -0,0 +1,16 @@ + + + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} -%kvp- %msg%n + + + + + + + + + -- cgit v1.2.3