Skip to content

Commit

Permalink
fix race condition in LogEntriesObservableList
Browse files Browse the repository at this point in the history
  • Loading branch information
xzel23 committed Dec 24, 2024
1 parent c797a5a commit a3371d2
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
Expand All @@ -25,7 +25,8 @@ final class LogEntriesObservableList extends ObservableListBase<LogEntry> implem
private static final long REST_TIME_IN_MS = 10;

private volatile List<LogEntry> data = Collections.emptyList();
private final AtomicInteger queuedRemoves = new AtomicInteger();
private final AtomicLong totalAdded = new AtomicLong(0);
private final AtomicLong totalRemoved = new AtomicLong(0);

private final ReadWriteLock updateLock = new ReentrantReadWriteLock();
private final Lock updateWriteLock = updateLock.writeLock();
Expand All @@ -46,18 +47,25 @@ final class LogEntriesObservableList extends ObservableListBase<LogEntry> implem
try {
updatesAvailableCondition.await();

List<LogEntry> newData = Arrays.asList(buffer.toArray());

Platform.runLater(() -> {
int newSz = newData.size();
int oldSz = data.size();
int removedRows = queuedRemoves.getAndSet(0);
int remainingRows = oldSz - removedRows;
int addedRows = newSz - remainingRows;

try {
beginChange();
List<LogEntry> removed = List.copyOf(data.subList(0, removedRows));
LogBuffer.BufferState state = buffer.getBufferState();
List<LogEntry> newData = Arrays.asList(state.entries());
totalAdded.getAndSet(state.totalAdded());
long ta = totalAdded.get();
long trOld = totalRemoved.getAndSet(state.totalRemoved());
long tr = totalRemoved.get();

assert newData.size() == ta - tr;

int newSz = newData.size();
int oldSz = data.size();
int removedRows = (int) Math.min(oldSz, (tr - trOld));
int remainingRows = oldSz - removedRows;
int addedRows = newSz - remainingRows;
List<LogEntry> removed = List.copyOf(data.subList(0, removedRows));

data = newData;
nextRemove(0, removed);
nextAdd(newSz - addedRows, newSz);
Expand Down Expand Up @@ -102,7 +110,6 @@ public LogEntry get(int idx) {
public void entries(int removed, int added) {
updateWriteLock.lock();
try {
queuedRemoves.addAndGet(removed);
updatesAvailableCondition.signalAll();
} finally {
updateWriteLock.unlock();
Expand All @@ -113,7 +120,6 @@ public void entries(int removed, int added) {
public void clear() {
updateWriteLock.lock();
try {
queuedRemoves.set(data.size());
updatesAvailableCondition.signalAll();
} finally {
updateWriteLock.unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/**
* A log buffer class intended to provide a buffer for log messages to display in GUI applications.
Expand All @@ -22,6 +23,8 @@ public class LogBuffer implements LogEntryHandler, Externalizable {

private final RingBuffer<LogEntry> buffer;
private final Collection<LogBufferListener> listeners = new ArrayList<>();
private final AtomicLong totalAdded = new AtomicLong(0);
private final AtomicLong totalRemoved = new AtomicLong(0);

/**
* Construct a new LogBuffer instance with default capacity.
Expand Down Expand Up @@ -81,6 +84,8 @@ public void handleEntry(LogEntry entry) {
int removed;
synchronized (buffer) {
removed = buffer.put(entry) ? 0 : 1;
this.totalAdded.incrementAndGet();
this.totalRemoved.addAndGet(removed);
}
listeners.forEach(listener -> listener.entries(removed, 1));
}
Expand All @@ -93,6 +98,7 @@ public void handleEntry(LogEntry entry) {
public void clear() {
synchronized (listeners) {
synchronized (buffer) {
totalRemoved.addAndGet(buffer.size());
buffer.clear();
}
listeners.forEach(LogBufferListener::clear);
Expand All @@ -110,6 +116,37 @@ public LogEntry[] toArray() {
}
}

/**
* Represents the state of a buffer.
*
* This record is used to encapsulate the current state of a LogBuffer,
* including its entries, the total number of log entries that have been
* removed, and the total number of log entries that have been added.
*
* @param entries the array of LogEntry objects currently in the buffer
* @param totalRemoved the total count of log entries that have been removed from the buffer
* @param totalAdded the total count of log entries that have been added to the buffer
*/
public record BufferState(LogEntry[] entries, long totalRemoved, long totalAdded) {}

/**
* Retrieves the current state of the buffer, encapsulating the entries within the buffer,
* the total number of entries removed, and the total number of entries added.
* This method is thread-safe as it synchronizes on the buffer while performing operations.
*
* @return a {@code BufferState} instance containing the current buffer entries,
* total removed entries, and total added entries
*/
public BufferState getBufferState() {
synchronized (buffer) {
LogEntry[] array = toArray();
long r = totalRemoved.get();
long a = totalAdded.get();
assert array.length == a - r;
return new BufferState(array, r, a);
}
}

/**
* Get the LogEntry at the specified index in the LogBuffer.
*
Expand Down

0 comments on commit a3371d2

Please sign in to comment.