REST API

Lenses provides a rich set of REST APIs for interacting with Apache Kafka — topics, offsets, and consumers — as well as the micro-services of your data streaming platform. Lenses treats security as a first-class citizen: it provides role-based access and auditing on all APIs and protects sensitive data such as passwords.

Note

We recommend using firewall rules to block direct access to the underlying REST APIs (e.g. Schema Registry, Kafka Connect, Kubernetes) and going through the Lenses APIs instead, to take advantage of their fine-grained security policies.

To run the examples below, we recommend installing the jq tool.

Headers

  • x-kafka-lenses-token:myToken
  • content-type:application/json

Error Codes

All APIs use standard HTTP error codes for any request that results in an error status (4xx or 5xx).

Authentication

All REST APIs are protected via role-based authentication, which is either BASIC or LDAP based, depending on how Lenses security has been set up. To use the APIs, first authenticate with a valid user, receive an access token, and then include that token in every subsequent request.

POST /api/login

Data Params

  • user, string
  • password, string

Error Code

401 UNAUTHORIZED

Example Request

# login and receive the access token
HOST="http://localhost:9991"
TOKEN=$(curl -X POST -H "Content-Type:application/json" -d '{"user":"admin", "password":"XXXXX"}' "${HOST}/api/login" --compress -s | jq -r '.token')
echo $TOKEN

Example Response

{
    "success": true,
    "token": "a1f44cb8-0f37-4b96-828c-57bbd8d4934b",
    "user": {
        "id": "admin",
        "name": "Admin User",
        "email": null,
        "roles": ["admin", "read", "write", "nodata"]
    },
    "schemaRegistryDelete": true
}

Note

Once an access token has been retrieved, it must be included in every subsequent API call via the header -H "X-Kafka-Lenses-Token:${TOKEN}"
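
For example, reusing the HOST and TOKEN variables from the login example to fetch the metadata of a topic (this endpoint is described in the Topic API section below):

# call a protected endpoint with the access token
curl -X GET \
  -H "X-Kafka-Lenses-Token:${TOKEN}" \
  "${HOST}/api/topics/topicA" --compress -s | jq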

Data API

The Data REST API allows you to fetch data from topics by sending LSQL queries. You can also subscribe to a Kafka topic's live stream and produce messages to it via the WebSocket APIs.

LSQL URL Encoding

LSQL statements must be URL-encoded before the request is sent. You can follow the standard encoding reference here: https://www.w3schools.com/tags/ref_urlencode.asp. Here is an example:

Assume the query below:

SELECT * FROM `topicA`
WHERE _vtype='AVRO'
AND _ktype='AVRO'
LIMIT 1000

The encoded version will look like this:

SELECT+*+FROM+%60topicA%60%0AWHERE+_vtype%3D%27AVRO%27%0AAND+_ktype%3D%27AVRO%27%0ALIMIT+1000
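
If you want to encode a statement programmatically, here is a minimal sketch from the shell using jq's @uri filter (note that jq percent-encodes spaces as %20 rather than +; both forms are commonly accepted in query strings):

# URL-encode an LSQL statement using jq's @uri filter
LSQL="SELECT * FROM \`topicA\` WHERE _vtype='AVRO' AND _ktype='AVRO' LIMIT 1000"
ENCODED=$(printf '%s' "$LSQL" | jq -sRr @uri)
echo "$ENCODED"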

LSQL Validation

GET /api/sql/validation

URL Params

  • sql, string (URL-encoded LSQL query), Required

Example Request

# validate an LSQL statement, using the access token from /api/login
HOST="http://localhost:9991"
curl -X GET \
  -H "X-Kafka-Lenses-Token:${TOKEN}" \
  -H "Content-Type:application/json" \
  "${HOST}/api/sql/validation?sql=SELECT+*+FROM+%60topicA%60%0AWHERE+_vtype%3D%27AVRO%27%0AAND+_ktype%3D%27AVRO%27%0ALIMIT+1000" \
  --compress -s | jq

Example Response

{
    "isValid": true,
    "line": 0,
    "column": 0,
    "message": null
}

Example Error

{
    "isValid": false,
    "line": 4,
    "column": 1,
    "message": "Invalid syntax.Encountered \"LIIT\" at line 4, column 1.\nWas expecting one of:\n    <EOF> ... "
}

LSQL Get Data

GET /api/sql/data

URL Params

  • sql, string (URL-encoded LSQL query), Required

Example Request

# run an LSQL query, using the access token from /api/login
HOST="http://localhost:9991"
curl -X GET \
  -H "X-Kafka-Lenses-Token:${TOKEN}" \
  -H "Content-Type:application/json" \
  "${HOST}/api/sql/data?sql=SELECT+*+FROM+%60topicA%60%0AWHERE+_vtype%3D%27AVRO%27%0AAND+_ktype%3D%27AVRO%27%0ALIMIT+1000" \
  --compress -s | jq

Example Response

{
    "messages": [
        {
            "timestamp": 1510605052735,
            "partition": 0,
            "key": "my-key",
            "offset": 0,
            "topic": "myTopic",
            "value": "my-value"
        },
        {
            ...
        }
    ],
    "offsets": [
        {
            "partition": 0,
            "min": 0,
            "max": 10000000
        },
        {
            ...
        }
    ]
}

Example Error

{
    "isValid": false,
    "line": 4,
    "column": 1,
    "message": "Invalid syntax.Encountered \"LIIT\" at line 4, column 1.\nWas expecting one of:\n    <EOF> ... "
}

Topic API

Create Topic

POST /api/topics

Data Params

  • topicName, string, Required
  • replication, int
  • partitions, int
  • configs, map of topic configuration key-values

Example Request

{
    "topicName": "topicA",
    "replication": 1,
    "partitions": 1,
    "configs": {
        "cleanup.policy": "compact",
        "compression.type": "snappy"
    }
}
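
A minimal sketch of submitting this payload with curl, reusing the HOST and TOKEN variables from the login example:

# create a new topic
curl -X POST \
  -H "X-Kafka-Lenses-Token:${TOKEN}" \
  -H "Content-Type:application/json" \
  -d '{"topicName":"topicA","replication":1,"partitions":1,"configs":{"cleanup.policy":"compact","compression.type":"snappy"}}' \
  "${HOST}/api/topics" --compress -s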

Delete Topic

DELETE /api/topics/(string: topicName)

Route Params

  • topicName, string
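
For example, deleting topicA (reusing HOST and TOKEN):

# delete a topic
curl -X DELETE \
  -H "X-Kafka-Lenses-Token:${TOKEN}" \
  "${HOST}/api/topics/topicA" --compress -s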

Update Topic Configuration

PUT /api/topics/config/(string: topicName)

Route Params

  • topicName, string

Data Params

  • configs, array of topic config key-values

Example Request

PUT /api/topics/config/topicA

{
    "configs": [{
        "key": "cleanup.policy",
        "value": "compact"
    }]
}
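
The same request as a curl sketch:

# update the configuration of topicA
curl -X PUT \
  -H "X-Kafka-Lenses-Token:${TOKEN}" \
  -H "Content-Type:application/json" \
  -d '{"configs":[{"key":"cleanup.policy","value":"compact"}]}' \
  "${HOST}/api/topics/config/topicA" --compress -s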

Get Topic information

GET /api/topics/(string: topicName)

Route Params

  • topicName, string

Example Response

{
    "topicName": "topicA",
    "keyType": "AVRO",
    "valueType": "AVRO",
    "partitions": 1,
    "replication": 1,
    "isControlTopic": false,
    "keySchema": null
    "valueSchema": null,
    "messagesPerSecond": 0,
    "totalMessages": 1737056563,
    "timestamp": 1515415557251,
    "isMarkedForDeletion": false,
    "config": [{
        "configuration": "cleanup.policy",
        "value": "compact",
        "defaultValue": "delete",
        "documentation": "A string that is either \"delete\" or \"compact\". This string designates the retention policy to use on old log segments. The default policy (\"delete\") will discard old segments when their retention time or size limit has been reached. The \"compact\" setting will enable log compaction on the topic."
    }],
    "consumers": [],
    "messagesPerPartition": [{
        "partition": 0,
        "messages": 1737056563,
        "begin": 0,
        "end": 1737056563
    }]
}

Processor API

Before using the Processor API, make sure you know in which mode your Lenses instance has been configured to execute processors (see Lenses Configuration).

Create processor

POST /api/streams

Data Params

  • name, string, Required
  • sql, string, Required
  • runners, int
  • clusterName, string, applies to the scale-out modes
  • namespace, string, applies to Kubernetes mode
  • pipeline, string, applies to Kubernetes mode

Example Request

{
     "name": "myProcessor",
     "sql": "SET `autocreate`=true;INSERT INTO `topicB` SELECT * FROM `topicA` WHERE  _ktype='BYTES' AND _vtype='AVRO'",
     "runners": 1,
     "clusterName": "myCluster",
     "namespace": "ns"
 }
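
A sketch of submitting this payload with curl; the heredoc keeps the embedded quotes and backticks readable (clusterName and namespace only apply to the scale-out modes):

# create a new SQL processor
cat > processor.json <<'EOF'
{
    "name": "myProcessor",
    "sql": "SET `autocreate`=true;INSERT INTO `topicB` SELECT * FROM `topicA` WHERE _ktype='BYTES' AND _vtype='AVRO'",
    "runners": 1,
    "clusterName": "myCluster",
    "namespace": "ns"
}
EOF
curl -X POST \
  -H "X-Kafka-Lenses-Token:${TOKEN}" \
  -H "Content-Type:application/json" \
  -d @processor.json \
  "${HOST}/api/streams" --compress -s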

Pause Processor

PUT /api/streams/(string: processorName)/pause

Route Params

  • processorName, string, Required

Resume Processor

PUT /api/streams/(string: processorName)/resume

Route Params

  • processorName, string, Required

Update Processor Runners

PUT /api/streams/(string: processorName)/scale/(int: numberOfRunners)

Route Params

  • processorName, string, Required
  • numberOfRunners, int, Required
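
For example, scaling the processor created above to 2 runners:

# scale a processor to 2 runners
curl -X PUT \
  -H "X-Kafka-Lenses-Token:${TOKEN}" \
  "${HOST}/api/streams/myProcessor/scale/2" --compress -s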

Delete Processor

DELETE /api/streams/(string: processorName)

Route Params

  • processorName, string, Required

Connector API

Kafka Connect APIs are proxied through Lenses. If multiple Connect clusters are managed by Lenses, you will need to include the alias of the target cluster, as defined in your Lenses Configuration.

# List active connectors
GET /api/proxy-connect/(string: clusterAlias)/connectors

# Create new connector
POST /api/proxy-connect/(string: clusterAlias)/connectors [CONNECTOR_CONFIG]

# Get information about a specific connector
GET /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)

# Get connector config
GET /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/config

# Set connector config
PUT /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/config

# Get connector status
GET /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/status

# Pause a connector
PUT /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/pause

# Resume a paused connector
PUT /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/resume

# Restart a connector
POST /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/restart

# Get list of connector tasks
GET /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/tasks

# Get current status of a task
GET /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/tasks/(string: task_id)/status

# Restart a connector task
POST /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/tasks/(string: task_id)/restart

# Remove a running connector
DELETE /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)

# List available connector plugins
GET /api/proxy-connect/(string: clusterAlias)/connector-plugins
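
As a sketch of driving the proxy from the shell, the calls below list the connectors of a cluster aliased dev and check the status of one of them (the alias and connector name are illustrative):

# list the active connectors of the Connect cluster aliased "dev"
curl -X GET \
  -H "X-Kafka-Lenses-Token:${TOKEN}" \
  "${HOST}/api/proxy-connect/dev/connectors" --compress -s | jq

# check the status of one connector
curl -X GET \
  -H "X-Kafka-Lenses-Token:${TOKEN}" \
  "${HOST}/api/proxy-connect/dev/connectors/my-connector/status" --compress -s | jq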

Schemas API

The Schema Registry proxy APIs interact directly with the Schema Registry services; they iterate over the available hosts and, in case of failure, retry with the next available one.

# List all available subjects
GET /api/proxy-sr/subjects

# List all versions of a particular subject
GET /api/proxy-sr/subjects/(string: subject)/versions

# Delete a subject and associated compatibility level
DELETE /api/proxy-sr/subjects/(string: subject)

# Get a schema by its globally unique id
GET /api/proxy-sr/schemas/ids/(int: id)

# Get the schema at a particular version
GET /api/proxy-sr/subjects/(string: subject)/versions/(versionId: version)

# Register a new schema under a particular subject
POST /api/proxy-sr/subjects/(string: subject)/versions

# Delete a particular version of a subject
DELETE /api/proxy-sr/subjects/(string: subject)/versions/(versionId: version)

# Update global compatibility level
PUT /api/proxy-sr/config

# Get global compatibility level
GET /api/proxy-sr/config

# Change compatibility level of a subject
PUT /api/proxy-sr/config/(string: subject)

# Get compatibility level of a subject
GET /api/proxy-sr/config/(string: subject)
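
For example, listing the available subjects and reading the global compatibility level (reusing HOST and TOKEN):

# list all subjects known to the Schema Registry
curl -X GET \
  -H "X-Kafka-Lenses-Token:${TOKEN}" \
  "${HOST}/api/proxy-sr/subjects" --compress -s | jq

# get the global compatibility level
curl -X GET \
  -H "X-Kafka-Lenses-Token:${TOKEN}" \
  "${HOST}/api/proxy-sr/config" --compress -s | jq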

Tip

You can still use the REST endpoints of Schema Registry and Kafka Connect directly. Lenses simply proxies the requests to the correct endpoint, while also capturing the relevant auditing information!