roam/daily/2022-01-13.org
2022-01-13 17:49:46 -05:00

:PROPERTIES:
:ID: da111b28-6414-48ac-af46-e57342042b98
:END:
#+title: 2022-01-13
* Fixing subscribers with untriggered campaigns :ATTACH:
Campaigns failed to trigger after the [[id:ac416861-ce45-49ac-8b60-f8ea39362135][Migration to common RabbitMQ]] of the
tagpublisher consumer. That consumer publishes tag events with the routing key
=recipient.tagged=, which were previously published only to AWS rabbit. Once the
consumer moved, the events no longer reached the campaign engine consumer (which
is still on AWS). The migration was performed at [2022-01-12 Wed 10:00], and the
issue was corrected with shoveling changes at [2022-01-13 Thu 10:00]. Subscribers
created during that window need their tag events republished.
#+CAPTION: Fetch events from Athena
#+begin_src sql :exports code :eval never
select s.*
from "events.subscribe_v1" as s
where s.partition_0 = '2022'
and s.partition_1 = '01'
and s.partition_2 = '12'
and s.partition_3 = '10'
order by s.timestamp
limit 5
#+end_src
#+CAPTION: First subscriber
#+name: first
#+begin_src http :pretty :select .value :exports both :cache yes
GET https://mapping.aweberprod.com/cdb8ea96-fadb-4305-acf8-c541cdf7d954
#+end_src
#+RESULTS[2f54d67cd896474b3a8aa3fd3dc8cf1131902670]: first
: 3791573768
#+CAPTION: Last subscriber
#+name: last
#+begin_src http :pretty :select .value :exports both :cache yes
GET https://mapping.aweberprod.com/7ecf9e65-4a05-4bc5-9c49-d2449c48f5a9
#+end_src
#+RESULTS[ffb7ff9ab6861dd1032b64321116de3a8f5f0f49]: last
: 3791972436
#+HEADER: :cache yes :eval no-export
#+HEADER: :engine postgresql
#+HEADER: :dbhost app.service.production.consul
#+HEADER: :dbuser cp_aweber
#+HEADER: :database app-txn
#+HEADER: :dbpassword mgoQKqV6ztMap8TvL9UuiXx3b27P3DRCGkwnE6jrhy4dfc2eYN
#+begin_src sql
select count(*) from public.subscriber_tags
where subscriber_id BETWEEN 3791573768 and 3791972436
AND tags != '{}'
LIMIT 1
#+end_src
#+RESULTS[2bff9d72eb19f37c2ce29cd2cadec12e0284a2c7]:
| count |
|-------|
| 96418 |
#+HEADER: :var dsn="postgresql://cp_aweber:mgoQKqV6ztMap8TvL9UuiXx3b27P3DRCGkwnE6jrhy4dfc2eYN@app.service.production.consul/app-txn"
#+HEADER: :var amqp_url="amqp://admin:rabbitmq@rabbitmq.aweberprod.com:5672/%2F"
#+begin_src python :results output :eval never
import datetime
import io
import time
import uuid

import fastavro
import pika
import psycopg
import pytz
import requests

DATUM_MIME_TYPE = 'application/vnd.apache.avro.datum'
schema = fastavro.parse_schema(
    requests.get("http://schema.aweberprod.com/avro/tag.v1.avsc").json()
)
correlation_id = "28c8e007-adad-4ad6-9dab-7caafe5ab126"
timestamp = str(datetime.datetime.utcnow().replace(microsecond=0, tzinfo=pytz.UTC))


def events():
    with psycopg.connect(dsn) as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT accounts.account, lists.list, subs.id, tags.tags
                FROM public.subscriber_tags AS tags
                JOIN list.subscribers AS subs ON (tags.subscriber_id = subs.id)
                JOIN accounts ON (accounts.a_id = subs.account_id)
                JOIN autoresponders AS lists ON (lists.a_serv_id = subs.list_id)
                WHERE tags.subscriber_id BETWEEN 3791573768 AND 3791972436
                AND tags != '{}'
                """
            )
            for row in cur:
                account_uuid, list_uuid, subscriber_id, tags = row
                mapping = requests.get(
                    f"https://mapping.aweberprod.com/subscriber/{subscriber_id}"
                )
                if mapping.status_code != 200:
                    print(f"Error looking up mapping for subscriber {subscriber_id}")
                    continue
                subscriber_uuid = mapping.json()["id"]
                for tag in tags:
                    body = {
                        "account": str(account_uuid),
                        "list": str(list_uuid),
                        "recipient": str(subscriber_uuid),
                        "timestamp": timestamp,
                        "id": "",
                        "label": tag,
                    }
                    properties = {
                        "app_id": 'correl/1.0.0',
                        "correlation_id": correlation_id,
                        "message_id": uuid.uuid4().hex,
                        "type": "tag.v1",
                        "content_type": DATUM_MIME_TYPE,
                        "timestamp": int(time.time()),
                        "priority": 1
                    }
                    yield body, properties


conn = pika.BlockingConnection(
    pika.URLParameters(amqp_url)
)
channel = conn.channel()
sent = 0
for body, properties in events():
    stream = io.BytesIO()
    fastavro.schemaless_writer(
        stream, schema, body
    )
    payload = stream.getvalue()
    channel.basic_publish(
        "events",
        "recipient.tagged",
        payload,
        pika.BasicProperties(**properties),
    )
    sent += 1
conn.close()
print(f"Sent {sent} tagging events.")
#+end_src
#+RESULTS:
: Sent 33336 tagging events.
Looking up the last subscriber processed in Athena.
#+begin_src sql :eval never
select *
from "events.tag_v1" as s
where s.partition_0 = '2022'
and s.partition_1 = '01'
and s.partition_2 = '13'
and s.amqp_message.app_id = 'correl/1.0.0'
order by s.amqp_message.timestamp desc
limit 1
#+end_src
Editing the script and resuming from the last subscriber processed.
[[file:data/da/111b28-6414-48ac-af46-e57342042b98/replaytags.py]]
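
A minimal sketch of how resuming could work. This is not the contents of the
attached script. It assumes the integer ID of the last processed subscriber is
already known (the Athena row may need to be mapped back to that ID first). The
original query has no =ORDER BY=, so the sketch adds one. Without a stable
order, "everything after the last processed ID" is not well defined. The
function name =resume_query= and the parameter =last_processed_id= are
hypothetical.
#+begin_src python :eval never
from typing import Optional, Tuple


def resume_query(
    first_id: int, last_id: int, last_processed_id: Optional[int] = None
) -> Tuple[str, Tuple[int, int]]:
    """Build the subscriber query and its bound parameters.

    When last_processed_id is given, the range starts just past it so that
    subscribers already replayed are skipped.
    """
    lower = first_id
    if last_processed_id is not None:
        lower = max(first_id, last_processed_id + 1)
    sql = """
        SELECT accounts.account, lists.list, subs.id, tags.tags
        FROM public.subscriber_tags AS tags
        JOIN list.subscribers AS subs ON (tags.subscriber_id = subs.id)
        JOIN accounts ON (accounts.a_id = subs.account_id)
        JOIN autoresponders AS lists ON (lists.a_serv_id = subs.list_id)
        WHERE tags.subscriber_id BETWEEN %s AND %s
        AND tags.tags != '{}'
        ORDER BY tags.subscriber_id
    """
    # Pass these to cur.execute(sql, params) rather than formatting them in.
    return sql, (lower, last_id)
#+end_src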
Updated again to speed it up with concurrent mapping calls.
[[file:data/da/111b28-6414-48ac-af46-e57342042b98/replaytags2.py]]
#+begin_example
Emitting tag.v1 events [####################################] 67018/67018 100%
Sent 159801 tagging events.
#+end_example
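The concurrent mapping calls mentioned above could look roughly like the
following. This is a sketch using a thread pool, not the contents of
=replaytags2.py=. The helper name =lookup_mappings=, the injected =fetch=
callable, and the worker count are all assumptions. In the replay script,
=fetch= would wrap the =requests.get= call to the mapping service and return
=None= on a non-200 response.
#+begin_src python :eval never
from concurrent.futures import ThreadPoolExecutor


def lookup_mappings(subscriber_ids, fetch, max_workers=16):
    """Resolve subscriber IDs to UUIDs concurrently.

    fetch takes one subscriber ID and returns its UUID, or None on failure.
    Returns a dict holding only the IDs that resolved.
    """
    ids = list(subscriber_ids)  # materialize so the IDs can be iterated twice
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Executor.map yields results in input order, so zip pairs them up.
        results = list(pool.map(fetch, ids))
    return {sid: value for sid, value in zip(ids, results) if value is not None}
#+end_src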
Success.
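.... #&$*(@&#. UTC. The Athena partitions are presumably in UTC while the window
above was taken in local time (-05:00), so the last five hours of events were
missed and still need replaying.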
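
A sketch for deriving the Athena partition values for the outage window. It
rests on two assumptions: the Org timestamps were written in UTC-5, and
=partition_0= through =partition_3= are year, month, day and hour in UTC. The
helper name =athena_partition= is hypothetical.
#+begin_src python :eval never
from datetime import datetime, timedelta, timezone

# Assumption: the timestamps in these notes are local time at UTC-5.
LOCAL = timezone(timedelta(hours=-5))


def athena_partition(local_dt):
    """Return (year, month, day, hour) partition strings in UTC."""
    utc = local_dt.replace(tzinfo=LOCAL).astimezone(timezone.utc)
    return (
        utc.strftime("%Y"),
        utc.strftime("%m"),
        utc.strftime("%d"),
        utc.strftime("%H"),
    )


# Migration and fix times as recorded at the top of this entry.
start = athena_partition(datetime(2022, 1, 12, 10, 0))
end = athena_partition(datetime(2022, 1, 13, 10, 0))
print(start, end)
#+end_src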