diff --git a/internal/cmd/scheduler.go b/internal/cmd/scheduler.go
index ec1549e..56a890b 100644
--- a/internal/cmd/scheduler.go
+++ b/internal/cmd/scheduler.go
@@ -7,7 +7,6 @@ import (
 	_ "net/http/pprof"
 	"strconv"
 	"sync"
-	"sync/atomic"
 	"time"
 
 	"github.com/DataDog/datadog-go/statsd"
@@ -24,6 +23,7 @@ import (
 )
 
 const batchSize = 250
+const accountEnqueueSeconds = 60
 
 func SchedulerCmd(ctx context.Context) *cobra.Command {
 	cmd := &cobra.Command{
@@ -102,7 +102,7 @@ func SchedulerCmd(ctx context.Context) *cobra.Command {
 	s := gocron.NewScheduler(time.UTC)
 	s.SetMaxConcurrentJobs(8, gocron.WaitMode)
 
-	eaj, _ := s.Every(10).Seconds().Do(func() { enqueueAccounts(ctx, logger, statsd, db, redis, luaSha, notifQueue) })
+	eaj, _ := s.Every(accountEnqueueSeconds).Seconds().Do(func() { enqueueAccounts(ctx, logger, statsd, db, redis, luaSha, notifQueue) })
 	eaj.SingletonMode()
 
 	_, _ = s.Every(5).Seconds().Do(func() { enqueueSubreddits(ctx, logger, statsd, db, []rmq.Queue{subredditQueue, trendingQueue}) })
@@ -483,6 +483,64 @@ func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Cli
 		ids = append(ids, id)
 	}
 
+	// Spread the candidate ids evenly across the scheduling window so the
+	// enqueue load is smoothed over the full accountEnqueueSeconds window.
+	chunks := [][]string{}
+	for i := 0; i < accountEnqueueSeconds; i++ {
+		lo := (i * len(ids)) / accountEnqueueSeconds
+		hi := ((i + 1) * len(ids)) / accountEnqueueSeconds
+		chunks = append(chunks, ids[lo:hi])
+	}
+
+	wg := sync.WaitGroup{}
+	for i := 0; i < accountEnqueueSeconds; i++ {
+		wg.Add(1)
+		go func(ctx context.Context, offset int) {
+			// Always signal completion, otherwise wg.Wait() deadlocks.
+			defer wg.Done()
+
+			batch := chunks[offset]
+			if len(batch) == 0 {
+				return
+			}
+
+			// Stagger this batch into its one-second slot, bailing out
+			// early if the scheduler is shutting down.
+			select {
+			case <-ctx.Done():
+				return
+			case <-time.After(time.Duration(offset) * time.Second):
+			}
+
+			enqueued, err := redisConn.EvalSha(ctx, luaSha, []string{"locks:accounts"}, batch).StringSlice()
+			if err != nil {
+				logger.Error("failed to check for locked accounts", zap.Error(err))
+				return
+			}
+
+			if len(enqueued) == 0 {
+				return
+			}
+
+			if err = queue.Publish(enqueued...); err != nil {
+				logger.Error("failed to enqueue account batch",
+					zap.Error(err),
+					zap.Int("offset", offset),
+					zap.Int("candidates", len(batch)),
+					zap.Int("enqueued", len(enqueued)),
+				)
+				return
+			}
+
+			logger.Info("enqueued account batch",
+				zap.Int("offset", offset),
+				zap.Int("attempts", len(batch)),
+				zap.Int("enqueued", len(enqueued)),
+			)
+		}(ctx, i)
+	}
+	wg.Wait()
+
 	var (
 		enqueued int64 = 0
 		skipped  int64 = 0
@@ -494,49 +552,4 @@ func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Cli
 	_ = statsd.Histogram("apollo.queue.skipped", float64(skipped), tags, 1)
 	_ = statsd.Histogram("apollo.queue.runtime", float64(time.Since(now).Milliseconds()), tags, 1)
 	}()
-
-	logger.Debug("enqueueing account batch", zap.Int("count", len(ids)), zap.Time("start", now))
-
-	// Split ids in batches
-	wg := sync.WaitGroup{}
-	for i := 0; i < len(ids); i += batchSize {
-		wg.Add(1)
-		go func(offset int, ctx context.Context) {
-			defer wg.Done()
-
-			ctx, cancel := context.WithCancel(ctx)
-			defer cancel()
-
-			j := offset + batchSize
-			if j > len(ids) {
-				j = len(ids)
-			}
-			batch := ids[offset:j]
-
-			logger.Debug("enqueueing batch", zap.Int("len", len(batch)))
-
-			unlocked, err := redisConn.EvalSha(ctx, luaSha, []string{"locks:accounts"}, batch).StringSlice()
-			if err != nil {
-				logger.Error("failed to check for locked accounts", zap.Error(err))
-			}
-
-			atomic.AddInt64(&skipped, int64(len(batch)-len(unlocked)))
-			atomic.AddInt64(&enqueued, int64(len(unlocked)))
-
-			if len(unlocked) == 0 {
-				return
-			}
-
-			if err = queue.Publish(unlocked...); err != nil {
-				logger.Error("failed to enqueue account batch", zap.Error(err))
-			}
-		}(i, ctx)
-	}
-	wg.Wait()
-
-	logger.Info("enqueued account batch",
-		zap.Int64("count", enqueued),
-		zap.Int64("skipped", skipped),
-		zap.Int64("duration", time.Since(now).Milliseconds()),
-	)
 }