Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #4788] Support disruptor as memory queue #4844

Merged
merged 2 commits into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
dependencies {
implementation project(":eventmesh-common")
implementation project(":eventmesh-storage-plugin:eventmesh-storage-api")
implementation "com.lmax:disruptor"

compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import org.apache.eventmesh.api.admin.AbstractAdmin;
import org.apache.eventmesh.api.admin.TopicProperties;
import org.apache.eventmesh.storage.standalone.broker.MessageQueue;
import org.apache.eventmesh.storage.standalone.broker.Channel;
import org.apache.eventmesh.storage.standalone.broker.StandaloneBroker;
import org.apache.eventmesh.storage.standalone.broker.model.TopicMetadata;

Expand All @@ -42,11 +42,11 @@ public StandaloneAdmin() {

@Override
public List<TopicProperties> getTopic() throws Exception {
ConcurrentHashMap<TopicMetadata, MessageQueue> messageContainer = this.standaloneBroker.getMessageContainer();
ConcurrentHashMap<TopicMetadata, Channel> messageContainer = this.standaloneBroker.getMessageContainer();
List<TopicProperties> topicList = new ArrayList<>();
messageContainer.keySet().forEach(topicMetadata -> {
MessageQueue messageQueue = messageContainer.get(topicMetadata);
final int messageCount = messageQueue.getPutIndex() - messageQueue.getTakeIndex();
Channel channel = messageContainer.get(topicMetadata);
final int messageCount = channel.getMessageCount();
topicList.add(new TopicProperties(
topicMetadata.getTopicName(),
messageCount));
Expand All @@ -65,25 +65,7 @@ public void deleteTopic(String topicName) {
standaloneBroker.deleteTopicIfExist(topicName);
}

@Override
public List<CloudEvent> getEvent(String topicName, int offset, int length) throws Exception {
if (!this.standaloneBroker.checkTopicExist(topicName)) {
throw new Exception("The topic name doesn't exist in the message queue");
}
ConcurrentHashMap<TopicMetadata, MessageQueue> messageContainer = this.standaloneBroker.getMessageContainer();
long topicOffset = messageContainer.get(new TopicMetadata(topicName)).getTakeIndex();

List<CloudEvent> messageList = new ArrayList<>();
for (int index = 0; index < length; index++) {
long messageOffset = topicOffset + offset + index;
CloudEvent event = this.standaloneBroker.getMessage(topicName, messageOffset);
if (event == null) {
break;
}
messageList.add(event);
}
return messageList;
}

@Override
public void publish(CloudEvent cloudEvent) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.storage.standalone.broker;

import org.apache.eventmesh.api.LifeCycle;
import org.apache.eventmesh.common.EventMeshThreadFactory;
import org.apache.eventmesh.storage.standalone.broker.model.MessageEntity;
import org.apache.eventmesh.storage.standalone.broker.model.TopicMetadata;
import org.apache.eventmesh.storage.standalone.broker.provider.DisruptorProvider;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.IgnoreExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

import lombok.Getter;


public class Channel implements LifeCycle {

public static final Integer DEFAULT_SIZE = 4096 << 1 << 1;
@Getter
private DisruptorProvider provider;
private final Integer size;
private final EventHandler<MessageEntity> eventHandler;
private volatile boolean started = false;
private final TopicMetadata topic;
private static final String THREAD_NAME_PREFIX = "standalone_disruptor_provider_";

public Channel(TopicMetadata topic, EventHandler<MessageEntity> eventHandler) {
this(DEFAULT_SIZE, topic, eventHandler);
}


public Channel(final Integer ringBufferSize, final TopicMetadata topic, final EventHandler<MessageEntity> eventHandler) {
this.size = ringBufferSize;
this.topic = topic;
this.eventHandler = eventHandler;
}


@Override
public boolean isStarted() {
return started;
}

@Override
public boolean isClosed() {
return !isStarted();
}

public synchronized void start() {
if (isClosed()) {
doStart();
started = true;
}
}

public void doStart() {
Disruptor<MessageEntity> disruptor = new Disruptor<>(
MessageEntity::new,
size,
new EventMeshThreadFactory(THREAD_NAME_PREFIX + topic.getTopicName(), true),
ProducerType.MULTI,
new BlockingWaitStrategy()
);

disruptor.handleEventsWith(eventHandler);
disruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler());
RingBuffer<MessageEntity> ringBuffer = disruptor.getRingBuffer();
provider = new DisruptorProvider(ringBuffer, disruptor);
provider.start();
}

public int getMessageCount() {
return provider.getMessageCount();
}

@Override
public synchronized void shutdown() {
if (isStarted()) {
provider.shutdown();
provider = null;
started = false;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,71 +19,75 @@

import org.apache.eventmesh.storage.standalone.broker.model.MessageEntity;
import org.apache.eventmesh.storage.standalone.broker.model.TopicMetadata;
import org.apache.eventmesh.storage.standalone.broker.task.HistoryMessageClear;
import org.apache.eventmesh.storage.standalone.broker.task.HistoryMessageClearTask;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.eventmesh.storage.standalone.broker.task.Subscribe;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import io.cloudevents.CloudEvent;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

/**
* This broker used to store event, it just support standalone mode, you shouldn't use this module in production environment
*/
@Slf4j
public class StandaloneBroker {

private final ConcurrentHashMap<TopicMetadata, MessageQueue> messageContainer;
// message source by topic
@Getter
private final ConcurrentHashMap<TopicMetadata, Channel> messageContainer;

// todo: move the offset manage to consumer
private final ConcurrentHashMap<TopicMetadata, AtomicLong> offsetMap;
@Getter
private final ConcurrentHashMap<TopicMetadata, Subscribe> subscribeContainer;

private StandaloneBroker() {
this.messageContainer = new ConcurrentHashMap<>();
this.offsetMap = new ConcurrentHashMap<>();
startHistoryMessageCleanTask();
}

public ConcurrentHashMap<TopicMetadata, MessageQueue> getMessageContainer() {
return this.messageContainer;
this.subscribeContainer = new ConcurrentHashMap<>();
}

public ConcurrentHashMap<TopicMetadata, AtomicLong> getOffsetMap() {
return this.offsetMap;
}

public static StandaloneBroker getInstance() {
return StandaloneBrokerInstanceHolder.instance;
return StandaloneBrokerInstanceHolder.INSTANCE;
}

/**
* put message
*
* @param topicName topic name
* @param message message
* @throws InterruptedException
*/
public MessageEntity putMessage(String topicName, CloudEvent message) throws InterruptedException {
Pair<MessageQueue, AtomicLong> pair = createTopicIfAbsent(topicName);
AtomicLong topicOffset = pair.getRight();
MessageQueue messageQueue = pair.getLeft();

MessageEntity messageEntity = new MessageEntity(
new TopicMetadata(topicName), message, topicOffset.getAndIncrement(), System.currentTimeMillis());
messageQueue.put(messageEntity);

public MessageEntity putMessage(String topicName, CloudEvent message) {
TopicMetadata topicMetadata = new TopicMetadata(topicName);
if (!messageContainer.containsKey(topicMetadata)) {
createTopic(topicName);
}
Channel channel = messageContainer.get(topicMetadata);
MessageEntity messageEntity = new MessageEntity(new TopicMetadata(topicName), message);
channel.getProvider().onData(messageEntity);
return messageEntity;
}

public Channel createTopic(String topicName) {
TopicMetadata topicMetadata = new TopicMetadata(topicName);
return messageContainer.computeIfAbsent(topicMetadata, k -> {
Subscribe subscribe = subscribeContainer.get(topicMetadata);
if (subscribe == null) {
throw new IllegalStateException("the topic not exist subscribe ");
}
Channel channel = new Channel(topicMetadata, subscribe);
channel.start();
return channel;
});
}

/**
* Get the message, if the queue is empty then await
*
* @param topicName
*/
public CloudEvent takeMessage(String topicName) throws InterruptedException {
TopicMetadata topicMetadata = new TopicMetadata(topicName);
return messageContainer.computeIfAbsent(topicMetadata, k -> new MessageQueue()).take().getMessage();
return null;
}

/**
Expand All @@ -92,12 +96,7 @@ public CloudEvent takeMessage(String topicName) throws InterruptedException {
* @param topicName
*/
public CloudEvent getMessage(String topicName) {
TopicMetadata topicMetadata = new TopicMetadata(topicName);
MessageEntity head = messageContainer.computeIfAbsent(topicMetadata, k -> new MessageQueue()).getHead();
if (head == null) {
return null;
}
return head.getMessage();
return null;
}

/**
Expand All @@ -108,21 +107,9 @@ public CloudEvent getMessage(String topicName) {
* @return CloudEvent
*/
public CloudEvent getMessage(String topicName, long offset) {
TopicMetadata topicMetadata = new TopicMetadata(topicName);
MessageEntity messageEntity = messageContainer.computeIfAbsent(topicMetadata, k -> new MessageQueue()).getByOffset(offset);
if (messageEntity == null) {
return null;
}
return messageEntity.getMessage();
return null;
}

private void startHistoryMessageCleanTask() {
HistoryMessageClear historyMessageClear = new HistoryMessageClear(messageContainer);
Thread thread = new Thread(new HistoryMessageClearTask(historyMessageClear));
thread.setDaemon(true);
thread.setName("StandaloneBroker-HistoryMessageCleanTask");
thread.start();
}

public boolean checkTopicExist(String topicName) {
return messageContainer.containsKey(new TopicMetadata(topicName));
Expand All @@ -132,13 +119,10 @@ public boolean checkTopicExist(String topicName) {
* if the topic does not exist, create the topic
*
* @param topicName topicName
* @return messageQueue and offset
* @return Channel
*/
public Pair<MessageQueue, AtomicLong> createTopicIfAbsent(String topicName) {
TopicMetadata topicMetadata = new TopicMetadata(topicName);
MessageQueue messageQueue = messageContainer.computeIfAbsent(topicMetadata, k -> new MessageQueue());
AtomicLong offset = offsetMap.computeIfAbsent(topicMetadata, k -> new AtomicLong());
return Pair.of(messageQueue, offset);
public Channel createTopicIfAbsent(String topicName) {
return createTopic(topicName);
}

/**
Expand All @@ -148,18 +132,23 @@ public Pair<MessageQueue, AtomicLong> createTopicIfAbsent(String topicName) {
*/
public void deleteTopicIfExist(String topicName) {
TopicMetadata topicMetadata = new TopicMetadata(topicName);
Channel channel = createTopicIfAbsent(topicName);
channel.shutdown();
messageContainer.remove(topicMetadata);
}

public void updateOffset(TopicMetadata topicMetadata, long offset) {
offsetMap.computeIfPresent(topicMetadata, (k, v) -> {
v.set(offset);
return v;
});
public void subscribed(String topicName, Subscribe subscribe) {
TopicMetadata topicMetadata = new TopicMetadata(topicName);
if (getMessageContainer().containsKey(topicMetadata)) {
log.warn("the topic already subscribed");
return;
}
subscribeContainer.put(topicMetadata, subscribe);
}


private static class StandaloneBrokerInstanceHolder {

private static final StandaloneBroker instance = new StandaloneBroker();
private static final StandaloneBroker INSTANCE = new StandaloneBroker();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@

import io.cloudevents.CloudEvent;

import lombok.NoArgsConstructor;

@NoArgsConstructor
public class MessageEntity implements Serializable {

private static final long serialVersionUID = 6646148767540524786L;
Expand All @@ -40,6 +43,11 @@ public MessageEntity(TopicMetadata topicMetadata, CloudEvent message, long offse
this.createTimeMills = currentTimeMills;
}

public MessageEntity(TopicMetadata topicMetadata, CloudEvent message) {
this.topicMetadata = topicMetadata;
this.message = message;
}

public TopicMetadata getTopicMetadata() {
return topicMetadata;
}
Expand Down
Loading
Loading