You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
boundary/internal/db/migrations/postgres/51_connection.up.sql

379 lines
13 KiB

begin;
/*
┌────────────────┐
│ session │
├────────────────┤
│ public_id (pk) │
│ │
│ │
└────────────────┘
▲fk1 ┼
╱│╲ ┌──────────────────────────────────────┐
┌───────────────────────┐ │ session_connection_closed_reason_enm │
│ session_connection │╲ fk2▶ ├──────────────────────────────────────┤
├───────────────────────┤─○───○┼│ name │
│ public_id (pk) │╱ └──────────────────────────────────────┘
│ session_id (fk1) │
│ closed_reason (fk2) │
└───────────────────────┘
▲fk1 ┼
╱│╲
┌──────────────────────────────────────────┐
│ session_connection_state │
├──────────────────────────────────────────┤┼○┐
│ connection_id (pk,fk1,fk2,unq1,unq2) │ │◀fk2
│ state (fk3) │ │
│ previous_end_time (fk2,unq1) │┼○┘
│ start_time (pk) │
│ end_time (unq2) │
└──────────────────────────────────────────┘
╲│╱
▼fk3 ┼
┌───────────────────────────────┐
│ session_connection_state_enm │
├───────────────────────────────┤
│ name │
└───────────────────────────────┘
*/
create table session_connection_closed_reason_enm (
name text primary key
constraint only_predefined_session_connection_closed_reasons_allowed
check (
name in (
'unknown',
'timed out',
'closed by end-user',
'canceled',
'network error',
'system error'
)
)
);
insert into session_connection_closed_reason_enm (name)
values
('unknown'),
('timed out'),
('closed by end-user'),
('canceled'),
('network error'),
('system error');
-- A session connection is one connection proxied by a worker from a client to
-- a endpoint for a session. The client initiates the connection to the worker
-- and the worker initiates the connection to the endpoint.
-- A session can have zero or more session connections.
create table session_connection (
public_id wt_public_id primary key,
session_id wt_public_id not null
references session (public_id)
on delete cascade
on update cascade,
-- the client_tcp_address is the network address of the client which initiated
-- the connection to a worker
client_tcp_address inet, -- maybe null on insert
-- the client_tcp_port is the network port at the address of the client the
-- worker proxied a connection for the user
client_tcp_port integer -- maybe null on insert
constraint client_tcp_port_must_be_greater_than_0
check(client_tcp_port > 0)
constraint client_tcp_port_must_less_than_or_equal_to_65535
check(client_tcp_port <= 65535),
-- the endpoint_tcp_address is the network address of the endpoint which the
-- worker initiated the connection to, for the user
endpoint_tcp_address inet, -- maybe be null on insert
-- the endpoint_tcp_port is the network port at the address of the endpoint the
-- worker proxied a connection to, for the user
endpoint_tcp_port integer -- maybe null on insert
constraint endpoint_tcp_port_must_be_greater_than_0
check(endpoint_tcp_port > 0)
constraint endpoint_tcp_port_must_less_than_or_equal_to_65535
check(endpoint_tcp_port <= 65535),
-- the total number of bytes received by the worker from the client and sent
-- to the endpoint for this connection
bytes_up bigint -- can be null
constraint bytes_up_must_be_null_or_a_non_negative_number
check (
bytes_up is null
or
bytes_up >= 0
),
-- the total number of bytes received by the worker from the endpoint and sent
-- to the client for this connection
bytes_down bigint -- can be null
constraint bytes_down_must_be_null_or_a_non_negative_number
check (
bytes_down is null
or
bytes_down >= 0
),
closed_reason text
references session_connection_closed_reason_enm (name)
on delete restrict
on update cascade,
version wt_version,
create_time wt_timestamp,
update_time wt_timestamp
);
create trigger
immutable_columns
before
update on session_connection
for each row execute procedure immutable_columns('public_id', 'session_id', 'create_time');
create trigger
update_version_column
after update on session_connection
for each row execute procedure update_version_column();
create trigger
update_time_column
before update on session_connection
for each row execute procedure update_time_column();
create trigger
default_create_time_column
before
insert on session_connection
for each row execute procedure default_create_time();
-- insert_new_connection_state() is used in an after insert trigger on the
-- session_connection table. it will insert a state of "authorized" in
-- session_connection_state for the new session connection.
create or replace function
insert_new_connection_state()
returns trigger
as $$
begin
insert into session_connection_state (connection_id, state)
values
(new.public_id, 'authorized');
return new;
end;
$$ language plpgsql;
create trigger
insert_new_connection_state
after insert on session_connection
for each row execute procedure insert_new_connection_state();
-- update_connection_state_on_closed_reason() is used in an update trigger on the
-- session_connection table. it will insert a state of "closed" in
-- session_connection_state for the closed session connection.
create or replace function
update_connection_state_on_closed_reason()
returns trigger
as $$
begin
if new.closed_reason is not null then
-- check to see if there's a closed state already, before inserting a
-- new one.
perform from
session_connection_state cs
where
cs.connection_id = new.public_id and
cs.state = 'closed';
if not found then
insert into session_connection_state (connection_id, state)
values
(new.public_id, 'closed');
end if;
-- whenever we close a connection, we want to terminate the session if
-- possible.
perform terminate_session_if_possible(new.session_id);
end if;
return new;
end;
$$ language plpgsql;
create trigger
update_connection_state_on_closed_reason
after update of closed_reason on session_connection
for each row execute procedure update_connection_state_on_closed_reason();
create table session_connection_state_enm (
name text primary key
constraint only_predefined_session_connection_states_allowed
check (
name in ('authorized', 'connected', 'closed')
)
);
insert into session_connection_state_enm (name)
values
('authorized'),
('connected'),
('closed');
create table session_connection_state (
connection_id wt_public_id not null
references session_connection (public_id)
on delete cascade
on update cascade,
state text not null
references session_connection_state_enm(name)
on delete restrict
on update cascade,
previous_end_time timestamp with time zone, -- fk2 -- null means first state
start_time timestamp with time zone default current_timestamp not null,
constraint previous_end_time_and_start_time_in_sequence
check (previous_end_time <= start_time),
end_time timestamp with time zone, -- null means unfinished current state
constraint start_and_end_times_in_sequence
check (start_time <= end_time),
constraint end_times_in_sequence
check (previous_end_time <> end_time),
primary key (connection_id, start_time),
unique (connection_id, previous_end_time), -- null means first state
unique (connection_id, end_time), -- one null current state
foreign key (connection_id, previous_end_time) -- self-reference
references session_connection_state (connection_id, end_time)
);
create trigger
immutable_columns
before
update on session_connection_state
for each row execute procedure immutable_columns('connection_id', 'state', 'start_time', 'previous_end_time');
create or replace function
insert_session_connection_state()
returns trigger
as $$
begin
update session_connection_state
set end_time = now()
where connection_id = new.connection_id
and end_time is null;
if not found then
new.previous_end_time = null;
new.start_time = now();
new.end_time = null;
return new;
end if;
new.previous_end_time = now();
new.start_time = now();
new.end_time = null;
return new;
end;
$$ language plpgsql;
create trigger insert_session_connection_state before insert on session_connection_state
for each row execute procedure insert_session_connection_state();
-- terminate_session_if_possible takes a session id and terminates the session
-- if the following conditions are met:
-- * the session is expired and all its connections are closed.
-- * the session is canceling and all its connections are closed
-- * the session has exhausted its connection limit and all its connections
-- are closed.
--
-- Note: this function should align closely with the domain function
-- TerminateCompletedSessions
create or replace function
terminate_session_if_possible(terminate_session_id text)
returns void
as $$
begin
-- is terminate_session_id in a canceling state
with canceling_session(session_id) as
(
select
session_id
from
session_state ss
where
ss.session_id = terminate_session_id and
ss.state = 'canceling' and
ss.end_time is null
)
update session us
set termination_reason =
case
-- timed out sessions
when now() > us.expiration_time then 'timed out'
-- canceling sessions
when us.public_id in(
select
session_id
from
canceling_session cs
where
us.public_id = cs.session_id
) then 'canceled'
-- default: session connection limit reached.
else 'connection limit'
end
where
-- limit update to just the terminating_session_id
us.public_id = terminate_session_id and
termination_reason is null and
-- session expired or connection limit reached
(
-- expired sessions...
now() > us.expiration_time or
-- connection limit reached...
(
-- handle unlimited connections...
connection_limit != -1 and
(
select count (*)
from session_connection sc
where
sc.session_id = us.public_id
) >= connection_limit
) or
-- canceled sessions
us.public_id in (
select
session_id
from
canceling_session cs
where
us.public_id = cs.session_id
)
) and
-- make sure there are no existing connections
us.public_id not in (
select
session_id
from
session_connection
where public_id in (
select
connection_id
from
session_connection_state
where
state != 'closed' and
end_time is null
)
);
end;
$$ language plpgsql;
commit;