From aea473c77bbb4832c2006d6c5c93f9eff2760c6d Mon Sep 17 00:00:00 2001 From: Damian Debkowski Date: Fri, 15 Dec 2023 15:26:35 -0800 Subject: [PATCH] feat: new worker config for minimumAvailableCapacity --- internal/cmd/commands/dev/dev.go | 104 ++++++------ internal/cmd/commands/dev/flags_test.go | 1 + internal/cmd/config/config.go | 33 +++- internal/cmd/config/config_test.go | 148 ++++++++++++++++++ internal/daemon/worker/worker.go | 15 +- internal/storage/options.go | 32 +++- internal/storage/options_test.go | 13 ++ internal/storage/storage.go | 6 + .../create-storage-bucket.mdx | 22 ++- .../docs/configuration/worker/index.mdx | 16 ++ 10 files changed, 322 insertions(+), 68 deletions(-) diff --git a/internal/cmd/commands/dev/dev.go b/internal/cmd/commands/dev/dev.go index 06f73233d5..1767148473 100644 --- a/internal/cmd/commands/dev/dev.go +++ b/internal/cmd/commands/dev/dev.go @@ -66,54 +66,55 @@ type Command struct { controller *controller.Controller worker *worker.Worker - flagLogLevel string - flagLogFormat string - flagCombineLogs bool - flagLoginName string - flagPassword string - flagUnprivilegedLoginName string - flagUnprivilegedPassword string - flagIdSuffix string - flagSecondaryIdSuffix string - flagHostAddress string - flagTargetDefaultPort int - flagTargetSessionMaxSeconds int - flagTargetSessionConnectionLimit int - flagControllerApiListenAddr string - flagControllerClusterListenAddr string - flagControllerPublicClusterAddr string - flagControllerOnly bool - flagWorkerAuthKey string - flagWorkerProxyListenAddr string - flagWorkerPublicAddr string - flagOpsListenAddr string - flagUiPassthroughDir string - flagRecoveryKey string - flagDatabaseUrl string - flagContainerImage string - flagDisableDatabaseDestruction bool - flagEventFormat string - flagAudit string - flagObservations string - flagTelemetry string - flagSysEvents string - flagEveryEventAllowFilters []string - flagEveryEventDenyFilters []string - flagCreateLoopbackPlugin bool - flagPluginExecutionDir string - flagSkipPlugins bool - flagSkipOidcAuthMethodCreation bool - flagSkipLdapAuthMethodCreation bool - flagSkipAliasTargetCreation bool - flagWorkerDnsServer string - flagWorkerAuthMethod string - flagWorkerAuthStorageDir string - flagWorkerAuthStorageSkipCleanup bool - flagWorkerAuthWorkerRotationInterval time.Duration - flagWorkerAuthCaCertificateLifetime time.Duration - flagWorkerAuthDebuggingEnabled bool - flagWorkerRecordingStorageDir string - flagBsrKey string + flagLogLevel string + flagLogFormat string + flagCombineLogs bool + flagLoginName string + flagPassword string + flagUnprivilegedLoginName string + flagUnprivilegedPassword string + flagIdSuffix string + flagSecondaryIdSuffix string + flagHostAddress string + flagTargetDefaultPort int + flagTargetSessionMaxSeconds int + flagTargetSessionConnectionLimit int + flagControllerApiListenAddr string + flagControllerClusterListenAddr string + flagControllerPublicClusterAddr string + flagControllerOnly bool + flagWorkerAuthKey string + flagWorkerProxyListenAddr string + flagWorkerPublicAddr string + flagOpsListenAddr string + flagUiPassthroughDir string + flagRecoveryKey string + flagDatabaseUrl string + flagContainerImage string + flagDisableDatabaseDestruction bool + flagEventFormat string + flagAudit string + flagObservations string + flagTelemetry string + flagSysEvents string + flagEveryEventAllowFilters []string + flagEveryEventDenyFilters []string + flagCreateLoopbackPlugin bool + flagPluginExecutionDir string + flagSkipPlugins bool + flagSkipOidcAuthMethodCreation bool + flagSkipLdapAuthMethodCreation bool + flagSkipAliasTargetCreation bool + flagWorkerDnsServer string + flagWorkerAuthMethod string + flagWorkerAuthStorageDir string + flagWorkerAuthStorageSkipCleanup bool + flagWorkerAuthWorkerRotationInterval time.Duration + flagWorkerAuthCaCertificateLifetime time.Duration + flagWorkerAuthDebuggingEnabled bool + flagWorkerRecordingStorageDir string + flagWorkerRecordingStorageMinimumAvailableCapacity string + flagBsrKey string } func (c *Command) Synopsis() string { @@ -426,6 +427,12 @@ func (c *Command) Flags() *base.FlagSets { Usage: "Specifies the directory to store worker session recordings in dev mode. If not provided a temp directory will be created. Session recording is an Enterprise-only feature.", }) + f.StringVar(&base.StringVar{ + Name: "worker-recording-storage-minimum-available-capacity", + Target: &c.flagWorkerRecordingStorageMinimumAvailableCapacity, + Usage: "Specifies the minimum amount of available disk space a worker needs in the recording storage directory to process sessions with session recording enabled. Input should be a capacity string: 4kib or 3GB. Defaults to 500mib.", + }) + f.BoolVar(&base.BoolVar{ Name: "worker-auth-storage-skip-cleanup", Target: &c.flagWorkerAuthStorageSkipCleanup, @@ -528,6 +535,7 @@ func (c *Command) Run(args []string) int { if !c.flagControllerOnly { c.Config.Worker.AuthStoragePath = c.flagWorkerAuthStorageDir c.Config.Worker.RecordingStoragePath = c.flagWorkerRecordingStorageDir + c.Config.Worker.RecordingStorageMinimumAvailableCapacity = c.flagWorkerRecordingStorageMinimumAvailableCapacity if c.Config.Worker.RecordingStoragePath == "" { // Create a temp dir for recording storage diff --git a/internal/cmd/commands/dev/flags_test.go b/internal/cmd/commands/dev/flags_test.go index e2800a4a6b..b0c2f0212e 100644 --- a/internal/cmd/commands/dev/flags_test.go +++ b/internal/cmd/commands/dev/flags_test.go @@ -71,6 +71,7 @@ func TestCommand_Flags(t *testing.T) { assert.Contains(completions, "-worker-auth-method") assert.Contains(completions, "-worker-auth-storage-dir") assert.Contains(completions, "-worker-recording-storage-dir") + assert.Contains(completions, "-worker-recording-storage-minimum-available-capacity") assert.Contains(completions, "-worker-auth-storage-skip-cleanup") // keep adding assertions for other flags which should be set as a result of cmd.Flags() diff --git a/internal/cmd/config/config.go b/internal/cmd/config/config.go index d0da2f5f7b..efb7c1e297 100644 --- a/internal/cmd/config/config.go +++ b/internal/cmd/config/config.go @@ -23,6 +23,7 @@ import ( "github.com/hashicorp/boundary/internal/db" "github.com/hashicorp/boundary/internal/event" "github.com/hashicorp/boundary/internal/ratelimit" + "github.com/hashicorp/boundary/internal/storage" "github.com/hashicorp/boundary/internal/util" kms_plugin_assets "github.com/hashicorp/boundary/plugins/kms" "github.com/hashicorp/boundary/sdk/wrapper" @@ -196,7 +197,7 @@ type Controller struct { // TODO: This isn't documented (on purpose) because the right place for this // is central configuration so you can't drift across controllers, but we // don't have that yet. - WorkerStatusGracePeriod interface{} `hcl:"worker_status_grace_period"` + WorkerStatusGracePeriod any `hcl:"worker_status_grace_period"` WorkerStatusGracePeriodDuration time.Duration `hcl:"-"` // LivenessTimeToStale represents the period of time (as a duration) after @@ -206,7 +207,7 @@ type Controller struct { // TODO: This isn't documented (on purpose) because the right place for this // is central configuration so you can't drift across controllers, but we // don't have that yet. - LivenessTimeToStale interface{} `hcl:"liveness_time_to_stale"` + LivenessTimeToStale any `hcl:"liveness_time_to_stale"` LivenessTimeToStaleDuration time.Duration `hcl:"-"` // SchedulerRunJobInterval is the time interval between waking up the @@ -275,7 +276,7 @@ type Worker struct { // canceling it to try again. // // TODO: This is currently not documented and considered internal. - StatusCallTimeout interface{} `hcl:"status_call_timeout"` + StatusCallTimeout any `hcl:"status_call_timeout"` StatusCallTimeoutDuration time.Duration `hcl:"-"` // SuccessfulStatusGracePeriod represents the period of time (as a duration) @@ -284,7 +285,7 @@ type Worker struct { // less than StatusCallTimeout. // // TODO: This is currently not documented and considered internal. - SuccessfulStatusGracePeriod interface{} `hcl:"successful_status_grace_period"` + SuccessfulStatusGracePeriod any `hcl:"successful_status_grace_period"` SuccessfulStatusGracePeriodDuration time.Duration `hcl:"-"` // AuthStoragePath represents the location a worker stores its node credentials, if set @@ -294,6 +295,15 @@ type Worker struct { // they are sync'ed to the corresponding storage bucket. The path must already exist. RecordingStoragePath string `hcl:"recording_storage_path"` + // RecordingStorageMinimumAvailableCapacity represents the minimum amount of available + // disk space a worker needs in the path defined by RecordingStoragePath for processing + // sessions with recording enabled. The expected input value for this field is a + // “capacity string“. Supported suffixes are kb, kib, mb, mib, gb, gib, tb, tib, which + // are not case sensitive. We use a raw interface for parsing so that users can input a + // "capacity string" which is converted into a uint64 value that is measured in bytes. + RecordingStorageMinimumAvailableCapacity any `hcl:"recording_storage_minimum_available_capacity"` + RecordingStorageMinimumAvailableDiskSpace uint64 `hcl:"-"` + // ControllerGeneratedActivationToken is a controller-generated activation // token used to register this worker to the cluster. It can be a path, env // var, or direct value. @@ -790,6 +800,21 @@ func Parse(d string) (*Config, error) { return nil, errors.New("Successful status grace period value is negative") } + if !util.IsNil(result.Worker.RecordingStorageMinimumAvailableCapacity) { + if result.Worker.RecordingStoragePath == "" { + return nil, errors.New("recording_storage_path cannot be empty when providing recording_storage_minimum_available_capacity") + } + recordingStorageMinimumAvailableDiskSpace, err := parseutil.ParseCapacityString(result.Worker.RecordingStorageMinimumAvailableCapacity) + if err != nil { + return result, err + } + result.Worker.RecordingStorageMinimumAvailableDiskSpace = recordingStorageMinimumAvailableDiskSpace + } + // RecordingStorageMinimumAvailableDiskSpace defaults to 500MiB when not set by the user + if result.Worker.RecordingStoragePath != "" && result.Worker.RecordingStorageMinimumAvailableDiskSpace == 0 { + result.Worker.RecordingStorageMinimumAvailableDiskSpace = storage.DefaultMinimumAvailableDiskSpace + } + switch { case result.Worker.StatusCallTimeoutDuration == 0 && result.Worker.SuccessfulStatusGracePeriodDuration == 0: // Nothing diff --git a/internal/cmd/config/config_test.go b/internal/cmd/config/config_test.go index 2d20130ee8..eaa9f8836b 100644 --- a/internal/cmd/config/config_test.go +++ b/internal/cmd/config/config_test.go @@ -575,6 +575,154 @@ func TestDevWorkerCredentialStoragePath(t *testing.T) { } } +func TestDevWorkerRecordingStorageMinimumAvailableCapacity(t *testing.T) { + t.Parallel() + td := t.TempDir() + tests := []struct { + name string + devWorkerProvidedConfiguration string + storagePath string + storageCapacity string + expectedDiskSpace uint64 + expectedErrMsg string + }{ + { + name: "empty storage with empty capacity", + devWorkerProvidedConfiguration: ` + listener "tcp" { + purpose = "proxy" + } + + worker { + name = "w_1234567890" + description = "A default worker created in dev mode" + initial_upstreams = ["127.0.0.1"] + tags { + type = ["dev", "local"] + } + } + `, + expectedDiskSpace: 0, + storagePath: "", + }, + { + name: "empty storage path with set capacity", + devWorkerProvidedConfiguration: ` + listener "tcp" { + purpose = "proxy" + } + + worker { + name = "w_1234567890" + description = "A default worker created in dev mode" + initial_upstreams = ["127.0.0.1"] + tags { + type = ["dev", "local"] + } + recording_storage_minimum_available_capacity = "4kib" + } + `, + expectedErrMsg: "recording_storage_path cannot be empty when providing recording_storage_minimum_available_capacity", + }, + { + name: "storage path with empty capacity defaults to 500mib", + devWorkerProvidedConfiguration: fmt.Sprintf(` + listener "tcp" { + purpose = "proxy" + } + + worker { + name = "w_1234567890" + description = "A default worker created in dev mode" + initial_upstreams = ["127.0.0.1"] + tags { + type = ["dev", "local"] + } + recording_storage_path = "%v" + } + `, td), + storagePath: td, + expectedDiskSpace: 524288000, + }, + { + name: "storage path with capacity string", + devWorkerProvidedConfiguration: fmt.Sprintf(` + listener "tcp" { + purpose = "proxy" + } + + worker { + name = "w_1234567890" + description = "A default worker created in dev mode" + initial_upstreams = ["127.0.0.1"] + tags { + type = ["dev", "local"] + } + recording_storage_path = "%v" + recording_storage_minimum_available_capacity = "4kib" + } + `, td), + storagePath: td, + expectedDiskSpace: 4096, + }, + { + name: "storage path with raw byte value", + devWorkerProvidedConfiguration: fmt.Sprintf(` + listener "tcp" { + purpose = "proxy" + } + + worker { + name = "w_1234567890" + description = "A default worker created in dev mode" + initial_upstreams = ["127.0.0.1"] + tags { + type = ["dev", "local"] + } + recording_storage_path = "%v" + recording_storage_minimum_available_capacity = "4096" + } + `, td), + storagePath: td, + expectedDiskSpace: 4096, + }, + { + name: "storage path with invalid capacity input", + devWorkerProvidedConfiguration: fmt.Sprintf(` + listener "tcp" { + purpose = "proxy" + } + + worker { + name = "w_1234567890" + description = "A default worker created in dev mode" + initial_upstreams = ["127.0.0.1"] + tags { + type = ["dev", "local"] + } + recording_storage_path = "%v" + recording_storage_minimum_available_capacity = "gib" + } + `, td), + storagePath: td, + expectedErrMsg: "could not parse capacity from input", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + parsed, err := Parse(devConfig + tt.devWorkerProvidedConfiguration) + if tt.expectedErrMsg != "" { + require.Error(t, err) + assert.ErrorContains(t, err, tt.expectedErrMsg) + return + } + require.NoError(t, err) + assert.Equal(t, tt.storagePath, parsed.Worker.RecordingStoragePath) + assert.Equal(t, tt.expectedDiskSpace, parsed.Worker.RecordingStorageMinimumAvailableDiskSpace) + }) + } +} + func TestDevWorkerRecordingStoragePath(t *testing.T) { t.Parallel() td := t.TempDir() diff --git a/internal/daemon/worker/worker.go b/internal/daemon/worker/worker.go index cd2d421104..4150ba92aa 100644 --- a/internal/daemon/worker/worker.go +++ b/internal/daemon/worker/worker.go @@ -99,7 +99,13 @@ type recorderManager interface { // create its reverseConnReceiver var reverseConnReceiverFactory func() reverseConnReceiver -var recordingStorageFactory func(ctx context.Context, path string, plgClients map[string]plgpb.StoragePluginServiceClient, enableLoopback bool) (storage.RecordingStorage, error) +var recordingStorageFactory func( + ctx context.Context, + path string, + plgClients map[string]plgpb.StoragePluginServiceClient, + enableLoopback bool, + minimumAvailableDiskSpace uint64, +) (storage.RecordingStorage, error) var recorderManagerFactory func(*Worker) (recorderManager, error) @@ -268,7 +274,12 @@ func New(ctx context.Context, conf *Config) (*Worker, error) { } // passing in an empty context so that storage can finish syncing during an emergency shutdown or interrupt - s, err := recordingStorageFactory(context.Background(), w.conf.RawConfig.Worker.RecordingStoragePath, plgClients, enableStorageLoopback) + s, err := recordingStorageFactory( + context.Background(), + w.conf.RawConfig.Worker.RecordingStoragePath, + plgClients, enableStorageLoopback, + w.conf.RawConfig.Worker.RecordingStorageMinimumAvailableDiskSpace, + ) if err != nil { return nil, fmt.Errorf("error create recording storage: %w", err) } diff --git a/internal/storage/options.go b/internal/storage/options.go index 37b498feb3..bda75e322e 100644 --- a/internal/storage/options.go +++ b/internal/storage/options.go @@ -21,6 +21,11 @@ const ( // NoSync mode will result in Close not syncing the file to the storage // bucket. NoSync + + // minimumAvailableDiskSpace represents the minimum amount of available disk + // space a worker needs in the path defined by RecordingStoragePath for processing + // sessions with recording enabled. + minimumAvailableDiskSpace ) // AccessMode is use to determine the access mode a file is opened with. @@ -49,19 +54,21 @@ func GetOpts(opt ...Option) Options { func getDefaultOptions() Options { return Options{ - WithCloseSyncMode: Asynchronous, - WithFileAccessMode: ReadOnly, - WithCreateFile: false, - WithBuffer: 0, + WithCloseSyncMode: Asynchronous, + WithFileAccessMode: ReadOnly, + WithCreateFile: false, + WithBuffer: 0, + WithMinimumAvailableDiskSpace: DefaultMinimumAvailableDiskSpace, } } // Options are storage options. type Options struct { - WithCloseSyncMode SyncMode - WithFileAccessMode AccessMode - WithCreateFile bool - WithBuffer uint64 + WithCloseSyncMode SyncMode + WithFileAccessMode AccessMode + WithCreateFile bool + WithBuffer uint64 + WithMinimumAvailableDiskSpace uint64 } // Option is a storage option. @@ -99,3 +106,12 @@ func WithBuffer(b uint64) Option { o.WithBuffer = b } } + +// WithMinimumAvailableDiskSpace sets the minimum amount of +// available disk space a worker needs in the local path defined for +// processing sessions with recording enabled. +func WithMinimumAvailableDiskSpace(m uint64) Option { + return func(o *Options) { + o.WithMinimumAvailableDiskSpace = m + } +} diff --git a/internal/storage/options_test.go b/internal/storage/options_test.go index 675bb6a799..5eb5281ec9 100644 --- a/internal/storage/options_test.go +++ b/internal/storage/options_test.go @@ -71,4 +71,17 @@ func Test_getOpts(t *testing.T) { testOpts.WithBuffer = 4096 assert.Equal(opts, testOpts) }) + + t.Run("WithMinimumAvailableDiskSpace", func(t *testing.T) { + t.Parallel() + assert := assert.New(t) + testOpts := getDefaultOptions() + opts := GetOpts() + assert.Equal(testOpts, opts) + + testOpts = getDefaultOptions() + opts = GetOpts(WithMinimumAvailableDiskSpace(4096)) + testOpts.WithMinimumAvailableDiskSpace = 4096 + assert.Equal(opts, testOpts) + }) } diff --git a/internal/storage/storage.go b/internal/storage/storage.go index 48bde0a2f3..fa6237646f 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -13,6 +13,12 @@ import ( plgpb "github.com/hashicorp/boundary/sdk/pbs/plugin" ) +// DefaultMinimumAvailableDiskSpace is the default value a Boundary worker will use +// if the user does not configure the worker with a RecordingStorageMinimumAvailableCapacity +// value. This value is equivalent to 500MiB. This value is used to determine the worker's +// local storage state. +const DefaultMinimumAvailableDiskSpace = 500 * 1024 * 1024 + // RecordingStorage can be used to create an FS usable for session recording. type RecordingStorage interface { // NewSyncingFS returns an FS that will use local storage as a cache and sync files when they are closed. diff --git a/website/content/docs/configuration/session-recording/create-storage-bucket.mdx b/website/content/docs/configuration/session-recording/create-storage-bucket.mdx index f5dbca389f..ce59531d1d 100644 --- a/website/content/docs/configuration/session-recording/create-storage-bucket.mdx +++ b/website/content/docs/configuration/session-recording/create-storage-bucket.mdx @@ -149,20 +149,30 @@ and how long a BSR will be retained in the storage bucket. ### Boundary workers requirements [Session recording](/boundary/docs/configuration/session-recording) requires that you [configure at least one worker for local storage](/boundary/docs/configuration/session-recording/create-storage-bucket#boundary-workers-requirements). - - -You cannot use an HCP managed worker for the local storage. HCP Boundary users must configure a self-managed worker to enable session recording. - - The worker that you configure for storage must: - Have access to the AWS S3 storage bucket - Have an accessible directory defined by `recording_storage_path` for storing session recordings while they are in progress. On session closure, Boundary moves the local session recording to remote storage and deletes the local copy. - For HCP Boundary, refer to the [Self-managed worker configuration](/hcp/docs/boundary/self-managed-workers/session-recording) documentation. - For Boundary Enterprise, refer to the refer to the [worker configuration](/boundary/docs/configuration/worker/worker-configuration#session-recording) documentation. -- Have at least 1 MB of available disk space. +- Have available disk space defined by `recording_storage_minimum_available_capacity`. If you do not configure the minimum available storage capacity, Boundary uses the default value of 500MiB. - Run Darwin, Windows, or Linux. The following binaries are not supported for session recording: NetBSD, OpenBSD, Solaris. + +You cannot use an HCP managed worker for the local storage. HCP Boundary users must configure a self-managed worker to enable session recording. + + +Refer to the following example configuration: + +```hcl +worker { +auth_storage_path="/boundary/demo-worker-1" +initial_upstreams = ["10.0.0.1"] +recording_storage_path="/local/storage/directory" +recording_storage_minimum_available_capacity="500MB" +} +``` + Complete the following steps to create a storage bucket in Boundary for session recording: diff --git a/website/content/docs/configuration/worker/index.mdx b/website/content/docs/configuration/worker/index.mdx index fe5d353dfb..5fab781e8e 100644 --- a/website/content/docs/configuration/worker/index.mdx +++ b/website/content/docs/configuration/worker/index.mdx @@ -37,6 +37,9 @@ worker { # Local storage path required if session recording is enabled recording_storage_path = "tmp/boundary/" + # Minimum available disk space required in the local storage path if session recording is enabled + recording_storage_minimum_available_capacity = "500MB" + # Mutually exclusive with hcp_boundary_cluster_id initial_upstreams = [ "10.0.0.1", @@ -84,6 +87,16 @@ worker { Session recordings are stored in the local storage while they are in progress. When the session is complete, Boundary moves the local session recording to remote storage and deletes the local copy. +- `recording_storage_minimum_available_capacity` - A value measured in bytes that is used for defining the local storage state + of the worker. Boundary compares this value with the available local disk space found in the `recording_storage_path`. + The possible storage states based on the `recording_storage_minimum_available_capacity` are: + - Available - The worker is above the storage threshold and can proxy sessions that are enabled with session recording. + - Low storage - The worker is below the storage threshold. Existing sessions can continue without interruption, but new sessions that are enabled with session recording will not be proxied. + - Critically low storage - The worker is below half the storage threshold. Existing sessions that are enabled with session recording will be forcefully closed. + - Out of storage - The worker is out of local disk space. The worker is in a unrecoverable state. An administrator must intervene to remedy the issue. + - Not configured - The worker does not have a local storage path configured. + - Unknown - The default local storage state of a worker. This state indicates that the local storage state of a worker is not yet known. + - `tags` - A map of key-value pairs where values are an array of strings. Most commonly used for [filtering](/boundary/docs/concepts/filtering) targets a worker can proxy via [worker @@ -167,6 +180,9 @@ worker { # Local storage path required if session recording is enabled recording_storage_path = "tmp/boundary/" + # Minimum available disk space required in the local storage path if session recording is enabled + recording_storage_minimum_available_capacity = "500MB" + # Workers typically need to reach upstreams on :9201 initial_upstreams = [ "10.0.0.1",