Saltar al contenido principal

CDP Processing API

Overview

El módulo de procesamiento CDP es el corazón del sistema, responsable de ejecutar análisis avanzados sobre los datos de clientes. Procesa más de 149,522 consumers en producción con análisis RFM, CLV y Churn.

🚀 Características Principales

  • Procesamiento en batch de grandes volúmenes de datos
  • Análisis RFM con 11 segmentos automáticos
  • Cálculo de CLV predictivo
  • Detección de churn con machine learning
  • Cache inteligente para optimización de performance

📊 Estadísticas de Producción

TenantNombreConsumers ProcesadosTiempo Promedio
56Chelsea IO - Exit65,22645s
52Celada SA - BAPRO45,28232s
53JJ Deportes - BAPRO29,70721s
55Chelsea IO - Principal9,1078s
1Default2002s

API Endpoints

1. Process Tenant Data

Ejecuta análisis CDP completo o parcial para un tenant.

POST /api/v2/cdp/process/{tenantId}

Parámetros

ParámetroTipoRequeridoDescripción
tenantIdintegerID del tenant a procesar

Request Body

{
"analyses": ["rfm", "clv", "churn"],
"force": false,
"batch_size": 1000
}
CampoTipoDefaultDescripción
analysesarray["rfm"]Análisis a ejecutar
forcebooleanfalseForzar reprocesamiento
batch_sizeinteger1000Tamaño del batch

Response Success (200)

{
"success": true,
"message": "CDP processing completed successfully",
"tenant_id": 56,
"results": {
"rfm": {
"total_processed": 65226,
"segments": [
{
"segment": "Champions",
"count": 16125,
"percentage": 24.72
},
{
"segment": "Loyal Customers",
"count": 12500,
"percentage": 19.17
},
{
"segment": "Need Attention",
"count": 23010,
"percentage": 35.28
}
],
"processing_time": 45.2,
"errors": 0
},
"clv": {
"processed": 65226,
"avg_clv": 15000.50,
"processing_time": 28.3
},
"churn": {
"processed": 65226,
"at_risk": 4456,
"churn_rate": 0.068,
"processing_time": 31.5
}
},
"total_time": 105.0
}

Error Responses

404 Not Found

{
"success": false,
"error": "Tenant not found",
"details": "No tenant exists with ID: 999"
}

409 Conflict

{
"success": false,
"error": "Processing already in progress",
"details": "Another processing job is running for tenant 56"
}

2. Get Processing Status

Verifica el estado del procesamiento para un tenant.

GET /api/v2/cdp/process/status/{tenantId}

Response

{
"status": "processed",
"last_processed": "2024-09-16T19:16:45.665Z",
"hours_since_last_sync": 2,
"is_stale": false,
"records_processed": "45282",
"next_scheduled": "2024-09-17T01:00:00.000Z",
"processing_queue": {
"position": null,
"estimated_time": null
}
}

Estados Posibles

EstadoDescripción
never_processedNunca se ha procesado
processingActualmente procesando
processedProcesamiento completado
failedÚltimo procesamiento falló
queuedEn cola para procesamiento

3. Schedule Processing

Programa procesamiento futuro para un tenant.

POST /api/v2/cdp/process/{tenantId}/schedule

Request Body

{
"schedule_at": "2024-09-17T03:00:00Z",
"recurring": true,
"frequency": "daily",
"analyses": ["rfm", "clv"]
}

4. Cancel Processing

Cancela un procesamiento en progreso.

DELETE /api/v2/cdp/process/{tenantId}/cancel

🔄 Flujo de Procesamiento

graph TD
A[Inicio] --> B{Tenant Válido?}
B -->|No| C[Error 404]
B -->|Sí| D{En Proceso?}
D -->|Sí| E[Error 409]
D -->|No| F[Obtener Consumers]
F --> G[Calcular RFM]
G --> H[Asignar Segmentos]
H --> I[Calcular CLV]
I --> J[Detectar Churn]
J --> K[Guardar en BD]
K --> L[Cache Results]
L --> M[Respuesta 200]

💾 Tablas de Base de Datos

cdp_rfm_analysis

CREATE TABLE cdp_rfm_analysis (
id BIGSERIAL PRIMARY KEY,
tenant_id BIGINT NOT NULL,
customer_id BIGINT NOT NULL,
email VARCHAR(255),
recency_days INTEGER,
frequency INTEGER,
monetary NUMERIC(15,2),
r_score INTEGER CHECK (r_score BETWEEN 1 AND 5),
f_score INTEGER CHECK (f_score BETWEEN 1 AND 5),
m_score INTEGER CHECK (m_score BETWEEN 1 AND 5),
rfm_score VARCHAR(3),
segment VARCHAR(50),
processed_at TIMESTAMP DEFAULT NOW()
);

🎯 Segmentos RFM

Los 11 segmentos automáticos:

  1. Champions (555, 554, 544, 545, 454, 455, 445)

    • Compraron recientemente, con frecuencia y gastan mucho
  2. Loyal Customers (543, 444, 435, 355, 354, 345, 344, 335)

    • Gastan buen dinero y responden a promociones
  3. Potential Loyalists (553, 551, 552, 541, 542, 533, 532, 531, 452, 451)

    • Clientes recientes con potencial
  4. Recent Customers (512, 511, 422, 421, 412, 411, 311)

    • Compraron recientemente por primera vez
  5. Promising (525, 524, 523, 522, 521, 515, 514, 513, 425, 424, 413, 414, 415, 315, 314, 313)

    • Compradores recientes con potencial de crecimiento
  6. Need Attention (535, 534, 443, 434, 343, 334, 325, 324)

    • Por encima del promedio pero en riesgo
  7. About to Sleep (331, 321, 312, 221, 213, 231, 241, 251)

    • Por debajo del promedio en todas las métricas
  8. At Risk (255, 254, 245, 244, 253, 252, 243, 242, 235, 234, 225, 224, 153, 152, 145, 143, 142, 135, 134, 133, 125, 124)

    • Gastaron mucho pero hace tiempo
  9. Cannot Lose Them (155, 154, 144, 214, 215, 115, 114, 113)

    • Grandes compradores que no han vuelto
  10. Hibernating (332, 322, 231, 241, 151, 141, 131, 121, 112, 122, 123, 132, 211, 212, 222, 223, 232, 233)

    • Última compra hace mucho tiempo
  11. Lost (111)

    • Menor puntuación en todas las métricas

🚀 Optimizaciones

Batch Processing

  • Procesa en lotes de 1000 registros
  • Uso de transacciones para consistencia
  • Índices optimizados para búsquedas rápidas

Caching Strategy

  • Cache de resultados por 1 hora
  • Invalidación automática en nuevos procesamientos
  • Cache warming para tenants principales

Performance Metrics

  • Throughput: ~1,500 customers/segundo
  • Memory usage: < 512MB para 100K customers
  • CPU usage: ~60% en procesamiento peak

📝 Ejemplos de Uso

Python

import requests

# Procesar tenant
response = requests.post(
'https://nerdistan-datalake-production.up.railway.app/api/v2/cdp/process/56',
json={'analyses': ['rfm', 'clv']}
)

if response.status_code == 200:
data = response.json()
print(f"Procesados: {data['results']['rfm']['total_processed']} customers")

JavaScript

const processTenan = async (tenantId) => {
const response = await fetch(
`https://nerdistan-datalake-production.up.railway.app/api/v2/cdp/process/${tenantId}`,
{
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ analyses: ['rfm'] })
}
);

const data = await response.json();
console.log(`Procesados: ${data.results.rfm.total_processed} customers`);
};

processTenan(56);

CURL

# Procesar RFM para tenant 56
curl -X POST \
https://nerdistan-datalake-production.up.railway.app/api/v2/cdp/process/56 \
-H "Content-Type: application/json" \
-d '{"analyses": ["rfm"]}'

# Verificar estado
curl https://nerdistan-datalake-production.up.railway.app/api/v2/cdp/process/status/56

🔧 Troubleshooting

Problema: Procesamiento lento

Solución:

  • Reducir batch_size a 500
  • Verificar índices en base de datos
  • Procesar en horarios de baja carga

Problema: Error 409 Conflict

Solución:

  • Esperar que termine el procesamiento actual
  • Usar endpoint /cancel si está atascado
  • Verificar con /status el estado actual

Problema: Datos no actualizados

Solución:

  • Usar parámetro force: true
  • Verificar is_stale en status
  • Revisar last_processed timestamp