Use a locking mechanism for the scheduler instead of the singleton model in the cron library

This commit is contained in:
Andre Medeiros 2023-05-20 09:47:38 -04:00
parent c216161836
commit b992d23c03
3 changed files with 19 additions and 9 deletions

View file

@ -23,8 +23,14 @@ import (
"github.com/christianselig/apollo-backend/internal/repository" "github.com/christianselig/apollo-backend/internal/repository"
) )
const batchSize = 250 const (
const accountEnqueueSeconds = 60 batchSize = 250
accountEnqueueSeconds = 60
)
var (
enqueueAccountsMutex sync.Mutex
)
func SchedulerCmd(ctx context.Context) *cobra.Command { func SchedulerCmd(ctx context.Context) *cobra.Command {
cmd := &cobra.Command{ cmd := &cobra.Command{
@ -103,9 +109,7 @@ func SchedulerCmd(ctx context.Context) *cobra.Command {
s := gocron.NewScheduler(time.UTC) s := gocron.NewScheduler(time.UTC)
s.SetMaxConcurrentJobs(8, gocron.WaitMode) s.SetMaxConcurrentJobs(8, gocron.WaitMode)
eaj, _ := s.Every(5).Seconds().Do(func() { enqueueAccounts(ctx, logger, statsd, db, redis, luaSha, notifQueue) }) _, _ = s.Every(5).Seconds().Do(func() { enqueueAccounts(ctx, logger, statsd, db, redis, luaSha, notifQueue) })
eaj.SingletonMode()
_, _ = s.Every(5).Seconds().Do(func() { enqueueSubreddits(ctx, logger, statsd, db, []rmq.Queue{subredditQueue, trendingQueue}) }) _, _ = s.Every(5).Seconds().Do(func() { enqueueSubreddits(ctx, logger, statsd, db, []rmq.Queue{subredditQueue, trendingQueue}) })
_, _ = s.Every(5).Seconds().Do(func() { enqueueUsers(ctx, logger, statsd, db, userQueue) }) _, _ = s.Every(5).Seconds().Do(func() { enqueueUsers(ctx, logger, statsd, db, userQueue) })
_, _ = s.Every(5).Seconds().Do(func() { enqueueLiveActivities(ctx, logger, db, redis, luaSha, liveActivitiesQueue) }) _, _ = s.Every(5).Seconds().Do(func() { enqueueLiveActivities(ctx, logger, db, redis, luaSha, liveActivitiesQueue) })
@ -456,6 +460,12 @@ func enqueueStuckAccounts(ctx context.Context, logger *zap.Logger, statsd *stats
} }
func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, redisConn *redis.Client, luaSha string, queue rmq.Queue) { func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, redisConn *redis.Client, luaSha string, queue rmq.Queue) {
if enqueueAccountsMutex.TryLock() {
defer enqueueAccountsMutex.Unlock()
} else {
return
}
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()

View file

@ -1,7 +1,7 @@
{ {
"access_token": "***REMOVED***", "access_token": "xxx",
"token_type": "bearer", "token_type": "bearer",
"expires_in": 3600, "expires_in": 3600,
"refresh_token": "***REMOVED***", "refresh_token": "yyy",
"scope": "account creddits edit flair history identity livemanage modconfig modcontributors modflair modlog modmail modothers modposts modself modtraffic modwiki mysubreddits privatemessages read report save structuredstyles submit subscribe vote wikiedit wikiread" "scope": "account creddits edit flair history identity livemanage modconfig modcontributors modflair modlog modmail modothers modposts modself modtraffic modwiki mysubreddits privatemessages read report save structuredstyles submit subscribe vote wikiedit wikiread"
} }

View file

@ -57,8 +57,8 @@ func TestRefreshTokenResponseParsing(t *testing.T) {
rtr := ret.(*reddit.RefreshTokenResponse) rtr := ret.(*reddit.RefreshTokenResponse)
assert.NotNil(t, rtr) assert.NotNil(t, rtr)
assert.Equal(t, "***REMOVED***", rtr.AccessToken) assert.Equal(t, "xxx", rtr.AccessToken)
assert.Equal(t, "***REMOVED***", rtr.RefreshToken) assert.Equal(t, "yyy", rtr.RefreshToken)
assert.Equal(t, 1*time.Hour, rtr.Expiry) assert.Equal(t, 1*time.Hour, rtr.Expiry)
} }