From 9a5d699f66b56f10576678f64aef9aa153685718 Mon Sep 17 00:00:00 2001 From: Andre Medeiros Date: Sat, 14 Aug 2021 12:08:17 -0400 Subject: [PATCH] Remove stale devices --- internal/cmd/scheduler.go | 27 ++++++++++++++++++++++++-- internal/domain/device.go | 2 ++ internal/repository/postgres_device.go | 15 ++++++++++++++ 3 files changed, 42 insertions(+), 2 deletions(-) diff --git a/internal/cmd/scheduler.go b/internal/cmd/scheduler.go index 23c1d93..b42427e 100644 --- a/internal/cmd/scheduler.go +++ b/internal/cmd/scheduler.go @@ -23,6 +23,9 @@ const ( batchSize = 250 checkTimeout = 60 // how long until we force a check enqueueTimeout = 5 // how long until we try to re-enqueue + + staleAccountThreshold = 7200 // 2 hours + staleDeviceThreshold = 604800 // 1 week ) func SchedulerCmd(ctx context.Context) *cobra.Command { @@ -72,6 +75,7 @@ func SchedulerCmd(ctx context.Context) *cobra.Command { s.Every(1).Second().Do(func() { cleanQueues(ctx, logger, queue) }) s.Every(1).Minute().Do(func() { reportStats(ctx, logger, statsd, db, redis) }) s.Every(1).Minute().Do(func() { pruneAccounts(ctx, logger, db) }) + s.Every(1).Minute().Do(func() { pruneDevices(ctx, logger, db) }) s.StartAsync() <-ctx.Done() @@ -105,10 +109,10 @@ func evalScript(ctx context.Context, redis *redis.Client) (string, error) { } func pruneAccounts(ctx context.Context, logger *logrus.Logger, pool *pgxpool.Pool) { - now := time.Now().Unix() - 7200 + before := time.Now().Unix() - staleAccountThreshold ar := repository.NewPostgresAccount(pool) - stale, err := ar.PruneStale(ctx, now) + stale, err := ar.PruneStale(ctx, before) if err != nil { logger.WithFields(logrus.Fields{ "err": err, @@ -133,6 +137,25 @@ func pruneAccounts(ctx context.Context, logger *logrus.Logger, pool *pgxpool.Poo } } +func pruneDevices(ctx context.Context, logger *logrus.Logger, pool *pgxpool.Pool) { + before := time.Now().Unix() - staleDeviceThreshold + dr := repository.NewPostgresDevice(pool) + + count, err := dr.PruneStale(ctx, before) + if err != nil { + logger.WithFields(logrus.Fields{ + "err": err, + }).Error("failed cleaning stale devices") + return + } + + if count > 0 { + logger.WithFields(logrus.Fields{ + "count": count, + }).Info("pruned devices") + } +} + func cleanQueues(ctx context.Context, logger *logrus.Logger, jobsConn rmq.Connection) { cleaner := rmq.NewCleaner(jobsConn) count, err := cleaner.Clean() diff --git a/internal/domain/device.go b/internal/domain/device.go index cc2f4cd..40deeeb 100644 --- a/internal/domain/device.go +++ b/internal/domain/device.go @@ -15,4 +15,6 @@ type DeviceRepository interface { Update(ctx context.Context, dev *Device) error Create(ctx context.Context, dev *Device) error Delete(ctx context.Context, token string) error + + PruneStale(ctx context.Context, before int64) (int64, error) } diff --git a/internal/repository/postgres_device.go b/internal/repository/postgres_device.go index 5134a32..637023c 100644 --- a/internal/repository/postgres_device.go +++ b/internal/repository/postgres_device.go @@ -114,3 +114,18 @@ func (p *postgresDeviceRepository) Delete(ctx context.Context, token string) err } return err } + +func (p *postgresDeviceRepository) PruneStale(ctx context.Context, before int64) (int64, error) { + query := ` + WITH deleted_devices AS ( + DELETE FROM devices + WHERE last_pinged_at < $1 + RETURNING id + ) + DELETE FROM devices_accounts + WHERE device_id IN (SELECT id FROM deleted_devices)` + + res, err := p.pool.Exec(ctx, query, before) + + return res.RowsAffected(), err +}