1
2
3
4
5
6
7
8 import sys
9 import errno
10 import socket
11 import logging
12 import platform
13
14
# Release number of this server; interpolated into the banner strings below.
VERSION = '1.2.6'
# Host name of this machine, looked up once at import time.
SERVER_NAME = socket.gethostname()
SERVER_SOFTWARE = 'Rocket %s' % VERSION
# Banner that also carries the running interpreter's version number
# (the first space-separated token of ``sys.version``).
HTTP_SERVER_SOFTWARE = '%s Python/%s' % (
    SERVER_SOFTWARE, sys.version.split(' ')[0])
BUF_SIZE = 16384
SOCKET_TIMEOUT = 10
THREAD_STOP_CHECK_INTERVAL = 1
# True when the interpreter reports 'Java' as its platform system.
IS_JYTHON = platform.system() == 'Java'
# errno codes collected in one set for membership tests on socket errors.
IGNORE_ERRORS_ON_CLOSE = set([errno.ECONNABORTED, errno.ECONNRESET])
DEFAULT_LISTEN_QUEUE_SIZE = 5
DEFAULT_MIN_THREADS = 10
DEFAULT_MAX_THREADS = 0
# The three defaults above, keyed by name for lookup elsewhere in the module.
DEFAULTS = dict(LISTEN_QUEUE_SIZE=DEFAULT_LISTEN_QUEUE_SIZE,
                MIN_THREADS=DEFAULT_MIN_THREADS,
                MAX_THREADS=DEFAULT_MAX_THREADS)

# True on Python 3 and later.
PY3K = sys.version_info[0] > 2
33
34
36 "A Logging handler to prevent library errors."
37 - def emit(self, record):
39
40 if PY3K:
42 """ Convert string/unicode/bytes literals into bytes. This allows for
43 the same code to run on Python 2.x and 3.x. """
44 if isinstance(val, str):
45 return val.encode()
46 else:
47 return val
48
def u(val, encoding="us-ascii"):
    """Convert bytes into a text string so the same code runs on 2.x and 3.x.

    This is the variant selected when ``PY3K`` is true.

    val: value to convert; only ``bytes`` instances are decoded.
    encoding: codec used for decoding (default ``"us-ascii"``).

    Returns the decoded string for ``bytes`` input and ``val`` itself,
    untouched, for anything else.  Decoding errors propagate to the caller.
    """
    if isinstance(val, bytes):
        return val.decode(encoding)
    return val
56
57 else:
59 """ Convert string/unicode/bytes literals into bytes. This allows for
60 the same code to run on Python 2.x and 3.x. """
61 if isinstance(val, unicode):
62 return val.encode()
63 else:
64 return val
65
def u(val, encoding="us-ascii"):
    """Convert bytes into a unicode string so the same code runs on 2.x and 3.x.

    This is the variant selected when ``PY3K`` is false, where ``str`` is the
    byte-string type and therefore the type that needs decoding.

    val: value to convert; only ``str`` instances are decoded.
    encoding: codec used for decoding (default ``"us-ascii"``).

    Returns the decoded value for ``str`` input and ``val`` itself,
    untouched, for anything else.
    """
    if isinstance(val, str):
        return val.decode(encoding)
    return val
73
74
75
76
77 __all__ = ['VERSION', 'SERVER_SOFTWARE', 'HTTP_SERVER_SOFTWARE', 'BUF_SIZE',
78 'IS_JYTHON', 'IGNORE_ERRORS_ON_CLOSE', 'DEFAULTS', 'PY3K', 'b', 'u',
79 'Rocket', 'CherryPyWSGIServer', 'SERVER_NAME', 'NullHandler']
80
81
82
83
84
85 import sys
86 import time
87 import socket
88 try:
89 import ssl
90 has_ssl = True
91 except ImportError:
92 has_ssl = False
93
94
95
96
97
98
100 __slots__ = [
101 'setblocking',
102 'sendall',
103 'shutdown',
104 'makefile',
105 'fileno',
106 'client_addr',
107 'client_port',
108 'server_port',
109 'socket',
110 'start_time',
111 'ssl',
112 'secure',
113 'recv',
114 'send',
115 'read',
116 'write'
117 ]
118
119 - def __init__(self, sock_tuple, port, secure=False):
146
148 pending = len(buf)
149 offset = 0
150 while pending:
151 try:
152 sent = self.socket.send(buf[offset:])
153 pending -= sent
154 offset += sent
155 except socket.error:
156 import errno
157 info = sys.exc_info()
158 if info[1].args[0] != errno.EAGAIN:
159 raise
160 return offset
161
162
163
164
165
167 if hasattr(self.socket, '_sock'):
168 try:
169 self.socket._sock.close()
170 except socket.error:
171 info = sys.exc_info()
172 if info[1].args[0] != socket.EBADF:
173 raise info[1]
174 else:
175 pass
176 self.socket.close()
177
178
179
180
181
182 import socket
183 try:
184 from io import StringIO
185 except ImportError:
186 try:
187 from cStringIO import StringIO
188 except ImportError:
189 from StringIO import StringIO
190
191
192
193
196 self.conn = conn
197 self.buf_size = buf_size
198 self.buffer = StringIO()
199 self.content_length = None
200
201 if self.conn.socket.gettimeout() == 0.0:
202 self.read = self.non_blocking_read
203 else:
204 self.read = self.blocking_read
205
208
def recv(self, size):
    """Receive up to *size* bytes from the wrapped connection.

    Returns whatever ``self.conn.recv(size)`` returns.  Every socket error
    propagates to the caller unchanged.

    The previous implementation wrapped the call in a retry loop whose
    "retryable" error set was empty, so it re-raised on every error and
    could itself fail with IndexError when the exception carried no args.
    No error code is treated as retryable, so the call is made directly.
    """
    return self.conn.recv(size)
219
221 data = self.readline()
222 if data == '':
223 raise StopIteration
224 return data
225
227
228 bufr = self.buffer
229 bufr.seek(0, 2)
230 if size is None:
231 while True:
232 data = self.recv(self.buf_size)
233 if not data:
234 break
235 bufr.write(data)
236
237 self.buffer = StringIO()
238
239 return bufr.getvalue()
240 else:
241 buf_len = self.buffer.tell()
242 if buf_len >= size:
243 bufr.seek(0)
244 data = bufr.read(size)
245 self.buffer = StringIO(bufr.read())
246 return data
247
248 self.buffer = StringIO()
249 while True:
250 remaining = size - buf_len
251 data = self.recv(remaining)
252
253 if not data:
254 break
255
256 n = len(data)
257 if n == size and not buf_len:
258 return data
259
260 if n == remaining:
261 bufr.write(data)
262 del data
263 break
264
265 bufr.write(data)
266 buf_len += n
267 del data
268
269 return bufr.getvalue()
270
272 if length is None:
273 if self.content_length is not None:
274 length = self.content_length
275 else:
276 length = 1
277
278 try:
279 data = self.conn.recv(length)
280 except:
281 data = b('')
282
283 return data
284
286 data = b("")
287 char = self.read(1)
288 while char != b('\n') and char is not b(''):
289 line = repr(char)
290 data += char
291 char = self.read(1)
292 data += char
293 return data
294
297
299 self.conn = None
300 self.content_length = None
301
302
303
304
305
306 import time
307 try:
308 from concurrent.futures import Future, ThreadPoolExecutor
309 from concurrent.futures.thread import _WorkItem
310 has_futures = True
311 except ImportError:
312 has_futures = False
313
316
319
322
323
325 - def __init__(self, f_dict, *args, **kwargs):
326 Future.__init__(self, *args, **kwargs)
327
328 self.timeout = None
329
330 self._mem_dict = f_dict
331 self._lifespan = 30
332 self._name = None
333 self._start_time = time.time()
334
340
def remember(self, name, lifespan=None):
    """Register this future under *name* in the shared ``_mem_dict``.

    name: key to store the future under.
    lifespan: optional replacement for ``self._lifespan``; a falsy value
        keeps the current lifespan.

    Returns ``self`` so the call can be chained.

    Raises NameError when *name* is already present in the dict.  The check
    runs before any attribute is modified, so a rejected call leaves the
    future exactly as it was.
    """
    if name in self._mem_dict:
        raise NameError('Cannot remember future by name "%s". ' % name +
                        'A future already exists with that name.')

    self._lifespan = lifespan or self._lifespan
    self._name = name
    self._mem_dict[name] = self

    return self
351
353 if self._name in self._mem_dict and self._mem_dict[self._name] is self:
354 del self._mem_dict[self._name]
355 self._name = None
356
357
359 - def __init__(self, future, fn, args, kwargs):
360 self.future = future
361 self.fn = fn
362 self.args = args
363 self.kwargs = kwargs
364
366 if not self.future.set_running_or_notify_cancel():
367 return
368
369 try:
370 result = self.fn(*self.args, **self.kwargs)
371 except BaseException:
372 e = sys.exc_info()[1]
373 self.future.set_exception(e)
374 else:
375 self.future.set_result(result)
376
377
379 multithread = True
380 multiprocess = False
381
383 ThreadPoolExecutor.__init__(self, *args, **kwargs)
384
385 self.futures = dict()
386
def submit(self, fn, *args, **kwargs):
    """Queue ``fn(*args, **kwargs)`` for execution and return its future.

    The returned object is a ``WSGIFuture`` bound to ``self.futures``.

    Raises RuntimeError when the executor has already been shut down.
    """
    # A blocking acquire() always succeeds, so there is no "could not lock"
    # outcome to report.  The release sits in ``finally`` so an exception
    # raised while queueing the work item cannot leave the lock held.
    self._shutdown_lock.acquire()
    try:
        if self._shutdown:
            raise RuntimeError(
                'Cannot schedule new futures after shutdown')

        f = WSGIFuture(self.futures)
        w = _WorkItem(f, fn, args, kwargs)

        self._work_queue.put(w)
        self._adjust_thread_count()
        return f
    finally:
        self._shutdown_lock.release()
403
404
406 "Futures middleware that adds a Futures Executor to the environment"
410
411 - def __call__(self, environ, start_response):
415
416
417
418
419
420 import os
421 import socket
422 import logging
423 import traceback
424 from threading import Thread
425
426 try:
427 import ssl
428 from ssl import SSLError
429 has_ssl = True
430 except ImportError:
431 has_ssl = False
432
435
436
437
438
440 """The Listener class is a class responsible for accepting connections
441 and queuing them to be processed by a worker thread."""
442
def __init__(self, interface, queue_size, active_queue, *args, **kwargs):
    """Create, configure and bind the listening socket for *interface*.

    interface: sequence indexed as ``(address, port[, key file,
        certificate file[, root CA certificate file]])``.  Four or more
        entries mark the listener as secure; a truthy fifth entry requests
        client certificates.
    queue_size: backlog passed to ``socket.listen``.
    active_queue: queue stored on the instance as ``self.active_queue``.
    *args, **kwargs: forwarded to ``Thread.__init__``.

    Configuration problems are written to the error log instead of being
    raised; ``self.ready`` is True only when the socket is bound and
    listening, and ``self.listener`` is only set in that case.
    """
    Thread.__init__(self, *args, **kwargs)

    self.active_queue = active_queue
    self.interface = interface
    self.addr = interface[0]
    self.port = interface[1]
    self.secure = len(interface) >= 4
    self.clientcert_req = (len(interface) == 5 and interface[4])

    self.thread = None
    self.ready = False

    self.err_log = logging.getLogger('Rocket.Errors.Port%i' % self.port)
    self.err_log.addHandler(NullHandler())

    # Check the TLS prerequisites before any socket exists, so that an early
    # return cannot leave an open socket behind.
    if self.secure:
        if not has_ssl:
            self.err_log.error("ssl module required to serve HTTPS.")
            return

        required = [("key", interface[2]), ("certificate", interface[3])]
        if self.clientcert_req:
            required.append(("root ca certificate", interface[4]))

        for label, path in required:
            if not os.path.exists(path):
                data = (label, path, interface[0], interface[1])
                self.err_log.error("Cannot find %s file "
                                   "'%s'. Cannot bind to %s:%s" % data)
                return

    # An address containing ':' is taken to be IPv6.
    if ':' in self.addr:
        family = socket.AF_INET6
    else:
        family = socket.AF_INET
    listener = socket.socket(family, socket.SOCK_STREAM)

    # Socket options are best effort: a failure is logged and ignored.
    try:
        listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    except Exception:
        msg = "Cannot share socket. Using %s:%i exclusively."
        self.err_log.warning(msg % (self.addr, self.port))

    try:
        if not IS_JYTHON:
            listener.setsockopt(socket.IPPROTO_TCP,
                                socket.TCP_NODELAY,
                                1)
    except Exception:
        msg = "Cannot set TCP_NODELAY, things might run a little slower"
        self.err_log.warning(msg)

    try:
        listener.bind((self.addr, self.port))
    except Exception:
        msg = "Socket %s:%i in use by other process and it won't share."
        self.err_log.error(msg % (self.addr, self.port))
        # Release the descriptor; this listener will never become ready.
        listener.close()
        return

    # accept() must time out periodically so the accept loop can re-check
    # self.ready.
    listener.settimeout(THREAD_STOP_CHECK_INTERVAL)
    listener.listen(queue_size)

    self.listener = listener
    self.ready = True
524
526 try:
527 if self.clientcert_req:
528 ca_certs = self.interface[4]
529 cert_reqs = ssl.CERT_OPTIONAL
530 sock = ssl.wrap_socket(sock,
531 keyfile=self.interface[2],
532 certfile=self.interface[3],
533 server_side=True,
534 cert_reqs=cert_reqs,
535 ca_certs=ca_certs,
536 ssl_version=ssl.PROTOCOL_SSLv23)
537 else:
538 sock = ssl.wrap_socket(sock,
539 keyfile=self.interface[2],
540 certfile=self.interface[3],
541 server_side=True,
542 ssl_version=ssl.PROTOCOL_SSLv23)
543 except SSLError:
544
545
546
547 pass
548
549 return sock
550
552 if not self.ready:
553 self.err_log.warning('Listener started when not ready.')
554 return
555
556 if self.thread is not None and self.thread.isAlive():
557 self.err_log.warning('Listener already running.')
558 return
559
560 self.thread = Thread(target=self.listen, name="Port" + str(self.port))
561
562 self.thread.start()
563
565 if self.thread is None:
566 return False
567
568 return self.thread.isAlive()
569
571 if self.thread is None:
572 return
573
574 self.ready = False
575
576 self.thread.join()
577
578 del self.thread
579 self.thread = None
580 self.ready = True
581
583 if __debug__:
584 self.err_log.debug('Entering main loop.')
585 while True:
586 try:
587 sock, addr = self.listener.accept()
588
589 if self.secure:
590 sock = self.wrap_socket(sock)
591
592 self.active_queue.put(((sock, addr),
593 self.interface[1],
594 self.secure))
595
596 except socket.timeout:
597
598
599
600
601 if not self.ready:
602 if __debug__:
603 self.err_log.debug('Listener exiting.')
604 return
605 else:
606 continue
607 except:
608 self.err_log.error(traceback.format_exc())
609
610
611
612
613
614 import sys
615 import time
616 import socket
617 import logging
618 import traceback
619 from threading import Lock
620 try:
621 from queue import Queue
622 except ImportError:
623 from Queue import Queue
624
625
626
627
628
629 log = logging.getLogger('Rocket')
630 log.addHandler(NullHandler())
631
632
634 """The Rocket class is responsible for handling threads and accepting and
635 dispatching connections."""
636
def __init__(self,
             interfaces=('127.0.0.1', 8000),
             method='wsgi',
             app_info=None,
             min_threads=None,
             max_threads=None,
             queue_size=None,
             timeout=600,
             handle_signals=True):
    """Build the thread pool and one listener per interface.

    interfaces: a single interface tuple or a list of them; anything that
        is not a list is wrapped in a one-element list.
    method: name resolved through ``get_method`` to the worker class.
    app_info: optional dict handed to the thread pool; when it is a dict
        its ``'server_software'`` key is set to ``SERVER_SOFTWARE``.
    min_threads, max_threads: pool limits; ``None`` selects the values in
        ``DEFAULTS``.
    queue_size: listen backlog; a falsy value selects ``socket.SOMAXCONN``
        when available, else ``DEFAULTS['LISTEN_QUEUE_SIZE']``.  It is
        capped at ``max_threads`` when that is non-zero.
    timeout: stored as ``self.timeout``.
    handle_signals: stored as ``self.handle_signals``.

    Exits the process with status 1 when no listener is ready.
    """
    self.handle_signals = handle_signals
    self.startstop_lock = Lock()
    self.timeout = timeout

    if isinstance(interfaces, list):
        self.interfaces = interfaces
    else:
        self.interfaces = [interfaces]

    if min_threads is None:
        min_threads = DEFAULTS['MIN_THREADS']

    if max_threads is None:
        max_threads = DEFAULTS['MAX_THREADS']

    if not queue_size:
        queue_size = getattr(socket, 'SOMAXCONN',
                             DEFAULTS['LISTEN_QUEUE_SIZE'])

    if max_threads and queue_size > max_threads:
        queue_size = max_threads

    if isinstance(app_info, dict):
        app_info['server_software'] = SERVER_SOFTWARE

    self.monitor_queue = Queue()
    self.active_queue = Queue()

    self._threadpool = ThreadPool(get_method(method),
                                  app_info=app_info,
                                  active_queue=self.active_queue,
                                  monitor_queue=self.monitor_queue,
                                  min_threads=min_threads,
                                  max_threads=max_threads)

    # Keep only the listeners that managed to bind.  The earlier reverse
    # index loop stopped before index 0, so a failed first (or only)
    # listener was never dropped and the check below could not fire.
    candidates = [Listener(i, queue_size, self.active_queue)
                  for i in self.interfaces]
    self.listeners = [l for l in candidates if l.ready]

    if not self.listeners:
        log.critical("No interfaces to listen on...closing.")
        sys.exit(1)
694
696 log.info('Received SIGTERM')
697 self.stop()
698
700 log.info('Received SIGHUP')
701 self.restart()
702
def start(self, background=False):
    """Start the thread pool, the monitor and every listener.

    background: when true, return as soon as everything has been started.
        Otherwise block, polling every ``THREAD_STOP_CHECK_INTERVAL``
        seconds until the monitor thread ends or a KeyboardInterrupt
        arrives, then call ``self.stop()`` and return its result.
    """
    log.info('Starting %s' % SERVER_SOFTWARE)

    self.startstop_lock.acquire()

    try:
        if self.handle_signals:
            # Installing handlers can fail (signal missing on the platform,
            # or not called from the main thread); that is only logged.
            try:
                import signal
                signal.signal(signal.SIGTERM, self._sigterm)
                signal.signal(signal.SIGUSR1, self._sighup)
            except Exception:
                log.debug('This platform does not support signals.')

        self._threadpool.start()

        self._monitor = Monitor(self.monitor_queue,
                                self.active_queue,
                                self.timeout,
                                self._threadpool)
        self._monitor.setDaemon(True)
        self._monitor.start()

        # Secure listeners are flagged with a trailing '*'.
        labels = []
        for listener in self.listeners:
            if listener.secure:
                marker = '*'
            else:
                marker = ''
            labels.append('%s:%i%s' % (listener.addr, listener.port, marker))
        log.info('Listening on sockets: ' + ', '.join(labels))

        for listener in self.listeners:
            listener.start()

    finally:
        self.startstop_lock.release()

    if background:
        return

    # Thread.isAlive() was removed in Python 3.9; prefer is_alive() and fall
    # back to the old spelling only where the new one does not exist.
    monitor_alive = getattr(self._monitor, 'is_alive', None)
    if monitor_alive is None:
        monitor_alive = self._monitor.isAlive

    while monitor_alive():
        try:
            time.sleep(THREAD_STOP_CHECK_INTERVAL)
        except KeyboardInterrupt:
            # Interrupt from the console: leave the loop and shut down.
            break
        except Exception:
            if monitor_alive():
                log.error(traceback.format_exc())

    return self.stop()
759
def stop(self, stoplogging=False):
    """Stop the listeners, the monitor and the worker threads, in that order.

    stoplogging: deprecated.  When true, ``logging.shutdown()`` is still
        called and a DeprecationWarning is issued afterwards.

    The start/stop lock is held for the whole operation and always released.
    """
    log.info('Stopping %s' % SERVER_SOFTWARE)

    self.startstop_lock.acquire()

    try:
        # Ask every listener to leave its accept loop ...
        for listener in self.listeners:
            listener.ready = False

        # ... give those threads a chance to run ...
        time.sleep(0.01)

        # ... and wait for the ones still running.
        for listener in self.listeners:
            if listener.isAlive():
                listener.join()

        self._monitor.stop()
        # Thread.isAlive() was removed in Python 3.9; prefer is_alive() and
        # fall back to the old spelling only where the new one is missing.
        monitor_alive = getattr(self._monitor, 'is_alive', None)
        if monitor_alive is None:
            monitor_alive = self._monitor.isAlive
        if monitor_alive():
            self._monitor.join()

        self._threadpool.stop()

        if stoplogging:
            logging.shutdown()
            msg = ("Calling logging.shutdown() is now the responsibility of "
                   "the application developer. Please update your "
                   "applications to no longer call rocket.stop(True)")
            # ``warnings.DeprecationWarning`` does not exist, so the old
            # ``raise warnings.DeprecationWarning(msg)`` failed with
            # AttributeError.  Issue the warning through warnings.warn.
            try:
                import warnings
            except ImportError:
                raise RuntimeError(msg)
            warnings.warn(msg, DeprecationWarning)

    finally:
        self.startstop_lock.release()
798
802
803
def CherryPyWSGIServer(bind_addr,
                       wsgi_app,
                       numthreads=10,
                       server_name=None,
                       max=-1,
                       request_queue_size=5,
                       timeout=10,
                       shutdown_timeout=5):
    """A Cherrypy wsgiserver-compatible wrapper.

    Builds and returns a ``Rocket`` instance for *wsgi_app* on *bind_addr*,
    using the 'wsgi' method.

    numthreads: passed on as ``min_threads``.
    max: passed on as ``max_threads``; a negative value becomes 0.
    request_queue_size: passed on as ``queue_size``.
    timeout: passed on unchanged.
    server_name, shutdown_timeout: accepted for signature compatibility;
        not used by this wrapper.
    """
    if max < 0:
        max_threads = 0
    else:
        max_threads = max

    return Rocket(bind_addr, 'wsgi', {'wsgi_app': wsgi_app},
                  min_threads=numthreads,
                  max_threads=max_threads,
                  queue_size=request_queue_size,
                  timeout=timeout)
821
822
823
824
825
826 import time
827 import logging
828 import select
829 from threading import Thread
830
831
832
833
834
836
837
838 - def __init__(self,
839 monitor_queue,
840 active_queue,
841 timeout,
842 threadpool,
843 *args,
844 **kwargs):
845
846 Thread.__init__(self, *args, **kwargs)
847
848 self._threadpool = threadpool
849
850
851 self.monitor_queue = monitor_queue
852 self.active_queue = active_queue
853 self.timeout = timeout
854
855 self.log = logging.getLogger('Rocket.Monitor')
856 self.log.addHandler(NullHandler())
857
858 self.connections = set()
859 self.active = False
860
862 self.active = True
863 conn_list = list()
864 list_changed = False
865
866
867 while not self.monitor_queue.empty():
868 self.monitor_queue.get()
869
870 if __debug__:
871 self.log.debug('Entering monitor loop.')
872
873
874 while self.active:
875
876
877 while not self.monitor_queue.empty():
878 if __debug__:
879 self.log.debug('In "receive timed-out connections" loop.')
880
881 c = self.monitor_queue.get()
882
883 if c is None:
884
885 if __debug__:
886 self.log.debug('Received a death threat.')
887 self.stop()
888 break
889
890 self.log.debug('Received a timed out connection.')
891
892 if __debug__:
893 assert(c not in self.connections)
894
895 if IS_JYTHON:
896
897
898 c.setblocking(False)
899
900 if __debug__:
901 self.log.debug('Adding connection to monitor list.')
902
903 self.connections.add(c)
904 list_changed = True
905
906
907 if list_changed:
908 conn_list = list(self.connections)
909 list_changed = False
910
911 try:
912 if len(conn_list):
913 readable = select.select(conn_list,
914 [],
915 [],
916 THREAD_STOP_CHECK_INTERVAL)[0]
917 else:
918 time.sleep(THREAD_STOP_CHECK_INTERVAL)
919 readable = []
920
921 if not self.active:
922 break
923
924
925 for r in readable:
926 if __debug__:
927 self.log.debug('Restoring readable connection')
928
929 if IS_JYTHON:
930
931
932
933 r.setblocking(True)
934
935 r.start_time = time.time()
936 self.active_queue.put(r)
937
938 self.connections.remove(r)
939 list_changed = True
940
941 except:
942 if self.active:
943 raise
944 else:
945 break
946
947
948 if self.timeout:
949 now = time.time()
950 stale = set()
951 for c in self.connections:
952 if (now - c.start_time) >= self.timeout:
953 stale.add(c)
954
955 for c in stale:
956 if __debug__:
957
958 data = (
959 c.client_addr, c.server_port, c.ssl and '*' or '')
960 self.log.debug(
961 'Flushing stale connection: %s:%i%s' % data)
962
963 self.connections.remove(c)
964 list_changed = True
965
966 try:
967 c.close()
968 finally:
969 del c
970
971
972 self._threadpool.dynamic_resize()
973
975 self.active = False
976
977 if __debug__:
978 self.log.debug('Flushing waiting connections')
979
980 while self.connections:
981 c = self.connections.pop()
982 try:
983 c.close()
984 finally:
985 del c
986
987 if __debug__:
988 self.log.debug('Flushing queued connections')
989
990 while not self.monitor_queue.empty():
991 c = self.monitor_queue.get()
992
993 if c is None:
994 continue
995
996 try:
997 c.close()
998 finally:
999 del c
1000
1001
1002 self.monitor_queue.put(None)
1003
1004
1005
1006
1007
1008 import logging
1009
1010
1011
1012
1013
1014 log = logging.getLogger('Rocket.Errors.ThreadPool')
1015 log.addHandler(NullHandler())
1016
1017
1019 """The ThreadPool class is a container class for all the worker threads. It
1020 manages the number of actively running threads."""
1021
def __init__(self,
             method,
             app_info,
             active_queue,
             monitor_queue,
             min_threads=DEFAULTS['MIN_THREADS'],
             max_threads=DEFAULTS['MAX_THREADS'],
             ):
    """Record the pool configuration; no threads are started here.

    method: class instantiated for every worker thread.
    app_info: dict shared with the workers; anything that is not a dict is
        replaced by a fresh empty dict.  The dict is updated in place with
        ``max_threads`` and ``min_threads`` and, when futures support is
        available and ``app_info['futures']`` is truthy, an ``'executor'``.
    active_queue, monitor_queue: queues handed to each worker.
    min_threads, max_threads: pool size limits.
    """
    if __debug__:
        log.debug("Initializing ThreadPool.")

    self.check_for_dead_threads = 0
    self.active_queue = active_queue

    self.worker_class = method
    self.min_threads = min_threads
    self.max_threads = max_threads
    self.monitor_queue = monitor_queue
    self.stop_server = False
    self.alive = False

    # Queue length above which dynamic_resize() asks the pool to grow.
    self.grow_threshold = int(max_threads / 10) + 2

    if not isinstance(app_info, dict):
        app_info = dict()

    if has_futures and app_info.get('futures'):
        app_info['executor'] = WSGIExecutor(max([DEFAULTS['MIN_THREADS'],
                                                 2]))

    app_info.update(max_threads=max_threads,
                    min_threads=min_threads)

    self.app_info = app_info

    self.threads = set()
1061
1063 self.stop_server = False
1064 if __debug__:
1065 log.debug("Starting threads.")
1066
1067 self.grow(self.min_threads)
1068
1069 self.alive = True
1070
1072 self.alive = False
1073
1074 if __debug__:
1075 log.debug("Stopping threads.")
1076
1077 self.stop_server = True
1078
1079
1080 self.shrink(len(self.threads))
1081
1082
1083 if has_futures and self.app_info.get('futures'):
1084 if __debug__:
1085 log.debug("Future executor is present. Python will not "
1086 "exit until all jobs have finished.")
1087 self.app_info['executor'].shutdown(wait=False)
1088
1089
1090
1091
1092
1093
1094
1095
1096 for t in self.threads:
1097 if t.isAlive():
1098 t.join()
1099
1100
1101 self.bring_out_your_dead()
1102
1104
1105
1106 dead_threads = [t for t in self.threads if not t.isAlive()]
1107 for t in dead_threads:
1108 if __debug__:
1109 log.debug("Removing dead thread: %s." % t.getName())
1110 try:
1111
1112 self.threads.remove(t)
1113 except:
1114 pass
1115 self.check_for_dead_threads -= len(dead_threads)
1116
def grow(self, amount=None):
    """Create and start up to *amount* additional worker threads.

    amount: number of workers to add; a falsy value means
        ``self.max_threads``.

    Nothing happens while ``self.stop_server`` is set.  Once the pool is
    alive and ``max_threads`` is non-zero, the request is capped so the
    pool does not exceed ``max_threads``.  A ``max_threads`` of 0 applies
    no cap; previously the cap was computed as ``0 - len(self.threads)``,
    a negative number, so such a pool could never grow after start.
    """
    if self.stop_server:
        return

    if not amount:
        amount = self.max_threads

    if self.alive and self.max_threads:
        amount = min([amount, self.max_threads - len(self.threads)])

    if __debug__:
        log.debug("Growing by %i." % amount)

    for x in range(amount):
        worker = self.worker_class(self.app_info,
                                   self.active_queue,
                                   self.monitor_queue)

        worker.setDaemon(True)
        self.threads.add(worker)
        worker.start()
1138
1140 if __debug__:
1141 log.debug("Shrinking by %i." % amount)
1142
1143 self.check_for_dead_threads += amount
1144
1145 for x in range(amount):
1146 self.active_queue.put(None)
1147
1149 if (self.max_threads > self.min_threads or self.max_threads == 0):
1150 if self.check_for_dead_threads > 0:
1151 self.bring_out_your_dead()
1152
1153 queueSize = self.active_queue.qsize()
1154 threadCount = len(self.threads)
1155
1156 if __debug__:
1157 log.debug("Examining ThreadPool. %i threads and %i Q'd conxions"
1158 % (threadCount, queueSize))
1159
1160 if queueSize == 0 and threadCount > self.min_threads:
1161 self.shrink()
1162
1163 elif queueSize > self.grow_threshold:
1164
1165 self.grow(queueSize)
1166
1167
1168
1169
1170
1171 import re
1172 import sys
1173 import socket
1174 import logging
1175 import traceback
1176 from wsgiref.headers import Headers
1177 from threading import Thread
1178 from datetime import datetime
1179
1180 try:
1181 from urllib import unquote
1182 except ImportError:
1183 from urllib.parse import unquote
1184
1185 try:
1186 from io import StringIO
1187 except ImportError:
1188 try:
1189 from cStringIO import StringIO
1190 except ImportError:
1191 from StringIO import StringIO
1192
1193 try:
1194 from ssl import SSLError
1195 except ImportError:
1198
1199
1200
1201
1202
1203 re_SLASH = re.compile('%2F', re.IGNORECASE)
1204 re_REQUEST_LINE = re.compile(r"""^
1205 (?P<method>OPTIONS|GET|HEAD|POST|PUT|DELETE|TRACE|CONNECT) # Request Method
1206 \ # (single space)
1207 (
1208 (?P<scheme>[^:/]+) # Scheme
1209 (://) #
1210 (?P<host>[^/]+) # Host
1211 )? #
1212 (?P<path>(\*|/[^ \?]*)) # Path
1213 (\? (?P<query_string>[^ ]*))? # Query String
1214 \ # (single space)
1215 (?P<protocol>HTTPS?/1\.[01]) # Protocol
1216 $
1217 """, re.X)
1218 LOG_LINE = '%(client_ip)s - "%(request_line)s" - %(status)s %(size)s'
1219 RESPONSE = '''\
1220 %s %s
1221 Content-Length: %i
1222 Content-Type: %s
1223
1224 %s
1225 '''
1226 if IS_JYTHON:
1227 HTTP_METHODS = set(['OPTIONS', 'GET', 'HEAD', 'POST', 'PUT',
1228 'DELETE', 'TRACE', 'CONNECT'])
1229
1230
1232 """The Worker class is a base class responsible for receiving connections
1233 and (a subclass) will run an application to process the the connection """
1234
1235 - def __init__(self,
1236 app_info,
1237 active_queue,
1238 monitor_queue,
1239 *args,
1240 **kwargs):
1241
1242 Thread.__init__(self, *args, **kwargs)
1243
1244
1245 self.app_info = app_info
1246 self.active_queue = active_queue
1247 self.monitor_queue = monitor_queue
1248
1249 self.size = 0
1250 self.status = "200 OK"
1251 self.closeConnection = True
1252 self.request_line = ""
1253 self.protocol = 'HTTP/1.1'
1254
1255
1256 self.req_log = logging.getLogger('Rocket.Requests')
1257 self.req_log.addHandler(NullHandler())
1258
1259
1260 self.err_log = logging.getLogger('Rocket.Errors.' + self.getName())
1261 self.err_log.addHandler(NullHandler())
1262
1264 if typ == SSLError:
1265 if 'timed out' in str(val.args[0]):
1266 typ = SocketTimeout
1267 if typ == SocketTimeout:
1268 if __debug__:
1269 self.err_log.debug('Socket timed out')
1270 self.monitor_queue.put(self.conn)
1271 return True
1272 if typ == SocketClosed:
1273 self.closeConnection = True
1274 if __debug__:
1275 self.err_log.debug('Client closed socket')
1276 return False
1277 if typ == BadRequest:
1278 self.closeConnection = True
1279 if __debug__:
1280 self.err_log.debug('Client sent a bad request')
1281 return True
1282 if typ == socket.error:
1283 self.closeConnection = True
1284 if val.args[0] in IGNORE_ERRORS_ON_CLOSE:
1285 if __debug__:
1286 self.err_log.debug('Ignorable socket Error received...'
1287 'closing connection.')
1288 return False
1289 else:
1290 self.status = "999 Utter Server Failure"
1291 tb_fmt = traceback.format_exception(typ, val, tb)
1292 self.err_log.error('Unhandled Error when serving '
1293 'connection:\n' + '\n'.join(tb_fmt))
1294 return False
1295
1296 self.closeConnection = True
1297 tb_fmt = traceback.format_exception(typ, val, tb)
1298 self.err_log.error('\n'.join(tb_fmt))
1299 self.send_response('500 Server Error')
1300 return False
1301
1303 if __debug__:
1304 self.err_log.debug('Entering main loop.')
1305
1306
1307 while True:
1308 conn = self.active_queue.get()
1309
1310 if not conn:
1311
1312 if __debug__:
1313 self.err_log.debug('Received a death threat.')
1314 return conn
1315
1316 if isinstance(conn, tuple):
1317 conn = Connection(*conn)
1318
1319 self.conn = conn
1320
1321 if conn.ssl != conn.secure:
1322 self.err_log.info('Received HTTP connection on HTTPS port.')
1323 self.send_response('400 Bad Request')
1324 self.closeConnection = True
1325 conn.close()
1326 continue
1327 else:
1328 if __debug__:
1329 self.err_log.debug('Received a connection.')
1330 self.closeConnection = False
1331
1332
1333 while True:
1334 if __debug__:
1335 self.err_log.debug('Serving a request')
1336 try:
1337 self.run_app(conn)
1338 except:
1339 exc = sys.exc_info()
1340 handled = self._handleError(*exc)
1341 if handled:
1342 break
1343 finally:
1344 if self.request_line:
1345 log_info = dict(client_ip=conn.client_addr,
1346 time=datetime.now().strftime('%c'),
1347 status=self.status.split(' ')[0],
1348 size=self.size,
1349 request_line=self.request_line)
1350 self.req_log.info(LOG_LINE % log_info)
1351
1352 if self.closeConnection:
1353 try:
1354 conn.close()
1355 except:
1356 self.err_log.error(str(traceback.format_exc()))
1357
1358 break
1359
1361
1362
1363 self.closeConnection = True
1364 raise NotImplementedError('Overload this method!')
1365
1367 stat_msg = status.split(' ', 1)[1]
1368 msg = RESPONSE % (self.protocol,
1369 status,
1370 len(stat_msg),
1371 'text/plain',
1372 stat_msg)
1373 try:
1374 self.conn.sendall(b(msg))
1375 except socket.timeout:
1376 self.closeConnection = True
1377 msg = 'Tried to send "%s" to client but received timeout error'
1378 self.err_log.error(msg % status)
1379 except socket.error:
1380 self.closeConnection = True
1381 msg = 'Tried to send "%s" to client but received socket error'
1382 self.err_log.error(msg % status)
1383
1385 self.request_line = ''
1386 try:
1387
1388 d = sock_file.readline()
1389 if PY3K:
1390 d = d.decode('ISO-8859-1')
1391
1392 if d == '\r\n':
1393
1394 if __debug__:
1395 self.err_log.debug('Client sent newline')
1396
1397 d = sock_file.readline()
1398 if PY3K:
1399 d = d.decode('ISO-8859-1')
1400 except socket.timeout:
1401 raise SocketTimeout('Socket timed out before request.')
1402 except TypeError:
1403 raise SocketClosed(
1404 'SSL bug caused closure of socket. See '
1405 '"https://groups.google.com/d/topic/web2py/P_Gw0JxWzCs".')
1406
1407 d = d.strip()
1408
1409 if not d:
1410 if __debug__:
1411 self.err_log.debug(
1412 'Client did not send a recognizable request.')
1413 raise SocketClosed('Client closed socket.')
1414
1415 self.request_line = d
1416
1417
1418
1419
1420
1421
1422 if IS_JYTHON:
1423 return self._read_request_line_jython(d)
1424
1425 match = re_REQUEST_LINE.match(d)
1426
1427 if not match:
1428 self.send_response('400 Bad Request')
1429 raise BadRequest
1430
1431 req = match.groupdict()
1432 for k, v in req.iteritems():
1433 if not v:
1434 req[k] = ""
1435 if k == 'path':
1436 req['path'] = r'%2F'.join(
1437 [unquote(x) for x in re_SLASH.split(v)])
1438
1439 self.protocol = req['protocol']
1440 return req
1441
1443 d = d.strip()
1444 try:
1445 method, uri, proto = d.split(' ')
1446 if not proto.startswith('HTTP') or \
1447 proto[-3:] not in ('1.0', '1.1') or \
1448 method not in HTTP_METHODS:
1449 self.send_response('400 Bad Request')
1450 raise BadRequest
1451 except ValueError:
1452 self.send_response('400 Bad Request')
1453 raise BadRequest
1454
1455 req = dict(method=method, protocol=proto)
1456 scheme = ''
1457 host = ''
1458 if uri == '*' or uri.startswith('/'):
1459 path = uri
1460 elif '://' in uri:
1461 scheme, rest = uri.split('://')
1462 host, path = rest.split('/', 1)
1463 path = '/' + path
1464 else:
1465 self.send_response('400 Bad Request')
1466 raise BadRequest
1467
1468 query_string = ''
1469 if '?' in path:
1470 path, query_string = path.split('?', 1)
1471
1472 path = r'%2F'.join([unquote(x) for x in re_SLASH.split(path)])
1473
1474 req.update(path=path,
1475 query_string=query_string,
1476 scheme=scheme.lower(),
1477 host=host)
1478 return req
1479
1481 try:
1482 headers = dict()
1483 lname = None
1484 lval = None
1485 while True:
1486 l = sock_file.readline()
1487
1488 if PY3K:
1489 try:
1490 l = str(l, 'ISO-8859-1')
1491 except UnicodeDecodeError:
1492 self.err_log.warning(
1493 'Client sent invalid header: ' + repr(l))
1494
1495 if l.strip().replace('\0', '') == '':
1496 break
1497
1498 if l[0] in ' \t' and lname:
1499
1500 lval += ' ' + l.strip()
1501 else:
1502
1503 l = l.split(':', 1)
1504
1505
1506 lname = l[0].strip().upper().replace('-', '_')
1507 lval = l[-1].strip()
1508
1509 headers[str(lname)] = str(lval)
1510
1511 except socket.timeout:
1512 raise SocketTimeout("Socket timed out before request.")
1513
1514 return headers
1515
1516
1518 "Exception for when a socket times out between requests."
1519 pass
1520
1521
1523 "Exception for when a client sends an incomprehensible request."
1524 pass
1525
1526
1528 "Exception for when a socket is closed by the client."
1529 pass
1530
1531
1533
1535 self.stream = sock_file
1536 self.chunk_size = 0
1537
1539 chunk_len = ""
1540 try:
1541 while "" == chunk_len:
1542 chunk_len = self.stream.readline().strip()
1543 return int(chunk_len, 16)
1544 except ValueError:
1545 return 0
1546
def read(self, size):
    """Read up to *size* bytes of payload from the chunked stream.

    The number of unread bytes left in the current chunk is carried between
    calls in ``self.chunk_size``.  When a chunk is exhausted the next length
    comes from ``self._read_header()``; a length of 0 ends the read.

    Returns the collected bytes, which may be fewer than *size*.
    """
    pieces = []
    chunk_size = self.chunk_size
    while size:
        if not chunk_size:
            chunk_size = self._read_header()

        if size < chunk_size:
            # The request ends inside the current chunk.
            pieces.append(self.stream.read(size))
            chunk_size -= size
            break

        if not chunk_size:
            # Zero-length chunk: nothing more to read.
            break

        # Consume the rest of this chunk and move on to the next one.
        pieces.append(self.stream.read(chunk_size))
        size -= chunk_size
        chunk_size = 0

    self.chunk_size = chunk_size
    # Joined once at the end instead of growing a bytes object per chunk.
    return b('').join(pieces)
1567
1569 data = b('')
1570 c = self.read(1)
1571 while c and c != b('\n'):
1572 data += c
1573 c = self.read(1)
1574 data += c
1575 return data
1576
1579
1580
1584
1585
1586
1587
1588
1589
1590
1591
1592 import sys
1593 import socket
1594 from wsgiref.headers import Headers
1595 from wsgiref.util import FileWrapper
1596
1597
1598
1599
1600 if PY3K:
1601 from email.utils import formatdate
1602 else:
1603
1604 from email.Utils import formatdate
1605
1606
# Line terminator used on the wire.
NEWLINE = b('\r\n')

# Template for the response head: status line followed by the header block.
HEADER_RESPONSE = 'HTTP/1.1 %s\r\n%s'

# Environ entries that are merged into every worker's base environ.
BASE_ENV = {
    'SERVER_NAME': SERVER_NAME,
    'SCRIPT_NAME': '',
    'wsgi.errors': sys.stderr,
    'wsgi.version': (1, 0),
    'wsgi.multiprocess': False,
    'wsgi.run_once': False,
    'wsgi.file_wrapper': FileWrapper,
}
1617
1618
def __init__(self, *args, **kwargs):
    """Set up the per-thread state that is reused for every request.

    Builds ``self.base_environ`` from the app info and ``BASE_ENV``,
    resolves the WSGI callable into ``self.app`` and, when futures are
    enabled, publishes the executor in the environ.

    Raises:
        TypeError: if the configured ``wsgi_app`` is not callable.
    """
    Worker.__init__(self, *args, **kwargs)

    app_info = self.app_info

    # Only a dict app_info can say whether more than one thread is used.
    multithreaded = (isinstance(app_info, dict)
                     and app_info.get('max_threads') != 1)

    base_environ = {
        'SERVER_SOFTWARE': app_info['server_software'],
        'wsgi.multithread': multithreaded,
    }
    base_environ.update(BASE_ENV)
    self.base_environ = base_environ

    # Grab our application
    self.app = app_info.get('wsgi_app')

    if not hasattr(self.app, "__call__"):
        raise TypeError("The wsgi_app specified (%s) is not a valid WSGI application." % repr(self.app))

    # Enable futures
    if has_futures and app_info.get('futures'):
        executor = app_info['executor']
        self.base_environ.update({"wsgiorg.executor": executor,
                                  "wsgiorg.futures": executor.futures})
1646
1694
def send_headers(self, data, sections):
    """Complete the response headers and send the status line and headers.

    Fills in ``Date``, ``Server``, a body-framing header (``Content-Length``
    or ``Transfer-Encoding``) and ``Connection`` when the application did
    not set them, then writes the response head to ``self.conn``.

    Args:
        data: the first body chunk; only its length is used, to compute
            ``Content-Length`` when the whole body is a single section.
        sections: number of body sections the app returned, or None when
            that is unknown.

    Side effects: sets ``self.chunked``, ``self.size``,
    ``self.closeConnection`` and ``self.headers_sent``.
    """
    h_set = self.header_set

    # Does the app want us to send output chunked?
    self.chunked = h_set.get('Transfer-Encoding', '').lower() == 'chunked'

    # Add a Date header if it's not there already
    if 'Date' not in h_set:
        h_set['Date'] = formatdate(usegmt=True)

    # Add a Server header if it's not there already
    if 'Server' not in h_set:
        h_set['Server'] = HTTP_SERVER_SOFTWARE

    if 'Content-Length' in h_set:
        self.size = int(h_set['Content-Length'])
    else:
        s = int(self.status.split(' ')[0])
        # 1xx, 204, 205 and 304 responses carry no body, so they must not
        # get a Content-Length or Transfer-Encoding header added.
        may_have_body = s >= 200 and s not in (204, 205, 304)
        if may_have_body and not self.chunked:
            if sections == 1 or self.protocol != 'HTTP/1.1':
                # The whole body is in ``data`` (or the protocol cannot
                # use chunking), so declare its length.
                self.size = len(data)
                h_set['Content-Length'] = str(self.size)
            else:
                # Length unknown up front: fall back to chunked encoding.
                h_set['Transfer-Encoding'] = 'Chunked'
                self.chunked = True
                if __debug__:
                    self.err_log.debug('Adding header...'
                                       'Transfer-Encoding: Chunked')

    if 'Connection' not in h_set:
        # If the application did not provide a connection header,
        # fill it in from what the client asked for.
        client_conn = self.environ.get('HTTP_CONNECTION', '').lower()
        if self.environ['SERVER_PROTOCOL'] == 'HTTP/1.1':
            # HTTP/1.1 defaults to keep-alive connections
            if client_conn:
                h_set['Connection'] = client_conn
            else:
                h_set['Connection'] = 'keep-alive'
        else:
            # Any other protocol gets a closed connection.
            h_set['Connection'] = 'close'

    # Close our connection if we need to.
    self.closeConnection = h_set.get('Connection', '').lower() == 'close'

    # Build our output headers
    header_data = HEADER_RESPONSE % (self.status, str(h_set))

    # Send the headers
    if __debug__:
        self.err_log.debug('Sending Headers: %s' % repr(header_data))
    self.conn.sendall(b(header_data))
    self.headers_sent = True
1752
def write_warning(self, data, sections=None):
    """Log a deprecation warning, then forward the call to ``self.write``.

    This is the callable handed back to the application by
    ``start_response``.
    """
    message = ('WSGI app called write method directly. This is '
               'deprecated behavior. Please update your app.')
    self.err_log.warning(message)
    result = self.write(data, sections)
    return result
1757
def write(self, data, sections=None):
    """Write the data to the output socket.

    If an error has been recorded in ``self.error``, its status and
    message replace the response status and ``data``.  Headers are sent
    first if they have not gone out yet.  Nothing is written for HEAD
    requests.  A socket error or timeout while sending marks the
    connection to be closed instead of propagating.

    Args:
        data: body payload to send.
        sections: passed through to ``send_headers``.
    """
    if self.error[0]:
        self.status = self.error[0]
        data = b(self.error[1])

    if not self.headers_sent:
        self.send_headers(data, sections)

    if self.request_method != 'HEAD':
        try:
            if self.chunked:
                # Assemble the chunk from byte pieces.  Interpolating the
                # payload with '%s' would embed the repr of a bytes object
                # on Python 3, and the size must be the encoded length.
                payload = b(data)
                size_line = b('%x\r\n' % len(payload))
                self.conn.sendall(size_line + payload + b('\r\n'))
            else:
                self.conn.sendall(data)
        except (socket.timeout, socket.error):
            # The client is gone or unresponsive; stop using the
            # connection rather than raising.
            self.closeConnection = True
1780
def start_response(self, status, response_headers, exc_info=None):
    """Record the status and headers to send when ``self.write`` is called.

    With ``exc_info`` given, the active exception is re-raised if headers
    have already been sent.  Without it, calling this again after headers
    were set raises ``AssertionError``.

    Returns the ``write_warning`` callable.
    """
    if not exc_info:
        if self.header_set:
            raise AssertionError("Headers already set!")
    else:
        try:
            if self.headers_sent:
                # Too late to change the response: re-raise the
                # exception that is currently being handled.
                raise
        finally:
            # Drop the reference to the traceback.
            exc_info = None

    must_decode = PY3K and not isinstance(status, str)
    self.status = str(status, 'ISO-8859-1') if must_decode else status

    try:
        self.header_set = Headers(response_headers)
    except UnicodeDecodeError:
        self.error = ('500 Internal Server Error',
                      'HTTP Headers should be bytes')
        self.err_log.error('Received HTTP Headers from client that contain'
                           ' invalid characters for Latin-1 encoding.')

    return self.write_warning
1809
def run_app(self, conn):
    """Serve one request on ``conn`` by running the WSGI application.

    Resets the per-request response state, builds the environ from the
    socket, answers ``Expect: 100-continue``, calls the application and
    writes each non-empty item it returns.  The application result's
    ``close()`` (when present) and the socket file are always closed.
    """
    self.size = 0
    self.header_set = Headers([])
    self.headers_sent = False
    self.error = (None, None)
    self.chunked = False
    sections = None
    output = None

    if __debug__:
        self.err_log.debug('Getting sock_file')

    # Build our file-like object
    if PY3K:
        sock_file = conn.makefile(mode='rb', buffering=BUF_SIZE)
    else:
        # Python 2's makefile() takes (mode, bufsize); passing only
        # BUF_SIZE would hand the buffer size over as the mode.
        sock_file = conn.makefile('rb', BUF_SIZE)

    try:
        # Read the headers and build our WSGI environment
        self.environ = environ = self.build_environ(sock_file, conn)

        # Handle 100 Continue
        if environ.get('HTTP_EXPECT', '') == '100-continue':
            res = environ['SERVER_PROTOCOL'] + ' 100 Continue\r\n\r\n'
            conn.sendall(b(res))

        # Send it to our WSGI application
        output = self.app(environ, self.start_response)

        if hasattr(output, '__len__') or hasattr(output, '__iter__'):
            if hasattr(output, '__len__'):
                sections = len(output)

            for data in output:
                # Don't send headers until body appears
                if data:
                    self.write(data, sections)
        else:
            # Nothing to iterate; looping over it would only raise.
            self.error = ('500 Internal Server Error',
                          'WSGI applications must return a list or '
                          'generator type.')

        if self.error[0] and not self.headers_sent:
            # No body chunk reached write(), so the recorded error has not
            # been reported yet.  Start from clean headers so the app's
            # framing headers do not describe the error text, and let
            # write() substitute the error status and message.
            self.header_set = Headers([])
            self.write(b(''), 1)

        if self.chunked:
            # If chunked, send our final chunk length
            self.conn.sendall(b('0\r\n\r\n'))
        elif not self.headers_sent:
            # Send headers if the body was empty
            self.send_headers('', sections)

    finally:
        if __debug__:
            self.err_log.debug('Finally closing output and sock_file')

        if hasattr(output, 'close'):
            output.close()

        sock_file.close()
1870
1871
1872