diff --git a/internal/cmd/scheduler.go b/internal/cmd/scheduler.go index 998b2d4..00df4cb 100644 --- a/internal/cmd/scheduler.go +++ b/internal/cmd/scheduler.go @@ -52,7 +52,7 @@ func SchedulerCmd(ctx context.Context) *cobra.Command { } defer redis.Close() - fc, err := cmdutil.NewFaktoryClient(logger) + fp, err := cmdutil.NewFaktoryPool(8) if err != nil { return err } @@ -66,12 +66,12 @@ func SchedulerCmd(ctx context.Context) *cobra.Command { s := gocron.NewScheduler(time.UTC) s.SetMaxConcurrentJobs(8, gocron.WaitMode) - eaj, _ := s.Every(5).Seconds().Do(func() { enqueueAccounts(ctx, logger, statsd, db, redis, luaSha, fc) }) + eaj, _ := s.Every(5).Seconds().Do(func() { enqueueAccounts(ctx, logger, statsd, db, redis, luaSha, fp) }) eaj.SingletonMode() - _, _ = s.Every(5).Seconds().Do(func() { enqueueSubreddits(ctx, logger, statsd, db, fc) }) - _, _ = s.Every(5).Seconds().Do(func() { enqueueLiveActivities(ctx, logger, db, redis, luaSha, fc) }) - _, _ = s.Every(5).Seconds().Do(func() { enqueueStuckAccounts(ctx, logger, statsd, db, fc) }) + _, _ = s.Every(5).Seconds().Do(func() { enqueueSubreddits(ctx, logger, statsd, db, fp) }) + _, _ = s.Every(5).Seconds().Do(func() { enqueueLiveActivities(ctx, logger, db, redis, luaSha, fp) }) + _, _ = s.Every(5).Seconds().Do(func() { enqueueStuckAccounts(ctx, logger, statsd, db, fp) }) _, _ = s.Every(1).Minute().Do(func() { reportStats(ctx, logger, statsd, db) }) //_, _ = s.Every(1).Minute().Do(func() { pruneAccounts(ctx, logger, db) }) //_, _ = s.Every(1).Minute().Do(func() { pruneDevices(ctx, logger, db) }) @@ -112,7 +112,7 @@ func evalScript(ctx context.Context, redis *redis.Client) (string, error) { return redis.ScriptLoad(ctx, lua).Result() } -func enqueueLiveActivities(ctx context.Context, logger *zap.Logger, pool *pgxpool.Pool, redisConn *redis.Client, luaSha string, fc *faktory.Client) { +func enqueueLiveActivities(ctx context.Context, logger *zap.Logger, pool *pgxpool.Pool, redisConn *redis.Client, luaSha string, fp *faktory.Pool) { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -166,6 +166,13 @@ func enqueueLiveActivities(ctx context.Context, logger *zap.Logger, pool *pgxpoo jobs[i] = faktory.NewJob("LiveActivityJob", tok) } + fc, err := fp.Get() + if err != nil { + logger.Error("failed to get faktory client", zap.Error(err)) + return + } + defer fp.Put(fc) + if _, err = fc.PushBulk(jobs); err != nil { logger.Error("failed to enqueue live activity batch", zap.Error(err)) } @@ -240,7 +247,7 @@ func reportStats(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, } } -func enqueueSubreddits(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, fc *faktory.Client) { +func enqueueSubreddits(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, fp *faktory.Pool) { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -295,12 +302,20 @@ func enqueueSubreddits(ctx context.Context, logger *zap.Logger, statsd *statsd.C jobs = append(jobs, faktory.NewJob("SubredditWatcherJob", id)) jobs = append(jobs, faktory.NewJob("SubredditTrendingJob", id)) } + + fc, err := fp.Get() + if err != nil { + logger.Error("failed to get faktory client", zap.Error(err)) + return + } + defer fp.Put(fc) + if _, err := fc.PushBulk(jobs); err != nil { logger.Error("failed to enqueue subreddit batch", zap.Error(err)) } } -func enqueueStuckAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, fc *faktory.Client) { +func enqueueStuckAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, fp *faktory.Pool) { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -351,12 +366,19 @@ func enqueueStuckAccounts(ctx context.Context, logger *zap.Logger, statsd *stats jobs[i] = faktory.NewJob("StuckNotificationsJob", id) } + fc, err := fp.Get() + if err != nil { + logger.Error("failed to get faktory client", zap.Error(err)) + return + } + defer fp.Put(fc) + if _, err = fc.PushBulk(jobs); err != nil { logger.Error("failed to enqueue stuck account batch", zap.Error(err)) } } -func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, redisConn *redis.Client, luaSha string, fc *faktory.Client) { +func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, redisConn *redis.Client, luaSha string, fp *faktory.Pool) { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -432,6 +454,13 @@ func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Cli jobs[i] = faktory.NewJob("NotificationCheckJob", id) } + fc, err := fp.Get() + if err != nil { + logger.Error("failed to get faktory client", zap.Error(err)) + return + } + defer fp.Put(fc) + if _, err = fc.PushBulk(jobs); err != nil { logger.Error("failed to enqueue stuck account batch", zap.Error(err)) } diff --git a/internal/cmdutil/cmdutil.go b/internal/cmdutil/cmdutil.go index 49bce68..dd0bccf 100644 --- a/internal/cmdutil/cmdutil.go +++ b/internal/cmdutil/cmdutil.go @@ -74,6 +74,6 @@ func NewDatabasePool(ctx context.Context, maxConns int) (*pgxpool.Pool, error) { return pgxpool.ConnectConfig(ctx, config) } -func NewFaktoryClient(logger *zap.Logger) (*faktory.Client, error) { - return faktory.Open() +func NewFaktoryPool(maxConns int) (*faktory.Pool, error) { + return faktory.NewPool(maxConns) }