Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CORE-19020 replay scenario handling #5440

Closed
wants to merge 38 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
c71c85b
CORE-19024 Add a new state manager api that allows for a state to be …
LWogan Jan 8, 2024
8d0304e
CORE-19024 detekt
LWogan Jan 8, 2024
380d9ba
Merge remote-tracking branch 'origin/release/os/5.2' into CORE-19024/…
LWogan Jan 8, 2024
50d0e87
CORE-19024 detekt
LWogan Jan 8, 2024
4ddcc82
CORE-19024 add additional stubs
LWogan Jan 8, 2024
5394a3d
Merge remote-tracking branch 'origin/release/os/5.2' into CORE-19024/…
LWogan Jan 8, 2024
0cbc168
CORE-19016 store async outputs
LWogan Jan 9, 2024
4d95306
CORE-19282 wrap the mediator subscriptions client state in a Mediator…
LWogan Jan 11, 2024
f8e8673
CORE-19282 detekt
LWogan Jan 11, 2024
f551f29
CORE-19282 typo
LWogan Jan 11, 2024
8830efc
CORE-19282 update api version
LWogan Jan 11, 2024
0e2c208
CORE-19282 fix unit tests
LWogan Jan 11, 2024
17e42af
CORE-19016 Store output events in the MediatorState
LWogan Jan 12, 2024
e9797b0
Merge branch 'CORE-19016/store-async-outputs' of https://github.com/c…
LWogan Jan 12, 2024
543f6d7
Merge remote-tracking branch 'origin/release/os/5.2' into CORE-19282/…
LWogan Jan 12, 2024
3a69978
CORE-19016 merge release
LWogan Jan 12, 2024
9fa4070
Merge branch 'CORE-19282/mediator-wrapper-state' into CORE-19016/stor…
LWogan Jan 12, 2024
25278b9
CORE-19016 merge release
LWogan Jan 12, 2024
79e12af
CORE-19016 merge release
LWogan Jan 12, 2024
540e927
CORE-19016 detekt
LWogan Jan 12, 2024
b939724
Merge branch 'CORE-19282/mediator-wrapper-state' into CORE-19016/stor…
LWogan Jan 12, 2024
f3b6e96
CORE-19016 detekt
LWogan Jan 12, 2024
ba1339c
CORE-19016 revert kdoc
LWogan Jan 12, 2024
fad15e5
CORE-19016 adjust types to be immutable and add unit tests
LWogan Jan 15, 2024
0eed229
Merge remote-tracking branch 'origin/release/os/5.2' into CORE-19282/…
LWogan Jan 15, 2024
0648db0
CORE-19282 merge from release
LWogan Jan 15, 2024
9bac482
Merge branch 'CORE-19282/mediator-wrapper-state' into CORE-19016/stor…
LWogan Jan 15, 2024
d974fd4
CORE-19017 detect when an input record is a replayed event based on d…
LWogan Jan 15, 2024
0f7d7ce
CORE-19017 fix hash function
LWogan Jan 15, 2024
e850c98
Merge branch 'CORE-19016/store-async-outputs' into CORE-19017/detect-…
LWogan Jan 15, 2024
012dbe9
CORE-19017 add unit tests
LWogan Jan 15, 2024
606f9b7
CORE-19017 detekt
LWogan Jan 15, 2024
593190b
Merge branch 'CORE-19016/store-async-outputs' into CORE-19017/detect-…
LWogan Jan 15, 2024
b8563eb
CORE-19017 detekt
LWogan Jan 15, 2024
06c5015
CORE-19017 update API to be more suitable
LWogan Jan 15, 2024
baca7a9
CORE-19017 typo.
LWogan Jan 15, 2024
82974d2
Merge branch 'CORE-19282/mediator-wrapper-state' into CORE-19016/stor…
LWogan Jan 15, 2024
c2507ab
Merge branch 'CORE-19016/store-async-outputs' into CORE-19017/detect-…
LWogan Jan 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ commonsLangVersion = 3.12.0
commonsTextVersion = 1.10.0
# Corda API libs revision (change in 4th digit indicates a breaking change)
# Change to 5.2.0.xx-SNAPSHOT to pick up maven local published copy
cordaApiVersion=5.2.0.27-beta+
cordaApiVersion=5.2.0.28-alpha-1705311495943

disruptorVersion=3.4.4
felixConfigAdminVersion=1.9.26
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import java.util.concurrent.atomic.AtomicBoolean
* process records.
* @property running When set to true, the mediator pattern has been started. When False the mediator pattern is closed or errorred.
*/
data class MediatorState (
data class MediatorSubscriptionState (
private val stopped: AtomicBoolean = AtomicBoolean(false),
val running: AtomicBoolean = AtomicBoolean(false)
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class MultiSourceEventMediatorImpl<K : Any, S : Any, E : Any>(
get() = lifecycleCoordinator.name

private val log = LoggerFactory.getLogger("${this.javaClass.name}-${config.name}")
private val mediatorState = MediatorState()
private val mediatorSubscriptionState = MediatorSubscriptionState()
private val consumerConfig = MediatorConsumerConfig(
config.messageProcessor.keyClass,
config.messageProcessor.eventValueClass,
Expand All @@ -38,15 +38,16 @@ class MultiSourceEventMediatorImpl<K : Any, S : Any, E : Any>(
}

private fun runMediator() {
mediatorState.running.set(true)
mediatorSubscriptionState.running.set(true)
val clients = mediatorComponentFactory.createClients(::onSerializationError)
val messageRouter = mediatorComponentFactory.createRouter(clients)

lifecycleCoordinator.updateStatus(LifecycleStatus.UP)

config.consumerFactories.map { consumerFactory ->
taskManager.executeLongRunningTask {
val consumerProcessor = mediatorComponentFactory.createConsumerProcessor(config, taskManager, messageRouter, mediatorState)
val consumerProcessor =
mediatorComponentFactory.createConsumerProcessor(config, taskManager, messageRouter, mediatorSubscriptionState)
consumerProcessor.processTopic(consumerFactory, consumerConfig)
}.exceptionally { exception ->
handleTaskException(exception)
Expand All @@ -56,20 +57,20 @@ class MultiSourceEventMediatorImpl<K : Any, S : Any, E : Any>(
}

clients.forEach { it.close() }
mediatorState.running.set(false)
mediatorSubscriptionState.running.set(false)
}

override fun close() {
log.debug("Closing multi-source event mediator")
mediatorState.stop()
while (mediatorState.running()) {
mediatorSubscriptionState.stop()
while (mediatorSubscriptionState.running()) {
sleep(100)
}
lifecycleCoordinator.close()
}

private fun handleTaskException(exception: Throwable): Unit {
mediatorState.stop()
mediatorSubscriptionState.stop()
lifecycleCoordinator.updateStatus(LifecycleStatus.ERROR, "Error: ${exception.message}")
log.error("${exception.message}. Closing Multi-Source Event Mediator.", exception)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,44 @@ package net.corda.messaging.mediator

import net.corda.avro.serialization.CordaAvroDeserializer
import net.corda.avro.serialization.CordaAvroSerializer
import net.corda.data.messaging.mediator.MediatorState
import net.corda.libs.statemanager.api.Metadata
import net.corda.libs.statemanager.api.State
import net.corda.libs.statemanager.api.StateManager
import net.corda.libs.statemanager.api.metadata
import net.corda.messaging.api.constants.MessagingMetadataKeys.PROCESSING_FAILURE
import net.corda.messaging.api.processor.StateAndEventProcessor
import java.nio.ByteBuffer

/**
* Helper for working with [StateManager], used by [MultiSourceEventMediatorImpl].
*/
class StateManagerHelper<S : Any>(
private val stateSerializer: CordaAvroSerializer<S>,
private val serializer: CordaAvroSerializer<Any>,
private val stateDeserializer: CordaAvroDeserializer<S>,
private val mediatorStateDeserializer: CordaAvroDeserializer<MediatorState>,
) {

/**
* Creates an updated [State] or a new one if there was no previous version.
*
* @param key Event's key.
* @param mediatorState Mediator wrapper state.
* @param persistedState State being updated.
* @param newState Updated state.
*/
fun createOrUpdateState(
key: String,
persistedState: State?,
mediatorState: MediatorState,
newState: StateAndEventProcessor.State<S>?,
) = serialize(newState?.value)?.let { serializedValue ->
mediatorState.state = ByteBuffer.wrap(serializedValue)
val mediatorStateBytes = serializer.serialize(mediatorState)
?: throw IllegalStateException("Serialized mediator state was null. This should not be possible!")
State(
key,
serializedValue,
mediatorStateBytes,
persistedState?.version ?: State.VERSION_INITIAL_VALUE,
mergeMetadata(persistedState?.metadata, newState?.metadata),
)
Expand Down Expand Up @@ -63,15 +71,25 @@ class StateManagerHelper<S : Any>(
* @param value State value.
* @return Serialized state value.
*/
private fun serialize(value: S?) =
value?.let { stateSerializer.serialize(it) }
private fun serialize(value: Any?) =
value?.let { serializer.serialize(it) }

/**
* Deserializes state value.
*
* @param state State.
* @return Deserialized state value.
*/
fun deserializeValue(state: State?) =
state?.value?.let { stateDeserializer.deserialize(it) }
fun deserializeValue(mediatorState: MediatorState?) =
mediatorState?.state?.let { stateDeserializer.deserialize(it.array()) }


/**
* Deserializes state value into the MediatorState.
*
* @param state State.
* @return Deserialized MediatorState.
*/
fun deserializeMediatorState(state: State?) =
state?.value?.let { mediatorStateDeserializer.deserialize(it) }
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ import net.corda.messaging.api.mediator.factory.MessageRouterFactory
import net.corda.messaging.api.mediator.factory.MessagingClientFactory
import net.corda.messaging.api.processor.StateAndEventProcessor
import net.corda.messaging.mediator.GroupAllocator
import net.corda.messaging.mediator.MediatorState
import net.corda.messaging.mediator.MediatorSubscriptionState
import net.corda.messaging.mediator.StateManagerHelper
import net.corda.messaging.mediator.processor.ConsumerProcessor
import net.corda.messaging.mediator.processor.EventProcessor
import net.corda.messaging.mediator.processor.MediatorReplayService
import net.corda.taskmanager.TaskManager

/**
Expand All @@ -27,7 +28,8 @@ class MediatorComponentFactory<K : Any, S : Any, E : Any>(
private val clientFactories: Collection<MessagingClientFactory>,
private val messageRouterFactory: MessageRouterFactory,
private val groupAllocator: GroupAllocator,
private val stateManagerHelper: StateManagerHelper<S>
private val stateManagerHelper: StateManagerHelper<S>,
private val mediatorReplayService: MediatorReplayService,
) {

/**
Expand Down Expand Up @@ -95,22 +97,22 @@ class MediatorComponentFactory<K : Any, S : Any, E : Any>(
* @param eventMediatorConfig contains details of the mediators config
* @param taskManager used to launch concurrent tasks
* @param messageRouter Required by messaging clients to route records to the correct destination
* @param mediatorState shared state to track the mediators processing status
* @param mediatorSubscriptionState shared state to track the mediators processing status
* @return A consumer processor
*/
fun createConsumerProcessor(
eventMediatorConfig: EventMediatorConfig<K, S, E>,
taskManager: TaskManager,
messageRouter: MessageRouter,
mediatorState: MediatorState,
mediatorSubscriptionState: MediatorSubscriptionState,
): ConsumerProcessor<K, S, E> {
val eventProcessor = EventProcessor(eventMediatorConfig, stateManagerHelper, messageRouter)
val eventProcessor = EventProcessor(eventMediatorConfig, stateManagerHelper, messageRouter, mediatorReplayService)
return ConsumerProcessor(
eventMediatorConfig,
groupAllocator,
taskManager,
messageRouter,
mediatorState,
mediatorSubscriptionState,
eventProcessor,
stateManagerHelper
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package net.corda.messaging.mediator.factory

import net.corda.avro.serialization.CordaAvroSerializationFactory
import net.corda.data.messaging.mediator.MediatorState
import net.corda.lifecycle.LifecycleCoordinator
import net.corda.lifecycle.LifecycleCoordinatorFactory
import net.corda.lifecycle.LifecycleCoordinatorName
Expand All @@ -10,6 +11,7 @@ import net.corda.messaging.api.mediator.factory.MultiSourceEventMediatorFactory
import net.corda.messaging.mediator.GroupAllocator
import net.corda.messaging.mediator.MultiSourceEventMediatorImpl
import net.corda.messaging.mediator.StateManagerHelper
import net.corda.messaging.mediator.processor.MediatorReplayService
import net.corda.taskmanager.TaskManagerFactory
import org.osgi.service.component.annotations.Activate
import org.osgi.service.component.annotations.Component
Expand All @@ -20,15 +22,17 @@ import java.util.UUID
class MultiSourceEventMediatorFactoryImpl(
private val cordaAvroSerializationFactory: CordaAvroSerializationFactory,
private val lifecycleCoordinatorFactory: LifecycleCoordinatorFactory,
private val taskManagerFactory: TaskManagerFactory
private val taskManagerFactory: TaskManagerFactory,
private val mediatorReplayService: MediatorReplayService
) : MultiSourceEventMediatorFactory {

@Activate
constructor(
@Reference(service = CordaAvroSerializationFactory::class) cordaAvroSerializationFactory: CordaAvroSerializationFactory,
@Reference(service = LifecycleCoordinatorFactory::class) lifecycleCoordinatorFactory: LifecycleCoordinatorFactory
@Reference(service = LifecycleCoordinatorFactory::class) lifecycleCoordinatorFactory: LifecycleCoordinatorFactory,
@Reference(service = MediatorReplayService::class) mediatorReplayService: MediatorReplayService
) : this(
cordaAvroSerializationFactory, lifecycleCoordinatorFactory, TaskManagerFactory.INSTANCE
cordaAvroSerializationFactory, lifecycleCoordinatorFactory, TaskManagerFactory.INSTANCE, mediatorReplayService
)

override fun <K : Any, S : Any, E : Any> create(
Expand All @@ -52,7 +56,8 @@ class MultiSourceEventMediatorFactoryImpl(
eventMediatorConfig.clientFactories,
eventMediatorConfig.messageRouterFactory,
GroupAllocator(),
stateManagerHelper
stateManagerHelper,
mediatorReplayService
)

private fun <E : Any, K : Any, S : Any> createLifecycleCoordinator(
Expand All @@ -68,12 +73,15 @@ class MultiSourceEventMediatorFactoryImpl(
private fun <E : Any, K : Any, S : Any> createStateManagerHelper(
eventMediatorConfig: EventMediatorConfig<K, S, E>
): StateManagerHelper<S> {
val stateSerializer = cordaAvroSerializationFactory.createAvroSerializer<S> { }
val stateSerializer = cordaAvroSerializationFactory.createAvroSerializer<Any> { }
val stateDeserializer = cordaAvroSerializationFactory.createAvroDeserializer(
{}, eventMediatorConfig.messageProcessor.stateValueClass
)
val mediatorWrapperDeserializer = cordaAvroSerializationFactory.createAvroDeserializer(
{}, MediatorState::class.java
)
return StateManagerHelper(
stateSerializer, stateDeserializer
stateSerializer, stateDeserializer, mediatorWrapperDeserializer
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import net.corda.messaging.api.mediator.config.MediatorConsumerConfig
import net.corda.messaging.api.mediator.factory.MediatorConsumerFactory
import net.corda.messaging.api.records.Record
import net.corda.messaging.mediator.GroupAllocator
import net.corda.messaging.mediator.MediatorState
import net.corda.messaging.mediator.MediatorSubscriptionState
import net.corda.messaging.mediator.MultiSourceEventMediatorImpl
import net.corda.messaging.mediator.StateManagerHelper
import net.corda.messaging.mediator.metrics.EventMediatorMetrics
Expand Down Expand Up @@ -41,7 +41,7 @@ class ConsumerProcessor<K : Any, S : Any, E : Any>(
private val groupAllocator: GroupAllocator,
private val taskManager: TaskManager,
private val messageRouter: MessageRouter,
private val mediatorState: MediatorState,
private val mediatorSubscriptionState: MediatorSubscriptionState,
private val eventProcessor: EventProcessor<K, S, E>,
private val stateManagerHelper: StateManagerHelper<S>
) {
Expand All @@ -65,7 +65,7 @@ class ConsumerProcessor<K : Any, S : Any, E : Any>(
fun processTopic(consumerFactory: MediatorConsumerFactory, consumerConfig: MediatorConsumerConfig<K, E>) {
var attempts = 0
var consumer: MediatorConsumer<K, E>? = null
while (!mediatorState.stopped()) {
while (!mediatorSubscriptionState.stopped()) {
attempts++
try {
if (consumer == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package net.corda.messaging.mediator.processor

import net.corda.data.messaging.mediator.MediatorState
import net.corda.libs.statemanager.api.State
import net.corda.messaging.api.exception.CordaMessageAPIIntermittentException
import net.corda.messaging.api.mediator.MediatorMessage
Expand All @@ -22,7 +23,8 @@ import net.corda.tracing.addTraceContextToRecord
class EventProcessor<K : Any, S : Any, E : Any>(
private val config: EventMediatorConfig<K, S, E>,
private val stateManagerHelper: StateManagerHelper<S>,
private val messageRouter: MessageRouter
private val messageRouter: MessageRouter,
private val mediatorReplayService: MediatorReplayService,
) {

/**
Expand All @@ -33,31 +35,38 @@ class EventProcessor<K : Any, S : Any, E : Any>(
fun processEvents(
group: Map<K, List<Record<K, E>>>,
retrievedStates: Map<String, State>
) : Map<K, EventProcessingOutput> {
): Map<K, EventProcessingOutput> {
return group.mapValues { groupEntry ->
val groupKey = groupEntry.key.toString()
val state = retrievedStates.getOrDefault(groupKey, null)
var processorState = stateManagerHelper.deserializeValue(state)?.let { stateValue ->
val mediatorState = stateManagerHelper.deserializeMediatorState(state) ?: createNewMediatorState()
var processorState = stateManagerHelper.deserializeValue(mediatorState)?.let { stateValue ->
StateAndEventProcessor.State(
stateValue,
state?.metadata
)
}
val asyncOutputs = mutableListOf<MediatorMessage<Any>>()
val queue = ArrayDeque(groupEntry.value)
val asyncOutputs = mutableMapOf<Record<K, E>, MutableList<MediatorMessage<Any>>>()
val allConsumerInputs = groupEntry.value
val processed = try {
while (queue.isNotEmpty()) {
val event = queue.removeFirst()
val response = config.messageProcessor.onNext(processorState, event)
processorState = response.updatedState
val (syncEvents, asyncEvents) = response.responseEvents.map { convertToMessage(it) }.partition {
messageRouter.getDestination(it).type == RoutingDestination.Type.SYNCHRONOUS
allConsumerInputs.onEach { consumerInputEvent ->
val queue = ArrayDeque(listOf(consumerInputEvent))
while (queue.isNotEmpty()) {
val event = queue.removeFirst()
val response = config.messageProcessor.onNext(processorState, event)
processorState = response.updatedState
val (syncEvents, asyncEvents) = response.responseEvents.map { convertToMessage(it) }.partition {
messageRouter.getDestination(it).type == RoutingDestination.Type.SYNCHRONOUS
}
asyncOutputs.compute(consumerInputEvent) { _, oldValue ->
(oldValue?.plus(asyncEvents) ?: asyncEvents).toMutableList()
}
val returnedMessages = processSyncEvents(groupEntry.key, syncEvents)
queue.addAll(returnedMessages)
}
asyncOutputs.addAll(asyncEvents)
val returnedMessages = processSyncEvents(groupEntry.key, syncEvents)
queue.addAll(returnedMessages)
}
stateManagerHelper.createOrUpdateState(groupKey, state, processorState)
mediatorState.outputEvents = mediatorReplayService.getOutputEvents(mediatorState.outputEvents, asyncOutputs)
stateManagerHelper.createOrUpdateState(groupKey, state, mediatorState, processorState)
} catch (e: CordaMessageAPIIntermittentException) {
// If an intermittent error occurs here, the RPC client has failed to deliver a message to another part
// of the system despite the retry loop implemented there. This should trigger individual processing to
Expand All @@ -72,19 +81,27 @@ class EventProcessor<K : Any, S : Any, E : Any>(
else -> StateChangeAndOperation.Noop
}

EventProcessingOutput(asyncOutputs, stateChangeAndOperation)
EventProcessingOutput(asyncOutputs.values.flatten(), stateChangeAndOperation)
}
}

private fun createNewMediatorState(): MediatorState {
return MediatorState.newBuilder()
.setState(null)
.setOutputEvents(mutableListOf())
.build()
}

/**
* Send any synchronous events immediately and feed results back onto the queue.
*/
private fun processSyncEvents(
key: K,
syncEvents: List<MediatorMessage<Any>>
) : List<Record<K, E>> {
): List<Record<K, E>> {
return syncEvents.mapNotNull { message ->
val destination = messageRouter.getDestination(message)

@Suppress("UNCHECKED_CAST")
val reply = with(destination) {
message.addProperty(MessagingClient.MSG_PROP_ENDPOINT, endpoint)
Expand Down
Loading