diff --git a/internal/cmd/scheduler.go b/internal/cmd/scheduler.go index e1414e9..5989921 100644 --- a/internal/cmd/scheduler.go +++ b/internal/cmd/scheduler.go @@ -7,6 +7,7 @@ import ( "net/http" _ "net/http/pprof" "strconv" + "sync" "time" "github.com/DataDog/datadog-go/statsd" @@ -92,7 +93,7 @@ func SchedulerCmd(ctx context.Context) *cobra.Command { } s := gocron.NewScheduler(time.UTC) - _, _ = s.Every(500).Milliseconds().Do(func() { enqueueAccounts(ctx, logger, statsd, db, redis, luaSha, notifQueue) }) + _, _ = s.Every(500).Milliseconds().SingletonMode().Do(func() { enqueueAccounts(ctx, logger, statsd, db, redis, luaSha, notifQueue) }) _, _ = s.Every(5).Second().Do(func() { enqueueSubreddits(ctx, logger, statsd, db, []rmq.Queue{subredditQueue, trendingQueue}) }) _, _ = s.Every(5).Second().Do(func() { enqueueUsers(ctx, logger, statsd, db, userQueue) }) _, _ = s.Every(5).Second().Do(func() { cleanQueues(logger, queue) }) @@ -441,38 +442,47 @@ func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Cli logger.Debug("enqueueing account batch", zap.Int("count", len(ids)), zap.Time("start", now)) + batches := (idslen / batchSize) + 1 + wg := sync.WaitGroup{} + wg.Add(batches) + // Split ids in batches for i := 0; i < idslen; i += batchSize { - j := i + batchSize - if j > idslen { - j = idslen - } - batch := Int64Slice(ids[i:j]) + go func(offset int) { + defer wg.Done() - logger.Debug("enqueueing batch", zap.Int("len", len(batch))) + j := offset + batchSize + if j > idslen { + j = idslen + } + batch := Int64Slice(ids[offset:j]) - res, err := redisConn.EvalSha(ctx, luaSha, []string{"locks:accounts"}, batch).Result() - if err != nil { - logger.Error("failed to check for locked accounts", zap.Error(err)) - } + logger.Debug("enqueueing batch", zap.Int("len", len(batch))) - vals := res.([]interface{}) - skipped += len(batch) - len(vals) - enqueued += len(vals) + res, err := redisConn.EvalSha(ctx, luaSha, []string{"locks:accounts"}, batch).Result() + if err != nil { + logger.Error("failed to check for locked accounts", zap.Error(err)) + } - if len(vals) == 0 { - continue - } + vals := res.([]interface{}) + skipped += len(batch) - len(vals) + enqueued += len(vals) - batchIds := make([]string, len(vals)) - for k, v := range vals { - batchIds[k] = strconv.FormatInt(v.(int64), 10) - } + if len(vals) == 0 { + return + } - if err = queue.Publish(batchIds...); err != nil { - logger.Error("failed to enqueue account batch", zap.Error(err)) - } + batchIds := make([]string, len(vals)) + for k, v := range vals { + batchIds[k] = strconv.FormatInt(v.(int64), 10) + } + + if err = queue.Publish(batchIds...); err != nil { + logger.Error("failed to enqueue account batch", zap.Error(err)) + } + }(i * batchSize) } + wg.Wait() logger.Debug("done enqueueing account batch", zap.Int("count", enqueued), zap.Int("skipped", skipped), zap.Time("start", now)) }