diff --git a/internal/worker/notifications.go b/internal/worker/notifications.go index 04b068f..79e2b18 100644 --- a/internal/worker/notifications.go +++ b/internal/worker/notifications.go @@ -280,11 +280,9 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { "count": msgs.Count, }).Debug("fetched messages") - for i := msgs.Count - 1; i >= 0; i-- { - msg := msgs.Children[i] - + for _, msg := range msgs.Children { if !msg.IsDeleted() { - account.LastMessageID = msgs.Children[0].FullName() + account.LastMessageID = msg.FullName() break } } diff --git a/internal/worker/stuck_notifications.go b/internal/worker/stuck_notifications.go index f57ae78..ff5bf47 100644 --- a/internal/worker/stuck_notifications.go +++ b/internal/worker/stuck_notifications.go @@ -173,6 +173,8 @@ func (snc *stuckNotificationsConsumer) Consume(delivery rmq.Delivery) { continue } + lastAlertedAt = thing.CreatedAt + if !thing.IsDeleted() { snc.logger.WithFields(logrus.Fields{ "account#username": account.NormalizedUsername(), @@ -180,8 +182,6 @@ func (snc *stuckNotificationsConsumer) Consume(delivery rmq.Delivery) { }).Debug("thing exists, returning") return } - - lastAlertedAt = thing.CreatedAt } } @@ -203,13 +203,25 @@ func (snc *stuckNotificationsConsumer) Consume(delivery rmq.Delivery) { account.LastMessageID = "" for _, thing := range things.Children { - if lastAlertedAt > thing.CreatedAt && !thing.IsDeleted() { + if thing.IsDeleted() { + continue + } + + if lastAlertedAt == 0 { + account.LastMessageID = thing.FullName() break } - account.LastMessageID = thing.FullName() + if lastAlertedAt > thing.CreatedAt { + account.LastMessageID = thing.FullName() + } } + snc.logger.WithFields(logrus.Fields{ + "account#username": account.NormalizedUsername(), + "thing#id": account.LastMessageID, + }).Debug("updating last good thing") + if err := snc.accountRepo.Update(ctx, &account); err != nil { snc.logger.WithFields(logrus.Fields{ "account#username": account.NormalizedUsername(),