From df50aaee6c6ef4b51638fe22317e28234ffe8acc Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Wed, 7 Feb 2024 19:12:07 +0100 Subject: wip --- consumer.go | 74 ++++++++++++++++++++++++++++++++++++++++++------------------- producer.go | 1 - 2 files changed, 51 insertions(+), 24 deletions(-) diff --git a/consumer.go b/consumer.go index b9c1b45..4047c4e 100644 --- a/consumer.go +++ b/consumer.go @@ -4,10 +4,10 @@ import ( "fmt" "github.com/confluentinc/confluent-kafka-go/v2/kafka" "kafka-sandbox/util" + "log" "os" "os/signal" "syscall" - "time" ) func main() { @@ -23,23 +23,11 @@ func main() { conf["auto.offset.reset"] = "earliest" c, err := kafka.NewConsumer(&conf) - if err != nil { fmt.Printf("Failed to create consumer: %s", err) os.Exit(1) } - partitions, err := c.Assignment() - if err != nil { - fmt.Printf("Failed to get assignments: %s", err) - os.Exit(1) - } - - fmt.Printf("Consuming %d partitions: \n", len(partitions)) - for i, partition := range partitions { - fmt.Printf(" #%d: %s\n", i+1, partition.String()) - } - topic := "purchases" err = c.SubscribeTopics([]string{topic}, nil) // Set up a channel for handling Ctrl-C, etc @@ -54,20 +42,60 @@ func main() { fmt.Printf("Caught signal %v: terminating\n", sig) run = false default: - ev, err := c.ReadMessage(100 * time.Millisecond) - if err != nil { - // Errors are informational and automatically handled by the consumer + ev := c.Poll(100) + if ev == nil { continue } - fmt.Printf("%s %s %s: %+v, key = %-10s value = %s\n", - ev.TimestampType.String(), - ev.Timestamp.String(), - *ev.TopicPartition.Topic, - ev.Headers, - string(ev.Key), - string(ev.Value)) + + switch e := ev.(type) { + case *kafka.Message: + if e.TopicPartition.Error != nil { + onTopicPartitionError(e) + } else { + onMessage(e) + } + case kafka.OffsetsCommitted: + log.Println("Offsets committed") + for i, offset := range e.Offsets { + log.Printf(" %d: %s\n", i, offset.String()) + } + case kafka.Error: + log.Fatalln(e) + default: + log.Printf("Unhandled message type: %s\n", e.String()) + } } } c.Close() } + +func onMessage(ev *kafka.Message) { + fmt.Printf("-------- %s\n", ev.TopicPartition.String()) + fmt.Printf("%s %s: %s\n", + ev.TimestampType.String(), + ev.Timestamp.String(), + string(ev.Key)) + for _, header := range ev.Headers { + fmt.Printf("%s: %s\n", header.Key, string(header.Value)) + } + fmt.Printf("%s\n", string(ev.Value)) +} + +func onTopicPartitionError(e *kafka.Message) { + fmt.Printf("Partition error: %s\n", e.TopicPartition.String()) +} + +func showAssignment(err error, c *kafka.Consumer) error { + partitions, err := c.Assignment() + if err != nil { + return err + } + + fmt.Printf("Consuming %d partitions: \n", len(partitions)) + for i, partition := range partitions { + fmt.Printf(" #%d: %s\n", i+1, partition.String()) + } + + return nil +} diff --git a/producer.go b/producer.go index f5392ae..2eb33d1 100644 --- a/producer.go +++ b/producer.go @@ -21,7 +21,6 @@ func main() { topic := "purchases" p, err := kafka.NewProducer(&conf) - if err != nil { fmt.Printf("Failed to create producer: %s", err) os.Exit(1) -- cgit v1.2.3