
Patrones de Diseño - Aplicaciones Avanzadas
Aprende aplicaciones avanzadas de patrones de diseño en arquitecturas empresariales como microservicios, CQRS, event sourcing y más con ejemplos prácticos en Python.
¡Domina las aplicaciones avanzadas de patrones de diseño! En este tutorial especializado te guiaré paso a paso para que aprendas cómo aplicar patrones en arquitecturas empresariales modernas, incluyendo microservicios, CQRS, event sourcing, circuit breaker y API gateway, con ejemplos prácticos y casos de uso reales en Python.
Objetivo: Aprender aplicaciones avanzadas de patrones de diseño en arquitecturas empresariales, sus implementaciones en Python, ventajas, desventajas y cuándo aplicarlos en proyectos de gran escala.
Índice
- Paso 1: Introducción a patrones avanzados
- Paso 2: Circuit Breaker - Tolerancia a fallos
- Paso 3: API Gateway - Puerta de enlace
- Paso 4: Arquitectura de microservicios
- Paso 5: CQRS - Separación de lecturas y escrituras
- Paso 6: Event Sourcing - Fuente de eventos
- Paso 7: Saga Pattern - Transacciones distribuidas
- Paso 8: Caso de estudio - Sistema de e-commerce
- Paso 9: Consideraciones de rendimiento y escalabilidad
- Conclusión
- 💡 Tip Importante
Paso 1: Introducción a patrones avanzados
Los patrones avanzados de diseño se enfocan en resolver problemas de arquitectura a gran escala, especialmente en sistemas distribuidos y aplicaciones empresariales complejas.
¿Por qué son importantes?
- Escalabilidad: Permiten manejar crecimiento exponencial
- Resiliencia: Sistemas que se recuperan de fallos automáticamente
- Mantenibilidad: Arquitecturas que evolucionan sin romperse
- Performance: Optimización para cargas de trabajo intensivas
Patrones avanzados comunes
Patrón | Propósito | Complejidad |
---|---|---|
Circuit Breaker | Prevenir fallos en cascada | Media |
API Gateway | Punto de entrada único | Alta |
Microservicios | Arquitectura distribuida | Alta |
CQRS | Separar lecturas/escrituras | Alta |
Event Sourcing | Almacenar eventos en lugar de estado | Alta |
Saga Pattern | Transacciones distribuidas | Alta |
Paso 2: Circuit Breaker - Tolerancia a fallos
El patrón Circuit Breaker previene fallos en cascada en sistemas distribuidos, detectando fallos y evitando llamadas repetidas a servicios que están fallando.
Implementación básica
import time
import random
from enum import Enum
from typing import Callable, Any
class EstadoCircuitBreaker(Enum):
CERRADO = "CERRADO" # Funcionamiento normal
ABIERTO = "ABIERTO" # Circuito abierto, no permite llamadas
MEDIO_ABIERTO = "MEDIO_ABIERTO" # Probando si el servicio se recuperó
class CircuitBreaker:
def __init__(self, nombre: str, umbral_fallos: int = 3, tiempo_timeout: float = 5.0):
self.nombre = nombre
self.umbral_fallos = umbral_fallos
self.tiempo_timeout = tiempo_timeout
self.estado = EstadoCircuitBreaker.CERRADO
self.contador_fallos = 0
self.ultimo_fallo_tiempo = 0
self.ultimo_exito_tiempo = 0
def ejecutar(self, funcion: Callable[..., Any], *args, **kwargs) -> Any:
if self.estado == EstadoCircuitBreaker.ABIERTO:
# Verificar si es tiempo de intentar recuperación
tiempo_actual = time.time()
if tiempo_actual - self.ultimo_fallo_tiempo > self.tiempo_timeout:
self.estado = EstadoCircuitBreaker.MEDIO_ABIERTO
print(f"🔄 {self.nombre}: Cambiando a estado MEDIO_ABIERTO")
else:
raise Exception(f"Circuito {self.nombre} ABIERTO - Servicio no disponible")
try:
resultado = funcion(*args, **kwargs)
# Si la llamada es exitosa, resetear contadores
if self.estado == EstadoCircuitBreaker.MEDIO_ABIERTO:
self.estado = EstadoCircuitBreaker.CERRADO
self.contador_fallos = 0
print(f"✅ {self.nombre}: Servicio recuperado, circuito CERRADO")
self.ultimo_exito_tiempo = time.time()
return resultado
except Exception as e:
self.contador_fallos += 1
self.ultimo_fallo_tiempo = time.time()
print(f"❌ {self.nombre}: Fallo #{self.contador_fallos} - {e}")
if (self.estado == EstadoCircuitBreaker.CERRADO and
self.contador_fallos >= self.umbral_fallos):
self.estado = EstadoCircuitBreaker.ABIERTO
print(f"🚨 {self.nombre}: Umbral de fallos alcanzado, circuito ABIERTO")
elif self.estado == EstadoCircuitBreaker.MEDIO_ABIERTO:
self.estado = EstadoCircuitBreaker.ABIERTO
print(f"🚨 {self.nombre}: Fallo en prueba de recuperación, circuito ABIERTO")
raise e
# Servicio externo simulado con fallos
class ServicioExterno:
def __init__(self, tasa_fallo: float = 0.3):
self.tasa_fallo = tasa_fallo
self.llamadas = 0
def llamar(self) -> str:
self.llamadas += 1
# Simular fallo aleatorio
if random.random() < self.tasa_fallo:
raise Exception("Servicio externo no disponible")
return f"Respuesta del servicio #{self.llamadas}"
# Cliente que usa Circuit Breaker
class ClienteServicio:
def __init__(self, servicio: ServicioExterno):
self.servicio = servicio
self.circuit_breaker = CircuitBreaker(
nombre="servicio_externo",
umbral_fallos=2,
tiempo_timeout=3.0
)
def hacer_llamada(self) -> str:
return self.circuit_breaker.ejecutar(self.servicio.llamar)
# Demo
def demo_circuit_breaker():
servicio = ServicioExterno(tasa_fallo=0.7) # 70% de fallos
cliente = ClienteServicio(servicio)
for i in range(10):
try:
resultado = cliente.hacer_llamada()
print(f"✅ Llamada {i+1}: {resultado}")
except Exception as e:
print(f"❌ Llamada {i+1}: {e}")
time.sleep(0.5) # Pequeña pausa entre llamadas
if __name__ == "__main__":
demo_circuit_breaker()
Paso 3: API Gateway - Puerta de enlace
El patrón API Gateway actúa como punto de entrada único para todas las solicitudes, manejando enrutamiento, composición y transformación.
Implementación básica
from abc import ABC, abstractmethod
from typing import Dict, Any, List
from dataclasses import dataclass
import json
import requests
from functools import wraps
import time
@dataclass
class Request:
method: str
path: str
headers: Dict[str, str]
body: Any
query_params: Dict[str, str]
@dataclass
class Response:
status_code: int
headers: Dict[str, str]
body: Any
class Gateway(ABC):
@abstractmethod
def manejar_request(self, request: Request) -> Response:
pass
class GatewaySimple(Gateway):
def __init__(self):
self.rutas = {}
self.middlewares = []
def agregar_ruta(self, path: str, destino: str):
self.rutas[path] = destino
def agregar_middleware(self, middleware):
self.middlewares.append(middleware)
def manejar_request(self, request: Request) -> Response:
# Aplicar middlewares
for middleware in self.middlewares:
request = middleware.pre_procesar(request)
# Enrutar
destino = self._encontrar_destino(request.path)
if not destino:
return Response(404, {}, {"error": "Ruta no encontrada"})
# Procesar request
try:
if destino.startswith("http"):
# Llamada HTTP
response = self._llamar_servicio_externo(request, destino)
else:
# Lógica interna
response = self._procesar_interno(request, destino)
except Exception as e:
response = Response(500, {}, {"error": str(e)})
# Aplicar middlewares post-procesamiento
for middleware in reversed(self.middlewares):
response = middleware.post_procesar(response)
return response
def _encontrar_destino(self, path: str) -> str:
for ruta, destino in self.rutas.items():
if path.startswith(ruta):
return destino
return None
def _llamar_servicio_externo(self, request: Request, destino: str) -> Response:
url = f"{destino}{request.path}"
# Simular llamada HTTP
print(f"🔄 Llamando a {url} con método {request.method}")
# En un caso real usaríamos requests o aiohttp
if request.method == "GET":
# Simular respuesta exitosa
return Response(200, {}, {"data": f"Respuesta de {destino}"})
else:
return Response(200, {}, {"message": "Operación completada"})
def _procesar_interno(self, request: Request, destino: str) -> Response:
# Lógica interna del gateway
if destino == "auth":
return self._procesar_auth(request)
elif destino == "metrics":
return self._procesar_metrics(request)
else:
return Response(404, {}, {"error": "Destino interno no encontrado"})
def _procesar_auth(self, request: Request) -> Response:
# Simular autenticación
token = request.headers.get("Authorization", "")
if token == "Bearer valid-token":
return Response(200, {}, {"authenticated": True, "user": "admin"})
else:
return Response(401, {}, {"error": "No autorizado"})
def _procesar_metrics(self, request: Request) -> Response:
# Métricas del gateway
return Response(200, {}, {
"requests_handled": 1000,
"uptime": "24h",
"status": "healthy"
})
# Middlewares
class Middleware(ABC):
def pre_procesar(self, request: Request) -> Request:
return request
def post_procesar(self, response: Response) -> Response:
return response
class LoggingMiddleware(Middleware):
def pre_procesar(self, request: Request) -> Request:
print(f"📝 Request: {request.method} {request.path}")
return request
def post_procesar(self, response: Response) -> Response:
print(f"📝 Response: {response.status_code}")
return response
class AuthMiddleware(Middleware):
def pre_procesar(self, request: Request) -> Request:
# Rutas que no requieren autenticación
rutas_publicas = ["/auth", "/health", "/metrics"]
if any(request.path.startswith(ruta) for ruta in rutas_publicas):
return request
# Verificar token
token = request.headers.get("Authorization", "")
if token != "Bearer valid-token":
raise Exception("No autorizado")
return request
class RateLimitingMiddleware(Middleware):
def __init__(self, limite_por_segundo: int = 10):
self.limite = limite_por_segundo
self.contador = 0
self.ultimo_reset = time.time()
def pre_procesar(self, request: Request) -> Request:
ahora = time.time()
if ahora - self.ultimo_reset >= 1.0:
self.contador = 0
self.ultimo_reset = ahora
if self.contador >= self.limite:
raise Exception("Límite de tasa excedido")
self.contador += 1
return request
# Demo
def demo_api_gateway():
gateway = GatewaySimple()
# Configurar rutas
gateway.agregar_ruta("/api/users", "http://users-service:3000")
gateway.agregar_ruta("/api/products", "http://products-service:3001")
gateway.agregar_ruta("/auth", "auth")
gateway.agregar_ruta("/metrics", "metrics")
# Agregar middlewares
gateway.agregar_middleware(LoggingMiddleware())
gateway.agregar_middleware(AuthMiddleware())
gateway.agregar_middleware(RateLimitingMiddleware(5))
# Simular requests
requests = [
Request("GET", "/auth", {"Authorization": "Bearer valid-token"}, None, {}),
Request("GET", "/api/users", {"Authorization": "Bearer valid-token"}, None, {}),
Request("GET", "/metrics", {"Authorization": "Bearer valid-token"}, None, {}),
Request("GET", "/api/products", {"Authorization": "invalid-token"}, None, {}),
]
for i, request in enumerate(requests):
print(f"\n--- Request {i+1} ---")
try:
response = gateway.manejar_request(request)
print(f"✅ Status: {response.status_code}")
print(f"Body: {response.body}")
except Exception as e:
print(f"❌ Error: {e}")
if __name__ == "__main__":
demo_api_gateway()
Paso 4: Arquitectura de microservicios
La arquitectura de microservicios estructura una aplicación como una colección de servicios débilmente acoplados.
Ejemplo de microservicios
# Servicio de usuarios
class ServicioUsuarios:
def __init__(self):
self.usuarios = {}
def crear_usuario(self, nombre: str, email: str) -> Dict[str, Any]:
usuario_id = len(self.usuarios) + 1
usuario = {
"id": usuario_id,
"nombre": nombre,
"email": email,
"fecha_creacion": time.time()
}
self.usuarios[usuario_id] = usuario
return usuario
def obtener_usuario(self, id: int) -> Optional[Dict[str, Any]]:
return self.usuarios.get(id)
def listar_usuarios(self) -> List[Dict[str, Any]]:
return list(self.usuarios.values())
# Servicio de productos
class ServicioProductos:
def __init__(self):
self.productos = {}
def crear_producto(self, nombre: str, precio: float) -> Dict[str, Any]:
producto_id = len(self.productos) + 1
producto = {
"id": producto_id,
"nombre": nombre,
"precio": precio,
"stock": 0
}
self.productos[producto_id] = producto
return producto
def actualizar_stock(self, producto_id: int, cantidad: int) -> bool:
if producto_id in self.productos:
self.productos[producto_id]["stock"] += cantidad
return True
return False
def listar_productos(self) -> List[Dict[str, Any]]:
return list(self.productos.values())
# Servicio de pedidos (orquestador)
class ServicioPedidos:
def __init__(self, servicio_usuarios, servicio_productos):
self.servicio_usuarios = servicio_usuarios
self.servicio_productos = servicio_productos
self.pedidos = {}
def crear_pedido(self, usuario_id: int, items: List[Dict[str, Any]]) -> Dict[str, Any]:
# Verificar usuario
usuario = self.servicio_usuarios.obtener_usuario(usuario_id)
if not usuario:
raise ValueError("Usuario no encontrado")
# Verificar productos y stock
total = 0
for item in items:
producto = self.servicio_productos.obtener_producto(item["producto_id"])
if not producto:
raise ValueError(f"Producto {item['producto_id']} no encontrado")
if producto["stock"] < item["cantidad"]:
raise ValueError(f"Stock insuficiente para producto {producto['nombre']}")
total += producto["precio"] * item["cantidad"]
# Crear pedido
pedido_id = len(self.pedidos) + 1
pedido = {
"id": pedido_id,
"usuario_id": usuario_id,
"items": items,
"total": total,
"estado": "creado",
"fecha_creacion": time.time()
}
self.pedidos[pedido_id] = pedido
# Actualizar stock (en un caso real sería transaccional)
for item in items:
self.servicio_productos.actualizar_stock(
item["producto_id"], -item["cantidad"]
)
return pedido
# Service Discovery
class ServiceDiscovery:
def __init__(self):
self.servicios = {}
def registrar_servicio(self, nombre: str, instancia):
self.servicios[nombre] = instancia
def obtener_servicio(self, nombre: str):
return self.servicios.get(nombre)
# Configuración del sistema de microservicios
def configurar_microservicios():
discovery = ServiceDiscovery()
# Crear servicios
servicio_usuarios = ServicioUsuarios()
servicio_productos = ServicioProductos()
servicio_pedidos = ServicioPedidos(servicio_usuarios, servicio_productos)
# Registrar servicios
discovery.registrar_servicio("usuarios", servicio_usuarios)
discovery.registrar_servicio("productos", servicio_productos)
discovery.registrar_servicio("pedidos", servicio_pedidos)
return discovery
# Demo
def demo_microservicios():
discovery = configurar_microservicios()
# Obtener servicios
usuarios = discovery.obtener_servicio("usuarios")
productos = discovery.obtener_servicio("productos")
pedidos = discovery.obtener_servicio("pedidos")
# Crear datos de prueba
usuario = usuarios.crear_usuario("Ana García", "[email protected]")
print(f"Usuario creado: {usuario['nombre']}")
producto1 = productos.crear_producto("Laptop", 1200.00)
producto2 = productos.crear_producto("Mouse", 45.99)
productos.actualizar_stock(producto1["id"], 10)
productos.actualizar_stock(producto2["id"], 20)
print(f"Productos creados: {len(productos.listar_productos())}")
# Crear pedido
items = [
{"producto_id": producto1["id"], "cantidad": 1},
{"producto_id": producto2["id"], "cantidad": 2}
]
try:
pedido = pedidos.crear_pedido(usuario["id"], items)
print(f"Pedido creado: ${pedido['total']}")
print(f"Estado: {pedido['estado']}")
except Exception as e:
print(f"Error al crear pedido: {e}")
# Mostrar estado final
print(f"Usuarios: {len(usuarios.listar_usuarios())}")
print(f"Productos: {len(productos.listar_productos())}")
print(f"Pedidos: {len(pedidos.pedidos)}")
if __name__ == "__main__":
demo_microservicios()
Paso 5: CQRS - Separación de lecturas y escrituras
El patrón CQRS (Command Query Responsibility Segregation) separa las operaciones de lectura y escritura en modelos diferentes.
Implementación básica
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
import uuid
import time
# Command - Escrituras
@dataclass
class CrearUsuarioCommand:
nombre: str
email: str
direccion: str
@dataclass
class CrearProductoCommand:
nombre: str
precio: float
categoria: str
stock: int = 0
@dataclass
class CrearPedidoCommand:
usuario_id: str
items: List[Dict[str, Any]]
# Query - Lecturas
@dataclass
class ObtenerUsuarioQuery:
usuario_id: str
@dataclass
class ListarProductosQuery:
categoria: Optional[str] = None
@dataclass
class ObtenerPedidoQuery:
pedido_id: str
# Command Handler
class CommandHandler(ABC):
@abstractmethod
def manejar(self, command):
pass
class CrearUsuarioHandler(CommandHandler):
def __init__(self, repositorio_usuarios):
self.repositorio = repositorio_usuarios
def manejar(self, command: CrearUsuarioCommand):
# Validar que el email no exista
for usuario in self.repositorio.usuarios.values():
if usuario.email == command.email:
raise ValueError("Email ya registrado")
usuario_id = str(uuid.uuid4())
usuario = {
"id": usuario_id,
"nombre": command.nombre,
"email": command.email,
"direccion": command.direccion,
"fecha_creacion": time.time()
}
self.repositorio.usuarios[usuario_id] = usuario
return usuario
class CrearProductoHandler(CommandHandler):
def __init__(self, repositorio_productos):
self.repositorio = repositorio_productos
def manejar(self, command: CrearProductoCommand):
producto_id = str(uuid.uuid4())
producto = {
"id": producto_id,
"nombre": command.nombre,
"precio": command.precio,
"categoria": command.categoria,
"stock": command.stock
}
self.repositorio.productos[producto_id] = producto
return producto
# Query Handler
class QueryHandler(ABC):
@abstractmethod
def manejar(self, query):
pass
class ObtenerUsuarioHandler(QueryHandler):
def __init__(self, repositorio_usuarios):
self.repositorio = repositorio_usuarios
def manejar(self, query: ObtenerUsuarioQuery):
return self.repositorio.usuarios.get(query.usuario_id)
class ListarProductosHandler(QueryHandler):
def __init__(self, repositorio_productos):
self.repositorio = repositorio_productos
def manejar(self, query: ListarProductosQuery):
productos = list(self.repositorio.productos.values())
if query.categoria:
productos = [p for p in productos if p["categoria"] == query.categoria]
return productos
# Repositorios
class RepositorioUsuarios:
def __init__(self):
self.usuarios = {}
class RepositorioProductos:
def __init__(self):
self.productos = {}
# Bus de comandos y consultas
class CommandBus:
def __init__(self):
self.handlers = {}
def registrar_handler(self, tipo_command, handler):
self.handlers[tipo_command] = handler
def enviar(self, command):
handler = self.handlers.get(type(command))
if not handler:
raise ValueError(f"No hay handler para {type(command).__name__}")
return handler.manejar(command)
class QueryBus:
def __init__(self):
self.handlers = {}
def registrar_handler(self, tipo_query, handler):
self.handlers[tipo_query] = handler
def enviar(self, query):
handler = self.handlers.get(type(query))
if not handler:
raise ValueError(f"No hay handler para {type(query).__name__}")
return handler.manejar(query)
# Demo CQRS
def demo_cqrs():
# Configurar repositorios
repo_usuarios = RepositorioUsuarios()
repo_productos = RepositorioProductos()
# Configurar command bus
command_bus = CommandBus()
command_bus.registrar_handler(CrearUsuarioCommand, CrearUsuarioHandler(repo_usuarios))
command_bus.registrar_handler(CrearProductoCommand, CrearProductoHandler(repo_productos))
# Configurar query bus
query_bus = QueryBus()
query_bus.registrar_handler(ObtenerUsuarioQuery, ObtenerUsuarioHandler(repo_usuarios))
query_bus.registrar_handler(ListarProductosQuery, ListarProductosHandler(repo_productos))
# Ejecutar comandos (escrituras)
print("=== Ejecutando Comandos ===")
usuario = command_bus.enviar(CrearUsuarioCommand(
"Ana García", "[email protected]", "Calle Principal 123"
))
print(f"Usuario creado: {usuario['nombre']}")
producto1 = command_bus.enviar(CrearProductoCommand(
"Laptop Gaming", 1200.00, "tecnologia", 10
))
producto2 = command_bus.enviar(CrearProductoCommand(
"Mouse Inalámbrico", 45.99, "tecnologia", 20
))
print(f"Productos creados: {len(repo_productos.productos)}")
# Ejecutar consultas (lecturas)
print("\n=== Ejecutando Consultas ===")
usuario_consultado = query_bus.enviar(ObtenerUsuarioQuery(usuario["id"]))
print(f"Usuario consultado: {usuario_consultado['nombre']}")
productos_tecnologia = query_bus.enviar(ListarProductosQuery("tecnologia"))
print(f"Productos de tecnología: {len(productos_tecnologia)}")
for producto in productos_tecnologia:
print(f" - {producto['nombre']}: ${producto['precio']}")
if __name__ == "__main__":
demo_cqrs()
Paso 6: Event Sourcing - Fuente de eventos
El patrón Event Sourcing almacena el estado de una aplicación como una secuencia de eventos en lugar de solo el estado actual.
Implementación básica
from abc import ABC, abstractmethod
from typing import List, Dict, Any
from dataclasses import dataclass
import json
import uuid
from datetime import datetime
# Eventos
@dataclass
class Evento:
id: str
tipo: str
datos: Dict[str, Any]
timestamp: datetime
version: int = 1
class UsuarioCreadoEvent(Evento):
def __init__(self, usuario_id, nombre, email):
super().__init__(
id=str(uuid.uuid4()),
tipo="UsuarioCreado",
datos={"usuario_id": usuario_id, "nombre": nombre, "email": email},
timestamp=datetime.now()
)
class ProductoCreadoEvent(Evento):
def __init__(self, producto_id, nombre, precio, categoria):
super().__init__(
id=str(uuid.uuid4()),
tipo="ProductoCreado",
datos={"producto_id": producto_id, "nombre": nombre, "precio": precio, "categoria": categoria},
timestamp=datetime.now()
)
class StockActualizadoEvent(Evento):
def __init__(self, producto_id, cantidad, nuevo_stock):
super().__init__(
id=str(uuid.uuid4()),
tipo="StockActualizado",
datos={"producto_id": producto_id, "cantidad": cantidad, "nuevo_stock": nuevo_stock},
timestamp=datetime.now()
)
# Aggregate (Agregado)
class ProductoAggregate:
def __init__(self, producto_id=None):
self.producto_id = producto_id
self.nombre = ""
self.precio = 0
self.categoria = ""
self.stock = 0
self.version = 0
self.cambios_pendientes = []
def crear(self, nombre, precio, categoria, stock=0):
self.producto_id = str(uuid.uuid4())
evento = ProductoCreadoEvent(self.producto_id, nombre, precio, categoria)
self.aplicar_cambio(evento)
if stock > 0:
self.actualizar_stock(stock)
return self
def actualizar_stock(self, cantidad):
nuevo_stock = self.stock + cantidad
evento = StockActualizadoEvent(self.producto_id, cantidad, nuevo_stock)
self.aplicar_cambio(evento)
return self
def aplicar_cambio(self, evento: Evento):
self.cambios_pendientes.append(evento)
self.aplicar(evento)
def aplicar(self, evento: Evento):
if evento.tipo == "ProductoCreado":
self.nombre = evento.datos["nombre"]
self.precio = evento.datos["precio"]
self.categoria = evento.datos["categoria"]
elif evento.tipo == "StockActualizado":
self.stock = evento.datos["nuevo_stock"]
self.version += 1
def obtener_cambios(self):
return self.cambios_pendientes
def limpiar_cambios(self):
self.cambios_pendientes = []
# Event Store
class EventStore:
def __init__(self):
self.eventos = {}
def guardar_eventos(self, aggregate_id, eventos: List[Evento], version_esperada):
if aggregate_id not in self.eventos:
self.eventos[aggregate_id] = []
# Verificar concurrencia (optimistic concurrency)
if version_esperada != -1 and len(self.eventos[aggregate_id]) != version_esperada:
raise Exception("Concurrency conflict")
for evento in eventos:
self.eventos[aggregate_id].append(evento)
def obtener_eventos(self, aggregate_id):
return self.eventos.get(aggregate_id, [])
def obtener_todos_eventos(self):
todos_eventos = []
for eventos in self.eventos.values():
todos_eventos.extend(eventos)
return sorted(todos_eventos, key=lambda x: x.timestamp)
# Repository para Event Sourcing
class ProductoRepository:
def __init__(self, event_store):
self.event_store = event_store
def guardar(self, producto: ProductoAggregate):
eventos = producto.obtener_cambios()
version_esperada = len(self.event_store.obtener_eventos(producto.producto_id))
self.event_store.guardar_eventos(producto.producto_id, eventos, version_esperada)
producto.limpiar_cambios()
def obtener_por_id(self, producto_id):
eventos = self.event_store.obtener_eventos(producto_id)
if not eventos:
return None
producto = ProductoAggregate(producto_id)
for evento in eventos:
producto.aplicar(evento)
return producto
# Proyecciones (Projections)
class ProductoProjection:
def __init__(self, event_store):
self.event_store = event_store
self.productos = {}
def actualizar(self):
eventos = self.event_store.obtener_todos_eventos()
for evento in eventos:
if evento.tipo == "ProductoCreado":
self.productos[evento.datos["producto_id"]] = {
"nombre": evento.datos["nombre"],
"precio": evento.datos["precio"],
"categoria": evento.datos["categoria"],
"stock": 0
}
elif evento.tipo == "StockActualizado":
if evento.datos["producto_id"] in self.productos:
self.productos[evento.datos["producto_id"]]["stock"] = evento.datos["nuevo_stock"]
def obtener_todos(self):
return list(self.productos.values())
def obtener_por_categoria(self, categoria):
return [p for p in self.productos.values() if p["categoria"] == categoria]
# Demo Event Sourcing
def demo_event_sourcing():
# Configurar
event_store = EventStore()
producto_repo = ProductoRepository(event_store)
projection = ProductoProjection(event_store)
print("=== Event Sourcing Demo ===")
# Crear producto con eventos
producto = ProductoAggregate()
producto.crear("Laptop Gaming", 1200.00, "tecnologia", 10)
producto_repo.guardar(producto)
print(f"Producto creado: {producto.nombre}")
print(f"Stock inicial: {producto.stock}")
# Actualizar stock con eventos
producto.actualizar_stock(5)
producto_repo.guardar(producto)
print(f"Stock después de agregar 5: {producto.stock}")
# Cargar desde eventos
print("\n=== Cargando desde Event Store ===")
producto_cargado = producto_repo.obtener_por_id(producto.producto_id)
print(f"Producto cargado: {producto_cargado.nombre}")
print(f"Stock cargado: {producto_cargado.stock}")
# Usar proyección
print("\n=== Usando Proyección ===")
projection.actualizar()
productos = projection.obtener_todos()
print(f"Productos en proyección: {len(productos)}")
for p in productos:
print(f" - {p['nombre']}: ${p['precio']} (Stock: {p['stock']})")
# Mostrar todos los eventos
print("\n=== Todos los Eventos ===")
eventos = event_store.obtener_todos_eventos()
for evento in eventos:
print(f"{evento.tipo}: {evento.datos}")
if __name__ == "__main__":
demo_event_sourcing()
Paso 7: Saga Pattern - Transacciones distribuidas
El patrón Saga maneja transacciones distribuidas de larga duración mediante una secuencia de transacciones locales.
Implementación básica
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Callable
from dataclasses import dataclass
import uuid
from enum import Enum
import time
class EstadoSaga(Enum):
INICIADA = "INICIADA"
EN_PROGRESO = "EN_PROGRESO"
COMPLETADA = "COMPLETADA"
COMPENSANDO = "COMPENSANDO"
COMPENSADA = "COMPENSADA"
FALLIDA = "FALLIDA"
@dataclass
class Paso
Saga:
id: str
tipo: str
pasos: List['PasoSaga']
estado: EstadoSaga
datos: Dict[str, Any]
fecha_creacion: float
fecha_actualizacion: float
@dataclass
class PasoSaga:
nombre: str
accion: Callable[[Dict[str, Any]], Any]
compensacion: Callable[[Dict[str, Any]], Any]
orden: int
class SagaManager:
def __init__(self):
self.sagas = {}
self.pasos_registrados = {}
def registrar_paso(self, nombre: str, accion: Callable, compensacion: Callable):
self.pasos_registrados[nombre] = PasoSaga(nombre, accion, compensacion, len(self.pasos_registrados))
def iniciar_saga(self, tipo: str, datos_iniciales: Dict[str, Any]) -> Saga:
saga_id = str(uuid.uuid4())
saga = Saga(
id=saga_id,
tipo=tipo,
pasos=[],
estado=EstadoSaga.INICIADA,
datos=datos_iniciales,
fecha_creacion=time.time(),
fecha_actualizacion=time.time()
)
self.sagas[saga_id] = saga
return saga
def agregar_paso(self, saga_id: str, nombre_paso: str):
saga = self.sagas[saga_id]
paso = self.pasos_registrados.get(nombre_paso)
if not paso:
raise ValueError(f"Paso '{nombre_paso}' no registrado")
# Crear nueva instancia del paso para esta saga
nuevo_paso = PasoSaga(
nombre=paso.nombre,
accion=paso.accion,
compensacion=paso.compensacion,
orden=len(saga.pasos)
)
saga.pasos.append(nuevo_paso)
saga.fecha_actualizacion = time.time()
def ejecutar_saga(self, saga_id: str):
saga = self.sagas[saga_id]
saga.estado = EstadoSaga.EN_PROGRESO
saga.fecha_actualizacion = time.time()
pasos_ejecutados = []
try:
for i, paso in enumerate(saga.pasos):
print(f"🏃 Ejecutando paso {i+1}: {paso.nombre}")
# Ejecutar acción
resultado = paso.accion(saga.datos)
saga.datos.update(resultado or {})
pasos_ejecutados.append(paso)
time.sleep(0.1) # Simular procesamiento
saga.estado = EstadoSaga.COMPLETADA
print("✅ Saga completada exitosamente")
except Exception as e:
print(f"❌ Error en saga: {e}")
saga.estado = EstadoSaga.FALLIDA
self.compensar_saga(saga, pasos_ejecutados)
saga.fecha_actualizacion = time.time()
return saga
def compensar_saga(self, saga: Saga, pasos_ejecutados: List[PasoSaga]):
saga.estado = EstadoSaga.COMPENSANDO
print("🔄 Compensando saga...")
# Compensar en orden inverso
for paso in reversed(pasos_ejecutados):
try:
print(f"↩️ Compensando paso: {paso.nombre}")
paso.compensacion(saga.datos)
time.sleep(0.1) # Simular compensación
except Exception as e:
print(f"⚠️ Error compensando paso {paso.nombre}: {e}")
saga.estado = EstadoSaga.COMPENSADA
print("✅ Saga compensada")
# Servicios para la saga
class ServicioInventario:
def reservar_stock(self, datos):
producto_id = datos["producto_id"]
cantidad = datos["cantidad"]
print(f"📦 Reservando {cantidad} unidades del producto {producto_id}")
# Simular fallo aleatorio
if random.random() < 0.3:
raise Exception("Error al reservar stock")
return {"stock_reservado": True}
def compensar_reserva(self, datos):
producto_id = datos["producto_id"]
cantidad = datos["cantidad"]
print(f"↩️ Liberando reserva de {cantidad} unidades del producto {producto_id}")
class ServicioPagos:
def procesar_pago(self, datos):
usuario_id = datos["usuario_id"]
monto = datos["monto"]
print(f"💳 Procesando pago de ${monto} para usuario {usuario_id}")
# Simular fallo aleatorio
if random.random() < 0.3:
raise Exception("Error al procesar pago")
return {"pago_procesado": True, "id_transaccion": str(uuid.uuid4())}
def compensar_pago(self, datos):
id_transaccion = datos.get("id_transaccion")
print(f"↩️ Reembolsando pago {id_transaccion}")
class ServicioEnvio:
def programar_envio(self, datos):
usuario_id = datos["usuario_id"]
direccion = datos["direccion"]
print(f"🚚 Programando envío para usuario {usuario_id} a {direccion}")
return {"envio_programado": True, "numero_guia": str(uuid.uuid4())}
def cancelar_envio(self, datos):
numero_guia = datos.get("numero_guia")
print(f"↩️ Cancelando envío {numero_guia}")
# Demo Saga Pattern
def demo_saga_pattern():
# Configurar servicios
inventario = ServicioInventario()
pagos = ServicioPagos()
envio = ServicioEnvio()
# Configurar saga manager
manager = SagaManager()
# Registrar pasos
manager.registrar_paso(
"reservar_stock",
lambda datos: inventario.reservar_stock(datos),
lambda datos: inventario.compensar_reserva(datos)
)
manager.registrar_paso(
"procesar_pago",
lambda datos: pagos.procesar_pago(datos),
lambda datos: pagos.compensar_pago(datos)
)
manager.registrar_paso(
"programar_envio",
lambda datos: envio.programar_envio(datos),
lambda datos: envio.cancelar_envio(datos)
)
# Datos iniciales
datos_pedido = {
"usuario_id": "user_123",
"producto_id": "prod_456",
"cantidad": 2,
"monto": 99.99,
"direccion": "Calle Principal 123"
}
# Crear y configurar saga
saga = manager.iniciar_saga("procesar_pedido", datos_pedido)
manager.agregar_paso(saga.id, "reservar_stock")
manager.agregar_paso(saga.id, "procesar_pago")
manager.agregar_paso(saga.id, "programar_envio")
# Ejecutar saga
print("=== Iniciando Saga de Procesamiento de Pedido ===")
resultado = manager.ejecutar_saga(saga.id)
print(f"\nEstado final: {resultado.estado.value}")
print(f"Datos finales: {resultado.datos}")
if __name__ == "__main__":
demo_saga_pattern()
Paso 8: Caso de estudio - Sistema de e-commerce
Vamos a crear un sistema completo de e-commerce usando múltiples patrones avanzados.
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from datetime import datetime
import uuid
import time
# Domain Models
@dataclass
class Producto:
id: str
nombre: str
precio: float
stock: int
categoria: str
@dataclass
class Usuario:
id: str
nombre: str
email: str
direccion: str
@dataclass
class Pedido:
id: str
usuario_id: str
items: List[Dict[str, Any]]
total: float
estado: str
fecha_creacion: datetime
# Repositories
class RepositorioProductos:
def __init__(self):
self.productos = {}
def guardar(self, producto: Producto) -> Producto:
self.productos[producto.id] = producto
return producto
def obtener_por_id(self, id: str) -> Optional[Producto]:
return self.productos.get(id)
def obtener_todos(self) -> List[Producto]:
return list(self.productos.values())
def actualizar_stock(self, id: str, cantidad: int) -> bool:
if id in self.productos:
self.productos[id].stock += cantidad
return True
return False
class RepositorioUsuarios:
def __init__(self):
self.usuarios = {}
def guardar(self, usuario: Usuario) -> Usuario:
self.usuarios[usuario.id] = usuario
return usuario
def obtener_por_id(self, id: str) -> Optional[Usuario]:
return self.usuarios.get(id)
def obtener_por_email(self, email: str) -> Optional[Usuario]:
for usuario in self.usuarios.values():
if usuario.email == email:
return usuario
return None
class RepositorioPedidos:
def __init__(self):
self.pedidos = {}
def guardar(self, pedido: Pedido) -> Pedido:
self.pedidos[pedido.id] = pedido
return pedido
def obtener_por_id(self, id: str) -> Optional[Pedido]:
return self.pedidos.get(id)
def obtener_por_usuario(self, usuario_id: str) -> List[Pedido]:
return [p for p in self.pedidos.values() if p.usuario_id == usuario_id]
# Services
class ServicioCatalogo:
def __init__(self, repositorio: RepositorioProductos):
self.repositorio = repositorio
def crear_producto(self, nombre: str, precio: float, categoria: str, stock: int = 0) -> Producto:
producto = Producto(
id=str(uuid.uuid4()),
nombre=nombre,
precio=precio,
stock=stock,
categoria=categoria
)
return self.repositorio.guardar(producto)
def listar_productos(self, categoria: Optional[str] = None) -> List[Producto]:
productos = self.repositorio.obtener_todos()
if categoria:
productos = [p for p in productos if p.categoria == categoria]
return productos
def obtener_producto(self, id: str) -> Optional[Producto]:
return self.repositorio.obtener_por_id(id)
class ServicioUsuarios:
def __init__(self, repositorio: RepositorioUsuarios):
self.repositorio = repositorio
def registrar_usuario(self, nombre: str, email: str, direccion: str) -> Usuario:
# Verificar si ya existe
if self.repositorio.obtener_por_email(email):
raise ValueError("Email ya registrado")
usuario = Usuario(
id=str(uuid.uuid4()),
nombre=nombre,
email=email,
direccion=direccion
)
return self.repositorio.guardar(usuario)
def obtener_usuario(self, id: str) -> Optional[Usuario]:
return self.repositorio.obtener_por_id(id)
class ServicioPedidos:
def __init__(self,
repositorio_pedidos: RepositorioPedidos,
repositorio_productos: RepositorioProductos,
repositorio_usuarios: RepositorioUsuarios):
self.repositorio_pedidos = repositorio_pedidos
self.repositorio_productos = repositorio_productos
self.repositorio_usuarios = repositorio_usuarios
def crear_pedido(self, usuario_id: str, items: List[Dict[str, Any]]) -> Pedido:
# Verificar usuario
usuario = self.repositorio_usuarios.obtener_por_id(usuario_id)
if not usuario:
raise ValueError("Usuario no encontrado")
# Verificar productos y calcular total
total = 0
for item in items:
producto = self.repositorio_productos.obtener_por_id(item["producto_id"])
if not producto:
raise ValueError(f"Producto {item['producto_id']} no encontrado")
if producto.stock < item["cantidad"]:
raise ValueError(f"Stock insuficiente para {producto.nombre}")
total += producto.precio * item["cantidad"]
# Crear pedido
pedido = Pedido(
id=str(uuid.uuid4()),
usuario_id=usuario_id,
items=items,
total=total,
estado="creado",
fecha_creacion=datetime.now()
)
# Actualizar stock
for item in items:
self.repositorio_productos.actualizar_stock(
item["producto_id"], -item["cantidad"]
)
return self.repositorio_pedidos.guardar(pedido)
def cancelar_pedido(self, pedido_id: str) -> bool:
pedido = self.repositorio_pedidos.obtener_por_id(pedido_id)
if not pedido or pedido.estado == "cancelado":
return False
# Revertir stock
for item in pedido.items:
self.repositorio_productos.actualizar_stock(
item["producto_id"], item["cantidad"]
)
pedido.estado = "cancelado"
self.repositorio_pedidos.guardar(pedido)
return True
# API Gateway
class ECommerceGateway:
def __init__(self,
servicio_catalogo: ServicioCatalogo,
servicio_usuarios: ServicioUsuarios,
servicio_pedidos: ServicioPedidos):
self.servicio_catalogo = servicio_catalogo
self.servicio_usuarios = servi
cio_usuarios
self.servicio_pedidos = servicio_pedidos
def manejar_request(self, metodo: str, ruta: str, datos: Dict[str, Any]) -> Dict[str, Any]:
try:
if metodo == "GET" and ruta == "/productos":
categoria = datos.get("categoria")
productos = self.servicio_catalogo.listar_productos(categoria)
return {"status": 200, "data": [vars(p) for p in productos]}
elif metodo == "POST" and ruta == "/usuarios":
usuario = self.servicio_usuarios.registrar_usuario(
datos["nombre"], datos["email"], datos["direccion"]
)
return {"status": 201, "data": vars(usuario)}
elif metodo == "POST" and ruta == "/pedidos":
pedido = self.servicio_pedidos.crear_pedido(
datos["usuario_id"], datos["items"]
)
return {"status": 201, "data": vars(pedido)}
elif metodo == "POST" and ruta == "/pedidos/cancelar":
exito = self.servicio_pedidos.cancelar_pedido(datos["pedido_id"])
return {"status": 200, "data": {"cancelado": exito}}
else:
return {"status": 404, "error": "Ruta no encontrada"}
except Exception as e:
return {"status": 400, "error": str(e)}
# Demo del sistema completo
def demo_ecommerce():
# Configurar repositorios
repo_productos = RepositorioProductos()
repo_usuarios = RepositorioUsuarios()
repo_pedidos = RepositorioPedidos()
# Configurar servicios
servicio_catalogo = ServicioCatalogo(repo_productos)
servicio_usuarios = ServicioUsuarios(repo_usuarios)
servicio_pedidos = ServicioPedidos(repo_pedidos, repo_productos, repo_usuarios)
# Configurar gateway
gateway = ECommerceGateway(servicio_catalogo, servicio_usuarios, servicio_pedidos)
# Crear productos
producto1 = servicio_catalogo.crear_producto("Laptop Gaming", 1200.00, "tecnologia", 10)
producto2 = servicio_catalogo.crear_producto("Mouse Inalámbrico", 45.99, "tecnologia", 20)
producto3 = servicio_catalogo.crear_producto("Silla Ergonómica", 299.99, "muebles", 5)
print("🛍️ Productos creados:")
for p in [producto1, producto2, producto3]:
print(f" - {p.nombre}: ${p.precio} (Stock: {p.stock})")
# Registrar usuario
usuario = servicio_usuarios.registrar_usuario(
"Ana García", "[email protected]", "Calle Principal 123"
)
print(f"\n👤 Usuario registrado: {usuario.nombre} ({usuario.email})")
# Crear pedido a través del gateway
response = gateway.manejar_request("POST", "/pedidos", {
"usuario_id": usuario.id,
"items": [
{"producto_id": producto1.id, "cantidad": 1},
{"producto_id": producto2.id, "cantidad": 2}
]
})
if response["status"] == 201:
pedido = response["data"]
print(f"\n📦 Pedido creado exitosamente:")
print(f" ID: {pedido['id']}")
print(f" Total: ${pedido['total']}")
print(f" Estado: {pedido['estado']}")
else:
print(f"\n❌ Error al crear pedido: {response['error']}")
# Mostrar estado actual
print(f"\n📊 Estado actual:")
print(f" Productos en stock: {len(servicio_catalogo.listar_productos())}")
print(f" Usuarios registrados: {len(repo_usuarios.usuarios)}")
print(f" Pedidos realizados: {len(repo_pedidos.pedidos)}")
# Mostrar stock actualizado
print(f"\n📦 Stock actualizado:")
for producto in servicio_catalogo.listar_productos():
print(f" {producto.nombre}: {producto.stock} unidades")
if __name__ == "__main__":
demo_ecommerce()
Paso 9: Consideraciones de rendimiento y escalabilidad
Patrones para alta escalabilidad
# Caching con Redis/Memcached
class Cache:
def __init__(self):
self.cache = {}
def get(self, key: str, default=None):
return self.cache.get(key, default)
def set(self, key: str, value, ttl: int = 300):
self.cache[key] = value
# En un caso real, aquí se manejaría TTL
def delete(self, key: str):
if key in self.cache:
del self.cache[key]
# Load Balancer
class LoadBalancer:
def __init__(self, servidores: List[str]):
self.servidores = servidores
self.indice = 0
def siguiente_servidor(self) -> str:
servidor = self.servidores[self.indice]
self.indice = (self.indice + 1) % len(self.servidores)
return servidor
# Database Sharding
class ShardManager:
def __init__(self, shards: List[Any]):
self.shards = shards
def obtener_shard(self, clave: str) -> Any:
# Hash simple para determinar shard
hash_val = hash(clave) % len(self.shards)
return self.shards[hash_val]
# Message Queue para procesamiento asíncrono
class MessageQueue:
def __init__(self):
self.colas = {}
def publicar(self, topico: str, mensaje: Any):
if topico not in self.colas:
self.colas[topico] = []
self.colas[topico].append(mensaje)
def consumir(self, topico: str) -> Any:
if topico in self.colas and self.colas[topico]:
return self.colas[topico].pop(0)
return None
# Ejemplo de sistema escalable
class SistemaEscalable:
def __init__(self):
self.cache = Cache()
self.load_balancer = LoadBalancer(["server1", "server2", "server3"])
self.message_queue = MessageQueue()
# Shards de base de datos
self.shard_manager = ShardManager(["shard1", "shard2", "shard3"])
def procesar_solicitud(self, usuario_id: str, accion: str, datos: Any):
# Verificar cache primero
cache_key = f"{usuario_id}:{accion}"
cached = self.cache.get(cache_key)
if cached:
return cached
# Balancear carga
servidor = self.load_balancer.siguiente_servidor()
# Determinar shard
shard = self.shard_manager.obtener_shard(usuario_id)
# Procesar (simulado)
resultado = f"Procesado en {servidor} usando {shard}"
# Cachear resultado
self.cache.set(cache_key, resultado)
# Publicar en cola para procesamiento asíncrono
self.message_queue.publicar("logs", {
"usuario_id": usuario_id,
"accion": accion,
"timestamp": time.time()
})
return resultado
# Demo de escalabilidad
def demo_escalabilidad():
sistema = SistemaEscalable()
# Simular múltiples solicitudes
usuarios = [f"user{i}" for i in range(10)]
acciones = ["read", "write", "update", "delete"]
for i in range(20):
usuario = usuarios[i % len(usuarios)]
accion = acciones[i % len(acciones)]
resultado = sistema.procesar_solicitud(usuario, accion, f"data{i}")
print(f"Solicitud {i+1}: {resultado}")
# Simular procesamiento asíncrono
log = sistema.message_queue.consumir("logs")
if log:
print(f" 📋 Log procesado: {log['usuario_id']} - {log['accion']}")
if __name__ == "__main__":
demo_escalabilidad()
Conclusión
¡Has dominado los patrones avanzados y aplicaciones del mundo real! Estos patrones te permiten construir sistemas escalables, resilientes y mantenibles que pueden manejar la complejidad de aplicaciones empresariales modernas.
Recuerda que cada patrón tiene su lugar y propósito. La clave está en entender cuándo y cómo aplicar cada uno según las necesidades específicas de tu proyecto.
Para más tutoriales sobre arquitectura avanzada y patrones de diseño, visita nuestra sección de tutoriales.
¡Sigue practicando y aplicando estos patrones en proyectos reales!
💡 Tip Importante
📝 Mejores Prácticas para Patrones Avanzados
- Empieza simple: No implementes patrones complejos prematuramente
- Mide el impacto: Evalúa si un patrón realmente resuelve un problema
- Considera el costo: Algunos patrones añaden complejidad significativa
- Documenta decisiones: Explica por qué elegiste ciertos patrones
- Mantén la coherencia: Usa patrones consistentemente en todo el sistema
- Prueba exhaustivamente: Los sistemas distribuidos requieren testing riguroso
- Monitorea en producción: Supervisa el comportamiento real de los patrones
- Aprende de fallos: Los patrones de resiliencia se prueban mejor con fallos reales
- Evoluciona iterativamente: Refactoriza y mejora la arquitectura con el tiempo
- Mantén la simplicidad: El mejor patrón es a menudo el más simple que funciona
📚 Recursos Recomendados:
- Designing Data-Intensive Applications - Martin Kleppmann
- Building Microservices - Sam Newman
- Domain-Driven Design - Eric Evans
- Patterns of Enterprise Application Architecture - Martin Fowler
- Release It! - Michael T. Nygard
¡Estos recursos te ayudarán a profundizar en arquitecturas empresariales avanzadas!
No hay comentarios aún
Sé el primero en comentar este tutorial.