summaryrefslogtreecommitdiff
path: root/kafka-sandbox/src/main/java/examples/ProducerExample.java
diff options
context:
space:
mode:
Diffstat (limited to 'kafka-sandbox/src/main/java/examples/ProducerExample.java')
-rw-r--r--kafka-sandbox/src/main/java/examples/ProducerExample.java55
1 files changed, 55 insertions, 0 deletions
diff --git a/kafka-sandbox/src/main/java/examples/ProducerExample.java b/kafka-sandbox/src/main/java/examples/ProducerExample.java
new file mode 100644
index 0000000..3d185cd
--- /dev/null
+++ b/kafka-sandbox/src/main/java/examples/ProducerExample.java
@@ -0,0 +1,55 @@
+package examples;
+
+import org.apache.kafka.clients.producer.*;
+
+import java.io.*;
+import java.nio.file.*;
+import java.util.*;
+
+public class ProducerExample {
+
+ public static void main(final String[] args) throws IOException {
+ if (args.length != 1) {
+ System.out.println("Please provide the configuration file path as a command line argument");
+ System.exit(1);
+ }
+
+ // Load producer configuration settings from a local file
+ final Properties props = loadConfig(args[0]);
+ final String topic = "purchases";
+
+ String[] users = {"eabara", "jsmith", "sgarcia", "jbernard", "htanaka", "awalther"};
+ String[] items = {"book", "alarm clock", "t-shirts", "gift card", "batteries"};
+ try (final Producer<String, String> producer = new KafkaProducer<>(props)) {
+ final Random rnd = new Random();
+ final var numMessages = 10L;
+ for (var i = 0L; i < numMessages; i++) {
+ String user = users[rnd.nextInt(users.length)];
+ String item = items[rnd.nextInt(items.length)];
+
+ producer.send(
+ new ProducerRecord<>(topic, user, item),
+ (event, ex) -> {
+ if (ex != null)
+ ex.printStackTrace();
+ else
+ System.out.printf("Produced event to topic %s: key = %-10s value = %s%n", topic, user, item);
+ });
+ }
+ System.out.printf("%s events were produced to topic %s%n", numMessages, topic);
+ }
+
+ }
+
+ // We'll reuse this function to load properties from the Consumer as well
+ public static Properties loadConfig(final String configFile) throws IOException {
+ if (!Files.exists(Paths.get(configFile))) {
+ throw new IOException(configFile + " not found.");
+ }
+ final Properties cfg = new Properties();
+ try (InputStream inputStream = new FileInputStream(configFile)) {
+ cfg.load(inputStream);
+ }
+ return cfg;
+ }
+}