Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Array Based Dynamic Graph #135

Merged
merged 8 commits into from
Jan 29, 2015
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
package com.twitter.cassovary.graph

import com.twitter.cassovary.graph.node.DynamicNode
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please arrange imports in alphabetical order

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

import it.unimi.dsi.fastutil.ints.IntArrayList
import scala.collection.mutable.ArrayBuffer
import StoredGraphDir._
import com.twitter.cassovary.util.FastUtilUtils

/**
* A dynamic directed graph, implemented using an ArrayBuffer of IntArrayList
* (from the fastutil library). If n nodes are used, O(n) objects are created,
* independent of the number of edges.
*
* For efficiency, it's recommended that ids be sequentially numbered, as
* an internal array is stored which is longer than the maximum id seen.
*
* This class is not threadsafe. If multiple thread access it simultaneously
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps also add for clarity that all operations are done in a single thread.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. I may add threads in the future if the performance seems important in practice.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

at least in ArrayBasedDirectedGraph, single thread performance was abysmal. From our past experience, reading graph is a time consuming operation and so it might pay to do it in multiple threads

* and at least one mutates it, synchronization should be used.
*/
class ArrayBasedDynamicDirectedGraph(override val storedGraphDir: StoredGraphDir)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't need 'override' specifier.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree it isn't needed, but I thought it was good style. I'll remove it.

extends DynamicDirectedGraph {
// outboundLists(id) contains the outbound neighbors of the given id,
// or null if the id is not in this graph.
// If we aren't storing outbound neighbors, outboundLists will always remain size 0.
private val outboundLists = new ArrayBuffer[IntArrayList]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think it is more efficient than ArrayBuffer?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No idea.

// (See above note on outboundLists)
private val inboundLists = new ArrayBuffer[IntArrayList]

private var nodeCount_ = 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

scala doesn't use the _ convention too much.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a better name for _nodeCount? (I just realized that http://docs.scala-lang.org/style/naming-conventions.html#accessorsmutators puts the underscore before the name).


def this(dataIterator: Iterator[NodeIdEdgesMaxId],
storedGraphDir: StoredGraphDir) {
this(storedGraphDir)
for (nodeData <- dataIterator) {
val id = nodeData.id
getOrCreateNode(id)
nodeData.edges map getOrCreateNode
storedGraphDir match {
case OnlyOut => outboundLists(id).addAll(IntArrayList.wrap(nodeData.edges))
case OnlyIn => inboundLists(id).addAll(IntArrayList.wrap(nodeData.edges))
case BothInOut => nodeData.edges map { addEdgeAllowingDuplicates(id, _) } // Duplicates shouldn't exist, but allow them for efficiency.
}
}
}

def this(iteratorSeq: Seq[() => Iterator[NodeIdEdgesMaxId]],
storedGraphDir: StoredGraphDir) {
this((iteratorSeq map { _()} ).iterator.flatten, storedGraphDir)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can use flatMap:
iteratorSeq flatMap { _() }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I considered that, but I think it would convert the iterators into a single Seq (reading all the data from all the iterators into memory), and I thought that might give worse performance than constructing the graph as we iterate over the input.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think none of the iterators should be called -- easy to test in repl.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea to just test it. If I execute the following script:

def makeIterator(name: String): Iterator[Int] = new Iterator[Int] {
  var current = 0
  def next(): Int =  {
    println("iterator " + name + " returning " + current)
    current += 1
    current - 1
  }
  def hasNext(): Boolean = (current < 5)
}
val iteratorSeq: Seq[() => Iterator[Int]] = Seq(
    () => makeIterator("one"),
    () => makeIterator("two"),
    () => makeIterator("three"))

val flatMapResult = iteratorSeq flatMap { _()}

Then it prints 15 lines of the form "iterator _ returning _"
Wheras if I replace the last line with
val moreAwkwardResult = (iteratorSeq map { _()} ).iterator.flatten
then it prints nothing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps a clearer alternative to what I have is
iteratorSeq.view flatMap { _()}

}

/* Returns an option which is non-empty if outbound list for id is non-null. */
private def outboundListOption(id: Int): Option[IntArrayList] =
if (id < 0 || id >= outboundLists.size)
None
else
Option(outboundLists(id)) // If list is null, convert to None

private def inboundListOption(id: Int): Option[IntArrayList] =
if (id < 0 || id >= inboundLists.size)
None
else
Option(inboundLists(id)) // If list is null, convert to None

/** Wraps outboundList and inboundList in a node structure. This is an inner class
* because mutations will affect other nodes of the graph if both inbound
* and outbound neighbor lists are stored.
*
* For efficiency, we don't store Nodes, but create them on the fly as needed.
*/
class IntListNode(override val id: Int,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the recommended way to do what you are doing IIUC (to expose only a DynamicNode type to the outside world) is to declare a private object like this:

private object IntListNode {

def apply(id: Int, outboundList: ..., inboundList: ...) = new DynamicNode {
....
}
Besides being cleaner, you are saving on not having to pay for storage of outboundList and inboundList in the created instance of IntListNode

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This wasn't that clear, but I meant to say that it is an inner class because it needs a reference to this (the graph) in the mutator methods (they call addEdge on the enclosing graph). If I change it to a static class as you're suggesting, I would need to pass in a reference to this (which I'm happy with). Why would your alternative avoid storing outboutList and inboundList? Wouldn't they still be stored in a closure somewhere?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here you always pay for the storage of the two option classes which won't be the case otherwise.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want to avoid the storage of one of the two option classes, I think we would need to not access it inside the relevant closure, which would make the code awkward (I'm not sure how to do that without essentially writing two different Node classes). More precicely, in your code
new DynamicNode { .... }
I assume the "...." contains references to both outboundList and inboundList, inless there are separate calls to new Dynamic Node {...} for each case. Since Nodes aren't being stored anyway, I'm inclinded to leave it as is unless you think it's important.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right. We will need to avoid the references inside the {...}. At the minimum, you can do upfront the logic of converting to Seq like:
val o: Seq[Int] = outboundList.map{fastUtilUtils.intArrayListToSeq()) getOrElse(Seq.empty[Int])

and then use 'i' or 'o' inside the new DynamicNode {...} . Will this be better? Might not be a big saving regardless, so up to you.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doing the conversion to Seq outside IntListNode is actually a bit cleaner, so I did that.

val outboundList: Option[IntArrayList],
val inboundList: Option[IntArrayList])
extends DynamicNode {
override def outboundNodes(): Seq[Int] = outboundList match {
case Some(list) => FastUtilUtils.intArrayListToSeq(list)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bear in mind that random access (e.g., for a random walk) would first require traversal of the whole intArrayList to convert the list to a Seq rather than an O(1) direct access. What you want to do I think is to define a new wrapper class like this:
class IntArrayListWrapper(l: IntArrayList) extends IndexedSeq[Int] {
def apply(idx: Int): Int = l.getInt(idx)
def length: Int = l.size
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, this is the read-only wrapper I can use.

case None => Seq.empty[Int]
}

override def inboundNodes(): Seq[Int] = inboundList match {
case Some(list) => FastUtilUtils.intArrayListToSeq(list)
case None => Seq.empty[Int]
}

def addOutBoundNodes(nodeIds: Seq[Int]): Unit = {
//For future optimization, we could check if we are only storing outbound
// nodes and then do:
// outboundList.get.addAll(IntArrayList.wrap(nodeIds.toArray))
nodeIds map { addEdge(id, _) }
}

def addInBoundNodes(nodeIds: Seq[Int]): Unit = {
nodeIds map { addEdge(_, id) }
}

def removeOutBoundNode(nodeId: Int): Unit = removeEdge(id, nodeId)

def removeInBoundNode(nodeId: Int): Unit = removeEdge(nodeId, id)
}

/**
* Determine if a node with the given id exists in this graph.
*/
def nodeExists(id: Int): Boolean =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Throws NPE when id > outboundLists.length

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think outboundListOption already handles the case when id > outboundLists.length. I just added
graph.nodeExists(1000000) shouldEqual false
to one of the tests (where graph is a small graph), and it passed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right.

outboundListOption(id).nonEmpty || inboundListOption(id).nonEmpty

/**
* Returns the node with the given {@code id} or else {@code None} if the given node does not
* exist in this graph.
*/
override def getNodeById(id: Int): Option[DynamicNode] =
if (nodeExists(id)) {
Some(new IntListNode(id, outboundListOption(id), inboundListOption(id)))
} else {
None
}

override def iterator: Iterator[Node] = (0 until maxIdBound).iterator flatMap getNodeById

/**
* Returns the total number of directed edges in the graph. A mutual edge, eg: A -> B and B -> A,
* counts as 2 edges in this total.
*/
override def edgeCount: Long =
if (StoredGraphDir.isOutDirStored(storedGraphDir))
(iterator map {_.outboundCount }).sum
else
(iterator map {_.inboundCount }).sum

/**
* Returns the number of nodes in the graph.
*/
override def nodeCount: Int = nodeCount_
// Or this can be computed as (0 until maxIdBound) count nodeExists

/**
* Remove an edge from a {@code srdId} to {@code destId}.
* Return Option of source and destination nodes. None indicates the node doesn't exist in graph.
*/
override def removeEdge(srcId: Int, destId: Int): (Option[Node], Option[Node]) = {
outboundListOption(srcId) map {list => list.rem(destId)}
inboundListOption(destId) map {list => list.rem(srcId)}
(getNodeById(srcId), getNodeById(destId))
}

/**
* Add an edge from {@code srcId} to {@code destId}.
* If the edge already exists, nothing will change. This takes time proportional
* to out-degree(srcId) + in-degree(destId).
*/
override def addEdge(srcId: Int, destId: Int): Unit = {
getOrCreateNode(srcId)
getOrCreateNode(destId)
def addIfMissing(list: IntArrayList, k: Int): Unit =
if (!list.contains(k))
list.add(k)
outboundListOption(srcId) map { addIfMissing(_, destId) }
inboundListOption(destId) map { addIfMissing(_, srcId) }
}

/**
* Add an edge from {@code srcId} to {@code destId}.
* If the edge already exists, this will create a parallel edge. This runs in
* constant amortized time.
*/
def addEdgeAllowingDuplicates(srcId: Int, destId: Int): Unit = {
getOrCreateNode(srcId)
getOrCreateNode(destId)
def addIfMissing(list: IntArrayList, id: Int): Unit =
if (!list.contains(destId))
list.add(destId)
outboundListOption(srcId) map { _.add(destId) }
inboundListOption(destId) map { _.add(srcId) }
}

/**
* Add a node {@code id} into the graph.
*/
override def getOrCreateNode(id: Int): DynamicNode = {
def addIdToList(list: ArrayBuffer[IntArrayList]) {
if (list.size <= id) {
list.appendAll(Array.fill(id - list.size + 1 )(null))
}
list(id) = new IntArrayList(initialNeighborListCapacity)
}

if (!nodeExists(id)) {
nodeCount_ += 1
if (StoredGraphDir.isOutDirStored(storedGraphDir)) {
addIdToList(outboundLists)
}
if (StoredGraphDir.isInDirStored(storedGraphDir)) {
addIdToList(inboundLists)
}
}

getNodeById(id).get
}


private def maxIdBound: Int = math.max(outboundLists.size,
inboundLists.size)

// This can be overridden if a user knows there will be many nodes with no
// neighbors, or if most nodes will have many more than 4 neighbors
protected def initialNeighborListCapacity: Int = 4
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ package com.twitter.cassovary.util

import java.{util => jutil}

import it.unimi.dsi.fastutil.ints.{Int2BooleanOpenHashMap, Int2IntMap, Int2IntOpenHashMap}
import it.unimi.dsi.fastutil.ints.{IntArrayList, Int2BooleanOpenHashMap, Int2IntMap, Int2IntOpenHashMap}
import it.unimi.dsi.fastutil.objects.Object2IntMap

import scala.collection.JavaConversions._
Expand Down Expand Up @@ -61,4 +61,6 @@ object FastUtilUtils {

def newInt2IntOpenHashMap(): mutable.Map[Int, Int] =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have a look here. I'm creating fastutils collection that is only an implementation of scala's built-in collection (mutable.Map). If you use something like this, you can have

private val outboundLists: mutable.IndexedSeq[mutable.IndexedSeq[Int]] = new ArrayBuffer()

IMO, it's more convenient to hide fastutils api and use scala's that is easier in use (ex. map, find and so on), esp. in tests.

This works using the fact that IntArrayList implements java.util.List that can be converted to scala.collection.mutable.Seq using scala.collection.JavaConversions._.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that's easier to use IndexedSeq, but I'm slightly afraid that if I used map, etc. on IndexedSeq I'll accidentally do an O(n) copy when I'm not expecting it. I also want to make sure the mutate operations don't accidentally transform my IntArrayLists into some other IndexedSeq implmentation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. That's Scala's charm.
  2. Should be fine as long as you use val.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@szymonm in your suggestion the inner mutable.IndexedSeq will have unboxing/boxing, yes?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I cannot answer, but you are right that wrapping the fastutils collection is expensive. See #142

new Int2IntOpenHashMap().asInstanceOf[jutil.Map[Int, Int]]

def intArrayListToSeq(list: IntArrayList): Seq[Int] = list map { _.toInt }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

list.toIntArray() will also work, but this is fine too.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this will be O(n) btw

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need to use this at all (see my previous comment)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point about O(n)! [edit: I'm now using the IndexedSeq wrapper you propose below, so it should be O(1)]

}
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,10 @@ trait GraphReader[T] {
SharedArrayBasedDirectedGraph(iteratorSeq, executorService, storedGraphDir, numShards)
}

/**
* Create an `ArrayBasedDynamicDirectedGraph`
*/
def toArrayBasedDynamicDirectedGraph() = {
new ArrayBasedDynamicDirectedGraph(iteratorSeq, storedGraphDir)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package com.twitter.cassovary.graph

import org.scalatest.WordSpec
import com.twitter.cassovary.graph.StoredGraphDir._
import org.scalatest.matchers.ShouldMatchers

class ArrayBasedDynamicDirectedGraphSpec extends WordSpec with ShouldMatchers with GraphBehaviours {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See here for the structure of a test:
http://doc.scalatest.org/2.2.1/#org.scalatest.WordSpec

To obtain nice class spec when running test, it should be sth like

"An ArrayBasedDynamicDirectedGraph" should {
   "add new nodes" in {
   }
  "delete edges" in {
  }
}

def builder(iteratorFunc: () => Iterator[NodeIdEdgesMaxId],
storedGraphDir: StoredGraphDir) =
new ArrayBasedDynamicDirectedGraph(iteratorFunc(), storedGraphDir)
verifyGraphBuilding(builder, sampleGraphEdges)

"Add new nodes" in {
val graph = new ArrayBasedDynamicDirectedGraph(StoredGraphDir.BothInOut)
for (i <- 0 until 3) {
graph.nodeCount shouldEqual i
graph.edgeCount shouldEqual 0
graph.getOrCreateNode(10 * i) // non-contiguous
}
graph.getOrCreateNode(10) // Accessing again should increase node count
graph.nodeCount shouldEqual 3
}


"Add new edges" in {
for (dir <- List(StoredGraphDir.OnlyIn, StoredGraphDir.OnlyOut, StoredGraphDir.BothInOut)) {
val graph = new ArrayBasedDynamicDirectedGraph(dir)
val inStored = graph.isDirStored(GraphDir.InDir)
val outStored = graph.isDirStored(GraphDir.OutDir)

graph.addEdge(1, 2)
graph.addEdge(1, 2) // Test duplicate elimination
graph.edgeCount shouldEqual 1
val node1 = graph.getNodeById(1).get
node1.inboundNodes.toList shouldEqual List()
node1.outboundNodes.toList shouldEqual (if(outStored) List(2) else List())
val node2 = graph.getNodeById(2).get
node2.inboundNodes.toList shouldEqual (if(inStored) List(1) else List())
node2.outboundNodes.toList shouldEqual List()

// Test multi-edge
graph.addEdgeAllowingDuplicates(1, 2)
graph.edgeCount shouldEqual 2
node1.inboundNodes.toList shouldEqual List()
node1.outboundNodes.toList shouldEqual (if(outStored) List(2, 2) else List())
node2.inboundNodes.toList shouldEqual (if(inStored) List(1, 1) else List())
node2.outboundNodes.toList shouldEqual List()

graph.addEdge(2, 1)
graph.edgeCount shouldEqual 3
}
}

"Delete edges" in {
for (dir <- List(StoredGraphDir.OnlyIn, StoredGraphDir.OnlyOut, StoredGraphDir.BothInOut)) {
val graph = new ArrayBasedDynamicDirectedGraph(dir)
val inStored = graph.isDirStored(GraphDir.InDir)
val outStored = graph.isDirStored(GraphDir.OutDir)

graph.addEdge(1, 2)
graph.addEdge(1, 3)
graph.removeEdge(1, 2)
graph.edgeCount shouldEqual 1
graph.nodeCount shouldEqual 3 // This is debatable but reasonable.
val node1 = graph.getNodeById(1).get
node1.inboundNodes.toList shouldEqual List()
node1.outboundNodes.toList shouldEqual (if(outStored) List(3) else List())
val node2 = graph.getNodeById(2).get
node2.inboundNodes.toList shouldEqual List()
node2.outboundNodes.toList shouldEqual List()
val node3 = graph.getNodeById(3).get
node3.inboundNodes.toList shouldEqual (if(inStored) List(1) else List())
node3.outboundNodes.toList shouldEqual List()
}
}
}