
Commit e94885e

Add Meta Conversions API data source
1 parent: 964ec16

File tree

4 files changed: +518 -0 lines changed


examples/meta_capi_example.py

Lines changed: 156 additions & 0 deletions
@@ -0,0 +1,156 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Meta Conversions API (CAPI) Datasource Example

This example demonstrates how to use the MetaCapiDataSource as a streaming datasource
to write event data to Meta for ad optimization.

Requirements:
- PySpark
- requests
- Valid Meta System User Access Token and Pixel ID

Setup:
    pip install pyspark requests

Environment Variables:
    export META_ACCESS_TOKEN="your-access-token"
    export META_PIXEL_ID="your-pixel-id"
"""

import os
import tempfile
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, current_timestamp, unix_timestamp
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType

def check_credentials():
    """Check if Meta credentials are available"""
    token = os.getenv("META_ACCESS_TOKEN")
    pixel_id = os.getenv("META_PIXEL_ID")

    if not all([token, pixel_id]):
        print("❌ Missing Meta credentials!")
        print("Please set the following environment variables:")
        print(" export META_ACCESS_TOKEN='your-access-token'")
        print(" export META_PIXEL_ID='your-pixel-id'")
        return False, None, None

    print(f"✅ Using Pixel ID: {pixel_id}")
    return True, token, pixel_id

def example_1_rate_source_to_capi():
    """Example 1: Stream simulated purchases to Meta CAPI"""
    print("\n" + "=" * 60)
    print("EXAMPLE 1: Simulated Purchases → Meta CAPI (Streaming)")
    print("=" * 60)

    has_creds, token, pixel_id = check_credentials()
    if not has_creds:
        return

    spark = SparkSession.builder.appName("MetaCapiExample1").getOrCreate()

    try:
        from pyspark_datasources.meta_capi import MetaCapiDataSource
        spark.dataSource.register(MetaCapiDataSource)
        print("✅ Meta CAPI datasource registered")

        # Create streaming data (simulating 1 purchase per second)
        streaming_df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

        # Transform to CAPI format (Flat Mode)
        # We simulate user data. In production, this comes from your tables.
        events_df = streaming_df.select(
            lit("Purchase").alias("event_name"),
            col("timestamp").alias("event_time"),
            lit("test@example.com").alias("email"),  # Will be auto-hashed
            lit("website").alias("action_source"),
            (col("value") * 10.0 + 5.0).alias("value"),
            lit("USD").alias("currency"),
            lit("TEST12345").alias("test_event_code")  # For testing in Events Manager
        )

        print("📊 Starting streaming write to Meta CAPI...")
        print(" Check your Events Manager 'Test Events' tab!")

        # Write to Meta CAPI
        query = (
            events_df.writeStream.format("meta_capi")
            .option("access_token", token)
            .option("pixel_id", pixel_id)
            .option("test_event_code", "TEST12345")  # Optional: direct test code option
            .option("batch_size", "10")
            .option("checkpointLocation", "/tmp/meta_capi_example1_checkpoint")
            .trigger(processingTime="10 seconds")
            .start()
        )

        # Run for 30 seconds then stop
        time.sleep(30)
        query.stop()
        print("✅ Streaming stopped")

    except Exception as e:
        print(f"❌ Error: {e}")
    finally:
        spark.stop()

def example_2_batch_dataframe_to_capi():
    """Example 2: Batch write a static DataFrame to Meta CAPI"""
    print("\n" + "=" * 60)
    print("EXAMPLE 2: Static DataFrame → Meta CAPI (Batch)")
    print("=" * 60)

    has_creds, token, pixel_id = check_credentials()
    if not has_creds:
        return

    spark = SparkSession.builder.appName("MetaCapiExample2").getOrCreate()

    try:
        from pyspark_datasources.meta_capi import MetaCapiDataSource
        spark.dataSource.register(MetaCapiDataSource)
        print("✅ Meta CAPI datasource registered")

        # Create sample data
        data = [
            ("Purchase", 1700000001, "user1@example.com", 120.50, "USD"),
            ("Purchase", 1700000002, "user2@example.com", 85.00, "USD"),
            ("AddToCart", 1700000003, "user3@example.com", 25.99, "USD"),
        ]

        columns = ["event_name", "event_time", "email", "value", "currency"]
        df = spark.createDataFrame(data, columns)

        # Add optional fields
        df = df.withColumn("action_source", lit("website")) \
               .withColumn("test_event_code", lit("TEST12345"))

        print(f"📊 Writing {df.count()} records to Meta CAPI in batch mode...")
        print(" Check your Events Manager 'Test Events' tab!")

        # Write to Meta CAPI (Batch)
        df.write.format("meta_capi") \
            .option("access_token", token) \
            .option("pixel_id", pixel_id) \
            .option("test_event_code", "TEST12345") \
            .option("batch_size", "50") \
            .save()

        print("✅ Batch write completed")

    except Exception as e:
        print(f"❌ Error: {e}")
    finally:
        spark.stop()

def main():
    print("🚀 Meta CAPI Datasource Example")
    example_1_rate_source_to_capi()
    example_2_batch_dataframe_to_capi()

if __name__ == "__main__":
    main()
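Note: both examples rely on the sink to hash user identifiers and assemble Meta's documented Conversions API request: a JSON "data" array of events POSTed to the Graph API /{pixel_id}/events edge, with user_data fields such as email trimmed, lowercased, and SHA-256 hashed. A standalone sketch of that request using requests — the Graph API version and exact field mapping here are illustrative assumptions, not the datasource's actual code:

import hashlib
import os

import requests

access_token = os.environ["META_ACCESS_TOKEN"]
pixel_id = os.environ["META_PIXEL_ID"]

def sha256_normalized(value: str) -> str:
    # Meta requires identifiers such as email ("em") to be
    # trimmed, lowercased, and SHA-256 hashed before upload.
    return hashlib.sha256(value.strip().lower().encode("utf-8")).hexdigest()

payload = {
    "data": [
        {
            "event_name": "Purchase",
            "event_time": 1700000001,
            "action_source": "website",
            "user_data": {"em": sha256_normalized("user1@example.com")},
            "custom_data": {"value": 120.50, "currency": "USD"},
        }
    ],
    "test_event_code": "TEST12345",  # routes events to the Test Events tab
}

resp = requests.post(
    f"https://graph.facebook.com/v21.0/{pixel_id}/events",  # API version: assumption
    params={"access_token": access_token},
    json=payload,
)
resp.raise_for_status()
print(resp.json())  # e.g. {"events_received": 1, ...}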

pyspark_datasources/__init__.py

Lines changed: 1 addition & 0 deletions
@@ -10,3 +10,4 @@
 from .simplejson import SimpleJsonDataSource
 from .stock import StockDataSource
 from .jsonplaceholder import JSONPlaceholderDataSource
+from .meta_capi import MetaCapiDataSource
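Note: the datasource implementation itself (pyspark_datasources/meta_capi.py, one of the four changed files) is not shown in this section. Sinks registered via spark.dataSource.register follow PySpark 4.0's Python Data Source API (pyspark.sql.datasource); a minimal sketch of the shape such a sink takes — the Meta-specific class names and logic below are assumptions, not the committed code:

from pyspark.sql.datasource import DataSource, DataSourceWriter, WriterCommitMessage

class MetaCapiDataSource(DataSource):
    @classmethod
    def name(cls):
        return "meta_capi"  # the short name used by .format("meta_capi")

    def writer(self, schema, overwrite):
        # self.options carries access_token, pixel_id, batch_size, ...
        return MetaCapiWriter(self.options)

    # Streaming support (writeStream in Example 1) would additionally
    # implement streamWriter(self, schema, overwrite).

class MetaCapiWriter(DataSourceWriter):  # hypothetical class name
    def __init__(self, options):
        self.options = options

    def write(self, iterator):
        # Runs on each executor: batch the partition's rows and POST
        # them to the Conversions API with requests (omitted in this sketch).
        for _ in iterator:
            pass
        return WriterCommitMessage()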
