3. Configure Lenses

Lenses is configured through a single file, lenses.conf. All options live inside the top-level lenses block, which is divided into sections. This section describes the main options; the full list is given in the Config Reference (3.11).

lenses {
  ............
}

Once downloaded and extracted, the Lenses folder contains:

.
├── LICENSE.txt
├── NOTICE.txt
├── bin
├── jre8u131
├── lenses.conf
├── lib
├── licenses
├── logback-debug.xml
└── logback.xml

The bin folder contains the start scripts.

To run Lenses with a specific configuration, pass the full path of the configuration file to the start script:

bin/lenses lenses.conf

3.1. Java Options

The following environment variables control the Java configuration options when starting Lenses:

  • LENSES_HEAP_OPTS - The heap space settings; the default is -Xmx3g -Xms512m
  • LENSES_JMX_OPTS - The JMX options to set
  • LENSES_LOG4J_OPTS - Logging options
  • LENSES_PERFORMANCE_OPTS - Any extra options; the default is -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true
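
As an illustration, a larger heap can be requested by exporting the variable in the shell that launches the start script. This is a minimal sketch: the 4g/1g sizes are example values assumed for illustration, not recommendations, and the start command is left commented out so the snippet has no side effects.

```shell
# Override the default heap settings (-Xmx3g -Xms512m).
# The sizes below are illustrative values, not tuning advice.
export LENSES_HEAP_OPTS="-Xmx4g -Xms1g"

# Show the value the start script will inherit.
echo "LENSES_HEAP_OPTS=${LENSES_HEAP_OPTS}"

# Then start Lenses as usual:
# bin/lenses lenses.conf
```

The other variables in the list can be exported the same way before the start script is run.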

3.2. Logging

Lenses uses Logback for logging. By default, logback.xml is picked up from the installation folder. You can either edit this file in place or point Lenses at your own configuration file by exporting the LENSES_LOG4J_OPTS variable:

export LENSES_LOG4J_OPTS="-Dlogback.configurationFile=file:mylogback.xml"

3.2.1. Changing log levels

Logback can reload logback.xml while Lenses is running. The file is scanned for changes every 30 seconds; the interval can be adjusted through the scanPeriod attribute of the configuration element:

<configuration scan="true" scanPeriod="30 seconds" >
  ...
</configuration>

To change a log level, edit the level of the root logger or of the relevant logger entry.

The default logger levels are:

<logger name="akka" level="INFO"/>
<logger name="org.apache.zookeeper.ClientCnxn" level="ERROR"/>
<logger name="com.typesafe.sslconfig.ssl.DisabledComplainingHostnameVerifier" level="ERROR"/>
<logger name="org.apache.kafka.clients.consumer.ConsumerConfig" level="ERROR"/>
<logger name="org.apache.kafka.common.utils.AppInfoParser" level="WARN"/>
<logger name="org.apache.kafka.clients.consumer.internals.AbstractCoordinator" level="WARN"/>
<logger name="io.confluent.kafka.serializers.KafkaAvroDeserializerConfig" level="WARN"/>
<logger name="org.I0Itec.zkclient" level="WARN"/>
<logger name="org.apache.zookeeper" level="WARN"/>
<logger name="org.apache.calcite" level="OFF"/>

The default pattern is %d{ISO8601} %-5p [%c{2}:%L] %m%n.

3.2.2. Log location

All the logs Lenses produces can be found in the logs directory. However, we recommend following the Twelve-Factor App approach and logging to stdout, especially when using a container orchestration engine such as Kubernetes. Leave log collection to agents such as Filebeat, Logstash, Fluentd and Flume.
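
The stdout approach can be sketched as follows: write a console-only Logback configuration and point Lenses at it through LENSES_LOG4J_OPTS. The file name mylogback.xml is the one used in the earlier example, and the pattern is the default pattern listed above; the appender name STDOUT and the INFO root level are assumptions made for this example.

```shell
# Write a minimal Logback configuration that logs to stdout only.
# The appender name "STDOUT" and the INFO root level are illustrative choices.
cat > mylogback.xml <<'EOF'
<configuration scan="true" scanPeriod="30 seconds">
  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <pattern>%d{ISO8601} %-5p [%c{2}:%L] %m%n</pattern>
    </encoder>
  </appender>
  <root level="INFO">
    <appender-ref ref="STDOUT"/>
  </root>
</configuration>
EOF

# Point Lenses at the file; a full path avoids depending on the start directory.
export LENSES_LOG4J_OPTS="-Dlogback.configurationFile=file:${PWD}/mylogback.xml"
echo "$LENSES_LOG4J_OPTS"
```

Logger entries such as the defaults listed in the previous section can be added inside the configuration element as needed.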

3.3. Ports

At startup, Lenses binds to the IP address and port set in the configuration file. To change them, set the ip and port configuration entries. By default, Lenses binds to 0.0.0.0 on port 9991.

lenses {
    # Set the ip:port for Lenses to bind to
    ip = 0.0.0.0
    port = 9991
}

3.4. License

Lenses requires a license file, which can be obtained from us. Once you have received your license token, store it in a file (e.g. license.json) and set the license.file entry to point to it. Provide the full file path in the configuration.

lenses {

  ...

  # License file allowing connecting to up to N brokers
  license.file="license.json"
}

The licenses folder contains the third-party licenses for all the software libraries used by Lenses. A complete list is also available at https://landoop.com/third-party-software

3.5. System Topics

Lenses requires a number of system topics for monitoring, auditing, user profiles and processor information. Lenses creates these topics at startup; their names are set in the topics configuration block:

 lenses {

    ...

    # topics created on start-up which Lenses uses to store state

    topics {
      audits = "_kafka_lenses_audits"
      metrics = "_kafka_lenses_metrics"
      profiles = "_kafka_lenses_profiles"
      processors = "_kafka_lenses_processors"
    }
}

Warning

Do not alter the configuration of these topics; they are managed by Lenses.

If ACLs are enabled on your Kafka cluster, grant the user running Lenses, and the host it runs on, permission to manage the topics _kafka_lenses_audits, _kafka_lenses_metrics, _kafka_lenses_profiles and _kafka_lenses_processors. Run the command below once per topic, replacing topic with the topic name:

kafka-acls \
--authorizer-properties zookeeper.connect=my_zk:2181 \
--add \
--allow-principal User:Lenses \
--allow-host lenses-host \
--operation Read \
--operation Write \
--operation Alter \
--operation Delete \
--topic topic
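
Since the same command has to be issued for each of the four system topics, it can be convenient to generate the invocations in a loop. The sketch below only prints the commands (a dry run) so they can be reviewed first; my_zk:2181, User:Lenses and lenses-host are the placeholders from the command above, and the function name acl_commands is hypothetical.

```shell
# System topic names taken from the topics block above.
LENSES_TOPICS="_kafka_lenses_audits _kafka_lenses_metrics _kafka_lenses_profiles _kafka_lenses_processors"

# Print one kafka-acls invocation per system topic (dry run only).
# Replace the placeholder ZooKeeper address, principal and host with your own.
acl_commands() {
  for topic in $LENSES_TOPICS; do
    printf 'kafka-acls --authorizer-properties zookeeper.connect=%s --add --allow-principal %s --allow-host %s --operation Read --operation Write --operation Alter --operation Delete --topic %s\n' \
      "my_zk:2181" "User:Lenses" "lenses-host" "$topic"
  done
}

acl_commands
```

Once the printed commands match your environment, the output can be piped to sh to apply them.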

3.6. JMX Monitoring

3.6.1. Enabling JMX

JMX should be enabled for Kafka Brokers, Schema Registries and ZooKeeper nodes. To enable JMX, set the JMX_PORT variable. Additional options, such as the Prometheus JMX exporter agent in the example below, can be set via the KAFKA_OPTS environment variable.

export JMX_PORT=9000
export KAFKA_OPTS=-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent-0.7.jar=9001:/etc/jmx_exporter/jmx.yml

3.6.2. Configuring Lenses

Lenses integrates with and monitors Kafka Brokers, ZooKeeper nodes, Schema Registries and Kafka Connect clusters. To configure which clusters to monitor, set the respective endpoints in the lenses.conf file, and use the jmx configuration section to tell Lenses about the JMX endpoints. An example of such a configuration:

  lenses {

    ...


    # Set up infrastructure end-points

    kafka.brokers        = "PLAINTEXT://host1:9092,PLAINTEXT://host2:9092,PLAINTEXT://host3:9092"
    zookeeper.hosts      = "zkhost1:2181,zkhost2:2181,zkhost3:2181"
    schema.registry.urls = "http://schema-host:8081"
    connect = [
      {dev: "http://kafka-connect-dev-host-or-ip:connect-port"},
      {uat: "http://connect1:8082,http://connect2:8082,http://connect3:8082"}
    ]

    # Set up monitoring end-points

    jmx {
      brokers = "host1:broker-jmx-port,host2:broker-jmx-port,host3:broker-jmx-port"
      schema.registry = "schema-host:jmx-port"
      zookeepers = "zkhost1:jmx-port,zkhost2:jmx-port,zkhost3:jmx-port"
      connect = [ {dev: "kafka-connect-dev-host-or-ip:jmx-port"} ]
    }
}

3.6.3. Expose Lenses JMX

Lenses also exposes its own JMX endpoint so that other systems can monitor it. To enable it, set the jmx.port option; to disable it, comment out the entry. The Prometheus JMX exporter (https://github.com/prometheus/jmx_exporter) can also be used to make Lenses metrics available to Prometheus. A JMX exporter configuration file, jmx_config.yaml, is provided in etc. The exporter runs as a Java agent and can be set via the LENSES_OPTS environment variable:

export LENSES_OPTS="-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent-0.7.jar=9102:/etc/jmx_exporter/config.yaml"

3.7. SQL Configuration

The Lenses SQL Engine allows users to browse topics, and to build and execute Kafka Streams flows, with a SQL-like syntax. Three execution modes are currently available: IN_PROC, CONNECT and KUBERNETES. The last two are available only in the Enterprise edition and offer configurable, reliable scale-out of Kafka Streams apps built via Lenses SQL.

The execution mode of the SQL Engine is set in the sql section. Running in CONNECT mode requires a SQL-enabled Kafka Connect cluster, that is, one with the Lenses SQL Connector installed. The best way to achieve this is via the isolated classpath loader introduced by the Connect framework in Kafka 0.11:

  1. Create a folder called plugins/lib and place the Lenses SQL Connector jar inside.
  2. Set the plugin.path in the worker properties file to this location.
  3. Restart the Connect worker.

Warning

This must be done for all workers in the cluster.

# create the folder
mkdir -p plugins/lib

# copy in the jar
cp lenses-sql-runners-XXX-all.jar plugins/lib

# add the plugin path to the worker properties file; ensure this is the only uncommented plugin.path entry
echo "plugin.path=$PWD/plugins/lib" >> worker.properties

# restart the workers
bin/connect-distributed worker.properties

If you are using Kafka 0.10.x, plugin.path classloader isolation is not available. Instead, set the connector jar as the first entry in the Java classpath:

export CLASSPATH=lenses-sql-runners-XX-all.jar

The Lenses SQL Connector returns metrics and state information from the KStreams application instances it is running. The frequency of these updates is controlled by the connect.metric.return.frequency configuration entry.

To support the Kafka Connect pause functionality and stop the SQL processors inside the connector, Lenses also needs to know the status topic configured for the target Connect cluster; the default is connect-statuses. You can find it in the worker.properties file used to start the Connect workers. Look for this entry:

status.storage.topic=connect-statuses

An example sql section for the CONNECT execution mode:

  lenses {

    ...

    # sql.execution.mode can be IN_PROC or CONNECT (to scale-out)
    # You need to deploy lenses-SQL connector to the classpath, and set your connect clusters
    sql {
      execution.mode = "CONNECT"
      connect.clusters = [ {sql: "https://ip-or-host/api/kafka-connect"} ]
      # Metric return frequency in milliseconds for connect
      connect.metric.return.frequency = 5000
      # Metric topic
      metric.topic = _kafka_lenses_metrics
      # Location to store the kstreams local state
      state.dir = logs/kstreams-state-store-dir
    }
}

Warning

For Enterprise versions, the lenses.sql.state.dir directory must be available on all workers of every SQL-enabled Connect cluster, and lenses.sql.status.topic must be set to the status backing topic of the Connect cluster.

Important

The connector artifacts are delivered after an initial commercial agreement.

3.8. Security

Lenses has support for the following security modes: BASIC and LDAP. The mode is configured through the security.mode option. If set to BASIC the users and roles can be set via the security.users option. For example:

lenses {

  ...

  security {
    mode = BASIC
    users = [
              {"username": "admin", "password": "admin", "displayname": "Lenses Admin", "roles": ["admin", "write", "read"]},
              {"username": "write", "password": "write", "displayname": "Write User", "roles": ["write", "read"]},
              {"username": "read", "password": "read", "displayname": "Read Only", "roles": ["read"]}
            ]
  }
}

If you want to map LDAP roles to Lenses roles you have to set security.mode value to LDAP and use the ldap configuration section to set the mapping:

security {
  mode = LDAP
  ldap {
    url = your_ldap_url
    base = your_base
    user = your_ldap_user
    password = your_ldap_password
    login.filter = the_login_filter
    memberOf.key = the_member_of_ldap_entry
    group.extract.regex = the_group_name_regex_extractor
    roles {
      admin = []
      read = []
      write = []
      nodata = []
    }
  }
}

Note

The properties admin, read, write and nodata must list LDAP roles, not users.

| Key | Description | Optional | Type | Default |
|---|---|---|---|---|
| url | The LDAP server URL. For example: ldap://mycompany.com:10389 | No | String | N/A |
| base | Your LDAP base. For example: dc=jboss,dc=org | No | String | N/A |
| user | Your LDAP user. For example: uid=admin,ou=system | No | String | N/A |
| password | Your LDAP user password. | No | String | N/A |
| login.filter | The LDAP search filter; it must return a unique result. See the default value. <user> is required since it is replaced at runtime with the current user id. | Yes | String | (&(objectClass=person)(sAMAccountName=<user>)) |
| memberOf.key | Your LDAP member-of key entry. This is the key under which a role is attached to the user entry. For example, memberOf: cn=AdminR,ou=Groups,dc=jboss,dc=org links the AdminR role to the current user entry. | Yes | String | memberOf |
| group.extract.regex | The regular expression used to extract the role from each memberOf entry (see above). The default value matches the earlier example for memberOf. | Yes | String | (?i)CN=(\\w+),ou=Groups.* |
| roles.admin | The list of LDAP roles to which you want to grant admin rights | No | String[] | N/A |
| roles.write | The list of LDAP roles to which you want to grant write rights | No | String[] | N/A |
| roles.read | The list of LDAP roles to which you want to grant read rights | No | String[] | N/A |
| roles.nodata | The list of LDAP roles to which you want to grant nodata rights | No | String[] | N/A |
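
To make the role extraction concrete, the sketch below applies an approximation of the default group.extract.regex to the memberOf example from the table. It is only an illustration in POSIX sed: the case-insensitive flag (?i) and \w of the original expression are spelled out as explicit character classes, the pattern is anchored at the start of the value (an assumption), and how Lenses applies the expression internally is not shown here.

```shell
# Example memberOf value taken from the table above.
member_of="cn=AdminR,ou=Groups,dc=jboss,dc=org"

# Approximate (?i)CN=(\w+),ou=Groups.* with explicit character classes,
# and print only the first capture group (the role name).
role=$(printf '%s\n' "$member_of" \
  | sed -nE 's/^[Cc][Nn]=([A-Za-z0-9_]+),[Oo][Uu]=[Gg][Rr][Oo][Uu][Pp][Ss].*$/\1/p')

echo "$role"   # prints AdminR
```

The extracted name (AdminR here) is what would then be listed under roles.admin, roles.write, roles.read or roles.nodata.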

3.8.1. JAAS

For Lenses to access Kafka in an environment set up with Kerberos (SASL), you need to create a standard JAAS file and pass it to Lenses at startup:

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/full/path/keytab-file"
  storeKey=true
  useTicketCache=false
  serviceName="kafka"
  principal="kafka@MYREALM";
};

Then set the following option before starting Lenses:

export LENSES_OPTS="-Djava.security.auth.login.config=/opt/lenses/jaas.conf"
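
Note that the Expose Lenses JMX section also sets LENSES_OPTS, so exporting the variable twice would overwrite the first value. A minimal sketch of combining both options follows; it assumes LENSES_OPTS is passed to the JVM command line as a space-separated list, which is not confirmed by this document. JAAS_OPT and AGENT_OPT are hypothetical helper variables.

```shell
# Options copied from the JAAS example above and the Expose Lenses JMX section.
JAAS_OPT="-Djava.security.auth.login.config=/opt/lenses/jaas.conf"
AGENT_OPT="-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent-0.7.jar=9102:/etc/jmx_exporter/config.yaml"

# Assumption: the start script forwards LENSES_OPTS to the JVM unchanged,
# so several options can be separated by spaces.
export LENSES_OPTS="${JAAS_OPT} ${AGENT_OPT}"
echo "$LENSES_OPTS"
```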

3.9. Example

lenses {

  # Set the ip:port for Lenses to bind to
  ip = 0.0.0.0
  port = 9991

  # The port to start the JMX agent to allow Lenses to be monitored - comment to disable
  jmx.port = 9015

  # License file allowing connecting to up to N brokers
  license.file="license.json"

  # topics created on start-up that Lenses uses to store state

  topics {
    audits = "_kafka_lenses_audits"
    metrics = "_kafka_lenses_metrics"
    profiles = "_kafka_lenses_profiles"
    processors = "_kafka_lenses_processors"
  }

  # Set up infrastructure end-points

  kafka.brokers        = "PLAINTEXT://host1:9092,PLAINTEXT://host2:9092,PLAINTEXT://host3:9092"
  zookeeper.hosts      = "zkhost1:2181,zkhost2:2181,zkhost3:2181"
  schema.registry.urls = "http://schema-host:8081"
  connect = [
    {dev: "http://kafka-connect-dev-host-or-ip:connect-port"},
    {uat: "http://connect1:8082,http://connect2:8082,http://connect3:8082"}
  ]

  # Set up monitoring end-points

  jmx {
    brokers = "host1:jmx-port,host2:jmx-port,host3:jmx-port"
    schema.registry = "schema-host:jmx-port"
    zookeepers = "zkhost1:jmx-port,zkhost2:jmx-port,zkhost3:jmx-port"
    connect = [ {dev: "kafka-connect-dev-host-or-ip:jmx-port"} ]
  }

  # sql.execution.mode can be IN_PROC or CONNECT (to scale-out)
  # You need to deploy Lenses SQL connector to the classpath, and set your connect clusters
  sql {
    execution.mode = "CONNECT"
    connect.clusters = [ {sql: "https://ip-or-host/api/kafka-connect"} ]
    # Metric return frequency in milliseconds for connect
    connect.metric.return.frequency = 5000
    # Metric topic
    metric.topic = _kafka_lenses_metrics
    # Location to store the kstreams local state
    state.dir = logs/kstreams-state-store-dir
  }
}

3.10. Lenses Instances

We recommend running one instance of Lenses per Kafka cluster. Running multiple instances against the same infrastructure could, if not configured appropriately, result in SQL processors duplicating data.

A free Lenses Development Environment is also available for download.

3.11. Config Reference

| Config | Description | Required | Type |
|---|---|---|---|
| lenses.license.file | The full path to the license file | yes | string |
| lenses.akka.system.name | Akka HTTP actor system name | no | string |
| lenses.ip | Bind HTTP at the given endpoint. Used in conjunction with lenses.port | no | string |
| lenses.port | The port the HTTP server listens on for connections | no | int |
| lenses.akka.request.timeout.ms | The maximum time in milliseconds to wait for an Akka Actor to reply | no | int |
| lenses.akka.actors.warmup.ms | Time to allow the Lenses actors to warm up on startup | no | int |
| lenses.zookeeper.hosts | Specifies the ZooKeeper connection string. Use the form hostname:port, where hostname and port are the host and port of a ZooKeeper server. Provide multiple hosts in the form hostname1:port1,hostname2:port2,hostname3:port3. You can append a /znode path if you are using one | yes | string |
| lenses.zookeeper.max.session.ms | The maximum time in milliseconds to wait for the ZooKeeper server to reply to a request. The implementation requires that the timeout be at least 2 times the tickTime (as set in the server configuration) | no | int |
| lenses.curator.initial.sleep.time.ms | The initial amount of time to wait between retries when reading JMX metrics for the brokers, Schema Registry, Connect and ZooKeeper services | no | int |
| lenses.curator.retries | The number of attempts to read the metrics from the JMX endpoints for the brokers, Schema Registry, ZooKeeper and Connect services | no | int |
| lenses.zookeeper.max.connection.ms | The time in milliseconds to wait for the ZooKeeper client to establish a new connection | no | int |
| lenses.kafka.brokers | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping; this list only impacts the initial hosts used to discover the full set of servers. The list should be in the form host1:port1,host2:port2,... Since these servers are only used for the initial connection to discover the full cluster membership (which may change dynamically), the list does not need to contain all brokers | yes | string |
| lenses.schema.registry.urls | A list of host/port pairs to use for establishing the connection to the Schema Registry cluster. The list should be in the form host1:port1,host2:port2,... List all instances in case some of them are down, or list the load balancer address if one is used | yes | string |
| lenses.connect | The Kafka Connect clusters to use. A list of key/value pairs of cluster name and host:port | no | string |
| lenses.schema.registry.check.ms | The interval in milliseconds at which to check that the Schema Registry instances are up and running | no | long |
| lenses.kafka.control.topics | List of Kafka topics to be marked as system topics | no | string |
| lenses.kafka.ws.max.poll.records | Part of the WebSocket functionality for Kafka. Specifies the maximum number of records returned in a single call to poll(). It affects how many records are pushed at once to the WS client | no | int |
| lenses.sql.connect.clusters | The Connect clusters to use if lenses.sql.execution.mode is set to CONNECT. A list of key/value pairs of cluster name and host:port | no | string |
| lenses.sql.execution.mode | The SQL execution mode, IN_PROC or CONNECT | yes | string |
| lenses.sql.metric.topic | The topic to listen on for SQL Connector KStream metrics | no | string |
| lenses.sql.connect.metric.return.frequency | The frequency, in milliseconds, at which a SQL Connector should return metrics | no | long |
| lenses.sql.status.topic | The status backing topic of the SQL-enabled Connect clusters | no | string |
| lenses.sql.state.dir | Directory location of the state store for KStream flows. This location must exist on each worker of a SQL-enabled Kafka Connect cluster | no | string |
| lenses.interval.consumers | The interval in milliseconds at which to read the information about all consumers from Kafka | no | int |
| lenses.kafka.settings.consumer | Allows additional Kafka consumer settings to be specified. When Lenses creates an instance of the KafkaConsumer class it uses these properties during initialization | no | string |
| lenses.kafka.settings.producer | Allows additional Kafka producer settings to be specified. When Lenses creates an instance of the KafkaProducer class it uses these properties during initialization | no | string |
| lenses.kafka.settings.kstream | Allows additional Kafka KStream settings to be specified. When Lenses creates an instance of the KStream class it uses these properties during initialization | no | string |
| lenses.kafka.read.max.polling.ms | When reading data from a Kafka topic, the time in milliseconds the call to the brokers blocks waiting for new records | no | int |
| lenses.kafka.read.max.records | The maximum number of records to read from a Kafka topic at once. Used for the topic browsing functionality | no | int |
| lenses.access.control.allow.origin | Restricts cross-origin HTTP requests made from within the Lenses client | no | string |
| lenses.access.control.allow.methods | Restricts the HTTP verbs allowed to initiate a cross-origin HTTP request made from within the Lenses client | no | string |
| lenses.allow.weak.SLL | Allows connecting to https:// services even if they are using self-signed certificates | no | boolean |
| lenses.sqlSample | Number of messages to take in every sampling attempt | no | int |
| lenses.sqlSampleWindow | How frequently to sample a topic for new messages | no | int |
| lenses.kafka.ws.poll.ms | Part of the WebSocket functionality for Apache Kafka. The amount of time to wait before reading records from the Kafka consumer | no | int |
| lenses.kafka.ws.consumer.buffer.size | Part of the WebSocket functionality for Apache Kafka. The number of records to cache for the client. If the client can't consume them fast enough, records will be dropped | no | int |
| lenses.kafka.ws.heartbeat.ms | Part of the WebSocket functionality. The interval at which to send messages to the client in order to keep the TCP connection open | no | int |
| lenses.security.mode | Establishes which mechanism to use for authentication/authorization. Available values are NONE, BASIC and LDAP. For NONE, all requests are allowed. BASIC uses a user-password-role approach. If LDAP is chosen, authentication is made against the given LDAP server and authorization is based on LDAP groups | no | string |
| lenses.security.users | Only applies if lenses.security.mode is set to BASIC. The entry should contain an array of user structures defining the user, password and access rights. Lenses supports the following roles: Admin, Write, Read | no | array |
| lenses.security.ldap.url | Only applies if lenses.security.mode is set to LDAP. The URL of the LDAP server | no | string |
| lenses.security.ldap.base | Only applies if lenses.security.mode is set to LDAP. The base suffix from which all operations should originate. If a base suffix is set, you will not have to (and, indeed, must not) specify the full distinguished names in any operations performed | no | string |
| lenses.security.ldap.user | Only applies if lenses.security.mode is set to LDAP. The user distinguished name (principal) to use for getting authenticated contexts | no | string |
| lenses.security.ldap.password | Only applies if lenses.security.mode is set to LDAP. The password (credentials) to use for getting authenticated contexts when querying LDAP | no | string |
| lenses.security.ldap.login.filter | The LDAP search filter; it must return a unique result. See the default value. <user> is required since it is replaced at runtime with the current user id | no | string |
| lenses.security.ldap.memberOf.key | Your LDAP member-of key entry. This is the key under which a role is attached to the user entry. For example, memberOf: cn=AdminR,ou=Groups,dc=jboss,dc=org links the AdminR role to the current user entry | no | string |
| lenses.security.ldap.group.extract.regex | The regular expression used to extract the role from each memberOf entry (see above). The default value matches the earlier example for memberOf | no | string |
| lenses.security.ldap.roles.admin | Only applies if lenses.security.mode is set to LDAP. The LDAP groups granted full admin rights in Lenses | no | string[] |
| lenses.security.ldap.roles.read | Only applies if lenses.security.mode is set to LDAP. The LDAP groups granted read access in Lenses | no | string[] |
| lenses.security.ldap.roles.nodata | Only applies if lenses.security.mode is set to LDAP. The LDAP groups granted access without seeing data | no | string[] |
| lenses.alert.buffer.size | The number of most recently raised alerts to keep. The client can request the raised alerts, so the response size is linked directly to this number and to how many alerts have been raised | no | int |
| lenses.alert.consumers.lag.threshold | The threshold, in milliseconds, beyond which an alert is raised for a consumer that is too slow reading data from a topic and partition | no | int |
| lenses.jmx.port | The port on which to start the JMX agent that allows Lenses to be monitored | no | int |
| lenses.jmx.brokers | The JMX endpoints of the Kafka Brokers. The format is hostname:port[,hostname:port] | no | string |
| lenses.jmx.zookeepers | The JMX endpoints of the ZooKeeper instances. The format is hostname:port[,hostname:port] | no | string |
| lenses.jmx.connect | The JMX endpoints of the Kafka Connect instances. The format is hostname:port[,hostname:port] | no | string |
| lenses.jmx.schema.registry | The JMX endpoints of the Schema Registry instances. The format is hostname:port[,hostname:port] | no | string |
| lenses.jmx.kafka.lenses | The JMX endpoints of the Lenses instances. The format is hostname:port[,hostname:port] | no | string |

Default Values

| Config | Default |
|---|---|
| lenses.ip | 0.0.0.0 |
| lenses.port | 9991 |
| lenses.jmx.port | 9015 |
| lenses.akka.system.name | lenses |
| lenses.akka.request.timeout.ms | 1000 |
| lenses.akka.actors.warmup.ms | 2000 |
| lenses.zookeeper.max.session.ms | 10000 |
| lenses.zookeeper.max.connection.ms | 10000 |
| lenses.curator.initial.sleep.time.ms | 3000 |
| lenses.curator.retries | 3 |
| lenses.schema.registry.check.ms | 30000 |
| lenses.kafka.ws.max.poll.records | 1000 |
| lenses.sql.execution.mode | IN_PROC |
| lenses.sql.metric.topic | _kafka_lenses_metrics |
| lenses.sql.connect.metric.return.frequency | 5000 |
| lenses.sql.status.topic | connect-statuses |
| lenses.sql.state.dir | logs/lenses-kafka-streams |
| lenses.interval.consumers | 30000 |
| lenses.kafka.read.max.polling.ms | 1000 |
| lenses.kafka.read.max.records | 1000 |
| lenses.access.control.allow.origin | * |
| lenses.access.control.allow.methods | GET,POST,PUT,DELETE,OPTIONS |
| lenses.allow.weak.SLL | true |
| lenses.sqlSample | 2 |
| lenses.sqlSampleWindow | 200 |
| lenses.kafka.ws.poll.ms | 1000 |
| lenses.kafka.ws.consumer.buffer.size | 10000 |
| lenses.kafka.ws.heartbeat.ms | 30000 |
| lenses.security.mode | BASIC |
| lenses.security.ldap.login.filter | (&(objectClass=person)(sAMAccountName=<user>)) |
| lenses.security.ldap.memberOf.key | memberOf |
| lenses.security.ldap.group.extract.regex | (?i)CN=(\w+),ou=Groups.* |
| lenses.alert.buffer.size | 100 |
| lenses.alert.consumers.lag.threshold | 10000 |
| lenses.kafka.control.topics | connect-configs, connect-offsets, connect-status, _schemas, __consumer_offsets |