roam/daily/2022-01-13.org
2022-01-13 17:49:46 -05:00

2022-01-13

Fixing subscribers with untriggered campaigns   ATTACH

Campaigns failed to trigger because the tagpublisher consumer was migrated to the common RabbitMQ. That consumer publishes tag events with the routing key recipient.tagged. Before the migration those events were published only to the AWS RabbitMQ, so once the consumer moved they never reached the campaign engine consumer, which is still on AWS.

The migration was performed at [2022-01-12 Wed 10:00], and the issue was corrected with shovel changes at [2022-01-13 Thu 10:00]. Subscribers created during that window need their tag events republished.
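
The exact shovel change is not recorded in this note. Purely as an illustration, a RabbitMQ dynamic shovel that forwards recipient.tagged events from the common cluster back to the AWS cluster could be defined roughly as follows. The URIs are placeholders, and using the events exchange on both sides is an assumption.

```python
import json

# Placeholder URIs: the real cluster addresses are not part of this note.
COMMON_URI = "amqp://common.example.invalid"
AWS_URI = "amqp://aws.example.invalid"


def shovel_definition(src_uri, dest_uri, exchange, routing_key):
    """Build the body of a RabbitMQ dynamic shovel parameter."""
    return {
        "value": {
            "src-protocol": "amqp091",
            "src-uri": src_uri,
            # Consume from the source exchange, filtered by routing key.
            "src-exchange": exchange,
            "src-exchange-key": routing_key,
            "dest-protocol": "amqp091",
            "dest-uri": dest_uri,
            # Assumption: the destination exchange has the same name.
            "dest-exchange": exchange,
        }
    }


definition = shovel_definition(COMMON_URI, AWS_URI, "events", "recipient.tagged")
print(json.dumps(definition, indent=2))
```

A body of this shape is what the management API accepts at /api/parameters/shovel/{vhost}/{name}.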

  select s.*
  from "events.subscribe_v1" as s
  where s.partition_0 = '2022'
  and s.partition_1 = '01'
  and s.partition_2 = '12'
  and s.partition_3 = '10'
  order by s.timestamp
  limit 5
Fetch events from Athena
  GET https://mapping.aweberprod.com/cdb8ea96-fadb-4305-acf8-c541cdf7d954
3791573768
  GET https://mapping.aweberprod.com/7ecf9e65-4a05-4bc5-9c49-d2449c48f5a9
3791972436
  select count(*) from public.subscriber_tags
  where subscriber_id BETWEEN 3791573768 and 3791972436
  AND tags != '{}'
  LIMIT 1
count
96418
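  import datetime
  import io
  import os
  import time
  import uuid

  import fastavro
  import pika
  import psycopg
  import pytz
  import requests

  # Assumption: this note does not show where dsn and amqp_url were defined.
  # The environment variable names below are hypothetical.
  dsn = os.environ["POSTGRES_DSN"]
  amqp_url = os.environ["AMQP_URL"]

  DATUM_MIME_TYPE = 'application/vnd.apache.avro.datum'
  schema = fastavro.parse_schema(
      requests.get("http://schema.aweberprod.com/avro/tag.v1.avsc").json()
  )

  # One correlation ID and one event timestamp are shared by every replayed event.
  correlation_id = "28c8e007-adad-4ad6-9dab-7caafe5ab126"
  timestamp = str(datetime.datetime.now(pytz.UTC).replace(microsecond=0))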


  def events():
      with psycopg.connect(dsn) as conn:
          with conn.cursor() as cur:
              cur.execute(
                  """
                  SELECT accounts.account, lists.list, subs.id, tags.tags
                  FROM public.subscriber_tags AS tags
                  JOIN list.subscribers AS subs ON (tags.subscriber_id = subs.id)
                  JOIN accounts ON (accounts.a_id = subs.account_id)
                  JOIN autoresponders AS lists ON (lists.a_serv_id = subs.list_id)
                  WHERE tags.subscriber_id BETWEEN 3791573768 AND 3791972436
                  AND tags != '{}'
              """
              )
              for row in cur:
                  account_uuid, list_uuid, subscriber_id, tags = row
                  mapping = requests.get(
                      f"https://mapping.aweberprod.com/subscriber/{subscriber_id}"
                  )
                  if mapping.status_code != 200:
                      print(f"Error looking up mapping for subscriber {subscriber_id}")
                      continue
                  subscriber_uuid = mapping.json()["id"]
                  for tag in tags:
                      body = {
                          "account": str(account_uuid),
                          "list": str(list_uuid),
                          "recipient": str(subscriber_uuid),
                          "timestamp": timestamp,
                          "id": "",
                          "label": tag,
                      }
                      properties = {
                          "app_id": 'correl/1.0.0',
                          "correlation_id": correlation_id,
                          "message_id": uuid.uuid4().hex,
                          "type": "tag.v1",
                          "content_type": DATUM_MIME_TYPE,
                          "timestamp": int(time.time()),
                          "priority": 1
                      }
                      yield body, properties

  conn = pika.BlockingConnection(
      pika.URLParameters(amqp_url)
  )
  channel = conn.channel()
  sent = 0
  for body, properties in events():
      stream = io.BytesIO()
      fastavro.schemaless_writer(
          stream, schema, body
      )
      payload = stream.getvalue()
      channel.basic_publish(
          "events",
          "recipient.tagged",
          payload,
          pika.BasicProperties(**properties),
      )
      sent += 1
  conn.close()

  print(f"Sent {sent} tagging events.")
Sent 33336 tagging events.

Looking up the last subscriber processed in Athena.

  select *
  from "events.tag_v1" as s
  where s.partition_0 = '2022'
  and s.partition_1 = '01'
  and s.partition_2 = '13'
  and s.amqp_message.app_id = 'correl/1.0.0'
  order by s.amqp_message.timestamp desc
  limit 1

Editing the script and resuming from the last subscriber processed.

/correlr/roam/src/commit/26fa2f81176946116dc94dd448704178d388fbb0/daily/data/da/111b28-6414-48ac-af46-e57342042b98/replaytags.py
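
Resuming by subscriber ID is only safe if rows are processed in ID order, which the query in the first script does not guarantee. A minimal sketch of a resumable variant, assuming an added ORDER BY and psycopg named parameters; resume_params is a hypothetical helper, not taken from replaytags.py.

```python
FIRST_ID = 3791573768
LAST_ID = 3791972436

# Same joins as the original query, with bounds as parameters and a
# deterministic order so "last subscriber processed" is meaningful.
RESUME_QUERY = """
    SELECT accounts.account, lists.list, subs.id, tags.tags
    FROM public.subscriber_tags AS tags
    JOIN list.subscribers AS subs ON (tags.subscriber_id = subs.id)
    JOIN accounts ON (accounts.a_id = subs.account_id)
    JOIN autoresponders AS lists ON (lists.a_serv_id = subs.list_id)
    WHERE tags.subscriber_id BETWEEN %(low)s AND %(high)s
    AND tags.tags != '{}'
    ORDER BY subs.id
"""


def resume_params(last_processed=None):
    """Return query bounds, restarting at the last processed subscriber.

    The last processed subscriber is included again because some of its
    tags may not have been published before the run stopped. That can
    produce a few duplicate events for that one subscriber.
    """
    low = FIRST_ID if last_processed is None else max(FIRST_ID, last_processed)
    return {"low": low, "high": LAST_ID}


print(resume_params())
print(resume_params(3791700000))
```

The parameters would be passed as cur.execute(RESUME_QUERY, resume_params(last_id)).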

Updated again to speed it up with concurrent mapping calls.

/correlr/roam/src/commit/26fa2f81176946116dc94dd448704178d388fbb0/daily/data/da/111b28-6414-48ac-af46-e57342042b98/replaytags2.py
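
The mapping lookup is one HTTP round trip per subscriber, so it dominates the run time when done serially. A rough sketch of the concurrent approach using a thread pool; this is not the content of replaytags2.py. resolve_uuids and fake_lookup are hypothetical names, and fake_lookup stands in for the real mapping request so the example runs offline.

```python
from concurrent.futures import ThreadPoolExecutor


def resolve_uuids(subscriber_ids, lookup, max_workers=16):
    """Resolve subscriber IDs concurrently; failed lookups are dropped."""
    ids = list(subscriber_ids)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map preserves input order, so results line up with ids.
        results = list(pool.map(lookup, ids))
    return {sid: uid for sid, uid in zip(ids, results) if uid is not None}


def fake_lookup(subscriber_id):
    # Stand-in for the HTTP call; returns None to mimic a failed lookup.
    if subscriber_id % 2:
        return None
    return f"uuid-for-{subscriber_id}"


resolved = resolve_uuids([3791573768, 3791573769, 3791573770], fake_lookup)
print(resolved)
```

In the real script, lookup would wrap the requests.get call and return None on a non-200 response. Threads fit here because the work is I/O bound.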

Emitting tag.v1 events  [####################################]  67018/67018  100%
Sent 159801 tagging events.

Success.

…. #&$*(@&#. UTC. The partition hours are UTC, not local time, so the replay missed the last five hours of events; those still need replaying.
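
A five-hour gap matches the offset between local time and UTC. A small sketch of the conversion, assuming the org timestamps above are local time at -05:00 (the offset on this file's timestamp) and that partition_0 through partition_3 are the UTC year, month, day and hour. partition_for is a hypothetical helper.

```python
import datetime

# Assumption: the incident timestamps were recorded at UTC-05:00.
LOCAL = datetime.timezone(datetime.timedelta(hours=-5))


def partition_for(local_dt):
    """Return (partition_0..partition_3) for a timezone-aware datetime."""
    utc = local_dt.astimezone(datetime.timezone.utc)
    return tuple(utc.strftime("%Y %m %d %H").split())


start = datetime.datetime(2022, 1, 12, 10, 0, tzinfo=LOCAL)
end = datetime.datetime(2022, 1, 13, 10, 0, tzinfo=LOCAL)

print(partition_for(start))  # ('2022', '01', '12', '15')
print(partition_for(end))    # ('2022', '01', '13', '15')
```

Under these assumptions the incident window runs from partition hour 15 on the 12th to hour 15 on the 13th, not hour 10.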