diff --git a/internal/reddit/client.go b/internal/reddit/client.go index d2e38cf..e5dc574 100644 --- a/internal/reddit/client.go +++ b/internal/reddit/client.go @@ -197,8 +197,8 @@ func (rac *AuthenticatedClient) SubredditAbout(subreddit string, opts ...Request return sr.(*SubredditResponse), nil } -func (rac *AuthenticatedClient) SubredditNew(subreddit string, opts ...RequestOption) (*ListingResponse, error) { - url := fmt.Sprintf("https://oauth.reddit.com/r/%s/new.json", subreddit) +func (rac *AuthenticatedClient) subredditPosts(subreddit string, sort string, opts ...RequestOption) (*ListingResponse, error) { + url := fmt.Sprintf("https://oauth.reddit.com/r/%s/%s.json", subreddit, sort) opts = append([]RequestOption{ WithMethod("GET"), WithToken(rac.accessToken), @@ -214,6 +214,14 @@ func (rac *AuthenticatedClient) SubredditNew(subreddit string, opts ...RequestOp return lr.(*ListingResponse), nil } +func (rac *AuthenticatedClient) SubredditHot(subreddit string, opts ...RequestOption) (*ListingResponse, error) { + return rac.subredditPosts(subreddit, "hot", opts...) +} + +func (rac *AuthenticatedClient) SubredditNew(subreddit string, opts ...RequestOption) (*ListingResponse, error) { + return rac.subredditPosts(subreddit, "new", opts...) +} + func (rac *AuthenticatedClient) MessageInbox(opts ...RequestOption) (*ListingResponse, error) { opts = append([]RequestOption{ WithTags([]string{"url:/api/v1/message/inbox"}), diff --git a/internal/worker/subreddits.go b/internal/worker/subreddits.go index 61bc52a..c1ee802 100644 --- a/internal/worker/subreddits.go +++ b/internal/worker/subreddits.go @@ -179,35 +179,14 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) { posts := []*reddit.Thing{} before := "" finished := false + seenPosts := map[string]bool{} + // Load 500 newest posts for pages := 0; pages < 5; pages++ { i := rand.Intn(len(watchers)) watcher := watchers[i] - dev, err := sc.deviceRepo.GetByID(ctx, watcher.DeviceID) - if err != nil { - sc.logger.WithFields(logrus.Fields{ - "subreddit#id": subreddit.ID, - "watcher#id": watcher.ID, - "err": err, - }).Error("failed to fetch device for watcher from database") - continue - } - - accs, err := sc.accountRepo.GetByAPNSToken(ctx, dev.APNSToken) - if err != nil { - sc.logger.WithFields(logrus.Fields{ - "subreddit#id": subreddit.ID, - "watcher#id": watcher.ID, - "device#id": dev.ID, - "err": err, - }).Error("failed to fetch accounts for device from database") - continue - } - - i = rand.Intn(len(accs)) - acc := accs[i] - + acc, _ := sc.accountRepo.GetByID(ctx, watcher.AccountID) rac := sc.reddit.NewAuthenticatedClient(acc.RefreshToken, acc.AccessToken) sps, err := rac.SubredditNew( @@ -220,7 +199,6 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) { sc.logger.WithFields(logrus.Fields{ "subreddit#id": subreddit.ID, "watcher#id": watcher.ID, - "device#id": dev.ID, "err": err, }).Error("failed to fetch posts") continue @@ -242,7 +220,10 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) { break } - posts = append(posts, post) + if _, ok := seenPosts[post.ID]; !ok { + posts = append(posts, post) + seenPosts[post.ID] = true + } } if finished { @@ -250,6 +231,31 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) { } } + // Load hot posts + { + i := rand.Intn(len(watchers)) + watcher := watchers[i] + + acc, _ := sc.accountRepo.GetByID(ctx, watcher.AccountID) + rac := sc.reddit.NewAuthenticatedClient(acc.RefreshToken, acc.AccessToken) + sps, err := rac.SubredditHot( + subreddit.Name, + reddit.WithQuery("limit", "100"), + ) + + if err != nil { + for _, post := range sps.Children { + if post.CreatedAt < threshold { + break + } + if _, ok := seenPosts[post.ID]; !ok { + posts = append(posts, post) + seenPosts[post.ID] = true + } + } + } + } + for _, post := range posts { ids := []int64{}