From 873b9eb4db49c8500bf8d24240798b9f5e095382 Mon Sep 17 00:00:00 2001 From: Andre Medeiros Date: Tue, 1 Nov 2022 10:16:17 -0400 Subject: [PATCH] reduce logger allocations --- internal/worker/live_activities.go | 2 +- internal/worker/notifications.go | 113 +++++++++-------------------- 2 files changed, 36 insertions(+), 79 deletions(-) diff --git a/internal/worker/live_activities.go b/internal/worker/live_activities.go index f09d859..804759f 100644 --- a/internal/worker/live_activities.go +++ b/internal/worker/live_activities.go @@ -315,7 +315,7 @@ func (lac *liveActivitiesConsumer) Consume(delivery rmq.Delivery) { _ = lac.liveActivityRepo.Delete(ctx, at) } else { _ = lac.statsd.Incr("apns.notification.sent", []string{}, 1) - lac.logger.Info("sent notification", + lac.logger.Debug("sent notification", zap.String("live_activity#apns_token", at), zap.Bool("live_activity#sandbox", la.Sandbox), zap.String("notification#type", ev), diff --git a/internal/worker/notifications.go b/internal/worker/notifications.go index 616309c..b4240ad 100644 --- a/internal/worker/notifications.go +++ b/internal/worker/notifications.go @@ -145,60 +145,52 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { }() id := delivery.Payload() - key := fmt.Sprintf("locks:accounts:%s", id) + logger := nc.logger.With(zap.String("account#reddit_account_id", id)) // Measure queue latency + key := fmt.Sprintf("locks:accounts:%s", id) ttl := nc.redis.PTTL(ctx, key).Val() age := (domain.NotificationCheckTimeout - ttl) _ = nc.statsd.Histogram("apollo.dequeue.latency", float64(age.Milliseconds()), notificationTags, 0.1) defer func() { if err := nc.redis.Del(ctx, key).Err(); err != nil { - nc.logger.Error("failed to remove account lock", zap.Error(err), zap.String("key", key)) + logger.Error("failed to remove account lock", zap.Error(err), zap.String("key", key)) } }() - nc.logger.Debug("starting job", zap.String("account#reddit_account_id", id)) + logger.Debug("starting job") defer func() { if err := delivery.Ack(); err != nil { - nc.logger.Error("failed to acknowledge message", zap.Error(err), zap.String("account#reddit_account_id", id)) + logger.Error("failed to acknowledge message", zap.Error(err)) } }() account, err := nc.accountRepo.GetByRedditID(ctx, id) if err != nil { - nc.logger.Info("account not found, exiting", - zap.Error(err), - zap.String("account#reddit_account_id", id)) + logger.Info("account not found, exiting", zap.Error(err)) return } rac := nc.reddit.NewAuthenticatedClient(account.AccountID, account.RefreshToken, account.AccessToken) + logger = logger.With( + zap.String("account#username", account.NormalizedUsername()), + zap.String("account#access_token", rac.ObfuscatedAccessToken()), + zap.String("account#refresh_token", rac.ObfuscatedRefreshToken()), + ) if account.TokenExpiresAt.Before(now.Add(5 * time.Minute)) { - nc.logger.Debug("refreshing reddit token", - zap.String("account#reddit_account_id", id), - zap.String("account#username", account.NormalizedUsername()), - ) + logger.Debug("refreshing reddit token") tokens, err := rac.RefreshTokens(ctx) if err != nil { if err != reddit.ErrOauthRevoked { - nc.logger.Error("failed to refresh reddit tokens", - zap.Error(err), - zap.String("account#reddit_account_id", id), - zap.String("account#username", account.NormalizedUsername()), - ) + logger.Error("failed to refresh reddit tokens", zap.Error(err)) return } - err = nc.deleteAccount(account) - if err != nil { - nc.logger.Error("failed to remove revoked account", - zap.Error(err), - zap.String("account#reddit_account_id", id), - zap.String("account#username", account.NormalizedUsername()), - ) + if err = nc.deleteAccount(ctx, account); err != nil { + logger.Error("failed to remove revoked account", zap.Error(err)) } return @@ -212,9 +204,13 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { // Refresh client rac = nc.reddit.NewAuthenticatedClient(account.AccountID, tokens.RefreshToken, tokens.AccessToken) + logger = logger.With( + zap.String("account#access_token", rac.ObfuscatedAccessToken()), + zap.String("account#refresh_token", rac.ObfuscatedRefreshToken()), + ) } - nc.logger.Debug("fetching message inbox", zap.String("account#reddit_account_id", id), zap.String("account#username", account.NormalizedUsername())) + logger.Debug("fetching message inbox") opts := []reddit.RequestOption{reddit.WithQuery("limit", "10")} if account.LastMessageID != "" { @@ -227,42 +223,24 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { case reddit.ErrTimeout, reddit.ErrRateLimited: // Don't log timeouts or rate limits break case reddit.ErrOauthRevoked: - if err = nc.deleteAccount(account); err != nil { - nc.logger.Error("failed to remove revoked account", - zap.Error(err), - zap.String("account#reddit_account_id", id), - zap.String("account#username", account.NormalizedUsername()), - ) + if err = nc.deleteAccount(ctx, account); err != nil { + logger.Error("failed to remove revoked account", zap.Error(err)) } else { - nc.logger.Info("removed revoked account", - zap.String("account#reddit_account_id", id), - zap.String("account#username", account.NormalizedUsername()), - ) + logger.Info("removed revoked account") } default: - nc.logger.Error("failed to fetch message inbox", - zap.Error(err), - zap.String("account#reddit_account_id", id), - zap.String("account#username", account.NormalizedUsername()), - ) + logger.Error("failed to fetch message inbox", zap.Error(err)) } return } // Figure out where we stand if msgs.Count == 0 { - nc.logger.Debug("no new messages, bailing early", - zap.String("account#reddit_account_id", id), - zap.String("account#username", account.NormalizedUsername()), - ) + logger.Debug("no new messages, bailing early") return } - nc.logger.Debug("fetched messages", - zap.String("account#reddit_account_id", id), - zap.String("account#username", account.NormalizedUsername()), - zap.Int("count", msgs.Count), - ) + logger.Debug("fetched messages", zap.Int("count", msgs.Count)) for _, msg := range msgs.Children { if !msg.IsDeleted() { @@ -274,10 +252,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { // Let's populate this with the latest message so we don't flood users with stuff if account.CheckCount == 0 { - nc.logger.Debug("populating first message id to prevent spamming", - zap.String("account#reddit_account_id", id), - zap.String("account#username", account.NormalizedUsername()), - ) + logger.Debug("populating first message id to prevent spamming") account.CheckCount = 1 _ = nc.accountRepo.Update(ctx, &account) @@ -286,19 +261,12 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { devices, err := nc.deviceRepo.GetInboxNotifiableByAccountID(ctx, account.ID) if err != nil { - nc.logger.Error("failed to fetch account devices", - zap.Error(err), - zap.String("account#reddit_account_id", id), - zap.String("account#username", account.NormalizedUsername()), - ) + logger.Error("failed to fetch account devices", zap.Error(err)) return } if len(devices) == 0 { - nc.logger.Debug("no notifiable devices, bailing early", - zap.String("account#reddit_account_id", id), - zap.String("account#username", account.NormalizedUsername()), - ) + logger.Debug("no notifiable devices, bailing early") return } @@ -325,10 +293,8 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { res, err := nc.apns.PushWithContext(ctx, notification) if err != nil { _ = nc.statsd.Incr("apns.notification.errors", []string{}, 1) - nc.logger.Error("failed to send notification", + logger.Error("failed to send notification", zap.Error(err), - zap.String("account#reddit_account_id", id), - zap.String("account#username", account.NormalizedUsername()), zap.String("device#token", device.APNSToken), ) @@ -336,9 +302,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { _ = nc.deviceRepo.Delete(ctx, device.APNSToken) } else if !res.Sent() { _ = nc.statsd.Incr("apns.notification.errors", []string{}, 1) - nc.logger.Error("notification not sent", - zap.String("account#reddit_account_id", id), - zap.String("account#username", account.NormalizedUsername()), + logger.Error("notification not sent", zap.String("device#token", device.APNSToken), zap.Int("response#status", res.StatusCode), zap.String("response#reason", res.Reason), @@ -348,11 +312,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { _ = nc.deviceRepo.Delete(ctx, device.APNSToken) } else { _ = nc.statsd.Incr("apns.notification.sent", []string{}, 1) - nc.logger.Info("sent notification", - zap.String("account#reddit_account_id", id), - zap.String("account#username", account.NormalizedUsername()), - zap.String("device#token", device.APNSToken), - ) + logger.Info("sent notification", zap.String("device#token", device.APNSToken)) } } } @@ -360,15 +320,12 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { ev := fmt.Sprintf("Sent notification to /u/%s (x%d)", account.Username, msgs.Count) _ = nc.statsd.SimpleEvent(ev, "") - nc.logger.Debug("finishing job", - zap.String("account#reddit_account_id", id), - zap.String("account#username", account.NormalizedUsername()), - ) + logger.Debug("finishing job") } -func (nc *notificationsConsumer) deleteAccount(account domain.Account) error { +func (nc *notificationsConsumer) deleteAccount(ctx context.Context, account domain.Account) error { // Disassociate account from devices - devs, err := nc.deviceRepo.GetByAccountID(nc, account.ID) + devs, err := nc.deviceRepo.GetByAccountID(ctx, account.ID) if err != nil { return err }