summaryrefslogtreecommitdiff
path: root/go/consumer.go
diff options
context:
space:
mode:
Diffstat (limited to 'go/consumer.go')
-rw-r--r--go/consumer.go101
1 files changed, 101 insertions, 0 deletions
diff --git a/go/consumer.go b/go/consumer.go
new file mode 100644
index 0000000..4047c4e
--- /dev/null
+++ b/go/consumer.go
@@ -0,0 +1,101 @@
+package main
+
+import (
+ "fmt"
+ "github.com/confluentinc/confluent-kafka-go/v2/kafka"
+ "kafka-sandbox/util"
+ "log"
+ "os"
+ "os/signal"
+ "syscall"
+)
+
+func main() {
+ if len(os.Args) != 2 {
+ fmt.Fprintf(os.Stderr, "Usage: %s <config-file-path>\n",
+ os.Args[0])
+ os.Exit(1)
+ }
+
+ configFile := os.Args[1]
+ conf := util.ReadConfig(configFile)
+ conf["group.id"] = "kafka-go-getting-started"
+ conf["auto.offset.reset"] = "earliest"
+
+ c, err := kafka.NewConsumer(&conf)
+ if err != nil {
+ fmt.Printf("Failed to create consumer: %s", err)
+ os.Exit(1)
+ }
+
+ topic := "purchases"
+ err = c.SubscribeTopics([]string{topic}, nil)
+ // Set up a channel for handling Ctrl-C, etc
+ sigchan := make(chan os.Signal, 1)
+ signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
+
+ // Process messages
+ run := true
+ for run {
+ select {
+ case sig := <-sigchan:
+ fmt.Printf("Caught signal %v: terminating\n", sig)
+ run = false
+ default:
+ ev := c.Poll(100)
+ if ev == nil {
+ continue
+ }
+
+ switch e := ev.(type) {
+ case *kafka.Message:
+ if e.TopicPartition.Error != nil {
+ onTopicPartitionError(e)
+ } else {
+ onMessage(e)
+ }
+ case kafka.OffsetsCommitted:
+ log.Println("Offsets committed")
+ for i, offset := range e.Offsets {
+ log.Printf(" %d: %s\n", i, offset.String())
+ }
+ case kafka.Error:
+ log.Fatalln(e)
+ default:
+ log.Printf("Unhandled message type: %s\n", e.String())
+ }
+ }
+ }
+
+ c.Close()
+}
+
+func onMessage(ev *kafka.Message) {
+ fmt.Printf("-------- %s\n", ev.TopicPartition.String())
+ fmt.Printf("%s %s: %s\n",
+ ev.TimestampType.String(),
+ ev.Timestamp.String(),
+ string(ev.Key))
+ for _, header := range ev.Headers {
+ fmt.Printf("%s: %s\n", header.Key, string(header.Value))
+ }
+ fmt.Printf("%s\n", string(ev.Value))
+}
+
+func onTopicPartitionError(e *kafka.Message) {
+ fmt.Printf("Partition error: %s\n", e.TopicPartition.String())
+}
+
+func showAssignment(err error, c *kafka.Consumer) error {
+ partitions, err := c.Assignment()
+ if err != nil {
+ return err
+ }
+
+ fmt.Printf("Consuming %d partitions: \n", len(partitions))
+ for i, partition := range partitions {
+ fmt.Printf(" #%d: %s\n", i+1, partition.String())
+ }
+
+ return nil
+}