From a457621e444b4e0719c076a172a62d823616a122 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Wed, 7 Feb 2024 19:14:47 +0100 Subject: wip --- consumer.go | 101 ------------------------------------------------------------ 1 file changed, 101 deletions(-) delete mode 100644 consumer.go (limited to 'consumer.go') diff --git a/consumer.go b/consumer.go deleted file mode 100644 index 4047c4e..0000000 --- a/consumer.go +++ /dev/null @@ -1,101 +0,0 @@ -package main - -import ( - "fmt" - "github.com/confluentinc/confluent-kafka-go/v2/kafka" - "kafka-sandbox/util" - "log" - "os" - "os/signal" - "syscall" -) - -func main() { - if len(os.Args) != 2 { - fmt.Fprintf(os.Stderr, "Usage: %s \n", - os.Args[0]) - os.Exit(1) - } - - configFile := os.Args[1] - conf := util.ReadConfig(configFile) - conf["group.id"] = "kafka-go-getting-started" - conf["auto.offset.reset"] = "earliest" - - c, err := kafka.NewConsumer(&conf) - if err != nil { - fmt.Printf("Failed to create consumer: %s", err) - os.Exit(1) - } - - topic := "purchases" - err = c.SubscribeTopics([]string{topic}, nil) - // Set up a channel for handling Ctrl-C, etc - sigchan := make(chan os.Signal, 1) - signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) - - // Process messages - run := true - for run { - select { - case sig := <-sigchan: - fmt.Printf("Caught signal %v: terminating\n", sig) - run = false - default: - ev := c.Poll(100) - if ev == nil { - continue - } - - switch e := ev.(type) { - case *kafka.Message: - if e.TopicPartition.Error != nil { - onTopicPartitionError(e) - } else { - onMessage(e) - } - case kafka.OffsetsCommitted: - log.Println("Offsets committed") - for i, offset := range e.Offsets { - log.Printf(" %d: %s\n", i, offset.String()) - } - case kafka.Error: - log.Fatalln(e) - default: - log.Printf("Unhandled message type: %s\n", e.String()) - } - } - } - - c.Close() -} - -func onMessage(ev *kafka.Message) { - fmt.Printf("-------- %s\n", ev.TopicPartition.String()) - fmt.Printf("%s %s: %s\n", - ev.TimestampType.String(), - ev.Timestamp.String(), - string(ev.Key)) - for _, header := range ev.Headers { - fmt.Printf("%s: %s\n", header.Key, string(header.Value)) - } - fmt.Printf("%s\n", string(ev.Value)) -} - -func onTopicPartitionError(e *kafka.Message) { - fmt.Printf("Partition error: %s\n", e.TopicPartition.String()) -} - -func showAssignment(err error, c *kafka.Consumer) error { - partitions, err := c.Assignment() - if err != nil { - return err - } - - fmt.Printf("Consuming %d partitions: \n", len(partitions)) - for i, partition := range partitions { - fmt.Printf(" #%d: %s\n", i+1, partition.String()) - } - - return nil -} -- cgit v1.2.3