diff --git a/Procfile b/Procfile index 328d20b..ffba5cb 100644 --- a/Procfile +++ b/Procfile @@ -1,3 +1,3 @@ web: apollo api scheduler: apollo scheduler -worker-notifications: apollo worker --queue notifications --multiplier 12 +worker-notifications: apollo worker --queue notifications --multiplier 32 diff --git a/internal/api/api.go b/internal/api/api.go index 9cbb023..d4f83df 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -28,6 +28,7 @@ func NewAPI(ctx context.Context, logger *logrus.Logger, statsd *statsd.Client, d os.Getenv("REDDIT_CLIENT_ID"), os.Getenv("REDDIT_CLIENT_SECRET"), statsd, + 16, ) models := data.NewModels(ctx, db) diff --git a/internal/cmd/worker.go b/internal/cmd/worker.go index dfac02c..fc13d5a 100644 --- a/internal/cmd/worker.go +++ b/internal/cmd/worker.go @@ -62,8 +62,8 @@ func WorkerCmd(ctx context.Context) *cobra.Command { consumers := runtime.NumCPU() * multiplier - worker := workerFn(logger, statsd, db, redis, queue) - worker.Start(consumers) + worker := workerFn(logger, statsd, db, redis, queue, consumers) + worker.Start() <-ctx.Done() diff --git a/internal/reddit/client.go b/internal/reddit/client.go index 7821fe0..31e9196 100644 --- a/internal/reddit/client.go +++ b/internal/reddit/client.go @@ -50,7 +50,7 @@ func PostIDFromContext(context string) string { return "" } -func NewClient(id, secret string, statsd *statsd.Client) *Client { +func NewClient(id, secret string, statsd *statsd.Client, connLimit int) *Client { tracer := &httptrace.ClientTrace{ GotConn: func(info httptrace.GotConnInfo) { if info.Reused { @@ -66,9 +66,10 @@ func NewClient(id, secret string, statsd *statsd.Client) *Client { } t := http.DefaultTransport.(*http.Transport).Clone() - t.MaxIdleConns = 128 - t.MaxConnsPerHost = 512 - t.MaxIdleConnsPerHost = 128 + t.MaxIdleConns = connLimit / 4 + t.MaxConnsPerHost = connLimit + t.MaxIdleConnsPerHost = connLimit / 4 + t.IdleConnTimeout = 10 * time.Second client := &http.Client{Transport: t} diff --git a/internal/worker/notifications.go b/internal/worker/notifications.go index 176e830..08ed73a 100644 --- a/internal/worker/notifications.go +++ b/internal/worker/notifications.go @@ -28,20 +28,22 @@ const ( ) type notificationsWorker struct { - logger *logrus.Logger - statsd *statsd.Client - db *pgxpool.Pool - redis *redis.Client - queue rmq.Connection - reddit *reddit.Client - apns *token.Token + logger *logrus.Logger + statsd *statsd.Client + db *pgxpool.Pool + redis *redis.Client + queue rmq.Connection + reddit *reddit.Client + apns *token.Token + consumers int } -func NewNotificationsWorker(logger *logrus.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, queue rmq.Connection) Worker { +func NewNotificationsWorker(logger *logrus.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, queue rmq.Connection, consumers int) Worker { reddit := reddit.NewClient( os.Getenv("REDDIT_CLIENT_ID"), os.Getenv("REDDIT_CLIENT_SECRET"), statsd, + consumers, ) var apns *token.Token @@ -66,20 +68,21 @@ func NewNotificationsWorker(logger *logrus.Logger, statsd *statsd.Client, db *pg queue, reddit, apns, + consumers, } } -func (nw *notificationsWorker) Start(consumers int) error { +func (nw *notificationsWorker) Start() error { queue, err := nw.queue.OpenQueue("notifications") if err != nil { return err } nw.logger.WithFields(logrus.Fields{ - "numConsumers": consumers, + "numConsumers": nw.consumers, }).Info("starting up notifications worker") - prefetchLimit := int64(consumers * 2) + prefetchLimit := int64(nw.consumers * 2) if err := queue.StartConsuming(prefetchLimit, pollDuration); err != nil { return err @@ -87,7 +90,7 @@ func (nw *notificationsWorker) Start(consumers int) error { host, _ := os.Hostname() - for i := 0; i < consumers; i++ { + for i := 0; i < nw.consumers; i++ { name := fmt.Sprintf("consumer %s-%d", host, i) consumer := NewNotificationsConsumer(nw, i) diff --git a/internal/worker/worker.go b/internal/worker/worker.go index 3bb8ebf..6d89763 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -8,9 +8,8 @@ import ( "github.com/sirupsen/logrus" ) -type NewWorkerFn func(logger *logrus.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, queue rmq.Connection) Worker - +type NewWorkerFn func(logger *logrus.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, queue rmq.Connection, consumers int) Worker type Worker interface { - Start(int) error + Start() error Stop() }