provide consumer example

issue #73

rsoika committed Apr 18, 2019
1 parent 4e40f27 commit c9d45e1
Showing 6 changed files with 210 additions and 33 deletions.
54 changes: 51 additions & 3 deletions imixs-adapters-kafka/README.md
@@ -15,7 +15,7 @@ To test the behavior of the Imixs Kafka Adapter, you can run [Imixs-Microservice

## How to create a custom Docker Image from Imixs-Microservice

To create a custom Docker image of the Imixs-Microservice just jecout the project from [Github](https://github.com/imixs/imixs-microservice) and add the Imixs-Adapters-Kafka dependency:
To create a custom Docker image of the Imixs-Microservice, just check out the project from [Github](https://github.com/imixs/imixs-microservice) and add the Imixs-Adapters-Kafka dependency to the Maven pom.xml:

<dependency>
<groupId>org.imixs.workflow</groupId>
@@ -30,14 +30,46 @@ The Imixs-Microservice project already includes this dependency in the pom.xml.
$ cd ~/git/imixs-microservice
$ mvn clean install -Pdocker-build

After that you can switch back into the imixs-adapter-kafka project and start the Imixs-Microservice Container with the docker-compose.yml file:
Now you have a local Docker image of the Imixs-Microservice including the Apache Kafka adapter.

Switch back into the imixs-adapter-kafka project and start the Imixs-Microservice Container with the docker-compose.yml file:

$ cd ~/git/imixs-adapters/imixs-adapters-kafka/
$ docker-compose up

This will start an instance of your newly built Docker image of the Imixs-Microservice, including the Kafka Adapter, together with a local Kafka server.

The docker-compose.yml file starts the following components in a Docker stack:

* Imixs-Microservice including the Kafka Adapter, running in Wildfly debug mode and listening on port 8787
* Postgres DB for Imixs-Workflow
* Apache Kafka single-node cluster
* Zookeeper

Take note of the following setup within the docker-compose.yml file:

    kafka:
      image: wurstmeister/kafka:latest
      ports:
        - target: 9094
          published: 9094
          protocol: tcp
          mode: host
      environment:
        KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
        KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
        KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092,OUTSIDE://localhost:9094
        KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
        KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE


This defines port 9092 for the internal network communication with Kafka (kafka:9092). Port 9094 is for outside communication and is bound to the host name 'localhost'. This setup is for local development tests only. In a production environment you can set the real host name here, or use the following bash command to resolve the host name from the Docker info:

HOSTNAME_COMMAND: "docker info | grep ^Name: | cut -d' ' -f 2"
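
Which bootstrap address a client has to use therefore depends on where the client runs. A minimal sketch (the class name BootstrapExample is made up for illustration):

    import java.util.Properties;

    import org.apache.kafka.clients.producer.ProducerConfig;

    // Illustrates the two listeners defined above.
    public class BootstrapExample {

        public static Properties clientProperties(boolean insideDockerStack) {
            Properties props = new Properties();
            if (insideDockerStack) {
                // containers in the stack reach the broker via the INSIDE listener
                props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
            } else {
                // clients on the Docker host use the advertised OUTSIDE listener
                props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9094");
            }
            return props;
        }
    }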



## Test the Kafka Adapter

First upload the demo model located under /src/model.ticket.bpmn
@@ -62,3 +94,19 @@ Now you can create a process instance which will trigger the Kafka Adapter:
]}' \
http://localhost:8080/api/workflow/workitem.json
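
The same call can also be made from Java. The following sketch uses plain HttpURLConnection; the credentials and the payload are illustrative assumptions and depend on your setup:

    import java.io.OutputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;
    import java.util.Base64;

    public class WorkitemClient {

        public static void main(String[] args) throws Exception {
            // illustrative payload - the item list depends on your model
            String json = "{\"item\":[{\"name\":\"$modelversion\",\"value\":{\"@type\":\"xs:string\",\"$\":\"1.0.1\"}}]}";

            URL url = new URL("http://localhost:8080/api/workflow/workitem.json");
            HttpURLConnection con = (HttpURLConnection) url.openConnection();
            con.setRequestMethod("POST");
            con.setRequestProperty("Content-Type", "application/json");
            // assumed demo credentials - adjust to your setup
            String auth = Base64.getEncoder().encodeToString("admin:adminadmin".getBytes(StandardCharsets.UTF_8));
            con.setRequestProperty("Authorization", "Basic " + auth);
            con.setDoOutput(true);
            try (OutputStream os = con.getOutputStream()) {
                os.write(json.getBytes(StandardCharsets.UTF_8));
            }
            System.out.println("HTTP status: " + con.getResponseCode());
        }
    }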

### Error while fetching metadata with correlation id 537 : {1.0.1=LEADER_NOT_AVAILABLE}

If you get an error message like this one:

    Error while fetching metadata with correlation id 537 : {1.0.1=LEADER_NOT_AVAILABLE}

this typically means the broker has not yet elected or advertised a leader for the topic, for example because the topic was just auto-created. In most cases it is sufficient to first shut down your stack:

    $ docker-compose down

and then restart it:

    $ docker-compose up

12 changes: 8 additions & 4 deletions imixs-adapters-kafka/docker-compose.yml
@@ -14,6 +14,7 @@ services:
    image: imixs/imixs-microservice:latest
    environment:
      WILDFLY_PASS: adminadmin
      DEBUG: "true"
      POSTGRES_USER: "postgres"
      POSTGRES_PASSWORD: "adminadmin"
      POSTGRES_CONNECTION: "jdbc:postgresql://db/workflow"
@@ -22,6 +23,8 @@ services:
      KAFKA_CLIENTID: "imixs-workflow1"
    ports:
      - "8080:8080"
      - "9990:9990"
      - "8787:8787"

  # Imixs Admin Client
  imixsadmin:
@@ -44,13 +47,14 @@ services:
        protocol: tcp
        mode: host
    environment:
      HOSTNAME_COMMAND: "docker info | grep ^Name: | cut -d' ' -f 2"
      # HOSTNAME_COMMAND: "docker info | grep ^Name: | cut -d' ' -f 2"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      # KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092,OUTSIDE://_{HOSTNAME_COMMAND}:9094
      KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092,OUTSIDE://localhost:9094
      KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
    # volumes:
    #   - /var/run/docker.sock:/var/run/docker.sock
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock


imixs-adapters-kafka/…/ConfigService.java (new file)
@@ -0,0 +1,44 @@
package org.imixs.workflow.kafka;

import java.io.Serializable;
import java.util.logging.Logger;

import javax.ejb.Stateless;

/**
 * The ConfigService provides static access methods for environment variables.
 *
 * @version 1.0
 * @author rsoika
 *
 */
@Stateless
public class ConfigService implements Serializable {

    public static final String ENV_KAFKA_BROKERS = "KAFKA_BROKERS";
    public static final String ENV_KAFKA_CLIENTID = "KAFKA_CLIENTID";

    private static final long serialVersionUID = 1L;

    private static Logger logger = Logger.getLogger(ConfigService.class.getName());

    /**
     * Returns an environment variable, or the given default value if the
     * variable is not set or empty.
     *
     * @param env
     *            - environment variable name
     * @param defaultValue
     *            - optional default value
     * @return value
     */
    public static String getEnv(String env, String defaultValue) {
        logger.finest("......read env: " + env);
        String result = System.getenv(env);
        if (result == null || result.isEmpty()) {
            result = defaultValue;
        }
        return result;
    }
}
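
A short usage example for the getEnv() method (a sketch):

    // resolve the broker list, falling back to the internal Docker address
    String brokers = ConfigService.getEnv(ConfigService.ENV_KAFKA_BROKERS, "kafka:9092");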
imixs-adapters-kafka/…/ConsumerService.java
@@ -9,11 +9,13 @@
import javax.ejb.ConcurrencyManagement;
import javax.ejb.ConcurrencyManagementType;
import javax.ejb.Singleton;
import javax.ejb.Startup;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

@@ -24,14 +26,17 @@
* @author rsoika
*
*/
@Startup
@Singleton
@ConcurrencyManagement(ConcurrencyManagementType.BEAN)
public class ConsumerService implements Serializable {

    public static String KAFKA_BROKERS = "localhost:9092";
    public static Integer MESSAGE_COUNT = 1000;
    public static String CLIENT_ID = "client1";
    public static String TOPIC_NAME = "demo";

    public static String TOPIC_NAME = "1.0.1"; // just an example
    public static String GROUP_ID_CONFIG = "consumerGroup1";
    public static Integer MAX_NO_MESSAGE_FOUND_COUNT = 100;
    public static String OFFSET_RESET_LATEST = "latest";
@@ -75,16 +80,23 @@ void init() {

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_CONFIG);

        props.put(ProducerConfig.CLIENT_ID_CONFIG, ConfigService.getEnv(ConfigService.ENV_KAFKA_CLIENTID, "Imixs-Workflow-1"));

        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OFFSET_RESET_EARLIER);

        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(TOPIC_NAME));

        runConsumer();

        // here we need to subscribe the topics!
        // logger.info("...register topic: " + TOPIC_NAME);
        // consumer.subscribe(Collections.singletonList(TOPIC_NAME));
        // runConsumer();
    }

void runConsumer() {
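
The body of runConsumer() is collapsed in this view. A typical poll loop matching the configuration above could look like the following sketch (an assumption rather than the committed code; a logger field is assumed):

    void runConsumer() {
        int noMessageFound = 0;
        while (noMessageFound < MAX_NO_MESSAGE_FOUND_COUNT) {
            // poll with a 1 second timeout (kafka-clients 2.x offers poll(Duration))
            ConsumerRecords<Long, String> records = consumer.poll(1000);
            if (records.count() == 0) {
                noMessageFound++; // no new messages, try again
                continue;
            }
            noMessageFound = 0;
            records.forEach(record -> logger.info("...received message - key: " + record.key()
                    + " value: " + record.value() + " offset: " + record.offset()));
            // auto commit is disabled in init(), so commit the offsets manually
            consumer.commitAsync();
        }
        consumer.close();
    }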
imixs-adapters-kafka/…/ProducerService.java
@@ -43,9 +43,6 @@
@ConcurrencyManagement(ConcurrencyManagementType.BEAN)
public class ProducerService implements Serializable {

    public static final String ENV_KAFKA_BROKERS = "KAFKA_BROKERS";
    public static final String ENV_KAFKA_CLIENTID = "KAFKA_CLIENTID";

    private static final long serialVersionUID = 1L;

    private static Logger logger = Logger.getLogger(ProducerService.class.getName());
@@ -80,8 +77,8 @@ void init() {
        logger.info("...init KafkaProducer...");
        Properties props = new Properties();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getEnv(ENV_KAFKA_BROKERS, "kafka:9092"));
        props.put(ProducerConfig.CLIENT_ID_CONFIG, getEnv(ENV_KAFKA_CLIENTID, "Imixs-Workflow-1"));
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ConfigService.getEnv(ConfigService.ENV_KAFKA_BROKERS, "kafka:9092"));
        props.put(ProducerConfig.CLIENT_ID_CONFIG, ConfigService.getEnv(ConfigService.ENV_KAFKA_CLIENTID, "Imixs-Workflow-1"));
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
@@ -124,21 +121,5 @@ public void onProcess(@Observes ProcessingEvent documentEvent) {

    }

    /**
     * Returns an environment variable. An environment variable can be provided
     * as a System property.
     *
     * @param env
     *            - environment variable name
     * @param defaultValue
     *            - optional default value
     * @return value
     */
    public static String getEnv(String env, String defaultValue) {
        String result = System.getenv(env);
        if (result == null || result.isEmpty()) {
            result = defaultValue;
        }
        return result;
    }

}
imixs-adapters-kafka/…/TestProducer.java (new file)
@@ -0,0 +1,88 @@
package org.imixs.workflow.kafka;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.logging.Logger;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import junit.framework.Assert;

/**
 * This test verifies that messages can be sent to the local Kafka broker via
 * the external listener on localhost:9094.
 *
 * @author rsoika
 *
 */
public class TestProducer {

    Producer<Long, String> producer;

    private static Logger logger = Logger.getLogger(TestProducer.class.getName());

    @Before
    public void setup() {
        Properties props = new Properties();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9094");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "Imixs-Workflow1");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // here we wait a maximum of one second until the topic is present in the
        // metadata (default is 60000)
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000);
        // props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
        // CustomPartitioner.class.getName());
        producer = new KafkaProducer<>(props);
    }

    @After
    public void teardown() {
        producer.close();
    }

    /**
     * Sends a test message to the example topic '1.0.1'.
     */
    @Test
    public void testSendMessages() {

        ProducerRecord<Long, String> record = new ProducerRecord<Long, String>("1.0.1", "some data test...");

        try {
            RecordMetadata metadata = producer.send(record).get();
            logger.info("...Imixs-Workflow Event sent to partition " + metadata.partition()
                    + " with offset " + metadata.offset());
        } catch (ExecutionException e) {
            logger.info("Error in sending record: " + e.getMessage());
            Assert.fail();
        } catch (InterruptedException e) {
            System.out.println("Error in sending record: " + e.getMessage());
            Assert.fail();
        }
    }

}
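
A matching consumer-side test is not part of this commit. A hypothetical counterpart, reading from the example topic via the OUTSIDE listener, could look like this sketch:

    package org.imixs.workflow.kafka;

    import java.util.Collections;
    import java.util.Properties;
    import java.util.logging.Logger;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.LongDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;

    /**
     * Hypothetical counterpart to TestProducer - consumes messages from the
     * example topic '1.0.1' via the external listener on localhost:9094.
     */
    public class TestConsumer {

        Consumer<Long, String> consumer;

        private static Logger logger = Logger.getLogger(TestConsumer.class.getName());

        @Before
        public void setup() {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9094");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup1");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("1.0.1"));
        }

        @After
        public void teardown() {
            consumer.close();
        }

        @Test
        public void testReceiveMessages() {
            // a single poll is enough for a smoke test
            ConsumerRecords<Long, String> records = consumer.poll(1000);
            logger.info("...received " + records.count() + " records");
            records.forEach(record -> logger.info("...value: " + record.value()));
        }
    }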
