From 0accbde48bd7927365e2a153c3edd970d1175ab7 Mon Sep 17 00:00:00 2001 From: Andre Medeiros Date: Sat, 20 May 2023 09:47:38 -0400 Subject: [PATCH] use a locking mechanism for the scheduler instead of the singleton model in the cron lib --- internal/cmd/scheduler.go | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/internal/cmd/scheduler.go b/internal/cmd/scheduler.go index de66a75..1296257 100644 --- a/internal/cmd/scheduler.go +++ b/internal/cmd/scheduler.go @@ -23,8 +23,14 @@ import ( "github.com/christianselig/apollo-backend/internal/repository" ) -const batchSize = 250 -const accountEnqueueSeconds = 60 +const ( + batchSize = 250 + accountEnqueueSeconds = 60 +) + +var ( + enqueueAccountsMutex sync.Mutex +) func SchedulerCmd(ctx context.Context) *cobra.Command { cmd := &cobra.Command{ @@ -103,9 +109,7 @@ func SchedulerCmd(ctx context.Context) *cobra.Command { s := gocron.NewScheduler(time.UTC) s.SetMaxConcurrentJobs(8, gocron.WaitMode) - eaj, _ := s.Every(5).Seconds().Do(func() { enqueueAccounts(ctx, logger, statsd, db, redis, luaSha, notifQueue) }) - eaj.SingletonMode() - + _, _ = s.Every(5).Seconds().Do(func() { enqueueAccounts(ctx, logger, statsd, db, redis, luaSha, notifQueue) }) _, _ = s.Every(5).Seconds().Do(func() { enqueueSubreddits(ctx, logger, statsd, db, []rmq.Queue{subredditQueue, trendingQueue}) }) _, _ = s.Every(5).Seconds().Do(func() { enqueueUsers(ctx, logger, statsd, db, userQueue) }) _, _ = s.Every(5).Seconds().Do(func() { enqueueLiveActivities(ctx, logger, db, redis, luaSha, liveActivitiesQueue) }) @@ -456,6 +460,12 @@ func enqueueStuckAccounts(ctx context.Context, logger *zap.Logger, statsd *stats } func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, redisConn *redis.Client, luaSha string, queue rmq.Queue) { + if enqueueAccountsMutex.TryLock() { + defer enqueueAccountsMutex.Unlock() + } else { + return + } + ctx, cancel := context.WithCancel(ctx) defer cancel()