Skip to content

Commit

Permalink
CORE-19295: Clean Up Checkpoints Only (#5439)
Browse files Browse the repository at this point in the history
Fix "FlowTimeoutTaskProcessor" implementation so that only states
representing flow checkpoints are marked for deletion. Other types of
states should not be automatically deleted as that might cause issues,
especially when the persistent storage is shared  to store multiple
state types.
  • Loading branch information
jujoramos authored Jan 16, 2024
1 parent f86ee1c commit 4955c5f
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import net.corda.data.crypto.wire.ops.key.rotation.KeyRotationRequest
import net.corda.data.crypto.wire.ops.key.rotation.KeyRotationStatus
import net.corda.data.crypto.wire.ops.key.rotation.KeyType
import net.corda.libs.statemanager.api.Metadata
import net.corda.libs.statemanager.api.STATE_TYPE
import net.corda.libs.statemanager.api.State
import net.corda.libs.statemanager.api.StateManager
import net.corda.messaging.api.processor.DurableProcessor
Expand Down Expand Up @@ -107,8 +108,16 @@ class CryptoRekeyBusProcessor(
now
)

val flattend = checkNotNull(serializer.serialize(status))
stateManager?.create(listOf(State(request.requestId, flattend, 1, Metadata(), now)))
val flattened = checkNotNull(serializer.serialize(status))
stateManager?.create(
listOf(
State(
request.requestId,
flattened, 1,
Metadata(mapOf(STATE_TYPE to status::class.java.name))
)
)
)
}

return emptyList()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package net.corda.flow.maintenance

import net.corda.data.flow.FlowTimeout
import net.corda.data.flow.state.checkpoint.Checkpoint
import net.corda.data.scheduler.ScheduledTaskTrigger
import net.corda.flow.state.impl.CheckpointMetadataKeys.STATE_META_SESSION_EXPIRY_KEY
import net.corda.libs.configuration.SmartConfig
import net.corda.libs.statemanager.api.IntervalFilter
import net.corda.libs.statemanager.api.MetadataFilter
import net.corda.libs.statemanager.api.Operation
import net.corda.libs.statemanager.api.STATE_TYPE
import net.corda.libs.statemanager.api.StateManager
import net.corda.messaging.api.constants.MessagingMetadataKeys
import net.corda.messaging.api.constants.MessagingMetadataKeys.PROCESSING_FAILURE
import net.corda.messaging.api.processor.DurableProcessor
import net.corda.messaging.api.records.Record
import net.corda.schema.Schemas.Flow.FLOW_TIMEOUT_TOPIC
Expand All @@ -19,11 +21,13 @@ import org.slf4j.LoggerFactory
import java.time.Instant

/**
* Automatically scheduled by Corda for cleaning up timed out flows. A flow is timed out if and only if any of the
* following conditions is true:
* Automatically scheduled by Corda for cleaning up timed out flows.
* A flow will be picked up to be timed out if and only if any of the following conditions is true:
* - Flow has at least one opened session that timed out.
* - Flow hasn't been updated within the configured maximum idle time.
* - Flow processing marked as failed by the messaging layer (key [MessagingMetadataKeys.PROCESSING_FAILURE] set).
*
* TODO - Execute a single State Manager API call once all filters are supported at once.
*/
class FlowTimeoutTaskProcessor(
private val stateManager: StateManager,
Expand All @@ -38,31 +42,38 @@ class FlowTimeoutTaskProcessor(
override val valueClass = ScheduledTaskTrigger::class.java
private val maxIdleTimeMilliseconds = config.getLong(FlowConfig.PROCESSING_MAX_IDLE_TIME)

private fun idleTimeOutExpired() =
// Flows that have not been updated in at least [maxIdleTimeMilliseconds]
stateManager.findUpdatedBetweenWithMetadataMatchingAll(
IntervalFilter(
Instant.EPOCH,
now().minusMillis(maxIdleTimeMilliseconds)
),
listOf(
MetadataFilter(STATE_TYPE, Operation.Equals, Checkpoint::class.java.name),
)
)

private fun sessionExpiredFailureSignaledByMessagingLayer() =
// Flows timed out by the messaging layer + sessions timed out
stateManager.findByMetadataMatchingAny(
listOf(
// Time out signaled by the messaging layer
MetadataFilter(PROCESSING_FAILURE, Operation.Equals, true),
// Session expired
MetadataFilter(STATE_META_SESSION_EXPIRY_KEY, Operation.LesserThan, now().epochSecond),
)
)

override fun onNext(events: List<Record<String, ScheduledTaskTrigger>>): List<Record<*, *>> {
// If we receive multiple, there's probably an issue somewhere, and we can ignore all but the last one.
return events.lastOrNull {
it.key == ScheduledTask.SCHEDULED_TASK_NAME_SESSION_TIMEOUT
}?.value?.let { trigger ->
logger.trace("Processing trigger scheduled at {}", trigger.timestamp)
// TODO - temporary query
// TODO - we must be able to limit by type of state
val flowsToTimeOut =
// Flows timed out by the messaging layer + sessions timed out
stateManager.findByMetadataMatchingAny(
listOf(
// Time out signaled by the messaging layer
MetadataFilter(MessagingMetadataKeys.PROCESSING_FAILURE, Operation.Equals, true),
// Session expired
MetadataFilter(STATE_META_SESSION_EXPIRY_KEY, Operation.LesserThan, now().epochSecond),
)
) +
// Flows that have not been updated in at least [maxIdleTime] seconds
stateManager.updatedBetween(
IntervalFilter(
Instant.EPOCH,
now().minusMillis(maxIdleTimeMilliseconds)
)
)
val flowsToTimeOut = (idleTimeOutExpired() + sessionExpiredFailureSignaledByMessagingLayer()).filter {
it.value.metadata.containsKeyWithValue(STATE_TYPE, Checkpoint::class.java.name)
}

if (flowsToTimeOut.isEmpty()) {
logger.trace("No flows to time out")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package net.corda.flow.maintenance

import net.corda.data.flow.FlowTimeout
import net.corda.data.flow.state.checkpoint.Checkpoint
import net.corda.data.scheduler.ScheduledTaskTrigger
import net.corda.flow.state.impl.CheckpointMetadataKeys.STATE_META_SESSION_EXPIRY_KEY
import net.corda.libs.configuration.SmartConfig
import net.corda.libs.statemanager.api.Metadata
import net.corda.libs.statemanager.api.STATE_TYPE
import net.corda.libs.statemanager.api.State
import net.corda.libs.statemanager.api.StateManager
import net.corda.messaging.api.constants.MessagingMetadataKeys
Expand All @@ -23,27 +25,42 @@ import java.time.Instant

class FlowTimeoutTaskProcessorTests {
private val now = Instant.now()
private val checkpointMetadata = STATE_TYPE to Checkpoint::class.java.name
private val flowConfig = mock<SmartConfig>().apply {
whenever(getLong(any())).thenReturn(10L)
}

private val idleState = State("idle", randomBytes(), 0)
private val nonCheckpointState = State("not-a-checkpoint", randomBytes(), 0)
private val idleState = State("idle", randomBytes(), 0, Metadata(mapOf(checkpointMetadata)))
private val sessionTimeoutState =
State(
"sessionTimeout", randomBytes(), 0,
Metadata(mapOf(STATE_META_SESSION_EXPIRY_KEY to now.minusSeconds(1).epochSecond.toInt()))
Metadata(
mapOf(
checkpointMetadata,
STATE_META_SESSION_EXPIRY_KEY to now.minusSeconds(1).epochSecond.toInt()
)
)
)
private val messagingLayerTimeoutState =
State(
"messagingLayerTimeout", randomBytes(), 0,
Metadata(mapOf(MessagingMetadataKeys.PROCESSING_FAILURE to true))
Metadata(
mapOf(
checkpointMetadata,
MessagingMetadataKeys.PROCESSING_FAILURE to true
)
)
)
private val stateManager = mock<StateManager> {
on { updatedBetween(any()) } doReturn (mapOf(idleState.key to idleState))
on { findByMetadataMatchingAny(any()) } doReturn (mapOf(
sessionTimeoutState.key to sessionTimeoutState,
messagingLayerTimeoutState.key to messagingLayerTimeoutState
))
on { findUpdatedBetweenWithMetadataMatchingAll(any(), any()) } doReturn (mapOf(
idleState.key to idleState,
nonCheckpointState.key to nonCheckpointState
))
}
private val record1 = Record<String, ScheduledTaskTrigger>(
Schemas.ScheduledTask.SCHEDULED_TASK_NAME_SESSION_TIMEOUT,
Expand Down Expand Up @@ -96,8 +113,12 @@ class FlowTimeoutTaskProcessorTests {

@Test
fun `when no states are found return empty list of clean up records`() {
whenever(stateManager.updatedBetween(any())).doReturn(emptyMap())
whenever(stateManager.findByMetadataMatchingAny(any())).doReturn(emptyMap())
whenever(
stateManager.findByMetadataMatchingAny(any())
).doReturn(mapOf(nonCheckpointState.key to nonCheckpointState))
whenever(
stateManager.findUpdatedBetweenWithMetadataMatchingAll(any(), any())
).doReturn(mapOf(nonCheckpointState.key to nonCheckpointState))

val output = processor.onNext(listOf(record1))
assertThat(output).isEmpty()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package net.corda.ledger.utxo.token.cache.services

import net.corda.data.ledger.utxo.token.selection.state.TokenPoolCacheState
import net.corda.ledger.utxo.token.cache.entities.TokenPoolKey
import net.corda.libs.statemanager.api.Metadata
import net.corda.libs.statemanager.api.STATE_TYPE
import net.corda.libs.statemanager.api.State
import net.corda.libs.statemanager.api.StateManager
import net.corda.tracing.wrapWithTracingExecutor
Expand Down Expand Up @@ -156,6 +158,7 @@ class PerformanceClaimStateStoreImpl(
val newStoredState = State(
key = tokenPoolKey.toString(),
value = stateBytes,
metadata = Metadata(mapOf(STATE_TYPE to tokenPoolCacheState::class.java.name)),
modifiedTime = clock.instant()
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import net.corda.avro.serialization.CordaAvroDeserializer
import net.corda.avro.serialization.CordaAvroSerializer
import net.corda.data.messaging.mediator.MediatorState
import net.corda.libs.statemanager.api.Metadata
import net.corda.libs.statemanager.api.STATE_TYPE
import net.corda.libs.statemanager.api.State
import net.corda.libs.statemanager.api.StateManager
import net.corda.libs.statemanager.api.metadata
Expand Down Expand Up @@ -37,11 +38,12 @@ class StateManagerHelper<S : Any>(
mediatorState.state = ByteBuffer.wrap(serializedValue)
val mediatorStateBytes = serializer.serialize(mediatorState)
?: throw IllegalStateException("Serialized mediator state was null. This should not be possible!")
val stateType = newState!!.value!!::class.java.name
State(
key,
mediatorStateBytes,
persistedState?.version ?: State.VERSION_INITIAL_VALUE,
mergeMetadata(persistedState?.metadata, newState?.metadata),
mergeMetadata(persistedState?.metadata, newState.metadata, stateType),
)
}

Expand All @@ -57,11 +59,11 @@ class StateManagerHelper<S : Any>(
)
}

private fun mergeMetadata(existing: Metadata?, newMetadata: Metadata?): Metadata {
private fun mergeMetadata(existing: Metadata?, newMetadata: Metadata?, stateType: String): Metadata {
val map = (existing ?: metadata()).toMutableMap()
newMetadata?.forEach {
map[it.key] = it.value
}
newMetadata?.forEach { map[it.key] = it.value }
map[STATE_TYPE] = stateType

return Metadata(map)
}

Expand All @@ -77,7 +79,7 @@ class StateManagerHelper<S : Any>(
/**
* Deserializes state value.
*
* @param state State.
* @param mediatorState State.
* @return Deserialized state value.
*/
fun deserializeValue(mediatorState: MediatorState?) =
Expand All @@ -92,4 +94,4 @@ class StateManagerHelper<S : Any>(
*/
fun deserializeMediatorState(state: State?) =
state?.value?.let { mediatorStateDeserializer.deserialize(it) }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import net.corda.avro.serialization.CordaAvroDeserializer
import net.corda.avro.serialization.CordaAvroSerializer
import net.corda.data.messaging.mediator.MediatorState
import net.corda.libs.statemanager.api.Metadata
import net.corda.libs.statemanager.api.STATE_TYPE
import net.corda.libs.statemanager.api.State
import net.corda.libs.statemanager.api.State.Companion.VERSION_INITIAL_VALUE
import net.corda.messaging.api.constants.MessagingMetadataKeys.PROCESSING_FAILURE
Expand Down Expand Up @@ -46,7 +47,6 @@ class StateManagerHelperTest {

@Test
fun `successfully creates new state`() {

val persistedState: State? = null
val newState = StateAndEventProcessor.State(
StateType(1),
Expand All @@ -66,7 +66,7 @@ class StateManagerHelperTest {
assertEquals(TEST_KEY, state!!.key)
assertArrayEquals(serialized(mediatorState), state.value)
assertEquals(VERSION_INITIAL_VALUE, state.version)
assertEquals(newState.metadata, state.metadata)
assertEquals(Metadata(mapOf(STATE_TYPE to StateType::class.java.name)), state.metadata)
}

@Test
Expand All @@ -76,7 +76,7 @@ class StateManagerHelperTest {
TEST_KEY,
serialized(TEST_STATE_VALUE),
stateVersion,
Metadata()
Metadata(mapOf(STATE_TYPE to StateType::class.java.simpleName))
)
val updatedState = StateAndEventProcessor.State(
StateType(TEST_STATE_VALUE.id + 1),
Expand All @@ -96,7 +96,7 @@ class StateManagerHelperTest {
assertEquals(persistedState.key, state!!.key)
assertArrayEquals(serialized(MediatorState(ByteBuffer.wrap(serialized(updatedState.value!!)), emptyList())), state.value)
assertEquals(persistedState.version, state.version)
assertEquals(updatedState.metadata, state.metadata)
assertEquals(Metadata(mapOf(STATE_TYPE to StateType::class.java.name)), state.metadata)
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
package net.corda.libs.statemanager.api

/**
* Metadata key used to store the actual State Type, if relevant.
*
* TODO-[CORE-16416]: remove once Isolated State Managers per State Type has been implemented.
*/
const val STATE_TYPE = "state.type"

/**
* Map that allows only primitive types to be used as values.
*/
Expand All @@ -15,10 +22,12 @@ class Metadata(
Boolean::class.java,
java.lang.Boolean::class.java,
)

private fun isPrimitiveOrBoxedValue(value: Any): Boolean {
return supportedType.any { it.isAssignableFrom(value.javaClass) }
}
}

init {
map.filter { kvp -> !isPrimitiveOrBoxedValue(kvp.value) }.takeIf { it.isNotEmpty() }?.also { kvp ->
val invalidPairs = kvp.entries.joinToString { "${it.key}/${it.value::class.java.name}" }
Expand All @@ -40,6 +49,8 @@ class Metadata(
override fun hashCode(): Int {
return map.hashCode()
}

fun containsKeyWithValue(key: String, value: Any) = map.containsKey(key) && map[key]!! == value
}

fun metadata(): Metadata = Metadata()
Expand Down

0 comments on commit 4955c5f

Please sign in to comment.