diff --git a/internal/cmd/scheduler.go b/internal/cmd/scheduler.go index c7e9700..b9ed9d4 100644 --- a/internal/cmd/scheduler.go +++ b/internal/cmd/scheduler.go @@ -37,25 +37,25 @@ func SchedulerCmd(ctx context.Context) *cobra.Command { statsd, err := cmdutil.NewStatsdClient() if err != nil { - return err + return fmt.Errorf("could not initialize statsd: %w", err) } defer statsd.Close() db, err := cmdutil.NewDatabasePool(ctx, 1) if err != nil { - return err + return fmt.Errorf("could not connect to database: %w", err) } defer db.Close() redis, err := cmdutil.NewRedisLocksClient(ctx, 64) if err != nil { - return err + return fmt.Errorf("could not connect to redis locks: %w", err) } defer redis.Close() qredis, err := cmdutil.NewRedisQueueClient(ctx, 16) if err != nil { - return err + return fmt.Errorf("could not connect to redis queues: %w", err) } defer qredis.Close() @@ -103,7 +103,7 @@ func SchedulerCmd(ctx context.Context) *cobra.Command { s := gocron.NewScheduler(time.UTC) s.SetMaxConcurrentJobs(8, gocron.WaitMode) - eaj, _ := s.Every(accountEnqueueSeconds).Seconds().Do(func() { enqueueAccounts(ctx, logger, statsd, db, redis, luaSha, notifQueue) }) + eaj, _ := s.Every(5).Seconds().Do(func() { enqueueAccounts(ctx, logger, statsd, db, redis, luaSha, notifQueue) }) eaj.SingletonMode() _, _ = s.Every(5).Seconds().Do(func() { enqueueSubreddits(ctx, logger, statsd, db, []rmq.Queue{subredditQueue, trendingQueue}) }) @@ -139,8 +139,7 @@ func evalScript(ctx context.Context, redis *redis.Client) (string, error) { for i=1, #ARGV do local key = KEYS[1] .. ":" .. ARGV[i] - if redis.call("exists", key) == 0 then - redis.call("set", key, 1, "ex", %.0f) + if redis.call("set", key, 1, "nx", "ex", %.0f) then retv[#retv + 1] = ARGV[i] end end @@ -504,7 +503,10 @@ func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Cli defer wg.Done() candidates := chunks[offset] - time.Sleep(time.Duration(offset) * time.Second) + select { + case <-ctx.Done(): //context cancelled + case <-time.After(time.Duration(offset) * time.Second): //timeout + } enqueued, err := redisConn.EvalSha(ctx, luaSha, []string{"locks:accounts"}, candidates).StringSlice() if err != nil { @@ -524,7 +526,7 @@ func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Cli logger.Error("failed to enqueue account batch", zap.Error(err), zap.Int("offset", offset), - zap.Int("candidates", len(ids)), + zap.Int("candidates", len(candidates)), zap.Int("enqueued", len(enqueued)), ) return @@ -532,7 +534,7 @@ func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Cli logger.Info("enqueued account batch", zap.Int("offset", offset), - zap.Int("candidates", len(ids)), + zap.Int("candidates", len(candidates)), zap.Int("enqueued", len(enqueued)), ) }(ctx, i)