diff --git a/internal/host/plugin/job_set_sync.go b/internal/host/plugin/job_set_sync.go index be49d0fb5b..4a3118bc8e 100644 --- a/internal/host/plugin/job_set_sync.go +++ b/internal/host/plugin/job_set_sync.go @@ -9,6 +9,7 @@ import ( "github.com/hashicorp/boundary/internal/errors" "github.com/hashicorp/boundary/internal/kms" "github.com/hashicorp/boundary/internal/libs/endpoint" + "github.com/hashicorp/boundary/internal/observability/event" "github.com/hashicorp/boundary/internal/oplog" "github.com/hashicorp/boundary/internal/scheduler" hcpb "github.com/hashicorp/boundary/sdk/pbs/controller/api/resources/hostcatalogs" @@ -104,14 +105,11 @@ func (r *SetSyncJob) Run(ctx context.Context) error { // Set numProcessed and numHosts for status report r.numProcessed, r.numSets = 0, len(setAggs) if len(setAggs) == 0 { + // Nothing to do, return early return nil } - setIds := make([]string, 0, len(setAggs)) - for _, sa := range setAggs { - setIds = append(setIds, sa.PublicId) - } - return r.syncSets(ctx, setIds) + return r.syncSets(ctx, setAggs) } // NextRunIn queries the plugin host set db to determine when the next set should be synced. @@ -185,17 +183,8 @@ func nextSync(ctx context.Context, j scheduler.Job) (time.Duration, error) { // syncSets retrieves from their plugins all the host and membership information // for the provided host sets and updates their values in the database. -func (r *SetSyncJob) syncSets(ctx context.Context, setIds []string) error { +func (r *SetSyncJob) syncSets(ctx context.Context, setAggs []*hostSetAgg) error { const op = "plugin.(SetSyncJob).syncSets" - if len(setIds) == 0 { - return errors.New(ctx, errors.InvalidParameter, op, "no set ids") - } - - // First, look up the sets corresponding to the set IDs - var setAggs []*hostSetAgg - if err := r.reader.SearchWhere(ctx, &setAggs, "public_id in (?)", []interface{}{setIds}); err != nil { - return errors.Wrap(ctx, err, op, errors.WithMsg(fmt.Sprintf("can't retrieve sets %v", setIds))) - } if len(setAggs) == 0 { return nil } @@ -296,28 +285,15 @@ func (r *SetSyncJob) syncSets(ctx context.Context, setIds []string) error { Persisted: ci.persisted, }) if err != nil { - return errors.Wrap(ctx, err, op) + event.WriteError(ctx, op, err, event.WithInfoMsg("listing hosts", "catalog id", ci.publicId)) + r.numProcessed += len(catSetIds) + continue } if _, err := r.upsertAndCleanHosts(ctx, ci.storeCat, catSetIds, resp.GetHosts()); err != nil { - return errors.Wrap(ctx, err, op, errors.WithMsg("upserting hosts")) - } - - updateSyncDataQuery := ` -update host_plugin_set -set - last_sync_time = current_timestamp, - need_sync = false -where public_id in (?) -` - - // update last sync time on the sets - i, err := r.writer.Exec(ctx, updateSyncDataQuery, []interface{}{catSetIds}) - if err != nil { - return errors.Wrap(ctx, err, op, errors.WithMsg("updating last sync time")) - } - if i != len(catSetIds) { - return errors.New(ctx, errors.Internal, op, "mismatched number of sets updated and synced") + event.WriteError(ctx, op, err, event.WithInfoMsg("upserting hosts", "catalog id", ci.publicId)) + r.numProcessed += len(catSetIds) + continue } r.numProcessed += len(catSetIds) } @@ -354,9 +330,6 @@ func (r *SetSyncJob) upsertAndCleanHosts( if hc.GetScopeId() == "" { return nil, errors.New(ctx, errors.InvalidParameter, op, "no scope id") } - if setIds == nil { - return nil, errors.New(ctx, errors.InvalidParameter, op, "nil sets") - } if len(setIds) == 0 { // At least one must have been given to the plugin return nil, errors.New(ctx, errors.InvalidParameter, op, "empty sets") } @@ -507,10 +480,10 @@ func (r *SetSyncJob) upsertAndCleanHosts( } // Now, check set membership changes - setMembershipsToAdd, setMembershipsToRemove, allSetIds := getSetChanges(currentHostMap, newHostMap) + setMembershipsToAdd, setMembershipsToRemove := getSetChanges(currentHostMap, newHostMap) - // Iterate through the sets and update memberships, one transaction per set - for setId := range allSetIds { + // Iterate through all sets and update memberships, one transaction per set + for _, setId := range setIds { hs, err := NewHostSet(ctx, hc.PublicId) if err != nil { return nil, errors.Wrap(ctx, err, op) @@ -524,11 +497,6 @@ func (r *SetSyncJob) upsertAndCleanHosts( func(r db.Reader, w db.Writer) error { msgs := make([]*oplog.Message, 0) - ticket, err := w.GetTicket(ctx, hs) - if err != nil { - return errors.Wrap(ctx, err, op, errors.WithMsg("unable to get ticket")) - } - // Perform additions for _, hostId := range setMembershipsToAdd[hs.PublicId] { membership, err := NewHostSetMember(ctx, hs.PublicId, hostId) @@ -561,16 +529,33 @@ func (r *SetSyncJob) upsertAndCleanHosts( msgs = append(msgs, &hOplogMsg) } - metadata := hc.oplog(oplog.OpType_OP_TYPE_UPDATE) - if err := w.WriteOplogEntryWith(ctx, oplogWrapper, ticket, metadata, msgs); err != nil { - return errors.Wrap(ctx, err, op, errors.WithMsg("unable to write oplog")) + // Oplog + if len(msgs) > 0 { + ticket, err := w.GetTicket(ctx, hs) + if err != nil { + return errors.Wrap(ctx, err, op, errors.WithMsg("unable to get ticket")) + } + + metadata := hc.oplog(oplog.OpType_OP_TYPE_UPDATE) + if err := w.WriteOplogEntryWith(ctx, oplogWrapper, ticket, metadata, msgs); err != nil { + return errors.Wrap(ctx, err, op, errors.WithMsg("unable to write oplog")) + } + } + + // Update last sync time + numRows, err := w.Exec(ctx, updateSyncDataQuery, []interface{}{setId}) + if err != nil { + return errors.Wrap(ctx, err, op, errors.WithMsg("updating last sync time")) + } + if numRows != 1 { + return errors.New(ctx, errors.Internal, op, fmt.Sprintf("host set (%v) synced, but failed to update repo", setId)) } return nil }, ) if err != nil { - return nil, errors.Wrap(ctx, err, op) + event.WriteError(ctx, op, err, event.WithInfoMsg("failed to update membership", "set id", setId)) } } diff --git a/internal/host/plugin/query.go b/internal/host/plugin/query.go index 700a126cc5..0ebe625275 100644 --- a/internal/host/plugin/query.go +++ b/internal/host/plugin/query.go @@ -42,5 +42,13 @@ need_sync sync_interval_seconds is null and last_sync_time <= wt_add_seconds_to_now(?) or sync_interval_seconds > 0 and wt_add_seconds(sync_interval_seconds, last_sync_time) <= current_timestamp +` + + updateSyncDataQuery = ` +update host_plugin_set +set + last_sync_time = current_timestamp, + need_sync = false +where public_id = ? ` ) diff --git a/internal/host/plugin/repository_host_util.go b/internal/host/plugin/repository_host_util.go index 16c1e23af2..d272947a0a 100644 --- a/internal/host/plugin/repository_host_util.go +++ b/internal/host/plugin/repository_host_util.go @@ -200,7 +200,6 @@ func getSetChanges( currentHostMap map[string]*Host, newHostMap map[string]*hostInfo) ( setMembershipsToAdd, setMembershipsToRemove map[string][]string, - allSetIds map[string]struct{}, ) { // First, find sets that hosts should be added to: hosts that are new or // have new set IDs returned. @@ -224,11 +223,7 @@ func getSetChanges( if setMembershipsToAdd == nil { setMembershipsToAdd = make(map[string][]string) } - if allSetIds == nil { - allSetIds = make(map[string]struct{}) - } setMembershipsToAdd[setToAdd] = append(setMembershipsToAdd[setToAdd], newHostId) - allSetIds[setToAdd] = struct{}{} } } @@ -254,11 +249,7 @@ func getSetChanges( if setMembershipsToRemove == nil { setMembershipsToRemove = make(map[string][]string) } - if allSetIds == nil { - allSetIds = make(map[string]struct{}) - } setMembershipsToRemove[setToRemove] = append(setMembershipsToRemove[setToRemove], currentHostId) - allSetIds[setToRemove] = struct{}{} } } diff --git a/internal/host/plugin/repository_host_util_test.go b/internal/host/plugin/repository_host_util_test.go index fe852492d6..9668b6ad3a 100644 --- a/internal/host/plugin/repository_host_util_test.go +++ b/internal/host/plugin/repository_host_util_test.go @@ -303,7 +303,7 @@ func TestUtilFunctions(t *testing.T) { // Run through the sets function { newHostMap[baseHostId].h.SetIds = tt.sets() - toAdd, toRemove, _ := getSetChanges(currentHostMap, newHostMap) + toAdd, toRemove := getSetChanges(currentHostMap, newHostMap) assert.Equal(tt.setsToAdd, toAdd) assert.Equal(tt.setsToRemove, toRemove) }