diff --git a/components/link-manager/src/integrationTest/kotlin/net/corda/p2p/linkmanager/integration/LinkManagerIntegrationTest.kt b/components/link-manager/src/integrationTest/kotlin/net/corda/p2p/linkmanager/integration/LinkManagerIntegrationTest.kt index 6bd0f0d8f74..c49575bee94 100644 --- a/components/link-manager/src/integrationTest/kotlin/net/corda/p2p/linkmanager/integration/LinkManagerIntegrationTest.kt +++ b/components/link-manager/src/integrationTest/kotlin/net/corda/p2p/linkmanager/integration/LinkManagerIntegrationTest.kt @@ -12,6 +12,7 @@ import net.corda.data.p2p.crypto.protocol.RevocationCheckMode import net.corda.db.messagebus.testkit.DBSetup import net.corda.libs.configuration.SmartConfigFactory import net.corda.libs.configuration.schema.p2p.LinkManagerConfiguration +import net.corda.libs.configuration.schema.p2p.LinkManagerConfiguration.Companion.HEARTBEAT_ENABLED_KEY import net.corda.libs.configuration.schema.p2p.LinkManagerConfiguration.Companion.HEARTBEAT_MESSAGE_PERIOD_KEY import net.corda.libs.configuration.schema.p2p.LinkManagerConfiguration.Companion.MAX_MESSAGE_SIZE_KEY import net.corda.libs.configuration.schema.p2p.LinkManagerConfiguration.Companion.MAX_REPLAYING_MESSAGES_PER_PEER @@ -113,6 +114,7 @@ class LinkManagerIntegrationTest { return ConfigFactory.empty() .withValue(MAX_MESSAGE_SIZE_KEY, ConfigValueFactory.fromAnyRef(1000000)) .withValue(MAX_REPLAYING_MESSAGES_PER_PEER, ConfigValueFactory.fromAnyRef(100)) + .withValue(HEARTBEAT_ENABLED_KEY, ConfigValueFactory.fromAnyRef(true)) .withValue(HEARTBEAT_MESSAGE_PERIOD_KEY, ConfigValueFactory.fromAnyRef(2000)) .withValue(SESSION_TIMEOUT_KEY, ConfigValueFactory.fromAnyRef(10000)) .withValue(SESSIONS_PER_PEER_KEY, ConfigValueFactory.fromAnyRef(null)) diff --git a/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/metrics/MetricsHelpers.kt b/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/metrics/MetricsHelpers.kt index d1cdcdc168d..d5cd4689dc5 100644 --- a/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/metrics/MetricsHelpers.kt +++ b/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/metrics/MetricsHelpers.kt @@ -86,4 +86,20 @@ private fun recordInboundMessagesMetric(source: String?, dest: String?, group: S builder.withTag(it.first, value) } builder.build().increment() +} + +fun recordOutboundSessionTimeoutMetric(source: HoldingIdentity, destination: HoldingIdentity) { + CordaMetrics.Metric.OutboundSessionTimeoutCount.builder() + .withTag(CordaMetrics.Tag.SourceVirtualNode, source.x500Name.toString()) + .withTag(CordaMetrics.Tag.DestinationVirtualNode, destination.x500Name.toString()) + .withTag(CordaMetrics.Tag.MembershipGroup, source.groupId) + .build().increment() +} + +fun recordInboundSessionTimeoutMetric(source: HoldingIdentity, destination: HoldingIdentity?) { + CordaMetrics.Metric.InboundSessionTimeoutCount.builder() + .withTag(CordaMetrics.Tag.SourceVirtualNode, source.x500Name.toString()) + .withTag(CordaMetrics.Tag.DestinationVirtualNode, destination?.x500Name?.toString() ?: NOT_APPLICABLE_TAG_VALUE) + .withTag(CordaMetrics.Tag.MembershipGroup, source.groupId) + .build().increment() } \ No newline at end of file diff --git a/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt b/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt index 493dc3361d3..d6ddcc66b40 100644 --- a/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt +++ b/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt @@ -77,7 +77,6 @@ import net.corda.schema.Schemas.P2P.SESSION_OUT_PARTITIONS import net.corda.schema.configuration.ConfigKeys import net.corda.utilities.VisibleForTesting import net.corda.utilities.time.Clock -import net.corda.utilities.trace import net.corda.v5.membership.MemberInfo import net.corda.virtualnode.HoldingIdentity import net.corda.virtualnode.toCorda @@ -93,11 +92,13 @@ import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.locks.ReentrantReadWriteLock import net.corda.membership.lib.exceptions.BadGroupPolicyException -import net.corda.metrics.CordaMetrics.NOT_APPLICABLE_TAG_VALUE import net.corda.p2p.crypto.protocol.api.InvalidSelectedModeError import net.corda.p2p.crypto.protocol.api.NoCommonModeError +import net.corda.p2p.linkmanager.metrics.recordInboundSessionTimeoutMetric import net.corda.p2p.linkmanager.metrics.recordOutboundHeartbeatMessagesMetric +import net.corda.p2p.linkmanager.metrics.recordOutboundSessionTimeoutMetric import net.corda.p2p.linkmanager.sessions.SessionManagerWarnings.badGroupPolicy +import net.corda.utilities.trace import kotlin.concurrent.read import kotlin.concurrent.write @@ -147,11 +148,12 @@ internal class SessionManagerImpl( 2, 1, RevocationCheckMode.OFF, - 432000 + 432000, + true ) ) - private val heartbeatManager: HeartbeatManager = HeartbeatManager( + private val sessionHealthManager: SessionHealthManager = SessionHealthManager( publisherFactory, configurationReaderService, coordinatorFactory, @@ -163,7 +165,7 @@ internal class SessionManagerImpl( clock, executorServiceFactory ) - private val outboundSessionPool = OutboundSessionPool(heartbeatManager::calculateWeightForSession) + private val outboundSessionPool = OutboundSessionPool(sessionHealthManager::calculateWeightForSession) private val publisher = PublisherWithDominoLogic( publisherFactory, @@ -185,7 +187,7 @@ internal class SessionManagerImpl( ::onTileStart, ::onTileClose, dependentChildren = setOf( - heartbeatManager.dominoTile.coordinatorName, sessionReplayer.dominoTile.coordinatorName, + sessionHealthManager.dominoTile.coordinatorName, sessionReplayer.dominoTile.coordinatorName, LifecycleCoordinatorName.forComponent(), LifecycleCoordinatorName.forComponent(), LifecycleCoordinatorName.forComponent(), @@ -193,7 +195,7 @@ internal class SessionManagerImpl( linkManagerHostingMap.dominoTile.coordinatorName, inboundAssignmentListener.dominoTile.coordinatorName, revocationCheckerClient.dominoTile.coordinatorName, ), - managedChildren = setOf(heartbeatManager.dominoTile.toNamedLifecycle(), sessionReplayer.dominoTile.toNamedLifecycle(), + managedChildren = setOf(sessionHealthManager.dominoTile.toNamedLifecycle(), sessionReplayer.dominoTile.toNamedLifecycle(), publisher.dominoTile.toNamedLifecycle(), revocationCheckerClient.dominoTile.toNamedLifecycle()), configurationChangeHandler = SessionManagerConfigChangeHandler() ) @@ -205,6 +207,7 @@ internal class SessionManagerImpl( val sessionsPerPeerForMgm: Int, val revocationConfigMode: RevocationCheckMode, val sessionRefreshThreshold: Int, + val heartbeatsEnabled: Boolean ) internal inner class SessionManagerConfigChangeHandler : ConfigurationChangeHandler( @@ -223,7 +226,7 @@ internal class SessionManagerImpl( if (oldConfiguration != null) { logger.info("The Session Manager got new config. All sessions will be cleaned up.") sessionReplayer.removeAllMessagesFromReplay() - heartbeatManager.stopTrackingAllSessions() + sessionHealthManager.stopTrackingAllSessions() val tombstoneRecords = ( outboundSessionPool.getAllSessionIds() + activeInboundSessions.keys + pendingInboundSessions.keys @@ -254,6 +257,7 @@ internal class SessionManagerImpl( config.getInt(LinkManagerConfiguration.SESSIONS_PER_PEER_FOR_MGM_KEY), config.getEnum(RevocationCheckMode::class.java, LinkManagerConfiguration.REVOCATION_CHECK_KEY), config.getInt(LinkManagerConfiguration.SESSION_REFRESH_THRESHOLD_KEY), + config.getBoolean(LinkManagerConfiguration.HEARTBEAT_ENABLED_KEY) ) } @@ -395,25 +399,25 @@ internal class SessionManagerImpl( override fun dataMessageSent(session: Session) { dominoTile.withLifecycleLock { - heartbeatManager.dataMessageSent(session) + sessionHealthManager.dataMessageSent(session) } } override fun messageAcknowledged(sessionId: String) { dominoTile.withLifecycleLock { - heartbeatManager.messageAcknowledged(sessionId) + sessionHealthManager.messageAcknowledged(sessionId) } } fun sessionMessageReceived(sessionId: String, source: HoldingIdentity, destination: HoldingIdentity?) { dominoTile.withLifecycleLock { - heartbeatManager.sessionMessageReceived(sessionId, source, destination) + sessionHealthManager.sessionMessageReceived(sessionId, source, destination) } } override fun dataMessageReceived(sessionId: String, source: HoldingIdentity, destination: HoldingIdentity) { dominoTile.withLifecycleLock { - heartbeatManager.dataMessageReceived(sessionId, source, destination) + sessionHealthManager.dataMessageReceived(sessionId, source, destination) } } @@ -577,7 +581,7 @@ internal class SessionManagerImpl( message.second, message.first.sessionId, sessionCounterparties, - heartbeatManager::sessionMessageSent + sessionHealthManager::sessionMessageSent ), sessionCounterparties ) @@ -621,7 +625,7 @@ internal class SessionManagerImpl( responderMemberInfo, p2pParams.networkType )?.let { - heartbeatManager.sessionMessageSent(sessionCounterparties, message.first.sessionId) + sessionHealthManager.sessionMessageSent(sessionCounterparties, message.first.sessionId) message.first.sessionId to it } } @@ -692,7 +696,7 @@ internal class SessionManagerImpl( } sessionReplayer.removeMessageFromReplay(initiatorHelloUniqueId(message.header.sessionId), sessionInfo) - heartbeatManager.messageAcknowledged(message.header.sessionId) + sessionHealthManager.messageAcknowledged(message.header.sessionId) sessionReplayer.addMessageForReplay( initiatorHandshakeUniqueId(message.header.sessionId), @@ -700,7 +704,7 @@ internal class SessionManagerImpl( payload, message.header.sessionId, sessionInfo, - heartbeatManager::sessionMessageSent + sessionHealthManager::sessionMessageSent ), sessionInfo ) @@ -715,7 +719,7 @@ internal class SessionManagerImpl( logger.couldNotFindGroupInfo(message::class.java.simpleName, message.header.sessionId, ourIdentityInfo.holdingIdentity) return null } - heartbeatManager.sessionMessageSent( + sessionHealthManager.sessionMessageSent( sessionInfo, message.header.sessionId, ) @@ -757,8 +761,8 @@ internal class SessionManagerImpl( } val authenticatedSession = session.getSession() sessionReplayer.removeMessageFromReplay(initiatorHandshakeUniqueId(message.header.sessionId), sessionCounterparties) - heartbeatManager.messageAcknowledged(message.header.sessionId) - heartbeatManager.startSendingHeartbeats(authenticatedSession) + sessionHealthManager.messageAcknowledged(message.header.sessionId) + sessionHealthManager.sessionEstablished(authenticatedSession) sessionNegotiationLock.write { outboundSessionPool.updateAfterSessionEstablished(authenticatedSession) pendingOutboundSessionMessageQueues.sessionNegotiatedCallback( @@ -785,8 +789,8 @@ internal class SessionManagerImpl( "Outbound session $sessionId (local=${sessionCounterparties.ourId}, remote=${sessionCounterparties.counterpartyId}) timed " + "out to refresh ephemeral keys and it will be cleaned up." ) - refreshOutboundSession(sessionCounterparties, sessionId) - heartbeatManager.stopTrackingSpecifiedOutboundSession(sessionId) + refreshOutboundSession(sessionCounterparties, sessionId) + sessionHealthManager.stopTrackingSpecifiedOutboundSession(sessionId) } private fun processInitiatorHello(message: InitiatorHelloMessage): LinkOutMessage? { @@ -1001,7 +1005,7 @@ internal class SessionManagerImpl( return false } - class HeartbeatManager( + class SessionHealthManager( publisherFactory: PublisherFactory, private val configurationReaderService: ConfigurationReadService, coordinatorFactory: LifecycleCoordinatorFactory, @@ -1016,30 +1020,46 @@ internal class SessionManagerImpl( companion object { private val logger = LoggerFactory.getLogger(this::class.java.enclosingClass) - const val HEARTBEAT_MANAGER_CLIENT_ID = "heartbeat-manager-client" + const val SESSION_HEALTH_MANAGER_CLIENT_ID = "session-health-manager-client" } - private val config = AtomicReference() + private val config = AtomicReference() + private val sessionHealthMonitor = AtomicReference() @VisibleForTesting - internal data class HeartbeatManagerConfig( + internal data class SessionHealthManagerConfig( + val heartbeatEnabled: Boolean, val heartbeatPeriod: Duration, val sessionTimeout: Duration ) @VisibleForTesting - internal inner class HeartbeatManagerConfigChangeHandler : ConfigurationChangeHandler( + internal inner class SessionHealthManagerConfigChangeHandler : ConfigurationChangeHandler( configurationReaderService, ConfigKeys.P2P_LINK_MANAGER_CONFIG, ::fromConfig ) { override fun applyNewConfiguration( - newConfiguration: HeartbeatManagerConfig, - oldConfiguration: HeartbeatManagerConfig?, + newConfiguration: SessionHealthManagerConfig, + oldConfiguration: SessionHealthManagerConfig?, resources: ResourcesHolder, ): CompletableFuture { val configUpdateResult = CompletableFuture() config.set(newConfiguration) + if(newConfiguration.heartbeatEnabled != oldConfiguration?.heartbeatEnabled) { + sessionHealthMonitor.set( + when { + newConfiguration.heartbeatEnabled -> { + logger.info("Using session heartbeats to monitor session health.") + HeartbeatSessionHealthMonitor() + } + else -> { + logger.info("Using message acknowledgements to monitor session health.") + MessageAckSessionHealthMonitor() + } + } + ) + } configUpdateResult.complete(Unit) return configUpdateResult } @@ -1047,8 +1067,9 @@ internal class SessionManagerImpl( private val executorService = executorServiceFactory() - private fun fromConfig(config: Config): HeartbeatManagerConfig { - return HeartbeatManagerConfig( + private fun fromConfig(config: Config): SessionHealthManagerConfig { + return SessionHealthManagerConfig( + config.getBoolean(LinkManagerConfiguration.HEARTBEAT_ENABLED_KEY), Duration.ofMillis(config.getLong(LinkManagerConfiguration.HEARTBEAT_MESSAGE_PERIOD_KEY)), Duration.ofMillis(config.getLong(LinkManagerConfiguration.SESSION_TIMEOUT_KEY)) ) @@ -1060,7 +1081,7 @@ internal class SessionManagerImpl( private val publisher = PublisherWithDominoLogic( publisherFactory, coordinatorFactory, - PublisherConfig(HEARTBEAT_MANAGER_CLIENT_ID, false), + PublisherConfig(SESSION_HEALTH_MANAGER_CLIENT_ID, false), configuration ) @@ -1074,7 +1095,7 @@ internal class SessionManagerImpl( publisher.dominoTile.coordinatorName ), managedChildren = setOf(publisher.dominoTile.toNamedLifecycle()), - configurationChangeHandler = HeartbeatManagerConfigChangeHandler(), + configurationChangeHandler = SessionHealthManagerConfigChangeHandler(), ) /** @@ -1090,7 +1111,8 @@ internal class SessionManagerImpl( * [identityData]: The source and destination identities for this Session. * [lastSendTimestamp]: The last time we sent a message using this Session. * [lastAckTimestamp]: The last time a message we sent via this Session was acknowledged by the other side. - * [sendingHeartbeats]: If true we send heartbeats to the counterparty (this happens after the session established). + * [sendingHeartbeats]: If true we send heartbeats to the counterparty (this happens after the session + * established if configured to do so). */ class TrackedOutboundSession( val identityData: SessionCounterparties, @@ -1122,48 +1144,35 @@ internal class SessionManagerImpl( fun sessionMessageSent(counterparties: SessionCounterparties, sessionId: String) { dominoTile.withLifecycleLock { - if (!isRunning) { - throw IllegalStateException("A session message was added before the HeartbeatManager was started.") + check (isRunning) { + "A session message was added before the ${SessionHealthManager::class.java.simpleName} was started." } trackedOutboundSessions.compute(sessionId) { _, initialTrackedSession -> + val timestamp = timeStamp() if (initialTrackedSession != null) { - initialTrackedSession.lastSendTimestamp = timeStamp() + initialTrackedSession.lastSendTimestamp = timestamp initialTrackedSession } else { - executorService.schedule( - { outboundSessionTimeout(counterparties, sessionId) }, - config.get().sessionTimeout.toMillis(), - TimeUnit.MILLISECONDS - ) - TrackedOutboundSession(counterparties, timeStamp(), timeStamp()) + scheduleOutboundSessionTimeout(counterparties, sessionId, config.get().sessionTimeout) + TrackedOutboundSession(counterparties, timestamp, timestamp) } } } } - fun startSendingHeartbeats(session: Session) { + fun sessionEstablished(session: Session) { dominoTile.withLifecycleLock { - if (!isRunning) { - throw IllegalStateException("A message was sent before the HeartbeatManager was started.") + check (isRunning) { + "A message was sent before the ${SessionHealthManager::class.java.simpleName} was started." } - trackedOutboundSessions.computeIfPresent(session.sessionId) { _, trackedSession -> - if (!trackedSession.sendingHeartbeats) { - executorService.schedule( - { sendHeartbeat(trackedSession.identityData, session) }, - config.get().heartbeatPeriod.toMillis(), - TimeUnit.MILLISECONDS - ) - trackedSession.sendingHeartbeats = true - } - trackedSession - } ?: throw IllegalStateException("A message was sent on session with Id ${session.sessionId} which is not tracked.") + sessionHealthMonitor.get().sessionEstablished(session) } } fun dataMessageSent(session: Session) { dominoTile.withLifecycleLock { - if (!isRunning) { - throw IllegalStateException("A message was sent before the HeartbeatManager was started.") + check (isRunning) { + "A message was sent before the ${SessionHealthManager::class.java.simpleName} was started." } trackedOutboundSessions.computeIfPresent(session.sessionId) { _, trackedSession -> trackedSession.lastSendTimestamp = timeStamp() @@ -1174,8 +1183,8 @@ internal class SessionManagerImpl( fun messageAcknowledged(sessionId: String) { dominoTile.withLifecycleLock { - if (!isRunning) { - throw IllegalStateException("A message was acknowledged before the HeartbeatManager was started.") + check (isRunning) { + "A message was acknowledged before the ${SessionHealthManager::class.java.simpleName} was started." } val sessionInfo = trackedOutboundSessions[sessionId] ?: return@withLifecycleLock logger.trace("Message acknowledged with on a session with Id $sessionId.") @@ -1185,54 +1194,51 @@ internal class SessionManagerImpl( fun sessionMessageReceived(sessionId: String, source: HoldingIdentity, destination: HoldingIdentity?) { dominoTile.withLifecycleLock { - check(isRunning) { "A session message was received before the HeartbeatManager was started." } - messageReceived(sessionId, source, destination) + check(isRunning) { + "A session message was received before the ${SessionHealthManager::class.java.simpleName} was started." + } + sessionHealthMonitor.get().messageReceived(sessionId, source, destination) } } fun dataMessageReceived(sessionId: String, source: HoldingIdentity, destination: HoldingIdentity) { dominoTile.withLifecycleLock { - check(isRunning) { "A data message was received before the HeartbeatManager was started." } - messageReceived(sessionId, source, destination) + check(isRunning) { + "A data message was received before the ${SessionHealthManager::class.java.simpleName} was started." + } + sessionHealthMonitor.get().messageReceived(sessionId, source, destination) } } - private fun messageReceived(sessionId: String, source: HoldingIdentity, destination: HoldingIdentity?) { - trackedInboundSessions.compute(sessionId) { _, initialTrackedSession -> - if (initialTrackedSession != null) { - initialTrackedSession.lastReceivedTimestamp = timeStamp() - initialTrackedSession - } else { - executorService.schedule( - { inboundSessionTimeout(sessionId, source, destination) }, - config.get().sessionTimeout.toMillis(), - TimeUnit.MILLISECONDS - ) - TrackedInboundSession(timeStamp()) - } - } + private fun tearDownOutboundSession(counterparties: SessionCounterparties, sessionId: String) { + destroyOutboundSession(counterparties, sessionId) + trackedOutboundSessions.remove(sessionId) + recordOutboundSessionTimeoutMetric(counterparties.ourId, counterparties.counterpartyId) } - private fun outboundSessionTimeout(counterparties: SessionCounterparties, sessionId: String) { - val sessionInfo = trackedOutboundSessions[sessionId] ?: return - val timeSinceLastAck = timeStamp() - sessionInfo.lastAckTimestamp - val sessionTimeoutMs = config.get().sessionTimeout.toMillis() - if (timeSinceLastAck >= sessionTimeoutMs) { - logger.info( - "Outbound session $sessionId (local=${counterparties.ourId}, remote=" + - "${counterparties.counterpartyId}) has not received any messages for the configured " + - "timeout threshold ($sessionTimeoutMs ms) so it will be cleaned up." - ) - destroyOutboundSession(counterparties, sessionId) - trackedOutboundSessions.remove(sessionId) - recordOutboundSessionTimeoutMetric(counterparties.ourId, counterparties.counterpartyId) - } else { - executorService.schedule( - { outboundSessionTimeout(counterparties, sessionId) }, - sessionTimeoutMs - timeSinceLastAck, - TimeUnit.MILLISECONDS - ) - } + private fun scheduleOutboundSessionTimeout( + counterparties: SessionCounterparties, + sessionId: String, + delay: Duration + ) { + executorService.schedule( + { sessionHealthMonitor.get().checkIfOutboundSessionTimeout(counterparties, sessionId) }, + delay.toMillis(), + TimeUnit.MILLISECONDS + ) + } + + private fun scheduleInboundSessionTimeout( + sessionId: String, + source: HoldingIdentity, + destination: HoldingIdentity?, + delay: Duration + ) { + executorService.schedule( + { inboundSessionTimeout(sessionId, source, destination) }, + delay.toMillis(), + TimeUnit.MILLISECONDS + ) } private fun inboundSessionTimeout(sessionId: String, source: HoldingIdentity, destination: HoldingIdentity?) { @@ -1248,108 +1254,198 @@ internal class SessionManagerImpl( trackedInboundSessions.remove(sessionId) recordInboundSessionTimeoutMetric(source, destination) } else { - executorService.schedule( - { inboundSessionTimeout(sessionId, source, destination) }, - sessionTimeoutMs - timeSinceLastReceived, - TimeUnit.MILLISECONDS - ) + scheduleInboundSessionTimeout(sessionId, source, destination, Duration.ofMillis(sessionTimeoutMs - timeSinceLastReceived)) } } - private fun sendHeartbeat(counterparties: SessionCounterparties, session: Session) { - val sessionInfo = trackedOutboundSessions[session.sessionId] - if (sessionInfo == null) { - logger.info("Stopped sending heartbeats for session (${session.sessionId}), which expired.") - return + private fun timeStamp(): Long { + return clock.instant().toEpochMilli() + } + + /** + * Implementations of [SessionHealthMonitor] provide different methods of determining when a session has become + * unhealthy and handling of unhealthy sessions. + */ + sealed interface SessionHealthMonitor { + fun sessionEstablished(session: Session) + + fun messageReceived(sessionId: String, source: HoldingIdentity, destination: HoldingIdentity?) + + fun checkIfOutboundSessionTimeout(counterparties: SessionCounterparties, sessionId: String) + } + + /** + * Monitors session health based on a heart beating mechanism. + */ + private inner class HeartbeatSessionHealthMonitor: SessionHealthMonitor { + override fun sessionEstablished(session: Session) { + trackedOutboundSessions.computeIfPresent(session.sessionId) { _, trackedSession -> + if (!trackedSession.sendingHeartbeats) { + executorService.schedule( + { sendHeartbeat(trackedSession.identityData, session) }, + config.get().heartbeatPeriod.toMillis(), + TimeUnit.MILLISECONDS + ) + trackedSession.sendingHeartbeats = true + } + trackedSession + } ?: throw IllegalStateException("A message was sent on session with Id ${session.sessionId} which is not tracked.") } - val config = config.get() - val timeSinceLastSend = timeStamp() - sessionInfo.lastSendTimestamp - if (timeSinceLastSend >= config.heartbeatPeriod.toMillis()) { - logger.trace { - "Sending heartbeat message between ${counterparties.ourId} (our Identity) and " + - "${counterparties.counterpartyId}." + override fun messageReceived(sessionId: String, source: HoldingIdentity, destination: HoldingIdentity?) { + trackedInboundSessions.compute(sessionId) { _, initialTrackedSession -> + if (initialTrackedSession != null) { + initialTrackedSession.lastReceivedTimestamp = timeStamp() + initialTrackedSession + } else { + scheduleInboundSessionTimeout(sessionId, source, destination, config.get().sessionTimeout) + TrackedInboundSession(timeStamp()) + } } - sendHeartbeatMessage( - counterparties.ourId, - counterparties.counterpartyId, - session, - counterparties.status, - counterparties.serial - ) - executorService.schedule( - { sendHeartbeat(counterparties, session) }, - config.heartbeatPeriod.toMillis(), - TimeUnit.MILLISECONDS - ) - } else { - executorService.schedule( - { sendHeartbeat(counterparties, session) }, - config.heartbeatPeriod.toMillis() - timeSinceLastSend, - TimeUnit.MILLISECONDS - ) } - } - private fun sendHeartbeatMessage( - source: HoldingIdentity, - dest: HoldingIdentity, - session: Session, - filter: MembershipStatusFilter, - serial: Long - ) { - val heartbeatMessage = HeartbeatMessage() - val message = MessageConverter.linkOutMessageFromHeartbeat( - source, - dest, - heartbeatMessage, - session, - groupPolicyProvider, - membershipGroupReaderProvider, - filter, - serial - ) - if (message == null) { - logger.warn("Failed to send a Heartbeat between $source and $dest.") - return + override fun checkIfOutboundSessionTimeout(counterparties: SessionCounterparties, sessionId: String) { + val sessionInfo = trackedOutboundSessions[sessionId] ?: return + val timeSinceLastAck = timeStamp() - sessionInfo.lastAckTimestamp + val sessionTimeoutMs = config.get().sessionTimeout.toMillis() + if (timeSinceLastAck >= sessionTimeoutMs) { + logger.info( + "Outbound session $sessionId (local=${counterparties.ourId}, remote=" + + "${counterparties.counterpartyId}) has not received any messages for the configured " + + "timeout threshold ($sessionTimeoutMs ms) so it will be cleaned up." + ) + tearDownOutboundSession(counterparties, sessionId) + } else { + scheduleOutboundSessionTimeout(counterparties, sessionId, Duration.ofMillis(sessionTimeoutMs - timeSinceLastAck)) + } } - val future = publisher.publish( - listOf( - Record( - LINK_OUT_TOPIC, - UUID.randomUUID().toString(), - message + + private fun sendHeartbeat(counterparties: SessionCounterparties, session: Session) { + val sessionInfo = trackedOutboundSessions[session.sessionId] + if (sessionInfo == null) { + logger.info("Stopped sending heartbeats for session (${session.sessionId}), which expired.") + return + } + val config = config.get() + if(!config.heartbeatEnabled) { + logger.info("Heartbeats have been disabled. Stopping heartbeats for (${session.sessionId}).") + return + } + + val timeSinceLastSend = timeStamp() - sessionInfo.lastSendTimestamp + if (timeSinceLastSend >= config.heartbeatPeriod.toMillis()) { + logger.trace { + "Sending heartbeat message between ${counterparties.ourId} (our Identity) and " + + "${counterparties.counterpartyId}." + } + sendHeartbeatMessage( + counterparties.ourId, + counterparties.counterpartyId, + session, + counterparties.status, + counterparties.serial + ) + executorService.schedule( + { sendHeartbeat(counterparties, session) }, + config.heartbeatPeriod.toMillis(), + TimeUnit.MILLISECONDS + ) + } else { + executorService.schedule( + { sendHeartbeat(counterparties, session) }, + config.heartbeatPeriod.toMillis() - timeSinceLastSend, + TimeUnit.MILLISECONDS + ) + } + } + + private fun sendHeartbeatMessage( + source: HoldingIdentity, + dest: HoldingIdentity, + session: Session, + filter: MembershipStatusFilter, + serial: Long + ) { + val heartbeatMessage = HeartbeatMessage() + val message = MessageConverter.linkOutMessageFromHeartbeat( + source, + dest, + heartbeatMessage, + session, + groupPolicyProvider, + membershipGroupReaderProvider, + filter, + serial + ) + if (message == null) { + logger.warn("Failed to send a Heartbeat between $source and $dest.") + return + } + val future = publisher.publish( + listOf( + Record( + LINK_OUT_TOPIC, + UUID.randomUUID().toString(), + message + ) ) ) - ) - future.single().whenComplete { _, error -> - if (error != null) { - logger.warn("An exception was thrown when sending a heartbeat message.\nException:", error) - } else { - recordOutboundHeartbeatMessagesMetric(source, dest) + future.single().whenComplete { _, error -> + if (error != null) { + logger.warn("An exception was thrown when sending a heartbeat message.\nException:", error) + } else { + recordOutboundHeartbeatMessagesMetric(source, dest) + } } } } - private fun timeStamp(): Long { - return clock.instant().toEpochMilli() - } + /** + * Monitors session health based on whether sent messages have been acknowledged in a timely manner or not. + */ + private inner class MessageAckSessionHealthMonitor: SessionHealthMonitor { + override fun sessionEstablished(session: Session) { + check(trackedOutboundSessions.containsKey(session.sessionId)) { + "A message was sent on session with Id ${session.sessionId} which is not tracked." + } + logger.debug( + "Session heartbeats are disabled. Not starting heartbeats for session ${session.sessionId}." + ) + } - private fun recordOutboundSessionTimeoutMetric(source: HoldingIdentity, destination: HoldingIdentity) { - CordaMetrics.Metric.OutboundSessionTimeoutCount.builder() - .withTag(CordaMetrics.Tag.SourceVirtualNode, source.x500Name.toString()) - .withTag(CordaMetrics.Tag.DestinationVirtualNode, destination.x500Name.toString()) - .withTag(CordaMetrics.Tag.MembershipGroup, source.groupId) - .build().increment() - } + override fun messageReceived(sessionId: String, source: HoldingIdentity, destination: HoldingIdentity?) { + logger.debug( + "Session heartbeats are disabled. " + + "Inbound session timeout not enabled for session with ID $sessionId." + ) + } - private fun recordInboundSessionTimeoutMetric(source: HoldingIdentity, destination: HoldingIdentity?) { - CordaMetrics.Metric.InboundSessionTimeoutCount.builder() - .withTag(CordaMetrics.Tag.SourceVirtualNode, source.x500Name.toString()) - .withTag(CordaMetrics.Tag.DestinationVirtualNode, destination?.x500Name?.toString() ?: NOT_APPLICABLE_TAG_VALUE) - .withTag(CordaMetrics.Tag.MembershipGroup, source.groupId) - .build().increment() + override fun checkIfOutboundSessionTimeout(counterparties: SessionCounterparties, sessionId: String) { + val sessionInfo = trackedOutboundSessions[sessionId] ?: return + val now = timeStamp() + val timeSinceLastAck = now - sessionInfo.lastAckTimestamp + val timeSinceLastSent = now - sessionInfo.lastSendTimestamp + val maxWaitForAck = config.get().sessionTimeout.toMillis() + val waitingForAck = timeSinceLastAck > timeSinceLastSent + if (waitingForAck && timeSinceLastSent >= maxWaitForAck) { + logger.info( + "Outbound session $sessionId (local=${counterparties.ourId}, remote=" + + "${counterparties.counterpartyId}) has not received any acknowledgement to the last sent message " + + "within the configured timeout threshold ($maxWaitForAck ms) so it will be cleaned up. " + + "Time since last ack ${timeSinceLastAck}ms. ]" + + "Time since last sent ${timeSinceLastSent}ms." + ) + tearDownOutboundSession(counterparties, sessionId) + } else { + val delay = if (waitingForAck) { + maxWaitForAck - timeSinceLastSent + } else { + maxWaitForAck + } + scheduleOutboundSessionTimeout(counterparties, sessionId, Duration.ofMillis(delay)) + } + } } } } diff --git a/components/link-manager/src/nonOsgiIntegrationTest/kotlin/net/corda/p2p/P2PLayerEndToEndTest.kt b/components/link-manager/src/nonOsgiIntegrationTest/kotlin/net/corda/p2p/P2PLayerEndToEndTest.kt index 69e5fb7f5be..aed56396da0 100644 --- a/components/link-manager/src/nonOsgiIntegrationTest/kotlin/net/corda/p2p/P2PLayerEndToEndTest.kt +++ b/components/link-manager/src/nonOsgiIntegrationTest/kotlin/net/corda/p2p/P2PLayerEndToEndTest.kt @@ -29,6 +29,7 @@ import net.corda.libs.configuration.SmartConfig import net.corda.libs.configuration.SmartConfigFactory import net.corda.libs.configuration.merger.impl.ConfigMergerImpl import net.corda.libs.configuration.schema.p2p.LinkManagerConfiguration +import net.corda.libs.configuration.schema.p2p.LinkManagerConfiguration.Companion.HEARTBEAT_ENABLED_KEY import net.corda.libs.configuration.schema.p2p.LinkManagerConfiguration.Companion.HEARTBEAT_MESSAGE_PERIOD_KEY import net.corda.libs.configuration.schema.p2p.LinkManagerConfiguration.Companion.MAX_MESSAGE_SIZE_KEY import net.corda.libs.configuration.schema.p2p.LinkManagerConfiguration.Companion.MAX_REPLAYING_MESSAGES_PER_PEER @@ -519,6 +520,7 @@ class P2PLayerEndToEndTest { ConfigFactory.empty() .withValue(MAX_MESSAGE_SIZE_KEY, ConfigValueFactory.fromAnyRef(1000000)) .withValue(MAX_REPLAYING_MESSAGES_PER_PEER, ConfigValueFactory.fromAnyRef(100)) + .withValue(HEARTBEAT_ENABLED_KEY, ConfigValueFactory.fromAnyRef(true)) .withValue(HEARTBEAT_MESSAGE_PERIOD_KEY, ConfigValueFactory.fromAnyRef(Duration.ofSeconds(2))) .withValue(SESSION_TIMEOUT_KEY, ConfigValueFactory.fromAnyRef(Duration.ofSeconds(10))) .withValue(SESSIONS_PER_PEER_KEY, ConfigValueFactory.fromAnyRef(null)) diff --git a/components/link-manager/src/test/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerTest.kt b/components/link-manager/src/test/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerTest.kt index 5840222fd91..cf05daa4934 100644 --- a/components/link-manager/src/test/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerTest.kt +++ b/components/link-manager/src/test/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerTest.kt @@ -69,7 +69,6 @@ import org.assertj.core.api.Assertions.assertThat import org.bouncycastle.jce.provider.BouncyCastleProvider import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.BeforeAll import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test @@ -102,6 +101,10 @@ import net.corda.p2p.linkmanager.grouppolicy.protocolModes import java.util.UUID import java.util.concurrent.TimeUnit import net.corda.membership.lib.exceptions.BadGroupPolicyException +import net.corda.p2p.linkmanager.sessions.SessionManagerImpl.SessionHealthManager.Companion.SESSION_HEALTH_MANAGER_CLIENT_ID +import net.corda.p2p.linkmanager.sessions.SessionManagerImpl.SessionHealthManager.SessionHealthManagerConfigChangeHandler +import net.corda.p2p.linkmanager.sessions.SessionManagerImpl.SessionManagerConfigChangeHandler +import org.mockito.ArgumentMatchers.anyList class SessionManagerTest { @@ -115,14 +118,21 @@ class SessionManagerTest { val RANDOM_BYTES = ByteBuffer.wrap("some-random-data".toByteArray()) private val sixDaysInMillis = 6.days.toMillis() - private val configWithHeartbeat = SessionManagerImpl.HeartbeatManager.HeartbeatManagerConfig( + private val configWithHeartbeat = SessionManagerImpl.SessionHealthManager.SessionHealthManagerConfig( + true, Duration.ofMillis(100), Duration.ofMillis(500) ) - private val configNoHeartbeat = SessionManagerImpl.HeartbeatManager.HeartbeatManagerConfig( + private val configWithHighHeartbeatPeriod = SessionManagerImpl.SessionHealthManager.SessionHealthManagerConfig( + true, Duration.ofMillis(sixDaysInMillis), Duration.ofMillis(sixDaysInMillis) ) + private val configWithNoHeartbeat = SessionManagerImpl.SessionHealthManager.SessionHealthManagerConfig( + false, + Duration.ofMillis(100), + Duration.ofMillis(500) + ) private val keyGenerator = KeyPairGenerator.getInstance("EC", BouncyCastleProvider()) private val messageDigest = MessageDigest.getInstance(ProtocolConstants.HASH_ALGO, BouncyCastleProvider()) @@ -172,18 +182,16 @@ class SessionManagerTest { outboundSessionPool.close() } - private lateinit var configHandler: SessionManagerImpl.SessionManagerConfigChangeHandler - private lateinit var heartbeatConfigHandler: SessionManagerImpl.HeartbeatManager.HeartbeatManagerConfigChangeHandler + private lateinit var configHandler: SessionManagerConfigChangeHandler + private lateinit var sessionHealthManagerConfigHandler: SessionHealthManagerConfigChangeHandler + + @Suppress("UNCHECKED_CAST") private val dominoTile = Mockito.mockConstruction(ComplexDominoTile::class.java) { mock, context -> - @Suppress("UNCHECKED_CAST") whenever(mock.withLifecycleLock(any<() -> Any>())).doAnswer { (it.arguments.first() as () -> Any).invoke() } - @Suppress("UNCHECKED_CAST") whenever(mock.withLifecycleWriteLock(any<() -> Any>())).doAnswer { (it.arguments.first() as () -> Any).invoke() } - if (context.arguments()[6] is SessionManagerImpl.SessionManagerConfigChangeHandler) { - configHandler = context.arguments()[6] as SessionManagerImpl.SessionManagerConfigChangeHandler - } - if (context.arguments()[6] is SessionManagerImpl.HeartbeatManager.HeartbeatManagerConfigChangeHandler) { - heartbeatConfigHandler = context.arguments()[6] as SessionManagerImpl.HeartbeatManager.HeartbeatManagerConfigChangeHandler + when(val seventhArg = context.arguments()[6]) { + is SessionManagerConfigChangeHandler -> configHandler = seventhArg + is SessionHealthManagerConfigChangeHandler -> sessionHealthManagerConfigHandler = seventhArg } whenever(mock.coordinatorName).doReturn(LifecycleCoordinatorName("", "")) } @@ -297,6 +305,7 @@ class SessionManagerTest { SESSIONS_PER_COUNTERPARTIES_FOR_MGM, RevocationCheckMode.OFF, SESSION_REFRESH_THRESHOLD_KEY, + true ) private val configWithOneSessionBetweenMembers = SessionManagerImpl.SessionManagerConfig( MAX_MESSAGE_SIZE, @@ -304,35 +313,61 @@ class SessionManagerTest { SESSIONS_PER_COUNTERPARTIES_FOR_MGM, RevocationCheckMode.OFF, SESSION_REFRESH_THRESHOLD_KEY, + true + ) + private val configWithOneSessionBetweenMembersAndNoHeartbeats = SessionManagerImpl.SessionManagerConfig( + MAX_MESSAGE_SIZE, + 1, + SESSIONS_PER_COUNTERPARTIES_FOR_MGM, + RevocationCheckMode.OFF, + SESSION_REFRESH_THRESHOLD_KEY, + false ) - private val sessionManager = SessionManagerImpl( - groupPolicyProvider, - membershipGroupReaderProvider, - cryptoOpsClient, - pendingSessionMessageQueues, - mock(), - mock(), - mock(), - mock(), - mock { - val dominoTile = mock { - whenever(it.coordinatorName).doReturn(LifecycleCoordinatorName("", "")) - } - on { it.dominoTile } doReturn dominoTile - }, - linkManagerHostingMap, - protocolFactory, - mockTimeFacilitiesProvider.clock, - sessionReplayer, - ) { mockTimeFacilitiesProvider.mockScheduledExecutor }.apply { - setRunning() - configHandler.applyNewConfiguration( - config, - null, + private val sessionManager = createSessionManager(mock(), config, configWithHighHeartbeatPeriod) + + private fun createSessionManager( + resourcesHolder: ResourcesHolder = mock(), + sessionManagerConfig: SessionManagerImpl.SessionManagerConfig, + sessionHealthManagerConfig: SessionManagerImpl.SessionHealthManager.SessionHealthManagerConfig + ): SessionManagerImpl { + return SessionManagerImpl( + groupPolicyProvider, + membershipGroupReaderProvider, + cryptoOpsClient, + pendingSessionMessageQueues, mock(), - ) - heartbeatConfigHandler.applyNewConfiguration(configNoHeartbeat, null, mock()) + mock(), + mock(), + mock(), + mock { + val dominoTile = mock { + whenever(it.coordinatorName).doReturn(LifecycleCoordinatorName("", "")) + } + on { it.dominoTile } doReturn dominoTile + }, + linkManagerHostingMap, + protocolFactory, + mockTimeFacilitiesProvider.clock, + sessionReplayer, + ) { mockTimeFacilitiesProvider.mockScheduledExecutor }.apply { + setRunning() + configHandler.applyNewConfiguration( + sessionManagerConfig, + null, + mock(), + ) + sessionHealthManagerConfigHandler.applyNewConfiguration(sessionHealthManagerConfig, null, resourcesHolder) + } + } + + @Suppress("UNCHECKED_CAST") + private fun mockSessionHealthManagerPublisherAndCaptureRecords(callback: (List>) -> List>) { + publisherWithDominoLogicByClientId[SESSION_HEALTH_MANAGER_CLIENT_ID]!!.forEach { + whenever(it.publish(any())).doAnswer { invocation -> + return@doAnswer callback(invocation.arguments.first() as List>) + } + } } private fun MessageDigest.hash(data: ByteArray): ByteArray { @@ -367,24 +402,27 @@ class SessionManagerTest { } /** - * Send the [sessionManager] an authenticatedMessage and a [ResponderHandshakeMessage] so that it starts sending Heartbeats. + * Send the [sessionManager] an authenticatedMessage and a [ResponderHandshakeMessage] so that it a session is started. */ - private fun startSendingHeartbeats(sessionManager: SessionManager) { + private fun startSession( + sessionManager: SessionManager, + localProtocolInitiator: AuthenticationProtocolInitiator = protocolInitiator + ) { val initiatorHello = mock() - whenever(protocolInitiator.generateInitiatorHello()).thenReturn(initiatorHello) + whenever(localProtocolInitiator.generateInitiatorHello()).thenReturn(initiatorHello) whenever(outboundSessionPool.constructed().last().getNextSession(counterparties)).thenReturn( OutboundSessionPool.SessionPoolStatus.NewSessionsNeeded ) sessionManager.processOutboundMessages(listOf(message)) { it } - whenever(outboundSessionPool.constructed().last().getSession(protocolInitiator.sessionId)).thenReturn( - OutboundSessionPool.SessionType.PendingSession(counterparties, protocolInitiator) + whenever(outboundSessionPool.constructed().last().getSession(localProtocolInitiator.sessionId)).thenReturn( + OutboundSessionPool.SessionType.PendingSession(counterparties, localProtocolInitiator) ) - val header = CommonHeader(MessageType.RESPONDER_HANDSHAKE, 1, protocolInitiator.sessionId, 4, Instant.now().toEpochMilli()) + val header = CommonHeader(MessageType.RESPONDER_HANDSHAKE, 1, localProtocolInitiator.sessionId, 4, Instant.now().toEpochMilli()) val responderHandshakeMessage = ResponderHandshakeMessage(header, RANDOM_BYTES, RANDOM_BYTES) - whenever(authenticatedSession.sessionId).doAnswer { protocolInitiator.sessionId } - whenever(protocolInitiator.getSession()).thenReturn(authenticatedSession) + whenever(authenticatedSession.sessionId).doAnswer { localProtocolInitiator.sessionId } + whenever(localProtocolInitiator.getSession()).thenReturn(authenticatedSession) sessionManager.processSessionMessages(listOf(LinkInMessage(responderHandshakeMessage))) { it } } @@ -1634,36 +1672,10 @@ class SessionManagerTest { } @Test - fun `when responder hello is received, the session is pending, if no response is received, the session times out`() { + @Suppress("MaxLineLength") + fun `when responder hello is received, the session is pending, if no response is received, the session times out if heartbeats are enabled`() { val resourceHolder = ResourcesHolder() - val sessionManager = SessionManagerImpl( - groupPolicyProvider, - membershipGroupReaderProvider, - cryptoOpsClient, - pendingSessionMessageQueues, - mock(), - mock(), - mock(), - mock(), - mock { - val dominoTile = mock { - whenever(it.coordinatorName).doReturn(LifecycleCoordinatorName("", "")) - } - on { it.dominoTile } doReturn dominoTile - }, - linkManagerHostingMap, - protocolFactory, - mockTimeFacilitiesProvider.clock, - sessionReplayer, - ) { mockTimeFacilitiesProvider.mockScheduledExecutor }.apply { - setRunning() - configHandler.applyNewConfiguration( - configWithOneSessionBetweenMembers, - null, - mock(), - ) - heartbeatConfigHandler.applyNewConfiguration(configWithHeartbeat, null, resourceHolder) - } + val sessionManager = createSessionManager(resourceHolder, configWithOneSessionBetweenMembers, configWithHeartbeat) sessionManager.start() val sessionId = "some-session" @@ -1679,9 +1691,8 @@ class SessionManagerTest { val header = CommonHeader(MessageType.RESPONDER_HANDSHAKE, 1, sessionId, 4, Instant.now().toEpochMilli()) val responderHello = ResponderHelloMessage(header, ByteBuffer.wrap(PEER_KEY.public.encoded)) sessionManager.processSessionMessages(listOf(LinkInMessage(responderHello))) {it}.single().second - assertTrue(sessionManager.processOutboundMessages(listOf(message)) { it }.single().second - is SessionManager.SessionState.SessionAlreadyPending - ) + assertThat(sessionManager.processOutboundMessages(listOf(message)) { it }.single().second) + .isInstanceOf(SessionManager.SessionState.SessionAlreadyPending::class.java) mockTimeFacilitiesProvider.advanceTime(configWithHeartbeat.sessionTimeout.plus(5.millis)) verify(outboundSessionPool.constructed().last()).replaceSession(counterparties, sessionId, protocolInitiator) @@ -1690,35 +1701,39 @@ class SessionManagerTest { } @Test - fun `when responder handshake is received, the session is established, if no message is sent, the session times out`() { + @Suppress("MaxLineLength") + fun `when responder hello is received, the session is pending, if no response is received, the session does not time out if heartbeats are disabled`() { val resourceHolder = ResourcesHolder() - val sessionManager = SessionManagerImpl( - groupPolicyProvider, membershipGroupReaderProvider, - cryptoOpsClient, - pendingSessionMessageQueues, - mock(), - mock(), - mock(), - mock(), - mock { - val dominoTile = mock { - whenever(it.coordinatorName).doReturn(LifecycleCoordinatorName("", "")) - } - on { it.dominoTile } doReturn dominoTile - }, - linkManagerHostingMap, - protocolFactory, - mockTimeFacilitiesProvider.clock, - sessionReplayer, - ) { mockTimeFacilitiesProvider.mockScheduledExecutor }.apply { - setRunning() - configHandler.applyNewConfiguration( - configWithOneSessionBetweenMembers, - null, - mock(), - ) - heartbeatConfigHandler.applyNewConfiguration(configWithHeartbeat, null, resourceHolder) - } + val sessionManager = createSessionManager(resourceHolder, configWithOneSessionBetweenMembersAndNoHeartbeats, configWithNoHeartbeat) + sessionManager.start() + + val sessionId = "some-session" + whenever(outboundSessionPool.constructed().last().getSession(sessionId)).thenReturn( + OutboundSessionPool.SessionType.PendingSession(counterparties, protocolInitiator) + ) + whenever(outboundSessionPool.constructed().last().getNextSession(counterparties)).thenReturn( + OutboundSessionPool.SessionPoolStatus.SessionPending + ) + + val initiatorHandshakeMsg = mock() + whenever(protocolInitiator.generateOurHandshakeMessage(eq(PEER_KEY.public), eq(null), any())).thenReturn(initiatorHandshakeMsg) + val header = CommonHeader(MessageType.RESPONDER_HANDSHAKE, 1, sessionId, 4, Instant.now().toEpochMilli()) + val responderHello = ResponderHelloMessage(header, ByteBuffer.wrap(PEER_KEY.public.encoded)) + sessionManager.processSessionMessages(listOf(LinkInMessage(responderHello))) {it}.single().second + assertThat(sessionManager.processOutboundMessages(listOf(message)) { it }.single().second) + .isInstanceOf(SessionManager.SessionState.SessionAlreadyPending::class.java) + mockTimeFacilitiesProvider.advanceTime(configWithNoHeartbeat.sessionTimeout.plus(5.millis)) + verify(outboundSessionPool.constructed().last(), never()).replaceSession(any(), any(), any()) + + sessionManager.stop() + resourceHolder.close() + } + + @Test + @Suppress("MaxLineLength") + fun `when responder handshake is received, the session is established, if no message is sent, the session times out if heartbeats are enabled`() { + val resourceHolder = ResourcesHolder() + val sessionManager = createSessionManager(resourceHolder, configWithOneSessionBetweenMembers, configWithHeartbeat) sessionManager.start() val initiatorHello = mock() @@ -1762,52 +1777,58 @@ class SessionManagerTest { resourceHolder.close() } + @Test + @Suppress("MaxLineLength") + fun `when responder handshake is received, the session is established, if no message is sent, the session doesn't time out if heartbeats are disabled`() { + val resourceHolder = ResourcesHolder() + val sessionManager = createSessionManager(resourceHolder, configWithOneSessionBetweenMembersAndNoHeartbeats, configWithNoHeartbeat) + sessionManager.start() + + val initiatorHello = mock() + whenever(protocolInitiator.generateInitiatorHello()).thenReturn(initiatorHello) + whenever(outboundSessionPool.constructed().last().getNextSession(counterparties)).thenReturn( + OutboundSessionPool.SessionPoolStatus.NewSessionsNeeded + ) + sessionManager.processOutboundMessages(listOf(message)) { it } + whenever(outboundSessionPool.constructed().last().getSession(protocolInitiator.sessionId)).thenReturn( + OutboundSessionPool.SessionType.PendingSession(counterparties, protocolInitiator) + ) + val header = CommonHeader( + MessageType.RESPONDER_HANDSHAKE, + 1, + protocolInitiator.sessionId, + 4, + Instant.now().toEpochMilli() + ) + val responderHandshakeMessage = ResponderHandshakeMessage(header, RANDOM_BYTES, RANDOM_BYTES) + val session = mock() + whenever(session.sessionId).doAnswer { protocolInitiator.sessionId } + whenever(protocolInitiator.getSession()).thenReturn(session) + sessionManager.processSessionMessages(listOf(LinkInMessage(responderHandshakeMessage))) { it } + + whenever(secondProtocolInitiator.generateInitiatorHello()).thenReturn(initiatorHello) + mockTimeFacilitiesProvider.advanceTime(configWithNoHeartbeat.sessionTimeout.plus(5.millis)) + + verify(outboundSessionPool.constructed().last(), never()).replaceSession(any(), any(), any(),) + verify(publisherWithDominoLogicByClientId["session-manager"]!!.last(), never()).publish(anyList()) + + sessionManager.stop() + resourceHolder.close() + } + @Test fun `when a responder handshake message is received, heartbeats are sent, if these are not acknowledged the session times out`() { val messages = mutableListOf() - fun callback(records: List>): List> { + val resourcesHolder = ResourcesHolder() + val sessionManager = createSessionManager(resourcesHolder, configWithOneSessionBetweenMembers, configWithHeartbeat) + mockSessionHealthManagerPublisherAndCaptureRecords { records -> val record = records.single() assertEquals(LINK_OUT_TOPIC, record.topic) messages.add((record.value as LinkOutMessage).payload as AuthenticatedDataMessage) - return listOf(CompletableFuture.completedFuture(Unit)) - } - - val resourcesHolder = ResourcesHolder() - val sessionManager = SessionManagerImpl( - groupPolicyProvider, membershipGroupReaderProvider, - cryptoOpsClient, - pendingSessionMessageQueues, - mock(), - mock(), - mock(), - mock(), - mock { - val dominoTile = mock { - whenever(it.coordinatorName).doReturn(LifecycleCoordinatorName("", "")) - } - on { it.dominoTile } doReturn dominoTile - }, - linkManagerHostingMap, - protocolFactory, - mockTimeFacilitiesProvider.clock, - sessionReplayer, - ) { mockTimeFacilitiesProvider.mockScheduledExecutor }.apply { - setRunning() - configHandler.applyNewConfiguration( - configWithOneSessionBetweenMembers, - null, - mock(), - ) - heartbeatConfigHandler.applyNewConfiguration(configWithHeartbeat, null, resourcesHolder) - } - @Suppress("UNCHECKED_CAST") - publisherWithDominoLogicByClientId[SessionManagerImpl.HeartbeatManager.HEARTBEAT_MANAGER_CLIENT_ID]!!.forEach { - whenever(it.publish(any())).doAnswer { invocation -> - callback(invocation.arguments.first() as List>) - } + listOf(CompletableFuture.completedFuture(Unit)) } sessionManager.start() - startSendingHeartbeats(sessionManager) + startSession(sessionManager) whenever(outboundSessionPool.constructed().last().replaceSession(eq(counterparties), eq(sessionId), any())).thenReturn(true) whenever(secondProtocolInitiator.generateInitiatorHello()).thenReturn(mock()) @@ -1832,62 +1853,153 @@ class SessionManagerTest { } @Test - fun `when a responder handshake message is received, heartbeats are sent, this continues if the heartbeat manager gets a new config`() { + @Suppress("MaxLineLength") + fun `when a responder handshake message is received, heartbeats are not sent, and sessions don't time out if heartbeats are disabled`() { + val messages = mutableListOf() + val resourcesHolder = ResourcesHolder() + val sessionManager = createSessionManager( + resourcesHolder, + configWithOneSessionBetweenMembersAndNoHeartbeats, + configWithNoHeartbeat + ) + mockSessionHealthManagerPublisherAndCaptureRecords { records -> + val record = records.single() + assertEquals(LINK_OUT_TOPIC, record.topic) + messages.add((record.value as LinkOutMessage).payload as AuthenticatedDataMessage) + listOf(CompletableFuture.completedFuture(Unit)) + } + sessionManager.start() + startSession(sessionManager) + + whenever(outboundSessionPool.constructed().last().replaceSession(eq(counterparties), eq(sessionId), any())).thenReturn(true) + whenever(secondProtocolInitiator.generateInitiatorHello()).thenReturn(mock()) + mockTimeFacilitiesProvider.advanceTime(configWithNoHeartbeat.sessionTimeout.plus(5.millis)) + verify(outboundSessionPool.constructed().last(), never()).replaceSession(any(), any(), any()) + verify(publisherWithDominoLogicByClientId["session-manager"]!!.last(), never()).publish(anyList()) + + sessionManager.stop() + resourcesHolder.close() + + assertThat(messages).isEmpty() + } + + @Test + @Suppress("MaxLineLength") + fun `when a responder handshake message is received, heartbeats are sent if enabled, this continues if the heartbeat manager gets a new config with heartbeats enabled`() { val messages = Collections.synchronizedList(mutableListOf()) - fun callback(records: List>): List> { + val resourcesHolder = ResourcesHolder() + val sessionManager = createSessionManager(resourcesHolder, configWithOneSessionBetweenMembers, configWithHeartbeat) + mockSessionHealthManagerPublisherAndCaptureRecords { records -> val record = records.single() val message = (record.value as LinkOutMessage).payload as AuthenticatedDataMessage messages.add(message) - return listOf(CompletableFuture.completedFuture(Unit)) + listOf(CompletableFuture.completedFuture(Unit)) } + sessionManager.start() + startSession(sessionManager) + + mockTimeFacilitiesProvider.advanceTime(configWithHeartbeat.heartbeatPeriod.plus(5.millis)) + mockTimeFacilitiesProvider.advanceTime(configWithHeartbeat.heartbeatPeriod.plus(5.millis)) + assertThat(messages.size).isEqualTo(2) + + sessionHealthManagerConfigHandler.applyNewConfiguration(configWithHeartbeat, configWithHeartbeat, mock()) + + mockTimeFacilitiesProvider.advanceTime(configWithHeartbeat.heartbeatPeriod.plus(5.millis)) + mockTimeFacilitiesProvider.advanceTime(configWithHeartbeat.heartbeatPeriod.plus(5.millis)) + assertThat(messages.size).isEqualTo(4) + + sessionManager.stop() + resourcesHolder.close() + } + + @Test + @Suppress("MaxLineLength") + fun `when a responder handshake message is received, heartbeats are sent if enabled, this stops if the heartbeat manager gets a new config with heartbeats disabled`() { + val messages = Collections.synchronizedList(mutableListOf()) val resourcesHolder = ResourcesHolder() - val sessionManager = SessionManagerImpl( - groupPolicyProvider, membershipGroupReaderProvider, - cryptoOpsClient, - pendingSessionMessageQueues, - mock(), - mock(), - mock(), - mock(), - mock { - val dominoTile = mock { - whenever(it.coordinatorName).doReturn(LifecycleCoordinatorName("", "")) - } - on { it.dominoTile } doReturn dominoTile - }, - linkManagerHostingMap, - protocolFactory, - mockTimeFacilitiesProvider.clock, - sessionReplayer, - ) { mockTimeFacilitiesProvider.mockScheduledExecutor }.apply { - setRunning() - configHandler.applyNewConfiguration( - configWithOneSessionBetweenMembers, - null, - mock(), - ) - heartbeatConfigHandler.applyNewConfiguration(configWithHeartbeat, null, mock()) - } - @Suppress("UNCHECKED_CAST") - publisherWithDominoLogicByClientId[SessionManagerImpl.HeartbeatManager.HEARTBEAT_MANAGER_CLIENT_ID]!!.forEach { - whenever(it.publish(any())).doAnswer { invocation -> - callback(invocation.arguments.first() as List>) - } + val sessionManager = createSessionManager(resourcesHolder, configWithOneSessionBetweenMembers, configWithHeartbeat) + mockSessionHealthManagerPublisherAndCaptureRecords { records -> + val record = records.single() + val message = (record.value as LinkOutMessage).payload as AuthenticatedDataMessage + messages.add(message) + listOf(CompletableFuture.completedFuture(Unit)) } sessionManager.start() - startSendingHeartbeats(sessionManager) + startSession(sessionManager) mockTimeFacilitiesProvider.advanceTime(configWithHeartbeat.heartbeatPeriod.plus(5.millis)) mockTimeFacilitiesProvider.advanceTime(configWithHeartbeat.heartbeatPeriod.plus(5.millis)) assertThat(messages.size).isEqualTo(2) - heartbeatConfigHandler.applyNewConfiguration(configWithHeartbeat, null, mock()) + sessionHealthManagerConfigHandler.applyNewConfiguration(configWithNoHeartbeat, configWithHeartbeat, mock()) + + startSession(sessionManager, secondProtocolInitiator) + mockTimeFacilitiesProvider.advanceTime(configWithNoHeartbeat.heartbeatPeriod.plus(5.millis)) + mockTimeFacilitiesProvider.advanceTime(configWithNoHeartbeat.heartbeatPeriod.plus(5.millis)) + assertThat(messages.size).isEqualTo(2) + + sessionManager.stop() + resourcesHolder.close() + } + + @Test + @Suppress("MaxLineLength") + fun `when a responder handshake message is received, heartbeats are not sent if disabled, this continues if the heartbeat manager gets a new config with heartbeats disabled`() { + val messages = Collections.synchronizedList(mutableListOf()) + + val resourcesHolder = ResourcesHolder() + val sessionManager = createSessionManager(resourcesHolder, configWithOneSessionBetweenMembersAndNoHeartbeats, configWithNoHeartbeat) + mockSessionHealthManagerPublisherAndCaptureRecords { records -> + val record = records.single() + val message = (record.value as LinkOutMessage).payload as AuthenticatedDataMessage + messages.add(message) + listOf(CompletableFuture.completedFuture(Unit)) + } + sessionManager.start() + startSession(sessionManager) + + mockTimeFacilitiesProvider.advanceTime(configWithNoHeartbeat.heartbeatPeriod.plus(5.millis)) + mockTimeFacilitiesProvider.advanceTime(configWithNoHeartbeat.heartbeatPeriod.plus(5.millis)) + assertThat(messages).isEmpty() + sessionHealthManagerConfigHandler.applyNewConfiguration(configWithNoHeartbeat, configWithNoHeartbeat, mock()) + + mockTimeFacilitiesProvider.advanceTime(configWithNoHeartbeat.heartbeatPeriod.plus(5.millis)) + mockTimeFacilitiesProvider.advanceTime(configWithNoHeartbeat.heartbeatPeriod.plus(5.millis)) + assertThat(messages).isEmpty() + + sessionManager.stop() + resourcesHolder.close() + } + + @Test + @Suppress("MaxLineLength") + fun `when a responder handshake message is received, heartbeats are not sent if disabled, they start if the heartbeat manager gets a new config with heartbeats enabled`() { + val messages = Collections.synchronizedList(mutableListOf()) + + val resourcesHolder = ResourcesHolder() + val sessionManager = createSessionManager(resourcesHolder, configWithOneSessionBetweenMembersAndNoHeartbeats, configWithNoHeartbeat) + mockSessionHealthManagerPublisherAndCaptureRecords { records -> + val record = records.single() + val message = (record.value as LinkOutMessage).payload as AuthenticatedDataMessage + messages.add(message) + listOf(CompletableFuture.completedFuture(Unit)) + } + sessionManager.start() + startSession(sessionManager) + + mockTimeFacilitiesProvider.advanceTime(configWithNoHeartbeat.heartbeatPeriod.plus(5.millis)) + mockTimeFacilitiesProvider.advanceTime(configWithNoHeartbeat.heartbeatPeriod.plus(5.millis)) + assertThat(messages).isEmpty() + + sessionHealthManagerConfigHandler.applyNewConfiguration(configWithHeartbeat, configWithNoHeartbeat, mock()) + + startSession(sessionManager, secondProtocolInitiator) mockTimeFacilitiesProvider.advanceTime(configWithHeartbeat.heartbeatPeriod.plus(5.millis)) mockTimeFacilitiesProvider.advanceTime(configWithHeartbeat.heartbeatPeriod.plus(5.millis)) - assertThat(messages.size).isEqualTo(4) + assertThat(messages).hasSize(2) sessionManager.stop() resourcesHolder.close() @@ -1897,53 +2009,21 @@ class SessionManagerTest { fun `when a responder handshake message is received, heartbeats are sent, this stops if the session manager gets a new config`() { var linkOutMessages = 0 val resourcesHolder = ResourcesHolder() - fun callback(records: List>): List> { + + val sessionManager = createSessionManager(resourcesHolder, configWithOneSessionBetweenMembers, configWithHeartbeat) + mockSessionHealthManagerPublisherAndCaptureRecords { records -> for (record in records) { if (record.topic == LINK_OUT_TOPIC) { linkOutMessages++ } } - return listOf(CompletableFuture.completedFuture(Unit)) - } - - val sessionManager = SessionManagerImpl( - groupPolicyProvider, membershipGroupReaderProvider, - cryptoOpsClient, - pendingSessionMessageQueues, - mock(), - mock(), - mock(), - mock(), - mock { - val dominoTile = mock { - whenever(it.coordinatorName).doReturn(LifecycleCoordinatorName("", "")) - } - on { it.dominoTile } doReturn dominoTile - }, - linkManagerHostingMap, - protocolFactory, - mockTimeFacilitiesProvider.clock, - sessionReplayer, - ) { mockTimeFacilitiesProvider.mockScheduledExecutor }.apply { - setRunning() - configHandler.applyNewConfiguration( - configWithOneSessionBetweenMembers, - null, - mock(), - ) - heartbeatConfigHandler.applyNewConfiguration(configWithHeartbeat, null, resourcesHolder) - } - @Suppress("UNCHECKED_CAST") - publisherWithDominoLogicByClientId[SessionManagerImpl.HeartbeatManager.HEARTBEAT_MANAGER_CLIENT_ID]!!.forEach { - whenever(it.publish(any())).doAnswer { invocation -> - callback(invocation.arguments.first() as List>) - } + listOf(CompletableFuture.completedFuture(Unit)) } sessionManager.start() whenever(outboundSessionPool.constructed().last().replaceSession(eq(counterparties), eq(sessionId), any())).thenReturn(true) whenever(outboundSessionPool.constructed().last().getAllSessionIds()).thenAnswer { (listOf(protocolInitiator.sessionId)) } - startSendingHeartbeats(sessionManager) + startSession(sessionManager) repeat(2) { mockTimeFacilitiesProvider.advanceTime(configWithHeartbeat.heartbeatPeriod.plus(5.millis)) } assertThat(linkOutMessages).isEqualTo(2) @@ -1956,6 +2036,7 @@ class SessionManagerTest { SESSIONS_PER_COUNTERPARTIES_FOR_MGM, RevocationCheckMode.OFF, SESSION_REFRESH_THRESHOLD_KEY, + true ), resourcesHolder, ) @@ -1974,52 +2055,19 @@ class SessionManagerTest { val resourcesHolder = ResourcesHolder() val messages = Collections.synchronizedList(mutableListOf()) - fun callback(records: List>): List> { + val sessionManager = createSessionManager(resourcesHolder, configWithOneSessionBetweenMembers, configWithHeartbeat) + mockSessionHealthManagerPublisherAndCaptureRecords { records -> val record = records.single() assertEquals(LINK_OUT_TOPIC, record.topic) val message = (record.value as LinkOutMessage).payload as AuthenticatedDataMessage messages.add(message) - return listOf(CompletableFuture.completedFuture(Unit)) - } - - val sessionManager = SessionManagerImpl( - groupPolicyProvider, membershipGroupReaderProvider, - cryptoOpsClient, - pendingSessionMessageQueues, - mock(), - mock(), - mock(), - mock(), - mock { - val dominoTile = mock { - whenever(it.coordinatorName).doReturn(LifecycleCoordinatorName("", "")) - } - on { it.dominoTile } doReturn dominoTile - }, - linkManagerHostingMap, - protocolFactory, - mockTimeFacilitiesProvider.clock, - sessionReplayer, - ) { mockTimeFacilitiesProvider.mockScheduledExecutor }.apply { - setRunning() - configHandler.applyNewConfiguration( - configWithOneSessionBetweenMembers, - null, - mock(), - ) - heartbeatConfigHandler.applyNewConfiguration(configWithHeartbeat, null, resourcesHolder) - } - publisherWithDominoLogicByClientId[SessionManagerImpl.HeartbeatManager.HEARTBEAT_MANAGER_CLIENT_ID]!!.forEach { - whenever(it.publish(any())).doAnswer { invocation -> - @Suppress("UNCHECKED_CAST") - callback(invocation.arguments.first() as List>) - } + listOf(CompletableFuture.completedFuture(Unit)) } sessionManager.start() whenever(outboundSessionPool.constructed().last().replaceSession(eq(counterparties), eq(sessionId), any())).thenReturn(true) whenever(outboundSessionPool.constructed().last().getAllSessionIds()).thenAnswer { (listOf(protocolInitiator.sessionId)) } - startSendingHeartbeats(sessionManager) + startSession(sessionManager) // sum of heartbeats extending over the session timeout val numberOfHeartbeats = configWithHeartbeat.let { @@ -2053,41 +2101,15 @@ class SessionManagerTest { } val resourcesHolder = ResourcesHolder() - val sessionManager = SessionManagerImpl( - groupPolicyProvider, membershipGroupReaderProvider, - cryptoOpsClient, - pendingSessionMessageQueues, - mock(), - mock(), - mock(), - mock(), - mock { - val dominoTile = mock { - whenever(it.coordinatorName).doReturn(LifecycleCoordinatorName("", "")) - } - on { it.dominoTile } doReturn dominoTile - }, - linkManagerHostingMap, - protocolFactory, - mockTimeFacilitiesProvider.clock, - sessionReplayer, - ) { mockTimeFacilitiesProvider.mockScheduledExecutor }.apply { - setRunning() - configHandler.applyNewConfiguration( - configWithOneSessionBetweenMembers, - null, - mock(), - ) - heartbeatConfigHandler.applyNewConfiguration(configWithHeartbeat, null, resourcesHolder) - } - publisherWithDominoLogicByClientId[SessionManagerImpl.HeartbeatManager.HEARTBEAT_MANAGER_CLIENT_ID]!!.forEach { + val sessionManager = createSessionManager(resourcesHolder, configWithOneSessionBetweenMembers, configWithHeartbeat) + publisherWithDominoLogicByClientId[SESSION_HEALTH_MANAGER_CLIENT_ID]!!.forEach { whenever(it.publish(any())).doAnswer { publish() } } sessionManager.start() whenever(outboundSessionPool.constructed().last().replaceSession(eq(counterparties), eq(sessionId), any())).thenReturn(true) whenever(outboundSessionPool.constructed().last().getAllSessionIds()).thenAnswer { (listOf(protocolInitiator.sessionId)) } - startSendingHeartbeats(sessionManager) + startSession(sessionManager) repeat(3) { mockTimeFacilitiesProvider.advanceTime(configWithHeartbeat.heartbeatPeriod.plus(5.millis)) } assertThat(sentHeartbeats).isEqualTo(3) @@ -2143,60 +2165,26 @@ class SessionManagerTest { @Test fun `sessions that have been refreshed are not tracked by the heartbeat manager`() { - val longTimePeriodConfigWithHeartbeat = SessionManagerImpl.HeartbeatManager.HeartbeatManagerConfig( + val longTimePeriodConfigWithHeartbeat = SessionManagerImpl.SessionHealthManager.SessionHealthManagerConfig( + true, Duration.ofDays(1), Duration.ofDays(10) ) val messages = Collections.synchronizedList(mutableListOf()) - - fun callback(records: List>): List> { + val resourcesHolder = ResourcesHolder() + val sessionManager = createSessionManager(resourcesHolder, configWithOneSessionBetweenMembers, longTimePeriodConfigWithHeartbeat) + mockSessionHealthManagerPublisherAndCaptureRecords { records -> val record = records.single() assertEquals(LINK_OUT_TOPIC, record.topic) val message = (record.value as LinkOutMessage).payload as AuthenticatedDataMessage messages.add(message) - return listOf(CompletableFuture.completedFuture(Unit)) - } - - val resourcesHolder = ResourcesHolder() - val sessionManager = SessionManagerImpl( - groupPolicyProvider, membershipGroupReaderProvider, - cryptoOpsClient, - pendingSessionMessageQueues, - mock(), - mock(), - mock(), - mock(), - mock { - val dominoTile = mock { - whenever(it.coordinatorName).doReturn(LifecycleCoordinatorName("", "")) - } - on { it.dominoTile } doReturn dominoTile - }, - linkManagerHostingMap, - protocolFactory, - mockTimeFacilitiesProvider.clock, - sessionReplayer, - ) { mockTimeFacilitiesProvider.mockScheduledExecutor }.apply { - setRunning() - configHandler.applyNewConfiguration( - configWithOneSessionBetweenMembers, - null, - mock(), - ) - heartbeatConfigHandler.applyNewConfiguration(longTimePeriodConfigWithHeartbeat, null, mock()) + listOf(CompletableFuture.completedFuture(Unit)) } - @Suppress("UNCHECKED_CAST") - publisherWithDominoLogicByClientId[SessionManagerImpl.HeartbeatManager.HEARTBEAT_MANAGER_CLIENT_ID]!!.forEach { - whenever(it.publish(any())).doAnswer { invocation -> - callback(invocation.arguments.first() as List>) - } - } - val header = CommonHeader(MessageType.RESPONDER_HANDSHAKE, 1, protocolInitiator.sessionId, 4, Instant.now().toEpochMilli()) val responderHello = ResponderHelloMessage(header, ByteBuffer.wrap(PEER_KEY.public.encoded)) sessionManager.processSessionMessages(listOf(LinkInMessage(responderHello))) {it} - startSendingHeartbeats(sessionManager) + startSession(sessionManager) fun advanceTimeAndAcknowledgeMessages() { @@ -2447,4 +2435,96 @@ class SessionManagerTest { eq(TimeUnit.MILLISECONDS) ) } + + @Test + fun `when heartbeats are disabled and a message is sent, the session will timeout if the message is not acknowledged`() { + val messages = mutableListOf() + val resourcesHolder = ResourcesHolder() + val sessionManager = createSessionManager( + resourcesHolder, + configWithOneSessionBetweenMembersAndNoHeartbeats, + configWithNoHeartbeat + ) + mockSessionHealthManagerPublisherAndCaptureRecords { records -> + val record = records.single() + assertEquals(LINK_OUT_TOPIC, record.topic) + messages.add((record.value as LinkOutMessage).payload as AuthenticatedDataMessage) + listOf(CompletableFuture.completedFuture(Unit)) + } + sessionManager.start() + startSession(sessionManager) + mockTimeFacilitiesProvider.advanceTime(5.millis) + sessionManager.dataMessageSent(authenticatedSession) + + mockTimeFacilitiesProvider.advanceTime(configWithNoHeartbeat.sessionTimeout.plus(5.millis)) + verify(outboundSessionPool.constructed().last()).replaceSession(eq(counterparties), eq(sessionId), any()) + + sessionManager.stop() + resourcesHolder.close() + + assertThat(messages).isEmpty() + } + + @Test + fun `when heartbeats are disabled and a message is sent then acknowledged, the session will not timeout`() { + val messages = mutableListOf() + val resourcesHolder = ResourcesHolder() + val sessionManager = createSessionManager( + resourcesHolder, + configWithOneSessionBetweenMembersAndNoHeartbeats, + configWithNoHeartbeat + ) + mockSessionHealthManagerPublisherAndCaptureRecords { records -> + val record = records.single() + assertEquals(LINK_OUT_TOPIC, record.topic) + messages.add((record.value as LinkOutMessage).payload as AuthenticatedDataMessage) + listOf(CompletableFuture.completedFuture(Unit)) + } + sessionManager.start() + startSession(sessionManager) + mockTimeFacilitiesProvider.advanceTime(5.millis) + sessionManager.dataMessageSent(authenticatedSession) + mockTimeFacilitiesProvider.advanceTime(5.millis) + sessionManager.messageAcknowledged(authenticatedSession.sessionId) + mockTimeFacilitiesProvider.advanceTime(configWithNoHeartbeat.sessionTimeout.plus(5.millis)) + + verify(outboundSessionPool.constructed().last(), never()).replaceSession(eq(counterparties), eq(sessionId), any()) + + sessionManager.stop() + resourcesHolder.close() + + assertThat(messages).isEmpty() + } + + @Test + @Suppress("MaxLineLength") + fun `when heartbeats are disabled and a message is sent and not acknowledged, the session will timeout only after the session timeout duration has passed`() { + val messages = mutableListOf() + val resourcesHolder = ResourcesHolder() + val sessionManager = createSessionManager( + resourcesHolder, + configWithOneSessionBetweenMembersAndNoHeartbeats, + configWithNoHeartbeat + ) + mockSessionHealthManagerPublisherAndCaptureRecords { records -> + val record = records.single() + assertEquals(LINK_OUT_TOPIC, record.topic) + messages.add((record.value as LinkOutMessage).payload as AuthenticatedDataMessage) + listOf(CompletableFuture.completedFuture(Unit)) + } + sessionManager.start() + startSession(sessionManager) + mockTimeFacilitiesProvider.advanceTime(5.millis) + sessionManager.dataMessageSent(authenticatedSession) + mockTimeFacilitiesProvider.advanceTime(configWithNoHeartbeat.sessionTimeout.dividedBy(2)) + sessionManager.messageAcknowledged(authenticatedSession.sessionId) + mockTimeFacilitiesProvider.advanceTime(configWithNoHeartbeat.sessionTimeout.dividedBy(2).plus(5.millis)) + + verify(outboundSessionPool.constructed().last(), never()).replaceSession(eq(counterparties), eq(sessionId), any()) + + sessionManager.stop() + resourcesHolder.close() + + assertThat(messages).isEmpty() + } } diff --git a/libs/configuration/configuration-schema/p2p/src/main/kotlin/net/corda/libs/configuration/schema/p2p/LinkManagerConfiguration.kt b/libs/configuration/configuration-schema/p2p/src/main/kotlin/net/corda/libs/configuration/schema/p2p/LinkManagerConfiguration.kt index 8cf38c12f0a..fb7bbaa3c55 100644 --- a/libs/configuration/configuration-schema/p2p/src/main/kotlin/net/corda/libs/configuration/schema/p2p/LinkManagerConfiguration.kt +++ b/libs/configuration/configuration-schema/p2p/src/main/kotlin/net/corda/libs/configuration/schema/p2p/LinkManagerConfiguration.kt @@ -8,6 +8,7 @@ class LinkManagerConfiguration { const val BASE_REPLAY_PERIOD_KEY = "baseReplayPeriod" const val REPLAY_PERIOD_CUTOFF_KEY = "replayPeriodCutoff" const val MAX_REPLAYING_MESSAGES_PER_PEER = "maxReplayingMessages" + const val HEARTBEAT_ENABLED_KEY = "heartbeatEnabled" const val HEARTBEAT_MESSAGE_PERIOD_KEY = "heartbeatMessagePeriod" const val SESSION_TIMEOUT_KEY = "sessionTimeout" const val SESSIONS_PER_PEER_KEY = "sessionsPerPeer"