tweak params

This commit is contained in:
Andre Medeiros 2021-07-13 20:09:44 -04:00
parent 47ee2a6e52
commit 55c45592ac
6 changed files with 26 additions and 22 deletions

View file

@ -1,3 +1,3 @@
web: apollo api
scheduler: apollo scheduler
worker-notifications: apollo worker --queue notifications --multiplier 12
worker-notifications: apollo worker --queue notifications --multiplier 32

View file

@ -28,6 +28,7 @@ func NewAPI(ctx context.Context, logger *logrus.Logger, statsd *statsd.Client, d
os.Getenv("REDDIT_CLIENT_ID"),
os.Getenv("REDDIT_CLIENT_SECRET"),
statsd,
16,
)
models := data.NewModels(ctx, db)

View file

@ -62,8 +62,8 @@ func WorkerCmd(ctx context.Context) *cobra.Command {
consumers := runtime.NumCPU() * multiplier
worker := workerFn(logger, statsd, db, redis, queue)
worker.Start(consumers)
worker := workerFn(logger, statsd, db, redis, queue, consumers)
worker.Start()
<-ctx.Done()

View file

@ -50,7 +50,7 @@ func PostIDFromContext(context string) string {
return ""
}
func NewClient(id, secret string, statsd *statsd.Client) *Client {
func NewClient(id, secret string, statsd *statsd.Client, connLimit int) *Client {
tracer := &httptrace.ClientTrace{
GotConn: func(info httptrace.GotConnInfo) {
if info.Reused {
@ -66,9 +66,10 @@ func NewClient(id, secret string, statsd *statsd.Client) *Client {
}
t := http.DefaultTransport.(*http.Transport).Clone()
t.MaxIdleConns = 128
t.MaxConnsPerHost = 512
t.MaxIdleConnsPerHost = 128
t.MaxIdleConns = connLimit / 4
t.MaxConnsPerHost = connLimit
t.MaxIdleConnsPerHost = connLimit / 4
t.IdleConnTimeout = 10 * time.Second
client := &http.Client{Transport: t}

View file

@ -35,13 +35,15 @@ type notificationsWorker struct {
queue rmq.Connection
reddit *reddit.Client
apns *token.Token
consumers int
}
func NewNotificationsWorker(logger *logrus.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, queue rmq.Connection) Worker {
func NewNotificationsWorker(logger *logrus.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, queue rmq.Connection, consumers int) Worker {
reddit := reddit.NewClient(
os.Getenv("REDDIT_CLIENT_ID"),
os.Getenv("REDDIT_CLIENT_SECRET"),
statsd,
consumers,
)
var apns *token.Token
@ -66,20 +68,21 @@ func NewNotificationsWorker(logger *logrus.Logger, statsd *statsd.Client, db *pg
queue,
reddit,
apns,
consumers,
}
}
func (nw *notificationsWorker) Start(consumers int) error {
func (nw *notificationsWorker) Start() error {
queue, err := nw.queue.OpenQueue("notifications")
if err != nil {
return err
}
nw.logger.WithFields(logrus.Fields{
"numConsumers": consumers,
"numConsumers": nw.consumers,
}).Info("starting up notifications worker")
prefetchLimit := int64(consumers * 2)
prefetchLimit := int64(nw.consumers * 2)
if err := queue.StartConsuming(prefetchLimit, pollDuration); err != nil {
return err
@ -87,7 +90,7 @@ func (nw *notificationsWorker) Start(consumers int) error {
host, _ := os.Hostname()
for i := 0; i < consumers; i++ {
for i := 0; i < nw.consumers; i++ {
name := fmt.Sprintf("consumer %s-%d", host, i)
consumer := NewNotificationsConsumer(nw, i)

View file

@ -8,9 +8,8 @@ import (
"github.com/sirupsen/logrus"
)
type NewWorkerFn func(logger *logrus.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, queue rmq.Connection) Worker
type NewWorkerFn func(logger *logrus.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, queue rmq.Connection, consumers int) Worker
type Worker interface {
Start(int) error
Start() error
Stop()
}