From b992d23c037305da6ced027950517bfef4712ebf Mon Sep 17 00:00:00 2001 From: Andre Medeiros Date: Sat, 20 May 2023 09:47:38 -0400 Subject: [PATCH] use a locking mechanism for the scheduler instead of the singleton model in the cron lib --- internal/cmd/scheduler.go | 20 +++++++++++++++----- internal/reddit/testdata/refresh_token.json | 4 ++-- internal/reddit/types_test.go | 4 ++-- 3 files changed, 19 insertions(+), 9 deletions(-) diff --git a/internal/cmd/scheduler.go b/internal/cmd/scheduler.go index de66a75..1296257 100644 --- a/internal/cmd/scheduler.go +++ b/internal/cmd/scheduler.go @@ -23,8 +23,14 @@ import ( "github.com/christianselig/apollo-backend/internal/repository" ) -const batchSize = 250 -const accountEnqueueSeconds = 60 +const ( + batchSize = 250 + accountEnqueueSeconds = 60 +) + +var ( + enqueueAccountsMutex sync.Mutex +) func SchedulerCmd(ctx context.Context) *cobra.Command { cmd := &cobra.Command{ @@ -103,9 +109,7 @@ func SchedulerCmd(ctx context.Context) *cobra.Command { s := gocron.NewScheduler(time.UTC) s.SetMaxConcurrentJobs(8, gocron.WaitMode) - eaj, _ := s.Every(5).Seconds().Do(func() { enqueueAccounts(ctx, logger, statsd, db, redis, luaSha, notifQueue) }) - eaj.SingletonMode() - + _, _ = s.Every(5).Seconds().Do(func() { enqueueAccounts(ctx, logger, statsd, db, redis, luaSha, notifQueue) }) _, _ = s.Every(5).Seconds().Do(func() { enqueueSubreddits(ctx, logger, statsd, db, []rmq.Queue{subredditQueue, trendingQueue}) }) _, _ = s.Every(5).Seconds().Do(func() { enqueueUsers(ctx, logger, statsd, db, userQueue) }) _, _ = s.Every(5).Seconds().Do(func() { enqueueLiveActivities(ctx, logger, db, redis, luaSha, liveActivitiesQueue) }) @@ -456,6 +460,12 @@ func enqueueStuckAccounts(ctx context.Context, logger *zap.Logger, statsd *stats } func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, redisConn *redis.Client, luaSha string, queue rmq.Queue) { + if enqueueAccountsMutex.TryLock() { + defer enqueueAccountsMutex.Unlock() + } else { + return + } + ctx, cancel := context.WithCancel(ctx) defer cancel() diff --git a/internal/reddit/testdata/refresh_token.json b/internal/reddit/testdata/refresh_token.json index d1c7ad3..2eb6a27 100644 --- a/internal/reddit/testdata/refresh_token.json +++ b/internal/reddit/testdata/refresh_token.json @@ -1,7 +1,7 @@ { - "access_token": "***REMOVED***", + "access_token": "xxx", "token_type": "bearer", "expires_in": 3600, - "refresh_token": "***REMOVED***", + "refresh_token": "yyy", "scope": "account creddits edit flair history identity livemanage modconfig modcontributors modflair modlog modmail modothers modposts modself modtraffic modwiki mysubreddits privatemessages read report save structuredstyles submit subscribe vote wikiedit wikiread" } diff --git a/internal/reddit/types_test.go b/internal/reddit/types_test.go index d4debab..67b43f0 100644 --- a/internal/reddit/types_test.go +++ b/internal/reddit/types_test.go @@ -57,8 +57,8 @@ func TestRefreshTokenResponseParsing(t *testing.T) { rtr := ret.(*reddit.RefreshTokenResponse) assert.NotNil(t, rtr) - assert.Equal(t, "***REMOVED***", rtr.AccessToken) - assert.Equal(t, "***REMOVED***", rtr.RefreshToken) + assert.Equal(t, "xxx", rtr.AccessToken) + assert.Equal(t, "yyy", rtr.RefreshToken) assert.Equal(t, 1*time.Hour, rtr.Expiry) }