base_response.py
27.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
import warnings
from .._compat import integer_types
from .._compat import string_types
from .._compat import text_type
from .._compat import to_bytes
from .._compat import to_native
from ..datastructures import Headers
from ..http import dump_cookie
from ..http import HTTP_STATUS_CODES
from ..http import remove_entity_headers
from ..urls import iri_to_uri
from ..urls import url_join
from ..utils import get_content_type
from ..wsgi import ClosingIterator
from ..wsgi import get_current_url
def _run_wsgi_app(*args):
"""This function replaces itself to ensure that the test module is not
imported unless required. DO NOT USE!
"""
global _run_wsgi_app
from ..test import run_wsgi_app as _run_wsgi_app
return _run_wsgi_app(*args)
def _warn_if_string(iterable):
"""Helper for the response objects to check if the iterable returned
to the WSGI server is not a string.
"""
if isinstance(iterable, string_types):
warnings.warn(
"Response iterable was set to a string. This will appear to"
" work but means that the server will send the data to the"
" client one character at a time. This is almost never"
" intended behavior, use 'response.data' to assign strings"
" to the response object.",
stacklevel=2,
)
def _iter_encoded(iterable, charset):
for item in iterable:
if isinstance(item, text_type):
yield item.encode(charset)
else:
yield item
def _clean_accept_ranges(accept_ranges):
if accept_ranges is True:
return "bytes"
elif accept_ranges is False:
return "none"
elif isinstance(accept_ranges, text_type):
return to_native(accept_ranges)
raise ValueError("Invalid accept_ranges value")
class BaseResponse(object):
"""Base response class. The most important fact about a response object
is that it's a regular WSGI application. It's initialized with a couple
of response parameters (headers, body, status code etc.) and will start a
valid WSGI response when called with the environ and start response
callable.
Because it's a WSGI application itself processing usually ends before the
actual response is sent to the server. This helps debugging systems
because they can catch all the exceptions before responses are started.
Here a small example WSGI application that takes advantage of the
response objects::
from werkzeug.wrappers import BaseResponse as Response
def index():
return Response('Index page')
def application(environ, start_response):
path = environ.get('PATH_INFO') or '/'
if path == '/':
response = index()
else:
response = Response('Not Found', status=404)
return response(environ, start_response)
Like :class:`BaseRequest` which object is lacking a lot of functionality
implemented in mixins. This gives you a better control about the actual
API of your response objects, so you can create subclasses and add custom
functionality. A full featured response object is available as
:class:`Response` which implements a couple of useful mixins.
To enforce a new type of already existing responses you can use the
:meth:`force_type` method. This is useful if you're working with different
subclasses of response objects and you want to post process them with a
known interface.
Per default the response object will assume all the text data is `utf-8`
encoded. Please refer to :doc:`the unicode chapter </unicode>` for more
details about customizing the behavior.
Response can be any kind of iterable or string. If it's a string it's
considered being an iterable with one item which is the string passed.
Headers can be a list of tuples or a
:class:`~werkzeug.datastructures.Headers` object.
Special note for `mimetype` and `content_type`: For most mime types
`mimetype` and `content_type` work the same, the difference affects
only 'text' mimetypes. If the mimetype passed with `mimetype` is a
mimetype starting with `text/`, the charset parameter of the response
object is appended to it. In contrast the `content_type` parameter is
always added as header unmodified.
.. versionchanged:: 0.5
the `direct_passthrough` parameter was added.
:param response: a string or response iterable.
:param status: a string with a status or an integer with the status code.
:param headers: a list of headers or a
:class:`~werkzeug.datastructures.Headers` object.
:param mimetype: the mimetype for the response. See notice above.
:param content_type: the content type for the response. See notice above.
:param direct_passthrough: if set to `True` :meth:`iter_encoded` is not
called before iteration which makes it
possible to pass special iterators through
unchanged (see :func:`wrap_file` for more
details.)
"""
#: the charset of the response.
charset = "utf-8"
#: the default status if none is provided.
default_status = 200
#: the default mimetype if none is provided.
default_mimetype = "text/plain"
#: if set to `False` accessing properties on the response object will
#: not try to consume the response iterator and convert it into a list.
#:
#: .. versionadded:: 0.6.2
#:
#: That attribute was previously called `implicit_seqence_conversion`.
#: (Notice the typo). If you did use this feature, you have to adapt
#: your code to the name change.
implicit_sequence_conversion = True
#: Should this response object correct the location header to be RFC
#: conformant? This is true by default.
#:
#: .. versionadded:: 0.8
autocorrect_location_header = True
#: Should this response object automatically set the content-length
#: header if possible? This is true by default.
#:
#: .. versionadded:: 0.8
automatically_set_content_length = True
#: Warn if a cookie header exceeds this size. The default, 4093, should be
#: safely `supported by most browsers <cookie_>`_. A cookie larger than
#: this size will still be sent, but it may be ignored or handled
#: incorrectly by some browsers. Set to 0 to disable this check.
#:
#: .. versionadded:: 0.13
#:
#: .. _`cookie`: http://browsercookielimits.squawky.net/
max_cookie_size = 4093
def __init__(
self,
response=None,
status=None,
headers=None,
mimetype=None,
content_type=None,
direct_passthrough=False,
):
if isinstance(headers, Headers):
self.headers = headers
elif not headers:
self.headers = Headers()
else:
self.headers = Headers(headers)
if content_type is None:
if mimetype is None and "content-type" not in self.headers:
mimetype = self.default_mimetype
if mimetype is not None:
mimetype = get_content_type(mimetype, self.charset)
content_type = mimetype
if content_type is not None:
self.headers["Content-Type"] = content_type
if status is None:
status = self.default_status
if isinstance(status, integer_types):
self.status_code = status
else:
self.status = status
self.direct_passthrough = direct_passthrough
self._on_close = []
# we set the response after the headers so that if a class changes
# the charset attribute, the data is set in the correct charset.
if response is None:
self.response = []
elif isinstance(response, (text_type, bytes, bytearray)):
self.set_data(response)
else:
self.response = response
def call_on_close(self, func):
"""Adds a function to the internal list of functions that should
be called as part of closing down the response. Since 0.7 this
function also returns the function that was passed so that this
can be used as a decorator.
.. versionadded:: 0.6
"""
self._on_close.append(func)
return func
def __repr__(self):
if self.is_sequence:
body_info = "%d bytes" % sum(map(len, self.iter_encoded()))
else:
body_info = "streamed" if self.is_streamed else "likely-streamed"
return "<%s %s [%s]>" % (self.__class__.__name__, body_info, self.status)
@classmethod
def force_type(cls, response, environ=None):
"""Enforce that the WSGI response is a response object of the current
type. Werkzeug will use the :class:`BaseResponse` internally in many
situations like the exceptions. If you call :meth:`get_response` on an
exception you will get back a regular :class:`BaseResponse` object, even
if you are using a custom subclass.
This method can enforce a given response type, and it will also
convert arbitrary WSGI callables into response objects if an environ
is provided::
# convert a Werkzeug response object into an instance of the
# MyResponseClass subclass.
response = MyResponseClass.force_type(response)
# convert any WSGI application into a response object
response = MyResponseClass.force_type(response, environ)
This is especially useful if you want to post-process responses in
the main dispatcher and use functionality provided by your subclass.
Keep in mind that this will modify response objects in place if
possible!
:param response: a response object or wsgi application.
:param environ: a WSGI environment object.
:return: a response object.
"""
if not isinstance(response, BaseResponse):
if environ is None:
raise TypeError(
"cannot convert WSGI application into response"
" objects without an environ"
)
response = BaseResponse(*_run_wsgi_app(response, environ))
response.__class__ = cls
return response
@classmethod
def from_app(cls, app, environ, buffered=False):
"""Create a new response object from an application output. This
works best if you pass it an application that returns a generator all
the time. Sometimes applications may use the `write()` callable
returned by the `start_response` function. This tries to resolve such
edge cases automatically. But if you don't get the expected output
you should set `buffered` to `True` which enforces buffering.
:param app: the WSGI application to execute.
:param environ: the WSGI environment to execute against.
:param buffered: set to `True` to enforce buffering.
:return: a response object.
"""
return cls(*_run_wsgi_app(app, environ, buffered))
@property
def status_code(self):
"""The HTTP status code as a number."""
return self._status_code
@status_code.setter
def status_code(self, code):
self._status_code = code
try:
self._status = "%d %s" % (code, HTTP_STATUS_CODES[code].upper())
except KeyError:
self._status = "%d UNKNOWN" % code
@property
def status(self):
"""The HTTP status code as a string."""
return self._status
@status.setter
def status(self, value):
try:
self._status = to_native(value)
except AttributeError:
raise TypeError("Invalid status argument")
try:
self._status_code = int(self._status.split(None, 1)[0])
except ValueError:
self._status_code = 0
self._status = "0 %s" % self._status
except IndexError:
raise ValueError("Empty status argument")
def get_data(self, as_text=False):
"""The string representation of the request body. Whenever you call
this property the request iterable is encoded and flattened. This
can lead to unwanted behavior if you stream big data.
This behavior can be disabled by setting
:attr:`implicit_sequence_conversion` to `False`.
If `as_text` is set to `True` the return value will be a decoded
unicode string.
.. versionadded:: 0.9
"""
self._ensure_sequence()
rv = b"".join(self.iter_encoded())
if as_text:
rv = rv.decode(self.charset)
return rv
def set_data(self, value):
"""Sets a new string as response. The value set must be either a
unicode or bytestring. If a unicode string is set it's encoded
automatically to the charset of the response (utf-8 by default).
.. versionadded:: 0.9
"""
# if an unicode string is set, it's encoded directly so that we
# can set the content length
if isinstance(value, text_type):
value = value.encode(self.charset)
else:
value = bytes(value)
self.response = [value]
if self.automatically_set_content_length:
self.headers["Content-Length"] = str(len(value))
data = property(
get_data,
set_data,
doc="A descriptor that calls :meth:`get_data` and :meth:`set_data`.",
)
def calculate_content_length(self):
"""Returns the content length if available or `None` otherwise."""
try:
self._ensure_sequence()
except RuntimeError:
return None
return sum(len(x) for x in self.iter_encoded())
def _ensure_sequence(self, mutable=False):
"""This method can be called by methods that need a sequence. If
`mutable` is true, it will also ensure that the response sequence
is a standard Python list.
.. versionadded:: 0.6
"""
if self.is_sequence:
# if we need a mutable object, we ensure it's a list.
if mutable and not isinstance(self.response, list):
self.response = list(self.response)
return
if self.direct_passthrough:
raise RuntimeError(
"Attempted implicit sequence conversion but the"
" response object is in direct passthrough mode."
)
if not self.implicit_sequence_conversion:
raise RuntimeError(
"The response object required the iterable to be a"
" sequence, but the implicit conversion was disabled."
" Call make_sequence() yourself."
)
self.make_sequence()
def make_sequence(self):
"""Converts the response iterator in a list. By default this happens
automatically if required. If `implicit_sequence_conversion` is
disabled, this method is not automatically called and some properties
might raise exceptions. This also encodes all the items.
.. versionadded:: 0.6
"""
if not self.is_sequence:
# if we consume an iterable we have to ensure that the close
# method of the iterable is called if available when we tear
# down the response
close = getattr(self.response, "close", None)
self.response = list(self.iter_encoded())
if close is not None:
self.call_on_close(close)
def iter_encoded(self):
"""Iter the response encoded with the encoding of the response.
If the response object is invoked as WSGI application the return
value of this method is used as application iterator unless
:attr:`direct_passthrough` was activated.
"""
if __debug__:
_warn_if_string(self.response)
# Encode in a separate function so that self.response is fetched
# early. This allows us to wrap the response with the return
# value from get_app_iter or iter_encoded.
return _iter_encoded(self.response, self.charset)
def set_cookie(
self,
key,
value="",
max_age=None,
expires=None,
path="/",
domain=None,
secure=False,
httponly=False,
samesite=None,
):
"""Sets a cookie. The parameters are the same as in the cookie `Morsel`
object in the Python standard library but it accepts unicode data, too.
A warning is raised if the size of the cookie header exceeds
:attr:`max_cookie_size`, but the header will still be set.
:param key: the key (name) of the cookie to be set.
:param value: the value of the cookie.
:param max_age: should be a number of seconds, or `None` (default) if
the cookie should last only as long as the client's
browser session.
:param expires: should be a `datetime` object or UNIX timestamp.
:param path: limits the cookie to a given path, per default it will
span the whole domain.
:param domain: if you want to set a cross-domain cookie. For example,
``domain=".example.com"`` will set a cookie that is
readable by the domain ``www.example.com``,
``foo.example.com`` etc. Otherwise, a cookie will only
be readable by the domain that set it.
:param secure: If `True`, the cookie will only be available via HTTPS
:param httponly: disallow JavaScript to access the cookie. This is an
extension to the cookie standard and probably not
supported by all browsers.
:param samesite: Limits the scope of the cookie such that it will only
be attached to requests if those requests are
"same-site".
"""
self.headers.add(
"Set-Cookie",
dump_cookie(
key,
value=value,
max_age=max_age,
expires=expires,
path=path,
domain=domain,
secure=secure,
httponly=httponly,
charset=self.charset,
max_size=self.max_cookie_size,
samesite=samesite,
),
)
def delete_cookie(self, key, path="/", domain=None):
"""Delete a cookie. Fails silently if key doesn't exist.
:param key: the key (name) of the cookie to be deleted.
:param path: if the cookie that should be deleted was limited to a
path, the path has to be defined here.
:param domain: if the cookie that should be deleted was limited to a
domain, that domain has to be defined here.
"""
self.set_cookie(key, expires=0, max_age=0, path=path, domain=domain)
@property
def is_streamed(self):
"""If the response is streamed (the response is not an iterable with
a length information) this property is `True`. In this case streamed
means that there is no information about the number of iterations.
This is usually `True` if a generator is passed to the response object.
This is useful for checking before applying some sort of post
filtering that should not take place for streamed responses.
"""
try:
len(self.response)
except (TypeError, AttributeError):
return True
return False
@property
def is_sequence(self):
"""If the iterator is buffered, this property will be `True`. A
response object will consider an iterator to be buffered if the
response attribute is a list or tuple.
.. versionadded:: 0.6
"""
return isinstance(self.response, (tuple, list))
def close(self):
"""Close the wrapped response if possible. You can also use the object
in a with statement which will automatically close it.
.. versionadded:: 0.9
Can now be used in a with statement.
"""
if hasattr(self.response, "close"):
self.response.close()
for func in self._on_close:
func()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, tb):
self.close()
def freeze(self):
"""Call this method if you want to make your response object ready for
being pickled. This buffers the generator if there is one. It will
also set the `Content-Length` header to the length of the body.
.. versionchanged:: 0.6
The `Content-Length` header is now set.
"""
# we explicitly set the length to a list of the *encoded* response
# iterator. Even if the implicit sequence conversion is disabled.
self.response = list(self.iter_encoded())
self.headers["Content-Length"] = str(sum(map(len, self.response)))
def get_wsgi_headers(self, environ):
"""This is automatically called right before the response is started
and returns headers modified for the given environment. It returns a
copy of the headers from the response with some modifications applied
if necessary.
For example the location header (if present) is joined with the root
URL of the environment. Also the content length is automatically set
to zero here for certain status codes.
.. versionchanged:: 0.6
Previously that function was called `fix_headers` and modified
the response object in place. Also since 0.6, IRIs in location
and content-location headers are handled properly.
Also starting with 0.6, Werkzeug will attempt to set the content
length if it is able to figure it out on its own. This is the
case if all the strings in the response iterable are already
encoded and the iterable is buffered.
:param environ: the WSGI environment of the request.
:return: returns a new :class:`~werkzeug.datastructures.Headers`
object.
"""
headers = Headers(self.headers)
location = None
content_location = None
content_length = None
status = self.status_code
# iterate over the headers to find all values in one go. Because
# get_wsgi_headers is used each response that gives us a tiny
# speedup.
for key, value in headers:
ikey = key.lower()
if ikey == u"location":
location = value
elif ikey == u"content-location":
content_location = value
elif ikey == u"content-length":
content_length = value
# make sure the location header is an absolute URL
if location is not None:
old_location = location
if isinstance(location, text_type):
# Safe conversion is necessary here as we might redirect
# to a broken URI scheme (for instance itms-services).
location = iri_to_uri(location, safe_conversion=True)
if self.autocorrect_location_header:
current_url = get_current_url(environ, strip_querystring=True)
if isinstance(current_url, text_type):
current_url = iri_to_uri(current_url)
location = url_join(current_url, location)
if location != old_location:
headers["Location"] = location
# make sure the content location is a URL
if content_location is not None and isinstance(content_location, text_type):
headers["Content-Location"] = iri_to_uri(content_location)
if 100 <= status < 200 or status == 204:
# Per section 3.3.2 of RFC 7230, "a server MUST NOT send a
# Content-Length header field in any response with a status
# code of 1xx (Informational) or 204 (No Content)."
headers.remove("Content-Length")
elif status == 304:
remove_entity_headers(headers)
# if we can determine the content length automatically, we
# should try to do that. But only if this does not involve
# flattening the iterator or encoding of unicode strings in
# the response. We however should not do that if we have a 304
# response.
if (
self.automatically_set_content_length
and self.is_sequence
and content_length is None
and status not in (204, 304)
and not (100 <= status < 200)
):
try:
content_length = sum(len(to_bytes(x, "ascii")) for x in self.response)
except UnicodeError:
# aha, something non-bytestringy in there, too bad, we
# can't safely figure out the length of the response.
pass
else:
headers["Content-Length"] = str(content_length)
return headers
def get_app_iter(self, environ):
"""Returns the application iterator for the given environ. Depending
on the request method and the current status code the return value
might be an empty response rather than the one from the response.
If the request method is `HEAD` or the status code is in a range
where the HTTP specification requires an empty response, an empty
iterable is returned.
.. versionadded:: 0.6
:param environ: the WSGI environment of the request.
:return: a response iterable.
"""
status = self.status_code
if (
environ["REQUEST_METHOD"] == "HEAD"
or 100 <= status < 200
or status in (204, 304)
):
iterable = ()
elif self.direct_passthrough:
if __debug__:
_warn_if_string(self.response)
return self.response
else:
iterable = self.iter_encoded()
return ClosingIterator(iterable, self.close)
def get_wsgi_response(self, environ):
"""Returns the final WSGI response as tuple. The first item in
the tuple is the application iterator, the second the status and
the third the list of headers. The response returned is created
specially for the given environment. For example if the request
method in the WSGI environment is ``'HEAD'`` the response will
be empty and only the headers and status code will be present.
.. versionadded:: 0.6
:param environ: the WSGI environment of the request.
:return: an ``(app_iter, status, headers)`` tuple.
"""
headers = self.get_wsgi_headers(environ)
app_iter = self.get_app_iter(environ)
return app_iter, self.status, headers.to_wsgi_list()
def __call__(self, environ, start_response):
"""Process this response as WSGI application.
:param environ: the WSGI environment.
:param start_response: the response callable provided by the WSGI
server.
:return: an application iterator
"""
app_iter, status, headers = self.get_wsgi_response(environ)
start_response(status, headers)
return app_iter