Remove stale devices

This commit is contained in:
Andre Medeiros 2021-08-14 12:08:17 -04:00
parent 84e499a7af
commit 9a5d699f66
3 changed files with 42 additions and 2 deletions

View file

@ -23,6 +23,9 @@ const (
batchSize = 250 batchSize = 250
checkTimeout = 60 // how long until we force a check checkTimeout = 60 // how long until we force a check
enqueueTimeout = 5 // how long until we try to re-enqueue enqueueTimeout = 5 // how long until we try to re-enqueue
staleAccountThreshold = 7200 // 2 hours
staleDeviceThreshold = 604800 // 1 week
) )
func SchedulerCmd(ctx context.Context) *cobra.Command { func SchedulerCmd(ctx context.Context) *cobra.Command {
@ -72,6 +75,7 @@ func SchedulerCmd(ctx context.Context) *cobra.Command {
s.Every(1).Second().Do(func() { cleanQueues(ctx, logger, queue) }) s.Every(1).Second().Do(func() { cleanQueues(ctx, logger, queue) })
s.Every(1).Minute().Do(func() { reportStats(ctx, logger, statsd, db, redis) }) s.Every(1).Minute().Do(func() { reportStats(ctx, logger, statsd, db, redis) })
s.Every(1).Minute().Do(func() { pruneAccounts(ctx, logger, db) }) s.Every(1).Minute().Do(func() { pruneAccounts(ctx, logger, db) })
s.Every(1).Minute().Do(func() { pruneDevices(ctx, logger, db) })
s.StartAsync() s.StartAsync()
<-ctx.Done() <-ctx.Done()
@ -105,10 +109,10 @@ func evalScript(ctx context.Context, redis *redis.Client) (string, error) {
} }
func pruneAccounts(ctx context.Context, logger *logrus.Logger, pool *pgxpool.Pool) { func pruneAccounts(ctx context.Context, logger *logrus.Logger, pool *pgxpool.Pool) {
now := time.Now().Unix() - 7200 before := time.Now().Unix() - staleAccountThreshold
ar := repository.NewPostgresAccount(pool) ar := repository.NewPostgresAccount(pool)
stale, err := ar.PruneStale(ctx, now) stale, err := ar.PruneStale(ctx, before)
if err != nil { if err != nil {
logger.WithFields(logrus.Fields{ logger.WithFields(logrus.Fields{
"err": err, "err": err,
@ -133,6 +137,25 @@ func pruneAccounts(ctx context.Context, logger *logrus.Logger, pool *pgxpool.Poo
} }
} }
func pruneDevices(ctx context.Context, logger *logrus.Logger, pool *pgxpool.Pool) {
before := time.Now().Unix() - staleDeviceThreshold
dr := repository.NewPostgresDevice(pool)
count, err := dr.PruneStale(ctx, before)
if err != nil {
logger.WithFields(logrus.Fields{
"err": err,
}).Error("failed cleaning stale devices")
return
}
if count > 0 {
logger.WithFields(logrus.Fields{
"count": count,
}).Info("pruned devices")
}
}
func cleanQueues(ctx context.Context, logger *logrus.Logger, jobsConn rmq.Connection) { func cleanQueues(ctx context.Context, logger *logrus.Logger, jobsConn rmq.Connection) {
cleaner := rmq.NewCleaner(jobsConn) cleaner := rmq.NewCleaner(jobsConn)
count, err := cleaner.Clean() count, err := cleaner.Clean()

View file

@ -15,4 +15,6 @@ type DeviceRepository interface {
Update(ctx context.Context, dev *Device) error Update(ctx context.Context, dev *Device) error
Create(ctx context.Context, dev *Device) error Create(ctx context.Context, dev *Device) error
Delete(ctx context.Context, token string) error Delete(ctx context.Context, token string) error
PruneStale(ctx context.Context, before int64) (int64, error)
} }

View file

@ -114,3 +114,18 @@ func (p *postgresDeviceRepository) Delete(ctx context.Context, token string) err
} }
return err return err
} }
func (p *postgresDeviceRepository) PruneStale(ctx context.Context, before int64) (int64, error) {
query := `
WITH deleted_devices AS (
DELETE FROM devices
WHERE last_pinged_at < $1
RETURNING id
)
DELETE FROM devices_accounts
WHERE device_id IN (SELECT id FROM deleted_devices)`
res, err := p.pool.Exec(ctx, query, before)
return res.RowsAffected(), err
}