change locking around

This commit is contained in:
Andre Medeiros 2021-07-08 21:44:06 -04:00
parent 83fb4cdcc7
commit 3d8492e757
2 changed files with 21 additions and 4 deletions

View file

@ -158,13 +158,22 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, pool *pgxpool.P
skipped := 0 skipped := 0
for _, id := range ids { for _, id := range ids {
payload := fmt.Sprintf("%d", id) payload := fmt.Sprintf("%d", id)
if _, err := redisConn.HGet(ctx, "locks:accounts", payload).Result(); err != redis.Nil { lockKey := fmt.Sprintf("locks:accounts:%s", payload)
_, err := redisConn.Get(ctx, lockKey).Result()
if err == nil || err == redis.Nil {
skipped++ skipped++
continue continue
} else {
logger.WithFields(logrus.Fields{
"lockKey": lockKey,
"err": err,
}).Error("failed to check for account lock")
} }
if err := redisConn.HSet(ctx, "locks:accounts", payload, true).Err(); err != nil { if err := redisConn.SetEX(ctx, lockKey, true, 60*time.Second).Err(); err != nil {
logger.WithFields(logrus.Fields{ logger.WithFields(logrus.Fields{
"lockKey": lockKey,
"err": err, "err": err,
}).Error("failed to lock account") }).Error("failed to lock account")
} }

View file

@ -185,7 +185,13 @@ func (c *Consumer) Consume(delivery rmq.Delivery) {
ctx := context.Background() ctx := context.Background()
defer func() { defer func() {
c.redis.HDel(ctx, "locks:accounts", delivery.Payload()).Err() lockKey := fmt.Sprintf("locks:accounts:%s", delivery.Payload())
if err := c.redis.Del(ctx, lockKey).Err(); err != nil {
c.logger.WithFields(logrus.Fields{
"lockKey": lockKey,
"err": err,
}).Error("failed to remove lock")
}
}() }()
c.logger.WithFields(logrus.Fields{ c.logger.WithFields(logrus.Fields{
@ -207,6 +213,7 @@ func (c *Consumer) Consume(delivery rmq.Delivery) {
stmt := `SELECT stmt := `SELECT
id, id,
username,
access_token, access_token,
refresh_token, refresh_token,
expires_at, expires_at,
@ -217,6 +224,7 @@ func (c *Consumer) Consume(delivery rmq.Delivery) {
account := &data.Account{} account := &data.Account{}
if err := c.pool.QueryRow(ctx, stmt, id).Scan( if err := c.pool.QueryRow(ctx, stmt, id).Scan(
&account.ID, &account.ID,
&account.Username,
&account.AccessToken, &account.AccessToken,
&account.RefreshToken, &account.RefreshToken,
&account.ExpiresAt, &account.ExpiresAt,