Append initial worker upstreams if current upstreams are unresponsive (#2698)

* Append initial worker upstreams if current upstreams are unresponsive
Irena Rindos (committed by GitHub)
parent 24c3d6824d
commit 173e1587d0
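In short: when the worker's gRPC connection to its upstreams reports a transient failure, this change merges the configured initial upstreams (and the HCP cluster address, if set) into the candidate list, dedupes it, and only updates the resolver when the set actually changed. Below is a minimal sketch of that decision using the same strutil helpers the diff uses; the function name fallbackUpstreams is ours, not from the commit:

package worker

import "github.com/hashicorp/go-secure-stdlib/strutil"

// fallbackUpstreams is an illustrative distillation of the logic added in
// this commit: merge the last-known upstreams with the configured fallbacks,
// dedupe, and report whether anything actually changed.
func fallbackUpstreams(last, initial []string, hcpClusterAddr string) (addrs []string, changed bool) {
	addrs = append(addrs, last...)
	addrs = append(addrs, initial...)
	if hcpClusterAddr != "" {
		addrs = append(addrs, hcpClusterAddr)
	}
	// false: dedupe without lowercasing, matching the diff below.
	addrs = strutil.RemoveDuplicates(addrs, false)
	return addrs, !strutil.EquivalentSlices(last, addrs)
}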

@@ -16,6 +16,7 @@ import (
 	"github.com/hashicorp/boundary/internal/server"
 	"github.com/hashicorp/boundary/version"
 	"github.com/hashicorp/go-secure-stdlib/strutil"
+	"google.golang.org/grpc/connectivity"
 )

 var firstStatusCheckPostHooks []func(context.Context, *Worker) error
@@ -206,6 +207,31 @@ func (w *Worker) sendWorkerStatus(cancelCtx context.Context, sessionManager sess
 			return true
 		})
+
+		// In the case that the control plane has gone down and come up with different IPs,
+		// append initial upstreams / cluster addr to the resolver to try
+		if w.GrpcClientConn.GetState() == connectivity.TransientFailure {
+			lastStatus := w.lastStatusSuccess.Load().(*LastStatusInformation)
+			addrs := lastStatus.LastCalculatedUpstreams
+			if len(w.conf.RawConfig.Worker.InitialUpstreams) > 0 {
+				addrs = append(addrs, w.conf.RawConfig.Worker.InitialUpstreams...)
+			}
+			if len(w.conf.RawConfig.HcpbClusterId) > 0 {
+				clusterAddress := fmt.Sprintf("%s%s", w.conf.RawConfig.HcpbClusterId, hcpbUrlSuffix)
+				addrs = append(addrs, clusterAddress)
+			}
+			addrs = strutil.RemoveDuplicates(addrs, false)
+			if strutil.EquivalentSlices(lastStatus.LastCalculatedUpstreams, addrs) {
+				// Nothing to update
+				return
+			}
+			w.updateAddresses(cancelCtx, addrs, addressReceivers)
+			lastStatus.LastCalculatedUpstreams = addrs
+			w.lastStatusSuccess.Store(lastStatus)
+		}
+
 		// Exit out of status function; our work here is done and we don't need to create closeConnection requests
 		return
 	}
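For readers unfamiliar with the connectivity check above, here is a standalone sketch of the grpc-go API involved; it is not part of this commit, and the target address is made up:

package main

import (
	"fmt"

	"google.golang.org/grpc"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	// Dialing is lazy; the connection starts out in the Idle state.
	conn, err := grpc.Dial("127.0.0.1:9", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	conn.Connect() // kick off connection attempts
	state := conn.GetState()
	fmt.Println(state) // e.g. CONNECTING, then TRANSIENT_FAILURE once dialing fails

	if state == connectivity.TransientFailure {
		// This is the signal the worker uses to fall back to its
		// initial upstreams / HCP cluster address.
	}
}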
@@ -243,36 +269,7 @@ func (w *Worker) sendWorkerStatus(cancelCtx context.Context, sessionManager sess
 		}
 	}
-	if len(addrs) > 0 {
-		lastStatus := w.lastStatusSuccess.Load().(*LastStatusInformation)
-		// Compare upstreams; update resolver if there is a difference, and emit an event with old and new addresses
-		if lastStatus != nil && !strutil.EquivalentSlices(lastStatus.LastCalculatedUpstreams, addrs) {
-			upstreamsMessage := fmt.Sprintf("Upstreams has changed; old upstreams were: %s, new upstreams are: %s", lastStatus.LastCalculatedUpstreams, addrs)
-			event.WriteSysEvent(cancelCtx, op, upstreamsMessage)
-			for _, as := range *addressReceivers {
-				as.SetAddresses(addrs)
-			}
-		} else if lastStatus == nil {
-			for _, as := range *addressReceivers {
-				as.SetAddresses(addrs)
-			}
-			event.WriteSysEvent(cancelCtx, op, fmt.Sprintf("Upstreams after first status set to: %s", addrs))
-		}
-	}
-
-	// regardless of whether or not it's a new address, we need to set
-	// them for dialingListeners
-	for _, as := range *addressReceivers {
-		switch {
-		case as.Type() == dialingListenerReceiverType:
-			tmpAddrs := make([]string, len(addrs))
-			copy(tmpAddrs, addrs)
-			if len(tmpAddrs) == 0 {
-				tmpAddrs = append(tmpAddrs, w.conf.RawConfig.Worker.InitialUpstreams...)
-			}
-			as.SetAddresses(tmpAddrs)
-		}
-	}
+	w.updateAddresses(cancelCtx, addrs, addressReceivers)

 	w.lastStatusSuccess.Store(&LastStatusInformation{StatusResponse: result, StatusTime: time.Now(), LastCalculatedUpstreams: addrs})
@@ -330,6 +327,42 @@ func (w *Worker) sendWorkerStatus(cancelCtx context.Context, sessionManager sess
 	}
 }
+
+// updateAddresses updates address receivers and dialing listeners with new addrs
+func (w *Worker) updateAddresses(cancelCtx context.Context, addrs []string, addressReceivers *[]addressReceiver) {
+	const op = "worker.(Worker).updateAddresses"
+	if len(addrs) > 0 {
+		lastStatus := w.lastStatusSuccess.Load().(*LastStatusInformation)
+		// Compare upstreams; update the resolver if there is a difference, and emit an event with old and new addresses
+		if lastStatus != nil && !strutil.EquivalentSlices(lastStatus.LastCalculatedUpstreams, addrs) {
+			upstreamsMessage := fmt.Sprintf("Upstreams have changed; old upstreams were: %s, new upstreams are: %s", lastStatus.LastCalculatedUpstreams, addrs)
+			event.WriteSysEvent(cancelCtx, op, upstreamsMessage)
+			for _, as := range *addressReceivers {
+				as.SetAddresses(addrs)
+			}
+		} else if lastStatus == nil {
+			for _, as := range *addressReceivers {
+				as.SetAddresses(addrs)
+			}
+			event.WriteSysEvent(cancelCtx, op, fmt.Sprintf("Upstreams after first status set to: %s", addrs))
+		}
+	}
+
+	// Regardless of whether the addresses are new, we need to set them
+	// for dialing listeners
+	for _, as := range *addressReceivers {
+		switch {
+		case as.Type() == dialingListenerReceiverType:
+			tmpAddrs := make([]string, len(addrs))
+			copy(tmpAddrs, addrs)
+			if len(tmpAddrs) == 0 {
+				tmpAddrs = append(tmpAddrs, w.conf.RawConfig.Worker.InitialUpstreams...)
+			}
+			as.SetAddresses(tmpAddrs)
+		}
+	}
+}
+
 // cleanupConnections walks all sessions and shuts down all proxy connections.
 // After the local connections are terminated, they are requested to be marked
 // close on the controller.
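Note: updateAddresses fans addresses out to addressReceiver values, whose definition is not part of this diff. Below is a hypothetical sketch inferred from the calls above; dialingListenerReceiverType appears in the diff, while grpcResolverReceiverType is an assumed name, and the real definitions in the worker package may differ:

// Hypothetical sketch; not from this commit.
package worker

type receiverType int

const (
	grpcResolverReceiverType receiverType = iota // assumed name
	dialingListenerReceiverType                  // referenced in the diff above
)

type addressReceiver interface {
	// SetAddresses replaces the receiver's upstream address set.
	SetAddresses(addrs []string)
	// Type distinguishes resolver receivers from dialing listeners.
	Type() receiverType
}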

@@ -275,7 +275,7 @@ func NewTestWorker(t testing.TB, opts *TestWorkerOpts) *TestWorker {
 		tw.name = opts.Config.Worker.Name
 	}
 	if opts.SuccessfulStatusGracePeriodDuration != 0 {
-		opts.Config.Worker.SuccessfulStatusGracePeriod = opts.SuccessfulStatusGracePeriodDuration
+		opts.Config.Worker.SuccessfulStatusGracePeriodDuration = opts.SuccessfulStatusGracePeriodDuration
 	}
 	serverName, err := os.Hostname()

@@ -203,7 +203,6 @@ func New(conf *Config) (*Worker, error) {
 			err)
 		}
 	}
-
 	switch conf.RawConfig.Worker.SuccessfulStatusGracePeriodDuration {
 	case 0:
 		w.successfulStatusGracePeriod.Store(int64(server.DefaultLiveness))

@@ -1,6 +1,7 @@
 package cluster

 import (
+	"context"
 	"testing"
 	"time"
@@ -8,6 +9,8 @@ import (
 	"github.com/hashicorp/boundary/internal/cmd/config"
 	"github.com/hashicorp/boundary/internal/daemon/controller"
 	"github.com/hashicorp/boundary/internal/daemon/worker"
 	"github.com/hashicorp/go-hclog"
+	"github.com/hashicorp/go-secure-stdlib/strutil"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
@@ -83,3 +86,57 @@ func TestMultiControllerMultiWorkerConnections(t *testing.T) {
 	expectWorkers(t, c1, w1, w2)
 	expectWorkers(t, c2, w1, w2)
 }
+
+func TestWorkerAppendInitialUpstreams(t *testing.T) {
+	ctx := context.Background()
+	require, assert := require.New(t), assert.New(t)
+	logger := hclog.New(&hclog.LoggerOptions{
+		Level: hclog.Trace,
+	})
+
+	conf, err := config.DevController()
+	require.NoError(err)
+	c1 := controller.NewTestController(t, &controller.TestControllerOpts{
+		Config: conf,
+		Logger: logger.Named("c1"),
+	})
+	defer c1.Shutdown()
+	expectWorkers(t, c1)
+
+	initialUpstreams := append(c1.ClusterAddrs(), "127.0.0.9")
+	w1 := worker.NewTestWorker(t, &worker.TestWorkerOpts{
+		WorkerAuthKms:                       c1.Config().WorkerAuthKms,
+		InitialUpstreams:                    initialUpstreams,
+		Logger:                              logger.Named("w1"),
+		SuccessfulStatusGracePeriodDuration: 1 * time.Second,
+	})
+	defer w1.Shutdown()
+
+	// Wait for the worker to send its first status.
+	cancelCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
+	defer cancel()
+	for {
+		select {
+		case <-time.After(500 * time.Millisecond):
+			// Poll again.
+		case <-cancelCtx.Done():
+			require.FailNow("No worker found after 10 seconds")
+		}
+		successSent := w1.Worker().LastStatusSuccess()
+		if successSent != nil {
+			break
+		}
+	}
+	expectWorkers(t, c1, w1)
+
+	// Upstreams should be equivalent to the controller cluster addr after status updates.
+	assert.Equal(c1.ClusterAddrs(), w1.Worker().LastStatusSuccess().LastCalculatedUpstreams)
+
+	// Bring down the controller.
+	c1.Shutdown()
+	time.Sleep(3 * time.Second) // Wait a little longer than the grace period.
+
+	// Upstreams should now match the initial upstreams.
+	assert.True(strutil.EquivalentSlices(initialUpstreams, w1.Worker().LastStatusSuccess().LastCalculatedUpstreams))
+}
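Aside: the polling loop above could also be expressed with testify's Eventually helper, which bundles the tick/timeout pattern. A fragment using the same names as the test:

// Equivalent wait using testify's Eventually: 10s timeout, 500ms poll
// interval, mirroring the loop in the test above.
assert.Eventually(func() bool {
	return w1.Worker().LastStatusSuccess() != nil
}, 10*time.Second, 500*time.Millisecond, "no worker status after 10 seconds")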
