From 173e1587d007871b0bd1ba2c7ea8cfd3a87ded5e Mon Sep 17 00:00:00 2001 From: Irena Rindos Date: Mon, 12 Dec 2022 13:48:28 -0500 Subject: [PATCH] Append initial worker upstreams if current upstreams are unresponsive (#2698) * Append initial worker upstreams if current upstreams are unresponsive --- internal/daemon/worker/status.go | 93 +++++++++++++------ internal/daemon/worker/testing.go | 2 +- internal/daemon/worker/worker.go | 1 - .../cluster/multi_controller_worker_test.go | 57 ++++++++++++ 4 files changed, 121 insertions(+), 32 deletions(-) diff --git a/internal/daemon/worker/status.go b/internal/daemon/worker/status.go index 09c1f1b629..e4db0fa3ba 100644 --- a/internal/daemon/worker/status.go +++ b/internal/daemon/worker/status.go @@ -16,6 +16,7 @@ import ( "github.com/hashicorp/boundary/internal/server" "github.com/hashicorp/boundary/version" "github.com/hashicorp/go-secure-stdlib/strutil" + "google.golang.org/grpc/connectivity" ) var firstStatusCheckPostHooks []func(context.Context, *Worker) error @@ -206,6 +207,31 @@ func (w *Worker) sendWorkerStatus(cancelCtx context.Context, sessionManager sess return true }) + // In the case that the control plane has gone down and come up with different IPs, + // append initial upstreams/ cluster addr to the resolver to try + if w.GrpcClientConn.GetState() == connectivity.TransientFailure { + lastStatus := w.lastStatusSuccess.Load().(*LastStatusInformation) + addrs := lastStatus.LastCalculatedUpstreams + + if len(w.conf.RawConfig.Worker.InitialUpstreams) > 0 { + addrs = append(addrs, w.conf.RawConfig.Worker.InitialUpstreams...) + } + if len(w.conf.RawConfig.HcpbClusterId) > 0 { + clusterAddress := fmt.Sprintf("%s%s", w.conf.RawConfig.HcpbClusterId, hcpbUrlSuffix) + addrs = append(addrs, clusterAddress) + } + + addrs = strutil.RemoveDuplicates(addrs, false) + if strutil.EquivalentSlices(lastStatus.LastCalculatedUpstreams, addrs) { + // Nothing to update + return + } + + w.updateAddresses(cancelCtx, addrs, addressReceivers) + lastStatus.LastCalculatedUpstreams = addrs + w.lastStatusSuccess.Store(lastStatus) + } + // Exit out of status function; our work here is done and we don't need to create closeConnection requests return } @@ -243,36 +269,7 @@ func (w *Worker) sendWorkerStatus(cancelCtx context.Context, sessionManager sess } } - if len(addrs) > 0 { - lastStatus := w.lastStatusSuccess.Load().(*LastStatusInformation) - // Compare upstreams; update resolver if there is a difference, and emit an event with old and new addresses - if lastStatus != nil && !strutil.EquivalentSlices(lastStatus.LastCalculatedUpstreams, addrs) { - upstreamsMessage := fmt.Sprintf("Upstreams has changed; old upstreams were: %s, new upstreams are: %s", lastStatus.LastCalculatedUpstreams, addrs) - event.WriteSysEvent(cancelCtx, op, upstreamsMessage) - for _, as := range *addressReceivers { - as.SetAddresses(addrs) - } - } else if lastStatus == nil { - for _, as := range *addressReceivers { - as.SetAddresses(addrs) - } - event.WriteSysEvent(cancelCtx, op, fmt.Sprintf("Upstreams after first status set to: %s", addrs)) - } - } - - // regardless of whether or not it's a new address, we need to set - // them for dialingListeners - for _, as := range *addressReceivers { - switch { - case as.Type() == dialingListenerReceiverType: - tmpAddrs := make([]string, len(addrs)) - copy(tmpAddrs, addrs) - if len(tmpAddrs) == 0 { - tmpAddrs = append(tmpAddrs, w.conf.RawConfig.Worker.InitialUpstreams...) - } - as.SetAddresses(tmpAddrs) - } - } + w.updateAddresses(cancelCtx, addrs, addressReceivers) w.lastStatusSuccess.Store(&LastStatusInformation{StatusResponse: result, StatusTime: time.Now(), LastCalculatedUpstreams: addrs}) @@ -330,6 +327,42 @@ func (w *Worker) sendWorkerStatus(cancelCtx context.Context, sessionManager sess } } +// Update address receivers and dialing listeners with new addrs +func (w *Worker) updateAddresses(cancelCtx context.Context, addrs []string, addressReceivers *[]addressReceiver) { + const op = "worker.(Worker).updateAddrs" + + if len(addrs) > 0 { + lastStatus := w.lastStatusSuccess.Load().(*LastStatusInformation) + // Compare upstreams; update resolver if there is a difference, and emit an event with old and new addresses + if lastStatus != nil && !strutil.EquivalentSlices(lastStatus.LastCalculatedUpstreams, addrs) { + upstreamsMessage := fmt.Sprintf("Upstreams has changed; old upstreams were: %s, new upstreams are: %s", lastStatus.LastCalculatedUpstreams, addrs) + event.WriteSysEvent(cancelCtx, op, upstreamsMessage) + for _, as := range *addressReceivers { + as.SetAddresses(addrs) + } + } else if lastStatus == nil { + for _, as := range *addressReceivers { + as.SetAddresses(addrs) + } + event.WriteSysEvent(cancelCtx, op, fmt.Sprintf("Upstreams after first status set to: %s", addrs)) + } + } + + // regardless of whether or not it's a new address, we need to set + // them for dialingListeners + for _, as := range *addressReceivers { + switch { + case as.Type() == dialingListenerReceiverType: + tmpAddrs := make([]string, len(addrs)) + copy(tmpAddrs, addrs) + if len(tmpAddrs) == 0 { + tmpAddrs = append(tmpAddrs, w.conf.RawConfig.Worker.InitialUpstreams...) + } + as.SetAddresses(tmpAddrs) + } + } +} + // cleanupConnections walks all sessions and shuts down all proxy connections. // After the local connections are terminated, they are requested to be marked // close on the controller. diff --git a/internal/daemon/worker/testing.go b/internal/daemon/worker/testing.go index 7cd6870370..c90f7d66b9 100644 --- a/internal/daemon/worker/testing.go +++ b/internal/daemon/worker/testing.go @@ -275,7 +275,7 @@ func NewTestWorker(t testing.TB, opts *TestWorkerOpts) *TestWorker { tw.name = opts.Config.Worker.Name if opts.SuccessfulStatusGracePeriodDuration != 0 { - opts.Config.Worker.SuccessfulStatusGracePeriod = opts.SuccessfulStatusGracePeriodDuration + opts.Config.Worker.SuccessfulStatusGracePeriodDuration = opts.SuccessfulStatusGracePeriodDuration } serverName, err := os.Hostname() diff --git a/internal/daemon/worker/worker.go b/internal/daemon/worker/worker.go index 8e48004627..57bf1906c3 100644 --- a/internal/daemon/worker/worker.go +++ b/internal/daemon/worker/worker.go @@ -203,7 +203,6 @@ func New(conf *Config) (*Worker, error) { err) } } - switch conf.RawConfig.Worker.SuccessfulStatusGracePeriodDuration { case 0: w.successfulStatusGracePeriod.Store(int64(server.DefaultLiveness)) diff --git a/internal/tests/cluster/multi_controller_worker_test.go b/internal/tests/cluster/multi_controller_worker_test.go index df6c023b91..d0ef4a6514 100644 --- a/internal/tests/cluster/multi_controller_worker_test.go +++ b/internal/tests/cluster/multi_controller_worker_test.go @@ -1,6 +1,7 @@ package cluster import ( + "context" "testing" "time" @@ -8,6 +9,8 @@ import ( "github.com/hashicorp/boundary/internal/daemon/controller" "github.com/hashicorp/boundary/internal/daemon/worker" "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-secure-stdlib/strutil" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -83,3 +86,57 @@ func TestMultiControllerMultiWorkerConnections(t *testing.T) { expectWorkers(t, c1, w1, w2) expectWorkers(t, c2, w1, w2) } + +func TestWorkerAppendInitialUpstreams(t *testing.T) { + ctx := context.Background() + require, assert := require.New(t), assert.New(t) + logger := hclog.New(&hclog.LoggerOptions{ + Level: hclog.Trace, + }) + + conf, err := config.DevController() + require.NoError(err) + + c1 := controller.NewTestController(t, &controller.TestControllerOpts{ + Config: conf, + Logger: logger.Named("c1"), + }) + defer c1.Shutdown() + + expectWorkers(t, c1) + + initialUpstreams := append(c1.ClusterAddrs(), "127.0.0.9") + w1 := worker.NewTestWorker(t, &worker.TestWorkerOpts{ + WorkerAuthKms: c1.Config().WorkerAuthKms, + InitialUpstreams: initialUpstreams, + Logger: logger.Named("w1"), + SuccessfulStatusGracePeriodDuration: 1 * time.Second, + }) + defer w1.Shutdown() + + // Wait for worker to send status + cancelCtx, _ := context.WithTimeout(ctx, 10*time.Second) + for { + select { + case <-time.After(500 * time.Millisecond): + break + case <-cancelCtx.Done(): + require.FailNow("No worker found after 10 seconds") + } + successSent := w1.Worker().LastStatusSuccess() + if successSent != nil { + break + } + } + expectWorkers(t, c1, w1) + + // Upstreams should be equivalent to the controller cluster addr after status updates + assert.Equal(c1.ClusterAddrs(), w1.Worker().LastStatusSuccess().LastCalculatedUpstreams) + + // Bring down the controller + c1.Shutdown() + time.Sleep(3 * time.Second) // Wait a little longer than the grace period + + // Upstreams should now match initial upstreams + assert.True(strutil.EquivalentSlices(initialUpstreams, w1.Worker().LastStatusSuccess().LastCalculatedUpstreams)) +}