pagination: refactor for new pagination design

The new design defines 4 distinct functions, List, ListPage, ListRefresh
and ListRefreshPage. The first two are used during the initial
pagination, and the other two are used during refreshing.
pull/4202/head
Johan Brandhorst-Satzkorn 3 years ago
parent 3019038eab
commit 0657276a0e

@ -10,8 +10,8 @@ import (
"github.com/hashicorp/boundary/internal/boundary"
"github.com/hashicorp/boundary/internal/db/timestamp"
"github.com/hashicorp/boundary/internal/listtoken"
"github.com/hashicorp/boundary/internal/pagination"
"github.com/hashicorp/boundary/internal/refreshtoken"
"github.com/hashicorp/boundary/internal/types/resource"
)
@ -30,6 +30,10 @@ func (e *ExampleResource) GetUpdateTime() *timestamp.Timestamp {
return timestamp.New(time.Now().Add(-10 * time.Hour))
}
func (e *ExampleResource) GetCreateTime() *timestamp.Timestamp {
return timestamp.New(time.Now().Add(-20 * time.Hour))
}
func (e *ExampleResource) GetResourceType() resource.Type {
return resource.Unknown
}
@ -42,7 +46,7 @@ func ExampleList() {
// include it in the final list.
return item.Value < 5, nil
}
listItemsFunc := func(ctx context.Context, prevPageLast *ExampleResource, limit int) ([]*ExampleResource, error) {
listItemsFunc := func(ctx context.Context, prevPageLast *ExampleResource, limit int) ([]*ExampleResource, time.Time, error) {
// Do the listing of the resource, generally using a
// repository method such as target.(*Repository).listTargets.
// Use the input to set up any options, for example a limit
@ -58,7 +62,7 @@ func ExampleList() {
// next page.
// opts := target.Option{
// target.WithLimit(limit),
// target.WithStartPageAfterItem(prevPageLast.GetPublicId(), prevPageLast.GetUpdateTime()),
// target.WithStartPageAfterItem(prevPageLast),
// }
}
// Example result from the repository
@ -74,7 +78,7 @@ func ExampleList() {
{nil, 8},
{nil, 9},
{nil, 10},
}, nil
}, time.Now(), nil
}
estimatedCountFunc := func(ctx context.Context) (int, error) {
// Get an estimate from the database of the total number
@ -112,17 +116,222 @@ func ExampleList() {
// There are an estimated 1000 total items available
}
func ExampleListPage() {
grantsHash := []byte("hash-of-grants") // Acquired from authorization logic
pageSize := 10 // From request or service default
listToken, err := listtoken.NewPagination( // Normally from incoming request
context.Background(),
time.Now(),
resource.Unknown,
grantsHash,
"ttcp_1234567890",
time.Now().Add(-time.Hour),
)
filterItemFunc := func(ctx context.Context, item *ExampleResource) (bool, error) {
// Inspect item to determine whether we want to
// include it in the final list.
return item.Value < 5, nil
}
listItemsFunc := func(ctx context.Context, prevPageLast *ExampleResource, limit int) ([]*ExampleResource, time.Time, error) {
// Do the listing of the resource, generally using a
// repository method such as target.(*Repository).listTargets.
// Use the input to set up any options, for example a limit
// or a starting point.
if prevPageLast == nil {
// No previous page item means this is the first list request.
// List using limit.
// opts := target.Option{
// target.WithLimit(limit),
// }
} else {
// Use the previous page last item to start pagination from the
// next page.
// opts := target.Option{
// target.WithLimit(limit),
// target.WithStartPageAfterItem(prevPageLast),
// }
}
// Example result from the repository
return []*ExampleResource{
{nil, 0},
{nil, 1},
{nil, 2},
{nil, 3},
{nil, 4},
{nil, 5},
{nil, 6},
{nil, 7},
{nil, 8},
{nil, 9},
{nil, 10},
}, time.Now(), nil
}
estimatedCountFunc := func(ctx context.Context) (int, error) {
// Get an estimate from the database of the total number
// of entries for this resource, usually using some
// repository method.
return 1000, nil
}
resp, err := pagination.ListPage(
context.Background(),
grantsHash,
pageSize,
filterItemFunc,
listItemsFunc,
estimatedCountFunc,
listToken,
)
if err != nil {
fmt.Println("failed to paginate", err)
return
}
fmt.Println("Got results:")
for _, item := range resp.Items {
fmt.Printf("\tValue: %d\n", item.Value)
}
if resp.CompleteListing {
fmt.Println("Listing was complete")
} else {
fmt.Println("Listing was not complete")
}
fmt.Println("There are an estimated", resp.EstimatedItemCount, "total items available")
// Output: Got results:
// Value: 0
// Value: 1
// Value: 2
// Value: 3
// Value: 4
// Value: 0
// Value: 1
// Value: 2
// Value: 3
// Value: 4
// Listing was not complete
// There are an estimated 1000 total items available
}
func ExampleListRefresh() {
grantsHash := []byte("hash-of-grants") // Acquired from authorization logic
pageSize := 10 // From request or service default
refreshToken, err := refreshtoken.New( // Normally from incoming request
grantsHash := []byte("hash-of-grants") // Acquired from authorization logic
pageSize := 10 // From request or service default
listToken, err := listtoken.NewStartRefresh( // Normally from incoming request
context.Background(),
time.Now(),
resource.Unknown,
grantsHash,
time.Now().Add(-2*time.Hour),
time.Now().Add(-time.Hour),
)
if err != nil {
fmt.Println("failed to paginate", err)
return
}
filterItemFunc := func(ctx context.Context, item *ExampleResource) (bool, error) {
// Inspect item to determine whether we want to
// include it in the final list.
return item.Value < 5, nil
}
listItemsFunc := func(ctx context.Context, prevPageLast *ExampleResource, limit int) ([]*ExampleResource, time.Time, error) {
// Do the listing of the resource, generally using a
// repository method such as target.(*Repository).listTargets.
// Use the input to set up any options, for example a limit
// or a starting point.
if prevPageLast == nil {
// No previous page item means use the values from the refresh token.
// lastItem, _ := tok.LastItem()
// opts := target.Option{
// target.WithLimit(limit),
// target.WithStartPageAfterItem(lastItem),
// }
} else {
// Use the previous page last item to start pagination from the next page.
// opts := target.Option{
// target.WithLimit(limit),
// target.WithStartPageAfterItem(prevPageLast),
// }
}
// Example result from the repository
return []*ExampleResource{
{nil, 0},
{nil, 1},
{nil, 2},
{nil, 3},
{nil, 4},
{nil, 5},
{nil, 6},
{nil, 7},
{nil, 8},
{nil, 9},
{nil, 10},
}, time.Now(), nil
}
estimatedCountFunc := func(ctx context.Context) (int, error) {
// Get an estimate from the database of the total number
// of entries for this resource, usually using some
// repository method.
return 1000, nil
}
deletedIdsFunc := func(ctx context.Context, since time.Time) ([]string, time.Time, error) {
// Return IDs of resources that have been deleted since the provided timestamp.
return []string{"er_0123456789"}, time.Now(), nil
}
resp, err := pagination.ListRefresh(
context.Background(),
grantsHash,
pageSize,
filterItemFunc,
listItemsFunc,
estimatedCountFunc,
deletedIdsFunc,
listToken,
)
if err != nil {
fmt.Println("failed to paginate", err)
return
}
fmt.Println("Got results:")
for _, item := range resp.Items {
fmt.Printf("\tValue: %d\n", item.Value)
}
if resp.CompleteListing {
fmt.Println("Listing was complete")
} else {
fmt.Println("Listing was not complete")
}
fmt.Println("There are an estimated", resp.EstimatedItemCount, "total items available")
fmt.Println("The following resources have been deleted since we last saw them:")
for _, id := range resp.DeletedIds {
fmt.Println("\t" + id)
}
// Output: Got results:
// Value: 0
// Value: 1
// Value: 2
// Value: 3
// Value: 4
// Value: 0
// Value: 1
// Value: 2
// Value: 3
// Value: 4
// Listing was not complete
// There are an estimated 1000 total items available
// The following resources have been deleted since we last saw them:
// er_0123456789
}
func ExampleListRefreshPage() {
grantsHash := []byte("hash-of-grants") // Acquired from authorization logic
pageSize := 10 // From request or service default
listToken, err := listtoken.NewRefresh( // Normally from incoming request
context.Background(),
time.Now(),
resource.Unknown,
grantsHash,
"er_1234567890",
time.Now().Add(-time.Hour),
time.Now().Add(-2*time.Hour),
time.Now().Add(-3*time.Hour),
"ttcp_1234567890",
time.Now().Add(-4*time.Hour),
)
if err != nil {
fmt.Println("failed to paginate", err)
@ -133,22 +342,23 @@ func ExampleListRefresh() {
// include it in the final list.
return item.Value < 5, nil
}
listItemsFunc := func(ctx context.Context, tok *refreshtoken.Token, prevPageLast *ExampleResource, limit int) ([]*ExampleResource, error) {
listItemsFunc := func(ctx context.Context, prevPageLast *ExampleResource, limit int) ([]*ExampleResource, time.Time, error) {
// Do the listing of the resource, generally using a
// repository method such as target.(*Repository).listTargets.
// Use the input to set up any options, for example a limit
// or a starting point.
if prevPageLast == nil {
// No previous page item means use the values from the refresh token.
// lastItem, _ := tok.LastItem()
// opts := target.Option{
// target.WithLimit(limit),
// target.WithStartPageAfterItem(tok.GetPublicId(), tok.GetUpdateTime()),
// target.WithStartPageAfterItem(lastItem),
// }
} else {
// Use the previous page last item to start pagination from the next page.
// opts := target.Option{
// target.WithLimit(limit),
// target.WithStartPageAfterItem(prevPageLast.GetPublicId(), prevPageLast.GetUpdateTime()),
// target.WithStartPageAfterItem(prevPageLast),
// }
}
// Example result from the repository
@ -164,7 +374,7 @@ func ExampleListRefresh() {
{nil, 8},
{nil, 9},
{nil, 10},
}, nil
}, time.Now(), nil
}
estimatedCountFunc := func(ctx context.Context) (int, error) {
// Get an estimate from the database of the total number
@ -174,11 +384,18 @@ func ExampleListRefresh() {
}
deletedIdsFunc := func(ctx context.Context, since time.Time) ([]string, time.Time, error) {
// Return IDs of resources that have been deleted since the provided timestamp.
// Also return a timestamp for which this list was created, allowing the next
// invocation to start from the point where this invocation left off.
return []string{"er_0123456789"}, time.Now(), nil
}
resp, err := pagination.ListRefresh(context.Background(), grantsHash, pageSize, filterItemFunc, listItemsFunc, estimatedCountFunc, deletedIdsFunc, refreshToken)
resp, err := pagination.ListRefreshPage(
context.Background(),
grantsHash,
pageSize,
filterItemFunc,
listItemsFunc,
estimatedCountFunc,
deletedIdsFunc,
listToken,
)
if err != nil {
fmt.Println("failed to paginate", err)
return

@ -13,6 +13,7 @@ import (
// of pagination and sorting.
type Item interface {
GetPublicId() string
GetCreateTime() *timestamp.Timestamp
GetUpdateTime() *timestamp.Timestamp
GetResourceType() resource.Type
}

@ -9,7 +9,7 @@ import (
"github.com/hashicorp/boundary/internal/boundary"
"github.com/hashicorp/boundary/internal/errors"
"github.com/hashicorp/boundary/internal/refreshtoken"
"github.com/hashicorp/boundary/internal/listtoken"
)
// ListResponse represents the response from the paginated list operation.
@ -27,19 +27,18 @@ type ListResponse[T boundary.Resource] struct {
// that it may be appropriate to wait some time before
// requesting additional pages.
CompleteListing bool
// RefreshToken is the token that the caller can use
// ListToken is the token that the caller can use
// to request a new page of items. The items in the
// new page will have been updated more recently
// than all the items in the previous page. This
// field may be empty if there were no results for a
// List call.
RefreshToken *refreshtoken.Token
ListToken *listtoken.Token
// DeletedIds contains a list of item IDs that have been
// deleted since the last request for items. This can happen
// both during the initial pagination or when requesting a
// refresh. This is always empty for the initial List call.
// deleted since the last request for items. This can only happen
// during a refresh pagination.
DeletedIds []string
// EstimatedItemCount is an estimate on exactly how many
// EstimatedItemCount is an estimate of exactly how many
// items matching the filter function are available. If
// a List call is complete, this number is equal to
// the number of items returned. Otherwise, the
@ -52,14 +51,11 @@ type ListResponse[T boundary.Resource] struct {
// result. Returning an error results in an error being returned from the pagination.
type ListFilterFunc[T boundary.Resource] func(ctx context.Context, item T) (bool, error)
// ListItemsFunc returns a slice of T that have been updated since prevPageLastItem.
// If prevPageLastItem is empty, it returns a slice of T starting with the least recently updated.
type ListItemsFunc[T boundary.Resource] func(ctx context.Context, prevPageLastItem T, limit int) ([]T, error)
// ListRefreshItemsFunc returns a slice of T that have been updated since prevPageLastItem.
// If prevPageLastItem is empty, it returns a slice of T that have been updated since the
// item in the refresh token.
type ListRefreshItemsFunc[T boundary.Resource] func(ctx context.Context, tok *refreshtoken.Token, prevPageLastItem T, limit int) ([]T, error)
// ListItemsFunc returns a slice of T that are ordered after prevPageLastItem according to
// the implementation of the function. If prevPageLastItem is empty, it should return
// a slice of T from the start, as defined by the function. It also returns the timestamp
// of the DB transaction used to list the items.
type ListItemsFunc[T boundary.Resource] func(ctx context.Context, prevPageLastItem T, limit int) ([]T, time.Time, error)
// EstimatedCountFunc is used to estimate the total number of items
// available for the resource that is being listed.
@ -67,17 +63,16 @@ type EstimatedCountFunc func(ctx context.Context) (int, error)
// ListDeletedIDsFunc is used to list the IDs of the resources deleted since
// the given timestamp. It returns a slice of IDs and the timestamp of the
// instant in which the slice was created.
// DB transaction used to list the IDs.
type ListDeletedIDsFunc func(ctx context.Context, since time.Time) ([]string, time.Time, error)
// List returns a ListResponse. The response will contain at most a
// number of items equal to the pageSize. Items are fetched using the
// listItemsFn and then items are checked using the filterItemFn
// to determine if they should be included in the response.
// The response includes a new refresh token based on the grants and items.
// List returns a ListResponse. The response will contain at most pageSize
// number of items. Items are fetched using the listItemsFn and checked using
// the filterItemFn to determine if they should be included in the response.
// The response includes a new list token used to continue pagination or refresh.
// The estimatedCountFn is used to provide an estimated total number of
// items that can be returned by making additional requests using the provided
// refresh token.
// items that can be returned by making additional requests using the returned
// list token.
func List[T boundary.Resource](
ctx context.Context,
grantsHash []byte,
@ -88,23 +83,20 @@ func List[T boundary.Resource](
) (*ListResponse[T], error) {
const op = "pagination.List"
if len(grantsHash) == 0 {
switch {
case len(grantsHash) == 0:
return nil, errors.New(ctx, errors.InvalidParameter, op, "missing grants hash")
}
if pageSize < 1 {
case pageSize < 1:
return nil, errors.New(ctx, errors.InvalidParameter, op, "page size must be at least 1")
}
if filterItemFn == nil {
case filterItemFn == nil:
return nil, errors.New(ctx, errors.InvalidParameter, op, "missing filter item callback")
}
if listItemsFn == nil {
case listItemsFn == nil:
return nil, errors.New(ctx, errors.InvalidParameter, op, "missing list items callback")
}
if estimatedCountFn == nil {
case estimatedCountFn == nil:
return nil, errors.New(ctx, errors.InvalidParameter, op, "missing estimated count callback")
}
items, completeListing, err := list(ctx, pageSize, filterItemFn, listItemsFn)
items, completeListing, listTime, err := list(ctx, pageSize, filterItemFn, listItemsFn)
if err != nil {
return nil, errors.Wrap(ctx, err, op)
}
@ -115,6 +107,38 @@ func List[T boundary.Resource](
EstimatedItemCount: len(items),
}
if len(items) > 0 {
lastItem := items[len(items)-1]
if completeListing {
// If this is the only page in the pagination, create a
// start refresh token so subsequent requests are informed
// that they need to start a new refresh phase.
resp.ListToken, err = listtoken.NewStartRefresh(
ctx,
listTime, // Use list time as the create time of the token
lastItem.GetResourceType(),
grantsHash,
listTime, // Use list time as the starting point for listing deleted ids
listTime, // Use list time as the lower bound for subsequent refresh
)
if err != nil {
return nil, err
}
} else {
resp.ListToken, err = listtoken.NewPagination(
ctx,
listTime, // Use list time as the create time of the token
lastItem.GetResourceType(),
grantsHash,
lastItem.GetPublicId(),
lastItem.GetCreateTime().AsTime(),
)
if err != nil {
return nil, err
}
}
}
if !completeListing {
// If this was not a complete listing, get an estimate
// of the total items from the DB.
@ -125,66 +149,201 @@ func List[T boundary.Resource](
}
}
if len(items) > 0 {
resp.RefreshToken = refreshtoken.FromResource(items[len(items)-1], grantsHash)
return resp, nil
}
// ListPage returns a ListResponse. The response will contain at most pageSize
// number of items. Items are fetched using the listItemsFn and checked using
// the filterItemFn to determine if they should be included in the response.
// Items will be fetched based on the contents of the list token. The list
// token must contain a PaginationToken component.
// The response includes a new list token used to continue pagination or refresh.
// The estimatedCountFn is used to provide an estimated total number of
// items that can be returned by making additional requests using the returned
// list token.
func ListPage[T boundary.Resource](
ctx context.Context,
grantsHash []byte,
pageSize int,
filterItemFn ListFilterFunc[T],
listItemsFn ListItemsFunc[T],
estimatedCountFn EstimatedCountFunc,
tok *listtoken.Token,
) (*ListResponse[T], error) {
const op = "pagination.ListPage"
switch {
case len(grantsHash) == 0:
return nil, errors.New(ctx, errors.InvalidParameter, op, "missing grants hash")
case pageSize < 1:
return nil, errors.New(ctx, errors.InvalidParameter, op, "page size must be at least 1")
case filterItemFn == nil:
return nil, errors.New(ctx, errors.InvalidParameter, op, "missing filter item callback")
case listItemsFn == nil:
return nil, errors.New(ctx, errors.InvalidParameter, op, "missing list items callback")
case estimatedCountFn == nil:
return nil, errors.New(ctx, errors.InvalidParameter, op, "missing estimated count callback")
case tok == nil:
return nil, errors.New(ctx, errors.InvalidParameter, op, "missing list token")
}
if _, ok := tok.Subtype.(*listtoken.PaginationToken); !ok {
return nil, errors.New(ctx, errors.InvalidParameter, op, "token did not have a pagination token component")
}
items, completeListing, listTime, err := list(ctx, pageSize, filterItemFn, listItemsFn)
if err != nil {
return nil, errors.Wrap(ctx, err, op)
}
resp := &ListResponse[T]{
Items: items,
CompleteListing: completeListing,
ListToken: tok,
}
resp.EstimatedItemCount, err = estimatedCountFn(ctx)
if err != nil {
return nil, errors.Wrap(ctx, err, op)
}
var lastItem boundary.Resource
if len(items) > 0 {
lastItem = items[len(items)-1]
}
if err := resp.ListToken.Transition(
ctx,
completeListing,
lastItem,
time.Time{}, // We have no deleted ids time
listTime,
); err != nil {
return nil, errors.Wrap(ctx, err, op)
}
return resp, nil
}
// ListRefresh returns a ListResponse. The response will contain at most a
// number of items equal to the pageSize. Items are fetched using the
// listRefreshItemsFn and then items are checked using the filterItemFn
// to determine if they should be included in the response.
// The response includes a new refresh token based on the grants and items.
// ListRefresh returns a ListResponse. The response will contain at most pageSize
// number of items. Items are fetched using the listItemsFn and checked using
// the filterItemFn to determine if they should be included in the response.
// Items will be fetched based on the contents of the list token. The list
// token must contain a StartRefreshToken component.
// The response includes a new list token used to continue pagination or refresh.
// The estimatedCountFn is used to provide an estimated total number of
// items that can be returned by making additional requests using the provided
// refresh token. The listDeletedIDsFn is used to list the IDs of any
// resources that have been deleted since the refresh token was last used.
// items that can be returned by making additional requests using the returned
// list token. The listDeletedIDsFn is used to list the IDs of any
// resources that have been deleted since the list token was last used.
func ListRefresh[T boundary.Resource](
ctx context.Context,
grantsHash []byte,
pageSize int,
filterItemFn ListFilterFunc[T],
listRefreshItemsFn ListRefreshItemsFunc[T],
listItemsFn ListItemsFunc[T],
estimatedCountFn EstimatedCountFunc,
listDeletedIDsFn ListDeletedIDsFunc,
tok *refreshtoken.Token,
tok *listtoken.Token,
) (*ListResponse[T], error) {
const op = "pagination.ListRefresh"
if len(grantsHash) == 0 {
switch {
case len(grantsHash) == 0:
return nil, errors.New(ctx, errors.InvalidParameter, op, "missing grants hash")
}
if pageSize < 1 {
case pageSize < 1:
return nil, errors.New(ctx, errors.InvalidParameter, op, "page size must be at least 1")
}
if filterItemFn == nil {
case filterItemFn == nil:
return nil, errors.New(ctx, errors.InvalidParameter, op, "missing filter item callback")
case listItemsFn == nil:
return nil, errors.New(ctx, errors.InvalidParameter, op, "missing list items callback")
case estimatedCountFn == nil:
return nil, errors.New(ctx, errors.InvalidParameter, op, "missing estimated count callback")
case listDeletedIDsFn == nil:
return nil, errors.New(ctx, errors.InvalidParameter, op, "missing list deleted IDs callback")
case tok == nil:
return nil, errors.New(ctx, errors.InvalidParameter, op, "missing list token")
}
if listRefreshItemsFn == nil {
return nil, errors.New(ctx, errors.InvalidParameter, op, "missing list refresh items callback")
srt, ok := tok.Subtype.(*listtoken.StartRefreshToken)
if !ok {
return nil, errors.New(ctx, errors.InvalidParameter, op, "token did not have a start-refresh token component")
}
if estimatedCountFn == nil {
return nil, errors.New(ctx, errors.InvalidParameter, op, "missing estimated count callback")
deletedIds, deletedIdsTime, err := listDeletedIDsFn(ctx, srt.PreviousDeletedIdsTime)
if err != nil {
return nil, errors.Wrap(ctx, err, op)
}
if listDeletedIDsFn == nil {
return nil, errors.New(ctx, errors.InvalidParameter, op, "missing list deleted IDs callback")
items, completeListing, listTime, err := list(ctx, pageSize, filterItemFn, listItemsFn)
if err != nil {
return nil, errors.Wrap(ctx, err, op)
}
if tok == nil {
return nil, errors.New(ctx, errors.InvalidParameter, op, "missing refresh token")
resp := &ListResponse[T]{
Items: items,
CompleteListing: completeListing,
DeletedIds: deletedIds,
ListToken: tok,
}
deletedIds, transactionTimestamp, err := listDeletedIDsFn(ctx, tok.UpdatedTime)
resp.EstimatedItemCount, err = estimatedCountFn(ctx)
if err != nil {
return nil, errors.Wrap(ctx, err, op)
}
var lastItem boundary.Resource
if len(items) > 0 {
lastItem = items[len(items)-1]
}
if err := resp.ListToken.Transition(ctx, completeListing, lastItem, deletedIdsTime, listTime); err != nil {
return nil, errors.Wrap(ctx, err, op)
}
return resp, nil
}
// ListRefreshPage returns a ListResponse. The response will contain at most pageSize
// number of items. Items are fetched using the listItemsFn and checked using
// the filterItemFn to determine if they should be included in the response.
// Items will be fetched based on the contents of the list token. The list
// token must contain a RefreshToken component.
// The response includes a new list token used to continue pagination or refresh.
// The estimatedCountFn is used to provide an estimated total number of
// items that can be returned by making additional requests using the returned
// list token. The listDeletedIDsFn is used to list the IDs of any
// resources that have been deleted since the list token was last used.
func ListRefreshPage[T boundary.Resource](
ctx context.Context,
grantsHash []byte,
pageSize int,
filterItemFn ListFilterFunc[T],
listItemsFn ListItemsFunc[T],
estimatedCountFn EstimatedCountFunc,
listDeletedIDsFn ListDeletedIDsFunc,
tok *listtoken.Token,
) (*ListResponse[T], error) {
const op = "pagination.ListRefreshPage"
switch {
case len(grantsHash) == 0:
return nil, errors.New(ctx, errors.InvalidParameter, op, "missing grants hash")
case pageSize < 1:
return nil, errors.New(ctx, errors.InvalidParameter, op, "page size must be at least 1")
case filterItemFn == nil:
return nil, errors.New(ctx, errors.InvalidParameter, op, "missing filter item callback")
case listItemsFn == nil:
return nil, errors.New(ctx, errors.InvalidParameter, op, "missing list items callback")
case estimatedCountFn == nil:
return nil, errors.New(ctx, errors.InvalidParameter, op, "missing estimated count callback")
case listDeletedIDsFn == nil:
return nil, errors.New(ctx, errors.InvalidParameter, op, "missing list deleted IDs callback")
case tok == nil:
return nil, errors.New(ctx, errors.InvalidParameter, op, "missing list token")
}
rt, ok := tok.Subtype.(*listtoken.RefreshToken)
if !ok {
return nil, errors.New(ctx, errors.InvalidParameter, op, "token did not have a refresh token component")
}
listItemsFn := func(ctx context.Context, prevPageLast T, limit int) ([]T, error) {
return listRefreshItemsFn(ctx, tok, prevPageLast, limit)
deletedIds, deletedIdsTime, err := listDeletedIDsFn(ctx, rt.PreviousDeletedIdsTime)
if err != nil {
return nil, errors.Wrap(ctx, err, op)
}
items, completeListing, err := list(ctx, pageSize, filterItemFn, listItemsFn)
items, completeListing, listTime, err := list(ctx, pageSize, filterItemFn, listItemsFn)
if err != nil {
return nil, errors.Wrap(ctx, err, op)
}
@ -193,6 +352,7 @@ func ListRefresh[T boundary.Resource](
Items: items,
CompleteListing: completeListing,
DeletedIds: deletedIds,
ListToken: tok,
}
resp.EstimatedItemCount, err = estimatedCountFn(ctx)
@ -200,12 +360,13 @@ func ListRefresh[T boundary.Resource](
return nil, errors.Wrap(ctx, err, op)
}
var lastItem boundary.Resource
if len(items) > 0 {
resp.RefreshToken = tok.RefreshLastItem(items[len(items)-1], transactionTimestamp)
} else {
resp.RefreshToken = tok.Refresh(transactionTimestamp)
lastItem = items[len(items)-1]
}
if err := resp.ListToken.Transition(ctx, completeListing, lastItem, deletedIdsTime, listTime); err != nil {
return nil, errors.Wrap(ctx, err, op)
}
return resp, nil
}
@ -214,23 +375,29 @@ func list[T boundary.Resource](
pageSize int,
filterItemFn ListFilterFunc[T],
listItemsFn ListItemsFunc[T],
) ([]T, bool, error) {
) ([]T, bool, time.Time, error) {
const op = "pagination.list"
var lastItem T
var firstListTime time.Time
limit := pageSize + 1
items := make([]T, 0, limit)
dbLoop:
for {
// Request another page from the DB until we fill the final items
page, err := listItemsFn(ctx, lastItem, limit)
page, listTime, err := listItemsFn(ctx, lastItem, limit)
if err != nil {
return nil, false, errors.Wrap(ctx, err, op)
return nil, false, time.Time{}, errors.Wrap(ctx, err, op)
}
// Assign the firstListTime once, to ensure we always store the start of listing,
// rather the timestamp of the last listing.
if firstListTime.IsZero() {
firstListTime = listTime
}
for _, item := range page {
ok, err := filterItemFn(ctx, item)
if err != nil {
return nil, false, errors.Wrap(ctx, err, op)
return nil, false, time.Time{}, errors.Wrap(ctx, err, op)
}
if ok {
items = append(items, item)
@ -256,5 +423,5 @@ dbLoop:
items = items[:pageSize]
}
return items, completeListing, nil
return items, completeListing, firstListTime, nil
}

File diff suppressed because it is too large Load Diff
Loading…
Cancel
Save