10 Kafka Streams Interview Questions and Answers
Prepare for your next interview with this guide on Kafka Streams, covering common and advanced questions to enhance your stream processing knowledge.
Kafka Streams is a powerful library for building real-time, scalable, and fault-tolerant stream processing applications. It leverages Apache Kafka to process and analyze data streams, making it an essential tool for handling large-scale data flows in various industries. Kafka Streams simplifies the development of stream processing applications by providing a high-level DSL and integrating seamlessly with existing Kafka infrastructure.
This guide offers a curated list of common and advanced Kafka Streams interview questions. Reviewing these questions will help you gain a deeper understanding of Kafka Streams, enhancing your ability to tackle technical interviews and demonstrate your expertise in stream processing.
KStream and KTable are the two core abstractions in Kafka Streams for processing and transforming data. A KStream models an unbounded, append-only stream of records, where each record is an independent event. A KTable models a changelog stream, where each record is an update (an upsert) to the latest value for its key.
Example use case for KStream: processing a stream of user actions such as click events, where every individual event matters and should be processed on its own.
Example use case for KTable: maintaining the latest state per key, such as the current view count per product, where only the most recent value for each key is relevant.
Example code:
StreamsBuilder builder = new StreamsBuilder();

// KStream example: keep only click events from the stream of user actions
KStream<String, String> userActions = builder.stream("user-actions-topic");
userActions.filter((key, value) -> value.contains("click"))
           .to("filtered-user-actions-topic");

// KTable example: re-key by the product contained in the value and count views per product
KTable<String, String> productViews = builder.table("product-views-topic");
productViews.groupBy((key, value) -> KeyValue.pair(value, key))
            .count()
            .toStream()
            .to("product-view-counts-topic", Produced.with(Serdes.String(), Serdes.Long()));
Kafka Streams is a client library for building applications and microservices, where the input and output data are stored in Kafka clusters. It allows for real-time processing of data streams with high throughput and low latency.
Here is a simple Kafka Streams application that reads from one topic, processes the data by converting all text to uppercase, and writes the processed data to another topic:
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;

public class SimpleKafkaStreamsApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> sourceStream = builder.stream("input-topic");

        // Convert every value to uppercase and write the result to the output topic
        KStream<String, String> processedStream = sourceStream.mapValues(value -> value.toUpperCase());
        processedStream.to("output-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // Close the Streams instance cleanly on JVM shutdown
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
Kafka Streams provides several types of joins to combine records from two streams or from a stream and a table. The main types are inner joins, left joins, and outer joins, and they can be applied as stream-stream joins (which require a join window), stream-table joins, and table-table joins.
Here is a code snippet to demonstrate an inner join in Kafka Streams:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> leftStream = builder.stream("left-topic");
KStream<String, String> rightStream = builder.stream("right-topic");

// Inner join: records from both streams are matched by key within a 5-minute window
KStream<String, String> joinedStream = leftStream.join(
    rightStream,
    (leftValue, rightValue) -> leftValue + "-" + rightValue,
    JoinWindows.of(Duration.ofMinutes(5)),
    Joined.with(Serdes.String(), Serdes.String(), Serdes.String())
);

joinedStream.to("output-topic");

// In a real application the Properties must include at least application.id and bootstrap.servers
KafkaStreams streams = new KafkaStreams(builder.build(), new Properties());
streams.start();
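The snippet above demonstrates a windowed stream-stream inner join. A stream-table join, often used to enrich events with reference data, can be sketched as follows; the topic names are illustrative and the builder is reused from the snippet above:

// Left join a stream of orders against a table of customer reference data
KStream<String, String> orders = builder.stream("orders-topic");
KTable<String, String> customers = builder.table("customers-topic");

// Every order is emitted; "unknown" is used when no customer record exists yet for the key
KStream<String, String> enrichedOrders = orders.leftJoin(
    customers,
    (order, customer) -> order + " | customer=" + (customer != null ? customer : "unknown")
);
enrichedOrders.to("enriched-orders-topic");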
Windowing in Kafka Streams is used to group records that arrive within a certain time frame. This is essential for time-based operations like aggregations, joins, and windowed state stores. Tumbling windows are a type of window that are fixed in size and do not overlap. Each event belongs to exactly one window.
Here is a code snippet to perform a tumbling window aggregation in Kafka Streams:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("input-topic");

// Count records per key in fixed, non-overlapping 5-minute windows
KTable<Windowed<String>, Long> tumblingWindowCounts = stream
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
    .count(Materialized.as("tumbling-window-counts"));

tumblingWindowCounts.toStream()
    .to("output-topic", Produced.with(WindowedSerdes.timeWindowedSerdeFrom(String.class), Serdes.Long()));
Stateful transformations in Kafka Streams involve operations that require maintaining state across multiple records, such as aggregations, joins, and windowed computations. Custom state stores allow developers to define their own storage mechanisms for maintaining this state, providing flexibility beyond the built-in state stores.
Here is an example of a Kafka Streams application that performs a stateful transformation using a custom state store:
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

import java.util.Properties;

public class CustomStateStoreExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "custom-state-store-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream("input-topic");

        // Build a persistent key-value store and register it with the topology
        StoreBuilder<KeyValueStore<String, String>> customStoreBuilder = Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("custom-store"),
            Serdes.String(),
            Serdes.String()
        );
        builder.addStateStore(customStoreBuilder);

        // The transformer gets access to the store by name
        stream.transform(() -> new CustomTransformer("custom-store"), "custom-store")
              .to("output-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}
In this example, a custom state store named “custom-store” is created using the Stores.keyValueStoreBuilder method and registered on the builder. The CustomTransformer class (not shown here) would implement the logic for the stateful transformation, utilizing the custom state store.
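One possible implementation of such a transformer is sketched below; the deduplication logic is purely an illustrative assumption, since the actual transformation logic depends on the use case:

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

// Illustrative transformer that drops consecutive duplicate values per key using the custom state store
public class CustomTransformer implements Transformer<String, String, KeyValue<String, String>> {

    private final String storeName;
    private KeyValueStore<String, String> store;

    public CustomTransformer(String storeName) {
        this.storeName = storeName;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        // Retrieve the state store that was registered with the topology under this name
        store = (KeyValueStore<String, String>) context.getStateStore(storeName);
    }

    @Override
    public KeyValue<String, String> transform(String key, String value) {
        // Forward the record only if the value changed since the last one seen for this key
        String previous = store.get(key);
        if (value != null && value.equals(previous)) {
            return null; // returning null drops the record
        }
        store.put(key, value);
        return KeyValue.pair(key, value);
    }

    @Override
    public void close() {
        // Nothing to clean up; the state store lifecycle is managed by Kafka Streams
    }
}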
In Kafka Streams, SerDes (Serializer/Deserializer) are used to convert data between its byte array format (used for transmission and storage) and its object format (used for processing). This is essential because Kafka Streams processes data in a distributed environment, and data needs to be serialized for transmission over the network and deserialized for processing.
To implement a custom SerDe, you need to create a class that implements the Serde interface, which in turn requires implementing both the Serializer and Deserializer interfaces. This allows you to define how your specific data types should be serialized and deserialized.
Example:
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

public class CustomSerde implements Serde<CustomObject> {

    @Override
    public Serializer<CustomObject> serializer() {
        return new CustomSerializer();
    }

    @Override
    public Deserializer<CustomObject> deserializer() {
        return new CustomDeserializer();
    }
}

class CustomSerializer implements Serializer<CustomObject> {

    @Override
    public byte[] serialize(String topic, CustomObject data) {
        // Implement serialization logic and return the resulting byte array,
        // e.g. by converting the object to JSON or another wire format
        return null; // placeholder
    }
}

class CustomDeserializer implements Deserializer<CustomObject> {

    @Override
    public CustomObject deserialize(String topic, byte[] data) {
        // Implement deserialization logic and return the reconstructed CustomObject
        return null; // placeholder
    }
}
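Once implemented, the custom SerDe can be passed wherever a Serde is expected, for example when consuming from or producing to a topic; the topic names here are illustrative:

StreamsBuilder builder = new StreamsBuilder();

// Use the custom Serde for record values of the input and output topics
KStream<String, CustomObject> stream =
    builder.stream("custom-objects-topic", Consumed.with(Serdes.String(), new CustomSerde()));
stream.to("custom-objects-output-topic", Produced.with(Serdes.String(), new CustomSerde()));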
To integrate Kafka Streams with an external database for enriching stream data, you can use a combination of Kafka Streams API and a database client. The idea is to process the incoming stream data and, for each record, query the external database to fetch additional information that can be appended to the stream data.
Here is a high-level overview of how you can achieve this: read records from the input topic, use a transformation such as mapValues to look up additional data in the external database for each record, append the fetched data to the record value, and write the enriched records to an output topic.
Example:
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueMapper;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Properties;

public class KafkaStreamsEnrichmentApp {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> sourceStream = builder.stream("input-topic");

        KStream<String, String> enrichedStream = sourceStream.mapValues(new ValueMapper<String, String>() {
            @Override
            public String apply(String value) {
                // Query the external database for additional information about this record
                String enrichedValue = value;
                try (Connection connection = DriverManager.getConnection("jdbc:your_database_url", "username", "password")) {
                    PreparedStatement statement = connection.prepareStatement(
                        "SELECT additional_info FROM your_table WHERE key = ?");
                    statement.setString(1, value);
                    ResultSet resultSet = statement.executeQuery();
                    if (resultSet.next()) {
                        enrichedValue += ", " + resultSet.getString("additional_info");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return enrichedValue;
            }
        });

        enrichedStream.to("output-topic");

        // In a real application the Properties must include at least application.id and bootstrap.servers
        KafkaStreams streams = new KafkaStreams(builder.build(), new Properties());
        streams.start();
    }
}
In this example, the Kafka Streams application reads data from an input topic, queries an external database to fetch additional information, and writes the enriched data to an output topic. The ValueMapper transforms each record by appending the enrichment data fetched from the database. Note that opening a new database connection per record, as in this simplified example, would become a bottleneck in production; a connection pool or a local cache of lookup results is typically used instead.
Error handling and monitoring in Kafka Streams applications are important for ensuring the reliability and stability of the data processing pipeline. Kafka Streams provides several mechanisms to handle errors and monitor the application effectively.
For error handling, Kafka Streams allows you to define custom handlers for deserialization and production errors. You can implement the DeserializationExceptionHandler and ProductionExceptionHandler interfaces to provide custom handling logic, and register your handlers via the default.deserialization.exception.handler and default.production.exception.handler configuration properties.
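For example, the built-in LogAndContinueExceptionHandler can be registered so that records which fail deserialization are logged and skipped rather than stopping the application; a minimal configuration sketch:

import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

import java.util.Properties;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "error-handling-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Skip records that cannot be deserialized instead of failing the application
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
          LogAndContinueExceptionHandler.class);
// Fail on production errors (the default behavior), shown here for completeness
props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
          DefaultProductionExceptionHandler.class);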
Monitoring Kafka Streams applications can be achieved using various tools and techniques. Kafka Streams exposes metrics through JMX (Java Management Extensions), which can be integrated with monitoring systems like Prometheus, Grafana, or any other JMX-compatible monitoring tool. These metrics provide insights into the application’s performance, such as throughput, latency, and error rates.
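The same metrics that are exposed over JMX can also be read programmatically from a running application, for example to feed a custom health check; a small sketch, assuming streams is a started KafkaStreams instance:

// Print every Kafka Streams metric currently registered on this instance
streams.metrics().forEach((metricName, metric) ->
    System.out.println(metricName.group() + " / " + metricName.name() + " = " + metric.metricValue()));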
Furthermore, Kafka Streams applications can be instrumented with logging frameworks like SLF4J to log important events and errors. This logging can be configured to capture detailed information about the application’s behavior, which can be useful for debugging and monitoring purposes.
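As a minimal illustration, a processing helper instrumented with SLF4J might look like the following; the class and method names are illustrative, and the actual logger binding (for example Logback or Log4j) is configured separately:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingExample {
    private static final Logger log = LoggerFactory.getLogger(LoggingExample.class);

    public String process(String value) {
        // Log at debug level for routine events, at error level with the exception for failures
        log.debug("Processing record value: {}", value);
        try {
            return value.toUpperCase();
        } catch (RuntimeException e) {
            log.error("Failed to process record value: {}", value, e);
            throw e;
        }
    }
}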
Interactive queries in Kafka Streams let you query the state of your stream processing application directly. This is useful for applications that need real-time insights or direct access to a state store for additional processing. By enabling interactive queries, you can expose the state store to external applications, allowing them to read the state without having to consume the underlying Kafka topic.
Example use case:
Consider a real-time analytics dashboard for an e-commerce platform. The platform processes user activity streams to calculate metrics like the number of active users, total sales, and product views. By using interactive queries, the dashboard can directly query the state store to get the latest metrics without waiting for the data to be written back to Kafka and then read again.
Example code snippet:
StreamsBuilder builder = new StreamsBuilder();

// Materialize the topic as a queryable state store named "user-store"
KTable<String, Long> userCounts = builder.table(
    "user-activity-topic",
    Consumed.with(Serdes.String(), Serdes.Long()),
    Materialized.as("user-store")
);

// In a real application the Properties must include at least application.id and bootstrap.servers
KafkaStreams streams = new KafkaStreams(builder.build(), new Properties());
streams.start();

// Expose the state store via a REST API
public class UserActivityService {
    private final KafkaStreams streams;

    public UserActivityService(KafkaStreams streams) {
        this.streams = streams;
    }

    public Long getUserCount(String userId) {
        ReadOnlyKeyValueStore<String, Long> keyValueStore = streams.store(
            StoreQueryParameters.fromNameAndType("user-store", QueryableStoreTypes.keyValueStore()));
        return keyValueStore.get(userId);
    }
}
Testing Kafka Streams applications involves multiple layers to ensure reliability and correctness. The primary strategies include unit testing the topology with the TopologyTestDriver from the kafka-streams-test-utils package, integration testing against an embedded or containerized Kafka cluster, and end-to-end testing of the full pipeline in a staging environment.
Example of Unit Testing using Kafka Streams TestUtils:
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.streams.test.OutputVerifier;

import java.util.Properties;

public class KafkaStreamsTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

        Topology topology = buildTopology(); // Assume this method builds your Kafka Streams topology
        TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);

        // Pipe a test record into the input topic and verify the record on the output topic
        ConsumerRecordFactory<String, String> recordFactory =
            new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
        testDriver.pipeInput(recordFactory.create("input-topic", "key", "value"));
        OutputVerifier.compareKeyValue(
            testDriver.readOutput("output-topic", new StringDeserializer(), new StringDeserializer()),
            "key", "processed-value");

        testDriver.close();
    }
}
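Note that ConsumerRecordFactory and OutputVerifier were deprecated in Kafka 2.4 in favor of TestInputTopic and TestOutputTopic; a rough equivalent of the verification above using the newer API looks like this:

// Newer test-utils API (Kafka 2.4+), reusing the testDriver from the example above
TestInputTopic<String, String> inputTopic =
    testDriver.createInputTopic("input-topic", new StringSerializer(), new StringSerializer());
TestOutputTopic<String, String> outputTopic =
    testDriver.createOutputTopic("output-topic", new StringDeserializer(), new StringDeserializer());

inputTopic.pipeInput("key", "value");
// Expected to equal KeyValue.pair("key", "processed-value")
KeyValue<String, String> result = outputTopic.readKeyValue();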