Package gluon :: Module utils
[hide private]
[frames | no frames]

Source Code for Module gluon.utils

  1  #!/usr/bin/env python 
  2  # -*- coding: utf-8 -*- 
  3   
  4  """ 
  5  This file is part of the web2py Web Framework 
  6  Copyrighted by Massimo Di Pierro <mdipierro@cs.depaul.edu> 
  7  License: LGPLv3 (http://www.gnu.org/licenses/lgpl.html) 
  8   
  9  This file specifically includes utilities for security. 
 10  """ 
 11   
 12  import threading 
 13  import struct 
 14  import uuid 
 15  import random 
 16  import time 
 17  import os 
 18  import re 
 19  import sys 
 20  import logging 
 21  import socket 
 22  import base64 
 23  import zlib 
 24   
 25   
 26  _struct_2_long_long = struct.Struct('=QQ') 
 27   
 28  python_version = sys.version_info[0] 
 29   
 30  if python_version == 2: 
 31      import cPickle as pickle 
 32  else: 
 33      import pickle 
 34   
 35  from hashlib import md5, sha1, sha224, sha256, sha384, sha512 
 36   
 37  try: 
 38      from Crypto.Cipher import AES 
 39  except ImportError: 
 40      import gluon.contrib.aes as AES 
 41   
 42  import hmac 
 43   
 44  try: 
 45      try: 
 46          from gluon.contrib.pbkdf2_ctypes import pbkdf2_hex 
 47      except (ImportError, AttributeError): 
 48          from gluon.contrib.pbkdf2 import pbkdf2_hex 
 49      HAVE_PBKDF2 = True 
 50  except ImportError: 
 51      try: 
 52          from .pbkdf2 import pbkdf2_hex 
 53          HAVE_PBKDF2 = True 
 54      except (ImportError, ValueError): 
 55          HAVE_PBKDF2 = False 
 56   
 57  logger = logging.getLogger("web2py") 
 58   
def AES_new(key, IV=None):
    """
    Build an AES cipher object in CBC mode for ``key``.

    When ``IV`` is None a fresh 16-byte value is drawn from
    fast_urandom16(). Returns the tuple ``(cipher, IV)`` so the caller
    knows which IV was actually used.
    """
    iv = fast_urandom16() if IV is None else IV
    cipher = AES.new(key, AES.MODE_CBC, iv)
    return cipher, iv
65 66
def compare(a, b):
    """
    Compare two strings without short-circuiting on the first difference.

    Every pair of characters is examined so that the running time does
    not reveal where the inputs diverge (only a length mismatch returns
    early). Works for text strings and for byte strings.

    Returns True when ``a`` and ``b`` have the same length and content.
    """
    if len(a) != len(b):
        return False
    result = 0
    for x, y in zip(a, b):
        # Iterating a Python 3 bytes object yields ints, for which ord()
        # raises TypeError; 1-character strings still go through ord().
        x = x if isinstance(x, int) else ord(x)
        y = y if isinstance(y, int) else ord(y)
        result |= x ^ y
    return result == 0
75 76
def md5_hash(text):
    """Return the hexadecimal MD5 digest of ``text``."""
    hasher = md5(text)
    return hasher.hexdigest()
80
def simple_hash(text, key='', salt='', digest_alg='md5'):
    """
    Hash ``text`` and return the hexadecimal digest.

    ``digest_alg`` selects the strategy:

    - a callable: called with ``text + key + salt``;
    - a string starting with ``'pbkdf2'``, written as
      ``pbkdf2(iterations,keylen,alg)``: delegated to pbkdf2_hex();
    - any other algorithm name with a non-empty ``key``: HMAC keyed
      with ``key + salt``;
    - any other algorithm name without a key: plain digest of
      ``text + salt``.

    Raises RuntimeError when ``digest_alg`` is empty or None.
    """
    if not digest_alg:
        raise RuntimeError("simple_hash with digest_alg=None")

    # A callable was supplied: the caller picked the hash constructor.
    if not isinstance(digest_alg, str):
        return digest_alg(text + key + salt).hexdigest()

    # 'pbkdf2(iterations,keylen,alg)': strip the wrapper, split the fields.
    if digest_alg.startswith('pbkdf2'):
        iterations, keylen, alg = digest_alg[7:-1].split(',')
        return pbkdf2_hex(text, salt, int(iterations),
                          int(keylen), get_digest(alg))

    hash_factory = get_digest(digest_alg)
    if key:
        return hmac.new(key + salt, text, hash_factory).hexdigest()

    # No key: plain digest, compatible with third party systems.
    hasher = hash_factory()
    hasher.update(text + salt)
    return hasher.hexdigest()
101 102
def get_digest(value):
    """
    Resolve a digest algorithm name to its hashlib constructor.

    Anything that is not a ``str`` is returned unchanged, so callers may
    pass a constructor directly. Names are matched case-insensitively.

    Raises ValueError for an unknown algorithm name.
    """
    if not isinstance(value, str):
        return value
    name = value.lower()
    known = {
        "md5": md5,
        "sha1": sha1,
        "sha224": sha224,
        "sha256": sha256,
        "sha384": sha384,
        "sha512": sha512,
    }
    algorithm = known.get(name)
    if algorithm is None:
        raise ValueError("Invalid digest algorithm: %s" % name)
    return algorithm
# Maps the length of a hexadecimal digest (digest bits / 4) to the name of
# the algorithm producing it. Floor division keeps the keys integers under
# Python 3, where ``/`` would produce floats.
DIGEST_ALG_BY_SIZE = {
    128 // 4: 'md5',
    160 // 4: 'sha1',
    224 // 4: 'sha224',
    256 // 4: 'sha256',
    384 // 4: 'sha384',
    512 // 4: 'sha512',
}
def pad(s, n=32, padchar=' '):
    """
    Right-pad ``s`` with ``padchar`` up to the next multiple of ``n``.

    At least one pad character is always added: an input whose length is
    already a multiple of ``n`` grows by a full ``n`` characters.

    The block size now honours ``n``; it used to be hard-coded to 32,
    which is still the default.
    """
    return s + (n - len(s) % n) * padchar
137 138
def secure_dumps(data, encryption_key, hash_key=None, compression_level=None):
    """
    Pickle, optionally compress, encrypt and sign ``data``.

    - ``encryption_key``: its first 32 characters, space-padded by pad(),
      form the AES key.
    - ``hash_key``: key for the HMAC signature; defaults to the SHA1 hex
      digest of ``encryption_key``.
    - ``compression_level``: when truthy, the pickle is zlib-compressed
      at this level before encryption.

    Returns ``signature + ':' + payload`` where payload is the URL-safe
    base64 encoding of the IV followed by the ciphertext.
    """
    if not hash_key:
        hash_key = sha1(encryption_key).hexdigest()
    dump = pickle.dumps(data)
    if compression_level:
        dump = zlib.compress(dump, compression_level)
    key = pad(encryption_key[:32])
    cipher, IV = AES_new(key)
    encrypted_data = base64.urlsafe_b64encode(IV + cipher.encrypt(pad(dump)))
    # MD5 was hmac's implicit default digest; naming it keeps existing
    # signatures valid and avoids the TypeError that Python 3.8+ raises
    # when digestmod is omitted.
    signature = hmac.new(hash_key, encrypted_data, md5).hexdigest()
    return signature + ':' + encrypted_data
150 151
def secure_loads(data, encryption_key, hash_key=None, compression_level=None):
    """
    Verify, decrypt and unpickle a string produced by secure_dumps().

    - ``encryption_key`` / ``hash_key`` / ``compression_level`` must
      match the values given to secure_dumps().

    Returns the original object, or None when ``data`` has no ':'
    separator, the signature does not match, or decryption,
    decompression or unpickling fails.
    """
    if ':' not in data:
        return None
    if not hash_key:
        hash_key = sha1(encryption_key).hexdigest()
    signature, encrypted_data = data.split(':', 1)
    # MD5 was hmac's implicit default digest; naming it keeps existing
    # signatures valid and avoids the TypeError that Python 3.8+ raises
    # when digestmod is omitted.
    actual_signature = hmac.new(hash_key, encrypted_data, md5).hexdigest()
    if not compare(signature, actual_signature):
        return None
    key = pad(encryption_key[:32])
    encrypted_data = base64.urlsafe_b64decode(encrypted_data)
    IV, encrypted_data = encrypted_data[:16], encrypted_data[16:]
    cipher, _ = AES_new(key, IV=IV)
    try:
        data = cipher.decrypt(encrypted_data)
        # Undo the space padding added by pad().
        # NOTE(review): a compressed payload whose last byte happens to be
        # a space would be truncated here and then rejected - confirm
        # whether that matters for callers.
        data = data.rstrip(' ')
        if compression_level:
            data = zlib.decompress(data)
        # SECURITY: unpickling can execute arbitrary code. It is reached
        # only after the HMAC check above, so the keys must stay secret.
        return pickle.loads(data)
    except Exception:
        # Any failure is treated as "not a valid token".
        return None
173 174 ### compute constant CTOKENS 175 176
def initialize_urandom():
    """
    This function and web2py_uuid follow from the following discussion:
    http://groups.google.com/group/web2py-developers/browse_thread/thread/7fd5789a7da3f09

    At startup a machine/time dependent number is computed as
    uuid.getnode() + int(time.time() * 1e6)

    Its six lowest bytes are repeated to form 16 8-bit tokens. The number
    seeds ``random`` and, on a best-effort basis, the tokens are written
    to '/dev/urandom' as additional entropy.

    If os.urandom() is not supported a warning is logged and callers have
    to fall back on ``random`` alone.

    Returns ``(unpacked_ctokens, have_urandom)``: the 16 tokens unpacked
    as two 64-bit integers, and whether os.urandom() is usable.
    """
    node_id = uuid.getnode()
    microseconds = int(time.time() * 1e6)
    seed = node_id + microseconds
    ctokens = [(seed >> ((i % 6) * 8)) % 256 for i in range(16)]
    # One byte string serves both the entropy write and the unpacking.
    if python_version == 2:
        packed = ''.join(chr(x) for x in ctokens)  # python 2
    else:
        packed = bytes([]).join(bytes([x]) for x in ctokens)  # python 3
    random.seed(seed)
    try:
        os.urandom(1)
        have_urandom = True
        try:
            # try to add process-specific entropy
            with open('/dev/urandom', 'wb') as frandom:
                frandom.write(packed)
        except IOError:
            # works anyway
            pass
    except NotImplementedError:
        have_urandom = False
        logger.warning(
            """Cryptographically secure session management is not possible on your system because
your system does not provide a cryptographically secure entropy source.
This is not specific to web2py; consider deploying on a different operating system.""")
    unpacked_ctokens = _struct_2_long_long.unpack(packed)
    return unpacked_ctokens, have_urandom
# Computed once at import time: web2py_uuid() uses UNPACKED_CTOKENS as its
# default ``ctokens`` and checks HAVE_URANDOM before calling fast_urandom16().
UNPACKED_CTOKENS, HAVE_URANDOM = initialize_urandom()
def fast_urandom16(urandom=[], locker=threading.RLock()):
    """
    Return 16 random bytes from a pool refilled in bulk from os.urandom().

    Meant to be faster than calling os.urandom(16) each time and to
    prevent the "too many files open" issue with concurrent access to
    os.urandom().

    ``urandom`` and ``locker`` are deliberately mutable defaults: they
    are the pool and the refill lock shared by every call. Callers
    normally pass no arguments.
    """
    try:
        return urandom.pop()
    except IndexError:
        # Pool exhausted: fetch 1024 chunks at once, hand out the first
        # and keep the other 1023 for later calls.
        with locker:
            ur = os.urandom(16 * 1024)
            # range() instead of the Python 2-only xrange().
            urandom += [ur[i:i + 16] for i in range(16, 1024 * 16, 16)]
            return ur[0:16]
241 242
def web2py_uuid(ctokens=UNPACKED_CTOKENS):
    """
    This function follows from the following discussion:
    http://groups.google.com/group/web2py-developers/browse_thread/thread/7fd5789a7da3f09

    Produce a version-4 UUID string. Two 64-bit values from ``random``
    are XORed with ``ctokens`` (by default the tokens computed for this
    machine at import time) and, when os.urandom() is available, also
    with 16 bytes from fast_urandom16().
    """
    high = random.getrandbits(64)
    low = random.getrandbits(64)
    if HAVE_URANDOM:
        urand_high, urand_low = _struct_2_long_long.unpack(fast_urandom16())
        high ^= urand_high
        low ^= urand_low
    high ^= ctokens[0]
    low ^= ctokens[1]
    byte_s = _struct_2_long_long.pack(high, low)
    return str(uuid.UUID(bytes=byte_s, version=4))
# Captures the four decimal fields of a dotted-quad IPv4 address. Range
# checking of each field is left to the caller. A raw string keeps ``\d``
# from being treated as an (invalid) string escape sequence.
REGEX_IPv4 = re.compile(r'(\d+)\.(\d+)\.(\d+)\.(\d+)')
def is_valid_ip_address(address):
    """
    Tell whether ``address`` looks like a valid IPv4 or IPv6 address.

    >>> is_valid_ip_address('127.0')
    False
    >>> is_valid_ip_address('127.0.0.1')
    True
    >>> is_valid_ip_address('2001:660::1')
    True
    """
    lowered = address.lower()

    # Well-known special cases first.
    if lowered in ('127.0.0.1', 'localhost', '::1', '::ffff:127.0.0.1'):
        return True
    if lowered in ('unknown', ''):
        return False

    if address.count('.') == 3:  # assume IPv4
        # Drop the IPv4-mapped IPv6 prefix, if any.
        candidate = address[7:] if address.startswith('::ffff:') else address
        if not hasattr(socket, 'inet_aton'):  # validate using the regex
            match = REGEX_IPv4.match(candidate)
            return bool(match and all(0 <= int(match.group(i)) < 256
                                      for i in (1, 2, 3, 4)))
        try:  # validate using the OS
            socket.inet_aton(candidate)
        except socket.error:  # invalid address
            return False
        return True

    if not hasattr(socket, 'inet_pton'):
        # No way to check an IPv6 address: assume it is valid.
        return True
    try:  # assume IPv6, validate using the OS
        socket.inet_pton(socket.AF_INET6, address)
    except socket.error:  # invalid address
        return False
    return True
300 301
def is_loopback_ip_address(ip=None, addrinfo=None):
    """
    Determines whether the address appears to be a loopback address.
    This assumes that the IP is valid.

    - ``ip``: the address as a string.
    - ``addrinfo``: alternatively, one entry of socket.getaddrinfo();
      for AF_INET / AF_INET6 entries its address replaces ``ip``.

    Returns False when no string address is available.
    """
    try:
        string_types = basestring  # Python 2: covers str and unicode
    except NameError:
        string_types = str  # Python 3
    if addrinfo:  # see socket.getaddrinfo() for layout of addrinfo tuple
        if addrinfo[0] == socket.AF_INET or addrinfo[0] == socket.AF_INET6:
            sockaddr = addrinfo[4]
            # sockaddr is (address, port, ...); taking it whole never
            # passed the string check below, so every addrinfo was
            # reported as non-loopback.
            if isinstance(sockaddr, (tuple, list)) and sockaddr:
                ip = sockaddr[0]
            else:
                ip = sockaddr
    if not isinstance(ip, string_types):
        return False
    # IPv4 or IPv6-embedded IPv4 or IPv4-compatible IPv6
    if ip.count('.') == 3:
        return ip.lower().startswith(('127', '::127', '0:0:0:0:0:0:127',
                                      '::ffff:127', '0:0:0:0:0:ffff:127'))
    return ip == '::1' or ip == '0:0:0:0:0:0:0:1'  # IPv6 loopback
317 318
def getipaddrinfo(host):
    """
    Filter out non-IP and bad IP addresses from getaddrinfo

    Returns the socket.getaddrinfo(host, None) entries whose family is
    AF_INET or AF_INET6 and whose address is a string, or an empty list
    when the lookup fails.
    """
    try:
        string_types = basestring  # Python 2: covers str and unicode
    except NameError:
        string_types = str  # Python 3
    try:
        return [addrinfo for addrinfo in socket.getaddrinfo(host, None)
                if addrinfo[0] in (socket.AF_INET, socket.AF_INET6)
                and isinstance(addrinfo[4][0], string_types)]
    except socket.error:
        return []
330