Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding the eggress node handling functionality #137

Merged
merged 3 commits into from
Apr 16, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion params/networks/triangle.graphml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@
<data key="d31">1</data>
<data key="d32">-87.65005</data>
<data key="d33">Chicago</data>
<data key="d39">Normal</data>
<data key="d39">Egress</data>
<data key="d40">10</data>
</node>
<node id="2">
Expand Down
8 changes: 4 additions & 4 deletions src/coordsim/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ def main():
random.seed(args.seed)
numpy.random.seed(args.seed)

# Parse network and get NetworkX object and ingress network list
network, ing_nodes = reader.read_network(args.network, node_cap=10, link_cap=10)
# Parse network, get NetworkX object ,ingress network list, and egress nodes list
network, ing_nodes, eg_nodes = reader.read_network(args.network, node_cap=10, link_cap=10)

# use dummy placement and schedule for running simulator without algorithm
# TODO: make configurable via CLI
Expand All @@ -44,8 +44,8 @@ def main():
metrics = Metrics(network, sf_list)

# Create the simulator parameters object with the provided args
params = SimulatorParams(network, ing_nodes, sfc_list, sf_list, config, metrics, sf_placement=sf_placement,
schedule=schedule)
params = SimulatorParams(network, ing_nodes, eg_nodes, sfc_list, sf_list, config, metrics,
sf_placement=sf_placement, schedule=schedule)
log.info(params)

if 'trace_path' in config:
Expand Down
8 changes: 6 additions & 2 deletions src/coordsim/network/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@

class Flow:

def __init__(self, flow_id, sfc, dr, size, creation_time,
destination=None, current_sf=None, current_node_id=None, current_position=0, end2end_delay=0.0):
def __init__(self, flow_id, sfc, dr, size, creation_time, destination=None, egress_node_id=None, current_sf=None,
current_node_id=None, current_position=0, end2end_delay=0.0):

# Flow ID: Unique ID string
self.flow_id = flow_id
Expand All @@ -24,6 +24,10 @@ def __init__(self, flow_id, sfc, dr, size, creation_time,
self.current_sf = current_sf
# The current node that the flow is being processed in
self.current_node_id = current_node_id
# The specified ingress node of the flow. The flow will spawn at the ingress node.
self.ingress_node_id = current_node_id
# The specified egress node of the flow. The flow will depart at the egress node. Might be non-existent.
self.egress_node_id = egress_node_id
# The duration of the flow calculated in ms.
self.duration = (float(size) / float(dr)) * 1000 # Converted flow duration to ms
# Current flow position within the SFC
Expand Down
7 changes: 5 additions & 2 deletions src/coordsim/reader/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,13 +223,16 @@ def read_network(file, node_cap=None, link_cap=None):
# Setting the all-pairs shortest path in the NetworkX network as a graph attribute
shortest_paths(networkx_network)

# Filter ingress nodes
# Filter ingress and egress (if any) nodes
ing_nodes = []
eg_nodes = []
for node in networkx_network.nodes.items():
if node[1]["type"] == "Ingress":
ing_nodes.append(node)
if node[1]["type"] == "Egress":
eg_nodes.append(node[0])

return networkx_network, ing_nodes
return networkx_network, ing_nodes, eg_nodes


def reset_cap(network):
Expand Down
42 changes: 35 additions & 7 deletions src/coordsim/simulation/flowsimulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ def start(self):
# Setting the all-pairs shortest path in the NetworkX network as a graph attribute
log.info("Using nodes list {}\n".format(list(self.params.network.nodes.keys())))
log.info("Total of {} ingress nodes available\n".format(len(self.params.ing_nodes)))
if self.params.eg_nodes:
log.info("Total of {} egress nodes available\n".format(len(self.params.eg_nodes)))
for node in self.params.ing_nodes:
node_id = node[0]
self.env.process(self.generate_flow(node_id))
Expand Down Expand Up @@ -66,9 +68,13 @@ def generate_flow(self, node_id):
flow_sfc = np.random.choice([sfc for sfc in self.params.sfc_list.keys()])
# Get the flow's creation time (current environment time)
creation_time = self.env.now
# Set the egress node for the flow if some are specified in the network file
flow_egress_node = None
if self.params.eg_nodes:
flow_egress_node = random.choice(self.params.eg_nodes)
# Generate flow based on given params
flow = Flow(str(self.total_flow_count), flow_sfc, flow_dr, flow_size, creation_time,
current_node_id=node_id)
current_node_id=node_id, egress_node_id=flow_egress_node)
# Update metrics for the generated flow
self.params.metrics.generated_flow(flow, node_id)
# Generate flows and schedule them at ingress node
Expand Down Expand Up @@ -230,10 +236,31 @@ def process_flow(self, flow, sfc):
log.info("Flow {} started departing sf {} at node {}. Time {}"
.format(flow.flow_id, current_sf, current_node_id, self.env.now))

# Check if flow is currently in last SF, if so, then depart flow.
if (flow.current_position == len(sfc) - 1):
yield self.env.timeout(flow.duration)
self.depart_flow(flow)
# Check if flow is currently in last SF, if so, then:
# - Check if the flow has some Egress node set or not. If not then just depart. If Yes then:
# - check if the current node is the egress node. If Yes then depart. If No then forward the flow to
# the egress node using the shortest_path

if flow.current_position == len(sfc) - 1:
if flow.current_node_id == flow.egress_node_id:
# Flow is processed and resides at egress node: depart flow
yield self.env.timeout(flow.duration)
self.depart_flow(flow)
elif flow.egress_node_id is None:
# Flow is processed and no egress node specified: depart flow
log.info(f'Flow {flow.flow_id} has no egress node, will depart from'
f' current node {flow.current_node_id}. Time {self.env.now}.')
yield self.env.timeout(flow.duration)
self.depart_flow(flow)
else:
# Remove the active flow from the SF after it departed the SF on current node towards egress
self.params.metrics.remove_active_flow(flow, current_node_id, current_sf)
# Forward flow to the egress node and then depart from there
yield self.env.process(self.forward_flow(flow, flow.egress_node_id))
yield self.env.timeout(flow.duration)
# In this situation the last sf was never active for the egress node,
# so we should not remove it from the metrics
self.depart_flow(flow, remove_active_flow=False)
else:
# Increment the position of the flow within SFC
flow.current_position += 1
Expand Down Expand Up @@ -278,13 +305,14 @@ def process_flow(self, flow, sfc):
self.params.metrics.dropped_flow(flow)
self.env.exit()

def depart_flow(self, flow):
def depart_flow(self, flow, remove_active_flow=True):
"""
Process the flow at the requested SF of the current node.
"""
# Update metrics for the processed flow
self.params.metrics.completed_flow()
self.params.metrics.add_end2end_delay(flow.end2end_delay)
self.params.metrics.remove_active_flow(flow, flow.current_node_id, flow.current_sf)
if remove_active_flow:
self.params.metrics.remove_active_flow(flow, flow.current_node_id, flow.current_sf)
log.info("Flow {} was processed and departed the network from {}. Time {}"
.format(flow.flow_id, flow.current_node_id, self.env.now))
4 changes: 3 additions & 1 deletion src/coordsim/simulation/simulatorparams.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@


class SimulatorParams:
def __init__(self, network, ing_nodes, sfc_list, sf_list, config, metrics, prediction=False,
def __init__(self, network, ing_nodes, eg_nodes, sfc_list, sf_list, config, metrics, prediction=False,
schedule=None, sf_placement=None):
# NetworkX network object: DiGraph
self.network = network
# Ingress nodes of the network (nodes at which flows arrive): list
self.ing_nodes = ing_nodes
# Egress nodes of the network (nodes at which flows may leave the network): list
self.eg_nodes = eg_nodes
# List of available SFCs and their child SFs: defaultdict(None)
self.sfc_list = sfc_list
# List of every SF and it's properties (e.g. processing_delay): defaultdict(None)
Expand Down
6 changes: 3 additions & 3 deletions src/siminterface/simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def __init__(self, network_file, service_functions_file, config_file, resource_
# Create CSV writer
self.writer = ResultWriter(self.test_mode, self.test_dir)
# init network, sfc, sf, and config files
self.network, self.ing_nodes = reader.read_network(self.network_file)
self.network, self.ing_nodes, self.eg_nodes = reader.read_network(self.network_file)
self.sfc_list = reader.get_sfc(service_functions_file)
self.sf_list = reader.get_sf(service_functions_file, resource_functions_path)
self.config = reader.get_config(config_file)
Expand All @@ -38,8 +38,8 @@ def __init__(self, network_file, service_functions_file, config_file, resource_
# Check if future ingress traffic setting is enabled
if 'future_traffic' in self.config and self.config['future_traffic']:
self.prediction = True
self.params = SimulatorParams(self.network, self.ing_nodes, self.sfc_list, self.sf_list, self.config,
self.metrics, prediction=self.prediction)
self.params = SimulatorParams(self.network, self.ing_nodes, self.eg_nodes, self.sfc_list, self.sf_list,
self.config, self.metrics, prediction=self.prediction)
if self.prediction:
self.predictor = TrafficPredictor(self.params)
self.episode = 0
Expand Down
4 changes: 2 additions & 2 deletions tests/test_simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def setUp(self):

self.env = simpy.Environment()
# Configure simulator parameters
network, ing_nodes = reader.read_network(NETWORK_FILE, node_cap=10, link_cap=10)
network, ing_nodes, eg_nodes = reader.read_network(NETWORK_FILE, node_cap=10, link_cap=10)
sfc_list = reader.get_sfc(SERVICE_FUNCTIONS_FILE)
sf_list = reader.get_sf(SERVICE_FUNCTIONS_FILE, RESOURCE_FUNCTION_PATH)
config = reader.get_config(CONFIG_FILE)
Expand All @@ -38,7 +38,7 @@ def setUp(self):
schedule = dummy_data.triangle_schedule

# Initialize Simulator and SimulatoParams objects
self.simulator_params = SimulatorParams(network, ing_nodes, sfc_list, sf_list, config, self.metrics,
self.simulator_params = SimulatorParams(network, ing_nodes, eg_nodes, sfc_list, sf_list, config, self.metrics,
sf_placement=sf_placement, schedule=schedule)
self.flow_simulator = FlowSimulator(self.env, self.simulator_params)
self.flow_simulator.start()
Expand Down