You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
boundary/internal/host/plugin/job_set_sync_test.go

538 lines
14 KiB

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package plugin
import (
"context"
"fmt"
"testing"
"time"
"github.com/hashicorp/boundary/internal/db"
"github.com/hashicorp/boundary/internal/db/timestamp"
"github.com/hashicorp/boundary/internal/errors"
"github.com/hashicorp/boundary/internal/iam"
"github.com/hashicorp/boundary/internal/kms"
hostplg "github.com/hashicorp/boundary/internal/plugin/host"
"github.com/hashicorp/boundary/internal/scheduler"
plgpb "github.com/hashicorp/boundary/sdk/pbs/plugin"
assertpkg "github.com/stretchr/testify/assert"
requirepkg "github.com/stretchr/testify/require"
)
// TestNewSetSyncJob exercises newSetSyncJob with missing dependencies, which
// must yield an InvalidParameter error and a nil job, and with complete
// dependencies, where the returned job must carry the supplied reader, writer
// and kms along with the expected limit.
func TestNewSetSyncJob(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	conn, _ := db.TestSetup(t, "postgres")
	rw := db.New(conn)
	kmsCache := kms.TestKms(t, conn, db.TestWrapper(t))

	plg := hostplg.TestPlugin(t, conn, "lookup")
	clients := map[string]plgpb.HostPluginServiceClient{
		plg.GetPublicId(): NewWrappingPluginClient(&TestPluginServer{}),
	}

	// Each case leaves out at most one dependency; the zero value of the
	// omitted field is what gets handed to the constructor.
	cases := []struct {
		name      string
		reader    db.Reader
		writer    db.Writer
		kms       *kms.Kms
		plugins   map[string]plgpb.HostPluginServiceClient
		opts      []Option
		wantLimit int
		wantErr   bool
		wantCode  errors.Code
	}{
		{
			name:     "nil reader",
			writer:   rw,
			kms:      kmsCache,
			plugins:  clients,
			wantErr:  true,
			wantCode: errors.InvalidParameter,
		},
		{
			name:     "nil writer",
			reader:   rw,
			kms:      kmsCache,
			plugins:  clients,
			wantErr:  true,
			wantCode: errors.InvalidParameter,
		},
		{
			name:     "nil kms",
			reader:   rw,
			writer:   rw,
			plugins:  clients,
			wantErr:  true,
			wantCode: errors.InvalidParameter,
		},
		{
			name:     "nil plgm",
			reader:   rw,
			writer:   rw,
			kms:      kmsCache,
			wantErr:  true,
			wantCode: errors.InvalidParameter,
		},
		{
			name:      "valid-no-options",
			reader:    rw,
			writer:    rw,
			kms:       kmsCache,
			plugins:   clients,
			wantLimit: db.DefaultLimit,
		},
		{
			name:      "valid-with-limit",
			reader:    rw,
			writer:    rw,
			kms:       kmsCache,
			plugins:   clients,
			opts:      []Option{WithLimit(100)},
			wantLimit: 100,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			assert, require := assertpkg.New(t), requirepkg.New(t)
			job, err := newSetSyncJob(ctx, tc.reader, tc.writer, tc.kms, tc.plugins, tc.opts...)
			if !tc.wantErr {
				require.NoError(err)
				require.NotNil(job)
				assert.Equal(tc.reader, job.reader)
				assert.Equal(tc.writer, job.writer)
				assert.Equal(tc.kms, job.kms)
				assert.Equal(tc.wantLimit, job.limit)
				return
			}
			require.Error(err)
			assert.Nil(job)
			assert.Truef(errors.Match(errors.T(tc.wantCode), err), "Unexpected error %s", err)
		})
	}
}
// TestSetSyncJob_Run walks the set sync job through a series of database
// states and checks, after every Run, how many sets the job reports as seen
// (numSets) and processed (numProcessed). The phases build on each other, so
// their order matters:
//
//  1. no sets exist;
//  2. a set with a stale member whose plugin listing comes back empty;
//  3. a brand new set that gets its first host;
//  4. the same set immediately afterwards (nothing to do);
//  5. the same set flagged with NeedSync;
//  6. an additional, newly created set;
//  7. a table of LastSyncTime / NeedSync / SyncIntervalSeconds combinations.
func TestSetSyncJob_Run(t *testing.T) {
	t.Parallel()
	assert, require := assertpkg.New(t), requirepkg.New(t)
	ctx := context.Background()

	conn, _ := db.TestSetup(t, "postgres")
	rw := db.New(conn)
	wrapper := db.TestWrapper(t)
	kmsCache := kms.TestKms(t, conn, wrapper)
	setSched := scheduler.TestScheduler(t, conn, wrapper)

	server := &TestPluginServer{}
	plg := hostplg.TestPlugin(t, conn, "run")
	clients := map[string]plgpb.HostPluginServiceClient{
		plg.GetPublicId(): NewWrappingPluginClient(server),
	}

	job, err := newSetSyncJob(ctx, rw, rw, kmsCache, clients)
	require.NoError(err)

	// The job is registered with (and the repository below is built on) a
	// second scheduler instance, distinct from the one handed to TestSet.
	jobSched := scheduler.TestScheduler(t, conn, wrapper)
	require.NoError(jobSched.RegisterJob(ctx, job))

	// Phase 1: nothing exists yet, so nothing can have been synced.
	require.NoError(job.Run(ctx))
	assert.Equal(0, job.numProcessed)

	_, prj := iam.TestScopes(t, iam.TestRepo(t, conn, wrapper))
	cat := TestCatalog(t, conn, prj.GetPublicId(), plg.GetPublicId())

	// Phase 2: the plugin reports no hosts, so the pre-existing member is
	// expected to be gone from the set after the sync.
	server.ListHostsFn = func(_ context.Context, _ *plgpb.ListHostsRequest) (*plgpb.ListHostsResponse, error) {
		return &plgpb.ListHostsResponse{}, nil
	}
	staleSet := TestSet(t, conn, kmsCache, setSched, cat, clients)
	staleHost := TestHost(t, conn, cat.GetPublicId(), "remove this host")
	TestSetMembers(t, conn, staleSet.GetPublicId(), []*Host{staleHost})

	require.NoError(job.Run(ctx))

	staleAgg := &hostSetAgg{PublicId: staleSet.GetPublicId()}
	require.NoError(rw.LookupByPublicId(ctx, staleAgg))
	assert.Greater(staleAgg.LastSyncTime.AsTime().UnixNano(), staleAgg.CreateTime.AsTime().UnixNano())
	emptied, err := staleAgg.toHostSet(ctx)
	require.NoError(err)
	assert.Len(emptied.HostIds, 0)
	_, err = rw.Delete(ctx, staleHost)
	require.NoError(err)

	// Phase 3: a fresh set. From here on the plugin returns one host whose IP
	// address changes on every call, which lets later phases observe updates
	// through the host's version number.
	set1 := TestSet(t, conn, kmsCache, setSched, cat, clients)
	var calls uint32
	server.ListHostsFn = func(_ context.Context, req *plgpb.ListHostsRequest) (*plgpb.ListHostsResponse, error) {
		// As written this asserts 1 >= len(sets), i.e. at most one set per
		// request.
		assert.GreaterOrEqual(1, len(req.GetSets()))
		var ids []string
		sets := req.GetSets()
		for i := range sets {
			ids = append(ids, sets[i].GetId())
		}
		calls++
		host := &plgpb.ListHostsResponseHost{
			ExternalId:  "first",
			IpAddresses: []string{fmt.Sprintf("10.0.0.%d", calls)},
			DnsNames:    []string{"foo.com"},
			SetIds:      ids,
		}
		return &plgpb.ListHostsResponse{Hosts: []*plgpb.ListHostsResponseHost{host}}, nil
	}

	hostRepo, err := NewRepository(rw, rw, kmsCache, jobSched, clients)
	require.NoError(err)

	// Before its first sync the new set's last-sync time precedes its
	// creation time.
	agg := &hostSetAgg{PublicId: set1.GetPublicId()}
	require.NoError(rw.LookupByPublicId(ctx, agg))
	assert.Less(agg.LastSyncTime.AsTime().UnixNano(), agg.CreateTime.AsTime().UnixNano())

	require.NoError(job.Run(ctx))
	assert.Equal(1, job.numSets)
	assert.Equal(1, job.numProcessed)

	listed, _, err := hostRepo.ListHostsByCatalogId(ctx, agg.CatalogId)
	require.NoError(err)
	assert.Len(listed, 1)
	for _, h := range listed {
		assert.Equal(uint32(1), h.Version)
	}

	require.NoError(rw.LookupByPublicId(ctx, agg))
	assert.Greater(agg.LastSyncTime.AsTime().UnixNano(), agg.CreateTime.AsTime().UnixNano())
	assert.False(agg.NeedSync)
	firstSyncTime := agg.LastSyncTime

	// Phase 4: the set was synced a moment ago; another run is a no-op.
	require.NoError(job.Run(ctx))
	assert.Equal(0, job.numSets)
	assert.Equal(0, job.numProcessed)

	// Phase 5: flag the set as needing a sync and run again.
	syncedSet, err := agg.toHostSet(ctx)
	require.NoError(err)
	syncedSet.NeedSync = true
	rows, err := rw.Update(ctx, syncedSet, []string{"NeedSync"}, nil)
	require.NoError(err)
	assert.Equal(1, rows)
	assert.True(syncedSet.NeedSync)

	require.NoError(job.Run(ctx))
	assert.Equal(1, job.numSets)
	assert.Equal(1, job.numProcessed)

	// The host was rewritten by the second sync, so its version moved on.
	listed, _, err = hostRepo.ListHostsByCatalogId(ctx, agg.CatalogId)
	require.NoError(err)
	assert.Len(listed, 1)
	for _, h := range listed {
		assert.Equal(uint32(2), h.Version)
	}

	// Phase 6: a second new set appears; exactly one set is picked up.
	_ = TestSet(t, conn, kmsCache, setSched, cat, clients)
	require.NoError(job.Run(ctx))
	assert.Equal(1, job.numSets)
	assert.Equal(1, job.numProcessed)

	require.NoError(rw.LookupByPublicId(ctx, syncedSet))
	assert.Greater(syncedSet.GetLastSyncTime().AsTime().UnixNano(), firstSyncTime.AsTime().UnixNano())
	assert.False(syncedSet.GetNeedSync())

	// Phase 7: table of sync-interval scenarios, all applied to syncedSet.
	// Note that the "synced-30-seconds-ago-*" cases actually place the last
	// sync 60 seconds in the past.
	cases := []struct {
		name        string
		intervalSec int32
		lastSync    *timestamp.Timestamp
		needSync    bool
		expectSync  bool
	}{
		{
			name:       "never-synced-before-needs-sync-false",
			lastSync:   timestamp.New(time.Unix(0, 0)),
			needSync:   false,
			expectSync: true,
		},
		{
			name:       "never-synced-before-needs-sync-true",
			lastSync:   timestamp.New(time.Unix(0, 0)),
			needSync:   true,
			expectSync: true,
		},
		{
			name:        "never-synced-before-sync-disabled",
			intervalSec: -1,
			lastSync:    timestamp.New(time.Unix(0, 0)),
			needSync:    true,
			expectSync:  true,
		},
		{
			name:       "synced-just-now",
			lastSync:   timestamp.Now(),
			needSync:   false,
			expectSync: false,
		},
		{
			name:       "synced-just-now-need-sync",
			lastSync:   timestamp.Now(),
			needSync:   true,
			expectSync: true,
		},
		{
			name:        "synced-just-now-need-sync-but-sync-disabled",
			intervalSec: -1,
			lastSync:    timestamp.Now(),
			needSync:    true,
			expectSync:  true,
		},
		{
			name:       "synced-30-seconds-ago-default-time",
			lastSync:   timestamp.New(time.Now().Add(-60 * time.Second)),
			needSync:   false,
			expectSync: false,
		},
		{
			name:        "synced-30-seconds-ago-custom-time",
			intervalSec: 5,
			lastSync:    timestamp.New(time.Now().Add(-60 * time.Second)),
			needSync:    false,
			expectSync:  true,
		},
		{
			name:        "synced-30-seconds-ago-custom-larger-time",
			intervalSec: 90,
			lastSync:    timestamp.New(time.Now().Add(-60 * time.Second)),
			needSync:    false,
			expectSync:  false,
		},
		{
			name:        "synced-30-seconds-ago-custom-larger-time-need-sync",
			intervalSec: 60,
			lastSync:    timestamp.New(time.Now().Add(-60 * time.Second)),
			needSync:    true,
			expectSync:  true,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			assert, require := assertpkg.New(t), requirepkg.New(t)

			// Push the scenario into the stored set. A zero interval is
			// written as NULL rather than through the field mask.
			syncedSet.LastSyncTime = tc.lastSync
			syncedSet.NeedSync = tc.needSync
			syncedSet.SyncIntervalSeconds = tc.intervalSec
			mask := []string{"LastSyncTime", "NeedSync"}
			var nullPaths []string
			if tc.intervalSec != 0 {
				mask = append(mask, "SyncIntervalSeconds")
			} else {
				nullPaths = []string{"SyncIntervalSeconds"}
			}
			updated, err := rw.Update(ctx, syncedSet, mask, nullPaths)
			require.NoError(err)
			assert.Equal(1, updated)

			require.NoError(job.Run(ctx))

			// Either the one set was synced or nothing was touched.
			want := 0
			if tc.expectSync {
				want = 1
			}
			assert.Equal(want, job.numSets)
			assert.Equal(want, job.numProcessed)
		})
	}
}
// TestSetSyncJob_NextRunIn checks the delay returned by NextRunIn for a single
// host set whose LastSyncTime, NeedSync and SyncIntervalSeconds are rewritten
// before each case. Expected and actual durations are compared after rounding
// to five seconds, because some time passes between building the table,
// updating the set and asking the job for its next run.
func TestSetSyncJob_NextRunIn(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	conn, _ := db.TestSetup(t, "postgres")
	rw := db.New(conn)
	wrapper := db.TestWrapper(t)
	sched := scheduler.TestScheduler(t, conn, wrapper)
	kmsCache := kms.TestKms(t, conn, wrapper)
	iamRepo := iam.TestRepo(t, conn, wrapper)
	_, prj := iam.TestScopes(t, iamRepo)

	plg := hostplg.TestPlugin(t, conn, "lookup")
	plgm := map[string]plgpb.HostPluginServiceClient{
		plg.GetPublicId(): NewWrappingPluginClient(&TestPluginServer{}),
	}
	catalog := TestCatalog(t, conn, prj.PublicId, plg.GetPublicId())
	hostSet := TestSet(t, conn, kmsCache, sched, catalog, plgm)

	// setArgs holds the values written onto the host set before NextRunIn is
	// called.
	type setArgs struct {
		syncIntervalSeconds int32
		lastSyncTime        *timestamp.Timestamp
		needsSync           bool
	}
	tests := []struct {
		name    string
		setArgs setArgs
		want    time.Duration
	}{
		{
			name: "never-synced-before",
			setArgs: setArgs{
				lastSyncTime: timestamp.New(time.Unix(0, 0)),
				needsSync:    false,
			},
			want: 0,
		},
		{
			name: "never-synced-before-with-sync-interval",
			setArgs: setArgs{
				syncIntervalSeconds: 60,
				lastSyncTime:        timestamp.New(time.Unix(0, 0)),
				needsSync:           false,
			},
			want: 0,
		},
		{
			name: "synced-just-now",
			setArgs: setArgs{
				lastSyncTime: timestamp.Now(),
				needsSync:    false,
			},
			want: setSyncJobRunInterval,
		},
		{
			name: "synced-just-now-with-sync-interval",
			setArgs: setArgs{
				syncIntervalSeconds: 180,
				lastSyncTime:        timestamp.Now(),
				needsSync:           false,
			},
			want: 3 * time.Minute,
		},
		{
			name: "synced-just-now-need-sync",
			setArgs: setArgs{
				lastSyncTime: timestamp.Now(),
				needsSync:    true,
			},
			want: 0,
		},
		{
			name: "synced-just-now-need-sync-with-sync-interval",
			setArgs: setArgs{
				syncIntervalSeconds: 60,
				lastSyncTime:        timestamp.Now(),
				needsSync:           true,
			},
			want: 0,
		},
		{
			name: "synced-a-bit-ago",
			setArgs: setArgs{
				lastSyncTime: timestamp.New(time.Now().Add(-4 * time.Minute)),
				needsSync:    false,
			},
			// Remaining part of the default interval after four minutes.
			want: time.Until(time.Now().Add(setSyncJobRunInterval - (4 * time.Minute))),
		},
		{
			name: "synced-a-bit-ago-with-sync-interval",
			setArgs: setArgs{
				syncIntervalSeconds: 300,
				lastSyncTime:        timestamp.New(time.Now().Add(-4 * time.Minute)),
				needsSync:           false,
			},
			want: time.Minute,
		},
		{
			name: "automatic-sync-disabled",
			setArgs: setArgs{
				syncIntervalSeconds: -1,
				lastSyncTime:        timestamp.New(time.Now().Add(-4 * time.Minute)),
				needsSync:           false,
			},
			want: setSyncJobRunInterval,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			assert, require := assertpkg.New(t), requirepkg.New(t)
			// A constructor failure is a broken precondition, so stop the
			// subtest right away instead of carrying on with a nil job.
			r, err := newSetSyncJob(ctx, rw, rw, kmsCache, plgm)
			require.NoError(err)
			require.NotNil(r)

			hostSet.NeedSync = tt.setArgs.needsSync
			hostSet.LastSyncTime = tt.setArgs.lastSyncTime
			hostSet.SyncIntervalSeconds = tt.setArgs.syncIntervalSeconds

			// A zero interval is stored as NULL; any other value goes
			// through the regular field mask.
			fieldMaskPaths := []string{"LastSyncTime", "NeedSync"}
			var setToNullPaths []string
			if hostSet.SyncIntervalSeconds == 0 {
				setToNullPaths = []string{"SyncIntervalSeconds"}
			} else {
				fieldMaskPaths = append(fieldMaskPaths, "SyncIntervalSeconds")
			}
			_, err = rw.Update(ctx, hostSet, fieldMaskPaths, setToNullPaths)
			require.NoError(err)

			got, err := r.NextRunIn(ctx)
			require.NoError(err)
			// Round to five seconds to account for lost time between updating set and determining next run
			assert.Equal(tt.want.Round(5*time.Second), got.Round(5*time.Second))
		})
	}
}