apollo-backend/cmd/apollo-worker/main.go

222 lines
5.9 KiB
Go
Raw Normal View History

2021-05-10 00:51:15 +00:00
package main
import (
"database/sql"
2021-07-06 01:12:09 +00:00
"fmt"
2021-05-10 00:51:15 +00:00
"log"
"os"
2021-06-24 02:19:43 +00:00
"os/signal"
"runtime"
"syscall"
"time"
2021-05-10 00:51:15 +00:00
2021-07-06 02:09:04 +00:00
"github.com/DataDog/datadog-go/statsd"
2021-07-08 01:44:52 +00:00
_ "github.com/heroku/x/hmetrics/onload"
2021-05-10 00:51:15 +00:00
"github.com/joho/godotenv"
2021-06-24 02:19:43 +00:00
_ "github.com/lib/pq"
"github.com/sideshow/apns2"
"github.com/sideshow/apns2/payload"
"github.com/sideshow/apns2/token"
2021-05-10 00:51:15 +00:00
2021-07-05 23:22:24 +00:00
"github.com/christianselig/apollo-backend/internal/data"
"github.com/christianselig/apollo-backend/internal/reddit"
2021-05-10 00:51:15 +00:00
)
type application struct {
logger *log.Logger
db *sql.DB
models *data.Models
client *reddit.Client
}
2021-07-07 22:09:29 +00:00
var (
workers int = runtime.NumCPU() * 6
rate float64 = 0.1
2021-07-07 22:37:10 +00:00
backoff int = 5
2021-07-07 22:09:29 +00:00
)
2021-06-24 02:19:43 +00:00
2021-07-06 02:46:56 +00:00
func accountWorker(id int, rc *reddit.Client, db *sql.DB, logger *log.Logger, statsd *statsd.Client, quit chan bool) {
2021-07-06 01:30:42 +00:00
authKey, err := token.AuthKeyFromBytes([]byte(os.Getenv("APPLE_KEY_PKEY")))
2021-06-24 02:19:43 +00:00
token := &token.Token{
AuthKey: authKey,
2021-07-06 01:30:42 +00:00
KeyID: os.Getenv("APPLE_KEY_ID"),
TeamID: os.Getenv("APPLE_TEAM_ID"),
2021-06-24 02:19:43 +00:00
}
if err != nil {
log.Fatal("token error:", err)
}
2021-07-07 15:37:30 +00:00
sandboxClient := apns2.NewTokenClient(token)
productionClient := apns2.NewTokenClient(token).Production()
2021-06-24 02:19:43 +00:00
for {
select {
case <-quit:
return
default:
2021-07-07 22:37:10 +00:00
now := float64(time.Now().UTC().UnixNano()/int64(time.Millisecond)) / 1000
2021-06-24 02:19:43 +00:00
tx, err := db.Begin()
if err != nil {
log.Fatal(err)
continue
}
query := `
2021-07-07 19:19:11 +00:00
SELECT id, username, access_token, refresh_token, expires_at, last_message_id, last_checked_at FROM accounts
2021-07-07 22:47:59 +00:00
WHERE last_checked_at + 5 <= $1::float
2021-06-24 02:19:43 +00:00
ORDER BY last_checked_at
LIMIT 1
FOR UPDATE SKIP LOCKED`
args := []interface{}{now}
2021-06-24 02:19:43 +00:00
account := &data.Account{}
2021-07-07 19:19:11 +00:00
err = tx.QueryRow(query, args...).Scan(&account.ID, &account.Username, &account.AccessToken, &account.RefreshToken, &account.ExpiresAt, &account.LastMessageID, &account.LastCheckedAt)
2021-06-24 02:19:43 +00:00
if account.ID == 0 {
tx.Commit()
time.Sleep(100 * time.Millisecond)
2021-06-24 02:19:43 +00:00
continue
}
2021-07-07 22:09:29 +00:00
if account.LastCheckedAt > 0 {
2021-07-07 22:37:10 +00:00
latency := now - account.LastCheckedAt - float64(backoff)
2021-07-07 22:44:14 +00:00
statsd.Histogram("apollo.queue.delay", latency, []string{}, rate)
2021-07-07 22:09:29 +00:00
}
2021-06-24 02:19:43 +00:00
_, err = tx.Exec(`UPDATE accounts SET last_checked_at = $1 WHERE id = $2`, now, account.ID)
rac := rc.NewAuthenticatedClient(account.RefreshToken, account.AccessToken)
2021-07-07 22:37:10 +00:00
if account.ExpiresAt < int64(now) {
2021-06-24 02:19:43 +00:00
tokens, _ := rac.RefreshTokens()
tx.Exec(`UPDATE accounts SET access_token = $1, refresh_token = $2, expires_at = $3 WHERE id = $4`,
2021-07-07 22:46:12 +00:00
tokens.AccessToken, tokens.RefreshToken, int64(now+3500), account.ID)
2021-06-24 02:19:43 +00:00
}
2021-07-06 02:09:04 +00:00
t1 := time.Now()
2021-06-24 02:19:43 +00:00
msgs, err := rac.MessageInbox(account.LastMessageID)
2021-07-06 02:09:04 +00:00
t2 := time.Now()
2021-07-06 02:18:22 +00:00
statsd.Histogram("reddit.api.latency", float64(t2.Sub(t1).Milliseconds()), []string{}, rate)
2021-07-06 02:09:04 +00:00
2021-06-24 02:19:43 +00:00
if err != nil {
log.Fatal(err)
}
if len(msgs.MessageListing.Messages) == 0 {
tx.Commit()
continue
}
// Set latest message we alerted on
latestMsg := msgs.MessageListing.Messages[0]
2021-07-07 22:37:10 +00:00
latency := now - latestMsg.CreatedAt
2021-07-07 21:15:44 +00:00
statsd.Histogram("apollo.notification.latency", latency, []string{}, rate)
2021-06-24 02:19:43 +00:00
_, err = tx.Exec(`UPDATE accounts SET last_message_id = $1 WHERE id = $2`, latestMsg.FullName(), account.ID)
if err != nil {
log.Fatal(err)
}
2021-07-06 01:55:01 +00:00
// If no latest message recorded, we're not going to notify on every message. Remember that and move on.
if account.LastMessageID == "" {
tx.Commit()
continue
}
2021-07-06 01:39:09 +00:00
query = `
2021-07-07 15:37:30 +00:00
SELECT apns_token, sandbox FROM devices
2021-07-06 01:39:09 +00:00
LEFT JOIN devices_accounts ON devices.id = devices_accounts.device_id
WHERE devices_accounts.account_id = $1`
rows, err := tx.Query(query, account.ID)
if err != nil {
logger.Fatal(err)
}
2021-07-06 02:46:56 +00:00
defer rows.Close()
2021-07-07 15:37:30 +00:00
devices := []data.Device{}
2021-07-06 01:39:09 +00:00
for rows.Next() {
2021-07-07 15:37:30 +00:00
device := data.Device{}
rows.Scan(&device.APNSToken, &device.Sandbox)
2021-07-06 01:39:09 +00:00
devices = append(devices, device)
}
2021-06-24 02:19:43 +00:00
for _, msg := range msgs.MessageListing.Messages {
2021-07-06 01:39:09 +00:00
for _, device := range devices {
notification := &apns2.Notification{}
2021-07-07 15:37:30 +00:00
notification.DeviceToken = device.APNSToken
2021-07-06 01:39:09 +00:00
notification.Topic = "com.christianselig.Apollo"
notification.Payload = payload.NewPayload().AlertTitle(msg.Subject).AlertBody(msg.Body)
2021-07-07 15:37:30 +00:00
client := productionClient
if device.Sandbox {
client = sandboxClient
}
2021-07-06 02:09:04 +00:00
t1 := time.Now()
2021-07-06 01:39:09 +00:00
res, err := client.Push(notification)
2021-07-06 02:09:04 +00:00
t2 := time.Now()
statsd.Histogram("apns.notification.latency", float64(t2.Sub(t1).Milliseconds()), []string{}, float64(1))
2021-07-06 01:39:09 +00:00
if err != nil {
statsd.Incr("apns.notification.errors", []string{}, float64(1))
logger.Printf("apns error account=%s token=%s err=%s status=%d reason=%q", account.Username, device.APNSToken, err, res.StatusCode, res.Reason)
2021-07-06 01:39:09 +00:00
} else {
statsd.Incr("apns.notification.sent", []string{}, float64(1))
2021-07-07 19:19:11 +00:00
logger.Printf("apns success account=%s token=%s", account.Username, device.APNSToken)
2021-07-06 01:39:09 +00:00
}
2021-07-06 01:19:29 +00:00
}
2021-06-24 02:19:43 +00:00
}
tx.Commit()
}
}
}
2021-05-10 00:51:15 +00:00
func main() {
logger := log.New(os.Stdout, "", log.Ldate|log.Ltime)
2021-07-06 00:20:00 +00:00
if err := godotenv.Load(); err != nil {
logger.Printf("Couldn't find .env so I will read from existing ENV.")
}
2021-06-24 02:19:43 +00:00
rc := reddit.NewClient(os.Getenv("REDDIT_CLIENT_ID"), os.Getenv("REDDIT_CLIENT_SECRET"))
2021-07-06 00:37:47 +00:00
dburl, ok := os.LookupEnv("DATABASE_CONNECTION_POOL_URL")
if !ok {
dburl = os.Getenv("DATABASE_URL")
}
2021-07-06 01:12:09 +00:00
db, err := sql.Open("postgres", fmt.Sprintf("%s?binary_parameters=yes", dburl))
2021-05-10 00:51:15 +00:00
if err != nil {
log.Fatal(err)
}
defer db.Close()
2021-07-05 23:37:18 +00:00
logger.Printf("Starting with %d workers.", workers)
2021-06-24 02:19:43 +00:00
db.SetMaxOpenConns(workers)
2021-05-10 00:51:15 +00:00
2021-07-06 02:46:56 +00:00
statsd, err := statsd.New("127.0.0.1:8125")
if err != nil {
log.Fatal(err)
}
2021-06-24 02:19:43 +00:00
// This is a very conservative value -- seen as most of the work that is done in these jobs is
//
runtime.GOMAXPROCS(workers)
quitCh := make(chan bool, workers)
for i := 0; i < workers; i++ {
2021-07-06 02:46:56 +00:00
go accountWorker(i, rc, db, logger, statsd, quitCh)
2021-05-10 00:51:15 +00:00
}
2021-06-24 02:19:43 +00:00
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
<-sigs
for i := 0; i < workers; i++ {
quitCh <- true
}
os.Exit(0)
2021-05-10 00:51:15 +00:00
}