Skip to content

Commit

Permalink
#60 #65 Postpone informing other cluster members of new member join u…
Browse files Browse the repository at this point in the history
…ntil the new member has finished cleaning up old data.
  • Loading branch information
Emiel van der Herberg authored and guusdk committed Oct 21, 2021
1 parent abf316d commit b542ca3
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@
import org.jivesoftware.openfire.cluster.ClusterManager;
import org.jivesoftware.openfire.cluster.ClusterNodeInfo;
import org.jivesoftware.openfire.cluster.NodeID;
import org.jivesoftware.openfire.muc.cluster.NewClusterMemberJoinedTask;
import org.jivesoftware.openfire.plugin.util.cluster.HazelcastClusterNodeInfo;
import org.jivesoftware.util.cache.Cache;
import org.jivesoftware.util.cache.CacheFactory;
import org.jivesoftware.util.cache.CacheWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -151,21 +153,49 @@ public void memberAdded(final MembershipEvent event) {
final NodeID nodeID = ClusteredCacheFactory.getNodeID(event.getMember());
if (event.getMember().localMember()) { // We left and re-joined the cluster
joinCluster();

waitForClusterCacheToBeInstalled();

// Let the other nodes know that we joined the cluster
logger.debug("Done joining the cluster. Now proceed informing other nodes that we joined the cluster.");
CacheFactory.doClusterTask(new NewClusterMemberJoinedTask(true));
} else {
if(wasSenior && !isSenior) {
if (wasSenior && !isSenior) {
logger.warn("Recovering from split-brain; firing leftCluster()/joinedCluster() events");
ClusteredCacheFactory.fireLeftClusterAndWaitToComplete(Duration.ofSeconds(30));
logger.debug("Firing joinedCluster() event");
ClusterManager.fireJoinedCluster(true);
} else {
// Trigger event that a new node has joined the cluster
ClusterManager.fireJoinedCluster(nodeID.toByteArray(), true);

waitForClusterCacheToBeInstalled();

// Let the other nodes know that we joined the cluster
logger.debug("Done joining the cluster in split brain recovery. Now proceed informing other nodes that we joined the cluster.");
CacheFactory.doClusterTask(new NewClusterMemberJoinedTask(true));
}
}
clusterNodesInfo.put(nodeID,
new HazelcastClusterNodeInfo(event.getMember(), cluster.getClusterTime()));
}

/**
* Blocks the current thread until the cluster cache is guaranteed to support clustering. This is especially useful
* for executing cluster tasks immediately after joining. If this wait is not performed, the cache factory may still
* be using the 'default' strategy instead of the 'hazelcast' strategy, which leads to cluster tasks being silently
* discarded.
*/
private void waitForClusterCacheToBeInstalled() {
if (!ClusteredCacheFactory.PLUGIN_NAME.equals(CacheFactory.getPluginName())) {
logger.debug("This node now joined a cluster, but the cache factory has not been swapped to '{}' yet. Waiting for that to happen.", ClusteredCacheFactory.PLUGIN_NAME);
while (!ClusteredCacheFactory.PLUGIN_NAME.equals(CacheFactory.getPluginName())) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
}
}
logger.debug("Cache factory has been swapped to '{}'. Cluster join is considered complete.", ClusteredCacheFactory.PLUGIN_NAME);
}
}

@Override
public void memberRemoved(final MembershipEvent event) {
logger.info("Received a Hazelcast memberRemoved event {}", event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,18 @@

package org.jivesoftware.openfire.plugin.util.cache;

import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

import com.hazelcast.config.ClasspathXmlConfig;
import com.hazelcast.config.Config;
import com.hazelcast.config.MapConfig;
import com.hazelcast.config.MaxSizeConfig;
import com.hazelcast.config.MemberAttributeConfig;
import com.hazelcast.config.MemcacheProtocolConfig;
import com.hazelcast.config.NetworkConfig;
import com.hazelcast.config.RestApiConfig;
import com.hazelcast.core.Cluster;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.Member;
import org.jivesoftware.openfire.JMXManager;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.cluster.ClusterEventListener;
Expand All @@ -57,18 +48,26 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.hazelcast.config.ClasspathXmlConfig;
import com.hazelcast.config.Config;
import com.hazelcast.config.MapConfig;
import com.hazelcast.config.MaxSizeConfig;
import com.hazelcast.config.MemberAttributeConfig;
import com.hazelcast.config.MemcacheProtocolConfig;
import com.hazelcast.config.NetworkConfig;
import com.hazelcast.config.RestApiConfig;
import com.hazelcast.core.Cluster;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.Member;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
* CacheFactory implementation to use when using Hazelcast in cluster mode.
Expand Down Expand Up @@ -130,6 +129,7 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
.build();

private static final Logger logger = LoggerFactory.getLogger(ClusteredCacheFactory.class);
public static final String PLUGIN_NAME = "hazelcast";

/**
* Keep serialization strategy the server was using before we set our strategy. We will
Expand Down Expand Up @@ -350,6 +350,7 @@ public long getClusterTime() {
*/
@Override
public void doClusterTask(final ClusterTask<?> task) {

if (cluster == null) {
return;
}
Expand All @@ -360,6 +361,8 @@ public void doClusterTask(final ClusterTask<?> task) {
members.add(member);
}
}


if (!members.isEmpty()) {
// Asynchronously execute the task on the other cluster members
logger.debug("Executing asynchronous MultiTask: " + task.getClass().getName());
Expand Down Expand Up @@ -517,7 +520,7 @@ public void updateCacheStats(final Map<String, Cache> caches) {

@Override
public String getPluginName() {
return "hazelcast";
return PLUGIN_NAME;
}

@Override
Expand Down

0 comments on commit b542ca3

Please sign in to comment.