From 3aa7d11419f439a2298544ad80020f2e614c999e Mon Sep 17 00:00:00 2001 From: irenarindos Date: Thu, 9 Jun 2022 16:04:13 -0400 Subject: [PATCH 1/3] feat(worker): Map cluster ID in config to upstream address --- internal/cmd/commands/server/server.go | 9 ++++ internal/cmd/config/config.go | 3 ++ .../daemon/worker/controller_connection.go | 16 ++++-- internal/daemon/worker/status.go | 52 ++++++++++++++++--- 4 files changed, 71 insertions(+), 9 deletions(-) diff --git a/internal/cmd/commands/server/server.go b/internal/cmd/commands/server/server.go index ef519e8823..ce2d414696 100644 --- a/internal/cmd/commands/server/server.go +++ b/internal/cmd/commands/server/server.go @@ -4,6 +4,7 @@ import ( "context" stderrors "errors" "fmt" + "github.com/hashicorp/go-uuid" "net" "os" "os/signal" @@ -330,6 +331,14 @@ func (c *Command) Run(args []string) int { } } } + + if c.Config.Worker.HCPBClusterId != "" { + _, err := uuid.ParseUUID(c.Config.Worker.HCPBClusterId) + if err != nil { + c.UI.Error(fmt.Errorf("Invalid HCPB cluster id %q: %w", c.Config.Worker.HCPBClusterId, err).Error()) + return base.CommandUserError + } + } } if err := c.SetupListeners(c.UI, c.Config.SharedConfig, []string{"api", "cluster", "proxy", "ops"}); err != nil { c.UI.Error(err.Error()) diff --git a/internal/cmd/config/config.go b/internal/cmd/config/config.go index 4d45c617ad..4f2603c12d 100644 --- a/internal/cmd/config/config.go +++ b/internal/cmd/config/config.go @@ -198,6 +198,9 @@ type Worker struct { // AuthStoragePath represents the location a worker stores its node credentials, if set AuthStoragePath string `hcl:"auth_storage_path"` + + // Internal field for use with HCP deployments. Will be used if controllers/ initial_upstreams is not set + HCPBClusterId string `hcl:"hcp_boundary_cluster_id"` } func (w *Worker) InitNameIfEmpty() (string, error) { diff --git a/internal/daemon/worker/controller_connection.go b/internal/daemon/worker/controller_connection.go index e75ac8a624..6c7039d797 100644 --- a/internal/daemon/worker/controller_connection.go +++ b/internal/daemon/worker/controller_connection.go @@ -35,6 +35,8 @@ import ( "google.golang.org/protobuf/proto" ) +const hcpb_url_suffix = ".proxy.boundary.hashicorp.cloud" + // StartControllerConnections starts up the resolver and initiates controller // connection client creation func (w *Worker) StartControllerConnections() error { @@ -57,7 +59,13 @@ func (w *Worker) StartControllerConnections() error { } if len(initialAddrs) == 0 { - return errors.New("no initial controller addresses found") + if w.conf.RawConfig.Worker.HCPBClusterId != "" { + clusterAddress := fmt.Sprintf("%s%s", w.conf.RawConfig.Worker.HCPBClusterId, hcpb_url_suffix) + initialAddrs = append(initialAddrs, resolver.Address{Addr: clusterAddress}) + event.WriteSysEvent(context.TODO(), op, fmt.Sprintf("Setting HCPB Cluster address %s as upstream address", clusterAddress)) + } else { + return errors.New("no initial controller addresses found") + } } w.Resolver().InitialState(resolver.State{ @@ -101,8 +109,10 @@ func (w *Worker) controllerDialerFunc() func(context.Context, string) (net.Conn, event.WriteError(ctx, op, err) } - if !w.everAuthenticated.Load() && err == nil && conn != nil { - w.everAuthenticated.Store(true) + if err == nil && conn != nil { + if !w.everAuthenticated.Load() { + w.everAuthenticated.Store(true) + } event.WriteSysEvent(ctx, op, "worker has successfully authenticated") } diff --git a/internal/daemon/worker/status.go b/internal/daemon/worker/status.go index 8890ef111d..3f875df9b6 100644 --- a/internal/daemon/worker/status.go +++ b/internal/daemon/worker/status.go @@ -3,21 +3,24 @@ package worker import ( "context" "errors" + "fmt" "math/rand" + "sort" + "strings" "time" - "github.com/hashicorp/boundary/internal/servers" - "github.com/hashicorp/boundary/internal/daemon/worker/common" "github.com/hashicorp/boundary/internal/daemon/worker/session" pbs "github.com/hashicorp/boundary/internal/gen/controller/servers/services" "github.com/hashicorp/boundary/internal/observability/event" + "github.com/hashicorp/boundary/internal/servers" "google.golang.org/grpc/resolver" ) type LastStatusInformation struct { *pbs.StatusResponse - StatusTime time.Time + StatusTime time.Time + LastCalculatedUpstreams []resolver.Address } func (w *Worker) startStatusTicking(cancelCtx context.Context) { @@ -191,14 +194,29 @@ func (w *Worker) sendWorkerStatus(cancelCtx context.Context) { } else { w.updateTags.Store(false) // This may be nil if we are in a multiple hop scenario + var addrs []resolver.Address if len(result.CalculatedUpstreams) > 0 { - addrs := make([]resolver.Address, 0, len(result.CalculatedUpstreams)) + addrs = make([]resolver.Address, 0, len(result.CalculatedUpstreams)) for _, v := range result.CalculatedUpstreams { addrs = append(addrs, resolver.Address{Addr: v.Address}) } - w.Resolver().UpdateState(resolver.State{Addresses: addrs}) + lastStatus := w.lastStatusSuccess.Load().(*LastStatusInformation) + // Compare upstreams; update resolver if there is a difference, and emit an event with old and new addresses + if lastStatus != nil && upstreamsHasChanged(lastStatus.LastCalculatedUpstreams, addrs) { + var oldUpstreams []string + for _, v := range lastStatus.LastCalculatedUpstreams { + oldUpstreams = append(oldUpstreams, v.Addr) + } + var newUpstreams []string + for _, v := range addrs { + newUpstreams = append(newUpstreams, v.Addr) + } + upstreamsMessage := fmt.Sprintf("Upstreams has changed; old upstreams were: %s, new upstreams are: %s", oldUpstreams, newUpstreams) + event.WriteSysEvent(context.TODO(), op, upstreamsMessage) + w.Resolver().UpdateState(resolver.State{Addresses: addrs}) + } } - w.lastStatusSuccess.Store(&LastStatusInformation{StatusResponse: result, StatusTime: time.Now()}) + w.lastStatusSuccess.Store(&LastStatusInformation{StatusResponse: result, StatusTime: time.Now(), LastCalculatedUpstreams: addrs}) for _, request := range result.GetJobsRequests() { switch request.GetRequestType() { @@ -239,6 +257,28 @@ func (w *Worker) sendWorkerStatus(cancelCtx context.Context) { w.cleanupConnections(cancelCtx, false) } +func upstreamsHasChanged(oldUpstreams, newUpstreams []resolver.Address) bool { + if len(oldUpstreams) != len(newUpstreams) { + return true + } + + // Sort both upstreams + sort.Slice(oldUpstreams, func(i, j int) bool { + return strings.Compare(oldUpstreams[i].Addr, oldUpstreams[j].Addr) < 0 + }) + sort.Slice(newUpstreams, func(i, j int) bool { + return strings.Compare(newUpstreams[i].Addr, newUpstreams[j].Addr) < 0 + }) + + // Compare and return true if change detected + for i, _ := range oldUpstreams { + if oldUpstreams[i].Addr != newUpstreams[i].Addr { + return true + } + } + return false +} + // cleanupConnections walks all sessions and shuts down connections. // Additionally, sessions without connections are cleaned up from the // local worker's state. From 256498d40fcb91d2b8a83c94cb887662d8ec7e89 Mon Sep 17 00:00:00 2001 From: irenarindos Date: Thu, 9 Jun 2022 16:39:20 -0400 Subject: [PATCH 2/3] fixup! feat(worker): Map cluster ID in config to upstream address --- internal/cmd/commands/server/server.go | 8 ++-- internal/cmd/config/config.go | 6 +-- .../daemon/worker/controller_connection.go | 6 +-- internal/daemon/worker/status.go | 46 ++++--------------- 4 files changed, 20 insertions(+), 46 deletions(-) diff --git a/internal/cmd/commands/server/server.go b/internal/cmd/commands/server/server.go index ce2d414696..f0aed5ed98 100644 --- a/internal/cmd/commands/server/server.go +++ b/internal/cmd/commands/server/server.go @@ -4,7 +4,6 @@ import ( "context" stderrors "errors" "fmt" - "github.com/hashicorp/go-uuid" "net" "os" "os/signal" @@ -33,6 +32,7 @@ import ( "github.com/hashicorp/go-secure-stdlib/mlock" "github.com/hashicorp/go-secure-stdlib/parseutil" "github.com/hashicorp/go-secure-stdlib/pluginutil/v2" + "github.com/hashicorp/go-uuid" "github.com/mitchellh/cli" "github.com/posener/complete" "go.uber.org/atomic" @@ -332,10 +332,10 @@ func (c *Command) Run(args []string) int { } } - if c.Config.Worker.HCPBClusterId != "" { - _, err := uuid.ParseUUID(c.Config.Worker.HCPBClusterId) + if c.Config.HCPBClusterId != "" { + _, err := uuid.ParseUUID(c.Config.HCPBClusterId) if err != nil { - c.UI.Error(fmt.Errorf("Invalid HCPB cluster id %q: %w", c.Config.Worker.HCPBClusterId, err).Error()) + c.UI.Error(fmt.Errorf("Invalid HCPB cluster id %q: %w", c.Config.HCPBClusterId, err).Error()) return base.CommandUserError } } diff --git a/internal/cmd/config/config.go b/internal/cmd/config/config.go index 4f2603c12d..7e054b0821 100644 --- a/internal/cmd/config/config.go +++ b/internal/cmd/config/config.go @@ -124,6 +124,9 @@ type Config struct { // Plugin-related options Plugins Plugins `hcl:"plugins"` + + // Internal field for use with HCP deployments. Used if controllers/ initial_upstreams is not set + HCPBClusterId string `hcl:"hcp_boundary_cluster_id"` } type Controller struct { @@ -198,9 +201,6 @@ type Worker struct { // AuthStoragePath represents the location a worker stores its node credentials, if set AuthStoragePath string `hcl:"auth_storage_path"` - - // Internal field for use with HCP deployments. Will be used if controllers/ initial_upstreams is not set - HCPBClusterId string `hcl:"hcp_boundary_cluster_id"` } func (w *Worker) InitNameIfEmpty() (string, error) { diff --git a/internal/daemon/worker/controller_connection.go b/internal/daemon/worker/controller_connection.go index 6c7039d797..82e410e4ea 100644 --- a/internal/daemon/worker/controller_connection.go +++ b/internal/daemon/worker/controller_connection.go @@ -35,7 +35,7 @@ import ( "google.golang.org/protobuf/proto" ) -const hcpb_url_suffix = ".proxy.boundary.hashicorp.cloud" +const hcpbUrlSuffix = ".proxy.boundary.hashicorp.cloud" // StartControllerConnections starts up the resolver and initiates controller // connection client creation @@ -59,8 +59,8 @@ func (w *Worker) StartControllerConnections() error { } if len(initialAddrs) == 0 { - if w.conf.RawConfig.Worker.HCPBClusterId != "" { - clusterAddress := fmt.Sprintf("%s%s", w.conf.RawConfig.Worker.HCPBClusterId, hcpb_url_suffix) + if w.conf.RawConfig.HCPBClusterId != "" { + clusterAddress := fmt.Sprintf("%s%s", w.conf.RawConfig.HCPBClusterId, hcpbUrlSuffix) initialAddrs = append(initialAddrs, resolver.Address{Addr: clusterAddress}) event.WriteSysEvent(context.TODO(), op, fmt.Sprintf("Setting HCPB Cluster address %s as upstream address", clusterAddress)) } else { diff --git a/internal/daemon/worker/status.go b/internal/daemon/worker/status.go index 3f875df9b6..14b53e083a 100644 --- a/internal/daemon/worker/status.go +++ b/internal/daemon/worker/status.go @@ -4,9 +4,8 @@ import ( "context" "errors" "fmt" + "github.com/hashicorp/go-secure-stdlib/strutil" "math/rand" - "sort" - "strings" "time" "github.com/hashicorp/boundary/internal/daemon/worker/common" @@ -20,7 +19,7 @@ import ( type LastStatusInformation struct { *pbs.StatusResponse StatusTime time.Time - LastCalculatedUpstreams []resolver.Address + LastCalculatedUpstreams []string } func (w *Worker) startStatusTicking(cancelCtx context.Context) { @@ -195,28 +194,25 @@ func (w *Worker) sendWorkerStatus(cancelCtx context.Context) { w.updateTags.Store(false) // This may be nil if we are in a multiple hop scenario var addrs []resolver.Address + var newUpstreams []string if len(result.CalculatedUpstreams) > 0 { addrs = make([]resolver.Address, 0, len(result.CalculatedUpstreams)) for _, v := range result.CalculatedUpstreams { addrs = append(addrs, resolver.Address{Addr: v.Address}) + newUpstreams = append(newUpstreams, v.Address) } lastStatus := w.lastStatusSuccess.Load().(*LastStatusInformation) // Compare upstreams; update resolver if there is a difference, and emit an event with old and new addresses - if lastStatus != nil && upstreamsHasChanged(lastStatus.LastCalculatedUpstreams, addrs) { - var oldUpstreams []string - for _, v := range lastStatus.LastCalculatedUpstreams { - oldUpstreams = append(oldUpstreams, v.Addr) - } - var newUpstreams []string - for _, v := range addrs { - newUpstreams = append(newUpstreams, v.Addr) - } - upstreamsMessage := fmt.Sprintf("Upstreams has changed; old upstreams were: %s, new upstreams are: %s", oldUpstreams, newUpstreams) + if lastStatus != nil && !strutil.EquivalentSlices(lastStatus.LastCalculatedUpstreams, newUpstreams) { + upstreamsMessage := fmt.Sprintf("Upstreams has changed; old upstreams were: %s, new upstreams are: %s", lastStatus.LastCalculatedUpstreams, newUpstreams) event.WriteSysEvent(context.TODO(), op, upstreamsMessage) w.Resolver().UpdateState(resolver.State{Addresses: addrs}) + } else if lastStatus == nil { + w.Resolver().UpdateState(resolver.State{Addresses: addrs}) + event.WriteSysEvent(context.TODO(), op, fmt.Sprintf("Upstreams after first status set to: %s", newUpstreams)) } } - w.lastStatusSuccess.Store(&LastStatusInformation{StatusResponse: result, StatusTime: time.Now(), LastCalculatedUpstreams: addrs}) + w.lastStatusSuccess.Store(&LastStatusInformation{StatusResponse: result, StatusTime: time.Now(), LastCalculatedUpstreams: newUpstreams}) for _, request := range result.GetJobsRequests() { switch request.GetRequestType() { @@ -257,28 +253,6 @@ func (w *Worker) sendWorkerStatus(cancelCtx context.Context) { w.cleanupConnections(cancelCtx, false) } -func upstreamsHasChanged(oldUpstreams, newUpstreams []resolver.Address) bool { - if len(oldUpstreams) != len(newUpstreams) { - return true - } - - // Sort both upstreams - sort.Slice(oldUpstreams, func(i, j int) bool { - return strings.Compare(oldUpstreams[i].Addr, oldUpstreams[j].Addr) < 0 - }) - sort.Slice(newUpstreams, func(i, j int) bool { - return strings.Compare(newUpstreams[i].Addr, newUpstreams[j].Addr) < 0 - }) - - // Compare and return true if change detected - for i, _ := range oldUpstreams { - if oldUpstreams[i].Addr != newUpstreams[i].Addr { - return true - } - } - return false -} - // cleanupConnections walks all sessions and shuts down connections. // Additionally, sessions without connections are cleaned up from the // local worker's state. From 3792fec48b390f3f08c7e03ce6e0f9d43a8145a8 Mon Sep 17 00:00:00 2001 From: irenarindos Date: Thu, 9 Jun 2022 16:42:56 -0400 Subject: [PATCH 3/3] fixup! feat(worker): Map cluster ID in config to upstream address --- internal/daemon/worker/status.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/daemon/worker/status.go b/internal/daemon/worker/status.go index 14b53e083a..75b5aae792 100644 --- a/internal/daemon/worker/status.go +++ b/internal/daemon/worker/status.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "github.com/hashicorp/go-secure-stdlib/strutil" "math/rand" "time" @@ -13,6 +12,7 @@ import ( pbs "github.com/hashicorp/boundary/internal/gen/controller/servers/services" "github.com/hashicorp/boundary/internal/observability/event" "github.com/hashicorp/boundary/internal/servers" + "github.com/hashicorp/go-secure-stdlib/strutil" "google.golang.org/grpc/resolver" )