diff --git a/internal/worker/notifications.go b/internal/worker/notifications.go index 1b439ab..c7d7523 100644 --- a/internal/worker/notifications.go +++ b/internal/worker/notifications.go @@ -135,6 +135,9 @@ func NewNotificationsConsumer(nw *notificationsWorker, tag int) *notificationsCo } func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { + ctx, cancel := context.WithCancel(nc) + defer cancel() + now := time.Now() defer func() { elapsed := time.Now().Sub(now).Milliseconds() @@ -145,12 +148,12 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { key := fmt.Sprintf("locks:accounts:%s", id) // Measure queue latency - ttl := nc.redis.PTTL(nc, key).Val() + ttl := nc.redis.PTTL(ctx, key).Val() age := (domain.NotificationCheckTimeout - ttl) _ = nc.statsd.Histogram("apollo.dequeue.latency", float64(age.Milliseconds()), []string{"queue:notifications"}, 0.1) defer func() { - if err := nc.redis.Del(nc, key).Err(); err != nil { + if err := nc.redis.Del(ctx, key).Err(); err != nil { nc.logger.Error("failed to remove account lock", zap.Error(err), zap.String("key", key)) } }() @@ -163,7 +166,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { } }() - account, err := nc.accountRepo.GetByRedditID(nc, id) + account, err := nc.accountRepo.GetByRedditID(ctx, id) if err != nil { nc.logger.Error("failed to fetch account from database", zap.Error(err), zap.String("account#reddit_account_id", id)) return @@ -176,7 +179,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { zap.String("account#username", account.NormalizedUsername()), ) - tokens, err := rac.RefreshTokens(nc) + tokens, err := rac.RefreshTokens(ctx) if err != nil { if err != reddit.ErrOauthRevoked { nc.logger.Error("failed to refresh reddit tokens", @@ -203,7 +206,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { account.AccessToken = tokens.AccessToken account.RefreshToken = tokens.RefreshToken account.TokenExpiresAt = now.Add(tokens.Expiry) - _ = nc.accountRepo.Update(nc, &account) + _ = nc.accountRepo.Update(ctx, &account) // Refresh client rac = nc.reddit.NewAuthenticatedClient(account.AccountID, tokens.RefreshToken, tokens.AccessToken) @@ -215,7 +218,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { if account.LastMessageID != "" { opts = append(opts, reddit.WithQuery("before", account.LastMessageID)) } - msgs, err := rac.MessageInbox(nc, opts...) + msgs, err := rac.MessageInbox(ctx, opts...) if err != nil { switch err { @@ -262,7 +265,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { for _, msg := range msgs.Children { if !msg.IsDeleted() { account.LastMessageID = msg.FullName() - _ = nc.accountRepo.Update(nc, &account) + _ = nc.accountRepo.Update(ctx, &account) break } } @@ -275,11 +278,11 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { ) account.CheckCount = 1 - _ = nc.accountRepo.Update(nc, &account) + _ = nc.accountRepo.Update(ctx, &account) return } - devices, err := nc.deviceRepo.GetInboxNotifiableByAccountID(nc, account.ID) + devices, err := nc.deviceRepo.GetInboxNotifiableByAccountID(ctx, account.ID) if err != nil { nc.logger.Error("failed to fetch account devices", zap.Error(err), @@ -321,7 +324,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { client = nc.apnsSandbox } - res, err := client.PushWithContext(nc, notification) + res, err := client.PushWithContext(ctx, notification) if err != nil { _ = nc.statsd.Incr("apns.notification.errors", []string{}, 1) nc.logger.Error("failed to send notification", @@ -332,7 +335,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { ) // Delete device as notifications might have been disabled here - _ = nc.deviceRepo.Delete(nc, device.APNSToken) + _ = nc.deviceRepo.Delete(ctx, device.APNSToken) } else if !res.Sent() { _ = nc.statsd.Incr("apns.notification.errors", []string{}, 1) nc.logger.Error("notification not sent", @@ -344,7 +347,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { ) // Delete device as notifications might have been disabled here - _ = nc.deviceRepo.Delete(nc, device.APNSToken) + _ = nc.deviceRepo.Delete(ctx, device.APNSToken) } else { _ = nc.statsd.Incr("apns.notification.sent", []string{}, 1) nc.logger.Info("sent notification",