diff --git a/internal/worker/live_activities.go b/internal/worker/live_activities.go index aad5042..0b0edde 100644 --- a/internal/worker/live_activities.go +++ b/internal/worker/live_activities.go @@ -135,6 +135,9 @@ func NewLiveActivitiesConsumer(law *liveActivitiesWorker, tag int) *liveActiviti } func (lac *liveActivitiesConsumer) Consume(delivery rmq.Delivery) { + ctx, cancel := context.WithCancel(lac) + defer cancel() + now := time.Now().UTC() defer func() { elapsed := time.Now().Sub(now).Milliseconds() @@ -145,12 +148,12 @@ func (lac *liveActivitiesConsumer) Consume(delivery rmq.Delivery) { key := fmt.Sprintf("locks:live-activities:%s", at) // Measure queue latency - ttl := lac.redis.PTTL(lac, key).Val() + ttl := lac.redis.PTTL(ctx, key).Val() age := (domain.NotificationCheckTimeout - ttl) _ = lac.statsd.Histogram("apollo.dequeue.latency", float64(age.Milliseconds()), []string{"queue:live_activities"}, 0.1) defer func() { - if err := lac.redis.Del(lac, key).Err(); err != nil { + if err := lac.redis.Del(ctx, key).Err(); err != nil { lac.logger.Error("failed to remove account lock", zap.Error(err), zap.String("key", key)) } }() @@ -163,7 +166,7 @@ func (lac *liveActivitiesConsumer) Consume(delivery rmq.Delivery) { } }() - la, err := lac.liveActivityRepo.Get(lac, at) + la, err := lac.liveActivityRepo.Get(ctx, at) if err != nil { lac.logger.Error("failed to get live activity", zap.Error(err), zap.String("live_activity#apns_token", at)) return @@ -178,7 +181,7 @@ func (lac *liveActivitiesConsumer) Consume(delivery rmq.Delivery) { zap.String("reddit#refresh_token", rac.ObfuscatedRefreshToken()), ) - tokens, err := rac.RefreshTokens(lac) + tokens, err := rac.RefreshTokens(ctx) if err != nil { lac.logger.Error("failed to refresh reddit tokens", zap.Error(err), @@ -188,7 +191,7 @@ func (lac *liveActivitiesConsumer) Consume(delivery rmq.Delivery) { zap.String("reddit#refresh_token", rac.ObfuscatedRefreshToken()), ) if err == reddit.ErrOauthRevoked { - _ = lac.liveActivityRepo.Delete(lac, at) + _ = lac.liveActivityRepo.Delete(ctx, at) } return } @@ -197,7 +200,7 @@ func (lac *liveActivitiesConsumer) Consume(delivery rmq.Delivery) { la.AccessToken = tokens.AccessToken la.RefreshToken = tokens.RefreshToken la.TokenExpiresAt = now.Add(tokens.Expiry) - _ = lac.liveActivityRepo.Update(lac, &la) + _ = lac.liveActivityRepo.Update(ctx, &la) // Refresh client rac = lac.reddit.NewAuthenticatedClient(la.RedditAccountID, tokens.RefreshToken, tokens.AccessToken) @@ -205,7 +208,7 @@ func (lac *liveActivitiesConsumer) Consume(delivery rmq.Delivery) { lac.logger.Debug("fetching latest comments", zap.String("live_activity#apns_token", at)) - tr, err := rac.TopLevelComments(lac, la.Subreddit, la.ThreadID) + tr, err := rac.TopLevelComments(ctx, la.Subreddit, la.ThreadID) if err != nil { lac.logger.Error("failed to fetch latest comments", zap.Error(err), @@ -215,7 +218,7 @@ func (lac *liveActivitiesConsumer) Consume(delivery rmq.Delivery) { zap.String("reddit#refresh_token", rac.ObfuscatedRefreshToken()), ) if err == reddit.ErrOauthRevoked { - _ = lac.liveActivityRepo.Delete(lac, at) + _ = lac.liveActivityRepo.Delete(ctx, at) } return } @@ -292,7 +295,7 @@ func (lac *liveActivitiesConsumer) Consume(delivery rmq.Delivery) { client = lac.apnsSandbox } - res, err := client.PushWithContext(lac, notification) + res, err := client.PushWithContext(ctx, notification) if err != nil { _ = lac.statsd.Incr("apns.live_activities.errors", []string{}, 1) lac.logger.Error("failed to send notification", @@ -300,7 +303,7 @@ func (lac *liveActivitiesConsumer) Consume(delivery rmq.Delivery) { zap.String("live_activity#apns_token", at), ) - _ = lac.liveActivityRepo.Delete(lac, at) + _ = lac.liveActivityRepo.Delete(ctx, at) } else if !res.Sent() { _ = lac.statsd.Incr("apns.live_activities.errors", []string{}, 1) lac.logger.Error("notification not sent", @@ -309,7 +312,7 @@ func (lac *liveActivitiesConsumer) Consume(delivery rmq.Delivery) { zap.String("response#reason", res.Reason), ) - _ = lac.liveActivityRepo.Delete(lac, at) + _ = lac.liveActivityRepo.Delete(ctx, at) } else { _ = lac.statsd.Incr("apns.live_activities.sent", []string{}, 1) lac.logger.Info("sent notification", @@ -319,7 +322,7 @@ func (lac *liveActivitiesConsumer) Consume(delivery rmq.Delivery) { if la.ExpiresAt.Before(now) { lac.logger.Debug("live activity expired, deleting", zap.String("live_activity#apns_token", at)) - _ = lac.liveActivityRepo.Delete(lac, at) + _ = lac.liveActivityRepo.Delete(ctx, at) } lac.logger.Debug("finishing job",