summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2024-02-07 14:02:22 +0100
committerTrygve Laugstøl <trygvis@inamo.no>2024-02-07 14:02:22 +0100
commitacf44b68569f6615d373f05f92671afecb296af8 (patch)
tree337b49bb93ed8cc08096e803065c72f3e87f9642
parentc4d37d593262f8ff0053693513d1a5e397ff2b5d (diff)
downloadkafka-sandbox-acf44b68569f6615d373f05f92671afecb296af8.tar.gz
kafka-sandbox-acf44b68569f6615d373f05f92671afecb296af8.tar.bz2
kafka-sandbox-acf44b68569f6615d373f05f92671afecb296af8.tar.xz
kafka-sandbox-acf44b68569f6615d373f05f92671afecb296af8.zip
wip
-rw-r--r--README.md9
-rw-r--r--consumer.go13
-rw-r--r--getting-started.properties3
-rw-r--r--go.mod2
-rw-r--r--go.sum2
-rw-r--r--init.sql0
-rw-r--r--producer.go9
-rw-r--r--util/util.go4
8 files changed, 29 insertions, 13 deletions
diff --git a/README.md b/README.md
index 35ffa4f..0cf77f8 100644
--- a/README.md
+++ b/README.md
@@ -1,2 +1,11 @@
* Docker compose: https://github.com/confluentinc/cp-all-in-one/blob/7.5.3-post/cp-all-in-one-kraft/docker-compose.yml
* Demo: https://developer.confluent.io/get-started/go
+
+# Start services
+
+ . .settings.sh
+ docker-compose up -d
+
+Available services:
+
+ * Control center: http://localhost:9021
diff --git a/consumer.go b/consumer.go
index c838b0c..a8c6346 100644
--- a/consumer.go
+++ b/consumer.go
@@ -2,17 +2,15 @@ package main
import (
"fmt"
+ "github.com/confluentinc/confluent-kafka-go/v2/kafka"
"kafka-sandbox/util"
"os"
"os/signal"
"syscall"
"time"
-
- "github.com/confluentinc/confluent-kafka-go/kafka"
)
func main() {
-
if len(os.Args) != 2 {
fmt.Fprintf(os.Stderr, "Usage: %s <config-file-path>\n",
os.Args[0])
@@ -50,8 +48,13 @@ func main() {
// Errors are informational and automatically handled by the consumer
continue
}
- fmt.Printf("Consumed event from topic %s: key = %-10s value = %s\n",
- *ev.TopicPartition.Topic, string(ev.Key), string(ev.Value))
+ fmt.Printf("%s %s %s: %+v, key = %-10s value = %s\n",
+ ev.TimestampType.String(),
+ ev.Timestamp.String(),
+ *ev.TopicPartition.Topic,
+ ev.Headers,
+ string(ev.Key),
+ string(ev.Value))
}
}
diff --git a/getting-started.properties b/getting-started.properties
index ea4ec5a..467d1c4 100644
--- a/getting-started.properties
+++ b/getting-started.properties
@@ -1 +1,2 @@
-bootstrap.servers={{ kafka.broker.server }}
+bootstrap.servers=localhost
+
diff --git a/go.mod b/go.mod
index f407cb9..8cedea6 100644
--- a/go.mod
+++ b/go.mod
@@ -2,4 +2,4 @@ module kafka-sandbox
go 1.22
-require github.com/confluentinc/confluent-kafka-go v1.9.2
+require github.com/confluentinc/confluent-kafka-go/v2 v2.3.0
diff --git a/go.sum b/go.sum
index 8086547..a9c2f29 100644
--- a/go.sum
+++ b/go.sum
@@ -19,6 +19,8 @@ github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWH
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/confluentinc/confluent-kafka-go v1.9.2 h1:gV/GxhMBUb03tFWkN+7kdhg+zf+QUM+wVkI9zwh770Q=
github.com/confluentinc/confluent-kafka-go v1.9.2/go.mod h1:ptXNqsuDfYbAE/LBW6pnwWZElUoWxHoV8E43DCrliyo=
+github.com/confluentinc/confluent-kafka-go/v2 v2.3.0 h1:icCHutJouWlQREayFwCc7lxDAhws08td+W3/gdqgZts=
+github.com/confluentinc/confluent-kafka-go/v2 v2.3.0/go.mod h1:/VTy8iEpe6mD9pkCH5BhijlUl8ulUXymKv1Qig5Rgb8=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
diff --git a/init.sql b/init.sql
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/init.sql
diff --git a/producer.go b/producer.go
index 8cf1b0e..f5392ae 100644
--- a/producer.go
+++ b/producer.go
@@ -2,11 +2,11 @@ package main
import (
"fmt"
+ "github.com/confluentinc/confluent-kafka-go/v2/kafka"
"kafka-sandbox/util"
+ "log"
"math/rand"
"os"
-
- "github.com/confluentinc/confluent-kafka-go/kafka"
)
func main() {
@@ -49,11 +49,14 @@ func main() {
for n := 0; n < 10; n++ {
key := users[rand.Intn(len(users))]
data := items[rand.Intn(len(items))]
- p.Produce(&kafka.Message{
+ err := p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Key: []byte(key),
Value: []byte(data),
}, nil)
+ if err != nil {
+ log.Fatalln(err)
+ }
}
// Wait for all messages to be delivered
diff --git a/util/util.go b/util/util.go
index 57c8246..d56f662 100644
--- a/util/util.go
+++ b/util/util.go
@@ -6,11 +6,10 @@ import (
"os"
"strings"
- "github.com/confluentinc/confluent-kafka-go/kafka"
+ "github.com/confluentinc/confluent-kafka-go/v2/kafka"
)
func ReadConfig(configFile string) kafka.ConfigMap {
-
m := make(map[string]kafka.ConfigValue)
file, err := os.Open(configFile)
@@ -39,5 +38,4 @@ func ReadConfig(configFile string) kafka.ConfigMap {
}
return m
-
}