tweak scheduler

This commit is contained in:
Andre Medeiros 2022-05-23 18:51:30 -04:00
parent b1ed5fb71a
commit 34e050daad

View file

@ -21,7 +21,10 @@ import (
"github.com/christianselig/apollo-backend/internal/repository" "github.com/christianselig/apollo-backend/internal/repository"
) )
const batchSize = 250 const (
batchSize = 1000
maxNotificationChecks = 5000
)
func SchedulerCmd(ctx context.Context) *cobra.Command { func SchedulerCmd(ctx context.Context) *cobra.Command {
cmd := &cobra.Command{ cmd := &cobra.Command{
@ -383,7 +386,8 @@ func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Cli
now := time.Now() now := time.Now()
next := now.Add(domain.NotificationCheckInterval) next := now.Add(domain.NotificationCheckInterval)
ids := []int64{} ids := make([]int64, maxNotificationChecks)
numids := 0
enqueued := 0 enqueued := 0
skipped := 0 skipped := 0
@ -395,7 +399,7 @@ func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Cli
}() }()
err := pool.BeginFunc(ctx, func(tx pgx.Tx) error { err := pool.BeginFunc(ctx, func(tx pgx.Tx) error {
stmt := ` stmt := fmt.Sprintf(`
UPDATE accounts UPDATE accounts
SET next_notification_check_at = $2 SET next_notification_check_at = $2
WHERE accounts.id IN( WHERE accounts.id IN(
@ -404,18 +408,19 @@ func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Cli
WHERE next_notification_check_at < $1 WHERE next_notification_check_at < $1
ORDER BY next_notification_check_at ORDER BY next_notification_check_at
FOR UPDATE SKIP LOCKED FOR UPDATE SKIP LOCKED
LIMIT 5000 LIMIT %d
) )
RETURNING accounts.id` RETURNING accounts.id`, maxNotificationChecks)
rows, err := tx.Query(ctx, stmt, now, next) rows, err := tx.Query(ctx, stmt, now, next)
if err != nil { if err != nil {
return err return err
} }
defer rows.Close() defer rows.Close()
for rows.Next() { for i := 0; rows.Next(); i++ {
var id int64 var id int64
_ = rows.Scan(&id) _ = rows.Scan(&id)
ids = append(ids, id) ids[i] = id
numids = i
} }
return nil return nil
}) })
@ -425,17 +430,17 @@ func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Cli
return return
} }
if len(ids) == 0 { if numids == 0 {
return return
} }
logger.Debug("enqueueing account batch", zap.Int("count", len(ids)), zap.Time("start", now)) logger.Debug("enqueueing account batch", zap.Int("count", len(ids)), zap.Time("start", now))
// Split ids in batches // Split ids in batches
for i := 0; i < len(ids); i += batchSize { for i := 0; i < numids; i += batchSize {
j := i + batchSize j := i + batchSize
if j > len(ids) { if j > numids {
j = len(ids) j = numids
} }
batch := Int64Slice(ids[i:j]) batch := Int64Slice(ids[i:j])