Event driven menggunakan Kafka dan Go

Umum kita jumpai aplikasi dengan ukuran besar, atau biasa disebut sebagai monolithic yang melakukan hal mulai dari konfigurasi aplikasi, penghitungan profit, manajemen user, dan lain sebagainya.

Tidak ada masalah untuk memiliki aplikasi yang monolithic, it's just a matter of choice and proper programming, hanya terkadang, if possible, memecah aplikasi menjadi bagian-bagian yang terkecil akan membuatnya lebih mudah untuk diubah, dipahami, dan dikembangkan lebih lanjut.

Event driven adalah teknik yang dapat kita gunakan untuk memecah-mecah bagian aplikasi, sehingga kita akan memiliki service-service kecil (microservice) yang fokus mensolve problem pada domainnya, yang dapat merespon event tertentu.

Study case

Katakanlah, kita memiliki aplikasi dashboard yang menampilkan secara realtime jumlah transaksi yang terjadi.

Untuk membuat aplikasi tersebut, kita bisa mempertimbangkan untuk menggunakan:

  1. Queueing
  2. Publish-subscribe

Dengan menggunakan queuing, pada dasarnya, begitu ada data baru yang masuk di-queue, kita akan melakukan operasi pop untuk mengambil data tersebut. Kelemahannya:

  1. Kita harus melakukan periodical check, apakah ada data baru. Periodical check ini wasting bagi resource computer yang seharusnya bisa melakukan hal lain.
  2. Jika terdapat banyak data, dan kita melakukan pop pada data-data tersebut, meskipun sistem queue melakukan guarantee pada ordering, tapi data mungkin bisa sampai out of order dikarenakan network latency.
  3. Data yang telah di-pop sudah tidak ada lagi di server, karena data sudah hilang dan di-consume. Jika data tidak sampai, atau server sedang down, terdapat kemungkinan data akan hilang.

Dengan menggunakan sistem publish-subscribe, kita bisa mengatasi beberapa masalah diatas, kecuali mungkin nomor ke-2 dan ke-3. Tetapi ada permasalahan baru, dimana dengan menggunakan pub/sub, semua consumer akan me-listen pada set of data yang sama, sehingga sulit melakukan paralelisasi secara otomatis, dimana fleet of computer machines melakukan pekerjaan yang berbeda di waktu yang sama, yang akan meringankan beban pekerjaan secara garis besar.

Dalam kasus ini, kita bisa memanfaatkan Kafka. Kafka pada dasarnya tidak lebih dari suatu broker. Kafka adalah middle-man antara producer/publisher dan consumer/subscriber.

Kafka in nutshell

Jika dibandingkan dengan database, maka table adalah topic di Kafka. Setiap table memiliki data, begitupula di Kafka. Data di Kafka diexpresikan sebagai String, atau disebut juga, simply, sebagai commit log.

Seluruh commit log tersebut tidak menempati tempat yang sama, karena suatu topic di Kafka bisa (diibaratkan) di-shard ke dalam beberapa partisi. Dengan kata lain, satu topik bisa memiliki lebih dari 1 partisi, dimana setiap partisi menyimpan berbagai commit log dalam hitungan puluhan, ribuan, jutaan, atau milyaran sekalipun. Kafka bisa menyimpan data sebanyak itu karena Kafka mengclaim dirinya memiliki operasi O(1), dimana aktifitas penulisan dan pembacaan bersifat konstan tidak peduli seberapa banyak data yang ada dalam Kafka.

Sebagai consumer/client/subscriber, kita dapat mendengar (me-subscribe) pada partisi-partisi tertentu. Consumer tersebut dapat dikelompokkan kedalam suatu consumer group. Setiap consumer dalam consumer group akan otomatis load-balancing.

Dari segi publisher, load-balancing terjadi ketika Kafka dicluster menjadi beberapa instansi. Ketika terdapat lebih dari satu instansi Kafka, maka setiap instansi tersebut akan menjadi leader. Setiap leader bertugas mengatur read/write ke partisi tertentu.

Penjelasan diatas tentu diusahakan agar terdapat banyak informasi dalam sesedikit mungkin paragraf. Untuk informasi yang lebih detail, disarankan pembaca untuk langsung mengunjungi situs dokumentasi dari Kafka.

Why Go?

Dalam artikel ini, kita akan mencoba membuat sistem profitter untuk menghitung profit ketika terdapat order di sistem utama. Kita akan menggunakan Go untuk membuat kedua sistem tersebut dengan pertimbangan sebagai berikut:

  1. Go dapat dicompile menjadi single binary yang sangat cocok untuk pemrograman microservice, sehingga binary akhir tidak memerlukan dependensi lain lagi.
  2. Go memiliki ekosistem yang berkembang cepat mengingat banyak sistem microservice(-oriented) seperti: Consul, Docker, Kubernetes, Serf yang semuanya diprogram menggunakan Go, kurang/lebih membuktikan kebergunaan Go dalam lingkup ini.
  3. Waktu starting up Go sangat singkat, cocok untuk microservice.

That being said, Go bukanlah silver bullet dalam mengembangkan sistem microservice, atau kita tidak boleh tidak menggunakan Go dalam pengembangan aplikasi microservice.

Pre-coding

Sebelum melakukan coding, kita harus mempersiapkan environment sehingga kita memiliki:

  1. Go
  2. Kafka

Sangat direkomendasikan untuk menggunakan operating sistem berbasis Unix: macOS, Ubuntu, Debbian, atau yang lainnya.

Juga disarankan agar Kafka yang digunakan didownload dari situs confluent.

After the environment is set up, kita bisa mulai menstart ZooKeeper, system registry dan juga Kafka menggunakan perintah berikut:

$ cd ~/Downloads/confl-kafka

$ ./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties

$ ./bin/kafka-server-start ./etc/kafka/server.properties

$ ./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties

Pembuatan publisher

Publisher berguna untuk mengirim commit log ke Kafka. Dalam melakukan publishing event, terdapat dua mekanisme yang dapat dipilih: (1) async, (2) synchronous.

Katakanlah kita memiliki 10 data. Saat mengirim data secara asynchronous, kita bisa mengirim ke-sepuluh data tersebut secara bersamaan, dan Kafka dapat menerima data tersebut secara bersamaan tanpa melakukan proses replikasi atau proses-proses internal lainnya. Yang terjadi adalah, kita akan memiliki throughput yang lebih tinggi, tapi kemungkinan jika data tidak berhasil dikirim, tergantung teknik pemrograman yang digunakan, data bisa saja hilang.

Sistem async sangat cocok digunakan untuk mengirim log aktifitas pengguna aplikasi ke Kafka. Ketika ada data yang hilang, maka kehilangan tersebut tidak parah. Juga, pengiriman log tidak dapat menghambat performa aplikasi secara berlebihan.

Alternatif lain adalah pengiriman secara synchronous, dimana pengiriman kesepuluh data tersebut harus dilakukan satu-per-satu. Data ke-2 tidak akan dikirim sebelum data ke-1 sukses diterima, dan (depends on the config, dan jumlah cluster) direplikasi oleh Kafka.

Di Go, untuk pengiriman secara async, library Sarama menggunakan konstruksi di Go yang disebut channel, yang memang bersifat konkuren (async). Sedangkan pengiriman synchronous dilakukan melalui fungsi SendMessage yang terdapat pada protokol producer yang synchronous.

Karena kita membutuhkan library luar (Sarama), kita harus melakukan go get sehingga library tersebut dapat digunakan saat melakukan development aplikasi:

go get github.com/shopify/sarama  

Producer kita dapat menerima beberapa variabel yang harusnya dapat dikonfigurasi saat aplikasi baru dimulai. Kita bisa menggunakan kingpin, atau bisa juga menggunakan librari internal flag yang sudah mumpuni:

var (  
    brokers = flag.String(
        "brokers",
        "127.0.0.1:9001,127.0.0.1:9101,127.0.0.1:9201",
        "The Kafka brokers to connect to, as a comma separated list",
    )
)

Kita juga harus memastikan untuk melakukan flag.Parse() sehingga jika user melakukan customisasi nilai flag, nilai default akan terganti.

Sebelum mendeklarasikan producer, kita perlu mendeklarasikan konfigurasi terlebih dahulu:

    brokerList := strings.Split(*brokers, ",")

    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Return.Successes = true
    config.ChannelBufferSize = 1
    config.Version = sarama.V0_10_0_1

Setelah itu kita mendefinisikan producer:

    producer, err := sarama.NewSyncProducer(brokerList, config)
    if err != nil {
        panic(err)
    }

    defer func() {
        if err := producer.Close(); err != nil {
            panic(err)
        }
    }()

Kita juga mendefinisikan fungsi yang akan di-defer, yang otomatis akan dieksekusi begitu fungsi pemanggil telah selesai dieksekusi (return).

Kemudian, kita mendefinisikan fungsi sendMessage untuk mengirim pesan ke Kafka:

func sendMessage(producer sarama.SyncProducer, text string) {  
    msg := &sarama.ProducerMessage{
        Topic: topic,
        Value: sarama.StringEncoder(text),
    }

    partition, offset, err := producer.SendMessage(msg)
    if err != nil {
        panic(err)
    }

    fmt.Printf("Message is stored in topic(%s)/partition(%d)/offset(%d)\n",
        topic, partition, offset)
}

Sehingga, untuk aplikasi CLI ini, dalam main fungsi kita, kita pastikan bahwa aplikasi akan selalu menerima input dari user hingga user mengetik 'end'. Setiap kali user menekan enter, maka pesan tersebut akan dikirim menggunakan sendMessage ke seluruh broker Kafka.

    for {
        fmt.Print("-> ")
        text, _ := reader.ReadString('\n')
        text = strings.Replace(text, "\n", "", -1)
        if text == "end" {
            break
        } else {
            sendMessage(producer, text)
        }
    }

Pembuatan consumer

Setiap data yang masuk, adalah data yang memiliki struktur:

ID status_transaksi jumlah_amount_transaksi  

Contoh data yang valid adalah:

1 paid 20000  
2 unpaid 35000  
1 cancelled 20000  
2 paid 35000  

Dengan contoh diatas, maka profit yang terakumulasi adalah 35.000. Selain format diatas, maka sistem profitter tidak akan mengubah profit, tidak akan melakukan kalkulasi apapun, dan secara efektif akan menghiraukan commit log tersebut.

Dengan memiliki config yang sama saat pembuatan publisher, kita akan memiliki master dan consumer yang didefinisikan sebagai berikut:

    master, err := sarama.NewConsumer(brokerList, config)
    if err != nil {
        panic(err)
    }

    defer func() {
        if err := master.Close(); err != nil {
            panic(err)
        }
    }()

    consumer, err := master.ConsumePartition(topic, 0, sarama.OffsetOldest)
    if err != nil {
        panic(err)
    }

Untuk mulai listening event pada Kafka, kita akan subscribe ke read channel Messages:

    go func() {
        for {
            select {
            case err := <-consumer.Errors():
                fmt.Println(err)
            case msg := <-consumer.Messages():
                s := strings.Split(string(msg.Value), " ")
        }
    }()

Kemudian kita pastikan bahwa panjang s adalah 3 elemen, kemudian kita melakukan pemrosesan profit berdasarkan status:

if len(s) == 3 {  
    id := s[0]
    status := s[1]
    price, err := strconv.ParseFloat(s[2], 64)

    if err == nil {
        if status == "paid" {
            profit += price
        } else if status == "cancelled" {
            profit -= price
        }
    }
    fmt.Println("Profit ", strconv.FormatFloat(profit, 'f', 2, 64), " from: ", string(id), " - ", string(status), " - ", price)
} else {
    fmt.Println("Ignoring message: ", s)
}

Agar microservice ini bisa dihentikan tanpa menggunakan kill, kita perlu menangkap sinyal Ctrl+C (interrupt) yang sering digunakan untuk request terminasi suatu proses. Pertama, kita definisikan signals (dan melakukan import os/signal):

signals := make(chan os.Signal, 1)  
signal.Notify(signals, os.Interrupt)  
// Get signal for finish
doneCh := make(chan struct{})  

Kemudian pada kode terakhir di fungsi main kita membaca sinyal doneCh:

func main() {  
  // any code above
  <-doneCh
}

Kode diatas akan menghalt fungsi main hingga ada data dari channel doneCh. Data tersebut akan berasal dari signals yang akan kita tangkap di go routine anonymous yang didefinisikan sebelumnya. Dalam go-routine anonym tadi, kita tambahkan kode:

for {  
    select {
    case err := <-consumer.Errors():
        ...
    case msg := <-consumer.Messages():
        ...
    case <-signals:
        fmt.Println("Interrupt is detected")
        doneCh <- struct{}{}
    }
}

Eksekusi aplikasi

Fhew, cukup teknis. Sekarang, kita mencoba jerih payah kita. Pertama, kita eksekusi microservice booking kita:

go run booking.go --brokers=127.0.0.1:9092  

Kemudian menginput beberapa data:

Setelah itu, kita me-run microservice profitter:

go run profitter.go --brokers=127.0.0.1:9092  

Saat ada transaksi baru yang ter-generate di service booking, maka profitter akan otomatis melakukan kalkukasi secara realtime. Nice, microservices kita work as expected.

Bottom line

Contoh microservice diatas dikatakan event-driven dikarenakan sistem profitter merespon pada event yang dipublish oleh sistem publisher, melalui message broker yang dalam contoh ini adalah Kafka.

Terdapat berbagai jenis use case lainnya yang disupport oleh Kafka, namun pada dasarnya Kafka adalah suatu sistem message broker modern dengan berbagai kelebihan, yang layak dipertimbangkan dalam mendesain aplikasi dengan approach microservices.

Artikel - Artikel Terkait