mirror of
https://github.com/christianselig/apollo-backend
synced 2024-11-25 21:27:42 +00:00
fix up redis so that we only parse the script once
This commit is contained in:
parent
d563ad26f1
commit
a3b9e1a0ac
1 changed files with 28 additions and 18 deletions
|
@ -104,8 +104,14 @@ func main() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Eval lua so that we don't keep parsing it
|
||||||
|
luaSha, err := evalScript(ctx, redisConn)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
s := gocron.NewScheduler(time.UTC)
|
s := gocron.NewScheduler(time.UTC)
|
||||||
s.Every(1).Second().Do(func() { enqueueAccounts(ctx, logger, statsd, pool, redisConn, notificationsQueue) })
|
s.Every(1).Second().Do(func() { enqueueAccounts(ctx, logger, statsd, pool, redisConn, luaSha, notificationsQueue) })
|
||||||
s.Every(1).Minute().Do(func() { reportStats(ctx, logger, statsd, pool, redisConn) })
|
s.Every(1).Minute().Do(func() { reportStats(ctx, logger, statsd, pool, redisConn) })
|
||||||
s.StartAsync()
|
s.StartAsync()
|
||||||
|
|
||||||
|
@ -123,6 +129,25 @@ func main() {
|
||||||
s.Stop()
|
s.Stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func evalScript(ctx context.Context, redisConn *redis.Client) (string, error) {
|
||||||
|
lua := fmt.Sprintf(`
|
||||||
|
local retv={}
|
||||||
|
local ids=cjson.decode(ARGV[1])
|
||||||
|
|
||||||
|
for i=1, #ids do
|
||||||
|
local key = KEYS[1] .. ":" .. ids[i]
|
||||||
|
if redis.call("exists", key) == 0 then
|
||||||
|
redis.call("setex", key, %d, 1)
|
||||||
|
retv[#retv + 1] = ids[i]
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
return retv
|
||||||
|
`, checkTimeout)
|
||||||
|
|
||||||
|
return redisConn.ScriptLoad(ctx, lua).Result()
|
||||||
|
}
|
||||||
|
|
||||||
func reportStats(ctx context.Context, logger *logrus.Logger, statsd *statsd.Client, pool *pgxpool.Pool, redisConn *redis.Client) {
|
func reportStats(ctx context.Context, logger *logrus.Logger, statsd *statsd.Client, pool *pgxpool.Pool, redisConn *redis.Client) {
|
||||||
var (
|
var (
|
||||||
count int64
|
count int64
|
||||||
|
@ -147,7 +172,7 @@ func reportStats(ctx context.Context, logger *logrus.Logger, statsd *statsd.Clie
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd.Client, pool *pgxpool.Pool, redisConn *redis.Client, queue rmq.Queue) {
|
func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd.Client, pool *pgxpool.Pool, redisConn *redis.Client, luaSha string, queue rmq.Queue) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
now := float64(time.Now().UnixNano()/int64(time.Millisecond)) / 1000
|
now := float64(time.Now().UnixNano()/int64(time.Millisecond)) / 1000
|
||||||
|
@ -216,22 +241,7 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd.
|
||||||
"len": len(batch),
|
"len": len(batch),
|
||||||
}).Debug("enqueueing batch")
|
}).Debug("enqueueing batch")
|
||||||
|
|
||||||
lua := fmt.Sprintf(`
|
res, err := redisConn.EvalSha(ctx, luaSha, []string{"locks:accounts"}, batch).Result()
|
||||||
local retv={}
|
|
||||||
local ids=cjson.decode(ARGV[1])
|
|
||||||
|
|
||||||
for i=1, #ids do
|
|
||||||
local key = "locks:accounts:" .. ids[i]
|
|
||||||
if redis.call("exists", key) == 0 then
|
|
||||||
redis.call("setex", key, %d, 1)
|
|
||||||
retv[#retv + 1] = ids[i]
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
return retv
|
|
||||||
`, checkTimeout)
|
|
||||||
|
|
||||||
res, err := redisConn.Eval(ctx, lua, []string{}, batch).Result()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.WithFields(logrus.Fields{
|
logger.WithFields(logrus.Fields{
|
||||||
"err": err,
|
"err": err,
|
||||||
|
|
Loading…
Reference in a new issue