Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eliminate array copies #31

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions bmq-sdk/src/main/java/com/bloomberg/bmq/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.bloomberg.bmq.impl.events.PushMessageEvent;
import com.bloomberg.bmq.impl.events.QueueControlEvent;
import com.bloomberg.bmq.impl.events.QueueControlEventHandler;
import com.bloomberg.bmq.impl.infr.io.ByteBufferOutputStream;
import com.bloomberg.bmq.impl.infr.net.NettyTcpConnectionFactory;
import com.bloomberg.bmq.impl.infr.proto.AckMessageImpl;
import com.bloomberg.bmq.impl.infr.proto.BinaryMessageProperty;
Expand Down Expand Up @@ -1019,8 +1020,9 @@ public void closeAsync(Duration timeout) throws BMQException {
public PutMessage createPutMessage(ByteBuffer... payload) {
PutMessageImpl msg = new PutMessageImpl();

try {
msg.appData().setPayload(payload);
try (ByteBufferOutputStream bbos = new ByteBufferOutputStream()) {
bbos.writeBuffers(payload);
msg.appData().setPayload(bbos.peekUnflipped());
} catch (IOException e) {
throw new BMQException("Failed to set payload", e);
}
Expand Down
16 changes: 14 additions & 2 deletions bmq-sdk/src/main/java/com/bloomberg/bmq/impl/BrokerSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@
import java.lang.invoke.MethodHandles;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -1219,10 +1221,12 @@ public QueueHandle lookupQueue(QueueId queueId) {
return queueStateManager.findByQueueId(queueId);
}

public void post(QueueHandle queueHandle, PutMessageImpl... msgs) throws BMQException {
public void post(QueueHandle queueHandle, Collection<PutMessageImpl> msgs) throws BMQException {
Argument.expectNonNull(queueHandle, "queueHandle");
Argument.expectNonNull(msgs, "msgs");
Argument.expectPositive(msgs.length, "message array length");
if (msgs.isEmpty()) {
return;
}

// Queue state guard
QueueState state = queueHandle.getState();
Expand All @@ -1249,6 +1253,14 @@ public void post(QueueHandle queueHandle, PutMessageImpl... msgs) throws BMQExce
}
}

public void post(QueueHandle queueHandle, PutMessageImpl msg) throws BMQException {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need there there two methods here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Collections.singletonList() is more efficient for a single element than Arrays.asList()

post(queueHandle, Collections.singletonList(msg));
}

public void post(QueueHandle queueHandle, PutMessageImpl... msgs) throws BMQException {
post(queueHandle, Arrays.asList(msgs));
}

public GenericResult confirm(QueueHandle queueHandle, PushMessageImpl... messages) {
Argument.expectNonNull(queueHandle, "queueHandle");
Argument.expectNonNull(messages, "messages");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,17 @@ private void addPayload(ByteBuffer b) {
public void read(
TcpConnection.ReadCallback.ReadCompletionStatus completionStatus, ByteBuffer[] data)
throws IOException {
logger.debug(
"Read from buffers {}, completion status needed bytes {}",
data.length,
completionStatus.numNeeded());
if (logger.isDebugEnabled()) {
int totalRead = 0;
for (ByteBuffer b : data) {
totalRead += b.remaining();
}
logger.debug(
"Read {} bytes from {} buffers, completion status needed bytes {}",
totalRead,
data.length,
completionStatus.numNeeded());
}
try {
EventHeader eventHeader = null;
for (ByteBuffer restData : data) {
Expand Down Expand Up @@ -134,12 +141,18 @@ public void read(
if (expectedDataSize <= 0) {
throw new IOException("Wrong event size: " + expectedDataSize);
}
byte[] b = new byte[expectedDataSize];
restData.get(b, 0, expectedDataSize);
addPayload(ByteBuffer.wrap(b));
ByteBuffer payload = restData.slice();
payload.limit(expectedDataSize);
addPayload(payload);
restData.position(restData.position() + expectedDataSize);
restData = restData.slice();
ByteBuffer[] bb = new ByteBuffer[receivedPayloads.size()];

ByteBuffer[] bb = new ByteBuffer[0];
bb = receivedPayloads.toArray(bb);
logger.debug(
"dispatching event handler for {} with {} buffers",
expectedEventType,
receivedPayloads.size());
eventHandler.handleEvent(expectedEventType, bb);
reset();
break;
Expand Down
16 changes: 13 additions & 3 deletions bmq-sdk/src/main/java/com/bloomberg/bmq/impl/PutPoster.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@
import java.lang.invoke.MethodHandles;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -68,9 +70,9 @@ void setMaxEventSize(int val) {
maxEventSize = Argument.expectNotGreater(val, EventHeader.MAX_SIZE_SOFT, "max event size");
}

public void pack(PutMessageImpl... msgs) {
public void pack(Collection<PutMessageImpl> msgs) {
Argument.expectNonNull(msgs, "msgs");
Argument.expectPositive(msgs.length, "message array length");
Argument.expectPositive(msgs.size(), "message array length");
for (PutMessageImpl m : msgs) {
Argument.expectNonNull(m, "put message");

Expand All @@ -87,11 +89,19 @@ public void flush() {
}
}

public void post(PutMessageImpl... msgs) {
public void post(Collection<PutMessageImpl> msgs) {
pack(msgs);
flush();
}

public void post(PutMessageImpl msg) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question here

post(Collections.singletonList(msg));
}

public void post(PutMessageImpl... msgs) {
post(Arrays.asList(msgs));
}

private void sendEvent() {
PutEventBuilder putBuilder = new PutEventBuilder();
putBuilder.setMaxEventSize(maxEventSize);
Expand Down
19 changes: 10 additions & 9 deletions bmq-sdk/src/main/java/com/bloomberg/bmq/impl/QueueImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import java.lang.invoke.MethodHandles;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -48,6 +50,7 @@ public class QueueImpl implements QueueHandle {
static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

static final int INVALID_QUEUE_ID = -1;
private static final int INITIAL_PUTMESSAGES_SIZE = 100;

// Immutable fields
private final BrokerSession brokerSession;
Expand All @@ -61,7 +64,8 @@ public class QueueImpl implements QueueHandle {
// Fields exposed to user thread
private final QueueHandleParameters parameters; // mutable object and final field
private volatile QueueState state;
private final ArrayList<PutMessageImpl> putMessages = new ArrayList<>();
private final AtomicReference<Collection<PutMessageImpl>> putMessages =
new AtomicReference<>(new ArrayList<>(INITIAL_PUTMESSAGES_SIZE));
private volatile boolean isSuspended = false;
// Whether the queue is suspended.
// While suspended, a queue receives no
Expand Down Expand Up @@ -262,19 +266,16 @@ public BmqFuture<CloseQueueCode> closeAsync(Duration timeout) {

public void pack(PutMessageImpl message) throws BMQException {
synchronized (lock) {
putMessages.add(message);
putMessages.get().add(message);
}
}

public PutMessageImpl[] flush() throws BMQException {
PutMessageImpl[] msgs;
public void flush() throws BMQException {
Collection<PutMessageImpl> messages = null;
synchronized (lock) {
msgs = new PutMessageImpl[putMessages.size()];
msgs = putMessages.toArray(msgs);
putMessages.clear();
messages = putMessages.getAndSet(new ArrayList<>(INITIAL_PUTMESSAGES_SIZE));
}
brokerSession.post(this, msgs);
return msgs;
brokerSession.post(this, messages);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,31 @@ public int read(byte[] b, int off, int len) throws IOException {
}
}

public int read(ByteBufferOutputStream bbos, int len) throws IOException {
if (len == 0) {
return 0;
}
int needed = len;
while (needed > 0 && currentBuffer < byteBuffers.length) {
ByteBuffer buffer = getBuffer();
ByteBuffer readable = buffer.slice();
int remaining = readable.remaining();
if (needed > remaining) {
readable.position(remaining);
bbos.writeBuffer(false, readable);
buffer.position(buffer.limit());
needed -= remaining;
} else {
readable.limit(needed);
readable.position(needed);
bbos.writeBuffer(false, readable);
buffer.position(buffer.position() + needed);
needed = 0;
}
}
return len - needed;
}

private ByteBuffer getBuffer(int length) throws IOException {
if (length == 0) {
return ByteBuffer.allocate(0);
Expand Down
Loading