blockbuster.blockbuster.BlockingError: Blocking call to socket.socket.accept - when using langchain_postgres.v2.vectorstores

I'm customizing the chat-langchain repo to use PGVectorStore from langchain_postgres v2.

I've tried PGVectorStore.create_sync(), PGVectorStore.create(), and even AsyncPGVectorStore.create(), but I still get the blocking error. How can I fix it?

@contextmanager
def make_pg_retriever(
    configuration: BaseConfiguration, embedding_model: Embeddings
) -> Iterator[BaseRetriever]:
    connection_string = os.environ.get("PGVECTOR_CONNECTION_STRING",)
    engine = PGEngine.from_connection_string(url=connection_string)

    try:
        vectorstore = PGVectorStore.create_sync(
            engine=engine,
            embedding_service=embedding_model,
            table_name=WEAVIATE_DOCS_INDEX_NAME,  # The table must already exist!
            metadata_columns=["source", "title", "description", "language"],
            # ... other optional parameters
        )
        search_kwargs = {**configuration.search_kwargs, "return_uuids": True}
        retriever = vectorstore.as_retriever(search_kwargs=search_kwargs)
        yield retriever
    finally:
        engine.close()

Or this, with the corresponding async calls:

@asynccontextmanager  # Use asynccontextmanager for async operations
async def make_pg_retriever(
    configuration: BaseConfiguration, embedding_model: Embeddings
) -> AsyncIterator[BaseRetriever]:
    # Establish an asynchronous connection pool to PostgreSQL
    # You'll need to configure your PostgreSQL connection string
    # For example, using environment variables
    connection_string = os.environ.get(
        "PGVECTOR_CONNECTION_STRING",
    )
    # engine = await asyncio.to_thread(PGEngine.from_connection_string,url= connection_string)
    engine = await asyncio.to_thread(lambda: PGEngine.from_connection_string(url=connection_string))

    # engine = PGEngine.from_connection_string(
    #     url=connection_string
    # )
    vectorstore = await AsyncPGVectorStore.create(
        engine=engine,
        embedding_service=embedding_model, #why dont use like ingest.py?
        table_name=WEAVIATE_DOCS_INDEX_NAME,  # The table must already exist!
        metadata_columns=["source", "title", "description", "language"]
        # ... other optional parameters
    )

    # AsyncPGVectorStore's as_retriever might not directly support 'return_uuids'.
    # The returned documents will likely have the primary key or a unique identifier
    # as part of their metadata, which you can access.
    search_kwargs = {**configuration.search_kwargs}
    yield vectorstore.as_retriever(search_kwargs=search_kwargs)

The error for the .create_sync() version:

Traceback (most recent call last):
  File "mylocation\Downloads\Project\chat-langchain\.venv\Lib\site-packages\langgraph_api\worker.py", line 170, in worker
    await asyncio.wait_for(
  File "mylocation\AppData\Local\Programs\Python\Python311\Lib\asyncio\tasks.py", line 489, in wait_for
    return fut.result()
           ^^^^^^^^^^^^
  File "mylocation\Downloads\Project\chat-langchain\.venv\Lib\site-packages\langgraph_api\stream.py", line 307, in consume
    raise e from g
  File "mylocation\Downloads\Project\chat-langchain\.venv\Lib\site-packages\langgraph_api\stream.py", line 290, in consume
    async for mode, payload in stream:
  File "mylocation\Downloads\Project\chat-langchain\.venv\Lib\site-packages\langgraph_api\stream.py", line 225, in astream_state
    event = await wait_if_not_done(anext(stream, sentinel), done)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "mylocation\Downloads\Project\chat-langchain\.venv\Lib\site-packages\langgraph_api\asyncio.py", line 89, in wait_if_not_done
    raise e.exceptions[0] from None
  File "mylocation\Downloads\Project\chat-langchain\.venv\Lib\site-packages\langgraph\pregel\__init__.py", line 2830, in astream
    async for _ in runner.atick(
  File "mylocation\Downloads\Project\chat-langchain\.venv\Lib\site-packages\langgraph\pregel\runner.py", line 400, in atick
    _panic_or_proceed(
  File "mylocation\Downloads\Project\chat-langchain\.venv\Lib\site-packages\langgraph\pregel\runner.py", line 507, in _panic_or_proceed
    raise exc
  File "mylocation\Downloads\Project\chat-langchain\.venv\Lib\site-packages\langgraph\pregel\retry.py", line 136, in arun_with_retry
    return await task.proc.ainvoke(task.input, config)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "mylocation\Downloads\Project\chat-langchain\.venv\Lib\site-packages\langgraph\utils\runnable.py", line 672, in ainvoke
    input = await asyncio.create_task(
            ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "mylocation\Downloads\Project\chat-langchain\.venv\Lib\site-packages\langgraph\utils\runnable.py", line 440, in ainvoke
    ret = await self.afunc(*args, **kwargs)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "mylocation\Downloads\Project\chat-langchain\./backend/retrieval_graph/graph.py", line 181, in conduct_research
    result = await researcher_graph.ainvoke({"question": state.steps[0]})
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "mylocation\Downloads\Project\chat-langchain\.venv\Lib\site-packages\langgraph\pregel\__init__.py", line 2963, in ainvoke
    async for chunk in self.astream(
  File "mylocation\Downloads\Project\chat-langchain\.venv\Lib\site-packages\langgraph\pregel\__init__.py", line 2830, in astream
    async for _ in runner.atick(
  File "mylocation\Downloads\Project\chat-langchain\.venv\Lib\site-packages\langgraph\pregel\runner.py", line 400, in atick
    _panic_or_proceed(
  File "mylocation\Downloads\Project\chat-langchain\.venv\Lib\site-packages\langgraph\pregel\runner.py", line 507, in _panic_or_proceed
    raise exc
  File "mylocation\Downloads\Project\chat-langchain\.venv\Lib\site-packages\langgraph\pregel\retry.py", line 136, in arun_with_retry
    return await task.proc.ainvoke(task.input, config)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "mylocation\Downloads\Project\chat-langchain\.venv\Lib\site-packages\langgraph\utils\runnable.py", line 672, in ainvoke
    input = await asyncio.create_task(
            ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "mylocation\Downloads\Project\chat-langchain\.venv\Lib\site-packages\langgraph\utils\runnable.py", line 440, in ainvoke
    ret = await self.afunc(*args, **kwargs)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "mylocation\Downloads\Project\chat-langchain\backend\retrieval_graph\researcher_graph\graph.py", line 70, in retrieve_documents
    with retrieval.make_retriever(config) as retriever:
  File "mylocation\AppData\Local\Programs\Python\Python311\Lib\contextlib.py", line 137, in __enter__
    return next(self.gen)
           ^^^^^^^^^^^^^^
  File "mylocation\Downloads\Project\chat-langchain\backend\retrieval.py", line 60, in make_retriever
    with make_pg_retriever(configuration, embedding_model) as retriever:
  File "mylocation\AppData\Local\Programs\Python\Python311\Lib\contextlib.py", line 137, in __enter__
    return next(self.gen)
           ^^^^^^^^^^^^^^
  File "mylocation\Downloads\Project\chat-langchain\backend\retrieval.py", line 33, in make_pg_retriever
    engine = PGEngine.from_connection_string(url=connection_string)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "mylocation\Downloads\Project\chat-langchain\.venv\Lib\site-packages\langchain_postgres\v2\engine.py", line 104, in from_connection_string
    cls._default_loop = asyncio.new_event_loop()
                        ^^^^^^^^^^^^^^^^^^^^^^^^
  File "mylocation\AppData\Local\Programs\Python\Python311\Lib\asyncio\events.py", line 810, in new_event_loop
    return get_event_loop_policy().new_event_loop()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "mylocation\AppData\Local\Programs\Python\Python311\Lib\asyncio\events.py", line 699, in new_event_loop
    return self._loop_factory()
           ^^^^^^^^^^^^^^^^^^^^
  File "mylocation\AppData\Local\Programs\Python\Python311\Lib\asyncio\selector_events.py", line 56, in __init__
    self._make_self_pipe()
  File "mylocation\AppData\Local\Programs\Python\Python311\Lib\asyncio\selector_events.py", line 107, in _make_self_pipe
    self._ssock, self._csock = socket.socketpair()
                               ^^^^^^^^^^^^^^^^^^^
  File "mylocation\AppData\Local\Programs\Python\Python311\Lib\socket.py", line 645, in socketpair
    ssock, _ = lsock.accept()
               ^^^^^^^^^^^^^^
  File "mylocation\Downloads\Project\chat-langchain\.venv\Lib\site-packages\blockbuster\blockbuster.py", line 109, in wrapper
    raise BlockingError(func_name)
blockbuster.blockbuster.BlockingError: Blocking call to socket.socket.accept

The error comes from this line:

engine = PGEngine.from_connection_string(url=connection_string)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The error message recommends the following, but I have tried many approaches and none of them work (except option 3):

Here are your options to fix this:

1. Best approach: Convert any blocking code to use async/await patterns
   For example, use 'await aiohttp.get()' instead of 'requests.get()'

2. Quick fix: Move blocking operations to a separate thread
   Example: 'await asyncio.to_thread(your_blocking_function)'

3. Override (if you can't change the code):
   - For development: Run 'langgraph dev --allow-blocking'
   - For deployment: Set 'BG_JOB_ISOLATED_LOOPS=true' environment variable
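
For reference, option 2 can be sketched with the standard library alone. This is only an illustration of the pattern, under the assumption that the blocking step is the creation of a new event loop, as the traceback suggests; create_loop_in_worker is a hypothetical stand-in for the real setup call, and nothing here uses langchain_postgres.

```python
import asyncio
import threading


def create_loop_in_worker():
    """Hypothetical blocking setup step: create a new event loop, report the thread used."""
    loop = asyncio.new_event_loop()
    loop.close()
    return threading.current_thread().name


async def main():
    # Thread that runs the main event loop.
    caller = threading.current_thread().name
    # asyncio.to_thread runs the function in a worker thread of the default executor.
    worker = await asyncio.to_thread(create_loop_in_worker)
    return caller, worker


caller_thread, worker_thread = asyncio.run(main())
print(caller_thread != worker_thread)
```

The sketch shows only that the setup function runs on a different thread than the main loop. It does not confirm that the same wrapper is enough for PGEngine: the async attempt above already wraps PGEngine.from_connection_string in asyncio.to_thread, and the error is still reported.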
