1
2
3
4 USAGE = """
5 ## Example
6
7 For any existing app
8
9 Create File: app/models/scheduler.py ======
10 from gluon.scheduler import Scheduler
11
12 def demo1(*args,**vars):
13 print 'you passed args=%s and vars=%s' % (args, vars)
14 return 'done!'
15
16 def demo2():
17 1/0
18
19 scheduler = Scheduler(db,dict(demo1=demo1,demo2=demo2))
20 ## run worker nodes with:
21
22 cd web2py
23 python web2py.py -K myapp
24 or
25 python gluon/scheduler.py -u sqlite://storage.sqlite \
26 -f applications/myapp/databases/ \
27 -t mytasks.py
28 (-h for info)
29 python scheduler.py -h
30
31 ## schedule jobs using
32 http://127.0.0.1:8000/myapp/appadmin/insert/db/scheduler_task
33
34 ## monitor scheduled jobs
35 http://127.0.0.1:8000/myapp/appadmin/select/db?query=db.scheduler_task.id>0
36
37 ## view completed jobs
38 http://127.0.0.1:8000/myapp/appadmin/select/db?query=db.scheduler_run.id>0
39
40 ## view workers
41 http://127.0.0.1:8000/myapp/appadmin/select/db?query=db.scheduler_worker.id>0
42
43 ## To install the scheduler as a permanent daemon on Linux (w/ Upstart), put
44 ## the following into /etc/init/web2py-scheduler.conf:
45 ## (This assumes your web2py instance is installed in <user>'s home directory,
46 ## running as <user>, with app <myapp>, on network interface eth0.)
47
48 description "web2py task scheduler"
49 start on (local-filesystems and net-device-up IFACE=eth0)
50 stop on shutdown
51 respawn limit 8 60 # Give up if restart occurs 8 times in 60 seconds.
52 exec sudo -u <user> python /home/<user>/web2py/web2py.py -K <myapp>
53 respawn
54
55 ## You can then start/stop/restart/check status of the daemon with:
56 sudo start web2py-scheduler
57 sudo stop web2py-scheduler
58 sudo restart web2py-scheduler
59 sudo status web2py-scheduler
60 """
61
62 import os
63 import time
64 import multiprocessing
65 import sys
66 import threading
67 import traceback
68 import signal
69 import socket
70 import datetime
71 import logging
72 import optparse
73 import types
74 import Queue
75
76 path = os.getcwd()
77
78 if 'WEB2PY_PATH' not in os.environ:
79 os.environ['WEB2PY_PATH'] = path
80
81 try:
82 from gluon.contrib.simplejson import loads, dumps
83 except:
84 from simplejson import loads, dumps
85
86 IDENTIFIER = "%s#%s" % (socket.gethostname(),os.getpid())
87
88 logger = logging.getLogger('web2py.scheduler.%s' % IDENTIFIER)
89
90 from gluon import DAL, Field, IS_NOT_EMPTY, IS_IN_SET, IS_NOT_IN_DB
91 from gluon import IS_INT_IN_RANGE, IS_DATETIME
92 from gluon.utils import web2py_uuid
93 from gluon.storage import Storage
94
95
96 QUEUED = 'QUEUED'
97 ASSIGNED = 'ASSIGNED'
98 RUNNING = 'RUNNING'
99 COMPLETED = 'COMPLETED'
100 FAILED = 'FAILED'
101 TIMEOUT = 'TIMEOUT'
102 STOPPED = 'STOPPED'
103 ACTIVE = 'ACTIVE'
104 TERMINATE = 'TERMINATE'
105 DISABLED = 'DISABLED'
106 KILL = 'KILL'
107 PICK = 'PICK'
108 STOP_TASK = 'STOP_TASK'
109 EXPIRED = 'EXPIRED'
110 SECONDS = 1
111 HEARTBEAT = 3 * SECONDS
112 MAXHIBERNATION = 10
113 CLEAROUT = '!clear!'
114
115 CALLABLETYPES = (types.LambdaType, types.FunctionType,
116 types.BuiltinFunctionType,
117 types.MethodType, types.BuiltinMethodType)
118
119
class Task(object):
    """In-memory descriptor of one task popped from scheduler_task.

    ``args``/``vars`` travel as JSON strings; every other scheduler_task
    column (task_id, uuid, run_id, ...) arrives through **kwargs and is
    copied straight onto the instance.
    """

    def __init__(self, app, function, timeout, args='[]', vars='{}', **kwargs):
        logger.debug(' new task allocated: %s.%s', app, function)
        self.app = app
        self.function = function
        self.timeout = timeout
        self.args = args
        self.vars = vars
        self.__dict__.update(kwargs)

    def __str__(self):
        return '<Task: %s>' % self.function
133
class TaskReport(object):
    """Outcome of one task run: status plus result / captured output /
    formatted traceback (only one of result or tb is normally set)."""

    def __init__(self, status, result=None, output=None, tb=None):
        logger.debug(' new task report: %s', status)
        if tb:
            logger.debug(' traceback: %s', tb)
        else:
            logger.debug(' result: %s', result)
        self.status = status
        self.result = result
        self.output = output
        self.tb = tb

    def __str__(self):
        return '<TaskReport: %s>' % self.status
148
149
151 """ test function """
152 for i in range(argv[0]):
153 print 'click', i
154 time.sleep(1)
155 return 'done'
156
157
158
159
160
161
def _decode_list(lst):
    """Recursively utf-8 encode every unicode item of a (nested) list.

    Python 2 helper used as part of json decoding, so task args reach the
    task function as byte strings instead of unicode.
    """
    newlist = []
    for item in lst:
        if isinstance(item, unicode):
            item = item.encode('utf-8')
        elif isinstance(item, list):
            item = _decode_list(item)
        newlist.append(item)
    return newlist
171
172
174 newdict = {}
175 for k, v in dct.iteritems():
176 if isinstance(k, unicode):
177 k = k.encode('utf-8')
178 if isinstance(v, unicode):
179 v = v.encode('utf-8')
180 elif isinstance(v, list):
181 v = _decode_list(v)
182 newdict[k] = v
183 return newdict
184
185
def executor(queue, task, out):
    """The background process.

    Runs one task inside a freshly built web2py application environment
    (or, without an app, by evaluating the function name directly),
    streams everything written to stdout into ``out``, and puts a final
    TaskReport (COMPLETED or FAILED) on ``queue``.

    :param queue: multiprocessing queue receiving the TaskReport
    :param task: Task instance describing what to run
    :param out: multiprocessing queue receiving captured stdout chunks
    """
    logger.debug('    task started')

    class LogOutput(object):
        """Facility to log output at intervals: replaces sys.stdout so
        every write lands on ``out_queue``; restored on destruction."""
        def __init__(self, out_queue):
            self.out_queue = out_queue
            self.stdout = sys.stdout
            sys.stdout = self

        def __del__(self):
            sys.stdout = self.stdout

        def flush(self):
            pass

        def write(self, data):
            self.out_queue.put(data)

    W2P_TASK = Storage({'id': task.task_id, 'uuid': task.uuid})
    stdout = LogOutput(out)
    try:
        if task.app:
            os.chdir(os.environ['WEB2PY_PATH'])
            from gluon.shell import env, parse_path_info
            from gluon import current
            # silence model-import noise while building the environment
            level = logging.getLogger().getEffectiveLevel()
            logging.getLogger().setLevel(logging.WARN)
            (a, c, f) = parse_path_info(task.app)
            _env = env(a=a, c=c, import_models=True)
            logging.getLogger().setLevel(level)
            f = task.function
            functions = current._scheduler.tasks
            if not functions:
                # no explicit task dict: look the name up in the app env
                _function = _env.get(f)
            else:
                _function = functions.get(f)
            if not isinstance(_function, CALLABLETYPES):
                raise NameError(
                    "name '%s' not found in scheduler's environment" % f)
            _env.update({'W2P_TASK': W2P_TASK})
            from gluon import current
            current.W2P_TASK = W2P_TASK
            globals().update(_env)
            args = loads(task.args)
            vars = loads(task.vars, object_hook=_decode_dict)
            result = dumps(_function(*args, **vars))
        else:
            # SECURITY NOTE: eval() on task.function assumes scheduler_task
            # rows are trusted (queued only by the application itself)
            result = eval(task.function)(
                *loads(task.args, object_hook=_decode_dict),
                **loads(task.vars, object_hook=_decode_dict))
        queue.put(TaskReport('COMPLETED', result=result))
    except BaseException:
        # FIX: the original bound the exception (`except BaseException, e`)
        # but never used it; only the formatted traceback is reported
        tb = traceback.format_exc()
        queue.put(TaskReport('FAILED', tb=tb))
    del stdout
249
250
400
401
402 TASK_STATUS = (QUEUED, RUNNING, COMPLETED, FAILED, TIMEOUT, STOPPED, EXPIRED)
403 RUN_STATUS = (RUNNING, COMPLETED, FAILED, TIMEOUT, STOPPED)
404 WORKER_STATUS = (ACTIVE, PICK, DISABLED, TERMINATE, KILL, STOP_TASK)
405
406
class TYPE(object):
    """
    Validator that checks whether a field is valid json and validates
    its decoded python type (typically ``list`` or ``dict``).
    """

    def __init__(self, myclass=list, parse=False):
        # myclass: type the decoded json must be an instance of
        # parse: when True return the decoded object instead of the raw text
        self.myclass = myclass
        self.parse = parse

    def __call__(self, value):
        from gluon import current
        try:
            obj = loads(value)
        except (ValueError, TypeError):
            # FIX: was a bare `except:` which also swallowed
            # KeyboardInterrupt/SystemExit; loads() failures raise
            # ValueError (bad json) or TypeError (non-string input)
            return (value, current.T('invalid json'))
        else:
            if isinstance(obj, self.myclass):
                if self.parse:
                    return (obj, None)
                else:
                    return (value, None)
            else:
                return (value, current.T('Not of type: %s') % self.myclass)
430
431
def __init__(self, db, tasks=None, migrate=True,
             worker_name=None, group_names=None, heartbeat=HEARTBEAT,
             max_empty_runs=0, discard_results=False, utc_time=False):
    """Scheduler frontend/worker bound to a DAL connection.

    :param db: DAL instance holding the scheduler tables
    :param tasks: dict mapping function_name -> callable; None means the
        function is resolved from the app environment at execution time
    :param migrate: passed through to the table definitions
    :param worker_name: unique worker id (defaults to host#pid)
    :param group_names: task groups served by this worker (default ['main'])
    :param heartbeat: seconds between heartbeats
    :param max_empty_runs: die after this many consecutive idle loops
        (0 = never give up)
    :param discard_results: when True no scheduler_run rows are kept
    :param utc_time: timestamp everything in UTC instead of local time
    """
    MetaScheduler.__init__(self)

    self.db = db
    self.db_thread = None
    self.tasks = tasks
    # FIX: the original used a mutable default argument (['main']),
    # shared across instances; normalized to None with a fallback
    self.group_names = group_names or ['main']
    self.heartbeat = heartbeat
    self.worker_name = worker_name or IDENTIFIER
    # worker_status = [state, hibernation multiplier for sleep()]
    self.worker_status = [RUNNING, 1]
    self.max_empty_runs = max_empty_runs
    self.discard_results = discard_results
    self.is_a_ticker = False
    self.do_assign_tasks = False
    self.greedy = False
    self.utc_time = utc_time

    from gluon import current
    current._scheduler = self

    self.define_tables(db, migrate=migrate)
459
def __get_migrate(self, tablename, migrate=True):
    """Translate the scheduler-wide ``migrate`` setting for one table.

    False/True pass through; a string is treated as a filename prefix
    and produces a distinct ``<prefix><tablename>.table`` per table.
    """
    if isinstance(migrate, bool):
        return migrate
    if isinstance(migrate, str):
        return "%s%s.table" % (migrate, tablename)
    return True
468
def now(self):
    """Current timestamp: UTC or local depending on ``utc_time``."""
    if self.utc_time:
        return datetime.datetime.utcnow()
    return datetime.datetime.now()
471
478
def define_tables(self, db, migrate):
    """Define scheduler_task, scheduler_run and scheduler_worker on ``db``.

    Field names and order are part of the public contract (appadmin and
    other workers sharing the db rely on them) and must stay stable.
    """
    from gluon.dal import DEFAULT
    logger.debug('defining tables (migrate=%s)', migrate)
    now = self.now
    db.define_table(
        'scheduler_task',
        Field('application_name', requires=IS_NOT_EMPTY(),
              default=None, writable=False),
        Field('task_name', default=None),
        Field('group_name', default='main'),
        Field('status', requires=IS_IN_SET(TASK_STATUS),
              default=QUEUED, writable=False),
        # restrict choices to the known tasks only when an explicit
        # task dict was handed to the Scheduler
        Field('function_name',
              requires=IS_IN_SET(sorted(self.tasks.keys()))
              if self.tasks else DEFAULT),
        Field('uuid', length=255,
              requires=IS_NOT_IN_DB(db, 'scheduler_task.uuid'),
              unique=True, default=web2py_uuid),
        Field('args', 'text', default='[]', requires=TYPE(list)),
        Field('vars', 'text', default='{}', requires=TYPE(dict)),
        Field('enabled', 'boolean', default=True),
        Field('start_time', 'datetime', default=now,
              requires=IS_DATETIME()),
        Field('next_run_time', 'datetime', default=now),
        Field('stop_time', 'datetime'),
        Field('repeats', 'integer', default=1, comment="0=unlimited",
              requires=IS_INT_IN_RANGE(0, None)),
        Field('retry_failed', 'integer', default=0, comment="-1=unlimited",
              requires=IS_INT_IN_RANGE(-1, None)),
        Field('period', 'integer', default=60, comment='seconds',
              requires=IS_INT_IN_RANGE(0, None)),
        Field('prevent_drift', 'boolean', default=False,
              comment='Cron-like start_times between runs'),
        Field('timeout', 'integer', default=60, comment='seconds',
              requires=IS_INT_IN_RANGE(0, None)),
        Field('sync_output', 'integer', default=0,
              comment="update output every n sec: 0=never",
              requires=IS_INT_IN_RANGE(0, None)),
        Field('times_run', 'integer', default=0, writable=False),
        Field('times_failed', 'integer', default=0, writable=False),
        Field('last_run_time', 'datetime', writable=False, readable=False),
        Field('assigned_worker_name', default='', writable=False),
        on_define=self.set_requirements,
        migrate=self.__get_migrate('scheduler_task', migrate),
        format='%(task_name)s')

    db.define_table(
        'scheduler_run',
        Field('task_id', 'reference scheduler_task'),
        Field('status', requires=IS_IN_SET(RUN_STATUS)),
        Field('start_time', 'datetime'),
        Field('stop_time', 'datetime'),
        Field('run_output', 'text'),
        Field('run_result', 'text'),
        Field('traceback', 'text'),
        Field('worker_name', default=self.worker_name),
        migrate=self.__get_migrate('scheduler_run', migrate)
    )

    db.define_table(
        'scheduler_worker',
        Field('worker_name', length=255, unique=True),
        Field('first_heartbeat', 'datetime'),
        Field('last_heartbeat', 'datetime'),
        Field('status', requires=IS_IN_SET(WORKER_STATUS)),
        Field('is_ticker', 'boolean', default=False, writable=False),
        Field('group_names', 'list:string', default=self.group_names),
        migrate=self.__get_migrate('scheduler_worker', migrate)
    )

    if migrate is not False:
        db.commit()
551
552 - def loop(self, worker_name=None):
553 signal.signal(signal.SIGTERM, lambda signum, stack_frame: sys.exit(1))
554 try:
555 self.start_heartbeats()
556 while True and self.have_heartbeat:
557 if self.worker_status[0] == DISABLED:
558 logger.debug('Someone stopped me, sleeping until better times come (%s)', self.worker_status[1])
559 self.sleep()
560 continue
561 logger.debug('looping...')
562 task = self.wrapped_pop_task()
563 if task:
564 self.empty_runs = 0
565 self.worker_status[0] = RUNNING
566 self.report_task(task, self.async(task))
567 self.worker_status[0] = ACTIVE
568 else:
569 self.empty_runs += 1
570 logger.debug('sleeping...')
571 if self.max_empty_runs != 0:
572 logger.debug('empty runs %s/%s',
573 self.empty_runs, self.max_empty_runs)
574 if self.empty_runs >= self.max_empty_runs:
575 logger.info(
576 'empty runs limit reached, killing myself')
577 self.die()
578 self.sleep()
579 except (KeyboardInterrupt, SystemExit):
580 logger.info('catched')
581 self.die()
582
598
613
def pop_task(self, db):
    """Grab the next ASSIGNED task for this worker.

    A ticker with pending assignment duty runs an assignment round
    instead and returns None. Otherwise the earliest assigned task is
    flipped to RUNNING, a scheduler_run row is created (unless results
    are discarded) and a Task descriptor is returned; None when idle.
    """
    now = self.now()
    st = self.db.scheduler_task
    if self.is_a_ticker and self.do_assign_tasks:
        # ticker duty first: no task is taken on this cycle
        self.wrapped_assign_tasks(db)
        return None

    grabbed = db(st.assigned_worker_name == self.worker_name)(
        st.status == ASSIGNED)
    task = grabbed.select(limitby=(0, 1), orderby=st.next_run_time).first()
    if task:
        task.update_record(status=RUNNING, last_run_time=now)
        db.commit()
        logger.debug('   work to do %s', task.id)
    else:
        if self.greedy and self.is_a_ticker:
            # a previous round filled its quota: keep assigning eagerly
            logger.info('TICKER: greedy loop')
            self.wrapped_assign_tasks(db)
        else:
            logger.info('nothing to do')
        return None
    times_run = task.times_run + 1
    if not task.prevent_drift:
        next_run_time = task.last_run_time + datetime.timedelta(
            seconds=task.period
        )
    else:
        # cron-like: anchor to start_time so runs do not drift
        next_run_time = task.start_time + datetime.timedelta(
            seconds=task.period * times_run
        )
    if times_run < task.repeats or task.repeats == 0:
        run_again = True
    else:
        run_again = False
    run_id = 0
    # FIX: dropped the redundant `True and` from the loop condition;
    # retry the insert until the db stops raising (e.g. lock contention)
    while not self.discard_results:
        logger.debug('    new scheduler_run record')
        try:
            run_id = db.scheduler_run.insert(
                task_id=task.id,
                status=RUNNING,
                start_time=now,
                worker_name=self.worker_name)
            db.commit()
            break
        except:
            time.sleep(0.5)
            db.rollback()
    logger.info('new task %(id)s "%(task_name)s" %(application_name)s.%(function_name)s' % task)
    return Task(
        app=task.application_name,
        function=task.function_name,
        timeout=task.timeout,
        args=task.args,
        vars=task.vars,
        task_id=task.id,
        run_id=run_id,
        run_again=run_again,
        next_run_time=next_run_time,
        times_run=times_run,
        stop_time=task.stop_time,
        retry_failed=task.retry_failed,
        times_failed=task.times_failed,
        sync_output=task.sync_output,
        uuid=task.uuid)
686
def report_task(self, task, task_report):
    """Persist one task run's outcome and advance the task's status.

    Updates (or deletes, when there is nothing to remember) the
    scheduler_run row, then moves the scheduler_task row to
    QUEUED/COMPLETED/EXPIRED/FAILED/TIMEOUT according to the report and
    the repeat/retry settings. Retries forever on db contention.
    """
    db = self.db
    now = self.now()
    while True:
        try:
            if not self.discard_results:
                if task_report.result != 'null' or task_report.tb:
                    # there is a result or a traceback worth keeping
                    logger.debug(' recording task report in db (%s)',
                                 task_report.status)
                    db(db.scheduler_run.id == task.run_id).update(
                        status=task_report.status,
                        stop_time=now,
                        run_result=task_report.result,
                        run_output=task_report.output,
                        traceback=task_report.tb)
                else:
                    logger.debug(' deleting task report in db because of no result')
                    db(db.scheduler_run.id == task.run_id).delete()

            is_expired = (task.stop_time
                          and task.next_run_time > task.stop_time
                          and True or False)
            status = (task.run_again and is_expired and EXPIRED
                      or task.run_again and not is_expired
                      and QUEUED or COMPLETED)
            if task_report.status == COMPLETED:
                d = dict(status=status,
                         next_run_time=task.next_run_time,
                         times_run=task.times_run,
                         times_failed=0
                         )
                db(db.scheduler_task.id == task.task_id)(
                    db.scheduler_task.status == RUNNING).update(**d)
            else:
                st_mapping = {'FAILED': 'FAILED',
                              'TIMEOUT': 'TIMEOUT',
                              'STOPPED': 'QUEUED'}[task_report.status]
                # requeue while retries remain (or are unlimited == -1)
                status = (task.retry_failed
                          and task.times_failed < task.retry_failed
                          and QUEUED or task.retry_failed == -1
                          and QUEUED or st_mapping)
                db(
                    (db.scheduler_task.id == task.task_id) &
                    (db.scheduler_task.status == RUNNING)
                ).update(
                    times_failed=db.scheduler_task.times_failed + 1,
                    next_run_time=task.next_run_time,
                    status=status
                )
            db.commit()
            logger.info('task completed (%s)', task_report.status)
            break
        except Exception:
            # FIX: narrowed from a bare `except:` so KeyboardInterrupt /
            # SystemExit are no longer swallowed by the retry loop
            db.rollback()
            time.sleep(0.5)
745
def adj_hibernation(self):
    """While DISABLED, grow the sleep multiplier (capped at
    MAXHIBERNATION) so a parked worker polls progressively less often."""
    if self.worker_status[0] == DISABLED:
        self.worker_status[1] = min(self.worker_status[1] + 1,
                                    MAXHIBERNATION)
751
def send_heartbeat(self, counter):
    """Heartbeat-thread bookkeeping.

    Registers/refreshes this worker's scheduler_worker row, obeys
    backend-side commands (DISABLED, TERMINATE, KILL, STOP_TASK, PICK),
    and every 5th beat cleans up dead workers, requeues their tasks and
    runs the ticker election. Ends by adjusting hibernation and sleeping.
    """
    if not self.db_thread:
        logger.debug('thread building own DAL object')
        # heartbeats run in their own thread: DAL connections are not
        # thread-safe, so build a private one
        self.db_thread = DAL(
            self.db._uri, folder=self.db._adapter.folder)
        self.define_tables(self.db_thread, migrate=False)
    try:
        db = self.db_thread
        sw, st = db.scheduler_worker, db.scheduler_task
        now = self.now()
        mybackedstatus = db(sw.worker_name == self.worker_name).select().first()
        if not mybackedstatus:
            # first heartbeat ever: register this worker
            sw.insert(status=ACTIVE, worker_name=self.worker_name,
                      first_heartbeat=now, last_heartbeat=now,
                      group_names=self.group_names)
            self.worker_status = [ACTIVE, 1]
            mybackedstatus = ACTIVE
        else:
            mybackedstatus = mybackedstatus.status
            if mybackedstatus == DISABLED:
                self.worker_status[0] = DISABLED
                # at full hibernation only refresh the heartbeat stamp
                if self.worker_status[1] == MAXHIBERNATION:
                    logger.debug('........recording heartbeat (%s)',
                                 self.worker_status[0])
                    db(sw.worker_name == self.worker_name).update(
                        last_heartbeat=now)
            elif mybackedstatus == TERMINATE:
                self.worker_status[0] = TERMINATE
                logger.debug("Waiting to terminate the current task")
                self.give_up()
                return
            elif mybackedstatus == KILL:
                self.worker_status[0] = KILL
                self.die()
            else:
                if mybackedstatus == STOP_TASK:
                    logger.info('Asked to kill the current task')
                    self.terminate_process()
                logger.debug('........recording heartbeat (%s)',
                             self.worker_status[0])
                db(sw.worker_name == self.worker_name).update(
                    last_heartbeat=now, status=ACTIVE)
                # back to normal pace after any hibernation
                self.worker_status[1] = 1
                if self.worker_status[0] != RUNNING:
                    self.worker_status[0] = ACTIVE

        self.do_assign_tasks = False
        if counter % 5 == 0 or mybackedstatus == PICK:
            try:
                # delete workers that stopped beating, requeue their tasks
                expiration = now - datetime.timedelta(
                    seconds=self.heartbeat * 3)
                departure = now - datetime.timedelta(
                    seconds=self.heartbeat * 3 * MAXHIBERNATION)
                logger.debug(
                    '    freeing workers that have not sent heartbeat')
                inactive_workers = db(
                    ((sw.last_heartbeat < expiration) & (sw.status == ACTIVE)) |
                    ((sw.last_heartbeat < departure) & (sw.status != ACTIVE))
                )
                db(st.assigned_worker_name.belongs(
                    inactive_workers._select(sw.worker_name)))(st.status == RUNNING)\
                    .update(assigned_worker_name='', status=QUEUED)
                inactive_workers.delete()
                try:
                    self.is_a_ticker = self.being_a_ticker()
                except:
                    logger.error('Error coordinating TICKER')
                if self.worker_status[0] == ACTIVE:
                    self.do_assign_tasks = True
            except:
                logger.error('Error cleaning up')
        db.commit()
    except:
        logger.error('Error retrieving status')
        db.rollback()
    self.adj_hibernation()
    self.sleep()
829
def being_a_ticker(self):
    """Ticker election: ensure at most one ACTIVE worker holds the
    is_ticker flag; returns True when this worker ends up as ticker."""
    db = self.db_thread
    sw = db.scheduler_worker
    all_active = db(
        (sw.worker_name != self.worker_name) & (sw.status == ACTIVE)
    ).select()
    ticker = all_active.find(lambda row: row.is_ticker is True).first()
    not_busy = self.worker_status[0] == ACTIVE
    if not ticker:
        if not_busy:
            # no ticker around and we are free: claim the role
            db(sw.worker_name == self.worker_name).update(is_ticker=True)
            db(sw.worker_name != self.worker_name).update(is_ticker=False)
            logger.info("TICKER: I'm a ticker")
        else:
            if len(all_active) >= 1:
                # we are busy and others exist: make sure it is not us
                db(sw.worker_name == self.worker_name).update(is_ticker=False)
            else:
                # nobody else alive: take it even while busy
                not_busy = True
        db.commit()
        return not_busy
    else:
        logger.info(
            "%s is a ticker, I'm a poor worker" % ticker.worker_name)
        return False
858
def assign_tasks(self, db):
    """Ticker duty: expire overdue tasks and distribute ready
    QUEUED/ASSIGNED tasks among ACTIVE workers, per group."""
    sw, st = db.scheduler_worker, db.scheduler_task
    now = self.now()
    all_workers = db(sw.status == ACTIVE).select()
    # group_name -> {'workers': [{'name':..., 'c': assigned-count}, ...]}
    wkgroups = {}
    for w in all_workers:
        group_names = w.group_names
        for gname in group_names:
            if gname not in wkgroups:
                wkgroups[gname] = dict(
                    workers=[{'name': w.worker_name, 'c': 0}])
            else:
                wkgroups[gname]['workers'].append(
                    {'name': w.worker_name, 'c': 0})

    # mark tasks whose stop_time has passed
    db(st.status.belongs(
        (QUEUED, ASSIGNED)))(st.stop_time < now).update(status=EXPIRED)

    all_available = db(
        (st.status.belongs((QUEUED, ASSIGNED))) &
        ((st.times_run < st.repeats) | (st.repeats == 0)) &
        (st.start_time <= now) &
        ((st.stop_time == None) | (st.stop_time > now)) &
        (st.next_run_time <= now) &
        (st.enabled == True)
    )
    # roughly 50 tasks per worker per group each round
    limit = len(all_workers) * (50 / (len(wkgroups) or 1))

    db.commit()
    x = 0
    for group in wkgroups.keys():
        tasks = all_available(st.group_name == group).select(
            limitby=(0, limit), orderby=st.next_run_time)
        for task in tasks:
            x += 1
            gname = task.group_name
            ws = wkgroups.get(gname)
            if ws:
                # NOTE(review): counter starts at 0 so `w['c'] < counter`
                # never promotes a later worker; assignment sticks to the
                # first worker of the group — confirm intended behavior
                counter = 0
                myw = 0
                for i, w in enumerate(ws['workers']):
                    if w['c'] < counter:
                        myw = i
                        counter = w['c']
                d = dict(
                    status=ASSIGNED,
                    assigned_worker_name=wkgroups[gname]['workers'][myw]['name']
                )
                if not task.task_name:
                    d['task_name'] = task.function_name
                db(
                    (st.id == task.id) &
                    (st.status.belongs((QUEUED, ASSIGNED)))
                ).update(**d)
                db.commit()
                wkgroups[gname]['workers'][myw]['c'] += 1

    if x > 0:
        self.empty_runs = 0
    # a full batch suggests more work is waiting: go greedy
    self.greedy = x >= limit and True or False
    logger.info('TICKER: workers are %s', len(all_workers))
    logger.info('TICKER: tasks are %s', x)
935
def sleep(self):
    """Pause for one heartbeat, scaled by the hibernation multiplier."""
    pause = self.heartbeat * self.worker_status[1]
    time.sleep(pause)
938
939
def set_worker_status(self, group_names=None, action=ACTIVE):
    """Flag every worker serving ``group_names`` with ``action``
    (defaults to this scheduler's own groups)."""
    if not group_names:
        group_names = self.group_names
    elif isinstance(group_names, str):
        group_names = [group_names]
    for group in group_names:
        self.db(
            self.db.scheduler_worker.group_names.contains(group)
        ).update(status=action)
949
def disable(self, group_names=None):
    """Park workers of the given groups (heartbeats only, no tasks)."""
    self.set_worker_status(group_names=group_names, action=DISABLED)

def resume(self, group_names=None):
    """Wake up previously disabled workers of the given groups."""
    self.set_worker_status(group_names=group_names, action=ACTIVE)

def terminate(self, group_names=None):
    """Ask workers to exit after finishing their current task."""
    self.set_worker_status(group_names=group_names, action=TERMINATE)

def kill(self, group_names=None):
    """Ask workers to exit immediately."""
    self.set_worker_status(group_names=group_names, action=KILL)
def queue_task(self, function, pargs=None, pvars=None, **kwargs):
    """
    Queue tasks. This takes care of handling the validation of all
    values.
    :param function: the function (anything callable with a __name__)
    :param pargs: "raw" args to be passed to the function. Automatically
        jsonified. (default: no positional args)
    :param pvars: "raw" kwargs to be passed to the function. Automatically
        jsonified (default: no keyword args)
    :param kwargs: all the scheduler_task columns. args and vars here should be
        in json format already, they will override pargs and pvars

    returns a dict just as a normal validate_and_insert, plus a uuid key holding
    the uuid of the queued task. If validation is not passed, both id and uuid
    will be None, and you'll get an "error" dict holding the errors found.
    """
    # FIX: pargs/pvars were mutable default arguments ([] / {}),
    # shared across every call; normalized to None
    if pargs is None:
        pargs = []
    if pvars is None:
        pvars = {}
    if hasattr(function, '__name__'):
        function = function.__name__
    targs = 'args' in kwargs and kwargs.pop('args') or dumps(pargs)
    tvars = 'vars' in kwargs and kwargs.pop('vars') or dumps(pvars)
    tuuid = 'uuid' in kwargs and kwargs.pop('uuid') or web2py_uuid()
    tname = 'task_name' in kwargs and kwargs.pop('task_name') or function
    immediate = 'immediate' in kwargs and kwargs.pop('immediate') or None
    rtn = self.db.scheduler_task.validate_and_insert(
        function_name=function,
        task_name=tname,
        args=targs,
        vars=tvars,
        uuid=tuuid,
        **kwargs)
    if not rtn.errors:
        rtn.uuid = tuuid
        if immediate:
            # poke the ticker so assignment happens right away
            self.db(self.db.scheduler_worker.is_ticker == True).update(status=PICK)
    else:
        rtn.uuid = None
    return rtn
999
def task_status(self, ref, output=False):
    """
    Shortcut for task status retrieval.

    :param ref: can be
        - integer --> lookup will be done by scheduler_task.id
        - string --> lookup will be done by scheduler_task.uuid
        - query --> lookup as you wish (as in db.scheduler_task.task_name == 'test1')
    :param output: fetch also the scheduler_run record

    Returns a single Row object, for the last queued task.
    If output == True, the last scheduler_run record is attached via a
    left join (so all its fields may be None) and the json-decoded
    run_result is exposed as ``row.result``.
    """
    from gluon.dal import Query
    sr, st = self.db.scheduler_run, self.db.scheduler_task
    if isinstance(ref, int):
        q = st.id == ref
    elif isinstance(ref, str):
        q = st.uuid == ref
    elif isinstance(ref, Query):
        q = ref
    else:
        raise SyntaxError(
            "You can retrieve results only by id, uuid or Query")
    fields = [st.ALL]
    left = False
    orderby = ~st.id
    if output:
        fields = st.ALL, sr.ALL
        left = sr.on(sr.task_id == st.id)
        orderby = ~st.id | ~sr.id
    row = self.db(q).select(
        *fields,
        **dict(orderby=orderby,
               left=left,
               limitby=(0, 1))
    ).first()
    if row and output:
        row.result = row.scheduler_run.run_result and \
            loads(row.scheduler_run.run_result,
                  object_hook=_decode_dict) or None
    return row
1045
def stop_task(self, ref):
    """
    Experimental!!!
    Shortcut for task termination.
    If the task is RUNNING it will terminate it --> execution will be set as FAILED
    If the task is QUEUED, its stop_time will be set as to "now",
    the enabled flag will be set to False, status to STOPPED

    :param ref: can be
        - integer --> lookup will be done by scheduler_task.id
        - string --> lookup will be done by scheduler_task.uuid
    Returns:
        - 1 if task was stopped (meaning an update has been done)
        - None if task was not found, or if task was not RUNNING or QUEUED
    """
    # FIX: removed an unused `from gluon.dal import Query` import
    # (Query is never referenced here, unlike in task_status)
    st, sw = self.db.scheduler_task, self.db.scheduler_worker
    if isinstance(ref, int):
        q = st.id == ref
    elif isinstance(ref, str):
        q = st.uuid == ref
    else:
        raise SyntaxError(
            "You can retrieve results only by id or uuid")
    task = self.db(q).select(st.id, st.status, st.assigned_worker_name).first()
    rtn = None
    if not task:
        return rtn
    if task.status == 'RUNNING':
        # the worker owning the task kills it on its next heartbeat
        rtn = self.db(sw.worker_name == task.assigned_worker_name).update(status=STOP_TASK)
    elif task.status == 'QUEUED':
        rtn = self.db(q).update(stop_time=self.now(), enabled=False, status=STOPPED)
    return rtn
1079
1080
1082 """
1083 allows to run worker without python web2py.py .... by simply python this.py
1084 """
1085 parser = optparse.OptionParser()
1086 parser.add_option(
1087 "-w", "--worker_name", dest="worker_name", default=None,
1088 help="start a worker with name")
1089 parser.add_option(
1090 "-b", "--heartbeat", dest="heartbeat", default=10,
1091 type='int', help="heartbeat time in seconds (default 10)")
1092 parser.add_option(
1093 "-L", "--logger_level", dest="logger_level",
1094 default=30,
1095 type='int',
1096 help="set debug output level (0-100, 0 means all, 100 means none;default is 30)")
1097 parser.add_option("-E", "--empty-runs",
1098 dest="max_empty_runs",
1099 type='int',
1100 default=0,
1101 help="max loops with no grabbed tasks permitted (0 for never check)")
1102 parser.add_option(
1103 "-g", "--group_names", dest="group_names",
1104 default='main',
1105 help="comma separated list of groups to be picked by the worker")
1106 parser.add_option(
1107 "-f", "--db_folder", dest="db_folder",
1108 default='/Users/mdipierro/web2py/applications/scheduler/databases',
1109 help="location of the dal database folder")
1110 parser.add_option(
1111 "-u", "--db_uri", dest="db_uri",
1112 default='sqlite://storage.sqlite',
1113 help="database URI string (web2py DAL syntax)")
1114 parser.add_option(
1115 "-t", "--tasks", dest="tasks", default=None,
1116 help="file containing task files, must define" +
1117 "tasks = {'task_name':(lambda: 'output')} or similar set of tasks")
1118 parser.add_option(
1119 "-U", "--utc-time", dest="utc_time", default=False,
1120 help="work with UTC timestamps"
1121 )
1122 (options, args) = parser.parse_args()
1123 if not options.tasks or not options.db_uri:
1124 print USAGE
1125 if options.tasks:
1126 path, filename = os.path.split(options.tasks)
1127 if filename.endswith('.py'):
1128 filename = filename[:-3]
1129 sys.path.append(path)
1130 print 'importing tasks...'
1131 tasks = __import__(filename, globals(), locals(), [], -1).tasks
1132 print 'tasks found: ' + ', '.join(tasks.keys())
1133 else:
1134 tasks = {}
1135 group_names = [x.strip() for x in options.group_names.split(',')]
1136
1137 logging.getLogger().setLevel(options.logger_level)
1138
1139 print 'groups for this worker: ' + ', '.join(group_names)
1140 print 'connecting to database in folder: ' + options.db_folder or './'
1141 print 'using URI: ' + options.db_uri
1142 db = DAL(options.db_uri, folder=options.db_folder)
1143 print 'instantiating scheduler...'
1144 scheduler = Scheduler(db=db,
1145 worker_name=options.worker_name,
1146 tasks=tasks,
1147 migrate=True,
1148 group_names=group_names,
1149 heartbeat=options.heartbeat,
1150 max_empty_runs=options.max_empty_runs,
1151 utc_time=options.utc_time)
1152 signal.signal(signal.SIGTERM, lambda signum, stack_frame: sys.exit(1))
1153 print 'starting main worker loop...'
1154 scheduler.loop()
1155
1156 if __name__ == '__main__':
1157 main()
1158