:PROPERTIES:
:ID: da111b28-6414-48ac-af46-e57342042b98
:END:
#+title: 2022-01-13

* Fixing subscribers with untriggered campaigns :ATTACH:

Campaigns failed to trigger due to the [[id:ac416861-ce45-49ac-8b60-f8ea39362135][Migration to common RabbitMQ]] of the
tagpublisher consumer. That consumer publishes tag events with the routing key
=recipient.tagged=, which were previously published only to the AWS RabbitMQ,
so after the migration they never reached the campaign engine consumer (which
is still on AWS). The migration was performed at [2022-01-12 Wed 10:00], and
the issue was corrected with shoveling changes at [2022-01-13 Thu 10:00].
Subscribers created during that window need their tag events republished.

#+CAPTION: Fetch events from Athena
#+begin_src sql :exports code :eval never
select s.*
from "events.subscribe_v1" as s
where s.partition_0 = '2022'
  and s.partition_1 = '01'
  and s.partition_2 = '12'
  and s.partition_3 = '10'
order by s.timestamp
limit 5
#+end_src

#+CAPTION: First subscriber
#+name: first
#+begin_src http :pretty :select .value :exports both :cache yes
GET https://mapping.aweberprod.com/cdb8ea96-fadb-4305-acf8-c541cdf7d954
#+end_src

#+RESULTS[2f54d67cd896474b3a8aa3fd3dc8cf1131902670]: first
: 3791573768

#+CAPTION: Last subscriber
#+name: last
#+begin_src http :pretty :select .value :exports both :cache yes
GET https://mapping.aweberprod.com/7ecf9e65-4a05-4bc5-9c49-d2449c48f5a9
#+end_src

#+RESULTS[ffb7ff9ab6861dd1032b64321116de3a8f5f0f49]: last
: 3791972436

# NOTE(review): a database password and RabbitMQ credentials used to be written
# inline in this note; treat them as leaked and rotate them. psql now takes the
# password from ~/.pgpass or PGPASSWORD instead of a :dbpassword header.
#+HEADER: :cache yes :eval no-export
#+HEADER: :engine postgresql
#+HEADER: :dbhost app.service.production.consul
#+HEADER: :dbuser cp_aweber
#+HEADER: :database app-txn
#+begin_src sql
select count(*)
from public.subscriber_tags
where subscriber_id BETWEEN 3791573768 and 3791972436
AND tags != '{}'
LIMIT 1
#+end_src

#+RESULTS[2bff9d72eb19f37c2ce29cd2cadec12e0284a2c7]:
| count |
|-------|
| 96418 |

# The DSN carries no password (libpq reads ~/.pgpass / PGPASSWORD) and the
# RabbitMQ URL, including its credentials, comes from the AMQP_URL env variable.
#+HEADER: :var dsn="postgresql://cp_aweber@app.service.production.consul/app-txn"
#+HEADER: :var amqp_url=(getenv "AMQP_URL")
#+begin_src python :results output :eval never
"""Replay tag.v1 events for subscribers created while campaigns were not triggering.

For every tag of every subscriber in [FIRST_SUBSCRIBER_ID, LAST_SUBSCRIBER_ID]
this publishes one Avro-encoded tag.v1 message to the "events" exchange with
routing key "recipient.tagged".

Inputs (org :var): ``dsn`` (PostgreSQL connection URL) and ``amqp_url``.
"""
import datetime
import io
import time
import uuid

import fastavro
import pika
import psycopg
import requests

# Inclusive subscriber id window, from the "first" and "last" lookups above.
FIRST_SUBSCRIBER_ID = 3791573768
LAST_SUBSCRIBER_ID = 3791972436

DATUM_MIME_TYPE = 'application/vnd.apache.avro.datum'
# requests waits forever without a timeout; one stuck lookup would stall the replay.
HTTP_TIMEOUT = 10  # seconds

if not amqp_url:
    raise SystemExit("AMQP_URL is not set in the environment")

schema_response = requests.get(
    "http://schema.aweberprod.com/avro/tag.v1.avsc", timeout=HTTP_TIMEOUT
)
schema_response.raise_for_status()
schema = fastavro.parse_schema(schema_response.json())

# One correlation id shared by every message of this replay.
correlation_id = "28c8e007-adad-4ad6-9dab-7caafe5ab126"
# NOTE: this is the time of the replay, not the time the subscriber was tagged.
timestamp = str(
    datetime.datetime.now(datetime.timezone.utc).replace(microsecond=0)
)


def events(http=requests):
    """Yield ``(body, properties)`` for every tag of every subscriber in the window.

    ``http`` is anything with a requests-style ``get`` (the ``requests`` module
    or a ``requests.Session``, which reuses connections across lookups).

    Subscribers whose UUID mapping cannot be fetched are reported and skipped.
    Rows are ordered by subscriber id so an interrupted run can be resumed
    from the last subscriber that was published.
    """
    with psycopg.connect(dsn) as conn, conn.cursor() as cur:
        cur.execute(
            """
            SELECT accounts.account, lists.list, subs.id, tags.tags
              FROM public.subscriber_tags AS tags
              JOIN list.subscribers AS subs ON (tags.subscriber_id = subs.id)
              JOIN accounts ON (accounts.a_id = subs.account_id)
              JOIN autoresponders AS lists ON (lists.a_serv_id = subs.list_id)
             WHERE tags.subscriber_id BETWEEN %s AND %s
               AND tags.tags != '{}'
             ORDER BY tags.subscriber_id
            """,
            (FIRST_SUBSCRIBER_ID, LAST_SUBSCRIBER_ID),
        )
        for account_uuid, list_uuid, subscriber_id, tags in cur:
            try:
                mapping = http.get(
                    f"https://mapping.aweberprod.com/subscriber/{subscriber_id}",
                    timeout=HTTP_TIMEOUT,
                )
            except requests.RequestException as error:
                print(f"Error looking up mapping for subscriber {subscriber_id}: {error}")
                continue
            if mapping.status_code != 200:
                print(f"Error looking up mapping for subscriber {subscriber_id}")
                continue
            subscriber_uuid = mapping.json()["id"]

            for tag in tags:
                body = {
                    "account": str(account_uuid),
                    "list": str(list_uuid),
                    "recipient": str(subscriber_uuid),
                    "timestamp": timestamp,
                    "id": "",
                    "label": tag,
                }
                properties = {
                    "app_id": 'correl/1.0.0',
                    "correlation_id": correlation_id,
                    "message_id": uuid.uuid4().hex,
                    "type": "tag.v1",
                    "content_type": DATUM_MIME_TYPE,
                    "timestamp": int(time.time()),
                    "priority": 1,
                }
                yield body, properties


connection = pika.BlockingConnection(pika.URLParameters(amqp_url))
sent = 0
try:
    channel = connection.channel()
    with requests.Session() as http:
        for body, properties in events(http):
            stream = io.BytesIO()
            fastavro.schemaless_writer(stream, schema, body)
            channel.basic_publish(
                "events",
                "recipient.tagged",
                stream.getvalue(),
                pika.BasicProperties(**properties),
            )
            sent += 1
finally:
    # Guard the close: if the broker already dropped the connection, closing
    # again would raise and hide the original error.
    if connection.is_open:
        connection.close()
    # Reported even after a failure, so the run can be resumed.
    print(f"Sent {sent} tagging events.")
#+end_src

#+RESULTS:
: Sent 33336 tagging events.

Looking up the last subscriber processed in Athena.

#+begin_src sql :eval never
select *
from "events.tag_v1" as s
where s.partition_0 = '2022'
  and s.partition_1 = '01'
  and s.partition_2 = '13'
  and s.amqp_message.app_id = 'correl/1.0.0'
order by s.amqp_message.timestamp desc
limit 1
#+end_src

Editing the script and resuming from the last subscriber processed.

[[file:data/da/111b28-6414-48ac-af46-e57342042b98/replaytags.py]]

Updated again to speed it up with concurrent mapping calls.

[[file:data/da/111b28-6414-48ac-af46-e57342042b98/replaytags2.py]]

#+begin_example
Emitting tag.v1 events
[####################################]  67018/67018  100%
Sent 159801 tagging events.
#+end_example

Success... mostly. The time window got mixed up between local time and UTC, so
the last five hours of the window were missed and those events still need
replaying.