diff --git a/internal/worker/notifications.go b/internal/worker/notifications.go index 9240fd6..f8821d8 100644 --- a/internal/worker/notifications.go +++ b/internal/worker/notifications.go @@ -186,6 +186,24 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { nc.statsd.Histogram("apollo.queue.delay", latency, []string{}, rate) } + if err = nc.db.BeginFunc(ctx, func(tx pgx.Tx) error { + stmt := ` + UPDATE accounts + SET last_checked_at = $1 + WHERE id = $2` + _, err := tx.Exec(ctx, stmt, now, account.ID) + return err + }); err != nil { + nc.logger.WithFields(logrus.Fields{ + "accountID": id, + "err": err, + }).Error("failed to update last_checked_at for account") + return + } + + retried := false + +refreshToken: rac := nc.reddit.NewAuthenticatedClient(account.RefreshToken, account.AccessToken) if account.ExpiresAt < int64(now) { nc.logger.WithFields(logrus.Fields{ @@ -226,6 +244,13 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { msgs, err := rac.MessageInbox(account.LastMessageID) if err != nil { + // Check if maybe we need to refresh the token. + if !retried { + retried = true + account.ExpiresAt = 0 + goto refreshToken + } + nc.logger.WithFields(logrus.Fields{ "accountID": id, "err": err, @@ -238,21 +263,6 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { "count": len(msgs.MessageListing.Messages), }).Debug("fetched messages") - if err = nc.db.BeginFunc(ctx, func(tx pgx.Tx) error { - stmt := ` - UPDATE accounts - SET last_checked_at = $1 - WHERE id = $2` - _, err := tx.Exec(ctx, stmt, now, account.ID) - return err - }); err != nil { - nc.logger.WithFields(logrus.Fields{ - "accountID": id, - "err": err, - }).Error("failed to update last_checked_at for account") - return - } - if len(msgs.MessageListing.Messages) == 0 { nc.logger.WithFields(logrus.Fields{ "accountID": id,