From 65b068a5f851815eead6b64f366b31fa7dbfe553 Mon Sep 17 00:00:00 2001 From: Richard Purdie Date: Wed, 18 Aug 2010 17:13:06 +0100 Subject: bitbake/runqueue.py: Create RunQueueExecute and RunQueueExecuteTasks classes, further splitting up runqueue Signed-off-by: Richard Purdie --- bitbake/lib/bb/runqueue.py | 272 ++++++++++++++++++++++++--------------------- 1 file changed, 144 insertions(+), 128 deletions(-) (limited to 'bitbake/lib/bb') diff --git a/bitbake/lib/bb/runqueue.py b/bitbake/lib/bb/runqueue.py index 9f714e46a..86d60fa05 100644 --- a/bitbake/lib/bb/runqueue.py +++ b/bitbake/lib/bb/runqueue.py @@ -696,7 +696,6 @@ class RunQueueData: self.rqdata.runq_depends[task], self.rqdata.runq_revdeps[task])) - class RunQueue: def __init__(self, cooker, cfgData, dataCache, taskData, targets): @@ -704,8 +703,6 @@ class RunQueue: self.cfgData = cfgData self.rqdata = RunQueueData(self, cooker, cfgData, dataCache, taskData, targets) - self.number_tasks = int(bb.data.getVar("BB_NUMBER_THREADS", cfgData, 1) or 1) - self.scheduler = bb.data.getVar("BB_SCHEDULER", cfgData, 1) or "speed" self.stamppolicy = bb.data.getVar("BB_STAMP_POLICY", cfgData, 1) or "perfile" self.state = runQueuePrepare @@ -862,13 +859,14 @@ class RunQueue: if self.state is runQueueRunInit: bb.msg.note(1, bb.msg.domain.RunQueue, "Executing runqueue") - self.execute_runqueue_initVars() + self.rqexe = RunQueueExecuteTasks(self) + self.state = runQueueRunning if self.state is runQueueRunning: - self.execute_runqueue_internal() + self.rqexe.execute() if self.state is runQueueCleanUp: - self.finish_runqueue() + self.rqexe.finish() if self.state is runQueueFailed: if not self.rqdata.taskData.tryaltconfigs: @@ -879,7 +877,7 @@ class RunQueue: if self.state is runQueueComplete: # All done - bb.msg.note(1, bb.msg.domain.RunQueue, "Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed." % (self.stats.completed, self.stats.skipped, self.stats.failed)) + bb.msg.note(1, bb.msg.domain.RunQueue, "Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed." % (self.rqexe.stats.completed, self.rqexe.stats.skipped, self.rqexe.stats.failed)) return False if self.state is runQueueChildProcess: @@ -889,9 +887,23 @@ class RunQueue: # Loop return retval - def execute_runqueue_initVars(self): + def finish_runqueue(self, now = False): + if now: + self.rqexe.finish_now() + else: + self.rqexe.finish() - self.stats = RunQueueStats(len(self.rqdata.runq_fnid)) + +class RunQueueExecute: + + def __init__(self, rq): + self.rq = rq + self.cooker = rq.cooker + self.cfgData = rq.cfgData + self.rqdata = rq.rqdata + + self.number_tasks = int(bb.data.getVar("BB_NUMBER_THREADS", self.cfgData, 1) or 1) + self.scheduler = bb.data.getVar("BB_SCHEDULER", self.cfgData, 1) or "speed" self.runq_buildable = [] self.runq_running = [] @@ -900,6 +912,120 @@ class RunQueue: self.build_pipes = {} self.failed_fnids = [] + def runqueue_process_waitpid(self): + """ + Return none is there are no processes awaiting result collection, otherwise + collect the process exit codes and close the information pipe. + """ + result = os.waitpid(-1, os.WNOHANG) + if result[0] is 0 and result[1] is 0: + return None + task = self.build_pids[result[0]] + del self.build_pids[result[0]] + self.build_pipes[result[0]].close() + del self.build_pipes[result[0]] + if result[1] != 0: + self.task_fail(task, result[1]) + else: + self.task_complete(task) + self.stats.taskCompleted() + bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData) + + def finish_now(self): + if self.stats.active: + bb.msg.note(1, bb.msg.domain.RunQueue, "Sending SIGINT to remaining %s tasks" % self.stats.active) + for k, v in self.build_pids.iteritems(): + try: + os.kill(-k, signal.SIGINT) + except: + pass + for pipe in self.build_pipes: + self.build_pipes[pipe].read() + + def finish(self): + self.rq.state = runQueueCleanUp + + for pipe in self.build_pipes: + self.build_pipes[pipe].read() + + try: + while self.stats.active > 0: + bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData) + if self.runqueue_process_waitpid() is None: + return + except: + self.finish_now() + raise + + if len(self.failed_fnids) != 0: + self.rq.state = runQueueFailed + return + + self.rq.state = runQueueComplete + return + + def fork_off_task(self, fn, task, taskname): + sys.stdout.flush() + sys.stderr.flush() + try: + pipein, pipeout = os.pipe() + pid = os.fork() + except OSError as e: + bb.msg.fatal(bb.msg.domain.RunQueue, "fork failed: %d (%s)" % (e.errno, e.strerror)) + if pid == 0: + os.close(pipein) + # Save out the PID so that the event can include it the + # events + bb.event.worker_pid = os.getpid() + bb.event.worker_pipe = pipeout + + self.rq.state = runQueueChildProcess + # Make the child the process group leader + os.setpgid(0, 0) + # No stdin + newsi = os.open('/dev/null', os.O_RDWR) + os.dup2(newsi, sys.stdin.fileno()) + # Stdout to a logfile + #logout = data.expand("${TMPDIR}/log/stdout.%s" % os.getpid(), self.cfgData, True) + #mkdirhier(os.path.dirname(logout)) + #newso = open(logout, 'w') + #os.dup2(newso.fileno(), sys.stdout.fileno()) + #os.dup2(newso.fileno(), sys.stderr.fileno()) + + bb.event.fire(runQueueTaskStarted(task, self.stats, self.rq), self.cfgData) + bb.msg.note(1, bb.msg.domain.RunQueue, + "Running task %d of %d (ID: %s, %s)" % (self.stats.completed + self.stats.active + self.stats.failed + 1, + self.stats.total, + task, + self.rqdata.get_user_idstring(task))) + + bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", self, self.cooker.configuration.data) + bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY2", fn, self.cooker.configuration.data) + try: + the_data = self.cooker.bb_cache.loadDataFull(fn, self.cooker.get_file_appends(fn), self.cooker.configuration.data) + + if not self.cooker.configuration.dry_run: + bb.build.exec_task(taskname, the_data) + os._exit(0) + + except bb.build.EventException as e: + event = e.args[1] + bb.msg.error(bb.msg.domain.Build, "%s event exception, aborting" % bb.event.getName(event)) + os._exit(1) + except Exception: + from traceback import format_exc + bb.msg.error(bb.msg.domain.Build, "Build of %s %s failed" % (fn, taskname)) + bb.msg.error(bb.msg.domain.Build, format_exc()) + os._exit(1) + os._exit(0) + return pid, pipein, pipeout + +class RunQueueExecuteTasks(RunQueueExecute): + def __init__(self, rq): + RunQueueExecute.__init__(self, rq) + + self.stats = RunQueueStats(len(self.rqdata.runq_fnid)) + # Mark initial buildable tasks for task in range(self.stats.total): self.runq_running.append(0) @@ -909,8 +1035,6 @@ class RunQueue: else: self.runq_buildable.append(0) - self.state = runQueueRunning - event.fire(bb.event.StampUpdate(self.rqdata.target_pairs, self.rqdata.dataCache.stamp), self.cfgData) schedulers = [obj for obj in globals().itervalues() @@ -924,6 +1048,7 @@ class RunQueue: bb.error("Available schedulers: %s" % ", ".join(obj.name for obj in schedulers)) self.sched = RunQueueSchedulerSpeed(self, self.rqdata) + def task_complete(self, task): """ Mark a task as completed @@ -955,18 +1080,18 @@ class RunQueue: self.stats.taskFailed() fnid = self.rqdata.runq_fnid[task] self.failed_fnids.append(fnid) - bb.event.fire(runQueueTaskFailed(task, self.stats, self), self.cfgData) + bb.event.fire(runQueueTaskFailed(task, self.stats, self.rq), self.cfgData) if self.rqdata.taskData.abort: - self.state = runQueueCleanUp + self.rq.state = runQueueCleanUp - def execute_runqueue_internal(self): + def execute(self): """ Run the tasks in a queue prepared by rqdata.prepare() """ if self.stats.total == 0: # nothing to do - self.state = runQueueCleanUp + self.rq.state = runQueueCleanUp while True: task = None @@ -976,7 +1101,7 @@ class RunQueue: fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]] taskname = self.rqdata.runq_task[task] - if self.check_stamp_task(task, taskname): + if self.rq.check_stamp_task(task, taskname): bb.msg.debug(2, bb.msg.domain.RunQueue, "Stamp current task %s (%s)" % (task, self.rqdata.get_user_idstring(task))) self.runq_running[task] = 1 self.runq_buildable[task] = 1 @@ -998,12 +1123,12 @@ class RunQueue: self.build_pipes[pipe].read() if self.stats.active > 0: - if self.runqueue_process_waitpid(self.task_complete, self.task_fail) is None: + if self.runqueue_process_waitpid() is None: return continue if len(self.failed_fnids) != 0: - self.state = runQueueFailed + self.rq.state = runQueueFailed return # Sanity Checks @@ -1014,118 +1139,9 @@ class RunQueue: bb.msg.error(bb.msg.domain.RunQueue, "Task %s never ran!" % task) if self.runq_complete[task] == 0: bb.msg.error(bb.msg.domain.RunQueue, "Task %s never completed!" % task) - self.state = runQueueComplete + self.rq.state = runQueueComplete return - def runqueue_process_waitpid(self, success, failure): - """ - Return none is there are no processes awaiting result collection, otherwise - collect the process exit codes and close the information pipe. - """ - result = os.waitpid(-1, os.WNOHANG) - if result[0] is 0 and result[1] is 0: - return None - task = self.build_pids[result[0]] - del self.build_pids[result[0]] - self.build_pipes[result[0]].close() - del self.build_pipes[result[0]] - if result[1] != 0: - failure(task, result[1]) - else: - success(task) - self.stats.taskCompleted() - bb.event.fire(runQueueTaskCompleted(task, self.stats, self), self.cfgData) - - def finish_runqueue_now(self): - if self.stats.active: - bb.msg.note(1, bb.msg.domain.RunQueue, "Sending SIGINT to remaining %s tasks" % self.stats.active) - for k, v in self.build_pids.iteritems(): - try: - os.kill(-k, signal.SIGINT) - except: - pass - for pipe in self.build_pipes: - self.build_pipes[pipe].read() - - def finish_runqueue(self, now = False): - self.state = runQueueCleanUp - - for pipe in self.build_pipes: - self.build_pipes[pipe].read() - - if now: - self.finish_runqueue_now() - try: - while self.stats.active > 0: - bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData) - if self.runqueue_process_waitpid(self.task_complete, self.task_fail) is None: - return - except: - self.finish_runqueue_now() - raise - - if len(self.failed_fnids) != 0: - self.state = runQueueFailed - return - - self.state = runQueueComplete - return - - def fork_off_task(self, fn, task, taskname): - sys.stdout.flush() - sys.stderr.flush() - try: - pipein, pipeout = os.pipe() - pid = os.fork() - except OSError as e: - bb.msg.fatal(bb.msg.domain.RunQueue, "fork failed: %d (%s)" % (e.errno, e.strerror)) - if pid == 0: - os.close(pipein) - # Save out the PID so that the event can include it the - # events - bb.event.worker_pid = os.getpid() - bb.event.worker_pipe = pipeout - - self.state = runQueueChildProcess - # Make the child the process group leader - os.setpgid(0, 0) - # No stdin - newsi = os.open('/dev/null', os.O_RDWR) - os.dup2(newsi, sys.stdin.fileno()) - # Stdout to a logfile - #logout = data.expand("${TMPDIR}/log/stdout.%s" % os.getpid(), self.cfgData, True) - #mkdirhier(os.path.dirname(logout)) - #newso = open(logout, 'w') - #os.dup2(newso.fileno(), sys.stdout.fileno()) - #os.dup2(newso.fileno(), sys.stderr.fileno()) - - bb.event.fire(runQueueTaskStarted(task, self.stats, self), self.cfgData) - bb.msg.note(1, bb.msg.domain.RunQueue, - "Running task %d of %d (ID: %s, %s)" % (self.stats.completed + self.stats.active + self.stats.failed + 1, - self.stats.total, - task, - self.rqdata.get_user_idstring(task))) - - bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", self, self.cooker.configuration.data) - bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY2", fn, self.cooker.configuration.data) - try: - the_data = self.cooker.bb_cache.loadDataFull(fn, self.cooker.get_file_appends(fn), self.cooker.configuration.data) - - if not self.cooker.configuration.dry_run: - bb.build.exec_task(taskname, the_data) - os._exit(0) - - except bb.build.EventException as e: - event = e.args[1] - bb.msg.error(bb.msg.domain.Build, "%s event exception, aborting" % bb.event.getName(event)) - os._exit(1) - except Exception: - from traceback import format_exc - bb.msg.error(bb.msg.domain.Build, "Build of %s %s failed" % (fn, taskname)) - bb.msg.error(bb.msg.domain.Build, format_exc()) - os._exit(1) - os._exit(0) - return pid, pipein, pipeout class TaskFailure(Exception): -- cgit v1.2.3