stats where they belong yay

This commit is contained in:
Andre Medeiros 2021-07-08 19:26:15 -04:00
parent 8eeda61c38
commit dea42a40f6
4 changed files with 46 additions and 9 deletions

View file

@ -61,10 +61,12 @@ func main() {
// Set up Redis connection // Set up Redis connection
var redisConn *redis.Client var redisConn *redis.Client
{ {
redisConn = redis.NewClient(&redis.Options{ opt, err := redis.ParseURL(os.Getenv("REDISCLOUD_URL"))
Addr: os.Getenv("REDIS_URL"), if err != nil {
}) panic(err)
}
redisConn = redis.NewClient(opt)
if err := redisConn.Ping(ctx).Err(); err != nil { if err := redisConn.Ping(ctx).Err(); err != nil {
panic(err) panic(err)
} }

View file

@ -6,6 +6,7 @@ import (
"log" "log"
"os" "os"
"os/signal" "os/signal"
"runtime"
"strconv" "strconv"
"syscall" "syscall"
"time" "time"
@ -27,6 +28,8 @@ import (
const ( const (
pollDuration = 100 * time.Millisecond pollDuration = 100 * time.Millisecond
backoff = 5
rate = 0.1
) )
func main() { func main() {
@ -98,9 +101,19 @@ func main() {
log.Fatal("token error:", err) log.Fatal("token error:", err)
} }
redisConn := redis.NewClient(&redis.Options{ // Set up Redis connection
Addr: os.Getenv("REDIS_URL"), var redisConn *redis.Client
}) {
opt, err := redis.ParseURL(os.Getenv("REDISCLOUD_URL"))
if err != nil {
panic(err)
}
redisConn = redis.NewClient(opt)
if err := redisConn.Ping(ctx).Err(); err != nil {
panic(err)
}
}
connection, err := rmq.OpenConnectionWithRedisClient("consumer", redisConn, errChan) connection, err := rmq.OpenConnectionWithRedisClient("consumer", redisConn, errChan)
if err != nil { if err != nil {
@ -112,8 +125,7 @@ func main() {
panic(err) panic(err)
} }
//numConsumers := runtime.NumCPU() * 8 numConsumers := runtime.NumCPU() * 8
numConsumers := 1
prefetchLimit := int64(numConsumers * 8) prefetchLimit := int64(numConsumers * 8)
if err := queue.StartConsuming(prefetchLimit, pollDuration); err != nil { if err := queue.StartConsuming(prefetchLimit, pollDuration); err != nil {
@ -214,11 +226,17 @@ func (c *Consumer) Consume(delivery rmq.Delivery) {
return return
} }
if account.LastCheckedAt > 0 {
latency := now - account.LastCheckedAt - float64(backoff)
c.statsd.Histogram("apollo.queue.delay", latency, []string{}, rate)
}
rac := c.reddit.NewAuthenticatedClient(account.RefreshToken, account.AccessToken) rac := c.reddit.NewAuthenticatedClient(account.RefreshToken, account.AccessToken)
if account.ExpiresAt < int64(now) { if account.ExpiresAt < int64(now) {
c.logger.WithFields(logrus.Fields{ c.logger.WithFields(logrus.Fields{
"accountID": id, "accountID": id,
}).Debug("refreshing reddit token") }).Debug("refreshing reddit token")
tokens, err := rac.RefreshTokens() tokens, err := rac.RefreshTokens()
if err != nil { if err != nil {
c.logger.WithFields(logrus.Fields{ c.logger.WithFields(logrus.Fields{
@ -360,6 +378,7 @@ func (c *Consumer) Consume(delivery rmq.Delivery) {
res, err := client.Push(notification) res, err := client.Push(notification)
if err != nil { if err != nil {
c.statsd.Incr("apns.notification.errors", []string{}, 0.1)
c.logger.WithFields(logrus.Fields{ c.logger.WithFields(logrus.Fields{
"accountID": id, "accountID": id,
"err": err, "err": err,
@ -367,6 +386,7 @@ func (c *Consumer) Consume(delivery rmq.Delivery) {
"reason": res.Reason, "reason": res.Reason,
}).Error("failed to send notification") }).Error("failed to send notification")
} else { } else {
c.statsd.Incr("apns.notification.sent", []string{}, 0.1)
c.logger.WithFields(logrus.Fields{ c.logger.WithFields(logrus.Fields{
"accountID": delivery.Payload(), "accountID": delivery.Payload(),
"token": device.APNSToken, "token": device.APNSToken,

View file

@ -74,8 +74,13 @@ func (rac *AuthenticatedClient) request(r *Request) ([]byte, error) {
req = req.WithContext(httptrace.WithClientTrace(req.Context(), rac.tracer)) req = req.WithContext(httptrace.WithClientTrace(req.Context(), rac.tracer))
start := time.Now()
resp, err := rac.client.Do(req) resp, err := rac.client.Do(req)
rac.statsd.Incr("reddit.api.calls", r.tags, 0.1)
rac.statsd.Histogram("reddit.api.latency", float64(time.Now().Sub(start).Milliseconds()), r.tags, 0.1)
if err != nil { if err != nil {
rac.statsd.Incr("reddit.api.errors", r.tags, 0.1)
return nil, err return nil, err
} }
defer resp.Body.Close() defer resp.Body.Close()
@ -85,6 +90,7 @@ func (rac *AuthenticatedClient) request(r *Request) ([]byte, error) {
func (rac *AuthenticatedClient) RefreshTokens() (*RefreshTokenResponse, error) { func (rac *AuthenticatedClient) RefreshTokens() (*RefreshTokenResponse, error) {
req := NewRequest( req := NewRequest(
WithTags([]string{"url:/api/v1/access_token"}),
WithMethod("POST"), WithMethod("POST"),
WithURL(tokenURL), WithURL(tokenURL),
WithBody("grant_type", "refresh_token"), WithBody("grant_type", "refresh_token"),
@ -105,6 +111,7 @@ func (rac *AuthenticatedClient) RefreshTokens() (*RefreshTokenResponse, error) {
func (rac *AuthenticatedClient) MessageInbox(from string) (*MessageListingResponse, error) { func (rac *AuthenticatedClient) MessageInbox(from string) (*MessageListingResponse, error) {
req := NewRequest( req := NewRequest(
WithTags([]string{"/api/v1/message/inbox"}),
WithMethod("GET"), WithMethod("GET"),
WithToken(rac.accessToken), WithToken(rac.accessToken),
WithURL("https://oauth.reddit.com/message/inbox.json"), WithURL("https://oauth.reddit.com/message/inbox.json"),
@ -132,6 +139,7 @@ func (mr *MeResponse) NormalizedUsername() string {
func (rac *AuthenticatedClient) Me() (*MeResponse, error) { func (rac *AuthenticatedClient) Me() (*MeResponse, error) {
req := NewRequest( req := NewRequest(
WithTags([]string{"url:/api/v1/me"}),
WithMethod("GET"), WithMethod("GET"),
WithToken(rac.accessToken), WithToken(rac.accessToken),
WithURL("https://oauth.reddit.com/api/v1/me"), WithURL("https://oauth.reddit.com/api/v1/me"),

View file

@ -17,12 +17,13 @@ type Request struct {
token string token string
url string url string
auth string auth string
tags []string
} }
type RequestOption func(*Request) type RequestOption func(*Request)
func NewRequest(opts ...RequestOption) *Request { func NewRequest(opts ...RequestOption) *Request {
req := &Request{url.Values{}, url.Values{}, "GET", "", "", ""} req := &Request{url.Values{}, url.Values{}, "GET", "", "", "", nil}
for _, opt := range opts { for _, opt := range opts {
opt(req) opt(req)
} }
@ -47,6 +48,12 @@ func (r *Request) HTTPRequest() (*http.Request, error) {
return req, err return req, err
} }
func WithTags(tags []string) RequestOption {
return func(req *Request) {
req.tags = tags
}
}
func WithMethod(method string) RequestOption { func WithMethod(method string) RequestOption {
return func(req *Request) { return func(req *Request) {
req.method = method req.method = method