I had like to get some feedback about my Thread Safe Python Client-Server example.
Is it really thead-safe?
Do you see any dead-locks or other thread problems?
# Server
import socket
import sys
import threading
import select
class ServerConnection():
def __clientThread(self, callback, connection, client_address):
try:
inputs = ( connection )
outputs = ( )
timeout = 1
while self.startClient:
readable, writable, exceptional = select.select(inputs, outputs, inputs, timeout)
if not ((readable or writable or exceptional)):
if (not self.startClient):
break
continue
if (not self.startClient):
break
for sock in readable:
data = sock.recv(16)
if (data):
callback(client_address, data)
except Exception as e:
print ('Server: ', e)
finally:
print("Exiting Client Thread")
def __listenerThread(self, callback, numOfAllowedConnections):
try:
self.sock.settimeout(0)
self.sock.setblocking(0)
self.sock.listen(numOfAllowedConnections)
print ('Server: Waiting for a connection...')
read_list = (self.sock)
timeout = 1
while self.startListener:
readable, writable, exceptional = select.select(read_list, (), (), timeout)
if not ((readable or writable or exceptional)):
if (not self.startListener):
break
continue
connection = None
connection, client_address = self.sock.accept()
connection.setblocking(0)
self.mutex.acquire()
if (self.startListener and connection):
self.connections.append(connection)
print ('nServer: client connected:', client_address)
t = threading.Thread( target=self.__clientThread, args=(callback, connection, client_address) )
self._connectionThreads.append(t)
t.start()
self.mutex.release()
except Exception as e:
print (e)
self.Close()
finally:
print("Exiting Listener Thread")
def __init__(self):
self._listenerThread = None
self.connections = ()
self._connectionThreads = ()
self.mutex = threading.Lock()
self.sock = None
self.callback = None
self.numOfAllowedConnections = None
def Open(self, ip, port, callback, numOfAllowedConnections):
try:
self.mutex.acquire()
if (self.sock):
return False
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.callback = callback
self.numOfAllowedConnections = numOfAllowedConnections
server_address = (ip, port)
self.sock.bind(server_address)
print ('Creating socket on %s port %s' % self.sock.getsockname())
self.startListener = True
self.startClient = True
self._listenerThread = threading.Thread( target=self.__listenerThread, args=(self.callback, self.numOfAllowedConnections, ) )
self._listenerThread.start()
return True
finally:
self.mutex.release()
def Close(self):
self.mutex.acquire()
try:
if (self.startListener == False):
return False
self.startListener = False
if (self._listenerThread):
try:
self._listenerThread.join(3)
except Exception as e:
pass
self.startClient = False
try:
for connection in self.connections:
connection.close()
except Exception as e:
pass
self.connections = ()
for t in self._connectionThreads:
try:
t.join(3)
except Exception as e:
pass
self._connectionThreads = ()
finally:
self.mutex.release()
return True
def SendDataAll(self, data):
try:
self.mutex.acquire()
print ('Server: sending "%s"' % data)
for connection in self.connections:
connection.sendall(data)
return True
except Exception as e:
print(e)
return False
finally:
self.mutex.release()
def SendData(self, con, data):
try:
self.mutex.acquire()
if con in self.connections:
print (con.getpeername(), 'Server: sending "%s"' % data)
con.sendall(data)
return True
except Exception as e:
print(e)
return False
finally:
self.mutex.release()
def GetNumOfConnections(self):
try:
self.mutex.acquire()
numOfCon = len(self.connections)
return numOfCon
finally:
self.mutex.release()
def GetConnection(self, index):
try:
conn = None
self.mutex.acquire()
conn = self.connections(index)
return conn
except IndexError:
return None
finally:
self.mutex.release()
def cb(client_address, data):
print (client_address, 'server received "%s"' % data)
pass
def main():
import sys
serverConnection = ServerConnection()
serverConnection.Open('localhost', 5000, cb, 3)
print("nPress Enter to continue...n")
sys.stdin.flush()
sys.stdin.read(1)
serverConnection.SendDataAll(bytes("Itay3", encoding='utf8'))
numOfCon = serverConnection.GetNumOfConnections()
print ("nConnections: ", numOfCon)
con = None
if (numOfCon > 0):
con = serverConnection.GetConnection(0)
if (con):
serverConnection.SendData(con, bytes("Itay4", encoding='utf8'))
print("nPress Enter to Exit...n")
sys.stdin.flush()
sys.stdin.read(1)
serverConnection.Close()
pass
if __name__ == "__main__":
main()
# Client
import socket
import sys
import threading
import select
class ClientConnection():
def __init__(self, ip, port, callback):
self.mutex = threading.Lock()
self.sock = None
self.ip = ip
self.port = port
self.callback = callback
self.clientThread = None
self.connect = False
def __clientThread(self):
try:
inputs = ( self.sock )
outputs = ( )
timeout = 1
while self.connect:
readable, writable, exceptional = select.select(inputs, outputs, inputs, timeout)
if not ((readable or writable or exceptional)):
if (not self.connect):
break
continue
if (not self.connect):
break
for sock in readable:
data = sock.recv(16)
if (data):
self.callback(self.sock.getsockname(), data)
except Exception as e:
print (self.sock.getsockname()(1), e)
finally:
print(self.sock.getsockname()(1), "Exiting Client Thread")
def Connect(self):
try:
self.mutex.acquire()
if (self.connect):
return False
self.connect = True
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_address = (self.ip, self.port)
print ('Client: connecting to %s port %s' % server_address)
self.sock.connect(server_address)
#self.sock.setblocking(0)
t = threading.Thread( target=self.__clientThread)
self.clientThread = t
self.clientThread.start()
return True
except Exception as e:
print (e)
return False
finally:
self.mutex.release()
def Disconnect(self):
try:
self.mutex.acquire()
if (not self.connect):
return False
self.connect = False
if (self.clientThread):
try:
self.clientThread.join(3)
except Exception as e:
pass
if (self.sock):
self.sock.close()
return True
finally:
self.mutex.release()
def SendData(self, data):
try:
self.mutex.acquire()
if (not self.connect):
return False
if (self.sock):
print (self.sock.getsockname(), 'Client: sending "%s"' % data)
self.sock.sendall(data)
return True
except Exception as e:
print(e)
return False
finally:
self.mutex.release()
def cb1(address, data):
print (address, 'Client1: received "%s"' % data)
pass
def cb2(address, data):
print (address, 'Client2: received "%s"' % data)
pass
def main():
client1 = ClientConnection('localhost', 5000, cb1)
if (client1.Connect()):
client1.SendData(bytes("Itay1", encoding='utf8'))
if (client1.Connect()):
client1.SendData(bytes("Itay1", encoding='utf8'))
client2 = ClientConnection('localhost', 5000, cb2)
if (client2.Connect()):
client2.SendData(bytes("Itay2", encoding='utf8'))
print ("nPress Enter key to exit...n")
sys.stdin.flush()
sys.stdin.read(1)
client1.Disconnect()
client2.Disconnect()
pass
if __name__ == "__main__":
main()