summaryrefslogtreecommitdiff
path: root/kafka-sandbox/src/main/java/examples/ConsumerExample.java
diff options
context:
space:
mode:
Diffstat (limited to 'kafka-sandbox/src/main/java/examples/ConsumerExample.java')
-rw-r--r--kafka-sandbox/src/main/java/examples/ConsumerExample.java38
1 files changed, 38 insertions, 0 deletions
diff --git a/kafka-sandbox/src/main/java/examples/ConsumerExample.java b/kafka-sandbox/src/main/java/examples/ConsumerExample.java
new file mode 100644
index 0000000..e491035
--- /dev/null
+++ b/kafka-sandbox/src/main/java/examples/ConsumerExample.java
@@ -0,0 +1,38 @@
+package examples;
+
+import org.apache.kafka.clients.consumer.*;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Properties;
+
+public class ConsumerExample {
+ public static void main(final String[] args) throws Exception {
+ if (args.length != 1) {
+ System.out.println("Please provide the configuration file path as a command line argument");
+ System.exit(1);
+ }
+
+ final String topic = "purchases";
+
+ // Load consumer configuration settings from a local file
+ // Reusing the loadConfig method from the ProducerExample class
+ final Properties props = ProducerExample.loadConfig(args[0]);
+
+ // Add additional properties.
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-java-getting-started");
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+ try (final Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
+ consumer.subscribe(Arrays.asList(topic));
+ while (true) {
+ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
+ for (ConsumerRecord<String, String> record : records) {
+ String key = record.key();
+ String value = record.value();
+ System.out.println(String.format("Consumed event from topic %s: key = %-10s value = %s", topic, key, value));
+ }
+ }
+ }
+ }
+}