You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
boundary/internal/servers/worker/status.go

188 lines
6.1 KiB

package worker
import (
"context"
"math/rand"
"time"
pbs "github.com/hashicorp/boundary/internal/gen/controller/servers/services"
"github.com/hashicorp/boundary/internal/servers"
"github.com/hashicorp/boundary/internal/types/resource"
"google.golang.org/grpc/resolver"
)
// In the future we could make this configurable
const (
statusInterval = 2 * time.Second
)
type LastStatusInformation struct {
*pbs.StatusResponse
StatusTime time.Time
}
func (w *Worker) startStatusTicking(cancelCtx context.Context) {
go func() {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
// This function exists to desynchronize calls to controllers from
// workers so we aren't always getting status updates at the exact same
// intervals, to ease the load on the DB.
getRandomInterval := func() time.Duration {
// 0 to 0.5 adjustment to the base
f := r.Float64() / 2
// Half a chance to be faster, not slower
if r.Float32() > 0.5 {
f = -1 * f
}
return statusInterval + time.Duration(f*float64(time.Second))
}
timer := time.NewTimer(0)
for {
select {
case <-cancelCtx.Done():
w.logger.Info("status ticking shutting down")
return
case <-timer.C:
// First send info as-is. We'll perform cleanup duties after we
// get cancel/job change info back.
var activeJobs []*pbs.JobStatus
w.sessionInfoMap.Range(func(key, value interface{}) bool {
var jobInfo pbs.SessionJobInfo
sessionId := key.(string)
si := value.(*sessionInfo)
si.RLock()
status := si.status
connections := make([]*pbs.Connection, 0, len(si.connInfoMap))
for k, v := range si.connInfoMap {
connections = append(connections, &pbs.Connection{
ConnectionId: k,
Status: v.status,
})
}
si.RUnlock()
jobInfo.SessionId = sessionId
activeJobs = append(activeJobs, &pbs.JobStatus{
Job: &pbs.Job{
Type: pbs.JOBTYPE_JOBTYPE_SESSION,
JobInfo: &pbs.Job_SessionInfo{
SessionInfo: &pbs.SessionJobInfo{
SessionId: sessionId,
Status: status,
Connections: connections,
},
},
},
})
return true
})
client := w.controllerStatusConn.Load().(pbs.ServerCoordinationServiceClient)
result, err := client.Status(cancelCtx, &pbs.StatusRequest{
Jobs: activeJobs,
Worker: &servers.Server{
PrivateId: w.conf.RawConfig.Worker.Name,
Name: w.conf.RawConfig.Worker.Name,
Type: resource.Worker.String(),
Description: w.conf.RawConfig.Worker.Description,
Address: w.conf.RawConfig.Worker.PublicAddr,
},
})
if err != nil {
w.logger.Error("error making status request to controller", "error", err)
} else {
w.logger.Trace("successfully sent status to controller")
addrs := make([]resolver.Address, 0, len(result.Controllers))
strAddrs := make([]string, 0, len(result.Controllers))
for _, v := range result.Controllers {
addrs = append(addrs, resolver.Address{Addr: v.Address})
strAddrs = append(strAddrs, v.Address)
}
w.logger.Trace("found controllers", "addresses", strAddrs)
switch len(strAddrs) {
case 0:
w.logger.Warn("got no controller addresses from controller; possibly prior to first status save, not persisting")
default:
w.Resolver().UpdateState(resolver.State{Addresses: addrs})
}
w.lastStatusSuccess.Store(&LastStatusInformation{StatusResponse: result, StatusTime: time.Now()})
for _, request := range result.GetJobsRequests() {
switch request.GetRequestType() {
case pbs.CHANGETYPE_CHANGETYPE_UPDATE_STATE:
switch request.GetJob().GetType() {
case pbs.JOBTYPE_JOBTYPE_SESSION:
sessInfo := request.GetJob().GetSessionInfo()
sessionId := sessInfo.GetSessionId()
siRaw, ok := w.sessionInfoMap.Load(sessionId)
if !ok {
w.logger.Warn("asked to cancel session but could not find a local information for it", "session_id", sessionId)
continue
}
si := siRaw.(*sessionInfo)
si.Lock()
si.status = sessInfo.GetStatus()
si.Unlock()
}
}
}
}
// Cleanup: Run through current jobs. Cancel connections for any
// canceling session or any session that is expired. Clear out
// sessions that are canceled or expired with all connections
// marked as closed. Close any that aren't marked as such.
closeInfo := make(map[string]string)
cleanSessionIds := make([]string, 0)
w.sessionInfoMap.Range(func(key, value interface{}) bool {
si := value.(*sessionInfo)
si.Lock()
switch {
case si.status == pbs.SESSIONSTATUS_SESSIONSTATUS_CANCELING,
si.status == pbs.SESSIONSTATUS_SESSIONSTATUS_TERMINATED,
time.Until(si.lookupSessionResponse.Expiration.AsTime()) < 0:
var toClose int
for k, v := range si.connInfoMap {
if v.closeTime.IsZero() {
toClose++
v.connCancel()
w.logger.Info("terminated connection due to cancelation or expiration", "session_id", si.id, "connection_id", k)
closeInfo[k] = si.id
}
}
// closeTime is marked by closeConnections iff the
// status is returned for that connection as closed. If
// the session is no longer valid and all connections
// are marked closed, clean up the session.
if toClose == 0 {
cleanSessionIds = append(cleanSessionIds, si.id)
}
}
si.Unlock()
return true
})
// Note that we won't clean these from the info map until the
// next time we run this function
if len(closeInfo) > 0 {
if err := w.closeConnections(cancelCtx, closeInfo); err != nil {
w.logger.Error("error marking connections closed", "error", err)
}
}
// Forget sessions where the session is expired/canceled and all
// connections are canceled and marked closed
for _, v := range cleanSessionIds {
w.sessionInfoMap.Delete(v)
}
timer.Reset(getRandomInterval())
}
}
}()
}
func (w *Worker) LastStatusSuccess() *LastStatusInformation {
return w.lastStatusSuccess.Load().(*LastStatusInformation)
}