I'm trying to run an asyncio loop in a separate thread from my main thread. While the thread and loop are running, I would like to add new tasks to it. I have the following code:
class Craft:
# [...]
async def exec_subscription(self, session, subscription: str, variable_values: dict, callback: Callable) -> None:
"""Execute a subscription on the GraphQL API."""
async for response in session.subscribe(gql(subscription), variable_values=variable_values):
callback(response)
def subscribe_job(self, job_id: int, callback: Callable) -> Union[bool, None]:
"""Subscribe to a job, receive the job information every time it is updated and pass it to a callback function."""
async def _schedule_subscription_task(subscribe_job: str, variable_values: dict, callback: Callable) -> None:
"""Schedule subscription task in asyncio loop using existing websocket connection."""
async with self._ws_client as session:
task = asyncio.create_task(self.exec_subscription(session, subscribe_job, variable_values, callback))
await asyncio.gather(task)
def _run_subscription_loop(subscribe_job: str, variable_values: dict, callback: Callable) -> None:
"""Run asyncio loop."""
asyncio.run(_schedule_subscription_task(subscribe_job, variable_values, callback))
# Build GraphQL subscription
subscribe_job = """
subscription jobSubscription($jobId: Int!) {
jobSubscriptionById(id: $jobId) {
job {...}
}
}
"""
# Build variables dictionary
variable_values = {
'jobId': job_id
}
# Check if subscription thread is running
thread_name = 'CraftSubscriptionThread'
for thread in threading.enumerate():
# If thread is found, add subscription to existing asyncio loop
if thread.name == thread_name:
# Add task to asyncio loop
loop = asyncio.get_event_loop()
asyncio.run_coroutine_threadsafe(_schedule_subscription_task(subscribe_job, variable_values, callback), loop)
return True
# Else create new event loop and new thread
thread = threading.Thread(name=thread_name, daemon=True, target=_run_subscription_loop, args=(subscribe_job, variable_values, callback))
thread.start()
return True
In a Python terminal, I run the main method subscribe_job with the following callback function:
from craft import Craft
global updated_job
updated_job = "0"
def print_job(job):
global updated_job
updated_job = job
print(updated_job)
craft.subscribe_job(1561, print_job)
This works well and when the subscription receives a message, it prints the job in the terminal:
>>> {'jobSubscriptionById': {'job': {'id': 1561, 'spaceId': 1, 'applicationId': 47, 'configurationId': 139, 'title': 'Job 1631357928', 'description': None, 'status': 'failed', 'step': 1, 'progress': '1.00'}}}
However, when I trigger another subscription to add a new task to my loop, it seems nothing happens. I simply trigger a new subscription for a different job as below, it supposedly call run_coroutine_threadsafe:
craft.subscribe_job(1561, print_job)