From 34e050daad57e56465cfc0289bcc45a5fa1a4469 Mon Sep 17 00:00:00 2001 From: Andre Medeiros Date: Mon, 23 May 2022 18:51:30 -0400 Subject: [PATCH] tweak scheduler --- internal/cmd/scheduler.go | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/internal/cmd/scheduler.go b/internal/cmd/scheduler.go index 8729857..408632a 100644 --- a/internal/cmd/scheduler.go +++ b/internal/cmd/scheduler.go @@ -21,7 +21,10 @@ import ( "github.com/christianselig/apollo-backend/internal/repository" ) -const batchSize = 250 +const ( + batchSize = 1000 + maxNotificationChecks = 5000 +) func SchedulerCmd(ctx context.Context) *cobra.Command { cmd := &cobra.Command{ @@ -383,7 +386,8 @@ func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Cli now := time.Now() next := now.Add(domain.NotificationCheckInterval) - ids := []int64{} + ids := make([]int64, maxNotificationChecks) + numids := 0 enqueued := 0 skipped := 0 @@ -395,7 +399,7 @@ func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Cli }() err := pool.BeginFunc(ctx, func(tx pgx.Tx) error { - stmt := ` + stmt := fmt.Sprintf(` UPDATE accounts SET next_notification_check_at = $2 WHERE accounts.id IN( @@ -404,18 +408,19 @@ func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Cli WHERE next_notification_check_at < $1 ORDER BY next_notification_check_at FOR UPDATE SKIP LOCKED - LIMIT 5000 + LIMIT %d ) - RETURNING accounts.id` + RETURNING accounts.id`, maxNotificationChecks) rows, err := tx.Query(ctx, stmt, now, next) if err != nil { return err } defer rows.Close() - for rows.Next() { + for i := 0; rows.Next(); i++ { var id int64 _ = rows.Scan(&id) - ids = append(ids, id) + ids[i] = id + numids = i } return nil }) @@ -425,17 +430,17 @@ func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Cli return } - if len(ids) == 0 { + if numids == 0 { return } logger.Debug("enqueueing account batch", zap.Int("count", len(ids)), zap.Time("start", now)) // Split ids in batches - for i := 0; i < len(ids); i += batchSize { + for i := 0; i < numids; i += batchSize { j := i + batchSize - if j > len(ids) { - j = len(ids) + if j > numids { + j = numids } batch := Int64Slice(ids[i:j])