schedule stale and orphaned accounts

This commit is contained in:
Andre Medeiros 2021-08-14 11:59:13 -04:00
parent f6e05a6fd8
commit 84e499a7af
3 changed files with 28 additions and 42 deletions

View file

@ -71,7 +71,7 @@ func SchedulerCmd(ctx context.Context) *cobra.Command {
s.Every(200).Milliseconds().SingletonMode().Do(func() { enqueueAccounts(ctx, logger, statsd, db, redis, luaSha, notifQueue) })
s.Every(1).Second().Do(func() { cleanQueues(ctx, logger, queue) })
s.Every(1).Minute().Do(func() { reportStats(ctx, logger, statsd, db, redis) })
s.Every(1).Minute().Do(func() { pruneStale(ctx, logger, db) })
s.Every(1).Minute().Do(func() { pruneAccounts(ctx, logger, db) })
s.StartAsync()
<-ctx.Done()
@ -104,46 +104,11 @@ func evalScript(ctx context.Context, redis *redis.Client) (string, error) {
return redis.ScriptLoad(ctx, lua).Result()
}
func pruneStale(ctx context.Context, logger *logrus.Logger, pool *pgxpool.Pool) {
ar := repository.NewPostgresAccount(pool)
count, err := ar.PruneStale(ctx)
if err != nil {
logger.WithFields(logrus.Fields{
"err": err,
}).Error("failed cleaning stale accounts")
return
}
func pruneAccounts(ctx context.Context, logger *logrus.Logger, pool *pgxpool.Pool) {
now := time.Now().Unix() - 7200
ids := []int64{}
err = pool.BeginFunc(ctx, func(tx pgx.Tx) error {
stmt := `
WITH account AS (
SELECT id
FROM accounts
WHERE
expires_at < $1
)
DELETE FROM accounts
WHERE accounts.id IN(SELECT id FROM account)
RETURNING accounts.id`
rows, err := tx.Query(ctx, stmt, now)
if err != nil {
return err
}
defer rows.Close()
for rows.Next() {
var id int64
rows.Scan(&id)
ids = append(ids, id)
count++
}
return nil
})
ar := repository.NewPostgresAccount(pool)
stale, err := ar.PruneStale(ctx, now)
if err != nil {
logger.WithFields(logrus.Fields{
"err": err,
@ -151,10 +116,20 @@ func pruneStale(ctx context.Context, logger *logrus.Logger, pool *pgxpool.Pool)
return
}
orphaned, err := ar.PruneOrphaned(ctx)
if err != nil {
logger.WithFields(logrus.Fields{
"err": err,
}).Error("failed cleaning orphaned accounts")
return
}
count := stale + orphaned
if count > 0 {
logger.WithFields(logrus.Fields{
"count": count,
}).Info("cleaned stale accounts")
}).Info("pruned accounts")
}
}

View file

@ -38,5 +38,6 @@ type AccountRepository interface {
Associate(ctx context.Context, acc *Account, dev *Device) error
Disassociate(ctx context.Context, acc *Account, dev *Device) error
PruneStale(ctx context.Context) (int64, error)
PruneOrphaned(ctx context.Context) (int64, error)
PruneStale(ctx context.Context, before int64) (int64, error)
}

View file

@ -189,7 +189,17 @@ func (p *postgresAccountRepository) GetByAPNSToken(ctx context.Context, token st
return p.fetch(ctx, query, token)
}
func (p *postgresAccountRepository) PruneStale(ctx context.Context) (int64, error) {
func (p *postgresAccountRepository) PruneStale(ctx context.Context, before int64) (int64, error) {
query := `
DELETE FROM accounts
WHERE expires_at < $1`
res, err := p.pool.Exec(ctx, query, before)
return res.RowsAffected(), err
}
func (p *postgresAccountRepository) PruneOrphaned(ctx context.Context) (int64, error) {
query := `
WITH accounts_with_device_count AS (
SELECT accounts.id, COUNT(device_id) AS device_count