internal/session: ensure orphaned connection cleanup (#5438)

Previously, the orphaned connection cleanup job would
not detect that a connection that was created before a
worker had submitted a successful session info was
orphaned. We now fall back to using the worker update
time as an indicator if the session info time is not set.
pull/5449/head
Johan Brandhorst-Satzkorn 1 year ago committed by GitHub
parent 196f01bbd5
commit a43cb90a32
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -292,16 +292,18 @@ where
//
// The query returns the set of servers that have had connections closed
// along with their last update time and the number of connections closed on
// each. If we do not know the last update time, we use the current time.
// each. If the worker has not yet sent a session info update, we use the
// worker's last update time as the last update time.
closeConnectionsForDeadServersCte = `
with
dead_workers (worker_id, last_update_time) as (
select
w.worker_id as worker_id,
w.last_request_time as last_update_time
from server_worker_session_info_request w
where
w.last_request_time < wt_sub_seconds_from_now(@grace_period_seconds)
select w.public_id as worker_id,
coalesce(wsi.last_request_time, w.update_time) as last_update_time
from server_worker w
left join server_worker_session_info_request wsi on wsi.worker_id = w.public_id
where wsi.last_request_time < wt_sub_seconds_from_now(@grace_period_seconds)
or ( wsi.last_request_time is null
and w.update_time < wt_sub_seconds_from_now(@grace_period_seconds))
),
closed_connections (connection_id, worker_id) as (
update session_connection

@ -0,0 +1,111 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package parallel
import (
"context"
"net"
"testing"
"time"
"github.com/hashicorp/boundary/api/targets"
"github.com/hashicorp/boundary/internal/cmd/config"
"github.com/hashicorp/boundary/internal/daemon/controller"
"github.com/hashicorp/boundary/internal/daemon/worker"
"github.com/hashicorp/boundary/internal/session"
"github.com/hashicorp/boundary/internal/tests/helper"
"github.com/hashicorp/dawdle"
"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/require"
)
// TestSpeedyOrphanedConnectionCleanup tests that we successfully
// clean up connections that are orphaned to a worker that never
// successfully reported a session info.
func TestSpeedyOrphanedConnectionCleanup(t *testing.T) {
t.Parallel()
logger := hclog.New(&hclog.LoggerOptions{
Name: t.Name(),
Level: hclog.Trace,
})
conf, err := config.DevController()
require.NoError(t, err)
pl, err := net.Listen("tcp", "[::1]:0")
require.NoError(t, err)
c := controller.NewTestController(t, &controller.TestControllerOpts{
Config: conf,
InitialResourcesSuffix: "1234567890",
Logger: logger.Named("c1"),
PublicClusterAddr: pl.Addr().String(),
WorkerRPCGracePeriod: 2 * time.Second,
// Run the scheduler more often to speed up cleanup of orphaned connections
SchedulerRunJobInterval: time.Second,
})
helper.ExpectWorkers(t, c)
// Wire up the testing proxies
require.Len(t, c.ClusterAddrs(), 1)
proxy, err := dawdle.NewProxy("tcp", "", c.ClusterAddrs()[0],
dawdle.WithListener(pl),
dawdle.WithRbufSize(256),
dawdle.WithWbufSize(256),
)
require.NoError(t, err)
t.Cleanup(func() {
_ = proxy.Close()
})
require.NotEmpty(t, proxy.ListenerAddr())
w := worker.NewTestWorker(t, &worker.TestWorkerOpts{
WorkerAuthKms: c.Config().WorkerAuthKms,
InitialUpstreams: []string{proxy.ListenerAddr()},
Logger: logger.Named("w1"),
SuccessfulControllerRPCGracePeriodDuration: 2 * time.Second,
WorkerRPCInterval: time.Second,
})
helper.ExpectWorkers(t, c, w)
// Use an independent context for test things that take a context so
// that we aren't tied to any timeouts in the controller, etc. This
// can interfere with some of the test operations.
ctx := context.Background()
// Connect target
client := c.Client()
client.SetToken(c.Token().Token)
tcl := targets.NewClient(client)
tgt, err := tcl.Read(ctx, "ttcp_1234567890")
require.NoError(t, err)
require.NotNil(t, tgt)
// Create test server, update default port on target
ts := helper.NewTestTcpServer(t)
require.NotNil(t, ts)
t.Cleanup(ts.Close)
tgt, err = tcl.Update(ctx, tgt.Item.Id, tgt.Item.Version, targets.WithTcpTargetDefaultPort(ts.Port()), targets.WithSessionConnectionLimit(-1))
require.NoError(t, err)
require.NotNil(t, tgt)
// Authorize and connect
sess := helper.NewTestSession(ctx, t, tcl, "ttcp_1234567890")
sConn := sess.Connect(ctx, t)
// Run initial send/receive test, make sure things are working
t.Log("running initial send/recv test")
sConn.TestSendRecvAll(t)
// Kill the link before the worker has a chance to send a session info
t.Log("pausing controller/worker link")
proxy.Pause()
t.Cleanup(proxy.Resume) // Resume the proxy on cleanup to allow other cleanup to run
// Both the worker and the controller should detect that the worker is dead
// and close the connection.
sess.ExpectConnectionStateOnWorker(ctx, t, w, session.StatusClosed)
sess.ExpectConnectionStateOnController(ctx, t, c.Controller().ConnectionRepoFn, session.StatusClosed)
}
Loading…
Cancel
Save