package main import ( "fmt" "github.com/confluentinc/confluent-kafka-go/v2/kafka" "kafka-sandbox/util" "os" "os/signal" "syscall" "time" ) func main() { if len(os.Args) != 2 { fmt.Fprintf(os.Stderr, "Usage: %s \n", os.Args[0]) os.Exit(1) } configFile := os.Args[1] conf := util.ReadConfig(configFile) conf["group.id"] = "kafka-go-getting-started" conf["auto.offset.reset"] = "earliest" c, err := kafka.NewConsumer(&conf) if err != nil { fmt.Printf("Failed to create consumer: %s", err) os.Exit(1) } partitions, err := c.Assignment() if err != nil { fmt.Printf("Failed to get assignments: %s", err) os.Exit(1) } fmt.Printf("Consuming %d partitions: \n", len(partitions)) for i, partition := range partitions { fmt.Printf(" #%d: %s\n", i+1, partition.String()) } topic := "purchases" err = c.SubscribeTopics([]string{topic}, nil) // Set up a channel for handling Ctrl-C, etc sigchan := make(chan os.Signal, 1) signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) // Process messages run := true for run { select { case sig := <-sigchan: fmt.Printf("Caught signal %v: terminating\n", sig) run = false default: ev, err := c.ReadMessage(100 * time.Millisecond) if err != nil { // Errors are informational and automatically handled by the consumer continue } fmt.Printf("%s %s %s: %+v, key = %-10s value = %s\n", ev.TimestampType.String(), ev.Timestamp.String(), *ev.TopicPartition.Topic, ev.Headers, string(ev.Key), string(ev.Value)) } } c.Close() }