Skip to content

Commit

Permalink
fixing circular dependency run order (#3208)
Browse files Browse the repository at this point in the history
  • Loading branch information
nickzelei authored Jan 31, 2025
1 parent 7ca6b21 commit 1540e1a
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 15 deletions.
14 changes: 5 additions & 9 deletions backend/pkg/table-dependency/circular-dependencies.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,15 @@ func FindCircularDependencies(dependencies map[string][]string) [][]string {
var result [][]string

for node := range dependencies {
visited, recStack := make(map[string]bool), make(map[string]bool)
dfsCycles(node, node, dependencies, visited, recStack, []string{}, &result)
recStack := make(map[string]bool)
path := []string{}
dfsCycles(node, node, dependencies, recStack, path, &result)
}
return uniqueCycles(result)
}

// finds all possible path variations
func dfsCycles(start, current string, dependencies map[string][]string, visited, recStack map[string]bool, path []string, result *[][]string) {
func dfsCycles(start, current string, dependencies map[string][]string, recStack map[string]bool, path []string, result *[][]string) {
if recStack[current] {
if current == start {
// make copy to prevent reference issues
Expand All @@ -123,15 +124,10 @@ func dfsCycles(start, current string, dependencies map[string][]string, visited,
path = append(path, current)

for _, neighbor := range dependencies[current] {
if !visited[neighbor] {
dfsCycles(start, neighbor, dependencies, visited, recStack, path, result)
}
dfsCycles(start, neighbor, dependencies, recStack, path, result)
}

recStack[current] = false
if start == current {
visited[current] = true
}
}

func uniqueCycles(cycles [][]string) [][]string {
Expand Down
93 changes: 93 additions & 0 deletions backend/pkg/table-dependency/circular-dependencies_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,99 @@ func Test_FindCircularDependencies(t *testing.T) {
},
expect: [][]string{{"public.a", "public.b", "public.c"}, {"public.b"}},
},
{
name: "Nested cycles",
dependencies: map[string][]string{
"public.a": {"public.b"},
"public.b": {"public.c"},
"public.c": {"public.d", "public.a"},
"public.d": {"public.b"},
},
expect: [][]string{
{"public.a", "public.b", "public.c"},
{"public.b", "public.c", "public.d"},
},
},
{
name: "Multiple overlapping cycles with shared nodes",
dependencies: map[string][]string{
"public.a": {"public.b", "public.d"},
"public.b": {"public.c"},
"public.c": {"public.a"},
"public.d": {"public.e"},
"public.e": {"public.f"},
"public.f": {"public.d", "public.a"},
},
expect: [][]string{
{"public.a", "public.b", "public.c"},
{"public.a", "public.d", "public.e", "public.f"},
{"public.d", "public.e", "public.f"},
},
},
{
name: "Diamond shape with multiple paths",
dependencies: map[string][]string{
"public.a": {"public.b", "public.c"},
"public.b": {"public.d"},
"public.c": {"public.d"},
"public.d": {"public.a"},
},
expect: [][]string{
{"public.a", "public.b", "public.d"},
{"public.a", "public.c", "public.d"},
},
},
{
name: "Complex web of dependencies",
dependencies: map[string][]string{
"public.a": {"public.b", "public.c"},
"public.b": {"public.d", "public.e"},
"public.c": {"public.e", "public.f"},
"public.d": {"public.g"},
"public.e": {"public.g", "public.h"},
"public.f": {"public.h"},
"public.g": {"public.i"},
"public.h": {"public.i"},
"public.i": {"public.a"},
},
expect: [][]string{
{"public.a", "public.b", "public.d", "public.g", "public.i"},
{"public.a", "public.b", "public.e", "public.g", "public.i"},
{"public.a", "public.b", "public.e", "public.h", "public.i"},
{"public.a", "public.c", "public.e", "public.g", "public.i"},
{"public.a", "public.c", "public.e", "public.h", "public.i"},
{"public.a", "public.c", "public.f", "public.h", "public.i"},
},
},
{
name: "Multiple self-references with shared dependencies",
dependencies: map[string][]string{
"public.a": {"public.a", "public.b"},
"public.b": {"public.b", "public.c"},
"public.c": {"public.a", "public.c"},
},
expect: [][]string{
{"public.a"},
{"public.b"},
{"public.c"},
{"public.a", "public.b", "public.c"},
},
},
{
name: "Cycle with branching paths",
dependencies: map[string][]string{
"public.root": {"public.a1", "public.a2"},
"public.a1": {"public.b1"},
"public.a2": {"public.b2"},
"public.b1": {"public.c"},
"public.b2": {"public.c"},
"public.c": {"public.root"},
},
expect: [][]string{
{"public.root", "public.a1", "public.b1", "public.c"},
{"public.root", "public.a2", "public.b2", "public.c"},
},
},
}

for _, tt := range tests {
Expand Down
5 changes: 3 additions & 2 deletions backend/pkg/table-dependency/table-dependency.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func GetRunConfigs(
}
cycleConfigs, err := processCycles(group, tableColumnsMap, primaryKeyMap, subsets, dependencyMap, d.foreignKeyCols)
if err != nil {
return nil, err
return nil, fmt.Errorf("unable to process cycles: %w", err)
}
// update table processed map
for _, cfg := range cycleConfigs {
Expand Down Expand Up @@ -336,7 +336,8 @@ func processCycles(
if isTableInCycles(cycles, fkTable) {
if len(fkCols.NullableColumns) > 0 {
updateConfig.appendDependsOn(fkTable, fkCols.NullableColumns)
} else {
}
if len(fkCols.NonNullableColumns) > 0 {
insertConfig.appendDependsOn(fkTable, fkCols.NonNullableColumns)
}
} else {
Expand Down
6 changes: 3 additions & 3 deletions internal/benthos/benthos-builder/builders/processors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func Test_buildProcessorConfigsJavascript(t *testing.T) {
Config: &mgmtv1alpha1.TransformerConfig{
Config: &mgmtv1alpha1.TransformerConfig_TransformJavascriptConfig{
TransformJavascriptConfig: &mgmtv1alpha1.TransformJavascript{
Code: `return "hello " + value;`,
Code: `return "hello " + value + " " + input.extra;`,
},
},
},
Expand Down Expand Up @@ -51,7 +51,7 @@ func Test_buildProcessorConfigsJavascript(t *testing.T) {
wrappedCode := fmt.Sprintf(`
let programOutput = undefined;
const benthos = {
v0_msg_as_structured: () => ({address: "world"}),
v0_msg_as_structured: () => ({address: "world", extra: "foobar"}),
};
const neosync = {
patchStructuredMessage: (val) => {
Expand All @@ -70,7 +70,7 @@ const neosync = {
require.NotNil(t, programOutput)
outputMap, ok := programOutput.(map[string]any)
require.True(t, ok)
require.Equal(t, "hello world", outputMap["address"])
require.Equal(t, "hello world foobar", outputMap["address"])
}

func Test_buildProcessorConfigsGenerateJavascript(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ pipeline:
threads: 1
processors:
- mutation: |
root.name = fake("first_name")
root.name = generate_first_name()
output:
label: ""
stdout:
Expand Down

0 comments on commit 1540e1a

Please sign in to comment.