From 698c65b1f44bd197f6d59224a3084b996ae32792 Mon Sep 17 00:00:00 2001 From: Andre Medeiros Date: Sat, 25 Sep 2021 14:02:00 -0400 Subject: [PATCH] tweaks --- internal/api/watcher.go | 28 ++++--- internal/domain/watcher.go | 3 +- internal/itunes/receipt.go | 2 +- internal/reddit/client.go | 2 +- internal/reddit/request.go | 4 + internal/repository/postgres_watcher.go | 27 +++--- internal/worker/subreddits.go | 104 ++++++++++++++++++++++-- 7 files changed, 137 insertions(+), 33 deletions(-) diff --git a/internal/api/watcher.go b/internal/api/watcher.go index ead5ab8..f17c306 100644 --- a/internal/api/watcher.go +++ b/internal/api/watcher.go @@ -29,7 +29,14 @@ func (a *api) createWatcherHandler(w http.ResponseWriter, r *http.Request) { apns := vars["apns"] redditID := vars["redditID"] - cwr := &createWatcherRequest{} + cwr := &createWatcherRequest{ + Criteria: watcherCriteria{ + Upvotes: 0, + Keyword: "", + Flair: "", + Domain: "", + }, + } if err := json.NewDecoder(r.Body).Decode(cwr); err != nil { a.errorResponse(w, r, 500, err.Error()) return @@ -69,15 +76,16 @@ func (a *api) createWatcherHandler(w http.ResponseWriter, r *http.Request) { } sr, err := a.subredditRepo.GetByName(ctx, cwr.Subreddit) - - switch err { - case domain.ErrNotFound: - // Might be that we don't know about that subreddit yet - sr = domain.Subreddit{SubredditID: srr.ID, Name: srr.Name} - _ = a.subredditRepo.CreateOrUpdate(ctx, &sr) - default: - a.errorResponse(w, r, 500, err.Error()) - return + if err != nil { + switch err { + case domain.ErrNotFound: + // Might be that we don't know about that subreddit yet + sr = domain.Subreddit{SubredditID: srr.ID, Name: srr.Name} + _ = a.subredditRepo.CreateOrUpdate(ctx, &sr) + default: + a.errorResponse(w, r, 500, err.Error()) + return + } } watcher := domain.Watcher{ diff --git a/internal/domain/watcher.go b/internal/domain/watcher.go index f0e513f..f32254e 100644 --- a/internal/domain/watcher.go +++ b/internal/domain/watcher.go @@ -3,7 +3,8 @@ package domain import "context" type Watcher struct { - ID int64 + ID int64 + CreatedAt float64 DeviceID int64 AccountID int64 diff --git a/internal/itunes/receipt.go b/internal/itunes/receipt.go index f6891ea..6156e89 100644 --- a/internal/itunes/receipt.go +++ b/internal/itunes/receipt.go @@ -276,7 +276,7 @@ func (iapr *IAPResponse) handleAppleResponse() { // For sandbox environment, be more lenient (just ensure bundle ID is accurate) because otherwise you'll break // things for TestFlight users (see: https://twitter.com/ChristianSelig/status/1414990459861098496) // TODO(andremedeiros): let this through for now - if iapr.Environment == Sandbox && false { + if iapr.Environment == Sandbox && true { ultraProduct := VerificationProduct{Name: "ultra", Status: "SANDBOX", SubscriptionType: "SANDBOX"} proProduct := VerificationProduct{Name: "pro", Status: "SANDBOX"} communityIconsProduct := VerificationProduct{Name: "community_icons", Status: "SANDBOX"} diff --git a/internal/reddit/client.go b/internal/reddit/client.go index e5dc574..1550d6e 100644 --- a/internal/reddit/client.go +++ b/internal/reddit/client.go @@ -206,7 +206,7 @@ func (rac *AuthenticatedClient) subredditPosts(subreddit string, sort string, op }, opts...) req := NewRequest(opts...) - lr, err := rac.request(req, NewListingResponse, EmptyListingResponse) + lr, err := rac.request(req, NewListingResponse, nil) if err != nil { return nil, err } diff --git a/internal/reddit/request.go b/internal/reddit/request.go index 47aff4a..0e233b5 100644 --- a/internal/reddit/request.go +++ b/internal/reddit/request.go @@ -87,6 +87,10 @@ func WithBody(key, val string) RequestOption { } func WithQuery(key, val string) RequestOption { + if val == "" { + return func(req *Request) {} + } + return func(req *Request) { req.query.Set(key, val) } diff --git a/internal/repository/postgres_watcher.go b/internal/repository/postgres_watcher.go index e2a0931..34ef28d 100644 --- a/internal/repository/postgres_watcher.go +++ b/internal/repository/postgres_watcher.go @@ -3,6 +3,7 @@ package repository import ( "context" "fmt" + "time" "github.com/jackc/pgx/v4/pgxpool" @@ -29,6 +30,7 @@ func (p *postgresWatcherRepository) fetch(ctx context.Context, query string, arg var watcher domain.Watcher if err := rows.Scan( &watcher.ID, + &watcher.CreatedAt, &watcher.DeviceID, &watcher.AccountID, &watcher.SubredditID, @@ -46,7 +48,7 @@ func (p *postgresWatcherRepository) fetch(ctx context.Context, query string, arg func (p *postgresWatcherRepository) GetByID(ctx context.Context, id int64) (domain.Watcher, error) { query := ` - SELECT id, device_id, account_id, subreddit_id, upvotes, keyword, flair, domain + SELECT id, created_at, device_id, account_id, subreddit_id, upvotes, keyword, flair, domain FROM watchers WHERE id = $1` @@ -63,7 +65,7 @@ func (p *postgresWatcherRepository) GetByID(ctx context.Context, id int64) (doma func (p *postgresWatcherRepository) GetBySubredditID(ctx context.Context, id int64) ([]domain.Watcher, error) { query := ` - SELECT id, device_id, account_id, subreddit_id, upvotes, keyword, flair, domain + SELECT id, created_at, device_id, account_id, subreddit_id, upvotes, keyword, flair, domain FROM watchers WHERE subreddit_id = $1` @@ -71,22 +73,25 @@ func (p *postgresWatcherRepository) GetBySubredditID(ctx context.Context, id int } func (p *postgresWatcherRepository) Create(ctx context.Context, watcher *domain.Watcher) error { + now := float64(time.Now().UTC().Unix()) + query := ` INSERT INTO watchers - (device_id, account_id, subreddit_id, upvotes, keyword, flair, domain) - VALUES ($1, $2, $3, $4, $5, $6, $7) + (created_at, device_id, account_id, subreddit_id, upvotes, keyword, flair, domain) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) RETURNING id` return p.pool.QueryRow( ctx, query, - &watcher.DeviceID, - &watcher.AccountID, - &watcher.SubredditID, - &watcher.Upvotes, - &watcher.Keyword, - &watcher.Flair, - &watcher.Domain, + now, + watcher.DeviceID, + watcher.AccountID, + watcher.SubredditID, + watcher.Upvotes, + watcher.Keyword, + watcher.Flair, + watcher.Domain, ).Scan(&watcher.ID) } diff --git a/internal/worker/subreddits.go b/internal/worker/subreddits.go index c1ee802..89f8292 100644 --- a/internal/worker/subreddits.go +++ b/internal/worker/subreddits.go @@ -182,7 +182,18 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) { seenPosts := map[string]bool{} // Load 500 newest posts - for pages := 0; pages < 5; pages++ { + sc.logger.WithFields(logrus.Fields{ + "subreddit#id": subreddit.ID, + "subreddit#name": subreddit.Name, + }).Debug("loading up to 500 new posts") + + for page := 0; page < 5; page++ { + sc.logger.WithFields(logrus.Fields{ + "subreddit#id": subreddit.ID, + "subreddit#name": subreddit.Name, + "page": page, + }).Debug("loading new posts") + i := rand.Intn(len(watchers)) watcher := watchers[i] @@ -198,12 +209,18 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) { if err != nil { sc.logger.WithFields(logrus.Fields{ "subreddit#id": subreddit.ID, - "watcher#id": watcher.ID, "err": err, - }).Error("failed to fetch posts") + }).Error("failed to fetch new posts") continue } + sc.logger.WithFields(logrus.Fields{ + "subreddit#id": subreddit.ID, + "subreddit#name": subreddit.Name, + "count": sps.Count, + "page": page, + }).Debug("loaded new posts for page") + // If it's empty, we're done if sps.Count == 0 { break @@ -227,11 +244,20 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) { } if finished { + sc.logger.WithFields(logrus.Fields{ + "subreddit#id": subreddit.ID, + "subreddit#name": subreddit.Name, + "page": page, + }).Debug("reached date threshold") break } } // Load hot posts + sc.logger.WithFields(logrus.Fields{ + "subreddit#id": subreddit.ID, + "subreddit#name": subreddit.Name, + }).Debug("loading hot posts") { i := rand.Intn(len(watchers)) watcher := watchers[i] @@ -244,6 +270,17 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) { ) if err != nil { + sc.logger.WithFields(logrus.Fields{ + "subreddit#id": subreddit.ID, + "err": err, + }).Error("failed to fetch hot posts") + } else { + sc.logger.WithFields(logrus.Fields{ + "subreddit#id": subreddit.ID, + "subreddit#name": subreddit.Name, + "count": sps.Count, + }).Debug("loaded hot posts") + for _, post := range sps.Children { if post.CreatedAt < threshold { break @@ -256,26 +293,63 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) { } } + sc.logger.WithFields(logrus.Fields{ + "subreddit#id": subreddit.ID, + "subreddit#name": subreddit.Name, + "count": len(posts), + }).Debug("checking posts for hits") for _, post := range posts { ids := []int64{} for _, watcher := range watchers { - matched := (watcher.Upvotes == 0 || (watcher.Upvotes > 0 && post.Score > watcher.Upvotes)) && - (watcher.Keyword == "" || strings.Contains(post.SelfText, watcher.Keyword)) && - (watcher.Flair == "" || strings.Contains(post.Flair, watcher.Flair)) && - (watcher.Domain == "" || strings.Contains(post.URL, watcher.Domain)) + // Make sure we only alert on posts created after the search + if watcher.CreatedAt > post.CreatedAt { + continue + } + + matched := true + + if watcher.Upvotes > 0 && post.Score < watcher.Upvotes { + matched = false + } + + if watcher.Keyword != "" && !strings.Contains(post.Title, watcher.Keyword) { + matched = false + } + + if watcher.Flair != "" && !strings.Contains(post.Flair, watcher.Flair) { + matched = false + } + + if watcher.Domain != "" && !strings.Contains(post.URL, watcher.Domain) { + matched = false + } if !matched { continue } - lockKey := fmt.Sprintf("watcher:%d:%s", watcher.ID, post.ID) + lockKey := fmt.Sprintf("watcher:%d:%s", watcher.DeviceID, post.ID) notified, _ := sc.redis.Get(ctx, lockKey).Bool() if notified { + sc.logger.WithFields(logrus.Fields{ + "subreddit#id": subreddit.ID, + "subreddit#name": subreddit.Name, + "watcher#id": watcher.ID, + "post#id": post.ID, + }).Debug("already notified, skipping") + continue } + sc.logger.WithFields(logrus.Fields{ + "subreddit#id": subreddit.ID, + "subreddit#name": subreddit.Name, + "watcher#id": watcher.ID, + "post#id": post.ID, + }).Debug("got a hit") + sc.redis.SetEX(ctx, lockKey, true, 24*time.Hour) ids = append(ids, watcher.DeviceID) } @@ -284,6 +358,13 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) { continue } + sc.logger.WithFields(logrus.Fields{ + "subreddit#id": subreddit.ID, + "subreddit#name": subreddit.Name, + "post#id": post.ID, + "count": len(ids), + }).Debug("got hits for post") + notification := &apns2.Notification{} notification.Topic = "com.christianselig.Apollo" notification.Payload = payloadFromPost(post) @@ -317,12 +398,17 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) { } } } + + sc.logger.WithFields(logrus.Fields{ + "subreddit#id": subreddit.ID, + "subreddit#name": subreddit.Name, + }).Debug("finishing job") } func payloadFromPost(post *reddit.Thing) *payload.Payload { payload := payload. NewPayload(). - AlertTitle("DING DONG"). + AlertTitle(post.Title). AlertBody("I got you something"). AlertSummaryArg(post.Subreddit). Category("post-watch").