From 84e499a7afec51e38a522f50d0de9cce7afa5b62 Mon Sep 17 00:00:00 2001 From: Andre Medeiros Date: Sat, 14 Aug 2021 11:59:13 -0400 Subject: [PATCH] schedule stale and orphaned accounts --- internal/cmd/scheduler.go | 55 +++++++------------------ internal/domain/account.go | 3 +- internal/repository/postgres_account.go | 12 +++++- 3 files changed, 28 insertions(+), 42 deletions(-) diff --git a/internal/cmd/scheduler.go b/internal/cmd/scheduler.go index df1d499..23c1d93 100644 --- a/internal/cmd/scheduler.go +++ b/internal/cmd/scheduler.go @@ -71,7 +71,7 @@ func SchedulerCmd(ctx context.Context) *cobra.Command { s.Every(200).Milliseconds().SingletonMode().Do(func() { enqueueAccounts(ctx, logger, statsd, db, redis, luaSha, notifQueue) }) s.Every(1).Second().Do(func() { cleanQueues(ctx, logger, queue) }) s.Every(1).Minute().Do(func() { reportStats(ctx, logger, statsd, db, redis) }) - s.Every(1).Minute().Do(func() { pruneStale(ctx, logger, db) }) + s.Every(1).Minute().Do(func() { pruneAccounts(ctx, logger, db) }) s.StartAsync() <-ctx.Done() @@ -104,46 +104,11 @@ func evalScript(ctx context.Context, redis *redis.Client) (string, error) { return redis.ScriptLoad(ctx, lua).Result() } -func pruneStale(ctx context.Context, logger *logrus.Logger, pool *pgxpool.Pool) { - ar := repository.NewPostgresAccount(pool) - count, err := ar.PruneStale(ctx) - - if err != nil { - logger.WithFields(logrus.Fields{ - "err": err, - }).Error("failed cleaning stale accounts") - return - } - +func pruneAccounts(ctx context.Context, logger *logrus.Logger, pool *pgxpool.Pool) { now := time.Now().Unix() - 7200 - ids := []int64{} - - err = pool.BeginFunc(ctx, func(tx pgx.Tx) error { - stmt := ` - WITH account AS ( - SELECT id - FROM accounts - WHERE - expires_at < $1 - ) - DELETE FROM accounts - WHERE accounts.id IN(SELECT id FROM account) - RETURNING accounts.id` - rows, err := tx.Query(ctx, stmt, now) - if err != nil { - return err - } - defer rows.Close() - - for rows.Next() { - var id int64 - rows.Scan(&id) - ids = append(ids, id) - count++ - } - return nil - }) + ar := repository.NewPostgresAccount(pool) + stale, err := ar.PruneStale(ctx, now) if err != nil { logger.WithFields(logrus.Fields{ "err": err, @@ -151,10 +116,20 @@ func pruneStale(ctx context.Context, logger *logrus.Logger, pool *pgxpool.Pool) return } + orphaned, err := ar.PruneOrphaned(ctx) + if err != nil { + logger.WithFields(logrus.Fields{ + "err": err, + }).Error("failed cleaning orphaned accounts") + return + } + + count := stale + orphaned + if count > 0 { logger.WithFields(logrus.Fields{ "count": count, - }).Info("cleaned stale accounts") + }).Info("pruned accounts") } } diff --git a/internal/domain/account.go b/internal/domain/account.go index 945a017..bf74b98 100644 --- a/internal/domain/account.go +++ b/internal/domain/account.go @@ -38,5 +38,6 @@ type AccountRepository interface { Associate(ctx context.Context, acc *Account, dev *Device) error Disassociate(ctx context.Context, acc *Account, dev *Device) error - PruneStale(ctx context.Context) (int64, error) + PruneOrphaned(ctx context.Context) (int64, error) + PruneStale(ctx context.Context, before int64) (int64, error) } diff --git a/internal/repository/postgres_account.go b/internal/repository/postgres_account.go index d1eb9fd..1694c86 100644 --- a/internal/repository/postgres_account.go +++ b/internal/repository/postgres_account.go @@ -189,7 +189,17 @@ func (p *postgresAccountRepository) GetByAPNSToken(ctx context.Context, token st return p.fetch(ctx, query, token) } -func (p *postgresAccountRepository) PruneStale(ctx context.Context) (int64, error) { +func (p *postgresAccountRepository) PruneStale(ctx context.Context, before int64) (int64, error) { + query := ` + DELETE FROM accounts + WHERE expires_at < $1` + + res, err := p.pool.Exec(ctx, query, before) + + return res.RowsAffected(), err +} + +func (p *postgresAccountRepository) PruneOrphaned(ctx context.Context) (int64, error) { query := ` WITH accounts_with_device_count AS ( SELECT accounts.id, COUNT(device_id) AS device_count