mirror of
https://github.com/christianselig/apollo-backend
synced 2024-11-13 23:47:44 +00:00
use a locking mechanism for the scheduler instead of the singleton model in the cron lib
This commit is contained in:
parent
c216161836
commit
0accbde48b
1 changed files with 15 additions and 5 deletions
|
@ -23,8 +23,14 @@ import (
|
|||
"github.com/christianselig/apollo-backend/internal/repository"
|
||||
)
|
||||
|
||||
const batchSize = 250
|
||||
const accountEnqueueSeconds = 60
|
||||
const (
|
||||
batchSize = 250
|
||||
accountEnqueueSeconds = 60
|
||||
)
|
||||
|
||||
var (
|
||||
enqueueAccountsMutex sync.Mutex
|
||||
)
|
||||
|
||||
func SchedulerCmd(ctx context.Context) *cobra.Command {
|
||||
cmd := &cobra.Command{
|
||||
|
@ -103,9 +109,7 @@ func SchedulerCmd(ctx context.Context) *cobra.Command {
|
|||
s := gocron.NewScheduler(time.UTC)
|
||||
s.SetMaxConcurrentJobs(8, gocron.WaitMode)
|
||||
|
||||
eaj, _ := s.Every(5).Seconds().Do(func() { enqueueAccounts(ctx, logger, statsd, db, redis, luaSha, notifQueue) })
|
||||
eaj.SingletonMode()
|
||||
|
||||
_, _ = s.Every(5).Seconds().Do(func() { enqueueAccounts(ctx, logger, statsd, db, redis, luaSha, notifQueue) })
|
||||
_, _ = s.Every(5).Seconds().Do(func() { enqueueSubreddits(ctx, logger, statsd, db, []rmq.Queue{subredditQueue, trendingQueue}) })
|
||||
_, _ = s.Every(5).Seconds().Do(func() { enqueueUsers(ctx, logger, statsd, db, userQueue) })
|
||||
_, _ = s.Every(5).Seconds().Do(func() { enqueueLiveActivities(ctx, logger, db, redis, luaSha, liveActivitiesQueue) })
|
||||
|
@ -456,6 +460,12 @@ func enqueueStuckAccounts(ctx context.Context, logger *zap.Logger, statsd *stats
|
|||
}
|
||||
|
||||
func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, redisConn *redis.Client, luaSha string, queue rmq.Queue) {
|
||||
if enqueueAccountsMutex.TryLock() {
|
||||
defer enqueueAccountsMutex.Unlock()
|
||||
} else {
|
||||
return
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
|
|
Loading…
Reference in a new issue