IBM Message Hub is a fully managed Apache Kafka service running on the IBM Bluemix PaaS. It exposes a native Kafka interface, so Lagom services can communicate with it using the standard Lagom Message Broker API.
This project demonstrates a simple consumer service, designed to consume messages produced by the IBM Message Hub Kafka Java console sample application producer.
The Lagom consumer service also exposes a streaming service call, which allows WebSocket clients to receive broadcasts of the consumed messages.
To build and run this example, you need:
- git
- Java SE 8 JDK
- Maven 3.2.1+ to build and run the Lagom project (3.5.0 recommended)
- Message Hub Service Instance provisioned in IBM Bluemix
- IBM Message Hub Kafka Java console sample application
Once you have a Message Hub Service instance provisioned in Bluemix, the main steps to run this example are:
- Gather Message Hub credentials that are needed for the producer and consumer to authenticate with Message Hub
- Download and set up the Lagom service
- Start the console producer
- Start the Lagom consumer service
- Monitor the stream of consumed messages
- Stop the Lagom consumer and console producer when finished
To gather the Message Hub credentials:
- Log in to the IBM Bluemix console.
- Navigate to the Message Hub service you have created.
- Navigate to "Service credentials".
- Create new credentials if needed (no special parameters are required).
- Click "View credentials".
- Copy the following credential values to use in the console producer and consumer service:
  - "api_key"
  - "kafka_admin_url"
  - "kafka_brokers_sasl"
    Note: both the Lagom service and the console sample application require the list of brokers to be formatted as a single-line, comma-separated list of hostname:port pairs. For example: "host1:port1,host2:port2".
  - "user"
  - "password"
Follow these steps to get a local copy of this project and configure it with the Message Hub credentials you saved in the previous step. You can supply the credentials in a configuration file or as environment variables.
- Open a command line shell and clone this repository:
git clone https://github.com/lagom/ibm-integration-examples.git
- Change into the root directory for this example:
cd ibm-integration-examples/lagom-message-hub-example
- To supply the configuration, do one of the following:
  - Open the message-hub-consumer-impl/src/main/resources/message-hub.conf file in a text editor and fill in the empty values of the brokers, user and password properties from the credentials retrieved above.
    - You don't need to change the duplicate lines (with the form brokers = ${?KAFKA_BROKERS}) beneath each property. These allow the values to be overridden by environment variables (see below) and are ignored if the environment variables are not set.
    - Be sure not to commit this file with your credentials in it.
  - If you prefer not to enter credentials into the file, you can also set them as environment variables named KAFKA_BROKERS, KAFKA_USER, and KAFKA_PASSWORD.
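For the environment-variable option, the following is a minimal sketch, assuming a POSIX-compatible shell such as Bash. The variable names come from the step above; every value is a placeholder to replace with your own credentials. The join_brokers function is a hypothetical helper, not part of this project or of Message Hub, included only to show one way of producing the single-line, comma-separated broker list when the credentials list the brokers as separate entries.

```shell
# Hypothetical helper (not part of this project): joins its arguments,
# each a hostname:port pair, into a single comma-separated line.
# The body runs in a subshell so the IFS change does not leak out.
join_brokers() (
  IFS=','
  printf '%s\n' "$*"
)

# Placeholder values: substitute the credentials copied from Bluemix.
export KAFKA_BROKERS="$(join_brokers "host1:port1" "host2:port2")"
export KAFKA_USER="<user>"
export KAFKA_PASSWORD="<password>"

# Print the broker list to confirm it is on one line.
printf '%s\n' "$KAFKA_BROKERS"
```

Run these commands in the same shell session that will later start the Lagom service, since exported variables are only visible to processes started from that shell.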
Open a second command line shell and follow the instructions to build and run the console sample producer.
For example:
java -jar build/libs/kafka-java-console-sample-2.0.jar "kafka02-prod01.messagehub.services.us-south.bluemix.net:9093,kafka04-prod01.messagehub.services.us-south.bluemix.net:9093,kafka01-prod01.messagehub.services.us-south.bluemix.net:9093,kafka05-prod01.messagehub.services.us-south.bluemix.net:9093,kafka03-prod01.messagehub.services.us-south.bluemix.net:9093" "https://kafka-admin-prod01.messagehub.services.us-south.bluemix.net:443" "<api_key>" -producer
After a few seconds, you should see messages logged to the console periodically:
[2017-08-09 15:23:44,339] INFO class com.messagehub.samples.ProducerRunnable is starting. (com.messagehub.samples.ProducerRunnable)
[2017-08-09 15:23:47,023] INFO Message produced, offset: 0 (com.messagehub.samples.ProducerRunnable)
[2017-08-09 15:23:49,369] INFO Message produced, offset: 1 (com.messagehub.samples.ProducerRunnable)
[2017-08-09 15:23:51,692] INFO Message produced, offset: 2 (com.messagehub.samples.ProducerRunnable)
[2017-08-09 15:23:54,008] INFO Message produced, offset: 3 (com.messagehub.samples.ProducerRunnable)
Leave the console producer running in the background as you complete the following steps.
In the command line shell where you downloaded the Lagom service, from the lagom-message-hub-example directory, start the Lagom development environment by running:
mvn lagom:runAll
You should see some console output, including these lines:
...
[INFO] Service gateway is running at http://localhost:9000
...
[INFO] Service message-hub-consumer-impl listening for HTTP on 0:0:0:0:0:0:0:0:60025
[INFO] (Service started, press enter to stop and go back to the console...)
These messages indicate that the service has started correctly.
The Lagom consumer service will also begin logging messages to the console as they are consumed from Message Hub. It should quickly catch up to the messages already produced and then continue logging messages as they are produced.
16:29:48.804 [info] com.lightbend.lagom.messagehub.consumer.impl.MessageHubSubscriber [] - Message consumed: [This is a test message #0]
16:29:48.804 [info] com.lightbend.lagom.messagehub.consumer.impl.MessageHubSubscriber [] - Broadcasting message to internal subscribers: [This is a test message #0]
16:29:48.807 [info] com.lightbend.lagom.messagehub.consumer.impl.MessageHubSubscriber [] - Message consumed: [This is a test message #1]
16:29:48.807 [info] com.lightbend.lagom.messagehub.consumer.impl.MessageHubSubscriber [] - Broadcasting message to internal subscribers: [This is a test message #1]
16:29:48.808 [info] com.lightbend.lagom.messagehub.consumer.impl.MessageHubSubscriber [] - Message consumed: [This is a test message #2]
16:29:48.808 [info] com.lightbend.lagom.messagehub.consumer.impl.MessageHubSubscriber [] - Broadcasting message to internal subscribers: [This is a test message #2]
16:29:48.808 [info] com.lightbend.lagom.messagehub.consumer.impl.MessageHubSubscriber [] - Message consumed: [This is a test message #3]
16:29:48.808 [info] com.lightbend.lagom.messagehub.consumer.impl.MessageHubSubscriber [] - Broadcasting message to internal subscribers: [This is a test message #3]
...
From a WebSocket client, you can monitor the stream of messages that the Lagom service is consuming by connecting to the service URI as follows:
- In the Location: field, enter "ws://localhost:9000/message-hub-consumer".
- Click Connect.
- You should begin seeing messages appear in the Log panel:
CONNECTED
RECEIVED: This is a test message #237
RECEIVED: This is a test message #238
RECEIVED: This is a test message #239
To stop running the examples:
- Press "Control-C" in the console running the console producer to stop it.
- Press "Enter" in the console running the Lagom development environment to stop the service.