diff --git a/.idea/libraries/buildScala.xml b/.idea/libraries/buildScala.xml index 08a0b2ce..a547267d 100644 --- a/.idea/libraries/buildScala.xml +++ b/.idea/libraries/buildScala.xml @@ -1,13 +1,30 @@ - - + + + + + + + + + + + + + + + + + + + - - + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml index 4d37b690..9eb16788 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -1,8 +1,5 @@ - - diff --git a/.idea/modules.xml b/.idea/modules.xml index 81e78142..2c845841 100644 --- a/.idea/modules.xml +++ b/.idea/modules.xml @@ -3,7 +3,6 @@ - diff --git a/build.sbt b/build.sbt index aad55bac..0bdaf7ac 100644 --- a/build.sbt +++ b/build.sbt @@ -24,6 +24,8 @@ libraryDependencies += "org.mockito" % "mockito-all" % "1.8.5" % "test" withSour libraryDependencies += "org.scala-tools.testing" % "specs_2.8.1" % "1.6.6" % "test" withSources() +libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "0.20.2" + publishMavenStyle := true publishTo <<= version { (v: String) => diff --git a/cassovary.iml b/cassovary.iml index c9e23bf3..b51d1dc4 100644 --- a/cassovary.iml +++ b/cassovary.iml @@ -5,6 +5,8 @@ diff --git a/src/main/java/com/twitter/pers/graph_generator/EdgeListOutput.java b/src/main/java/com/twitter/pers/graph_generator/EdgeListOutput.java new file mode 100644 index 00000000..24835608 --- /dev/null +++ b/src/main/java/com/twitter/pers/graph_generator/EdgeListOutput.java @@ -0,0 +1,77 @@ +/* + * Copyright 2012 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this + * file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations under the License. + */ + +package com.twitter.pers.graph_generator; + +import java.io.BufferedOutputStream; +import java.io.DataOutputStream; +import java.io.FileOutputStream; +import java.io.IOException; + +/** + * Efficient parallel output of edges into a binary edge list format. + * @author Aapo Kyrola, akyrola@cs.cmu.edu, akyrola@twitter.com + */ +public class EdgeListOutput implements GraphOutput { + + private String fileNamePrefix; /* Prefix of every part file; "-part" plus a sequence number is appended for each writing thread. */ + + static int partSeq = 0; /* Next part number to hand out; static, so it is shared by every instance of this class. */ + + public EdgeListOutput(String fileNamePrefix) { + this.fileNamePrefix = fileNamePrefix; + } + + @Override + public void addEdges(int[] from, int[] to) { + try { + DataOutputStream dos = partitionOut.get(); /* Stream belonging to the calling thread; created on first use by initialValue() below. */ + int n = from.length; + for(int i=0; i /* NOTE(review): the text from this loop condition up to the declaration of partitionOut (rest of addEdges, finishUp, and the generic type of the ThreadLocal) is missing from this copy of the patch - recover it from the original commit before changing this class. */ partitionOut = new ThreadLocal() { + @Override + protected DataOutputStream initialValue() { /* Runs on a thread's first partitionOut.get(): reserves a part number and opens that thread's part file. */ + try { + int thisPartId; + synchronized (this) { /* NOTE(review): 'this' is the anonymous ThreadLocal, so the lock is per output instance while partSeq is static; two instances could race on partSeq - confirm only one instance is ever created. */ + thisPartId = partSeq++; + } + + String fileName = fileNamePrefix + "-part" + thisPartId; + return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(fileName))); + } catch (Exception err) { /* Any failure while opening the file is printed and rethrown unchecked with the cause attached. */ + err.printStackTrace(); + throw new RuntimeException(err); + } + } + }; + +} diff --git a/src/main/java/com/twitter/pers/graph_generator/GraphOutput.java b/src/main/java/com/twitter/pers/graph_generator/GraphOutput.java new file mode 100644 index 00000000..97b3d08f --- /dev/null +++ b/src/main/java/com/twitter/pers/graph_generator/GraphOutput.java @@ -0,0 +1,26 @@ +/* + * Copyright 2012 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this + * file except in compliance with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package com.twitter.pers.graph_generator; + +/** Sink that receives generated edges in batches; RMATGraphGenerator calls it from several worker threads. + * @author Aapo Kyrola, akyrola@cs.cmu.edu, akyrola@twitter.com + */ +public interface GraphOutput { + /** Accepts one batch of edges. RMATGraphGenerator passes two arrays of equal length in which from[i] and to[i] are the endpoint ids of edge i. */ + void addEdges(int[] from, int[] to); + /** Called by each generating thread after its last batch. NOTE(review): implementations presumably flush and close that thread's output here - confirm. */ + void finishUp(); + +} diff --git a/src/main/java/com/twitter/pers/graph_generator/HDFSEdgeListOutput.java b/src/main/java/com/twitter/pers/graph_generator/HDFSEdgeListOutput.java new file mode 100644 index 00000000..4be0d22b --- /dev/null +++ b/src/main/java/com/twitter/pers/graph_generator/HDFSEdgeListOutput.java @@ -0,0 +1,97 @@ +/* + * Copyright 2012 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this + * file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package com.twitter.pers.graph_generator; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; + +/** + * Outputs edges into HDFS in tab delimited edge list format + * @author Aapo Kyrola, akyrola@cs.cmu.edu, akyrola@twitter.com + */ +public class HDFSEdgeListOutput implements GraphOutput { + + private String fileNamePrefix; /* Prefix of every part file; "-part" plus a sequence number is appended for each writing thread. */ + + static int partSeq = 0; /* Next part number to hand out; static, so it is shared by every instance of this class. */ + + public HDFSEdgeListOutput(String fileNamePrefix) { + this.fileNamePrefix = fileNamePrefix; + System.out.println("Using HDFS output: " + fileNamePrefix); + } + + @Override + public void addEdges(int[] from, int[] to) { + try { + FSDataOutputStream dos = partitionOut.get(); /* Stream belonging to the calling thread; created on first use by initialValue() below. */ + int n = from.length; + StringBuffer sb = new StringBuffer(from.length * 32); /* Pre-sized at 32 characters per edge. */ + for(int i=0; i /* NOTE(review): the text from this loop condition up to the declaration of partitionOut (rest of addEdges, finishUp, and the generic type of the ThreadLocal) is missing from this copy of the patch - recover it from the original commit before changing this class. */ partitionOut = new ThreadLocal() { + @Override + protected FSDataOutputStream initialValue() { /* Runs on a thread's first partitionOut.get(): reserves a part number, loads the Hadoop configuration and creates that thread's part file. */ + try { + int thisPartId; + synchronized (this) { /* NOTE(review): 'this' is the anonymous ThreadLocal, so the lock is per output instance while partSeq is static; two instances could race on partSeq - confirm only one instance is ever created. */ + thisPartId = partSeq++; + } + + String hadoopHome = System.getProperty("HADOOP_HOME"); /* The JVM property is consulted first ... */ + if (hadoopHome == null) hadoopHome = System.getenv("HADOOP_HOME"); /* ... and the environment variable is the fallback. */ + + if (hadoopHome == null) { + throw new IllegalArgumentException("You need to specify environment variable or JVM option HADOOP_HOME!"); + } + + Configuration conf = new Configuration(); + conf.addResource(new Path(hadoopHome + "/conf/core-site.xml")); + conf.addResource(new Path(hadoopHome + "/conf/hdfs-site.xml")); + + + String fileName = fileNamePrefix + "-part" + thisPartId; + FileSystem fs = FileSystem.get(conf); + return fs.create(new Path(fileName)); + } catch (Exception err) { /* Every failure, including the missing HADOOP_HOME case above, is printed and rethrown unchecked with the cause attached. */ + err.printStackTrace(); + throw new RuntimeException(err); + } + } + }; + +} diff --git a/src/main/java/com/twitter/pers/graph_generator/RMATGraphGenerator.java b/src/main/java/com/twitter/pers/graph_generator/RMATGraphGenerator.java new file mode 100644 index 00000000..bb2a2732 --- /dev/null +++ 
b/src/main/java/com/twitter/pers/graph_generator/RMATGraphGenerator.java @@ -0,0 +1,184 @@ + +/* + * Copyright 2012 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this + * file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + + +package com.twitter.pers.graph_generator; + +import java.util.ArrayList; +import java.util.Random; + +/** + * Graph generator based on the R-MAT algorithm + * R-MAT: A Recursive Model for Graph Mining + * Chakrabarti, Zhan, Faloutsos: http://www.cs.cmu.edu/~christos/PUBLICATIONS/siam04.pdf + * + * Usage: + * [outputfile-prefix] [num-vertices] [num-edges] probA probB probC probD + * + * See the paper for the description of parameters probA, probB, probC, probD. + * + * If the outputfile-prefix starts with hdfs://, the graph is written into HDFS as tab-delimited + * partitions. + * + * Specify the number of threads used by passing a JVM option -Dnum_threads + * Each thread will create its own partition. + * + * Note that the result may (will) contain duplicate edges. 
+ * + * @author Aapo Kyrola, akyrola@cs.cmu.edu, akyrola@twitter.com + */ +public class RMATGraphGenerator { + + private GraphOutput outputter; + + /* Parameters for top-left, top-right, bottom-left, bottom-right probabilities */ + private double pA, pB, pC, pD; + private long numEdges; + private int numVertices; + + /** + * From http://pywebgraph.sourceforge.net + ## Probability of choosing quadrant A + self.probA = 0.45 + + ## Probability of choosing quadrant B + self.probB = 0.15 + + ## Probability of choosing quadrant C + self.probC = 0.15 + + ## Probability of choosing quadrant D + self.probD = 0.25 + */ + + + public RMATGraphGenerator(GraphOutput outputter, double pA, double pB, double pC, double pD, int nVertices,long nEdges) { + this.outputter = outputter; + this.pA = pA; + this.pB = pB; + this.pC = pC; + this.pD = pD; + + if (Math.abs(pA + pB + pC + pD - 1.0) > 0.01) + throw new IllegalArgumentException("Probabilities do not add up to one!"); + numVertices = nVertices; + numEdges = nEdges; + } + + public void execute() { + int nThreads = Integer.parseInt(System.getProperty("num_threads", Runtime.getRuntime().availableProcessors() + "")); + + ArrayList threads = new ArrayList(); + for(int i=0; i < nThreads; i++) { + Thread t = new Thread(new RMATGenerator(numEdges / nThreads)); + t.start(); + threads.add(t); + } + + /* Wait */ + try { + for(Thread t : threads) t.join(); + } catch (InterruptedException ie) { + throw new RuntimeException(ie); + } + } + + private class RMATGenerator implements Runnable { + + private long edgesToGenerate; + + private RMATGenerator(long genEdges) { + this.edgesToGenerate = genEdges; + } + + public void run() { + int nEdgesATime = 1000000; + long createdEdges = 0; + + Random r = new Random(System.currentTimeMillis() + this.hashCode()); + + double cumA = pA; + double cumB = cumA + pB; + double cumC = cumB + pC; + double cumD = 1.0; + assert(cumD > cumC); + + while(edgesToGenerate > createdEdges) { + int ne = (int) 
Math.min(edgesToGenerate - createdEdges, nEdgesATime); + int[] fromIds = new int[ne]; + int[] toIds = new int[ne]; + + for(int j=0; j < ne; j++) { + int col_st = 0, col_en = numVertices - 1, row_st = 0, row_en = numVertices - 1; + while (col_st != col_en || row_st != row_en) { + double x = r.nextDouble(); + + if (x < cumA) { + // Top-left + col_en = col_st + (col_en - col_st) / 2; + row_en = row_st + (row_en - row_st) / 2; + } else if (x < cumB) { + // Top-right + col_st = col_en - (col_en - col_st) / 2; + row_en = row_st + (row_en - row_st) / 2; + + } else if (x < cumC) { + // Bottom-left + col_en = col_st + (col_en - col_st) / 2; + row_st = row_en - (row_en - row_st) / 2; + } else { + // Bottom-right + col_st = col_en - (col_en - col_st) / 2; + row_st = row_en - (row_en - row_st) / 2; + } + } + fromIds[j] = col_st; + toIds[j] = row_st; + } + + outputter.addEdges(fromIds, toIds); + createdEdges += ne; + System.out.println(Thread.currentThread().getId() + " created " + createdEdges + " edges."); + } + outputter.finishUp(); + } + } + + public static void main(String[] args) { + int k = 0; + String outputFile = args[k++]; + int numVertices = Integer.parseInt(args[k++]); + long numEdges = Long.parseLong(args[k++]); + + double pA = Double.parseDouble(args[k++]); + double pB = Double.parseDouble(args[k++]); + double pC = Double.parseDouble(args[k++]); + double pD = Double.parseDouble(args[k++]); + + System.out.println("Going to create graph with approx. 
" + numVertices + " vertices and " + numEdges + " edges"); + + GraphOutput outputInstance = null; + if (outputFile.startsWith("hdfs://")) outputInstance = new HDFSEdgeListOutput(outputFile); + else outputInstance = new EdgeListOutput(outputFile); + + long t = System.currentTimeMillis(); + RMATGraphGenerator generator = new RMATGraphGenerator(outputInstance, + pA, pB, pC, pD, numVertices, numEdges); + generator.execute(); + System.out.println("Generating took " + (System.currentTimeMillis() - t) * 0.001 + " secs"); + + } + +} diff --git a/src/main/scala/com/twitter/cassovary/algorithms/PageRank.scala b/src/main/scala/com/twitter/cassovary/algorithms/PageRank.scala index 1da85910..81dfcfdb 100644 --- a/src/main/scala/com/twitter/cassovary/algorithms/PageRank.scala +++ b/src/main/scala/com/twitter/cassovary/algorithms/PageRank.scala @@ -80,17 +80,22 @@ private class PageRank(graph: DirectedGraph, params: PageRankParams) { var beforePR = new Array[Double](graph.maxNodeId + 1) log.info("Initializing starting PageRank...") val progress = Progress("pagerank_init", 65536, Some(graph.nodeCount)) + log.info("Starting to run") val initialPageRankValue = 1.0D / graph.nodeCount graph.foreach { node => beforePR(node.id) = initialPageRankValue progress.inc } + log.info("Starting iterations %d".format(params.iterations.get)) + (0 until params.iterations.get).foreach { i => log.info("Beginning %sth iteration".format(i)) beforePR = iterate(beforePR) } + log.info("Finished") + beforePR } diff --git a/src/main/scala/com/twitter/cassovary/util/io/AdjacencyListGraphReader.scala b/src/main/scala/com/twitter/cassovary/util/io/AdjacencyListGraphReader.scala index 71bb38da..c9180c99 100644 --- a/src/main/scala/com/twitter/cassovary/util/io/AdjacencyListGraphReader.scala +++ b/src/main/scala/com/twitter/cassovary/util/io/AdjacencyListGraphReader.scala @@ -48,7 +48,7 @@ class AdjacencyListGraphReader (directory: String, prefixFileNames: String = "") class OneShardReader(filename: String) 
extends Iterator[NodeIdEdgesMaxId] { private val outEdgePattern = """^(\d+)\s+(\d+)""".r - private val lines = Source.fromFile(filename).getLines() + private val lines = Source.fromFile(filename).getLines().filterNot(p => p.startsWith("#")) private val holder = NodeIdEdgesMaxId(-1, null, -1) override def hasNext: Boolean = lines.hasNext @@ -93,6 +93,7 @@ class AdjacencyListGraphReader (directory: String, prefixFileNames: String = "") None } }) + if (validFiles.length == 0) throw new RuntimeException("Did not find any files with prefix: " + prefixFileNames) validFiles.map({ filename => {() => new OneShardReader(directory + "/" + filename)} }).toSeq