From bd2a718ea0f8613a056683d133a05c89cd6f1988 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Wed, 7 Feb 2024 19:20:51 +0100 Subject: wip --- .gitignore | 3 ++ kafka-sandbox/.gitignore | 38 +++++++++++++++ kafka-sandbox/kafka.properties | 5 ++ kafka-sandbox/pom.xml | 26 ++++++++++ .../src/main/java/examples/ConsumerExample.java | 38 +++++++++++++++ .../src/main/java/examples/ProducerExample.java | 55 ++++++++++++++++++++++ 6 files changed, 165 insertions(+) create mode 100644 kafka-sandbox/.gitignore create mode 100644 kafka-sandbox/kafka.properties create mode 100644 kafka-sandbox/pom.xml create mode 100644 kafka-sandbox/src/main/java/examples/ConsumerExample.java create mode 100644 kafka-sandbox/src/main/java/examples/ProducerExample.java diff --git a/.gitignore b/.gitignore index 9fb18b4..cfaa919 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,5 @@ .idea out +*.tmp +tmp +*.tmp.* diff --git a/kafka-sandbox/.gitignore b/kafka-sandbox/.gitignore new file mode 100644 index 0000000..5ff6309 --- /dev/null +++ b/kafka-sandbox/.gitignore @@ -0,0 +1,38 @@ +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### IntelliJ IDEA ### +.idea/modules.xml +.idea/jarRepositories.xml +.idea/compiler.xml +.idea/libraries/ +*.iws +*.iml +*.ipr + +### Eclipse ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ + +### Mac OS ### +.DS_Store \ No newline at end of file diff --git a/kafka-sandbox/kafka.properties b/kafka-sandbox/kafka.properties new file mode 100644 index 0000000..45042ad --- /dev/null +++ b/kafka-sandbox/kafka.properties @@ -0,0 +1,5 @@ +bootstrap.servers=localhost:9092 +key.serializer=org.apache.kafka.common.serialization.StringSerializer +value.serializer=org.apache.kafka.common.serialization.StringSerializer +key.deserializer=org.apache.kafka.common.serialization.StringDeserializer +value.deserializer=org.apache.kafka.common.serialization.StringDeserializer diff --git a/kafka-sandbox/pom.xml b/kafka-sandbox/pom.xml new file mode 100644 index 0000000..eb33e8d --- /dev/null +++ b/kafka-sandbox/pom.xml @@ -0,0 +1,26 @@ + + + 4.0.0 + + io.trygvis + kafka-sandbox + 1.0-SNAPSHOT + + + 21 + 21 + UTF-8 + 3.5.1 + + + + + org.apache.kafka + kafka-clients + ${kafka-clients.version} + + + + diff --git a/kafka-sandbox/src/main/java/examples/ConsumerExample.java b/kafka-sandbox/src/main/java/examples/ConsumerExample.java new file mode 100644 index 0000000..e491035 --- /dev/null +++ b/kafka-sandbox/src/main/java/examples/ConsumerExample.java @@ -0,0 +1,38 @@ +package examples; + +import org.apache.kafka.clients.consumer.*; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Properties; + +public class ConsumerExample { + public static void main(final String[] args) throws Exception { + if (args.length != 1) { + System.out.println("Please provide the configuration file path as a command line argument"); + System.exit(1); + } + + final String topic = "purchases"; + + // Load consumer configuration settings from a local file + // Reusing the loadConfig method from the ProducerExample class + final Properties props = ProducerExample.loadConfig(args[0]); + + // Add additional properties. + props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-java-getting-started"); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + try (final Consumer consumer = new KafkaConsumer<>(props)) { + consumer.subscribe(Arrays.asList(topic)); + while (true) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord record : records) { + String key = record.key(); + String value = record.value(); + System.out.println(String.format("Consumed event from topic %s: key = %-10s value = %s", topic, key, value)); + } + } + } + } +} diff --git a/kafka-sandbox/src/main/java/examples/ProducerExample.java b/kafka-sandbox/src/main/java/examples/ProducerExample.java new file mode 100644 index 0000000..3d185cd --- /dev/null +++ b/kafka-sandbox/src/main/java/examples/ProducerExample.java @@ -0,0 +1,55 @@ +package examples; + +import org.apache.kafka.clients.producer.*; + +import java.io.*; +import java.nio.file.*; +import java.util.*; + +public class ProducerExample { + + public static void main(final String[] args) throws IOException { + if (args.length != 1) { + System.out.println("Please provide the configuration file path as a command line argument"); + System.exit(1); + } + + // Load producer configuration settings from a local file + final Properties props = loadConfig(args[0]); + final String topic = "purchases"; + + String[] users = {"eabara", "jsmith", "sgarcia", "jbernard", "htanaka", "awalther"}; + String[] items = {"book", "alarm clock", "t-shirts", "gift card", "batteries"}; + try (final Producer producer = new KafkaProducer<>(props)) { + final Random rnd = new Random(); + final var numMessages = 10L; + for (var i = 0L; i < numMessages; i++) { + String user = users[rnd.nextInt(users.length)]; + String item = items[rnd.nextInt(items.length)]; + + producer.send( + new ProducerRecord<>(topic, user, item), + (event, ex) -> { + if (ex != null) + ex.printStackTrace(); + else + System.out.printf("Produced event to topic %s: key = %-10s value = %s%n", topic, user, item); + }); + } + System.out.printf("%s events were produced to topic %s%n", numMessages, topic); + } + + } + + // We'll reuse this function to load properties from the Consumer as well + public static Properties loadConfig(final String configFile) throws IOException { + if (!Files.exists(Paths.get(configFile))) { + throw new IOException(configFile + " not found."); + } + final Properties cfg = new Properties(); + try (InputStream inputStream = new FileInputStream(configFile)) { + cfg.load(inputStream); + } + return cfg; + } +} -- cgit v1.2.3