diff --git a/internal/cmd/scheduler.go b/internal/cmd/scheduler.go index 8005dce..97412c4 100644 --- a/internal/cmd/scheduler.go +++ b/internal/cmd/scheduler.go @@ -227,10 +227,15 @@ func reportStats(ctx context.Context, logger *logrus.Logger, statsd *statsd.Clie func enqueueUsers(ctx context.Context, logger *logrus.Logger, statsd *statsd.Client, pool *pgxpool.Pool, queue rmq.Queue) { now := time.Now() - ready := now.Unix() - userEnqueueInterval - ids := []int64{} + defer func() { + tags := []string{"queue:users"} + _ = statsd.Histogram("apollo.queue.enqueued", float64(len(ids)), tags, 1) + _ = statsd.Histogram("apollo.queue.runtime", float64(time.Since(now).Milliseconds()), tags, 1) + }() + + ready := now.Unix() - userEnqueueInterval err := pool.BeginFunc(ctx, func(tx pgx.Tx) error { stmt := ` WITH userb AS ( @@ -283,18 +288,19 @@ func enqueueUsers(ctx context.Context, logger *logrus.Logger, statsd *statsd.Cli "err": err, }).Error("failed to enqueue user") } - - tags := []string{"queue:users"} - _ = statsd.Histogram("apollo.queue.enqueued", float64(len(ids)), tags, 1) - _ = statsd.Histogram("apollo.queue.runtime", float64(time.Since(now).Milliseconds()), tags, 1) } func enqueueSubreddits(ctx context.Context, logger *logrus.Logger, statsd *statsd.Client, pool *pgxpool.Pool, queues []rmq.Queue) { now := time.Now() - ready := now.Unix() - subredditEnqueueInterval - ids := []int64{} + defer func() { + tags := []string{"queue:subreddits"} + _ = statsd.Histogram("apollo.queue.enqueued", float64(len(ids)), tags, 1) + _ = statsd.Histogram("apollo.queue.runtime", float64(time.Since(now).Milliseconds()), tags, 1) + }() + + ready := now.Unix() - subredditEnqueueInterval err := pool.BeginFunc(ctx, func(tx pgx.Tx) error { stmt := ` WITH subreddit AS ( @@ -351,17 +357,19 @@ func enqueueSubreddits(ctx context.Context, logger *logrus.Logger, statsd *stats } } - tags := []string{"queue:subreddits"} - _ = statsd.Histogram("apollo.queue.enqueued", float64(len(ids)), tags, 1) - _ = statsd.Histogram("apollo.queue.runtime", float64(time.Since(now).Milliseconds()), tags, 1) } func enqueueStuckAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd.Client, pool *pgxpool.Pool, queue rmq.Queue) { now := time.Now() - ready := now.Unix() - stuckAccountEnqueueInterval - ids := []int64{} + defer func() { + tags := []string{"queue:stuck-accounts"} + _ = statsd.Histogram("apollo.queue.enqueued", float64(len(ids)), tags, 1) + _ = statsd.Histogram("apollo.queue.runtime", float64(time.Since(now).Milliseconds()), tags, 1) + }() + + ready := now.Unix() - stuckAccountEnqueueInterval err := pool.BeginFunc(ctx, func(tx pgx.Tx) error { stmt := ` WITH account AS ( @@ -416,14 +424,20 @@ func enqueueStuckAccounts(ctx context.Context, logger *logrus.Logger, statsd *st "err": err, }).Error("failed to enqueue stuck accounts") } - - tags := []string{"queue:stuck-accounts"} - _ = statsd.Histogram("apollo.queue.enqueued", float64(len(ids)), tags, 1) - _ = statsd.Histogram("apollo.queue.runtime", float64(time.Since(now).Milliseconds()), tags, 1) } func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd.Client, pool *pgxpool.Pool, redisConn *redis.Client, luaSha string, queue rmq.Queue) { now := time.Now() + ids := []int64{} + enqueued := 0 + skipped := 0 + + defer func() { + tags := []string{"queue:notifications"} + _ = statsd.Histogram("apollo.queue.enqueued", float64(enqueued), tags, 1) + _ = statsd.Histogram("apollo.queue.skipped", float64(skipped), tags, 1) + _ = statsd.Histogram("apollo.queue.runtime", float64(time.Since(now).Milliseconds()), tags, 1) + }() // Start looking for accounts that were last checked at least 5 seconds ago // and at most 6 seconds ago. Also look for accounts that haven't been checked @@ -432,8 +446,6 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd. ready := ts - accountEnqueueInterval expired := ts - checkTimeout - ids := []int64{} - err := pool.BeginFunc(ctx, func(tx pgx.Tx) error { stmt := ` WITH account AS ( @@ -477,10 +489,6 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd. "count": len(ids), "start": ready, }).Debug("enqueueing account batch") - - enqueued := 0 - skipped := 0 - // Split ids in batches for i := 0; i < len(ids); i += batchSize { j := i + batchSize @@ -521,11 +529,6 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd. } } - tags := []string{"queue:notifications"} - _ = statsd.Histogram("apollo.queue.enqueued", float64(enqueued), tags, 1) - _ = statsd.Histogram("apollo.queue.skipped", float64(skipped), tags, 1) - _ = statsd.Histogram("apollo.queue.runtime", float64(time.Since(now).Milliseconds()), tags, 1) - logger.WithFields(logrus.Fields{ "count": enqueued, "skipped": skipped,