
Commit e24f8a9

Add Meta Conversions API data source
1 parent 964ec16 commit e24f8a9

5 files changed (+596 −0 lines changed)

design.md

Lines changed: 78 additions & 0 deletions
# Meta Conversions API Data Source Design

## Goal

Implement a PySpark Custom Data Source to write event data to the Meta Conversions API (CAPI). This enables Databricks users to send server-side events directly to Meta for ad optimization and measurement.

## Architecture

The implementation will follow PySpark's Python Data Source API (subclassing `pyspark.sql.datasource.DataSource`), implementing a write-only data source.
### Components

1. **MetaCapiDataSource**: The entry point, responsible for defining the name (`meta_capi`) and creating the writer.
2. **MetaCapiWriter**: Handles the execution of write operations.
   - Validates configuration (Access Token, Pixel ID).
   - Batches records (Meta CAPI supports up to 1000 events per request).
   - Transforms Spark Rows into CAPI-compliant JSON payloads.
   - Sends POST requests to the Graph API.
   - Handles responses and errors.
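A minimal sketch of how these two components could fit together under the Python Data Source API (Spark 4.0+). The class names mirror the components above; the method bodies are illustrative only, and the helpers are stubbed here and sketched in later sections. The streaming path used in the example notebook would additionally need a `streamWriter` / `DataSourceStreamWriter`, omitted for brevity.

```python
from pyspark.sql.datasource import DataSource, DataSourceWriter, WriterCommitMessage


class MetaCapiDataSource(DataSource):
    """Entry point: names the source and creates the writer."""

    @classmethod
    def name(cls):
        return "meta_capi"

    def writer(self, schema, overwrite):
        return MetaCapiWriter(self.options, schema)


class MetaCapiWriter(DataSourceWriter):
    """Executes the write: validate, batch, transform, POST."""

    def __init__(self, options, schema):
        self.access_token = options.get("access_token")
        self.pixel_id = options.get("pixel_id")
        if not self.access_token or not self.pixel_id:
            raise ValueError("access_token and pixel_id are required")
        self.api_version = options.get("api_version", "v19.0")
        # CAPI accepts at most 1000 events per request.
        self.batch_size = min(int(options.get("batch_size", "1000")), 1000)
        self.schema = schema

    def write(self, iterator):
        # Runs once per partition on the executors.
        batch = []
        for row in iterator:
            batch.append(self._transform_row_to_event(row))
            if len(batch) >= self.batch_size:
                self._send_batch(batch)
                batch = []
        if batch:
            self._send_batch(batch)
        return WriterCommitMessage()

    def _transform_row_to_event(self, row):
        ...  # sketched under "Schema & Data Mapping"

    def _send_batch(self, events):
        ...  # sketched under "API Details"
```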
## Configuration Options

The data source will support the following options via `.option()`:

- `access_token` (Required): Meta System User Access Token.
- `pixel_id` (Required): The Meta Pixel ID (Dataset ID).
- `api_version` (Optional): Graph API version (default: `v19.0`).
- `batch_size` (Optional): Number of events per API request (default: `1000`, which is also the CAPI maximum).
## Schema & Data Mapping
26+
The data source expects the input DataFrame to contain columns that map to the [Meta CAPI Event structure](https://developers.facebook.com/docs/marketing-api/conversions-api/parameters).
27+
28+
To improve usability, the writer will support two modes:
29+
1. **Structured Mode**: Users provide columns matching the API structure (e.g., a `user_data` struct column, `custom_data` struct column).
30+
2. **Flat Mode** (optional/auto-detected): If `user_data` struct is missing, the writer looks for flat columns with specific prefixes or names and constructs the nested structure.
31+
- `email` -> `user_data.em` (will apply SHA256 if not already hashed - *nice to have*)
32+
- `phone` -> `user_data.ph`
33+
- `client_ip_address` -> `user_data.client_ip_address`
34+
- `event_name` -> `event_name`
35+
- `event_time` -> `event_time` (converts timestamp to Unix integer)
36+
- `value` -> `custom_data.value`
37+
- `currency` -> `custom_data.currency`
38+
39+
*Decision*: For the initial implementation, we will prioritize **Structured Mode** correctness but add basic **Flat Mode** mapping for common fields (`email`, `event_name`, `event_time`, `value`, `currency`) to simplify the "Notebook" experience mentioned in the PRD.
40+
41+
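For Flat Mode, the `_transform_row_to_event` helper might look roughly like the sketch below (shown as a standalone function for readability). The "already hashed" heuristic and the treatment of missing fields are assumptions, since the design leaves them open.

```python
import hashlib
from datetime import datetime


def _transform_row_to_event(row):
    """Map a flat Spark Row to a CAPI event dict (Flat Mode sketch)."""
    d = row.asDict()

    # event_time may arrive as a timestamp; CAPI expects Unix seconds.
    event_time = d.get("event_time")
    if isinstance(event_time, datetime):
        event_time = int(event_time.timestamp())

    user_data = {}
    if d.get("email"):
        email = d["email"].strip().lower()
        # Hash only if the value does not already look like a SHA-256 hex digest.
        if len(email) != 64 or not all(c in "0123456789abcdef" for c in email):
            email = hashlib.sha256(email.encode("utf-8")).hexdigest()
        user_data["em"] = [email]
    if d.get("phone"):
        user_data["ph"] = [hashlib.sha256(d["phone"].encode("utf-8")).hexdigest()]
    if d.get("client_ip_address"):
        user_data["client_ip_address"] = d["client_ip_address"]

    return {
        "event_name": d.get("event_name"),
        "event_time": event_time,
        "action_source": d.get("action_source"),
        "user_data": user_data,
        "custom_data": {
            "currency": d.get("currency"),
            "value": d.get("value"),
        },
    }
```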
## API Details

- **Endpoint**: `https://graph.facebook.com/{api_version}/{pixel_id}/events`
- **Method**: `POST`
- **Headers**: `Content-Type: application/json`
- **Payload**:

```json
{
  "access_token": "...",
  "data": [
    {
      "event_name": "Purchase",
      "event_time": 1698765432,
      "action_source": "website",
      "user_data": {
        "em": ["7b..."],
        "ph": ["..."]
      },
      "custom_data": {
        "currency": "USD",
        "value": 100.0
      }
    }
  ]
}
```

*Note: `access_token` can be passed as a query parameter or in the request body. We will send it in the request body, as in the payload above.*
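A hedged sketch of the `_send_batch` helper from the plan below, shown as a standalone function with explicit arguments. It illustrates the POST call plus the intended retry-on-5xx / log-on-4xx behavior; the retry count and backoff are placeholder values, not decisions made in this design.

```python
import logging
import time

import requests

logger = logging.getLogger(__name__)


def _send_batch(events, access_token, pixel_id, api_version="v19.0", max_retries=3):
    """POST one batch of events to the Graph API (sketch)."""
    url = f"https://graph.facebook.com/{api_version}/{pixel_id}/events"
    payload = {"access_token": access_token, "data": events}

    for attempt in range(max_retries):
        resp = requests.post(url, json=payload, timeout=30)
        if resp.status_code < 400:
            return resp.json()  # e.g. {"events_received": ..., "fbtrace_id": ...}
        if resp.status_code < 500:
            # Client errors are not retried; log and fail the task.
            logger.error("Meta CAPI rejected batch: %s %s", resp.status_code, resp.text)
            resp.raise_for_status()
        # 5xx: back off and retry.
        time.sleep(2 ** attempt)
    raise RuntimeError(f"Meta CAPI request failed after {max_retries} retries")
```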
## Implementation Plan

1. Create `pyspark_datasources/meta_capi.py`.
2. Implement the `MetaCapiDataSource` class.
3. Implement the `MetaCapiWriter` class.
4. Implement a helper method `_transform_row_to_event(row)` to handle schema mapping and type conversion (e.g., datetime to Unix timestamp).
5. Implement `_send_batch(events)` using the `requests` library.
6. Add error handling (retries for 5xx, logging for 4xx).

## Dependencies

- `requests`: For HTTP calls.

examples/meta_capi_example.py

Lines changed: 156 additions & 0 deletions
```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Meta Conversions API (CAPI) Datasource Example

This example demonstrates how to use the MetaCapiDataSource as a streaming datasource
to write event data to Meta for ad optimization.

Requirements:
- PySpark
- requests
- Valid Meta System User Access Token and Pixel ID

Setup:
    pip install pyspark requests

Environment Variables:
    export META_ACCESS_TOKEN="your-access-token"
    export META_PIXEL_ID="your-pixel-id"
"""

import os
import tempfile
import time

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, current_timestamp, unix_timestamp
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType


def check_credentials():
    """Check if Meta credentials are available"""
    token = os.getenv("META_ACCESS_TOKEN")
    pixel_id = os.getenv("META_PIXEL_ID")

    if not all([token, pixel_id]):
        print("❌ Missing Meta credentials!")
        print("Please set the following environment variables:")
        print("  export META_ACCESS_TOKEN='your-access-token'")
        print("  export META_PIXEL_ID='your-pixel-id'")
        return False, None, None

    print(f"✅ Using Pixel ID: {pixel_id}")
    return True, token, pixel_id


def example_1_rate_source_to_capi():
    """Example 1: Stream simulated purchases to Meta CAPI"""
    print("\n" + "=" * 60)
    print("EXAMPLE 1: Simulated Purchases → Meta CAPI (Streaming)")
    print("=" * 60)

    has_creds, token, pixel_id = check_credentials()
    if not has_creds:
        return

    spark = SparkSession.builder.appName("MetaCapiExample1").getOrCreate()

    try:
        from pyspark_datasources.meta_capi import MetaCapiDataSource
        spark.dataSource.register(MetaCapiDataSource)
        print("✅ Meta CAPI datasource registered")

        # Create streaming data (simulating 1 purchase per second)
        streaming_df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

        # Transform to CAPI format (Flat Mode)
        # We simulate user data. In production, this comes from your tables.
        events_df = streaming_df.select(
            lit("Purchase").alias("event_name"),
            col("timestamp").alias("event_time"),
            lit("test@example.com").alias("email"),  # Will be auto-hashed
            lit("website").alias("action_source"),
            (col("value") * 10.0 + 5.0).alias("value"),
            lit("USD").alias("currency"),
            lit("TEST12345").alias("test_event_code"),  # For testing in Events Manager
        )

        print("📊 Starting streaming write to Meta CAPI...")
        print("   Check your Events Manager 'Test Events' tab!")

        # Write to Meta CAPI
        query = (
            events_df.writeStream.format("meta_capi")
            .option("access_token", token)
            .option("pixel_id", pixel_id)
            .option("test_event_code", "TEST12345")  # Optional: direct test code option
            .option("batch_size", "10")
            .option("checkpointLocation", "/tmp/meta_capi_example1_checkpoint")
            .trigger(processingTime="10 seconds")
            .start()
        )

        # Run for 30 seconds then stop
        time.sleep(30)
        query.stop()
        print("✅ Streaming stopped")

    except Exception as e:
        print(f"❌ Error: {e}")
    finally:
        spark.stop()


def example_2_batch_dataframe_to_capi():
    """Example 2: Batch write a static DataFrame to Meta CAPI"""
    print("\n" + "=" * 60)
    print("EXAMPLE 2: Static DataFrame → Meta CAPI (Batch)")
    print("=" * 60)

    has_creds, token, pixel_id = check_credentials()
    if not has_creds:
        return

    spark = SparkSession.builder.appName("MetaCapiExample2").getOrCreate()

    try:
        from pyspark_datasources.meta_capi import MetaCapiDataSource
        spark.dataSource.register(MetaCapiDataSource)
        print("✅ Meta CAPI datasource registered")

        # Create sample data
        data = [
            ("Purchase", 1700000001, "user1@example.com", 120.50, "USD"),
            ("Purchase", 1700000002, "user2@example.com", 85.00, "USD"),
            ("AddToCart", 1700000003, "user3@example.com", 25.99, "USD"),
        ]
        columns = ["event_name", "event_time", "email", "value", "currency"]
        df = spark.createDataFrame(data, columns)

        # Add optional fields
        df = df.withColumn("action_source", lit("website")) \
               .withColumn("test_event_code", lit("TEST12345"))

        print(f"📊 Writing {df.count()} records to Meta CAPI in batch mode...")
        print("   Check your Events Manager 'Test Events' tab!")

        # Write to Meta CAPI (Batch)
        df.write.format("meta_capi") \
            .option("access_token", token) \
            .option("pixel_id", pixel_id) \
            .option("test_event_code", "TEST12345") \
            .option("batch_size", "50") \
            .save()

        print("✅ Batch write completed")

    except Exception as e:
        print(f"❌ Error: {e}")
    finally:
        spark.stop()


def main():
    print("🚀 Meta CAPI Datasource Example")
    example_1_rate_source_to_capi()
    example_2_batch_dataframe_to_capi()


if __name__ == "__main__":
    main()
```

pyspark_datasources/__init__.py

Lines changed: 1 addition & 0 deletions
```python
from .simplejson import SimpleJsonDataSource
from .stock import StockDataSource
from .jsonplaceholder import JSONPlaceholderDataSource
from .meta_capi import MetaCapiDataSource
```
