From d36d4f9b01a87b3bd804e4af7c7170c6365439df Mon Sep 17 00:00:00 2001 From: Andre Medeiros Date: Wed, 2 Nov 2022 00:49:20 -0400 Subject: [PATCH] migrate from rmq to faktory --- go.mod | 4 +- go.sum | 33 +-- internal/cmd/scheduler.go | 160 +++--------- internal/cmd/worker.go | 34 +-- internal/cmdutil/cmdutil.go | 13 +- internal/worker/live_activities.go | 167 ++++--------- internal/worker/notifications.go | 169 ++++--------- internal/worker/stuck_notifications.go | 148 +++-------- internal/worker/subreddits.go | 168 ++++--------- internal/worker/trending.go | 164 ++++-------- internal/worker/users.go | 332 ------------------------- internal/worker/worker.go | 6 +- render.yaml | 2 +- 13 files changed, 300 insertions(+), 1100 deletions(-) delete mode 100644 internal/worker/users.go diff --git a/go.mod b/go.mod index 68081a4..b1a7063 100644 --- a/go.mod +++ b/go.mod @@ -4,8 +4,9 @@ go 1.18 require ( github.com/DataDog/datadog-go v4.8.3+incompatible - github.com/adjust/rmq/v5 v5.0.1 github.com/bugsnag/bugsnag-go/v2 v2.2.0 + github.com/contribsys/faktory v1.6.2 + github.com/contribsys/faktory_worker_go v1.6.0 github.com/dustin/go-humanize v1.0.0 github.com/go-co-op/gocron v1.17.1 github.com/go-ozzo/ozzo-validation/v4 v4.3.0 @@ -52,5 +53,6 @@ require ( golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect golang.org/x/text v0.3.7 // indirect + gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 7714fa1..ced0305 100644 --- a/go.sum +++ b/go.sum @@ -11,8 +11,6 @@ github.com/Microsoft/go-winio v0.5.0/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpz github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/StackExchange/wmi v1.2.1/go.mod h1:rcmrprowKIVzvc+NUiLncP2uuArMWLCbu9SBzvHz7e8= github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g= -github.com/adjust/rmq/v5 v5.0.1 h1:3d8IjgB6P5TzkCC8dF+ddzgaM+qHdK/6hKnZ8Jmt1ms= -github.com/adjust/rmq/v5 v5.0.1/go.mod h1:dLrg3gSOWYcXgEifgF8T+kttf3T38Ru5EPC7sTvKghI= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20201120081800-1786d5ef83d4/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= @@ -34,7 +32,6 @@ github.com/bugsnag/panicwrap v1.3.4/go.mod h1:D/8v3kj0zr8ZAKg1AQ6crr+5VwKN5eIywR github.com/cenkalti/backoff/v4 v4.1.1/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= -github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= @@ -44,6 +41,10 @@ github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWH github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I= github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ= github.com/codegangsta/negroni v1.0.0/go.mod h1:v0y3T5G7Y1UlFfyxFn/QLRU4a2EuNau2iZY63YTKWo0= +github.com/contribsys/faktory v1.6.2 h1:HuqJI9ZEeInN2nJg10WRy8zPpxNwVIZgACbex7wQG1A= +github.com/contribsys/faktory v1.6.2/go.mod h1:R8+inlM1rq3GzyG8iZUL7qhfNfXGIgcbQRvTHmSyuUI= +github.com/contribsys/faktory_worker_go v1.6.0 h1:ov69BLHL62i/wRLJwvuj5UphwgjMOINRCGW3KzrKOjk= +github.com/contribsys/faktory_worker_go v1.6.0/go.mod h1:XMNGn3sBJdqFGfTH4SkmYkMovhdkq5cDJj36wowfbNY= github.com/coreos/go-oidc/v3 v3.2.0/go.mod h1:rEJ/idjfUyfkBit1eI1fvyr+64/g9dcKpAm8MJMesvo= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= @@ -67,9 +68,7 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= -github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= -github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-chi/chi v4.1.2+incompatible/go.mod h1:eB3wogJHnLi3x/kFX2A+IbTBlXxmMeXJVKy9tTv1XzQ= github.com/go-co-op/gocron v1.17.1 h1:oEu3xGNVn9IGukN3JPzOsfaBoTGYmUVHtR9d1cv1cq8= @@ -83,7 +82,6 @@ github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiU github.com/go-ole/go-ole v1.2.6-0.20210915003542-8b1f7f90f6b1/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-ozzo/ozzo-validation/v4 v4.3.0 h1:byhDUpfEwjsVQb1vBunvIjh2BHQ9ead57VkAEY4V+Es= github.com/go-ozzo/ozzo-validation/v4 v4.3.0/go.mod h1:2NKgrcHl3z6cJs+3Oo940FPRiTzuqKbvfrL2RxCj6Ew= -github.com/go-redis/redis/v8 v8.3.2/go.mod h1:jszGxBCez8QA1HWSmQxJO9Y82kNibbUmeYhKWrBejTU= github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= @@ -123,7 +121,6 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gops v0.3.22/go.mod h1:7diIdLsqpCihPSX3fQagksT/Ku/y4RL9LHTlKyEUDl8= @@ -145,7 +142,6 @@ github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ github.com/heroku/rollrus v0.2.0/go.mod h1:B3MwEcr9nmf4xj0Sr5l9eSht7wLKMa1C+9ajgAU79ek= github.com/heroku/x v0.0.55 h1:LSXseirdcQaVobauVkRLbN1VnxVmRQgRABrDA1Cz2Q8= github.com/heroku/x v0.0.55/go.mod h1:YZxbWdDeSewnf/CDZM2UXCZZPU9JZfObfms5FT3P8NA= -github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/hydrogen18/memlistener v0.0.0-20141126152155-54553eb933fb/go.mod h1:qEIFzExnS6016fRpRfxrExeVn2gbClQA99gQhnIcdhE= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/inconshreveable/mousetrap v1.0.1 h1:U3uMjPSQEBMNp1lFxmllqCPM6P5u/Xq7Pgzkat/bFNc= @@ -234,16 +230,9 @@ github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= -github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= -github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= -github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= -github.com/onsi/ginkgo v1.14.2/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= -github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= -github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= -github.com/onsi/gomega v1.10.3/go.mod h1:V9xEwhxec5O8UDM77eCW8vLymOMltsqPVYWrpDsH8xc= github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -319,7 +308,6 @@ github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxt go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.1/go.mod h1:Ap50jQcDJrx6rB6VgeeFPtuPIf3wMRvRfrfYDO6+BmA= -go.opentelemetry.io/otel v0.13.0/go.mod h1:dlSNewoRYikTkotEnxdmuBHgzT+k/idJSfDv/FxEnOY= go.opentelemetry.io/otel v1.0.0-RC1/go.mod h1:x9tRa9HK4hSSq7jf2TKbqFbtt58/TGk0f9XiEYISI1I= go.opentelemetry.io/otel v1.0.0-RC3/go.mod h1:Ka5j3ua8tZs4Rkq4Ex3hwgBgOchyPVq5S6P2lz//nKQ= go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.22.0/go.mod h1:gIp6+vQxqmh6Vd/mucqnsaFpOuVycQAS/BBXMKzJk0w= @@ -384,7 +372,6 @@ golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKG golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181106065722-10aee1819953/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -397,9 +384,7 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200505041828-1ed23360d12c/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= -golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= -golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220403103023-749bd193bc2b h1:vI32FkLJNAWtGD4BwkThwEy6XS7ZLLMHkSkYfF8M0W0= @@ -417,7 +402,6 @@ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cO golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -429,16 +413,11 @@ golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -537,18 +516,14 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= -gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/inconshreveable/log15.v2 v2.0.0-20180818164646-67afb5ed74ec/go.mod h1:aPpfJ7XW+gOuirDoZ8gHhLh3kZ1B08FtV2bbmy7Jv3s= gopkg.in/ini.v1 v1.42.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= gopkg.in/square/go-jose.v2 v2.5.1/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= -gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/cmd/scheduler.go b/internal/cmd/scheduler.go index 5fa3af3..09412e7 100644 --- a/internal/cmd/scheduler.go +++ b/internal/cmd/scheduler.go @@ -11,7 +11,7 @@ import ( "time" "github.com/DataDog/datadog-go/statsd" - "github.com/adjust/rmq/v5" + faktory "github.com/contribsys/faktory/client" "github.com/go-co-op/gocron" "github.com/go-redis/redis/v8" "github.com/jackc/pgx/v4/pgxpool" @@ -52,7 +52,7 @@ func SchedulerCmd(ctx context.Context) *cobra.Command { } defer redis.Close() - queue, err := cmdutil.NewQueueClient(logger, redis, "worker") + fc, err := cmdutil.NewFaktoryClient(logger) if err != nil { return err } @@ -63,47 +63,15 @@ func SchedulerCmd(ctx context.Context) *cobra.Command { return err } - notifQueue, err := queue.OpenQueue("notifications") - if err != nil { - return err - } - - subredditQueue, err := queue.OpenQueue("subreddits") - if err != nil { - return err - } - - trendingQueue, err := queue.OpenQueue("trending") - if err != nil { - return err - } - - userQueue, err := queue.OpenQueue("users") - if err != nil { - return err - } - - stuckNotificationsQueue, err := queue.OpenQueue("stuck-notifications") - if err != nil { - return err - } - - liveActivitiesQueue, err := queue.OpenQueue("live-activities") - if err != nil { - return err - } - s := gocron.NewScheduler(time.UTC) s.SetMaxConcurrentJobs(8, gocron.WaitMode) - eaj, _ := s.Every(5).Seconds().Do(func() { enqueueAccounts(ctx, logger, statsd, db, redis, luaSha, notifQueue) }) + eaj, _ := s.Every(5).Seconds().Do(func() { enqueueAccounts(ctx, logger, statsd, db, redis, luaSha, fc) }) eaj.SingletonMode() - _, _ = s.Every(5).Seconds().Do(func() { enqueueSubreddits(ctx, logger, statsd, db, []rmq.Queue{subredditQueue, trendingQueue}) }) - _, _ = s.Every(5).Seconds().Do(func() { enqueueUsers(ctx, logger, statsd, db, userQueue) }) - _, _ = s.Every(5).Seconds().Do(func() { enqueueLiveActivities(ctx, logger, db, redis, luaSha, liveActivitiesQueue) }) - _, _ = s.Every(5).Seconds().Do(func() { cleanQueues(logger, queue) }) - _, _ = s.Every(5).Seconds().Do(func() { enqueueStuckAccounts(ctx, logger, statsd, db, stuckNotificationsQueue) }) + _, _ = s.Every(5).Seconds().Do(func() { enqueueSubreddits(ctx, logger, statsd, db, fc) }) + _, _ = s.Every(5).Seconds().Do(func() { enqueueLiveActivities(ctx, logger, db, redis, luaSha, fc) }) + _, _ = s.Every(5).Seconds().Do(func() { enqueueStuckAccounts(ctx, logger, statsd, db, fc) }) _, _ = s.Every(1).Minute().Do(func() { reportStats(ctx, logger, statsd, db) }) //_, _ = s.Every(1).Minute().Do(func() { pruneAccounts(ctx, logger, db) }) //_, _ = s.Every(1).Minute().Do(func() { pruneDevices(ctx, logger, db) }) @@ -144,7 +112,7 @@ func evalScript(ctx context.Context, redis *redis.Client) (string, error) { return redis.ScriptLoad(ctx, lua).Result() } -func enqueueLiveActivities(ctx context.Context, logger *zap.Logger, pool *pgxpool.Pool, redisConn *redis.Client, luaSha string, queue rmq.Queue) { +func enqueueLiveActivities(ctx context.Context, logger *zap.Logger, pool *pgxpool.Pool, redisConn *redis.Client, luaSha string, fc *faktory.Client) { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -193,7 +161,12 @@ func enqueueLiveActivities(ctx context.Context, logger *zap.Logger, pool *pgxpoo logger.Debug("enqueueing live activity batch", zap.Int("count", len(batch)), zap.Time("start", now)) - if err = queue.Publish(batch...); err != nil { + jobs := make([]*faktory.Job, len(batch)) + for i, tok := range batch { + jobs[i] = faktory.NewJob("LiveActivityJob", tok) + } + + if _, err = fc.PushBulk(jobs); err != nil { logger.Error("failed to enqueue live activity batch", zap.Error(err)) } } @@ -240,19 +213,6 @@ func pruneDevices(ctx context.Context, logger *zap.Logger, pool *pgxpool.Pool) { } } -func cleanQueues(logger *zap.Logger, jobsConn rmq.Connection) { - cleaner := rmq.NewCleaner(jobsConn) - count, err := cleaner.Clean() - if err != nil { - logger.Error("failed to clean jobs from queues", zap.Error(err)) - return - } - - if count > 0 { - logger.Info("returned jobs to queues", zap.Int64("count", count)) - } -} - func reportStats(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool) { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -280,62 +240,7 @@ func reportStats(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, } } -func enqueueUsers(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, queue rmq.Queue) { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - now := time.Now() - next := now.Add(domain.NotificationCheckInterval) - - ids := []int64{} - - defer func() { - tags := []string{"queue:users"} - _ = statsd.Histogram("apollo.queue.enqueued", float64(len(ids)), tags, 1) - _ = statsd.Histogram("apollo.queue.runtime", float64(time.Since(now).Milliseconds()), tags, 1) - }() - - stmt := ` - UPDATE users - SET next_check_at = $2 - WHERE id IN ( - SELECT id - FROM users - WHERE next_check_at < $1 - ORDER BY next_check_at - FOR UPDATE SKIP LOCKED - LIMIT 100 - ) - RETURNING users.id` - rows, err := pool.Query(ctx, stmt, now, next) - if err != nil { - logger.Error("failed to fetch batch of users", zap.Error(err)) - return - } - for rows.Next() { - var id int64 - _ = rows.Scan(&id) - ids = append(ids, id) - } - rows.Close() - - if len(ids) == 0 { - return - } - - logger.Debug("enqueueing user batch", zap.Int("count", len(ids)), zap.Time("start", now)) - - batchIds := make([]string, len(ids)) - for i, id := range ids { - batchIds[i] = strconv.FormatInt(id, 10) - } - - if err = queue.Publish(batchIds...); err != nil { - logger.Error("failed to enqueue user batch", zap.Error(err)) - } -} - -func enqueueSubreddits(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, queues []rmq.Queue) { +func enqueueSubreddits(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, fc *faktory.Client) { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -385,22 +290,24 @@ func enqueueSubreddits(ctx context.Context, logger *zap.Logger, statsd *statsd.C batchIds[i] = strconv.FormatInt(id, 10) } - for _, queue := range queues { - if err = queue.Publish(batchIds...); err != nil { - logger.Error("failed to enqueue subreddit batch", zap.Error(err)) - } + jobs := make([]*faktory.Job, len(batchIds)*2) + for i, id := range ids { + jobs[i*2] = faktory.NewJob("SubredditWatcherJob", id) + jobs[i*2+1] = faktory.NewJob("SubredditTrendingJob", id) + } + if _, err := fc.PushBulk(jobs); err != nil { + logger.Error("failed to enqueue subreddit batch", zap.Error(err)) } - } -func enqueueStuckAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, queue rmq.Queue) { +func enqueueStuckAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, fc *faktory.Client) { ctx, cancel := context.WithCancel(ctx) defer cancel() now := time.Now() next := now.Add(domain.StuckNotificationCheckInterval) - ids := []int64{} + ids := []string{} defer func() { tags := []string{"queue:stuck-accounts"} @@ -419,7 +326,7 @@ func enqueueStuckAccounts(ctx context.Context, logger *zap.Logger, statsd *stats FOR UPDATE SKIP LOCKED LIMIT 500 ) - RETURNING accounts.id` + RETURNING accounts.reddit_account_id` rows, err := pool.Query(ctx, stmt, now, next) if err != nil { logger.Error("failed to fetch accounts", zap.Error(err)) @@ -427,7 +334,7 @@ func enqueueStuckAccounts(ctx context.Context, logger *zap.Logger, statsd *stats } for rows.Next() { - var id int64 + var id string _ = rows.Scan(&id) ids = append(ids, id) } @@ -439,17 +346,17 @@ func enqueueStuckAccounts(ctx context.Context, logger *zap.Logger, statsd *stats logger.Debug("enqueueing stuck account batch", zap.Int("count", len(ids)), zap.Time("start", now)) - batchIds := make([]string, len(ids)) + jobs := make([]*faktory.Job, len(ids)) for i, id := range ids { - batchIds[i] = strconv.FormatInt(id, 10) + jobs[i] = faktory.NewJob("StuckNotificationsJob", id) } - if err = queue.Publish(batchIds...); err != nil { + if _, err = fc.PushBulk(jobs); err != nil { logger.Error("failed to enqueue stuck account batch", zap.Error(err)) } } -func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, redisConn *redis.Client, luaSha string, queue rmq.Queue) { +func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, redisConn *redis.Client, luaSha string, fc *faktory.Client) { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -520,8 +427,13 @@ func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Cli return } - if err = queue.Publish(unlocked...); err != nil { - logger.Error("failed to enqueue account batch", zap.Error(err)) + jobs := make([]*faktory.Job, len(ids)) + for i, id := range unlocked { + jobs[i] = faktory.NewJob("NotificationCheckJob", id) + } + + if _, err = fc.PushBulk(jobs); err != nil { + logger.Error("failed to enqueue stuck account batch", zap.Error(err)) } }(i, ctx) } diff --git a/internal/cmd/worker.go b/internal/cmd/worker.go index 5d4a792..6e11750 100644 --- a/internal/cmd/worker.go +++ b/internal/cmd/worker.go @@ -5,6 +5,7 @@ import ( "fmt" "runtime" + faktoryworker "github.com/contribsys/faktory_worker_go" "github.com/spf13/cobra" "github.com/christianselig/apollo-backend/internal/cmdutil" @@ -13,12 +14,11 @@ import ( var ( queues = map[string]worker.NewWorkerFn{ - "live-activities": worker.NewLiveActivitiesWorker, - "notifications": worker.NewNotificationsWorker, - "stuck-notifications": worker.NewStuckNotificationsWorker, - "subreddits": worker.NewSubredditsWorker, - "trending": worker.NewTrendingWorker, - "users": worker.NewUsersWorker, + "LiveActivityJob": worker.NewLiveActivitiesWorker, + "NotificationCheckJob": worker.NewNotificationsWorker, + "StuckNotificationsJob": worker.NewStuckNotificationsWorker, + "SubredditWatcherJob": worker.NewSubredditsWorker, + "SubredditTrendingJob": worker.NewTrendingWorker, } ) @@ -63,26 +63,16 @@ func WorkerCmd(ctx context.Context) *cobra.Command { } defer redis.Close() - queue, err := cmdutil.NewQueueClient(logger, redis, "worker") - if err != nil { - return err - } - workerFn, ok := queues[queueID] if !ok { - return fmt.Errorf("invalid queue: %s", queueID) + return fmt.Errorf("queue does not exist: %s", queueID) } + worker := workerFn(ctx, logger, statsd, db, redis, consumers) - worker := workerFn(ctx, logger, statsd, db, redis, queue, consumers) - if err := worker.Start(); err != nil { - return err - } - - <-ctx.Done() - - worker.Stop() - - return nil + mgr := faktoryworker.NewManager() + mgr.Concurrency = consumers + mgr.Register(queueID, worker.Process) + return mgr.RunWithContext(ctx) }, } diff --git a/internal/cmdutil/cmdutil.go b/internal/cmdutil/cmdutil.go index 327a05f..49bce68 100644 --- a/internal/cmdutil/cmdutil.go +++ b/internal/cmdutil/cmdutil.go @@ -7,7 +7,7 @@ import ( "time" "github.com/DataDog/datadog-go/statsd" - "github.com/adjust/rmq/v5" + faktory "github.com/contribsys/faktory/client" "github.com/go-redis/redis/v8" "github.com/jackc/pgx/v4/pgxpool" "go.uber.org/zap" @@ -74,13 +74,6 @@ func NewDatabasePool(ctx context.Context, maxConns int) (*pgxpool.Pool, error) { return pgxpool.ConnectConfig(ctx, config) } -func NewQueueClient(logger *zap.Logger, conn *redis.Client, identifier string) (rmq.Connection, error) { - errChan := make(chan error, 10) - go func() { - for err := range errChan { - logger.Error("error occurred within queue", zap.Error(err)) - } - }() - - return rmq.OpenConnectionWithRedisClient(identifier, conn, errChan) +func NewFaktoryClient(logger *zap.Logger) (*faktory.Client, error) { + return faktory.Open() } diff --git a/internal/worker/live_activities.go b/internal/worker/live_activities.go index 3c676e6..31aecb1 100644 --- a/internal/worker/live_activities.go +++ b/internal/worker/live_activities.go @@ -9,7 +9,6 @@ import ( "time" "github.com/DataDog/datadog-go/statsd" - "github.com/adjust/rmq/v5" "github.com/go-redis/redis/v8" "github.com/jackc/pgx/v4/pgxpool" "github.com/sideshow/apns2" @@ -32,22 +31,16 @@ type DynamicIslandNotification struct { } type liveActivitiesWorker struct { - context.Context - - logger *zap.Logger - statsd *statsd.Client - db *pgxpool.Pool - redis *redis.Client - queue rmq.Connection - reddit *reddit.Client - apns *token.Token - - consumers int - + logger *zap.Logger + statsd *statsd.Client + db *pgxpool.Pool + redis *redis.Client + reddit *reddit.Client + apns *apns2.Client liveActivityRepo domain.LiveActivityRepository } -func NewLiveActivitiesWorker(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, queue rmq.Connection, consumers int) Worker { +func NewLiveActivitiesWorker(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, consumers int) Worker { reddit := reddit.NewClient( os.Getenv("REDDIT_CLIENT_ID"), os.Getenv("REDDIT_CLIENT_SECRET"), @@ -56,123 +49,64 @@ func NewLiveActivitiesWorker(ctx context.Context, logger *zap.Logger, statsd *st consumers, ) - var apns *token.Token + var apns *apns2.Client { authKey, err := token.AuthKeyFromFile(os.Getenv("APPLE_KEY_PATH")) if err != nil { panic(err) } - apns = &token.Token{ + tok := &token.Token{ AuthKey: authKey, KeyID: os.Getenv("APPLE_KEY_ID"), TeamID: os.Getenv("APPLE_TEAM_ID"), } + apns = apns2.NewTokenClient(tok).Production() } return &liveActivitiesWorker{ - ctx, logger, statsd, db, redis, - queue, reddit, apns, - consumers, - repository.NewPostgresLiveActivity(db), } } -func (law *liveActivitiesWorker) Start() error { - queue, err := law.queue.OpenQueue("live-activities") - if err != nil { - return err - } - - law.logger.Info("starting up live activities worker", zap.Int("consumers", law.consumers)) - - prefetchLimit := int64(law.consumers * 4) - - if err := queue.StartConsuming(prefetchLimit, pollDuration); err != nil { - return err - } - - host, _ := os.Hostname() - - for i := 0; i < law.consumers; i++ { - name := fmt.Sprintf("consumer %s-%d", host, i) - - consumer := NewLiveActivitiesConsumer(law, i) - if _, err := queue.AddConsumer(name, consumer); err != nil { - return err - } - } - - return nil -} - -func (law *liveActivitiesWorker) Stop() { - <-law.queue.StopAllConsuming() // wait for all Consume() calls to finish -} - -type liveActivitiesConsumer struct { - *liveActivitiesWorker - tag int - - apns *apns2.Client -} - -func NewLiveActivitiesConsumer(law *liveActivitiesWorker, tag int) *liveActivitiesConsumer { - return &liveActivitiesConsumer{ - law, - tag, - apns2.NewTokenClient(law.apns).Production(), - } -} - -func (lac *liveActivitiesConsumer) Consume(delivery rmq.Delivery) { - ctx, cancel := context.WithCancel(lac) - defer cancel() - +func (law *liveActivitiesWorker) Process(ctx context.Context, args ...interface{}) error { now := time.Now() defer func() { elapsed := time.Now().Sub(now).Milliseconds() - _ = lac.statsd.Histogram("apollo.consumer.runtime", float64(elapsed), []string{"queue:live_activities"}, 0.1) + _ = law.statsd.Histogram("apollo.consumer.runtime", float64(elapsed), []string{"queue:live_activities"}, 0.1) }() - at := delivery.Payload() + at := args[0].(string) key := fmt.Sprintf("locks:live-activities:%s", at) // Measure queue latency - ttl := lac.redis.PTTL(ctx, key).Val() + ttl := law.redis.PTTL(ctx, key).Val() age := (domain.NotificationCheckTimeout - ttl) - _ = lac.statsd.Histogram("apollo.dequeue.latency", float64(age.Milliseconds()), []string{"queue:live_activities"}, 0.1) + _ = law.statsd.Histogram("apollo.dequeue.latency", float64(age.Milliseconds()), []string{"queue:live_activities"}, 0.1) defer func() { - if err := lac.redis.Del(ctx, key).Err(); err != nil { - lac.logger.Error("failed to remove account lock", zap.Error(err), zap.String("key", key)) + if err := law.redis.Del(ctx, key).Err(); err != nil { + law.logger.Error("failed to remove account lock", zap.Error(err), zap.String("key", key)) } }() - lac.logger.Debug("starting job", zap.String("live_activity#apns_token", at)) + law.logger.Debug("starting job", zap.String("live_activity#apns_token", at)) - defer func() { - if err := delivery.Ack(); err != nil { - lac.logger.Error("failed to acknowledge message", zap.Error(err), zap.String("live_activity#apns_token", at)) - } - }() - - la, err := lac.liveActivityRepo.Get(ctx, at) + la, err := law.liveActivityRepo.Get(ctx, at) if err != nil { - lac.logger.Error("failed to get live activity", zap.Error(err), zap.String("live_activity#apns_token", at)) - return + law.logger.Error("failed to get live activity", zap.Error(err), zap.String("live_activity#apns_token", at)) + return err } - rac := lac.reddit.NewAuthenticatedClient(la.RedditAccountID, la.RefreshToken, la.AccessToken) + rac := law.reddit.NewAuthenticatedClient(la.RedditAccountID, la.RefreshToken, la.AccessToken) if la.TokenExpiresAt.Before(now.Add(5 * time.Minute)) { - lac.logger.Debug("refreshing reddit token", + law.logger.Debug("refreshing reddit token", zap.String("live_activity#apns_token", at), zap.String("reddit#id", la.RedditAccountID), zap.String("reddit#access_token", rac.ObfuscatedAccessToken()), @@ -181,7 +115,7 @@ func (lac *liveActivitiesConsumer) Consume(delivery rmq.Delivery) { tokens, err := rac.RefreshTokens(ctx) if err != nil { - lac.logger.Error("failed to refresh reddit tokens", + law.logger.Error("failed to refresh reddit tokens", zap.Error(err), zap.String("live_activity#apns_token", at), zap.String("reddit#id", la.RedditAccountID), @@ -189,26 +123,28 @@ func (lac *liveActivitiesConsumer) Consume(delivery rmq.Delivery) { zap.String("reddit#refresh_token", rac.ObfuscatedRefreshToken()), ) if err == reddit.ErrOauthRevoked { - _ = lac.liveActivityRepo.Delete(ctx, at) + _ = law.liveActivityRepo.Delete(ctx, at) + return nil } - return + + return err } // Update account la.AccessToken = tokens.AccessToken la.RefreshToken = tokens.RefreshToken la.TokenExpiresAt = now.Add(tokens.Expiry) - _ = lac.liveActivityRepo.Update(ctx, &la) + _ = law.liveActivityRepo.Update(ctx, &la) // Refresh client - rac = lac.reddit.NewAuthenticatedClient(la.RedditAccountID, tokens.RefreshToken, tokens.AccessToken) + rac = law.reddit.NewAuthenticatedClient(la.RedditAccountID, tokens.RefreshToken, tokens.AccessToken) } - lac.logger.Debug("fetching latest comments", zap.String("live_activity#apns_token", at)) + law.logger.Debug("fetching latest comments", zap.String("live_activity#apns_token", at)) tr, err := rac.TopLevelComments(ctx, la.Subreddit, la.ThreadID) if err != nil { - lac.logger.Error("failed to fetch latest comments", + law.logger.Error("failed to fetch latest comments", zap.Error(err), zap.String("live_activity#apns_token", at), zap.String("reddit#id", la.RedditAccountID), @@ -216,14 +152,16 @@ func (lac *liveActivitiesConsumer) Consume(delivery rmq.Delivery) { zap.String("reddit#refresh_token", rac.ObfuscatedRefreshToken()), ) if err == reddit.ErrOauthRevoked { - _ = lac.liveActivityRepo.Delete(ctx, at) + _ = law.liveActivityRepo.Delete(ctx, at) + return nil } - return + + return err } if len(tr.Children) == 0 && la.ExpiresAt.After(now) { - lac.logger.Debug("no comments found", zap.String("live_activity#apns_token", at)) - return + law.logger.Debug("no comments found", zap.String("live_activity#apns_token", at)) + return nil } // Filter out comments in the last minute @@ -247,8 +185,8 @@ func (lac *liveActivitiesConsumer) Consume(delivery rmq.Delivery) { } if len(candidates) == 0 && la.ExpiresAt.After(now) { - lac.logger.Debug("no new comments found", zap.String("live_activity#apns_token", at)) - return + law.logger.Debug("no new comments found", zap.String("live_activity#apns_token", at)) + return nil } sort.Slice(candidates, func(i, j int) bool { @@ -291,20 +229,20 @@ func (lac *liveActivitiesConsumer) Consume(delivery rmq.Delivery) { Payload: bb, } - res, err := lac.apns.PushWithContext(ctx, notification) + res, err := law.apns.PushWithContext(ctx, notification) if err != nil { - _ = lac.statsd.Incr("apns.live_activities.errors", []string{}, 1) - lac.logger.Error("failed to send notification", + _ = law.statsd.Incr("apns.live_activities.errors", []string{}, 1) + law.logger.Error("failed to send notification", zap.Error(err), zap.String("live_activity#apns_token", at), zap.Bool("live_activity#sandbox", la.Sandbox), zap.String("notification#type", ev), ) - _ = lac.liveActivityRepo.Delete(ctx, at) + _ = law.liveActivityRepo.Delete(ctx, at) } else if !res.Sent() { - _ = lac.statsd.Incr("apns.live_activities.errors", []string{}, 1) - lac.logger.Error("notification not sent", + _ = law.statsd.Incr("apns.live_activities.errors", []string{}, 1) + law.logger.Error("notification not sent", zap.String("live_activity#apns_token", at), zap.Bool("live_activity#sandbox", la.Sandbox), zap.String("notification#type", ev), @@ -312,10 +250,10 @@ func (lac *liveActivitiesConsumer) Consume(delivery rmq.Delivery) { zap.String("response#reason", res.Reason), ) - _ = lac.liveActivityRepo.Delete(ctx, at) + _ = law.liveActivityRepo.Delete(ctx, at) } else { - _ = lac.statsd.Incr("apns.notification.sent", []string{}, 1) - lac.logger.Debug("sent notification", + _ = law.statsd.Incr("apns.notification.sent", []string{}, 1) + law.logger.Debug("sent notification", zap.String("live_activity#apns_token", at), zap.Bool("live_activity#sandbox", la.Sandbox), zap.String("notification#type", ev), @@ -323,11 +261,12 @@ func (lac *liveActivitiesConsumer) Consume(delivery rmq.Delivery) { } if la.ExpiresAt.Before(now) { - lac.logger.Debug("live activity expired, deleting", zap.String("live_activity#apns_token", at)) - _ = lac.liveActivityRepo.Delete(ctx, at) + law.logger.Debug("live activity expired, deleting", zap.String("live_activity#apns_token", at)) + _ = law.liveActivityRepo.Delete(ctx, at) } - lac.logger.Debug("finishing job", + law.logger.Debug("finishing job", zap.String("live_activity#apns_token", at), ) + return nil } diff --git a/internal/worker/notifications.go b/internal/worker/notifications.go index cd974f9..e18b52b 100644 --- a/internal/worker/notifications.go +++ b/internal/worker/notifications.go @@ -7,7 +7,6 @@ import ( "time" "github.com/DataDog/datadog-go/statsd" - "github.com/adjust/rmq/v5" "github.com/go-redis/redis/v8" "github.com/jackc/pgx/v4/pgxpool" "github.com/sideshow/apns2" @@ -32,23 +31,17 @@ const ( var notificationTags = []string{"queue:notifications"} type notificationsWorker struct { - context.Context - - logger *zap.Logger - statsd *statsd.Client - db *pgxpool.Pool - redis *redis.Client - queue rmq.Connection - reddit *reddit.Client - apns *token.Token - - consumers int - + logger *zap.Logger + statsd *statsd.Client + db *pgxpool.Pool + redis *redis.Client + reddit *reddit.Client + apns *apns2.Client accountRepo domain.AccountRepository deviceRepo domain.DeviceRepository } -func NewNotificationsWorker(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, queue rmq.Connection, consumers int) Worker { +func NewNotificationsWorker(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, consumers int) Worker { reddit := reddit.NewClient( os.Getenv("REDDIT_CLIENT_ID"), os.Getenv("REDDIT_CLIENT_SECRET"), @@ -57,123 +50,65 @@ func NewNotificationsWorker(ctx context.Context, logger *zap.Logger, statsd *sta consumers, ) - var apns *token.Token + var apns *apns2.Client { authKey, err := token.AuthKeyFromFile(os.Getenv("APPLE_KEY_PATH")) if err != nil { panic(err) } - apns = &token.Token{ + tok := &token.Token{ AuthKey: authKey, KeyID: os.Getenv("APPLE_KEY_ID"), TeamID: os.Getenv("APPLE_TEAM_ID"), } + apns = apns2.NewTokenClient(tok).Production() } return ¬ificationsWorker{ - ctx, logger, statsd, db, redis, - queue, reddit, apns, - consumers, - repository.NewPostgresAccount(db), repository.NewPostgresDevice(db), } } -func (nw *notificationsWorker) Start() error { - queue, err := nw.queue.OpenQueue("notifications") - if err != nil { - return err - } - - nw.logger.Info("starting up notifications worker", zap.Int("consumers", nw.consumers)) - - prefetchLimit := int64(nw.consumers * 20) - - if err := queue.StartConsuming(prefetchLimit, pollDuration); err != nil { - return err - } - - host, _ := os.Hostname() - - for i := 0; i < nw.consumers; i++ { - name := fmt.Sprintf("consumer %s-%d", host, i) - - consumer := NewNotificationsConsumer(nw, i) - if _, err := queue.AddConsumer(name, consumer); err != nil { - return err - } - } - - return nil -} - -func (nw *notificationsWorker) Stop() { - <-nw.queue.StopAllConsuming() // wait for all Consume() calls to finish -} - -type notificationsConsumer struct { - *notificationsWorker - tag int - apns *apns2.Client -} - -func NewNotificationsConsumer(nw *notificationsWorker, tag int) *notificationsConsumer { - return ¬ificationsConsumer{ - nw, - tag, - apns2.NewTokenClient(nw.apns).Production(), - } -} - -func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { - ctx, cancel := context.WithCancel(nc) - defer cancel() - +func (nw *notificationsWorker) Process(ctx context.Context, args ...interface{}) error { now := time.Now() defer func() { elapsed := time.Now().Sub(now).Milliseconds() - _ = nc.statsd.Histogram("apollo.consumer.runtime", float64(elapsed), notificationTags, 0.1) - _ = nc.statsd.Incr("apollo.consumer.executions", notificationTags, 0.1) + _ = nw.statsd.Histogram("apollo.consumer.runtime", float64(elapsed), notificationTags, 0.1) + _ = nw.statsd.Incr("apollo.consumer.executions", notificationTags, 0.1) }() - id := delivery.Payload() - logger := nc.logger.With(zap.String("account#reddit_account_id", id)) + id := args[0].(string) + logger := nw.logger.With(zap.String("account#reddit_account_id", id)) // Measure queue latency key := fmt.Sprintf("locks:accounts:%s", id) - ttl := nc.redis.PTTL(ctx, key).Val() + ttl := nw.redis.PTTL(ctx, key).Val() age := (domain.NotificationCheckTimeout - ttl) - _ = nc.statsd.Histogram("apollo.dequeue.latency", float64(age.Milliseconds()), notificationTags, 0.1) + _ = nw.statsd.Histogram("apollo.dequeue.latency", float64(age.Milliseconds()), notificationTags, 0.1) defer func() { - if err := nc.redis.Del(ctx, key).Err(); err != nil { + if err := nw.redis.Del(ctx, key).Err(); err != nil { logger.Error("failed to remove account lock", zap.Error(err), zap.String("key", key)) } }() logger.Debug("starting job") - defer func() { - if err := delivery.Ack(); err != nil { - logger.Error("failed to acknowledge message", zap.Error(err)) - } - }() - - account, err := nc.accountRepo.GetByRedditID(ctx, id) + account, err := nw.accountRepo.GetByRedditID(ctx, id) if err != nil { logger.Info("account not found, exiting", zap.Error(err)) - return + return nil } - rac := nc.reddit.NewAuthenticatedClient(account.AccountID, account.RefreshToken, account.AccessToken) + rac := nw.reddit.NewAuthenticatedClient(account.AccountID, account.RefreshToken, account.AccessToken) logger = logger.With( zap.String("account#username", account.NormalizedUsername()), zap.String("account#access_token", rac.ObfuscatedAccessToken()), @@ -186,24 +121,25 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { if err != nil { if err != reddit.ErrOauthRevoked { logger.Error("failed to refresh reddit tokens", zap.Error(err)) - return + return err } - if err = nc.deleteAccount(ctx, account); err != nil { + if err = nw.deleteAccount(ctx, account); err != nil { logger.Error("failed to remove revoked account", zap.Error(err)) + return err } - return + return nil } // Update account account.AccessToken = tokens.AccessToken account.RefreshToken = tokens.RefreshToken account.TokenExpiresAt = now.Add(tokens.Expiry) - _ = nc.accountRepo.Update(ctx, &account) + _ = nw.accountRepo.Update(ctx, &account) // Refresh client - rac = nc.reddit.NewAuthenticatedClient(account.AccountID, tokens.RefreshToken, tokens.AccessToken) + rac = nw.reddit.NewAuthenticatedClient(account.AccountID, tokens.RefreshToken, tokens.AccessToken) logger = logger.With( zap.String("account#access_token", rac.ObfuscatedAccessToken()), zap.String("account#refresh_token", rac.ObfuscatedRefreshToken()), @@ -220,24 +156,20 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { if err != nil { switch err { - case reddit.ErrTimeout, reddit.ErrRateLimited: // Don't log timeouts or rate limits - break case reddit.ErrOauthRevoked: - if err = nc.deleteAccount(ctx, account); err != nil { - logger.Error("failed to remove revoked account", zap.Error(err)) - } else { - logger.Info("removed revoked account") - } + _ = nw.deleteAccount(ctx, account) + logger.Info("removed revoked account") + return nil default: logger.Error("failed to fetch message inbox", zap.Error(err)) + return err } - return } // Figure out where we stand if msgs.Count == 0 { logger.Debug("no new messages, bailing early") - return + return nil } logger.Debug("fetched messages", zap.Int("count", msgs.Count)) @@ -245,7 +177,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { for _, msg := range msgs.Children { if !msg.IsDeleted() { account.LastMessageID = msg.FullName() - _ = nc.accountRepo.Update(ctx, &account) + _ = nw.accountRepo.Update(ctx, &account) break } } @@ -255,19 +187,19 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { logger.Debug("populating first message id to prevent spamming") account.CheckCount = 1 - _ = nc.accountRepo.Update(ctx, &account) - return + _ = nw.accountRepo.Update(ctx, &account) + return nil } - devices, err := nc.deviceRepo.GetInboxNotifiableByAccountID(ctx, account.ID) + devices, err := nw.deviceRepo.GetInboxNotifiableByAccountID(ctx, account.ID) if err != nil { logger.Error("failed to fetch account devices", zap.Error(err)) - return + return nil } if len(devices) == 0 { logger.Debug("no notifiable devices, bailing early") - return + return nil } // Iterate backwards so we notify from older to newer @@ -281,7 +213,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { // Latency is the time difference between the appearence of the new message and the // time we notified at. latency := now.Sub(msg.CreatedAt) - _ = nc.statsd.Histogram("apollo.queue.delay", float64(latency.Milliseconds()), []string{}, 1.0) + _ = nw.statsd.Histogram("apollo.queue.delay", float64(latency.Milliseconds()), []string{}, 1.0) notification := &apns2.Notification{} notification.Topic = "com.christianselig.Apollo" @@ -290,18 +222,18 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { for _, device := range devices { notification.DeviceToken = device.APNSToken - res, err := nc.apns.PushWithContext(ctx, notification) + res, err := nw.apns.PushWithContext(ctx, notification) if err != nil { - _ = nc.statsd.Incr("apns.notification.errors", []string{}, 1) + _ = nw.statsd.Incr("apns.notification.errors", []string{}, 1) logger.Error("failed to send notification", zap.Error(err), zap.String("device#token", device.APNSToken), ) // Delete device as notifications might have been disabled here - _ = nc.deviceRepo.Delete(ctx, device.APNSToken) + _ = nw.deviceRepo.Delete(ctx, device.APNSToken) } else if !res.Sent() { - _ = nc.statsd.Incr("apns.notification.errors", []string{}, 1) + _ = nw.statsd.Incr("apns.notification.errors", []string{}, 1) logger.Error("notification not sent", zap.String("device#token", device.APNSToken), zap.Int("response#status", res.StatusCode), @@ -309,34 +241,35 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) { ) // Delete device as notifications might have been disabled here - _ = nc.deviceRepo.Delete(ctx, device.APNSToken) + _ = nw.deviceRepo.Delete(ctx, device.APNSToken) } else { - _ = nc.statsd.Incr("apns.notification.sent", []string{}, 1) + _ = nw.statsd.Incr("apns.notification.sent", []string{}, 1) logger.Info("sent notification", zap.String("device#token", device.APNSToken)) } } } ev := fmt.Sprintf("Sent notification to /u/%s (x%d)", account.Username, msgs.Count) - _ = nc.statsd.SimpleEvent(ev, "") + _ = nw.statsd.SimpleEvent(ev, "") logger.Debug("finishing job") + return nil } -func (nc *notificationsConsumer) deleteAccount(ctx context.Context, account domain.Account) error { +func (nw *notificationsWorker) deleteAccount(ctx context.Context, account domain.Account) error { // Disassociate account from devices - devs, err := nc.deviceRepo.GetByAccountID(ctx, account.ID) + devs, err := nw.deviceRepo.GetByAccountID(ctx, account.ID) if err != nil { return err } for _, dev := range devs { - if err := nc.accountRepo.Disassociate(nc, &account, &dev); err != nil { + if err := nw.accountRepo.Disassociate(ctx, &account, &dev); err != nil { return err } } - return nc.accountRepo.Delete(nc, account.ID) + return nw.accountRepo.Delete(ctx, account.ID) } func payloadFromMessage(acct domain.Account, msg *reddit.Thing, badgeCount int) *payload.Payload { diff --git a/internal/worker/stuck_notifications.go b/internal/worker/stuck_notifications.go index 509377e..d707f5e 100644 --- a/internal/worker/stuck_notifications.go +++ b/internal/worker/stuck_notifications.go @@ -2,13 +2,10 @@ package worker import ( "context" - "fmt" "os" - "strconv" "time" "github.com/DataDog/datadog-go/statsd" - "github.com/adjust/rmq/v5" "github.com/go-redis/redis/v8" "github.com/jackc/pgx/v4/pgxpool" "go.uber.org/zap" @@ -19,21 +16,15 @@ import ( ) type stuckNotificationsWorker struct { - context.Context - - logger *zap.Logger - statsd *statsd.Client - db *pgxpool.Pool - redis *redis.Client - queue rmq.Connection - reddit *reddit.Client - - consumers int - + logger *zap.Logger + statsd *statsd.Client + db *pgxpool.Pool + redis *redis.Client + reddit *reddit.Client accountRepo domain.AccountRepository } -func NewStuckNotificationsWorker(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, queue rmq.Connection, consumers int) Worker { +func NewStuckNotificationsWorker(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, consumers int) Worker { reddit := reddit.NewClient( os.Getenv("REDDIT_CLIENT_ID"), os.Getenv("REDDIT_CLIENT_SECRET"), @@ -43,102 +34,42 @@ func NewStuckNotificationsWorker(ctx context.Context, logger *zap.Logger, statsd ) return &stuckNotificationsWorker{ - ctx, logger, statsd, db, redis, - queue, reddit, - consumers, - repository.NewPostgresAccount(db), } } -func (snw *stuckNotificationsWorker) Start() error { - queue, err := snw.queue.OpenQueue("stuck-notifications") - if err != nil { - return err - } - - snw.logger.Info("starting up stuck notifications worker", zap.Int("consumers", snw.consumers)) - - prefetchLimit := int64(snw.consumers * 2) - - if err := queue.StartConsuming(prefetchLimit, pollDuration); err != nil { - return err - } - - host, _ := os.Hostname() - - for i := 0; i < snw.consumers; i++ { - name := fmt.Sprintf("consumer %s-%d", host, i) - - consumer := NewStuckNotificationsConsumer(snw, i) - if _, err := queue.AddConsumer(name, consumer); err != nil { - return err - } - } - - return nil -} - -func (snw *stuckNotificationsWorker) Stop() { - <-snw.queue.StopAllConsuming() // wait for all Consume() calls to finish -} - -type stuckNotificationsConsumer struct { - *stuckNotificationsWorker - tag int -} - -func NewStuckNotificationsConsumer(snw *stuckNotificationsWorker, tag int) *stuckNotificationsConsumer { - return &stuckNotificationsConsumer{ - snw, - tag, - } -} - -func (snc *stuckNotificationsConsumer) Consume(delivery rmq.Delivery) { - ctx, cancel := context.WithCancel(snc) - defer cancel() - +func (snw *stuckNotificationsWorker) Process(ctx context.Context, args ...interface{}) error { now := time.Now() defer func() { elapsed := time.Now().Sub(now).Milliseconds() - _ = snc.statsd.Histogram("apollo.consumer.runtime", float64(elapsed), []string{"queue:stuck-notifications"}, 0.1) + _ = snw.statsd.Histogram("apollo.consumer.runtime", float64(elapsed), []string{"queue:stuck-notifications"}, 0.1) }() - id, err := strconv.ParseInt(delivery.Payload(), 10, 64) + id := args[0].(int64) + snw.logger.Debug("starting job", zap.Int64("account#id", id)) + + account, err := snw.accountRepo.GetByID(ctx, id) if err != nil { - snc.logger.Error("failed to parse account id from payload", zap.Error(err), zap.String("payload", delivery.Payload())) - - _ = delivery.Reject() - return - } - - snc.logger.Debug("starting job", zap.Int64("account#id", id)) - - defer func() { _ = delivery.Ack() }() - - account, err := snc.accountRepo.GetByID(ctx, id) - if err != nil { - snc.logger.Error("failed to fetch account from database", zap.Error(err), zap.Int64("account#id", id)) - return + snw.logger.Error("failed to fetch account from database", zap.Error(err), zap.Int64("account#id", id)) + return nil } if account.LastMessageID == "" { - snc.logger.Debug("account has no messages, bailing early", + snw.logger.Debug("account has no messages, bailing early", zap.Int64("account#id", id), zap.String("account#username", account.NormalizedUsername()), ) - return + return nil } - rac := snc.reddit.NewAuthenticatedClient(account.AccountID, account.RefreshToken, account.AccessToken) + rac := snw.reddit.NewAuthenticatedClient(account.AccountID, account.RefreshToken, account.AccessToken) - snc.logger.Debug("fetching last thing", + snw.logger.Debug("fetching last thing", zap.Int64("account#id", id), zap.String("account#username", account.NormalizedUsername()), ) @@ -147,7 +78,7 @@ func (snc *stuckNotificationsConsumer) Consume(delivery rmq.Delivery) { var things *reddit.ListingResponse if kind == "t4" { - snc.logger.Debug("checking last thing via inbox", + snw.logger.Debug("checking last thing via inbox", zap.Int64("account#id", id), zap.String("account#username", account.NormalizedUsername()), ) @@ -155,23 +86,23 @@ func (snc *stuckNotificationsConsumer) Consume(delivery rmq.Delivery) { things, err = rac.MessageInbox(ctx) if err != nil { if err != reddit.ErrRateLimited { - snc.logger.Error("failed to fetch last thing via inbox", + snw.logger.Error("failed to fetch last thing via inbox", zap.Error(err), zap.Int64("account#id", id), zap.String("account#username", account.NormalizedUsername()), ) } - return + return nil } } else { things, err = rac.AboutInfo(ctx, account.LastMessageID) if err != nil { - snc.logger.Error("failed to fetch last thing", + snw.logger.Error("failed to fetch last thing", zap.Error(err), zap.Int64("account#id", id), zap.String("account#username", account.NormalizedUsername()), ) - return + return nil } } @@ -186,17 +117,17 @@ func (snc *stuckNotificationsConsumer) Consume(delivery rmq.Delivery) { } if kind == "t4" { - return + return nil } sthings, err := rac.MessageInbox(ctx) if err != nil { - snc.logger.Error("failed to check inbox", + snw.logger.Error("failed to check inbox", zap.Error(err), zap.Int64("account#id", id), zap.String("account#username", account.NormalizedUsername()), ) - return + return nil } found := false @@ -207,7 +138,7 @@ func (snc *stuckNotificationsConsumer) Consume(delivery rmq.Delivery) { } if !found { - snc.logger.Debug("thing exists, but not on inbox, marking as deleted", + snw.logger.Debug("thing exists, but not on inbox, marking as deleted", zap.Int64("account#id", id), zap.String("account#username", account.NormalizedUsername()), zap.String("thing#id", account.LastMessageID), @@ -215,47 +146,47 @@ func (snc *stuckNotificationsConsumer) Consume(delivery rmq.Delivery) { break } - snc.logger.Debug("thing exists, bailing early", + snw.logger.Debug("thing exists, bailing early", zap.Int64("account#id", id), zap.String("account#username", account.NormalizedUsername()), zap.String("thing#id", account.LastMessageID), ) - return + return nil } } - snc.logger.Info("thing got deleted, resetting", + snw.logger.Info("thing got deleted, resetting", zap.Int64("account#id", id), zap.String("account#username", account.NormalizedUsername()), zap.String("thing#id", account.LastMessageID), ) if kind != "t4" { - snc.logger.Debug("getting message inbox to find last good thing", + snw.logger.Debug("getting message inbox to find last good thing", zap.Int64("account#id", id), zap.String("account#username", account.NormalizedUsername()), ) things, err = rac.MessageInbox(ctx) if err != nil { - snc.logger.Error("failed to check inbox", + snw.logger.Error("failed to check inbox", zap.Error(err), zap.Int64("account#id", id), zap.String("account#username", account.NormalizedUsername()), ) - return + return nil } } account.LastMessageID = "" - snc.logger.Debug("calculating last good thing", + snw.logger.Debug("calculating last good thing", zap.Int64("account#id", id), zap.String("account#username", account.NormalizedUsername()), ) for _, thing := range things.Children { if thing.IsDeleted() { - snc.logger.Debug("thing got deleted, checking next", + snw.logger.Debug("thing got deleted, checking next", zap.Int64("account#id", id), zap.String("account#username", account.NormalizedUsername()), zap.String("thing#id", thing.FullName()), @@ -267,17 +198,20 @@ func (snc *stuckNotificationsConsumer) Consume(delivery rmq.Delivery) { break } - snc.logger.Debug("updating last good thing", + snw.logger.Debug("updating last good thing", zap.Int64("account#id", id), zap.String("account#username", account.NormalizedUsername()), zap.String("thing#id", account.LastMessageID), ) - if err := snc.accountRepo.Update(ctx, &account); err != nil { - snc.logger.Error("failed to update account's last message id", + if err := snw.accountRepo.Update(ctx, &account); err != nil { + snw.logger.Error("failed to update account's last message id", zap.Error(err), zap.Int64("account#id", id), zap.String("account#username", account.NormalizedUsername()), ) + return err } + + return nil } diff --git a/internal/worker/subreddits.go b/internal/worker/subreddits.go index 03d005d..e1903ba 100644 --- a/internal/worker/subreddits.go +++ b/internal/worker/subreddits.go @@ -5,12 +5,10 @@ import ( "fmt" "math/rand" "os" - "strconv" "strings" "time" "github.com/DataDog/datadog-go/statsd" - "github.com/adjust/rmq/v5" "github.com/go-redis/redis/v8" "github.com/jackc/pgx/v4/pgxpool" "github.com/sideshow/apns2" @@ -24,17 +22,12 @@ import ( ) type subredditsWorker struct { - context.Context - logger *zap.Logger statsd *statsd.Client db *pgxpool.Pool redis *redis.Client - queue rmq.Connection reddit *reddit.Client - apns *token.Token - - consumers int + apns *apns2.Client accountRepo domain.AccountRepository deviceRepo domain.DeviceRepository @@ -47,7 +40,7 @@ const ( subredditNotificationBodyFormat = "r/%s: \u201c%s\u201d" ) -func NewSubredditsWorker(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, queue rmq.Connection, consumers int) Worker { +func NewSubredditsWorker(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, consumers int) Worker { reddit := reddit.NewClient( os.Getenv("REDDIT_CLIENT_ID"), os.Getenv("REDDIT_CLIENT_SECRET"), @@ -56,31 +49,28 @@ func NewSubredditsWorker(ctx context.Context, logger *zap.Logger, statsd *statsd consumers, ) - var apns *token.Token + var apns *apns2.Client { authKey, err := token.AuthKeyFromFile(os.Getenv("APPLE_KEY_PATH")) if err != nil { panic(err) } - apns = &token.Token{ + tok := &token.Token{ AuthKey: authKey, KeyID: os.Getenv("APPLE_KEY_ID"), TeamID: os.Getenv("APPLE_TEAM_ID"), } + apns = apns2.NewTokenClient(tok).Production() } return &subredditsWorker{ - ctx, logger, statsd, db, redis, - queue, reddit, apns, - consumers, - repository.NewPostgresAccount(db), repository.NewPostgresDevice(db), repository.NewPostgresSubreddit(db), @@ -88,92 +78,32 @@ func NewSubredditsWorker(ctx context.Context, logger *zap.Logger, statsd *statsd } } -func (sw *subredditsWorker) Start() error { - queue, err := sw.queue.OpenQueue("subreddits") +func (sw *subredditsWorker) Process(ctx context.Context, args ...interface{}) error { + id := args[0].(int64) + sw.logger.Debug("starting job", zap.Int64("subreddit#id", id)) + + subreddit, err := sw.subredditRepo.GetByID(ctx, id) if err != nil { - return err + sw.logger.Error("failed to fetch subreddit from database", zap.Error(err), zap.Int64("subreddit#id", id)) + return nil } - sw.logger.Info("starting up subreddits worker", zap.Int("consumers", sw.consumers)) - - prefetchLimit := int64(sw.consumers * 2) - - if err := queue.StartConsuming(prefetchLimit, pollDuration); err != nil { - return err - } - - host, _ := os.Hostname() - - for i := 0; i < sw.consumers; i++ { - name := fmt.Sprintf("consumer %s-%d", host, i) - - consumer := NewSubredditsConsumer(sw, i) - if _, err := queue.AddConsumer(name, consumer); err != nil { - return err - } - } - - return nil -} - -func (sw *subredditsWorker) Stop() { - <-sw.queue.StopAllConsuming() // wait for all Consume() calls to finish -} - -type subredditsConsumer struct { - *subredditsWorker - tag int - - apnsSandbox *apns2.Client - apnsProduction *apns2.Client -} - -func NewSubredditsConsumer(sw *subredditsWorker, tag int) *subredditsConsumer { - return &subredditsConsumer{ - sw, - tag, - apns2.NewTokenClient(sw.apns), - apns2.NewTokenClient(sw.apns).Production(), - } -} - -func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) { - ctx, cancel := context.WithCancel(sc) - defer cancel() - - id, err := strconv.ParseInt(delivery.Payload(), 10, 64) + watchers, err := sw.watcherRepo.GetBySubredditID(ctx, subreddit.ID) if err != nil { - sc.logger.Error("failed to parse subreddit id from payload", zap.Error(err), zap.String("payload", delivery.Payload())) - _ = delivery.Reject() - return - } - - sc.logger.Debug("starting job", zap.Int64("subreddit#id", id)) - - defer func() { _ = delivery.Ack() }() - - subreddit, err := sc.subredditRepo.GetByID(ctx, id) - if err != nil { - sc.logger.Error("failed to fetch subreddit from database", zap.Error(err), zap.Int64("subreddit#id", id)) - return - } - - watchers, err := sc.watcherRepo.GetBySubredditID(ctx, subreddit.ID) - if err != nil { - sc.logger.Error("failed to fetch watchers from database", + sw.logger.Error("failed to fetch watchers from database", zap.Error(err), zap.Int64("subreddit#id", id), zap.String("subreddit#name", subreddit.NormalizedName()), ) - return + return err } if len(watchers) == 0 { - sc.logger.Debug("no watchers for subreddit, bailing early", + sw.logger.Debug("no watchers for subreddit, bailing early", zap.Int64("subreddit#id", id), zap.String("subreddit#name", subreddit.NormalizedName()), ) - return + return nil } threshold := time.Now().Add(-24 * time.Hour) @@ -183,13 +113,13 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) { seenPosts := map[string]bool{} // Load 500 newest posts - sc.logger.Debug("loading up to 500 new posts", + sw.logger.Debug("loading up to 500 new posts", zap.Int64("subreddit#id", id), zap.String("subreddit#name", subreddit.NormalizedName()), ) for page := 0; page < 5; page++ { - sc.logger.Debug("loading new posts", + sw.logger.Debug("loading new posts", zap.Int64("subreddit#id", id), zap.String("subreddit#name", subreddit.NormalizedName()), zap.Int("page", page), @@ -198,7 +128,7 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) { i := rand.Intn(len(watchers)) watcher := watchers[i] - rac := sc.reddit.NewAuthenticatedClient(watcher.Account.AccountID, watcher.Account.RefreshToken, watcher.Account.AccessToken) + rac := sw.reddit.NewAuthenticatedClient(watcher.Account.AccountID, watcher.Account.RefreshToken, watcher.Account.AccessToken) sps, err := rac.SubredditNew(ctx, subreddit.Name, reddit.WithQuery("before", before), @@ -208,7 +138,7 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) { ) if err != nil { - sc.logger.Error("failed to fetch new posts", + sw.logger.Error("failed to fetch new posts", zap.Error(err), zap.Int64("subreddit#id", id), zap.String("subreddit#name", subreddit.NormalizedName()), @@ -217,7 +147,7 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) { continue } - sc.logger.Debug("loaded new posts", + sw.logger.Debug("loaded new posts", zap.Int64("subreddit#id", id), zap.String("subreddit#name", subreddit.NormalizedName()), zap.Int("page", page), @@ -247,7 +177,7 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) { } if finished { - sc.logger.Debug("reached date threshold", + sw.logger.Debug("reached date threshold", zap.Int64("subreddit#id", id), zap.String("subreddit#name", subreddit.NormalizedName()), zap.Int("page", page), @@ -257,7 +187,7 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) { } // Load hot posts - sc.logger.Debug("loading hot posts", + sw.logger.Debug("loading hot posts", zap.Int64("subreddit#id", id), zap.String("subreddit#name", subreddit.NormalizedName()), ) @@ -265,7 +195,7 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) { i := rand.Intn(len(watchers)) watcher := watchers[i] - rac := sc.reddit.NewAuthenticatedClient(watcher.Account.AccountID, watcher.Account.RefreshToken, watcher.Account.AccessToken) + rac := sw.reddit.NewAuthenticatedClient(watcher.Account.AccountID, watcher.Account.RefreshToken, watcher.Account.AccessToken) sps, err := rac.SubredditHot(ctx, subreddit.Name, reddit.WithQuery("limit", "100"), @@ -274,13 +204,13 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) { ) if err != nil { - sc.logger.Error("failed to fetch hot posts", + sw.logger.Error("failed to fetch hot posts", zap.Error(err), zap.Int64("subreddit#id", id), zap.String("subreddit#name", subreddit.NormalizedName()), ) } else { - sc.logger.Debug("loaded hot posts", + sw.logger.Debug("loaded hot posts", zap.Int64("subreddit#id", id), zap.String("subreddit#name", subreddit.NormalizedName()), zap.Int("count", sps.Count), @@ -298,7 +228,7 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) { } } - sc.logger.Debug("checking posts for watcher hits", + sw.logger.Debug("checking posts for watcher hits", zap.Int64("subreddit#id", id), zap.String("subreddit#name", subreddit.NormalizedName()), zap.Int("count", len(posts)), @@ -339,7 +269,7 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) { continue } - sc.logger.Debug("matched post", + sw.logger.Debug("matched post", zap.Int64("subreddit#id", id), zap.String("subreddit#name", subreddit.NormalizedName()), zap.Int64("watcher#id", watcher.ID), @@ -351,10 +281,10 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) { ) lockKey := fmt.Sprintf("watcher:%d:%s", watcher.DeviceID, post.ID) - notified, _ := sc.redis.Get(ctx, lockKey).Bool() + notified, _ := sw.redis.Get(ctx, lockKey).Bool() if notified { - sc.logger.Debug("already notified, skipping", + sw.logger.Debug("already notified, skipping", zap.Int64("subreddit#id", id), zap.String("subreddit#name", subreddit.NormalizedName()), zap.Int64("watcher#id", watcher.ID), @@ -363,30 +293,30 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) { continue } - if err := sc.watcherRepo.IncrementHits(ctx, watcher.ID); err != nil { - sc.logger.Error("could not increment hits", + if err := sw.watcherRepo.IncrementHits(ctx, watcher.ID); err != nil { + sw.logger.Error("could not increment hits", zap.Error(err), zap.Int64("subreddit#id", id), zap.String("subreddit#name", subreddit.NormalizedName()), zap.Int64("watcher#id", watcher.ID), ) - return + return err } - sc.logger.Debug("got a hit", + sw.logger.Debug("got a hit", zap.Int64("subreddit#id", id), zap.String("subreddit#name", subreddit.NormalizedName()), zap.Int64("watcher#id", watcher.ID), zap.String("post#id", post.ID), ) - sc.redis.SetEX(ctx, lockKey, true, 24*time.Hour) + sw.redis.SetEX(ctx, lockKey, true, 24*time.Hour) notifs = append(notifs, watcher) } if len(notifs) == 0 { continue } - sc.logger.Debug("got hits for post", + sw.logger.Debug("got hits for post", zap.Int64("subreddit#id", id), zap.String("subreddit#name", subreddit.NormalizedName()), zap.String("post#id", post.ID), @@ -407,24 +337,20 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) { notification.DeviceToken = watcher.Device.APNSToken notification.Payload = payload - client := sc.apnsProduction - if watcher.Device.Sandbox { - client = sc.apnsSandbox - } - - res, err := client.Push(notification) + res, err := sw.apns.Push(notification) if err != nil { - _ = sc.statsd.Incr("apns.notification.errors", []string{}, 1) - sc.logger.Error("failed to send notification", + _ = sw.statsd.Incr("apns.notification.errors", []string{}, 1) + sw.logger.Error("failed to send notification", zap.Error(err), zap.Int64("subreddit#id", id), zap.String("subreddit#name", subreddit.NormalizedName()), zap.String("post#id", post.ID), zap.String("apns", watcher.Device.APNSToken), ) + return err } else if !res.Sent() { - _ = sc.statsd.Incr("apns.notification.errors", []string{}, 1) - sc.logger.Error("notificaion not sent", + _ = sw.statsd.Incr("apns.notification.errors", []string{}, 1) + sw.logger.Error("notificaion not sent", zap.Int64("subreddit#id", id), zap.String("subreddit#name", subreddit.NormalizedName()), zap.String("post#id", post.ID), @@ -433,8 +359,8 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) { zap.String("response#reason", res.Reason), ) } else { - _ = sc.statsd.Incr("apns.notification.sent", []string{}, 1) - sc.logger.Info("sent notification", + _ = sw.statsd.Incr("apns.notification.sent", []string{}, 1) + sw.logger.Info("sent notification", zap.Int64("subreddit#id", id), zap.String("subreddit#name", subreddit.NormalizedName()), zap.String("post#id", post.ID), @@ -444,10 +370,12 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) { } } - sc.logger.Debug("finishing job", + sw.logger.Debug("finishing job", zap.Int64("subreddit#id", id), zap.String("subreddit#name", subreddit.NormalizedName()), ) + + return nil } func payloadFromPost(post *reddit.Thing) *payload.Payload { diff --git a/internal/worker/trending.go b/internal/worker/trending.go index 9bedd17..97b4f85 100644 --- a/internal/worker/trending.go +++ b/internal/worker/trending.go @@ -6,11 +6,9 @@ import ( "math/rand" "os" "sort" - "strconv" "time" "github.com/DataDog/datadog-go/statsd" - "github.com/adjust/rmq/v5" "github.com/go-redis/redis/v8" "github.com/jackc/pgx/v4/pgxpool" "github.com/sideshow/apns2" @@ -24,16 +22,11 @@ import ( ) type trendingWorker struct { - context.Context - logger *zap.Logger statsd *statsd.Client redis *redis.Client - queue rmq.Connection reddit *reddit.Client - apns *token.Token - - consumers int + apns *apns2.Client accountRepo domain.AccountRepository deviceRepo domain.DeviceRepository @@ -43,7 +36,7 @@ type trendingWorker struct { const trendingNotificationTitleFormat = "🔥 r/%s Trending" -func NewTrendingWorker(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, queue rmq.Connection, consumers int) Worker { +func NewTrendingWorker(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, consumers int) Worker { reddit := reddit.NewClient( os.Getenv("REDDIT_CLIENT_ID"), os.Getenv("REDDIT_CLIENT_SECRET"), @@ -52,29 +45,27 @@ func NewTrendingWorker(ctx context.Context, logger *zap.Logger, statsd *statsd.C consumers, ) - var apns *token.Token + var apns *apns2.Client { authKey, err := token.AuthKeyFromFile(os.Getenv("APPLE_KEY_PATH")) if err != nil { panic(err) } - apns = &token.Token{ + tok := &token.Token{ AuthKey: authKey, KeyID: os.Getenv("APPLE_KEY_ID"), TeamID: os.Getenv("APPLE_TEAM_ID"), } + apns = apns2.NewTokenClient(tok).Production() } return &trendingWorker{ - ctx, logger, statsd, redis, - queue, reddit, apns, - consumers, repository.NewPostgresAccount(db), repository.NewPostgresDevice(db), @@ -83,130 +74,70 @@ func NewTrendingWorker(ctx context.Context, logger *zap.Logger, statsd *statsd.C } } -func (tw *trendingWorker) Start() error { - queue, err := tw.queue.OpenQueue("trending") +func (tw *trendingWorker) Process(ctx context.Context, args ...interface{}) error { + id := args[0].(int64) + tw.logger.Debug("starting job", zap.Int64("subreddit#id", id)) + + subreddit, err := tw.subredditRepo.GetByID(ctx, id) if err != nil { - return err + tw.logger.Error("failed to fetch subreddit from database", zap.Error(err), zap.Int64("subreddit#id", id)) + return nil } - tw.logger.Info("starting up trending subreddits worker", zap.Int("consumers", tw.consumers)) - - prefetchLimit := int64(tw.consumers * 2) - - if err := queue.StartConsuming(prefetchLimit, pollDuration); err != nil { - return err - } - - host, _ := os.Hostname() - - for i := 0; i < tw.consumers; i++ { - name := fmt.Sprintf("consumer %s-%d", host, i) - - consumer := NewTrendingConsumer(tw, i) - if _, err := queue.AddConsumer(name, consumer); err != nil { - return err - } - } - - return nil -} - -func (tw *trendingWorker) Stop() { - <-tw.queue.StopAllConsuming() // wait for all Consume() calls to finish -} - -type trendingConsumer struct { - *trendingWorker - tag int - - apnsSandbox *apns2.Client - apnsProduction *apns2.Client -} - -func NewTrendingConsumer(tw *trendingWorker, tag int) *trendingConsumer { - return &trendingConsumer{ - tw, - tag, - apns2.NewTokenClient(tw.apns), - apns2.NewTokenClient(tw.apns).Production(), - } -} - -func (tc *trendingConsumer) Consume(delivery rmq.Delivery) { - ctx, cancel := context.WithCancel(tc) - defer cancel() - - id, err := strconv.ParseInt(delivery.Payload(), 10, 64) + watchers, err := tw.watcherRepo.GetByTrendingSubredditID(ctx, subreddit.ID) if err != nil { - tc.logger.Error("failed to parse subreddit id from payload", zap.Error(err), zap.String("payload", delivery.Payload())) - _ = delivery.Reject() - return - } - - tc.logger.Debug("starting job", zap.Int64("subreddit#id", id)) - - defer func() { _ = delivery.Ack() }() - - subreddit, err := tc.subredditRepo.GetByID(ctx, id) - if err != nil { - tc.logger.Error("failed to fetch subreddit from database", zap.Error(err), zap.Int64("subreddit#id", id)) - return - } - - watchers, err := tc.watcherRepo.GetByTrendingSubredditID(ctx, subreddit.ID) - if err != nil { - tc.logger.Error("failed to fetch watchers from database", + tw.logger.Error("failed to fetch watchers from database", zap.Error(err), zap.Int64("subreddit#id", id), zap.String("subreddit#name", subreddit.NormalizedName()), ) - return + return err } if len(watchers) == 0 { - tc.logger.Debug("no watchers for subreddit, bailing early", + tw.logger.Debug("no watchers for subreddit, bailing early", zap.Int64("subreddit#id", id), zap.String("subreddit#name", subreddit.NormalizedName()), ) - return + return nil } // Grab last month's top posts so we calculate a trending average i := rand.Intn(len(watchers)) watcher := watchers[i] - rac := tc.reddit.NewAuthenticatedClient(watcher.Account.AccountID, watcher.Account.RefreshToken, watcher.Account.AccessToken) + rac := tw.reddit.NewAuthenticatedClient(watcher.Account.AccountID, watcher.Account.RefreshToken, watcher.Account.AccessToken) tps, err := rac.SubredditTop(ctx, subreddit.Name, reddit.WithQuery("t", "week"), reddit.WithQuery("show", "all"), reddit.WithQuery("limit", "25")) if err != nil { - tc.logger.Error("failed to fetch weeks's top posts", + tw.logger.Error("failed to fetch weeks's top posts", zap.Error(err), zap.Int64("subreddit#id", id), zap.String("subreddit#name", subreddit.NormalizedName()), ) - return + return err } - tc.logger.Debug("loaded weeks's top posts", + tw.logger.Debug("loaded weeks's top posts", zap.Int64("subreddit#id", id), zap.String("subreddit#name", subreddit.NormalizedName()), zap.Int("count", tps.Count), ) if tps.Count == 0 { - tc.logger.Debug("no top posts, bailing early", + tw.logger.Debug("no top posts, bailing early", zap.Int64("subreddit#id", id), zap.String("subreddit#name", subreddit.NormalizedName()), ) - return + return nil } if tps.Count < 20 { - tc.logger.Debug("no top posts, bailing early", + tw.logger.Debug("no top posts, bailing early", zap.Int64("subreddit#id", id), zap.String("subreddit#name", subreddit.NormalizedName()), zap.Int("count", tps.Count), ) - return + return nil } sort.SliceStable(tps.Children, func(i, j int) bool { @@ -215,7 +146,7 @@ func (tc *trendingConsumer) Consume(delivery rmq.Delivery) { middlePost := tps.Count / 2 medianScore := tps.Children[middlePost].Score - tc.logger.Debug("calculated median score", + tw.logger.Debug("calculated median score", zap.Int64("subreddit#id", id), zap.String("subreddit#name", subreddit.NormalizedName()), zap.Int64("score", medianScore), @@ -224,18 +155,18 @@ func (tc *trendingConsumer) Consume(delivery rmq.Delivery) { // Grab hot posts and filter out anything that's > 2 days old i = rand.Intn(len(watchers)) watcher = watchers[i] - rac = tc.reddit.NewAuthenticatedClient(watcher.Account.AccountID, watcher.Account.RefreshToken, watcher.Account.AccessToken) + rac = tw.reddit.NewAuthenticatedClient(watcher.Account.AccountID, watcher.Account.RefreshToken, watcher.Account.AccessToken) hps, err := rac.SubredditHot(ctx, subreddit.Name, reddit.WithQuery("show", "all"), reddit.WithQuery("always_show_media", "1")) if err != nil { - tc.logger.Error("failed to fetch hot posts", + tw.logger.Error("failed to fetch hot posts", zap.Error(err), zap.Int64("subreddit#id", id), zap.String("subreddit#name", subreddit.NormalizedName()), ) - return + return err } - tc.logger.Debug("loaded hot posts", + tw.logger.Debug("loaded hot posts", zap.Int64("subreddit#id", id), zap.String("subreddit#name", subreddit.NormalizedName()), zap.Int("count", hps.Count), @@ -263,10 +194,10 @@ func (tc *trendingConsumer) Consume(delivery rmq.Delivery) { } lockKey := fmt.Sprintf("watcher:trending:%d:%s", watcher.DeviceID, post.ID) - notified, _ := tc.redis.Get(ctx, lockKey).Bool() + notified, _ := tw.redis.Get(ctx, lockKey).Bool() if notified { - tc.logger.Debug("already notified, skipping", + tw.logger.Debug("already notified, skipping", zap.Int64("subreddit#id", id), zap.String("subreddit#name", subreddit.NormalizedName()), zap.Int64("watcher#id", watcher.ID), @@ -275,29 +206,24 @@ func (tc *trendingConsumer) Consume(delivery rmq.Delivery) { continue } - tc.redis.SetEX(ctx, lockKey, true, 48*time.Hour) + tw.redis.SetEX(ctx, lockKey, true, 48*time.Hour) - if err := tc.watcherRepo.IncrementHits(ctx, watcher.ID); err != nil { - tc.logger.Error("could not increment hits", + if err := tw.watcherRepo.IncrementHits(ctx, watcher.ID); err != nil { + tw.logger.Error("could not increment hits", zap.Error(err), zap.Int64("subreddit#id", id), zap.String("subreddit#name", subreddit.NormalizedName()), zap.Int64("watcher#id", watcher.ID), ) - return + return err } notification.DeviceToken = watcher.Device.APNSToken - client := tc.apnsProduction - if watcher.Device.Sandbox { - client = tc.apnsSandbox - } - - res, err := client.Push(notification) + res, err := tw.apns.Push(notification) if err != nil { - _ = tc.statsd.Incr("apns.notification.errors", []string{}, 1) - tc.logger.Error("failed to send notification", + _ = tw.statsd.Incr("apns.notification.errors", []string{}, 1) + tw.logger.Error("failed to send notification", zap.Error(err), zap.Int64("subreddit#id", id), zap.String("subreddit#name", subreddit.NormalizedName()), @@ -305,8 +231,8 @@ func (tc *trendingConsumer) Consume(delivery rmq.Delivery) { zap.String("apns", watcher.Device.APNSToken), ) } else if !res.Sent() { - _ = tc.statsd.Incr("apns.notification.errors", []string{}, 1) - tc.logger.Error("notification not sent", + _ = tw.statsd.Incr("apns.notification.errors", []string{}, 1) + tw.logger.Error("notification not sent", zap.Int64("subreddit#id", id), zap.String("subreddit#name", subreddit.NormalizedName()), zap.String("post#id", post.ID), @@ -315,8 +241,8 @@ func (tc *trendingConsumer) Consume(delivery rmq.Delivery) { zap.String("response#reason", res.Reason), ) } else { - _ = tc.statsd.Incr("apns.notification.sent", []string{}, 1) - tc.logger.Info("sent notification", + _ = tw.statsd.Incr("apns.notification.sent", []string{}, 1) + tw.logger.Info("sent notification", zap.Int64("subreddit#id", id), zap.String("subreddit#name", subreddit.NormalizedName()), zap.String("post#id", post.ID), @@ -327,10 +253,12 @@ func (tc *trendingConsumer) Consume(delivery rmq.Delivery) { } } - tc.logger.Debug("finishing job", + tw.logger.Debug("finishing job", zap.Int64("subreddit#id", id), zap.String("subreddit#name", subreddit.NormalizedName()), ) + + return nil } func payloadFromTrendingPost(post *reddit.Thing) *payload.Payload { diff --git a/internal/worker/users.go b/internal/worker/users.go deleted file mode 100644 index 29431a1..0000000 --- a/internal/worker/users.go +++ /dev/null @@ -1,332 +0,0 @@ -package worker - -import ( - "context" - "fmt" - "math/rand" - "os" - "strconv" - "strings" - - "github.com/DataDog/datadog-go/statsd" - "github.com/adjust/rmq/v5" - "github.com/go-redis/redis/v8" - "github.com/jackc/pgx/v4/pgxpool" - "github.com/sideshow/apns2" - "github.com/sideshow/apns2/payload" - "github.com/sideshow/apns2/token" - "go.uber.org/zap" - - "github.com/christianselig/apollo-backend/internal/domain" - "github.com/christianselig/apollo-backend/internal/reddit" - "github.com/christianselig/apollo-backend/internal/repository" -) - -type usersWorker struct { - context.Context - - logger *zap.Logger - statsd *statsd.Client - db *pgxpool.Pool - redis *redis.Client - queue rmq.Connection - reddit *reddit.Client - apns *token.Token - - consumers int - - accountRepo domain.AccountRepository - deviceRepo domain.DeviceRepository - userRepo domain.UserRepository - watcherRepo domain.WatcherRepository -} - -const userNotificationTitleFormat = "👨\u200d🚀 %s" - -func NewUsersWorker(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, queue rmq.Connection, consumers int) Worker { - reddit := reddit.NewClient( - os.Getenv("REDDIT_CLIENT_ID"), - os.Getenv("REDDIT_CLIENT_SECRET"), - statsd, - redis, - consumers, - ) - - var apns *token.Token - { - authKey, err := token.AuthKeyFromFile(os.Getenv("APPLE_KEY_PATH")) - if err != nil { - panic(err) - } - - apns = &token.Token{ - AuthKey: authKey, - KeyID: os.Getenv("APPLE_KEY_ID"), - TeamID: os.Getenv("APPLE_TEAM_ID"), - } - } - - return &usersWorker{ - ctx, - logger, - statsd, - db, - redis, - queue, - reddit, - apns, - consumers, - - repository.NewPostgresAccount(db), - repository.NewPostgresDevice(db), - repository.NewPostgresUser(db), - repository.NewPostgresWatcher(db), - } -} - -func (uw *usersWorker) Start() error { - queue, err := uw.queue.OpenQueue("users") - if err != nil { - return err - } - - uw.logger.Info("starting up subreddits worker", zap.Int("consumers", uw.consumers)) - - prefetchLimit := int64(uw.consumers * 2) - - if err := queue.StartConsuming(prefetchLimit, pollDuration); err != nil { - return err - } - - host, _ := os.Hostname() - - for i := 0; i < uw.consumers; i++ { - name := fmt.Sprintf("consumer %s-%d", host, i) - - consumer := NewUsersConsumer(uw, i) - if _, err := queue.AddConsumer(name, consumer); err != nil { - return err - } - } - - return nil -} - -func (uw *usersWorker) Stop() { - <-uw.queue.StopAllConsuming() // wait for all Consume() calls to finish -} - -type usersConsumer struct { - *usersWorker - tag int - - apnsSandbox *apns2.Client - apnsProduction *apns2.Client -} - -func NewUsersConsumer(uw *usersWorker, tag int) *usersConsumer { - return &usersConsumer{ - uw, - tag, - apns2.NewTokenClient(uw.apns), - apns2.NewTokenClient(uw.apns).Production(), - } -} - -func (uc *usersConsumer) Consume(delivery rmq.Delivery) { - ctx, cancel := context.WithCancel(uc) - defer cancel() - - id, err := strconv.ParseInt(delivery.Payload(), 10, 64) - if err != nil { - uc.logger.Error("failed to parse subreddit id from payload", zap.Error(err), zap.String("payload", delivery.Payload())) - _ = delivery.Reject() - return - } - - uc.logger.Debug("starting job", zap.Int64("subreddit#id", id)) - - defer func() { _ = delivery.Ack() }() - - user, err := uc.userRepo.GetByID(ctx, id) - if err != nil { - uc.logger.Error("failed to fetch user from database", zap.Error(err), zap.Int64("subreddit#id", id)) - return - } - - watchers, err := uc.watcherRepo.GetByUserID(ctx, user.ID) - if err != nil { - uc.logger.Error("failed to fetch watchers from database", - zap.Error(err), - zap.Int64("user#id", id), - zap.String("user#name", user.NormalizedName()), - ) - return - } - - if len(watchers) == 0 { - uc.logger.Debug("no watchers for user, bailing early", - zap.Int64("user#id", id), - zap.String("user#name", user.NormalizedName()), - ) - return - } - - // Load 25 newest posts - i := rand.Intn(len(watchers)) - watcher := watchers[i] - - acc, _ := uc.accountRepo.GetByID(ctx, watcher.AccountID) - rac := uc.reddit.NewAuthenticatedClient(acc.AccountID, acc.RefreshToken, acc.AccessToken) - - ru, err := rac.UserAbout(ctx, user.Name) - if err != nil { - uc.logger.Error("failed to fetch user details", - zap.Error(err), - zap.Int64("user#id", id), - zap.String("user#name", user.NormalizedName()), - ) - return - } - - if !ru.AcceptFollowers { - uc.logger.Info("user disabled followers, removing", - zap.Int64("user#id", id), - zap.String("user#name", user.NormalizedName()), - ) - - if err := uc.watcherRepo.DeleteByTypeAndWatcheeID(ctx, domain.UserWatcher, user.ID); err != nil { - uc.logger.Error("failed to remove watchers for user who disallows followers", - zap.Error(err), - zap.Int64("user#id", id), - zap.String("user#name", user.NormalizedName()), - ) - return - } - - if err := uc.userRepo.Delete(ctx, user.ID); err != nil { - uc.logger.Error("failed to remove user", - zap.Error(err), - zap.Int64("user#id", id), - zap.String("user#name", user.NormalizedName()), - ) - return - } - } - - posts, err := rac.UserPosts(ctx, user.Name) - if err != nil { - uc.logger.Error("failed to fetch user activity", - zap.Error(err), - zap.Int64("user#id", id), - zap.String("user#name", user.NormalizedName()), - ) - return - } - - for _, post := range posts.Children { - lowcaseSubreddit := strings.ToLower(post.Subreddit) - - if post.SubredditType == "private" { - continue - } - - notifs := []domain.Watcher{} - - for _, watcher := range watchers { - // Make sure we only alert on activities created after the search - if watcher.CreatedAt.After(post.CreatedAt) { - continue - } - - if watcher.LastNotifiedAt.After(post.CreatedAt) { - continue - } - - if watcher.Subreddit != "" && lowcaseSubreddit != watcher.Subreddit { - continue - } - - notifs = append(notifs, watcher) - } - - if len(notifs) == 0 { - continue - } - - payload := payloadFromUserPost(post) - - notification := &apns2.Notification{} - notification.Topic = "com.christianselig.Apollo" - - for _, watcher := range notifs { - if err := uc.watcherRepo.IncrementHits(ctx, watcher.ID); err != nil { - uc.logger.Error("failed to increment watcher hits", - zap.Error(err), - zap.Int64("user#id", id), - zap.String("user#name", user.NormalizedName()), - zap.Int64("watcher#id", watcher.ID), - ) - return - } - - device, _ := uc.deviceRepo.GetByID(ctx, watcher.DeviceID) - - title := fmt.Sprintf(userNotificationTitleFormat, watcher.Label) - payload.AlertTitle(title) - - notification.Payload = payload - notification.DeviceToken = device.APNSToken - - client := uc.apnsProduction - if device.Sandbox { - client = uc.apnsSandbox - } - - res, err := client.Push(notification) - if err != nil || !res.Sent() { - _ = uc.statsd.Incr("apns.notification.errors", []string{}, 1) - uc.logger.Error("failed to send notification", - zap.Error(err), - zap.Int64("user#id", id), - zap.String("user#name", user.NormalizedName()), - zap.String("post#id", post.ID), - zap.String("apns", watcher.Device.APNSToken), - zap.Int("response#status", res.StatusCode), - zap.String("response#reason", res.Reason), - ) - } else { - _ = uc.statsd.Incr("apns.notification.sent", []string{}, 1) - uc.logger.Info("sent notification", - zap.Int64("user#id", id), - zap.String("user#name", user.NormalizedName()), - zap.String("post#id", post.ID), - zap.String("device#token", watcher.Device.APNSToken), - ) - } - } - } - - uc.logger.Debug("finishing job", - zap.Int64("user#id", id), - zap.String("user#name", user.NormalizedName()), - ) -} - -func payloadFromUserPost(post *reddit.Thing) *payload.Payload { - payload := payload. - NewPayload(). - AlertBody(post.Title). - AlertSubtitle(post.Author). - AlertSummaryArg(post.Author). - Category("user-watch"). - Custom("post_title", post.Title). - Custom("post_id", post.ID). - Custom("subreddit", post.Subreddit). - Custom("author", post.Author). - Custom("post_age", post.CreatedAt). - MutableContent(). - Sound("traloop.wav") - - return payload -} diff --git a/internal/worker/worker.go b/internal/worker/worker.go index b514823..3c418ee 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -5,7 +5,6 @@ import ( "time" "github.com/DataDog/datadog-go/statsd" - "github.com/adjust/rmq/v5" "github.com/go-redis/redis/v8" "github.com/jackc/pgx/v4/pgxpool" "go.uber.org/zap" @@ -13,8 +12,7 @@ import ( const pollDuration = 100 * time.Millisecond -type NewWorkerFn func(context.Context, *zap.Logger, *statsd.Client, *pgxpool.Pool, *redis.Client, rmq.Connection, int) Worker +type NewWorkerFn func(context.Context, *zap.Logger, *statsd.Client, *pgxpool.Pool, *redis.Client, int) Worker type Worker interface { - Start() error - Stop() + Process(ctx context.Context, args ...interface{}) error } diff --git a/render.yaml b/render.yaml index 8f7621a..2c5bbf9 100644 --- a/render.yaml +++ b/render.yaml @@ -70,7 +70,7 @@ services: disk: name: faktory-data mountPath: /var/lib/faktory - sizeGB: 20 + sizeGB: 10 # API - type: web