diff --git a/cmd/apollo-scheduler/main.go b/cmd/apollo-scheduler/main.go index 2480dbd..e5b2c7f 100644 --- a/cmd/apollo-scheduler/main.go +++ b/cmd/apollo-scheduler/main.go @@ -61,10 +61,12 @@ func main() { // Set up Redis connection var redisConn *redis.Client { - redisConn = redis.NewClient(&redis.Options{ - Addr: os.Getenv("REDIS_URL"), - }) + opt, err := redis.ParseURL(os.Getenv("REDISCLOUD_URL")) + if err != nil { + panic(err) + } + redisConn = redis.NewClient(opt) if err := redisConn.Ping(ctx).Err(); err != nil { panic(err) } diff --git a/cmd/apollo-worker-notifications/main.go b/cmd/apollo-worker-notifications/main.go index 1cc16a9..5d00892 100644 --- a/cmd/apollo-worker-notifications/main.go +++ b/cmd/apollo-worker-notifications/main.go @@ -6,6 +6,7 @@ import ( "log" "os" "os/signal" + "runtime" "strconv" "syscall" "time" @@ -27,6 +28,8 @@ import ( const ( pollDuration = 100 * time.Millisecond + backoff = 5 + rate = 0.1 ) func main() { @@ -98,9 +101,19 @@ func main() { log.Fatal("token error:", err) } - redisConn := redis.NewClient(&redis.Options{ - Addr: os.Getenv("REDIS_URL"), - }) + // Set up Redis connection + var redisConn *redis.Client + { + opt, err := redis.ParseURL(os.Getenv("REDISCLOUD_URL")) + if err != nil { + panic(err) + } + + redisConn = redis.NewClient(opt) + if err := redisConn.Ping(ctx).Err(); err != nil { + panic(err) + } + } connection, err := rmq.OpenConnectionWithRedisClient("consumer", redisConn, errChan) if err != nil { @@ -112,8 +125,7 @@ func main() { panic(err) } - //numConsumers := runtime.NumCPU() * 8 - numConsumers := 1 + numConsumers := runtime.NumCPU() * 8 prefetchLimit := int64(numConsumers * 8) if err := queue.StartConsuming(prefetchLimit, pollDuration); err != nil { @@ -214,11 +226,17 @@ func (c *Consumer) Consume(delivery rmq.Delivery) { return } + if account.LastCheckedAt > 0 { + latency := now - account.LastCheckedAt - float64(backoff) + c.statsd.Histogram("apollo.queue.delay", latency, []string{}, rate) + } + rac := c.reddit.NewAuthenticatedClient(account.RefreshToken, account.AccessToken) if account.ExpiresAt < int64(now) { c.logger.WithFields(logrus.Fields{ "accountID": id, }).Debug("refreshing reddit token") + tokens, err := rac.RefreshTokens() if err != nil { c.logger.WithFields(logrus.Fields{ @@ -360,6 +378,7 @@ func (c *Consumer) Consume(delivery rmq.Delivery) { res, err := client.Push(notification) if err != nil { + c.statsd.Incr("apns.notification.errors", []string{}, 0.1) c.logger.WithFields(logrus.Fields{ "accountID": id, "err": err, @@ -367,6 +386,7 @@ func (c *Consumer) Consume(delivery rmq.Delivery) { "reason": res.Reason, }).Error("failed to send notification") } else { + c.statsd.Incr("apns.notification.sent", []string{}, 0.1) c.logger.WithFields(logrus.Fields{ "accountID": delivery.Payload(), "token": device.APNSToken, diff --git a/internal/reddit/client.go b/internal/reddit/client.go index 8d8ee55..f6c1377 100644 --- a/internal/reddit/client.go +++ b/internal/reddit/client.go @@ -74,8 +74,13 @@ func (rac *AuthenticatedClient) request(r *Request) ([]byte, error) { req = req.WithContext(httptrace.WithClientTrace(req.Context(), rac.tracer)) + start := time.Now() resp, err := rac.client.Do(req) + rac.statsd.Incr("reddit.api.calls", r.tags, 0.1) + rac.statsd.Histogram("reddit.api.latency", float64(time.Now().Sub(start).Milliseconds()), r.tags, 0.1) + if err != nil { + rac.statsd.Incr("reddit.api.errors", r.tags, 0.1) return nil, err } defer resp.Body.Close() @@ -85,6 +90,7 @@ func (rac *AuthenticatedClient) request(r *Request) ([]byte, error) { func (rac *AuthenticatedClient) RefreshTokens() (*RefreshTokenResponse, error) { req := NewRequest( + WithTags([]string{"url:/api/v1/access_token"}), WithMethod("POST"), WithURL(tokenURL), WithBody("grant_type", "refresh_token"), @@ -105,6 +111,7 @@ func (rac *AuthenticatedClient) RefreshTokens() (*RefreshTokenResponse, error) { func (rac *AuthenticatedClient) MessageInbox(from string) (*MessageListingResponse, error) { req := NewRequest( + WithTags([]string{"/api/v1/message/inbox"}), WithMethod("GET"), WithToken(rac.accessToken), WithURL("https://oauth.reddit.com/message/inbox.json"), @@ -132,6 +139,7 @@ func (mr *MeResponse) NormalizedUsername() string { func (rac *AuthenticatedClient) Me() (*MeResponse, error) { req := NewRequest( + WithTags([]string{"url:/api/v1/me"}), WithMethod("GET"), WithToken(rac.accessToken), WithURL("https://oauth.reddit.com/api/v1/me"), diff --git a/internal/reddit/request.go b/internal/reddit/request.go index 5ed7ca1..e263fa3 100644 --- a/internal/reddit/request.go +++ b/internal/reddit/request.go @@ -17,12 +17,13 @@ type Request struct { token string url string auth string + tags []string } type RequestOption func(*Request) func NewRequest(opts ...RequestOption) *Request { - req := &Request{url.Values{}, url.Values{}, "GET", "", "", ""} + req := &Request{url.Values{}, url.Values{}, "GET", "", "", "", nil} for _, opt := range opts { opt(req) } @@ -47,6 +48,12 @@ func (r *Request) HTTPRequest() (*http.Request, error) { return req, err } +func WithTags(tags []string) RequestOption { + return func(req *Request) { + req.tags = tags + } +} + func WithMethod(method string) RequestOption { return func(req *Request) { req.method = method