From 3d8492e757863d223ee3d1e2f45cd1dbd91ee8fb Mon Sep 17 00:00:00 2001 From: Andre Medeiros Date: Thu, 8 Jul 2021 21:44:06 -0400 Subject: [PATCH] change locking around --- cmd/apollo-scheduler/main.go | 15 ++++++++++++--- cmd/apollo-worker-notifications/main.go | 10 +++++++++- 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/cmd/apollo-scheduler/main.go b/cmd/apollo-scheduler/main.go index 105f461..33a98f3 100644 --- a/cmd/apollo-scheduler/main.go +++ b/cmd/apollo-scheduler/main.go @@ -158,14 +158,23 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, pool *pgxpool.P skipped := 0 for _, id := range ids { payload := fmt.Sprintf("%d", id) - if _, err := redisConn.HGet(ctx, "locks:accounts", payload).Result(); err != redis.Nil { + lockKey := fmt.Sprintf("locks:accounts:%s", payload) + + _, err := redisConn.Get(ctx, lockKey).Result() + if err == nil || err == redis.Nil { skipped++ continue + } else { + logger.WithFields(logrus.Fields{ + "lockKey": lockKey, + "err": err, + }).Error("failed to check for account lock") } - if err := redisConn.HSet(ctx, "locks:accounts", payload, true).Err(); err != nil { + if err := redisConn.SetEX(ctx, lockKey, true, 60*time.Second).Err(); err != nil { logger.WithFields(logrus.Fields{ - "err": err, + "lockKey": lockKey, + "err": err, }).Error("failed to lock account") } diff --git a/cmd/apollo-worker-notifications/main.go b/cmd/apollo-worker-notifications/main.go index 1129562..c389fd9 100644 --- a/cmd/apollo-worker-notifications/main.go +++ b/cmd/apollo-worker-notifications/main.go @@ -185,7 +185,13 @@ func (c *Consumer) Consume(delivery rmq.Delivery) { ctx := context.Background() defer func() { - c.redis.HDel(ctx, "locks:accounts", delivery.Payload()).Err() + lockKey := fmt.Sprintf("locks:accounts:%s", delivery.Payload()) + if err := c.redis.Del(ctx, lockKey).Err(); err != nil { + c.logger.WithFields(logrus.Fields{ + "lockKey": lockKey, + "err": err, + }).Error("failed to remove lock") + } }() c.logger.WithFields(logrus.Fields{ @@ -207,6 +213,7 @@ func (c *Consumer) Consume(delivery rmq.Delivery) { stmt := `SELECT id, + username, access_token, refresh_token, expires_at, @@ -217,6 +224,7 @@ func (c *Consumer) Consume(delivery rmq.Delivery) { account := &data.Account{} if err := c.pool.QueryRow(ctx, stmt, id).Scan( &account.ID, + &account.Username, &account.AccessToken, &account.RefreshToken, &account.ExpiresAt,