From a3b9e1a0aca6ff66b4686876323d500a8008b4e4 Mon Sep 17 00:00:00 2001 From: Andre Medeiros Date: Sat, 10 Jul 2021 00:59:34 -0400 Subject: [PATCH] fix up redis so that we only parse the script once --- cmd/apollo-scheduler/main.go | 46 ++++++++++++++++++++++-------------- 1 file changed, 28 insertions(+), 18 deletions(-) diff --git a/cmd/apollo-scheduler/main.go b/cmd/apollo-scheduler/main.go index 7b765a7..bf4b9ed 100644 --- a/cmd/apollo-scheduler/main.go +++ b/cmd/apollo-scheduler/main.go @@ -104,8 +104,14 @@ func main() { } } + // Eval lua so that we don't keep parsing it + luaSha, err := evalScript(ctx, redisConn) + if err != nil { + panic(err) + } + s := gocron.NewScheduler(time.UTC) - s.Every(1).Second().Do(func() { enqueueAccounts(ctx, logger, statsd, pool, redisConn, notificationsQueue) }) + s.Every(1).Second().Do(func() { enqueueAccounts(ctx, logger, statsd, pool, redisConn, luaSha, notificationsQueue) }) s.Every(1).Minute().Do(func() { reportStats(ctx, logger, statsd, pool, redisConn) }) s.StartAsync() @@ -123,6 +129,25 @@ func main() { s.Stop() } +func evalScript(ctx context.Context, redisConn *redis.Client) (string, error) { + lua := fmt.Sprintf(` + local retv={} + local ids=cjson.decode(ARGV[1]) + + for i=1, #ids do + local key = KEYS[1] .. ":" .. ids[i] + if redis.call("exists", key) == 0 then + redis.call("setex", key, %d, 1) + retv[#retv + 1] = ids[i] + end + end + + return retv + `, checkTimeout) + + return redisConn.ScriptLoad(ctx, lua).Result() +} + func reportStats(ctx context.Context, logger *logrus.Logger, statsd *statsd.Client, pool *pgxpool.Pool, redisConn *redis.Client) { var ( count int64 @@ -147,7 +172,7 @@ func reportStats(ctx context.Context, logger *logrus.Logger, statsd *statsd.Clie } } -func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd.Client, pool *pgxpool.Pool, redisConn *redis.Client, queue rmq.Queue) { +func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd.Client, pool *pgxpool.Pool, redisConn *redis.Client, luaSha string, queue rmq.Queue) { start := time.Now() now := float64(time.Now().UnixNano()/int64(time.Millisecond)) / 1000 @@ -216,22 +241,7 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd. "len": len(batch), }).Debug("enqueueing batch") - lua := fmt.Sprintf(` - local retv={} - local ids=cjson.decode(ARGV[1]) - - for i=1, #ids do - local key = "locks:accounts:" .. ids[i] - if redis.call("exists", key) == 0 then - redis.call("setex", key, %d, 1) - retv[#retv + 1] = ids[i] - end - end - - return retv - `, checkTimeout) - - res, err := redisConn.Eval(ctx, lua, []string{}, batch).Result() + res, err := redisConn.EvalSha(ctx, luaSha, []string{"locks:accounts"}, batch).Result() if err != nil { logger.WithFields(logrus.Fields{ "err": err,