diff --git a/internal/worker/notifications.go b/internal/worker/notifications.go index 8665c29..7c47927 100644 --- a/internal/worker/notifications.go +++ b/internal/worker/notifications.go @@ -93,7 +93,7 @@ func (nw *notificationsWorker) Start() error { nw.logger.Info("starting up notifications worker", zap.Int("consumers", nw.consumers)) - prefetchLimit := int64(nw.consumers * 2) + prefetchLimit := int64(nw.consumers * 4) if err := queue.StartConsuming(prefetchLimit, pollDuration); err != nil { return err @@ -136,9 +136,13 @@ func NewNotificationsConsumer(nw *notificationsWorker, tag int) *notificationsCo func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { id := delivery.Payload() + key := fmt.Sprintf("locks:accounts:%s", id) + + // Measure queue latency + ttl := nc.redis.TTL(nc, key).Val().Milliseconds() + _ = nc.statsd.Histogram("apollo.dequeue.time", float64(ttl), []string{"queue:notifications"}, 1.0) defer func() { - key := fmt.Sprintf("locks:accounts:%s", id) if err := nc.redis.Del(nc, key).Err(); err != nil { nc.logger.Error("failed to remove account lock", zap.Error(err), zap.String("key", key)) }