This blog post explores some common aspects of state stores in Kafka Streams.
Default state store
By default, Kafka Streams uses RocksDB as its state store.
In-memory or persistent?
Whether a state store is in-memory or persistent is configurable. Persistent stores are backed by RocksDB, while in-memory stores keep their contents on the JVM heap. You choose between the two using the Stores factory API.
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.Stores;

Stores.KeyValueFactory<String, Double> baseKVFactory = Stores.create("my-state-store")
        .withStringKeys()
        .withDoubleValues();

// in-memory store: contents live ON-heap
Stores.InMemoryKeyValueFactory<String, Double> inMemoryKVFactory = baseKVFactory.inMemory();
StateStoreSupplier inMemoryStateStoreSupplier = inMemoryKVFactory.build();

// persistent store: RocksDB, i.e. OFF-heap memory plus local disk
Stores.PersistentKeyValueFactory<String, Double> persistentKVFactory = baseKVFactory.persistent();
StateStoreSupplier persistentStateStoreSupplier = persistentKVFactory.build();
Once the StateStoreSupplier is created, it can be used in both the high-level Kafka Streams DSL API and the low-level Processor API.
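import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.state.KeyValueStore;

// DSL API
KStream<String, Double> streamFromKafka = …; // from a Kafka topic
KGroupedStream<String, Double> groupedStream = streamFromKafka.groupByKey();
// options: count() takes a StateStoreSupplier, not the factory itself
// (note: count() writes Long values, so its store would normally be built with Long values)
groupedStream.count(inMemoryKVFactory.build());
// or
groupedStream.count(persistentKVFactory.build());

// Processor API: use TopologyBuilder to associate state stores with processors
TopologyBuilder builder = new TopologyBuilder();
// options
builder.addStateStore(inMemoryKVFactory.build(), "processor-1");
// or
builder.addStateStore(persistentKVFactory.build(), "processor-1");

// get a handle to the store using the ProcessorContext
ProcessorContext pc = …;
KeyValueStore<String, Double> kvStateStore = (KeyValueStore<String, Double>) pc.getStateStore("my-state-store");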
Persistent storage medium
For persistent stores, RocksDB (the default) flushes the state store contents to the file system, in a directory that can be specified with StreamsConfig.STATE_DIR_CONFIG. This is not a mandatory parameter; in 0.10.2 it defaults to /tmp/kafka-streams.
config.put(StreamsConfig.STATE_DIR_CONFIG, "my-state-store");
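As a small illustration of the optional nature of this setting, here is a sketch that builds a minimal configuration with and without a state directory. To keep it compilable without kafka-streams on the classpath, it uses the string keys behind the StreamsConfig constants ("application.id", "bootstrap.servers", "state.dir"). The application id, broker address and path are hypothetical values.

```java
import java.util.Properties;

// Sketch only: plain string keys stand in for the StreamsConfig constants so this
// compiles without kafka-streams on the classpath.
public class StateDirConfigSketch {

    // Builds a minimal Streams configuration; pass null to leave state.dir unset.
    public static Properties streamsConfig(String stateDir) {
        Properties config = new Properties();
        config.put("application.id", "state-store-demo");  // StreamsConfig.APPLICATION_ID_CONFIG (hypothetical value)
        config.put("bootstrap.servers", "localhost:9092"); // StreamsConfig.BOOTSTRAP_SERVERS_CONFIG (hypothetical value)
        if (stateDir != null) {
            config.put("state.dir", stateDir);             // StreamsConfig.STATE_DIR_CONFIG
        }
        return config;
    }

    public static void main(String[] args) {
        Properties custom = streamsConfig("/var/lib/kafka-streams"); // hypothetical path
        System.out.println(custom.getProperty("state.dir"));
        Properties defaults = streamsConfig(null);
        // Not set here, so Kafka Streams would fall back to its built-in default directory.
        System.out.println(defaults.getProperty("state.dir", "<not set>"));
    }
}
```

In a real application you would pass the same Properties to the KafkaStreams constructor, using the StreamsConfig constants rather than the literals.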
Notes
- This state store is managed internally by Kafka Streams.
- It is also replicated to a Kafka topic, for fault tolerance and elasticity. This is a log-compacted topic that is nothing but a changelog of the local state store contents. Replication is the default behavior; it is configurable using the enableLogging method and can be turned off using disableLogging.
- It is possible to query these state stores using the interactive queries feature.
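To make the changelog idea concrete, here is a plain-Java model of a log-compacted changelog. It is a simplified sketch, not the Kafka implementation. Every write to the local store is one changelog record, and a null value stands for a delete (a tombstone). Compaction keeps only the latest record per key. The point it demonstrates: replaying the compacted log rebuilds the same store contents as replaying the full log. The Change class and the helper methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a changelog and log compaction; NOT the Kafka implementation.
public class ChangelogModel {

    // One changelog record: a key and its new value; a null value marks a delete (tombstone).
    public static final class Change {
        final String key;
        final Double value;
        public Change(String key, Double value) { this.key = key; this.value = value; }
    }

    // Compaction keeps only the latest record per key, ordered by that record's position
    // in the log. (Real Kafka also drops tombstones after a while; not modelled here.)
    public static List<Change> compact(List<Change> log) {
        Map<String, Change> latest = new LinkedHashMap<>();
        for (Change c : log) {
            latest.remove(c.key); // remove first so re-insertion moves the key to its newest position
            latest.put(c.key, c);
        }
        return new ArrayList<>(latest.values());
    }

    // Rebuilding a local store means replaying the changelog from the beginning.
    public static Map<String, Double> restore(List<Change> log) {
        Map<String, Double> store = new LinkedHashMap<>();
        for (Change c : log) {
            if (c.value == null) {
                store.remove(c.key);
            } else {
                store.put(c.key, c.value);
            }
        }
        return store;
    }

    public static void main(String[] args) {
        List<Change> log = Arrays.asList(
                new Change("a", 1.0), new Change("b", 2.0),
                new Change("a", 3.0), new Change("b", null));
        List<Change> compacted = compact(log);
        System.out.println(log.size() + " records before, " + compacted.size() + " after compaction");
        // Same state whether we replay the full log or the compacted one.
        System.out.println(restore(log) + " == " + restore(compacted));
    }
}
```

This is why a compacted changelog is enough for recovery: old values for a key can be discarded without changing the state that a replay produces.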
DSL vs Processor API
There are differences in the way the DSL and the Processor API handle state stores.
Read-only vs writable stores
The DSL API restricts access to a read-only view of the state store (via ReadOnlyKeyValueStore), whereas the Processor API gives you access to a writable view that allows you to mutate the state store.
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// DSL API
KafkaStreams ks = …;
ReadOnlyKeyValueStore<String, Double> myStore = ks.store(storeName,
        QueryableStoreTypes.<String, Double>keyValueStore());

// Processor API
ProcessorContext pc = …;
KeyValueStore<String, Double> kvStateStore = (KeyValueStore<String, Double>) pc.getStateStore("my-state-store");
kvStateStore.put("my-key", 2.0); // mutate operation (a Double value, matching the store's value type)
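The read-only versus writable distinction is essentially an interface-design choice. The sketch below shows it in plain Java. The query side is handed a wrapper that exposes only read methods, and the processing side keeps the writable store. ReadOnlyView, WritableStore and MapStore are hypothetical names that only loosely mirror ReadOnlyKeyValueStore and KeyValueStore; this is not how Kafka Streams implements them.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of a read-only view over a writable store (hypothetical types).
public class ReadOnlyViewSketch {

    public interface ReadOnlyView<K, V> {
        V get(K key);
        long approximateNumEntries();
    }

    public interface WritableStore<K, V> extends ReadOnlyView<K, V> {
        void put(K key, V value);
        V delete(K key);
    }

    // Simple map-backed writable store, standing in for the processor-side store.
    public static final class MapStore<K, V> implements WritableStore<K, V> {
        private final Map<K, V> data = new HashMap<>();
        @Override public V get(K key) { return data.get(key); }
        @Override public long approximateNumEntries() { return data.size(); }
        @Override public void put(K key, V value) { data.put(key, value); }
        @Override public V delete(K key) { return data.remove(key); }
    }

    // Hand out a wrapper rather than the store itself, so callers cannot cast back to WritableStore.
    public static <K, V> ReadOnlyView<K, V> readOnly(WritableStore<K, V> store) {
        return new ReadOnlyView<K, V>() {
            @Override public V get(K key) { return store.get(key); }
            @Override public long approximateNumEntries() { return store.approximateNumEntries(); }
        };
    }

    public static void main(String[] args) {
        MapStore<String, Double> store = new MapStore<>();
        store.put("my-key", 2.0);                              // processor side: mutate
        ReadOnlyView<String, Double> view = readOnly(store);   // query side: read only
        System.out.println(view.get("my-key"));
        System.out.println(view instanceof WritableStore);     // the wrapper exposes no mutators
    }
}
```

Because the view delegates to the live store, later writes by the processor remain visible to readers without giving them a way to write.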
Custom state stores
As of now (0.10.2), it’s not possible to plug in your own custom state store implementation when using the DSL API. This means that with the DSL API you are limited to the RocksDB-based state store implementation. With the Processor API, it’s possible to configure a custom implementation (more on this below).
Custom state stores
As mentioned earlier, the Processor API gives you the freedom to use your own state store in a streams application. You can wrap your custom state store on top of the Kafka Streams API itself by implementing the required interfaces, such as StateStore and StateStoreSupplier. Doing this allows you to query the state store using the standard Kafka Streams APIs.
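The shape of such a wrapper can be sketched without Kafka on the classpath. MiniStateStore and MiniStoreSupplier below are hypothetical mini-interfaces modelled on a subset of StateStore and StateStoreSupplier. The real interfaces have more methods, for example StateStore#init, which receives the ProcessorContext. SortedMemoryStore stands in for a custom store that already has its own query feature, here an inclusive range scan. Treat this as an outline of the structure, not a drop-in implementation.

```java
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.Supplier;

// Sketch of a custom store behind hypothetical mini-interfaces (not the Kafka Streams ones).
public class CustomStoreSketch {

    public interface MiniStateStore {
        String name();
        boolean persistent();
        boolean isOpen();
        void flush();
        void close();
    }

    // A named factory that hands out a fresh store instance on every get() call.
    public interface MiniStoreSupplier<T extends MiniStateStore> extends Supplier<T> {
        String name();
    }

    // The "custom" store: sorted, in-memory, with its own range-query feature.
    public static final class SortedMemoryStore implements MiniStateStore {
        private final String name;
        private final TreeMap<String, Double> data = new TreeMap<>();
        private boolean open = true;

        public SortedMemoryStore(String name) { this.name = name; }

        @Override public String name() { return name; }
        @Override public boolean persistent() { return false; }
        @Override public boolean isOpen() { return open; }
        @Override public void flush() { /* nothing is buffered in this sketch */ }
        @Override public void close() { open = false; }

        public void put(String key, Double value) { checkOpen(); data.put(key, value); }
        public Double get(String key) { checkOpen(); return data.get(key); }

        // Inclusive range scan over the sorted keys, returned as a copy.
        public SortedMap<String, Double> range(String from, String to) {
            checkOpen();
            return new TreeMap<>(data.subMap(from, true, to, true));
        }

        private void checkOpen() {
            if (!open) throw new IllegalStateException("store " + name + " is closed");
        }
    }

    public static MiniStoreSupplier<SortedMemoryStore> supplier(String storeName) {
        return new MiniStoreSupplier<SortedMemoryStore>() {
            @Override public String name() { return storeName; }
            @Override public SortedMemoryStore get() { return new SortedMemoryStore(storeName); }
        };
    }

    public static void main(String[] args) {
        SortedMemoryStore store = supplier("my-state-store").get();
        store.put("a", 1.0);
        store.put("b", 2.0);
        store.put("c", 3.0);
        System.out.println(store.range("a", "b"));
        store.close();
        System.out.println(store.isOpen());
    }
}
```

Separating the supplier from the store mirrors the pattern the post describes. The framework asks the supplier for a new instance wherever one is needed, and the store itself only manages its own lifecycle and data.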
But…
A custom state store implementation might already have its own query feature. So does it still make sense to wrap it with the Kafka Streams interfaces/APIs?
The answer is yes, if…
you want to leverage the fault tolerance and elasticity that come for free with the Kafka Streams API (if configured using the aforementioned parameters). This is thanks to the changelog topic that is created in Kafka for each state store. A custom state store implementation that is not based on the Streams API:
- will not interact with the Kafka broker to back up the state stores; as a result,
- task redistribution during scale-in/scale-out will not be possible, since the participating application nodes will not be able to synchronize the latest changes made to other local stores, due to the lack of a global checkpoint.
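The difference can be illustrated with a toy model of task migration in plain Java. Instance A writes to its local store and then leaves. Instance B takes over the task and can only rebuild the state if there is a changelog it can replay. ChangelogTopic and LocalStore are hypothetical stand-ins, not Kafka APIs. A store created without a changelog models the custom, non-Streams store described above.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of task migration; the topic and store types are hypothetical stand-ins.
public class TaskMigrationSketch {

    // Stand-in for the changelog topic: it lives in the Kafka cluster, so every instance can read it.
    public static final class ChangelogTopic {
        final List<Map.Entry<String, Double>> records = new ArrayList<>();
    }

    // A local store on one application instance; changelog == null models a custom
    // store that never writes its changes to Kafka.
    public static final class LocalStore {
        final Map<String, Double> data = new HashMap<>();
        private final ChangelogTopic changelog;

        public LocalStore(ChangelogTopic changelog) { this.changelog = changelog; }

        public void put(String key, Double value) {
            data.put(key, value);
            if (changelog != null) {
                changelog.records.add(new AbstractMap.SimpleEntry<>(key, value));
            }
        }

        // Runs when the task is assigned to this instance: replay the changelog, if there is one.
        public void restore() {
            if (changelog == null) return;
            for (Map.Entry<String, Double> r : changelog.records) {
                data.put(r.getKey(), r.getValue());
            }
        }
    }

    // Instance A processes some records, then leaves the group; instance B takes over the task.
    public static Map<String, Double> stateAfterMigration(boolean withChangelog) {
        ChangelogTopic topic = withChangelog ? new ChangelogTopic() : null;
        LocalStore onInstanceA = new LocalStore(topic);
        onInstanceA.put("user-1", 3.0);
        onInstanceA.put("user-2", 5.0);
        // A's local files are not reachable from B, so B starts from an empty local store.
        LocalStore onInstanceB = new LocalStore(topic);
        onInstanceB.restore();
        return onInstanceB.data;
    }

    public static void main(String[] args) {
        System.out.println(stateAfterMigration(true).size());
        System.out.println(stateAfterMigration(false).size());
    }
}
```

With the changelog, B ends up with both entries. Without it, B starts empty, which is exactly the state loss that prevents safe task redistribution.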
Note that this problem can be circumvented with an inherently distributed state store implementation that all the stream processing application instances can access.
Other stuff
- Kafka examples on GitHub
- Other kafka blogs
Cheers!
How can you implement the processor?
Use the processor APIs