From a457621e444b4e0719c076a172a62d823616a122 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Wed, 7 Feb 2024 19:14:47 +0100 Subject: wip --- producer.go | 64 ------------------------------------------------------------- 1 file changed, 64 deletions(-) delete mode 100644 producer.go (limited to 'producer.go') diff --git a/producer.go b/producer.go deleted file mode 100644 index 2eb33d1..0000000 --- a/producer.go +++ /dev/null @@ -1,64 +0,0 @@ -package main - -import ( - "fmt" - "github.com/confluentinc/confluent-kafka-go/v2/kafka" - "kafka-sandbox/util" - "log" - "math/rand" - "os" -) - -func main() { - if len(os.Args) != 2 { - fmt.Fprintf(os.Stderr, "Usage: %s \n", - os.Args[0]) - os.Exit(1) - } - - configFile := os.Args[1] - conf := util.ReadConfig(configFile) - - topic := "purchases" - p, err := kafka.NewProducer(&conf) - if err != nil { - fmt.Printf("Failed to create producer: %s", err) - os.Exit(1) - } - - // Go-routine to handle message delivery reports and - // possibly other event types (errors, stats, etc) - go func() { - for e := range p.Events() { - switch ev := e.(type) { - case *kafka.Message: - if ev.TopicPartition.Error != nil { - fmt.Printf("Failed to deliver message: %v\n", ev.TopicPartition) - } else { - fmt.Printf("Produced event to topic %s: key = %-10s value = %s\n", - *ev.TopicPartition.Topic, string(ev.Key), string(ev.Value)) - } - } - } - }() - - users := [...]string{"eabara", "jsmith", "sgarcia", "jbernard", "htanaka", "awalther"} - items := [...]string{"book", "alarm clock", "t-shirts", "gift card", "batteries"} - - for n := 0; n < 10; n++ { - key := users[rand.Intn(len(users))] - data := items[rand.Intn(len(items))] - err := p.Produce(&kafka.Message{ - TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, - Key: []byte(key), - Value: []byte(data), - }, nil) - if err != nil { - log.Fatalln(err) - } - } - - // Wait for all messages to be delivered - p.Flush(15 * 1000) - p.Close() -} -- cgit v1.2.3