diff options
Diffstat (limited to 'kafka-sandbox/src/main')
-rw-r--r-- | kafka-sandbox/src/main/java/examples/ConsumerExample.java | 38 | ||||
-rw-r--r-- | kafka-sandbox/src/main/java/examples/ProducerExample.java | 55 |
2 files changed, 93 insertions, 0 deletions
diff --git a/kafka-sandbox/src/main/java/examples/ConsumerExample.java b/kafka-sandbox/src/main/java/examples/ConsumerExample.java new file mode 100644 index 0000000..e491035 --- /dev/null +++ b/kafka-sandbox/src/main/java/examples/ConsumerExample.java @@ -0,0 +1,38 @@ +package examples; + +import org.apache.kafka.clients.consumer.*; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Properties; + +public class ConsumerExample { + public static void main(final String[] args) throws Exception { + if (args.length != 1) { + System.out.println("Please provide the configuration file path as a command line argument"); + System.exit(1); + } + + final String topic = "purchases"; + + // Load consumer configuration settings from a local file + // Reusing the loadConfig method from the ProducerExample class + final Properties props = ProducerExample.loadConfig(args[0]); + + // Add additional properties. + props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-java-getting-started"); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + try (final Consumer<String, String> consumer = new KafkaConsumer<>(props)) { + consumer.subscribe(Arrays.asList(topic)); + while (true) { + ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord<String, String> record : records) { + String key = record.key(); + String value = record.value(); + System.out.println(String.format("Consumed event from topic %s: key = %-10s value = %s", topic, key, value)); + } + } + } + } +} diff --git a/kafka-sandbox/src/main/java/examples/ProducerExample.java b/kafka-sandbox/src/main/java/examples/ProducerExample.java new file mode 100644 index 0000000..3d185cd --- /dev/null +++ b/kafka-sandbox/src/main/java/examples/ProducerExample.java @@ -0,0 +1,55 @@ +package examples; + +import org.apache.kafka.clients.producer.*; + +import java.io.*; +import java.nio.file.*; +import java.util.*; + +public class ProducerExample { + + public static void main(final String[] args) throws IOException { + if (args.length != 1) { + System.out.println("Please provide the configuration file path as a command line argument"); + System.exit(1); + } + + // Load producer configuration settings from a local file + final Properties props = loadConfig(args[0]); + final String topic = "purchases"; + + String[] users = {"eabara", "jsmith", "sgarcia", "jbernard", "htanaka", "awalther"}; + String[] items = {"book", "alarm clock", "t-shirts", "gift card", "batteries"}; + try (final Producer<String, String> producer = new KafkaProducer<>(props)) { + final Random rnd = new Random(); + final var numMessages = 10L; + for (var i = 0L; i < numMessages; i++) { + String user = users[rnd.nextInt(users.length)]; + String item = items[rnd.nextInt(items.length)]; + + producer.send( + new ProducerRecord<>(topic, user, item), + (event, ex) -> { + if (ex != null) + ex.printStackTrace(); + else + System.out.printf("Produced event to topic %s: key = %-10s value = %s%n", topic, user, item); + }); + } + System.out.printf("%s events were produced to topic %s%n", numMessages, topic); + } + + } + + // We'll reuse this function to load properties from the Consumer as well + public static Properties loadConfig(final String configFile) throws IOException { + if (!Files.exists(Paths.get(configFile))) { + throw new IOException(configFile + " not found."); + } + final Properties cfg = new Properties(); + try (InputStream inputStream = new FileInputStream(configFile)) { + cfg.load(inputStream); + } + return cfg; + } +} |