migrate from rmq to faktory

This commit is contained in:
Andre Medeiros 2022-11-02 00:49:20 -04:00
parent 58cb0460d9
commit d36d4f9b01
13 changed files with 300 additions and 1100 deletions

4
go.mod
View file

@ -4,8 +4,9 @@ go 1.18
require (
github.com/DataDog/datadog-go v4.8.3+incompatible
github.com/adjust/rmq/v5 v5.0.1
github.com/bugsnag/bugsnag-go/v2 v2.2.0
github.com/contribsys/faktory v1.6.2
github.com/contribsys/faktory_worker_go v1.6.0
github.com/dustin/go-humanize v1.0.0
github.com/go-co-op/gocron v1.17.1
github.com/go-ozzo/ozzo-validation/v4 v4.3.0
@ -52,5 +53,6 @@ require (
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect
golang.org/x/text v0.3.7 // indirect
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

33
go.sum
View file

@ -11,8 +11,6 @@ github.com/Microsoft/go-winio v0.5.0/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpz
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/StackExchange/wmi v1.2.1/go.mod h1:rcmrprowKIVzvc+NUiLncP2uuArMWLCbu9SBzvHz7e8=
github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g=
github.com/adjust/rmq/v5 v5.0.1 h1:3d8IjgB6P5TzkCC8dF+ddzgaM+qHdK/6hKnZ8Jmt1ms=
github.com/adjust/rmq/v5 v5.0.1/go.mod h1:dLrg3gSOWYcXgEifgF8T+kttf3T38Ru5EPC7sTvKghI=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20201120081800-1786d5ef83d4/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
@ -34,7 +32,6 @@ github.com/bugsnag/panicwrap v1.3.4/go.mod h1:D/8v3kj0zr8ZAKg1AQ6crr+5VwKN5eIywR
github.com/cenkalti/backoff/v4 v4.1.1/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
@ -44,6 +41,10 @@ github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWH
github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I=
github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ=
github.com/codegangsta/negroni v1.0.0/go.mod h1:v0y3T5G7Y1UlFfyxFn/QLRU4a2EuNau2iZY63YTKWo0=
github.com/contribsys/faktory v1.6.2 h1:HuqJI9ZEeInN2nJg10WRy8zPpxNwVIZgACbex7wQG1A=
github.com/contribsys/faktory v1.6.2/go.mod h1:R8+inlM1rq3GzyG8iZUL7qhfNfXGIgcbQRvTHmSyuUI=
github.com/contribsys/faktory_worker_go v1.6.0 h1:ov69BLHL62i/wRLJwvuj5UphwgjMOINRCGW3KzrKOjk=
github.com/contribsys/faktory_worker_go v1.6.0/go.mod h1:XMNGn3sBJdqFGfTH4SkmYkMovhdkq5cDJj36wowfbNY=
github.com/coreos/go-oidc/v3 v3.2.0/go.mod h1:rEJ/idjfUyfkBit1eI1fvyr+64/g9dcKpAm8MJMesvo=
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
@ -67,9 +68,7 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m
github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-chi/chi v4.1.2+incompatible/go.mod h1:eB3wogJHnLi3x/kFX2A+IbTBlXxmMeXJVKy9tTv1XzQ=
github.com/go-co-op/gocron v1.17.1 h1:oEu3xGNVn9IGukN3JPzOsfaBoTGYmUVHtR9d1cv1cq8=
@ -83,7 +82,6 @@ github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiU
github.com/go-ole/go-ole v1.2.6-0.20210915003542-8b1f7f90f6b1/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-ozzo/ozzo-validation/v4 v4.3.0 h1:byhDUpfEwjsVQb1vBunvIjh2BHQ9ead57VkAEY4V+Es=
github.com/go-ozzo/ozzo-validation/v4 v4.3.0/go.mod h1:2NKgrcHl3z6cJs+3Oo940FPRiTzuqKbvfrL2RxCj6Ew=
github.com/go-redis/redis/v8 v8.3.2/go.mod h1:jszGxBCez8QA1HWSmQxJO9Y82kNibbUmeYhKWrBejTU=
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
@ -123,7 +121,6 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gops v0.3.22/go.mod h1:7diIdLsqpCihPSX3fQagksT/Ku/y4RL9LHTlKyEUDl8=
@ -145,7 +142,6 @@ github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ
github.com/heroku/rollrus v0.2.0/go.mod h1:B3MwEcr9nmf4xj0Sr5l9eSht7wLKMa1C+9ajgAU79ek=
github.com/heroku/x v0.0.55 h1:LSXseirdcQaVobauVkRLbN1VnxVmRQgRABrDA1Cz2Q8=
github.com/heroku/x v0.0.55/go.mod h1:YZxbWdDeSewnf/CDZM2UXCZZPU9JZfObfms5FT3P8NA=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/hydrogen18/memlistener v0.0.0-20141126152155-54553eb933fb/go.mod h1:qEIFzExnS6016fRpRfxrExeVn2gbClQA99gQhnIcdhE=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/inconshreveable/mousetrap v1.0.1 h1:U3uMjPSQEBMNp1lFxmllqCPM6P5u/Xq7Pgzkat/bFNc=
@ -234,16 +230,9 @@ github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope
github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
github.com/onsi/ginkgo v1.14.2/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/onsi/gomega v1.10.3/go.mod h1:V9xEwhxec5O8UDM77eCW8vLymOMltsqPVYWrpDsH8xc=
github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
@ -319,7 +308,6 @@ github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxt
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
go.opencensus.io v0.22.1/go.mod h1:Ap50jQcDJrx6rB6VgeeFPtuPIf3wMRvRfrfYDO6+BmA=
go.opentelemetry.io/otel v0.13.0/go.mod h1:dlSNewoRYikTkotEnxdmuBHgzT+k/idJSfDv/FxEnOY=
go.opentelemetry.io/otel v1.0.0-RC1/go.mod h1:x9tRa9HK4hSSq7jf2TKbqFbtt58/TGk0f9XiEYISI1I=
go.opentelemetry.io/otel v1.0.0-RC3/go.mod h1:Ka5j3ua8tZs4Rkq4Ex3hwgBgOchyPVq5S6P2lz//nKQ=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.22.0/go.mod h1:gIp6+vQxqmh6Vd/mucqnsaFpOuVycQAS/BBXMKzJk0w=
@ -384,7 +372,6 @@ golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKG
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181106065722-10aee1819953/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@ -397,9 +384,7 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200505041828-1ed23360d12c/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220403103023-749bd193bc2b h1:vI32FkLJNAWtGD4BwkThwEy6XS7ZLLMHkSkYfF8M0W0=
@ -417,7 +402,6 @@ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cO
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@ -429,16 +413,11 @@ golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -537,18 +516,14 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/inconshreveable/log15.v2 v2.0.0-20180818164646-67afb5ed74ec/go.mod h1:aPpfJ7XW+gOuirDoZ8gHhLh3kZ1B08FtV2bbmy7Jv3s=
gopkg.in/ini.v1 v1.42.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
gopkg.in/square/go-jose.v2 v2.5.1/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View file

@ -11,7 +11,7 @@ import (
"time"
"github.com/DataDog/datadog-go/statsd"
"github.com/adjust/rmq/v5"
faktory "github.com/contribsys/faktory/client"
"github.com/go-co-op/gocron"
"github.com/go-redis/redis/v8"
"github.com/jackc/pgx/v4/pgxpool"
@ -52,7 +52,7 @@ func SchedulerCmd(ctx context.Context) *cobra.Command {
}
defer redis.Close()
queue, err := cmdutil.NewQueueClient(logger, redis, "worker")
fc, err := cmdutil.NewFaktoryClient(logger)
if err != nil {
return err
}
@ -63,47 +63,15 @@ func SchedulerCmd(ctx context.Context) *cobra.Command {
return err
}
notifQueue, err := queue.OpenQueue("notifications")
if err != nil {
return err
}
subredditQueue, err := queue.OpenQueue("subreddits")
if err != nil {
return err
}
trendingQueue, err := queue.OpenQueue("trending")
if err != nil {
return err
}
userQueue, err := queue.OpenQueue("users")
if err != nil {
return err
}
stuckNotificationsQueue, err := queue.OpenQueue("stuck-notifications")
if err != nil {
return err
}
liveActivitiesQueue, err := queue.OpenQueue("live-activities")
if err != nil {
return err
}
s := gocron.NewScheduler(time.UTC)
s.SetMaxConcurrentJobs(8, gocron.WaitMode)
eaj, _ := s.Every(5).Seconds().Do(func() { enqueueAccounts(ctx, logger, statsd, db, redis, luaSha, notifQueue) })
eaj, _ := s.Every(5).Seconds().Do(func() { enqueueAccounts(ctx, logger, statsd, db, redis, luaSha, fc) })
eaj.SingletonMode()
_, _ = s.Every(5).Seconds().Do(func() { enqueueSubreddits(ctx, logger, statsd, db, []rmq.Queue{subredditQueue, trendingQueue}) })
_, _ = s.Every(5).Seconds().Do(func() { enqueueUsers(ctx, logger, statsd, db, userQueue) })
_, _ = s.Every(5).Seconds().Do(func() { enqueueLiveActivities(ctx, logger, db, redis, luaSha, liveActivitiesQueue) })
_, _ = s.Every(5).Seconds().Do(func() { cleanQueues(logger, queue) })
_, _ = s.Every(5).Seconds().Do(func() { enqueueStuckAccounts(ctx, logger, statsd, db, stuckNotificationsQueue) })
_, _ = s.Every(5).Seconds().Do(func() { enqueueSubreddits(ctx, logger, statsd, db, fc) })
_, _ = s.Every(5).Seconds().Do(func() { enqueueLiveActivities(ctx, logger, db, redis, luaSha, fc) })
_, _ = s.Every(5).Seconds().Do(func() { enqueueStuckAccounts(ctx, logger, statsd, db, fc) })
_, _ = s.Every(1).Minute().Do(func() { reportStats(ctx, logger, statsd, db) })
//_, _ = s.Every(1).Minute().Do(func() { pruneAccounts(ctx, logger, db) })
//_, _ = s.Every(1).Minute().Do(func() { pruneDevices(ctx, logger, db) })
@ -144,7 +112,7 @@ func evalScript(ctx context.Context, redis *redis.Client) (string, error) {
return redis.ScriptLoad(ctx, lua).Result()
}
func enqueueLiveActivities(ctx context.Context, logger *zap.Logger, pool *pgxpool.Pool, redisConn *redis.Client, luaSha string, queue rmq.Queue) {
func enqueueLiveActivities(ctx context.Context, logger *zap.Logger, pool *pgxpool.Pool, redisConn *redis.Client, luaSha string, fc *faktory.Client) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@ -193,7 +161,12 @@ func enqueueLiveActivities(ctx context.Context, logger *zap.Logger, pool *pgxpoo
logger.Debug("enqueueing live activity batch", zap.Int("count", len(batch)), zap.Time("start", now))
if err = queue.Publish(batch...); err != nil {
jobs := make([]*faktory.Job, len(batch))
for i, tok := range batch {
jobs[i] = faktory.NewJob("LiveActivityJob", tok)
}
if _, err = fc.PushBulk(jobs); err != nil {
logger.Error("failed to enqueue live activity batch", zap.Error(err))
}
}
@ -240,19 +213,6 @@ func pruneDevices(ctx context.Context, logger *zap.Logger, pool *pgxpool.Pool) {
}
}
func cleanQueues(logger *zap.Logger, jobsConn rmq.Connection) {
cleaner := rmq.NewCleaner(jobsConn)
count, err := cleaner.Clean()
if err != nil {
logger.Error("failed to clean jobs from queues", zap.Error(err))
return
}
if count > 0 {
logger.Info("returned jobs to queues", zap.Int64("count", count))
}
}
func reportStats(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@ -280,62 +240,7 @@ func reportStats(ctx context.Context, logger *zap.Logger, statsd *statsd.Client,
}
}
func enqueueUsers(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, queue rmq.Queue) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
now := time.Now()
next := now.Add(domain.NotificationCheckInterval)
ids := []int64{}
defer func() {
tags := []string{"queue:users"}
_ = statsd.Histogram("apollo.queue.enqueued", float64(len(ids)), tags, 1)
_ = statsd.Histogram("apollo.queue.runtime", float64(time.Since(now).Milliseconds()), tags, 1)
}()
stmt := `
UPDATE users
SET next_check_at = $2
WHERE id IN (
SELECT id
FROM users
WHERE next_check_at < $1
ORDER BY next_check_at
FOR UPDATE SKIP LOCKED
LIMIT 100
)
RETURNING users.id`
rows, err := pool.Query(ctx, stmt, now, next)
if err != nil {
logger.Error("failed to fetch batch of users", zap.Error(err))
return
}
for rows.Next() {
var id int64
_ = rows.Scan(&id)
ids = append(ids, id)
}
rows.Close()
if len(ids) == 0 {
return
}
logger.Debug("enqueueing user batch", zap.Int("count", len(ids)), zap.Time("start", now))
batchIds := make([]string, len(ids))
for i, id := range ids {
batchIds[i] = strconv.FormatInt(id, 10)
}
if err = queue.Publish(batchIds...); err != nil {
logger.Error("failed to enqueue user batch", zap.Error(err))
}
}
func enqueueSubreddits(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, queues []rmq.Queue) {
func enqueueSubreddits(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, fc *faktory.Client) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@ -385,22 +290,24 @@ func enqueueSubreddits(ctx context.Context, logger *zap.Logger, statsd *statsd.C
batchIds[i] = strconv.FormatInt(id, 10)
}
for _, queue := range queues {
if err = queue.Publish(batchIds...); err != nil {
logger.Error("failed to enqueue subreddit batch", zap.Error(err))
}
jobs := make([]*faktory.Job, len(batchIds)*2)
for i, id := range batchIds {
jobs[i*2] = faktory.NewJob("SubredditWatcherJob", id)
jobs[i*2+1] = faktory.NewJob("SubredditTrendingJob", id)
}
if _, err := fc.PushBulk(jobs); err != nil {
logger.Error("failed to enqueue subreddit batch", zap.Error(err))
}
}
func enqueueStuckAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, queue rmq.Queue) {
func enqueueStuckAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, fc *faktory.Client) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
now := time.Now()
next := now.Add(domain.StuckNotificationCheckInterval)
ids := []int64{}
ids := []string{}
defer func() {
tags := []string{"queue:stuck-accounts"}
@ -419,7 +326,7 @@ func enqueueStuckAccounts(ctx context.Context, logger *zap.Logger, statsd *stats
FOR UPDATE SKIP LOCKED
LIMIT 500
)
RETURNING accounts.id`
RETURNING accounts.reddit_account_id`
rows, err := pool.Query(ctx, stmt, now, next)
if err != nil {
logger.Error("failed to fetch accounts", zap.Error(err))
@ -427,7 +334,7 @@ func enqueueStuckAccounts(ctx context.Context, logger *zap.Logger, statsd *stats
}
for rows.Next() {
var id int64
var id string
_ = rows.Scan(&id)
ids = append(ids, id)
}
@ -439,17 +346,17 @@ func enqueueStuckAccounts(ctx context.Context, logger *zap.Logger, statsd *stats
logger.Debug("enqueueing stuck account batch", zap.Int("count", len(ids)), zap.Time("start", now))
batchIds := make([]string, len(ids))
jobs := make([]*faktory.Job, len(ids))
for i, id := range ids {
batchIds[i] = strconv.FormatInt(id, 10)
jobs[i] = faktory.NewJob("StuckNotificationsJob", id)
}
if err = queue.Publish(batchIds...); err != nil {
if _, err = fc.PushBulk(jobs); err != nil {
logger.Error("failed to enqueue stuck account batch", zap.Error(err))
}
}
func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, redisConn *redis.Client, luaSha string, queue rmq.Queue) {
func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, redisConn *redis.Client, luaSha string, fc *faktory.Client) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@ -520,8 +427,13 @@ func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Cli
return
}
if err = queue.Publish(unlocked...); err != nil {
logger.Error("failed to enqueue account batch", zap.Error(err))
jobs := make([]*faktory.Job, len(unlocked))
for i, id := range unlocked {
jobs[i] = faktory.NewJob("NotificationCheckJob", id)
}
if _, err = fc.PushBulk(jobs); err != nil {
logger.Error("failed to enqueue account batch", zap.Error(err))
}
}(i, ctx)
}

View file

@ -5,6 +5,7 @@ import (
"fmt"
"runtime"
faktoryworker "github.com/contribsys/faktory_worker_go"
"github.com/spf13/cobra"
"github.com/christianselig/apollo-backend/internal/cmdutil"
@ -13,12 +14,11 @@ import (
var (
queues = map[string]worker.NewWorkerFn{
"live-activities": worker.NewLiveActivitiesWorker,
"notifications": worker.NewNotificationsWorker,
"stuck-notifications": worker.NewStuckNotificationsWorker,
"subreddits": worker.NewSubredditsWorker,
"trending": worker.NewTrendingWorker,
"users": worker.NewUsersWorker,
"LiveActivityJob": worker.NewLiveActivitiesWorker,
"NotificationCheckJob": worker.NewNotificationsWorker,
"StuckNotificationsJob": worker.NewStuckNotificationsWorker,
"SubredditWatcherJob": worker.NewSubredditsWorker,
"SubredditTrendingJob": worker.NewTrendingWorker,
}
)
@ -63,26 +63,16 @@ func WorkerCmd(ctx context.Context) *cobra.Command {
}
defer redis.Close()
queue, err := cmdutil.NewQueueClient(logger, redis, "worker")
if err != nil {
return err
}
workerFn, ok := queues[queueID]
if !ok {
return fmt.Errorf("invalid queue: %s", queueID)
return fmt.Errorf("queue does not exist: %s", queueID)
}
worker := workerFn(ctx, logger, statsd, db, redis, consumers)
worker := workerFn(ctx, logger, statsd, db, redis, queue, consumers)
if err := worker.Start(); err != nil {
return err
}
<-ctx.Done()
worker.Stop()
return nil
mgr := faktoryworker.NewManager()
mgr.Concurrency = consumers
mgr.Register(queueID, worker.Process)
return mgr.RunWithContext(ctx)
},
}

View file

@ -7,7 +7,7 @@ import (
"time"
"github.com/DataDog/datadog-go/statsd"
"github.com/adjust/rmq/v5"
faktory "github.com/contribsys/faktory/client"
"github.com/go-redis/redis/v8"
"github.com/jackc/pgx/v4/pgxpool"
"go.uber.org/zap"
@ -74,13 +74,6 @@ func NewDatabasePool(ctx context.Context, maxConns int) (*pgxpool.Pool, error) {
return pgxpool.ConnectConfig(ctx, config)
}
func NewQueueClient(logger *zap.Logger, conn *redis.Client, identifier string) (rmq.Connection, error) {
errChan := make(chan error, 10)
go func() {
for err := range errChan {
logger.Error("error occurred within queue", zap.Error(err))
}
}()
return rmq.OpenConnectionWithRedisClient(identifier, conn, errChan)
func NewFaktoryClient(logger *zap.Logger) (*faktory.Client, error) {
return faktory.Open()
}

View file

@ -9,7 +9,6 @@ import (
"time"
"github.com/DataDog/datadog-go/statsd"
"github.com/adjust/rmq/v5"
"github.com/go-redis/redis/v8"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/sideshow/apns2"
@ -32,22 +31,16 @@ type DynamicIslandNotification struct {
}
type liveActivitiesWorker struct {
context.Context
logger *zap.Logger
statsd *statsd.Client
db *pgxpool.Pool
redis *redis.Client
queue rmq.Connection
reddit *reddit.Client
apns *token.Token
consumers int
logger *zap.Logger
statsd *statsd.Client
db *pgxpool.Pool
redis *redis.Client
reddit *reddit.Client
apns *apns2.Client
liveActivityRepo domain.LiveActivityRepository
}
func NewLiveActivitiesWorker(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, queue rmq.Connection, consumers int) Worker {
func NewLiveActivitiesWorker(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, consumers int) Worker {
reddit := reddit.NewClient(
os.Getenv("REDDIT_CLIENT_ID"),
os.Getenv("REDDIT_CLIENT_SECRET"),
@ -56,123 +49,64 @@ func NewLiveActivitiesWorker(ctx context.Context, logger *zap.Logger, statsd *st
consumers,
)
var apns *token.Token
var apns *apns2.Client
{
authKey, err := token.AuthKeyFromFile(os.Getenv("APPLE_KEY_PATH"))
if err != nil {
panic(err)
}
apns = &token.Token{
tok := &token.Token{
AuthKey: authKey,
KeyID: os.Getenv("APPLE_KEY_ID"),
TeamID: os.Getenv("APPLE_TEAM_ID"),
}
apns = apns2.NewTokenClient(tok).Production()
}
return &liveActivitiesWorker{
ctx,
logger,
statsd,
db,
redis,
queue,
reddit,
apns,
consumers,
repository.NewPostgresLiveActivity(db),
}
}
func (law *liveActivitiesWorker) Start() error {
queue, err := law.queue.OpenQueue("live-activities")
if err != nil {
return err
}
law.logger.Info("starting up live activities worker", zap.Int("consumers", law.consumers))
prefetchLimit := int64(law.consumers * 4)
if err := queue.StartConsuming(prefetchLimit, pollDuration); err != nil {
return err
}
host, _ := os.Hostname()
for i := 0; i < law.consumers; i++ {
name := fmt.Sprintf("consumer %s-%d", host, i)
consumer := NewLiveActivitiesConsumer(law, i)
if _, err := queue.AddConsumer(name, consumer); err != nil {
return err
}
}
return nil
}
func (law *liveActivitiesWorker) Stop() {
<-law.queue.StopAllConsuming() // wait for all Consume() calls to finish
}
type liveActivitiesConsumer struct {
*liveActivitiesWorker
tag int
apns *apns2.Client
}
func NewLiveActivitiesConsumer(law *liveActivitiesWorker, tag int) *liveActivitiesConsumer {
return &liveActivitiesConsumer{
law,
tag,
apns2.NewTokenClient(law.apns).Production(),
}
}
func (lac *liveActivitiesConsumer) Consume(delivery rmq.Delivery) {
ctx, cancel := context.WithCancel(lac)
defer cancel()
func (law *liveActivitiesWorker) Process(ctx context.Context, args ...interface{}) error {
now := time.Now()
defer func() {
elapsed := time.Now().Sub(now).Milliseconds()
_ = lac.statsd.Histogram("apollo.consumer.runtime", float64(elapsed), []string{"queue:live_activities"}, 0.1)
_ = law.statsd.Histogram("apollo.consumer.runtime", float64(elapsed), []string{"queue:live_activities"}, 0.1)
}()
at := delivery.Payload()
at := args[0].(string)
key := fmt.Sprintf("locks:live-activities:%s", at)
// Measure queue latency
ttl := lac.redis.PTTL(ctx, key).Val()
ttl := law.redis.PTTL(ctx, key).Val()
age := (domain.NotificationCheckTimeout - ttl)
_ = lac.statsd.Histogram("apollo.dequeue.latency", float64(age.Milliseconds()), []string{"queue:live_activities"}, 0.1)
_ = law.statsd.Histogram("apollo.dequeue.latency", float64(age.Milliseconds()), []string{"queue:live_activities"}, 0.1)
defer func() {
if err := lac.redis.Del(ctx, key).Err(); err != nil {
lac.logger.Error("failed to remove account lock", zap.Error(err), zap.String("key", key))
if err := law.redis.Del(ctx, key).Err(); err != nil {
law.logger.Error("failed to remove account lock", zap.Error(err), zap.String("key", key))
}
}()
lac.logger.Debug("starting job", zap.String("live_activity#apns_token", at))
law.logger.Debug("starting job", zap.String("live_activity#apns_token", at))
defer func() {
if err := delivery.Ack(); err != nil {
lac.logger.Error("failed to acknowledge message", zap.Error(err), zap.String("live_activity#apns_token", at))
}
}()
la, err := lac.liveActivityRepo.Get(ctx, at)
la, err := law.liveActivityRepo.Get(ctx, at)
if err != nil {
lac.logger.Error("failed to get live activity", zap.Error(err), zap.String("live_activity#apns_token", at))
return
law.logger.Error("failed to get live activity", zap.Error(err), zap.String("live_activity#apns_token", at))
return err
}
rac := lac.reddit.NewAuthenticatedClient(la.RedditAccountID, la.RefreshToken, la.AccessToken)
rac := law.reddit.NewAuthenticatedClient(la.RedditAccountID, la.RefreshToken, la.AccessToken)
if la.TokenExpiresAt.Before(now.Add(5 * time.Minute)) {
lac.logger.Debug("refreshing reddit token",
law.logger.Debug("refreshing reddit token",
zap.String("live_activity#apns_token", at),
zap.String("reddit#id", la.RedditAccountID),
zap.String("reddit#access_token", rac.ObfuscatedAccessToken()),
@ -181,7 +115,7 @@ func (lac *liveActivitiesConsumer) Consume(delivery rmq.Delivery) {
tokens, err := rac.RefreshTokens(ctx)
if err != nil {
lac.logger.Error("failed to refresh reddit tokens",
law.logger.Error("failed to refresh reddit tokens",
zap.Error(err),
zap.String("live_activity#apns_token", at),
zap.String("reddit#id", la.RedditAccountID),
@ -189,26 +123,28 @@ func (lac *liveActivitiesConsumer) Consume(delivery rmq.Delivery) {
zap.String("reddit#refresh_token", rac.ObfuscatedRefreshToken()),
)
if err == reddit.ErrOauthRevoked {
_ = lac.liveActivityRepo.Delete(ctx, at)
_ = law.liveActivityRepo.Delete(ctx, at)
return nil
}
return
return err
}
// Update account
la.AccessToken = tokens.AccessToken
la.RefreshToken = tokens.RefreshToken
la.TokenExpiresAt = now.Add(tokens.Expiry)
_ = lac.liveActivityRepo.Update(ctx, &la)
_ = law.liveActivityRepo.Update(ctx, &la)
// Refresh client
rac = lac.reddit.NewAuthenticatedClient(la.RedditAccountID, tokens.RefreshToken, tokens.AccessToken)
rac = law.reddit.NewAuthenticatedClient(la.RedditAccountID, tokens.RefreshToken, tokens.AccessToken)
}
lac.logger.Debug("fetching latest comments", zap.String("live_activity#apns_token", at))
law.logger.Debug("fetching latest comments", zap.String("live_activity#apns_token", at))
tr, err := rac.TopLevelComments(ctx, la.Subreddit, la.ThreadID)
if err != nil {
lac.logger.Error("failed to fetch latest comments",
law.logger.Error("failed to fetch latest comments",
zap.Error(err),
zap.String("live_activity#apns_token", at),
zap.String("reddit#id", la.RedditAccountID),
@ -216,14 +152,16 @@ func (lac *liveActivitiesConsumer) Consume(delivery rmq.Delivery) {
zap.String("reddit#refresh_token", rac.ObfuscatedRefreshToken()),
)
if err == reddit.ErrOauthRevoked {
_ = lac.liveActivityRepo.Delete(ctx, at)
_ = law.liveActivityRepo.Delete(ctx, at)
return nil
}
return
return err
}
if len(tr.Children) == 0 && la.ExpiresAt.After(now) {
lac.logger.Debug("no comments found", zap.String("live_activity#apns_token", at))
return
law.logger.Debug("no comments found", zap.String("live_activity#apns_token", at))
return nil
}
// Filter out comments in the last minute
@ -247,8 +185,8 @@ func (lac *liveActivitiesConsumer) Consume(delivery rmq.Delivery) {
}
if len(candidates) == 0 && la.ExpiresAt.After(now) {
lac.logger.Debug("no new comments found", zap.String("live_activity#apns_token", at))
return
law.logger.Debug("no new comments found", zap.String("live_activity#apns_token", at))
return nil
}
sort.Slice(candidates, func(i, j int) bool {
@ -291,20 +229,20 @@ func (lac *liveActivitiesConsumer) Consume(delivery rmq.Delivery) {
Payload: bb,
}
res, err := lac.apns.PushWithContext(ctx, notification)
res, err := law.apns.PushWithContext(ctx, notification)
if err != nil {
_ = lac.statsd.Incr("apns.live_activities.errors", []string{}, 1)
lac.logger.Error("failed to send notification",
_ = law.statsd.Incr("apns.live_activities.errors", []string{}, 1)
law.logger.Error("failed to send notification",
zap.Error(err),
zap.String("live_activity#apns_token", at),
zap.Bool("live_activity#sandbox", la.Sandbox),
zap.String("notification#type", ev),
)
_ = lac.liveActivityRepo.Delete(ctx, at)
_ = law.liveActivityRepo.Delete(ctx, at)
} else if !res.Sent() {
_ = lac.statsd.Incr("apns.live_activities.errors", []string{}, 1)
lac.logger.Error("notification not sent",
_ = law.statsd.Incr("apns.live_activities.errors", []string{}, 1)
law.logger.Error("notification not sent",
zap.String("live_activity#apns_token", at),
zap.Bool("live_activity#sandbox", la.Sandbox),
zap.String("notification#type", ev),
@ -312,10 +250,10 @@ func (lac *liveActivitiesConsumer) Consume(delivery rmq.Delivery) {
zap.String("response#reason", res.Reason),
)
_ = lac.liveActivityRepo.Delete(ctx, at)
_ = law.liveActivityRepo.Delete(ctx, at)
} else {
_ = lac.statsd.Incr("apns.notification.sent", []string{}, 1)
lac.logger.Debug("sent notification",
_ = law.statsd.Incr("apns.notification.sent", []string{}, 1)
law.logger.Debug("sent notification",
zap.String("live_activity#apns_token", at),
zap.Bool("live_activity#sandbox", la.Sandbox),
zap.String("notification#type", ev),
@ -323,11 +261,12 @@ func (lac *liveActivitiesConsumer) Consume(delivery rmq.Delivery) {
}
if la.ExpiresAt.Before(now) {
lac.logger.Debug("live activity expired, deleting", zap.String("live_activity#apns_token", at))
_ = lac.liveActivityRepo.Delete(ctx, at)
law.logger.Debug("live activity expired, deleting", zap.String("live_activity#apns_token", at))
_ = law.liveActivityRepo.Delete(ctx, at)
}
lac.logger.Debug("finishing job",
law.logger.Debug("finishing job",
zap.String("live_activity#apns_token", at),
)
return nil
}

View file

@ -7,7 +7,6 @@ import (
"time"
"github.com/DataDog/datadog-go/statsd"
"github.com/adjust/rmq/v5"
"github.com/go-redis/redis/v8"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/sideshow/apns2"
@ -32,23 +31,17 @@ const (
var notificationTags = []string{"queue:notifications"}
type notificationsWorker struct {
context.Context
logger *zap.Logger
statsd *statsd.Client
db *pgxpool.Pool
redis *redis.Client
queue rmq.Connection
reddit *reddit.Client
apns *token.Token
consumers int
logger *zap.Logger
statsd *statsd.Client
db *pgxpool.Pool
redis *redis.Client
reddit *reddit.Client
apns *apns2.Client
accountRepo domain.AccountRepository
deviceRepo domain.DeviceRepository
}
func NewNotificationsWorker(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, queue rmq.Connection, consumers int) Worker {
func NewNotificationsWorker(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, consumers int) Worker {
reddit := reddit.NewClient(
os.Getenv("REDDIT_CLIENT_ID"),
os.Getenv("REDDIT_CLIENT_SECRET"),
@ -57,123 +50,65 @@ func NewNotificationsWorker(ctx context.Context, logger *zap.Logger, statsd *sta
consumers,
)
var apns *token.Token
var apns *apns2.Client
{
authKey, err := token.AuthKeyFromFile(os.Getenv("APPLE_KEY_PATH"))
if err != nil {
panic(err)
}
apns = &token.Token{
tok := &token.Token{
AuthKey: authKey,
KeyID: os.Getenv("APPLE_KEY_ID"),
TeamID: os.Getenv("APPLE_TEAM_ID"),
}
apns = apns2.NewTokenClient(tok).Production()
}
return &notificationsWorker{
ctx,
logger,
statsd,
db,
redis,
queue,
reddit,
apns,
consumers,
repository.NewPostgresAccount(db),
repository.NewPostgresDevice(db),
}
}
func (nw *notificationsWorker) Start() error {
queue, err := nw.queue.OpenQueue("notifications")
if err != nil {
return err
}
nw.logger.Info("starting up notifications worker", zap.Int("consumers", nw.consumers))
prefetchLimit := int64(nw.consumers * 20)
if err := queue.StartConsuming(prefetchLimit, pollDuration); err != nil {
return err
}
host, _ := os.Hostname()
for i := 0; i < nw.consumers; i++ {
name := fmt.Sprintf("consumer %s-%d", host, i)
consumer := NewNotificationsConsumer(nw, i)
if _, err := queue.AddConsumer(name, consumer); err != nil {
return err
}
}
return nil
}
func (nw *notificationsWorker) Stop() {
<-nw.queue.StopAllConsuming() // wait for all Consume() calls to finish
}
type notificationsConsumer struct {
*notificationsWorker
tag int
apns *apns2.Client
}
func NewNotificationsConsumer(nw *notificationsWorker, tag int) *notificationsConsumer {
return &notificationsConsumer{
nw,
tag,
apns2.NewTokenClient(nw.apns).Production(),
}
}
func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
ctx, cancel := context.WithCancel(nc)
defer cancel()
func (nw *notificationsWorker) Process(ctx context.Context, args ...interface{}) error {
now := time.Now()
defer func() {
elapsed := time.Now().Sub(now).Milliseconds()
_ = nc.statsd.Histogram("apollo.consumer.runtime", float64(elapsed), notificationTags, 0.1)
_ = nc.statsd.Incr("apollo.consumer.executions", notificationTags, 0.1)
_ = nw.statsd.Histogram("apollo.consumer.runtime", float64(elapsed), notificationTags, 0.1)
_ = nw.statsd.Incr("apollo.consumer.executions", notificationTags, 0.1)
}()
id := delivery.Payload()
logger := nc.logger.With(zap.String("account#reddit_account_id", id))
id := args[0].(string)
logger := nw.logger.With(zap.String("account#reddit_account_id", id))
// Measure queue latency
key := fmt.Sprintf("locks:accounts:%s", id)
ttl := nc.redis.PTTL(ctx, key).Val()
ttl := nw.redis.PTTL(ctx, key).Val()
age := (domain.NotificationCheckTimeout - ttl)
_ = nc.statsd.Histogram("apollo.dequeue.latency", float64(age.Milliseconds()), notificationTags, 0.1)
_ = nw.statsd.Histogram("apollo.dequeue.latency", float64(age.Milliseconds()), notificationTags, 0.1)
defer func() {
if err := nc.redis.Del(ctx, key).Err(); err != nil {
if err := nw.redis.Del(ctx, key).Err(); err != nil {
logger.Error("failed to remove account lock", zap.Error(err), zap.String("key", key))
}
}()
logger.Debug("starting job")
defer func() {
if err := delivery.Ack(); err != nil {
logger.Error("failed to acknowledge message", zap.Error(err))
}
}()
account, err := nc.accountRepo.GetByRedditID(ctx, id)
account, err := nw.accountRepo.GetByRedditID(ctx, id)
if err != nil {
logger.Info("account not found, exiting", zap.Error(err))
return
return nil
}
rac := nc.reddit.NewAuthenticatedClient(account.AccountID, account.RefreshToken, account.AccessToken)
rac := nw.reddit.NewAuthenticatedClient(account.AccountID, account.RefreshToken, account.AccessToken)
logger = logger.With(
zap.String("account#username", account.NormalizedUsername()),
zap.String("account#access_token", rac.ObfuscatedAccessToken()),
@ -186,24 +121,25 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
if err != nil {
if err != reddit.ErrOauthRevoked {
logger.Error("failed to refresh reddit tokens", zap.Error(err))
return
return err
}
if err = nc.deleteAccount(ctx, account); err != nil {
if err = nw.deleteAccount(ctx, account); err != nil {
logger.Error("failed to remove revoked account", zap.Error(err))
return err
}
return
return nil
}
// Update account
account.AccessToken = tokens.AccessToken
account.RefreshToken = tokens.RefreshToken
account.TokenExpiresAt = now.Add(tokens.Expiry)
_ = nc.accountRepo.Update(ctx, &account)
_ = nw.accountRepo.Update(ctx, &account)
// Refresh client
rac = nc.reddit.NewAuthenticatedClient(account.AccountID, tokens.RefreshToken, tokens.AccessToken)
rac = nw.reddit.NewAuthenticatedClient(account.AccountID, tokens.RefreshToken, tokens.AccessToken)
logger = logger.With(
zap.String("account#access_token", rac.ObfuscatedAccessToken()),
zap.String("account#refresh_token", rac.ObfuscatedRefreshToken()),
@ -220,24 +156,20 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
if err != nil {
switch err {
case reddit.ErrTimeout, reddit.ErrRateLimited: // Don't log timeouts or rate limits
break
case reddit.ErrOauthRevoked:
if err = nc.deleteAccount(ctx, account); err != nil {
logger.Error("failed to remove revoked account", zap.Error(err))
} else {
logger.Info("removed revoked account")
}
_ = nw.deleteAccount(ctx, account)
logger.Info("removed revoked account")
return nil
default:
logger.Error("failed to fetch message inbox", zap.Error(err))
return err
}
return
}
// Figure out where we stand
if msgs.Count == 0 {
logger.Debug("no new messages, bailing early")
return
return nil
}
logger.Debug("fetched messages", zap.Int("count", msgs.Count))
@ -245,7 +177,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
for _, msg := range msgs.Children {
if !msg.IsDeleted() {
account.LastMessageID = msg.FullName()
_ = nc.accountRepo.Update(ctx, &account)
_ = nw.accountRepo.Update(ctx, &account)
break
}
}
@ -255,19 +187,19 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
logger.Debug("populating first message id to prevent spamming")
account.CheckCount = 1
_ = nc.accountRepo.Update(ctx, &account)
return
_ = nw.accountRepo.Update(ctx, &account)
return nil
}
devices, err := nc.deviceRepo.GetInboxNotifiableByAccountID(ctx, account.ID)
devices, err := nw.deviceRepo.GetInboxNotifiableByAccountID(ctx, account.ID)
if err != nil {
logger.Error("failed to fetch account devices", zap.Error(err))
return
return nil
}
if len(devices) == 0 {
logger.Debug("no notifiable devices, bailing early")
return
return nil
}
// Iterate backwards so we notify from older to newer
@ -281,7 +213,7 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
// Latency is the time difference between the appearence of the new message and the
// time we notified at.
latency := now.Sub(msg.CreatedAt)
_ = nc.statsd.Histogram("apollo.queue.delay", float64(latency.Milliseconds()), []string{}, 1.0)
_ = nw.statsd.Histogram("apollo.queue.delay", float64(latency.Milliseconds()), []string{}, 1.0)
notification := &apns2.Notification{}
notification.Topic = "com.christianselig.Apollo"
@ -290,18 +222,18 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
for _, device := range devices {
notification.DeviceToken = device.APNSToken
res, err := nc.apns.PushWithContext(ctx, notification)
res, err := nw.apns.PushWithContext(ctx, notification)
if err != nil {
_ = nc.statsd.Incr("apns.notification.errors", []string{}, 1)
_ = nw.statsd.Incr("apns.notification.errors", []string{}, 1)
logger.Error("failed to send notification",
zap.Error(err),
zap.String("device#token", device.APNSToken),
)
// Delete device as notifications might have been disabled here
_ = nc.deviceRepo.Delete(ctx, device.APNSToken)
_ = nw.deviceRepo.Delete(ctx, device.APNSToken)
} else if !res.Sent() {
_ = nc.statsd.Incr("apns.notification.errors", []string{}, 1)
_ = nw.statsd.Incr("apns.notification.errors", []string{}, 1)
logger.Error("notification not sent",
zap.String("device#token", device.APNSToken),
zap.Int("response#status", res.StatusCode),
@ -309,34 +241,35 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
)
// Delete device as notifications might have been disabled here
_ = nc.deviceRepo.Delete(ctx, device.APNSToken)
_ = nw.deviceRepo.Delete(ctx, device.APNSToken)
} else {
_ = nc.statsd.Incr("apns.notification.sent", []string{}, 1)
_ = nw.statsd.Incr("apns.notification.sent", []string{}, 1)
logger.Info("sent notification", zap.String("device#token", device.APNSToken))
}
}
}
ev := fmt.Sprintf("Sent notification to /u/%s (x%d)", account.Username, msgs.Count)
_ = nc.statsd.SimpleEvent(ev, "")
_ = nw.statsd.SimpleEvent(ev, "")
logger.Debug("finishing job")
return nil
}
func (nc *notificationsConsumer) deleteAccount(ctx context.Context, account domain.Account) error {
func (nw *notificationsWorker) deleteAccount(ctx context.Context, account domain.Account) error {
// Disassociate account from devices
devs, err := nc.deviceRepo.GetByAccountID(ctx, account.ID)
devs, err := nw.deviceRepo.GetByAccountID(ctx, account.ID)
if err != nil {
return err
}
for _, dev := range devs {
if err := nc.accountRepo.Disassociate(nc, &account, &dev); err != nil {
if err := nw.accountRepo.Disassociate(ctx, &account, &dev); err != nil {
return err
}
}
return nc.accountRepo.Delete(nc, account.ID)
return nw.accountRepo.Delete(ctx, account.ID)
}
func payloadFromMessage(acct domain.Account, msg *reddit.Thing, badgeCount int) *payload.Payload {

View file

@ -2,13 +2,10 @@ package worker
import (
"context"
"fmt"
"os"
"strconv"
"time"
"github.com/DataDog/datadog-go/statsd"
"github.com/adjust/rmq/v5"
"github.com/go-redis/redis/v8"
"github.com/jackc/pgx/v4/pgxpool"
"go.uber.org/zap"
@ -19,21 +16,15 @@ import (
)
type stuckNotificationsWorker struct {
context.Context
logger *zap.Logger
statsd *statsd.Client
db *pgxpool.Pool
redis *redis.Client
queue rmq.Connection
reddit *reddit.Client
consumers int
logger *zap.Logger
statsd *statsd.Client
db *pgxpool.Pool
redis *redis.Client
reddit *reddit.Client
accountRepo domain.AccountRepository
}
func NewStuckNotificationsWorker(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, queue rmq.Connection, consumers int) Worker {
func NewStuckNotificationsWorker(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, consumers int) Worker {
reddit := reddit.NewClient(
os.Getenv("REDDIT_CLIENT_ID"),
os.Getenv("REDDIT_CLIENT_SECRET"),
@ -43,102 +34,42 @@ func NewStuckNotificationsWorker(ctx context.Context, logger *zap.Logger, statsd
)
return &stuckNotificationsWorker{
ctx,
logger,
statsd,
db,
redis,
queue,
reddit,
consumers,
repository.NewPostgresAccount(db),
}
}
func (snw *stuckNotificationsWorker) Start() error {
queue, err := snw.queue.OpenQueue("stuck-notifications")
if err != nil {
return err
}
snw.logger.Info("starting up stuck notifications worker", zap.Int("consumers", snw.consumers))
prefetchLimit := int64(snw.consumers * 2)
if err := queue.StartConsuming(prefetchLimit, pollDuration); err != nil {
return err
}
host, _ := os.Hostname()
for i := 0; i < snw.consumers; i++ {
name := fmt.Sprintf("consumer %s-%d", host, i)
consumer := NewStuckNotificationsConsumer(snw, i)
if _, err := queue.AddConsumer(name, consumer); err != nil {
return err
}
}
return nil
}
func (snw *stuckNotificationsWorker) Stop() {
<-snw.queue.StopAllConsuming() // wait for all Consume() calls to finish
}
type stuckNotificationsConsumer struct {
*stuckNotificationsWorker
tag int
}
func NewStuckNotificationsConsumer(snw *stuckNotificationsWorker, tag int) *stuckNotificationsConsumer {
return &stuckNotificationsConsumer{
snw,
tag,
}
}
func (snc *stuckNotificationsConsumer) Consume(delivery rmq.Delivery) {
ctx, cancel := context.WithCancel(snc)
defer cancel()
func (snw *stuckNotificationsWorker) Process(ctx context.Context, args ...interface{}) error {
now := time.Now()
defer func() {
elapsed := time.Now().Sub(now).Milliseconds()
_ = snc.statsd.Histogram("apollo.consumer.runtime", float64(elapsed), []string{"queue:stuck-notifications"}, 0.1)
_ = snw.statsd.Histogram("apollo.consumer.runtime", float64(elapsed), []string{"queue:stuck-notifications"}, 0.1)
}()
id, err := strconv.ParseInt(delivery.Payload(), 10, 64)
id := args[0].(int64)
snw.logger.Debug("starting job", zap.Int64("account#id", id))
account, err := snw.accountRepo.GetByID(ctx, id)
if err != nil {
snc.logger.Error("failed to parse account id from payload", zap.Error(err), zap.String("payload", delivery.Payload()))
_ = delivery.Reject()
return
}
snc.logger.Debug("starting job", zap.Int64("account#id", id))
defer func() { _ = delivery.Ack() }()
account, err := snc.accountRepo.GetByID(ctx, id)
if err != nil {
snc.logger.Error("failed to fetch account from database", zap.Error(err), zap.Int64("account#id", id))
return
snw.logger.Error("failed to fetch account from database", zap.Error(err), zap.Int64("account#id", id))
return nil
}
if account.LastMessageID == "" {
snc.logger.Debug("account has no messages, bailing early",
snw.logger.Debug("account has no messages, bailing early",
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
)
return
return nil
}
rac := snc.reddit.NewAuthenticatedClient(account.AccountID, account.RefreshToken, account.AccessToken)
rac := snw.reddit.NewAuthenticatedClient(account.AccountID, account.RefreshToken, account.AccessToken)
snc.logger.Debug("fetching last thing",
snw.logger.Debug("fetching last thing",
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
)
@ -147,7 +78,7 @@ func (snc *stuckNotificationsConsumer) Consume(delivery rmq.Delivery) {
var things *reddit.ListingResponse
if kind == "t4" {
snc.logger.Debug("checking last thing via inbox",
snw.logger.Debug("checking last thing via inbox",
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
)
@ -155,23 +86,23 @@ func (snc *stuckNotificationsConsumer) Consume(delivery rmq.Delivery) {
things, err = rac.MessageInbox(ctx)
if err != nil {
if err != reddit.ErrRateLimited {
snc.logger.Error("failed to fetch last thing via inbox",
snw.logger.Error("failed to fetch last thing via inbox",
zap.Error(err),
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
)
}
return
return nil
}
} else {
things, err = rac.AboutInfo(ctx, account.LastMessageID)
if err != nil {
snc.logger.Error("failed to fetch last thing",
snw.logger.Error("failed to fetch last thing",
zap.Error(err),
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
)
return
return nil
}
}
@ -186,17 +117,17 @@ func (snc *stuckNotificationsConsumer) Consume(delivery rmq.Delivery) {
}
if kind == "t4" {
return
return nil
}
sthings, err := rac.MessageInbox(ctx)
if err != nil {
snc.logger.Error("failed to check inbox",
snw.logger.Error("failed to check inbox",
zap.Error(err),
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
)
return
return nil
}
found := false
@ -207,7 +138,7 @@ func (snc *stuckNotificationsConsumer) Consume(delivery rmq.Delivery) {
}
if !found {
snc.logger.Debug("thing exists, but not on inbox, marking as deleted",
snw.logger.Debug("thing exists, but not on inbox, marking as deleted",
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
zap.String("thing#id", account.LastMessageID),
@ -215,47 +146,47 @@ func (snc *stuckNotificationsConsumer) Consume(delivery rmq.Delivery) {
break
}
snc.logger.Debug("thing exists, bailing early",
snw.logger.Debug("thing exists, bailing early",
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
zap.String("thing#id", account.LastMessageID),
)
return
return nil
}
}
snc.logger.Info("thing got deleted, resetting",
snw.logger.Info("thing got deleted, resetting",
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
zap.String("thing#id", account.LastMessageID),
)
if kind != "t4" {
snc.logger.Debug("getting message inbox to find last good thing",
snw.logger.Debug("getting message inbox to find last good thing",
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
)
things, err = rac.MessageInbox(ctx)
if err != nil {
snc.logger.Error("failed to check inbox",
snw.logger.Error("failed to check inbox",
zap.Error(err),
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
)
return
return nil
}
}
account.LastMessageID = ""
snc.logger.Debug("calculating last good thing",
snw.logger.Debug("calculating last good thing",
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
)
for _, thing := range things.Children {
if thing.IsDeleted() {
snc.logger.Debug("thing got deleted, checking next",
snw.logger.Debug("thing got deleted, checking next",
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
zap.String("thing#id", thing.FullName()),
@ -267,17 +198,20 @@ func (snc *stuckNotificationsConsumer) Consume(delivery rmq.Delivery) {
break
}
snc.logger.Debug("updating last good thing",
snw.logger.Debug("updating last good thing",
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
zap.String("thing#id", account.LastMessageID),
)
if err := snc.accountRepo.Update(ctx, &account); err != nil {
snc.logger.Error("failed to update account's last message id",
if err := snw.accountRepo.Update(ctx, &account); err != nil {
snw.logger.Error("failed to update account's last message id",
zap.Error(err),
zap.Int64("account#id", id),
zap.String("account#username", account.NormalizedUsername()),
)
return err
}
return nil
}

View file

@ -5,12 +5,10 @@ import (
"fmt"
"math/rand"
"os"
"strconv"
"strings"
"time"
"github.com/DataDog/datadog-go/statsd"
"github.com/adjust/rmq/v5"
"github.com/go-redis/redis/v8"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/sideshow/apns2"
@ -24,17 +22,12 @@ import (
)
type subredditsWorker struct {
context.Context
logger *zap.Logger
statsd *statsd.Client
db *pgxpool.Pool
redis *redis.Client
queue rmq.Connection
reddit *reddit.Client
apns *token.Token
consumers int
apns *apns2.Client
accountRepo domain.AccountRepository
deviceRepo domain.DeviceRepository
@ -47,7 +40,7 @@ const (
subredditNotificationBodyFormat = "r/%s: \u201c%s\u201d"
)
func NewSubredditsWorker(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, queue rmq.Connection, consumers int) Worker {
func NewSubredditsWorker(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, consumers int) Worker {
reddit := reddit.NewClient(
os.Getenv("REDDIT_CLIENT_ID"),
os.Getenv("REDDIT_CLIENT_SECRET"),
@ -56,31 +49,28 @@ func NewSubredditsWorker(ctx context.Context, logger *zap.Logger, statsd *statsd
consumers,
)
var apns *token.Token
var apns *apns2.Client
{
authKey, err := token.AuthKeyFromFile(os.Getenv("APPLE_KEY_PATH"))
if err != nil {
panic(err)
}
apns = &token.Token{
tok := &token.Token{
AuthKey: authKey,
KeyID: os.Getenv("APPLE_KEY_ID"),
TeamID: os.Getenv("APPLE_TEAM_ID"),
}
apns = apns2.NewTokenClient(tok).Production()
}
return &subredditsWorker{
ctx,
logger,
statsd,
db,
redis,
queue,
reddit,
apns,
consumers,
repository.NewPostgresAccount(db),
repository.NewPostgresDevice(db),
repository.NewPostgresSubreddit(db),
@ -88,92 +78,32 @@ func NewSubredditsWorker(ctx context.Context, logger *zap.Logger, statsd *statsd
}
}
func (sw *subredditsWorker) Start() error {
queue, err := sw.queue.OpenQueue("subreddits")
func (sw *subredditsWorker) Process(ctx context.Context, args ...interface{}) error {
id := args[0].(int64)
sw.logger.Debug("starting job", zap.Int64("subreddit#id", id))
subreddit, err := sw.subredditRepo.GetByID(ctx, id)
if err != nil {
return err
sw.logger.Error("failed to fetch subreddit from database", zap.Error(err), zap.Int64("subreddit#id", id))
return nil
}
sw.logger.Info("starting up subreddits worker", zap.Int("consumers", sw.consumers))
prefetchLimit := int64(sw.consumers * 2)
if err := queue.StartConsuming(prefetchLimit, pollDuration); err != nil {
return err
}
host, _ := os.Hostname()
for i := 0; i < sw.consumers; i++ {
name := fmt.Sprintf("consumer %s-%d", host, i)
consumer := NewSubredditsConsumer(sw, i)
if _, err := queue.AddConsumer(name, consumer); err != nil {
return err
}
}
return nil
}
func (sw *subredditsWorker) Stop() {
<-sw.queue.StopAllConsuming() // wait for all Consume() calls to finish
}
type subredditsConsumer struct {
*subredditsWorker
tag int
apnsSandbox *apns2.Client
apnsProduction *apns2.Client
}
func NewSubredditsConsumer(sw *subredditsWorker, tag int) *subredditsConsumer {
return &subredditsConsumer{
sw,
tag,
apns2.NewTokenClient(sw.apns),
apns2.NewTokenClient(sw.apns).Production(),
}
}
func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) {
ctx, cancel := context.WithCancel(sc)
defer cancel()
id, err := strconv.ParseInt(delivery.Payload(), 10, 64)
watchers, err := sw.watcherRepo.GetBySubredditID(ctx, subreddit.ID)
if err != nil {
sc.logger.Error("failed to parse subreddit id from payload", zap.Error(err), zap.String("payload", delivery.Payload()))
_ = delivery.Reject()
return
}
sc.logger.Debug("starting job", zap.Int64("subreddit#id", id))
defer func() { _ = delivery.Ack() }()
subreddit, err := sc.subredditRepo.GetByID(ctx, id)
if err != nil {
sc.logger.Error("failed to fetch subreddit from database", zap.Error(err), zap.Int64("subreddit#id", id))
return
}
watchers, err := sc.watcherRepo.GetBySubredditID(ctx, subreddit.ID)
if err != nil {
sc.logger.Error("failed to fetch watchers from database",
sw.logger.Error("failed to fetch watchers from database",
zap.Error(err),
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
)
return
return err
}
if len(watchers) == 0 {
sc.logger.Debug("no watchers for subreddit, bailing early",
sw.logger.Debug("no watchers for subreddit, bailing early",
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
)
return
return nil
}
threshold := time.Now().Add(-24 * time.Hour)
@ -183,13 +113,13 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) {
seenPosts := map[string]bool{}
// Load 500 newest posts
sc.logger.Debug("loading up to 500 new posts",
sw.logger.Debug("loading up to 500 new posts",
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
)
for page := 0; page < 5; page++ {
sc.logger.Debug("loading new posts",
sw.logger.Debug("loading new posts",
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.Int("page", page),
@ -198,7 +128,7 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) {
i := rand.Intn(len(watchers))
watcher := watchers[i]
rac := sc.reddit.NewAuthenticatedClient(watcher.Account.AccountID, watcher.Account.RefreshToken, watcher.Account.AccessToken)
rac := sw.reddit.NewAuthenticatedClient(watcher.Account.AccountID, watcher.Account.RefreshToken, watcher.Account.AccessToken)
sps, err := rac.SubredditNew(ctx,
subreddit.Name,
reddit.WithQuery("before", before),
@ -208,7 +138,7 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) {
)
if err != nil {
sc.logger.Error("failed to fetch new posts",
sw.logger.Error("failed to fetch new posts",
zap.Error(err),
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
@ -217,7 +147,7 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) {
continue
}
sc.logger.Debug("loaded new posts",
sw.logger.Debug("loaded new posts",
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.Int("page", page),
@ -247,7 +177,7 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) {
}
if finished {
sc.logger.Debug("reached date threshold",
sw.logger.Debug("reached date threshold",
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.Int("page", page),
@ -257,7 +187,7 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) {
}
// Load hot posts
sc.logger.Debug("loading hot posts",
sw.logger.Debug("loading hot posts",
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
)
@ -265,7 +195,7 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) {
i := rand.Intn(len(watchers))
watcher := watchers[i]
rac := sc.reddit.NewAuthenticatedClient(watcher.Account.AccountID, watcher.Account.RefreshToken, watcher.Account.AccessToken)
rac := sw.reddit.NewAuthenticatedClient(watcher.Account.AccountID, watcher.Account.RefreshToken, watcher.Account.AccessToken)
sps, err := rac.SubredditHot(ctx,
subreddit.Name,
reddit.WithQuery("limit", "100"),
@ -274,13 +204,13 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) {
)
if err != nil {
sc.logger.Error("failed to fetch hot posts",
sw.logger.Error("failed to fetch hot posts",
zap.Error(err),
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
)
} else {
sc.logger.Debug("loaded hot posts",
sw.logger.Debug("loaded hot posts",
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.Int("count", sps.Count),
@ -298,7 +228,7 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) {
}
}
sc.logger.Debug("checking posts for watcher hits",
sw.logger.Debug("checking posts for watcher hits",
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.Int("count", len(posts)),
@ -339,7 +269,7 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) {
continue
}
sc.logger.Debug("matched post",
sw.logger.Debug("matched post",
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.Int64("watcher#id", watcher.ID),
@ -351,10 +281,10 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) {
)
lockKey := fmt.Sprintf("watcher:%d:%s", watcher.DeviceID, post.ID)
notified, _ := sc.redis.Get(ctx, lockKey).Bool()
notified, _ := sw.redis.Get(ctx, lockKey).Bool()
if notified {
sc.logger.Debug("already notified, skipping",
sw.logger.Debug("already notified, skipping",
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.Int64("watcher#id", watcher.ID),
@ -363,30 +293,30 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) {
continue
}
if err := sc.watcherRepo.IncrementHits(ctx, watcher.ID); err != nil {
sc.logger.Error("could not increment hits",
if err := sw.watcherRepo.IncrementHits(ctx, watcher.ID); err != nil {
sw.logger.Error("could not increment hits",
zap.Error(err),
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.Int64("watcher#id", watcher.ID),
)
return
return err
}
sc.logger.Debug("got a hit",
sw.logger.Debug("got a hit",
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.Int64("watcher#id", watcher.ID),
zap.String("post#id", post.ID),
)
sc.redis.SetEX(ctx, lockKey, true, 24*time.Hour)
sw.redis.SetEX(ctx, lockKey, true, 24*time.Hour)
notifs = append(notifs, watcher)
}
if len(notifs) == 0 {
continue
}
sc.logger.Debug("got hits for post",
sw.logger.Debug("got hits for post",
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.String("post#id", post.ID),
@ -407,24 +337,20 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) {
notification.DeviceToken = watcher.Device.APNSToken
notification.Payload = payload
client := sc.apnsProduction
if watcher.Device.Sandbox {
client = sc.apnsSandbox
}
res, err := client.Push(notification)
res, err := sw.apns.Push(notification)
if err != nil {
_ = sc.statsd.Incr("apns.notification.errors", []string{}, 1)
sc.logger.Error("failed to send notification",
_ = sw.statsd.Incr("apns.notification.errors", []string{}, 1)
sw.logger.Error("failed to send notification",
zap.Error(err),
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.String("post#id", post.ID),
zap.String("apns", watcher.Device.APNSToken),
)
return err
} else if !res.Sent() {
_ = sc.statsd.Incr("apns.notification.errors", []string{}, 1)
sc.logger.Error("notificaion not sent",
_ = sw.statsd.Incr("apns.notification.errors", []string{}, 1)
sw.logger.Error("notification not sent",
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.String("post#id", post.ID),
@ -433,8 +359,8 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) {
zap.String("response#reason", res.Reason),
)
} else {
_ = sc.statsd.Incr("apns.notification.sent", []string{}, 1)
sc.logger.Info("sent notification",
_ = sw.statsd.Incr("apns.notification.sent", []string{}, 1)
sw.logger.Info("sent notification",
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.String("post#id", post.ID),
@ -444,10 +370,12 @@ func (sc *subredditsConsumer) Consume(delivery rmq.Delivery) {
}
}
sc.logger.Debug("finishing job",
sw.logger.Debug("finishing job",
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
)
return nil
}
func payloadFromPost(post *reddit.Thing) *payload.Payload {

View file

@ -6,11 +6,9 @@ import (
"math/rand"
"os"
"sort"
"strconv"
"time"
"github.com/DataDog/datadog-go/statsd"
"github.com/adjust/rmq/v5"
"github.com/go-redis/redis/v8"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/sideshow/apns2"
@ -24,16 +22,11 @@ import (
)
type trendingWorker struct {
context.Context
logger *zap.Logger
statsd *statsd.Client
redis *redis.Client
queue rmq.Connection
reddit *reddit.Client
apns *token.Token
consumers int
apns *apns2.Client
accountRepo domain.AccountRepository
deviceRepo domain.DeviceRepository
@ -43,7 +36,7 @@ type trendingWorker struct {
const trendingNotificationTitleFormat = "🔥 r/%s Trending"
func NewTrendingWorker(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, queue rmq.Connection, consumers int) Worker {
func NewTrendingWorker(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, consumers int) Worker {
reddit := reddit.NewClient(
os.Getenv("REDDIT_CLIENT_ID"),
os.Getenv("REDDIT_CLIENT_SECRET"),
@ -52,29 +45,27 @@ func NewTrendingWorker(ctx context.Context, logger *zap.Logger, statsd *statsd.C
consumers,
)
var apns *token.Token
var apns *apns2.Client
{
authKey, err := token.AuthKeyFromFile(os.Getenv("APPLE_KEY_PATH"))
if err != nil {
panic(err)
}
apns = &token.Token{
tok := &token.Token{
AuthKey: authKey,
KeyID: os.Getenv("APPLE_KEY_ID"),
TeamID: os.Getenv("APPLE_TEAM_ID"),
}
apns = apns2.NewTokenClient(tok).Production()
}
return &trendingWorker{
ctx,
logger,
statsd,
redis,
queue,
reddit,
apns,
consumers,
repository.NewPostgresAccount(db),
repository.NewPostgresDevice(db),
@ -83,130 +74,70 @@ func NewTrendingWorker(ctx context.Context, logger *zap.Logger, statsd *statsd.C
}
}
func (tw *trendingWorker) Start() error {
queue, err := tw.queue.OpenQueue("trending")
func (tw *trendingWorker) Process(ctx context.Context, args ...interface{}) error {
id := args[0].(int64)
tw.logger.Debug("starting job", zap.Int64("subreddit#id", id))
subreddit, err := tw.subredditRepo.GetByID(ctx, id)
if err != nil {
return err
tw.logger.Error("failed to fetch subreddit from database", zap.Error(err), zap.Int64("subreddit#id", id))
return nil
}
tw.logger.Info("starting up trending subreddits worker", zap.Int("consumers", tw.consumers))
prefetchLimit := int64(tw.consumers * 2)
if err := queue.StartConsuming(prefetchLimit, pollDuration); err != nil {
return err
}
host, _ := os.Hostname()
for i := 0; i < tw.consumers; i++ {
name := fmt.Sprintf("consumer %s-%d", host, i)
consumer := NewTrendingConsumer(tw, i)
if _, err := queue.AddConsumer(name, consumer); err != nil {
return err
}
}
return nil
}
func (tw *trendingWorker) Stop() {
<-tw.queue.StopAllConsuming() // wait for all Consume() calls to finish
}
type trendingConsumer struct {
*trendingWorker
tag int
apnsSandbox *apns2.Client
apnsProduction *apns2.Client
}
func NewTrendingConsumer(tw *trendingWorker, tag int) *trendingConsumer {
return &trendingConsumer{
tw,
tag,
apns2.NewTokenClient(tw.apns),
apns2.NewTokenClient(tw.apns).Production(),
}
}
func (tc *trendingConsumer) Consume(delivery rmq.Delivery) {
ctx, cancel := context.WithCancel(tc)
defer cancel()
id, err := strconv.ParseInt(delivery.Payload(), 10, 64)
watchers, err := tw.watcherRepo.GetByTrendingSubredditID(ctx, subreddit.ID)
if err != nil {
tc.logger.Error("failed to parse subreddit id from payload", zap.Error(err), zap.String("payload", delivery.Payload()))
_ = delivery.Reject()
return
}
tc.logger.Debug("starting job", zap.Int64("subreddit#id", id))
defer func() { _ = delivery.Ack() }()
subreddit, err := tc.subredditRepo.GetByID(ctx, id)
if err != nil {
tc.logger.Error("failed to fetch subreddit from database", zap.Error(err), zap.Int64("subreddit#id", id))
return
}
watchers, err := tc.watcherRepo.GetByTrendingSubredditID(ctx, subreddit.ID)
if err != nil {
tc.logger.Error("failed to fetch watchers from database",
tw.logger.Error("failed to fetch watchers from database",
zap.Error(err),
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
)
return
return err
}
if len(watchers) == 0 {
tc.logger.Debug("no watchers for subreddit, bailing early",
tw.logger.Debug("no watchers for subreddit, bailing early",
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
)
return
return nil
}
// Grab last month's top posts so we calculate a trending average
i := rand.Intn(len(watchers))
watcher := watchers[i]
rac := tc.reddit.NewAuthenticatedClient(watcher.Account.AccountID, watcher.Account.RefreshToken, watcher.Account.AccessToken)
rac := tw.reddit.NewAuthenticatedClient(watcher.Account.AccountID, watcher.Account.RefreshToken, watcher.Account.AccessToken)
tps, err := rac.SubredditTop(ctx, subreddit.Name, reddit.WithQuery("t", "week"), reddit.WithQuery("show", "all"), reddit.WithQuery("limit", "25"))
if err != nil {
tc.logger.Error("failed to fetch weeks's top posts",
tw.logger.Error("failed to fetch week's top posts",
zap.Error(err),
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
)
return
return err
}
tc.logger.Debug("loaded weeks's top posts",
tw.logger.Debug("loaded week's top posts",
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.Int("count", tps.Count),
)
if tps.Count == 0 {
tc.logger.Debug("no top posts, bailing early",
tw.logger.Debug("no top posts, bailing early",
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
)
return
return nil
}
if tps.Count < 20 {
tc.logger.Debug("no top posts, bailing early",
tw.logger.Debug("not enough top posts, bailing early",
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.Int("count", tps.Count),
)
return
return nil
}
sort.SliceStable(tps.Children, func(i, j int) bool {
@ -215,7 +146,7 @@ func (tc *trendingConsumer) Consume(delivery rmq.Delivery) {
middlePost := tps.Count / 2
medianScore := tps.Children[middlePost].Score
tc.logger.Debug("calculated median score",
tw.logger.Debug("calculated median score",
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.Int64("score", medianScore),
@ -224,18 +155,18 @@ func (tc *trendingConsumer) Consume(delivery rmq.Delivery) {
// Grab hot posts and filter out anything that's > 2 days old
i = rand.Intn(len(watchers))
watcher = watchers[i]
rac = tc.reddit.NewAuthenticatedClient(watcher.Account.AccountID, watcher.Account.RefreshToken, watcher.Account.AccessToken)
rac = tw.reddit.NewAuthenticatedClient(watcher.Account.AccountID, watcher.Account.RefreshToken, watcher.Account.AccessToken)
hps, err := rac.SubredditHot(ctx, subreddit.Name, reddit.WithQuery("show", "all"), reddit.WithQuery("always_show_media", "1"))
if err != nil {
tc.logger.Error("failed to fetch hot posts",
tw.logger.Error("failed to fetch hot posts",
zap.Error(err),
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
)
return
return err
}
tc.logger.Debug("loaded hot posts",
tw.logger.Debug("loaded hot posts",
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.Int("count", hps.Count),
@ -263,10 +194,10 @@ func (tc *trendingConsumer) Consume(delivery rmq.Delivery) {
}
lockKey := fmt.Sprintf("watcher:trending:%d:%s", watcher.DeviceID, post.ID)
notified, _ := tc.redis.Get(ctx, lockKey).Bool()
notified, _ := tw.redis.Get(ctx, lockKey).Bool()
if notified {
tc.logger.Debug("already notified, skipping",
tw.logger.Debug("already notified, skipping",
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.Int64("watcher#id", watcher.ID),
@ -275,29 +206,24 @@ func (tc *trendingConsumer) Consume(delivery rmq.Delivery) {
continue
}
tc.redis.SetEX(ctx, lockKey, true, 48*time.Hour)
tw.redis.SetEX(ctx, lockKey, true, 48*time.Hour)
if err := tc.watcherRepo.IncrementHits(ctx, watcher.ID); err != nil {
tc.logger.Error("could not increment hits",
if err := tw.watcherRepo.IncrementHits(ctx, watcher.ID); err != nil {
tw.logger.Error("could not increment hits",
zap.Error(err),
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.Int64("watcher#id", watcher.ID),
)
return
return err
}
notification.DeviceToken = watcher.Device.APNSToken
client := tc.apnsProduction
if watcher.Device.Sandbox {
client = tc.apnsSandbox
}
res, err := client.Push(notification)
res, err := tw.apns.Push(notification)
if err != nil {
_ = tc.statsd.Incr("apns.notification.errors", []string{}, 1)
tc.logger.Error("failed to send notification",
_ = tw.statsd.Incr("apns.notification.errors", []string{}, 1)
tw.logger.Error("failed to send notification",
zap.Error(err),
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
@ -305,8 +231,8 @@ func (tc *trendingConsumer) Consume(delivery rmq.Delivery) {
zap.String("apns", watcher.Device.APNSToken),
)
} else if !res.Sent() {
_ = tc.statsd.Incr("apns.notification.errors", []string{}, 1)
tc.logger.Error("notification not sent",
_ = tw.statsd.Incr("apns.notification.errors", []string{}, 1)
tw.logger.Error("notification not sent",
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.String("post#id", post.ID),
@ -315,8 +241,8 @@ func (tc *trendingConsumer) Consume(delivery rmq.Delivery) {
zap.String("response#reason", res.Reason),
)
} else {
_ = tc.statsd.Incr("apns.notification.sent", []string{}, 1)
tc.logger.Info("sent notification",
_ = tw.statsd.Incr("apns.notification.sent", []string{}, 1)
tw.logger.Info("sent notification",
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
zap.String("post#id", post.ID),
@ -327,10 +253,12 @@ func (tc *trendingConsumer) Consume(delivery rmq.Delivery) {
}
}
tc.logger.Debug("finishing job",
tw.logger.Debug("finishing job",
zap.Int64("subreddit#id", id),
zap.String("subreddit#name", subreddit.NormalizedName()),
)
return nil
}
func payloadFromTrendingPost(post *reddit.Thing) *payload.Payload {

View file

@ -1,332 +0,0 @@
package worker
import (
"context"
"fmt"
"math/rand"
"os"
"strconv"
"strings"
"github.com/DataDog/datadog-go/statsd"
"github.com/adjust/rmq/v5"
"github.com/go-redis/redis/v8"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/sideshow/apns2"
"github.com/sideshow/apns2/payload"
"github.com/sideshow/apns2/token"
"go.uber.org/zap"
"github.com/christianselig/apollo-backend/internal/domain"
"github.com/christianselig/apollo-backend/internal/reddit"
"github.com/christianselig/apollo-backend/internal/repository"
)
// usersWorker consumes user IDs from the "users" queue and notifies
// watchers about new posts by those users. It embeds its root context
// so each consumer can derive a per-delivery context from it.
// Field order matters: the struct is constructed positionally in
// NewUsersWorker.
type usersWorker struct {
	context.Context

	logger *zap.Logger
	statsd *statsd.Client
	db     *pgxpool.Pool
	redis  *redis.Client
	queue  rmq.Connection
	reddit *reddit.Client
	// apns is the shared APNs signing token; consumers wrap it in
	// per-environment clients.
	apns      *token.Token
	consumers int // number of queue consumers to run

	accountRepo domain.AccountRepository
	deviceRepo  domain.DeviceRepository
	userRepo    domain.UserRepository
	watcherRepo domain.WatcherRepository
}
// userNotificationTitleFormat is the push-notification title for user
// watchers; %s is the watcher's label.
const userNotificationTitleFormat = "👨\u200d🚀 %s"
// NewUsersWorker builds the worker that watches Reddit users for new
// posts. It constructs a shared Reddit client from REDDIT_CLIENT_ID /
// REDDIT_CLIENT_SECRET and loads the APNs signing key from
// APPLE_KEY_PATH, panicking if the key file cannot be read (the worker
// is useless without it).
func NewUsersWorker(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, db *pgxpool.Pool, redis *redis.Client, queue rmq.Connection, consumers int) Worker {
	redditClient := reddit.NewClient(
		os.Getenv("REDDIT_CLIENT_ID"),
		os.Getenv("REDDIT_CLIENT_SECRET"),
		statsd,
		redis,
		consumers,
	)

	authKey, err := token.AuthKeyFromFile(os.Getenv("APPLE_KEY_PATH"))
	if err != nil {
		panic(err)
	}

	apnsToken := &token.Token{
		AuthKey: authKey,
		KeyID:   os.Getenv("APPLE_KEY_ID"),
		TeamID:  os.Getenv("APPLE_TEAM_ID"),
	}

	return &usersWorker{
		Context:   ctx,
		logger:    logger,
		statsd:    statsd,
		db:        db,
		redis:     redis,
		queue:     queue,
		reddit:    redditClient,
		apns:      apnsToken,
		consumers: consumers,

		accountRepo: repository.NewPostgresAccount(db),
		deviceRepo:  repository.NewPostgresDevice(db),
		userRepo:    repository.NewPostgresUser(db),
		watcherRepo: repository.NewPostgresWatcher(db),
	}
}
// Start opens the "users" queue and spins up uw.consumers consumers,
// each named after the host and its index. It returns the first error
// encountered while opening the queue or registering consumers.
func (uw *usersWorker) Start() error {
	queue, err := uw.queue.OpenQueue("users")
	if err != nil {
		return err
	}

	// Was "starting up subreddits worker" — copy-paste from the
	// subreddits worker; this is the users worker.
	uw.logger.Info("starting up users worker", zap.Int("consumers", uw.consumers))

	// Prefetch twice as many deliveries as consumers so none of them starve.
	prefetchLimit := int64(uw.consumers * 2)

	if err := queue.StartConsuming(prefetchLimit, pollDuration); err != nil {
		return err
	}

	host, _ := os.Hostname() // best effort; only used to label consumers

	for i := 0; i < uw.consumers; i++ {
		name := fmt.Sprintf("consumer %s-%d", host, i)

		consumer := NewUsersConsumer(uw, i)
		if _, err := queue.AddConsumer(name, consumer); err != nil {
			return err
		}
	}

	return nil
}
// Stop shuts down all queue consumers and blocks until every in-flight
// Consume() call has finished.
func (uw *usersWorker) Stop() {
	<-uw.queue.StopAllConsuming() // wait for all Consume() calls to finish
}
// usersConsumer is one queue consumer. It embeds its parent usersWorker
// for shared dependencies and holds separate sandbox/production APNs
// clients, chosen per device at send time. Field order matters: the
// struct is constructed positionally in NewUsersConsumer.
type usersConsumer struct {
	*usersWorker
	tag int // consumer index, for identification

	apnsSandbox    *apns2.Client
	apnsProduction *apns2.Client
}
// NewUsersConsumer wires a consumer onto uw, tagging it with its index
// and building the two APNs clients from the worker's shared token.
func NewUsersConsumer(uw *usersWorker, tag int) *usersConsumer {
	return &usersConsumer{
		usersWorker:    uw,
		tag:            tag,
		apnsSandbox:    apns2.NewTokenClient(uw.apns),
		apnsProduction: apns2.NewTokenClient(uw.apns).Production(),
	}
}
// Consume handles one queue delivery whose payload is a user ID. It
// loads the user's watchers, fetches the user's newest posts using a
// randomly chosen watcher's Reddit credentials, and pushes an APNs
// notification to every watcher whose filters match a post.
//
// Deliveries are rejected only on an unparseable payload; once the ID
// parses, the delivery is acked unconditionally (best effort, never
// retried).
func (uc *usersConsumer) Consume(delivery rmq.Delivery) {
	// Derive the job context from the worker's root context so a worker
	// shutdown cancels in-flight Reddit/database calls.
	ctx, cancel := context.WithCancel(uc)
	defer cancel()

	id, err := strconv.ParseInt(delivery.Payload(), 10, 64)
	if err != nil {
		// NOTE(review): message and field say "subreddit id" but this
		// worker consumes user IDs — looks copy-pasted from the
		// subreddits worker.
		uc.logger.Error("failed to parse subreddit id from payload", zap.Error(err), zap.String("payload", delivery.Payload()))
		_ = delivery.Reject()
		return
	}

	uc.logger.Debug("starting job", zap.Int64("subreddit#id", id))

	// Ack as soon as we have a valid ID; everything after this point is
	// best effort.
	defer func() { _ = delivery.Ack() }()

	user, err := uc.userRepo.GetByID(ctx, id)
	if err != nil {
		uc.logger.Error("failed to fetch user from database", zap.Error(err), zap.Int64("subreddit#id", id))
		return
	}

	watchers, err := uc.watcherRepo.GetByUserID(ctx, user.ID)
	if err != nil {
		uc.logger.Error("failed to fetch watchers from database",
			zap.Error(err),
			zap.Int64("user#id", id),
			zap.String("user#name", user.NormalizedName()),
		)
		return
	}

	// Nobody is watching this user; nothing to do.
	if len(watchers) == 0 {
		uc.logger.Debug("no watchers for user, bailing early",
			zap.Int64("user#id", id),
			zap.String("user#name", user.NormalizedName()),
		)
		return
	}

	// Load 25 newest posts
	// Borrow a random watcher's Reddit credentials to spread API load
	// across accounts.
	i := rand.Intn(len(watchers))
	watcher := watchers[i]

	acc, _ := uc.accountRepo.GetByID(ctx, watcher.AccountID)
	rac := uc.reddit.NewAuthenticatedClient(acc.AccountID, acc.RefreshToken, acc.AccessToken)

	ru, err := rac.UserAbout(ctx, user.Name)
	if err != nil {
		uc.logger.Error("failed to fetch user details",
			zap.Error(err),
			zap.Int64("user#id", id),
			zap.String("user#name", user.NormalizedName()),
		)
		return
	}

	if !ru.AcceptFollowers {
		// The user turned off followers: remove their watchers and the
		// user row itself.
		// NOTE(review): this branch does not return after a successful
		// cleanup, so execution falls through and still fetches the
		// (now unwatched) user's posts — confirm that is intentional.
		uc.logger.Info("user disabled followers, removing",
			zap.Int64("user#id", id),
			zap.String("user#name", user.NormalizedName()),
		)

		if err := uc.watcherRepo.DeleteByTypeAndWatcheeID(ctx, domain.UserWatcher, user.ID); err != nil {
			uc.logger.Error("failed to remove watchers for user who disallows followers",
				zap.Error(err),
				zap.Int64("user#id", id),
				zap.String("user#name", user.NormalizedName()),
			)
			return
		}

		if err := uc.userRepo.Delete(ctx, user.ID); err != nil {
			uc.logger.Error("failed to remove user",
				zap.Error(err),
				zap.Int64("user#id", id),
				zap.String("user#name", user.NormalizedName()),
			)
			return
		}
	}

	posts, err := rac.UserPosts(ctx, user.Name)
	if err != nil {
		uc.logger.Error("failed to fetch user activity",
			zap.Error(err),
			zap.Int64("user#id", id),
			zap.String("user#name", user.NormalizedName()),
		)
		return
	}

	for _, post := range posts.Children {
		lowcaseSubreddit := strings.ToLower(post.Subreddit)

		// Never notify about posts in private subreddits.
		if post.SubredditType == "private" {
			continue
		}

		// Collect the watchers whose filters match this post.
		notifs := []domain.Watcher{}

		for _, watcher := range watchers {
			// Make sure we only alert on activities created after the search
			if watcher.CreatedAt.After(post.CreatedAt) {
				continue
			}

			// Skip posts older than the watcher's last notification.
			if watcher.LastNotifiedAt.After(post.CreatedAt) {
				continue
			}

			// Optional per-watcher subreddit filter.
			if watcher.Subreddit != "" && lowcaseSubreddit != watcher.Subreddit {
				continue
			}

			notifs = append(notifs, watcher)
		}

		if len(notifs) == 0 {
			continue
		}

		payload := payloadFromUserPost(post)

		notification := &apns2.Notification{}
		notification.Topic = "com.christianselig.Apollo"

		for _, watcher := range notifs {
			if err := uc.watcherRepo.IncrementHits(ctx, watcher.ID); err != nil {
				// NOTE(review): returning here abandons the remaining
				// watchers and posts for this delivery.
				uc.logger.Error("failed to increment watcher hits",
					zap.Error(err),
					zap.Int64("user#id", id),
					zap.String("user#name", user.NormalizedName()),
					zap.Int64("watcher#id", watcher.ID),
				)
				return
			}

			device, _ := uc.deviceRepo.GetByID(ctx, watcher.DeviceID)

			// Per-watcher title; the rest of the payload is shared.
			title := fmt.Sprintf(userNotificationTitleFormat, watcher.Label)
			payload.AlertTitle(title)

			notification.Payload = payload
			notification.DeviceToken = device.APNSToken

			// Route development devices to the sandbox APNs endpoint.
			client := uc.apnsProduction
			if device.Sandbox {
				client = uc.apnsSandbox
			}

			res, err := client.Push(notification)
			if err != nil || !res.Sent() {
				// NOTE(review): when err != nil, res may be nil and the
				// res.StatusCode/res.Reason fields below would panic —
				// confirm Push's contract.
				_ = uc.statsd.Incr("apns.notification.errors", []string{}, 1)
				uc.logger.Error("failed to send notification",
					zap.Error(err),
					zap.Int64("user#id", id),
					zap.String("user#name", user.NormalizedName()),
					zap.String("post#id", post.ID),
					zap.String("apns", watcher.Device.APNSToken),
					zap.Int("response#status", res.StatusCode),
					zap.String("response#reason", res.Reason),
				)
			} else {
				_ = uc.statsd.Incr("apns.notification.sent", []string{}, 1)
				uc.logger.Info("sent notification",
					zap.Int64("user#id", id),
					zap.String("user#name", user.NormalizedName()),
					zap.String("post#id", post.ID),
					zap.String("device#token", watcher.Device.APNSToken),
				)
			}
		}
	}

	uc.logger.Debug("finishing job",
		zap.Int64("user#id", id),
		zap.String("user#name", user.NormalizedName()),
	)
}
// payloadFromUserPost builds the APNs payload announcing a watched
// user's new post. The alert title is filled in later, per watcher.
func payloadFromUserPost(post *reddit.Thing) *payload.Payload {
	p := payload.NewPayload()

	p.AlertBody(post.Title)
	p.AlertSubtitle(post.Author)
	p.AlertSummaryArg(post.Author)
	p.Category("user-watch")

	p.Custom("post_title", post.Title)
	p.Custom("post_id", post.ID)
	p.Custom("subreddit", post.Subreddit)
	p.Custom("author", post.Author)
	p.Custom("post_age", post.CreatedAt)

	p.MutableContent()
	p.Sound("traloop.wav")

	return p
}

View file

@ -5,7 +5,6 @@ import (
"time"
"github.com/DataDog/datadog-go/statsd"
"github.com/adjust/rmq/v5"
"github.com/go-redis/redis/v8"
"github.com/jackc/pgx/v4/pgxpool"
"go.uber.org/zap"
@ -13,8 +12,7 @@ import (
const pollDuration = 100 * time.Millisecond
type NewWorkerFn func(context.Context, *zap.Logger, *statsd.Client, *pgxpool.Pool, *redis.Client, rmq.Connection, int) Worker
type NewWorkerFn func(context.Context, *zap.Logger, *statsd.Client, *pgxpool.Pool, *redis.Client, int) Worker
type Worker interface {
Start() error
Stop()
Process(ctx context.Context, args ...interface{}) error
}

View file

@ -70,7 +70,7 @@ services:
disk:
name: faktory-data
mountPath: /var/lib/faktory
sizeGB: 20
sizeGB: 10
# API
- type: web