Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TECH] add option to pass kafka config and add malli schema validation #4

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
:dependencies [[org.clojure/clojure "1.10.0"]
[com.stuartsierra/component "1.0.0"]
[io.logicblocks/configurati "0.5.2"]
[metosin/malli "0.12.0"]
[org.apache.kafka/kafka-clients "2.3.0"]
[cambium/cambium.core "0.9.3"]
[cambium/cambium.codec-cheshire "0.9.3"]
Expand Down
147 changes: 54 additions & 93 deletions src/kafka_event_processor/kafka/component.clj
Original file line number Diff line number Diff line change
@@ -1,102 +1,63 @@
(ns ^:no-doc kafka-event-processor.kafka.component
(:require
[clojure.string :as str]

[com.stuartsierra.component :as component]

[configurati.core
:refer [define-configuration
define-configuration-specification
with-parameter
with-source
with-specification
with-key-fn
env-source]]

[configurati.key-fns :refer [remove-prefix]]
[configurati.conversions :refer [convert-to]]

[kafka-event-processor.utils.properties :refer [map->properties]])
(:import
[org.apache.kafka.clients.consumer ConsumerConfig]))

(defmethod convert-to :comma-separated-list [_ value]
(cond
(vector? value) value
(some? value) (mapv str/trim (str/split value #","))
:else nil))

(def kafka-configuration-specification
(define-configuration-specification
(with-key-fn (remove-prefix :kafka))
(with-parameter :kafka-bootstrap-servers)
(with-parameter :kafka-key-deserializer-class-config
:default "org.apache.kafka.common.serialization.StringDeserializer")
(with-parameter :kafka-value-deserializer-class-config
:default "org.apache.kafka.common.serialization.StringDeserializer")
(with-parameter :kafka-auto-offset-reset-config
:default "earliest")
(with-parameter :kafka-enable-auto-commit-config
:default "false")
(with-parameter :kafka-security-protocol
:default "SSL")
(with-parameter :kafka-ssl-truststore-location
:default "")
(with-parameter :kafka-ssl-truststore-password
:default "")
(with-parameter :kafka-ssl-keystore-location
:default "")
(with-parameter :kafka-ssl-keystore-password
:default "")
(with-parameter :kafka-ssl-key-password
:default "")))

(defn kafka-configuration
[prefix]
(define-configuration
(with-source (env-source :prefix prefix))
(with-specification kafka-configuration-specification)))
(ns kafka-event-processor.kafka.component
(:require [com.stuartsierra.component :as component]
[malli.core :as m]
[malli.error :as me]
[malli.util :as mu]))

(def kafka-base-configuration-schema
(m/schema
[:map
[:bootstrap.servers :string]
[:auto.offset.reset [:enum "earliest" "latest" "none"]]
[:key.deserializer {:optional true} :string]
[:value.deserializer {:optional true} :string]
[:enable.auto.commit {:optional true} [:enum "false" "true"]]]))

(def kafka-configuration-schema
(m/schema
[:multi {:dispatch :security.protocol}
["SSL"
(mu/merge
kafka-base-configuration-schema
[:map
[:security.protocol [:enum "SSL"]]
[:ssl.keystore.location :string]
[:ssl.keystore.password :string]
[:ssl.key.password :string]
[:ssl.truststore.location :string]
[:ssl.truststore.password :string]])]
["PLAINTEXT"
(mu/merge
kafka-base-configuration-schema
[:map [:security.protocol [:enum "PLAINTEXT"]]])]]))

(def valid-configuration?
(m/validator kafka-configuration-schema))

(defn- configuration-errors
[configuration]
(->> configuration
(m/explain kafka-configuration-schema)
(me/humanize)))

(defrecord Kafka
[configuration]
component/Lifecycle

(start [component]
(let [{:keys [bootstrap-servers
key-deserializer-class-config
value-deserializer-class-config
auto-offset-reset-config
enable-auto-commit-config
security-protocol
ssl-truststore-location
ssl-truststore-password
ssl-keystore-location
ssl-keystore-password
ssl-key-password]} configuration

consumer-config
{ConsumerConfig/BOOTSTRAP_SERVERS_CONFIG
bootstrap-servers
ConsumerConfig/KEY_DESERIALIZER_CLASS_CONFIG
key-deserializer-class-config
ConsumerConfig/VALUE_DESERIALIZER_CLASS_CONFIG
value-deserializer-class-config
ConsumerConfig/AUTO_OFFSET_RESET_CONFIG
auto-offset-reset-config
ConsumerConfig/ENABLE_AUTO_COMMIT_CONFIG
enable-auto-commit-config
:security.protocol security-protocol
:ssl.truststore.location ssl-truststore-location
:ssl.truststore.password ssl-truststore-password
:ssl.keystore.location ssl-keystore-location
:ssl.keystore.password ssl-keystore-password
:ssl.key.password ssl-key-password}]

(assoc component
:consumer-config consumer-config)))
(when-not (valid-configuration? configuration)
(throw (ex-info "Input config is not valid against the schema"
{:errors (configuration-errors configuration)})))
(assoc component
:consumer-config
(merge
{:key.deserializer "org.apache.kafka.common.serialization.StringDeserializer"
:value.deserializer "org.apache.kafka.common.serialization.StringDeserializer"
:enable.auto.commit "false"}
configuration)))

(stop [component]
(dissoc component :consumer-config)))

(defn new-kafka []
(map->Kafka {}))
(defn new-kafka
[configuration]
(map->Kafka {:configuration configuration}))
7 changes: 1 addition & 6 deletions src/kafka_event_processor/kafka/consumer_group.clj
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
(ns ^:no-doc kafka-event-processor.kafka.consumer-group
(:require
[clojure.string :as str]

[com.stuartsierra.component :as component]

[configurati.core
:refer [define-configuration
define-configuration-specification
Expand All @@ -12,11 +10,8 @@
with-specification
with-key-fn
env-source]]

[configurati.key-fns :refer [remove-prefix]]
[configurati.conversions :refer [convert-to]]

[kafka-event-processor.utils.properties :refer [map->properties]])
[configurati.conversions :refer [convert-to]])
(:import [org.apache.kafka.clients.consumer ConsumerConfig]))

(defmethod convert-to :comma-separated-list [_ value]
Expand Down
52 changes: 0 additions & 52 deletions src/kafka_event_processor/kafka/system.clj

This file was deleted.

21 changes: 6 additions & 15 deletions test/kafka_event_processor/test_support/kafka/combined.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,14 @@
:refer [define-configuration
with-specification
with-source
with-key-fn
yaml-file-source
map-source
env-source]]
[configurati.key-fns :refer [remove-prefix]]
map-source]]

[freeport.core :refer [get-free-port!]]

[kafka-event-processor.test-support.kafka.zookeeper
:as zk]
[kafka-event-processor.test-support.kafka.broker
:as broker]
[kafka-event-processor.kafka.component
:as kafka]
[kafka-event-processor.kafka.consumer-group :as kafka-consumer-group]
[kafka-event-processor.utils.generators :as generators]
[kafka-event-processor.processor.configuration :as config]))
Expand Down Expand Up @@ -55,14 +49,11 @@
(finally
(stop kafka)))))

(defn kafka-configuration [{:keys [broker-host broker-port]}]
(define-configuration
(with-specification kafka/kafka-configuration-specification)
(with-source
(map-source
{:kafka-bootstrap-servers (str broker-host ":" broker-port)
:kafka-auto-offset-reset-config "earliest"
:kafka-security-protocol "PLAINTEXT"}))))
(defn kafka-configuration
[{:keys [broker-host broker-port]}]
{:bootstrap.servers (str broker-host ":" broker-port)
:auto.offset.reset "earliest"
:security.protocol "PLAINTEXT"})

(def kafka-main-consumer-group-configuration
(define-configuration
Expand Down
18 changes: 7 additions & 11 deletions test/kafka_event_processor/test_support/system.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
(:require
[com.stuartsierra.component :as component]
[kafka-event-processor.test-support.database :as db]
[kafka-event-processor.kafka.system :as kafka-system]
[kafka-event-processor.kafka.component :as kafka-component]
[kafka-event-processor.processor.system :as processors]
[vent.core :as vent]
[kafka-event-processor.test-support.kafka.combined :as kafka]
Expand Down Expand Up @@ -86,13 +86,9 @@
{:db
:embedded-postgres})

:event-handler
(AtomEventHandler. (atom []))
:atom
(atom []))
(kafka-system/new-system
configuration-overrides
{:kafka :kafka})
:event-handler (AtomEventHandler. (atom []))
:atom (atom [])
:kafka (kafka-component/new-kafka (:kafka configuration-overrides)))
(processors/new-system
configuration-overrides
{:processor-identifier :main
Expand All @@ -101,12 +97,12 @@
:event-handler :event-handler
:additional-dependencies {:atom :atom}}))))


(defn new-test-system [configuration]
(defn new-test-system
[configuration]
(new-system
(merge
{:kafka (kafka/kafka-configuration configuration)
:kafka-main-consumer-group kafka/kafka-main-consumer-group-configuration
:main-processor kafka/main-processor-configuration
:main-processing-enabled? true}
configuration)))
configuration)))