I'm trying to build a non-blocking UDP server that listens on several ports and receives data packets until a timeout occurs. Unfortunately, I cannot change the client side, and UDP is mandatory. Receiving files works fine. The issue is that creating the workers is a blocking operation. I'd like it to be non-blocking so that all workers run in parallel. Each worker should also run in a loop such as while True, but that blocked too.

Here is my code:

#!/usr/bin/env python
from socket import *
import sys
import select
import threading
threads = []

def worker(port):
        host="192.168.88.51"
        s = socket(AF_INET,SOCK_DGRAM)
        s.bind((host,port))
        addr = (host,port)
        buf=128
        data,addr = s.recvfrom(buf)
        filename =  str(port)+".data"
        print str(port)+" received File:"
        f = open(filename,'wb')

        data, addr = s.recvfrom(buf)
        try:
            while(data):
                f.write(data)
                s.settimeout(1)
                data,addr = s.recvfrom(buf)
        except timeout:
            f.close()
            s.close()
            print "File Downloaded"

for i in range(1300,1305):
    wrk = worker(i)
    threads.append(wrk)
  • You need to create an object to hold each of your socket connections. This object will have separate connection and listening methods. Once you have that set up, you can run the listener method of each object in a separate thread. Commented Aug 29, 2017 at 11:51

2 Answers

This will work as you intended, except that it overwrites the file each time a new transmission starts on the same port. A timeout (one second without data) marks the end of a whole transmission. You can easily rework this to append to the same file, create a new file, or do whatever you need.

#! /usr/bin/env python
from socket import AF_INET, SOCK_DGRAM
import socket
import threading

class Server (threading.Thread):
    def __init__ (self, host="192.168.88.51", port=123, bufsize=128):
        threading.Thread.__init__(self)
        self.host = host
        self.port = port
        self.bufsize = bufsize
        self.done = threading.Event()

    def opensock (self):
        s = socket.socket(AF_INET, SOCK_DGRAM)
        s.bind((self.host, self.port))
        s.settimeout(0.001)
        return s

    def run (self):
        host = self.host
        port = self.port
        self.s = s = self.opensock()
        print "Waiting for connection on", host+":"+str(port)
        while not self.done.isSet():
            try:
                data, addr = s.recvfrom(self.bufsize)
                print "Connection from", addr
                s.settimeout(1)
                self.recvdata(data, s, addr)
                s.settimeout(0.001)
            except socket.timeout: pass
            except:
                raise
        self.done.set()
        s.close()
        print  "Server on '%s:%s' stopped!" % (host, port)

    def recvdata (self, initdata, conn, addr):
        bufsize = self.bufsize
        filename =  str(self.port)+".data"
        print "Opening file", filename
        f = open(filename, "wb")
        print "Receiving & writing rest of data from", addr
        data = initdata
        while data and not self.done.isSet():
            f.write(data)
            try:
                data, addr = conn.recvfrom(bufsize)
            except socket.timeout: break
        f.close()
        if self.done.isSet():
            print "Forcefully interrupted transmission"
        else:
            print "File Downloaded"

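    def stop (self):
        # Only signal the thread. run() closes the socket itself once its
        # loop exits, so closing it here could race with recvfrom().
        self.done.set()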

servers = []
for port in xrange(123, 150):
    try:
        s = Server(port=port)
        s.start()
        servers.append(s)
    except Exception as e:
        print e

raw_input("Press enter to send data to one of ports for testing . . . ")
import random
a = servers[0].host
p = random.choice(servers).port
print "data will be sent to port '%s:%i'" % (a, p)
k = socket.socket(AF_INET, SOCK_DGRAM)
k.connect((a, p))
k.send("1234567890")
k.send("asdfghjkl")
k.send("0987654321")
k.close()
raw_input("Press enter to close the program . . . ")

# Stop all servers:
for s in servers:
    s.stop()

# Make sure all of them terminated:
for s in servers:
    s.join()
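
If you would rather append than overwrite, the change is mostly the file mode. Here is a minimal sketch of how recvdata() could be reworked as a standalone function. The name receive_transfer and its parameters are my own for illustration, not part of the code above, and it is written to run on both Python 2.7 and Python 3:

```python
import socket

def receive_transfer(sock, first_chunk, filename, bufsize=128,
                     idle_timeout=1.0, append=True):
    """Write first_chunk and all following datagrams to filename.

    The transfer is considered finished when no datagram arrives for
    idle_timeout seconds. Returns the number of bytes written.
    """
    # "ab" keeps earlier transfers in the file, "wb" starts from scratch.
    mode = "ab" if append else "wb"
    sock.settimeout(idle_timeout)
    total = 0
    with open(filename, mode) as f:
        data = first_chunk
        while data:
            f.write(data)
            total += len(data)
            try:
                data, _addr = sock.recvfrom(bufsize)
            except socket.timeout:
                break  # sender went quiet: end of this transfer
    return total
```

Inside the Server class you would call it from run() in place of self.recvdata(data, s, addr), passing the file name you want. Note that it leaves the socket timeout set to idle_timeout, so reset it afterwards if the outer loop needs a different value.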

1 Comment

Wow! Great Job. That's how it really should be done. Thank you!
That did it. Figured it out myself.

#!/usr/bin/env python
from socket import *
import sys
import select
import multiprocessing

def worker(port):
        print "started: "+str(port)
        host="192.168.88.51"
        s = socket(AF_INET,SOCK_DGRAM)
        s.bind((host,port))
        addr = (host,port)
        buf=128
        data,addr = s.recvfrom(buf)
        filename =  str(port)+".jpg"
        print str(port)+" received File:"
        f = open(filename,'wb')

        data, addr = s.recvfrom(buf)
        try:
            while(data):
                f.write(data)
                s.settimeout(1)
                data,addr = s.recvfrom(buf)
        except timeout:
            f.close()
            s.close()
            print "File Downloaded"

for i in range(1300,1305):
    multiprocessing.Process(target=worker, args=(i,)).start()

2 Comments

While this solves your problem, it is badly done. It also isn't exactly an answer to your question, because it uses multiprocessing instead of multithreading, which may lead to unwanted complications later. Do as suggested in the comment above: subclass threading.Thread and put your server into its run() method. Then create and start as many instances of that class as you like, each with different arguments. Alternatively, use the asyncore module, which polls all of its sockets in a single loop and serves them one by one, thus simulating threads.
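
To illustrate the single-loop idea without asyncore, here is a rough sketch that uses select.select() directly. This is a swapped-in technique, not asyncore itself, and the function names (open_udp_sockets, serve_once) and the handler callback are made up for the example:

```python
import select
import socket

def open_udp_sockets(host, ports):
    """Bind one non-blocking UDP socket per port."""
    socks = []
    for port in ports:
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.bind((host, port))
        s.setblocking(False)
        socks.append(s)
    return socks

def serve_once(socks, handler, bufsize=128, wait=1.0):
    """Wait up to `wait` seconds, then read one datagram per ready socket.

    handler(port, data, addr) is called for every datagram read.
    Returns the number of sockets that had data.
    """
    readable, _, _ = select.select(socks, [], [], wait)
    for s in readable:
        data, addr = s.recvfrom(bufsize)
        handler(s.getsockname()[1], data, addr)
    return len(readable)
```

You would call serve_once() in a while loop from a single thread. The handler can keep one open file per port and close it when that port has been silent for a second.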
Oh, BTW, threading.Thread() can also be used in a similar manner to multiprocessing.Process(), but you really should do it properly by subclassing the class and implementing a stopping mechanism as well. If you close your program forcefully while a socket is open, you may have trouble binding to the same port later, especially on Windows.
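
For reference, a minimal sketch of the threading.Thread(target=...) variant with a simple stopping mechanism. The shared `received` list and the start_workers helper are assumptions made for the example (a real worker would write to a file instead), and the code should run on both Python 2.7 and Python 3:

```python
import socket
import threading

def worker(host, port, stop, received, bufsize=128):
    """Receive datagrams on (host, port) until the stop event is set."""
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.bind((host, port))
    # Short timeout so the loop can notice the stop event regularly.
    s.settimeout(0.2)
    try:
        while not stop.is_set():
            try:
                data, addr = s.recvfrom(bufsize)
            except socket.timeout:
                continue
            received.append((port, data))
    finally:
        s.close()  # always release the port, even on errors

def start_workers(host, ports, received):
    """Start one thread per port; returns (stop_event, threads)."""
    stop = threading.Event()
    threads = [threading.Thread(target=worker,
                                args=(host, p, stop, received))
               for p in ports]
    for t in threads:
        t.start()
    return stop, threads
```

The key difference from the question's loop is that the function is passed as target and not called: worker(i) runs the worker immediately in the main thread, which is why it blocked. To shut down, call stop.set() and then join() each thread.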
