This commit is contained in:
Correl Roush 2022-01-13 16:58:12 -05:00
parent e409d77566
commit 0435c1ea95
4 changed files with 414 additions and 1 deletions

View file

@ -98,7 +98,7 @@ avoid drawing undue attention to S.L.A.A. as a whole from the public media.
trusted servants; they do not govern.
3. The only requirement for S.L.A.A. membership is a desire to stop living out a
pattern of sex and love addiction. Any two or more persons gathered together
B for mutual aid in recovering from sex and love addiction may call themselves
for mutual aid in recovering from sex and love addiction may call themselves
an S.L.A.A. group, provided that as a group they have no other affiliation.
4. Each group should be autonomous except in matters affecting other groups or
S.L.A.A. as a whole.

180
daily/2022-01-13.org Normal file
View file

@ -0,0 +1,180 @@
:PROPERTIES:
:ID: da111b28-6414-48ac-af46-e57342042b98
:END:
#+title: 2022-01-13
* Fixing subscribers with untriggered campaigns :ATTACH:
Campaigns failed to trigger due to the [[id:ac416861-ce45-49ac-8b60-f8ea39362135][Migration to common RabbitMQ]] of the
tagpublisher consumer. Because that consumer publishes tag events with the
routing key =recipient.tagged= that were previously only published to AWS
rabbit, they never reached the campaign engine consumer (which is still on AWS).
The migration was performed at [2022-01-12 Wed 10:00], and the issue was
corrected with shoveling changes at [2022-01-13 Thu 10:00]. Subscribers created
during that window need events republished.
#+CAPTION: Fetch events from Athena
#+begin_src sql :exports code :eval never
select s.*
from "events.subscribe_v1" as s
where s.partition_0 = '2022'
and s.partition_1 = '01'
and s.partition_2 = '12'
and s.partition_3 = '10'
order by s.timestamp
limit 5
#+end_src
#+CAPTION: First subscriber
#+name: first
#+begin_src http :pretty :select .value :exports both :cache yes
GET https://mapping.aweberprod.com/cdb8ea96-fadb-4305-acf8-c541cdf7d954
#+end_src
#+RESULTS[2f54d67cd896474b3a8aa3fd3dc8cf1131902670]: first
: 3791573768
#+CAPTION: Last subscriber
#+name: last
#+begin_src http :pretty :select .value :exports both :cache yes
GET https://mapping.aweberprod.com/7ecf9e65-4a05-4bc5-9c49-d2449c48f5a9
#+end_src
#+RESULTS[ffb7ff9ab6861dd1032b64321116de3a8f5f0f49]: last
: 3791972436
#+HEADER: :cache yes :eval no-export
#+HEADER: :engine postgresql
#+HEADER: :dbhost app.service.production.consul
#+HEADER: :dbuser cp_aweber
#+HEADER: :database app-txn
#+HEADER: :dbpassword mgoQKqV6ztMap8TvL9UuiXx3b27P3DRCGkwnE6jrhy4dfc2eYN
#+begin_src sql
select count(*) from public.subscriber_tags
where subscriber_id BETWEEN 3791573768 and 3791972436
AND tags != '{}'
LIMIT 1
#+end_src
#+RESULTS[2bff9d72eb19f37c2ce29cd2cadec12e0284a2c7]:
| count |
|-------|
| 96418 |
#+HEADER: :var dsn="postgresql://cp_aweber:mgoQKqV6ztMap8TvL9UuiXx3b27P3DRCGkwnE6jrhy4dfc2eYN@app.service.production.consul/app-txn"
#+HEADER: :var amqp_url="amqp://admin:rabbitmq@rabbitmq.aweberprod.com:5672/%2F"
#+begin_src python :results output :eval never
import datetime
import io
import time
import uuid
import fastavro
import pika
import psycopg
import pytz
import requests
# Content type set on every published message (see the ``properties`` dict
# built in ``events``).
DATUM_MIME_TYPE = 'application/vnd.apache.avro.datum'

# Avro schema for tag.v1 events; fetched once and used to encode each body.
schema = fastavro.parse_schema(
    requests.get("http://schema.aweberprod.com/avro/tag.v1.avsc").json()
)

# One correlation id shared by every message this replay publishes.
correlation_id = "28c8e007-adad-4ad6-9dab-7caafe5ab126"

# Event timestamp shared by all replayed events, truncated to whole seconds.
# Built as an aware datetime directly: ``datetime.utcnow()`` is deprecated and
# returns a naive value that then has to be stamped with a timezone by hand.
timestamp = str(datetime.datetime.now(tz=pytz.UTC).replace(microsecond=0))
def events():
    """Yield one ``(body, properties)`` pair per tag of each selected subscriber.

    Rows are read from Postgres for subscriber ids in the hard-coded range
    that have a non-empty tag list. Each subscriber id is resolved through
    the mapping service; when that lookup does not return HTTP 200 the
    subscriber is reported on stdout and skipped.

    ``body`` is the dict to encode with the tag.v1 schema and ``properties``
    holds the keyword arguments for ``pika.BasicProperties``.
    """
    query = """
        SELECT accounts.account, lists.list, subs.id, tags.tags
        FROM public.subscriber_tags AS tags
        JOIN list.subscribers AS subs ON (tags.subscriber_id = subs.id)
        JOIN accounts ON (accounts.a_id = subs.account_id)
        JOIN autoresponders AS lists ON (lists.a_serv_id = subs.list_id)
        WHERE tags.subscriber_id BETWEEN 3791573768 AND 3791972436
        AND tags != '{}'
        """
    with psycopg.connect(dsn) as db, db.cursor() as cursor:
        cursor.execute(query)
        for account_uuid, list_uuid, subscriber_id, tags in cursor:
            response = requests.get(
                f"https://mapping.aweberprod.com/subscriber/{subscriber_id}"
            )
            if response.status_code != 200:
                print(f"Error looking up mapping for subscriber {subscriber_id}")
                continue
            subscriber_uuid = response.json()["id"]
            for label in tags:
                # A fresh message id and publish time are generated per event.
                yield (
                    {
                        "account": str(account_uuid),
                        "list": str(list_uuid),
                        "recipient": str(subscriber_uuid),
                        "timestamp": timestamp,
                        "id": "",
                        "label": label,
                    },
                    {
                        "app_id": 'correl/1.0.0',
                        "correlation_id": correlation_id,
                        "message_id": uuid.uuid4().hex,
                        "type": "tag.v1",
                        "content_type": DATUM_MIME_TYPE,
                        "timestamp": int(time.time()),
                        "priority": 1,
                    },
                )
# Publish every replayed tag event to the "events" exchange with the
# "recipient.tagged" routing key, counting how many were sent.
conn = pika.BlockingConnection(
    pika.URLParameters(amqp_url)
)
sent = 0
try:
    channel = conn.channel()
    for body, properties in events():
        # Encode the body as a schemaless Avro datum.
        stream = io.BytesIO()
        fastavro.schemaless_writer(
            stream, schema, body
        )
        payload = stream.getvalue()
        channel.basic_publish(
            "events",
            "recipient.tagged",
            payload,
            pika.BasicProperties(**properties),
        )
        sent += 1
finally:
    # Always release the connection and report progress, so an interrupted
    # run still says how many events went out before it stopped.
    if conn.is_open:
        conn.close()
    print(f"Sent {sent} tagging events.")
#+end_src
#+RESULTS:
: Sent 33336 tagging events.
Looking up the last subscriber processed in Athena.
#+begin_src sql :eval never
select *
from "events.tag_v1" as s
where s.partition_0 = '2022'
and s.partition_1 = '01'
and s.partition_2 = '13'
and s.amqp_message.app_id = 'correl/1.0.0'
order by s.amqp_message.timestamp desc
limit 1
#+end_src
Editing the script and resuming from the last subscriber processed.
[[file:data/da/111b28-6414-48ac-af46-e57342042b98/replaytags.py]]
Updated again to speed it up with concurrent mapping calls.
[[file:data/da/111b28-6414-48ac-af46-e57342042b98/replaytags2.py]]
#+begin_example
Emitting tag.v1 events [####################################] 67018/67018 100%
Sent 159801 tagging events.
#+end_example
Success.

View file

@ -0,0 +1,89 @@
import datetime
import io
import time
import uuid
import click
import fastavro
import pika
import psycopg
import pytz
import requests
import os

# Connection strings are read from the environment so that production
# credentials are not committed with the script. Both variables are required;
# a missing one raises KeyError at start-up.
dsn = os.environ["REPLAYTAGS_DSN"]
amqp_url = os.environ["REPLAYTAGS_AMQP_URL"]

# Content type set on every published message.
DATUM_MIME_TYPE = "application/vnd.apache.avro.datum"

# Avro schema for tag.v1 events; fetched once and used to encode each body.
schema = fastavro.parse_schema(
    requests.get("http://schema.aweberprod.com/avro/tag.v1.avsc").json()
)

# One correlation id shared by every message this replay publishes.
correlation_id = "28c8e007-adad-4ad6-9dab-7caafe5ab126"

# Event timestamp shared by all replayed events, truncated to whole seconds.
# Built as an aware datetime directly: ``datetime.utcnow()`` is deprecated and
# returns a naive value that then has to be stamped with a timezone by hand.
timestamp = str(datetime.datetime.now(tz=pytz.UTC).replace(microsecond=0))
def subscribers():
    """Yield ``(account, list, subscriber id, tags)`` rows from Postgres.

    Only subscribers whose id falls in the hard-coded range and whose tag
    list is non-empty are selected.
    """
    query = """
        SELECT accounts.account, lists.list, subs.id, tags.tags
        FROM public.subscriber_tags AS tags
        JOIN list.subscribers AS subs ON (tags.subscriber_id = subs.id)
        JOIN accounts ON (accounts.a_id = subs.account_id)
        JOIN autoresponders AS lists ON (lists.a_serv_id = subs.list_id)
        WHERE tags.subscriber_id BETWEEN 3791592052 AND 3791972436
        AND tags != '{}'
        """
    with psycopg.connect(dsn) as db, db.cursor() as cursor:
        cursor.execute(query)
        yield from cursor
def events(row):
    """Yield one ``(body, properties)`` pair per tag of a subscriber row.

    ``row`` is an ``(account, list, subscriber id, tags)`` tuple. The
    subscriber id is resolved through the mapping service; when that lookup
    does not return HTTP 200 the failure is printed and nothing is yielded.

    ``body`` is the dict to encode with the tag.v1 schema and ``properties``
    holds the keyword arguments for ``pika.BasicProperties``.
    """
    account_uuid, list_uuid, subscriber_id, tags = row
    response = requests.get(f"https://mapping.aweberprod.com/subscriber/{subscriber_id}")
    if response.status_code != 200:
        print(f"Error looking up mapping for subscriber {subscriber_id}")
        return
    subscriber_uuid = response.json()["id"]
    for label in tags:
        # A fresh message id and publish time are generated per event.
        yield (
            {
                "account": str(account_uuid),
                "list": str(list_uuid),
                "recipient": str(subscriber_uuid),
                "timestamp": timestamp,
                "id": "",
                "label": label,
            },
            {
                "app_id": "correl/1.0.0",
                "correlation_id": correlation_id,
                "message_id": uuid.uuid4().hex,
                "type": "tag.v1",
                "content_type": DATUM_MIME_TYPE,
                "timestamp": int(time.time()),
                "priority": 1,
            },
        )
# Publish every replayed tag event to the "events" exchange with the
# "recipient.tagged" routing key, showing per-subscriber progress.
conn = pika.BlockingConnection(pika.URLParameters(amqp_url))
sent = 0
try:
    channel = conn.channel()
    # The rows are materialised first so the progress bar knows its length.
    # The bar is bound to its own name so it does not shadow ``subscribers()``.
    with click.progressbar(list(subscribers()), show_pos=True) as bar:
        for row in bar:
            for body, properties in events(row):
                # Encode the body as a schemaless Avro datum.
                stream = io.BytesIO()
                fastavro.schemaless_writer(stream, schema, body)
                payload = stream.getvalue()
                channel.basic_publish(
                    "events",
                    "recipient.tagged",
                    payload,
                    pika.BasicProperties(**properties),
                )
                sent += 1
finally:
    # Always release the connection and report progress, so an interrupted
    # run still says how many events went out before it stopped.
    if conn.is_open:
        conn.close()
    print(f"Sent {sent} tagging events.")

View file

@ -0,0 +1,144 @@
import asyncio
import datetime
import io
import itertools
import time
import typing
import uuid
import aiohttp
import click # type: ignore
import fastavro
import pika # type: ignore
import psycopg
import pytz # type: ignore
import requests # type: ignore
import os

# Connection strings are read from the environment so that production
# credentials are not committed with the script. Both variables are required;
# a missing one raises KeyError at start-up.
dsn = os.environ["REPLAYTAGS_DSN"]
amqp_url = os.environ["REPLAYTAGS_AMQP_URL"]

# Content type set on every published message.
DATUM_MIME_TYPE = "application/vnd.apache.avro.datum"

# Avro schema for tag.v1 events; fetched once and used to encode each body.
schema = fastavro.parse_schema(
    requests.get("http://schema.aweberprod.com/avro/tag.v1.avsc").json()
)

# One correlation id shared by every message this replay publishes.
correlation_id = "28c8e007-adad-4ad6-9dab-7caafe5ab126"

# Row as selected from Postgres: (account, list, numeric subscriber id, tags).
UnmappedSubscriber = typing.Tuple[uuid.UUID, uuid.UUID, int, typing.List[str]]
# Same row after the mapping lookup replaced the numeric id.
# NOTE(review): the mapped id is taken straight from a JSON response, so at
# runtime it is presumably a str rather than a uuid.UUID -- confirm.
MappedSubscriber = typing.Tuple[uuid.UUID, uuid.UUID, uuid.UUID, typing.List[str]]
def subscribers() -> typing.Iterable[UnmappedSubscriber]:
    """Yield ``(account, list, subscriber id, tags)`` rows from Postgres.

    Only subscribers whose id falls in the hard-coded range and whose tag
    list is non-empty are selected.
    """
    query = """
        SELECT accounts.account, lists.list, subs.id, tags.tags
        FROM public.subscriber_tags AS tags
        JOIN list.subscribers AS subs ON (tags.subscriber_id = subs.id)
        JOIN accounts ON (accounts.a_id = subs.account_id)
        JOIN autoresponders AS lists ON (lists.a_serv_id = subs.list_id)
        WHERE tags.subscriber_id BETWEEN 3791680047 AND 3791972436
        AND tags != '{}'
        """
    with psycopg.connect(dsn) as db, db.cursor() as cursor:
        cursor.execute(query)
        yield from cursor  # type: ignore
async def map_subscriber(
    session: aiohttp.ClientSession, subscriber: UnmappedSubscriber
) -> typing.Optional[MappedSubscriber]:
    """Resolve a subscriber's numeric id through the mapping service.

    Returns the row with the numeric id replaced by the ``"id"`` field of the
    mapping response, or ``None`` (after printing a message) when the lookup
    does not return HTTP 200.
    """
    account_uuid, list_uuid, subscriber_id, tags = subscriber
    # Use the response as a context manager so its connection is released
    # back to the session's pool on every path, including the error return
    # where the body is never read.
    async with session.get(
        f"https://mapping.aweberprod.com/subscriber/{subscriber_id}"
    ) as mapping:
        if mapping.status != 200:
            print(f"Error looking up mapping for subscriber {subscriber_id}")
            return None
        subscriber_uuid = (await mapping.json())["id"]
    return account_uuid, list_uuid, subscriber_uuid, tags
def chunks(iterable: typing.Iterable, chunk_size=1) -> typing.Iterable:
    """Split an iterable into consecutive tuples of at most ``chunk_size`` items.

    The final tuple may be shorter. Nothing is yielded for an empty iterable.

    e.g. with ``chunk_size=3``: [1, 2, 3, 4, 5] -> (1, 2, 3), (4, 5)
    """
    source = iter(iterable)

    def take():
        # Pull the next group; an empty tuple means the source is exhausted.
        return tuple(itertools.islice(source, chunk_size))

    # Two-argument iter() keeps calling take() until it returns the sentinel.
    for group in iter(take, ()):
        yield group
async def mappedsubscribers(
    subscribers: typing.Iterable[UnmappedSubscriber],
    concurrency: int = 5,
) -> typing.AsyncIterable[MappedSubscriber]:
    """Yield subscribers with their numeric ids resolved by the mapping service.

    Lookups are issued in groups of ``concurrency`` (default 5) with
    ``asyncio.gather``; each group is awaited before the next one starts.
    Subscribers whose lookup returned ``None`` are dropped, so fewer items
    may be yielded than were passed in.
    """
    async with aiohttp.ClientSession() as session:
        for group in chunks(subscribers, concurrency):
            mapped = await asyncio.gather(
                *[map_subscriber(session, sub) for sub in group]
            )
            for subscriber in mapped:
                if subscriber is not None:
                    yield subscriber
def events(subscriber: MappedSubscriber) -> typing.Iterable[typing.Tuple[dict, dict]]:
    """Yield one ``(body, properties)`` pair per tag of a mapped subscriber.

    ``body`` is the dict to encode with the tag.v1 schema and ``properties``
    holds the keyword arguments for ``pika.BasicProperties``. All events of
    one subscriber share a single body timestamp; the message id and the
    AMQP timestamp are generated per event.
    """
    account_uuid, list_uuid, subscriber_uuid, tags = subscriber
    # Build the aware timestamp directly: ``datetime.utcnow()`` is deprecated
    # and returns a naive value that has to be stamped with a timezone by hand.
    timestamp = str(datetime.datetime.now(tz=pytz.UTC).replace(microsecond=0))
    for tag in tags:
        body = {
            "account": str(account_uuid),
            "list": str(list_uuid),
            "recipient": str(subscriber_uuid),
            "timestamp": timestamp,
            "id": "",
            "label": tag,
        }
        properties = {
            "app_id": "correl/1.0.0",
            "correlation_id": correlation_id,
            "message_id": uuid.uuid4().hex,
            "type": "tag.v1",
            "content_type": DATUM_MIME_TYPE,
            "timestamp": int(time.time()),
            "priority": 1,
        }
        yield body, properties
async def main() -> None:
    """Replay tag.v1 events for every selected subscriber.

    Loads the subscriber rows, resolves their ids through the mapping
    service, and publishes one message per tag to the "events" exchange with
    the "recipient.tagged" routing key. The AMQP connection is closed and the
    number of events sent is printed even if the run fails part-way.
    """
    conn = pika.BlockingConnection(pika.URLParameters(amqp_url))
    sent = 0
    try:
        channel = conn.channel()
        # Materialise the rows first so the progress bar knows its length.
        unmapped = list(subscribers())
        with click.progressbar(
            length=len(unmapped),
            label="Emitting tag.v1 events",
            show_percent=True,
            show_pos=True,
        ) as bar:
            async for subscriber in mappedsubscribers(unmapped):
                for body, properties in events(subscriber):
                    # Encode the body as a schemaless Avro datum.
                    stream = io.BytesIO()
                    fastavro.schemaless_writer(stream, schema, body)
                    payload = stream.getvalue()
                    channel.basic_publish(
                        "events",
                        "recipient.tagged",
                        payload,
                        pika.BasicProperties(**properties),
                    )
                    sent += 1
                # The bar advances once per mapped subscriber, not per event.
                bar.update(1)
    finally:
        if conn.is_open:
            conn.close()
        print(f"Sent {sent} tagging events.")


if __name__ == "__main__":
    asyncio.run(main())