From e8a8e5a5b3445680fe653193c2f73dcf30cb0202 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Medeiros?= Date: Wed, 13 Jul 2022 11:50:13 -0400 Subject: [PATCH] chore: use reddit ids when enqueueing accounts (#87) --- internal/cmd/scheduler.go | 19 +++++------ internal/worker/notifications.go | 56 ++++++++++++++------------------ 2 files changed, 33 insertions(+), 42 deletions(-) diff --git a/internal/cmd/scheduler.go b/internal/cmd/scheduler.go index a921e23..295292d 100644 --- a/internal/cmd/scheduler.go +++ b/internal/cmd/scheduler.go @@ -120,13 +120,12 @@ func SchedulerCmd(ctx context.Context) *cobra.Command { func evalScript(ctx context.Context, redis *redis.Client) (string, error) { lua := fmt.Sprintf(` local retv={} - local ids=cjson.decode(ARGV[1]) - for i=1, #ids do - local key = KEYS[1] .. ":" .. ids[i] + for i=1, #ARGV do + local key = KEYS[1] .. ":" .. ARGV[i] if redis.call("exists", key) == 0 then redis.call("setex", key, %.0f, 1) - retv[#retv + 1] = ids[i] + retv[#retv + 1] = ARGV[i] end end @@ -392,7 +391,7 @@ func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Cli now := time.Now() next := now.Add(domain.NotificationCheckInterval) - ids := make([]int64, maxNotificationChecks) + ids := make([]string, maxNotificationChecks) idslen := 0 enqueued := 0 skipped := 0 @@ -416,16 +415,14 @@ func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Cli FOR UPDATE SKIP LOCKED LIMIT %d ) - RETURNING accounts.id`, maxNotificationChecks) + RETURNING accounts.reddit_account_id`, maxNotificationChecks) rows, err := tx.Query(ctx, stmt, now, next) if err != nil { return err } defer rows.Close() for i := 0; rows.Next(); i++ { - var id int64 - _ = rows.Scan(&id) - ids[i] = id + _ = rows.Scan(&ids[i]) idslen = i } return nil @@ -453,7 +450,7 @@ func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Cli if j > idslen { j = idslen } - batch := Int64Slice(ids[offset:j]) + batch := ids[offset:j] logger.Debug("enqueueing batch", zap.Int("len", len(batch))) @@ -472,7 +469,7 @@ func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Cli batchIds := make([]string, len(vals)) for k, v := range vals { - batchIds[k] = strconv.FormatInt(v.(int64), 10) + batchIds[k] = v.(string) } if err = queue.Publish(batchIds...); err != nil { diff --git a/internal/worker/notifications.go b/internal/worker/notifications.go index 30156a4..0392554 100644 --- a/internal/worker/notifications.go +++ b/internal/worker/notifications.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "os" - "strconv" "time" "github.com/DataDog/datadog-go/statsd" @@ -136,29 +135,24 @@ func NewNotificationsConsumer(nw *notificationsWorker, tag int) *notificationsCo } func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { - defer func() { - key := fmt.Sprintf("locks:accounts:%s", delivery.Payload()) + id := delivery.Payload() + + defer func(id string) { + key := fmt.Sprintf("locks:accounts:%s", id) if err := nc.redis.Del(nc, key).Err(); err != nil { nc.logger.Error("failed to remove account lock", zap.Error(err), zap.String("key", key)) } - }() + }(id) - id, err := strconv.ParseInt(delivery.Payload(), 10, 64) - if err != nil { - nc.logger.Error("failed to parse account id from payload", zap.Error(err), zap.String("payload", delivery.Payload())) - _ = delivery.Reject() - return - } - - nc.logger.Debug("starting job", zap.Int64("account#id", id)) + nc.logger.Debug("starting job", zap.String("account#reddit_account_id", id)) defer func() { _ = delivery.Ack() }() now := time.Now() - account, err := nc.accountRepo.GetByID(nc, id) + account, err := nc.accountRepo.GetByRedditID(nc, id) if err != nil { - nc.logger.Error("failed to fetch account from database", zap.Error(err), zap.Int64("account#id", id)) + nc.logger.Error("failed to fetch account from database", zap.Error(err), zap.String("account#reddit_account_id", id)) return } @@ -171,7 +165,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { if err = nc.accountRepo.Update(nc, acc); err != nil { nc.logger.Error("failed to update account", zap.Error(err), - zap.Int64("account#id", id), + zap.String("account#reddit_account_id", id), zap.String("account#username", account.NormalizedUsername()), ) } @@ -180,7 +174,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { rac := nc.reddit.NewAuthenticatedClient(account.AccountID, account.RefreshToken, account.AccessToken) if account.TokenExpiresAt.Before(now.Add(5 * time.Minute)) { nc.logger.Debug("refreshing reddit token", - zap.Int64("account#id", id), + zap.String("account#reddit_account_id", id), zap.String("account#username", account.NormalizedUsername()), ) @@ -189,7 +183,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { if err != reddit.ErrOauthRevoked { nc.logger.Error("failed to refresh reddit tokens", zap.Error(err), - zap.Int64("account#id", id), + zap.String("account#reddit_account_id", id), zap.String("account#username", account.NormalizedUsername()), ) return @@ -199,7 +193,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { if err != nil { nc.logger.Error("failed to remove revoked account", zap.Error(err), - zap.Int64("account#id", id), + zap.String("account#reddit_account_id", id), zap.String("account#username", account.NormalizedUsername()), ) } @@ -223,7 +217,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { _ = nc.statsd.Histogram("apollo.queue.delay", float64(latency.Milliseconds()), []string{}, rate) } - nc.logger.Debug("fetching message inbox", zap.Int64("account#id", id), zap.String("account#username", account.NormalizedUsername())) + nc.logger.Debug("fetching message inbox", zap.String("account#reddit_account_id", id), zap.String("account#username", account.NormalizedUsername())) opts := []reddit.RequestOption{reddit.WithQuery("limit", "10")} if account.LastMessageID != "" { @@ -239,19 +233,19 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { if err = nc.deleteAccount(account); err != nil { nc.logger.Error("failed to remove revoked account", zap.Error(err), - zap.Int64("account#id", id), + zap.String("account#reddit_account_id", id), zap.String("account#username", account.NormalizedUsername()), ) } else { nc.logger.Info("removed revoked account", - zap.Int64("account#id", id), + zap.String("account#reddit_account_id", id), zap.String("account#username", account.NormalizedUsername()), ) } default: nc.logger.Error("failed to fetch message inbox", zap.Error(err), - zap.Int64("account#id", id), + zap.String("account#reddit_account_id", id), zap.String("account#username", account.NormalizedUsername()), ) } @@ -261,14 +255,14 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { // Figure out where we stand if msgs.Count == 0 { nc.logger.Debug("no new messages, bailing early", - zap.Int64("account#id", id), + zap.String("account#reddit_account_id", id), zap.String("account#username", account.NormalizedUsername()), ) return } nc.logger.Debug("fetched messages", - zap.Int64("account#id", id), + zap.String("account#reddit_account_id", id), zap.String("account#username", account.NormalizedUsername()), zap.Int("count", msgs.Count), ) @@ -283,7 +277,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { // Let's populate this with the latest message so we don't flood users with stuff if newAccount { nc.logger.Debug("populating first message id to prevent spamming", - zap.Int64("account#id", id), + zap.String("account#reddit_account_id", id), zap.String("account#username", account.NormalizedUsername()), ) return @@ -293,7 +287,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { if err != nil { nc.logger.Error("failed to fetch account devices", zap.Error(err), - zap.Int64("account#id", id), + zap.String("account#reddit_account_id", id), zap.String("account#username", account.NormalizedUsername()), ) return @@ -301,7 +295,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { if len(devices) == 0 { nc.logger.Debug("no notifiable devices, bailing early", - zap.Int64("account#id", id), + zap.String("account#reddit_account_id", id), zap.String("account#username", account.NormalizedUsername()), ) return @@ -331,7 +325,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { _ = nc.statsd.Incr("apns.notification.errors", []string{}, 1) nc.logger.Error("failed to send notification", zap.Error(err), - zap.Int64("account#id", id), + zap.String("account#reddit_account_id", id), zap.String("account#username", account.NormalizedUsername()), zap.String("device#token", device.APNSToken), ) @@ -341,7 +335,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { } else if !res.Sent() { _ = nc.statsd.Incr("apns.notification.errors", []string{}, 1) nc.logger.Error("notification not sent", - zap.Int64("account#id", id), + zap.String("account#reddit_account_id", id), zap.String("account#username", account.NormalizedUsername()), zap.String("device#token", device.APNSToken), zap.Int("response#status", res.StatusCode), @@ -353,7 +347,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { } else { _ = nc.statsd.Incr("apns.notification.sent", []string{}, 1) nc.logger.Info("sent notification", - zap.Int64("account#id", id), + zap.String("account#reddit_account_id", id), zap.String("account#username", account.NormalizedUsername()), zap.String("device#token", device.APNSToken), ) @@ -365,7 +359,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { _ = nc.statsd.SimpleEvent(ev, "") nc.logger.Debug("finishing job", - zap.Int64("account#id", id), + zap.String("account#reddit_account_id", id), zap.String("account#username", account.NormalizedUsername()), ) }