5.9 KiB
2022-01-13
Fixing subscribers with untriggered campaigns ATTACH
Campaigns failed to trigger because the tagpublisher consumer was migrated to
the common RabbitMQ cluster. That consumer publishes tag events with the
routing key recipient.tagged. Those events were previously published only to
the AWS RabbitMQ cluster, so after the migration they never reached the
campaign engine consumer (which is still on AWS).
The migration was performed at [2022-01-12 Wed 10:00], and the issue was corrected with shoveling changes at [2022-01-13 Thu 10:00]. Subscribers created during that window need events republished.
GET https://mapping.aweberprod.com/cdb8ea96-fadb-4305-acf8-c541cdf7d954
3791573768
GET https://mapping.aweberprod.com/7ecf9e65-4a05-4bc5-9c49-d2449c48f5a9
3791972436
select count(*) from public.subscriber_tags
where subscriber_id BETWEEN 3791573768 and 3791972436
AND tags != '{}'
LIMIT 1
count |
---|
96418 |
import datetime
import io
import time
import uuid
import fastavro
import pika
import psycopg
import pytz
import requests
# Content type stamped on every published message (see the AMQP properties
# built for each event).
DATUM_MIME_TYPE = 'application/vnd.apache.avro.datum'

# Fetch and parse the tag.v1 Avro schema used to encode each event body.
# A timeout and an explicit status check make a hung or failed fetch fail
# fast here instead of surfacing later as a confusing schema-parse error.
_schema_response = requests.get(
    "http://schema.aweberprod.com/avro/tag.v1.avsc", timeout=30
)
_schema_response.raise_for_status()
schema = fastavro.parse_schema(_schema_response.json())

# One correlation id shared by every message emitted by this run.
correlation_id = "28c8e007-adad-4ad6-9dab-7caafe5ab126"

# A single second-resolution UTC timestamp reused as the "timestamp" field of
# every event body. ``now(tz=...)`` yields the same aware value as the
# deprecated ``utcnow().replace(tzinfo=...)`` spelling.
timestamp = str(datetime.datetime.now(tz=pytz.UTC).replace(microsecond=0))
def events(first_subscriber_id=3791573768, last_subscriber_id=3791972436):
    """Yield one ``(body, properties)`` pair per tag of each tagged subscriber.

    Selects subscribers whose id lies in the inclusive range
    ``first_subscriber_id``..``last_subscriber_id`` and whose tag array is
    non-empty, looks up each subscriber's UUID via the mapping service, and
    yields a tag event body together with the AMQP message properties to
    publish it with.

    Args:
        first_subscriber_id: Lowest subscriber id to replay (inclusive).
            Raise this to resume an interrupted run.
        last_subscriber_id: Highest subscriber id to replay (inclusive).

    Yields:
        tuple[dict, dict]: the event body and the keyword arguments for
        ``pika.BasicProperties``.

    Subscribers whose mapping lookup fails (non-200 response or a network
    error) are reported on stdout and skipped.
    """
    # Bound parameters instead of literals so the window can be changed
    # without editing the SQL; ORDER BY makes the processing order (and thus
    # any resume point) well-defined.
    query = """
        SELECT accounts.account, lists.list, subs.id, tags.tags
        FROM public.subscriber_tags AS tags
        JOIN list.subscribers AS subs ON (tags.subscriber_id = subs.id)
        JOIN accounts ON (accounts.a_id = subs.account_id)
        JOIN autoresponders AS lists ON (lists.a_serv_id = subs.list_id)
        WHERE tags.subscriber_id BETWEEN %s AND %s
        AND tags != '{}'
        ORDER BY tags.subscriber_id
    """
    # A Session reuses the HTTP connection across the per-subscriber lookups.
    with psycopg.connect(dsn) as conn, conn.cursor() as cur, \
            requests.Session() as session:
        cur.execute(query, (first_subscriber_id, last_subscriber_id))
        for account_uuid, list_uuid, subscriber_id, tags in cur:
            try:
                # Timeout so one stalled lookup cannot hang the whole replay.
                mapping = session.get(
                    f"https://mapping.aweberprod.com/subscriber/{subscriber_id}",
                    timeout=10,
                )
            except requests.RequestException as error:
                # Treat transport failures like a non-200: report and skip.
                print(
                    f"Error looking up mapping for subscriber {subscriber_id}"
                    f": {error}"
                )
                continue
            if mapping.status_code != 200:
                print(f"Error looking up mapping for subscriber {subscriber_id}")
                continue
            subscriber_uuid = mapping.json()["id"]
            for tag in tags:
                body = {
                    "account": str(account_uuid),
                    "list": str(list_uuid),
                    "recipient": str(subscriber_uuid),
                    "timestamp": timestamp,
                    "id": "",
                    "label": tag,
                }
                properties = {
                    "app_id": 'correl/1.0.0',
                    "correlation_id": correlation_id,
                    # Fresh id per message; the correlation id ties them together.
                    "message_id": uuid.uuid4().hex,
                    "type": "tag.v1",
                    "content_type": DATUM_MIME_TYPE,
                    "timestamp": int(time.time()),
                    "priority": 1,
                }
                yield body, properties
# Encode every generated event as a schemaless Avro datum and publish it to
# the "events" exchange with the routing key "recipient.tagged".
conn = pika.BlockingConnection(pika.URLParameters(amqp_url))
sent = 0
try:
    channel = conn.channel()
    for body, properties in events():
        stream = io.BytesIO()
        fastavro.schemaless_writer(stream, schema, body)
        channel.basic_publish(
            exchange="events",
            routing_key="recipient.tagged",
            body=stream.getvalue(),
            properties=pika.BasicProperties(**properties),
        )
        sent += 1
finally:
    # Always release the connection and report progress, even when the run
    # aborts part-way, so a resumed run knows how far the previous one got.
    # Guard the close so a failure caused by a dropped connection is not
    # masked by a second error raised from close().
    if conn.is_open:
        conn.close()
    print(f"Sent {sent} tagging events.")
Sent 33336 tagging events.
Looking up in Athena the last subscriber that was processed.
select *
from "events.tag_v1" as s
where s.partition_0 = '2022'
and s.partition_1 = '01'
and s.partition_2 = '13'
and s.amqp_message.app_id = 'correl/1.0.0'
order by s.amqp_message.timestamp desc
limit 1
Editing the script and resuming from the last subscriber processed.
Updated again to speed it up with concurrent mapping calls.
Emitting tag.v1 events [####################################] 67018/67018 100% Sent 159801 tagging events.
Success.
…. #&$*(@&#. The times were in UTC, so the last five hours of events were missed and still need replaying.