From 0435c1ea95f5b9aa7b9fa2211cce8b05adbb03f1 Mon Sep 17 00:00:00 2001 From: Correl Date: Thu, 13 Jan 2022 16:58:12 -0500 Subject: [PATCH] updates --- 20200713191259-slaa.org | 2 +- daily/2022-01-13.org | 180 ++++++++++++++++++ .../replaytags.py | 89 +++++++++ .../replaytags2.py | 144 ++++++++++++++ 4 files changed, 414 insertions(+), 1 deletion(-) create mode 100644 daily/2022-01-13.org create mode 100644 daily/data/da/111b28-6414-48ac-af46-e57342042b98/replaytags.py create mode 100644 daily/data/da/111b28-6414-48ac-af46-e57342042b98/replaytags2.py diff --git a/20200713191259-slaa.org b/20200713191259-slaa.org index 42df036..8f98590 100644 --- a/20200713191259-slaa.org +++ b/20200713191259-slaa.org @@ -98,7 +98,7 @@ avoid drawing undue attention to S.L.A.A. as a whole from the public media. trusted servants; they do not govern. 3. The only requirement for S.L.A.A. membership is a desire to stop living out a pattern of sex and love addiction. Any two or more persons gathered together -B for mutual aid in recovering from sex and love addiction may call themselves + for mutual aid in recovering from sex and love addiction may call themselves an S.L.A.A. group, provided that as a group they have no other affiliation. 4. Each group should be autonomous except in matters affecting other groups or S.L.A.A. as a whole. diff --git a/daily/2022-01-13.org b/daily/2022-01-13.org new file mode 100644 index 0000000..e98db5a --- /dev/null +++ b/daily/2022-01-13.org @@ -0,0 +1,180 @@ +:PROPERTIES: +:ID: da111b28-6414-48ac-af46-e57342042b98 +:END: +#+title: 2022-01-13 +* Fixing subscribers with untriggered campaigns :ATTACH: +Campaigns failed to trigger due to [[id:ac416861-ce45-49ac-8b60-f8ea39362135][Migration to common RabbitMQ]] of the +tagpublisher consumer. Because that consumer publishes tag events with the +routing key =recipient.tagged= that were previously only published to AWS +rabbit, they never reached the campaign engine consumer (which is still on AWS). + +The migration was performed at [2022-01-12 Wed 10:00], and the issue was +corrected with shoveling changes at [2022-01-13 Thu 10:00]. Subscribers created +during that window need events republished. + +#+CAPTION: Fetch events from Athena +#+begin_src sql :exports code :eval never + select s.* + from "events.subscribe_v1" as s + where s.partition_0 = '2022' + and s.partition_1 = '01' + and s.partition_2 = '12' + and s.partition_3 = '10' + order by s.timestamp + limit 5 +#+end_src + +#+CAPTION: First subscriber +#+name: first +#+begin_src http :pretty :select .value :exports both :cache yes + GET https://mapping.aweberprod.com/cdb8ea96-fadb-4305-acf8-c541cdf7d954 +#+end_src + +#+RESULTS[2f54d67cd896474b3a8aa3fd3dc8cf1131902670]: first +: 3791573768 + +#+CAPTION: Last subscriber +#+name: last +#+begin_src http :pretty :select .value :exports both :cache yes + GET https://mapping.aweberprod.com/7ecf9e65-4a05-4bc5-9c49-d2449c48f5a9 +#+end_src + +#+RESULTS[ffb7ff9ab6861dd1032b64321116de3a8f5f0f49]: last +: 3791972436 + +#+HEADER: :cache yes :eval no-export +#+HEADER: :engine postgresql +#+HEADER: :dbhost app.service.production.consul +#+HEADER: :dbuser cp_aweber +#+HEADER: :database app-txn +#+HEADER: :dbpassword mgoQKqV6ztMap8TvL9UuiXx3b27P3DRCGkwnE6jrhy4dfc2eYN +#+begin_src sql + select count(*) from public.subscriber_tags + where subscriber_id BETWEEN 3791573768 and 3791972436 + AND tags != '{}' + LIMIT 1 +#+end_src + +#+RESULTS[2bff9d72eb19f37c2ce29cd2cadec12e0284a2c7]: +| count | +|-------| +| 96418 | + +#+HEADER: :var dsn="postgresql://cp_aweber:mgoQKqV6ztMap8TvL9UuiXx3b27P3DRCGkwnE6jrhy4dfc2eYN@app.service.production.consul/app-txn" +#+HEADER: :var amqp_url="amqp://admin:rabbitmq@rabbitmq.aweberprod.com:5672/%2F" +#+begin_src python :results output :eval never + import datetime + import io + import time + import uuid + + import fastavro + import pika + import psycopg + import pytz + import requests + + DATUM_MIME_TYPE = 'application/vnd.apache.avro.datum' + schema = fastavro.parse_schema( + requests.get("http://schema.aweberprod.com/avro/tag.v1.avsc").json() + ) + + correlation_id = "28c8e007-adad-4ad6-9dab-7caafe5ab126" + timestamp = str(datetime.datetime.utcnow().replace(microsecond=0, tzinfo=pytz.UTC)) + + + def events(): + with psycopg.connect(dsn) as conn: + with conn.cursor() as cur: + cur.execute( + """ + SELECT accounts.account, lists.list, subs.id, tags.tags + FROM public.subscriber_tags AS tags + JOIN list.subscribers AS subs ON (tags.subscriber_id = subs.id) + JOIN accounts ON (accounts.a_id = subs.account_id) + JOIN autoresponders AS lists ON (lists.a_serv_id = subs.list_id) + WHERE tags.subscriber_id BETWEEN 3791573768 AND 3791972436 + AND tags != '{}' + """ + ) + for row in cur: + account_uuid, list_uuid, subscriber_id, tags = row + mapping = requests.get( + f"https://mapping.aweberprod.com/subscriber/{subscriber_id}" + ) + if mapping.status_code != 200: + print(f"Error looking up mapping for subscriber {subscriber_id}") + continue + subscriber_uuid = mapping.json()["id"] + for tag in tags: + body = { + "account": str(account_uuid), + "list": str(list_uuid), + "recipient": str(subscriber_uuid), + "timestamp": timestamp, + "id": "", + "label": tag, + } + properties = { + "app_id": 'correl/1.0.0', + "correlation_id": correlation_id, + "message_id": uuid.uuid4().hex, + "type": "tag.v1", + "content_type": DATUM_MIME_TYPE, + "timestamp": int(time.time()), + "priority": 1 + } + yield body, properties + + conn = pika.BlockingConnection( + pika.URLParameters(amqp_url) + ) + channel = conn.channel() + sent = 0 + for body, properties in events(): + stream = io.BytesIO() + fastavro.schemaless_writer( + stream, schema, body + ) + payload = stream.getvalue() + channel.basic_publish( + "events", + "recipient.tagged", + payload, + pika.BasicProperties(**properties), + ) + sent += 1 + conn.close() + + print(f"Sent {sent} tagging events.") +#+end_src + +#+RESULTS: +: Sent 33336 tagging events. + +Looking up the last subscriber processed in Athena. +#+begin_src sql :eval never + select * + from "events.tag_v1" as s + where s.partition_0 = '2022' + and s.partition_1 = '01' + and s.partition_2 = '13' + and s.amqp_message.app_id = 'correl/1.0.0' + order by s.amqp_message.timestamp desc + limit 1 +#+end_src + +Editing the script and resuming from the last subscriber processed. + +[[file:data/da/111b28-6414-48ac-af46-e57342042b98/replaytags.py]] + +Updated again to speed it up with concurrent mapping calls. + +[[file:data/da/111b28-6414-48ac-af46-e57342042b98/replaytags2.py]] + +#+begin_example +Emitting tag.v1 events [####################################] 67018/67018 100% +Sent 159801 tagging events. +#+end_example + +Success. diff --git a/daily/data/da/111b28-6414-48ac-af46-e57342042b98/replaytags.py b/daily/data/da/111b28-6414-48ac-af46-e57342042b98/replaytags.py new file mode 100644 index 0000000..d11b725 --- /dev/null +++ b/daily/data/da/111b28-6414-48ac-af46-e57342042b98/replaytags.py @@ -0,0 +1,89 @@ +import datetime +import io +import time +import uuid + +import click +import fastavro +import pika +import psycopg +import pytz +import requests + +dsn = "postgresql://cp_aweber:mgoQKqV6ztMap8TvL9UuiXx3b27P3DRCGkwnE6jrhy4dfc2eYN@app.service.production.consul/app-txn" +amqp_url = "amqp://admin:rabbitmq@rabbitmq.aweberprod.com:5672/%2F" +DATUM_MIME_TYPE = "application/vnd.apache.avro.datum" +schema = fastavro.parse_schema( + requests.get("http://schema.aweberprod.com/avro/tag.v1.avsc").json() +) + +correlation_id = "28c8e007-adad-4ad6-9dab-7caafe5ab126" +timestamp = str(datetime.datetime.utcnow().replace(microsecond=0, tzinfo=pytz.UTC)) + + +def subscribers(): + with psycopg.connect(dsn) as conn: + with conn.cursor() as cur: + cur.execute( + """ + SELECT accounts.account, lists.list, subs.id, tags.tags + FROM public.subscriber_tags AS tags + JOIN list.subscribers AS subs ON (tags.subscriber_id = subs.id) + JOIN accounts ON (accounts.a_id = subs.account_id) + JOIN autoresponders AS lists ON (lists.a_serv_id = subs.list_id) + WHERE tags.subscriber_id BETWEEN 3791592052 AND 3791972436 + AND tags != '{}' + """ + ) + for row in cur: + yield row + + +def events(row): + account_uuid, list_uuid, subscriber_id, tags = row + mapping = requests.get(f"https://mapping.aweberprod.com/subscriber/{subscriber_id}") + if mapping.status_code != 200: + print(f"Error looking up mapping for subscriber {subscriber_id}") + return + subscriber_uuid = mapping.json()["id"] + for tag in tags: + body = { + "account": str(account_uuid), + "list": str(list_uuid), + "recipient": str(subscriber_uuid), + "timestamp": timestamp, + "id": "", + "label": tag, + } + properties = { + "app_id": "correl/1.0.0", + "correlation_id": correlation_id, + "message_id": uuid.uuid4().hex, + "type": "tag.v1", + "content_type": DATUM_MIME_TYPE, + "timestamp": int(time.time()), + "priority": 1, + } + yield body, properties + + +conn = pika.BlockingConnection(pika.URLParameters(amqp_url)) +channel = conn.channel() +sent = 0 + +with click.progressbar(list(subscribers()), show_pos=True) as subscribers: + for row in subscribers: + for body, properties in events(row): + stream = io.BytesIO() + fastavro.schemaless_writer(stream, schema, body) + payload = stream.getvalue() + channel.basic_publish( + "events", + "recipient.tagged", + payload, + pika.BasicProperties(**properties), + ) + sent += 1 +conn.close() + +print(f"Sent {sent} tagging events.") diff --git a/daily/data/da/111b28-6414-48ac-af46-e57342042b98/replaytags2.py b/daily/data/da/111b28-6414-48ac-af46-e57342042b98/replaytags2.py new file mode 100644 index 0000000..faef78b --- /dev/null +++ b/daily/data/da/111b28-6414-48ac-af46-e57342042b98/replaytags2.py @@ -0,0 +1,144 @@ +import asyncio +import datetime +import io +import itertools +import time +import typing +import uuid + +import aiohttp +import click # type: ignore +import fastavro +import pika # type: ignore +import psycopg +import pytz # type: ignore +import requests # type: ignore + +dsn = "postgresql://cp_aweber:mgoQKqV6ztMap8TvL9UuiXx3b27P3DRCGkwnE6jrhy4dfc2eYN@app.service.production.consul/app-txn" +amqp_url = "amqp://admin:rabbitmq@rabbitmq.aweberprod.com:5672/%2F" +DATUM_MIME_TYPE = "application/vnd.apache.avro.datum" +schema = fastavro.parse_schema( + requests.get("http://schema.aweberprod.com/avro/tag.v1.avsc").json() +) + +correlation_id = "28c8e007-adad-4ad6-9dab-7caafe5ab126" + + +UnmappedSubscriber = typing.Tuple[uuid.UUID, uuid.UUID, int, typing.List[str]] +MappedSubscriber = typing.Tuple[uuid.UUID, uuid.UUID, uuid.UUID, typing.List[str]] + + +def subscribers() -> typing.Iterable[UnmappedSubscriber]: + with psycopg.connect(dsn) as conn: + with conn.cursor() as cur: + cur.execute( + """ + SELECT accounts.account, lists.list, subs.id, tags.tags + FROM public.subscriber_tags AS tags + JOIN list.subscribers AS subs ON (tags.subscriber_id = subs.id) + JOIN accounts ON (accounts.a_id = subs.account_id) + JOIN autoresponders AS lists ON (lists.a_serv_id = subs.list_id) + WHERE tags.subscriber_id BETWEEN 3791680047 AND 3791972436 + AND tags != '{}' + """ + ) + for row in cur: + yield row # type: ignore + + +async def map_subscriber( + session: aiohttp.ClientSession, subscriber: UnmappedSubscriber +) -> typing.Optional[MappedSubscriber]: + account_uuid, list_uuid, subscriber_id, tags = subscriber + mapping = await session.get( + f"https://mapping.aweberprod.com/subscriber/{subscriber_id}" + ) + if mapping.status != 200: + print(f"Error looking up mapping for subscriber {subscriber_id}") + return None + subscriber_uuid = (await mapping.json())["id"] + return account_uuid, list_uuid, subscriber_uuid, tags + + +def chunks(iterable: typing.Iterable, chunk_size=1) -> typing.Iterable: + """Chunk an iterable. + e.g.: [1, 2, 3, 4, 5] -> [[1, 2, 3], [4, 5]] + """ + it = iter(iterable) + chunk = tuple(itertools.islice(it, chunk_size)) + while chunk: + yield chunk + chunk = tuple(itertools.islice(it, chunk_size)) + + +async def mappedsubscribers( + subscribers: typing.Iterable[UnmappedSubscriber], +) -> typing.AsyncIterable[MappedSubscriber]: + async with aiohttp.ClientSession() as session: + for group in chunks(subscribers, 5): + mapped = await asyncio.gather( + *[map_subscriber(session, sub) for sub in group] + ) + for subscriber in mapped: + if subscriber is not None: + yield subscriber + + +def events(subscriber: MappedSubscriber) -> typing.Iterable[typing.Tuple[dict, dict]]: + account_uuid, list_uuid, subscriber_uuid, tags = subscriber + timestamp = str(datetime.datetime.utcnow().replace(microsecond=0, tzinfo=pytz.UTC)) + + for tag in tags: + body = { + "account": str(account_uuid), + "list": str(list_uuid), + "recipient": str(subscriber_uuid), + "timestamp": timestamp, + "id": "", + "label": tag, + } + properties = { + "app_id": "correl/1.0.0", + "correlation_id": correlation_id, + "message_id": uuid.uuid4().hex, + "type": "tag.v1", + "content_type": DATUM_MIME_TYPE, + "timestamp": int(time.time()), + "priority": 1, + } + yield body, properties + + +async def main() -> None: + conn = pika.BlockingConnection(pika.URLParameters(amqp_url)) + channel = conn.channel() + sent = 0 + + unmapped = list(subscribers()) + with click.progressbar( + length=len(unmapped), + label="Emitting tag.v1 events", + show_percent=True, + show_pos=True, + ) as bar: + async for subscriber in mappedsubscribers(unmapped): + account_uuid, list_uuid, subscriber_id, tags = subscriber + for body, properties in events(subscriber): + stream = io.BytesIO() + fastavro.schemaless_writer(stream, schema, body) + payload = stream.getvalue() + channel.basic_publish( + "events", + "recipient.tagged", + payload, + pika.BasicProperties(**properties), + ) + sent += 1 + + bar.update(1) + + conn.close() + print(f"Sent {sent} tagging events.") + + +asyncio.run(main())