From 3ce858927b0f70ef933f79e5d715006ef0834b54 Mon Sep 17 00:00:00 2001 From: Andre Medeiros Date: Thu, 8 Jul 2021 23:12:50 -0400 Subject: [PATCH] do batches in redis --- cmd/apollo-scheduler/main.go | 82 +++++++++++++++++-------- cmd/apollo-worker-notifications/main.go | 2 +- 2 files changed, 58 insertions(+), 26 deletions(-) diff --git a/cmd/apollo-scheduler/main.go b/cmd/apollo-scheduler/main.go index d1dd112..f6abf5c 100644 --- a/cmd/apollo-scheduler/main.go +++ b/cmd/apollo-scheduler/main.go @@ -2,6 +2,7 @@ package main import ( "context" + "encoding/json" "fmt" "log" "os" @@ -19,6 +20,10 @@ import ( "github.com/sirupsen/logrus" ) +const ( + batchSize = 100 +) + func main() { _ = godotenv.Load() @@ -172,36 +177,56 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd. enqueued := 0 skipped := 0 failed := 0 - for _, id := range ids { - payload := fmt.Sprintf("%d", id) - lockKey := fmt.Sprintf("locks:accounts:%s", payload) - _, err := redisConn.Get(ctx, lockKey).Result() - if err == nil { - skipped++ - continue - } else if err != redis.Nil { + // Split ids in batches + for i := 0; i < len(ids); i += batchSize { + j := i + batchSize + if j > len(ids) { + j = len(ids) + } + batch := Int64Slice(ids[i:j]) + + logger.WithFields(logrus.Fields{ + "len": len(batch), + }).Debug("enqueueing batch") + + lua := ` + local retv={} + local ids=cjson.decode(ARGV[1]) + + for i=1, #ids do + local key = "locks:accounts:" .. ids[i] + if redis.call("exists", key) == 0 then + retv[#retv + 1] = ids[i] + end + redis.call("setex", key, 60, 1) + end + + return retv + ` + + res, err := redisConn.Eval(ctx, lua, []string{}, batch).Result() + if err != nil { logger.WithFields(logrus.Fields{ - "lockKey": lockKey, - "err": err, - }).Error("failed to check for account lock") + "err": err, + }).Error("failed to check for locked accounts") + } - if err := redisConn.SetEX(ctx, lockKey, true, 60*time.Second).Err(); err != nil { - logger.WithFields(logrus.Fields{ - "lockKey": lockKey, - "err": err, - }).Error("failed to lock account") - } + vals := res.([]interface{}) + enqueued += len(vals) + skipped += len(batch) - len(vals) - if err = queue.Publish(payload); err != nil { - logger.WithFields(logrus.Fields{ - "accountID": payload, - "err": err, - }).Error("failed to enqueue account") - failed++ - } else { - enqueued++ + for _, val := range vals { + id := val.(int64) + payload := fmt.Sprintf("%d", id) + if err = queue.Publish(payload); err != nil { + logger.WithFields(logrus.Fields{ + "accountID": payload, + "err": err, + }).Error("failed to enqueue account") + failed++ + } } } @@ -224,3 +249,10 @@ func logErrors(errChan <-chan error) { log.Print("error: ", err) } } + +type Int64Slice []int64 + +func (ii Int64Slice) MarshalBinary() (data []byte, err error) { + bytes, err := json.Marshal(ii) + return bytes, err +} diff --git a/cmd/apollo-worker-notifications/main.go b/cmd/apollo-worker-notifications/main.go index c389fd9..a8ccd26 100644 --- a/cmd/apollo-worker-notifications/main.go +++ b/cmd/apollo-worker-notifications/main.go @@ -404,7 +404,7 @@ func (c *Consumer) Consume(delivery rmq.Delivery) { "accountID": delivery.Payload(), "token": device.APNSToken, "redditUser": account.Username, - }).Debug("sent notification") + }).Info("sent notification") } } }