# http_writer.py
"""Http related parsers and protocol."""
import collections
import socket
import zlib
from .abc import AbstractPayloadWriter
from .helpers import noop
# Public names exported by a star-import of this module.
__all__ = (
    'PayloadWriter',
    'HttpVersion',
    'HttpVersion10',
    'HttpVersion11',
    'StreamWriter',
)
# (major, minor) pair identifying an HTTP protocol version, plus the two
# ready-made instances for HTTP/1.0 and HTTP/1.1.
HttpVersion = collections.namedtuple('HttpVersion', ('major', 'minor'))
HttpVersion10 = HttpVersion(major=1, minor=0)
HttpVersion11 = HttpVersion(major=1, minor=1)
# Socket option used for "corking" a TCP socket: TCP_CORK when the socket
# module defines it, otherwise TCP_NOPUSH, otherwise None when neither
# constant is available.
CORK = getattr(socket, 'TCP_CORK', getattr(socket, 'TCP_NOPUSH', None))
class StreamWriter:
    """Hand a single transport to payload writers, one writer at a time.

    ``available`` is True while no writer holds the transport.  Writers that
    ask for the transport while it is taken are queued in ``_waiters`` and
    receive it, in FIFO order, through their ``set_transport()`` method when
    ``release()`` is called.
    """

    def __init__(self, protocol, transport, loop):
        """Store protocol, transport and loop; the transport starts out free.

        The socket is looked up once through
        ``transport.get_extra_info('socket')`` and may be None.
        """
        self._protocol = protocol
        self._loop = loop
        self._tcp_nodelay = False
        self._tcp_cork = False
        self._socket = transport.get_extra_info('socket')
        self._waiters = []
        self.available = True
        self.transport = transport

    def acquire(self, writer):
        """Give *writer* the transport now, or queue it until a release."""
        if self.available:
            self.available = False
            writer.set_transport(self.transport)
        else:
            self._waiters.append(writer)

    def release(self):
        """Pass the transport to the oldest queued writer, if any.

        With no queued writers the stream is simply marked available.
        """
        if self._waiters:
            self.available = False
            writer = self._waiters.pop(0)
            writer.set_transport(self.transport)
        else:
            self.available = True

    def replace(self, writer, factory):
        """Swap *writer* for a new writer built by *factory*.

        If *writer* is still queued, the new writer is created with
        ``factory(self, loop, False)`` and takes the same queue position.
        Otherwise the stream is marked available and the new writer is
        created with ``factory(self, loop)``.

        Returns the writer produced by *factory*.  Exceptions raised by
        *factory* propagate to the caller.
        """
        # Only the queue lookup is guarded: a ValueError raised by the
        # factory itself must not be mistaken for "writer is not queued".
        try:
            idx = self._waiters.index(writer)
        except ValueError:
            self.available = True
            return factory(self, self._loop)

        replacement = factory(self, self._loop, False)
        self._waiters[idx] = replacement
        return replacement

    @property
    def tcp_nodelay(self):
        """Last TCP_NODELAY state recorded by this object."""
        return self._tcp_nodelay

    def set_tcp_nodelay(self, value):
        """Set TCP_NODELAY on the underlying socket to ``bool(value)``.

        Does nothing when the requested state is already recorded, when
        there is no socket, or when the socket family is not AF_INET or
        AF_INET6.  If cork is currently recorded as on and ``CORK`` is
        defined, cork is switched off first.  OSError from ``setsockopt``
        is ignored.
        """
        value = bool(value)
        if self._tcp_nodelay == value:
            return

        if self._socket is None:
            return

        if self._socket.family not in (socket.AF_INET, socket.AF_INET6):
            return

        # The socket may be closed already; on Windows OSError gets raised.
        try:
            if self._tcp_cork:
                if CORK is not None:  # pragma: no branch
                    self._socket.setsockopt(socket.IPPROTO_TCP, CORK, False)
                    self._tcp_cork = False

            self._socket.setsockopt(
                socket.IPPROTO_TCP, socket.TCP_NODELAY, value)
            self._tcp_nodelay = value
        except OSError:
            pass

    @property
    def tcp_cork(self):
        """Last cork state recorded by this object."""
        return self._tcp_cork

    def set_tcp_cork(self, value):
        """Set the ``CORK`` socket option to ``bool(value)``.

        Does nothing when the requested state is already recorded, when
        there is no socket, or when the socket family is not AF_INET or
        AF_INET6.  If TCP_NODELAY is currently recorded as on it is switched
        off first.  The cork option itself is only touched when the
        module-level ``CORK`` is not None.  OSError from ``setsockopt`` is
        ignored.
        """
        value = bool(value)
        if self._tcp_cork == value:
            return

        if self._socket is None:
            return

        if self._socket.family not in (socket.AF_INET, socket.AF_INET6):
            return

        try:
            if self._tcp_nodelay:
                self._socket.setsockopt(
                    socket.IPPROTO_TCP, socket.TCP_NODELAY, False)
                self._tcp_nodelay = False
            if CORK is not None:  # pragma: no branch
                self._socket.setsockopt(socket.IPPROTO_TCP, CORK, value)
                self._tcp_cork = value
        except OSError:
            pass

    async def drain(self):
        """Flush the write buffer.

        The intended use is to write

          w.write(data)
          await w.drain()
        """
        # Nothing to wait for when the protocol has no transport.
        if self._protocol.transport is not None:
            await self._protocol._drain_helper()
class PayloadWriter(AbstractPayloadWriter):
    """Write an HTTP payload to a transport obtained from a stream.

    Until the stream hands over a transport (see ``set_transport``), output
    is collected in ``_buffer``; once a transport is present, data is
    written to it directly.
    """

    def __init__(self, stream, loop, acquire=True):
        """Set up an empty writer and try to obtain the stream's transport.

        If ``stream.available`` is true the transport is taken immediately
        and the stream is marked unavailable.  Otherwise, when *acquire* is
        true, the writer registers itself via ``stream.acquire(self)``.
        """
        self._stream = stream
        self._transport = None

        self.loop = loop
        self.length = None
        self.chunked = False
        self.buffer_size = 0
        self.output_size = 0

        self._eof = False
        self._buffer = []
        self._compress = None
        self._drain_waiter = None

        if stream.available:
            self._transport = stream.transport
            stream.available = False
        elif acquire:
            stream.acquire(self)

    def set_transport(self, transport):
        """Attach *transport*, flush buffered bytes, wake a pending drain."""
        self._transport = transport

        pending = b''.join(self._buffer)
        if pending:
            transport.write(pending)
            self._buffer.clear()

        waiter = self._drain_waiter
        if waiter is None:
            return
        self._drain_waiter = None
        if not waiter.done():
            waiter.set_result(None)

    @property
    def tcp_nodelay(self):
        """TCP_NODELAY state as reported by the stream."""
        return self._stream.tcp_nodelay

    def set_tcp_nodelay(self, value):
        """Forward the TCP_NODELAY request to the stream."""
        self._stream.set_tcp_nodelay(value)

    @property
    def tcp_cork(self):
        """Cork state as reported by the stream."""
        return self._stream.tcp_cork

    def set_tcp_cork(self, value):
        """Forward the cork request to the stream."""
        self._stream.set_tcp_cork(value)

    def enable_chunking(self):
        """Frame every subsequently written chunk with chunked encoding."""
        self.chunked = True

    def enable_compression(self, encoding='deflate'):
        """Compress subsequent output with zlib.

        'gzip' selects ``wbits = 16 + zlib.MAX_WBITS``; any other value
        selects ``wbits = -zlib.MAX_WBITS``.
        """
        if encoding == 'gzip':
            wbits = 16 + zlib.MAX_WBITS
        else:
            wbits = -zlib.MAX_WBITS
        self._compress = zlib.compressobj(wbits=wbits)

    def buffer_data(self, chunk):
        """Append a non-empty *chunk* to the buffer and update the counters."""
        if not chunk:
            return
        size = len(chunk)
        self.buffer_size += size
        self.output_size += size
        self._buffer.append(chunk)

    def _write(self, chunk):
        """Account for *chunk*, then send it or buffer it.

        With a transport attached, any buffered data is sent together with
        *chunk* in a single write; without one, *chunk* is buffered.
        """
        size = len(chunk)
        self.buffer_size += size
        self.output_size += size

        transport = self._transport
        if transport is None:
            self._buffer.append(chunk)
            return

        if not self._buffer:
            transport.write(chunk)
            return

        self._buffer.append(chunk)
        transport.write(b''.join(self._buffer))
        self._buffer.clear()

    def write(self, chunk, *, drain=True, LIMIT=64*1024):
        """Writes chunk of data to a stream.

        write_eof() indicates end of stream.
        writer can't be used after write_eof() method being called.
        write() return drain future.
        """
        if self._compress is not None:
            chunk = self._compress.compress(chunk)
            if not chunk:
                # the compressor kept everything in its internal state
                return noop()

        remaining = self.length
        if remaining is not None:
            size = len(chunk)
            if remaining >= size:
                self.length = remaining - size
            else:
                # never emit more than the declared length
                chunk = chunk[:remaining]
                self.length = 0
                if not chunk:
                    return noop()

        if not chunk:
            return noop()

        if self.chunked:
            size_line = ('%x\r\n' % len(chunk)).encode('ascii')
            chunk = size_line + chunk + b'\r\n'

        self._write(chunk)

        if self.buffer_size > LIMIT and drain:
            self.buffer_size = 0
            return self.drain()

        return noop()

    def write_headers(self, status_line, headers, SEP=': ', END='\r\n'):
        """Write request/response status and headers."""
        # status line followed by one "name: value" line per header
        header_lines = [
            name + SEP + value + END for name, value in headers.items()]
        head = (status_line + ''.join(header_lines)).encode('utf-8') + b'\r\n'

        size = len(head)
        self.buffer_size += size
        self.output_size += size
        self._buffer.append(head)

    async def write_eof(self, chunk=b''):
        """Write the final *chunk*, drain, and release the stream.

        Flushes the compressor when compression is enabled and appends the
        chunked terminator when chunking is enabled.  Calling it again after
        it completed is a no-op.
        """
        if self._eof:
            return

        compressor = self._compress
        if compressor:
            if chunk:
                chunk = compressor.compress(chunk)
            chunk = chunk + compressor.flush()

        if self.chunked:
            if chunk:
                size_line = ('%x\r\n' % len(chunk)).encode('ascii')
                chunk = size_line + chunk + b'\r\n0\r\n\r\n'
            elif not compressor:
                # bare terminator; only emitted on the uncompressed path
                chunk = b'0\r\n\r\n'

        if chunk:
            self.buffer_data(chunk)

        await self.drain(True)

        self._eof = True
        self._transport = None
        self._stream.release()

    async def drain(self, last=False):
        """Send buffered data and wait for the stream to drain.

        Without a transport this waits until ``set_transport`` resolves the
        drain waiter.  When *last* is true the buffer is written but not
        cleared.
        """
        transport = self._transport
        if transport is None:
            # wait for transport
            if self._drain_waiter is None:
                self._drain_waiter = self.loop.create_future()
            await self._drain_waiter
            return

        if self._buffer:
            transport.write(b''.join(self._buffer))
            if not last:
                self._buffer.clear()
        await self._stream.drain()