
10 Kafka Streams Interview Questions and Answers

Prepare for your next interview with this guide on Kafka Streams, covering common and advanced questions to enhance your stream processing knowledge.

Kafka Streams is a powerful library for building real-time, scalable, and fault-tolerant stream processing applications. It leverages Apache Kafka to process and analyze data streams, making it an essential tool for handling large-scale data flows in various industries. Kafka Streams simplifies the development of stream processing applications by providing a high-level DSL and integrating seamlessly with existing Kafka infrastructure.

This guide offers a curated list of common and advanced Kafka Streams interview questions. Reviewing these questions will help you gain a deeper understanding of Kafka Streams, enhancing your ability to tackle technical interviews and demonstrate your expertise in stream processing.

Kafka Streams Interview Questions and Answers

1. Explain the difference between KStream and KTable. Provide an example use case for each.

KStream and KTable are core abstractions in Kafka Streams for processing and transforming data.

  • KStream: Represents a stream of records where each record is a key-value pair. It is an unbounded, continuously updating sequence of records, suitable for event-driven applications where each event is processed individually.
  • KTable: Represents a changelog stream, where each record updates a value for a specific key. It is a snapshot of the latest value for each key, suitable for applications that need to maintain state, such as aggregations or joins.

Example use case for KStream:

  • Real-time monitoring of user activities on a website, where each user action (click, view, etc.) is processed as an individual event.

Example use case for KTable:

  • Maintaining a count of the number of times each product has been viewed on an e-commerce site, where the count is continuously updated.

Example code:

StreamsBuilder builder = new StreamsBuilder();

// KStream example: process each user action as an individual event
KStream<String, String> userActions = builder.stream("user-actions-topic");
userActions.filter((key, value) -> value.contains("click"))
           .to("filtered-user-actions-topic");

// KTable example: maintain the latest view count per product,
// derived by aggregating the view events keyed by product ID
KStream<String, String> productViews = builder.stream("product-views-topic");
KTable<String, Long> productViewCounts = productViews
        .groupByKey()
        .count(Materialized.as("product-view-counts"));
productViewCounts.toStream()
        .to("product-view-counts-topic", Produced.with(Serdes.String(), Serdes.Long()));

2. Write a simple Kafka Streams application that reads from one topic, processes the data, and writes to another topic.

Kafka Streams is a client library for building applications and microservices, where the input and output data are stored in Kafka clusters. It allows for real-time processing of data streams with high throughput and low latency.

Here is a simple Kafka Streams application that reads from one topic, processes the data by converting all text to uppercase, and writes the processed data to another topic:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;

public class SimpleKafkaStreamsApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> sourceStream = builder.stream("input-topic");
        KStream<String, String> processedStream = sourceStream.mapValues(value -> value.toUpperCase());
        processedStream.to("output-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

3. What are the different types of joins available in Kafka Streams? Write a code snippet to demonstrate an inner join.

Kafka Streams provides several types of joins to combine records from two streams, from a stream and a table, or from two tables. The main join variants are:

  • Inner Join: Combines records with matching keys in both streams or tables.
  • Left Join: Combines records with matching keys in both streams or tables, but also includes records from the left stream/table that do not have a match in the right stream/table.
  • Outer Join: Combines records with matching keys in both streams or tables, and also includes records from both streams/tables that do not have a match in the other stream/table.

Here is a code snippet to demonstrate an inner join in Kafka Streams:

StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> leftStream = builder.stream("left-topic");
KStream<String, String> rightStream = builder.stream("right-topic");

KStream<String, String> joinedStream = leftStream.join(
    rightStream,
    (leftValue, rightValue) -> leftValue + "-" + rightValue,
    JoinWindows.of(Duration.ofMinutes(5)),
    StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String())
);

joinedStream.to("output-topic");

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-example");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
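
Stream-table joins follow the same idea but need no window, because a KTable always holds the latest value per key. A minimal sketch of a KStream-KTable inner join (the orders and customers topics are illustrative, not part of the original example):

StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> orders = builder.stream("orders-topic");
KTable<String, String> customers = builder.table("customers-topic");

// Each order record is joined with the latest customer record for the same key
KStream<String, String> enrichedOrders = orders.join(
    customers,
    (orderValue, customerValue) -> orderValue + " | " + customerValue
);

enrichedOrders.to("enriched-orders-topic");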

4. Explain the concept of windowing in Kafka Streams. Write a code snippet to perform a tumbling window aggregation.

Windowing in Kafka Streams groups records that fall within a given time frame, which is essential for time-based operations such as windowed aggregations and joins. Tumbling windows are fixed-size, non-overlapping windows, so each event belongs to exactly one window.

Here is a code snippet to perform a tumbling window aggregation in Kafka Streams:

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("input-topic");

KTable<Windowed<String>, Long> tumblingWindowCounts = stream
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
    .count(Materialized.as("tumbling-window-counts"));

tumblingWindowCounts.toStream().to("output-topic", Produced.with(WindowedSerdes.timeWindowedSerdeFrom(String.class), Serdes.Long()));

5. Write a Kafka Streams application that performs a stateful transformation using a custom state store.

Stateful transformations in Kafka Streams involve operations that require maintaining state across multiple records, such as aggregations, joins, and windowed computations. Custom state stores allow developers to define their own storage mechanisms for maintaining this state, providing flexibility beyond the built-in state stores.

Here is an example of a Kafka Streams application that performs a stateful transformation using a custom state store:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

import java.util.Properties;

public class CustomStateStoreExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "custom-state-store-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream("input-topic");

        StoreBuilder<KeyValueStore<String, String>> customStoreBuilder = Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("custom-store"),
                Serdes.String(),
                Serdes.String()
        );

        builder.addStateStore(customStoreBuilder);

        stream.transform(() -> new CustomTransformer("custom-store"), "custom-store")
              .to("output-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}

In this example, a custom state store named “custom-store” is created using the Stores.keyValueStoreBuilder method. The CustomTransformer class (not shown here) would implement the logic for the stateful transformation, utilizing the custom state store.
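
As a reference, here is one possible shape for such a transformer. The de-duplication logic below is purely illustrative (the original answer leaves the transformation unspecified); it forwards a record only when its value differs from the last value stored for that key:

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

// Hypothetical transformer: forwards a record only if its value changed since the last record for the same key
public class CustomTransformer implements Transformer<String, String, KeyValue<String, String>> {
    private final String storeName;
    private KeyValueStore<String, String> store;

    public CustomTransformer(String storeName) {
        this.storeName = storeName;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        store = (KeyValueStore<String, String>) context.getStateStore(storeName);
    }

    @Override
    public KeyValue<String, String> transform(String key, String value) {
        String previous = store.get(key);
        store.put(key, value);
        // Returning null drops the record; only changed values are forwarded downstream
        return value.equals(previous) ? null : KeyValue.pair(key, value);
    }

    @Override
    public void close() {
        // Nothing to clean up; the state store lifecycle is managed by Kafka Streams
    }
}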

6. What is the role of SerDes in Kafka Streams? How do you implement a custom SerDe?

In Kafka Streams, SerDes (Serializer/Deserializer) are used to convert data between its byte array format (used for transmission and storage) and its object format (used for processing). This is essential because Kafka Streams processes data in a distributed environment, and data needs to be serialized for transmission over the network and deserialized for processing.

To implement a custom SerDe, you need to create a class that implements the Serde interface, which in turn requires implementing both the Serializer and Deserializer interfaces. This allows you to define how your specific data types should be serialized and deserialized.

Example:

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

public class CustomSerde implements Serde<CustomObject> {

    @Override
    public Serializer<CustomObject> serializer() {
        return new CustomSerializer();
    }

    @Override
    public Deserializer<CustomObject> deserializer() {
        return new CustomDeserializer();
    }
}

class CustomSerializer implements Serializer<CustomObject> {
    @Override
    public byte[] serialize(String topic, CustomObject data) {
        // Implement serialization logic here; toBytes() is a placeholder helper on CustomObject
        return data == null ? null : data.toBytes();
    }
}

class CustomDeserializer implements Deserializer<CustomObject> {
    @Override
    public CustomObject deserialize(String topic, byte[] data) {
        // Implement deserialization logic here; fromBytes() is a placeholder helper on CustomObject
        return data == null ? null : CustomObject.fromBytes(data);
    }
}
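
Once implemented, the custom Serde can be supplied anywhere Kafka Streams expects serdes, for example when consuming or producing a topic. A short sketch (the topic names are illustrative, and CustomObject is the placeholder type from the example above):

StreamsBuilder builder = new StreamsBuilder();

// Use the custom Serde for record values; keys stay as plain strings
KStream<String, CustomObject> events = builder.stream(
    "custom-object-topic",
    Consumed.with(Serdes.String(), new CustomSerde())
);

events.to("processed-custom-objects-topic", Produced.with(Serdes.String(), new CustomSerde()));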

7. Write a Kafka Streams application that integrates with an external database for enriching stream data.

To integrate Kafka Streams with an external database for enriching stream data, you can use a combination of Kafka Streams API and a database client. The idea is to process the incoming stream data and, for each record, query the external database to fetch additional information that can be appended to the stream data.

Here is a high-level overview of how you can achieve this:

  • Create a Kafka Streams application.
  • Define a stream processing topology.
  • For each record in the stream, query the external database to fetch the enrichment data.
  • Append the enrichment data to the original stream data.
  • Write the enriched data back to another Kafka topic or perform further processing.

Example:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueMapper;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Properties;

public class KafkaStreamsEnrichmentApp {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "enrichment-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> sourceStream = builder.stream("input-topic");

        KStream<String, String> enrichedStream = sourceStream.mapValues(new ValueMapper<String, String>() {
            @Override
            public String apply(String value) {
                // Query the external database
                String enrichedValue = value;
                try (Connection connection = DriverManager.getConnection("jdbc:your_database_url", "username", "password")) {
                    PreparedStatement statement = connection.prepareStatement("SELECT additional_info FROM your_table WHERE key = ?");
                    statement.setString(1, value);
                    ResultSet resultSet = statement.executeQuery();
                    if (resultSet.next()) {
                        enrichedValue += ", " + resultSet.getString("additional_info");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return enrichedValue;
            }
        });

        enrichedStream.to("output-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}

In this example, the Kafka Streams application reads data from an input topic, queries an external database to fetch additional information, and writes the enriched data to an output topic. The ValueMapper transforms each record by appending the enrichment data fetched from the database. Note that opening a new connection for every record, as shown here for simplicity, is expensive; in production you would typically use a connection pool or cache the lookup results.

8. How do you handle error handling and monitoring in Kafka Streams applications?

Error handling and monitoring in Kafka Streams applications are important for ensuring the reliability and stability of the data processing pipeline. Kafka Streams provides several mechanisms to handle errors and monitor the application effectively.

For error handling, Kafka Streams allows you to define custom error handlers for deserialization and processing errors. You can use the DeserializationExceptionHandler and ProductionExceptionHandler interfaces to implement custom logic for handling these errors. Additionally, you can configure the default.deserialization.exception.handler and default.production.exception.handler properties to specify the error handlers.
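
For example, the built-in handlers can be wired in through configuration. The sketch below shows one reasonable choice (not the only one): it skips records that fail deserialization instead of stopping the application.

Properties props = new Properties();
// Log and skip records that cannot be deserialized instead of failing the application
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
          LogAndContinueExceptionHandler.class.getName());
// Control how errors while producing results back to Kafka are handled
props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
          DefaultProductionExceptionHandler.class.getName());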

Monitoring Kafka Streams applications can be achieved using various tools and techniques. Kafka Streams exposes metrics through JMX (Java Management Extensions), which can be integrated with monitoring systems like Prometheus, Grafana, or any other JMX-compatible monitoring tool. These metrics provide insights into the application’s performance, such as throughput, latency, and error rates.
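
In addition to JMX, the same metrics are accessible programmatically from the running application, which can feed a custom health check. A small sketch, assuming streams is the running KafkaStreams instance (the metric group filter is illustrative):

streams.metrics().forEach((metricName, metric) -> {
    if ("stream-thread-metrics".equals(metricName.group())) {
        System.out.println(metricName.name() + " = " + metric.metricValue());
    }
});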

Furthermore, Kafka Streams applications can be instrumented with logging frameworks like SLF4J to log important events and errors. This logging can be configured to capture detailed information about the application’s behavior, which can be useful for debugging and monitoring purposes.

9. Explain the concept of interactive queries in Kafka Streams. Provide an example use case.

Interactive queries in Kafka Streams enable you to query the state of your stream processing application directly. This is useful for applications that need real-time insights or need to read the latest state for additional processing. By enabling interactive queries, you can expose a state store to external applications, allowing them to query it directly instead of writing the state to a Kafka topic and consuming it again.

Example use case:
Consider a real-time analytics dashboard for an e-commerce platform. The platform processes user activity streams to calculate metrics like the number of active users, total sales, and product views. By using interactive queries, the dashboard can directly query the state store to get the latest metrics without waiting for the data to be written back to Kafka and then read again.

Example code snippet:

StreamsBuilder builder = new StreamsBuilder();

// Count activity events per user and materialize the result in a queryable state store
KTable<String, Long> userCounts = builder.<String, String>stream("user-activity-topic")
    .groupByKey()
    .count(Materialized.as("user-store"));

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "interactive-queries-example");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

// Expose the state store via a REST API
public class UserActivityService {
    private final KafkaStreams streams;

    public UserActivityService(KafkaStreams streams) {
        this.streams = streams;
    }

    public Long getUserCount(String userId) {
        ReadOnlyKeyValueStore<String, Long> keyValueStore =
            streams.store(StoreQueryParameters.fromNameAndType("user-store", QueryableStoreTypes.keyValueStore()));
        return keyValueStore.get(userId);
    }
}

10. How do you test Kafka Streams applications to ensure reliability and correctness?

Testing Kafka Streams applications involves multiple layers to ensure reliability and correctness. The primary strategies include:

  • Unit Testing: This involves testing individual components of the Kafka Streams application in isolation. The kafka-streams-test-utils artifact, which provides TopologyTestDriver, is particularly useful here: it lets you pipe test records through your topology and inspect the output without a running Kafka cluster.
  • Integration Testing: This type of testing ensures that different components of the Kafka Streams application work together as expected. It often involves setting up a local Kafka cluster and running the application to verify its behavior.
  • End-to-End Testing: This involves testing the entire Kafka Streams application in a production-like environment to ensure that it behaves correctly under real-world conditions.

Example of unit testing with TopologyTestDriver from kafka-streams-test-utils:

import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;

import java.util.Properties;

public class KafkaStreamsTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

        Topology topology = buildTopology(); // Assume this method builds your Kafka Streams topology
        TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);

        TestInputTopic<String, String> inputTopic =
            testDriver.createInputTopic("input-topic", new StringSerializer(), new StringSerializer());
        TestOutputTopic<String, String> outputTopic =
            testDriver.createOutputTopic("output-topic", new StringDeserializer(), new StringDeserializer());

        // Pipe a test record through the topology and verify the transformed output
        inputTopic.pipeInput("key", "value");
        KeyValue<String, String> result = outputTopic.readKeyValue();
        if (!result.equals(KeyValue.pair("key", "processed-value"))) {
            throw new AssertionError("Unexpected output record: " + result);
        }

        testDriver.close();
    }
}