diff --git a/components/crypto/crypto-service-impl/src/main/kotlin/net/corda/crypto/service/impl/bus/CryptoRekeyBusProcessor.kt b/components/crypto/crypto-service-impl/src/main/kotlin/net/corda/crypto/service/impl/bus/CryptoRekeyBusProcessor.kt index a2511458f72..3905b81d6e7 100644 --- a/components/crypto/crypto-service-impl/src/main/kotlin/net/corda/crypto/service/impl/bus/CryptoRekeyBusProcessor.kt +++ b/components/crypto/crypto-service-impl/src/main/kotlin/net/corda/crypto/service/impl/bus/CryptoRekeyBusProcessor.kt @@ -10,6 +10,7 @@ import net.corda.data.crypto.wire.ops.key.rotation.KeyRotationRequest import net.corda.data.crypto.wire.ops.key.rotation.KeyRotationStatus import net.corda.data.crypto.wire.ops.key.rotation.KeyType import net.corda.libs.statemanager.api.Metadata +import net.corda.libs.statemanager.api.STATE_TYPE import net.corda.libs.statemanager.api.State import net.corda.libs.statemanager.api.StateManager import net.corda.messaging.api.processor.DurableProcessor @@ -107,8 +108,16 @@ class CryptoRekeyBusProcessor( now ) - val flattend = checkNotNull(serializer.serialize(status)) - stateManager?.create(listOf(State(request.requestId, flattend, 1, Metadata(), now))) + val flattened = checkNotNull(serializer.serialize(status)) + stateManager?.create( + listOf( + State( + request.requestId, + flattened, 1, + Metadata(mapOf(STATE_TYPE to status::class.java.name)) + ) + ) + ) } return emptyList() diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/FlowTimeoutTaskProcessor.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/FlowTimeoutTaskProcessor.kt index fe595180c57..ce217f408bd 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/FlowTimeoutTaskProcessor.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/FlowTimeoutTaskProcessor.kt @@ -1,14 +1,16 @@ package net.corda.flow.maintenance import net.corda.data.flow.FlowTimeout +import net.corda.data.flow.state.checkpoint.Checkpoint import net.corda.data.scheduler.ScheduledTaskTrigger import net.corda.flow.state.impl.CheckpointMetadataKeys.STATE_META_SESSION_EXPIRY_KEY import net.corda.libs.configuration.SmartConfig import net.corda.libs.statemanager.api.IntervalFilter import net.corda.libs.statemanager.api.MetadataFilter import net.corda.libs.statemanager.api.Operation +import net.corda.libs.statemanager.api.STATE_TYPE import net.corda.libs.statemanager.api.StateManager -import net.corda.messaging.api.constants.MessagingMetadataKeys +import net.corda.messaging.api.constants.MessagingMetadataKeys.PROCESSING_FAILURE import net.corda.messaging.api.processor.DurableProcessor import net.corda.messaging.api.records.Record import net.corda.schema.Schemas.Flow.FLOW_TIMEOUT_TOPIC @@ -19,11 +21,13 @@ import org.slf4j.LoggerFactory import java.time.Instant /** - * Automatically scheduled by Corda for cleaning up timed out flows. A flow is timed out if and only if any of the - * following conditions is true: + * Automatically scheduled by Corda for cleaning up timed out flows. + * A flow will be picked up to be timed out if and only if any of the following conditions is true: * - Flow has at least one opened session that timed out. * - Flow hasn't been updated within the configured maximum idle time. * - Flow processing marked as failed by the messaging layer (key [MessagingMetadataKeys.PROCESSING_FAILURE] set). + * + * TODO - Execute a single State Manager API call once all filters are supported at once. */ class FlowTimeoutTaskProcessor( private val stateManager: StateManager, @@ -38,31 +42,38 @@ class FlowTimeoutTaskProcessor( override val valueClass = ScheduledTaskTrigger::class.java private val maxIdleTimeMilliseconds = config.getLong(FlowConfig.PROCESSING_MAX_IDLE_TIME) + private fun idleTimeOutExpired() = + // Flows that have not been updated in at least [maxIdleTimeMilliseconds] + stateManager.findUpdatedBetweenWithMetadataMatchingAll( + IntervalFilter( + Instant.EPOCH, + now().minusMillis(maxIdleTimeMilliseconds) + ), + listOf( + MetadataFilter(STATE_TYPE, Operation.Equals, Checkpoint::class.java.name), + ) + ) + + private fun sessionExpiredFailureSignaledByMessagingLayer() = + // Flows timed out by the messaging layer + sessions timed out + stateManager.findByMetadataMatchingAny( + listOf( + // Time out signaled by the messaging layer + MetadataFilter(PROCESSING_FAILURE, Operation.Equals, true), + // Session expired + MetadataFilter(STATE_META_SESSION_EXPIRY_KEY, Operation.LesserThan, now().epochSecond), + ) + ) + override fun onNext(events: List>): List> { // If we receive multiple, there's probably an issue somewhere, and we can ignore all but the last one. return events.lastOrNull { it.key == ScheduledTask.SCHEDULED_TASK_NAME_SESSION_TIMEOUT }?.value?.let { trigger -> logger.trace("Processing trigger scheduled at {}", trigger.timestamp) - // TODO - temporary query - // TODO - we must be able to limit by type of state - val flowsToTimeOut = - // Flows timed out by the messaging layer + sessions timed out - stateManager.findByMetadataMatchingAny( - listOf( - // Time out signaled by the messaging layer - MetadataFilter(MessagingMetadataKeys.PROCESSING_FAILURE, Operation.Equals, true), - // Session expired - MetadataFilter(STATE_META_SESSION_EXPIRY_KEY, Operation.LesserThan, now().epochSecond), - ) - ) + - // Flows that have not been updated in at least [maxIdleTime] seconds - stateManager.updatedBetween( - IntervalFilter( - Instant.EPOCH, - now().minusMillis(maxIdleTimeMilliseconds) - ) - ) + val flowsToTimeOut = (idleTimeOutExpired() + sessionExpiredFailureSignaledByMessagingLayer()).filter { + it.value.metadata.containsKeyWithValue(STATE_TYPE, Checkpoint::class.java.name) + } if (flowsToTimeOut.isEmpty()) { logger.trace("No flows to time out") diff --git a/components/flow/flow-service/src/test/kotlin/net/corda/flow/maintenance/FlowTimeoutTaskProcessorTests.kt b/components/flow/flow-service/src/test/kotlin/net/corda/flow/maintenance/FlowTimeoutTaskProcessorTests.kt index 54979a9ee24..8612001cfcd 100644 --- a/components/flow/flow-service/src/test/kotlin/net/corda/flow/maintenance/FlowTimeoutTaskProcessorTests.kt +++ b/components/flow/flow-service/src/test/kotlin/net/corda/flow/maintenance/FlowTimeoutTaskProcessorTests.kt @@ -1,10 +1,12 @@ package net.corda.flow.maintenance import net.corda.data.flow.FlowTimeout +import net.corda.data.flow.state.checkpoint.Checkpoint import net.corda.data.scheduler.ScheduledTaskTrigger import net.corda.flow.state.impl.CheckpointMetadataKeys.STATE_META_SESSION_EXPIRY_KEY import net.corda.libs.configuration.SmartConfig import net.corda.libs.statemanager.api.Metadata +import net.corda.libs.statemanager.api.STATE_TYPE import net.corda.libs.statemanager.api.State import net.corda.libs.statemanager.api.StateManager import net.corda.messaging.api.constants.MessagingMetadataKeys @@ -23,27 +25,42 @@ import java.time.Instant class FlowTimeoutTaskProcessorTests { private val now = Instant.now() + private val checkpointMetadata = STATE_TYPE to Checkpoint::class.java.name private val flowConfig = mock().apply { whenever(getLong(any())).thenReturn(10L) } - private val idleState = State("idle", randomBytes(), 0) + private val nonCheckpointState = State("not-a-checkpoint", randomBytes(), 0) + private val idleState = State("idle", randomBytes(), 0, Metadata(mapOf(checkpointMetadata))) private val sessionTimeoutState = State( "sessionTimeout", randomBytes(), 0, - Metadata(mapOf(STATE_META_SESSION_EXPIRY_KEY to now.minusSeconds(1).epochSecond.toInt())) + Metadata( + mapOf( + checkpointMetadata, + STATE_META_SESSION_EXPIRY_KEY to now.minusSeconds(1).epochSecond.toInt() + ) + ) ) private val messagingLayerTimeoutState = State( "messagingLayerTimeout", randomBytes(), 0, - Metadata(mapOf(MessagingMetadataKeys.PROCESSING_FAILURE to true)) + Metadata( + mapOf( + checkpointMetadata, + MessagingMetadataKeys.PROCESSING_FAILURE to true + ) + ) ) private val stateManager = mock { - on { updatedBetween(any()) } doReturn (mapOf(idleState.key to idleState)) on { findByMetadataMatchingAny(any()) } doReturn (mapOf( sessionTimeoutState.key to sessionTimeoutState, messagingLayerTimeoutState.key to messagingLayerTimeoutState )) + on { findUpdatedBetweenWithMetadataMatchingAll(any(), any()) } doReturn (mapOf( + idleState.key to idleState, + nonCheckpointState.key to nonCheckpointState + )) } private val record1 = Record( Schemas.ScheduledTask.SCHEDULED_TASK_NAME_SESSION_TIMEOUT, @@ -96,8 +113,12 @@ class FlowTimeoutTaskProcessorTests { @Test fun `when no states are found return empty list of clean up records`() { - whenever(stateManager.updatedBetween(any())).doReturn(emptyMap()) - whenever(stateManager.findByMetadataMatchingAny(any())).doReturn(emptyMap()) + whenever( + stateManager.findByMetadataMatchingAny(any()) + ).doReturn(mapOf(nonCheckpointState.key to nonCheckpointState)) + whenever( + stateManager.findUpdatedBetweenWithMetadataMatchingAll(any(), any()) + ).doReturn(mapOf(nonCheckpointState.key to nonCheckpointState)) val output = processor.onNext(listOf(record1)) assertThat(output).isEmpty() diff --git a/components/ledger/ledger-utxo-token-cache/src/main/kotlin/net/corda/ledger/utxo/token/cache/services/PerformanceClaimStateStoreImpl.kt b/components/ledger/ledger-utxo-token-cache/src/main/kotlin/net/corda/ledger/utxo/token/cache/services/PerformanceClaimStateStoreImpl.kt index 1388a6a4ed4..dc6655bd5c6 100644 --- a/components/ledger/ledger-utxo-token-cache/src/main/kotlin/net/corda/ledger/utxo/token/cache/services/PerformanceClaimStateStoreImpl.kt +++ b/components/ledger/ledger-utxo-token-cache/src/main/kotlin/net/corda/ledger/utxo/token/cache/services/PerformanceClaimStateStoreImpl.kt @@ -2,6 +2,8 @@ package net.corda.ledger.utxo.token.cache.services import net.corda.data.ledger.utxo.token.selection.state.TokenPoolCacheState import net.corda.ledger.utxo.token.cache.entities.TokenPoolKey +import net.corda.libs.statemanager.api.Metadata +import net.corda.libs.statemanager.api.STATE_TYPE import net.corda.libs.statemanager.api.State import net.corda.libs.statemanager.api.StateManager import net.corda.tracing.wrapWithTracingExecutor @@ -156,6 +158,7 @@ class PerformanceClaimStateStoreImpl( val newStoredState = State( key = tokenPoolKey.toString(), value = stateBytes, + metadata = Metadata(mapOf(STATE_TYPE to tokenPoolCacheState::class.java.name)), modifiedTime = clock.instant() ) diff --git a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/StateManagerHelper.kt b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/StateManagerHelper.kt index a118177c542..ad8730ff649 100644 --- a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/StateManagerHelper.kt +++ b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/StateManagerHelper.kt @@ -4,6 +4,7 @@ import net.corda.avro.serialization.CordaAvroDeserializer import net.corda.avro.serialization.CordaAvroSerializer import net.corda.data.messaging.mediator.MediatorState import net.corda.libs.statemanager.api.Metadata +import net.corda.libs.statemanager.api.STATE_TYPE import net.corda.libs.statemanager.api.State import net.corda.libs.statemanager.api.StateManager import net.corda.libs.statemanager.api.metadata @@ -37,11 +38,12 @@ class StateManagerHelper( mediatorState.state = ByteBuffer.wrap(serializedValue) val mediatorStateBytes = serializer.serialize(mediatorState) ?: throw IllegalStateException("Serialized mediator state was null. This should not be possible!") + val stateType = newState!!.value!!::class.java.name State( key, mediatorStateBytes, persistedState?.version ?: State.VERSION_INITIAL_VALUE, - mergeMetadata(persistedState?.metadata, newState?.metadata), + mergeMetadata(persistedState?.metadata, newState.metadata, stateType), ) } @@ -57,11 +59,11 @@ class StateManagerHelper( ) } - private fun mergeMetadata(existing: Metadata?, newMetadata: Metadata?): Metadata { + private fun mergeMetadata(existing: Metadata?, newMetadata: Metadata?, stateType: String): Metadata { val map = (existing ?: metadata()).toMutableMap() - newMetadata?.forEach { - map[it.key] = it.value - } + newMetadata?.forEach { map[it.key] = it.value } + map[STATE_TYPE] = stateType + return Metadata(map) } @@ -77,7 +79,7 @@ class StateManagerHelper( /** * Deserializes state value. * - * @param state State. + * @param mediatorState State. * @return Deserialized state value. */ fun deserializeValue(mediatorState: MediatorState?) = @@ -92,4 +94,4 @@ class StateManagerHelper( */ fun deserializeMediatorState(state: State?) = state?.value?.let { mediatorStateDeserializer.deserialize(it) } -} \ No newline at end of file +} diff --git a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/StateManagerHelperTest.kt b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/StateManagerHelperTest.kt index dd30b38e93b..1d0add1cba7 100644 --- a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/StateManagerHelperTest.kt +++ b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/StateManagerHelperTest.kt @@ -4,6 +4,7 @@ import net.corda.avro.serialization.CordaAvroDeserializer import net.corda.avro.serialization.CordaAvroSerializer import net.corda.data.messaging.mediator.MediatorState import net.corda.libs.statemanager.api.Metadata +import net.corda.libs.statemanager.api.STATE_TYPE import net.corda.libs.statemanager.api.State import net.corda.libs.statemanager.api.State.Companion.VERSION_INITIAL_VALUE import net.corda.messaging.api.constants.MessagingMetadataKeys.PROCESSING_FAILURE @@ -46,7 +47,6 @@ class StateManagerHelperTest { @Test fun `successfully creates new state`() { - val persistedState: State? = null val newState = StateAndEventProcessor.State( StateType(1), @@ -66,7 +66,7 @@ class StateManagerHelperTest { assertEquals(TEST_KEY, state!!.key) assertArrayEquals(serialized(mediatorState), state.value) assertEquals(VERSION_INITIAL_VALUE, state.version) - assertEquals(newState.metadata, state.metadata) + assertEquals(Metadata(mapOf(STATE_TYPE to StateType::class.java.name)), state.metadata) } @Test @@ -76,7 +76,7 @@ class StateManagerHelperTest { TEST_KEY, serialized(TEST_STATE_VALUE), stateVersion, - Metadata() + Metadata(mapOf(STATE_TYPE to StateType::class.java.simpleName)) ) val updatedState = StateAndEventProcessor.State( StateType(TEST_STATE_VALUE.id + 1), @@ -96,7 +96,7 @@ class StateManagerHelperTest { assertEquals(persistedState.key, state!!.key) assertArrayEquals(serialized(MediatorState(ByteBuffer.wrap(serialized(updatedState.value!!)), emptyList())), state.value) assertEquals(persistedState.version, state.version) - assertEquals(updatedState.metadata, state.metadata) + assertEquals(Metadata(mapOf(STATE_TYPE to StateType::class.java.name)), state.metadata) } @Test diff --git a/libs/state-manager/state-manager-api/src/main/kotlin/net/corda/libs/statemanager/api/Metadata.kt b/libs/state-manager/state-manager-api/src/main/kotlin/net/corda/libs/statemanager/api/Metadata.kt index 70746c6eb33..45ba23027f1 100644 --- a/libs/state-manager/state-manager-api/src/main/kotlin/net/corda/libs/statemanager/api/Metadata.kt +++ b/libs/state-manager/state-manager-api/src/main/kotlin/net/corda/libs/statemanager/api/Metadata.kt @@ -1,5 +1,12 @@ package net.corda.libs.statemanager.api +/** + * Metadata key used to store the actual State Type, if relevant. + * + * TODO-[CORE-16416]: remove once Isolated State Managers per State Type has been implemented. + */ +const val STATE_TYPE = "state.type" + /** * Map that allows only primitive types to be used as values. */ @@ -15,10 +22,12 @@ class Metadata( Boolean::class.java, java.lang.Boolean::class.java, ) + private fun isPrimitiveOrBoxedValue(value: Any): Boolean { return supportedType.any { it.isAssignableFrom(value.javaClass) } } } + init { map.filter { kvp -> !isPrimitiveOrBoxedValue(kvp.value) }.takeIf { it.isNotEmpty() }?.also { kvp -> val invalidPairs = kvp.entries.joinToString { "${it.key}/${it.value::class.java.name}" } @@ -40,6 +49,8 @@ class Metadata( override fun hashCode(): Int { return map.hashCode() } + + fun containsKeyWithValue(key: String, value: Any) = map.containsKey(key) && map[key]!! == value } fun metadata(): Metadata = Metadata()