migrate out of logrus (#76)

André Medeiros 2022-05-23 14:17:25 -04:00 committed by GitHub
parent ccba530255
commit b1f266bf91
17 changed files with 522 additions and 596 deletions
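The migration replaces logrus's field-map API with zap's typed, variadic fields, and swaps cmdutil.NewLogrusLogger for a zap-backed cmdutil.NewLogger that callers Sync on shutdown. A minimal, self-contained sketch of that pattern follows; it is illustrative only, and the error value and account id are placeholders rather than code from the repository.

package main

import (
	"errors"
	"os"

	"go.uber.org/zap"
)

// newLogger mirrors cmdutil.NewLogger from this commit: development config
// unless ENV is set to "production".
func newLogger(debug bool) *zap.Logger {
	logger, _ := zap.NewProduction()
	if debug || os.Getenv("ENV") != "production" {
		logger, _ = zap.NewDevelopment()
	}
	return logger
}

func main() {
	logger := newLogger(false)
	defer func() { _ = logger.Sync() }()

	// Placeholder values standing in for real job state.
	err := errors.New("connection refused")
	id := int64(42)

	// logrus (removed): logger.WithFields(logrus.Fields{"account#id": id, "err": err}).Error("...")
	// zap (added): the message comes first, followed by strongly typed fields.
	logger.Error("failed to fetch account from database",
		zap.Error(err),
		zap.Int64("account#id", id),
	)
}

In the handlers and workers below, each call keeps its message string while every logrus field becomes a zap.String, zap.Int64, zap.Int, zap.Time, or zap.Error field.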

6
go.mod
View file

@ -17,11 +17,11 @@ require (
github.com/jackc/pgx/v4 v4.16.0
github.com/joho/godotenv v1.4.0
github.com/sideshow/apns2 v0.23.0
github.com/sirupsen/logrus v1.8.1
github.com/smtp2go-oss/smtp2go-go v1.0.1
github.com/spf13/cobra v1.4.0
github.com/stretchr/testify v1.7.1
github.com/valyala/fastjson v1.6.3
go.uber.org/zap v1.13.0
)
require (
@ -45,10 +45,14 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
go.uber.org/atomic v1.6.0 // indirect
go.uber.org/multierr v1.5.0 // indirect
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97 // indirect
golang.org/x/lint v0.0.0-20190930215403-16217165b5de // indirect
golang.org/x/net v0.0.0-20220403103023-749bd193bc2b // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
)

12
go.sum
View file

@ -2,6 +2,7 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU=
contrib.go.opencensus.io/exporter/ocagent v0.6.0/go.mod h1:zmKjrJcdo0aYcVS7bmEeSEBLPA9YJp5bjrofdU3pIXs=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/DataDog/datadog-go v4.8.3+incompatible h1:fNGaYSuObuQb5nzeTQqowRAd9bpDIRRV4/gUtIBjh8Q=
github.com/DataDog/datadog-go v4.8.3+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
@ -277,8 +278,6 @@ github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMB
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/smtp2go-oss/smtp2go-go v1.0.1 h1:rwcoNLjOyigOzCjKp/guylKY/xJpoeypSxgtcC/g6DA=
@ -345,13 +344,17 @@ go.opentelemetry.io/proto/otlp v0.9.0/go.mod h1:1vKfU9rv61e9EVGthD1zNvUbiwPcimSs
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A=
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee h1:0mgffUl7nfd+FpvXMVz4IDEaUSmT1ysygQC7qYo7sG4=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.uber.org/zap v1.13.0 h1:nR6NoDBgAf67s68NhaXbsojM+2gxp3S1hWkHDl27pVU=
go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM=
golang.org/x/crypto v0.0.0-20170512130425-ab89591268e0/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
@ -372,9 +375,11 @@ golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvx
golang.org/x/lint v0.0.0-20190301231843-5614ed5bae6f/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/lint v0.0.0-20190409202823-959b441ac422/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@ -475,12 +480,14 @@ golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200103221440-774c71fcf114/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e h1:4nW4NLDYnU28ojHaHO8OVxFHk/aQ33U01a9cjED+pzE=
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gonum.org/v1/gonum v0.0.0-20190502212712-4a2eb0188cbc/go.mod h1:2ltnJ7xHfj0zHS40VVPYEAAMTa3ZGguvHGBSJeRWqE0=
gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw=
@ -556,5 +563,6 @@ honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
rsc.io/goversion v1.2.0/go.mod h1:Eih9y/uIBS3ulggl7KNJ09xGSLcuNaLgmvvqa07sgfo=

View file

@ -8,7 +8,7 @@ import (
"time"
"github.com/gorilla/mux"
"github.com/sirupsen/logrus"
"go.uber.org/zap"
"github.com/christianselig/apollo-backend/internal/domain"
"github.com/christianselig/apollo-backend/internal/reddit"
@ -142,8 +142,8 @@ func (a *api) upsertAccountsHandler(w http.ResponseWriter, r *http.Request) {
for _, acc := range raccs {
delete(accsMap, acc.NormalizedUsername())
ac := a.reddit.NewAuthenticatedClient(reddit.SkipRateLimiting, acc.RefreshToken, acc.AccessToken)
tokens, err := ac.RefreshTokens(ctx)
rac := a.reddit.NewAuthenticatedClient(reddit.SkipRateLimiting, acc.RefreshToken, acc.AccessToken)
tokens, err := rac.RefreshTokens(ctx)
if err != nil {
a.errorResponse(w, r, 422, err)
return
@ -154,8 +154,8 @@ func (a *api) upsertAccountsHandler(w http.ResponseWriter, r *http.Request) {
acc.RefreshToken = tokens.RefreshToken
acc.AccessToken = tokens.AccessToken
ac = a.reddit.NewAuthenticatedClient(reddit.SkipRateLimiting, acc.RefreshToken, acc.AccessToken)
me, err := ac.Me(ctx)
rac = a.reddit.NewAuthenticatedClient(reddit.SkipRateLimiting, tokens.RefreshToken, tokens.AccessToken)
me, err := rac.Me(ctx)
if err != nil {
a.errorResponse(w, r, 422, err)
@ -183,29 +183,26 @@ func (a *api) upsertAccountsHandler(w http.ResponseWriter, r *http.Request) {
}
for _, acc := range accsMap {
fmt.Println(acc.NormalizedUsername())
_ = a.accountRepo.Disassociate(ctx, &acc, &dev)
}
body := fmt.Sprintf(`{"apns_token": "%s"}`, apns)
req, err := http.NewRequestWithContext(ctx, "POST", "https://apollopushserver.xyz/api/new-server-addition", strings.NewReader(body))
req.Header.Set("Authorization", "Bearer 98g5j89aurqwfcsp9khlnvgd38fa15")
if err != nil {
a.logger.WithFields(logrus.Fields{
"apns": apns,
}).Error(err)
a.logger.Error("could not setup request to disassociate from legacy api", zap.Error(err), zap.String("apns", apns))
w.WriteHeader(http.StatusOK)
return
}
w.WriteHeader(http.StatusOK)
req.Header.Set("Authorization", "Bearer 98g5j89aurqwfcsp9khlnvgd38fa15")
resp, _ := a.httpClient.Do(req)
if err != nil {
a.logger.WithFields(logrus.Fields{"err": err}).Error("failed to remove old client")
a.logger.Error("failed to remove from old notification server", zap.Error(err), zap.String("apns", apns))
return
}
resp.Body.Close()
w.WriteHeader(http.StatusOK)
}
func (a *api) upsertAccountHandler(w http.ResponseWriter, r *http.Request) {
@ -216,9 +213,7 @@ func (a *api) upsertAccountHandler(w http.ResponseWriter, r *http.Request) {
var acct domain.Account
if err := json.NewDecoder(r.Body).Decode(&acct); err != nil {
a.logger.WithFields(logrus.Fields{
"err": err,
}).Info("failed to parse request json")
a.logger.Error("failed to parse request json", zap.Error(err))
a.errorResponse(w, r, 422, err)
return
}
@ -227,9 +222,7 @@ func (a *api) upsertAccountHandler(w http.ResponseWriter, r *http.Request) {
ac := a.reddit.NewAuthenticatedClient(reddit.SkipRateLimiting, acct.RefreshToken, acct.AccessToken)
tokens, err := ac.RefreshTokens(ctx)
if err != nil {
a.logger.WithFields(logrus.Fields{
"err": err,
}).Info("failed to refresh token")
a.logger.Error("failed to refresh token", zap.Error(err))
a.errorResponse(w, r, 422, err)
return
}
@ -243,16 +236,14 @@ func (a *api) upsertAccountHandler(w http.ResponseWriter, r *http.Request) {
me, err := ac.Me(ctx)
if err != nil {
a.logger.WithFields(logrus.Fields{
"err": err,
}).Info("failed to grab user details from Reddit")
a.logger.Error("failed to grab user details from reddit", zap.Error(err))
a.errorResponse(w, r, 500, err)
return
}
if me.NormalizedUsername() != acct.NormalizedUsername() {
err := fmt.Errorf("wrong user: expected %s, got %s", me.NormalizedUsername(), acct.NormalizedUsername())
a.logger.WithFields(logrus.Fields{"err": err}).Warn("user is not who they say they are")
a.logger.Warn("user is not who they say they are", zap.Error(err))
a.errorResponse(w, r, 401, err)
return
}
@ -263,26 +254,20 @@ func (a *api) upsertAccountHandler(w http.ResponseWriter, r *http.Request) {
// Associate
dev, err := a.deviceRepo.GetByAPNSToken(ctx, vars["apns"])
if err != nil {
a.logger.WithFields(logrus.Fields{
"err": err,
}).Info("failed fetching device from database")
a.logger.Error("failed to fetch device from database", zap.Error(err))
a.errorResponse(w, r, 500, err)
return
}
// Upsert account
if err := a.accountRepo.CreateOrUpdate(ctx, &acct); err != nil {
a.logger.WithFields(logrus.Fields{
"err": err,
}).Info("failed updating account in database")
a.logger.Error("failed to update account", zap.Error(err))
a.errorResponse(w, r, 500, err)
return
}
if err := a.accountRepo.Associate(ctx, &acct, &dev); err != nil {
a.logger.WithFields(logrus.Fields{
"err": err,
}).Info("failed associating account with device")
a.logger.Error("failed to associate account with device", zap.Error(err))
a.errorResponse(w, r, 500, err)
return
}

View file

@ -14,7 +14,7 @@ import (
"github.com/gorilla/mux"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/sideshow/apns2/token"
"github.com/sirupsen/logrus"
"go.uber.org/zap"
"github.com/christianselig/apollo-backend/internal/domain"
"github.com/christianselig/apollo-backend/internal/reddit"
@ -22,7 +22,7 @@ import (
)
type api struct {
logger *logrus.Logger
logger *zap.Logger
statsd *statsd.Client
reddit *reddit.Client
apns *token.Token
@ -35,7 +35,7 @@ type api struct {
userRepo domain.UserRepository
}
func NewAPI(ctx context.Context, logger *logrus.Logger, statsd *statsd.Client, redis *redis.Client, pool *pgxpool.Pool) *api {
func NewAPI(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, redis *redis.Client, pool *pgxpool.Pool) *api {
reddit := reddit.NewClient(
os.Getenv("REDDIT_CLIENT_ID"),
os.Getenv("REDDIT_CLIENT_SECRET"),
@ -178,20 +178,20 @@ func (a *api) loggingMiddleware(next http.Handler) http.Handler {
}
}
logEntry := a.logger.WithFields(logrus.Fields{
"duration": time.Since(start).Milliseconds(),
"method": r.Method,
"remote#addr": remoteAddr,
"response#bytes": lrw.bytes,
"status": lrw.statusCode,
"uri": r.RequestURI,
})
fields := []zap.Field{
zap.Int64("duration", time.Since(start).Milliseconds()),
zap.String("method", r.Method),
zap.String("remote#addr", remoteAddr),
zap.Int("response#bytes", lrw.bytes),
zap.Int("status", lrw.statusCode),
zap.String("uri", r.RequestURI),
}
if lrw.statusCode == 200 {
logEntry.Info()
a.logger.Info("", fields...)
} else {
err := lrw.Header().Get("X-Apollo-Error")
logEntry.Error(err)
a.logger.Error(err, fields...)
}
})
}

View file

@ -11,7 +11,7 @@ import (
"github.com/gorilla/mux"
"github.com/sideshow/apns2"
"github.com/sideshow/apns2/payload"
"github.com/sirupsen/logrus"
"go.uber.org/zap"
"github.com/christianselig/apollo-backend/internal/domain"
)
@ -46,9 +46,7 @@ func (a *api) testDeviceHandler(w http.ResponseWriter, r *http.Request) {
d, err := a.deviceRepo.GetByAPNSToken(ctx, tok)
if err != nil {
a.logger.WithFields(logrus.Fields{
"err": err,
}).Info("failed fetching device from database")
a.logger.Error("failed fetching device from database", zap.Error(err))
a.errorResponse(w, r, 500, err)
return
}
@ -83,9 +81,7 @@ func (a *api) testDeviceHandler(w http.ResponseWriter, r *http.Request) {
}
if _, err := client.Push(notification); err != nil {
a.logger.WithFields(logrus.Fields{
"err": err,
}).Info("failed to send test notification")
a.logger.Info("failed to send test notification", zap.Error(err))
a.errorResponse(w, r, 500, err)
return
}

View file

@ -7,7 +7,7 @@ import (
"github.com/gorilla/mux"
"github.com/sideshow/apns2"
"github.com/sideshow/apns2/payload"
"github.com/sirupsen/logrus"
"go.uber.org/zap"
)
const (
@ -31,9 +31,7 @@ func generateNotificationTester(a *api, fun notificationGenerator) func(w http.R
d, err := a.deviceRepo.GetByAPNSToken(ctx, tok)
if err != nil {
a.logger.WithFields(logrus.Fields{
"err": err,
}).Info("failed fetching device from database")
a.logger.Info("failed fetching device from database", zap.Error(err))
a.errorResponse(w, r, 500, err)
return
}
@ -56,9 +54,7 @@ func generateNotificationTester(a *api, fun notificationGenerator) func(w http.R
}
if _, err := client.Push(notification); err != nil {
a.logger.WithFields(logrus.Fields{
"err": err,
}).Info("failed to send test notification")
a.logger.Info("failed to send test notification", zap.Error(err))
a.errorResponse(w, r, 500, err)
return
}

View file

@ -7,7 +7,7 @@ import (
"time"
"github.com/gorilla/mux"
"github.com/sirupsen/logrus"
"go.uber.org/zap"
"github.com/christianselig/apollo-backend/internal/domain"
"github.com/christianselig/apollo-backend/internal/itunes"
@ -23,9 +23,7 @@ func (a *api) checkReceiptHandler(w http.ResponseWriter, r *http.Request) {
iapr, err := itunes.NewIAPResponse(string(body), true)
if err != nil {
a.logger.WithFields(logrus.Fields{
"err": err,
}).Info("failed verifying receipt")
a.logger.Info("failed to verify receipt", zap.Error(err))
a.errorResponse(w, r, 500, err)
return
}

View file

@ -7,8 +7,8 @@ import (
"github.com/christianselig/apollo-backend/internal/api"
"github.com/christianselig/apollo-backend/internal/cmdutil"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"go.uber.org/zap"
)
func APICmd(ctx context.Context) *cobra.Command {
@ -24,7 +24,8 @@ func APICmd(ctx context.Context) *cobra.Command {
port, _ = strconv.Atoi(os.Getenv("PORT"))
}
logger := cmdutil.NewLogrusLogger(false)
logger := cmdutil.NewLogger(false)
defer func() { _ = logger.Sync() }()
statsd, err := cmdutil.NewStatsdClient()
if err != nil {
@ -49,9 +50,7 @@ func APICmd(ctx context.Context) *cobra.Command {
go func() { _ = srv.ListenAndServe() }()
logger.WithFields(logrus.Fields{
"port": port,
}).Info("started api")
logger.Info("started api", zap.Int("port", port))
<-ctx.Done()

View file

@ -13,8 +13,8 @@ import (
"github.com/go-redis/redis/v8"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"go.uber.org/zap"
"github.com/christianselig/apollo-backend/internal/cmdutil"
"github.com/christianselig/apollo-backend/internal/domain"
@ -29,7 +29,8 @@ func SchedulerCmd(ctx context.Context) *cobra.Command {
Args: cobra.ExactArgs(0),
Short: "Schedules jobs and runs several maintenance tasks periodically.",
RunE: func(cmd *cobra.Command, args []string) error {
logger := cmdutil.NewLogrusLogger(false)
logger := cmdutil.NewLogger(false)
defer func() { _ = logger.Sync() }()
statsd, err := cmdutil.NewStatsdClient()
if err != nil {
@ -126,71 +127,56 @@ func evalScript(ctx context.Context, redis *redis.Client) (string, error) {
return redis.ScriptLoad(ctx, lua).Result()
}
func pruneAccounts(ctx context.Context, logger *logrus.Logger, pool *pgxpool.Pool) {
func pruneAccounts(ctx context.Context, logger *zap.Logger, pool *pgxpool.Pool) {
expiry := time.Now().Add(-domain.StaleTokenThreshold)
ar := repository.NewPostgresAccount(pool)
stale, err := ar.PruneStale(ctx, expiry)
if err != nil {
logger.WithFields(logrus.Fields{
"err": err,
}).Error("failed cleaning stale accounts")
logger.Error("failed to clean stale accounts", zap.Error(err))
return
}
orphaned, err := ar.PruneOrphaned(ctx)
if err != nil {
logger.WithFields(logrus.Fields{
"err": err,
}).Error("failed cleaning orphaned accounts")
logger.Error("failed to clean orphaned accounts", zap.Error(err))
return
}
if count := stale + orphaned; count > 0 {
logger.WithFields(logrus.Fields{
"stale": stale,
"orphaned": orphaned,
}).Info("pruned accounts")
logger.Info("pruned accounts", zap.Int64("stale", stale), zap.Int64("orphaned", orphaned))
}
}
func pruneDevices(ctx context.Context, logger *logrus.Logger, pool *pgxpool.Pool) {
func pruneDevices(ctx context.Context, logger *zap.Logger, pool *pgxpool.Pool) {
now := time.Now()
dr := repository.NewPostgresDevice(pool)
count, err := dr.PruneStale(ctx, now)
if err != nil {
logger.WithFields(logrus.Fields{
"err": err,
}).Error("failed cleaning stale devices")
logger.Error("failed to clean stale devices", zap.Error(err))
return
}
if count > 0 {
logger.WithFields(logrus.Fields{
"count": count,
}).Info("pruned devices")
logger.Info("pruned devices", zap.Int64("count", count))
}
}
func cleanQueues(logger *logrus.Logger, jobsConn rmq.Connection) {
func cleanQueues(logger *zap.Logger, jobsConn rmq.Connection) {
cleaner := rmq.NewCleaner(jobsConn)
count, err := cleaner.Clean()
if err != nil {
logger.WithFields(logrus.Fields{
"err": err,
}).Error("failed cleaning jobs from queues")
logger.Error("failed to clean jobs from queues", zap.Error(err))
return
}
if count > 0 {
logger.WithFields(logrus.Fields{
"count": count,
}).Info("returned jobs to queues")
logger.Info("returned jobs to queues", zap.Int64("count", count))
}
}
func reportStats(ctx context.Context, logger *logrus.Logger, statsd *statsd.Client, pool *pgxpool.Pool) {
func reportStats(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool) {
var (
count int64
@ -209,14 +195,11 @@ func reportStats(ctx context.Context, logger *logrus.Logger, statsd *statsd.Clie
_ = pool.QueryRow(ctx, metric.query).Scan(&count)
_ = statsd.Gauge(metric.name, float64(count), []string{}, 1)
logger.WithFields(logrus.Fields{
"count": count,
"metric": metric.name,
}).Debug("fetched metrics")
logger.Debug("fetched metrics", zap.String("metric", metric.name), zap.Int64("count", count))
}
}
func enqueueUsers(ctx context.Context, logger *logrus.Logger, statsd *statsd.Client, pool *pgxpool.Pool, queue rmq.Queue) {
func enqueueUsers(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, queue rmq.Queue) {
now := time.Now()
next := now.Add(domain.NotificationCheckInterval)
@ -255,9 +238,7 @@ func enqueueUsers(ctx context.Context, logger *logrus.Logger, statsd *statsd.Cli
})
if err != nil {
logger.WithFields(logrus.Fields{
"err": err,
}).Error("failed to fetch batch of users")
logger.Error("failed to fetch batch of users", zap.Error(err))
return
}
@ -265,10 +246,7 @@ func enqueueUsers(ctx context.Context, logger *logrus.Logger, statsd *statsd.Cli
return
}
logger.WithFields(logrus.Fields{
"count": len(ids),
"start": now,
}).Debug("enqueueing user batch")
logger.Debug("enqueueing user batch", zap.Int("count", len(ids)), zap.Time("start", now))
batchIds := make([]string, len(ids))
for i, id := range ids {
@ -276,13 +254,11 @@ func enqueueUsers(ctx context.Context, logger *logrus.Logger, statsd *statsd.Cli
}
if err = queue.Publish(batchIds...); err != nil {
logger.WithFields(logrus.Fields{
"err": err,
}).Error("failed to enqueue user")
logger.Error("failed to enqueue user batch", zap.Error(err))
}
}
func enqueueSubreddits(ctx context.Context, logger *logrus.Logger, statsd *statsd.Client, pool *pgxpool.Pool, queues []rmq.Queue) {
func enqueueSubreddits(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, queues []rmq.Queue) {
now := time.Now()
next := now.Add(domain.SubredditCheckInterval)
@ -321,9 +297,7 @@ func enqueueSubreddits(ctx context.Context, logger *logrus.Logger, statsd *stats
})
if err != nil {
logger.WithFields(logrus.Fields{
"err": err,
}).Error("failed to fetch batch of subreddits")
logger.Error("failed to fetch batch of subreddits", zap.Error(err))
return
}
@ -331,10 +305,7 @@ func enqueueSubreddits(ctx context.Context, logger *logrus.Logger, statsd *stats
return
}
logger.WithFields(logrus.Fields{
"count": len(ids),
"start": now,
}).Debug("enqueueing subreddit batch")
logger.Debug("enqueueing subreddit batch", zap.Int("count", len(ids)), zap.Time("start", now))
batchIds := make([]string, len(ids))
for i, id := range ids {
@ -343,16 +314,13 @@ func enqueueSubreddits(ctx context.Context, logger *logrus.Logger, statsd *stats
for _, queue := range queues {
if err = queue.Publish(batchIds...); err != nil {
logger.WithFields(logrus.Fields{
"queue": queue,
"err": err,
}).Error("failed to enqueue subreddit")
logger.Error("failed to enqueue subreddit batch", zap.Error(err))
}
}
}
func enqueueStuckAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd.Client, pool *pgxpool.Pool, queue rmq.Queue) {
func enqueueStuckAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, queue rmq.Queue) {
now := time.Now()
next := now.Add(domain.StuckNotificationCheckInterval)
@ -391,9 +359,7 @@ func enqueueStuckAccounts(ctx context.Context, logger *logrus.Logger, statsd *st
})
if err != nil {
logger.WithFields(logrus.Fields{
"err": err,
}).Error("failed to fetch possible stuck accounts")
logger.Error("failed to fetch accounts", zap.Error(err))
return
}
@ -401,10 +367,7 @@ func enqueueStuckAccounts(ctx context.Context, logger *logrus.Logger, statsd *st
return
}
logger.WithFields(logrus.Fields{
"count": len(ids),
"start": now,
}).Debug("enqueueing stuck account batch")
logger.Debug("enqueueing stuck account batch", zap.Int("count", len(ids)), zap.Time("start", now))
batchIds := make([]string, len(ids))
for i, id := range ids {
@ -412,14 +375,11 @@ func enqueueStuckAccounts(ctx context.Context, logger *logrus.Logger, statsd *st
}
if err = queue.Publish(batchIds...); err != nil {
logger.WithFields(logrus.Fields{
"queue": queue,
"err": err,
}).Error("failed to enqueue stuck accounts")
logger.Error("failed to enqueue stuck account batch", zap.Error(err))
}
}
func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd.Client, pool *pgxpool.Pool, redisConn *redis.Client, luaSha string, queue rmq.Queue) {
func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, redisConn *redis.Client, luaSha string, queue rmq.Queue) {
now := time.Now()
next := now.Add(domain.NotificationCheckInterval)
@ -461,9 +421,7 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd.
})
if err != nil {
logger.WithFields(logrus.Fields{
"err": err,
}).Error("failed to fetch batch of accounts")
logger.Error("failed to fetch batch of accounts", zap.Error(err))
return
}
@ -471,10 +429,8 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd.
return
}
logger.WithFields(logrus.Fields{
"count": len(ids),
"start": now,
}).Debug("enqueueing account batch")
logger.Debug("enqueueing account batch", zap.Int("count", len(ids)), zap.Time("start", now))
// Split ids in batches
for i := 0; i < len(ids); i += batchSize {
j := i + batchSize
@ -483,16 +439,11 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd.
}
batch := Int64Slice(ids[i:j])
logger.WithFields(logrus.Fields{
"len": len(batch),
}).Debug("enqueueing batch")
logger.Debug("enqueueing batch", zap.Int("len", len(batch)))
res, err := redisConn.EvalSha(ctx, luaSha, []string{"locks:accounts"}, batch).Result()
if err != nil {
logger.WithFields(logrus.Fields{
"err": err,
}).Error("failed to check for locked accounts")
logger.Error("failed to check for locked accounts", zap.Error(err))
}
vals := res.([]interface{})
@ -509,17 +460,11 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd.
}
if err = queue.Publish(batchIds...); err != nil {
logger.WithFields(logrus.Fields{
"err": err,
}).Error("failed to enqueue account")
logger.Error("failed to enqueue account batch", zap.Error(err))
}
}
logger.WithFields(logrus.Fields{
"count": enqueued,
"skipped": skipped,
"start": now,
}).Debug("done enqueueing account batch")
logger.Debug("done enqueueing account batch", zap.Int("count", enqueued), zap.Int("skipped", skipped), zap.Time("start", now))
}
type Int64Slice []int64

View file

@ -34,7 +34,8 @@ func WorkerCmd(ctx context.Context) *cobra.Command {
return fmt.Errorf("need a queue to work on")
}
logger := cmdutil.NewLogrusLogger(false)
logger := cmdutil.NewLogger(false)
defer func() { _ = logger.Sync() }()
tag := fmt.Sprintf("worker:%s", queueID)
statsd, err := cmdutil.NewStatsdClient(tag)

View file

@ -9,19 +9,13 @@ import (
"github.com/adjust/rmq/v4"
"github.com/go-redis/redis/v8"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/sirupsen/logrus"
"go.uber.org/zap"
)
func NewLogrusLogger(debug bool) *logrus.Logger {
logger := logrus.New()
if debug || os.Getenv("ENV") == "" {
logger.SetLevel(logrus.DebugLevel)
} else {
logger.SetFormatter(&logrus.TextFormatter{
DisableColors: true,
FullTimestamp: true,
})
func NewLogger(debug bool) *zap.Logger {
logger, _ := zap.NewProduction()
if debug || os.Getenv("ENV") != "production" {
logger, _ = zap.NewDevelopment()
}
return logger
@ -73,13 +67,11 @@ func NewDatabasePool(ctx context.Context, maxConns int) (*pgxpool.Pool, error) {
return pgxpool.ConnectConfig(ctx, config)
}
func NewQueueClient(logger *logrus.Logger, conn *redis.Client, identifier string) (rmq.Connection, error) {
func NewQueueClient(logger *zap.Logger, conn *redis.Client, identifier string) (rmq.Connection, error) {
errChan := make(chan error, 10)
go func() {
for err := range errChan {
logger.WithFields(logrus.Fields{
"err": err,
}).Error("error occured with queue")
logger.Error("error occurred within queue", zap.Error(err))
}
}()

View file

@ -14,7 +14,7 @@ import (
"github.com/sideshow/apns2"
"github.com/sideshow/apns2/payload"
"github.com/sideshow/apns2/token"
"github.com/sirupsen/logrus"
"go.uber.org/zap"
"github.com/christianselig/apollo-backend/internal/domain"
"github.com/christianselig/apollo-backend/internal/reddit"
@ -33,7 +33,7 @@ const (
type notificationsWorker struct {
context.Context
logger *logrus.Logger
logger *zap.Logger
statsd *statsd.Client
db *pgxpool.Pool
redis *redis.Client
@ -47,7 +47,7 @@ type notificationsWorker struct {
deviceRepo domain.DeviceRepository
}
func NewNotificationsWorker(ctx context.Context, logger *logrus.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, queue rmq.Connection, consumers int) Worker {
func NewNotificationsWorker(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, queue rmq.Connection, consumers int) Worker {
reddit := reddit.NewClient(
os.Getenv("REDDIT_CLIENT_ID"),
os.Getenv("REDDIT_CLIENT_SECRET"),
@ -92,9 +92,7 @@ func (nw *notificationsWorker) Start() error {
return err
}
nw.logger.WithFields(logrus.Fields{
"numConsumers": nw.consumers,
}).Info("starting up notifications worker")
nw.logger.Info("starting up notifications worker", zap.Int("consumers", nw.consumers))
prefetchLimit := int64(nw.consumers * 2)
@ -139,40 +137,28 @@ func NewNotificationsConsumer(nw *notificationsWorker, tag int) *notificationsCo
func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
defer func() {
lockKey := fmt.Sprintf("locks:accounts:%s", delivery.Payload())
if err := nc.redis.Del(nc, lockKey).Err(); err != nil {
nc.logger.WithFields(logrus.Fields{
"lockKey": lockKey,
"err": err,
}).Error("failed to remove lock")
key := fmt.Sprintf("locks:accounts:%s", delivery.Payload())
if err := nc.redis.Del(nc, key).Err(); err != nil {
nc.logger.Error("failed to remove account lock", zap.Error(err), zap.String("key", key))
}
}()
nc.logger.WithFields(logrus.Fields{
"account#id": delivery.Payload(),
}).Debug("starting job")
id, err := strconv.ParseInt(delivery.Payload(), 10, 64)
if err != nil {
nc.logger.WithFields(logrus.Fields{
"account#id": delivery.Payload(),
"err": err,
}).Error("failed to parse account ID")
nc.logger.Error("failed to parse account id from payload", zap.Error(err), zap.String("payload", delivery.Payload()))
_ = delivery.Reject()
return
}
nc.logger.Debug("starting job", zap.Int64("account#id", id))
defer func() { _ = delivery.Ack() }()
now := time.Now()
account, err := nc.accountRepo.GetByID(nc, id)
if err != nil {
nc.logger.WithFields(logrus.Fields{
"account#id": id,
"err": err,
}).Error("failed to fetch account from database")
nc.logger.Error("failed to fetch account from database", zap.Error(err), zap.Int64("account#id", id))
return
}
@ -184,35 +170,39 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
defer func(acc *domain.Account) {
if err = nc.accountRepo.Update(nc, acc); err != nil {
nc.logger.WithFields(logrus.Fields{
"account#username": account.NormalizedUsername(),
"err": err,
}).Error("failed to update account")
nc.logger.Error("failed to update account",
zap.Error(err),
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
)
}
}(&account)
rac := nc.reddit.NewAuthenticatedClient(account.AccountID, account.RefreshToken, account.AccessToken)
if account.TokenExpiresAt.Before(now) {
nc.logger.WithFields(logrus.Fields{
"account#username": account.NormalizedUsername(),
}).Debug("refreshing reddit token")
nc.logger.Debug("refreshing reddit token",
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
)
tokens, err := rac.RefreshTokens(nc)
if err != nil {
if err != reddit.ErrOauthRevoked {
nc.logger.WithFields(logrus.Fields{
"account#username": account.NormalizedUsername(),
"err": err,
}).Error("failed to refresh reddit tokens")
nc.logger.Error("failed to refresh reddit tokens",
zap.Error(err),
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
)
return
}
err = nc.deleteAccount(account)
if err != nil {
nc.logger.WithFields(logrus.Fields{
"account#username": account.NormalizedUsername(),
"err": err,
}).Error("failed to remove revoked account")
nc.logger.Error("failed to remove revoked account",
zap.Error(err),
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
)
}
return
@ -234,9 +224,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
_ = nc.statsd.Histogram("apollo.queue.delay", float64(latency.Milliseconds()), []string{}, rate)
}
nc.logger.WithFields(logrus.Fields{
"account#username": account.NormalizedUsername(),
}).Debug("fetching message inbox")
nc.logger.Debug("fetching message inbox", zap.Int64("account#id", id), zap.String("account#username", account.NormalizedUsername()))
opts := []reddit.RequestOption{reddit.WithQuery("limit", "10")}
if account.LastMessageID != "" {
@ -249,38 +237,42 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
case reddit.ErrTimeout, reddit.ErrRateLimited: // Don't log timeouts or rate limits
break
case reddit.ErrOauthRevoked:
err = nc.deleteAccount(account)
if err != nil {
nc.logger.WithFields(logrus.Fields{
"account#username": account.NormalizedUsername(),
"err": err,
}).Error("failed to remove revoked account")
return
if err = nc.deleteAccount(account); err != nil {
nc.logger.Error("failed to remove revoked account",
zap.Error(err),
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
)
} else {
nc.logger.Info("removed revoked account",
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
)
}
nc.logger.WithFields(logrus.Fields{
"account#username": account.NormalizedUsername(),
}).Info("removed revoked account")
default:
nc.logger.WithFields(logrus.Fields{
"account#username": account.NormalizedUsername(),
"err": err,
}).Error("failed to fetch message inbox")
nc.logger.Error("failed to fetch message inbox",
zap.Error(err),
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
)
}
return
}
// Figure out where we stand
if msgs.Count == 0 {
nc.logger.WithFields(logrus.Fields{
"account#username": account.NormalizedUsername(),
}).Debug("no new messages, bailing early")
nc.logger.Debug("no new messages, bailing early",
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
)
return
}
nc.logger.WithFields(logrus.Fields{
"account#username": account.NormalizedUsername(),
"count": msgs.Count,
}).Debug("fetched messages")
nc.logger.Debug("fetched messages",
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
zap.Int("count", msgs.Count),
)
for _, msg := range msgs.Children {
if !msg.IsDeleted() {
@ -291,25 +283,28 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
// Let's populate this with the latest message so we don't flood users with stuff
if newAccount {
nc.logger.WithFields(logrus.Fields{
"account#username": account.NormalizedUsername(),
}).Debug("populating first message ID to prevent spamming")
nc.logger.Debug("populating first message id to prevent spamming",
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
)
return
}
devices, err := nc.deviceRepo.GetInboxNotifiableByAccountID(nc, account.ID)
if err != nil {
nc.logger.WithFields(logrus.Fields{
"account#username": account.NormalizedUsername(),
"err": err,
}).Error("failed to fetch account devices")
nc.logger.Error("failed to fetch account devices",
zap.Error(err),
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
)
return
}
if len(devices) == 0 {
nc.logger.WithFields(logrus.Fields{
"account#username": account.NormalizedUsername(),
}).Debug("no notifiable devices, finishing job")
nc.logger.Debug("no notifiable devices, bailing early",
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
)
return
}
@ -335,21 +330,24 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
res, err := client.Push(notification)
if err != nil || !res.Sent() {
_ = nc.statsd.Incr("apns.notification.errors", []string{}, 1)
nc.logger.WithFields(logrus.Fields{
"account#username": account.NormalizedUsername(),
"err": err,
"status": res.StatusCode,
"reason": res.Reason,
}).Error("failed to send notification")
nc.logger.Error("failed to send notification",
zap.Error(err),
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
zap.String("device#token", device.APNSToken),
zap.Int("response#status", res.StatusCode),
zap.String("response#reason", res.Reason),
)
// Delete device as notifications might have been disabled here
_ = nc.deviceRepo.Delete(nc, device.APNSToken)
} else {
_ = nc.statsd.Incr("apns.notification.sent", []string{}, 1)
nc.logger.WithFields(logrus.Fields{
"account#username": account.NormalizedUsername(),
"token": device.APNSToken,
}).Info("sent notification")
nc.logger.Info("sent notification",
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
zap.String("device#token", device.APNSToken),
)
}
}
}
@ -357,9 +355,10 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
ev := fmt.Sprintf("Sent notification to /u/%s (x%d)", account.Username, msgs.Count)
_ = nc.statsd.SimpleEvent(ev, "")
nc.logger.WithFields(logrus.Fields{
"account#username": account.NormalizedUsername(),
}).Debug("finishing job")
nc.logger.Debug("finishing job",
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
)
}
func (nc *notificationsConsumer) deleteAccount(account domain.Account) error {

View file

@ -10,7 +10,7 @@ import (
"github.com/adjust/rmq/v4"
"github.com/go-redis/redis/v8"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/sirupsen/logrus"
"go.uber.org/zap"
"github.com/christianselig/apollo-backend/internal/domain"
"github.com/christianselig/apollo-backend/internal/reddit"
@ -20,7 +20,7 @@ import (
type stuckNotificationsWorker struct {
context.Context
logger *logrus.Logger
logger *zap.Logger
statsd *statsd.Client
db *pgxpool.Pool
redis *redis.Client
@ -32,7 +32,7 @@ type stuckNotificationsWorker struct {
accountRepo domain.AccountRepository
}
func NewStuckNotificationsWorker(ctx context.Context, logger *logrus.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, queue rmq.Connection, consumers int) Worker {
func NewStuckNotificationsWorker(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, queue rmq.Connection, consumers int) Worker {
reddit := reddit.NewClient(
os.Getenv("REDDIT_CLIENT_ID"),
os.Getenv("REDDIT_CLIENT_SECRET"),
@ -61,9 +61,7 @@ func (snw *stuckNotificationsWorker) Start() error {
return err
}
snw.logger.WithFields(logrus.Fields{
"numConsumers": snw.consumers,
}).Info("starting up stuck notifications worker")
snw.logger.Info("starting up stuck notifications worker", zap.Int("consumers", snw.consumers))
prefetchLimit := int64(snw.consumers * 2)
@ -102,69 +100,67 @@ func NewStuckNotificationsConsumer(snw *stuckNotificationsWorker, tag int) *stuc
}
func (snc *stuckNotificationsConsumer) Consume(delivery rmq.Delivery) {
snc.logger.WithFields(logrus.Fields{
"account#id": delivery.Payload(),
}).Debug("starting job")
id, err := strconv.ParseInt(delivery.Payload(), 10, 64)
if err != nil {
snc.logger.WithFields(logrus.Fields{
"account#id": delivery.Payload(),
"err": err,
}).Error("failed to parse account ID")
snc.logger.Error("failed to parse account id from payload", zap.Error(err), zap.String("payload", delivery.Payload()))
_ = delivery.Reject()
return
}
snc.logger.Debug("starting job", zap.Int64("account#id", id))
defer func() { _ = delivery.Ack() }()
account, err := snc.accountRepo.GetByID(snc, id)
if err != nil {
snc.logger.WithFields(logrus.Fields{
"err": err,
}).Error("failed to fetch account from database")
snc.logger.Error("failed to fetch account from database", zap.Error(err), zap.Int64("account#id", id))
return
}
if account.LastMessageID == "" {
snc.logger.WithFields(logrus.Fields{
"account#username": account.NormalizedUsername(),
}).Debug("account has no messages, returning")
snc.logger.Debug("account has no messages, bailing early",
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
)
return
}
rac := snc.reddit.NewAuthenticatedClient(account.AccountID, account.RefreshToken, account.AccessToken)
snc.logger.WithFields(logrus.Fields{
"account#username": account.NormalizedUsername(),
"thing#id": account.LastMessageID,
}).Debug("fetching last thing")
snc.logger.Debug("fetching last thing",
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
)
kind := account.LastMessageID[:2]
var things *reddit.ListingResponse
if kind == "t4" {
snc.logger.WithFields(logrus.Fields{
"account#username": account.NormalizedUsername(),
"thing#id": account.LastMessageID,
}).Debug("checking last thing via inbox")
snc.logger.Debug("checking last thing via inbox",
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
)
things, err = rac.MessageInbox(snc)
if err != nil {
if err != reddit.ErrRateLimited {
snc.logger.WithFields(logrus.Fields{
"err": err,
}).Error("failed to fetch last thing via inbox")
snc.logger.Error("failed to fetch last thing via inbox",
zap.Error(err),
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
)
}
return
}
} else {
things, err = rac.AboutInfo(snc, account.LastMessageID)
if err != nil {
snc.logger.WithFields(logrus.Fields{
"err": err,
}).Error("failed to fetch last thing")
snc.logger.Error("failed to fetch last thing",
zap.Error(err),
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
)
return
}
}
@ -185,9 +181,11 @@ func (snc *stuckNotificationsConsumer) Consume(delivery rmq.Delivery) {
sthings, err := rac.MessageInbox(snc)
if err != nil {
snc.logger.WithFields(logrus.Fields{
"err": err,
}).Error("failed to check inbox")
snc.logger.Error("failed to check inbox",
zap.Error(err),
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
)
return
}
@ -199,52 +197,59 @@ func (snc *stuckNotificationsConsumer) Consume(delivery rmq.Delivery) {
}
if !found {
snc.logger.WithFields(logrus.Fields{
"account#username": account.NormalizedUsername(),
"thing#id": account.LastMessageID,
}).Debug("thing exists, but not in inbox, marking as deleted")
snc.logger.Debug("thing exists, but not on inbox, marking as deleted",
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
zap.String("thing#id", account.LastMessageID),
)
break
}
snc.logger.WithFields(logrus.Fields{
"account#username": account.NormalizedUsername(),
"thing#id": account.LastMessageID,
}).Debug("thing exists, returning")
snc.logger.Debug("thing exists, bailing early",
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
zap.String("thing#id", account.LastMessageID),
)
return
}
}
snc.logger.WithFields(logrus.Fields{
"account#username": account.NormalizedUsername(),
"thing#id": account.LastMessageID,
}).Info("thing got deleted, resetting")
snc.logger.Info("thing got deleted, resetting",
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
zap.String("thing#id", account.LastMessageID),
)
if kind != "t4" {
snc.logger.WithFields(logrus.Fields{
"account#username": account.NormalizedUsername(),
}).Debug("getting message inbox to determine last good thing")
snc.logger.Debug("getting message inbox to find last good thing",
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
)
things, err = rac.MessageInbox(snc)
if err != nil {
snc.logger.WithFields(logrus.Fields{
"account#username": account.NormalizedUsername(),
"err": err,
}).Error("failed to get message inbox")
snc.logger.Error("failed to check inbox",
zap.Error(err),
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
)
return
}
}
account.LastMessageID = ""
snc.logger.WithFields(logrus.Fields{
"account#username": account.NormalizedUsername(),
}).Debug("calculating last good thing")
snc.logger.Debug("calculating last good thing",
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
)
for _, thing := range things.Children {
if thing.IsDeleted() {
snc.logger.WithFields(logrus.Fields{
"account#username": account.NormalizedUsername(),
"thing#id": thing.FullName(),
}).Debug("thing deleted, next")
snc.logger.Debug("thing got deleted, checking next",
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
zap.String("thing#id", thing.FullName()),
)
continue
}
@ -252,15 +257,17 @@ func (snc *stuckNotificationsConsumer) Consume(delivery rmq.Delivery) {
break
}
snc.logger.WithFields(logrus.Fields{
"account#username": account.NormalizedUsername(),
"thing#id": account.LastMessageID,
}).Debug("updating last good thing")
snc.logger.Debug("updating last good thing",
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
zap.String("thing#id", account.LastMessageID),
)
if err := snc.accountRepo.Update(snc, &account); err != nil {
snc.logger.WithFields(logrus.Fields{
"account#username": account.NormalizedUsername(),
"err": err,
}).Error("failed to update account's message id")
snc.logger.Error("failed to update account's last message id",
zap.Error(err),
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
)
}
}

View file

@ -16,7 +16,7 @@ import (
"github.com/sideshow/apns2"
"github.com/sideshow/apns2/payload"
"github.com/sideshow/apns2/token"
"github.com/sirupsen/logrus"
"go.uber.org/zap"
"github.com/christianselig/apollo-backend/internal/domain"
"github.com/christianselig/apollo-backend/internal/reddit"
@ -26,7 +26,7 @@ import (
type subredditsWorker struct {
context.Context
logger *logrus.Logger
logger *zap.Logger
statsd *statsd.Client
db *pgxpool.Pool
redis *redis.Client
@ -47,7 +47,7 @@ const (
subredditNotificationBodyFormat = "r/%s: \u201c%s\u201d"
)
func NewSubredditsWorker(ctx context.Context, logger *logrus.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, queue rmq.Connection, consumers int) Worker {
func NewSubredditsWorker(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, queue rmq.Connection, consumers int) Worker {
reddit := reddit.NewClient(
os.Getenv("REDDIT_CLIENT_ID"),
os.Getenv("REDDIT_CLIENT_SECRET"),
@ -94,9 +94,7 @@ func (sw *subredditsWorker) Start() error {
return err
}
sw.logger.WithFields(logrus.Fields{
"numConsumers": sw.consumers,
}).Info("starting up subreddits worker")
sw.logger.Info("starting up subreddits worker", zap.Int("consumers", sw.consumers))
prefetchLimit := int64(sw.consumers * 2)
@ -140,44 +138,38 @@ func NewSubredditsConsumer(sw *subredditsWorker, tag int) *subredditsConsumer {
}
func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) {
sc.logger.WithFields(logrus.Fields{
"subreddit#id": delivery.Payload(),
}).Debug("starting job")
id, err := strconv.ParseInt(delivery.Payload(), 10, 64)
if err != nil {
sc.logger.WithFields(logrus.Fields{
"subreddit#id": delivery.Payload(),
"err": err,
}).Error("failed to parse subreddit ID")
sc.logger.Error("failed to parse subreddit id from payload", zap.Error(err), zap.String("payload", delivery.Payload()))
_ = delivery.Reject()
return
}
sc.logger.Debug("starting job", zap.Int64("subreddit#id", id))
defer func() { _ = delivery.Ack() }()
subreddit, err := sc.subredditRepo.GetByID(sc, id)
if err != nil {
sc.logger.WithFields(logrus.Fields{
"err": err,
}).Error("failed to fetch subreddit from database")
sc.logger.Error("failed to fetch subreddit from database", zap.Error(err), zap.Int64("subreddit#id", id))
return
}
watchers, err := sc.watcherRepo.GetBySubredditID(sc, subreddit.ID)
if err != nil {
sc.logger.WithFields(logrus.Fields{
"subreddit#id": subreddit.ID,
"err": err,
}).Error("failed to fetch watchers from database")
sc.logger.Error("failed to fetch watchers from database",
zap.Error(err),
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
)
return
}
if len(watchers) == 0 {
sc.logger.WithFields(logrus.Fields{
"subreddit#id": subreddit.ID,
}).Debug("no watchers for subreddit, finishing job")
sc.logger.Debug("no watchers for subreddit, bailing early",
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
)
return
}
@ -188,17 +180,17 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) {
seenPosts := map[string]bool{}
// Load 500 newest posts
sc.logger.WithFields(logrus.Fields{
"subreddit#id": subreddit.ID,
"subreddit#name": subreddit.Name,
}).Debug("loading up to 500 new posts")
sc.logger.Debug("loading up to 500 new posts",
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
)
for page := 0; page < 5; page++ {
sc.logger.WithFields(logrus.Fields{
"subreddit#id": subreddit.ID,
"subreddit#name": subreddit.Name,
"page": page,
}).Debug("loading new posts")
sc.logger.Debug("loading new posts",
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.Int("page", page),
)
i := rand.Intn(len(watchers))
watcher := watchers[i]
@ -213,19 +205,21 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) {
)
if err != nil {
sc.logger.WithFields(logrus.Fields{
"subreddit#id": subreddit.ID,
"err": err,
}).Error("failed to fetch new posts")
sc.logger.Error("failed fetchint new posts",
zap.Error(err),
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.Int("page", page),
)
continue
}
sc.logger.WithFields(logrus.Fields{
"subreddit#id": subreddit.ID,
"subreddit#name": subreddit.Name,
"count": sps.Count,
"page": page,
}).Debug("loaded new posts for page")
sc.logger.Debug("loaded new posts",
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.Int("page", page),
zap.Int("count", sps.Count),
)
// If it's empty, we're done
if sps.Count == 0 {
@ -250,20 +244,20 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) {
}
if finished {
sc.logger.WithFields(logrus.Fields{
"subreddit#id": subreddit.ID,
"subreddit#name": subreddit.Name,
"page": page,
}).Debug("reached date threshold")
sc.logger.Debug("reached date threshold",
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.Int("page", page),
)
break
}
}
// Load hot posts
sc.logger.WithFields(logrus.Fields{
"subreddit#id": subreddit.ID,
"subreddit#name": subreddit.Name,
}).Debug("loading hot posts")
sc.logger.Debug("loading hot posts",
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
)
{
i := rand.Intn(len(watchers))
watcher := watchers[i]
@ -276,16 +270,17 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) {
)
if err != nil {
sc.logger.WithFields(logrus.Fields{
"subreddit#id": subreddit.ID,
"err": err,
}).Error("failed to fetch hot posts")
sc.logger.Error("failed to fetch hot posts",
zap.Error(err),
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
)
} else {
sc.logger.WithFields(logrus.Fields{
"subreddit#id": subreddit.ID,
"subreddit#name": subreddit.Name,
"count": sps.Count,
}).Debug("loaded hot posts")
sc.logger.Debug("loaded hot posts",
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.Int("count", sps.Count),
)
for _, post := range sps.Children {
if post.CreatedAt.Before(threshold) {
@ -299,11 +294,11 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) {
}
}
sc.logger.WithFields(logrus.Fields{
"subreddit#id": subreddit.ID,
"subreddit#name": subreddit.Name,
"count": len(posts),
}).Debug("checking posts for hits")
sc.logger.Debug("checking posts for watcher hits",
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.Int("count", len(posts)),
)
for _, post := range posts {
lowcaseAuthor := strings.ToLower(post.Author)
lowcaseTitle := strings.ToLower(post.Title)
@ -344,31 +339,30 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) {
notified, _ := sc.redis.Get(sc, lockKey).Bool()
if notified {
sc.logger.WithFields(logrus.Fields{
"subreddit#id": subreddit.ID,
"subreddit#name": subreddit.Name,
"watcher#id": watcher.ID,
"post#id": post.ID,
}).Debug("already notified, skipping")
sc.logger.Debug("already notified, skipping",
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.Int64("watcher#id", watcher.ID),
zap.String("post#id", post.ID),
)
continue
}
if err := sc.watcherRepo.IncrementHits(sc, watcher.ID); err != nil {
sc.logger.WithFields(logrus.Fields{
"subreddit#id": subreddit.ID,
"watcher#id": watcher.ID,
"err": err,
}).Error("could not increment hits")
sc.logger.Error("could not increment hits",
zap.Error(err),
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.Int64("watcher#id", watcher.ID),
)
return
}
sc.logger.WithFields(logrus.Fields{
"subreddit#id": subreddit.ID,
"subreddit#name": subreddit.Name,
"watcher#id": watcher.ID,
"post#id": post.ID,
}).Debug("got a hit")
sc.logger.Debug("got a hit",
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.Int64("watcher#id", watcher.ID),
zap.String("post#id", post.ID),
)
sc.redis.SetEX(sc, lockKey, true, 24*time.Hour)
notifs = append(notifs, watcher)
@ -377,13 +371,12 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) {
if len(notifs) == 0 {
continue
}
sc.logger.WithFields(logrus.Fields{
"subreddit#id": subreddit.ID,
"subreddit#name": subreddit.Name,
"post#id": post.ID,
"count": len(notifs),
}).Debug("got hits for post")
sc.logger.Debug("got hits for post",
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.String("post#id", post.ID),
zap.Int("count", len(notifs)),
)
payload := payloadFromPost(post)
@ -407,28 +400,31 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) {
res, err := client.Push(notification)
if err != nil || !res.Sent() {
_ = sc.statsd.Incr("apns.notification.errors", []string{}, 1)
sc.logger.WithFields(logrus.Fields{
"subreddit#id": subreddit.ID,
"device#id": watcher.Device.ID,
"err": err,
"status": res.StatusCode,
"reason": res.Reason,
}).Error("failed to send notification")
sc.logger.Error("failed to send notification",
zap.Error(err),
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.String("post#id", post.ID),
zap.String("apns", watcher.Device.APNSToken),
zap.Int("response#status", res.StatusCode),
zap.String("response#reason", res.Reason),
)
} else {
_ = sc.statsd.Incr("apns.notification.sent", []string{}, 1)
sc.logger.WithFields(logrus.Fields{
"subreddit#id": subreddit.ID,
"device#id": watcher.Device.ID,
"device#token": watcher.Device.APNSToken,
}).Info("sent notification")
sc.logger.Info("sent notification",
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.String("post#id", post.ID),
zap.String("device#token", watcher.Device.APNSToken),
)
}
}
}
sc.logger.WithFields(logrus.Fields{
"subreddit#id": subreddit.ID,
"subreddit#name": subreddit.Name,
}).Debug("finishing job")
sc.logger.Debug("finishing job",
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
)
}
func payloadFromPost(post *reddit.Thing) *payload.Payload {

View file

@ -15,7 +15,7 @@ import (
"github.com/sideshow/apns2"
"github.com/sideshow/apns2/payload"
"github.com/sideshow/apns2/token"
"github.com/sirupsen/logrus"
"go.uber.org/zap"
"github.com/christianselig/apollo-backend/internal/domain"
"github.com/christianselig/apollo-backend/internal/reddit"
@ -25,7 +25,7 @@ import (
type trendingWorker struct {
context.Context
logger *logrus.Logger
logger *zap.Logger
statsd *statsd.Client
redis *redis.Client
queue rmq.Connection
@ -42,7 +42,7 @@ type trendingWorker struct {
const trendingNotificationTitleFormat = "🔥 r/%s Trending"
func NewTrendingWorker(ctx context.Context, logger *logrus.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, queue rmq.Connection, consumers int) Worker {
func NewTrendingWorker(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, queue rmq.Connection, consumers int) Worker {
reddit := reddit.NewClient(
os.Getenv("REDDIT_CLIENT_ID"),
os.Getenv("REDDIT_CLIENT_SECRET"),
@ -88,9 +88,7 @@ func (tw *trendingWorker) Start() error {
return err
}
tw.logger.WithFields(logrus.Fields{
"numConsumers": tw.consumers,
}).Info("starting up trending worker")
tw.logger.Info("starting up trending subreddits worker", zap.Int("consumers", tw.consumers))
prefetchLimit := int64(tw.consumers * 2)
@ -134,44 +132,38 @@ func NewTrendingConsumer(tw *trendingWorker, tag int) *trendingConsumer {
}
func (tc *trendingConsumer) Consume(delivery rmq.Delivery) {
tc.logger.WithFields(logrus.Fields{
"subreddit#id": delivery.Payload(),
}).Debug("starting job")
id, err := strconv.ParseInt(delivery.Payload(), 10, 64)
if err != nil {
tc.logger.WithFields(logrus.Fields{
"subreddit#id": delivery.Payload(),
"err": err,
}).Error("failed to parse subreddit ID")
tc.logger.Error("failed to parse subreddit id from payload", zap.Error(err), zap.String("payload", delivery.Payload()))
_ = delivery.Reject()
return
}
tc.logger.Debug("starting job", zap.Int64("subreddit#id", id))
defer func() { _ = delivery.Ack() }()
subreddit, err := tc.subredditRepo.GetByID(tc, id)
if err != nil {
tc.logger.WithFields(logrus.Fields{
"err": err,
}).Error("failed to fetch subreddit from database")
tc.logger.Error("failed to fetch subreddit from database", zap.Error(err), zap.Int64("subreddit#id", id))
return
}
watchers, err := tc.watcherRepo.GetByTrendingSubredditID(tc, subreddit.ID)
if err != nil {
tc.logger.WithFields(logrus.Fields{
"subreddit#id": subreddit.ID,
"err": err,
}).Error("failed to fetch watchers from database")
tc.logger.Error("failed to fetch watchers from database",
zap.Error(err),
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
)
return
}
if len(watchers) == 0 {
tc.logger.WithFields(logrus.Fields{
"subreddit#id": subreddit.ID,
}).Debug("no watchers for trending, finishing job")
tc.logger.Debug("no watchers for subreddit, bailing early",
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
)
return
}
@ -182,39 +174,44 @@ func (tc *trendingConsumer) Consume(delivery rmq.Delivery) {
tps, err := rac.SubredditTop(tc, subreddit.Name, reddit.WithQuery("t", "week"))
if err != nil {
tc.logger.WithFields(logrus.Fields{
"subreddit#id": subreddit.ID,
"subreddit#name": subreddit.Name,
"err": err,
}).Error("failed to fetch month's top posts")
tc.logger.Error("failed to fetch month's top posts",
zap.Error(err),
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
)
return
}
tc.logger.WithFields(logrus.Fields{
"subreddit#id": subreddit.ID,
"subreddit#name": subreddit.Name,
"count": tps.Count,
}).Debug("loaded month's hot posts")
tc.logger.Debug("loaded month's top posts",
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.Int("count", tps.Count),
)
if tps.Count == 0 {
tc.logger.WithFields(logrus.Fields{
"subreddit#id": subreddit.ID,
}).Debug("no top posts for subreddit, returning")
tc.logger.Debug("no top posts, bailing early",
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
)
return
}
if tps.Count < 20 {
tc.logger.WithFields(logrus.Fields{
"subreddit#id": subreddit.ID,
}).Debug("not enough posts, returning")
tc.logger.Debug("no top posts, bailing early",
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.Int("count", tps.Count),
)
return
}
middlePost := tps.Count / 2
medianScore := tps.Children[middlePost].Score
tc.logger.WithFields(logrus.Fields{
"subreddit#id": subreddit.ID,
"score": medianScore,
}).Debug("calculated median score")
tc.logger.Debug("calculated median score",
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.Int64("score", medianScore),
)
// Grab hot posts and filter out anything that's > 2 days old
i = rand.Intn(len(watchers))
@ -222,18 +219,18 @@ func (tc *trendingConsumer) Consume(delivery rmq.Delivery) {
rac = tc.reddit.NewAuthenticatedClient(watcher.Account.AccountID, watcher.Account.RefreshToken, watcher.Account.AccessToken)
hps, err := rac.SubredditHot(tc, subreddit.Name)
if err != nil {
tc.logger.WithFields(logrus.Fields{
"subreddit#id": subreddit.ID,
"subreddit#name": subreddit.Name,
"err": err,
}).Error("failed to fetch hot posts")
tc.logger.Error("failed to fetch hot posts",
zap.Error(err),
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
)
return
}
tc.logger.WithFields(logrus.Fields{
"subreddit#id": subreddit.ID,
"subreddit#name": subreddit.Name,
"count": hps.Count,
}).Debug("loaded hot posts")
tc.logger.Debug("loaded hot posts",
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.Int("count", hps.Count),
)
// Trending only counts for posts less than 2 days old
threshold := time.Now().Add(-24 * time.Hour * 2)
@ -260,23 +257,24 @@ func (tc *trendingConsumer) Consume(delivery rmq.Delivery) {
notified, _ := tc.redis.Get(tc, lockKey).Bool()
if notified {
tc.logger.WithFields(logrus.Fields{
"subreddit#id": subreddit.ID,
"subreddit#name": subreddit.Name,
"watcher#id": watcher.ID,
"post#id": post.ID,
}).Debug("already notified, skipping")
tc.logger.Debug("already notified, skipping",
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.Int64("watcher#id", watcher.ID),
zap.String("post#id", post.ID),
)
continue
}
tc.redis.SetEX(tc, lockKey, true, 48*time.Hour)
if err := tc.watcherRepo.IncrementHits(tc, watcher.ID); err != nil {
tc.logger.WithFields(logrus.Fields{
"subreddit#id": subreddit.ID,
"watcher#id": watcher.ID,
"err": err,
}).Error("could not increment hits")
tc.logger.Error("could not increment hits",
zap.Error(err),
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.Int64("watcher#id", watcher.ID),
)
return
}
@ -290,30 +288,31 @@ func (tc *trendingConsumer) Consume(delivery rmq.Delivery) {
res, err := client.Push(notification)
if err != nil || !res.Sent() {
_ = tc.statsd.Incr("apns.notification.errors", []string{}, 1)
tc.logger.WithFields(logrus.Fields{
"subreddit#id": subreddit.ID,
"post#id": post.ID,
"device#id": watcher.Device.ID,
"err": err,
"status": res.StatusCode,
"reason": res.Reason,
}).Error("failed to send notification")
tc.logger.Error("failed to send notification",
zap.Error(err),
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.String("post#id", post.ID),
zap.String("apns", watcher.Device.APNSToken),
zap.Int("response#status", res.StatusCode),
zap.String("response#reason", res.Reason),
)
} else {
_ = tc.statsd.Incr("apns.notification.sent", []string{}, 1)
tc.logger.WithFields(logrus.Fields{
"subreddit#id": subreddit.ID,
"post#id": post.ID,
"device#id": watcher.Device.ID,
"device#token": watcher.Device.APNSToken,
}).Info("sent notification")
tc.logger.Info("sent notification",
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.String("post#id", post.ID),
zap.String("device#token", watcher.Device.APNSToken),
)
}
}
}
tc.logger.WithFields(logrus.Fields{
"subreddit#id": subreddit.ID,
"subreddit#name": subreddit.Name,
}).Debug("finishing job")
tc.logger.Debug("finishing job",
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
)
}
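Note how every call in this consumer repeats the subreddit#id and subreddit#name fields. zap can also bind such fields once on a child logger via With; a possible follow-up refactor (not something this commit does) would be:

package main

import "go.uber.org/zap"

func main() {
    base, _ := zap.NewDevelopment()
    defer func() { _ = base.Sync() }()

    // Bind the fields that stay constant for the rest of the job once;
    // every subsequent call inherits them. Values here are illustrative.
    logger := base.With(
        zap.Int64("subreddit#id", 123),
        zap.String("subreddit#name", "example"),
    )

    logger.Debug("starting job")
    logger.Debug("loaded hot posts", zap.Int("count", 42))
    logger.Debug("finishing job")
}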
func payloadFromTrendingPost(post *reddit.Thing) *payload.Payload {


@ -15,7 +15,7 @@ import (
"github.com/sideshow/apns2"
"github.com/sideshow/apns2/payload"
"github.com/sideshow/apns2/token"
"github.com/sirupsen/logrus"
"go.uber.org/zap"
"github.com/christianselig/apollo-backend/internal/domain"
"github.com/christianselig/apollo-backend/internal/reddit"
@ -25,7 +25,7 @@ import (
type usersWorker struct {
context.Context
logger *logrus.Logger
logger *zap.Logger
statsd *statsd.Client
db *pgxpool.Pool
redis *redis.Client
@ -43,7 +43,7 @@ type usersWorker struct {
const userNotificationTitleFormat = "👨\u200d🚀 %s"
func NewUsersWorker(ctx context.Context, logger *logrus.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, queue rmq.Connection, consumers int) Worker {
func NewUsersWorker(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, queue rmq.Connection, consumers int) Worker {
reddit := reddit.NewClient(
os.Getenv("REDDIT_CLIENT_ID"),
os.Getenv("REDDIT_CLIENT_SECRET"),
@ -90,9 +90,7 @@ func (uw *usersWorker) Start() error {
return err
}
uw.logger.WithFields(logrus.Fields{
"numConsumers": uw.consumers,
}).Info("starting up users worker")
uw.logger.Info("starting up subreddits worker", zap.Int("consumers", uw.consumers))
prefetchLimit := int64(uw.consumers * 2)
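Only the tail of Start is visible here, but the shape is the same in all three workers: open a queue, start consuming with a prefetch limit of twice the consumer count, and attach one consumer per slot. A standalone sketch of that rmq v4 wiring (queue name, consumer tag format, and Redis address are assumptions):

package main

import (
    "fmt"
    "time"

    "github.com/adjust/rmq/v4"
)

const pollDuration = 100 * time.Millisecond

// noopConsumer stands in for a real consumer; it just acks deliveries.
type noopConsumer struct{}

func (noopConsumer) Consume(d rmq.Delivery) { _ = d.Ack() }

// startConsumers mirrors what a worker's Start method does: open a queue,
// start consuming with a prefetch of twice the consumer count, then attach
// one consumer per slot.
func startConsumers(conn rmq.Connection, consumers int) error {
    queue, err := conn.OpenQueue("users")
    if err != nil {
        return err
    }
    prefetchLimit := int64(consumers * 2)
    if err := queue.StartConsuming(prefetchLimit, pollDuration); err != nil {
        return err
    }
    for i := 0; i < consumers; i++ {
        if _, err := queue.AddConsumer(fmt.Sprintf("consumer %d", i), noopConsumer{}); err != nil {
            return err
        }
    }
    return nil
}

func main() {
    errChan := make(chan error, 10)
    conn, err := rmq.OpenConnection("sketch", "tcp", "localhost:6379", 0, errChan)
    if err != nil {
        panic(err)
    }
    if err := startConsumers(conn, 4); err != nil {
        panic(err)
    }
}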
@ -136,44 +134,38 @@ func NewUsersConsumer(uw *usersWorker, tag int) *usersConsumer {
}
func (uc *usersConsumer) Consume(delivery rmq.Delivery) {
uc.logger.WithFields(logrus.Fields{
"user#id": delivery.Payload(),
}).Debug("starting job")
id, err := strconv.ParseInt(delivery.Payload(), 10, 64)
if err != nil {
uc.logger.WithFields(logrus.Fields{
"user#id": delivery.Payload(),
"err": err,
}).Error("failed to parse user ID")
uc.logger.Error("failed to parse subreddit id from payload", zap.Error(err), zap.String("payload", delivery.Payload()))
_ = delivery.Reject()
return
}
uc.logger.Debug("starting job", zap.Int64("subreddit#id", id))
defer func() { _ = delivery.Ack() }()
user, err := uc.userRepo.GetByID(uc, id)
if err != nil {
uc.logger.WithFields(logrus.Fields{
"err": err,
}).Error("failed to fetch user from database")
uc.logger.Error("failed to fetch user from database", zap.Error(err), zap.Int64("subreddit#id", id))
return
}
watchers, err := uc.watcherRepo.GetByUserID(uc, user.ID)
if err != nil {
uc.logger.WithFields(logrus.Fields{
"user#id": user.ID,
"err": err,
}).Error("failed to fetch watchers from database")
uc.logger.Error("failed to fetch watchers from database",
zap.Error(err),
zap.Int64("user#id", id),
zap.String("user#name", user.NormalizedName()),
)
return
}
if len(watchers) == 0 {
uc.logger.WithFields(logrus.Fields{
"user#id": user.ID,
}).Info("no watchers for user, skipping")
uc.logger.Debug("no watchers for user, bailing early",
zap.Int64("user#id", id),
zap.String("user#name", user.NormalizedName()),
)
return
}
@ -186,41 +178,46 @@ func (uc *usersConsumer) Consume(delivery rmq.Delivery) {
ru, err := rac.UserAbout(uc, user.Name)
if err != nil {
uc.logger.WithFields(logrus.Fields{
"user#id": user.ID,
"err": err,
}).Error("failed to fetch user details")
uc.logger.Error("failed to fetch user details",
zap.Error(err),
zap.Int64("user#id", id),
zap.String("user#name", user.NormalizedName()),
)
return
}
if !ru.AcceptFollowers {
uc.logger.WithFields(logrus.Fields{
"user#id": user.ID,
}).Info("user disabled followers, removing")
uc.logger.Info("user disabled followers, removing",
zap.Int64("user#id", id),
zap.String("user#name", user.NormalizedName()),
)
if err := uc.watcherRepo.DeleteByTypeAndWatcheeID(uc, domain.UserWatcher, user.ID); err != nil {
uc.logger.WithFields(logrus.Fields{
"user#id": user.ID,
"err": err,
}).Error("failed to delete watchers for user who does not allow followers")
uc.logger.Error("failed to remove watchers for user who disallows followers",
zap.Error(err),
zap.Int64("user#id", id),
zap.String("user#name", user.NormalizedName()),
)
return
}
if err := uc.userRepo.Delete(uc, user.ID); err != nil {
uc.logger.WithFields(logrus.Fields{
"user#id": user.ID,
"err": err,
}).Error("failed to delete user")
uc.logger.Error("failed to remove user",
zap.Error(err),
zap.Int64("user#id", id),
zap.String("user#name", user.NormalizedName()),
)
return
}
}
posts, err := rac.UserPosts(uc, user.Name)
if err != nil {
uc.logger.WithFields(logrus.Fields{
"user#id": user.ID,
"err": err,
}).Error("failed to fetch user activity")
uc.logger.Error("failed to fetch user activity",
zap.Error(err),
zap.Int64("user#id", id),
zap.String("user#name", user.NormalizedName()),
)
return
}
@ -261,11 +258,12 @@ func (uc *usersConsumer) Consume(delivery rmq.Delivery) {
for _, watcher := range notifs {
if err := uc.watcherRepo.IncrementHits(uc, watcher.ID); err != nil {
uc.logger.WithFields(logrus.Fields{
"user#id": user.ID,
"watcher#id": watcher.ID,
"err": err,
}).Error("could not increment hits")
uc.logger.Error("failed to increment watcher hits",
zap.Error(err),
zap.Int64("user#id", id),
zap.String("user#name", user.NormalizedName()),
zap.Int64("watcher#id", watcher.ID),
)
return
}
@ -285,28 +283,31 @@ func (uc *usersConsumer) Consume(delivery rmq.Delivery) {
res, err := client.Push(notification)
if err != nil || !res.Sent() {
_ = uc.statsd.Incr("apns.notification.errors", []string{}, 1)
uc.logger.WithFields(logrus.Fields{
"user#id": user.ID,
"device#id": device.ID,
"err": err,
"status": res.StatusCode,
"reason": res.Reason,
}).Error("failed to send notification")
uc.logger.Error("failed to send notification",
zap.Error(err),
zap.Int64("user#id", id),
zap.String("user#name", user.NormalizedName()),
zap.String("post#id", post.ID),
zap.String("apns", watcher.Device.APNSToken),
zap.Int("response#status", res.StatusCode),
zap.String("response#reason", res.Reason),
)
} else {
_ = uc.statsd.Incr("apns.notification.sent", []string{}, 1)
uc.logger.WithFields(logrus.Fields{
"user#id": user.ID,
"device#id": device.ID,
"device#token": device.APNSToken,
}).Info("sent notification")
uc.logger.Info("sent notification",
zap.Int64("user#id", id),
zap.String("user#name", user.NormalizedName()),
zap.String("post#id", post.ID),
zap.String("device#token", watcher.Device.APNSToken),
)
}
}
}
uc.logger.WithFields(logrus.Fields{
"user#id": user.ID,
"user#name": user.Name,
}).Debug("finishing job")
uc.logger.Debug("finishing job",
zap.Int64("user#id", id),
zap.String("user#name", user.NormalizedName()),
)
}
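All three consumers share the same delivery path: build an APNs payload, push it with sideshow/apns2, and log based on whether the response was accepted. A standalone sketch of that path with zap logging (credentials, bundle ID, and device token are placeholders):

package main

import (
    "log"

    "github.com/sideshow/apns2"
    "github.com/sideshow/apns2/payload"
    "github.com/sideshow/apns2/token"
    "go.uber.org/zap"
)

func main() {
    logger, _ := zap.NewDevelopment()
    defer func() { _ = logger.Sync() }()

    // Placeholder credentials; the real workers load these from their environment.
    authKey, err := token.AuthKeyFromFile("AuthKey_XXXXXXXXXX.p8")
    if err != nil {
        log.Fatal(err)
    }
    client := apns2.NewTokenClient(&token.Token{
        AuthKey: authKey,
        KeyID:   "XXXXXXXXXX",
        TeamID:  "XXXXXXXXXX",
    }).Production()

    notification := &apns2.Notification{
        DeviceToken: "device-token-placeholder",
        Topic:       "com.example.app", // placeholder bundle ID
        Payload: payload.NewPayload().
            AlertTitle("🔥 r/example Trending").
            AlertBody("An example post title"),
    }

    res, err := client.Push(notification)
    if err != nil {
        logger.Error("failed to send notification", zap.Error(err))
        return
    }
    if !res.Sent() {
        logger.Error("notification rejected",
            zap.Int("response#status", res.StatusCode),
            zap.String("response#reason", res.Reason),
        )
        return
    }
    logger.Info("sent notification", zap.String("device#token", notification.DeviceToken))
}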
func payloadFromUserPost(post *reddit.Thing) *payload.Payload {


@ -8,12 +8,12 @@ import (
"github.com/adjust/rmq/v4"
"github.com/go-redis/redis/v8"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/sirupsen/logrus"
"go.uber.org/zap"
)
const pollDuration = 100 * time.Millisecond
type NewWorkerFn func(context.Context, *logrus.Logger, *statsd.Client, *pgxpool.Pool, *redis.Client, rmq.Connection, int) Worker
type NewWorkerFn func(context.Context, *zap.Logger, *statsd.Client, *pgxpool.Pool, *redis.Client, rmq.Connection, int) Worker
type Worker interface {
Start() error
Stop()
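With the constructors converted, NewWorkerFn is the one signature the rest of the application wires workers through. A small fragment for the same package (its name is assumed here) that asserts the converted constructors still satisfy it, plus an illustrative registry:

package worker // package name assumed; this fragment belongs next to the constructors

// Compile-time assertions that the converted constructors still satisfy
// the zap-based NewWorkerFn signature.
var (
    _ NewWorkerFn = NewTrendingWorker
    _ NewWorkerFn = NewUsersWorker
)

// An illustrative registry keyed by queue name; the map and its keys are
// assumptions for this sketch, not part of the commit.
var workerFns = map[string]NewWorkerFn{
    "trending": NewTrendingWorker,
    "users":    NewUsersWorker,
}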