|
|
|
|
@ -74,7 +74,6 @@ var (
|
|
|
|
|
type Client struct {
|
|
|
|
|
config *ClientConfig
|
|
|
|
|
exited bool
|
|
|
|
|
doneLogging chan struct{}
|
|
|
|
|
l sync.Mutex
|
|
|
|
|
address net.Addr
|
|
|
|
|
process *os.Process
|
|
|
|
|
@ -82,7 +81,16 @@ type Client struct {
|
|
|
|
|
protocol Protocol
|
|
|
|
|
logger hclog.Logger
|
|
|
|
|
doneCtx context.Context
|
|
|
|
|
ctxCancel context.CancelFunc
|
|
|
|
|
negotiatedVersion int
|
|
|
|
|
|
|
|
|
|
// clientWaitGroup is used to manage the lifecycle of the plugin management
|
|
|
|
|
// goroutines.
|
|
|
|
|
clientWaitGroup sync.WaitGroup
|
|
|
|
|
|
|
|
|
|
// processKilled is used for testing only, to flag when the process was
|
|
|
|
|
// forcefully killed.
|
|
|
|
|
processKilled bool
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// NegotiatedVersion returns the protocol version negotiated with the server.
|
|
|
|
|
@ -369,6 +377,14 @@ func (c *Client) Exited() bool {
|
|
|
|
|
return c.exited
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// killed is used in tests to check if a process failed to exit gracefully, and
|
|
|
|
|
// needed to be killed.
|
|
|
|
|
func (c *Client) killed() bool {
|
|
|
|
|
c.l.Lock()
|
|
|
|
|
defer c.l.Unlock()
|
|
|
|
|
return c.processKilled
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// End the executing subprocess (if it is running) and perform any cleanup
|
|
|
|
|
// tasks necessary such as capturing any remaining logs and so on.
|
|
|
|
|
//
|
|
|
|
|
@ -380,7 +396,6 @@ func (c *Client) Kill() {
|
|
|
|
|
c.l.Lock()
|
|
|
|
|
process := c.process
|
|
|
|
|
addr := c.address
|
|
|
|
|
doneCh := c.doneLogging
|
|
|
|
|
c.l.Unlock()
|
|
|
|
|
|
|
|
|
|
// If there is no process, there is nothing to kill.
|
|
|
|
|
@ -389,11 +404,14 @@ func (c *Client) Kill() {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
defer func() {
|
|
|
|
|
// Wait for the all client goroutines to finish.
|
|
|
|
|
c.clientWaitGroup.Wait()
|
|
|
|
|
|
|
|
|
|
// Make sure there is no reference to the old process after it has been
|
|
|
|
|
// killed.
|
|
|
|
|
c.l.Lock()
|
|
|
|
|
defer c.l.Unlock()
|
|
|
|
|
c.process = nil
|
|
|
|
|
c.l.Unlock()
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
// We need to check for address here. It is possible that the plugin
|
|
|
|
|
@ -416,6 +434,8 @@ func (c *Client) Kill() {
|
|
|
|
|
// kill in a moment anyways.
|
|
|
|
|
c.logger.Warn("error closing client during Kill", "err", err)
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
c.logger.Error("client", "error", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@ -424,19 +444,20 @@ func (c *Client) Kill() {
|
|
|
|
|
// doneCh which would be closed if the process exits.
|
|
|
|
|
if graceful {
|
|
|
|
|
select {
|
|
|
|
|
case <-doneCh:
|
|
|
|
|
case <-c.doneCtx.Done():
|
|
|
|
|
c.logger.Debug("plugin exited")
|
|
|
|
|
return
|
|
|
|
|
case <-time.After(1500 * time.Millisecond):
|
|
|
|
|
c.logger.Warn("plugin failed to exit gracefully")
|
|
|
|
|
case <-time.After(2 * time.Second):
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// If graceful exiting failed, just kill it
|
|
|
|
|
c.logger.Warn("plugin failed to exit gracefully")
|
|
|
|
|
process.Kill()
|
|
|
|
|
|
|
|
|
|
// Wait for the client to finish logging so we have a complete log
|
|
|
|
|
<-doneCh
|
|
|
|
|
c.l.Lock()
|
|
|
|
|
c.processKilled = true
|
|
|
|
|
c.l.Unlock()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Starts the underlying subprocess, communicating with it to negotiate
|
|
|
|
|
@ -455,7 +476,7 @@ func (c *Client) Start() (addr net.Addr, err error) {
|
|
|
|
|
|
|
|
|
|
// If one of cmd or reattach isn't set, then it is an error. We wrap
|
|
|
|
|
// this in a {} for scoping reasons, and hopeful that the escape
|
|
|
|
|
// analysis will pop the stock here.
|
|
|
|
|
// analysis will pop the stack here.
|
|
|
|
|
{
|
|
|
|
|
cmdSet := c.config.Cmd != nil
|
|
|
|
|
attachSet := c.config.Reattach != nil
|
|
|
|
|
@ -469,59 +490,8 @@ func (c *Client) Start() (addr net.Addr, err error) {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Create the logging channel for when we kill
|
|
|
|
|
c.doneLogging = make(chan struct{})
|
|
|
|
|
// Create a context for when we kill
|
|
|
|
|
var ctxCancel context.CancelFunc
|
|
|
|
|
c.doneCtx, ctxCancel = context.WithCancel(context.Background())
|
|
|
|
|
|
|
|
|
|
if c.config.Reattach != nil {
|
|
|
|
|
// Verify the process still exists. If not, then it is an error
|
|
|
|
|
p, err := os.FindProcess(c.config.Reattach.Pid)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Attempt to connect to the addr since on Unix systems FindProcess
|
|
|
|
|
// doesn't actually return an error if it can't find the process.
|
|
|
|
|
conn, err := net.Dial(
|
|
|
|
|
c.config.Reattach.Addr.Network(),
|
|
|
|
|
c.config.Reattach.Addr.String())
|
|
|
|
|
if err != nil {
|
|
|
|
|
p.Kill()
|
|
|
|
|
return nil, ErrProcessNotFound
|
|
|
|
|
}
|
|
|
|
|
conn.Close()
|
|
|
|
|
|
|
|
|
|
// Goroutine to mark exit status
|
|
|
|
|
go func(pid int) {
|
|
|
|
|
// ensure the context is cancelled when we're done
|
|
|
|
|
defer ctxCancel()
|
|
|
|
|
// Wait for the process to die
|
|
|
|
|
pidWait(pid)
|
|
|
|
|
|
|
|
|
|
// Log so we can see it
|
|
|
|
|
c.logger.Debug("reattached plugin process exited")
|
|
|
|
|
|
|
|
|
|
// Mark it
|
|
|
|
|
c.l.Lock()
|
|
|
|
|
defer c.l.Unlock()
|
|
|
|
|
c.exited = true
|
|
|
|
|
|
|
|
|
|
// Close the logging channel since that doesn't work on reattach
|
|
|
|
|
close(c.doneLogging)
|
|
|
|
|
}(p.Pid)
|
|
|
|
|
|
|
|
|
|
// Set the address and process
|
|
|
|
|
c.address = c.config.Reattach.Addr
|
|
|
|
|
c.process = p
|
|
|
|
|
c.protocol = c.config.Reattach.Protocol
|
|
|
|
|
if c.protocol == "" {
|
|
|
|
|
// Default the protocol to net/rpc for backwards compatibility
|
|
|
|
|
c.protocol = ProtocolNetRPC
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return c.address, nil
|
|
|
|
|
return c.reattach()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if c.config.VersionedPlugins == nil {
|
|
|
|
|
@ -618,11 +588,15 @@ func (c *Client) Start() (addr net.Addr, err error) {
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
// Start goroutine to wait for process to exit
|
|
|
|
|
exitCh := make(chan struct{})
|
|
|
|
|
// Create a context for when we kill
|
|
|
|
|
c.doneCtx, c.ctxCancel = context.WithCancel(context.Background())
|
|
|
|
|
|
|
|
|
|
c.clientWaitGroup.Add(1)
|
|
|
|
|
go func() {
|
|
|
|
|
// ensure the context is cancelled when we're done
|
|
|
|
|
defer ctxCancel()
|
|
|
|
|
defer c.ctxCancel()
|
|
|
|
|
|
|
|
|
|
defer c.clientWaitGroup.Done()
|
|
|
|
|
|
|
|
|
|
// get the cmd info early, since the process information will be removed
|
|
|
|
|
// in Kill.
|
|
|
|
|
@ -645,9 +619,6 @@ func (c *Client) Start() (addr net.Addr, err error) {
|
|
|
|
|
c.logger.Debug("plugin process exited", debugMsgArgs...)
|
|
|
|
|
os.Stderr.Sync()
|
|
|
|
|
|
|
|
|
|
// Mark that we exited
|
|
|
|
|
close(exitCh)
|
|
|
|
|
|
|
|
|
|
// Set that we exited, which takes a lock
|
|
|
|
|
c.l.Lock()
|
|
|
|
|
defer c.l.Unlock()
|
|
|
|
|
@ -655,12 +626,16 @@ func (c *Client) Start() (addr net.Addr, err error) {
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
// Start goroutine that logs the stderr
|
|
|
|
|
c.clientWaitGroup.Add(1)
|
|
|
|
|
// logStderr calls Done()
|
|
|
|
|
go c.logStderr(cmdStderr)
|
|
|
|
|
|
|
|
|
|
// Start a goroutine that is going to be reading the lines
|
|
|
|
|
// out of stdout
|
|
|
|
|
linesCh := make(chan string)
|
|
|
|
|
c.clientWaitGroup.Add(1)
|
|
|
|
|
go func() {
|
|
|
|
|
defer c.clientWaitGroup.Done()
|
|
|
|
|
defer close(linesCh)
|
|
|
|
|
|
|
|
|
|
scanner := bufio.NewScanner(cmdStdout)
|
|
|
|
|
@ -671,8 +646,12 @@ func (c *Client) Start() (addr net.Addr, err error) {
|
|
|
|
|
|
|
|
|
|
// Make sure after we exit we read the lines from stdout forever
|
|
|
|
|
// so they don't block since it is a pipe.
|
|
|
|
|
// The scanner goroutine above will close this, but track it with a wait
|
|
|
|
|
// group for completeness.
|
|
|
|
|
c.clientWaitGroup.Add(1)
|
|
|
|
|
defer func() {
|
|
|
|
|
go func() {
|
|
|
|
|
defer c.clientWaitGroup.Done()
|
|
|
|
|
for range linesCh {
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
@ -686,7 +665,7 @@ func (c *Client) Start() (addr net.Addr, err error) {
|
|
|
|
|
select {
|
|
|
|
|
case <-timeout:
|
|
|
|
|
err = errors.New("timeout while waiting for plugin to start")
|
|
|
|
|
case <-exitCh:
|
|
|
|
|
case <-c.doneCtx.Done():
|
|
|
|
|
err = errors.New("plugin exited before we could connect")
|
|
|
|
|
case line := <-linesCh:
|
|
|
|
|
// Trim the line and split by "|" in order to get the parts of
|
|
|
|
|
@ -797,6 +776,59 @@ func (c *Client) loadServerCert(cert string) error {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *Client) reattach() (net.Addr, error) {
|
|
|
|
|
// Verify the process still exists. If not, then it is an error
|
|
|
|
|
p, err := os.FindProcess(c.config.Reattach.Pid)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Attempt to connect to the addr since on Unix systems FindProcess
|
|
|
|
|
// doesn't actually return an error if it can't find the process.
|
|
|
|
|
conn, err := net.Dial(
|
|
|
|
|
c.config.Reattach.Addr.Network(),
|
|
|
|
|
c.config.Reattach.Addr.String())
|
|
|
|
|
if err != nil {
|
|
|
|
|
p.Kill()
|
|
|
|
|
return nil, ErrProcessNotFound
|
|
|
|
|
}
|
|
|
|
|
conn.Close()
|
|
|
|
|
|
|
|
|
|
// Create a context for when we kill
|
|
|
|
|
c.doneCtx, c.ctxCancel = context.WithCancel(context.Background())
|
|
|
|
|
|
|
|
|
|
c.clientWaitGroup.Add(1)
|
|
|
|
|
// Goroutine to mark exit status
|
|
|
|
|
go func(pid int) {
|
|
|
|
|
defer c.clientWaitGroup.Done()
|
|
|
|
|
|
|
|
|
|
// ensure the context is cancelled when we're done
|
|
|
|
|
defer c.ctxCancel()
|
|
|
|
|
|
|
|
|
|
// Wait for the process to die
|
|
|
|
|
pidWait(pid)
|
|
|
|
|
|
|
|
|
|
// Log so we can see it
|
|
|
|
|
c.logger.Debug("reattached plugin process exited")
|
|
|
|
|
|
|
|
|
|
// Mark it
|
|
|
|
|
c.l.Lock()
|
|
|
|
|
defer c.l.Unlock()
|
|
|
|
|
c.exited = true
|
|
|
|
|
}(p.Pid)
|
|
|
|
|
|
|
|
|
|
// Set the address and process
|
|
|
|
|
c.address = c.config.Reattach.Addr
|
|
|
|
|
c.process = p
|
|
|
|
|
c.protocol = c.config.Reattach.Protocol
|
|
|
|
|
if c.protocol == "" {
|
|
|
|
|
// Default the protocol to net/rpc for backwards compatibility
|
|
|
|
|
c.protocol = ProtocolNetRPC
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return c.address, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// checkProtoVersion returns the negotiated version and PluginSet.
|
|
|
|
|
// This returns an error if the server returned an incompatible protocol
|
|
|
|
|
// version, or an invalid handshake response.
|
|
|
|
|
@ -902,7 +934,7 @@ func (c *Client) dialer(_ string, timeout time.Duration) (net.Conn, error) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *Client) logStderr(r io.Reader) {
|
|
|
|
|
defer close(c.doneLogging)
|
|
|
|
|
defer c.clientWaitGroup.Done()
|
|
|
|
|
|
|
|
|
|
scanner := bufio.NewScanner(r)
|
|
|
|
|
l := c.logger.Named(filepath.Base(c.config.Cmd.Path))
|
|
|
|
|
|