summaryrefslogtreecommitdiff
path: root/consumer.go
diff options
context:
space:
mode:
Diffstat (limited to 'consumer.go')
-rw-r--r--consumer.go13
1 files changed, 8 insertions, 5 deletions
diff --git a/consumer.go b/consumer.go
index c838b0c..a8c6346 100644
--- a/consumer.go
+++ b/consumer.go
@@ -2,17 +2,15 @@ package main
import (
"fmt"
+ "github.com/confluentinc/confluent-kafka-go/v2/kafka"
"kafka-sandbox/util"
"os"
"os/signal"
"syscall"
"time"
-
- "github.com/confluentinc/confluent-kafka-go/kafka"
)
func main() {
-
if len(os.Args) != 2 {
fmt.Fprintf(os.Stderr, "Usage: %s <config-file-path>\n",
os.Args[0])
@@ -50,8 +48,13 @@ func main() {
// Errors are informational and automatically handled by the consumer
continue
}
- fmt.Printf("Consumed event from topic %s: key = %-10s value = %s\n",
- *ev.TopicPartition.Topic, string(ev.Key), string(ev.Value))
+ fmt.Printf("%s %s %s: %+v, key = %-10s value = %s\n",
+ ev.TimestampType.String(),
+ ev.Timestamp.String(),
+ *ev.TopicPartition.Topic,
+ ev.Headers,
+ string(ev.Key),
+ string(ev.Value))
}
}