diff --git a/dag/dag.go b/dag/dag.go index 4177cd07e6..0026fd9fda 100644 --- a/dag/dag.go +++ b/dag/dag.go @@ -166,7 +166,7 @@ func (g *AcyclicGraph) Cycles() [][]Vertex { func (g *AcyclicGraph) Walk(cb WalkFunc) error { defer g.debug.BeginOperation(typeWalk, "").End("") - w := &walker{Callback: cb, Reverse: true} + w := &Walker{Callback: cb, Reverse: true} w.Update(&g.Graph) return w.Wait() } diff --git a/dag/walk.go b/dag/walk.go index 73045e0fe1..826b7816e6 100644 --- a/dag/walk.go +++ b/dag/walk.go @@ -10,8 +10,23 @@ import ( "github.com/hashicorp/go-multierror" ) -// walker performs a graph walk and supports walk-time changing of vertices -// and edges. +// Walker is used to walk every vertex of a graph in parallel. +// +// A vertex will only be walked when the dependencies of that vertex have +// been walked. If two vertices can be walked at the same time, they will be. +// +// Update can be called to update the graph. This can be called even during +// a walk, cahnging vertices/edges mid-walk. This should be done carefully. +// If a vertex is removed but has already been executed, the result of that +// execution (any error) is still returned by Wait. Changing or re-adding +// a vertex that has already executed has no effect. Changing edges of +// a vertex that has already executed has no effect. +// +// Non-parallelism can be enforced by introducing a lock in your callback +// function. However, the goroutine overhead of a walk will remain. +// Walker will create V*2 goroutines (one for each vertex, and dependency +// waiter for each vertex). In general this should be of no concern unless +// there are a huge number of vertices. // // The walk is depth first by default. This can be changed with the Reverse // option. @@ -19,7 +34,7 @@ import ( // A single walker is only valid for one graph walk. After the walk is complete // you must construct a new walker to walk again. State for the walk is never // deleted in case vertices or edges are changed. -type walker struct { +type Walker struct { // Callback is what is called for each vertex Callback WalkFunc @@ -46,18 +61,34 @@ type walker struct { } type walkerVertex struct { - // These should only be set once on initialization and never written again + // These should only be set once on initialization and never written again. + // They are not protected by a lock since they don't need to be since + // they are write-once. + + // DoneCh is closed when this vertex has completed execution, regardless + // of success. + // + // CancelCh is closed when the vertex should cancel execution. If execution + // is already complete (DoneCh is closed), this has no effect. Otherwise, + // execution is cancelled as quickly as possible. DoneCh chan struct{} CancelCh chan struct{} // Dependency information. Any changes to any of these fields requires // holding DepsLock. + // + // DepsCh is sent a single value that denotes whether the upstream deps + // were successful (no errors). Any value sent means that the upstream + // dependencies are complete. No other values will ever be sent again. + // + // DepsUpdateCh is closed when there is a new DepsCh set. DepsCh chan bool DepsUpdateCh chan struct{} DepsLock sync.Mutex // Below is not safe to read/write in parallel. This behavior is - // enforced by changes only happening in Update. + // enforced by changes only happening in Update. Nothing else should + // ever modify these. deps map[Vertex]chan struct{} depsCancelCh chan struct{} } @@ -74,7 +105,7 @@ var errWalkUpstream = errors.New("upstream dependency failed") // Wait will return as soon as all currently known vertices are complete. // If you plan on calling Update with more vertices in the future, you // should not call Wait until after this is done. -func (w *walker) Wait() error { +func (w *Walker) Wait() error { // Wait for completion w.wait.Wait() @@ -94,11 +125,17 @@ func (w *walker) Wait() error { return result } -// Update updates the currently executing walk with the given vertices -// and edges. It does not block until completion. +// Update updates the currently executing walk with the given graph. +// This will perform a diff of the vertices and edges and update the walker. +// Already completed vertices remain completed (including any errors during +// their execution). // -// Update can be called in parallel to Walk. -func (w *walker) Update(g *Graph) { +// This returns immediately once the walker is updated; it does not wait +// for completion of the walk. +// +// Multiple Updates can be called in parallel. Update can be called at any +// time during a walk. +func (w *Walker) Update(g *Graph) { v, e := g.vertices, g.edges // Grab the change lock so no more updates happen but also so that @@ -277,7 +314,7 @@ func (w *walker) Update(g *Graph) { // edgeParts returns the waiter and the dependency, in that order. // The waiter is waiting on the dependency. -func (w *walker) edgeParts(e Edge) (Vertex, Vertex) { +func (w *Walker) edgeParts(e Edge) (Vertex, Vertex) { if w.Reverse { return e.Source(), e.Target() } @@ -287,7 +324,7 @@ func (w *walker) edgeParts(e Edge) (Vertex, Vertex) { // walkVertex walks a single vertex, waiting for any dependencies before // executing the callback. -func (w *walker) walkVertex(v Vertex, info *walkerVertex) { +func (w *Walker) walkVertex(v Vertex, info *walkerVertex) { // When we're done executing, lower the waitgroup count defer w.wait.Done() @@ -297,6 +334,7 @@ func (w *walker) walkVertex(v Vertex, info *walkerVertex) { // Wait for our dependencies. We create a [closed] deps channel so // that we can immediately fall through to load our actual DepsCh. var depsSuccess bool + var depsUpdateCh chan struct{} depsCh := make(chan bool, 1) depsCh <- true close(depsCh) @@ -310,7 +348,7 @@ func (w *walker) walkVertex(v Vertex, info *walkerVertex) { // Deps complete! Mark as nil to trigger completion handling. depsCh = nil - case <-info.DepsUpdateCh: + case <-depsUpdateCh: // New deps, reloop } @@ -323,6 +361,9 @@ func (w *walker) walkVertex(v Vertex, info *walkerVertex) { depsCh = info.DepsCh info.DepsCh = nil } + if info.DepsUpdateCh != nil { + depsUpdateCh = info.DepsUpdateCh + } info.DepsLock.Unlock() // If we still have no deps channel set, then we're done! @@ -362,7 +403,7 @@ func (w *walker) walkVertex(v Vertex, info *walkerVertex) { } } -func (w *walker) waitDeps( +func (w *Walker) waitDeps( v Vertex, deps map[Vertex]<-chan struct{}, doneCh chan<- bool, diff --git a/dag/walk_test.go b/dag/walk_test.go index 8f99d9046e..04e6ebe3ed 100644 --- a/dag/walk_test.go +++ b/dag/walk_test.go @@ -17,7 +17,7 @@ func TestWalker_basic(t *testing.T) { // Run it a bunch of times since it is timing dependent for i := 0; i < 50; i++ { var order []interface{} - w := &walker{Callback: walkCbRecord(&order)} + w := &Walker{Callback: walkCbRecord(&order)} w.Update(&g) // Wait @@ -56,7 +56,7 @@ func TestWalker_error(t *testing.T) { return recordF(v) } - w := &walker{Callback: cb} + w := &Walker{Callback: cb} w.Update(&g) // Wait @@ -80,7 +80,7 @@ func TestWalker_newVertex(t *testing.T) { g.Connect(BasicEdge(1, 2)) var order []interface{} - w := &walker{Callback: walkCbRecord(&order)} + w := &Walker{Callback: walkCbRecord(&order)} w.Update(&g) // Wait a bit @@ -120,7 +120,7 @@ func TestWalker_removeVertex(t *testing.T) { recordF := walkCbRecord(&order) // Build a callback that delays until we close a channel - var w *walker + var w *Walker cb := func(v Vertex) error { if v == 1 { g.Remove(2) @@ -131,7 +131,7 @@ func TestWalker_removeVertex(t *testing.T) { } // Add the initial vertices - w = &walker{Callback: cb} + w = &Walker{Callback: cb} w.Update(&g) // Wait @@ -160,7 +160,7 @@ func TestWalker_newEdge(t *testing.T) { recordF := walkCbRecord(&order) // Build a callback that delays until we close a channel - var w *walker + var w *Walker cb := func(v Vertex) error { if v == 1 { g.Add(3) @@ -172,7 +172,7 @@ func TestWalker_newEdge(t *testing.T) { } // Add the initial vertices - w = &walker{Callback: cb} + w = &Walker{Callback: cb} w.Update(&g) // Wait @@ -209,7 +209,7 @@ func TestWalker_removeEdge(t *testing.T) { // forcing 2 before 3 via the callback (and not the graph). If // 2 cannot execute before 3 (edge removal is non-functional), then // this test will timeout. - var w *walker + var w *Walker gateCh := make(chan struct{}) cb := func(v Vertex) error { if v == 1 { @@ -233,7 +233,7 @@ func TestWalker_removeEdge(t *testing.T) { } // Add the initial vertices - w = &walker{Callback: cb} + w = &Walker{Callback: cb} w.Update(&g) // Wait