Skip to content

Commit

Permalink
Merge pull request #6405 from corda/merge-release/os/5.2-release/os/5…
Browse files Browse the repository at this point in the history
….3-2024-11-28-397

CORE-20926: Merging forward updates from release/os/5.2 to release/os/5.3 - 2024-11-28
  • Loading branch information
LWogan authored Dec 3, 2024
2 parents fa70322 + 9c1aca9 commit ca16596
Show file tree
Hide file tree
Showing 53 changed files with 971 additions and 391 deletions.
32 changes: 32 additions & 0 deletions .snyk
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,36 @@ ignore:
OSGi so for now we have to wait for the next one.
expires: 2024-07-31T00:00:00.000Z
created: 2024-04-11T15:11:31.735Z
SNYK-JAVA-ORGJETBRAINSKOTLIN-2393744:
- '*':
reason: >-
Corda5 Shippable artifacts do not make use of detekt-cli, which is
where this dependency originates, this is used at compile / build time
only for static code analysis and not shipped in any of our releasable artifacts.
expires: 2025-11-20T14:30:31.735Z
created: 2024-11-20T14:30:31.735Z
SNYK-JAVA-ORGECLIPSEJETTY-8186141:
- '*':
reason: >-
This project acknowledges the presence of CVE-2024-6763 in the version of Jetty currently used by Javalin.
The vulnerability affects users of Jetty's HttpURI class, which our project does not directly utilize,
nor is it exposed through Javalin in our application context.
The Javalin team has indicated that they do not use HttpURI, and we have verified that our dependency tree presents no indirect
exposure. We will monitor Javalin updates and adopt a release upgrading Jetty to a patched version (≥12.0.12) when feasible.
Given the limited risk, no immediate action is required beyond ongoing dependency monitoring.
Note: there are currently no versions of Javalin released without this issue.
expires: 2025-11-21T14:30:31.735Z
created: 2024-11-21T12:30:31.735Z
SNYK-JAVA-ORGECLIPSEJETTY-8186158:
- '*':
reason: >-
This project acknowledges the presence of CVE-2024-6763 in the version of Jetty currently used by Javalin.
The vulnerability affects users of Jetty's HttpURI class, which our project does not directly utilize,
nor is it exposed through Javalin in our application context.
The Javalin team has indicated that they do not use HttpURI, and we have verified that our dependency tree presents no indirect
exposure. We will monitor Javalin updates and adopt a release upgrading Jetty to a patched version (≥12.0.12) when feasible.
Given the limited risk, no immediate action is required beyond ongoing dependency monitoring.
Note: there are currently no versions of Javalin released without this issue.
expires: 2025-11-21T14:30:31.735Z
created: 2024-11-21T12:30:31.735Z
patch: {}
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,7 @@ object FlowRestExceptionConstants {
const val INVALID_ID = "Supplied clientRequestId %s is invalid, it must conform to the pattern %s."
const val CPI_NOT_FOUND = "Failed to find a CPI for ID = %s."
const val FLOW_STATUS_NOT_FOUND = "Failed to find the flow status for holdingId = %s and clientRequestId = %s."
const val MAX_FLOW_START_ARGS_SIZE = "The flow start payload has exceeded the max allowed payload size. Note: max payload size is set" +
" to half the value of maxAllowedMessageSize."

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package net.corda.flow.rest.impl.v1

import net.corda.avro.serialization.CordaAvroSerializationFactory
import net.corda.cpiinfo.read.CpiInfoReadService
import net.corda.data.flow.FlowKey
import net.corda.data.flow.output.FlowStates
Expand Down Expand Up @@ -41,6 +42,7 @@ import net.corda.rest.messagebus.MessageBusUtils.tryWithExceptionHandling
import net.corda.rest.response.ResponseEntity
import net.corda.rest.security.CURRENT_REST_CONTEXT
import net.corda.schema.Schemas.Flow.FLOW_MAPPER_START
import net.corda.schema.configuration.MessagingConfig
import net.corda.tracing.TraceTag
import net.corda.tracing.addTraceContextToRecord
import net.corda.tracing.trace
Expand Down Expand Up @@ -71,6 +73,8 @@ class FlowRestResourceImpl @Activate constructor(
private val permissionValidationService: PermissionValidationService,
@Reference(service = PlatformInfoProvider::class)
private val platformInfoProvider: PlatformInfoProvider,
@Reference(service = CordaAvroSerializationFactory::class)
private val cordaAvroSerializationFactory: CordaAvroSerializationFactory,
) : FlowRestResource, PluggableRestResource<FlowRestResource>, Lifecycle {

private companion object {
Expand All @@ -82,12 +86,15 @@ class FlowRestResourceImpl @Activate constructor(
override val targetInterface: Class<FlowRestResource> = FlowRestResource::class.java
override val protocolVersion get() = platformInfoProvider.localWorkerPlatformVersion

private val serializer = cordaAvroSerializationFactory.createAvroSerializer<Any>()
private var publisher: Publisher? = null
private var fatalErrorOccurred = false
private lateinit var onFatalError: () -> Unit
private lateinit var messagingConfig: SmartConfig

override fun initialise(config: SmartConfig, onFatalError: () -> Unit) {
this.onFatalError = onFatalError
this.messagingConfig = config
publisher?.close()
publisher = publisherFactory.createPublisher(PublisherConfig("FlowRestResource"), config)
}
Expand Down Expand Up @@ -196,6 +203,13 @@ class FlowRestResourceImpl @Activate constructor(
startFlow.requestBody.escapedJson,
flowContextPlatformProperties
)
val startEventSize = serializer.serialize(startEvent)?.size
val maxAllowedMessageSize = getMaxAllowedMessageSize(messagingConfig)
if (startEventSize != null && startEventSize > maxAllowedMessageSize) {
log.warn(FlowRestExceptionConstants.MAX_FLOW_START_ARGS_SIZE, IllegalArgumentException("Flow start event of size " +
"[$startEventSize] exceeds maxAllowedMessageSize [$maxAllowedMessageSize]"))
throw InvalidInputDataException(FlowRestExceptionConstants.FATAL_ERROR)
}
val status = messageFactory.createStartFlowStatus(clientRequestId, vNode, flowClassName)

val records = listOf(
Expand Down Expand Up @@ -328,4 +342,6 @@ class FlowRestResourceImpl @Activate constructor(
private fun getVirtualNode(holdingIdentityShortHash: String): VirtualNodeInfo {
return virtualNodeInfoReadService.getByHoldingIdentityShortHashOrThrow(holdingIdentityShortHash).toAvro()
}

private fun getMaxAllowedMessageSize(messagingConfig: SmartConfig) = messagingConfig.getLong(MessagingConfig.MAX_ALLOWED_MSG_SIZE)
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package net.corda.flow.rest.impl.v1

import com.typesafe.config.ConfigValueFactory
import net.corda.avro.serialization.CordaAvroSerializationFactory
import net.corda.avro.serialization.CordaAvroSerializer
import net.corda.cpiinfo.read.CpiInfoReadService
import net.corda.crypto.core.SecureHashImpl
import net.corda.data.flow.FlowKey
Expand All @@ -10,6 +13,7 @@ import net.corda.flow.rest.factory.MessageFactory
import net.corda.flow.rest.v1.FlowRestResource
import net.corda.flow.rest.v1.types.request.StartFlowParameters
import net.corda.flow.rest.v1.types.response.FlowStatusResponse
import net.corda.libs.configuration.SmartConfig
import net.corda.libs.configuration.SmartConfigImpl
import net.corda.libs.packaging.core.CordappManifest
import net.corda.libs.packaging.core.CpiIdentifier
Expand All @@ -34,6 +38,7 @@ import net.corda.rest.exception.ResourceNotFoundException
import net.corda.rest.exception.ServiceUnavailableException
import net.corda.rest.security.CURRENT_REST_CONTEXT
import net.corda.rest.security.RestAuthContext
import net.corda.schema.configuration.MessagingConfig
import net.corda.test.util.identity.createTestHoldingIdentity
import net.corda.utilities.MDC_CLIENT_ID
import net.corda.virtualnode.OperationalStatus
Expand All @@ -46,6 +51,7 @@ import org.junit.jupiter.api.assertThrows
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.mockito.kotlin.any
import org.mockito.kotlin.anyOrNull
import org.mockito.kotlin.argumentCaptor
import org.mockito.kotlin.atLeastOnce
import org.mockito.kotlin.doThrow
Expand All @@ -64,6 +70,8 @@ class FlowRestResourceImplTest {

private lateinit var flowStatusLookupService: FlowStatusLookupService
private lateinit var virtualNodeInfoReadService: VirtualNodeInfoReadService
private lateinit var cordaAvroSerializationFactory: CordaAvroSerializationFactory
private lateinit var serializer: CordaAvroSerializer<Any>
private lateinit var publisherFactory: PublisherFactory
private lateinit var messageFactory: MessageFactory
private lateinit var cpiInfoReadService: CpiInfoReadService
Expand Down Expand Up @@ -131,6 +139,11 @@ class FlowRestResourceImplTest {
permissionValidationService = mock()
permissionValidator = mock()
fatalErrorFunction = mock()
cordaAvroSerializationFactory = mock()
serializer = mock()

whenever(cordaAvroSerializationFactory.createAvroSerializer<Any>(anyOrNull())).thenReturn(serializer)
whenever(serializer.serialize(anyOrNull())).thenReturn(byteArrayOf(1,2,3))

val cpiMetadata = getMockCPIMeta()
whenever(cpiInfoReadService.get(any())).thenReturn(cpiMetadata)
Expand Down Expand Up @@ -169,16 +182,17 @@ class FlowRestResourceImplTest {
).thenReturn(true)
}

private fun createFlowRestResource(initialise: Boolean = true): FlowRestResource {
private fun createFlowRestResource(initialise: Boolean = true, messagingConfigParam: SmartConfig = messagingConfig): FlowRestResource {
return FlowRestResourceImpl(
virtualNodeInfoReadService,
flowStatusLookupService,
publisherFactory,
messageFactory,
cpiInfoReadService,
permissionValidationService,
mock()
).apply { if (initialise) (initialise(SmartConfigImpl.empty(), fatalErrorFunction)) }
mock(),
cordaAvroSerializationFactory
).apply { if (initialise) (initialise(messagingConfigParam, fatalErrorFunction)) }
}

@Test
Expand Down Expand Up @@ -350,6 +364,16 @@ class FlowRestResourceImplTest {
}
}

@Test
fun `start flow fails with InvalidInputDataException when payload is too large`() {
val flowRestResource = createFlowRestResource(true, SmartConfigImpl.empty().withValue(MessagingConfig.MAX_ALLOWED_MSG_SIZE,
ConfigValueFactory.fromAnyRef(1)))

assertThrows<InvalidInputDataException> {
flowRestResource.startFlow(VALID_SHORT_HASH, StartFlowParameters(clientRequestId, FLOW1, TestJsonObject()))
}
}

@Test
fun `start flow event fails when not initialized`() {
val flowRestResource = createFlowRestResource(false)
Expand Down Expand Up @@ -587,4 +611,7 @@ class FlowRestResourceImplTest {
flowRestResource.startFlow(VALID_SHORT_HASH, StartFlowParameters("", FLOW1, TestJsonObject()))
}
}

private val messagingConfig = SmartConfigImpl.empty().withValue(MessagingConfig.MAX_ALLOWED_MSG_SIZE, ConfigValueFactory.fromAnyRef
(10000000))
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package net.corda.flow.application.messaging

import net.corda.avro.serialization.CordaAvroSerializationFactory
import net.corda.flow.fiber.FlowFiberService
import net.corda.flow.fiber.FlowIORequest
import net.corda.sandbox.type.UsedByFlow
import net.corda.v5.application.messaging.ExternalMessaging
import net.corda.v5.base.annotations.Suspendable
import net.corda.v5.base.exceptions.CordaRuntimeException
import net.corda.v5.serialization.SingletonSerializeAsToken
import org.osgi.service.component.annotations.Activate
import org.osgi.service.component.annotations.Component
Expand All @@ -16,25 +18,46 @@ import java.util.UUID
@Component(service = [ExternalMessaging::class, UsedByFlow::class], scope = ServiceScope.PROTOTYPE)
class ExternalMessagingImpl(
private val flowFiberService: FlowFiberService,
private val idFactoryFunc: () -> String
private val idFactoryFunc: () -> String,
cordaAvroSerializationFactory: CordaAvroSerializationFactory
) : ExternalMessaging, UsedByFlow, SingletonSerializeAsToken {

private val serializer = cordaAvroSerializationFactory.createAvroSerializer<Any>()

@Activate
constructor(
@Reference(service = FlowFiberService::class)
flowFiberService: FlowFiberService
) : this(flowFiberService, { UUID.randomUUID().toString() })
flowFiberService: FlowFiberService,
@Reference(service = CordaAvroSerializationFactory::class)
cordaAvroSerializationFactory: CordaAvroSerializationFactory
) : this(flowFiberService, { UUID.randomUUID().toString() }, cordaAvroSerializationFactory)

@Suspendable
override fun send(channelName: String, message: String) {
validateSize(message)
send(channelName, idFactoryFunc(), message)
}

private fun validateSize(message: String) {
val bytesSize = serializer.serialize(message)?.size
val maxAllowedMessageSize = maxMessageSize()
if (bytesSize != null && maxAllowedMessageSize < bytesSize) {
throw CordaRuntimeException(
"Cannot send external messaging content as " +
"it exceeds the max message size allowed. Message Size: [$bytesSize], Max Size: [$maxAllowedMessageSize}]"
)
}
}

@Suspendable
override fun send(channelName: String, messageId: String, message: String) {
validateSize(message)

flowFiberService
.getExecutingFiber()
.suspend(FlowIORequest.SendExternalMessage(channelName, messageId, message))
}

private fun maxMessageSize() = flowFiberService.getExecutingFiber().getExecutionContext().flowCheckpoint.maxMessageSize
}

Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package net.corda.flow.external.events.impl

import java.time.Instant
import net.corda.data.flow.event.external.ExternalEvent
import net.corda.data.flow.event.external.ExternalEventResponse
import net.corda.data.flow.state.external.ExternalEventState
import net.corda.flow.external.events.factory.ExternalEventFactory
import net.corda.flow.external.events.factory.ExternalEventRecord
import net.corda.messaging.api.records.Record
import java.time.Duration
import java.time.Instant

/**
* [ExternalEventManager] encapsulates external event behaviour by creating and modifying [ExternalEventState]s.
Expand Down Expand Up @@ -86,4 +86,16 @@ interface ExternalEventManager {
instant: Instant,
retryWindow: Duration
): Pair<ExternalEventState, Record<*, *>?>

/**
* Get the external event to send for the transient error retry scenario.
* Returns the event as is from the state. No additional checks required.
* @param externalEventState The [ExternalEventState] to get the event from.
* @param instant The current time. Used to set timestamp.
* @return The external event request to resend
* */
fun getRetryEvent(
externalEventState: ExternalEventState,
instant: Instant,
): Record<*, *>
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,13 @@ class ExternalEventManagerImpl(
return externalEventState to record
}

override fun getRetryEvent(
externalEventState: ExternalEventState,
instant: Instant,
): Record<*, *> {
return generateRecord(externalEventState, instant)
}

private fun checkRetry(externalEventState: ExternalEventState, instant: Instant, retryWindow: Duration) {
when {
(externalEventState.sendTimestamp + retryWindow) >= instant -> {
Expand Down
Loading

0 comments on commit ca16596

Please sign in to comment.