boundary/internal/host/plugin/job_set_sync.go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package plugin

import (
"context"
"fmt"
"time"
"github.com/hashicorp/boundary/internal/db"
"github.com/hashicorp/boundary/internal/errors"
"github.com/hashicorp/boundary/internal/event"
"github.com/hashicorp/boundary/internal/kms"
"github.com/hashicorp/boundary/internal/libs/endpoint"
"github.com/hashicorp/boundary/internal/oplog"
"github.com/hashicorp/boundary/internal/scheduler"
hcpb "github.com/hashicorp/boundary/sdk/pbs/controller/api/resources/hostcatalogs"
pb "github.com/hashicorp/boundary/sdk/pbs/controller/api/resources/hostsets"
plgpb "github.com/hashicorp/boundary/sdk/pbs/plugin"
"github.com/hashicorp/go-secure-stdlib/strutil"
ua "go.uber.org/atomic"
)

const (
setSyncJobName = "plugin_host_set_sync"
setSyncJobRunInterval = 10 * time.Minute
)

// SetSyncJob is the recurring job that syncs hosts from plugin-based host
// sets that are due to be synced. The SetSyncJob is not thread safe; an
// attempt to Run the job concurrently will result in a JobAlreadyRunning
// error.
type SetSyncJob struct {
reader db.Reader
writer db.Writer
kms *kms.Kms
plugins map[string]plgpb.HostPluginServiceClient
limit int
	running      ua.Bool // guards against concurrent Run invocations
	numSets      int     // total sets found by the current run, reported by Status
	numProcessed int     // sets synced so far, reported by Status
}

// newSetSyncJob creates a new in-memory SetSyncJob.
//
// WithLimit is the only supported option.
func newSetSyncJob(ctx context.Context, r db.Reader, w db.Writer, kms *kms.Kms, plgm map[string]plgpb.HostPluginServiceClient, opt ...Option) (*SetSyncJob, error) {
const op = "plugin.newSetSyncJob"
switch {
case r == nil:
return nil, errors.New(ctx, errors.InvalidParameter, op, "missing db.Reader")
case w == nil:
return nil, errors.New(ctx, errors.InvalidParameter, op, "missing db.Writer")
case kms == nil:
return nil, errors.New(ctx, errors.InvalidParameter, op, "missing kms")
case plgm == nil:
return nil, errors.New(ctx, errors.InvalidParameter, op, "missing plugin manager")
}
opts := getOpts(opt...)
if opts.withLimit == 0 {
// zero signals the boundary defaults should be used.
opts.withLimit = db.DefaultLimit
}
return &SetSyncJob{
reader: r,
writer: w,
kms: kms,
plugins: plgm,
limit: opts.withLimit,
}, nil
}

// Status returns the current status of the set sync job. Total is the total number
// of sets that are to be synced. Completed is the number of sets already synced.
func (r *SetSyncJob) Status() scheduler.JobStatus {
return scheduler.JobStatus{
Completed: r.numProcessed,
Total: r.numSets,
}
}

// Run queries the plugin host repo for sets that need to be synced, then
// creates a plugin client and syncs each set. Run cannot be invoked in
// parallel; if it is called while the job is already running, an error with
// code JobAlreadyRunning is returned.
func (r *SetSyncJob) Run(ctx context.Context, _ time.Duration) error {
const op = "plugin.(SetSyncJob).Run"
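	// Mark the job as running; only one invocation of Run may be active at a
	// time.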
	if !r.running.CompareAndSwap(false, true) {
return errors.New(ctx, errors.JobAlreadyRunning, op, "job already running")
}
defer r.running.Store(false)
// Verify context is not done before running
if err := ctx.Err(); err != nil {
return errors.Wrap(ctx, err, op)
}
var setAggs []*hostSetAgg
// Fetch all sets that will reach their sync point within the syncWindow.
// This is done to avoid constantly scheduling the set sync job when there
// are multiple sets to sync in sequence.
err := r.reader.SearchWhere(ctx, &setAggs, setSyncJobQuery, []any{-1 * setSyncJobRunInterval.Seconds()}, db.WithLimit(r.limit))
if err != nil {
return errors.Wrap(ctx, err, op)
}
	// Set numProcessed and numSets for the status report
r.numProcessed, r.numSets = 0, len(setAggs)
if len(setAggs) == 0 {
// Nothing to do, return early
return nil
}
return r.syncSets(ctx, setAggs)
}

// NextRunIn queries the plugin host set db to determine when the next set should be synced.
func (r *SetSyncJob) NextRunIn(ctx context.Context) (time.Duration, error) {
const op = "plugin.(SetSyncJob).NextRunIn"
next, err := nextSync(ctx, r)
if err != nil {
return setSyncJobRunInterval, errors.Wrap(ctx, err, op)
}
return next, nil
}

// Name is the unique name of the job.
func (r *SetSyncJob) Name() string {
return setSyncJobName
}

// Description is the human readable description of the job.
func (r *SetSyncJob) Description() string {
return "Periodically syncs plugin based catalog hosts and host set memberships."
}

func nextSync(ctx context.Context, j scheduler.Job) (time.Duration, error) {
const op = "plugin.nextSync"
var query string
var r db.Reader
switch job := j.(type) {
case *SetSyncJob:
query = setSyncNextRunInQuery
r = job.reader
default:
return 0, errors.New(ctx, errors.Unknown, op, "unknown job")
}
	rows, err := r.Query(ctx, query, []any{setSyncJobRunInterval})
if err != nil {
return 0, errors.Wrap(ctx, err, op)
}
defer rows.Close()
	if !rows.Next() {
		// No row means no set is currently due; surface any iteration error
		// before falling back to the default interval.
		if err := rows.Err(); err != nil {
			return 0, errors.Wrap(ctx, err, op)
		}
		return setSyncJobRunInterval, nil
	}
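	// NextResync mirrors the row returned by the next-run query.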
type NextResync struct {
SyncNow bool
SyncIntervalSeconds int32
ResyncIn time.Duration
}
var n NextResync
err = r.ScanRows(ctx, rows, &n)
if err != nil {
return 0, errors.Wrap(ctx, err, op)
}
switch {
case n.SyncNow:
// Immediate
return 0, nil
case n.SyncIntervalSeconds < 0:
		// A negative sync interval means automatic syncing is disabled; we
		// still sync when SyncNow is set, but otherwise do not. Schedule the
		// job at the default cadence so it keeps recalculating the next run
		// time, but it does no work while syncing remains disabled.
return setSyncJobRunInterval, nil
case n.ResyncIn < 0:
// Immediate
return 0, nil
}
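	// ResyncIn is scanned from the query as a count of seconds, so convert it
	// to a duration here.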
return n.ResyncIn * time.Second, nil
}

// syncSets retrieves from their plugins all the host and membership information
// for the provided host sets and updates their values in the database.
func (r *SetSyncJob) syncSets(ctx context.Context, setAggs []*hostSetAgg) error {
const op = "plugin.(SetSyncJob).syncSets"
if len(setAggs) == 0 {
return nil
}
type setInfo struct {
preferredEndpoint endpoint.Option
plgSet *pb.HostSet
}
type catalogInfo struct {
publicId string // ID of the catalog
plg plgpb.HostPluginServiceClient // plugin client for the catalog
setInfos map[string]*setInfo // map of set IDs to set information
plgCat *hcpb.HostCatalog // plugin host catalog
storeCat *HostCatalog
persisted *plgpb.HostCatalogPersisted // host catalog persisted (secret) data
}
// Next, look up the distinct catalog info and assign set infos to it.
// Notably, this does not include persisted info.
catalogInfos := make(map[string]*catalogInfo)
for _, ag := range setAggs {
ci, ok := catalogInfos[ag.CatalogId]
if !ok {
ci = &catalogInfo{
publicId: ag.CatalogId,
setInfos: make(map[string]*setInfo),
}
}
s, err := ag.toHostSet(ctx)
if err != nil {
return errors.Wrap(ctx, err, op)
}
si, ok := ci.setInfos[s.GetPublicId()]
if !ok {
si = &setInfo{}
}
si.preferredEndpoint = endpoint.WithPreferenceOrder(s.PreferredEndpoints)
si.plgSet, err = toPluginSet(ctx, s)
if err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg(fmt.Sprintf("converting set %q to plugin set", s.GetPublicId())))
}
ci.setInfos[s.GetPublicId()] = si
catalogInfos[ag.CatalogId] = ci
}
// Now, look up the catalog persisted (secret) information. Additionally,
// find the correct plugin to use.
catIds := make([]string, 0, len(catalogInfos))
for k := range catalogInfos {
catIds = append(catIds, k)
}
var catAggs []*catalogAgg
if err := r.reader.SearchWhere(ctx, &catAggs, "public_id in (?)", []any{catIds}); err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg(fmt.Sprintf("can't retrieve catalogs %v", catIds)))
}
if len(catAggs) == 0 {
return errors.New(ctx, errors.NotSpecificIntegrity, op, "no catalogs returned for retrieved sets")
}
for _, ca := range catAggs {
c, s := ca.toCatalogAndPersisted()
ci, ok := catalogInfos[c.GetPublicId()]
if !ok {
return errors.New(ctx, errors.NotSpecificIntegrity, op, "catalog returned when no set requested it")
}
plgCat, err := toPluginCatalog(ctx, c, ca.plugin())
if err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg("storage to plugin catalog conversion"))
}
ci.plgCat = plgCat
ci.storeCat = c
ci.plg, err = pluginClientFactoryFn(ctx, ci.plgCat, r.plugins)
if err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg("failed to get plugin client"))
}
per, err := toPluginPersistedData(ctx, r.kms, c.GetProjectId(), s)
if err != nil {
return errors.Wrap(ctx, err, op)
}
ci.persisted = per
catalogInfos[c.GetPublicId()] = ci
}
// For each distinct catalog, list all sets at once
for _, ci := range catalogInfos {
var sets []*pb.HostSet
var catSetIds []string
for id, si := range ci.setInfos {
sets = append(sets, si.plgSet)
catSetIds = append(catSetIds, id)
}
resp, err := ci.plg.ListHosts(ctx, &plgpb.ListHostsRequest{
Catalog: ci.plgCat,
Sets: sets,
Persisted: ci.persisted,
})
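		// A failure syncing one catalog should not block the others: record
		// the error, count its sets as processed, and move on.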
if err != nil {
event.WriteError(ctx, op, err, event.WithInfoMsg("listing hosts", "catalog id", ci.publicId))
r.numProcessed += len(catSetIds)
continue
}
if _, err := r.upsertAndCleanHosts(ctx, ci.storeCat, catSetIds, resp.GetHosts()); err != nil {
event.WriteError(ctx, op, err, event.WithInfoMsg("upserting hosts", "catalog id", ci.publicId))
r.numProcessed += len(catSetIds)
continue
}
r.numProcessed += len(catSetIds)
}
return nil
}

// upsertAndCleanHosts inserts the hosts in phs into the repository or updates
// their current attributes and set memberships, and returns the resulting
// Hosts. phs is not modified. hc must contain a valid public ID and project
// ID. Each ph in phs must not contain a PublicId but must contain an external
// ID; the PublicId is generated and assigned by this method.
//
// NOTE: If phs is empty, this assumes that there are simply no hosts that
// matched the given sets, which means all hosts will be removed from the
// given sets.
func (r *SetSyncJob) upsertAndCleanHosts(
ctx context.Context,
hc *HostCatalog,
setIds []string,
phs []*plgpb.ListHostsResponseHost,
_ ...Option,
) ([]*Host, error) {
const op = "plugin.(SetSyncJob).upsertAndCleanHosts"
for _, ph := range phs {
if ph.GetExternalId() == "" {
return nil, errors.New(ctx, errors.InvalidParameter, op, "missing host external id")
}
}
if hc == nil {
return nil, errors.New(ctx, errors.InvalidParameter, op, "nil host catalog")
}
if hc.GetPublicId() == "" {
return nil, errors.New(ctx, errors.InvalidParameter, op, "no catalog id")
}
if hc.GetProjectId() == "" {
return nil, errors.New(ctx, errors.InvalidParameter, op, "no project id")
}
if len(setIds) == 0 { // At least one must have been given to the plugin
return nil, errors.New(ctx, errors.InvalidParameter, op, "empty sets")
}
oplogWrapper, err := r.kms.GetWrapper(ctx, hc.GetProjectId(), kms.KeyPurposeOplog)
if err != nil {
return nil, errors.Wrap(ctx, err, op, errors.WithMsg("unable to get oplog wrapper"))
}
// First, fetch existing hosts for the set IDs passed in, and organize them
// into a lookup map by host ID for later usage
var currentHosts []*Host
var currentHostMap map[string]*Host
{
var err error
currentHosts, err = listHostBySetIds(ctx, r.reader, setIds)
if err != nil {
return nil, errors.Wrap(ctx, err, op, errors.WithMsg("looking up current hosts for returned sets"))
}
currentHostMap = make(map[string]*Host, len(currentHosts))
for _, h := range currentHosts {
currentHostMap[h.PublicId] = h
}
}
// Now, parse the externally defined hosts into hostInfo values, which
// stores info useful for later comparisons
newHostMap, err := createNewHostMap(ctx, hc, phs, currentHostMap)
if err != nil {
return nil, errors.Wrap(ctx, err, op, errors.WithMsg("unable to create new host map"))
}
var returnedHosts []*Host
// Iterate over hosts and add or update them
for _, hi := range newHostMap {
ret := hi.h.clone()
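		// If neither the host nor any of its value objects changed, skip the
		// database write and just return it as-is.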
if !hi.dirtyHost &&
len(hi.ipsToAdd) == 0 &&
len(hi.ipsToRemove) == 0 &&
len(hi.dnsNamesToAdd) == 0 &&
len(hi.dnsNamesToRemove) == 0 {
returnedHosts = append(returnedHosts, ret)
continue
}
_, err = r.writer.DoTx(
ctx,
db.StdRetryCnt,
db.ExpBackoff{},
func(r db.Reader, w db.Writer) error {
msgs := make([]*oplog.Message, 0)
ticket, err := w.GetTicket(ctx, ret)
if err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg("unable to get ticket"))
}
// We always write the host itself as we need to update version
// for optimistic locking for any value object update.
var hOplogMsg oplog.Message
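				// Upsert the host: on a primary key conflict, update the mutable
				// columns instead of returning an error.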
onConflict := &db.OnConflict{
Target: db.Constraint("host_plugin_host_pkey"),
Action: db.SetColumns([]string{"name", "external_name", "description", "version"}),
}
var rowsAffected int64
dbOpts := []db.Option{
db.NewOplogMsg(&hOplogMsg),
db.WithOnConflict(onConflict),
db.WithReturnRowsAffected(&rowsAffected),
}
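				// An existing host has a non-zero version; use it for optimistic
				// locking and bump it for this update.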
version := ret.Version
if version > 0 {
dbOpts = append(dbOpts, db.WithVersion(&version))
ret.Version += 1
}
// This check is the logical counterpart of the database
// constraints on the external_name field. By replicating the
// checks as closely as possible in code, we reduce the risk of
// this transaction failing due to a bad external name.
if !strutil.Printable(ret.ExternalName) || len(ret.ExternalName) > 256 {
event.WriteError(ctx, op,
fmt.Errorf("ignoring host id %q external name %q due to its length (greater than 256 characters) or the presence of unsupported unicode characters",
ret.PublicId,
ret.ExternalName),
)
ret.ExternalName = ""
}
if err := w.Create(ctx, ret, dbOpts...); err != nil {
return errors.Wrap(ctx, err, op)
}
if rowsAffected != 1 {
return errors.New(ctx, errors.UnexpectedRowsAffected, op, "no rows affected during upsert")
}
msgs = append(msgs, &hOplogMsg)
// IP handling
{
if len(hi.ipsToRemove) > 0 {
oplogMsgs := make([]*oplog.Message, 0, len(hi.ipsToRemove))
count, err := w.DeleteItems(ctx, hi.ipsToRemove.toSlice(), db.NewOplogMsgs(&oplogMsgs))
if err != nil {
return err
}
if count != len(hi.ipsToRemove) {
return errors.New(ctx, errors.UnexpectedRowsAffected, op, fmt.Sprintf("expected to remove %d ips from host %s, removed %d", len(hi.ipsToRemove), ret.PublicId, count))
}
msgs = append(msgs, oplogMsgs...)
}
if len(hi.ipsToAdd) > 0 {
oplogMsgs := make([]*oplog.Message, 0, len(hi.ipsToAdd))
onConflict := &db.OnConflict{
Target: db.Constraint("host_ip_address_pkey"),
Action: db.DoNothing(true),
}
if err := w.CreateItems(ctx, hi.ipsToAdd.toSlice(), db.NewOplogMsgs(&oplogMsgs), db.WithOnConflict(onConflict)); err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg(fmt.Sprintf("adding ips %v for host %q", hi.ipsToAdd.toSlice(), ret.GetPublicId())))
}
msgs = append(msgs, oplogMsgs...)
}
}
// DNS handling
{
if len(hi.dnsNamesToRemove) > 0 {
oplogMsgs := make([]*oplog.Message, 0, len(hi.dnsNamesToRemove))
count, err := w.DeleteItems(ctx, hi.dnsNamesToRemove.toSlice(), db.NewOplogMsgs(&oplogMsgs))
if err != nil {
return err
}
if count != len(hi.dnsNamesToRemove) {
return errors.New(ctx, errors.UnexpectedRowsAffected, op, fmt.Sprintf("expected to remove %d dns names from host %s, removed %d", len(hi.dnsNamesToRemove), ret.PublicId, count))
}
msgs = append(msgs, oplogMsgs...)
}
if len(hi.dnsNamesToAdd) > 0 {
oplogMsgs := make([]*oplog.Message, 0, len(hi.dnsNamesToAdd))
onConflict := &db.OnConflict{
Target: db.Constraint("host_dns_name_pkey"),
Action: db.DoNothing(true),
}
if err := w.CreateItems(ctx, hi.dnsNamesToAdd.toSlice(), db.NewOplogMsgs(&oplogMsgs), db.WithOnConflict(onConflict)); err != nil {
return err
}
msgs = append(msgs, oplogMsgs...)
}
}
metadata := ret.oplog(oplog.OpType_OP_TYPE_UPDATE)
if err := w.WriteOplogEntryWith(ctx, oplogWrapper, ticket, metadata, msgs); err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg("unable to write oplog"))
}
returnedHosts = append(returnedHosts, ret)
return nil
},
)
if err != nil {
return nil, errors.Wrap(ctx, err, op)
}
}
// Now, check set membership changes
setMembershipsToAdd, setMembershipsToRemove := getSetChanges(currentHostMap, newHostMap)
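	// Each map is keyed by set ID and lists the host IDs whose membership in
	// that set should be added or removed.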
// Iterate through all sets and update memberships, one transaction per set
for _, setId := range setIds {
hs, err := NewHostSet(ctx, hc.PublicId)
if err != nil {
return nil, errors.Wrap(ctx, err, op)
}
hs.PublicId = setId
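		// hs carries the set ID for building memberships and for oplog
		// ticketing below.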
_, err = r.writer.DoTx(
ctx,
db.StdRetryCnt,
db.ExpBackoff{},
func(r db.Reader, w db.Writer) error {
msgs := make([]*oplog.Message, 0)
// Perform additions
for _, hostId := range setMembershipsToAdd[hs.PublicId] {
membership, err := NewHostSetMember(ctx, hs.PublicId, hostId)
if err != nil {
return errors.Wrap(ctx, err, op)
}
var hOplogMsg oplog.Message
if err := w.Create(ctx, membership, db.NewOplogMsg(&hOplogMsg)); err != nil {
return errors.Wrap(ctx, err, op)
}
msgs = append(msgs, &hOplogMsg)
}
// Perform removals
for _, hostId := range setMembershipsToRemove[hs.PublicId] {
membership, err := NewHostSetMember(ctx, hs.PublicId, hostId)
if err != nil {
return errors.Wrap(ctx, err, op)
}
var hOplogMsg oplog.Message
rows, err := w.Delete(ctx, membership, db.NewOplogMsg(&hOplogMsg))
if err != nil {
return errors.Wrap(ctx, err, op)
}
if rows != 1 {
return errors.New(ctx, errors.RecordNotFound, op, "record not found when deleting set membership")
}
msgs = append(msgs, &hOplogMsg)
}
// Oplog
if len(msgs) > 0 {
ticket, err := w.GetTicket(ctx, hs)
if err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg("unable to get ticket"))
}
metadata := hc.oplog(oplog.OpType_OP_TYPE_UPDATE)
if err := w.WriteOplogEntryWith(ctx, oplogWrapper, ticket, metadata, msgs); err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg("unable to write oplog"))
}
}
// Update last sync time
numRows, err := w.Exec(ctx, updateSyncDataQuery, []any{setId})
if err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg("updating last sync time"))
}
if numRows != 1 {
return errors.New(ctx, errors.Internal, op, fmt.Sprintf("host set (%v) synced, but failed to update repo", setId))
}
return nil
},
)
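		// A failure updating one set's memberships is logged but does not
		// abort syncing the remaining sets.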
if err != nil {
event.WriteError(ctx, op, err, event.WithInfoMsg("failed to update membership", "set id", setId))
}
}
return returnedHosts, nil
}