mirror of
https://github.com/christianselig/apollo-backend
synced 2024-11-22 19:57:43 +00:00
check last 100 hot posts too
This commit is contained in:
parent
3361403379
commit
a5bd4c2ce4
2 changed files with 42 additions and 28 deletions
|
@ -197,8 +197,8 @@ func (rac *AuthenticatedClient) SubredditAbout(subreddit string, opts ...Request
|
||||||
return sr.(*SubredditResponse), nil
|
return sr.(*SubredditResponse), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rac *AuthenticatedClient) SubredditNew(subreddit string, opts ...RequestOption) (*ListingResponse, error) {
|
func (rac *AuthenticatedClient) subredditPosts(subreddit string, sort string, opts ...RequestOption) (*ListingResponse, error) {
|
||||||
url := fmt.Sprintf("https://oauth.reddit.com/r/%s/new.json", subreddit)
|
url := fmt.Sprintf("https://oauth.reddit.com/r/%s/%s.json", subreddit, sort)
|
||||||
opts = append([]RequestOption{
|
opts = append([]RequestOption{
|
||||||
WithMethod("GET"),
|
WithMethod("GET"),
|
||||||
WithToken(rac.accessToken),
|
WithToken(rac.accessToken),
|
||||||
|
@ -214,6 +214,14 @@ func (rac *AuthenticatedClient) SubredditNew(subreddit string, opts ...RequestOp
|
||||||
return lr.(*ListingResponse), nil
|
return lr.(*ListingResponse), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (rac *AuthenticatedClient) SubredditHot(subreddit string, opts ...RequestOption) (*ListingResponse, error) {
|
||||||
|
return rac.subredditPosts(subreddit, "hot", opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rac *AuthenticatedClient) SubredditNew(subreddit string, opts ...RequestOption) (*ListingResponse, error) {
|
||||||
|
return rac.subredditPosts(subreddit, "new", opts...)
|
||||||
|
}
|
||||||
|
|
||||||
func (rac *AuthenticatedClient) MessageInbox(opts ...RequestOption) (*ListingResponse, error) {
|
func (rac *AuthenticatedClient) MessageInbox(opts ...RequestOption) (*ListingResponse, error) {
|
||||||
opts = append([]RequestOption{
|
opts = append([]RequestOption{
|
||||||
WithTags([]string{"url:/api/v1/message/inbox"}),
|
WithTags([]string{"url:/api/v1/message/inbox"}),
|
||||||
|
|
|
@ -179,35 +179,14 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) {
|
||||||
posts := []*reddit.Thing{}
|
posts := []*reddit.Thing{}
|
||||||
before := ""
|
before := ""
|
||||||
finished := false
|
finished := false
|
||||||
|
seenPosts := map[string]bool{}
|
||||||
|
|
||||||
|
// Load 500 newest posts
|
||||||
for pages := 0; pages < 5; pages++ {
|
for pages := 0; pages < 5; pages++ {
|
||||||
i := rand.Intn(len(watchers))
|
i := rand.Intn(len(watchers))
|
||||||
watcher := watchers[i]
|
watcher := watchers[i]
|
||||||
|
|
||||||
dev, err := sc.deviceRepo.GetByID(ctx, watcher.DeviceID)
|
acc, _ := sc.accountRepo.GetByID(ctx, watcher.AccountID)
|
||||||
if err != nil {
|
|
||||||
sc.logger.WithFields(logrus.Fields{
|
|
||||||
"subreddit#id": subreddit.ID,
|
|
||||||
"watcher#id": watcher.ID,
|
|
||||||
"err": err,
|
|
||||||
}).Error("failed to fetch device for watcher from database")
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
accs, err := sc.accountRepo.GetByAPNSToken(ctx, dev.APNSToken)
|
|
||||||
if err != nil {
|
|
||||||
sc.logger.WithFields(logrus.Fields{
|
|
||||||
"subreddit#id": subreddit.ID,
|
|
||||||
"watcher#id": watcher.ID,
|
|
||||||
"device#id": dev.ID,
|
|
||||||
"err": err,
|
|
||||||
}).Error("failed to fetch accounts for device from database")
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
i = rand.Intn(len(accs))
|
|
||||||
acc := accs[i]
|
|
||||||
|
|
||||||
rac := sc.reddit.NewAuthenticatedClient(acc.RefreshToken, acc.AccessToken)
|
rac := sc.reddit.NewAuthenticatedClient(acc.RefreshToken, acc.AccessToken)
|
||||||
|
|
||||||
sps, err := rac.SubredditNew(
|
sps, err := rac.SubredditNew(
|
||||||
|
@ -220,7 +199,6 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) {
|
||||||
sc.logger.WithFields(logrus.Fields{
|
sc.logger.WithFields(logrus.Fields{
|
||||||
"subreddit#id": subreddit.ID,
|
"subreddit#id": subreddit.ID,
|
||||||
"watcher#id": watcher.ID,
|
"watcher#id": watcher.ID,
|
||||||
"device#id": dev.ID,
|
|
||||||
"err": err,
|
"err": err,
|
||||||
}).Error("failed to fetch posts")
|
}).Error("failed to fetch posts")
|
||||||
continue
|
continue
|
||||||
|
@ -242,7 +220,10 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if _, ok := seenPosts[post.ID]; !ok {
|
||||||
posts = append(posts, post)
|
posts = append(posts, post)
|
||||||
|
seenPosts[post.ID] = true
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if finished {
|
if finished {
|
||||||
|
@ -250,6 +231,31 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Load hot posts
|
||||||
|
{
|
||||||
|
i := rand.Intn(len(watchers))
|
||||||
|
watcher := watchers[i]
|
||||||
|
|
||||||
|
acc, _ := sc.accountRepo.GetByID(ctx, watcher.AccountID)
|
||||||
|
rac := sc.reddit.NewAuthenticatedClient(acc.RefreshToken, acc.AccessToken)
|
||||||
|
sps, err := rac.SubredditHot(
|
||||||
|
subreddit.Name,
|
||||||
|
reddit.WithQuery("limit", "100"),
|
||||||
|
)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
for _, post := range sps.Children {
|
||||||
|
if post.CreatedAt < threshold {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if _, ok := seenPosts[post.ID]; !ok {
|
||||||
|
posts = append(posts, post)
|
||||||
|
seenPosts[post.ID] = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
for _, post := range posts {
|
for _, post := range posts {
|
||||||
ids := []int64{}
|
ids := []int64{}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue