rejig sql

This commit is contained in:
Andre Medeiros 2021-07-08 22:09:14 -04:00
parent 9c70cee166
commit d65bab7970

View file

@ -9,6 +9,7 @@ import (
"syscall"
"time"
"github.com/DataDog/datadog-go/statsd"
"github.com/adjust/rmq/v4"
"github.com/go-co-op/gocron"
"github.com/go-redis/redis/v8"
@ -39,6 +40,13 @@ func main() {
}
}
statsd, err := statsd.New("127.0.0.1:8125")
if err != nil {
logger.WithFields(logrus.Fields{
"err": err,
}).Error("failed to set up stats")
}
// Set up Postgres connection
var pool *pgxpool.Pool
{
@ -89,7 +97,7 @@ func main() {
}
s := gocron.NewScheduler(time.UTC)
s.Every(1).Second().Do(func() { enqueueAccounts(ctx, logger, pool, redisConn, notificationsQueue) })
s.Every(1).Second().Do(func() { enqueueAccounts(ctx, logger, statsd, pool, redisConn, notificationsQueue) })
s.StartAsync()
signals := make(chan os.Signal, 1)
@ -106,11 +114,16 @@ func main() {
s.Stop()
}
func enqueueAccounts(ctx context.Context, logger *logrus.Logger, pool *pgxpool.Pool, redisConn *redis.Client, queue rmq.Queue) {
func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd.Client, pool *pgxpool.Pool, redisConn *redis.Client, queue rmq.Queue) {
now := float64(time.Now().UnixNano()/int64(time.Millisecond)) / 1000
start := time.Now().Unix()
// Start looking for accounts that were last checked at least 5 seconds ago
// and at most 6 seconds ago. Also look for accounts that haven't been checked
// in over a minute.
ts := time.Now().Unix()
start := ts - 6
end := start + 1
expired := ts - 60
ids := []int64{}
@ -120,15 +133,15 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, pool *pgxpool.P
SELECT id
FROM accounts
WHERE
(last_checked_at + 5) BETWEEN $1 and $2
OR last_checked_at + 60 <= $1
last_checked_at BETWEEN $1 AND $2
OR last_checked_at < $3
ORDER BY last_checked_at
)
UPDATE accounts
SET last_enqueued_at = $3
SET last_enqueued_at = $4
WHERE accounts.id IN(SELECT id FROM account)
RETURNING accounts.id`
rows, err := tx.Query(ctx, stmt, start, end, now)
rows, err := tx.Query(ctx, stmt, start, end, expired, now)
if err != nil {
return err
}
@ -156,6 +169,7 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, pool *pgxpool.P
enqueued := 0
skipped := 0
failed := 0
for _, id := range ids {
payload := fmt.Sprintf("%d", id)
lockKey := fmt.Sprintf("locks:accounts:%s", payload)
@ -178,13 +192,25 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, pool *pgxpool.P
}).Error("failed to lock account")
}
enqueued++
_ = queue.Publish(payload)
if err = queue.Publish(payload); err != nil {
logger.WithFields(logrus.Fields{
"accountID": payload,
"err": err,
}).Error("failed to enqueue account")
failed++
} else {
enqueued++
}
}
statsd.Histogram("apollo.queue.enqueued", float64(enqueued), []string{}, 1)
statsd.Histogram("apollo.queue.skipped", float64(skipped), []string{}, 1)
statsd.Histogram("apollo.queue.failed", float64(failed), []string{}, 1)
logger.WithFields(logrus.Fields{
"count": enqueued,
"skipped": skipped,
"failed": failed,
"start": start,
"end": end,
}).Info("done enqueueing account batch")