mirror of
https://github.com/christianselig/apollo-backend
synced 2024-11-25 13:17:42 +00:00
defer func on stats for scheduler
This commit is contained in:
parent
7d41c4ae3a
commit
70d73eab4c
1 changed files with 31 additions and 28 deletions
|
@ -227,10 +227,15 @@ func reportStats(ctx context.Context, logger *logrus.Logger, statsd *statsd.Clie
|
||||||
|
|
||||||
func enqueueUsers(ctx context.Context, logger *logrus.Logger, statsd *statsd.Client, pool *pgxpool.Pool, queue rmq.Queue) {
|
func enqueueUsers(ctx context.Context, logger *logrus.Logger, statsd *statsd.Client, pool *pgxpool.Pool, queue rmq.Queue) {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
ready := now.Unix() - userEnqueueInterval
|
|
||||||
|
|
||||||
ids := []int64{}
|
ids := []int64{}
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
tags := []string{"queue:users"}
|
||||||
|
_ = statsd.Histogram("apollo.queue.enqueued", float64(len(ids)), tags, 1)
|
||||||
|
_ = statsd.Histogram("apollo.queue.runtime", float64(time.Since(now).Milliseconds()), tags, 1)
|
||||||
|
}()
|
||||||
|
|
||||||
|
ready := now.Unix() - userEnqueueInterval
|
||||||
err := pool.BeginFunc(ctx, func(tx pgx.Tx) error {
|
err := pool.BeginFunc(ctx, func(tx pgx.Tx) error {
|
||||||
stmt := `
|
stmt := `
|
||||||
WITH userb AS (
|
WITH userb AS (
|
||||||
|
@ -283,18 +288,19 @@ func enqueueUsers(ctx context.Context, logger *logrus.Logger, statsd *statsd.Cli
|
||||||
"err": err,
|
"err": err,
|
||||||
}).Error("failed to enqueue user")
|
}).Error("failed to enqueue user")
|
||||||
}
|
}
|
||||||
|
|
||||||
tags := []string{"queue:users"}
|
|
||||||
_ = statsd.Histogram("apollo.queue.enqueued", float64(len(ids)), tags, 1)
|
|
||||||
_ = statsd.Histogram("apollo.queue.runtime", float64(time.Since(now).Milliseconds()), tags, 1)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func enqueueSubreddits(ctx context.Context, logger *logrus.Logger, statsd *statsd.Client, pool *pgxpool.Pool, queues []rmq.Queue) {
|
func enqueueSubreddits(ctx context.Context, logger *logrus.Logger, statsd *statsd.Client, pool *pgxpool.Pool, queues []rmq.Queue) {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
ready := now.Unix() - subredditEnqueueInterval
|
|
||||||
|
|
||||||
ids := []int64{}
|
ids := []int64{}
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
tags := []string{"queue:subreddits"}
|
||||||
|
_ = statsd.Histogram("apollo.queue.enqueued", float64(len(ids)), tags, 1)
|
||||||
|
_ = statsd.Histogram("apollo.queue.runtime", float64(time.Since(now).Milliseconds()), tags, 1)
|
||||||
|
}()
|
||||||
|
|
||||||
|
ready := now.Unix() - subredditEnqueueInterval
|
||||||
err := pool.BeginFunc(ctx, func(tx pgx.Tx) error {
|
err := pool.BeginFunc(ctx, func(tx pgx.Tx) error {
|
||||||
stmt := `
|
stmt := `
|
||||||
WITH subreddit AS (
|
WITH subreddit AS (
|
||||||
|
@ -351,17 +357,19 @@ func enqueueSubreddits(ctx context.Context, logger *logrus.Logger, statsd *stats
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
tags := []string{"queue:subreddits"}
|
|
||||||
_ = statsd.Histogram("apollo.queue.enqueued", float64(len(ids)), tags, 1)
|
|
||||||
_ = statsd.Histogram("apollo.queue.runtime", float64(time.Since(now).Milliseconds()), tags, 1)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func enqueueStuckAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd.Client, pool *pgxpool.Pool, queue rmq.Queue) {
|
func enqueueStuckAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd.Client, pool *pgxpool.Pool, queue rmq.Queue) {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
ready := now.Unix() - stuckAccountEnqueueInterval
|
|
||||||
|
|
||||||
ids := []int64{}
|
ids := []int64{}
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
tags := []string{"queue:stuck-accounts"}
|
||||||
|
_ = statsd.Histogram("apollo.queue.enqueued", float64(len(ids)), tags, 1)
|
||||||
|
_ = statsd.Histogram("apollo.queue.runtime", float64(time.Since(now).Milliseconds()), tags, 1)
|
||||||
|
}()
|
||||||
|
|
||||||
|
ready := now.Unix() - stuckAccountEnqueueInterval
|
||||||
err := pool.BeginFunc(ctx, func(tx pgx.Tx) error {
|
err := pool.BeginFunc(ctx, func(tx pgx.Tx) error {
|
||||||
stmt := `
|
stmt := `
|
||||||
WITH account AS (
|
WITH account AS (
|
||||||
|
@ -416,14 +424,20 @@ func enqueueStuckAccounts(ctx context.Context, logger *logrus.Logger, statsd *st
|
||||||
"err": err,
|
"err": err,
|
||||||
}).Error("failed to enqueue stuck accounts")
|
}).Error("failed to enqueue stuck accounts")
|
||||||
}
|
}
|
||||||
|
|
||||||
tags := []string{"queue:stuck-accounts"}
|
|
||||||
_ = statsd.Histogram("apollo.queue.enqueued", float64(len(ids)), tags, 1)
|
|
||||||
_ = statsd.Histogram("apollo.queue.runtime", float64(time.Since(now).Milliseconds()), tags, 1)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd.Client, pool *pgxpool.Pool, redisConn *redis.Client, luaSha string, queue rmq.Queue) {
|
func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd.Client, pool *pgxpool.Pool, redisConn *redis.Client, luaSha string, queue rmq.Queue) {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
ids := []int64{}
|
||||||
|
enqueued := 0
|
||||||
|
skipped := 0
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
tags := []string{"queue:notifications"}
|
||||||
|
_ = statsd.Histogram("apollo.queue.enqueued", float64(enqueued), tags, 1)
|
||||||
|
_ = statsd.Histogram("apollo.queue.skipped", float64(skipped), tags, 1)
|
||||||
|
_ = statsd.Histogram("apollo.queue.runtime", float64(time.Since(now).Milliseconds()), tags, 1)
|
||||||
|
}()
|
||||||
|
|
||||||
// Start looking for accounts that were last checked at least 5 seconds ago
|
// Start looking for accounts that were last checked at least 5 seconds ago
|
||||||
// and at most 6 seconds ago. Also look for accounts that haven't been checked
|
// and at most 6 seconds ago. Also look for accounts that haven't been checked
|
||||||
|
@ -432,8 +446,6 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd.
|
||||||
ready := ts - accountEnqueueInterval
|
ready := ts - accountEnqueueInterval
|
||||||
expired := ts - checkTimeout
|
expired := ts - checkTimeout
|
||||||
|
|
||||||
ids := []int64{}
|
|
||||||
|
|
||||||
err := pool.BeginFunc(ctx, func(tx pgx.Tx) error {
|
err := pool.BeginFunc(ctx, func(tx pgx.Tx) error {
|
||||||
stmt := `
|
stmt := `
|
||||||
WITH account AS (
|
WITH account AS (
|
||||||
|
@ -477,10 +489,6 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd.
|
||||||
"count": len(ids),
|
"count": len(ids),
|
||||||
"start": ready,
|
"start": ready,
|
||||||
}).Debug("enqueueing account batch")
|
}).Debug("enqueueing account batch")
|
||||||
|
|
||||||
enqueued := 0
|
|
||||||
skipped := 0
|
|
||||||
|
|
||||||
// Split ids in batches
|
// Split ids in batches
|
||||||
for i := 0; i < len(ids); i += batchSize {
|
for i := 0; i < len(ids); i += batchSize {
|
||||||
j := i + batchSize
|
j := i + batchSize
|
||||||
|
@ -521,11 +529,6 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd.
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
tags := []string{"queue:notifications"}
|
|
||||||
_ = statsd.Histogram("apollo.queue.enqueued", float64(enqueued), tags, 1)
|
|
||||||
_ = statsd.Histogram("apollo.queue.skipped", float64(skipped), tags, 1)
|
|
||||||
_ = statsd.Histogram("apollo.queue.runtime", float64(time.Since(now).Milliseconds()), tags, 1)
|
|
||||||
|
|
||||||
logger.WithFields(logrus.Fields{
|
logger.WithFields(logrus.Fields{
|
||||||
"count": enqueued,
|
"count": enqueued,
|
||||||
"skipped": skipped,
|
"skipped": skipped,
|
||||||
|
|
Loading…
Reference in a new issue