[oe-commits] [bitbake] 02/04: bitbake: hashserve: Log I/O waits and SQL time

git at git.openembedded.org git at git.openembedded.org
Tue Sep 3 09:19:44 UTC 2019


This is an automated email from the git hooks/post-receive script.

rpurdie pushed a commit to branch master-next
in repository bitbake.

commit 25abf8e44dc311b0cefbf67f72404ed7b9c79b71
Author: Joshua Watt <JPEWhacker at gmail.com>
AuthorDate: Thu Aug 15 11:41:56 2019 -0500

    bitbake: hashserve: Log I/O waits and SQL time
    
    Keeps additional statistics about the amount of time the server is
    spending waiting on I/O and the amount of time waiting for sqlite.
    
    Also ensures that stat updates are thread-safe.
    
    Signed-off-by: Joshua Watt <JPEWhacker at gmail.com>
    Signed-off-by: Richard Purdie <richard.purdie at linuxfoundation.org>
---
 lib/hashserv/__init__.py | 218 ++++++++++++++++++++++++++++++-----------------
 1 file changed, 140 insertions(+), 78 deletions(-)

diff --git a/lib/hashserv/__init__.py b/lib/hashserv/__init__.py
index 23528eb..228586a 100644
--- a/lib/hashserv/__init__.py
+++ b/lib/hashserv/__init__.py
@@ -18,54 +18,84 @@ import socket
 import struct
 import time
 import math
+import threading
+from contextlib import contextmanager
 from datetime import datetime
 
 logger = logging.getLogger('hashserv')
 
+class Sample(object):
+    def __init__(self, stats):
+        self.elapsed = 0
+        self.stats = stats
+
+    def add(self, elapsed):
+        self.elapsed += elapsed
+
+    def end(self):
+        self.stats.add(self.elapsed)
+
+    def __enter__(self):
+        self.start_time = time.perf_counter()
+        return self
+
+    def __exit__(self, *args, **kwargs):
+        self.add(time.perf_counter() - self.start_time)
+
 class Stats(object):
     def __init__(self):
+        self.lock = threading.RLock()
         self.reset()
 
     def reset(self):
-        self.num = 0
-        self.total_time = 0
-        self.max_time = 0
-        self.m = 0
-        self.s = 0
-
-    def update(self, start_time):
-        elapsed = time.perf_counter() - start_time
-
-        self.num += 1
-        if self.num == 1:
-            self.m = elapsed
+        with self.lock:
+            self.num = 0
+            self.total_time = 0
+            self.max_time = 0
+            self.m = 0
             self.s = 0
-        else:
-            last_m = self.m
-            self.m = last_m + (elapsed - last_m) / self.num
-            self.s = self.s + (elapsed - last_m) * (elapsed - self.m)
+            self.current_elapsed = None
+
+    def add(self, elapsed):
+        with self.lock:
+            self.num += 1
+            if self.num == 1:
+                self.m = elapsed
+                self.s = 0
+            else:
+                last_m = self.m
+                self.m = last_m + (elapsed - last_m) / self.num
+                self.s = self.s + (elapsed - last_m) * (elapsed - self.m)
 
-        self.total_time += elapsed
+            self.total_time += elapsed
 
-        if self.max_time < elapsed:
-            self.max_time = elapsed
+            if self.max_time < elapsed:
+                self.max_time = elapsed
+
+    def start_sample(self):
+        return Sample(self)
 
     @property
     def average(self):
-        if self.num == 0:
-            return 0
-        return self.total_time / self.num
+        with self.lock:
+            if self.num == 0:
+                return 0
+            return self.total_time / self.num
 
     @property
     def stdev(self):
-        if self.num <= 1:
-            return 0
-        return math.sqrt(self.s / (self.num - 1))
+        with self.lock:
+            if self.num <= 1:
+                return 0
+            return math.sqrt(self.s / (self.num - 1))
 
     def todict(self):
-        return {k: getattr(self, k) for k in ('num', 'total_time', 'max_time', 'average', 'stdev')}
+        with self.lock:
+            return {k: getattr(self, k) for k in ('num', 'total_time', 'max_time', 'average', 'stdev')}
 
 request_stats = Stats()
+io_stats = Stats()
+sql_stats = Stats()
 connect_stats = Stats()
 
 class HashEquivalenceServer(BaseHTTPRequestHandler):
@@ -76,19 +106,33 @@ class HashEquivalenceServer(BaseHTTPRequestHandler):
 
     def setup(self):
         global connect_stats
+        global request_stats
+        global io_stats
+        global sql_stats
 
         if self.request.connect_time is not None:
-            connect_stats.update(self.request.connect_time)
+            connect_stats.add(time.perf_counter() - self.request.connect_time)
             self.request.connect_time = None
 
+        self.io_sample = io_stats.start_sample()
+        self.sql_sample = sql_stats.start_sample()
+
         self.start_time = time.perf_counter()
+
         super().setup()
 
     def finish(self):
         global request_stats
 
         super().finish()
-        request_stats.update(self.start_time)
+        request_stats.add(time.perf_counter() - self.start_time)
+
+        self.io_sample.end()
+        self.sql_sample.end()
+
+    def handle_one_request(self):
+        self.start_read_time = time.perf_counter()
+        super().handle_one_request()
 
     def log_message(self, f, *args):
         logger.debug(f, *args)
@@ -106,37 +150,30 @@ class HashEquivalenceServer(BaseHTTPRequestHandler):
 
         d = None
         with contextlib.closing(self.db.cursor()) as cursor:
-            cursor.execute('SELECT taskhash, method, unihash FROM tasks_v2 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1',
-                    {'method': method, 'taskhash': taskhash})
+            with self.sql_sample:
+                cursor.execute('SELECT taskhash, method, unihash FROM tasks_v2 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1',
+                        {'method': method, 'taskhash': taskhash})
 
-            row = cursor.fetchone()
+                row = cursor.fetchone()
 
             if row is not None:
                 logger.debug('Found equivalent task %s', row['taskhash'])
                 d = {k: row[k] for k in ('taskhash', 'method', 'unihash')}
 
-        data = json.dumps(d).encode('utf-8')
-        self.send_response(200)
-        self.send_header('Content-Type', 'application/json; charset=utf-8')
-        self.send_header('Content-length', str(len(data)))
-        self.end_headers()
-        self.wfile.write(data)
+        return d
 
     def do_GET_stats(self, p):
         global request_stats
         global connect_stats
-        d = {
+        global io_stats
+
+        return {
             'requests': request_stats.todict(),
-            'connections': connect_stats.todict()
+            'connections': connect_stats.todict(),
+            'io': io_stats.todict(),
+            'sql': sql_stats.todict(),
             }
 
-        data = json.dumps(d).encode('utf-8')
-        self.send_response(200)
-        self.send_header('Content-Type', 'application/json; charset=utf-8')
-        self.send_header('Content-length', str(len(data)))
-        self.end_headers()
-        self.wfile.write(data)
-
     def do_DELETE_stats(self, p):
         global request_stats
         global connect_stats
@@ -148,6 +185,8 @@ class HashEquivalenceServer(BaseHTTPRequestHandler):
         self.end_headers()
 
     def do_GET(self):
+        self.io_sample.add(time.perf_counter() - self.start_read_time)
+
         try:
             if not self.db:
                 self.opendb()
@@ -155,17 +194,31 @@ class HashEquivalenceServer(BaseHTTPRequestHandler):
             p = urllib.parse.urlparse(self.path)
 
             if p.path == self.prefix + '/v1/equivalent':
-                self.do_GET_equivalent(p)
+                d = self.do_GET_equivalent(p)
             elif p.path == self.prefix + '/v1/stats':
-                self.do_GET_stats(p)
+                d = self.do_GET_stats(p)
             else:
-                self.send_error(404)
+                with self.io_sample:
+                    self.send_error(404)
                 return
+
+            data = json.dumps(d).encode('utf-8')
+
+            with self.io_sample:
+                self.send_response(200)
+                self.send_header('Content-Type', 'application/json; charset=utf-8')
+                self.send_header('Content-length', str(len(data)))
+                self.end_headers()
+                self.wfile.write(data)
+                self.wfile.flush()
         except:
             logger.exception('Error in GET')
-            self.send_error(400, explain=traceback.format_exc())
+            with self.io_sample:
+                self.send_error(400, explain=traceback.format_exc())
 
     def do_POST(self):
+        self.io_sample.add(time.perf_counter() - self.start_read_time)
+
         try:
             if not self.db:
                 self.opendb()
@@ -173,29 +226,32 @@ class HashEquivalenceServer(BaseHTTPRequestHandler):
             p = urllib.parse.urlparse(self.path)
 
             if p.path != self.prefix + '/v1/equivalent':
-                self.send_error(404)
+                with self.io_sample:
+                    self.send_error(404)
                 return
 
             length = int(self.headers['content-length'])
-            data = json.loads(self.rfile.read(length).decode('utf-8'))
+            with self.io_sample:
+                data = json.loads(self.rfile.read(length).decode('utf-8'))
 
             with contextlib.closing(self.db.cursor()) as cursor:
-                cursor.execute('''
-                    -- Find tasks with a matching outhash (that is, tasks that
-                    -- are equivalent)
-                    SELECT taskhash, method, unihash FROM tasks_v2 WHERE method=:method AND outhash=:outhash
+                with self.sql_sample:
+                    cursor.execute('''
+                        -- Find tasks with a matching outhash (that is, tasks that
+                        -- are equivalent)
+                        SELECT taskhash, method, unihash FROM tasks_v2 WHERE method=:method AND outhash=:outhash
 
-                    -- If there is an exact match on the taskhash, return it.
-                    -- Otherwise return the oldest matching outhash of any
-                    -- taskhash
-                    ORDER BY CASE WHEN taskhash=:taskhash THEN 1 ELSE 2 END,
-                        created ASC
+                        -- If there is an exact match on the taskhash, return it.
+                        -- Otherwise return the oldest matching outhash of any
+                        -- taskhash
+                        ORDER BY CASE WHEN taskhash=:taskhash THEN 1 ELSE 2 END,
+                            created ASC
 
-                    -- Only return one row
-                    LIMIT 1
-                    ''', {k: data[k] for k in ('method', 'outhash', 'taskhash')})
+                        -- Only return one row
+                        LIMIT 1
+                        ''', {k: data[k] for k in ('method', 'outhash', 'taskhash')})
 
-                row = cursor.fetchone()
+                    row = cursor.fetchone()
 
                 # If no matching outhash was found, or one *was* found but it
                 # wasn't an exact match on the taskhash, a new entry for this
@@ -220,25 +276,29 @@ class HashEquivalenceServer(BaseHTTPRequestHandler):
                         if k in data:
                             insert_data[k] = data[k]
 
-                    cursor.execute('''INSERT INTO tasks_v2 (%s) VALUES (%s)''' % (
-                            ', '.join(sorted(insert_data.keys())),
-                            ', '.join(':' + k for k in sorted(insert_data.keys()))),
-                        insert_data)
+                    with self.sql_sample:
+                        cursor.execute('''INSERT INTO tasks_v2 (%s) VALUES (%s)''' % (
+                                ', '.join(sorted(insert_data.keys())),
+                                ', '.join(':' + k for k in sorted(insert_data.keys()))),
+                            insert_data)
 
-                    logger.info('Adding taskhash %s with unihash %s', data['taskhash'], unihash)
+                        logger.info('Adding taskhash %s with unihash %s', data['taskhash'], unihash)
 
-                    self.db.commit()
+                        self.db.commit()
                     d = {'taskhash': data['taskhash'], 'method': data['method'], 'unihash': unihash}
                 else:
                     d = {k: row[k] for k in ('taskhash', 'method', 'unihash')}
 
-                self.send_response(200)
-                self.send_header('Content-Type', 'application/json; charset=utf-8')
-                self.end_headers()
-                self.wfile.write(json.dumps(d).encode('utf-8'))
+                with self.io_sample:
+                    self.send_response(200)
+                    self.send_header('Content-Type', 'application/json; charset=utf-8')
+                    self.end_headers()
+                    self.wfile.write(json.dumps(d).encode('utf-8'))
+                    self.wfile.flush()
         except:
             logger.exception('Error in POST')
-            self.send_error(400, explain=traceback.format_exc())
+            with self.io_sample:
+                self.send_error(400, explain=traceback.format_exc())
 
     def do_DELETE(self):
         try:
@@ -247,11 +307,13 @@ class HashEquivalenceServer(BaseHTTPRequestHandler):
             if p.path == self.prefix + '/v1/stats':
                 self.do_DELETE_stats(p)
             else:
-                self.send_error(404)
+                with self.io_sample:
+                    self.send_error(404)
                 return
         except:
             logger.exception('Error in DELETE')
-            self.send_error(400, explain=traceback.format_exc())
+            with self.io_sample:
+                self.send_error(400, explain=traceback.format_exc())
 
 class ClientSocket(socket.socket):
     def __init__(self, *args, **kwargs):

-- 
To stop receiving notification emails like this one, please contact
the administrator of this repository.


More information about the Openembedded-commits mailing list