Package gluon :: Module rocket
[hide private]
[frames | no frames]

Source Code for Module gluon.rocket

   1  # -*- coding: utf-8 -*- 
   2   
   3  # This file is part of the Rocket Web Server 
   4  # Copyright (c) 2011 Timothy Farrell 
   5  # Modified by Massimo Di Pierro 
   6   
   7  # Import System Modules 
   8  import sys 
   9  import errno 
  10  import socket 
  11  import logging 
  12  import platform 
  13   
  14  # Define Constants 
  15  VERSION = '1.2.6' 
  16  SERVER_NAME = socket.gethostname() 
  17  SERVER_SOFTWARE = 'Rocket %s' % VERSION 
  18  HTTP_SERVER_SOFTWARE = '%s Python/%s' % ( 
  19      SERVER_SOFTWARE, sys.version.split(' ')[0]) 
  20  BUF_SIZE = 16384 
  21  SOCKET_TIMEOUT = 10  # in secs 
  22  THREAD_STOP_CHECK_INTERVAL = 1  # in secs, How often should threads check for a server stop message? 
  23  IS_JYTHON = platform.system() == 'Java'  # Handle special cases for Jython 
  24  IGNORE_ERRORS_ON_CLOSE = set([errno.ECONNABORTED, errno.ECONNRESET]) 
  25  DEFAULT_LISTEN_QUEUE_SIZE = 5 
  26  DEFAULT_MIN_THREADS = 10 
  27  DEFAULT_MAX_THREADS = 0 
  28  DEFAULTS = dict(LISTEN_QUEUE_SIZE=DEFAULT_LISTEN_QUEUE_SIZE, 
  29                  MIN_THREADS=DEFAULT_MIN_THREADS, 
  30                  MAX_THREADS=DEFAULT_MAX_THREADS) 
  31   
  32  PY3K = sys.version_info[0] > 2 
  33   
  34   
class NullHandler(logging.Handler):
    """Logging handler that discards every record it is given.

    It is attached to the library's loggers to prevent library errors
    when no other handler has been configured.
    """

    def emit(self, record):
        """Ignore *record*; nothing is written anywhere."""
        return None

# Text/bytes helpers.  Which builtin counts as "text" and which as "bytes"
# depends on the running interpreter, so both helpers are defined twice.
if PY3K:
    def b(val):
        """Return *val* as bytes.

        Text (``str``) is encoded with the default codec; any other value
        is handed back unchanged.  Lets the same literals be used on
        Python 2.x and 3.x.
        """
        if not isinstance(val, str):
            return val
        return val.encode()

    def u(val, encoding="us-ascii"):
        """Return *val* as text.

        ``bytes`` are decoded with *encoding*; any other value is handed
        back unchanged.  Lets the same code run on Python 2.x and 3.x.
        """
        if not isinstance(val, bytes):
            return val
        return val.decode(encoding)

else:
    def b(val):
        """Return *val* as bytes.

        Text (``unicode``) is encoded with the default codec; any other
        value is handed back unchanged.  Lets the same literals be used on
        Python 2.x and 3.x.
        """
        if not isinstance(val, unicode):
            return val
        return val.encode()

    def u(val, encoding="us-ascii"):
        """Return *val* as text.

        Byte strings (``str``) are decoded with *encoding*; any other
        value is handed back unchanged.  Lets the same code run on
        Python 2.x and 3.x.
        """
        if not isinstance(val, str):
            return val
        return val.decode(encoding)
73 74 # Import Package Modules 75 # package imports removed in monolithic build 76 77 __all__ = ['VERSION', 'SERVER_SOFTWARE', 'HTTP_SERVER_SOFTWARE', 'BUF_SIZE', 78 'IS_JYTHON', 'IGNORE_ERRORS_ON_CLOSE', 'DEFAULTS', 'PY3K', 'b', 'u', 79 'Rocket', 'CherryPyWSGIServer', 'SERVER_NAME', 'NullHandler'] 80 81 # Monolithic build...end of module: rocket/__init__.py 82 # Monolithic build...start of module: rocket/connection.py 83 84 # Import System Modules 85 import sys 86 import time 87 import socket 88 try: 89 import ssl 90 has_ssl = True 91 except ImportError: 92 has_ssl = False 93 # Import Package Modules 94 # package imports removed in monolithic build 95 # TODO - This part is still very experimental. 96 #from .filelike import FileLikeSocket 97 98
class Connection(object):
    """Wrapper around one accepted client socket.

    The constructor copies the most used socket methods onto the instance
    (``recv``, ``send``, ``sendall``, ``shutdown``, ``makefile``,
    ``fileno`` and ``setblocking``) and records who connected and when.

    Attributes set by ``__init__``:
        client_addr, client_port: first two items of the peer address.
        server_port: the ``port`` argument.
        socket: the underlying socket object.
        start_time: ``time.time()`` at construction.
        ssl: true when the socket is an ``ssl.SSLSocket``.
        secure: the ``secure`` argument.
    """

    __slots__ = [
        'setblocking',
        'sendall',
        'shutdown',
        'makefile',
        'fileno',
        'client_addr',
        'client_port',
        'server_port',
        'socket',
        'start_time',
        'ssl',
        'secure',
        'recv',
        'send',
        'read',
        'write'
    ]

    def __init__(self, sock_tuple, port, secure=False):
        """Wrap the ``(socket, address)`` pair in *sock_tuple*.

        :param sock_tuple: ``(socket, address)``; only the first two items
            of ``address`` are used.
        :param port: local port the connection was accepted on.
        :param secure: whether the accepting listener is configured for TLS.
        """
        self.client_addr, self.client_port = sock_tuple[1][:2]
        self.server_port = port
        self.socket = sock_tuple[0]
        self.start_time = time.time()
        self.ssl = has_ssl and isinstance(self.socket, ssl.SSLSocket)
        self.secure = secure

        if IS_JYTHON:
            # In Jython we must set TCP_NODELAY here since it does not
            # inherit from the listening socket.
            # See: http://bugs.jython.org/issue1309
            self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

        self.socket.settimeout(SOCKET_TIMEOUT)

        self.shutdown = self.socket.shutdown
        self.fileno = self.socket.fileno
        self.setblocking = self.socket.setblocking
        self.recv = self.socket.recv
        self.send = self.socket.send
        self.makefile = self.socket.makefile

        if sys.platform == 'darwin':
            self.sendall = self._sendall_darwin
        else:
            self.sendall = self.socket.sendall

    def _sendall_darwin(self, buf):
        """Send all of *buf*, retrying whenever ``send`` reports EAGAIN.

        Installed as ``sendall`` when ``sys.platform == 'darwin'``.

        :returns: the number of bytes sent (``len(buf)``).
        :raises socket.error: for any error other than EAGAIN.
        """
        pending = len(buf)
        offset = 0
        while pending:
            try:
                sent = self.socket.send(buf[offset:])
            except socket.error:
                # EAGAIN only means "try again"; everything else is fatal.
                if sys.exc_info()[1].args[0] != errno.EAGAIN:
                    raise
            else:
                pending -= sent
                offset += sent
        return offset

    # FIXME - this is not ready for prime-time yet.
    # def makefile(self, buf_size=BUF_SIZE):
    #     return FileLikeSocket(self, buf_size)

    def close(self):
        """Close the connection.

        When the socket object exposes a ``_sock`` attribute, that inner
        socket is closed first; an EBADF error from it is ignored.

        :raises socket.error: for any other error while closing ``_sock``.
        """
        if hasattr(self.socket, '_sock'):
            try:
                self.socket._sock.close()
            except socket.error:
                # A bare ``raise`` keeps the original traceback intact.
                if sys.exc_info()[1].args[0] != errno.EBADF:
                    raise
        self.socket.close()
177 178 # Monolithic build...end of module: rocket/connection.py 179 # Monolithic build...start of module: rocket/filelike.py 180 181 # Import System Modules 182 import socket 183 try: 184 from io import StringIO 185 except ImportError: 186 try: 187 from cStringIO import StringIO 188 except ImportError: 189 from StringIO import StringIO 190 # Import Package Modules 191 # package imports removed in monolithic build 192 193
class FileLikeSocket(object):
    """Experimental file-like reader on top of a ``Connection``.

    ``read`` is bound at construction time: ``non_blocking_read`` when the
    underlying socket's timeout is ``0.0``, ``blocking_read`` otherwise.
    Iterating the object yields lines until an empty read.
    """

    def __init__(self, conn, buf_size=BUF_SIZE):
        """
        :param conn: connection object exposing ``recv`` and ``socket``.
        :param buf_size: chunk size used by unbounded non-blocking reads.
        """
        self.conn = conn
        self.buf_size = buf_size
        self.buffer = StringIO()
        self.content_length = None

        if self.conn.socket.gettimeout() == 0.0:
            self.read = self.non_blocking_read
        else:
            self.read = self.blocking_read

    def __iter__(self):
        return self

    def recv(self, size):
        """Receive up to *size* bytes from the connection.

        :raises socket.error: every socket error is currently re-raised;
            the set of errors to retry on is still empty (see FIXME).
        """
        while True:
            try:
                return self.conn.recv(size)
            except socket.error:
                exc = sys.exc_info()
                e = exc[1]
                # FIXME - Don't raise socket_errors_nonblocking or socket_error_eintr
                if (e.args[0] not in set()):
                    raise

    def next(self):
        """Return the next line; raise ``StopIteration`` on an empty read."""
        data = self.readline()
        # Truthiness works for both ``''`` and ``b''``; the old ``== ''``
        # test never matched bytes on Python 3.
        if not data:
            raise StopIteration
        return data

    # Python 3 spelling of the iterator protocol.
    __next__ = next

    def non_blocking_read(self, size=None):
        """Read *size* bytes, or everything available when *size* is None.

        Data already held in ``self.buffer`` is consumed first.
        """
        # Shamelessly adapted from Cherrypy!
        bufr = self.buffer
        bufr.seek(0, 2)
        if size is None:
            while True:
                data = self.recv(self.buf_size)
                if not data:
                    break
                bufr.write(data)

            self.buffer = StringIO()

            return bufr.getvalue()
        else:
            buf_len = self.buffer.tell()
            if buf_len >= size:
                # Enough is buffered already; keep the remainder for later.
                bufr.seek(0)
                data = bufr.read(size)
                self.buffer = StringIO(bufr.read())
                return data

            self.buffer = StringIO()
            while True:
                remaining = size - buf_len
                data = self.recv(remaining)

                if not data:
                    break

                n = len(data)
                if n == size and not buf_len:
                    # Whole request satisfied by one recv; skip the buffer.
                    return data

                if n == remaining:
                    bufr.write(data)
                    del data
                    break

                bufr.write(data)
                buf_len += n
                del data

            return bufr.getvalue()

    def blocking_read(self, length=None):
        """Read with a single ``recv`` call.

        When *length* is None, ``content_length`` is used if set, else 1.
        Any ordinary error yields an empty byte string (best effort).
        """
        if length is None:
            if self.content_length is not None:
                length = self.content_length
            else:
                length = 1

        try:
            data = self.conn.recv(length)
        except Exception:
            # Still best effort, but KeyboardInterrupt/SystemExit now
            # propagate instead of being swallowed.
            data = b('')

        return data

    def readline(self):
        """Read one byte at a time up to and including a newline."""
        newline = b('\n')
        empty = b('')
        data = b("")
        char = self.read(1)
        # Compare by value: ``is not`` only worked by interning accident.
        while char != newline and char != empty:
            data += char
            char = self.read(1)
        data += char
        return data

    def readlines(self, hint="ignored"):
        """Return all remaining lines as a list; *hint* is ignored."""
        return list(self)

    def close(self):
        """Drop the reference to the connection."""
        self.conn = None
        self.content_length = None
301 302 # Monolithic build...end of module: rocket/filelike.py 303 # Monolithic build...start of module: rocket/futures.py 304 305 # Import System Modules 306 import time 307 try: 308 from concurrent.futures import Future, ThreadPoolExecutor 309 from concurrent.futures.thread import _WorkItem 310 has_futures = True 311 except ImportError: 312 has_futures = False 313
314 - class Future:
315 pass
316
317 - class ThreadPoolExecutor:
318 pass
319
320 - class _WorkItem:
321 pass
322 323
class WSGIFuture(Future):
    """Future that can expire and be registered under a name.

    ``f_dict`` is a dictionary shared with the executor; ``remember``
    stores the future in it under a name and ``forget`` removes it again.
    """

    def __init__(self, f_dict, *args, **kwargs):
        """
        :param f_dict: dictionary used as the name -> future registry.
        """
        Future.__init__(self, *args, **kwargs)

        self.timeout = None

        self._mem_dict = f_dict
        self._lifespan = 30
        self._name = None
        self._start_time = time.time()

    def set_running_or_notify_cancel(self):
        """Mark the future as running unless it outlived its lifespan.

        :returns: ``False`` when the future was cancelled because
            ``_lifespan`` seconds have passed since creation, otherwise
            whatever ``Future.set_running_or_notify_cancel`` returns.
        """
        if time.time() - self._start_time >= self._lifespan:
            self.cancel()
            return False
        return super(WSGIFuture, self).set_running_or_notify_cancel()

    def remember(self, name, lifespan=None):
        """Register this future under *name* and return ``self``.

        :param lifespan: optional new lifespan in seconds; a falsy value
            keeps the current one.
        :raises NameError: if *name* is already registered.  The lifespan
            is left untouched in that case.
        """
        if name in self._mem_dict:
            raise NameError('Cannot remember future by name "%s". ' % name +
                            'A future already exists with that name.')

        self._lifespan = lifespan or self._lifespan
        self._name = name
        self._mem_dict[name] = self

        return self

    def forget(self):
        """Remove this future from the registry, if it is the one stored."""
        if self._name in self._mem_dict and self._mem_dict[self._name] is self:
            del self._mem_dict[self._name]
            self._name = None
356 357
class _WorkItem(object):
    """One queued call: a future plus the callable and its arguments."""

    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def run(self):
        """Run the callable and store its outcome on the future.

        Nothing is executed when the future declines to start.  Any
        exception raised by the callable (``BaseException`` included) is
        recorded on the future rather than propagated.
        """
        future = self.future
        if future.set_running_or_notify_cancel():
            try:
                outcome = self.fn(*self.args, **self.kwargs)
            except BaseException:
                future.set_exception(sys.exc_info()[1])
            else:
                future.set_result(outcome)
376 377
class WSGIExecutor(ThreadPoolExecutor):
    """Thread pool executor whose futures are ``WSGIFuture`` objects.

    ``futures`` is the name -> future registry handed to every
    ``WSGIFuture`` created by ``submit``.
    """

    multithread = True
    multiprocess = False

    def __init__(self, *args, **kwargs):
        ThreadPoolExecutor.__init__(self, *args, **kwargs)

        self.futures = dict()

    def submit(self, fn, *args, **kwargs):
        """Queue ``fn(*args, **kwargs)`` and return its ``WSGIFuture``.

        :raises RuntimeError: if the executor has been shut down.
        """
        self._shutdown_lock.acquire()
        try:
            if self._shutdown:
                raise RuntimeError(
                    'Cannot schedule new futures after shutdown')

            f = WSGIFuture(self.futures)
            w = _WorkItem(f, fn, args, kwargs)

            self._work_queue.put(w)
            self._adjust_thread_count()
            return f
        finally:
            # Released on every exit path, including errors while queueing.
            self._shutdown_lock.release()
403 404
class FuturesMiddleware(object):
    """WSGI middleware that adds a futures executor to the environment."""

    def __init__(self, app, threads=5):
        """Wrap *app* and create a ``WSGIExecutor`` with *threads* workers."""
        self.executor = WSGIExecutor(threads)
        self.app = app

    def __call__(self, environ, start_response):
        """Expose the executor and its registry, then call the wrapped app."""
        executor = self.executor
        environ["wsgiorg.executor"] = executor
        environ["wsgiorg.futures"] = executor.futures
        response = self.app(environ, start_response)
        return response
415 416 # Monolithic build...end of module: rocket/futures.py 417 # Monolithic build...start of module: rocket/listener.py 418 419 # Import System Modules 420 import os 421 import socket 422 import logging 423 import traceback 424 from threading import Thread 425 426 try: 427 import ssl 428 from ssl import SSLError 429 has_ssl = True 430 except ImportError: 431 has_ssl = False 432
433 - class SSLError(socket.error):
434 pass
435 # Import Package Modules 436 # package imports removed in monolithic build 437 438
class Listener(Thread):
    """The Listener class is responsible for accepting connections and
    queuing them to be processed by a worker thread.

    ``interface`` is indexed as ``(addr, port[, keyfile, certfile[,
    ca_certs]])``; four or more items mark the listener as secure, and a
    truthy fifth item requests client certificates.

    The accept loop runs in an inner thread created by ``start``, so
    ``isAlive``/``is_alive`` and ``join`` refer to that inner thread.
    """

    def __init__(self, interface, queue_size, active_queue, *args, **kwargs):
        """Create, configure and bind the listening socket.

        Failures are logged, not raised; ``self.ready`` is True only when
        the socket is bound and listening.

        :param interface: interface tuple, see the class docstring.
        :param queue_size: backlog passed to ``socket.listen``.
        :param active_queue: queue receiving accepted connections.
        """
        Thread.__init__(self, *args, **kwargs)

        # Instance variables
        self.active_queue = active_queue
        self.interface = interface
        self.addr = interface[0]
        self.port = interface[1]
        self.secure = len(interface) >= 4
        self.clientcert_req = (len(interface) == 5 and interface[4])

        self.thread = None
        self.ready = False
        self.listener = None

        # Error Log
        self.err_log = logging.getLogger('Rocket.Errors.Port%i' % self.port)
        self.err_log.addHandler(NullHandler())

        # Check the TLS prerequisites before a socket exists so that a
        # failed check cannot leak a file descriptor.
        if self.secure and not self._tls_files_available():
            return

        # Build the socket
        if ':' in self.addr:
            family = socket.AF_INET6
        else:
            family = socket.AF_INET
        listener = socket.socket(family, socket.SOCK_STREAM)

        if not listener:
            self.err_log.error("Failed to get socket.")
            return

        # Set socket options
        try:
            listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        except Exception:
            msg = "Cannot share socket.  Using %s:%i exclusively."
            self.err_log.warning(msg % (self.addr, self.port))

        try:
            if not IS_JYTHON:
                listener.setsockopt(socket.IPPROTO_TCP,
                                    socket.TCP_NODELAY,
                                    1)
        except Exception:
            msg = "Cannot set TCP_NODELAY, things might run a little slower"
            self.err_log.warning(msg)

        try:
            listener.bind((self.addr, self.port))
        except Exception:
            msg = "Socket %s:%i in use by other process and it won't share."
            self.err_log.error(msg % (self.addr, self.port))
            # Do not keep a socket that could not be bound.
            listener.close()
            return

        # We want socket operations to timeout periodically so we can
        # check if the server is shutting down
        listener.settimeout(THREAD_STOP_CHECK_INTERVAL)
        # Listen for new connections allowing queue_size number of
        # connections to wait before rejecting a connection.
        listener.listen(queue_size)

        self.listener = listener

        self.ready = True

    def _tls_files_available(self):
        """Log and return False if HTTPS cannot be served on this interface."""
        interface = self.interface

        if not has_ssl:
            self.err_log.error("ssl module required to serve HTTPS.")
            return False

        required = [(2, "key"), (3, "certificate")]
        if self.clientcert_req:
            required.append((4, "root ca certificate"))

        for index, label in required:
            if not os.path.exists(interface[index]):
                data = (label, interface[index], interface[0], interface[1])
                self.err_log.error("Cannot find %s file "
                                   "'%s'.  Cannot bind to %s:%s" % data)
                return False

        return True

    def wrap_socket(self, sock):
        """Return *sock* wrapped for server-side TLS.

        An ``SSLContext`` is used when the ``ssl`` module provides one
        (``ssl.wrap_socket`` was removed in Python 3.12); otherwise the
        module-level ``ssl.wrap_socket`` is used.  On ``SSLError`` the
        socket is returned unwrapped.
        """
        try:
            if hasattr(ssl, 'SSLContext'):
                protocol = getattr(ssl, 'PROTOCOL_TLS_SERVER', None)
                if protocol is None:
                    protocol = ssl.PROTOCOL_SSLv23
                context = ssl.SSLContext(protocol)
                context.load_cert_chain(certfile=self.interface[3],
                                        keyfile=self.interface[2])
                if self.clientcert_req:
                    context.verify_mode = ssl.CERT_OPTIONAL
                    context.load_verify_locations(cafile=self.interface[4])
                sock = context.wrap_socket(sock, server_side=True)
            elif self.clientcert_req:
                sock = ssl.wrap_socket(sock,
                                       keyfile=self.interface[2],
                                       certfile=self.interface[3],
                                       server_side=True,
                                       cert_reqs=ssl.CERT_OPTIONAL,
                                       ca_certs=self.interface[4],
                                       ssl_version=ssl.PROTOCOL_SSLv23)
            else:
                sock = ssl.wrap_socket(sock,
                                       keyfile=self.interface[2],
                                       certfile=self.interface[3],
                                       server_side=True,
                                       ssl_version=ssl.PROTOCOL_SSLv23)
        except SSLError:
            # Generally this happens when an HTTP request is received on a
            # secure socket. We don't do anything because it will be detected
            # by Worker and dealt with appropriately.
            pass

        return sock

    def start(self):
        """Start the accept loop in a new inner thread, if ready and idle."""
        if not self.ready:
            self.err_log.warning('Listener started when not ready.')
            return

        if self.thread is not None and self.thread.is_alive():
            self.err_log.warning('Listener already running.')
            return

        self.thread = Thread(target=self.listen, name="Port" + str(self.port))

        self.thread.start()

    def isAlive(self):
        """Return True while the inner accept thread is running."""
        if self.thread is None:
            return False

        # Thread.isAlive() was removed in Python 3.9; is_alive() exists
        # since Python 2.6.
        return self.thread.is_alive()

    def is_alive(self):
        """Modern spelling of ``isAlive``; reports the inner thread."""
        return self.isAlive()

    def join(self):
        """Ask the accept loop to exit and wait for the inner thread."""
        if self.thread is None:
            return

        # listen() checks this flag on every accept timeout.
        self.ready = False

        self.thread.join()

        self.thread = None
        self.ready = True

    def listen(self):
        """Accept connections and queue them until ``ready`` is cleared."""
        if __debug__:
            self.err_log.debug('Entering main loop.')
        while True:
            try:
                sock, addr = self.listener.accept()

                if self.secure:
                    sock = self.wrap_socket(sock)

                self.active_queue.put(((sock, addr),
                                       self.interface[1],
                                       self.secure))

            except socket.timeout:
                # socket.timeout will be raised every
                # THREAD_STOP_CHECK_INTERVAL seconds.  When that happens,
                # we check if it's time to die.

                if not self.ready:
                    if __debug__:
                        self.err_log.debug('Listener exiting.')
                    return
                else:
                    continue
            except Exception:
                self.err_log.error(traceback.format_exc())
609 610 # Monolithic build...end of module: rocket/listener.py 611 # Monolithic build...start of module: rocket/main.py 612 613 # Import System Modules 614 import sys 615 import time 616 import socket 617 import logging 618 import traceback 619 from threading import Lock 620 try: 621 from queue import Queue 622 except ImportError: 623 from Queue import Queue 624 625 # Import Package Modules 626 # package imports removed in monolithic build 627 628 # Setup Logging 629 log = logging.getLogger('Rocket') 630 log.addHandler(NullHandler()) 631 632
class Rocket(object):
    """The Rocket class is responsible for handling threads and accepting and
    dispatching connections."""

    def __init__(self,
                 interfaces=('127.0.0.1', 8000),
                 method='wsgi',
                 app_info=None,
                 min_threads=None,
                 max_threads=None,
                 queue_size=None,
                 timeout=600,
                 handle_signals=True):
        """Build the thread pool and one listener per interface.

        :param interfaces: a single interface tuple or a list of them.
        :param method: name resolved to a worker class by ``get_method``.
        :param app_info: dictionary handed to the workers.
        :param min_threads: defaults to ``DEFAULTS['MIN_THREADS']``.
        :param max_threads: defaults to ``DEFAULTS['MAX_THREADS']``.
        :param queue_size: listen backlog; defaults to ``socket.SOMAXCONN``
            when available and is capped at a non-zero ``max_threads``.
        :param timeout: idle timeout passed to the monitor.
        :param handle_signals: install SIGTERM/SIGUSR1 handlers in ``start``.

        Exits the process with status 1 when no listener is ready.
        """
        self.handle_signals = handle_signals
        self.startstop_lock = Lock()
        self.timeout = timeout
        # Created by start(); stop() must cope with it being absent.
        self._monitor = None

        if not isinstance(interfaces, list):
            self.interfaces = [interfaces]
        else:
            self.interfaces = interfaces

        if min_threads is None:
            min_threads = DEFAULTS['MIN_THREADS']

        if max_threads is None:
            max_threads = DEFAULTS['MAX_THREADS']

        if not queue_size:
            if hasattr(socket, 'SOMAXCONN'):
                queue_size = socket.SOMAXCONN
            else:
                queue_size = DEFAULTS['LISTEN_QUEUE_SIZE']

        if max_threads and queue_size > max_threads:
            queue_size = max_threads

        if isinstance(app_info, dict):
            app_info['server_software'] = SERVER_SOFTWARE

        self.monitor_queue = Queue()
        self.active_queue = Queue()

        self._threadpool = ThreadPool(get_method(method),
                                      app_info=app_info,
                                      active_queue=self.active_queue,
                                      monitor_queue=self.monitor_queue,
                                      min_threads=min_threads,
                                      max_threads=max_threads)

        # Build our socket listeners and keep only the usable ones.  The
        # previous reverse index loop stopped before index 0, so a failed
        # first (or only) listener was never removed.
        candidates = [Listener(i, queue_size, self.active_queue)
                      for i in self.interfaces]
        self.listeners = [l for l in candidates if l.ready]

        if not self.listeners:
            log.critical("No interfaces to listen on...closing.")
            sys.exit(1)

    def _sigterm(self, signum, frame):
        """SIGTERM handler: stop the server."""
        log.info('Received SIGTERM')
        self.stop()

    def _sighup(self, signum, frame):
        """SIGUSR1 handler: restart the server."""
        log.info('Received SIGHUP')
        self.restart()

    def start(self, background=False):
        """Start workers, monitor and listeners.

        :param background: when true, return right after start-up;
            otherwise block until the monitor thread dies or Ctrl-C is
            pressed, then call ``stop``.
        """
        log.info('Starting %s' % SERVER_SOFTWARE)

        self.startstop_lock.acquire()

        try:
            # Set up our shutdown signals
            if self.handle_signals:
                try:
                    import signal
                    signal.signal(signal.SIGTERM, self._sigterm)
                    signal.signal(signal.SIGUSR1, self._sighup)
                except Exception:
                    log.debug('This platform does not support signals.')

            # Start our worker threads
            self._threadpool.start()

            # Start our monitor thread
            self._monitor = Monitor(self.monitor_queue,
                                    self.active_queue,
                                    self.timeout,
                                    self._threadpool)
            self._monitor.daemon = True
            self._monitor.start()

            # I know that EXPR and A or B is bad but I'm keeping it for Py2.4
            # compatibility.
            str_extract = lambda l: (l.addr, l.port, l.secure and '*' or '')

            msg = 'Listening on sockets: '
            msg += ', '.join(
                ['%s:%i%s' % str_extract(l) for l in self.listeners])
            log.info(msg)

            for l in self.listeners:
                l.start()

        finally:
            self.startstop_lock.release()

        if background:
            return

        # Thread.isAlive() was removed in Python 3.9; use is_alive().
        while self._monitor.is_alive():
            try:
                time.sleep(THREAD_STOP_CHECK_INTERVAL)
            except KeyboardInterrupt:
                # Capture a keyboard interrupt when running from a console
                break
            except Exception:
                if self._monitor.is_alive():
                    log.error(traceback.format_exc())
                    continue

        return self.stop()

    def stop(self, stoplogging=False):
        """Stop listeners, monitor and workers, in that order.

        :param stoplogging: deprecated.  When true, ``logging.shutdown()``
            is still called and a ``DeprecationWarning`` is issued.
        """
        log.info('Stopping %s' % SERVER_SOFTWARE)

        self.startstop_lock.acquire()

        try:
            # Stop listeners
            for l in self.listeners:
                l.ready = False

            # Encourage a context switch
            time.sleep(0.01)

            for l in self.listeners:
                # Listener.isAlive reports the listener's inner thread.
                if l.isAlive():
                    l.join()

            # Stop Monitor (absent when start() was never called)
            monitor = getattr(self, '_monitor', None)
            if monitor is not None:
                monitor.stop()
                if monitor.is_alive():
                    monitor.join()

            # Stop Worker threads
            self._threadpool.stop()

            if stoplogging:
                logging.shutdown()
                msg = ("Calling logging.shutdown() is now the "
                       "responsibility of the application developer.  "
                       "Please update your applications to no longer "
                       "call rocket.stop(True)")
                # ``warnings.DeprecationWarning`` does not exist; warn with
                # the builtin category instead.
                import warnings
                warnings.warn(msg, DeprecationWarning, stacklevel=2)

        finally:
            self.startstop_lock.release()

    def restart(self):
        """Stop and then start the server again."""
        self.stop()
        self.start()
802 803
def CherryPyWSGIServer(bind_addr,
                       wsgi_app,
                       numthreads=10,
                       server_name=None,
                       max=-1,
                       request_queue_size=5,
                       timeout=10,
                       shutdown_timeout=5):
    """Build a ``Rocket`` server from CherryPy wsgiserver-style arguments.

    ``numthreads`` becomes the minimum thread count and a negative ``max``
    is passed on as 0.  ``server_name`` and ``shutdown_timeout`` are
    accepted but not used.
    """
    if max < 0:
        thread_cap = 0
    else:
        thread_cap = max

    app_info = {'wsgi_app': wsgi_app}
    return Rocket(bind_addr, 'wsgi', app_info,
                  min_threads=numthreads,
                  max_threads=thread_cap,
                  queue_size=request_queue_size,
                  timeout=timeout)
821 822 # Monolithic build...end of module: rocket/main.py 823 # Monolithic build...start of module: rocket/monitor.py 824 825 # Import System Modules 826 import time 827 import logging 828 import select 829 from threading import Thread 830 831 # Import Package Modules 832 # package imports removed in monolithic build 833 834
class Monitor(Thread):
    """Monitor worker class.

    Takes connections from ``monitor_queue``, waits with ``select`` until
    they become readable and then puts them on ``active_queue``.
    Connections idle for ``timeout`` seconds or longer are closed.  Each
    pass ends with a request to the thread pool to resize itself.
    """

    def __init__(self,
                 monitor_queue,
                 active_queue,
                 timeout,
                 threadpool,
                 *args,
                 **kwargs):

        Thread.__init__(self, *args, **kwargs)

        self._threadpool = threadpool

        # Queues shared with the workers, and the idle limit in seconds
        self.monitor_queue = monitor_queue
        self.active_queue = active_queue
        self.timeout = timeout

        self.log = logging.getLogger('Rocket.Monitor')
        self.log.addHandler(NullHandler())

        self.connections = set()
        self.active = False

    def run(self):
        """Thread body: loop until ``stop`` clears ``self.active``."""
        self.active = True
        watched = list()
        dirty = False

        # Start from an empty queue
        while not self.monitor_queue.empty():
            self.monitor_queue.get()

        if __debug__:
            self.log.debug('Entering monitor loop.')

        while self.active:

            # Pull newly parked connections into the selection pool
            while not self.monitor_queue.empty():
                if __debug__:
                    self.log.debug('In "receive timed-out connections" loop.')

                conn = self.monitor_queue.get()

                if conn is None:
                    # A non-client is a signal to die
                    if __debug__:
                        self.log.debug('Received a death threat.')
                    self.stop()
                    break

                self.log.debug('Received a timed out connection.')

                if __debug__:
                    assert(conn not in self.connections)

                if IS_JYTHON:
                    # Jython requires a socket to be in Non-blocking mode in
                    # order to select on it.
                    conn.setblocking(False)

                if __debug__:
                    self.log.debug('Adding connection to monitor list.')

                self.connections.add(conn)
                dirty = True

            # Rebuild the select list only when the pool changed
            if dirty:
                watched = list(self.connections)
                dirty = False

            try:
                if watched:
                    readable = select.select(watched,
                                             [],
                                             [],
                                             THREAD_STOP_CHECK_INTERVAL)[0]
                else:
                    time.sleep(THREAD_STOP_CHECK_INTERVAL)
                    readable = []

                if not self.active:
                    break

                # Hand readable connections back to the workers
                for ready in readable:
                    if __debug__:
                        self.log.debug('Restoring readable connection')

                    if IS_JYTHON:
                        # Jython requires a socket to be in Non-blocking mode
                        # in order to select on it, but the rest of the code
                        # requires that it be in blocking mode.
                        ready.setblocking(True)

                    ready.start_time = time.time()
                    self.active_queue.put(ready)

                    self.connections.remove(ready)
                    dirty = True

            except:
                # Errors are expected once stop() has closed the sockets.
                if not self.active:
                    break
                raise

            # Close connections that have been idle for too long
            if self.timeout:
                now = time.time()
                stale = set([c for c in self.connections
                             if (now - c.start_time) >= self.timeout])

                for old in stale:
                    if __debug__:
                        # "EXPR and A or B" kept for Py2.4 compatibility
                        data = (old.client_addr,
                                old.server_port,
                                old.ssl and '*' or '')
                        self.log.debug(
                            'Flushing stale connection: %s:%i%s' % data)

                    self.connections.remove(old)
                    dirty = True

                    old.close()

            # Dynamically resize the threadpool to adapt to our changing needs.
            self._threadpool.dynamic_resize()

    def stop(self):
        """Deactivate the monitor and close every connection it holds."""
        self.active = False

        if __debug__:
            self.log.debug('Flushing waiting connections')

        while self.connections:
            self.connections.pop().close()

        if __debug__:
            self.log.debug('Flushing queued connections')

        while not self.monitor_queue.empty():
            queued = self.monitor_queue.get()
            if queued is not None:
                queued.close()

        # Place a None sentry value to cause the monitor to die.
        self.monitor_queue.put(None)
1003 1004 # Monolithic build...end of module: rocket/monitor.py 1005 # Monolithic build...start of module: rocket/threadpool.py 1006 1007 # Import System Modules 1008 import logging 1009 # Import Package Modules 1010 # package imports removed in monolithic build 1011 1012 1013 # Setup Logging 1014 log = logging.getLogger('Rocket.Errors.ThreadPool') 1015 log.addHandler(NullHandler()) 1016 1017
class ThreadPool:
    """The ThreadPool class is a container class for all the worker threads. It
    manages the number of actively running threads.

    A ``max_threads`` of 0 is treated as "no upper bound" by ``grow`` and
    ``dynamic_resize``.
    """

    def __init__(self,
                 method,
                 app_info,
                 active_queue,
                 monitor_queue,
                 min_threads=DEFAULTS['MIN_THREADS'],
                 max_threads=DEFAULTS['MAX_THREADS'],
                 ):
        """
        :param method: worker class; instantiated as
            ``method(app_info, active_queue, monitor_queue)``.
        :param app_info: dictionary shared with the workers; replaced by a
            new dictionary when it is not a dict.
        :param active_queue: queue the workers read connections from.
        :param monitor_queue: queue passed through to the workers.
        :param min_threads: number of workers started by ``start``.
        :param max_threads: upper bound on workers, 0 for none.
        """
        if __debug__:
            log.debug("Initializing ThreadPool.")

        self.check_for_dead_threads = 0
        self.active_queue = active_queue

        self.worker_class = method
        self.min_threads = min_threads
        self.max_threads = max_threads
        self.monitor_queue = monitor_queue
        self.stop_server = False
        self.alive = False

        # TODO - Optimize this based on some real-world usage data
        self.grow_threshold = int(max_threads / 10) + 2

        if not isinstance(app_info, dict):
            app_info = dict()

        if has_futures and app_info.get('futures'):
            app_info['executor'] = WSGIExecutor(max([DEFAULTS['MIN_THREADS'],
                                                     2]))

        app_info.update(max_threads=max_threads,
                        min_threads=min_threads)

        self.app_info = app_info

        self.threads = set()

    def start(self):
        """Start ``min_threads`` workers and mark the pool alive."""
        self.stop_server = False
        if __debug__:
            log.debug("Starting threads.")

        self.grow(self.min_threads)

        self.alive = True

    def stop(self):
        """Ask every worker to exit, wait for them and forget them."""
        self.alive = False

        if __debug__:
            log.debug("Stopping threads.")

        self.stop_server = True

        # Prompt the threads to die
        self.shrink(len(self.threads))

        # Stop futures initially
        if has_futures and self.app_info.get('futures'):
            if __debug__:
                log.debug("Future executor is present.  Python will not "
                          "exit until all jobs have finished.")
            self.app_info['executor'].shutdown(wait=False)

        # Wait until they pull the trigger.  Thread.isAlive() was removed
        # in Python 3.9; is_alive() exists since Python 2.6.
        for t in self.threads:
            if t.is_alive():
                t.join()

        # Clean up the mess
        self.bring_out_your_dead()

    def bring_out_your_dead(self):
        """Remove dead threads from the pool."""
        dead_threads = [t for t in self.threads if not t.is_alive()]
        for t in dead_threads:
            if __debug__:
                log.debug("Removing dead thread: %s." % t.getName())
            # discard() never complains about a missing member.
            self.threads.discard(t)
        self.check_for_dead_threads -= len(dead_threads)

    def grow(self, amount=None):
        """Start up to *amount* new workers.

        A falsy *amount* means ``max_threads``.  While the pool is alive
        and ``max_threads`` is non-zero, the total is capped at
        ``max_threads``.  Does nothing once the server is stopping.
        """
        if self.stop_server:
            return

        if not amount:
            amount = self.max_threads

        # Only cap when a limit is configured: with max_threads == 0 the
        # old ``max_threads - len(threads)`` went negative, so the pool
        # could never grow.
        if self.alive and self.max_threads:
            amount = min([amount, self.max_threads - len(self.threads)])

        amount = max([amount, 0])

        if __debug__:
            log.debug("Growing by %i." % amount)

        for x in range(amount):
            worker = self.worker_class(self.app_info,
                                       self.active_queue,
                                       self.monitor_queue)

            worker.daemon = True
            self.threads.add(worker)
            worker.start()

    def shrink(self, amount=1):
        """Queue *amount* ``None`` sentinels, each telling one worker to exit."""
        if __debug__:
            log.debug("Shrinking by %i." % amount)

        self.check_for_dead_threads += amount

        for x in range(amount):
            self.active_queue.put(None)

    def dynamic_resize(self):
        """Grow or shrink the pool according to the queue backlog."""
        if (self.max_threads > self.min_threads or self.max_threads == 0):
            if self.check_for_dead_threads > 0:
                self.bring_out_your_dead()

            queueSize = self.active_queue.qsize()
            threadCount = len(self.threads)

            if __debug__:
                log.debug("Examining ThreadPool. %i threads and %i Q'd conxions"
                          % (threadCount, queueSize))

            if queueSize == 0 and threadCount > self.min_threads:
                self.shrink()

            elif queueSize > self.grow_threshold:

                self.grow(queueSize)
1166 1167 # Monolithic build...end of module: rocket/threadpool.py 1168 # Monolithic build...start of module: rocket/worker.py 1169 1170 # Import System Modules 1171 import re 1172 import sys 1173 import socket 1174 import logging 1175 import traceback 1176 from wsgiref.headers import Headers 1177 from threading import Thread 1178 from datetime import datetime 1179 1180 try: 1181 from urllib import unquote 1182 except ImportError: 1183 from urllib.parse import unquote 1184 1185 try: 1186 from io import StringIO 1187 except ImportError: 1188 try: 1189 from cStringIO import StringIO 1190 except ImportError: 1191 from StringIO import StringIO 1192 1193 try: 1194 from ssl import SSLError 1195 except ImportError:
1196 - class SSLError(socket.error):
1197 pass
1198 # Import Package Modules 1199 # package imports removed in monolithic build 1200 1201 1202 # Define Constants 1203 re_SLASH = re.compile('%2F', re.IGNORECASE) 1204 re_REQUEST_LINE = re.compile(r"""^ 1205 (?P<method>OPTIONS|GET|HEAD|POST|PUT|DELETE|TRACE|CONNECT) # Request Method 1206 \ # (single space) 1207 ( 1208 (?P<scheme>[^:/]+) # Scheme 1209 (://) # 1210 (?P<host>[^/]+) # Host 1211 )? # 1212 (?P<path>(\*|/[^ \?]*)) # Path 1213 (\? (?P<query_string>[^ ]*))? # Query String 1214 \ # (single space) 1215 (?P<protocol>HTTPS?/1\.[01]) # Protocol 1216 $ 1217 """, re.X) 1218 LOG_LINE = '%(client_ip)s - "%(request_line)s" - %(status)s %(size)s' 1219 RESPONSE = '''\ 1220 %s %s 1221 Content-Length: %i 1222 Content-Type: %s 1223 1224 %s 1225 ''' 1226 if IS_JYTHON: 1227 HTTP_METHODS = set(['OPTIONS', 'GET', 'HEAD', 'POST', 'PUT', 1228 'DELETE', 'TRACE', 'CONNECT']) 1229 1230
1231 -class Worker(Thread):
1232 """The Worker class is a base class responsible for receiving connections 1233 and (a subclass) will run an application to process the connection """ 1234
def __init__(self,
             app_info,
             active_queue,
             monitor_queue,
             *args,
             **kwargs):
    """Initialise the thread and the per-worker request state.

    :param app_info: application settings handed over by the thread pool.
    :param active_queue: queue this worker takes connections from.
    :param monitor_queue: queue that receives timed-out connections.
    """
    Thread.__init__(self, *args, **kwargs)

    # Queues and application settings
    self.monitor_queue = monitor_queue
    self.active_queue = active_queue
    self.app_info = app_info

    # State describing the current request/response
    self.protocol = 'HTTP/1.1'
    self.request_line = ""
    self.closeConnection = True
    self.status = "200 OK"
    self.size = 0

    # Request log and a per-thread error log
    request_logger = logging.getLogger('Rocket.Requests')
    request_logger.addHandler(NullHandler())
    self.req_log = request_logger

    error_logger = logging.getLogger('Rocket.Errors.' + self.getName())
    error_logger.addHandler(NullHandler())
    self.err_log = error_logger
1262
1263 - def _handleError(self, typ, val, tb):
1264 if typ == SSLError: 1265 if 'timed out' in str(val.args[0]): 1266 typ = SocketTimeout 1267 if typ == SocketTimeout: 1268 if __debug__: 1269 self.err_log.debug('Socket timed out') 1270 self.monitor_queue.put(self.conn) 1271 return True 1272 if typ == SocketClosed: 1273 self.closeConnection = True 1274 if __debug__: 1275 self.err_log.debug('Client closed socket') 1276 return False 1277 if typ == BadRequest: 1278 self.closeConnection = True 1279 if __debug__: 1280 self.err_log.debug('Client sent a bad request') 1281 return True 1282 if typ == socket.error: 1283 self.closeConnection = True 1284 if val.args[0] in IGNORE_ERRORS_ON_CLOSE: 1285 if __debug__: 1286 self.err_log.debug('Ignorable socket Error received...' 1287 'closing connection.') 1288 return False 1289 else: 1290 self.status = "999 Utter Server Failure" 1291 tb_fmt = traceback.format_exception(typ, val, tb) 1292 self.err_log.error('Unhandled Error when serving ' 1293 'connection:\n' + '\n'.join(tb_fmt)) 1294 return False 1295 1296 self.closeConnection = True 1297 tb_fmt = traceback.format_exception(typ, val, tb) 1298 self.err_log.error('\n'.join(tb_fmt)) 1299 self.send_response('500 Server Error') 1300 return False
1301
1302 - def run(self):
1303 if __debug__: 1304 self.err_log.debug('Entering main loop.') 1305 1306 # Enter thread main loop 1307 while True: 1308 conn = self.active_queue.get() 1309 1310 if not conn: 1311 # A non-client is a signal to die 1312 if __debug__: 1313 self.err_log.debug('Received a death threat.') 1314 return conn 1315 1316 if isinstance(conn, tuple): 1317 conn = Connection(*conn) 1318 1319 self.conn = conn 1320 1321 if conn.ssl != conn.secure: 1322 self.err_log.info('Received HTTP connection on HTTPS port.') 1323 self.send_response('400 Bad Request') 1324 self.closeConnection = True 1325 conn.close() 1326 continue 1327 else: 1328 if __debug__: 1329 self.err_log.debug('Received a connection.') 1330 self.closeConnection = False 1331 1332 # Enter connection serve loop 1333 while True: 1334 if __debug__: 1335 self.err_log.debug('Serving a request') 1336 try: 1337 self.run_app(conn) 1338 except: 1339 exc = sys.exc_info() 1340 handled = self._handleError(*exc) 1341 if handled: 1342 break 1343 finally: 1344 if self.request_line: 1345 log_info = dict(client_ip=conn.client_addr, 1346 time=datetime.now().strftime('%c'), 1347 status=self.status.split(' ')[0], 1348 size=self.size, 1349 request_line=self.request_line) 1350 self.req_log.info(LOG_LINE % log_info) 1351 1352 if self.closeConnection: 1353 try: 1354 conn.close() 1355 except: 1356 self.err_log.error(str(traceback.format_exc())) 1357 1358 break
1359
1360 - def run_app(self, conn):
1361 # Must be overridden with a method reads the request from the socket 1362 # and sends a response. 1363 self.closeConnection = True 1364 raise NotImplementedError('Overload this method!')
1365
1366 - def send_response(self, status):
1367 stat_msg = status.split(' ', 1)[1] 1368 msg = RESPONSE % (self.protocol, 1369 status, 1370 len(stat_msg), 1371 'text/plain', 1372 stat_msg) 1373 try: 1374 self.conn.sendall(b(msg)) 1375 except socket.timeout: 1376 self.closeConnection = True 1377 msg = 'Tried to send "%s" to client but received timeout error' 1378 self.err_log.error(msg % status) 1379 except socket.error: 1380 self.closeConnection = True 1381 msg = 'Tried to send "%s" to client but received socket error' 1382 self.err_log.error(msg % status)
1383
1384 - def read_request_line(self, sock_file):
1385 self.request_line = '' 1386 try: 1387 # Grab the request line 1388 d = sock_file.readline() 1389 if PY3K: 1390 d = d.decode('ISO-8859-1') 1391 1392 if d == '\r\n': 1393 # Allow an extra NEWLINE at the beginning per HTTP 1.1 spec 1394 if __debug__: 1395 self.err_log.debug('Client sent newline') 1396 1397 d = sock_file.readline() 1398 if PY3K: 1399 d = d.decode('ISO-8859-1') 1400 except socket.timeout: 1401 raise SocketTimeout('Socket timed out before request.') 1402 except TypeError: 1403 raise SocketClosed( 1404 'SSL bug caused closure of socket. See ' 1405 '"https://groups.google.com/d/topic/web2py/P_Gw0JxWzCs".') 1406 1407 d = d.strip() 1408 1409 if not d: 1410 if __debug__: 1411 self.err_log.debug( 1412 'Client did not send a recognizable request.') 1413 raise SocketClosed('Client closed socket.') 1414 1415 self.request_line = d 1416 1417 # NOTE: I've replaced the traditional method of procedurally breaking 1418 # apart the request line with a (rather unsightly) regular expression. 1419 # However, Java's regexp support sucks so bad that it actually takes 1420 # longer in Jython to process the regexp than procedurally. So I've 1421 # left the old code here for Jython's sake...for now. 1422 if IS_JYTHON: 1423 return self._read_request_line_jython(d) 1424 1425 match = re_REQUEST_LINE.match(d) 1426 1427 if not match: 1428 self.send_response('400 Bad Request') 1429 raise BadRequest 1430 1431 req = match.groupdict() 1432 for k, v in req.iteritems(): 1433 if not v: 1434 req[k] = "" 1435 if k == 'path': 1436 req['path'] = r'%2F'.join( 1437 [unquote(x) for x in re_SLASH.split(v)]) 1438 1439 self.protocol = req['protocol'] 1440 return req
1441
1442 - def _read_request_line_jython(self, d):
1443 d = d.strip() 1444 try: 1445 method, uri, proto = d.split(' ') 1446 if not proto.startswith('HTTP') or \ 1447 proto[-3:] not in ('1.0', '1.1') or \ 1448 method not in HTTP_METHODS: 1449 self.send_response('400 Bad Request') 1450 raise BadRequest 1451 except ValueError: 1452 self.send_response('400 Bad Request') 1453 raise BadRequest 1454 1455 req = dict(method=method, protocol=proto) 1456 scheme = '' 1457 host = '' 1458 if uri == '*' or uri.startswith('/'): 1459 path = uri 1460 elif '://' in uri: 1461 scheme, rest = uri.split('://') 1462 host, path = rest.split('/', 1) 1463 path = '/' + path 1464 else: 1465 self.send_response('400 Bad Request') 1466 raise BadRequest 1467 1468 query_string = '' 1469 if '?' in path: 1470 path, query_string = path.split('?', 1) 1471 1472 path = r'%2F'.join([unquote(x) for x in re_SLASH.split(path)]) 1473 1474 req.update(path=path, 1475 query_string=query_string, 1476 scheme=scheme.lower(), 1477 host=host) 1478 return req
1479
1480 - def read_headers(self, sock_file):
1481 try: 1482 headers = dict() 1483 lname = None 1484 lval = None 1485 while True: 1486 l = sock_file.readline() 1487 1488 if PY3K: 1489 try: 1490 l = str(l, 'ISO-8859-1') 1491 except UnicodeDecodeError: 1492 self.err_log.warning( 1493 'Client sent invalid header: ' + repr(l)) 1494 1495 if l.strip().replace('\0', '') == '': 1496 break 1497 1498 if l[0] in ' \t' and lname: 1499 # Some headers take more than one line 1500 lval += ' ' + l.strip() 1501 else: 1502 # HTTP header values are latin-1 encoded 1503 l = l.split(':', 1) 1504 # HTTP header names are us-ascii encoded 1505 1506 lname = l[0].strip().upper().replace('-', '_') 1507 lval = l[-1].strip() 1508 1509 headers[str(lname)] = str(lval) 1510 1511 except socket.timeout: 1512 raise SocketTimeout("Socket timed out before request.") 1513 1514 return headers
1515 1516
class SocketTimeout(Exception):
    """Exception for when a socket times out between requests."""
1520 1521
class BadRequest(Exception):
    """Exception for when a client sends an incomprehensible request."""
1525 1526
class SocketClosed(Exception):
    """Exception for when a socket is closed by the client."""
1530 1531
class ChunkedReader(object):
    """File-like reader that decodes a chunked request body.

    Wraps a stream positioned at the start of a chunked body and returns
    only the chunk payloads, stopping at the zero-length chunk or at the
    end of the stream.
    """

    def __init__(self, sock_file):
        """Wrap *sock_file*, an object providing ``readline`` and ``read``."""
        self.stream = sock_file
        # Bytes of the current chunk that have not been returned yet.
        self.chunk_size = 0

    def _read_header(self):
        """Read the next chunk-size line and return the size as an int.

        Blank lines (the CRLF that follows each chunk's data) are skipped.
        Returns 0 for the terminating chunk, for a malformed size, and when
        the stream is exhausted.
        """
        while True:
            raw = self.stream.readline()
            if not raw:
                # End of stream: stop instead of re-reading '' forever.
                return 0
            line = raw.strip()
            if line:
                break

        try:
            # Ignore chunk extensions ("<size>;name=value").
            return int(line.split(b(';'), 1)[0], 16)
        except ValueError:
            return 0

    def read(self, size=-1):
        """Return up to *size* bytes of decoded body data.

        A *size* of None or a negative number reads to the end of the body.
        Fewer bytes are returned when the body ends first.
        """
        read_all = size is None or size < 0
        parts = []
        chunk_size = self.chunk_size

        while read_all or size > 0:
            if not chunk_size:
                chunk_size = self._read_header()
                if not chunk_size:
                    break

            if read_all or size >= chunk_size:
                want = chunk_size
            else:
                want = size

            piece = self.stream.read(want)
            parts.append(piece)
            got = len(piece)
            chunk_size -= got
            if not read_all:
                size -= got

            if got < want:
                # The stream ended inside a chunk; nothing more to read.
                chunk_size = 0
                break

        self.chunk_size = chunk_size
        return b('').join(parts)

    def readline(self):
        """Return the next line of body data, including its newline.

        Returns an empty bytes object once the body is exhausted.
        """
        newline = b('\n')
        parts = []
        c = self.read(1)
        while c and c != newline:
            parts.append(c)
            c = self.read(1)
        parts.append(c)
        return b('').join(parts)

    def readlines(self):
        """Yield every remaining line of the body."""
        line = self.readline()
        while line:
            yield line
            line = self.readline()

    def __iter__(self):
        """Iterate over the remaining lines of the body."""
        return self.readlines()
1579 1580
def get_method(method):
    """Return the worker class registered under *method*.

    The lookup ignores case.  An unknown name raises KeyError.
    """
    registry = {'wsgi': WSGIWorker}
    key = method.lower()
    return registry[key]
# Monolithic build...end of module: rocket/worker.py
# Monolithic build...start of module: rocket/methods/__init__.py

# Monolithic build...end of module: rocket/methods/__init__.py
# Monolithic build...start of module: rocket/methods/wsgi.py

# Import System Modules
import sys
import socket
from wsgiref.headers import Headers
from wsgiref.util import FileWrapper

# Import Package Modules
# package imports removed in monolithic build

if PY3K:
    from email.utils import formatdate
else:
    # Caps Utils for Py2.4 compatibility
    from email.Utils import formatdate

# Define Constants

# CRLF line terminator as bytes.
NEWLINE = b('\r\n')
# Status line followed by the already-formatted header block.
HEADER_RESPONSE = '''HTTP/1.1 %s\r\n%s'''
# WSGI environ entries that are the same for every request; each
# WSGIWorker merges them into its own base environ.
BASE_ENV = {'SERVER_NAME': SERVER_NAME,
            'SCRIPT_NAME': '',  # Direct call WSGI does not need a name
            'wsgi.errors': sys.stderr,
            'wsgi.version': (1, 0),
            'wsgi.multiprocess': False,
            'wsgi.run_once': False,
            'wsgi.file_wrapper': FileWrapper
            }
class WSGIWorker(Worker):
    """Worker that serves each request by calling a WSGI application."""

    def __init__(self, *args, **kwargs):
        """Builds some instance variables that will last the life of the
        thread.

        Arguments are passed to ``Worker.__init__``.  Raises TypeError when
        ``app_info['wsgi_app']`` is not callable.
        """
        Worker.__init__(self, *args, **kwargs)

        if isinstance(self.app_info, dict):
            multithreaded = self.app_info.get('max_threads') != 1
        else:
            multithreaded = False
        self.base_environ = dict(
            {'SERVER_SOFTWARE': self.app_info['server_software'],
             'wsgi.multithread': multithreaded,
             })
        self.base_environ.update(BASE_ENV)

        # Grab our application
        self.app = self.app_info.get('wsgi_app')

        if not hasattr(self.app, "__call__"):
            raise TypeError("The wsgi_app specified (%s) is not a valid WSGI application." % repr(self.app))

        # Enable futures
        if has_futures and self.app_info.get('futures'):
            executor = self.app_info['executor']
            self.base_environ.update({"wsgiorg.executor": executor,
                                      "wsgiorg.futures": executor.futures})

    def build_environ(self, sock_file, conn):
        """Build the execution environment.

        Reads the request line and headers from *sock_file* and returns the
        WSGI environ dict for the request.  Also stores the request method
        in ``self.request_method``.
        """
        # Grab the request line
        request = self.read_request_line(sock_file)

        # Copy the Base Environment
        environ = self.base_environ.copy()

        # Grab the headers
        # items() works on Python 2 and 3; iteritems() is Python 2 only.
        for k, v in self.read_headers(sock_file).items():
            environ[str('HTTP_' + k)] = v

        # Add CGI Variables
        environ['REQUEST_METHOD'] = request['method']
        environ['PATH_INFO'] = request['path']
        environ['SERVER_PROTOCOL'] = request['protocol']
        environ['SERVER_PORT'] = str(conn.server_port)
        environ['REMOTE_PORT'] = str(conn.client_port)
        environ['REMOTE_ADDR'] = str(conn.client_addr)
        environ['QUERY_STRING'] = request['query_string']
        if 'HTTP_CONTENT_LENGTH' in environ:
            environ['CONTENT_LENGTH'] = environ['HTTP_CONTENT_LENGTH']
        if 'HTTP_CONTENT_TYPE' in environ:
            environ['CONTENT_TYPE'] = environ['HTTP_CONTENT_TYPE']

        # Save the request method for later
        self.request_method = environ['REQUEST_METHOD']

        # Add Dynamic WSGI Variables
        if conn.ssl:
            environ['wsgi.url_scheme'] = 'https'
            environ['HTTPS'] = 'on'
            try:
                # NOTE(review): relies on a module-level ``ssl`` name that
                # is not imported in this section -- confirm it is defined
                # elsewhere in the file.
                peercert = conn.socket.getpeercert(binary_form=True)
                environ['SSL_CLIENT_RAW_CERT'] = \
                    peercert and ssl.DER_cert_to_PEM_cert(peercert)
            except Exception:
                # Report through the error log; the former ``print``
                # statement is a syntax error on Python 3.
                self.err_log.warning(
                    'Could not read the client certificate: %s'
                    % sys.exc_info()[1])
        else:
            environ['wsgi.url_scheme'] = 'http'

        # Transfer-coding names are case-insensitive.
        if environ.get('HTTP_TRANSFER_ENCODING', '').lower() == 'chunked':
            environ['wsgi.input'] = ChunkedReader(sock_file)
        else:
            environ['wsgi.input'] = sock_file

        return environ

    def send_headers(self, data, sections):
        """Complete the response headers and send them to the client.

        data     -- the first piece of body data; its length becomes the
                    Content-Length when the response is not chunked.
        sections -- number of items in the application's output, or None
                    when that is unknown.

        Sets ``self.chunked``, ``self.size``, ``self.closeConnection`` and
        ``self.headers_sent``.
        """
        h_set = self.header_set

        # Does the app want us to send output chunked?
        self.chunked = h_set.get('Transfer-Encoding', '').lower() == 'chunked'

        # Add a Date header if it's not there already
        if 'Date' not in h_set:
            h_set['Date'] = formatdate(usegmt=True)

        # Add a Server header if it's not there already
        if 'Server' not in h_set:
            h_set['Server'] = HTTP_SERVER_SOFTWARE

        if 'Content-Length' in h_set:
            self.size = int(h_set['Content-Length'])
        else:
            s = int(self.status.split(' ')[0])
            # Only statuses that may carry a body get framing headers:
            # not 1xx, 204, 205 or 304.
            if (s >= 200 and s not in (204, 205, 304)) and not self.chunked:
                if sections == 1 or self.protocol != 'HTTP/1.1':
                    # Add a Content-Length header because it's not there
                    self.size = len(data)
                    h_set['Content-Length'] = str(self.size)
                else:
                    # If they sent us more than one section, we blow chunks
                    h_set['Transfer-Encoding'] = 'Chunked'
                    self.chunked = True
                    if __debug__:
                        self.err_log.debug('Adding header...'
                                           'Transfer-Encoding: Chunked')

        if 'Connection' not in h_set:
            # If the application did not provide a connection header,
            # fill it in
            client_conn = self.environ.get('HTTP_CONNECTION', '').lower()
            if self.environ['SERVER_PROTOCOL'] == 'HTTP/1.1':
                # HTTP = 1.1 defaults to keep-alive connections
                if client_conn:
                    h_set['Connection'] = client_conn
                else:
                    h_set['Connection'] = 'keep-alive'
            else:
                # HTTP < 1.1 supports keep-alive but it's quirky
                # so we don't support it
                h_set['Connection'] = 'close'

        # Close our connection if we need to.
        self.closeConnection = h_set.get('Connection', '').lower() == 'close'

        # Build our output headers
        header_data = HEADER_RESPONSE % (self.status, str(h_set))

        # Send the headers
        if __debug__:
            self.err_log.debug('Sending Headers: %s' % repr(header_data))
        self.conn.sendall(b(header_data))
        self.headers_sent = True

    def write_warning(self, data, sections=None):
        """The ``write`` callable returned by ``start_response``.

        Logs a deprecation warning and then forwards to ``write``.
        """
        self.err_log.warning('WSGI app called write method directly.  This is '
                             'deprecated behavior.  Please update your app.')
        return self.write(data, sections)

    def write(self, data, sections=None):
        """ Write the data to the output socket.

        Sends the headers first if they have not been sent.  For HEAD
        requests no body data is sent.  Socket errors and timeouts mark the
        connection for closing instead of being raised.
        """

        if self.error[0]:
            # A recorded error replaces the application's status and body.
            self.status = self.error[0]
            data = b(self.error[1])

        if not self.headers_sent:
            self.send_headers(data, sections)

        if self.request_method != 'HEAD':
            try:
                if self.chunked:
                    # Frame the chunk from bytes; formatting bytes with
                    # '%s' on Python 3 would embed their repr (b'...').
                    payload = b(data)
                    self.conn.sendall(b('%x\r\n' % len(payload))
                                      + payload + b('\r\n'))
                else:
                    self.conn.sendall(data)
            except socket.timeout:
                self.closeConnection = True
            except socket.error:
                # But some clients will close the connection before that
                # resulting in a socket error.
                self.closeConnection = True

    def start_response(self, status, response_headers, exc_info=None):
        """ Store the HTTP status and headers to be sent when self.write is
        called.

        Returns the ``write_warning`` callable.  Raises AssertionError when
        headers were already set and no *exc_info* is given.
        """
        if exc_info:
            try:
                if self.headers_sent:
                    # Re-raise original exception if headers sent
                    # because this violates WSGI specification.
                    raise
            finally:
                # Drop the reference to the exception info.
                exc_info = None
        elif self.header_set:
            raise AssertionError("Headers already set!")

        if PY3K and not isinstance(status, str):
            self.status = str(status, 'ISO-8859-1')
        else:
            self.status = status
        # Make sure headers are bytes objects
        try:
            self.header_set = Headers(response_headers)
        except UnicodeDecodeError:
            self.error = ('500 Internal Server Error',
                          'HTTP Headers should be bytes')
            self.err_log.error('Received HTTP Headers from client that contain'
                               ' invalid characters for Latin-1 encoding.')

        return self.write_warning

    def run_app(self, conn):
        """Serve one request on *conn* with the WSGI application.

        Exceptions are not caught here; the Worker class handles them.
        The application's output and the socket file are always closed.
        """
        self.size = 0
        self.header_set = Headers([])
        self.headers_sent = False
        self.error = (None, None)
        self.chunked = False
        sections = None
        output = None

        if __debug__:
            self.err_log.debug('Getting sock_file')

        # Build our file-like object
        if PY3K:
            sock_file = conn.makefile(mode='rb', buffering=BUF_SIZE)
        else:
            sock_file = conn.makefile(BUF_SIZE)

        try:
            # Read the headers and build our WSGI environment
            self.environ = environ = self.build_environ(sock_file, conn)

            # Handle 100 Continue (the expectation is case-insensitive)
            if environ.get('HTTP_EXPECT', '').lower() == '100-continue':
                res = environ['SERVER_PROTOCOL'] + ' 100 Continue\r\n\r\n'
                conn.sendall(b(res))

            # Send it to our WSGI application
            output = self.app(environ, self.start_response)

            if not hasattr(output, '__len__') and not hasattr(output, '__iter__'):
                self.error = ('500 Internal Server Error',
                              'WSGI applications must return a list or '
                              'generator type.')

            if hasattr(output, '__len__'):
                sections = len(output)

            for data in output:
                # Don't send headers until body appears
                if data:
                    self.write(data, sections)

            if self.chunked:
                # If chunked, send our final chunk length.  A HEAD response
                # has no body, so it gets no terminating chunk either.
                if self.request_method != 'HEAD':
                    self.conn.sendall(b('0\r\n\r\n'))
            elif not self.headers_sent:
                # Send headers if the body was empty
                self.send_headers('', sections)

        # Don't capture exceptions here.  The Worker class handles
        # them appropriately.
        finally:
            if __debug__:
                self.err_log.debug('Finally closing output and sock_file')

            if hasattr(output, 'close'):
                output.close()

            sock_file.close()
1870 1871 # Monolithic build...end of module: rocket/methods/wsgi.py 1872