measure dequeue latency

This commit is contained in:
Andre Medeiros 2022-07-13 16:28:54 -04:00
parent 80bae7ff9f
commit 94d3941005

View file

@ -93,7 +93,7 @@ func (nw *notificationsWorker) Start() error {
nw.logger.Info("starting up notifications worker", zap.Int("consumers", nw.consumers)) nw.logger.Info("starting up notifications worker", zap.Int("consumers", nw.consumers))
prefetchLimit := int64(nw.consumers * 2) prefetchLimit := int64(nw.consumers * 4)
if err := queue.StartConsuming(prefetchLimit, pollDuration); err != nil { if err := queue.StartConsuming(prefetchLimit, pollDuration); err != nil {
return err return err
@ -136,9 +136,13 @@ func NewNotificationsConsumer(nw *notificationsWorker, tag int) *notificationsCo
func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
id := delivery.Payload() id := delivery.Payload()
key := fmt.Sprintf("locks:accounts:%s", id)
// Measure queue latency
ttl := nc.redis.TTL(nc, key).Val().Milliseconds()
_ = nc.statsd.Histogram("apollo.dequeue.time", float64(ttl), []string{"queue:notifications"}, 1.0)
defer func() { defer func() {
key := fmt.Sprintf("locks:accounts:%s", id)
if err := nc.redis.Del(nc, key).Err(); err != nil { if err := nc.redis.Del(nc, key).Err(); err != nil {
nc.logger.Error("failed to remove account lock", zap.Error(err), zap.String("key", key)) nc.logger.Error("failed to remove account lock", zap.Error(err), zap.String("key", key))
} }