Question
Your task is to implement the ReliableMessageSender and ReliableMessageReceiver classes in reliable_transport.py. ReliableMessageSender: This class reliably delivers a message to a receiver. You have to
Your task is to implement the ReliableMessageSender and ReliableMessageReceiver classes in reliable_transport.py.
ReliableMessageSender:
This class reliably delivers a message to a receiver. You have to implement the send_message and on_packet_received methods. You can use self.send(packet) to send a packet to the receiver. I am also attaching the reliable socket.py file below for help. You only need to edit the reliable transport.py. CODE BOTHTHESE METHODS IN PYTHON. IMPLEMENT THE WINDOW SLIDING MECHANISM. Divide data into chunks. Use the methods provided in util.py. Reliable socket.py has already been coded.
Method descriptions:
__init__(self, ..., window_size: int) This is the constructor of the class where you can define any class attributes. window_size is the size of your message transport window (the number of in-flight packets during message transmission). Ignore other arguments; they are passed to the parent class. You should immediately return from this function and not block.
send_message(self, message: str) This method reliably sends the passed message to the receiver. This method does not need to spawn a new thread and return immediately; it can block indefinitely until the message is completely received by the receiver. You can send a packet to the receiver by calling self.send(...).
on_packet_received(self, packet: str) This method is invoked whenever a packet is received from the receiver. Ideally, only ACK packets should be received here. You would have to use a way to communicate these packets to the send_message method. One way is to use a queue: you can enqueue packets to it in this method, and dequeue them in send_message. You can also use the timeout argument of a queues dequeue method to implement timeouts in this assignment. You should immediately return from this method and not block.
ReliableMessageReceiver:
This class reliably receives a message from a sender. You have to implement the on_packet_received method. You can use self.send(packet) to send a packet back to the sender, and will have to call self.on_message_completed(message) when the complete message is received. Method descriptions:
__init__(self, ...) This is the constructor of the class where you can define any class attributes to maintain state. You should immediately return from this function and not block.
on_packet_received(self, packet: str) This method is invoked whenever a packet is received from the sender. You have to inspect the packet and determine what to do. You should immediately return from this method and not block. You can either ignore the packet, or send a corresponding ACK packet back to the sender by calling self.send(packet). If you determine that the sender has completely sent the message, call self.on_message_completed(message) with the completed message as its argument. Details on the role of a reliable receiver are provided in the next section.
Implementation Instructions
3.1 Packets
Youll need four types of packets in this assignment: "start", "end", "data", and "ack". All packets will also have a sequence number and a checksum. If the packet is a data packet, it will also contain the message content.
The packet format you have to follow is: "|||" An example start packet would be: "start|4212||1947410104"
For the message "request_users_list", with a chunk size of 10 bytes, the data packets would be: "data|4213|request_us|2826007388" "data|4214|ers_list|993936753"
You can use util.make_packet(...) to make a packet. It accepts packet_type, seq_num, and msg_content as arguments and returns a packet in the correct format. The returned packet also contains the checksum. For example, to make your start packet you can use the helper function as shown: start_packet = util.make_packet("start", start_seq_num) To validate the checksum, you can pass your packet to util.validate_checksum(...), which returns true/false accordingly. You can also use util.parse_packet(...) to parse a packet into its individual components (packet type, sequence number, data and checksum).
Implementing Reliable Transport
A message transmission will start from a start packet, and end with an end packet. You will be sending packets using a sliding window mechanism. For every received packet, the receiver will send a cumulative ACK with the sequence number it expects to receive next.
Senders logic: 1. Break down the message into util.CHUNK_SIZE sized chunks. 2. Choose a random sequence number to start the communication from. 3. Reliably send a start packet. (i.e. wait for its ACK and resend the packet if the ACK is not received within util.TIME_OUT seconds.) 4. Send out a window3 of data packets and wait for ACKs to slide the window appropriately. 5. How to slide the window? Suppose that the current window starts at sequence number j. If you receive an ACK of sequence number k, such that k > j, send the subsequent k j number of chunks. Note that the window now starts from sequence number j + (k j). However, please note that this is just one way to implement your sliding window. 6. If you receive no ACKs for util.TIME_OUT seconds, resend all the packets in the current window4 . 7. Once all the chunks have been reliably sent, reliably send an end packet.
Starter code:
UTIL.PY:
'''
This file contains basic utility functions that you can use.
'''
import binascii
MAX_NUM_CLIENTS = 10
TIME_OUT = 0.5 # 500ms
NUM_OF_RETRANSMISSIONS = 3
CHUNK_SIZE = 1400 # 1400 Bytes
def validate_checksum(message):
'''
Validates Checksum of a message and returns true/false
'''
try:
msg, checksum = message.rsplit('|', 1)
msg += '|'
return generate_checksum(msg.encode()) == checksum
except BaseException:
return False
def generate_checksum(message):
'''
Returns Checksum of the given message
'''
return str(binascii.crc32(message) & 0xffffffff)
def make_packet(pck_type="data", seqno=0, msg=""):
'''
This will add the packet header to your message.
The formats is `
pck_type can be data, ack, end, start
seqno is a packet sequence number (integer)
msg is the actual message string
'''
body = "%s|%d|%s|" % (pck_type, seqno, msg)
checksum = generate_checksum(body.encode())
packet = "%s%s" % (body, checksum)
return packet
def parse_packet(packet):
'''
This function will parse the packet in the same way it was made in the above function.
'''
pieces = packet.split('|')
pck_type, seqno = pieces[0:2]
checksum = pieces[-1]
data = '|'.join(pieces[2:-1])
return pck_type, seqno, data, checksum
def make_message(msg_type, msg_format, message=None):
'''
This function can be used to format your message according
to any one of the formats described in the documentation.
msg_type defines type like join, disconnect etc.
msg_format is either 1,2,3 or 4
msg is remaining.
'''
if msg_format == 2:
return "%s" % (msg_type)
if msg_format in [1, 3, 4]:
return "%s %s" % (msg_type, message)
return ""
RELIABLE SOCKET.PY:
import socket
from typing import Dict, Tuple
from queue import Queue
from threading import Thread
from random import randint
from reliable_transport import ReliableMessageSender, ReliableMessageReceiver
Address = Tuple[str, int]
MsgID = int
class ReliableSocket:
"""
This is a socket that reliably transports messages.
Description:
This socket uses a UDP socket and builds reliability on top of it. For
each message transport, this class maintains a ReliableMessageSender and
a ReliableMessageReceiver that implement the reliable transport logic.
ReliableMessageSender and ReliableMessageReceiver are implemented in
reliable_transmission.py
APIs:
ReliableSocket.send(receiver_addr, message)
Sends a message to an address
ReliableSocket.recvfrom()
Receives a message sent to the socket
"""
def __init__(self, dest, port, window_size, bufsize=4096):
self.__dest = dest
self.__port = port
self.__window_size = window_size
self.__bufsize = bufsize
self.__sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.__sock.settimeout(None)
self.__sock.bind((self.__dest, self.__port))
self.__senders: Dict[Tuple[Address, MsgID], ReliableMessageSender] = {}
self.__receivers: Dict[Tuple[Address, MsgID], ReliableMessageReceiver] = {}
self.__received_messages = Queue()
# start the thread for receiving packets
Thread(target=self.__receive_handler, args=(), daemon=True).start()
def recvfrom(self,
block: int = True,
timeout: int = None) -> Tuple[str, Address]:
"""
Returns a reliably received message on the socket.
Args:
block (bool, optional):
Notifies if this function should block to wait for a value.
Defaults to True.
timeout (int, optional):
The amount of time in seconds the function waits for a value.
Defaults to None.
Returns:
Tuple[str, Address]:
A tuple of
(i) the received message, and
(ii) the address from where the message is received from
Note:
If optional args block is true and timeout is None (the default), it
blocks if necessary until a message is received. If timeout is a positive
number, it blocks at most timeout seconds and raises the queue.Empty
exception if no message is received within that time. Otherwise (block is
false), return a received message if one is immediately available, else
raise the queue.Empty exception (timeout is ignored in that case).
"""
return self.__received_messages.get(block=block, timeout=timeout)
def sendto(self, receiver_addr: Address, message: str):
"""
Send message to an address reliably.
Args:
receiver_addr (Address): Address of destination
message (str): Message to send to the destination
Note:
This function call is syncronous. It blocks until the message is
reliably transported to the destination (which can take arbitrary
time).
"""
self.__send_message_reliably(receiver_addr, message)
@staticmethod
def __is_from_a_receiver(sender_type: str) -> bool:
return sender_type == "r"
@staticmethod
def __parse_raw_packet(raw_packet: str) -> Tuple[str, int, str]:
raw_packet = raw_packet.split(':')
return raw_packet[0], int(raw_packet[1]), ':'.join(raw_packet[2:])
def __receive_handler(self):
"""
Receives packets on the socket and redirects them to the their particular
reliable message sender/receivers.
"""
while True:
# recieve a packet for a message from a client
byte_packet, addr = self.__sock.recvfrom(self.__bufsize)
raw_packet = byte_packet.decode("utf-8")
sender_type, msg_id, packet = self.__parse_raw_packet(raw_packet)
if self.__is_from_a_receiver(sender_type):
# this belongs to a sender
self.__send_to_a_sender(addr, msg_id, packet)
else:
# this belongs to a receiver
self.__send_to_a_receiver(addr, msg_id, packet)
def __send_to_a_sender(self, addr, msg_id: int, ack_packet: str):
"""
Redirects received ack packet to the corresponding message sender
"""
if (addr, msg_id) in self.__senders:
sender: ReliableMessageSender = self.__senders[(addr, msg_id)]
sender.on_packet_received(ack_packet)
else:
print("Warning: no sender identified for", (addr, msg_id))
def __send_to_a_receiver(self, addr, msg_id: int, packet: str):
"""
Redirects received packet to the corresponding message receiver.
If no such receiver is found, a new message receiver is initialized.
"""
if not (addr, msg_id) in self.__receivers:
# this is a new transmission, set up new receiver
self.__setup_new_receiver(addr, msg_id)
receiver: ReliableMessageReceiver = self.__receivers[(addr, msg_id)]
receiver.on_packet_received(packet)
def __setup_new_receiver(self, new_addr, msg_id: int):
"""
Initializes new message receiver.
Initialization involves making a queue in which the message receiver can enqueue the message
once it is completely (and reliably) received. ReliableSocket needs to start a thread that
dequeues this queue to get the completed message.
"""
completed_msg_q: Queue = Queue()
self.__receivers[(new_addr, msg_id)] = ReliableMessageReceiver(
self.__sock, new_addr, msg_id, completed_msg_q)
Thread(target=self.__process_completed_message,
args=(new_addr, completed_msg_q),
daemon=True).start()
def __process_completed_message(self, sender_addr, completed_msg_q: Queue):
"""
Waits for a completed message to be received by a reliable message receiver
and then processes it
"""
# blocks here for message
received_msg = completed_msg_q.get()
# act on completed message
self.__received_messages.put((received_msg, sender_addr))
# # delete this message receiver (as its of no use now, message has been completely received)
# del self.__receivers[(sender_addr, msg_id)]
def __get_unique_msg_id(self, recvr_addr):
msg_id = randint(50000, 99999)
while (recvr_addr, msg_id) in self.__senders:
msg_id = randint(50000, 99999)
return msg_id
def __send_message_reliably(self, recvr_addr, message):
"""
Sends a message reliably.
- Initializes a reliable message sender instance that can reliably send this message.
- Stores this in a dictionary so that we can send acks to it from our receive_handler.
- Notiifies the reliable message sender to start sending.
- Deletes the reliable message sender instance as the message has been completey sent.
"""
msg_id = self.__get_unique_msg_id(recvr_addr)
sender = ReliableMessageSender(self.__sock, recvr_addr, msg_id,
self.__window_size)
self.__senders[(recvr_addr, msg_id)] = sender
sender.send_message(message)
# del self.__senders[(recvr_addr, msg_id)]
RELAIABLE TRANSPORT.PY:
from queue import Queue
from typing import Tuple
from socket import socket
Address = Tuple[str, int]
class MessageSender:
'''
DO NOT EDIT ANYTHING IN THIS CLASS
'''
def __init__(self, sock: socket, receiver_addr: Address, msg_id: int):
self.__sock: socket = sock
self.__receiver_addr = receiver_addr
self.__msg_id = msg_id
def send(self, packet: str):
self.__sock.sendto(
(f"s:{str(self.__msg_id)}:{packet}").encode("utf-8"),
self.__receiver_addr)
class ReliableMessageSender(MessageSender):
'''
This class reliably delivers a message to a receiver.
You have to implement the send_message and on_packet_received methods.
You can use self.send(packet) to send a packet to the receiver.
You can add as many helper functions as you want.
'''
def __init__(self, sock: socket, receiver_addr: Address, msg_id: int,
window_size: int):
MessageSender.__init__(self, sock, receiver_addr, msg_id)
'''
This is the constructor of the class where you can define any class attributes.
window_size is the size of your message transport window (the number of in-flight packets during message transmission).
Ignore other arguments; they are passed to the parent class.
You should immediately return from this function and not block.
'''
def on_packet_received(self, packet: str):
'''
TO BE IMPLEMENTED BY STUDENTS
This method is invoked whenever a packet is received from the receiver.
Ideally, only ACK packets should be received here.
You would have to use a way to communicate these packets to the send_message method.
One way is to use a queue: you can enqueue packets to it in this method, and dequeue them in send_message.
You can also use the timeout argument of a queues dequeue method to implement timeouts in this assignment.
You should immediately return from this method and not block.
'''
raise NotImplementedError
def send_message(self, message: str):
''''
TO BE IMPLEMENTED BY STUDENTS
This method reliably sends the passed message to the receiver.
This method does not need to spawn a new thread and return immediately; it can block indefinitely until the message is completely received by the receiver.
You can send a packet to the receiver by calling self.send(...).
Sender's logic:
1) Break down the message into util.CHUNK_SIZE sized chunks.
2) Choose a random sequence number to start the communication from.
3) Reliably send a start packet. (i.e. wait for its ACK and resend the packet if the ACK is not received within util.TIME_OUT seconds.)
4) Send out a window of data packets and wait for ACKs to slide the window appropriately.
5) How to slide the window? Suppose that the current window starts at sequence number j. If you receive an ACK of sequence number k, such that k > j, send the subsequent k j number of chunks. Note that the window now starts from sequence number j + (k j).
6) If you receive no ACKs for util.TIME_OUT seconds, resend all the packets in the current window.
7) Once all the chunks have been reliably sent, reliably send an end packet.
'''
raise NotImplementedError
class MessageReceiver:
'''
DO NOT EDIT ANYTHING IN THIS CLASS
'''
def __init__(self, sock: socket, sender_addr: Address, msg_id: int,
completed_message_q: Queue):
self.__sock: socket = sock
self.__sender_addr = sender_addr
self.__msg_id = msg_id
self.__completed_message_q = completed_message_q
def send(self, packet: str):
self.__sock.sendto(
(f"r:{str(self.__msg_id)}:{packet}").encode("utf-8"),
self.__sender_addr)
def on_message_completed(self, message: str):
self.__completed_message_q.put(message)
class ReliableMessageReceiver(MessageReceiver):
'''
This class reliably receives a message from a sender.
You have to implement the on_packet_received method.
You can use self.send(packet) to send a packet back to the sender, and will have to call self.on_message_completed(message) when the complete message is received.
You can add as many helper functions as you want.
'''
def __init__(self, sock: socket, sender_addr: Address, msg_id: int,
completed_message_q: Queue):
MessageReceiver.__init__(self, sock, sender_addr, msg_id,
completed_message_q)
'''
This is the constructor of the class where you can define any class attributes to maintain state.
You should immediately return from this function and not block.
'''
def on_packet_received(self, packet: str):
'''
TO BE IMPLEMENTED BY STUDENTS
This method is invoked whenever a packet is received from the sender.
You have to inspect the packet and determine what to do.
You should immediately return from this method and not block.
You can either ignore the packet, or send a corresponding ACK packet back to the sender by calling self.send(packet).
If you determine that the sender has completely sent the message, call self.on_message_completed(message) with the completed message as its argument.
Receivers logic:
1) When you receive a packet, validate its checksum and ignore it if it is corrupted.
2) Inspect the packet_type and sequence number.
3) If the packet type is "start", prepare to store incoming chunks of data in some data structure and send an ACK back to the sender with the received packets sequence number + 1.
4) If the packet type is "data", store it in an appropriate data type (if it is not a duplicate packet you already have stored), and send a corresponding cumulative ACK. (ACK with the sequence number for which all previous packets have been received).
5) If the packet type is "end", assemble all the stored chunks into a message, call self.on_message_received(message) with the completed message, and send an ACK with the received packets sequence number + 1.
'''
raise NotImplementedError
Step by Step Solution
There are 3 Steps involved in it
Step: 1
Get Instant Access to Expert-Tailored Solutions
See step-by-step solutions with expert insights and AI powered tools for academic success
Step: 2
Step: 3
Ace Your Homework with AI
Get the answers you need in no time with our AI-driven, step-by-step assistance
Get Started