diff options
Diffstat (limited to 'kafka-sandbox/src/main/java/examples/ConsumerExample.java')
-rw-r--r-- | kafka-sandbox/src/main/java/examples/ConsumerExample.java | 38 |
1 files changed, 38 insertions, 0 deletions
diff --git a/kafka-sandbox/src/main/java/examples/ConsumerExample.java b/kafka-sandbox/src/main/java/examples/ConsumerExample.java new file mode 100644 index 0000000..e491035 --- /dev/null +++ b/kafka-sandbox/src/main/java/examples/ConsumerExample.java @@ -0,0 +1,38 @@ +package examples; + +import org.apache.kafka.clients.consumer.*; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Properties; + +public class ConsumerExample { + public static void main(final String[] args) throws Exception { + if (args.length != 1) { + System.out.println("Please provide the configuration file path as a command line argument"); + System.exit(1); + } + + final String topic = "purchases"; + + // Load consumer configuration settings from a local file + // Reusing the loadConfig method from the ProducerExample class + final Properties props = ProducerExample.loadConfig(args[0]); + + // Add additional properties. + props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-java-getting-started"); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + try (final Consumer<String, String> consumer = new KafkaConsumer<>(props)) { + consumer.subscribe(Arrays.asList(topic)); + while (true) { + ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord<String, String> record : records) { + String key = record.key(); + String value = record.value(); + System.out.println(String.format("Consumed event from topic %s: key = %-10s value = %s", topic, key, value)); + } + } + } + } +} |