diff --git a/cmd/apollo-scheduler/main.go b/cmd/apollo-scheduler/main.go index 3efadde..4542754 100644 --- a/cmd/apollo-scheduler/main.go +++ b/cmd/apollo-scheduler/main.go @@ -7,6 +7,7 @@ import ( "log" "os" "os/signal" + "strconv" "syscall" "time" @@ -144,7 +145,7 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd. last_enqueued_at < $1 OR last_checked_at < $2 ORDER BY last_checked_at - LIMIT 400 + LIMIT 1000 ) UPDATE accounts SET last_enqueued_at = $3 @@ -177,7 +178,6 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd. enqueued := 0 skipped := 0 - failed := 0 // Split ids in batches for i := 0; i < len(ids); i += batchSize { @@ -198,7 +198,6 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd. for i=1, #ids do local key = "locks:accounts:" .. ids[i] if redis.call("exists", key) == 0 then - redis.call("lpush", "rmq::queue::[notifications]::ready", ids[i]) redis.call("setex", key, %d, 1) retv[#retv + 1] = ids[i] end @@ -216,33 +215,32 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd. } vals := res.([]interface{}) - enqueued += len(vals) skipped += len(batch) - len(vals) + enqueued += len(vals) - /* - for _, val := range vals { - id := val.(int64) - payload := fmt.Sprintf("%d", id) - if err = queue.Publish(payload); err != nil { - logger.WithFields(logrus.Fields{ - "accountID": payload, - "err": err, - }).Error("failed to enqueue account") - failed++ - } - } - */ + if len(vals) == 0 { + continue + } + + batchIds := make([]string, len(vals)) + for k, v := range vals { + batchIds[k] = strconv.FormatInt(v.(int64), 10) + } + + if err = queue.Publish(batchIds...); err != nil { + logger.WithFields(logrus.Fields{ + "err": err, + }).Error("failed to enqueue account") + } } statsd.Histogram("apollo.queue.enqueued", float64(enqueued), []string{}, 1) statsd.Histogram("apollo.queue.skipped", float64(skipped), []string{}, 1) - statsd.Histogram("apollo.queue.failed", float64(failed), []string{}, 1) statsd.Histogram("apollo.queue.runtime", float64(time.Now().Sub(start).Milliseconds()), []string{}, 1) logger.WithFields(logrus.Fields{ "count": enqueued, "skipped": skipped, - "failed": failed, "start": ready, }).Info("done enqueueing account batch") }