Configure Lenses

Lenses uses two configuration files. The main one, usually named lenses.conf, contains the runtime configuration options, except for the security-related ones, which are stored in a separate file, usually named security.conf. That way sensitive data can be protected by administrators, whilst the rest of the configuration remains accessible to more teams. The full list of options can be found here.

License

In order to run, Lenses requires a license file, which can be obtained from us. Once you have received your license, store it in a file (e.g. license.json) and update the configuration to point to it. Make sure the configuration value contains the full file path.

# License file allowing connecting to up to N brokers
lenses.license.file="license.json"

Third Party Licenses

The license folder contains all the third-party licenses for all the software libraries used by Lenses. A complete list is also available at https://landoop.com/third-party-software

Host and Port

During startup, Lenses binds to the IP and port set in the configuration file. Use the lenses.ip and lenses.port configuration entries to set different values. By default Lenses binds to port 9991.

# Set the ip:port for Lenses to bind to
lenses.ip = 0.0.0.0
lenses.port = 9991

Java Options

The following environment variables control the Java configuration options when starting Lenses:

  • LENSES_HEAP_OPTS - The heap space settings, the default is -Xmx3g -Xms512m
  • LENSES_JMX_OPTS - JMX options to set
  • LENSES_LOG4J_OPTS - Logging options
  • LENSES_PERFORMANCE_OPTS - Any extra options, default is -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true
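For example, to start Lenses with a larger heap and a different GC pause target, export the variables before launching; the values below are illustrative, not recommendations:

```shell
# Illustrative values only; size the heap for your own cluster.
export LENSES_HEAP_OPTS="-Xmx6g -Xms1g"
# Keep the default performance flags but raise the GC pause target:
export LENSES_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=50"
# The startup script picks these up from the environment:
echo "$LENSES_HEAP_OPTS"
```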

Logging

Lenses uses Logback for logging. The logback.xml file is picked up from the installation folder. To point to a different location for the Logback configuration file, export LENSES_LOG4J_OPTS as shown below:

export LENSES_LOG4J_OPTS="-Dlogback.configurationFile=file:mylogback.xml"

Log levels

Logback enables hot reloading of changes to the logback.xml file. The default scan window is 30 seconds, and this can be adjusted via the configuration element:

<configuration scan="true" scanPeriod="30 seconds" >
  ...
</configuration>

The default log level is set to INFO. To change the level of a logger, adjust its configuration entry, e.g. <logger name="akka" level="DEBUG"/>.

The default appenders are:

<logger name="akka" level="INFO"/>
<logger name="org.apache.zookeeper.ClientCnxn" level="ERROR"/>
<logger name="com.typesafe.sslconfig.ssl.DisabledComplainingHostnameVerifier" level="ERROR"/>
<logger name="org.apache.kafka.clients.consumer.ConsumerConfig" level="ERROR"/>
<logger name="org.apache.kafka.common.utils.AppInfoParser" level="WARN"/>
<logger name="org.apache.kafka.clients.consumer.internals.AbstractCoordinator" level="WARN"/>
<logger name="io.confluent.kafka.serializers.KafkaAvroDeserializerConfig" level="WARN"/>
<logger name="org.I0Itec.zkclient" level="WARN"/>
<logger name="org.apache.zookeeper" level="WARN"/>
<logger name="org.apache.calcite" level="OFF"/>

All the log entries are written to the output using the following pattern: %d{ISO8601} %-5p [%c{2}:%L] %m%n.

Log location

All the logs Lenses produces can be found in the logs directory. However, we recommend following the Twelve-Factor App approach to logging and logging to stdout, especially when using a container orchestration engine such as Kubernetes, leaving log collection to agents such as Filebeat, Logstash, Fluentd or Flume.
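When logging to stdout, a minimal Logback console appender using the pattern quoted above might look like the following sketch, to be adapted into your logback.xml:

```xml
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
  <encoder>
    <pattern>%d{ISO8601} %-5p [%c{2}:%L] %m%n</pattern>
  </encoder>
</appender>
<root level="INFO">
  <appender-ref ref="STDOUT"/>
</root>
```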

Security

Options related to Lenses security (i.e. options that start with lenses.security) must be stored in a separate file, usually named security.conf. You can set the path to the security configuration file in the main configuration file (lenses.conf) via the key lenses.secret.file. This way security.conf can be managed by the administration team alone, under tighter access control than the rest of the configuration.

Lenses has support for the following login modes: BASIC and LDAP. The login mode is configured through the lenses.security.mode option. If set to BASIC the users and roles can be set via the lenses.security.users option. For example:

# Security mode defaults to BASIC; alternatively LDAP.
lenses.security.mode = BASIC
lenses.security.users = [
  {"username": "admin", "password": "admin", "displayname": "Lenses Admin", "roles": ["admin", "write", "read"]},
  {"username": "write", "password": "write", "displayname": "Write User", "roles": ["read", "write"]},
  {"username": "read", "password": "read", "displayname": "Read Only", "roles": ["read"]},
  {"username": "nodata", "password": "nodata", "displayname": "No Data", "roles": ["nodata"]}
]

If you want to map LDAP roles to Lenses roles, set the lenses.security.mode value to LDAP and then use the ldap configuration section to provide the settings.

Each company can set up its LDAP differently; therefore, plugin-based functionality is provided in order to allow a custom implementation for retrieving the user role list. The project template for a custom implementation can be found on GitHub. When the code is complete, all that is required is to drop the jar file into the Lenses lib folder and set the configuration entry lenses.security.ldap.plugin.class to the full class path of the implementation.

Lenses provides a default implementation out of the box via the com.landoop.kafka.lenses.security.LdapMemberOfUserGroupPlugin class. Here is the template for the LDAP configuration section:

lenses.security.mode=LDAP
lenses.security.ldap.url="ldaps://mycompany.com:636"
lenses.security.ldap.base="OU=Users,DC=mycompany,DC=com"
lenses.security.ldap.user="$LDAP_USER"
lenses.security.ldap.password="$LDAP_USER_PASSWORD"
lenses.security.ldap.roles.admin=[ "GROUP_ONE" ]
lenses.security.ldap.roles.write=[ "GROUP_TWO" ]
lenses.security.ldap.roles.read=[ "GROUP_THREE" ]
lenses.security.ldap.roles.nodata=[ "GROUP_FOUR" ]
lenses.security.ldap.filter="(&(objectClass=person)(sAMAccountName=<user>))"

//LDAP roles retriever settings
lenses.security.ldap.plugin.class="com.landoop.kafka.lenses.security.LdapMemberOfUserGroupPlugin"
lenses.security.ldap.plugin.group.extract.regex="(?i)CN=(\\w+),ou=ServiceGroups.*"
lenses.security.ldap.plugin.memberof.key="memberOf"
lenses.security.ldap.plugin.person.name.key = "sn"

Note

The properties admin, read, write and nodata should list user roles, not users. If a role has been added to admin, it will inherit write, read and nodata automatically; there is no need to duplicate!

  • url - The LDAP server URL. For example: ldap://mycompany.com:10389. (Required, String)
  • base - Your LDAP base. For example: dc=jboss,dc=org. (Required, String)
  • user - Your LDAP user. For example: uid=admin,ou=system. (Required, String)
  • password - Your LDAP user password. (Required, String)
  • filter - The LDAP search filter; it must produce a unique result. <user> is required, since it is replaced at runtime with the current user id. (Optional, String, default: (&(objectClass=person)(sAMAccountName=<user>)))
  • plugin.class - The full class path of the LDAP roles retriever implementation. (Optional, String)
  • plugin.memberof.key - Your LDAP memberOf key entry; the key by which a role is attached to the user entry. For example, memberOf: cn=AdminR,ou=Groups,dc=jboss,dc=org links the AdminR role to the current user entry. (Optional, String, default: memberOf)
  • plugin.person.name.key - Your LDAP person entry attribute containing the user's full name. (Optional, String, default: sn)
  • plugin.group.extract.regex - The regular expression used to extract the role from each memberOf entry (see above). The default value matches the earlier memberOf example. (Optional, String, default: (?i)CN=(\\w+),ou=Groups.*)
  • roles.admin - The list of LDAP roles for which you want to grant admin rights. (Required, String[])
  • roles.write - The list of LDAP roles for which you want to grant write rights. (Required, String[])
  • roles.read - The list of LDAP roles for which you want to grant read rights. (Required, String[])
  • roles.nodata - The list of LDAP roles for which you want to grant nodata rights. (Required, String[])
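To illustrate how the out-of-the-box plugin applies group.extract.regex, the sed sketch below mimics extracting the role name from a memberOf value. Lenses does this internally in Java; the shell pipeline is purely illustrative.

```shell
# A memberOf attribute value as returned by LDAP:
memberof='CN=AdminR,ou=Groups,dc=jboss,dc=org'
# Mimic (?i)CN=(\w+),ou=Groups.* - capture the role name after CN=:
role=$(printf '%s\n' "$memberof" | sed -E 's/^[Cc][Nn]=([A-Za-z0-9_]+),[Oo][Uu]=Groups.*/\1/')
echo "$role"   # AdminR
```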

Note

The configuration entries lenses.security.ldap.plugin.memberof.key, lenses.security.ldap.plugin.person.name.key and lenses.security.ldap.plugin.group.extract.regex are specific to the implementation Lenses provides out of the box. Any custom implementation may require different entries under lenses.security.ldap.plugin.

Here is a sample configuration for an LDAP-enabled Lenses:

lenses.security.mode=LDAP
lenses.security.ldap.url="ldaps://landoop.ldap.url:636"
lenses.security.ldap.base="DC=landoop,DC=com"
lenses.security.ldap.password=*****
lenses.security.ldap.user="UID=smiths,OU=ServiceAccounts,DC=landoop,DC=com"
lenses.security.ldap.filter="(&(objectclass=person)(CN=<user>))"
lenses.security.ldap.roles.admin=[ "OnlyAdmin" ]
lenses.security.ldap.plugin.class="com.landoop.kafka.lenses.security.LdapMemberOfUserGroupPlugin"
lenses.security.ldap.plugin.memberof.key="memberOf"
lenses.security.ldap.plugin.group.extract.regex="(?i)CN=(\\w+),ou=ServiceGroups.*"
lenses.security.ldap.plugin.person.name.key ="sn"

SSL Authentication

If your Kafka cluster uses TLS certificates for authentication, set the broker protocol to SSL and then pass any keystore and truststore configurations to the consumer, producer and KStream settings by prefixing them with lenses.kafka.settings., such as:

lenses.kafka.settings.consumer.security.protocol=SSL
lenses.kafka.settings.consumer.ssl.truststore.location=/var/private/ssl/client.truststore.jks
lenses.kafka.settings.consumer.ssl.truststore.password=test1234
lenses.kafka.settings.consumer.ssl.keystore.location=/var/private/ssl/client.keystore.jks
lenses.kafka.settings.consumer.ssl.keystore.password=test1234
lenses.kafka.settings.consumer.ssl.key.password=test1234

lenses.kafka.settings.producer.security.protocol=SSL
lenses.kafka.settings.producer.ssl.truststore.location=/var/private/ssl/client.truststore.jks
lenses.kafka.settings.producer.ssl.truststore.password=test1234
lenses.kafka.settings.producer.ssl.keystore.location=/var/private/ssl/client.keystore.jks
lenses.kafka.settings.producer.ssl.keystore.password=test1234
lenses.kafka.settings.producer.ssl.key.password=test1234

lenses.kafka.settings.kstream.security.protocol=SSL
lenses.kafka.settings.kstream.ssl.truststore.location=/var/private/ssl/truststore.jks
lenses.kafka.settings.kstream.ssl.truststore.password=test1234
lenses.kafka.settings.kstream.ssl.keystore.location=/var/private/ssl/client.keystore.jks
lenses.kafka.settings.kstream.ssl.keystore.password=test1234
lenses.kafka.settings.kstream.ssl.key.password=test1234

JAAS

In order for Lenses to access Kafka in an environment set up with Kerberos (SASL), you need to create a standard JAAS file before starting Lenses:

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/full/path/keytab-file"
  storeKey=true
  useTicketCache=false
  serviceName="kafka"
  principal="kafka@MYREALM";
};

Then, add the additional option before starting Lenses:

export LENSES_OPTS="-Djava.security.auth.login.config=/opt/lenses/jaas.conf"

Last, set the security protocol in the Lenses configuration file:

lenses.kafka.settings.consumer.security.protocol=SASL_PLAINTEXT
lenses.kafka.settings.producer.security.protocol=SASL_PLAINTEXT
lenses.kafka.settings.kstream.security.protocol=SASL_PLAINTEXT

System Topics

Lenses requires a number of system topics for monitoring, auditing, cluster, user profile and processor information. These topics are created by Lenses during startup, and their names are configured via the topics configuration block:

# topics created on start-up that Lenses uses to store state
lenses.topics.audits = "_kafka_lenses_audits"
lenses.topics.cluster = "_kafka_lenses_cluster"
lenses.topics.metrics = "_kafka_lenses_metrics"
lenses.topics.profiles = "_kafka_lenses_profiles"
lenses.topics.processors = "_kafka_lenses_processors"

Warning

Do not alter the configuration of these topics; they are managed by Lenses. If you are using ACLs, grant the user running the Lenses application permissions to manage these topics.

If ACLs are already enabled on your Kafka cluster, set the ACLs for the Lenses user and host for each of the following topics: _kafka_lenses_audits, _kafka_lenses_cluster, _kafka_lenses_metrics, _kafka_lenses_profiles and _kafka_lenses_processors.

kafka-acls \
--authorizer-properties zookeeper.connect=my_zk:2181 \
--add \
--allow-principal User:Lenses \
--allow-host lenses-host \
--operation Read \
--operation Write \
--operation Alter \
--operation Delete \
--topic topic
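Since the same permissions are needed on every system topic, the kafka-acls call can be wrapped in a loop. This sketch assumes kafka-acls is on the PATH and only prints the commands; drop the echo prefix to actually run them:

```shell
# Apply the same ACLs to every Lenses system topic; the echo prefix prints
# each command for review - remove it to actually execute.
for topic in _kafka_lenses_audits _kafka_lenses_cluster _kafka_lenses_metrics \
             _kafka_lenses_profiles _kafka_lenses_processors; do
  echo kafka-acls \
    --authorizer-properties zookeeper.connect=my_zk:2181 \
    --add --allow-principal User:Lenses --allow-host lenses-host \
    --operation Read --operation Write --operation Alter --operation Delete \
    --topic "$topic"
done
```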

JMX Monitoring

Enabling JMX

To use the full potential of Lenses for cluster monitoring, JMX should be enabled for the Kafka Brokers, Schema Registry and Zookeeper. To enable JMX, set the JMX_PORT environment variable for each service to the port on which it should listen for JMX connections. Additional options should be set in order to access JMX remotely (from a different host), which is the most common case.

Kafka Brokers

Set JMX_PORT and KAFKA_JMX_OPTS environment variables:

export JMX_PORT=[JMX_PORT]
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.rmi.port=[JMX_PORT]"

Kafka Connect

Set JMX_PORT and KAFKA_JMX_OPTS environment variables:

export JMX_PORT=[JMX_PORT]
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.rmi.port=[JMX_PORT]"

Schema Registry

Set JMX_PORT and SCHEMA_REGISTRY_JMX_OPTS environment variables:

export JMX_PORT=[JMX_PORT]
export SCHEMA_REGISTRY_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.rmi.port=[JMX_PORT]"

Zookeeper

Set JMX_PORT and ZOOKEEPER_SERVER_OPTS environment variables:

export JMX_PORT=[JMX_PORT]
export ZOOKEEPER_SERVER_OPTS="$ZOOKEEPER_SERVER_OPTS -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.rmi.port=[JMX_PORT]"

Configuring Lenses

Lenses integrates and monitors Kafka Brokers, Zookeepers, Schema Registries and Kafka Connect Clusters. To configure which cluster to monitor set the respective endpoints in the lenses.conf file. Use the jmx configuration section to let Lenses know about the JMX endpoints. Below you can find an example of such configuration:

# Set up infrastructure end-points
lenses.kafka.brokers = "" // "PLAINTEXT://localhost:9092"
lenses.zookeeper.hosts = "" // "localhost:2181"
lenses.schema.registry.urls = "" // "http://localhost:8081"
lenses.connect.clusters = [] // [{name: "dev", url: "http://localhost:8083", statuses: "connect-statuses", config: "connect-configs", offsets: "connect-offsets" }]

# Set up monitoring end-points
lenses.jmx.brokers = "" // "localhost:9581"
lenses.jmx.schema.registry = "" // "localhost:9582"
lenses.jmx.zookeepers = "" // "localhost:9585"
lenses.jmx.connect = [] // [ {dev: "localhost:9584"} , .. ]

# Set up integrations with Prometheus and Grafana
lenses.grafana = ""

To configure Kafka Connect correctly you must provide:

  1. A name for the cluster
  2. The URLs and ports of the workers in the cluster, comma separated
  3. The Kafka Connect backing topics for status, configs and offsets

Note

The Kafka Brokers JMX endpoints are picked up automatically from Zookeeper.

Expose Lenses JMX

Lenses also exposes its own JMX, so other systems can monitor it. To enable JMX in Lenses, set the jmx.port option; to disable it, comment out the entry. The Prometheus JMX exporter can also be used, which makes Lenses metrics available to Prometheus. A JMX exporter config file, jmx_config.yaml, is provided in etc. The JMX exporter from Prometheus runs as a javaagent and can be set via the LENSES_OPTS environment variable:

export LENSES_OPTS="-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent-0.7.jar=9102:/etc/jmx_exporter/config.yaml"

For monitoring from remote hosts, JMX remote access should be enabled as well.

export LENSES_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.rmi.port=[JMX_PORT]"

Alert Integration

To integrate Lenses alerting with Slack, add an Incoming WebHook integration in your Slack team. Select the #channel where Lenses will post alerts and copy the Webhook URL:

lenses.alert.plugins.slack.enabled = true
lenses.alert.plugins.slack.webhook.url = "https://hooks.slack.com/services/SECRET/YYYYYYYYYYY/XXXXXXXX"
lenses.alert.plugins.slack.username = "lenses"
lenses.alert.plugins.slack.channel = "#devops"

An additional optional setting, lenses.alert.plugins.slack.iconUrl (String), controls the URL of the icon appearing in the top left of each Slack message posted; the default is http://www.landoop.com/images/landoop-dark.svg.

SQL Processors

The Lenses SQL Engine allows users to browse topics or even build and execute Kafka Streams flows with a SQL-like syntax. There are currently three execution modes available: IN_PROC, CONNECT and KUBERNETES. The last two are available only for Enterprise and offer configurable and reliable scale-out of the Kafka Streams apps built via Lenses SQL.

To configure the execution mode, set the lenses.sql.execution.mode option.

In Process

IN_PROC is the default execution mode; set lenses.sql.execution.mode to IN_PROC.

# Set up Lenses SQL processing engine
lenses.sql.execution.mode = "IN_PROC" // "CONNECT" // "KUBERNETES"
lenses.sql.state.dir = "logs/lenses-sql-kstream-state"

Kafka Connect

To configure Lenses for CONNECT execution mode:

  1. Edit the lenses.conf file and set the SQL execution mode to CONNECT
  2. Add one or more connect-distributed endpoints for each of your Lenses SQL enabled clusters in the lenses.connect.clusters configuration option.

The resulting lenses.conf should look like this:

lenses.connect.clusters = [{name: "sql-cluster", url: "http://localhost:8083", statuses: "connect-statuses", config: "connect-configs", offsets: "connect-offsets" }]
....
# Set up Lenses SQL processing engine
lenses.sql.execution.mode = "CONNECT"
lenses.sql.state.dir = "logs/lenses-sql-kstream-state"

This configuration tells Lenses that the processor execution mode is CONNECT and where to find the Lenses SQL enabled connect clusters.

Warning

When scaling out with CONNECT, the lenses.sql.state.dir must be created on all workers in any SQL-enabled Connect cluster! This maps to the connect.sql.state.store.dir connector option when used with Lenses.

Connector Install

The connector needs to be available to each worker in the Kafka Connect cluster intended for SQL. The best way to achieve this is via the isolated classpath loader introduced into Connect in Kafka version 0.11.

  1. Create a folder called plugins/lib and place the Lenses SQL Connector jar inside
  2. Set the plugin.path in the worker properties file to the folder containing the jar
  3. Restart the Connect worker.
#  create folder
mkdir -p plugins/lib

# copy in the jar
cp lenses-sql-runners-x.x.x-all.jar plugins/lib

# append the plugins path to the worker properties file; ensure plugin.path is set only once
echo "plugin.path=$PWD/plugins" >> config/connect-distributed.properties

# restart the workers
bin/connect-distributed.sh config/connect-distributed.properties

If you are using Kafka version 0.10.x, the plugin.path classloader isolation is not available; instead, put the connector first on the classpath:

export CLASSPATH=lenses-sql-runners-x.x.x-all.jar

Lenses will scan the Connect clusters specified in the lenses.connect.clusters option for the Lenses SQL connector class and make them available for selection when submitting processors. You can check that the SQL runner is correctly picked up with the Connect CLI.

~|⇒ connect-cli plugins
Class name: com.landoop.connect.SQL, Type: source, Version: X.X.X
Class name: org.apache.kafka.connect.file.FileStreamSinkConnector, Type: sink, Version: 0.11.0.0-cp1
Class name: org.apache.kafka.connect.file.FileStreamSourceConnector, Type: source, Version: 0.11.0.0-cp1
~|

Kubernetes

To enable scale-out processing via Kubernetes, lenses.sql.execution.mode needs to be set to KUBERNETES. Additionally, Lenses requires access to a kubectl config file, and Kubernetes requires access to Landoop's Container Registry.

# kubernetes configuration
lenses.kubernetes.image.name = ""
lenses.kubernetes.image.tag = ""
lenses.kubernetes.config.file = "/home/lenses/.kube/config"
lenses.kubernetes.service.account = "default"

The Docker images for the Lenses SQL Runners are hosted in Landoop's container registry. Kubernetes requires an image pull secret to be set up for each namespace you wish to deploy the Lenses SQL Runners to.

Enterprise customers will be provided with credentials to access the registry. For each namespace you wish to deploy to, the script bin/configure-image-secret can be run to set up the image pull secret:

./configure-image-secret landoop lenses-sql gce-credentials.json username@example.com https://eu.gcr.io default

The options for the script are, in order:

  • context - Kubectl context to use.
  • namespace - Namespace to create the secret in.
  • json_key_path - The path to the GCE service account user credential file.
  • email - The email to use; required for creating a docker-registry secret in Kubernetes.
  • gcr_registry - The Google container registry URL.
  • service_account - The Kubernetes service account to patch. This is optional; the 'default' service account in the namespace is patched if not set.
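For reference, the image pull secret the script sets up corresponds to a standard kubectl docker-registry secret. Below is a hedged sketch of the equivalent command; the secret name landoop-registry is an assumption, and the other values are taken from the example above:

```shell
# Build the equivalent kubectl command; to run it for real, remove the echo
# and add --docker-password="$(cat gce-credentials.json)" with your key file.
# The secret name landoop-registry is an assumption, not a Lenses requirement.
cmd="kubectl create secret docker-registry landoop-registry \
  --docker-server=https://eu.gcr.io \
  --docker-username=_json_key \
  --docker-email=username@example.com \
  --namespace=lenses-sql"
echo "$cmd"
```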

If you are not using the default service account, set the correct service account in the lenses.kubernetes.service.account option in lenses.conf. This tells Lenses to deploy the pods using that service account.

Kubernetes Services

The Lenses SQL runners require access to the Kafka brokers, Zookeeper and optionally the Schema Registry. If you are running Lenses inside Kubernetes, it is recommended to use Kubernetes services for these endpoints to abstract the configuration, avoiding updates in case the Kafka cluster topology changes. Lenses currently does not support creating these services.

If you are using Kubernetes services, the names should match those set in the lenses.conf file. For example, assume a service and endpoint have been created in the default namespace, called kafka, as follows:

        # Service
        kind: Service
        apiVersion: v1
        metadata:
          name: kafka
        spec:
          ports:
          - port: 9092

The lenses.conf file should then specify kafka for lenses.kafka.brokers. The same applies to the other services, i.e. Zookeeper and optionally the Schema Registry.

        lenses.kafka.brokers= "PLAINTEXT://kafka:9092"
        lenses.zookeeper.hosts = "zookeeper:2181"
        lenses.schema.registry.urls = "http://schema-registry:8081"

Runner Helm Chart

Helm is a package manager for Kubernetes which allows you to set out in configuration which image you want, the container specs, the application environment, and the Kubernetes labels and annotations that allow for monitoring.

For a current list of our existing Helm Charts please visit our repo. The Lenses Connector SQL processor chart, available for Enterprise users, is packaged in the SQL runner connector release.

To deploy the SQL runner Helm Chart, edit the values.yaml accordingly or set them at the command line. The values.yaml contains all the options previously described.

# Add repos other connector charts
helm repo add landoop https://landoop.github.io/kafka-helm-charts/

# Install with values.yaml in dry run mode
helm install charts/lenses-sql-processor-runner --name my-stream --namespace lenses --dry-run --debug

# Install
helm install charts/lenses-sql-processor-runner --name my-stream --namespace lenses

# Install and override with different values from a file
helm install -f myvalues.yaml ./helm

# Install and override with different values from command line
helm install charts/lenses-sql-processor-runner --name my-stream --namespace lenses --set connect.sql.app.id=landoop

Warning

Lenses will pick up and track deployments created via Helm; however, if you modify or delete them via Lenses, Helm will not be aware of these changes. Future releases of Lenses will address this.

Important

The connector and Kubernetes artifacts are delivered after an initial commercial agreement.

Topology

When using Kafka Connect and LSQL, Lenses is able to build a graph of the data flows, which can be visualized via the user interface. This provides a high-level view of how data moves in and out of Kafka. LSQL processors (Kafka Streams applications) are managed by Lenses, so full information about them is available. However, this is not the case for Kafka Connect connectors: for those, a configuration entry is required so Lenses can determine whether a connector is a sink or a source and, if it is a source, which topics it outputs to.

Configuration

For the topology view to work at its best, configuration needs to be provided for each connector.

lenses {

  ...

  connectors.info = [
      {
           class.name = "The connector full class path"
           name = "The name which will be presented in the UI"
           instance = "Details about the instance. Contains the field from which to get the information: for a database it could be the connection, for a file the file property, etc."
           sink = true
           extractor.class = "The full classpath for the implementation knowing how to extract the connector information"
           icon = "file.png"
           description = "A description for the connector"
           color = "#b1b1b1"
           author = "The connector author"
      }
      ...
  ]
}

Here are some of the entries the default Lenses configuration provides automatically - of course, all our connectors are covered already.

lenses {

  ...

  connectors.info = [
    {
      class.name = "com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector"
      name = "Cassandra"
      instance = "connect.cassandra.contact.points"
      sink = true
      extractor.class = "com.landoop.kafka.lenses.connect.SimpleTopicsExtractor"
      icon = "cassandra.jpg"
      description = "Store Kafka data into Cassandra"
      color = "#1a9f85"
      docs = "//docs.datamountaineer.com/en/latest/cassandra-sink.html"
      author = "Landoop"
    },
    {
      class.name = "com.datamountaineer.streamreactor.connect.cassandra.source.CassandraSourceConnector"
      name = "Cassandra"
      instance = "connect.cassandra.contact.points"
      sink = false
      property = "connect.cassandra.kcql"
      extractor.class = "com.landoop.kafka.lenses.connect.KcqlInsertTopicsExtractor"
      icon = "cassandra.jpg"
      description = "Extract Cassandra data using the CQL driver into Kafka"
      docs = "//docs.datamountaineer.com/en/latest/cassandra-source.html"
      author = "Landoop"
    },
    {
      class.name = "com.datamountaineer.streamreactor.connect.ftp.source.FtpSourceConnector"
      name = "Ftp"
      instance = "connect.ftp.address"
      sink = false
      property = "connect.ftp.monitor.tail,connect.ftp.monitor.update"
      extractor.class = "com.landoop.kafka.lenses.connect.FtpTopicsExtractor"
      icon = "ftp.png"
      description = "Tail remote FTP folders and bring messages in Kafka"
      color = "#b1b1b1"
      docs = "//docs.datamountaineer.com/en/latest/ftp-source.html"
      author = "Landoop"
    },
    {
      class.name = "com.datamountaineer.streamreactor.connect.jms.source.JMSSourceConnector"
      name = "Jms"
      instance = "connect.jms.url"
      sink = false
      property = "connect.jms.kcql"
      extractor.class = "com.landoop.kafka.lenses.connect.KcqlInsertTopicsExtractor"
      icon = "jms.png"
      description = "Get data from JMS into Kafka"
      color = "pink"
      docs = "//docs.datamountaineer.com/en/latest/jms-source.html"
      author = "Landoop"
    },
    {
      class.name = "org.apache.kafka.connect.file.FileStreamSink"
      name = "File"
      instance = "file"
      sink = true
      extractor.class = "com.landoop.kafka.lenses.connect.SimpleTopicsExtractor"
      icon = "file.png"
      description = "Store Kafka data into files"
      color = "#b1b1b1"
      author = "Apache Kafka"
    },
    {
      class.name = "org.apache.kafka.connect.file.FileStreamSource"
      name = "File"
      instance = "file"
      sink = false
      property = "topic"
      extractor.class = "com.landoop.kafka.lenses.connect.SimpleTopicsExtractor"
      icon = "file.png"
      description = "Tail files or folders and stream data into Kafka"
      color = "#bbb2b2"
      author = "Apache Kafka"
    },
    ...
  ]
}

Note

If a connector is not present in the connectors.info configuration, Lenses will ignore it, and it therefore won't show up in the topology view.

  • class.name - The connector's full class name, e.g. org.apache.kafka.connect.file.FileStreamSource. (Required, String)
  • name - The name as it will appear in the topology view, e.g. File. (Required, String)
  • instance - The connector configuration key(s) the extractor instance will use to get the information. For the FTP source, the value is set to connect.ftp.monitor.tail,connect.ftp.monitor.update; the extractor will take the result of both fields and provide a list of the involved topics. (Required, String)
  • extractor.class - The class path of the provider of the connector information. Lenses provides a few implementations out of the box. (Required, String)
  • icon - The path to an icon file the UI will use to display the connector. (Optional, String)
  • description - A short sentence saying what the connector does. (Optional, String)
  • color - An RGB colour value for the UI to display if the icon is not present. (Optional, String)
  • author - Who provides the connector. (Optional, String)

Lenses provides these classes to extract the information from a Kafka Connect connector configuration:

  • com.landoop.kafka.lenses.connect.KcqlInsertTopicsExtractor - Extracts the target Kafka topics defined by a KCQL statement. This targets our own source connectors, which provide an easy way to describe their action via a SQL-like syntax; the class can extract from syntax like `INSERT INTO $TOPICA SELECT ...`.
  • com.landoop.kafka.lenses.connect.FtpTopicsExtractor - Extracts the target topics for the FTP source.
  • com.landoop.kafka.lenses.connect.SimpleTopicsExtractor - Provides the vanilla value of a connector configuration entry.
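As a rough illustration of what KcqlInsertTopicsExtractor derives from a KCQL statement: the real extractor is the Java class named above; this shell sketch only mirrors the idea.

```shell
# A KCQL statement as used by the Landoop source connectors:
kcql='INSERT INTO target-topic SELECT * FROM source-table'
# Capture the word after INSERT INTO - the target Kafka topic:
target=$(printf '%s\n' "$kcql" | sed -E 's/^INSERT INTO ([^ ]+).*/\1/')
echo "$target"   # target-topic
```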

Option Reference

Config Description Required Type
lenses.license.file The full path to the license file yes string
lenses.akka.system.name Akka Http actor system name no string
lenses.ip Bind HTTP at the given endpoint. Used in conjunction with lenses.port no string
lenses.port The HTTP port the HTTP server listens for connections on no int
lenses.akka.request.timeout.ms The maximum time in milliseconds to wait for an Akka Actor to reply no int
lenses.akka.actors.warmup.ms Time to allow Lenses actor to warm up on startup no int
lenses.zookeeper.hosts
Specifies the ZooKeeper connection string. Use the form hostname:port
where host and port are the host and port of a ZooKeeper server. Provide
multiple hosts in the form hostname1:port1,hostname2:port2,hostname3:port3
and you can add a /znode path if you are using it
yes string
lenses.zookeeper.max.session.ms
The max time in milliseconds to wait for the Zookeeper server to
reply for a request. The implementation requires that the timeout be a
minimum of 2 times the tickTime (as set in the server configuration)
no int
lenses.curator.initial.sleep.time.ms
The initial amount of time to wait between retries when reading JMX
metrics for the brokers, schema registry, connect, zk services
no int
lenses.curator.retries
The number of attempts to read the metrics from JMX endpoints for the brokers,
schema registry, zk and connect services
no int
lenses.zookeeper.max.connection.ms
The time in msec to wait for the Zookeeper client to establish
a new connection
no int
lenses.kafka.brokers
A list of host/port pairs to use for establishing the initial connection
to the Kafka cluster. The client will make use of all servers irrespective of
which servers are specified here for bootstrapping; this list only impacts the
initial hosts used to discover the full set of servers. This list should be in
the form host1:port1,host2:port2,... Since these servers are just used for the
initial connection to discover the full cluster membership (which may change
dynamically), this list does not need to contain ALL brokers.
yes string
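For example, listing just two of the cluster's brokers is enough to bootstrap (broker hostnames are placeholders):

```
lenses.kafka.brokers = "PLAINTEXT://broker1:9092,PLAINTEXT://broker2:9092"
```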
lenses.schema.registry.urls
A list of host/port pairs to use for establishing the connection to the Schema
Registry cluster. This list should be in the form host1:port1,host2:port2,...
List all instances in case some of them are down, or list the load balancer
address if one is used.
yes string
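For example (hostnames are placeholders):

```
lenses.schema.registry.urls = "http://schema-registry-1:8081,http://schema-registry-2:8081"
```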
lenses.schema.registry.delete
Allows the Schema Registry delete action
to be enabled in the UI. It requires Schema Registry version 3.3.0 onwards.
If the setting is not provided, the action is disabled by default
no boolean
lenses.connect
The Kafka Connect clusters to use.
A list of key/value pairs of cluster name and host:port
no string
lenses.schema.registry.check.ms
The interval in milliseconds to check the instances of Schema Registry are
up and running
no long
lenses.kafka.control.topics List of Kafka topics to be marked as system topics no string
lenses.kafka.ws.max.poll.records
This is part of the WebSocket functionality for Kafka. It specifies the
maximum number of records returned in a single call to poll(). It will impact
how many records will be pushed at once to the WS client
no int
lenses.sql.execution.mode The SQL execution mode, IN_PROC or CONNECT yes string
lenses.sql.metric.topic The topic to listen for SQL Connector KStream metrics on no string
lenses.sql.connect.metric.frequency
The frequency, in milliseconds, at which a SQL Connector should
return metrics
no long
lenses.sql.status.topic The status backing topic of the SQL enabled Connect clusters no string
lenses.sql.state.dir
Directory location for state store for KStream flows. This location must exist
on each worker of a SQL enabled Kafka Connect Cluster.
no string
lenses.interval.consumers
The interval in milliseconds to read the information about all consumers from
Kafka
no int
lenses.kafka.settings.consumer
Allow additional Kafka consumer settings to be specified. When Lenses creates
an instance of KafkaConsumer class it will use these properties during initialization
no string
lenses.kafka.settings.producer
Allow additional Kafka producer settings to be specified. When Lenses creates
an instance of KafkaProducer class it will use these properties during initialization
no string
lenses.kafka.settings.kstream
Allow additional Kafka KStream settings to be specified. When Lenses creates
an instance of KStream class it will use these properties during initialization
no string
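As a sketch of how such pass-through settings might look, the keys after the prefix are standard Kafka client properties (the exact nesting shown here is an assumption):

```
# Passed to the KafkaConsumer instances Lenses creates
lenses.kafka.settings.consumer.max.partition.fetch.bytes = 10485760
# Passed to the KafkaProducer instances Lenses creates
lenses.kafka.settings.producer.compression.type = "snappy"
```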
lenses.kafka.read.max.polling.ms
When reading data from a Kafka topic, this is the time in milliseconds the
call to brokers will block waiting on new records
no int
lenses.kafka.read.max.records
The maximum number of records to read from a Kafka topic at once
Used for topic browsing functionality
no int
lenses.access.control.allow.origin Restrict cross-origin HTTP requests made from within Lenses client no string
lenses.access.control.allow.methods
Restrict the HTTP verbs allowed to initiate a cross-origin HTTP
request made from within Lenses client
no string
lenses.allow.weak.SSL
Allows connecting with https:// services even if they are using
self-signed certificates
no boolean
lenses.sqlSample Number of messages to take in every sampling attempt no int
lenses.sqlSampleWindow How frequently to sample a topic for new messages no int
lenses.kafka.ws.poll.ms
This is part of the WebSocket functionality for Apache Kafka. The amount of
time to wait before it reads records from the Kafka consumer
no int
lenses.kafka.ws.consumer.buffer.size
This is part of the WebSocket functionality for Apache Kafka. The
number of records to cache for the client. If the client can’t consume them fast
enough records will be dropped
no int
lenses.kafka.ws.heartbeat.ms
This is part of the WebSocket functionality. The interval to
send messages to the client in order to keep the TCP connection open
no int
lenses.security.mode
Establishes which mechanism to use for authentication/authorization. Available
values are: NONE, BASIC and LDAP. For NONE, all requests are allowed.
BASIC uses a user-password-role approach. If LDAP is chosen, the authentication
is made against the given LDAP server and authorization is based on LDAP groups.
no string
lenses.security.user
This only applies if lenses.security.mode has been set to BASIC.
The entry should contain an array of user structures defining the user, password
and access rights. Lenses supports the following roles: Admin, Write, Read
no array
lenses.security.ldap.url
This only applies if lenses.security.mode has been set to LDAP
Contains url of the LDAP server
no string
lenses.security.ldap.base
This only applies if lenses.security.mode has been set to LDAP
Set the base suffix from which all operations should originate. If a base suffix is set,
you will not have to (and, indeed, must not) specify the full distinguished names in any
operations performed
no string
lenses.security.ldap.user
This only applies if lenses.security.mode has been set to LDAP
Set the user distinguished name (principal) to use for getting authenticated contexts
no string
lenses.security.ldap.password
This only applies if lenses.security.mode has been set to LDAP
The password (credentials) to use for getting authenticated contexts when querying LDAP
no string
lenses.security.ldap.login.filter
The LDAP search filter; it must yield a unique result. See the
default value. <user> is required since it is replaced at runtime with the
current user id
no string
lenses.security.memberof.key
Your LDAP memberOf key entry. This is the key through which a role is
attached to the user entry. For example, memberOf: cn=AdminR,ou=Groups,dc=jboss,dc=org links
the AdminR role to the current user entry
no string
lenses.security.ldap.person.name.key
Your LDAP person entry attribute containing the user full name.
The default value if the configuration is not provided is sn
no string
lenses.security.group.extract.regex
The regular expression used to extract the role from each memberof
(see above) entry. The default value matches the earlier memberof example
no string
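To illustrate with the default pattern: a memberOf entry like the earlier example is matched case-insensitively, and the first capture group becomes the role (the backslash quoting shown is illustrative and may vary with your config format):

```
# memberOf: cn=AdminR,ou=Groups,dc=jboss,dc=org
# (?i)CN=(\w+),ou=Groups.*  =>  capture group 1 = AdminR
lenses.security.group.extract.regex = "(?i)CN=(\\w+),ou=Groups.*"
```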
lenses.security.ldap.roles.admin
This only applies if lenses.security.mode has been set to LDAP
Contains all the LDAP groups allowing full admin rights for Lenses
no string[]
lenses.security.ldap.roles.read
This only applies if lenses.security.mode has been set to LDAP
Contains all the LDAP groups allowing read access for Lenses
no string[]
lenses.security.ldap.roles.nodata
This only applies if lenses.security.mode has been set to LDAP
Contains all the LDAP groups allowing access without seeing data
no string[]
lenses.alert.buffer.size
The number of most recently raised alerts to keep. The client can call to get the
alerts raised, so the response size is linked directly to this number and, of
course, to how many alerts have been raised
no int
lenses.alert.consumers.lag.threshold
The threshold, in milliseconds, beyond which an alert is raised for a consumer
being too slow reading data off a topic and partition
no int
lenses.jmx.port The port to start the JMX agent to allow Lenses monitoring no int
lenses.jmx.brokers The JMX ports for the Kafka Brokers. The format is hostname:port[,hostname:port] no string
lenses.jmx.zookeeper The JMX ports for the Zookeeper instances. The format is hostname:port[,hostname:port] no string
lenses.jmx.connect The JMX ports for the Kafka Connect instances. The format is hostname:port[,hostname:port] no string
lenses.jmx.schema.registry The JMX ports for the Schema Registry instances. The format is hostname:port[,hostname:port] no string
lenses.jmx.kafka.lenses The JMX ports for the Lenses instances. The format is hostname:port[,hostname:port] no string
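For example, all of the JMX endpoints follow the hostname:port[,hostname:port] form (hostnames and ports here are placeholders):

```
lenses.jmx.brokers = "broker1:9581,broker2:9581,broker3:9581"
lenses.jmx.zookeeper = "zkhost1:9585,zkhost2:9585,zkhost3:9585"
lenses.jmx.schema.registry = "schema-host:9582"
```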
lenses.kubernetes.image.name The Lenses SQL runner image name including the repository url no string
lenses.kubernetes.image.tag The Lenses SQL runner image tag no string
lenses.kubernetes.config.file The location of the kubectl config file for cluster configurations no string
lenses.kubernetes.service.account
The service account to deploy with. Requires
image pull access to lenses.kubernetes.image.name
no string
lenses.kubernetes.runner.mem.limit The memory limit for the Lenses SQL runner container no string
lenses.kubernetes.runner.mem.request The memory request for the Lenses SQL runner container no string
lenses.kubernetes.runner.java.opts The Java options for the Lenses SQL runner process no string

Default Values

Config Default
lenses.ip 0.0.0.0
lenses.port 9991
lenses.jmx.port 9015
lenses.akka.system.name lenses
lenses.akka.request.timeout.ms 1000
lenses.akka.actors.warmup.ms 2000
lenses.zookeeper.max.session.ms 10000
lenses.zookeeper.max.connection.ms 10000
lenses.curator.initial.sleep.time.ms 3000
lenses.curator.retries 3
lenses.schema.registry.check.ms 30000
lenses.kafka.ws.max.poll.records 1000
lenses.sql.execution.mode IN_PROC
lenses.sql.metric.topic _kafka_lenses_metrics
lenses.sql.metric.return.frequency 5000
lenses.sql.status.topic connect-statuses
lenses.sql.state.dir logs/lenses-kafka-streams
lenses.interval.consumers 30000
lenses.kafka.read.max.polling.ms 1000
lenses.kafka.read.max.records 1000
lenses.access.control.allow.origin *
lenses.access.control.allow.methods GET,POST,PUT,DELETE,OPTIONS
lenses.allow.weak.SSL true
lenses.sqlSample 2
lenses.sqlSampleWindow 200
lenses.kafka.ws.poll.ms 1000
lenses.kafka.ws.consumer.buffer.size 10000
lenses.kafka.ws.heartbeat.ms 30000
lenses.security.mode BASIC
lenses.security.ldap.login.filter (&(objectClass=person)(sAMAccountName=<user>))
lenses.security.ldap.memberof.key memberOf
lenses.security.group.extract.regex (?i)CN=(\w+),ou=Groups.*
lenses.alert.buffer.size 100
lenses.alert.consumers.lag.threshold 10000
lenses.kafka.control.topics connect-configs, connect-offsets, connect-statuses, _schemas, __consumer_offsets
lenses.kubernetes.image.name eu.gcr.io/k8-engine/lenses-sql-processor
lenses.kubernetes.image.tag g7a59303
lenses.kubernetes.config.file /home/lenses/.kube/config
lenses.kubernetes.service.account default
lenses.kubernetes.runner.mem.limit 512Mi
lenses.kubernetes.runner.mem.request 256Mi
lenses.kubernetes.runner.java.opts
-Xms256 -Xmx512m -XX:MaxPermSize=128m -XX:MaxNewSize=128m
-XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35
-XX:+DisableExplicitGC -Djava.awt.headless=true

Example

# Set the ip:port for Lenses to bind to
lenses.ip = 0.0.0.0
lenses.port = 9991
#lenses.jmx.port = 9992

# License file allowing connecting to up to N brokers
lenses.license.file = "license.json"

# topics created on start-up that Lenses uses to store state
lenses.topics.audits = "_kafka_lenses_audits"
lenses.topics.cluster = "_kafka_lenses_cluster"
lenses.topics.metrics = "_kafka_lenses_metrics"
lenses.topics.profiles = "_kafka_lenses_profiles"
lenses.topics.processors = "_kafka_lenses_processors"

# Set up infrastructure end-points
lenses.kafka.brokers = ""           // "PLAINTEXT://localhost:9092"
lenses.zookeeper.hosts = ""         // "localhost:2181"
lenses.schema.registry.urls = ""    // "http://localhost:8081"
lenses.connect.clusters = []        // [{name: "dev", url: "http://localhost:8083", statuses: "connect-statuses", config: "connect-configs", offsets: "connect-offsets" }]

# Set up monitoring end-points
lenses.jmx.schema.registry = ""     // "schema-host:jmx-port"
lenses.jmx.zookeeper = ""           // "zkhost1:jmx-port,zkhost2:jmx-port,zkhost3:jmx-port"
lenses.jmx.connect = []             // [ {dev: "kafka-connect-dev-host-or-ip:jmx-port"} , .. ]

# Set up integrations with Prometheus and Grafana
lenses.grafana = ""

# Security defaults to BASIC. Alternatives are LDAP and NONE; NONE is not recommended
lenses.security.mode = BASIC
lenses.security.users = [
  {"username": "admin", "password": "admin", "displayname": "Lenses Admin", "roles": ["admin", "write", "read"]},
  {"username": "write", "password": "write", "displayname": "Write User", "roles": ["read", "write"]},
  {"username": "read", "password": "read", "displayname": "Read Only", "roles": ["read"]}
]

# Set up Lenses SQL processing engine
lenses.sql.execution.mode = "IN_PROC"    // "CONNECT" // "KUBERNETES"
lenses.sql.state.dir = "logs/lenses-sql-kstream-state"
lenses.sql.monitor.frequency = 5000
lenses.sql.connect.connector.class = "com.landoop.connect.SQL"
lenses.sql.sample.default = 2            // sample 2 messages every 200 msec
lenses.sql.sample.window = 200

# kubernetes configuration
lenses.kubernetes.image.name = "eu.gcr.io/landoop/lenses-sql-processor"
lenses.kubernetes.image.tag = "0.0.3"
lenses.kubernetes.config.file = "/home/lenses/.kube/config"
lenses.kubernetes.service.account = "default"
lenses.kubernetes.pull.policy = "IfNotPresent"
lenses.kubernetes.runner.mem.limit = "512Mi"
lenses.kubernetes.runner.mem.request = "256Mi"
lenses.kubernetes.runner.java.opts = "-Xms256 -Xmx512m -XX:MaxPermSize=128m -XX:MaxNewSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true"

# Set access control
lenses.access.control.allow.methods = "GET,POST,PUT,DELETE,OPTIONS"
lenses.access.control.allow.origin = "*"

# Schema Registry topics and whether to allow deleting schemas in schema registry
lenses.schema.registry.topics = "_schemas"
lenses.schema.registry.delete = false

# Zookeeper connections configs
lenses.curator.retries = 3
lenses.curator.initial.sleep.time.ms = 2000
lenses.zookeeper.max.session.ms = 10000
lenses.zookeeper.max.connection.ms = 10000

lenses.kafka.control.topics = ["connect-configs", "connect-offsets", "connect-statuses", "_schemas", "__consumer_offsets", "_kafka_lenses_", "lsql_"]