diff --git a/project.clj b/project.clj index 463949a..e442ec6 100644 --- a/project.clj +++ b/project.clj @@ -10,6 +10,7 @@ :dependencies [[org.clojure/clojure "1.10.0"] [com.stuartsierra/component "1.0.0"] [io.logicblocks/configurati "0.5.2"] + [metosin/malli "0.12.0"] [org.apache.kafka/kafka-clients "2.3.0"] [cambium/cambium.core "0.9.3"] [cambium/cambium.codec-cheshire "0.9.3"] diff --git a/src/kafka_event_processor/kafka/component.clj b/src/kafka_event_processor/kafka/component.clj index 8b98a70..268a15b 100644 --- a/src/kafka_event_processor/kafka/component.clj +++ b/src/kafka_event_processor/kafka/component.clj @@ -1,102 +1,63 @@ -(ns ^:no-doc kafka-event-processor.kafka.component - (:require - [clojure.string :as str] - - [com.stuartsierra.component :as component] - - [configurati.core - :refer [define-configuration - define-configuration-specification - with-parameter - with-source - with-specification - with-key-fn - env-source]] - - [configurati.key-fns :refer [remove-prefix]] - [configurati.conversions :refer [convert-to]] - - [kafka-event-processor.utils.properties :refer [map->properties]]) - (:import - [org.apache.kafka.clients.consumer ConsumerConfig])) - -(defmethod convert-to :comma-separated-list [_ value] - (cond - (vector? value) value - (some? value) (mapv str/trim (str/split value #",")) - :else nil)) - -(def kafka-configuration-specification - (define-configuration-specification - (with-key-fn (remove-prefix :kafka)) - (with-parameter :kafka-bootstrap-servers) - (with-parameter :kafka-key-deserializer-class-config - :default "org.apache.kafka.common.serialization.StringDeserializer") - (with-parameter :kafka-value-deserializer-class-config - :default "org.apache.kafka.common.serialization.StringDeserializer") - (with-parameter :kafka-auto-offset-reset-config - :default "earliest") - (with-parameter :kafka-enable-auto-commit-config - :default "false") - (with-parameter :kafka-security-protocol - :default "SSL") - (with-parameter :kafka-ssl-truststore-location - :default "") - (with-parameter :kafka-ssl-truststore-password - :default "") - (with-parameter :kafka-ssl-keystore-location - :default "") - (with-parameter :kafka-ssl-keystore-password - :default "") - (with-parameter :kafka-ssl-key-password - :default ""))) - -(defn kafka-configuration - [prefix] - (define-configuration - (with-source (env-source :prefix prefix)) - (with-specification kafka-configuration-specification))) +(ns kafka-event-processor.kafka.component + (:require [com.stuartsierra.component :as component] + [malli.core :as m] + [malli.error :as me] + [malli.util :as mu])) + +(def kafka-base-configuration-schema + (m/schema + [:map + [:bootstrap.servers :string] + [:auto.offset.reset [:enum "earliest" "latest" "none"]] + [:key.deserializer {:optional true} :string] + [:value.deserializer {:optional true} :string] + [:enable.auto.commit {:optional true} [:enum "false" "true"]]])) + +(def kafka-configuration-schema + (m/schema + [:multi {:dispatch :security.protocol} + ["SSL" + (mu/merge + kafka-base-configuration-schema + [:map + [:security.protocol [:enum "SSL"]] + [:ssl.keystore.location :string] + [:ssl.keystore.password :string] + [:ssl.key.password :string] + [:ssl.truststore.location :string] + [:ssl.truststore.password :string]])] + ["PLAINTEXT" + (mu/merge + kafka-base-configuration-schema + [:map [:security.protocol [:enum "PLAINTEXT"]]])]])) + +(def valid-configuration? + (m/validator kafka-configuration-schema)) + +(defn- configuration-errors + [configuration] + (->> configuration + (m/explain kafka-configuration-schema) + (me/humanize))) (defrecord Kafka [configuration] component/Lifecycle (start [component] - (let [{:keys [bootstrap-servers - key-deserializer-class-config - value-deserializer-class-config - auto-offset-reset-config - enable-auto-commit-config - security-protocol - ssl-truststore-location - ssl-truststore-password - ssl-keystore-location - ssl-keystore-password - ssl-key-password]} configuration - - consumer-config - {ConsumerConfig/BOOTSTRAP_SERVERS_CONFIG - bootstrap-servers - ConsumerConfig/KEY_DESERIALIZER_CLASS_CONFIG - key-deserializer-class-config - ConsumerConfig/VALUE_DESERIALIZER_CLASS_CONFIG - value-deserializer-class-config - ConsumerConfig/AUTO_OFFSET_RESET_CONFIG - auto-offset-reset-config - ConsumerConfig/ENABLE_AUTO_COMMIT_CONFIG - enable-auto-commit-config - :security.protocol security-protocol - :ssl.truststore.location ssl-truststore-location - :ssl.truststore.password ssl-truststore-password - :ssl.keystore.location ssl-keystore-location - :ssl.keystore.password ssl-keystore-password - :ssl.key.password ssl-key-password}] - - (assoc component - :consumer-config consumer-config))) + (when-not (valid-configuration? configuration) + (throw (ex-info "Input config is not valid against the schema" + {:errors (configuration-errors configuration)}))) + (assoc component + :consumer-config + (merge + {:key.deserializer "org.apache.kafka.common.serialization.StringDeserializer" + :value.deserializer "org.apache.kafka.common.serialization.StringDeserializer" + :enable.auto.commit "false"} + configuration))) (stop [component] (dissoc component :consumer-config))) - -(defn new-kafka [] - (map->Kafka {})) +(defn new-kafka + [configuration] + (map->Kafka {:configuration configuration})) \ No newline at end of file diff --git a/src/kafka_event_processor/kafka/consumer_group.clj b/src/kafka_event_processor/kafka/consumer_group.clj index 3f3bf95..1fcaabd 100644 --- a/src/kafka_event_processor/kafka/consumer_group.clj +++ b/src/kafka_event_processor/kafka/consumer_group.clj @@ -1,9 +1,7 @@ (ns ^:no-doc kafka-event-processor.kafka.consumer-group (:require [clojure.string :as str] - [com.stuartsierra.component :as component] - [configurati.core :refer [define-configuration define-configuration-specification @@ -12,11 +10,8 @@ with-specification with-key-fn env-source]] - [configurati.key-fns :refer [remove-prefix]] - [configurati.conversions :refer [convert-to]] - - [kafka-event-processor.utils.properties :refer [map->properties]]) + [configurati.conversions :refer [convert-to]]) (:import [org.apache.kafka.clients.consumer ConsumerConfig])) (defmethod convert-to :comma-separated-list [_ value] diff --git a/src/kafka_event_processor/kafka/system.clj b/src/kafka_event_processor/kafka/system.clj deleted file mode 100644 index 4292d5b..0000000 --- a/src/kafka_event_processor/kafka/system.clj +++ /dev/null @@ -1,52 +0,0 @@ -(ns kafka-event-processor.kafka.system - (:require - [com.stuartsierra.component :as component] - - [configurati.core :as conf] - - [kafka-event-processor.kafka.component - :as kafka] - [kafka-event-processor.utils.logging :as log])) - -(defn new-system - "Creates a new kafka consumer client. - - * Configuration prefix to be specified (defaults to :service). - - All system map keys can be overridden or they default where applicable: - - * kafka: :kafka - * kafka-configuration: :kafka-configuration - - e.g. - - ```` - (kafka-system/new-system - configuration-overrides - {:kafka :kafka}) - ```` - " - ([configuration-overrides] - (new-system configuration-overrides {})) - ([configuration-overrides - {:keys [kafka kafka-configuration configuration-prefix kafka-enabled] - :or {kafka :kafka - kafka-configuration :kafka-configuration - kafka-enabled :kafka-enabled? - configuration-prefix :service}}] - (let [kafka-enabled? - (get configuration-overrides kafka-enabled true)] - (log/log-info - {kafka-enabled kafka-enabled?} - "Kafka enabled?") - (when kafka-enabled? - (component/system-map - kafka-configuration - (conf/resolve - (:kafka configuration-overrides - (kafka/kafka-configuration configuration-prefix))) - - kafka - (component/using - (kafka/new-kafka) - {:configuration kafka-configuration})))))) diff --git a/test/kafka_event_processor/test_support/kafka/combined.clj b/test/kafka_event_processor/test_support/kafka/combined.clj index 89b3f3c..fe273d9 100644 --- a/test/kafka_event_processor/test_support/kafka/combined.clj +++ b/test/kafka_event_processor/test_support/kafka/combined.clj @@ -4,11 +4,7 @@ :refer [define-configuration with-specification with-source - with-key-fn - yaml-file-source - map-source - env-source]] - [configurati.key-fns :refer [remove-prefix]] + map-source]] [freeport.core :refer [get-free-port!]] @@ -16,8 +12,6 @@ :as zk] [kafka-event-processor.test-support.kafka.broker :as broker] - [kafka-event-processor.kafka.component - :as kafka] [kafka-event-processor.kafka.consumer-group :as kafka-consumer-group] [kafka-event-processor.utils.generators :as generators] [kafka-event-processor.processor.configuration :as config])) @@ -55,14 +49,11 @@ (finally (stop kafka))))) -(defn kafka-configuration [{:keys [broker-host broker-port]}] - (define-configuration - (with-specification kafka/kafka-configuration-specification) - (with-source - (map-source - {:kafka-bootstrap-servers (str broker-host ":" broker-port) - :kafka-auto-offset-reset-config "earliest" - :kafka-security-protocol "PLAINTEXT"})))) +(defn kafka-configuration + [{:keys [broker-host broker-port]}] + {:bootstrap.servers (str broker-host ":" broker-port) + :auto.offset.reset "earliest" + :security.protocol "PLAINTEXT"}) (def kafka-main-consumer-group-configuration (define-configuration diff --git a/test/kafka_event_processor/test_support/system.clj b/test/kafka_event_processor/test_support/system.clj index 411bbe9..95916ed 100644 --- a/test/kafka_event_processor/test_support/system.clj +++ b/test/kafka_event_processor/test_support/system.clj @@ -2,7 +2,7 @@ (:require [com.stuartsierra.component :as component] [kafka-event-processor.test-support.database :as db] - [kafka-event-processor.kafka.system :as kafka-system] + [kafka-event-processor.kafka.component :as kafka-component] [kafka-event-processor.processor.system :as processors] [vent.core :as vent] [kafka-event-processor.test-support.kafka.combined :as kafka] @@ -86,13 +86,9 @@ {:db :embedded-postgres}) - :event-handler - (AtomEventHandler. (atom [])) - :atom - (atom [])) - (kafka-system/new-system - configuration-overrides - {:kafka :kafka}) + :event-handler (AtomEventHandler. (atom [])) + :atom (atom []) + :kafka (kafka-component/new-kafka (:kafka configuration-overrides))) (processors/new-system configuration-overrides {:processor-identifier :main @@ -101,12 +97,12 @@ :event-handler :event-handler :additional-dependencies {:atom :atom}})))) - -(defn new-test-system [configuration] +(defn new-test-system + [configuration] (new-system (merge {:kafka (kafka/kafka-configuration configuration) :kafka-main-consumer-group kafka/kafka-main-consumer-group-configuration :main-processor kafka/main-processor-configuration :main-processing-enabled? true} - configuration))) + configuration))) \ No newline at end of file