fixup! feat(worker): Map cluster ID in config to upstream address

pull/2163/head
irenarindos 4 years ago
parent 3aa7d11419
commit 256498d40f

@ -4,7 +4,6 @@ import (
"context"
stderrors "errors"
"fmt"
"github.com/hashicorp/go-uuid"
"net"
"os"
"os/signal"
@ -33,6 +32,7 @@ import (
"github.com/hashicorp/go-secure-stdlib/mlock"
"github.com/hashicorp/go-secure-stdlib/parseutil"
"github.com/hashicorp/go-secure-stdlib/pluginutil/v2"
"github.com/hashicorp/go-uuid"
"github.com/mitchellh/cli"
"github.com/posener/complete"
"go.uber.org/atomic"
@ -332,10 +332,10 @@ func (c *Command) Run(args []string) int {
}
}
if c.Config.Worker.HCPBClusterId != "" {
_, err := uuid.ParseUUID(c.Config.Worker.HCPBClusterId)
if c.Config.HCPBClusterId != "" {
_, err := uuid.ParseUUID(c.Config.HCPBClusterId)
if err != nil {
c.UI.Error(fmt.Errorf("Invalid HCPB cluster id %q: %w", c.Config.Worker.HCPBClusterId, err).Error())
c.UI.Error(fmt.Errorf("Invalid HCPB cluster id %q: %w", c.Config.HCPBClusterId, err).Error())
return base.CommandUserError
}
}

@ -124,6 +124,9 @@ type Config struct {
// Plugin-related options
Plugins Plugins `hcl:"plugins"`
// Internal field for use with HCP deployments. Used if controllers/ initial_upstreams is not set
HCPBClusterId string `hcl:"hcp_boundary_cluster_id"`
}
type Controller struct {
@ -198,9 +201,6 @@ type Worker struct {
// AuthStoragePath represents the location a worker stores its node credentials, if set
AuthStoragePath string `hcl:"auth_storage_path"`
// Internal field for use with HCP deployments. Will be used if controllers/ initial_upstreams is not set
HCPBClusterId string `hcl:"hcp_boundary_cluster_id"`
}
func (w *Worker) InitNameIfEmpty() (string, error) {

@ -35,7 +35,7 @@ import (
"google.golang.org/protobuf/proto"
)
const hcpb_url_suffix = ".proxy.boundary.hashicorp.cloud"
const hcpbUrlSuffix = ".proxy.boundary.hashicorp.cloud"
// StartControllerConnections starts up the resolver and initiates controller
// connection client creation
@ -59,8 +59,8 @@ func (w *Worker) StartControllerConnections() error {
}
if len(initialAddrs) == 0 {
if w.conf.RawConfig.Worker.HCPBClusterId != "" {
clusterAddress := fmt.Sprintf("%s%s", w.conf.RawConfig.Worker.HCPBClusterId, hcpb_url_suffix)
if w.conf.RawConfig.HCPBClusterId != "" {
clusterAddress := fmt.Sprintf("%s%s", w.conf.RawConfig.HCPBClusterId, hcpbUrlSuffix)
initialAddrs = append(initialAddrs, resolver.Address{Addr: clusterAddress})
event.WriteSysEvent(context.TODO(), op, fmt.Sprintf("Setting HCPB Cluster address %s as upstream address", clusterAddress))
} else {

@ -4,9 +4,8 @@ import (
"context"
"errors"
"fmt"
"github.com/hashicorp/go-secure-stdlib/strutil"
"math/rand"
"sort"
"strings"
"time"
"github.com/hashicorp/boundary/internal/daemon/worker/common"
@ -20,7 +19,7 @@ import (
type LastStatusInformation struct {
*pbs.StatusResponse
StatusTime time.Time
LastCalculatedUpstreams []resolver.Address
LastCalculatedUpstreams []string
}
func (w *Worker) startStatusTicking(cancelCtx context.Context) {
@ -195,28 +194,25 @@ func (w *Worker) sendWorkerStatus(cancelCtx context.Context) {
w.updateTags.Store(false)
// This may be nil if we are in a multiple hop scenario
var addrs []resolver.Address
var newUpstreams []string
if len(result.CalculatedUpstreams) > 0 {
addrs = make([]resolver.Address, 0, len(result.CalculatedUpstreams))
for _, v := range result.CalculatedUpstreams {
addrs = append(addrs, resolver.Address{Addr: v.Address})
newUpstreams = append(newUpstreams, v.Address)
}
lastStatus := w.lastStatusSuccess.Load().(*LastStatusInformation)
// Compare upstreams; update resolver if there is a difference, and emit an event with old and new addresses
if lastStatus != nil && upstreamsHasChanged(lastStatus.LastCalculatedUpstreams, addrs) {
var oldUpstreams []string
for _, v := range lastStatus.LastCalculatedUpstreams {
oldUpstreams = append(oldUpstreams, v.Addr)
}
var newUpstreams []string
for _, v := range addrs {
newUpstreams = append(newUpstreams, v.Addr)
}
upstreamsMessage := fmt.Sprintf("Upstreams has changed; old upstreams were: %s, new upstreams are: %s", oldUpstreams, newUpstreams)
if lastStatus != nil && !strutil.EquivalentSlices(lastStatus.LastCalculatedUpstreams, newUpstreams) {
upstreamsMessage := fmt.Sprintf("Upstreams has changed; old upstreams were: %s, new upstreams are: %s", lastStatus.LastCalculatedUpstreams, newUpstreams)
event.WriteSysEvent(context.TODO(), op, upstreamsMessage)
w.Resolver().UpdateState(resolver.State{Addresses: addrs})
} else if lastStatus == nil {
w.Resolver().UpdateState(resolver.State{Addresses: addrs})
event.WriteSysEvent(context.TODO(), op, fmt.Sprintf("Upstreams after first status set to: %s", newUpstreams))
}
}
w.lastStatusSuccess.Store(&LastStatusInformation{StatusResponse: result, StatusTime: time.Now(), LastCalculatedUpstreams: addrs})
w.lastStatusSuccess.Store(&LastStatusInformation{StatusResponse: result, StatusTime: time.Now(), LastCalculatedUpstreams: newUpstreams})
for _, request := range result.GetJobsRequests() {
switch request.GetRequestType() {
@ -257,28 +253,6 @@ func (w *Worker) sendWorkerStatus(cancelCtx context.Context) {
w.cleanupConnections(cancelCtx, false)
}
func upstreamsHasChanged(oldUpstreams, newUpstreams []resolver.Address) bool {
if len(oldUpstreams) != len(newUpstreams) {
return true
}
// Sort both upstreams
sort.Slice(oldUpstreams, func(i, j int) bool {
return strings.Compare(oldUpstreams[i].Addr, oldUpstreams[j].Addr) < 0
})
sort.Slice(newUpstreams, func(i, j int) bool {
return strings.Compare(newUpstreams[i].Addr, newUpstreams[j].Addr) < 0
})
// Compare and return true if change detected
for i, _ := range oldUpstreams {
if oldUpstreams[i].Addr != newUpstreams[i].Addr {
return true
}
}
return false
}
// cleanupConnections walks all sessions and shuts down connections.
// Additionally, sessions without connections are cleaned up from the
// local worker's state.

Loading…
Cancel
Save