fix delivery ack in jobs

This commit is contained in:
Andre Medeiros 2021-07-08 23:50:34 -04:00
parent b9373832eb
commit 5cd3eeab09

View file

@ -128,14 +128,20 @@ func main() {
numConsumers := runtime.NumCPU() * 12 numConsumers := runtime.NumCPU() * 12
prefetchLimit := int64(numConsumers * 2) prefetchLimit := int64(numConsumers * 2)
runtime.GOMAXPROCS(numConsumers / 4) runtime.GOMAXPROCS(numConsumers)
logger.WithFields(logrus.Fields{
"numConsumers": numConsumers,
}).Info("starting up notifications worker")
if err := queue.StartConsuming(prefetchLimit, pollDuration); err != nil { if err := queue.StartConsuming(prefetchLimit, pollDuration); err != nil {
panic(err) panic(err)
} }
host, _ := os.Hostname()
for i := 0; i < numConsumers; i++ { for i := 0; i < numConsumers; i++ {
name := fmt.Sprintf("consumer %d", i) name := fmt.Sprintf("consumer %s-%d", host, i)
consumer := NewConsumer(i, logger, statsd, redisConn, pool, rc, apnsToken) consumer := NewConsumer(i, logger, statsd, redisConn, pool, rc, apnsToken)
if _, err := queue.AddConsumer(name, consumer); err != nil { if _, err := queue.AddConsumer(name, consumer); err != nil {
@ -209,6 +215,8 @@ func (c *Consumer) Consume(delivery rmq.Delivery) {
return return
} }
defer delivery.Ack()
now := float64(time.Now().UnixNano()/int64(time.Millisecond)) / 1000 now := float64(time.Now().UnixNano()/int64(time.Millisecond)) / 1000
stmt := `SELECT stmt := `SELECT
@ -235,8 +243,6 @@ func (c *Consumer) Consume(delivery rmq.Delivery) {
"accountID": id, "accountID": id,
"err": err, "err": err,
}).Error("failed to fetch account from database") }).Error("failed to fetch account from database")
delivery.Reject()
return return
} }
@ -257,8 +263,6 @@ func (c *Consumer) Consume(delivery rmq.Delivery) {
"accountID": id, "accountID": id,
"err": err, "err": err,
}).Error("failed to refresh reddit tokens") }).Error("failed to refresh reddit tokens")
delivery.Reject()
return return
} }
err = c.pool.BeginFunc(ctx, func(tx pgx.Tx) error { err = c.pool.BeginFunc(ctx, func(tx pgx.Tx) error {
@ -273,8 +277,6 @@ func (c *Consumer) Consume(delivery rmq.Delivery) {
"accountID": id, "accountID": id,
"err": err, "err": err,
}).Error("failed to update reddit tokens for account") }).Error("failed to update reddit tokens for account")
delivery.Reject()
return return
} }
} }
@ -289,8 +291,6 @@ func (c *Consumer) Consume(delivery rmq.Delivery) {
"accountID": id, "accountID": id,
"err": err, "err": err,
}).Error("failed to fetch message inbox") }).Error("failed to fetch message inbox")
delivery.Reject()
return return
} }
@ -311,8 +311,6 @@ func (c *Consumer) Consume(delivery rmq.Delivery) {
"accountID": id, "accountID": id,
"err": err, "err": err,
}).Error("failed to update last_checked_at for account") }).Error("failed to update last_checked_at for account")
delivery.Reject()
return return
} }
@ -320,8 +318,6 @@ func (c *Consumer) Consume(delivery rmq.Delivery) {
c.logger.WithFields(logrus.Fields{ c.logger.WithFields(logrus.Fields{
"accountID": id, "accountID": id,
}).Debug("no new messages, bailing early") }).Debug("no new messages, bailing early")
delivery.Ack()
return return
} }
@ -339,7 +335,6 @@ func (c *Consumer) Consume(delivery rmq.Delivery) {
"accountID": id, "accountID": id,
"err": err, "err": err,
}).Error("failed to update last_message_id for account") }).Error("failed to update last_message_id for account")
delivery.Reject()
return return
} }
@ -348,7 +343,6 @@ func (c *Consumer) Consume(delivery rmq.Delivery) {
c.logger.WithFields(logrus.Fields{ c.logger.WithFields(logrus.Fields{
"accountID": delivery.Payload(), "accountID": delivery.Payload(),
}).Debug("populating first message ID to prevent spamming") }).Debug("populating first message ID to prevent spamming")
delivery.Ack()
return return
} }
@ -364,8 +358,6 @@ func (c *Consumer) Consume(delivery rmq.Delivery) {
"accountID": id, "accountID": id,
"err": err, "err": err,
}).Error("failed to fetch account devices") }).Error("failed to fetch account devices")
delivery.Reject()
return return
} }
defer rows.Close() defer rows.Close()
@ -407,8 +399,6 @@ func (c *Consumer) Consume(delivery rmq.Delivery) {
} }
} }
delivery.Ack()
c.logger.WithFields(logrus.Fields{ c.logger.WithFields(logrus.Fields{
"accountID": delivery.Payload(), "accountID": delivery.Payload(),
}).Debug("finishing job") }).Debug("finishing job")