more context fixes

This commit is contained in:
Andre Medeiros 2022-10-26 20:43:44 -04:00
parent 9a34ae3493
commit b0025e2367

View file

@ -135,6 +135,9 @@ func NewLiveActivitiesConsumer(law *liveActivitiesWorker, tag int) *liveActiviti
} }
func (lac *liveActivitiesConsumer) Consume(delivery rmq.Delivery) { func (lac *liveActivitiesConsumer) Consume(delivery rmq.Delivery) {
ctx, cancel := context.WithCancel(lac)
defer cancel()
now := time.Now().UTC() now := time.Now().UTC()
defer func() { defer func() {
elapsed := time.Now().Sub(now).Milliseconds() elapsed := time.Now().Sub(now).Milliseconds()
@ -145,12 +148,12 @@ func (lac *liveActivitiesConsumer) Consume(delivery rmq.Delivery) {
key := fmt.Sprintf("locks:live-activities:%s", at) key := fmt.Sprintf("locks:live-activities:%s", at)
// Measure queue latency // Measure queue latency
ttl := lac.redis.PTTL(lac, key).Val() ttl := lac.redis.PTTL(ctx, key).Val()
age := (domain.NotificationCheckTimeout - ttl) age := (domain.NotificationCheckTimeout - ttl)
_ = lac.statsd.Histogram("apollo.dequeue.latency", float64(age.Milliseconds()), []string{"queue:live_activities"}, 0.1) _ = lac.statsd.Histogram("apollo.dequeue.latency", float64(age.Milliseconds()), []string{"queue:live_activities"}, 0.1)
defer func() { defer func() {
if err := lac.redis.Del(lac, key).Err(); err != nil { if err := lac.redis.Del(ctx, key).Err(); err != nil {
lac.logger.Error("failed to remove account lock", zap.Error(err), zap.String("key", key)) lac.logger.Error("failed to remove account lock", zap.Error(err), zap.String("key", key))
} }
}() }()
@ -163,7 +166,7 @@ func (lac *liveActivitiesConsumer) Consume(delivery rmq.Delivery) {
} }
}() }()
la, err := lac.liveActivityRepo.Get(lac, at) la, err := lac.liveActivityRepo.Get(ctx, at)
if err != nil { if err != nil {
lac.logger.Error("failed to get live activity", zap.Error(err), zap.String("live_activity#apns_token", at)) lac.logger.Error("failed to get live activity", zap.Error(err), zap.String("live_activity#apns_token", at))
return return
@ -178,7 +181,7 @@ func (lac *liveActivitiesConsumer) Consume(delivery rmq.Delivery) {
zap.String("reddit#refresh_token", rac.ObfuscatedRefreshToken()), zap.String("reddit#refresh_token", rac.ObfuscatedRefreshToken()),
) )
tokens, err := rac.RefreshTokens(lac) tokens, err := rac.RefreshTokens(ctx)
if err != nil { if err != nil {
lac.logger.Error("failed to refresh reddit tokens", lac.logger.Error("failed to refresh reddit tokens",
zap.Error(err), zap.Error(err),
@ -188,7 +191,7 @@ func (lac *liveActivitiesConsumer) Consume(delivery rmq.Delivery) {
zap.String("reddit#refresh_token", rac.ObfuscatedRefreshToken()), zap.String("reddit#refresh_token", rac.ObfuscatedRefreshToken()),
) )
if err == reddit.ErrOauthRevoked { if err == reddit.ErrOauthRevoked {
_ = lac.liveActivityRepo.Delete(lac, at) _ = lac.liveActivityRepo.Delete(ctx, at)
} }
return return
} }
@ -197,7 +200,7 @@ func (lac *liveActivitiesConsumer) Consume(delivery rmq.Delivery) {
la.AccessToken = tokens.AccessToken la.AccessToken = tokens.AccessToken
la.RefreshToken = tokens.RefreshToken la.RefreshToken = tokens.RefreshToken
la.TokenExpiresAt = now.Add(tokens.Expiry) la.TokenExpiresAt = now.Add(tokens.Expiry)
_ = lac.liveActivityRepo.Update(lac, &la) _ = lac.liveActivityRepo.Update(ctx, &la)
// Refresh client // Refresh client
rac = lac.reddit.NewAuthenticatedClient(la.RedditAccountID, tokens.RefreshToken, tokens.AccessToken) rac = lac.reddit.NewAuthenticatedClient(la.RedditAccountID, tokens.RefreshToken, tokens.AccessToken)
@ -205,7 +208,7 @@ func (lac *liveActivitiesConsumer) Consume(delivery rmq.Delivery) {
lac.logger.Debug("fetching latest comments", zap.String("live_activity#apns_token", at)) lac.logger.Debug("fetching latest comments", zap.String("live_activity#apns_token", at))
tr, err := rac.TopLevelComments(lac, la.Subreddit, la.ThreadID) tr, err := rac.TopLevelComments(ctx, la.Subreddit, la.ThreadID)
if err != nil { if err != nil {
lac.logger.Error("failed to fetch latest comments", lac.logger.Error("failed to fetch latest comments",
zap.Error(err), zap.Error(err),
@ -215,7 +218,7 @@ func (lac *liveActivitiesConsumer) Consume(delivery rmq.Delivery) {
zap.String("reddit#refresh_token", rac.ObfuscatedRefreshToken()), zap.String("reddit#refresh_token", rac.ObfuscatedRefreshToken()),
) )
if err == reddit.ErrOauthRevoked { if err == reddit.ErrOauthRevoked {
_ = lac.liveActivityRepo.Delete(lac, at) _ = lac.liveActivityRepo.Delete(ctx, at)
} }
return return
} }
@ -292,7 +295,7 @@ func (lac *liveActivitiesConsumer) Consume(delivery rmq.Delivery) {
client = lac.apnsSandbox client = lac.apnsSandbox
} }
res, err := client.PushWithContext(lac, notification) res, err := client.PushWithContext(ctx, notification)
if err != nil { if err != nil {
_ = lac.statsd.Incr("apns.live_activities.errors", []string{}, 1) _ = lac.statsd.Incr("apns.live_activities.errors", []string{}, 1)
lac.logger.Error("failed to send notification", lac.logger.Error("failed to send notification",
@ -300,7 +303,7 @@ func (lac *liveActivitiesConsumer) Consume(delivery rmq.Delivery) {
zap.String("live_activity#apns_token", at), zap.String("live_activity#apns_token", at),
) )
_ = lac.liveActivityRepo.Delete(lac, at) _ = lac.liveActivityRepo.Delete(ctx, at)
} else if !res.Sent() { } else if !res.Sent() {
_ = lac.statsd.Incr("apns.live_activities.errors", []string{}, 1) _ = lac.statsd.Incr("apns.live_activities.errors", []string{}, 1)
lac.logger.Error("notification not sent", lac.logger.Error("notification not sent",
@ -309,7 +312,7 @@ func (lac *liveActivitiesConsumer) Consume(delivery rmq.Delivery) {
zap.String("response#reason", res.Reason), zap.String("response#reason", res.Reason),
) )
_ = lac.liveActivityRepo.Delete(lac, at) _ = lac.liveActivityRepo.Delete(ctx, at)
} else { } else {
_ = lac.statsd.Incr("apns.live_activities.sent", []string{}, 1) _ = lac.statsd.Incr("apns.live_activities.sent", []string{}, 1)
lac.logger.Info("sent notification", lac.logger.Info("sent notification",
@ -319,7 +322,7 @@ func (lac *liveActivitiesConsumer) Consume(delivery rmq.Delivery) {
if la.ExpiresAt.Before(now) { if la.ExpiresAt.Before(now) {
lac.logger.Debug("live activity expired, deleting", zap.String("live_activity#apns_token", at)) lac.logger.Debug("live activity expired, deleting", zap.String("live_activity#apns_token", at))
_ = lac.liveActivityRepo.Delete(lac, at) _ = lac.liveActivityRepo.Delete(ctx, at)
} }
lac.logger.Debug("finishing job", lac.logger.Debug("finishing job",