feat: new worker config for minimumAvailableCapacity

pull/4607/head
Damian Debkowski 2 years ago committed by Elim Tsiagbey
parent f483d1417c
commit aea473c77b

@ -66,54 +66,55 @@ type Command struct {
controller *controller.Controller
worker *worker.Worker
flagLogLevel string
flagLogFormat string
flagCombineLogs bool
flagLoginName string
flagPassword string
flagUnprivilegedLoginName string
flagUnprivilegedPassword string
flagIdSuffix string
flagSecondaryIdSuffix string
flagHostAddress string
flagTargetDefaultPort int
flagTargetSessionMaxSeconds int
flagTargetSessionConnectionLimit int
flagControllerApiListenAddr string
flagControllerClusterListenAddr string
flagControllerPublicClusterAddr string
flagControllerOnly bool
flagWorkerAuthKey string
flagWorkerProxyListenAddr string
flagWorkerPublicAddr string
flagOpsListenAddr string
flagUiPassthroughDir string
flagRecoveryKey string
flagDatabaseUrl string
flagContainerImage string
flagDisableDatabaseDestruction bool
flagEventFormat string
flagAudit string
flagObservations string
flagTelemetry string
flagSysEvents string
flagEveryEventAllowFilters []string
flagEveryEventDenyFilters []string
flagCreateLoopbackPlugin bool
flagPluginExecutionDir string
flagSkipPlugins bool
flagSkipOidcAuthMethodCreation bool
flagSkipLdapAuthMethodCreation bool
flagSkipAliasTargetCreation bool
flagWorkerDnsServer string
flagWorkerAuthMethod string
flagWorkerAuthStorageDir string
flagWorkerAuthStorageSkipCleanup bool
flagWorkerAuthWorkerRotationInterval time.Duration
flagWorkerAuthCaCertificateLifetime time.Duration
flagWorkerAuthDebuggingEnabled bool
flagWorkerRecordingStorageDir string
flagBsrKey string
flagLogLevel string
flagLogFormat string
flagCombineLogs bool
flagLoginName string
flagPassword string
flagUnprivilegedLoginName string
flagUnprivilegedPassword string
flagIdSuffix string
flagSecondaryIdSuffix string
flagHostAddress string
flagTargetDefaultPort int
flagTargetSessionMaxSeconds int
flagTargetSessionConnectionLimit int
flagControllerApiListenAddr string
flagControllerClusterListenAddr string
flagControllerPublicClusterAddr string
flagControllerOnly bool
flagWorkerAuthKey string
flagWorkerProxyListenAddr string
flagWorkerPublicAddr string
flagOpsListenAddr string
flagUiPassthroughDir string
flagRecoveryKey string
flagDatabaseUrl string
flagContainerImage string
flagDisableDatabaseDestruction bool
flagEventFormat string
flagAudit string
flagObservations string
flagTelemetry string
flagSysEvents string
flagEveryEventAllowFilters []string
flagEveryEventDenyFilters []string
flagCreateLoopbackPlugin bool
flagPluginExecutionDir string
flagSkipPlugins bool
flagSkipOidcAuthMethodCreation bool
flagSkipLdapAuthMethodCreation bool
flagSkipAliasTargetCreation bool
flagWorkerDnsServer string
flagWorkerAuthMethod string
flagWorkerAuthStorageDir string
flagWorkerAuthStorageSkipCleanup bool
flagWorkerAuthWorkerRotationInterval time.Duration
flagWorkerAuthCaCertificateLifetime time.Duration
flagWorkerAuthDebuggingEnabled bool
flagWorkerRecordingStorageDir string
flagWorkerRecordingStorageMinimumAvailableCapacity string
flagBsrKey string
}
func (c *Command) Synopsis() string {
@ -426,6 +427,12 @@ func (c *Command) Flags() *base.FlagSets {
Usage: "Specifies the directory to store worker session recordings in dev mode. If not provided a temp directory will be created. Session recording is an Enterprise-only feature.",
})
f.StringVar(&base.StringVar{
Name: "worker-recording-storage-minimum-available-capacity",
Target: &c.flagWorkerRecordingStorageMinimumAvailableCapacity,
Usage: "Specifies the minimum amount of available disk space a worker needs in the recording storage directory to process sessions with session recording enabled. Input should be a capacity string: 4kib or 3GB. Defaults to 500mib.",
})
f.BoolVar(&base.BoolVar{
Name: "worker-auth-storage-skip-cleanup",
Target: &c.flagWorkerAuthStorageSkipCleanup,
@ -528,6 +535,7 @@ func (c *Command) Run(args []string) int {
if !c.flagControllerOnly {
c.Config.Worker.AuthStoragePath = c.flagWorkerAuthStorageDir
c.Config.Worker.RecordingStoragePath = c.flagWorkerRecordingStorageDir
c.Config.Worker.RecordingStorageMinimumAvailableCapacity = c.flagWorkerRecordingStorageMinimumAvailableCapacity
if c.Config.Worker.RecordingStoragePath == "" {
// Create a temp dir for recording storage

@ -71,6 +71,7 @@ func TestCommand_Flags(t *testing.T) {
assert.Contains(completions, "-worker-auth-method")
assert.Contains(completions, "-worker-auth-storage-dir")
assert.Contains(completions, "-worker-recording-storage-dir")
assert.Contains(completions, "-worker-recording-storage-minimum-available-capacity")
assert.Contains(completions, "-worker-auth-storage-skip-cleanup")
// keep adding assertions for other flags which should be set as a result of cmd.Flags()

@ -23,6 +23,7 @@ import (
"github.com/hashicorp/boundary/internal/db"
"github.com/hashicorp/boundary/internal/event"
"github.com/hashicorp/boundary/internal/ratelimit"
"github.com/hashicorp/boundary/internal/storage"
"github.com/hashicorp/boundary/internal/util"
kms_plugin_assets "github.com/hashicorp/boundary/plugins/kms"
"github.com/hashicorp/boundary/sdk/wrapper"
@ -196,7 +197,7 @@ type Controller struct {
// TODO: This isn't documented (on purpose) because the right place for this
// is central configuration so you can't drift across controllers, but we
// don't have that yet.
WorkerStatusGracePeriod interface{} `hcl:"worker_status_grace_period"`
WorkerStatusGracePeriod any `hcl:"worker_status_grace_period"`
WorkerStatusGracePeriodDuration time.Duration `hcl:"-"`
// LivenessTimeToStale represents the period of time (as a duration) after
@ -206,7 +207,7 @@ type Controller struct {
// TODO: This isn't documented (on purpose) because the right place for this
// is central configuration so you can't drift across controllers, but we
// don't have that yet.
LivenessTimeToStale interface{} `hcl:"liveness_time_to_stale"`
LivenessTimeToStale any `hcl:"liveness_time_to_stale"`
LivenessTimeToStaleDuration time.Duration `hcl:"-"`
// SchedulerRunJobInterval is the time interval between waking up the
@ -275,7 +276,7 @@ type Worker struct {
// canceling it to try again.
//
// TODO: This is currently not documented and considered internal.
StatusCallTimeout interface{} `hcl:"status_call_timeout"`
StatusCallTimeout any `hcl:"status_call_timeout"`
StatusCallTimeoutDuration time.Duration `hcl:"-"`
// SuccessfulStatusGracePeriod represents the period of time (as a duration)
@ -284,7 +285,7 @@ type Worker struct {
// less than StatusCallTimeout.
//
// TODO: This is currently not documented and considered internal.
SuccessfulStatusGracePeriod interface{} `hcl:"successful_status_grace_period"`
SuccessfulStatusGracePeriod any `hcl:"successful_status_grace_period"`
SuccessfulStatusGracePeriodDuration time.Duration `hcl:"-"`
// AuthStoragePath represents the location a worker stores its node credentials, if set
@ -294,6 +295,15 @@ type Worker struct {
// they are sync'ed to the corresponding storage bucket. The path must already exist.
RecordingStoragePath string `hcl:"recording_storage_path"`
// RecordingStorageMinimumAvailableCapacity represents the minimum amount of available
// disk space a worker needs in the path defined by RecordingStoragePath for processing
// sessions with recording enabled. The expected input value for this field is a
// "capacity string". Supported suffixes are kb, kib, mb, mib, gb, gib, tb, tib, which
// are not case sensitive. We use a raw interface for parsing so that users can input a
// "capacity string" which is converted into a uint64 value that is measured in bytes.
RecordingStorageMinimumAvailableCapacity any `hcl:"recording_storage_minimum_available_capacity"`
RecordingStorageMinimumAvailableDiskSpace uint64 `hcl:"-"`
// ControllerGeneratedActivationToken is a controller-generated activation
// token used to register this worker to the cluster. It can be a path, env
// var, or direct value.
@ -790,6 +800,21 @@ func Parse(d string) (*Config, error) {
return nil, errors.New("Successful status grace period value is negative")
}
if !util.IsNil(result.Worker.RecordingStorageMinimumAvailableCapacity) {
if result.Worker.RecordingStoragePath == "" {
return nil, errors.New("recording_storage_path cannot be empty when providing recording_storage_minimum_available_capacity")
}
recordingStorageMinimumAvailableDiskSpace, err := parseutil.ParseCapacityString(result.Worker.RecordingStorageMinimumAvailableCapacity)
if err != nil {
return result, err
}
result.Worker.RecordingStorageMinimumAvailableDiskSpace = recordingStorageMinimumAvailableDiskSpace
}
// RecordingStorageMinimumAvailableDiskSpace defaults to 500MiB when not set by the user
if result.Worker.RecordingStoragePath != "" && result.Worker.RecordingStorageMinimumAvailableDiskSpace == 0 {
result.Worker.RecordingStorageMinimumAvailableDiskSpace = storage.DefaultMinimumAvailableDiskSpace
}
switch {
case result.Worker.StatusCallTimeoutDuration == 0 && result.Worker.SuccessfulStatusGracePeriodDuration == 0:
// Nothing

@ -575,6 +575,154 @@ func TestDevWorkerCredentialStoragePath(t *testing.T) {
}
}
// TestDevWorkerRecordingStorageMinimumAvailableCapacity exercises parsing of
// the worker recording_storage_minimum_available_capacity HCL field:
//   - capacity strings ("4kib") and raw byte values ("4096") are converted
//     into a byte count,
//   - the value defaults to 500MiB when only a storage path is configured,
//   - setting a capacity without recording_storage_path is a parse error, and
//   - unparsable capacity input surfaces an error from capacity parsing.
func TestDevWorkerRecordingStorageMinimumAvailableCapacity(t *testing.T) {
	t.Parallel()
	td := t.TempDir()
	tests := []struct {
		name                           string
		devWorkerProvidedConfiguration string
		storagePath                    string
		expectedDiskSpace              uint64
		expectedErrMsg                 string
	}{
		{
			// Neither option set: no disk-space requirement is derived.
			name: "empty storage with empty capacity",
			devWorkerProvidedConfiguration: `
listener "tcp" {
	purpose = "proxy"
}
worker {
	name = "w_1234567890"
	description = "A default worker created in dev mode"
	initial_upstreams = ["127.0.0.1"]
	tags {
		type = ["dev", "local"]
	}
}
`,
			expectedDiskSpace: 0,
			storagePath:       "",
		},
		{
			// A capacity without a storage path is rejected by Parse.
			name: "empty storage path with set capacity",
			devWorkerProvidedConfiguration: `
listener "tcp" {
	purpose = "proxy"
}
worker {
	name = "w_1234567890"
	description = "A default worker created in dev mode"
	initial_upstreams = ["127.0.0.1"]
	tags {
		type = ["dev", "local"]
	}
	recording_storage_minimum_available_capacity = "4kib"
}
`,
			expectedErrMsg: "recording_storage_path cannot be empty when providing recording_storage_minimum_available_capacity",
		},
		{
			// Path set but no capacity: Parse applies the 500MiB default.
			name: "storage path with empty capacity defaults to 500mib",
			devWorkerProvidedConfiguration: fmt.Sprintf(`
listener "tcp" {
	purpose = "proxy"
}
worker {
	name = "w_1234567890"
	description = "A default worker created in dev mode"
	initial_upstreams = ["127.0.0.1"]
	tags {
		type = ["dev", "local"]
	}
	recording_storage_path = "%v"
}
`, td),
			storagePath:       td,
			expectedDiskSpace: 524288000,
		},
		{
			// Suffixed capacity string converts to bytes (4kib -> 4096).
			name: "storage path with capacity string",
			devWorkerProvidedConfiguration: fmt.Sprintf(`
listener "tcp" {
	purpose = "proxy"
}
worker {
	name = "w_1234567890"
	description = "A default worker created in dev mode"
	initial_upstreams = ["127.0.0.1"]
	tags {
		type = ["dev", "local"]
	}
	recording_storage_path = "%v"
	recording_storage_minimum_available_capacity = "4kib"
}
`, td),
			storagePath:       td,
			expectedDiskSpace: 4096,
		},
		{
			// A bare number is accepted as a raw byte count.
			name: "storage path with raw byte value",
			devWorkerProvidedConfiguration: fmt.Sprintf(`
listener "tcp" {
	purpose = "proxy"
}
worker {
	name = "w_1234567890"
	description = "A default worker created in dev mode"
	initial_upstreams = ["127.0.0.1"]
	tags {
		type = ["dev", "local"]
	}
	recording_storage_path = "%v"
	recording_storage_minimum_available_capacity = "4096"
}
`, td),
			storagePath:       td,
			expectedDiskSpace: 4096,
		},
		{
			// A suffix with no magnitude cannot be parsed as a capacity.
			name: "storage path with invalid capacity input",
			devWorkerProvidedConfiguration: fmt.Sprintf(`
listener "tcp" {
	purpose = "proxy"
}
worker {
	name = "w_1234567890"
	description = "A default worker created in dev mode"
	initial_upstreams = ["127.0.0.1"]
	tags {
		type = ["dev", "local"]
	}
	recording_storage_path = "%v"
	recording_storage_minimum_available_capacity = "gib"
}
`, td),
			storagePath:    td,
			expectedErrMsg: "could not parse capacity from input",
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			parsed, err := Parse(devConfig + tt.devWorkerProvidedConfiguration)
			if tt.expectedErrMsg != "" {
				require.Error(t, err)
				assert.ErrorContains(t, err, tt.expectedErrMsg)
				return
			}
			require.NoError(t, err)
			assert.Equal(t, tt.storagePath, parsed.Worker.RecordingStoragePath)
			assert.Equal(t, tt.expectedDiskSpace, parsed.Worker.RecordingStorageMinimumAvailableDiskSpace)
		})
	}
}
func TestDevWorkerRecordingStoragePath(t *testing.T) {
t.Parallel()
td := t.TempDir()

@ -99,7 +99,13 @@ type recorderManager interface {
// create its reverseConnReceiver
var reverseConnReceiverFactory func() reverseConnReceiver
var recordingStorageFactory func(ctx context.Context, path string, plgClients map[string]plgpb.StoragePluginServiceClient, enableLoopback bool) (storage.RecordingStorage, error)
var recordingStorageFactory func(
ctx context.Context,
path string,
plgClients map[string]plgpb.StoragePluginServiceClient,
enableLoopback bool,
minimumAvailableDiskSpace uint64,
) (storage.RecordingStorage, error)
var recorderManagerFactory func(*Worker) (recorderManager, error)
@ -268,7 +274,12 @@ func New(ctx context.Context, conf *Config) (*Worker, error) {
}
// passing in an empty context so that storage can finish syncing during an emergency shutdown or interrupt
s, err := recordingStorageFactory(context.Background(), w.conf.RawConfig.Worker.RecordingStoragePath, plgClients, enableStorageLoopback)
s, err := recordingStorageFactory(
context.Background(),
w.conf.RawConfig.Worker.RecordingStoragePath,
plgClients, enableStorageLoopback,
w.conf.RawConfig.Worker.RecordingStorageMinimumAvailableDiskSpace,
)
if err != nil {
return nil, fmt.Errorf("error create recording storage: %w", err)
}

@ -21,6 +21,11 @@ const (
// NoSync mode will result in Close not syncing the file to the storage
// bucket.
NoSync
// minimumAvailableDiskSpace represents the minimum amount of available disk
// space a worker needs in the path defined by RecordingStoragePath for processing
// sessions with recording enabled.
minimumAvailableDiskSpace
)
// AccessMode is use to determine the access mode a file is opened with.
@ -49,19 +54,21 @@ func GetOpts(opt ...Option) Options {
// getDefaultOptions returns an Options value populated with the defaults
// applied when a caller supplies no overriding Option. The minimum available
// disk space defaults to DefaultMinimumAvailableDiskSpace (500MiB).
// NOTE: the diff artifact that duplicated the pre-change field list inside
// the literal (which would not compile) is removed here.
func getDefaultOptions() Options {
	return Options{
		WithCloseSyncMode:             Asynchronous,
		WithFileAccessMode:            ReadOnly,
		WithCreateFile:                false,
		WithBuffer:                    0,
		WithMinimumAvailableDiskSpace: DefaultMinimumAvailableDiskSpace,
	}
}
// Options are storage options. Populate via the With* Option functions;
// getDefaultOptions supplies the defaults.
// NOTE: the diff artifact that duplicated the pre-change field declarations
// (which would not compile) is removed here.
type Options struct {
	WithCloseSyncMode             SyncMode
	WithFileAccessMode            AccessMode
	WithCreateFile                bool
	WithBuffer                    uint64
	WithMinimumAvailableDiskSpace uint64
}
// Option is a storage option.
@ -99,3 +106,12 @@ func WithBuffer(b uint64) Option {
o.WithBuffer = b
}
}
// WithMinimumAvailableDiskSpace returns an Option that sets the minimum
// number of bytes of free disk space a worker needs in its local recording
// path in order to process sessions with recording enabled.
func WithMinimumAvailableDiskSpace(minBytes uint64) Option {
	return func(opts *Options) {
		opts.WithMinimumAvailableDiskSpace = minBytes
	}
}

@ -71,4 +71,17 @@ func Test_getOpts(t *testing.T) {
testOpts.WithBuffer = 4096
assert.Equal(opts, testOpts)
})
t.Run("WithMinimumAvailableDiskSpace", func(t *testing.T) {
t.Parallel()
assert := assert.New(t)
testOpts := getDefaultOptions()
opts := GetOpts()
assert.Equal(testOpts, opts)
testOpts = getDefaultOptions()
opts = GetOpts(WithMinimumAvailableDiskSpace(4096))
testOpts.WithMinimumAvailableDiskSpace = 4096
assert.Equal(opts, testOpts)
})
}

@ -13,6 +13,12 @@ import (
plgpb "github.com/hashicorp/boundary/sdk/pbs/plugin"
)
// DefaultMinimumAvailableDiskSpace is the minimum available disk space a
// Boundary worker falls back to when its configuration does not set a
// RecordingStorageMinimumAvailableCapacity value. It equals 500MiB and is
// used to determine the worker's local storage state.
const DefaultMinimumAvailableDiskSpace = 500 << 20
// RecordingStorage can be used to create an FS usable for session recording.
type RecordingStorage interface {
// NewSyncingFS returns an FS that will use local storage as a cache and sync files when they are closed.

@ -149,20 +149,30 @@ and how long a BSR will be retained in the storage bucket.
### Boundary workers requirements
[Session recording](/boundary/docs/configuration/session-recording) requires that you [configure at least one worker for local storage](/boundary/docs/configuration/session-recording/create-storage-bucket#boundary-workers-requirements).
<Note>
You cannot use an HCP managed worker for the local storage. HCP Boundary users must configure a self-managed worker to enable session recording.
</Note>
The worker that you configure for storage must:
- Have access to the AWS S3 storage bucket
- Have an accessible directory defined by `recording_storage_path` for storing session recordings while they are in progress. On session closure, Boundary moves the local session recording to remote storage and deletes the local copy.
- For HCP Boundary, refer to the [Self-managed worker configuration](/hcp/docs/boundary/self-managed-workers/session-recording) documentation.
- For Boundary Enterprise, refer to the [worker configuration](/boundary/docs/configuration/worker/worker-configuration#session-recording) documentation.
- Have at least 1 MB of available disk space.
- Have available disk space defined by `recording_storage_minimum_available_capacity`. If you do not configure the minimum available storage capacity, Boundary uses the default value of 500MiB.
- Run Darwin, Windows, or Linux. The following binaries are not supported for session recording: NetBSD, OpenBSD, Solaris.
<Note>
You cannot use an HCP managed worker for the local storage. HCP Boundary users must configure a self-managed worker to enable session recording.
</Note>
Refer to the following example configuration:
```hcl
worker {
auth_storage_path="/boundary/demo-worker-1"
initial_upstreams = ["10.0.0.1"]
recording_storage_path="/local/storage/directory"
recording_storage_minimum_available_capacity="500MB"
}
```
Complete the following steps to create a storage bucket in Boundary for session recording:
<Tabs>

@ -37,6 +37,9 @@ worker {
# Local storage path required if session recording is enabled
recording_storage_path = "tmp/boundary/"
# Minimum available disk space required in the local storage path if session recording is enabled
recording_storage_minimum_available_capacity = "500MB"
# Mutually exclusive with hcp_boundary_cluster_id
initial_upstreams = [
"10.0.0.1",
@ -84,6 +87,16 @@ worker {
Session recordings are stored in the local storage while they are in progress.
When the session is complete, Boundary moves the local session recording to remote storage and deletes the local copy.
- `recording_storage_minimum_available_capacity` - A value measured in bytes that is used for defining the local storage state
of the worker. Boundary compares this value with the available local disk space found in the `recording_storage_path`.
The possible storage states based on the `recording_storage_minimum_available_capacity` are:
- Available - The worker is above the storage threshold and can proxy sessions that are enabled with session recording.
- Low storage - The worker is below the storage threshold. Existing sessions can continue without interruption, but new sessions that are enabled with session recording will not be proxied.
- Critically low storage - The worker is below half the storage threshold. Existing sessions that are enabled with session recording will be forcefully closed.
- Out of storage - The worker is out of local disk space. The worker is in an unrecoverable state. An administrator must intervene to remedy the issue.
- Not configured - The worker does not have a local storage path configured.
- Unknown - The default local storage state of a worker. This state indicates that the local storage state of a worker is not yet known.
- `tags` - A map of key-value pairs where values are an array of strings. Most
commonly used for [filtering](/boundary/docs/concepts/filtering) targets a
worker can proxy via [worker
@ -167,6 +180,9 @@ worker {
# Local storage path required if session recording is enabled
recording_storage_path = "tmp/boundary/"
# Minimum available disk space required in the local storage path if session recording is enabled
recording_storage_minimum_available_capacity = "500MB"
# Workers typically need to reach upstreams on :9201
initial_upstreams = [
"10.0.0.1",

Loading…
Cancel
Save