HazelCast Sink

Download connector: Hazelcast Connector for Kafka 1.0, Hazelcast Connector for Kafka 0.11

This HazelCast Sink allows you to write events from Kafka to HazelCast. The connector takes the value from the Kafka Connect SinkRecords and inserts/updates an entry in HazelCast. The Sink supports writing to a reliable topic, ring buffer, queue, set, list, imap, multi-map, and icache. This Connector has been developed in collaboration with HazelCast.

Prerequisites

  • Apache Kafka 0.11.x or above
  • Kafka Connect 0.11.x or above
  • Hazelcast 3.6.4 or higher
  • Java 1.8

Features

  1. KCQL routing and querying - Kafka topic payload field selection is supported, allowing you to choose the fields to be written to Hazelcast
  2. Encoding as Json or Avro in Hazelcast via KCQL
  3. Storing in a Hazelcast RELIABLE_TOPIC, RING_BUFFER, QUEUE, SET, LIST, IMAP, MULTI_MAP, ICACHE via KCQL
  4. Parallel writes for improved performance
  5. Error policies for handling failures.

KCQL Support

INSERT INTO entity_name SELECT { FIELD, ... } FROM kafka_topic [PK FIELD, ...] WITHFORMAT
{ JSON|AVRO } STOREAS { RELIABLE_TOPIC|RING_BUFFER|QUEUE|SET|LIST|IMAP|MULTI_MAP|ICACHE }

Tip

You can specify multiple KCQL statements separated by ; to have the connector sink multiple topics.
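
As a rough illustration of the tip above, the following Python sketch joins several KCQL statements into a single value for the connect.hazelcast.kcql option. The helper name join_kcql and the entity and topic names are hypothetical and not part of the connector.

```python
def join_kcql(statements):
    """Join KCQL statements with ';' into one connect.hazelcast.kcql value."""
    # Strip whitespace and any trailing ';' so the separator is not doubled.
    cleaned = [s.strip().rstrip(";").strip() for s in statements]
    return ";".join(s for s in cleaned if s)


kcql = join_kcql([
    "INSERT INTO tableA SELECT * FROM topicA",
    "INSERT INTO tableB SELECT * FROM topicB WITHFORMAT JSON STOREAS QUEUE",
])
print("connect.hazelcast.kcql=" + kcql)
```

Each statement routes one topic, so the topics option of the connector would need to list both topicA and topicB in this case.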

The Hazelcast sink supports KCQL, the Kafka Connect Query Language. The following KCQL support is available:

  1. Field selection
  2. Target Hazelcast storage selection, RELIABLE_TOPIC, RING_BUFFER, QUEUE, SET, LIST, IMAP, MULTI_MAP or ICACHE
  3. Serialization format in Hazelcast, Json, text or Avro
  4. Setting the primary keys for IMAP and MULTI_MAP.

Example:

-- Insert mode, select all fields from topicA and write to tableA
INSERT INTO tableA SELECT * FROM topicA

-- Insert mode, select 3 fields from topicB, rename them and write to tableB, stored as serialized Avro encoded byte arrays
INSERT INTO tableB SELECT x AS a, y AS b, z AS c FROM topicB WITHFORMAT avro STOREAS RING_BUFFER

This is set in the connect.hazelcast.kcql option.
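
To make the field selection concrete, here is a small Python sketch of what a projection that selects x, y and z and renames them to a, b and c conceptually does to a record value before it is written. This is not the connector's code; the function project and the sample record are hypothetical.

```python
import json


def project(record, fields):
    """Keep only the selected fields, renaming each source field to its alias."""
    return {alias: record[source] for source, alias in fields.items()}


# Hypothetical record value; "ignored" is not selected and is dropped.
record = {"x": 1, "y": "two", "z": 3.0, "ignored": True}
selected = project(record, {"x": "a", "y": "b", "z": "c"})

# With the JSON format, the Sink would store a JSON string of the projection.
print(json.dumps(selected))
```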

Primary Keys

When inserting into a MAP or MULTI_MAP we need a key. The PK keyword can be used to specify the fields which will be used for the key value. The field values will be concatenated and separated by a -. If no fields are set, the topic name, partition and message offset are used.
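
The key rule can be sketched in Python as follows. This is only an illustration: the function build_key is hypothetical, and joining the fallback parts (topic, partition, offset) with a - is an assumption, since the text only states which parts are used.

```python
def build_key(value, pk_fields, topic, partition, offset):
    """Build a map key from PK field values, or fall back to record coordinates."""
    if pk_fields:
        # PK field values are concatenated and separated by '-'.
        return "-".join(str(value[f]) for f in pk_fields)
    # Assumption: the fallback parts are also joined with '-'.
    return "-".join([topic, str(partition), str(offset)])


value = {"firstName": "John", "lastName": "Smith", "age": 30}
print(build_key(value, ["firstName", "lastName"], "hazelcast-topic", 0, 42))
print(build_key(value, [], "hazelcast-topic", 0, 42))
```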

With Format

Hazelcast requires that data stored in collections and topics is serializable. The Sink offers two modes to store data.

  • Avro: In this mode the Sink converts the SinkRecords from Kafka to Avro encoded byte arrays.
  • Json: In this mode the Sink converts the SinkRecords from Kafka to Json strings.

This behaviour is controlled by the KCQL statement in the connect.hazelcast.kcql option. The default is JSON.

Stored As

The Hazelcast Sink supports storing data in RingBuffers, ReliableTopics, Queues, Sets, Lists, IMaps, Multi-maps and ICaches. This behaviour is controlled by the KCQL statement in the connect.hazelcast.kcql option. Note that IMaps, Multi-maps and ICaches support a key as well as a value.

-- store into a ring buffer
INSERT INTO tableB SELECT x AS a, y AS b, z AS c FROM topicB WITHFORMAT avro STOREAS RING_BUFFER
-- store into a reliable topic
INSERT INTO tableB SELECT x AS a, y AS b, z AS c FROM topicB WITHFORMAT avro STOREAS RELIABLE_TOPIC
-- store into a queue
INSERT INTO tableB SELECT x AS a, y AS b, z AS c FROM topicB WITHFORMAT avro STOREAS QUEUE
-- store into a set
INSERT INTO tableB SELECT x AS a, y AS b, z AS c FROM topicB WITHFORMAT avro STOREAS SET
-- store into a list
INSERT INTO tableB SELECT x AS a, y AS b, z AS c FROM topicB WITHFORMAT avro STOREAS LIST
-- store into an i-map with field1 used as the map key
INSERT INTO tableB SELECT x AS a, y AS b, z AS c FROM topicB PK field1 WITHFORMAT avro STOREAS IMAP
-- store into a multi-map with field1 used as the map key
INSERT INTO tableB SELECT x AS a, y AS b, z AS c FROM topicB PK field1 WITHFORMAT avro STOREAS MULTI_MAP
-- store into an i-cache with field1 used as the cache key
INSERT INTO tableB SELECT x AS a, y AS b, z AS c FROM topicB PK field1 WITHFORMAT avro STOREAS ICACHE
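
If you generate connector configurations programmatically, a small helper can assemble statements like the ones above. The Python sketch below is hypothetical (build_kcql is not part of the connector). Rejecting a PK clause for stores without keys is a choice made in this sketch, based on the note that only IMaps, Multi-maps and ICaches support a key; the connector itself may behave differently.

```python
KEYED_STORES = {"IMAP", "MULTI_MAP", "ICACHE"}
ALL_STORES = KEYED_STORES | {"RELIABLE_TOPIC", "RING_BUFFER", "QUEUE", "SET", "LIST"}


def build_kcql(target, topic, store_as, fmt="JSON", pk=()):
    """Assemble a KCQL statement selecting all fields from a topic."""
    store = store_as.upper()
    if store not in ALL_STORES:
        raise ValueError(f"unsupported STOREAS target: {store_as}")
    # Sketch-level check: only keyed stores take a PK clause.
    if pk and store not in KEYED_STORES:
        raise ValueError(f"PK only applies to keyed stores, not {store}")
    pk_clause = " PK " + ", ".join(pk) if pk else ""
    return (f"INSERT INTO {target} SELECT * FROM {topic}{pk_clause} "
            f"WITHFORMAT {fmt.upper()} STOREAS {store}")


print(build_kcql("tableB", "topicB", "imap", fmt="avro", pk=["field1"]))
print(build_kcql("tableB", "topicB", "queue"))
```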

Parallel Writes

By default each task in the Sink writes the records it receives sequentially. The Sink optionally supports parallel writes, where an executor thread pool is started and records are written in parallel. While this results in better performance, the order of the writes can’t be guaranteed.

To enable parallel writes, set the connect.hazelcast.parallel.write configuration option to true.
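
The trade-off between throughput and ordering can be seen in a small Python simulation. This is not the connector's implementation; the write function below is a hypothetical stand-in for a Hazelcast write, with a random delay to mimic variable latency.

```python
import random
import threading
import time
from concurrent.futures import ThreadPoolExecutor

written = []
lock = threading.Lock()


def write(record):
    """Stand-in for a write to Hazelcast; the sleep simulates variable latency."""
    time.sleep(random.uniform(0, 0.01))
    with lock:
        written.append(record)


records = list(range(10))
with ThreadPoolExecutor(max_workers=4) as pool:
    # list() forces all writes to finish and surfaces any exception.
    list(pool.map(write, records))

print("input order:", records)
print("write order:", written)
```

Every record is written exactly once, but the write order will usually differ from the input order. If downstream consumers depend on ordering, sequential writes are the safer choice.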

Lenses QuickStart

The easiest way to try this out is with Lenses Box, the pre-configured Docker image that comes with this connector pre-installed. In Lenses, go to Connectors –> New Connector –> Sink –> HazelCast and paste your configuration.

[Image: creating a Hazelcast sink connector in Lenses]

HazelCast Setup

Download and install HazelCast from here

When you download and extract the Hazelcast ZIP or TAR.GZ package, you will see 3 scripts under the /bin folder which provide basic functionality for member and cluster management.

The following are the names and descriptions of each script:

  • start.sh - Starts a Hazelcast member with the default configuration in the working directory.
  • stop.sh - Stops the Hazelcast member that was started in the current working directory.

Start HazelCast:

➜  bin/start.sh

INFO: [10.128.137.102]:5701 [dev] [3.6.4] Address[10.128.137.102]:5701 is STARTING
Aug 16, 2016 2:43:04 PM com.hazelcast.nio.tcp.nonblocking.NonBlockingIOThreadingModel
INFO: [10.128.137.102]:5701 [dev] [3.6.4] TcpIpConnectionManager configured with Non Blocking IO-threading model: 3 input threads and 3 output threads
Aug 16, 2016 2:43:07 PM com.hazelcast.cluster.impl.MulticastJoiner
INFO: [10.128.137.102]:5701 [dev] [3.6.4]

Members [1] {
    Member [10.128.137.102]:5701 this
}

Aug 16, 2016 2:43:07 PM com.hazelcast.core.LifecycleService
INFO: [10.128.137.102]:5701 [dev] [3.6.4] Address[10.128.137.102]:5701 is STARTED

This will start Hazelcast with a default group called dev and the password dev-pass.

Installing the Connector

In production, Connect should be run in distributed mode.

  1. Install and configure a Kafka Connect cluster
  2. Create a folder on each server called plugins/lib
  3. Copy into the above folder the required connector jars from the stream reactor download
  4. Edit connect-avro-distributed.properties in the etc/schema-registry folder and uncomment the plugin.path option. Set it to the root directory, i.e. plugins, of the folder you created in step 2 and deployed the Stream Reactor connector jars into.
  5. Start Connect: bin/connect-distributed etc/schema-registry/connect-avro-distributed.properties

Connect workers are long-running processes, so set up an init.d or systemd service accordingly.

Starting the Connector (Distributed)

Download and install Stream Reactor. Follow the instructions here if you haven’t already done so. All paths in the quickstart are relative to the location where you installed Stream Reactor.

Once Connect has started, we can use the kafka-connect-tools CLI to post in our distributed properties file for HazelCast. For the CLI to work, including when using the Docker images, you will have to set the following environment variable to point to the Kafka Connect REST API.

export KAFKA_CONNECT_REST="http://myserver:myport"
➜  bin/connect-cli create hazelcast-sink < conf/hazelcast-sink.properties

name=hazelcast-sink
connector.class=com.datamountaineer.streamreactor.connect.hazelcast.sink.HazelCastSinkConnector
tasks.max=1
topics=hazelcast-topic
connect.hazelcast.cluster.members=localhost
connect.hazelcast.group.name=dev
connect.hazelcast.group.password=dev-pass
connect.hazelcast.kcql=INSERT INTO sink-test SELECT * FROM hazelcast-topic WITHFORMAT JSON
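
If you prefer to call the Kafka Connect REST API directly instead of using the CLI, the properties need to be turned into the JSON body expected by POST /connectors, which has a name and a config object. The Python sketch below shows one way to do that conversion. The function name to_connect_payload is hypothetical, and the sketch only builds and prints the payload rather than sending it.

```python
import json

PROPERTIES = """\
name=hazelcast-sink
connector.class=com.datamountaineer.streamreactor.connect.hazelcast.sink.HazelCastSinkConnector
tasks.max=1
topics=hazelcast-topic
connect.hazelcast.cluster.members=localhost
connect.hazelcast.group.name=dev
connect.hazelcast.group.password=dev-pass
connect.hazelcast.kcql=INSERT INTO sink-test SELECT * FROM hazelcast-topic WITHFORMAT JSON
"""


def to_connect_payload(text):
    """Convert key=value properties into a Kafka Connect REST request body."""
    config = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        # Split on the first '=' only, so values may contain '='.
        key, value = line.split("=", 1)
        config[key.strip()] = value.strip()
    return {"name": config.pop("name"), "config": config}


payload = to_connect_payload(PROPERTIES)
print(json.dumps(payload, indent=2))
```

The printed JSON could then be posted to the /connectors endpoint of the server referenced by KAFKA_CONNECT_REST with a Content-Type of application/json.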

If you switch back to the terminal you started Kafka Connect in, you should see the Hazelcast Sink being accepted and the task starting.

We can use the CLI to check if the connector is up, but you should be able to see this in the logs as well.

#check for running connectors with the CLI
➜ bin/connect-cli ps
hazelcast-sink
INFO
    ____        __        __  ___                  __        _
   / __ \____ _/ /_____ _/  |/  /___  __  ______  / /_____ _(_)___  ___  ___  _____
  / / / / __ `/ __/ __ `/ /|_/ / __ \/ / / / __ \/ __/ __ `/ / __ \/ _ \/ _ \/ ___/
 / /_/ / /_/ / /_/ /_/ / /  / / /_/ / /_/ / / / / /_/ /_/ / / / / /  __/  __/ /
/_____/\__,_/\__/\__,_/_/  /_/\____/\__,_/_/ /_/\__/\__,_/_/_/ /_/\___/\___/_/
    __  __                 ________           __  _____ _       __
   / / / /___ _____  ___  / / ____/___ ______/ /_/ ___/(_)___  / /__
  / /_/ / __ `/_  / / _ \/ / /   / __ `/ ___/ __/\__ \/ / __ \/ //_/
 / __  / /_/ / / /_/  __/ / /___/ /_/ (__  ) /_ ___/ / / / / / ,<
/_/ /_/\__,_/ /___/\___/_/\____/\__,_/____/\__//____/_/_/ /_/_/|_|

by Andrew Stevenson
       (com.datamountaineer.streamreactor.connect.hazelcast.sink.HazelCastSinkTask:41)

Members [1] {
    Member [172.17.0.2]:5701
}
INFO: HazelcastClient[dev-kafka-connect-05e64989-41d9-433e-ad21-b54894486384][3.6.4] is CLIENT_CONNECTED

Test Records

Tip

If your input topic doesn’t match the target, use Lenses SQL to transform the input in real time; no Java or Scala required!

Now we need to put some records into the hazelcast-topic topic. We can use the kafka-avro-console-producer to do this. Start the producer and pass in a schema to register in the Schema Registry. The schema has a firstName field of type string, a lastName field of type string, an age field of type int and a salary field of type double.

bin/kafka-avro-console-producer \
  --broker-list localhost:9092 --topic hazelcast-topic \
  --property value.schema='{"type":"record","name":"User",
  "fields":[{"name":"firstName","type":"string"},{"name":"lastName","type":"string"},{"name":"age","type":"int"},{"name":"salary","type":"double"}]}'

Now the producer is waiting for input. Paste in the following:

{"firstName": "John", "lastName": "Smith", "age":30, "salary": 4830}
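
Before pasting records into the producer, it can help to check that they line up with the schema. The Python sketch below performs a rough structural check only; it is not Avro validation. The function matches_schema is hypothetical, and accepting a JSON integer for a double field is an assumption of this sketch.

```python
import json

SCHEMA = json.loads("""
{"type": "record", "name": "User",
 "fields": [{"name": "firstName", "type": "string"},
            {"name": "lastName", "type": "string"},
            {"name": "age", "type": "int"},
            {"name": "salary", "type": "double"}]}
""")

# Rough mapping from the Avro primitive types used above to Python types.
PY_TYPES = {"string": str, "int": int, "double": (int, float)}


def matches_schema(record, schema):
    """Return True if the record has exactly the schema's fields with plausible types."""
    fields = schema["fields"]
    if set(record) != {f["name"] for f in fields}:
        return False
    for f in fields:
        value = record[f["name"]]
        # bool is a subclass of int in Python, so reject it explicitly.
        if isinstance(value, bool) or not isinstance(value, PY_TYPES[f["type"]]):
            return False
    return True


record = json.loads('{"firstName": "John", "lastName": "Smith", "age": 30, "salary": 4830}')
print(matches_schema(record, SCHEMA))
```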

Check for records in HazelCast

Now check the logs of the connector. You should see this:

[2016-08-20 16:53:58,608] INFO Received 1 records. (com.datamountaineer.streamreactor.connect.hazelcast.sink.HazelCastWriter:62)
[2016-08-20 16:53:58,644] INFO Written 1 (com.datamountaineer.streamreactor.connect.hazelcast.sink.HazelCastWriter:71)

Configurations

The Kafka Connect framework requires the following in addition to any connector-specific configurations:

Config | Description | Type | Value
name | Name of the connector | string | This must be unique across the Connect cluster
topics | The topics to sink. The connector will check this matches the KCQL statement | string |
tasks.max | The number of tasks to scale output | int | 1
connector.class | Name of the connector class | string | com.datamountaineer.streamreactor.connect.hazelcast.sink.HazelCastSinkConnector
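
The consistency check between topics and the KCQL statement can be approximated in Python as below. This is a simplified sketch, not the connector's parser: the regular expression only handles a plain FROM clause, the function names are hypothetical, and requiring the two sets to be equal is an assumption about what "matches" means.

```python
import re

# Matches the topic after FROM, optionally wrapped in backticks.
FROM_TOPIC = re.compile(r"\bFROM\s+(`[^`]+`|\S+)", re.IGNORECASE)


def kcql_topics(kcql):
    """Collect the source topic of each ';'-separated KCQL statement."""
    topics = set()
    for statement in kcql.split(";"):
        match = FROM_TOPIC.search(statement)
        if match:
            topics.add(match.group(1).strip("`"))
    return topics


def check_topics(topics_option, kcql):
    """Return True if the comma-separated topics option equals the KCQL topics."""
    configured = {t.strip() for t in topics_option.split(",") if t.strip()}
    return configured == kcql_topics(kcql)


kcql = "INSERT INTO sink-test SELECT * FROM hazelcast-topic WITHFORMAT JSON"
print(check_topics("hazelcast-topic", kcql))
print(check_topics("some-other-topic", kcql))
```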

Connector Configurations

Config | Description | Type
connect.hazelcast.kcql | KCQL expression describing field selection and routes | string
connect.hazelcast.cluster.members | Address List is the initial list of cluster addresses to which the client will connect. The client uses this list to find an alive node. Although it may be enough to give only one address of a node in the cluster (since all nodes communicate with each other), it is recommended that you give the addresses for all the nodes | string
connect.hazelcast.group.name | The group name of the connector in the target Hazelcast cluster | string

Optional Configurations

Config | Description | Type | Default
connect.hazelcast.batch.size | Specifies how many records to insert together at one time. If the Connect framework provides fewer records when it calls the Sink, the Sink won’t wait to fill the batch but writes the records it has | int | 1000
connect.hazelcast.group.password | The password for the group name | string | dev-pass
connect.hazelcast.timeout | Connection timeout in milliseconds for nodes to accept client connection requests | int | 5000
connect.hazelcast.retries | The number of times a client will retry the connection at startup | int | 2
connect.hazelcast.keep.alive | Enables/disables the SO_KEEPALIVE socket option | boolean | true
connect.hazelcast.tcp.no.delay | Enables/disables the TCP_NODELAY socket option | boolean | true
connect.hazelcast.linger.seconds | Enables/disables SO_LINGER with the specified linger time in seconds | int | 3
connect.hazelcast.buffer.size | Sets the SO_SNDBUF and SO_RCVBUF options to the specified value in KB for this socket | int | 32
connect.hazelcast.parallel.write | Allows the sink to write the records received from Kafka on each poll in parallel. Order of writes is not guaranteed | boolean | true
connect.hazelcast.error.policy | Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP, the error is swallowed; THROW, the error is allowed to propagate; and RETRY, the Kafka message is redelivered up to a maximum number of times specified by the connect.hazelcast.max.retries option | string | THROW
connect.hazelcast.max.retries | The maximum number of times a message is retried. Only valid when connect.hazelcast.error.policy is set to RETRY | string | 10
connect.hazelcast.retry.interval | The interval, in milliseconds, between retries, if the sink is using connect.hazelcast.error.policy set to RETRY | string | 60000
connect.progress.enabled | Enables the output for how many records have been processed | boolean | false
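
The three error policies can be illustrated with a short Python sketch. It is only a model of the behaviour described in the table: the function write_with_policy is hypothetical, and the in-process retry loop stands in for the redelivery that Kafka Connect performs for the real connector.

```python
import time


def write_with_policy(write, record, policy="THROW", max_retries=10, retry_interval_ms=60000):
    """Apply a NOOP, THROW or RETRY error policy around a single write."""
    attempts = 0
    while True:
        try:
            write(record)
            return True
        except Exception:
            if policy == "NOOP":
                return False  # error swallowed, record skipped
            if policy == "RETRY" and attempts < max_retries:
                attempts += 1
                time.sleep(retry_interval_ms / 1000.0)
                continue
            raise  # THROW, or RETRY with all retries used up


calls = {"n": 0}


def flaky(record):
    """Hypothetical write that fails twice before succeeding."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise IOError("simulated Hazelcast failure")


ok = write_with_policy(flaky, {"id": 1}, policy="RETRY", max_retries=5, retry_interval_ms=1)
print(ok, calls["n"])
```

With NOOP the failing record is silently dropped, so it suits cases where occasional loss is acceptable. THROW fails the task on the first error, and RETRY trades latency for a chance to recover from transient failures.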

Example

name=hazelcast-sink
connector.class=com.datamountaineer.streamreactor.connect.hazelcast.sink.HazelCastSinkConnector
tasks.max=1
topics=hazelcast-topic
connect.hazelcast.cluster.members=localhost
connect.hazelcast.group.name=dev
connect.hazelcast.group.password=dev-pass
connect.hazelcast.kcql=INSERT INTO sink-test SELECT * FROM hazelcast-topic WITHFORMAT JSON STOREAS RING_BUFFER
connect.progress.enabled=true

Schema Evolution

Upstream changes to schemas are handled by the Schema Registry, which will validate the addition and removal of fields, data type changes and whether defaults are set. The Schema Registry enforces Avro schema evolution rules. More information can be found here.

The Sink serializes either an Avro or Json representation of the Sink record to the target reliable topic in Hazelcast. Hazelcast is agnostic to the schema.

Kubernetes

Helm Charts are provided at our repo, add the repo to your Helm instance and install. We recommend using the Landscaper to manage Helm Values since typically each Connector instance has its own deployment.

Add the Helm charts to your Helm instance:

helm repo add landoop https://landoop.github.io/kafka-helm-charts/

TroubleShooting

Please review the FAQs and join our Slack channel.