Objetivo del servicio
En el scraping de sitios web dinámicos (especialmente portales de noticias), los métodos tradicionales basados en XPath o expresiones regulares fallan. La alternativa de usar un navegador headless simplifica la extracción, aunque requiere más recursos hardware. La idea es centralizar el renderizado en un servicio independiente que varios spiders puedan consumir, evitando abrir/cerrar navegadores por cada petición.
Arquitectura propuesta
Se opta por desacoplar el motor de scraping y el servicio de renderizado. El servicio expone endpoints HTTP (basados en aiohttp) que controlan una instancia de Pyppeteer (aunque está en desuso, se menciona Playwright como alternativa futura). Se mantienen varias pestañas (tabs) precargadas en el navegador para reutilizarlas, reduciendo latencia.
Implementación del servidor web
from aiohttp import web
app = web.Application()
app.router.add_view('/render.html', RenderHtmlView)
app.router.add_view('/render.png', RenderPngView)
app.router.add_view('/render.jpeg', RenderJpegView)
app.router.add_view('/render.json', RenderJsonView)
# Las vistas se encargan de recibir datos POST y delegar al motor de navegación
Inicialización del navegador y pool de pestañas
class BrowserInitializer:
async def setup_browser(self, app: web.Application) -> None:
page_count = 4 # configurable
await asyncio.create_task(self._launch_browser())
app["browser"] = self.browser
# Crear pestañas adicionales en paralelo
tasks = [asyncio.create_task(self._create_tab()) for _ in range(page_count - 1)]
await asyncio.gather(*tasks)
# Agrupar en una cola asíncrona
queue = asyncio.Queue(maxsize=page_count + 1)
for page in await self.browser.pages():
await queue.put(page)
app["pages_queue"] = queue
app["screenshot_lock"] = asyncio.Lock()
async def cleanup(self, app: web.Application) -> None:
await self.browser.close()
initializer = BrowserInitializer()
app.on_startup.append(initializer.setup_browser)
app.on_cleanup.append(initializer.cleanup)
Las pestañas se almacenan en una cola y se asignan dinámicamente a cada petición. El bloqueo (screenshot_lock) evita conflictos al capturar pantallas simultáneamente.
Control de tiempo de espera y parada de carga
async def _navigate(self, page: Page, options: PostData) -> dict:
try:
await page.goto(options.url,
waitUntil=options.wait_util,
timeout=options.timeout * 1000)
except asyncio.TimeoutError:
# Detiene la carga sin cerrar la página
await page._client.send("Page.stopLoading")
finally:
page.remove_all_listeners("request")
Se utiliza Page.stopLoading (comando CDP) para forzar la detención cuando la página ya se ha renderizado pero sigue cargando recursos irrelevantes.
Definición de parámetros de petición
from pydantic import BaseModel
from typing import List, Optional
class RenderRequest(BaseModel):
url: str
timeout: float = 30.0
wait_until: str = "domcontentloaded"
wait_extra: float = 0.0
js_before: str = "" # inyectar JS antes de navegar
resource_filters: List[str] = [] # patrones regex para bloquear
load_images: bool = False
block_types: List[str] = ["image", "media"]
use_cache: bool = True
include_cookies: bool = False
include_html: bool = True
include_headers: bool = True
Captura de pantalla (parámetros específicos)
class ScreenshotRequest(RenderRequest):
full_page: bool = False
include_html: bool = False
load_images: bool = True
block_types: List[str] = []
wait_until: str = "networkidle2"
Ejemplos de uso
1. Múltiples pestañas en paralelo
import aiohttp, asyncio, sys
async def fetch(session, delay):
url = f"http://httpbin.org/delay/{delay}"
async with session.get(f"http://127.0.0.1:8080/render.html?url={url}") as resp:
data = await resp.json()
print(f"{url} -> estado {data.get('status')}")
return data
async def main():
headers = {'User-Agent': 'Mozilla/5.0 ...'}
loop = asyncio.get_event_loop()
start = loop.time()
async with aiohttp.ClientSession(headers=headers) as session:
tasks = [asyncio.create_task(fetch(session, i)) for i in range(1,5)]
await asyncio.gather(*tasks)
print(f"Tiempo total: {loop.time()-start:.2f}s")
if __name__ == "__main__":
if sys.platform == 'win32':
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
asyncio.run(main())
Se inician 4 peticiones a endpoints con retardos de 1 a 4 segundos; el tiempo total será ~4 segundos gracias al uso de pestañas independientes.
2. Interceptar respuesta AJAX específica
import json, aiohttp, asyncio
async def intercept_ajax(session, url):
payload = {
"url": url,
"xhr_pattern": "/api/", # filtro para capturar peticiones AJAX
"use_cache": False,
"resource_filters": [".png", ".jpg"]
}
async with session.post("http://127.0.0.1:8080/render.json",
data=json.dumps(payload)) as resp:
data = await resp.json()
print(f"Respuestas AJAX: {data}")
return data
async def main():
headers = {'User-Agent': 'Mozilla/5.0 ...'}
async with aiohttp.ClientSession(headers=headers) as session:
await intercept_ajax(session, "https://spa1.scrape.center/")
3. Captura de pantalla con timeout
import base64, aiohttp, asyncio
async def screenshot(session, url, filename):
payload = {
"url": url,
"full_page": False,
"load_images": True,
"use_cache": True,
"wait_extra": 1.0
}
async with session.post("http://127.0.0.1:8080/render.png",
data=json.dumps(payload)) as resp:
result = await resp.json()
if result.get('image'):
with open(filename, 'wb') as f:
f.write(base64.b64decode(result['image']))
print(f"Guardado {filename} ({len(result['image'])} bytes)")
async def main():
headers = {'User-Agent': 'Mozilla/5.0 ...'}
urls = [
"https://www.baidu.com/",
"https://www.toutiao.com/"
]
async with aiohttp.ClientSession(headers=headers) as session:
tasks = [screenshot(session, url, f"img_{i}.png") for i, url in enumerate(urls)]
await asyncio.gather(*tasks)
4. Integración con Scrapy (middleware)
import json, logging
from scrapy.exceptions import NotConfigured
from scrapy.http import Request, Response
class BrowserMiddleware:
def __init__(self, base_url: str):
self.base_url = base_url.rstrip('/')
self.logger = logging.getLogger(__name__)
@classmethod
def from_crawler(cls, crawler):
url = crawler.settings.get('BROWSER_SERVICE_URL')
if not url:
raise NotConfigured
return cls(url)
def process_request(self, request, spider):
opts = request.meta.get('browser_options')
if not opts or request.method != 'GET':
return None
endpoint = opts.get('endpoint', '/render.html')
opts['url'] = request.url
new_body = json.dumps(opts)
new_req = request.replace(
url=f"{self.base_url}{endpoint}",
method='POST',
body=new_body
)
new_req.meta['_original_url'] = request.url
return new_req
def process_response(self, request, response, spider):
original_url = request.meta.get('_original_url')
if not original_url:
return response
try:
data = json.loads(response.text)
except ValueError:
return response.replace(url=original_url, status=500)
new_body = (data.get('text') or data.get('content') or '').encode()
new_headers = data.get('headers', {})
return response.replace(
url=original_url,
status=data['status'],
headers=new_headers,
body=new_body
)
Consideraciones finales
El código anterior es un prototipo funcional que demuestra el concepto. Para producción se recomienda empaquetar el servicio en un contenedor Docker, gestionar el pool de pestañas de manera más robusta y migrar a Playwright (que recibe mantenimiento activo). El repositorio completo (incluyendo configuración Docker) está disponible en GitHub.