
Facebook Ads Integration - Architecture

Architecture Diagram

flowchart TB
subgraph Railway["Railway Platform"]
FB[Facebook Automation Worker]
DB[(PostgreSQL Database)]
end

subgraph CDP["CDP System"]
CFG[facebook_configurations]
CUST[customers_master]
RFM[cdp_rfm_analysis]
CLV[cdp_clv_analysis]
end

subgraph Facebook["Facebook Platform"]
API[Facebook Ads API]
AUD1[Custom Audience - Kangoo]
AUD2[Custom Audience - Septimo]
AUD3[Custom Audience - DigitalFarma]
end

FB -->|Query Config| CFG
FB -->|Extract Customers| CUST
FB -->|Get Segments| RFM
FB -->|Get CLV| CLV
FB -->|Sync Audiences| API
API --> AUD1
API --> AUD2
API --> AUD3

CFG --> DB
CUST --> DB
RFM --> DB
CLV --> DB

Main Components

1. Worker Service (Railway)

Entry Point: start_facebook_automation.py

# Main worker flow
1. Load configurations from the DB
2. For each system:
   a. Get the list of tenants
   b. Extract the customers of each tenant
   c. Consolidate audiences
   d. Hash the data (SHA-256)
   e. Sync via the Facebook API
   f. Validate and record the results

Environment Variables:

DATABASE_URL=postgresql://...
FACEBOOK_ACCESS_TOKEN=EAA...
WORKER_MODE=facebook
RUN_MODE=scheduler
TZ=America/Argentina/Buenos_Aires
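A minimal sketch of reading these variables at startup, assuming the worker should fail fast when the database URL or the access token is missing. The function name `load_settings` and the choice of which variables are required are hypothetical; the defaults simply mirror the values listed above.

```python
import os
from typing import Mapping, Optional

# Hypothetical choice: the worker cannot start without these two variables.
REQUIRED_VARS = ("DATABASE_URL", "FACEBOOK_ACCESS_TOKEN")


def load_settings(env: Optional[Mapping[str, str]] = None) -> dict:
    """Read the worker settings, failing fast if a required variable is missing."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {
        "database_url": env["DATABASE_URL"],
        "access_token": env["FACEBOOK_ACCESS_TOKEN"],
        # Defaults mirror the documented values.
        "worker_mode": env.get("WORKER_MODE", "facebook"),
        "run_mode": env.get("RUN_MODE", "scheduler"),
        "tz": env.get("TZ", "America/Argentina/Buenos_Aires"),
    }
```

Passing a plain dictionary instead of `os.environ` keeps the function easy to test.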

2. Database

Table: cdp.facebook_configurations

CREATE TABLE cdp.facebook_configurations (
    id SERIAL PRIMARY KEY,
    system_name VARCHAR(100) UNIQUE NOT NULL,
    ad_account_id VARCHAR(50) NOT NULL,
    tenant_ids INTEGER[] NOT NULL,
    custom_audience_id VARCHAR(50),
    is_active BOOLEAN DEFAULT true,
    schedule_hours INTEGER DEFAULT 48,
    last_sync TIMESTAMP WITH TIME ZONE,
    next_sync TIMESTAMP WITH TIME ZONE,
    total_customers INTEGER DEFAULT 0,
    sync_status VARCHAR(50),
    error_log TEXT,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);

Current Configurations:

| system_name | ad_account_id | tenant_ids | custom_audience_id |
|---|---|---|---|
| kangoo_system | act_323379512251780 | [20, 25] | - |
| septimo_system | act_881526052911252 | [100, 107, 112, 118] | - |
| digitalfarma_system | act_2850528471874859 | [28] | - |

3. Facebook Ads API Integration

API Version: v18.0
Base Endpoint: https://graph.facebook.com/v18.0/

Custom Audiences API

# Create a Custom Audience
POST /{ad_account_id}/customaudiences
{
  "name": "CDP - Kangoo System - 2024-10",
  "subtype": "CUSTOM",
  "description": "Audience synced from Nerdistan CDP",
  "customer_file_source": "USER_PROVIDED_ONLY"
}

# Add users to the Audience
POST /{custom_audience_id}/users
{
  "payload": {
    "schema": ["EMAIL", "PHONE", "FN", "LN"],
    "data": [
      ["hash_email", "hash_phone", "hash_firstname", "hash_lastname"]
    ]
  }
}
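A minimal sketch of preparing the `/users` call with the standard library, assuming the `payload` object is sent as a single JSON-encoded form field alongside the access token. `build_add_users_request` and `GRAPH_API_BASE` are hypothetical names; the rows must already hold SHA-256 hashes in the same order as `schema`.

```python
import json

GRAPH_API_BASE = "https://graph.facebook.com/v18.0"


def build_add_users_request(audience_id: str, schema: list, rows: list, access_token: str):
    """Return (url, form_fields) for POST /{custom_audience_id}/users."""
    # Every row needs exactly one value per schema field (e.g. EMAIL, PHONE, FN, LN).
    if any(len(row) != len(schema) for row in rows):
        raise ValueError("every row must have one value per schema field")
    url = f"{GRAPH_API_BASE}/{audience_id}/users"
    form_fields = {
        # The whole payload object travels as one JSON string.
        "payload": json.dumps({"schema": schema, "data": rows}),
        "access_token": access_token,
    }
    return url, form_fields
```

The result could then be sent with `requests.post(url, data=form_fields, timeout=60)`.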

Data Hashing (SHA-256)

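import hashlib
from typing import Optional


def hash_data(value: Optional[str]) -> str:
    """Hash a value with SHA-256 after basic normalization (trim + lowercase).

    This is sufficient for emails; Facebook's rules for phone numbers (digits
    only, with country code) and names (no punctuation) need extra normalization.
    """
    if not value:
        # Missing values become empty strings in the payload row
        return ""

    # Normalize: trim surrounding whitespace and lowercase
    normalized = value.strip().lower()

    # Hash the UTF-8 bytes and return the hex digest
    return hashlib.sha256(normalized.encode('utf-8')).hexdigest()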
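Facebook's normalization rules differ per field: phone numbers must contain digits only, including the country code, and names should be lowercase without punctuation. A sketch of per-field normalization before hashing, assuming Argentina (`54`) as the default country code for numbers stored without one. The helper names and the phone heuristic are hypothetical simplifications, not a full E.164 parser.

```python
import hashlib
import re
from typing import Callable, Dict, Optional


def normalize_email(value: str) -> str:
    # Trim surrounding whitespace and lowercase.
    return value.strip().lower()


def normalize_phone(value: str, default_country_code: str = "54") -> str:
    # Digits only; numbers without "+" or "00" are assumed to lack the country code.
    digits = re.sub(r"\D", "", value)
    if value.strip().startswith("+"):
        return digits
    if digits.startswith("00"):  # international dialing prefix
        return digits[2:]
    digits = digits.lstrip("0")  # drop the national trunk prefix
    return default_country_code + digits if digits else ""


def normalize_name(value: str) -> str:
    # Lowercase and strip punctuation; accented letters are kept (UTF-8).
    return re.sub(r"[^\w\s]|_", "", value.strip().lower()).strip()


NORMALIZERS: Dict[str, Callable[[str], str]] = {
    "EMAIL": normalize_email,
    "PHONE": normalize_phone,
    "FN": normalize_name,
    "LN": normalize_name,
}


def hash_field(field: str, value: Optional[str]) -> str:
    """Normalize `value` according to its schema field, then SHA-256 hash it."""
    normalized = NORMALIZERS[field](value) if value else ""
    if not normalized:
        return ""
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()
```

Keying the normalizers by schema field keeps them aligned with the `["EMAIL", "PHONE", "FN", "LN"]` schema used in the payload.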

4. Scheduler

Cron Schedule: Every 48 hours (10:00 AM Argentina time)

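# Scheduler logic in start_facebook_automation.py
import time

import schedule


def schedule_sync():
    """Schedule a sync every 48 hours (every 2 days) at 10:00 local time."""
    # Local time follows the TZ environment variable (America/Argentina/Buenos_Aires).
    # run_facebook_sync is defined elsewhere in start_facebook_automation.py.
    schedule.every(2).days.at("10:00").do(run_facebook_sync)

    while True:
        schedule.run_pending()
        time.sleep(60)  # Check every minute so the 10:00 run is not delayed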
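Because `next_sync` and `schedule_hours` are stored per system, the loop could also decide which systems are due instead of syncing all of them on one global timer. A sketch, assuming the configuration rows also include the `next_sync` column (the query in Step 1 would need to select it); `due_configurations` is a hypothetical helper.

```python
from datetime import datetime, timezone
from typing import Iterable, List, Optional


def due_configurations(configs: Iterable[dict], now: Optional[datetime] = None) -> List[dict]:
    """Return the configurations whose next_sync time has been reached.

    A system that has never been synced (next_sync is None) is always due.
    """
    now = now or datetime.now(timezone.utc)
    return [
        cfg for cfg in configs
        if cfg.get("next_sync") is None or cfg["next_sync"] <= now
    ]
```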

Synchronization Flow

Step 1: Load Configurations

def load_active_configurations():
    """Load the active systems from the DB."""
    # execute_query (not shown) runs the SQL and returns the result rows
    query = """
        SELECT
            system_name,
            ad_account_id,
            tenant_ids,
            custom_audience_id,
            schedule_hours
        FROM cdp.facebook_configurations
        WHERE is_active = true
        ORDER BY system_name
    """
    return execute_query(query)
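The Performance section mentions configurations cached in memory. A minimal sketch of a time-based cache around `load_active_configurations`, assuming a short TTL is acceptable so that rows added later (see Adding New Systems) are still picked up on a subsequent cycle. `TTLCache` and the 300-second default are hypothetical.

```python
import time
from typing import Any, Callable, Optional


class TTLCache:
    """Cache the result of a zero-argument loader for `ttl_seconds`."""

    def __init__(self, loader: Callable[[], Any], ttl_seconds: float = 300.0,
                 clock: Callable[[], float] = time.monotonic):
        self._loader = loader
        self._ttl = ttl_seconds
        self._clock = clock
        self._value: Any = None
        self._expires_at: Optional[float] = None

    def get(self) -> Any:
        now = self._clock()
        if self._expires_at is None or now >= self._expires_at:
            # Never loaded or expired: query the database again.
            self._value = self._loader()
            self._expires_at = now + self._ttl
        return self._value
```

Usage: `configs = TTLCache(load_active_configurations, ttl_seconds=300)`, then call `configs.get()` on every cycle. The injectable `clock` exists only to make the expiry testable.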

Step 2: Extract Customers

def get_customers_for_system(tenant_ids: list):
    """Extract customers from several tenants, excluding inactive RFM segments."""
    query = """
        SELECT DISTINCT
            c.email,
            c.phone,
            c.firstname,
            c.lastname,
            c.tenant_id
        FROM customers_master c
        INNER JOIN cdp_rfm_analysis r ON c.id = r.customer_id
        WHERE c.tenant_id = ANY(%s)
          AND c.email IS NOT NULL
          AND c.email != ''
          AND r.rfm_segment NOT IN ('Lost', 'Hibernating')
        ORDER BY c.tenant_id, c.email
    """
    # The Python list is adapted to a PostgreSQL array for ANY(%s) (e.g. by psycopg2)
    return execute_query(query, (tenant_ids,))
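Because `tenant_id` is part of the `SELECT DISTINCT` list, a customer registered with the same email in two tenants of one system (for example tenants 20 and 25 of `kangoo_system`) comes back as two rows. A sketch of the "consolidate audiences" step from the worker flow, assuming deduplication by trimmed, lowercased email is the intended rule; `consolidate_customers` is hypothetical.

```python
from typing import Dict, Iterable, List


def consolidate_customers(rows: Iterable[dict]) -> List[dict]:
    """Keep one row per normalized email across all tenants of a system.

    The first row seen for an email wins (rows arrive ordered by tenant_id, email).
    """
    seen: Dict[str, dict] = {}
    for row in rows:
        key = (row.get("email") or "").strip().lower()
        if key and key not in seen:
            seen[key] = row
    return list(seen.values())
```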

Step 3: Prepare Payload

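import re


def prepare_facebook_payload(customers: list):
    """Build the hashed payload for the Facebook API."""
    data = []

    for customer in customers:
        # Rows are read by column name (dict-style rows, e.g. from a RealDictCursor).
        # Facebook expects phone numbers as digits only, so symbols and spaces
        # are removed before hashing. Value order must match "schema" below.
        row = [
            hash_data(customer['email']),
            hash_data(re.sub(r'\D', '', customer['phone'] or '')),
            hash_data(customer['firstname']),
            hash_data(customer['lastname'])
        ]
        data.append(row)

    return {
        "schema": ["EMAIL", "PHONE", "FN", "LN"],
        "data": data
    }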

Step 4: Sync to Facebook

from datetime import datetime


def sync_to_facebook(ad_account_id, payload, system_name):
    """Sync an audience to Facebook."""

    # 1. Create or update the Custom Audience (one audience per system per month).
    #    create_or_update_audience and add_users_to_audience are API helpers not shown here.
    audience_id = create_or_update_audience(
        ad_account_id=ad_account_id,
        name=f"CDP - {system_name} - {datetime.now().strftime('%Y-%m')}",
        description="Audience synced from Nerdistan CDP"
    )

    # 2. Add users in batches of 10,000 (the maximum per request)
    batch_size = 10000
    total_added = 0

    for i in range(0, len(payload['data']), batch_size):
        batch = payload['data'][i:i + batch_size]

        response = add_users_to_audience(
            audience_id=audience_id,
            schema=payload['schema'],
            data=batch
        )

        # num_received: how many entries Facebook received in this batch
        total_added += response.get('num_received', 0)

    return {
        'audience_id': audience_id,
        'total_added': total_added
    }
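`sync_to_facebook` only accumulates `num_received`. Each response from the users endpoint also reports `num_invalid_entries`, which is worth recording so that normalization problems (for example badly formatted phone numbers) show up in the logs. A minimal sketch, assuming the responses are the parsed JSON dictionaries returned by `add_users_to_audience`; `summarize_batches` is hypothetical.

```python
from typing import Iterable


def summarize_batches(responses: Iterable[dict]) -> dict:
    """Aggregate the counters returned by each POST /{custom_audience_id}/users call."""
    batches = received = invalid = 0
    for response in responses:
        batches += 1
        received += response.get("num_received", 0)
        invalid += response.get("num_invalid_entries", 0)
    return {"batches": batches, "num_received": received, "num_invalid_entries": invalid}
```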

Step 5: Update Status

def update_sync_status(system_name, result):
    """Record a successful sync in the DB."""
    # next_sync follows each system's own schedule_hours (48 when unset)
    query = """
        UPDATE cdp.facebook_configurations
        SET
            last_sync = CURRENT_TIMESTAMP,
            next_sync = CURRENT_TIMESTAMP + COALESCE(schedule_hours, 48) * INTERVAL '1 hour',
            total_customers = %s,
            custom_audience_id = %s,
            sync_status = 'success',
            error_log = NULL,
            updated_at = CURRENT_TIMESTAMP
        WHERE system_name = %s
    """
    execute_query(query, (
        result['total_added'],
        result['audience_id'],
        system_name
    ))

Error Handling

Retry Logic

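import logging

import requests
from tenacity import retry, stop_after_attempt, wait_exponential

logger = logging.getLogger(__name__)


@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10),
    reraise=True  # re-raise the last error instead of tenacity.RetryError
)
def facebook_api_call_with_retry(endpoint, payload):
    """API call with automatic retry (up to 3 attempts)."""
    try:
        # A timeout keeps a hung connection from blocking the worker indefinitely
        response = requests.post(endpoint, json=payload, timeout=60)
        response.raise_for_status()
        return response.json()
    except Exception as e:
        logger.error(f"API call failed: {e}")
        raise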
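As written, the decorator retries every exception, including errors that cannot succeed on a second attempt, such as an invalid token. A sketch of a predicate that separates transient failures from permanent ones, assuming the Graph API error codes 4, 17 and 613 (request rate limits) and 190 (invalid OAuth access token); `is_retryable` and `RATE_LIMIT_CODES` are hypothetical names.

```python
from typing import Optional

# Graph API error codes that signal throttling; a later attempt may succeed.
RATE_LIMIT_CODES = {4, 17, 613}


def is_retryable(status_code: Optional[int], fb_error_code: Optional[int] = None) -> bool:
    """Decide whether a failed Graph API call is worth retrying."""
    if fb_error_code in RATE_LIMIT_CODES:
        return True
    if status_code is None:
        # No HTTP response at all (timeout, connection reset): treat as transient.
        return True
    # Retry HTTP 429 and server errors; other 4xx errors (e.g. code 190,
    # invalid token) fail the same way on every attempt.
    return status_code == 429 or status_code >= 500
```

With tenacity, this can be plugged in through `retry=retry_if_exception(...)`, using a small wrapper that reads `exc.response.status_code` and the `error.code` field of the JSON body from a `requests.HTTPError`.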

Error Logging

def log_sync_error(system_name, error):
    """Record a failed sync and its error message in the DB."""
    query = """
        UPDATE cdp.facebook_configurations
        SET
            sync_status = 'failed',
            error_log = %s,
            updated_at = CURRENT_TIMESTAMP
        WHERE system_name = %s
    """
    execute_query(query, (str(error), system_name))

Security and Privacy

1. Personal Data Hashing

All personal data (email, phone, first and last name) is hashed with SHA-256 before it is sent to Facebook.

2. Access Token Management

# Token with the specific permissions it needs
REQUIRED_PERMISSIONS = [
    'ads_management',
    'ads_read',
    'business_management'
]

# Long-lived token (60 days)
# Renew it before it expires
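A sketch of the "renew before it expires" check, assuming the worker periodically calls the Graph API `debug_token` endpoint and passes the `data` object of its response (`is_valid`, `scopes`, and `expires_at` as a Unix timestamp) to this function. `check_token` and the seven-day warning window are hypothetical, and treating `expires_at == 0` as a non-expiring token is also an assumption.

```python
from datetime import datetime, timezone
from typing import List, Optional

REQUIRED_PERMISSIONS = ["ads_management", "ads_read", "business_management"]


def check_token(debug_data: dict, now: Optional[datetime] = None,
                warn_days: int = 7) -> List[str]:
    """Return the problems found in the `data` object of a debug_token response."""
    now = now or datetime.now(timezone.utc)
    problems = []
    if not debug_data.get("is_valid", False):
        problems.append("token is not valid")
    missing = sorted(set(REQUIRED_PERMISSIONS) - set(debug_data.get("scopes", [])))
    if missing:
        problems.append("missing permissions: " + ", ".join(missing))
    expires_at = debug_data.get("expires_at", 0)
    if expires_at:  # assumption: 0 means the token never expires
        remaining = datetime.fromtimestamp(expires_at, tz=timezone.utc) - now
        if remaining.total_seconds() <= 0:
            problems.append("token has expired")
        elif remaining.days < warn_days:
            problems.append(f"token expires in {remaining.days} days")
    return problems
```

An empty list means the token is usable; anything else could be written to `error_log` or sent as an alert.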

3. Data Privacy

  • Only active customers are synced (the Lost and Hibernating RFM segments are excluded)
  • Emails are validated (non-empty, correct format)
  • GDPR/LGPD compliance
  • Opt-outs are honored
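The extraction query in Step 2 only filters out NULL and empty emails and does not reference an opt-out column. A sketch of format and opt-out checks applied in Python after extraction, assuming a hypothetical boolean `opted_out` field on each row; `eligible_customers` is hypothetical and the regular expression is a deliberate simplification, not full RFC 5322 validation.

```python
import re
from typing import Iterable, List

# Deliberately simple: one "@", no whitespace, and a dot in the domain part.
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")


def eligible_customers(rows: Iterable[dict]) -> List[dict]:
    """Keep rows with a plausibly formatted email and no opt-out flag."""
    return [
        row for row in rows
        if EMAIL_RE.match((row.get("email") or "").strip())
        and not row.get("opted_out", False)
    ]
```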

Performance

Implemented Optimizations

  1. Batch Processing: 10,000 users per request
  2. Database Indexing: indexes on tenant_id and email
  3. Async Processing: parallel processing per system
  4. Caching: configurations cached in memory
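Each system's sync mostly waits on PostgreSQL and the Graph API, so a thread pool is one way to run the systems concurrently (threads rather than asyncio, as a swapped-in technique). A sketch, assuming `sync_one` wraps Steps 2 to 5 for a single configuration row; `sync_all_systems` and the `max_workers=3` default (one per current system) are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Dict, List


def sync_all_systems(configs: List[dict], sync_one: Callable[[dict], dict],
                     max_workers: int = 3) -> Dict[str, dict]:
    """Run one sync per system in parallel; a failure in one does not stop the others."""
    results: Dict[str, dict] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(sync_one, cfg): cfg["system_name"] for cfg in configs}
        for future in as_completed(futures):
            name = futures[future]
            try:
                results[name] = {"status": "success", "result": future.result()}
            except Exception as exc:  # recorded per system, e.g. via log_sync_error
                results[name] = {"status": "failed", "error": str(exc)}
    return results
```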

Expected Metrics

| Metric | Value |
|---|---|
| Sync time (50k users) | ~5-10 minutes |
| API rate limit | 200 requests/hour |
| Batch size | 10,000 users |
| Retry attempts | 3 max |
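A quick check of how these figures relate: at 10,000 users per request, a 50,000-user sync needs 5 upload calls, and even if every batch used all 3 attempts that is 15 calls, well below 200 requests/hour (audience creation and other calls add a few more). `requests_per_sync` is a hypothetical helper for repeating the calculation for other audience sizes.

```python
import math
from typing import Tuple


def requests_per_sync(users: int, batch_size: int = 10_000,
                      max_attempts: int = 3) -> Tuple[int, int]:
    """Return (upload calls without retries, worst case if every batch uses all attempts)."""
    batches = math.ceil(users / batch_size)
    return batches, batches * max_attempts
```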

Scalability

Adding New Systems

-- Insert a new configuration
INSERT INTO cdp.facebook_configurations (
    system_name,
    ad_account_id,
    tenant_ids,
    is_active,
    schedule_hours
) VALUES (
    'new_system',
    'act_123456789',
    ARRAY[30, 31, 32],
    true,
    48
);

The worker will automatically detect and process the new configuration on its next cycle.
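Since the worker picks up any active row, a malformed insert would only surface as a failed sync. A sketch of a pre-insert check, assuming the constraints implied by the table definition (`VARCHAR(100)` name, non-empty `tenant_ids`) plus the `act_<digits>` form used by every ad account ID in this document; `validate_configuration` is hypothetical.

```python
import re
from typing import List

AD_ACCOUNT_RE = re.compile(r"^act_\d+$")


def validate_configuration(system_name: str, ad_account_id: str,
                           tenant_ids: List[int], schedule_hours: int = 48) -> List[str]:
    """Return the problems with a new facebook_configurations row (empty if valid)."""
    errors = []
    if not system_name or len(system_name) > 100:  # VARCHAR(100) UNIQUE NOT NULL
        errors.append("system_name must be 1-100 characters")
    if not AD_ACCOUNT_RE.match(ad_account_id or ""):
        errors.append("ad_account_id must look like act_<digits>")
    if not tenant_ids or not all(isinstance(t, int) for t in tenant_ids):
        errors.append("tenant_ids must be a non-empty list of integers")
    if schedule_hours <= 0:
        errors.append("schedule_hours must be positive")
    return errors
```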

Multi-Region Support

The system is prepared to support multiple regions through:

  • Per-timezone configuration
  • Custom schedules per system
  • Region-specific ad accounts
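Per-timezone configuration could work by keeping a timezone per system (a hypothetical column, not part of the current table) and computing each system's next 10:00 AM run in its own local time. A sketch, assuming the zone is available as a `tzinfo` object such as `zoneinfo.ZoneInfo("America/Argentina/Buenos_Aires")`; `next_local_run` is hypothetical and ignores DST edge cases such as a skipped or repeated local hour.

```python
from datetime import datetime, timedelta, timezone, tzinfo


def next_local_run(now: datetime, tz: tzinfo, hour: int = 10) -> datetime:
    """Return the next `hour`:00 in timezone `tz` strictly after `now`."""
    if now.tzinfo is None:
        now = now.replace(tzinfo=timezone.utc)  # treat naive input as UTC
    local_now = now.astimezone(tz)
    candidate = local_now.replace(hour=hour, minute=0, second=0, microsecond=0)
    if candidate <= local_now:
        candidate += timedelta(days=1)
    return candidate
```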

Monitoring Points

  1. Database Logs: cdp.facebook_configurations.sync_status
  2. Worker Logs: Railway service logs
  3. Facebook Insights: Custom Audience size and reach
  4. Error Alerts: Slack/email notifications (planned)
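A sketch of an automated check over the database monitoring point, which could later feed the planned Slack/email alerts. It assumes rows selected from `cdp.facebook_configurations` with `system_name`, `sync_status`, `last_sync` and `schedule_hours`, and treats a system as stale once it has missed two scheduled cycles; `unhealthy_systems` and that threshold are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterable, List, Optional


def unhealthy_systems(rows: Iterable[dict], now: Optional[datetime] = None) -> List[str]:
    """Flag systems whose last sync failed or that missed two scheduled cycles."""
    now = now or datetime.now(timezone.utc)
    flagged = []
    for row in rows:
        last_sync = row.get("last_sync")
        stale_after = timedelta(hours=2 * (row.get("schedule_hours") or 48))
        failed = row.get("sync_status") == "failed"
        stale = last_sync is None or now - last_sync > stale_after
        if failed or stale:
            flagged.append(row["system_name"])
    return flagged
```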

Next Steps