Tip: ClassCastException in Kafka Streams reducer

If you use the groupByKey function on a KStream without specifying a Serdes, the one configured in the StreamsConfig will be used by default (e.g. in the below snippet, it’s Serdes.String()). As a result, you will face a ClassCastException in case

  • you execute an operation (e.g. mapValues) which changes the data type of the key or value, and
  • do not specify the corresponding Serdes for the new data type while executing subsequent operations (e.g. groupByKey)

Quick Tip

The solution is to use the appropriate Serdes (in the below example, Serdes.Double() is explicitly specified).
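Since the original snippet did not survive here, a minimal sketch of the fix (the topic names and the String-to-Double conversion are illustrative; Grouped.with assumes Kafka Streams 2.1+, older versions use Serialized.with):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> input = builder.stream("input-topic");

input.mapValues(Double::parseDouble)   // value type changes: String -> Double
     // Without the explicit Serdes below, the default (Serdes.String()) would
     // be applied to the Double values, causing a ClassCastException at runtime
     .groupByKey(Grouped.with(Serdes.String(), Serdes.Double()))
     .reduce(Double::sum)
     .toStream()
     .to("output-topic");
```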


Posted in Distributed systems, Kafka

Microservices messaging on Oracle Cloud using Apache Kafka

Here is a blog I posted on the Oracle Cloud Developer Solutions portal. This is the first of a two-part series which demonstrates asynchronous messaging between microservices with the help of a simple example (application).


Technical components

Oracle Cloud

Open source

  • Apache Kafka: scalable pub-sub message hub
  • Jersey: Used to implement REST and SSE services. Uses Grizzly as a (pluggable) runtime/container


Posted in Distributed systems, Kafka

Apache Curator: distributed (try) locks

Apache Curator provides different types of distributed locks such as a basic one, a re-entrant lock, a re-entrant read-write lock, etc. In this blog we look at one such lock implementation (an InterProcessMutex)

  • its API
  • how it simulates the tryLock feature in Java’s Lock
  • a walk-through of sample code

Code available on Github


Apache Curator: the keeper for Zookeeper

InterProcessMutex API

It has a simple API where the primary components are

  • the constructor itself: how to instantiate a distributed lock
  • acquire the lock
  • relinquish (release) the lock

Prior to entering a critical section, an application or process would need to obtain a lock in order to ensure that it is the only one executing that piece of logic
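A minimal sketch of that API (the ZooKeeper connect string and lock path are placeholders, and error handling is elided; acquire() throws a checked Exception in Curator):

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

CuratorFramework client = CuratorFrameworkFactory.newClient(
        "localhost:2181", new ExponentialBackoffRetry(1000, 3));
client.start();

// the constructor: a Curator client plus a ZooKeeper path identifying the lock
InterProcessMutex lock = new InterProcessMutex(client, "/locks/my-resource");

lock.acquire();          // blocks until the lock is available
try {
    // critical section: only one process across the cluster runs this at a time
} finally {
    lock.release();      // relinquish so the next waiter can proceed
}
```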

Block for Lock

What’s important to understand is that the acquire method blocks until the lock is available. This has some important implications for multiple processes which are competing for this lock

  • Only one will succeed – this is obvious
  • Since acquire method blocks, the other processes will queue up
  • The queuing up will happen in a fair manner (as per implementation) i.e. the process which called acquire first (as per Zookeeper, not as per the application) will be the next in queue to get the lock once it’s released by the process which currently holds it

Depending upon your use case, you might or might not want the queuing effect. For example

  • A node in your application gets the lock, finishes a part of the job (saving its state to a shared location like a DB) and releases the lock. You might want another node in the application to continue processing from where the previous node left off – using acquire to let the nodes wait in a queue makes sense
  • If you have a scheduled timer distributed across multiple nodes in your cluster and need it to be executed by one node at a time, then you might want to use the overloaded form of the acquire method. This avoids the queuing-up effect – something which you would not want, since the timer is already pre-configured to fire after certain intervals

Distributed try lock

Just like the tryLock method in Java’s Lock, the InterProcessMutex provides an overloaded version of acquire which accepts a time out. It does not block – all it does is the following

  • returns true if the lock is obtained within the stipulated time out
  • returns false i.e. it gives up if the lock is not available and the time out is breached
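A sketch of the try-lock variant, assuming the same InterProcessMutex instance as in the earlier snippet (the 2-second timeout is arbitrary):

```java
import java.util.concurrent.TimeUnit;

// overloaded acquire: wait at most 2 seconds instead of blocking indefinitely
if (lock.acquire(2, TimeUnit.SECONDS)) {
    try {
        // got the lock within the timeout
    } finally {
        lock.release();
    }
} else {
    // gave up: the lock was not available within 2 seconds
}
```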

Code walk through

Details in the Github project README

Further reading


Posted in Curator, Distributed systems, Zookeeper

Kafka Partitioning…

Partitions are the key to the scalability attributes of Kafka. Developers can also implement a custom partitioning algorithm to override the default partition assignment behavior. This post will briefly cover

  • Partitions in general
  • Data distribution, default partitioning, and
  • Example of custom partitioning logic

Partitions in Kafka

In Kafka, partitions serve as another layer of abstraction beneath a Topic – the Partition

Here is a quickie

  • Topic is divided into one (default, can be increased) or more partitions
  • A partition is like a log
  • Publishers append data to the end of the log; each entry is identified by a unique number called the offset
  • Records in the partition are immutable and stored for a configurable amount of time (after which they are removed from disk)
  • Consumers can read (they pull data) from any position (offset) in a partition, move forward or back
  • Each partition is replicated (as per the replication factor configuration) which means that it has one primary copy (on the leader node) and zero or more replicas (on follower nodes)
  • Kafka ensures strict ordering within a partition i.e. consumers receive data in the order in which the producer published it

Distributing partitions across nodes

In Kafka, spreading/distributing the data over multiple machines happens at the level of partitions (not individual records). The scenario is depicted below


Partition distribution over a Kafka cluster

  • Primary partition placement: 2 partitions per node e.g. N1 will have P1,P3 & N2 will have P2, P4
  • Replica placement: one replica for each partition since the factor is 2 i.e. one replica in addition to a primary copy (total 2)

Data and Partitions: the relationship

When a producer sends data, it goes to a topic – but that’s the 50,000-foot view. You must understand that

  • data is actually a key-value pair
  • its storage happens at a partition level

A key-value pair in a messaging system like Kafka might sound odd, but the key is used for intelligent and efficient data distribution within a cluster. Depending on the key, Kafka sends the data to a specific partition and ensures that it’s replicated as well (as per config). Thus, each record is mapped to exactly one partition.

Default behavior

The data for the same key goes to the same partition, since Kafka uses a consistent hashing algorithm to map keys to partitions. In case of a null key (yes, that’s possible), the data is randomly placed on any of the partitions. If you only have one partition: ALL data goes to that single partition
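The mapping can be sketched in plain Java. This is an illustration only: Kafka’s actual default partitioner hashes the serialized key with murmur2, while here String.hashCode() stands in for the hash:

```java
import java.util.concurrent.ThreadLocalRandom;

public class PartitionSketch {
    // Illustrative stand-in for Kafka's default partitioner: the real one
    // applies murmur2 to the serialized key bytes; we use String.hashCode()
    static int partitionFor(String key, int numPartitions) {
        if (key == null) {
            // null key: pick any partition at random
            return ThreadLocalRandom.current().nextInt(numPartitions);
        }
        // mask off the sign bit so the modulo result is a valid partition id
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // the same key always lands on the same partition
        System.out.println(partitionFor("user-42", 4) == partitionFor("user-42", 4)); // true
        // with a single partition, ALL keys map to it
        System.out.println(partitionFor("any-key", 1)); // 0
    }
}
```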


Data in partitions

Custom partitioning scheme

You can plug in a custom algorithm to partition the data

  • by implementing the Partitioner interface
  • and configuring the Kafka producer to use it

Here is an example

Generate random partitions which are within the valid partition range
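Since the code sample itself is not shown here, a sketch of what such a random partitioner could look like (the class name is illustrative; the partition method signature comes from Kafka’s Partitioner interface):

```java
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class RandomPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // stay within the valid partition range: [0, numPartitions)
        int numPartitions = cluster.partitionCountForTopic(topic);
        return ThreadLocalRandom.current().nextInt(numPartitions);
    }

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void close() { }
}
```

The producer would then be pointed at it via props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RandomPartitioner.class.getName()).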

Further reading


Posted in Distributed systems, Kafka

Scaling out with Kafka Consumer Groups

This blog post will cover Apache Kafka consumers and demonstrate how they are designed for distributed, scale-out architectures. It assumes you have some idea about Kafka in general. We will mainly look at

  • Kafka Consumer groups,
  • the scale-out story, and
  • a sample application

Source code available on Github

Let’s start with a quick overview of …

Kafka Consumer groups

They ensure two important things

  • Consumer parallelism, and
  • Load balancing

Correlation between Consumer Groups and Topic partitions in Kafka (from the Kafka documentation)

  • Each Kafka consumer belongs to a consumer group i.e. it can be thought of as a logical container/namespace for a bunch of consumers
  • A consumer group can choose to receive messages from one or more topics
  • Instances in a consumer group can receive messages from zero, one or more partitions within each topic (depending on the number of partitions and consumer instances)
  • Kafka makes sure that there is no overlap as far as message consumption is concerned i.e. a consumer (in a group) receives messages from exactly one partition of a specific topic
  • The partition to consumer instance assignment is internally taken care of by Kafka and this process is dynamic in nature i.e. the work is re-distributed as and when
    • new instances are added to an existing consumer group, or
    • instances are removed from the consumer group

Scaling out


From the point of view of a Kafka consumer, scalability is the ability to consume messages which are both high volume and high velocity. In such a scenario, it makes sense to scale out the Kafka broker layer by adding more instances. This will

  • ensure that topic partition storage is offloaded to the new brokers – maintaining system throughput
  • i.e. the new broker instance(s) act as the leader for some partitions and as followers (storing the replicas) for other partitions – maintaining high availability

We also need to take care of the consumer layer – the applications should be able to keep up with the rate of production.

Possible solution

Consumer Groups: Kafka transparently load balances traffic from all partitions amongst a bunch of consumers in a group which means that a consuming application can respond to higher performance and throughput requirements by

  • simply spawning additional consumer instances within the same group, and
  • expect the load to be divided amongst them

Things to note

  • Consumer group membership – assign a group ID (in the consumer logic)
    • kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP);
  • Consumer to Partition ratio
    • equal to 1: each consumer will receive messages from exactly one partition i.e. a one-to-one correlation
    • less than 1: some consumers might receive messages from more than one partition
    • more than 1: some consumers will remain idle
  • A Kafka Consumer is not meant for concurrent access by multiple threads
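The ratio arithmetic above can be illustrated with a tiny sketch (not Kafka’s actual assignor code; a simple round-robin spread of partition IDs over consumer slots, which yields the same ratios):

```java
import java.util.ArrayList;
import java.util.List;

public class AssignmentSketch {
    // Spread partition ids over consumers round-robin style. Kafka's real
    // assignors (range, round-robin, sticky) are more involved, but the
    // consumer-to-partition ratios work out the same way.
    static List<List<Integer>> assign(int numPartitions, int numConsumers) {
        List<List<Integer>> assignment = new ArrayList<>();
        for (int c = 0; c < numConsumers; c++) {
            assignment.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            assignment.get(p % numConsumers).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        System.out.println(assign(4, 4)); // ratio = 1: one partition each
        System.out.println(assign(4, 2)); // ratio < 1: two partitions each
        System.out.println(assign(4, 5)); // ratio > 1: the 5th consumer sits idle
    }
}
```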



  • single node cluster (keeping things simple)
  • 4 partitions
  • start with 1 consumer and bump up to 4 consumers (increment by 1)

First and foremost

  • download Kafka and unzip it
  • configure partitions
    • edit /config/server.properties
    • find num.partitions and change it to 4
  • start things up
    • cd /bin
    • zookeeper-server-start.sh ../config/zookeeper.properties (zookeeper first)
    • kafka-server-start.sh ../config/server.properties (kafka broker node)
  • setup the Github project
    • fork the code
    • cd
    • mvn clean install

Actions & observations

  • start the producer (we’ll keep one producer for simplicity) – java -jar target/kafka-scale.jar producer
    • the partition ID for each message is logged
  • start consumer instance #1 – java -jar target/kafka-scale.jar consumer
    • track the logs to confirm that contents of ALL of the partitions (4 in this example) of the topic are consumed by the one and only consumer instance in the group
  • start the 2nd consumer instance (same command as above) – the partition load will now get (randomly) distributed across the two consumer instances e.g. in the logs, you will notice that consumer 1 is receiving messages from partitions 0,1 and consumer 2 acts as the sink for partitions 2,3
  • start consumer #3 – again, the load distribution will be random, but what we can be sure of is that
    • one of the consumers will receive data from any two partitions
    • the remaining (two) consumers will receive data from one partition each
    • e.g. consumer 1 will take partition 0,1 and consumers 2,3 get assigned to partitions 2,3 respectively
  • start consumer #4 – this one is easy. The logs will confirm that the load will get evenly distributed among ALL (the 4) consumers i.e. one consumer per partition

Further reading


Posted in Distributed systems, Kafka

Hello (distributed) world !

Hi there … ! I am the guy from ‘Thinking in Java EE‘. I have been interested in distributed systems and the corresponding software solutions for some time now. So I thought of starting off yet another blog (the one you’re reading) to help myself and others (hopefully ;-))

The plan (at least right now) is to write and discuss about a variety of distributed systems products ranging from

  • NoSQL solutions,
  • in-memory data grids,
  • general distributed computing frameworks,
  • messaging systems,
  • and maybe pure distributed systems theory from time to time, if I am able to make sense out of it!
  • etc etc etc.

Let’s see how this goes. Don’t get fooled into thinking I am a distributed systems expert… but who knows, maybe one day 😉

Posted in Distributed systems