bug(host): Fix sync job failure blocking other set syncs (#1841)

During the plugin_host_set_sync job, a single set failing to  sync
can cause all following syncs to fail.
pull/1847/head
Louis Ruch 4 years ago committed by GitHub
parent 2f7b4e5451
commit aaaf0241ff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -9,6 +9,7 @@ import (
"github.com/hashicorp/boundary/internal/errors"
"github.com/hashicorp/boundary/internal/kms"
"github.com/hashicorp/boundary/internal/libs/endpoint"
"github.com/hashicorp/boundary/internal/observability/event"
"github.com/hashicorp/boundary/internal/oplog"
"github.com/hashicorp/boundary/internal/scheduler"
hcpb "github.com/hashicorp/boundary/sdk/pbs/controller/api/resources/hostcatalogs"
@ -104,14 +105,11 @@ func (r *SetSyncJob) Run(ctx context.Context) error {
// Set numProcessed and numHosts for status report
r.numProcessed, r.numSets = 0, len(setAggs)
if len(setAggs) == 0 {
// Nothing to do, return early
return nil
}
setIds := make([]string, 0, len(setAggs))
for _, sa := range setAggs {
setIds = append(setIds, sa.PublicId)
}
return r.syncSets(ctx, setIds)
return r.syncSets(ctx, setAggs)
}
// NextRunIn queries the plugin host set db to determine when the next set should be synced.
@ -185,17 +183,8 @@ func nextSync(ctx context.Context, j scheduler.Job) (time.Duration, error) {
// syncSets retrieves from their plugins all the host and membership information
// for the provided host sets and updates their values in the database.
func (r *SetSyncJob) syncSets(ctx context.Context, setIds []string) error {
func (r *SetSyncJob) syncSets(ctx context.Context, setAggs []*hostSetAgg) error {
const op = "plugin.(SetSyncJob).syncSets"
if len(setIds) == 0 {
return errors.New(ctx, errors.InvalidParameter, op, "no set ids")
}
// First, look up the sets corresponding to the set IDs
var setAggs []*hostSetAgg
if err := r.reader.SearchWhere(ctx, &setAggs, "public_id in (?)", []interface{}{setIds}); err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg(fmt.Sprintf("can't retrieve sets %v", setIds)))
}
if len(setAggs) == 0 {
return nil
}
@ -296,28 +285,15 @@ func (r *SetSyncJob) syncSets(ctx context.Context, setIds []string) error {
Persisted: ci.persisted,
})
if err != nil {
return errors.Wrap(ctx, err, op)
event.WriteError(ctx, op, err, event.WithInfoMsg("listing hosts", "catalog id", ci.publicId))
r.numProcessed += len(catSetIds)
continue
}
if _, err := r.upsertAndCleanHosts(ctx, ci.storeCat, catSetIds, resp.GetHosts()); err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg("upserting hosts"))
}
updateSyncDataQuery := `
update host_plugin_set
set
last_sync_time = current_timestamp,
need_sync = false
where public_id in (?)
`
// update last sync time on the sets
i, err := r.writer.Exec(ctx, updateSyncDataQuery, []interface{}{catSetIds})
if err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg("updating last sync time"))
}
if i != len(catSetIds) {
return errors.New(ctx, errors.Internal, op, "mismatched number of sets updated and synced")
event.WriteError(ctx, op, err, event.WithInfoMsg("upserting hosts", "catalog id", ci.publicId))
r.numProcessed += len(catSetIds)
continue
}
r.numProcessed += len(catSetIds)
}
@ -354,9 +330,6 @@ func (r *SetSyncJob) upsertAndCleanHosts(
if hc.GetScopeId() == "" {
return nil, errors.New(ctx, errors.InvalidParameter, op, "no scope id")
}
if setIds == nil {
return nil, errors.New(ctx, errors.InvalidParameter, op, "nil sets")
}
if len(setIds) == 0 { // At least one must have been given to the plugin
return nil, errors.New(ctx, errors.InvalidParameter, op, "empty sets")
}
@ -507,10 +480,10 @@ func (r *SetSyncJob) upsertAndCleanHosts(
}
// Now, check set membership changes
setMembershipsToAdd, setMembershipsToRemove, allSetIds := getSetChanges(currentHostMap, newHostMap)
setMembershipsToAdd, setMembershipsToRemove := getSetChanges(currentHostMap, newHostMap)
// Iterate through the sets and update memberships, one transaction per set
for setId := range allSetIds {
// Iterate through all sets and update memberships, one transaction per set
for _, setId := range setIds {
hs, err := NewHostSet(ctx, hc.PublicId)
if err != nil {
return nil, errors.Wrap(ctx, err, op)
@ -524,11 +497,6 @@ func (r *SetSyncJob) upsertAndCleanHosts(
func(r db.Reader, w db.Writer) error {
msgs := make([]*oplog.Message, 0)
ticket, err := w.GetTicket(ctx, hs)
if err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg("unable to get ticket"))
}
// Perform additions
for _, hostId := range setMembershipsToAdd[hs.PublicId] {
membership, err := NewHostSetMember(ctx, hs.PublicId, hostId)
@ -561,16 +529,33 @@ func (r *SetSyncJob) upsertAndCleanHosts(
msgs = append(msgs, &hOplogMsg)
}
metadata := hc.oplog(oplog.OpType_OP_TYPE_UPDATE)
if err := w.WriteOplogEntryWith(ctx, oplogWrapper, ticket, metadata, msgs); err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg("unable to write oplog"))
// Oplog
if len(msgs) > 0 {
ticket, err := w.GetTicket(ctx, hs)
if err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg("unable to get ticket"))
}
metadata := hc.oplog(oplog.OpType_OP_TYPE_UPDATE)
if err := w.WriteOplogEntryWith(ctx, oplogWrapper, ticket, metadata, msgs); err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg("unable to write oplog"))
}
}
// Update last sync time
numRows, err := w.Exec(ctx, updateSyncDataQuery, []interface{}{setId})
if err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg("updating last sync time"))
}
if numRows != 1 {
return errors.New(ctx, errors.Internal, op, fmt.Sprintf("host set (%v) synced, but failed to update repo", setId))
}
return nil
},
)
if err != nil {
return nil, errors.Wrap(ctx, err, op)
event.WriteError(ctx, op, err, event.WithInfoMsg("failed to update membership", "set id", setId))
}
}

@ -42,5 +42,13 @@ need_sync
sync_interval_seconds is null and last_sync_time <= wt_add_seconds_to_now(?)
or
sync_interval_seconds > 0 and wt_add_seconds(sync_interval_seconds, last_sync_time) <= current_timestamp
`
updateSyncDataQuery = `
update host_plugin_set
set
last_sync_time = current_timestamp,
need_sync = false
where public_id = ?
`
)

@ -200,7 +200,6 @@ func getSetChanges(
currentHostMap map[string]*Host,
newHostMap map[string]*hostInfo) (
setMembershipsToAdd, setMembershipsToRemove map[string][]string,
allSetIds map[string]struct{},
) {
// First, find sets that hosts should be added to: hosts that are new or
// have new set IDs returned.
@ -224,11 +223,7 @@ func getSetChanges(
if setMembershipsToAdd == nil {
setMembershipsToAdd = make(map[string][]string)
}
if allSetIds == nil {
allSetIds = make(map[string]struct{})
}
setMembershipsToAdd[setToAdd] = append(setMembershipsToAdd[setToAdd], newHostId)
allSetIds[setToAdd] = struct{}{}
}
}
@ -254,11 +249,7 @@ func getSetChanges(
if setMembershipsToRemove == nil {
setMembershipsToRemove = make(map[string][]string)
}
if allSetIds == nil {
allSetIds = make(map[string]struct{})
}
setMembershipsToRemove[setToRemove] = append(setMembershipsToRemove[setToRemove], currentHostId)
allSetIds[setToRemove] = struct{}{}
}
}

@ -303,7 +303,7 @@ func TestUtilFunctions(t *testing.T) {
// Run through the sets function
{
newHostMap[baseHostId].h.SetIds = tt.sets()
toAdd, toRemove, _ := getSetChanges(currentHostMap, newHostMap)
toAdd, toRemove := getSetChanges(currentHostMap, newHostMap)
assert.Equal(tt.setsToAdd, toAdd)
assert.Equal(tt.setsToRemove, toRemove)
}

Loading…
Cancel
Save