make the scheduler better by reducing redis calls

This commit is contained in:
Andre Medeiros 2023-03-16 16:43:35 +00:00
parent 0cadd624cc
commit fc0d290735

View file

@ -37,25 +37,25 @@ func SchedulerCmd(ctx context.Context) *cobra.Command {
statsd, err := cmdutil.NewStatsdClient() statsd, err := cmdutil.NewStatsdClient()
if err != nil { if err != nil {
return err return fmt.Errorf("could not initialize statsd: %w", err)
} }
defer statsd.Close() defer statsd.Close()
db, err := cmdutil.NewDatabasePool(ctx, 1) db, err := cmdutil.NewDatabasePool(ctx, 1)
if err != nil { if err != nil {
return err return fmt.Errorf("could not connect to database: %w", err)
} }
defer db.Close() defer db.Close()
redis, err := cmdutil.NewRedisLocksClient(ctx, 64) redis, err := cmdutil.NewRedisLocksClient(ctx, 64)
if err != nil { if err != nil {
return err return fmt.Errorf("could not connect to redis locks: %w", err)
} }
defer redis.Close() defer redis.Close()
qredis, err := cmdutil.NewRedisQueueClient(ctx, 16) qredis, err := cmdutil.NewRedisQueueClient(ctx, 16)
if err != nil { if err != nil {
return err return fmt.Errorf("could not connect to redis queues: %w", err)
} }
defer qredis.Close() defer qredis.Close()
@ -103,7 +103,7 @@ func SchedulerCmd(ctx context.Context) *cobra.Command {
s := gocron.NewScheduler(time.UTC) s := gocron.NewScheduler(time.UTC)
s.SetMaxConcurrentJobs(8, gocron.WaitMode) s.SetMaxConcurrentJobs(8, gocron.WaitMode)
eaj, _ := s.Every(accountEnqueueSeconds).Seconds().Do(func() { enqueueAccounts(ctx, logger, statsd, db, redis, luaSha, notifQueue) }) eaj, _ := s.Every(5).Seconds().Do(func() { enqueueAccounts(ctx, logger, statsd, db, redis, luaSha, notifQueue) })
eaj.SingletonMode() eaj.SingletonMode()
_, _ = s.Every(5).Seconds().Do(func() { enqueueSubreddits(ctx, logger, statsd, db, []rmq.Queue{subredditQueue, trendingQueue}) }) _, _ = s.Every(5).Seconds().Do(func() { enqueueSubreddits(ctx, logger, statsd, db, []rmq.Queue{subredditQueue, trendingQueue}) })
@ -139,8 +139,7 @@ func evalScript(ctx context.Context, redis *redis.Client) (string, error) {
for i=1, #ARGV do for i=1, #ARGV do
local key = KEYS[1] .. ":" .. ARGV[i] local key = KEYS[1] .. ":" .. ARGV[i]
if redis.call("exists", key) == 0 then if redis.call("set", key, 1, "nx", "ex", %.0f) then
redis.call("set", key, 1, "ex", %.0f)
retv[#retv + 1] = ARGV[i] retv[#retv + 1] = ARGV[i]
end end
end end
@ -504,7 +503,10 @@ func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Cli
defer wg.Done() defer wg.Done()
candidates := chunks[offset] candidates := chunks[offset]
time.Sleep(time.Duration(offset) * time.Second) select {
case <-ctx.Done(): //context cancelled
case <-time.After(time.Duration(offset) * time.Second): //timeout
}
enqueued, err := redisConn.EvalSha(ctx, luaSha, []string{"locks:accounts"}, candidates).StringSlice() enqueued, err := redisConn.EvalSha(ctx, luaSha, []string{"locks:accounts"}, candidates).StringSlice()
if err != nil { if err != nil {
@ -524,7 +526,7 @@ func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Cli
logger.Error("failed to enqueue account batch", logger.Error("failed to enqueue account batch",
zap.Error(err), zap.Error(err),
zap.Int("offset", offset), zap.Int("offset", offset),
zap.Int("candidates", len(ids)), zap.Int("candidates", len(candidates)),
zap.Int("enqueued", len(enqueued)), zap.Int("enqueued", len(enqueued)),
) )
return return
@ -532,7 +534,7 @@ func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Cli
logger.Info("enqueued account batch", logger.Info("enqueued account batch",
zap.Int("offset", offset), zap.Int("offset", offset),
zap.Int("candidates", len(ids)), zap.Int("candidates", len(candidates)),
zap.Int("enqueued", len(enqueued)), zap.Int("enqueued", len(enqueued)),
) )
}(ctx, i) }(ctx, i)