apollo-backend/internal/worker/notifications.go

449 lines
12 KiB
Go
Raw Normal View History

package worker
2021-07-08 23:03:46 +00:00
import (
"context"
"fmt"
"os"
"time"
"github.com/DataDog/datadog-go/statsd"
2022-11-01 23:02:25 +00:00
"github.com/adjust/rmq/v5"
2021-07-08 23:03:46 +00:00
"github.com/go-redis/redis/v8"
2023-03-24 17:12:09 +00:00
"github.com/jackc/pgx/v5/pgxpool"
2021-07-08 23:03:46 +00:00
"github.com/sideshow/apns2"
"github.com/sideshow/apns2/payload"
"github.com/sideshow/apns2/token"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
2022-05-23 18:17:25 +00:00
"go.uber.org/zap"
2021-07-08 23:03:46 +00:00
2021-08-14 17:42:28 +00:00
"github.com/christianselig/apollo-backend/internal/domain"
2021-07-08 23:03:46 +00:00
"github.com/christianselig/apollo-backend/internal/reddit"
2021-08-14 17:42:28 +00:00
"github.com/christianselig/apollo-backend/internal/repository"
2021-07-08 23:03:46 +00:00
)
const (
	// rate is the sample rate intended for the high-volume statsd metrics
	// emitted by the notifications worker.
	rate = 0.1

	// Alert title formats, filled in by payloadFromMessage.

	// postReplyNotificationTitleFormat takes the author and the post title.
	postReplyNotificationTitleFormat = "%s to %s"
	// commentReplyNotificationTitleFormat takes the author and the post title.
	commentReplyNotificationTitleFormat = "%s in %s"
	// privateMessageNotificationTitleFormat takes the author.
	privateMessageNotificationTitleFormat = "Message from %s"
	// usernameMentionNotificationTitleFormat takes the post title, which is
	// wrapped in typographic double quotes.
	usernameMentionNotificationTitleFormat = "Mention in \u201c%s\u201d"
)
2022-10-31 22:04:01 +00:00
// notificationTags is the statsd tag set attached to the queue-level metrics
// reported by the notifications consumer.
var notificationTags = []string{
	"queue:notifications",
}
type notificationsWorker struct {
context.Context
2022-05-23 18:17:25 +00:00
logger *zap.Logger
tracer trace.Tracer
2021-08-14 17:42:28 +00:00
statsd *statsd.Client
db *pgxpool.Pool
redis *redis.Client
queue rmq.Connection
reddit *reddit.Client
2021-08-14 17:42:28 +00:00
apns *token.Token
2021-07-14 00:09:44 +00:00
consumers int
2021-08-14 17:42:28 +00:00
accountRepo domain.AccountRepository
deviceRepo domain.DeviceRepository
}
2021-07-08 23:03:46 +00:00
func NewNotificationsWorker(ctx context.Context, logger *zap.Logger, tracer trace.Tracer, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, queue rmq.Connection, consumers int) Worker {
reddit := reddit.NewClient(
os.Getenv("REDDIT_CLIENT_ID"),
os.Getenv("REDDIT_CLIENT_SECRET"),
tracer,
statsd,
redis,
consumers,
)
var apns *token.Token
2021-07-08 23:03:46 +00:00
{
2021-07-23 00:32:37 +00:00
authKey, err := token.AuthKeyFromFile(os.Getenv("APPLE_KEY_PATH"))
2021-07-08 23:03:46 +00:00
if err != nil {
panic(err)
}
apns = &token.Token{
2021-07-08 23:03:46 +00:00
AuthKey: authKey,
KeyID: os.Getenv("APPLE_KEY_ID"),
TeamID: os.Getenv("APPLE_TEAM_ID"),
}
}
return &notificationsWorker{
ctx,
logger,
tracer,
statsd,
db,
redis,
queue,
reddit,
apns,
2021-07-14 00:09:44 +00:00
consumers,
2021-08-14 17:42:28 +00:00
repository.NewPostgresAccount(db),
repository.NewPostgresDevice(db),
2021-07-08 23:03:46 +00:00
}
}
2021-07-08 23:03:46 +00:00
2021-07-14 00:09:44 +00:00
func (nw *notificationsWorker) Start() error {
queue, err := nw.queue.OpenQueue("notifications")
2021-07-08 23:03:46 +00:00
if err != nil {
return err
2021-07-08 23:03:46 +00:00
}
2022-05-23 18:17:25 +00:00
nw.logger.Info("starting up notifications worker", zap.Int("consumers", nw.consumers))
2021-07-09 01:28:43 +00:00
2022-11-06 00:31:55 +00:00
if err := queue.StartConsuming(int64(nw.consumers*2), pollDuration); err != nil {
return err
2021-07-08 23:03:46 +00:00
}
2021-07-09 03:50:34 +00:00
host, _ := os.Hostname()
2021-07-14 00:09:44 +00:00
for i := 0; i < nw.consumers; i++ {
2021-07-09 03:50:34 +00:00
name := fmt.Sprintf("consumer %s-%d", host, i)
2021-07-08 23:03:46 +00:00
consumer := NewNotificationsConsumer(nw, i)
2021-07-08 23:03:46 +00:00
if _, err := queue.AddConsumer(name, consumer); err != nil {
return err
2021-07-08 23:03:46 +00:00
}
}
return nil
}
2021-07-08 23:03:46 +00:00
func (nw *notificationsWorker) Stop() {
<-nw.queue.StopAllConsuming() // wait for all Consume() calls to finish
2021-07-08 23:03:46 +00:00
}
type notificationsConsumer struct {
*notificationsWorker
tag int
papns *apns2.Client
dapns *apns2.Client
2021-07-08 23:03:46 +00:00
}
func NewNotificationsConsumer(nw *notificationsWorker, tag int) *notificationsConsumer {
return &notificationsConsumer{
nw,
2021-07-08 23:03:46 +00:00
tag,
apns2.NewTokenClient(nw.apns).Production(),
apns2.NewTokenClient(nw.apns).Development(),
2021-07-08 23:03:46 +00:00
}
}
func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
2022-10-27 00:42:44 +00:00
ctx, cancel := context.WithCancel(nc)
defer cancel()
id := delivery.Payload()
logger := nc.logger.With(zap.String("account#reddit_account_id", id))
ctx, span := nc.tracer.Start(ctx, "job:notifications")
span.SetAttributes(attribute.String("job.payload", id))
defer span.End()
2022-07-13 22:25:00 +00:00
now := time.Now()
defer func() {
elapsed := time.Now().Sub(now).Milliseconds()
2022-10-31 22:04:01 +00:00
_ = nc.statsd.Histogram("apollo.consumer.runtime", float64(elapsed), notificationTags, 0.1)
_ = nc.statsd.Incr("apollo.consumer.executions", notificationTags, 0.1)
2022-07-13 22:25:00 +00:00
}()
defer func(ctx context.Context) {
_, span := nc.tracer.Start(ctx, "queue:ack")
defer span.End()
if err := delivery.Ack(); err != nil {
span.SetStatus(codes.Error, "failed to acknowledge message")
span.RecordError(err)
logger.Error("failed to acknowledge message", zap.Error(err))
}
}(ctx)
2022-07-13 20:28:54 +00:00
// Measure queue latency
2022-11-01 14:16:17 +00:00
key := fmt.Sprintf("locks:accounts:%s", id)
2022-10-27 00:42:44 +00:00
ttl := nc.redis.PTTL(ctx, key).Val()
if ttl == 0 {
logger.Debug("job is too old, skipping")
return
}
2022-07-13 20:46:10 +00:00
age := (domain.NotificationCheckTimeout - ttl)
2022-10-31 22:04:01 +00:00
_ = nc.statsd.Histogram("apollo.dequeue.latency", float64(age.Milliseconds()), notificationTags, 0.1)
2022-07-13 17:24:32 +00:00
defer func() {
2022-10-27 00:42:44 +00:00
if err := nc.redis.Del(ctx, key).Err(); err != nil {
2022-11-01 14:16:17 +00:00
logger.Error("failed to remove account lock", zap.Error(err), zap.String("key", key))
2021-07-09 01:44:06 +00:00
}
2022-07-13 17:24:32 +00:00
}()
2021-07-08 23:03:46 +00:00
2022-11-01 14:16:17 +00:00
logger.Debug("starting job")
2022-05-23 18:17:25 +00:00
2022-10-27 00:42:44 +00:00
account, err := nc.accountRepo.GetByRedditID(ctx, id)
2021-08-14 17:42:28 +00:00
if err != nil {
logger.Debug("could not fetch account", zap.Error(err))
2021-07-08 23:03:46 +00:00
return
}
rac := nc.reddit.NewAuthenticatedClient(account.AccountID, account.RefreshToken, account.AccessToken)
2022-11-01 14:16:17 +00:00
logger = logger.With(
zap.String("account#username", account.NormalizedUsername()),
zap.String("account#access_token", rac.ObfuscatedAccessToken()),
zap.String("account#refresh_token", rac.ObfuscatedRefreshToken()),
)
if account.TokenExpiresAt.Before(now.Add(5 * time.Minute)) {
2022-11-01 14:16:17 +00:00
logger.Debug("refreshing reddit token")
2022-10-27 00:42:44 +00:00
tokens, err := rac.RefreshTokens(ctx)
if err != nil {
if err != reddit.ErrOauthRevoked {
2022-11-01 14:16:17 +00:00
logger.Error("failed to refresh reddit tokens", zap.Error(err))
return
}
2022-11-01 14:16:17 +00:00
if err = nc.deleteAccount(ctx, account); err != nil {
logger.Error("failed to remove revoked account", zap.Error(err))
}
return
}
// Update account
account.AccessToken = tokens.AccessToken
account.RefreshToken = tokens.RefreshToken
account.TokenExpiresAt = now.Add(tokens.Expiry)
2022-10-27 00:42:44 +00:00
_ = nc.accountRepo.Update(ctx, &account)
// Refresh client
rac = nc.reddit.NewAuthenticatedClient(account.AccountID, tokens.RefreshToken, tokens.AccessToken)
2022-11-01 14:16:17 +00:00
logger = logger.With(
zap.String("account#access_token", rac.ObfuscatedAccessToken()),
zap.String("account#refresh_token", rac.ObfuscatedRefreshToken()),
)
}
2021-07-08 23:03:46 +00:00
2022-11-01 14:16:17 +00:00
logger.Debug("fetching message inbox")
opts := []reddit.RequestOption{reddit.WithQuery("limit", "10")}
if account.LastMessageID != "" {
opts = append(opts, reddit.WithQuery("before", account.LastMessageID))
}
2022-10-27 00:42:44 +00:00
msgs, err := rac.MessageInbox(ctx, opts...)
2021-07-08 23:03:46 +00:00
if err != nil {
2021-08-14 18:07:19 +00:00
switch err {
2022-05-19 15:51:56 +00:00
case reddit.ErrTimeout, reddit.ErrRateLimited: // Don't log timeouts or rate limits
2021-08-14 18:07:19 +00:00
break
case reddit.ErrOauthRevoked:
2022-11-01 14:16:17 +00:00
if err = nc.deleteAccount(ctx, account); err != nil {
logger.Error("failed to remove revoked account", zap.Error(err))
2022-05-23 18:17:25 +00:00
} else {
2022-11-01 14:16:17 +00:00
logger.Info("removed revoked account")
2021-08-14 18:07:19 +00:00
}
default:
2022-11-01 14:16:17 +00:00
logger.Error("failed to fetch message inbox", zap.Error(err))
2021-08-14 17:42:28 +00:00
}
2021-07-08 23:03:46 +00:00
return
}
2021-07-15 14:51:34 +00:00
// Figure out where we stand
if msgs.Count == 0 {
2022-11-01 14:16:17 +00:00
logger.Debug("no new messages, bailing early")
2021-07-15 14:51:34 +00:00
return
}
2022-11-01 14:16:17 +00:00
logger.Debug("fetched messages", zap.Int("count", msgs.Count))
2021-07-08 23:03:46 +00:00
2021-10-17 15:27:52 +00:00
for _, msg := range msgs.Children {
2021-10-17 15:05:40 +00:00
if !msg.IsDeleted() {
2021-10-17 15:27:52 +00:00
account.LastMessageID = msg.FullName()
2022-10-27 00:42:44 +00:00
_ = nc.accountRepo.Update(ctx, &account)
2021-10-17 15:05:40 +00:00
break
}
}
2021-08-14 17:42:28 +00:00
2022-11-05 20:15:04 +00:00
// Let's populate this with the latest message so we don't flood users with stuff
if account.CheckCount == 0 {
logger.Debug("populating first message id to prevent spamming")
account.CheckCount = 1
_ = nc.accountRepo.Update(ctx, &account)
return
}
2022-10-27 00:42:44 +00:00
devices, err := nc.deviceRepo.GetInboxNotifiableByAccountID(ctx, account.ID)
2021-07-08 23:03:46 +00:00
if err != nil {
2022-11-01 14:16:17 +00:00
logger.Error("failed to fetch account devices", zap.Error(err))
2021-07-08 23:03:46 +00:00
return
}
2022-03-26 16:52:10 +00:00
if len(devices) == 0 {
2022-11-01 14:16:17 +00:00
logger.Debug("no notifiable devices, bailing early")
2022-03-26 16:52:10 +00:00
return
}
2021-07-15 14:51:34 +00:00
// Iterate backwards so we notify from older to newer
for i := msgs.Count - 1; i >= 0; i-- {
msg := msgs.Children[i]
2021-10-17 15:05:40 +00:00
if msg.IsDeleted() {
continue
}
2022-07-13 17:42:21 +00:00
// Latency is the time difference between the appearence of the new message and the
// time we notified at.
latency := now.Sub(msg.CreatedAt)
2022-11-02 22:44:42 +00:00
_ = nc.statsd.Histogram("apollo.queue.delay", float64(latency.Milliseconds()), []string{}, 0.1)
2022-07-13 17:42:21 +00:00
2021-07-08 23:03:46 +00:00
notification := &apns2.Notification{}
notification.Topic = "com.christianselig.Apollo"
notification.Payload = payloadFromMessage(account, msg, msgs.Count)
2021-07-08 23:03:46 +00:00
client := nc.papns
if account.Development {
client = nc.dapns
}
2021-07-08 23:03:46 +00:00
for _, device := range devices {
notification.DeviceToken = device.APNSToken
2022-10-31 19:22:42 +00:00
res, err := client.PushWithContext(ctx, notification)
if err != nil {
2021-09-25 13:19:42 +00:00
_ = nc.statsd.Incr("apns.notification.errors", []string{}, 1)
2022-11-01 14:16:17 +00:00
logger.Error("failed to send notification",
2022-05-23 18:17:25 +00:00
zap.Error(err),
zap.String("device#token", device.APNSToken),
)
// Delete device as notifications might have been disabled here
2022-10-27 00:42:44 +00:00
_ = nc.deviceRepo.Delete(ctx, device.APNSToken)
} else if !res.Sent() {
_ = nc.statsd.Incr("apns.notification.errors", []string{}, 1)
2022-11-01 14:16:17 +00:00
logger.Error("notification not sent",
zap.String("device#token", device.APNSToken),
2022-05-23 18:17:25 +00:00
zap.Int("response#status", res.StatusCode),
zap.String("response#reason", res.Reason),
)
2021-10-10 15:51:42 +00:00
// Delete device as notifications might have been disabled here
2022-10-27 00:42:44 +00:00
_ = nc.deviceRepo.Delete(ctx, device.APNSToken)
2021-07-08 23:03:46 +00:00
} else {
2021-09-25 13:19:42 +00:00
_ = nc.statsd.Incr("apns.notification.sent", []string{}, 1)
2022-11-01 14:16:17 +00:00
logger.Info("sent notification", zap.String("device#token", device.APNSToken))
2021-07-08 23:03:46 +00:00
}
}
}
ev := fmt.Sprintf("Sent notification to /u/%s (x%d)", account.Username, msgs.Count)
2021-09-25 13:19:42 +00:00
_ = nc.statsd.SimpleEvent(ev, "")
2021-07-16 00:55:53 +00:00
2022-11-01 14:16:17 +00:00
logger.Debug("finishing job")
2021-07-08 23:03:46 +00:00
}
2022-11-01 14:16:17 +00:00
func (nc *notificationsConsumer) deleteAccount(ctx context.Context, account domain.Account) error {
2021-08-14 17:42:28 +00:00
// Disassociate account from devices
2022-11-01 14:16:17 +00:00
devs, err := nc.deviceRepo.GetByAccountID(ctx, account.ID)
2021-08-14 17:42:28 +00:00
if err != nil {
return err
}
for _, dev := range devs {
if err := nc.accountRepo.Disassociate(nc, &account, &dev); err != nil {
2021-08-14 17:42:28 +00:00
return err
}
}
return nc.accountRepo.Delete(nc, account.ID)
2021-08-14 17:42:28 +00:00
}
func payloadFromMessage(acct domain.Account, msg *reddit.Thing, badgeCount int) *payload.Payload {
2021-07-10 18:51:42 +00:00
postBody := msg.Body
if len(postBody) > 2000 {
postBody = msg.Body[:2000]
}
postTitle := msg.LinkTitle
if postTitle == "" {
postTitle = msg.Subject
}
if len(postTitle) > 75 {
postTitle = fmt.Sprintf("%s…", postTitle[0:75])
}
2021-07-12 18:36:08 +00:00
payload := payload.
NewPayload().
AlertBody(postBody).
AlertSummaryArg(msg.Author).
Badge(badgeCount).
Custom("account_id", acct.AccountID).
Custom("author", msg.Author).
Custom("destination_author", msg.Destination).
Custom("parent_id", msg.ParentID).
Custom("post_title", msg.LinkTitle).
Custom("subreddit", msg.Subreddit).
MutableContent().
Sound("traloop.wav")
2021-07-10 18:51:42 +00:00
switch {
case (msg.Kind == "t1" && msg.Type == "username_mention"):
2022-05-01 17:09:08 +00:00
title := fmt.Sprintf(usernameMentionNotificationTitleFormat, postTitle)
2022-05-01 17:24:10 +00:00
postID := reddit.PostIDFromContext(msg.Context)
2022-05-01 17:22:18 +00:00
payload = payload.
AlertTitle(title).
Custom("comment_id", msg.ID).
2022-05-01 17:24:10 +00:00
Custom("post_id", postID).
Custom("subreddit", msg.Subreddit).
2022-05-01 17:22:18 +00:00
Custom("type", "username")
2021-07-10 18:51:42 +00:00
pType, _ := reddit.SplitID(msg.ParentID)
if pType == "t1" {
payload = payload.Category("inbox-username-mention-context")
} else {
payload = payload.Category("inbox-username-mention-no-context")
}
payload = payload.Custom("subject", "comment").ThreadID("comment")
case (msg.Kind == "t1" && msg.Type == "post_reply"):
2022-05-01 17:09:08 +00:00
title := fmt.Sprintf(postReplyNotificationTitleFormat, msg.Author, postTitle)
2022-05-01 17:18:16 +00:00
postID := reddit.PostIDFromContext(msg.Context)
2021-07-12 18:36:08 +00:00
payload = payload.
AlertTitle(title).
Category("inbox-post-reply").
2022-05-01 17:18:16 +00:00
Custom("comment_id", msg.ID).
Custom("post_id", postID).
2021-07-12 18:36:08 +00:00
Custom("subject", "comment").
2022-05-01 17:24:10 +00:00
Custom("subreddit", msg.Subreddit).
2021-07-12 18:36:08 +00:00
Custom("type", "post").
ThreadID("comment")
2021-07-10 18:51:42 +00:00
case (msg.Kind == "t1" && msg.Type == "comment_reply"):
2022-05-01 17:09:08 +00:00
title := fmt.Sprintf(commentReplyNotificationTitleFormat, msg.Author, postTitle)
2021-07-12 19:51:02 +00:00
postID := reddit.PostIDFromContext(msg.Context)
2021-07-12 18:36:08 +00:00
payload = payload.
AlertTitle(title).
Category("inbox-comment-reply").
Custom("comment_id", msg.ID).
Custom("post_id", postID).
Custom("subject", "comment").
2022-05-01 17:24:10 +00:00
Custom("subreddit", msg.Subreddit).
2021-07-12 18:36:08 +00:00
Custom("type", "comment").
ThreadID("comment")
2021-07-10 18:51:42 +00:00
case (msg.Kind == "t4"):
2022-05-01 17:09:08 +00:00
title := fmt.Sprintf(privateMessageNotificationTitleFormat, msg.Author)
2021-07-12 18:36:08 +00:00
payload = payload.
AlertTitle(title).
AlertSubtitle(postTitle).
Category("inbox-private-message").
Custom("comment_id", msg.ID).
2021-07-12 18:36:08 +00:00
Custom("type", "private-message")
2021-07-10 18:51:42 +00:00
}
return payload
}