diff --git a/gradle.properties b/gradle.properties index 71d70aefa09..2bbdb1f8f71 100644 --- a/gradle.properties +++ b/gradle.properties @@ -42,7 +42,7 @@ commonsLangVersion = 3.12.0 commonsTextVersion = 1.10.0 # Corda API libs revision (change in 4th digit indicates a breaking change) # Change to 5.2.0.xx-SNAPSHOT to pick up maven local published copy -cordaApiVersion=5.2.0.27-beta+ +cordaApiVersion=5.2.0.28-alpha-1705311495943 disruptorVersion=3.4.4 felixConfigAdminVersion=1.9.26 diff --git a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MediatorState.kt b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MediatorSubscriptionState.kt similarity index 94% rename from libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MediatorState.kt rename to libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MediatorSubscriptionState.kt index bc9ee158d4c..a2b1736e6f7 100644 --- a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MediatorState.kt +++ b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MediatorSubscriptionState.kt @@ -8,7 +8,7 @@ import java.util.concurrent.atomic.AtomicBoolean * process records. * @property running When set to true, the mediator pattern has been started. When False the mediator pattern is closed or errorred. */ -data class MediatorState ( +data class MediatorSubscriptionState ( private val stopped: AtomicBoolean = AtomicBoolean(false), val running: AtomicBoolean = AtomicBoolean(false) ) { diff --git a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImpl.kt b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImpl.kt index f3a9e801e7e..ee2eb8cd438 100644 --- a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImpl.kt +++ b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImpl.kt @@ -24,7 +24,7 @@ class MultiSourceEventMediatorImpl( get() = lifecycleCoordinator.name private val log = LoggerFactory.getLogger("${this.javaClass.name}-${config.name}") - private val mediatorState = MediatorState() + private val mediatorSubscriptionState = MediatorSubscriptionState() private val consumerConfig = MediatorConsumerConfig( config.messageProcessor.keyClass, config.messageProcessor.eventValueClass, @@ -38,7 +38,7 @@ class MultiSourceEventMediatorImpl( } private fun runMediator() { - mediatorState.running.set(true) + mediatorSubscriptionState.running.set(true) val clients = mediatorComponentFactory.createClients(::onSerializationError) val messageRouter = mediatorComponentFactory.createRouter(clients) @@ -46,7 +46,8 @@ class MultiSourceEventMediatorImpl( config.consumerFactories.map { consumerFactory -> taskManager.executeLongRunningTask { - val consumerProcessor = mediatorComponentFactory.createConsumerProcessor(config, taskManager, messageRouter, mediatorState) + val consumerProcessor = + mediatorComponentFactory.createConsumerProcessor(config, taskManager, messageRouter, mediatorSubscriptionState) consumerProcessor.processTopic(consumerFactory, consumerConfig) }.exceptionally { exception -> handleTaskException(exception) @@ -56,20 +57,20 @@ class MultiSourceEventMediatorImpl( } clients.forEach { it.close() } - mediatorState.running.set(false) + mediatorSubscriptionState.running.set(false) } override fun close() { log.debug("Closing multi-source event mediator") - mediatorState.stop() - while (mediatorState.running()) { + mediatorSubscriptionState.stop() + while (mediatorSubscriptionState.running()) { sleep(100) } lifecycleCoordinator.close() } private fun handleTaskException(exception: Throwable): Unit { - mediatorState.stop() + mediatorSubscriptionState.stop() lifecycleCoordinator.updateStatus(LifecycleStatus.ERROR, "Error: ${exception.message}") log.error("${exception.message}. Closing Multi-Source Event Mediator.", exception) } diff --git a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/StateManagerHelper.kt b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/StateManagerHelper.kt index 01b67ac6ee5..a118177c542 100644 --- a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/StateManagerHelper.kt +++ b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/StateManagerHelper.kt @@ -2,36 +2,44 @@ package net.corda.messaging.mediator import net.corda.avro.serialization.CordaAvroDeserializer import net.corda.avro.serialization.CordaAvroSerializer +import net.corda.data.messaging.mediator.MediatorState import net.corda.libs.statemanager.api.Metadata import net.corda.libs.statemanager.api.State import net.corda.libs.statemanager.api.StateManager import net.corda.libs.statemanager.api.metadata import net.corda.messaging.api.constants.MessagingMetadataKeys.PROCESSING_FAILURE import net.corda.messaging.api.processor.StateAndEventProcessor +import java.nio.ByteBuffer /** * Helper for working with [StateManager], used by [MultiSourceEventMediatorImpl]. */ class StateManagerHelper( - private val stateSerializer: CordaAvroSerializer, + private val serializer: CordaAvroSerializer, private val stateDeserializer: CordaAvroDeserializer, + private val mediatorStateDeserializer: CordaAvroDeserializer, ) { /** * Creates an updated [State] or a new one if there was no previous version. * * @param key Event's key. + * @param mediatorState Mediator wrapper state. * @param persistedState State being updated. * @param newState Updated state. */ fun createOrUpdateState( key: String, persistedState: State?, + mediatorState: MediatorState, newState: StateAndEventProcessor.State?, ) = serialize(newState?.value)?.let { serializedValue -> + mediatorState.state = ByteBuffer.wrap(serializedValue) + val mediatorStateBytes = serializer.serialize(mediatorState) + ?: throw IllegalStateException("Serialized mediator state was null. This should not be possible!") State( key, - serializedValue, + mediatorStateBytes, persistedState?.version ?: State.VERSION_INITIAL_VALUE, mergeMetadata(persistedState?.metadata, newState?.metadata), ) @@ -63,8 +71,8 @@ class StateManagerHelper( * @param value State value. * @return Serialized state value. */ - private fun serialize(value: S?) = - value?.let { stateSerializer.serialize(it) } + private fun serialize(value: Any?) = + value?.let { serializer.serialize(it) } /** * Deserializes state value. @@ -72,6 +80,16 @@ class StateManagerHelper( * @param state State. * @return Deserialized state value. */ - fun deserializeValue(state: State?) = - state?.value?.let { stateDeserializer.deserialize(it) } + fun deserializeValue(mediatorState: MediatorState?) = + mediatorState?.state?.let { stateDeserializer.deserialize(it.array()) } + + + /** + * Deserializes state value into the MediatorState. + * + * @param state State. + * @return Deserialized MediatorState. + */ + fun deserializeMediatorState(state: State?) = + state?.value?.let { mediatorStateDeserializer.deserialize(it) } } \ No newline at end of file diff --git a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/factory/MediatorComponentFactory.kt b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/factory/MediatorComponentFactory.kt index 03b68b968fd..d0519a4184b 100644 --- a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/factory/MediatorComponentFactory.kt +++ b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/factory/MediatorComponentFactory.kt @@ -11,10 +11,11 @@ import net.corda.messaging.api.mediator.factory.MessageRouterFactory import net.corda.messaging.api.mediator.factory.MessagingClientFactory import net.corda.messaging.api.processor.StateAndEventProcessor import net.corda.messaging.mediator.GroupAllocator -import net.corda.messaging.mediator.MediatorState +import net.corda.messaging.mediator.MediatorSubscriptionState import net.corda.messaging.mediator.StateManagerHelper import net.corda.messaging.mediator.processor.ConsumerProcessor import net.corda.messaging.mediator.processor.EventProcessor +import net.corda.messaging.mediator.processor.MediatorReplayService import net.corda.taskmanager.TaskManager /** @@ -27,7 +28,8 @@ class MediatorComponentFactory( private val clientFactories: Collection, private val messageRouterFactory: MessageRouterFactory, private val groupAllocator: GroupAllocator, - private val stateManagerHelper: StateManagerHelper + private val stateManagerHelper: StateManagerHelper, + private val mediatorReplayService: MediatorReplayService, ) { /** @@ -95,22 +97,22 @@ class MediatorComponentFactory( * @param eventMediatorConfig contains details of the mediators config * @param taskManager used to launch concurrent tasks * @param messageRouter Required by messaging clients to route records to the correct destination - * @param mediatorState shared state to track the mediators processing status + * @param mediatorSubscriptionState shared state to track the mediators processing status * @return A consumer processor */ fun createConsumerProcessor( eventMediatorConfig: EventMediatorConfig, taskManager: TaskManager, messageRouter: MessageRouter, - mediatorState: MediatorState, + mediatorSubscriptionState: MediatorSubscriptionState, ): ConsumerProcessor { - val eventProcessor = EventProcessor(eventMediatorConfig, stateManagerHelper, messageRouter) + val eventProcessor = EventProcessor(eventMediatorConfig, stateManagerHelper, messageRouter, mediatorReplayService) return ConsumerProcessor( eventMediatorConfig, groupAllocator, taskManager, messageRouter, - mediatorState, + mediatorSubscriptionState, eventProcessor, stateManagerHelper ) diff --git a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/factory/MultiSourceEventMediatorFactoryImpl.kt b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/factory/MultiSourceEventMediatorFactoryImpl.kt index 1ca471181d7..d02c093a809 100644 --- a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/factory/MultiSourceEventMediatorFactoryImpl.kt +++ b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/factory/MultiSourceEventMediatorFactoryImpl.kt @@ -1,6 +1,7 @@ package net.corda.messaging.mediator.factory import net.corda.avro.serialization.CordaAvroSerializationFactory +import net.corda.data.messaging.mediator.MediatorState import net.corda.lifecycle.LifecycleCoordinator import net.corda.lifecycle.LifecycleCoordinatorFactory import net.corda.lifecycle.LifecycleCoordinatorName @@ -10,6 +11,7 @@ import net.corda.messaging.api.mediator.factory.MultiSourceEventMediatorFactory import net.corda.messaging.mediator.GroupAllocator import net.corda.messaging.mediator.MultiSourceEventMediatorImpl import net.corda.messaging.mediator.StateManagerHelper +import net.corda.messaging.mediator.processor.MediatorReplayService import net.corda.taskmanager.TaskManagerFactory import org.osgi.service.component.annotations.Activate import org.osgi.service.component.annotations.Component @@ -20,15 +22,17 @@ import java.util.UUID class MultiSourceEventMediatorFactoryImpl( private val cordaAvroSerializationFactory: CordaAvroSerializationFactory, private val lifecycleCoordinatorFactory: LifecycleCoordinatorFactory, - private val taskManagerFactory: TaskManagerFactory + private val taskManagerFactory: TaskManagerFactory, + private val mediatorReplayService: MediatorReplayService ) : MultiSourceEventMediatorFactory { @Activate constructor( @Reference(service = CordaAvroSerializationFactory::class) cordaAvroSerializationFactory: CordaAvroSerializationFactory, - @Reference(service = LifecycleCoordinatorFactory::class) lifecycleCoordinatorFactory: LifecycleCoordinatorFactory + @Reference(service = LifecycleCoordinatorFactory::class) lifecycleCoordinatorFactory: LifecycleCoordinatorFactory, + @Reference(service = MediatorReplayService::class) mediatorReplayService: MediatorReplayService ) : this( - cordaAvroSerializationFactory, lifecycleCoordinatorFactory, TaskManagerFactory.INSTANCE + cordaAvroSerializationFactory, lifecycleCoordinatorFactory, TaskManagerFactory.INSTANCE, mediatorReplayService ) override fun create( @@ -52,7 +56,8 @@ class MultiSourceEventMediatorFactoryImpl( eventMediatorConfig.clientFactories, eventMediatorConfig.messageRouterFactory, GroupAllocator(), - stateManagerHelper + stateManagerHelper, + mediatorReplayService ) private fun createLifecycleCoordinator( @@ -68,12 +73,15 @@ class MultiSourceEventMediatorFactoryImpl( private fun createStateManagerHelper( eventMediatorConfig: EventMediatorConfig ): StateManagerHelper { - val stateSerializer = cordaAvroSerializationFactory.createAvroSerializer { } + val stateSerializer = cordaAvroSerializationFactory.createAvroSerializer { } val stateDeserializer = cordaAvroSerializationFactory.createAvroDeserializer( {}, eventMediatorConfig.messageProcessor.stateValueClass ) + val mediatorWrapperDeserializer = cordaAvroSerializationFactory.createAvroDeserializer( + {}, MediatorState::class.java + ) return StateManagerHelper( - stateSerializer, stateDeserializer + stateSerializer, stateDeserializer, mediatorWrapperDeserializer ) } } \ No newline at end of file diff --git a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/ConsumerProcessor.kt b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/ConsumerProcessor.kt index d00c74071ee..c97e1e81b52 100644 --- a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/ConsumerProcessor.kt +++ b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/ConsumerProcessor.kt @@ -10,7 +10,7 @@ import net.corda.messaging.api.mediator.config.MediatorConsumerConfig import net.corda.messaging.api.mediator.factory.MediatorConsumerFactory import net.corda.messaging.api.records.Record import net.corda.messaging.mediator.GroupAllocator -import net.corda.messaging.mediator.MediatorState +import net.corda.messaging.mediator.MediatorSubscriptionState import net.corda.messaging.mediator.MultiSourceEventMediatorImpl import net.corda.messaging.mediator.StateManagerHelper import net.corda.messaging.mediator.metrics.EventMediatorMetrics @@ -41,7 +41,7 @@ class ConsumerProcessor( private val groupAllocator: GroupAllocator, private val taskManager: TaskManager, private val messageRouter: MessageRouter, - private val mediatorState: MediatorState, + private val mediatorSubscriptionState: MediatorSubscriptionState, private val eventProcessor: EventProcessor, private val stateManagerHelper: StateManagerHelper ) { @@ -65,7 +65,7 @@ class ConsumerProcessor( fun processTopic(consumerFactory: MediatorConsumerFactory, consumerConfig: MediatorConsumerConfig) { var attempts = 0 var consumer: MediatorConsumer? = null - while (!mediatorState.stopped()) { + while (!mediatorSubscriptionState.stopped()) { attempts++ try { if (consumer == null) { diff --git a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/EventProcessor.kt b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/EventProcessor.kt index 7b6f2e1bd5d..4e5405ab8c3 100644 --- a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/EventProcessor.kt +++ b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/EventProcessor.kt @@ -1,5 +1,6 @@ package net.corda.messaging.mediator.processor +import net.corda.data.messaging.mediator.MediatorState import net.corda.libs.statemanager.api.State import net.corda.messaging.api.exception.CordaMessageAPIIntermittentException import net.corda.messaging.api.mediator.MediatorMessage @@ -22,7 +23,8 @@ import net.corda.tracing.addTraceContextToRecord class EventProcessor( private val config: EventMediatorConfig, private val stateManagerHelper: StateManagerHelper, - private val messageRouter: MessageRouter + private val messageRouter: MessageRouter, + private val mediatorReplayService: MediatorReplayService, ) { /** @@ -33,31 +35,38 @@ class EventProcessor( fun processEvents( group: Map>>, retrievedStates: Map - ) : Map { + ): Map { return group.mapValues { groupEntry -> val groupKey = groupEntry.key.toString() val state = retrievedStates.getOrDefault(groupKey, null) - var processorState = stateManagerHelper.deserializeValue(state)?.let { stateValue -> + val mediatorState = stateManagerHelper.deserializeMediatorState(state) ?: createNewMediatorState() + var processorState = stateManagerHelper.deserializeValue(mediatorState)?.let { stateValue -> StateAndEventProcessor.State( stateValue, state?.metadata ) } - val asyncOutputs = mutableListOf>() - val queue = ArrayDeque(groupEntry.value) + val asyncOutputs = mutableMapOf, MutableList>>() + val allConsumerInputs = groupEntry.value val processed = try { - while (queue.isNotEmpty()) { - val event = queue.removeFirst() - val response = config.messageProcessor.onNext(processorState, event) - processorState = response.updatedState - val (syncEvents, asyncEvents) = response.responseEvents.map { convertToMessage(it) }.partition { - messageRouter.getDestination(it).type == RoutingDestination.Type.SYNCHRONOUS + allConsumerInputs.onEach { consumerInputEvent -> + val queue = ArrayDeque(listOf(consumerInputEvent)) + while (queue.isNotEmpty()) { + val event = queue.removeFirst() + val response = config.messageProcessor.onNext(processorState, event) + processorState = response.updatedState + val (syncEvents, asyncEvents) = response.responseEvents.map { convertToMessage(it) }.partition { + messageRouter.getDestination(it).type == RoutingDestination.Type.SYNCHRONOUS + } + asyncOutputs.compute(consumerInputEvent) { _, oldValue -> + (oldValue?.plus(asyncEvents) ?: asyncEvents).toMutableList() + } + val returnedMessages = processSyncEvents(groupEntry.key, syncEvents) + queue.addAll(returnedMessages) } - asyncOutputs.addAll(asyncEvents) - val returnedMessages = processSyncEvents(groupEntry.key, syncEvents) - queue.addAll(returnedMessages) } - stateManagerHelper.createOrUpdateState(groupKey, state, processorState) + mediatorState.outputEvents = mediatorReplayService.getOutputEvents(mediatorState.outputEvents, asyncOutputs) + stateManagerHelper.createOrUpdateState(groupKey, state, mediatorState, processorState) } catch (e: CordaMessageAPIIntermittentException) { // If an intermittent error occurs here, the RPC client has failed to deliver a message to another part // of the system despite the retry loop implemented there. This should trigger individual processing to @@ -72,19 +81,27 @@ class EventProcessor( else -> StateChangeAndOperation.Noop } - EventProcessingOutput(asyncOutputs, stateChangeAndOperation) + EventProcessingOutput(asyncOutputs.values.flatten(), stateChangeAndOperation) } } + private fun createNewMediatorState(): MediatorState { + return MediatorState.newBuilder() + .setState(null) + .setOutputEvents(mutableListOf()) + .build() + } + /** * Send any synchronous events immediately and feed results back onto the queue. */ private fun processSyncEvents( key: K, syncEvents: List> - ) : List> { + ): List> { return syncEvents.mapNotNull { message -> val destination = messageRouter.getDestination(message) + @Suppress("UNCHECKED_CAST") val reply = with(destination) { message.addProperty(MessagingClient.MSG_PROP_ENDPOINT, endpoint) diff --git a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/MediatorReplayService.kt b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/MediatorReplayService.kt new file mode 100644 index 00000000000..b9ca5ccd5f9 --- /dev/null +++ b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/MediatorReplayService.kt @@ -0,0 +1,108 @@ +package net.corda.messaging.mediator.processor + +import net.corda.avro.serialization.CordaAvroSerializationFactory +import net.corda.crypto.cipher.suite.sha256Bytes +import net.corda.data.messaging.mediator.MediatorReplayOutputEvent +import net.corda.data.messaging.mediator.MediatorReplayOutputEvents +import net.corda.data.messaging.mediator.MediatorState +import net.corda.messaging.api.mediator.MediatorMessage +import net.corda.messaging.api.mediator.MessagingClient.Companion.MSG_PROP_KEY +import net.corda.messaging.api.mediator.MessagingClient.Companion.MSG_PROP_TOPIC +import net.corda.messaging.api.records.Record +import org.osgi.service.component.annotations.Activate +import org.osgi.service.component.annotations.Component +import org.osgi.service.component.annotations.Reference +import java.nio.ByteBuffer + +/** + * Service used by the Multi-Source Mediator to assist in detecting replays and storing output events. + */ +@Component(service = [MediatorReplayService::class]) +class MediatorReplayService @Activate constructor( + @Reference(service = CordaAvroSerializationFactory::class) + private val cordaAvroSerializationFactory: CordaAvroSerializationFactory, +) { + private val serializer = cordaAvroSerializationFactory.createAvroSerializer { } + private val deSerializer = cordaAvroSerializationFactory.createAvroDeserializer({}, Any::class.java) + + /** + * Generate the new [MediatorReplayOutputEvents] given the mediators [existingOutputs] when provided with the [newOutputs] + * @param existingOutputs The existing output events saved to the mediator state/ + * @param newOutputs The new outputs to add to the existingOutputs to be stored in the mediator. + * @return Return a new [MediatorReplayOutputEvents] object containing all the existing outputs with the new outputs added to it. + */ + fun getOutputEvents( + existingOutputs: List, + newOutputs: Map, MutableList>> + ): List { + val mediatorOutputs = existingOutputs.toMutableList() + + newOutputs.onEach { entry -> + val hash = getInputHash(entry.key) + val mediatorOutputList = entry.value.map { + val topic = it.properties.getProperty(MSG_PROP_TOPIC) + val key = ByteBuffer.wrap(serializer.serialize(it.properties.getProperty(MSG_PROP_KEY))) + val payload = ByteBuffer.wrap(serialize(it.payload)) + MediatorReplayOutputEvent(topic, key, payload) + } + mediatorOutputs.add(MediatorReplayOutputEvents(hash, mediatorOutputList)) + } + + return mediatorOutputs + } + + /** + * Compare the [inputRecord] to the existing [mediatorState] to see if it is a replayed record or not. + * If it is replay then return all the outputs from the [mediatorState] as [MediatorMessage]s. + * @param inputRecord Record to check whether it is a replay or not + * @param mediatorState The mediator state to check the [inputRecord] against + * @return Null if it is not a replayed event, if it is a replay event, + * a list of mediator messages are returned associated with the [inputRecord]. + */ + fun getReplayEvents(inputRecord: Record, mediatorState: MediatorState): List>? { + val savedOutputs = mediatorState.outputEvents + val inputHash = getInputHash(inputRecord).array() + + savedOutputs.forEach { mediatorReplayOutputEvents -> + if (inputHash.contentEquals(mediatorReplayOutputEvents.inputEventHash.array())) { + return mediatorReplayOutputEvents.outputEvents.map { outputEvent -> + outputEvent.toMediatorMessage() + } + } + } + + return null + } + + fun MutableMap.getProperty(key: String): String { + return this[key]?.toString() ?: throw IllegalStateException("Mediator message property $key was null") + } + + /** + * Generate a unique hash for the consumer input record to be used as a key when storing outputs. + * This allows for replay detection of consumer inputs by looking up the hash in the [MediatorReplayOutputEvents] + * @param inputEvent The consumer input event polled from the bus + * @return A hash of the input event as bytes + */ + private fun getInputHash(inputEvent: Record): ByteBuffer { + val recordKeyBytes = serialize(inputEvent.key) + val recordValueBytes = serialize(inputEvent.value) + if (recordKeyBytes == null || recordValueBytes == null) + throw IllegalStateException("Input record key and value bytes should not be null") + return ByteBuffer.wrap((recordKeyBytes + recordValueBytes).sha256Bytes()) + } + + private fun serialize(value: Any?) = value?.let { serializer.serialize(it) } + + private fun MediatorReplayOutputEvent.toMediatorMessage(): MediatorMessage { + val key = deSerializer.deserialize(key.array()) ?: throw IllegalStateException("Mediator message key is null after deserialization") + val payload = value.let { deSerializer.deserialize(it.array()) } + val properties = mutableMapOf( + MSG_PROP_TOPIC to topic, + MSG_PROP_KEY to key + ) + return MediatorMessage(payload, properties) + } + +} + diff --git a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImplTest.kt b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImplTest.kt index fdfd78b591a..d5ad84fd778 100644 --- a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImplTest.kt +++ b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImplTest.kt @@ -23,13 +23,13 @@ import java.util.concurrent.atomic.AtomicBoolean class MultiSourceEventMediatorImplTest { - private val mediatorState = spy(MediatorState(AtomicBoolean(false), AtomicBoolean(false))) + private val mediatorSubscriptionState = spy(MediatorSubscriptionState(AtomicBoolean(false), AtomicBoolean(false))) private val messagingClient = mock() private val consumerProcessor = mock>() private val mediatorComponentFactory = mock>().apply { whenever(createClients(any())).thenReturn(listOf(messagingClient)) whenever(createRouter(any())).thenReturn(mock()) - whenever(createConsumerProcessor(any(), any(), any(), eq(mediatorState))).thenReturn(consumerProcessor) + whenever(createConsumerProcessor(any(), any(), any(), eq(mediatorSubscriptionState))).thenReturn(consumerProcessor) } private val lifecycleCoordinator = mock() private val taskManager = mock().apply { @@ -76,6 +76,6 @@ class MultiSourceEventMediatorImplTest { mediator.close() verify(lifecycleCoordinator).close() - verify(mediatorState).stop() + verify(mediatorSubscriptionState).stop() } } diff --git a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/StateManagerHelperTest.kt b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/StateManagerHelperTest.kt index a75f037e56d..dd30b38e93b 100644 --- a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/StateManagerHelperTest.kt +++ b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/StateManagerHelperTest.kt @@ -2,6 +2,7 @@ package net.corda.messaging.mediator import net.corda.avro.serialization.CordaAvroDeserializer import net.corda.avro.serialization.CordaAvroSerializer +import net.corda.data.messaging.mediator.MediatorState import net.corda.libs.statemanager.api.Metadata import net.corda.libs.statemanager.api.State import net.corda.libs.statemanager.api.State.Companion.VERSION_INITIAL_VALUE @@ -17,6 +18,7 @@ import org.mockito.Mockito.`when` import org.mockito.kotlin.anyOrNull import org.mockito.kotlin.mock import org.mockito.kotlin.verify +import java.nio.ByteBuffer class StateManagerHelperTest { @@ -27,8 +29,10 @@ class StateManagerHelperTest { private data class StateType(val id: Int) - private val stateSerializer = mock>() + private val mediatorState = mock() + private val stateSerializer = mock>() private val stateDeserializer = mock>() + private val wrapperDeserializer = mock>() @BeforeEach fun setup() { @@ -51,15 +55,16 @@ class StateManagerHelperTest { val stateManagerHelper = StateManagerHelper( stateSerializer, stateDeserializer, + wrapperDeserializer ) val state = stateManagerHelper.createOrUpdateState( - TEST_KEY, persistedState, newState + TEST_KEY, persistedState, mediatorState, newState ) assertNotNull(state) assertEquals(TEST_KEY, state!!.key) - assertArrayEquals(serialized(newState.value!!), state.value) + assertArrayEquals(serialized(mediatorState), state.value) assertEquals(VERSION_INITIAL_VALUE, state.version) assertEquals(newState.metadata, state.metadata) } @@ -80,15 +85,16 @@ class StateManagerHelperTest { val stateManagerHelper = StateManagerHelper( stateSerializer, stateDeserializer, + wrapperDeserializer ) - + val mediatorState = MediatorState(ByteBuffer.wrap(serialized(persistedState)), emptyList()) val state = stateManagerHelper.createOrUpdateState( - TEST_KEY, persistedState, updatedState + TEST_KEY, persistedState, mediatorState, updatedState ) assertNotNull(state) assertEquals(persistedState.key, state!!.key) - assertArrayEquals(serialized(updatedState.value!!), state.value) + assertArrayEquals(serialized(MediatorState(ByteBuffer.wrap(serialized(updatedState.value!!)), emptyList())), state.value) assertEquals(persistedState.version, state.version) assertEquals(updatedState.metadata, state.metadata) } @@ -98,12 +104,14 @@ class StateManagerHelperTest { val stateManagerHelper = StateManagerHelper( stateSerializer, stateDeserializer, + wrapperDeserializer ) val serializedStateValue = "test".toByteArray() + val mediatorState = MediatorState(ByteBuffer.wrap(serializedStateValue), emptyList()) val state = mock() `when`(state.value).thenReturn(serializedStateValue) - stateManagerHelper.deserializeValue(state) + stateManagerHelper.deserializeValue(mediatorState) verify(stateDeserializer).deserialize(serializedStateValue) } @@ -120,6 +128,7 @@ class StateManagerHelperTest { val stateManagerHelper = StateManagerHelper( stateSerializer, stateDeserializer, + wrapperDeserializer ) val state = stateManagerHelper.failStateProcessing(TEST_KEY, persistedState) @@ -134,6 +143,7 @@ class StateManagerHelperTest { val stateManagerHelper = StateManagerHelper( stateSerializer, stateDeserializer, + wrapperDeserializer ) val state = stateManagerHelper.failStateProcessing(TEST_KEY, null) diff --git a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/factory/MediatorComponentFactoryTest.kt b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/factory/MediatorComponentFactoryTest.kt index 5818c4038ab..b7d21e94b62 100644 --- a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/factory/MediatorComponentFactoryTest.kt +++ b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/factory/MediatorComponentFactoryTest.kt @@ -15,8 +15,9 @@ import net.corda.messaging.api.processor.StateAndEventProcessor import net.corda.messaging.api.processor.StateAndEventProcessor.State import net.corda.messaging.api.records.Record import net.corda.messaging.mediator.GroupAllocator -import net.corda.messaging.mediator.MediatorState +import net.corda.messaging.mediator.MediatorSubscriptionState import net.corda.messaging.mediator.StateManagerHelper +import net.corda.messaging.mediator.processor.MediatorReplayService import net.corda.taskmanager.TaskManager import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.Assertions.assertEquals @@ -60,7 +61,8 @@ class MediatorComponentFactoryTest { private val stateManagerHelper = mock>() private val taskManager = mock() private val messageRouter = mock() - private val mediatorState = MediatorState(AtomicBoolean(false), AtomicBoolean(false)) + private val mediatorReplayService = mock() + private val mediatorSubscriptionState = MediatorSubscriptionState(AtomicBoolean(false), AtomicBoolean(false)) private val eventMediatorConfig = mock>().apply { whenever(name).thenReturn("name") whenever(stateManager).thenReturn(mock()) @@ -90,7 +92,8 @@ class MediatorComponentFactoryTest { clientFactories, messageRouterFactory, groupAllocator, - stateManagerHelper + stateManagerHelper, + mediatorReplayService ) } @@ -123,7 +126,8 @@ class MediatorComponentFactoryTest { clientFactories, messageRouterFactory, groupAllocator, - stateManagerHelper + stateManagerHelper, + mediatorReplayService ) assertThrows { @@ -158,7 +162,8 @@ class MediatorComponentFactoryTest { emptyList(), messageRouterFactory, groupAllocator, - stateManagerHelper + stateManagerHelper, + mediatorReplayService ) assertThrows { @@ -195,7 +200,7 @@ class MediatorComponentFactoryTest { @Test fun `create a consumer processor`() { val consumerProcessor = mediatorComponentFactory.createConsumerProcessor(eventMediatorConfig, taskManager, messageRouter, - mediatorState) + mediatorSubscriptionState) assertThat(consumerProcessor).isNotNull() } diff --git a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/factory/MultiSourceEventMediatorFactoryTest.kt b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/factory/MultiSourceEventMediatorFactoryTest.kt index 0127d02ea8f..b92b9244e23 100644 --- a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/factory/MultiSourceEventMediatorFactoryTest.kt +++ b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/factory/MultiSourceEventMediatorFactoryTest.kt @@ -9,6 +9,7 @@ import net.corda.lifecycle.LifecycleCoordinatorFactory import net.corda.messaging.api.mediator.config.EventMediatorConfig import net.corda.messaging.api.mediator.factory.MessageRouterFactory import net.corda.messaging.api.processor.StateAndEventProcessor +import net.corda.messaging.mediator.processor.MediatorReplayService import net.corda.taskmanager.TaskManager import net.corda.taskmanager.TaskManagerFactory import org.junit.jupiter.api.Assertions @@ -25,6 +26,7 @@ class MultiSourceEventMediatorFactoryTest { private val serializer = mock>() private val stateDeserializer = mock>() private val lifecycleCoordinatorFactory = mock() + private val mediatorReplayService = mock() private val taskManagerFactory = mock() @BeforeEach @@ -37,6 +39,7 @@ class MultiSourceEventMediatorFactoryTest { cordaAvroSerializationFactory, lifecycleCoordinatorFactory, taskManagerFactory, + mediatorReplayService ) } diff --git a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/processor/ConsumerProcessorTest.kt b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/processor/ConsumerProcessorTest.kt index a705d2f0b93..c0920ac48e5 100644 --- a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/processor/ConsumerProcessorTest.kt +++ b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/processor/ConsumerProcessorTest.kt @@ -19,7 +19,7 @@ import net.corda.messaging.api.processor.StateAndEventProcessor import net.corda.messaging.api.records.Record import net.corda.messaging.getStringRecords import net.corda.messaging.mediator.GroupAllocator -import net.corda.messaging.mediator.MediatorState +import net.corda.messaging.mediator.MediatorSubscriptionState import net.corda.messaging.mediator.StateManagerHelper import net.corda.schema.configuration.MessagingConfig import net.corda.taskmanager.TaskManager @@ -54,7 +54,7 @@ class ConsumerProcessorTest { private lateinit var groupAllocator: GroupAllocator private lateinit var taskManager: TaskManager private lateinit var messageRouter: MessageRouter - private lateinit var mediatorState: MediatorState + private lateinit var mediatorSubscriptionState: MediatorSubscriptionState private lateinit var stateManagerHelper: StateManagerHelper private lateinit var eventProcessor: EventProcessor @@ -68,12 +68,12 @@ class ConsumerProcessorTest { consumerFactory = mock() groupAllocator = mock() messageRouter = mock() - mediatorState = MediatorState() + mediatorSubscriptionState = MediatorSubscriptionState() eventProcessor = mock() eventMediatorConfig = buildStringTestConfig() stateManagerHelper = mock() consumerProcessor = ConsumerProcessor( - eventMediatorConfig, groupAllocator, taskManager, messageRouter, mediatorState, eventProcessor, stateManagerHelper + eventMediatorConfig, groupAllocator, taskManager, messageRouter, mediatorSubscriptionState, eventProcessor, stateManagerHelper ) } @@ -100,7 +100,7 @@ class ConsumerProcessorTest { ) ) whenever(groupAllocator.allocateGroups(any(), any())).thenReturn(getGroups(2, 4)) - whenever(stateManagerHelper.createOrUpdateState(any(), any(), any())).thenReturn(mock()) + whenever(stateManagerHelper.createOrUpdateState(any(), any(), any(), any())).thenReturn(mock()) consumerProcessor.processTopic(getConsumerFactory(), getConsumerConfig()) @@ -191,7 +191,7 @@ class ConsumerProcessorTest { private fun getConsumerFactory(): MediatorConsumerFactory { consumer.apply { whenever(poll(any())).thenAnswer { - mediatorState.stop() + mediatorSubscriptionState.stop() listOf(getConsumerRecord()) } } diff --git a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/processor/EventProcessorTest.kt b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/processor/EventProcessorTest.kt index 276bbadd92a..9ad311b9183 100644 --- a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/processor/EventProcessorTest.kt +++ b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/processor/EventProcessorTest.kt @@ -32,6 +32,7 @@ class EventProcessorTest { private lateinit var stateManagerHelper: StateManagerHelper private lateinit var client: MessagingClient private lateinit var messageRouter: MessageRouter + private lateinit var mediatorReplayService: MediatorReplayService private lateinit var stateAndEventProcessor: StateAndEventProcessor private lateinit var eventProcessor: EventProcessor @@ -45,6 +46,7 @@ class EventProcessorTest { client = mock() stateAndEventProcessor = mock() stateManagerHelper = mock() + mediatorReplayService = mock() messageRouter = mock() whenever(messageRouter.getDestination(any())).thenAnswer { val msg = it.arguments[0] as MediatorMessage @@ -54,7 +56,7 @@ class EventProcessorTest { } eventMediatorConfig = buildStringTestConfig() - eventProcessor = EventProcessor(eventMediatorConfig, stateManagerHelper, messageRouter) + eventProcessor = EventProcessor(eventMediatorConfig, stateManagerHelper, messageRouter, mediatorReplayService) } @Test @@ -80,7 +82,7 @@ class EventProcessorTest { verify(stateAndEventProcessor, times(4)).onNext(anyOrNull(), any()) verify(messageRouter, times(9)).getDestination(any()) verify(client, times(3)).send(any()) - verify(stateManagerHelper, times(1)).createOrUpdateState(any(), anyOrNull(), anyOrNull()) + verify(stateManagerHelper, times(1)).createOrUpdateState(any(), anyOrNull(), any(), anyOrNull()) } @Test diff --git a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/processor/MediatorReplayServiceTest.kt b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/processor/MediatorReplayServiceTest.kt new file mode 100644 index 00000000000..372c5165322 --- /dev/null +++ b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/processor/MediatorReplayServiceTest.kt @@ -0,0 +1,154 @@ +package net.corda.messaging.mediator.processor + +import net.corda.avro.serialization.CordaAvroDeserializer +import net.corda.avro.serialization.CordaAvroSerializationFactory +import net.corda.avro.serialization.CordaAvroSerializer +import net.corda.crypto.cipher.suite.sha256Bytes +import net.corda.data.messaging.mediator.MediatorReplayOutputEvent +import net.corda.data.messaging.mediator.MediatorReplayOutputEvents +import net.corda.data.messaging.mediator.MediatorState +import net.corda.messaging.api.mediator.MediatorMessage +import net.corda.messaging.api.mediator.MessagingClient +import net.corda.messaging.api.mediator.MessagingClient.Companion.MSG_PROP_TOPIC +import net.corda.messaging.api.records.Record +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import org.mockito.kotlin.any +import org.mockito.kotlin.anyOrNull +import org.mockito.kotlin.mock +import org.mockito.kotlin.whenever +import java.nio.ByteBuffer +import kotlin.test.assertNull + +class MediatorReplayServiceTest { + + private val serializer: CordaAvroSerializer = mock>() + private val deserializer: CordaAvroDeserializer = mock>() + private val cordaAvroSerializationFactory: CordaAvroSerializationFactory = mock().apply { + whenever(createAvroSerializer(anyOrNull())).thenReturn(serializer) + whenever(createAvroDeserializer(anyOrNull(), any())).thenReturn(deserializer) + } + private val mediatorReplayService = MediatorReplayService(cordaAvroSerializationFactory) + private val topic = "topic" + private val testState = ByteBuffer.wrap("state".toByteArray()) + + @BeforeEach + fun setup() { + whenever(serializer.serialize(anyOrNull())).thenReturn("test1".toByteArray()) + whenever(deserializer.deserialize(anyOrNull())).thenReturn("test1") + } + + @Test + fun `Add new output events to empty mediator state`() { + val mediatorReplayOutputEvents = mediatorReplayOutputEvents() + val numberOfKeys = 2 + val numberOfValues = 4 + val outputs = mediatorReplayService.getOutputEvents( + mediatorReplayOutputEvents, + getNewOutputs(numberOfKeys, numberOfValues) + ) + + assertEquals(2, outputs.size) + outputs.onEach { + assertEquals(4, it.outputEvents.size) + } + } + + @Test + fun `Add new output events to mediator state with existing outputs`() { + val mediatorReplayOutputEvents = mediatorReplayOutputEvents(3, 3) + val numberOfKeys = 2 + val numberOfValues = 4 + val outputs = mediatorReplayService.getOutputEvents( + mediatorReplayOutputEvents, + getNewOutputs(numberOfKeys, numberOfValues) + ) + assertEquals(5, outputs.size) + assertEquals(17, outputs.sumOf { it.outputEvents.size }) + } + + @Test + fun `Add new output events with missing mandatory properties throws exception`() { + val mediatorReplayOutputEvents = mediatorReplayOutputEvents() + val numberOfKeys = 2 + val numberOfValues = 4 + assertThrows { + mediatorReplayService.getOutputEvents( + mediatorReplayOutputEvents, + getNewOutputs(numberOfKeys, numberOfValues, true) + ) + } + } + + @Test + fun `input record is replay event`() { + val inputRecord = Record(topic, "test1", "test1") + val existingOutputs = mediatorReplayOutputEvents(2, 3) + val outputs = mediatorReplayService.getReplayEvents(inputRecord, MediatorState(testState, existingOutputs)) + assertEquals(3, outputs?.size) + } + + @Test + fun `input record is not a replay event, empty outputs`() { + val inputRecord = Record(topic, "test1", "test1") + assertNull(mediatorReplayService.getReplayEvents(inputRecord, MediatorState(testState, mutableListOf()))) + } + + @Test + fun `input record is not a replay event, existing outputs`() { + val existingOutputs = mediatorReplayOutputEvents(2, 3) + val inputRecord = Record(topic, "test3", "test3") + whenever(serializer.serialize(any())).thenReturn("bytes".toByteArray()) + assertNull(mediatorReplayService.getReplayEvents(inputRecord, MediatorState(testState, existingOutputs))) + } + + private fun mediatorReplayOutputEvents(existingKeys: Int = 0, existingValuesPerKey: Int = 0): List { + if (existingKeys == 0) return mutableListOf() + + val existingOutputs = mutableListOf() + for (i in 1 .. existingKeys) { + val recordKey = "test$i" + val outputsPerKey = mutableListOf() + for (j in 1 .. existingValuesPerKey) { + outputsPerKey.add( + MediatorReplayOutputEvent( + topic, + ByteBuffer.wrap(recordKey.toByteArray()), + ByteBuffer.wrap(recordKey.toByteArray()) + ) + ) + } + val hash = ByteBuffer.wrap((recordKey.toByteArray() + recordKey.toByteArray()).sha256Bytes()) + existingOutputs.add(MediatorReplayOutputEvents(hash, outputsPerKey)) + } + return existingOutputs + } + + private fun getNewOutputs( + numberOfKeys: Int, + numberOfRecordsPerKey: Int, + missingProperty: Boolean = false + ): Map, MutableList>> { + val newOutputs = mutableMapOf, MutableList>>() + for (i in 1 .. numberOfKeys) { + val recordKey = i.toString() + val outputsPerKey = mutableListOf>() + for (j in 1 .. numberOfRecordsPerKey) { + outputsPerKey.add(MediatorMessage("$j", getProperties(recordKey, missingProperty))) + } + newOutputs[Record(topic, recordKey, recordKey)] = outputsPerKey + } + + return newOutputs + } + + private fun getProperties(key: String, missingProperty: Boolean): MutableMap { + val testProperties: MutableMap = mutableMapOf(MessagingClient.MSG_PROP_KEY to key) + if (!missingProperty) { + testProperties[MSG_PROP_TOPIC] = topic + } + return testProperties + } +} \ No newline at end of file diff --git a/libs/state-manager/state-manager-db-impl/src/main/kotlin/net/corda/libs/statemanager/impl/repository/StateRepository.kt b/libs/state-manager/state-manager-db-impl/src/main/kotlin/net/corda/libs/statemanager/impl/repository/StateRepository.kt index 39eea18e960..4aa1f4ef21a 100644 --- a/libs/state-manager/state-manager-db-impl/src/main/kotlin/net/corda/libs/statemanager/impl/repository/StateRepository.kt +++ b/libs/state-manager/state-manager-db-impl/src/main/kotlin/net/corda/libs/statemanager/impl/repository/StateRepository.kt @@ -26,7 +26,7 @@ interface StateRepository { * Transaction should be controlled by the caller. * * @param connection The JDBC connection used to interact with the database. - * @param state State entity to persist. + * @param states State entity to persist. * @return The collection of keys that were successfully created. */ fun create(connection: Connection, states: Collection): Collection