summaryrefslogtreecommitdiff
path: root/consumer.go
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2024-02-07 14:02:22 +0100
committerTrygve Laugstøl <trygvis@inamo.no>2024-02-07 14:02:22 +0100
commitacf44b68569f6615d373f05f92671afecb296af8 (patch)
tree337b49bb93ed8cc08096e803065c72f3e87f9642 /consumer.go
parentc4d37d593262f8ff0053693513d1a5e397ff2b5d (diff)
downloadkafka-sandbox-acf44b68569f6615d373f05f92671afecb296af8.tar.gz
kafka-sandbox-acf44b68569f6615d373f05f92671afecb296af8.tar.bz2
kafka-sandbox-acf44b68569f6615d373f05f92671afecb296af8.tar.xz
kafka-sandbox-acf44b68569f6615d373f05f92671afecb296af8.zip
wip
Diffstat (limited to 'consumer.go')
-rw-r--r--consumer.go13
1 files changed, 8 insertions, 5 deletions
diff --git a/consumer.go b/consumer.go
index c838b0c..a8c6346 100644
--- a/consumer.go
+++ b/consumer.go
@@ -2,17 +2,15 @@ package main
import (
"fmt"
+ "github.com/confluentinc/confluent-kafka-go/v2/kafka"
"kafka-sandbox/util"
"os"
"os/signal"
"syscall"
"time"
-
- "github.com/confluentinc/confluent-kafka-go/kafka"
)
func main() {
-
if len(os.Args) != 2 {
fmt.Fprintf(os.Stderr, "Usage: %s <config-file-path>\n",
os.Args[0])
@@ -50,8 +48,13 @@ func main() {
// Errors are informational and automatically handled by the consumer
continue
}
- fmt.Printf("Consumed event from topic %s: key = %-10s value = %s\n",
- *ev.TopicPartition.Topic, string(ev.Key), string(ev.Value))
+ fmt.Printf("%s %s %s: %+v, key = %-10s value = %s\n",
+ ev.TimestampType.String(),
+ ev.Timestamp.String(),
+ *ev.TopicPartition.Topic,
+ ev.Headers,
+ string(ev.Key),
+ string(ev.Value))
}
}