From a457621e444b4e0719c076a172a62d823616a122 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Wed, 7 Feb 2024 19:14:47 +0100 Subject: wip --- go/producer.go | 64 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) create mode 100644 go/producer.go (limited to 'go/producer.go') diff --git a/go/producer.go b/go/producer.go new file mode 100644 index 0000000..2eb33d1 --- /dev/null +++ b/go/producer.go @@ -0,0 +1,64 @@ +package main + +import ( + "fmt" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" + "kafka-sandbox/util" + "log" + "math/rand" + "os" +) + +func main() { + if len(os.Args) != 2 { + fmt.Fprintf(os.Stderr, "Usage: %s \n", + os.Args[0]) + os.Exit(1) + } + + configFile := os.Args[1] + conf := util.ReadConfig(configFile) + + topic := "purchases" + p, err := kafka.NewProducer(&conf) + if err != nil { + fmt.Printf("Failed to create producer: %s", err) + os.Exit(1) + } + + // Go-routine to handle message delivery reports and + // possibly other event types (errors, stats, etc) + go func() { + for e := range p.Events() { + switch ev := e.(type) { + case *kafka.Message: + if ev.TopicPartition.Error != nil { + fmt.Printf("Failed to deliver message: %v\n", ev.TopicPartition) + } else { + fmt.Printf("Produced event to topic %s: key = %-10s value = %s\n", + *ev.TopicPartition.Topic, string(ev.Key), string(ev.Value)) + } + } + } + }() + + users := [...]string{"eabara", "jsmith", "sgarcia", "jbernard", "htanaka", "awalther"} + items := [...]string{"book", "alarm clock", "t-shirts", "gift card", "batteries"} + + for n := 0; n < 10; n++ { + key := users[rand.Intn(len(users))] + data := items[rand.Intn(len(items))] + err := p.Produce(&kafka.Message{ + TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, + Key: []byte(key), + Value: []byte(data), + }, nil) + if err != nil { + log.Fatalln(err) + } + } + + // Wait for all messages to be delivered + p.Flush(15 * 1000) + p.Close() +} -- cgit v1.2.3