apollo-backend/internal/worker/stuck_notifications.go
Andre Medeiros d27f4604b9 fix casting
2022-11-02 01:11:49 -04:00

217 lines
5.5 KiB
Go

package worker
import (
"context"
"os"
"time"
"github.com/DataDog/datadog-go/statsd"
"github.com/go-redis/redis/v8"
"github.com/jackc/pgx/v4/pgxpool"
"go.uber.org/zap"
"github.com/christianselig/apollo-backend/internal/domain"
"github.com/christianselig/apollo-backend/internal/reddit"
"github.com/christianselig/apollo-backend/internal/repository"
)
// stuckNotificationsWorker bundles the dependencies needed by Process to
// detect an account whose last seen message no longer exists and to reset it.
type stuckNotificationsWorker struct {
// logger receives the structured debug/info/error output emitted by Process.
logger *zap.Logger
// statsd is used by Process to report the per-job runtime histogram.
statsd *statsd.Client
// db is the Postgres pool handed to the constructor; it also backs accountRepo.
db *pgxpool.Pool
// redis is the client handed to the constructor (also passed to the reddit client).
redis *redis.Client
// reddit is the base API client from which per-account authenticated clients are derived.
reddit *reddit.Client
// accountRepo loads the account for a job and persists its updated LastMessageID.
accountRepo domain.AccountRepository
}
// NewStuckNotificationsWorker wires up a stuck-notifications worker.
//
// The Reddit client is created from the REDDIT_CLIENT_ID and
// REDDIT_CLIENT_SECRET environment variables and receives statsd, redis and
// consumers unchanged. The account repository is backed by db. ctx is accepted
// for signature compatibility and is not used here.
func NewStuckNotificationsWorker(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, consumers int) Worker {
	clientID := os.Getenv("REDDIT_CLIENT_ID")
	clientSecret := os.Getenv("REDDIT_CLIENT_SECRET")

	// Named redditClient so the local does not shadow the reddit package.
	redditClient := reddit.NewClient(clientID, clientSecret, statsd, redis, consumers)

	return &stuckNotificationsWorker{
		logger:      logger,
		statsd:      statsd,
		db:          db,
		redis:       redis,
		reddit:      redditClient,
		accountRepo: repository.NewPostgresAccount(db),
	}
}
// Process verifies that the account's LastMessageID still refers to a thing
// that exists and is visible in the inbox; when it does not, LastMessageID is
// rewound to the first non-deleted thing in the inbox listing (or cleared when
// there is none) and the account is saved.
//
// args[0] must be the account ID as a float64 (presumably because the job
// payload was decoded from JSON — confirm against the producer). A missing or
// mistyped argument is logged and the job is dropped instead of panicking.
//
// The job runtime is always reported to statsd. The return value is nil on
// every path except a failed accountRepo.Update, whose error is returned;
// lookup and Reddit API failures are logged and swallowed.
func (snw *stuckNotificationsWorker) Process(ctx context.Context, args ...interface{}) error {
	start := time.Now()
	defer func() {
		elapsed := time.Since(start).Milliseconds()
		_ = snw.statsd.Histogram("apollo.consumer.runtime", float64(elapsed), []string{"queue:stuck-notifications"}, 0.1)
	}()

	// Validate the payload up front: an unchecked args[0].(float64) would
	// panic on an empty or malformed job.
	if len(args) == 0 {
		snw.logger.Error("missing account id argument")
		return nil
	}
	rawID, ok := args[0].(float64)
	if !ok {
		snw.logger.Error("unexpected account id argument", zap.Any("arg", args[0]))
		return nil
	}
	id := int64(rawID)

	snw.logger.Debug("starting job", zap.Int64("account#id", id))

	account, err := snw.accountRepo.GetByID(ctx, id)
	if err != nil {
		snw.logger.Error("failed to fetch account from database", zap.Error(err), zap.Int64("account#id", id))
		return nil
	}

	if account.LastMessageID == "" {
		snw.logger.Debug("account has no messages, bailing early",
			zap.Int64("account#id", id),
			zap.String("account#username", account.NormalizedUsername()),
		)
		return nil
	}

	rac := snw.reddit.NewAuthenticatedClient(account.AccountID, account.RefreshToken, account.AccessToken)

	snw.logger.Debug("fetching last thing",
		zap.Int64("account#id", id),
		zap.String("account#username", account.NormalizedUsername()),
	)

	// "t4"-prefixed IDs are looked up through the inbox; everything else goes
	// through AboutInfo. The length check keeps a one-character ID from
	// panicking on the slice expression.
	isMessage := len(account.LastMessageID) >= 2 && account.LastMessageID[:2] == "t4"

	var things *reddit.ListingResponse
	if isMessage {
		snw.logger.Debug("checking last thing via inbox",
			zap.Int64("account#id", id),
			zap.String("account#username", account.NormalizedUsername()),
		)

		things, err = rac.MessageInbox(ctx)
		if err != nil {
			// Rate limiting is dropped without an error log; anything else is logged.
			if err != reddit.ErrRateLimited {
				snw.logger.Error("failed to fetch last thing via inbox",
					zap.Error(err),
					zap.Int64("account#id", id),
					zap.String("account#username", account.NormalizedUsername()),
				)
			}
			return nil
		}
	} else {
		things, err = rac.AboutInfo(ctx, account.LastMessageID)
		if err != nil {
			snw.logger.Error("failed to fetch last thing",
				zap.Error(err),
				zap.Int64("account#id", id),
				zap.String("account#username", account.NormalizedUsername()),
			)
			return nil
		}
	}

	if things.Count > 0 {
		for _, thing := range things.Children {
			if thing.FullName() != account.LastMessageID {
				continue
			}

			// Found the last thing: a deleted one falls through to the reset below.
			if thing.IsDeleted() {
				break
			}

			// For inbox-sourced things, being present and not deleted is enough.
			if isMessage {
				return nil
			}

			// The thing exists, but it must also still be listed in the inbox.
			inbox, inboxErr := rac.MessageInbox(ctx)
			if inboxErr != nil {
				snw.logger.Error("failed to check inbox",
					zap.Error(inboxErr),
					zap.Int64("account#id", id),
					zap.String("account#username", account.NormalizedUsername()),
				)
				return nil
			}

			found := false
			for _, candidate := range inbox.Children {
				if candidate.FullName() == account.LastMessageID {
					found = true
					break
				}
			}

			if !found {
				snw.logger.Debug("thing exists, but not on inbox, marking as deleted",
					zap.Int64("account#id", id),
					zap.String("account#username", account.NormalizedUsername()),
					zap.String("thing#id", account.LastMessageID),
				)
				break
			}

			snw.logger.Debug("thing exists, bailing early",
				zap.Int64("account#id", id),
				zap.String("account#username", account.NormalizedUsername()),
				zap.String("thing#id", account.LastMessageID),
			)
			return nil
		}
	}

	snw.logger.Info("thing got deleted, resetting",
		zap.Int64("account#id", id),
		zap.String("account#username", account.NormalizedUsername()),
		zap.String("thing#id", account.LastMessageID),
	)

	// In the inbox-sourced case `things` already holds the inbox listing;
	// otherwise it holds the AboutInfo result and the inbox is fetched now.
	if !isMessage {
		snw.logger.Debug("getting message inbox to find last good thing",
			zap.Int64("account#id", id),
			zap.String("account#username", account.NormalizedUsername()),
		)

		things, err = rac.MessageInbox(ctx)
		if err != nil {
			snw.logger.Error("failed to check inbox",
				zap.Error(err),
				zap.Int64("account#id", id),
				zap.String("account#username", account.NormalizedUsername()),
			)
			return nil
		}
	}

	// Start from "no last message" and adopt the first non-deleted inbox thing.
	account.LastMessageID = ""

	snw.logger.Debug("calculating last good thing",
		zap.Int64("account#id", id),
		zap.String("account#username", account.NormalizedUsername()),
	)
	for _, thing := range things.Children {
		if thing.IsDeleted() {
			snw.logger.Debug("thing got deleted, checking next",
				zap.Int64("account#id", id),
				zap.String("account#username", account.NormalizedUsername()),
				zap.String("thing#id", thing.FullName()),
			)
			continue
		}

		account.LastMessageID = thing.FullName()
		break
	}

	snw.logger.Debug("updating last good thing",
		zap.Int64("account#id", id),
		zap.String("account#username", account.NormalizedUsername()),
		zap.String("thing#id", account.LastMessageID),
	)

	err = snw.accountRepo.Update(ctx, &account)
	if err != nil {
		snw.logger.Error("failed to update account's last message id",
			zap.Error(err),
			zap.Int64("account#id", id),
			zap.String("account#username", account.NormalizedUsername()),
		)
		return err
	}

	return nil
}