apollo-backend/internal/worker/subreddits.go

401 lines
10 KiB
Go
Raw Permalink Normal View History

2021-09-25 16:56:01 +00:00
package worker
import (
"context"
"fmt"
2022-05-26 22:56:53 +00:00
"math/rand"
2021-09-25 16:56:01 +00:00
"os"
"strings"
"time"
"github.com/DataDog/datadog-go/statsd"
"github.com/go-redis/redis/v8"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/sideshow/apns2"
"github.com/sideshow/apns2/payload"
"github.com/sideshow/apns2/token"
2022-05-23 18:17:25 +00:00
"go.uber.org/zap"
2021-09-25 16:56:01 +00:00
"github.com/christianselig/apollo-backend/internal/domain"
"github.com/christianselig/apollo-backend/internal/reddit"
"github.com/christianselig/apollo-backend/internal/repository"
)
type subredditsWorker struct {
2022-05-23 18:17:25 +00:00
logger *zap.Logger
2021-09-25 16:56:01 +00:00
statsd *statsd.Client
db *pgxpool.Pool
redis *redis.Client
reddit *reddit.Client
2022-11-02 04:49:20 +00:00
apns *apns2.Client
2021-09-25 16:56:01 +00:00
accountRepo domain.AccountRepository
deviceRepo domain.DeviceRepository
subredditRepo domain.SubredditRepository
watcherRepo domain.WatcherRepository
}
2022-05-01 17:00:29 +00:00
// subredditNotificationTitleFormat is the fmt template for the alert title of
// a watcher notification; its single verb receives the watcher's label.
const subredditNotificationTitleFormat = "📣 \u201c%s\u201d Watcher"

// subredditNotificationBodyFormat is the fmt template for the alert body of a
// watcher notification; its verbs receive the subreddit name and post title.
const subredditNotificationBodyFormat = "r/%s: \u201c%s\u201d"
2021-10-10 15:51:42 +00:00
2022-11-02 04:49:20 +00:00
func NewSubredditsWorker(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, consumers int) Worker {
2021-09-25 16:56:01 +00:00
reddit := reddit.NewClient(
os.Getenv("REDDIT_CLIENT_ID"),
os.Getenv("REDDIT_CLIENT_SECRET"),
statsd,
2022-03-12 17:50:05 +00:00
redis,
2021-09-25 16:56:01 +00:00
consumers,
)
2022-11-02 04:49:20 +00:00
var apns *apns2.Client
2021-09-25 16:56:01 +00:00
{
authKey, err := token.AuthKeyFromFile(os.Getenv("APPLE_KEY_PATH"))
if err != nil {
panic(err)
}
2022-11-02 04:49:20 +00:00
tok := &token.Token{
2021-09-25 16:56:01 +00:00
AuthKey: authKey,
KeyID: os.Getenv("APPLE_KEY_ID"),
TeamID: os.Getenv("APPLE_TEAM_ID"),
}
2022-11-02 04:49:20 +00:00
apns = apns2.NewTokenClient(tok).Production()
2021-09-25 16:56:01 +00:00
}
return &subredditsWorker{
logger,
statsd,
db,
redis,
reddit,
apns,
repository.NewPostgresAccount(db),
repository.NewPostgresDevice(db),
repository.NewPostgresSubreddit(db),
repository.NewPostgresWatcher(db),
}
}
2022-11-02 04:49:20 +00:00
func (sw *subredditsWorker) Process(ctx context.Context, args ...interface{}) error {
2022-11-02 05:11:49 +00:00
id := int64(args[0].(float64))
2022-11-02 04:49:20 +00:00
sw.logger.Debug("starting job", zap.Int64("subreddit#id", id))
2021-09-25 16:56:01 +00:00
2022-11-02 04:49:20 +00:00
subreddit, err := sw.subredditRepo.GetByID(ctx, id)
2021-09-25 16:56:01 +00:00
if err != nil {
2022-11-02 04:49:20 +00:00
sw.logger.Error("failed to fetch subreddit from database", zap.Error(err), zap.Int64("subreddit#id", id))
return nil
2021-09-25 16:56:01 +00:00
}
2022-11-02 04:49:20 +00:00
watchers, err := sw.watcherRepo.GetBySubredditID(ctx, subreddit.ID)
2021-09-25 16:56:01 +00:00
if err != nil {
2022-11-02 04:49:20 +00:00
sw.logger.Error("failed to fetch watchers from database",
2022-05-23 18:17:25 +00:00
zap.Error(err),
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
)
2022-11-02 04:49:20 +00:00
return err
2021-09-25 16:56:01 +00:00
}
if len(watchers) == 0 {
2022-11-02 04:49:20 +00:00
sw.logger.Debug("no watchers for subreddit, bailing early",
2022-05-23 18:17:25 +00:00
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
)
2022-11-02 04:49:20 +00:00
return nil
2021-09-25 16:56:01 +00:00
}
2022-03-28 21:05:01 +00:00
threshold := time.Now().Add(-24 * time.Hour)
2021-09-25 16:56:01 +00:00
posts := []*reddit.Thing{}
before := ""
finished := false
2021-09-25 17:05:05 +00:00
seenPosts := map[string]bool{}
2021-09-25 16:56:01 +00:00
2021-09-25 17:05:05 +00:00
// Load 500 newest posts
2022-11-02 04:49:20 +00:00
sw.logger.Debug("loading up to 500 new posts",
2022-05-23 18:17:25 +00:00
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
)
2021-09-25 18:02:00 +00:00
for page := 0; page < 5; page++ {
2022-11-02 04:49:20 +00:00
sw.logger.Debug("loading new posts",
2022-05-23 18:17:25 +00:00
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.Int("page", page),
)
2021-09-25 18:02:00 +00:00
2022-05-26 22:56:53 +00:00
i := rand.Intn(len(watchers))
watcher := watchers[i]
2022-11-02 04:49:20 +00:00
rac := sw.reddit.NewAuthenticatedClient(watcher.Account.AccountID, watcher.Account.RefreshToken, watcher.Account.AccessToken)
2022-10-27 00:46:17 +00:00
sps, err := rac.SubredditNew(ctx,
2021-09-25 16:56:01 +00:00
subreddit.Name,
reddit.WithQuery("before", before),
reddit.WithQuery("limit", "100"),
2022-05-26 22:56:53 +00:00
reddit.WithQuery("show", "all"),
reddit.WithQuery("always_show_media", "1"),
2021-09-25 16:56:01 +00:00
)
if err != nil {
2022-11-02 04:49:20 +00:00
sw.logger.Error("failed to fetch new posts",
2022-05-23 18:17:25 +00:00
zap.Error(err),
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.Int("page", page),
)
2021-09-25 16:56:01 +00:00
continue
}
2022-11-02 04:49:20 +00:00
sw.logger.Debug("loaded new posts",
2022-05-23 18:17:25 +00:00
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.Int("page", page),
zap.Int("count", sps.Count),
)
2021-09-25 18:02:00 +00:00
2021-09-25 16:56:01 +00:00
// If it's empty, we're done
if sps.Count == 0 {
break
}
// If we don't have 100 posts, we're going to be done
if sps.Count < 100 {
finished = true
}
for _, post := range sps.Children {
2022-03-28 21:05:01 +00:00
if post.CreatedAt.Before(threshold) {
2021-09-25 16:56:01 +00:00
finished = true
break
}
2021-09-25 17:05:05 +00:00
if _, ok := seenPosts[post.ID]; !ok {
posts = append(posts, post)
seenPosts[post.ID] = true
}
2021-09-25 16:56:01 +00:00
}
if finished {
2022-11-02 04:49:20 +00:00
sw.logger.Debug("reached date threshold",
2022-05-23 18:17:25 +00:00
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.Int("page", page),
)
2021-09-25 16:56:01 +00:00
break
}
}
2021-09-25 17:05:05 +00:00
// Load hot posts
2022-11-02 04:49:20 +00:00
sw.logger.Debug("loading hot posts",
2022-05-23 18:17:25 +00:00
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
)
2021-09-25 17:05:05 +00:00
{
2022-05-26 22:56:53 +00:00
i := rand.Intn(len(watchers))
watcher := watchers[i]
2022-11-02 04:49:20 +00:00
rac := sw.reddit.NewAuthenticatedClient(watcher.Account.AccountID, watcher.Account.RefreshToken, watcher.Account.AccessToken)
2022-10-27 00:46:17 +00:00
sps, err := rac.SubredditHot(ctx,
2021-09-25 17:05:05 +00:00
subreddit.Name,
reddit.WithQuery("limit", "100"),
2022-05-26 22:56:53 +00:00
reddit.WithQuery("show", "all"),
reddit.WithQuery("always_show_media", "1"),
2021-09-25 17:05:05 +00:00
)
if err != nil {
2022-11-02 04:49:20 +00:00
sw.logger.Error("failed to fetch hot posts",
2022-05-23 18:17:25 +00:00
zap.Error(err),
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
)
2021-09-25 18:02:00 +00:00
} else {
2022-11-02 04:49:20 +00:00
sw.logger.Debug("loaded hot posts",
2022-05-23 18:17:25 +00:00
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.Int("count", sps.Count),
)
2021-09-25 18:02:00 +00:00
2021-09-25 17:05:05 +00:00
for _, post := range sps.Children {
2022-03-28 21:05:01 +00:00
if post.CreatedAt.Before(threshold) {
2021-09-25 17:05:05 +00:00
break
}
if _, ok := seenPosts[post.ID]; !ok {
posts = append(posts, post)
seenPosts[post.ID] = true
}
}
}
}
2022-11-02 04:49:20 +00:00
sw.logger.Debug("checking posts for watcher hits",
2022-05-23 18:17:25 +00:00
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.Int("count", len(posts)),
)
2021-09-25 16:56:01 +00:00
for _, post := range posts {
2021-10-10 15:51:42 +00:00
lowcaseAuthor := strings.ToLower(post.Author)
2021-10-09 14:59:20 +00:00
lowcaseTitle := strings.ToLower(post.Title)
lowcaseFlair := strings.ToLower(post.Flair)
lowcaseDomain := strings.ToLower(post.URL)
2021-10-10 15:51:42 +00:00
notifs := []domain.Watcher{}
2021-09-25 16:56:01 +00:00
for _, watcher := range watchers {
2021-09-25 18:02:00 +00:00
// Make sure we only alert on posts created after the search
2022-03-28 21:05:01 +00:00
if watcher.CreatedAt.After(post.CreatedAt) {
2021-09-25 18:02:00 +00:00
continue
}
2022-05-07 17:22:06 +00:00
matched := watcher.KeywordMatches(lowcaseTitle)
2021-09-25 18:02:00 +00:00
2021-10-10 15:51:42 +00:00
if watcher.Author != "" && lowcaseAuthor != watcher.Author {
matched = false
}
2021-09-25 18:02:00 +00:00
if watcher.Upvotes > 0 && post.Score < watcher.Upvotes {
matched = false
}
2021-10-09 14:59:20 +00:00
if watcher.Flair != "" && !strings.Contains(lowcaseFlair, watcher.Flair) {
2021-09-25 18:02:00 +00:00
matched = false
}
2021-10-09 14:59:20 +00:00
if watcher.Domain != "" && !strings.Contains(lowcaseDomain, watcher.Domain) {
2021-09-25 18:02:00 +00:00
matched = false
}
2021-09-25 16:56:01 +00:00
if !matched {
continue
}
2022-11-02 04:49:20 +00:00
sw.logger.Debug("matched post",
2022-05-25 23:55:51 +00:00
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.Int64("watcher#id", watcher.ID),
zap.String("watcher#keywords", watcher.Keyword),
zap.Int64("watcher#upvotes", watcher.Upvotes),
zap.String("post#id", post.ID),
zap.String("post#title", post.Title),
zap.Int64("post#score", post.Score),
)
2021-09-25 18:02:00 +00:00
lockKey := fmt.Sprintf("watcher:%d:%s", watcher.DeviceID, post.ID)
2022-11-02 04:49:20 +00:00
notified, _ := sw.redis.Get(ctx, lockKey).Bool()
2021-09-25 16:56:01 +00:00
if notified {
2022-11-02 04:49:20 +00:00
sw.logger.Debug("already notified, skipping",
2022-05-23 18:17:25 +00:00
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.Int64("watcher#id", watcher.ID),
zap.String("post#id", post.ID),
)
2021-09-25 16:56:01 +00:00
continue
}
2022-11-02 04:49:20 +00:00
if err := sw.watcherRepo.IncrementHits(ctx, watcher.ID); err != nil {
sw.logger.Error("could not increment hits",
2022-05-23 18:17:25 +00:00
zap.Error(err),
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.Int64("watcher#id", watcher.ID),
)
2022-11-02 04:49:20 +00:00
return err
}
2022-11-02 04:49:20 +00:00
sw.logger.Debug("got a hit",
2022-05-23 18:17:25 +00:00
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.Int64("watcher#id", watcher.ID),
zap.String("post#id", post.ID),
)
2021-09-25 18:02:00 +00:00
2022-11-02 04:49:20 +00:00
sw.redis.SetEX(ctx, lockKey, true, 24*time.Hour)
2021-10-10 15:51:42 +00:00
notifs = append(notifs, watcher)
2021-09-25 16:56:01 +00:00
}
2021-10-10 15:51:42 +00:00
if len(notifs) == 0 {
2021-09-25 16:56:01 +00:00
continue
}
2022-11-02 04:49:20 +00:00
sw.logger.Debug("got hits for post",
2022-05-23 18:17:25 +00:00
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.String("post#id", post.ID),
zap.Int("count", len(notifs)),
)
2021-09-25 18:02:00 +00:00
2021-10-10 15:51:42 +00:00
payload := payloadFromPost(post)
for _, watcher := range notifs {
title := fmt.Sprintf(subredditNotificationTitleFormat, watcher.Label)
payload.AlertTitle(title)
2021-09-25 16:56:01 +00:00
2022-05-01 17:00:29 +00:00
body := fmt.Sprintf(subredditNotificationBodyFormat, subreddit.Name, post.Title)
payload.AlertBody(body)
2021-10-10 15:51:42 +00:00
notification := &apns2.Notification{}
notification.Topic = "com.christianselig.Apollo"
notification.DeviceToken = watcher.Device.APNSToken
notification.Payload = payload
2021-09-25 16:56:01 +00:00
2022-11-02 04:49:20 +00:00
res, err := sw.apns.Push(notification)
if err != nil {
2022-11-02 04:49:20 +00:00
_ = sw.statsd.Incr("apns.notification.errors", []string{}, 1)
sw.logger.Error("failed to send notification",
2022-05-23 18:17:25 +00:00
zap.Error(err),
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.String("post#id", post.ID),
zap.String("apns", watcher.Device.APNSToken),
)
2022-11-02 04:49:20 +00:00
return err
} else if !res.Sent() {
2022-11-02 04:49:20 +00:00
_ = sw.statsd.Incr("apns.notification.errors", []string{}, 1)
sw.logger.Error("notificaion not sent",
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.String("post#id", post.ID),
zap.String("apns", watcher.Device.APNSToken),
2022-05-23 18:17:25 +00:00
zap.Int("response#status", res.StatusCode),
zap.String("response#reason", res.Reason),
)
2021-09-25 16:56:01 +00:00
} else {
2022-11-02 04:49:20 +00:00
_ = sw.statsd.Incr("apns.notification.sent", []string{}, 1)
sw.logger.Info("sent notification",
2022-05-23 18:17:25 +00:00
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.String("post#id", post.ID),
zap.String("device#token", watcher.Device.APNSToken),
)
2021-09-25 16:56:01 +00:00
}
}
}
2021-09-25 18:02:00 +00:00
2022-11-02 04:49:20 +00:00
sw.logger.Debug("finishing job",
2022-05-23 18:17:25 +00:00
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
)
2022-11-02 04:49:20 +00:00
return nil
2021-09-25 16:56:01 +00:00
}
func payloadFromPost(post *reddit.Thing) *payload.Payload {
payload := payload.
NewPayload().
AlertSummaryArg(post.Subreddit).
2022-05-01 17:36:35 +00:00
Category("subreddit-watcher").
2021-09-25 16:56:01 +00:00
Custom("post_title", post.Title).
Custom("post_id", post.ID).
2021-09-25 18:05:34 +00:00
Custom("subreddit", post.Subreddit).
2021-09-25 16:56:01 +00:00
Custom("author", post.Author).
2021-09-25 18:05:34 +00:00
Custom("post_age", post.CreatedAt).
2022-05-01 17:36:35 +00:00
ThreadID("subreddit-watcher").
2021-09-25 18:05:34 +00:00
MutableContent().
Sound("traloop.wav")
2021-09-25 16:56:01 +00:00
if post.Thumbnail != "" && !post.Over18 {
2022-05-01 17:33:09 +00:00
payload.Custom("thumbnail", post.Thumbnail)
}
2021-09-25 16:56:01 +00:00
return payload
}