diff options
| author | Trygve Laugstøl <trygvis@inamo.no> | 2024-02-07 19:12:07 +0100 | 
|---|---|---|
| committer | Trygve Laugstøl <trygvis@inamo.no> | 2024-02-07 19:12:07 +0100 | 
| commit | df50aaee6c6ef4b51638fe22317e28234ffe8acc (patch) | |
| tree | 6cae8794fa21a893c8751643a29c4ede6fe0ef94 | |
| parent | 5200eb197082495227fb9d19f4303a8e3edee948 (diff) | |
| download | kafka-sandbox-df50aaee6c6ef4b51638fe22317e28234ffe8acc.tar.gz kafka-sandbox-df50aaee6c6ef4b51638fe22317e28234ffe8acc.tar.bz2 kafka-sandbox-df50aaee6c6ef4b51638fe22317e28234ffe8acc.tar.xz kafka-sandbox-df50aaee6c6ef4b51638fe22317e28234ffe8acc.zip | |
wip
| -rw-r--r-- | consumer.go | 74 | ||||
| -rw-r--r-- | producer.go | 1 | 
2 files changed, 51 insertions, 24 deletions
| diff --git a/consumer.go b/consumer.go index b9c1b45..4047c4e 100644 --- a/consumer.go +++ b/consumer.go @@ -4,10 +4,10 @@ import (  	"fmt"  	"github.com/confluentinc/confluent-kafka-go/v2/kafka"  	"kafka-sandbox/util" +	"log"  	"os"  	"os/signal"  	"syscall" -	"time"  )  func main() { @@ -23,23 +23,11 @@ func main() {  	conf["auto.offset.reset"] = "earliest"  	c, err := kafka.NewConsumer(&conf) -  	if err != nil {  		fmt.Printf("Failed to create consumer: %s", err)  		os.Exit(1)  	} -	partitions, err := c.Assignment() -	if err != nil { -		fmt.Printf("Failed to get assignments: %s", err) -		os.Exit(1) -	} - -	fmt.Printf("Consuming %d partitions: \n", len(partitions)) -	for i, partition := range partitions { -		fmt.Printf(" #%d: %s\n", i+1, partition.String()) -	} -  	topic := "purchases"  	err = c.SubscribeTopics([]string{topic}, nil)  	// Set up a channel for handling Ctrl-C, etc @@ -54,20 +42,60 @@ func main() {  			fmt.Printf("Caught signal %v: terminating\n", sig)  			run = false  		default: -			ev, err := c.ReadMessage(100 * time.Millisecond) -			if err != nil { -				// Errors are informational and automatically handled by the consumer +			ev := c.Poll(100) +			if ev == nil {  				continue  			} -			fmt.Printf("%s %s %s: %+v, key = %-10s value = %s\n", -				ev.TimestampType.String(), -				ev.Timestamp.String(), -				*ev.TopicPartition.Topic, -				ev.Headers, -				string(ev.Key), -				string(ev.Value)) + +			switch e := ev.(type) { +			case *kafka.Message: +				if e.TopicPartition.Error != nil { +					onTopicPartitionError(e) +				} else { +					onMessage(e) +				} +			case kafka.OffsetsCommitted: +				log.Println("Offsets committed") +				for i, offset := range e.Offsets { +					log.Printf(" %d: %s\n", i, offset.String()) +				} +			case kafka.Error: +				log.Fatalln(e) +			default: +				log.Printf("Unhandled message type: %s\n", e.String()) +			}  		}  	}  	c.Close()  } + +func onMessage(ev *kafka.Message) { +	fmt.Printf("-------- %s\n", ev.TopicPartition.String()) +	fmt.Printf("%s %s: %s\n", +		ev.TimestampType.String(), +		ev.Timestamp.String(), +		string(ev.Key)) +	for _, header := range ev.Headers { +		fmt.Printf("%s: %s\n", header.Key, string(header.Value)) +	} +	fmt.Printf("%s\n", string(ev.Value)) +} + +func onTopicPartitionError(e *kafka.Message) { +	fmt.Printf("Partition error: %s\n", e.TopicPartition.String()) +} + +func showAssignment(err error, c *kafka.Consumer) error { +	partitions, err := c.Assignment() +	if err != nil { +		return err +	} + +	fmt.Printf("Consuming %d partitions: \n", len(partitions)) +	for i, partition := range partitions { +		fmt.Printf(" #%d: %s\n", i+1, partition.String()) +	} + +	return nil +} diff --git a/producer.go b/producer.go index f5392ae..2eb33d1 100644 --- a/producer.go +++ b/producer.go @@ -21,7 +21,6 @@ func main() {  	topic := "purchases"  	p, err := kafka.NewProducer(&conf) -  	if err != nil {  		fmt.Printf("Failed to create producer: %s", err)  		os.Exit(1) | 
