Configure SQL Processors

The Lenses SQL Engine allows users to browse topics or even build and execute Kafka Streams flows with a SQL-like syntax. There are currently three execution modes available: IN_PROC, CONNECT and KUBERNETES. The last two are available only for Enterprise and offer configurable and reliable scale-out of Kafka Streams apps built via Lenses SQL.

To configure the execution mode, set the lenses.sql.execution.mode option.

In Process

IN_PROC is the default execution mode. To use it, set lenses.sql.execution.mode to IN_PROC:

# Set up Lenses SQL processing engine
lenses.sql.execution.mode = "IN_PROC" // "CONNECT" // "KUBERNETES"
lenses.sql.state.dir = "logs/lenses-sql-kstream-state"

Kafka Connect

To configure Lenses for CONNECT execution mode:

  1. Edit the lenses.conf file and set the SQL execution mode to CONNECT
  2. Add one or more connect-distributed endpoints for each of your Lenses SQL enabled clusters in the lenses.connect.clusters configuration option.

The resulting lenses.conf should look like this:

lenses.connect.clusters = [{name: "sql-cluster", url: "http://localhost:8083", statuses: "connect-statuses", config: "connect-configs", offsets: "connect-offsets" }]
....
# Set up Lenses SQL processing engine
lenses.sql.execution.mode = "CONNECT"
lenses.sql.state.dir = "logs/lenses-sql-kstream-state"

This configuration tells Lenses that the processor execution mode is CONNECT and where to find the Lenses SQL enabled connect clusters.
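Since lenses.connect.clusters is a list, multiple SQL-enabled Connect clusters can be registered at once. A sketch with illustrative cluster names, URLs and backing-topic names:

```
lenses.connect.clusters = [
  {name: "sql-cluster-a", url: "http://connect-a:8083", statuses: "connect-statuses-a", config: "connect-configs-a", offsets: "connect-offsets-a"},
  {name: "sql-cluster-b", url: "http://connect-b:8083", statuses: "connect-statuses-b", config: "connect-configs-b", offsets: "connect-offsets-b"}
]
```

Each entry must point at the connect-distributed REST endpoint of that cluster and name its status, config and offset backing topics.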

Warning

When scaling out with CONNECT, the lenses.sql.state.dir must be created on all workers in any SQL-enabled Connect cluster! This maps to the connect.sql.state.store.dir connector option when used with Lenses.
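For example, the directory can be created ahead of time on each worker. The path below is the default used throughout this page; adjust it if you changed lenses.sql.state.dir:

```shell
# Run on every worker in the SQL-enabled Connect cluster.
# The path must match lenses.sql.state.dir (and the
# connect.sql.state.store.dir connector option).
STATE_DIR="logs/lenses-sql-kstream-state"
mkdir -p "$STATE_DIR"
# verify it is present and writable by the Connect worker user
test -d "$STATE_DIR" && test -w "$STATE_DIR" && echo "state dir ready"
```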

Connector Install

The connector needs to be available to each worker in the Kafka Connect cluster intended for SQL. The best way to achieve this is via the isolated classpath loader introduced into Connect in Kafka version 0.11.

  1. Create a folder called plugins/lib and place the Lenses SQL Connector jar inside
  2. Set the plugin.path in the worker properties file to the location of the jar
  3. Restart the Connect worker.
#  create folder
mkdir -p plugins/lib

# copy in the jar
cp lenses-sql-runners-x.x.x-all.jar plugins/lib

# append the plugin.path to the worker properties file, ensure this is the only plugin.path entry
echo "plugin.path=$PWD/plugins/lib" >> config/connect-distributed.properties

# restart the workers
bin/connect-distributed.sh config/connect-distributed.properties

If you are using Kafka version 0.10.x, the plugin.path classloader isolation is not available; instead, place the connector first on the classpath:

export CLASSPATH=lenses-sql-runners-x.x.x-all.jar

Lenses will scan the Connect clusters specified in the lenses.connect.clusters option for the Lenses SQL connector class and make them available for selection when submitting processors. You can check that the SQL runner is correctly picked up with the Connect CLI:

~|⇒ connect-cli plugins
Class name: com.landoop.connect.SQL, Type: source, Version: X.X.X
Class name: org.apache.kafka.connect.file.FileStreamSinkConnector, Type: sink, Version: 0.11.0.0-cp1
Class name: org.apache.kafka.connect.file.FileStreamSourceConnector, Type: source, Version: 0.11.0.0-cp1
~|
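If the Connect CLI is not available, the same plugin list can be fetched from the Connect REST API's /connector-plugins endpoint. The snippet below greps a canned sample response for the runner class; against a live worker you would populate the variable with `curl -s http://localhost:8083/connector-plugins` instead:

```shell
# Canned sample of a GET /connector-plugins response (version illustrative).
# Live cluster: sample="$(curl -s http://localhost:8083/connector-plugins)"
sample='[{"class":"com.landoop.connect.SQL","type":"source","version":"1.0.0"}]'
echo "$sample" | grep -q 'com.landoop.connect.SQL' && echo "SQL runner installed"
```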

Kubernetes

To enable scale-out processing via Kubernetes, the lenses.sql.execution.mode needs to be set to KUBERNETES. Additionally, Lenses requires access to a kubectl config file, and Kubernetes requires access to Landoop's Container Registry.

# kubernetes configuration
lenses.kubernetes.image.name = ""
lenses.kubernetes.image.tag = ""
lenses.kubernetes.config.file = "/home/lenses/.kube/config"
lenses.kubernetes.service.account = "default"

The Docker images for the Lenses SQL Runners are hosted in Landoop's container registry. Kubernetes requires an image pull secret to be set up for each namespace you wish to deploy the Lenses SQL Runners to.

Enterprise customers will be provided with credentials to access the registry. For each namespace you wish to deploy to, the script bin/configure-image-secret can be run to set up the image pull secret:

./configure-image-secret landoop lenses-sql gce-credentials.json username@example.com https://eu.gcr.io default

The options for the script are, in ordinal position:

argument         Description
context          Kubectl context to use
namespace        Namespace to create the secret in
json_key_path    The path to the GCE service account user credential file
email            The email to use, required for creating a docker-registry secret in Kubernetes
gcr_registry     The Google container registry URL
service_account  The Kubernetes service account to patch. This is optional; if not set, the 'default' service account in the namespace is patched

If you are not using the default service account, you need to set the correct service account in the lenses.kubernetes.service.account option in lenses.conf. This tells Lenses to deploy the pods using this service account.
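For example, if the image pull secret was patched onto a dedicated service account (the name below is illustrative, not a default):

```
lenses.kubernetes.service.account = "lenses-sql"
```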

Kubernetes Services

The Lenses SQL runners require access to the Kafka brokers, ZooKeeper and optionally the Schema Registry. If you are running Lenses inside Kubernetes, it is recommended to use Kubernetes services for these endpoints to abstract the configuration, avoiding updates if the Kafka cluster topology changes. Lenses currently does not support creating these services.

If you are using Kubernetes services, the names should match those set in the lenses.conf file. For example, assume a service and endpoint have been created in the default namespace called kafka as follows:

        # Service
        kind: Service
        apiVersion: v1
        metadata:
          name: kafka
        spec:
          ports:
          - port: 9092
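Because the service above has no pod selector, the matching endpoint object must be created manually to point at the brokers. A sketch, with an illustrative broker IP:

```yaml
# Endpoints (the name must match the service; the IP is illustrative)
kind: Endpoints
apiVersion: v1
metadata:
  name: kafka
subsets:
  - addresses:
      - ip: 10.0.0.11
    ports:
      - port: 9092
```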

The lenses.conf file should then specify kafka for the Kafka brokers. The same applies to the other services, i.e. ZooKeeper and optionally the Schema Registry.

        lenses.kafka.brokers= "PLAINTEXT://kafka:9092"
        lenses.zookeeper.hosts = "zookeeper:2181"
        lenses.schema.registry.urls = "http://schema-registry:8081"

Runner Helm Chart

Helm is a package manager for Kubernetes that lets you declare, in configuration, which image you want, the container specs, the application environment, and the Kubernetes labels and annotations that enable monitoring.

For a current list of our existing Helm Charts please visit our repo. The Lenses Connector SQL processor chart, available for Enterprise users, is packaged in the SQL runner connector release.

To deploy the SQL runner Helm Chart, edit the values.yaml accordingly or set them at the command line.

# Add the repo containing our connector charts
helm repo add landoop https://landoop.github.io/kafka-helm-charts/

# Install with values.yaml in dry run mode
helm install charts/lenses-sql-processor-runner --name my-stream --namespace lenses --dry-run --debug

# Install
helm install charts/lenses-sql-processor-runner --name my-stream --namespace lenses

# Install and override with different values from a file
helm install -f myvalues.yaml charts/lenses-sql-processor-runner --name my-stream --namespace lenses

# Install and override with different values from command line
helm install charts/lenses-sql-processor-runner --name my-stream --namespace lenses --set sql.app.id=landoop
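Based on the --set example above, a minimal myvalues.yaml override might look like the following; only sql.app.id is confirmed by this page, so check the chart's own values.yaml for the full key set:

```yaml
# myvalues.yaml -- a sketch; verify keys against the chart's values.yaml
sql:
  app:
    id: landoop
```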

Warning

Lenses will pick up and track deployments created via Helm; however, if you modify or delete them via Lenses, Helm is not aware of these changes. Future releases of Lenses will address this.

Important

The connector and Kubernetes artifacts are delivered after an initial commercial agreement.

SQL Runner Config

When not deployed via Lenses, the Connect or Kubernetes processor requires a minimal set of configuration options; these are handled for you when submitting requests via Lenses.

Key                      Description                                                        Type     Importance
sql.bootstrap.servers    Kafka brokers to bootstrap the clients                             string   high
sql.schema.registry.url  The URL of the Schema Registry including the protocol, i.e. http   string   high
sql.state.store.dir      Location for the KStreams RocksDB directory                        string   high
sql                      Lenses SQL query to execute in the KStream                         string   high
sql.app.id               The Kafka consumer group                                           string   medium
sql.metrics.topic        The topic to write connector metrics to                            string   medium
sql.metric.frequency     Frequency in msec to send state and metrics to the metric topic    long     medium
sql.enable.metrics       Enable state and metrics reporting to the Lenses metrics topic     boolean  medium
sql.status.topic         Status backing topic of the Connect cluster, used to detect whether the processor has been paused; the Connect framework does not expose this at runtime   string   high
sql.consumer.extra       Consumer-specific connection settings as JSON, mainly for SSL/Kerberized clusters   string   medium
sql.producer.extra       Producer-specific connection settings as JSON, mainly for SSL/Kerberized clusters   string   medium

The following default values are used if not provided:

Key                      Default value
sql.bootstrap.servers    localhost:9092
sql.schema.registry.url  http://localhost:8081
sql.state.store.dir      logs/lenses-kafka-streams-state
sql.lenses.id            lenses.connect.id.${UUID.randomUUID()}
sql.metrics.topic        _kafka_lenses_metrics
sql.metric.frequency     5000
sql.enable.metrics       true
sql.status.topic         connect-statuses
sql.consumer.extra
sql.producer.extra
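Putting the tables together, a connector configuration for submitting the runner straight to the Connect REST API might look like the following sketch. The connector class comes from the plugin listing earlier; the name, topics and query are illustrative, and `tasks.max` is the standard Connect scaling option:

```json
{
  "name": "my-sql-processor",
  "config": {
    "connector.class": "com.landoop.connect.SQL",
    "tasks.max": "1",
    "sql.bootstrap.servers": "PLAINTEXT://kafka:9092",
    "sql.schema.registry.url": "http://schema-registry:8081",
    "sql.state.store.dir": "logs/lenses-sql-kstream-state",
    "sql.app.id": "my-processor",
    "sql": "SET autocreate=true; INSERT INTO `target-topic` SELECT * FROM `source-topic` WHERE _ktype = AVRO AND _vtype = AVRO"
  }
}
```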

Kubernetes SQL Deployment

Below is an example Kubernetes deployment YAML.

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  annotations:
    deployment.kubernetes.io/revision: "1"
  labels:
    app: sql-processor-outside
    lenses: lenses-processor
    lenses-user: ""
  name: sql-processor-outside
  namespace: processor-test
spec:
  replicas: 5
  selector:
    matchLabels:
      app: sql-processor-outside
      containerPort: "8083"
      lenses: lenses-processor
      pipeline: lsql
  strategy:
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 1
    type: RollingUpdate
  template:
    metadata:
      annotations:
        prometheus.io/path: /metrics
        prometheus.io/port: "9101"
        prometheus.io/scrape: "true"
      labels:
        app: sql-processor-outside
        containerPort: "8083"
        lenses: lenses-processor
        pipeline: lsql
    spec:
      containers:
      - name: lenses-processor
        image: eu.gcr.io/lenses-container-registry/lenses-sql-processor:latest
        imagePullPolicy: IfNotPresent
        lifecycle:
          preStop:
            exec:
              command:
              - /etc/landoop/processor-stop
        livenessProbe:
          exec:
            command:
            - /etc/landoop/liveliness
          failureThreshold: 3
          initialDelaySeconds: 20
          periodSeconds: 5
          successThreshold: 1
          timeoutSeconds: 5
        ports:
        - containerPort: 8083
          protocol: TCP
        resources:
          limits:
            memory: 768Mi
          requests:
            memory: 512Mi
        env:
        - name: ROOT_LOG_LEVEL
          value: INFO
        - name: SQL
          value: "SET autocreate=true;
            SET `auto.offset.reset`='earliest';
            INSERT INTO `demo-kuberenetes`
            SELECT *
            FROM `position_reports`
            WHERE _ktype = AVRO AND _vtype = AVRO"
        - name: SQL_BOOTSTRAP_SERVERS
          value: PLAINTEXT://kafka:9092
        - name: SQL_SCHEMA_REGISTRY_URL
          value: http://schema-registry:8081
        - name: SQL_APP_ID
          value: my-processor
        - name: SQL_STATE_STORE_DIR
          value: logs/lenses-sql-kstream-state
        - name: SQL_METRICS_TOPIC
          value: _kafka_lenses_metrics
        - name: SQL_METRIC_FREQUENCY
          value: "5000"
        - name: SQL_PORT
          value: "8083"
        - name: JAVA_OPTS
          value: -Xms256m -Xmx512m -XX:MaxPermSize=128m -XX:MaxNewSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true