summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2024-02-07 19:12:07 +0100
committerTrygve Laugstøl <trygvis@inamo.no>2024-02-07 19:12:07 +0100
commitdf50aaee6c6ef4b51638fe22317e28234ffe8acc (patch)
tree6cae8794fa21a893c8751643a29c4ede6fe0ef94
parent5200eb197082495227fb9d19f4303a8e3edee948 (diff)
downloadkafka-sandbox-df50aaee6c6ef4b51638fe22317e28234ffe8acc.tar.gz
kafka-sandbox-df50aaee6c6ef4b51638fe22317e28234ffe8acc.tar.bz2
kafka-sandbox-df50aaee6c6ef4b51638fe22317e28234ffe8acc.tar.xz
kafka-sandbox-df50aaee6c6ef4b51638fe22317e28234ffe8acc.zip
wip
-rw-r--r--consumer.go74
-rw-r--r--producer.go1
2 files changed, 51 insertions, 24 deletions
diff --git a/consumer.go b/consumer.go
index b9c1b45..4047c4e 100644
--- a/consumer.go
+++ b/consumer.go
@@ -4,10 +4,10 @@ import (
"fmt"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"kafka-sandbox/util"
+ "log"
"os"
"os/signal"
"syscall"
- "time"
)
func main() {
@@ -23,23 +23,11 @@ func main() {
conf["auto.offset.reset"] = "earliest"
c, err := kafka.NewConsumer(&conf)
-
if err != nil {
fmt.Printf("Failed to create consumer: %s", err)
os.Exit(1)
}
- partitions, err := c.Assignment()
- if err != nil {
- fmt.Printf("Failed to get assignments: %s", err)
- os.Exit(1)
- }
-
- fmt.Printf("Consuming %d partitions: \n", len(partitions))
- for i, partition := range partitions {
- fmt.Printf(" #%d: %s\n", i+1, partition.String())
- }
-
topic := "purchases"
err = c.SubscribeTopics([]string{topic}, nil)
// Set up a channel for handling Ctrl-C, etc
@@ -54,20 +42,60 @@ func main() {
fmt.Printf("Caught signal %v: terminating\n", sig)
run = false
default:
- ev, err := c.ReadMessage(100 * time.Millisecond)
- if err != nil {
- // Errors are informational and automatically handled by the consumer
+ ev := c.Poll(100)
+ if ev == nil {
continue
}
- fmt.Printf("%s %s %s: %+v, key = %-10s value = %s\n",
- ev.TimestampType.String(),
- ev.Timestamp.String(),
- *ev.TopicPartition.Topic,
- ev.Headers,
- string(ev.Key),
- string(ev.Value))
+
+ switch e := ev.(type) {
+ case *kafka.Message:
+ if e.TopicPartition.Error != nil {
+ onTopicPartitionError(e)
+ } else {
+ onMessage(e)
+ }
+ case kafka.OffsetsCommitted:
+ log.Println("Offsets committed")
+ for i, offset := range e.Offsets {
+ log.Printf(" %d: %s\n", i, offset.String())
+ }
+ case kafka.Error:
+ log.Fatalln(e)
+ default:
+ log.Printf("Unhandled message type: %s\n", e.String())
+ }
}
}
c.Close()
}
+
+func onMessage(ev *kafka.Message) {
+ fmt.Printf("-------- %s\n", ev.TopicPartition.String())
+ fmt.Printf("%s %s: %s\n",
+ ev.TimestampType.String(),
+ ev.Timestamp.String(),
+ string(ev.Key))
+ for _, header := range ev.Headers {
+ fmt.Printf("%s: %s\n", header.Key, string(header.Value))
+ }
+ fmt.Printf("%s\n", string(ev.Value))
+}
+
+func onTopicPartitionError(e *kafka.Message) {
+ fmt.Printf("Partition error: %s\n", e.TopicPartition.String())
+}
+
+func showAssignment(err error, c *kafka.Consumer) error {
+ partitions, err := c.Assignment()
+ if err != nil {
+ return err
+ }
+
+ fmt.Printf("Consuming %d partitions: \n", len(partitions))
+ for i, partition := range partitions {
+ fmt.Printf(" #%d: %s\n", i+1, partition.String())
+ }
+
+ return nil
+}
diff --git a/producer.go b/producer.go
index f5392ae..2eb33d1 100644
--- a/producer.go
+++ b/producer.go
@@ -21,7 +21,6 @@ func main() {
topic := "purchases"
p, err := kafka.NewProducer(&conf)
-
if err != nil {
fmt.Printf("Failed to create producer: %s", err)
os.Exit(1)