mirror of
https://github.com/christianselig/apollo-backend
synced 2024-11-10 22:17:44 +00:00
batch in second intervals
This commit is contained in:
parent
4d2554fba6
commit
45576b4d16
1 changed files with 7 additions and 4 deletions
|
@ -108,6 +108,10 @@ func main() {
|
||||||
|
|
||||||
func enqueueAccounts(ctx context.Context, logger *logrus.Logger, pool *pgxpool.Pool, redisConn *redis.Client, queue rmq.Queue) {
|
func enqueueAccounts(ctx context.Context, logger *logrus.Logger, pool *pgxpool.Pool, redisConn *redis.Client, queue rmq.Queue) {
|
||||||
now := float64(time.Now().UnixNano()/int64(time.Millisecond)) / 1000
|
now := float64(time.Now().UnixNano()/int64(time.Millisecond)) / 1000
|
||||||
|
|
||||||
|
start := now
|
||||||
|
end := now + 1
|
||||||
|
|
||||||
ids := []int64{}
|
ids := []int64{}
|
||||||
|
|
||||||
err := pool.BeginFunc(ctx, func(tx pgx.Tx) error {
|
err := pool.BeginFunc(ctx, func(tx pgx.Tx) error {
|
||||||
|
@ -115,9 +119,8 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, pool *pgxpool.P
|
||||||
WITH account AS (
|
WITH account AS (
|
||||||
SELECT id
|
SELECT id
|
||||||
FROM accounts
|
FROM accounts
|
||||||
WHERE
|
WHERE (last_checked_at + 5) BETWEEN $1 AND $2
|
||||||
last_checked_at + 5 < $1 AND
|
OR last_checked_at + 60 < $1
|
||||||
last_enqueued_at + 5 < $1
|
|
||||||
ORDER BY last_checked_at
|
ORDER BY last_checked_at
|
||||||
FOR UPDATE SKIP LOCKED
|
FOR UPDATE SKIP LOCKED
|
||||||
)
|
)
|
||||||
|
@ -125,7 +128,7 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, pool *pgxpool.P
|
||||||
SET last_enqueued_at = $1
|
SET last_enqueued_at = $1
|
||||||
WHERE accounts.id IN(SELECT id FROM account)
|
WHERE accounts.id IN(SELECT id FROM account)
|
||||||
RETURNING accounts.id`
|
RETURNING accounts.id`
|
||||||
rows, err := tx.Query(ctx, stmt, now)
|
rows, err := tx.Query(ctx, stmt, start, end)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue