diff options
author | Trygve Laugstøl <trygvis@inamo.no> | 2024-02-07 14:02:22 +0100 |
---|---|---|
committer | Trygve Laugstøl <trygvis@inamo.no> | 2024-02-07 14:02:22 +0100 |
commit | acf44b68569f6615d373f05f92671afecb296af8 (patch) | |
tree | 337b49bb93ed8cc08096e803065c72f3e87f9642 | |
parent | c4d37d593262f8ff0053693513d1a5e397ff2b5d (diff) | |
download | kafka-sandbox-acf44b68569f6615d373f05f92671afecb296af8.tar.gz kafka-sandbox-acf44b68569f6615d373f05f92671afecb296af8.tar.bz2 kafka-sandbox-acf44b68569f6615d373f05f92671afecb296af8.tar.xz kafka-sandbox-acf44b68569f6615d373f05f92671afecb296af8.zip |
wip
-rw-r--r-- | README.md | 9 | ||||
-rw-r--r-- | consumer.go | 13 | ||||
-rw-r--r-- | getting-started.properties | 3 | ||||
-rw-r--r-- | go.mod | 2 | ||||
-rw-r--r-- | go.sum | 2 | ||||
-rw-r--r-- | init.sql | 0 | ||||
-rw-r--r-- | producer.go | 9 | ||||
-rw-r--r-- | util/util.go | 4 |
8 files changed, 29 insertions, 13 deletions
@@ -1,2 +1,11 @@ * Docker compose: https://github.com/confluentinc/cp-all-in-one/blob/7.5.3-post/cp-all-in-one-kraft/docker-compose.yml * Demo: https://developer.confluent.io/get-started/go + +# Start services + + . .settings.sh + docker-compose up -d + +Available services: + + * Control center: http://localhost:9021 diff --git a/consumer.go b/consumer.go index c838b0c..a8c6346 100644 --- a/consumer.go +++ b/consumer.go @@ -2,17 +2,15 @@ package main import ( "fmt" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" "kafka-sandbox/util" "os" "os/signal" "syscall" "time" - - "github.com/confluentinc/confluent-kafka-go/kafka" ) func main() { - if len(os.Args) != 2 { fmt.Fprintf(os.Stderr, "Usage: %s <config-file-path>\n", os.Args[0]) @@ -50,8 +48,13 @@ func main() { // Errors are informational and automatically handled by the consumer continue } - fmt.Printf("Consumed event from topic %s: key = %-10s value = %s\n", - *ev.TopicPartition.Topic, string(ev.Key), string(ev.Value)) + fmt.Printf("%s %s %s: %+v, key = %-10s value = %s\n", + ev.TimestampType.String(), + ev.Timestamp.String(), + *ev.TopicPartition.Topic, + ev.Headers, + string(ev.Key), + string(ev.Value)) } } diff --git a/getting-started.properties b/getting-started.properties index ea4ec5a..467d1c4 100644 --- a/getting-started.properties +++ b/getting-started.properties @@ -1 +1,2 @@ -bootstrap.servers={{ kafka.broker.server }} +bootstrap.servers=localhost + @@ -2,4 +2,4 @@ module kafka-sandbox go 1.22 -require github.com/confluentinc/confluent-kafka-go v1.9.2 +require github.com/confluentinc/confluent-kafka-go/v2 v2.3.0 @@ -19,6 +19,8 @@ github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWH github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/confluentinc/confluent-kafka-go v1.9.2 h1:gV/GxhMBUb03tFWkN+7kdhg+zf+QUM+wVkI9zwh770Q= github.com/confluentinc/confluent-kafka-go v1.9.2/go.mod h1:ptXNqsuDfYbAE/LBW6pnwWZElUoWxHoV8E43DCrliyo= +github.com/confluentinc/confluent-kafka-go/v2 v2.3.0 h1:icCHutJouWlQREayFwCc7lxDAhws08td+W3/gdqgZts= +github.com/confluentinc/confluent-kafka-go/v2 v2.3.0/go.mod h1:/VTy8iEpe6mD9pkCH5BhijlUl8ulUXymKv1Qig5Rgb8= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/init.sql b/init.sql new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/init.sql diff --git a/producer.go b/producer.go index 8cf1b0e..f5392ae 100644 --- a/producer.go +++ b/producer.go @@ -2,11 +2,11 @@ package main import ( "fmt" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" "kafka-sandbox/util" + "log" "math/rand" "os" - - "github.com/confluentinc/confluent-kafka-go/kafka" ) func main() { @@ -49,11 +49,14 @@ func main() { for n := 0; n < 10; n++ { key := users[rand.Intn(len(users))] data := items[rand.Intn(len(items))] - p.Produce(&kafka.Message{ + err := p.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Key: []byte(key), Value: []byte(data), }, nil) + if err != nil { + log.Fatalln(err) + } } // Wait for all messages to be delivered diff --git a/util/util.go b/util/util.go index 57c8246..d56f662 100644 --- a/util/util.go +++ b/util/util.go @@ -6,11 +6,10 @@ import ( "os" "strings" - "github.com/confluentinc/confluent-kafka-go/kafka" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" ) func ReadConfig(configFile string) kafka.ConfigMap { - m := make(map[string]kafka.ConfigValue) file, err := os.Open(configFile) @@ -39,5 +38,4 @@ func ReadConfig(configFile string) kafka.ConfigMap { } return m - } |