topic: sample
languages: azurecli, json, sql
products: azure, azure-container-instances, azure-event-hubs, azure-container-service, azure-hdinsight

# Streaming at Scale with Azure Event Hubs and Apache Flink

This sample uses Apache Flink to process streaming data from an Event Hubs Kafka endpoint, and uses a second Event Hubs Kafka endpoint as a sink to store the JSON data. This is done to analyze the pure streaming performance of Flink: no aggregation is done, and data is passed from the input to the output as fast as possible. Data is only augmented by adding a few additional fields.

The sample provides a choice of options for hosting Flink: Azure Kubernetes Service (AKS) or Azure HDInsight Hadoop (using YARN).

To support very high throughput, the template deploys two different Event Hubs namespaces. An Event Hubs namespace is limited to 20 throughput units of 1 MB/s each (although this limit can be increased through a support ticket). If incoming throughput is under 10 MB/s, you could instead deploy two Event Hub instances under a single namespace.
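For reference, creating the two namespaces by hand would look roughly like the following (the names and capacities here are illustrative assumptions; `create-solution.sh` derives its own):

```bash
RG=streamingatscale          # assumption: a resource group created beforehand
LOCATION=eastus

# One namespace for ingress (input to Flink) and one for egress (Flink sink),
# so each gets its own throughput-unit allocation. The Kafka endpoint is
# available on the Standard tier.
az eventhubs namespace create -g $RG -n mysolution-ingest -l $LOCATION --sku Standard --capacity 2
az eventhubs namespace create -g $RG -n mysolution-egress -l $LOCATION --sku Standard --capacity 2
```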

The provided scripts will create an end-to-end solution complete with a load test client.

## Running the Scripts

Please note that the scripts have been tested on Ubuntu 18.04 LTS, so make sure to use that environment to run them. You can do so using Docker, WSL, or a VM.
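For example, a throwaway container gives you a matching environment quickly (a sketch, assuming the repository is cloned into the current directory; the Azure CLI and the other prerequisites still need to be installed inside it):

```bash
# Start an Ubuntu 18.04 shell with the repository mounted at /streaming.
docker run -it -v "$PWD":/streaming -w /streaming ubuntu:18.04 bash
```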

A number of additional command-line tools and language runtimes are also needed to run the scripts.

## Setup Solution

Make sure you are logged into your Azure account:

```bash
az login
```

and also make sure the subscription you want to use is selected:

```bash
az account list
```

If you want to select a specific subscription, use the following command:

```bash
az account set --subscription <subscription_name>
```

Once you have selected the subscription you want to use, execute the following command:

```bash
./create-solution.sh -d <solution_name>
```

The `solution_name` value will be used to create a resource group that will contain all the resources created by the script. It will also be used as a prefix for all created resources so, to help avoid name collisions that would break the script, you may want to generate a name with a unique prefix. Please also use lowercase letters and numbers only, since `solution_name` is also used to create a storage account, which has several constraints on character usage:

Storage Naming Conventions and Limits
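If you want to check a candidate name before running the script, a small helper along these lines works (hypothetical, not part of the repository; storage account names must be 3-24 characters of lowercase letters and digits, and since the solution name is only a prefix of the final account name, keep it well under that limit):

```bash
# Hypothetical pre-flight check for a solution name.
validate_solution_name() {
  local name="$1"
  # Lowercase letters and digits only, kept short to leave room for suffixes.
  if [[ "$name" =~ ^[a-z0-9]{3,12}$ ]]; then
    echo "ok: $name"
  else
    echo "invalid: use 3-12 lowercase letters and digits" >&2
    return 1
  fi
}

validate_solution_name "myteststream1"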

To get an overview of all the supported arguments, just run:

```bash
./create-solution.sh
```

Note: to make name collisions unlikely, you should use a random string as your solution name. The following script will generate a name of 7 random lowercase letters for you:

```bash
./generate-solution-name.sh
```
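The script is roughly equivalent to this shell one-liner, which prints 7 random lowercase letters:

```bash
tr -dc 'a-z' < /dev/urandom | head -c 7; echo
```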

## Created resources

The script will create the following resources:

- Azure Container Instances to host Spark Load Test Clients: by default one client will be created, generating a load of 1000 events/second
- Event Hubs Namespace, Hub and Consumer Group: to ingest data incoming from test clients and to store data generated by Apache Flink
- HDInsight or Azure Kubernetes Service: to host the Apache Flink job that processes event data
- Azure Monitor: to monitor HDInsight, Azure Kubernetes Service and Flink
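Once the deployment finishes, you can verify what was created by listing the contents of the resource group (which is named after your solution):

```bash
az resource list -g <solution_name> -o table
```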

## Streamed Data

Streamed data simulates an IoT device sending the following JSON data:

```json
{
    "eventId": "b81d241f-5187-40b0-ab2a-940faf9757c0",
    "complexData": {
        "moreData0": 57.739726013343247,
        "moreData1": 52.230732688620829,
        "moreData2": 57.497518587807189,
        "moreData3": 81.32211656749469,
        "moreData4": 54.412361539409427,
        "moreData5": 75.36416309399911,
        "moreData6": 71.53407865773488,
        "moreData7": 45.34076957651598,
        "moreData8": 51.3068118685458,
        "moreData9": 44.44672606436184,
        [...]
    },
    "value": 49.02278128887753,
    "deviceId": "contoso://device-id-154",
    "deviceSequenceNumber": 0,
    "type": "CO2",
    "createdAt": "2019-05-16T17:16:40.000003Z"
}
```

## Duplicate event handling

The solution does not perform event deduplication. To illustrate the effect of this, the event simulator is configured to randomly duplicate a small fraction of the messages (0.1% on average). Those duplicate events will be present in the destination Event Hub.
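If you want to quantify the duplicates downstream, one approach is to capture a sample of output events to a newline-delimited JSON file (for instance via Event Hubs Capture plus a small conversion step) and count repeated `eventId` values; the file name here is hypothetical:

```bash
# Each line of events.ndjson is assumed to hold one event JSON document.
jq -r '.eventId' events.ndjson | sort | uniq -d | wc -l
```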

## Solution customization

If you want to change some settings of the solution, like the number of load test clients, Event Hubs throughput units and so on, you can do it right in the `create-solution.sh` script, by changing any of these values:

```bash
export EVENTHUB_CAPACITY=2
export EVENTHUB_PARTITIONS=1
export FLINK_PARALLELISM=1
export SIMULATOR_INSTANCES=1
# settings for AKS (-p aks)
export AKS_NODES=3
export AKS_VM_SIZE=Standard_D2s_v3
# settings for HDInsight YARN (-p hdinsight)
export HDINSIGHT_HADOOP_WORKERS=3
export HDINSIGHT_HADOOP_WORKER_SIZE=Standard_D3_V2
```

The above settings have been chosen to sustain a 1,000 msg/s stream. The script also contains settings for 5,000 msg/s and 10,000 msg/s.

## Monitor performance

The deployment script will report performance, by default every minute for 30 minutes:

```text
***** [M] Starting METRICS reporting
Event Hub capacity: 2 throughput units (this determines MAX VALUE below).
Reporting aggregate metrics per minute, offset by 2 minutes, for 30 minutes.
                         Event Hub #  IncomingMessages  IncomingBytes  OutgoingMessages  OutgoingBytes  ThrottledRequests
                         -----------  ----------------  -------------  ----------------  -------------  -----------------
              MAX VALUE                         120000      120000000            491520      240000000                  -
                         -----------  ----------------  -------------  ----------------  -------------  -----------------
    2019-11-10T08:17:44            1                 0              0                 0              0                  0
    2019-11-10T08:17:44            2                 0              0                 0              0                  0
    2019-11-10T08:18:00            1                 0              0                 0              0                  0
    2019-11-10T08:18:00            2                 0              0                 0              0                  0
    2019-11-10T08:19:00            1                 0              0                 0              0                  0
    2019-11-10T08:19:00            2                 0              0                 0              0                  0
    2019-11-10T08:22:37            1                 0              0                 0              0                  0
    2019-11-10T08:22:37            2                 0              0                 0              0                  0
    2019-11-10T08:23:00            1             43163       40022882             43163       40365390                  0
    2019-11-10T08:23:00            2             37007       37332787                 0              0                  0
    2019-11-10T08:24:00            1             59966       55621703             59966       56097577                  0
    2019-11-10T08:24:00            2             59943       60488690                 0              0                  0
    2019-11-10T08:25:00            1             60258       55947759             60258       56425866                  0
    2019-11-10T08:25:00            2             60117       60728670                 0              0                  0
    2019-11-10T08:26:00            1             60027       55738691             60027       56214951                  0
    2019-11-10T08:26:00            2             60003       60612850                 0              0                  0
```

In the "Event Hub #" column, 1 refers to the Event Hub used as input to Flink, and 2 to the Event Hub used as output. After a few minutes of ramp-up, the metrics for Event Hub 1 will show around 60k events/min (depending on the selected event rate, here 1k events/s). As Apache Flink starts, the incoming message rate on Event Hub 2 will also reach around 60k events/min.
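The same counters can also be pulled ad hoc with the Azure CLI once the deployment is up (the resource group and namespace names are placeholders):

```bash
# Resolve the namespace resource ID, then query a per-minute metric.
NSID=$(az eventhubs namespace show -g <resource-group> -n <namespace-name> --query id -o tsv)
az monitor metrics list --resource "$NSID" --metric IncomingMessages --interval PT1M -o table
```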

## Apache Flink

The deployed Apache Flink solution doesn't do any analytics or projection; it only populates two fields in the JSON message: the time at which the event was received in Event Hubs, and the current timestamp.

The solution includes a custom monitoring library to log Flink events and metrics to Azure Monitor. The custom monitoring library is currently only included when the Flink job is deployed in AKS. To view the monitoring data, navigate to the Log Analytics resource in the Azure Portal.
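You can also query the workspace from the command line; a generic sketch (the workspace GUID is a placeholder, and the exact table names depend on how the monitoring library is configured, so this just searches everything recent):

```bash
az monitor log-analytics query -w <workspace-guid> \
  --analytics-query 'search * | where TimeGenerated > ago(1h) | take 10'
```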

The Flink Job Manager UI shows information about the current running job. The IP address of the Job Manager UI is reported by the deployment script. Note that the solution deploys the Job Manager on a public IP address without any security. In a production deployment, you should disable public IP endpoints.

Flink Job Manager Web UI
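If you prefer not to rely on the public endpoint at all, the UI can also be reached through a tunnel into the cluster (the service name here is an assumption and may differ in this deployment):

```bash
kubectl port-forward service/flink-jobmanager 8081:8081
# then browse to http://localhost:8081
```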

### Flink deployment on AKS

Deployment on Azure Kubernetes Service is done in single-job, highly available mode. The deployment includes:

- A Zookeeper cluster for maintaining quorum
- A pod for the (per-job) Flink Job Manager
- A pod for each Flink Task Manager deployed as part of the job

In HA mode, the Flink JobManager exposes a dynamically allocated RPC port. Together with the JobManager, we run a custom sidecar container containing a small shell script. The script calls the JobManager REST API (running on the fixed port 8081) to discover the JobManager RPC port, then calls the Kubernetes API to update the port exposed in the Kubernetes Service. RBAC is used to grant the sidecar container permission for only this specific API operation.
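A minimal sketch of that sidecar logic, assuming `jq` is available in the container and `flink-jobmanager` is the Service name (both assumptions, not the repository's actual script):

```bash
# 1. Ask the JobManager REST API (fixed port 8081) for its configuration and
#    extract the dynamically allocated RPC port.
RPC_PORT=$(curl -s http://localhost:8081/jobmanager/config \
  | jq -r '.[] | select(.key == "jobmanager.rpc.port") | .value')

# 2. Patch the Kubernetes Service so it exposes that port. RBAC restricts the
#    sidecar's service account to this operation only.
kubectl patch service flink-jobmanager --type merge -p \
  "{\"spec\":{\"ports\":[{\"name\":\"rpc\",\"port\":$RPC_PORT,\"targetPort\":$RPC_PORT}]}}"
```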

### Flink deployment on HDInsight

Deployment on HDInsight is done in job server, highly available mode. The deployment runs a YARN job for the Flink Job Manager, then submits a JAR job to the Job Manager. The Job Manager creates a YARN application per job.
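In Flink terms, this corresponds roughly to starting a YARN session and submitting the job JAR to it (a sketch; the flags and JAR name are assumptions and vary by Flink version):

```bash
# Start a detached, long-running Flink session on YARN (the "job server").
./bin/yarn-session.sh -d

# Submit the streaming job to the session with the configured parallelism.
./bin/flink run -p "$FLINK_PARALLELISM" flink-streaming-job.jar
```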

Note that deployed jobs do not survive an HDInsight cluster reboot.

## Query Data

Data is available in the output Event Hub created by the deployment. You can use the Process Data screen in the Azure portal to inspect the event data.
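Because the output is a Kafka-enabled Event Hub, you can also peek at the data with any Kafka client; for example, with `kcat` (the namespace, hub name and connection string are placeholders):

```bash
# Consume 10 events from the output Event Hub over its Kafka endpoint.
kcat -C -c 10 \
  -b <output-namespace>.servicebus.windows.net:9093 \
  -t <output-eventhub-name> \
  -X security.protocol=SASL_SSL \
  -X sasl.mechanisms=PLAIN \
  -X 'sasl.username=$ConnectionString' \
  -X 'sasl.password=<event-hubs-connection-string>'
```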

## Clean up

To remove all the created resources, you can simply delete the related resource group:

```bash
az group delete -n <resource-group-name>
```