From 23156afa116f61560cd0c6762f7d1526d0204924 Mon Sep 17 00:00:00 2001 From: Jeff Mitchell Date: Thu, 3 Sep 2020 13:24:34 -0400 Subject: [PATCH] Add in most of the proxy flow (#326) --- ci/goinstall.sh | 2 +- globals/globals.go | 1 + go.mod | 5 +- go.sum | 46 +- internal/cmd/base/listener.go | 2 +- internal/cmd/commands.go | 6 + .../cmd/commands/controller/controller.go | 2 +- internal/cmd/commands/dev/dev.go | 2 +- internal/cmd/commands/proxy/proxy.go | 273 +++++++++++ internal/cmd/commands/worker/worker.go | 2 +- internal/cmd/config/config.go | 2 +- internal/cmd/config/config_test.go | 2 +- internal/gen/controller.swagger.json | 57 +++ .../api/services/worker_service.pb.go | 425 ++++++++++++++++-- internal/kms/const.go | 2 +- .../api/services/v1/worker_service.proto | 44 ++ internal/servers/controller/controller.go | 2 + internal/servers/controller/handler.go | 117 ++++- .../handlers/workers/worker_service.go | 74 ++- internal/servers/controller/listeners.go | 6 +- .../servers/controller/worker_tls_config.go | 3 +- internal/servers/repository.go | 15 +- .../servers/worker/controller_connection.go | 28 +- internal/servers/worker/handler.go | 67 ++- internal/servers/worker/job.go | 84 ++++ internal/servers/worker/listeners.go | 104 ++++- internal/servers/worker/status.go | 74 +-- internal/servers/worker/tcp_proxy.go | 39 ++ internal/servers/worker/testing.go | 17 +- internal/servers/worker/worker.go | 12 +- internal/sessions/util.go | 37 ++ 31 files changed, 1370 insertions(+), 182 deletions(-) create mode 100644 internal/cmd/commands/proxy/proxy.go create mode 100644 internal/servers/worker/job.go create mode 100644 internal/servers/worker/tcp_proxy.go create mode 100644 internal/sessions/util.go diff --git a/ci/goinstall.sh b/ci/goinstall.sh index 3fe52f0693..b2146ae1cb 100755 --- a/ci/goinstall.sh +++ b/ci/goinstall.sh @@ -1,7 +1,7 @@ #!/bin/bash set -e -VERSION="1.14" +VERSION="1.15.1" [ -z "$GOROOT" ] && GOROOT="$HOME/.go" [ -z "$GOPATH" ] && GOPATH="$HOME/go" diff --git a/globals/globals.go b/globals/globals.go index 42dce34306..2c949d2ca8 100644 --- a/globals/globals.go +++ b/globals/globals.go @@ -10,6 +10,7 @@ import "time" // only ever be set at startup, but simply available to reference from anywhere. const ( + TcpProxyV1 = "boundary-tcp-proxy-v1" TokenEncryptionVersion = "1" ) diff --git a/go.mod b/go.mod index b5a41252d6..ffc6aa58e6 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/hashicorp/boundary -go 1.13 +go 1.15 require ( github.com/armon/go-metrics v0.3.4 @@ -19,7 +19,7 @@ require ( github.com/grpc-ecosystem/grpc-gateway/v2 v2.0.0-beta.4.0.20200813083929-9bdca3d79b97 github.com/hashicorp/dbassert v0.0.0-20200602142727-a6709b436ad6 github.com/hashicorp/errwrap v1.0.0 - github.com/hashicorp/go-alpnmux v0.0.0-20200513011953-0293f5d23c31 + github.com/hashicorp/go-alpnmux v0.0.0-20200822022353-14527cfc58e5 github.com/hashicorp/go-cleanhttp v0.5.1 github.com/hashicorp/go-hclog v0.14.1 github.com/hashicorp/go-kms-wrapping v0.5.14 @@ -59,4 +59,5 @@ require ( google.golang.org/genproto v0.0.0-20200813001606-1ccf2a5ae4fd google.golang.org/grpc v1.31.0 google.golang.org/protobuf v1.25.0 + nhooyr.io/websocket v1.8.6 ) diff --git a/go.sum b/go.sum index 75c22d65e7..0271f5eac4 100644 --- a/go.sum +++ b/go.sum @@ -117,7 +117,6 @@ github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= github.com/armon/go-metrics v0.3.0/go.mod h1:zXjbSimjXTd7vOpY8B0/2LpvNvDoXBuplAD+gJD3GYs= -github.com/armon/go-metrics v0.3.3 h1:a9F4rlj7EWWrbj7BYw8J8+x+ZZkJeqzNyRk8hdPF+ro= github.com/armon/go-metrics v0.3.3/go.mod h1:4O98XIr/9W0sxpJ8UaYkvjk10Iff7SnFrb4QAOwNTFc= github.com/armon/go-metrics v0.3.4 h1:Xqf+7f2Vhl9tsqDYmXhnXInUdcrtgpRNpIA15/uldSc= github.com/armon/go-metrics v0.3.4/go.mod h1:4O98XIr/9W0sxpJ8UaYkvjk10Iff7SnFrb4QAOwNTFc= @@ -149,7 +148,6 @@ github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kB github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k= github.com/bkaradzic/go-lz4 v1.0.0/go.mod h1:0YdlkowM3VswSROI7qDxhRvJ3sLhlFrRRwjwegp5jy4= github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84= -github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= github.com/btcsuite/btcd v0.20.1-beta/go.mod h1:wVuoA8VJLEcwgqHBwHmzLRazpKxTv13Px/pDuV7OomQ= github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA= @@ -281,6 +279,10 @@ github.com/fsouza/fake-gcs-server v1.17.0/go.mod h1:D1rTE4YCyHFNa99oyJJ5HyclvN/0 github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32 h1:Mn26/9ZMNWSw9C9ERFA1PUxfmGpolnw2v0bKOREu5ew= github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32/go.mod h1:GIjDIg/heH5DOkXY3YJ/wNhfHsQHoXGjl8G8amsYQ1I= +github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= +github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= +github.com/gin-gonic/gin v1.6.3 h1:ahKqKTFpO5KTPHxWZjEdPScmYaGtLo8Y4DMHoEsnp14= +github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M= github.com/globalsign/mgo v0.0.0-20180905125535-1ca0a4f7cbcb/go.mod h1:xkRDCp4j0OGD1HRkm4kmhM+pmpv3AKq5SU7GMg4oO/Q= github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8/go.mod h1:xkRDCp4j0OGD1HRkm4kmhM+pmpv3AKq5SU7GMg4oO/Q= github.com/go-asn1-ber/asn1-ber v1.3.1/go.mod h1:hEBeB/ic+5LoWskz+yKT7vGhhPYkProFKoKdwZRWMe0= @@ -365,6 +367,14 @@ github.com/go-openapi/validate v0.19.2/go.mod h1:1tRCw7m3jtI8eNWEEliiAqUIcBztB2K github.com/go-openapi/validate v0.19.3/go.mod h1:90Vh6jjkTn+OT1Eefm0ZixWNFjhtOH7vS9k0lo6zwJo= github.com/go-openapi/validate v0.19.10 h1:tG3SZ5DC5KF4cyt7nqLVcQXGj5A7mpaYkAcNPlDK+Yk= github.com/go-openapi/validate v0.19.10/go.mod h1:RKEZTUWDkxKQxN2jDT7ZnZi2bhZlbNMAuKvKB+IaGx8= +github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBYNjji3q3A= +github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= +github.com/go-playground/locales v0.13.0 h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q= +github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8= +github.com/go-playground/universal-translator v0.17.0 h1:icxd5fm+REJzpZx7ZfpaD876Lmtgy7VtROAbHHXk8no= +github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA= +github.com/go-playground/validator/v10 v10.2.0 h1:KgJ0snyC2R9VXYN2rneOtQcw5aHQB1Vv0sFl1UcHBOY= +github.com/go-playground/validator/v10 v10.2.0/go.mod h1:uOYAAleCW8F/7oMFd6aG0GOhaH6EGOAJShg8Id5JGkI= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs= @@ -403,6 +413,12 @@ github.com/gobuffalo/packd v0.1.0/go.mod h1:M2Juc+hhDXf/PnmBANFCqx4DM3wRbgDvnVWe github.com/gobuffalo/packr/v2 v2.0.9/go.mod h1:emmyGweYTm6Kdper+iywB6YK5YzuKchGtJQZ0Odn4pQ= github.com/gobuffalo/packr/v2 v2.2.0/go.mod h1:CaAwI0GPIAv+5wKLtv8Afwl+Cm78K/I/VCm/3ptBN+0= github.com/gobuffalo/syncx v0.0.0-20190224160051-33c29581e754/go.mod h1:HhnNqWY95UYwwW3uSASeV7vtgYkT2t16hJgV3AEPUpw= +github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee h1:s+21KNqlpePfkah2I+gwHF8xmJWRjooY+5248k6m4A0= +github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo= +github.com/gobwas/pool v0.2.0 h1:QEmUOlnSjWtnpRGHF3SauEiOsy82Cup83Vf2LcMlnc8= +github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw= +github.com/gobwas/ws v1.0.2 h1:CoAavW/wd/kulfZmSIBt6p24n4j7tHgNVCjsfHVNUbo= +github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM= github.com/gocql/gocql v0.0.0-20190301043612-f6df8288f9b4/go.mod h1:4Fw1eo5iaEhDUs8XyuhSVCVy52Jq3L+/3GJgYkwc+/0= github.com/godbus/dbus v0.0.0-20190422162347-ade71ed3457e/go.mod h1:bBOAhwG1umN6/6ZUMtDFBMQR8jRg9O75tm9K00oMsK4= github.com/godbus/dbus v4.1.0+incompatible h1:WqqLRTsQic3apZUK9qC5sGNfXthmPXzUZ7nQPrNITa4= @@ -499,11 +515,12 @@ github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB7 github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= github.com/gorilla/sessions v1.2.0/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= +github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= -github.com/grpc-ecosystem/grpc-gateway v1.9.0 h1:bM6ZAFZmc/wPFaRDi0d5L7hGEZEx/2u+Tmr2evNHDiI= github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= github.com/grpc-ecosystem/grpc-gateway v1.9.5 h1:UImYN5qQ8tuGpGE16ZmjvcTtTw24zw1QAp/SlnNrZhI= github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= @@ -518,8 +535,8 @@ github.com/hashicorp/dbassert v0.0.0-20200602142727-a6709b436ad6 h1:Nejn3Kli0VXW github.com/hashicorp/dbassert v0.0.0-20200602142727-a6709b436ad6/go.mod h1:+B5eZq7vXqN4Gxkgjm4ub1bwG6OTcYzFL0r+bMvWorg= github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= -github.com/hashicorp/go-alpnmux v0.0.0-20200513011953-0293f5d23c31 h1:pxqI71/0R1WIASjQEJ9W9skCKYiREEkRoXFvHCZH1pg= -github.com/hashicorp/go-alpnmux v0.0.0-20200513011953-0293f5d23c31/go.mod h1:KvpteZzIafT4tRAuQ9vVRBgZyqeVCS2B2177fNAyEZc= +github.com/hashicorp/go-alpnmux v0.0.0-20200822022353-14527cfc58e5 h1:2YnwbJbobkTULUOhJ77seiSGqd7tD3V7zFXn7u5666k= +github.com/hashicorp/go-alpnmux v0.0.0-20200822022353-14527cfc58e5/go.mod h1:KvpteZzIafT4tRAuQ9vVRBgZyqeVCS2B2177fNAyEZc= github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= github.com/hashicorp/go-cleanhttp v0.5.1 h1:dH3aiDG9Jvb5r5+bYHsikaOUIpcM0xvgMXVoDkXMzJM= github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= @@ -533,7 +550,6 @@ github.com/hashicorp/go-hclog v0.14.1/go.mod h1:whpDNt7SSdeAju8AWKIWsul05p54N/39 github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= github.com/hashicorp/go-immutable-radix v1.1.0 h1:vN9wG1D6KG6YHRTWr8512cxGOVgTMEfgEdSj/hr8MPc= github.com/hashicorp/go-immutable-radix v1.1.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= -github.com/hashicorp/go-kms-wrapping v0.5.12 h1:4zZCaLqOob5moaAmpS6ZtGZYm4yOcqvmt2lO+zNXHls= github.com/hashicorp/go-kms-wrapping v0.5.12/go.mod h1:yVIWtGOTh/cdGc++/NOlXLus0hJ19Lz4iFrpF6WsZh4= github.com/hashicorp/go-kms-wrapping v0.5.14 h1:XtNZYE9n4ypz9CnhQc/5Vd6ovGqbHO1lQ3IVPc2TV+w= github.com/hashicorp/go-kms-wrapping v0.5.14/go.mod h1:hKJ7tS+eMXOLxwFs7mYJtPRQtT/rGtwqE6awY3JATCw= @@ -547,7 +563,6 @@ github.com/hashicorp/go-plugin v1.2.0 h1:CUfYokW0EJNDcGecVrHZK//Cp1GFlHwoqtcUIEi github.com/hashicorp/go-plugin v1.2.0/go.mod h1:F9eH4LrE/ZsRdbwhfjs9k9HoDUwAHnYtXdgmf1AVNs0= github.com/hashicorp/go-retryablehttp v0.5.3/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs= github.com/hashicorp/go-retryablehttp v0.6.2/go.mod h1:gEx6HMUGxYYhJScX7W1Il64m6cc2C1mDaW3NQ9sY1FY= -github.com/hashicorp/go-retryablehttp v0.6.6 h1:HJunrbHTDDbBb/ay4kxa1n+dLmttUlnP3V9oNE4hmsM= github.com/hashicorp/go-retryablehttp v0.6.6/go.mod h1:vAew36LZh98gCBJNLH42IQ1ER/9wtLZZ8meHqQvEYWY= github.com/hashicorp/go-retryablehttp v0.6.7 h1:8/CAEZt/+F7kR7GevNHulKkUjLht3CPmn7egmhieNKo= github.com/hashicorp/go-retryablehttp v0.6.7/go.mod h1:vAew36LZh98gCBJNLH42IQ1ER/9wtLZZ8meHqQvEYWY= @@ -581,7 +596,6 @@ github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/J github.com/hashicorp/shared-secure-libs v0.0.2 h1:+izKlB8vxpBK+K6zNmB4DZtDEl1a7fB8JmqdBNhnhjs= github.com/hashicorp/shared-secure-libs v0.0.2/go.mod h1:xhtA0FH6AYYFOy0sir7u0O0zzdLi7ofU6oWxy+gjnuc= github.com/hashicorp/vault/api v1.0.5-0.20200514164214-89b1987e38c2/go.mod h1:P8A7gn1a6j/7qQ4zOaWI6FC/Q9jQWEBRTOwZH33tC9o= -github.com/hashicorp/vault/api v1.0.5-0.20200519221902-385fac77e20f h1:PYtnlUZzFSZxPcq7mYp5oC9N+BcJ8IKYf6/EG0GHM2Y= github.com/hashicorp/vault/api v1.0.5-0.20200519221902-385fac77e20f/go.mod h1:euTFbi2YJgwcju3imEt919lhJKF68nN1cQPq3aA+kBE= github.com/hashicorp/vault/api v1.0.5-0.20200805123347-1ef507638af6 h1:ChHpobQse3kjVX7GmbpUFVqaMTf4lb1gcLrJltsUmS4= github.com/hashicorp/vault/api v1.0.5-0.20200805123347-1ef507638af6/go.mod h1:R3Umvhlxi2TN7Ex2hzOowyeNb+SfbVWI973N+ctaFMk= @@ -600,7 +614,6 @@ github.com/hinshun/vt10x v0.0.0-20180616224451-1954e6464174/go.mod h1:DqJ97dSdRW github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/huaweicloud/golangsdk v0.0.0-20200304081349-45ec0797f2a4/go.mod h1:WQBcHRNX9shz3928lWEvstQJtAtYI7ks6XlgtRT9Tcw= github.com/hudl/fargo v1.3.0/go.mod h1:y3CKSmjA+wD2gak7sUSXTAoopbhU08POFhmITJgmKTg= -github.com/iancoleman/strcase v0.0.0-20180726023541-3605ed457bf7 h1:ux/56T2xqZO/3cP1I2F86qpeoYPCOzk+KF/UH/Ar+lk= github.com/iancoleman/strcase v0.0.0-20180726023541-3605ed457bf7/go.mod h1:SK73tn/9oHe+/Y0h39VT4UCxmurVJkR5NA7kMEAOgSE= github.com/iancoleman/strcase v0.1.0 h1:Lar8rut26AXkJUmVOb2bRsFGv//+tJBeJLxXvpZpF1Q= github.com/iancoleman/strcase v0.1.0/go.mod h1:SK73tn/9oHe+/Y0h39VT4UCxmurVJkR5NA7kMEAOgSE= @@ -693,7 +706,6 @@ github.com/json-iterator/go v1.1.5/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCV github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.8/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= -github.com/json-iterator/go v1.1.9 h1:9yzud/Ht36ygwatGx56VwCZtlI/2AD15T1X2sjSuGns= github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= @@ -714,6 +726,7 @@ github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQL github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4= github.com/klauspost/compress v1.9.5/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= +github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.10.10 h1:a/y8CglcM7gLGYmlbP/stPE5sR3hbhFRUjCBfd/0B3I= github.com/klauspost/compress v1.10.10/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/pgzip v1.2.4 h1:TQ7CNpYKovDOmqzRHKxJh0BeaBI7UdQZYc6p7pMQh1A= @@ -736,6 +749,8 @@ github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y= +github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= github.com/lib/pq v0.0.0-20180327071824-d34b9ff171c2/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.1.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= @@ -915,7 +930,6 @@ github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829/go.mod github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= github.com/prometheus/client_golang v1.3.0/go.mod h1:hJaj2vgQTGQmVCsAACORcieXFeDPbaTKGT+JTgUa3og= -github.com/prometheus/client_golang v1.4.0 h1:YVIb/fVcOTMSqtqZWSKnHpSLBxu8DKgxq8z6RuBZwqI= github.com/prometheus/client_golang v1.4.0/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU= github.com/prometheus/client_golang v1.7.1 h1:NTGy1Ja9pByO+xAeH/qiWnLrKtr3hJPNjaVUwnjpdpA= github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M= @@ -932,7 +946,6 @@ github.com/prometheus/common v0.2.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y8 github.com/prometheus/common v0.4.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.7.0/go.mod h1:DjGbpBbp5NYNiECxcL/VnbXCCaQpKd3tt26CguLLsqA= -github.com/prometheus/common v0.9.1 h1:KOMtN28tlbam3/7ZKEYKHhKoJZYYj3gMH4uc62x7X7U= github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8bs7vj7HSQ4= github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo= github.com/prometheus/common v0.11.1 h1:0ZISXCMRuCZcxF77aT1BXY5m74mX2vrGYl1dSwBI0Jo= @@ -943,7 +956,6 @@ github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= -github.com/prometheus/procfs v0.0.8 h1:+fpWZdT24pJBiqJdAwYBjPSk+5YmQzYNPYzQsdzLkt8= github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A= github.com/prometheus/procfs v0.1.3 h1:F0+tqvhOksq22sc6iCHF5WGlWjdwj92p0udFh1VFBS8= github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= @@ -960,7 +972,6 @@ github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OK github.com/rs/zerolog v1.15.0/go.mod h1:xYTKnLHcpfU2225ny5qZjxnj9NvkumZYjJHlAThCjNc= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= -github.com/ryanuber/columnize v2.1.0+incompatible h1:j1Wcmh8OrK4Q7GXY+V7SVSY8nUWQxHW5TkBe7YUl+2s= github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/ryanuber/go-glob v1.0.0 h1:iQh3xXAumdQ+4Ufa5b25cRpC5TYKlno6hsv6Cb3pkBk= github.com/ryanuber/go-glob v1.0.0/go.mod h1:807d1WSdnB0XRJzKNil9Om6lcp/3a0v4qIHxIXzX/Yc= @@ -1036,6 +1047,10 @@ github.com/toqueteos/webbrowser v1.2.0 h1:tVP/gpK69Fx+qMJKsLE7TD8LuGWPnEV71wBN9r github.com/toqueteos/webbrowser v1.2.0/go.mod h1:XWoZq4cyp9WeUeak7w7LXRUQf1F1ATJMir8RTqb4ayM= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926 h1:G3dpKMzFDjgEh2q1Z7zUUtKa8ViPtH+ocF0bE0g00O8= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= +github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo= +github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= +github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs= +github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= github.com/urfave/cli v0.0.0-20171014202726-7bc6a0acffa5/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= @@ -1437,7 +1452,6 @@ google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3Iji google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= google.golang.org/grpc v1.31.0 h1:T7P4R73V3SSDPhH7WW7ATbfViLtmamH0DKrP3f9AuDI= google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= -google.golang.org/grpc/cmd/protoc-gen-go-grpc v0.0.0-20200527211525-6c9e30c09db2 h1:KNluVV5ay+orsSPJ6XTpwJQ8qBhrBkOTmtBFGeDlBcY= google.golang.org/grpc/cmd/protoc-gen-go-grpc v0.0.0-20200527211525-6c9e30c09db2/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= @@ -1519,6 +1533,8 @@ modernc.org/ql v1.0.0/go.mod h1:xGVyrLIatPcO2C1JvI/Co8c0sr6y91HKFNy4pt9JXEY= modernc.org/sortutil v1.1.0/go.mod h1:ZyL98OQHJgH9IEfN71VsamvJgrtRX9Dj2gX+vH86L1k= modernc.org/strutil v1.1.0/go.mod h1:lstksw84oURvj9y3tn8lGvRxyRC1S2+g5uuIzNfIOBs= modernc.org/zappy v1.0.0/go.mod h1:hHe+oGahLVII/aTTyWK/b53VDHMAGCBYYeZ9sn83HC4= +nhooyr.io/websocket v1.8.6 h1:s+C3xAMLwGmlI31Nyn/eAehUlZPwfYZu2JXM621Q5/k= +nhooyr.io/websocket v1.8.6/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= diff --git a/internal/cmd/base/listener.go b/internal/cmd/base/listener.go index 3e80e894ed..95928f73d1 100644 --- a/internal/cmd/base/listener.go +++ b/internal/cmd/base/listener.go @@ -65,7 +65,7 @@ func tcpListenerFactory(l *configutil.Listener, logger hclog.Logger, ui cli.Ui) switch l.Purpose[0] { case "cluster": l.Address = "127.0.0.1:9201" - case "worker-alpn-tls": + case "proxy": l.Address = "127.0.0.1:9202" default: l.Address = "127.0.0.1:9200" diff --git a/internal/cmd/commands.go b/internal/cmd/commands.go index 653fee0091..b36724f126 100644 --- a/internal/cmd/commands.go +++ b/internal/cmd/commands.go @@ -16,6 +16,7 @@ import ( "github.com/hashicorp/boundary/internal/cmd/commands/hostcatalogs" "github.com/hashicorp/boundary/internal/cmd/commands/hosts" "github.com/hashicorp/boundary/internal/cmd/commands/hostsets" + "github.com/hashicorp/boundary/internal/cmd/commands/proxy" "github.com/hashicorp/boundary/internal/cmd/commands/roles" "github.com/hashicorp/boundary/internal/cmd/commands/scopes" "github.com/hashicorp/boundary/internal/cmd/commands/users" @@ -59,6 +60,11 @@ func initCommands(ui, serverCmdUi cli.Ui, runOpts *RunOptions) { SigUSR2Ch: MakeSigUSR2Ch(), }, nil }, + "proxy": func() (cli.Command, error) { + return &proxy.Command{ + Command: base.NewCommand(ui), + }, nil + }, "authenticate": func() (cli.Command, error) { return &authenticate.Command{ diff --git a/internal/cmd/commands/controller/controller.go b/internal/cmd/commands/controller/controller.go index affe9538fb..364ccf151f 100644 --- a/internal/cmd/commands/controller/controller.go +++ b/internal/cmd/commands/controller/controller.go @@ -253,7 +253,7 @@ func (c *Command) Run(args []string) int { foundCluster = true case "api": foundAPI = true - case "worker-alpn-tls": + case "proxy": // Do nothing, in a dev mode we might see it here default: c.UI.Error(fmt.Sprintf("Unknown listener purpose %q", lnConfig.Purpose[0])) diff --git a/internal/cmd/commands/dev/dev.go b/internal/cmd/commands/dev/dev.go index 77f7bbc53a..2746ca7a81 100644 --- a/internal/cmd/commands/dev/dev.go +++ b/internal/cmd/commands/dev/dev.go @@ -235,7 +235,7 @@ func (c *Command) Run(args []string) int { c.Info["[Recovery] AEAD Key Bytes"] = devConfig.Controller.DevRecoveryKey // Initialize the listeners - if err := c.SetupListeners(c.UI, devConfig.SharedConfig, []string{"api", "cluster", "worker-alpn-tls"}); err != nil { + if err := c.SetupListeners(c.UI, devConfig.SharedConfig, []string{"api", "cluster", "proxy"}); err != nil { c.UI.Error(err.Error()) return 1 } diff --git a/internal/cmd/commands/proxy/proxy.go b/internal/cmd/commands/proxy/proxy.go new file mode 100644 index 0000000000..f6ad37b1d2 --- /dev/null +++ b/internal/cmd/commands/proxy/proxy.go @@ -0,0 +1,273 @@ +package proxy + +import ( + "crypto/ed25519" + "crypto/tls" + "crypto/x509" + "encoding/base64" + "fmt" + "io" + "io/ioutil" + "net" + "net/http" + "os" + "strings" + "sync" + + "github.com/golang/protobuf/proto" + "github.com/hashicorp/boundary/globals" + "github.com/hashicorp/boundary/internal/cmd/base" + "github.com/hashicorp/boundary/internal/gen/controller/api/services" + "github.com/hashicorp/go-cleanhttp" + "github.com/mitchellh/cli" + "github.com/posener/complete" + "nhooyr.io/websocket" +) + +var _ cli.Command = (*Command)(nil) +var _ cli.CommandAutocomplete = (*Command)(nil) + +type Command struct { + *base.Command + + flagAuth string + flagListenAddr string + flagListenPort int + flagVerbose bool +} + +func (c *Command) Synopsis() string { + return "Launch the Boundary CLI in proxy mode" +} + +func (c *Command) Help() string { + return base.WrapForHelpText([]string{ + "Usage: boundary proxy [options] [args]", + "", + " This command allows launching the Boundary CLI in proxy mode. In this mode, the CLI expects to take in an authorization string returned from a Boundary controller. The CLI will then create a connection to a Boundary worker and ready a listening port for a local connection.", + "", + " Example:", + "", + ` $ boundary proxy -auth "UgxzX29mVEpwNUt6QlGiAQ..."`, + "", + " Please see the {{type}}s subcommand help for detailed usage information.", + }) + c.Flags().Help() +} + +func (c *Command) Flags() *base.FlagSets { + set := c.FlagSet(0) + + f := set.NewFlagSet("Proxy Options") + + f.StringVar(&base.StringVar{ + Name: "auth", + Target: &c.flagAuth, + EnvVar: "BOUNDARY_PROXY_AUTH", + Completion: complete.PredictAnything, + Usage: `The authorization string returned from the Boundary controller. If set to "-", the command will attempt to read in the authorization string from standard input.`, + }) + + f.StringVar(&base.StringVar{ + Name: "listen-addr", + Target: &c.flagListenAddr, + EnvVar: "BOUNDARY_PROXY_LISTEN_ADDR", + Completion: complete.PredictAnything, + Usage: `If set, the CLI will attempt to bind its listening address to the given value, which must be an IP address. If it cannot, the command will error. If not set, defaults to the most common IPv4 loopback address (127.0.0.1)."`, + }) + + f.IntVar(&base.IntVar{ + Name: "listen-port", + Target: &c.flagListenPort, + EnvVar: "BOUNDARY_PROXY_LISTEN_PORT", + Completion: complete.PredictAnything, + Usage: `If set, the CLI will attempt to bind its listening port to the given value. If it cannot, the command will error."`, + }) + + f.BoolVar(&base.BoolVar{ + Name: "verbose", + Target: &c.flagVerbose, + Completion: complete.PredictAnything, + Usage: "Turns on some extra verbosity in the command output.", + }) + + return set +} + +func (c *Command) AutocompleteArgs() complete.Predictor { + return complete.PredictAnything +} + +func (c *Command) AutocompleteFlags() complete.Flags { + return c.Flags().Completions() +} + +func (c *Command) Run(args []string) int { + f := c.Flags() + + if err := f.Parse(args); err != nil { + c.UI.Error(err.Error()) + return 1 + } + + if c.flagListenAddr == "" { + c.flagListenAddr = "127.0.0.1" + } + listenAddr := net.ParseIP(c.flagListenAddr) + if listenAddr == nil { + c.UI.Error(fmt.Sprintf("Could not successfully parse listen address of %s", c.flagListenAddr)) + return 1 + } + + if c.flagAuth == "-" { + authBytes, err := ioutil.ReadAll(os.Stdin) + if err != nil { + c.UI.Error(fmt.Errorf("No authorization string was provided and encountered the following error attempting to read it from stdin: %w", err).Error()) + return 1 + } + if len(authBytes) == 0 { + c.UI.Error("No authorization data read from stdin") + return 1 + } + c.flagAuth = string(authBytes) + } + + marshaled, err := base64.RawStdEncoding.DecodeString(c.flagAuth) + if err != nil { + c.UI.Error(fmt.Errorf("Unable to decode authorization string: %w", err).Error()) + return 1 + } + + sessionInfo := new(services.ValidateSessionResponse) + if err := proto.Unmarshal(marshaled, sessionInfo); err != nil { + c.UI.Error(fmt.Errorf("Unable to proto-decode authorization string: %w", err).Error()) + return 1 + } + + if len(sessionInfo.GetWorkerInfo()) == 0 { + c.UI.Error("No workers found in authorization string") + return 1 + } + + parsedCert, err := x509.ParseCertificate(sessionInfo.Certificate) + if err != nil { + c.UI.Error(fmt.Errorf("Unable to decode mTLS certificate: %w", err).Error()) + return 1 + } + + if len(parsedCert.DNSNames) != 1 { + c.UI.Error(fmt.Errorf("mTLS certificate has invalid parameters: %w", err).Error()) + return 1 + } + + certPool := x509.NewCertPool() + certPool.AddCert(parsedCert) + + tlsConf := &tls.Config{ + Certificates: []tls.Certificate{ + { + Certificate: [][]byte{sessionInfo.Certificate}, + PrivateKey: ed25519.PrivateKey(sessionInfo.PrivateKey), + Leaf: parsedCert, + }, + }, + RootCAs: certPool, + ServerName: parsedCert.DNSNames[0], + MinVersion: tls.VersionTLS13, + } + + transport := cleanhttp.DefaultTransport() + transport.DisableKeepAlives = false + transport.TLSClientConfig = tlsConf + + listener, err := net.ListenTCP("tcp", &net.TCPAddr{ + IP: listenAddr, + Port: c.flagListenPort, + }) + if err != nil { + c.UI.Error(fmt.Errorf("Error starting listening port: %w", err).Error()) + return 1 + } + c.UI.Info(fmt.Sprintf("%s", listener.Addr().String())) + + workerAddr := sessionInfo.GetWorkerInfo()[0].GetAddress() + + conn, resp, err := websocket.Dial( + c.Context, + fmt.Sprintf("wss://%s/v1/proxy", workerAddr), + &websocket.DialOptions{ + HTTPClient: &http.Client{ + Transport: transport, + }, + Subprotocols: []string{globals.TcpProxyV1}, + }, + ) + if err != nil { + switch { + case strings.Contains(err.Error(), "tls: internal error"): + c.UI.Error("Session is unauthorized") + case strings.Contains(err.Error(), "connect: connection refused"): + c.UI.Error(fmt.Sprintf("Unable to connect to worker at %s", workerAddr)) + default: + c.UI.Error(fmt.Errorf("Error dialing the worker: %w", err).Error()) + } + return 1 + } + + if resp == nil { + c.UI.Error("Response from worker is nil") + return 1 + } + if resp.Header == nil { + c.UI.Error("Response header is nil") + return 1 + } + negProto := resp.Header.Get("Sec-WebSocket-Protocol") + if negProto != globals.TcpProxyV1 { + c.UI.Error(fmt.Sprintf("Unexpected negotiated protocol: %s", negProto)) + return 1 + } + + // Get a wrapped net.Conn so we can use io.Copy + netConn := websocket.NetConn(c.Context, conn, websocket.MessageBinary) + + // Allow closing the listener from Ctrl-C + go func() { + <-c.Context.Done() + listener.Close() + }() + + listeningConn, err := listener.AcceptTCP() + listener.Close() + if err != nil { + select { + case <-c.Context.Done(): + return 0 + default: + c.UI.Error(fmt.Errorf("Error accepting connection: %w", err).Error()) + return 1 + } + } + + connWg := new(sync.WaitGroup) + connWg.Add(2) + go func() { + defer connWg.Done() + _, err := io.Copy(netConn, listeningConn) + if c.flagVerbose { + c.UI.Info(fmt.Sprintf("copy from client to endpoint done, error: %v", err)) + } + netConn.Close() + listeningConn.Close() + }() + go func() { + defer connWg.Done() + _, err := io.Copy(listeningConn, netConn) + if c.flagVerbose { + c.UI.Info(fmt.Sprintf("copy from endpoint to client done, error: %v", err)) + } + listeningConn.Close() + netConn.Close() + }() + connWg.Wait() + return 0 +} diff --git a/internal/cmd/commands/worker/worker.go b/internal/cmd/commands/worker/worker.go index 71aeddd639..ec09aa6a53 100644 --- a/internal/cmd/commands/worker/worker.go +++ b/internal/cmd/commands/worker/worker.go @@ -188,7 +188,7 @@ func (c *Command) Run(args []string) int { "in a Docker container, provide the IPC_LOCK cap to the container.")) } - if err := c.SetupListeners(c.UI, c.Config.SharedConfig, []string{"worker-alpn-tls"}); err != nil { + if err := c.SetupListeners(c.UI, c.Config.SharedConfig, []string{"proxy"}); err != nil { c.UI.Error(err.Error()) return 1 } diff --git a/internal/cmd/config/config.go b/internal/cmd/config/config.go index 46028ef8f8..21a6f50497 100644 --- a/internal/cmd/config/config.go +++ b/internal/cmd/config/config.go @@ -69,7 +69,7 @@ listener "tcp" { devWorkerExtraConfig = ` listener "tcp" { - purpose = "worker-alpn-tls" + purpose = "proxy" tls_disable = true proxy_protocol_behavior = "allow_authorized" proxy_protocol_authorized_addrs = "127.0.0.1" diff --git a/internal/cmd/config/config_test.go b/internal/cmd/config/config_test.go index b2318b79d1..6994f408e9 100644 --- a/internal/cmd/config/config_test.go +++ b/internal/cmd/config/config_test.go @@ -115,7 +115,7 @@ func TestDevWorker(t *testing.T) { { Type: "tcp", TLSDisable: true, - Purpose: []string{"worker-alpn-tls"}, + Purpose: []string{"proxy"}, ProxyProtocolBehavior: "allow_authorized", ProxyProtocolAuthorizedAddrs: []*sockaddr.SockAddrMarshaler{ {SockAddr: addr}, diff --git a/internal/gen/controller.swagger.json b/internal/gen/controller.swagger.json index a3b648add9..3625ac3ca8 100644 --- a/internal/gen/controller.swagger.json +++ b/internal/gen/controller.swagger.json @@ -5812,6 +5812,63 @@ } } }, + "controller.api.services.v1.ValidateSessionResponse": { + "type": "object", + "properties": { + "id": { + "type": "string", + "title": "ID of the session job" + }, + "scope_id": { + "type": "string", + "title": "The scope the job was created in" + }, + "user_id": { + "type": "string", + "title": "The ID of the user that requested the session" + }, + "type": { + "type": "string", + "title": "Type of the session (e.g. tcp, ssh, etc.)" + }, + "endpoint": { + "type": "string", + "title": "The endpoint to connect to, in a manner that makes sense for the type" + }, + "certificate": { + "type": "string", + "format": "byte", + "description": "The certificate to use when connecting (or if using custom certs, to\nserve as the \"login\"). Raw DER bytes." + }, + "private_key": { + "type": "string", + "format": "byte", + "description": "The private key to use when connecting (or if using custom certs, to pass\nas the \"password\")." + }, + "expiration_time": { + "type": "string", + "format": "date-time", + "title": "After this time the connection will be expired, e.g. forcefully terminated" + }, + "worker_info": { + "type": "array", + "items": { + "$ref": "#/definitions/controller.api.services.v1.WorkerInfo" + }, + "description": "Worker information. The first worker in the slice should be prioritized." + } + }, + "title": "SessionResponse contains information necessary for a client to establish a session" + }, + "controller.api.services.v1.WorkerInfo": { + "type": "object", + "properties": { + "address": { + "type": "string", + "title": "The address of the worker" + } + } + }, "controller.servers.v1.Server": { "type": "object", "properties": { diff --git a/internal/gen/controller/api/services/worker_service.pb.go b/internal/gen/controller/api/services/worker_service.pb.go index 9807d34757..603882a352 100644 --- a/internal/gen/controller/api/services/worker_service.pb.go +++ b/internal/gen/controller/api/services/worker_service.pb.go @@ -9,6 +9,7 @@ package services import ( context "context" proto "github.com/golang/protobuf/proto" + timestamp "github.com/golang/protobuf/ptypes/timestamp" servers "github.com/hashicorp/boundary/internal/servers" grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" @@ -150,6 +151,225 @@ func (x *StatusResponse) GetCancelJobIds() []string { return nil } +type ValidateSessionRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // The session ID from the client + Id string `protobuf:"bytes,10,opt,name=id,proto3" json:"id,omitempty"` +} + +func (x *ValidateSessionRequest) Reset() { + *x = ValidateSessionRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_controller_api_services_v1_worker_service_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ValidateSessionRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ValidateSessionRequest) ProtoMessage() {} + +func (x *ValidateSessionRequest) ProtoReflect() protoreflect.Message { + mi := &file_controller_api_services_v1_worker_service_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ValidateSessionRequest.ProtoReflect.Descriptor instead. +func (*ValidateSessionRequest) Descriptor() ([]byte, []int) { + return file_controller_api_services_v1_worker_service_proto_rawDescGZIP(), []int{2} +} + +func (x *ValidateSessionRequest) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +// SessionResponse contains information necessary for a client to establish a session +type ValidateSessionResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // ID of the session job + Id string `protobuf:"bytes,10,opt,name=id,proto3" json:"id,omitempty"` + // The scope the job was created in + ScopeId string `protobuf:"bytes,20,opt,name=scope_id,json=scopeId,proto3" json:"scope_id,omitempty"` + // The ID of the user that requested the session + UserId string `protobuf:"bytes,30,opt,name=user_id,json=userId,proto3" json:"user_id,omitempty"` + // Type of the session (e.g. tcp, ssh, etc.) + Type string `protobuf:"bytes,40,opt,name=type,proto3" json:"type,omitempty"` + // The endpoint to connect to, in a manner that makes sense for the type + Endpoint string `protobuf:"bytes,50,opt,name=endpoint,proto3" json:"endpoint,omitempty"` + // The certificate to use when connecting (or if using custom certs, to + // serve as the "login"). Raw DER bytes. + Certificate []byte `protobuf:"bytes,60,opt,name=certificate,proto3" json:"certificate,omitempty"` + // The private key to use when connecting (or if using custom certs, to pass + // as the "password"). + PrivateKey []byte `protobuf:"bytes,70,opt,name=private_key,json=privateKey,proto3" json:"private_key,omitempty"` + // After this time the connection will be expired, e.g. forcefully terminated + ExpirationTime *timestamp.Timestamp `protobuf:"bytes,80,opt,name=expiration_time,json=expirationTime,proto3" json:"expiration_time,omitempty"` + // Worker information. The first worker in the slice should be prioritized. + WorkerInfo []*WorkerInfo `protobuf:"bytes,90,rep,name=worker_info,json=workerInfo,proto3" json:"worker_info,omitempty"` +} + +func (x *ValidateSessionResponse) Reset() { + *x = ValidateSessionResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_controller_api_services_v1_worker_service_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ValidateSessionResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ValidateSessionResponse) ProtoMessage() {} + +func (x *ValidateSessionResponse) ProtoReflect() protoreflect.Message { + mi := &file_controller_api_services_v1_worker_service_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ValidateSessionResponse.ProtoReflect.Descriptor instead. +func (*ValidateSessionResponse) Descriptor() ([]byte, []int) { + return file_controller_api_services_v1_worker_service_proto_rawDescGZIP(), []int{3} +} + +func (x *ValidateSessionResponse) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *ValidateSessionResponse) GetScopeId() string { + if x != nil { + return x.ScopeId + } + return "" +} + +func (x *ValidateSessionResponse) GetUserId() string { + if x != nil { + return x.UserId + } + return "" +} + +func (x *ValidateSessionResponse) GetType() string { + if x != nil { + return x.Type + } + return "" +} + +func (x *ValidateSessionResponse) GetEndpoint() string { + if x != nil { + return x.Endpoint + } + return "" +} + +func (x *ValidateSessionResponse) GetCertificate() []byte { + if x != nil { + return x.Certificate + } + return nil +} + +func (x *ValidateSessionResponse) GetPrivateKey() []byte { + if x != nil { + return x.PrivateKey + } + return nil +} + +func (x *ValidateSessionResponse) GetExpirationTime() *timestamp.Timestamp { + if x != nil { + return x.ExpirationTime + } + return nil +} + +func (x *ValidateSessionResponse) GetWorkerInfo() []*WorkerInfo { + if x != nil { + return x.WorkerInfo + } + return nil +} + +type WorkerInfo struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // The address of the worker + Address string `protobuf:"bytes,10,opt,name=address,proto3" json:"address,omitempty"` +} + +func (x *WorkerInfo) Reset() { + *x = WorkerInfo{} + if protoimpl.UnsafeEnabled { + mi := &file_controller_api_services_v1_worker_service_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *WorkerInfo) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*WorkerInfo) ProtoMessage() {} + +func (x *WorkerInfo) ProtoReflect() protoreflect.Message { + mi := &file_controller_api_services_v1_worker_service_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use WorkerInfo.ProtoReflect.Descriptor instead. +func (*WorkerInfo) Descriptor() ([]byte, []int) { + return file_controller_api_services_v1_worker_service_proto_rawDescGZIP(), []int{4} +} + +func (x *WorkerInfo) GetAddress() string { + if x != nil { + return x.Address + } + return "" +} + var File_controller_api_services_v1_worker_service_proto protoreflect.FileDescriptor var file_controller_api_services_v1_worker_service_proto_rawDesc = []byte{ @@ -157,37 +377,74 @@ var file_controller_api_services_v1_worker_service_proto_rawDesc = []byte{ 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2f, 0x76, 0x31, 0x2f, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x1a, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x2e, 0x61, 0x70, - 0x69, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2e, 0x76, 0x31, 0x1a, 0x23, 0x63, - 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, - 0x73, 0x2f, 0x76, 0x31, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x2e, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x22, 0x6c, 0x0a, 0x0d, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x12, 0x35, 0x0a, 0x06, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x18, 0x0a, 0x20, - 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, - 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x65, 0x72, 0x76, - 0x65, 0x72, 0x52, 0x06, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x12, 0x24, 0x0a, 0x0e, 0x61, 0x63, - 0x74, 0x69, 0x76, 0x65, 0x5f, 0x6a, 0x6f, 0x62, 0x5f, 0x69, 0x64, 0x73, 0x18, 0x14, 0x20, 0x03, - 0x28, 0x09, 0x52, 0x0c, 0x61, 0x63, 0x74, 0x69, 0x76, 0x65, 0x4a, 0x6f, 0x62, 0x49, 0x64, 0x73, - 0x22, 0x77, 0x0a, 0x0e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x12, 0x3f, 0x0a, 0x0b, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, - 0x73, 0x18, 0x0a, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, - 0x6c, 0x6c, 0x65, 0x72, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x2e, 0x76, 0x31, 0x2e, - 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x52, 0x0b, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, - 0x65, 0x72, 0x73, 0x12, 0x24, 0x0a, 0x0e, 0x63, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x5f, 0x6a, 0x6f, - 0x62, 0x5f, 0x69, 0x64, 0x73, 0x18, 0x14, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0c, 0x63, 0x61, 0x6e, - 0x63, 0x65, 0x6c, 0x4a, 0x6f, 0x62, 0x49, 0x64, 0x73, 0x32, 0x72, 0x0a, 0x0d, 0x57, 0x6f, 0x72, - 0x6b, 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x61, 0x0a, 0x06, 0x53, 0x74, - 0x61, 0x74, 0x75, 0x73, 0x12, 0x29, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, + 0x69, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2e, 0x76, 0x31, 0x1a, 0x1f, 0x67, + 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, + 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x23, + 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x65, + 0x72, 0x73, 0x2f, 0x76, 0x31, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x22, 0x6c, 0x0a, 0x0d, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x12, 0x35, 0x0a, 0x06, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x18, 0x0a, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, + 0x72, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x65, 0x72, + 0x76, 0x65, 0x72, 0x52, 0x06, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x12, 0x24, 0x0a, 0x0e, 0x61, + 0x63, 0x74, 0x69, 0x76, 0x65, 0x5f, 0x6a, 0x6f, 0x62, 0x5f, 0x69, 0x64, 0x73, 0x18, 0x14, 0x20, + 0x03, 0x28, 0x09, 0x52, 0x0c, 0x61, 0x63, 0x74, 0x69, 0x76, 0x65, 0x4a, 0x6f, 0x62, 0x49, 0x64, + 0x73, 0x22, 0x77, 0x0a, 0x0e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x12, 0x3f, 0x0a, 0x0b, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, + 0x72, 0x73, 0x18, 0x0a, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, + 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x2e, 0x76, 0x31, + 0x2e, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x52, 0x0b, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, + 0x6c, 0x65, 0x72, 0x73, 0x12, 0x24, 0x0a, 0x0e, 0x63, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x5f, 0x6a, + 0x6f, 0x62, 0x5f, 0x69, 0x64, 0x73, 0x18, 0x14, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0c, 0x63, 0x61, + 0x6e, 0x63, 0x65, 0x6c, 0x4a, 0x6f, 0x62, 0x49, 0x64, 0x73, 0x22, 0x28, 0x0a, 0x16, 0x56, 0x61, + 0x6c, 0x69, 0x64, 0x61, 0x74, 0x65, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x02, 0x69, 0x64, 0x22, 0xde, 0x02, 0x0a, 0x17, 0x56, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, + 0x65, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, + 0x12, 0x19, 0x0a, 0x08, 0x73, 0x63, 0x6f, 0x70, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x14, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x07, 0x73, 0x63, 0x6f, 0x70, 0x65, 0x49, 0x64, 0x12, 0x17, 0x0a, 0x07, 0x75, + 0x73, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x1e, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x75, 0x73, + 0x65, 0x72, 0x49, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x28, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x65, 0x6e, 0x64, 0x70, + 0x6f, 0x69, 0x6e, 0x74, 0x18, 0x32, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x65, 0x6e, 0x64, 0x70, + 0x6f, 0x69, 0x6e, 0x74, 0x12, 0x20, 0x0a, 0x0b, 0x63, 0x65, 0x72, 0x74, 0x69, 0x66, 0x69, 0x63, + 0x61, 0x74, 0x65, 0x18, 0x3c, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0b, 0x63, 0x65, 0x72, 0x74, 0x69, + 0x66, 0x69, 0x63, 0x61, 0x74, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x70, 0x72, 0x69, 0x76, 0x61, 0x74, + 0x65, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x46, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x70, 0x72, 0x69, + 0x76, 0x61, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x12, 0x43, 0x0a, 0x0f, 0x65, 0x78, 0x70, 0x69, 0x72, + 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x50, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, + 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0e, 0x65, 0x78, + 0x70, 0x69, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x47, 0x0a, 0x0b, + 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x5f, 0x69, 0x6e, 0x66, 0x6f, 0x18, 0x5a, 0x20, 0x03, 0x28, + 0x0b, 0x32, 0x26, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x2e, 0x61, + 0x70, 0x69, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x57, + 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x0a, 0x77, 0x6f, 0x72, 0x6b, 0x65, + 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x22, 0x26, 0x0a, 0x0a, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, + 0x6e, 0x66, 0x6f, 0x12, 0x18, 0x0a, 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x0a, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x32, 0xf0, 0x01, + 0x0a, 0x0d, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, + 0x61, 0x0a, 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x29, 0x2e, 0x63, 0x6f, 0x6e, 0x74, + 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, + 0x63, 0x65, 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2a, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2e, 0x76, - 0x31, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, - 0x2a, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x2e, 0x61, 0x70, 0x69, - 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x74, 0x61, - 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x4d, 0x5a, - 0x4b, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x61, 0x73, 0x68, - 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2f, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x61, 0x72, 0x79, 0x2f, 0x69, - 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x63, 0x6f, 0x6e, 0x74, - 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, - 0x63, 0x65, 0x73, 0x3b, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x62, 0x06, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x33, + 0x31, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x22, 0x00, 0x12, 0x7c, 0x0a, 0x0f, 0x56, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x65, 0x53, 0x65, + 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x32, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, + 0x65, 0x72, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2e, + 0x76, 0x31, 0x2e, 0x56, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x65, 0x53, 0x65, 0x73, 0x73, 0x69, + 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x33, 0x2e, 0x63, 0x6f, 0x6e, 0x74, + 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, + 0x63, 0x65, 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x56, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x65, 0x53, + 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, + 0x42, 0x4d, 0x5a, 0x4b, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, + 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2f, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x61, 0x72, + 0x79, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x63, + 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x65, + 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x3b, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x62, + 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -202,22 +459,30 @@ func file_controller_api_services_v1_worker_service_proto_rawDescGZIP() []byte { return file_controller_api_services_v1_worker_service_proto_rawDescData } -var file_controller_api_services_v1_worker_service_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_controller_api_services_v1_worker_service_proto_msgTypes = make([]protoimpl.MessageInfo, 5) var file_controller_api_services_v1_worker_service_proto_goTypes = []interface{}{ - (*StatusRequest)(nil), // 0: controller.api.services.v1.StatusRequest - (*StatusResponse)(nil), // 1: controller.api.services.v1.StatusResponse - (*servers.Server)(nil), // 2: controller.servers.v1.Server + (*StatusRequest)(nil), // 0: controller.api.services.v1.StatusRequest + (*StatusResponse)(nil), // 1: controller.api.services.v1.StatusResponse + (*ValidateSessionRequest)(nil), // 2: controller.api.services.v1.ValidateSessionRequest + (*ValidateSessionResponse)(nil), // 3: controller.api.services.v1.ValidateSessionResponse + (*WorkerInfo)(nil), // 4: controller.api.services.v1.WorkerInfo + (*servers.Server)(nil), // 5: controller.servers.v1.Server + (*timestamp.Timestamp)(nil), // 6: google.protobuf.Timestamp } var file_controller_api_services_v1_worker_service_proto_depIdxs = []int32{ - 2, // 0: controller.api.services.v1.StatusRequest.worker:type_name -> controller.servers.v1.Server - 2, // 1: controller.api.services.v1.StatusResponse.controllers:type_name -> controller.servers.v1.Server - 0, // 2: controller.api.services.v1.WorkerService.Status:input_type -> controller.api.services.v1.StatusRequest - 1, // 3: controller.api.services.v1.WorkerService.Status:output_type -> controller.api.services.v1.StatusResponse - 3, // [3:4] is the sub-list for method output_type - 2, // [2:3] is the sub-list for method input_type - 2, // [2:2] is the sub-list for extension type_name - 2, // [2:2] is the sub-list for extension extendee - 0, // [0:2] is the sub-list for field type_name + 5, // 0: controller.api.services.v1.StatusRequest.worker:type_name -> controller.servers.v1.Server + 5, // 1: controller.api.services.v1.StatusResponse.controllers:type_name -> controller.servers.v1.Server + 6, // 2: controller.api.services.v1.ValidateSessionResponse.expiration_time:type_name -> google.protobuf.Timestamp + 4, // 3: controller.api.services.v1.ValidateSessionResponse.worker_info:type_name -> controller.api.services.v1.WorkerInfo + 0, // 4: controller.api.services.v1.WorkerService.Status:input_type -> controller.api.services.v1.StatusRequest + 2, // 5: controller.api.services.v1.WorkerService.ValidateSession:input_type -> controller.api.services.v1.ValidateSessionRequest + 1, // 6: controller.api.services.v1.WorkerService.Status:output_type -> controller.api.services.v1.StatusResponse + 3, // 7: controller.api.services.v1.WorkerService.ValidateSession:output_type -> controller.api.services.v1.ValidateSessionResponse + 6, // [6:8] is the sub-list for method output_type + 4, // [4:6] is the sub-list for method input_type + 4, // [4:4] is the sub-list for extension type_name + 4, // [4:4] is the sub-list for extension extendee + 0, // [0:4] is the sub-list for field type_name } func init() { file_controller_api_services_v1_worker_service_proto_init() } @@ -250,6 +515,42 @@ func file_controller_api_services_v1_worker_service_proto_init() { return nil } } + file_controller_api_services_v1_worker_service_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ValidateSessionRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_controller_api_services_v1_worker_service_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ValidateSessionResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_controller_api_services_v1_worker_service_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*WorkerInfo); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -257,7 +558,7 @@ func file_controller_api_services_v1_worker_service_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_controller_api_services_v1_worker_service_proto_rawDesc, NumEnums: 0, - NumMessages: 2, + NumMessages: 5, NumExtensions: 0, NumServices: 1, }, @@ -284,6 +585,7 @@ const _ = grpc.SupportPackageIsVersion6 // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. type WorkerServiceClient interface { Status(ctx context.Context, in *StatusRequest, opts ...grpc.CallOption) (*StatusResponse, error) + ValidateSession(ctx context.Context, in *ValidateSessionRequest, opts ...grpc.CallOption) (*ValidateSessionResponse, error) } type workerServiceClient struct { @@ -303,9 +605,19 @@ func (c *workerServiceClient) Status(ctx context.Context, in *StatusRequest, opt return out, nil } +func (c *workerServiceClient) ValidateSession(ctx context.Context, in *ValidateSessionRequest, opts ...grpc.CallOption) (*ValidateSessionResponse, error) { + out := new(ValidateSessionResponse) + err := c.cc.Invoke(ctx, "/controller.api.services.v1.WorkerService/ValidateSession", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // WorkerServiceServer is the server API for WorkerService service. type WorkerServiceServer interface { Status(context.Context, *StatusRequest) (*StatusResponse, error) + ValidateSession(context.Context, *ValidateSessionRequest) (*ValidateSessionResponse, error) } // UnimplementedWorkerServiceServer can be embedded to have forward compatible implementations. @@ -315,6 +627,9 @@ type UnimplementedWorkerServiceServer struct { func (*UnimplementedWorkerServiceServer) Status(context.Context, *StatusRequest) (*StatusResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method Status not implemented") } +func (*UnimplementedWorkerServiceServer) ValidateSession(context.Context, *ValidateSessionRequest) (*ValidateSessionResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method ValidateSession not implemented") +} func RegisterWorkerServiceServer(s *grpc.Server, srv WorkerServiceServer) { s.RegisterService(&_WorkerService_serviceDesc, srv) @@ -338,6 +653,24 @@ func _WorkerService_Status_Handler(srv interface{}, ctx context.Context, dec fun return interceptor(ctx, in, info, handler) } +func _WorkerService_ValidateSession_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ValidateSessionRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(WorkerServiceServer).ValidateSession(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/controller.api.services.v1.WorkerService/ValidateSession", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(WorkerServiceServer).ValidateSession(ctx, req.(*ValidateSessionRequest)) + } + return interceptor(ctx, in, info, handler) +} + var _WorkerService_serviceDesc = grpc.ServiceDesc{ ServiceName: "controller.api.services.v1.WorkerService", HandlerType: (*WorkerServiceServer)(nil), @@ -346,6 +679,10 @@ var _WorkerService_serviceDesc = grpc.ServiceDesc{ MethodName: "Status", Handler: _WorkerService_Status_Handler, }, + { + MethodName: "ValidateSession", + Handler: _WorkerService_ValidateSession_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "controller/api/services/v1/worker_service.proto", diff --git a/internal/kms/const.go b/internal/kms/const.go index 96a9148f3b..ad6b4025ae 100644 --- a/internal/kms/const.go +++ b/internal/kms/const.go @@ -22,7 +22,7 @@ const ( // KeyPurposeTokens is used for token encryption KeyPurposeTokens - // KeyPurposeSessions is used for session encryption + // KeyPurposeSessions is used as a base key to derive session-specific encryption keys KeyPurposeSessions ) diff --git a/internal/proto/local/controller/api/services/v1/worker_service.proto b/internal/proto/local/controller/api/services/v1/worker_service.proto index 3a525ed0b3..7a805f489b 100644 --- a/internal/proto/local/controller/api/services/v1/worker_service.proto +++ b/internal/proto/local/controller/api/services/v1/worker_service.proto @@ -4,10 +4,12 @@ package controller.api.services.v1; option go_package = "github.com/hashicorp/boundary/internal/gen/controller/api/services;services"; +import "google/protobuf/timestamp.proto"; import "controller/servers/v1/servers.proto"; service WorkerService { rpc Status(StatusRequest) returns (StatusResponse) {} + rpc ValidateSession(ValidateSessionRequest) returns (ValidateSessionResponse) {} } message StatusRequest { @@ -31,3 +33,45 @@ message StatusResponse { // next heartbeat, and we can move the job to canceled state. repeated string cancel_job_ids = 20; } + +message ValidateSessionRequest { + // The session ID from the client + string id = 10; +} + +// SessionResponse contains information necessary for a client to establish a session +message ValidateSessionResponse { + // ID of the session job + string id = 10; + + // The scope the job was created in + string scope_id = 20; + + // The ID of the user that requested the session + string user_id = 30; + + // Type of the session (e.g. tcp, ssh, etc.) + string type = 40; + + // The endpoint to connect to, in a manner that makes sense for the type + string endpoint = 50; + + // The certificate to use when connecting (or if using custom certs, to + // serve as the "login"). Raw DER bytes. + bytes certificate = 60; + + // The private key to use when connecting (or if using custom certs, to pass + // as the "password"). + bytes private_key = 70; + + // After this time the connection will be expired, e.g. forcefully terminated + google.protobuf.Timestamp expiration_time = 80; + + // Worker information. The first worker in the slice should be prioritized. + repeated WorkerInfo worker_info = 90; +} + +message WorkerInfo { + // The address of the worker + string address = 10; +} \ No newline at end of file diff --git a/internal/servers/controller/controller.go b/internal/servers/controller/controller.go index 241fed8be9..ab0bf2b355 100644 --- a/internal/servers/controller/controller.go +++ b/internal/servers/controller/controller.go @@ -35,6 +35,7 @@ type Controller struct { // Used for testing workerStatusUpdateTimes *sync.Map + jobMap *sync.Map // Repo factory methods AuthTokenRepoFn common.AuthTokenRepoFactory @@ -54,6 +55,7 @@ func New(conf *Config) (*Controller, error) { conf: conf, logger: conf.Logger.Named("controller"), workerStatusUpdateTimes: new(sync.Map), + jobMap: new(sync.Map), } c.started.Store(false) diff --git a/internal/servers/controller/handler.go b/internal/servers/controller/handler.go index 9f115a7d5c..0ab0d3843d 100644 --- a/internal/servers/controller/handler.go +++ b/internal/servers/controller/handler.go @@ -2,8 +2,14 @@ package controller import ( "context" + "crypto/rand" + "crypto/x509" + "encoding/base64" "encoding/json" + "errors" "fmt" + "math/big" + mathrand "math/rand" "net/http" "os" "strings" @@ -14,11 +20,16 @@ import ( "github.com/hashicorp/boundary/globals" "github.com/hashicorp/boundary/internal/auth" "github.com/hashicorp/boundary/internal/gen/controller/api/services" + "github.com/hashicorp/boundary/internal/kms" + "github.com/hashicorp/boundary/internal/servers" "github.com/hashicorp/boundary/internal/servers/controller/handlers/accounts" "github.com/hashicorp/boundary/internal/servers/controller/handlers/authmethods" "github.com/hashicorp/boundary/internal/servers/controller/handlers/host_sets" "github.com/hashicorp/boundary/internal/servers/controller/handlers/targets" + "github.com/hashicorp/boundary/internal/sessions" + "github.com/hashicorp/boundary/internal/types/scope" "github.com/hashicorp/shared-secure-libs/configutil" + "github.com/hashicorp/vault/sdk/helper/base62" "github.com/hashicorp/vault/sdk/helper/strutil" "github.com/hashicorp/boundary/internal/servers/controller/handlers" @@ -31,6 +42,8 @@ import ( "github.com/hashicorp/boundary/internal/servers/controller/handlers/scopes" "github.com/hashicorp/boundary/internal/servers/controller/handlers/users" "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/timestamppb" ) type HandlerProperties struct { @@ -49,7 +62,7 @@ func (c *Controller) handler(props HandlerProperties) (http.Handler, error) { return nil, err } mux.Handle("/v1/", h) - + mux.Handle("/jobtesting", jobTestingHandler(c)) mux.Handle("/", handleUi(c)) corsWrappedHandler := wrapHandlerWithCors(mux, props) @@ -311,6 +324,108 @@ func wrapHandlerWithCors(h http.Handler, props HandlerProperties) http.Handler { }) } +func jobTestingHandler(c *Controller) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + errorResp := func(e error) { + w.Write([]byte(e.Error())) + w.WriteHeader(http.StatusInternalServerError) + } + + if err := r.ParseForm(); err != nil { + errorResp(err) + return + } + endpoint := r.URL.Query().Get("endpoint") + if endpoint == "" { + errorResp(errors.New("missing endpoint query param")) + return + } + + timeout := 15 * time.Second + var err error + if t := r.URL.Query().Get("timeout"); t != "" { + if timeout, err = time.ParseDuration(t); err != nil { + errorResp(fmt.Errorf("error parsing timeout: %w", err)) + return + } + } + + var workers []*services.WorkerInfo + repo, err := c.ServersRepoFn() + if err != nil { + errorResp(err) + return + } + servers, err := repo.ListServers(r.Context(), servers.ServerTypeWorker) + if err != nil { + errorResp(err) + return + } + for _, v := range servers { + workers = append(workers, &services.WorkerInfo{Address: v.Address}) + } + + wrapper, err := c.kms.GetWrapper(r.Context(), scope.Global.String(), kms.KeyPurposeSessions) + if err != nil { + errorResp(err) + return + } + jobId, err := base62.Random(10) + if err != nil { + errorResp(err) + return + } + jobId = "s_" + jobId + pubKey, privKey, err := sessions.DeriveED25519Key(wrapper, "u_1234567890", jobId) + + template := &x509.Certificate{ + ExtKeyUsage: []x509.ExtKeyUsage{ + x509.ExtKeyUsageServerAuth, + x509.ExtKeyUsageClientAuth, + }, + DNSNames: []string{jobId}, + KeyUsage: x509.KeyUsageDigitalSignature | x509.KeyUsageKeyEncipherment | x509.KeyUsageKeyAgreement | x509.KeyUsageCertSign, + SerialNumber: big.NewInt(mathrand.Int63()), + NotBefore: time.Now().Add(-1 * time.Minute), + NotAfter: time.Now().Add(5 * time.Minute), + BasicConstraintsValid: true, + IsCA: true, + } + + certBytes, err := x509.CreateCertificate(rand.Reader, template, template, pubKey, privKey) + if err != nil { + errorResp(err) + return + } + + ret := &services.ValidateSessionResponse{ + Id: jobId, + ScopeId: scope.Global.String(), + UserId: "u_1234567890", + Type: "tcp", + Endpoint: endpoint, + Certificate: certBytes, + PrivateKey: privKey, + WorkerInfo: workers, + ExpirationTime: ×tamppb.Timestamp{Seconds: time.Now().Add(timeout).Unix()}, + } + + marshaled, err := proto.Marshal(ret) + if err != nil { + errorResp(err) + return + } + + if _, err := w.Write([]byte(base64.RawStdEncoding.EncodeToString(marshaled))); err != nil { + errorResp(err) + return + } + + ret.PrivateKey = nil + c.jobMap.Store(jobId, ret) + }) +} + /* func WrapForwardedForHandler(h http.Handler, authorizedAddrs []*sockaddr.SockAddrMarshaler, rejectNotPresent, rejectNonAuthz bool, hopSkips int) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { diff --git a/internal/servers/controller/handlers/workers/worker_service.go b/internal/servers/controller/handlers/workers/worker_service.go index 2b0c5fca80..cac841ad58 100644 --- a/internal/servers/controller/handlers/workers/worker_service.go +++ b/internal/servers/controller/handlers/workers/worker_service.go @@ -6,7 +6,9 @@ import ( "time" pbs "github.com/hashicorp/boundary/internal/gen/controller/api/services" + "github.com/hashicorp/boundary/internal/kms" "github.com/hashicorp/boundary/internal/servers/controller/common" + "github.com/hashicorp/boundary/internal/sessions" "github.com/hashicorp/boundary/internal/types/resource" "github.com/hashicorp/go-hclog" "google.golang.org/grpc/codes" @@ -14,21 +16,27 @@ import ( ) type workerServiceServer struct { - logger hclog.Logger - repoFn common.ServersRepoFactory - updateTimes *sync.Map + logger hclog.Logger + repoFn common.ServersRepoFactory + updateTimes *sync.Map + kms *kms.Kms + jobMap *sync.Map + jobCancelMap *sync.Map } -func NewWorkerServiceServer(logger hclog.Logger, repoFn common.ServersRepoFactory, updateTimes *sync.Map) *workerServiceServer { +func NewWorkerServiceServer(logger hclog.Logger, repoFn common.ServersRepoFactory, updateTimes *sync.Map, kms *kms.Kms, jobMap *sync.Map) *workerServiceServer { return &workerServiceServer{ - logger: logger, - repoFn: repoFn, - updateTimes: updateTimes, + logger: logger, + repoFn: repoFn, + updateTimes: updateTimes, + kms: kms, + jobMap: jobMap, + jobCancelMap: new(sync.Map), } } func (ws *workerServiceServer) Status(ctx context.Context, req *pbs.StatusRequest) (*pbs.StatusResponse, error) { - ws.logger.Trace("got status request from worker", "name", req.Worker.Name, "address", req.Worker.Address) + ws.logger.Trace("got status request from worker", "name", req.Worker.Name, "address", req.Worker.Address, "active_jobs", req.ActiveJobIds) ws.updateTimes.Store(req.Worker.Name, time.Now()) repo, err := ws.repoFn() if err != nil { @@ -41,7 +49,53 @@ func (ws *workerServiceServer) Status(ctx context.Context, req *pbs.StatusReques ws.logger.Error("error storing worker status", "error", err) return &pbs.StatusResponse{}, status.Errorf(codes.Internal, "Error storing worker status: %v", err) } - return &pbs.StatusResponse{ + ret := &pbs.StatusResponse{ Controllers: controllers, - }, nil + } + ws.jobCancelMap.Range(func(key, value interface{}) bool { + ret.CancelJobIds = append(ret.CancelJobIds, key.(string)) + return true + }) + for _, id := range ret.CancelJobIds { + ws.jobCancelMap.Delete(id) + } + return ret, nil +} + +func (ws *workerServiceServer) ValidateSession(ctx context.Context, req *pbs.ValidateSessionRequest) (*pbs.ValidateSessionResponse, error) { + ws.logger.Trace("got validate session request from worker", "job_id", req.GetId()) + + // Look up the job info + storedSessionInfo, loaded := ws.jobMap.LoadAndDelete(req.GetId()) + if !loaded { + return &pbs.ValidateSessionResponse{}, status.Errorf(codes.PermissionDenied, "Unknown job ID: %v", req.GetId()) + } + sessionInfo := storedSessionInfo.(*pbs.ValidateSessionResponse) + + wrapper, err := ws.kms.GetWrapper(ctx, sessionInfo.ScopeId, kms.KeyPurposeSessions) + if err != nil { + return &pbs.ValidateSessionResponse{}, status.Errorf(codes.Internal, "Error getting sessions wrapper: %v", err) + } + + // Derive the private key, which should match. Deriving on both ends allows + // us to not store it in the DB. + _, privKey, err := sessions.DeriveED25519Key(wrapper, sessionInfo.GetUserId(), req.GetId()) + if err != nil { + return &pbs.ValidateSessionResponse{}, status.Errorf(codes.Internal, "Error deriving session key: %v", err) + } + + if sessionInfo.ExpirationTime.GetSeconds() > 0 { + timeDiff := time.Until(sessionInfo.GetExpirationTime().AsTime()) + if timeDiff < 0 { + return &pbs.ValidateSessionResponse{}, status.Errorf(codes.OutOfRange, "Session has already expired") + } + defer func() { + time.AfterFunc(timeDiff, func() { + ws.jobCancelMap.Store(req.GetId(), true) + }) + }() + } + + sessionInfo.PrivateKey = privKey + return sessionInfo, nil } diff --git a/internal/servers/controller/listeners.go b/internal/servers/controller/listeners.go index 020376f75d..3b71cae0f4 100644 --- a/internal/servers/controller/listeners.go +++ b/internal/servers/controller/listeners.go @@ -108,7 +108,7 @@ func (c *Controller) startListeners() error { // Clear out in case this is a second start of the controller ln.Mux.UnregisterProto(alpnmux.DefaultProto) l, err := ln.Mux.RegisterProto(alpnmux.DefaultProto, &tls.Config{ - GetConfigForClient: c.validateWorkerTLS, + GetConfigForClient: c.validateWorkerTls, }) if err != nil { return fmt.Errorf("error getting sub-listener for worker proto: %w", err) @@ -120,7 +120,7 @@ func (c *Controller) startListeners() error { grpc.MaxRecvMsgSize(math.MaxInt32), grpc.MaxSendMsgSize(math.MaxInt32), ) - services.RegisterWorkerServiceServer(workerServer, workers.NewWorkerServiceServer(c.logger.Named("worker-handler"), c.ServersRepoFn, c.workerStatusUpdateTimes)) + services.RegisterWorkerServiceServer(workerServer, workers.NewWorkerServiceServer(c.logger.Named("worker-handler"), c.ServersRepoFn, c.workerStatusUpdateTimes, c.kms, c.jobMap)) interceptor := newInterceptingListener(c, l) ln.ALPNListener = interceptor @@ -144,7 +144,7 @@ func (c *Controller) startListeners() error { } else { err = configureForCluster(ln) } - case "worker-alpn-tls": + case "proxy": // Do nothing, in a dev mode we might see it here default: err = fmt.Errorf("unknown listener purpose %q", purpose) diff --git a/internal/servers/controller/worker_tls_config.go b/internal/servers/controller/worker_tls_config.go index 2c65aac282..0396d076cd 100644 --- a/internal/servers/controller/worker_tls_config.go +++ b/internal/servers/controller/worker_tls_config.go @@ -20,7 +20,7 @@ type workerAuthEntry struct { conn net.Conn } -func (c Controller) validateWorkerTLS(hello *tls.ClientHelloInfo) (*tls.Config, error) { +func (c Controller) validateWorkerTls(hello *tls.ClientHelloInfo) (*tls.Config, error) { for _, p := range hello.SupportedProtos { switch { case strings.HasPrefix(p, "v1workerauth-"): @@ -84,7 +84,6 @@ func (c Controller) v1WorkerAuthConfig(protos []string) (*tls.Config, *base.Work NextProtos: []string{firstMatchProto}, MinVersion: tls.VersionTLS13, } - tlsConfig.BuildNameToCertificate() return tlsConfig, info, nil } diff --git a/internal/servers/repository.go b/internal/servers/repository.go index 6be5182bbb..c78d7000a8 100644 --- a/internal/servers/repository.go +++ b/internal/servers/repository.go @@ -16,6 +16,17 @@ const ( defaultLiveness = 15 * time.Second ) +type ServerType string + +const ( + ServerTypeController ServerType = "controller" + ServerTypeWorker ServerType = "worker" +) + +func (s ServerType) String() string { + return string(s) +} + // Repository is the servers database repository type Repository struct { reader db.Reader @@ -44,7 +55,7 @@ func NewRepository(r db.Reader, w db.Writer, kms *kms.Kms) (*Repository, error) // list will return a listing of resources and honor the WithLimit option or the // repo defaultLimit -func (r *Repository) ListServers(ctx context.Context, serverType string, opt ...Option) ([]*Server, error) { +func (r *Repository) ListServers(ctx context.Context, serverType ServerType, opt ...Option) ([]*Server, error) { opts := getOpts(opt...) liveness := opts.withLiveness if liveness == 0 { @@ -107,7 +118,7 @@ func (r *Repository) UpsertServer(ctx context.Context, server *Server, opt ...Op return nil, int(rowsAffected), nil } // Fetch current controllers to feed to the workers - controllers, err := r.ListServers(ctx, resource.Controller.String()) + controllers, err := r.ListServers(ctx, ServerTypeController) return controllers, len(controllers), err } diff --git a/internal/servers/worker/controller_connection.go b/internal/servers/worker/controller_connection.go index 8f815edfcb..684510dfb2 100644 --- a/internal/servers/worker/controller_connection.go +++ b/internal/servers/worker/controller_connection.go @@ -9,6 +9,7 @@ import ( "encoding/base64" "encoding/json" "encoding/pem" + "errors" "fmt" "math" "math/big" @@ -25,20 +26,6 @@ import ( "google.golang.org/protobuf/proto" ) -type controllerConnection struct { - controllerAddr string - client services.WorkerServiceClient -} - -func newControllerConnection(controllerAddr string, client services.WorkerServiceClient) *controllerConnection { - ret := &controllerConnection{ - controllerAddr: controllerAddr, - client: client, - } - - return ret -} - func (w *Worker) startControllerConnections() error { initialAddrs := make([]resolver.Address, 0, len(w.conf.RawConfig.Worker.Controllers)) for _, addr := range w.conf.RawConfig.Worker.Controllers { @@ -52,13 +39,15 @@ func (w *Worker) startControllerConnections() error { initialAddrs = append(initialAddrs, resolver.Address{Addr: fmt.Sprintf("%s:%s", host, port)}) } + if len(initialAddrs) == 0 { + return errors.New("no initial controller addresses found") + } + w.Resolver().InitialState(resolver.State{ Addresses: initialAddrs, }) - for _, addr := range initialAddrs { - if err := w.createClientConn(addr.Addr); err != nil { - return fmt.Errorf("error making client connection to controller: %w", err) - } + if err := w.createClientConn(initialAddrs[0].Addr); err != nil { + return fmt.Errorf("error making client connection to controller: %w", err) } return nil @@ -123,7 +112,7 @@ func (w *Worker) createClientConn(addr string) error { } client := services.NewWorkerServiceClient(cc) - w.controllerConns.Store(addr, newControllerConnection(addr, client)) + w.controllerConn.Store(client) w.logger.Info("connected to controller", "address", addr) return nil @@ -257,7 +246,6 @@ func (w Worker) workerAuthTLSConfig() (*tls.Config, *base.WorkerAuthInfo, error) NextProtos: nextProtos, MinVersion: tls.VersionTLS13, } - tlsConfig.BuildNameToCertificate() return tlsConfig, info, nil } diff --git a/internal/servers/worker/handler.go b/internal/servers/worker/handler.go index c215577c8e..d5c3c874cd 100644 --- a/internal/servers/worker/handler.go +++ b/internal/servers/worker/handler.go @@ -6,7 +6,9 @@ import ( "time" "github.com/hashicorp/boundary/globals" + "github.com/hashicorp/boundary/internal/gen/controller/api/services" "github.com/hashicorp/shared-secure-libs/configutil" + "nhooyr.io/websocket" ) type HandlerProperties struct { @@ -15,25 +17,67 @@ type HandlerProperties struct { // Handler returns an http.Handler for the API. This can be used on // its own to mount the Vault API within another web server. -func (c *Worker) Handler(props HandlerProperties) http.Handler { +func (w *Worker) handler(props HandlerProperties) http.Handler { // Create the muxer to handle the actual endpoints mux := http.NewServeMux() - mux.Handle("/v1/", handleDummy()) + mux.Handle("/v1/proxy", w.handleProxy()) - genericWrappedHandler := c.wrapGenericHandler(mux, props) + genericWrappedHandler := w.wrapGenericHandler(mux, props) return genericWrappedHandler } -func handleDummy() http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json") - w.Write([]byte(`{"foo": "bar"}`)) +func (w *Worker) handleProxy() http.HandlerFunc { + return http.HandlerFunc(func(wr http.ResponseWriter, r *http.Request) { + if r.TLS == nil { + w.logger.Error("no request TLS information found") + wr.WriteHeader(http.StatusInternalServerError) + return + } + jobId := r.TLS.ServerName + + jobInfoRaw, valid := w.jobInfoMap.LoadAndDelete(jobId) + if !valid { + w.logger.Error("job not found in info map", "job_id", jobId) + wr.WriteHeader(http.StatusInternalServerError) + return + } + jobInfo := jobInfoRaw.(*services.ValidateSessionResponse) + + opts := &websocket.AcceptOptions{ + Subprotocols: []string{globals.TcpProxyV1}, + } + conn, err := websocket.Accept(wr, r, opts) + if err != nil { + w.logger.Error("error during websocket upgrade", "error", err) + wr.WriteHeader(http.StatusInternalServerError) + return + } + // Later calls will cause this to noop if they return a different status + defer conn.Close(websocket.StatusNormalClosure, "done") + + connCtx, connCancel := context.WithCancel(r.Context()) + w.cancellationMap.Store(jobId, connCancel) + defer func() { + cancel, loaded := w.cancellationMap.LoadAndDelete(jobId) + if !loaded { + return + } + cancel.(context.CancelFunc)() + }() + + switch conn.Subprotocol() { + case globals.TcpProxyV1: + w.handleTcpProxyV1(connCtx, conn, jobInfo) + default: + conn.Close(websocket.StatusProtocolError, "unsupported-protocol") + return + } }) } -func (c *Worker) wrapGenericHandler(h http.Handler, props HandlerProperties) http.Handler { +func (w *Worker) wrapGenericHandler(h http.Handler, props HandlerProperties) http.Handler { var maxRequestDuration time.Duration var maxRequestSize int64 if props.ListenerConfig != nil { @@ -46,9 +90,9 @@ func (c *Worker) wrapGenericHandler(h http.Handler, props HandlerProperties) htt if maxRequestSize == 0 { maxRequestSize = globals.DefaultMaxRequestSize } - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + return http.HandlerFunc(func(wr http.ResponseWriter, r *http.Request) { // Set the Cache-Control header for all responses returned - w.Header().Set("Cache-Control", "no-store") + wr.Header().Set("Cache-Control", "no-store") // Start with the request context ctx := r.Context() @@ -62,9 +106,8 @@ func (c *Worker) wrapGenericHandler(h http.Handler, props HandlerProperties) htt ctx = context.WithValue(ctx, globals.ContextOriginalRequestPathTypeKey, r.URL.Path) r = r.WithContext(ctx) - h.ServeHTTP(w, r) + h.ServeHTTP(wr, r) cancelFunc() - return }) } diff --git a/internal/servers/worker/job.go b/internal/servers/worker/job.go new file mode 100644 index 0000000000..317b00d66f --- /dev/null +++ b/internal/servers/worker/job.go @@ -0,0 +1,84 @@ +package worker + +import ( + "context" + "crypto/ed25519" + "crypto/tls" + "crypto/x509" + "errors" + "fmt" + "strings" + "time" + + "github.com/hashicorp/boundary/internal/gen/controller/api/services" +) + +const ( + validateSessionTimeout = 90 * time.Second +) + +func (w *Worker) getJobTls(hello *tls.ClientHelloInfo) (*tls.Config, error) { + var jobId string + switch { + case strings.HasPrefix(hello.ServerName, "s_"): + jobId = hello.ServerName + default: + return nil, fmt.Errorf("could not find job ID in SNI") + } + + rawConn := w.controllerConn.Load() + if rawConn == nil { + return nil, errors.New("could not get a controller client") + } + conn, ok := rawConn.(services.WorkerServiceClient) + if !ok { + return nil, errors.New("could not cast atomic controller client to the real thing") + } + if conn == nil { + return nil, errors.New("controller client is nil") + } + + timeoutContext, cancel := context.WithTimeout(w.baseContext, validateSessionTimeout) + defer cancel() + + resp, err := conn.ValidateSession(timeoutContext, &services.ValidateSessionRequest{ + Id: jobId, + }) + if err != nil { + return nil, fmt.Errorf("error validating session: %w", err) + } + + parsedCert, err := x509.ParseCertificate(resp.GetCertificate()) + if err != nil { + return nil, fmt.Errorf("error parsing session certificate: %w", err) + } + + if len(parsedCert.DNSNames) != 1 { + return nil, fmt.Errorf("invalid length of DNS names (%d) in parsed certificate", len(parsedCert.DNSNames)) + } + + certPool := x509.NewCertPool() + certPool.AddCert(parsedCert) + + tlsConf := &tls.Config{ + Certificates: []tls.Certificate{ + { + Certificate: [][]byte{resp.GetCertificate()}, + PrivateKey: ed25519.PrivateKey(resp.GetPrivateKey()), + Leaf: parsedCert, + }, + }, + ServerName: parsedCert.DNSNames[0], + ClientAuth: tls.RequireAndVerifyClientCert, + ClientCAs: certPool, + MinVersion: tls.VersionTLS13, + } + + // TODO: Periodicially clean this up. We can't rely on things in here but + // not in cancellation because they could be on the way to being + // established. However, since cert lifetimes are short, we can simply range + // through and remove values that are expired. + w.jobInfoMap.Store(jobId, resp) + + return tlsConf, nil +} diff --git a/internal/servers/worker/listeners.go b/internal/servers/worker/listeners.go index 87b89132c6..f46ef46ded 100644 --- a/internal/servers/worker/listeners.go +++ b/internal/servers/worker/listeners.go @@ -1,9 +1,16 @@ package worker import ( + "context" + "crypto/tls" "errors" "fmt" + "net" + "net/http" + "sync" + "time" + "github.com/hashicorp/go-alpnmux" "github.com/hashicorp/go-multierror" ) @@ -11,24 +18,74 @@ func (w *Worker) startListeners() error { servers := make([]func(), 0, len(w.conf.Listeners)) for _, ln := range w.conf.Listeners { - var err error for _, purpose := range ln.Config.Purpose { switch purpose { case "api", "cluster": - // Do nothing, in dev mode we might see it here - case "worker-alpn-tls": - if w.listeningAddress != "" { - return errors.New("more than one listening address found") - } - w.listeningAddress = ln.Config.Address - w.logger.Info("reporting listening address to controllers", "address", w.listeningAddress) - // TODO: other stuff - // TODO: once we have an actual listener, record in w.listeningAddress the actual address with port + // We may have this in dev mode; ignore + continue + + case "proxy": + // Do nothing; handle below + default: - err = fmt.Errorf("unknown listener purpose %q", purpose) + return fmt.Errorf("unknown listener purpose %q", purpose) + } + + if w.listeningAddress != "" { + return errors.New("more than one listening address found") + } + + handler := w.handler(HandlerProperties{ + ListenerConfig: ln.Config, + }) + + cancelCtx := w.baseContext + + server := &http.Server{ + Handler: handler, + ReadHeaderTimeout: 10 * time.Second, + ReadTimeout: 30 * time.Second, + IdleTimeout: 5 * time.Minute, + ErrorLog: w.logger.StandardLogger(nil), + BaseContext: func(net.Listener) context.Context { + return cancelCtx + }, + } + ln.HTTPServer = server + + if ln.Config.HTTPReadHeaderTimeout > 0 { + server.ReadHeaderTimeout = ln.Config.HTTPReadHeaderTimeout + } + if ln.Config.HTTPReadTimeout > 0 { + server.ReadTimeout = ln.Config.HTTPReadTimeout + } + if ln.Config.HTTPWriteTimeout > 0 { + server.WriteTimeout = ln.Config.HTTPWriteTimeout + } + if ln.Config.HTTPIdleTimeout > 0 { + server.IdleTimeout = ln.Config.HTTPIdleTimeout } + + // Clear out in case this is a second start of the controller + ln.Mux.UnregisterProto(alpnmux.DefaultProto) + ln.Mux.UnregisterProto(alpnmux.NoProto) + l, err := ln.Mux.RegisterProto(alpnmux.DefaultProto, &tls.Config{ + GetConfigForClient: w.getJobTls, + }) if err != nil { - return err + return fmt.Errorf("error getting tls listener: %w", err) + } + if l == nil { + return errors.New("could not get tls listener") + } + + servers = append(servers, func() { + go server.Serve(l) + }) + + if w.listeningAddress == "" { + w.listeningAddress = l.Addr().String() + w.logger.Info("reporting listening address to controllers", "address", w.listeningAddress) } } } @@ -41,15 +98,26 @@ func (w *Worker) startListeners() error { } func (w *Worker) stopListeners() error { - var retErr *multierror.Error + serverWg := new(sync.WaitGroup) for _, ln := range w.conf.Listeners { - if ln.ALPNListener != nil { - if err := ln.ALPNListener.Close(); err != nil { - retErr = multierror.Append(retErr, err) + localLn := ln + serverWg.Add(1) + go func() { + defer serverWg.Done() + + shutdownKill, shutdownKillCancel := context.WithTimeout(w.baseContext, localLn.Config.MaxRequestDuration) + defer shutdownKillCancel() + + if localLn.HTTPServer != nil { + localLn.HTTPServer.Shutdown(shutdownKill) } - } + }() + } + serverWg.Wait() - if !w.conf.RawConfig.DevController { + var retErr *multierror.Error + if !w.conf.RawConfig.DevController { + for _, ln := range w.conf.Listeners { if err := ln.Mux.Close(); err != nil { retErr = multierror.Append(retErr, err) } diff --git a/internal/servers/worker/status.go b/internal/servers/worker/status.go index f17f081f49..91515790c6 100644 --- a/internal/servers/worker/status.go +++ b/internal/servers/worker/status.go @@ -5,6 +5,7 @@ import ( "math/rand" "time" + "github.com/hashicorp/boundary/internal/gen/controller/api/services" pbs "github.com/hashicorp/boundary/internal/gen/controller/api/services" "github.com/hashicorp/boundary/internal/servers" "github.com/hashicorp/boundary/internal/types/resource" @@ -16,6 +17,11 @@ const ( statusInterval = 2 * time.Second ) +type LastStatusInformation struct { + *pbs.StatusResponse + StatusTime time.Time +} + func (w *Worker) startStatusTicking(cancelCtx context.Context) { go func() { r := rand.New(rand.NewSource(time.Now().UnixNano())) @@ -40,43 +46,51 @@ func (w *Worker) startStatusTicking(cancelCtx context.Context) { return case <-timer.C: - w.controllerConns.Range(func(_, v interface{}) bool { - // If something is removed from the map while ranging, ignore it - if v == nil { - return true + var activeJobs []string + w.cancellationMap.Range(func(key, value interface{}) bool { + activeJobs = append(activeJobs, key.(string)) + return true + }) + client := w.controllerConn.Load().(services.WorkerServiceClient) + result, err := client.Status(cancelCtx, &pbs.StatusRequest{ + ActiveJobIds: activeJobs, + Worker: &servers.Server{ + PrivateId: w.conf.RawConfig.Worker.Name, + Name: w.conf.RawConfig.Worker.Name, + Type: resource.Worker.String(), + Description: w.conf.RawConfig.Worker.Description, + Address: w.listeningAddress, + }, + }) + if err != nil { + w.logger.Error("error making status request to controller", "error", err) + } else { + w.logger.Trace("successfully sent status to controller") + addrs := make([]resolver.Address, 0, len(result.Controllers)) + strAddrs := make([]string, 0, len(result.Controllers)) + for _, v := range result.Controllers { + addrs = append(addrs, resolver.Address{Addr: v.Address}) + strAddrs = append(strAddrs, v.Address) } - c := v.(*controllerConnection) - result, err := c.client.Status(cancelCtx, &pbs.StatusRequest{ - Worker: &servers.Server{ - PrivateId: w.conf.RawConfig.Worker.Name, - Name: w.conf.RawConfig.Worker.Name, - Type: resource.Worker.String(), - Description: w.conf.RawConfig.Worker.Description, - Address: w.listeningAddress, - }, - }) - if err != nil { - w.logger.Error("error making status request to controller", "error", err) - } else { - w.logger.Trace("successfully sent status to controller") - addrs := make([]resolver.Address, 0, len(result.Controllers)) - strAddrs := make([]string, 0, len(result.Controllers)) - for _, v := range result.Controllers { - addrs = append(addrs, resolver.Address{Addr: v.Address}) - strAddrs = append(strAddrs, v.Address) + w.Resolver().UpdateState(resolver.State{Addresses: addrs}) + w.logger.Trace("found controllers", "addresses", strAddrs) + w.lastStatusSuccess.Store(&LastStatusInformation{StatusResponse: result, StatusTime: time.Now()}) + + for _, id := range result.GetCancelJobIds() { + if cancel, ok := w.cancellationMap.LoadAndDelete(id); ok { + cancel.(context.CancelFunc)() + w.logger.Info("canceled job", "job_id", id) + } else { + w.logger.Warn("asked to cancel job but could not find a cancellation function for it", "job_id", id) } - w.Resolver().UpdateState(resolver.State{Addresses: addrs}) - w.logger.Trace("found controllers", "addresses", strAddrs) - w.lastStatusSuccess.Store(time.Now()) } - return true - }) + } timer.Reset(getRandomInterval()) } } }() } -func (w *Worker) LastStatusSuccess() time.Time { - return w.lastStatusSuccess.Load().(time.Time) +func (w *Worker) LastStatusSuccess() *LastStatusInformation { + return w.lastStatusSuccess.Load().(*LastStatusInformation) } diff --git a/internal/servers/worker/tcp_proxy.go b/internal/servers/worker/tcp_proxy.go new file mode 100644 index 0000000000..fc61808950 --- /dev/null +++ b/internal/servers/worker/tcp_proxy.go @@ -0,0 +1,39 @@ +package worker + +import ( + "context" + "io" + "net" + "sync" + + "github.com/hashicorp/boundary/internal/gen/controller/api/services" + "nhooyr.io/websocket" +) + +func (w *Worker) handleTcpProxyV1(jobCtx context.Context, conn *websocket.Conn, jobInfo *services.ValidateSessionResponse) { + remoteConn, err := net.Dial("tcp", jobInfo.Endpoint) + if err != nil { + w.logger.Error("error dialing endpoint", "error", err, "endpoint", jobInfo.Endpoint) + conn.Close(websocket.StatusInternalError, "endpoint-dialing") + return + } + // Assert this for better Go 1.11 splice support + tcpRemoteConn := remoteConn.(*net.TCPConn) + + // Get a wrapped net.Conn so we can use io.Copy + netConn := websocket.NetConn(jobCtx, conn, websocket.MessageBinary) + + connWg := new(sync.WaitGroup) + connWg.Add(2) + go func() { + defer connWg.Done() + _, err := io.Copy(netConn, tcpRemoteConn) + w.logger.Debug("copy from client to endpoint done", "error", err) + }() + go func() { + defer connWg.Done() + _, err := io.Copy(tcpRemoteConn, netConn) + w.logger.Debug("copy from endpoint to client done", "error", err) + }() + connWg.Wait() +} diff --git a/internal/servers/worker/testing.go b/internal/servers/worker/testing.go index 5a919abc8b..1c7f6e2763 100644 --- a/internal/servers/worker/testing.go +++ b/internal/servers/worker/testing.go @@ -49,15 +49,10 @@ func (tw *TestWorker) Name() string { func (tw *TestWorker) ControllerAddrs() []string { var addrs []string - tw.w.controllerConns.Range(func(_, v interface{}) bool { - // If something is removed from the map while ranging, ignore it - if v == nil { - return true - } - c := v.(*controllerConnection) - addrs = append(addrs, c.controllerAddr) - return true - }) + lastStatus := tw.w.LastStatusSuccess() + for _, v := range lastStatus.GetControllers() { + addrs = append(addrs, v.Address) + } return addrs } @@ -68,7 +63,7 @@ func (tw *TestWorker) ProxyAddrs() []string { } for _, listener := range tw.b.Listeners { - if listener.Config.Purpose[0] == "worker-alpn-tls" { + if listener.Config.Purpose[0] == "proxy" { tcpAddr, ok := listener.Mux.Addr().(*net.TCPAddr) if !ok { tw.t.Fatal("could not parse address as a TCP addr") @@ -190,7 +185,7 @@ func NewTestWorker(t *testing.T, opts *TestWorkerOpts) *TestWorker { for _, listener := range opts.Config.Listeners { listener.RandomPort = true } - if err := tw.b.SetupListeners(nil, opts.Config.SharedConfig, []string{"worker-alpn-tls"}); err != nil { + if err := tw.b.SetupListeners(nil, opts.Config.SharedConfig, []string{"proxy"}); err != nil { t.Fatal(err) } diff --git a/internal/servers/worker/worker.go b/internal/servers/worker/worker.go index 2b3edff807..641748de58 100644 --- a/internal/servers/worker/worker.go +++ b/internal/servers/worker/worker.go @@ -6,7 +6,6 @@ import ( "fmt" "sync" "sync/atomic" - "time" "github.com/hashicorp/boundary/internal/cmd/config" "github.com/hashicorp/go-hclog" @@ -25,26 +24,31 @@ type Worker struct { baseCancel context.CancelFunc started ua.Bool - controllerConns *sync.Map + controllerConn *atomic.Value lastStatusSuccess *atomic.Value listeningAddress string controllerResolver *atomic.Value controllerResolverCleanup *atomic.Value + + jobInfoMap *sync.Map + cancellationMap *sync.Map } func New(conf *Config) (*Worker, error) { w := &Worker{ conf: conf, logger: conf.Logger.Named("worker"), - controllerConns: new(sync.Map), + controllerConn: new(atomic.Value), lastStatusSuccess: new(atomic.Value), controllerResolver: new(atomic.Value), controllerResolverCleanup: new(atomic.Value), + jobInfoMap: new(sync.Map), + cancellationMap: new(sync.Map), } - w.lastStatusSuccess.Store(time.Time{}) + w.lastStatusSuccess.Store((*LastStatusInformation)(nil)) w.started.Store(false) w.controllerResolver.Store((*manual.Resolver)(nil)) w.controllerResolverCleanup.Store(func() {}) diff --git a/internal/sessions/util.go b/internal/sessions/util.go new file mode 100644 index 0000000000..6c7d426d38 --- /dev/null +++ b/internal/sessions/util.go @@ -0,0 +1,37 @@ +package sessions + +import ( + "crypto/ed25519" + "crypto/sha256" + "errors" + "io" + + wrapping "github.com/hashicorp/go-kms-wrapping" + "github.com/hashicorp/go-kms-wrapping/wrappers/aead" + "github.com/hashicorp/go-kms-wrapping/wrappers/multiwrapper" + "golang.org/x/crypto/hkdf" +) + +// DeriveED25519Key generates a key based on the scope's sessions DEK, the +// requesting user, and the generated job ID. +func DeriveED25519Key(wrapper wrapping.Wrapper, userId, jobId string) (ed25519.PublicKey, ed25519.PrivateKey, error) { + var aeadWrapper *aead.Wrapper + switch w := wrapper.(type) { + case *multiwrapper.MultiWrapper: + raw := w.WrapperForKeyID("__base__") + var ok bool + if aeadWrapper, ok = raw.(*aead.Wrapper); !ok { + return nil, nil, errors.New("unexpected wrapper type from multiwrapper base") + } + case *aead.Wrapper: + aeadWrapper = w + default: + return nil, nil, errors.New("unknown wrapper type") + } + reader := hkdf.New(sha256.New, aeadWrapper.GetKeyBytes(), []byte(jobId), []byte(userId)) + limitedReader := &io.LimitedReader{ + R: reader, + N: 32, + } + return ed25519.GenerateKey(limitedReader) +}