Skip to content

Commit

Permalink
Merge pull request #176 from twitter/cross_partitioning
Browse files Browse the repository at this point in the history
Graph file splitter (deleted old and resubmit new pull request)
  • Loading branch information
pankajgupta committed Mar 18, 2015
2 parents b2df01f + 292715d commit 9278517
Show file tree
Hide file tree
Showing 9 changed files with 244 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright 2015 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
* file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.twitter.cassovary.graph.distributed

import com.twitter.cassovary.graph.NodeIdEdgesMaxId
import com.twitter.cassovary.util.BoundedFuturePool
import com.twitter.cassovary.util.io.GraphReaderFromDirectory
import com.twitter.logging.Logger
import com.twitter.util.{Await, Future, FuturePool}
import java.io._

/**
* Splits a graph read by `graphReaderFromDirectory` to multiple subgraphs, each
* in a separate subdirectory, named "instance_i" for partition numbered i.
* Splitting is done as per `partitioner`.
*/
class GraphFilesSplitter[T](outputDir: String, partitioner: Partitioner,
graphReaderFromDirectory: GraphReaderFromDirectory[T]) {

private val futurePool = new BoundedFuturePool(FuturePool.unboundedPool,
graphReaderFromDirectory.parallelismLimit)
private val log = Logger.get("graphFilesSplitter")

def splitGraph(): Unit = {
// there are many parts of the original input graph
val inputParts = graphReaderFromDirectory.iterableSeq

// instanceWriters is a 2-D array indexed by input part# and instance#
val instanceWriters = setupPerInstanceSubdirectories(partitioner.numInstances,
graphReaderFromDirectory.iterableSeq.length)
val futures = Future.collect(inputParts.indices map { i =>
split(inputParts(i).iterator, instanceWriters(i))
})
Await.result(futures)
}

private def mkDirHelper(dirName: String): Unit = {
val dir = new File(dirName)
if (dir.exists()) {
log.info("Directory %s already exists.", dir)
} else {
if (dir.mkdir()) {
log.debug("Made new directory %s", dir)
} else {
throw new FileNotFoundException("Unable to create new directory " + dir)
}
}
}

private def getBufferedWriter(fileName: String): BufferedWriter = {
try {
val f = new File(fileName)
f.createNewFile()
new BufferedWriter(new OutputStreamWriter(new FileOutputStream(f), "utf-8"))
} catch {
case ex : IOException => throw new IOException(ex.toString)
}
}

// @return an array of arrays. The right index is of subgraph instance number and
// left index is of input seq number.
private def setupPerInstanceSubdirectories(numInstances: Int,
numInputParts: Int): Array[Array[BufferedWriter]] = {
mkDirHelper(outputDir)
val instanceWriters = Array.ofDim[BufferedWriter](numInputParts, numInstances)
(0 until numInstances) foreach { i =>
val subDirName = outputDir + "/instance_" + i
mkDirHelper(subDirName)
(0 until numInputParts) foreach { j =>
instanceWriters(j)(i) = getBufferedWriter(subDirName + "/" + j)
}
}
instanceWriters
}

private def split(it: Iterator[NodeIdEdgesMaxId],
instanceWriters: Array[BufferedWriter]): Future[Unit] = futurePool {
it foreach { origNode =>
partitioner.map(origNode) foreach { case (instance, node) =>
instanceWriters(instance).write(graphReaderFromDirectory.reverseParseNode(node))
}
}
instanceWriters foreach { writer =>
writer.flush()
writer.close()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,13 @@ class AdjacencyListGraphReader[T] (
def oneShardReader(filename : String) : Iterable[NodeIdEdgesMaxId] = {
new OneShardReader(filename, nodeNumberer)
}

// note that we are assuming that n.id.toString does the right thing, which is
// true for int and long ids but might not be for a general T.
def reverseParseNode(n: NodeIdEdgesMaxId): String = {
n.id + separator + n.edges.length + "\n" + n.edges.mkString("\n") + "\n"
}

}

object AdjacencyListGraphReader {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ trait GraphReader[T] {

def parallelismLimit: Int = Runtime.getRuntime.availableProcessors

/**
* The reader knows the format as it knows how to read the file. This reverse parses
* the input `n` to a string in that same format.
*/
def reverseParseNode(n: NodeIdEdgesMaxId): String

/**
* Create an `ArrayBasedDirectedGraph`
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,13 @@ class ListOfEdgesGraphReader[T](
def oneShardReader(filename: String): Iterable[NodeIdEdgesMaxId] = {
new OneShardReader(filename, nodeNumberer)
}

def reverseParseNode(n: NodeIdEdgesMaxId): String = {
n.edges.map { neighbor =>
n.id + " " + neighbor
}.mkString("\n") + "\n"
}

}

object ListOfEdgesGraphReader {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2015 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
* file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.twitter.cassovary.graph.distributed

import java.io.File
import java.util.concurrent.Executors

import com.twitter.cassovary.util.io.AdjacencyListGraphReader
import com.twitter.common.util.FileUtils
import org.scalatest.{Matchers, WordSpec}

class GraphFilesSplitterSpec extends WordSpec with Matchers {
val inputGraphDir = "cassovary-core/src/test/resources/graphs"
val reader = AdjacencyListGraphReader.forIntIds(inputGraphDir, "toy_6nodes_adj")
val tmpDir = "/tmp/test_graph_splitter"
val numInstances = 2
val partitioner = new HashSourceMapper(numInstances, i => i % numInstances)
val splitter = new GraphFilesSplitter[Int](tmpDir, partitioner, reader)
"splitter" should {
"make appropriate output files and directories" in {
splitter.splitGraph()
val tmpd = new File(tmpDir)
val subdirs = tmpd.list()
val expectedSubDirs = (0 until numInstances).map(i => "instance_" + i).toList.sorted
subdirs.toList.sorted shouldEqual expectedSubDirs
val expectedFiles = (0 until reader.iterableSeq.length).map(_.toString).toList.sorted
subdirs foreach { s =>
val files = new File(tmpDir + "/" + s).list()
files.toList.sorted shouldEqual expectedFiles
}
FileUtils.forceDeletePath(tmpd)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
*/
package com.twitter.cassovary.util.io

import com.twitter.cassovary.graph.{GraphBehaviours, Node}
import com.twitter.cassovary.graph.{NodeIdEdgesMaxId, GraphBehaviours, Node}
import com.twitter.cassovary.util.SequentialNodeNumberer
import org.scalatest.{Matchers, WordSpec}

Expand All @@ -37,8 +37,9 @@ class AdjacencyListGraphReaderSpec extends WordSpec with Matchers with GraphBeha
)

trait GraphWithoutRenumberer {
val graph = AdjacencyListGraphReader.forIntIds(directory,
"toy_6nodes_adj").toSharedArrayBasedDirectedGraph()
val reader = AdjacencyListGraphReader.forIntIds(directory,
"toy_6nodes_adj")
val graph = reader.toSharedArrayBasedDirectedGraph()
}

trait GraphWithRenumberer {
Expand Down Expand Up @@ -73,6 +74,15 @@ class AdjacencyListGraphReaderSpec extends WordSpec with Matchers with GraphBeha
behave like graphEquivalentToMap(graph, toy6nodeMap)
}
}

"reverse parse correctly" in {
new GraphWithoutRenumberer {
val node = NodeIdEdgesMaxId(10, Array(11, 12, 20))
val nodeStr = "10 3\n11\n12\n20\n"
reader.reverseParseNode(node) shouldEqual nodeStr
}
}

}

"AdjacencyListReader renumbered" when {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
*/
package com.twitter.cassovary.util.io

import com.twitter.cassovary.graph.{GraphBehaviours, Node}
import com.twitter.cassovary.graph.{NodeIdEdgesMaxId, GraphBehaviours, Node}
import com.twitter.cassovary.util.SequentialNodeNumberer
import org.scalatest.{Matchers, WordSpec}

Expand All @@ -36,8 +36,9 @@ class ListOfEdgesGraphReaderSpec extends WordSpec with GraphBehaviours[Node] wit
private val directory: String = "cassovary-core/src/test/resources/graphs/"

trait GraphWithIntIds {
val graph = ListOfEdgesGraphReader.forIntIds(directory,
"toy_list5edges").toArrayBasedDirectedGraph()
val reader = ListOfEdgesGraphReader.forIntIds(directory,
"toy_list5edges")
val graph = reader.toArrayBasedDirectedGraph()
}

trait GraphWithStringIds {
Expand Down Expand Up @@ -67,7 +68,16 @@ class ListOfEdgesGraphReaderSpec extends WordSpec with GraphBehaviours[Node] wit
behave like graphEquivalentToMap(graph, intGraphMap)
}
}

"reverse parse a node correctly" in {
new GraphWithIntIds {
val node = NodeIdEdgesMaxId(10, Array(11, 12, 13))
val nodeStr = "10 11\n10 12\n10 13\n"
reader.reverseParseNode(node) shouldEqual nodeStr
}
}
}

"using String ids" should {
"provide the correct graph properties" in {
new GraphWithStringIds {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2015 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
* file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

import com.twitter.app.Flags
import com.twitter.cassovary.graph.distributed.{GraphFilesSplitter, HashSourceAndDestMapper}
import com.twitter.cassovary.util.io.AdjacencyListGraphReader

object CrossPartitioning extends App {

val flags = new Flags("Cross Partitioning")
val numInstances = flags("n", 10, "Number of instances/shards")
val inputGraphDir = flags("i", "/tmp/input-graph", "Input graph directory")
val subgraphsDir = flags("o", "/tmp/output-graph", "Output subgraphs directory")
val helpFlag = flags("h", false, "Print usage")
flags.parseArgs(args)

val reader = AdjacencyListGraphReader.forIntIds(inputGraphDir(), "toy_6nodes_adj", null)

def hashNodeFn(i: Int) = i

val partitioner = new HashSourceAndDestMapper(numInstances(), hashNodeFn)
val splitter = new GraphFilesSplitter[Int](subgraphsDir(), partitioner, reader)
println(s"Now splitting graph in ${inputGraphDir()} into ${numInstances()} subgraphs.")
splitter.splitGraph()
println("Split is complete.")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#!/bin/bash

INPUT_GRAPH_FILES_PREFIX=$1
OUTPUT_GRAPH_DIR=$2

ALL_INSTANCES_SUBDIR=$OUTPUT_GRAPH_DIR/all_instances
OUTDEGREES=$ALL_INSTANCES_SUBDIR/outdegrees.txt
INDEGREES=$ALL_INSTANCES_SUBDIR/indegrees.txt

mkdir -p $ALL_INSTANCES_SUBDIR
echo Creating Outdegrees file in $OUTDEGREES ...
grep -h '. .' $INPUT_GRAPH_FILES_PREFIX* > $OUTDEGREES

echo Creating Indegrees file in $INDEGREES ...
grep -h '^[0-9][0-9]*$' $INPUT_GRAPH_FILES_PREFIX* | sort -S2G | uniq -c | perl -lane 'print $F[1]," ", $F[0]' > $INDEGREES
echo Done everything.

0 comments on commit 9278517

Please sign in to comment.