clean queues and stale accounts

This commit is contained in:
Andre Medeiros 2021-07-12 15:36:22 -04:00
parent 33b3af6584
commit 56cbf6b4ce

View file

@ -90,15 +90,16 @@ func main() {
// Set up queues // Set up queues
var ( var (
jobsConn rmq.Connection
notificationsQueue rmq.Queue notificationsQueue rmq.Queue
) )
{ {
connection, err := rmq.OpenConnectionWithRedisClient("producer", redisConn, errChan) jobsConn, err = rmq.OpenConnectionWithRedisClient("producer", redisConn, errChan)
if err != nil { if err != nil {
panic(err) panic(err)
} }
notificationsQueue, err = connection.OpenQueue("notifications") notificationsQueue, err = jobsConn.OpenQueue("notifications")
if err != nil { if err != nil {
panic(err) panic(err)
} }
@ -112,7 +113,9 @@ func main() {
s := gocron.NewScheduler(time.UTC) s := gocron.NewScheduler(time.UTC)
s.Every(1).Second().Do(func() { enqueueAccounts(ctx, logger, statsd, pool, redisConn, luaSha, notificationsQueue) }) s.Every(1).Second().Do(func() { enqueueAccounts(ctx, logger, statsd, pool, redisConn, luaSha, notificationsQueue) })
s.Every(1).Second().Do(func() { cleanQueues(ctx, logger, jobsConn) })
s.Every(1).Minute().Do(func() { reportStats(ctx, logger, statsd, pool, redisConn) }) s.Every(1).Minute().Do(func() { reportStats(ctx, logger, statsd, pool, redisConn) })
s.Every(1).Minute().Do(func() { cleanAccounts(ctx, logger, pool) })
s.StartAsync() s.StartAsync()
signals := make(chan os.Signal, 1) signals := make(chan os.Signal, 1)
@ -148,6 +151,64 @@ func evalScript(ctx context.Context, redisConn *redis.Client) (string, error) {
return redisConn.ScriptLoad(ctx, lua).Result() return redisConn.ScriptLoad(ctx, lua).Result()
} }
func cleanAccounts(ctx context.Context, logger *logrus.Logger, pool *pgxpool.Pool) {
now := time.Now().Unix()
count := 0
ids := []int64{}
err := pool.BeginFunc(ctx, func(tx pgx.Tx) error {
stmt := `
WITH account AS (
SELECT id
FROM accounts
WHERE
expires_at < $1
)
DELETE FROM accounts
WHERE accounts.id IN(SELECT id FROM account)
RETURNING accounts.id`
rows, err := tx.Query(ctx, stmt, now)
if err != nil {
return err
}
defer rows.Close()
for rows.Next() {
var id int64
rows.Scan(&id)
ids = append(ids, id)
count++
}
return nil
})
if err != nil {
logger.WithFields(logrus.Fields{
"err": err,
}).Error("failed cleaning stale accounts")
return
}
logger.WithFields(logrus.Fields{
"count": count,
}).Info("cleaned stale accounts")
}
func cleanQueues(ctx context.Context, logger *logrus.Logger, jobsConn rmq.Connection) {
cleaner := rmq.NewCleaner(jobsConn)
count, err := cleaner.Clean()
if err != nil {
logger.WithFields(logrus.Fields{
"err": err,
}).Error("failed cleaning jobs from queues")
return
}
logger.WithFields(logrus.Fields{
"count": count,
}).Debug("returned jobs to queues")
}
func reportStats(ctx context.Context, logger *logrus.Logger, statsd *statsd.Client, pool *pgxpool.Pool, redisConn *redis.Client) { func reportStats(ctx context.Context, logger *logrus.Logger, statsd *statsd.Client, pool *pgxpool.Pool, redisConn *redis.Client) {
var ( var (
count int64 count int64