8. Web Socket API

As Kafka adoption grows, more and more clients want to hook their front-end web applications directly into Apache Kafka. At Landoop we let these applications work with Apache Kafka while leveraging the Lenses SQL capabilities.

Lenses comes with a REST API allowing JavaScript clients to work with Apache Kafka over a WebSocket. To speed up integration we provide an open-source JavaScript client library; if you are using ReactJS (Angular support to follow soon), you can get going fast and focus on your business requirements.

Below you can find the list of supported functionality:

  • A client must AUTHENTICATE first to obtain the token that allows all other requests to be accepted.
  • A client can SUBSCRIBE to a topic via SQL. Please check the Lenses SQL section for full details on how to set filters and use functions.
  • A client can UNSUBSCRIBE from a topic.
  • A client can PUBLISH messages to a topic. The current version supports only string/JSON; support for Avro will be added in the future.
  • A client can COMMIT the (topic, partition) offsets.
  • A client subscription must specify the decoder type, so that the content of the Kafka message key/value parts is read correctly. The following decoders are supported:
    • STRING - the byte[] payload received from Kafka is read as a String
    • INT - the byte[] payload received from Kafka is read as an Int
    • LONG - the byte[] payload received from Kafka is read as a Long
    • AVRO - the byte[] payload received from Kafka is read as an Avro record
    • JSON - the byte[] payload received from Kafka is read as JSON
    • BINARY - the byte[] payload received from Kafka is kept as is, in binary format

8.1. Lenses WS client lib

Redux is currently supported via middleware. We are actively working on Angular support.

8.1.1. Install

npm i --save kafka-ws-js

If your browser doesn’t support Promises and fetch(), you will need the following polyfills:

import Promise from 'promise-polyfill';
import 'whatwg-fetch';

if (!window.Promise) {
  window.Promise = Promise;
}

8.1.2. How to use

First, set up the Redux store with the Kafka middleware:

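import { createStore, applyMiddleware } from 'redux';
import { createLogger } from 'redux-logger';
import { kafkaWsRedux } from 'kafka-ws-js';
import rootReducer from './rootReducer'; // your root reducer (path is illustrative)

function configureStore() {
  const kafkaWsMiddleware = kafkaWsRedux.createWsMiddleware();
  // Any other middleware you might use, e.g. redux-logger
  const logger = createLogger();
  const middleware = [logger, kafkaWsMiddleware];

  const store = createStore(
    rootReducer,
    applyMiddleware(...middleware),
  );

  return store;
}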

Alternatively, you can create the middleware with custom options, in which case it will attempt to connect using them:

import { createStore, applyMiddleware } from 'redux';
import { kafkaWsRedux } from 'kafka-ws-js';
import rootReducer from './rootReducer'; // your root reducer (path is illustrative)

function configureStore() {
  const kafkaOptions = {
    host: 'cloudera02.landoop.com:24006/api/kafka/ws',
    clientId: 'MyClient',
    // See the Options section for the full list
  };

  const kafkaWsMiddleware = kafkaWsRedux.createWsMiddleware(kafkaOptions);
  const middleware = [/* any other middleware */ kafkaWsMiddleware];

  const store = createStore(
    rootReducer,
    applyMiddleware(...middleware),
  );

  return store;
}

Next, add the reducer:

import { combineReducers } from 'redux';
import { kafkaWsRedux } from 'kafka-ws-js';
import sessionReducer from './sessionReducer';

const rootReducer = combineReducers({
  kafka: kafkaWsRedux.reducer,
  //Add other application reducers where you listen to kafka actions
  session: sessionReducer,
});

export default rootReducer;

Now you are ready to dispatch Actions that the middleware will intercept:

import { kafkaWsRedux } from 'kafka-ws-js';

const Actions = kafkaWsRedux.Actions;

// `dispatch` is your store's dispatch function (e.g. store.dispatch)
dispatch(Actions.connect(options));
dispatch(Actions.publish(payload));
dispatch(Actions.subscribe(payload));
// ...

You can also listen to the various action types dispatched by the middleware:

import { kafkaWsRedux } from 'kafka-ws-js';

const Type = kafkaWsRedux.Type;

// Available action types:
//   Type.KAFKA_MESSAGE
//   Type.KAFKA_HEARTBEAT
//   Type.CONNECT, Type.CONNECT_SUCCESS, Type.CONNECT_FAILURE
//   Type.DISCONNECT, Type.DISCONNECT_SUCCESS, Type.DISCONNECT_FAILURE
//   Type.PUBLISH, Type.PUBLISH_SUCCESS, Type.PUBLISH_FAILURE
//   Type.SUBSCRIBE, Type.SUBSCRIBE_SUCCESS, Type.SUBSCRIBE_FAILURE
//   Type.UNSUBSCRIBE, Type.UNSUBSCRIBE_SUCCESS, Type.UNSUBSCRIBE_FAILURE
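To illustrate an application reducer that listens to Kafka actions, here is a minimal sketch. It assumes (this documentation does not confirm it) that a KAFKA_MESSAGE action carries the KAFKAMSG frame in action.payload, with the records in payload.content. createMessagesReducer is a hypothetical helper that takes the Type map as an argument so it can be exercised without the library.

```javascript
// Hypothetical reducer factory: pass in kafkaWsRedux.Type from 'kafka-ws-js'.
// ASSUMPTION: a KAFKA_MESSAGE action looks like
//   { type: Type.KAFKA_MESSAGE, payload: { content: [ { topic, partition, offset, key, value, timestamp }, ... ] } }
// Check the library source for the real payload shape before relying on this.
function createMessagesReducer(Type, maxMessages = 100) {
  const initialState = { messages: [] };

  return function messagesReducer(state = initialState, action) {
    switch (action.type) {
      case Type.KAFKA_MESSAGE: {
        const records = (action.payload && action.payload.content) || [];
        // Append the new records and keep only the most recent maxMessages.
        const messages = state.messages.concat(records).slice(-maxMessages);
        return { ...state, messages };
      }
      case Type.KAFKA_HEARTBEAT:
        // Heartbeats carry no data, so the state is left untouched.
        return state;
      default:
        return state;
    }
  };
}
```

It could be registered next to the kafka reducer, for example as messages: createMessagesReducer(kafkaWsRedux.Type) inside combineReducers.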

8.1.3. Options Description

These options are passed when creating the middleware or when dispatching a connect action.

Field      Type    Description
host       String  WebSocket address, including port. If wss:// is not set, it will be
                   added by the library. Example addresses:
                   test.landoop.com:21112/api/kafka/ws or
                   wss://test.landoop.com:21112/api/kafka/ws
clientId   String  Client id. If a previous session is found for this id, messages for
                   its topic subscriptions will be sent back.
authToken  String  Token used to authenticate Kafka publish/subscribe/unsubscribe
                   messages.
authUrl    String  If no token is provided, this address is used to retrieve one
                   (provided the user and password are given).
user       String  User for HTTP authentication.
password   String  Password for HTTP authentication.
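For anyone handling the host option by hand, the wss:// rule from the table can be sketched as follows. normalizeWsHost is a hypothetical helper, and leaving an explicit ws:// prefix untouched is an assumption; the library may behave differently.

```javascript
// Hypothetical helper mirroring the documented rule: if the address does not
// start with wss://, prefix it. An explicit ws:// scheme is left alone here
// (an assumption; the library may handle it differently).
function normalizeWsHost(host) {
  const trimmed = host.trim();
  if (/^wss?:\/\//i.test(trimmed)) {
    return trimmed;
  }
  return `wss://${trimmed}`;
}
```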

8.1.4. APIs

We strongly encourage you to use the JavaScript library we provide, since it already implements the required protocol and is production ready. Should you choose not to, the details required to implement a client that uses the API are given below.

The API is straightforward: there is only one REST endpoint, used to connect and open a WebSocket connection:

HTTP method  Path                     Parameter
GET          /api/kafka/ws/$clientId  clientId - the unique identifier for the Kafka
                                      consumer created behind the scenes
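A client that skips the library could build the endpoint URL and open the socket roughly like this. buildWsUrl and openConnection are hypothetical helpers, the base URL in the test is a placeholder, and encoding the client id with encodeURIComponent is a defensive assumption rather than a documented requirement. openConnection relies on the standard browser WebSocket constructor.

```javascript
// Hypothetical helper: builds the URL for GET /api/kafka/ws/$clientId.
// `baseUrl` is assumed to look like "wss://<host>:<port>" (no path).
function buildWsUrl(baseUrl, clientId) {
  const base = baseUrl.replace(/\/+$/, ''); // drop trailing slashes
  return `${base}/api/kafka/ws/${encodeURIComponent(clientId)}`;
}

// In a browser, the connection is then opened with the standard WebSocket API.
function openConnection(baseUrl, clientId) {
  return new WebSocket(buildWsUrl(baseUrl, clientId));
}
```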

Once the connection has been opened, the client must follow the protocol described here. Each message the client sends to the back end follows this template:

{
    "type":"SUBSCRIBE/UNSUBSCRIBE/PUBLISH/COMMIT/LOGIN",
    "content":"The json text for the specific request",
    "correlationId":1000,
    "authToken" : "Authorization token or empty"
}
Field          Type    Description
type           String  Describes the action the back end will take in response to the
                       request. The available values are: LOGIN, SUBSCRIBE, UNSUBSCRIBE,
                       PUBLISH, COMMIT.
content        String  Contains the JSON content of the actual request. The content
                       depends on the type, as described in the following sections.
correlationId  Long    A unique identifier that lets the client link the response to the
                       request made.
authToken      String  A unique token identifying the user making the request. This token
                       can only be obtained once the LOGIN request has completed
                       successfully.
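Because content is itself a JSON string inside the JSON frame, every request is encoded twice. The sketch below shows this with a hypothetical buildRequest helper and a simple incrementing counter for correlationId; any scheme producing unique values per connection should do.

```javascript
// Hypothetical helper for building request frames. Note the double encoding:
// `content` is serialised to a JSON string first, then the whole frame is.
let nextCorrelationId = 1;

function buildRequest(type, content, authToken = '') {
  const correlationId = nextCorrelationId++;
  const frame = {
    type,                             // LOGIN, SUBSCRIBE, UNSUBSCRIBE, PUBLISH or COMMIT
    content: JSON.stringify(content), // a string containing JSON, not a nested object
    correlationId,                    // lets the client match the response to this request
    authToken,                        // empty until LOGIN has succeeded
  };
  return { correlationId, text: JSON.stringify(frame) };
}
```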

The response received from the back end follows this template:

{
    "correlationId": Long,
    "type"  : "ERROR/INVALIDREQUEST/KAFKAMSG/HEARTBEAT/SUCCESS",
    "content": String
}
Field          Type    Description
correlationId  Long    The unique identifier the client provided in the request associated
                       with this response.
type           String  Describes what response content the client has received. Available
                       values are: ERROR, INVALIDREQUEST, KAFKAMSG, HEARTBEAT, SUCCESS.
content        String  Contains the actual response content. Each response type has its
                       own content layout.
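The correlationId is what lets a client match replies to requests. A minimal sketch of a pending-request table follows. PendingRequests is a hypothetical class, and treating SUCCESS as a resolution and ERROR/INVALIDREQUEST as a failure is an assumption based on the response types listed above.

```javascript
// Hypothetical pending-request tracker keyed by correlationId.
// ASSUMPTION: SUCCESS resolves a request, ERROR and INVALIDREQUEST reject it;
// KAFKAMSG and HEARTBEAT frames are not replies and are returned to the caller.
class PendingRequests {
  constructor() {
    this.pending = new Map();
  }

  // Register a request before sending it; the promise settles on the reply.
  track(correlationId) {
    return new Promise((resolve, reject) => {
      this.pending.set(correlationId, { resolve, reject });
    });
  }

  // Feed every raw text frame received from the socket into this method.
  // Returns the parsed frame if it was not a reply to a tracked request, else null.
  handle(rawText) {
    const response = JSON.parse(rawText);
    const entry = this.pending.get(response.correlationId);
    const isReply = ['SUCCESS', 'ERROR', 'INVALIDREQUEST'].includes(response.type);
    if (!entry || !isReply) {
      return response;
    }
    this.pending.delete(response.correlationId);
    if (response.type === 'SUCCESS') {
      entry.resolve(response.content);
    } else {
      entry.reject(new Error(`${response.type}: ${response.content}`));
    }
    return null;
  }
}
```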

8.1.5. Protocol Definition

All requests are constrained by the user's permissions on the back end. If the user has only Read access, publishing a record to a topic will not be allowed. If the user has only the No-Data role, retrieving messages from Kafka will not be allowed either. See the Security section for role definitions.

8.2. Login

When a WebSocket connection to Lenses is first opened, the client must obtain a token by sending a LOGIN request in the following format:

{
   "type" : "LOGIN",
   "content" : "{
                 "user" : String,
                 "password" : String
                }",
   "correlationId": Long,
   "authToken": String
}
Field          Type    Description
content        String  Contains a JSON with two fields, user and password, identifying
                       the user to obtain the token for.
correlationId  Long    A unique number the back end will send back as part of the
                       response.
authToken      String  For this request type the authorization token is not validated.

Note

The content field value is a string containing JSON, not a nested JSON object!

A successful login response will look like this:

{
  "correlationId" : Long,
  "type" : "SUCCESS",
  "content" : String
}
Field          Type    Description
content        String  Contains the authorization token.
correlationId  Long    The unique number sent in the request.

If the user or password provided is not correct, the client will receive an error response. In this case the response format looks like this:

{
  "correlationId": Long,
  "type" : "ERROR",
  "content": String
}
Field          Type    Description
content        String  Contains the error description.
correlationId  Long    The unique number sent in the request.
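Putting the LOGIN request and its two possible responses together, a login step over an open WebSocket might look like the sketch below. login is a hypothetical helper; it only assumes a socket object with the standard addEventListener, removeEventListener and send methods.

```javascript
// Hypothetical helper: sends a LOGIN request over an open WebSocket and
// resolves with the authorization token carried in a SUCCESS response.
function login(socket, user, password, correlationId) {
  return new Promise((resolve, reject) => {
    function onMessage(event) {
      const response = JSON.parse(event.data);
      // Ignore heartbeats and replies to other requests.
      if (response.correlationId !== correlationId) return;
      socket.removeEventListener('message', onMessage);
      if (response.type === 'SUCCESS') {
        resolve(response.content); // the authorization token
      } else {
        reject(new Error(response.content)); // e.g. wrong user or password
      }
    }

    socket.addEventListener('message', onMessage);
    socket.send(JSON.stringify({
      type: 'LOGIN',
      content: JSON.stringify({ user, password }), // a string containing JSON
      correlationId,
      authToken: '', // not validated for LOGIN
    }));
  });
}
```

In a browser it would be called once the socket has fired its open event, for example login(socket, 'myUser', 'myPassword', 1).then((token) => ...), with the token then placed in the authToken field of every subsequent request.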

8.3. Publishing

In order to publish a message to a topic the client has to send the following request:

{
   "type" : "PUBLISH",
   "content" : "{
                 "topic" : String,
                 "key" : String,
                 "value" : String
                }",
   "correlationId": Long,
   "authToken": String
}
Field          Type    Description
content        String  Contains a JSON with three fields: topic, key and value. The last
                       two fields are optional; leave a field out if you want to send a
                       null value.
correlationId  Long    A unique number the back end will send back as part of the
                       response.
authToken      String  The authorization token. The back end will check whether the
                       user's roles allow this action.

Note

Remember, the key and value are sent to the target Kafka topic as strings! The content field value is a string containing JSON!
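A sketch of building a PUBLISH frame that follows these rules: the content is a JSON string, key and value are strings, and a null key or value is expressed by leaving the field out. buildPublishRequest is hypothetical, and serialising non-string values with JSON.stringify is an assumption that matches the string/JSON-only support.

```javascript
// Hypothetical helper: builds a PUBLISH frame. Key and value travel as strings;
// leaving either out (null/undefined) sends a null to Kafka, as documented.
function buildPublishRequest({ topic, key, value }, correlationId, authToken) {
  if (typeof topic !== 'string' || topic.length === 0) {
    throw new Error('topic is required');
  }
  const content = { topic };
  if (key !== undefined && key !== null) {
    content.key = String(key);
  }
  if (value !== undefined && value !== null) {
    // Objects are serialised to JSON text, since only string/JSON values are supported.
    content.value = typeof value === 'string' ? value : JSON.stringify(value);
  }
  return JSON.stringify({
    type: 'PUBLISH',
    content: JSON.stringify(content), // a string containing JSON
    correlationId,
    authToken,
  });
}
```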

8.4. Subscription

To receive messages from a Kafka topic the client has to send a SUBSCRIBE request.

{
   "type" : "SUBSCRIBE",
   "content" : "{
       "sqls" : [
           String,
           String
        ]
   }",
   "correlationId" : Long,
   "authToken" : String
}
Field          Type             Description
content        String           Contains a JSON with one field, sqls.
sqls           Array of String  An array of LSQL statements. The format is a SQL-like
                                syntax that allows you to use functions, filters and
                                field selection. See the template below.
correlationId  Long             A unique number the back end will send back as part of
                                the response.
authToken      String           The authorization token. The back end will check whether
                                the user's roles allow this action.
SELECT *
FROM $TOPIC
WHERE  _ktype='INT/LONG/JSON/STRING/AVRO'
       AND _vtype='INT/LONG/JSON/STRING/AVRO'
       [AND ...]

You can provide more than one LSQL statement if you want to subscribe to more than one topic. Please visit the Lenses SQL Engine section for full details on what it supports. The response from the back end is either SUCCESS or ERROR.
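The sketch below composes one LSQL statement per topic from the template above and wraps the statements in a SUBSCRIBE frame. buildSubscriptionSql and buildSubscribeRequest are hypothetical helpers. The decoder names come from the supported-decoder list at the top of this section (the SQL template shows only five of them, so using BINARY in _ktype/_vtype is an assumption), and extraFilter is appended verbatim, so it must already be valid LSQL.

```javascript
// Decoder names from the supported-decoder list (BINARY in SQL is an assumption).
const DECODERS = ['STRING', 'INT', 'LONG', 'AVRO', 'JSON', 'BINARY'];

// Hypothetical helper: one LSQL statement per topic, following the template above.
function buildSubscriptionSql(topic, keyType, valueType, extraFilter) {
  for (const t of [keyType, valueType]) {
    if (!DECODERS.includes(t)) {
      throw new Error(`Unsupported decoder type: ${t}`);
    }
  }
  let sql = `SELECT * FROM ${topic} WHERE _ktype='${keyType}' AND _vtype='${valueType}'`;
  if (extraFilter) {
    sql += ` AND ${extraFilter}`; // appended as-is; must be valid LSQL
  }
  return sql;
}

// Hypothetical helper: wraps the statements into a SUBSCRIBE frame.
function buildSubscribeRequest(sqls, correlationId, authToken) {
  return JSON.stringify({
    type: 'SUBSCRIBE',
    content: JSON.stringify({ sqls }), // a string containing JSON
    correlationId,
    authToken,
  });
}
```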

Once the subscription has succeeded, messages arriving in the Kafka topic(s) and matching the filter will be delivered. A message received by the client has this structure:

{
  "content": [
    {
       "key" : "...",
       "value" : "{...}",
       "topic" : "topicA",
       "partition" : Int,
       "offset" : Long,
       "timestamp" : Long
    },
    ..
  ],
  "correlationId": Long,
  "type" : "KAFKAMSG"
}
Field              Type    Description
content            String  Contains the delivered Kafka messages, each with six fields: key,
                           value, topic, partition, offset and timestamp.
content.key        String  Contains the Kafka message key. If the key is null, the field
                           will not be present.
content.value      String  Contains the Kafka message value. If the value is null, the
                           field will not be present.
content.topic      String  Contains the Kafka message topic name.
content.partition  Int     Contains the Kafka message partition number.
content.offset     Long    Contains the Kafka message offset.
content.timestamp  Long    Contains the Kafka message timestamp.
correlationId      Long    A unique number the back end will send back as part of the
                           response.

Note

The timestamp field requires Kafka 0.10.2+ and either the correct broker settings or a client that publishes the timestamp.
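To prepare for offset commits, a client can keep the highest offset seen per (topic, partition). The table above types content as a String while the template shows an array, so the hypothetical extractRecords helper below accepts both forms; trackOffsets is likewise hypothetical.

```javascript
// Hypothetical helper: returns the records of a KAFKAMSG frame, whether
// `content` arrives as an array or as a string containing a JSON array.
function extractRecords(frame) {
  const content = typeof frame.content === 'string' ? JSON.parse(frame.content) : frame.content;
  return Array.isArray(content) ? content : [content];
}

// Hypothetical helper: keeps the highest offset seen per (topic, partition).
// Kafka topic names cannot contain ':', so "topic:partition" is a safe map key.
function trackOffsets(records, offsets = new Map()) {
  for (const { topic, partition, offset } of records) {
    const key = `${topic}:${partition}`;
    const current = offsets.get(key);
    if (current === undefined || offset > current.offset) {
      offsets.set(key, { topic, partition, offset });
    }
  }
  return offsets;
}
```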

8.5. Un-subscribe

A client can choose at any point to stop receiving messages from one or more topics. To do so it has to send the following message:

{
   "type" :"UNSUBSCRIBE",
   "content": "{
      "topics": [
         String,
         ...
      ]
   }",
   "correlationId": Long,
   "authToken" : String
}
Field          Type    Description
content        String  Contains a JSON with one field, topics: an array of strings
                       naming the topics to unsubscribe from.
correlationId  Long    A unique number the back end will send back as part of the
                       response.
authToken      String  The authorization token. The back end will check whether the
                       user's roles allow this action.

Although a subscription can specify via LSQL which partitions to subscribe to, unsubscribing does not support dropping individual partitions from the subscription.

Note

Executing a subscribe call with a new LSQL for a topic that is already subscribed to will unsubscribe first and then subscribe again.
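Following the table's description of topics as an array of strings, an UNSUBSCRIBE frame can be sketched as below. buildUnsubscribeRequest is hypothetical; removing duplicate topic names is an extra precaution, not a documented requirement.

```javascript
// Hypothetical helper: builds an UNSUBSCRIBE frame for whole topics
// (individual partitions cannot be dropped from a subscription).
function buildUnsubscribeRequest(topics, correlationId, authToken) {
  const unique = [...new Set(topics)]; // de-duplicate, preserving order
  return JSON.stringify({
    type: 'UNSUBSCRIBE',
    content: JSON.stringify({ topics: unique }), // a string containing JSON
    correlationId,
    authToken,
  });
}
```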

8.6. Offsets Commit

The JavaScript client can decide when to commit the offsets in Kafka. This way, when the client reopens a connection and resubscribes to the same Kafka topic, it will receive the Kafka messages from where it left off.

To commit offsets the client has to send the following message structure:

{
   "type" :"COMMIT",
   "content": "{
      "commits": [
         {
            "topic": String,
            "partition": Int,
            "offset" : Long
         },
         ...
      ]
   }",
   "correlationId": Long,
   "authToken" : String
}
Field                      Type    Description
content                    String  Contains a JSON with one field, commits: an array of
                                   elements with three fields, topic, partition and offset.
content.commits.topic      String  The Kafka topic to commit the offsets for.
content.commits.partition  Int     The Kafka topic partition to commit the offsets for.
content.commits.offset     Long    The offset number to retain.
correlationId              Long    A unique number the back end will send back as part of
                                   the response.
authToken                  String  The authorization token. The back end will check whether
                                   the user's roles allow this action.

Since the commits field is an array, more than one (topic, partition, offset) tuple can be provided at once.

Note

The content field value is a string containing JSON, not a nested JSON object!
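A sketch of a COMMIT frame built from a list of (topic, partition, offset) entries. buildCommitRequest is hypothetical. This section does not say whether offset should be the last processed offset or the next one to read (the Kafka consumer convention), so the sketch sends each value unchanged; check the back end's behaviour before relying on either.

```javascript
// Hypothetical helper: builds a COMMIT frame. Offsets are sent exactly as given.
function buildCommitRequest(commits, correlationId, authToken) {
  for (const c of commits) {
    if (typeof c.topic !== 'string' || !Number.isInteger(c.partition) || !Number.isInteger(c.offset)) {
      throw new Error(`Invalid commit entry: ${JSON.stringify(c)}`);
    }
  }
  // Keep only the three documented fields of each entry.
  const content = {
    commits: commits.map(({ topic, partition, offset }) => ({ topic, partition, offset })),
  };
  return JSON.stringify({
    type: 'COMMIT',
    content: JSON.stringify(content), // a string containing JSON
    correlationId,
    authToken,
  });
}
```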

8.7. Heartbeat

The back end keeps the connection open even when no data is flowing between the client and the back end. As a result, the client must be able to handle heartbeat messages with the following structure:

{
  "type" : "HEARTBEAT"
}

When such messages are received the client can discard them.
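A raw WebSocket client can route incoming frames by type and drop heartbeats, as in this sketch. createMessageRouter and its two callbacks are hypothetical names; anything other than HEARTBEAT and KAFKAMSG is treated as a reply to a request.

```javascript
// Hypothetical message router for a raw WebSocket connection.
// HEARTBEAT frames only keep the connection alive, so they are dropped.
function createMessageRouter({ onKafkaMessage, onReply }) {
  return function onMessage(event) {
    const frame = JSON.parse(event.data);
    switch (frame.type) {
      case 'HEARTBEAT':
        return; // discard
      case 'KAFKAMSG':
        onKafkaMessage(frame);
        return;
      default:
        // SUCCESS, ERROR, INVALIDREQUEST: replies matched via correlationId
        onReply(frame);
    }
  };
}
```

In a browser it would be attached with socket.addEventListener('message', createMessageRouter({ onKafkaMessage, onReply })).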

8.7.1. RoadMap

To be added in the near future:

  • Finalize Angular implementation
  • Support message batching
  • Handle unexpected disconnects and reconnection attempts
  • Add timeout and delay options
  • Look into Rx.DOM.fromWebSocket for simplifying code