KTables | Changelog topics

Kamini Kamal
Level Up Coding
Published in
6 min readJul 12, 2023

--

How can we retain the current state of the KTable while still preserving the history of events using log compaction?

To retain the current state of a KTable while preserving the history of events using log compaction, you can leverage the concept of changelog topics in Kafka Streams. The changelog topic serves as a durable, compacted log that maintains the complete history of all updates to the KTable. Here’s how you can achieve this:

  1. Enable Log Compaction: Configure the changelog topic associated with the KTable to use log compaction by setting the cleanup.policy configuration of the topic to compact.
  2. Configure the KTable: When defining the KTable in your Kafka Streams application, specify a materialized state store for the KTable using the Materialized API. This materialized state store is associated with the changelog topic and allows the application to store and retrieve the current state of the KTable.

For example, in Java:

KTable<KeyType, ValueType> kTable = builder.table(
"input-topic",
Materialized.<KeyType, ValueType>as("state-store-name")
.withKeySerde(keySerde)
.withValueSerde(valueSerde)
);

Here, "state-store-name" is the name of the materialized state store that will be associated with the KTable.

3. Query the KTable: Use the materialized state store to query the KTable and access the current state of the table. You can retrieve the current value for a specific key or perform range queries on the KTable.

ReadOnlyKeyValueStore<KeyType, ValueType> stateStore = streams.store("state-store-name", QueryableStoreTypes.keyValueStore());
ValueType currentValue = stateStore.get(key);

This allows you to retrieve the latest value associated with a particular key from the KTable’s state store.

By using the combination of log compaction and materialized state stores, you retain the current state of the KTable in a durable and compacted changelog topic, while still preserving the history of events. The materialized state store associated with the KTable provides efficient access to the current state of the table, and the log compaction ensures that the changelog topic retains all events and updates for historical analysis or recovery purposes.

Understanding Materialized API in KTables

In the Kafka Streams library, the Materialized API is used in conjunction with KTables to specify the state store configuration and options for materializing the KTable's state. It allows you to define how the state of the KTable is stored and accessed within the application. Here are the main use cases and functionalities provided by the Materialized API for KTables:

  1. State Store Configuration: With Materialized, you can specify the configuration for the underlying state store associated with the KTable. This includes properties such as the name of the state store, the serialization/deserialization serdes for keys and values, and any additional configuration options specific to the state store implementation.
Materialized.<KeyType, ValueType>as("state-store-name")
.withKeySerde(keySerde)
.withValueSerde(valueSerde)
.withCachingDisabled()
.withLoggingEnabled(Collections.emptyMap());

The above code snippet demonstrates setting the name of the state store, key and value serdes, disabling caching, and enabling logging for the state store.

2. State Store Access: Materialized provides methods to specify the type of state store and obtain a reference to it for querying or interactive access. This allows you to retrieve the current state of the KTable or perform advanced queries on the state store.

ReadOnlyKeyValueStore<KeyType, ValueType> stateStore = streams.store("state-store-name", QueryableStoreTypes.keyValueStore());
ValueType currentValue = stateStore.get(key);

Here, QueryableStoreTypes.keyValueStore() specifies that the state store is a key-value store, and streams.store() returns a reference to the specified state store.

3. Changelog Topics: When using Materialized, Kafka Streams automatically creates a changelog topic for the KTable. The changelog topic is used for durable storage and fault tolerance, ensuring that the state of the KTable can be reconstructed in the event of failures or restarts.

Materialized.<KeyType, ValueType>as("state-store-name")
.withLoggingEnabled(Collections.emptyMap());

In the example, withLoggingEnabled() ensures that the changelog topic is created and used for logging updates to the KTable.

4. Custom State Store Implementation: Materialized also allows you to specify a custom state store implementation by providing a custom StateStoreSupplier. This gives you the flexibility to use alternative storage solutions or integrate with external systems.

Materialized.<KeyType, ValueType>as("state-store-name")
.withKeySerde(keySerde)
.withValueSerde(valueSerde)
.withStateStoreSupplier(myCustomStoreSupplier);

In this case, myCustomStoreSupplier is a custom implementation of the StateStoreSupplier interface.

By using the Materialized API in Kafka Streams, you can define the configuration, behavior, and access patterns for the state stores associated with KTables. It allows you to customize and control how the state of KTables is managed, queried, and materialized within your stream processing application.

What is the retention time of changelog in KTables?

The retention time of the changelog in KTables is determined by the topic-level retention configuration in Apache Kafka. The retention configuration specifies how long Kafka should retain data in a topic before considering it eligible for deletion.

By default, Kafka uses a retention policy based on the size of the log segment files (log.segment.bytes) or the time-based retention (log.retention.ms), whichever is met first. This means that if the size of the log segment reaches a certain threshold or if the data has been in the topic for a certain amount of time, it becomes eligible for deletion.

When it comes to KTables, the changelog topic associated with a KTable follows the same retention configuration as any other Kafka topic. Therefore, the retention time for the changelog topic can be configured based on your specific requirements.

To configure the retention time for a topic, you can use the following approaches:

  1. Topic-Level Configuration: You can set the retention time for the changelog topic explicitly when creating the topic using the Kafka topic creation command or the AdminClient API. You can set the retention time based on size (retention.bytes) or time (retention.ms) as per your needs.
  2. Broker-Level Configuration: The retention configuration can also be set at the broker level using the log.retention.bytes or log.retention.ms configuration properties. This will apply the retention policy to all topics unless overridden by topic-level configurations.
  3. Dynamic Topic Configuration: Kafka also supports dynamic topic configuration changes. You can use the kafka-configs.sh command-line tool or the AdminClient API to update the retention configuration for a topic on the fly without requiring a topic recreation.

By adjusting the retention time configuration, you can control how long the changelog for a KTable is retained in Kafka. It is important to consider the balance between retention time and storage requirements to ensure that the necessary historical data is retained while managing storage resources effectively.

Considerations and challenges associated with using changelogs in KTables

There are certain considerations and challenges associated with using changelogs in KTables:

  1. Storage Requirements: Changelogs can introduce additional storage requirements. Since changelogs retain the complete history of updates, the size of the changelog topic can grow significantly over time, especially for KTables with high update rates or long retention periods. Proper capacity planning is required to ensure sufficient storage resources.
  2. Retention Period: The retention period of the changelog topic impacts the availability of historical data for state restoration. If the retention period is relatively short, historical data may be pruned, limiting the ability to restore the KTable’s state to a point in time beyond the retention period.
  3. Rebuilding State: In the event of failures or restarts, rebuilding the state of a KTable from the changelog topic can take time, especially if the topic is large or there are significant updates to process. The time required for state restoration can impact the recovery time and availability of the KTable.
  4. Data Skew: Changelogs in KTables may suffer from data skew if certain keys have a disproportionately high number of updates compared to others. This can impact the efficiency of log compaction and state restoration, as the workload may not be evenly distributed among the partitions of the changelog topic.
  5. Topic Configuration: Changelog topics inherit certain limitations and considerations associated with Kafka topics in general, such as topic partitioning, replication, and resource allocation. It is important to configure the changelog topic appropriately based on the expected workload and requirements of the KTable.

It’s important to carefully consider these factors when using changelogs in KTables to ensure efficient storage utilization, proper retention settings, efficient state restoration, and effective management of data skew. Proper monitoring and maintenance practices are also necessary to ensure the continued reliability and performance of the KTable and its associated changelog topic.

--

--