2022-10-19 13:37:41 +00:00
|
|
|
package worker
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"encoding/json"
|
|
|
|
"fmt"
|
|
|
|
"os"
|
|
|
|
"sort"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/DataDog/datadog-go/statsd"
|
|
|
|
"github.com/go-redis/redis/v8"
|
|
|
|
"github.com/jackc/pgx/v4/pgxpool"
|
|
|
|
"github.com/sideshow/apns2"
|
|
|
|
"github.com/sideshow/apns2/token"
|
|
|
|
"go.uber.org/zap"
|
|
|
|
|
|
|
|
"github.com/christianselig/apollo-backend/internal/domain"
|
|
|
|
"github.com/christianselig/apollo-backend/internal/reddit"
|
|
|
|
"github.com/christianselig/apollo-backend/internal/repository"
|
|
|
|
)
|
|
|
|
|
|
|
|
type DynamicIslandNotification struct {
|
|
|
|
PostCommentCount int `json:"postTotalComments"`
|
|
|
|
PostScore int64 `json:"postScore"`
|
2022-10-28 01:54:30 +00:00
|
|
|
CommentID string `json:"commentId,omitempty"`
|
|
|
|
CommentAuthor string `json:"commentAuthor,omitempty"`
|
|
|
|
CommentBody string `json:"commentBody,omitempty"`
|
|
|
|
CommentAge int64 `json:"commentAge,omitempty"`
|
|
|
|
CommentScore int64 `json:"commentScore,omitempty"`
|
2022-10-19 13:37:41 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
type liveActivitiesWorker struct {
|
2022-11-02 04:49:20 +00:00
|
|
|
logger *zap.Logger
|
|
|
|
statsd *statsd.Client
|
|
|
|
db *pgxpool.Pool
|
|
|
|
redis *redis.Client
|
|
|
|
reddit *reddit.Client
|
|
|
|
apns *apns2.Client
|
2022-10-19 13:37:41 +00:00
|
|
|
liveActivityRepo domain.LiveActivityRepository
|
|
|
|
}
|
|
|
|
|
2022-11-02 04:49:20 +00:00
|
|
|
func NewLiveActivitiesWorker(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, consumers int) Worker {
|
2022-10-19 13:37:41 +00:00
|
|
|
reddit := reddit.NewClient(
|
|
|
|
os.Getenv("REDDIT_CLIENT_ID"),
|
|
|
|
os.Getenv("REDDIT_CLIENT_SECRET"),
|
|
|
|
statsd,
|
|
|
|
redis,
|
|
|
|
consumers,
|
|
|
|
)
|
|
|
|
|
2022-11-02 04:49:20 +00:00
|
|
|
var apns *apns2.Client
|
2022-10-19 13:37:41 +00:00
|
|
|
{
|
|
|
|
authKey, err := token.AuthKeyFromFile(os.Getenv("APPLE_KEY_PATH"))
|
|
|
|
if err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
|
2022-11-02 04:49:20 +00:00
|
|
|
tok := &token.Token{
|
2022-10-19 13:37:41 +00:00
|
|
|
AuthKey: authKey,
|
|
|
|
KeyID: os.Getenv("APPLE_KEY_ID"),
|
|
|
|
TeamID: os.Getenv("APPLE_TEAM_ID"),
|
|
|
|
}
|
2022-11-02 04:49:20 +00:00
|
|
|
apns = apns2.NewTokenClient(tok).Production()
|
2022-10-19 13:37:41 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return &liveActivitiesWorker{
|
|
|
|
logger,
|
|
|
|
statsd,
|
|
|
|
db,
|
|
|
|
redis,
|
|
|
|
reddit,
|
|
|
|
apns,
|
|
|
|
repository.NewPostgresLiveActivity(db),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-11-02 04:49:20 +00:00
|
|
|
func (law *liveActivitiesWorker) Process(ctx context.Context, args ...interface{}) error {
|
2022-10-27 19:47:26 +00:00
|
|
|
now := time.Now()
|
2022-10-19 13:37:41 +00:00
|
|
|
defer func() {
|
|
|
|
elapsed := time.Now().Sub(now).Milliseconds()
|
2022-11-02 04:49:20 +00:00
|
|
|
_ = law.statsd.Histogram("apollo.consumer.runtime", float64(elapsed), []string{"queue:live_activities"}, 0.1)
|
2022-10-19 13:37:41 +00:00
|
|
|
}()
|
|
|
|
|
2022-11-02 04:49:20 +00:00
|
|
|
at := args[0].(string)
|
2022-10-19 13:37:41 +00:00
|
|
|
key := fmt.Sprintf("locks:live-activities:%s", at)
|
|
|
|
|
|
|
|
// Measure queue latency
|
2022-11-02 04:49:20 +00:00
|
|
|
ttl := law.redis.PTTL(ctx, key).Val()
|
2022-10-19 13:37:41 +00:00
|
|
|
age := (domain.NotificationCheckTimeout - ttl)
|
2022-11-02 04:49:20 +00:00
|
|
|
_ = law.statsd.Histogram("apollo.dequeue.latency", float64(age.Milliseconds()), []string{"queue:live_activities"}, 0.1)
|
2022-10-19 13:37:41 +00:00
|
|
|
|
|
|
|
defer func() {
|
2022-11-02 04:49:20 +00:00
|
|
|
if err := law.redis.Del(ctx, key).Err(); err != nil {
|
|
|
|
law.logger.Error("failed to remove account lock", zap.Error(err), zap.String("key", key))
|
2022-10-19 13:37:41 +00:00
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
2022-11-02 04:49:20 +00:00
|
|
|
law.logger.Debug("starting job", zap.String("live_activity#apns_token", at))
|
2022-10-19 13:37:41 +00:00
|
|
|
|
2022-11-02 04:49:20 +00:00
|
|
|
la, err := law.liveActivityRepo.Get(ctx, at)
|
2022-10-19 13:37:41 +00:00
|
|
|
if err != nil {
|
2022-11-02 04:49:20 +00:00
|
|
|
law.logger.Error("failed to get live activity", zap.Error(err), zap.String("live_activity#apns_token", at))
|
|
|
|
return err
|
2022-10-19 13:37:41 +00:00
|
|
|
}
|
|
|
|
|
2022-11-02 04:49:20 +00:00
|
|
|
rac := law.reddit.NewAuthenticatedClient(la.RedditAccountID, la.RefreshToken, la.AccessToken)
|
2022-10-19 13:37:41 +00:00
|
|
|
if la.TokenExpiresAt.Before(now.Add(5 * time.Minute)) {
|
2022-11-02 04:49:20 +00:00
|
|
|
law.logger.Debug("refreshing reddit token",
|
2022-10-19 13:37:41 +00:00
|
|
|
zap.String("live_activity#apns_token", at),
|
2022-10-26 22:28:11 +00:00
|
|
|
zap.String("reddit#id", la.RedditAccountID),
|
|
|
|
zap.String("reddit#access_token", rac.ObfuscatedAccessToken()),
|
|
|
|
zap.String("reddit#refresh_token", rac.ObfuscatedRefreshToken()),
|
2022-10-19 13:37:41 +00:00
|
|
|
)
|
|
|
|
|
2022-10-27 00:43:44 +00:00
|
|
|
tokens, err := rac.RefreshTokens(ctx)
|
2022-10-19 13:37:41 +00:00
|
|
|
if err != nil {
|
2022-11-02 04:49:20 +00:00
|
|
|
law.logger.Error("failed to refresh reddit tokens",
|
2022-10-26 22:28:11 +00:00
|
|
|
zap.Error(err),
|
|
|
|
zap.String("live_activity#apns_token", at),
|
|
|
|
zap.String("reddit#id", la.RedditAccountID),
|
|
|
|
zap.String("reddit#access_token", rac.ObfuscatedAccessToken()),
|
|
|
|
zap.String("reddit#refresh_token", rac.ObfuscatedRefreshToken()),
|
|
|
|
)
|
2022-10-26 23:00:40 +00:00
|
|
|
if err == reddit.ErrOauthRevoked {
|
2022-11-02 04:49:20 +00:00
|
|
|
_ = law.liveActivityRepo.Delete(ctx, at)
|
|
|
|
return nil
|
2022-10-26 23:00:40 +00:00
|
|
|
}
|
2022-11-02 04:49:20 +00:00
|
|
|
|
|
|
|
return err
|
2022-10-19 13:37:41 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Update account
|
|
|
|
la.AccessToken = tokens.AccessToken
|
|
|
|
la.RefreshToken = tokens.RefreshToken
|
|
|
|
la.TokenExpiresAt = now.Add(tokens.Expiry)
|
2022-11-02 04:49:20 +00:00
|
|
|
_ = law.liveActivityRepo.Update(ctx, &la)
|
2022-10-19 13:37:41 +00:00
|
|
|
|
|
|
|
// Refresh client
|
2022-11-02 04:49:20 +00:00
|
|
|
rac = law.reddit.NewAuthenticatedClient(la.RedditAccountID, tokens.RefreshToken, tokens.AccessToken)
|
2022-10-19 13:37:41 +00:00
|
|
|
}
|
|
|
|
|
2022-11-02 04:49:20 +00:00
|
|
|
law.logger.Debug("fetching latest comments", zap.String("live_activity#apns_token", at))
|
2022-10-19 13:37:41 +00:00
|
|
|
|
2022-10-27 00:43:44 +00:00
|
|
|
tr, err := rac.TopLevelComments(ctx, la.Subreddit, la.ThreadID)
|
2022-10-19 13:37:41 +00:00
|
|
|
if err != nil {
|
2022-11-02 04:49:20 +00:00
|
|
|
law.logger.Error("failed to fetch latest comments",
|
2022-10-19 13:37:41 +00:00
|
|
|
zap.Error(err),
|
|
|
|
zap.String("live_activity#apns_token", at),
|
2022-10-26 22:28:11 +00:00
|
|
|
zap.String("reddit#id", la.RedditAccountID),
|
|
|
|
zap.String("reddit#access_token", rac.ObfuscatedAccessToken()),
|
|
|
|
zap.String("reddit#refresh_token", rac.ObfuscatedRefreshToken()),
|
2022-10-19 13:37:41 +00:00
|
|
|
)
|
2022-10-26 23:00:40 +00:00
|
|
|
if err == reddit.ErrOauthRevoked {
|
2022-11-02 04:49:20 +00:00
|
|
|
_ = law.liveActivityRepo.Delete(ctx, at)
|
|
|
|
return nil
|
2022-10-26 23:00:40 +00:00
|
|
|
}
|
2022-11-02 04:49:20 +00:00
|
|
|
|
|
|
|
return err
|
2022-10-19 13:37:41 +00:00
|
|
|
}
|
|
|
|
|
2022-10-28 01:52:06 +00:00
|
|
|
if len(tr.Children) == 0 && la.ExpiresAt.After(now) {
|
2022-11-02 04:49:20 +00:00
|
|
|
law.logger.Debug("no comments found", zap.String("live_activity#apns_token", at))
|
|
|
|
return nil
|
2022-10-19 13:37:41 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Filter out comments in the last minute
|
|
|
|
candidates := make([]*reddit.Thing, 0)
|
2022-10-26 23:31:16 +00:00
|
|
|
cutoffs := []time.Time{
|
|
|
|
now.Add(-domain.LiveActivityCheckInterval),
|
|
|
|
now.Add(-domain.LiveActivityCheckInterval * 2),
|
|
|
|
now.Add(-domain.LiveActivityCheckInterval * 4),
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, cutoff := range cutoffs {
|
|
|
|
for _, t := range tr.Children {
|
|
|
|
if t.CreatedAt.After(cutoff) {
|
|
|
|
candidates = append(candidates, t)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if len(candidates) > 0 {
|
|
|
|
break
|
2022-10-19 13:37:41 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-10-27 13:51:07 +00:00
|
|
|
if len(candidates) == 0 && la.ExpiresAt.After(now) {
|
2022-11-02 04:49:20 +00:00
|
|
|
law.logger.Debug("no new comments found", zap.String("live_activity#apns_token", at))
|
|
|
|
return nil
|
2022-10-19 13:37:41 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
sort.Slice(candidates, func(i, j int) bool {
|
|
|
|
return candidates[i].Score > candidates[j].Score
|
|
|
|
})
|
|
|
|
|
|
|
|
din := DynamicIslandNotification{
|
|
|
|
PostCommentCount: tr.Post.NumComments,
|
|
|
|
PostScore: tr.Post.Score,
|
2022-10-27 13:51:07 +00:00
|
|
|
}
|
|
|
|
|
2022-10-28 01:52:06 +00:00
|
|
|
if len(candidates) > 0 {
|
2022-10-27 13:51:07 +00:00
|
|
|
comment := candidates[0]
|
|
|
|
|
|
|
|
din.CommentID = comment.ID
|
|
|
|
din.CommentAuthor = comment.Author
|
|
|
|
din.CommentBody = comment.Body
|
|
|
|
din.CommentAge = comment.CreatedAt.Unix()
|
|
|
|
din.CommentScore = comment.Score
|
2022-10-19 13:37:41 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
ev := "update"
|
|
|
|
if la.ExpiresAt.Before(now) {
|
|
|
|
ev = "end"
|
|
|
|
}
|
|
|
|
|
2022-10-28 01:52:06 +00:00
|
|
|
bb, _ := json.Marshal(map[string]interface{}{
|
2022-10-19 13:37:41 +00:00
|
|
|
"aps": map[string]interface{}{
|
2022-10-31 17:35:01 +00:00
|
|
|
"content-state": din,
|
|
|
|
"dismissal-date": la.ExpiresAt.Unix(),
|
|
|
|
"event": ev,
|
|
|
|
"timestamp": now.Unix(),
|
2022-10-19 13:37:41 +00:00
|
|
|
},
|
2022-10-28 01:52:06 +00:00
|
|
|
})
|
2022-10-19 13:37:41 +00:00
|
|
|
|
|
|
|
notification := &apns2.Notification{
|
|
|
|
DeviceToken: la.APNSToken,
|
|
|
|
Topic: "com.christianselig.Apollo.push-type.liveactivity",
|
|
|
|
PushType: "liveactivity",
|
|
|
|
Payload: bb,
|
|
|
|
}
|
|
|
|
|
2022-11-02 04:49:20 +00:00
|
|
|
res, err := law.apns.PushWithContext(ctx, notification)
|
2022-10-19 13:37:41 +00:00
|
|
|
if err != nil {
|
2022-11-02 04:49:20 +00:00
|
|
|
_ = law.statsd.Incr("apns.live_activities.errors", []string{}, 1)
|
|
|
|
law.logger.Error("failed to send notification",
|
2022-10-19 13:37:41 +00:00
|
|
|
zap.Error(err),
|
|
|
|
zap.String("live_activity#apns_token", at),
|
2022-10-27 16:27:19 +00:00
|
|
|
zap.Bool("live_activity#sandbox", la.Sandbox),
|
2022-10-27 13:51:07 +00:00
|
|
|
zap.String("notification#type", ev),
|
2022-10-19 13:37:41 +00:00
|
|
|
)
|
|
|
|
|
2022-11-02 04:49:20 +00:00
|
|
|
_ = law.liveActivityRepo.Delete(ctx, at)
|
2022-10-19 13:37:41 +00:00
|
|
|
} else if !res.Sent() {
|
2022-11-02 04:49:20 +00:00
|
|
|
_ = law.statsd.Incr("apns.live_activities.errors", []string{}, 1)
|
|
|
|
law.logger.Error("notification not sent",
|
2022-10-19 13:37:41 +00:00
|
|
|
zap.String("live_activity#apns_token", at),
|
2022-10-27 16:27:19 +00:00
|
|
|
zap.Bool("live_activity#sandbox", la.Sandbox),
|
2022-10-27 13:51:07 +00:00
|
|
|
zap.String("notification#type", ev),
|
2022-10-19 13:37:41 +00:00
|
|
|
zap.Int("response#status", res.StatusCode),
|
|
|
|
zap.String("response#reason", res.Reason),
|
|
|
|
)
|
|
|
|
|
2022-11-02 04:49:20 +00:00
|
|
|
_ = law.liveActivityRepo.Delete(ctx, at)
|
2022-10-19 13:37:41 +00:00
|
|
|
} else {
|
2022-11-02 04:49:20 +00:00
|
|
|
_ = law.statsd.Incr("apns.notification.sent", []string{}, 1)
|
|
|
|
law.logger.Debug("sent notification",
|
2022-10-19 13:37:41 +00:00
|
|
|
zap.String("live_activity#apns_token", at),
|
2022-10-27 16:27:19 +00:00
|
|
|
zap.Bool("live_activity#sandbox", la.Sandbox),
|
2022-10-27 13:51:07 +00:00
|
|
|
zap.String("notification#type", ev),
|
2022-10-19 13:37:41 +00:00
|
|
|
)
|
|
|
|
}
|
|
|
|
|
|
|
|
if la.ExpiresAt.Before(now) {
|
2022-11-02 04:49:20 +00:00
|
|
|
law.logger.Debug("live activity expired, deleting", zap.String("live_activity#apns_token", at))
|
|
|
|
_ = law.liveActivityRepo.Delete(ctx, at)
|
2022-10-19 13:37:41 +00:00
|
|
|
}
|
|
|
|
|
2022-11-02 04:49:20 +00:00
|
|
|
law.logger.Debug("finishing job",
|
2022-10-19 13:37:41 +00:00
|
|
|
zap.String("live_activity#apns_token", at),
|
|
|
|
)
|
2022-11-02 04:49:20 +00:00
|
|
|
return nil
|
2022-10-19 13:37:41 +00:00
|
|
|
}
|