From 32ea7668712a50d8f8b67d5e4558039e5092a485 Mon Sep 17 00:00:00 2001 From: Chris Larson Date: Thu, 18 Nov 2010 20:21:54 -0700 Subject: Implement parallel parsing support This utilizes python's multiprocessing module. The default number of threads to be used is the same as the number of available processor cores, however, you can manually set this with the BB_NUMBER_PARSE_THREADS variable. (Bitbake rev: c7b3ec819549e51e438d293969e205883fee725f) Signed-off-by: Chris Larson Signed-off-by: Richard Purdie --- bitbake/lib/bb/cooker.py | 135 ++++++++++++++++++++++++++++++++++------------- 1 file changed, 97 insertions(+), 38 deletions(-) (limited to 'bitbake/lib/bb/cooker.py') diff --git a/bitbake/lib/bb/cooker.py b/bitbake/lib/bb/cooker.py index 6194919e4..0143c149b 100644 --- a/bitbake/lib/bb/cooker.py +++ b/bitbake/lib/bb/cooker.py @@ -25,6 +25,8 @@ from __future__ import print_function import sys, os, glob, os.path, re, time import logging import sre_constants +import multiprocessing +import signal from cStringIO import StringIO from contextlib import closing import bb @@ -976,7 +978,7 @@ class CookerExit(bb.event.Event): def __init__(self): bb.event.Event.__init__(self) -class CookerParser: +class CookerParser(object): def __init__(self, cooker, filelist, masked): # Internal data self.filelist = filelist @@ -987,49 +989,106 @@ class CookerParser: self.cached = 0 self.error = 0 self.masked = masked - self.total = len(filelist) self.skipped = 0 self.virtuals = 0 + self.total = len(filelist) - # Pointer to the next file to parse - self.pointer = 0 - - def parse_next(self): - cooker = self.cooker - if self.pointer < len(self.filelist): - f = self.filelist[self.pointer] - - try: - fromCache, skipped, virtuals = cooker.bb_cache.loadData(f, cooker.get_file_appends(f), cooker.configuration.data, cooker.status) - if fromCache: - self.cached += 1 - else: - self.parsed += 1 - - self.skipped += skipped - self.virtuals += virtuals + # current to the next file to parse + self.current = 0 + self.result_queue = None + self.fromcache = None - except KeyboardInterrupt: - cooker.bb_cache.remove(f) - cooker.bb_cache.sync() - raise - except Exception as e: - self.error += 1 - cooker.bb_cache.remove(f) - parselog.exception("Unable to open %s", f) - except: - cooker.bb_cache.remove(f) - raise - finally: - bb.event.fire(bb.event.ParseProgress(self.cached, self.parsed, self.skipped, self.masked, self.virtuals, self.error, self.total), cooker.configuration.event_data) + self.launch_processes() - self.pointer += 1 + def launch_processes(self): + self.task_queue = multiprocessing.Queue() + self.result_queue = multiprocessing.Queue() + + self.fromcache = [] + cfgdata = self.cooker.configuration.data + for filename in self.filelist: + appends = self.cooker.get_file_appends(filename) + if not self.cooker.bb_cache.cacheValid(filename): + self.task_queue.put((filename, appends)) + else: + self.fromcache.append((filename, appends)) + + def worker(input, output, cfgdata): + signal.signal(signal.SIGINT, signal.SIG_IGN) + for filename, appends in iter(input.get, 'STOP'): + infos = bb.cache.Cache.parse(filename, appends, cfgdata) + output.put(infos) + + self.processes = [] + num_processes = int(cfgdata.getVar("BB_NUMBER_PARSE_THREADS", True) or + multiprocessing.cpu_count()) + for i in xrange(num_processes): + process = multiprocessing.Process(target=worker, + args=(self.task_queue, + self.result_queue, + cfgdata)) + process.start() + self.processes.append(process) + + def shutdown(self, clean=True): + self.result_queue.close() + for process in self.processes: + if clean: + self.task_queue.put('STOP') + else: + process.terminate() + self.task_queue.close() + for process in self.processes: + process.join() + self.cooker.bb_cache.sync() + bb.codeparser.parser_cache_save(self.cooker.configuration.data) + if self.error > 0: + raise ParsingErrorsFound() + + def progress(self): + bb.event.fire(bb.event.ParseProgress(self.cached, self.parsed, + self.skipped, self.masked, + self.virtuals, self.error, + self.total), + self.cooker.configuration.event_data) - if self.pointer >= self.total: - cooker.bb_cache.sync() - bb.codeparser.parser_cache_save(cooker.configuration.data) - if self.error > 0: - raise ParsingErrorsFound + def parse_next(self): + cooker = self.cooker + if self.current >= self.total: + self.shutdown() return False + + try: + if self.result_queue.empty() and self.fromcache: + filename, appends = self.fromcache.pop() + _, infos = cooker.bb_cache.load(filename, appends, + self.cooker.configuration.data) + parsed = False + else: + infos = self.result_queue.get() + parsed = True + except KeyboardInterrupt: + self.shutdown(clean=False) + raise + except Exception as e: + self.error += 1 + parselog.critical(str(e)) + else: + if parsed: + self.parsed += 1 + else: + self.cached += 1 + self.virtuals += len(infos) + + for virtualfn, info in infos: + cooker.bb_cache.add_info(virtualfn, info, cooker.status, + parsed=parsed) + if info.skipped: + self.skipped += 1 + finally: + self.progress() + + self.current += 1 return True + -- cgit v1.2.3