
NumPy Avanzado: Técnicas de Alto Rendimiento
Domina las técnicas avanzadas de NumPy: arrays estructurados, optimización de memoria, ufuncs personalizadas y alto rendimiento.
¡Bienvenido al tutorial avanzado de NumPy! Aquí explorarás las técnicas más sofisticadas para maximizar el rendimiento y aprovechar todo el potencial de esta poderosa biblioteca. Estas técnicas son esenciales para aplicaciones de alto rendimiento en ciencia de datos, computación científica y procesamiento de grandes volúmenes de datos.
Objetivo: Aprender técnicas avanzadas de NumPy incluyendo arrays estructurados, optimización de memoria, ufuncs personalizadas y estrategias de alto rendimiento.
Índice
- Arrays Estructurados
- Mapeo de Memoria
- Optimización de Rendimiento
- Ufuncs Personalizadas
- Orden de Memoria (C vs Fortran)
- Técnicas Avanzadas de Indexación
- Vectorización Avanzada
- Integración con Otras Bibliotecas
- Proyecto Práctico - Procesamiento de Datos Masivos
- Próximos Pasos
- Conclusión
Arrays Estructurados
Los arrays estructurados permiten almacenar datos heterogéneos en una estructura similar a una base de datos:
import numpy as np
# Definir tipos de datos para un array estructurado
dt = np.dtype([
('nombre', 'U20'), # String Unicode de 20 caracteres
('edad', 'i4'), # Entero de 4 bytes
('altura', 'f8'), # Float de 8 bytes
('peso', 'f4') # Float de 4 bytes
])
print("Tipo de dato estructurado:")
print(dt)
# Crear array con datos
personas = np.array([
('Ana', 25, 1.65, 58.5),
('Carlos', 30, 1.78, 75.2),
('María', 28, 1.70, 62.1),
('Pedro', 35, 1.82, 80.5)
], dtype=dt)
print("\nArray estructurado:")
print(personas)
Acceso a campos específicos
# Acceder a campos individuales
print("Nombres:", personas['nombre'])
print("Edades:", personas['edad'])
print("Alturas:", personas['altura'])
# Acceder a registros específicos
print("\nPrimer registro:", personas[0])
print("Nombre del segundo:", personas[1]['nombre'])
# Operaciones en campos
print("\nEstadísticas de edad:")
print("Media:", np.mean(personas['edad']))
print("Máxima:", np.max(personas['edad']))
# Cálculo de IMC
imc = personas['peso'] / (personas['altura'] ** 2)
print("\nÍndice de Masa Corporal:", imc)
Arrays estructurados anidados
# Tipo de dato con estructura anidada
dt_anidado = np.dtype([
('id', 'i4'),
('info', [
('nombre', 'U15'),
('edad', 'i2')
]),
('medidas', [
('altura', 'f4'),
('peso', 'f4')
])
])
# Crear datos
datos = np.array([
(1, ('Ana', 25), (1.65, 58.5)),
(2, ('Carlos', 30), (1.78, 75.2))
], dtype=dt_anidado)
print("Array con estructura anidada:")
print(datos)
# Acceder a datos anidados
print("\nNombres:", datos['info']['nombre'])
print("Alturas:", datos['medidas']['altura'])
Mapeo de Memoria
El mapeo de memoria permite trabajar con archivos grandes sin cargarlos completamente en RAM:
# Crear un archivo grande para demostrar
filename = 'datos_grandes.dat'
# Crear archivo con datos
data_to_save = np.random.random((1000, 1000))
data_to_save.tofile(filename)
# Mapear el archivo a memoria
mapped_data = np.memmap(filename, dtype='float64', mode='r', shape=(1000, 1000))
print("Datos mapeados:")
print("Forma:", mapped_data.shape)
print("Tipo:", mapped_data.dtype)
print("Primeros elementos:", mapped_data[0, :5])
# Trabajar con porciones sin cargar todo el archivo
subset = mapped_data[100:200, 200:300]
print("\nSubconjunto:")
print("Forma del subset:", subset.shape)
print("Suma del subset:", np.sum(subset))
Ventajas del mapeo de memoria
# Comparación de uso de memoria
import os
# Archivo grande
large_file = 'archivo_grande.npy'
large_data = np.random.random((5000, 5000))
# Guardar como archivo regular
np.save(large_file, large_data)
print(f"Tamaño del archivo: {os.path.getsize(large_file) / (1024**2):.2f} MB")
# Cargar como array normal (carga todo en RAM)
normal_load = np.load(large_file)
print(f"Memoria usada por array normal: {normal_load.nbytes / (1024**2):.2f} MB")
# Mapear a memoria (no carga todo en RAM)
memory_mapped = np.load(large_file, mmap_mode='r')
print(f"Memoria usada por mapeo: {memory_mapped.nbytes / (1024**2):.2f} MB")
print("Los datos se cargan bajo demanda")
Optimización de Rendimiento
Técnicas para maximizar el rendimiento de NumPy:
Evitar bucles explícitos
# Método ineficiente con bucles
def suma_ineficiente(arr1, arr2):
resultado = np.zeros_like(arr1)
for i in range(len(arr1)):
for j in range(len(arr1[i])):
resultado[i, j] = arr1[i, j] + arr2[i, j]
return resultado
# Método eficiente con vectorización
def suma_eficiente(arr1, arr2):
return arr1 + arr2
# Comparación de rendimiento
arr1 = np.random.random((1000, 1000))
arr2 = np.random.random((1000, 1000))
import time
# Medir tiempo del método ineficiente
start = time.time()
resultado1 = suma_ineficiente(arr1, arr2)
tiempo_ineficiente = time.time() - start
# Medir tiempo del método eficiente
start = time.time()
resultado2 = suma_eficiente(arr1, arr2)
tiempo_eficiente = time.time() - start
print(f"Tiempo con bucles: {tiempo_ineficiente:.4f} segundos")
print(f"Tiempo vectorizado: {tiempo_eficiente:.4f} segundos")
print(f"Aceleración: {tiempo_ineficiente/tiempo_eficiente:.1f}x más rápido")
Optimización de memoria
# Usar tipos de datos apropiados
arr_float64 = np.array([1.0, 2.0, 3.0], dtype=np.float64)
arr_float32 = np.array([1.0, 2.0, 3.0], dtype=np.float32)
print("Memoria float64:", arr_float64.nbytes, "bytes")
print("Memoria float32:", arr_float32.nbytes, "bytes")
print("Ahorro de memoria:", arr_float64.nbytes - arr_float32.nbytes, "bytes")
# Usar vistas en lugar de copias
arr = np.arange(1000000)
# Vista (eficiente)
vista = arr[100:200]
print("Vista - mismo buffer de memoria:", np.shares_memory(arr, vista))
# Copia (menos eficiente)
copia = arr[100:200].copy()
print("Copia - buffer diferente:", np.shares_memory(arr, copia))
Paralelización con NumPy
# NumPy puede aprovechar múltiples núcleos automáticamente
large_arr = np.random.random((5000, 5000))
# Operaciones que se paralelizan automáticamente
start = time.time()
result = np.sum(large_arr)
parallel_time = time.time() - start
print(f"Suma paralela completada en: {parallel_time:.4f} segundos")
# Verificar número de hilos
print(f"Número de hilos disponibles: {np.__config__.show()}")
# Configurar número de hilos (opcional)
import os
os.environ['OMP_NUM_THREADS'] = '4'
print("Hilos configurados a 4")
Ufuncs Personalizadas
Crear funciones universales personalizadas:
from numba import vectorize
# Ufunc personalizada con Numba
@vectorize(['float64(float64, float64)'])
def distancia_euclidiana(x1, y1, x2, y2):
return np.sqrt((x1 - x2)**2 + (y1 - y2)**2)
# Datos de ejemplo
puntos_x1 = np.random.random(100000)
puntos_y1 = np.random.random(100000)
puntos_x2 = np.random.random(100000)
puntos_y2 = np.random.random(100000)
# Calcular distancias
start = time.time()
distancias = distancia_euclidiana(puntos_x1, puntos_y1, puntos_x2, puntos_y2)
tiempo_ufunc = time.time() - start
print(f"Ufunc personalizada completada en: {tiempo_ufunc:.4f} segundos")
print(f"Primeras 5 distancias: {distancias[:5]}")
Ufuncs con NumPy puro
# Crear ufunc personalizada con np.frompyfunc
def mi_funcion(x, y):
return x**2 + y**2 + 2*x*y + 1
# Convertir a ufunc
mi_ufunc = np.frompyfunc(mi_funcion, 2, 1)
# Usar la ufunc
a = np.array([1, 2, 3])
b = np.array([4, 5, 6])
resultado = mi_ufunc(a, b)
print("Resultado de ufunc personalizada:", resultado)
print("Tipo de resultado:", type(resultado[0])) # Los resultados son objetos Python
Orden de Memoria (C vs Fortran)
El orden de memoria afecta el rendimiento de las operaciones:
# Crear arrays con diferentes órdenes de memoria
arr_c = np.random.random((1000, 1000)) # Orden C (por defecto)
arr_f = np.asfortranarray(arr_c) # Orden Fortran
print("Orden C - flags.c_contiguous:", arr_c.flags.c_contiguous)
print("Orden F - flags.f_contiguous:", arr_f.flags.f_contiguous)
# Medir rendimiento para diferentes órdenes
def suma_filas(arr):
return np.sum(arr, axis=1)
def suma_columnas(arr):
return np.sum(arr, axis=0)
# Para orden C: suma de filas es más eficiente
start = time.time()
result_c_rows = suma_filas(arr_c)
time_c_rows = time.time() - start
start = time.time()
result_c_cols = suma_columnas(arr_c)
time_c_cols = time.time() - start
print(f"\nOrden C - Suma filas: {time_c_rows:.4f}s")
print(f"Orden C - Suma columnas: {time_c_cols:.4f}s")
print(f"Ratio: {time_c_cols/time_c_rows:.2f}x")
# Para orden F: suma de columnas es más eficiente
start = time.time()
result_f_cols = suma_columnas(arr_f)
time_f_cols = time.time() - start
start = time.time()
result_f_rows = suma_filas(arr_f)
time_f_rows = time.time() - start
print(f"\nOrden F - Suma columnas: {time_f_cols:.4f}s")
print(f"Orden F - Suma filas: {time_f_rows:.4f}s")
print(f"Ratio: {time_f_rows/time_f_cols:.2f}x")
Técnicas Avanzadas de Indexación
Indexación sofisticada para manipulación compleja de datos:
Indexación con arrays de coordenadas
# Crear una matriz
matriz = np.arange(25).reshape(5, 5)
print("Matriz:")
print(matriz)
# Extraer elementos en posiciones específicas
filas = np.array([0, 1, 2, 3, 4])
columnas = np.array([0, 1, 2, 3, 4])
diagonal = matriz[filas, columnas]
print("\nDiagonal:", diagonal)
# Extraer patrón específico
filas_patron = np.array([0, 1, 2, 1, 0])
columnas_patron = np.array([0, 1, 2, 3, 4])
patron = matriz[filas_patron, columnas_patron]
print("Patrón personalizado:", patron)
Indexación booleana avanzada
# Datos de ejemplo
datos = np.random.normal(100, 15, 1000)
# Múltiples condiciones
condicion1 = datos > 85
condicion2 = datos < 115
condicion3 = (datos % 2) == 0 # Números pares
# Combinar condiciones
mascara_compleja = condicion1 & condicion2 & condicion3
datos_filtrados = datos[mascara_compleja]
print(f"Datos originales: {len(datos)}")
print(f"Datos filtrados: {len(datos_filtrados)}")
print(f"Porcentaje filtrado: {len(datos_filtrados)/len(datos)*100:.1f}%")
# Usar np.where para asignaciones condicionales complejas
resultado = np.where(
datos > 115, "Alto",
np.where(datos < 85, "Bajo", "Normal")
)
valores_unicos, conteos = np.unique(resultado, return_counts=True)
for valor, conteo in zip(valores_unicos, conteos):
print(f"{valor}: {conteo} ({conteo/len(datos)*100:.1f}%)")
Vectorización Avanzada
Técnicas para vectorizar operaciones complejas:
# Vectorización de operaciones condicionales complejas
def clasificar_puntos(x, y):
"""Clasificar puntos en un plano cartesiano"""
# Calcular distancia al origen
distancia = np.sqrt(x**2 + y**2)
# Calcular ángulo
angulo = np.arctan2(y, x)
# Clasificar basado en distancia y ángulo
clasificacion = np.where(
distancia < 1,
"Centro",
np.where(
(angulo >= 0) & (angulo < np.pi/2),
"Cuadrante 1",
np.where(
(angulo >= np.pi/2) & (angulo < np.pi),
"Cuadrante 2",
np.where(
(angulo >= np.pi) & (angulo < 3*np.pi/2),
"Cuadrante 3",
"Cuadrante 4"
)
)
)
)
return clasificacion
# Generar puntos aleatorios
np.random.seed(42)
x_coords = np.random.uniform(-2, 2, 100)
y_coords = np.random.uniform(-2, 2, 100)
# Clasificar todos los puntos de una vez
clasificaciones = clasificar_puntos(x_coords, y_coords)
# Contar clasificaciones
unique, counts = np.unique(clasificaciones, return_counts=True)
print("Distribución de clasificaciones:")
for clase, count in zip(unique, counts):
print(f"{clase}: {count}")
Broadcasting avanzado
# Broadcasting con arrays de diferentes formas
a = np.array([1, 2, 3]) # (3,)
b = np.array([[10], [20], [30]]) # (3, 1)
print("Array a:", a.shape, a)
print("Array b:")
print(b.shape)
print(b)
# Broadcasting automático
resultado = a + b
print("\nResultado del broadcasting:")
print(resultado.shape)
print(resultado)
# Broadcasting manual para casos complejos
x = np.array([1, 2, 3, 4]) # (4,)
y = np.array([10, 20, 30]) # (3,)
# Usar None para agregar dimensiones
x_broadcast = x[:, None] # (4, 1)
y_broadcast = y[None, :] # (1, 3)
print("\nBroadcasting manual:")
print("x_broadcast:", x_broadcast.shape)
print("y_broadcast:", y_broadcast.shape)
resultado_manual = x_broadcast + y_broadcast
print("Resultado:", resultado_manual.shape)
print(resultado_manual)
Integración con Otras Bibliotecas
NumPy se integra perfectamente con otras bibliotecas científicas:
# Integración con SciPy
from scipy import linalg, stats
# Datos de ejemplo
matriz = np.random.random((100, 100))
# Descomposición SVD con SciPy
U, s, Vt = linalg.svd(matriz)
print("Descomposición SVD completada")
# Estadísticas con SciPy
datos_normales = np.random.normal(0, 1, 1000)
stat, p_value = stats.shapiro(datos_normales)
print(f"Prueba de normalidad - p-valor: {p_value:.4f}")
# Integración con Pandas
import pandas as pd
# Convertir array de NumPy a DataFrame de Pandas
df = pd.DataFrame(matriz[:10, :5], columns=[f'Col_{i}' for i in range(5)])
print("\nDataFrame creado desde NumPy:")
print(df.head())
# Operaciones conjuntas
resultado_numpy = np.mean(matriz, axis=0)
resultado_pandas = df.mean()
print(f"Media NumPy: {resultado_numpy[:5]}")
print(f"Media Pandas: {resultado_pandas.values}")
Proyecto Práctico - Procesamiento de Datos Masivos
Vamos a crear un sistema avanzado de procesamiento de datos:
import numpy as np
import time
from numba import jit
import os
class ProcesadorDatosAvanzado:
def __init__(self, archivo_datos=None):
self.archivo_datos = archivo_datos
self.datos_cargados = False
def generar_datos_masivos(self, filas=1000000, columnas=50):
"""Genera un conjunto de datos masivo para pruebas"""
print(f"Generando {filas:,} filas x {columnas} columnas...")
# Crear diferentes tipos de datos
np.random.seed(42)
# Datos numéricos
datos_numericos = np.random.normal(100, 15, (filas, columnas//2))
# Datos categóricos (convertidos a códigos numéricos)
categorias = np.random.choice([0, 1, 2, 3, 4], (filas, columnas//2))
# Combinar datos
datos_completos = np.hstack([datos_numericos, categorias])
# Guardar en archivo
if self.archivo_datos:
np.save(self.archivo_datos, datos_completos)
print(f"Datos guardados en {self.archivo_datos}")
return datos_completos
def cargar_datos_optimizado(self):
"""Carga datos usando mapeo de memoria para eficiencia"""
if not os.path.exists(self.archivo_datos):
print("Archivo no encontrado, generando datos...")
self.generar_datos_masivos()
return self.cargar_datos_optimizado()
print(f"Cargando datos desde {self.archivo_datos}...")
# Cargar con mapeo de memoria
datos_mapeados = np.load(self.archivo_datos + '.npy', mmap_mode='r')
print(f"Datos cargados: {datos_mapeados.shape}")
print(f"Memoria usada: {datos_mapeados.nbytes / (1024**3):.2f} GB")
self.datos_cargados = True
return datos_mapeados
@staticmethod
@jit(nopython=True)
def procesar_datos_vectorizado(datos):
"""Procesamiento vectorizado optimizado con Numba"""
filas, columnas = datos.shape
# Calcular estadísticas por fila
medias = np.zeros(filas)
desviaciones = np.zeros(filas)
maximos = np.zeros(filas)
minimos = np.zeros(filas)
for i in range(filas):
fila = datos[i, :]
medias[i] = np.mean(fila)
desviaciones[i] = np.std(fila)
maximos[i] = np.max(fila)
minimos[i] = np.min(fila)
return medias, desviaciones, maximos, minimos
def analisis_completo(self, datos):
"""Realiza análisis completo usando técnicas avanzadas"""
print("\n=== ANÁLISIS AVANZADO DE DATOS ===")
# Análisis por columnas
print("Análisis por columnas:")
for i in range(min(10, datos.shape[1])): # Primeras 10 columnas
col = datos[:, i]
print(f"Columna {i}: media={np.mean(col):.2f}, "
f"std={np.std(col):.2f}, "
f"min={np.min(col):.2f}, "
f"max={np.max(col):.2f}")
# Procesamiento vectorizado
print("\nProcesando estadísticas por fila...")
start_time = time.time()
medias, desviaciones, maximos, minimos = self.procesar_datos_vectorizado(datos)
processing_time = time.time() - start_time
print(".4f"
# Análisis de resultados
print("
Resultados del procesamiento:")
print(f"Media global de medias por fila: {np.mean(medias):.2f}")
print(f"Media global de desviaciones: {np.mean(desviaciones):.2f}")
print(f"Valor máximo encontrado: {np.max(maximos):.2f}")
print(f"Valor mínimo encontrado: {np.min(minimos):.2f}")
# Análisis de distribución
print("
Análisis de distribución:")
hist, bins = np.histogram(medias, bins=50)
print(f"Histograma de medias por fila - bins: {len(hist)}")
# Identificar outliers usando IQR
Q1 = np.percentile(medias, 25)
Q3 = np.percentile(medias, 75)
IQR = Q3 - Q1
limite_inferior = Q1 - 1.5 * IQR
limite_superior = Q3 + 1.5 * IQR
outliers = (medias < limite_inferior) | (medias > limite_superior)
print(f"Número de outliers en medias: {np.sum(outliers)}")
return {
'medias': medias,
'desviaciones': desviaciones,
'maximos': maximos,
'minimos': minimos,
'outliers': outliers,
'tiempo_procesamiento': processing_time
}
def optimizacion_memoria(self, datos):
"""Demuestra técnicas de optimización de memoria"""
print("\n=== OPTIMIZACIÓN DE MEMORIA ===")
# Verificar tipo de datos actual
print(f"Tipo de datos actual: {datos.dtype}")
print(f"Memoria usada: {datos.nbytes / (1024**2):.2f} MB")
# Optimizar tipo de datos si es posible
if datos.dtype == np.float64:
datos_optimizados = datos.astype(np.float32)
print(f"Memoria después de float32: {datos_optimizados.nbytes / (1024**2):.2f} MB")
print(f"Ahorro: {(datos.nbytes - datos_optimizados.nbytes) / (1024**2):.2f} MB")
# Comparar uso de vistas vs copias
vista = datos[:1000, :10] # Vista
copia = datos[:1000, :10].copy() # Copia
print(f"Vista comparte memoria: {np.shares_memory(datos, vista)}")
print(f"Copia comparte memoria: {np.shares_memory(datos, copia)}")
return datos_optimizados if datos.dtype == np.float64 else datos
def main():
print("=== PROCESADOR AVANZADO DE DATOS CON NUMPY ===\n")
# Crear procesador
procesador = ProcesadorDatosAvanzado("datos_masivos.npy")
# Cargar o generar datos
datos = procesador.cargar_datos_optimizado()
# Optimización de memoria
datos_optimizados = procesador.optimizacion_memoria(datos)
# Análisis completo
resultados = procesador.analisis_completo(datos_optimizados)
# Resumen final
print("
=== RESUMEN FINAL ===")
print(f"Dataset procesado: {datos.shape[0]:,} filas x {datos.shape[1]} columnas")
print(".4f")
print(f"Outliers detectados: {np.sum(resultados['outliers'])}")
print(".2f")
print(".2f")
if __name__ == "__main__":
main()
Próximos Pasos
¡Felicidades! Has alcanzado el nivel avanzado de NumPy. Ahora puedes:
- NumPy para Data Science: Aplicaciones prácticas en análisis de datos
- Machine Learning con NumPy: Preparación de datos para ML
- Computación Paralela: Usando Dask o multiprocessing con NumPy
- Integración con GPU: Usando CuPy para computación GPU
Tutoriales Recomendados
Conclusión
Las técnicas avanzadas de NumPy que has aprendido son esenciales para aplicaciones de alto rendimiento. Has dominado:
- Arrays estructurados para datos heterogéneos
- Mapeo de memoria para datasets masivos
- Optimización de rendimiento y memoria
- Creación de ufuncs personalizadas
- Técnicas avanzadas de indexación y vectorización
- Integración eficiente con otras bibliotecas
Estas habilidades te permiten procesar datos a escala industrial con rendimiento óptimo. La combinación de NumPy con técnicas avanzadas de optimización te convierte en un experto en computación numérica eficiente.
¡Sigue aplicando estas técnicas en proyectos reales y continúa expandiendo tus conocimientos!
¡Tu expertise en NumPy ha alcanzado niveles avanzados!
💡 Tip Importante
📝 Mejores Prácticas para Alto Rendimiento con NumPy
- Usa tipos de datos apropiados para minimizar uso de memoria
- Aprovecha el mapeo de memoria para datasets grandes
- Vectoriza operaciones siempre que sea posible
- Utiliza broadcasting para operaciones eficientes
- Considera el orden de memoria para optimizar acceso
- Profilea tu código para identificar cuellos de botella
- Usa Numba para funciones críticas de rendimiento
📚 Recursos Recomendados:
¡El rendimiento es ahora tu aliado. Sigue optimizando y escalando tus aplicaciones!
No hay comentarios aún
Sé el primero en comentar este tutorial.