[oe-commits] [bitbake] 03/04: bitbake: hashserv: Add more threads

git at git.openembedded.org git at git.openembedded.org
Sun Sep 1 21:39:54 UTC 2019


This is an automated email from the git hooks/post-receive script.

rpurdie pushed a commit to branch master-next
in repository bitbake.

commit c9a9c3d89e2d77e2c57294a9dc637509397c0629
Author: Joshua Watt <JPEWhacker at gmail.com>
AuthorDate: Thu Aug 15 12:54:30 2019 -0500

    bitbake: hashserv: Add more threads
    
    Adds support for running the hash server with more than one thread.
    If the server is invoked automatically by bitbake, it will use a
    single thread because, when there is only a single client, this is
    the fastest mechanism.
    
    If the server is started manually on the command line, it is assumed
    that multiple clients will be connecting, so it defaults to one thread
    per CPU core (this can be overridden with --threads/-j). Because the
    server may be I/O bound rather than CPU bound, one thread per core may
    still not be sufficient for optimal performance.
    
    Signed-off-by: Joshua Watt <JPEWhacker at gmail.com>
    Signed-off-by: Richard Purdie <richard.purdie at linuxfoundation.org>
---
 bin/bitbake-hashserv     |  5 ++++-
 lib/hashserv/__init__.py | 40 +++++++++++++++++++++++++++-------------
 2 files changed, 31 insertions(+), 14 deletions(-)

diff --git a/bin/bitbake-hashserv b/bin/bitbake-hashserv
index 6c911c0..222dff8 100755
--- a/bin/bitbake-hashserv
+++ b/bin/bitbake-hashserv
@@ -10,6 +10,7 @@ import sys
 import logging
 import argparse
 import sqlite3
+import multiprocessing
 
 sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(__file__)),'lib'))
 
@@ -27,6 +28,8 @@ def main():
     parser.add_argument('--prefix', default='', help='HTTP path prefix (default "%(default)s")')
     parser.add_argument('--database', default='./hashserv.db', help='Database file (default "%(default)s")')
     parser.add_argument('--log', default='WARNING', help='Set logging level')
+    parser.add_argument('--threads', '-j', type=int, default=multiprocessing.cpu_count(),
+                        help='Number of server threads. Default is %(default)d')
 
     args = parser.parse_args()
 
@@ -41,7 +44,7 @@ def main():
     console.setLevel(level)
     logger.addHandler(console)
 
-    server = hashserv.create_server((args.address, args.port), args.database, args.prefix)
+    server = hashserv.create_server((args.address, args.port), args.database, args.prefix, num_threads=args.threads)
     server.serve_forever()
     return 0
 
diff --git a/lib/hashserv/__init__.py b/lib/hashserv/__init__.py
index 228586a..3411bf5 100644
--- a/lib/hashserv/__init__.py
+++ b/lib/hashserv/__init__.py
@@ -329,14 +329,22 @@ class ClientSocket(socket.socket):
         return s
 
 class ThreadedHTTPServer(HTTPServer):
-    quit = False
+    # The quit event needs to be a distinct object. Making a plain object() and
+    # checking it with "is" works well
+    quit = object()
+
+    def __init__(self, *args, num_threads=1, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.num_threads = num_threads
 
     def serve_forever(self):
+        logger.debug('Using %d threads' % self.num_threads)
         self.requestqueue = queue.Queue()
-        self.handlerthread = threading.Thread(target=self.process_request_thread)
-        self.handlerthread.daemon = False
+        self.handlerthreads = [threading.Thread(target=self.process_request_thread) for i in range(self.num_threads)]
 
-        self.handlerthread.start()
+        for t in self.handlerthreads:
+            t.daemon = False
+            t.start()
 
         signal.signal(signal.SIGTERM, self.sigterm_exception)
         super().serve_forever()
@@ -351,13 +359,18 @@ class ThreadedHTTPServer(HTTPServer):
         self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack('ii', 1, 0))
 
     def process_request_thread(self):
-        while not self.quit:
+        while True:
             try:
-                (request, client_address) = self.requestqueue.get(True)
+                d = self.requestqueue.get(True)
+                if d is self.quit:
+                    # Put the item back in the queue for another thread to get
+                    self.requestqueue.put(d)
+                    break
+
+                (request, client_address) = d
             except queue.Empty:
                 continue
-            if request is None:
-                continue
+
             try:
                 self.finish_request(request, client_address)
             except Exception:
@@ -371,15 +384,16 @@ class ThreadedHTTPServer(HTTPServer):
 
     def server_close(self):
         super().server_close()
-        self.quit = True
-        self.requestqueue.put((None, None))
-        self.handlerthread.join()
+
+        self.requestqueue.put(self.quit)
+        for t in self.handlerthreads:
+            t.join()
 
     def get_request(self):
         sock, client_address = super().get_request()
         return ClientSocket.create(sock), client_address
 
-def create_server(addr, dbname, prefix=''):
+def create_server(addr, dbname, prefix='', num_threads=1):
     class Handler(HashEquivalenceServer):
         pass
 
@@ -414,7 +428,7 @@ def create_server(addr, dbname, prefix=''):
         cursor.execute('CREATE INDEX IF NOT EXISTS taskhash_lookup ON tasks_v2 (method, taskhash)')
         cursor.execute('CREATE INDEX IF NOT EXISTS outhash_lookup ON tasks_v2 (method, outhash)')
 
-    ret = ThreadedHTTPServer(addr, Handler)
+    ret = ThreadedHTTPServer(addr, Handler, num_threads=num_threads)
 
     logger.info('Starting server on %s\n', ret.server_port)
 

-- 
To stop receiving notification emails like this one, please contact
the administrator of this repository.


More information about the Openembedded-commits mailing list