From c4a0704883b65dc4e993a26b8c32c4299d1b2278 Mon Sep 17 00:00:00 2001 From: Andre Medeiros Date: Fri, 9 Jul 2021 01:05:36 -0400 Subject: [PATCH] enqueue directly from redis --- cmd/apollo-scheduler/main.go | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/cmd/apollo-scheduler/main.go b/cmd/apollo-scheduler/main.go index 01604a9..3efadde 100644 --- a/cmd/apollo-scheduler/main.go +++ b/cmd/apollo-scheduler/main.go @@ -144,7 +144,7 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd. last_enqueued_at < $1 OR last_checked_at < $2 ORDER BY last_checked_at - LIMIT 500 + LIMIT 400 ) UPDATE accounts SET last_enqueued_at = $3 @@ -198,6 +198,7 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd. for i=1, #ids do local key = "locks:accounts:" .. ids[i] if redis.call("exists", key) == 0 then + redis.call("lpush", "rmq::queue::[notifications]::ready", ids[i]) redis.call("setex", key, %d, 1) retv[#retv + 1] = ids[i] end @@ -218,17 +219,19 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd. enqueued += len(vals) skipped += len(batch) - len(vals) - for _, val := range vals { - id := val.(int64) - payload := fmt.Sprintf("%d", id) - if err = queue.Publish(payload); err != nil { - logger.WithFields(logrus.Fields{ - "accountID": payload, - "err": err, - }).Error("failed to enqueue account") - failed++ + /* + for _, val := range vals { + id := val.(int64) + payload := fmt.Sprintf("%d", id) + if err = queue.Publish(payload); err != nil { + logger.WithFields(logrus.Fields{ + "accountID": payload, + "err": err, + }).Error("failed to enqueue account") + failed++ + } } - } + */ } statsd.Histogram("apollo.queue.enqueued", float64(enqueued), []string{}, 1)