Configure Lenses

Lenses uses two configuration files. The main configuration file, lenses.conf, contains runtime configuration options. Security-related settings are stored in a separate file, security.conf. This way administrators can protect sensitive data while the rest of the configuration remains accessible to team members. The full list of options can be found in the Option Reference section at the end of this page.

License

In order to run, Lenses requires a license file, which can be obtained by contacting us. Once you have received your license, store it in a file (for example, license.json) and update the configuration to point to it. Make sure the configuration value contains the full file path.

# License file allowing connections to up to N brokers (use the full path)
lenses.license.file="/path/to/license.json"

Third Party Licenses

The license folder contains the third party licenses for all the open source software and libraries Lenses uses. A complete list is also available at https://landoop.com/third-party-software

Host and Port

During startup, Lenses binds to the IP address and port set in the configuration file. Use the lenses.ip and lenses.port configuration entries to set different values. By default Lenses binds to port 9991.

# Set the ip:port for Lenses to bind to
lenses.ip = 0.0.0.0
lenses.port = 9991

Java Options

The following environment variables control the Java configuration options when starting Lenses:

  • LENSES_HEAP_OPTS - The heap space settings, the default is -Xmx3g -Xms512m
  • LENSES_JMX_OPTS - JMX options to set
  • LENSES_LOG4J_OPTS - Logging options
  • LENSES_PERFORMANCE_OPTS - Any extra options, default is -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true

Logging

Lenses uses Logback for logging. The logback.xml file is picked up from the installation folder. To point to a different Logback configuration file, export LENSES_LOG4J_OPTS as shown below:

export LENSES_LOG4J_OPTS="-Dlogback.configurationFile=file:mylogback.xml"

Log levels

Logback enables hot loading of changes to the logback.xml file. The default refresh period is 30 seconds, and this can be adjusted via the configuration element:

<configuration scan="true" scanPeriod="30 seconds" >
  ...
</configuration>

The default log level is INFO. To change the level for a specific logger, adjust its logger entry, for example <logger name="akka" level="DEBUG"/>.

The default logger levels are:

<logger name="akka" level="INFO"/>
<logger name="org.apache.zookeeper.ClientCnxn" level="ERROR"/>
<logger name="com.typesafe.sslconfig.ssl.DisabledComplainingHostnameVerifier" level="ERROR"/>
<logger name="org.apache.kafka.clients.consumer.ConsumerConfig" level="ERROR"/>
<logger name="org.apache.kafka.common.utils.AppInfoParser" level="WARN"/>
<logger name="org.apache.kafka.clients.consumer.internals.AbstractCoordinator" level="WARN"/>
<logger name="io.confluent.kafka.serializers.KafkaAvroDeserializerConfig" level="WARN"/>
<logger name="org.I0Itec.zkclient" level="WARN"/>
<logger name="org.apache.zookeeper" level="WARN"/>
<logger name="org.apache.calcite" level="OFF"/>

All the log entries are written to the output using the following pattern: %d{ISO8601} %-5p [%c{2}:%L] %m%n
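When log lines are shipped to a collector, it can help to split them back into fields. The following is a minimal sketch of a parser for the pattern above, assuming the ISO8601 date is rendered as yyyy-MM-dd HH:mm:ss,SSS. The sample line and the logger name in it are made up for illustration.

```python
import re

# Regex mirroring the pattern %d{ISO8601} %-5p [%c{2}:%L] %m%n.
# Assumption: the ISO8601 date renders as "yyyy-MM-dd HH:mm:ss,SSS".
LOG_LINE = re.compile(
    r"^(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) "
    r"(?P<level>[A-Z]+)\s+"
    r"\[(?P<logger>[^:\]]+):(?P<line>\d+)\] "
    r"(?P<message>.*)$"
)


def parse_log_line(line):
    """Return the fields of one log line as a dict, or None if it does not match."""
    match = LOG_LINE.match(line.rstrip("\n"))
    return match.groupdict() if match else None


# Hypothetical log line, for illustration only.
sample = "2018-03-01 10:15:30,123 INFO  [lenses.Main:42] Lenses started"
fields = parse_log_line(sample)
print(fields["level"], fields["logger"], fields["line"])
```

Lines that do not follow the pattern (for example stack trace continuation lines) return None and can be appended to the previous entry by the caller.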

Log location

All the logs Lenses produces can be found in the logs directory. However, we recommend following the Twelve-Factor App approach and logging to stdout, especially when using a container orchestration engine such as Kubernetes. Leave log collection to agents such as Filebeat, Logstash, Fluentd and Flume.

Security

Options related to Lenses security, which start with lenses.security., should be stored in a separate file named security.conf. Set the path to this file in the main configuration file, lenses.conf, via the lenses.secret.file key. This way security.conf can be managed by the administration team alone and have tighter access control than the rest of the configuration.

Lenses supports the following login modes: BASIC and LDAP. The mode is configured through the lenses.security.mode option. If it is set to BASIC, the users are defined via the lenses.security.users option and their roles via lenses.security.groups. For example:

# The security mode defaults to BASIC; the alternative is LDAP.
lenses.security.mode=BASIC

# Define the user groups and their roles. At least one user group needs to be set
lenses.security.groups=[
     {"name": "adminGroup", "roles": ["admin", "write", "read"]},
     {"name": "writeGroup", "roles": ["read", "write"], topic: { blacklist: ["payment.*"] } },
     {"name": "readGroup",  "roles": ["read"], topic: { whitelist: [ "users.*" ] } },
     {"name": "nodataGroup",  "roles": ["nodata"]}
]

# Define the users and link each one to the group(s) it belongs to
lenses.security.users=[
  {"username": "admin", "password": "admin999", "displayname": "Lenses Admin", "groups": ["adminGroup"]},
  {"username": "write", "password": "write1", "displayname": "Write User", "groups": ["writeGroup"]},
  {"username": "read", "password": "read1", "displayname": "Read Only", "groups": ["readGroup"]},
  {"username": "nodata", "password": "nodata1", "displayname": "No Data", "groups": ["nodataGroup"]}
]

Note

A group that has the admin role automatically inherits write, read and nodata. A group that has the write role automatically inherits read and nodata. The permission matrix contains additional details on roles and access levels.

Both LDAP and BASIC require lenses.security.groups. This entry specifies the roles of each group and, optionally, which topics the group can access through a whitelist or blacklist. In the example above, writeGroup can access all topics apart from the ones starting with payment. readGroup uses the whitelist approach and can only access topics with names starting with users. The topic.whitelist and topic.blacklist entries are expected to be regular expressions.
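The whitelist and blacklist behaviour described above can be pictured with a small sketch. This is not the Lenses implementation. It assumes that a pattern must match the whole topic name, that a blacklist match always denies access, and that a group without a whitelist may access every topic that is not blacklisted. The function name can_access and the topic names are hypothetical.

```python
import re


def can_access(topic, whitelist=None, blacklist=None):
    """Decide whether a group may access a topic.

    Assumptions (not confirmed by the Lenses documentation): patterns must
    match the whole topic name, a blacklist match always denies, and a
    missing whitelist means every topic is allowed.
    """
    if any(re.fullmatch(p, topic) for p in (blacklist or [])):
        return False
    if whitelist:
        return any(re.fullmatch(p, topic) for p in whitelist)
    return True


# Mirrors writeGroup and readGroup from the example configuration.
print(can_access("payments-eu", blacklist=["payment.*"]))   # writeGroup
print(can_access("users-signup", whitelist=["users.*"]))    # readGroup
print(can_access("orders", whitelist=["users.*"]))          # readGroup
```

Trying a few topic names against the patterns this way is a cheap check before rolling out a new group definition.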

Note

You can control which topics a user group can access. This is how Lenses handles multi-tenancy over Kafka. Leaving out the setting means the group will be able to access all topics. If a topic is restricted, users in the group cannot create LSQL processors or Connect instances for it.

LDAP

To map LDAP roles to Lenses roles, set lenses.security.mode to LDAP and provide the settings in the ldap configuration section.

Each enterprise LDAP setup might be different, so a plugin mechanism allows a custom implementation for retrieving the user role list. The project template for a custom implementation can be found on GitHub. With the implementation ready, all that is required is to drop the jar file into the Lenses lib folder and set the configuration entry lenses.security.ldap.plugin.class to the fully qualified class name of the implementation.

Lenses provides a default implementation out of the box via the com.landoop.kafka.lenses.security.LdapMemberOfUserGroupPlugin class. Here is the template for the LDAP configuration section:

lenses.security.mode=LDAP
lenses.security.ldap.url="ldaps://mycompany.com:636"
lenses.security.ldap.base="OU=Users,DC=mycompany,DC=com"
lenses.security.ldap.user="$LDAP_USER"
lenses.security.ldap.password="$LDAP_USER_PASSWORD"
lenses.security.ldap.filter="(&(objectClass=person)(sAMAccountName=<user>))"

//LDAP roles retriever settings
lenses.security.ldap.plugin.class="com.landoop.kafka.lenses.security.LdapMemberOfUserGroupPlugin"
lenses.security.ldap.plugin.group.extract.regex="(?i)CN=(\\w+),ou=ServiceGroups.*"
lenses.security.ldap.plugin.memberof.key="memberOf"
lenses.security.ldap.plugin.person.name.key = "sn"

All keys in the table below are prefixed with lenses.security.ldap.

| Key | Description | Optional | Type | Default |
| --- | --- | --- | --- | --- |
| url | The LDAP server URL. For example: ldap://mycompany.com:10389 | No | String | N/A |
| base | Your LDAP base. For example: dc=jboss,dc=org | No | String | N/A |
| user | Your LDAP user. For example: uid=admin,ou=system | No | String | N/A |
| password | Your LDAP user password. | No | String | N/A |
| filter | The LDAP search filter; it must return a unique result. See the default value. <user> is required since it is replaced at runtime with the current user id. | Yes | String | (&(objectClass=person)(sAMAccountName=<user>)) |
| plugin.class | The fully qualified class name of the LDAP roles retriever implementation. | Yes | String | N/A |
| plugin.memberof.key | Your LDAP member-of key entry. This is the key under which a role is attached to the user entry. For example, memberOf: cn=AdminR,ou=Groups,dc=jboss,dc=org links the AdminR role to the current user entry. | Yes | String | memberOf |
| plugin.person.name.key | The LDAP person entry attribute containing the user's full name. | Yes | String | sn |
| plugin.group.extract.regex | The regular expression used to extract the role from each memberOf entry (see above). The default value matches the earlier memberOf example. | Yes | String | (?i)CN=(\\w+),ou=Groups.* |

Note

The configuration entries lenses.security.ldap.plugin.memberof.key, lenses.security.ldap.plugin.person.name.key and lenses.security.ldap.plugin.group.extract.regex are specific to the implementation Lenses provides out of the box. A custom implementation may require different entries under lenses.security.ldap.plugin.
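To see what the default group extraction expression captures, it can be tried outside Lenses. The sketch below applies the default regular expression from the table to the memberOf example given there. It only illustrates the regular expression and is not the plugin code; the function name extract_roles and the second directory entry are hypothetical.

```python
import re

# Default value of lenses.security.ldap.plugin.group.extract.regex as listed
# in the table above. In the HOCON file the backslash is escaped ("\\w");
# in a Python raw string a single backslash is enough.
GROUP_EXTRACT = re.compile(r"(?i)CN=(\w+),ou=Groups.*")


def extract_roles(member_of_values):
    """Return the captured group name for every memberOf value that matches."""
    roles = []
    for value in member_of_values:
        match = GROUP_EXTRACT.match(value)
        if match:
            roles.append(match.group(1))
    return roles


# The first value is the memberOf example from the table; the second is a
# hypothetical entry that should not match.
print(extract_roles([
    "cn=AdminR,ou=Groups,dc=jboss,dc=org",
    "cn=Printers,ou=Devices,dc=jboss,dc=org",
]))
```

If your directory keeps groups under a different organizational unit, adjust the ou= part of the expression and run it against a few real memberOf values first.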

Here is a sample configuration for an LDAP-enabled Lenses:

lenses.security.mode=LDAP
lenses.security.ldap.url="ldaps://landoop.ldap.url:636"
lenses.security.ldap.base="DC=landoop,DC=com"
lenses.security.ldap.password=*****
lenses.security.ldap.user="UID=smiths,OU=ServiceAccounts,DC=landoop,DC=com"
lenses.security.ldap.filter="(&(objectclass=person)(CN=<user>))"

lenses.security.ldap.plugin.class="com.landoop.kafka.lenses.security.LdapMemberOfUserGroupPlugin"
lenses.security.ldap.plugin.memberof.key="memberOf"
lenses.security.ldap.plugin.group.extract.regex="(?i)CN=(\\w+),ou=ServiceGroups.*"
lenses.security.ldap.plugin.person.name.key ="sn"

Service Accounts

Service accounts allow easier integration with the Lenses API. A typical use case is enabling your CI/CD tools to interact with Lenses. Specific users and their authorization tokens are defined via the lenses.security.service.accounts option. Here is how two service accounts can be created:

lenses.security.groups=[
      {"name": "group1", "roles": ["admin", "write", "read"]},
      {"name": "group2", "roles": ["read", "write"], topic: { blacklist: ["payment.*"] } },
      {"name": "group3",  "roles": ["read"], topic: { whitelist: [ "users.*" ] } }
]

lenses.security.service.accounts=[
  {
    "username": "jenkins",
    "token": "1231543kn!!1",
    "groups": ["group1", "group2"]
  },
  {
    "username": "goCli",
    "token": "12345678",
    "groups": ["group3"]
  }
]

Your CI system (such as Jenkins) and the Lenses Go CLI tool can call the API without having to log in first. All that is required is for every HTTP request to contain the HTTP header X-Kafka-Lenses-Token:$TOKEN.

Each service account is linked to one or more user groups in order to restrict the actions it can execute.
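As a rough illustration, a script could attach the token header like this. The sketch only builds the request object and does not send it. The host name and the /api/topics path are assumptions made for the example, and the token is the goCli one from the configuration above.

```python
import urllib.request

TOKEN_HEADER = "X-Kafka-Lenses-Token"


def build_request(base_url, path, token):
    """Create a request that carries the service account token header."""
    request = urllib.request.Request(base_url.rstrip("/") + path)
    request.add_header(TOKEN_HEADER, token)
    return request


# Hypothetical host and path; the token is the goCli one from the example.
request = build_request("http://lenses-host:9991", "/api/topics", "12345678")
print(request.full_url)
# urllib.request.urlopen(request) would send it to a running Lenses instance.
```

In a real pipeline the token would normally come from the CI system's secret store rather than from the script itself.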

SSL Encryption

If your Kafka cluster uses TLS certificates for authentication, set the broker protocol to SSL and pass any keystore and truststore configuration to the consumer, producer and KStream settings by prefixing the options with lenses.kafka.settings., for example:

lenses.kafka.settings.consumer.security.protocol=SSL
lenses.kafka.settings.consumer.ssl.truststore.location=/var/private/ssl/client.truststore.jks
lenses.kafka.settings.consumer.ssl.truststore.password=test1234
lenses.kafka.settings.consumer.ssl.keystore.location=/var/private/ssl/client.keystore.jks
lenses.kafka.settings.consumer.ssl.keystore.password=test1234
lenses.kafka.settings.consumer.ssl.key.password=test1234

lenses.kafka.settings.producer.security.protocol=SSL
lenses.kafka.settings.producer.ssl.truststore.location=/var/private/ssl/client.truststore.jks
lenses.kafka.settings.producer.ssl.truststore.password=test1234
lenses.kafka.settings.producer.ssl.keystore.location=/var/private/ssl/client.keystore.jks
lenses.kafka.settings.producer.ssl.keystore.password=test1234
lenses.kafka.settings.producer.ssl.key.password=test1234

lenses.kafka.settings.kstream.security.protocol=SSL
lenses.kafka.settings.kstream.ssl.truststore.location=/var/private/ssl/truststore.jks
lenses.kafka.settings.kstream.ssl.truststore.password=test1234
lenses.kafka.settings.kstream.ssl.keystore.location=/var/private/ssl/client.keystore.jks
lenses.kafka.settings.kstream.ssl.keystore.password=test1234
lenses.kafka.settings.kstream.ssl.key.password=test1234

SASL Authentication

In order for Lenses to access Kafka in an environment set up with Kerberos (SASL), first create a standard JAAS file:

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/full/path/keytab-file"
  storeKey=true
  useTicketCache=false
  serviceName="kafka"
  principal="kafka@MYREALM";
};

Then set the following option before starting Lenses:

export LENSES_OPTS="-Djava.security.auth.login.config=/opt/lenses/jaas.conf"

Finally, set the security protocol in the Lenses configuration file:

lenses.kafka.settings.consumer.security.protocol=SASL_PLAINTEXT
lenses.kafka.settings.producer.security.protocol=SASL_PLAINTEXT
lenses.kafka.settings.kstream.security.protocol=SASL_PLAINTEXT

By default, the connection to Zookeeper will remain unauthenticated. This only affects the Quota entries, which are written without any Zookeeper ACLs to protect them. The option lenses.zookeeper.security.enabled may be used to change this behaviour but it is very important in such case to use the brokers’ principal for Lenses. If Lenses is configured with a different principal, then the brokers will not be able to manipulate the Quota entries and will fail to start. Please contact our support if help is needed for this feature.

System Topics

Lenses is a stateless application and thus an excellent fit for Kubernetes or OpenShift. During startup it creates a few system topics for storing monitoring, auditing, cluster, user profile and processor information. These topics are configured by the topics configuration block:

# topics created on start-up that Lenses uses to store state
lenses.topics.audits = "_kafka_lenses_audits"
lenses.topics.cluster = "_kafka_lenses_cluster"
lenses.topics.metrics = "_kafka_lenses_metrics"
lenses.topics.profiles = "_kafka_lenses_profiles"
lenses.topics.processors = "_kafka_lenses_processors"
lenses.topics.alerts.storage = "_kafka_lenses_alerts"
lenses.topics.lsql.storage= "_kafka_lenses_lsql_storage"
lenses.topics.alerts.settings = "_kafka_lenses_alerts_settings"

Warning

These topics are created and managed by the platform automatically. If you are using ACLs, grant the user running the Lenses application permission to manage these topics.

If ACLs are already enabled on your Kafka cluster, set the ACLs for the Lenses user and server on the following topics: _kafka_lenses_audits, _kafka_lenses_cluster, _kafka_lenses_metrics, _kafka_lenses_profiles, _kafka_lenses_processors, _kafka_lenses_alerts, _kafka_lenses_lsql_storage and _kafka_lenses_alerts_settings.

kafka-acls \
--authorizer-properties zookeeper.connect=my_zk:2181 \
--add \
--allow-principal User:Lenses \
--allow-host lenses-host \
--operation Read \
--operation Write \
--operation Alter \
--operation Delete \
--topic topic
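Since the command has to be repeated for every system topic, a small script can generate the invocations. The sketch below only prints the commands, using the same flags, ZooKeeper address, principal and host as the example above; all of these are placeholders to adapt. The function name acl_command is hypothetical.

```python
import shlex

SYSTEM_TOPICS = [
    "_kafka_lenses_audits",
    "_kafka_lenses_cluster",
    "_kafka_lenses_metrics",
    "_kafka_lenses_profiles",
    "_kafka_lenses_processors",
    "_kafka_lenses_alerts",
    "_kafka_lenses_lsql_storage",
    "_kafka_lenses_alerts_settings",
]


def acl_command(topic, zookeeper="my_zk:2181", principal="User:Lenses",
                host="lenses-host"):
    """Build the kafka-acls argument list for one topic."""
    args = [
        "kafka-acls",
        "--authorizer-properties", f"zookeeper.connect={zookeeper}",
        "--add",
        "--allow-principal", principal,
        "--allow-host", host,
    ]
    for operation in ("Read", "Write", "Alter", "Delete"):
        args += ["--operation", operation]
    return args + ["--topic", topic]


commands = [acl_command(topic) for topic in SYSTEM_TOPICS]
for command in commands:
    print(" ".join(shlex.quote(arg) for arg in command))
```

If the topic names were changed in the lenses.topics block, the list in the script has to be changed to match.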

JMX Monitoring

Enabling JMX

To use the full potential of Lenses to monitor the cluster, JMX should be enabled for the Kafka Brokers, Schema Registry, Zookeeper and Kafka Connect instances. To enable JMX, set the JMX_PORT environment variable to the desired port for each running process. Additional options must be set in order to access JMX remotely (from a different host), as shown below.

Kafka Brokers

Set JMX_PORT and KAFKA_JMX_OPTS environment variables:

export JMX_PORT=[JMX_PORT]
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.rmi.port=[JMX_PORT]"

Kafka Connect

Set JMX_PORT and KAFKA_JMX_OPTS environment variables:

export JMX_PORT=[JMX_PORT]
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.rmi.port=[JMX_PORT]"

Schema Registry

Set JMX_PORT and SCHEMA_REGISTRY_JMX_OPTS environment variables:

export JMX_PORT=[JMX_PORT]
export SCHEMA_REGISTRY_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.rmi.port=[JMX_PORT]"

Zookeeper

Set JMX_PORT and ZOOKEEPER_SERVER_OPTS environment variables:

export JMX_PORT=[JMX_PORT]
export ZOOKEEPER_SERVER_OPTS="$ZOOKEEPER_SERVER_OPTS -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.rmi.port=[JMX_PORT]"
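The four snippets above differ only in the variable name and the port, so the export lines can be generated per service. This is a minimal sketch with a hypothetical helper, jmx_exports. The broker port 9581 is an arbitrary example value, and the Zookeeper variant, which also keeps the existing $ZOOKEEPER_SERVER_OPTS value, is left out for brevity.

```python
JMX_FLAGS = (
    "-Dcom.sun.management.jmxremote",
    "-Dcom.sun.management.jmxremote.authenticate=false",
    "-Dcom.sun.management.jmxremote.ssl=false",
    "-Dcom.sun.management.jmxremote.local.only=false",
)


def jmx_exports(opts_variable, port):
    """Return the two export lines for one service."""
    flags = " ".join(
        JMX_FLAGS + (f"-Dcom.sun.management.jmxremote.rmi.port={port}",)
    )
    return [f"export JMX_PORT={port}", f'export {opts_variable}="{flags}"']


# Port numbers are example values only.
for variable, port in (("KAFKA_JMX_OPTS", 9581),
                       ("SCHEMA_REGISTRY_JMX_OPTS", 18081)):
    print("\n".join(jmx_exports(variable, port)))
```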

Lenses configuration

Lenses integrates with and monitors Kafka Brokers, Zookeeper nodes, Schema Registries and Kafka Connect clusters. To configure which cluster to monitor, set the corresponding endpoints in the lenses.conf file. If JMX is enabled for the services mentioned earlier, set the jmx configuration entries as shown below.

# Set up infrastructure end-points
lenses.kafka.brokers        = ""     // "PLAINTEXT://host1:9092,PLAINTEXT://host2:9092"
lenses.zookeeper.hosts      = []     // [{url:"localhost:2181",jmx:"localhost:12181"}]
lenses.zookeeper.chroot     = ""     // "kafka"
lenses.schema.registry.urls = []     // [{url:"http://localhost:8081",jmx:"localhost:18081"}]
lenses.connect.clusters     = []     // [{name: "connectClusterA", statuses: "connect-statuses", configs: "connect-configs", offsets: "connect-offsets", urls:[{url: "http://localhost:8083", jmx:"localhost:18083"}] }]

# If using Grafana, set the url i.e. http://grafana-host:port
lenses.grafana = ""

To configure Kafka Connect correctly you must provide:

  1. Name for the cluster
  2. For each worker provide its REST endpoint and optionally the JMX connection
  3. The Kafka Connect backing topics for status, configs, and offsets
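The three items above can be assembled programmatically. The sketch below builds one cluster entry and prints it as JSON, on the assumption that lenses.conf is parsed as HOCON, which accepts JSON syntax for values. The function name connect_cluster and the second worker are hypothetical; the other values come from the commented example in the configuration above.

```python
import json


def connect_cluster(name, statuses, configs, offsets, workers):
    """Describe one Kafka Connect cluster.

    workers is a list of (rest_url, jmx_endpoint_or_None) pairs.
    """
    urls = []
    for rest_url, jmx in workers:
        entry = {"url": rest_url}
        if jmx:  # JMX is optional per worker
            entry["jmx"] = jmx
        urls.append(entry)
    return {"name": name, "statuses": statuses, "configs": configs,
            "offsets": offsets, "urls": urls}


clusters = [connect_cluster(
    "connectClusterA", "connect-statuses", "connect-configs", "connect-offsets",
    [("http://localhost:8083", "localhost:18083"),
     ("http://localhost:8084", None)],  # second worker is hypothetical
)]
line = "lenses.connect.clusters = " + json.dumps(clusters)
print(line)
```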

Note

The Kafka Brokers JMX endpoints are picked up automatically from Zookeeper.

Expose Lenses JMX

Lenses can expose its own JMX endpoint so that other systems can monitor it. To enable it, set the lenses.jmx.port option; to disable it, comment out the entry. The Prometheus JMX exporter may also be used to make Lenses metrics available to Prometheus. The JMX exporter configuration file and a jmx_exporter build are provided within the monitoring suite. The JMX exporter can run as a Java agent, in which case it must be set via the LENSES_OPTS environment variable:

export LENSES_OPTS="-javaagent:/path/to/jmx_exporter/fastdata_agent.jar=9102:/path/to/jmx_exporter/client.yml"

In order to monitor from remote hosts, JMX remote access should be enabled as well.

export LENSES_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.rmi.port=[JMX_PORT]"

Alert System Integration

Alertmanager

This is the preferred way to route alerts to downstream gateways. You can read more in the Alertmanager documentation. Lenses can push alert notifications to the Alertmanager webhook. To enable the functionality, provide the Alertmanager endpoint(s) via the following setting:

lenses.alert.manager.endpoints="http://host1:port1/api/v1/alerts,http://host2:port2/api/v1/alerts"

Slack

To integrate Lenses alerting with Slack, add an Incoming WebHook integration in Slack. Select the #channel where Lenses will post alerts, copy the Webhook URL and set the following entries:

lenses.alert.plugins.slack.enabled = true
lenses.alert.plugins.slack.webhook.url = "https://hooks.slack.com/services/SECRET/YYYYYYYYYYY/XXXXXXXX"
lenses.alert.plugins.slack.username = "lenses"
lenses.alert.plugins.slack.channel = "#devops"

SQL Processors

The Lenses SQL Engine allows users to browse and query topics, and to build and execute Kafka Streams flows with a SQL-like syntax. Three execution modes are currently available: IN_PROC, CONNECT and KUBERNETES. The last two are available to Enterprise clients and offer fault-tolerant and performant execution of Kafka Streams apps built via Lenses SQL.

To configure the execution mode, set the lenses.sql.execution.mode option.

See Configuring SQL Processors for more details.

Topology

When using a Kafka connector and/or Lenses SQL, Lenses builds a graph of all the data flows, and users can interact with this graph via the topology screen. This provides a high-level view of how your data moves in and out of Kafka. LSQL processors (Kafka Streams applications written with LSQL) are managed automatically. Out of the box, Lenses supports over 45 Kafka Connect connectors. To enable a custom connector, or a connector not supported out of the box, set the lenses.connectors.info configuration entry.

Configuration

For the topology view to work at its best, configuration needs to be provided for the connectors.

lenses {

  ...

  connectors.info = [
      {
           class.name = "The connector full classpath"
           name = "The name which will be presented in the UI"
           instance = "Details about the instance. Contains the connector configuration field which holds the information. If  a database is involved it would be  the DB connection details, if it is a file it would be the file path, etc"
           sink = true
           extractor.class = "The full classpath for the implementation knowing how to extract the Kafka topics involved. This is only required for a Source"
           icon = "file.png"
           description = "A description for the connector"
           author = "The connector author"
      }
      ...
  ]
}

Source connectors require extra configuration in order to extract the topic information from the connector configuration. The extractor class should be com.landoop.kafka.lenses.connect.SimpleTopicsExtractor. When using this extractor, also provide the property entry, which names the field in the connector runtime configuration that lists the topics to publish to. Below is an example for the file stream source:

class.name = "org.apache.kafka.connect.file.FileStreamSourceConnector"
name = "File"
instance = "file"
sink = false
property = "topic"
extractor.class = "com.landoop.kafka.lenses.connect.SimpleTopicsExtractor"

Here are some of the entries the default Lenses configuration provides automatically - of course, all Landoop connectors are already covered.

lenses {

  ...

  connectors.info = [
    {
      class.name = "com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector"
      name = "Cassandra"
      instance = "connect.cassandra.contact.points"
      sink = true
      icon = "cassandra.jpg"
      description = "Store Kafka data into Cassandra"
      docs = "//lenses.stream/connectors/sink/cassandra.html"
      author = "Landoop"
    },
    {
      class.name = "com.datamountaineer.streamreactor.connect.cassandra.source.CassandraSourceConnector"
      name = "Cassandra"
      instance = "connect.cassandra.contact.points"
      sink = false
      property = "connect.cassandra.kcql"
      extractor.class = "com.landoop.kafka.lenses.connect.KcqlInsertTopicsExtractor"
      icon = "cassandra.jpg"
      description = "Extract Cassandra data using the CQL driver into Kafka"
      docs = "//lenses.stream/connectors/source/cassandra.html"
      author = "Landoop"
    },
    {
      class.name = "com.datamountaineer.streamreactor.connect.ftp.source.FtpSourceConnector"
      name = "Ftp"
      instance = "connect.ftp.address"
      sink = false
      property = "connect.ftp.monitor.tail,connect.ftp.monitor.update"
      extractor.class = "com.landoop.kafka.lenses.connect.FtpTopicsExtractor"
      icon = "ftp.png"
      description = "Tail remote FTP folders and bring messages in Kafka"
      docs = "//lenses.stream/connectors/source/ftp.html"
      author = "Landoop"
    },
    {
      class.name = "com.datamountaineer.streamreactor.connect.jms.source.JMSSourceConnector"
      name = "Jms"
      instance = "connect.jms.url"
      sink = false
      property = "connect.jms.kcql"
      extractor.class = "com.landoop.kafka.lenses.connect.KcqlInsertTopicsExtractor"
      icon = "jms.png"
      description = "Get data from JMS into Kafka"
      docs = "//lenses.stream/connectors/source/jms.html"
      author = "Landoop"
    },
    {
      class.name = "org.apache.kafka.connect.file.FileStreamSinkConnector"
      name = "File"
      instance = "file"
      sink = true
      extractor.class = "com.landoop.kafka.lenses.connect.SimpleTopicsExtractor"
      icon = "file.png"
      description = "Store Kafka data into files"
      author = "Apache Kafka"
    },
    {
      class.name = "org.apache.kafka.connect.file.FileStreamSourceConnector"
      name = "File"
      instance = "file"
      sink = false
      property = "topic"
      extractor.class = "com.landoop.kafka.lenses.connect.SimpleTopicsExtractor"
      icon = "file.png"
      description = "Tail files or folders and stream data into Kafka"
      author = "Apache Kafka"
    },
    {
      class.name = "io.debezium.connector.mysql.MySqlConnector"
      name = "CDC MySQL",
      instance = "database.hostname"
      sink = false,
      property = "database.history.kafka.topic"
      extractor.class = "com.landoop.kafka.lenses.connect.SimpleTopicsExtractor"
      icon = "debezium.png",
      description = "CDC data from RDBMS into Kafka"
      docs="//debezium.io/docs/connectors/mysql/"
      author = "Debezium"
    }
    ...
  ]
}

Note

If the connector configuration is not present in the configuration, Lenses will ignore the connector and therefore it won’t show up in the topology view.

| Key | Description | Optional | Type | Default |
| --- | --- | --- | --- | --- |
| class.name | The connector's fully qualified class name, e.g. org.apache.kafka.connect.file.FileStreamSourceConnector | No | String | N/A |
| name | The name as it will appear in the topology view. For example: File | No | String | N/A |
| instance | Contains the connector configuration key(s) the extractor instance will use to get the information. For the FTP source the value is set to connect.ftp.monitor.tail,connect.ftp.monitor.update; the extractor reads both fields and provides the list of topics involved. | No | String | N/A |
| extractor.class | Used for Connect sources only, to extract the target topics from the connector runtime configuration. | No | String | N/A |
| icon | The path to an icon file the UI will use to display the connector. | Yes | String | N/A |
| description | A short sentence saying what the connector does. | Yes | String | N/A |
| author | Who is providing the connector. | Yes | String | N/A |

Lenses provides these classes to extract the information from a Kafka Connect connector configuration:

| Class | Description |
| --- | --- |
| com.landoop.kafka.lenses.connect.KcqlInsertTopicsExtractor | Extracts the target Kafka topics defined by a KCQL statement. This targets our own source connectors, which provide an easy way to describe their action via a SQL-like syntax. The class can extract from syntax like INSERT INTO $TOPICA SELECT ... |
| com.landoop.kafka.lenses.connect.FtpTopicsExtractor | Extracts the target topics for the FTP source. |
| com.landoop.kafka.lenses.connect.SimpleTopicsExtractor | Extracts the information out of a connector configuration entry. For example, topic:topicA, topicB |
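To make the descriptions above more concrete, here is a rough imitation of two of the extractors. These functions are illustrative sketches, not the Lenses classes. The function names are hypothetical, and separating several KCQL statements with a semicolon is an assumption made for the example.

```python
import re

# Illustrative only: these functions imitate what the descriptions above say
# the extractor classes do; they are not the Lenses implementations.
INSERT_INTO = re.compile(r"INSERT\s+INTO\s+(\S+)\s+SELECT", re.IGNORECASE)


def kcql_target_topics(kcql):
    """Collect the target topic of every INSERT INTO ... SELECT statement."""
    return INSERT_INTO.findall(kcql)


def simple_topics(value):
    """Split a comma-separated configuration value into topic names."""
    return [topic.strip() for topic in value.split(",") if topic.strip()]


print(kcql_target_topics("INSERT INTO topicA SELECT * FROM tableA;"
                         "INSERT INTO topicB SELECT * FROM tableB"))
print(simple_topics("topicA, topicB"))
```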

Option Reference

Config Description Required Type
lenses.ip Bind HTTP at the given endpoint. Used in conjunction with lenses.port no string
lenses.port The HTTP port the HTTP server listens for connections: serves UI, Rest and WS APIs no int
lenses.jmx.port The port to bind an JMX agent to enable JVM monitoring no int
lenses.license.file The full path to the license file yes string
lenses.secret.file The full path to security.conf containing security credentials read more yes string
lenses.topics.audits
Topic to store system auditing information. Keep track of WHO did WHAT and WHEN.
When a topic, config, connector is Created/Updated or Deleted an audit message is stored.
We advise not to change the defaults neither to delete the topic
yes string
lenses.topics.metrics
Topic to store stream processor metrics. When your state-less stream processors are
running in Kubernetes or Kafka Connect, this topic collects healthcheck and performance metrics.
We advise not to change the defaults neither to delete the topic
yes string
lenses.topics.cluster
Topic to store broker details. Infrastructure information is used to determine
config changes, failures and new nodes added or removed in a cluster.
We advise not to change the defaults neither to delete the topic
yes string
lenses.topics.profiles
Topic to store user preferences. Bookmark your most used topics, connectors or SQL processors.
We advise not to change the defaults neither to delete the topic
yes string
lenses.topics.processors
Topic to store the SQL processors details.
We advise not to change the defaults neither to delete the topic
yes string
lenses.topics.alerts.storage
Topic to store the alerts raised.
We advise not to change the defaults neither to delete the topic
yes string
lenses.topics.alerts.settings
Topic to store the alerts configurations.
We advise not to change the defaults neither to delete the topic
yes string
lenses.topics.lsql.storage
Topic to store all data access SQL queries. Know WHO access WHAT data and WHEN.
We advise not to change the defaults neither to delete the topic
yes string
lenses.kafka.brokers
A list of host/port pairs to use for establishing the initial connection to the Kafka
cluster. Add just a few broker addresses here and Lenses will bootstrap and discover the
full cluster membership (which may change dynamically). This list should be in the form
"host1:port1,host2:port2,host3:port3"
yes string
lenses.zookeeper.hosts
Provide all the available Zookeeper nodes details. For every ZooKeeper node specify the
connection url (host:port) and if JMX is enabled the JMX (host:port). The configuration should be
[{url:"hostname1:port1",jmx:"hostname1:port2"},{url:"hostname2:port3",jmx:"hostname2:port4"}]
yes string
lenses.zookeeper.chroot
You can add your znode (chroot) path if you are using it. Please do not add
leading or trailing slashes. For example if you use the zookeeper chroot ``/kafka` for
your Kafka cluster, set this value to kafka
no string
lenses.zookeeper.security.enabled
Enables secured connection to your Zookeeper.
The default value is false. Please read about this setting before enabling it.
no boolean
lenses.schema.registry.urls
Provide all available Schema Registry node details or list the load balancer address if one is
used. For every instance specify the connection url and if JMX is enabled the JMX (host:port).
The configuration should be
[{url:"http://host1:port1", jmx:"host1:port2"},{url:"http://host2:port3",jmx:"host2:port4"}]
no string
lenses.connect.clusters
Provide all available Kafka Connect clusters. For each cluster give a name, list the 3 backing topics
and provide workers connection details (host:port) and JMX endpoints if enabled and on Kafka 1.0.0
no array
lenses.alert.manager.endpoints
Comma separated Alert Manager endpoints. If provided, Lenses will push raised
alerts to the downstream notification gateway. The configuration should be
"http://host1:port1/api/v1/alerts,http://host1:port1/api/v1/alerts"
no string
lenses.alert.manager.source
How to identify the source of an alert in Alert Manager. The default is Lenses, but you might
want to override it, for example to UAT
no string
lenses.alert.manager.generator.url
A unique URL identifying the creator of this alert. The default is http://lenses, but you might
want to override it, for example to http://<my_instance_url>
no string
lenses.grafana
If using Grafana, provide its URL. The configuration should be
"http://grafana-host:port"
no string
lenses.sql.max.bytes
Used when reading data from a Kafka topic. This is the maximum data size in bytes to return
from an LSQL query. If the query returns more data than this limit, any records received after
the limit are discarded. This can be overridden in the LSQL query.
Default value is 20971520 (20 MB)
yes long
lenses.sql.max.time
Used when reading data from a Kafka topic. This is the time in milliseconds the
query will be allowed to run. If the time is exhausted, it returns the records found so far.
This can be overridden in the LSQL query. Default value is 1 hour
yes int
lenses.sql.sample.default Number of messages to take in every sampling attempt yes int
lenses.sql.sample.window How frequently to sample a topic for new messages when tailing it yes int
lenses.metrics.workers Number of workers to distribute the load of querying JMX endpoints and collecting metrics yes int
lenses.offset.workers Number of workers to distribute the load of querying topic offsets yes int
lenses.sql.execution.mode The SQL execution mode: IN_PROC, CONNECT or KUBERNETES yes string
lenses.sql.state.dir
Directory location to store the state of KStreams. If using CONNECT mode, this folder
must already exist on each Kafka Connect worker
no string
lenses.sql.monitor.frequency
How frequently SQL processors emit healthcheck and performance metrics to
lenses.topics.metrics
no int
lenses.kubernetes.image.name The docker/container repository url and name of the Lenses SQL runner no string
lenses.kubernetes.image.tag The Lenses SQL runner image tag no string
lenses.kubernetes.config.file The location of the kubectl config file no string
lenses.kubernetes.service.account
The service account to deploy with. This account should be able to pull images
from lenses.kubernetes.image.name
no string
lenses.kubernetes.pull.policy The pull policy for Kubernetes containers: IfNotPresent or Always no string
lenses.kubernetes.runner.mem.limit The memory limit applied to the Container no string
lenses.kubernetes.runner.mem.request The memory requested for the Container no string
lenses.kubernetes.runner.java.opts Advanced JVM and GC memory tuning parameters no string
lenses.interval.summary The interval (in msec) to check for new topics, or topic config changes no long
lenses.interval.consumers The interval (in msec) to read all consumer info no int
lenses.interval.partitions.messages The interval (in msec) to refresh partitions info no long
lenses.interval.type.detection The interval (in msec) to check the topic payload type no long
lenses.interval.user.session.ms The duration (in msec) that a client session stays alive for. Default is 4 hours no long
lenses.interval.user.session.refresh The interval (in msec) to check whether a client session is idle and should be terminated no long
lenses.interval.schema.registry.healthcheck The interval (in msec) to check the status of schema registry instances no long
lenses.interval.topology.topics.metrics The interval (in msec) to refresh the topology status page no long
lenses.interval.alert.manager.healthcheck The interval (in msec) to check the status of the Alert manager instances no long
lenses.interval.alert.manager.publish The interval (in msec) on which unresolved alerts are published to alert manager no long
lenses.interval.jmx.refresh.zk The interval (in msec) to get Zookeeper JMX yes long
lenses.interval.jmx.refresh.sr The interval (in msec) to get Schema Registry JMX yes long
lenses.interval.jmx.refresh.broker The interval (in msec) to get Broker JMX yes long
lenses.interval.jmx.refresh.alert.manager The interval (in msec) to get Alert Manager JMX yes long
lenses.interval.jmx.refresh.connect The interval (in msec) to get Connect JMX yes long
lenses.interval.jmx.refresh.brokers.in.zk The interval (in msec) to refresh the brokers from Zookeeper yes long
lenses.kafka.ws.poll.ms
Max time (in msec) a consumer polls for data on each WS API request
no int
lenses.kafka.ws.buffer.size Max buffer size for WS consumer no int
lenses.kafka.ws.max.poll.records
Specify the maximum number of records returned in a single call to poll(). It will
impact how many records will be pushed at once to the WS client
no int
lenses.kafka.ws.heartbeat.ms The interval (in msec) to send messages to the client to keep the TCP connection open no int
lenses.access.control.allow.methods Restrict the HTTP verbs allowed to initiate a cross-origin HTTP request no string
lenses.access.control.allow.origin Restrict cross-origin HTTP requests to specific hosts no string
lenses.schema.registry.topics The backing topic where schemas are stored no string
lenses.schema.registry.delete
Allows subjects to be deleted in the Schema Registry. Default is disabled.
Requires schema-registry version 3.3.0 or later
no boolean
lenses.allow.weak.SSL Allow connecting to https:// services even when self-signed certificates are used no boolean
lenses.telemetry.enable Enable or disable telemetry data collection no boolean
lenses.curator.retries The number of attempts to read the broker metadata from Zookeeper no int
lenses.curator.initial.sleep.time.ms The initial amount of time to wait between retries to ZK no int
lenses.zookeeper.max.session.ms
The max time (in msec) to wait for the Zookeeper server to
reply to a request. The implementation requires that the timeout be a
minimum of 2 times the tickTime (as set in the server configuration)
no int
lenses.zookeeper.max.connection.ms The duration (in msec) to wait for the Zookeeper client to establish a new connection no int
lenses.akka.request.timeout.ms The maximum time (in msec) to wait for an Akka Actor to reply no int
lenses.kafka.control.topics List of Kafka topics to be marked as system topics no string
lenses.alert.buffer.size The number of most recently raised alerts to keep in the cache no int
lenses.kafka.settings.consumer
Allow additional Kafka consumer settings to be specified. When Lenses creates
an instance of KafkaConsumer class it will use these properties during initialization
no string
lenses.kafka.settings.producer
Allow additional Kafka producer settings to be specified. When Lenses creates
an instance of KafkaProducer class it will use these properties during initialization
no string
lenses.kafka.settings.kstream
Allow additional Kafka KStreams settings to be specified
no string

The last three keys allow you to configure the settings of the consumers, producers and KStreams that Lenses uses internally. Example: lenses.kafka.settings.producer.compression.type = snappy
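
As a rough sketch of how such settings can be written, the fragment below uses the dotted form for a single key and the block form for several keys, the same two styles the example further down this page mixes for the producer. The properties shown (max.poll.records, linger.ms, num.stream.threads) are standard Kafka client and Kafka Streams options chosen purely for illustration, and the values are assumptions, not recommendations.

```hocon
# Dotted form: one setting per line (illustrative value)
lenses.kafka.settings.consumer.max.poll.records = 500

# Block form: several settings grouped under the same prefix
lenses.kafka.settings.producer {
    compression.type = snappy
    linger.ms = 50
}

# KStreams settings follow the same pattern (illustrative value)
lenses.kafka.settings.kstream {
    num.stream.threads = 2
}
```

Whichever form is used, the part of the key after lenses.kafka.settings.consumer, .producer or .kstream is the name of the underlying Kafka property.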

Default Values

Config Default
lenses.ip 0.0.0.0
lenses.port 9991
lenses.jmx.port 9992
lenses.license.file license.json
lenses.secret.file security.conf
lenses.topics.audits _kafka_lenses_audits
lenses.topics.metrics _kafka_lenses_metrics
lenses.topics.cluster _kafka_lenses_cluster
lenses.topics.profiles _kafka_lenses_profiles
lenses.topics.processors _kafka_lenses_processors
lenses.topics.alerts.storage _kafka_lenses_alerts
lenses.topics.lsql.storage _kafka_lenses_lsql_storage
lenses.topics.alerts.settings _kafka_lenses_alerts_settings
lenses.kafka.brokers PLAINTEXT://localhost:9092
lenses.zookeeper.hosts [{url:"localhost:2181", jmx:"localhost:11991"}]
lenses.zookeeper.chroot  
lenses.schema.registry.urls [{url:"http://localhost:8081", jmx:"localhost:10081"}]
lenses.connect.clusters [{name: "dev", urls: [{url:"http://localhost:8083", jmx:"localhost:11100"}], statuses: "connect-statuses", configs: "connect-configs", offsets: "connect-offsets" }]
lenses.alert.manager.endpoints  
lenses.grafana  
lenses.sql.max.bytes 20971520
lenses.sql.max.time 3600000
lenses.sql.sample.default 2
lenses.sql.sample.window 200
lenses.metrics.workers 16
lenses.offset.workers 5
lenses.sql.execution.mode IN_PROC
lenses.sql.state.dir logs/lenses-sql-kstream-state
lenses.sql.monitor.frequency 10000
lenses.kubernetes.image.name eu.gcr.io/lenses-container-registry/lenses-sql-processor
lenses.kubernetes.image.tag 2.0
lenses.kubernetes.config.file /home/lenses/.kube/config
lenses.kubernetes.service.account default
lenses.kubernetes.pull.policy IfNotPresent
lenses.kubernetes.watch.reconnect.limit 10
lenses.kubernetes.runner.mem.limit 768Mi
lenses.kubernetes.runner.mem.request 512Mi
lenses.kubernetes.runner.java.opts -Xms256m -Xmx512m -XX:MaxPermSize=128m -XX:MaxNewSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true
lenses.schema.registry.topics _schemas
lenses.schema.registry.delete false
lenses.interval.summary 10000
lenses.interval.consumers 10000
lenses.interval.partitions.messages 10000
lenses.interval.type.detection 30000
lenses.interval.user.session.ms 14400000
lenses.interval.user.session.refresh 60000
lenses.interval.schema.registry.healthcheck 30000
lenses.interval.topology.topics.metrics 30000
lenses.interval.alert.manager.healthcheck 5000
lenses.interval.alert.manager.publish 30000
lenses.interval.jmx.refresh.zk 5000
lenses.interval.jmx.refresh.sr 5000
lenses.interval.jmx.refresh.broker 5000
lenses.interval.jmx.refresh.alert.manager 6000
lenses.interval.jmx.refresh.connect 5000
lenses.interval.jmx.refresh.brokers.in.zk 5000
lenses.kafka.ws.poll.ms 1000
lenses.kafka.ws.buffer.size 10000
lenses.kafka.ws.max.poll.records 1000
lenses.kafka.ws.heartbeat.ms 30000
lenses.access.control.allow.methods GET,POST,PUT,DELETE,OPTIONS
lenses.access.control.allow.origin
lenses.allow.weak.SSL true
lenses.telemetry.enable true
lenses.curator.retries 3
lenses.curator.initial.sleep.time.ms 2000
lenses.zookeeper.max.session.ms 10000
lenses.zookeeper.max.connection.ms 10000
lenses.akka.request.timeout.ms 10000
lenses.kafka.control.topics ["connect-configs", "connect-offsets", "connect-status", "connect-statuses", "_schemas", "__consumer_offsets", "_kafka_lenses_", "lsql_", "__transaction_state"]
lenses.alert.buffer.size 100
lenses.kafka.settings.consumer {reconnect.backoff.ms = 1000, retry.backoff.ms = 1000}
lenses.kafka.settings.producer {reconnect.backoff.ms = 1000, retry.backoff.ms = 1000}
lenses.alert.manager.source Lenses
lenses.alert.manager.generator.url http://lenses
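
Most of the numeric defaults above are raw milliseconds or bytes, so it can help to write the conversion next to any override. A minimal sketch follows; the override values are illustrative assumptions only, while the defaults quoted in the comments come from the table above.

```hocon
# 8 hours = 8 * 60 * 60 * 1000 msec (default 14400000 = 4 hours)
lenses.interval.user.session.ms = 28800000

# 10 minutes = 10 * 60 * 1000 msec (default 3600000 = 1 hour)
lenses.sql.max.time = 600000

# 50 MB = 50 * 1024 * 1024 bytes (default 20971520 = 20 MB)
lenses.sql.max.bytes = 52428800
```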

Example

# Set the ip:port for Lenses to bind to
lenses.ip = 0.0.0.0
lenses.port = 9991
#lenses.jmx.port = 9992

# License file allowing connecting to up to N brokers
lenses.license.file = "license.json"

# Lenses security configuration is managed in an external file
lenses.secret.file = "security.conf"

# Topics created on start-up that Lenses uses to store state
lenses.topics.audits = "_kafka_lenses_audits"
lenses.topics.metrics = "_kafka_lenses_metrics"
lenses.topics.cluster = "_kafka_lenses_cluster"
lenses.topics.profiles = "_kafka_lenses_profiles"
lenses.topics.processors = "_kafka_lenses_processors"
lenses.topics.alerts.storage = "_kafka_lenses_alerts"
lenses.topics.lsql.storage = "_kafka_lenses_lsql_storage"
lenses.topics.alerts.settings = "_kafka_lenses_alerts_settings"

# Set up infrastructure end-points
lenses.kafka.brokers = "PLAINTEXT://localhost:9092"
lenses.zookeeper.hosts = [{url:"localhost:2181", jmx:"localhost:11991"}]
lenses.zookeeper.chroot = ""

# Optional integrations
lenses.schema.registry.urls = [{url:"http://localhost:8081", jmx:"localhost:10081"}]
lenses.connect.clusters = [{name: "dev", urls: [{url:"http://localhost:8083", jmx:"localhost:11100"}], statuses: "connect-statuses", configs: "connect-configs", offsets: "connect-offsets" }]
lenses.alert.manager.endpoints = "" # "http://host1:port1/api/v1/alerts,http://host2:port2/api/v1/alerts"
lenses.grafana = "" # "http://grafana-host:port"

# Set up Lenses SQL
lenses.sql.max.bytes = 20971520
lenses.sql.max.time = 3600000
lenses.sql.sample.default = 2 # Sample 2 messages every 200 msec
lenses.sql.sample.window = 200

# Set up Lenses workers
lenses.metrics.workers = 16
lenses.offset.workers = 5

# Set up Lenses SQL processing engine
lenses.sql.execution.mode = "IN_PROC" # "CONNECT" # "KUBERNETES"
lenses.sql.state.dir = "logs/lenses-sql-kstream-state"
lenses.sql.monitor.frequency = 10000

# Kubernetes configuration
lenses.kubernetes.image.name = "eu.gcr.io/lenses-container-registry/lenses-sql-processor"
lenses.kubernetes.image.tag = "2.0"
lenses.kubernetes.config.file = "/home/lenses/.kube/config"
lenses.kubernetes.service.account = "default"
lenses.kubernetes.pull.policy = "IfNotPresent"
lenses.kubernetes.watch.reconnect.limit = 10
lenses.kubernetes.runner.mem.limit = "768Mi"
lenses.kubernetes.runner.mem.request = "512Mi"
lenses.kubernetes.runner.java.opts = "-Xms256m -Xmx512m -XX:MaxPermSize=128m -XX:MaxNewSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true"

# Schema Registry topics and whether to allow deleting schemas in schema registry
lenses.schema.registry.topics = "_schemas"
lenses.schema.registry.delete = false

# Lenses internal refresh (in msec)
lenses.interval.summary = 10000
lenses.interval.consumers = 10000
lenses.interval.partitions.messages = 10000
lenses.interval.type.detection = 30000
lenses.interval.user.session.ms = 14400000
lenses.interval.user.session.refresh = 60000
lenses.interval.schema.registry.healthcheck = 30000
lenses.interval.topology.topics.metrics = 30000
lenses.interval.alert.manager.healthcheck = 5000
lenses.interval.alert.manager.publish = 30000

# Lenses JMX internal refresh (in msec)
lenses.interval.jmx.refresh.zk = 30000
lenses.interval.jmx.refresh.sr = 30000
lenses.interval.jmx.refresh.broker = 30000
lenses.interval.jmx.refresh.alert.manager = 30000
lenses.interval.jmx.refresh.connect = 30000
lenses.interval.jmx.refresh.brokers.in.zk = 30000

# Lenses Web Socket API
lenses.kafka.ws.poll.ms = 1000
lenses.kafka.ws.buffer.size = 10000
lenses.kafka.ws.max.poll.records = 1000
lenses.kafka.ws.heartbeat.ms = 30000

# Set access control
lenses.access.control.allow.methods = "GET,POST,PUT,DELETE,OPTIONS"
lenses.access.control.allow.origin = "*"

# Whether to allow self-signed certificates and telemetry
lenses.allow.weak.SSL = true
lenses.telemetry.enable = true

# Zookeeper connections configs
lenses.curator.retries = 3
lenses.curator.initial.sleep.time.ms = 2000
lenses.zookeeper.max.session.ms = 10000
lenses.zookeeper.max.connection.ms = 10000

lenses.akka.request.timeout.ms = 10000
lenses.kafka.control.topics = ["connect-configs", "connect-offsets", "connect-status", "connect-statuses", "_schemas", "__consumer_offsets", "_kafka_lenses_", "lsql_", "__transaction_state"]

# Set up Alerts and Integrations
lenses.alert.buffer.size = 100
lenses.alert.manager.source = "Lenses"
lenses.alert.manager.generator.url = "http://lenses"  # A unique URL identifying the creator of this alert.

# We override the aggressive defaults. Don't go too low, as it will affect performance when the cluster is down
lenses.kafka.settings.consumer {
    reconnect.backoff.ms = 1000
    retry.backoff.ms = 1000
}

# We override the aggressive defaults. Don't go too low, as it will affect performance when the cluster is down
lenses.kafka.settings.producer {
    reconnect.backoff.ms = 1000
    retry.backoff.ms = 1000
}

lenses.kafka.settings.producer.compression.type=snappy
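
The example above targets a single-node local setup. For a multi-node cluster, the endpoint entries might look like the sketch below, which follows the value formats given in the options table. All hostnames are hypothetical, the JMX ports simply reuse those from the example, and 9093 is assumed as the Alert Manager port.

```hocon
# A few broker addresses are enough to bootstrap; the rest are discovered
lenses.kafka.brokers = "kafka-1.example.com:9092,kafka-2.example.com:9092,kafka-3.example.com:9092"

# One entry per Zookeeper node, each with its connection url and JMX endpoint
lenses.zookeeper.hosts = [{url:"zk-1.example.com:2181", jmx:"zk-1.example.com:11991"},{url:"zk-2.example.com:2181", jmx:"zk-2.example.com:11991"}]

# Chroot /kafka is written without leading or trailing slashes
lenses.zookeeper.chroot = "kafka"

# Comma-separated Alert Manager endpoints (hypothetical hosts, assumed port)
lenses.alert.manager.endpoints = "http://alerts-1.example.com:9093/api/v1/alerts,http://alerts-2.example.com:9093/api/v1/alerts"
```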