diff options
author | Trygve Laugstøl <trygvis@inamo.no> | 2024-02-07 19:20:51 +0100 |
---|---|---|
committer | Trygve Laugstøl <trygvis@inamo.no> | 2024-02-07 19:20:51 +0100 |
commit | bd2a718ea0f8613a056683d133a05c89cd6f1988 (patch) | |
tree | a9f0e85123a8a1d30d3dba6ed3977e437306cb50 | |
parent | a457621e444b4e0719c076a172a62d823616a122 (diff) | |
download | kafka-sandbox-bd2a718ea0f8613a056683d133a05c89cd6f1988.tar.gz kafka-sandbox-bd2a718ea0f8613a056683d133a05c89cd6f1988.tar.bz2 kafka-sandbox-bd2a718ea0f8613a056683d133a05c89cd6f1988.tar.xz kafka-sandbox-bd2a718ea0f8613a056683d133a05c89cd6f1988.zip |
wip
-rw-r--r-- | .gitignore | 3 | ||||
-rw-r--r-- | kafka-sandbox/.gitignore | 38 | ||||
-rw-r--r-- | kafka-sandbox/kafka.properties | 5 | ||||
-rw-r--r-- | kafka-sandbox/pom.xml | 26 | ||||
-rw-r--r-- | kafka-sandbox/src/main/java/examples/ConsumerExample.java | 38 | ||||
-rw-r--r-- | kafka-sandbox/src/main/java/examples/ProducerExample.java | 55 |
6 files changed, 165 insertions, 0 deletions
@@ -1,2 +1,5 @@ .idea out +*.tmp +tmp +*.tmp.* diff --git a/kafka-sandbox/.gitignore b/kafka-sandbox/.gitignore new file mode 100644 index 0000000..5ff6309 --- /dev/null +++ b/kafka-sandbox/.gitignore @@ -0,0 +1,38 @@ +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### IntelliJ IDEA ### +.idea/modules.xml +.idea/jarRepositories.xml +.idea/compiler.xml +.idea/libraries/ +*.iws +*.iml +*.ipr + +### Eclipse ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ + +### Mac OS ### +.DS_Store
\ No newline at end of file diff --git a/kafka-sandbox/kafka.properties b/kafka-sandbox/kafka.properties new file mode 100644 index 0000000..45042ad --- /dev/null +++ b/kafka-sandbox/kafka.properties @@ -0,0 +1,5 @@ +bootstrap.servers=localhost:9092 +key.serializer=org.apache.kafka.common.serialization.StringSerializer +value.serializer=org.apache.kafka.common.serialization.StringSerializer +key.deserializer=org.apache.kafka.common.serialization.StringDeserializer +value.deserializer=org.apache.kafka.common.serialization.StringDeserializer diff --git a/kafka-sandbox/pom.xml b/kafka-sandbox/pom.xml new file mode 100644 index 0000000..eb33e8d --- /dev/null +++ b/kafka-sandbox/pom.xml @@ -0,0 +1,26 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>io.trygvis</groupId> + <artifactId>kafka-sandbox</artifactId> + <version>1.0-SNAPSHOT</version> + + <properties> + <maven.compiler.source>21</maven.compiler.source> + <maven.compiler.target>21</maven.compiler.target> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <kafka-clients.version>3.5.1</kafka-clients.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>${kafka-clients.version}</version> + </dependency> + </dependencies> + +</project> diff --git a/kafka-sandbox/src/main/java/examples/ConsumerExample.java b/kafka-sandbox/src/main/java/examples/ConsumerExample.java new file mode 100644 index 0000000..e491035 --- /dev/null +++ b/kafka-sandbox/src/main/java/examples/ConsumerExample.java @@ -0,0 +1,38 @@ +package examples; + +import org.apache.kafka.clients.consumer.*; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Properties; + +public class ConsumerExample { + public static void main(final String[] args) throws Exception { + if (args.length != 1) { + System.out.println("Please provide the configuration file path as a command line argument"); + System.exit(1); + } + + final String topic = "purchases"; + + // Load consumer configuration settings from a local file + // Reusing the loadConfig method from the ProducerExample class + final Properties props = ProducerExample.loadConfig(args[0]); + + // Add additional properties. + props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-java-getting-started"); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + try (final Consumer<String, String> consumer = new KafkaConsumer<>(props)) { + consumer.subscribe(Arrays.asList(topic)); + while (true) { + ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord<String, String> record : records) { + String key = record.key(); + String value = record.value(); + System.out.println(String.format("Consumed event from topic %s: key = %-10s value = %s", topic, key, value)); + } + } + } + } +} diff --git a/kafka-sandbox/src/main/java/examples/ProducerExample.java b/kafka-sandbox/src/main/java/examples/ProducerExample.java new file mode 100644 index 0000000..3d185cd --- /dev/null +++ b/kafka-sandbox/src/main/java/examples/ProducerExample.java @@ -0,0 +1,55 @@ +package examples; + +import org.apache.kafka.clients.producer.*; + +import java.io.*; +import java.nio.file.*; +import java.util.*; + +public class ProducerExample { + + public static void main(final String[] args) throws IOException { + if (args.length != 1) { + System.out.println("Please provide the configuration file path as a command line argument"); + System.exit(1); + } + + // Load producer configuration settings from a local file + final Properties props = loadConfig(args[0]); + final String topic = "purchases"; + + String[] users = {"eabara", "jsmith", "sgarcia", "jbernard", "htanaka", "awalther"}; + String[] items = {"book", "alarm clock", "t-shirts", "gift card", "batteries"}; + try (final Producer<String, String> producer = new KafkaProducer<>(props)) { + final Random rnd = new Random(); + final var numMessages = 10L; + for (var i = 0L; i < numMessages; i++) { + String user = users[rnd.nextInt(users.length)]; + String item = items[rnd.nextInt(items.length)]; + + producer.send( + new ProducerRecord<>(topic, user, item), + (event, ex) -> { + if (ex != null) + ex.printStackTrace(); + else + System.out.printf("Produced event to topic %s: key = %-10s value = %s%n", topic, user, item); + }); + } + System.out.printf("%s events were produced to topic %s%n", numMessages, topic); + } + + } + + // We'll reuse this function to load properties from the Consumer as well + public static Properties loadConfig(final String configFile) throws IOException { + if (!Files.exists(Paths.get(configFile))) { + throw new IOException(configFile + " not found."); + } + final Properties cfg = new Properties(); + try (InputStream inputStream = new FileInputStream(configFile)) { + cfg.load(inputStream); + } + return cfg; + } +} |