fix context usage in scheduler

This commit is contained in:
Andre Medeiros 2022-10-26 20:48:22 -04:00
parent 1a861ea628
commit c08de574e9

View file

@ -123,6 +123,9 @@ func SchedulerCmd(ctx context.Context) *cobra.Command {
} }
func evalScript(ctx context.Context, redis *redis.Client) (string, error) { func evalScript(ctx context.Context, redis *redis.Client) (string, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
lua := fmt.Sprintf(` lua := fmt.Sprintf(`
local retv={} local retv={}
@ -141,6 +144,9 @@ func evalScript(ctx context.Context, redis *redis.Client) (string, error) {
} }
func enqueueLiveActivities(ctx context.Context, logger *zap.Logger, pool *pgxpool.Pool, redisConn *redis.Client, luaSha string, queue rmq.Queue) { func enqueueLiveActivities(ctx context.Context, logger *zap.Logger, pool *pgxpool.Pool, redisConn *redis.Client, luaSha string, queue rmq.Queue) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
now := time.Now().UTC() now := time.Now().UTC()
next := now.Add(domain.LiveActivityCheckInterval) next := now.Add(domain.LiveActivityCheckInterval)
@ -192,6 +198,9 @@ func enqueueLiveActivities(ctx context.Context, logger *zap.Logger, pool *pgxpoo
} }
func pruneAccounts(ctx context.Context, logger *zap.Logger, pool *pgxpool.Pool) { func pruneAccounts(ctx context.Context, logger *zap.Logger, pool *pgxpool.Pool) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
expiry := time.Now().Add(-domain.StaleTokenThreshold) expiry := time.Now().Add(-domain.StaleTokenThreshold)
ar := repository.NewPostgresAccount(pool) ar := repository.NewPostgresAccount(pool)
@ -213,6 +222,9 @@ func pruneAccounts(ctx context.Context, logger *zap.Logger, pool *pgxpool.Pool)
} }
func pruneDevices(ctx context.Context, logger *zap.Logger, pool *pgxpool.Pool) { func pruneDevices(ctx context.Context, logger *zap.Logger, pool *pgxpool.Pool) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
now := time.Now() now := time.Now()
dr := repository.NewPostgresDevice(pool) dr := repository.NewPostgresDevice(pool)
@ -241,6 +253,9 @@ func cleanQueues(logger *zap.Logger, jobsConn rmq.Connection) {
} }
func reportStats(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool) { func reportStats(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
var ( var (
count int64 count int64
@ -265,6 +280,9 @@ func reportStats(ctx context.Context, logger *zap.Logger, statsd *statsd.Client,
} }
func enqueueUsers(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, queue rmq.Queue) { func enqueueUsers(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, queue rmq.Queue) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
now := time.Now() now := time.Now()
next := now.Add(domain.NotificationCheckInterval) next := now.Add(domain.NotificationCheckInterval)
@ -317,6 +335,9 @@ func enqueueUsers(ctx context.Context, logger *zap.Logger, statsd *statsd.Client
} }
func enqueueSubreddits(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, queues []rmq.Queue) { func enqueueSubreddits(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, queues []rmq.Queue) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
now := time.Now() now := time.Now()
next := now.Add(domain.SubredditCheckInterval) next := now.Add(domain.SubredditCheckInterval)
@ -372,6 +393,9 @@ func enqueueSubreddits(ctx context.Context, logger *zap.Logger, statsd *statsd.C
} }
func enqueueStuckAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, queue rmq.Queue) { func enqueueStuckAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, queue rmq.Queue) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
now := time.Now() now := time.Now()
next := now.Add(domain.StuckNotificationCheckInterval) next := now.Add(domain.StuckNotificationCheckInterval)
@ -425,6 +449,9 @@ func enqueueStuckAccounts(ctx context.Context, logger *zap.Logger, statsd *stats
} }
func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, redisConn *redis.Client, luaSha string, queue rmq.Queue) { func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, redisConn *redis.Client, luaSha string, queue rmq.Queue) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
now := time.Now() now := time.Now()
query := `SELECT reddit_account_id FROM accounts` query := `SELECT reddit_account_id FROM accounts`