apollo-backend/internal/worker/stuck_notifications.go

288 lines
6.9 KiB
Go
Raw Permalink Normal View History

2021-10-17 14:17:41 +00:00
package worker
import (
	"context"
	"errors"
	"fmt"
	"os"
	"strconv"
	"time"

	"github.com/DataDog/datadog-go/statsd"
	"github.com/adjust/rmq/v5"
	"github.com/go-redis/redis/v8"
	"github.com/jackc/pgx/v4/pgxpool"
	"go.opentelemetry.io/otel/trace"
	"go.uber.org/zap"

	"github.com/christianselig/apollo-backend/internal/domain"
	"github.com/christianselig/apollo-backend/internal/reddit"
	"github.com/christianselig/apollo-backend/internal/repository"
)
type stuckNotificationsWorker struct {
context.Context
2022-05-23 18:17:25 +00:00
logger *zap.Logger
2022-11-03 17:43:10 +00:00
tracer trace.Tracer
2021-10-17 14:17:41 +00:00
statsd *statsd.Client
db *pgxpool.Pool
redis *redis.Client
queue rmq.Connection
reddit *reddit.Client
consumers int
accountRepo domain.AccountRepository
}
2022-11-03 17:43:10 +00:00
func NewStuckNotificationsWorker(ctx context.Context, logger *zap.Logger, tracer trace.Tracer, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, queue rmq.Connection, consumers int) Worker {
2021-10-17 14:17:41 +00:00
reddit := reddit.NewClient(
os.Getenv("REDDIT_CLIENT_ID"),
os.Getenv("REDDIT_CLIENT_SECRET"),
2022-11-03 17:43:10 +00:00
tracer,
2021-10-17 14:17:41 +00:00
statsd,
2022-03-12 17:50:05 +00:00
redis,
2021-10-17 14:17:41 +00:00
consumers,
)
return &stuckNotificationsWorker{
ctx,
2021-10-17 14:17:41 +00:00
logger,
2022-11-03 17:43:10 +00:00
tracer,
2021-10-17 14:17:41 +00:00
statsd,
db,
redis,
queue,
reddit,
consumers,
repository.NewPostgresAccount(db),
}
}
func (snw *stuckNotificationsWorker) Start() error {
queue, err := snw.queue.OpenQueue("stuck-notifications")
if err != nil {
return err
}
2022-05-23 18:17:25 +00:00
snw.logger.Info("starting up stuck notifications worker", zap.Int("consumers", snw.consumers))
2021-10-17 14:17:41 +00:00
prefetchLimit := int64(snw.consumers * 2)
if err := queue.StartConsuming(prefetchLimit, pollDuration); err != nil {
return err
}
host, _ := os.Hostname()
for i := 0; i < snw.consumers; i++ {
name := fmt.Sprintf("consumer %s-%d", host, i)
consumer := NewStuckNotificationsConsumer(snw, i)
if _, err := queue.AddConsumer(name, consumer); err != nil {
return err
}
}
return nil
}
// Stop asks the queue connection to stop all consuming and blocks until the
// returned channel signals, i.e. until all in-flight Consume() calls have
// finished.
func (snw *stuckNotificationsWorker) Stop() {
	finished := snw.queue.StopAllConsuming()
	<-finished
}
// stuckNotificationsConsumer is one consumer of the "stuck-notifications"
// queue.
type stuckNotificationsConsumer struct {
	// Embedded so the consumer can use the worker's dependencies (logger,
	// repositories, Reddit client, base context) directly.
	*stuckNotificationsWorker

	// tag is the index handed to NewStuckNotificationsConsumer.
	tag int
}
// NewStuckNotificationsConsumer returns a consumer that shares snw's
// dependencies and is labelled with tag.
func NewStuckNotificationsConsumer(snw *stuckNotificationsWorker, tag int) *stuckNotificationsConsumer {
	return &stuckNotificationsConsumer{
		stuckNotificationsWorker: snw,
		tag:                      tag,
	}
}
func (snc *stuckNotificationsConsumer) Consume(delivery rmq.Delivery) {
2022-10-27 00:46:17 +00:00
ctx, cancel := context.WithCancel(snc)
defer cancel()
2022-07-13 22:30:03 +00:00
now := time.Now()
defer func() {
elapsed := time.Now().Sub(now).Milliseconds()
_ = snc.statsd.Histogram("apollo.consumer.runtime", float64(elapsed), []string{"queue:stuck-notifications"}, 0.1)
}()
2021-10-17 14:17:41 +00:00
id, err := strconv.ParseInt(delivery.Payload(), 10, 64)
if err != nil {
2022-05-23 18:17:25 +00:00
snc.logger.Error("failed to parse account id from payload", zap.Error(err), zap.String("payload", delivery.Payload()))
2021-10-17 14:17:41 +00:00
_ = delivery.Reject()
return
}
2022-05-23 18:17:25 +00:00
snc.logger.Debug("starting job", zap.Int64("account#id", id))
2021-10-17 14:17:41 +00:00
defer func() { _ = delivery.Ack() }()
2022-10-27 00:46:17 +00:00
account, err := snc.accountRepo.GetByID(ctx, id)
2021-10-17 14:17:41 +00:00
if err != nil {
2022-05-23 18:17:25 +00:00
snc.logger.Error("failed to fetch account from database", zap.Error(err), zap.Int64("account#id", id))
2021-10-17 14:17:41 +00:00
return
}
if account.LastMessageID == "" {
2022-05-23 18:17:25 +00:00
snc.logger.Debug("account has no messages, bailing early",
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
)
2021-10-17 14:17:41 +00:00
return
}
2022-03-12 17:50:05 +00:00
rac := snc.reddit.NewAuthenticatedClient(account.AccountID, account.RefreshToken, account.AccessToken)
2021-10-17 14:17:41 +00:00
2022-05-23 18:17:25 +00:00
snc.logger.Debug("fetching last thing",
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
)
2021-10-17 14:17:41 +00:00
2021-10-17 14:47:43 +00:00
kind := account.LastMessageID[:2]
var things *reddit.ListingResponse
if kind == "t4" {
2022-05-23 18:17:25 +00:00
snc.logger.Debug("checking last thing via inbox",
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
)
2021-10-17 14:17:41 +00:00
2022-10-27 00:46:17 +00:00
things, err = rac.MessageInbox(ctx)
2021-10-17 14:47:43 +00:00
if err != nil {
2022-05-19 15:51:56 +00:00
if err != reddit.ErrRateLimited {
2022-05-23 18:17:25 +00:00
snc.logger.Error("failed to fetch last thing via inbox",
zap.Error(err),
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
)
2022-05-19 15:51:56 +00:00
}
2021-10-17 14:17:41 +00:00
return
}
2021-10-17 14:47:43 +00:00
} else {
2022-10-27 00:46:17 +00:00
things, err = rac.AboutInfo(ctx, account.LastMessageID)
2021-10-17 14:47:43 +00:00
if err != nil {
2022-05-23 18:17:25 +00:00
snc.logger.Error("failed to fetch last thing",
zap.Error(err),
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
)
2021-10-17 14:47:43 +00:00
return
}
}
2021-10-17 14:53:14 +00:00
if things.Count > 0 {
2021-10-17 14:47:43 +00:00
for _, thing := range things.Children {
if thing.FullName() != account.LastMessageID {
continue
}
2022-05-19 16:37:03 +00:00
if thing.IsDeleted() {
break
}
2022-05-19 17:02:16 +00:00
if kind == "t4" {
return
}
2022-10-27 00:46:17 +00:00
sthings, err := rac.MessageInbox(ctx)
2022-05-19 16:37:03 +00:00
if err != nil {
2022-05-23 18:17:25 +00:00
snc.logger.Error("failed to check inbox",
zap.Error(err),
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
)
2022-05-19 16:37:03 +00:00
return
}
2022-05-19 17:02:16 +00:00
found := false
for _, sthing := range sthings.Children {
if sthing.FullName() == account.LastMessageID {
found = true
}
}
if !found {
2022-05-23 18:17:25 +00:00
snc.logger.Debug("thing exists, but not on inbox, marking as deleted",
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
zap.String("thing#id", account.LastMessageID),
)
2022-05-19 16:37:03 +00:00
break
2021-10-17 14:47:43 +00:00
}
2022-05-19 16:37:03 +00:00
2022-05-23 18:17:25 +00:00
snc.logger.Debug("thing exists, bailing early",
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
zap.String("thing#id", account.LastMessageID),
)
2022-05-19 16:37:03 +00:00
return
2021-10-17 14:47:43 +00:00
}
2021-10-17 14:17:41 +00:00
}
2022-05-23 18:17:25 +00:00
snc.logger.Info("thing got deleted, resetting",
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
zap.String("thing#id", account.LastMessageID),
)
2021-10-17 14:17:41 +00:00
2021-10-17 14:47:43 +00:00
if kind != "t4" {
2022-05-23 18:17:25 +00:00
snc.logger.Debug("getting message inbox to find last good thing",
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
)
2021-10-17 15:48:41 +00:00
2022-10-27 00:46:17 +00:00
things, err = rac.MessageInbox(ctx)
2021-10-17 14:47:43 +00:00
if err != nil {
2022-05-23 18:17:25 +00:00
snc.logger.Error("failed to check inbox",
zap.Error(err),
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
)
2021-10-17 14:47:43 +00:00
return
}
2021-10-17 14:17:41 +00:00
}
account.LastMessageID = ""
2021-10-17 15:48:41 +00:00
2022-05-23 18:17:25 +00:00
snc.logger.Debug("calculating last good thing",
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
)
2021-10-17 14:47:43 +00:00
for _, thing := range things.Children {
2021-10-17 15:27:52 +00:00
if thing.IsDeleted() {
2022-05-23 18:17:25 +00:00
snc.logger.Debug("thing got deleted, checking next",
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
zap.String("thing#id", thing.FullName()),
)
2021-10-17 15:27:52 +00:00
continue
}
2021-10-17 15:48:41 +00:00
account.LastMessageID = thing.FullName()
break
2021-10-17 14:17:41 +00:00
}
2022-05-23 18:17:25 +00:00
snc.logger.Debug("updating last good thing",
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
zap.String("thing#id", account.LastMessageID),
)
2021-10-17 15:27:52 +00:00
2022-10-27 00:46:17 +00:00
if err := snc.accountRepo.Update(ctx, &account); err != nil {
2022-05-23 18:17:25 +00:00
snc.logger.Error("failed to update account's last message id",
zap.Error(err),
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
)
2021-10-17 14:17:41 +00:00
}
}