Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CORE-18539: Improve Flow Clean Up #5399

Merged
merged 1 commit into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ interface FlowMaintenanceHandlersFactory {
* Create a handler for scheduled task triggers handling session timeout.
*
* @param stateManager The state manager the handler should use to retrieve states.
* @param config The flow configuration.
* @return A session timeout task processor.
*/
fun createScheduledTaskHandler(stateManager: StateManager): SessionTimeoutTaskProcessor
fun createScheduledTaskHandler(stateManager: StateManager, config: SmartConfig): FlowTimeoutTaskProcessor

/**
* Create a handler for session timeout events.
Expand All @@ -24,4 +25,4 @@ interface FlowMaintenanceHandlersFactory {
* @return A timeout event handler.
*/
fun createTimeoutEventHandler(stateManager: StateManager, config: SmartConfig): TimeoutEventCleanupProcessor
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ class FlowMaintenanceHandlersFactoryImpl @Activate constructor(

private val checkpointDeserializer = avroSerializationFactory.createAvroDeserializer({}, Checkpoint::class.java)

override fun createScheduledTaskHandler(stateManager: StateManager): SessionTimeoutTaskProcessor {
return SessionTimeoutTaskProcessor(stateManager)
override fun createScheduledTaskHandler(stateManager: StateManager, config: SmartConfig): FlowTimeoutTaskProcessor {
return FlowTimeoutTaskProcessor(stateManager, config)
}

override fun createTimeoutEventHandler(
Expand All @@ -40,4 +40,4 @@ class FlowMaintenanceHandlersFactoryImpl @Activate constructor(
config
)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class FlowMaintenanceImpl @Activate constructor(
"flow.maintenance.tasks",
Schemas.ScheduledTask.SCHEDULED_TASK_TOPIC_FLOW_PROCESSOR
),
flowMaintenanceHandlersFactory.createScheduledTaskHandler(stateManager!!),
flowMaintenanceHandlersFactory.createScheduledTaskHandler(stateManager!!, flowConfig),
messagingConfig,
null
)
Expand Down Expand Up @@ -94,8 +94,8 @@ class FlowMaintenanceImpl @Activate constructor(
config: Map<String, SmartConfig>,
messagingConfig: SmartConfig
) = config.getConfig(ConfigKeys.FLOW_CONFIG)
.withValue(MAX_ALLOWED_MSG_SIZE, ConfigValueFactory.fromAnyRef(messagingConfig.getLong(MAX_ALLOWED_MSG_SIZE)))
.withValue(PROCESSOR_TIMEOUT, ConfigValueFactory.fromAnyRef(messagingConfig.getLong(PROCESSOR_TIMEOUT)))
.withValue(MAX_ALLOWED_MSG_SIZE, ConfigValueFactory.fromAnyRef(messagingConfig.getLong(MAX_ALLOWED_MSG_SIZE)))

override val isRunning: Boolean
get() = coordinator.isRunning
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package net.corda.flow.maintenance

import net.corda.data.flow.FlowTimeout
import net.corda.data.scheduler.ScheduledTaskTrigger
import net.corda.flow.state.impl.CheckpointMetadataKeys.STATE_META_SESSION_EXPIRY_KEY
import net.corda.libs.configuration.SmartConfig
import net.corda.libs.statemanager.api.IntervalFilter
import net.corda.libs.statemanager.api.MetadataFilter
import net.corda.libs.statemanager.api.Operation
import net.corda.libs.statemanager.api.StateManager
import net.corda.messaging.api.constants.MessagingMetadataKeys
import net.corda.messaging.api.processor.DurableProcessor
import net.corda.messaging.api.records.Record
import net.corda.schema.Schemas.Flow.FLOW_TIMEOUT_TOPIC
import net.corda.schema.Schemas.ScheduledTask
import net.corda.schema.configuration.FlowConfig
import net.corda.utilities.debug
import org.slf4j.LoggerFactory
import java.time.Instant

/**
* Automatically scheduled by Corda for cleaning up timed out flows. A flow is timed out if and only if any of the
* following conditions is true:
* - Flow has at least one opened session that timed out.
* - Flow hasn't been updated within the configured maximum idle time.
* - Flow processing marked as failed by the messaging layer (key [MessagingMetadataKeys.PROCESSING_FAILURE] set).
*/
class FlowTimeoutTaskProcessor(
private val stateManager: StateManager,
config: SmartConfig,
private val now: () -> Instant = Instant::now
) : DurableProcessor<String, ScheduledTaskTrigger> {
companion object {
private val logger = LoggerFactory.getLogger(FlowTimeoutTaskProcessor::class.java)
}

override val keyClass = String::class.java
override val valueClass = ScheduledTaskTrigger::class.java
private val maxIdleTimeMilliseconds = config.getLong(FlowConfig.PROCESSING_MAX_IDLE_TIME)

override fun onNext(events: List<Record<String, ScheduledTaskTrigger>>): List<Record<*, *>> {
// If we receive multiple, there's probably an issue somewhere, and we can ignore all but the last one.
return events.lastOrNull {
it.key == ScheduledTask.SCHEDULED_TASK_NAME_SESSION_TIMEOUT
}?.value?.let { trigger ->
logger.trace("Processing trigger scheduled at {}", trigger.timestamp)
// TODO - temporary query
// TODO - we must be able to limit by type of state
jujoramos marked this conversation as resolved.
Show resolved Hide resolved
val flowsToTimeOut =
// Flows timed out by the messaging layer + sessions timed out
stateManager.findByMetadataMatchingAny(
listOf(
// Time out signaled by the messaging layer
MetadataFilter(MessagingMetadataKeys.PROCESSING_FAILURE, Operation.Equals, true),
// Session expired
MetadataFilter(STATE_META_SESSION_EXPIRY_KEY, Operation.LesserThan, now().epochSecond),
)
) +
// Flows that have not been updated in at least [maxIdleTime] seconds
stateManager.updatedBetween(
IntervalFilter(
Instant.MIN,
now().minusMillis(maxIdleTimeMilliseconds)
)
)

if (flowsToTimeOut.isEmpty()) {
logger.trace("No flows to time out")
emptyList()
} else {
logger.debug { "Trigger cleanup of $flowsToTimeOut" }
flowsToTimeOut.map { kvp ->
Record(FLOW_TIMEOUT_TOPIC, kvp.key, FlowTimeout(kvp.value.key, now()))
}
}
} ?: emptyList()
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ class TimeoutEventCleanupProcessor(
}.toMap()
if (statesToRecords.size < events.size) {
logger.info(
"Could not process ${events.size - statesToRecords.size} events for flow session timeout cleanup as the " +
"checkpoint did not deserialize cleanly."
"Could not process ${events.size - statesToRecords.size} events for flow timeout cleanup as the " +
"checkpoint did not deserialize cleanly."
)
}
val undeletedStates = stateManager.delete(statesToRecords.keys)
if (undeletedStates.isNotEmpty()) {
logger.info("Failed to delete ${undeletedStates.size} checkpoints when handling flow session timeout.")
logger.info("Failed to delete ${undeletedStates.size} checkpoints when handling flow timeout.")
}
val records = statesToRecords.filterKeys { !undeletedStates.containsKey(it.key) }.map {
it.value
Expand All @@ -60,4 +60,4 @@ class TimeoutEventCleanupProcessor(

override val keyClass = String::class.java
override val valueClass = FlowTimeout::class.java
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import net.corda.messaging.api.subscription.Subscription
import net.corda.messaging.api.subscription.factory.SubscriptionFactory
import net.corda.schema.Schemas
import net.corda.schema.configuration.ConfigKeys
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.mockito.internal.verification.Times
import org.mockito.kotlin.any
Expand All @@ -37,41 +38,42 @@ class FlowMaintenanceImplTests {
private val stateManagerFactory = mock<StateManagerFactory> {
on { create(any()) } doReturn (stateManager)
}

private val subscription = mock<Subscription<String, ScheduledTaskTrigger>>()
private val timeoutSubscription = mock<Subscription<String, FlowTimeout>>()
private val scheduledTaskSubscription = mock<Subscription<String, ScheduledTaskTrigger>>()

private val lifecycleCoordinator = mock<LifecycleCoordinator> {
val captor = argumentCaptor<() -> Resource>()
on { createManagedResource(any(), captor.capture()) } doAnswer { captor.lastValue.invoke() }
}
private val lifecycleCoordinatorFactory = mock<LifecycleCoordinatorFactory> {
on { createCoordinator(any(), any()) } doReturn (lifecycleCoordinator)
}

private val subscriptionFactory = mock<SubscriptionFactory> {
on { createDurableSubscription(any(), any<SessionTimeoutTaskProcessor>(), any(), anyOrNull()) } doReturn(subscription)
on { createDurableSubscription(any(), any<TimeoutEventCleanupProcessor>(), any(), anyOrNull()) } doReturn (timeoutSubscription)
on {
createDurableSubscription(any(), any<FlowTimeoutTaskProcessor>(), any(), anyOrNull())
} doReturn (scheduledTaskSubscription)
on {
createDurableSubscription(any(), any<TimeoutEventCleanupProcessor>(), any(), anyOrNull())
} doReturn (timeoutSubscription)
}

private val messagingConfig = mock<SmartConfig>().apply {
whenever(getLong(any())).thenReturn(100L)
}
private val stateManagerConfig = mock<SmartConfig>()
private val flowConfig = mock<SmartConfig>().apply {
whenever(getLong(any())).thenReturn(10L)
whenever(withValue(any(), any())).thenReturn(this)
}

private val config = mapOf(
ConfigKeys.FLOW_CONFIG to flowConfig,
ConfigKeys.MESSAGING_CONFIG to messagingConfig,
ConfigKeys.STATE_MANAGER_CONFIG to stateManagerConfig,
ConfigKeys.FLOW_CONFIG to flowConfig
)

private val flowMaintenanceHandlersFactory = mock<FlowMaintenanceHandlersFactory> {
on { createScheduledTaskHandler(any()) } doReturn (SessionTimeoutTaskProcessor(stateManager))
on { createTimeoutEventHandler(any(), any()) } doReturn (
TimeoutEventCleanupProcessor(mock(), stateManager, mock(), mock(), flowConfig)
)
}
private val flowMaintenanceHandlersFactory = mock<FlowMaintenanceHandlersFactory>()

private val flowMaintenance = FlowMaintenanceImpl(
lifecycleCoordinatorFactory,
Expand All @@ -80,6 +82,15 @@ class FlowMaintenanceImplTests {
flowMaintenanceHandlersFactory
)

@BeforeEach
fun setUp() {
doReturn(FlowTimeoutTaskProcessor(stateManager, flowConfig))
.whenever(flowMaintenanceHandlersFactory).createScheduledTaskHandler(any(), any())

doReturn(TimeoutEventCleanupProcessor(mock(), stateManager, mock(), mock(), flowConfig))
.whenever(flowMaintenanceHandlersFactory).createTimeoutEventHandler(any(), any())
}

@Test
fun `when config provided create subscription and start it`() {
flowMaintenance.onConfigChange(config)
Expand All @@ -89,7 +100,7 @@ class FlowMaintenanceImplTests {
argThat { it ->
it.eventTopic == Schemas.ScheduledTask.SCHEDULED_TASK_TOPIC_FLOW_PROCESSOR
},
any<SessionTimeoutTaskProcessor>(),
any<FlowTimeoutTaskProcessor>(),
eq(messagingConfig),
isNull()
)
Expand All @@ -103,7 +114,7 @@ class FlowMaintenanceImplTests {
)
verify(stateManagerFactory).create(stateManagerConfig)
verify(stateManager).start()
verify(subscription).start()
verify(scheduledTaskSubscription).start()
verify(timeoutSubscription).start()
verify(lifecycleCoordinator).followStatusChangesByName(setOf(stateManager.name))
}
Expand Down
Loading