From 3b9da79e6ee5680d55ae41339eb4690c51d6c7c6 Mon Sep 17 00:00:00 2001 From: Andre Medeiros Date: Sun, 13 Nov 2022 09:08:55 -0500 Subject: [PATCH] [render skip] add development in favour of sandbox --- internal/domain/account.go | 1 + internal/domain/live_activity.go | 6 +++--- internal/repository/postgres_account.go | 21 ++++++++++++------- internal/repository/postgres_live_activity.go | 10 ++++----- internal/worker/live_activities.go | 17 ++++++++++----- internal/worker/notifications.go | 13 +++++++++--- 6 files changed, 44 insertions(+), 24 deletions(-) diff --git a/internal/domain/account.go b/internal/domain/account.go index f32bf9c..7630f0f 100644 --- a/internal/domain/account.go +++ b/internal/domain/account.go @@ -25,6 +25,7 @@ type Account struct { AccessToken string RefreshToken string TokenExpiresAt time.Time + Development bool // Tracking how far behind we are LastMessageID string diff --git a/internal/domain/live_activity.go b/internal/domain/live_activity.go index 4a9bd50..dd67f28 100644 --- a/internal/domain/live_activity.go +++ b/internal/domain/live_activity.go @@ -11,9 +11,9 @@ const ( ) type LiveActivity struct { - ID int64 - APNSToken string `json:"apns_token"` - Sandbox bool `json:"sandbox"` + ID int64 + APNSToken string `json:"apns_token"` + Development bool `json:"development"` RedditAccountID string `json:"reddit_account_id"` AccessToken string `json:"access_token"` diff --git a/internal/repository/postgres_account.go b/internal/repository/postgres_account.go index fcd3e9d..33c02f8 100644 --- a/internal/repository/postgres_account.go +++ b/internal/repository/postgres_account.go @@ -48,6 +48,7 @@ func (p *postgresAccountRepository) fetch(ctx context.Context, query string, arg &acc.NextNotificationCheckAt, &acc.NextStuckNotificationCheckAt, &acc.CheckCount, + &acc.Development, ); err != nil { return nil, err } @@ -60,7 +61,7 @@ func (p *postgresAccountRepository) GetByID(ctx context.Context, id int64) (doma query := ` SELECT id, username, reddit_account_id, access_token, refresh_token, token_expires_at, last_message_id, next_notification_check_at, next_stuck_notification_check_at, - check_count + check_count, development FROM accounts WHERE id = $1 AND is_deleted IS FALSE` @@ -79,7 +80,7 @@ func (p *postgresAccountRepository) GetByRedditID(ctx context.Context, id string query := ` SELECT id, username, reddit_account_id, access_token, refresh_token, token_expires_at, last_message_id, next_notification_check_at, next_stuck_notification_check_at, - check_count + check_count, development FROM accounts WHERE reddit_account_id = $1 AND is_deleted IS FALSE` @@ -97,8 +98,8 @@ func (p *postgresAccountRepository) GetByRedditID(ctx context.Context, id string func (p *postgresAccountRepository) CreateOrUpdate(ctx context.Context, acc *domain.Account) error { query := ` INSERT INTO accounts (username, reddit_account_id, access_token, refresh_token, token_expires_at, - last_message_id, next_notification_check_at, next_stuck_notification_check_at, is_deleted) - VALUES ($1, $2, $3, $4, $5, $6, NOW(), NOW(), FALSE) + last_message_id, next_notification_check_at, next_stuck_notification_check_at, is_deleted, development) + VALUES ($1, $2, $3, $4, $5, $6, NOW(), NOW(), FALSE, $7) ON CONFLICT(username) DO UPDATE SET access_token = $3, refresh_token = $4, @@ -119,6 +120,7 @@ func (p *postgresAccountRepository) CreateOrUpdate(ctx context.Context, acc *dom acc.RefreshToken, acc.TokenExpiresAt, acc.LastMessageID, + acc.Development, ).Scan(&acc.ID); err != nil { span.SetStatus(codes.Error, "failed upserting account") span.RecordError(err) @@ -132,8 +134,8 @@ func (p *postgresAccountRepository) Create(ctx context.Context, acc *domain.Acco query := ` INSERT INTO accounts (username, reddit_account_id, access_token, refresh_token, token_expires_at, - last_message_id, next_notification_check_at, next_stuck_notification_check_at, is_deleted) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, FALSE) + last_message_id, next_notification_check_at, next_stuck_notification_check_at, is_deleted, development) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, FALSE, $9) RETURNING id` ctx, span := spanWithQuery(ctx, p.tracer, query) @@ -150,6 +152,7 @@ func (p *postgresAccountRepository) Create(ctx context.Context, acc *domain.Acco acc.LastMessageID, acc.NextNotificationCheckAt, acc.NextStuckNotificationCheckAt, + acc.Development, ).Scan(&acc.ID); err != nil { span.SetStatus(codes.Error, "failed inserting account") span.RecordError(err) @@ -170,7 +173,8 @@ func (p *postgresAccountRepository) Update(ctx context.Context, acc *domain.Acco last_message_id = $7, next_notification_check_at = $8, next_stuck_notification_check_at = $9, - check_count = $10 + check_count = $10, + development = $11 WHERE id = $1` ctx, span := spanWithQuery(ctx, p.tracer, query) @@ -189,6 +193,7 @@ func (p *postgresAccountRepository) Update(ctx context.Context, acc *domain.Acco acc.NextNotificationCheckAt, acc.NextStuckNotificationCheckAt, acc.CheckCount, + acc.Development, ); err != nil { span.SetStatus(codes.Error, "failed to update account") span.RecordError(err) @@ -248,7 +253,7 @@ func (p *postgresAccountRepository) GetByAPNSToken(ctx context.Context, token st query := ` SELECT accounts.id, username, accounts.reddit_account_id, access_token, refresh_token, token_expires_at, last_message_id, next_notification_check_at, next_stuck_notification_check_at, - check_count + check_count, development FROM accounts INNER JOIN devices_accounts ON accounts.id = devices_accounts.account_id INNER JOIN devices ON devices.id = devices_accounts.device_id diff --git a/internal/repository/postgres_live_activity.go b/internal/repository/postgres_live_activity.go index 0eb0e7e..af08235 100644 --- a/internal/repository/postgres_live_activity.go +++ b/internal/repository/postgres_live_activity.go @@ -28,7 +28,6 @@ func (p *postgresLiveActivityRepository) fetch(ctx context.Context, query string if err := rows.Scan( &la.ID, &la.APNSToken, - &la.Sandbox, &la.RedditAccountID, &la.AccessToken, &la.RefreshToken, @@ -37,6 +36,7 @@ func (p *postgresLiveActivityRepository) fetch(ctx context.Context, query string &la.Subreddit, &la.NextCheckAt, &la.ExpiresAt, + &la.Development, ); err != nil { return nil, err } @@ -47,7 +47,7 @@ func (p *postgresLiveActivityRepository) fetch(ctx context.Context, query string func (p *postgresLiveActivityRepository) Get(ctx context.Context, apnsToken string) (domain.LiveActivity, error) { query := ` - SELECT id, apns_token, sandbox, reddit_account_id, access_token, refresh_token, token_expires_at, thread_id, subreddit, next_check_at, expires_at + SELECT id, apns_token, reddit_account_id, access_token, refresh_token, token_expires_at, thread_id, subreddit, next_check_at, expires_at, development FROM live_activities WHERE apns_token = $1` @@ -64,7 +64,7 @@ func (p *postgresLiveActivityRepository) Get(ctx context.Context, apnsToken stri func (p *postgresLiveActivityRepository) List(ctx context.Context) ([]domain.LiveActivity, error) { query := ` - SELECT id, apns_token, sandbox, reddit_account_id, access_token, refresh_token, token_expires_at, thread_id, subreddit, next_check_at, expires_at + SELECT id, apns_token, reddit_account_id, access_token, refresh_token, token_expires_at, thread_id, subreddit, next_check_at, expires_at, development FROM live_activities WHERE expires_at > NOW()` @@ -73,14 +73,13 @@ func (p *postgresLiveActivityRepository) List(ctx context.Context) ([]domain.Liv func (p *postgresLiveActivityRepository) Create(ctx context.Context, la *domain.LiveActivity) error { query := ` - INSERT INTO live_activities (apns_token, sandbox, reddit_account_id, access_token, refresh_token, token_expires_at, thread_id, subreddit, next_check_at, expires_at) + INSERT INTO live_activities (apns_token, reddit_account_id, access_token, refresh_token, token_expires_at, thread_id, subreddit, next_check_at, expires_at, development) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) ON CONFLICT (apns_token) DO UPDATE SET expires_at = $10 RETURNING id` return p.conn.QueryRow(ctx, query, la.APNSToken, - la.Sandbox, la.RedditAccountID, la.AccessToken, la.RefreshToken, @@ -89,6 +88,7 @@ func (p *postgresLiveActivityRepository) Create(ctx context.Context, la *domain. la.Subreddit, time.Now(), time.Now().Add(domain.LiveActivityDuration), + la.Development, ).Scan(&la.ID) } diff --git a/internal/worker/live_activities.go b/internal/worker/live_activities.go index 1ebfe42..64d69b1 100644 --- a/internal/worker/live_activities.go +++ b/internal/worker/live_activities.go @@ -125,7 +125,8 @@ type liveActivitiesConsumer struct { *liveActivitiesWorker tag int - apns *apns2.Client + papns *apns2.Client + dapns *apns2.Client } func NewLiveActivitiesConsumer(law *liveActivitiesWorker, tag int) *liveActivitiesConsumer { @@ -133,6 +134,7 @@ func NewLiveActivitiesConsumer(law *liveActivitiesWorker, tag int) *liveActiviti law, tag, apns2.NewTokenClient(law.apns).Production(), + apns2.NewTokenClient(law.apns).Development(), } } @@ -295,13 +297,18 @@ func (lac *liveActivitiesConsumer) Consume(delivery rmq.Delivery) { Payload: bb, } - res, err := lac.apns.PushWithContext(ctx, notification) + client := lac.papns + if la.Development { + client = lac.dapns + } + + res, err := client.PushWithContext(ctx, notification) if err != nil { _ = lac.statsd.Incr("apns.live_activities.errors", []string{}, 1) lac.logger.Error("failed to send notification", zap.Error(err), zap.String("live_activity#apns_token", at), - zap.Bool("live_activity#sandbox", la.Sandbox), + zap.Bool("live_activity#development", la.Development), zap.String("notification#type", ev), ) @@ -310,7 +317,7 @@ func (lac *liveActivitiesConsumer) Consume(delivery rmq.Delivery) { _ = lac.statsd.Incr("apns.live_activities.errors", []string{}, 1) lac.logger.Error("notification not sent", zap.String("live_activity#apns_token", at), - zap.Bool("live_activity#sandbox", la.Sandbox), + zap.Bool("live_activity#development", la.Development), zap.String("notification#type", ev), zap.Int("response#status", res.StatusCode), zap.String("response#reason", res.Reason), @@ -321,7 +328,7 @@ func (lac *liveActivitiesConsumer) Consume(delivery rmq.Delivery) { _ = lac.statsd.Incr("apns.notification.sent", []string{}, 1) lac.logger.Debug("sent notification", zap.String("live_activity#apns_token", at), - zap.Bool("live_activity#sandbox", la.Sandbox), + zap.Bool("live_activity#development", la.Development), zap.String("notification#type", ev), ) } diff --git a/internal/worker/notifications.go b/internal/worker/notifications.go index f4a0d64..b4e3a21 100644 --- a/internal/worker/notifications.go +++ b/internal/worker/notifications.go @@ -125,8 +125,9 @@ func (nw *notificationsWorker) Stop() { type notificationsConsumer struct { *notificationsWorker - tag int - apns *apns2.Client + tag int + papns *apns2.Client + dapns *apns2.Client } func NewNotificationsConsumer(nw *notificationsWorker, tag int) *notificationsConsumer { @@ -134,6 +135,7 @@ func NewNotificationsConsumer(nw *notificationsWorker, tag int) *notificationsCo nw, tag, apns2.NewTokenClient(nw.apns).Production(), + apns2.NewTokenClient(nw.apns).Development(), } } @@ -300,10 +302,15 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { notification.Topic = "com.christianselig.Apollo" notification.Payload = payloadFromMessage(account, msg, msgs.Count) + client := nc.papns + if account.Development { + client = nc.dapns + } + for _, device := range devices { notification.DeviceToken = device.APNSToken - res, err := nc.apns.PushWithContext(ctx, notification) + res, err := client.PushWithContext(ctx, notification) if err != nil { _ = nc.statsd.Incr("apns.notification.errors", []string{}, 1) logger.Error("failed to send notification",