
I am building a scraper/processor pipeline that iterates over a large list of URLs (~10k). For each URL, I perform the following steps asynchronously:

1. Scrape the HTML using playwright.async_api.
2. Process the content.
3. Send a prompt to an LLM API (Groq) using an async client.
4. Save the result to a JSON file.

I am using asyncio.Semaphore to limit concurrency (e.g., 10 workers). The script runs perfectly for the first 50-100 items, but eventually it just freezes (hangs) indefinitely. The progress bar (tqdm) stops updating, no exception is raised (I have try/except blocks), and CPU usage is still up.
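
So far I have not been able to see where the tasks are actually stuck when it freezes. My next idea is to run a watchdog coroutine alongside the workers, something along these lines (this is not in the script below; the interval and stack depth are arbitrary), to periodically dump the stack of every pending task:

import sys
import asyncio

async def watchdog(interval: float = 60.0):
    # Every `interval` seconds, print the stack of every task that is still
    # pending, so a silent hang shows up on stderr instead of just freezing
    # the progress bar.
    while True:
        await asyncio.sleep(interval)
        for task in asyncio.all_tasks():
            if task is asyncio.current_task() or task.done():
                continue
            print(f"--- pending task: {task.get_name()} ---", file=sys.stderr)
            task.print_stack(limit=5, file=sys.stderr)

(It would be started with asyncio.create_task(watchdog()) at the top of main().)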

I am wondering what the cause could be, since the script runs perfectly up to a certain point before freezing.

Here is a version of my code reproducing the architecture:

import os
import json
import asyncio
from groq import Groq, RateLimitError, AsyncGroq
from tqdm.asyncio import tqdm as async_tqdm
import aiofiles
from utils import get_html_with_playwright, simplify_content, need_playwright
import httpx
from prompt import build_task_prompt
from playwright.async_api import async_playwright


# here go some constants
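# (placeholder values added for readability only; the real constants are defined
#  elsewhere and their actual values are not part of this repro)
PAGE_TIMEOUT = 30_000        # ms, passed to page.goto() below (assumed value)
API_RETRY_ATTEMPTS = 3       # retries around the LLM call (assumed value)
CONCURRENT_WORKERS = 10      # matches the ~10 workers mentioned above
TASKS_DIR = "tasks"          # placeholder paths/filenames
PERSONAS_FILE = "personas.json"
NAVIGATION_FILE = "navigation.json"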
try:
    # developer key = ""
    # free tier = ""
    client = Groq(api_key="")
except Exception as e:
    print(f"ERREUR: Impossible d'initialiser le client Groq. Assurez-vous que la variable d'environnement GROQ_API_KEY est définie.")
    exit()

async def fetch_site_and_generate_tasks(persona, url):
    final_html = None
    
    # Quick attempt with httpx
    # try:
    #     async with httpx.AsyncClient(timeout=15.0, follow_redirects=True) as client_http:
    #         headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36'}
    #         response = await client_http.get(url, headers=headers)
    #         response.raise_for_status()
    #         html_from_httpx = response.text
            
    #         if not need_playwright(html_from_httpx, response.headers):
    #             final_html = html_from_httpx
    #         else:
    #             pass 
    # except Exception:
    #     pass 
    
    # # Fallback to Playwright if needed
    # if final_html is None:
    #     final_html = await get_html_with_playwright(url)
    
    # if not final_html: return None

    try:
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            context = await browser.new_context(
                user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36',
                java_script_enabled=True,
                viewport={'width': 1920, 'height': 1080},
                # args=['--disable-gpu', '--no-sandbox', '--disable-dev-shm-usage']

            )
            page = await context.new_page()
            await page.goto(url, timeout=PAGE_TIMEOUT, wait_until='domcontentloaded')
            html_content = await page.content()
            await browser.close()
    except Exception:
        return None
    # try:
    #     # Simulate a real browser via the headers to avoid being blocked
    #     headers = {
    #         'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36',
    #         'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
    #         'Accept-Language': 'fr-FR,fr;q=0.9,en-US;q=0.8,en;q=0.7'
    #     }

    #     async with httpx.AsyncClient(follow_redirects=True, timeout=PAGE_TIMEOUT, verify=False) as http_client:
    #         response = await http_client.get(url, headers=headers)
    #         response.raise_for_status() 
            
    #         html_content = response.text
            
    # except httpx.RequestError as e:
    #     print(f"[INFO] Erreur de connexion sur {url}: {e}")
    #     return None
    # except httpx.HTTPStatusError as e:
    #     print(f"[INFO] Erreur HTTP {e.response.status_code} sur {url}")
    #     return None
    # except Exception:
    #     return None

    # if not html_content: 
    #     return None
    
    simplified_html = simplify_content(html_content)
    if not simplified_html: return None

    prompt = build_task_prompt(persona, simplified_html[:6000], url)
    print(f"[DEBUG] Prompt envoyé : {prompt}")
    for _ in range(API_RETRY_ATTEMPTS):
        try:
            loop = asyncio.get_running_loop()
            chat_completion = await loop.run_in_executor(
                None,
                lambda: client.chat.completions.create(
                    messages=[{"role": "user", "content": prompt}],
                    model="llama-3.3-70b-versatile",
                    temperature=0.2,
                    response_format={"type": "json_object"},
                )
            )
            # chat_completion = await client.chat.completions.create(
            #     messages=[{"role": "user", "content": prompt}],
            #     model="llama-3.3-70b-versatile", 
            #     temperature=0.2,
            #     response_format={"type": "json_object"},
            #     # timeout=60.0 
            # )
            print(f"[DEBUG] Réponse reçue du modèle.")
            if chat_completion.choices and chat_completion.choices[0].message and chat_completion.choices[0].message.content:
                print(f"[DEBUG] Réponse brute : {chat_completion.choices[0].message.content}")
                response_text = chat_completion.choices[0].message.content.strip()
                return json.loads(response_text)

        except (RateLimitError, Exception) as e:
            print(e)
            err = str(e)
            import re
            m = re.search(r"try again in ([0-9\.]+)ms", err)
            if m:
                delay = float(m.group(1)) / 1000.0
                # await asyncio.sleep(delay)
            else:
                delay = 0.6
            await asyncio.sleep(delay=delay)
    return None


async def process_site_for_persona(persona, url, pbar, file_lock):
    """Traite un seul site pour un persona et sauvegarde le résultat."""
    persona_id = persona['id']
    safe_filename = url.replace('https://', '').replace('http://', '').replace('/', '_') + '.json'
    persona_dir = os.path.join(TASKS_DIR, persona_id)
    task_file_path = os.path.join(persona_dir, safe_filename)

    if os.path.exists(task_file_path):
        pbar.update(1)
        return

    tasks_json = await fetch_site_and_generate_tasks(persona, url)
    print(f"[DEBUG] Tâches générées : {tasks_json}")
    if tasks_json and 'taches' in tasks_json:
        os.makedirs(persona_dir, exist_ok=True)
        # async with file_lock:
        async with aiofiles.open(task_file_path, 'w', encoding='utf-8') as f:
            await f.write(json.dumps(tasks_json, indent=2, ensure_ascii=False))

    pbar.update(1)


async def main():
    """Fonction principale pour orchestrer la génération de tâches."""
    if not os.path.exists(TASKS_DIR):
        os.makedirs(TASKS_DIR)

    try:
        with open(PERSONAS_FILE, 'r', encoding='utf-8') as f:
            personas = json.load(f)
        with open(NAVIGATION_FILE, 'r', encoding='utf-8') as f:
            navigation_lists = json.load(f)
    except FileNotFoundError as e:
        print(f"ERREUR: Fichier introuvable - {e}.")
        return

    persona_map = {p['id']: p for p in personas}
    
    # list of all (persona, url) pairs to process
    all_pairs = []
    for persona_id, sites in navigation_lists.items():
        if persona_id in persona_map:
            for site_url in sites:
                all_pairs.append((persona_map[persona_id], site_url))

    print(f"Total de {len(all_pairs)} paires (persona, site) à traiter.")
    all_pairs =  all_pairs[50660:60000]
    semaphore = asyncio.Semaphore(CONCURRENT_WORKERS)
    file_lock = asyncio.Lock()

    async def worker(pair):
        async with semaphore:
            await process_site_for_persona(pair[0], pair[1], pbar, file_lock)

    with async_tqdm(total=len(all_pairs), desc="Task Generation") as pbar:
        tasks = [worker(pair) for pair in all_pairs]
        await asyncio.gather(*tasks)

    print("\nProcessus de génération de tâches terminé.")

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nInterruption manuelle. Le script s'arrête.")
  • is this a minimal reproducible example? I don't see where url is defined, and no list of 10k URLs is provided (I suspect not that many are needed to trigger the issue). Are you sure 100% of this code is the bare minimum necessary to reproduce the problem? Commented 2 days ago
  • I removed some constants and variables, but the main logic is still there. The script loads large data, so I focused on the logic and the approach to get help. Commented 2 days ago
  • I can't copy and paste this and run it to see the problem though. The goal here is to strip out anything not needed, but still make it complete (everything defined). I doubt the groq API call is needed--you can probably hardcode in a result, or use a mock API (something like the sketch after these comments). If you can minimally repro with 1 URL, please hardcode that, rather than a list of 10k URLs we don't have access to. I don't think anyone will be able to help with this in its current form, just by eyeballing ~200 lines of unrunnable code taken out of context. Commented 2 days ago
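
For reference, a stub along these lines (the "taches" key is from the question; the function name, persona dict, and return values are made up) would keep the semaphore/gather structure while dropping the network and the LLM entirely:

import asyncio

async def fake_fetch_and_generate(persona, url):
    # Stand-in for fetch_site_and_generate_tasks: no Playwright, no Groq.
    await asyncio.sleep(0.1)                      # simulate some I/O latency
    return {"taches": [f"dummy task for {url}"]}  # hardcoded result instead of the LLM

async def main():
    semaphore = asyncio.Semaphore(10)
    urls = ["https://example.com"] * 200          # one hardcoded URL, repeated

    async def worker(url):
        async with semaphore:
            return await fake_fetch_and_generate({"id": "p1"}, url)

    results = await asyncio.gather(*(worker(u) for u in urls))
    print(len(results), "results")

asyncio.run(main())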
