diff options
author | Trygve Laugstøl <trygvis@inamo.no> | 2024-02-07 20:26:22 +0100 |
---|---|---|
committer | Trygve Laugstøl <trygvis@inamo.no> | 2024-02-07 20:26:22 +0100 |
commit | 9ea81cc1d80c99969bd5fb3cb28622f58209b057 (patch) | |
tree | 0aa1b03f47f61b49cf35911707861399a3637cea | |
parent | bd2a718ea0f8613a056683d133a05c89cd6f1988 (diff) | |
download | kafka-sandbox-main.tar.gz kafka-sandbox-main.tar.bz2 kafka-sandbox-main.tar.xz kafka-sandbox-main.zip |
wipmain
-rw-r--r-- | kafka-sandbox/pom.xml | 10 | ||||
-rw-r--r-- | kafka-sandbox/src/main/java/examples/ConsumerExample.java | 71 | ||||
-rw-r--r-- | kafka-sandbox/src/main/java/examples/ProducerExample.java | 49 | ||||
-rw-r--r-- | kafka-sandbox/src/main/resources/logback.xml | 16 |
4 files changed, 108 insertions, 38 deletions
diff --git a/kafka-sandbox/pom.xml b/kafka-sandbox/pom.xml index eb33e8d..35fc41c 100644 --- a/kafka-sandbox/pom.xml +++ b/kafka-sandbox/pom.xml @@ -21,6 +21,16 @@ <artifactId>kafka-clients</artifactId> <version>${kafka-clients.version}</version> </dependency> + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + <version>1.4.14</version> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>2.0.9</version> + </dependency> </dependencies> </project> diff --git a/kafka-sandbox/src/main/java/examples/ConsumerExample.java b/kafka-sandbox/src/main/java/examples/ConsumerExample.java index e491035..6672166 100644 --- a/kafka-sandbox/src/main/java/examples/ConsumerExample.java +++ b/kafka-sandbox/src/main/java/examples/ConsumerExample.java @@ -1,38 +1,85 @@ package examples; -import org.apache.kafka.clients.consumer.*; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; import java.time.Duration; -import java.util.Arrays; -import java.util.Properties; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; public class ConsumerExample { + private static final Map<String, String> mappings = new TreeMap<>(); + public static void main(final String[] args) throws Exception { if (args.length != 1) { System.out.println("Please provide the configuration file path as a command line argument"); System.exit(1); } - final String topic = "purchases"; + var topic = "purchases"; // Load consumer configuration settings from a local file // Reusing the loadConfig method from the ProducerExample class - final Properties props = ProducerExample.loadConfig(args[0]); + var props = ProducerExample.loadConfig(args[0]); // Add additional properties. props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-java-getting-started"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - try (final Consumer<String, String> consumer = new KafkaConsumer<>(props)) { - consumer.subscribe(Arrays.asList(topic)); + var first = true; + + try (var consumer = new KafkaConsumer<String, String>(props)) { + consumer.subscribe(List.of(topic), new ConsumerRebalanceListener() { + @Override + public void onPartitionsRevoked(Collection<TopicPartition> partitions) { + System.out.println("ConsumerExample.onPartitionsRevoked"); + partitions.forEach(p -> System.out.println("p = " + p)); + } + + @Override + public void onPartitionsAssigned(Collection<TopicPartition> partitions) { + System.out.println("ConsumerExample.onPartitionsAssigned"); + partitions.forEach(p -> System.out.println("p = " + p)); + consumer.seekToBeginning(partitions); + } + }); while (true) { - ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); - for (ConsumerRecord<String, String> record : records) { - String key = record.key(); - String value = record.value(); - System.out.println(String.format("Consumed event from topic %s: key = %-10s value = %s", topic, key, value)); + var records = consumer.poll(Duration.ofMillis(1000)); + for (var record : records) { + var username = record.key(); + var email = record.value(); + if (!first) { + System.out.printf("Consumed event from topic %s: username = %-10s email = %s%n", topic, username, email); + } + + if (email == null) { + mappings.remove(username); + } else { + mappings.put(username, email); + } } + + if (!records.isEmpty()) { + if (first) { + System.out.println("FIRST"); + } + System.out.println("Got " + records.count() + " records!"); + showMappings(); + } + + first = false; } } } + + private static void showMappings() { + System.out.println("----------"); + for (var mapping : mappings.entrySet()) { + System.out.printf("%-10s = %s%n", mapping.getKey(), mapping.getValue()); + } + } } diff --git a/kafka-sandbox/src/main/java/examples/ProducerExample.java b/kafka-sandbox/src/main/java/examples/ProducerExample.java index 3d185cd..26879d3 100644 --- a/kafka-sandbox/src/main/java/examples/ProducerExample.java +++ b/kafka-sandbox/src/main/java/examples/ProducerExample.java @@ -1,55 +1,52 @@ package examples; -import org.apache.kafka.clients.producer.*; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; -import java.io.*; -import java.nio.file.*; -import java.util.*; +import java.io.FileInputStream; +import java.io.IOException; +import java.util.Properties; +import java.util.Random; public class ProducerExample { - - public static void main(final String[] args) throws IOException { + public static void main(String[] args) throws IOException { if (args.length != 1) { System.out.println("Please provide the configuration file path as a command line argument"); System.exit(1); } // Load producer configuration settings from a local file - final Properties props = loadConfig(args[0]); - final String topic = "purchases"; - - String[] users = {"eabara", "jsmith", "sgarcia", "jbernard", "htanaka", "awalther"}; - String[] items = {"book", "alarm clock", "t-shirts", "gift card", "batteries"}; - try (final Producer<String, String> producer = new KafkaProducer<>(props)) { - final Random rnd = new Random(); - final var numMessages = 10L; + var props = loadConfig(args[0]); + var topic = "purchases"; + + var users = new String[]{"eabara", "jsmith", "sgarcia", "jbernard", "htanaka", "awalther"}; + var domains = new String[]{"gmail.com", "yahoo.com", "sol.no", "online.no", "hotmail.com"}; + try (var producer = new KafkaProducer<>(props)) { + var rnd = new Random(); + var numMessages = 10L; for (var i = 0L; i < numMessages; i++) { - String user = users[rnd.nextInt(users.length)]; - String item = items[rnd.nextInt(items.length)]; + var user = users[rnd.nextInt(users.length)]; + var domain = domains[rnd.nextInt(domains.length)]; producer.send( - new ProducerRecord<>(topic, user, item), + new ProducerRecord<>(topic, user, user + "@" + domain), (event, ex) -> { if (ex != null) ex.printStackTrace(); else - System.out.printf("Produced event to topic %s: key = %-10s value = %s%n", topic, user, item); + System.out.printf("Produced event to topic %s: key = %-10s value = %s%n", topic, user, domain); }); } System.out.printf("%s events were produced to topic %s%n", numMessages, topic); } - } // We'll reuse this function to load properties from the Consumer as well - public static Properties loadConfig(final String configFile) throws IOException { - if (!Files.exists(Paths.get(configFile))) { - throw new IOException(configFile + " not found."); - } - final Properties cfg = new Properties(); - try (InputStream inputStream = new FileInputStream(configFile)) { + public static Properties loadConfig(String configFile) throws IOException { + try (var inputStream = new FileInputStream(configFile)) { + var cfg = new Properties(); cfg.load(inputStream); + return cfg; } - return cfg; } } diff --git a/kafka-sandbox/src/main/resources/logback.xml b/kafka-sandbox/src/main/resources/logback.xml new file mode 100644 index 0000000..62333a0 --- /dev/null +++ b/kafka-sandbox/src/main/resources/logback.xml @@ -0,0 +1,16 @@ +<configuration> + <import class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"/> + <import class="ch.qos.logback.core.ConsoleAppender"/> + + <appender name="STDOUT" class="ConsoleAppender"> + <encoder class="PatternLayoutEncoder"> + <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} -%kvp- %msg%n</pattern> + </encoder> + </appender> + + <logger name="org.apache.kafka" level="INFO"/> + + <root level="DEBUG"> + <appender-ref ref="STDOUT"/> + </root> +</configuration> |