This commit is contained in:
Andre Medeiros 2021-09-25 09:19:42 -04:00
parent 2902720675
commit fc1065deb5
12 changed files with 48 additions and 61 deletions

View file

@ -87,6 +87,11 @@ func (a *api) upsertAccountsHandler(w http.ResponseWriter, r *http.Request) {
ac = a.reddit.NewAuthenticatedClient(acc.RefreshToken, acc.AccessToken) ac = a.reddit.NewAuthenticatedClient(acc.RefreshToken, acc.AccessToken)
me, err := ac.Me() me, err := ac.Me()
if err != nil {
a.errorResponse(w, r, 422, err.Error())
return
}
if me.NormalizedUsername() != acc.NormalizedUsername() { if me.NormalizedUsername() != acc.NormalizedUsername() {
a.errorResponse(w, r, 422, "nice try") a.errorResponse(w, r, 422, "nice try")
return return
@ -100,12 +105,12 @@ func (a *api) upsertAccountsHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
a.accountRepo.Associate(ctx, &acc, &dev) _ = a.accountRepo.Associate(ctx, &acc, &dev)
} }
for _, acc := range accsMap { for _, acc := range accsMap {
fmt.Println(acc.NormalizedUsername()) fmt.Println(acc.NormalizedUsername())
a.accountRepo.Disassociate(ctx, &acc, &dev) _ = a.accountRepo.Disassociate(ctx, &acc, &dev)
} }
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)

View file

@ -127,7 +127,7 @@ func (a *api) loggingMiddleware(next http.Handler) http.Handler {
next.ServeHTTP(lrw, r) next.ServeHTTP(lrw, r)
logEntry := a.logger.WithFields(logrus.Fields{ logEntry := a.logger.WithFields(logrus.Fields{
"duration": time.Now().Sub(start).Milliseconds(), "duration": time.Since(start).Milliseconds(),
"method": r.Method, "method": r.Method,
"remote#addr": r.RemoteAddr, "remote#addr": r.RemoteAddr,
"response#bytes": lrw.bytes, "response#bytes": lrw.bytes,

View file

@ -107,10 +107,10 @@ func (a *api) deleteDeviceHandler(w http.ResponseWriter, r *http.Request) {
} }
for _, acc := range accs { for _, acc := range accs {
a.accountRepo.Disassociate(ctx, &acc, &dev) _ = a.accountRepo.Disassociate(ctx, &acc, &dev)
} }
a.deviceRepo.Delete(ctx, vars["apns"]) _ = a.deviceRepo.Delete(ctx, vars["apns"])
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
} }

View file

@ -12,5 +12,5 @@ func (a *api) healthCheckHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(data) _ = json.NewEncoder(w).Encode(data)
} }

View file

@ -20,7 +20,7 @@ func (a *api) checkReceiptHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r) vars := mux.Vars(r)
apns := vars["apns"] apns := vars["apns"]
body, err := ioutil.ReadAll(r.Body) body, _ := ioutil.ReadAll(r.Body)
iapr, err := itunes.NewIAPResponse(string(body), true) iapr, err := itunes.NewIAPResponse(string(body), true)
if err != nil { if err != nil {
@ -46,18 +46,18 @@ func (a *api) checkReceiptHandler(w http.ResponseWriter, r *http.Request) {
} }
for _, acc := range accs { for _, acc := range accs {
a.accountRepo.Disassociate(ctx, &acc, &dev) _ = a.accountRepo.Disassociate(ctx, &acc, &dev)
} }
a.deviceRepo.Delete(ctx, apns) _ = a.deviceRepo.Delete(ctx, apns)
} else { } else {
dev.ActiveUntil = time.Now().Unix() + domain.DeviceActiveAfterReceitCheckDuration dev.ActiveUntil = time.Now().Unix() + domain.DeviceActiveAfterReceitCheckDuration
a.deviceRepo.Update(ctx, &dev) _ = a.deviceRepo.Update(ctx, &dev)
} }
} }
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
bb, _ := json.Marshal(iapr.VerificationInfo) bb, _ := json.Marshal(iapr.VerificationInfo)
w.Write(bb) _, _ = w.Write(bb)
} }

View file

@ -47,7 +47,7 @@ func APICmd(ctx context.Context) *cobra.Command {
api := api.NewAPI(ctx, logger, statsd, db) api := api.NewAPI(ctx, logger, statsd, db)
srv := api.Server(port) srv := api.Server(port)
go srv.ListenAndServe() go func() { _ = srv.ListenAndServe() }()
logger.WithFields(logrus.Fields{ logger.WithFields(logrus.Fields{
"port": port, "port": port,
@ -55,7 +55,7 @@ func APICmd(ctx context.Context) *cobra.Command {
<-ctx.Done() <-ctx.Done()
srv.Shutdown(ctx) _ = srv.Shutdown(ctx)
return nil return nil
}, },

View file

@ -6,25 +6,11 @@ import (
"runtime" "runtime"
"runtime/pprof" "runtime/pprof"
"github.com/DataDog/datadog-go/statsd"
"github.com/adjust/rmq/v4"
"github.com/go-redis/redis/v8"
_ "github.com/heroku/x/hmetrics/onload" _ "github.com/heroku/x/hmetrics/onload"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/joho/godotenv" "github.com/joho/godotenv"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
) )
type Command struct {
ctx context.Context
logger *logrus.Logger
statsd *statsd.Client
redis *redis.Client
jobs *rmq.Connection
db *pgxpool.Pool
}
func Execute(ctx context.Context) int { func Execute(ctx context.Context) int {
_ = godotenv.Load() _ = godotenv.Load()
@ -43,7 +29,7 @@ func Execute(ctx context.Context) int {
return perr return perr
} }
pprof.StartCPUProfile(f) _ = pprof.StartCPUProfile(f)
return nil return nil
}, },
PersistentPostRunE: func(cmd *cobra.Command, args []string) error { PersistentPostRunE: func(cmd *cobra.Command, args []string) error {

View file

@ -71,11 +71,11 @@ func SchedulerCmd(ctx context.Context) *cobra.Command {
} }
s := gocron.NewScheduler(time.UTC) s := gocron.NewScheduler(time.UTC)
s.Every(200).Milliseconds().SingletonMode().Do(func() { enqueueAccounts(ctx, logger, statsd, db, redis, luaSha, notifQueue) }) _, _ = s.Every(200).Milliseconds().SingletonMode().Do(func() { enqueueAccounts(ctx, logger, statsd, db, redis, luaSha, notifQueue) })
s.Every(1).Second().Do(func() { cleanQueues(ctx, logger, queue) }) _, _ = s.Every(1).Second().Do(func() { cleanQueues(ctx, logger, queue) })
s.Every(1).Minute().Do(func() { reportStats(ctx, logger, statsd, db, redis) }) _, _ = s.Every(1).Minute().Do(func() { reportStats(ctx, logger, statsd, db, redis) })
s.Every(1).Minute().Do(func() { pruneAccounts(ctx, logger, db) }) _, _ = s.Every(1).Minute().Do(func() { pruneAccounts(ctx, logger, db) })
s.Every(1).Minute().Do(func() { pruneDevices(ctx, logger, db) }) _, _ = s.Every(1).Minute().Do(func() { pruneDevices(ctx, logger, db) })
s.StartAsync() s.StartAsync()
<-ctx.Done() <-ctx.Done()
@ -185,8 +185,8 @@ func reportStats(ctx context.Context, logger *logrus.Logger, statsd *statsd.Clie
) )
for _, metric := range metrics { for _, metric := range metrics {
pool.QueryRow(ctx, metric.query).Scan(&count) _ = pool.QueryRow(ctx, metric.query).Scan(&count)
statsd.Gauge(metric.name, float64(count), []string{}, 1) _ = statsd.Gauge(metric.name, float64(count), []string{}, 1)
logger.WithFields(logrus.Fields{ logger.WithFields(logrus.Fields{
"count": count, "count": count,
@ -231,7 +231,7 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd.
defer rows.Close() defer rows.Close()
for rows.Next() { for rows.Next() {
var id int64 var id int64
rows.Scan(&id) _ = rows.Scan(&id)
ids = append(ids, id) ids = append(ids, id)
} }
return nil return nil
@ -292,9 +292,9 @@ func enqueueAccounts(ctx context.Context, logger *logrus.Logger, statsd *statsd.
} }
} }
statsd.Histogram("apollo.queue.enqueued", float64(enqueued), []string{}, 1) _ = statsd.Histogram("apollo.queue.enqueued", float64(enqueued), []string{}, 1)
statsd.Histogram("apollo.queue.skipped", float64(skipped), []string{}, 1) _ = statsd.Histogram("apollo.queue.skipped", float64(skipped), []string{}, 1)
statsd.Histogram("apollo.queue.runtime", float64(time.Now().Sub(start).Milliseconds()), []string{}, 1) _ = statsd.Histogram("apollo.queue.runtime", float64(time.Since(start).Milliseconds()), []string{}, 1)
logger.WithFields(logrus.Fields{ logger.WithFields(logrus.Fields{
"count": enqueued, "count": enqueued,

View file

@ -64,7 +64,9 @@ func WorkerCmd(ctx context.Context) *cobra.Command {
} }
worker := workerFn(logger, statsd, db, redis, queue, consumers) worker := workerFn(logger, statsd, db, redis, queue, consumers)
worker.Start() if err := worker.Start(); err != nil {
return err
}
<-ctx.Done() <-ctx.Done()

View file

@ -218,7 +218,7 @@ func NewIAPResponse(receipt string, production bool) (*IAPResponse, error) {
defer resp.Body.Close() defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode > 299 { if resp.StatusCode < 200 || resp.StatusCode > 299 {
fmt.Println(fmt.Sprintf("Weird HTTP status code from Apple: %d", resp.StatusCode)) fmt.Printf("Weird HTTP status code from Apple: %d\n", resp.StatusCode)
} }
decoder := json.NewDecoder(resp.Body) decoder := json.NewDecoder(resp.Body)
@ -336,7 +336,6 @@ func (iapr *IAPResponse) handleAppleResponse() {
if transaction.ExpiresDateMS > mostRecentTransactionTime { if transaction.ExpiresDateMS > mostRecentTransactionTime {
mostRecentTransactionIndex = index mostRecentTransactionIndex = index
mostRecentTransactionTime = transaction.ExpiresDateMS mostRecentTransactionTime = transaction.ExpiresDateMS
choseOne = true
} }
} }
} }

View file

@ -48,13 +48,13 @@ func NewClient(id, secret string, statsd statsd.ClientInterface, connLimit int)
tracer := &httptrace.ClientTrace{ tracer := &httptrace.ClientTrace{
GotConn: func(info httptrace.GotConnInfo) { GotConn: func(info httptrace.GotConnInfo) {
if info.Reused { if info.Reused {
statsd.Incr("reddit.api.connections.reused", []string{}, 0.1) _ = statsd.Incr("reddit.api.connections.reused", []string{}, 0.1)
if info.WasIdle { if info.WasIdle {
idleTime := float64(int64(info.IdleTime) / int64(time.Millisecond)) idleTime := float64(int64(info.IdleTime) / int64(time.Millisecond))
statsd.Histogram("reddit.api.connections.idle_time", idleTime, []string{}, 0.1) _ = statsd.Histogram("reddit.api.connections.idle_time", idleTime, []string{}, 0.1)
} }
} else { } else {
statsd.Incr("reddit.api.connections.created", []string{}, 0.1) _ = statsd.Incr("reddit.api.connections.created", []string{}, 0.1)
} }
}, },
} }
@ -102,11 +102,11 @@ func (rac *AuthenticatedClient) request(r *Request, rh ResponseHandler, empty in
start := time.Now() start := time.Now()
resp, err := rac.client.Do(req) resp, err := rac.client.Do(req)
rac.statsd.Incr("reddit.api.calls", r.tags, 0.1) _ = rac.statsd.Incr("reddit.api.calls", r.tags, 0.1)
rac.statsd.Histogram("reddit.api.latency", float64(time.Now().Sub(start).Milliseconds()), r.tags, 0.1) _ = rac.statsd.Histogram("reddit.api.latency", float64(time.Since(start).Milliseconds()), r.tags, 0.1)
if err != nil { if err != nil {
rac.statsd.Incr("reddit.api.errors", r.tags, 0.1) _ = rac.statsd.Incr("reddit.api.errors", r.tags, 0.1)
if strings.Contains(err.Error(), "http2: timeout awaiting response headers") { if strings.Contains(err.Error(), "http2: timeout awaiting response headers") {
return nil, ErrTimeout return nil, ErrTimeout
} }
@ -116,12 +116,12 @@ func (rac *AuthenticatedClient) request(r *Request, rh ResponseHandler, empty in
bb, err := ioutil.ReadAll(resp.Body) bb, err := ioutil.ReadAll(resp.Body)
if err != nil { if err != nil {
rac.statsd.Incr("reddit.api.errors", r.tags, 0.1) _ = rac.statsd.Incr("reddit.api.errors", r.tags, 0.1)
return nil, err return nil, err
} }
if resp.StatusCode != 200 { if resp.StatusCode != 200 {
rac.statsd.Incr("reddit.api.errors", r.tags, 0.1) _ = rac.statsd.Incr("reddit.api.errors", r.tags, 0.1)
// Try to parse a json error. Otherwise we generate a generic one // Try to parse a json error. Otherwise we generate a generic one
parser := rac.pool.Get() parser := rac.pool.Get()

View file

@ -154,11 +154,11 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
"err": err, "err": err,
}).Error("failed to parse account ID") }).Error("failed to parse account ID")
delivery.Reject() _ = delivery.Reject()
return return
} }
defer delivery.Ack() defer func() { _ = delivery.Ack() }()
now := float64(time.Now().UnixNano()/int64(time.Millisecond)) / 1000 now := float64(time.Now().UnixNano()/int64(time.Millisecond)) / 1000
@ -230,7 +230,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
// the numbers too much. // the numbers too much.
if !newAccount { if !newAccount {
latency := now - previousLastCheckedAt - float64(backoff) latency := now - previousLastCheckedAt - float64(backoff)
nc.statsd.Histogram("apollo.queue.delay", latency, []string{}, rate) _ = nc.statsd.Histogram("apollo.queue.delay", latency, []string{}, rate)
} }
nc.logger.WithFields(logrus.Fields{ nc.logger.WithFields(logrus.Fields{
@ -259,7 +259,6 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
nc.logger.WithFields(logrus.Fields{ nc.logger.WithFields(logrus.Fields{
"account#username": account.NormalizedUsername(), "account#username": account.NormalizedUsername(),
}).Info("removed revoked account") }).Info("removed revoked account")
break
default: default:
nc.logger.WithFields(logrus.Fields{ nc.logger.WithFields(logrus.Fields{
"account#username": account.NormalizedUsername(), "account#username": account.NormalizedUsername(),
@ -325,7 +324,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
res, err := client.Push(notification) res, err := client.Push(notification)
if err != nil { if err != nil {
nc.statsd.Incr("apns.notification.errors", []string{}, 1) _ = nc.statsd.Incr("apns.notification.errors", []string{}, 1)
nc.logger.WithFields(logrus.Fields{ nc.logger.WithFields(logrus.Fields{
"account#username": account.NormalizedUsername(), "account#username": account.NormalizedUsername(),
"err": err, "err": err,
@ -333,7 +332,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
"reason": res.Reason, "reason": res.Reason,
}).Error("failed to send notification") }).Error("failed to send notification")
} else { } else {
nc.statsd.Incr("apns.notification.sent", []string{}, 1) _ = nc.statsd.Incr("apns.notification.sent", []string{}, 1)
nc.logger.WithFields(logrus.Fields{ nc.logger.WithFields(logrus.Fields{
"account#username": account.NormalizedUsername(), "account#username": account.NormalizedUsername(),
"token": device.APNSToken, "token": device.APNSToken,
@ -343,7 +342,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
} }
ev := fmt.Sprintf("Sent notification to /u/%s (x%d)", account.Username, msgs.Count) ev := fmt.Sprintf("Sent notification to /u/%s (x%d)", account.Username, msgs.Count)
nc.statsd.SimpleEvent(ev, "") _ = nc.statsd.SimpleEvent(ev, "")
nc.logger.WithFields(logrus.Fields{ nc.logger.WithFields(logrus.Fields{
"account#username": account.NormalizedUsername(), "account#username": account.NormalizedUsername(),
@ -407,7 +406,6 @@ func payloadFromMessage(acct domain.Account, msg *reddit.Thing, badgeCount int)
} }
payload = payload.Custom("subject", "comment").ThreadID("comment") payload = payload.Custom("subject", "comment").ThreadID("comment")
break
case (msg.Kind == "t1" && msg.Type == "post_reply"): case (msg.Kind == "t1" && msg.Type == "post_reply"):
title := fmt.Sprintf(`%s to “%s”`, msg.Author, postTitle) title := fmt.Sprintf(`%s to “%s”`, msg.Author, postTitle)
payload = payload. payload = payload.
@ -417,7 +415,6 @@ func payloadFromMessage(acct domain.Account, msg *reddit.Thing, badgeCount int)
Custom("subject", "comment"). Custom("subject", "comment").
Custom("type", "post"). Custom("type", "post").
ThreadID("comment") ThreadID("comment")
break
case (msg.Kind == "t1" && msg.Type == "comment_reply"): case (msg.Kind == "t1" && msg.Type == "comment_reply"):
title := fmt.Sprintf(`%s in “%s”`, msg.Author, postTitle) title := fmt.Sprintf(`%s in “%s”`, msg.Author, postTitle)
postID := reddit.PostIDFromContext(msg.Context) postID := reddit.PostIDFromContext(msg.Context)
@ -429,7 +426,6 @@ func payloadFromMessage(acct domain.Account, msg *reddit.Thing, badgeCount int)
Custom("subject", "comment"). Custom("subject", "comment").
Custom("type", "comment"). Custom("type", "comment").
ThreadID("comment") ThreadID("comment")
break
case (msg.Kind == "t4"): case (msg.Kind == "t4"):
title := fmt.Sprintf(`Message from %s`, msg.Author) title := fmt.Sprintf(`Message from %s`, msg.Author)
payload = payload. payload = payload.
@ -437,7 +433,6 @@ func payloadFromMessage(acct domain.Account, msg *reddit.Thing, badgeCount int)
AlertSubtitle(postTitle). AlertSubtitle(postTitle).
Category("inbox-private-message"). Category("inbox-private-message").
Custom("type", "private-message") Custom("type", "private-message")
break
} }
return payload return payload