1
2
3
4 """
5 This file is part of the web2py Web Framework
6 Copyrighted by Massimo Di Pierro <mdipierro@cs.depaul.edu>
7 License: LGPLv3 (http://www.gnu.org/licenses/lgpl.html)
8
9 This file specifically includes utilities for security.
10 """
11
12 import threading
13 import struct
14 import uuid
15 import random
16 import time
17 import os
18 import re
19 import sys
20 import logging
21 import socket
22 import base64
23 import zlib
24
25
26 _struct_2_long_long = struct.Struct('=QQ')
27
28 python_version = sys.version_info[0]
29
30 if python_version == 2:
31 import cPickle as pickle
32 else:
33 import pickle
34
35 from hashlib import md5, sha1, sha224, sha256, sha384, sha512
36
37 try:
38 from Crypto.Cipher import AES
39 except ImportError:
40 import gluon.contrib.aes as AES
41
42 import hmac
43
44 try:
45 try:
46 from gluon.contrib.pbkdf2_ctypes import pbkdf2_hex
47 except (ImportError, AttributeError):
48 from gluon.contrib.pbkdf2 import pbkdf2_hex
49 HAVE_PBKDF2 = True
50 except ImportError:
51 try:
52 from .pbkdf2 import pbkdf2_hex
53 HAVE_PBKDF2 = True
54 except (ImportError, ValueError):
55 HAVE_PBKDF2 = False
56
57 logger = logging.getLogger("web2py")
58
60 """ Returns an AES cipher object and random IV if None specified """
61 if IV is None:
62 IV = fast_urandom16()
63
64 return AES.new(key, AES.MODE_CBC, IV), IV
65
66
68 """ compares two strings and not vulnerable to timing attacks """
69 if len(a) != len(b):
70 return False
71 result = 0
72 for x, y in zip(a, b):
73 result |= ord(x) ^ ord(y)
74 return result == 0
75
76
78 """ Generate a md5 hash with the given text """
79 return md5(text).hexdigest()
80
81 -def simple_hash(text, key='', salt='', digest_alg='md5'):
82 """
83 Generates hash with the given text using the specified
84 digest hashing algorithm
85 """
86 if not digest_alg:
87 raise RuntimeError("simple_hash with digest_alg=None")
88 elif not isinstance(digest_alg, str):
89 h = digest_alg(text + key + salt)
90 elif digest_alg.startswith('pbkdf2'):
91 iterations, keylen, alg = digest_alg[7:-1].split(',')
92 return pbkdf2_hex(text, salt, int(iterations),
93 int(keylen), get_digest(alg))
94 elif key:
95 digest_alg = get_digest(digest_alg)
96 h = hmac.new(key + salt, text, digest_alg)
97 else:
98 h = get_digest(digest_alg)()
99 h.update(text + salt)
100 return h.hexdigest()
101
102
104 """
105 Returns a hashlib digest algorithm from a string
106 """
107 if not isinstance(value, str):
108 return value
109 value = value.lower()
110 if value == "md5":
111 return md5
112 elif value == "sha1":
113 return sha1
114 elif value == "sha224":
115 return sha224
116 elif value == "sha256":
117 return sha256
118 elif value == "sha384":
119 return sha384
120 elif value == "sha512":
121 return sha512
122 else:
123 raise ValueError("Invalid digest algorithm: %s" % value)
124
125 DIGEST_ALG_BY_SIZE = {
126 128 / 4: 'md5',
127 160 / 4: 'sha1',
128 224 / 4: 'sha224',
129 256 / 4: 'sha256',
130 384 / 4: 'sha384',
131 512 / 4: 'sha512',
132 }
133
134
135 -def pad(s, n=32, padchar=' '):
136 return s + (32 - len(s) % 32) * padchar
137
138
139 -def secure_dumps(data, encryption_key, hash_key=None, compression_level=None):
140 if not hash_key:
141 hash_key = sha1(encryption_key).hexdigest()
142 dump = pickle.dumps(data)
143 if compression_level:
144 dump = zlib.compress(dump, compression_level)
145 key = pad(encryption_key[:32])
146 cipher, IV = AES_new(key)
147 encrypted_data = base64.urlsafe_b64encode(IV + cipher.encrypt(pad(dump)))
148 signature = hmac.new(hash_key, encrypted_data).hexdigest()
149 return signature + ':' + encrypted_data
150
151
152 -def secure_loads(data, encryption_key, hash_key=None, compression_level=None):
153 if not ':' in data:
154 return None
155 if not hash_key:
156 hash_key = sha1(encryption_key).hexdigest()
157 signature, encrypted_data = data.split(':', 1)
158 actual_signature = hmac.new(hash_key, encrypted_data).hexdigest()
159 if not compare(signature, actual_signature):
160 return None
161 key = pad(encryption_key[:32])
162 encrypted_data = base64.urlsafe_b64decode(encrypted_data)
163 IV, encrypted_data = encrypted_data[:16], encrypted_data[16:]
164 cipher, _ = AES_new(key, IV=IV)
165 try:
166 data = cipher.decrypt(encrypted_data)
167 data = data.rstrip(' ')
168 if compression_level:
169 data = zlib.decompress(data)
170 return pickle.loads(data)
171 except Exception, e:
172 return None
173
174
175
176
178 """
179 This function and the web2py_uuid follow from the following discussion:
180 http://groups.google.com/group/web2py-developers/browse_thread/thread/7fd5789a7da3f09
181
182 At startup web2py compute a unique ID that identifies the machine by adding
183 uuid.getnode() + int(time.time() * 1e3)
184
185 This is a 48-bit number. It converts the number into 16 8-bit tokens.
186 It uses this value to initialize the entropy source ('/dev/urandom') and to seed random.
187
188 If os.random() is not supported, it falls back to using random and issues a warning.
189 """
190 node_id = uuid.getnode()
191 microseconds = int(time.time() * 1e6)
192 ctokens = [((node_id + microseconds) >> ((i % 6) * 8)) %
193 256 for i in range(16)]
194 random.seed(node_id + microseconds)
195 try:
196 os.urandom(1)
197 have_urandom = True
198 try:
199
200 frandom = open('/dev/urandom', 'wb')
201 try:
202 if python_version == 2:
203 frandom.write(''.join(chr(t) for t in ctokens))
204 else:
205 frandom.write(bytes([]).join(bytes([t]) for t in ctokens))
206 finally:
207 frandom.close()
208 except IOError:
209
210 pass
211 except NotImplementedError:
212 have_urandom = False
213 logger.warning(
214 """Cryptographically secure session management is not possible on your system because
215 your system does not provide a cryptographically secure entropy source.
216 This is not specific to web2py; consider deploying on a different operating system.""")
217 if python_version == 2:
218 packed = ''.join(chr(x) for x in ctokens)
219 else:
220 packed = bytes([]).join(bytes([x]) for x in ctokens)
221 unpacked_ctokens = _struct_2_long_long.unpack(packed)
222 return unpacked_ctokens, have_urandom
223 UNPACKED_CTOKENS, HAVE_URANDOM = initialize_urandom()
224
225
227 """
228 this is 4x faster than calling os.urandom(16) and prevents
229 the "too many files open" issue with concurrent access to os.urandom()
230 """
231 try:
232 return urandom.pop()
233 except IndexError:
234 try:
235 locker.acquire()
236 ur = os.urandom(16 * 1024)
237 urandom += [ur[i:i + 16] for i in xrange(16, 1024 * 16, 16)]
238 return ur[0:16]
239 finally:
240 locker.release()
241
242
244 """
245 This function follows from the following discussion:
246 http://groups.google.com/group/web2py-developers/browse_thread/thread/7fd5789a7da3f09
247
248 It works like uuid.uuid4 except that tries to use os.urandom() if possible
249 and it XORs the output with the tokens uniquely associated with this machine.
250 """
251 rand_longs = (random.getrandbits(64), random.getrandbits(64))
252 if HAVE_URANDOM:
253 urand_longs = _struct_2_long_long.unpack(fast_urandom16())
254 byte_s = _struct_2_long_long.pack(rand_longs[0] ^ urand_longs[0] ^ ctokens[0],
255 rand_longs[1] ^ urand_longs[1] ^ ctokens[1])
256 else:
257 byte_s = _struct_2_long_long.pack(rand_longs[0] ^ ctokens[0],
258 rand_longs[1] ^ ctokens[1])
259 return str(uuid.UUID(bytes=byte_s, version=4))
260
261 REGEX_IPv4 = re.compile('(\d+)\.(\d+)\.(\d+)\.(\d+)')
262
263
265 """
266 >>> is_valid_ip_address('127.0')
267 False
268 >>> is_valid_ip_address('127.0.0.1')
269 True
270 >>> is_valid_ip_address('2001:660::1')
271 True
272 """
273
274 if address.lower() in ('127.0.0.1', 'localhost', '::1', '::ffff:127.0.0.1'):
275 return True
276 elif address.lower() in ('unknown', ''):
277 return False
278 elif address.count('.') == 3:
279 if address.startswith('::ffff:'):
280 address = address[7:]
281 if hasattr(socket, 'inet_aton'):
282 try:
283 socket.inet_aton(address)
284 return True
285 except socket.error:
286 return False
287 else:
288 match = REGEX_IPv4.match(address)
289 if match and all(0 <= int(match.group(i)) < 256 for i in (1, 2, 3, 4)):
290 return True
291 return False
292 elif hasattr(socket, 'inet_pton'):
293 try:
294 socket.inet_pton(socket.AF_INET6, address)
295 return True
296 except socket.error:
297 return False
298 else:
299 return True
300
301
303 """
304 Determines whether the address appears to be a loopback address.
305 This assumes that the IP is valid.
306 """
307 if addrinfo:
308 if addrinfo[0] == socket.AF_INET or addrinfo[0] == socket.AF_INET6:
309 ip = addrinfo[4]
310 if not isinstance(ip, basestring):
311 return False
312
313 if ip.count('.') == 3:
314 return ip.lower().startswith(('127', '::127', '0:0:0:0:0:0:127',
315 '::ffff:127', '0:0:0:0:0:ffff:127'))
316 return ip == '::1' or ip == '0:0:0:0:0:0:0:1'
317
318
320 """
321 Filter out non-IP and bad IP addresses from getaddrinfo
322 """
323 try:
324 return [addrinfo for addrinfo in socket.getaddrinfo(host, None)
325 if (addrinfo[0] == socket.AF_INET or
326 addrinfo[0] == socket.AF_INET6)
327 and isinstance(addrinfo[4][0], basestring)]
328 except socket.error:
329 return []
330