I am building a scraper/processor pipeline that iterates over a large list of URLs (~10k). For each URL, I perform the following steps asynchronously:

1. Scrape the HTML using playwright.async_api.
2. Process the content.
3. Send a prompt to an LLM API (Groq) using an async client.
4. Save the result to a JSON file.

I am using an asyncio.Semaphore to limit concurrency (e.g., 10 workers). The script runs perfectly for the first 50-100 items, but eventually it just freezes (hangs) indefinitely: the progress bar (tqdm) stops updating, no exception is raised (I have try/except blocks everywhere), and CPU usage remains above zero.
I wonder what could be causing this, given that the script runs fine up to a certain point and only then locks up.
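To find out where it is stuck the next time it hangs, I plan to register two debug handlers: one that dumps the Python stack of every thread, and one that prints where each pending asyncio task is suspended. A sketch (the helper name install_hang_debuggers is mine, and the signal handlers make it Unix-only):

import asyncio
import faulthandler
import signal

def install_hang_debuggers():
    # kill -USR1 <pid>: dump the stack of every thread (incl. executor threads)
    faulthandler.register(signal.SIGUSR1, all_threads=True)
    # kill -USR2 <pid>: show where every pending asyncio task is awaiting
    loop = asyncio.get_running_loop()
    loop.add_signal_handler(
        signal.SIGUSR2,
        lambda: [t.print_stack(limit=5) for t in asyncio.all_tasks()],
    )

I would call this once at the top of main(), inside the running event loop.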
Here is a version of my code reproducing the architecture:
import os
import json
import asyncio
import re
from groq import Groq, RateLimitError, AsyncGroq
from tqdm.asyncio import tqdm as async_tqdm
import aiofiles
from utils import get_html_with_playwright,simplify_content,need_playwright
import httpx
from prompt import build_task_prompt
from playwright.async_api import async_playwright
# Constants used below (placeholder values here; my real script defines them elsewhere)
PAGE_TIMEOUT = 30_000          # Playwright timeouts are in milliseconds
API_RETRY_ATTEMPTS = 3
CONCURRENT_WORKERS = 10        # matches the ~10 workers mentioned above
TASKS_DIR = "tasks"
PERSONAS_FILE = "personas.json"
NAVIGATION_FILE = "navigation.json"
try:
# developer key = ""
# free tier = ""
client = Groq(api_key="")
except Exception as e:
    print(f"ERROR: Could not initialize the Groq client ({e}). Make sure the GROQ_API_KEY environment variable is set.")
exit()
async def fetch_site_and_generate_tasks(persona, url):
final_html = None
    # Quick first attempt with httpx
# try:
# async with httpx.AsyncClient(timeout=15.0, follow_redirects=True) as client_http:
# headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36'}
# response = await client_http.get(url, headers=headers)
# response.raise_for_status()
# html_from_httpx = response.text
# if not need_playwright(html_from_httpx, response.headers):
# final_html = html_from_httpx
# else:
# pass
# except Exception:
# pass
    # # Fall back to Playwright if needed
# if final_html is None:
# final_html = await get_html_with_playwright(url)
# if not final_html: return None
    try:
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            try:
                context = await browser.new_context(
                    user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36',
                    java_script_enabled=True,
                    viewport={'width': 1920, 'height': 1080},
                    # args=['--disable-gpu', '--no-sandbox', '--disable-dev-shm-usage']
                )
                page = await context.new_page()
                await page.goto(url, timeout=PAGE_TIMEOUT, wait_until='domcontentloaded')
                final_html = await page.content()
            finally:
                # close the browser even when goto() or content() raises
                await browser.close()
    except Exception:
        return None
# try:
    # # Impersonate a real browser via the headers to avoid being blocked
# headers = {
# 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36',
# 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
# 'Accept-Language': 'fr-FR,fr;q=0.9,en-US;q=0.8,en;q=0.7'
# }
# async with httpx.AsyncClient(follow_redirects=True, timeout=PAGE_TIMEOUT, verify=False) as http_client:
# response = await http_client.get(url, headers=headers)
# response.raise_for_status()
# html_content = response.text
# except httpx.RequestError as e:
    #     print(f"[INFO] Connection error on {url}: {e}")
# return None
# except httpx.HTTPStatusError as e:
    #     print(f"[INFO] HTTP error {e.response.status_code} on {url}")
# return None
# except Exception:
# return None
# if not html_content:
# return None
simplified_html = simplify_content(final_html)
if not simplified_html: return None
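    # only the first 6000 characters of the simplified page go into the prompt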
prompt = build_task_prompt(persona, simplified_html[:6000], url)
    print(f"[DEBUG] Prompt sent: {prompt}")
for _ in range(API_RETRY_ATTEMPTS):
try:
loop = asyncio.get_running_loop()
chat_completion = await loop.run_in_executor(
None,
lambda: client.chat.completions.create(
messages=[{"role": "user", "content": prompt}],
model="llama-3.3-70b-versatile",
temperature=0.2,
response_format={"type": "json_object"},
)
)
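            # NOTE: the blocking SDK call above runs in asyncio's default
            # ThreadPoolExecutor; no explicit timeout is passed here, so if
            # the HTTP request stalls past the SDK's own default, the worker
            # thread (and this await) stays blocked.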
# chat_completion = await client.chat.completions.create(
# messages=[{"role": "user", "content": prompt}],
# model="llama-3.3-70b-versatile",
# temperature=0.2,
# response_format={"type": "json_object"},
# # timeout=60.0
# )
            print("[DEBUG] Response received from the model.")
            if chat_completion.choices and chat_completion.choices[0].message and chat_completion.choices[0].message.content:
                print(f"[DEBUG] Raw response: {chat_completion.choices[0].message.content}")
response_text = chat_completion.choices[0].message.content.strip()
return json.loads(response_text)
        except Exception as e:  # Exception already covers RateLimitError
            print(e)
            err = str(e)
            # Groq rate-limit errors embed a suggested wait, e.g. "try again in 123.45ms"
            m = re.search(r"try again in ([0-9\.]+)ms", err)
            if m:
                delay = float(m.group(1)) / 1000.0
            else:
                delay = 0.6
            await asyncio.sleep(delay)
return None
async def process_site_for_persona(persona, url, pbar, file_lock):
    """Process a single site for one persona and save the result."""
persona_id = persona['id']
safe_filename = url.replace('https://', '').replace('http://', '').replace('/', '_') + '.json'
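    # e.g. "https://example.com/a/b" -> "example.com_a_b.json"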
persona_dir = os.path.join(TASKS_DIR, persona_id)
task_file_path = os.path.join(persona_dir, safe_filename)
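    # skip pairs that already produced a file in a previous run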
if os.path.exists(task_file_path):
pbar.update(1)
return
tasks_json = await fetch_site_and_generate_tasks(persona, url)
    print(f"[DEBUG] Generated tasks: {tasks_json}")
if tasks_json and 'taches' in tasks_json:
os.makedirs(persona_dir, exist_ok=True)
# async with file_lock:
async with aiofiles.open(task_file_path, 'w', encoding='utf-8') as f:
await f.write(json.dumps(tasks_json, indent=2, ensure_ascii=False))
pbar.update(1)
async def main():
    """Main function orchestrating the task generation."""
if not os.path.exists(TASKS_DIR):
os.makedirs(TASKS_DIR)
try:
with open(PERSONAS_FILE, 'r', encoding='utf-8') as f:
personas = json.load(f)
with open(NAVIGATION_FILE, 'r', encoding='utf-8') as f:
navigation_lists = json.load(f)
except FileNotFoundError as e:
        print(f"ERROR: File not found - {e}.")
return
persona_map = {p['id']: p for p in personas}
    # list of every (persona, url) pair to process
all_pairs = []
for persona_id, sites in navigation_lists.items():
if persona_id in persona_map:
for site_url in sites:
all_pairs.append((persona_map[persona_id], site_url))
    print(f"Total of {len(all_pairs)} (persona, site) pairs to process.")
all_pairs = all_pairs[50660:60000]
semaphore = asyncio.Semaphore(CONCURRENT_WORKERS)
file_lock = asyncio.Lock()
async def worker(pair):
async with semaphore:
await process_site_for_persona(pair[0], pair[1], pbar, file_lock)
    with async_tqdm(total=len(all_pairs), desc="Task generation") as pbar:
tasks = [worker(pair) for pair in all_pairs]
await asyncio.gather(*tasks)
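        # gather() waits for every worker; a single task that never finishes
        # keeps this await (and the whole script) pending forever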
    print("\nTask generation process finished.")
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
        print("\nManual interruption. Stopping the script.")
`url` is defined, but no list of 10k URLs is provided (I suspect not that many are needed to trigger the issue). Are you sure 100% of this code is the bare minimum necessary to reproduce the problem?