mirror of
https://github.com/christianselig/apollo-backend
synced 2024-11-13 23:47:44 +00:00
chore: use reddit ids when enqueueing accounts (#87)
This commit is contained in:
parent
45793c9a1f
commit
e8a8e5a5b3
2 changed files with 33 additions and 42 deletions
|
@ -120,13 +120,12 @@ func SchedulerCmd(ctx context.Context) *cobra.Command {
|
||||||
func evalScript(ctx context.Context, redis *redis.Client) (string, error) {
|
func evalScript(ctx context.Context, redis *redis.Client) (string, error) {
|
||||||
lua := fmt.Sprintf(`
|
lua := fmt.Sprintf(`
|
||||||
local retv={}
|
local retv={}
|
||||||
local ids=cjson.decode(ARGV[1])
|
|
||||||
|
|
||||||
for i=1, #ids do
|
for i=1, #ARGV do
|
||||||
local key = KEYS[1] .. ":" .. ids[i]
|
local key = KEYS[1] .. ":" .. ARGV[i]
|
||||||
if redis.call("exists", key) == 0 then
|
if redis.call("exists", key) == 0 then
|
||||||
redis.call("setex", key, %.0f, 1)
|
redis.call("setex", key, %.0f, 1)
|
||||||
retv[#retv + 1] = ids[i]
|
retv[#retv + 1] = ARGV[i]
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -392,7 +391,7 @@ func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Cli
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
next := now.Add(domain.NotificationCheckInterval)
|
next := now.Add(domain.NotificationCheckInterval)
|
||||||
|
|
||||||
ids := make([]int64, maxNotificationChecks)
|
ids := make([]string, maxNotificationChecks)
|
||||||
idslen := 0
|
idslen := 0
|
||||||
enqueued := 0
|
enqueued := 0
|
||||||
skipped := 0
|
skipped := 0
|
||||||
|
@ -416,16 +415,14 @@ func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Cli
|
||||||
FOR UPDATE SKIP LOCKED
|
FOR UPDATE SKIP LOCKED
|
||||||
LIMIT %d
|
LIMIT %d
|
||||||
)
|
)
|
||||||
RETURNING accounts.id`, maxNotificationChecks)
|
RETURNING accounts.reddit_account_id`, maxNotificationChecks)
|
||||||
rows, err := tx.Query(ctx, stmt, now, next)
|
rows, err := tx.Query(ctx, stmt, now, next)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer rows.Close()
|
defer rows.Close()
|
||||||
for i := 0; rows.Next(); i++ {
|
for i := 0; rows.Next(); i++ {
|
||||||
var id int64
|
_ = rows.Scan(&ids[i])
|
||||||
_ = rows.Scan(&id)
|
|
||||||
ids[i] = id
|
|
||||||
idslen = i
|
idslen = i
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -453,7 +450,7 @@ func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Cli
|
||||||
if j > idslen {
|
if j > idslen {
|
||||||
j = idslen
|
j = idslen
|
||||||
}
|
}
|
||||||
batch := Int64Slice(ids[offset:j])
|
batch := ids[offset:j]
|
||||||
|
|
||||||
logger.Debug("enqueueing batch", zap.Int("len", len(batch)))
|
logger.Debug("enqueueing batch", zap.Int("len", len(batch)))
|
||||||
|
|
||||||
|
@ -472,7 +469,7 @@ func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Cli
|
||||||
|
|
||||||
batchIds := make([]string, len(vals))
|
batchIds := make([]string, len(vals))
|
||||||
for k, v := range vals {
|
for k, v := range vals {
|
||||||
batchIds[k] = strconv.FormatInt(v.(int64), 10)
|
batchIds[k] = v.(string)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = queue.Publish(batchIds...); err != nil {
|
if err = queue.Publish(batchIds...); err != nil {
|
||||||
|
|
|
@ -4,7 +4,6 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"strconv"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/DataDog/datadog-go/statsd"
|
"github.com/DataDog/datadog-go/statsd"
|
||||||
|
@ -136,29 +135,24 @@ func NewNotificationsConsumer(nw *notificationsWorker, tag int) *notificationsCo
|
||||||
}
|
}
|
||||||
|
|
||||||
func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
|
func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
|
||||||
defer func() {
|
id := delivery.Payload()
|
||||||
key := fmt.Sprintf("locks:accounts:%s", delivery.Payload())
|
|
||||||
|
defer func(id string) {
|
||||||
|
key := fmt.Sprintf("locks:accounts:%s", id)
|
||||||
if err := nc.redis.Del(nc, key).Err(); err != nil {
|
if err := nc.redis.Del(nc, key).Err(); err != nil {
|
||||||
nc.logger.Error("failed to remove account lock", zap.Error(err), zap.String("key", key))
|
nc.logger.Error("failed to remove account lock", zap.Error(err), zap.String("key", key))
|
||||||
}
|
}
|
||||||
}()
|
}(id)
|
||||||
|
|
||||||
id, err := strconv.ParseInt(delivery.Payload(), 10, 64)
|
nc.logger.Debug("starting job", zap.String("account#reddit_account_id", id))
|
||||||
if err != nil {
|
|
||||||
nc.logger.Error("failed to parse account id from payload", zap.Error(err), zap.String("payload", delivery.Payload()))
|
|
||||||
_ = delivery.Reject()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
nc.logger.Debug("starting job", zap.Int64("account#id", id))
|
|
||||||
|
|
||||||
defer func() { _ = delivery.Ack() }()
|
defer func() { _ = delivery.Ack() }()
|
||||||
|
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
|
||||||
account, err := nc.accountRepo.GetByID(nc, id)
|
account, err := nc.accountRepo.GetByRedditID(nc, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
nc.logger.Error("failed to fetch account from database", zap.Error(err), zap.Int64("account#id", id))
|
nc.logger.Error("failed to fetch account from database", zap.Error(err), zap.String("account#reddit_account_id", id))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -171,7 +165,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
|
||||||
if err = nc.accountRepo.Update(nc, acc); err != nil {
|
if err = nc.accountRepo.Update(nc, acc); err != nil {
|
||||||
nc.logger.Error("failed to update account",
|
nc.logger.Error("failed to update account",
|
||||||
zap.Error(err),
|
zap.Error(err),
|
||||||
zap.Int64("account#id", id),
|
zap.String("account#reddit_account_id", id),
|
||||||
zap.String("account#username", account.NormalizedUsername()),
|
zap.String("account#username", account.NormalizedUsername()),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
@ -180,7 +174,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
|
||||||
rac := nc.reddit.NewAuthenticatedClient(account.AccountID, account.RefreshToken, account.AccessToken)
|
rac := nc.reddit.NewAuthenticatedClient(account.AccountID, account.RefreshToken, account.AccessToken)
|
||||||
if account.TokenExpiresAt.Before(now.Add(5 * time.Minute)) {
|
if account.TokenExpiresAt.Before(now.Add(5 * time.Minute)) {
|
||||||
nc.logger.Debug("refreshing reddit token",
|
nc.logger.Debug("refreshing reddit token",
|
||||||
zap.Int64("account#id", id),
|
zap.String("account#reddit_account_id", id),
|
||||||
zap.String("account#username", account.NormalizedUsername()),
|
zap.String("account#username", account.NormalizedUsername()),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -189,7 +183,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
|
||||||
if err != reddit.ErrOauthRevoked {
|
if err != reddit.ErrOauthRevoked {
|
||||||
nc.logger.Error("failed to refresh reddit tokens",
|
nc.logger.Error("failed to refresh reddit tokens",
|
||||||
zap.Error(err),
|
zap.Error(err),
|
||||||
zap.Int64("account#id", id),
|
zap.String("account#reddit_account_id", id),
|
||||||
zap.String("account#username", account.NormalizedUsername()),
|
zap.String("account#username", account.NormalizedUsername()),
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
@ -199,7 +193,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
nc.logger.Error("failed to remove revoked account",
|
nc.logger.Error("failed to remove revoked account",
|
||||||
zap.Error(err),
|
zap.Error(err),
|
||||||
zap.Int64("account#id", id),
|
zap.String("account#reddit_account_id", id),
|
||||||
zap.String("account#username", account.NormalizedUsername()),
|
zap.String("account#username", account.NormalizedUsername()),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
@ -223,7 +217,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
|
||||||
_ = nc.statsd.Histogram("apollo.queue.delay", float64(latency.Milliseconds()), []string{}, rate)
|
_ = nc.statsd.Histogram("apollo.queue.delay", float64(latency.Milliseconds()), []string{}, rate)
|
||||||
}
|
}
|
||||||
|
|
||||||
nc.logger.Debug("fetching message inbox", zap.Int64("account#id", id), zap.String("account#username", account.NormalizedUsername()))
|
nc.logger.Debug("fetching message inbox", zap.String("account#reddit_account_id", id), zap.String("account#username", account.NormalizedUsername()))
|
||||||
|
|
||||||
opts := []reddit.RequestOption{reddit.WithQuery("limit", "10")}
|
opts := []reddit.RequestOption{reddit.WithQuery("limit", "10")}
|
||||||
if account.LastMessageID != "" {
|
if account.LastMessageID != "" {
|
||||||
|
@ -239,19 +233,19 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
|
||||||
if err = nc.deleteAccount(account); err != nil {
|
if err = nc.deleteAccount(account); err != nil {
|
||||||
nc.logger.Error("failed to remove revoked account",
|
nc.logger.Error("failed to remove revoked account",
|
||||||
zap.Error(err),
|
zap.Error(err),
|
||||||
zap.Int64("account#id", id),
|
zap.String("account#reddit_account_id", id),
|
||||||
zap.String("account#username", account.NormalizedUsername()),
|
zap.String("account#username", account.NormalizedUsername()),
|
||||||
)
|
)
|
||||||
} else {
|
} else {
|
||||||
nc.logger.Info("removed revoked account",
|
nc.logger.Info("removed revoked account",
|
||||||
zap.Int64("account#id", id),
|
zap.String("account#reddit_account_id", id),
|
||||||
zap.String("account#username", account.NormalizedUsername()),
|
zap.String("account#username", account.NormalizedUsername()),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
nc.logger.Error("failed to fetch message inbox",
|
nc.logger.Error("failed to fetch message inbox",
|
||||||
zap.Error(err),
|
zap.Error(err),
|
||||||
zap.Int64("account#id", id),
|
zap.String("account#reddit_account_id", id),
|
||||||
zap.String("account#username", account.NormalizedUsername()),
|
zap.String("account#username", account.NormalizedUsername()),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
@ -261,14 +255,14 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
|
||||||
// Figure out where we stand
|
// Figure out where we stand
|
||||||
if msgs.Count == 0 {
|
if msgs.Count == 0 {
|
||||||
nc.logger.Debug("no new messages, bailing early",
|
nc.logger.Debug("no new messages, bailing early",
|
||||||
zap.Int64("account#id", id),
|
zap.String("account#reddit_account_id", id),
|
||||||
zap.String("account#username", account.NormalizedUsername()),
|
zap.String("account#username", account.NormalizedUsername()),
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
nc.logger.Debug("fetched messages",
|
nc.logger.Debug("fetched messages",
|
||||||
zap.Int64("account#id", id),
|
zap.String("account#reddit_account_id", id),
|
||||||
zap.String("account#username", account.NormalizedUsername()),
|
zap.String("account#username", account.NormalizedUsername()),
|
||||||
zap.Int("count", msgs.Count),
|
zap.Int("count", msgs.Count),
|
||||||
)
|
)
|
||||||
|
@ -283,7 +277,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
|
||||||
// Let's populate this with the latest message so we don't flood users with stuff
|
// Let's populate this with the latest message so we don't flood users with stuff
|
||||||
if newAccount {
|
if newAccount {
|
||||||
nc.logger.Debug("populating first message id to prevent spamming",
|
nc.logger.Debug("populating first message id to prevent spamming",
|
||||||
zap.Int64("account#id", id),
|
zap.String("account#reddit_account_id", id),
|
||||||
zap.String("account#username", account.NormalizedUsername()),
|
zap.String("account#username", account.NormalizedUsername()),
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
@ -293,7 +287,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
nc.logger.Error("failed to fetch account devices",
|
nc.logger.Error("failed to fetch account devices",
|
||||||
zap.Error(err),
|
zap.Error(err),
|
||||||
zap.Int64("account#id", id),
|
zap.String("account#reddit_account_id", id),
|
||||||
zap.String("account#username", account.NormalizedUsername()),
|
zap.String("account#username", account.NormalizedUsername()),
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
@ -301,7 +295,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
|
||||||
|
|
||||||
if len(devices) == 0 {
|
if len(devices) == 0 {
|
||||||
nc.logger.Debug("no notifiable devices, bailing early",
|
nc.logger.Debug("no notifiable devices, bailing early",
|
||||||
zap.Int64("account#id", id),
|
zap.String("account#reddit_account_id", id),
|
||||||
zap.String("account#username", account.NormalizedUsername()),
|
zap.String("account#username", account.NormalizedUsername()),
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
@ -331,7 +325,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
|
||||||
_ = nc.statsd.Incr("apns.notification.errors", []string{}, 1)
|
_ = nc.statsd.Incr("apns.notification.errors", []string{}, 1)
|
||||||
nc.logger.Error("failed to send notification",
|
nc.logger.Error("failed to send notification",
|
||||||
zap.Error(err),
|
zap.Error(err),
|
||||||
zap.Int64("account#id", id),
|
zap.String("account#reddit_account_id", id),
|
||||||
zap.String("account#username", account.NormalizedUsername()),
|
zap.String("account#username", account.NormalizedUsername()),
|
||||||
zap.String("device#token", device.APNSToken),
|
zap.String("device#token", device.APNSToken),
|
||||||
)
|
)
|
||||||
|
@ -341,7 +335,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
|
||||||
} else if !res.Sent() {
|
} else if !res.Sent() {
|
||||||
_ = nc.statsd.Incr("apns.notification.errors", []string{}, 1)
|
_ = nc.statsd.Incr("apns.notification.errors", []string{}, 1)
|
||||||
nc.logger.Error("notification not sent",
|
nc.logger.Error("notification not sent",
|
||||||
zap.Int64("account#id", id),
|
zap.String("account#reddit_account_id", id),
|
||||||
zap.String("account#username", account.NormalizedUsername()),
|
zap.String("account#username", account.NormalizedUsername()),
|
||||||
zap.String("device#token", device.APNSToken),
|
zap.String("device#token", device.APNSToken),
|
||||||
zap.Int("response#status", res.StatusCode),
|
zap.Int("response#status", res.StatusCode),
|
||||||
|
@ -353,7 +347,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
|
||||||
} else {
|
} else {
|
||||||
_ = nc.statsd.Incr("apns.notification.sent", []string{}, 1)
|
_ = nc.statsd.Incr("apns.notification.sent", []string{}, 1)
|
||||||
nc.logger.Info("sent notification",
|
nc.logger.Info("sent notification",
|
||||||
zap.Int64("account#id", id),
|
zap.String("account#reddit_account_id", id),
|
||||||
zap.String("account#username", account.NormalizedUsername()),
|
zap.String("account#username", account.NormalizedUsername()),
|
||||||
zap.String("device#token", device.APNSToken),
|
zap.String("device#token", device.APNSToken),
|
||||||
)
|
)
|
||||||
|
@ -365,7 +359,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
|
||||||
_ = nc.statsd.SimpleEvent(ev, "")
|
_ = nc.statsd.SimpleEvent(ev, "")
|
||||||
|
|
||||||
nc.logger.Debug("finishing job",
|
nc.logger.Debug("finishing job",
|
||||||
zap.Int64("account#id", id),
|
zap.String("account#reddit_account_id", id),
|
||||||
zap.String("account#username", account.NormalizedUsername()),
|
zap.String("account#username", account.NormalizedUsername()),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue