summaryrefslogtreecommitdiff
path: root/kafka-sandbox/src/main/java/examples/ProducerExample.java
diff options
context:
space:
mode:
Diffstat (limited to 'kafka-sandbox/src/main/java/examples/ProducerExample.java')
-rw-r--r--kafka-sandbox/src/main/java/examples/ProducerExample.java49
1 files changed, 23 insertions, 26 deletions
diff --git a/kafka-sandbox/src/main/java/examples/ProducerExample.java b/kafka-sandbox/src/main/java/examples/ProducerExample.java
index 3d185cd..26879d3 100644
--- a/kafka-sandbox/src/main/java/examples/ProducerExample.java
+++ b/kafka-sandbox/src/main/java/examples/ProducerExample.java
@@ -1,55 +1,52 @@
package examples;
-import org.apache.kafka.clients.producer.*;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
-import java.io.*;
-import java.nio.file.*;
-import java.util.*;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Properties;
+import java.util.Random;
public class ProducerExample {
-
- public static void main(final String[] args) throws IOException {
+ public static void main(String[] args) throws IOException {
if (args.length != 1) {
System.out.println("Please provide the configuration file path as a command line argument");
System.exit(1);
}
// Load producer configuration settings from a local file
- final Properties props = loadConfig(args[0]);
- final String topic = "purchases";
-
- String[] users = {"eabara", "jsmith", "sgarcia", "jbernard", "htanaka", "awalther"};
- String[] items = {"book", "alarm clock", "t-shirts", "gift card", "batteries"};
- try (final Producer<String, String> producer = new KafkaProducer<>(props)) {
- final Random rnd = new Random();
- final var numMessages = 10L;
+ var props = loadConfig(args[0]);
+ var topic = "purchases";
+
+ var users = new String[]{"eabara", "jsmith", "sgarcia", "jbernard", "htanaka", "awalther"};
+ var domains = new String[]{"gmail.com", "yahoo.com", "sol.no", "online.no", "hotmail.com"};
+ try (var producer = new KafkaProducer<>(props)) {
+ var rnd = new Random();
+ var numMessages = 10L;
for (var i = 0L; i < numMessages; i++) {
- String user = users[rnd.nextInt(users.length)];
- String item = items[rnd.nextInt(items.length)];
+ var user = users[rnd.nextInt(users.length)];
+ var domain = domains[rnd.nextInt(domains.length)];
producer.send(
- new ProducerRecord<>(topic, user, item),
+ new ProducerRecord<>(topic, user, user + "@" + domain),
(event, ex) -> {
if (ex != null)
ex.printStackTrace();
else
- System.out.printf("Produced event to topic %s: key = %-10s value = %s%n", topic, user, item);
+ System.out.printf("Produced event to topic %s: key = %-10s value = %s%n", topic, user, domain);
});
}
System.out.printf("%s events were produced to topic %s%n", numMessages, topic);
}
-
}
// We'll reuse this function to load properties from the Consumer as well
- public static Properties loadConfig(final String configFile) throws IOException {
- if (!Files.exists(Paths.get(configFile))) {
- throw new IOException(configFile + " not found.");
- }
- final Properties cfg = new Properties();
- try (InputStream inputStream = new FileInputStream(configFile)) {
+ public static Properties loadConfig(String configFile) throws IOException {
+ try (var inputStream = new FileInputStream(configFile)) {
+ var cfg = new Properties();
cfg.load(inputStream);
+ return cfg;
}
- return cfg;
}
}