Skip to content

Commit

Permalink
feat(devops): support subgraph debugging
Browse files Browse the repository at this point in the history
  • Loading branch information
mrh997 committed Jan 22, 2025
1 parent 596135e commit c0ddf92
Show file tree
Hide file tree
Showing 9 changed files with 172 additions and 122 deletions.
36 changes: 17 additions & 19 deletions devops/internal/mock/container_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

57 changes: 34 additions & 23 deletions devops/internal/model/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,26 @@ import (
type GraphContainer struct {
// GraphID graph id.
GraphID string
// GraphName graph name.
GraphName string
// Name graph display name.
Name string
// GraphInfo graph info, from graph compile callback.
GraphInfo *GraphInfo
// Canvas graph canvas.
// CanvasInfo graph canvas.
CanvasInfo *devmodel.CanvasInfo
// NodesGraph NodeKey vs Graph, NodeKey is the node where debugging starts.
NodesGraph map[string]*Graph

// NodeGraphs NodeKey vs Graph, NodeKey is the node where debugging starts.
NodeGraphs map[string]*Graph
}

type GraphInfo struct {
*compose.GraphInfo
Option GraphOption
// SubGraphNodes NodeKey vs Subgraph Node Info.
SubGraphNodes map[string]*SubGraphNode
}

type SubGraphNode struct {
ID string
SubGraphNodes map[string]*SubGraphNode
}

type GraphOption struct {
Expand All @@ -69,9 +76,6 @@ func initGraphInfo(gi *GraphInfo) *GraphInfo {
Name: gi.Name,
GenStateFn: gi.GenStateFn,
},
Option: GraphOption{
GenState: gi.GenStateFn,
},
}
}

Expand All @@ -80,8 +84,8 @@ func BuildDevGraph(gi *GraphInfo, fromNode string) (g *Graph, err error) {
return nil, fmt.Errorf("can not start from end node")
}

if gi.Option.GenState != nil {
g = &Graph{Graph: compose.NewGraph[any, any](compose.WithGenLocalState(gi.Option.GenState))}
if gi.GraphInfo.GenStateFn != nil {
g = &Graph{Graph: compose.NewGraph[any, any](compose.WithGenLocalState(gi.GraphInfo.GenStateFn))}
} else {
g = &Graph{Graph: compose.NewGraph[any, any]()}
}
Expand Down Expand Up @@ -158,8 +162,10 @@ func BuildDevGraph(gi *GraphInfo, fromNode string) (g *Graph, err error) {
return g, nil
}

func (gi GraphInfo) BuildGraphSchema() (graph *devmodel.GraphSchema, err error) {
func (gi GraphInfo) BuildGraphSchema(graphID string) (graph *devmodel.GraphSchema, err error) {
graph = &devmodel.GraphSchema{
ID: graphID,
Name: gi.Name,
Nodes: make([]*devmodel.Node, 0, len(gi.Nodes)+2),
Edges: make([]*devmodel.Edge, 0, len(gi.Nodes)+2),
}
Expand Down Expand Up @@ -257,7 +263,6 @@ func (gi GraphInfo) buildGraphNodes() (nodes []*devmodel.Node, err error) {
}

return nodes, err

}

func (gi GraphInfo) buildGraphEdges() (edges []*devmodel.Edge, nodes []*devmodel.Node, err error) {
Expand Down Expand Up @@ -322,6 +327,7 @@ func (gi GraphInfo) buildGraphEdges() (edges []*devmodel.Edge, nodes []*devmodel

return edges, nodes, err
}

func (gi GraphInfo) buildGraphBranches() (edges []*devmodel.Edge, nodes []*devmodel.Node, err error) {
nodes = make([]*devmodel.Node, 0)
edges = make([]*devmodel.Edge, 0)
Expand Down Expand Up @@ -365,19 +371,24 @@ func (gi GraphInfo) buildGraphBranches() (edges []*devmodel.Edge, nodes []*devmo

return edges, nodes, err
}

func (gi GraphInfo) buildSubGraphSchema() (subGraphSchema map[string]*devmodel.GraphSchema, err error) {
subGraphSchema = make(map[string]*devmodel.GraphSchema, len(gi.Nodes))
for key, graphNodeInfo := range gi.Nodes {
if graphNodeInfo.GraphInfo != nil {
subG := GraphInfo{
GraphInfo: graphNodeInfo.GraphInfo,
}
graphSchema, err := subG.BuildGraphSchema()
if err != nil {
return nil, err
}
subGraphSchema[key] = graphSchema
for nk, sgi := range gi.Nodes {
if sgi.GraphInfo == nil {
continue
}

subG := GraphInfo{
GraphInfo: sgi.GraphInfo,
SubGraphNodes: gi.SubGraphNodes[nk].SubGraphNodes,
}
graphSchema, err := subG.BuildGraphSchema(gi.SubGraphNodes[nk].ID)
if err != nil {
return nil, err
}

subGraphSchema[nk] = graphSchema
}

return subGraphSchema, err
Expand Down
40 changes: 35 additions & 5 deletions devops/internal/model/container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"strings"
"testing"

"github.com/google/uuid"
"github.com/stretchr/testify/assert"

devmodel "github.com/cloudwego/eino-ext/devops/model"
Expand Down Expand Up @@ -73,9 +74,6 @@ type testCallback struct {
func (tt *testCallback) OnFinish(ctx context.Context, graphInfo *compose.GraphInfo) {
tt.gi = &GraphInfo{
GraphInfo: graphInfo,
Option: GraphOption{
GenState: graphInfo.GenStateFn,
},
}
}

Expand Down Expand Up @@ -1448,7 +1446,7 @@ func (c *canvasCallBack) OnFinish(ctx context.Context, info *compose.GraphInfo)
g := GraphInfo{
GraphInfo: info,
}
graphSchema, err := g.BuildGraphSchema()
graphSchema, err := g.BuildGraphSchema("1")
assert.NoError(t, err)
assert.Equal(t, 15, len(graphSchema.Nodes))
for _, edge := range graphSchema.Edges {
Expand Down Expand Up @@ -1671,7 +1669,39 @@ func (c *canvasCallBackInferStartNode) OnFinish(ctx context.Context, info *compo
g := GraphInfo{
GraphInfo: info,
}
graphSchema, err := g.BuildGraphSchema()

subGraphNodes := make(map[string]*SubGraphNode)
var add func(pgi *compose.GraphInfo, subGraphNodes map[string]*SubGraphNode) (string, error)
add = func(pgi *compose.GraphInfo, subGraphNodes map[string]*SubGraphNode) (string, error) {
genID, err := uuid.NewRandom()
if err != nil {
return "", err
}
gid := genID.String()

for nk, ni := range pgi.Nodes {
if ni.GraphInfo == nil {
continue
}
_subGraphNodes := make(map[string]*SubGraphNode, 10)
sgid, err := add(ni.GraphInfo, _subGraphNodes)
if err != nil {
return "", err
}
subGraphNodes[nk] = &SubGraphNode{
ID: sgid,
SubGraphNodes: _subGraphNodes,
}
}
return gid, nil
}

gid, err := add(info, subGraphNodes)
assert.NoError(t, err)
assert.NotEmpty(t, gid)
g.SubGraphNodes = subGraphNodes

graphSchema, err := g.BuildGraphSchema(gid)
assert.NoError(t, err)
for _, edge := range graphSchema.Edges {
names := strings.Split(edge.Name, "_to_")
Expand Down
3 changes: 0 additions & 3 deletions devops/internal/model/runnable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,6 @@ func (tt *mockRunnableCallback) OnFinish(ctx context.Context, graphInfo *compose
}
c.gi = &GraphInfo{
GraphInfo: graphInfo,
Option: GraphOption{
GenState: c.genState,
},
}
}

Expand Down
7 changes: 1 addition & 6 deletions devops/internal/service/callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"runtime"
"strings"

"github.com/cloudwego/eino-ext/devops/internal/model"
"github.com/cloudwego/eino-ext/devops/internal/utils/log"
"github.com/cloudwego/eino/compose"
)
Expand All @@ -49,16 +48,12 @@ func NewGlobalDevGraphCompileCallback() compose.GraphCompileCallback {
return
}

opt := model.GraphOption{
GenState: graphInfo.GenStateFn,
}

graphName := graphInfo.Name
if graphName == "" {
graphName = genGraphName(frame)
}

_, err := ContainerSVC.AddGraphInfo(graphName, graphInfo, opt)
_, err := ContainerSVC.AddGraphInfo(graphName, graphInfo)
if err != nil {
log.Errorf(err.Error())
}
Expand Down
13 changes: 6 additions & 7 deletions devops/internal/service/callback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ func (c *callbackTestSuite) buildCallbackChain() {

func (c *callbackTestSuite) Test_NewGlobalDevGraphCompileCallback() {
mockey.PatchConvey("add graph with no graph name", c.T(), func() {
c.mockContainer.EXPECT().AddGraphInfo(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
func(graphName string, graphInfo *compose.GraphInfo, graphOpt model.GraphOption) (graphID string, err error) {
c.mockContainer.EXPECT().AddGraphInfo(gomock.Any(), gomock.Any()).DoAndReturn(
func(graphName string, graphInfo *compose.GraphInfo) (graphID string, err error) {
assert.Equal(c.T(), "callback_test.buildCallbackGraph:64", graphName)
return "", nil
}).Times(1)
Expand All @@ -86,8 +86,8 @@ func (c *callbackTestSuite) Test_NewGlobalDevGraphCompileCallback() {
})

mockey.PatchConvey("add chain with no chian name", c.T(), func() {
c.mockContainer.EXPECT().AddGraphInfo(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
func(graphName string, graphInfo *compose.GraphInfo, graphOpt model.GraphOption) (graphID string, err error) {
c.mockContainer.EXPECT().AddGraphInfo(gomock.Any(), gomock.Any()).DoAndReturn(
func(graphName string, graphInfo *compose.GraphInfo) (graphID string, err error) {
assert.Equal(c.T(), "callback_test.buildCallbackChain:73", graphName)
return "", nil
}).Times(1)
Expand All @@ -97,11 +97,10 @@ func (c *callbackTestSuite) Test_NewGlobalDevGraphCompileCallback() {

mockey.PatchConvey("skip eino devops compile graph", c.T(), func() {
var gi model.GraphInfo
c.mockContainer.EXPECT().AddGraphInfo(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
func(graphName string, graphInfo *compose.GraphInfo, graphOpt model.GraphOption) (graphID string, err error) {
c.mockContainer.EXPECT().AddGraphInfo(gomock.Any(), gomock.Any()).DoAndReturn(
func(graphName string, graphInfo *compose.GraphInfo) (graphID string, err error) {
gi = model.GraphInfo{
GraphInfo: graphInfo,
Option: graphOpt,
}

assert.Equal(c.T(), "callback_test.buildCallbackGraph:64", graphName)
Expand Down
Loading

0 comments on commit c0ddf92

Please sign in to comment.