Cassandra Connect Support (#5952)

* Init cassandra

* Init cassandra

* Initial Cassandra Command Support For cqlsh

* Add Make Gen

* adding cassandra test

* Cleanup cassandra docker setup

* Adding container wait

* Container restart

* Minor Fix

* Cleanup docker and test

* Adding final tests

* Removing comment

* Fixing cleanup (uncomment)

* Removing bad yaml

* Cleanup

* Cleanup

* Adressing PR review comments

* Adding test support

* fix spacing

* Adding changelog

* docs

* docs

* adding cassandra page

* nav

* Versbose commenting

* cleanup naming of file

* updating helper func to return errors

* doc change

* Removed an incorrect comment and now passing the config by reference.

* Comment cleanup

* Fix option warning in cassandra connect command (#5951)

* Add healthcheck to caller function as it's resposible for container health

* Resolve Comments

* Update Log Typo

---------

Co-authored-by: Bharath Gajjala <120367134+bgajjala8@users.noreply.github.com>
pull/5966/head
Ryan Derr 10 months ago committed by GitHub
parent 029aa361bd
commit 4d3cb76c28
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -12,6 +12,10 @@ Canonical reference for changes, improvements, and bugfixes for Boundary.
connection parameters and credentials.
* Adds support to parse User-Agent headers and emit them in telemetry events
([PR](https://github.com/hashicorp/boundary/pull/5645)).
* cli: Added `boundary connect cassandra` command for connecting to Cassandra targets.
This new helper command allows users to authorize sessions against Cassandra
targets and automatically invoke a Cassandra client with the appropriate
connection parameters and credentials. Currently only username/password credentials are automatically attached.
### Deprecations/Changes

@ -15,7 +15,26 @@ apt update
# lsb-release is used for adding the hashicorp apt source
# postgresql-client is used for postgres tests
# default-mysql-client is used for mysql tests
apt install unzip pass lsb-release postgresql-client default-mysql-client -y
# wget is used for downloading external dependencies and repository keys
# apt-transport-https enables HTTPS transport for APT repositories
apt install unzip pass lsb-release postgresql-client default-mysql-client wget apt-transport-https -y
# Function to install Cassandra
install_cassandra() {
# Add Cassandra repository key
wget -q -O - https://www.apache.org/dist/cassandra/KEYS | apt-key add -
# Add Cassandra repository
echo "deb https://debian.cassandra.apache.org 41x main" | tee -a /etc/apt/sources.list.d/cassandra.sources.list
# Update package list and install Cassandra
apt update
apt install cassandra -y
}
# Install Cassandra
install_cassandra
# Create a GPG key
export KEY_PW=boundary

@ -417,6 +417,12 @@ func initCommands(ui, serverCmdUi cli.Ui, runOpts *RunOptions) {
Func: "mysql",
}
}),
"connect cassandra": wrapper.Wrap(func() wrapper.WrappableCommand {
return &connect.Command{
Command: base.NewCommand(ui, opts...),
Func: "cassandra",
}
}),
"connect rdp": wrapper.Wrap(func() wrapper.WrappableCommand {
return &connect.Command{
Command: base.NewCommand(ui, opts...),

@ -0,0 +1,117 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package connect
import (
"fmt"
"os"
"strings"
"github.com/hashicorp/boundary/api/proxy"
"github.com/hashicorp/boundary/internal/cmd/base"
"github.com/posener/complete"
)
const (
cassandraSynopsis = "Authorize a session against a target and invoke a Cassandra client to connect"
)
func cassandraOptions(c *Command, set *base.FlagSets) {
f := set.NewFlagSet("Cassandra Options")
f.StringVar(&base.StringVar{
Name: "style",
Target: &c.flagCassandraStyle,
EnvVar: "BOUNDARY_CONNECT_CASSANDRA_STYLE",
Completion: complete.PredictSet("cqlsh"),
Default: "cqlsh",
Usage: `Specifies how the CLI will attempt to invoke a Cassandra client. This will also set a suitable default for -exec if a value was not specified. Currently-understood values are "cqlsh".`,
})
f.StringVar(&base.StringVar{
Name: "username",
Target: &c.flagUsername,
EnvVar: "BOUNDARY_CONNECT_USERNAME",
Completion: complete.PredictNothing,
Usage: `Specifies the username to pass through to the client. May be overridden by credentials sourced from a credential store.`,
})
f.StringVar(&base.StringVar{
Name: "keyspace",
Target: &c.flagDbname,
EnvVar: "BOUNDARY_CONNECT_KEYSPACE",
Completion: complete.PredictNothing,
Usage: `Specifies the keyspace name to pass through to the client.`,
})
}
type cassandraFlags struct {
flagCassandraStyle string
}
func (m *cassandraFlags) defaultExec() string {
return strings.ToLower(m.flagCassandraStyle)
}
func (m *cassandraFlags) buildArgs(c *Command, port, ip, _ string, creds proxy.Credentials) (args, envs []string, retCreds proxy.Credentials, retErr error) {
var username, password string
retCreds = creds
if len(retCreds.UsernamePassword) > 0 {
// Mark credential as consumed, such that it is not printed to the user
retCreds.UsernamePassword[0].Consumed = true
// Grab the first available username/password credential brokered
username = retCreds.UsernamePassword[0].Username
password = retCreds.UsernamePassword[0].Password
}
switch m.flagCassandraStyle {
case "cqlsh":
switch {
case username != "":
args = append(args, "-u", username)
case c.flagUsername != "":
args = append(args, "-u", c.flagUsername)
}
if c.flagDbname != "" {
args = append(args, "-k", c.flagDbname)
} else {
c.UI.Warn("Credentials are being brokered but no -keyspace parameter provided. cqlsh may misinterpret another parameter as the keyspace name.")
}
if password != "" {
passfile, err := os.CreateTemp("", "*")
if err != nil {
return nil, nil, proxy.Credentials{}, fmt.Errorf("Error saving cassandra password to tmp file: %w", err)
}
c.cleanupFuncs = append(c.cleanupFuncs, func() error {
if err := os.Remove(passfile.Name()); err != nil {
return fmt.Errorf("Error removing temporary password file; consider removing %s manually: %w", passfile.Name(), err)
}
return nil
})
_, err = passfile.WriteString("[PlainTextAuthProvider]\npassword = " + password)
if err != nil {
passfile.Close()
return nil, nil, proxy.Credentials{}, fmt.Errorf("Error writing password file to %s: %w", passfile.Name(), err)
}
if err := passfile.Close(); err != nil {
return nil, nil, proxy.Credentials{}, fmt.Errorf("Error closing password file after writing to %s: %w", passfile.Name(), err)
}
args = append(args, "--credentials", passfile.Name())
}
args = append(args, ip)
if port != "" {
args = append(args, port)
}
}
return
}

@ -80,6 +80,9 @@ type Command struct {
// MySQL
mysqlFlags
// Cassandra
cassandraFlags
// RDP
rdpFlags
@ -108,6 +111,8 @@ func (c *Command) Synopsis() string {
return postgresSynopsis
case "mysql":
return mysqlSynopsis
case "cassandra":
return cassandraSynopsis
case "rdp":
return rdpSynopsis
case "ssh":
@ -230,6 +235,9 @@ func (c *Command) Flags() *base.FlagSets {
case "mysql":
mysqlOptions(c, set)
case "cassandra":
cassandraOptions(c, set)
case "rdp":
rdpOptions(c, set)
@ -319,6 +327,8 @@ func (c *Command) Run(args []string) (retCode int) {
c.flagExec = c.postgresFlags.defaultExec()
case "mysql":
c.flagExec = c.mysqlFlags.defaultExec()
case "cassandra":
c.flagExec = c.cassandraFlags.defaultExec()
case "rdp":
c.flagExec = c.rdpFlags.defaultExec()
case "kube":
@ -673,6 +683,16 @@ func (c *Command) handleExec(clientProxy *apiproxy.ClientProxy, passthroughArgs
envs = append(envs, mysqlEnvs...)
creds = mysqlCreds
case "cassandra":
cassandraArgs, cassandraEnvs, cassandraCreds, cassandraErr := c.cassandraFlags.buildArgs(c, port, host, addr, creds)
if cassandraErr != nil {
argsErr = cassandraErr
break
}
args = append(args, cassandraArgs...)
envs = append(envs, cassandraEnvs...)
creds = cassandraCreds
case "rdp":
args = append(args, c.rdpFlags.buildArgs(c, port, host, addr)...)

@ -8,6 +8,7 @@ import (
"encoding/json"
"fmt"
"os"
"os/exec"
"path"
"path/filepath"
"testing"
@ -26,6 +27,14 @@ type Container struct {
UriNetwork string
}
// cassandraConfig stores configuration details for the Cassandra container
type cassandraConfig struct {
User string
Password string
Keyspace string
NetworkAlias string
}
// StartBoundaryDatabase spins up a postgres database in a docker container.
// Returns information about the container
func StartBoundaryDatabase(t testing.TB, pool *dockertest.Pool, network *dockertest.Network, repository, tag string) *Container {
@ -344,3 +353,129 @@ func StartMysql(t testing.TB, pool *dockertest.Pool, network *dockertest.Network
UriNetwork: fmt.Sprintf("mysql://%s:%s@%s:3306/%s", mysqlUser, mysqlPassword, networkAlias, mysqlDb),
}
}
// StartCassandra starts a Cassandra database in a docker container.
// Returns information about the container
func StartCassandra(t testing.TB, pool *dockertest.Pool, network *dockertest.Network, repository, tag string) *Container {
t.Log("Starting Cassandra database...")
c, err := LoadConfig()
require.NoError(t, err)
err = pool.Client.PullImage(docker.PullImageOptions{
Repository: fmt.Sprintf("%s/%s", c.DockerMirror, repository),
Tag: tag,
}, docker.AuthConfiguration{})
require.NoError(t, err)
config := cassandraConfig{
User: "e2eboundary",
Password: "e2eboundary",
Keyspace: "e2eboundarykeyspace",
NetworkAlias: "e2ecassandra",
}
resource, err := pool.RunWithOptions(&dockertest.RunOptions{
Repository: fmt.Sprintf("%s/%s", c.DockerMirror, repository),
Tag: tag,
Env: []string{
"CASSANDRA_CLUSTER_NAME=e2e-boundary-cluster",
},
ExposedPorts: []string{"9042/tcp"},
Name: config.NetworkAlias,
Networks: []*dockertest.Network{network},
})
require.NoError(t, err)
// Cassandra container takes a while to start due to the gossip protocol needing to settle and establish connections.
// This relies on the pool's extended maxWait time to ensure the container is healthy.
err = pool.Retry(func() error {
cmd := exec.Command("docker", "exec", config.NetworkAlias, "cqlsh", "-e", "SELECT now() FROM system.local;")
output, cmdErr := cmd.CombinedOutput()
if cmdErr != nil {
return fmt.Errorf("failed to connect to Cassandra container '%s': %v\nOutput: %s", config.NetworkAlias, cmdErr, string(output))
}
return nil
})
require.NoError(t, err, "Cassandra container did not start in time or is not healthy")
err = setupCassandraAuthAndUser(t, resource, pool, &config)
require.NoError(t, err)
return &Container{
Resource: resource,
UriLocalhost: fmt.Sprintf(
"cassandra://%s:%s@%s/%s",
config.User,
config.Password,
resource.GetHostPort("9042/tcp"),
config.Keyspace,
),
UriNetwork: fmt.Sprintf(
"cassandra://%s:%s@%s:9042/%s",
config.User,
config.Password,
config.NetworkAlias,
config.Keyspace,
),
}
}
// setupCassandraAuthAndUser enables authentication on a Cassandra container and creates a user with permissions.
func setupCassandraAuthAndUser(t testing.TB, resource *dockertest.Resource, pool *dockertest.Pool, config *cassandraConfig) error {
t.Helper()
t.Log("Configuring Cassandra authentication and user permissions...")
t.Logf("Initializing Cassandra keyspace: %s...", config.Keyspace)
createKeyspaceCmd := fmt.Sprintf(
"CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};",
config.Keyspace,
)
if err := exec.Command("docker", "exec", config.NetworkAlias, "cqlsh", "-e", createKeyspaceCmd).Run(); err != nil {
return err
}
// Commands to enable authentication and authorization by editing cassandra.yaml
sedCmd := []string{
"sed", "-i",
"-e", "s/^authenticator:.*/authenticator: PasswordAuthenticator/",
"-e", "s/^authorizer:.*/authorizer: CassandraAuthorizer/",
"-e", "s/^role_manager:.*/role_manager: CassandraRoleManager/",
"/etc/cassandra/cassandra.yaml",
}
if _, err := resource.Exec(sedCmd, dockertest.ExecOptions{}); err != nil {
return err
}
if err := pool.Client.RestartContainer(resource.Container.ID, uint(pool.MaxWait.Seconds())); err != nil {
return err
}
t.Log("Waiting for Cassandra container to restart and apply authentication settings...")
// Wait for Cassandra to be up with authentication enabled
if err := pool.Retry(func() error {
return exec.Command(
"docker", "exec", config.NetworkAlias,
"cqlsh", "-u", "cassandra", "-p", "cassandra",
"-e", "SELECT now() FROM system.local;",
).Run()
}); err != nil {
return err
}
t.Log("Creating Cassandra user and granting permissions...")
cqlCmds := []string{
fmt.Sprintf("CREATE ROLE IF NOT EXISTS %s WITH PASSWORD = '%s' AND LOGIN = true;", config.User, config.Password),
fmt.Sprintf("GRANT ALL PERMISSIONS ON KEYSPACE %s TO %s;", config.Keyspace, config.User),
fmt.Sprintf("USE %s; CREATE TABLE IF NOT EXISTS users (id UUID PRIMARY KEY, name TEXT, created_at TIMESTAMP);", config.Keyspace),
}
for _, cmd := range cqlCmds {
if err := exec.Command(
"docker", "exec", config.NetworkAlias,
"cqlsh", "-u", "cassandra", "-p", "cassandra", "-e", cmd,
).Run(); err != nil {
return err
}
}
return nil
}

@ -0,0 +1,127 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package base_test
import (
"bytes"
"context"
"fmt"
"io"
"net/url"
"os/exec"
"testing"
"time"
"github.com/creack/pty"
"github.com/hashicorp/boundary/internal/target"
"github.com/hashicorp/boundary/testing/internal/e2e"
"github.com/hashicorp/boundary/testing/internal/e2e/boundary"
"github.com/hashicorp/boundary/testing/internal/e2e/infra"
"github.com/ory/dockertest/v3"
"github.com/stretchr/testify/require"
)
// TestCliTcpTargetConnectCassandra uses the boundary cli to connect to a
// target using `connect cassandra`
func TestCliTcpTargetConnectCassandra(t *testing.T) {
e2e.MaybeSkipTest(t)
pool, err := dockertest.NewPool("")
require.NoError(t, err)
// Increase timeout to accommodate Cassandra's longer startup duration due to gossip needing to settle
pool.MaxWait = 90 * time.Second
ctx := context.Background()
// e2e_cluster network is created by the e2e infra setup
network, err := pool.NetworksByName("e2e_cluster")
require.NoError(t, err, "Failed to get e2e_cluster network")
c := infra.StartCassandra(t, pool, &network[0], "cassandra", "latest")
require.NotNil(t, c, "Cassandra container should not be nil")
t.Cleanup(func() {
if err := pool.Purge(c.Resource); err != nil {
t.Logf("Failed to purge Cassandra container: %v", err)
}
})
u, err := url.Parse(c.UriNetwork)
t.Log(u)
require.NoError(t, err, "Failed to parse Cassandra URL")
user, hostname, port, keyspace := u.User.Username(), u.Hostname(), u.Port(), u.Path[1:]
pw, pwSet := u.User.Password()
t.Logf("Cassandra info: user=%s, keyspace=%s, host=%s, port=%s, password-set:%t",
user, keyspace, hostname, port, pwSet)
boundary.AuthenticateAdminCli(t, ctx)
orgId, err := boundary.CreateOrgCli(t, ctx)
require.NoError(t, err)
t.Cleanup(func() {
ctx := context.Background()
boundary.AuthenticateAdminCli(t, ctx)
output := e2e.RunCommand(ctx, "boundary", e2e.WithArgs("scopes", "delete", "-id", orgId))
require.NoError(t, output.Err, string(output.Stderr))
})
projectId, err := boundary.CreateProjectCli(t, ctx, orgId)
require.NoError(t, err)
targetId, err := boundary.CreateTargetCli(
t,
ctx,
projectId,
port,
target.WithAddress(hostname),
)
require.NoError(t, err)
storeId, err := boundary.CreateCredentialStoreStaticCli(t, ctx, projectId)
require.NoError(t, err)
credentialId, err := boundary.CreateStaticCredentialPasswordCli(
t,
ctx,
storeId,
user,
pw,
)
require.NoError(t, err)
err = boundary.AddBrokeredCredentialSourceToTargetCli(t, ctx, targetId, credentialId)
require.NoError(t, err)
cmd := exec.CommandContext(ctx,
"boundary",
"connect", "cassandra",
"-target-id", targetId,
"-keyspace", keyspace,
)
f, err := pty.Start(cmd)
require.NoError(t, err)
t.Cleanup(func() {
err := f.Close()
require.NoError(t, err)
})
_, err = f.Write([]byte("DESCRIBE KEYSPACES;\n"))
require.NoError(t, err)
_, err = f.Write([]byte(fmt.Sprintf("SELECT keyspace_name FROM system_schema.keyspaces WHERE keyspace_name = '%s';\n", keyspace)))
require.NoError(t, err)
_, err = f.Write([]byte("exit\n"))
require.NoError(t, err)
var buf bytes.Buffer
_, _ = io.Copy(&buf, f)
output := buf.String()
t.Logf("Cassandra session output: %s", output)
require.Contains(t, output, "keyspace_name")
require.Contains(t, output, keyspace)
t.Log("Successfully connected to Cassandra target")
}

@ -0,0 +1,69 @@
---
layout: docs
page_title: connect cassandra - Command
description: >-
The "connect cassandra" command performs a target authorization or consumes an existing authorization token, and then launches a proxied Cassandra (CQL) connection.
---
# connect cassandra
Command: `boundary connect cassandra`
The `connect cassandra` command authorizes a session against a target and invokes a Cassandra client for the connection.
The command fills in the local address and port.
> **Note:** Currently, only configurations using Cassandra's `PasswordAuthenticator` are supported.
@include 'cmd-connect-env-vars.mdx'
## Examples
The following example shows how to connect to a target with the ID `ttcp_eTcZMueUYv` using a Cassandra helper:
```shell-session
$ boundary connect cassandra -target-id=ttcp_eTcZMueUYv \
-keyspace=mykeyspace \
-username=superuser
```
When prompted, you must enter the password for the user, "superuser":
<CodeBlockConfig hideClipboard>
```plaintext
Password:
Connected to example-cluster at localhost:9042
[cqlsh 6.2.0 | Cassandra 5.0.4 | CQL spec 3.4.7 | Native protocol v5]
Use HELP for help.
superuser@cqlsh:mykeyspace>
```
</CodeBlockConfig>
## Usage
<CodeBlockConfig hideClipboard>
```shell-session
$ boundary connect cassandra [options] [args]
```
</CodeBlockConfig>
@include 'cmd-connect-command-options.mdx'
### Cassandra options:
- `-keyspace` `(string: "")` - The keyspace name you want to pass through to the client.
You can also specify the keyspace name using the **BOUNDARY_CONNECT_KEYSPACE** environment variable.
- `-style` `(string: "")` - How the CLI attempts to invoke a Cassandra client.
This value also sets a suitable default for `-exec`, if you did not specify a value.
The default and currently-understood value is `cqlsh`.
You can also specify how the CLI attempts to invoke a Cassandra client using the **BOUNDARY_CONNECT_CASSANDRA_STYLE** environment variable.
- `-username` `(string: "")` - The username you want to pass through to the client.
This value may be overridden by credentials sourced from a credential store.
You can also specify a username using the **BOUNDARY_CONNECT_USERNAME** environment variable.
@include 'cmd-option-note.mdx'

@ -41,6 +41,7 @@ Usage: boundary connect <subcommand> [options] [args]
Subcommands:
http Authorize a session against a target and invoke an HTTP client to connect
kube Authorize a session against a target and invoke a Kubernetes client to connect
cassandra Authorize a session against a target and invoke a Cassandra client to connect
mysql Authorize a session against a target and invoke a MySQL client to connect
postgres Authorize a session against a target and invoke a Postgres client to connect
rdp Authorize a session against a target and invoke an RDP client to connect
@ -54,6 +55,7 @@ of the subcommand in the sidebar or one of the links below:
- [http](/boundary/docs/commands/connect/http)
- [kube](/boundary/docs/commands/connect/kube)
- [cassandra](/boundary/docs/commands/connect/cassandra)
- [mysql](/boundary/docs/commands/connect/mysql)
- [postgres](/boundary/docs/commands/connect/postgres)
- [rdp](/boundary/docs/commands/connect/rdp)

@ -1007,6 +1007,10 @@
"title": "mysql",
"path": "commands/connect/mysql"
},
{
"title": "cassandra",
"path": "commands/connect/cassandra"
},
{
"title": "postgres",
"path": "commands/connect/postgres"

Loading…
Cancel
Save