2021-05-10 00:51:15 +00:00
|
|
|
package main
|
|
|
|
|
|
|
|
import (
|
|
|
|
"database/sql"
|
2021-07-06 01:12:09 +00:00
|
|
|
"fmt"
|
2021-05-10 00:51:15 +00:00
|
|
|
"log"
|
|
|
|
"os"
|
2021-06-24 02:19:43 +00:00
|
|
|
"os/signal"
|
|
|
|
"runtime"
|
|
|
|
"syscall"
|
|
|
|
"time"
|
2021-05-10 00:51:15 +00:00
|
|
|
|
2021-07-06 02:09:04 +00:00
|
|
|
"github.com/DataDog/datadog-go/statsd"
|
2021-05-10 00:51:15 +00:00
|
|
|
"github.com/joho/godotenv"
|
2021-06-24 02:19:43 +00:00
|
|
|
_ "github.com/lib/pq"
|
|
|
|
"github.com/sideshow/apns2"
|
|
|
|
"github.com/sideshow/apns2/payload"
|
|
|
|
"github.com/sideshow/apns2/token"
|
2021-05-10 00:51:15 +00:00
|
|
|
|
2021-07-05 23:22:24 +00:00
|
|
|
"github.com/christianselig/apollo-backend/internal/data"
|
|
|
|
"github.com/christianselig/apollo-backend/internal/reddit"
|
2021-05-10 00:51:15 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
type application struct {
|
|
|
|
logger *log.Logger
|
|
|
|
db *sql.DB
|
|
|
|
models *data.Models
|
|
|
|
client *reddit.Client
|
|
|
|
}
|
|
|
|
|
2021-07-06 00:43:59 +00:00
|
|
|
var workers int = runtime.NumCPU() * 8
|
2021-06-24 02:19:43 +00:00
|
|
|
|
|
|
|
func accountWorker(id int, rc *reddit.Client, db *sql.DB, logger *log.Logger, quit chan bool) {
|
2021-07-06 01:30:42 +00:00
|
|
|
authKey, err := token.AuthKeyFromBytes([]byte(os.Getenv("APPLE_KEY_PKEY")))
|
2021-06-24 02:19:43 +00:00
|
|
|
token := &token.Token{
|
|
|
|
AuthKey: authKey,
|
2021-07-06 01:30:42 +00:00
|
|
|
KeyID: os.Getenv("APPLE_KEY_ID"),
|
|
|
|
TeamID: os.Getenv("APPLE_TEAM_ID"),
|
2021-06-24 02:19:43 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
log.Fatal("token error:", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
client := apns2.NewTokenClient(token)
|
|
|
|
|
2021-07-06 02:09:04 +00:00
|
|
|
statsd, err := statsd.New("127.0.0.1:8125")
|
|
|
|
if err != nil {
|
|
|
|
log.Fatal(err)
|
|
|
|
}
|
|
|
|
|
2021-06-24 02:19:43 +00:00
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-quit:
|
|
|
|
return
|
|
|
|
default:
|
|
|
|
now := time.Now().UTC().Unix()
|
|
|
|
|
|
|
|
tx, err := db.Begin()
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
log.Fatal(err)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
query := `
|
2021-07-06 02:13:39 +00:00
|
|
|
SELECT id, access_token, refresh_token, expires_at, last_message_id, last_checked_at FROM accounts
|
2021-06-24 02:19:43 +00:00
|
|
|
WHERE last_checked_at <= $1 - 5
|
|
|
|
ORDER BY last_checked_at
|
|
|
|
LIMIT 1
|
|
|
|
FOR UPDATE SKIP LOCKED`
|
|
|
|
args := []interface{}{now}
|
|
|
|
|
|
|
|
account := &data.Account{}
|
2021-07-06 02:13:39 +00:00
|
|
|
err = tx.QueryRow(query, args...).Scan(&account.ID, &account.AccessToken, &account.RefreshToken, &account.ExpiresAt, &account.LastMessageID, &account.LastCheckedAt)
|
2021-06-24 02:19:43 +00:00
|
|
|
|
|
|
|
if account.ID == 0 {
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
tx.Commit()
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2021-07-06 02:13:39 +00:00
|
|
|
statsd.Histogram("apollo.notification.latency", (now - 5 - account.LastCheckedAt))
|
2021-06-24 02:19:43 +00:00
|
|
|
logger.Printf("Worker #%d, account %d", id, account.ID)
|
|
|
|
|
|
|
|
_, err = tx.Exec(`UPDATE accounts SET last_checked_at = $1 WHERE id = $2`, now, account.ID)
|
|
|
|
|
|
|
|
rac := rc.NewAuthenticatedClient(account.RefreshToken, account.AccessToken)
|
|
|
|
if account.ExpiresAt < now {
|
|
|
|
tokens, _ := rac.RefreshTokens()
|
|
|
|
tx.Exec(`UPDATE accounts SET access_token = $1, refresh_token = $2, expires_at = $3 WHERE id = $4`,
|
|
|
|
tokens.AccessToken, tokens.RefreshToken, now+3500, account.ID)
|
|
|
|
}
|
|
|
|
|
2021-07-06 02:09:04 +00:00
|
|
|
t1 := time.Now()
|
2021-06-24 02:19:43 +00:00
|
|
|
msgs, err := rac.MessageInbox(account.LastMessageID)
|
2021-07-06 02:09:04 +00:00
|
|
|
t2 := time.Now()
|
|
|
|
statsd.Histogram("reddit.api.latency", t2.Sub(t1).Milliseconds())
|
|
|
|
|
2021-06-24 02:19:43 +00:00
|
|
|
if err != nil {
|
|
|
|
log.Fatal(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
if len(msgs.MessageListing.Messages) == 0 {
|
|
|
|
tx.Commit()
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
// Set latest message we alerted on
|
|
|
|
latestMsg := msgs.MessageListing.Messages[0]
|
|
|
|
|
|
|
|
_, err = tx.Exec(`UPDATE accounts SET last_message_id = $1 WHERE id = $2`, latestMsg.FullName(), account.ID)
|
|
|
|
if err != nil {
|
|
|
|
log.Fatal(err)
|
|
|
|
}
|
|
|
|
|
2021-07-06 01:55:01 +00:00
|
|
|
// If no latest message recorded, we're not going to notify on every message. Remember that and move on.
|
|
|
|
if account.LastMessageID == "" {
|
|
|
|
tx.Commit()
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2021-07-06 01:39:09 +00:00
|
|
|
devices := []string{}
|
|
|
|
query = `
|
|
|
|
SELECT apns_token FROM devices
|
|
|
|
LEFT JOIN devices_accounts ON devices.id = devices_accounts.device_id
|
|
|
|
WHERE devices_accounts.account_id = $1`
|
|
|
|
|
|
|
|
rows, err := tx.Query(query, account.ID)
|
|
|
|
if err != nil {
|
|
|
|
logger.Fatal(err)
|
|
|
|
}
|
|
|
|
for rows.Next() {
|
|
|
|
var device string
|
|
|
|
rows.Scan(&device)
|
|
|
|
devices = append(devices, device)
|
|
|
|
}
|
|
|
|
rows.Close()
|
|
|
|
|
2021-06-24 02:19:43 +00:00
|
|
|
for _, msg := range msgs.MessageListing.Messages {
|
2021-07-06 01:39:09 +00:00
|
|
|
for _, device := range devices {
|
|
|
|
notification := &apns2.Notification{}
|
|
|
|
notification.DeviceToken = device
|
|
|
|
notification.Topic = "com.christianselig.Apollo"
|
|
|
|
notification.Payload = payload.NewPayload().AlertTitle(msg.Subject).AlertBody(msg.Body)
|
2021-07-06 02:09:04 +00:00
|
|
|
t1 := time.Now()
|
2021-07-06 01:39:09 +00:00
|
|
|
res, err := client.Push(notification)
|
2021-07-06 02:09:04 +00:00
|
|
|
t2 := time.Now()
|
|
|
|
statsd.Histogram("apns.notification.latency", t2.Sub(t1).Milliseconds())
|
2021-07-06 01:39:09 +00:00
|
|
|
if err != nil {
|
2021-07-06 02:09:04 +00:00
|
|
|
statsd.Incr("apns.notification.errors", 1)
|
2021-07-06 01:42:22 +00:00
|
|
|
logger.Printf("Error sending push to %s: %s", device, err)
|
2021-07-06 01:39:09 +00:00
|
|
|
} else {
|
2021-07-06 02:09:04 +00:00
|
|
|
statsd.Incr("apns.notification.sent", 1)
|
2021-07-06 01:39:09 +00:00
|
|
|
logger.Printf("Push response: %v %v %v\n", res.StatusCode, res.ApnsID, res.Reason)
|
|
|
|
}
|
2021-07-06 01:19:29 +00:00
|
|
|
}
|
2021-06-24 02:19:43 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
tx.Commit()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-05-10 00:51:15 +00:00
|
|
|
func main() {
|
|
|
|
logger := log.New(os.Stdout, "", log.Ldate|log.Ltime)
|
|
|
|
|
2021-07-06 00:20:00 +00:00
|
|
|
if err := godotenv.Load(); err != nil {
|
|
|
|
logger.Printf("Couldn't find .env so I will read from existing ENV.")
|
|
|
|
}
|
|
|
|
|
2021-06-24 02:19:43 +00:00
|
|
|
rc := reddit.NewClient(os.Getenv("REDDIT_CLIENT_ID"), os.Getenv("REDDIT_CLIENT_SECRET"))
|
|
|
|
|
2021-07-06 00:37:47 +00:00
|
|
|
dburl, ok := os.LookupEnv("DATABASE_CONNECTION_POOL_URL")
|
|
|
|
if !ok {
|
|
|
|
dburl = os.Getenv("DATABASE_URL")
|
|
|
|
}
|
|
|
|
|
2021-07-06 01:12:09 +00:00
|
|
|
db, err := sql.Open("postgres", fmt.Sprintf("%s?binary_parameters=yes", dburl))
|
2021-05-10 00:51:15 +00:00
|
|
|
if err != nil {
|
|
|
|
log.Fatal(err)
|
|
|
|
}
|
|
|
|
defer db.Close()
|
|
|
|
|
2021-07-05 23:37:18 +00:00
|
|
|
logger.Printf("Starting with %d workers.", workers)
|
|
|
|
|
2021-06-24 02:19:43 +00:00
|
|
|
db.SetMaxOpenConns(workers)
|
2021-05-10 00:51:15 +00:00
|
|
|
|
2021-06-24 02:19:43 +00:00
|
|
|
// This is a very conservative value -- seen as most of the work that is done in these jobs is
|
|
|
|
//
|
|
|
|
runtime.GOMAXPROCS(workers)
|
|
|
|
quitCh := make(chan bool, workers)
|
|
|
|
for i := 0; i < workers; i++ {
|
|
|
|
go accountWorker(i, rc, db, logger, quitCh)
|
2021-05-10 00:51:15 +00:00
|
|
|
}
|
|
|
|
|
2021-06-24 02:19:43 +00:00
|
|
|
sigs := make(chan os.Signal, 1)
|
|
|
|
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
|
|
|
|
|
|
|
|
<-sigs
|
|
|
|
|
|
|
|
for i := 0; i < workers; i++ {
|
|
|
|
quitCh <- true
|
|
|
|
}
|
|
|
|
os.Exit(0)
|
2021-05-10 00:51:15 +00:00
|
|
|
}
|