From bd2a718ea0f8613a056683d133a05c89cd6f1988 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Wed, 7 Feb 2024 19:20:51 +0100 Subject: wip --- .../src/main/java/examples/ProducerExample.java | 55 ++++++++++++++++++++++ 1 file changed, 55 insertions(+) create mode 100644 kafka-sandbox/src/main/java/examples/ProducerExample.java (limited to 'kafka-sandbox/src/main/java/examples/ProducerExample.java') diff --git a/kafka-sandbox/src/main/java/examples/ProducerExample.java b/kafka-sandbox/src/main/java/examples/ProducerExample.java new file mode 100644 index 0000000..3d185cd --- /dev/null +++ b/kafka-sandbox/src/main/java/examples/ProducerExample.java @@ -0,0 +1,55 @@ +package examples; + +import org.apache.kafka.clients.producer.*; + +import java.io.*; +import java.nio.file.*; +import java.util.*; + +public class ProducerExample { + + public static void main(final String[] args) throws IOException { + if (args.length != 1) { + System.out.println("Please provide the configuration file path as a command line argument"); + System.exit(1); + } + + // Load producer configuration settings from a local file + final Properties props = loadConfig(args[0]); + final String topic = "purchases"; + + String[] users = {"eabara", "jsmith", "sgarcia", "jbernard", "htanaka", "awalther"}; + String[] items = {"book", "alarm clock", "t-shirts", "gift card", "batteries"}; + try (final Producer producer = new KafkaProducer<>(props)) { + final Random rnd = new Random(); + final var numMessages = 10L; + for (var i = 0L; i < numMessages; i++) { + String user = users[rnd.nextInt(users.length)]; + String item = items[rnd.nextInt(items.length)]; + + producer.send( + new ProducerRecord<>(topic, user, item), + (event, ex) -> { + if (ex != null) + ex.printStackTrace(); + else + System.out.printf("Produced event to topic %s: key = %-10s value = %s%n", topic, user, item); + }); + } + System.out.printf("%s events were produced to topic %s%n", numMessages, topic); + } + + } + + // We'll reuse this function to load properties from the Consumer as well + public static Properties loadConfig(final String configFile) throws IOException { + if (!Files.exists(Paths.get(configFile))) { + throw new IOException(configFile + " not found."); + } + final Properties cfg = new Properties(); + try (InputStream inputStream = new FileInputStream(configFile)) { + cfg.load(inputStream); + } + return cfg; + } +} -- cgit v1.2.3