summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2024-02-07 19:20:51 +0100
committerTrygve Laugstøl <trygvis@inamo.no>2024-02-07 19:20:51 +0100
commitbd2a718ea0f8613a056683d133a05c89cd6f1988 (patch)
treea9f0e85123a8a1d30d3dba6ed3977e437306cb50
parenta457621e444b4e0719c076a172a62d823616a122 (diff)
downloadkafka-sandbox-bd2a718ea0f8613a056683d133a05c89cd6f1988.tar.gz
kafka-sandbox-bd2a718ea0f8613a056683d133a05c89cd6f1988.tar.bz2
kafka-sandbox-bd2a718ea0f8613a056683d133a05c89cd6f1988.tar.xz
kafka-sandbox-bd2a718ea0f8613a056683d133a05c89cd6f1988.zip
wip
-rw-r--r--.gitignore3
-rw-r--r--kafka-sandbox/.gitignore38
-rw-r--r--kafka-sandbox/kafka.properties5
-rw-r--r--kafka-sandbox/pom.xml26
-rw-r--r--kafka-sandbox/src/main/java/examples/ConsumerExample.java38
-rw-r--r--kafka-sandbox/src/main/java/examples/ProducerExample.java55
6 files changed, 165 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
index 9fb18b4..cfaa919 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,5 @@
.idea
out
+*.tmp
+tmp
+*.tmp.*
diff --git a/kafka-sandbox/.gitignore b/kafka-sandbox/.gitignore
new file mode 100644
index 0000000..5ff6309
--- /dev/null
+++ b/kafka-sandbox/.gitignore
@@ -0,0 +1,38 @@
+target/
+!.mvn/wrapper/maven-wrapper.jar
+!**/src/main/**/target/
+!**/src/test/**/target/
+
+### IntelliJ IDEA ###
+.idea/modules.xml
+.idea/jarRepositories.xml
+.idea/compiler.xml
+.idea/libraries/
+*.iws
+*.iml
+*.ipr
+
+### Eclipse ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+
+### NetBeans ###
+/nbproject/private/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
+build/
+!**/src/main/**/build/
+!**/src/test/**/build/
+
+### VS Code ###
+.vscode/
+
+### Mac OS ###
+.DS_Store \ No newline at end of file
diff --git a/kafka-sandbox/kafka.properties b/kafka-sandbox/kafka.properties
new file mode 100644
index 0000000..45042ad
--- /dev/null
+++ b/kafka-sandbox/kafka.properties
@@ -0,0 +1,5 @@
+bootstrap.servers=localhost:9092
+key.serializer=org.apache.kafka.common.serialization.StringSerializer
+value.serializer=org.apache.kafka.common.serialization.StringSerializer
+key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
+value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
diff --git a/kafka-sandbox/pom.xml b/kafka-sandbox/pom.xml
new file mode 100644
index 0000000..eb33e8d
--- /dev/null
+++ b/kafka-sandbox/pom.xml
@@ -0,0 +1,26 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>io.trygvis</groupId>
+ <artifactId>kafka-sandbox</artifactId>
+ <version>1.0-SNAPSHOT</version>
+
+ <properties>
+ <maven.compiler.source>21</maven.compiler.source>
+ <maven.compiler.target>21</maven.compiler.target>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <kafka-clients.version>3.5.1</kafka-clients.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${kafka-clients.version}</version>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git a/kafka-sandbox/src/main/java/examples/ConsumerExample.java b/kafka-sandbox/src/main/java/examples/ConsumerExample.java
new file mode 100644
index 0000000..e491035
--- /dev/null
+++ b/kafka-sandbox/src/main/java/examples/ConsumerExample.java
@@ -0,0 +1,38 @@
+package examples;
+
+import org.apache.kafka.clients.consumer.*;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Properties;
+
+public class ConsumerExample {
+ public static void main(final String[] args) throws Exception {
+ if (args.length != 1) {
+ System.out.println("Please provide the configuration file path as a command line argument");
+ System.exit(1);
+ }
+
+ final String topic = "purchases";
+
+ // Load consumer configuration settings from a local file
+ // Reusing the loadConfig method from the ProducerExample class
+ final Properties props = ProducerExample.loadConfig(args[0]);
+
+ // Add additional properties.
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-java-getting-started");
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+ try (final Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
+ consumer.subscribe(Arrays.asList(topic));
+ while (true) {
+ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
+ for (ConsumerRecord<String, String> record : records) {
+ String key = record.key();
+ String value = record.value();
+ System.out.println(String.format("Consumed event from topic %s: key = %-10s value = %s", topic, key, value));
+ }
+ }
+ }
+ }
+}
diff --git a/kafka-sandbox/src/main/java/examples/ProducerExample.java b/kafka-sandbox/src/main/java/examples/ProducerExample.java
new file mode 100644
index 0000000..3d185cd
--- /dev/null
+++ b/kafka-sandbox/src/main/java/examples/ProducerExample.java
@@ -0,0 +1,55 @@
+package examples;
+
+import org.apache.kafka.clients.producer.*;
+
+import java.io.*;
+import java.nio.file.*;
+import java.util.*;
+
+public class ProducerExample {
+
+ public static void main(final String[] args) throws IOException {
+ if (args.length != 1) {
+ System.out.println("Please provide the configuration file path as a command line argument");
+ System.exit(1);
+ }
+
+ // Load producer configuration settings from a local file
+ final Properties props = loadConfig(args[0]);
+ final String topic = "purchases";
+
+ String[] users = {"eabara", "jsmith", "sgarcia", "jbernard", "htanaka", "awalther"};
+ String[] items = {"book", "alarm clock", "t-shirts", "gift card", "batteries"};
+ try (final Producer<String, String> producer = new KafkaProducer<>(props)) {
+ final Random rnd = new Random();
+ final var numMessages = 10L;
+ for (var i = 0L; i < numMessages; i++) {
+ String user = users[rnd.nextInt(users.length)];
+ String item = items[rnd.nextInt(items.length)];
+
+ producer.send(
+ new ProducerRecord<>(topic, user, item),
+ (event, ex) -> {
+ if (ex != null)
+ ex.printStackTrace();
+ else
+ System.out.printf("Produced event to topic %s: key = %-10s value = %s%n", topic, user, item);
+ });
+ }
+ System.out.printf("%s events were produced to topic %s%n", numMessages, topic);
+ }
+
+ }
+
+ // We'll reuse this function to load properties from the Consumer as well
+ public static Properties loadConfig(final String configFile) throws IOException {
+ if (!Files.exists(Paths.get(configFile))) {
+ throw new IOException(configFile + " not found.");
+ }
+ final Properties cfg = new Properties();
+ try (InputStream inputStream = new FileInputStream(configFile)) {
+ cfg.load(inputStream);
+ }
+ return cfg;
+ }
+}