-
Notifications
You must be signed in to change notification settings - Fork 148
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Array Based Dynamic Graph #135
Changes from all commits
2731265
2de7e96
06a2b73
1ee4c8a
3d78754
e4f38ba
ab0fef0
7e32ac9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,216 @@ | ||
package com.twitter.cassovary.graph | ||
|
||
import com.twitter.cassovary.graph.node.DynamicNode | ||
import com.twitter.cassovary.graph.StoredGraphDir._ | ||
import com.twitter.cassovary.util.FastUtilUtils | ||
import it.unimi.dsi.fastutil.ints.IntArrayList | ||
import scala.collection.mutable.ArrayBuffer | ||
|
||
/** | ||
* A dynamic directed graph, implemented using an ArrayBuffer of IntArrayList | ||
* (from the fastutil library). If n nodes are used, O(n) objects are created, | ||
* independent of the number of edges. | ||
* | ||
* For efficiency, it's recommended that ids be sequentially numbered, as | ||
* an internal array is stored which is longer than the maximum id seen. | ||
* | ||
* This class is not threadsafe. If multiple threads access it simultaneously | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Perhaps also add for clarity that all operations are done in a single thread. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sounds good. I may add threads in the future if the performance seems important in practice. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. at least in ArrayBasedDirectedGraph, single thread performance was abysmal. From our past experience, reading graph is a time consuming operation and so it might pay to do it in multiple threads |
||
* and at least one mutates it, synchronization should be used. Currently | ||
* no new threads are created by this class (all operations execute on a single thread). | ||
*/ | ||
class ArrayBasedDynamicDirectedGraph(val storedGraphDir: StoredGraphDir) | ||
extends DynamicDirectedGraph { | ||
// outboundLists(id) contains the outbound neighbors of the given id, | ||
// or null if the id is not in this graph. | ||
// If we aren't storing outbound neighbors, outboundLists will always remain size 0. | ||
private val outboundLists = new ArrayBuffer[IntArrayList] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can also consider using http://fastutil.di.unimi.it/docs/it/unimi/dsi/fastutil/objects/ObjectArrayList.html instead ArrayBuffer There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you think it is more efficient than ArrayBuffer? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No idea. |
||
// (See above note on outboundLists) | ||
private val inboundLists = new ArrayBuffer[IntArrayList] | ||
|
||
private var _nodeCount = 0 | ||
|
||
/**
 * Builds a graph by consuming `dataIterator` once, in order.
 *
 * Every source id and every neighbor id it lists is registered as a node. How the edges are
 * stored depends on `storedGraphDir`; any direction not matched below is unsupported and
 * fails with a MatchError on the first element.
 */
def this(dataIterator: Iterator[NodeIdEdgesMaxId],
         storedGraphDir: StoredGraphDir) {
  this(storedGraphDir)
  dataIterator foreach { nodeData =>
    val id = nodeData.id
    val neighborIds = nodeData.edges
    getOrCreateNode(id)
    // foreach, not map: only the node-creation side effect is wanted, so no result array
    // should be built and thrown away.
    neighborIds foreach { neighborId => getOrCreateNode(neighborId) }
    storedGraphDir match {
      case OnlyOut | Mutual =>
        outboundLists(id).addAll(IntArrayList.wrap(neighborIds))
      case OnlyIn =>
        inboundLists(id).addAll(IntArrayList.wrap(neighborIds))
      case BothInOut =>
        // Duplicates shouldn't exist, but allow them for efficiency.
        neighborIds foreach { neighborId => addEdgeAllowingDuplicates(id, neighborId) }
    }
  }
}
|
||
/**
 * Builds a graph from several iterator factories. Each factory is invoked lazily, in sequence
 * order, and the iterators it yields are consumed back to back.
 */
def this(iteratorSeq: Seq[() => Iterator[NodeIdEdgesMaxId]],
         storedGraphDir: StoredGraphDir) {
  this(iteratorSeq.view.flatMap(makeIterator => makeIterator()).iterator, storedGraphDir)
}
|
||
/* Returns an option which is non-empty if outbound list for id is non-null. */ | ||
private def outboundListOption(id: Int): Option[IntArrayList] = {
  val withinBounds = id >= 0 && id < outboundLists.size
  // Option(...) turns a null slot (an index inside the buffer that holds no list) into None.
  if (withinBounds) Option(outboundLists(id)) else None
}
|
||
private def inboundListOption(id: Int): Option[IntArrayList] = {
  val withinBounds = id >= 0 && id < inboundLists.size
  // Option(...) turns a null slot (an index inside the buffer that holds no list) into None.
  if (withinBounds) Option(inboundLists(id)) else None
}
|
||
/** Wraps outboundList and inboundList in a node structure. This is an inner class | ||
* because mutations will affect other nodes of the graph if both inbound | ||
* and outbound neighbor lists are stored. | ||
* | ||
* For efficiency, we don't store Nodes, but create them on the fly as needed. | ||
*/ | ||
class IntListNode(override val id: Int, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the recommended way to do what you are doing IIUC (to expose only a DynamicNode type to the outside world) is to declare a private object like this: private object IntListNode { def apply(id: Int, outboundList: ..., inboundList: ...) = new DynamicNode { There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This wasn't that clear, but I meant to say that it is an inner class because it needs a reference to this (the graph) in the mutator methods (they call addEdge on the enclosing graph). If I change it to a static class as you're suggesting, I would need to pass in a reference to this (which I'm happy with). Why would your alternative avoid storing outboutList and inboundList? Wouldn't they still be stored in a closure somewhere? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. here you always pay for the storage of the two option classes which won't be the case otherwise. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we want to avoid the storage of one of the two option classes, I think we would need to not access it inside the relevant closure, which would make the code awkward (I'm not sure how to do that without essentially writing two different Node classes). More precicely, in your code There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You are right. We will need to avoid the references inside the {...}. 
At the minimum, you can do upfront the logic of converting to Seq like: and then use 'i' or 'o' inside the new DynamicNode {...} . Will this be better? Might not be a big saving regardless, so up to you. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Doing the conversion to Seq outside IntListNode is actually a bit cleaner, so I did that. |
||
val outboundList: Option[Seq[Int]], | ||
val inboundList: Option[Seq[Int]]) | ||
extends DynamicNode { | ||
// Outbound neighbor ids, or an empty sequence when this node was given no outbound list.
override def outboundNodes(): Seq[Int] = outboundList match {
  case Some(neighbors) => neighbors
  case None => Seq.empty
}
|
||
// Inbound neighbor ids. For a Mutual graph the outbound neighbors are returned instead of
// the inbound list.
override def inboundNodes(): Seq[Int] =
  if (storedGraphDir == Mutual) outboundNodes()
  else inboundList.getOrElse(Seq.empty)
|
||
// Adds an edge from this node to every id in nodeIds, one addEdge call per id.
def addOutBoundNodes(nodeIds: Seq[Int]): Unit = {
  // For future optimization, we could check if we are only storing outbound
  // nodes and then append all ids to the underlying list in one bulk call.
  // foreach, not map: addEdge is called for its side effect only, so no result
  // collection should be built.
  nodeIds foreach { neighborId => addEdge(id, neighborId) }
}
|
||
// Adds an edge from every id in nodeIds to this node, one addEdge call per id.
def addInBoundNodes(nodeIds: Seq[Int]): Unit = {
  // foreach, not map: addEdge is called for its side effect only, so no result
  // collection should be built.
  nodeIds foreach { neighborId => addEdge(neighborId, id) }
}
|
||
// Removes the edge from this node to nodeId; the node pair returned by removeEdge is discarded.
def removeOutBoundNode(nodeId: Int): Unit = {
  removeEdge(id, nodeId)
}
|
||
// Removes the edge from nodeId to this node; the node pair returned by removeEdge is discarded.
def removeInBoundNode(nodeId: Int): Unit = {
  removeEdge(nodeId, id)
}
} | ||
|
||
/** | ||
* Determine if a node with the given id exists in this graph. | ||
*/ | ||
def nodeExists(id: Int): Boolean = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Throws NPE when id > outboundLists.length There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think outboundListOption already handles the case when id > outboundLists.length. I just added There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You're right. |
||
outboundListOption(id).nonEmpty || inboundListOption(id).nonEmpty | ||
|
||
/** | ||
* Returns the node with the given {@code id} or else {@code None} if the given node does not | ||
* exist in this graph. | ||
*/ | ||
override def getNodeById(id: Int): Option[DynamicNode] = {
  if (!nodeExists(id)) {
    None
  } else {
    // Nodes are not stored; a fresh wrapper over the current neighbor lists is built per call.
    val outNeighbors = outboundListOption(id) map FastUtilUtils.intArrayListToSeq
    val inNeighbors = inboundListOption(id) map FastUtilUtils.intArrayListToSeq
    Some(new IntListNode(id, outNeighbors, inNeighbors))
  }
}
|
||
// Walks ids 0 until maxIdBound in increasing order, yielding a node for each id that exists.
override def iterator: Iterator[Node] = {
  val candidateIds = (0 until maxIdBound).iterator
  candidateIds flatMap { candidateId => getNodeById(candidateId) }
}
|
||
/** | ||
* Returns the total number of directed edges in the graph. A mutual edge, eg: A -> B and B -> A, | ||
* counts as 2 edges in this total. | ||
*/ | ||
override def edgeCount: Long = {
  // Count from the outbound side when it is stored, otherwise from the inbound side.
  val countOutbound = StoredGraphDir.isOutDirStored(storedGraphDir)
  // Accumulate in a Long: summing the per-node counts first and widening afterwards would
  // overflow once the total exceeds Int.MaxValue.
  iterator.foldLeft(0L) { (total, node) =>
    total + (if (countOutbound) node.outboundCount else node.inboundCount)
  }
}
|
||
/** | ||
* Returns the number of nodes in the graph. | ||
*/ | ||
override def nodeCount: Int = _nodeCount | ||
// Or this can be computed as (0 until maxIdBound) count nodeExists | ||
|
||
/** | ||
* Remove an edge from {@code srcId} to {@code destId}. | ||
* Return Option of source and destination nodes. None indicates the node doesn't exist in graph. | ||
*/ | ||
override def removeEdge(srcId: Int, destId: Int): (Option[Node], Option[Node]) = {
  // foreach, not map: the removals are side effects and their boolean results are unused.
  // IntArrayList.rem removes by value (not by index), one occurrence per call.
  outboundListOption(srcId) foreach { neighbors => neighbors.rem(destId) }
  inboundListOption(destId) foreach { neighbors => neighbors.rem(srcId) }
  if (storedGraphDir == Mutual) {
    // Also drop the reverse entry from destId's outbound list.
    outboundListOption(destId) foreach { neighbors => neighbors.rem(srcId) }
  }
  // An endpoint that is not in the graph comes back as None.
  (getNodeById(srcId), getNodeById(destId))
}
|
||
/** | ||
* Add an edge from {@code srcId} to {@code destId}. | ||
* If the edge already exists, nothing will change. This takes time proportional | ||
* to out-degree(srcId) + in-degree(destId). | ||
*/ | ||
override def addEdge(srcId: Int, destId: Int): Unit = {
  // Both endpoints are created first if they are not yet in the graph.
  getOrCreateNode(srcId)
  getOrCreateNode(destId)

  // Linear scan of the list; this is what keeps the edge from being stored twice.
  def addIfMissing(list: IntArrayList, k: Int): Unit =
    if (!list.contains(k))
      list.add(k)

  // foreach, not map: these calls only mutate the lists. A direction that is not stored
  // has no list for the id, so its option is empty and the step is skipped.
  outboundListOption(srcId) foreach { addIfMissing(_, destId) }
  inboundListOption(destId) foreach { addIfMissing(_, srcId) }
  if (storedGraphDir == StoredGraphDir.Mutual) {
    // Add mutual edge also
    outboundListOption(destId) foreach { addIfMissing(_, srcId) }
  }
}
|
||
/** | ||
* Add an edge from {@code srcId} to {@code destId}. | ||
* If the edge already exists, this will create a parallel edge. This runs in | ||
* constant amortized time. | ||
*/ | ||
def addEdgeAllowingDuplicates(srcId: Int, destId: Int): Unit = {
  // Both endpoints are created first if they are not yet in the graph.
  getOrCreateNode(srcId)
  getOrCreateNode(destId)

  // foreach, not map: these calls only mutate the lists. No membership check is done, so an
  // existing edge gains a parallel copy.
  outboundListOption(srcId) foreach { _.add(destId) }
  inboundListOption(destId) foreach { _.add(srcId) }
  if (storedGraphDir == StoredGraphDir.Mutual) {
    // Add mutual edge also
    outboundListOption(destId) foreach { _.add(srcId) }
  }
}
|
||
/** | ||
* Returns the node with the given {@code id}, first adding it to the graph if it is not already present. | ||
*/ | ||
override def getOrCreateNode(id: Int): DynamicNode = {
  // Pads the buffer with null placeholders up to index id if needed, then installs an
  // empty neighbor list at that index.
  def addIdToList(list: ArrayBuffer[IntArrayList]): Unit = {
    if (list.size <= id) {
      list.appendAll(Iterator.fill[IntArrayList](id - list.size + 1)(null))
    }
    list(id) = new IntArrayList(initialNeighborListCapacity)
  }

  // Reject negative ids before touching any state. Without this check the buffer update
  // threw the same exception type, but only after the node count had been incremented.
  if (id < 0) {
    throw new IndexOutOfBoundsException(s"Node id must be non-negative, but was $id")
  }

  if (!nodeExists(id)) {
    if (StoredGraphDir.isOutDirStored(storedGraphDir)) {
      addIdToList(outboundLists)
    }
    // A Mutual graph keeps everything in the outbound lists, so no inbound list is created.
    if (StoredGraphDir.isInDirStored(storedGraphDir) &&
        storedGraphDir != StoredGraphDir.Mutual) {
      addIdToList(inboundLists)
    }
    // Count the node only once its storage exists, so a failure above leaves the count intact.
    _nodeCount += 1
  }

  // The node has a list in a stored direction at this point, so the lookup is expected to
  // succeed.
  getNodeById(id).get
}
|
||
|
||
// Exclusive upper bound on ids that may be present: the longer of the two list buffers.
private def maxIdBound: Int = {
  val outSize = outboundLists.size
  val inSize = inboundLists.size
  if (outSize >= inSize) outSize else inSize
}
|
||
// This can be overridden if a user knows there will be many nodes with no | ||
// neighbors, or if most nodes will have many more than 4 neighbors | ||
protected def initialNeighborListCapacity: Int = 4 | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,7 +16,7 @@ package com.twitter.cassovary.util | |
|
||
import java.{util => jutil} | ||
|
||
import it.unimi.dsi.fastutil.ints.{Int2BooleanOpenHashMap, Int2IntMap, Int2IntOpenHashMap} | ||
import it.unimi.dsi.fastutil.ints.{IntArrayList, Int2BooleanOpenHashMap, Int2IntMap, Int2IntOpenHashMap} | ||
import it.unimi.dsi.fastutil.objects.Object2IntMap | ||
|
||
import scala.collection.JavaConversions._ | ||
|
@@ -61,4 +61,9 @@ object FastUtilUtils { | |
|
||
def newInt2IntOpenHashMap(): mutable.Map[Int, Int] = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Have a look here. I'm creating fastutils collection that is only an implementation of scala's built-in collection (mutable.Map). If you use something like this, you can have
IMO, it's more convenient to hide fastutils api and use scala's that is easier in use (ex. map, find and so on), esp. in tests. This works using the fact that IntArrayList implements java.util.List that can be converted to scala.collection.mutable.Seq using scala.collection.JavaConversions._. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree that's easier to use IndexedSeq, but I'm slightly afraid that if I used map, etc. on IndexedSeq I'll accidentally do an O(n) copy when I'm not expecting it. I also want to make sure the mutate operations don't accidentally transform my IntArrayLists into some other IndexedSeq implmentation. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @szymonm in your suggestion the inner mutable.IndexedSeq will have unboxing/boxing, yes? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I cannot answer, but you are right that wrapping the fastutils collection is expensive. See #142 |
||
new Int2IntOpenHashMap().asInstanceOf[jutil.Map[Int, Int]] | ||
|
||
// Exposes an IntArrayList as an IndexedSeq[Int] without copying: length and element reads
// are forwarded to the list on every call.
def intArrayListToSeq(list: IntArrayList): Seq[Int] = {
  val wrapped = new IndexedSeq[Int] {
    override def length: Int = list.size
    override def apply(idx: Int): Int = list.getInt(idx)
  }
  wrapped
}
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
package com.twitter.cassovary.graph | ||
|
||
import org.scalatest.WordSpec | ||
import com.twitter.cassovary.graph.StoredGraphDir._ | ||
import org.scalatest.matchers.ShouldMatchers | ||
|
||
class ArrayBasedDynamicDirectedGraphSpec extends WordSpec with ShouldMatchers with GraphBehaviours { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See here for the structure of a test: To obtain nice class spec when running test, it should be sth like
|
||
val graphDirections = List(StoredGraphDir.OnlyIn, StoredGraphDir.OnlyOut, StoredGraphDir.BothInOut, | ||
StoredGraphDir.Mutual) // Bipartite is not supported | ||
|
||
def builder(iteratorFunc: () => Iterator[NodeIdEdgesMaxId], | ||
storedGraphDir: StoredGraphDir) = | ||
new ArrayBasedDynamicDirectedGraph(iteratorFunc(), storedGraphDir) | ||
verifyGraphBuilding(builder, sampleGraphEdges) | ||
|
||
"An ArrayBasedDynamicDirectedGraph" should { | ||
"support adding nodes" in { | ||
val graph = new ArrayBasedDynamicDirectedGraph(StoredGraphDir.BothInOut) | ||
for (i <- 0 until 3) { | ||
graph.nodeCount shouldEqual i | ||
graph.edgeCount shouldEqual 0 | ||
graph.getOrCreateNode(10 * i) // non-contiguous | ||
} | ||
graph.getOrCreateNode(10) // Accessing an existing node again should not increase node count | ||
graph.nodeCount shouldEqual 3 | ||
graph.nodeExists(1000000) shouldEqual false | ||
} | ||
|
||
|
||
"support adding edges" in { | ||
for (dir <- graphDirections) { | ||
val graph = new ArrayBasedDynamicDirectedGraph(dir) | ||
val inStored = graph.isDirStored(GraphDir.InDir) | ||
val outStored = graph.isDirStored(GraphDir.OutDir) | ||
val notMutual = dir != StoredGraphDir.Mutual | ||
|
||
graph.addEdge(1, 2) | ||
graph.addEdge(1, 2) // Test duplicate elimination | ||
graph.edgeCount shouldEqual (if (notMutual) 1 else 2) | ||
val node1 = graph.getNodeById(1).get | ||
node1.inboundNodes.toList shouldEqual (if(notMutual) List() else List(2)) | ||
node1.outboundNodes.toList shouldEqual (if(outStored) List(2) else List()) | ||
val node2 = graph.getNodeById(2).get | ||
node2.inboundNodes.toList shouldEqual (if(inStored) List(1) else List()) | ||
node2.outboundNodes.toList shouldEqual (if(notMutual) List() else List(1)) | ||
|
||
// Test multi-edge | ||
graph.addEdgeAllowingDuplicates(1, 2) | ||
graph.edgeCount shouldEqual (if (dir != StoredGraphDir.Mutual) 2 else 4) | ||
node1.inboundNodes.toList shouldEqual (if(notMutual) List() else List(2, 2)) | ||
node1.outboundNodes.toList shouldEqual (if(outStored) List(2, 2) else List()) | ||
node2.inboundNodes.toList shouldEqual (if(inStored) List(1, 1) else List()) | ||
node2.outboundNodes.toList shouldEqual (if(notMutual) List() else List(1, 1)) | ||
|
||
graph.addEdge(2, 1) | ||
graph.edgeCount shouldEqual (if (dir != StoredGraphDir.Mutual) 3 else 4) | ||
} | ||
} | ||
|
||
"support deleting edges" in { | ||
for (dir <- graphDirections) { | ||
val graph = new ArrayBasedDynamicDirectedGraph(dir) | ||
val inStored = graph.isDirStored(GraphDir.InDir) | ||
val outStored = graph.isDirStored(GraphDir.OutDir) | ||
val notMutual = dir != StoredGraphDir.Mutual | ||
|
||
graph.addEdge(1, 2) | ||
graph.addEdge(1, 3) | ||
graph.removeEdge(1, 2) | ||
graph.edgeCount shouldEqual (if (notMutual) 1 else 2) | ||
graph.nodeCount shouldEqual 3 // This is debatable but reasonable. | ||
val node1 = graph.getNodeById(1).get | ||
node1.inboundNodes.toList shouldEqual (if(notMutual) List() else List(3)) | ||
node1.outboundNodes.toList shouldEqual (if(outStored) List(3) else List()) | ||
val node2 = graph.getNodeById(2).get | ||
node2.inboundNodes.toList shouldEqual List() | ||
node2.outboundNodes.toList shouldEqual List() | ||
val node3 = graph.getNodeById(3).get | ||
node3.inboundNodes.toList shouldEqual (if(inStored) List(1) else List()) | ||
node3.outboundNodes.toList shouldEqual (if(notMutual) List() else List(1)) | ||
} | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please arrange imports in alphabetical order
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done