summaryrefslogtreecommitdiff
path: root/producer.go
diff options
context:
space:
mode:
Diffstat (limited to 'producer.go')
-rw-r--r--producer.go62
1 files changed, 62 insertions, 0 deletions
diff --git a/producer.go b/producer.go
new file mode 100644
index 0000000..8cf1b0e
--- /dev/null
+++ b/producer.go
@@ -0,0 +1,62 @@
+package main
+
+import (
+ "fmt"
+ "kafka-sandbox/util"
+ "math/rand"
+ "os"
+
+ "github.com/confluentinc/confluent-kafka-go/kafka"
+)
+
+func main() {
+ if len(os.Args) != 2 {
+ fmt.Fprintf(os.Stderr, "Usage: %s <config-file-path>\n",
+ os.Args[0])
+ os.Exit(1)
+ }
+
+ configFile := os.Args[1]
+ conf := util.ReadConfig(configFile)
+
+ topic := "purchases"
+ p, err := kafka.NewProducer(&conf)
+
+ if err != nil {
+ fmt.Printf("Failed to create producer: %s", err)
+ os.Exit(1)
+ }
+
+ // Go-routine to handle message delivery reports and
+ // possibly other event types (errors, stats, etc)
+ go func() {
+ for e := range p.Events() {
+ switch ev := e.(type) {
+ case *kafka.Message:
+ if ev.TopicPartition.Error != nil {
+ fmt.Printf("Failed to deliver message: %v\n", ev.TopicPartition)
+ } else {
+ fmt.Printf("Produced event to topic %s: key = %-10s value = %s\n",
+ *ev.TopicPartition.Topic, string(ev.Key), string(ev.Value))
+ }
+ }
+ }
+ }()
+
+ users := [...]string{"eabara", "jsmith", "sgarcia", "jbernard", "htanaka", "awalther"}
+ items := [...]string{"book", "alarm clock", "t-shirts", "gift card", "batteries"}
+
+ for n := 0; n < 10; n++ {
+ key := users[rand.Intn(len(users))]
+ data := items[rand.Intn(len(items))]
+ p.Produce(&kafka.Message{
+ TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
+ Key: []byte(key),
+ Value: []byte(data),
+ }, nil)
+ }
+
+ // Wait for all messages to be delivered
+ p.Flush(15 * 1000)
+ p.Close()
+}