diff --git a/cmd/apollo-scheduler/main.go b/cmd/apollo-scheduler/main.go index ee439e3..a9c2b75 100644 --- a/cmd/apollo-scheduler/main.go +++ b/cmd/apollo-scheduler/main.go @@ -21,7 +21,9 @@ import ( ) const ( - batchSize = 100 + batchSize = 100 + checkTimeout = 60 // how long until we force a check + enqueueTimeout = 5 // how long until we try to re-enqueue ) func main() { @@ -128,9 +130,8 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd. // and at most 6 seconds ago. Also look for accounts that haven't been checked // in over a minute. ts := start.Unix() - left := ts - 6 - right := left + 1 - expired := ts - 60 + ready := ts - enqueueTimeout + expired := ts - checkTimeout ids := []int64{} @@ -140,15 +141,15 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd. SELECT id FROM accounts WHERE - last_checked_at BETWEEN $1 AND $2 - OR last_checked_at < $3 + last_enqueued_at < $1 + OR last_checked_at < $2 ORDER BY last_checked_at ) UPDATE accounts - SET last_enqueued_at = $4 + SET last_enqueued_at = $3 WHERE accounts.id IN(SELECT id FROM account) RETURNING accounts.id` - rows, err := tx.Query(ctx, stmt, left, right, expired, now) + rows, err := tx.Query(ctx, stmt, ready, expired, now) if err != nil { return err } @@ -170,8 +171,7 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd. logger.WithFields(logrus.Fields{ "count": len(ids), - "start": left, - "end": right, + "start": ready, }).Debug("enqueueing account batch") enqueued := 0 @@ -190,20 +190,20 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd. "len": len(batch), }).Debug("enqueueing batch") - lua := ` + lua := fmt.Sprintf(` local retv={} local ids=cjson.decode(ARGV[1]) for i=1, #ids do local key = "locks:accounts:" .. ids[i] if redis.call("exists", key) == 0 then - redis.call("setex", key, 60, 1) + redis.call("setex", key, %d, 1) retv[#retv + 1] = ids[i] end end return retv - ` + `, checkTimeout) res, err := redisConn.Eval(ctx, lua, []string{}, batch).Result() if err != nil { @@ -239,8 +239,7 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd. "count": enqueued, "skipped": skipped, "failed": failed, - "start": left, - "end": right, + "start": ready, }).Info("done enqueueing account batch") }