Skip to content

Commit fab915c

Browse files
authored
Merge pull request #137 from adnan904/egress_nodes
Adding the eggress node handling functionality
2 parents a6fd6fc + 69ec4ef commit fab915c

File tree

9 files changed

+73
-27
lines changed

9 files changed

+73
-27
lines changed

README.md

+14-5
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ Simulate flow-level, inter-node network coordination including scaling and place
2727

2828
## Citing this work
2929

30-
If you are using this work in whole or in part in your project, please cite it as follows:
30+
If you are using this work in whole or in part in your project, please cite it as follows:
3131

3232
```
3333
@inproceedings{schneider2020coordination,
@@ -54,9 +54,9 @@ pip install -r requirements.txt
5454

5555
## Usage
5656

57-
Type `coord-sim -h` for help using the simulator. For now, this should print
57+
Type `coord-sim -h` for help using the simulator. For now, this should print
5858

59-
```
59+
```
6060
$ coord-sim -h
6161
usage: coord-sim [-h] -d DURATION -sf SF [-sfr SFR] -n NETWORK -c CONFIG
6262
[-t TRACE] [-s SEED]
@@ -85,7 +85,7 @@ optional arguments:
8585

8686
You can use the following command as an example (run from the root project folder)
8787

88-
```bash
88+
```bash
8989
coord-sim -d 20 -n params/networks/triangle.graphml -sf params/services/abc.yaml -sfr params/services/resource_functions -c params/config/sim_config.yaml
9090
```
9191
This will run a simulation on a provided GraphML network file and a YAML placement file for a duration of 20 timesteps.
@@ -95,7 +95,7 @@ This will run a simulation on a provided GraphML network file and a YAML placeme
9595

9696
By default, all SFs have a node resource consumption, which exactly equals the aggregated traffic that they have to handle.
9797

98-
It is possible to specify arbitrary other resource consumption models simply by implementing a python module with a
98+
It is possible to specify arbitrary other resource consumption models simply by implementing a python module with a
9999
function `resource_function(load)` (see examples [here](https://github.com/RealVNF/coordination-simulation/tree/master/params/services/resource_functions)).
100100

101101
To use these modules, they need to be referenced in the service file:
@@ -113,6 +113,15 @@ And the path to the folder with the Python modules needs to be passed via the `-
113113
See PR https://github.com/RealVNF/coordination-simulation/pull/78 for details.
114114

115115

116+
### Egress nodes
117+
118+
- A node can be set to be a `Egress` node in the `NodeType` attribute of the network file
119+
- If some nodes are set as `Egress` then only the simulator will randomly choose one of them as the Egress node for each flow in the network
120+
- If some nodes are set to be Egress then once the flow is processed we check if for the flow, `current node == egress node` . If Yes then we depart , otherwise we forward the flow to the egress_node using the shortest_path routing.
121+
- **Todo**: Ideally the coordination algorithms should keep the path(Ingress to Egress) of the flow in view while creating the schedule/placement.
122+
123+
See [PR 137](https://github.com/RealVNF/coord-sim/pull/137) for details.
124+
116125
## Tests
117126

118127
```bash

params/networks/triangle.graphml

+1-1
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@
8888
<data key="d31">1</data>
8989
<data key="d32">-87.65005</data>
9090
<data key="d33">Chicago</data>
91-
<data key="d39">Normal</data>
91+
<data key="d39">Egress</data>
9292
<data key="d40">10</data>
9393
</node>
9494
<node id="2">

src/coordsim/main.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ def main():
2828
random.seed(args.seed)
2929
numpy.random.seed(args.seed)
3030

31-
# Parse network and get NetworkX object and ingress network list
32-
network, ing_nodes = reader.read_network(args.network, node_cap=10, link_cap=10)
31+
# Parse network, get NetworkX object ,ingress network list, and egress nodes list
32+
network, ing_nodes, eg_nodes = reader.read_network(args.network, node_cap=10, link_cap=10)
3333

3434
# use dummy placement and schedule for running simulator without algorithm
3535
# TODO: make configurable via CLI
@@ -44,8 +44,8 @@ def main():
4444
metrics = Metrics(network, sf_list)
4545

4646
# Create the simulator parameters object with the provided args
47-
params = SimulatorParams(network, ing_nodes, sfc_list, sf_list, config, metrics, sf_placement=sf_placement,
48-
schedule=schedule)
47+
params = SimulatorParams(network, ing_nodes, eg_nodes, sfc_list, sf_list, config, metrics,
48+
sf_placement=sf_placement, schedule=schedule)
4949
log.info(params)
5050

5151
if 'trace_path' in config:

src/coordsim/network/flow.py

+6-2
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@
99

1010
class Flow:
1111

12-
def __init__(self, flow_id, sfc, dr, size, creation_time,
13-
destination=None, current_sf=None, current_node_id=None, current_position=0, end2end_delay=0.0):
12+
def __init__(self, flow_id, sfc, dr, size, creation_time, destination=None, egress_node_id=None, current_sf=None,
13+
current_node_id=None, current_position=0, end2end_delay=0.0):
1414

1515
# Flow ID: Unique ID string
1616
self.flow_id = flow_id
@@ -24,6 +24,10 @@ def __init__(self, flow_id, sfc, dr, size, creation_time,
2424
self.current_sf = current_sf
2525
# The current node that the flow is being processed in
2626
self.current_node_id = current_node_id
27+
# The specified ingress node of the flow. The flow will spawn at the ingress node.
28+
self.ingress_node_id = current_node_id
29+
# The specified egress node of the flow. The flow will depart at the egress node. Might be non-existent.
30+
self.egress_node_id = egress_node_id
2731
# The duration of the flow calculated in ms.
2832
self.duration = (float(size) / float(dr)) * 1000 # Converted flow duration to ms
2933
# Current flow position within the SFC

src/coordsim/reader/reader.py

+5-2
Original file line numberDiff line numberDiff line change
@@ -223,13 +223,16 @@ def read_network(file, node_cap=None, link_cap=None):
223223
# Setting the all-pairs shortest path in the NetworkX network as a graph attribute
224224
shortest_paths(networkx_network)
225225

226-
# Filter ingress nodes
226+
# Filter ingress and egress (if any) nodes
227227
ing_nodes = []
228+
eg_nodes = []
228229
for node in networkx_network.nodes.items():
229230
if node[1]["type"] == "Ingress":
230231
ing_nodes.append(node)
232+
if node[1]["type"] == "Egress":
233+
eg_nodes.append(node[0])
231234

232-
return networkx_network, ing_nodes
235+
return networkx_network, ing_nodes, eg_nodes
233236

234237

235238
def reset_cap(network):

src/coordsim/simulation/flowsimulator.py

+35-7
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ def start(self):
3131
# Setting the all-pairs shortest path in the NetworkX network as a graph attribute
3232
log.info("Using nodes list {}\n".format(list(self.params.network.nodes.keys())))
3333
log.info("Total of {} ingress nodes available\n".format(len(self.params.ing_nodes)))
34+
if self.params.eg_nodes:
35+
log.info("Total of {} egress nodes available\n".format(len(self.params.eg_nodes)))
3436
for node in self.params.ing_nodes:
3537
node_id = node[0]
3638
self.env.process(self.generate_flow(node_id))
@@ -66,9 +68,13 @@ def generate_flow(self, node_id):
6668
flow_sfc = np.random.choice([sfc for sfc in self.params.sfc_list.keys()])
6769
# Get the flow's creation time (current environment time)
6870
creation_time = self.env.now
71+
# Set the egress node for the flow if some are specified in the network file
72+
flow_egress_node = None
73+
if self.params.eg_nodes:
74+
flow_egress_node = random.choice(self.params.eg_nodes)
6975
# Generate flow based on given params
7076
flow = Flow(str(self.total_flow_count), flow_sfc, flow_dr, flow_size, creation_time,
71-
current_node_id=node_id)
77+
current_node_id=node_id, egress_node_id=flow_egress_node)
7278
# Update metrics for the generated flow
7379
self.params.metrics.generated_flow(flow, node_id)
7480
# Generate flows and schedule them at ingress node
@@ -230,10 +236,31 @@ def process_flow(self, flow, sfc):
230236
log.info("Flow {} started departing sf {} at node {}. Time {}"
231237
.format(flow.flow_id, current_sf, current_node_id, self.env.now))
232238

233-
# Check if flow is currently in last SF, if so, then depart flow.
234-
if (flow.current_position == len(sfc) - 1):
235-
yield self.env.timeout(flow.duration)
236-
self.depart_flow(flow)
239+
# Check if flow is currently in last SF, if so, then:
240+
# - Check if the flow has some Egress node set or not. If not then just depart. If Yes then:
241+
# - check if the current node is the egress node. If Yes then depart. If No then forward the flow to
242+
# the egress node using the shortest_path
243+
244+
if flow.current_position == len(sfc) - 1:
245+
if flow.current_node_id == flow.egress_node_id:
246+
# Flow is processed and resides at egress node: depart flow
247+
yield self.env.timeout(flow.duration)
248+
self.depart_flow(flow)
249+
elif flow.egress_node_id is None:
250+
# Flow is processed and no egress node specified: depart flow
251+
log.info(f'Flow {flow.flow_id} has no egress node, will depart from'
252+
f' current node {flow.current_node_id}. Time {self.env.now}.')
253+
yield self.env.timeout(flow.duration)
254+
self.depart_flow(flow)
255+
else:
256+
# Remove the active flow from the SF after it departed the SF on current node towards egress
257+
self.params.metrics.remove_active_flow(flow, current_node_id, current_sf)
258+
# Forward flow to the egress node and then depart from there
259+
yield self.env.process(self.forward_flow(flow, flow.egress_node_id))
260+
yield self.env.timeout(flow.duration)
261+
# In this situation the last sf was never active for the egress node,
262+
# so we should not remove it from the metrics
263+
self.depart_flow(flow, remove_active_flow=False)
237264
else:
238265
# Increment the position of the flow within SFC
239266
flow.current_position += 1
@@ -278,13 +305,14 @@ def process_flow(self, flow, sfc):
278305
self.params.metrics.dropped_flow(flow)
279306
self.env.exit()
280307

281-
def depart_flow(self, flow):
308+
def depart_flow(self, flow, remove_active_flow=True):
282309
"""
283310
Process the flow at the requested SF of the current node.
284311
"""
285312
# Update metrics for the processed flow
286313
self.params.metrics.completed_flow()
287314
self.params.metrics.add_end2end_delay(flow.end2end_delay)
288-
self.params.metrics.remove_active_flow(flow, flow.current_node_id, flow.current_sf)
315+
if remove_active_flow:
316+
self.params.metrics.remove_active_flow(flow, flow.current_node_id, flow.current_sf)
289317
log.info("Flow {} was processed and departed the network from {}. Time {}"
290318
.format(flow.flow_id, flow.current_node_id, self.env.now))

src/coordsim/simulation/simulatorparams.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,14 @@
1010

1111

1212
class SimulatorParams:
13-
def __init__(self, network, ing_nodes, sfc_list, sf_list, config, metrics, prediction=False,
13+
def __init__(self, network, ing_nodes, eg_nodes, sfc_list, sf_list, config, metrics, prediction=False,
1414
schedule=None, sf_placement=None):
1515
# NetworkX network object: DiGraph
1616
self.network = network
1717
# Ingress nodes of the network (nodes at which flows arrive): list
1818
self.ing_nodes = ing_nodes
19+
# Egress nodes of the network (nodes at which flows may leave the network): list
20+
self.eg_nodes = eg_nodes
1921
# List of available SFCs and their child SFs: defaultdict(None)
2022
self.sfc_list = sfc_list
2123
# List of every SF and it's properties (e.g. processing_delay): defaultdict(None)

src/siminterface/simulator.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ def __init__(self, network_file, service_functions_file, config_file, resource_
2828
# Create CSV writer
2929
self.writer = ResultWriter(self.test_mode, self.test_dir)
3030
# init network, sfc, sf, and config files
31-
self.network, self.ing_nodes = reader.read_network(self.network_file)
31+
self.network, self.ing_nodes, self.eg_nodes = reader.read_network(self.network_file)
3232
self.sfc_list = reader.get_sfc(service_functions_file)
3333
self.sf_list = reader.get_sf(service_functions_file, resource_functions_path)
3434
self.config = reader.get_config(config_file)
@@ -38,8 +38,8 @@ def __init__(self, network_file, service_functions_file, config_file, resource_
3838
# Check if future ingress traffic setting is enabled
3939
if 'future_traffic' in self.config and self.config['future_traffic']:
4040
self.prediction = True
41-
self.params = SimulatorParams(self.network, self.ing_nodes, self.sfc_list, self.sf_list, self.config,
42-
self.metrics, prediction=self.prediction)
41+
self.params = SimulatorParams(self.network, self.ing_nodes, self.eg_nodes, self.sfc_list, self.sf_list,
42+
self.config, self.metrics, prediction=self.prediction)
4343
if self.prediction:
4444
self.predictor = TrafficPredictor(self.params)
4545
self.episode = 0

tests/test_simulator.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ def setUp(self):
2727

2828
self.env = simpy.Environment()
2929
# Configure simulator parameters
30-
network, ing_nodes = reader.read_network(NETWORK_FILE, node_cap=10, link_cap=10)
30+
network, ing_nodes, eg_nodes = reader.read_network(NETWORK_FILE, node_cap=10, link_cap=10)
3131
sfc_list = reader.get_sfc(SERVICE_FUNCTIONS_FILE)
3232
sf_list = reader.get_sf(SERVICE_FUNCTIONS_FILE, RESOURCE_FUNCTION_PATH)
3333
config = reader.get_config(CONFIG_FILE)
@@ -38,7 +38,7 @@ def setUp(self):
3838
schedule = dummy_data.triangle_schedule
3939

4040
# Initialize Simulator and SimulatoParams objects
41-
self.simulator_params = SimulatorParams(network, ing_nodes, sfc_list, sf_list, config, self.metrics,
41+
self.simulator_params = SimulatorParams(network, ing_nodes, eg_nodes, sfc_list, sf_list, config, self.metrics,
4242
sf_placement=sf_placement, schedule=schedule)
4343
self.flow_simulator = FlowSimulator(self.env, self.simulator_params)
4444
self.flow_simulator.start()

0 commit comments

Comments
 (0)