-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
Copy pathhttp_writer.py
139 lines (106 loc) · 3.88 KB
/
http_writer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
"""Http related parsers and protocol."""
import asyncio
import collections
import zlib
from .abc import AbstractStreamWriter
from .helpers import noop
__all__ = ('StreamWriter', 'HttpVersion', 'HttpVersion10', 'HttpVersion11')
HttpVersion = collections.namedtuple('HttpVersion', ['major', 'minor'])
HttpVersion10 = HttpVersion(1, 0)
HttpVersion11 = HttpVersion(1, 1)
class StreamWriter(AbstractStreamWriter):
    """Write an HTTP message body to an asyncio transport.

    Supports optional chunked transfer encoding (``enable_chunking``),
    optional deflate/gzip compression (``enable_compression``) and an
    optional fixed content length (``self.length``) that truncates any
    excess payload.  ``write()`` returns an awaitable; ``write_eof()``
    finishes the message, after which the writer must not be used.
    """

    def __init__(self, protocol, transport, loop):
        self._protocol = protocol
        self._transport = transport

        self.loop = loop
        # Remaining number of payload bytes to accept; None means unlimited.
        self.length = None
        # Whether chunked transfer-encoding framing is applied on write.
        self.chunked = False
        # Bytes handed to the transport since the last drain.
        self.buffer_size = 0
        # Total bytes handed to the transport over the writer's lifetime.
        self.output_size = 0

        self._eof = False
        self._compress = None
        self._drain_waiter = None

    @property
    def transport(self):
        """The underlying asyncio transport (None after ``write_eof``)."""
        return self._transport

    @property
    def protocol(self):
        """The protocol this writer belongs to."""
        return self._protocol

    def enable_chunking(self):
        """Switch the writer to chunked transfer encoding."""
        self.chunked = True

    def enable_compression(self, encoding='deflate'):
        """Compress the payload with *encoding* ('deflate' or 'gzip')."""
        if encoding == 'gzip':
            # +16 selects a gzip header/trailer in zlib.
            wbits = 16 + zlib.MAX_WBITS
        else:
            # Negative wbits produces a raw deflate stream.
            wbits = -zlib.MAX_WBITS
        self._compress = zlib.compressobj(wbits=wbits)

    def _write(self, chunk):
        # Counters are bumped before the closing check, so they reflect
        # attempted output even when the write is refused.
        nbytes = len(chunk)
        self.buffer_size += nbytes
        self.output_size += nbytes
        if self._transport.is_closing():
            raise asyncio.CancelledError('Cannot write to closing transport')
        self._transport.write(chunk)

    def write(self, chunk, *, drain=True, LIMIT=64 * 1024):
        """Writes chunk of data to a stream.
        write_eof() indicates end of stream.
        writer can't be used after write_eof() method being called.
        write() return drain future.
        """
        if self._compress is not None:
            chunk = self._compress.compress(chunk)
            if not chunk:
                # Compressor buffered everything; nothing to send yet.
                return noop()

        if self.length is not None:
            remaining = self.length
            if remaining >= len(chunk):
                self.length = remaining - len(chunk)
            else:
                # Truncate anything past the declared content length.
                chunk = chunk[:remaining]
                self.length = 0
                if not chunk:
                    return noop()

        if not chunk:
            return noop()

        if self.chunked:
            frame_head = ('%x\r\n' % len(chunk)).encode('ascii')
            chunk = b''.join((frame_head, chunk, b'\r\n'))
        self._write(chunk)

        # Apply back-pressure once enough bytes have been buffered.
        if drain and self.buffer_size > LIMIT:
            self.buffer_size = 0
            return self.drain()
        return noop()

    def write_headers(self, status_line, headers, SEP=': ', END='\r\n'):
        """Write request/response status and headers."""
        parts = [status_line]
        for name, value in headers.items():
            parts.append(name + SEP + value + END)
        # Blank line terminates the header section.
        self._write(''.join(parts).encode('utf-8') + b'\r\n')

    async def write_eof(self, chunk=b''):
        if self._eof:
            return

        if self._compress:
            # Flush whatever the compressor is still holding.
            pending = self._compress.compress(chunk) if chunk else b''
            chunk = pending + self._compress.flush()
            if chunk and self.chunked:
                frame_head = ('%x\r\n' % len(chunk)).encode('ascii')
                chunk = frame_head + chunk + b'\r\n0\r\n\r\n'
        elif self.chunked:
            if chunk:
                # Final data chunk followed by the terminating zero chunk.
                frame_head = ('%x\r\n' % len(chunk)).encode('ascii')
                chunk = frame_head + chunk + b'\r\n0\r\n\r\n'
            else:
                chunk = b'0\r\n\r\n'

        if chunk:
            self._write(chunk)

        await self.drain()

        self._eof = True
        self._transport = None

    async def drain(self):
        """Flush the write buffer.
        The intended use is to write
        await w.write(data)
        await w.drain()
        """
        if self._protocol.transport is None:
            return
        await self._protocol._drain_helper()