From a623177ee80908216cc1ebda9187f964abe35341 Mon Sep 17 00:00:00 2001 From: Charlie Crean Date: Fri, 5 Jan 2024 17:09:47 +0000 Subject: [PATCH 01/24] add config to switch off heartbeats --- .../p2p/linkmanager/metrics/MetricsHelpers.kt | 16 ++ .../sessions/SessionManagerImpl.kt | 202 +++++++++++------- .../sessions/SessionManagerTest.kt | 28 +-- gradle.properties | 2 +- .../schema/p2p/LinkManagerConfiguration.kt | 1 + 5 files changed, 158 insertions(+), 91 deletions(-) diff --git a/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/metrics/MetricsHelpers.kt b/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/metrics/MetricsHelpers.kt index d1cdcdc168d..d5cd4689dc5 100644 --- a/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/metrics/MetricsHelpers.kt +++ b/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/metrics/MetricsHelpers.kt @@ -86,4 +86,20 @@ private fun recordInboundMessagesMetric(source: String?, dest: String?, group: S builder.withTag(it.first, value) } builder.build().increment() +} + +fun recordOutboundSessionTimeoutMetric(source: HoldingIdentity, destination: HoldingIdentity) { + CordaMetrics.Metric.OutboundSessionTimeoutCount.builder() + .withTag(CordaMetrics.Tag.SourceVirtualNode, source.x500Name.toString()) + .withTag(CordaMetrics.Tag.DestinationVirtualNode, destination.x500Name.toString()) + .withTag(CordaMetrics.Tag.MembershipGroup, source.groupId) + .build().increment() +} + +fun recordInboundSessionTimeoutMetric(source: HoldingIdentity, destination: HoldingIdentity?) { + CordaMetrics.Metric.InboundSessionTimeoutCount.builder() + .withTag(CordaMetrics.Tag.SourceVirtualNode, source.x500Name.toString()) + .withTag(CordaMetrics.Tag.DestinationVirtualNode, destination?.x500Name?.toString() ?: NOT_APPLICABLE_TAG_VALUE) + .withTag(CordaMetrics.Tag.MembershipGroup, source.groupId) + .build().increment() } \ No newline at end of file diff --git a/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt b/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt index 493dc3361d3..32b8d5cfc8c 100644 --- a/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt +++ b/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt @@ -93,10 +93,11 @@ import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.locks.ReentrantReadWriteLock import net.corda.membership.lib.exceptions.BadGroupPolicyException -import net.corda.metrics.CordaMetrics.NOT_APPLICABLE_TAG_VALUE import net.corda.p2p.crypto.protocol.api.InvalidSelectedModeError import net.corda.p2p.crypto.protocol.api.NoCommonModeError +import net.corda.p2p.linkmanager.metrics.recordInboundSessionTimeoutMetric import net.corda.p2p.linkmanager.metrics.recordOutboundHeartbeatMessagesMetric +import net.corda.p2p.linkmanager.metrics.recordOutboundSessionTimeoutMetric import net.corda.p2p.linkmanager.sessions.SessionManagerWarnings.badGroupPolicy import kotlin.concurrent.read import kotlin.concurrent.write @@ -151,7 +152,7 @@ internal class SessionManagerImpl( ) ) - private val heartbeatManager: HeartbeatManager = HeartbeatManager( + private val sessionHealthManager: SessionHealthManager = SessionHealthManager( publisherFactory, configurationReaderService, coordinatorFactory, @@ -163,7 +164,7 @@ internal class SessionManagerImpl( clock, executorServiceFactory ) - private val outboundSessionPool = OutboundSessionPool(heartbeatManager::calculateWeightForSession) + private val outboundSessionPool = OutboundSessionPool(sessionHealthManager::calculateWeightForSession) private val publisher = PublisherWithDominoLogic( publisherFactory, @@ -185,7 +186,7 @@ internal class SessionManagerImpl( ::onTileStart, ::onTileClose, dependentChildren = setOf( - heartbeatManager.dominoTile.coordinatorName, sessionReplayer.dominoTile.coordinatorName, + sessionHealthManager.dominoTile.coordinatorName, sessionReplayer.dominoTile.coordinatorName, LifecycleCoordinatorName.forComponent(), LifecycleCoordinatorName.forComponent(), LifecycleCoordinatorName.forComponent(), @@ -193,7 +194,7 @@ internal class SessionManagerImpl( linkManagerHostingMap.dominoTile.coordinatorName, inboundAssignmentListener.dominoTile.coordinatorName, revocationCheckerClient.dominoTile.coordinatorName, ), - managedChildren = setOf(heartbeatManager.dominoTile.toNamedLifecycle(), sessionReplayer.dominoTile.toNamedLifecycle(), + managedChildren = setOf(sessionHealthManager.dominoTile.toNamedLifecycle(), sessionReplayer.dominoTile.toNamedLifecycle(), publisher.dominoTile.toNamedLifecycle(), revocationCheckerClient.dominoTile.toNamedLifecycle()), configurationChangeHandler = SessionManagerConfigChangeHandler() ) @@ -223,7 +224,7 @@ internal class SessionManagerImpl( if (oldConfiguration != null) { logger.info("The Session Manager got new config. All sessions will be cleaned up.") sessionReplayer.removeAllMessagesFromReplay() - heartbeatManager.stopTrackingAllSessions() + sessionHealthManager.stopTrackingAllSessions() val tombstoneRecords = ( outboundSessionPool.getAllSessionIds() + activeInboundSessions.keys + pendingInboundSessions.keys @@ -395,25 +396,25 @@ internal class SessionManagerImpl( override fun dataMessageSent(session: Session) { dominoTile.withLifecycleLock { - heartbeatManager.dataMessageSent(session) + sessionHealthManager.dataMessageSent(session) } } override fun messageAcknowledged(sessionId: String) { dominoTile.withLifecycleLock { - heartbeatManager.messageAcknowledged(sessionId) + sessionHealthManager.messageAcknowledged(sessionId) } } fun sessionMessageReceived(sessionId: String, source: HoldingIdentity, destination: HoldingIdentity?) { dominoTile.withLifecycleLock { - heartbeatManager.sessionMessageReceived(sessionId, source, destination) + sessionHealthManager.sessionMessageReceived(sessionId, source, destination) } } override fun dataMessageReceived(sessionId: String, source: HoldingIdentity, destination: HoldingIdentity) { dominoTile.withLifecycleLock { - heartbeatManager.dataMessageReceived(sessionId, source, destination) + sessionHealthManager.dataMessageReceived(sessionId, source, destination) } } @@ -577,7 +578,7 @@ internal class SessionManagerImpl( message.second, message.first.sessionId, sessionCounterparties, - heartbeatManager::sessionMessageSent + sessionHealthManager::sessionMessageSent ), sessionCounterparties ) @@ -621,7 +622,7 @@ internal class SessionManagerImpl( responderMemberInfo, p2pParams.networkType )?.let { - heartbeatManager.sessionMessageSent(sessionCounterparties, message.first.sessionId) + sessionHealthManager.sessionMessageSent(sessionCounterparties, message.first.sessionId) message.first.sessionId to it } } @@ -692,7 +693,7 @@ internal class SessionManagerImpl( } sessionReplayer.removeMessageFromReplay(initiatorHelloUniqueId(message.header.sessionId), sessionInfo) - heartbeatManager.messageAcknowledged(message.header.sessionId) + sessionHealthManager.messageAcknowledged(message.header.sessionId) sessionReplayer.addMessageForReplay( initiatorHandshakeUniqueId(message.header.sessionId), @@ -700,7 +701,7 @@ internal class SessionManagerImpl( payload, message.header.sessionId, sessionInfo, - heartbeatManager::sessionMessageSent + sessionHealthManager::sessionMessageSent ), sessionInfo ) @@ -715,7 +716,7 @@ internal class SessionManagerImpl( logger.couldNotFindGroupInfo(message::class.java.simpleName, message.header.sessionId, ourIdentityInfo.holdingIdentity) return null } - heartbeatManager.sessionMessageSent( + sessionHealthManager.sessionMessageSent( sessionInfo, message.header.sessionId, ) @@ -757,8 +758,8 @@ internal class SessionManagerImpl( } val authenticatedSession = session.getSession() sessionReplayer.removeMessageFromReplay(initiatorHandshakeUniqueId(message.header.sessionId), sessionCounterparties) - heartbeatManager.messageAcknowledged(message.header.sessionId) - heartbeatManager.startSendingHeartbeats(authenticatedSession) + sessionHealthManager.messageAcknowledged(message.header.sessionId) + sessionHealthManager.sessionEstablished(authenticatedSession) sessionNegotiationLock.write { outboundSessionPool.updateAfterSessionEstablished(authenticatedSession) pendingOutboundSessionMessageQueues.sessionNegotiatedCallback( @@ -786,7 +787,7 @@ internal class SessionManagerImpl( "out to refresh ephemeral keys and it will be cleaned up." ) refreshOutboundSession(sessionCounterparties, sessionId) - heartbeatManager.stopTrackingSpecifiedOutboundSession(sessionId) + sessionHealthManager.stopTrackingSpecifiedOutboundSession(sessionId) } private fun processInitiatorHello(message: InitiatorHelloMessage): LinkOutMessage? { @@ -1001,7 +1002,7 @@ internal class SessionManagerImpl( return false } - class HeartbeatManager( + class SessionHealthManager( publisherFactory: PublisherFactory, private val configurationReaderService: ConfigurationReadService, coordinatorFactory: LifecycleCoordinatorFactory, @@ -1016,26 +1017,27 @@ internal class SessionManagerImpl( companion object { private val logger = LoggerFactory.getLogger(this::class.java.enclosingClass) - const val HEARTBEAT_MANAGER_CLIENT_ID = "heartbeat-manager-client" + const val SESSION_HEALTH_MANAGER_CLIENT_ID = "session-health-manager-client" } - private val config = AtomicReference() + private val config = AtomicReference() @VisibleForTesting - internal data class HeartbeatManagerConfig( + internal data class SessionHealthManagerConfig( + val heartbeatEnabled: Boolean, val heartbeatPeriod: Duration, val sessionTimeout: Duration ) @VisibleForTesting - internal inner class HeartbeatManagerConfigChangeHandler : ConfigurationChangeHandler( + internal inner class SessionHealthManagerConfigChangeHandler : ConfigurationChangeHandler( configurationReaderService, ConfigKeys.P2P_LINK_MANAGER_CONFIG, ::fromConfig ) { override fun applyNewConfiguration( - newConfiguration: HeartbeatManagerConfig, - oldConfiguration: HeartbeatManagerConfig?, + newConfiguration: SessionHealthManagerConfig, + oldConfiguration: SessionHealthManagerConfig?, resources: ResourcesHolder, ): CompletableFuture { val configUpdateResult = CompletableFuture() @@ -1047,8 +1049,9 @@ internal class SessionManagerImpl( private val executorService = executorServiceFactory() - private fun fromConfig(config: Config): HeartbeatManagerConfig { - return HeartbeatManagerConfig( + private fun fromConfig(config: Config): SessionHealthManagerConfig { + return SessionHealthManagerConfig( + config.getBoolean(LinkManagerConfiguration.HEARTBEAT_ENABLED_KEY), Duration.ofMillis(config.getLong(LinkManagerConfiguration.HEARTBEAT_MESSAGE_PERIOD_KEY)), Duration.ofMillis(config.getLong(LinkManagerConfiguration.SESSION_TIMEOUT_KEY)) ) @@ -1060,7 +1063,7 @@ internal class SessionManagerImpl( private val publisher = PublisherWithDominoLogic( publisherFactory, coordinatorFactory, - PublisherConfig(HEARTBEAT_MANAGER_CLIENT_ID, false), + PublisherConfig(SESSION_HEALTH_MANAGER_CLIENT_ID, false), configuration ) @@ -1074,7 +1077,7 @@ internal class SessionManagerImpl( publisher.dominoTile.coordinatorName ), managedChildren = setOf(publisher.dominoTile.toNamedLifecycle()), - configurationChangeHandler = HeartbeatManagerConfigChangeHandler(), + configurationChangeHandler = SessionHealthManagerConfigChangeHandler(), ) /** @@ -1090,7 +1093,8 @@ internal class SessionManagerImpl( * [identityData]: The source and destination identities for this Session. * [lastSendTimestamp]: The last time we sent a message using this Session. * [lastAckTimestamp]: The last time a message we sent via this Session was acknowledged by the other side. - * [sendingHeartbeats]: If true we send heartbeats to the counterparty (this happens after the session established). + * [sendingHeartbeats]: If true we send heartbeats to the counterparty (this happens after the session + * established if configured to do so). */ class TrackedOutboundSession( val identityData: SessionCounterparties, @@ -1122,12 +1126,13 @@ internal class SessionManagerImpl( fun sessionMessageSent(counterparties: SessionCounterparties, sessionId: String) { dominoTile.withLifecycleLock { - if (!isRunning) { - throw IllegalStateException("A session message was added before the HeartbeatManager was started.") + check (isRunning) { + "A session message was added before the ${SessionHealthManager::class.java.simpleName} was started." } trackedOutboundSessions.compute(sessionId) { _, initialTrackedSession -> + val timestamp = timeStamp() if (initialTrackedSession != null) { - initialTrackedSession.lastSendTimestamp = timeStamp() + initialTrackedSession.lastSendTimestamp = timestamp initialTrackedSession } else { executorService.schedule( @@ -1135,25 +1140,29 @@ internal class SessionManagerImpl( config.get().sessionTimeout.toMillis(), TimeUnit.MILLISECONDS ) - TrackedOutboundSession(counterparties, timeStamp(), timeStamp()) + TrackedOutboundSession(counterparties, timestamp, timestamp) } } } } - fun startSendingHeartbeats(session: Session) { + fun sessionEstablished(session: Session) { dominoTile.withLifecycleLock { - if (!isRunning) { - throw IllegalStateException("A message was sent before the HeartbeatManager was started.") + check (isRunning) { + "A message was sent before the ${SessionHealthManager::class.java.simpleName} was started." } trackedOutboundSessions.computeIfPresent(session.sessionId) { _, trackedSession -> - if (!trackedSession.sendingHeartbeats) { + if (config.get().heartbeatEnabled && !trackedSession.sendingHeartbeats) { executorService.schedule( { sendHeartbeat(trackedSession.identityData, session) }, config.get().heartbeatPeriod.toMillis(), TimeUnit.MILLISECONDS ) trackedSession.sendingHeartbeats = true + } else if(!config.get().heartbeatEnabled) { + logger.debug( + "Session heartbeats are disabled. Not starting heartbeats for session ${session.sessionId}." + ) } trackedSession } ?: throw IllegalStateException("A message was sent on session with Id ${session.sessionId} which is not tracked.") @@ -1162,8 +1171,8 @@ internal class SessionManagerImpl( fun dataMessageSent(session: Session) { dominoTile.withLifecycleLock { - if (!isRunning) { - throw IllegalStateException("A message was sent before the HeartbeatManager was started.") + check (isRunning) { + "A message was sent before the ${SessionHealthManager::class.java.simpleName} was started." } trackedOutboundSessions.computeIfPresent(session.sessionId) { _, trackedSession -> trackedSession.lastSendTimestamp = timeStamp() @@ -1174,8 +1183,8 @@ internal class SessionManagerImpl( fun messageAcknowledged(sessionId: String) { dominoTile.withLifecycleLock { - if (!isRunning) { - throw IllegalStateException("A message was acknowledged before the HeartbeatManager was started.") + check (isRunning) { + "A message was acknowledged before the ${SessionHealthManager::class.java.simpleName} was started." } val sessionInfo = trackedOutboundSessions[sessionId] ?: return@withLifecycleLock logger.trace("Message acknowledged with on a session with Id $sessionId.") @@ -1185,35 +1194,54 @@ internal class SessionManagerImpl( fun sessionMessageReceived(sessionId: String, source: HoldingIdentity, destination: HoldingIdentity?) { dominoTile.withLifecycleLock { - check(isRunning) { "A session message was received before the HeartbeatManager was started." } + check(isRunning) { + "A session message was received before the ${SessionHealthManager::class.java.simpleName} was started." + } messageReceived(sessionId, source, destination) } } fun dataMessageReceived(sessionId: String, source: HoldingIdentity, destination: HoldingIdentity) { dominoTile.withLifecycleLock { - check(isRunning) { "A data message was received before the HeartbeatManager was started." } + check(isRunning) { + "A data message was received before the ${SessionHealthManager::class.java.simpleName} was started." + } messageReceived(sessionId, source, destination) } } private fun messageReceived(sessionId: String, source: HoldingIdentity, destination: HoldingIdentity?) { - trackedInboundSessions.compute(sessionId) { _, initialTrackedSession -> - if (initialTrackedSession != null) { - initialTrackedSession.lastReceivedTimestamp = timeStamp() - initialTrackedSession - } else { - executorService.schedule( - { inboundSessionTimeout(sessionId, source, destination) }, - config.get().sessionTimeout.toMillis(), - TimeUnit.MILLISECONDS - ) - TrackedInboundSession(timeStamp()) + if(config.get().heartbeatEnabled) { + trackedInboundSessions.compute(sessionId) { _, initialTrackedSession -> + if (initialTrackedSession != null) { + initialTrackedSession.lastReceivedTimestamp = timeStamp() + initialTrackedSession + } else { + executorService.schedule( + { inboundSessionTimeout(sessionId, source, destination) }, + config.get().sessionTimeout.toMillis(), + TimeUnit.MILLISECONDS + ) + TrackedInboundSession(timeStamp()) + } } + } else { + logger.debug( + "Heartbeats are disabled. " + + "Inbound session timeout not enabled for session with ID $sessionId." + ) } } private fun outboundSessionTimeout(counterparties: SessionCounterparties, sessionId: String) { + if(config.get().heartbeatEnabled) { + heartbeatOutboundSessionTimeout(counterparties, sessionId) + } else { + noHeartbeatOutboundSessionTimeout(counterparties, sessionId) + } + } + + private fun heartbeatOutboundSessionTimeout(counterparties: SessionCounterparties, sessionId: String) { val sessionInfo = trackedOutboundSessions[sessionId] ?: return val timeSinceLastAck = timeStamp() - sessionInfo.lastAckTimestamp val sessionTimeoutMs = config.get().sessionTimeout.toMillis() @@ -1223,18 +1251,52 @@ internal class SessionManagerImpl( "${counterparties.counterpartyId}) has not received any messages for the configured " + "timeout threshold ($sessionTimeoutMs ms) so it will be cleaned up." ) - destroyOutboundSession(counterparties, sessionId) - trackedOutboundSessions.remove(sessionId) - recordOutboundSessionTimeoutMetric(counterparties.ourId, counterparties.counterpartyId) + tearDownOutboundSession(counterparties, sessionId) } else { - executorService.schedule( - { outboundSessionTimeout(counterparties, sessionId) }, - sessionTimeoutMs - timeSinceLastAck, - TimeUnit.MILLISECONDS + scheduleOutboundSessionTimeout(counterparties, sessionId, sessionTimeoutMs - timeSinceLastAck) + } + } + + private fun noHeartbeatOutboundSessionTimeout(counterparties: SessionCounterparties, sessionId: String) { + val sessionInfo = trackedOutboundSessions[sessionId] ?: return + val timestamp = timeStamp() + val timeSinceLastAck = timestamp - sessionInfo.lastAckTimestamp + val timeSinceLastSent = timestamp - sessionInfo.lastSendTimestamp + val maxWaitForAck = config.get().sessionTimeout.toMillis() + val waitingForAck = timeSinceLastAck > timeSinceLastSent + if (waitingForAck && timeSinceLastSent >= maxWaitForAck) { + logger.info( + "Outbound session $sessionId (local=${counterparties.ourId}, remote=" + + "${counterparties.counterpartyId}) has not received any acknowledgement to the last sent message " + + "within the configured timeout threshold ($maxWaitForAck ms) so it will be cleaned up. " + + "Time since last ack ${timeSinceLastAck}ms. ]" + + "Time since last sent ${timeSinceLastSent}ms." ) + tearDownOutboundSession(counterparties, sessionId) + } else { + val delay = if (waitingForAck) { + maxWaitForAck - timeSinceLastSent + } else { + maxWaitForAck + } + scheduleOutboundSessionTimeout(counterparties, sessionId, delay) } } + private fun tearDownOutboundSession(counterparties: SessionCounterparties, sessionId: String) { + destroyOutboundSession(counterparties, sessionId) + trackedOutboundSessions.remove(sessionId) + recordOutboundSessionTimeoutMetric(counterparties.ourId, counterparties.counterpartyId) + } + + private fun scheduleOutboundSessionTimeout(counterparties: SessionCounterparties, sessionId: String, delay: Long) { + executorService.schedule( + { outboundSessionTimeout(counterparties, sessionId) }, + delay, + TimeUnit.MILLISECONDS + ) + } + private fun inboundSessionTimeout(sessionId: String, source: HoldingIdentity, destination: HoldingIdentity?) { val sessionInfo = trackedInboundSessions[sessionId] ?: return val timeSinceLastReceived = timeStamp() - sessionInfo.lastReceivedTimestamp @@ -1335,21 +1397,5 @@ internal class SessionManagerImpl( private fun timeStamp(): Long { return clock.instant().toEpochMilli() } - - private fun recordOutboundSessionTimeoutMetric(source: HoldingIdentity, destination: HoldingIdentity) { - CordaMetrics.Metric.OutboundSessionTimeoutCount.builder() - .withTag(CordaMetrics.Tag.SourceVirtualNode, source.x500Name.toString()) - .withTag(CordaMetrics.Tag.DestinationVirtualNode, destination.x500Name.toString()) - .withTag(CordaMetrics.Tag.MembershipGroup, source.groupId) - .build().increment() - } - - private fun recordInboundSessionTimeoutMetric(source: HoldingIdentity, destination: HoldingIdentity?) { - CordaMetrics.Metric.InboundSessionTimeoutCount.builder() - .withTag(CordaMetrics.Tag.SourceVirtualNode, source.x500Name.toString()) - .withTag(CordaMetrics.Tag.DestinationVirtualNode, destination?.x500Name?.toString() ?: NOT_APPLICABLE_TAG_VALUE) - .withTag(CordaMetrics.Tag.MembershipGroup, source.groupId) - .build().increment() - } } } diff --git a/components/link-manager/src/test/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerTest.kt b/components/link-manager/src/test/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerTest.kt index 5840222fd91..ff81cac928d 100644 --- a/components/link-manager/src/test/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerTest.kt +++ b/components/link-manager/src/test/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerTest.kt @@ -115,11 +115,13 @@ class SessionManagerTest { val RANDOM_BYTES = ByteBuffer.wrap("some-random-data".toByteArray()) private val sixDaysInMillis = 6.days.toMillis() - private val configWithHeartbeat = SessionManagerImpl.HeartbeatManager.HeartbeatManagerConfig( + private val configWithHeartbeat = SessionManagerImpl.SessionHealthManager.SessionHealthManagerConfig( + true, Duration.ofMillis(100), Duration.ofMillis(500) ) - private val configNoHeartbeat = SessionManagerImpl.HeartbeatManager.HeartbeatManagerConfig( + private val configNoHeartbeat = SessionManagerImpl.SessionHealthManager.SessionHealthManagerConfig( + true, Duration.ofMillis(sixDaysInMillis), Duration.ofMillis(sixDaysInMillis) ) @@ -173,7 +175,7 @@ class SessionManagerTest { } private lateinit var configHandler: SessionManagerImpl.SessionManagerConfigChangeHandler - private lateinit var heartbeatConfigHandler: SessionManagerImpl.HeartbeatManager.HeartbeatManagerConfigChangeHandler + private lateinit var heartbeatConfigHandler: SessionManagerImpl.SessionHealthManager.SessionHealthManagerConfigChangeHandler private val dominoTile = Mockito.mockConstruction(ComplexDominoTile::class.java) { mock, context -> @Suppress("UNCHECKED_CAST") whenever(mock.withLifecycleLock(any<() -> Any>())).doAnswer { (it.arguments.first() as () -> Any).invoke() } @@ -182,8 +184,9 @@ class SessionManagerTest { if (context.arguments()[6] is SessionManagerImpl.SessionManagerConfigChangeHandler) { configHandler = context.arguments()[6] as SessionManagerImpl.SessionManagerConfigChangeHandler } - if (context.arguments()[6] is SessionManagerImpl.HeartbeatManager.HeartbeatManagerConfigChangeHandler) { - heartbeatConfigHandler = context.arguments()[6] as SessionManagerImpl.HeartbeatManager.HeartbeatManagerConfigChangeHandler + if (context.arguments()[6] is SessionManagerImpl.SessionHealthManager.SessionHealthManagerConfigChangeHandler) { + heartbeatConfigHandler = context.arguments()[6] + as SessionManagerImpl.SessionHealthManager.SessionHealthManagerConfigChangeHandler } whenever(mock.coordinatorName).doReturn(LifecycleCoordinatorName("", "")) } @@ -1801,7 +1804,7 @@ class SessionManagerTest { heartbeatConfigHandler.applyNewConfiguration(configWithHeartbeat, null, resourcesHolder) } @Suppress("UNCHECKED_CAST") - publisherWithDominoLogicByClientId[SessionManagerImpl.HeartbeatManager.HEARTBEAT_MANAGER_CLIENT_ID]!!.forEach { + publisherWithDominoLogicByClientId[SessionManagerImpl.SessionHealthManager.SESSION_HEALTH_MANAGER_CLIENT_ID]!!.forEach { whenever(it.publish(any())).doAnswer { invocation -> callback(invocation.arguments.first() as List>) } @@ -1871,7 +1874,7 @@ class SessionManagerTest { heartbeatConfigHandler.applyNewConfiguration(configWithHeartbeat, null, mock()) } @Suppress("UNCHECKED_CAST") - publisherWithDominoLogicByClientId[SessionManagerImpl.HeartbeatManager.HEARTBEAT_MANAGER_CLIENT_ID]!!.forEach { + publisherWithDominoLogicByClientId[SessionManagerImpl.SessionHealthManager.SESSION_HEALTH_MANAGER_CLIENT_ID]!!.forEach { whenever(it.publish(any())).doAnswer { invocation -> callback(invocation.arguments.first() as List>) } @@ -1934,7 +1937,7 @@ class SessionManagerTest { heartbeatConfigHandler.applyNewConfiguration(configWithHeartbeat, null, resourcesHolder) } @Suppress("UNCHECKED_CAST") - publisherWithDominoLogicByClientId[SessionManagerImpl.HeartbeatManager.HEARTBEAT_MANAGER_CLIENT_ID]!!.forEach { + publisherWithDominoLogicByClientId[SessionManagerImpl.SessionHealthManager.SESSION_HEALTH_MANAGER_CLIENT_ID]!!.forEach { whenever(it.publish(any())).doAnswer { invocation -> callback(invocation.arguments.first() as List>) } @@ -2009,7 +2012,7 @@ class SessionManagerTest { ) heartbeatConfigHandler.applyNewConfiguration(configWithHeartbeat, null, resourcesHolder) } - publisherWithDominoLogicByClientId[SessionManagerImpl.HeartbeatManager.HEARTBEAT_MANAGER_CLIENT_ID]!!.forEach { + publisherWithDominoLogicByClientId[SessionManagerImpl.SessionHealthManager.SESSION_HEALTH_MANAGER_CLIENT_ID]!!.forEach { whenever(it.publish(any())).doAnswer { invocation -> @Suppress("UNCHECKED_CAST") callback(invocation.arguments.first() as List>) @@ -2080,7 +2083,7 @@ class SessionManagerTest { ) heartbeatConfigHandler.applyNewConfiguration(configWithHeartbeat, null, resourcesHolder) } - publisherWithDominoLogicByClientId[SessionManagerImpl.HeartbeatManager.HEARTBEAT_MANAGER_CLIENT_ID]!!.forEach { + publisherWithDominoLogicByClientId[SessionManagerImpl.SessionHealthManager.SESSION_HEALTH_MANAGER_CLIENT_ID]!!.forEach { whenever(it.publish(any())).doAnswer { publish() } } sessionManager.start() @@ -2143,7 +2146,8 @@ class SessionManagerTest { @Test fun `sessions that have been refreshed are not tracked by the heartbeat manager`() { - val longTimePeriodConfigWithHeartbeat = SessionManagerImpl.HeartbeatManager.HeartbeatManagerConfig( + val longTimePeriodConfigWithHeartbeat = SessionManagerImpl.SessionHealthManager.SessionHealthManagerConfig( + true, Duration.ofDays(1), Duration.ofDays(10) ) @@ -2186,7 +2190,7 @@ class SessionManagerTest { heartbeatConfigHandler.applyNewConfiguration(longTimePeriodConfigWithHeartbeat, null, mock()) } @Suppress("UNCHECKED_CAST") - publisherWithDominoLogicByClientId[SessionManagerImpl.HeartbeatManager.HEARTBEAT_MANAGER_CLIENT_ID]!!.forEach { + publisherWithDominoLogicByClientId[SessionManagerImpl.SessionHealthManager.SESSION_HEALTH_MANAGER_CLIENT_ID]!!.forEach { whenever(it.publish(any())).doAnswer { invocation -> callback(invocation.arguments.first() as List>) } diff --git a/gradle.properties b/gradle.properties index baa9e36824a..9a234aa2c26 100644 --- a/gradle.properties +++ b/gradle.properties @@ -44,7 +44,7 @@ commonsLangVersion = 3.12.0 commonsTextVersion = 1.10.0 # Corda API libs revision (change in 4th digit indicates a breaking change) # Change to 5.2.0.xx-SNAPSHOT to pick up maven local published copy -cordaApiVersion=5.2.0.26-beta+ +cordaApiVersion=5.2.0.26-alpha-1704472101327 disruptorVersion=3.4.4 felixConfigAdminVersion=1.9.26 diff --git a/libs/configuration/configuration-schema/p2p/src/main/kotlin/net/corda/libs/configuration/schema/p2p/LinkManagerConfiguration.kt b/libs/configuration/configuration-schema/p2p/src/main/kotlin/net/corda/libs/configuration/schema/p2p/LinkManagerConfiguration.kt index 8cf38c12f0a..fb7bbaa3c55 100644 --- a/libs/configuration/configuration-schema/p2p/src/main/kotlin/net/corda/libs/configuration/schema/p2p/LinkManagerConfiguration.kt +++ b/libs/configuration/configuration-schema/p2p/src/main/kotlin/net/corda/libs/configuration/schema/p2p/LinkManagerConfiguration.kt @@ -8,6 +8,7 @@ class LinkManagerConfiguration { const val BASE_REPLAY_PERIOD_KEY = "baseReplayPeriod" const val REPLAY_PERIOD_CUTOFF_KEY = "replayPeriodCutoff" const val MAX_REPLAYING_MESSAGES_PER_PEER = "maxReplayingMessages" + const val HEARTBEAT_ENABLED_KEY = "heartbeatEnabled" const val HEARTBEAT_MESSAGE_PERIOD_KEY = "heartbeatMessagePeriod" const val SESSION_TIMEOUT_KEY = "sessionTimeout" const val SESSIONS_PER_PEER_KEY = "sessionsPerPeer" From 7d01b1b1adb699e371045a67e58f988e024ed485 Mon Sep 17 00:00:00 2001 From: Charlie Crean Date: Fri, 5 Jan 2024 17:41:28 +0000 Subject: [PATCH 02/24] fix integration test --- .../kotlin/net/corda/p2p/P2PLayerEndToEndTest.kt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/components/link-manager/src/nonOsgiIntegrationTest/kotlin/net/corda/p2p/P2PLayerEndToEndTest.kt b/components/link-manager/src/nonOsgiIntegrationTest/kotlin/net/corda/p2p/P2PLayerEndToEndTest.kt index 69e5fb7f5be..aed56396da0 100644 --- a/components/link-manager/src/nonOsgiIntegrationTest/kotlin/net/corda/p2p/P2PLayerEndToEndTest.kt +++ b/components/link-manager/src/nonOsgiIntegrationTest/kotlin/net/corda/p2p/P2PLayerEndToEndTest.kt @@ -29,6 +29,7 @@ import net.corda.libs.configuration.SmartConfig import net.corda.libs.configuration.SmartConfigFactory import net.corda.libs.configuration.merger.impl.ConfigMergerImpl import net.corda.libs.configuration.schema.p2p.LinkManagerConfiguration +import net.corda.libs.configuration.schema.p2p.LinkManagerConfiguration.Companion.HEARTBEAT_ENABLED_KEY import net.corda.libs.configuration.schema.p2p.LinkManagerConfiguration.Companion.HEARTBEAT_MESSAGE_PERIOD_KEY import net.corda.libs.configuration.schema.p2p.LinkManagerConfiguration.Companion.MAX_MESSAGE_SIZE_KEY import net.corda.libs.configuration.schema.p2p.LinkManagerConfiguration.Companion.MAX_REPLAYING_MESSAGES_PER_PEER @@ -519,6 +520,7 @@ class P2PLayerEndToEndTest { ConfigFactory.empty() .withValue(MAX_MESSAGE_SIZE_KEY, ConfigValueFactory.fromAnyRef(1000000)) .withValue(MAX_REPLAYING_MESSAGES_PER_PEER, ConfigValueFactory.fromAnyRef(100)) + .withValue(HEARTBEAT_ENABLED_KEY, ConfigValueFactory.fromAnyRef(true)) .withValue(HEARTBEAT_MESSAGE_PERIOD_KEY, ConfigValueFactory.fromAnyRef(Duration.ofSeconds(2))) .withValue(SESSION_TIMEOUT_KEY, ConfigValueFactory.fromAnyRef(Duration.ofSeconds(10))) .withValue(SESSIONS_PER_PEER_KEY, ConfigValueFactory.fromAnyRef(null)) From c611f61b89747358575e1e84841100ed0fe4dcfa Mon Sep 17 00:00:00 2001 From: Charlie Crean Date: Mon, 8 Jan 2024 09:39:39 +0000 Subject: [PATCH 03/24] fix integration test --- .../p2p/linkmanager/integration/LinkManagerIntegrationTest.kt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/components/link-manager/src/integrationTest/kotlin/net/corda/p2p/linkmanager/integration/LinkManagerIntegrationTest.kt b/components/link-manager/src/integrationTest/kotlin/net/corda/p2p/linkmanager/integration/LinkManagerIntegrationTest.kt index 6bd0f0d8f74..c49575bee94 100644 --- a/components/link-manager/src/integrationTest/kotlin/net/corda/p2p/linkmanager/integration/LinkManagerIntegrationTest.kt +++ b/components/link-manager/src/integrationTest/kotlin/net/corda/p2p/linkmanager/integration/LinkManagerIntegrationTest.kt @@ -12,6 +12,7 @@ import net.corda.data.p2p.crypto.protocol.RevocationCheckMode import net.corda.db.messagebus.testkit.DBSetup import net.corda.libs.configuration.SmartConfigFactory import net.corda.libs.configuration.schema.p2p.LinkManagerConfiguration +import net.corda.libs.configuration.schema.p2p.LinkManagerConfiguration.Companion.HEARTBEAT_ENABLED_KEY import net.corda.libs.configuration.schema.p2p.LinkManagerConfiguration.Companion.HEARTBEAT_MESSAGE_PERIOD_KEY import net.corda.libs.configuration.schema.p2p.LinkManagerConfiguration.Companion.MAX_MESSAGE_SIZE_KEY import net.corda.libs.configuration.schema.p2p.LinkManagerConfiguration.Companion.MAX_REPLAYING_MESSAGES_PER_PEER @@ -113,6 +114,7 @@ class LinkManagerIntegrationTest { return ConfigFactory.empty() .withValue(MAX_MESSAGE_SIZE_KEY, ConfigValueFactory.fromAnyRef(1000000)) .withValue(MAX_REPLAYING_MESSAGES_PER_PEER, ConfigValueFactory.fromAnyRef(100)) + .withValue(HEARTBEAT_ENABLED_KEY, ConfigValueFactory.fromAnyRef(true)) .withValue(HEARTBEAT_MESSAGE_PERIOD_KEY, ConfigValueFactory.fromAnyRef(2000)) .withValue(SESSION_TIMEOUT_KEY, ConfigValueFactory.fromAnyRef(10000)) .withValue(SESSIONS_PER_PEER_KEY, ConfigValueFactory.fromAnyRef(null)) From 09082fab2a753ec566a3b9704e5dddd485b82c6d Mon Sep 17 00:00:00 2001 From: Charlie Crean Date: Mon, 8 Jan 2024 12:15:30 +0000 Subject: [PATCH 04/24] refactor --- .../sessions/SessionManagerImpl.kt | 195 ++++++++++-------- 1 file changed, 112 insertions(+), 83 deletions(-) diff --git a/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt b/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt index 32b8d5cfc8c..4952de54ff4 100644 --- a/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt +++ b/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt @@ -786,7 +786,7 @@ internal class SessionManagerImpl( "Outbound session $sessionId (local=${sessionCounterparties.ourId}, remote=${sessionCounterparties.counterpartyId}) timed " + "out to refresh ephemeral keys and it will be cleaned up." ) - refreshOutboundSession(sessionCounterparties, sessionId) + refreshOutboundSession(sessionCounterparties, sessionId) sessionHealthManager.stopTrackingSpecifiedOutboundSession(sessionId) } @@ -1022,6 +1022,8 @@ internal class SessionManagerImpl( private val config = AtomicReference() + private val sessionHealthMonitor = AtomicReference() + @VisibleForTesting internal data class SessionHealthManagerConfig( val heartbeatEnabled: Boolean, @@ -1042,6 +1044,13 @@ internal class SessionManagerImpl( ): CompletableFuture { val configUpdateResult = CompletableFuture() config.set(newConfiguration) + sessionHealthMonitor.set( + when { + newConfiguration.heartbeatEnabled -> HeartbeatSessionHealthMonitor() + else -> NoHeartbeatSessionHealthMonitor() + } + ) + stopTrackingAllSessions() configUpdateResult.complete(Unit) return configUpdateResult } @@ -1151,21 +1160,7 @@ internal class SessionManagerImpl( check (isRunning) { "A message was sent before the ${SessionHealthManager::class.java.simpleName} was started." } - trackedOutboundSessions.computeIfPresent(session.sessionId) { _, trackedSession -> - if (config.get().heartbeatEnabled && !trackedSession.sendingHeartbeats) { - executorService.schedule( - { sendHeartbeat(trackedSession.identityData, session) }, - config.get().heartbeatPeriod.toMillis(), - TimeUnit.MILLISECONDS - ) - trackedSession.sendingHeartbeats = true - } else if(!config.get().heartbeatEnabled) { - logger.debug( - "Session heartbeats are disabled. Not starting heartbeats for session ${session.sessionId}." - ) - } - trackedSession - } ?: throw IllegalStateException("A message was sent on session with Id ${session.sessionId} which is not tracked.") + sessionHealthMonitor.get().sessionEstablished(session) } } @@ -1211,76 +1206,11 @@ internal class SessionManagerImpl( } private fun messageReceived(sessionId: String, source: HoldingIdentity, destination: HoldingIdentity?) { - if(config.get().heartbeatEnabled) { - trackedInboundSessions.compute(sessionId) { _, initialTrackedSession -> - if (initialTrackedSession != null) { - initialTrackedSession.lastReceivedTimestamp = timeStamp() - initialTrackedSession - } else { - executorService.schedule( - { inboundSessionTimeout(sessionId, source, destination) }, - config.get().sessionTimeout.toMillis(), - TimeUnit.MILLISECONDS - ) - TrackedInboundSession(timeStamp()) - } - } - } else { - logger.debug( - "Heartbeats are disabled. " + - "Inbound session timeout not enabled for session with ID $sessionId." - ) - } + sessionHealthMonitor.get().messageReceived(sessionId, source, destination) } private fun outboundSessionTimeout(counterparties: SessionCounterparties, sessionId: String) { - if(config.get().heartbeatEnabled) { - heartbeatOutboundSessionTimeout(counterparties, sessionId) - } else { - noHeartbeatOutboundSessionTimeout(counterparties, sessionId) - } - } - - private fun heartbeatOutboundSessionTimeout(counterparties: SessionCounterparties, sessionId: String) { - val sessionInfo = trackedOutboundSessions[sessionId] ?: return - val timeSinceLastAck = timeStamp() - sessionInfo.lastAckTimestamp - val sessionTimeoutMs = config.get().sessionTimeout.toMillis() - if (timeSinceLastAck >= sessionTimeoutMs) { - logger.info( - "Outbound session $sessionId (local=${counterparties.ourId}, remote=" + - "${counterparties.counterpartyId}) has not received any messages for the configured " + - "timeout threshold ($sessionTimeoutMs ms) so it will be cleaned up." - ) - tearDownOutboundSession(counterparties, sessionId) - } else { - scheduleOutboundSessionTimeout(counterparties, sessionId, sessionTimeoutMs - timeSinceLastAck) - } - } - - private fun noHeartbeatOutboundSessionTimeout(counterparties: SessionCounterparties, sessionId: String) { - val sessionInfo = trackedOutboundSessions[sessionId] ?: return - val timestamp = timeStamp() - val timeSinceLastAck = timestamp - sessionInfo.lastAckTimestamp - val timeSinceLastSent = timestamp - sessionInfo.lastSendTimestamp - val maxWaitForAck = config.get().sessionTimeout.toMillis() - val waitingForAck = timeSinceLastAck > timeSinceLastSent - if (waitingForAck && timeSinceLastSent >= maxWaitForAck) { - logger.info( - "Outbound session $sessionId (local=${counterparties.ourId}, remote=" + - "${counterparties.counterpartyId}) has not received any acknowledgement to the last sent message " + - "within the configured timeout threshold ($maxWaitForAck ms) so it will be cleaned up. " + - "Time since last ack ${timeSinceLastAck}ms. ]" + - "Time since last sent ${timeSinceLastSent}ms." - ) - tearDownOutboundSession(counterparties, sessionId) - } else { - val delay = if (waitingForAck) { - maxWaitForAck - timeSinceLastSent - } else { - maxWaitForAck - } - scheduleOutboundSessionTimeout(counterparties, sessionId, delay) - } + sessionHealthMonitor.get().outboundSessionTimeout(counterparties, sessionId) } private fun tearDownOutboundSession(counterparties: SessionCounterparties, sessionId: String) { @@ -1397,5 +1327,104 @@ internal class SessionManagerImpl( private fun timeStamp(): Long { return clock.instant().toEpochMilli() } + + private interface SessionHealthMonitor { + fun sessionEstablished(session: Session) + + fun messageReceived(sessionId: String, source: HoldingIdentity, destination: HoldingIdentity?) + + fun outboundSessionTimeout(counterparties: SessionCounterparties, sessionId: String) + } + + private inner class HeartbeatSessionHealthMonitor: SessionHealthMonitor { + override fun sessionEstablished(session: Session) { + trackedOutboundSessions.computeIfPresent(session.sessionId) { _, trackedSession -> + if (!trackedSession.sendingHeartbeats) { + executorService.schedule( + { sendHeartbeat(trackedSession.identityData, session) }, + config.get().heartbeatPeriod.toMillis(), + TimeUnit.MILLISECONDS + ) + trackedSession.sendingHeartbeats = true + } + trackedSession + } ?: throw IllegalStateException("A message was sent on session with Id ${session.sessionId} which is not tracked.") + } + + override fun messageReceived(sessionId: String, source: HoldingIdentity, destination: HoldingIdentity?) { + trackedInboundSessions.compute(sessionId) { _, initialTrackedSession -> + if (initialTrackedSession != null) { + initialTrackedSession.lastReceivedTimestamp = timeStamp() + initialTrackedSession + } else { + executorService.schedule( + { inboundSessionTimeout(sessionId, source, destination) }, + config.get().sessionTimeout.toMillis(), + TimeUnit.MILLISECONDS + ) + TrackedInboundSession(timeStamp()) + } + } + } + + override fun outboundSessionTimeout(counterparties: SessionCounterparties, sessionId: String) { + val sessionInfo = trackedOutboundSessions[sessionId] ?: return + val timeSinceLastAck = timeStamp() - sessionInfo.lastAckTimestamp + val sessionTimeoutMs = config.get().sessionTimeout.toMillis() + if (timeSinceLastAck >= sessionTimeoutMs) { + logger.info( + "Outbound session $sessionId (local=${counterparties.ourId}, remote=" + + "${counterparties.counterpartyId}) has not received any messages for the configured " + + "timeout threshold ($sessionTimeoutMs ms) so it will be cleaned up." + ) + tearDownOutboundSession(counterparties, sessionId) + } else { + scheduleOutboundSessionTimeout(counterparties, sessionId, sessionTimeoutMs - timeSinceLastAck) + } + } + } + private inner class NoHeartbeatSessionHealthMonitor: SessionHealthMonitor { + override fun sessionEstablished(session: Session) { + check(trackedOutboundSessions.contains(session.sessionId)) { + "A message was sent on session with Id ${session.sessionId} which is not tracked." + } + logger.debug( + "Session heartbeats are disabled. Not starting heartbeats for session ${session.sessionId}." + ) + } + + override fun messageReceived(sessionId: String, source: HoldingIdentity, destination: HoldingIdentity?) { + logger.debug( + "Session heartbeats are disabled. " + + "Inbound session timeout not enabled for session with ID $sessionId." + ) + } + + override fun outboundSessionTimeout(counterparties: SessionCounterparties, sessionId: String) { + val sessionInfo = trackedOutboundSessions[sessionId] ?: return + val now = timeStamp() + val timeSinceLastAck = now - sessionInfo.lastAckTimestamp + val timeSinceLastSent = now - sessionInfo.lastSendTimestamp + val maxWaitForAck = config.get().sessionTimeout.toMillis() + val waitingForAck = timeSinceLastAck > timeSinceLastSent + if (waitingForAck && timeSinceLastSent >= maxWaitForAck) { + logger.info( + "Outbound session $sessionId (local=${counterparties.ourId}, remote=" + + "${counterparties.counterpartyId}) has not received any acknowledgement to the last sent message " + + "within the configured timeout threshold ($maxWaitForAck ms) so it will be cleaned up. " + + "Time since last ack ${timeSinceLastAck}ms. ]" + + "Time since last sent ${timeSinceLastSent}ms." + ) + tearDownOutboundSession(counterparties, sessionId) + } else { + val delay = if (waitingForAck) { + maxWaitForAck - timeSinceLastSent + } else { + maxWaitForAck + } + scheduleOutboundSessionTimeout(counterparties, sessionId, delay) + } + } + } } } From 99a1051c65453a3ca8cf34c157276b17c7e00b10 Mon Sep 17 00:00:00 2001 From: Charlie Crean Date: Mon, 8 Jan 2024 12:37:31 +0000 Subject: [PATCH 05/24] refactor --- .../net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt | 2 -- 1 file changed, 2 deletions(-) diff --git a/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt b/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt index 4952de54ff4..0574943e821 100644 --- a/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt +++ b/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt @@ -1021,7 +1021,6 @@ internal class SessionManagerImpl( } private val config = AtomicReference() - private val sessionHealthMonitor = AtomicReference() @VisibleForTesting @@ -1050,7 +1049,6 @@ internal class SessionManagerImpl( else -> NoHeartbeatSessionHealthMonitor() } ) - stopTrackingAllSessions() configUpdateResult.complete(Unit) return configUpdateResult } From a9ca0e2bd77c2bdb02a229b589b16241985e32fc Mon Sep 17 00:00:00 2001 From: Charlie Crean Date: Mon, 8 Jan 2024 12:51:28 +0000 Subject: [PATCH 06/24] refactor --- .../sessions/SessionManagerImpl.kt | 40 +++++++------------ 1 file changed, 14 insertions(+), 26 deletions(-) diff --git a/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt b/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt index 0574943e821..92419c2480e 100644 --- a/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt +++ b/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt @@ -1142,11 +1142,7 @@ internal class SessionManagerImpl( initialTrackedSession.lastSendTimestamp = timestamp initialTrackedSession } else { - executorService.schedule( - { outboundSessionTimeout(counterparties, sessionId) }, - config.get().sessionTimeout.toMillis(), - TimeUnit.MILLISECONDS - ) + scheduleOutboundSessionTimeout(counterparties, sessionId, config.get().sessionTimeout.toMillis()) TrackedOutboundSession(counterparties, timestamp, timestamp) } } @@ -1190,7 +1186,7 @@ internal class SessionManagerImpl( check(isRunning) { "A session message was received before the ${SessionHealthManager::class.java.simpleName} was started." } - messageReceived(sessionId, source, destination) + sessionHealthMonitor.get().messageReceived(sessionId, source, destination) } } @@ -1199,18 +1195,10 @@ internal class SessionManagerImpl( check(isRunning) { "A data message was received before the ${SessionHealthManager::class.java.simpleName} was started." } - messageReceived(sessionId, source, destination) + sessionHealthMonitor.get().messageReceived(sessionId, source, destination) } } - private fun messageReceived(sessionId: String, source: HoldingIdentity, destination: HoldingIdentity?) { - sessionHealthMonitor.get().messageReceived(sessionId, source, destination) - } - - private fun outboundSessionTimeout(counterparties: SessionCounterparties, sessionId: String) { - sessionHealthMonitor.get().outboundSessionTimeout(counterparties, sessionId) - } - private fun tearDownOutboundSession(counterparties: SessionCounterparties, sessionId: String) { destroyOutboundSession(counterparties, sessionId) trackedOutboundSessions.remove(sessionId) @@ -1219,7 +1207,15 @@ internal class SessionManagerImpl( private fun scheduleOutboundSessionTimeout(counterparties: SessionCounterparties, sessionId: String, delay: Long) { executorService.schedule( - { outboundSessionTimeout(counterparties, sessionId) }, + { sessionHealthMonitor.get().outboundSessionTimeout(counterparties, sessionId) }, + delay, + TimeUnit.MILLISECONDS + ) + } + + private fun scheduleInboundSessionTimeout(sessionId: String, source: HoldingIdentity, destination: HoldingIdentity?, delay: Long) { + executorService.schedule( + { inboundSessionTimeout(sessionId, source, destination) }, delay, TimeUnit.MILLISECONDS ) @@ -1238,11 +1234,7 @@ internal class SessionManagerImpl( trackedInboundSessions.remove(sessionId) recordInboundSessionTimeoutMetric(source, destination) } else { - executorService.schedule( - { inboundSessionTimeout(sessionId, source, destination) }, - sessionTimeoutMs - timeSinceLastReceived, - TimeUnit.MILLISECONDS - ) + scheduleInboundSessionTimeout(sessionId, source, destination, sessionTimeoutMs - timeSinceLastReceived) } } @@ -1355,11 +1347,7 @@ internal class SessionManagerImpl( initialTrackedSession.lastReceivedTimestamp = timeStamp() initialTrackedSession } else { - executorService.schedule( - { inboundSessionTimeout(sessionId, source, destination) }, - config.get().sessionTimeout.toMillis(), - TimeUnit.MILLISECONDS - ) + scheduleInboundSessionTimeout(sessionId, source, destination, config.get().sessionTimeout.toMillis()) TrackedInboundSession(timeStamp()) } } From 9646a8c4954be17b10a46e959c4b211fbbab7775 Mon Sep 17 00:00:00 2001 From: Charlie Crean Date: Mon, 8 Jan 2024 15:42:43 +0000 Subject: [PATCH 07/24] refactor --- .../sessions/SessionManagerImpl.kt | 167 ++++++++++-------- .../sessions/SessionManagerTest.kt | 24 +-- 2 files changed, 101 insertions(+), 90 deletions(-) diff --git a/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt b/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt index 92419c2480e..a157be98670 100644 --- a/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt +++ b/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt @@ -1046,7 +1046,7 @@ internal class SessionManagerImpl( sessionHealthMonitor.set( when { newConfiguration.heartbeatEnabled -> HeartbeatSessionHealthMonitor() - else -> NoHeartbeatSessionHealthMonitor() + else -> MessageAckSessionHealthMonitor() } ) configUpdateResult.complete(Unit) @@ -1238,86 +1238,14 @@ internal class SessionManagerImpl( } } - private fun sendHeartbeat(counterparties: SessionCounterparties, session: Session) { - val sessionInfo = trackedOutboundSessions[session.sessionId] - if (sessionInfo == null) { - logger.info("Stopped sending heartbeats for session (${session.sessionId}), which expired.") - return - } - val config = config.get() - - val timeSinceLastSend = timeStamp() - sessionInfo.lastSendTimestamp - if (timeSinceLastSend >= config.heartbeatPeriod.toMillis()) { - logger.trace { - "Sending heartbeat message between ${counterparties.ourId} (our Identity) and " + - "${counterparties.counterpartyId}." - } - sendHeartbeatMessage( - counterparties.ourId, - counterparties.counterpartyId, - session, - counterparties.status, - counterparties.serial - ) - executorService.schedule( - { sendHeartbeat(counterparties, session) }, - config.heartbeatPeriod.toMillis(), - TimeUnit.MILLISECONDS - ) - } else { - executorService.schedule( - { sendHeartbeat(counterparties, session) }, - config.heartbeatPeriod.toMillis() - timeSinceLastSend, - TimeUnit.MILLISECONDS - ) - } - } - - private fun sendHeartbeatMessage( - source: HoldingIdentity, - dest: HoldingIdentity, - session: Session, - filter: MembershipStatusFilter, - serial: Long - ) { - val heartbeatMessage = HeartbeatMessage() - val message = MessageConverter.linkOutMessageFromHeartbeat( - source, - dest, - heartbeatMessage, - session, - groupPolicyProvider, - membershipGroupReaderProvider, - filter, - serial - ) - if (message == null) { - logger.warn("Failed to send a Heartbeat between $source and $dest.") - return - } - val future = publisher.publish( - listOf( - Record( - LINK_OUT_TOPIC, - UUID.randomUUID().toString(), - message - ) - ) - ) - - future.single().whenComplete { _, error -> - if (error != null) { - logger.warn("An exception was thrown when sending a heartbeat message.\nException:", error) - } else { - recordOutboundHeartbeatMessagesMetric(source, dest) - } - } - } - private fun timeStamp(): Long { return clock.instant().toEpochMilli() } + /** + * Implementations of [SessionHealthMonitor] provide different methods of determining when a session has become + * unhealthy and handling of unhealthy sessions. + */ private interface SessionHealthMonitor { fun sessionEstablished(session: Session) @@ -1326,6 +1254,9 @@ internal class SessionManagerImpl( fun outboundSessionTimeout(counterparties: SessionCounterparties, sessionId: String) } + /** + * Monitors session health based on a heart beating mechanism. + */ private inner class HeartbeatSessionHealthMonitor: SessionHealthMonitor { override fun sessionEstablished(session: Session) { trackedOutboundSessions.computeIfPresent(session.sessionId) { _, trackedSession -> @@ -1368,8 +1299,88 @@ internal class SessionManagerImpl( scheduleOutboundSessionTimeout(counterparties, sessionId, sessionTimeoutMs - timeSinceLastAck) } } + + private fun sendHeartbeat(counterparties: SessionCounterparties, session: Session) { + val sessionInfo = trackedOutboundSessions[session.sessionId] + if (sessionInfo == null) { + logger.info("Stopped sending heartbeats for session (${session.sessionId}), which expired.") + return + } + val config = config.get() + + val timeSinceLastSend = timeStamp() - sessionInfo.lastSendTimestamp + if (timeSinceLastSend >= config.heartbeatPeriod.toMillis()) { + logger.trace { + "Sending heartbeat message between ${counterparties.ourId} (our Identity) and " + + "${counterparties.counterpartyId}." + } + sendHeartbeatMessage( + counterparties.ourId, + counterparties.counterpartyId, + session, + counterparties.status, + counterparties.serial + ) + executorService.schedule( + { sendHeartbeat(counterparties, session) }, + config.heartbeatPeriod.toMillis(), + TimeUnit.MILLISECONDS + ) + } else { + executorService.schedule( + { sendHeartbeat(counterparties, session) }, + config.heartbeatPeriod.toMillis() - timeSinceLastSend, + TimeUnit.MILLISECONDS + ) + } + } + + private fun sendHeartbeatMessage( + source: HoldingIdentity, + dest: HoldingIdentity, + session: Session, + filter: MembershipStatusFilter, + serial: Long + ) { + val heartbeatMessage = HeartbeatMessage() + val message = MessageConverter.linkOutMessageFromHeartbeat( + source, + dest, + heartbeatMessage, + session, + groupPolicyProvider, + membershipGroupReaderProvider, + filter, + serial + ) + if (message == null) { + logger.warn("Failed to send a Heartbeat between $source and $dest.") + return + } + val future = publisher.publish( + listOf( + Record( + LINK_OUT_TOPIC, + UUID.randomUUID().toString(), + message + ) + ) + ) + + future.single().whenComplete { _, error -> + if (error != null) { + logger.warn("An exception was thrown when sending a heartbeat message.\nException:", error) + } else { + recordOutboundHeartbeatMessagesMetric(source, dest) + } + } + } } - private inner class NoHeartbeatSessionHealthMonitor: SessionHealthMonitor { + + /** + * Monitors session health based on whether sent messages have been acknowledged in a timely manner or not. + */ + private inner class MessageAckSessionHealthMonitor: SessionHealthMonitor { override fun sessionEstablished(session: Session) { check(trackedOutboundSessions.contains(session.sessionId)) { "A message was sent on session with Id ${session.sessionId} which is not tracked." diff --git a/components/link-manager/src/test/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerTest.kt b/components/link-manager/src/test/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerTest.kt index ff81cac928d..804c24a7893 100644 --- a/components/link-manager/src/test/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerTest.kt +++ b/components/link-manager/src/test/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerTest.kt @@ -175,7 +175,7 @@ class SessionManagerTest { } private lateinit var configHandler: SessionManagerImpl.SessionManagerConfigChangeHandler - private lateinit var heartbeatConfigHandler: SessionManagerImpl.SessionHealthManager.SessionHealthManagerConfigChangeHandler + private lateinit var sessionHealthManagerConfigHandler: SessionManagerImpl.SessionHealthManager.SessionHealthManagerConfigChangeHandler private val dominoTile = Mockito.mockConstruction(ComplexDominoTile::class.java) { mock, context -> @Suppress("UNCHECKED_CAST") whenever(mock.withLifecycleLock(any<() -> Any>())).doAnswer { (it.arguments.first() as () -> Any).invoke() } @@ -185,7 +185,7 @@ class SessionManagerTest { configHandler = context.arguments()[6] as SessionManagerImpl.SessionManagerConfigChangeHandler } if (context.arguments()[6] is SessionManagerImpl.SessionHealthManager.SessionHealthManagerConfigChangeHandler) { - heartbeatConfigHandler = context.arguments()[6] + sessionHealthManagerConfigHandler = context.arguments()[6] as SessionManagerImpl.SessionHealthManager.SessionHealthManagerConfigChangeHandler } whenever(mock.coordinatorName).doReturn(LifecycleCoordinatorName("", "")) @@ -335,7 +335,7 @@ class SessionManagerTest { null, mock(), ) - heartbeatConfigHandler.applyNewConfiguration(configNoHeartbeat, null, mock()) + sessionHealthManagerConfigHandler.applyNewConfiguration(configNoHeartbeat, null, mock()) } private fun MessageDigest.hash(data: ByteArray): ByteArray { @@ -1665,7 +1665,7 @@ class SessionManagerTest { null, mock(), ) - heartbeatConfigHandler.applyNewConfiguration(configWithHeartbeat, null, resourceHolder) + sessionHealthManagerConfigHandler.applyNewConfiguration(configWithHeartbeat, null, resourceHolder) } sessionManager.start() @@ -1720,7 +1720,7 @@ class SessionManagerTest { null, mock(), ) - heartbeatConfigHandler.applyNewConfiguration(configWithHeartbeat, null, resourceHolder) + sessionHealthManagerConfigHandler.applyNewConfiguration(configWithHeartbeat, null, resourceHolder) } sessionManager.start() @@ -1801,7 +1801,7 @@ class SessionManagerTest { null, mock(), ) - heartbeatConfigHandler.applyNewConfiguration(configWithHeartbeat, null, resourcesHolder) + sessionHealthManagerConfigHandler.applyNewConfiguration(configWithHeartbeat, null, resourcesHolder) } @Suppress("UNCHECKED_CAST") publisherWithDominoLogicByClientId[SessionManagerImpl.SessionHealthManager.SESSION_HEALTH_MANAGER_CLIENT_ID]!!.forEach { @@ -1871,7 +1871,7 @@ class SessionManagerTest { null, mock(), ) - heartbeatConfigHandler.applyNewConfiguration(configWithHeartbeat, null, mock()) + sessionHealthManagerConfigHandler.applyNewConfiguration(configWithHeartbeat, null, mock()) } @Suppress("UNCHECKED_CAST") publisherWithDominoLogicByClientId[SessionManagerImpl.SessionHealthManager.SESSION_HEALTH_MANAGER_CLIENT_ID]!!.forEach { @@ -1886,7 +1886,7 @@ class SessionManagerTest { mockTimeFacilitiesProvider.advanceTime(configWithHeartbeat.heartbeatPeriod.plus(5.millis)) assertThat(messages.size).isEqualTo(2) - heartbeatConfigHandler.applyNewConfiguration(configWithHeartbeat, null, mock()) + sessionHealthManagerConfigHandler.applyNewConfiguration(configWithHeartbeat, null, mock()) mockTimeFacilitiesProvider.advanceTime(configWithHeartbeat.heartbeatPeriod.plus(5.millis)) mockTimeFacilitiesProvider.advanceTime(configWithHeartbeat.heartbeatPeriod.plus(5.millis)) @@ -1934,7 +1934,7 @@ class SessionManagerTest { null, mock(), ) - heartbeatConfigHandler.applyNewConfiguration(configWithHeartbeat, null, resourcesHolder) + sessionHealthManagerConfigHandler.applyNewConfiguration(configWithHeartbeat, null, resourcesHolder) } @Suppress("UNCHECKED_CAST") publisherWithDominoLogicByClientId[SessionManagerImpl.SessionHealthManager.SESSION_HEALTH_MANAGER_CLIENT_ID]!!.forEach { @@ -2010,7 +2010,7 @@ class SessionManagerTest { null, mock(), ) - heartbeatConfigHandler.applyNewConfiguration(configWithHeartbeat, null, resourcesHolder) + sessionHealthManagerConfigHandler.applyNewConfiguration(configWithHeartbeat, null, resourcesHolder) } publisherWithDominoLogicByClientId[SessionManagerImpl.SessionHealthManager.SESSION_HEALTH_MANAGER_CLIENT_ID]!!.forEach { whenever(it.publish(any())).doAnswer { invocation -> @@ -2081,7 +2081,7 @@ class SessionManagerTest { null, mock(), ) - heartbeatConfigHandler.applyNewConfiguration(configWithHeartbeat, null, resourcesHolder) + sessionHealthManagerConfigHandler.applyNewConfiguration(configWithHeartbeat, null, resourcesHolder) } publisherWithDominoLogicByClientId[SessionManagerImpl.SessionHealthManager.SESSION_HEALTH_MANAGER_CLIENT_ID]!!.forEach { whenever(it.publish(any())).doAnswer { publish() } @@ -2187,7 +2187,7 @@ class SessionManagerTest { null, mock(), ) - heartbeatConfigHandler.applyNewConfiguration(longTimePeriodConfigWithHeartbeat, null, mock()) + sessionHealthManagerConfigHandler.applyNewConfiguration(longTimePeriodConfigWithHeartbeat, null, mock()) } @Suppress("UNCHECKED_CAST") publisherWithDominoLogicByClientId[SessionManagerImpl.SessionHealthManager.SESSION_HEALTH_MANAGER_CLIENT_ID]!!.forEach { From 045b8c59a6390f81bdb83876b851cd8ad9e892ba Mon Sep 17 00:00:00 2001 From: Charlie Crean Date: Mon, 8 Jan 2024 17:34:36 +0000 Subject: [PATCH 08/24] bump api and temp logging --- .../linkmanager/sessions/SessionManagerImpl.kt | 16 ++++++++++++++-- gradle.properties | 2 +- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt b/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt index a157be98670..a01e70787fc 100644 --- a/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt +++ b/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt @@ -1045,8 +1045,14 @@ internal class SessionManagerImpl( config.set(newConfiguration) sessionHealthMonitor.set( when { - newConfiguration.heartbeatEnabled -> HeartbeatSessionHealthMonitor() - else -> MessageAckSessionHealthMonitor() + newConfiguration.heartbeatEnabled -> { + logger.info("Using session heartbeats to monitor session health.") + HeartbeatSessionHealthMonitor() + } + else -> { + logger.info("Using message acknowledgements to monitor session health.") + MessageAckSessionHealthMonitor() + } } ) configUpdateResult.complete(Unit) @@ -1259,6 +1265,7 @@ internal class SessionManagerImpl( */ private inner class HeartbeatSessionHealthMonitor: SessionHealthMonitor { override fun sessionEstablished(session: Session) { + logger.info("${HeartbeatSessionHealthMonitor::class.java.simpleName} session established.") trackedOutboundSessions.computeIfPresent(session.sessionId) { _, trackedSession -> if (!trackedSession.sendingHeartbeats) { executorService.schedule( @@ -1273,6 +1280,7 @@ internal class SessionManagerImpl( } override fun messageReceived(sessionId: String, source: HoldingIdentity, destination: HoldingIdentity?) { + logger.info("${HeartbeatSessionHealthMonitor::class.java.simpleName} message received.") trackedInboundSessions.compute(sessionId) { _, initialTrackedSession -> if (initialTrackedSession != null) { initialTrackedSession.lastReceivedTimestamp = timeStamp() @@ -1285,6 +1293,7 @@ internal class SessionManagerImpl( } override fun outboundSessionTimeout(counterparties: SessionCounterparties, sessionId: String) { + logger.info("${HeartbeatSessionHealthMonitor::class.java.simpleName} outbound session timeout.") val sessionInfo = trackedOutboundSessions[sessionId] ?: return val timeSinceLastAck = timeStamp() - sessionInfo.lastAckTimestamp val sessionTimeoutMs = config.get().sessionTimeout.toMillis() @@ -1382,6 +1391,7 @@ internal class SessionManagerImpl( */ private inner class MessageAckSessionHealthMonitor: SessionHealthMonitor { override fun sessionEstablished(session: Session) { + logger.info("${MessageAckSessionHealthMonitor::class.java.simpleName} session established.") check(trackedOutboundSessions.contains(session.sessionId)) { "A message was sent on session with Id ${session.sessionId} which is not tracked." } @@ -1391,6 +1401,7 @@ internal class SessionManagerImpl( } override fun messageReceived(sessionId: String, source: HoldingIdentity, destination: HoldingIdentity?) { + logger.info("${MessageAckSessionHealthMonitor::class.java.simpleName} message received.") logger.debug( "Session heartbeats are disabled. " + "Inbound session timeout not enabled for session with ID $sessionId." @@ -1398,6 +1409,7 @@ internal class SessionManagerImpl( } override fun outboundSessionTimeout(counterparties: SessionCounterparties, sessionId: String) { + logger.info("${MessageAckSessionHealthMonitor::class.java.simpleName} outbound session timeout.") val sessionInfo = trackedOutboundSessions[sessionId] ?: return val now = timeStamp() val timeSinceLastAck = now - sessionInfo.lastAckTimestamp diff --git a/gradle.properties b/gradle.properties index 190b126746f..aa282a598f2 100644 --- a/gradle.properties +++ b/gradle.properties @@ -44,7 +44,7 @@ commonsLangVersion = 3.12.0 commonsTextVersion = 1.10.0 # Corda API libs revision (change in 4th digit indicates a breaking change) # Change to 5.2.0.xx-SNAPSHOT to pick up maven local published copy -cordaApiVersion=5.2.0.27-beta+ +cordaApiVersion=5.2.0.27-alpha-1704732925433 disruptorVersion=3.4.4 felixConfigAdminVersion=1.9.26 From 0eb7c52c0c47b23b7f112a7d31032059babf3791 Mon Sep 17 00:00:00 2001 From: Charlie Crean Date: Tue, 9 Jan 2024 16:33:10 +0000 Subject: [PATCH 09/24] hardcode new config for quick test --- .../p2p/linkmanager/sessions/SessionManagerImpl.kt | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt b/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt index a01e70787fc..e96adb0b89b 100644 --- a/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt +++ b/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt @@ -1064,7 +1064,7 @@ internal class SessionManagerImpl( private fun fromConfig(config: Config): SessionHealthManagerConfig { return SessionHealthManagerConfig( - config.getBoolean(LinkManagerConfiguration.HEARTBEAT_ENABLED_KEY), + false,//config.getBoolean(LinkManagerConfiguration.HEARTBEAT_ENABLED_KEY), Duration.ofMillis(config.getLong(LinkManagerConfiguration.HEARTBEAT_MESSAGE_PERIOD_KEY)), Duration.ofMillis(config.getLong(LinkManagerConfiguration.SESSION_TIMEOUT_KEY)) ) @@ -1265,7 +1265,6 @@ internal class SessionManagerImpl( */ private inner class HeartbeatSessionHealthMonitor: SessionHealthMonitor { override fun sessionEstablished(session: Session) { - logger.info("${HeartbeatSessionHealthMonitor::class.java.simpleName} session established.") trackedOutboundSessions.computeIfPresent(session.sessionId) { _, trackedSession -> if (!trackedSession.sendingHeartbeats) { executorService.schedule( @@ -1280,7 +1279,6 @@ internal class SessionManagerImpl( } override fun messageReceived(sessionId: String, source: HoldingIdentity, destination: HoldingIdentity?) { - logger.info("${HeartbeatSessionHealthMonitor::class.java.simpleName} message received.") trackedInboundSessions.compute(sessionId) { _, initialTrackedSession -> if (initialTrackedSession != null) { initialTrackedSession.lastReceivedTimestamp = timeStamp() @@ -1293,7 +1291,6 @@ internal class SessionManagerImpl( } override fun outboundSessionTimeout(counterparties: SessionCounterparties, sessionId: String) { - logger.info("${HeartbeatSessionHealthMonitor::class.java.simpleName} outbound session timeout.") val sessionInfo = trackedOutboundSessions[sessionId] ?: return val timeSinceLastAck = timeStamp() - sessionInfo.lastAckTimestamp val sessionTimeoutMs = config.get().sessionTimeout.toMillis() @@ -1391,7 +1388,6 @@ internal class SessionManagerImpl( */ private inner class MessageAckSessionHealthMonitor: SessionHealthMonitor { override fun sessionEstablished(session: Session) { - logger.info("${MessageAckSessionHealthMonitor::class.java.simpleName} session established.") check(trackedOutboundSessions.contains(session.sessionId)) { "A message was sent on session with Id ${session.sessionId} which is not tracked." } @@ -1401,7 +1397,6 @@ internal class SessionManagerImpl( } override fun messageReceived(sessionId: String, source: HoldingIdentity, destination: HoldingIdentity?) { - logger.info("${MessageAckSessionHealthMonitor::class.java.simpleName} message received.") logger.debug( "Session heartbeats are disabled. " + "Inbound session timeout not enabled for session with ID $sessionId." @@ -1409,8 +1404,7 @@ internal class SessionManagerImpl( } override fun outboundSessionTimeout(counterparties: SessionCounterparties, sessionId: String) { - logger.info("${MessageAckSessionHealthMonitor::class.java.simpleName} outbound session timeout.") - val sessionInfo = trackedOutboundSessions[sessionId] ?: return +c val sessionInfo = trackedOutboundSessions[sessionId] ?: return val now = timeStamp() val timeSinceLastAck = now - sessionInfo.lastAckTimestamp val timeSinceLastSent = now - sessionInfo.lastSendTimestamp From cd36fe190ca9177e994c8a6a2a56c17f2e97e6c8 Mon Sep 17 00:00:00 2001 From: Charlie Crean Date: Tue, 9 Jan 2024 16:45:54 +0000 Subject: [PATCH 10/24] remove random char missed in last commit --- .../net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt b/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt index e96adb0b89b..41f42410b4e 100644 --- a/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt +++ b/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt @@ -1404,7 +1404,7 @@ internal class SessionManagerImpl( } override fun outboundSessionTimeout(counterparties: SessionCounterparties, sessionId: String) { -c val sessionInfo = trackedOutboundSessions[sessionId] ?: return + val sessionInfo = trackedOutboundSessions[sessionId] ?: return val now = timeStamp() val timeSinceLastAck = now - sessionInfo.lastAckTimestamp val timeSinceLastSent = now - sessionInfo.lastSendTimestamp From 74681a42d9c9eb6ac248bb2eaafadb2f7ab764f9 Mon Sep 17 00:00:00 2001 From: Charlie Crean Date: Wed, 10 Jan 2024 09:22:07 +0000 Subject: [PATCH 11/24] fix bug in logic --- .../net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt b/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt index 41f42410b4e..406c040e5c0 100644 --- a/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt +++ b/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt @@ -1388,7 +1388,7 @@ internal class SessionManagerImpl( */ private inner class MessageAckSessionHealthMonitor: SessionHealthMonitor { override fun sessionEstablished(session: Session) { - check(trackedOutboundSessions.contains(session.sessionId)) { + check(trackedOutboundSessions.containsKey(session.sessionId)) { "A message was sent on session with Id ${session.sessionId} which is not tracked." } logger.debug( From 2af8f71656b2650a807c7ab9aba79d893f124c40 Mon Sep 17 00:00:00 2001 From: Charlie Crean Date: Wed, 10 Jan 2024 11:41:21 +0000 Subject: [PATCH 12/24] hardcode heartbeats enabled --- .../net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt b/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt index 406c040e5c0..9f692a9b8fc 100644 --- a/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt +++ b/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt @@ -1064,7 +1064,7 @@ internal class SessionManagerImpl( private fun fromConfig(config: Config): SessionHealthManagerConfig { return SessionHealthManagerConfig( - false,//config.getBoolean(LinkManagerConfiguration.HEARTBEAT_ENABLED_KEY), + true,//config.getBoolean(LinkManagerConfiguration.HEARTBEAT_ENABLED_KEY), Duration.ofMillis(config.getLong(LinkManagerConfiguration.HEARTBEAT_MESSAGE_PERIOD_KEY)), Duration.ofMillis(config.getLong(LinkManagerConfiguration.SESSION_TIMEOUT_KEY)) ) From ac0edba640705f55288dee9d7f46fa82199818c3 Mon Sep 17 00:00:00 2001 From: Charlie Crean Date: Wed, 10 Jan 2024 14:57:07 +0000 Subject: [PATCH 13/24] remove hardcoding --- .../net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt b/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt index 9f692a9b8fc..159854fc4f9 100644 --- a/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt +++ b/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt @@ -1064,7 +1064,7 @@ internal class SessionManagerImpl( private fun fromConfig(config: Config): SessionHealthManagerConfig { return SessionHealthManagerConfig( - true,//config.getBoolean(LinkManagerConfiguration.HEARTBEAT_ENABLED_KEY), + config.getBoolean(LinkManagerConfiguration.HEARTBEAT_ENABLED_KEY), Duration.ofMillis(config.getLong(LinkManagerConfiguration.HEARTBEAT_MESSAGE_PERIOD_KEY)), Duration.ofMillis(config.getLong(LinkManagerConfiguration.SESSION_TIMEOUT_KEY)) ) From 0b14926da5e9307c6fcce9b21959137b1cfb47b0 Mon Sep 17 00:00:00 2001 From: Charlie Crean Date: Thu, 11 Jan 2024 09:25:16 +0000 Subject: [PATCH 14/24] tidy up metrics and add logging to debug --- .../p2p/linkmanager/inbound/InboundMessageProcessor.kt | 6 ++++-- .../corda/p2p/linkmanager/sessions/SessionManagerImpl.kt | 4 ++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/inbound/InboundMessageProcessor.kt b/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/inbound/InboundMessageProcessor.kt index 624f65a2a8a..f179c7a24d4 100644 --- a/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/inbound/InboundMessageProcessor.kt +++ b/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/inbound/InboundMessageProcessor.kt @@ -127,8 +127,10 @@ internal class InboundMessageProcessor( private fun processSessionMessages(messages: List>): List>, LinkInMessage>> { - recordInboundSessionMessagesMetric() - val responses = sessionManager.processSessionMessages(messages) {message -> message.item} + val responses = sessionManager.processSessionMessages(messages) { message -> + recordInboundSessionMessagesMetric() + message.item + } return responses.map { (traceableMessage, response) -> if (response != null) { when (val payload = response.payload) { diff --git a/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt b/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt index 159854fc4f9..b7697e26fc3 100644 --- a/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt +++ b/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt @@ -1316,10 +1316,10 @@ internal class SessionManagerImpl( val timeSinceLastSend = timeStamp() - sessionInfo.lastSendTimestamp if (timeSinceLastSend >= config.heartbeatPeriod.toMillis()) { - logger.trace { + logger.info ( "Sending heartbeat message between ${counterparties.ourId} (our Identity) and " + "${counterparties.counterpartyId}." - } + ) sendHeartbeatMessage( counterparties.ourId, counterparties.counterpartyId, From 37cb0b0daa60cced0d8eb961116ff8d9ffd7cba8 Mon Sep 17 00:00:00 2001 From: Charlie Crean Date: Thu, 11 Jan 2024 10:02:39 +0000 Subject: [PATCH 15/24] clear tracked outbound sessions when new config is received --- .../sessions/SessionManagerImpl.kt | 25 +++++++++++-------- .../sessions/SessionManagerTest.kt | 2 +- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt b/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt index b7697e26fc3..a5e954781f9 100644 --- a/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt +++ b/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt @@ -1043,18 +1043,21 @@ internal class SessionManagerImpl( ): CompletableFuture { val configUpdateResult = CompletableFuture() config.set(newConfiguration) - sessionHealthMonitor.set( - when { - newConfiguration.heartbeatEnabled -> { - logger.info("Using session heartbeats to monitor session health.") - HeartbeatSessionHealthMonitor() + if(newConfiguration.heartbeatEnabled != oldConfiguration?.heartbeatEnabled) { + sessionHealthMonitor.set( + when { + newConfiguration.heartbeatEnabled -> { + logger.info("Using session heartbeats to monitor session health.") + HeartbeatSessionHealthMonitor() + } + else -> { + logger.info("Using message acknowledgements to monitor session health.") + MessageAckSessionHealthMonitor() + } } - else -> { - logger.info("Using message acknowledgements to monitor session health.") - MessageAckSessionHealthMonitor() - } - } - ) + ) + trackedOutboundSessions.clear() + } configUpdateResult.complete(Unit) return configUpdateResult } diff --git a/components/link-manager/src/test/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerTest.kt b/components/link-manager/src/test/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerTest.kt index 804c24a7893..8c5d850b13f 100644 --- a/components/link-manager/src/test/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerTest.kt +++ b/components/link-manager/src/test/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerTest.kt @@ -1886,7 +1886,7 @@ class SessionManagerTest { mockTimeFacilitiesProvider.advanceTime(configWithHeartbeat.heartbeatPeriod.plus(5.millis)) assertThat(messages.size).isEqualTo(2) - sessionHealthManagerConfigHandler.applyNewConfiguration(configWithHeartbeat, null, mock()) + sessionHealthManagerConfigHandler.applyNewConfiguration(configWithHeartbeat, configWithHeartbeat, mock()) mockTimeFacilitiesProvider.advanceTime(configWithHeartbeat.heartbeatPeriod.plus(5.millis)) mockTimeFacilitiesProvider.advanceTime(configWithHeartbeat.heartbeatPeriod.plus(5.millis)) From 6740d80904a1ce81b7a6affde2ee1bb6ea9ee29c Mon Sep 17 00:00:00 2001 From: Charlie Crean Date: Thu, 11 Jan 2024 15:27:16 +0000 Subject: [PATCH 16/24] undo last change and protect against sending heartbeats after config change --- .../net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt b/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt index a5e954781f9..5a5c7ab8e36 100644 --- a/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt +++ b/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt @@ -1056,7 +1056,6 @@ internal class SessionManagerImpl( } } ) - trackedOutboundSessions.clear() } configUpdateResult.complete(Unit) return configUpdateResult @@ -1316,6 +1315,10 @@ internal class SessionManagerImpl( return } val config = config.get() + if(!config.heartbeatEnabled) { + logger.info("Heartbeats have been disabled. Stopping heartbeats for (${session.sessionId}).") + return + } val timeSinceLastSend = timeStamp() - sessionInfo.lastSendTimestamp if (timeSinceLastSend >= config.heartbeatPeriod.toMillis()) { From ff9a98031470e9bbb44926ef6d245dadf41454f8 Mon Sep 17 00:00:00 2001 From: Charlie Crean Date: Fri, 12 Jan 2024 09:51:03 +0000 Subject: [PATCH 17/24] ensure session manager handles changed config --- .../corda/p2p/linkmanager/sessions/SessionManagerImpl.kt | 6 ++++-- .../corda/p2p/linkmanager/sessions/SessionManagerTest.kt | 7 +++++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt b/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt index 5a5c7ab8e36..b36600b5ceb 100644 --- a/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt +++ b/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt @@ -77,7 +77,6 @@ import net.corda.schema.Schemas.P2P.SESSION_OUT_PARTITIONS import net.corda.schema.configuration.ConfigKeys import net.corda.utilities.VisibleForTesting import net.corda.utilities.time.Clock -import net.corda.utilities.trace import net.corda.v5.membership.MemberInfo import net.corda.virtualnode.HoldingIdentity import net.corda.virtualnode.toCorda @@ -148,7 +147,8 @@ internal class SessionManagerImpl( 2, 1, RevocationCheckMode.OFF, - 432000 + 432000, + true ) ) @@ -206,6 +206,7 @@ internal class SessionManagerImpl( val sessionsPerPeerForMgm: Int, val revocationConfigMode: RevocationCheckMode, val sessionRefreshThreshold: Int, + val heartbeatsEnabled: Boolean ) internal inner class SessionManagerConfigChangeHandler : ConfigurationChangeHandler( @@ -255,6 +256,7 @@ internal class SessionManagerImpl( config.getInt(LinkManagerConfiguration.SESSIONS_PER_PEER_FOR_MGM_KEY), config.getEnum(RevocationCheckMode::class.java, LinkManagerConfiguration.REVOCATION_CHECK_KEY), config.getInt(LinkManagerConfiguration.SESSION_REFRESH_THRESHOLD_KEY), + config.getBoolean(LinkManagerConfiguration.HEARTBEAT_ENABLED_KEY) ) } diff --git a/components/link-manager/src/test/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerTest.kt b/components/link-manager/src/test/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerTest.kt index 8c5d850b13f..9dab5958f84 100644 --- a/components/link-manager/src/test/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerTest.kt +++ b/components/link-manager/src/test/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerTest.kt @@ -120,7 +120,7 @@ class SessionManagerTest { Duration.ofMillis(100), Duration.ofMillis(500) ) - private val configNoHeartbeat = SessionManagerImpl.SessionHealthManager.SessionHealthManagerConfig( + private val configWithHighHeartbeatPeriod = SessionManagerImpl.SessionHealthManager.SessionHealthManagerConfig( true, Duration.ofMillis(sixDaysInMillis), Duration.ofMillis(sixDaysInMillis) @@ -300,6 +300,7 @@ class SessionManagerTest { SESSIONS_PER_COUNTERPARTIES_FOR_MGM, RevocationCheckMode.OFF, SESSION_REFRESH_THRESHOLD_KEY, + true ) private val configWithOneSessionBetweenMembers = SessionManagerImpl.SessionManagerConfig( MAX_MESSAGE_SIZE, @@ -307,6 +308,7 @@ class SessionManagerTest { SESSIONS_PER_COUNTERPARTIES_FOR_MGM, RevocationCheckMode.OFF, SESSION_REFRESH_THRESHOLD_KEY, + true ) private val sessionManager = SessionManagerImpl( @@ -335,7 +337,7 @@ class SessionManagerTest { null, mock(), ) - sessionHealthManagerConfigHandler.applyNewConfiguration(configNoHeartbeat, null, mock()) + sessionHealthManagerConfigHandler.applyNewConfiguration(configWithHighHeartbeatPeriod, null, mock()) } private fun MessageDigest.hash(data: ByteArray): ByteArray { @@ -1959,6 +1961,7 @@ class SessionManagerTest { SESSIONS_PER_COUNTERPARTIES_FOR_MGM, RevocationCheckMode.OFF, SESSION_REFRESH_THRESHOLD_KEY, + true ), resourcesHolder, ) From 30114e273202968186a86078761ec01525965ab6 Mon Sep 17 00:00:00 2001 From: Charlie Crean Date: Mon, 15 Jan 2024 17:30:21 +0000 Subject: [PATCH 18/24] unit tests --- .../sessions/SessionManagerTest.kt | 706 ++++++++++-------- 1 file changed, 386 insertions(+), 320 deletions(-) diff --git a/components/link-manager/src/test/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerTest.kt b/components/link-manager/src/test/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerTest.kt index 9dab5958f84..91b2bb554dd 100644 --- a/components/link-manager/src/test/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerTest.kt +++ b/components/link-manager/src/test/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerTest.kt @@ -102,6 +102,10 @@ import net.corda.p2p.linkmanager.grouppolicy.protocolModes import java.util.UUID import java.util.concurrent.TimeUnit import net.corda.membership.lib.exceptions.BadGroupPolicyException +import net.corda.p2p.linkmanager.sessions.SessionManagerImpl.SessionHealthManager.Companion.SESSION_HEALTH_MANAGER_CLIENT_ID +import net.corda.p2p.linkmanager.sessions.SessionManagerImpl.SessionHealthManager.SessionHealthManagerConfigChangeHandler +import net.corda.p2p.linkmanager.sessions.SessionManagerImpl.SessionManagerConfigChangeHandler +import org.mockito.ArgumentMatchers.anyList class SessionManagerTest { @@ -125,6 +129,11 @@ class SessionManagerTest { Duration.ofMillis(sixDaysInMillis), Duration.ofMillis(sixDaysInMillis) ) + private val configWithNoHeartbeat = SessionManagerImpl.SessionHealthManager.SessionHealthManagerConfig( + false, + Duration.ofMillis(100), + Duration.ofMillis(500) + ) private val keyGenerator = KeyPairGenerator.getInstance("EC", BouncyCastleProvider()) private val messageDigest = MessageDigest.getInstance(ProtocolConstants.HASH_ALGO, BouncyCastleProvider()) @@ -174,19 +183,16 @@ class SessionManagerTest { outboundSessionPool.close() } - private lateinit var configHandler: SessionManagerImpl.SessionManagerConfigChangeHandler - private lateinit var sessionHealthManagerConfigHandler: SessionManagerImpl.SessionHealthManager.SessionHealthManagerConfigChangeHandler + private lateinit var configHandler: SessionManagerConfigChangeHandler + private lateinit var sessionHealthManagerConfigHandler: SessionHealthManagerConfigChangeHandler + + @Suppress("UNCHECKED_CAST") private val dominoTile = Mockito.mockConstruction(ComplexDominoTile::class.java) { mock, context -> - @Suppress("UNCHECKED_CAST") whenever(mock.withLifecycleLock(any<() -> Any>())).doAnswer { (it.arguments.first() as () -> Any).invoke() } - @Suppress("UNCHECKED_CAST") whenever(mock.withLifecycleWriteLock(any<() -> Any>())).doAnswer { (it.arguments.first() as () -> Any).invoke() } - if (context.arguments()[6] is SessionManagerImpl.SessionManagerConfigChangeHandler) { - configHandler = context.arguments()[6] as SessionManagerImpl.SessionManagerConfigChangeHandler - } - if (context.arguments()[6] is SessionManagerImpl.SessionHealthManager.SessionHealthManagerConfigChangeHandler) { - sessionHealthManagerConfigHandler = context.arguments()[6] - as SessionManagerImpl.SessionHealthManager.SessionHealthManagerConfigChangeHandler + when(val seventhArg = context.arguments()[6]) { + is SessionManagerConfigChangeHandler -> configHandler = seventhArg + is SessionHealthManagerConfigChangeHandler -> sessionHealthManagerConfigHandler = seventhArg } whenever(mock.coordinatorName).doReturn(LifecycleCoordinatorName("", "")) } @@ -310,34 +316,59 @@ class SessionManagerTest { SESSION_REFRESH_THRESHOLD_KEY, true ) + private val configWithOneSessionBetweenMembersAndNoHeartbeats = SessionManagerImpl.SessionManagerConfig( + MAX_MESSAGE_SIZE, + 1, + SESSIONS_PER_COUNTERPARTIES_FOR_MGM, + RevocationCheckMode.OFF, + SESSION_REFRESH_THRESHOLD_KEY, + false + ) - private val sessionManager = SessionManagerImpl( - groupPolicyProvider, - membershipGroupReaderProvider, - cryptoOpsClient, - pendingSessionMessageQueues, - mock(), - mock(), - mock(), - mock(), - mock { - val dominoTile = mock { - whenever(it.coordinatorName).doReturn(LifecycleCoordinatorName("", "")) - } - on { it.dominoTile } doReturn dominoTile - }, - linkManagerHostingMap, - protocolFactory, - mockTimeFacilitiesProvider.clock, - sessionReplayer, - ) { mockTimeFacilitiesProvider.mockScheduledExecutor }.apply { - setRunning() - configHandler.applyNewConfiguration( - config, - null, + private val sessionManager = createSessionManager(mock(), config, configWithHighHeartbeatPeriod) + + private fun createSessionManager( + resourcesHolder: ResourcesHolder = mock(), + sessionManagerConfig: SessionManagerImpl.SessionManagerConfig, + sessionHealthManagerConfig: SessionManagerImpl.SessionHealthManager.SessionHealthManagerConfig + ): SessionManagerImpl { + return SessionManagerImpl( + groupPolicyProvider, + membershipGroupReaderProvider, + cryptoOpsClient, + pendingSessionMessageQueues, mock(), - ) - sessionHealthManagerConfigHandler.applyNewConfiguration(configWithHighHeartbeatPeriod, null, mock()) + mock(), + mock(), + mock(), + mock { + val dominoTile = mock { + whenever(it.coordinatorName).doReturn(LifecycleCoordinatorName("", "")) + } + on { it.dominoTile } doReturn dominoTile + }, + linkManagerHostingMap, + protocolFactory, + mockTimeFacilitiesProvider.clock, + sessionReplayer, + ) { mockTimeFacilitiesProvider.mockScheduledExecutor }.apply { + setRunning() + configHandler.applyNewConfiguration( + sessionManagerConfig, + null, + mock(), + ) + sessionHealthManagerConfigHandler.applyNewConfiguration(sessionHealthManagerConfig, null, resourcesHolder) + } + } + + @Suppress("UNCHECKED_CAST") + private fun mockSessionHealthManagerPublisherAndCaptureRecords(callback: (List>) -> List>) { + publisherWithDominoLogicByClientId[SESSION_HEALTH_MANAGER_CLIENT_ID]!!.forEach { + whenever(it.publish(any())).doAnswer { invocation -> + return@doAnswer callback(invocation.arguments.first() as List>) + } + } } private fun MessageDigest.hash(data: ByteArray): ByteArray { @@ -372,24 +403,27 @@ class SessionManagerTest { } /** - * Send the [sessionManager] an authenticatedMessage and a [ResponderHandshakeMessage] so that it starts sending Heartbeats. + * Send the [sessionManager] an authenticatedMessage and a [ResponderHandshakeMessage] so that it a session is started. */ - private fun startSendingHeartbeats(sessionManager: SessionManager) { + private fun startSession( + sessionManager: SessionManager, + localProtocolInitiator: AuthenticationProtocolInitiator = protocolInitiator + ) { val initiatorHello = mock() - whenever(protocolInitiator.generateInitiatorHello()).thenReturn(initiatorHello) + whenever(localProtocolInitiator.generateInitiatorHello()).thenReturn(initiatorHello) whenever(outboundSessionPool.constructed().last().getNextSession(counterparties)).thenReturn( OutboundSessionPool.SessionPoolStatus.NewSessionsNeeded ) sessionManager.processOutboundMessages(listOf(message)) { it } - whenever(outboundSessionPool.constructed().last().getSession(protocolInitiator.sessionId)).thenReturn( - OutboundSessionPool.SessionType.PendingSession(counterparties, protocolInitiator) + whenever(outboundSessionPool.constructed().last().getSession(localProtocolInitiator.sessionId)).thenReturn( + OutboundSessionPool.SessionType.PendingSession(counterparties, localProtocolInitiator) ) - val header = CommonHeader(MessageType.RESPONDER_HANDSHAKE, 1, protocolInitiator.sessionId, 4, Instant.now().toEpochMilli()) + val header = CommonHeader(MessageType.RESPONDER_HANDSHAKE, 1, localProtocolInitiator.sessionId, 4, Instant.now().toEpochMilli()) val responderHandshakeMessage = ResponderHandshakeMessage(header, RANDOM_BYTES, RANDOM_BYTES) - whenever(authenticatedSession.sessionId).doAnswer { protocolInitiator.sessionId } - whenever(protocolInitiator.getSession()).thenReturn(authenticatedSession) + whenever(authenticatedSession.sessionId).doAnswer { localProtocolInitiator.sessionId } + whenever(localProtocolInitiator.getSession()).thenReturn(authenticatedSession) sessionManager.processSessionMessages(listOf(LinkInMessage(responderHandshakeMessage))) { it } } @@ -1639,36 +1673,9 @@ class SessionManagerTest { } @Test - fun `when responder hello is received, the session is pending, if no response is received, the session times out`() { + fun `when responder hello is received, the session is pending, if no response is received, the session times out if heartbeats are enabled`() { val resourceHolder = ResourcesHolder() - val sessionManager = SessionManagerImpl( - groupPolicyProvider, - membershipGroupReaderProvider, - cryptoOpsClient, - pendingSessionMessageQueues, - mock(), - mock(), - mock(), - mock(), - mock { - val dominoTile = mock { - whenever(it.coordinatorName).doReturn(LifecycleCoordinatorName("", "")) - } - on { it.dominoTile } doReturn dominoTile - }, - linkManagerHostingMap, - protocolFactory, - mockTimeFacilitiesProvider.clock, - sessionReplayer, - ) { mockTimeFacilitiesProvider.mockScheduledExecutor }.apply { - setRunning() - configHandler.applyNewConfiguration( - configWithOneSessionBetweenMembers, - null, - mock(), - ) - sessionHealthManagerConfigHandler.applyNewConfiguration(configWithHeartbeat, null, resourceHolder) - } + val sessionManager = createSessionManager(resourceHolder, configWithOneSessionBetweenMembers, configWithHeartbeat) sessionManager.start() val sessionId = "some-session" @@ -1695,35 +1702,38 @@ class SessionManagerTest { } @Test - fun `when responder handshake is received, the session is established, if no message is sent, the session times out`() { + fun `when responder hello is received, the session is pending, if no response is received, the session does not time out if heartbeats are disabled`() { val resourceHolder = ResourcesHolder() - val sessionManager = SessionManagerImpl( - groupPolicyProvider, membershipGroupReaderProvider, - cryptoOpsClient, - pendingSessionMessageQueues, - mock(), - mock(), - mock(), - mock(), - mock { - val dominoTile = mock { - whenever(it.coordinatorName).doReturn(LifecycleCoordinatorName("", "")) - } - on { it.dominoTile } doReturn dominoTile - }, - linkManagerHostingMap, - protocolFactory, - mockTimeFacilitiesProvider.clock, - sessionReplayer, - ) { mockTimeFacilitiesProvider.mockScheduledExecutor }.apply { - setRunning() - configHandler.applyNewConfiguration( - configWithOneSessionBetweenMembers, - null, - mock(), - ) - sessionHealthManagerConfigHandler.applyNewConfiguration(configWithHeartbeat, null, resourceHolder) - } + val sessionManager = createSessionManager(resourceHolder, configWithOneSessionBetweenMembersAndNoHeartbeats, configWithNoHeartbeat) + sessionManager.start() + + val sessionId = "some-session" + whenever(outboundSessionPool.constructed().last().getSession(sessionId)).thenReturn( + OutboundSessionPool.SessionType.PendingSession(counterparties, protocolInitiator) + ) + whenever(outboundSessionPool.constructed().last().getNextSession(counterparties)).thenReturn( + OutboundSessionPool.SessionPoolStatus.SessionPending + ) + + val initiatorHandshakeMsg = mock() + whenever(protocolInitiator.generateOurHandshakeMessage(eq(PEER_KEY.public), eq(null), any())).thenReturn(initiatorHandshakeMsg) + val header = CommonHeader(MessageType.RESPONDER_HANDSHAKE, 1, sessionId, 4, Instant.now().toEpochMilli()) + val responderHello = ResponderHelloMessage(header, ByteBuffer.wrap(PEER_KEY.public.encoded)) + sessionManager.processSessionMessages(listOf(LinkInMessage(responderHello))) {it}.single().second + assertTrue(sessionManager.processOutboundMessages(listOf(message)) { it }.single().second + is SessionManager.SessionState.SessionAlreadyPending + ) + mockTimeFacilitiesProvider.advanceTime(configWithNoHeartbeat.sessionTimeout.plus(5.millis)) + verify(outboundSessionPool.constructed().last(), never()).replaceSession(any(), any(), any()) + + sessionManager.stop() + resourceHolder.close() + } + + @Test + fun `when responder handshake is received, the session is established, if no message is sent, the session times out if heartbeats are enabled`() { + val resourceHolder = ResourcesHolder() + val sessionManager = createSessionManager(resourceHolder, configWithOneSessionBetweenMembers, configWithHeartbeat) sessionManager.start() val initiatorHello = mock() @@ -1767,52 +1777,57 @@ class SessionManagerTest { resourceHolder.close() } + @Test + fun `when responder handshake is received, the session is established, if no message is sent, the session doesn't time out if heartbeats are disabled`() { + val resourceHolder = ResourcesHolder() + val sessionManager = createSessionManager(resourceHolder, configWithOneSessionBetweenMembersAndNoHeartbeats, configWithNoHeartbeat) + sessionManager.start() + + val initiatorHello = mock() + whenever(protocolInitiator.generateInitiatorHello()).thenReturn(initiatorHello) + whenever(outboundSessionPool.constructed().last().getNextSession(counterparties)).thenReturn( + OutboundSessionPool.SessionPoolStatus.NewSessionsNeeded + ) + sessionManager.processOutboundMessages(listOf(message)) { it } + whenever(outboundSessionPool.constructed().last().getSession(protocolInitiator.sessionId)).thenReturn( + OutboundSessionPool.SessionType.PendingSession(counterparties, protocolInitiator) + ) + val header = CommonHeader( + MessageType.RESPONDER_HANDSHAKE, + 1, + protocolInitiator.sessionId, + 4, + Instant.now().toEpochMilli() + ) + val responderHandshakeMessage = ResponderHandshakeMessage(header, RANDOM_BYTES, RANDOM_BYTES) + val session = mock() + whenever(session.sessionId).doAnswer { protocolInitiator.sessionId } + whenever(protocolInitiator.getSession()).thenReturn(session) + sessionManager.processSessionMessages(listOf(LinkInMessage(responderHandshakeMessage))) { it } + + whenever(secondProtocolInitiator.generateInitiatorHello()).thenReturn(initiatorHello) + mockTimeFacilitiesProvider.advanceTime(configWithNoHeartbeat.sessionTimeout.plus(5.millis)) + + verify(outboundSessionPool.constructed().last(), never()).replaceSession(any(), any(), any(),) + verify(publisherWithDominoLogicByClientId["session-manager"]!!.last(), never()).publish(anyList()) + + sessionManager.stop() + resourceHolder.close() + } + @Test fun `when a responder handshake message is received, heartbeats are sent, if these are not acknowledged the session times out`() { val messages = mutableListOf() - fun callback(records: List>): List> { + val resourcesHolder = ResourcesHolder() + val sessionManager = createSessionManager(resourcesHolder, configWithOneSessionBetweenMembers, configWithHeartbeat) + mockSessionHealthManagerPublisherAndCaptureRecords { records -> val record = records.single() assertEquals(LINK_OUT_TOPIC, record.topic) messages.add((record.value as LinkOutMessage).payload as AuthenticatedDataMessage) - return listOf(CompletableFuture.completedFuture(Unit)) - } - - val resourcesHolder = ResourcesHolder() - val sessionManager = SessionManagerImpl( - groupPolicyProvider, membershipGroupReaderProvider, - cryptoOpsClient, - pendingSessionMessageQueues, - mock(), - mock(), - mock(), - mock(), - mock { - val dominoTile = mock { - whenever(it.coordinatorName).doReturn(LifecycleCoordinatorName("", "")) - } - on { it.dominoTile } doReturn dominoTile - }, - linkManagerHostingMap, - protocolFactory, - mockTimeFacilitiesProvider.clock, - sessionReplayer, - ) { mockTimeFacilitiesProvider.mockScheduledExecutor }.apply { - setRunning() - configHandler.applyNewConfiguration( - configWithOneSessionBetweenMembers, - null, - mock(), - ) - sessionHealthManagerConfigHandler.applyNewConfiguration(configWithHeartbeat, null, resourcesHolder) - } - @Suppress("UNCHECKED_CAST") - publisherWithDominoLogicByClientId[SessionManagerImpl.SessionHealthManager.SESSION_HEALTH_MANAGER_CLIENT_ID]!!.forEach { - whenever(it.publish(any())).doAnswer { invocation -> - callback(invocation.arguments.first() as List>) - } + listOf(CompletableFuture.completedFuture(Unit)) } sessionManager.start() - startSendingHeartbeats(sessionManager) + startSession(sessionManager) whenever(outboundSessionPool.constructed().last().replaceSession(eq(counterparties), eq(sessionId), any())).thenReturn(true) whenever(secondProtocolInitiator.generateInitiatorHello()).thenReturn(mock()) @@ -1837,62 +1852,148 @@ class SessionManagerTest { } @Test - fun `when a responder handshake message is received, heartbeats are sent, this continues if the heartbeat manager gets a new config`() { + fun `when a responder handshake message is received, heartbeats are not sent, and sessions don't time out if heartbeats are disabled`() { + val messages = mutableListOf() + val resourcesHolder = ResourcesHolder() + val sessionManager = createSessionManager( + resourcesHolder, + configWithOneSessionBetweenMembersAndNoHeartbeats, + configWithNoHeartbeat + ) + mockSessionHealthManagerPublisherAndCaptureRecords { records -> + val record = records.single() + assertEquals(LINK_OUT_TOPIC, record.topic) + messages.add((record.value as LinkOutMessage).payload as AuthenticatedDataMessage) + listOf(CompletableFuture.completedFuture(Unit)) + } + sessionManager.start() + startSession(sessionManager) + + whenever(outboundSessionPool.constructed().last().replaceSession(eq(counterparties), eq(sessionId), any())).thenReturn(true) + whenever(secondProtocolInitiator.generateInitiatorHello()).thenReturn(mock()) + mockTimeFacilitiesProvider.advanceTime(configWithNoHeartbeat.sessionTimeout.plus(5.millis)) + verify(outboundSessionPool.constructed().last(), never()).replaceSession(any(), any(), any()) + verify(publisherWithDominoLogicByClientId["session-manager"]!!.last(), never()).publish(anyList()) + + sessionManager.stop() + resourcesHolder.close() + + assertThat(messages).isEmpty() + } + + @Test + fun `when a responder handshake message is received, heartbeats are sent if enabled, this continues if the heartbeat manager gets a new config with heartbeats enabled`() { val messages = Collections.synchronizedList(mutableListOf()) - fun callback(records: List>): List> { + val resourcesHolder = ResourcesHolder() + val sessionManager = createSessionManager(resourcesHolder, configWithOneSessionBetweenMembers, configWithHeartbeat) + mockSessionHealthManagerPublisherAndCaptureRecords { records -> val record = records.single() val message = (record.value as LinkOutMessage).payload as AuthenticatedDataMessage messages.add(message) - return listOf(CompletableFuture.completedFuture(Unit)) + listOf(CompletableFuture.completedFuture(Unit)) } + sessionManager.start() + startSession(sessionManager) + + mockTimeFacilitiesProvider.advanceTime(configWithHeartbeat.heartbeatPeriod.plus(5.millis)) + mockTimeFacilitiesProvider.advanceTime(configWithHeartbeat.heartbeatPeriod.plus(5.millis)) + assertThat(messages.size).isEqualTo(2) + + sessionHealthManagerConfigHandler.applyNewConfiguration(configWithHeartbeat, configWithHeartbeat, mock()) + + mockTimeFacilitiesProvider.advanceTime(configWithHeartbeat.heartbeatPeriod.plus(5.millis)) + mockTimeFacilitiesProvider.advanceTime(configWithHeartbeat.heartbeatPeriod.plus(5.millis)) + assertThat(messages.size).isEqualTo(4) + + sessionManager.stop() + resourcesHolder.close() + } + + @Test + fun `when a responder handshake message is received, heartbeats are sent if enabled, this stops if the heartbeat manager gets a new config with heartbeats disabled`() { + val messages = Collections.synchronizedList(mutableListOf()) val resourcesHolder = ResourcesHolder() - val sessionManager = SessionManagerImpl( - groupPolicyProvider, membershipGroupReaderProvider, - cryptoOpsClient, - pendingSessionMessageQueues, - mock(), - mock(), - mock(), - mock(), - mock { - val dominoTile = mock { - whenever(it.coordinatorName).doReturn(LifecycleCoordinatorName("", "")) - } - on { it.dominoTile } doReturn dominoTile - }, - linkManagerHostingMap, - protocolFactory, - mockTimeFacilitiesProvider.clock, - sessionReplayer, - ) { mockTimeFacilitiesProvider.mockScheduledExecutor }.apply { - setRunning() - configHandler.applyNewConfiguration( - configWithOneSessionBetweenMembers, - null, - mock(), - ) - sessionHealthManagerConfigHandler.applyNewConfiguration(configWithHeartbeat, null, mock()) - } - @Suppress("UNCHECKED_CAST") - publisherWithDominoLogicByClientId[SessionManagerImpl.SessionHealthManager.SESSION_HEALTH_MANAGER_CLIENT_ID]!!.forEach { - whenever(it.publish(any())).doAnswer { invocation -> - callback(invocation.arguments.first() as List>) - } + val sessionManager = createSessionManager(resourcesHolder, configWithOneSessionBetweenMembers, configWithHeartbeat) + mockSessionHealthManagerPublisherAndCaptureRecords { records -> + val record = records.single() + val message = (record.value as LinkOutMessage).payload as AuthenticatedDataMessage + messages.add(message) + listOf(CompletableFuture.completedFuture(Unit)) } sessionManager.start() - startSendingHeartbeats(sessionManager) + startSession(sessionManager) mockTimeFacilitiesProvider.advanceTime(configWithHeartbeat.heartbeatPeriod.plus(5.millis)) mockTimeFacilitiesProvider.advanceTime(configWithHeartbeat.heartbeatPeriod.plus(5.millis)) assertThat(messages.size).isEqualTo(2) - sessionHealthManagerConfigHandler.applyNewConfiguration(configWithHeartbeat, configWithHeartbeat, mock()) + sessionHealthManagerConfigHandler.applyNewConfiguration(configWithNoHeartbeat, configWithHeartbeat, mock()) + + startSession(sessionManager, secondProtocolInitiator) + mockTimeFacilitiesProvider.advanceTime(configWithNoHeartbeat.heartbeatPeriod.plus(5.millis)) + mockTimeFacilitiesProvider.advanceTime(configWithNoHeartbeat.heartbeatPeriod.plus(5.millis)) + assertThat(messages.size).isEqualTo(2) + + sessionManager.stop() + resourcesHolder.close() + } + + @Test + fun `when a responder handshake message is received, heartbeats are not sent if disabled, this continues if the heartbeat manager gets a new config with heartbeats disabled`() { + val messages = Collections.synchronizedList(mutableListOf()) + + val resourcesHolder = ResourcesHolder() + val sessionManager = createSessionManager(resourcesHolder, configWithOneSessionBetweenMembersAndNoHeartbeats, configWithNoHeartbeat) + mockSessionHealthManagerPublisherAndCaptureRecords { records -> + val record = records.single() + val message = (record.value as LinkOutMessage).payload as AuthenticatedDataMessage + messages.add(message) + listOf(CompletableFuture.completedFuture(Unit)) + } + sessionManager.start() + startSession(sessionManager) + + mockTimeFacilitiesProvider.advanceTime(configWithNoHeartbeat.heartbeatPeriod.plus(5.millis)) + mockTimeFacilitiesProvider.advanceTime(configWithNoHeartbeat.heartbeatPeriod.plus(5.millis)) + assertThat(messages).isEmpty() + + sessionHealthManagerConfigHandler.applyNewConfiguration(configWithNoHeartbeat, configWithNoHeartbeat, mock()) + + mockTimeFacilitiesProvider.advanceTime(configWithNoHeartbeat.heartbeatPeriod.plus(5.millis)) + mockTimeFacilitiesProvider.advanceTime(configWithNoHeartbeat.heartbeatPeriod.plus(5.millis)) + assertThat(messages).isEmpty() + + sessionManager.stop() + resourcesHolder.close() + } + + @Test + fun `when a responder handshake message is received, heartbeats are not sent if disabled, they start if the heartbeat manager gets a new config with heartbeats enabled`() { + val messages = Collections.synchronizedList(mutableListOf()) + + val resourcesHolder = ResourcesHolder() + val sessionManager = createSessionManager(resourcesHolder, configWithOneSessionBetweenMembersAndNoHeartbeats, configWithNoHeartbeat) + mockSessionHealthManagerPublisherAndCaptureRecords { records -> + val record = records.single() + val message = (record.value as LinkOutMessage).payload as AuthenticatedDataMessage + messages.add(message) + listOf(CompletableFuture.completedFuture(Unit)) + } + sessionManager.start() + startSession(sessionManager) + mockTimeFacilitiesProvider.advanceTime(configWithNoHeartbeat.heartbeatPeriod.plus(5.millis)) + mockTimeFacilitiesProvider.advanceTime(configWithNoHeartbeat.heartbeatPeriod.plus(5.millis)) + assertThat(messages).isEmpty() + + sessionHealthManagerConfigHandler.applyNewConfiguration(configWithHeartbeat, configWithNoHeartbeat, mock()) + + startSession(sessionManager, secondProtocolInitiator) mockTimeFacilitiesProvider.advanceTime(configWithHeartbeat.heartbeatPeriod.plus(5.millis)) mockTimeFacilitiesProvider.advanceTime(configWithHeartbeat.heartbeatPeriod.plus(5.millis)) - assertThat(messages.size).isEqualTo(4) + assertThat(messages).hasSize(2) sessionManager.stop() resourcesHolder.close() @@ -1902,53 +2003,21 @@ class SessionManagerTest { fun `when a responder handshake message is received, heartbeats are sent, this stops if the session manager gets a new config`() { var linkOutMessages = 0 val resourcesHolder = ResourcesHolder() - fun callback(records: List>): List> { + + val sessionManager = createSessionManager(resourcesHolder, configWithOneSessionBetweenMembers, configWithHeartbeat) + mockSessionHealthManagerPublisherAndCaptureRecords { records -> for (record in records) { if (record.topic == LINK_OUT_TOPIC) { linkOutMessages++ } } - return listOf(CompletableFuture.completedFuture(Unit)) - } - - val sessionManager = SessionManagerImpl( - groupPolicyProvider, membershipGroupReaderProvider, - cryptoOpsClient, - pendingSessionMessageQueues, - mock(), - mock(), - mock(), - mock(), - mock { - val dominoTile = mock { - whenever(it.coordinatorName).doReturn(LifecycleCoordinatorName("", "")) - } - on { it.dominoTile } doReturn dominoTile - }, - linkManagerHostingMap, - protocolFactory, - mockTimeFacilitiesProvider.clock, - sessionReplayer, - ) { mockTimeFacilitiesProvider.mockScheduledExecutor }.apply { - setRunning() - configHandler.applyNewConfiguration( - configWithOneSessionBetweenMembers, - null, - mock(), - ) - sessionHealthManagerConfigHandler.applyNewConfiguration(configWithHeartbeat, null, resourcesHolder) - } - @Suppress("UNCHECKED_CAST") - publisherWithDominoLogicByClientId[SessionManagerImpl.SessionHealthManager.SESSION_HEALTH_MANAGER_CLIENT_ID]!!.forEach { - whenever(it.publish(any())).doAnswer { invocation -> - callback(invocation.arguments.first() as List>) - } + listOf(CompletableFuture.completedFuture(Unit)) } sessionManager.start() whenever(outboundSessionPool.constructed().last().replaceSession(eq(counterparties), eq(sessionId), any())).thenReturn(true) whenever(outboundSessionPool.constructed().last().getAllSessionIds()).thenAnswer { (listOf(protocolInitiator.sessionId)) } - startSendingHeartbeats(sessionManager) + startSession(sessionManager) repeat(2) { mockTimeFacilitiesProvider.advanceTime(configWithHeartbeat.heartbeatPeriod.plus(5.millis)) } assertThat(linkOutMessages).isEqualTo(2) @@ -1980,52 +2049,19 @@ class SessionManagerTest { val resourcesHolder = ResourcesHolder() val messages = Collections.synchronizedList(mutableListOf()) - fun callback(records: List>): List> { + val sessionManager = createSessionManager(resourcesHolder, configWithOneSessionBetweenMembers, configWithHeartbeat) + mockSessionHealthManagerPublisherAndCaptureRecords { records -> val record = records.single() assertEquals(LINK_OUT_TOPIC, record.topic) val message = (record.value as LinkOutMessage).payload as AuthenticatedDataMessage messages.add(message) - return listOf(CompletableFuture.completedFuture(Unit)) - } - - val sessionManager = SessionManagerImpl( - groupPolicyProvider, membershipGroupReaderProvider, - cryptoOpsClient, - pendingSessionMessageQueues, - mock(), - mock(), - mock(), - mock(), - mock { - val dominoTile = mock { - whenever(it.coordinatorName).doReturn(LifecycleCoordinatorName("", "")) - } - on { it.dominoTile } doReturn dominoTile - }, - linkManagerHostingMap, - protocolFactory, - mockTimeFacilitiesProvider.clock, - sessionReplayer, - ) { mockTimeFacilitiesProvider.mockScheduledExecutor }.apply { - setRunning() - configHandler.applyNewConfiguration( - configWithOneSessionBetweenMembers, - null, - mock(), - ) - sessionHealthManagerConfigHandler.applyNewConfiguration(configWithHeartbeat, null, resourcesHolder) - } - publisherWithDominoLogicByClientId[SessionManagerImpl.SessionHealthManager.SESSION_HEALTH_MANAGER_CLIENT_ID]!!.forEach { - whenever(it.publish(any())).doAnswer { invocation -> - @Suppress("UNCHECKED_CAST") - callback(invocation.arguments.first() as List>) - } + listOf(CompletableFuture.completedFuture(Unit)) } sessionManager.start() whenever(outboundSessionPool.constructed().last().replaceSession(eq(counterparties), eq(sessionId), any())).thenReturn(true) whenever(outboundSessionPool.constructed().last().getAllSessionIds()).thenAnswer { (listOf(protocolInitiator.sessionId)) } - startSendingHeartbeats(sessionManager) + startSession(sessionManager) // sum of heartbeats extending over the session timeout val numberOfHeartbeats = configWithHeartbeat.let { @@ -2059,41 +2095,15 @@ class SessionManagerTest { } val resourcesHolder = ResourcesHolder() - val sessionManager = SessionManagerImpl( - groupPolicyProvider, membershipGroupReaderProvider, - cryptoOpsClient, - pendingSessionMessageQueues, - mock(), - mock(), - mock(), - mock(), - mock { - val dominoTile = mock { - whenever(it.coordinatorName).doReturn(LifecycleCoordinatorName("", "")) - } - on { it.dominoTile } doReturn dominoTile - }, - linkManagerHostingMap, - protocolFactory, - mockTimeFacilitiesProvider.clock, - sessionReplayer, - ) { mockTimeFacilitiesProvider.mockScheduledExecutor }.apply { - setRunning() - configHandler.applyNewConfiguration( - configWithOneSessionBetweenMembers, - null, - mock(), - ) - sessionHealthManagerConfigHandler.applyNewConfiguration(configWithHeartbeat, null, resourcesHolder) - } - publisherWithDominoLogicByClientId[SessionManagerImpl.SessionHealthManager.SESSION_HEALTH_MANAGER_CLIENT_ID]!!.forEach { + val sessionManager = createSessionManager(resourcesHolder, configWithOneSessionBetweenMembers, configWithHeartbeat) + publisherWithDominoLogicByClientId[SESSION_HEALTH_MANAGER_CLIENT_ID]!!.forEach { whenever(it.publish(any())).doAnswer { publish() } } sessionManager.start() whenever(outboundSessionPool.constructed().last().replaceSession(eq(counterparties), eq(sessionId), any())).thenReturn(true) whenever(outboundSessionPool.constructed().last().getAllSessionIds()).thenAnswer { (listOf(protocolInitiator.sessionId)) } - startSendingHeartbeats(sessionManager) + startSession(sessionManager) repeat(3) { mockTimeFacilitiesProvider.advanceTime(configWithHeartbeat.heartbeatPeriod.plus(5.millis)) } assertThat(sentHeartbeats).isEqualTo(3) @@ -2155,55 +2165,20 @@ class SessionManagerTest { Duration.ofDays(10) ) val messages = Collections.synchronizedList(mutableListOf()) - - fun callback(records: List>): List> { + val resourcesHolder = ResourcesHolder() + val sessionManager = createSessionManager(resourcesHolder, configWithOneSessionBetweenMembers, longTimePeriodConfigWithHeartbeat) + mockSessionHealthManagerPublisherAndCaptureRecords { records -> val record = records.single() assertEquals(LINK_OUT_TOPIC, record.topic) val message = (record.value as LinkOutMessage).payload as AuthenticatedDataMessage messages.add(message) - return listOf(CompletableFuture.completedFuture(Unit)) - } - - val resourcesHolder = ResourcesHolder() - val sessionManager = SessionManagerImpl( - groupPolicyProvider, membershipGroupReaderProvider, - cryptoOpsClient, - pendingSessionMessageQueues, - mock(), - mock(), - mock(), - mock(), - mock { - val dominoTile = mock { - whenever(it.coordinatorName).doReturn(LifecycleCoordinatorName("", "")) - } - on { it.dominoTile } doReturn dominoTile - }, - linkManagerHostingMap, - protocolFactory, - mockTimeFacilitiesProvider.clock, - sessionReplayer, - ) { mockTimeFacilitiesProvider.mockScheduledExecutor }.apply { - setRunning() - configHandler.applyNewConfiguration( - configWithOneSessionBetweenMembers, - null, - mock(), - ) - sessionHealthManagerConfigHandler.applyNewConfiguration(longTimePeriodConfigWithHeartbeat, null, mock()) + listOf(CompletableFuture.completedFuture(Unit)) } - @Suppress("UNCHECKED_CAST") - publisherWithDominoLogicByClientId[SessionManagerImpl.SessionHealthManager.SESSION_HEALTH_MANAGER_CLIENT_ID]!!.forEach { - whenever(it.publish(any())).doAnswer { invocation -> - callback(invocation.arguments.first() as List>) - } - } - val header = CommonHeader(MessageType.RESPONDER_HANDSHAKE, 1, protocolInitiator.sessionId, 4, Instant.now().toEpochMilli()) val responderHello = ResponderHelloMessage(header, ByteBuffer.wrap(PEER_KEY.public.encoded)) sessionManager.processSessionMessages(listOf(LinkInMessage(responderHello))) {it} - startSendingHeartbeats(sessionManager) + startSession(sessionManager) fun advanceTimeAndAcknowledgeMessages() { @@ -2454,4 +2429,95 @@ class SessionManagerTest { eq(TimeUnit.MILLISECONDS) ) } + + @Test + fun `when heartbeats are disabled and a message is sent, the session will timeout if the message is not acknowledged`() { + val messages = mutableListOf() + val resourcesHolder = ResourcesHolder() + val sessionManager = createSessionManager( + resourcesHolder, + configWithOneSessionBetweenMembersAndNoHeartbeats, + configWithNoHeartbeat + ) + mockSessionHealthManagerPublisherAndCaptureRecords { records -> + val record = records.single() + assertEquals(LINK_OUT_TOPIC, record.topic) + messages.add((record.value as LinkOutMessage).payload as AuthenticatedDataMessage) + listOf(CompletableFuture.completedFuture(Unit)) + } + sessionManager.start() + startSession(sessionManager) + mockTimeFacilitiesProvider.advanceTime(5.millis) + sessionManager.dataMessageSent(authenticatedSession) + + mockTimeFacilitiesProvider.advanceTime(configWithNoHeartbeat.sessionTimeout.plus(5.millis)) + verify(outboundSessionPool.constructed().last()).replaceSession(eq(counterparties), eq(sessionId), any()) + + sessionManager.stop() + resourcesHolder.close() + + assertThat(messages).isEmpty() + } + + @Test + fun `when heartbeats are disabled and a message is sent then acknowledged, the session will not timeout`() { + val messages = mutableListOf() + val resourcesHolder = ResourcesHolder() + val sessionManager = createSessionManager( + resourcesHolder, + configWithOneSessionBetweenMembersAndNoHeartbeats, + configWithNoHeartbeat + ) + mockSessionHealthManagerPublisherAndCaptureRecords { records -> + val record = records.single() + assertEquals(LINK_OUT_TOPIC, record.topic) + messages.add((record.value as LinkOutMessage).payload as AuthenticatedDataMessage) + listOf(CompletableFuture.completedFuture(Unit)) + } + sessionManager.start() + startSession(sessionManager) + mockTimeFacilitiesProvider.advanceTime(5.millis) + sessionManager.dataMessageSent(authenticatedSession) + mockTimeFacilitiesProvider.advanceTime(5.millis) + sessionManager.messageAcknowledged(authenticatedSession.sessionId) + mockTimeFacilitiesProvider.advanceTime(configWithNoHeartbeat.sessionTimeout.plus(5.millis)) + + verify(outboundSessionPool.constructed().last(), never()).replaceSession(eq(counterparties), eq(sessionId), any()) + + sessionManager.stop() + resourcesHolder.close() + + assertThat(messages).isEmpty() + } + + @Test + fun `when heartbeats are disabled and a message is sent and not acknowledged, the session will timeout only after the session timeout duration has passed`() { + val messages = mutableListOf() + val resourcesHolder = ResourcesHolder() + val sessionManager = createSessionManager( + resourcesHolder, + configWithOneSessionBetweenMembersAndNoHeartbeats, + configWithNoHeartbeat + ) + mockSessionHealthManagerPublisherAndCaptureRecords { records -> + val record = records.single() + assertEquals(LINK_OUT_TOPIC, record.topic) + messages.add((record.value as LinkOutMessage).payload as AuthenticatedDataMessage) + listOf(CompletableFuture.completedFuture(Unit)) + } + sessionManager.start() + startSession(sessionManager) + mockTimeFacilitiesProvider.advanceTime(5.millis) + sessionManager.dataMessageSent(authenticatedSession) + mockTimeFacilitiesProvider.advanceTime(configWithNoHeartbeat.sessionTimeout.dividedBy(2)) + sessionManager.messageAcknowledged(authenticatedSession.sessionId) + mockTimeFacilitiesProvider.advanceTime(configWithNoHeartbeat.sessionTimeout.dividedBy(2).plus(5.millis)) + + verify(outboundSessionPool.constructed().last(), never()).replaceSession(eq(counterparties), eq(sessionId), any()) + + sessionManager.stop() + resourcesHolder.close() + + assertThat(messages).isEmpty() + } } From ed747a22b43f3fc9af717542471ba5349f92c311 Mon Sep 17 00:00:00 2001 From: Charlie Crean Date: Tue, 16 Jan 2024 09:50:13 +0000 Subject: [PATCH 19/24] detekt --- .../p2p/linkmanager/sessions/SessionManagerTest.kt | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/components/link-manager/src/test/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerTest.kt b/components/link-manager/src/test/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerTest.kt index 91b2bb554dd..af92c12af0b 100644 --- a/components/link-manager/src/test/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerTest.kt +++ b/components/link-manager/src/test/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerTest.kt @@ -1673,6 +1673,7 @@ class SessionManagerTest { } @Test + @Suppress("MaxLineLength") fun `when responder hello is received, the session is pending, if no response is received, the session times out if heartbeats are enabled`() { val resourceHolder = ResourcesHolder() val sessionManager = createSessionManager(resourceHolder, configWithOneSessionBetweenMembers, configWithHeartbeat) @@ -1702,6 +1703,7 @@ class SessionManagerTest { } @Test + @Suppress("MaxLineLength") fun `when responder hello is received, the session is pending, if no response is received, the session does not time out if heartbeats are disabled`() { val resourceHolder = ResourcesHolder() val sessionManager = createSessionManager(resourceHolder, configWithOneSessionBetweenMembersAndNoHeartbeats, configWithNoHeartbeat) @@ -1731,6 +1733,7 @@ class SessionManagerTest { } @Test + @Suppress("MaxLineLength") fun `when responder handshake is received, the session is established, if no message is sent, the session times out if heartbeats are enabled`() { val resourceHolder = ResourcesHolder() val sessionManager = createSessionManager(resourceHolder, configWithOneSessionBetweenMembers, configWithHeartbeat) @@ -1778,6 +1781,7 @@ class SessionManagerTest { } @Test + @Suppress("MaxLineLength") fun `when responder handshake is received, the session is established, if no message is sent, the session doesn't time out if heartbeats are disabled`() { val resourceHolder = ResourcesHolder() val sessionManager = createSessionManager(resourceHolder, configWithOneSessionBetweenMembersAndNoHeartbeats, configWithNoHeartbeat) @@ -1852,6 +1856,7 @@ class SessionManagerTest { } @Test + @Suppress("MaxLineLength") fun `when a responder handshake message is received, heartbeats are not sent, and sessions don't time out if heartbeats are disabled`() { val messages = mutableListOf() val resourcesHolder = ResourcesHolder() @@ -1882,6 +1887,7 @@ class SessionManagerTest { } @Test + @Suppress("MaxLineLength") fun `when a responder handshake message is received, heartbeats are sent if enabled, this continues if the heartbeat manager gets a new config with heartbeats enabled`() { val messages = Collections.synchronizedList(mutableListOf()) @@ -1911,6 +1917,7 @@ class SessionManagerTest { } @Test + @Suppress("MaxLineLength") fun `when a responder handshake message is received, heartbeats are sent if enabled, this stops if the heartbeat manager gets a new config with heartbeats disabled`() { val messages = Collections.synchronizedList(mutableListOf()) @@ -1941,6 +1948,7 @@ class SessionManagerTest { } @Test + @Suppress("MaxLineLength") fun `when a responder handshake message is received, heartbeats are not sent if disabled, this continues if the heartbeat manager gets a new config with heartbeats disabled`() { val messages = Collections.synchronizedList(mutableListOf()) @@ -1970,6 +1978,7 @@ class SessionManagerTest { } @Test + @Suppress("MaxLineLength") fun `when a responder handshake message is received, heartbeats are not sent if disabled, they start if the heartbeat manager gets a new config with heartbeats enabled`() { val messages = Collections.synchronizedList(mutableListOf()) @@ -2491,6 +2500,7 @@ class SessionManagerTest { } @Test + @Suppress("MaxLineLength") fun `when heartbeats are disabled and a message is sent and not acknowledged, the session will timeout only after the session timeout duration has passed`() { val messages = mutableListOf() val resourcesHolder = ResourcesHolder() From 18d6a98f92311b460c86fe1c5496c2437b9fe8e7 Mon Sep 17 00:00:00 2001 From: Charlie Crean Date: Tue, 16 Jan 2024 14:24:32 +0000 Subject: [PATCH 20/24] reverting log level back to original trace level --- .../net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt b/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt index b36600b5ceb..bf8710eed55 100644 --- a/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt +++ b/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt @@ -98,6 +98,7 @@ import net.corda.p2p.linkmanager.metrics.recordInboundSessionTimeoutMetric import net.corda.p2p.linkmanager.metrics.recordOutboundHeartbeatMessagesMetric import net.corda.p2p.linkmanager.metrics.recordOutboundSessionTimeoutMetric import net.corda.p2p.linkmanager.sessions.SessionManagerWarnings.badGroupPolicy +import net.corda.utilities.trace import kotlin.concurrent.read import kotlin.concurrent.write @@ -1324,10 +1325,10 @@ internal class SessionManagerImpl( val timeSinceLastSend = timeStamp() - sessionInfo.lastSendTimestamp if (timeSinceLastSend >= config.heartbeatPeriod.toMillis()) { - logger.info ( + logger.trace { "Sending heartbeat message between ${counterparties.ourId} (our Identity) and " + "${counterparties.counterpartyId}." - ) + } sendHeartbeatMessage( counterparties.ourId, counterparties.counterpartyId, From bf2f5eee9660a0141f6395538b912f253ddacf42 Mon Sep 17 00:00:00 2001 From: Charlie Crean Date: Tue, 16 Jan 2024 15:18:02 +0000 Subject: [PATCH 21/24] set latest API version --- gradle.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index 92f1c5aa7e1..f9cef8e7e10 100644 --- a/gradle.properties +++ b/gradle.properties @@ -42,7 +42,7 @@ commonsLangVersion = 3.12.0 commonsTextVersion = 1.10.0 # Corda API libs revision (change in 4th digit indicates a breaking change) # Change to 5.2.0.xx-SNAPSHOT to pick up maven local published copy -cordaApiVersion=5.2.0.28-alpha-1705339976179 +cordaApiVersion=5.2.0.28-alpha-1705416974920 disruptorVersion=3.4.4 felixConfigAdminVersion=1.9.26 From 2617d6e492f4b9d3ff601393fe964822ad95837b Mon Sep 17 00:00:00 2001 From: Charlie Crean Date: Tue, 16 Jan 2024 15:53:51 +0000 Subject: [PATCH 22/24] revert additional bug fix --- .../p2p/linkmanager/inbound/InboundMessageProcessor.kt | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/inbound/InboundMessageProcessor.kt b/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/inbound/InboundMessageProcessor.kt index f179c7a24d4..624f65a2a8a 100644 --- a/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/inbound/InboundMessageProcessor.kt +++ b/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/inbound/InboundMessageProcessor.kt @@ -127,10 +127,8 @@ internal class InboundMessageProcessor( private fun processSessionMessages(messages: List>): List>, LinkInMessage>> { - val responses = sessionManager.processSessionMessages(messages) { message -> - recordInboundSessionMessagesMetric() - message.item - } + recordInboundSessionMessagesMetric() + val responses = sessionManager.processSessionMessages(messages) {message -> message.item} return responses.map { (traceableMessage, response) -> if (response != null) { when (val payload = response.payload) { From 49d81841a5e2be96e56b15c07645e19b510ce430 Mon Sep 17 00:00:00 2001 From: Charlie Crean Date: Tue, 16 Jan 2024 17:12:55 +0000 Subject: [PATCH 23/24] PR feedback --- .../sessions/SessionManagerImpl.kt | 37 ++++++++++++------- .../sessions/SessionManagerTest.kt | 11 ++---- 2 files changed, 27 insertions(+), 21 deletions(-) diff --git a/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt b/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt index bf8710eed55..d6ddcc66b40 100644 --- a/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt +++ b/components/link-manager/src/main/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerImpl.kt @@ -1153,7 +1153,7 @@ internal class SessionManagerImpl( initialTrackedSession.lastSendTimestamp = timestamp initialTrackedSession } else { - scheduleOutboundSessionTimeout(counterparties, sessionId, config.get().sessionTimeout.toMillis()) + scheduleOutboundSessionTimeout(counterparties, sessionId, config.get().sessionTimeout) TrackedOutboundSession(counterparties, timestamp, timestamp) } } @@ -1216,18 +1216,27 @@ internal class SessionManagerImpl( recordOutboundSessionTimeoutMetric(counterparties.ourId, counterparties.counterpartyId) } - private fun scheduleOutboundSessionTimeout(counterparties: SessionCounterparties, sessionId: String, delay: Long) { + private fun scheduleOutboundSessionTimeout( + counterparties: SessionCounterparties, + sessionId: String, + delay: Duration + ) { executorService.schedule( - { sessionHealthMonitor.get().outboundSessionTimeout(counterparties, sessionId) }, - delay, + { sessionHealthMonitor.get().checkIfOutboundSessionTimeout(counterparties, sessionId) }, + delay.toMillis(), TimeUnit.MILLISECONDS ) } - private fun scheduleInboundSessionTimeout(sessionId: String, source: HoldingIdentity, destination: HoldingIdentity?, delay: Long) { + private fun scheduleInboundSessionTimeout( + sessionId: String, + source: HoldingIdentity, + destination: HoldingIdentity?, + delay: Duration + ) { executorService.schedule( { inboundSessionTimeout(sessionId, source, destination) }, - delay, + delay.toMillis(), TimeUnit.MILLISECONDS ) } @@ -1245,7 +1254,7 @@ internal class SessionManagerImpl( trackedInboundSessions.remove(sessionId) recordInboundSessionTimeoutMetric(source, destination) } else { - scheduleInboundSessionTimeout(sessionId, source, destination, sessionTimeoutMs - timeSinceLastReceived) + scheduleInboundSessionTimeout(sessionId, source, destination, Duration.ofMillis(sessionTimeoutMs - timeSinceLastReceived)) } } @@ -1257,12 +1266,12 @@ internal class SessionManagerImpl( * Implementations of [SessionHealthMonitor] provide different methods of determining when a session has become * unhealthy and handling of unhealthy sessions. */ - private interface SessionHealthMonitor { + sealed interface SessionHealthMonitor { fun sessionEstablished(session: Session) fun messageReceived(sessionId: String, source: HoldingIdentity, destination: HoldingIdentity?) - fun outboundSessionTimeout(counterparties: SessionCounterparties, sessionId: String) + fun checkIfOutboundSessionTimeout(counterparties: SessionCounterparties, sessionId: String) } /** @@ -1289,13 +1298,13 @@ internal class SessionManagerImpl( initialTrackedSession.lastReceivedTimestamp = timeStamp() initialTrackedSession } else { - scheduleInboundSessionTimeout(sessionId, source, destination, config.get().sessionTimeout.toMillis()) + scheduleInboundSessionTimeout(sessionId, source, destination, config.get().sessionTimeout) TrackedInboundSession(timeStamp()) } } } - override fun outboundSessionTimeout(counterparties: SessionCounterparties, sessionId: String) { + override fun checkIfOutboundSessionTimeout(counterparties: SessionCounterparties, sessionId: String) { val sessionInfo = trackedOutboundSessions[sessionId] ?: return val timeSinceLastAck = timeStamp() - sessionInfo.lastAckTimestamp val sessionTimeoutMs = config.get().sessionTimeout.toMillis() @@ -1307,7 +1316,7 @@ internal class SessionManagerImpl( ) tearDownOutboundSession(counterparties, sessionId) } else { - scheduleOutboundSessionTimeout(counterparties, sessionId, sessionTimeoutMs - timeSinceLastAck) + scheduleOutboundSessionTimeout(counterparties, sessionId, Duration.ofMillis(sessionTimeoutMs - timeSinceLastAck)) } } @@ -1412,7 +1421,7 @@ internal class SessionManagerImpl( ) } - override fun outboundSessionTimeout(counterparties: SessionCounterparties, sessionId: String) { + override fun checkIfOutboundSessionTimeout(counterparties: SessionCounterparties, sessionId: String) { val sessionInfo = trackedOutboundSessions[sessionId] ?: return val now = timeStamp() val timeSinceLastAck = now - sessionInfo.lastAckTimestamp @@ -1434,7 +1443,7 @@ internal class SessionManagerImpl( } else { maxWaitForAck } - scheduleOutboundSessionTimeout(counterparties, sessionId, delay) + scheduleOutboundSessionTimeout(counterparties, sessionId, Duration.ofMillis(delay)) } } } diff --git a/components/link-manager/src/test/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerTest.kt b/components/link-manager/src/test/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerTest.kt index af92c12af0b..cf05daa4934 100644 --- a/components/link-manager/src/test/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerTest.kt +++ b/components/link-manager/src/test/kotlin/net/corda/p2p/linkmanager/sessions/SessionManagerTest.kt @@ -69,7 +69,6 @@ import org.assertj.core.api.Assertions.assertThat import org.bouncycastle.jce.provider.BouncyCastleProvider import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.BeforeAll import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test @@ -1692,9 +1691,8 @@ class SessionManagerTest { val header = CommonHeader(MessageType.RESPONDER_HANDSHAKE, 1, sessionId, 4, Instant.now().toEpochMilli()) val responderHello = ResponderHelloMessage(header, ByteBuffer.wrap(PEER_KEY.public.encoded)) sessionManager.processSessionMessages(listOf(LinkInMessage(responderHello))) {it}.single().second - assertTrue(sessionManager.processOutboundMessages(listOf(message)) { it }.single().second - is SessionManager.SessionState.SessionAlreadyPending - ) + assertThat(sessionManager.processOutboundMessages(listOf(message)) { it }.single().second) + .isInstanceOf(SessionManager.SessionState.SessionAlreadyPending::class.java) mockTimeFacilitiesProvider.advanceTime(configWithHeartbeat.sessionTimeout.plus(5.millis)) verify(outboundSessionPool.constructed().last()).replaceSession(counterparties, sessionId, protocolInitiator) @@ -1722,9 +1720,8 @@ class SessionManagerTest { val header = CommonHeader(MessageType.RESPONDER_HANDSHAKE, 1, sessionId, 4, Instant.now().toEpochMilli()) val responderHello = ResponderHelloMessage(header, ByteBuffer.wrap(PEER_KEY.public.encoded)) sessionManager.processSessionMessages(listOf(LinkInMessage(responderHello))) {it}.single().second - assertTrue(sessionManager.processOutboundMessages(listOf(message)) { it }.single().second - is SessionManager.SessionState.SessionAlreadyPending - ) + assertThat(sessionManager.processOutboundMessages(listOf(message)) { it }.single().second) + .isInstanceOf(SessionManager.SessionState.SessionAlreadyPending::class.java) mockTimeFacilitiesProvider.advanceTime(configWithNoHeartbeat.sessionTimeout.plus(5.millis)) verify(outboundSessionPool.constructed().last(), never()).replaceSession(any(), any(), any()) From 77eeaa3f1a2423307923c75349fe370844d62821 Mon Sep 17 00:00:00 2001 From: Charlie Crean Date: Wed, 17 Jan 2024 09:46:13 +0000 Subject: [PATCH 24/24] set API version --- gradle.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index f9cef8e7e10..80f1a1031a2 100644 --- a/gradle.properties +++ b/gradle.properties @@ -42,7 +42,7 @@ commonsLangVersion = 3.12.0 commonsTextVersion = 1.10.0 # Corda API libs revision (change in 4th digit indicates a breaking change) # Change to 5.2.0.xx-SNAPSHOT to pick up maven local published copy -cordaApiVersion=5.2.0.28-alpha-1705416974920 +cordaApiVersion=5.2.0.28-beta+ disruptorVersion=3.4.4 felixConfigAdminVersion=1.9.26