First create a Go Module
mkdir kafka-event
cd kafka-event
go mod init gokafka
For my application, there are two events:
RegisterAccountEvent
and DeactivateAccountEvent
Then create an events.go
file
touch events.go
package main
import "reflect"
// Topics lists the Kafka topic names used by the application. Each name is
// the Go type name of the corresponding event struct, obtained via reflection,
// so renaming an event type also renames its topic.
var Topics = []string{
reflect.TypeOf(RegisterAccountEvent{}).Name(),
reflect.TypeOf(DeactivateAccountEvent{}).Name(),
}
// RegisterAccountEvent is the JSON payload of an account-registration event.
// Its type name doubles as the name of the topic the event is published on.
type RegisterAccountEvent struct {
// TransactionID is carried with the event; this file does not show how it is generated.
TransactionID string `json:"transaction_id"`
Email string `json:"email"`
Firstname string `json:"firstname"`
Lastname string `json:"lastname"`
Age int `json:"age"`
}
// DeactivateAccountEvent is the JSON payload of an account-deactivation event.
// Its type name doubles as the name of the topic the event is published on.
type DeactivateAccountEvent struct {
// TransactionID is carried with the event; this file does not show how it is generated.
TransactionID string `json:"transaction_id"`
// Email identifies the account being deactivated.
Email string `json:"email"`
}
Now I will write a consumer.go
file. My Kafka client library will be confluent-kafka-go. This library depends on
librdkafka (the C client), so you need the C build tools installed. To install them, use this:
apt install build-essential pkg-config git
Then you will be able to use the kafka client library.
touch consumer.go
package main
import (
"encoding/json"
"log"
"os"
"os/signal"
"reflect"
"syscall"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
// handleEvent decodes a consumed message according to the topic it arrived on
// and logs the result.
//
// topic is the topic name taken from the message; a nil topic is ignored.
// value is the raw message payload, expected to be the JSON encoding of the
// event struct whose type name equals the topic name.
//
// Payloads that fail to decode and messages on topics without a handler are
// logged and dropped rather than silently discarded.
func handleEvent(topic *string, value []byte) {
	if topic == nil {
		return
	}
	log.Println(*topic)

	switch *topic {
	case reflect.TypeOf(RegisterAccountEvent{}).Name():
		var newAccount RegisterAccountEvent
		if err := json.Unmarshal(value, &newAccount); err != nil {
			log.Printf("Discarding malformed %s payload: %v", *topic, err)
			return
		}
		// do something
		log.Printf("Welcome %s (%s %s)\n", newAccount.Email, newAccount.Firstname, newAccount.Lastname)
	case reflect.TypeOf(DeactivateAccountEvent{}).Name():
		var goneAccount DeactivateAccountEvent
		if err := json.Unmarshal(value, &goneAccount); err != nil {
			log.Printf("Discarding malformed %s payload: %v", *topic, err)
			return
		}
		// do something
		log.Printf("Goodbye %s", goneAccount.Email)
	default:
		log.Printf("Ignoring message on unhandled topic %q", *topic)
	}
}
// RunConsumer creates a consumer in the "mygroup" consumer group against the
// given bootstrap servers, subscribes to every topic in Topics and passes each
// received message to handleEvent. It blocks until SIGINT or SIGTERM arrives,
// then closes the consumer and returns.
//
// The process is terminated if the consumer cannot be created or if the
// subscription fails. Failures while assigning or unassigning partitions
// during a rebalance are logged and the loop keeps running.
func RunConsumer(servers string) {
	consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":               servers,
		"group.id":                        "mygroup",
		"auto.offset.reset":               "earliest",
		"go.events.channel.enable":        true,
		"go.application.rebalance.enable": true,
		"session.timeout.ms":              6000,
	})
	if err != nil {
		log.Fatalf("Consumer can't connect: %v", err)
	}

	if err := consumer.SubscribeTopics(Topics, nil); err != nil {
		// log.Fatalf exits without running deferred calls, so close explicitly.
		consumer.Close()
		log.Fatalf("Consumer can't subscribe to %v: %v", Topics, err)
	}
	defer consumer.Close()

	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)

	for {
		select {
		case <-sig:
			log.Println("Shutting down consumer")
			return
		case ev := <-consumer.Events():
			switch ev := ev.(type) {
			case kafka.AssignedPartitions:
				// With go.application.rebalance.enable set, the application
				// has to apply the assignment itself.
				log.Println(ev)
				if err := consumer.Assign(ev.Partitions); err != nil {
					log.Printf("Assigning partitions failed: %v", err)
				}
			case kafka.RevokedPartitions:
				log.Println(ev)
				if err := consumer.Unassign(); err != nil {
					log.Printf("Unassigning partitions failed: %v", err)
				}
			case *kafka.Message:
				log.Printf("Message received: partition - %v + value: %v\n", ev.TopicPartition, string(ev.Value))
				handleEvent(ev.TopicPartition.Topic, ev.Value)
			case kafka.Error:
				log.Printf("Error occurred: %v", ev)
			}
		}
	}
}
This is my consumer config. The consumer client will consume the topics that have been declared in the events.go
file.
And then it will handle events based on the type of content.
Now write the producer client
touch producer.go
package main
import (
"fmt"
"log"
"os"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
// ProduceMessage hands value to producer for delivery on topic, letting the
// client pick the partition (kafka.PartitionAny). It returns whatever error
// Produce reports.
func ProduceMessage(producer *kafka.Producer, topic string, value []byte) error {
	msg := kafka.Message{
		TopicPartition: kafka.TopicPartition{
			Topic:     &topic,
			Partition: kafka.PartitionAny,
		},
		Value: value,
	}
	// No per-message delivery channel is supplied.
	return producer.Produce(&msg, nil)
}
// eventListener drains events until the channel is closed and reports the
// delivery outcome of every *kafka.Message on stderr. Events of any other
// type are skipped.
func eventListener(events chan kafka.Event) {
	for ev := range events {
		switch msg := ev.(type) {
		case *kafka.Message:
			if msg.TopicPartition.Error == nil {
				fmt.Fprintf(os.Stderr, "%% Delivered %v\n", msg)
				continue
			}
			fmt.Fprintf(os.Stderr, "%% Delivery error: %v\n", msg.TopicPartition)
		}
	}
}
// RunProducer creates a producer for the given bootstrap servers, starts a
// goroutine that reports delivery results, and then runs the HTTP server via
// runServerWithGracefullyShutdown. When that call returns, messages still
// queued in the producer are flushed before the producer is closed.
//
// The process is terminated if the producer cannot be created.
func RunProducer(servers string) {
	// Upper bound, in milliseconds, on waiting for queued messages at shutdown.
	const flushTimeoutMs = 5000

	producer, err := kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers": servers,
	})
	if err != nil {
		log.Fatalf("Producer can't connect: %v", err)
	}
	defer producer.Close()
	// Deferred calls run last-in-first-out, so the flush happens before Close.
	// Produce is asynchronous; closing without flushing can drop queued messages.
	defer func() {
		if pending := producer.Flush(flushTimeoutMs); pending > 0 {
			log.Printf("%d message(s) still undelivered at shutdown", pending)
		}
	}()

	go eventListener(producer.Events())
	runServerWithGracefullyShutdown(producer)
}
There are two API endpoints (POST /account and DELETE /account). This server just parses the JSON messages from users' requests and then uses the producer client to publish events for the consumer.
Next, create an admin client to initialize topics before you use them
touch admin.go
package main
import (
"context"
"log"
"time"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
// RunAdmin initializes the application's topics: it asks the cluster at the
// given bootstrap servers to create every topic listed in Topics, each with
// 2 partitions and a replication factor of 1, and logs the per-topic result.
//
// The process is terminated if the admin client cannot be created or if the
// create request as a whole fails.
func RunAdmin(servers string) {
	admin, err := kafka.NewAdminClient(&kafka.ConfigMap{
		"bootstrap.servers": servers,
	})
	if err != nil {
		log.Fatalf("Admin can't connect: %v", err)
	}
	defer admin.Close()

	topicSpecifications := make([]kafka.TopicSpecification, 0, len(Topics))
	for _, topic := range Topics {
		topicSpecifications = append(topicSpecifications, kafka.TopicSpecification{
			Topic:             topic,
			NumPartitions:     2,
			ReplicationFactor: 1,
		})
	}

	results, err := admin.CreateTopics(context.Background(), topicSpecifications, kafka.SetAdminOperationTimeout(5*time.Second))
	if err != nil {
		log.Fatalf("Creating topics %v failed: %v", Topics, err)
	}
	for _, result := range results {
		log.Printf("%s\n", result)
	}
}
Last, the setup for the main function
touch main.go
package main
import (
"os"
)
// main selects which role this process plays from the "mode" environment
// variable: "consumer", "producer" or "admin". Every role talks to the broker
// at localhost:9092. Any other value, including an unset variable, prints a
// usage hint to stderr and exits with status 2 instead of silently doing
// nothing.
func main() {
	servers := "localhost:9092"

	switch mode := os.Getenv("mode"); mode {
	case "consumer":
		RunConsumer(servers)
	case "producer":
		RunProducer(servers)
	case "admin":
		RunAdmin(servers)
	default:
		os.Stderr.WriteString("unknown mode \"" + mode + "\": set the mode environment variable to consumer, producer or admin\n")
		os.Exit(2)
	}
}
Now that the setup is written, I can start a Kafka server. In this case, I will use the Kafka Docker image
touch docker-compose.yml
# Local single-broker Kafka setup for the example application.
version: "3.9"
services:
  zookeeper:
    image: bitnami/zookeeper
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: bitnami/kafka
    ports:
      # Published on the host so the Go programs can reach localhost:9092.
      - "9092:9092"
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      # Advertised as localhost to match the address the Go clients connect to.
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
Then start the docker server with
docker compose up
Wait until the Kafka server is ready, then open other terminals to run these programs. Create the topics before you start working with Kafka
mode=admin go run .
Start the consumer
mode=consumer go run .
You can spawn two consumers to consume events (this depends on the number of partitions provided in admin.go
)
Then start the API server (the producer)
mode=producer go run .
Then send some http requests and check what will happen in your consumer programs
@API = http://localhost:8080
###
POST {{API}}/account
Content-Type: application/json
{
"email": "wilbert@example.com",
"firstname": "Wilbert",
"lastname": "Smith",
"age": 20
}
###
DELETE {{API}}/account
Content-Type: application/json
{
"email": "Hello@example.com"
}
P.S. The number of running consumers shouldn't be more than the number of partitions that was set in admin.go;
otherwise there will be consumers that do nothing
To clean up the application, press ctrl+C and use docker compose down
. Done
sources