From 1dd0c3a47dbd628dd18565600b3f9573602a395e Mon Sep 17 00:00:00 2001 From: Andre Medeiros Date: Sat, 1 Apr 2023 11:57:28 -0400 Subject: [PATCH 1/5] first pass at distributed lock --- .github/workflows/test.yml | 11 ++- .golangci.yml | 8 +- Makefile | 3 +- internal/distributedlock/distributed_lock.go | 72 ++++++++++++++++++ .../distributedlock/distributed_lock_test.go | 73 +++++++++++++++++++ internal/distributedlock/errors.go | 9 +++ internal/distributedlock/lock.go | 45 ++++++++++++ 7 files changed, 212 insertions(+), 9 deletions(-) create mode 100644 internal/distributedlock/distributed_lock.go create mode 100644 internal/distributedlock/distributed_lock_test.go create mode 100644 internal/distributedlock/errors.go create mode 100644 internal/distributedlock/lock.go diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 1b92535..b2e218e 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -5,6 +5,7 @@ jobs: runs-on: ubuntu-latest env: DATABASE_URL: postgres://postgres:postgres@localhost/apollo_test + REDIS_URL: redis://redis:6379 services: postgres: image: postgres @@ -18,7 +19,15 @@ jobs: --health-retries 5 ports: - 5432:5432 - + redis: + image: redis + options: >- + --health-cmd "redis-cli ping" + --health-interval 10s + --health-timeout 5s + --health-retries 5 + ports: + - 6379:6379 steps: - uses: actions/checkout@v3 - uses: actions/setup-go@v3 diff --git a/.golangci.yml b/.golangci.yml index f74fa4e..5d9c923 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -17,11 +17,5 @@ linters: - thelper # detects golang test helpers without t.Helper() call and checks consistency of test helpers - unconvert # removes unnecessary type conversions - unparam # removes unused function parameters + - paralleltest fast: true - -issues: - exclude-rules: - # False positive: https://github.com/kunwardeep/paralleltest/issues/8. - - linters: - - paralleltest - text: "does not use range value in test Run" diff --git a/Makefile b/Makefile index 90780ea..e3eac97 100644 --- a/Makefile +++ b/Makefile @@ -1,8 +1,9 @@ BREW_PREFIX ?= $(shell brew --prefix) DATABASE_URL ?= "postgres://$(USER)@localhost/apollo_test?sslmode=disable" +REDIS_URL ?= "redis://localhost:6379" test: - @DATABASE_URL=$(DATABASE_URL) go test -race -timeout 1s ./... + @DATABASE_URL=$(DATABASE_URL) REDIS_URL=$(REDIS_URL) go test -race -timeout 1s ./... test-setup: $(BREW_PREFIX)/bin/migrate migrate -path migrations/ -database $(DATABASE_URL) up diff --git a/internal/distributedlock/distributed_lock.go b/internal/distributedlock/distributed_lock.go new file mode 100644 index 0000000..2525b29 --- /dev/null +++ b/internal/distributedlock/distributed_lock.go @@ -0,0 +1,72 @@ +package distributedlock + +import ( + "context" + "fmt" + "github.com/go-redis/redis/v8" + "math/rand" + "time" +) + +const lockTopicFormat = "pubsub:locks:%s" + +type DistributedLock struct { + client *redis.Client + timeout time.Duration +} + +func New(client *redis.Client, timeout time.Duration) *DistributedLock { + return &DistributedLock{ + client: client, + timeout: timeout, + } +} + +func (d *DistributedLock) setLock(ctx context.Context, key string, uid string) error { + result, err := d.client.SetNX(ctx, key, uid, d.timeout).Result() + if err != nil { + return err + } + + if !result { + return ErrLockAlreadyAcquired + } + + return nil +} + +func (d *DistributedLock) AcquireLock(ctx context.Context, key string) (*Lock, error) { + uid := generateUniqueID() + if err := d.setLock(ctx, key, uid); err != nil { + return nil, err + } + + return NewLock(d, key, uid), nil +} + +func (d *DistributedLock) WaitAcquireLock(ctx context.Context, key string, timeout time.Duration) (*Lock, error) { + uid := generateUniqueID() + if err := d.setLock(ctx, key, uid); err == nil { + return NewLock(d, key, uid), nil + } + + ch := fmt.Sprintf(lockTopicFormat, key) + pubsub := d.client.Subscribe(ctx, ch) + + select { + case <-time.After(timeout): + return nil, ErrLockAcquisitionTimeout + case <-ctx.Done(): + return nil, ctx.Err() + case <-pubsub.Channel(): + err := d.setLock(ctx, key, uid) + if err != nil { + return nil, err + } + return NewLock(d, key, uid), nil + } +} + +func generateUniqueID() string { + return fmt.Sprintf("%d-%d", time.Now().UnixNano(), rand.Int63()) +} diff --git a/internal/distributedlock/distributed_lock_test.go b/internal/distributedlock/distributed_lock_test.go new file mode 100644 index 0000000..b0bfdfb --- /dev/null +++ b/internal/distributedlock/distributed_lock_test.go @@ -0,0 +1,73 @@ +package distributedlock_test + +import ( + "context" + "fmt" + "github.com/christianselig/apollo-backend/internal/distributedlock" + "github.com/go-redis/redis/v8" + "github.com/stretchr/testify/assert" + "os" + "testing" + "time" +) + +func NewRedisClient(t *testing.T, ctx context.Context) (*redis.Client, func()) { + t.Helper() + + opt, err := redis.ParseURL(os.Getenv("REDIS_URL")) + if err != nil { + panic(err) + } + client := redis.NewClient(opt) + if err := client.Ping(ctx).Err(); err != nil { + panic(err) + } + + return client, func() { + _ = client.Close() + } +} + +func TestDistributedLock_AcquireLock(t *testing.T) { + ctx := context.Background() + key := fmt.Sprintf("%d", time.Now().UnixNano()) + + client, closer := NewRedisClient(t, ctx) + defer closer() + + d := distributedlock.New(client, 10*time.Second) + lock, err := d.AcquireLock(ctx, key) + assert.NoError(t, err) + + _, err = d.AcquireLock(ctx, key) + assert.Equal(t, distributedlock.ErrLockAlreadyAcquired, err) + + err = lock.Release(ctx) + assert.NoError(t, err) + + _, err = d.AcquireLock(ctx, key) + assert.NoError(t, err) +} + +func TestDistributedLock_WaitAcquireLock(t *testing.T) { + ctx := context.Background() + key := fmt.Sprintf("%d", time.Now().UnixNano()) + + client, closer := NewRedisClient(t, ctx) + defer closer() + + d := distributedlock.New(client, 10*time.Second) + lock, err := d.AcquireLock(ctx, key) + assert.NoError(t, err) + + go func(l *distributedlock.Lock) { + select { + case <-time.After(100 * time.Millisecond): + _ = l.Release(ctx) + } + }(lock) + + lock, err = d.WaitAcquireLock(ctx, key, 5*time.Second) + assert.NoError(t, err) + assert.NotNil(t, lock) +} diff --git a/internal/distributedlock/errors.go b/internal/distributedlock/errors.go new file mode 100644 index 0000000..7f0bb46 --- /dev/null +++ b/internal/distributedlock/errors.go @@ -0,0 +1,9 @@ +package distributedlock + +import "errors" + +var ( + ErrLockAcquisitionTimeout = errors.New("timed out acquiring lock") + ErrLockAlreadyAcquired = errors.New("lock already acquired") + ErrLockExpired = errors.New("releasing an expired lock") +) diff --git a/internal/distributedlock/lock.go b/internal/distributedlock/lock.go new file mode 100644 index 0000000..3a49f39 --- /dev/null +++ b/internal/distributedlock/lock.go @@ -0,0 +1,45 @@ +package distributedlock + +import ( + "context" + "fmt" +) + +type Lock struct { + distributedLock *DistributedLock + key string + uid string +} + +func NewLock(distributedLock *DistributedLock, key string, uid string) *Lock { + return &Lock{ + distributedLock: distributedLock, + key: key, + uid: uid, + } +} + +func (l *Lock) Release(ctx context.Context) error { + script := ` + if redis.call("GET", KEYS[1]) == ARGV[1] then + redis.call("DEL", KEYS[1]) + redis.call("PUBLISH", KEYS[2], KEYS[1]) + return 1 + else + return 0 + end + ` + + ch := fmt.Sprintf(lockTopicFormat, l.key) + + result, err := l.distributedLock.client.Eval(ctx, script, []string{l.key, ch}, l.uid).Result() + if err != nil { + return err + } + + if result == int64(0) { + return ErrLockExpired + } + + return nil +} From 9709c3a099dcfa1d7c99bd9bcdbb6faf5a535d3d Mon Sep 17 00:00:00 2001 From: Andre Medeiros Date: Sat, 1 Apr 2023 12:07:48 -0400 Subject: [PATCH 2/5] cache lock release function --- internal/distributedlock/distributed_lock.go | 27 ++++++++++++++++--- .../distributedlock/distributed_lock_test.go | 25 ++++++++++++----- internal/distributedlock/lock.go | 11 +------- 3 files changed, 42 insertions(+), 21 deletions(-) diff --git a/internal/distributedlock/distributed_lock.go b/internal/distributedlock/distributed_lock.go index 2525b29..bae898a 100644 --- a/internal/distributedlock/distributed_lock.go +++ b/internal/distributedlock/distributed_lock.go @@ -3,23 +3,42 @@ package distributedlock import ( "context" "fmt" - "github.com/go-redis/redis/v8" "math/rand" "time" + + "github.com/go-redis/redis/v8" ) -const lockTopicFormat = "pubsub:locks:%s" +const ( + lockTopicFormat = "pubsub:locks:%s" + lockReleaseScript = ` + if redis.call("GET", KEYS[1]) == ARGV[1] then + redis.call("DEL", KEYS[1]) + redis.call("PUBLISH", KEYS[2], KEYS[1]) + return 1 + else + return 0 + end + ` +) type DistributedLock struct { client *redis.Client + sha string timeout time.Duration } -func New(client *redis.Client, timeout time.Duration) *DistributedLock { +func New(client *redis.Client, timeout time.Duration) (*DistributedLock, error) { + sha, err := client.ScriptLoad(context.Background(), lockReleaseScript).Result() + if err != nil { + return nil, err + } + return &DistributedLock{ client: client, + sha: sha, timeout: timeout, - } + }, nil } func (d *DistributedLock) setLock(ctx context.Context, key string, uid string) error { diff --git a/internal/distributedlock/distributed_lock_test.go b/internal/distributedlock/distributed_lock_test.go index b0bfdfb..3d8211f 100644 --- a/internal/distributedlock/distributed_lock_test.go +++ b/internal/distributedlock/distributed_lock_test.go @@ -3,12 +3,15 @@ package distributedlock_test import ( "context" "fmt" - "github.com/christianselig/apollo-backend/internal/distributedlock" - "github.com/go-redis/redis/v8" - "github.com/stretchr/testify/assert" + "math/rand" "os" "testing" "time" + + "github.com/christianselig/apollo-backend/internal/distributedlock" + + "github.com/go-redis/redis/v8" + "github.com/stretchr/testify/assert" ) func NewRedisClient(t *testing.T, ctx context.Context) (*redis.Client, func()) { @@ -29,13 +32,17 @@ func NewRedisClient(t *testing.T, ctx context.Context) (*redis.Client, func()) { } func TestDistributedLock_AcquireLock(t *testing.T) { + t.Parallel() + ctx := context.Background() - key := fmt.Sprintf("%d", time.Now().UnixNano()) + key := fmt.Sprintf("key:%d-%d", time.Now().UnixNano(), rand.Int63()) client, closer := NewRedisClient(t, ctx) defer closer() - d := distributedlock.New(client, 10*time.Second) + d, err := distributedlock.New(client, 10*time.Second) + assert.NoError(t, err) + lock, err := d.AcquireLock(ctx, key) assert.NoError(t, err) @@ -50,13 +57,17 @@ func TestDistributedLock_AcquireLock(t *testing.T) { } func TestDistributedLock_WaitAcquireLock(t *testing.T) { + t.Parallel() + ctx := context.Background() - key := fmt.Sprintf("%d", time.Now().UnixNano()) + key := fmt.Sprintf("key:%d-%d", time.Now().UnixNano(), rand.Int63()) client, closer := NewRedisClient(t, ctx) defer closer() - d := distributedlock.New(client, 10*time.Second) + d, err := distributedlock.New(client, 10*time.Second) + assert.NoError(t, err) + lock, err := d.AcquireLock(ctx, key) assert.NoError(t, err) diff --git a/internal/distributedlock/lock.go b/internal/distributedlock/lock.go index 3a49f39..005730e 100644 --- a/internal/distributedlock/lock.go +++ b/internal/distributedlock/lock.go @@ -20,19 +20,10 @@ func NewLock(distributedLock *DistributedLock, key string, uid string) *Lock { } func (l *Lock) Release(ctx context.Context) error { - script := ` - if redis.call("GET", KEYS[1]) == ARGV[1] then - redis.call("DEL", KEYS[1]) - redis.call("PUBLISH", KEYS[2], KEYS[1]) - return 1 - else - return 0 - end - ` ch := fmt.Sprintf(lockTopicFormat, l.key) - result, err := l.distributedLock.client.Eval(ctx, script, []string{l.key, ch}, l.uid).Result() + result, err := l.distributedLock.client.EvalSha(ctx, l.distributedLock.sha, []string{l.key, ch}, l.uid).Result() if err != nil { return err } From 4bcc7f8c1b637ca3ed5c1b7e99315ef0e2081e54 Mon Sep 17 00:00:00 2001 From: Andre Medeiros Date: Sat, 1 Apr 2023 12:10:08 -0400 Subject: [PATCH 3/5] fix redis image hostname --- .github/workflows/test.yml | 2 +- .idea/.gitignore | 8 ++++++++ .idea/apollo-backend.iml | 9 +++++++++ .idea/codeStyles/codeStyleConfig.xml | 5 +++++ .idea/modules.xml | 8 ++++++++ .idea/vcs.xml | 6 ++++++ 6 files changed, 37 insertions(+), 1 deletion(-) create mode 100644 .idea/.gitignore create mode 100644 .idea/apollo-backend.iml create mode 100644 .idea/codeStyles/codeStyleConfig.xml create mode 100644 .idea/modules.xml create mode 100644 .idea/vcs.xml diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index b2e218e..a65a5d4 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -5,7 +5,7 @@ jobs: runs-on: ubuntu-latest env: DATABASE_URL: postgres://postgres:postgres@localhost/apollo_test - REDIS_URL: redis://redis:6379 + REDIS_URL: redis://localhost:6379 services: postgres: image: postgres diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..13566b8 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/apollo-backend.iml b/.idea/apollo-backend.iml new file mode 100644 index 0000000..5e764c4 --- /dev/null +++ b/.idea/apollo-backend.iml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/.idea/codeStyles/codeStyleConfig.xml b/.idea/codeStyles/codeStyleConfig.xml new file mode 100644 index 0000000..a55e7a1 --- /dev/null +++ b/.idea/codeStyles/codeStyleConfig.xml @@ -0,0 +1,5 @@ + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..fa496e0 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..35eb1dd --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file From f3e6d27372a185319dc5413cfe292c5e71ee46cd Mon Sep 17 00:00:00 2001 From: Andre Medeiros Date: Sat, 1 Apr 2023 12:12:18 -0400 Subject: [PATCH 4/5] bump golang version --- .github/workflows/test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index a65a5d4..1936daa 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -32,7 +32,7 @@ jobs: - uses: actions/checkout@v3 - uses: actions/setup-go@v3 with: - go-version: 1.19.3 + go-version: 1.20.2 - uses: golangci/golangci-lint-action@v3 - run: psql -f docs/schema.sql $DATABASE_URL - run: go test ./... -v -race -timeout 5s From 2dec2ff90e6a03fd68741a170e65d74364f39831 Mon Sep 17 00:00:00 2001 From: Andre Medeiros Date: Sat, 1 Apr 2023 12:19:56 -0400 Subject: [PATCH 5/5] close pubsub --- internal/distributedlock/distributed_lock.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/distributedlock/distributed_lock.go b/internal/distributedlock/distributed_lock.go index bae898a..b989aea 100644 --- a/internal/distributedlock/distributed_lock.go +++ b/internal/distributedlock/distributed_lock.go @@ -71,6 +71,7 @@ func (d *DistributedLock) WaitAcquireLock(ctx context.Context, key string, timeo ch := fmt.Sprintf(lockTopicFormat, key) pubsub := d.client.Subscribe(ctx, ch) + defer func() { _ = pubsub.Close() }() select { case <-time.After(timeout):