From acf44b68569f6615d373f05f92671afecb296af8 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Wed, 7 Feb 2024 14:02:22 +0100 Subject: wip --- README.md | 9 +++++++++ consumer.go | 13 ++++++++----- getting-started.properties | 3 ++- go.mod | 2 +- go.sum | 2 ++ init.sql | 0 producer.go | 9 ++++++--- util/util.go | 4 +--- 8 files changed, 29 insertions(+), 13 deletions(-) create mode 100644 init.sql diff --git a/README.md b/README.md index 35ffa4f..0cf77f8 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,11 @@ * Docker compose: https://github.com/confluentinc/cp-all-in-one/blob/7.5.3-post/cp-all-in-one-kraft/docker-compose.yml * Demo: https://developer.confluent.io/get-started/go + +# Start services + + . .settings.sh + docker-compose up -d + +Available services: + + * Control center: http://localhost:9021 diff --git a/consumer.go b/consumer.go index c838b0c..a8c6346 100644 --- a/consumer.go +++ b/consumer.go @@ -2,17 +2,15 @@ package main import ( "fmt" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" "kafka-sandbox/util" "os" "os/signal" "syscall" "time" - - "github.com/confluentinc/confluent-kafka-go/kafka" ) func main() { - if len(os.Args) != 2 { fmt.Fprintf(os.Stderr, "Usage: %s \n", os.Args[0]) @@ -50,8 +48,13 @@ func main() { // Errors are informational and automatically handled by the consumer continue } - fmt.Printf("Consumed event from topic %s: key = %-10s value = %s\n", - *ev.TopicPartition.Topic, string(ev.Key), string(ev.Value)) + fmt.Printf("%s %s %s: %+v, key = %-10s value = %s\n", + ev.TimestampType.String(), + ev.Timestamp.String(), + *ev.TopicPartition.Topic, + ev.Headers, + string(ev.Key), + string(ev.Value)) } } diff --git a/getting-started.properties b/getting-started.properties index ea4ec5a..467d1c4 100644 --- a/getting-started.properties +++ b/getting-started.properties @@ -1 +1,2 @@ -bootstrap.servers={{ kafka.broker.server }} +bootstrap.servers=localhost + diff --git a/go.mod b/go.mod index f407cb9..8cedea6 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,4 @@ module kafka-sandbox go 1.22 -require github.com/confluentinc/confluent-kafka-go v1.9.2 +require github.com/confluentinc/confluent-kafka-go/v2 v2.3.0 diff --git a/go.sum b/go.sum index 8086547..a9c2f29 100644 --- a/go.sum +++ b/go.sum @@ -19,6 +19,8 @@ github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWH github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/confluentinc/confluent-kafka-go v1.9.2 h1:gV/GxhMBUb03tFWkN+7kdhg+zf+QUM+wVkI9zwh770Q= github.com/confluentinc/confluent-kafka-go v1.9.2/go.mod h1:ptXNqsuDfYbAE/LBW6pnwWZElUoWxHoV8E43DCrliyo= +github.com/confluentinc/confluent-kafka-go/v2 v2.3.0 h1:icCHutJouWlQREayFwCc7lxDAhws08td+W3/gdqgZts= +github.com/confluentinc/confluent-kafka-go/v2 v2.3.0/go.mod h1:/VTy8iEpe6mD9pkCH5BhijlUl8ulUXymKv1Qig5Rgb8= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/init.sql b/init.sql new file mode 100644 index 0000000..e69de29 diff --git a/producer.go b/producer.go index 8cf1b0e..f5392ae 100644 --- a/producer.go +++ b/producer.go @@ -2,11 +2,11 @@ package main import ( "fmt" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" "kafka-sandbox/util" + "log" "math/rand" "os" - - "github.com/confluentinc/confluent-kafka-go/kafka" ) func main() { @@ -49,11 +49,14 @@ func main() { for n := 0; n < 10; n++ { key := users[rand.Intn(len(users))] data := items[rand.Intn(len(items))] - p.Produce(&kafka.Message{ + err := p.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Key: []byte(key), Value: []byte(data), }, nil) + if err != nil { + log.Fatalln(err) + } } // Wait for all messages to be delivered diff --git a/util/util.go b/util/util.go index 57c8246..d56f662 100644 --- a/util/util.go +++ b/util/util.go @@ -6,11 +6,10 @@ import ( "os" "strings" - "github.com/confluentinc/confluent-kafka-go/kafka" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" ) func ReadConfig(configFile string) kafka.ConfigMap { - m := make(map[string]kafka.ConfigValue) file, err := os.Open(configFile) @@ -39,5 +38,4 @@ func ReadConfig(configFile string) kafka.ConfigMap { } return m - } -- cgit v1.2.3