correct use of context

This commit is contained in:
Andre Medeiros 2022-10-26 20:42:44 -04:00
parent 208ae5557f
commit 9a34ae3493

View file

@ -135,6 +135,9 @@ func NewNotificationsConsumer(nw *notificationsWorker, tag int) *notificationsCo
} }
func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
ctx, cancel := context.WithCancel(nc)
defer cancel()
now := time.Now() now := time.Now()
defer func() { defer func() {
elapsed := time.Now().Sub(now).Milliseconds() elapsed := time.Now().Sub(now).Milliseconds()
@ -145,12 +148,12 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
key := fmt.Sprintf("locks:accounts:%s", id) key := fmt.Sprintf("locks:accounts:%s", id)
// Measure queue latency // Measure queue latency
ttl := nc.redis.PTTL(nc, key).Val() ttl := nc.redis.PTTL(ctx, key).Val()
age := (domain.NotificationCheckTimeout - ttl) age := (domain.NotificationCheckTimeout - ttl)
_ = nc.statsd.Histogram("apollo.dequeue.latency", float64(age.Milliseconds()), []string{"queue:notifications"}, 0.1) _ = nc.statsd.Histogram("apollo.dequeue.latency", float64(age.Milliseconds()), []string{"queue:notifications"}, 0.1)
defer func() { defer func() {
if err := nc.redis.Del(nc, key).Err(); err != nil { if err := nc.redis.Del(ctx, key).Err(); err != nil {
nc.logger.Error("failed to remove account lock", zap.Error(err), zap.String("key", key)) nc.logger.Error("failed to remove account lock", zap.Error(err), zap.String("key", key))
} }
}() }()
@ -163,7 +166,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
} }
}() }()
account, err := nc.accountRepo.GetByRedditID(nc, id) account, err := nc.accountRepo.GetByRedditID(ctx, id)
if err != nil { if err != nil {
nc.logger.Error("failed to fetch account from database", zap.Error(err), zap.String("account#reddit_account_id", id)) nc.logger.Error("failed to fetch account from database", zap.Error(err), zap.String("account#reddit_account_id", id))
return return
@ -176,7 +179,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
zap.String("account#username", account.NormalizedUsername()), zap.String("account#username", account.NormalizedUsername()),
) )
tokens, err := rac.RefreshTokens(nc) tokens, err := rac.RefreshTokens(ctx)
if err != nil { if err != nil {
if err != reddit.ErrOauthRevoked { if err != reddit.ErrOauthRevoked {
nc.logger.Error("failed to refresh reddit tokens", nc.logger.Error("failed to refresh reddit tokens",
@ -203,7 +206,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
account.AccessToken = tokens.AccessToken account.AccessToken = tokens.AccessToken
account.RefreshToken = tokens.RefreshToken account.RefreshToken = tokens.RefreshToken
account.TokenExpiresAt = now.Add(tokens.Expiry) account.TokenExpiresAt = now.Add(tokens.Expiry)
_ = nc.accountRepo.Update(nc, &account) _ = nc.accountRepo.Update(ctx, &account)
// Refresh client // Refresh client
rac = nc.reddit.NewAuthenticatedClient(account.AccountID, tokens.RefreshToken, tokens.AccessToken) rac = nc.reddit.NewAuthenticatedClient(account.AccountID, tokens.RefreshToken, tokens.AccessToken)
@ -215,7 +218,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
if account.LastMessageID != "" { if account.LastMessageID != "" {
opts = append(opts, reddit.WithQuery("before", account.LastMessageID)) opts = append(opts, reddit.WithQuery("before", account.LastMessageID))
} }
msgs, err := rac.MessageInbox(nc, opts...) msgs, err := rac.MessageInbox(ctx, opts...)
if err != nil { if err != nil {
switch err { switch err {
@ -262,7 +265,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
for _, msg := range msgs.Children { for _, msg := range msgs.Children {
if !msg.IsDeleted() { if !msg.IsDeleted() {
account.LastMessageID = msg.FullName() account.LastMessageID = msg.FullName()
_ = nc.accountRepo.Update(nc, &account) _ = nc.accountRepo.Update(ctx, &account)
break break
} }
} }
@ -275,11 +278,11 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
) )
account.CheckCount = 1 account.CheckCount = 1
_ = nc.accountRepo.Update(nc, &account) _ = nc.accountRepo.Update(ctx, &account)
return return
} }
devices, err := nc.deviceRepo.GetInboxNotifiableByAccountID(nc, account.ID) devices, err := nc.deviceRepo.GetInboxNotifiableByAccountID(ctx, account.ID)
if err != nil { if err != nil {
nc.logger.Error("failed to fetch account devices", nc.logger.Error("failed to fetch account devices",
zap.Error(err), zap.Error(err),
@ -321,7 +324,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
client = nc.apnsSandbox client = nc.apnsSandbox
} }
res, err := client.PushWithContext(nc, notification) res, err := client.PushWithContext(ctx, notification)
if err != nil { if err != nil {
_ = nc.statsd.Incr("apns.notification.errors", []string{}, 1) _ = nc.statsd.Incr("apns.notification.errors", []string{}, 1)
nc.logger.Error("failed to send notification", nc.logger.Error("failed to send notification",
@ -332,7 +335,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
) )
// Delete device as notifications might have been disabled here // Delete device as notifications might have been disabled here
_ = nc.deviceRepo.Delete(nc, device.APNSToken) _ = nc.deviceRepo.Delete(ctx, device.APNSToken)
} else if !res.Sent() { } else if !res.Sent() {
_ = nc.statsd.Incr("apns.notification.errors", []string{}, 1) _ = nc.statsd.Incr("apns.notification.errors", []string{}, 1)
nc.logger.Error("notification not sent", nc.logger.Error("notification not sent",
@ -344,7 +347,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
) )
// Delete device as notifications might have been disabled here // Delete device as notifications might have been disabled here
_ = nc.deviceRepo.Delete(nc, device.APNSToken) _ = nc.deviceRepo.Delete(ctx, device.APNSToken)
} else { } else {
_ = nc.statsd.Incr("apns.notification.sent", []string{}, 1) _ = nc.statsd.Incr("apns.notification.sent", []string{}, 1)
nc.logger.Info("sent notification", nc.logger.Info("sent notification",