Concurrency & Parallelism
1.1 Asyncio & Concurrency Patterns
Sección titulada «1.1 Asyncio & Concurrency Patterns»Objetivo: Manejar miles de requests concurrentes sin bloquear el event loop.
📖 Teoría Profunda
Sección titulada «📖 Teoría Profunda»El Global Interpreter Lock (GIL)
Sección titulada «El Global Interpreter Lock (GIL)»El GIL es un mutex que protege el acceso a objetos Python, previniendo que múltiples threads ejecuten bytecode Python simultáneamente.
Cuándo se libera el GIL:
- I/O operations (
requests,aiohttp, file operations) time.sleep()yasyncio.sleep()- Operaciones NumPy y computaciones C-level
- Llamadas a bibliotecas externas (C/C++/Rust)
Implicación:
- ❌ Multithreading NO escala en tareas CPU-bound
- ✅ Multithreading SÍ ayuda en tareas I/O-bound
- ✅ Multiprocessing para CPU-bound (pero tiene coste de IPC)
Asyncio Under the Hood
Sección titulada «Asyncio Under the Hood»import asyncio
# Event Loop - El corazón de asyncioasync def main(): # Coroutine - función que puede pausarse result = await fetch_data() # await = pausar aquí, dar control al loop
# Task - coroutine siendo ejecutada task = asyncio.create_task(fetch_data()) # fire and forget result = await task # esperar resultadoDiferencias clave:
await: Pausa la coroutine actual, bloquea hasta que completecreate_task(): Inicia ejecución en background, no bloquea
Race Conditions en Async
Sección titulada «Race Conditions en Async»# ❌ VULNERABLEcounter = 0
async def increment(): global counter temp = counter await asyncio.sleep(0) # Context switch! counter = temp + 1
# ✅ SEGUROlock = asyncio.Lock()
async def increment_safe(): global counter async with lock: temp = counter await asyncio.sleep(0) counter = temp + 1⚔️ Pregunta de Entrevista
Sección titulada «⚔️ Pregunta de Entrevista»“Diseñas un servicio de inferencia. El endpoint recibe una imagen, hace pre-procesado con OpenCV (CPU), llama a API externa de IA (I/O), y guarda en DB (I/O). Diseña la estrategia de concurrencia perfecta.”
Respuesta esperada:
- Pre-procesado (CPU-bound):
ThreadPoolExecutoroProcessPoolExecutor - API call (I/O-bound):
asyncioconaiohttp - DB write (I/O-bound):
asynciocon driver async (e.g.,asyncpg) - Arquitectura: FastAPI async endpoint que:
- Recibe imagen (async)
- Envía pre-procesado a thread pool
- Await resultado, hace API call con
aiohttp - Await resultado, escribe a DB con
asyncpg
🧪 Lab Práctico: “The Async Rate-Limiter”
Sección titulada «🧪 Lab Práctico: “The Async Rate-Limiter”»Objetivo
Sección titulada «Objetivo»Escribir un sistema de requests concurrentes con rate limiting usando solo asyncio.
Requisitos
Sección titulada «Requisitos»- Simular 500 requests HTTP (usa
asyncio.sleep(random.uniform(0.1, 0.5))) - Limitar concurrencia a 10 con
asyncio.Semaphore - Producer-Consumer pattern con
asyncio.Queue:- Producer: genera requests
- Workers (3): consumen de la queue y “procesan”
- Graceful Shutdown: Manejar
KeyboardInterrupt
Código Starter
Sección titulada «Código Starter»import asyncioimport randomfrom typing import List
async def fetch_url(url: str, semaphore: asyncio.Semaphore) -> dict: """Simula request HTTP con rate limiting.""" async with semaphore: print(f"🔄 Fetching {url}") await asyncio.sleep(random.uniform(0.1, 0.5)) return {"url": url, "status": 200, "data": f"Response from {url}"}
async def producer(queue: asyncio.Queue, urls: List[str]): """Produce requests en la queue.""" # TODO: Implementar pass
async def worker(name: str, queue: asyncio.Queue): """Consume y procesa requests.""" # TODO: Implementar pass
async def main(): # TODO: Implementar orquestación pass
if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: print("\n⚠️ Shutting down gracefully...")Entregable
Sección titulada «Entregable»- 500 requests completadas
- Nunca más de 10 concurrentes (verifica con logs)
- 3 workers procesando
- Graceful shutdown funciona
- Tiempo total < 15 segundos
📚 Recursos
Sección titulada «📚 Recursos»- asyncio — Asynchronous I/O (Official Docs)
- Real Python - Async IO in Python
- Understanding the Python GIL