diff --git a/cmd/apollo-scheduler/main.go b/cmd/apollo-scheduler/main.go index bf4b9ed..47141de 100644 --- a/cmd/apollo-scheduler/main.go +++ b/cmd/apollo-scheduler/main.go @@ -90,15 +90,16 @@ func main() { // Set up queues var ( + jobsConn rmq.Connection notificationsQueue rmq.Queue ) { - connection, err := rmq.OpenConnectionWithRedisClient("producer", redisConn, errChan) + jobsConn, err = rmq.OpenConnectionWithRedisClient("producer", redisConn, errChan) if err != nil { panic(err) } - notificationsQueue, err = connection.OpenQueue("notifications") + notificationsQueue, err = jobsConn.OpenQueue("notifications") if err != nil { panic(err) } @@ -112,7 +113,9 @@ func main() { s := gocron.NewScheduler(time.UTC) s.Every(1).Second().Do(func() { enqueueAccounts(ctx, logger, statsd, pool, redisConn, luaSha, notificationsQueue) }) + s.Every(1).Second().Do(func() { cleanQueues(ctx, logger, jobsConn) }) s.Every(1).Minute().Do(func() { reportStats(ctx, logger, statsd, pool, redisConn) }) + s.Every(1).Minute().Do(func() { cleanAccounts(ctx, logger, pool) }) s.StartAsync() signals := make(chan os.Signal, 1) @@ -148,6 +151,64 @@ func evalScript(ctx context.Context, redisConn *redis.Client) (string, error) { return redisConn.ScriptLoad(ctx, lua).Result() } +func cleanAccounts(ctx context.Context, logger *logrus.Logger, pool *pgxpool.Pool) { + now := time.Now().Unix() + count := 0 + ids := []int64{} + + err := pool.BeginFunc(ctx, func(tx pgx.Tx) error { + stmt := ` + WITH account AS ( + SELECT id + FROM accounts + WHERE + expires_at < $1 + ) + DELETE FROM accounts + WHERE accounts.id IN(SELECT id FROM account) + RETURNING accounts.id` + rows, err := tx.Query(ctx, stmt, now) + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + var id int64 + rows.Scan(&id) + ids = append(ids, id) + count++ + } + return nil + }) + + if err != nil { + logger.WithFields(logrus.Fields{ + "err": err, + }).Error("failed cleaning stale accounts") + return + } + + logger.WithFields(logrus.Fields{ + "count": count, + }).Info("cleaned stale accounts") +} + +func cleanQueues(ctx context.Context, logger *logrus.Logger, jobsConn rmq.Connection) { + cleaner := rmq.NewCleaner(jobsConn) + count, err := cleaner.Clean() + if err != nil { + logger.WithFields(logrus.Fields{ + "err": err, + }).Error("failed cleaning jobs from queues") + return + } + + logger.WithFields(logrus.Fields{ + "count": count, + }).Debug("returned jobs to queues") +} + func reportStats(ctx context.Context, logger *logrus.Logger, statsd *statsd.Client, pool *pgxpool.Pool, redisConn *redis.Client) { var ( count int64