mirror of
https://github.com/christianselig/apollo-backend
synced 2024-11-22 19:57:43 +00:00
use a locking mechanism for the scheduler instead of the singleton model in the cron lib
This commit is contained in:
parent
c216161836
commit
e35fee4871
3 changed files with 19 additions and 9 deletions
|
@ -23,8 +23,14 @@ import (
|
||||||
"github.com/christianselig/apollo-backend/internal/repository"
|
"github.com/christianselig/apollo-backend/internal/repository"
|
||||||
)
|
)
|
||||||
|
|
||||||
const batchSize = 250
|
const (
|
||||||
const accountEnqueueSeconds = 60
|
batchSize = 250
|
||||||
|
accountEnqueueSeconds = 60
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
enqueueAccountsMutex sync.Mutex
|
||||||
|
)
|
||||||
|
|
||||||
func SchedulerCmd(ctx context.Context) *cobra.Command {
|
func SchedulerCmd(ctx context.Context) *cobra.Command {
|
||||||
cmd := &cobra.Command{
|
cmd := &cobra.Command{
|
||||||
|
@ -103,9 +109,7 @@ func SchedulerCmd(ctx context.Context) *cobra.Command {
|
||||||
s := gocron.NewScheduler(time.UTC)
|
s := gocron.NewScheduler(time.UTC)
|
||||||
s.SetMaxConcurrentJobs(8, gocron.WaitMode)
|
s.SetMaxConcurrentJobs(8, gocron.WaitMode)
|
||||||
|
|
||||||
eaj, _ := s.Every(5).Seconds().Do(func() { enqueueAccounts(ctx, logger, statsd, db, redis, luaSha, notifQueue) })
|
_, _ = s.Every(5).Seconds().Do(func() { enqueueAccounts(ctx, logger, statsd, db, redis, luaSha, notifQueue) })
|
||||||
eaj.SingletonMode()
|
|
||||||
|
|
||||||
_, _ = s.Every(5).Seconds().Do(func() { enqueueSubreddits(ctx, logger, statsd, db, []rmq.Queue{subredditQueue, trendingQueue}) })
|
_, _ = s.Every(5).Seconds().Do(func() { enqueueSubreddits(ctx, logger, statsd, db, []rmq.Queue{subredditQueue, trendingQueue}) })
|
||||||
_, _ = s.Every(5).Seconds().Do(func() { enqueueUsers(ctx, logger, statsd, db, userQueue) })
|
_, _ = s.Every(5).Seconds().Do(func() { enqueueUsers(ctx, logger, statsd, db, userQueue) })
|
||||||
_, _ = s.Every(5).Seconds().Do(func() { enqueueLiveActivities(ctx, logger, db, redis, luaSha, liveActivitiesQueue) })
|
_, _ = s.Every(5).Seconds().Do(func() { enqueueLiveActivities(ctx, logger, db, redis, luaSha, liveActivitiesQueue) })
|
||||||
|
@ -456,6 +460,12 @@ func enqueueStuckAccounts(ctx context.Context, logger *zap.Logger, statsd *stats
|
||||||
}
|
}
|
||||||
|
|
||||||
func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, redisConn *redis.Client, luaSha string, queue rmq.Queue) {
|
func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, redisConn *redis.Client, luaSha string, queue rmq.Queue) {
|
||||||
|
if enqueueAccountsMutex.TryLock() {
|
||||||
|
defer enqueueAccountsMutex.Unlock()
|
||||||
|
} else {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
|
|
4
internal/reddit/testdata/refresh_token.json
vendored
4
internal/reddit/testdata/refresh_token.json
vendored
|
@ -1,7 +1,7 @@
|
||||||
{
|
{
|
||||||
"access_token": "***REMOVED***",
|
"access_token": "xxx",
|
||||||
"token_type": "bearer",
|
"token_type": "bearer",
|
||||||
"expires_in": 3600,
|
"expires_in": 3600,
|
||||||
"refresh_token": "***REMOVED***",
|
"refresh_token": "yyy",
|
||||||
"scope": "account creddits edit flair history identity livemanage modconfig modcontributors modflair modlog modmail modothers modposts modself modtraffic modwiki mysubreddits privatemessages read report save structuredstyles submit subscribe vote wikiedit wikiread"
|
"scope": "account creddits edit flair history identity livemanage modconfig modcontributors modflair modlog modmail modothers modposts modself modtraffic modwiki mysubreddits privatemessages read report save structuredstyles submit subscribe vote wikiedit wikiread"
|
||||||
}
|
}
|
||||||
|
|
|
@ -57,8 +57,8 @@ func TestRefreshTokenResponseParsing(t *testing.T) {
|
||||||
rtr := ret.(*reddit.RefreshTokenResponse)
|
rtr := ret.(*reddit.RefreshTokenResponse)
|
||||||
assert.NotNil(t, rtr)
|
assert.NotNil(t, rtr)
|
||||||
|
|
||||||
assert.Equal(t, "***REMOVED***", rtr.AccessToken)
|
assert.Equal(t, "xxx", rtr.AccessToken)
|
||||||
assert.Equal(t, "***REMOVED***", rtr.RefreshToken)
|
assert.Equal(t, "yyy", rtr.RefreshToken)
|
||||||
assert.Equal(t, 1*time.Hour, rtr.Expiry)
|
assert.Equal(t, 1*time.Hour, rtr.Expiry)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue