0

I thought this would just be a quick and easy problem to solve, but alas it is not.

I'm trying to find a way of sending a "CancellationToken" from my python client implementation which communicates with my C# Server.

With the C# client implementation I can simply implement:

 var tokenSource = new CancellationTokenSource();

 while (await priceResponse.ResponseStream.MoveNext(tokenSource.Token))

The Server code:

public async override Task Prices(PricesRequest request, IServerStreamWriter<PricesResponse> responseStream, ServerCallContext context)
{
    ...

    while (!context.CancellationToken.IsCancellationRequested) 
    {
       ... 

Does anyone know how to implement the cancellation pattern using python to communicate with the C# Server, or am I going about this the wrong way?

Thanks for any advice or opinions :)

I tried this - but to no avail...

class CancellationToken:
    def __init__(self):
        self._is_canceled = False

    def is_canceled(self):
        return self._is_canceled
    
    def cancel(self):
        self._is_canceled = True

But I'm relatively poor at python - so took this from some other more enlightened soul :)

Here's the Python code (although like I said I'm no expert)

import grpc
import greeting_pb2 
import greeting_pb2_grpc
import pricing_pb2
import pricing_pb2_grpc 
import os

import logging

_LOGGER = logging.getLogger(__name__)
_SERVER_HOST = "localhost"
_SERVER_PORT = "50051"

def pricing_client():
    
    try:
        
        _LOGGER.info(f"Attempting to connecto {_SERVER_HOST}:{_SERVER_PORT}...")
                
        channel = grpc.insecure_channel(f"{_SERVER_HOST}:{_SERVER_PORT}")
        
        pricingStub = pricing_pb2_grpc.PricingServiceStub(channel)
        
        _LOGGER.info("Channel Connection made, stubs created...")
        
        myrequest = greeting_pb2.GreetingRequest(greeting=mygreeting)
        
        myprice = pricing_pb2.Price(instrument="BP")
        mypricerequest = pricing_pb2.PricesRequest(price=myprice)
        
         pricingResponse = pricingStub.Prices(mypricerequest)
        
        _LOGGER.debug(f"Pricing response executed successfully {pricingResponse}")
        
        prices = 0
        
        for price in pricingResponse:
       
            _LOGGER.debug(f"Price received: {price}")
            prices += 1
            
            if(prices == 10):
                
                # trying to cancel here, but for now I just exit the client
                return
                
    except:   
        
        _LOGGER.error(f"An error occured - please check the server is runing @ {_SERVER_HOST}:{_SERVER_PORT}")
        
    finally:
        
        _LOGGER.info("Application exiting...")
        
if __name__ == "__main__":
    
    logging.basicConfig(level=logging.DEBUG)
    pricing_client() 

5
  • How is the Python code being called? If it's in a separate process, then somehow you have to get a socket to send your response back. Commented Aug 7, 2024 at 18:13
  • @TimRoberts thanks; I saw something about using events, but again I'm not sure how to send the token back - also not entirely sure how gRPC can accept it? Unfortunately - the python side of things is causing me the problem... Commented Aug 7, 2024 at 18:41
  • I also tried using the following... my_task = asyncio.ensure_future(pricingResponse) and then tried cancelling, after N responses... but this just failed... my_task.cancel() Commented Aug 7, 2024 at 19:19
  • Are you using the Python gRPC module? Where is your code to connect to the server? Commented Aug 7, 2024 at 20:14
  • I've edited the original question with the code in question... I've also read that the grpc python library might provide a future as part of it's implementation... chromium.googlesource.com/external/github.com/grpc/grpc/+/HEAD/… But I will pick this up in the morning now - thanks buddy Commented Aug 7, 2024 at 21:53

1 Answer 1

0

So I think I have solved this using the following...

Again I would love anyone who is more knowledgeable than me to confirm!? :)

def run_streaming_client(Instrument):
with grpc.insecure_channel(f"{_SERVER_HOST}:{_SERVER_PORT}") as channel:
    
    stub = pricing_pb2_grpc.PricingServiceStub(channel)
    
    result_generator = stub.Prices(
        pricing_pb2.PriceRequest(price=pricing_pb2.Price(instrument="GOOG")),
        wait_for_ready=True)

    def cancel_request(unused_signum, unused_frame):
        result_generator.cancel()
        sys.exit(0)

    prices = 0
    
    for result in result_generator:
        print(result)
        _LOGGER.debug(f"Price received: {result}")
        prices += 1
        
        if(prices == 10):
            #result_generator.cancel()
            cancel_request(signal.SIGINT, cancel_request)
            return

if name == "main":

logging.basicConfig(level=logging.DEBUG)
run_streaming_client("GOOGLE")

With many thanks to (where I got the idea):

https://gitlab.uni-hannover.de/tci-gateway-module/grpc/-/blob/e8bd422c9f2ed53c1ffbd0e99906b6309af6523f/examples/python/cancellation/client.py

Sign up to request clarification or add additional context in comments.

1 Comment

I was able to confirm that the server now responds after the cancel request is received and exits the generation so all good.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.