From 78dc6cf370f359094aac01c3a2704e48a1a0b96a Mon Sep 17 00:00:00 2001 From: Todd Date: Fri, 12 Nov 2021 16:17:43 -0700 Subject: [PATCH] OnConflict constraints for adding host ip and dns names in upsertHosts (#1700) * Ignore constraint violations when adding ip addresses and dns addresses that already exist. --- internal/host/plugin/job_set_sync.go | 14 +++++++++++--- internal/host/plugin/job_set_sync_test.go | 16 +++++++++++++--- 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/internal/host/plugin/job_set_sync.go b/internal/host/plugin/job_set_sync.go index 1b63da46c2..4f58ad1c6a 100644 --- a/internal/host/plugin/job_set_sync.go +++ b/internal/host/plugin/job_set_sync.go @@ -444,8 +444,12 @@ func (r *SetSyncJob) upsertHosts( } if len(hi.ipsToAdd) > 0 { oplogMsgs := make([]*oplog.Message, 0, len(hi.ipsToAdd)) - if err := w.CreateItems(ctx, hi.ipsToAdd.toArray(), db.NewOplogMsgs(&oplogMsgs)); err != nil { - return err + onConflict := &db.OnConflict{ + Target: db.Constraint("host_ip_address_pkey"), + Action: db.DoNothing(true), + } + if err := w.CreateItems(ctx, hi.ipsToAdd.toArray(), db.NewOplogMsgs(&oplogMsgs), db.WithOnConflict(onConflict)); err != nil { + return errors.Wrap(ctx, err, op, errors.WithMsg(fmt.Sprintf("adding ips %v for host %q", hi.ipsToAdd.toArray(), hi.h.GetPublicId()))) } msgs = append(msgs, oplogMsgs...) } @@ -466,7 +470,11 @@ func (r *SetSyncJob) upsertHosts( } if len(hi.dnsNamesToAdd) > 0 { oplogMsgs := make([]*oplog.Message, 0, len(hi.dnsNamesToAdd)) - if err := w.CreateItems(ctx, hi.dnsNamesToAdd.toArray(), db.NewOplogMsgs(&oplogMsgs)); err != nil { + onConflict := &db.OnConflict{ + Target: db.Constraint("host_dns_name_pkey"), + Action: db.DoNothing(true), + } + if err := w.CreateItems(ctx, hi.dnsNamesToAdd.toArray(), db.NewOplogMsgs(&oplogMsgs), db.WithOnConflict(onConflict)); err != nil { return err } msgs = append(msgs, oplogMsgs...) diff --git a/internal/host/plugin/job_set_sync_test.go b/internal/host/plugin/job_set_sync_test.go index 66e8705d5b..96347f301d 100644 --- a/internal/host/plugin/job_set_sync_test.go +++ b/internal/host/plugin/job_set_sync_test.go @@ -160,14 +160,18 @@ func TestSetSyncJob_Run(t *testing.T) { cat := TestCatalog(t, conn, prj.GetPublicId(), plg.GetPublicId()) set1 := TestSet(t, conn, kmsCache, sched, cat, plgm) plgServer.ListHostsFn = func(ctx context.Context, req *plgpb.ListHostsRequest) (*plgpb.ListHostsResponse, error) { - assert.Len(req.GetSets(), 1) - assert.Equal(set1.GetPublicId(), req.GetSets()[0].GetId()) + assert.GreaterOrEqual(1, len(req.GetSets())) + var setIds []string + for _, s := range req.GetSets() { + setIds = append(setIds, s.GetId()) + } return &plgpb.ListHostsResponse{ Hosts: []*plgpb.ListHostsResponseHost{ { ExternalId: "first", IpAddresses: []string{"10.0.0.1"}, - SetIds: []string{req.GetSets()[0].GetId()}, + DnsNames: []string{"foo.com"}, + SetIds: setIds, }, }, }, nil @@ -212,6 +216,12 @@ func TestSetSyncJob_Run(t *testing.T) { assert.Equal(1, r.numSets) assert.Equal(1, r.numProcessed) + // Run sync with a new second set + _ = TestSet(t, conn, kmsCache, sched, cat, plgm) + require.NoError(r.Run(context.Background())) + assert.Equal(1, r.numSets) + assert.Equal(1, r.numProcessed) + require.NoError(rw.LookupByPublicId(ctx, hs)) assert.Greater(hs.GetLastSyncTime().AsTime().UnixNano(), firstSyncTime.AsTime().UnixNano()) assert.False(hs.GetNeedSync())