This commit is contained in:
Andre Medeiros 2021-09-25 14:02:00 -04:00
parent a5bd4c2ce4
commit 698c65b1f4
7 changed files with 137 additions and 33 deletions

View file

@ -29,7 +29,14 @@ func (a *api) createWatcherHandler(w http.ResponseWriter, r *http.Request) {
apns := vars["apns"] apns := vars["apns"]
redditID := vars["redditID"] redditID := vars["redditID"]
cwr := &createWatcherRequest{} cwr := &createWatcherRequest{
Criteria: watcherCriteria{
Upvotes: 0,
Keyword: "",
Flair: "",
Domain: "",
},
}
if err := json.NewDecoder(r.Body).Decode(cwr); err != nil { if err := json.NewDecoder(r.Body).Decode(cwr); err != nil {
a.errorResponse(w, r, 500, err.Error()) a.errorResponse(w, r, 500, err.Error())
return return
@ -69,15 +76,16 @@ func (a *api) createWatcherHandler(w http.ResponseWriter, r *http.Request) {
} }
sr, err := a.subredditRepo.GetByName(ctx, cwr.Subreddit) sr, err := a.subredditRepo.GetByName(ctx, cwr.Subreddit)
if err != nil {
switch err { switch err {
case domain.ErrNotFound: case domain.ErrNotFound:
// Might be that we don't know about that subreddit yet // Might be that we don't know about that subreddit yet
sr = domain.Subreddit{SubredditID: srr.ID, Name: srr.Name} sr = domain.Subreddit{SubredditID: srr.ID, Name: srr.Name}
_ = a.subredditRepo.CreateOrUpdate(ctx, &sr) _ = a.subredditRepo.CreateOrUpdate(ctx, &sr)
default: default:
a.errorResponse(w, r, 500, err.Error()) a.errorResponse(w, r, 500, err.Error())
return return
}
} }
watcher := domain.Watcher{ watcher := domain.Watcher{

View file

@ -3,7 +3,8 @@ package domain
import "context" import "context"
type Watcher struct { type Watcher struct {
ID int64 ID int64
CreatedAt float64
DeviceID int64 DeviceID int64
AccountID int64 AccountID int64

View file

@ -276,7 +276,7 @@ func (iapr *IAPResponse) handleAppleResponse() {
// For sandbox environment, be more lenient (just ensure bundle ID is accurate) because otherwise you'll break // For sandbox environment, be more lenient (just ensure bundle ID is accurate) because otherwise you'll break
// things for TestFlight users (see: https://twitter.com/ChristianSelig/status/1414990459861098496) // things for TestFlight users (see: https://twitter.com/ChristianSelig/status/1414990459861098496)
// TODO(andremedeiros): let this through for now // TODO(andremedeiros): let this through for now
if iapr.Environment == Sandbox && false { if iapr.Environment == Sandbox && true {
ultraProduct := VerificationProduct{Name: "ultra", Status: "SANDBOX", SubscriptionType: "SANDBOX"} ultraProduct := VerificationProduct{Name: "ultra", Status: "SANDBOX", SubscriptionType: "SANDBOX"}
proProduct := VerificationProduct{Name: "pro", Status: "SANDBOX"} proProduct := VerificationProduct{Name: "pro", Status: "SANDBOX"}
communityIconsProduct := VerificationProduct{Name: "community_icons", Status: "SANDBOX"} communityIconsProduct := VerificationProduct{Name: "community_icons", Status: "SANDBOX"}

View file

@ -206,7 +206,7 @@ func (rac *AuthenticatedClient) subredditPosts(subreddit string, sort string, op
}, opts...) }, opts...)
req := NewRequest(opts...) req := NewRequest(opts...)
lr, err := rac.request(req, NewListingResponse, EmptyListingResponse) lr, err := rac.request(req, NewListingResponse, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -87,6 +87,10 @@ func WithBody(key, val string) RequestOption {
} }
func WithQuery(key, val string) RequestOption { func WithQuery(key, val string) RequestOption {
if val == "" {
return func(req *Request) {}
}
return func(req *Request) { return func(req *Request) {
req.query.Set(key, val) req.query.Set(key, val)
} }

View file

@ -3,6 +3,7 @@ package repository
import ( import (
"context" "context"
"fmt" "fmt"
"time"
"github.com/jackc/pgx/v4/pgxpool" "github.com/jackc/pgx/v4/pgxpool"
@ -29,6 +30,7 @@ func (p *postgresWatcherRepository) fetch(ctx context.Context, query string, arg
var watcher domain.Watcher var watcher domain.Watcher
if err := rows.Scan( if err := rows.Scan(
&watcher.ID, &watcher.ID,
&watcher.CreatedAt,
&watcher.DeviceID, &watcher.DeviceID,
&watcher.AccountID, &watcher.AccountID,
&watcher.SubredditID, &watcher.SubredditID,
@ -46,7 +48,7 @@ func (p *postgresWatcherRepository) fetch(ctx context.Context, query string, arg
func (p *postgresWatcherRepository) GetByID(ctx context.Context, id int64) (domain.Watcher, error) { func (p *postgresWatcherRepository) GetByID(ctx context.Context, id int64) (domain.Watcher, error) {
query := ` query := `
SELECT id, device_id, account_id, subreddit_id, upvotes, keyword, flair, domain SELECT id, created_at, device_id, account_id, subreddit_id, upvotes, keyword, flair, domain
FROM watchers FROM watchers
WHERE id = $1` WHERE id = $1`
@ -63,7 +65,7 @@ func (p *postgresWatcherRepository) GetByID(ctx context.Context, id int64) (doma
func (p *postgresWatcherRepository) GetBySubredditID(ctx context.Context, id int64) ([]domain.Watcher, error) { func (p *postgresWatcherRepository) GetBySubredditID(ctx context.Context, id int64) ([]domain.Watcher, error) {
query := ` query := `
SELECT id, device_id, account_id, subreddit_id, upvotes, keyword, flair, domain SELECT id, created_at, device_id, account_id, subreddit_id, upvotes, keyword, flair, domain
FROM watchers FROM watchers
WHERE subreddit_id = $1` WHERE subreddit_id = $1`
@ -71,22 +73,25 @@ func (p *postgresWatcherRepository) GetBySubredditID(ctx context.Context, id int
} }
func (p *postgresWatcherRepository) Create(ctx context.Context, watcher *domain.Watcher) error { func (p *postgresWatcherRepository) Create(ctx context.Context, watcher *domain.Watcher) error {
now := float64(time.Now().UTC().Unix())
query := ` query := `
INSERT INTO watchers INSERT INTO watchers
(device_id, account_id, subreddit_id, upvotes, keyword, flair, domain) (created_at, device_id, account_id, subreddit_id, upvotes, keyword, flair, domain)
VALUES ($1, $2, $3, $4, $5, $6, $7) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
RETURNING id` RETURNING id`
return p.pool.QueryRow( return p.pool.QueryRow(
ctx, ctx,
query, query,
&watcher.DeviceID, now,
&watcher.AccountID, watcher.DeviceID,
&watcher.SubredditID, watcher.AccountID,
&watcher.Upvotes, watcher.SubredditID,
&watcher.Keyword, watcher.Upvotes,
&watcher.Flair, watcher.Keyword,
&watcher.Domain, watcher.Flair,
watcher.Domain,
).Scan(&watcher.ID) ).Scan(&watcher.ID)
} }

View file

@ -182,7 +182,18 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) {
seenPosts := map[string]bool{} seenPosts := map[string]bool{}
// Load 500 newest posts // Load 500 newest posts
for pages := 0; pages < 5; pages++ { sc.logger.WithFields(logrus.Fields{
"subreddit#id": subreddit.ID,
"subreddit#name": subreddit.Name,
}).Debug("loading up to 500 new posts")
for page := 0; page < 5; page++ {
sc.logger.WithFields(logrus.Fields{
"subreddit#id": subreddit.ID,
"subreddit#name": subreddit.Name,
"page": page,
}).Debug("loading new posts")
i := rand.Intn(len(watchers)) i := rand.Intn(len(watchers))
watcher := watchers[i] watcher := watchers[i]
@ -198,12 +209,18 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) {
if err != nil { if err != nil {
sc.logger.WithFields(logrus.Fields{ sc.logger.WithFields(logrus.Fields{
"subreddit#id": subreddit.ID, "subreddit#id": subreddit.ID,
"watcher#id": watcher.ID,
"err": err, "err": err,
}).Error("failed to fetch posts") }).Error("failed to fetch new posts")
continue continue
} }
sc.logger.WithFields(logrus.Fields{
"subreddit#id": subreddit.ID,
"subreddit#name": subreddit.Name,
"count": sps.Count,
"page": page,
}).Debug("loaded new posts for page")
// If it's empty, we're done // If it's empty, we're done
if sps.Count == 0 { if sps.Count == 0 {
break break
@ -227,11 +244,20 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) {
} }
if finished { if finished {
sc.logger.WithFields(logrus.Fields{
"subreddit#id": subreddit.ID,
"subreddit#name": subreddit.Name,
"page": page,
}).Debug("reached date threshold")
break break
} }
} }
// Load hot posts // Load hot posts
sc.logger.WithFields(logrus.Fields{
"subreddit#id": subreddit.ID,
"subreddit#name": subreddit.Name,
}).Debug("loading hot posts")
{ {
i := rand.Intn(len(watchers)) i := rand.Intn(len(watchers))
watcher := watchers[i] watcher := watchers[i]
@ -244,6 +270,17 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) {
) )
if err != nil { if err != nil {
sc.logger.WithFields(logrus.Fields{
"subreddit#id": subreddit.ID,
"err": err,
}).Error("failed to fetch hot posts")
} else {
sc.logger.WithFields(logrus.Fields{
"subreddit#id": subreddit.ID,
"subreddit#name": subreddit.Name,
"count": sps.Count,
}).Debug("loaded hot posts")
for _, post := range sps.Children { for _, post := range sps.Children {
if post.CreatedAt < threshold { if post.CreatedAt < threshold {
break break
@ -256,26 +293,63 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) {
} }
} }
sc.logger.WithFields(logrus.Fields{
"subreddit#id": subreddit.ID,
"subreddit#name": subreddit.Name,
"count": len(posts),
}).Debug("checking posts for hits")
for _, post := range posts { for _, post := range posts {
ids := []int64{} ids := []int64{}
for _, watcher := range watchers { for _, watcher := range watchers {
matched := (watcher.Upvotes == 0 || (watcher.Upvotes > 0 && post.Score > watcher.Upvotes)) && // Make sure we only alert on posts created after the search
(watcher.Keyword == "" || strings.Contains(post.SelfText, watcher.Keyword)) && if watcher.CreatedAt > post.CreatedAt {
(watcher.Flair == "" || strings.Contains(post.Flair, watcher.Flair)) && continue
(watcher.Domain == "" || strings.Contains(post.URL, watcher.Domain)) }
matched := true
if watcher.Upvotes > 0 && post.Score < watcher.Upvotes {
matched = false
}
if watcher.Keyword != "" && !strings.Contains(post.Title, watcher.Keyword) {
matched = false
}
if watcher.Flair != "" && !strings.Contains(post.Flair, watcher.Flair) {
matched = false
}
if watcher.Domain != "" && !strings.Contains(post.URL, watcher.Domain) {
matched = false
}
if !matched { if !matched {
continue continue
} }
lockKey := fmt.Sprintf("watcher:%d:%s", watcher.ID, post.ID) lockKey := fmt.Sprintf("watcher:%d:%s", watcher.DeviceID, post.ID)
notified, _ := sc.redis.Get(ctx, lockKey).Bool() notified, _ := sc.redis.Get(ctx, lockKey).Bool()
if notified { if notified {
sc.logger.WithFields(logrus.Fields{
"subreddit#id": subreddit.ID,
"subreddit#name": subreddit.Name,
"watcher#id": watcher.ID,
"post#id": post.ID,
}).Debug("already notified, skipping")
continue continue
} }
sc.logger.WithFields(logrus.Fields{
"subreddit#id": subreddit.ID,
"subreddit#name": subreddit.Name,
"watcher#id": watcher.ID,
"post#id": post.ID,
}).Debug("got a hit")
sc.redis.SetEX(ctx, lockKey, true, 24*time.Hour) sc.redis.SetEX(ctx, lockKey, true, 24*time.Hour)
ids = append(ids, watcher.DeviceID) ids = append(ids, watcher.DeviceID)
} }
@ -284,6 +358,13 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) {
continue continue
} }
sc.logger.WithFields(logrus.Fields{
"subreddit#id": subreddit.ID,
"subreddit#name": subreddit.Name,
"post#id": post.ID,
"count": len(ids),
}).Debug("got hits for post")
notification := &apns2.Notification{} notification := &apns2.Notification{}
notification.Topic = "com.christianselig.Apollo" notification.Topic = "com.christianselig.Apollo"
notification.Payload = payloadFromPost(post) notification.Payload = payloadFromPost(post)
@ -317,12 +398,17 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) {
} }
} }
} }
sc.logger.WithFields(logrus.Fields{
"subreddit#id": subreddit.ID,
"subreddit#name": subreddit.Name,
}).Debug("finishing job")
} }
func payloadFromPost(post *reddit.Thing) *payload.Payload { func payloadFromPost(post *reddit.Thing) *payload.Payload {
payload := payload. payload := payload.
NewPayload(). NewPayload().
AlertTitle("DING DONG"). AlertTitle(post.Title).
AlertBody("I got you something"). AlertBody("I got you something").
AlertSummaryArg(post.Subreddit). AlertSummaryArg(post.Subreddit).
Category("post-watch"). Category("post-watch").