From 22a271aaa99daeb6b29c42c2c1dc670bf204310e Mon Sep 17 00:00:00 2001 From: Richard Purdie Date: Tue, 19 Jan 2010 14:48:19 +0000 Subject: bitbake-dev: Sync with upstream Signed-off-by: Richard Purdie --- bitbake-dev/lib/bb/runqueue.py | 98 +++++++++++++++++++++++++++++++----------- 1 file changed, 73 insertions(+), 25 deletions(-) (limited to 'bitbake-dev/lib/bb/runqueue.py') diff --git a/bitbake-dev/lib/bb/runqueue.py b/bitbake-dev/lib/bb/runqueue.py index 8b6e12d18..c3ad442e4 100644 --- a/bitbake-dev/lib/bb/runqueue.py +++ b/bitbake-dev/lib/bb/runqueue.py @@ -857,6 +857,7 @@ class RunQueue: self.runq_running = [] self.runq_complete = [] self.build_pids = {} + self.build_pipes = {} self.failed_fnids = [] # Mark initial buildable tasks @@ -870,7 +871,7 @@ class RunQueue: self.state = runQueueRunning - event.fire(bb.event.StampUpdate(self.target_pairs, self.dataCache.stamp, self.cfgData)) + event.fire(bb.event.StampUpdate(self.target_pairs, self.dataCache.stamp), self.cfgData) def task_complete(self, task): """ @@ -903,7 +904,7 @@ class RunQueue: self.stats.taskFailed() fnid = self.runq_fnid[task] self.failed_fnids.append(fnid) - bb.event.fire(runQueueTaskFailed(task, self.stats, self, self.cfgData)) + bb.event.fire(runQueueTaskFailed(task, self.stats, self), self.cfgData) if self.taskData.abort: self.state = runQueueCleanup @@ -935,53 +936,67 @@ class RunQueue: sys.stdout.flush() sys.stderr.flush() - try: + try: + pipein, pipeout = os.pipe() pid = os.fork() except OSError, e: bb.msg.fatal(bb.msg.domain.RunQueue, "fork failed: %d (%s)" % (e.errno, e.strerror)) if pid == 0: + os.close(pipein) # Save out the PID so that the event can include it the # events bb.event.worker_pid = os.getpid() + bb.event.worker_pipe = pipeout - bb.event.fire(runQueueTaskStarted(task, self.stats, self, self.cfgData)) - bb.msg.note(1, bb.msg.domain.RunQueue, - "Running task %d of %d (ID: %s, %s)" % (self.stats.completed + self.stats.active + 1, - self.stats.total, - task, - self.get_user_idstring(task))) self.state = runQueueChildProcess # Make the child the process group leader os.setpgid(0, 0) + # No stdin newsi = os.open('/dev/null', os.O_RDWR) os.dup2(newsi, sys.stdin.fileno()) + + bb.event.fire(runQueueTaskStarted(task, self.stats, self), self.cfgData) + bb.msg.note(1, bb.msg.domain.RunQueue, + "Running task %d of %d (ID: %s, %s)" % (self.stats.completed + self.stats.active + 1, + self.stats.total, + task, + self.get_user_idstring(task))) + bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", self, self.cooker.configuration.data) try: self.cooker.tryBuild(fn, taskname[3:]) except bb.build.EventException: bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed") - sys.exit(1) + os._exit(1) except: bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed") - raise - sys.exit(0) + os._exit(1) + os._exit(0) + self.build_pids[pid] = task + self.build_pipes[pid] = runQueuePipe(pipein, pipeout, self.cfgData) self.runq_running[task] = 1 self.stats.taskActive() if self.stats.active < self.number_tasks: continue + + for pipe in self.build_pipes: + self.build_pipes[pipe].read() + if self.stats.active > 0: result = os.waitpid(-1, os.WNOHANG) if result[0] is 0 and result[1] is 0: return task = self.build_pids[result[0]] del self.build_pids[result[0]] + self.build_pipes[result[0]].close() + del self.build_pipes[result[0]] if result[1] != 0: self.task_fail(task, result[1]) return self.task_complete(task) self.stats.taskCompleted() - bb.event.fire(runQueueTaskCompleted(task, self.stats, self, self.cfgData)) + bb.event.fire(runQueueTaskCompleted(task, self.stats, self), self.cfgData) continue if len(self.failed_fnids) != 0: @@ -1006,6 +1021,8 @@ class RunQueue: os.kill(-k, signal.SIGINT) except: pass + for pipe in self.build_pipes: + self.build_pipes[pipe].read() def finish_runqueue(self, now = False): self.state = runQueueCleanUp @@ -1013,7 +1030,7 @@ class RunQueue: self.finish_runqueue_now() try: while self.stats.active > 0: - bb.event.fire(runQueueExitWait(self.stats.active, self.cfgData)) + bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData) bb.msg.note(1, bb.msg.domain.RunQueue, "Waiting for %s active tasks to finish" % self.stats.active) tasknum = 1 for k, v in self.build_pids.iteritems(): @@ -1024,11 +1041,13 @@ class RunQueue: return task = self.build_pids[result[0]] del self.build_pids[result[0]] + self.build_pipes[result[0]].close() + del self.build_pipes[result[0]] if result[1] != 0: self.task_fail(task, result[1]) else: self.stats.taskCompleted() - bb.event.fire(runQueueTaskCompleted(task, self.stats, self, self.cfgData)) + bb.event.fire(runQueueTaskCompleted(task, self.stats, self), self.cfgData) except: self.finish_runqueue_now() raise @@ -1078,43 +1097,43 @@ class runQueueExitWait(bb.event.Event): Event when waiting for task processes to exit """ - def __init__(self, remain, d): + def __init__(self, remain): self.remain = remain self.message = "Waiting for %s active tasks to finish" % remain - bb.event.Event.__init__(self, d) + bb.event.Event.__init__(self) class runQueueEvent(bb.event.Event): """ Base runQueue event class """ - def __init__(self, task, stats, rq, d): + def __init__(self, task, stats, rq): self.taskid = task self.taskstring = rq.get_user_idstring(task) self.stats = stats - bb.event.Event.__init__(self, d) + bb.event.Event.__init__(self) class runQueueTaskStarted(runQueueEvent): """ Event notifing a task was started """ - def __init__(self, task, stats, rq, d): - runQueueEvent.__init__(self, task, stats, rq, d) + def __init__(self, task, stats, rq): + runQueueEvent.__init__(self, task, stats, rq) self.message = "Running task %s (%d of %d) (%s)" % (task, stats.completed + stats.active + 1, self.stats.total, self.taskstring) class runQueueTaskFailed(runQueueEvent): """ Event notifing a task failed """ - def __init__(self, task, stats, rq, d): - runQueueEvent.__init__(self, task, stats, rq, d) + def __init__(self, task, stats, rq): + runQueueEvent.__init__(self, task, stats, rq) self.message = "Task %s failed (%s)" % (task, self.taskstring) class runQueueTaskCompleted(runQueueEvent): """ Event notifing a task completed """ - def __init__(self, task, stats, rq, d): - runQueueEvent.__init__(self, task, stats, rq, d) + def __init__(self, task, stats, rq): + runQueueEvent.__init__(self, task, stats, rq) self.message = "Task %s completed (%s)" % (task, self.taskstring) def check_stamp_fn(fn, taskname, d): @@ -1124,3 +1143,32 @@ def check_stamp_fn(fn, taskname, d): if taskid is not None: return rq.check_stamp_task(taskid) return None + +class runQueuePipe(): + """ + Abstraction for a pipe between a worker thread and the server + """ + def __init__(self, pipein, pipeout, d): + self.fd = pipein + os.close(pipeout) + self.queue = "" + self.d = d + + def read(self): + start = len(self.queue) + self.queue = self.queue + os.read(self.fd, 1024) + end = len(self.queue) + index = self.queue.find("") + while index != -1: + bb.event.fire_from_worker(self.queue[:index+8], self.d) + self.queue = self.queue[index+8:] + index = self.queue.find("") + return (end > start) + + def close(self): + while self.read(): + continue + if len(self.queue) > 0: + print "Warning, worker left partial message" + os.close(self.fd) + -- cgit v1.2.3