Skip to content

Commit

Permalink
Added kafka Adapter class
Browse files Browse the repository at this point in the history
issue #73
  • Loading branch information
rsoika committed Apr 19, 2019
1 parent 409bdfe commit aa3ce92
Show file tree
Hide file tree
Showing 15 changed files with 703 additions and 106 deletions.
26 changes: 19 additions & 7 deletions imixs-adapters-kafka/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,35 @@ This adapter module provides an Apache Kafka messaging service for Imixs-Workflo

With Imixs-Kafka you can easily send Workflow Messages automatically during the processing life-cycle. With the Autowire-Function new process instances are send into a Kafka Message Queue so that any consumer interested in workflow events can consume the message and react in various ways.

<br /><br /><img src="src/uml/kafka-adapter-producer.png" />
<img src="src/uml/kafka-adapter-producer.png" />


The Adapter filters Workflow events by the Model Version number so you can control which kind of workflows are send into a message queue.
The Adapter filters Workflow events by its Model Version, so you can control which kind of workflows should be send into a message queue.

## Workflow Messages based on Business Logic

Another way to send Workflow Messages into a Kafka queue is the Imixs-Adapter Class.

org.imixs.workflow.kafka.KafkaAdapter

This implementation is based on the [Imixs-Adapter concept](https://www.imixs.org/doc/core/adapter-api.html) and allows a more fine grained modeling of a asynchronous service integration. The Imixs-Kafka Adapter can configured directly in a BPMN 2.0 Model.
This implementation is based on the [Imixs-Adapter concept](https://www.imixs.org/doc/core/adapter-api.html) and allows a more fine grained modeling of a asynchronous service integration.

The Imixs-Kafka Adapter can be configured directly in a BPMN 2.0 Model.

<img src="https://www.imixs.org/doc/images/modelling/bpmn_screen_37.png" />

You can configure the integration of the Kakfa Producer Service in various ways.



## Consuming Worklfow Message

The other way to integrate Imixs-Workflow into a Kafka infrastructure is to send Workflow Messages to a Kafka queue to be processed by the Imixs-Workflow Instance. In this way a client can sends a Process Instance to a predefined Message queue.

<img src="src/uml/kafka-adapter-consumer.png" />

Imixs-Workflow will automatically consume messages for a predefined topic and process the workflow data. In this way messages can be used to trigger the event-based Imixs-Workflow engine.

# <img src="https://github.com/imixs/imixs-microservice/raw/master/small_h-trans.png">

To test the behavior of the Imxis Kafka Adapter, you can run [Imixs-Microservices](https://github.com/imixs/imixs-microservice) as a custom image. The project provides a setup to include the Imixs Kafka Adapter and create a custom build.
Expand Down Expand Up @@ -86,10 +98,10 @@ This defines port 9092 for the internal network communication with kafka (kafka:

## Test the Kafka Adapter

First upload the demo model located under /src/model.ticket.bpmn
First upload the demo model located under /src/model/


curl --user admin:adminadmin --request POST -Tsrc/model/ticket.bpmn http://localhost:8080/api/model/bpmn
curl --user admin:adminadmin --request POST -Tsrc/model/kafka-ticket-1.0.0.bpmn http://localhost:8080/api/model/bpmn

You can verify the availiblity of the model under the Web URI:

Expand All @@ -101,7 +113,7 @@ Now you can create a process instance which will trigger the Kafka Adapter:
curl --user admin:adminadmin -H "Content-Type: application/json" -H 'Accept: application/json' -d \
'{"item":[ \
{"name":"type","value":{"@type":"xs:string","$":"workitem"}}, \
{"name":"$modelversion","value":{"@type":"xs:string","$":"1.0.1"}}, \
{"name":"$modelversion","value":{"@type":"xs:string","$":"kafka-ticket-1.0"}}, \
{"name":"$taskid","value":{"@type":"xs:int","$":"1000"}}, \
{"name":"$eventid","value":{"@type":"xs:int","$":"10"}}, \
{"name":"txtname","value":{"@type":"xs:string","$":"test-json"}}\
Expand All @@ -118,7 +130,7 @@ The junit class 'TestProcuer' contains an example how to send a workflow message

If you got a error message like this one:

Error while fetching metadata with correlation id 537 : {1.0.1=LEADER_NOT_AVAILABLE}
Error while fetching metadata with correlation id 537 : {kafka-ticket-1.0=LEADER_NOT_AVAILABLE}


Then first shutdown your stack with the command:
Expand Down
10 changes: 6 additions & 4 deletions imixs-adapters-kafka/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,18 @@ services:
# LISTENERS: INSIDE://:9092,OUTSIDE://:9094
KAFKA_BROKERS: "kafka:9092"
KAFKA_CLIENTID: "imixs-workflow1"
# KAFKA_AUTOWIRE: "(^kafka-ticket)"

ports:
- "8080:8080"
- "9990:9990"
- "8787:8787"

# Imixs Admin Client
# imixsadmin:
# image: imixs/imixs-admin
# ports:
# - "8888:8080"
imixsadmin:
image: imixs/imixs-admin
ports:
- "8888:8080"


#
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public class ConfigService implements Serializable {

public static final String ENV_KAFKA_BROKERS = "KAFKA_BROKERS";
public static final String ENV_KAFKA_CLIENTID = "KAFKA_CLIENTID";
public static final String ENV_KAFKA_AUTOWIRE = "KAFKA_AUTOWIRE";

private static final long serialVersionUID = 1L;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*******************************************************************************
* Imixs Workflow
* Copyright (C) 2001, 2011 Imixs Software Solutions GmbH,
* http://www.imixs.com
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You can receive a copy of the GNU General Public
* License at http://www.gnu.org/licenses/gpl.html
*
* Project:
* http://www.imixs.org
* http://java.net/projects/imixs-workflow
*
* Contributors:
* Imixs Software Solutions GmbH - initial API and implementation
* Ralph Soika - Software Developer
*******************************************************************************/

package org.imixs.workflow.kafka;

import java.util.logging.Logger;

import javax.ejb.EJB;

import org.imixs.workflow.Adapter;
import org.imixs.workflow.ItemCollection;
import org.imixs.workflow.exceptions.AdapterException;

/**
* This adapter class sends a kafka message
*
* @author Ralph Soika
* @version 1.0
*
*/
public class KafkaAdapter implements Adapter {
private static Logger logger = Logger.getLogger(KafkaAdapter.class.getName());

// inject services...
@EJB
ProducerService producerService;

/**
* Send kafka message based on the current workitem
*/
@Override
public ItemCollection execute(ItemCollection workitem, ItemCollection event) throws AdapterException {
logger.info("execute kafka adapter - send message...");
producerService.sendWorkitem(workitem);
return workitem;
}

}
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
package org.imixs.workflow.kafka;

import java.io.IOException;
import java.io.Serializable;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.logging.Logger;
import java.util.regex.Pattern;

import javax.annotation.PostConstruct;
import javax.ejb.ConcurrencyManagement;
import javax.ejb.ConcurrencyManagementType;
import javax.ejb.Singleton;
import javax.enterprise.event.Observes;
import javax.xml.bind.JAXBException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
Expand All @@ -18,7 +21,13 @@
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.imixs.workflow.ItemCollection;
import org.imixs.workflow.WorkflowKernel;
import org.imixs.workflow.engine.ProcessingEvent;
import org.imixs.workflow.exceptions.AdapterException;
import org.imixs.workflow.exceptions.ProcessingErrorException;
import org.imixs.workflow.xml.XMLDocument;
import org.imixs.workflow.xml.XMLDocumentAdapter;

/**
* The ProducerService is a Kafka client that publishes workflow events to the
Expand Down Expand Up @@ -50,7 +59,8 @@ public class ProducerService implements Serializable {
Producer<Long, String> producer;

/**
* This method creates a Kafka producer with some properties during initalization.
* This method creates a Kafka producer with some properties during
* initalization.
* <p>
* BOOTSTRAP_SERVERS_CONFIG: The Kafka broker's address. If Kafka is running in
* a cluster then you can provide comma (,) seperated addresses. For
Expand All @@ -77,49 +87,92 @@ void init() {
logger.info("...init KafkaProducer...");
Properties props = new Properties();

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ConfigService.getEnv(ConfigService.ENV_KAFKA_BROKERS, "kafka:9092"));
props.put(ProducerConfig.CLIENT_ID_CONFIG, ConfigService.getEnv(ConfigService.ENV_KAFKA_CLIENTID, "Imixs-Workflow-1"));
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
ConfigService.getEnv(ConfigService.ENV_KAFKA_BROKERS, "kafka:9092"));
props.put(ProducerConfig.CLIENT_ID_CONFIG,
ConfigService.getEnv(ConfigService.ENV_KAFKA_CLIENTID, "Imixs-Workflow-1"));
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

//props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());
// props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
// CustomPartitioner.class.getName());
producer = new KafkaProducer<>(props);
}

/**
* On each workflow process event a new message is generated.
* Autowire:
* <p>
* On each workflow process event a new message is generated if the workflow
* model version matches the setup.
*/
public void onProcess(@Observes ProcessingEvent documentEvent) {

if (ProcessingEvent.AFTER_PROCESS == documentEvent.getEventType()) {

logger.info("...consuming ProcssingEvent... send new kafka event...");

String uid = documentEvent.getDocument().getUniqueID();
// we use the model version as the topic name

String topic = documentEvent.getDocument().getModelVersion();
String value = documentEvent.getDocument().getWorkflowGroup() + ":" + uid;
// test autowire / model version
String modelPattern = ConfigService.getEnv(ConfigService.ENV_KAFKA_AUTOWIRE, null);
if (modelPattern != null && !modelPattern.isEmpty()) {
String modelVersion = documentEvent.getDocument().getModelVersion();

ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(topic, value);
Pattern regexPattern = Pattern.compile(modelPattern);
if (!regexPattern.matcher(modelVersion).find()) {
// no match
return;
}
}

logger.info("...consuming ProcssingEvent (model:" + modelPattern + ") -> send new kafka event...");
try {
RecordMetadata metadata = producer.send(record).get();
logger.info("...Imixs-Workflow Event sent with key " + uid + " to partition " + metadata.partition()
+ " with offset " + metadata.offset());
sendWorkitem(documentEvent.getDocument());
} catch (AdapterException e) {
// convert Adapter Exception into runtime Exception!
throw new ProcessingErrorException(e.getErrorContext() , e.getErrorCode(), e.getMessage(),e);
}
}

catch (ExecutionException e) {
logger.info("Error in sending record: " + e.getMessage());
}
}

catch (InterruptedException e) {
System.out.println("Error in sending record: " + e.getMessage());
}
/**
* This method sends a kafka message based on a given workitem.
* <p>
* The topic ofi the message is the model version
* <p>
* The value is a serialized version of the workitem.
*
* @param workitem
* @throws AdapterException
*/
public void sendWorkitem(ItemCollection workitem) throws AdapterException {
String uid = workitem.getUniqueID();
// we use the model version as the topic name

String topic = workitem.getModelVersion();
// String value = workitem.getWorkflowGroup() + ":" + uid;

try {
byte[] value = XMLDocumentAdapter.writeItemCollection(workitem);

ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(topic, new String(value));

RecordMetadata metadata = producer.send(record).get();
logger.info("...Imixs-Workflow Event sent with key " + uid + " to partition " + metadata.partition()
+ " with offset " + metadata.offset());
}

catch (ExecutionException e) {
throw new AdapterException(ProducerService.class.getSimpleName(), "EXECUTION-EXCEPTION",
e.getMessage(), e);
} catch (InterruptedException e) {
throw new AdapterException(ProducerService.class.getSimpleName(), "INTERUPTED-EXCEPTION",
e.getMessage(), e);
} catch (JAXBException e) {
throw new AdapterException(ProducerService.class.getSimpleName(), "JAXB-EXCEPTION", e.getMessage(),
e);
} catch (IOException e) {
throw new AdapterException(ProducerService.class.getSimpleName(), "IO-EXCEPTION", e.getMessage(),
e);
}

}


}
6 changes: 6 additions & 0 deletions imixs-adapters-kafka/src/main/resources/META-INF/beans.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://xmlns.jcp.org/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/beans_1_1.xsd"
version="1.2" bean-discovery-mode="all">

</beans>
24 changes: 24 additions & 0 deletions imixs-adapters-kafka/src/main/resources/META-INF/copyright.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
Imixs Workflow
Copyright (C) 2001, 2019 Imixs Software Solutions GmbH,
http://www.imixs.com

This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 2
of the License, or (at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.

You can receive a copy of the GNU General Public
License at http://www.gnu.org/licenses/gpl.html

Project:
http://www.imixs.org
http://java.net/projects/imixs-workflow

Contributors:
Imixs Software Solutions GmbH - initial API and implementation
Ralph Soika - Software Developer
Loading

0 comments on commit aa3ce92

Please sign in to comment.