summaryrefslogtreecommitdiff
path: root/producer.go
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2024-02-07 19:14:47 +0100
committerTrygve Laugstøl <trygvis@inamo.no>2024-02-07 19:14:47 +0100
commita457621e444b4e0719c076a172a62d823616a122 (patch)
treedf052ecf303a23d25f7300bd6e5db13fa2797805 /producer.go
parentdf50aaee6c6ef4b51638fe22317e28234ffe8acc (diff)
downloadkafka-sandbox-a457621e444b4e0719c076a172a62d823616a122.tar.gz
kafka-sandbox-a457621e444b4e0719c076a172a62d823616a122.tar.bz2
kafka-sandbox-a457621e444b4e0719c076a172a62d823616a122.tar.xz
kafka-sandbox-a457621e444b4e0719c076a172a62d823616a122.zip
wip
Diffstat (limited to 'producer.go')
-rw-r--r--producer.go64
1 files changed, 0 insertions, 64 deletions
diff --git a/producer.go b/producer.go
deleted file mode 100644
index 2eb33d1..0000000
--- a/producer.go
+++ /dev/null
@@ -1,64 +0,0 @@
-package main
-
-import (
- "fmt"
- "github.com/confluentinc/confluent-kafka-go/v2/kafka"
- "kafka-sandbox/util"
- "log"
- "math/rand"
- "os"
-)
-
-func main() {
- if len(os.Args) != 2 {
- fmt.Fprintf(os.Stderr, "Usage: %s <config-file-path>\n",
- os.Args[0])
- os.Exit(1)
- }
-
- configFile := os.Args[1]
- conf := util.ReadConfig(configFile)
-
- topic := "purchases"
- p, err := kafka.NewProducer(&conf)
- if err != nil {
- fmt.Printf("Failed to create producer: %s", err)
- os.Exit(1)
- }
-
- // Go-routine to handle message delivery reports and
- // possibly other event types (errors, stats, etc)
- go func() {
- for e := range p.Events() {
- switch ev := e.(type) {
- case *kafka.Message:
- if ev.TopicPartition.Error != nil {
- fmt.Printf("Failed to deliver message: %v\n", ev.TopicPartition)
- } else {
- fmt.Printf("Produced event to topic %s: key = %-10s value = %s\n",
- *ev.TopicPartition.Topic, string(ev.Key), string(ev.Value))
- }
- }
- }
- }()
-
- users := [...]string{"eabara", "jsmith", "sgarcia", "jbernard", "htanaka", "awalther"}
- items := [...]string{"book", "alarm clock", "t-shirts", "gift card", "batteries"}
-
- for n := 0; n < 10; n++ {
- key := users[rand.Intn(len(users))]
- data := items[rand.Intn(len(items))]
- err := p.Produce(&kafka.Message{
- TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
- Key: []byte(key),
- Value: []byte(data),
- }, nil)
- if err != nil {
- log.Fatalln(err)
- }
- }
-
- // Wait for all messages to be delivered
- p.Flush(15 * 1000)
- p.Close()
-}