diff --git a/src/main/java/org/apache/flume/source/kafka/KafkaSource08.java b/src/main/java/org/apache/flume/source/kafka/KafkaSource08.java old mode 100644 new mode 100755 index c1865b2..b618161 --- a/src/main/java/org/apache/flume/source/kafka/KafkaSource08.java +++ b/src/main/java/org/apache/flume/source/kafka/KafkaSource08.java @@ -53,6 +53,7 @@ public class KafkaSource08 extends AbstractSource implements Configurable, Polla private ConsumerConnector consumer; private ConsumerIterator it; private String topic; + private int batchUpperLimit = 1; public Status process() throws EventDeliveryException { List eventList = new ArrayList(); @@ -62,7 +63,8 @@ public Status process() throws EventDeliveryException { Map headers; // byte [] bytes; try { - if(it.hasNext()) { + int eventCounter = 0; + while (it.hasNext() && eventCounter++ <= batchUpperLimit) { bytes = it.next().message(); event = new SimpleEvent(); headers = new HashMap(); @@ -81,6 +83,8 @@ public Status process() throws EventDeliveryException { } public void configure(Context context) { + + batchUpperLimit = context.getInteger("batch.upper.limit", 1); topic = context.getString("topic"); if(topic == null) { throw new ConfigurationException("Kafka topic must be specified."); diff --git a/src/test/java/org/apache/flume/source/kafka/KafkaSourceTest.java b/src/test/java/org/apache/flume/source/kafka/KafkaSourceTest.java old mode 100644 new mode 100755 index b2fdec3..5a78790 --- a/src/test/java/org/apache/flume/source/kafka/KafkaSourceTest.java +++ b/src/test/java/org/apache/flume/source/kafka/KafkaSourceTest.java @@ -95,6 +95,26 @@ public void testProcessItNotEmpty() throws EventDeliveryException, SecurityExcep when(mockIt.next()).thenReturn(mockMessageAndMetadata); assertEquals(Status.READY, status); } + + @SuppressWarnings("unchecked") + @Test + public void testProcessItNotEmptyBatch() throws EventDeliveryException, SecurityException, NoSuchFieldException, IllegalArgumentException, IllegalAccessException { + + Field field = KafkaSource08.class.getDeclaredField("batchUpperLimit"); + field.setAccessible(true); + field.set(mockKafkaSource, 2); + + when(mockIt.next()).thenReturn(mockMessageAndMetadata); + when(mockIt.hasNext()).thenReturn(true); + when(mockIt.next()).thenReturn(mockMessageAndMetadata); + when(mockIt.hasNext()).thenReturn(true); + Status status = mockKafkaSource.process(); + verify(mockIt, times(2)).hasNext(); + verify(mockIt, times(2)).next(); + verify(mockChannelProcessor, times(1)).processEventBatch(anyList()); + when(mockIt.next()).thenReturn(mockMessageAndMetadata); + assertEquals(Status.READY, status); + } @SuppressWarnings("unchecked") @Test @@ -120,4 +140,6 @@ public void testProcessException() throws EventDeliveryException, SecurityExcept verify(mockChannelProcessor, times(0)).processEventBatch(anyList()); assertEquals(Status.BACKOFF, status); } + + }