Debezium test drive

Debezium is an open-source, distributed change data capture system built on top of Apache Kafka. I tried it out and the project is available on GitHub.


Details are in the README. It uses the Debezium tutorial as a background, but the setup is very simple since it uses Docker Compose – one command (docker-compose up --build) is all it takes to get going!


  • MySQL – the database
  • Debezium – the CDC platform, which tails the MySQL binlog and pushes change events to Kafka using its MySQL Kafka Connect connector
  • Consumer – a Java EE application which consumes the (DB change) events from a Kafka topic, deserializes and parses them, and logs them




Posted in Distributed systems, Kafka | 1 Comment

Kafka & Websocket

For those who are interested in an example of Kafka working with the (Java EE) WebSocket API, please check out this blog. There is an associated GitHub project as well.


Posted in Distributed systems, Kafka

Kafka producer and partitions

There are a few possible ways to specify partitions while using the Kafka Producer API

  • Just specify it in the ProducerRecord itself
  • If the key is not null, (by default) Kafka will hash the key and calculate the partition
  • If the key is null, (by default) Kafka will round-robin between all the partitions (to load-balance the data)
  • Otherwise, just use a custom Partitioner

If interested, you can also check out the Kafka Partitioning blog
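The selection order above can be sketched in plain Java. Note this is only an illustration of the flow – Kafka's actual DefaultPartitioner uses a murmur2 hash of the serialized key, not hashCode(), and the class and method names here are made up:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: mirrors the decision order of the default partitioning
// behavior, not Kafka's real implementation.
public class PartitionSelectionSketch {

    private static final AtomicInteger counter = new AtomicInteger();

    public static int selectPartition(Integer explicitPartition, String key, int numPartitions) {
        if (explicitPartition != null) {
            return explicitPartition;            // 1. partition set on the ProducerRecord wins
        }
        if (key != null) {
            // 2. non-null key: hash the key and map it onto a partition
            return (key.hashCode() & 0x7fffffff) % numPartitions;
        }
        // 3. null key: round-robin across partitions to load-balance
        return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        System.out.println(selectPartition(2, "ignored", 4));      // 2
        System.out.println(selectPartition(null, "machine-1", 4)); // stable for the same key
        System.out.println(selectPartition(null, null, 4));        // rotates per call
    }
}
```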


Posted in Distributed systems, Kafka

Kafka Streams state stores…

This blog explores some common aspects of state stores in Kafka Streams…

Default state store

By default, Kafka Streams uses RocksDB as its state store

In-memory or persistent?

Whether a state store is in-memory or persistent is configurable. RocksDB can work in both modes and you can toggle this using the Stores factory API.

Once the StateStoreSupplier is created, it can be used with the (high level) Kafka Streams DSL API as well as the (low level) Processor API

Persistent storage medium

In case of persistent stores, RocksDB (the default) flushes the state store contents to the file system, at a location which can be specified via StreamsConfig.STATE_DIR_CONFIG (this is not a compulsory parameter)

config.put(StreamsConfig.STATE_DIR_CONFIG, "my-state-store");


  • this state store is managed by Kafka Streams internally
  • it is also replicated to a Kafka topic (for fault tolerance and elasticity) – this is a log-compacted topic which is nothing but a changelog of the local state store contents (this is the default behavior, which is configurable using the enableLogging method or can be turned off using disableLogging)
  • it is possible to query these state stores using the interactive queries feature

DSL vs Processor API

There are differences in the way DSL and Processor APIs handle state stores

Read-only vs writable stores

The DSL API restricts access to a read-only view of the state store (via ReadOnlyKeyValueStore), whereas the Processor API gives you access to a writable view which allows you to mutate the state store
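The distinction can be illustrated with a simplified pair of interfaces (these are not the real Kafka Streams types, just their shape):

```java
import java.util.HashMap;
import java.util.Map;

// Simplified shapes illustrating the difference: the DSL hands you a
// read-only view, the Processor API a writable one.
public class StoreViews {

    interface ReadOnlyKeyValueStore<K, V> {
        V get(K key);
    }

    // The writable view extends the read-only one with mutation
    interface KeyValueStore<K, V> extends ReadOnlyKeyValueStore<K, V> {
        void put(K key, V value);
    }

    static class InMemoryStore<K, V> implements KeyValueStore<K, V> {
        private final Map<K, V> map = new HashMap<>();
        public V get(K key) { return map.get(key); }
        public void put(K key, V value) { map.put(key, value); }
    }

    public static void main(String[] args) {
        InMemoryStore<String, Long> store = new InMemoryStore<>();
        store.put("cpu-avg", 42L);

        // DSL-style access: only reads are possible through this reference
        ReadOnlyKeyValueStore<String, Long> readOnly = store;
        System.out.println(readOnly.get("cpu-avg")); // 42
        // readOnly.put(...) would not compile
    }
}
```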

Custom state stores

As of now (0.10.2) it's not possible to plug in your custom state store implementation when using the DSL API. This means that with the DSL API, you are limited to the RocksDB based state store implementation. With the Processor API, it's possible to configure a custom implementation (more on this below)

Custom state stores

As mentioned earlier, the Processor API gives you the freedom to use your own state store in a streams application. You can wrap your custom state store on top of the Kafka Streams API itself – by implementing the required interfaces like StateStore, StateStoreSupplier etc. More details here. Doing this will allow you to query the state store using standard Kafka Streams APIs

But …..

A custom state store implementation might already have a query feature. So does it still make sense to wrap it with the Kafka Streams interfaces/APIs?

The answer is yes… if…

you want to leverage the fault tolerance and elastic capabilities which come for free (if configured using the aforementioned parameters) with the Kafka Streams API – this is due to the changelog topic which is created in Kafka (corresponding to each state store). A custom state store implementation which is not based on the Streams API

  • will not interact with the Kafka broker to backup the state stores, as a result of which
  • task re-distribution during scale-in/scale-out will not be possible since the participating application nodes will not be able to synchronize the latest changes made to other local stores due to the lack of a global checkpoint

Note that this problem can be circumvented with the help of an inherently distributed state store implementation which all the stream processing application instances can access



Posted in Distributed systems, Kafka | 1 Comment

Docker-ized Kafka Streams applications

Here is another example of a Kafka Streams based application… this time, it's about running it in Docker containers – spawn more containers to distribute the processing load. More details in the README



Posted in Distributed systems, Kafka

Kafka Streams based application

A Kafka Streams sample application is available on GitHub… This is a microservice (packaged in the form of an uber JAR) which uses the Kafka Streams Processor (low level) API to calculate the Cumulative Moving Average of the CPU metrics of each machine in a distributed manner

  • A producer application continuously emits CPU usage metrics into a Kafka topic (cpu-metrics-topic) and consumer application (instances) do the computation
  • Consumers can be horizontally scaled – the processing work is distributed amongst many nodes and the process is elastic and flexible thanks to Kafka Streams (and the fact that it leverages Kafka for fault tolerance etc.)
  • Each instance has its own (local) state for the calculated average. A custom REST API (built using Jersey, the JAX-RS implementation) has been provided to tap into this state and give a unified view of the entire system (moving averages of CPU usage of all machines)

….. more in the project README
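The cumulative moving average itself is cheap to maintain incrementally – each processor instance only needs the running average and a count. A sketch of the arithmetic (not the project's actual code):

```java
// Incremental cumulative moving average:
// CMA(n+1) = CMA(n) + (x - CMA(n)) / (n + 1)
public class CumulativeMovingAverage {
    private double average = 0.0;
    private long count = 0;

    // fold one new CPU metric into the running average
    public double accept(double cpuMetric) {
        count++;
        average += (cpuMetric - average) / count;
        return average;
    }

    public static void main(String[] args) {
        CumulativeMovingAverage cma = new CumulativeMovingAverage();
        cma.accept(10.0);
        cma.accept(20.0);
        System.out.println(cma.accept(30.0)); // 20.0
    }
}
```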


Posted in Distributed systems, Kafka

Tip: ClassCastException in Kafka Streams reducer

If you use the groupByKey function on a KStream without specifying a Serdes, the one configured in the StreamsConfig will be used by default – e.g. in the below snippet, it's Serdes.String(). As a result, you will face a ClassCastException in case

  • you execute an operation (e.g. mapValues) which changes the data type of the key or value, and
  • do not specify the corresponding Serdes for the data type while executing subsequent operations (e.g. groupByKey)
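The referenced snippet is not reproduced here, but the underlying failure mode is ordinary type erasure – the generic code compiles fine and the bad cast only surfaces at runtime. A minimal plain-Java reproduction (the class and method are made up for illustration):

```java
import java.util.HashMap;
import java.util.Map;

// Illustration of the failure mode: a value that was (de)serialized as one
// type is cast to another at runtime, throwing ClassCastException.
public class ErasureCce {

    public static Double readAsDouble(Map<String, ?> map, String key) {
        // pretend a String Serdes produced this value while the
        // topology now expects a Double
        return (Double) (Object) map.get(key);
    }

    public static void main(String[] args) {
        Map<String, Object> values = new HashMap<>();
        values.put("metric", "42.0"); // stored as a String
        try {
            double d = readAsDouble(values, "metric"); // blows up here
            System.out.println(d);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
```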

Quick Tip

The solution is to use the appropriate Serdes (in the below example, Serdes.Double() is explicitly specified)


Posted in Distributed systems, Kafka

Microservices messaging on Oracle Cloud using Apache Kafka

Here is a blog I posted on the Oracle Cloud Developer Solutions portal. This is the first of a two-part series which shows asynchronous messaging between microservices with the help of a simple example application


Technical components

Oracle Cloud

Open source

  • Apache Kafka: scalable pub-sub message hub
  • Jersey: Used to implement REST and SSE services. Uses Grizzly as a (pluggable) runtime/container


Posted in Distributed systems, Kafka

Apache Curator: distributed (try) locks

Apache Curator provides different types of distributed locks such as a basic one, a re-entrant lock, a re-entrant read-write lock etc. In this blog we look at one such lock implementation (an InterProcessMutex)

  • its API
  • how it simulates the tryLock feature in Java Lock
  • walk through sample code

Code available on GitHub


Apache Curator: the keeper for Zookeeper

InterProcessMutex API

It has a simple API where the primary components are

  • the constructor itself: how to instantiate a distributed lock
  • acquire the lock
  • relinquish (release) the lock

Prior to entering a critical section, an application or process would need to obtain a lock in order to ensure that it is the only one executing that piece of logic
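The usage pattern mirrors a local java.util.concurrent lock; a sketch using ReentrantLock as a stand-in (InterProcessMutex uses acquire()/release() instead of lock()/unlock(), but the shape is the same):

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Local stand-in for the distributed lock pattern: acquire before the
// critical section, always release in finally.
public class CriticalSection {

    private static final Lock lock = new ReentrantLock();
    private static int counter = 0;

    public static void runCritical() {
        lock.lock();           // InterProcessMutex: mutex.acquire()
        try {
            counter++;         // the critical section
        } finally {
            lock.unlock();     // InterProcessMutex: mutex.release()
        }
    }

    public static int counter() { return counter; }

    public static void main(String[] args) {
        runCritical();
        System.out.println(counter());
    }
}
```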

Block for Lock

What’s important to understand is that the acquire method blocks until the lock is available. This has some important implications for multiple processes which are competing for this lock

  • Only one will succeed – this is obvious
  • Since the acquire method blocks, the other processes will queue up
  • The queuing up will happen in a fair manner (as per the implementation) i.e. the process which called acquire first (as per Zookeeper, not as per the application) will be the next in queue to get the lock once it's released by the process which actually holds it

Depending upon your use case, you might or might not want the queuing effect. For example

  • A node in your application gets the lock, finishes a part of the job (saving its state to a shared location like a DB) and releases the lock. You might want another node in the application to continue processing from where the previous node left off – using acquire to let the nodes wait in a queue makes sense here
  • If you have a scheduled timer distributed across multiple nodes in your cluster and need it to be executed by one node at a time, then you might want to use the overloaded form of the acquire method. This avoids the queuing-up effect of processes – something you would not want, since the timer is already pre-configured to fire after certain intervals

Distributed try lock

Just like the tryLock method in Java’s Lock, the InterProcessMutex provides an overloaded version of acquire which accepts a time out. It does not block – all it does is the following

  • returns true if the lock is obtained within the stipulated time out
  • returns false i.e. it gives up if the lock is not available and the time out is breached
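The semantics are the same as Java's Lock#tryLock(timeout); a local analogue using ReentrantLock (the class here is a made-up stand-in for InterProcessMutex.acquire(time, unit)):

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Local analogue of the distributed try-lock: give up instead of queuing
// when the lock is not obtained within the timeout.
public class TryLockDemo {

    public static boolean tryWork(ReentrantLock lock) throws InterruptedException {
        if (!lock.tryLock(100, TimeUnit.MILLISECONDS)) {
            return false; // timed out – lock held elsewhere
        }
        try {
            return true;  // got the lock within the stipulated timeout
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        System.out.println(tryWork(lock)); // true – nobody else holds it

        lock.lock(); // simulate another holder
        Thread t = new Thread(() -> {
            try {
                // this thread times out instead of queuing up
                System.out.println(tryWork(lock)); // false
            } catch (InterruptedException ignored) { }
        });
        t.start();
        t.join();
        lock.unlock();
    }
}
```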

Code walk through

Details in the Github project README



Posted in Curator, Distributed systems, Zookeeper

Kafka Partitioning…

Partitions are the key to the scalability attributes of Kafka. Developers can also implement a custom partitioning algorithm to override the default partition assignment behavior. This post will briefly cover

  • Partitions in general
  • Data distribution, default partitioning, and
  • Example of custom partitioning logic

Partitions in Kafka

In Kafka, topics are built on another layer of abstraction – the partition

Here is a quickie

  • A topic is divided into one (the default; can be increased) or more partitions
  • A partition is like a log
  • Publishers append data to the end of the log, and each entry is identified by a unique number called the offset
  • Records in the partition are immutable and stored for a configurable amount of time (after which they are removed from disk)
  • Consumers can read (they pull data) from any position (offset) in a partition, and move forward or back
  • Each partition is replicated (as per the replication factor configuration) which means it has one primary copy (on the leader node) and zero or more copies (on follower nodes)
  • Kafka ensures strict ordering within a partition i.e. consumers will receive the data in the order in which the producer published it
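Most of the points above fall out of the log abstraction; a toy in-memory sketch (not Kafka's storage code – just the append/offset/read-from-anywhere behavior):

```java
import java.util.ArrayList;
import java.util.List;

// A partition behaves like an append-only log: producers append at the end,
// each record gets a monotonically increasing offset, and consumers can
// read from any offset, moving forward or back.
public class PartitionLog {
    private final List<String> records = new ArrayList<>();

    // append returns the offset assigned to the record
    public long append(String record) {
        records.add(record);          // records are never modified in place
        return records.size() - 1;
    }

    // consumers pull from any position
    public String read(long offset) {
        return records.get((int) offset);
    }

    public static void main(String[] args) {
        PartitionLog p = new PartitionLog();
        System.out.println(p.append("cpu=40")); // 0
        System.out.println(p.append("cpu=55")); // 1
        System.out.println(p.read(0));          // cpu=40
    }
}
```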

Distributing partitions across nodes

In Kafka, spreading/distributing the data over multiple machines happens at the level of partitions (not individual records). The scenario is depicted below


Partition distribution over a Kafka cluster

  • Primary partition placement: 2 partitions per node e.g. N1 will have P1,P3 & N2 will have P2, P4
  • Replica placement: one replica for each partition since the factor is 2 i.e. one replica in addition to a primary copy (total 2)
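The leader placement in the scenario above amounts to round-robin assignment; a toy sketch (Kafka's actual assignment logic also spreads replicas across nodes to maximize fault tolerance):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Round-robin leader placement: 4 partitions over 2 nodes gives
// N1 -> [P1, P3], N2 -> [P2, P4], matching the scenario above.
public class PlacementSketch {

    public static Map<String, List<String>> placeLeaders(List<String> nodes, int numPartitions) {
        Map<String, List<String>> placement = new HashMap<>();
        for (String node : nodes) {
            placement.put(node, new ArrayList<>());
        }
        for (int p = 1; p <= numPartitions; p++) {
            // assign partition p to nodes in rotation
            String node = nodes.get((p - 1) % nodes.size());
            placement.get(node).add("P" + p);
        }
        return placement;
    }

    public static void main(String[] args) {
        Map<String, List<String>> placement = placeLeaders(List.of("N1", "N2"), 4);
        System.out.println(placement.get("N1")); // [P1, P3]
        System.out.println(placement.get("N2")); // [P2, P4]
    }
}
```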

Data and Partitions: the relationship

When a producer sends data, it goes to a topic – but that's the 50,000-foot view. You must understand that

  • data is actually a key-value pair
  • its storage happens at a partition level

A key-value pair in a messaging system like Kafka might sound odd, but the key is used for intelligent and efficient data distribution within a cluster. Depending on the key, Kafka sends the data to a specific partition and ensures that it's replicated as well (as per config). Thus, each record is tied to a specific partition

Default behavior

The data for the same key goes to the same partition, since Kafka uses a hashing algorithm to map keys to partitions. In case of a null key (yes, that's possible), the data is spread round-robin over the partitions. If you only have one partition: ALL data goes to that single partition


Data in partitions

Custom partitioning scheme

You can plug in a custom algorithm to partition the data

  • by implementing the Partitioner interface, and
  • configuring the Kafka producer to use it

Here is an example

Generate random partitions which are within the valid partition range
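The embedded example is not reproduced here; this plain-Java sketch shows the same idea. A real implementation would put this logic in the partition() method of a class implementing org.apache.kafka.clients.producer.Partitioner, registered via the producer's partitioner.class property:

```java
import java.util.concurrent.ThreadLocalRandom;

// Sketch of the custom partitioning idea: pick any partition at random,
// but stay within the valid range [0, numPartitions).
public class RandomPartitionSketch {

    public static int randomPartition(int numPartitions) {
        return ThreadLocalRandom.current().nextInt(numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        for (int i = 0; i < 5; i++) {
            int p = randomPartition(numPartitions);
            System.out.println("record -> partition " + p); // always 0..3
        }
    }
}
```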



Posted in Distributed systems, Kafka | 1 Comment