summaryrefslogtreecommitdiff
path: root/consumer.go
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2024-02-07 19:14:47 +0100
committerTrygve Laugstøl <trygvis@inamo.no>2024-02-07 19:14:47 +0100
commita457621e444b4e0719c076a172a62d823616a122 (patch)
treedf052ecf303a23d25f7300bd6e5db13fa2797805 /consumer.go
parentdf50aaee6c6ef4b51638fe22317e28234ffe8acc (diff)
downloadkafka-sandbox-a457621e444b4e0719c076a172a62d823616a122.tar.gz
kafka-sandbox-a457621e444b4e0719c076a172a62d823616a122.tar.bz2
kafka-sandbox-a457621e444b4e0719c076a172a62d823616a122.tar.xz
kafka-sandbox-a457621e444b4e0719c076a172a62d823616a122.zip
wip
Diffstat (limited to 'consumer.go')
-rw-r--r--consumer.go101
1 files changed, 0 insertions, 101 deletions
diff --git a/consumer.go b/consumer.go
deleted file mode 100644
index 4047c4e..0000000
--- a/consumer.go
+++ /dev/null
@@ -1,101 +0,0 @@
-package main
-
-import (
- "fmt"
- "github.com/confluentinc/confluent-kafka-go/v2/kafka"
- "kafka-sandbox/util"
- "log"
- "os"
- "os/signal"
- "syscall"
-)
-
-func main() {
- if len(os.Args) != 2 {
- fmt.Fprintf(os.Stderr, "Usage: %s <config-file-path>\n",
- os.Args[0])
- os.Exit(1)
- }
-
- configFile := os.Args[1]
- conf := util.ReadConfig(configFile)
- conf["group.id"] = "kafka-go-getting-started"
- conf["auto.offset.reset"] = "earliest"
-
- c, err := kafka.NewConsumer(&conf)
- if err != nil {
- fmt.Printf("Failed to create consumer: %s", err)
- os.Exit(1)
- }
-
- topic := "purchases"
- err = c.SubscribeTopics([]string{topic}, nil)
- // Set up a channel for handling Ctrl-C, etc
- sigchan := make(chan os.Signal, 1)
- signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
-
- // Process messages
- run := true
- for run {
- select {
- case sig := <-sigchan:
- fmt.Printf("Caught signal %v: terminating\n", sig)
- run = false
- default:
- ev := c.Poll(100)
- if ev == nil {
- continue
- }
-
- switch e := ev.(type) {
- case *kafka.Message:
- if e.TopicPartition.Error != nil {
- onTopicPartitionError(e)
- } else {
- onMessage(e)
- }
- case kafka.OffsetsCommitted:
- log.Println("Offsets committed")
- for i, offset := range e.Offsets {
- log.Printf(" %d: %s\n", i, offset.String())
- }
- case kafka.Error:
- log.Fatalln(e)
- default:
- log.Printf("Unhandled message type: %s\n", e.String())
- }
- }
- }
-
- c.Close()
-}
-
-func onMessage(ev *kafka.Message) {
- fmt.Printf("-------- %s\n", ev.TopicPartition.String())
- fmt.Printf("%s %s: %s\n",
- ev.TimestampType.String(),
- ev.Timestamp.String(),
- string(ev.Key))
- for _, header := range ev.Headers {
- fmt.Printf("%s: %s\n", header.Key, string(header.Value))
- }
- fmt.Printf("%s\n", string(ev.Value))
-}
-
-func onTopicPartitionError(e *kafka.Message) {
- fmt.Printf("Partition error: %s\n", e.TopicPartition.String())
-}
-
-func showAssignment(err error, c *kafka.Consumer) error {
- partitions, err := c.Assignment()
- if err != nil {
- return err
- }
-
- fmt.Printf("Consuming %d partitions: \n", len(partitions))
- for i, partition := range partitions {
- fmt.Printf(" #%d: %s\n", i+1, partition.String())
- }
-
- return nil
-}