diff --git a/cmd/apollo-worker-notifications/main.go b/cmd/apollo-worker-notifications/main.go index 25b0830..da81417 100644 --- a/cmd/apollo-worker-notifications/main.go +++ b/cmd/apollo-worker-notifications/main.go @@ -128,14 +128,20 @@ func main() { numConsumers := runtime.NumCPU() * 12 prefetchLimit := int64(numConsumers * 2) - runtime.GOMAXPROCS(numConsumers / 4) + runtime.GOMAXPROCS(numConsumers) + + logger.WithFields(logrus.Fields{ + "numConsumers": numConsumers, + }).Info("starting up notifications worker") if err := queue.StartConsuming(prefetchLimit, pollDuration); err != nil { panic(err) } + host, _ := os.Hostname() + for i := 0; i < numConsumers; i++ { - name := fmt.Sprintf("consumer %d", i) + name := fmt.Sprintf("consumer %s-%d", host, i) consumer := NewConsumer(i, logger, statsd, redisConn, pool, rc, apnsToken) if _, err := queue.AddConsumer(name, consumer); err != nil { @@ -209,6 +215,8 @@ func (c *Consumer) Consume(delivery rmq.Delivery) { return } + defer delivery.Ack() + now := float64(time.Now().UnixNano()/int64(time.Millisecond)) / 1000 stmt := `SELECT @@ -235,8 +243,6 @@ func (c *Consumer) Consume(delivery rmq.Delivery) { "accountID": id, "err": err, }).Error("failed to fetch account from database") - - delivery.Reject() return } @@ -257,8 +263,6 @@ func (c *Consumer) Consume(delivery rmq.Delivery) { "accountID": id, "err": err, }).Error("failed to refresh reddit tokens") - - delivery.Reject() return } err = c.pool.BeginFunc(ctx, func(tx pgx.Tx) error { @@ -273,8 +277,6 @@ func (c *Consumer) Consume(delivery rmq.Delivery) { "accountID": id, "err": err, }).Error("failed to update reddit tokens for account") - - delivery.Reject() return } } @@ -289,8 +291,6 @@ func (c *Consumer) Consume(delivery rmq.Delivery) { "accountID": id, "err": err, }).Error("failed to fetch message inbox") - - delivery.Reject() return } @@ -311,8 +311,6 @@ func (c *Consumer) Consume(delivery rmq.Delivery) { "accountID": id, "err": err, }).Error("failed to update last_checked_at for account") - - delivery.Reject() return } @@ -320,8 +318,6 @@ func (c *Consumer) Consume(delivery rmq.Delivery) { c.logger.WithFields(logrus.Fields{ "accountID": id, }).Debug("no new messages, bailing early") - - delivery.Ack() return } @@ -339,7 +335,6 @@ func (c *Consumer) Consume(delivery rmq.Delivery) { "accountID": id, "err": err, }).Error("failed to update last_message_id for account") - delivery.Reject() return } @@ -348,7 +343,6 @@ func (c *Consumer) Consume(delivery rmq.Delivery) { c.logger.WithFields(logrus.Fields{ "accountID": delivery.Payload(), }).Debug("populating first message ID to prevent spamming") - delivery.Ack() return } @@ -364,8 +358,6 @@ func (c *Consumer) Consume(delivery rmq.Delivery) { "accountID": id, "err": err, }).Error("failed to fetch account devices") - - delivery.Reject() return } defer rows.Close() @@ -407,8 +399,6 @@ func (c *Consumer) Consume(delivery rmq.Delivery) { } } - delivery.Ack() - c.logger.WithFields(logrus.Fields{ "accountID": delivery.Payload(), }).Debug("finishing job")