enqueue directly from redis

This commit is contained in:
Andre Medeiros 2021-07-09 01:05:36 -04:00
parent 4a92b90df9
commit c4a0704883

View file

@ -144,7 +144,7 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd.
last_enqueued_at < $1 last_enqueued_at < $1
OR last_checked_at < $2 OR last_checked_at < $2
ORDER BY last_checked_at ORDER BY last_checked_at
LIMIT 500 LIMIT 400
) )
UPDATE accounts UPDATE accounts
SET last_enqueued_at = $3 SET last_enqueued_at = $3
@ -198,6 +198,7 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd.
for i=1, #ids do for i=1, #ids do
local key = "locks:accounts:" .. ids[i] local key = "locks:accounts:" .. ids[i]
if redis.call("exists", key) == 0 then if redis.call("exists", key) == 0 then
redis.call("lpush", "rmq::queue::[notifications]::ready", ids[i])
redis.call("setex", key, %d, 1) redis.call("setex", key, %d, 1)
retv[#retv + 1] = ids[i] retv[#retv + 1] = ids[i]
end end
@ -218,17 +219,19 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd.
enqueued += len(vals) enqueued += len(vals)
skipped += len(batch) - len(vals) skipped += len(batch) - len(vals)
for _, val := range vals { /*
id := val.(int64) for _, val := range vals {
payload := fmt.Sprintf("%d", id) id := val.(int64)
if err = queue.Publish(payload); err != nil { payload := fmt.Sprintf("%d", id)
logger.WithFields(logrus.Fields{ if err = queue.Publish(payload); err != nil {
"accountID": payload, logger.WithFields(logrus.Fields{
"err": err, "accountID": payload,
}).Error("failed to enqueue account") "err": err,
failed++ }).Error("failed to enqueue account")
failed++
}
} }
} */
} }
statsd.Histogram("apollo.queue.enqueued", float64(enqueued), []string{}, 1) statsd.Histogram("apollo.queue.enqueued", float64(enqueued), []string{}, 1)