
Question


Your task is to implement the ReliableMessageSender and ReliableMessageReceiver classes in reliable_transport.py.

ReliableMessageSender:

This class reliably delivers a message to a receiver. You have to implement the send_message and on_packet_received methods. You can use self.send(packet) to send a packet to the receiver. I am also attaching the reliable_socket.py file below for help. You only need to edit reliable_transport.py. Code both of these methods in Python.

Method descriptions:

__init__(self, ..., window_size: int) This is the constructor of the class where you can define any class attributes. window_size is the size of your message transport window (the number of in-flight packets during message transmission). Ignore other arguments; they are passed to the parent class. You should immediately return from this function and not block.

send_message(self, message: str) This method reliably sends the passed message to the receiver. This method does not need to spawn a new thread and return immediately; it can block indefinitely until the message is completely received by the receiver. You can send a packet to the receiver by calling self.send(...).

on_packet_received(self, packet: str) This method is invoked whenever a packet is received from the receiver. Ideally, only ACK packets should be received here. You need some way to communicate these packets to the send_message method. One way is to use a queue: you can enqueue packets to it in this method, and dequeue them in send_message. You can also use the timeout argument of a queue's dequeue method to implement timeouts in this assignment. You should immediately return from this method and not block.
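The queue hand-off described above could look roughly like the sketch below. It is only an illustration: AckMailbox and wait_for_ack are hypothetical names that are not part of the starter code, TIME_OUT is copied from util.TIME_OUT, and the ACK string uses a placeholder checksum of 0.

```python
import queue
import threading

TIME_OUT = 0.5  # assumption: mirrors util.TIME_OUT from the starter code


class AckMailbox:
    """Hypothetical helper: passes ACK packets from the receive thread to send_message."""

    def __init__(self):
        self._acks = queue.Queue()

    def on_packet_received(self, packet: str) -> None:
        # Runs on the socket's receive thread: enqueue and return immediately.
        self._acks.put(packet)

    def wait_for_ack(self, timeout: float = TIME_OUT):
        # Runs inside send_message: block for at most `timeout` seconds.
        try:
            return self._acks.get(timeout=timeout)
        except queue.Empty:
            return None  # timed out, so the caller should retransmit


mailbox = AckMailbox()
# Simulate an ACK arriving 0.1 s later on another thread (placeholder checksum).
threading.Timer(0.1, mailbox.on_packet_received, args=("ack|4213||0",)).start()
first = mailbox.wait_for_ack()              # receives the simulated ACK
second = mailbox.wait_for_ack(timeout=0.1)  # nothing else arrives, so None
```

A "reliable send" of the start or end packet can then be a loop that sends the packet, calls wait_for_ack, and repeats while the result is None.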

ReliableMessageReceiver:

This class reliably receives a message from a sender. You have to implement the on_packet_received method. You can use self.send(packet) to send a packet back to the sender, and will have to call self.on_message_completed(message) when the complete message is received.

Method descriptions:

__init__(self, ...) This is the constructor of the class where you can define any class attributes to maintain state. You should immediately return from this function and not block.

on_packet_received(self, packet: str) This method is invoked whenever a packet is received from the sender. You have to inspect the packet and determine what to do. You should immediately return from this method and not block. You can either ignore the packet, or send a corresponding ACK packet back to the sender by calling self.send(packet). If you determine that the sender has completely sent the message, call self.on_message_completed(message) with the completed message as its argument. Details on the role of a reliable receiver are provided in the next section.

Implementation Instructions

3.1 Packets

You'll need four types of packets in this assignment: "start", "end", "data", and "ack". All packets will also have a sequence number and a checksum. If the packet is a data packet, it will also contain the message content.

The packet format you have to follow is: "<packet_type>|<sequence_number>|<data>|<checksum>". An example start packet would be: "start|4212||1947410104"

For the message "request_users_list", with a chunk size of 10 bytes, the data packets would be: "data|4213|request_us|2826007388" "data|4214|ers_list|993936753"
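As a small illustration of the chunking in this example, the sketch below splits a string into fixed-size pieces. The helper name split_into_chunks is hypothetical, and slicing counts characters rather than bytes, which is the same thing only for ASCII messages like this one.

```python
def split_into_chunks(message: str, chunk_size: int) -> list:
    """Split message into consecutive pieces of at most chunk_size characters."""
    return [message[i:i + chunk_size] for i in range(0, len(message), chunk_size)]


chunks = split_into_chunks("request_users_list", 10)
print(chunks)  # ['request_us', 'ers_list']
```

In the assignment itself the chunk size would be util.CHUNK_SIZE instead of 10.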

You can use util.make_packet(...) to make a packet. It accepts packet_type, seq_num, and msg_content as arguments and returns a packet in the correct format. The returned packet also contains the checksum. For example, to make your start packet you can use the helper function as shown: start_packet = util.make_packet("start", start_seq_num) To validate the checksum, you can pass your packet to util.validate_checksum(...), which returns true/false accordingly. You can also use util.parse_packet(...) to parse a packet into its individual components (packet type, sequence number, data and checksum).
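To show how these helpers fit together, here is a self-contained sketch. It uses condensed stand-ins for the util.py functions (written here only so the example runs on its own; in the assignment you would import util instead). One detail worth noticing is that parse_packet returns the sequence number as a string, so it has to be converted with int() before doing arithmetic on it.

```python
import binascii


def generate_checksum(body: bytes) -> str:
    return str(binascii.crc32(body) & 0xffffffff)


def make_packet(pck_type="data", seqno=0, msg=""):
    body = "%s|%d|%s|" % (pck_type, seqno, msg)
    return body + generate_checksum(body.encode())


def validate_checksum(packet: str) -> bool:
    # Everything up to and including the last '|' is covered by the checksum.
    body, _, received = packet.rpartition("|")
    return generate_checksum((body + "|").encode()) == received


def parse_packet(packet: str):
    pieces = packet.split("|")
    return pieces[0], pieces[1], "|".join(pieces[2:-1]), pieces[-1]


packet = make_packet("data", 4213, "request_us")
pck_type, seqno, data, checksum = parse_packet(packet)
next_expected = int(seqno) + 1  # seqno is a str, so convert before adding

# Flip one character to simulate corruption in transit.
corrupted = packet.replace("request_us", "request_uz")
is_valid = validate_checksum(packet)
is_corrupted_valid = validate_checksum(corrupted)
```

Here is_valid is True and is_corrupted_valid is False, which is the check the receiver performs before looking at anything else in the packet.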

Implementing Reliable Transport

A message transmission will start from a start packet, and end with an end packet. You will be sending packets using a sliding window mechanism. For every received packet, the receiver will send a cumulative ACK with the sequence number it expects to receive next.

Sender's logic:
1. Break down the message into util.CHUNK_SIZE sized chunks.
2. Choose a random sequence number to start the communication from.
3. Reliably send a start packet (i.e. wait for its ACK and resend the packet if the ACK is not received within util.TIME_OUT seconds).
4. Send out a window of data packets and wait for ACKs to slide the window appropriately.
5. How to slide the window? Suppose that the current window starts at sequence number j. If you receive an ACK of sequence number k, such that k > j, send the subsequent k - j chunks. Note that the window now starts from sequence number j + (k - j) = k. This is just one way to implement your sliding window.
6. If you receive no ACKs for util.TIME_OUT seconds, resend all the packets in the current window.
7. Once all the chunks have been reliably sent, reliably send an end packet.
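The window arithmetic in steps 4 to 6 can be sketched as pure bookkeeping with no network I/O. SlidingWindow and its method names are hypothetical, and the sketch assumes data packets are numbered consecutively starting right after the start packet's sequence number, as in the packet example earlier (start 4212, data 4213, 4214, and so on).

```python
class SlidingWindow:
    """Hypothetical sender-side bookkeeping; it only tracks sequence numbers."""

    def __init__(self, first_seq: int, num_chunks: int, window_size: int):
        self.base = first_seq              # j: oldest unacknowledged sequence number
        self.end = first_seq + num_chunks  # one past the last data packet
        self.window_size = window_size

    def _limit(self) -> int:
        return min(self.base + self.window_size, self.end)

    def in_flight(self) -> range:
        # Sequence numbers in the current window (resend all of these on timeout).
        return range(self.base, self._limit())

    def on_ack(self, k: int) -> range:
        """Slide the window to k; return the sequence numbers that newly enter it."""
        if k <= self.base or k > self.end:
            return range(0)  # duplicate, stale, or out-of-range ACK: nothing to do
        old_limit = self._limit()
        self.base = k
        return range(old_limit, self._limit())

    def done(self) -> bool:
        return self.base >= self.end


window = SlidingWindow(first_seq=4213, num_chunks=5, window_size=3)
initial = list(window.in_flight())            # [4213, 4214, 4215]
newly_sent = list(window.on_ack(4215))        # k - j = 2 new packets: [4216, 4217]
duplicate = list(window.on_ack(4215))         # repeated ACK: []
resend_on_timeout = list(window.in_flight())  # [4215, 4216, 4217]
final = list(window.on_ack(4218))             # last ACK: nothing left to send
finished = window.done()
```

Near the end of the message fewer than k - j chunks may remain, which is why the window limit is clamped to the last data packet.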

Receiver's logic:
1. When you receive a packet, validate its checksum and ignore it if it is corrupted.
2. Inspect the packet_type and sequence number.
3. If the packet type is "start", prepare to store incoming chunks of data in some data structure and send an ACK back to the sender with the received packet's sequence number + 1.
4. If the packet type is "data", store it in an appropriate data type (if it is not a duplicate packet you already have stored), and send a corresponding cumulative ACK (an ACK with the sequence number for which all previous packets have been received).
5. If the packet type is "end", assemble all the stored chunks into a message, call self.on_message_completed(message) with the completed message, and send an ACK with the received packet's sequence number + 1.
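One possible shape for the receiver's state, sketched without sockets so it can be run directly. ReceiverState and handle are hypothetical names. The sketch assumes the packet has already passed checksum validation and been parsed, with the sequence number converted to int. It returns the sequence number to ACK plus the completed message (only once, when the end packet first arrives in order), leaving the actual self.send(...) and self.on_message_completed(...) calls to the caller.

```python
class ReceiverState:
    """Hypothetical receiver bookkeeping for one message transfer."""

    def __init__(self):
        self.chunks = {}        # seqno -> data for stored data packets
        self.expected = None    # next in-order sequence number we are waiting for
        self.delivered = False  # True once the message has been handed over

    def handle(self, pck_type: str, seqno: int, data: str = ""):
        """Return (ack_seqno or None, completed_message or None)."""
        if pck_type == "start":
            if self.expected is None:
                self.expected = seqno + 1
            return seqno + 1, None
        if self.expected is None:
            return None, None  # ignore anything that arrives before a start packet
        if pck_type == "data":
            if seqno >= self.expected:
                self.chunks.setdefault(seqno, data)  # duplicates are not overwritten
            while self.expected in self.chunks:      # advance past in-order chunks
                self.expected += 1
            return self.expected, None               # cumulative ACK
        if pck_type == "end":
            if seqno != self.expected:
                return self.expected, None           # some data is still missing
            if self.delivered:
                return seqno + 1, None               # duplicate end: re-ACK only
            self.delivered = True
            message = "".join(self.chunks[s] for s in sorted(self.chunks))
            return seqno + 1, message
        return None, None


receiver = ReceiverState()
events = [
    ("start", 4212, ""),
    ("data", 4214, "ers_list"),    # arrives out of order
    ("data", 4213, "request_us"),
    ("data", 4213, "request_us"),  # duplicate
    ("end", 4215, ""),
    ("end", 4215, ""),             # duplicate end after a lost ACK
]
results = [receiver.handle(*event) for event in events]
```

In this run the out-of-order chunk is stored but the ACK stays at 4213 until chunk 4213 arrives, at which point the cumulative ACK jumps to 4215.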

Starter code:

UTIL.PY:

'''
This file contains basic utility functions that you can use.
'''
import binascii

MAX_NUM_CLIENTS = 10
TIME_OUT = 0.5  # 500ms
NUM_OF_RETRANSMISSIONS = 3
CHUNK_SIZE = 1400  # 1400 Bytes


def validate_checksum(message):
    '''
    Validates Checksum of a message and returns true/false
    '''
    try:
        msg, checksum = message.rsplit('|', 1)
        msg += '|'
        return generate_checksum(msg.encode()) == checksum
    except BaseException:
        return False


def generate_checksum(message):
    '''
    Returns Checksum of the given message
    '''
    return str(binascii.crc32(message) & 0xffffffff)


def make_packet(pck_type="data", seqno=0, msg=""):
    '''
    This will add the packet header to your message.
    The format is `<pck_type>|<seqno>|<msg>|<checksum>`
    pck_type can be data, ack, end, start
    seqno is a packet sequence number (integer)
    msg is the actual message string
    '''
    body = "%s|%d|%s|" % (pck_type, seqno, msg)
    checksum = generate_checksum(body.encode())
    packet = "%s%s" % (body, checksum)
    return packet


def parse_packet(packet):
    '''
    This function will parse the packet in the same way it was made in the above function.
    Note that seqno is returned as a string.
    '''
    pieces = packet.split('|')
    pck_type, seqno = pieces[0:2]
    checksum = pieces[-1]
    data = '|'.join(pieces[2:-1])
    return pck_type, seqno, data, checksum


def make_message(msg_type, msg_format, message=None):
    '''
    This function can be used to format your message according
    to any one of the formats described in the documentation.
    msg_type defines type like join, disconnect etc.
    msg_format is either 1,2,3 or 4
    message is the remaining content.
    '''
    if msg_format == 2:
        return "%s" % (msg_type)
    if msg_format in [1, 3, 4]:
        return "%s %s" % (msg_type, message)
    return ""

RELIABLE_SOCKET.PY:

import socket
from typing import Dict, Tuple
from queue import Queue
from threading import Thread
from random import randint

from reliable_transport import ReliableMessageSender, ReliableMessageReceiver

Address = Tuple[str, int]
MsgID = int


class ReliableSocket:
    """
    This is a socket that reliably transports messages.

    Description:
        This socket uses a UDP socket and builds reliability on top of it. For
        each message transport, this class maintains a ReliableMessageSender and
        a ReliableMessageReceiver that implement the reliable transport logic.
        ReliableMessageSender and ReliableMessageReceiver are implemented in
        reliable_transport.py

    APIs:
        ReliableSocket.sendto(receiver_addr, message)
            Sends a message to an address
        ReliableSocket.recvfrom()
            Receives a message sent to the socket
    """

    def __init__(self, dest, port, window_size, bufsize=4096):
        self.__dest = dest
        self.__port = port
        self.__window_size = window_size
        self.__bufsize = bufsize
        self.__sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.__sock.settimeout(None)
        self.__sock.bind((self.__dest, self.__port))
        self.__senders: Dict[Tuple[Address, MsgID], ReliableMessageSender] = {}
        self.__receivers: Dict[Tuple[Address, MsgID], ReliableMessageReceiver] = {}
        self.__received_messages = Queue()
        # start the thread for receiving packets
        Thread(target=self.__receive_handler, args=(), daemon=True).start()

    def recvfrom(self,
                 block: bool = True,
                 timeout: int = None) -> Tuple[str, Address]:
        """
        Returns a reliably received message on the socket.

        Args:
            block (bool, optional):
                Notifies if this function should block to wait for a value.
                Defaults to True.
            timeout (int, optional):
                The amount of time in seconds the function waits for a value.
                Defaults to None.

        Returns:
            Tuple[str, Address]:
                A tuple of
                (i) the received message, and
                (ii) the address from which the message was received

        Note:
            If optional args block is true and timeout is None (the default), it
            blocks if necessary until a message is received. If timeout is a positive
            number, it blocks at most timeout seconds and raises the queue.Empty
            exception if no message is received within that time. Otherwise (block is
            false), return a received message if one is immediately available, else
            raise the queue.Empty exception (timeout is ignored in that case).
        """
        return self.__received_messages.get(block=block, timeout=timeout)

    def sendto(self, receiver_addr: Address, message: str):
        """
        Send message to an address reliably.

        Args:
            receiver_addr (Address): Address of destination
            message (str): Message to send to the destination

        Note:
            This function call is synchronous. It blocks until the message is
            reliably transported to the destination (which can take arbitrary
            time).
        """
        self.__send_message_reliably(receiver_addr, message)

    @staticmethod
    def __is_from_a_receiver(sender_type: str) -> bool:
        return sender_type == "r"

    @staticmethod
    def __parse_raw_packet(raw_packet: str) -> Tuple[str, int, str]:
        raw_packet = raw_packet.split(':')
        return raw_packet[0], int(raw_packet[1]), ':'.join(raw_packet[2:])

    def __receive_handler(self):
        """
        Receives packets on the socket and redirects them to their particular
        reliable message sender/receivers.
        """
        while True:
            # receive a packet for a message from a client
            byte_packet, addr = self.__sock.recvfrom(self.__bufsize)
            raw_packet = byte_packet.decode("utf-8")
            sender_type, msg_id, packet = self.__parse_raw_packet(raw_packet)
            if self.__is_from_a_receiver(sender_type):
                # this belongs to a sender
                self.__send_to_a_sender(addr, msg_id, packet)
            else:
                # this belongs to a receiver
                self.__send_to_a_receiver(addr, msg_id, packet)

    def __send_to_a_sender(self, addr, msg_id: int, ack_packet: str):
        """
        Redirects received ack packet to the corresponding message sender
        """
        if (addr, msg_id) in self.__senders:
            sender: ReliableMessageSender = self.__senders[(addr, msg_id)]
            sender.on_packet_received(ack_packet)
        else:
            print("Warning: no sender identified for", (addr, msg_id))

    def __send_to_a_receiver(self, addr, msg_id: int, packet: str):
        """
        Redirects received packet to the corresponding message receiver.
        If no such receiver is found, a new message receiver is initialized.
        """
        if (addr, msg_id) not in self.__receivers:
            # this is a new transmission, set up new receiver
            self.__setup_new_receiver(addr, msg_id)
        receiver: ReliableMessageReceiver = self.__receivers[(addr, msg_id)]
        receiver.on_packet_received(packet)

    def __setup_new_receiver(self, new_addr, msg_id: int):
        """
        Initializes new message receiver.
        Initialization involves making a queue in which the message receiver can enqueue the message
        once it is completely (and reliably) received. ReliableSocket needs to start a thread that
        dequeues this queue to get the completed message.
        """
        completed_msg_q: Queue = Queue()
        self.__receivers[(new_addr, msg_id)] = ReliableMessageReceiver(
            self.__sock, new_addr, msg_id, completed_msg_q)
        Thread(target=self.__process_completed_message,
               args=(new_addr, completed_msg_q),
               daemon=True).start()

    def __process_completed_message(self, sender_addr, completed_msg_q: Queue):
        """
        Waits for a completed message to be received by a reliable message receiver
        and then processes it
        """
        # blocks here for message
        received_msg = completed_msg_q.get()
        # act on completed message
        self.__received_messages.put((received_msg, sender_addr))
        # # delete this message receiver (as it's of no use now, message has been completely received)
        # del self.__receivers[(sender_addr, msg_id)]

    def __get_unique_msg_id(self, recvr_addr):
        msg_id = randint(50000, 99999)
        while (recvr_addr, msg_id) in self.__senders:
            msg_id = randint(50000, 99999)
        return msg_id

    def __send_message_reliably(self, recvr_addr, message):
        """
        Sends a message reliably.
        - Initializes a reliable message sender instance that can reliably send this message.
        - Stores this in a dictionary so that we can send acks to it from our receive_handler.
        - Notifies the reliable message sender to start sending.
        - Deletes the reliable message sender instance as the message has been completely sent.
        """
        msg_id = self.__get_unique_msg_id(recvr_addr)
        sender = ReliableMessageSender(self.__sock, recvr_addr, msg_id,
                                       self.__window_size)
        self.__senders[(recvr_addr, msg_id)] = sender
        sender.send_message(message)
        # del self.__senders[(recvr_addr, msg_id)]

RELIABLE_TRANSPORT.PY:

from queue import Queue
from typing import Tuple
from socket import socket

Address = Tuple[str, int]


class MessageSender:
    '''
    DO NOT EDIT ANYTHING IN THIS CLASS
    '''

    def __init__(self, sock: socket, receiver_addr: Address, msg_id: int):
        self.__sock: socket = sock
        self.__receiver_addr = receiver_addr
        self.__msg_id = msg_id

    def send(self, packet: str):
        self.__sock.sendto(
            (f"s:{str(self.__msg_id)}:{packet}").encode("utf-8"),
            self.__receiver_addr)


class ReliableMessageSender(MessageSender):
    '''
    This class reliably delivers a message to a receiver.
    You have to implement the send_message and on_packet_received methods.
    You can use self.send(packet) to send a packet to the receiver.
    You can add as many helper functions as you want.
    '''

    def __init__(self, sock: socket, receiver_addr: Address, msg_id: int,
                 window_size: int):
        '''
        This is the constructor of the class where you can define any class attributes.
        window_size is the size of your message transport window (the number of in-flight packets during message transmission).
        Ignore other arguments; they are passed to the parent class.
        You should immediately return from this function and not block.
        '''
        MessageSender.__init__(self, sock, receiver_addr, msg_id)

    def on_packet_received(self, packet: str):
        '''
        TO BE IMPLEMENTED BY STUDENTS
        This method is invoked whenever a packet is received from the receiver.
        Ideally, only ACK packets should be received here.
        You need some way to communicate these packets to the send_message method.
        One way is to use a queue: you can enqueue packets to it in this method, and dequeue them in send_message.
        You can also use the timeout argument of a queue's dequeue method to implement timeouts in this assignment.
        You should immediately return from this method and not block.
        '''
        raise NotImplementedError

    def send_message(self, message: str):
        '''
        TO BE IMPLEMENTED BY STUDENTS
        This method reliably sends the passed message to the receiver.
        This method does not need to spawn a new thread and return immediately; it can block indefinitely until the message is completely received by the receiver.
        You can send a packet to the receiver by calling self.send(...).

        Sender's logic:
        1) Break down the message into util.CHUNK_SIZE sized chunks.
        2) Choose a random sequence number to start the communication from.
        3) Reliably send a start packet. (i.e. wait for its ACK and resend the packet if the ACK is not received within util.TIME_OUT seconds.)
        4) Send out a window of data packets and wait for ACKs to slide the window appropriately.
        5) How to slide the window? Suppose that the current window starts at sequence number j. If you receive an ACK of sequence number k, such that k > j, send the subsequent k - j chunks. Note that the window now starts from sequence number j + (k - j) = k.
        6) If you receive no ACKs for util.TIME_OUT seconds, resend all the packets in the current window.
        7) Once all the chunks have been reliably sent, reliably send an end packet.
        '''
        raise NotImplementedError


class MessageReceiver:
    '''
    DO NOT EDIT ANYTHING IN THIS CLASS
    '''

    def __init__(self, sock: socket, sender_addr: Address, msg_id: int,
                 completed_message_q: Queue):
        self.__sock: socket = sock
        self.__sender_addr = sender_addr
        self.__msg_id = msg_id
        self.__completed_message_q = completed_message_q

    def send(self, packet: str):
        self.__sock.sendto(
            (f"r:{str(self.__msg_id)}:{packet}").encode("utf-8"),
            self.__sender_addr)

    def on_message_completed(self, message: str):
        self.__completed_message_q.put(message)


class ReliableMessageReceiver(MessageReceiver):
    '''
    This class reliably receives a message from a sender.
    You have to implement the on_packet_received method.
    You can use self.send(packet) to send a packet back to the sender, and will have to call self.on_message_completed(message) when the complete message is received.
    You can add as many helper functions as you want.
    '''

    def __init__(self, sock: socket, sender_addr: Address, msg_id: int,
                 completed_message_q: Queue):
        '''
        This is the constructor of the class where you can define any class attributes to maintain state.
        You should immediately return from this function and not block.
        '''
        MessageReceiver.__init__(self, sock, sender_addr, msg_id,
                                 completed_message_q)

    def on_packet_received(self, packet: str):
        '''
        TO BE IMPLEMENTED BY STUDENTS
        This method is invoked whenever a packet is received from the sender.
        You have to inspect the packet and determine what to do.
        You should immediately return from this method and not block.
        You can either ignore the packet, or send a corresponding ACK packet back to the sender by calling self.send(packet).
        If you determine that the sender has completely sent the message, call self.on_message_completed(message) with the completed message as its argument.

        Receiver's logic:
        1) When you receive a packet, validate its checksum and ignore it if it is corrupted.
        2) Inspect the packet_type and sequence number.
        3) If the packet type is "start", prepare to store incoming chunks of data in some data structure and send an ACK back to the sender with the received packet's sequence number + 1.
        4) If the packet type is "data", store it in an appropriate data type (if it is not a duplicate packet you already have stored), and send a corresponding cumulative ACK. (ACK with the sequence number for which all previous packets have been received).
        5) If the packet type is "end", assemble all the stored chunks into a message, call self.on_message_completed(message) with the completed message, and send an ACK with the received packet's sequence number + 1.
        '''
        raise NotImplementedError
