Build event-based applications with Kafka


First, create a Go module

mkdir kafka-event
cd kafka-event
go mod init gokafka

For my application

  • I will write an API server that receives requests from users; this server will have a Kafka producer that publishes an event for each user request
  • I will run a Kafka consumer app in the background that subscribes to the topics published by the producer. This consumer only connects to Kafka and doesn't expose any ports to users.
  • I will use only two topics: RegisterAccountEvent and DeactivateAccountEvent

Then create an events.go file

touch events.go
package main

import "reflect"

// Topic names are derived from the event struct names via
// reflection, so each topic always matches an event type below.
var Topics = []string{
	reflect.TypeOf(RegisterAccountEvent{}).Name(),
	reflect.TypeOf(DeactivateAccountEvent{}).Name(),
}

type RegisterAccountEvent struct {
	TransactionID string `json:"transaction_id"`
	Email         string `json:"email"`
	Firstname     string `json:"firstname"`
	Lastname      string `json:"lastname"`
	Age           int    `json:"age"`
}

type DeactivateAccountEvent struct {
	TransactionID string `json:"transaction_id"`
	Email         string `json:"email"`
}
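
Each event is serialized as JSON before it is produced, so the keys on the wire follow the struct tags above. A quick illustration (the values here are made up):

evt := RegisterAccountEvent{TransactionID: "tx-1", Email: "a@b.c", Firstname: "A", Lastname: "B", Age: 20}
b, _ := json.Marshal(evt) // needs encoding/json
fmt.Println(string(b))    // {"transaction_id":"tx-1","email":"a@b.c","firstname":"A","lastname":"B","age":20}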

Now I will write a consumer.go file. My Kafka client library will be confluent-kafka-go, which wraps librdkafka (the C client), so you need a C toolchain to build it. On Debian/Ubuntu, install it with

apt install build-essential pkg-config git

After that you will be able to build against the Kafka client library.
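
Fetch the Go package itself; this import path matches the v1 series of confluent-kafka-go used in the code below

go get github.com/confluentinc/confluent-kafka-go/kafka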

touch consumer.go
package main

import (
	"encoding/json"
	"log"
	"os"
	"os/signal"
	"reflect"
	"syscall"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

// handleEvent dispatches a consumed message to a handler based on
// its topic name, which matches an event struct name.
func handleEvent(topic *string, value []byte) {
	if topic == nil {
		return
	}
	log.Println(*topic)
	switch *topic {
	case reflect.TypeOf(RegisterAccountEvent{}).Name():
		var newAccount RegisterAccountEvent
		if err := json.Unmarshal(value, &newAccount); err != nil {
			return
		}
		// do something
		log.Printf("Welcome %s (%s %s)\n", newAccount.Email, newAccount.Firstname, newAccount.Lastname)
	case reflect.TypeOf(DeactivateAccountEvent{}).Name():
		var goneAccount DeactivateAccountEvent
		if err := json.Unmarshal(value, &goneAccount); err != nil {
			return
		}
		// do something
		log.Printf("Goodbye %s", goneAccount.Email)
	}
}

func RunConsumer(servers string) {
	consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": servers,
		"group.id":          "mygroup",
		"auto.offset.reset": "earliest",
		// Channel-based consumer: messages and rebalance events are
		// delivered on consumer.Events().
		"go.events.channel.enable":        true,
		"go.application.rebalance.enable": true,
		"session.timeout.ms":              6000,
	})
	if err != nil {
		log.Fatalf("Consumer can't connect: %v", err)
	}
	defer consumer.Close()
	if err := consumer.SubscribeTopics(Topics, nil); err != nil {
		log.Fatal(err)
	}

	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)

	for {
		select {
		case <-sig:
			log.Println("Shutting down consumer")
			return
		case ev := <-consumer.Events():
			switch ev := ev.(type) {
			case kafka.AssignedPartitions:
				log.Println(ev)
				consumer.Assign(ev.Partitions)
			case kafka.RevokedPartitions:
				log.Println(ev)
				consumer.Unassign()
			case *kafka.Message:
				log.Printf("Message received: partition %v, value %s\n", ev.TopicPartition, ev.Value)
				handleEvent(ev.TopicPartition.Topic, ev.Value)
			case kafka.Error:
				log.Printf("Error occurred: %v", ev)
			}
		}
	}
}

This is my consumer setup: the client consumes the topics declared in the events.go file, then handles each event based on its topic.
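
Note that go.events.channel.enable turns on the channel-based consumer, which newer releases of confluent-kafka-go deprecate in favor of the Poll API. If you drop the two go.* settings, the read loop in RunConsumer could be replaced with a Poll-based sketch like this:

	for run := true; run; {
		select {
		case <-sig:
			log.Println("Shutting down consumer")
			run = false
		default:
			ev := consumer.Poll(100) // timeout in milliseconds; returns nil on timeout
			switch e := ev.(type) {
			case *kafka.Message:
				handleEvent(e.TopicPartition.Topic, e.Value)
			case kafka.Error:
				log.Printf("Error occurred: %v", e)
			}
		}
	}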

Now write the producer client

touch producer.go
package main

import (
	"fmt"
	"log"
	"os"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

// ProduceMessage asynchronously produces one message to the given topic;
// the delivery report arrives on the producer's Events() channel.
func ProduceMessage(producer *kafka.Producer, topic string, value []byte) error {
	return producer.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          value,
	}, nil)
}

// eventListener logs delivery reports from the producer's event channel.
func eventListener(events chan kafka.Event) {
	for ev := range events {
		m, ok := ev.(*kafka.Message)
		if !ok {
			continue
		}
		if m.TopicPartition.Error != nil {
			fmt.Fprintf(os.Stderr, "%% Delivery error: %v\n", m.TopicPartition)
		} else {
			fmt.Fprintf(os.Stderr, "%% Delivered %v\n", m)
		}
	}
}

func RunProducer(servers string) {
	producer, err := kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers": servers,
	})
	if err != nil {
		log.Fatalf("Producer can't connect: %v", err)
	}

	defer producer.Close()

	go eventListener(producer.Events())

	// Start the HTTP API server; this blocks until shutdown.
	runServerWithGracefulShutdown(producer)
}

There are two API endpoints (POST /account and DELETE /account). This server just parses the JSON from users' requests, then uses the producer client to publish events for the consumer.
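
The server code itself isn't part of the snippets above, so here is a minimal sketch of what runServerWithGracefulShutdown could look like, assuming net/http; the publish helper is illustrative, the routes follow the two endpoints just described, and transaction-ID generation is left out:

touch server.go
package main

import (
	"context"
	"encoding/json"
	"log"
	"net/http"
	"os"
	"os/signal"
	"reflect"
	"syscall"
	"time"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

// publish marshals an event and produces it to the topic named after
// the event's struct type, matching the convention in events.go.
func publish(producer *kafka.Producer, event interface{}) error {
	value, err := json.Marshal(event)
	if err != nil {
		return err
	}
	topic := reflect.TypeOf(event).Name()
	return ProduceMessage(producer, topic, value)
}

func runServerWithGracefulShutdown(producer *kafka.Producer) {
	mux := http.NewServeMux()
	mux.HandleFunc("/account", func(w http.ResponseWriter, r *http.Request) {
		switch r.Method {
		case http.MethodPost:
			var event RegisterAccountEvent
			if err := json.NewDecoder(r.Body).Decode(&event); err != nil {
				http.Error(w, err.Error(), http.StatusBadRequest)
				return
			}
			if err := publish(producer, event); err != nil {
				http.Error(w, err.Error(), http.StatusInternalServerError)
				return
			}
		case http.MethodDelete:
			var event DeactivateAccountEvent
			if err := json.NewDecoder(r.Body).Decode(&event); err != nil {
				http.Error(w, err.Error(), http.StatusBadRequest)
				return
			}
			if err := publish(producer, event); err != nil {
				http.Error(w, err.Error(), http.StatusInternalServerError)
				return
			}
		default:
			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
			return
		}
		w.WriteHeader(http.StatusAccepted)
	})

	srv := &http.Server{Addr: ":8080", Handler: mux}
	go func() {
		if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			log.Fatal(err)
		}
	}()

	// Block until SIGINT/SIGTERM, then drain the server and the producer.
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
	<-sig
	log.Println("Shutting down producer")
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	srv.Shutdown(ctx)
	producer.Flush(3000) // wait up to 3s for outstanding delivery reports
}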

Next, create an admin client to initialize topics before you use them

touch admin.go
package main

import (
	"context"
	"log"
	"time"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

// RunAdmin creates the topics declared in events.go before the
// producer and consumer use them.
func RunAdmin(servers string) {
	admin, err := kafka.NewAdminClient(&kafka.ConfigMap{
		"bootstrap.servers": servers,
	})
	if err != nil {
		log.Fatalf("Admin can't connect: %v", err)
	}
	defer admin.Close()

	var topicSpecifications []kafka.TopicSpecification

	for _, topic := range Topics {
		topicSpecifications = append(topicSpecifications, kafka.TopicSpecification{
			Topic:             topic,
			NumPartitions:     2,
			ReplicationFactor: 1,
		})
	}

	results, err := admin.CreateTopics(context.Background(), topicSpecifications, kafka.SetAdminOperationTimeout(time.Second*5))

	if err != nil {
		log.Fatal(err)
	}

	for _, result := range results {
		log.Printf("%s\n", result)
	}
}
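
CreateTopics reports per-topic errors inside each result rather than as a call-level error, so rerunning the admin against existing topics won't crash the loop above. If you want stricter checking, the loop could be replaced with this sketch using confluent-kafka-go's error codes:

	for _, result := range results {
		code := result.Error.Code()
		if code != kafka.ErrNoError && code != kafka.ErrTopicAlreadyExists {
			log.Fatalf("failed to create topic %s: %v", result.Topic, result.Error)
		}
		log.Printf("%s\n", result)
	}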

Last, set up the main function

touch main.go
package main

import (
	"os"
)

func main() {
	servers := "localhost:9092"
	var mode string
	if v, ok := os.LookupEnv("mode"); ok {
		mode = v
	}
	if mode == "consumer" {
		RunConsumer(servers)
	} else if mode == "producer" {
		RunProducer(servers)
	} else if mode == "admin" {
		RunAdmin(servers)
	}
}

Now, after writing this setup, I can start a Kafka server. In this case, I will use the Kafka Docker image

touch docker-compose.yml
version: "3.9"
services:
  zookeeper:
    image: bitnami/zookeeper
    environment:
    - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: bitnami/kafka
    depends_on:
    - zookeeper
    ports:
    - "9092:9092"
    environment:
    - ALLOW_PLAINTEXT_LISTENER=yes
    - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
    - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
    - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092

Then start the containers with

docker compose up

Wait until the Kafka server is ready, then open other terminals to run these programs. Create the topics before starting to work with Kafka

mode=admin go run .
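
You can verify the topics were created; assuming the Kafka CLI scripts bundled in the bitnami/kafka image, this lists them:

docker compose exec kafka kafka-topics.sh --bootstrap-server localhost:9092 --list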

Start the consumer

mode=consumer go run .

You can spawn two consumers to consume events in parallel (the useful number of consumers depends on the number of partitions set in admin.go)

Then start the API server (the producer)

mode=producer go run .

Then send some HTTP requests (shown here in editor REST-client format) and watch what happens in your consumer programs

@API = http://localhost:8080

###

POST {{API}}/account
Content-Type: application/json

{
  "email": "wilbert@example.com",
  "firstname": "Wilbert",
  "lastname": "Smith",
  "age": 20
}

###

DELETE {{API}}/account
Content-Type: application/json

{
  "email": "Hello@example.com"
}
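
If you don't have an editor REST client, the same requests with curl (the server address comes from the @API variable above):

curl -X POST http://localhost:8080/account -H "Content-Type: application/json" -d '{"email": "wilbert@example.com", "firstname": "Wilbert", "lastname": "Smith", "age": 20}'

curl -X DELETE http://localhost:8080/account -H "Content-Type: application/json" -d '{"email": "Hello@example.com"}'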

P.S. The number of running consumers in a group shouldn't exceed the number of partitions set in admin.go; otherwise some consumers will sit idle

To clean up the application, press Ctrl+C in each terminal and run docker compose down. Done
