diff options
Diffstat (limited to 'go/consumer.go')
-rw-r--r-- | go/consumer.go | 101 |
1 files changed, 101 insertions, 0 deletions
diff --git a/go/consumer.go b/go/consumer.go new file mode 100644 index 0000000..4047c4e --- /dev/null +++ b/go/consumer.go @@ -0,0 +1,101 @@ +package main + +import ( + "fmt" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" + "kafka-sandbox/util" + "log" + "os" + "os/signal" + "syscall" +) + +func main() { + if len(os.Args) != 2 { + fmt.Fprintf(os.Stderr, "Usage: %s <config-file-path>\n", + os.Args[0]) + os.Exit(1) + } + + configFile := os.Args[1] + conf := util.ReadConfig(configFile) + conf["group.id"] = "kafka-go-getting-started" + conf["auto.offset.reset"] = "earliest" + + c, err := kafka.NewConsumer(&conf) + if err != nil { + fmt.Printf("Failed to create consumer: %s", err) + os.Exit(1) + } + + topic := "purchases" + err = c.SubscribeTopics([]string{topic}, nil) + // Set up a channel for handling Ctrl-C, etc + sigchan := make(chan os.Signal, 1) + signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) + + // Process messages + run := true + for run { + select { + case sig := <-sigchan: + fmt.Printf("Caught signal %v: terminating\n", sig) + run = false + default: + ev := c.Poll(100) + if ev == nil { + continue + } + + switch e := ev.(type) { + case *kafka.Message: + if e.TopicPartition.Error != nil { + onTopicPartitionError(e) + } else { + onMessage(e) + } + case kafka.OffsetsCommitted: + log.Println("Offsets committed") + for i, offset := range e.Offsets { + log.Printf(" %d: %s\n", i, offset.String()) + } + case kafka.Error: + log.Fatalln(e) + default: + log.Printf("Unhandled message type: %s\n", e.String()) + } + } + } + + c.Close() +} + +func onMessage(ev *kafka.Message) { + fmt.Printf("-------- %s\n", ev.TopicPartition.String()) + fmt.Printf("%s %s: %s\n", + ev.TimestampType.String(), + ev.Timestamp.String(), + string(ev.Key)) + for _, header := range ev.Headers { + fmt.Printf("%s: %s\n", header.Key, string(header.Value)) + } + fmt.Printf("%s\n", string(ev.Value)) +} + +func onTopicPartitionError(e *kafka.Message) { + fmt.Printf("Partition error: %s\n", e.TopicPartition.String()) +} + +func showAssignment(err error, c *kafka.Consumer) error { + partitions, err := c.Assignment() + if err != nil { + return err + } + + fmt.Printf("Consuming %d partitions: \n", len(partitions)) + for i, partition := range partitions { + fmt.Printf(" #%d: %s\n", i+1, partition.String()) + } + + return nil +} |