From 45576b4d161cd9f9c26f328a41c8f0635de98ec4 Mon Sep 17 00:00:00 2001 From: Andre Medeiros Date: Thu, 8 Jul 2021 20:53:09 -0400 Subject: [PATCH] batch in second intervals --- cmd/apollo-scheduler/main.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/cmd/apollo-scheduler/main.go b/cmd/apollo-scheduler/main.go index d72488e..eb0e4bb 100644 --- a/cmd/apollo-scheduler/main.go +++ b/cmd/apollo-scheduler/main.go @@ -108,6 +108,10 @@ func main() { func enqueueAccounts(ctx context.Context, logger *logrus.Logger, pool *pgxpool.Pool, redisConn *redis.Client, queue rmq.Queue) { now := float64(time.Now().UnixNano()/int64(time.Millisecond)) / 1000 + + start := now + end := now + 1 + ids := []int64{} err := pool.BeginFunc(ctx, func(tx pgx.Tx) error { @@ -115,9 +119,8 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, pool *pgxpool.P WITH account AS ( SELECT id FROM accounts - WHERE - last_checked_at + 5 < $1 AND - last_enqueued_at + 5 < $1 + WHERE (last_checked_at + 5) BETWEEN $1 AND $2 + OR last_checked_at + 60 < $1 ORDER BY last_checked_at FOR UPDATE SKIP LOCKED ) @@ -125,7 +128,7 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, pool *pgxpool.P SET last_enqueued_at = $1 WHERE accounts.id IN(SELECT id FROM account) RETURNING accounts.id` - rows, err := tx.Query(ctx, stmt, now) + rows, err := tx.Query(ctx, stmt, start, end) if err != nil { return err }