Move worker selection above session creation (#929)

This prevents a session being created when no workers are available to
handle it.
build-9761f1deee3fceeb4e9a11696e3a15de813c6979-eee4aa2684a7d81e
Jeff Mitchell 5 years ago committed by GitHub
parent 0993747702
commit 1ce09069a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -58,6 +58,7 @@ require (
go.uber.org/atomic v1.7.0
golang.org/x/crypto v0.0.0-20201208171446-5f87f3452ae9
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221
golang.org/x/tools v0.0.0-20210101214203-2dba1e4ea05c
google.golang.org/genproto v0.0.0-20210207032614-bba0dbe2a9ea
google.golang.org/grpc v1.35.0

@ -501,7 +501,6 @@ func (b *Server) CreateInitialTarget(ctx context.Context) (target.Target, error)
return nil, fmt.Errorf("unable to list existing roles in project: %w", err)
}
if len(roles) != 2 {
panic("")
return nil, fmt.Errorf("unexpected number of roles in default project, expected 2, got %d", len(roles))
}
var idx int = -1

@ -4,7 +4,7 @@ import (
"os"
"github.com/mitchellh/cli"
"golang.org/x/crypto/ssh/terminal"
"golang.org/x/term"
)
type BoundaryUI struct {
@ -15,7 +15,7 @@ type BoundaryUI struct {
var TermWidth uint = 80
func init() {
width, _, err := terminal.GetSize(int(os.Stdin.Fd()))
width, _, err := term.GetSize(int(os.Stdin.Fd()))
if err == nil {
TermWidth = uint(width)
}

@ -343,6 +343,79 @@ func (s Service) AuthorizeSession(ctx context.Context, req *pbs.AuthorizeSession
return nil, err
}
// First ensure we can actually service a request, that is, we have workers
// available (after any filtering). WorkerInfo only contains the address;
// worker IDs below is used to contain their IDs in the same order. This is
// used to fetch tags for filtering. But we avoid allocation unless we
// actually need it.
var workers []*pb.WorkerInfo
var workerIds []string
hasWorkerFilter := len(t.GetWorkerFilter()) > 0
servers, err := serversRepo.ListServers(ctx, servers.ServerTypeWorker)
if err != nil {
return nil, err
}
for _, v := range servers {
if hasWorkerFilter {
workerIds = append(workerIds, v.GetPrivateId())
}
workers = append(workers, &pb.WorkerInfo{Address: v.Address})
}
if hasWorkerFilter && len(workerIds) > 0 {
finalWorkers := make([]*pb.WorkerInfo, 0, len(workers))
// Fetch the tags for the given worker IDs
tags, err := serversRepo.ListTagsForServers(ctx, workerIds)
if err != nil {
return nil, err
}
// Build the map for filtering. This is similar to the filter map we
// built from the worker config, but with one extra level: a map of the
// worker's ID to its filter map.
tagMap := make(map[string]map[string][]string)
for _, tag := range tags {
currWorkerMap := tagMap[tag.ServerId]
if currWorkerMap == nil {
currWorkerMap = make(map[string][]string)
tagMap[tag.ServerId] = currWorkerMap
}
currWorkerMap[tag.Key] = append(currWorkerMap[tag.Key], tag.Value)
// We don't need to reinsert after the fact because maps are
// reference types, so we don't need to re-insert into tagMap
}
// Create the evaluator
eval, err := bexpr.CreateEvaluator(t.GetWorkerFilter())
if err != nil {
return nil, err
}
// Iterate through the known worker IDs, and evaluate. If evaluation
// returns true, add to the final worker slice, which is assigned back
// to workers after this.
for i, worker := range workerIds {
filterInput := map[string]interface{}{
"name": worker,
"tags": tagMap[worker],
}
ok, err := eval.Evaluate(filterInput)
if err != nil && !stderrors.Is(err, pointerstructure.ErrNotFound) {
return nil, handlers.ApiErrorWithCodeAndMessage(
codes.FailedPrecondition,
fmt.Sprintf("Worker filter expression evaluation resulted in error: %s", err))
}
if ok {
finalWorkers = append(finalWorkers, workers[i])
}
}
workers = finalWorkers
}
if len(workers) == 0 {
return nil, handlers.ApiErrorWithCodeAndMessage(
codes.FailedPrecondition,
"No workers are available to handle this session, or all have been filtered.")
}
// First, fetch all available hosts. Unless one was chosen in the request,
// we will pick one at random.
type compoundHost struct {
@ -445,77 +518,6 @@ HostSetIterationLoop:
return nil, err
}
// WorkerInfo only contains the address; worker IDs below is used to contain
// their IDs in the same order. This is used to fetch tags for filtering.
// But we avoid allocation unless we actually need it.
var workers []*pb.WorkerInfo
var workerIds []string
hasWorkerFilter := len(t.GetWorkerFilter()) > 0
servers, err := serversRepo.ListServers(ctx, servers.ServerTypeWorker)
if err != nil {
return nil, err
}
for _, v := range servers {
if hasWorkerFilter {
workerIds = append(workerIds, v.GetPrivateId())
}
workers = append(workers, &pb.WorkerInfo{Address: v.Address})
}
if hasWorkerFilter && len(workerIds) > 0 {
finalWorkers := make([]*pb.WorkerInfo, 0, len(workers))
// Fetch the tags for the given worker IDs
tags, err := serversRepo.ListTagsForServers(ctx, workerIds)
if err != nil {
return nil, err
}
// Build the map for filtering. This is similar to the filter map we
// built from the worker config, but with one extra level: a map of the
// worker's ID to its filter map.
tagMap := make(map[string]map[string][]string)
for _, tag := range tags {
currWorkerMap := tagMap[tag.ServerId]
if currWorkerMap == nil {
currWorkerMap = make(map[string][]string)
tagMap[tag.ServerId] = currWorkerMap
}
currWorkerMap[tag.Key] = append(currWorkerMap[tag.Key], tag.Value)
// We don't need to reinsert after the fact because maps are
// reference types, so we don't need to re-insert into tagMap
}
// Create the evaluator
eval, err := bexpr.CreateEvaluator(t.GetWorkerFilter())
if err != nil {
return nil, err
}
// Iterate through the known worker IDs, and evaluate. If evaluation
// returns true, add to the final worker slice, which is assigned back
// to workers after this.
for i, worker := range workerIds {
filterInput := map[string]interface{}{
"name": worker,
"tags": tagMap[worker],
}
ok, err := eval.Evaluate(filterInput)
if err != nil && !stderrors.Is(err, pointerstructure.ErrNotFound) {
return nil, handlers.ApiErrorWithCodeAndMessage(
codes.FailedPrecondition,
fmt.Sprintf("Worker filter expression evaluation resulted in error: %s", err))
}
if ok {
finalWorkers = append(finalWorkers, workers[i])
}
}
workers = finalWorkers
}
if len(workers) == 0 {
return nil, handlers.ApiErrorWithCodeAndMessage(
codes.FailedPrecondition,
"No workers are available to handle this session, or all have been filtered.")
}
sad := &pb.SessionAuthorizationData{
SessionId: sess.PublicId,
TargetId: t.GetPublicId(),

Loading…
Cancel
Save