Add monitor session job proto and controller changes

This adds a new Job type that is for generic session state monitoring.  Monitoring sessions does not include any information about specific connections.
pull/3251/head
Todd 3 years ago committed by Timothy Messier
parent 4d60f42210
commit bc48f2d0e0
No known key found for this signature in database
GPG Key ID: EFD2F184F7600572

@ -207,9 +207,23 @@ func (ws *workerServiceServer) Status(ctx context.Context, req *pbs.StatusReques
}
stateReport := make([]*session.StateReport, 0, len(req.GetJobs()))
var monitoredSessionIds []string
for _, jobStatus := range req.GetJobs() {
switch jobStatus.Job.GetType() {
case pbs.JOBTYPE_JOBTYPE_MONITOR_SESSION:
si := jobStatus.GetJob().GetMonitorSessionInfo()
if si == nil {
return nil, status.Error(codes.Internal, "Error getting monitored session info at status time")
}
switch si.Status {
case pbs.SESSIONSTATUS_SESSIONSTATUS_CANCELING,
pbs.SESSIONSTATUS_SESSIONSTATUS_TERMINATED:
// No need to see about canceling anything
continue
}
monitoredSessionIds = append(monitoredSessionIds, si.GetSessionId())
case pbs.JOBTYPE_JOBTYPE_SESSION:
si := jobStatus.GetJob().GetSessionInfo()
if si == nil {
@ -267,14 +281,44 @@ func (ws *workerServiceServer) Status(ctx context.Context, req *pbs.StatusReques
Status: session.StatusClosed.ProtoVal(),
})
}
processErr := pbs.SessionProcessingError_SESSION_PROCESSING_ERROR_UNSPECIFIED
if na.Unrecognized {
processErr = pbs.SessionProcessingError_SESSION_PROCESSING_ERROR_UNRECOGNIZED
}
ret.JobsRequests = append(ret.JobsRequests, &pbs.JobChangeRequest{
Job: &pbs.Job{
Type: pbs.JOBTYPE_JOBTYPE_SESSION,
JobInfo: &pbs.Job_SessionInfo{
SessionInfo: &pbs.SessionJobInfo{
SessionId: na.SessionId,
Status: na.Status.ProtoVal(),
Connections: connChanges,
SessionId: na.SessionId,
Status: na.Status.ProtoVal(),
Connections: connChanges,
ProcessingError: processErr,
},
},
},
RequestType: pbs.CHANGETYPE_CHANGETYPE_UPDATE_STATE,
})
}
nonActiveMonitoredSessions, err := sessRepo.CheckIfNotActive(ctx, monitoredSessionIds)
if err != nil {
return nil, status.Errorf(codes.Internal,
"Error checking if monitored jobs are no longer active: %v", err)
}
for _, na := range nonActiveMonitoredSessions {
processErr := pbs.SessionProcessingError_SESSION_PROCESSING_ERROR_UNSPECIFIED
if na.Unrecognized {
processErr = pbs.SessionProcessingError_SESSION_PROCESSING_ERROR_UNRECOGNIZED
}
ret.JobsRequests = append(ret.JobsRequests, &pbs.JobChangeRequest{
Job: &pbs.Job{
Type: pbs.JOBTYPE_JOBTYPE_MONITOR_SESSION,
JobInfo: &pbs.Job_MonitorSessionInfo{
MonitorSessionInfo: &pbs.MonitorSessionJobInfo{
SessionId: na.SessionId,
Status: na.Status.ProtoVal(),
ProcessingError: processErr,
},
},
},

@ -43,7 +43,8 @@ func TestStatus(t *testing.T) {
rw := db.New(conn)
wrapper := db.TestWrapper(t)
kms := kms.TestKms(t, conn, wrapper)
org, prj := iam.TestScopes(t, iam.TestRepo(t, conn, wrapper))
iamRepo := iam.TestRepo(t, conn, wrapper)
org, prj := iam.TestScopes(t, iamRepo)
serverRepo, _ := server.NewRepository(rw, rw, kms)
serverRepo.UpsertController(ctx, &store.Controller{
@ -83,6 +84,16 @@ func TestStatus(t *testing.T) {
worker1 := server.TestKmsWorker(t, conn, wrapper)
canceledSess := session.TestDefaultSession(t, conn, wrapper, iamRepo)
tofu := session.TestTofu(t)
canceledSess, _, err = repo.ActivateSession(ctx, canceledSess.PublicId, canceledSess.Version, tofu)
require.NoError(t, err)
canceledConn, _, err := connRepo.AuthorizeConnection(ctx, canceledSess.PublicId, worker1.PublicId)
require.NoError(t, err)
canceledSess, err = repo.CancelSession(ctx, canceledSess.PublicId, canceledSess.Version)
require.NoError(t, err)
sess := session.TestSession(t, conn, wrapper, session.ComposedOf{
UserId: uId,
HostId: h.GetPublicId(),
@ -93,7 +104,7 @@ func TestStatus(t *testing.T) {
Endpoint: "tcp://127.0.0.1:22",
ConnectionLimit: 10,
})
tofu := session.TestTofu(t)
tofu = session.TestTofu(t)
sess, _, err = repo.ActivateSession(ctx, sess.PublicId, sess.Version, tofu)
require.NoError(t, err)
require.NoError(t, err)
@ -133,6 +144,220 @@ func TestStatus(t *testing.T) {
AuthorizedDownstreamWorkers: &pbs.AuthorizedDownstreamWorkerList{},
},
},
{
name: "One unrecognized session",
wantErr: false,
req: &pbs.StatusRequest{
WorkerStatus: &pb.ServerWorkerStatus{
PublicId: worker1.GetPublicId(),
Name: worker1.GetName(),
Address: worker1.GetAddress(),
},
Jobs: []*pbs.JobStatus{
{
Job: &pbs.Job{
Type: pbs.JOBTYPE_JOBTYPE_SESSION,
JobInfo: &pbs.Job_SessionInfo{
SessionInfo: &pbs.SessionJobInfo{
SessionId: "unrecognized",
Status: pbs.SESSIONSTATUS_SESSIONSTATUS_ACTIVE,
Connections: []*pbs.Connection{
{
ConnectionId: canceledConn.PublicId,
Status: pbs.CONNECTIONSTATUS_CONNECTIONSTATUS_CONNECTED,
},
},
},
},
},
},
{
Job: &pbs.Job{
Type: pbs.JOBTYPE_JOBTYPE_SESSION,
JobInfo: &pbs.Job_SessionInfo{
SessionInfo: &pbs.SessionJobInfo{
SessionId: sess.PublicId,
Status: pbs.SESSIONSTATUS_SESSIONSTATUS_ACTIVE,
Connections: []*pbs.Connection{
{
ConnectionId: connection.PublicId,
Status: pbs.CONNECTIONSTATUS_CONNECTIONSTATUS_CONNECTED,
},
},
},
},
},
},
{
Job: &pbs.Job{
Type: pbs.JOBTYPE_JOBTYPE_MONITOR_SESSION,
JobInfo: &pbs.Job_MonitorSessionInfo{
MonitorSessionInfo: &pbs.MonitorSessionJobInfo{
SessionId: "unrecognized",
Status: pbs.SESSIONSTATUS_SESSIONSTATUS_ACTIVE,
},
},
},
},
{
Job: &pbs.Job{
Type: pbs.JOBTYPE_JOBTYPE_MONITOR_SESSION,
JobInfo: &pbs.Job_MonitorSessionInfo{
MonitorSessionInfo: &pbs.MonitorSessionJobInfo{
SessionId: sess.PublicId,
Status: pbs.SESSIONSTATUS_SESSIONSTATUS_ACTIVE,
},
},
},
},
},
},
want: &pbs.StatusResponse{
CalculatedUpstreams: []*pbs.UpstreamServer{
{
Type: pbs.UpstreamServer_TYPE_CONTROLLER,
Address: "127.0.0.1",
},
},
WorkerId: worker1.PublicId,
AuthorizedWorkers: &pbs.AuthorizedWorkerList{},
AuthorizedDownstreamWorkers: &pbs.AuthorizedDownstreamWorkerList{},
JobsRequests: []*pbs.JobChangeRequest{
{
Job: &pbs.Job{
Type: pbs.JOBTYPE_JOBTYPE_SESSION,
JobInfo: &pbs.Job_SessionInfo{
SessionInfo: &pbs.SessionJobInfo{
SessionId: "unrecognized",
Status: pbs.SESSIONSTATUS_SESSIONSTATUS_UNSPECIFIED,
ProcessingError: pbs.SessionProcessingError_SESSION_PROCESSING_ERROR_UNRECOGNIZED,
},
},
},
RequestType: pbs.CHANGETYPE_CHANGETYPE_UPDATE_STATE,
},
{
Job: &pbs.Job{
Type: pbs.JOBTYPE_JOBTYPE_MONITOR_SESSION,
JobInfo: &pbs.Job_MonitorSessionInfo{
MonitorSessionInfo: &pbs.MonitorSessionJobInfo{
SessionId: "unrecognized",
Status: pbs.SESSIONSTATUS_SESSIONSTATUS_UNSPECIFIED,
ProcessingError: pbs.SessionProcessingError_SESSION_PROCESSING_ERROR_UNRECOGNIZED,
},
},
},
RequestType: pbs.CHANGETYPE_CHANGETYPE_UPDATE_STATE,
},
},
},
},
{
name: "One Cancelled Session",
wantErr: false,
req: &pbs.StatusRequest{
WorkerStatus: &pb.ServerWorkerStatus{
PublicId: worker1.GetPublicId(),
Name: worker1.GetName(),
Address: worker1.GetAddress(),
},
Jobs: []*pbs.JobStatus{
{
Job: &pbs.Job{
Type: pbs.JOBTYPE_JOBTYPE_SESSION,
JobInfo: &pbs.Job_SessionInfo{
SessionInfo: &pbs.SessionJobInfo{
SessionId: canceledSess.PublicId,
Status: pbs.SESSIONSTATUS_SESSIONSTATUS_ACTIVE,
Connections: []*pbs.Connection{
{
ConnectionId: canceledConn.PublicId,
Status: pbs.CONNECTIONSTATUS_CONNECTIONSTATUS_CONNECTED,
},
},
},
},
},
},
{
Job: &pbs.Job{
Type: pbs.JOBTYPE_JOBTYPE_SESSION,
JobInfo: &pbs.Job_SessionInfo{
SessionInfo: &pbs.SessionJobInfo{
SessionId: sess.PublicId,
Status: pbs.SESSIONSTATUS_SESSIONSTATUS_ACTIVE,
Connections: []*pbs.Connection{
{
ConnectionId: connection.PublicId,
Status: pbs.CONNECTIONSTATUS_CONNECTIONSTATUS_CONNECTED,
},
},
},
},
},
},
{
Job: &pbs.Job{
Type: pbs.JOBTYPE_JOBTYPE_MONITOR_SESSION,
JobInfo: &pbs.Job_MonitorSessionInfo{
MonitorSessionInfo: &pbs.MonitorSessionJobInfo{
SessionId: canceledSess.PublicId,
Status: pbs.SESSIONSTATUS_SESSIONSTATUS_ACTIVE,
},
},
},
},
{
Job: &pbs.Job{
Type: pbs.JOBTYPE_JOBTYPE_MONITOR_SESSION,
JobInfo: &pbs.Job_MonitorSessionInfo{
MonitorSessionInfo: &pbs.MonitorSessionJobInfo{
SessionId: sess.PublicId,
Status: pbs.SESSIONSTATUS_SESSIONSTATUS_ACTIVE,
},
},
},
},
},
},
want: &pbs.StatusResponse{
CalculatedUpstreams: []*pbs.UpstreamServer{
{
Type: pbs.UpstreamServer_TYPE_CONTROLLER,
Address: "127.0.0.1",
},
},
WorkerId: worker1.PublicId,
AuthorizedWorkers: &pbs.AuthorizedWorkerList{},
AuthorizedDownstreamWorkers: &pbs.AuthorizedDownstreamWorkerList{},
JobsRequests: []*pbs.JobChangeRequest{
{
Job: &pbs.Job{
Type: pbs.JOBTYPE_JOBTYPE_SESSION,
JobInfo: &pbs.Job_SessionInfo{
SessionInfo: &pbs.SessionJobInfo{
SessionId: canceledSess.PublicId,
Status: pbs.SESSIONSTATUS_SESSIONSTATUS_CANCELING,
},
},
},
RequestType: pbs.CHANGETYPE_CHANGETYPE_UPDATE_STATE,
},
{
Job: &pbs.Job{
Type: pbs.JOBTYPE_JOBTYPE_MONITOR_SESSION,
JobInfo: &pbs.Job_MonitorSessionInfo{
MonitorSessionInfo: &pbs.MonitorSessionJobInfo{
SessionId: canceledSess.PublicId,
Status: pbs.SESSIONSTATUS_SESSIONSTATUS_CANCELING,
},
},
},
RequestType: pbs.CHANGETYPE_CHANGETYPE_UPDATE_STATE,
},
},
},
},
{
name: "Still Active",
wantErr: false,
@ -160,6 +385,17 @@ func TestStatus(t *testing.T) {
},
},
},
{
Job: &pbs.Job{
Type: pbs.JOBTYPE_JOBTYPE_MONITOR_SESSION,
JobInfo: &pbs.Job_MonitorSessionInfo{
MonitorSessionInfo: &pbs.MonitorSessionJobInfo{
SessionId: sess.PublicId,
Status: pbs.SESSIONSTATUS_SESSIONSTATUS_ACTIVE,
},
},
},
},
},
},
want: &pbs.StatusResponse{
@ -220,7 +456,9 @@ func TestStatus(t *testing.T) {
pbs.JobChangeRequest{},
pbs.Job{},
pbs.Job_SessionInfo{},
pbs.Job_MonitorSessionInfo{},
pbs.SessionJobInfo{},
pbs.MonitorSessionJobInfo{},
pbs.Connection{},
pbs.AuthorizedWorkerList{},
pbs.AuthorizedDownstreamWorkerList{},

@ -93,19 +93,6 @@ func Test_TestController(t *testing.T) {
ws := tc.Kms().GetExternalWrappers(testCtx)
assert.NotNil(ws.Root())
assert.NotNil(ws.WorkerAuth())
assert.NotNil(ws.Recovery())
assert.NotNil(ws.Bsr())
})
t.Run("controller-external-wrappers", func(t *testing.T) {
testCtx := context.Background()
assert := assert.New(t)
tc := NewTestController(t, nil)
defer tc.Shutdown()
ws := tc.Kms().GetExternalWrappers(testCtx)
assert.NotNil(ws.Root())
assert.NotNil(ws.WorkerAuth())
assert.NotNil(ws.Recovery())

@ -45,11 +45,24 @@ message SessionJobInfo {
string session_id = 1; // @gotags: `class:"public"`
SESSIONSTATUS status = 2;
repeated Connection connections = 3;
SessionProcessingError processing_error = 4; // @gotags: `class:"public"`
}
enum SessionProcessingError {
SESSION_PROCESSING_ERROR_UNSPECIFIED = 0;
SESSION_PROCESSING_ERROR_UNRECOGNIZED = 1;
}
message MonitorSessionJobInfo {
string session_id = 1; // @gotags: `class:"public"`
SESSIONSTATUS status = 2; // @gotags: `class:"public"`
SessionProcessingError processing_error = 3; // @gotags: `class:"public"`
}
enum JOBTYPE {
JOBTYPE_UNSPECIFIED = 0;
JOBTYPE_SESSION = 1;
JOBTYPE_MONITOR_SESSION = 2;
}
message Job {
@ -57,6 +70,7 @@ message Job {
oneof job_info {
// This value is specified when type is JOBTYPE_SESSION.
SessionJobInfo session_info = 2;
MonitorSessionJobInfo monitor_session_info = 3;
}
}

@ -392,15 +392,6 @@ update session_connection
where
public_id in (select public_id from connections_to_close)
returning public_id;
`
checkIfNotActive = `
select session_id, state
from session_state ss
where
(ss.state = 'canceling' or ss.state = 'terminated')
and ss.end_time is null
%s
;
`
deleteTerminated = `
delete from session

@ -708,48 +708,46 @@ func (r *Repository) updateState(ctx context.Context, sessionId string, sessionV
return &updatedSession, returnedStates, nil
}
// checkIfNoLongerActive checks the given sessions to see if they are in a
// CheckIfNotActive checks the given sessions to see if they are in a
// non-active state, i.e. "canceling" or "terminated" It returns a *StateReport
// object for each session that is not active, with its current status.
func (r *Repository) checkIfNoLongerActive(ctx context.Context, reportedSessions []string) ([]*StateReport, error) {
const op = "session.(Repository).checkIfNotActive"
func (r *Repository) CheckIfNotActive(ctx context.Context, reportedSessions []string) ([]*StateReport, error) {
const op = "session.(Repository).listSessionIdAndState"
notActive := make([]*StateReport, 0, len(reportedSessions))
args := make([]any, 0, len(reportedSessions))
var inClause string
if len(reportedSessions) <= 0 {
return notActive, nil
}
inClause = `and session_id in (%s)`
params := make([]string, len(reportedSessions))
for i, sessId := range reportedSessions {
params[i] = fmt.Sprintf("@%d", i)
args = append(args, sql.Named(fmt.Sprintf("%d", i), sessId))
unrecognizedSessions := make(map[string]struct{})
for _, sessId := range reportedSessions {
unrecognizedSessions[sessId] = struct{}{}
}
inClause = fmt.Sprintf(inClause, strings.Join(params, ","))
_, err := r.writer.DoTx(
ctx,
db.StdRetryCnt,
db.ExpBackoff{},
func(reader db.Reader, w db.Writer) error {
rows, err := r.reader.Query(ctx, fmt.Sprintf(checkIfNotActive, inClause), args)
var states []*State
err := r.reader.SearchWhere(ctx, &states, "end_time is null and session_id in (?)", []any{reportedSessions})
if err != nil {
return errors.Wrap(ctx, err, op)
}
defer rows.Close()
for rows.Next() {
var sessionId string
var status Status
if err := rows.Scan(&sessionId, &status); err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg("scan row failed"))
for _, s := range states {
delete(unrecognizedSessions, s.SessionId)
switch s.Status {
case StatusPending, StatusActive:
continue
case StatusCanceling, StatusTerminated:
default:
return errors.New(ctx, errors.Internal, op, fmt.Sprintf("unknown session state %q", s.Status))
}
notActive = append(notActive, &StateReport{
SessionId: sessionId,
Status: status,
SessionId: s.SessionId,
Status: s.Status,
})
}
return nil
@ -758,6 +756,14 @@ func (r *Repository) checkIfNoLongerActive(ctx context.Context, reportedSessions
if err != nil {
return nil, errors.Wrap(ctx, err, op, errors.WithMsg("error checking if sessions are no longer active"))
}
for s := range unrecognizedSessions {
notActive = append(notActive, &StateReport{
SessionId: s,
Unrecognized: true,
})
}
return notActive, nil
}

@ -1816,3 +1816,41 @@ func Test_decryptAndMaybeUpdateSession(t *testing.T) {
require.ErrorContains(t, err, "You may need to recreate your session")
})
}
func TestRepository_CheckIfNoLongerActive(t *testing.T) {
t.Parallel()
ctx := context.Background()
conn, _ := db.TestSetup(t, "postgres")
rw := db.New(conn)
wrapper := db.TestWrapper(t)
iamRepo := iam.TestRepo(t, conn, wrapper)
testKms := kms.TestKms(t, conn, wrapper)
repo, err := NewRepository(ctx, rw, rw, testKms)
require.NoError(t, err)
terminatedSession := TestDefaultSession(t, conn, wrapper, iamRepo)
terminatedSession, err = repo.CancelSession(ctx, terminatedSession.PublicId, terminatedSession.Version)
require.NoError(t, err)
n, err := repo.terminateSessionIfPossible(ctx, terminatedSession.PublicId)
require.NoError(t, err)
require.Equal(t, 1, n)
cancelingSess := TestDefaultSession(t, conn, wrapper, iamRepo)
cancelingSess, err = repo.CancelSession(ctx, cancelingSess.PublicId, cancelingSess.Version)
require.NoError(t, err)
pendingSess := TestDefaultSession(t, conn, wrapper, iamRepo)
activeSess := TestDefaultSession(t, conn, wrapper, iamRepo)
_, _, err = repo.ActivateSession(ctx, activeSess.PublicId, activeSess.Version, []byte("tofu"))
require.NoError(t, err)
unrecognizedSessionId := "unrecognized_session_id"
got, err := repo.CheckIfNotActive(ctx, []string{unrecognizedSessionId, terminatedSession.PublicId, cancelingSess.PublicId, activeSess.PublicId, pendingSess.PublicId})
assert.NoError(t, err)
var gotIds []string
for _, g := range got {
gotIds = append(gotIds, g.SessionId)
}
assert.ElementsMatch(t, gotIds, []string{unrecognizedSessionId, terminatedSession.PublicId, cancelingSess.PublicId})
}

@ -17,6 +17,8 @@ type StateReport struct {
SessionId string
Status Status
Connections []*Connection
// Unrecognized indicates that the SessionId was not found in the database.
Unrecognized bool
}
// WorkerStatusReport is a domain service function that, given a Worker's
@ -48,7 +50,7 @@ func WorkerStatusReport(ctx context.Context, repo *Repository, connRepo *Connect
merr = multierror.Append(merr, errors.New(ctx, errors.Internal, op, fmt.Sprintf("failed to update bytes up and down for worker reported connections: %v", err)))
}
notActive, err := repo.checkIfNoLongerActive(ctx, reportedSessions)
notActive, err := repo.CheckIfNotActive(ctx, reportedSessions)
if err != nil {
merr = multierror.Append(merr, errors.New(ctx, errors.Internal, op, fmt.Sprintf("Error checking session state for worker %s: %v", workerId, err)))
}

@ -185,6 +185,28 @@ func TestWorkerStatusReport(t *testing.T) {
}
},
},
{
name: "unrecognized session",
caseFn: func(t *testing.T) testCase {
worker := server.TestKmsWorker(t, conn, wrapper)
return testCase{
worker: worker,
req: []*session.StateReport{
{
SessionId: "unrecognized_session_id",
Status: session.StatusActive,
},
},
want: []*session.StateReport{
{
SessionId: "unrecognized_session_id",
Unrecognized: true,
},
},
}
},
},
{
name: "MultipleSessionsClosed",
caseFn: func(t *testing.T) testCase {

Loading…
Cancel
Save