mirror of https://github.com/hashicorp/boundary
internal/servers/controller: Worker failure connection cleanup (#1340)
This commit adds support to do the following: * Mark connections for non-reporting workers as closed. This is the controller counterpart to the worker functionality (see #1330). This is written as a scheduled job that does the work DB-side in a single atomic query. * Works to reconcile states if such a broken controller-worker connection resumes and a worker reports a connection as connected that should be disconnected. In this case, the controller will send an update request, and the worker will honor it and terminate the connection. * Further refinement of the grace period setting has been added here. We have converged on the current server "liveness" setting as our default here, which is half of the previous 30s (15 seconds, in other words). Additionally, this is now configurable on the controller and worker side, with the caveat that it's currently impossible to do so in config as the setting has been untagged in HCL. This is exposed so that we can run some sophisticated testing scenarios where we skew the grace period to either the controller or worker to ensure the aforementioned reconciliation works. * Some repository functions have been added to support the new functionality, in addition to some test code to the worker to allow querying of session state while testing. * Finally, we've added some timestamp subtraction functions as well, basically serving as the opposite of the addition functions. (pull/1397/head)
parent
462306fea5
commit
5a70875726
@ -0,0 +1,23 @@
|
||||
begin;

-- wt_sub_seconds subtracts sec seconds from the given timestamp,
-- serving as the inverse of wt_add_seconds.
create function wt_sub_seconds(sec integer, ts timestamp with time zone)
  returns timestamp with time zone
as $$
  select ts - sec * '1 second'::interval;
$$ language sql
  stable
  returns null on null input;
-- Fix: the comment previously targeted wt_add_seconds (copy-paste from
-- the addition migration); it must describe the function created above.
comment on function wt_sub_seconds is
  'wt_sub_seconds returns ts - sec.';

-- wt_sub_seconds_from_now subtracts sec seconds from the current time.
create function wt_sub_seconds_from_now(sec integer)
  returns timestamp with time zone
as $$
  select wt_sub_seconds(sec, current_timestamp);
$$ language sql
  stable
  returns null on null input;
-- Fix: previously targeted wt_add_seconds_to_now by mistake.
comment on function wt_sub_seconds_from_now is
  'wt_sub_seconds_from_now returns current_timestamp - sec.';

commit;
|
||||
@ -0,0 +1,61 @@
|
||||
package migration
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/boundary/internal/db/schema"
|
||||
"github.com/hashicorp/boundary/internal/docker"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// targetMigration is the schema version under test; testSetupDb rolls the
// database forward to exactly this version (presumably the migration that
// adds wt_sub_seconds — confirm against the migration catalog).
const targetMigration = 12001

// TestWtSubSeconds verifies the wt_sub_seconds SQL function by subtracting
// one day's worth of seconds (86400) from a fixed timestamp and checking
// the database result against the same subtraction done in Go.
func TestWtSubSeconds(t *testing.T) {
	t.Parallel()
	require := require.New(t)
	ctx := context.Background()
	d := testSetupDb(ctx, t)

	// Test by subtracting a day from the test date
	sourceTime, err := time.Parse(time.RFC3339, "2006-01-02T15:04:05+07:00")
	require.NoError(err)
	expectedTime := sourceTime.Add(time.Second * -86400)

	var actualTime time.Time
	row := d.QueryRowContext(ctx, "select wt_sub_seconds($1, $2)", 86400, sourceTime)
	require.NoError(row.Scan(&actualTime))
	// Use time.Time.Equal rather than == so that a result returned in a
	// different location/offset still compares equal as an instant.
	require.True(expectedTime.Equal(actualTime))
}
|
||||
|
||||
func testSetupDb(ctx context.Context, t *testing.T) *sql.DB {
|
||||
t.Helper()
|
||||
require := require.New(t)
|
||||
|
||||
dialect := "postgres"
|
||||
|
||||
c, u, _, err := docker.StartDbInDocker(dialect)
|
||||
require.NoError(err)
|
||||
t.Cleanup(func() {
|
||||
require.NoError(c())
|
||||
})
|
||||
d, err := sql.Open(dialect, u)
|
||||
require.NoError(err)
|
||||
|
||||
oState := schema.TestCloneMigrationStates(t)
|
||||
nState := schema.TestCreatePartialMigrationState(oState["postgres"], targetMigration)
|
||||
oState["postgres"] = nState
|
||||
|
||||
m, err := schema.NewManager(ctx, dialect, d, schema.WithMigrationStates(oState))
|
||||
require.NoError(err)
|
||||
|
||||
require.NoError(m.RollForward(ctx))
|
||||
state, err := m.CurrentState(ctx)
|
||||
require.NoError(err)
|
||||
require.Equal(targetMigration, state.DatabaseSchemaVersion)
|
||||
require.False(state.Dirty)
|
||||
|
||||
return d
|
||||
}
|
||||
@ -0,0 +1,133 @@
|
||||
package controller
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/boundary/internal/errors"
|
||||
"github.com/hashicorp/boundary/internal/scheduler"
|
||||
"github.com/hashicorp/boundary/internal/servers/controller/common"
|
||||
"github.com/hashicorp/boundary/internal/session"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
)
|
||||
|
||||
// sessionCleanupJob defines a periodic job that monitors workers for
// loss of connection and terminates connections on workers that have
// not sent a heartbeat in a significant period of time.
//
// Relevant connections are simply marked as disconnected in the
// database. Connections will be independently terminated by the
// worker, or in the event of a synchronization issue between the two,
// the controller will win out and order that the connections be
// closed on the worker.
type sessionCleanupJob struct {
	// logger is used for run/skip diagnostics; required by the constructor.
	logger hclog.Logger
	// sessionRepoFn builds the session repository used on each run.
	sessionRepoFn common.SessionRepoFactory

	// The amount of time (in seconds, per the grace_period_seconds log
	// field) to give disconnected workers before marking their
	// connections as closed.
	gracePeriod int

	// The total number of connections closed in the last run. Written by
	// Run and read by Status; per NextRunIn's contract only one instance
	// of the job runs at once.
	totalClosed int
}
|
||||
|
||||
// newSessionCleanupJob instantiates the session cleanup job.
|
||||
func newSessionCleanupJob(
|
||||
logger hclog.Logger,
|
||||
sessionRepoFn common.SessionRepoFactory,
|
||||
gracePeriod int,
|
||||
) (*sessionCleanupJob, error) {
|
||||
const op = "controller.newNewSessionCleanupJob"
|
||||
switch {
|
||||
case logger == nil:
|
||||
return nil, errors.New(errors.InvalidParameter, op, "missing logger")
|
||||
case sessionRepoFn == nil:
|
||||
return nil, errors.New(errors.InvalidParameter, op, "missing sessionRepoFn")
|
||||
case gracePeriod < session.DeadWorkerConnCloseMinGrace:
|
||||
return nil, errors.New(
|
||||
errors.InvalidParameter, op, fmt.Sprintf("invalid gracePeriod, must be greater than %d", session.DeadWorkerConnCloseMinGrace))
|
||||
}
|
||||
|
||||
return &sessionCleanupJob{
|
||||
logger: logger,
|
||||
sessionRepoFn: sessionRepoFn,
|
||||
gracePeriod: gracePeriod,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Name returns a short, unique name for the job.
|
||||
func (j *sessionCleanupJob) Name() string { return "session_cleanup" }
|
||||
|
||||
// Description returns the description for the job.
|
||||
func (j *sessionCleanupJob) Description() string {
|
||||
return "Clean up session connections from disconnected workers"
|
||||
}
|
||||
|
||||
// NextRunIn returns the next run time after a job is completed.
|
||||
//
|
||||
// The next run time is defined for sessionCleanupJob as one second.
|
||||
// This is because the job should run continuously to terminate
|
||||
// connections as soon as a worker has not reported in for a long
|
||||
// enough time. Only one job will ever run at once, so there is no
|
||||
// reason why it cannot run again immediately.
|
||||
func (j *sessionCleanupJob) NextRunIn() (time.Duration, error) { return time.Second, nil }
|
||||
|
||||
// Status returns the status of the running job.
|
||||
func (j *sessionCleanupJob) Status() scheduler.JobStatus {
|
||||
return scheduler.JobStatus{
|
||||
Completed: j.totalClosed,
|
||||
Total: j.totalClosed,
|
||||
}
|
||||
}
|
||||
|
||||
// Run executes the job.
|
||||
func (j *sessionCleanupJob) Run(ctx context.Context) error {
|
||||
const op = "controller.(sessionCleanupJob).Run"
|
||||
j.logger.Debug(
|
||||
"starting job",
|
||||
"op", op,
|
||||
)
|
||||
j.totalClosed = 0
|
||||
|
||||
// Load repos.
|
||||
sessionRepo, err := j.sessionRepoFn()
|
||||
if err != nil {
|
||||
return errors.Wrap(err, op, errors.WithMsg("error getting session repo"))
|
||||
}
|
||||
|
||||
// Run the atomic dead worker cleanup job.
|
||||
results, err := sessionRepo.CloseConnectionsForDeadWorkers(ctx, j.gracePeriod)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, op)
|
||||
}
|
||||
|
||||
if len(results) < 1 {
|
||||
j.logger.Debug(
|
||||
"all workers OK, no connections to close",
|
||||
"op", op,
|
||||
)
|
||||
} else {
|
||||
for _, result := range results {
|
||||
// Log a closed connection message for each worker as a warning
|
||||
j.logger.Warn(
|
||||
"worker has not reported status within acceptable grace period, all connections closed",
|
||||
"op", op,
|
||||
"private_id", result.ServerId,
|
||||
"update_time", result.LastUpdateTime,
|
||||
"grace_period_seconds", j.gracePeriod,
|
||||
"number_connections_closed", result.NumberConnectionsClosed,
|
||||
)
|
||||
|
||||
j.totalClosed += result.NumberConnectionsClosed
|
||||
}
|
||||
}
|
||||
|
||||
j.logger.Debug(
|
||||
"job finished",
|
||||
"op", op,
|
||||
"total_connections_closed", j.totalClosed,
|
||||
)
|
||||
return nil
|
||||
}
|
||||
@ -0,0 +1,170 @@
|
||||
package controller
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/boundary/internal/db"
|
||||
"github.com/hashicorp/boundary/internal/errors"
|
||||
"github.com/hashicorp/boundary/internal/iam"
|
||||
"github.com/hashicorp/boundary/internal/kms"
|
||||
"github.com/hashicorp/boundary/internal/scheduler"
|
||||
"github.com/hashicorp/boundary/internal/servers"
|
||||
"github.com/hashicorp/boundary/internal/session"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// assert the interface
|
||||
var _ = scheduler.Job(new(sessionCleanupJob))
|
||||
|
||||
// TestSessionCleanupJob is an end-to-end check of the cleanup job:
// connections are spread across two workers, only one worker reports
// status within the grace period, and after Run only the silent
// worker's connections must be closed.
//
// This test has been largely adapted from
// TestRepository_CloseDeadConnectionsOnWorker in
// internal/session/repository_connection_test.go.
func TestSessionCleanupJob(t *testing.T) {
	t.Parallel()
	require, assert := require.New(t), assert.New(t)
	conn, _ := db.TestSetup(t, "postgres")
	rw := db.New(conn)
	wrapper := db.TestWrapper(t)
	iamRepo := iam.TestRepo(t, conn, wrapper)
	kms := kms.TestKms(t, conn, wrapper)
	serversRepo, err := servers.NewRepository(rw, rw, kms)
	require.NoError(err)
	sessionRepo, err := session.NewRepository(rw, rw, kms)
	require.NoError(err)
	ctx := context.Background()
	numConns := 12

	// Create two "workers". One will remain untouched while the other "goes
	// away and comes back" (worker 2).
	worker1 := session.TestWorker(t, conn, wrapper, session.WithServerId("worker1"))
	worker2 := session.TestWorker(t, conn, wrapper, session.WithServerId("worker2"))

	// Create a few sessions on each, activate, and authorize a connection.
	// Even loop indices go to worker2 (the one that will "die"), odd ones
	// to worker1, giving each worker numConns/2 connections.
	var connIds []string
	connIdsByWorker := make(map[string][]string)
	for i := 0; i < numConns; i++ {
		serverId := worker1.PrivateId
		if i%2 == 0 {
			serverId = worker2.PrivateId
		}
		sess := session.TestDefaultSession(t, conn, wrapper, iamRepo, session.WithServerId(serverId), session.WithDbOpts(db.WithSkipVetForWrite(true)))
		sess, _, err = sessionRepo.ActivateSession(ctx, sess.GetPublicId(), sess.Version, serverId, "worker", []byte("foo"))
		require.NoError(err)
		c, cs, _, err := sessionRepo.AuthorizeConnection(ctx, sess.GetPublicId(), serverId)
		require.NoError(err)
		require.Len(cs, 1)
		require.Equal(session.StatusAuthorized, cs[0].Status)
		connIds = append(connIds, c.GetPublicId())
		if i%2 == 0 {
			connIdsByWorker[worker2.PrivateId] = append(connIdsByWorker[worker2.PrivateId], c.GetPublicId())
		} else {
			connIdsByWorker[worker1.PrivateId] = append(connIdsByWorker[worker1.PrivateId], c.GetPublicId())
		}
	}

	// Mark half of the connections connected and leave the others authorized.
	// This is just to ensure we have a spread when we test it out.
	for i, connId := range connIds {
		if i%2 == 0 {
			_, cs, err := sessionRepo.ConnectConnection(ctx, session.ConnectWith{
				ConnectionId:       connId,
				ClientTcpAddress:   "127.0.0.1",
				ClientTcpPort:      22,
				EndpointTcpAddress: "127.0.0.1",
				EndpointTcpPort:    22,
			})
			require.NoError(err)
			require.Len(cs, 2)
			// ConnectConnection returns the state history; expect both
			// the prior authorized state and the new connected state.
			var foundAuthorized, foundConnected bool
			for _, status := range cs {
				if status.Status == session.StatusAuthorized {
					foundAuthorized = true
				}
				if status.Status == session.StatusConnected {
					foundConnected = true
				}
			}
			require.True(foundAuthorized)
			require.True(foundConnected)
		}
	}

	// Create the job.
	job, err := newSessionCleanupJob(
		hclog.New(&hclog.LoggerOptions{Level: hclog.Trace}),
		func() (*session.Repository, error) { return sessionRepo, nil },
		session.DeadWorkerConnCloseMinGrace,
	)
	require.NoError(err)

	// sleep the status grace period.
	time.Sleep(time.Second * time.Duration(session.DeadWorkerConnCloseMinGrace))

	// Push an upsert to the first worker so that its status has been
	// updated. worker2 deliberately receives no upsert, so it stays
	// beyond the grace period.
	_, rowsUpdated, err := serversRepo.UpsertServer(ctx, worker1, []servers.Option{}...)
	require.NoError(err)
	require.Equal(1, rowsUpdated)

	// Run the job.
	require.NoError(job.Run(ctx))

	// Assert connection state on both workers: closed=true means every
	// connection on the worker must have a closed state in its history.
	assertConnections := func(workerId string, closed bool) {
		connIds, ok := connIdsByWorker[workerId]
		require.True(ok)
		require.Len(connIds, 6)
		for _, connId := range connIds {
			_, states, err := sessionRepo.LookupConnection(ctx, connId, nil)
			require.NoError(err)
			var foundClosed bool
			for _, state := range states {
				if state.Status == session.StatusClosed {
					foundClosed = true
					break
				}
			}
			assert.Equal(closed, foundClosed)
		}
	}

	// Assert that all connections on the second worker are closed
	assertConnections(worker2.PrivateId, true)
	// Assert that all connections on the first worker are still open
	assertConnections(worker1.PrivateId, false)
}
|
||||
|
||||
func TestSessionCleanupJobNewJobErr(t *testing.T) {
|
||||
t.Parallel()
|
||||
const op = "controller.newNewSessionCleanupJob"
|
||||
require := require.New(t)
|
||||
|
||||
job, err := newSessionCleanupJob(nil, nil, 0)
|
||||
require.Equal(err, errors.E(
|
||||
errors.WithCode(errors.InvalidParameter),
|
||||
errors.WithOp(op),
|
||||
errors.WithMsg("missing logger"),
|
||||
))
|
||||
require.Nil(job)
|
||||
|
||||
job, err = newSessionCleanupJob(hclog.New(nil), nil, 0)
|
||||
require.Equal(err, errors.E(
|
||||
errors.WithCode(errors.InvalidParameter),
|
||||
errors.WithOp(op),
|
||||
errors.WithMsg("missing sessionRepoFn"),
|
||||
))
|
||||
require.Nil(job)
|
||||
|
||||
job, err = newSessionCleanupJob(hclog.New(nil), func() (*session.Repository, error) { return nil, nil }, 0)
|
||||
require.Equal(err, errors.E(
|
||||
errors.WithCode(errors.InvalidParameter),
|
||||
errors.WithOp(op),
|
||||
errors.WithMsg(fmt.Sprintf("invalid gracePeriod, must be greater than %d", session.DeadWorkerConnCloseMinGrace)),
|
||||
))
|
||||
require.Nil(job)
|
||||
}
|
||||
@ -0,0 +1,56 @@
|
||||
package worker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/boundary/internal/cmd/base"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestWorkerWaitForNextSuccessfulStatusUpdate(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
for _, name := range []string{"ok", "timeout"} {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
// As-needed initialization of a mock worker
|
||||
w := &Worker{
|
||||
logger: hclog.New(nil),
|
||||
lastStatusSuccess: new(atomic.Value),
|
||||
baseContext: context.Background(),
|
||||
conf: &Config{
|
||||
Server: &base.Server{
|
||||
StatusGracePeriodDuration: time.Second * 1,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
// This is present in New()
|
||||
w.lastStatusSuccess.Store((*LastStatusInformation)(nil))
|
||||
|
||||
var wg sync.WaitGroup
|
||||
var err error
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
err = w.WaitForNextSuccessfulStatusUpdate()
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
if name == "ok" {
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
w.lastStatusSuccess.Store(&LastStatusInformation{StatusTime: time.Now()})
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
if name == "timeout" {
|
||||
require.ErrorIs(err, context.DeadlineExceeded)
|
||||
} else {
|
||||
require.NoError(err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
Loading…
Reference in new issue