-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtracer.py
169 lines (151 loc) · 5.39 KB
/
tracer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
#!/usr/bin/env python3
import socket;
import struct;
import random;
import time;
import select;
import math;
import sys;
ICMP_ECHO_REQUEST = 8
ICMP_CODE = socket.getprotobyname('icmp');
"""
Given the bytes array, it calculates the checksum and returns it.
"""
def checksum(source_string):
# I'm not too confident that this is right but testing seems to
# suggest that it gives the same answers as in_cksum in ping.c.
sum = 0
count_to = (len(source_string) / 2) * 2
count = 0
while count < count_to:
this_val = (source_string[count + 1])*256 + (source_string[count])
sum = sum + this_val
sum = sum & 0xffffffff # Necessary?
count = count + 2
if count_to < len(source_string):
sum = sum + (source_string[len(source_string) - 1])
sum = sum & 0xffffffff # Necessary?
sum = (sum >> 16) + (sum & 0xffff)
sum = sum + (sum >> 16)
answer = ~sum
answer = answer & 0xffff
# Swap bytes. Bugger me if I know why.
answer = answer >> 8 | (answer << 8 & 0xff00)
return answer
"""
Create a new echo request packet based on the given "id".
"""
def create_packet(id):
# Header is type (8), code (8), checksum (16), id (16), sequence (16)
header = struct.pack('bbHHh', ICMP_ECHO_REQUEST, 0, 0, id, 1)
data = ''
# Calculate the checksum on the data and the dummy header.
my_checksum = checksum(header + data.encode('utf-8'))
# Now that we have the right checksum, we put that in. It's just easier
# to make up a new header than to stuff it into the dummy.
header = struct.pack('bbHHh', ICMP_ECHO_REQUEST, 0,
socket.htons(my_checksum), id, 1)
return header + data.encode('utf-8')
"""
Receive the ping from the socket.
Returns 0 if a timeout occurs.
Returns (src_ip_address, ping_time_milliseconds) if successful.
"""
def receive_ping(my_socket, packet_id, time_sent, timeout):
time_left = timeout
while True:
started_select = time.time()
ready = select.select([my_socket], [], [], time_left)
how_long_in_select = time.time() - started_select
if ready[0] == []: # Timeout
return 0
time_received = time.time()
rec_packet, addr = my_socket.recvfrom(1024)
# The last 8 bytes are the header of the packet we sent to the server
icmp_header = rec_packet[-8:]
type, code, checksum, p_id, sequence = struct.unpack(
'bbHHh', icmp_header)
if p_id == packet_id:
total_time_ms = (time_received - time_sent) * 1000
# Round to 3 decimal places:
total_time_ms = math.ceil(total_time_ms * 1000) / 1000
return (addr[0], total_time_ms)
time_left -= time_received - time_sent
if time_left <= 0:
return 0
"""
Sends an ICMP ping to the given host, and gets the response for that ping.
It sets the TTL in the IP header of the packet to the given value.
Returns 0 if a timeout occurs.
Returns (src_ip_address, ping_time_milliseconds) if successful.
"""
def echo_one(host, ttl):
my_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, ICMP_CODE)
my_socket.setsockopt(socket.SOL_IP, socket.IP_TTL, ttl)
# Maximum for an unsigned short int c object counts to 65535 so
# we have to sure that our packet id is not greater than that.
packet_id = int(random.random() * 65535)
packet = create_packet(packet_id)
while packet:
# The icmp protocol does not use a port, but the function
# below expects it, so we just give it a dummy port.
sent = my_socket.sendto(packet, (host, 1))
packet = packet[sent:]
ping_res = receive_ping(my_socket, packet_id, time.time(), timeout)
my_socket.close()
return ping_res
"""
Given the host and a TTL value, it sends 3 pings.
Formats a nice user friendly string.
Returns (user_friendly_string, destination_reached).
destination_reached is True if the IP address who replied matches the host,
and False otherwise.
"""
def echo_three(host, ttl):
try1 = echo_one(host, ttl)
try2 = echo_one(host, ttl)
try3 = echo_one(host, ttl)
if try1 == 0:
try1str = '*'
else:
try1str = try1[0] + ' - ' + str(try1[1]) + ' ms'
if try2 == 0:
try2str = '*'
else:
try2str = try2[0] + ' - ' + str(try2[1]) + ' ms'
if try3 == 0:
try3str = '*'
else:
try3str = try3[0] + ' - ' + str(try3[1]) + ' ms'
final_string = try1str + ', ' + try2str + ', ' + try3str
final_string = str(ttl) + ' ' + final_string
if try1 == 0:
destination_reached = False
else:
destination_reached = try1[0] == host
return (final_string, destination_reached)
# -------------------------- #
# Main execution starts here #
# -------------------------- #
if len(sys.argv) <= 1:
print(f'Error\n\tUsage: {sys.argv[0]} <hostname or ip>')
sys.exit(1)
dest_addr = sys.argv[1]
# Domain name to IP address conversion:
host = socket.gethostbyname(dest_addr)
timeout = 3
max_tries = 30
print('Trace to ' + dest_addr + ' (' + host + '), ' + str(max_tries) +
' hops max.')
try:
# Loop until we hit the maximum number of hops, or until we reach the
# final destination host:
for x in range(1, max_tries+1):
(line, destination_reached) = echo_three(host, x)
print(line)
if destination_reached:
break
except Exception as err:
print(err)
except KeyboardInterrupt as err:
print(err)