Package gluon :: Module scheduler
[hide private]
[frames] | [no frames]

Source Code for Module gluon.scheduler

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Usage / help text printed by the command-line interface (see -h option).
USAGE = """
## Example

For any existing app

Create File: app/models/scheduler.py ======
from gluon.scheduler import Scheduler

def demo1(*args,**vars):
    print 'you passed args=%s and vars=%s' % (args, vars)
    return 'done!'

def demo2():
    1/0

scheduler = Scheduler(db,dict(demo1=demo1,demo2=demo2))
## run worker nodes with:

   cd web2py
   python web2py.py -K myapp
or
   python gluon/scheduler.py -u sqlite://storage.sqlite \
                             -f applications/myapp/databases/ \
                             -t mytasks.py
(-h for info)
python scheduler.py -h

## schedule jobs using
http://127.0.0.1:8000/myapp/appadmin/insert/db/scheduler_task

## monitor scheduled jobs
http://127.0.0.1:8000/myapp/appadmin/select/db?query=db.scheduler_task.id>0

## view completed jobs
http://127.0.0.1:8000/myapp/appadmin/select/db?query=db.scheduler_run.id>0

## view workers
http://127.0.0.1:8000/myapp/appadmin/select/db?query=db.scheduler_worker.id>0

## To install the scheduler as a permanent daemon on Linux (w/ Upstart), put
## the following into /etc/init/web2py-scheduler.conf:
## (This assumes your web2py instance is installed in <user>'s home directory,
## running as <user>, with app <myapp>, on network interface eth0.)

description "web2py task scheduler"
start on (local-filesystems and net-device-up IFACE=eth0)
stop on shutdown
respawn limit 8 60 # Give up if restart occurs 8 times in 60 seconds.
exec sudo -u <user> python /home/<user>/web2py/web2py.py -K <myapp>
respawn

## You can then start/stop/restart/check status of the daemon with:
sudo start web2py-scheduler
sudo stop web2py-scheduler
sudo restart web2py-scheduler
sudo status web2py-scheduler
"""
  61   
  62  import os 
  63  import time 
  64  import multiprocessing 
  65  import sys 
  66  import threading 
  67  import traceback 
  68  import signal 
  69  import socket 
  70  import datetime 
  71  import logging 
  72  import optparse 
  73  import types 
  74  import Queue 
  75   
# The current working directory doubles as the web2py installation path
# when WEB2PY_PATH is not already set in the environment.
path = os.getcwd()

if 'WEB2PY_PATH' not in os.environ:
    os.environ['WEB2PY_PATH'] = path

# Prefer the simplejson bundled with web2py; fall back to a system install.
try:
    from gluon.contrib.simplejson import loads, dumps
except:
    from simplejson import loads, dumps

# Unique worker identifier: hostname + process id.
IDENTIFIER = "%s#%s" % (socket.gethostname(),os.getpid())

logger = logging.getLogger('web2py.scheduler.%s' % IDENTIFIER)
  89   
  90  from gluon import DAL, Field, IS_NOT_EMPTY, IS_IN_SET, IS_NOT_IN_DB 
  91  from gluon import IS_INT_IN_RANGE, IS_DATETIME 
  92  from gluon.utils import web2py_uuid 
  93  from gluon.storage import Storage 
  94   
  95   
# -- task / run / worker status values ------------------------------------
QUEUED = 'QUEUED'
ASSIGNED = 'ASSIGNED'
RUNNING = 'RUNNING'
COMPLETED = 'COMPLETED'
FAILED = 'FAILED'
TIMEOUT = 'TIMEOUT'
STOPPED = 'STOPPED'
ACTIVE = 'ACTIVE'
TERMINATE = 'TERMINATE'
DISABLED = 'DISABLED'
KILL = 'KILL'
PICK = 'PICK'
STOP_TASK = 'STOP_TASK'
EXPIRED = 'EXPIRED'
# -- timing ---------------------------------------------------------------
SECONDS = 1
HEARTBEAT = 3 * SECONDS  # base worker heartbeat period
MAXHIBERNATION = 10  # max sleep multiplier applied to DISABLED workers
CLEAROUT = '!clear!'  # marker in task output: discard everything before it

# every callable object type acceptable as a scheduled function
CALLABLETYPES = (types.LambdaType, types.FunctionType,
                 types.BuiltinFunctionType,
                 types.MethodType, types.BuiltinMethodType)
 119   
class Task(object):
    """A unit of work handed to the executor process.

    ``args`` and ``vars`` are JSON-encoded strings; any extra keyword
    arguments (task_id, run_id, uuid, ...) become attributes as-is and
    may override the core ones, mirroring the record they came from.
    """

    def __init__(self, app, function, timeout, args='[]', vars='{}', **kwargs):
        logger.debug(' new task allocated: %s.%s', app, function)
        core_attrs = {'app': app, 'function': function, 'timeout': timeout,
                      'args': args, 'vars': vars}
        # core attributes first, then kwargs (kwargs win, as in the original)
        self.__dict__.update(core_attrs)
        self.__dict__.update(kwargs)

    def __str__(self):
        return '<Task: %s>' % self.function
132 133
class TaskReport(object):
    """Outcome of one task execution: a status plus result/output/traceback."""

    def __init__(self, status, result=None, output=None, tb=None):
        logger.debug(' new task report: %s', status)
        if tb:
            logger.debug(' traceback: %s', tb)
        else:
            logger.debug(' result: %s', result)
        self.status, self.result = status, result
        self.output, self.tb = output, tb

    def __str__(self):
        return '<TaskReport: %s>' % self.status
148 149
def demo_function(*argv, **kwargs):
    """ test function """
    # argv[0]: number of one-second "clicks" to print before returning
    for i in range(argv[0]):
        print 'click', i
        time.sleep(1)
    return 'done'
#The two functions below deal with simplejson decoding JSON strings as unicode,
#especially for the dict decode: unicode values subsequently used as function
#keyword argument names won't work, since Python 2 keyword names must be byte strings.
#borrowed from http://stackoverflow.com/questions/956867/how-to-get-string-objects-instead-unicode-ones-from-json-in-python
def _decode_list(lst):
    # Recursively re-encode unicode items of a JSON-decoded list as utf-8
    # byte strings (Python 2 requirement -- see module comment above).
    newlist = []
    for i in lst:
        if isinstance(i, unicode):
            i = i.encode('utf-8')
        elif isinstance(i, list):
            i = _decode_list(i)
        newlist.append(i)
    return newlist
171 172
def _decode_dict(dct):
    # Re-encode unicode keys/values of a JSON-decoded dict as utf-8 byte
    # strings so the dict can be splatted as **kwargs under Python 2.
    newdict = {}
    for k, v in dct.iteritems():
        if isinstance(k, unicode):
            k = k.encode('utf-8')
        if isinstance(v, unicode):
            v = v.encode('utf-8')
        elif isinstance(v, list):
            v = _decode_list(v)
        newdict[k] = v
    return newdict
184 185
def executor(queue, task, out):
    """
    The background process: resolve the task's function inside the target
    app's environment, run it, and put a TaskReport on ``queue``; everything
    the task prints is forwarded chunk-by-chunk into the ``out`` queue.
    """
    logger.debug(' task started')

    class LogOutput(object):
        """Facility to log output at intervals"""
        def __init__(self, out_queue):
            self.out_queue = out_queue
            self.stdout = sys.stdout
            # hijack stdout so every write() lands on the queue
            sys.stdout = self

        def __del__(self):
            sys.stdout = self.stdout

        def flush(self):
            pass

        def write(self, data):
            self.out_queue.put(data)

    W2P_TASK = Storage({'id' : task.task_id, 'uuid' : task.uuid})
    stdout = LogOutput(out)
    try:
        if task.app:
            os.chdir(os.environ['WEB2PY_PATH'])
            from gluon.shell import env, parse_path_info
            from gluon import current
            # silence model-import noise while building the app environment
            level = logging.getLogger().getEffectiveLevel()
            logging.getLogger().setLevel(logging.WARN)
            # Get controller-specific subdirectory if task.app is of
            # form 'app/controller'
            (a, c, f) = parse_path_info(task.app)
            _env = env(a=a, c=c, import_models=True)
            logging.getLogger().setLevel(level)
            f = task.function
            functions = current._scheduler.tasks
            if not functions:
                #look into env
                _function = _env.get(f)
            else:
                _function = functions.get(f)
            if not isinstance(_function, CALLABLETYPES):
                raise NameError(
                    "name '%s' not found in scheduler's environment" % f)
            #Inject W2P_TASK into environment
            _env.update({'W2P_TASK' : W2P_TASK})
            #Inject W2P_TASK into current
            from gluon import current
            current.W2P_TASK = W2P_TASK
            globals().update(_env)
            args = loads(task.args)
            vars = loads(task.vars, object_hook=_decode_dict)
            result = dumps(_function(*args, **vars))
        else:
            ### for testing purpose only
            result = eval(task.function)(
                *loads(task.args, object_hook=_decode_dict),
                **loads(task.vars, object_hook=_decode_dict))
        queue.put(TaskReport('COMPLETED', result=result))
    except BaseException, e:
        tb = traceback.format_exc()
        queue.put(TaskReport('FAILED', tb=tb))
    # restore sys.stdout via LogOutput.__del__
    del stdout
class MetaScheduler(threading.Thread):
    """
    Base worker: a thread that sends heartbeats while tasks run in a
    separate process. The demo pop_task/report_task/send_heartbeat methods
    here are overridden by Scheduler with DB-backed versions.
    """
    def __init__(self):
        threading.Thread.__init__(self)
        self.process = None  # the background process
        self.have_heartbeat = True  # set to False to kill
        self.empty_runs = 0
    def async(self, task):
        """
        starts the background process and returns:
        ('ok',result,output)
        ('error',exception,None)
        ('timeout',None,None)
        ('terminated',None,None)
        """
        db = self.db
        sr = db.scheduler_run
        out = multiprocessing.Queue()
        queue = multiprocessing.Queue(maxsize=1)
        p = multiprocessing.Process(target=executor, args=(queue, task, out))
        self.process = p
        logger.debug(' task starting')
        p.start()

        task_output = ""
        tout = ""

        try:
            # sync_output > 0 means: wake up every sync_output seconds to
            # persist partial output; otherwise wait up to the full timeout
            if task.sync_output > 0:
                run_timeout = task.sync_output
            else:
                run_timeout = task.timeout

            start = time.time()

            while p.is_alive() and (
                not task.timeout or time.time() - start < task.timeout):
                if tout:
                    try:
                        logger.debug(' partial output saved')
                        db(sr.id == task.run_id).update(run_output=task_output)
                        db.commit()
                    except:
                        pass
                p.join(timeout=run_timeout)
                tout = ""
                # drain everything the executor printed since the last join
                while not out.empty():
                    tout += out.get()
                if tout:
                    logger.debug(' partial output: "%s"' % str(tout))
                    if CLEAROUT in tout:
                        # CLEAROUT marker: drop all output before it
                        task_output = tout[
                            tout.rfind(CLEAROUT) + len(CLEAROUT):]
                    else:
                        task_output += tout
        except:
            p.terminate()
            p.join()
            self.have_heartbeat = False
            logger.debug(' task stopped by general exception')
            tr = TaskReport(STOPPED)
        else:
            if p.is_alive():
                # still running past its timeout: kill it
                p.terminate()
                logger.debug(' task timeout')
                try:
                    # we try to get a traceback here
                    tr = queue.get(timeout=2)
                    tr.status = TIMEOUT
                    tr.output = task_output
                except Queue.Empty:
                    tr = TaskReport(TIMEOUT)
            elif queue.empty():
                # process died without reporting: treat as stopped
                self.have_heartbeat = False
                logger.debug(' task stopped')
                tr = TaskReport(STOPPED)
            else:
                logger.debug(' task completed or failed')
                tr = queue.get()
                tr.output = task_output
        return tr
    def die(self):
        """Stop the heartbeat loop and kill any running task process."""
        logger.info('die!')
        self.have_heartbeat = False
        self.terminate_process()
    def give_up(self):
        """Stop heartbeats but let the currently running task finish."""
        logger.info('Giving up as soon as possible!')
        self.have_heartbeat = False
343 - def terminate_process(self):
344 try: 345 self.process.terminate() 346 except: 347 pass # no process to terminate
348
349 - def run(self):
350 """ the thread that sends heartbeat """ 351 counter = 0 352 while self.have_heartbeat: 353 self.send_heartbeat(counter) 354 counter += 1
355
    def start_heartbeats(self):
        """Start the heartbeat thread (this object *is* the Thread)."""
        self.start()
    def send_heartbeat(self, counter):
        """Demo heartbeat; Scheduler overrides this with the DB-backed one."""
        print 'thum'
        time.sleep(1)
363 - def pop_task(self):
364 return Task( 365 app=None, 366 function='demo_function', 367 timeout=7, 368 args='[2]', 369 vars='{}')
370
    def report_task(self, task, task_report):
        """Demo reporting hook; Scheduler overrides this to persist results."""
        print 'reporting task'
        pass
    def sleep(self):
        """Pause between loop iterations: no-op here, overridden by Scheduler."""
        pass
    def loop(self):
        """Demo main loop: pop and execute tasks until stopped."""
        try:
            self.start_heartbeats()
            while True and self.have_heartbeat:
                logger.debug('looping...')
                task = self.pop_task()
                if task:
                    self.empty_runs = 0
                    self.report_task(task, self.async(task))
                else:
                    self.empty_runs += 1
                    logger.debug('sleeping...')
                    # max_empty_runs == 0 means "run forever"
                    if self.max_empty_runs != 0:
                        logger.debug('empty runs %s/%s',
                                     self.empty_runs, self.max_empty_runs)
                        if self.empty_runs >= self.max_empty_runs:
                            logger.info(
                                'empty runs limit reached, killing myself')
                            self.die()
                self.sleep()
        except KeyboardInterrupt:
            self.die()
# valid values for the status columns of the three scheduler tables
TASK_STATUS = (QUEUED, RUNNING, COMPLETED, FAILED, TIMEOUT, STOPPED, EXPIRED)
RUN_STATUS = (RUNNING, COMPLETED, FAILED, TIMEOUT, STOPPED)
WORKER_STATUS = (ACTIVE, PICK, DISABLED, TERMINATE, KILL, STOP_TASK)
class TYPE(object):
    """
    Validator: checks that a field value is valid JSON of a given Python
    type. With ``parse=True`` the decoded object is returned instead of
    the raw string.
    """

    def __init__(self, myclass=list, parse=False):
        self.myclass = myclass
        self.parse = parse

    def __call__(self, value):
        from gluon import current
        try:
            decoded = loads(value)
        except:
            return (value, current.T('invalid json'))
        if not isinstance(decoded, self.myclass):
            return (value, current.T('Not of type: %s') % self.myclass)
        return (decoded if self.parse else value, None)
430 431
class Scheduler(MetaScheduler):
    """
    DB-backed worker: stores tasks/runs/workers in three tables on ``db``,
    heartbeats through its own thread-local DAL connection, and may be
    elected "ticker" to assign queued tasks to the other active workers.
    """
    # NOTE(review): group_names=['main'] is a shared mutable default --
    # confirm it is never mutated in place before changing it.
    def __init__(self, db, tasks=None, migrate=True,
                 worker_name=None, group_names=['main'], heartbeat=HEARTBEAT,
                 max_empty_runs=0, discard_results=False, utc_time=False):

        MetaScheduler.__init__(self)

        self.db = db
        self.db_thread = None  # separate DAL built lazily by the heartbeat thread
        self.tasks = tasks
        self.group_names = group_names
        self.heartbeat = heartbeat
        self.worker_name = worker_name or IDENTIFIER
        #list containing status as recorded in the table plus a boost parameter
        #for hibernation (i.e. when someone stop the worker acting on the worker table)
        self.worker_status = [RUNNING, 1]
        self.max_empty_runs = max_empty_runs
        self.discard_results = discard_results
        self.is_a_ticker = False
        self.do_assign_tasks = False
        self.greedy = False
        self.utc_time = utc_time

        from gluon import current
        current._scheduler = self

        self.define_tables(db, migrate=migrate)
459
460 - def __get_migrate(self, tablename, migrate=True):
461 if migrate is False: 462 return False 463 elif migrate is True: 464 return True 465 elif isinstance(migrate, str): 466 return "%s%s.table" % (migrate , tablename) 467 return True
468
469 - def now(self):
470 return self.utc_time and datetime.datetime.utcnow() or datetime.datetime.now()
471
472 - def set_requirements(self, scheduler_task):
473 from gluon import current 474 if hasattr(current, 'request'): 475 scheduler_task.application_name.default = '%s/%s' % ( 476 current.request.application, current.request.controller 477 )
478
    def define_tables(self, db, migrate):
        """Define scheduler_task, scheduler_run and scheduler_worker on ``db``."""
        from gluon.dal import DEFAULT
        logger.debug('defining tables (migrate=%s)', migrate)
        now = self.now
        db.define_table(
            'scheduler_task',
            Field('application_name', requires=IS_NOT_EMPTY(),
                  default=None, writable=False),
            Field('task_name', default=None),
            Field('group_name', default='main'),
            Field('status', requires=IS_IN_SET(TASK_STATUS),
                  default=QUEUED, writable=False),
            # restrict to the registered task names when a task dict was given
            Field('function_name',
                  requires=IS_IN_SET(sorted(self.tasks.keys()))
                  if self.tasks else DEFAULT),
            Field('uuid', length=255,
                  requires=IS_NOT_IN_DB(db, 'scheduler_task.uuid'),
                  unique=True, default=web2py_uuid),
            Field('args', 'text', default='[]', requires=TYPE(list)),
            Field('vars', 'text', default='{}', requires=TYPE(dict)),
            Field('enabled', 'boolean', default=True),
            Field('start_time', 'datetime', default=now,
                  requires=IS_DATETIME()),
            Field('next_run_time', 'datetime', default=now),
            Field('stop_time', 'datetime'),
            Field('repeats', 'integer', default=1, comment="0=unlimited",
                  requires=IS_INT_IN_RANGE(0, None)),
            Field('retry_failed', 'integer', default=0, comment="-1=unlimited",
                  requires=IS_INT_IN_RANGE(-1, None)),
            Field('period', 'integer', default=60, comment='seconds',
                  requires=IS_INT_IN_RANGE(0, None)),
            Field('prevent_drift', 'boolean', default=False,
                  comment='Cron-like start_times between runs'),
            Field('timeout', 'integer', default=60, comment='seconds',
                  requires=IS_INT_IN_RANGE(0, None)),
            Field('sync_output', 'integer', default=0,
                  comment="update output every n sec: 0=never",
                  requires=IS_INT_IN_RANGE(0, None)),
            Field('times_run', 'integer', default=0, writable=False),
            Field('times_failed', 'integer', default=0, writable=False),
            Field('last_run_time', 'datetime', writable=False, readable=False),
            Field('assigned_worker_name', default='', writable=False),
            on_define=self.set_requirements,
            migrate=self.__get_migrate('scheduler_task', migrate),
            format='%(task_name)s')

        db.define_table(
            'scheduler_run',
            Field('task_id', 'reference scheduler_task'),
            Field('status', requires=IS_IN_SET(RUN_STATUS)),
            Field('start_time', 'datetime'),
            Field('stop_time', 'datetime'),
            Field('run_output', 'text'),
            Field('run_result', 'text'),
            Field('traceback', 'text'),
            Field('worker_name', default=self.worker_name),
            migrate=self.__get_migrate('scheduler_run', migrate)
        )

        db.define_table(
            'scheduler_worker',
            Field('worker_name', length=255, unique=True),
            Field('first_heartbeat', 'datetime'),
            Field('last_heartbeat', 'datetime'),
            Field('status', requires=IS_IN_SET(WORKER_STATUS)),
            Field('is_ticker', 'boolean', default=False, writable=False),
            Field('group_names', 'list:string', default=self.group_names),
            migrate=self.__get_migrate('scheduler_worker', migrate)
        )

        if migrate is not False:
            db.commit()
551
    def loop(self, worker_name=None):
        """Main worker loop: start heartbeats, then pop/execute/report tasks."""
        # die cleanly on SIGTERM (e.g. from an init system)
        signal.signal(signal.SIGTERM, lambda signum, stack_frame: sys.exit(1))
        try:
            self.start_heartbeats()
            while True and self.have_heartbeat:
                if self.worker_status[0] == DISABLED:
                    logger.debug('Someone stopped me, sleeping until better times come (%s)', self.worker_status[1])
                    self.sleep()
                    continue
                logger.debug('looping...')
                task = self.wrapped_pop_task()
                if task:
                    self.empty_runs = 0
                    self.worker_status[0] = RUNNING
                    self.report_task(task, self.async(task))
                    self.worker_status[0] = ACTIVE
                else:
                    self.empty_runs += 1
                    logger.debug('sleeping...')
                    # max_empty_runs == 0 means "run forever"
                    if self.max_empty_runs != 0:
                        logger.debug('empty runs %s/%s',
                                     self.empty_runs, self.max_empty_runs)
                        if self.empty_runs >= self.max_empty_runs:
                            logger.info(
                                'empty runs limit reached, killing myself')
                            self.die()
                self.sleep()
        except (KeyboardInterrupt, SystemExit):
            logger.info('catched')
            self.die()
582
583 - def wrapped_assign_tasks(self, db):
584 logger.debug('Assigning tasks...') 585 db.commit() #db.commit() only for Mysql 586 x = 0 587 while x < 10: 588 try: 589 self.assign_tasks(db) 590 db.commit() 591 logger.debug('Tasks assigned...') 592 break 593 except: 594 db.rollback() 595 logger.error('TICKER: error assigning tasks (%s)', x) 596 x += 1 597 time.sleep(0.5)
598
599 - def wrapped_pop_task(self):
600 db = self.db 601 db.commit() #another nifty db.commit() only for Mysql 602 x = 0 603 while x < 10: 604 try: 605 rtn = self.pop_task(db) 606 return rtn 607 break 608 except: 609 db.rollback() 610 logger.error(' error popping tasks') 611 x += 1 612 time.sleep(0.5)
613
    def pop_task(self, db):
        """Grab this worker's next ASSIGNED task and mark it RUNNING.

        Returns a Task (with run bookkeeping attached), or None when there
        is nothing to do or when this call was spent assigning tasks instead.
        """
        now = self.now()
        st = self.db.scheduler_task
        if self.is_a_ticker and self.do_assign_tasks:
            #I'm a ticker, and 5 loops passed without reassigning tasks, let's do
            #that and loop again
            self.wrapped_assign_tasks(db)
            return None
        #ready to process something
        grabbed = db(st.assigned_worker_name == self.worker_name)(
            st.status == ASSIGNED)

        task = grabbed.select(limitby=(0, 1), orderby=st.next_run_time).first()
        if task:
            task.update_record(status=RUNNING, last_run_time=now)
            #noone will touch my task!
            db.commit()
            logger.debug(' work to do %s', task.id)
        else:
            if self.greedy and self.is_a_ticker:
                #there are other tasks ready to be assigned
                logger.info('TICKER: greedy loop')
                self.wrapped_assign_tasks(db)
            else:
                logger.info('nothing to do')
            return None
        times_run = task.times_run + 1
        # next_run_time: relative to this run, or cron-like from start_time
        if not task.prevent_drift:
            next_run_time = task.last_run_time + datetime.timedelta(
                seconds=task.period
            )
        else:
            next_run_time = task.start_time + datetime.timedelta(
                seconds=task.period * times_run
            )
        if times_run < task.repeats or task.repeats == 0:
            #need to run (repeating task)
            run_again = True
        else:
            #no need to run again
            run_again = False
        run_id = 0
        # insert the scheduler_run row (retry until it succeeds),
        # unless results are being discarded
        while True and not self.discard_results:
            logger.debug(' new scheduler_run record')
            try:
                run_id = db.scheduler_run.insert(
                    task_id=task.id,
                    status=RUNNING,
                    start_time=now,
                    worker_name=self.worker_name)
                db.commit()
                break
            except:
                time.sleep(0.5)
                db.rollback()
        logger.info('new task %(id)s "%(task_name)s" %(application_name)s.%(function_name)s' % task)
        return Task(
            app=task.application_name,
            function=task.function_name,
            timeout=task.timeout,
            args=task.args,  # in json
            vars=task.vars,  # in json
            task_id=task.id,
            run_id=run_id,
            run_again=run_again,
            next_run_time=next_run_time,
            times_run=times_run,
            stop_time=task.stop_time,
            retry_failed=task.retry_failed,
            times_failed=task.times_failed,
            sync_output=task.sync_output,
            uuid=task.uuid)
686
    def report_task(self, task, task_report):
        """Persist the run outcome and update the task row (retries forever)."""
        db = self.db
        now = self.now()
        while True:
            try:
                if not self.discard_results:
                    if task_report.result != 'null' or task_report.tb:
                        #result is 'null' as a string if task completed
                        #if it's stopped it's None as NoneType, so we record
                        #the STOPPED "run" anyway
                        logger.debug(' recording task report in db (%s)',
                                     task_report.status)
                        db(db.scheduler_run.id == task.run_id).update(
                            status=task_report.status,
                            stop_time=now,
                            run_result=task_report.result,
                            run_output=task_report.output,
                            traceback=task_report.tb)
                    else:
                        logger.debug(' deleting task report in db because of no result')
                        db(db.scheduler_run.id == task.run_id).delete()
                #if there is a stop_time and the following run would exceed it
                is_expired = (task.stop_time
                              and task.next_run_time > task.stop_time
                              and True or False)
                status = (task.run_again and is_expired and EXPIRED
                          or task.run_again and not is_expired
                          and QUEUED or COMPLETED)
                if task_report.status == COMPLETED:
                    d = dict(status=status,
                             next_run_time=task.next_run_time,
                             times_run=task.times_run,
                             times_failed=0
                             )
                    db(db.scheduler_task.id == task.task_id)(
                        db.scheduler_task.status == RUNNING).update(**d)
                else:
                    # FAILED/TIMEOUT keep their status; STOPPED goes back to QUEUED
                    st_mapping = {'FAILED': 'FAILED',
                                  'TIMEOUT': 'TIMEOUT',
                                  'STOPPED': 'QUEUED'}[task_report.status]
                    # requeue while retries remain (retry_failed == -1: always)
                    status = (task.retry_failed
                              and task.times_failed < task.retry_failed
                              and QUEUED or task.retry_failed == -1
                              and QUEUED or st_mapping)
                    db(
                        (db.scheduler_task.id == task.task_id) &
                        (db.scheduler_task.status == RUNNING)
                    ).update(
                        times_failed=db.scheduler_task.times_failed + 1,
                        next_run_time=task.next_run_time,
                        status=status
                    )
                db.commit()
                logger.info('task completed (%s)', task_report.status)
                break
            except:
                db.rollback()
                time.sleep(0.5)
745
746 - def adj_hibernation(self):
747 if self.worker_status[0] == DISABLED: 748 wk_st = self.worker_status[1] 749 hibernation = wk_st + 1 if wk_st < MAXHIBERNATION else MAXHIBERNATION 750 self.worker_status[1] = hibernation
751
    def send_heartbeat(self, counter):
        """Heartbeat duty cycle: record liveness, obey worker-table commands,
        and every 5th beat clean up dead workers and (re)elect the ticker."""
        if not self.db_thread:
            logger.debug('thread building own DAL object')
            # the heartbeat thread needs its own connection
            self.db_thread = DAL(
                self.db._uri, folder=self.db._adapter.folder)
            self.define_tables(self.db_thread, migrate=False)
        try:
            db = self.db_thread
            sw, st = db.scheduler_worker, db.scheduler_task
            now = self.now()
            # record heartbeat
            mybackedstatus = db(sw.worker_name == self.worker_name).select().first()
            if not mybackedstatus:
                sw.insert(status=ACTIVE, worker_name=self.worker_name,
                          first_heartbeat=now, last_heartbeat=now,
                          group_names=self.group_names)
                self.worker_status = [ACTIVE, 1]  # activating the process
                mybackedstatus = ACTIVE
            else:
                mybackedstatus = mybackedstatus.status
                if mybackedstatus == DISABLED:
                    # keep sleeping
                    self.worker_status[0] = DISABLED
                    if self.worker_status[1] == MAXHIBERNATION:
                        logger.debug('........recording heartbeat (%s)', self.worker_status[0])
                        db(sw.worker_name == self.worker_name).update(
                            last_heartbeat=now)
                elif mybackedstatus == TERMINATE:
                    self.worker_status[0] = TERMINATE
                    logger.debug("Waiting to terminate the current task")
                    self.give_up()
                    return
                elif mybackedstatus == KILL:
                    self.worker_status[0] = KILL
                    self.die()
                else:
                    if mybackedstatus == STOP_TASK:
                        logger.info('Asked to kill the current task')
                        self.terminate_process()
                    logger.debug('........recording heartbeat (%s)', self.worker_status[0])
                    db(sw.worker_name == self.worker_name).update(
                        last_heartbeat=now, status=ACTIVE)
                    self.worker_status[1] = 1  # re-activating the process
                    if self.worker_status[0] != RUNNING:
                        self.worker_status[0] = ACTIVE

            self.do_assign_tasks = False
            if counter % 5 == 0 or mybackedstatus == PICK:
                try:
                    # delete inactive workers
                    expiration = now - datetime.timedelta(seconds=self.heartbeat * 3)
                    departure = now - datetime.timedelta(
                        seconds=self.heartbeat * 3 * MAXHIBERNATION)
                    logger.debug(
                        ' freeing workers that have not sent heartbeat')
                    inactive_workers = db(
                        ((sw.last_heartbeat < expiration) & (sw.status == ACTIVE)) |
                        ((sw.last_heartbeat < departure) & (sw.status != ACTIVE))
                    )
                    # requeue tasks held by dead workers before deleting them
                    db(st.assigned_worker_name.belongs(
                        inactive_workers._select(sw.worker_name)))(st.status == RUNNING)\
                        .update(assigned_worker_name='', status=QUEUED)
                    inactive_workers.delete()
                    try:
                        self.is_a_ticker = self.being_a_ticker()
                    except:
                        logger.error('Error coordinating TICKER')
                    if self.worker_status[0] == ACTIVE:
                        self.do_assign_tasks = True
                except:
                    logger.error('Error cleaning up')
            db.commit()
        except:
            logger.error('Error retrieving status')
            db.rollback()
        self.adj_hibernation()
        self.sleep()
829
    def being_a_ticker(self):
        """Ticker election: returns True if this worker should act as ticker."""
        db = self.db_thread
        sw = db.scheduler_worker
        all_active = db(
            (sw.worker_name != self.worker_name) & (sw.status == ACTIVE)
        ).select()
        ticker = all_active.find(lambda row: row.is_ticker is True).first()
        not_busy = self.worker_status[0] == ACTIVE
        if not ticker:
            #if no other tickers are around
            if not_busy:
                #only if I'm not busy
                db(sw.worker_name == self.worker_name).update(is_ticker=True)
                db(sw.worker_name != self.worker_name).update(is_ticker=False)
                logger.info("TICKER: I'm a ticker")
            else:
                #I'm busy
                if len(all_active) >= 1:
                    #so I'll "downgrade" myself to a "poor worker"
                    db(sw.worker_name == self.worker_name).update(is_ticker=False)
                else:
                    # I'm the only worker: take the job even though busy
                    not_busy = True
            db.commit()
            return not_busy
        else:
            logger.info(
                "%s is a ticker, I'm a poor worker" % ticker.worker_name)
            return False
858
    def assign_tasks(self, db):
        """TICKER duty: spread runnable QUEUED/ASSIGNED tasks among ACTIVE workers, per group."""
        sw, st = db.scheduler_worker, db.scheduler_task
        now = self.now()
        all_workers = db(sw.status == ACTIVE).select()
        #build workers as dict of groups
        wkgroups = {}
        for w in all_workers:
            group_names = w.group_names
            for gname in group_names:
                if gname not in wkgroups:
                    wkgroups[gname] = dict(
                        workers=[{'name': w.worker_name, 'c': 0}])
                else:
                    wkgroups[gname]['workers'].append(
                        {'name': w.worker_name, 'c': 0})
        #set queued tasks that expired between "runs" (i.e., you turned off
        #the scheduler): then it wasn't expired, but now it is
        db(st.status.belongs(
            (QUEUED, ASSIGNED)))(st.stop_time < now).update(status=EXPIRED)

        all_available = db(
            (st.status.belongs((QUEUED, ASSIGNED))) &
            ((st.times_run < st.repeats) | (st.repeats == 0)) &
            (st.start_time <= now) &
            ((st.stop_time == None) | (st.stop_time > now)) &
            (st.next_run_time <= now) &
            (st.enabled == True)
        )
        limit = len(all_workers) * (50 / (len(wkgroups) or 1))
        #if there are a moltitude of tasks, let's figure out a maximum of tasks per worker.
        #this can be adjusted with some added intelligence (like esteeming how many tasks will
        #a worker complete before the ticker reassign them around, but the gain is quite small
        #50 is quite a sweet spot also for fast tasks, with sane heartbeat values
        #NB: ticker reassign tasks every 5 cycles, so if a worker completes his 50 tasks in less
        #than heartbeat*5 seconds, it won't pick new tasks until heartbeat*5 seconds pass.

        #If a worker is currently elaborating a long task, all other tasks assigned
        #to him needs to be reassigned "freely" to other workers, that may be free.
        #this shuffles up things a bit, in order to maintain the idea of a semi-linear scalability

        #let's freeze it up
        db.commit()
        x = 0
        for group in wkgroups.keys():
            tasks = all_available(st.group_name == group).select(
                limitby=(0, limit), orderby = st.next_run_time)
            #let's break up the queue evenly among workers
            for task in tasks:
                x += 1
                gname = task.group_name
                ws = wkgroups.get(gname)
                if ws:
                    # NOTE(review): with counter starting at 0, `w['c'] < counter`
                    # is never True, so myw stays 0 and every task in the group
                    # goes to the first worker -- confirm whether the intent was
                    # to pick the least-loaded worker instead.
                    counter = 0
                    myw = 0
                    for i, w in enumerate(ws['workers']):
                        if w['c'] < counter:
                            myw = i
                            counter = w['c']
                    d = dict(
                        status=ASSIGNED,
                        assigned_worker_name=wkgroups[gname]['workers'][myw]['name']
                    )
                    if not task.task_name:
                        d['task_name'] = task.function_name
                    db((st.id==task.id) & (st.status.belongs((QUEUED, ASSIGNED)))).update(**d)
                    db.commit()
                    wkgroups[gname]['workers'][myw]['c'] += 1

        #I didn't report tasks but I'm working nonetheless!!!!
        if x > 0:
            self.empty_runs = 0
        #I'll be greedy only if tasks assigned are equal to the limit
        # (meaning there could be others ready to be assigned)
        self.greedy = x >= limit and True or False
        logger.info('TICKER: workers are %s', len(all_workers))
        logger.info('TICKER: tasks are %s', x)
935
    def sleep(self):
        """Sleep for the heartbeat period, scaled by the hibernation boost."""
        time.sleep(self.heartbeat * self.worker_status[1])
        # should only sleep until next available task
940 - def set_worker_status(self, group_names=None, action=ACTIVE):
941 if not group_names: 942 group_names = self.group_names 943 elif isinstance(group_names, str): 944 group_names = [group_names] 945 for group in group_names: 946 self.db( 947 self.db.scheduler_worker.group_names.contains(group) 948 ).update(status=action)
949
950 - def disable(self, group_names=None):
951 self.set_worker_status(group_names=group_names,action=DISABLED)
952
953 - def resume(self, group_names=None):
954 self.set_worker_status(group_names=group_names,action=ACTIVE)
955
956 - def terminate(self, group_names=None):
957 self.set_worker_status(group_names=group_names,action=TERMINATE)
958
959 - def kill(self, group_names=None):
960 self.set_worker_status(group_names=group_names,action=KILL)
961
962 - def queue_task(self, function, pargs=[], pvars={}, **kwargs):
963 """ 964 Queue tasks. This takes care of handling the validation of all 965 values. 966 :param function: the function (anything callable with a __name__) 967 :param pargs: "raw" args to be passed to the function. Automatically 968 jsonified. 969 :param pvars: "raw" kwargs to be passed to the function. Automatically 970 jsonified 971 :param kwargs: all the scheduler_task columns. args and vars here should be 972 in json format already, they will override pargs and pvars 973 974 returns a dict just as a normal validate_and_insert, plus a uuid key holding 975 the uuid of the queued task. If validation is not passed, both id and uuid 976 will be None, and you'll get an "error" dict holding the errors found. 977 """ 978 if hasattr(function, '__name__'): 979 function = function.__name__ 980 targs = 'args' in kwargs and kwargs.pop('args') or dumps(pargs) 981 tvars = 'vars' in kwargs and kwargs.pop('vars') or dumps(pvars) 982 tuuid = 'uuid' in kwargs and kwargs.pop('uuid') or web2py_uuid() 983 tname = 'task_name' in kwargs and kwargs.pop('task_name') or function 984 immediate = 'immediate' in kwargs and kwargs.pop('immediate') or None 985 rtn = self.db.scheduler_task.validate_and_insert( 986 function_name=function, 987 task_name=tname, 988 args=targs, 989 vars=tvars, 990 uuid=tuuid, 991 **kwargs) 992 if not rtn.errors: 993 rtn.uuid = tuuid 994 if immediate: 995 self.db(self.db.scheduler_worker.is_ticker == True).update(status=PICK) 996 else: 997 rtn.uuid = None 998 return rtn
999
    def task_status(self, ref, output=False):
        """
        Shortcut for task status retrieval

        :param ref: can be
            - integer --> lookup will be done by scheduler_task.id
            - string --> lookup will be done by scheduler_task.uuid
            - query --> lookup as you wish (as in db.scheduler_task.task_name == 'test1')
        :param output: fetch also the scheduler_run record

        Returns a single Row object, for the last queued task
        If output == True, returns also the last scheduler_run record
            scheduler_run record is fetched by a left join, so it can
            have all fields == None

        """
        from gluon.dal import Query
        sr, st = self.db.scheduler_run, self.db.scheduler_task
        if isinstance(ref, int):
            q = st.id == ref
        elif isinstance(ref, str):
            q = st.uuid == ref
        elif isinstance(ref, Query):
            q = ref
        else:
            raise SyntaxError(
                "You can retrieve results only by id, uuid or Query")
        fields = [st.ALL]
        left = False
        orderby = ~st.id
        if output:
            fields = st.ALL, sr.ALL
            left = sr.on(sr.task_id == st.id)
            orderby = ~st.id | ~sr.id
        row = self.db(q).select(
            *fields,
            **dict(orderby=orderby,
                   left=left,
                   limitby=(0, 1))
        ).first()
        if row and output:
            # run_result holds JSON (or None, e.g. for a STOPPED run)
            row.result = row.scheduler_run.run_result and \
                loads(row.scheduler_run.run_result,
                      object_hook=_decode_dict) or None
        return row
1045
1046 - def stop_task(self, ref):
1047 """ 1048 Experimental!!! 1049 Shortcut for task termination. 1050 If the task is RUNNING it will terminate it --> execution will be set as FAILED 1051 If the task is QUEUED, its stop_time will be set as to "now", 1052 the enabled flag will be set to False, status to STOPPED 1053 1054 :param ref: can be 1055 - integer --> lookup will be done by scheduler_task.id 1056 - string --> lookup will be done by scheduler_task.uuid 1057 Returns: 1058 - 1 if task was stopped (meaning an update has been done) 1059 - None if task was not found, or if task was not RUNNING or QUEUED 1060 """ 1061 from gluon.dal import Query 1062 st, sw = self.db.scheduler_task, self.db.scheduler_worker 1063 if isinstance(ref, int): 1064 q = st.id == ref 1065 elif isinstance(ref, str): 1066 q = st.uuid == ref 1067 else: 1068 raise SyntaxError( 1069 "You can retrieve results only by id or uuid") 1070 task = self.db(q).select(st.id, st.status, st.assigned_worker_name).first() 1071 rtn = None 1072 if not task: 1073 return rtn 1074 if task.status == 'RUNNING': 1075 rtn = self.db(sw.worker_name == task.assigned_worker_name).update(status=STOP_TASK) 1076 elif task.status == 'QUEUED': 1077 rtn = self.db(q).update(stop_time=self.now(), enabled=False, status=STOPPED) 1078 return rtn
1079 1080
1081 -def main():
1082 """ 1083 allows to run worker without python web2py.py .... by simply python this.py 1084 """ 1085 parser = optparse.OptionParser() 1086 parser.add_option( 1087 "-w", "--worker_name", dest="worker_name", default=None, 1088 help="start a worker with name") 1089 parser.add_option( 1090 "-b", "--heartbeat", dest="heartbeat", default=10, 1091 type='int', help="heartbeat time in seconds (default 10)") 1092 parser.add_option( 1093 "-L", "--logger_level", dest="logger_level", 1094 default=30, 1095 type='int', 1096 help="set debug output level (0-100, 0 means all, 100 means none;default is 30)") 1097 parser.add_option("-E", "--empty-runs", 1098 dest="max_empty_runs", 1099 type='int', 1100 default=0, 1101 help="max loops with no grabbed tasks permitted (0 for never check)") 1102 parser.add_option( 1103 "-g", "--group_names", dest="group_names", 1104 default='main', 1105 help="comma separated list of groups to be picked by the worker") 1106 parser.add_option( 1107 "-f", "--db_folder", dest="db_folder", 1108 default='/Users/mdipierro/web2py/applications/scheduler/databases', 1109 help="location of the dal database folder") 1110 parser.add_option( 1111 "-u", "--db_uri", dest="db_uri", 1112 default='sqlite://storage.sqlite', 1113 help="database URI string (web2py DAL syntax)") 1114 parser.add_option( 1115 "-t", "--tasks", dest="tasks", default=None, 1116 help="file containing task files, must define" + 1117 "tasks = {'task_name':(lambda: 'output')} or similar set of tasks") 1118 parser.add_option( 1119 "-U", "--utc-time", dest="utc_time", default=False, 1120 help="work with UTC timestamps" 1121 ) 1122 (options, args) = parser.parse_args() 1123 if not options.tasks or not options.db_uri: 1124 print USAGE 1125 if options.tasks: 1126 path, filename = os.path.split(options.tasks) 1127 if filename.endswith('.py'): 1128 filename = filename[:-3] 1129 sys.path.append(path) 1130 print 'importing tasks...' 
1131 tasks = __import__(filename, globals(), locals(), [], -1).tasks 1132 print 'tasks found: ' + ', '.join(tasks.keys()) 1133 else: 1134 tasks = {} 1135 group_names = [x.strip() for x in options.group_names.split(',')] 1136 1137 logging.getLogger().setLevel(options.logger_level) 1138 1139 print 'groups for this worker: ' + ', '.join(group_names) 1140 print 'connecting to database in folder: ' + options.db_folder or './' 1141 print 'using URI: ' + options.db_uri 1142 db = DAL(options.db_uri, folder=options.db_folder) 1143 print 'instantiating scheduler...' 1144 scheduler = Scheduler(db=db, 1145 worker_name=options.worker_name, 1146 tasks=tasks, 1147 migrate=True, 1148 group_names=group_names, 1149 heartbeat=options.heartbeat, 1150 max_empty_runs=options.max_empty_runs, 1151 utc_time=options.utc_time) 1152 signal.signal(signal.SIGTERM, lambda signum, stack_frame: sys.exit(1)) 1153 print 'starting main worker loop...' 1154 scheduler.loop()
1155 1156 if __name__ == '__main__': 1157 main() 1158