package main import ( "fmt" "github.com/confluentinc/confluent-kafka-go/v2/kafka" "kafka-sandbox/util" "log" "os" "os/signal" "syscall" ) func main() { if len(os.Args) != 2 { fmt.Fprintf(os.Stderr, "Usage: %s \n", os.Args[0]) os.Exit(1) } configFile := os.Args[1] conf := util.ReadConfig(configFile) conf["group.id"] = "kafka-go-getting-started" conf["auto.offset.reset"] = "earliest" c, err := kafka.NewConsumer(&conf) if err != nil { fmt.Printf("Failed to create consumer: %s", err) os.Exit(1) } topic := "purchases" err = c.SubscribeTopics([]string{topic}, nil) // Set up a channel for handling Ctrl-C, etc sigchan := make(chan os.Signal, 1) signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) // Process messages run := true for run { select { case sig := <-sigchan: fmt.Printf("Caught signal %v: terminating\n", sig) run = false default: ev := c.Poll(100) if ev == nil { continue } switch e := ev.(type) { case *kafka.Message: if e.TopicPartition.Error != nil { onTopicPartitionError(e) } else { onMessage(e) } case kafka.OffsetsCommitted: log.Println("Offsets committed") for i, offset := range e.Offsets { log.Printf(" %d: %s\n", i, offset.String()) } case kafka.Error: log.Fatalln(e) default: log.Printf("Unhandled message type: %s\n", e.String()) } } } c.Close() } func onMessage(ev *kafka.Message) { fmt.Printf("-------- %s\n", ev.TopicPartition.String()) fmt.Printf("%s %s: %s\n", ev.TimestampType.String(), ev.Timestamp.String(), string(ev.Key)) for _, header := range ev.Headers { fmt.Printf("%s: %s\n", header.Key, string(header.Value)) } fmt.Printf("%s\n", string(ev.Value)) } func onTopicPartitionError(e *kafka.Message) { fmt.Printf("Partition error: %s\n", e.TopicPartition.String()) } func showAssignment(err error, c *kafka.Consumer) error { partitions, err := c.Assignment() if err != nil { return err } fmt.Printf("Consuming %d partitions: \n", len(partitions)) for i, partition := range partitions { fmt.Printf(" #%d: %s\n", i+1, partition.String()) } return nil }