Confluent Kafka KSQL 5.0.0 Review & How-To

By Tim Panagos

KSQL is a service that adds a SQL-like query capability on top of Kafka streams. It is maintained by Confluent, the commercial company behind Apache Kafka. Confluent announced KSQL in August 2017, and a little over a year later they have released version 5.0.0 of the service. In this post, I review KSQL 5.0.0 and show you how to install and exercise some of its key features.

We use Kafka in the Microshare SaaS architecture to feed streams of events into Robots, Device Unpackers, and other microservices. We also use MongoDB as the backbone of our query processing engine. MongoDB performance is an ongoing struggle. I am always looking for ways to offload responsibility from MongoDB to another part of the architecture.

In that vein, I did a deep dive into one of the newer additions to the Kafka suite of tools developed by Confluent, KSQL version 5.0.0.

Our Kafka topics are populated with JSON structured content exclusively. We chose JSON rather than AVRO because as an omnivorous, multi-tenant data platform, we cannot depend on static schemas for the data that passes through the architecture. While we impose standardized annotations and promote our Data Domain Modules architecture as a way to easily impose consistency on varied sensor data, we cannot rely on tools or approaches that depend on schema stability. For that reason, we have not been able to take advantage of other Confluent suite technologies like Kafka Connect, which relies heavily on the Confluent Schema Registry service.

I was really hoping that KSQL would be less strict about schemas. Indeed, there are scalar functions in the documentation that gave me hope that we would be able to offload some of the stream filtering and batch query processing in our workload.

Overall, KSQL is a tool that is still evolving and needs further maturation. The documentation is decent, but incomplete. The tutorials and examples tend to stick to very simple use-cases. There are few external blog posts that cover the technology in any depth. There is a growing body of detail to be found on StackOverflow (tags: Kafka and KSQL) with expert answers from Confluent staff. For a tool that was only announced a year ago, it is not bad.

Recent changes to the codebase for KSQL seem to have broken important features related to the handling of JSON content. I will cover those issues below, as well as show the sort of workarounds that are possible.

As an expedient to experimentation, I found it useful to load some messages into my topic using a tool called kafkacat. I use OSX as a personal development platform, so installing kafkacat is easy using the Homebrew package manager.
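
With Homebrew installed, it is a one-liner (the formula name at the time of writing):

    # install the kafkacat CLI via Homebrew
    brew install kafkacat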

After installing kafkacat, you can use it to inject messages into a target Kafka topic using the -P flag (for Producer) and read messages from a topic using the -C flag (for Consumer). Since I have a healthy stream of live data in our development server, I found it convenient to pipe some messages from our development Kafka server into my localhost Kafka topics. The following commands made it easy to move the data. You can see that it is a Consumer call piped into a Producer call with different source and destination servers. Of course, you can move data between topics on the same server, pipe data from a text file, or simply type data from stdin. Consult kafkacat --help for other usages.
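
Here is a sketch of the pair of commands I mean; the broker hostname and topic name are placeholders for your own:

    # consume everything from the dev topic (-C) and exit at the end of the topic (-e),
    # piping it into a producer (-P) against the local broker
    kafkacat -C -b dev-kafka.example.com -t create-json -e | \
      kafkacat -P -b localhost -t create-json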

Notice that the hosts do not have the port appended in the example. I found these commands online with the port annotation (e.g., localhost:9092) included, and this generated the following confusing error:

For this exercise, I will base the queries on a simplified JSON body based on the JSON object returned by a typical /share call in the Microshare API. You need not be familiar with the API or the JSON format to make sense of what follows. The excerpt is from a typical unpacked sensor reading from a LoRaWAN-compatible sensor. For more information on our standardized formats for sensor data, see our documentation on the Data Domain Modules.
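
An illustrative excerpt (abbreviated, with placeholder values rather than a complete Microshare record) looks something like this:

    {
      "id": "5b9f3d2e-0000-0000-0000-000000000000",
      "recType": "io.microshare.demo.sensor",
      "tstamp": 1537365600000,
      "updaterId": "user@example.com",
      "obj": {
        "tags": ["lorawan", "demo"],
        "data": {
          "ipso": {
            "3300": { "5534": 42 }
          }
        }
      }
    }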

For my deep dive, I used the KSQL-CLI tool to exercise the functionality. KSQL-CLI is installed along with the Confluent 5.0 Open Source distribution package. If you are running the Confluent services locally, you can simply start KSQL-CLI in a terminal window by typing:
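
    # from the Confluent 5.0 bin directory; the CLI talks to the KSQL server's
    # REST endpoint, which listens on port 8088 by default
    ksql http://localhost:8088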

By default, the KSQL-CLI will process queries on newly streamed input. That means that you run a query (the CLI will idle), publish messages to your Kafka topic, and then you will see the results of the query appear in the CLI as the logic is applied to the newly arriving messages. So, if you find that you type a command or query and the CLI seems to hang…it is waiting for input to the source topic. A CTRL-C will break you out of the stream process and bring you back to the CLI prompt.

I found it very useful to have queries run immediately from the beginning of my test queue (as set up with kafkacat above). To do this, you can set the default message offset:
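
    -- read topics from the earliest offset so existing test messages are processed
    SET 'auto.offset.reset' = 'earliest';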

From here, you have a KSQL-CLI session ready to begin defining your first Streams and Tables. I will start with a simple Stream definition based on a simplified JSON message:
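
    -- json_stream is a placeholder name; declare only the two top-level fields of interest
    CREATE STREAM json_stream (id VARCHAR, recType VARCHAR)
      WITH (KAFKA_TOPIC='create-json', VALUE_FORMAT='JSON');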

This command has created a Stream definition in our session that parses messages in the kafka_topic called “create-json” and extracts two values from the JSON: the id and the recType.

You can see your created Streams using the List command:
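
    SHOW STREAMS;
    -- LIST STREAMS; works as well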

You can see the structure of your Stream with the Describe command:
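
    DESCRIBE json_stream;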

For runtime statistics and query details run:
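
    DESCRIBE EXTENDED json_stream;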

If you need a mulligan and want to scrap your Stream, use the DROP command:
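
    -- fails if a persistent query is still using the stream; TERMINATE that query first
    DROP STREAM json_stream;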

You can now send SQL-like queries against your Stream. The simplest SELECT would look like this:
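
    -- a continuous query: rows appear as new messages arrive on the topic
    SELECT id, recType FROM json_stream;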

Notice that you will not automatically drop back to the CLI command prompt. Here, the SELECT is being continuously applied to the stream of data entering your Kafka topic. If you run your kafkacat command again from another terminal, you should see additional rows appear in the output of your KSQL-CLI session. To terminate the query and get back to your prompt, use CTRL-C.

You can filter the results with SQL WHERE clauses:
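
    -- the recType value here is only illustrative
    SELECT id, recType
      FROM json_stream
      WHERE recType = 'io.microshare.demo.sensor';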

Let’s create a more complex Stream and experiment with what is possible. With our nested JSON message format, the approach of extracting JSON fields using scalar functions is currently not working. We have two workaround approaches that are functional but, to my mind, inferior. Since this JSON Object itself contains nested JSON Objects, we can use the STRUCT or MAP datatypes to drill down through the JSON structure to a leaf node like the count contained at the obj.data.ipso.3300.5534 node.

STRUCT

In this set of examples, we are building a STRUCT data column from the JSON Object in the obj field of the message and each subsequent level of the JSON Object hierarchy that interests us.
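
A sketch of the sort of definition I mean (struct_stream is a placeholder name; I also declare the top-level tstamp and the obj.tags array here because I will use them in later examples):

    CREATE STREAM struct_stream (
        id VARCHAR,
        recType VARCHAR,
        tstamp BIGINT,
        obj STRUCT<
              tags ARRAY<VARCHAR>,
              data STRUCT<
                ipso STRUCT<
                  "3300" STRUCT<
                    "5534" INTEGER>>>>)
      WITH (KAFKA_TOPIC='create-json', VALUE_FORMAT='JSON');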

To retrieve that count field in a select, we use the following syntax:
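
    -- the -> operator dereferences STRUCT fields level by level
    SELECT obj->data->ipso->"3300"->"5534"
      FROM struct_stream;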

Note: the numeric node names 3300 and 5534 need to be enclosed in double-quotes when used within the CREATE and SELECT commands.

The results here are a combination of integers and nulls. The nulls are returned by messages whose JSON structure does not contain a node at obj.data.ipso.3300.5534! The SELECT query handles the case where the data structure diverges gracefully. And, you can use the NULL column value to filter out records that do not contain the count value using a WHERE clause and the condition IS NOT NULL:
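
    SELECT obj->data->ipso->"3300"->"5534"
      FROM struct_stream
      WHERE obj->data->ipso->"3300"->"5534" IS NOT NULL;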

MAP

In this set of examples, we are using the MAP datatype to describe the hierarchical structure of interest in our underlying JSON Object in the obj field of the message. This approach only works for single-level JSON objects, because nesting MAP structures does not seem to work.
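
A sketch of the sort of definition involved (map_stream is a placeholder name; only the branch of interest is declared):

    -- nested MAPs mirror the obj.data.ipso.3300.5534 path; the statement is accepted,
    -- but see the querying caveat below
    CREATE STREAM map_stream (
        id VARCHAR,
        recType VARCHAR,
        obj MAP<VARCHAR, MAP<VARCHAR, MAP<VARCHAR, MAP<VARCHAR, INTEGER>>>>)
      WITH (KAFKA_TOPIC='create-json', VALUE_FORMAT='JSON');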

It might work for more complex examples if the nesting of MAPs worked as expected. The Stream can be created successfully, but the syntax for querying nested MAPs does not seem to be functional. For example:
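
    -- the bracketed-key syntax I expected for drilling through nested MAPs;
    -- on 5.0.0 this did not return usable results for me
    SELECT obj['data']['ipso']['3300']['5534']
      FROM map_stream;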

Now let’s look at the tstamp JSON node, which contains a UNIX-style epoch timestamp. We can use the KSQL scalar function TIMESTAMPTOSTRING to convert the BIGINT value into a human-readable ISO date format (or any other).
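
For example, against the STRUCT-based stream defined above (which declared tstamp as BIGINT):

    SELECT id, TIMESTAMPTOSTRING(tstamp, 'yyyy-MM-dd HH:mm:ss.SSS') AS readable_time
      FROM struct_stream;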

Pretty cool, so far.

Things that SHOULD work but do not currently

Now, here are some things that I expected to work that didn’t. I suspect that these functional issues are specific to JSON-based message content. They may function differently with AVRO content OR with text content that happens to be in JSON format (I know, the distinction may be confusing). I have included links to GitHub issues where they exist if you wish to stay tuned in to the progress.

EXTRACTJSONFIELD to get at deeply nested JSON nodes without declaring explicit STRUCT or MAP constructs.

In this case, the documentation suggests that the nodes should be reachable using the scalar function EXTRACTJSONFIELD. The query returns ‘null’ for the returned column regardless of formatting or positioning of the function in the SELECT. At this time, there is an outstanding GitHub issue that ascribes this effect to the loss of double-quotes in the internal representation of the JSON string created by the CREATE STREAM command. This effectively ensures that the STRING column ‘$.updaterId’ is never found, which results in a null.

With a working version of EXTRACTJSONFIELD, you could more easily access deeply nested nodes without needing to materialize specialized Streams. Consider the potential of this query:
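
    -- sketch only: raw_json_stream is a hypothetical stream whose obj column was
    -- declared as VARCHAR (raw JSON text), assuming EXTRACTJSONFIELD resolved
    -- nested paths as documented
    SELECT EXTRACTJSONFIELD(obj, '$.data.ipso.3300.5534') AS count_value
      FROM raw_json_stream;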

GitHub Issue #1562

Potential Fix! #1962

ARRAYCONTAINS to filter values based on membership in an array of values

In our architecture, we depend on a tagging system that creates a simple array of strings in the metadata annotation to make searching simpler with our REST API. My interpretation of the documentation is that you should be able to use the scalar function ARRAYCONTAINS to filter using the tags node in the JSON.
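
Here is a sketch of the sort of query I had in mind, against the STRUCT-based stream above (the tag value is illustrative):

    SELECT id, recType
      FROM struct_stream
      WHERE ARRAYCONTAINS(obj->tags, 'lorawan');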

In this example, an error occurs when attempting to filter results based on the contents of the obj.tags array. Once again, this function may be operational for AVRO or JSON text message formats.

GitHub Issue #1960

Potential Fix! #1962

Conclusion

I really want to like KSQL. Working with a SQL-like query syntax is attractive because it gives data access to a broad range of people with skills in relational databases. It is at least partially Google-able, which I think is an important metric for a technology that will have broad exposure. It also works natively on streaming data, which is new to many architectures. Making streaming data more familiar by wrangling it with filters and transformations outside of hardcore code provides an on-ramp to real-time for a lot of would-be adopters. Smart move on Confluent’s part.

If it can be made to work smoothly with loosely schema’d JSON content, it has the promise to replace or reduce the role of MongoDB in many of my problematic use-cases for Microshare SaaS. It is still a very young technology, so approach it with caution. Be prepared to experiment, since there is little searchable content—non-trivial tasks will not be copy-and-paste just yet. Performance is a concern. I HAVE NOT YET MEASURED. So I am not saying there is a problem, but since I don’t have an unlimited budget for hardware (or PaaS instances), I need to make sure that the services deliver value at an acceptable cost/value ratio.

I will continue to follow the progress of KSQL and update the blog as it develops. Comments, corrections, and questions are welcome in the comments section. Happy Streaming!

Tim Panagos is the Co-founder and CTO of Microshare, Inc.