6.1. Kafka Connect Query Language

The Kafka Connect Query Language is implemented in ‘antlr4’ grammar files.

6.1.1. Why ?

While working on our sink/sources we ended up producing quite complex configuration in order to support the functionality required. Imagine a Sink where you Source from different topics and from each topic you want to cherry pick the payload fields or even rename them. Furthermore you might want the storage structure to be automatically created and/or even evolve or you might add new support for the likes of bucketing (Riak TS has one such scenario). Imagine the JDBC sink with a table which needs to be linked to two different topics and the fields in there need to be aligned with the table column names and the complex configuration involved ...or you can just write this

kcql = "INSERT INTO transactions SELECT field1 as column1, field2 as column2, field3 FROM topic_A;
        INSERT INTO transactions SELECT fieldA1 as column1, fieldA2 as column2, fieldC FROM topic_B;"

6.1.1.1. Kafka Connect Query Language

There are two paths supported by this DSL. One is the INSERT that takes the following form (not all grammar is shown):

INSERT INTO $TARGET
SELECT *|columns
FROM   $TOPIC_NAME
       [IGNORE columns]
       [AUTOCREATE]
       [PK columns]
       [AUTOEVOLVE]
       [BATCH = N]
       [CAPITALIZE]
       [PARTITIONBY cola[,colb]]
       [DISTRIBUTEBY cola[,colb]]
       [CLUSTERBY cola[,colb]]
       [WITHTIMESTAMP cola|sys_time()]
       [WITHFORMAT TEXT|JSON|AVRO|BINARY|OBJECT|MAP]
       [STOREAS $YOUR_TYPE([key=value, .....])]

and a select only:

SELECT *|columns
FROM   $TOPIC_NAME
       [IGNORE columns]
       [WITHFORMAT TEXT|JSON|AVRO|BINARY]
       [WITHGROUP $YOUR_CONSUMER_GROUP]
       [WITHPARTITION (partition),[(partition, offset)]
       [SAMPLE $RECORDS_NUMBER EVERY $SLIDE_WINDOW

6.1.1.1.1. Examples

SELECT field1 FROM mytopic                    // Project one avro field named field1
SELECT field1 AS newName                      // Project and renames a field
SELECT *  FROM mytopic                        // Select everything - perfect for avro evolution
SELECT *, field1 AS newName FROM mytopic      // Select all & rename a field - excellent for avro evolution
SELECT * FROM mytopic IGNORE badField         // Select all & ignore a field - excellent for avro evolution
SELECT * FROM mytopic PK field1,field2        // Select all & with primary keys (for the sources where primary keys are required)
SELECT * FROM mytopic AUTOCREATE              // Select all and create the target Source (table for databases)
SELECT * FROM mytopic AUTOEVOLVE              // Select all & reflect the new fields added to the avro payload into the target