5. Scaling SQL Processors

Lenses leverages Kafka Streams and provides the following execution modes to run Lenses SQL processors:

[Figure: Lenses SQL processor execution modes]

IN_PROC is the default execution mode; processors are executed locally, within the Lenses process itself. This limits scalability, poses a risk to the running application and can affect its stability. IN_PROC is recommended only for testing.

CONNECT is the execution mode that addresses these limitations, providing both availability guarantees and scalability. Lenses can deploy your Lenses SQL processors in Kafka Connect, which provides a distributed, fault-tolerant and scalable framework as part of the core Apache Kafka distribution.

KUBERNETES is an execution mode that provides scalability by deploying Lenses SQL runners into Kubernetes clusters. Lenses can deploy and monitor SQL runner deployments created through Lenses or existing tools such as Helm or kubectl.

5.1. Connect

Kafka Connect provides scalable, fault-tolerant, distributed processing by forming a cluster of workers. The cluster provides endpoints to which Lenses will submit the processor configuration. From this point Kafka Connect will persist configurations and distribute work to the cluster workers. Upon a restart, Lenses will recover the status, configuration and metrics of any Lenses SQL connectors found in the configured clusters; this ensures that processing of data in your topologies continues even if Lenses is offline. Lenses will also identify any connectors created outside of Lenses at runtime and start tracking them to give you visibility.
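
For example, the connectors that Lenses will discover and track can be listed directly from a worker's REST endpoint (a quick check, assuming a Connect worker listening on localhost:8083):

# List all connectors registered in the Connect cluster; Lenses tracks
# these at runtime, including connectors created outside of Lenses
curl http://localhost:8083/connectors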

To scale the number of processor applications in or out, we simply instruct Kafka Connect to decrease or increase the number of tasks across the cluster (an example REST call is sketched after the steps below). The Lenses UI provides a simple way to deploy and scale Lenses SQL processors:

  1. Create a new processor and select the cluster to deploy to
  2. Compose your SQL statement
  3. Set the parallelization, i.e. how many tasks/application instances to run
  4. Give the processor a name
  5. Deploy

Lenses will check the validity of the SQL statement and, if it is valid, create the Connector instance and start monitoring its behaviour.
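
Under the hood, scaling an existing processor amounts to updating the connector's tasks.max through the Connect REST API, roughly as sketched below. The processor name and SQL statement are illustrative, and note that the PUT endpoint expects the full configuration, not just the changed key:

# Scale a processor to 4 tasks; "my-sql-processor" is a hypothetical name
curl -X PUT -H "Content-Type: application/json" \
     --data '{
       "connector.class": "com.landoop.connect.SQL",
       "tasks.max": "4",
       "sql": "INSERT INTO target-topic SELECT * FROM source-topic"
     }' \
     http://localhost:8083/connectors/my-sql-processor/config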

Lenses supports the following Connector functionality:

  • CREATE - Register and create a new connector
  • PAUSE - Pause the connector and tasks
  • START - Start a paused connector
  • DELETE - Remove a connector
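
Each of these operations maps onto a standard Connect REST call. A sketch, assuming a worker at localhost:8083 and a hypothetical processor named my-sql-processor; note that the Connect REST API names the start operation "resume":

# Pause the connector and its tasks
curl -X PUT http://localhost:8083/connectors/my-sql-processor/pause

# Start (resume) a paused connector
curl -X PUT http://localhost:8083/connectors/my-sql-processor/resume

# Remove the connector
curl -X DELETE http://localhost:8083/connectors/my-sql-processor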

Note

Updating an existing connector is not directly supported. The KStream app cannot be updated, and an update would more than likely break the schema compatibility of the target insert table. The connector can, however, be scaled.

5.1.1. Lenses Configuration

To configure Lenses for CONNECT execution mode:

  1. Edit the lenses.conf file and set the SQL execution mode to CONNECT
  2. Add one or more connect-distributed endpoints for each of your Lenses SQL enabled clusters in the lenses.connect.clusters configuration option.

The resulting lenses.conf should look like this:

lenses.connect.clusters = [{name: "sql-cluster", url: "http://localhost:8083", statuses: "connect-statuses", config: "connect-configs", offsets: "connect-offsets" }]
....
# Set up Lenses SQL processing engine
lenses.sql.execution.mode = "CONNECT"
lenses.sql.state.dir = "logs/lenses-sql-kstream-state"
lenses.sql.monitor.frequency = 5000
lenses.sql.connect.connector.class = "com.landoop.connect.SQL"
lenses.sql.sample.default = 2 // Sample 2 messages every 200 msec
lenses.sql.sample.window = 200

This configuration tells Lenses that the processor execution mode is CONNECT and where to find the Lenses SQL enabled Connect clusters. The connector workers send health-check metrics to a metrics topic every few seconds.
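
You can inspect these health-check messages yourself with the console consumer (a sketch, assuming the default metrics topic name _kafka_lenses_metrics listed in section 5.1.3):

# Tail the metrics topic the SQL runners report into
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic _kafka_lenses_metrics --from-beginning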

Lenses will scan the Connect clusters specified in the lenses.connect.clusters option for the Lenses SQL connector class and make them available for selection when submitting processors. You can check that the SQL runner has been correctly picked up with the kafka-connect-cli:

~|⇒ connect-cli plugins
Class name: com.landoop.connect.SQL, Type: source, Version: 0.0.3
Class name: org.apache.kafka.connect.file.FileStreamSinkConnector, Type: sink, Version: 0.11.0.0-cp1
Class name: org.apache.kafka.connect.file.FileStreamSourceConnector, Type: source, Version: 0.11.0.0-cp1
~|
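
If you do not have the CLI installed, the same information is available from any worker's REST endpoint:

# List the plugins known to the Connect worker
curl http://localhost:8083/connector-plugins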

Warning

When scaling out with CONNECT, the lenses.sql.state.dir must be created on all workers in any SQL enabled Connect Cluster! This maps to the connect.sql.state.store.dir connector option when used with Lenses.
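
A quick way to satisfy this on a small cluster, assuming SSH access and hypothetical worker hostnames, is to create the directory on each worker up front:

# Hypothetical worker hostnames; the path must match the configured state dir
STATE_DIR="logs/lenses-sql-kstream-state"
for host in worker-1 worker-2 worker-3; do
  ssh "$host" "mkdir -p $STATE_DIR"
done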

5.1.2. Connector Install

The connector needs to be available to each worker in the Kafka Connect cluster intended for SQL. The best way to achieve this is via the isolated classpath loader introduced into Connect in Kafka version 0.11.

  1. Create a folder called plugins/lib and place the Lenses SQL Connector jar inside
  2. Set the plugin.path in the worker properties file to the location of the jar
  3. Restart the Connect worker.
#  create folder
mkdir -p plugins/lib

# copy in the jar
cp lenses-sql-runners-x.x.x-all.jar plugins/lib

# append the plugin.path to the worker properties file; ensure plugin.path is defined only once
echo "plugin.path=$PWD/plugins/lib" >> config/connect-distributed.properties

# restart the workers
bin/connect-distributed.sh config/connect-distributed.properties

If you are using Kafka 0.10.x, where the plugin.path classloader isolation is not available, place the connector jar first on the classpath:

export CLASSPATH=lenses-sql-runners-x.x.x-all.jar

5.1.3. Connector Configuration

The connector requires a minimal set of configurations, which are handled for you when submitting requests via Lenses.

Key                           Description                                                                  Type     Importance
sql.bootstrap.servers         Kafka brokers to bootstrap the clients                                       string   high
sql.schema.registry.url       The URL of the Schema Registry, including the protocol, i.e. http://        string   high
sql.state.store.dir           Location of the KStreams RocksDB directory                                   string   high
sql                           The Lenses SQL query to execute in the KStream                               string   high
sql.lenses.id                 A Lenses-specific ID to track the connector                                  string   medium
sql.metrics.topic             The topic to write connector metrics to                                      string   medium
sql.metric.return.frequency   Frequency in msec at which state and metrics are sent to the metrics topic  long     medium
sql.enable.metrics            Enable state and metrics reporting to the Lenses metrics topic               boolean  medium
sql.status.topic              The status backing topic of the Connect cluster, used to detect whether the connector has been paused; the Connect framework does not expose this at runtime   string   high

The following default values are used if not provided:

Key                           Default value
sql.bootstrap.servers         localhost:9092
sql.schema.registry.url       http://localhost:8081
sql.state.store.dir           logs/lenses-kafka-streams-state
sql.lenses.id                 lenses.connect.id.${UUID.randomUUID()}
sql.metrics.topic             _kafka_lenses_metrics
sql.metric.return.frequency   5000
sql.enable.metrics            true
sql.status.topic              connect-statuses
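
Lenses constructs and submits this configuration for you, but for reference a manual submission through the Connect REST API would look roughly like the following sketch; the processor name and SQL statement are illustrative:

# Register a new SQL processor directly with the Connect cluster
curl -X POST -H "Content-Type: application/json" \
     --data '{
       "name": "my-sql-processor",
       "config": {
         "connector.class": "com.landoop.connect.SQL",
         "tasks.max": "1",
         "sql.bootstrap.servers": "localhost:9092",
         "sql.schema.registry.url": "http://localhost:8081",
         "sql.state.store.dir": "logs/lenses-kafka-streams-state",
         "sql.status.topic": "connect-statuses",
         "sql": "INSERT INTO target-topic SELECT * FROM source-topic"
       }
     }' \
     http://localhost:8083/connectors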

5.2. Deployment

The connector must be available to each worker in the Kafka Connect cluster intended for SQL. The installation steps are the same as described in section 5.1.2 above.

5.2.1. Kubernetes

Kubernetes, a container orchestration engine, provides the perfect platform for running streaming microservices. It can ensure that a configured number of application instances, or pods, are running, and can scale them up or down accordingly. Landoop already has many customers running either Kafka Streams processors or Kafka Connect connectors in Kubernetes. All our connectors are already available as Docker images, with Helm charts to deploy via CI/CD in a repeatable and audited manner.

In our next release we will extend the execution modes to allow deploying Lenses SQL processors into Kubernetes, but Enterprise customers can already deploy the Lenses SQL Connector into Kubernetes using our Docker images. We recommend using Helm to manage and deploy the connector.

5.3. Helm

Lenses will utilize Helm, a package manager for Kubernetes, which allows you to declare in configuration the image you want, the container specs, the application environment, and the Kubernetes labels and annotations that allow for monitoring.

For a current list of our existing Helm charts please visit our repo. The Lenses SQL processor connector chart, available to Enterprise users, is packaged with the connector release.

To deploy the Helm chart, edit the values.yaml accordingly or set the values at the command line. The values.yaml contains all the options previously described.

# Add the repo for our other connector charts
helm repo add landoop https://datamountaineer.github.io/helm-charts/

# Install with values.yaml in dry run mode
helm install ./helm --dry-run --debug

# Install
helm install ./helm

# Install and override with different values from a file
helm install -f myvalues.yaml ./helm

# Install and override with different values from command line
helm install --set connect.sql.app.id=landoop ./helm
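
The --set flags map onto nested keys in values.yaml, so the same override can be kept in a file instead (a sketch; the clusterName value is illustrative, and the chart's values.yaml documents the full set of keys):

# Write a minimal override file and install with it
cat > myvalues.yaml <<EOF
clusterName: sql-cluster
connect:
  sql:
    app:
      id: landoop
EOF

helm install -f myvalues.yaml ./helm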

5.4. Topics in Kubernetes

When deploying into Kubernetes the Chart is configured to have backing topics per deployment with the following format:

connect_{{ .Values.clusterName }}_statuses
connect_{{ .Values.clusterName }}_offsets
connect_{{ .Values.clusterName }}_configs

Ensure the following:

  • The status topic used for monitoring the connector is also set to connect_{{ .Values.clusterName }}_statuses
  • These topics have an infinite retention period (retention.ms=-1) to avoid losing statuses, configurations or offsets on restart, i.e. after Kafka has removed the entries

Note

The Connect configs backing topic must always be a compacted topic
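
If you create the backing topics up front, a sketch with 0.11-era tooling follows, assuming a clusterName of sql-cluster; newer Kafka versions use --bootstrap-server instead of --zookeeper, and the partition and replication counts here are illustrative:

# Statuses and offsets: infinite retention
bin/kafka-topics.sh --zookeeper localhost:2181 --create \
    --topic connect_sql-cluster_statuses --partitions 5 --replication-factor 3 \
    --config retention.ms=-1

bin/kafka-topics.sh --zookeeper localhost:2181 --create \
    --topic connect_sql-cluster_offsets --partitions 25 --replication-factor 3 \
    --config retention.ms=-1

# Configs: single partition and compacted
bin/kafka-topics.sh --zookeeper localhost:2181 --create \
    --topic connect_sql-cluster_configs --partitions 1 --replication-factor 3 \
    --config retention.ms=-1 --config cleanup.policy=compact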

5.4.1. Cloudera

When Landoop’s FAST DATA CSD is used, the Cloudera parcel lenses-sql-runners can install and provision the connector in a few seconds. More information and step-by-step instructions on how to install the parcel can be found in the FAST DATA docs.