apollo-backend/internal/worker/users.go

337 lines
7.9 KiB
Go
Raw Permalink Normal View History

2021-10-09 14:59:20 +00:00
package worker
import (
"context"
"fmt"
"math/rand"
"os"
"strconv"
"strings"
2021-10-09 14:59:20 +00:00
"github.com/DataDog/datadog-go/statsd"
2022-11-01 23:02:25 +00:00
"github.com/adjust/rmq/v5"
2021-10-09 14:59:20 +00:00
"github.com/go-redis/redis/v8"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/sideshow/apns2"
"github.com/sideshow/apns2/payload"
"github.com/sideshow/apns2/token"
2022-11-03 17:43:10 +00:00
"go.opentelemetry.io/otel/trace"
2022-05-23 18:17:25 +00:00
"go.uber.org/zap"
2021-10-09 14:59:20 +00:00
"github.com/christianselig/apollo-backend/internal/domain"
"github.com/christianselig/apollo-backend/internal/reddit"
"github.com/christianselig/apollo-backend/internal/repository"
)
type usersWorker struct {
context.Context
2022-05-23 18:17:25 +00:00
logger *zap.Logger
2022-11-03 17:43:10 +00:00
tracer trace.Tracer
2021-10-09 14:59:20 +00:00
statsd *statsd.Client
db *pgxpool.Pool
redis *redis.Client
queue rmq.Connection
reddit *reddit.Client
apns *token.Token
consumers int
accountRepo domain.AccountRepository
deviceRepo domain.DeviceRepository
userRepo domain.UserRepository
watcherRepo domain.WatcherRepository
}
2021-10-10 15:51:42 +00:00
// userNotificationTitleFormat is the fmt template for a notification's alert
// title; Consume fills the single %s verb with the watcher's label.
const userNotificationTitleFormat = "👨\u200d🚀 %s"
2022-11-03 17:43:10 +00:00
func NewUsersWorker(ctx context.Context, logger *zap.Logger, tracer trace.Tracer, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, queue rmq.Connection, consumers int) Worker {
2021-10-09 14:59:20 +00:00
reddit := reddit.NewClient(
os.Getenv("REDDIT_CLIENT_ID"),
os.Getenv("REDDIT_CLIENT_SECRET"),
2022-11-03 17:43:10 +00:00
tracer,
2021-10-09 14:59:20 +00:00
statsd,
2022-03-12 17:50:05 +00:00
redis,
2021-10-09 14:59:20 +00:00
consumers,
)
var apns *token.Token
{
authKey, err := token.AuthKeyFromFile(os.Getenv("APPLE_KEY_PATH"))
if err != nil {
panic(err)
}
apns = &token.Token{
AuthKey: authKey,
KeyID: os.Getenv("APPLE_KEY_ID"),
TeamID: os.Getenv("APPLE_TEAM_ID"),
}
}
return &usersWorker{
ctx,
2021-10-09 14:59:20 +00:00
logger,
2022-11-03 17:43:10 +00:00
tracer,
2021-10-09 14:59:20 +00:00
statsd,
db,
redis,
queue,
reddit,
apns,
consumers,
repository.NewPostgresAccount(db),
repository.NewPostgresDevice(db),
repository.NewPostgresUser(db),
repository.NewPostgresWatcher(db),
}
}
func (uw *usersWorker) Start() error {
queue, err := uw.queue.OpenQueue("users")
if err != nil {
return err
}
2022-05-23 18:17:25 +00:00
uw.logger.Info("starting up subreddits worker", zap.Int("consumers", uw.consumers))
2021-10-09 14:59:20 +00:00
prefetchLimit := int64(uw.consumers * 2)
if err := queue.StartConsuming(prefetchLimit, pollDuration); err != nil {
return err
}
host, _ := os.Hostname()
for i := 0; i < uw.consumers; i++ {
name := fmt.Sprintf("consumer %s-%d", host, i)
consumer := NewUsersConsumer(uw, i)
if _, err := queue.AddConsumer(name, consumer); err != nil {
return err
}
}
return nil
}
// Stop tells the queue connection to stop all consuming and blocks until the
// channel it returns signals completion.
func (uw *usersWorker) Stop() {
	finished := uw.queue.StopAllConsuming()
	<-finished // wait for all Consume() calls to finish
}
// usersConsumer is a single consumer of the users queue. It embeds the
// shared *usersWorker for its dependencies and holds its own pair of APNs
// clients: Consume picks apnsSandbox for devices flagged as sandbox and
// apnsProduction for all others.
type usersConsumer struct {
	*usersWorker

	// tag is the index the consumer was created with.
	tag int

	apnsSandbox, apnsProduction *apns2.Client
}
// NewUsersConsumer creates a consumer bound to uw and identified by tag.
// Two APNs token clients are built from the worker's signing token: one left
// as constructed (used for sandbox devices) and one switched to the
// production gateway.
func NewUsersConsumer(uw *usersWorker, tag int) *usersConsumer {
	sandboxClient := apns2.NewTokenClient(uw.apns)
	productionClient := apns2.NewTokenClient(uw.apns).Production()

	return &usersConsumer{
		usersWorker:    uw,
		tag:            tag,
		apnsSandbox:    sandboxClient,
		apnsProduction: productionClient,
	}
}
func (uc *usersConsumer) Consume(delivery rmq.Delivery) {
2022-10-27 00:46:17 +00:00
ctx, cancel := context.WithCancel(uc)
defer cancel()
2021-10-09 14:59:20 +00:00
id, err := strconv.ParseInt(delivery.Payload(), 10, 64)
if err != nil {
2022-05-23 18:17:25 +00:00
uc.logger.Error("failed to parse subreddit id from payload", zap.Error(err), zap.String("payload", delivery.Payload()))
2021-10-09 14:59:20 +00:00
_ = delivery.Reject()
return
}
2022-05-23 18:17:25 +00:00
uc.logger.Debug("starting job", zap.Int64("subreddit#id", id))
2021-10-09 14:59:20 +00:00
defer func() { _ = delivery.Ack() }()
2022-10-27 00:46:17 +00:00
user, err := uc.userRepo.GetByID(ctx, id)
2021-10-09 14:59:20 +00:00
if err != nil {
2022-05-23 18:17:25 +00:00
uc.logger.Error("failed to fetch user from database", zap.Error(err), zap.Int64("subreddit#id", id))
2021-10-09 14:59:20 +00:00
return
}
2022-10-27 00:46:17 +00:00
watchers, err := uc.watcherRepo.GetByUserID(ctx, user.ID)
2021-10-09 14:59:20 +00:00
if err != nil {
2022-05-23 18:17:25 +00:00
uc.logger.Error("failed to fetch watchers from database",
zap.Error(err),
zap.Int64("user#id", id),
zap.String("user#name", user.NormalizedName()),
)
2021-10-09 14:59:20 +00:00
return
}
if len(watchers) == 0 {
2022-05-23 18:17:25 +00:00
uc.logger.Debug("no watchers for user, bailing early",
zap.Int64("user#id", id),
zap.String("user#name", user.NormalizedName()),
)
2021-10-09 14:59:20 +00:00
return
}
// Load 25 newest posts
i := rand.Intn(len(watchers))
watcher := watchers[i]
2022-10-27 00:46:17 +00:00
acc, _ := uc.accountRepo.GetByID(ctx, watcher.AccountID)
2022-03-12 17:50:05 +00:00
rac := uc.reddit.NewAuthenticatedClient(acc.AccountID, acc.RefreshToken, acc.AccessToken)
2021-10-09 14:59:20 +00:00
2022-10-27 00:46:17 +00:00
ru, err := rac.UserAbout(ctx, user.Name)
if err != nil {
2022-05-23 18:17:25 +00:00
uc.logger.Error("failed to fetch user details",
zap.Error(err),
zap.Int64("user#id", id),
zap.String("user#name", user.NormalizedName()),
)
return
}
if !ru.AcceptFollowers {
2022-05-23 18:17:25 +00:00
uc.logger.Info("user disabled followers, removing",
zap.Int64("user#id", id),
zap.String("user#name", user.NormalizedName()),
)
2022-10-27 00:46:17 +00:00
if err := uc.watcherRepo.DeleteByTypeAndWatcheeID(ctx, domain.UserWatcher, user.ID); err != nil {
2022-05-23 18:17:25 +00:00
uc.logger.Error("failed to remove watchers for user who disallows followers",
zap.Error(err),
zap.Int64("user#id", id),
zap.String("user#name", user.NormalizedName()),
)
return
}
2022-10-27 00:46:17 +00:00
if err := uc.userRepo.Delete(ctx, user.ID); err != nil {
2022-05-23 18:17:25 +00:00
uc.logger.Error("failed to remove user",
zap.Error(err),
zap.Int64("user#id", id),
zap.String("user#name", user.NormalizedName()),
)
return
}
}
2022-10-27 00:46:17 +00:00
posts, err := rac.UserPosts(ctx, user.Name)
2021-10-09 14:59:20 +00:00
if err != nil {
2022-05-23 18:17:25 +00:00
uc.logger.Error("failed to fetch user activity",
zap.Error(err),
zap.Int64("user#id", id),
zap.String("user#name", user.NormalizedName()),
)
2021-10-09 14:59:20 +00:00
return
}
for _, post := range posts.Children {
lowcaseSubreddit := strings.ToLower(post.Subreddit)
if post.SubredditType == "private" {
continue
}
notifs := []domain.Watcher{}
2021-10-09 14:59:20 +00:00
for _, watcher := range watchers {
// Make sure we only alert on activities created after the search
2022-03-28 21:05:01 +00:00
if watcher.CreatedAt.After(post.CreatedAt) {
2021-10-09 14:59:20 +00:00
continue
}
2022-03-28 21:05:01 +00:00
if watcher.LastNotifiedAt.After(post.CreatedAt) {
continue
}
if watcher.Subreddit != "" && lowcaseSubreddit != watcher.Subreddit {
continue
}
notifs = append(notifs, watcher)
}
if len(notifs) == 0 {
continue
}
payload := payloadFromUserPost(post)
notification := &apns2.Notification{}
notification.Topic = "com.christianselig.Apollo"
for _, watcher := range notifs {
2022-10-27 00:46:17 +00:00
if err := uc.watcherRepo.IncrementHits(ctx, watcher.ID); err != nil {
2022-05-23 18:17:25 +00:00
uc.logger.Error("failed to increment watcher hits",
zap.Error(err),
zap.Int64("user#id", id),
zap.String("user#name", user.NormalizedName()),
zap.Int64("watcher#id", watcher.ID),
)
return
}
2022-10-27 00:46:17 +00:00
device, _ := uc.deviceRepo.GetByID(ctx, watcher.DeviceID)
2021-10-10 15:51:42 +00:00
title := fmt.Sprintf(userNotificationTitleFormat, watcher.Label)
payload.AlertTitle(title)
notification.Payload = payload
2021-10-09 14:59:20 +00:00
notification.DeviceToken = device.APNSToken
client := uc.apnsProduction
if device.Sandbox {
client = uc.apnsSandbox
}
res, err := client.Push(notification)
2022-01-14 20:29:56 +00:00
if err != nil || !res.Sent() {
2021-10-09 14:59:20 +00:00
_ = uc.statsd.Incr("apns.notification.errors", []string{}, 1)
2022-05-23 18:17:25 +00:00
uc.logger.Error("failed to send notification",
zap.Error(err),
zap.Int64("user#id", id),
zap.String("user#name", user.NormalizedName()),
zap.String("post#id", post.ID),
zap.String("apns", watcher.Device.APNSToken),
zap.Int("response#status", res.StatusCode),
zap.String("response#reason", res.Reason),
)
2021-10-09 14:59:20 +00:00
} else {
_ = uc.statsd.Incr("apns.notification.sent", []string{}, 1)
2022-05-23 18:17:25 +00:00
uc.logger.Info("sent notification",
zap.Int64("user#id", id),
zap.String("user#name", user.NormalizedName()),
zap.String("post#id", post.ID),
zap.String("device#token", watcher.Device.APNSToken),
)
2021-10-09 14:59:20 +00:00
}
}
}
2022-05-23 18:17:25 +00:00
uc.logger.Debug("finishing job",
zap.Int64("user#id", id),
zap.String("user#name", user.NormalizedName()),
)
2021-10-09 14:59:20 +00:00
}
func payloadFromUserPost(post *reddit.Thing) *payload.Payload {
payload := payload.
NewPayload().
AlertBody(post.Title).
2021-10-10 15:51:42 +00:00
AlertSubtitle(post.Author).
2021-10-09 14:59:20 +00:00
AlertSummaryArg(post.Author).
Category("user-watch").
Custom("post_title", post.Title).
Custom("post_id", post.ID).
Custom("subreddit", post.Subreddit).
Custom("author", post.Author).
Custom("post_age", post.CreatedAt).
MutableContent().
Sound("traloop.wav")
return payload
}