-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy patheventhubs_mongo.py
79 lines (56 loc) · 2.09 KB
/
eventhubs_mongo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# Databricks notebook source
# MAGIC %md # Azure Event Hubs -> MongoDB
# MAGIC This notebook demonstrates reading events from Azure Event Hubs and writing them to a MongoDB Collection using [foreachBatch()](https://docs.databricks.com/spark/latest/structured-streaming/foreach.html).
# MAGIC
# MAGIC In order to run this notebook successfully, the following connectors must be installed and attached to the Databricks cluster.
# MAGIC - [azure-eventhubs-spark](https://github.com/Azure/azure-event-hubs-spark)
# MAGIC - [mongo-spark-connector_2.11](https://github.com/mongodb/mongo-spark)
# COMMAND ----------
from pyspark.sql.functions import *
from pyspark.sql.types import *
# COMMAND ----------
# Event Hubs connection string; the EntityPath segment names the event hub to read.
connectionString = "<EVENTHUBS_CONNECTIONSTRING>;EntityPath=<EVENTHUB_NAME>"

# Options passed to the "eventhubs" streaming source.
# NOTE(review): newer azure-eventhubs-spark releases reportedly require the
# connection string to be encrypted with EventHubsUtils.encrypt -- confirm
# against the connector version attached to the cluster.
ehConf = {}
ehConf["eventhubs.connectionString"] = connectionString
ehConf["eventhubs.consumerGroup"] = "$Default"
# COMMAND ----------
# Open a streaming read from the "eventhubs" source, configured by ehConf.
inputStream = (
    spark.readStream
    .format("eventhubs")
    .options(**ehConf)
    .load()
)

# Keep only the `body` column, cast to a string.
rawBody = inputStream.selectExpr("CAST(body as STRING)")
# COMMAND ----------
# Fields extracted from the JSON text held in each event body.
schema = (
    StructType()
    .add("messageId", IntegerType())
    .add("deviceId", StringType())
    .add("temperature", DoubleType())
    .add("humidity", DoubleType())
)

# Parse the body string into a struct column named "data", keeping the raw
# body column alongside it.
bodyWithSchema = rawBody.select(
    col("body"),
    from_json(col("body"), schema).alias("data"),
)

# Expand the struct's fields into top-level columns.
ehStream = bodyWithSchema.select("data.*")
# COMMAND ----------
# Show the parsed stream in the notebook output. display() is neither defined
# nor imported in this file -- presumably the Databricks notebook built-in.
display(ehStream)
# COMMAND ----------
# Options for the "mongo" writer; replace every placeholder before running.
mongoConfig = dict(
    uri="<MONGO_CONNECTION_STRING> e.g. mongodb://...",
    database="<DATABASE>",
    collection="<COLLECTION>",
)
# COMMAND ----------
def foreach_batch_mongo(df, epoch_id):
    """Append one micro-batch to the MongoDB collection described by mongoConfig.

    Used as the callback handed to ``writeStream.foreachBatch``.

    Args:
        df: The batch DataFrame to write.
        epoch_id: Batch identifier supplied by the caller; not used here.
    """
    # The body must be indented under the def; at column 0 these statements
    # are not part of the function and the def has no suite.
    df.write \
        .format("mongo") \
        .mode("append") \
        .options(**mongoConfig) \
        .save()
# COMMAND ----------
# MAGIC %md **Note:** If using Cosmos DB, make sure to provision an appropriate number of Request Units (RUs) to avoid bulk write operation errors.
# COMMAND ----------
# Start the streaming query: each micro-batch of ehStream is passed to
# foreach_batch_mongo, with /tmp/streamingCheckpoint as the checkpoint location.
(
    ehStream.writeStream
    .foreachBatch(foreach_batch_mongo)
    .outputMode("append")
    .option("checkpointLocation", "/tmp/streamingCheckpoint")
    .start()
)