diff --git a/README.md b/README.md index 002bb1e..016c019 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ import ( ) func main() { - graph := pagerank.NewGraph() + graph := pagerank.NewGraph64() graph.Link(1, 2, 1.0) graph.Link(1, 3, 2.0) @@ -22,7 +22,7 @@ func main() { graph.Link(2, 4, 4.0) graph.Link(3, 1, 5.0) - graph.Rank(0.85, 0.000001, func(node uint32, rank float64) { + graph.Rank(0.85, 0.000001, func(node uint64, rank float64) { fmt.Println("Node", node, "has a rank of", rank) }) } diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..14dccd8 --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module github.com/pointlander/pagerank + +go 1.13 diff --git a/pagerank.go b/pagerank.go deleted file mode 100644 index a345204..0000000 --- a/pagerank.go +++ /dev/null @@ -1,117 +0,0 @@ -/* -Package pagerank implements the *weighted* PageRank algorithm. -*/ -package pagerank - -import ( - "math" -) - -type node struct { - weight float64 - outbound float64 -} - -// Graph holds node and edge data. -type Graph struct { - edges map[uint32](map[uint32]float64) - nodes map[uint32]*node -} - -// NewGraph initializes and returns a new graph. -func NewGraph() *Graph { - return &Graph{ - edges: make(map[uint32](map[uint32]float64)), - nodes: make(map[uint32]*node), - } -} - -// Link creates a weighted edge between a source-target node pair. -// If the edge already exists, the weight is incremented. -func (self *Graph) Link(source, target uint32, weight float64) { - if _, ok := self.nodes[source]; ok == false { - self.nodes[source] = &node{ - weight: 0, - outbound: 0, - } - } - - self.nodes[source].outbound += weight - - if _, ok := self.nodes[target]; ok == false { - self.nodes[target] = &node{ - weight: 0, - outbound: 0, - } - } - - if _, ok := self.edges[source]; ok == false { - self.edges[source] = map[uint32]float64{} - } - - self.edges[source][target] += weight -} - -// Rank computes the PageRank of every node in the directed graph. -// α (alpha) is the damping factor, usually set to 0.85. -// ε (epsilon) is the convergence criteria, usually set to a tiny value. -// -// This method will run as many iterations as needed, until the graph converges. -func (self *Graph) Rank(α, ε float64, callback func(id uint32, rank float64)) { - Δ := float64(1.0) - inverse := 1 / float64(len(self.nodes)) - - // Normalize all the edge weights so that their sum amounts to 1. - for source := range self.edges { - if self.nodes[source].outbound > 0 { - for target := range self.edges[source] { - self.edges[source][target] /= self.nodes[source].outbound - } - } - } - - for key := range self.nodes { - self.nodes[key].weight = inverse - } - - for Δ > ε { - leak := float64(0) - nodes := map[uint32]float64{} - - for key, value := range self.nodes { - nodes[key] = value.weight - - if value.outbound == 0 { - leak += value.weight - } - - self.nodes[key].weight = 0 - } - - leak *= α - - for source := range self.nodes { - for target, weight := range self.edges[source] { - self.nodes[target].weight += α * nodes[source] * weight - } - - self.nodes[source].weight += (1-α)*inverse + leak*inverse - } - - Δ = 0 - - for key, value := range self.nodes { - Δ += math.Abs(value.weight - nodes[key]) - } - } - - for key, value := range self.nodes { - callback(key, value.weight) - } -} - -// Reset clears all the current graph data. -func (self *Graph) Reset() { - self.edges = make(map[uint32](map[uint32]float64)) - self.nodes = make(map[uint32]*node) -} diff --git a/pagerank32.go b/pagerank32.go new file mode 100644 index 0000000..9a330dd --- /dev/null +++ b/pagerank32.go @@ -0,0 +1,204 @@ +/* +Package pagerank implements the *weighted* PageRank algorithm. +*/ +package pagerank + +import ( + "fmt" + "runtime" + "sync" +) + +var ( + // NumCPU is the number of cpus + NumCPU = runtime.NumCPU() +) + +// Node32 is a node in a graph +type Node32 struct { + sync.RWMutex + weight [2]float32 + outbound float32 + edges map[uint]float32 +} + +// Graph32 holds node and edge data. +type Graph32 struct { + Verbose bool + count uint + index map[uint64]uint + nodes []Node32 +} + +// NewGraph32 initializes and returns a new graph. +func NewGraph32(size ...int) *Graph32 { + capacity := 8 + if len(size) == 1 { + capacity = size[0] + } + return &Graph32{ + index: make(map[uint64]uint, capacity), + nodes: make([]Node32, 0, capacity), + } +} + +// Link creates a weighted edge between a source-target node pair. +// If the edge already exists, the weight is incremented. +func (g *Graph32) Link(source, target uint64, weight float32) { + s, ok := g.index[source] + if !ok { + s = g.count + g.index[source] = s + g.nodes = append(g.nodes, Node32{}) + g.count++ + } + + g.nodes[s].outbound += weight + + t, ok := g.index[target] + if !ok { + t = g.count + g.index[target] = t + g.nodes = append(g.nodes, Node32{}) + g.count++ + } + + if g.nodes[s].edges == nil { + g.nodes[s].edges = map[uint]float32{} + } + + g.nodes[s].edges[t] += weight +} + +// Rank computes the PageRank of every node in the directed graph. +// α (alpha) is the damping factor, usually set to 0.85. +// ε (epsilon) is the convergence criteria, usually set to a tiny value. +// +// This method will run as many iterations as needed, until the graph converges. +func (g *Graph32) Rank(α, ε float32, callback func(id uint64, rank float32)) { + Δ := float32(1.0) + nodes := g.nodes + inverse := 1 / float32(len(nodes)) + + // Normalize all the edge weights so that their sum amounts to 1. + if g.Verbose { + fmt.Println("normalize...") + } + done := make(chan bool, 8) + normalize := func(node *Node32) { + if outbound := node.outbound; outbound > 0 { + for target := range node.edges { + node.edges[target] /= outbound + } + } + done <- true + } + i, flight := 0, 0 + for i < len(nodes) && flight < NumCPU { + go normalize(&nodes[i]) + flight++ + i++ + } + for i < len(nodes) { + <-done + flight-- + go normalize(&nodes[i]) + flight++ + i++ + } + for j := 0; j < flight; j++ { + <-done + } + + if g.Verbose { + fmt.Println("initialize...") + } + leak := float32(0) + + a, b := 0, 1 + for source := range nodes { + nodes[source].weight[a] = inverse + + if nodes[source].outbound == 0 { + leak += inverse + } + } + + update := func(adjustment float32, node *Node32) { + node.RLock() + aa := α * node.weight[a] + node.RUnlock() + for target, weight := range node.edges { + nodes[target].Lock() + nodes[target].weight[b] += aa * weight + nodes[target].Unlock() + } + node.Lock() + bb := node.weight[b] + node.weight[b] = bb + adjustment + node.Unlock() + done <- true + } + for Δ > ε { + if g.Verbose { + fmt.Println("updating...") + } + adjustment := (1-α)*inverse + α*leak*inverse + i, flight := 0, 0 + for i < len(nodes) && flight < NumCPU { + go update(adjustment, &nodes[i]) + flight++ + i++ + } + for i < len(nodes) { + <-done + flight-- + go update(adjustment, &nodes[i]) + flight++ + i++ + } + for j := 0; j < flight; j++ { + <-done + } + + if g.Verbose { + fmt.Println("computing delta...") + } + Δ, leak = 0, 0 + for source := range nodes { + node := &nodes[source] + aa, bb := node.weight[a], node.weight[b] + if difference := aa - bb; difference < 0 { + Δ -= difference + } else { + Δ += difference + } + + if node.outbound == 0 { + leak += bb + } + nodes[source].weight[a] = 0 + } + + a, b = b, a + + if g.Verbose { + fmt.Println(Δ, ε) + } + } + + for key, value := range g.index { + callback(key, nodes[value].weight[a]) + } +} + +// Reset clears all the current graph data. +func (g *Graph32) Reset(size ...int) { + capacity := 8 + if len(size) == 1 { + capacity = size[0] + } + g.count = 0 + g.index = make(map[uint64]uint, capacity) + g.nodes = make([]Node32, 0, capacity) +} diff --git a/pagerank64.go b/pagerank64.go new file mode 100644 index 0000000..6b6c818 --- /dev/null +++ b/pagerank64.go @@ -0,0 +1,198 @@ +/* +Package pagerank implements the *weighted* PageRank algorithm. +*/ +package pagerank + +import ( + "fmt" + "sync" +) + +// Node64 is a node in a graph +type Node64 struct { + sync.RWMutex + weight [2]float64 + outbound float64 + edges map[uint]float64 +} + +// Graph64 holds node and edge data. +type Graph64 struct { + Verbose bool + count uint + index map[uint64]uint + nodes []Node64 +} + +// NewGraph64 initializes and returns a new graph. +func NewGraph64(size ...int) *Graph64 { + capacity := 8 + if len(size) == 1 { + capacity = size[0] + } + return &Graph64{ + index: make(map[uint64]uint, capacity), + nodes: make([]Node64, 0, capacity), + } +} + +// Link creates a weighted edge between a source-target node pair. +// If the edge already exists, the weight is incremented. +func (g *Graph64) Link(source, target uint64, weight float64) { + s, ok := g.index[source] + if !ok { + s = g.count + g.index[source] = s + g.nodes = append(g.nodes, Node64{}) + g.count++ + } + + g.nodes[s].outbound += weight + + t, ok := g.index[target] + if !ok { + t = g.count + g.index[target] = t + g.nodes = append(g.nodes, Node64{}) + g.count++ + } + + if g.nodes[s].edges == nil { + g.nodes[s].edges = map[uint]float64{} + } + + g.nodes[s].edges[t] += weight +} + +// Rank computes the PageRank of every node in the directed graph. +// α (alpha) is the damping factor, usually set to 0.85. +// ε (epsilon) is the convergence criteria, usually set to a tiny value. +// +// This method will run as many iterations as needed, until the graph converges. +func (g *Graph64) Rank(α, ε float64, callback func(id uint64, rank float64)) { + Δ := float64(1.0) + nodes := g.nodes + inverse := 1 / float64(len(nodes)) + + // Normalize all the edge weights so that their sum amounts to 1. + if g.Verbose { + fmt.Println("normalize...") + } + done := make(chan bool, 8) + normalize := func(node *Node64) { + if outbound := node.outbound; outbound > 0 { + for target := range node.edges { + node.edges[target] /= outbound + } + } + done <- true + } + i, flight := 0, 0 + for i < len(nodes) && flight < NumCPU { + go normalize(&nodes[i]) + flight++ + i++ + } + for i < len(nodes) { + <-done + flight-- + go normalize(&nodes[i]) + flight++ + i++ + } + for j := 0; j < flight; j++ { + <-done + } + + if g.Verbose { + fmt.Println("initialize...") + } + leak := float64(0) + + a, b := 0, 1 + for source := range nodes { + nodes[source].weight[a] = inverse + + if nodes[source].outbound == 0 { + leak += inverse + } + } + + update := func(adjustment float64, node *Node64) { + node.RLock() + aa := α * node.weight[a] + node.RUnlock() + for target, weight := range node.edges { + nodes[target].Lock() + nodes[target].weight[b] += aa * weight + nodes[target].Unlock() + } + node.Lock() + bb := node.weight[b] + node.weight[b] = bb + adjustment + node.Unlock() + done <- true + } + for Δ > ε { + if g.Verbose { + fmt.Println("updating...") + } + adjustment := (1-α)*inverse + α*leak*inverse + i, flight := 0, 0 + for i < len(nodes) && flight < NumCPU { + go update(adjustment, &nodes[i]) + flight++ + i++ + } + for i < len(nodes) { + <-done + flight-- + go update(adjustment, &nodes[i]) + flight++ + i++ + } + for j := 0; j < flight; j++ { + <-done + } + + if g.Verbose { + fmt.Println("computing delta...") + } + Δ, leak = 0, 0 + for source := range nodes { + node := &nodes[source] + aa, bb := node.weight[a], node.weight[b] + if difference := aa - bb; difference < 0 { + Δ -= difference + } else { + Δ += difference + } + + if node.outbound == 0 { + leak += bb + } + nodes[source].weight[a] = 0 + } + + a, b = b, a + + if g.Verbose { + fmt.Println(Δ, ε) + } + } + + for key, value := range g.index { + callback(key, nodes[value].weight[a]) + } +} + +// Reset clears all the current graph data. +func (g *Graph64) Reset(size ...int) { + capacity := 8 + if len(size) == 1 { + capacity = size[0] + } + g.count = 0 + g.index = make(map[uint64]uint, capacity) + g.nodes = make([]Node64, 0, capacity) +} diff --git a/pagerank_test.go b/pagerank_test.go index 9bac514..959c232 100644 --- a/pagerank_test.go +++ b/pagerank_test.go @@ -5,13 +5,21 @@ import ( "testing" ) -func TestEmpty(t *testing.T) { - graph := NewGraph() +func convert64(a map[uint64]float64) map[uint64]int { + b := make(map[uint64]int, len(a)) + for key, value := range a { + b[key] = int(1000 * value) + } + return b +} - actual := map[uint32]float64{} - expected := map[uint32]float64{} +func TestEmpty64(t *testing.T) { + graph := NewGraph64() - graph.Rank(0.85, 0.000001, func(node uint32, rank float64) { + actual := map[uint64]float64{} + expected := map[uint64]float64{} + + graph.Rank(0.85, 0.000001, func(node uint64, rank float64) { actual[node] = rank }) @@ -20,8 +28,8 @@ func TestEmpty(t *testing.T) { } } -func TestSimple(t *testing.T) { - graph := NewGraph() +func TestSimple64(t *testing.T) { + graph := NewGraph64() graph.Link(1, 2, 1.0) graph.Link(1, 3, 1.0) @@ -29,25 +37,25 @@ func TestSimple(t *testing.T) { graph.Link(2, 4, 1.0) graph.Link(3, 1, 1.0) - actual := map[uint32]float64{} - expected := map[uint32]float64{ + actual := map[uint64]float64{} + expected := map[uint64]float64{ 1: 0.32721836185043207, 2: 0.2108699481253495, 3: 0.3004897566512289, 4: 0.16142193337298952, } - graph.Rank(0.85, 0.000001, func(node uint32, rank float64) { + graph.Rank(0.85, 0.000001, func(node uint64, rank float64) { actual[node] = rank }) - if reflect.DeepEqual(actual, expected) != true { + if reflect.DeepEqual(convert64(actual), convert64(expected)) != true { t.Error("Expected", expected, "but got", actual) } } -func TestWeighted(t *testing.T) { - graph := NewGraph() +func TestWeighted64(t *testing.T) { + graph := NewGraph64() graph.Link(1, 2, 1.0) graph.Link(1, 3, 2.0) @@ -55,25 +63,25 @@ func TestWeighted(t *testing.T) { graph.Link(2, 4, 4.0) graph.Link(3, 1, 5.0) - actual := map[uint32]float64{} - expected := map[uint32]float64{ + actual := map[uint64]float64{} + expected := map[uint64]float64{ 1: 0.34983779905464363, 2: 0.1688733284604475, 3: 0.3295121849483849, 4: 0.15177668753652385, } - graph.Rank(0.85, 0.000001, func(node uint32, rank float64) { + graph.Rank(0.85, 0.000001, func(node uint64, rank float64) { actual[node] = rank }) - if reflect.DeepEqual(actual, expected) != true { + if reflect.DeepEqual(convert64(actual), convert64(expected)) != true { t.Error("Expected", expected, "but got", actual) } } -func TestDuplicates(t *testing.T) { - graph := NewGraph() +func TestDuplicates64(t *testing.T) { + graph := NewGraph64() graph.Link(1, 2, 1.0) graph.Link(1, 3, 2.0) @@ -84,25 +92,25 @@ func TestDuplicates(t *testing.T) { graph.Link(1, 2, 6.0) graph.Link(1, 3, 7.0) - actual := map[uint32]float64{} - expected := map[uint32]float64{ + actual := map[uint64]float64{} + expected := map[uint64]float64{ 1: 0.3312334209098247, 2: 0.19655848316544225, 3: 0.3033555769882879, 4: 0.168852518936445, } - graph.Rank(0.85, 0.000001, func(node uint32, rank float64) { + graph.Rank(0.85, 0.000001, func(node uint64, rank float64) { actual[node] = rank }) - if reflect.DeepEqual(actual, expected) != true { + if reflect.DeepEqual(convert64(actual), convert64(expected)) != true { t.Error("Expected", expected, "but got", actual) } } -func TestDuplicatesAfterReset(t *testing.T) { - graph := NewGraph() +func TestDuplicatesAfterReset64(t *testing.T) { + graph := NewGraph64() graph.Link(1, 2, 1.0) graph.Link(1, 3, 2.0) @@ -115,14 +123,55 @@ func TestDuplicatesAfterReset(t *testing.T) { graph.Link(1, 2, 6.0) graph.Link(1, 3, 7.0) - actual := map[uint32]float64{} - expected := map[uint32]float64{ + actual := map[uint64]float64{} + expected := map[uint64]float64{ 1: 0.25974019022001016, 2: 0.3616383883769191, 3: 0.3786214214030706, } - graph.Rank(0.85, 0.000001, func(node uint32, rank float64) { + graph.Rank(0.85, 0.000001, func(node uint64, rank float64) { + actual[node] = rank + }) + + if reflect.DeepEqual(convert64(actual), convert64(expected)) != true { + t.Error("Expected", expected, "but got", actual) + } +} + +func BenchmarkGraph64(b *testing.B) { + for n := 0; n < b.N; n++ { + graph := NewGraph64() + + graph.Link(1, 2, 1.0) + graph.Link(1, 3, 2.0) + graph.Link(2, 3, 3.0) + graph.Link(2, 4, 4.0) + graph.Link(3, 1, 5.0) + + results := map[uint64]float64{} + + graph.Rank(0.85, 0.000001, func(node uint64, rank float64) { + results[node] = rank + }) + } +} + +func convert32(a map[uint64]float32) map[uint64]int { + b := make(map[uint64]int, len(a)) + for key, value := range a { + b[key] = int(1000 * value) + } + return b +} + +func TestEmpty32(t *testing.T) { + graph := NewGraph32() + + actual := map[uint64]float32{} + expected := map[uint64]float32{} + + graph.Rank(0.85, 0.000001, func(node uint64, rank float32) { actual[node] = rank }) @@ -130,3 +179,132 @@ func TestDuplicatesAfterReset(t *testing.T) { t.Error("Expected", expected, "but got", actual) } } + +func TestSimple32(t *testing.T) { + graph := NewGraph32() + + graph.Link(1, 2, 1.0) + graph.Link(1, 3, 1.0) + graph.Link(2, 3, 1.0) + graph.Link(2, 4, 1.0) + graph.Link(3, 1, 1.0) + + actual := map[uint64]float32{} + expected := map[uint64]float32{ + 1: 0.32721835, + 2: 0.21086994, + 3: 0.30048975, + 4: 0.16142192, + } + + graph.Rank(0.85, 0.000001, func(node uint64, rank float32) { + actual[node] = rank + }) + + if reflect.DeepEqual(convert32(actual), convert32(expected)) != true { + t.Error("Expected", expected, "but got", actual) + } +} + +func TestWeighted32(t *testing.T) { + graph := NewGraph32() + + graph.Link(1, 2, 1.0) + graph.Link(1, 3, 2.0) + graph.Link(2, 3, 3.0) + graph.Link(2, 4, 4.0) + graph.Link(3, 1, 5.0) + + actual := map[uint64]float32{} + expected := map[uint64]float32{ + 1: 0.34983802, + 2: 0.16887322, + 3: 0.32951212, + 4: 0.15177679, + } + + graph.Rank(0.85, 0.000001, func(node uint64, rank float32) { + actual[node] = rank + }) + + if reflect.DeepEqual(convert32(actual), convert32(expected)) != true { + t.Error("Expected", expected, "but got", actual) + } +} + +func TestDuplicates32(t *testing.T) { + graph := NewGraph32() + + graph.Link(1, 2, 1.0) + graph.Link(1, 3, 2.0) + graph.Link(2, 3, 3.0) + graph.Link(2, 4, 4.0) + graph.Link(3, 1, 5.0) + + graph.Link(1, 2, 6.0) + graph.Link(1, 3, 7.0) + + actual := map[uint64]float32{} + expected := map[uint64]float32{ + 1: 0.33123338, + 2: 0.19655848, + 3: 0.30335557, + 4: 0.16885251, + } + + graph.Rank(0.85, 0.000001, func(node uint64, rank float32) { + actual[node] = rank + }) + + if reflect.DeepEqual(convert32(actual), convert32(expected)) != true { + t.Error("Expected", expected, "but got", actual) + } +} + +func TestDuplicatesAfterReset32(t *testing.T) { + graph := NewGraph32() + + graph.Link(1, 2, 1.0) + graph.Link(1, 3, 2.0) + graph.Link(2, 3, 3.0) + graph.Link(2, 4, 4.0) + graph.Link(3, 1, 5.0) + + graph.Reset() + + graph.Link(1, 2, 6.0) + graph.Link(1, 3, 7.0) + + actual := map[uint64]float32{} + expected := map[uint64]float32{ + 1: 0.25974017, + 2: 0.36163837, + 3: 0.3786214, + } + + graph.Rank(0.85, 0.000001, func(node uint64, rank float32) { + actual[node] = rank + }) + + if reflect.DeepEqual(convert32(actual), convert32(expected)) != true { + t.Error("Expected", expected, "but got", actual) + } +} + +func BenchmarkGraph32(b *testing.B) { + for n := 0; n < b.N; n++ { + graph := NewGraph64() + + graph.Link(1, 2, 1.0) + graph.Link(1, 3, 2.0) + graph.Link(2, 3, 3.0) + graph.Link(2, 4, 4.0) + graph.Link(3, 1, 5.0) + + results := map[uint64]float64{} + + graph.Rank(0.85, 0.000001, func(node uint64, rank float64) { + results[node] = rank + }) + } +}