diff --git a/java/arcs/core/allocator/Allocator.kt b/java/arcs/core/allocator/Allocator.kt
index 70d9761f4af..dbc5e25e49d 100644
--- a/java/arcs/core/allocator/Allocator.kt
+++ b/java/arcs/core/allocator/Allocator.kt
@@ -10,6 +10,7 @@
  */
 package arcs.core.allocator
 
+import arcs.core.allocator.CollectionHandlePartitionMap.Companion.arcId
 import arcs.core.common.ArcId
 import arcs.core.common.Id
 import arcs.core.common.toArcId
@@ -19,7 +20,7 @@ import arcs.core.entity.HandleSpec
 import arcs.core.host.ArcHost
 import arcs.core.host.ArcHostException
 import arcs.core.host.ArcHostNotFoundException
-import arcs.core.host.HandleManagerImpl
+import arcs.core.host.HandleManager
 import arcs.core.host.HostRegistry
 import arcs.core.host.ParticleNotFoundException
 import arcs.core.storage.CapabilitiesResolver
@@ -60,7 +61,7 @@ class Allocator(
      * Stores the provided list of [Plan.Parition] for the provided [ArcId]. Existing values
      * will be replaced.
      */
-    suspend fun set(arcId: ArcId, partitions: List<Plan.Partition>)
+    suspend fun set(partitions: List<Plan.Partition>)
 
     /**
      * Return the current list of [Plan.Partition] for the provided [ArcId]. If an Arc with
@@ -114,7 +115,7 @@ class Allocator(
     val partitions = computePartitions(arcId, newPlan)
     log.debug { "Computed partitions" }
     // Store computed partitions for later
-    partitionMap.set(arcId, partitions)
+    partitionMap.set(partitions)
     try {
       startPlanPartitionsOnHosts(partitions)
       return Arc(arcId, partitions, this, scope)
@@ -179,16 +180,16 @@ class Allocator(
   companion object {
     /**
      * Creates an [Allocator] which serializes Arc/Particle state to the storage system backing
-     * the provided [handleManagerImpl].
+     * the provided [handleManager].
      */
     fun create(
       hostRegistry: HostRegistry,
-      handleManagerImpl: HandleManagerImpl,
+      handleManager: HandleManager,
       scope: CoroutineScope
     ): Allocator {
       return Allocator(
         hostRegistry,
-        CollectionHandlePartitionMap(handleManagerImpl),
+        CollectionHandlePartitionMap(handleManager),
         scope
       )
     }
@@ -210,10 +211,9 @@ class Allocator(
       private val partitions = mutableMapOf<ArcId, List<Plan.Partition>>()
 
       override suspend fun set(
-        arcId: ArcId,
         partitions: List<Plan.Partition>
       ) = mutex.withLock {
-        this.partitions[arcId] = partitions
+        this.partitions[partitions.arcId()] = partitions
       }
 
       override suspend fun readPartitions(
diff --git a/java/arcs/core/allocator/CollectionHandlePartitionMap.kt b/java/arcs/core/allocator/CollectionHandlePartitionMap.kt
index 60b0b60bc8c..30a68f9dd4d 100644
--- a/java/arcs/core/allocator/CollectionHandlePartitionMap.kt
+++ b/java/arcs/core/allocator/CollectionHandlePartitionMap.kt
@@ -2,6 +2,7 @@ package arcs.core.allocator
 
 import arcs.core.common.ArcId
 import arcs.core.common.SuspendableLazy
+import arcs.core.common.toArcId
 import arcs.core.data.CollectionType
 import arcs.core.data.EntityType
 import arcs.core.data.FieldType
@@ -16,7 +17,7 @@ import arcs.core.entity.EntityBaseSpec
 import arcs.core.entity.HandleSpec
 import arcs.core.entity.ReadWriteCollectionHandle
 import arcs.core.entity.awaitReady
-import arcs.core.host.HandleManagerImpl
+import arcs.core.host.HandleManager
 import arcs.core.storage.keys.RamDiskStorageKey
 import arcs.core.storage.referencemode.ReferenceModeStorageKey
 import arcs.core.util.TaggedLog
@@ -26,12 +27,12 @@ import kotlinx.coroutines.withContext
 
 /**
  * An implementation of [Allocator.PartitionSerialization] that stores partition information in an Arcs
- * collection handle, created by the [HandleManagerImpl] provided at construction. The handle
+ * collection handle, created by the [HandleManager] provided at construction. The handle
  * will be created the first time any of the publicly exposed methods is called.
  */
 @OptIn(ExperimentalCoroutinesApi::class)
 class CollectionHandlePartitionMap(
-  private val handleManagerImpl: HandleManagerImpl
+  private val handleManager: HandleManager
 ) : Allocator.PartitionSerialization {
 
   private val log = TaggedLog { "CollectionHandlePartitionMap" }
@@ -39,7 +40,7 @@ class CollectionHandlePartitionMap(
   @Suppress("UNCHECKED_CAST")
   private val collection = SuspendableLazy {
     val entitySpec = EntityBaseSpec(SCHEMA)
-    val handle = handleManagerImpl.createHandle(
+    val handle = handleManager.createHandle(
       HandleSpec(
         "partitions",
         HandleMode.ReadWrite,
@@ -52,8 +53,9 @@ class CollectionHandlePartitionMap(
   }
 
   /** Persists [ArcId] and associated [Plan.Partition]s */
-  override suspend fun set(arcId: ArcId, partitions: List<Plan.Partition>) {
-    log.debug { "writePartitionMap(arcId=$arcId)" }
+  override suspend fun set(partitions: List<Plan.Partition>) {
+    val arcId = partitions.arcId()
+    log.debug { "writePartitionMap(arcId=${partitions.arcId()})" }
 
     val writes = withContext(collection().dispatcher) {
       partitions.map { (_, arcHost, particles) ->
@@ -72,16 +74,6 @@ class CollectionHandlePartitionMap(
     writes.joinAll()
   }
 
-  /** Converts a [RawEntity] to a [Plan.Partition] */
-  private fun entityToPartition(entity: EntityBase): Plan.Partition =
-    Plan.Partition(
-      entity.getSingletonValue("arc") as String,
-      entity.getSingletonValue("host") as String,
-      entity.getCollectionValue("particles").map {
-        Plan.Particle(it as String, "", mapOf())
-      }
-    )
-
   /** Reads associated [PlanPartition]s with an [ArcId]. */
   override suspend fun readPartitions(arcId: ArcId): List<Plan.Partition> =
     entitiesForArc(arcId).map { entityToPartition(it) }
@@ -99,6 +91,16 @@ class CollectionHandlePartitionMap(
     return entities.map { entityToPartition(it) }
   }
 
+  /** Converts a [RawEntity] to a [Plan.Partition] */
+  private fun entityToPartition(entity: EntityBase): Plan.Partition =
+    Plan.Partition(
+      entity.getSingletonValue("arc") as String,
+      entity.getSingletonValue("host") as String,
+      entity.getCollectionValue("particles").map {
+        Plan.Particle(it as String, "", mapOf())
+      }
+    )
+
   /** Looks up [RawEntity]s representing [PlanPartition]s for a given [ArcId] */
   private suspend fun entitiesForArc(arcId: ArcId): List<EntityBase> {
     return withContext(collection().dispatcher) {
@@ -126,5 +128,9 @@ class CollectionHandlePartitionMap(
       RamDiskStorageKey("partition"),
       RamDiskStorageKey("partitions")
     )
+
+    fun List<Plan.Partition>.arcId(): ArcId {
+      return this.first().arcId.toArcId()
+    }
   }
 }
diff --git a/java/arcs/core/data/Plan.kt b/java/arcs/core/data/Plan.kt
index 9fc5e0682b1..41c90a17376 100644
--- a/java/arcs/core/data/Plan.kt
+++ b/java/arcs/core/data/Plan.kt
@@ -126,4 +126,8 @@ data class Plan(
         Plan(particles = f, handles = t.handles, annotations = t.annotations)
       }
   }
+
+  fun List<Partition>.arcId(): String {
+    return this.first().arcId
+  }
 }
diff --git a/javatests/arcs/core/allocator/BUILD b/javatests/arcs/core/allocator/BUILD
index c8757539345..4f070cbdef7 100644
--- a/javatests/arcs/core/allocator/BUILD
+++ b/javatests/arcs/core/allocator/BUILD
@@ -14,8 +14,19 @@ arcs_kt_jvm_test_suite(
     package = "arcs.core.allocator",
     deps = [
         ":allocator-test-util",
+        "//java/arcs/core/allocator",
+        "//java/arcs/core/common",
+        "//java/arcs/core/data",
+        "//java/arcs/core/entity",
+        "//java/arcs/core/host",
+        "//java/arcs/core/storage",
+        "//java/arcs/core/util",
+        "//java/arcs/core/util/testutil",
         "//third_party/java/junit:junit-android",
+        "//third_party/java/truth:truth-android",
+        "//third_party/kotlin/kotlin:kotlin_test",
         "//third_party/kotlin/kotlinx_coroutines",
+        "//third_party/kotlin/kotlinx_coroutines:kotlinx_coroutines_test",
     ],
 )
 
diff --git a/javatests/arcs/core/allocator/CollectionHandlePartitionMapTest.kt b/javatests/arcs/core/allocator/CollectionHandlePartitionMapTest.kt
new file mode 100644
index 00000000000..f944503f511
--- /dev/null
+++ b/javatests/arcs/core/allocator/CollectionHandlePartitionMapTest.kt
@@ -0,0 +1,299 @@
+package arcs.core.allocator
+
+import arcs.core.common.toArcId
+import arcs.core.data.Capability
+import arcs.core.data.HandleMode
+import arcs.core.data.Plan
+import arcs.core.data.Schema
+import arcs.core.entity.CollectionDelta
+import arcs.core.entity.Entity
+import arcs.core.entity.EntityBase
+import arcs.core.entity.EntitySpec
+import arcs.core.entity.Handle
+import arcs.core.entity.HandleSpec
+import arcs.core.entity.ReadWriteCollectionHandle
+import arcs.core.entity.Reference
+import arcs.core.host.HandleManager
+import arcs.core.storage.StorageKey
+import arcs.core.storage.StorageProxy
+import arcs.core.util.Log
+import arcs.core.util.Scheduler
+import arcs.core.util.testutil.LogRule
+import com.google.common.truth.Truth.assertThat
+import kotlinx.coroutines.CoroutineDispatcher
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.GlobalScope.coroutineContext
+import kotlinx.coroutines.Job
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.test.TestCoroutineDispatcher
+import kotlinx.coroutines.test.TestCoroutineScope
+import kotlinx.coroutines.test.runBlockingTest
+import org.junit.Before
+import org.junit.Rule
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.JUnit4
+
+@RunWith(JUnit4::class)
+@OptIn(ExperimentalCoroutinesApi::class)
+@Suppress("UnsafeCoroutineCrossing")
+class CollectionHandlePartitionMapTest {
+  @get:Rule
+  val log = LogRule(Log.Level.Warning)
+
+  private val testDispatcher = TestCoroutineDispatcher()
+
+  private lateinit var testScope: TestCoroutineScope
+  private lateinit var fakeHandleManager: FakeHandleManager
+  private lateinit var fakeHandle: FakeHandle
+  private lateinit var entityList: MutableList<EntityBase>
+
+  private lateinit var map: CollectionHandlePartitionMap
+
+  @Before
+  fun setup() {
+    testScope = TestCoroutineScope(testDispatcher)
+    testScope.launch {
+      fakeHandleManager = FakeHandleManager()
+      fakeHandle = FakeHandle()
+      map = CollectionHandlePartitionMap(fakeHandleManager)
+      entityList = mutableListOf()
+    }
+  }
+
+  @Test
+  fun set_then_read() = testScope.runBlockingTest {
+    val arcId = "testArcId".toArcId()
+
+    val particleName1 = "testParticle1"
+    val particle1 = Plan.Particle(
+      particleName = particleName1,
+      location = "",
+      handles = emptyMap()
+    )
+    val particleName2 = "testParticle2"
+    val particle2 = particle1.copy(particleName = particleName2)
+    val particleName3 = "testParticle3"
+    val particle3 = particle1.copy(particleName = particleName3)
+
+    val arcHost1 = "testArcHost1"
+    val arcHost2 = "testArcHost2"
+    val partition1 = Plan.Partition(
+      arcId = arcId.toString(),
+      arcHost = arcHost1,
+      particles = listOf(particle1)
+    )
+    val partition2 = partition1.copy(
+      arcHost = arcHost2,
+      particles = listOf(particle2, particle3)
+    )
+
+    assertThat(entityList.size).isEqualTo(0)
+    map.set(listOf(partition1, partition2))
+    assertThat(entityList.size).isEqualTo(2)
+    val entity1 = entityList[0]
+    assertThat(entity1.getSingletonValue("arc")).isEqualTo(arcId.toString())
+    assertThat(entity1.getSingletonValue("host")).isEqualTo(arcHost1)
+    assertThat(entity1.getCollectionValue("particles")).containsExactly(particleName1)
+    val entity2 = entityList[1]
+    assertThat(entity2.getSingletonValue("arc")).isEqualTo(arcId.toString())
+    assertThat(entity2.getSingletonValue("host")).isEqualTo(arcHost2)
+    assertThat(entity2.getCollectionValue("particles"))
+      .containsExactly(particleName2, particleName3)
+
+    val partitionList = map.readPartitions(arcId)
+    assertThat(partitionList.size).isEqualTo(2)
+    assertThat(partitionList).containsExactly(
+      Plan.Partition(
+        arcId = arcId.toString(),
+        arcHost = arcHost1,
+        listOf(
+          Plan.Particle(
+            particleName = particleName1,
+            location = "",
+            handles = mapOf()
+          )
+        )
+      ),
+      Plan.Partition(
+        arcId = arcId.toString(),
+        arcHost = arcHost2,
+        listOf(
+          Plan.Particle(
+            particleName = particleName2,
+            location = "",
+            handles = mapOf()
+          ),
+          Plan.Particle(
+            particleName = particleName3,
+            location = "",
+            handles = mapOf()
+          )
+        )
+      )
+    )
+  }
+
+  @Test
+  fun set_then_clear() = testScope.runBlockingTest {
+    val arcId = "testArcId".toArcId()
+    val particleName = "testParticle"
+    val particle = Plan.Particle(
+      particleName = particleName,
+      location = "",
+      handles = emptyMap()
+    )
+
+    val arcHost = "testArcHost"
+    val partition = Plan.Partition(
+      arcId = arcId.toString(),
+      arcHost = arcHost,
+      particles = listOf(particle)
+    )
+
+    assertThat(entityList.size).isEqualTo(0)
+    map.set(listOf(partition))
+    assertThat(entityList.size).isEqualTo(1)
+
+    var partitionList = map.readAndClearPartitions(arcId)
+    assertThat(partitionList.size).isEqualTo(1)
+    assertThat(partitionList).containsExactly(
+      Plan.Partition(
+        arcId = arcId.toString(),
+        arcHost = arcHost,
+        listOf(
+          Plan.Particle(
+            particleName = particleName,
+            location = "",
+            handles = mapOf()
+          )
+        )
+      )
+    )
+
+    partitionList = map.readAndClearPartitions(arcId)
+    assertThat(partitionList.size).isEqualTo(0)
+  }
+
+  inner class FakeHandle : ReadWriteCollectionHandle<EntityBase> {
+    override val dispatcher: CoroutineDispatcher
+      get() = testDispatcher
+    override val name: String
+      get() = TODO("Not yet implemented")
+    override val mode: HandleMode
+      get() = TODO("Not yet implemented")
+
+    override fun onReady(action: () -> Unit) {
+      action.invoke()
+    }
+
+    override fun close() {
+      TODO("Not yet implemented")
+    }
+
+    override fun registerForStorageEvents(notify: (StorageProxy.StorageEvent) -> Unit) {
+      TODO("Not yet implemented")
+    }
+
+    override fun unregisterForStorageEvents() {
+      TODO("Not yet implemented")
+    }
+
+    override fun maybeInitiateSync() {
+      TODO("Not yet implemented")
+    }
+
+    override fun getProxy(): StorageProxy<*, *, *> {
+      TODO("Not yet implemented")
+    }
+
+    override suspend fun <E : Entity> createForeignReference(
+      spec: EntitySpec<E>,
+      id: String
+    ): Reference<E>? {
+      TODO("Not yet implemented")
+    }
+
+    override fun onUpdate(action: (CollectionDelta<EntityBase>) -> Unit) {
+      TODO("Not yet implemented")
+    }
+
+    override fun onDesync(action: () -> Unit) {
+      TODO("Not yet implemented")
+    }
+
+    override fun onResync(action: () -> Unit) {
+      TODO("Not yet implemented")
+    }
+
+    override suspend fun <E : Entity> createReference(entity: E): Reference<E> {
+      TODO("Not yet implemented")
+    }
+
+    override fun size(): Int {
+      TODO("Not yet implemented")
+    }
+
+    override fun isEmpty(): Boolean {
+      TODO("Not yet implemented")
+    }
+
+    override fun fetchAll(): Set<EntityBase> {
+      return entityList.toSet()
+    }
+
+    override fun fetchById(entityId: String): EntityBase? {
+      TODO("Not yet implemented")
+    }
+
+    override fun store(element: EntityBase): Job {
+      return testScope.launch {
+        entityList.add(element)
+      }
+    }
+
+    override fun storeAll(elements: Collection<EntityBase>): Job {
+      TODO("Not yet implemented")
+    }
+
+    override fun clear(): Job {
+      TODO("Not yet implemented")
+    }
+
+    override fun remove(element: EntityBase): Job {
+      return testScope.launch {
+        entityList.remove(element)
+      }
+    }
+
+    override fun removeById(id: String): Job {
+      TODO("Not yet implemented")
+    }
+  }
+
+  inner class FakeHandleManager : HandleManager {
+
+    override suspend fun createHandle(
+      spec: HandleSpec,
+      storageKey: StorageKey,
+      ttl: Capability.Ttl,
+      particleId: String,
+      immediateSync: Boolean,
+      storeSchema: Schema?
+    ): Handle {
+      return fakeHandle
+    }
+
+    override fun scheduler(): Scheduler {
+      return Scheduler(coroutineContext)
+    }
+
+    override suspend fun close() {
+      TODO("Not yet implemented")
+    }
+
+    override suspend fun allStorageProxies(): List<StorageProxy<*, *, *>> {
+      return emptyList()
+    }
+  }
+}