fix: properly handle rows.Err() (#4261)

* fix (auth): properly handle rows.Err()

* fix (auth/ldap): properly handle rows.Err()

* fix (auth/oidc): properly handle rows.Err()

* fix (auth/password): properly handle rows.Err()

* fix (authtoken): properly handle rows.Err()

* fix (credential): properly handle rows.Err()

* fix (credential/static): properly handle rows.Err()

* fix (credential/vault): properly handle rows.Err()

* fix (db): properly handle rows.Err()

* fix (db/schema): properly handle rows.Err()

* fix (db/migrations): properly handle rows.Err()

* fix (host): properly handle rows.Err()

* fix (host/plugin): properly handle rows.Err()

* fix (host/static): properly handle rows.Err()

* fix (iam): properly handle rows.Err()

* fix (kms): properly handle rows.Err()

* fix (oplog): properly handle rows.Err()

* fix (pagination/purge): properly handle rows.Err()

* fix (plugin): properly handle rows.Err()

* fix (job/scheduler): properly handle rows.Err()

* fix (server): properly handle rows.Err()

* fix (session): properly handle rows.Err()

* fix (target): properly handle rows.Err()
pull/4267/head
Jim 2 years ago committed by GitHub
parent c6714c1302
commit 6a37c93d77
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -88,6 +88,7 @@ select count(*) from test_auth_method where public_id = @public_id
err := rw.ScanRows(ctx, rows, &count)
require.NoError(err)
}
assert.NoError(rows.Err())
assert.Equal(1, count.Count)
count.Count = 0
@ -98,6 +99,7 @@ select count(*) from test_auth_method where public_id = @public_id
err := rw.ScanRows(ctx, rows, &count)
require.NoError(err)
}
assert.NoError(rows.Err())
assert.Equal(1, count.Count)
}
@ -170,6 +172,7 @@ values
err := rw.ScanRows(ctx, rows, &count)
require.NoError(err)
}
assert.NoError(rows.Err())
assert.Equal(1, count.Count)
count.Count = 0
@ -180,5 +183,6 @@ values
err := rw.ScanRows(ctx, rows, &count)
require.NoError(err)
}
assert.NoError(rows.Err())
assert.Equal(1, count.Count)
}

@ -371,5 +371,8 @@ func (r *Repository) estimatedAccountCount(ctx context.Context) (int, error) {
return 0, errors.Wrap(ctx, err, op, errors.WithMsg("failed to query ldap account counts"))
}
}
if err := rows.Err(); err != nil {
return 0, errors.Wrap(ctx, err, op, errors.WithMsg("failed to query ldap account counts"))
}
return count, nil
}

@ -381,5 +381,8 @@ func (r *Repository) estimatedManagedGroupCount(ctx context.Context) (int, error
return 0, errors.Wrap(ctx, err, op, errors.WithMsg("failed to query ldap managed group counts"))
}
}
if err := rows.Err(); err != nil {
return 0, errors.Wrap(ctx, err, op, errors.WithMsg("failed to query ldap managed group counts"))
}
return count, nil
}

@ -400,5 +400,8 @@ func (r *Repository) estimatedAccountCount(ctx context.Context) (int, error) {
return 0, errors.Wrap(ctx, err, op, errors.WithMsg("failed to query ldap account counts"))
}
}
if err := rows.Err(); err != nil {
return 0, errors.Wrap(ctx, err, op, errors.WithMsg("failed to query ldap account counts"))
}
return count, nil
}

@ -184,6 +184,9 @@ func (r *Repository) upsertAccount(ctx context.Context, am *AuthMethod, IdTokenC
return errors.Wrap(ctx, err, op, errors.WithMsg("unable to scan rows for account"))
}
}
if err := rows.Err(); err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg("unable to get next rows for account"))
}
if rowCnt > 1 {
return errors.New(ctx, errors.MultipleRecords, op, fmt.Sprintf("expected 1 row but got: %d", rowCnt))
}

@ -368,5 +368,8 @@ func (r *Repository) estimatedManagedGroupCount(ctx context.Context) (int, error
return 0, errors.Wrap(ctx, err, op, errors.WithMsg("failed to query ldap managed group counts"))
}
}
if err := rows.Err(); err != nil {
return 0, errors.Wrap(ctx, err, op, errors.WithMsg("failed to query ldap managed group counts"))
}
return count, nil
}

@ -442,5 +442,8 @@ func (r *Repository) estimatedAccountCount(ctx context.Context) (int, error) {
return 0, errors.Wrap(ctx, err, op, errors.WithMsg("failed to query ldap account counts"))
}
}
if err := rows.Err(); err != nil {
return 0, errors.Wrap(ctx, err, op, errors.WithMsg("failed to query ldap account counts"))
}
return count, nil
}

@ -159,7 +159,9 @@ func (r *Repository) currentConfigForAccount(ctx context.Context, accountId stri
}
confs = append(confs, conf)
}
if err := rows.Err(); err != nil {
return nil, errors.Wrap(ctx, err, op)
}
var cc currentConfig
switch {
case len(confs) == 0:

@ -229,6 +229,9 @@ func (r *Repository) authenticate(ctx context.Context, scopeId, authMethodId, lo
}
accts = append(accts, aa)
}
if err := rows.Err(); err != nil {
return nil, errors.Wrap(ctx, err, op)
}
var acct authAccount
switch {

@ -135,6 +135,9 @@ func (amr *AuthMethodRepository) EstimatedCount(ctx context.Context) (int, error
return 0, errors.Wrap(ctx, err, op, errors.WithMsg("failed to query total auth methods"))
}
}
if err := rows.Err(); err != nil {
return 0, errors.Wrap(ctx, err, op, errors.WithMsg("failed to query total auth methods"))
}
return count, nil
}
@ -154,6 +157,9 @@ func (amr *AuthMethodRepository) ListDeletedIds(ctx context.Context, since time.
return errors.Wrap(ctx, err, op)
}
}
if err := rows.Err(); err != nil {
return errors.Wrap(ctx, err, op)
}
transactionTimestamp, err = r.Now(ctx)
return err
}); err != nil {
@ -179,7 +185,9 @@ func (amr *AuthMethodRepository) queryAuthMethods(ctx context.Context, query str
return err
}
}
if err := rows.Err(); err != nil {
return err
}
for _, am := range foundAuthMethods {
authmethod, err := am.toAuthMethod(ctx)
if err != nil {

@ -378,6 +378,9 @@ func (r *Repository) queryAuthTokens(ctx context.Context, query string, args []a
}
atvs = append(atvs, &atv)
}
if err := rows.Err(); err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg("rows next error"))
}
authTokens = make([]*AuthToken, 0, len(atvs))
for _, atv := range atvs {
@ -429,6 +432,9 @@ func (r *Repository) estimatedCount(ctx context.Context) (int, error) {
return 0, errors.Wrap(ctx, err, op, errors.WithMsg("failed to query total auth tokens"))
}
}
if err := rows.Err(); err != nil {
return 0, errors.Wrap(ctx, err, op, errors.WithMsg("failed to query total auth tokens"))
}
return count, nil
}

@ -106,6 +106,9 @@ func (s *StoreRepository) EstimatedCount(ctx context.Context) (int, error) {
return 0, errors.Wrap(ctx, err, op, errors.WithMsg("failed to query total credential stores"))
}
}
if err := rows.Err(); err != nil {
return 0, errors.Wrap(ctx, err, op, errors.WithMsg("failed to query total credential stores"))
}
return count, nil
}
@ -127,6 +130,9 @@ func (s *StoreRepository) ListDeletedIds(ctx context.Context, since time.Time) (
}
deletedStoreIDs = append(deletedStoreIDs, id)
}
if err := rows.Err(); err != nil {
return errors.Wrap(ctx, err, op)
}
transactionTimestamp, err = r.Now(ctx)
return err
}); err != nil {
@ -152,6 +158,9 @@ func (s *StoreRepository) queryStores(ctx context.Context, query string, args []
return err
}
}
if err := rows.Err(); err != nil {
return err
}
for _, s := range foundStores {
store, err := s.toStore(ctx)

@ -826,6 +826,9 @@ func (r *Repository) queryCredentials(ctx context.Context, query string, args []
return errors.Wrap(ctx, err, op)
}
}
if err := rows.Err(); err != nil {
return errors.Wrap(ctx, err, op)
}
for _, result := range results {
cred, err := result.toCredential(ctx)
if err != nil {
@ -917,6 +920,9 @@ func (r *Repository) EstimatedCredentialCount(ctx context.Context) (int, error)
return 0, errors.Wrap(ctx, err, op, errors.WithMsg("failed to query total static credentials"))
}
}
if err := rows.Err(); err != nil {
return 0, errors.Wrap(ctx, err, op, errors.WithMsg("failed to query total static credentials"))
}
return count, nil
}

@ -285,6 +285,9 @@ func nextRenewal(ctx context.Context, j scheduler.Job) (time.Duration, error) {
}
return n.RenewalIn * time.Second, nil
}
if err := rows.Err(); err != nil {
return 0, errors.Wrap(ctx, err, op)
}
return defaultNextRunIn, nil
}

@ -438,6 +438,9 @@ func (r *Repository) getIssueCredLibraries(ctx context.Context, requests []crede
libs = append(libs, cp)
}
}
if err := rows.Err(); err != nil {
return nil, errors.Wrap(ctx, err, op, errors.WithMsg("next row failed"))
}
var decryptedLibs []issuingCredentialLibrary
for _, pl := range libs {

@ -563,6 +563,9 @@ func (r *Repository) queryLibraries(ctx context.Context, query string, args []an
return errors.Wrap(ctx, err, op)
}
}
if err := rows.Err(); err != nil {
return errors.Wrap(ctx, err, op)
}
for _, result := range results {
lib, err := result.toLibrary(ctx)
if err != nil {
@ -592,6 +595,9 @@ func (r *Repository) EstimatedLibraryCount(ctx context.Context) (int, error) {
return 0, errors.Wrap(ctx, err, op, errors.WithMsg("failed to query total vault credential libraries"))
}
}
if err := rows.Err(); err != nil {
return 0, errors.Wrap(ctx, err, op, errors.WithMsg("failed to query total vault credential libraries"))
}
return count, nil
}

@ -1617,6 +1617,7 @@ group by store_id, status;
break
}
}
assert.NoError(rows.Err())
assert.Contains(storeIds, storeId)
require.NotNil(privateStore)
if assert.NotNil(privateStore.DeleteTime) {
@ -1664,6 +1665,7 @@ group by store_id, status;
break
}
}
assert.NoError(rows.Err())
assert.Contains(storeIds, storeId)
require.NotNil(privateStore)
assert.Empty(cmp.Diff(deleteTime, privateStore.DeleteTime, protocmp.Transform()))

@ -73,6 +73,7 @@ func lookupDbCred(t *testing.T, ctx context.Context, rw *db.Db, dc credential.Dy
&got.ExpirationTime,
))
}
assert.NoError(t, rows.Err())
// Should never get more than one that matches, but can get 0
assert.LessOrEqual(t, rowCount, 1)

@ -39,6 +39,9 @@ Just some high-level usage highlights to get you started. Read the godocs for a
}
// Do something with the Gorm user struct
}
if err := rows.Err(); err != nil {
// do something with the err
}
// DoTx is a writer function that wraps a TxHandler
// in a retryable transaction. You simply implement a

@ -510,6 +510,7 @@ func TestDomain_DefaultUsersExist(t *testing.T) {
for rows.Next() {
count++
}
assert.NoError(t, rows.Err())
assert.Equal(t, 1, count)
}
}

@ -556,6 +556,9 @@ func (rw *Db) Now(ctx context.Context) (time.Time, error) {
return time.Time{}, errors.Wrap(ctx, err, op, errors.WithMsg("failed to query current timestamp"))
}
}
if err := rows.Err(); err != nil {
return time.Time{}, errors.Wrap(ctx, err, op, errors.WithMsg("failed to query current timestamp"))
}
return now, nil
}

@ -1642,6 +1642,7 @@ func TestDb_ScanRows(t *testing.T) {
require.NoError(err)
assert.Equal(user.PublicId, u.PublicId)
}
assert.NoError(rows.Err())
})
}
@ -1672,6 +1673,7 @@ func TestDb_Query(t *testing.T) {
require.NoError(err)
assert.Equal(user.PublicId, u.PublicId)
}
assert.NoError(rows.Err())
})
}

@ -87,6 +87,9 @@ func ExampleManager_hooks() {
}
invalid = append(invalid, fmt.Sprintf("%d:%s", id, name))
}
if err := rows.Err(); err != nil {
return nil, err
}
if len(invalid) > 0 {
return append([]string{"invalid foos:"}, invalid...), nil
@ -116,6 +119,9 @@ func ExampleManager_hooks() {
}
invalid = append(invalid, fmt.Sprintf("%d:%s", id, name))
}
if err := rows.Err(); err != nil {
return nil, err
}
if len(invalid) > 0 {
return append([]string{"deleted foos:"}, invalid...), nil

@ -90,6 +90,7 @@ func TestMigration(t *testing.T) {
require.NoError(rows.Scan(&sessVal, &serverVal))
require.False(serverVal.Valid)
}
require.NoError(rows.Err())
_, err = db.Query(update)
require.NoError(err)
@ -104,4 +105,5 @@ func TestMigration(t *testing.T) {
require.Equal(tests[count].sessServerId, serverVal.String)
count++
}
require.NoError(rows.Err())
}

@ -127,6 +127,7 @@ func Test_ServerEnumChanges(t *testing.T) {
require.NoError(rows.Scan(&a))
actualEnm = append(actualEnm, a)
}
require.NoError(rows.Err())
require.Equal([]string{"controller", "worker"}, actualEnm)
// Try inserting a broken row

@ -229,6 +229,7 @@ values
require.NoError(t, rw.ScanRows(context.Background(), rows, &addr))
results = append(results, addr)
}
assert.NoError(t, rows.Err())
assert.ElementsMatch(t, results, addresses)
}
}

@ -251,6 +251,7 @@ func loadKeks(t *testing.T, rw *db.Db) []kek {
require.NoError(t, rw.ScanRows(context.Background(), rows, &key))
keks = append(keks, key)
}
require.NoError(t, rows.Err())
return keks
}
@ -266,6 +267,7 @@ func loadKekVersions(t *testing.T, rw *db.Db) []kekVersion {
require.NoError(t, rw.ScanRows(context.Background(), rows, &v))
kekVersions = append(kekVersions, v)
}
require.NoError(t, rows.Err())
return kekVersions
}
@ -281,6 +283,7 @@ func loadNewDeks(t *testing.T, rw *db.Db) []dek {
require.NoError(t, rw.ScanRows(context.Background(), rows, &key))
deks = append(deks, key)
}
require.NoError(t, rows.Err())
return deks
}
@ -315,6 +318,7 @@ func loadCurrentDeks(t *testing.T, rw *db.Db) []dek {
require.NoError(t, rw.ScanRows(context.Background(), rows, &key))
deks = append(deks, key)
}
require.NoError(t, rows.Err())
}
return deks
}
@ -331,6 +335,7 @@ func loadNewDekVersions(t *testing.T, rw *db.Db) []dekVersion {
require.NoError(t, rw.ScanRows(context.Background(), rows, &v))
dekVersions = append(dekVersions, v)
}
require.NoError(t, rows.Err())
return dekVersions
}
@ -420,6 +425,7 @@ func loadCurrentDekVersions(t *testing.T, rw *db.Db) []dekVersion {
}
dekVersions = append(dekVersions, v)
}
require.NoError(t, rows.Err())
}
return dekVersions
}

@ -371,6 +371,7 @@ func validateRepairFunc(t *testing.T, rw *db.Db, repairReport migration.Repairs)
resourceId: resourceId,
})
}
require.NoError(rows.Err())
require.Equal([]targetAssociation{
{
targetId: "ttcp_PRJA___65001",

@ -53,7 +53,9 @@ func TestMigrationHook8202(t *testing.T) {
require.NoError(t, rows.Scan(&targetOrgId))
count++
}
require.NoError(t, rows.Err())
rows.Close()
require.NoError(t, rows.Err())
assert.Equal(t, 1, count)
assert.Equal(t, "o_test__82002", *targetOrgId)
@ -66,6 +68,7 @@ func TestMigrationHook8202(t *testing.T) {
require.NoError(t, rows.Scan(&targetOrgId))
count++
}
require.NoError(t, rows.Err())
assert.Equal(t, 1, count)
assert.Nil(t, targetOrgId)
}

@ -157,6 +157,9 @@ func nextSync(ctx context.Context, j scheduler.Job) (time.Duration, error) {
if !rows.Next() {
return setSyncJobRunInterval, nil
}
if err := rows.Err(); err != nil {
return 0, errors.Wrap(ctx, err, op)
}
type NextResync struct {
SyncNow bool

@ -117,6 +117,9 @@ func (r *Repository) queryHosts(ctx context.Context, query string, args []any) (
return err
}
}
if err := rows.Err(); err != nil {
return err
}
if len(foundHosts) != 0 {
plg = plugin.NewPlugin()
plg.PublicId = foundHosts[0].PluginId
@ -223,5 +226,8 @@ func (r *Repository) estimatedHostCount(ctx context.Context) (int, error) {
return 0, errors.Wrap(ctx, err, op, errors.WithMsg("failed to query plugin hosts"))
}
}
if err := rows.Err(); err != nil {
return 0, errors.Wrap(ctx, err, op, errors.WithMsg("failed to query plugin hosts"))
}
return count, nil
}

@ -678,6 +678,9 @@ func (r *Repository) querySets(ctx context.Context, query string, args []any) ([
return err
}
}
if err := rows.Err(); err != nil {
return err
}
if len(foundSets) != 0 {
plg = plugin.NewPlugin()
plg.PublicId = foundSets[0].PluginId
@ -917,6 +920,9 @@ func (r *Repository) estimatedSetCount(ctx context.Context) (int, error) {
return 0, errors.Wrap(ctx, err, op, errors.WithMsg("failed to query plugin host sets"))
}
}
if err := rows.Err(); err != nil {
return 0, errors.Wrap(ctx, err, op, errors.WithMsg("failed to query plugin host sets"))
}
return count, nil
}

@ -107,6 +107,9 @@ func (s *CatalogRepository) EstimatedCount(ctx context.Context) (int, error) {
return 0, errors.Wrap(ctx, err, op, errors.WithMsg("failed to query total host catalogs"))
}
}
if err := rows.Err(); err != nil {
return 0, errors.Wrap(ctx, err, op, errors.WithMsg("failed to query total host catalogs"))
}
return count, nil
}
@ -128,6 +131,9 @@ func (s *CatalogRepository) ListDeletedIds(ctx context.Context, since time.Time)
}
deletedCatalogIDs = append(deletedCatalogIDs, id)
}
if err := rows.Err(); err != nil {
return errors.Wrap(ctx, err, op)
}
transactionTimestamp, err = r.Now(ctx)
return err
}); err != nil {
@ -154,6 +160,9 @@ func (s *CatalogRepository) queryCatalogs(ctx context.Context, query string, arg
return err
}
}
if err := rows.Err(); err != nil {
return err
}
var plgIds []string
for _, c := range foundCatalogs {
catalog, err := c.toCatalog(ctx)

@ -307,6 +307,9 @@ func (r *Repository) queryHosts(ctx context.Context, query string, args []any) (
return err
}
}
if err := rows.Err(); err != nil {
return err
}
hosts = make([]*Host, 0, len(foundHosts))
for _, ha := range foundHosts {
hosts = append(hosts, ha.toHost())
@ -397,5 +400,8 @@ func (r *Repository) estimatedHostCount(ctx context.Context) (int, error) {
return 0, errors.Wrap(ctx, err, op, errors.WithMsg("failed to query static hosts"))
}
}
if err := rows.Err(); err != nil {
return 0, errors.Wrap(ctx, err, op, errors.WithMsg("failed to query static hosts"))
}
return count, nil
}

@ -410,5 +410,8 @@ func (r *Repository) estimatedSetCount(ctx context.Context) (int, error) {
return 0, errors.Wrap(ctx, err, op, errors.WithMsg("failed to query static host sets"))
}
}
if err := rows.Err(); err != nil {
return 0, errors.Wrap(ctx, err, op, errors.WithMsg("failed to query static host sets"))
}
return count, nil
}

@ -363,5 +363,8 @@ func (r *Repository) changes(ctx context.Context, setId string, hostIds []string
}
changes = append(changes, &chg)
}
if err := rows.Err(); err != nil {
return nil, errors.Wrap(ctx, err, op, errors.WithMsg("next row error"))
}
return changes, nil
}

@ -523,6 +523,9 @@ func groupMemberChanges(ctx context.Context, reader db.Reader, groupId string, u
}
changes = append(changes, &chg)
}
if err := rows.Err(); err != nil {
return nil, nil, errors.Wrap(ctx, err, op)
}
addMembers := []any{}
deleteMembers := []any{}
for _, c := range changes {
@ -683,5 +686,8 @@ func (r *Repository) estimatedGroupCount(ctx context.Context) (int, error) {
return 0, errors.Wrap(ctx, err, op, errors.WithMsg("failed to query total groups"))
}
}
if err := rows.Err(); err != nil {
return 0, errors.Wrap(ctx, err, op, errors.WithMsg("failed to query total groups"))
}
return count, nil
}

@ -319,5 +319,8 @@ func (r *Repository) estimatedRoleCount(ctx context.Context) (int, error) {
return 0, errors.Wrap(ctx, err, op, errors.WithMsg("failed to query total roles"))
}
}
if err := rows.Err(); err != nil {
return 0, errors.Wrap(ctx, err, op, errors.WithMsg("failed to query total roles"))
}
return count, nil
}

@ -421,5 +421,8 @@ func (r *Repository) GrantsForUser(ctx context.Context, userId string, _ ...Opti
}
grants = append(grants, g)
}
if err := rows.Err(); err != nil {
return nil, errors.Wrap(ctx, err, op)
}
return grants, nil
}

@ -680,5 +680,8 @@ func (r *Repository) estimatedScopeCount(ctx context.Context) (int, error) {
return 0, errors.Wrap(ctx, err, op, errors.WithMsg("failed to query total scopes"))
}
}
if err := rows.Err(); err != nil {
return 0, errors.Wrap(ctx, err, op, errors.WithMsg("failed to query total scopes"))
}
return count, nil
}

@ -813,6 +813,9 @@ func associationChanges(ctx context.Context, reader db.Reader, userId string, ac
}
changes = append(changes, &chg)
}
if err := rows.Err(); err != nil {
return nil, nil, errors.Wrap(ctx, err, op)
}
var associateIds, disassociateIds []string
for _, c := range changes {
if c.AccountId == "" {
@ -986,5 +989,8 @@ func (r *Repository) estimatedUserCount(ctx context.Context) (int, error) {
return 0, errors.Wrap(ctx, err, op, errors.WithMsg("failed to query total users"))
}
}
if err := rows.Err(); err != nil {
return 0, errors.Wrap(ctx, err, op, errors.WithMsg("failed to query total users"))
}
return count, nil
}

@ -476,6 +476,9 @@ func (k *Kms) MonitorTableRewrappingRuns(ctx context.Context, tableName string,
return errors.Wrap(ctx, err, op, errors.WithMsg("failed to scan pending run for %q", tableName))
}
}
if err := rows.Err(); err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg("failed to get next pending runs for %q", tableName))
}
if run.KeyId == "" {
// No queued runs, lets try again later
return nil
@ -525,6 +528,9 @@ func (k *Kms) MonitorTableRewrappingRuns(ctx context.Context, tableName string,
return errors.Wrap(ctx, err, op, errors.WithMsg("failed to scan scope id for data key version"))
}
}
if err := rows.Err(); err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg("failed to get next scope id for data key version"))
}
// Call the function to rewrap the data in the table. The progress will be automatically
// updated by the deferred function.
@ -551,6 +557,9 @@ func (k *Kms) MonitorDataKeyVersionDestruction(ctx context.Context) error {
return errors.Wrap(ctx, err, op)
}
}
if err := rows.Err(); err != nil {
return errors.Wrap(ctx, err, op)
}
for _, dataKeyVersionId := range completedDataKeyVersionIds {
// Finally, revoke the key, deleting it from the database.
// This will error if anything still references it that isn't
@ -641,6 +650,9 @@ keyLoop:
return errors.Wrap(ctx, err, op, errors.WithMsg("failed to scan number of rows for %q", table.GetTableName()))
}
}
if err := rows.Err(); err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg("failed to get next number of rows for %q", table.GetTableName()))
}
if numRows == 0 {
// No rows to rewrap 🎉
continue

@ -167,5 +167,6 @@ order by ccu.table_name,pgc.conname `
rw.ScanRows(rows, &r)
results = append(results, r)
}
require.NoError(err)
return results
}

@ -32,6 +32,9 @@ func (w *Writer) hasTable(ctx context.Context, tableName string) (bool, error) {
if ok := rows.Next(); ok {
rw.ScanRows(rows, &count)
}
if err := rows.Err(); err != nil {
return false, errors.Wrap(ctx, err, op)
}
return count > 0, nil
}

@ -49,5 +49,8 @@ func RegisterJobs(ctx context.Context, s *scheduler.Scheduler, r db.Reader, w db
return errors.Wrap(ctx, err, op)
}
}
if err := rows.Err(); err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg("unable to get next row for deletion tables"))
}
return nil
}

@ -64,6 +64,7 @@ func TestPurgeTables(t *testing.T) {
}
require.Equal(t, 1, count)
}
require.NoError(t, rows.Err())
}
func TestNewPurgeJob(t *testing.T) {

@ -349,6 +349,7 @@ func TestRepository_AddSupportFlag(t *testing.T) {
rowCount++
require.NoError(rows.Scan(&plgid))
}
require.NoError(rows.Err())
if tt.flagExists {
assert.Equal(1, rowCount)
@ -384,6 +385,7 @@ func TestRepository_AddSupportFlag(t *testing.T) {
for rows.Next() {
rowCount++
}
require.NoError(rows.Err())
assert.Equal(1, rowCount)
})
}

@ -119,6 +119,7 @@ func TestPlugin(t *testing.T) {
for rows.Next() {
numRows++
}
require.NoError(rows.Err())
_ = rows.Close()
require.Equal(1, numRows)
@ -139,6 +140,7 @@ func TestPlugin(t *testing.T) {
for rows.Next() {
numRows++
}
require.NoError(rows.Err())
_ = rows.Close()
require.Equal(1, numRows)
@ -165,6 +167,7 @@ func TestPlugin(t *testing.T) {
for rows.Next() {
numRows++
}
require.NoError(rows.Err())
_ = rows.Close()
require.Equal(2, numRows)
})

@ -61,6 +61,9 @@ func (r *Repository) UpsertJob(ctx context.Context, name, description string, op
return errors.Wrap(ctx, err, op, errors.WithMsg("unable to scan rows for job"), errors.WithoutEvent())
}
}
if err := rows.Err(); err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg("unable to get next row for job"), errors.WithoutEvent())
}
if rowCnt == 0 {
return errors.New(ctx, errors.NotSpecificIntegrity, op, "failed to create new job", errors.WithoutEvent())
}
@ -110,6 +113,9 @@ func (r *Repository) UpdateJobNextRunInAtLeast(ctx context.Context, name string,
return errors.Wrap(ctx, err, op, errors.WithMsg("unable to scan rows"))
}
}
if err := rows.Err(); err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg("unable to get next row for job"))
}
if rowCnt == 0 {
return errors.New(ctx, errors.RecordNotFound, op, fmt.Sprintf("job %q does not exist", name))
}

@ -53,6 +53,9 @@ func (r *Repository) RunJobs(ctx context.Context, serverId string, opt ...Option
}
runs = append(runs, run)
}
if err := rows.Err(); err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg("unable to get next row for job run"))
}
return nil
},
@ -95,6 +98,9 @@ func (r *Repository) UpdateProgress(ctx context.Context, runId string, completed
return errors.Wrap(ctx, err, op, errors.WithMsg("unable to scan rows for job run"))
}
}
if err := rows.Err(); err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg("unable to get next row for job run"))
}
if rowCnt == 0 {
// Failed to update run, either it does not exist or was in an invalid state
if err = r.LookupById(ctx, run); err != nil {
@ -159,6 +165,9 @@ func (r *Repository) CompleteRun(ctx context.Context, runId string, nextRunIn ti
return errors.Wrap(ctx, err, op, errors.WithMsg("unable to scan rows for job run"))
}
}
if err := rows.Err(); err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg("unable to get next row for job run"))
}
if rowCnt == 0 {
// Failed to update run, either it does not exist or was in an invalid state
if err = r.LookupById(ctx, run); err != nil {
@ -189,6 +198,9 @@ func (r *Repository) CompleteRun(ctx context.Context, runId string, nextRunIn ti
return errors.Wrap(ctx, err, op, errors.WithMsg("unable to scan rows for job"))
}
}
if err := rows1.Err(); err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg("unable to get next row for job"))
}
return nil
},
@ -239,6 +251,9 @@ func (r *Repository) FailRun(ctx context.Context, runId string, completed, total
return errors.Wrap(ctx, err, op, errors.WithMsg("unable to scan rows for job run"))
}
}
if err := rows.Err(); err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg("unable to get next row for job run"))
}
if rowCnt == 0 {
// Failed to update run, either it does not exist or was in an invalid state
if err = r.LookupById(ctx, run); err != nil {
@ -300,6 +315,9 @@ func (r *Repository) InterruptRuns(ctx context.Context, interruptThreshold time.
}
runs = append(runs, run)
}
if err := rows.Err(); err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg("unable to get next row for job run"))
}
return nil
},

@ -56,6 +56,9 @@ func testRun(conn *db.DB, pluginId, name, cId string) (*Run, error) {
if !rows.Next() {
return nil, nil
}
if err := rows.Err(); err != nil {
return nil, err
}
err = rw.ScanRows(ctx, rows, run)
if err != nil {
@ -84,6 +87,9 @@ func testRunWithUpdateTime(conn *db.DB, pluginId, name, cId string, updateTime t
if !rows.Next() {
return nil, fmt.Errorf("expected to rows")
}
if err := rows.Err(); err != nil {
return nil, err
}
err = rw.ScanRows(ctx, rows, run)
if err != nil {

@ -486,6 +486,9 @@ func (r *WorkerAuthRepositoryStorage) loadNodeInformation(ctx context.Context, n
}
workerAuths = append(workerAuths, &s)
}
if err := rows.Err(); err != nil {
return errors.Wrap(ctx, err, op)
}
workerAuthorizedSet, err := r.validateWorkerAuths(ctx, workerAuths)
if err != nil {
@ -640,6 +643,9 @@ func (r *WorkerAuthRepositoryStorage) FilterToAuthorizedWorkerKeyIds(ctx context
}
ret = append(ret, result.WorkerKeyIdentifier)
}
if err := rows.Err(); err != nil {
return nil, errors.Wrap(ctx, err, op)
}
return ret, nil
}

@ -189,6 +189,9 @@ func (j *sessionConnectionCleanupJob) closeConnectionsForDeadWorkers(ctx context
results = append(results, result)
}
if err := rows.Err(); err != nil {
return errors.Wrap(ctx, err, op)
}
return nil
},

@ -430,6 +430,9 @@ func (r *ConnectionRepository) closeOrphanedConnections(ctx context.Context, wor
}
orphanedConns = append(orphanedConns, connectionId)
}
if err := rows.Err(); err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg("error getting next row"))
}
return nil
},
)

@ -159,6 +159,9 @@ func (r *Repository) CreateSession(ctx context.Context, sessionWrapper wrapping.
w.ScanRows(ctx, rows, &returnedCred)
returnedSession.DynamicCredentials = append(returnedSession.DynamicCredentials, &returnedCred)
}
if err := rows.Err(); err != nil {
return errors.Wrap(ctx, err, op)
}
}
var foundStates []*State
@ -376,6 +379,9 @@ func (r *Repository) querySessions(ctx context.Context, query string, args []any
}
sessionsList = append(sessionsList, &s)
}
if err := rows.Err(); err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg("failed to get next row for session"))
}
sessions, err = r.convertToSessions(ctx, sessionsList)
if err != nil {
return errors.Wrap(ctx, err, op)
@ -426,6 +432,9 @@ func (r *Repository) estimatedCount(ctx context.Context) (int, error) {
return 0, errors.Wrap(ctx, err, op, errors.WithMsg("failed to query total sessions"))
}
}
if err := rows.Err(); err != nil {
return 0, errors.Wrap(ctx, err, op, errors.WithMsg("failed to query total sessions"))
}
return count, nil
}
@ -576,6 +585,9 @@ func (r *Repository) sessionAuthzSummary(ctx context.Context, sessionId string)
return nil, errors.Wrap(ctx, err, op, errors.WithMsg("scan row failed"))
}
}
if err := rows.Err(); err != nil {
return nil, errors.Wrap(ctx, err, op, errors.WithMsg("failed to get next row for session"))
}
return info, nil
}

@ -147,6 +147,7 @@ func TestRepository_convertToSessions(t *testing.T) {
require.NoError(t, err)
sessionsList = append(sessionsList, &s)
}
require.NoError(t, rows.Err())
sessions, err := repo.convertToSessions(ctx, sessionsList)
require.NoError(t, err)

@ -93,6 +93,7 @@ func TestRewrap_sessionCredentialRewrapFn(t *testing.T) {
rowCount++
require.NoError(t, rows.Scan(&got.CtCredential, &got.KeyId, &got.CredentialSha256))
}
require.NoError(t, rows.Err())
assert.Equal(t, 1, rowCount)
kmsWrapper2, err := kmsCache.GetWrapper(context.Background(), prj.PublicId, kms.KeyPurposeDatabase, kms.WithKeyId(got.KeyId))

@ -225,6 +225,9 @@ func (r *Repository) FetchAuthzProtectedEntitiesByScope(ctx context.Context, pro
}
targetsMap[tv.GetProjectId()] = append(targetsMap[tv.GetProjectId()], tv)
}
if err := rows.Err(); err != nil {
return nil, errors.Wrap(ctx, err, op, errors.WithMsg("next rows error"))
}
return targetsMap, nil
}
@ -316,6 +319,9 @@ func (r *Repository) queryTargets(ctx context.Context, query string, args []any,
return err
}
}
if err := rows.Err(); err != nil {
return err
}
var targetIds []string
for _, t := range foundTargets {
targetIds = append(targetIds, t.GetPublicId())
@ -427,6 +433,9 @@ func (r *Repository) estimatedCount(ctx context.Context) (int, error) {
return 0, errors.Wrap(ctx, err, op, errors.WithMsg("failed to query total targets"))
}
}
if err := rows.Err(); err != nil {
return 0, errors.Wrap(ctx, err, op, errors.WithMsg("failed to query total targets"))
}
return count, nil
}

@ -517,6 +517,9 @@ func (r *Repository) changes(ctx context.Context, targetId string, ids []string,
}
}
}
if err := rows.Err(); err != nil {
return nil, nil, nil, nil, errors.Wrap(ctx, err, op, errors.WithMsg("next rows error"))
}
return addCredLib, delCredLib, addStaticCred, delStaticCred, nil
}

Loading…
Cancel
Save