Tuesday, January 23, 2018

Apache Kafka - getting started. Simple Java project.

0. Intro

Kafka® is used for building real-time data pipelines and streaming apps. It is horizontally scalable, fault-tolerant, wicked fast, and runs in production in thousands of companies.

From wiki
The project aims to provide a unified, high-throughput, low-latency platform for handling real-time data feeds. Its storage layer is essentially a "massively scalable pub/sub message queue architected as a distributed transaction log,"[3] making it highly valuable for enterprise infrastructures to process streaming data. Additionally, Kafka connects to external systems (for data import/export) via Kafka Connect and provides Kafka Streams, a Java stream processing library.
The design is heavily influenced by transaction logs.[4]


Lately Apache Kafka has been getting more and more popular. With the growing popularity of the event-sourcing concept, more and more developers are switching to Kafka as the primary storage for events. Kafka has everything needed for this: it's very fast, compact, scalable and "user-friendly".
In this post I'll show the basic operations: sending messages (with a producer) and receiving them (with a consumer).


1. Downloading and running Kafka

This page explains very well how to download and run Kafka. If you're using Windows, you can use the following commands from the "bin/windows" folder:

Run these commands from your Kafka root folder:
cd bin/windows
Then run the ZooKeeper server:
zookeeper-server-start.bat ../../config/zookeeper.properties
Then run the Kafka server:
kafka-server-start.bat ../../config/server.properties

Now that Kafka is running, you can check it by creating a topic and listing the topics:

Create a topic:
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test0
List topics:
kafka-topics.bat --list --zookeeper localhost:2181

The response should look something like:
D:\Projects\kafka_2.11-1.0.0\bin\windows>kafka-topics.bat --list --zookeeper localhost:2181
test0


2. Project structure

Our project structure is very simple: we need just two classes, MessageProducer and MessageConsumer.

build.gradle file:

group 'com.demien'
version '1.0-SNAPSHOT'
apply plugin: 'java'
sourceCompatibility = 1.8
repositories {
    mavenCentral()
}

dependencies {
    compile 'org.apache.kafka:kafka-clients:0.9.0.0'
    compile 'org.slf4j:slf4j-api:1.7.12'
    compile 'org.slf4j:slf4j-log4j12:1.7.12'
    compile 'log4j:log4j:1.2.17'

    testCompile group: 'junit', name: 'junit', version: '4.11'
}



3. Producer

It is designed to be generic over the key and value types. I also added an optional messageSentCallback parameter to the constructor: this callback is called once a message has been sent.


package com.demien.kafka;

import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Date;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.Consumer;

public class MessageProducer<K, V> {

    private final Producer<K, V> kafkaProducer;
    private final String topicName;
    private final Consumer<RecordMetadata> messageSentCallback;

    public MessageProducer(String topicName) {
        this(topicName, null);
    }

    public MessageProducer(String topicName, Consumer<RecordMetadata> messageSentCallback) {
        Properties configProperties = new Properties();
        configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Serializers are hard-coded to String, so in practice K and V must both be String
        configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        this.kafkaProducer = new KafkaProducer<>(configProperties);
        this.topicName = topicName;
        this.messageSentCallback = messageSentCallback;
    }


    public void sendMessage(K key, V value) {
        ProducerRecord<K, V> rec = new ProducerRecord<>(topicName, key, value);
        if (messageSentCallback == null) {
            // No callback requested: fire and forget
            kafkaProducer.send(rec);
            return;
        }
        // Kafka invokes this callback once the send has completed or failed
        kafkaProducer.send(rec, (recordMetadata, exception) -> {
            if (exception != null) {
                // Report the failure instead of silently swallowing it
                exception.printStackTrace();
            } else {
                messageSentCallback.accept(recordMetadata);
            }
        });
    }

    public void close() {
        kafkaProducer.close();
    }


    public static void main(String[] args) throws InterruptedException {
        MessageProducer<String, String> testProducer = new MessageProducer<String, String>("test0", (recordMetadata) -> {
            System.out.println("Message was sent: offset:" + recordMetadata.offset() + " partition:" + recordMetadata.partition() + " topic:" + recordMetadata.topic());
        });
        testProducer.sendMessage(null, "Test 1 " + new Date().toString());
        testProducer.sendMessage(null, "Test 2 " + new Date().toString());
        testProducer.sendMessage(null, "Test 3 " + new Date().toString());
        testProducer.close();
    }


}
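
The "optional callback" idea can also be tried without a running broker. The sketch below is not Kafka code: InMemorySender is a hypothetical stand-in that appends to a list and reports the list index as the offset. It only illustrates the pattern of a nullable callback that fires once the send has completed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class CallbackSketch {

    // Hypothetical stand-in for a producer: "sending" appends to an in-memory list
    // and uses the list index as the offset. This is an assumption made for illustration.
    static final class InMemorySender {
        private final List<String> log = new ArrayList<>();
        private final Consumer<Long> onSent; // may be null, like messageSentCallback

        InMemorySender(Consumer<Long> onSent) {
            this.onSent = onSent;
        }

        CompletableFuture<Long> send(String value) {
            log.add(value);
            CompletableFuture<Long> done =
                    CompletableFuture.completedFuture((long) (log.size() - 1));
            if (onSent != null) {
                // The future is already complete, so the callback runs right away
                done.thenAccept(onSent);
            }
            return done;
        }
    }

    // Sends three messages and collects the offsets reported through the callback
    static List<Long> demo() {
        List<Long> reported = new ArrayList<>();
        InMemorySender sender = new InMemorySender(reported::add);
        sender.send("Test 1");
        sender.send("Test 2");
        sender.send("Test 3");
        return reported;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [0, 1, 2]
    }
}
```

Passing null instead of a callback simply skips the notification, which is what the one-argument MessageProducer constructor does.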



4. Consumer

This class is more complicated, because it is designed to deal with the offsets used for reading the data.
The consumer can start reading from the beginning, from the end, or from a provided offset. That is why the constructor is so complicated. The method for receiving messages is pretty simple: the handler for consumed messages is passed to startReceiving.

package com.demien.kafka;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.Properties;

import java.util.function.BiConsumer;

public class MessageConsumer<K, V> {
    private final String topic;
    private final String groupId;
    private final long startingOffset;
    private final KafkaConsumer<K, V> kafkaConsumer;

    public MessageConsumer(String topic, String groupId) {
        this(topic, groupId, -1);
    }

    /**
     * @param topic          - id of topic
     * @param groupId        - id of consumer group
     * @param startingOffset - offset to read messages. 0 - from the beginning.
     *                       -1 - from the end. other values - start reading from this value
     */
    public MessageConsumer(String topic, String groupId, long startingOffset) {
        this.topic = topic;
        this.groupId = groupId;
        this.startingOffset = startingOffset;

        Properties configProperties = new Properties();
        configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Keys are read as Strings to match the MessageConsumer<String, String> used in main
        configProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        configProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        configProperties.put(ConsumerConfig.CLIENT_ID_CONFIG, "testClient");
        configProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        configProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        kafkaConsumer = new KafkaConsumer<>(configProperties);

        kafkaConsumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                System.out.printf("%s topic-partitions are revoked from this consumer\n", Arrays.toString(partitions.toArray()));
            }

            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                System.out.printf("%s topic-partitions are assigned to this consumer\n", Arrays.toString(partitions.toArray()));
                Iterator<TopicPartition> topicPartitionIterator = partitions.iterator();
                while (topicPartitionIterator.hasNext()) {
                    TopicPartition topicPartition = topicPartitionIterator.next();
                    System.out.println("Current offset is " + kafkaConsumer.position(topicPartition) + " committed offset is ->" + kafkaConsumer.committed(topicPartition));
                    if (MessageConsumer.this.startingOffset == 0) {
                        System.out.println("Setting offset to beginning");

                        kafkaConsumer.seekToBeginning(topicPartition);
                    } else if (MessageConsumer.this.startingOffset == -1) {
                        System.out.println("Setting it to the end ");

                        kafkaConsumer.seekToEnd(topicPartition);
                    } else {
                        System.out.println("Resetting offset to " + MessageConsumer.this.startingOffset);
                        kafkaConsumer.seek(topicPartition, MessageConsumer.this.startingOffset);
                    }
                }
            }
        });

    }

    public void startReceiving(BiConsumer<K, V> biConsumer) {
        try {
            while (true) {
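                // Wait up to 100 ms for new records, then pass each one to the handler
                ConsumerRecords<K, V> records = kafkaConsumer.poll(100);
                records.forEach(record -> biConsumer.accept(record.key(), record.value()));
                // Offsets are committed here only when startingOffset is -2
                if (startingOffset == -2) kafkaConsumer.commitSync();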
            }
        } finally {
            kafkaConsumer.close();
        }
    }

    public static void main(String[] args) {
        final MessageConsumer<String, String> testConsumer = new MessageConsumer<>("test0", "testGroup");
        testConsumer.startReceiving( (k,v) -> System.out.println("received:"+v) );

    }
}
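
The three branches in onPartitionsAssigned boil down to a small mapping from the constructor argument to a read position. The sketch below restates that mapping as a plain function so it can be checked without a broker; resolve, beginningOffset and endOffset are names made up for this illustration and are not part of the Kafka API. Note that the "beginning" of a partition is the earliest offset still retained, which is not necessarily 0.

```java
public class StartingOffsetSketch {

    // Mirrors the branches in onPartitionsAssigned:
    //  0 -> beginning of the partition, -1 -> end of the partition, anything else -> that offset
    static long resolve(long startingOffset, long beginningOffset, long endOffset) {
        if (startingOffset == 0) {
            return beginningOffset;
        }
        if (startingOffset == -1) {
            return endOffset;
        }
        return startingOffset;
    }

    public static void main(String[] args) {
        // Assumed partition state: three messages at offsets 0..2, so the end position is 3
        long beginning = 0;
        long end = 3;
        System.out.println(resolve(0, beginning, end));  // prints 0
        System.out.println(resolve(-1, beginning, end)); // prints 3
        System.out.println(resolve(2, beginning, end));  // prints 2
    }
}
```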


5. Execution 

Let's start the consumer now. It should output something like:

[test0-0] topic-partitions are assigned to this consumer
Current offset is 0 committed offset is ->null


Now let's start the producer. It should send 3 test messages and print information about them:

Message was sent: offset:0 partition:0 topic:test0
Message was sent: offset:1 partition:0 topic:test0
Message was sent: offset:2 partition:0 topic:test0

The consumer should also print information about the received messages:

received:Test 1 Tue Jan 23 15:36:29 CET 2018
received:Test 2 Tue Jan 23 15:36:29 CET 2018
received:Test 3 Tue Jan 23 15:36:29 CET 2018

Let's restart our consumer now. With the default value from our constructor (-1), it will read from the end, so the previous messages will not be shown:

[test0-0] topic-partitions are assigned to this consumer
Current offset is 3 committed offset is ->OffsetAndMetadata{offset=3, metadata=''}
Setting it to the end 


Now we can try to read the previous messages by changing the constructor parameter:

public static void main(String[] args) {
    final MessageConsumer<String, String> testConsumer = new MessageConsumer<>("test0", "testGroup", 2);
    testConsumer.startReceiving( (k,v) -> System.out.println("received:"+v) );
}

- the starting offset is now 2, so we will be reading from offset 2. Let's restart it again:

[test0-0] topic-partitions are assigned to this consumer
Current offset is 3 committed offset is ->OffsetAndMetadata{offset=3, metadata=''}
Resetting offset to 2
received:Test 3 Tue Jan 23 15:36:29 CET 2018

- now the last of the previous messages, the one with offset 2, was read.
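
The runs above can be replayed on paper by treating the partition as a list in which the offset is just the index of a message. The sketch below does that with a plain Java list; readFrom is a hypothetical helper written for this post, not a Kafka call, and it ignores everything a real consumer does besides picking the start position.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class PartitionReplaySketch {

    // Returns every message at or after the given position (offset == list index here)
    static List<String> readFrom(List<String> partitionLog, long position) {
        if (position >= partitionLog.size()) {
            return Collections.emptyList(); // positioned at the end: nothing to read yet
        }
        int from = (int) Math.max(position, 0);
        return partitionLog.subList(from, partitionLog.size());
    }

    public static void main(String[] args) {
        // The three test messages sent by the producer, at offsets 0, 1 and 2
        List<String> log = Arrays.asList("Test 1", "Test 2", "Test 3");

        System.out.println(readFrom(log, 3)); // prints []
        System.out.println(readFrom(log, 2)); // prints [Test 3]
        System.out.println(readFrom(log, 0)); // prints [Test 1, Test 2, Test 3]
    }
}
```

Position 3 corresponds to the restart that reads from the end, and position 2 to the restart that prints only the third message.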

6. The end

Source code can be downloaded from here.