mirror of
https://github.com/christianselig/apollo-backend
synced 2024-11-15 00:17:42 +00:00
cache lock release function
This commit is contained in:
parent
1dd0c3a47d
commit
9709c3a099
3 changed files with 42 additions and 21 deletions
|
@ -3,23 +3,42 @@ package distributedlock
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/go-redis/redis/v8"
|
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/go-redis/redis/v8"
|
||||||
)
|
)
|
||||||
|
|
||||||
const lockTopicFormat = "pubsub:locks:%s"
|
const (
|
||||||
|
lockTopicFormat = "pubsub:locks:%s"
|
||||||
|
lockReleaseScript = `
|
||||||
|
if redis.call("GET", KEYS[1]) == ARGV[1] then
|
||||||
|
redis.call("DEL", KEYS[1])
|
||||||
|
redis.call("PUBLISH", KEYS[2], KEYS[1])
|
||||||
|
return 1
|
||||||
|
else
|
||||||
|
return 0
|
||||||
|
end
|
||||||
|
`
|
||||||
|
)
|
||||||
|
|
||||||
type DistributedLock struct {
|
type DistributedLock struct {
|
||||||
client *redis.Client
|
client *redis.Client
|
||||||
|
sha string
|
||||||
timeout time.Duration
|
timeout time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(client *redis.Client, timeout time.Duration) *DistributedLock {
|
func New(client *redis.Client, timeout time.Duration) (*DistributedLock, error) {
|
||||||
|
sha, err := client.ScriptLoad(context.Background(), lockReleaseScript).Result()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
return &DistributedLock{
|
return &DistributedLock{
|
||||||
client: client,
|
client: client,
|
||||||
|
sha: sha,
|
||||||
timeout: timeout,
|
timeout: timeout,
|
||||||
}
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *DistributedLock) setLock(ctx context.Context, key string, uid string) error {
|
func (d *DistributedLock) setLock(ctx context.Context, key string, uid string) error {
|
||||||
|
|
|
@ -3,12 +3,15 @@ package distributedlock_test
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/christianselig/apollo-backend/internal/distributedlock"
|
"math/rand"
|
||||||
"github.com/go-redis/redis/v8"
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
"os"
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/christianselig/apollo-backend/internal/distributedlock"
|
||||||
|
|
||||||
|
"github.com/go-redis/redis/v8"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
)
|
)
|
||||||
|
|
||||||
func NewRedisClient(t *testing.T, ctx context.Context) (*redis.Client, func()) {
|
func NewRedisClient(t *testing.T, ctx context.Context) (*redis.Client, func()) {
|
||||||
|
@ -29,13 +32,17 @@ func NewRedisClient(t *testing.T, ctx context.Context) (*redis.Client, func()) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDistributedLock_AcquireLock(t *testing.T) {
|
func TestDistributedLock_AcquireLock(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
key := fmt.Sprintf("%d", time.Now().UnixNano())
|
key := fmt.Sprintf("key:%d-%d", time.Now().UnixNano(), rand.Int63())
|
||||||
|
|
||||||
client, closer := NewRedisClient(t, ctx)
|
client, closer := NewRedisClient(t, ctx)
|
||||||
defer closer()
|
defer closer()
|
||||||
|
|
||||||
d := distributedlock.New(client, 10*time.Second)
|
d, err := distributedlock.New(client, 10*time.Second)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
lock, err := d.AcquireLock(ctx, key)
|
lock, err := d.AcquireLock(ctx, key)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
@ -50,13 +57,17 @@ func TestDistributedLock_AcquireLock(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDistributedLock_WaitAcquireLock(t *testing.T) {
|
func TestDistributedLock_WaitAcquireLock(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
key := fmt.Sprintf("%d", time.Now().UnixNano())
|
key := fmt.Sprintf("key:%d-%d", time.Now().UnixNano(), rand.Int63())
|
||||||
|
|
||||||
client, closer := NewRedisClient(t, ctx)
|
client, closer := NewRedisClient(t, ctx)
|
||||||
defer closer()
|
defer closer()
|
||||||
|
|
||||||
d := distributedlock.New(client, 10*time.Second)
|
d, err := distributedlock.New(client, 10*time.Second)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
lock, err := d.AcquireLock(ctx, key)
|
lock, err := d.AcquireLock(ctx, key)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
|
|
@ -20,19 +20,10 @@ func NewLock(distributedLock *DistributedLock, key string, uid string) *Lock {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *Lock) Release(ctx context.Context) error {
|
func (l *Lock) Release(ctx context.Context) error {
|
||||||
script := `
|
|
||||||
if redis.call("GET", KEYS[1]) == ARGV[1] then
|
|
||||||
redis.call("DEL", KEYS[1])
|
|
||||||
redis.call("PUBLISH", KEYS[2], KEYS[1])
|
|
||||||
return 1
|
|
||||||
else
|
|
||||||
return 0
|
|
||||||
end
|
|
||||||
`
|
|
||||||
|
|
||||||
ch := fmt.Sprintf(lockTopicFormat, l.key)
|
ch := fmt.Sprintf(lockTopicFormat, l.key)
|
||||||
|
|
||||||
result, err := l.distributedLock.client.Eval(ctx, script, []string{l.key, ch}, l.uid).Result()
|
result, err := l.distributedLock.client.EvalSha(ctx, l.distributedLock.sha, []string{l.key, ch}, l.uid).Result()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue