 kafka-sandbox/pom.xml                                      | 10
 kafka-sandbox/src/main/java/examples/ConsumerExample.java  | 71
 kafka-sandbox/src/main/java/examples/ProducerExample.java  | 49
 kafka-sandbox/src/main/resources/logback.xml               | 16
 4 files changed, 108 insertions(+), 38 deletions(-)
diff --git a/kafka-sandbox/pom.xml b/kafka-sandbox/pom.xml
index eb33e8d..35fc41c 100644
--- a/kafka-sandbox/pom.xml
+++ b/kafka-sandbox/pom.xml
@@ -21,6 +21,16 @@
<artifactId>kafka-clients</artifactId>
<version>${kafka-clients.version}</version>
</dependency>
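+        <!-- Logging: logback-classic provides the SLF4J backend used by kafka-clients -->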
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <version>1.4.14</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>2.0.9</version>
+ </dependency>
</dependencies>
</project>
diff --git a/kafka-sandbox/src/main/java/examples/ConsumerExample.java b/kafka-sandbox/src/main/java/examples/ConsumerExample.java
index e491035..6672166 100644
--- a/kafka-sandbox/src/main/java/examples/ConsumerExample.java
+++ b/kafka-sandbox/src/main/java/examples/ConsumerExample.java
@@ -1,38 +1,85 @@
package examples;
-import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
-import java.util.Arrays;
-import java.util.Properties;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
public class ConsumerExample {
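+    // Latest email seen per username; a TreeMap keeps the listing sorted by username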
+ private static final Map<String, String> mappings = new TreeMap<>();
+
public static void main(final String[] args) throws Exception {
if (args.length != 1) {
System.out.println("Please provide the configuration file path as a command line argument");
System.exit(1);
}
- final String topic = "purchases";
+ var topic = "purchases";
// Load consumer configuration settings from a local file
// Reusing the loadConfig method from the ProducerExample class
- final Properties props = ProducerExample.loadConfig(args[0]);
+ var props = ProducerExample.loadConfig(args[0]);
// Add additional properties.
props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-java-getting-started");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- try (final Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
- consumer.subscribe(Arrays.asList(topic));
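+        // Suppresses per-event output for the first batch, which only replays existing history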
+ var first = true;
+
+ try (var consumer = new KafkaConsumer<String, String>(props)) {
+ consumer.subscribe(List.of(topic), new ConsumerRebalanceListener() {
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+ System.out.println("ConsumerExample.onPartitionsRevoked");
+ partitions.forEach(p -> System.out.println("p = " + p));
+ }
+
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+ System.out.println("ConsumerExample.onPartitionsAssigned");
+ partitions.forEach(p -> System.out.println("p = " + p));
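+                    // Rewind to the beginning on every assignment so the mappings table is rebuilt from the topic's full history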
+ consumer.seekToBeginning(partitions);
+ }
+ });
while (true) {
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
- for (ConsumerRecord<String, String> record : records) {
- String key = record.key();
- String value = record.value();
- System.out.println(String.format("Consumed event from topic %s: key = %-10s value = %s", topic, key, value));
+ var records = consumer.poll(Duration.ofMillis(1000));
+ for (var record : records) {
+ var username = record.key();
+ var email = record.value();
+ if (!first) {
+ System.out.printf("Consumed event from topic %s: username = %-10s email = %s%n", topic, username, email);
+ }
+
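+                    // A null value is treated as a delete: drop the mapping for that username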
+ if (email == null) {
+ mappings.remove(username);
+ } else {
+ mappings.put(username, email);
+ }
}
+
+ if (!records.isEmpty()) {
+ if (first) {
+ System.out.println("FIRST");
+ }
+ System.out.println("Got " + records.count() + " records!");
+                    showMappings();
+                    // Clear the flag only once the first non-empty batch has been handled,
+                    // so the replayed history is never printed event-by-event
+                    first = false;
+                }
}
}
}
+
+ private static void showMappings() {
+ System.out.println("----------");
+ for (var mapping : mappings.entrySet()) {
+ System.out.printf("%-10s = %s%n", mapping.getKey(), mapping.getValue());
+ }
+ }
}
diff --git a/kafka-sandbox/src/main/java/examples/ProducerExample.java b/kafka-sandbox/src/main/java/examples/ProducerExample.java
index 3d185cd..26879d3 100644
--- a/kafka-sandbox/src/main/java/examples/ProducerExample.java
+++ b/kafka-sandbox/src/main/java/examples/ProducerExample.java
@@ -1,55 +1,52 @@
package examples;
-import org.apache.kafka.clients.producer.*;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
-import java.io.*;
-import java.nio.file.*;
-import java.util.*;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Properties;
+import java.util.Random;
public class ProducerExample {
-
- public static void main(final String[] args) throws IOException {
+ public static void main(String[] args) throws IOException {
if (args.length != 1) {
System.out.println("Please provide the configuration file path as a command line argument");
System.exit(1);
}
// Load producer configuration settings from a local file
- final Properties props = loadConfig(args[0]);
- final String topic = "purchases";
-
- String[] users = {"eabara", "jsmith", "sgarcia", "jbernard", "htanaka", "awalther"};
- String[] items = {"book", "alarm clock", "t-shirts", "gift card", "batteries"};
- try (final Producer<String, String> producer = new KafkaProducer<>(props)) {
- final Random rnd = new Random();
- final var numMessages = 10L;
+ var props = loadConfig(args[0]);
+ var topic = "purchases";
+
+ var users = new String[]{"eabara", "jsmith", "sgarcia", "jbernard", "htanaka", "awalther"};
+ var domains = new String[]{"gmail.com", "yahoo.com", "sol.no", "online.no", "hotmail.com"};
+        try (var producer = new KafkaProducer<String, String>(props)) {
+ var rnd = new Random();
+ var numMessages = 10L;
for (var i = 0L; i < numMessages; i++) {
- String user = users[rnd.nextInt(users.length)];
- String item = items[rnd.nextInt(items.length)];
+ var user = users[rnd.nextInt(users.length)];
+ var domain = domains[rnd.nextInt(domains.length)];
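+                // Key is the username, value is a generated email address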
producer.send(
- new ProducerRecord<>(topic, user, item),
+ new ProducerRecord<>(topic, user, user + "@" + domain),
(event, ex) -> {
if (ex != null)
ex.printStackTrace();
else
- System.out.printf("Produced event to topic %s: key = %-10s value = %s%n", topic, user, item);
+                            System.out.printf("Produced event to topic %s: key = %-10s value = %s%n", topic, user, user + "@" + domain);
});
}
System.out.printf("%s events were produced to topic %s%n", numMessages, topic);
}
-
}
// We'll reuse this function to load properties from the Consumer as well
- public static Properties loadConfig(final String configFile) throws IOException {
- if (!Files.exists(Paths.get(configFile))) {
- throw new IOException(configFile + " not found.");
- }
- final Properties cfg = new Properties();
- try (InputStream inputStream = new FileInputStream(configFile)) {
+ public static Properties loadConfig(String configFile) throws IOException {
+ try (var inputStream = new FileInputStream(configFile)) {
+ var cfg = new Properties();
cfg.load(inputStream);
+ return cfg;
}
- return cfg;
}
}
diff --git a/kafka-sandbox/src/main/resources/logback.xml b/kafka-sandbox/src/main/resources/logback.xml
new file mode 100644
index 0000000..62333a0
--- /dev/null
+++ b/kafka-sandbox/src/main/resources/logback.xml
@@ -0,0 +1,16 @@
+<configuration>
+ <import class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"/>
+ <import class="ch.qos.logback.core.ConsoleAppender"/>
+
+ <appender name="STDOUT" class="ConsoleAppender">
+ <encoder class="PatternLayoutEncoder">
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} -%kvp- %msg%n</pattern>
+ </encoder>
+ </appender>
+
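+    <!-- Keep the verbose Kafka client logging at INFO while everything else stays at DEBUG -->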
+ <logger name="org.apache.kafka" level="INFO"/>
+
+ <root level="DEBUG">
+ <appender-ref ref="STDOUT"/>
+ </root>
+</configuration>