At some point in your journey with Apache Kafka®, you might ask, "How many messages are in that topic?" You know enough to avoid using just the latest offset—because it’s possible messages at lower offsets have since been deleted. For an active topic, the number of messages is constantly changing and thus may have little value, but in some cases, even a rough estimate is good enough. There are also times where stricter validation is necessary.
For instance, say you have a failover scenario using Confluent Replicator. It might be important to know how many messages were in the source topic before cutting back over if messages were produced on the destination cluster during the downtime. Any client can be programmed to do this count for you, but KSQL makes it really easy and gives you access to filters if there are specific types of message that you want to count.
Let’s assume you have some customer profile data in a compacted topic. The messages in this topic might look something this:
19:{"full_name":"Skipper Koppe",
"birthdate":"1983-02-28",
"fav_animal":"Squirrel, malabar",
"fav_colour":"Fuscia",
"fav_movie":"Return Of The Ghostbusters"}
Now obviously, this is important customer information that we are keeping track of for various applications. Since this is a compacted topic, we expect a single message for each key with few exceptions depending on the how often we clean up our partitions segments.
In our example above, the key is 19
and the value is our JSON string. This is all great, but one of our developers has asked how many customers we have in this topic. It doesn’t have to be exact because they are just trying to understand how much memory would be required if they store all of the customers in their application.
-
Docker
-
If running on Mac/Windows, at least 4GB allocated to Docker:
docker system info | grep Memory
Should return a value greater than 8GB - if not, the Kafka stack will probably not work.
Minimum version is Confluent Platform 4.1
-
Clone this repository
git clone https://github.com/confluentinc/ksql-recipes-try-it-at-home.git
-
Launch:
cd ksql-recipes-try-it-at-home/counting-messages-in-a-topic docker-compose up -d
-
Run KSQL CLI:
docker-compose exec ksql-cli ksql http://ksql-server:8088
-
Create a KSQL stream on our topic:
CREATE STREAM customer_profile (\ full_name VARCHAR,\ birth_date VARCHAR,\ fav_animal VARCHAR,\ fav_color VARCHAR,\ fav_movie VARCHAR) \ WITH (KAFKA_TOPIC = 'customers',\ VALUE_FORMAT = 'JSON');
-
Change the
auto.offset.reset
parameter so KSQL will read from the beginning of the topic:SET 'auto.offset.reset' = 'earliest';
-
Create a new KSQL stream from our previous stream that includes some derived columns to
GROUP BY
andSUM
:CREATE STREAM customer_profile_count AS \ SELECT 'Records' AS field1, \ 1 AS sum_this \ FROM customer_profile;
-
Run a
SELECT
statement to sum our derived column and count the messages:SELECT field1, \ SUM(sum_this) AS message_count \ FROM customer_profile_count \ GROUP BY field1;
As new messages come into the original topic, the message count will update and emit another output message. If you leave the SELECT
statement running you will see this value increase as time passes because new messages are being added to the topic in the background.
A more complex use case might require a filter or join that could be applied at various steps. The same concept can be used for a non-compacted topic if the need should arise.