enqueue accounts in a more manageable fashion

This commit is contained in:
Andre Medeiros 2021-07-09 01:29:07 -04:00
parent c4a0704883
commit a93ff0e1af

View file

@ -7,6 +7,7 @@ import (
"log" "log"
"os" "os"
"os/signal" "os/signal"
"strconv"
"syscall" "syscall"
"time" "time"
@ -144,7 +145,7 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd.
last_enqueued_at < $1 last_enqueued_at < $1
OR last_checked_at < $2 OR last_checked_at < $2
ORDER BY last_checked_at ORDER BY last_checked_at
LIMIT 400 LIMIT 1000
) )
UPDATE accounts UPDATE accounts
SET last_enqueued_at = $3 SET last_enqueued_at = $3
@ -177,7 +178,6 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd.
enqueued := 0 enqueued := 0
skipped := 0 skipped := 0
failed := 0
// Split ids in batches // Split ids in batches
for i := 0; i < len(ids); i += batchSize { for i := 0; i < len(ids); i += batchSize {
@ -198,7 +198,6 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd.
for i=1, #ids do for i=1, #ids do
local key = "locks:accounts:" .. ids[i] local key = "locks:accounts:" .. ids[i]
if redis.call("exists", key) == 0 then if redis.call("exists", key) == 0 then
redis.call("lpush", "rmq::queue::[notifications]::ready", ids[i])
redis.call("setex", key, %d, 1) redis.call("setex", key, %d, 1)
retv[#retv + 1] = ids[i] retv[#retv + 1] = ids[i]
end end
@ -216,33 +215,32 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd.
} }
vals := res.([]interface{}) vals := res.([]interface{})
enqueued += len(vals)
skipped += len(batch) - len(vals) skipped += len(batch) - len(vals)
enqueued += len(vals)
/* if len(vals) == 0 {
for _, val := range vals { continue
id := val.(int64) }
payload := fmt.Sprintf("%d", id)
if err = queue.Publish(payload); err != nil { batchIds := make([]string, len(vals))
logger.WithFields(logrus.Fields{ for k, v := range vals {
"accountID": payload, batchIds[k] = strconv.FormatInt(v.(int64), 10)
"err": err, }
}).Error("failed to enqueue account")
failed++ if err = queue.Publish(batchIds...); err != nil {
} logger.WithFields(logrus.Fields{
} "err": err,
*/ }).Error("failed to enqueue account")
}
} }
statsd.Histogram("apollo.queue.enqueued", float64(enqueued), []string{}, 1) statsd.Histogram("apollo.queue.enqueued", float64(enqueued), []string{}, 1)
statsd.Histogram("apollo.queue.skipped", float64(skipped), []string{}, 1) statsd.Histogram("apollo.queue.skipped", float64(skipped), []string{}, 1)
statsd.Histogram("apollo.queue.failed", float64(failed), []string{}, 1)
statsd.Histogram("apollo.queue.runtime", float64(time.Now().Sub(start).Milliseconds()), []string{}, 1) statsd.Histogram("apollo.queue.runtime", float64(time.Now().Sub(start).Milliseconds()), []string{}, 1)
logger.WithFields(logrus.Fields{ logger.WithFields(logrus.Fields{
"count": enqueued, "count": enqueued,
"skipped": skipped, "skipped": skipped,
"failed": failed,
"start": ready, "start": ready,
}).Info("done enqueueing account batch") }).Info("done enqueueing account batch")
} }