From 199643449d1d6b1c6d2f0d383c20673cefa6be85 Mon Sep 17 00:00:00 2001 From: Andre Medeiros Date: Thu, 16 Mar 2023 13:12:31 -0400 Subject: [PATCH] make sure we don't run jobs that are too old --- internal/worker/notifications.go | 32 ++++++++++++++++++-------------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/internal/worker/notifications.go b/internal/worker/notifications.go index a90644a..30e3ac4 100644 --- a/internal/worker/notifications.go +++ b/internal/worker/notifications.go @@ -157,20 +157,6 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { _ = nc.statsd.Incr("apollo.consumer.executions", notificationTags, 0.1) }() - // Measure queue latency - key := fmt.Sprintf("locks:accounts:%s", id) - ttl := nc.redis.PTTL(ctx, key).Val() - age := (domain.NotificationCheckTimeout - ttl) - _ = nc.statsd.Histogram("apollo.dequeue.latency", float64(age.Milliseconds()), notificationTags, 0.1) - - defer func() { - if err := nc.redis.Del(ctx, key).Err(); err != nil { - logger.Error("failed to remove account lock", zap.Error(err), zap.String("key", key)) - } - }() - - logger.Debug("starting job") - defer func(ctx context.Context) { _, span := nc.tracer.Start(ctx, "queue:ack") defer span.End() @@ -182,6 +168,24 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { } }(ctx) + // Measure queue latency + key := fmt.Sprintf("locks:accounts:%s", id) + ttl := nc.redis.PTTL(ctx, key).Val() + if ttl == 0 { + logger.Debug("job is too old, skipping") + return + } + age := (domain.NotificationCheckTimeout - ttl) + _ = nc.statsd.Histogram("apollo.dequeue.latency", float64(age.Milliseconds()), notificationTags, 0.1) + + defer func() { + if err := nc.redis.Del(ctx, key).Err(); err != nil { + logger.Error("failed to remove account lock", zap.Error(err), zap.String("key", key)) + } + }() + + logger.Debug("starting job") + account, err := nc.accountRepo.GetByRedditID(ctx, id) if err != nil { if err != domain.ErrNotFound {