You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
boundary/internal/db/migrations/postgres/69_wh_session_facts.up.sql

274 lines
9.3 KiB

begin;
-- wh_rollup_connections calculates the aggregate values from
-- wh_session_connection_accumulating_fact for p_session_id and updates
-- wh_session_accumulating_fact for p_session_id with those values.
create or replace function wh_rollup_connections(p_session_id wt_public_id)
returns void
as $$
declare
session_row wh_session_accumulating_fact%rowtype;
begin
with
session_totals (session_id, total_connection_count, total_bytes_up, total_bytes_down) as (
select session_id,
sum(connection_count),
sum(bytes_up),
sum(bytes_down)
from wh_session_connection_accumulating_fact
where session_id = p_session_id
group by session_id
)
update wh_session_accumulating_fact
set total_connection_count = session_totals.total_connection_count,
total_bytes_up = session_totals.total_bytes_up,
total_bytes_down = session_totals.total_bytes_down
from session_totals
where wh_session_accumulating_fact.session_id = session_totals.session_id
returning wh_session_accumulating_fact.* into strict session_row;
end;
$$ language plpgsql;
--
-- Session triggers
--
-- wh_insert_session returns an after insert trigger for the session table
-- which inserts a row in wh_session_accumulating_fact for the new session.
-- wh_insert_session also calls the wh_upsert_host and wh_upsert_user
-- functions which can result in new rows in wh_host_dimension and
-- wh_user_dimension respectively.
create or replace function wh_insert_session()
returns trigger
as $$
declare
new_row wh_session_accumulating_fact%rowtype;
begin
with
pending_timestamp (date_dim_id, time_dim_id, ts) as (
select wh_date_id(start_time), wh_time_id(start_time), start_time
from session_state
where session_id = new.public_id
and state = 'pending'
)
insert into wh_session_accumulating_fact (
session_id,
auth_token_id,
host_id,
user_id,
session_pending_date_id,
session_pending_time_id,
session_pending_time
)
select new.public_id,
new.auth_token_id,
wh_upsert_host(new.host_id, new.host_set_id, new.target_id),
wh_upsert_user(new.user_id, new.auth_token_id),
pending_timestamp.date_dim_id,
pending_timestamp.time_dim_id,
pending_timestamp.ts
from pending_timestamp
returning * into strict new_row;
return null;
end;
$$ language plpgsql;
create trigger wh_insert_session
after insert on session
for each row
execute function wh_insert_session();
--
-- Session Connection triggers
--
-- wh_insert_session_connection returns an after insert trigger for the
-- session_connection table which inserts a row in
-- wh_session_connection_accumulating_fact for the new session connection.
-- wh_insert_session_connection also calls wh_rollup_connections which can
-- result in updates to wh_session_accumulating_fact.
create or replace function wh_insert_session_connection()
returns trigger
as $$
declare
new_row wh_session_connection_accumulating_fact%rowtype;
begin
with
authorized_timestamp (date_dim_id, time_dim_id, ts) as (
select wh_date_id(start_time), wh_time_id(start_time), start_time
from session_connection_state
where connection_id = new.public_id
and state = 'authorized'
),
session_dimension (host_dim_id, user_dim_id) as (
select host_id, user_id
from wh_session_accumulating_fact
where session_id = new.session_id
)
insert into wh_session_connection_accumulating_fact (
connection_id,
session_id,
host_id,
user_id,
connection_authorized_date_id,
connection_authorized_time_id,
connection_authorized_time,
client_tcp_address,
client_tcp_port_number,
endpoint_tcp_address,
endpoint_tcp_port_number,
bytes_up,
bytes_down
)
select new.public_id,
new.session_id,
session_dimension.host_dim_id,
session_dimension.user_dim_id,
authorized_timestamp.date_dim_id,
authorized_timestamp.time_dim_id,
authorized_timestamp.ts,
new.client_tcp_address,
new.client_tcp_port,
new.endpoint_tcp_address,
new.endpoint_tcp_port,
new.bytes_up,
new.bytes_down
from authorized_timestamp,
session_dimension
returning * into strict new_row;
perform wh_rollup_connections(new.session_id);
return null;
end;
$$ language plpgsql;
create trigger wh_insert_session_connection
after insert on session_connection
for each row
execute function wh_insert_session_connection();
-- wh_update_session_connection returns an after update trigger for the
-- session_connection table which updates a row in
-- wh_session_connection_accumulating_fact for the session connection.
-- wh_update_session_connection also calls wh_rollup_connections which can
-- result in updates to wh_session_accumulating_fact.
create or replace function wh_update_session_connection()
returns trigger
as $$
declare
updated_row wh_session_connection_accumulating_fact%rowtype;
begin
update wh_session_connection_accumulating_fact
set client_tcp_address = new.client_tcp_address,
client_tcp_port_number = new.client_tcp_port,
endpoint_tcp_address = new.endpoint_tcp_address,
endpoint_tcp_port_number = new.endpoint_tcp_port,
bytes_up = new.bytes_up,
bytes_down = new.bytes_down
where connection_id = new.public_id
returning * into strict updated_row;
perform wh_rollup_connections(new.session_id);
return null;
end;
$$ language plpgsql;
create trigger wh_update_session_connection
after update on session_connection
for each row
execute function wh_update_session_connection();
--
-- Session State trigger
--
-- wh_insert_session_state returns an after insert trigger for the
-- session_state table which updates wh_session_accumulating_fact.
create or replace function wh_insert_session_state()
returns trigger
as $$
declare
date_col text;
time_col text;
ts_col text;
q text;
session_row wh_session_accumulating_fact%rowtype;
begin
if new.state = 'pending' then
-- The pending state is the first state which is handled by the
-- wh_insert_session trigger. The update statement in this trigger will
-- fail for the pending state because the row for the session has not yet
-- been inserted into the wh_session_accumulating_fact table.
return null;
end if;
date_col = 'session_' || new.state || '_date_id';
time_col = 'session_' || new.state || '_time_id';
ts_col = 'session_' || new.state || '_time';
q = format('update wh_session_accumulating_fact
set (%I, %I, %I) = (select wh_date_id(%L), wh_time_id(%L), %L::timestamptz)
where session_id = %L
returning *',
date_col, time_col, ts_col,
new.start_time, new.start_time, new.start_time,
new.session_id);
execute q into strict session_row;
return null;
end;
$$ language plpgsql;
create trigger wh_insert_session_state
after insert on session_state
for each row
execute function wh_insert_session_state();
--
-- Session Connection State trigger
--
-- wh_insert_session_connection_state returns an after insert trigger for the
-- session_connection_state table which updates
-- wh_session_connection_accumulating_fact.
create or replace function wh_insert_session_connection_state()
returns trigger
as $$
declare
date_col text;
time_col text;
ts_col text;
q text;
connection_row wh_session_connection_accumulating_fact%rowtype;
begin
if new.state = 'authorized' then
-- The authorized state is the first state which is handled by the
-- wh_insert_session_connection trigger. The update statement in this
-- trigger will fail for the authorized state because the row for the
-- session connection has not yet been inserted into the
-- wh_session_connection_accumulating_fact table.
return null;
end if;
date_col = 'connection_' || new.state || '_date_id';
time_col = 'connection_' || new.state || '_time_id';
ts_col = 'connection_' || new.state || '_time';
q = format('update wh_session_connection_accumulating_fact
set (%I, %I, %I) = (select wh_date_id(%L), wh_time_id(%L), %L::timestamptz)
where connection_id = %L
returning *',
date_col, time_col, ts_col,
new.start_time, new.start_time, new.start_time,
new.connection_id);
execute q into strict connection_row;
return null;
end;
$$ language plpgsql;
create trigger wh_insert_session_connection_state
after insert on session_connection_state
for each row
execute function wh_insert_session_connection_state();
commit;