Karan Sharma

Storing AWS Pinpoint Logs

5 minutes (1321 words)

At $dayjob, we use AWS Pinpoint to send SMS to our customers. We’ve previously written a detailed blog post on how we use a Clickhouse + Vector stack for our logging needs. We additionally wanted to store the delivery logs generated by the Pinpoint service. But as with most things in AWS, even a simple task like this tends to piggyback on another AWS service - in this case, AWS Kinesis. All the delivery logs, which contain metadata about SMS delivery, are streamed to Kinesis.

Our setup involves configuring Pinpoint with an Amazon Kinesis Data Firehose stream. Firehose is an ETL service that streams events to other persistent stores. It supports multiple output sinks, and in our case we use the HTTP sink.

This is what the flow looks like:

Pinpoint -> Kinesis Firehose -> Vector HTTP -> Clickhouse


Ingesting Data

On the HTTP server side, we used vector’s aws_kinesis_firehose source. Compared to a plain http source, it understands the request envelope that Firehose sends (decoding the base64-encoded records for us) and it can validate the access key that Firehose attaches to each request.

Here’s the vector config so far:

[sources.firehose]
# General
type = "aws_kinesis_firehose"
address = "127.0.0.1:9000"
store_access_key = false
access_keys = ["superdupersecret"]

# Use it for debugging
[sinks.console]
type = "firehose"
inputs = ["format_pinpoint_logs"]
encoding.codec = "json"

Formatting the data

Now that we have a pipeline that sends and receives data, we can process the events and transform them into a more desirable schema. Since we need the events to be queryable in a Clickhouse DB, this is the schema we use:

CREATE TABLE default.pinpoint_logs (
    `_timestamp` DateTime('Asia/Kolkata'),
    `app_id` LowCardinality(String),
    `event_type` LowCardinality(String),
    `record_status` LowCardinality(String),
    `origination_phone_number` String,
    `message_id` String,
    `destination_phone_number` String,
    `arrival_timestamp` DateTime('Asia/Kolkata'),
    `event_timestamp` DateTime('Asia/Kolkata'),
    `meta` Nullable(String)
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(_timestamp)
ORDER BY _timestamp
SETTINGS index_granularity = 8192;
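
Once events start landing in this table, a query like the one below (purely illustrative) gives a quick per-day breakdown of delivery outcomes:

-- Daily count of SMS events by type and delivery status
SELECT
    toDate(_timestamp) AS day,
    event_type,
    record_status,
    count() AS total
FROM default.pinpoint_logs
GROUP BY day, event_type, record_status
ORDER BY day DESC, total DESC;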

To achieve the above format, we can use VRL (Vector Remap Language) to parse and format our SMS events:

[transforms.format_pinpoint_logs]
type = "remap" 
inputs = ["firehose"] 
source = '''
  # Decode the JSON message and set ingestion timestamp
  .message = parse_json!(.message)
  .ingestion_timestamp = .timestamp

  # Convert timestamps from Unix to DateTime
  .event_timestamp = from_unix_timestamp!(.message.event_timestamp, unit:"milliseconds")
  .arrival_timestamp = from_unix_timestamp!(.message.arrival_timestamp, unit:"milliseconds")

  # Extract keys to top level and remove from attributes
  .record_status = del(.message.attributes.record_status)
  .origination_phone_number = del(.message.attributes.origination_phone_number)
  .destination_phone_number = del(.message.attributes.destination_phone_number)
  .message_id = del(.message.attributes.message_id)

  # Encode the remaining attributes as JSON string
  .attr = encode_json(.message.attributes)

  # Format Payload for Clickhouse
  . = {
    "_timestamp": .ingestion_timestamp,
    "arrival_timestamp": .arrival_timestamp,
    "event_timestamp": .event_timestamp,
    "app_id": .message.application.app_id,
    "event_type": .message.event_type,
    "record_status": .record_status,
    "message_id": .message_id,
    "origination_phone_number": .origination_phone_number,
    "destination_phone_number": .destination_phone_number,
    "meta": .attr
  }
'''
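
To sanity-check the remap logic without round-tripping through Firehose, vector's built-in unit tests can push a hand-crafted event through the transform and assert on the output. A minimal sketch, where the test name and all sample values are made up, runnable with vector test config.toml:

[[tests]]
name = "format_pinpoint_logs_works"

[[tests.inputs]]
insert_at = "format_pinpoint_logs"
type = "log"

[tests.inputs.log_fields]
timestamp = "2024-01-01T00:00:00Z"
message = '{"event_type": "_SMS.SUCCESS", "event_timestamp": 1704067200000, "arrival_timestamp": 1704067201000, "application": {"app_id": "dummy-app"}, "attributes": {"record_status": "DELIVERED", "message_id": "dummy-message-id", "origination_phone_number": "+10000000000", "destination_phone_number": "+10000000001"}}'

[[tests.outputs]]
extract_from = "format_pinpoint_logs"

[[tests.outputs.conditions]]
type = "vrl"
source = '''
assert_eq!(.event_type, "_SMS.SUCCESS")
assert_eq!(.record_status, "DELIVERED")
assert_eq!(.app_id, "dummy-app")
'''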

Plugging this in, we get a clean JSON object for each SMS event. The only thing left to add is an output sink for Clickhouse:

[sinks.clickhouse]
type = "clickhouse"
inputs = ["format_pinpoint_logs"]
skip_unknown_fields = true
compression = "gzip"
database = "default"
endpoint = "http://127.0.0.1:8123"
table = "pinpoint_logs"
encoding.timestamp_format = "unix"
batch.max_bytes = 1049000 # 1 MB
batch.timeout_secs = 5
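# Buffer events on disk (~256 MiB) and apply backpressure when the buffer is full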
buffer.max_size = 268435488
buffer.type = "disk"
buffer.when_full = "block"

Perfect! On running this pipeline with vector -c config.toml, we can see the records being consumed.
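
A quick, illustrative Clickhouse query to confirm that rows are actually landing in the table:

-- Total ingested events and the most recent ingestion timestamp
SELECT count() AS total, max(_timestamp) AS latest
FROM default.pinpoint_logs;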

Hope this short post was useful if you ever have to do something similar!

Fin!

Tags: #devops #clickhouse