be more aggressive with scheduling

This commit is contained in:
Andre Medeiros 2021-07-09 00:27:50 -04:00
parent 961e41094d
commit 60e18c37f7

View file

@ -21,7 +21,9 @@ import (
) )
const ( const (
batchSize = 100 batchSize = 100
checkTimeout = 60 // how long until we force a check
enqueueTimeout = 5 // how long until we try to re-enqueue
) )
func main() { func main() {
@ -128,9 +130,8 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd.
// and at most 6 seconds ago. Also look for accounts that haven't been checked // and at most 6 seconds ago. Also look for accounts that haven't been checked
// in over a minute. // in over a minute.
ts := start.Unix() ts := start.Unix()
left := ts - 6 ready := ts - enqueueTimeout
right := left + 1 expired := ts - checkTimeout
expired := ts - 60
ids := []int64{} ids := []int64{}
@ -140,15 +141,15 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd.
SELECT id SELECT id
FROM accounts FROM accounts
WHERE WHERE
last_checked_at BETWEEN $1 AND $2 last_enqueued_at < $1
OR last_checked_at < $3 OR last_checked_at < $2
ORDER BY last_checked_at ORDER BY last_checked_at
) )
UPDATE accounts UPDATE accounts
SET last_enqueued_at = $4 SET last_enqueued_at = $3
WHERE accounts.id IN(SELECT id FROM account) WHERE accounts.id IN(SELECT id FROM account)
RETURNING accounts.id` RETURNING accounts.id`
rows, err := tx.Query(ctx, stmt, left, right, expired, now) rows, err := tx.Query(ctx, stmt, ready, expired, now)
if err != nil { if err != nil {
return err return err
} }
@ -170,8 +171,7 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd.
logger.WithFields(logrus.Fields{ logger.WithFields(logrus.Fields{
"count": len(ids), "count": len(ids),
"start": left, "start": ready,
"end": right,
}).Debug("enqueueing account batch") }).Debug("enqueueing account batch")
enqueued := 0 enqueued := 0
@ -190,20 +190,20 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd.
"len": len(batch), "len": len(batch),
}).Debug("enqueueing batch") }).Debug("enqueueing batch")
lua := ` lua := fmt.Sprintf(`
local retv={} local retv={}
local ids=cjson.decode(ARGV[1]) local ids=cjson.decode(ARGV[1])
for i=1, #ids do for i=1, #ids do
local key = "locks:accounts:" .. ids[i] local key = "locks:accounts:" .. ids[i]
if redis.call("exists", key) == 0 then if redis.call("exists", key) == 0 then
redis.call("setex", key, 60, 1) redis.call("setex", key, %d, 1)
retv[#retv + 1] = ids[i] retv[#retv + 1] = ids[i]
end end
end end
return retv return retv
` `, checkTimeout)
res, err := redisConn.Eval(ctx, lua, []string{}, batch).Result() res, err := redisConn.Eval(ctx, lua, []string{}, batch).Result()
if err != nil { if err != nil {
@ -239,8 +239,7 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd.
"count": enqueued, "count": enqueued,
"skipped": skipped, "skipped": skipped,
"failed": failed, "failed": failed,
"start": left, "start": ready,
"end": right,
}).Info("done enqueueing account batch") }).Info("done enqueueing account batch")
} }