use faktory pool

This commit is contained in:
Andre Medeiros 2022-11-02 01:56:13 -04:00
parent a442d30d05
commit 2b81f4c59f
2 changed files with 40 additions and 11 deletions

View file

@ -52,7 +52,7 @@ func SchedulerCmd(ctx context.Context) *cobra.Command {
} }
defer redis.Close() defer redis.Close()
fc, err := cmdutil.NewFaktoryClient(logger) fp, err := cmdutil.NewFaktoryPool(8)
if err != nil { if err != nil {
return err return err
} }
@ -66,12 +66,12 @@ func SchedulerCmd(ctx context.Context) *cobra.Command {
s := gocron.NewScheduler(time.UTC) s := gocron.NewScheduler(time.UTC)
s.SetMaxConcurrentJobs(8, gocron.WaitMode) s.SetMaxConcurrentJobs(8, gocron.WaitMode)
eaj, _ := s.Every(5).Seconds().Do(func() { enqueueAccounts(ctx, logger, statsd, db, redis, luaSha, fc) }) eaj, _ := s.Every(5).Seconds().Do(func() { enqueueAccounts(ctx, logger, statsd, db, redis, luaSha, fp) })
eaj.SingletonMode() eaj.SingletonMode()
_, _ = s.Every(5).Seconds().Do(func() { enqueueSubreddits(ctx, logger, statsd, db, fc) }) _, _ = s.Every(5).Seconds().Do(func() { enqueueSubreddits(ctx, logger, statsd, db, fp) })
_, _ = s.Every(5).Seconds().Do(func() { enqueueLiveActivities(ctx, logger, db, redis, luaSha, fc) }) _, _ = s.Every(5).Seconds().Do(func() { enqueueLiveActivities(ctx, logger, db, redis, luaSha, fp) })
_, _ = s.Every(5).Seconds().Do(func() { enqueueStuckAccounts(ctx, logger, statsd, db, fc) }) _, _ = s.Every(5).Seconds().Do(func() { enqueueStuckAccounts(ctx, logger, statsd, db, fp) })
_, _ = s.Every(1).Minute().Do(func() { reportStats(ctx, logger, statsd, db) }) _, _ = s.Every(1).Minute().Do(func() { reportStats(ctx, logger, statsd, db) })
//_, _ = s.Every(1).Minute().Do(func() { pruneAccounts(ctx, logger, db) }) //_, _ = s.Every(1).Minute().Do(func() { pruneAccounts(ctx, logger, db) })
//_, _ = s.Every(1).Minute().Do(func() { pruneDevices(ctx, logger, db) }) //_, _ = s.Every(1).Minute().Do(func() { pruneDevices(ctx, logger, db) })
@ -112,7 +112,7 @@ func evalScript(ctx context.Context, redis *redis.Client) (string, error) {
return redis.ScriptLoad(ctx, lua).Result() return redis.ScriptLoad(ctx, lua).Result()
} }
func enqueueLiveActivities(ctx context.Context, logger *zap.Logger, pool *pgxpool.Pool, redisConn *redis.Client, luaSha string, fc *faktory.Client) { func enqueueLiveActivities(ctx context.Context, logger *zap.Logger, pool *pgxpool.Pool, redisConn *redis.Client, luaSha string, fp *faktory.Pool) {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
@ -166,6 +166,13 @@ func enqueueLiveActivities(ctx context.Context, logger *zap.Logger, pool *pgxpoo
jobs[i] = faktory.NewJob("LiveActivityJob", tok) jobs[i] = faktory.NewJob("LiveActivityJob", tok)
} }
fc, err := fp.Get()
if err != nil {
logger.Error("failed to get faktory client", zap.Error(err))
return
}
defer fp.Put(fc)
if _, err = fc.PushBulk(jobs); err != nil { if _, err = fc.PushBulk(jobs); err != nil {
logger.Error("failed to enqueue live activity batch", zap.Error(err)) logger.Error("failed to enqueue live activity batch", zap.Error(err))
} }
@ -240,7 +247,7 @@ func reportStats(ctx context.Context, logger *zap.Logger, statsd *statsd.Client,
} }
} }
func enqueueSubreddits(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, fc *faktory.Client) { func enqueueSubreddits(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, fp *faktory.Pool) {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
@ -295,12 +302,20 @@ func enqueueSubreddits(ctx context.Context, logger *zap.Logger, statsd *statsd.C
jobs = append(jobs, faktory.NewJob("SubredditWatcherJob", id)) jobs = append(jobs, faktory.NewJob("SubredditWatcherJob", id))
jobs = append(jobs, faktory.NewJob("SubredditTrendingJob", id)) jobs = append(jobs, faktory.NewJob("SubredditTrendingJob", id))
} }
fc, err := fp.Get()
if err != nil {
logger.Error("failed to get faktory client", zap.Error(err))
return
}
defer fp.Put(fc)
if _, err := fc.PushBulk(jobs); err != nil { if _, err := fc.PushBulk(jobs); err != nil {
logger.Error("failed to enqueue subreddit batch", zap.Error(err)) logger.Error("failed to enqueue subreddit batch", zap.Error(err))
} }
} }
func enqueueStuckAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, fc *faktory.Client) { func enqueueStuckAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, fp *faktory.Pool) {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
@ -351,12 +366,19 @@ func enqueueStuckAccounts(ctx context.Context, logger *zap.Logger, statsd *stats
jobs[i] = faktory.NewJob("StuckNotificationsJob", id) jobs[i] = faktory.NewJob("StuckNotificationsJob", id)
} }
fc, err := fp.Get()
if err != nil {
logger.Error("failed to get faktory client", zap.Error(err))
return
}
defer fp.Put(fc)
if _, err = fc.PushBulk(jobs); err != nil { if _, err = fc.PushBulk(jobs); err != nil {
logger.Error("failed to enqueue stuck account batch", zap.Error(err)) logger.Error("failed to enqueue stuck account batch", zap.Error(err))
} }
} }
func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, redisConn *redis.Client, luaSha string, fc *faktory.Client) { func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, redisConn *redis.Client, luaSha string, fp *faktory.Pool) {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
@ -432,6 +454,13 @@ func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Cli
jobs[i] = faktory.NewJob("NotificationCheckJob", id) jobs[i] = faktory.NewJob("NotificationCheckJob", id)
} }
fc, err := fp.Get()
if err != nil {
logger.Error("failed to get faktory client", zap.Error(err))
return
}
defer fp.Put(fc)
if _, err = fc.PushBulk(jobs); err != nil { if _, err = fc.PushBulk(jobs); err != nil {
logger.Error("failed to enqueue stuck account batch", zap.Error(err)) logger.Error("failed to enqueue stuck account batch", zap.Error(err))
} }

View file

@ -74,6 +74,6 @@ func NewDatabasePool(ctx context.Context, maxConns int) (*pgxpool.Pool, error) {
return pgxpool.ConnectConfig(ctx, config) return pgxpool.ConnectConfig(ctx, config)
} }
func NewFaktoryClient(logger *zap.Logger) (*faktory.Client, error) { func NewFaktoryPool(maxConns int) (*faktory.Pool, error) {
return faktory.Open() return faktory.NewPool(maxConns)
} }