Add Closed error code and rename recorder cache to manager

pull/3251/head
Todd 3 years ago committed by Timothy Messier
parent dbbcdb72bf
commit feb87e717e
No known key found for this signature in database
GPG Key ID: EFD2F184F7600572

@ -294,7 +294,7 @@ func (w *Worker) handleProxy(listenerCfg *listenerutil.ListenerConfig, sessionMa
conn.Close(proxyHandlers.WebsocketStatusProtocolSetupError, "error getting decryption function")
event.WriteError(ctx, op, err)
}
runProxy, err := handleProxyFn(ctx, decryptFn, cc, pDialer, acResp.GetConnectionId(), protocolCtx, w.recorderCache)
runProxy, err := handleProxyFn(ctx, decryptFn, cc, pDialer, acResp.GetConnectionId(), protocolCtx, w.recorderManager)
if err != nil {
conn.Close(proxyHandlers.WebsocketStatusProtocolSetupError, "unable to setup proxying")
event.WriteError(ctx, op, err)

@ -75,9 +75,9 @@ type downstreamers interface {
RootId() string
}
// recorderCache updates the status updates with relevant recording
// recorderManager updates the status updates with relevant recording
// information
type recorderCache any
type recorderManager any
// reverseConnReceiverFactory provides a simple factory which a Worker can use to
// create its reverseConnReceiver
@ -85,7 +85,7 @@ var reverseConnReceiverFactory func() reverseConnReceiver
var recordingStorageFactory func(ctx context.Context, path string, plgClients map[string]plgpb.StoragePluginServiceClient, enableLoopback bool) (storage.RecordingStorage, error)
var recorderCacheFactory func() recorderCache
var recorderManagerFactory func(*Worker) (recorderManager, error)
var initializeReverseGrpcClientCollectors = noopInitializePromCollectors
@ -117,7 +117,7 @@ type Worker struct {
sessionManager session.Manager
recorderCache recorderCache
recorderManager recorderManager
controllerStatusConn *atomic.Value
everAuthenticated *ua.Uint32
@ -204,10 +204,6 @@ func New(ctx context.Context, conf *Config) (*Worker, error) {
w.downstreamReceiver = reverseConnReceiverFactory()
}
if recorderCacheFactory != nil {
w.recorderCache = recorderCacheFactory()
}
w.lastStatusSuccess.Store((*LastStatusInformation)(nil))
scheme := strconv.FormatInt(time.Now().UnixNano(), 36)
controllerResolver := manual.NewBuilderWithScheme(scheme)
@ -291,6 +287,14 @@ func New(ctx context.Context, conf *Config) (*Worker, error) {
// FIXME: This is really ugly, but works.
session.CloseCallTimeout = w.statusCallTimeoutDuration
if recorderManagerFactory != nil {
var err error
w.recorderManager, err = recorderManagerFactory(w)
if err != nil {
return nil, fmt.Errorf("error calling recorderManagerFactory: %w", err)
}
}
var listenerCount int
for i := range conf.Listeners {
l := conf.Listeners[i]

@ -64,6 +64,7 @@ const (
StorageContainerWriteOnly = 132 // StorageContainerWriteOnly represents an error when a container is write only and a read operation is attempted on it
WorkerNotFoundForRequest = 133 // WorkerNotFoundForRequest represents an error when no appropriate worker is found which meets the conditions required to handle a request
Closed = 134 // Closed represents an error when an operation cannot be completed because the thing being operated on is closed
AuthAttemptExpired Code = 198 // AuthAttemptExpired represents an expired authentication attempt
AuthMethodInactive Code = 199 // AuthMethodInactive represents an error that means the auth method is not active.

@ -395,6 +395,11 @@ func TestCode_Both_String_Info(t *testing.T) {
c: WorkerNotFoundForRequest,
want: WorkerNotFoundForRequest,
},
{
name: "Closed",
c: Closed,
want: Closed,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {

@ -331,4 +331,8 @@ var errorCodeInfo = map[Code]Info{
Message: "container is write only",
Kind: State,
},
Closed: {
Message: "closed",
Kind: State,
},
}

Loading…
Cancel
Save