mirror of
https://github.com/christianselig/apollo-backend
synced 2024-11-25 13:17:42 +00:00
reduce logger allocations
This commit is contained in:
parent
484876adab
commit
873b9eb4db
2 changed files with 36 additions and 79 deletions
|
@ -315,7 +315,7 @@ func (lac *liveActivitiesConsumer) Consume(delivery rmq.Delivery) {
|
||||||
_ = lac.liveActivityRepo.Delete(ctx, at)
|
_ = lac.liveActivityRepo.Delete(ctx, at)
|
||||||
} else {
|
} else {
|
||||||
_ = lac.statsd.Incr("apns.notification.sent", []string{}, 1)
|
_ = lac.statsd.Incr("apns.notification.sent", []string{}, 1)
|
||||||
lac.logger.Info("sent notification",
|
lac.logger.Debug("sent notification",
|
||||||
zap.String("live_activity#apns_token", at),
|
zap.String("live_activity#apns_token", at),
|
||||||
zap.Bool("live_activity#sandbox", la.Sandbox),
|
zap.Bool("live_activity#sandbox", la.Sandbox),
|
||||||
zap.String("notification#type", ev),
|
zap.String("notification#type", ev),
|
||||||
|
|
|
@ -145,60 +145,52 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
|
||||||
}()
|
}()
|
||||||
|
|
||||||
id := delivery.Payload()
|
id := delivery.Payload()
|
||||||
key := fmt.Sprintf("locks:accounts:%s", id)
|
logger := nc.logger.With(zap.String("account#reddit_account_id", id))
|
||||||
|
|
||||||
// Measure queue latency
|
// Measure queue latency
|
||||||
|
key := fmt.Sprintf("locks:accounts:%s", id)
|
||||||
ttl := nc.redis.PTTL(ctx, key).Val()
|
ttl := nc.redis.PTTL(ctx, key).Val()
|
||||||
age := (domain.NotificationCheckTimeout - ttl)
|
age := (domain.NotificationCheckTimeout - ttl)
|
||||||
_ = nc.statsd.Histogram("apollo.dequeue.latency", float64(age.Milliseconds()), notificationTags, 0.1)
|
_ = nc.statsd.Histogram("apollo.dequeue.latency", float64(age.Milliseconds()), notificationTags, 0.1)
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
if err := nc.redis.Del(ctx, key).Err(); err != nil {
|
if err := nc.redis.Del(ctx, key).Err(); err != nil {
|
||||||
nc.logger.Error("failed to remove account lock", zap.Error(err), zap.String("key", key))
|
logger.Error("failed to remove account lock", zap.Error(err), zap.String("key", key))
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
nc.logger.Debug("starting job", zap.String("account#reddit_account_id", id))
|
logger.Debug("starting job")
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
if err := delivery.Ack(); err != nil {
|
if err := delivery.Ack(); err != nil {
|
||||||
nc.logger.Error("failed to acknowledge message", zap.Error(err), zap.String("account#reddit_account_id", id))
|
logger.Error("failed to acknowledge message", zap.Error(err))
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
account, err := nc.accountRepo.GetByRedditID(ctx, id)
|
account, err := nc.accountRepo.GetByRedditID(ctx, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
nc.logger.Info("account not found, exiting",
|
logger.Info("account not found, exiting", zap.Error(err))
|
||||||
zap.Error(err),
|
|
||||||
zap.String("account#reddit_account_id", id))
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
rac := nc.reddit.NewAuthenticatedClient(account.AccountID, account.RefreshToken, account.AccessToken)
|
rac := nc.reddit.NewAuthenticatedClient(account.AccountID, account.RefreshToken, account.AccessToken)
|
||||||
if account.TokenExpiresAt.Before(now.Add(5 * time.Minute)) {
|
logger = logger.With(
|
||||||
nc.logger.Debug("refreshing reddit token",
|
|
||||||
zap.String("account#reddit_account_id", id),
|
|
||||||
zap.String("account#username", account.NormalizedUsername()),
|
zap.String("account#username", account.NormalizedUsername()),
|
||||||
|
zap.String("account#access_token", rac.ObfuscatedAccessToken()),
|
||||||
|
zap.String("account#refresh_token", rac.ObfuscatedRefreshToken()),
|
||||||
)
|
)
|
||||||
|
if account.TokenExpiresAt.Before(now.Add(5 * time.Minute)) {
|
||||||
|
logger.Debug("refreshing reddit token")
|
||||||
|
|
||||||
tokens, err := rac.RefreshTokens(ctx)
|
tokens, err := rac.RefreshTokens(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err != reddit.ErrOauthRevoked {
|
if err != reddit.ErrOauthRevoked {
|
||||||
nc.logger.Error("failed to refresh reddit tokens",
|
logger.Error("failed to refresh reddit tokens", zap.Error(err))
|
||||||
zap.Error(err),
|
|
||||||
zap.String("account#reddit_account_id", id),
|
|
||||||
zap.String("account#username", account.NormalizedUsername()),
|
|
||||||
)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
err = nc.deleteAccount(account)
|
if err = nc.deleteAccount(ctx, account); err != nil {
|
||||||
if err != nil {
|
logger.Error("failed to remove revoked account", zap.Error(err))
|
||||||
nc.logger.Error("failed to remove revoked account",
|
|
||||||
zap.Error(err),
|
|
||||||
zap.String("account#reddit_account_id", id),
|
|
||||||
zap.String("account#username", account.NormalizedUsername()),
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
|
@ -212,9 +204,13 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
|
||||||
|
|
||||||
// Refresh client
|
// Refresh client
|
||||||
rac = nc.reddit.NewAuthenticatedClient(account.AccountID, tokens.RefreshToken, tokens.AccessToken)
|
rac = nc.reddit.NewAuthenticatedClient(account.AccountID, tokens.RefreshToken, tokens.AccessToken)
|
||||||
|
logger = logger.With(
|
||||||
|
zap.String("account#access_token", rac.ObfuscatedAccessToken()),
|
||||||
|
zap.String("account#refresh_token", rac.ObfuscatedRefreshToken()),
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
nc.logger.Debug("fetching message inbox", zap.String("account#reddit_account_id", id), zap.String("account#username", account.NormalizedUsername()))
|
logger.Debug("fetching message inbox")
|
||||||
|
|
||||||
opts := []reddit.RequestOption{reddit.WithQuery("limit", "10")}
|
opts := []reddit.RequestOption{reddit.WithQuery("limit", "10")}
|
||||||
if account.LastMessageID != "" {
|
if account.LastMessageID != "" {
|
||||||
|
@ -227,42 +223,24 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
|
||||||
case reddit.ErrTimeout, reddit.ErrRateLimited: // Don't log timeouts or rate limits
|
case reddit.ErrTimeout, reddit.ErrRateLimited: // Don't log timeouts or rate limits
|
||||||
break
|
break
|
||||||
case reddit.ErrOauthRevoked:
|
case reddit.ErrOauthRevoked:
|
||||||
if err = nc.deleteAccount(account); err != nil {
|
if err = nc.deleteAccount(ctx, account); err != nil {
|
||||||
nc.logger.Error("failed to remove revoked account",
|
logger.Error("failed to remove revoked account", zap.Error(err))
|
||||||
zap.Error(err),
|
|
||||||
zap.String("account#reddit_account_id", id),
|
|
||||||
zap.String("account#username", account.NormalizedUsername()),
|
|
||||||
)
|
|
||||||
} else {
|
} else {
|
||||||
nc.logger.Info("removed revoked account",
|
logger.Info("removed revoked account")
|
||||||
zap.String("account#reddit_account_id", id),
|
|
||||||
zap.String("account#username", account.NormalizedUsername()),
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
nc.logger.Error("failed to fetch message inbox",
|
logger.Error("failed to fetch message inbox", zap.Error(err))
|
||||||
zap.Error(err),
|
|
||||||
zap.String("account#reddit_account_id", id),
|
|
||||||
zap.String("account#username", account.NormalizedUsername()),
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Figure out where we stand
|
// Figure out where we stand
|
||||||
if msgs.Count == 0 {
|
if msgs.Count == 0 {
|
||||||
nc.logger.Debug("no new messages, bailing early",
|
logger.Debug("no new messages, bailing early")
|
||||||
zap.String("account#reddit_account_id", id),
|
|
||||||
zap.String("account#username", account.NormalizedUsername()),
|
|
||||||
)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
nc.logger.Debug("fetched messages",
|
logger.Debug("fetched messages", zap.Int("count", msgs.Count))
|
||||||
zap.String("account#reddit_account_id", id),
|
|
||||||
zap.String("account#username", account.NormalizedUsername()),
|
|
||||||
zap.Int("count", msgs.Count),
|
|
||||||
)
|
|
||||||
|
|
||||||
for _, msg := range msgs.Children {
|
for _, msg := range msgs.Children {
|
||||||
if !msg.IsDeleted() {
|
if !msg.IsDeleted() {
|
||||||
|
@ -274,10 +252,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
|
||||||
|
|
||||||
// Let's populate this with the latest message so we don't flood users with stuff
|
// Let's populate this with the latest message so we don't flood users with stuff
|
||||||
if account.CheckCount == 0 {
|
if account.CheckCount == 0 {
|
||||||
nc.logger.Debug("populating first message id to prevent spamming",
|
logger.Debug("populating first message id to prevent spamming")
|
||||||
zap.String("account#reddit_account_id", id),
|
|
||||||
zap.String("account#username", account.NormalizedUsername()),
|
|
||||||
)
|
|
||||||
|
|
||||||
account.CheckCount = 1
|
account.CheckCount = 1
|
||||||
_ = nc.accountRepo.Update(ctx, &account)
|
_ = nc.accountRepo.Update(ctx, &account)
|
||||||
|
@ -286,19 +261,12 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
|
||||||
|
|
||||||
devices, err := nc.deviceRepo.GetInboxNotifiableByAccountID(ctx, account.ID)
|
devices, err := nc.deviceRepo.GetInboxNotifiableByAccountID(ctx, account.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
nc.logger.Error("failed to fetch account devices",
|
logger.Error("failed to fetch account devices", zap.Error(err))
|
||||||
zap.Error(err),
|
|
||||||
zap.String("account#reddit_account_id", id),
|
|
||||||
zap.String("account#username", account.NormalizedUsername()),
|
|
||||||
)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(devices) == 0 {
|
if len(devices) == 0 {
|
||||||
nc.logger.Debug("no notifiable devices, bailing early",
|
logger.Debug("no notifiable devices, bailing early")
|
||||||
zap.String("account#reddit_account_id", id),
|
|
||||||
zap.String("account#username", account.NormalizedUsername()),
|
|
||||||
)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -325,10 +293,8 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
|
||||||
res, err := nc.apns.PushWithContext(ctx, notification)
|
res, err := nc.apns.PushWithContext(ctx, notification)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
_ = nc.statsd.Incr("apns.notification.errors", []string{}, 1)
|
_ = nc.statsd.Incr("apns.notification.errors", []string{}, 1)
|
||||||
nc.logger.Error("failed to send notification",
|
logger.Error("failed to send notification",
|
||||||
zap.Error(err),
|
zap.Error(err),
|
||||||
zap.String("account#reddit_account_id", id),
|
|
||||||
zap.String("account#username", account.NormalizedUsername()),
|
|
||||||
zap.String("device#token", device.APNSToken),
|
zap.String("device#token", device.APNSToken),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -336,9 +302,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
|
||||||
_ = nc.deviceRepo.Delete(ctx, device.APNSToken)
|
_ = nc.deviceRepo.Delete(ctx, device.APNSToken)
|
||||||
} else if !res.Sent() {
|
} else if !res.Sent() {
|
||||||
_ = nc.statsd.Incr("apns.notification.errors", []string{}, 1)
|
_ = nc.statsd.Incr("apns.notification.errors", []string{}, 1)
|
||||||
nc.logger.Error("notification not sent",
|
logger.Error("notification not sent",
|
||||||
zap.String("account#reddit_account_id", id),
|
|
||||||
zap.String("account#username", account.NormalizedUsername()),
|
|
||||||
zap.String("device#token", device.APNSToken),
|
zap.String("device#token", device.APNSToken),
|
||||||
zap.Int("response#status", res.StatusCode),
|
zap.Int("response#status", res.StatusCode),
|
||||||
zap.String("response#reason", res.Reason),
|
zap.String("response#reason", res.Reason),
|
||||||
|
@ -348,11 +312,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
|
||||||
_ = nc.deviceRepo.Delete(ctx, device.APNSToken)
|
_ = nc.deviceRepo.Delete(ctx, device.APNSToken)
|
||||||
} else {
|
} else {
|
||||||
_ = nc.statsd.Incr("apns.notification.sent", []string{}, 1)
|
_ = nc.statsd.Incr("apns.notification.sent", []string{}, 1)
|
||||||
nc.logger.Info("sent notification",
|
logger.Info("sent notification", zap.String("device#token", device.APNSToken))
|
||||||
zap.String("account#reddit_account_id", id),
|
|
||||||
zap.String("account#username", account.NormalizedUsername()),
|
|
||||||
zap.String("device#token", device.APNSToken),
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -360,15 +320,12 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
|
||||||
ev := fmt.Sprintf("Sent notification to /u/%s (x%d)", account.Username, msgs.Count)
|
ev := fmt.Sprintf("Sent notification to /u/%s (x%d)", account.Username, msgs.Count)
|
||||||
_ = nc.statsd.SimpleEvent(ev, "")
|
_ = nc.statsd.SimpleEvent(ev, "")
|
||||||
|
|
||||||
nc.logger.Debug("finishing job",
|
logger.Debug("finishing job")
|
||||||
zap.String("account#reddit_account_id", id),
|
|
||||||
zap.String("account#username", account.NormalizedUsername()),
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (nc *notificationsConsumer) deleteAccount(account domain.Account) error {
|
func (nc *notificationsConsumer) deleteAccount(ctx context.Context, account domain.Account) error {
|
||||||
// Disassociate account from devices
|
// Disassociate account from devices
|
||||||
devs, err := nc.deviceRepo.GetByAccountID(nc, account.ID)
|
devs, err := nc.deviceRepo.GetByAccountID(ctx, account.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue