From b1f266bf91a819a510a7aedae079ae806fc6b470 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Medeiros?= Date: Mon, 23 May 2022 14:17:25 -0400 Subject: [PATCH] migrate out of logrus (#76) --- go.mod | 6 +- go.sum | 12 +- internal/api/accounts.go | 51 +++--- internal/api/api.go | 26 +-- internal/api/devices.go | 10 +- internal/api/notifications.go | 10 +- internal/api/receipt.go | 6 +- internal/cmd/api.go | 9 +- internal/cmd/scheduler.go | 125 ++++----------- internal/cmd/worker.go | 3 +- internal/cmdutil/cmdutil.go | 22 +-- internal/worker/notifications.go | 173 ++++++++++---------- internal/worker/stuck_notifications.go | 149 +++++++++-------- internal/worker/subreddits.go | 214 ++++++++++++------------- internal/worker/trending.go | 167 ++++++++++--------- internal/worker/users.go | 131 +++++++-------- internal/worker/worker.go | 4 +- 17 files changed, 522 insertions(+), 596 deletions(-) diff --git a/go.mod b/go.mod index df60f0c..b18d271 100644 --- a/go.mod +++ b/go.mod @@ -17,11 +17,11 @@ require ( github.com/jackc/pgx/v4 v4.16.0 github.com/joho/godotenv v1.4.0 github.com/sideshow/apns2 v0.23.0 - github.com/sirupsen/logrus v1.8.1 github.com/smtp2go-oss/smtp2go-go v1.0.1 github.com/spf13/cobra v1.4.0 github.com/stretchr/testify v1.7.1 github.com/valyala/fastjson v1.6.3 + go.uber.org/zap v1.13.0 ) require ( @@ -45,10 +45,14 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/robfig/cron/v3 v3.0.1 // indirect github.com/spf13/pflag v1.0.5 // indirect + go.uber.org/atomic v1.6.0 // indirect + go.uber.org/multierr v1.5.0 // indirect golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97 // indirect + golang.org/x/lint v0.0.0-20190930215403-16217165b5de // indirect golang.org/x/net v0.0.0-20220403103023-749bd193bc2b // indirect golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect golang.org/x/text v0.3.7 // indirect + golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect ) diff --git a/go.sum b/go.sum index 23a2e3c..6b193e1 100644 --- a/go.sum +++ b/go.sum @@ -2,6 +2,7 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU= contrib.go.opencensus.io/exporter/ocagent v0.6.0/go.mod h1:zmKjrJcdo0aYcVS7bmEeSEBLPA9YJp5bjrofdU3pIXs= +github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/DataDog/datadog-go v4.8.3+incompatible h1:fNGaYSuObuQb5nzeTQqowRAd9bpDIRRV4/gUtIBjh8Q= github.com/DataDog/datadog-go v4.8.3+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= @@ -277,8 +278,6 @@ github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMB github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= -github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= -github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/smtp2go-oss/smtp2go-go v1.0.1 h1:rwcoNLjOyigOzCjKp/guylKY/xJpoeypSxgtcC/g6DA= @@ -345,13 +344,17 @@ go.opentelemetry.io/proto/otlp v0.9.0/go.mod h1:1vKfU9rv61e9EVGthD1zNvUbiwPcimSs go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= +go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= +go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A= go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= +go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee h1:0mgffUl7nfd+FpvXMVz4IDEaUSmT1ysygQC7qYo7sG4= go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= +go.uber.org/zap v1.13.0 h1:nR6NoDBgAf67s68NhaXbsojM+2gxp3S1hWkHDl27pVU= go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM= golang.org/x/crypto v0.0.0-20170512130425-ab89591268e0/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= @@ -372,9 +375,11 @@ golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvx golang.org/x/lint v0.0.0-20190301231843-5614ed5bae6f/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/lint v0.0.0-20190409202823-959b441ac422/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs= golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= +golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -475,12 +480,14 @@ golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200103221440-774c71fcf114/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= +golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e h1:4nW4NLDYnU28ojHaHO8OVxFHk/aQ33U01a9cjED+pzE= golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gonum.org/v1/gonum v0.0.0-20190502212712-4a2eb0188cbc/go.mod h1:2ltnJ7xHfj0zHS40VVPYEAAMTa3ZGguvHGBSJeRWqE0= gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw= @@ -556,5 +563,6 @@ honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM= honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= rsc.io/goversion v1.2.0/go.mod h1:Eih9y/uIBS3ulggl7KNJ09xGSLcuNaLgmvvqa07sgfo= diff --git a/internal/api/accounts.go b/internal/api/accounts.go index f3adb47..7fa594a 100644 --- a/internal/api/accounts.go +++ b/internal/api/accounts.go @@ -8,7 +8,7 @@ import ( "time" "github.com/gorilla/mux" - "github.com/sirupsen/logrus" + "go.uber.org/zap" "github.com/christianselig/apollo-backend/internal/domain" "github.com/christianselig/apollo-backend/internal/reddit" @@ -142,8 +142,8 @@ func (a *api) upsertAccountsHandler(w http.ResponseWriter, r *http.Request) { for _, acc := range raccs { delete(accsMap, acc.NormalizedUsername()) - ac := a.reddit.NewAuthenticatedClient(reddit.SkipRateLimiting, acc.RefreshToken, acc.AccessToken) - tokens, err := ac.RefreshTokens(ctx) + rac := a.reddit.NewAuthenticatedClient(reddit.SkipRateLimiting, acc.RefreshToken, acc.AccessToken) + tokens, err := rac.RefreshTokens(ctx) if err != nil { a.errorResponse(w, r, 422, err) return @@ -154,8 +154,8 @@ func (a *api) upsertAccountsHandler(w http.ResponseWriter, r *http.Request) { acc.RefreshToken = tokens.RefreshToken acc.AccessToken = tokens.AccessToken - ac = a.reddit.NewAuthenticatedClient(reddit.SkipRateLimiting, acc.RefreshToken, acc.AccessToken) - me, err := ac.Me(ctx) + rac = a.reddit.NewAuthenticatedClient(reddit.SkipRateLimiting, tokens.RefreshToken, tokens.AccessToken) + me, err := rac.Me(ctx) if err != nil { a.errorResponse(w, r, 422, err) @@ -183,29 +183,26 @@ func (a *api) upsertAccountsHandler(w http.ResponseWriter, r *http.Request) { } for _, acc := range accsMap { - fmt.Println(acc.NormalizedUsername()) _ = a.accountRepo.Disassociate(ctx, &acc, &dev) } body := fmt.Sprintf(`{"apns_token": "%s"}`, apns) req, err := http.NewRequestWithContext(ctx, "POST", "https://apollopushserver.xyz/api/new-server-addition", strings.NewReader(body)) - req.Header.Set("Authorization", "Bearer 98g5j89aurqwfcsp9khlnvgd38fa15") - if err != nil { - a.logger.WithFields(logrus.Fields{ - "apns": apns, - }).Error(err) + a.logger.Error("could not setup request to disassociate from legacy api", zap.Error(err), zap.String("apns", apns)) + w.WriteHeader(http.StatusOK) return } - w.WriteHeader(http.StatusOK) - + req.Header.Set("Authorization", "Bearer 98g5j89aurqwfcsp9khlnvgd38fa15") resp, _ := a.httpClient.Do(req) if err != nil { - a.logger.WithFields(logrus.Fields{"err": err}).Error("failed to remove old client") + a.logger.Error("failed to remove from old notification server", zap.Error(err), zap.String("apns", apns)) return } resp.Body.Close() + + w.WriteHeader(http.StatusOK) } func (a *api) upsertAccountHandler(w http.ResponseWriter, r *http.Request) { @@ -216,9 +213,7 @@ func (a *api) upsertAccountHandler(w http.ResponseWriter, r *http.Request) { var acct domain.Account if err := json.NewDecoder(r.Body).Decode(&acct); err != nil { - a.logger.WithFields(logrus.Fields{ - "err": err, - }).Info("failed to parse request json") + a.logger.Error("failed to parse request json", zap.Error(err)) a.errorResponse(w, r, 422, err) return } @@ -227,9 +222,7 @@ func (a *api) upsertAccountHandler(w http.ResponseWriter, r *http.Request) { ac := a.reddit.NewAuthenticatedClient(reddit.SkipRateLimiting, acct.RefreshToken, acct.AccessToken) tokens, err := ac.RefreshTokens(ctx) if err != nil { - a.logger.WithFields(logrus.Fields{ - "err": err, - }).Info("failed to refresh token") + a.logger.Error("failed to refresh token", zap.Error(err)) a.errorResponse(w, r, 422, err) return } @@ -243,16 +236,14 @@ func (a *api) upsertAccountHandler(w http.ResponseWriter, r *http.Request) { me, err := ac.Me(ctx) if err != nil { - a.logger.WithFields(logrus.Fields{ - "err": err, - }).Info("failed to grab user details from Reddit") + a.logger.Error("failed to grab user details from reddit", zap.Error(err)) a.errorResponse(w, r, 500, err) return } if me.NormalizedUsername() != acct.NormalizedUsername() { err := fmt.Errorf("wrong user: expected %s, got %s", me.NormalizedUsername(), acct.NormalizedUsername()) - a.logger.WithFields(logrus.Fields{"err": err}).Warn("user is not who they say they are") + a.logger.Warn("user is not who they say they are", zap.Error(err)) a.errorResponse(w, r, 401, err) return } @@ -263,26 +254,20 @@ func (a *api) upsertAccountHandler(w http.ResponseWriter, r *http.Request) { // Associate dev, err := a.deviceRepo.GetByAPNSToken(ctx, vars["apns"]) if err != nil { - a.logger.WithFields(logrus.Fields{ - "err": err, - }).Info("failed fetching device from database") + a.logger.Error("failed to fetch device from database", zap.Error(err)) a.errorResponse(w, r, 500, err) return } // Upsert account if err := a.accountRepo.CreateOrUpdate(ctx, &acct); err != nil { - a.logger.WithFields(logrus.Fields{ - "err": err, - }).Info("failed updating account in database") + a.logger.Error("failed to update account", zap.Error(err)) a.errorResponse(w, r, 500, err) return } if err := a.accountRepo.Associate(ctx, &acct, &dev); err != nil { - a.logger.WithFields(logrus.Fields{ - "err": err, - }).Info("failed associating account with device") + a.logger.Error("failed to associate account with device", zap.Error(err)) a.errorResponse(w, r, 500, err) return } diff --git a/internal/api/api.go b/internal/api/api.go index 7a3628c..fdbac49 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -14,7 +14,7 @@ import ( "github.com/gorilla/mux" "github.com/jackc/pgx/v4/pgxpool" "github.com/sideshow/apns2/token" - "github.com/sirupsen/logrus" + "go.uber.org/zap" "github.com/christianselig/apollo-backend/internal/domain" "github.com/christianselig/apollo-backend/internal/reddit" @@ -22,7 +22,7 @@ import ( ) type api struct { - logger *logrus.Logger + logger *zap.Logger statsd *statsd.Client reddit *reddit.Client apns *token.Token @@ -35,7 +35,7 @@ type api struct { userRepo domain.UserRepository } -func NewAPI(ctx context.Context, logger *logrus.Logger, statsd *statsd.Client, redis *redis.Client, pool *pgxpool.Pool) *api { +func NewAPI(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, redis *redis.Client, pool *pgxpool.Pool) *api { reddit := reddit.NewClient( os.Getenv("REDDIT_CLIENT_ID"), os.Getenv("REDDIT_CLIENT_SECRET"), @@ -178,20 +178,20 @@ func (a *api) loggingMiddleware(next http.Handler) http.Handler { } } - logEntry := a.logger.WithFields(logrus.Fields{ - "duration": time.Since(start).Milliseconds(), - "method": r.Method, - "remote#addr": remoteAddr, - "response#bytes": lrw.bytes, - "status": lrw.statusCode, - "uri": r.RequestURI, - }) + fields := []zap.Field{ + zap.Int64("duration", time.Since(start).Milliseconds()), + zap.String("method", r.Method), + zap.String("remote#addr", remoteAddr), + zap.Int("response#bytes", lrw.bytes), + zap.Int("status", lrw.statusCode), + zap.String("uri", r.RequestURI), + } if lrw.statusCode == 200 { - logEntry.Info() + a.logger.Info("", fields...) } else { err := lrw.Header().Get("X-Apollo-Error") - logEntry.Error(err) + a.logger.Error(err, fields...) } }) } diff --git a/internal/api/devices.go b/internal/api/devices.go index dc7c498..2249ddc 100644 --- a/internal/api/devices.go +++ b/internal/api/devices.go @@ -11,7 +11,7 @@ import ( "github.com/gorilla/mux" "github.com/sideshow/apns2" "github.com/sideshow/apns2/payload" - "github.com/sirupsen/logrus" + "go.uber.org/zap" "github.com/christianselig/apollo-backend/internal/domain" ) @@ -46,9 +46,7 @@ func (a *api) testDeviceHandler(w http.ResponseWriter, r *http.Request) { d, err := a.deviceRepo.GetByAPNSToken(ctx, tok) if err != nil { - a.logger.WithFields(logrus.Fields{ - "err": err, - }).Info("failed fetching device from database") + a.logger.Error("failed fetching device from database", zap.Error(err)) a.errorResponse(w, r, 500, err) return } @@ -83,9 +81,7 @@ func (a *api) testDeviceHandler(w http.ResponseWriter, r *http.Request) { } if _, err := client.Push(notification); err != nil { - a.logger.WithFields(logrus.Fields{ - "err": err, - }).Info("failed to send test notification") + a.logger.Info("failed to send test notification", zap.Error(err)) a.errorResponse(w, r, 500, err) return } diff --git a/internal/api/notifications.go b/internal/api/notifications.go index e6df177..73666da 100644 --- a/internal/api/notifications.go +++ b/internal/api/notifications.go @@ -7,7 +7,7 @@ import ( "github.com/gorilla/mux" "github.com/sideshow/apns2" "github.com/sideshow/apns2/payload" - "github.com/sirupsen/logrus" + "go.uber.org/zap" ) const ( @@ -31,9 +31,7 @@ func generateNotificationTester(a *api, fun notificationGenerator) func(w http.R d, err := a.deviceRepo.GetByAPNSToken(ctx, tok) if err != nil { - a.logger.WithFields(logrus.Fields{ - "err": err, - }).Info("failed fetching device from database") + a.logger.Info("failed fetching device from database", zap.Error(err)) a.errorResponse(w, r, 500, err) return } @@ -56,9 +54,7 @@ func generateNotificationTester(a *api, fun notificationGenerator) func(w http.R } if _, err := client.Push(notification); err != nil { - a.logger.WithFields(logrus.Fields{ - "err": err, - }).Info("failed to send test notification") + a.logger.Info("failed to send test notification", zap.Error(err)) a.errorResponse(w, r, 500, err) return } diff --git a/internal/api/receipt.go b/internal/api/receipt.go index 0426aa8..0de9177 100644 --- a/internal/api/receipt.go +++ b/internal/api/receipt.go @@ -7,7 +7,7 @@ import ( "time" "github.com/gorilla/mux" - "github.com/sirupsen/logrus" + "go.uber.org/zap" "github.com/christianselig/apollo-backend/internal/domain" "github.com/christianselig/apollo-backend/internal/itunes" @@ -23,9 +23,7 @@ func (a *api) checkReceiptHandler(w http.ResponseWriter, r *http.Request) { iapr, err := itunes.NewIAPResponse(string(body), true) if err != nil { - a.logger.WithFields(logrus.Fields{ - "err": err, - }).Info("failed verifying receipt") + a.logger.Info("failed to verify receipt", zap.Error(err)) a.errorResponse(w, r, 500, err) return } diff --git a/internal/cmd/api.go b/internal/cmd/api.go index 1760a81..8e142d5 100644 --- a/internal/cmd/api.go +++ b/internal/cmd/api.go @@ -7,8 +7,8 @@ import ( "github.com/christianselig/apollo-backend/internal/api" "github.com/christianselig/apollo-backend/internal/cmdutil" - "github.com/sirupsen/logrus" "github.com/spf13/cobra" + "go.uber.org/zap" ) func APICmd(ctx context.Context) *cobra.Command { @@ -24,7 +24,8 @@ func APICmd(ctx context.Context) *cobra.Command { port, _ = strconv.Atoi(os.Getenv("PORT")) } - logger := cmdutil.NewLogrusLogger(false) + logger := cmdutil.NewLogger(false) + defer func() { _ = logger.Sync() }() statsd, err := cmdutil.NewStatsdClient() if err != nil { @@ -49,9 +50,7 @@ func APICmd(ctx context.Context) *cobra.Command { go func() { _ = srv.ListenAndServe() }() - logger.WithFields(logrus.Fields{ - "port": port, - }).Info("started api") + logger.Info("started api", zap.Int("port", port)) <-ctx.Done() diff --git a/internal/cmd/scheduler.go b/internal/cmd/scheduler.go index 60344f7..ce25e25 100644 --- a/internal/cmd/scheduler.go +++ b/internal/cmd/scheduler.go @@ -13,8 +13,8 @@ import ( "github.com/go-redis/redis/v8" "github.com/jackc/pgx/v4" "github.com/jackc/pgx/v4/pgxpool" - "github.com/sirupsen/logrus" "github.com/spf13/cobra" + "go.uber.org/zap" "github.com/christianselig/apollo-backend/internal/cmdutil" "github.com/christianselig/apollo-backend/internal/domain" @@ -29,7 +29,8 @@ func SchedulerCmd(ctx context.Context) *cobra.Command { Args: cobra.ExactArgs(0), Short: "Schedules jobs and runs several maintenance tasks periodically.", RunE: func(cmd *cobra.Command, args []string) error { - logger := cmdutil.NewLogrusLogger(false) + logger := cmdutil.NewLogger(false) + defer func() { _ = logger.Sync() }() statsd, err := cmdutil.NewStatsdClient() if err != nil { @@ -126,71 +127,56 @@ func evalScript(ctx context.Context, redis *redis.Client) (string, error) { return redis.ScriptLoad(ctx, lua).Result() } -func pruneAccounts(ctx context.Context, logger *logrus.Logger, pool *pgxpool.Pool) { +func pruneAccounts(ctx context.Context, logger *zap.Logger, pool *pgxpool.Pool) { expiry := time.Now().Add(-domain.StaleTokenThreshold) ar := repository.NewPostgresAccount(pool) stale, err := ar.PruneStale(ctx, expiry) if err != nil { - logger.WithFields(logrus.Fields{ - "err": err, - }).Error("failed cleaning stale accounts") + logger.Error("failed to clean stale accounts", zap.Error(err)) return } orphaned, err := ar.PruneOrphaned(ctx) if err != nil { - logger.WithFields(logrus.Fields{ - "err": err, - }).Error("failed cleaning orphaned accounts") + logger.Error("failed to clean orphaned accounts", zap.Error(err)) return } if count := stale + orphaned; count > 0 { - logger.WithFields(logrus.Fields{ - "stale": stale, - "orphaned": orphaned, - }).Info("pruned accounts") + logger.Info("pruned accounts", zap.Int64("stale", stale), zap.Int64("orphaned", orphaned)) } } -func pruneDevices(ctx context.Context, logger *logrus.Logger, pool *pgxpool.Pool) { +func pruneDevices(ctx context.Context, logger *zap.Logger, pool *pgxpool.Pool) { now := time.Now() dr := repository.NewPostgresDevice(pool) count, err := dr.PruneStale(ctx, now) if err != nil { - logger.WithFields(logrus.Fields{ - "err": err, - }).Error("failed cleaning stale devices") + logger.Error("failed to clean stale devices", zap.Error(err)) return } if count > 0 { - logger.WithFields(logrus.Fields{ - "count": count, - }).Info("pruned devices") + logger.Info("pruned devices", zap.Int64("count", count)) } } -func cleanQueues(logger *logrus.Logger, jobsConn rmq.Connection) { +func cleanQueues(logger *zap.Logger, jobsConn rmq.Connection) { cleaner := rmq.NewCleaner(jobsConn) count, err := cleaner.Clean() if err != nil { - logger.WithFields(logrus.Fields{ - "err": err, - }).Error("failed cleaning jobs from queues") + logger.Error("failed to clean jobs from queues", zap.Error(err)) return } if count > 0 { - logger.WithFields(logrus.Fields{ - "count": count, - }).Info("returned jobs to queues") + logger.Info("returned jobs to queues", zap.Int64("count", count)) } } -func reportStats(ctx context.Context, logger *logrus.Logger, statsd *statsd.Client, pool *pgxpool.Pool) { +func reportStats(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool) { var ( count int64 @@ -209,14 +195,11 @@ func reportStats(ctx context.Context, logger *logrus.Logger, statsd *statsd.Clie _ = pool.QueryRow(ctx, metric.query).Scan(&count) _ = statsd.Gauge(metric.name, float64(count), []string{}, 1) - logger.WithFields(logrus.Fields{ - "count": count, - "metric": metric.name, - }).Debug("fetched metrics") + logger.Debug("fetched metrics", zap.String("metric", metric.name), zap.Int64("count", count)) } } -func enqueueUsers(ctx context.Context, logger *logrus.Logger, statsd *statsd.Client, pool *pgxpool.Pool, queue rmq.Queue) { +func enqueueUsers(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, queue rmq.Queue) { now := time.Now() next := now.Add(domain.NotificationCheckInterval) @@ -255,9 +238,7 @@ func enqueueUsers(ctx context.Context, logger *logrus.Logger, statsd *statsd.Cli }) if err != nil { - logger.WithFields(logrus.Fields{ - "err": err, - }).Error("failed to fetch batch of users") + logger.Error("failed to fetch batch of users", zap.Error(err)) return } @@ -265,10 +246,7 @@ func enqueueUsers(ctx context.Context, logger *logrus.Logger, statsd *statsd.Cli return } - logger.WithFields(logrus.Fields{ - "count": len(ids), - "start": now, - }).Debug("enqueueing user batch") + logger.Debug("enqueueing user batch", zap.Int("count", len(ids)), zap.Time("start", now)) batchIds := make([]string, len(ids)) for i, id := range ids { @@ -276,13 +254,11 @@ func enqueueUsers(ctx context.Context, logger *logrus.Logger, statsd *statsd.Cli } if err = queue.Publish(batchIds...); err != nil { - logger.WithFields(logrus.Fields{ - "err": err, - }).Error("failed to enqueue user") + logger.Error("failed to enqueue user batch", zap.Error(err)) } } -func enqueueSubreddits(ctx context.Context, logger *logrus.Logger, statsd *statsd.Client, pool *pgxpool.Pool, queues []rmq.Queue) { +func enqueueSubreddits(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, queues []rmq.Queue) { now := time.Now() next := now.Add(domain.SubredditCheckInterval) @@ -321,9 +297,7 @@ func enqueueSubreddits(ctx context.Context, logger *logrus.Logger, statsd *stats }) if err != nil { - logger.WithFields(logrus.Fields{ - "err": err, - }).Error("failed to fetch batch of subreddits") + logger.Error("failed to fetch batch of subreddits", zap.Error(err)) return } @@ -331,10 +305,7 @@ func enqueueSubreddits(ctx context.Context, logger *logrus.Logger, statsd *stats return } - logger.WithFields(logrus.Fields{ - "count": len(ids), - "start": now, - }).Debug("enqueueing subreddit batch") + logger.Debug("enqueueing subreddit batch", zap.Int("count", len(ids)), zap.Time("start", now)) batchIds := make([]string, len(ids)) for i, id := range ids { @@ -343,16 +314,13 @@ func enqueueSubreddits(ctx context.Context, logger *logrus.Logger, statsd *stats for _, queue := range queues { if err = queue.Publish(batchIds...); err != nil { - logger.WithFields(logrus.Fields{ - "queue": queue, - "err": err, - }).Error("failed to enqueue subreddit") + logger.Error("failed to enqueue subreddit batch", zap.Error(err)) } } } -func enqueueStuckAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd.Client, pool *pgxpool.Pool, queue rmq.Queue) { +func enqueueStuckAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, queue rmq.Queue) { now := time.Now() next := now.Add(domain.StuckNotificationCheckInterval) @@ -391,9 +359,7 @@ func enqueueStuckAccounts(ctx context.Context, logger *logrus.Logger, statsd *st }) if err != nil { - logger.WithFields(logrus.Fields{ - "err": err, - }).Error("failed to fetch possible stuck accounts") + logger.Error("failed to fetch accounts", zap.Error(err)) return } @@ -401,10 +367,7 @@ func enqueueStuckAccounts(ctx context.Context, logger *logrus.Logger, statsd *st return } - logger.WithFields(logrus.Fields{ - "count": len(ids), - "start": now, - }).Debug("enqueueing stuck account batch") + logger.Debug("enqueueing stuck account batch", zap.Int("count", len(ids)), zap.Time("start", now)) batchIds := make([]string, len(ids)) for i, id := range ids { @@ -412,14 +375,11 @@ func enqueueStuckAccounts(ctx context.Context, logger *logrus.Logger, statsd *st } if err = queue.Publish(batchIds...); err != nil { - logger.WithFields(logrus.Fields{ - "queue": queue, - "err": err, - }).Error("failed to enqueue stuck accounts") + logger.Error("failed to enqueue stuck account batch", zap.Error(err)) } } -func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd.Client, pool *pgxpool.Pool, redisConn *redis.Client, luaSha string, queue rmq.Queue) { +func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, redisConn *redis.Client, luaSha string, queue rmq.Queue) { now := time.Now() next := now.Add(domain.NotificationCheckInterval) @@ -461,9 +421,7 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd. }) if err != nil { - logger.WithFields(logrus.Fields{ - "err": err, - }).Error("failed to fetch batch of accounts") + logger.Error("failed to fetch batch of accounts", zap.Error(err)) return } @@ -471,10 +429,8 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd. return } - logger.WithFields(logrus.Fields{ - "count": len(ids), - "start": now, - }).Debug("enqueueing account batch") + logger.Debug("enqueueing account batch", zap.Int("count", len(ids)), zap.Time("start", now)) + // Split ids in batches for i := 0; i < len(ids); i += batchSize { j := i + batchSize @@ -483,16 +439,11 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd. } batch := Int64Slice(ids[i:j]) - logger.WithFields(logrus.Fields{ - "len": len(batch), - }).Debug("enqueueing batch") + logger.Debug("enqueueing batch", zap.Int("len", len(batch))) res, err := redisConn.EvalSha(ctx, luaSha, []string{"locks:accounts"}, batch).Result() if err != nil { - logger.WithFields(logrus.Fields{ - "err": err, - }).Error("failed to check for locked accounts") - + logger.Error("failed to check for locked accounts", zap.Error(err)) } vals := res.([]interface{}) @@ -509,17 +460,11 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd. } if err = queue.Publish(batchIds...); err != nil { - logger.WithFields(logrus.Fields{ - "err": err, - }).Error("failed to enqueue account") + logger.Error("failed to enqueue account batch", zap.Error(err)) } } - logger.WithFields(logrus.Fields{ - "count": enqueued, - "skipped": skipped, - "start": now, - }).Debug("done enqueueing account batch") + logger.Debug("done enqueueing account batch", zap.Int("count", enqueued), zap.Int("skipped", skipped), zap.Time("start", now)) } type Int64Slice []int64 diff --git a/internal/cmd/worker.go b/internal/cmd/worker.go index babfdfb..915cb07 100644 --- a/internal/cmd/worker.go +++ b/internal/cmd/worker.go @@ -34,7 +34,8 @@ func WorkerCmd(ctx context.Context) *cobra.Command { return fmt.Errorf("need a queue to work on") } - logger := cmdutil.NewLogrusLogger(false) + logger := cmdutil.NewLogger(false) + defer func() { _ = logger.Sync() }() tag := fmt.Sprintf("worker:%s", queueID) statsd, err := cmdutil.NewStatsdClient(tag) diff --git a/internal/cmdutil/cmdutil.go b/internal/cmdutil/cmdutil.go index 726ac37..3acd9b6 100644 --- a/internal/cmdutil/cmdutil.go +++ b/internal/cmdutil/cmdutil.go @@ -9,19 +9,13 @@ import ( "github.com/adjust/rmq/v4" "github.com/go-redis/redis/v8" "github.com/jackc/pgx/v4/pgxpool" - "github.com/sirupsen/logrus" + "go.uber.org/zap" ) -func NewLogrusLogger(debug bool) *logrus.Logger { - logger := logrus.New() - - if debug || os.Getenv("ENV") == "" { - logger.SetLevel(logrus.DebugLevel) - } else { - logger.SetFormatter(&logrus.TextFormatter{ - DisableColors: true, - FullTimestamp: true, - }) +func NewLogger(debug bool) *zap.Logger { + logger, _ := zap.NewProduction() + if debug || os.Getenv("ENV") != "production" { + logger, _ = zap.NewDevelopment() } return logger @@ -73,13 +67,11 @@ func NewDatabasePool(ctx context.Context, maxConns int) (*pgxpool.Pool, error) { return pgxpool.ConnectConfig(ctx, config) } -func NewQueueClient(logger *logrus.Logger, conn *redis.Client, identifier string) (rmq.Connection, error) { +func NewQueueClient(logger *zap.Logger, conn *redis.Client, identifier string) (rmq.Connection, error) { errChan := make(chan error, 10) go func() { for err := range errChan { - logger.WithFields(logrus.Fields{ - "err": err, - }).Error("error occured with queue") + logger.Error("error occurred within queue", zap.Error(err)) } }() diff --git a/internal/worker/notifications.go b/internal/worker/notifications.go index 123b9c9..ef04bae 100644 --- a/internal/worker/notifications.go +++ b/internal/worker/notifications.go @@ -14,7 +14,7 @@ import ( "github.com/sideshow/apns2" "github.com/sideshow/apns2/payload" "github.com/sideshow/apns2/token" - "github.com/sirupsen/logrus" + "go.uber.org/zap" "github.com/christianselig/apollo-backend/internal/domain" "github.com/christianselig/apollo-backend/internal/reddit" @@ -33,7 +33,7 @@ const ( type notificationsWorker struct { context.Context - logger *logrus.Logger + logger *zap.Logger statsd *statsd.Client db *pgxpool.Pool redis *redis.Client @@ -47,7 +47,7 @@ type notificationsWorker struct { deviceRepo domain.DeviceRepository } -func NewNotificationsWorker(ctx context.Context, logger *logrus.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, queue rmq.Connection, consumers int) Worker { +func NewNotificationsWorker(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, queue rmq.Connection, consumers int) Worker { reddit := reddit.NewClient( os.Getenv("REDDIT_CLIENT_ID"), os.Getenv("REDDIT_CLIENT_SECRET"), @@ -92,9 +92,7 @@ func (nw *notificationsWorker) Start() error { return err } - nw.logger.WithFields(logrus.Fields{ - "numConsumers": nw.consumers, - }).Info("starting up notifications worker") + nw.logger.Info("starting up notifications worker", zap.Int("consumers", nw.consumers)) prefetchLimit := int64(nw.consumers * 2) @@ -139,40 +137,28 @@ func NewNotificationsConsumer(nw *notificationsWorker, tag int) *notificationsCo func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { defer func() { - lockKey := fmt.Sprintf("locks:accounts:%s", delivery.Payload()) - if err := nc.redis.Del(nc, lockKey).Err(); err != nil { - nc.logger.WithFields(logrus.Fields{ - "lockKey": lockKey, - "err": err, - }).Error("failed to remove lock") + key := fmt.Sprintf("locks:accounts:%s", delivery.Payload()) + if err := nc.redis.Del(nc, key).Err(); err != nil { + nc.logger.Error("failed to remove account lock", zap.Error(err), zap.String("key", key)) } }() - nc.logger.WithFields(logrus.Fields{ - "account#id": delivery.Payload(), - }).Debug("starting job") - id, err := strconv.ParseInt(delivery.Payload(), 10, 64) if err != nil { - nc.logger.WithFields(logrus.Fields{ - "account#id": delivery.Payload(), - "err": err, - }).Error("failed to parse account ID") - + nc.logger.Error("failed to parse account id from payload", zap.Error(err), zap.String("payload", delivery.Payload())) _ = delivery.Reject() return } + nc.logger.Debug("starting job", zap.Int64("account#id", id)) + defer func() { _ = delivery.Ack() }() now := time.Now() account, err := nc.accountRepo.GetByID(nc, id) if err != nil { - nc.logger.WithFields(logrus.Fields{ - "account#id": id, - "err": err, - }).Error("failed to fetch account from database") + nc.logger.Error("failed to fetch account from database", zap.Error(err), zap.Int64("account#id", id)) return } @@ -184,35 +170,39 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { defer func(acc *domain.Account) { if err = nc.accountRepo.Update(nc, acc); err != nil { - nc.logger.WithFields(logrus.Fields{ - "account#username": account.NormalizedUsername(), - "err": err, - }).Error("failed to update account") + nc.logger.Error("failed to update account", + zap.Error(err), + zap.Int64("account#id", id), + zap.String("account#username", account.NormalizedUsername()), + ) } }(&account) rac := nc.reddit.NewAuthenticatedClient(account.AccountID, account.RefreshToken, account.AccessToken) if account.TokenExpiresAt.Before(now) { - nc.logger.WithFields(logrus.Fields{ - "account#username": account.NormalizedUsername(), - }).Debug("refreshing reddit token") + nc.logger.Debug("refreshing reddit token", + zap.Int64("account#id", id), + zap.String("account#username", account.NormalizedUsername()), + ) tokens, err := rac.RefreshTokens(nc) if err != nil { if err != reddit.ErrOauthRevoked { - nc.logger.WithFields(logrus.Fields{ - "account#username": account.NormalizedUsername(), - "err": err, - }).Error("failed to refresh reddit tokens") + nc.logger.Error("failed to refresh reddit tokens", + zap.Error(err), + zap.Int64("account#id", id), + zap.String("account#username", account.NormalizedUsername()), + ) return } err = nc.deleteAccount(account) if err != nil { - nc.logger.WithFields(logrus.Fields{ - "account#username": account.NormalizedUsername(), - "err": err, - }).Error("failed to remove revoked account") + nc.logger.Error("failed to remove revoked account", + zap.Error(err), + zap.Int64("account#id", id), + zap.String("account#username", account.NormalizedUsername()), + ) } return @@ -234,9 +224,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { _ = nc.statsd.Histogram("apollo.queue.delay", float64(latency.Milliseconds()), []string{}, rate) } - nc.logger.WithFields(logrus.Fields{ - "account#username": account.NormalizedUsername(), - }).Debug("fetching message inbox") + nc.logger.Debug("fetching message inbox", zap.Int64("account#id", id), zap.String("account#username", account.NormalizedUsername())) opts := []reddit.RequestOption{reddit.WithQuery("limit", "10")} if account.LastMessageID != "" { @@ -249,38 +237,42 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { case reddit.ErrTimeout, reddit.ErrRateLimited: // Don't log timeouts or rate limits break case reddit.ErrOauthRevoked: - err = nc.deleteAccount(account) - if err != nil { - nc.logger.WithFields(logrus.Fields{ - "account#username": account.NormalizedUsername(), - "err": err, - }).Error("failed to remove revoked account") - return + if err = nc.deleteAccount(account); err != nil { + nc.logger.Error("failed to remove revoked account", + zap.Error(err), + zap.Int64("account#id", id), + zap.String("account#username", account.NormalizedUsername()), + ) + } else { + nc.logger.Info("removed revoked account", + zap.Int64("account#id", id), + zap.String("account#username", account.NormalizedUsername()), + ) } - nc.logger.WithFields(logrus.Fields{ - "account#username": account.NormalizedUsername(), - }).Info("removed revoked account") default: - nc.logger.WithFields(logrus.Fields{ - "account#username": account.NormalizedUsername(), - "err": err, - }).Error("failed to fetch message inbox") + nc.logger.Error("failed to fetch message inbox", + zap.Error(err), + zap.Int64("account#id", id), + zap.String("account#username", account.NormalizedUsername()), + ) } return } // Figure out where we stand if msgs.Count == 0 { - nc.logger.WithFields(logrus.Fields{ - "account#username": account.NormalizedUsername(), - }).Debug("no new messages, bailing early") + nc.logger.Debug("no new messages, bailing early", + zap.Int64("account#id", id), + zap.String("account#username", account.NormalizedUsername()), + ) return } - nc.logger.WithFields(logrus.Fields{ - "account#username": account.NormalizedUsername(), - "count": msgs.Count, - }).Debug("fetched messages") + nc.logger.Debug("fetched messages", + zap.Int64("account#id", id), + zap.String("account#username", account.NormalizedUsername()), + zap.Int("count", msgs.Count), + ) for _, msg := range msgs.Children { if !msg.IsDeleted() { @@ -291,25 +283,28 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { // Let's populate this with the latest message so we don't flood users with stuff if newAccount { - nc.logger.WithFields(logrus.Fields{ - "account#username": account.NormalizedUsername(), - }).Debug("populating first message ID to prevent spamming") + nc.logger.Debug("populating first message id to prevent spamming", + zap.Int64("account#id", id), + zap.String("account#username", account.NormalizedUsername()), + ) return } devices, err := nc.deviceRepo.GetInboxNotifiableByAccountID(nc, account.ID) if err != nil { - nc.logger.WithFields(logrus.Fields{ - "account#username": account.NormalizedUsername(), - "err": err, - }).Error("failed to fetch account devices") + nc.logger.Error("failed to fetch account devices", + zap.Error(err), + zap.Int64("account#id", id), + zap.String("account#username", account.NormalizedUsername()), + ) return } if len(devices) == 0 { - nc.logger.WithFields(logrus.Fields{ - "account#username": account.NormalizedUsername(), - }).Debug("no notifiable devices, finishing job") + nc.logger.Debug("no notifiable devices, bailing early", + zap.Int64("account#id", id), + zap.String("account#username", account.NormalizedUsername()), + ) return } @@ -335,21 +330,24 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { res, err := client.Push(notification) if err != nil || !res.Sent() { _ = nc.statsd.Incr("apns.notification.errors", []string{}, 1) - nc.logger.WithFields(logrus.Fields{ - "account#username": account.NormalizedUsername(), - "err": err, - "status": res.StatusCode, - "reason": res.Reason, - }).Error("failed to send notification") + nc.logger.Error("failed to send notification", + zap.Error(err), + zap.Int64("account#id", id), + zap.String("account#username", account.NormalizedUsername()), + zap.String("device#token", device.APNSToken), + zap.Int("response#status", res.StatusCode), + zap.String("response#reason", res.Reason), + ) // Delete device as notifications might have been disabled here _ = nc.deviceRepo.Delete(nc, device.APNSToken) } else { _ = nc.statsd.Incr("apns.notification.sent", []string{}, 1) - nc.logger.WithFields(logrus.Fields{ - "account#username": account.NormalizedUsername(), - "token": device.APNSToken, - }).Info("sent notification") + nc.logger.Info("sent notification", + zap.Int64("account#id", id), + zap.String("account#username", account.NormalizedUsername()), + zap.String("device#token", device.APNSToken), + ) } } } @@ -357,9 +355,10 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { ev := fmt.Sprintf("Sent notification to /u/%s (x%d)", account.Username, msgs.Count) _ = nc.statsd.SimpleEvent(ev, "") - nc.logger.WithFields(logrus.Fields{ - "account#username": account.NormalizedUsername(), - }).Debug("finishing job") + nc.logger.Debug("finishing job", + zap.Int64("account#id", id), + zap.String("account#username", account.NormalizedUsername()), + ) } func (nc *notificationsConsumer) deleteAccount(account domain.Account) error { diff --git a/internal/worker/stuck_notifications.go b/internal/worker/stuck_notifications.go index 18de604..147c690 100644 --- a/internal/worker/stuck_notifications.go +++ b/internal/worker/stuck_notifications.go @@ -10,7 +10,7 @@ import ( "github.com/adjust/rmq/v4" "github.com/go-redis/redis/v8" "github.com/jackc/pgx/v4/pgxpool" - "github.com/sirupsen/logrus" + "go.uber.org/zap" "github.com/christianselig/apollo-backend/internal/domain" "github.com/christianselig/apollo-backend/internal/reddit" @@ -20,7 +20,7 @@ import ( type stuckNotificationsWorker struct { context.Context - logger *logrus.Logger + logger *zap.Logger statsd *statsd.Client db *pgxpool.Pool redis *redis.Client @@ -32,7 +32,7 @@ type stuckNotificationsWorker struct { accountRepo domain.AccountRepository } -func NewStuckNotificationsWorker(ctx context.Context, logger *logrus.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, queue rmq.Connection, consumers int) Worker { +func NewStuckNotificationsWorker(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, queue rmq.Connection, consumers int) Worker { reddit := reddit.NewClient( os.Getenv("REDDIT_CLIENT_ID"), os.Getenv("REDDIT_CLIENT_SECRET"), @@ -61,9 +61,7 @@ func (snw *stuckNotificationsWorker) Start() error { return err } - snw.logger.WithFields(logrus.Fields{ - "numConsumers": snw.consumers, - }).Info("starting up stuck notifications worker") + snw.logger.Info("starting up stuck notifications worker", zap.Int("consumers", snw.consumers)) prefetchLimit := int64(snw.consumers * 2) @@ -102,69 +100,67 @@ func NewStuckNotificationsConsumer(snw *stuckNotificationsWorker, tag int) *stuc } func (snc *stuckNotificationsConsumer) Consume(delivery rmq.Delivery) { - snc.logger.WithFields(logrus.Fields{ - "account#id": delivery.Payload(), - }).Debug("starting job") - id, err := strconv.ParseInt(delivery.Payload(), 10, 64) if err != nil { - snc.logger.WithFields(logrus.Fields{ - "account#id": delivery.Payload(), - "err": err, - }).Error("failed to parse account ID") + snc.logger.Error("failed to parse account id from payload", zap.Error(err), zap.String("payload", delivery.Payload())) _ = delivery.Reject() return } + snc.logger.Debug("starting job", zap.Int64("account#id", id)) + defer func() { _ = delivery.Ack() }() account, err := snc.accountRepo.GetByID(snc, id) if err != nil { - snc.logger.WithFields(logrus.Fields{ - "err": err, - }).Error("failed to fetch account from database") + snc.logger.Error("failed to fetch account from database", zap.Error(err), zap.Int64("account#id", id)) return } if account.LastMessageID == "" { - snc.logger.WithFields(logrus.Fields{ - "account#username": account.NormalizedUsername(), - }).Debug("account has no messages, returning") + snc.logger.Debug("account has no messages, bailing early", + zap.Int64("account#id", id), + zap.String("account#username", account.NormalizedUsername()), + ) return } rac := snc.reddit.NewAuthenticatedClient(account.AccountID, account.RefreshToken, account.AccessToken) - snc.logger.WithFields(logrus.Fields{ - "account#username": account.NormalizedUsername(), - "thing#id": account.LastMessageID, - }).Debug("fetching last thing") + snc.logger.Debug("fetching last thing", + zap.Int64("account#id", id), + zap.String("account#username", account.NormalizedUsername()), + ) kind := account.LastMessageID[:2] var things *reddit.ListingResponse if kind == "t4" { - snc.logger.WithFields(logrus.Fields{ - "account#username": account.NormalizedUsername(), - "thing#id": account.LastMessageID, - }).Debug("checking last thing via inbox") + snc.logger.Debug("checking last thing via inbox", + zap.Int64("account#id", id), + zap.String("account#username", account.NormalizedUsername()), + ) things, err = rac.MessageInbox(snc) if err != nil { if err != reddit.ErrRateLimited { - snc.logger.WithFields(logrus.Fields{ - "err": err, - }).Error("failed to fetch last thing via inbox") + snc.logger.Error("failed to fetch last thing via inbox", + zap.Error(err), + zap.Int64("account#id", id), + zap.String("account#username", account.NormalizedUsername()), + ) } return } } else { things, err = rac.AboutInfo(snc, account.LastMessageID) if err != nil { - snc.logger.WithFields(logrus.Fields{ - "err": err, - }).Error("failed to fetch last thing") + snc.logger.Error("failed to fetch last thing", + zap.Error(err), + zap.Int64("account#id", id), + zap.String("account#username", account.NormalizedUsername()), + ) return } } @@ -185,9 +181,11 @@ func (snc *stuckNotificationsConsumer) Consume(delivery rmq.Delivery) { sthings, err := rac.MessageInbox(snc) if err != nil { - snc.logger.WithFields(logrus.Fields{ - "err": err, - }).Error("failed to check inbox") + snc.logger.Error("failed to check inbox", + zap.Error(err), + zap.Int64("account#id", id), + zap.String("account#username", account.NormalizedUsername()), + ) return } @@ -199,52 +197,59 @@ func (snc *stuckNotificationsConsumer) Consume(delivery rmq.Delivery) { } if !found { - snc.logger.WithFields(logrus.Fields{ - "account#username": account.NormalizedUsername(), - "thing#id": account.LastMessageID, - }).Debug("thing exists, but not in inbox, marking as deleted") + snc.logger.Debug("thing exists, but not on inbox, marking as deleted", + zap.Int64("account#id", id), + zap.String("account#username", account.NormalizedUsername()), + zap.String("thing#id", account.LastMessageID), + ) break } - snc.logger.WithFields(logrus.Fields{ - "account#username": account.NormalizedUsername(), - "thing#id": account.LastMessageID, - }).Debug("thing exists, returning") + snc.logger.Debug("thing exists, bailing early", + zap.Int64("account#id", id), + zap.String("account#username", account.NormalizedUsername()), + zap.String("thing#id", account.LastMessageID), + ) return } } - snc.logger.WithFields(logrus.Fields{ - "account#username": account.NormalizedUsername(), - "thing#id": account.LastMessageID, - }).Info("thing got deleted, resetting") + snc.logger.Info("thing got deleted, resetting", + zap.Int64("account#id", id), + zap.String("account#username", account.NormalizedUsername()), + zap.String("thing#id", account.LastMessageID), + ) if kind != "t4" { - snc.logger.WithFields(logrus.Fields{ - "account#username": account.NormalizedUsername(), - }).Debug("getting message inbox to determine last good thing") + snc.logger.Debug("getting message inbox to find last good thing", + zap.Int64("account#id", id), + zap.String("account#username", account.NormalizedUsername()), + ) things, err = rac.MessageInbox(snc) if err != nil { - snc.logger.WithFields(logrus.Fields{ - "account#username": account.NormalizedUsername(), - "err": err, - }).Error("failed to get message inbox") + snc.logger.Error("failed to check inbox", + zap.Error(err), + zap.Int64("account#id", id), + zap.String("account#username", account.NormalizedUsername()), + ) return } } account.LastMessageID = "" - snc.logger.WithFields(logrus.Fields{ - "account#username": account.NormalizedUsername(), - }).Debug("calculating last good thing") + snc.logger.Debug("calculating last good thing", + zap.Int64("account#id", id), + zap.String("account#username", account.NormalizedUsername()), + ) for _, thing := range things.Children { if thing.IsDeleted() { - snc.logger.WithFields(logrus.Fields{ - "account#username": account.NormalizedUsername(), - "thing#id": thing.FullName(), - }).Debug("thing deleted, next") + snc.logger.Debug("thing got deleted, checking next", + zap.Int64("account#id", id), + zap.String("account#username", account.NormalizedUsername()), + zap.String("thing#id", thing.FullName()), + ) continue } @@ -252,15 +257,17 @@ func (snc *stuckNotificationsConsumer) Consume(delivery rmq.Delivery) { break } - snc.logger.WithFields(logrus.Fields{ - "account#username": account.NormalizedUsername(), - "thing#id": account.LastMessageID, - }).Debug("updating last good thing") + snc.logger.Debug("updating last good thing", + zap.Int64("account#id", id), + zap.String("account#username", account.NormalizedUsername()), + zap.String("thing#id", account.LastMessageID), + ) if err := snc.accountRepo.Update(snc, &account); err != nil { - snc.logger.WithFields(logrus.Fields{ - "account#username": account.NormalizedUsername(), - "err": err, - }).Error("failed to update account's message id") + snc.logger.Error("failed to update account's last message id", + zap.Error(err), + zap.Int64("account#id", id), + zap.String("account#username", account.NormalizedUsername()), + ) } } diff --git a/internal/worker/subreddits.go b/internal/worker/subreddits.go index 65fcbab..436f149 100644 --- a/internal/worker/subreddits.go +++ b/internal/worker/subreddits.go @@ -16,7 +16,7 @@ import ( "github.com/sideshow/apns2" "github.com/sideshow/apns2/payload" "github.com/sideshow/apns2/token" - "github.com/sirupsen/logrus" + "go.uber.org/zap" "github.com/christianselig/apollo-backend/internal/domain" "github.com/christianselig/apollo-backend/internal/reddit" @@ -26,7 +26,7 @@ import ( type subredditsWorker struct { context.Context - logger *logrus.Logger + logger *zap.Logger statsd *statsd.Client db *pgxpool.Pool redis *redis.Client @@ -47,7 +47,7 @@ const ( subredditNotificationBodyFormat = "r/%s: \u201c%s\u201d" ) -func NewSubredditsWorker(ctx context.Context, logger *logrus.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, queue rmq.Connection, consumers int) Worker { +func NewSubredditsWorker(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, queue rmq.Connection, consumers int) Worker { reddit := reddit.NewClient( os.Getenv("REDDIT_CLIENT_ID"), os.Getenv("REDDIT_CLIENT_SECRET"), @@ -94,9 +94,7 @@ func (sw *subredditsWorker) Start() error { return err } - sw.logger.WithFields(logrus.Fields{ - "numConsumers": sw.consumers, - }).Info("starting up subreddits worker") + sw.logger.Info("starting up subreddits worker", zap.Int("consumers", sw.consumers)) prefetchLimit := int64(sw.consumers * 2) @@ -140,44 +138,38 @@ func NewSubredditsConsumer(sw *subredditsWorker, tag int) *subredditsConsumer { } func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) { - sc.logger.WithFields(logrus.Fields{ - "subreddit#id": delivery.Payload(), - }).Debug("starting job") - id, err := strconv.ParseInt(delivery.Payload(), 10, 64) if err != nil { - sc.logger.WithFields(logrus.Fields{ - "subreddit#id": delivery.Payload(), - "err": err, - }).Error("failed to parse subreddit ID") - + sc.logger.Error("failed to parse subreddit id from payload", zap.Error(err), zap.String("payload", delivery.Payload())) _ = delivery.Reject() return } + sc.logger.Debug("starting job", zap.Int64("subreddit#id", id)) + defer func() { _ = delivery.Ack() }() subreddit, err := sc.subredditRepo.GetByID(sc, id) if err != nil { - sc.logger.WithFields(logrus.Fields{ - "err": err, - }).Error("failed to fetch subreddit from database") + sc.logger.Error("failed to fetch subreddit from database", zap.Error(err), zap.Int64("subreddit#id", id)) return } watchers, err := sc.watcherRepo.GetBySubredditID(sc, subreddit.ID) if err != nil { - sc.logger.WithFields(logrus.Fields{ - "subreddit#id": subreddit.ID, - "err": err, - }).Error("failed to fetch watchers from database") + sc.logger.Error("failed to fetch watchers from database", + zap.Error(err), + zap.Int64("subreddit#id", id), + zap.String("subreddit#name", subreddit.NormalizedName()), + ) return } if len(watchers) == 0 { - sc.logger.WithFields(logrus.Fields{ - "subreddit#id": subreddit.ID, - }).Debug("no watchers for subreddit, finishing job") + sc.logger.Debug("no watchers for subreddit, bailing early", + zap.Int64("subreddit#id", id), + zap.String("subreddit#name", subreddit.NormalizedName()), + ) return } @@ -188,17 +180,17 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) { seenPosts := map[string]bool{} // Load 500 newest posts - sc.logger.WithFields(logrus.Fields{ - "subreddit#id": subreddit.ID, - "subreddit#name": subreddit.Name, - }).Debug("loading up to 500 new posts") + sc.logger.Debug("loading up to 500 new posts", + zap.Int64("subreddit#id", id), + zap.String("subreddit#name", subreddit.NormalizedName()), + ) for page := 0; page < 5; page++ { - sc.logger.WithFields(logrus.Fields{ - "subreddit#id": subreddit.ID, - "subreddit#name": subreddit.Name, - "page": page, - }).Debug("loading new posts") + sc.logger.Debug("loading new posts", + zap.Int64("subreddit#id", id), + zap.String("subreddit#name", subreddit.NormalizedName()), + zap.Int("page", page), + ) i := rand.Intn(len(watchers)) watcher := watchers[i] @@ -213,19 +205,21 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) { ) if err != nil { - sc.logger.WithFields(logrus.Fields{ - "subreddit#id": subreddit.ID, - "err": err, - }).Error("failed to fetch new posts") + sc.logger.Error("failed fetchint new posts", + zap.Error(err), + zap.Int64("subreddit#id", id), + zap.String("subreddit#name", subreddit.NormalizedName()), + zap.Int("page", page), + ) continue } - sc.logger.WithFields(logrus.Fields{ - "subreddit#id": subreddit.ID, - "subreddit#name": subreddit.Name, - "count": sps.Count, - "page": page, - }).Debug("loaded new posts for page") + sc.logger.Debug("loaded new posts", + zap.Int64("subreddit#id", id), + zap.String("subreddit#name", subreddit.NormalizedName()), + zap.Int("page", page), + zap.Int("count", sps.Count), + ) // If it's empty, we're done if sps.Count == 0 { @@ -250,20 +244,20 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) { } if finished { - sc.logger.WithFields(logrus.Fields{ - "subreddit#id": subreddit.ID, - "subreddit#name": subreddit.Name, - "page": page, - }).Debug("reached date threshold") + sc.logger.Debug("reached date threshold", + zap.Int64("subreddit#id", id), + zap.String("subreddit#name", subreddit.NormalizedName()), + zap.Int("page", page), + ) break } } // Load hot posts - sc.logger.WithFields(logrus.Fields{ - "subreddit#id": subreddit.ID, - "subreddit#name": subreddit.Name, - }).Debug("loading hot posts") + sc.logger.Debug("loading hot posts", + zap.Int64("subreddit#id", id), + zap.String("subreddit#name", subreddit.NormalizedName()), + ) { i := rand.Intn(len(watchers)) watcher := watchers[i] @@ -276,16 +270,17 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) { ) if err != nil { - sc.logger.WithFields(logrus.Fields{ - "subreddit#id": subreddit.ID, - "err": err, - }).Error("failed to fetch hot posts") + sc.logger.Error("failed to fetch hot posts", + zap.Error(err), + zap.Int64("subreddit#id", id), + zap.String("subreddit#name", subreddit.NormalizedName()), + ) } else { - sc.logger.WithFields(logrus.Fields{ - "subreddit#id": subreddit.ID, - "subreddit#name": subreddit.Name, - "count": sps.Count, - }).Debug("loaded hot posts") + sc.logger.Debug("loaded hot posts", + zap.Int64("subreddit#id", id), + zap.String("subreddit#name", subreddit.NormalizedName()), + zap.Int("count", sps.Count), + ) for _, post := range sps.Children { if post.CreatedAt.Before(threshold) { @@ -299,11 +294,11 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) { } } - sc.logger.WithFields(logrus.Fields{ - "subreddit#id": subreddit.ID, - "subreddit#name": subreddit.Name, - "count": len(posts), - }).Debug("checking posts for hits") + sc.logger.Debug("checking posts for watcher hits", + zap.Int64("subreddit#id", id), + zap.String("subreddit#name", subreddit.NormalizedName()), + zap.Int("count", len(posts)), + ) for _, post := range posts { lowcaseAuthor := strings.ToLower(post.Author) lowcaseTitle := strings.ToLower(post.Title) @@ -344,31 +339,30 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) { notified, _ := sc.redis.Get(sc, lockKey).Bool() if notified { - sc.logger.WithFields(logrus.Fields{ - "subreddit#id": subreddit.ID, - "subreddit#name": subreddit.Name, - "watcher#id": watcher.ID, - "post#id": post.ID, - }).Debug("already notified, skipping") - + sc.logger.Debug("already notified, skipping", + zap.Int64("subreddit#id", id), + zap.String("subreddit#name", subreddit.NormalizedName()), + zap.Int64("watcher#id", watcher.ID), + zap.String("post#id", post.ID), + ) continue } if err := sc.watcherRepo.IncrementHits(sc, watcher.ID); err != nil { - sc.logger.WithFields(logrus.Fields{ - "subreddit#id": subreddit.ID, - "watcher#id": watcher.ID, - "err": err, - }).Error("could not increment hits") + sc.logger.Error("could not increment hits", + zap.Error(err), + zap.Int64("subreddit#id", id), + zap.String("subreddit#name", subreddit.NormalizedName()), + zap.Int64("watcher#id", watcher.ID), + ) return } - - sc.logger.WithFields(logrus.Fields{ - "subreddit#id": subreddit.ID, - "subreddit#name": subreddit.Name, - "watcher#id": watcher.ID, - "post#id": post.ID, - }).Debug("got a hit") + sc.logger.Debug("got a hit", + zap.Int64("subreddit#id", id), + zap.String("subreddit#name", subreddit.NormalizedName()), + zap.Int64("watcher#id", watcher.ID), + zap.String("post#id", post.ID), + ) sc.redis.SetEX(sc, lockKey, true, 24*time.Hour) notifs = append(notifs, watcher) @@ -377,13 +371,12 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) { if len(notifs) == 0 { continue } - - sc.logger.WithFields(logrus.Fields{ - "subreddit#id": subreddit.ID, - "subreddit#name": subreddit.Name, - "post#id": post.ID, - "count": len(notifs), - }).Debug("got hits for post") + sc.logger.Debug("got hits for post", + zap.Int64("subreddit#id", id), + zap.String("subreddit#name", subreddit.NormalizedName()), + zap.String("post#id", post.ID), + zap.Int("count", len(notifs)), + ) payload := payloadFromPost(post) @@ -407,28 +400,31 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) { res, err := client.Push(notification) if err != nil || !res.Sent() { _ = sc.statsd.Incr("apns.notification.errors", []string{}, 1) - sc.logger.WithFields(logrus.Fields{ - "subreddit#id": subreddit.ID, - "device#id": watcher.Device.ID, - "err": err, - "status": res.StatusCode, - "reason": res.Reason, - }).Error("failed to send notification") + sc.logger.Error("failed to send notification", + zap.Error(err), + zap.Int64("subreddit#id", id), + zap.String("subreddit#name", subreddit.NormalizedName()), + zap.String("post#id", post.ID), + zap.String("apns", watcher.Device.APNSToken), + zap.Int("response#status", res.StatusCode), + zap.String("response#reason", res.Reason), + ) } else { _ = sc.statsd.Incr("apns.notification.sent", []string{}, 1) - sc.logger.WithFields(logrus.Fields{ - "subreddit#id": subreddit.ID, - "device#id": watcher.Device.ID, - "device#token": watcher.Device.APNSToken, - }).Info("sent notification") + sc.logger.Info("sent notification", + zap.Int64("subreddit#id", id), + zap.String("subreddit#name", subreddit.NormalizedName()), + zap.String("post#id", post.ID), + zap.String("device#token", watcher.Device.APNSToken), + ) } } } - sc.logger.WithFields(logrus.Fields{ - "subreddit#id": subreddit.ID, - "subreddit#name": subreddit.Name, - }).Debug("finishing job") + sc.logger.Debug("finishing job", + zap.Int64("subreddit#id", id), + zap.String("subreddit#name", subreddit.NormalizedName()), + ) } func payloadFromPost(post *reddit.Thing) *payload.Payload { diff --git a/internal/worker/trending.go b/internal/worker/trending.go index 939b9e0..94691d3 100644 --- a/internal/worker/trending.go +++ b/internal/worker/trending.go @@ -15,7 +15,7 @@ import ( "github.com/sideshow/apns2" "github.com/sideshow/apns2/payload" "github.com/sideshow/apns2/token" - "github.com/sirupsen/logrus" + "go.uber.org/zap" "github.com/christianselig/apollo-backend/internal/domain" "github.com/christianselig/apollo-backend/internal/reddit" @@ -25,7 +25,7 @@ import ( type trendingWorker struct { context.Context - logger *logrus.Logger + logger *zap.Logger statsd *statsd.Client redis *redis.Client queue rmq.Connection @@ -42,7 +42,7 @@ type trendingWorker struct { const trendingNotificationTitleFormat = "🔥 r/%s Trending" -func NewTrendingWorker(ctx context.Context, logger *logrus.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, queue rmq.Connection, consumers int) Worker { +func NewTrendingWorker(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, queue rmq.Connection, consumers int) Worker { reddit := reddit.NewClient( os.Getenv("REDDIT_CLIENT_ID"), os.Getenv("REDDIT_CLIENT_SECRET"), @@ -88,9 +88,7 @@ func (tw *trendingWorker) Start() error { return err } - tw.logger.WithFields(logrus.Fields{ - "numConsumers": tw.consumers, - }).Info("starting up trending worker") + tw.logger.Info("starting up trending subreddits worker", zap.Int("consumers", tw.consumers)) prefetchLimit := int64(tw.consumers * 2) @@ -134,44 +132,38 @@ func NewTrendingConsumer(tw *trendingWorker, tag int) *trendingConsumer { } func (tc *trendingConsumer) Consume(delivery rmq.Delivery) { - tc.logger.WithFields(logrus.Fields{ - "subreddit#id": delivery.Payload(), - }).Debug("starting job") - id, err := strconv.ParseInt(delivery.Payload(), 10, 64) if err != nil { - tc.logger.WithFields(logrus.Fields{ - "subreddit#id": delivery.Payload(), - "err": err, - }).Error("failed to parse subreddit ID") - + tc.logger.Error("failed to parse subreddit id from payload", zap.Error(err), zap.String("payload", delivery.Payload())) _ = delivery.Reject() return } + tc.logger.Debug("starting job", zap.Int64("subreddit#id", id)) + defer func() { _ = delivery.Ack() }() subreddit, err := tc.subredditRepo.GetByID(tc, id) if err != nil { - tc.logger.WithFields(logrus.Fields{ - "err": err, - }).Error("failed to fetch subreddit from database") + tc.logger.Error("failed to fetch subreddit from database", zap.Error(err), zap.Int64("subreddit#id", id)) return } watchers, err := tc.watcherRepo.GetByTrendingSubredditID(tc, subreddit.ID) if err != nil { - tc.logger.WithFields(logrus.Fields{ - "subreddit#id": subreddit.ID, - "err": err, - }).Error("failed to fetch watchers from database") + tc.logger.Error("failed to fetch watchers from database", + zap.Error(err), + zap.Int64("subreddit#id", id), + zap.String("subreddit#name", subreddit.NormalizedName()), + ) return } if len(watchers) == 0 { - tc.logger.WithFields(logrus.Fields{ - "subreddit#id": subreddit.ID, - }).Debug("no watchers for trending, finishing job") + tc.logger.Debug("no watchers for subreddit, bailing early", + zap.Int64("subreddit#id", id), + zap.String("subreddit#name", subreddit.NormalizedName()), + ) return } @@ -182,39 +174,44 @@ func (tc *trendingConsumer) Consume(delivery rmq.Delivery) { tps, err := rac.SubredditTop(tc, subreddit.Name, reddit.WithQuery("t", "week")) if err != nil { - tc.logger.WithFields(logrus.Fields{ - "subreddit#id": subreddit.ID, - "subreddit#name": subreddit.Name, - "err": err, - }).Error("failed to fetch month's top posts") + tc.logger.Error("failed to fetch month's top posts", + zap.Error(err), + zap.Int64("subreddit#id", id), + zap.String("subreddit#name", subreddit.NormalizedName()), + ) return } - tc.logger.WithFields(logrus.Fields{ - "subreddit#id": subreddit.ID, - "subreddit#name": subreddit.Name, - "count": tps.Count, - }).Debug("loaded month's hot posts") + + tc.logger.Debug("loaded month's top posts", + zap.Int64("subreddit#id", id), + zap.String("subreddit#name", subreddit.NormalizedName()), + zap.Int("count", tps.Count), + ) if tps.Count == 0 { - tc.logger.WithFields(logrus.Fields{ - "subreddit#id": subreddit.ID, - }).Debug("no top posts for subreddit, returning") + tc.logger.Debug("no top posts, bailing early", + zap.Int64("subreddit#id", id), + zap.String("subreddit#name", subreddit.NormalizedName()), + ) return } if tps.Count < 20 { - tc.logger.WithFields(logrus.Fields{ - "subreddit#id": subreddit.ID, - }).Debug("not enough posts, returning") + tc.logger.Debug("no top posts, bailing early", + zap.Int64("subreddit#id", id), + zap.String("subreddit#name", subreddit.NormalizedName()), + zap.Int("count", tps.Count), + ) return } middlePost := tps.Count / 2 medianScore := tps.Children[middlePost].Score - tc.logger.WithFields(logrus.Fields{ - "subreddit#id": subreddit.ID, - "score": medianScore, - }).Debug("calculated median score") + tc.logger.Debug("calculated median score", + zap.Int64("subreddit#id", id), + zap.String("subreddit#name", subreddit.NormalizedName()), + zap.Int64("score", medianScore), + ) // Grab hot posts and filter out anything that's > 2 days old i = rand.Intn(len(watchers)) @@ -222,18 +219,18 @@ func (tc *trendingConsumer) Consume(delivery rmq.Delivery) { rac = tc.reddit.NewAuthenticatedClient(watcher.Account.AccountID, watcher.Account.RefreshToken, watcher.Account.AccessToken) hps, err := rac.SubredditHot(tc, subreddit.Name) if err != nil { - tc.logger.WithFields(logrus.Fields{ - "subreddit#id": subreddit.ID, - "subreddit#name": subreddit.Name, - "err": err, - }).Error("failed to fetch hot posts") + tc.logger.Error("failed to fetch hot posts", + zap.Error(err), + zap.Int64("subreddit#id", id), + zap.String("subreddit#name", subreddit.NormalizedName()), + ) return } - tc.logger.WithFields(logrus.Fields{ - "subreddit#id": subreddit.ID, - "subreddit#name": subreddit.Name, - "count": hps.Count, - }).Debug("loaded hot posts") + tc.logger.Debug("loaded hot posts", + zap.Int64("subreddit#id", id), + zap.String("subreddit#name", subreddit.NormalizedName()), + zap.Int("count", hps.Count), + ) // Trending only counts for posts less than 2 days old threshold := time.Now().Add(-24 * time.Hour * 2) @@ -260,23 +257,24 @@ func (tc *trendingConsumer) Consume(delivery rmq.Delivery) { notified, _ := tc.redis.Get(tc, lockKey).Bool() if notified { - tc.logger.WithFields(logrus.Fields{ - "subreddit#id": subreddit.ID, - "subreddit#name": subreddit.Name, - "watcher#id": watcher.ID, - "post#id": post.ID, - }).Debug("already notified, skipping") + tc.logger.Debug("already notified, skipping", + zap.Int64("subreddit#id", id), + zap.String("subreddit#name", subreddit.NormalizedName()), + zap.Int64("watcher#id", watcher.ID), + zap.String("post#id", post.ID), + ) continue } tc.redis.SetEX(tc, lockKey, true, 48*time.Hour) if err := tc.watcherRepo.IncrementHits(tc, watcher.ID); err != nil { - tc.logger.WithFields(logrus.Fields{ - "subreddit#id": subreddit.ID, - "watcher#id": watcher.ID, - "err": err, - }).Error("could not increment hits") + tc.logger.Error("could not increment hits", + zap.Error(err), + zap.Int64("subreddit#id", id), + zap.String("subreddit#name", subreddit.NormalizedName()), + zap.Int64("watcher#id", watcher.ID), + ) return } @@ -290,30 +288,31 @@ func (tc *trendingConsumer) Consume(delivery rmq.Delivery) { res, err := client.Push(notification) if err != nil || !res.Sent() { _ = tc.statsd.Incr("apns.notification.errors", []string{}, 1) - tc.logger.WithFields(logrus.Fields{ - "subreddit#id": subreddit.ID, - "post#id": post.ID, - "device#id": watcher.Device.ID, - "err": err, - "status": res.StatusCode, - "reason": res.Reason, - }).Error("failed to send notification") + tc.logger.Error("failed to send notification", + zap.Error(err), + zap.Int64("subreddit#id", id), + zap.String("subreddit#name", subreddit.NormalizedName()), + zap.String("post#id", post.ID), + zap.String("apns", watcher.Device.APNSToken), + zap.Int("response#status", res.StatusCode), + zap.String("response#reason", res.Reason), + ) } else { _ = tc.statsd.Incr("apns.notification.sent", []string{}, 1) - tc.logger.WithFields(logrus.Fields{ - "subreddit#id": subreddit.ID, - "post#id": post.ID, - "device#id": watcher.Device.ID, - "device#token": watcher.Device.APNSToken, - }).Info("sent notification") + tc.logger.Info("sent notification", + zap.Int64("subreddit#id", id), + zap.String("subreddit#name", subreddit.NormalizedName()), + zap.String("post#id", post.ID), + zap.String("device#token", watcher.Device.APNSToken), + ) } } } - tc.logger.WithFields(logrus.Fields{ - "subreddit#id": subreddit.ID, - "subreddit#name": subreddit.Name, - }).Debug("finishing job") + tc.logger.Debug("finishing job", + zap.Int64("subreddit#id", id), + zap.String("subreddit#name", subreddit.NormalizedName()), + ) } func payloadFromTrendingPost(post *reddit.Thing) *payload.Payload { diff --git a/internal/worker/users.go b/internal/worker/users.go index 52e228e..fdcfab1 100644 --- a/internal/worker/users.go +++ b/internal/worker/users.go @@ -15,7 +15,7 @@ import ( "github.com/sideshow/apns2" "github.com/sideshow/apns2/payload" "github.com/sideshow/apns2/token" - "github.com/sirupsen/logrus" + "go.uber.org/zap" "github.com/christianselig/apollo-backend/internal/domain" "github.com/christianselig/apollo-backend/internal/reddit" @@ -25,7 +25,7 @@ import ( type usersWorker struct { context.Context - logger *logrus.Logger + logger *zap.Logger statsd *statsd.Client db *pgxpool.Pool redis *redis.Client @@ -43,7 +43,7 @@ type usersWorker struct { const userNotificationTitleFormat = "👨\u200d🚀 %s" -func NewUsersWorker(ctx context.Context, logger *logrus.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, queue rmq.Connection, consumers int) Worker { +func NewUsersWorker(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, queue rmq.Connection, consumers int) Worker { reddit := reddit.NewClient( os.Getenv("REDDIT_CLIENT_ID"), os.Getenv("REDDIT_CLIENT_SECRET"), @@ -90,9 +90,7 @@ func (uw *usersWorker) Start() error { return err } - uw.logger.WithFields(logrus.Fields{ - "numConsumers": uw.consumers, - }).Info("starting up users worker") + uw.logger.Info("starting up subreddits worker", zap.Int("consumers", uw.consumers)) prefetchLimit := int64(uw.consumers * 2) @@ -136,44 +134,38 @@ func NewUsersConsumer(uw *usersWorker, tag int) *usersConsumer { } func (uc *usersConsumer) Consume(delivery rmq.Delivery) { - uc.logger.WithFields(logrus.Fields{ - "user#id": delivery.Payload(), - }).Debug("starting job") - id, err := strconv.ParseInt(delivery.Payload(), 10, 64) if err != nil { - uc.logger.WithFields(logrus.Fields{ - "user#id": delivery.Payload(), - "err": err, - }).Error("failed to parse user ID") - + uc.logger.Error("failed to parse subreddit id from payload", zap.Error(err), zap.String("payload", delivery.Payload())) _ = delivery.Reject() return } + uc.logger.Debug("starting job", zap.Int64("subreddit#id", id)) + defer func() { _ = delivery.Ack() }() user, err := uc.userRepo.GetByID(uc, id) if err != nil { - uc.logger.WithFields(logrus.Fields{ - "err": err, - }).Error("failed to fetch user from database") + uc.logger.Error("failed to fetch user from database", zap.Error(err), zap.Int64("subreddit#id", id)) return } watchers, err := uc.watcherRepo.GetByUserID(uc, user.ID) if err != nil { - uc.logger.WithFields(logrus.Fields{ - "user#id": user.ID, - "err": err, - }).Error("failed to fetch watchers from database") + uc.logger.Error("failed to fetch watchers from database", + zap.Error(err), + zap.Int64("user#id", id), + zap.String("user#name", user.NormalizedName()), + ) return } if len(watchers) == 0 { - uc.logger.WithFields(logrus.Fields{ - "user#id": user.ID, - }).Info("no watchers for user, skipping") + uc.logger.Debug("no watchers for user, bailing early", + zap.Int64("user#id", id), + zap.String("user#name", user.NormalizedName()), + ) return } @@ -186,41 +178,46 @@ func (uc *usersConsumer) Consume(delivery rmq.Delivery) { ru, err := rac.UserAbout(uc, user.Name) if err != nil { - uc.logger.WithFields(logrus.Fields{ - "user#id": user.ID, - "err": err, - }).Error("failed to fetch user details") + uc.logger.Error("failed to fetch user details", + zap.Error(err), + zap.Int64("user#id", id), + zap.String("user#name", user.NormalizedName()), + ) return } if !ru.AcceptFollowers { - uc.logger.WithFields(logrus.Fields{ - "user#id": user.ID, - }).Info("user disabled followers, removing") + uc.logger.Info("user disabled followers, removing", + zap.Int64("user#id", id), + zap.String("user#name", user.NormalizedName()), + ) if err := uc.watcherRepo.DeleteByTypeAndWatcheeID(uc, domain.UserWatcher, user.ID); err != nil { - uc.logger.WithFields(logrus.Fields{ - "user#id": user.ID, - "err": err, - }).Error("failed to delete watchers for user who does not allow followers") + uc.logger.Error("failed to remove watchers for user who disallows followers", + zap.Error(err), + zap.Int64("user#id", id), + zap.String("user#name", user.NormalizedName()), + ) return } if err := uc.userRepo.Delete(uc, user.ID); err != nil { - uc.logger.WithFields(logrus.Fields{ - "user#id": user.ID, - "err": err, - }).Error("failed to delete user") + uc.logger.Error("failed to remove user", + zap.Error(err), + zap.Int64("user#id", id), + zap.String("user#name", user.NormalizedName()), + ) return } } posts, err := rac.UserPosts(uc, user.Name) if err != nil { - uc.logger.WithFields(logrus.Fields{ - "user#id": user.ID, - "err": err, - }).Error("failed to fetch user activity") + uc.logger.Error("failed to fetch user activity", + zap.Error(err), + zap.Int64("user#id", id), + zap.String("user#name", user.NormalizedName()), + ) return } @@ -261,11 +258,12 @@ func (uc *usersConsumer) Consume(delivery rmq.Delivery) { for _, watcher := range notifs { if err := uc.watcherRepo.IncrementHits(uc, watcher.ID); err != nil { - uc.logger.WithFields(logrus.Fields{ - "user#id": user.ID, - "watcher#id": watcher.ID, - "err": err, - }).Error("could not increment hits") + uc.logger.Error("failed to increment watcher hits", + zap.Error(err), + zap.Int64("user#id", id), + zap.String("user#name", user.NormalizedName()), + zap.Int64("watcher#id", watcher.ID), + ) return } @@ -285,28 +283,31 @@ func (uc *usersConsumer) Consume(delivery rmq.Delivery) { res, err := client.Push(notification) if err != nil || !res.Sent() { _ = uc.statsd.Incr("apns.notification.errors", []string{}, 1) - uc.logger.WithFields(logrus.Fields{ - "user#id": user.ID, - "device#id": device.ID, - "err": err, - "status": res.StatusCode, - "reason": res.Reason, - }).Error("failed to send notification") + uc.logger.Error("failed to send notification", + zap.Error(err), + zap.Int64("user#id", id), + zap.String("user#name", user.NormalizedName()), + zap.String("post#id", post.ID), + zap.String("apns", watcher.Device.APNSToken), + zap.Int("response#status", res.StatusCode), + zap.String("response#reason", res.Reason), + ) } else { _ = uc.statsd.Incr("apns.notification.sent", []string{}, 1) - uc.logger.WithFields(logrus.Fields{ - "user#id": user.ID, - "device#id": device.ID, - "device#token": device.APNSToken, - }).Info("sent notification") + uc.logger.Info("sent notification", + zap.Int64("user#id", id), + zap.String("user#name", user.NormalizedName()), + zap.String("post#id", post.ID), + zap.String("device#token", watcher.Device.APNSToken), + ) } } } - uc.logger.WithFields(logrus.Fields{ - "user#id": user.ID, - "user#name": user.Name, - }).Debug("finishing job") + uc.logger.Debug("finishing job", + zap.Int64("user#id", id), + zap.String("user#name", user.NormalizedName()), + ) } func payloadFromUserPost(post *reddit.Thing) *payload.Payload { diff --git a/internal/worker/worker.go b/internal/worker/worker.go index 4dfd038..f7b31ac 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -8,12 +8,12 @@ import ( "github.com/adjust/rmq/v4" "github.com/go-redis/redis/v8" "github.com/jackc/pgx/v4/pgxpool" - "github.com/sirupsen/logrus" + "go.uber.org/zap" ) const pollDuration = 100 * time.Millisecond -type NewWorkerFn func(context.Context, *logrus.Logger, *statsd.Client, *pgxpool.Pool, *redis.Client, rmq.Connection, int) Worker +type NewWorkerFn func(context.Context, *zap.Logger, *statsd.Client, *pgxpool.Pool, *redis.Client, rmq.Connection, int) Worker type Worker interface { Start() error Stop()