From c08de574e97e4dc0b8879f3c03bda2ea2eafcd09 Mon Sep 17 00:00:00 2001 From: Andre Medeiros Date: Wed, 26 Oct 2022 20:48:22 -0400 Subject: [PATCH] fix context usage in scheduler --- internal/cmd/scheduler.go | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/internal/cmd/scheduler.go b/internal/cmd/scheduler.go index 3156fbc..655dc66 100644 --- a/internal/cmd/scheduler.go +++ b/internal/cmd/scheduler.go @@ -123,6 +123,9 @@ func SchedulerCmd(ctx context.Context) *cobra.Command { } func evalScript(ctx context.Context, redis *redis.Client) (string, error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + lua := fmt.Sprintf(` local retv={} @@ -141,6 +144,9 @@ func evalScript(ctx context.Context, redis *redis.Client) (string, error) { } func enqueueLiveActivities(ctx context.Context, logger *zap.Logger, pool *pgxpool.Pool, redisConn *redis.Client, luaSha string, queue rmq.Queue) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + now := time.Now().UTC() next := now.Add(domain.LiveActivityCheckInterval) @@ -192,6 +198,9 @@ func enqueueLiveActivities(ctx context.Context, logger *zap.Logger, pool *pgxpoo } func pruneAccounts(ctx context.Context, logger *zap.Logger, pool *pgxpool.Pool) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + expiry := time.Now().Add(-domain.StaleTokenThreshold) ar := repository.NewPostgresAccount(pool) @@ -213,6 +222,9 @@ func pruneAccounts(ctx context.Context, logger *zap.Logger, pool *pgxpool.Pool) } func pruneDevices(ctx context.Context, logger *zap.Logger, pool *pgxpool.Pool) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + now := time.Now() dr := repository.NewPostgresDevice(pool) @@ -241,6 +253,9 @@ func cleanQueues(logger *zap.Logger, jobsConn rmq.Connection) { } func reportStats(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + var ( count int64 @@ -265,6 +280,9 @@ func reportStats(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, } func enqueueUsers(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, queue rmq.Queue) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + now := time.Now() next := now.Add(domain.NotificationCheckInterval) @@ -317,6 +335,9 @@ func enqueueUsers(ctx context.Context, logger *zap.Logger, statsd *statsd.Client } func enqueueSubreddits(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, queues []rmq.Queue) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + now := time.Now() next := now.Add(domain.SubredditCheckInterval) @@ -372,6 +393,9 @@ func enqueueSubreddits(ctx context.Context, logger *zap.Logger, statsd *statsd.C } func enqueueStuckAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, queue rmq.Queue) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + now := time.Now() next := now.Add(domain.StuckNotificationCheckInterval) @@ -425,6 +449,9 @@ func enqueueStuckAccounts(ctx context.Context, logger *zap.Logger, statsd *stats } func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, redisConn *redis.Client, luaSha string, queue rmq.Queue) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + now := time.Now() query := `SELECT reddit_account_id FROM accounts`