From ec7bb411fd457341f8d1aaa51c03476c1ca2970f Mon Sep 17 00:00:00 2001 From: Andre Medeiros Date: Wed, 7 Jul 2021 18:09:29 -0400 Subject: [PATCH] measure if we are falling behind --- cmd/apollo-worker/main.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/cmd/apollo-worker/main.go b/cmd/apollo-worker/main.go index 8370bd1..0c89639 100644 --- a/cmd/apollo-worker/main.go +++ b/cmd/apollo-worker/main.go @@ -28,8 +28,11 @@ type application struct { client *reddit.Client } -var workers int = runtime.NumCPU() * 6 -var rate float64 = 0.1 +var ( + workers int = runtime.NumCPU() * 6 + rate float64 = 0.1 + backoff int64 = 5 +) func accountWorker(id int, rc *reddit.Client, db *sql.DB, logger *log.Logger, statsd *statsd.Client, quit chan bool) { authKey, err := token.AuthKeyFromBytes([]byte(os.Getenv("APPLE_KEY_PKEY"))) @@ -62,11 +65,11 @@ func accountWorker(id int, rc *reddit.Client, db *sql.DB, logger *log.Logger, st query := ` SELECT id, username, access_token, refresh_token, expires_at, last_message_id, last_checked_at FROM accounts - WHERE last_checked_at <= $1 - 5 + WHERE last_checked_at <= $1 - $2 ORDER BY last_checked_at LIMIT 1 FOR UPDATE SKIP LOCKED` - args := []interface{}{now} + args := []interface{}{now, backoff} account := &data.Account{} err = tx.QueryRow(query, args...).Scan(&account.ID, &account.Username, &account.AccessToken, &account.RefreshToken, &account.ExpiresAt, &account.LastMessageID, &account.LastCheckedAt) @@ -77,6 +80,11 @@ func accountWorker(id int, rc *reddit.Client, db *sql.DB, logger *log.Logger, st continue } + if account.LastCheckedAt > 0 { + latency := now - account.LastCheckedAt - backoff + statsd.Histogram("apollo.queue.delay", float64(latency), []string{}, rate) + } + _, err = tx.Exec(`UPDATE accounts SET last_checked_at = $1 WHERE id = $2`, now, account.ID) rac := rc.NewAuthenticatedClient(account.RefreshToken, account.AccessToken)