address an edge case where threads without comments will never stop

This commit is contained in:
Andre Medeiros 2022-10-27 21:52:06 -04:00
parent c9877879da
commit 2352e146c3

View file

@ -121,15 +121,13 @@ type liveActivitiesConsumer struct {
*liveActivitiesWorker *liveActivitiesWorker
tag int tag int
apnsSandbox *apns2.Client apns *apns2.Client
apnsProduction *apns2.Client
} }
func NewLiveActivitiesConsumer(law *liveActivitiesWorker, tag int) *liveActivitiesConsumer { func NewLiveActivitiesConsumer(law *liveActivitiesWorker, tag int) *liveActivitiesConsumer {
return &liveActivitiesConsumer{ return &liveActivitiesConsumer{
law, law,
tag, tag,
apns2.NewTokenClient(law.apns),
apns2.NewTokenClient(law.apns).Production(), apns2.NewTokenClient(law.apns).Production(),
} }
} }
@ -223,7 +221,7 @@ func (lac *liveActivitiesConsumer) Consume(delivery rmq.Delivery) {
return return
} }
if len(tr.Children) == 0 { if len(tr.Children) == 0 && la.ExpiresAt.After(now) {
lac.logger.Debug("no comments found", zap.String("live_activity#apns_token", at)) lac.logger.Debug("no comments found", zap.String("live_activity#apns_token", at))
return return
} }
@ -262,7 +260,7 @@ func (lac *liveActivitiesConsumer) Consume(delivery rmq.Delivery) {
PostScore: tr.Post.Score, PostScore: tr.Post.Score,
} }
if len(candidates) >= 1 { if len(candidates) > 0 {
comment := candidates[0] comment := candidates[0]
din.CommentID = comment.ID din.CommentID = comment.ID
@ -277,14 +275,13 @@ func (lac *liveActivitiesConsumer) Consume(delivery rmq.Delivery) {
ev = "end" ev = "end"
} }
pl := map[string]interface{}{ bb, _ := json.Marshal(map[string]interface{}{
"aps": map[string]interface{}{ "aps": map[string]interface{}{
"timestamp": time.Now().Unix(), "timestamp": time.Now().Unix(),
"event": ev, "event": ev,
"content-state": din, "content-state": din,
}, },
} })
bb, _ := json.Marshal(pl)
notification := &apns2.Notification{ notification := &apns2.Notification{
DeviceToken: la.APNSToken, DeviceToken: la.APNSToken,
@ -293,12 +290,7 @@ func (lac *liveActivitiesConsumer) Consume(delivery rmq.Delivery) {
Payload: bb, Payload: bb,
} }
client := lac.apnsProduction res, err := lac.apns.PushWithContext(ctx, notification)
/*if la.Sandbox {
client = lac.apnsSandbox
}*/
res, err := client.PushWithContext(ctx, notification)
if err != nil { if err != nil {
_ = lac.statsd.Incr("apns.live_activities.errors", []string{}, 1) _ = lac.statsd.Incr("apns.live_activities.errors", []string{}, 1)
lac.logger.Error("failed to send notification", lac.logger.Error("failed to send notification",