-
Notifications
You must be signed in to change notification settings - Fork 68
/
Copy pathsocks5srv.py
executable file
·277 lines (225 loc) · 9.54 KB
/
socks5srv.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
#!/usr/bin/env python3
import argparse
import re
import select
import socket
import socketserver
# Usage: python3 socks5srv.py --port port [--auth username:password] [--map 'host:port to host:port' ...]
class AddressRemapper:
"""A helper for remapping (host, port) tuples to new (host, port) tuples
This is useful for Socks5 servers used in testing environments,
because the successful use of the Socks5 proxy can be demonstrated
by being able to 'connect' to a redirected port, which would always
fail without the proxy, even on localhost-only environments
"""
def __init__(self, mappings):
self.mappings = [
AddressRemapper.parse_single_mapping(string) for string in mappings
]
self.add_dns_remappings()
@staticmethod
def parse_single_mapping(string):
"""Parse a single mapping of the for '{host}:{port} to {host}:{port}'"""
# Accept either [ipv6]:port or host:port
host_re = r"(\[(?P<{0}_ipv6>[^[\]]+)\]|(?P<{0}_host>[^\[]+))"
port_re = r"(?P<{0}_port>\d+)"
src_re = host_re.format("src") + ":" + port_re.format("src")
dst_re = host_re.format("dst") + ":" + port_re.format("dst")
full_re = "^" + src_re + " to " + dst_re + "$"
match = re.match(full_re, string)
if match is None:
raise Exception(
f"Mapping {string} does not match format '{{host}}:{{port}} to {{host}}:{{port}}'"
)
src = (
(match.group("src_ipv6") or match.group("src_host")).encode("utf8"),
int(match.group("src_port")),
)
dst = (
(match.group("dst_ipv6") or match.group("dst_host")).encode("utf8"),
int(match.group("dst_port")),
)
return (src, dst)
def add_dns_remappings(self):
"""Add mappings for the IP addresses corresponding to hostnames
For example, if there is a mapping (localhost, 1000) to (localhost, 2000),
then this also adds (127.0.0.1, 1000) to (localhost, 2000)."""
for src, dst in self.mappings:
host, port = src
try:
addrs = socket.getaddrinfo(
host, port, socket.AF_UNSPEC, socket.SOCK_STREAM
)
except socket.gaierror:
continue
existing_src_entries = [src for src, dst in self.mappings]
for af, socktype, proto, canonname, sa in addrs:
if af == socket.AF_INET and sa not in existing_src_entries:
self.mappings.append((sa, dst))
elif af == socket.AF_INET6 and sa[:2] not in existing_src_entries:
self.mappings.append((sa[:2], dst))
def remap(self, hostport):
"""Re-map a (host, port) tuple to a new (host, port) tuple if that was requested"""
for src, dst in self.mappings:
if hostport == src:
return dst
return hostport
class Socks5Server(socketserver.ThreadingTCPServer):
"""A simple Socks5 proxy server"""
def __init__(self, server_address, RequestHandlerClass, args):
socketserver.ThreadingTCPServer.__init__(
self, server_address, RequestHandlerClass
)
self.args = args
self.address_remapper = AddressRemapper(args.map)
class Socks5Handler(socketserver.BaseRequestHandler):
"""Request handler for Socks5 connections"""
def finish(self):
"""Called after handle(), always just closes the connection"""
self.request.close()
def read_exact(self, n):
"""Read n bytes from a socket
In Socks5, strings are prefixed with a single byte containing
their length. This method reads a bytes string containing n bytes
(where n can be a number or a bytes object containing that
single byte).
If reading from the client ends prematurely, this returns None.
"""
if type(n) is bytes:
if len(n) == 0:
return None
assert len(n) == 1
n = n[0]
buf = bytearray(n)
mv = memoryview(buf)
bytes_read = 0
while bytes_read < n:
try:
chunk_length = self.request.recv_into(mv[bytes_read:])
except OSError:
return None
if chunk_length == 0:
return None
bytes_read += chunk_length
return bytes(buf)
def create_outgoing_tcp_connection(self, dst, port):
"""Create an outgoing TCP connection to dst:port"""
outgoing = None
for res in socket.getaddrinfo(dst, port, socket.AF_UNSPEC, socket.SOCK_STREAM):
af, socktype, proto, canonname, sa = res
try:
outgoing = socket.socket(af, socktype, proto)
except OSError:
continue
try:
outgoing.connect(sa)
except OSError:
outgoing.close()
continue
break
return outgoing
def handle(self):
"""Handle the Socks5 communication with a freshly connected client"""
# This implements the Socks5 protocol as specified in
# https://datatracker.ietf.org/doc/html/rfc1928
# and username/password authentication as specified in
# https://datatracker.ietf.org/doc/html/rfc1929
# If you prefer HTML tables over ASCII tables, Wikipedia
# also currently has a decent description of the protocol in
# https://en.wikipedia.org/wiki/SOCKS#SOCKS5.
# Receive/send errors are intentionally left unhandled. Closing
# the socket is just fine in that case for us.
# Client greeting
if self.request.recv(1) != b"\x05": # Socks5 only
return
n_auth = self.request.recv(1)
client_auth_methods = self.read_exact(n_auth)
if client_auth_methods is None:
return
# choose either no-auth or username/password
required_auth_method = b"\x00" if self.server.args.auth is None else b"\x02"
if required_auth_method not in client_auth_methods:
self.request.sendall(b"\x05\xff")
return
self.request.sendall(b"\x05" + required_auth_method)
if required_auth_method == b"\x02":
auth_version = self.request.recv(1)
if auth_version != b"\x01": # Only username/password auth v1
return
username_len = self.request.recv(1)
username = self.read_exact(username_len)
password_len = self.request.recv(1)
password = self.read_exact(password_len)
if username is None or password is None:
return
if (
username.decode("utf8") + ":" + password.decode("utf8")
!= self.server.args.auth
):
return
self.request.sendall(b"\x01\x00") # auth success
if self.request.recv(1) != b"\x05": # Socks5 only
return
if self.request.recv(1) != b"\x01": # Outgoing TCP only
return
if self.request.recv(1) != b"\x00": # Reserved, must be 0
return
addrtype = self.request.recv(1)
dst = None
if addrtype == b"\x01": # IPv4
ipv4raw = self.read_exact(4)
if ipv4raw is not None:
dst = ".".join(["{}"] * 4).format(*ipv4raw)
elif addrtype == b"\x03": # Domain
domain_len = self.request.recv(1)
dst = self.read_exact(domain_len)
elif addrtype == b"\x04": # IPv6
ipv6raw = self.read_exact(16)
if ipv6raw is not None:
dst = ":".join(["{:0>2x}{:0>2x}"] * 8).format(*ipv6raw)
else:
return
if dst is None:
return
portraw = self.read_exact(2)
port = portraw[0] * 256 + portraw[1]
(dst, port) = self.server.address_remapper.remap((dst, port))
outgoing = self.create_outgoing_tcp_connection(dst, port)
if outgoing is None:
self.request.sendall(b"\x05\x01\x00") # just report a general failure
return
# success response, do not bother actually stating the locally bound
# host/port address and instead always say 127.0.0.1:4096.
# for our use case, the client will not be making meaningful use
# of this anyway
self.request.sendall(b"\x05\x00\x00\x01\x7f\x00\x00\x01\x10\x00")
self.raw_proxy(self.request, outgoing)
def raw_proxy(self, a, b):
"""Proxy data between sockets a and b as-is"""
with a, b:
while True:
try:
(readable, _, _) = select.select([a, b], [], [])
except (OSError, ValueError):
return
if not readable:
continue
for sock in readable:
buf = sock.recv(4096)
if buf == b"":
return
if sock is a:
b.sendall(buf)
else:
a.sendall(buf)
def main():
parser = argparse.ArgumentParser(description="Start a Socks5 proxy server.")
parser.add_argument("--port", type=int, required=True)
parser.add_argument("--auth", type=str)
parser.add_argument("--map", type=str, action="append", default=[])
args = parser.parse_args()
socketserver.TCPServer.allow_reuse_address = True
with Socks5Server(("localhost", args.port), Socks5Handler, args) as server:
server.serve_forever()
if __name__ == "__main__":
main()