|
|
|
|
@ -157,7 +157,7 @@ func (r *Repository) LookupSession(ctx context.Context, sessionId string, opt ..
|
|
|
|
|
return fmt.Errorf("lookup session: failed %w for %s", err, sessionId)
|
|
|
|
|
}
|
|
|
|
|
var err error
|
|
|
|
|
if states, err = fetchStates(ctx, read, sessionId); err != nil {
|
|
|
|
|
if states, err = fetchStates(ctx, read, sessionId, db.WithOrder("start_time desc")); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
@ -200,8 +200,87 @@ func (r *Repository) UpdateSession(ctx context.Context, s *Session, version uint
|
|
|
|
|
panic("not implemented")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *Repository) UpdateState(ctx context.Context, sessionId string, sessionVersion uint32, s Status, opt ...Option) (*Session, []*State, int, error) {
|
|
|
|
|
panic("not implemented")
|
|
|
|
|
// UpdateState will update the session's state using the session id and its
|
|
|
|
|
// version. No options are currently supported.
|
|
|
|
|
func (r *Repository) UpdateState(ctx context.Context, sessionId string, sessionVersion uint32, s Status, opt ...Option) (*Session, []*State, error) {
|
|
|
|
|
if sessionId == "" {
|
|
|
|
|
return nil, nil, fmt.Errorf("update session state: missing session id %w", db.ErrInvalidParameter)
|
|
|
|
|
}
|
|
|
|
|
if sessionVersion == 0 {
|
|
|
|
|
return nil, nil, fmt.Errorf("update session state: version cannot be zero: %w", db.ErrInvalidParameter)
|
|
|
|
|
}
|
|
|
|
|
if s == "" {
|
|
|
|
|
return nil, nil, fmt.Errorf("update session state: missing session status: %w", db.ErrInvalidParameter)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
newState, err := NewState(sessionId, s)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, nil, fmt.Errorf("update session state: %w", err)
|
|
|
|
|
}
|
|
|
|
|
ses, _, err := r.LookupSession(ctx, sessionId)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, nil, fmt.Errorf("update session state: %w", err)
|
|
|
|
|
}
|
|
|
|
|
if ses == nil {
|
|
|
|
|
return nil, nil, fmt.Errorf("update session state: unable to look up session for %s: %w", sessionId, err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
oplogWrapper, err := r.kms.GetWrapper(ctx, ses.ScopeId, kms.KeyPurposeOplog)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, nil, fmt.Errorf("update session state: unable to get oplog wrapper: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
updatedSession := allocSession()
|
|
|
|
|
var returnedStates []*State
|
|
|
|
|
_, err = r.writer.DoTx(
|
|
|
|
|
ctx,
|
|
|
|
|
db.StdRetryCnt,
|
|
|
|
|
db.ExpBackoff{},
|
|
|
|
|
func(reader db.Reader, w db.Writer) error {
|
|
|
|
|
msgs := make([]*oplog.Message, 0, 2)
|
|
|
|
|
sessionTicket, err := w.GetTicket(ses)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("unable to get ticket: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// We need to update the session version as that's the aggregate
|
|
|
|
|
updatedSession.PublicId = sessionId
|
|
|
|
|
updatedSession.Version = uint32(sessionVersion) + 1
|
|
|
|
|
var sessionOplogMsg oplog.Message
|
|
|
|
|
rowsUpdated, err := w.Update(ctx, &updatedSession, []string{"Version"}, nil, db.NewOplogMsg(&sessionOplogMsg), db.WithVersion(&sessionVersion))
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("unable to update session version: %w", err)
|
|
|
|
|
}
|
|
|
|
|
if rowsUpdated != 1 {
|
|
|
|
|
return fmt.Errorf("updated session and %d rows updated", rowsUpdated)
|
|
|
|
|
}
|
|
|
|
|
msgs = append(msgs, &sessionOplogMsg)
|
|
|
|
|
var stateOplogMsg oplog.Message
|
|
|
|
|
if err := w.Create(ctx, newState, db.NewOplogMsg(&stateOplogMsg)); err != nil {
|
|
|
|
|
return fmt.Errorf("unable to add new state: %w", err)
|
|
|
|
|
}
|
|
|
|
|
msgs = append(msgs, &stateOplogMsg)
|
|
|
|
|
|
|
|
|
|
metadata := oplog.Metadata{
|
|
|
|
|
"op-type": []string{oplog.OpType_OP_TYPE_CREATE.String()},
|
|
|
|
|
"scope-id": []string{ses.ScopeId},
|
|
|
|
|
"scope-type": []string{"project"},
|
|
|
|
|
"resource-public-id": []string{sessionId},
|
|
|
|
|
}
|
|
|
|
|
if err := w.WriteOplogEntryWith(ctx, oplogWrapper, sessionTicket, metadata, msgs); err != nil {
|
|
|
|
|
return fmt.Errorf("unable to write oplog: %w", err)
|
|
|
|
|
}
|
|
|
|
|
returnedStates, err = fetchStates(ctx, reader, sessionId, db.WithOrder("start_time desc"))
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
},
|
|
|
|
|
)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, nil, fmt.Errorf("update session state: error creating new state: %w", err)
|
|
|
|
|
}
|
|
|
|
|
return &updatedSession, returnedStates, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// list will return a listing of resources and honor the WithLimit option or the
|
|
|
|
|
|