Log compaction for KTables

Kamini Kamal
Published in Level Up Coding
6 min read · Jul 12, 2023


[Image source: http://cloudurable.com/blog/kafka-architecture-log-compaction/index.html]

Log compaction is a feature provided by Apache Kafka that ensures efficient and reliable data storage and retrieval in a Kafka topic. In Kafka, topics are divided into partitions, and each partition maintains an ordered log of records.

The purpose of log compaction is to retain the latest value for each key in a compacted topic while removing older versions of the same key. This differs from the default time- and size-based retention, which deletes old records wholesale regardless of key. With log compaction, the log keeps at least the most recent record for each key, eliminating superseded records with the same key.

The key idea behind log compaction is to enable the topic to function as a changelog or key-value store, where the most recent value for each key is always available. It is particularly useful for maintaining up-to-date materialized views, tracking the latest state of a table, or storing key-value pairs that represent the current state of an entity.

The compaction process operates in the background, continuously monitoring the log and removing obsolete records. It guarantees that at least the most recent record for each key is retained (it does not keep the complete history of updates), which makes efficient use of storage resources.
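To make the retention rule concrete, here is a small, self-contained Java sketch (plain Java, not Kafka code; the record type and values are invented for illustration) of what compaction leaves behind: the latest value per key, with a null-valued record acting as a tombstone that deletes its key entirely:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration of compaction semantics: keep only the latest record per key.
public class CompactionSketch {
    // A record in the log: key plus value (a null value is a tombstone).
    public record LogRecord(String key, String value) {}

    // Walk the log in order; later records for a key overwrite earlier ones,
    // and a tombstone removes the key entirely.
    public static Map<String, String> compact(List<LogRecord> log) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (LogRecord r : log) {
            if (r.value() == null) {
                latest.remove(r.key());          // tombstone: delete the key
            } else {
                latest.put(r.key(), r.value());  // newer value wins
            }
        }
        return latest;
    }

    public static void main(String[] args) {
        List<LogRecord> log = List.of(
                new LogRecord("user-1", "alice"),
                new LogRecord("user-2", "bob"),
                new LogRecord("user-1", "alice-updated"),
                new LogRecord("user-2", null));  // tombstone for user-2
        System.out.println(compact(log));        // {user-1=alice-updated}
    }
}
```

After compaction, only `user-1`'s latest value survives: the older `user-1` record is superseded and `user-2` is deleted by its tombstone.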

The benefits of log compaction include:

  1. Efficient Storage: Log compaction reduces storage requirements by retaining only the latest record for each key, eliminating redundant data.
  2. Point Lookups: With log compaction, it becomes more efficient to perform point lookups, as only the latest record for each key needs to be accessed.
  3. Reliable State Restoration: Log compaction allows for reliable state restoration in the event of failures or restarts. The compacted topic can be replayed to rebuild the most recent state for each key.
  4. High-Level Abstractions: Log compaction provides a foundation for higher-level abstractions such as KTables in the Kafka Streams library, enabling efficient stateful processing and maintaining the current state for each key.

For further details, see https://medium.com/gitconnected/compaction-of-topic-data-in-kafka-95a57f2aa6c9

Benefits of log compaction in KTables

Log compaction in KTables, which is a feature provided by Apache Kafka, offers several benefits:

  1. Efficient Use of Storage: Log compaction helps to optimize storage usage by retaining only the latest record for each key in the KTable. Instead of storing the complete change log for every update, log compaction retains the most recent value for each key while discarding older versions. This significantly reduces storage requirements, especially for KTables with high update rates or long retention periods.
  2. Reliable State Restoration: With log compaction, it becomes easier to restore the state of a KTable from a compacted topic. By replaying the compacted topic, you can rebuild the most recent state for each key in the KTable. This simplifies the process of recovering and restoring the state of the application in the event of failures or restarts.
  3. Efficient Point Lookups: Log compaction ensures that the KTable retains only the latest value for each key. This makes point lookups more efficient and faster since you can directly access the latest value for a specific key without the need to scan through multiple versions of the record. This is particularly useful when performing key-based operations or querying the KTable for real-time lookups.
  4. Compact Change Logs: Log compaction ensures that the change log topics associated with KTables are compacted. This means that the change log topics will retain only the latest version of each record for each key. Compact change logs not only reduce storage overhead but also enable efficient consumption and processing of change log topics during recovery or state restoration.
  5. Smaller Replication Traffic: When KTables are replicated across multiple Kafka brokers, log compaction helps reduce the replication traffic. Since only the latest version of each record is retained in the compacted topics, the replication of unnecessary data is minimized. This improves the overall efficiency of replication and reduces network bandwidth consumption.
  6. Compatibility with Event Sourcing: Log compaction complements event sourcing architectures, where events are stored as an append-only log. By compacting the KTable’s changelog while keeping the original event topic under ordinary retention, you retain the current state cheaply while the event topic preserves the history, allowing you to reconstruct past state and perform historical analysis if needed.
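The storage and restoration benefits above can be pictured with a plain-Java sketch (no Kafka involved; the keys and counts are invented): replaying a compacted changelog rebuilds exactly the same state as replaying the full update log, but from far fewer records:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: restoring state from a compacted changelog vs. the full update log.
public class RestoreSketch {
    // Replay a log of (key, value) updates into a state map (one upsert per record).
    public static Map<String, Integer> replay(List<Map.Entry<String, Integer>> log) {
        Map<String, Integer> state = new HashMap<>();
        for (Map.Entry<String, Integer> e : log) state.put(e.getKey(), e.getValue());
        return state;
    }

    // 1,000 updates spread over only 3 distinct keys.
    public static List<Map.Entry<String, Integer>> fullLog() {
        List<Map.Entry<String, Integer>> log = new ArrayList<>();
        for (int i = 0; i < 1000; i++) log.add(Map.entry("key-" + (i % 3), i));
        return log;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> full = fullLog();
        // Compaction keeps only the last record per key.
        List<Map.Entry<String, Integer>> compacted = new ArrayList<>(replay(full).entrySet());
        System.out.println(replay(compacted).equals(replay(full))); // true
        System.out.println(full.size() + " records vs " + compacted.size()); // 1000 records vs 3
    }
}
```

The restored state is identical either way; the compacted log simply reaches it with 3 records instead of 1,000, which is why recovery from a compacted changelog is fast.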

Enable log compaction in KTables

To enable log compaction for a Kafka topic, including topics used as KTables, you need to configure the topic with the appropriate settings. Here’s how you can enable log compaction for KTables:

  1. Topic Configuration: Set the cleanup.policy configuration to compact for the topic used as a KTable. This tells Kafka to use log compaction for that topic. You can set it either through the Kafka command-line tools or programmatically through the admin client.

For example, using the Kafka command-line tool (on older, ZooKeeper-based versions the equivalent change was made with kafka-topics.sh --zookeeper <zookeeper-address> --alter):

kafka-configs.sh --bootstrap-server <broker-address> --alter --entity-type topics --entity-name <topic-name> --add-config cleanup.policy=compact

Alternatively, you can set the configuration programmatically using the Kafka AdminClient (note that the client needs bootstrap.servers to connect; <broker-address> and <topic-name> are placeholders):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.config.ConfigResource;

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "<broker-address>");

try (AdminClient adminClient = AdminClient.create(props)) {
    ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, "<topic-name>");
    AlterConfigOp setCompact = new AlterConfigOp(
            new ConfigEntry("cleanup.policy", "compact"), AlterConfigOp.OpType.SET);
    adminClient.incrementalAlterConfigs(
            Collections.singletonMap(topicResource, Collections.singletonList(setCompact)))
            .all().get();
}
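If the topic does not exist yet, the same policy can be set at creation time instead of being altered afterwards (the broker address, topic name, and partition/replication counts below are placeholders, not recommendations):

```shell
kafka-topics.sh --bootstrap-server <broker-address> --create \
  --topic <topic-name> \
  --partitions 3 --replication-factor 3 \
  --config cleanup.policy=compact
```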

2. Ensure Proper Key Usage: For effective log compaction in KTables, it’s important to use a key for each record consistently. The key is used to determine which records are retained during compaction. If records with the same key are produced, only the latest record will be retained. Ensure that the key is meaningful and unique for each record to achieve the desired compaction behavior.

3. Compact the Existing Data: If you have existing data in the topic used as a KTable, you do not need to trigger compaction manually. Once cleanup.policy=compact is set, Kafka’s log-cleaner threads compact the eligible segments in the background; no partition reassignment or downtime is involved. Note, however, that the active (currently written) segment is never compacted, so the newest records only become eligible once the segment rolls, and the timing is further governed by settings such as min.cleanable.dirty.ratio.

By enabling log compaction for the Kafka topic used as a KTable and following the appropriate key usage practices, you can ensure that the KTable benefits from the advantages provided by log compaction, such as efficient storage, reliable state restoration, and faster point lookups.
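When the log cleaner actually runs is governed by a handful of topic-level configurations. The settings below are real Kafka topic configs, but the values shown are purely illustrative:

```shell
kafka-configs.sh --bootstrap-server <broker-address> --alter \
  --entity-type topics --entity-name <topic-name> \
  --add-config min.cleanable.dirty.ratio=0.1,segment.ms=600000,delete.retention.ms=86400000
```

Here min.cleanable.dirty.ratio controls how much uncompacted data must accumulate before cleaning starts, segment.ms forces segments to roll (making their records eligible for compaction), and delete.retention.ms controls how long tombstones remain visible to consumers after compaction.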

How can we retain the current state of the KTable while still preserving the history of events using log compaction?

To retain the current state of a KTable while preserving the history of events, you can leverage the split that Kafka Streams makes between a KTable’s input topic and its changelog topic. The compacted changelog topic is a durable log that always holds at least the latest value for each key (the current state), while the input topic, if left with ordinary time-based retention instead of compaction, keeps the full stream of events. Here’s how you can achieve this:

  1. Enable Log Compaction: Configure the changelog topic associated with the KTable to use log compaction by setting the topic’s cleanup.policy configuration to compact. (Kafka Streams creates KTable changelog topics with cleanup.policy=compact by default, so this usually only needs to be set explicitly if you manage the topic yourself.)
  2. Configure the KTable: When defining the KTable in your Kafka Streams application, specify a materialized state store for the KTable using the Materialized API. This materialized state store is associated with the changelog topic and allows the application to store and retrieve the current state of the KTable.

For example, in Java:

KTable<KeyType, ValueType> kTable = builder.table(
    "input-topic",
    Materialized.<KeyType, ValueType>as("state-store-name")
        .withKeySerde(keySerde)
        .withValueSerde(valueSerde)
);

Here, "state-store-name" is the name of the materialized state store that will be associated with the KTable.

3. Query the KTable: Use the materialized state store to query the KTable and access the current state of the table. You can retrieve the current value for a specific key or perform range queries on the KTable.

ReadOnlyKeyValueStore<KeyType, ValueType> stateStore = streams.store(
    StoreQueryParameters.fromNameAndType("state-store-name", QueryableStoreTypes.keyValueStore()));
ValueType currentValue = stateStore.get(key);

This allows you to retrieve the latest value associated with a particular key from the KTable’s state store.
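The point and range lookups described above can be pictured with a sorted map standing in for the key-value state store (plain Java, not the Streams API; keys and values are invented):

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Illustration: a sorted map as a stand-in for a key-value state store.
public class LookupSketch {
    public static NavigableMap<String, Integer> sampleStore() {
        NavigableMap<String, Integer> store = new TreeMap<>();
        store.put("user-1", 10);
        store.put("user-2", 20);
        store.put("user-3", 30);
        return store;
    }

    public static void main(String[] args) {
        NavigableMap<String, Integer> store = sampleStore();
        // Point lookup: the latest value for one key.
        System.out.println(store.get("user-2")); // 20
        // Range query over a key interval, analogous to
        // ReadOnlyKeyValueStore#range(from, to) in Kafka Streams.
        System.out.println(store.subMap("user-1", true, "user-2", true)); // {user-1=10, user-2=20}
    }
}
```

Because compaction guarantees one current value per key, both kinds of query touch only live data rather than scanning superseded versions.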

By combining log compaction with materialized state stores, you retain the current state of the KTable in a durable, compacted changelog topic. The materialized state store provides efficient access to the current state of the table, while the uncompacted input topic (subject to its own retention settings) preserves the event history for historical analysis or reprocessing.
