diff --git a/cassovary-core/src/main/scala/com/twitter/cassovary/graph/ArrayBasedDynamicDirectedGraph.scala b/cassovary-core/src/main/scala/com/twitter/cassovary/graph/ArrayBasedDynamicDirectedGraph.scala new file mode 100644 index 00000000..63735c81 --- /dev/null +++ b/cassovary-core/src/main/scala/com/twitter/cassovary/graph/ArrayBasedDynamicDirectedGraph.scala @@ -0,0 +1,216 @@ +package com.twitter.cassovary.graph + +import com.twitter.cassovary.graph.node.DynamicNode +import com.twitter.cassovary.graph.StoredGraphDir._ +import com.twitter.cassovary.util.FastUtilUtils +import it.unimi.dsi.fastutil.ints.IntArrayList +import scala.collection.mutable.ArrayBuffer + +/** + * A dynamic directed graph, implemented using an ArrayBuffer of IntArrayList + * (from the fastutil library). If n nodes are used, O(n) objects are created, + * independent of the number of edges. + * + * For efficiency, it's recommended that ids be sequentially numbered, as + * an internal array is stored which is longer than the maximum id seen. + * + * This class is not threadsafe. If multiple thread access it simultaneously + * and at least one mutates it, synchronization should be used. Currently + * no new threads are created by this class (all operations execute on a single thread). + */ +class ArrayBasedDynamicDirectedGraph(val storedGraphDir: StoredGraphDir) + extends DynamicDirectedGraph { + // outboundLists(id) contains the outbound neighbors of the given id, + // or null if the id is not in this graph. + // If we aren't storing outbound neighbors, outboundLists will always remain size 0. + private val outboundLists = new ArrayBuffer[IntArrayList] + // (See above note on outboundLists) + private val inboundLists = new ArrayBuffer[IntArrayList] + + private var _nodeCount = 0 + + def this(dataIterator: Iterator[NodeIdEdgesMaxId], + storedGraphDir: StoredGraphDir) { + this(storedGraphDir) + for (nodeData <- dataIterator) { + val id = nodeData.id + getOrCreateNode(id) + nodeData.edges map getOrCreateNode + storedGraphDir match { + case OnlyOut | Mutual => outboundLists(id).addAll(IntArrayList.wrap(nodeData.edges)) + case OnlyIn => inboundLists(id).addAll(IntArrayList.wrap(nodeData.edges)) + case BothInOut => nodeData.edges map { addEdgeAllowingDuplicates(id, _) } // Duplicates shouldn't exist, but allow them for efficiency. + } + } + } + + def this(iteratorSeq: Seq[() => Iterator[NodeIdEdgesMaxId]], + storedGraphDir: StoredGraphDir) { + this((iteratorSeq.view flatMap { _()}).iterator, storedGraphDir) + } + + /* Returns an option which is non-empty if outbound list for id is non-null. */ + private def outboundListOption(id: Int): Option[IntArrayList] = + if (id < 0 || id >= outboundLists.size) + None + else + Option(outboundLists(id)) // If list is null, convert to None + + private def inboundListOption(id: Int): Option[IntArrayList] = + if (id < 0 || id >= inboundLists.size) + None + else + Option(inboundLists(id)) // If list is null, convert to None + + /** Wraps outboundList and inboundList in a node structure. This is an inner class + * because mutations will affect other nodes of the graph if both inbound + * and outbound neighbor lists are stored. + * + * For efficiency, we don't store Nodes, but create them on the fly as needed. + */ + class IntListNode(override val id: Int, + val outboundList: Option[Seq[Int]], + val inboundList: Option[Seq[Int]]) + extends DynamicNode { + override def outboundNodes(): Seq[Int] = outboundList.getOrElse(Seq.empty) + + override def inboundNodes(): Seq[Int] = storedGraphDir match { + case Mutual => outboundNodes() + case _ => inboundList.getOrElse(Seq.empty) + } + + def addOutBoundNodes(nodeIds: Seq[Int]): Unit = { + //For future optimization, we could check if we are only storing outbound + // nodes and then do: + // outboundList.get.addAll(IntArrayList.wrap(nodeIds.toArray)) + nodeIds map { addEdge(id, _) } + } + + def addInBoundNodes(nodeIds: Seq[Int]): Unit = { + nodeIds map { addEdge(_, id) } + } + + def removeOutBoundNode(nodeId: Int): Unit = removeEdge(id, nodeId) + + def removeInBoundNode(nodeId: Int): Unit = removeEdge(nodeId, id) + } + + /** + * Determine if a node with the given id exists in this graph. + */ + def nodeExists(id: Int): Boolean = + outboundListOption(id).nonEmpty || inboundListOption(id).nonEmpty + + /** + * Returns the node with the given {@code id} or else {@code None} if the given node does not + * exist in this graph. + */ + override def getNodeById(id: Int): Option[DynamicNode] = + if (nodeExists(id)) { + Some(new IntListNode(id, + outboundListOption(id) map FastUtilUtils.intArrayListToSeq, + inboundListOption(id) map FastUtilUtils.intArrayListToSeq)) + } else { + None + } + + override def iterator: Iterator[Node] = (0 until maxIdBound).iterator flatMap getNodeById + + /** + * Returns the total number of directed edges in the graph. A mutual edge, eg: A -> B and B -> A, + * counts as 2 edges in this total. + */ + override def edgeCount: Long = + if (StoredGraphDir.isOutDirStored(storedGraphDir)) + (iterator map {_.outboundCount }).sum + else + (iterator map {_.inboundCount }).sum + + /** + * Returns the number of nodes in the graph. + */ + override def nodeCount: Int = _nodeCount + // Or this can be computed as (0 until maxIdBound) count nodeExists + + /** + * Remove an edge from a {@code srdId} to {@code destId}. + * Return Option of source and destination nodes. None indicates the node doesn't exist in graph. + */ + override def removeEdge(srcId: Int, destId: Int): (Option[Node], Option[Node]) = { + outboundListOption(srcId) map {list => list.rem(destId)} + inboundListOption(destId) map {list => list.rem(srcId)} + if (storedGraphDir == Mutual) { + outboundListOption(destId) map {list => list.rem(srcId)} + } + (getNodeById(srcId), getNodeById(destId)) + } + + /** + * Add an edge from {@code srcId} to {@code destId}. + * If the edge already exists, nothing will change. This takes time proportional + * to out-degree(srcId) + in-degree(destId). + */ + override def addEdge(srcId: Int, destId: Int): Unit = { + getOrCreateNode(srcId) + getOrCreateNode(destId) + def addIfMissing(list: IntArrayList, k: Int): Unit = + if (!list.contains(k)) + list.add(k) + outboundListOption(srcId) map { addIfMissing(_, destId) } + inboundListOption(destId) map { addIfMissing(_, srcId) } + if (storedGraphDir == StoredGraphDir.Mutual) { + // Add mutual edge also + outboundListOption(destId) map { addIfMissing(_, srcId) } + } + } + + /** + * Add an edge from {@code srcId} to {@code destId}. + * If the edge already exists, this will create a parallel edge. This runs in + * constant amortized time. + */ + def addEdgeAllowingDuplicates(srcId: Int, destId: Int): Unit = { + getOrCreateNode(srcId) + getOrCreateNode(destId) + + outboundListOption(srcId) map { _.add(destId) } + inboundListOption(destId) map { _.add(srcId) } + if (storedGraphDir == StoredGraphDir.Mutual) { + // Add mutual edge also + outboundListOption(destId) map { _.add(srcId) } + } + } + + /** + * Add a node {@code id} into the graph. + */ + override def getOrCreateNode(id: Int): DynamicNode = { + def addIdToList(list: ArrayBuffer[IntArrayList]) { + if (list.size <= id) { + list.appendAll(Array.fill(id - list.size + 1 )(null)) + } + list(id) = new IntArrayList(initialNeighborListCapacity) + } + + if (!nodeExists(id)) { + _nodeCount += 1 + if (StoredGraphDir.isOutDirStored(storedGraphDir)) { + addIdToList(outboundLists) + } + if (StoredGraphDir.isInDirStored(storedGraphDir) && + storedGraphDir != StoredGraphDir.Mutual) { + addIdToList(inboundLists) + } + } + + getNodeById(id).get + } + + + private def maxIdBound: Int = math.max(outboundLists.size, + inboundLists.size) + + // This can be overridden if a user knows there will be many nodes with no + // neighbors, or if most nodes will have many more than 4 neighbors + protected def initialNeighborListCapacity: Int = 4 +} diff --git a/cassovary-core/src/main/scala/com/twitter/cassovary/util/FastUtilUtils.scala b/cassovary-core/src/main/scala/com/twitter/cassovary/util/FastUtilUtils.scala index ff35195b..b5374505 100644 --- a/cassovary-core/src/main/scala/com/twitter/cassovary/util/FastUtilUtils.scala +++ b/cassovary-core/src/main/scala/com/twitter/cassovary/util/FastUtilUtils.scala @@ -16,7 +16,7 @@ package com.twitter.cassovary.util import java.{util => jutil} -import it.unimi.dsi.fastutil.ints.{Int2BooleanOpenHashMap, Int2IntMap, Int2IntOpenHashMap} +import it.unimi.dsi.fastutil.ints.{IntArrayList, Int2BooleanOpenHashMap, Int2IntMap, Int2IntOpenHashMap} import it.unimi.dsi.fastutil.objects.Object2IntMap import scala.collection.JavaConversions._ @@ -61,4 +61,9 @@ object FastUtilUtils { def newInt2IntOpenHashMap(): mutable.Map[Int, Int] = new Int2IntOpenHashMap().asInstanceOf[jutil.Map[Int, Int]] + + def intArrayListToSeq(list: IntArrayList): Seq[Int] = new IndexedSeq[Int] { + override def apply(idx: Int): Int = list.getInt(idx) + override def length: Int = list.size + } } diff --git a/cassovary-core/src/main/scala/com/twitter/cassovary/util/io/GraphReader.scala b/cassovary-core/src/main/scala/com/twitter/cassovary/util/io/GraphReader.scala index 81053b93..a3af0de7 100644 --- a/cassovary-core/src/main/scala/com/twitter/cassovary/util/io/GraphReader.scala +++ b/cassovary-core/src/main/scala/com/twitter/cassovary/util/io/GraphReader.scala @@ -74,4 +74,10 @@ trait GraphReader[T] { SharedArrayBasedDirectedGraph(iteratorSeq, executorService, storedGraphDir, numShards) } + /** + * Create an `ArrayBasedDynamicDirectedGraph` + */ + def toArrayBasedDynamicDirectedGraph() = { + new ArrayBasedDynamicDirectedGraph(iteratorSeq, storedGraphDir) + } } diff --git a/cassovary-core/src/test/scala/com/twitter/cassovary/graph/ArrayBasedDynamicDirectedGraphSpec.scala b/cassovary-core/src/test/scala/com/twitter/cassovary/graph/ArrayBasedDynamicDirectedGraphSpec.scala new file mode 100644 index 00000000..8fdb1f12 --- /dev/null +++ b/cassovary-core/src/test/scala/com/twitter/cassovary/graph/ArrayBasedDynamicDirectedGraphSpec.scala @@ -0,0 +1,84 @@ +package com.twitter.cassovary.graph + +import org.scalatest.WordSpec +import com.twitter.cassovary.graph.StoredGraphDir._ +import org.scalatest.matchers.ShouldMatchers + +class ArrayBasedDynamicDirectedGraphSpec extends WordSpec with ShouldMatchers with GraphBehaviours { + val graphDirections = List(StoredGraphDir.OnlyIn, StoredGraphDir.OnlyOut, StoredGraphDir.BothInOut, + StoredGraphDir.Mutual) // Bipartitie is not supported + + def builder(iteratorFunc: () => Iterator[NodeIdEdgesMaxId], + storedGraphDir: StoredGraphDir) = + new ArrayBasedDynamicDirectedGraph(iteratorFunc(), storedGraphDir) + verifyGraphBuilding(builder, sampleGraphEdges) + + "An ArrayBasedDynamicDirectedGraph" should { + "support adding nodes" in { + val graph = new ArrayBasedDynamicDirectedGraph(StoredGraphDir.BothInOut) + for (i <- 0 until 3) { + graph.nodeCount shouldEqual i + graph.edgeCount shouldEqual 0 + graph.getOrCreateNode(10 * i) // non-contiguous + } + graph.getOrCreateNode(10) // Accessing again should increase node count + graph.nodeCount shouldEqual 3 + graph.nodeExists(1000000) shouldEqual false + } + + + "support adding edges" in { + for (dir <- graphDirections) { + val graph = new ArrayBasedDynamicDirectedGraph(dir) + val inStored = graph.isDirStored(GraphDir.InDir) + val outStored = graph.isDirStored(GraphDir.OutDir) + val notMutual = dir != StoredGraphDir.Mutual + + graph.addEdge(1, 2) + graph.addEdge(1, 2) // Test duplicate elimination + graph.edgeCount shouldEqual (if (notMutual) 1 else 2) + val node1 = graph.getNodeById(1).get + node1.inboundNodes.toList shouldEqual (if(notMutual) List() else List(2)) + node1.outboundNodes.toList shouldEqual (if(outStored) List(2) else List()) + val node2 = graph.getNodeById(2).get + node2.inboundNodes.toList shouldEqual (if(inStored) List(1) else List()) + node2.outboundNodes.toList shouldEqual (if(notMutual) List() else List(1)) + + // Test multi-edge + graph.addEdgeAllowingDuplicates(1, 2) + graph.edgeCount shouldEqual (if (dir != StoredGraphDir.Mutual) 2 else 4) + node1.inboundNodes.toList shouldEqual (if(notMutual) List() else List(2, 2)) + node1.outboundNodes.toList shouldEqual (if(outStored) List(2, 2) else List()) + node2.inboundNodes.toList shouldEqual (if(inStored) List(1, 1) else List()) + node2.outboundNodes.toList shouldEqual (if(notMutual) List() else List(1, 1)) + + graph.addEdge(2, 1) + graph.edgeCount shouldEqual (if (dir != StoredGraphDir.Mutual) 3 else 4) + } + } + + "support deleting edges" in { + for (dir <- graphDirections) { + val graph = new ArrayBasedDynamicDirectedGraph(dir) + val inStored = graph.isDirStored(GraphDir.InDir) + val outStored = graph.isDirStored(GraphDir.OutDir) + val notMutual = dir != StoredGraphDir.Mutual + + graph.addEdge(1, 2) + graph.addEdge(1, 3) + graph.removeEdge(1, 2) + graph.edgeCount shouldEqual (if (notMutual) 1 else 2) + graph.nodeCount shouldEqual 3 // This is debatable but reasonable. + val node1 = graph.getNodeById(1).get + node1.inboundNodes.toList shouldEqual (if(notMutual) List() else List(3)) + node1.outboundNodes.toList shouldEqual (if(outStored) List(3) else List()) + val node2 = graph.getNodeById(2).get + node2.inboundNodes.toList shouldEqual List() + node2.outboundNodes.toList shouldEqual List() + val node3 = graph.getNodeById(3).get + node3.inboundNodes.toList shouldEqual (if(inStored) List(1) else List()) + node3.outboundNodes.toList shouldEqual (if(notMutual) List() else List(1)) + } + } + } +}