[oe-commits] [bitbake] 01/04: bitbake: hashserv: Add request statistics

git at git.openembedded.org git at git.openembedded.org
Tue Sep 3 09:19:43 UTC 2019


This is an automated email from the git hooks/post-receive script.

rpurdie pushed a commit to branch master-next
in repository bitbake.

commit a29610bffd13d00ef95cfa7e595026233b392dae
Author: Joshua Watt <JPEWhacker at gmail.com>
AuthorDate: Wed Aug 14 15:56:06 2019 -0500

    bitbake: hashserv: Add request statistics
    
    Adds support to the bitbake hash server for recording statistics. Two
    sets of statistics are reported: The number of connections and how long
    until the requests started to be serviced on them, and the number of
    requests and how long each request took to be serviced.
    
    Statistics are available via an HTTP end point on the server:
    
     curl http://server:8686/v1/stats | jq
    
    The statistics can be reset by issuing an HTTP DELETE request to that end
    point:
    
     curl -X DELETE http://server:8686/v1/stats
    
    Signed-off-by: Joshua Watt <JPEWhacker at gmail.com>
    Signed-off-by: Richard Purdie <richard.purdie at linuxfoundation.org>
---
 lib/hashserv/__init__.py | 175 ++++++++++++++++++++++++++++++++++++++++-------
 1 file changed, 152 insertions(+), 23 deletions(-)

diff --git a/lib/hashserv/__init__.py b/lib/hashserv/__init__.py
index eb03c32..23528eb 100644
--- a/lib/hashserv/__init__.py
+++ b/lib/hashserv/__init__.py
@@ -16,11 +16,80 @@ import threading
 import signal
 import socket
 import struct
+import time
+import math
 from datetime import datetime
 
 logger = logging.getLogger('hashserv')
 
+class Stats(object):
+    def __init__(self):
+        self.reset()
+
+    def reset(self):
+        self.num = 0
+        self.total_time = 0
+        self.max_time = 0
+        self.m = 0
+        self.s = 0
+
+    def update(self, start_time):
+        elapsed = time.perf_counter() - start_time
+
+        self.num += 1
+        if self.num == 1:
+            self.m = elapsed
+            self.s = 0
+        else:
+            last_m = self.m
+            self.m = last_m + (elapsed - last_m) / self.num
+            self.s = self.s + (elapsed - last_m) * (elapsed - self.m)
+
+        self.total_time += elapsed
+
+        if self.max_time < elapsed:
+            self.max_time = elapsed
+
+    @property
+    def average(self):
+        if self.num == 0:
+            return 0
+        return self.total_time / self.num
+
+    @property
+    def stdev(self):
+        if self.num <= 1:
+            return 0
+        return math.sqrt(self.s / (self.num - 1))
+
+    def todict(self):
+        return {k: getattr(self, k) for k in ('num', 'total_time', 'max_time', 'average', 'stdev')}
+
+request_stats = Stats()
+connect_stats = Stats()
+
 class HashEquivalenceServer(BaseHTTPRequestHandler):
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.equivalent_path = self.prefix + '/v1/equivalent'
+        self.stats_path = self.prefix + '/v1/stats'
+
+    def setup(self):
+        global connect_stats
+
+        if self.request.connect_time is not None:
+            connect_stats.update(self.request.connect_time)
+            self.request.connect_time = None
+
+        self.start_time = time.perf_counter()
+        super().setup()
+
+    def finish(self):
+        global request_stats
+
+        super().finish()
+        request_stats.update(self.start_time)
+
     def log_message(self, f, *args):
         logger.debug(f, *args)
 
@@ -30,6 +99,54 @@ class HashEquivalenceServer(BaseHTTPRequestHandler):
         self.db.execute("PRAGMA synchronous = OFF;")
         self.db.execute("PRAGMA journal_mode = MEMORY;")
 
+    def do_GET_equivalent(self, p):
+        query = urllib.parse.parse_qs(p.query, strict_parsing=True)
+        method = query['method'][0]
+        taskhash = query['taskhash'][0]
+
+        d = None
+        with contextlib.closing(self.db.cursor()) as cursor:
+            cursor.execute('SELECT taskhash, method, unihash FROM tasks_v2 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1',
+                    {'method': method, 'taskhash': taskhash})
+
+            row = cursor.fetchone()
+
+            if row is not None:
+                logger.debug('Found equivalent task %s', row['taskhash'])
+                d = {k: row[k] for k in ('taskhash', 'method', 'unihash')}
+
+        data = json.dumps(d).encode('utf-8')
+        self.send_response(200)
+        self.send_header('Content-Type', 'application/json; charset=utf-8')
+        self.send_header('Content-length', str(len(data)))
+        self.end_headers()
+        self.wfile.write(data)
+
+    def do_GET_stats(self, p):
+        global request_stats
+        global connect_stats
+        d = {
+            'requests': request_stats.todict(),
+            'connections': connect_stats.todict()
+            }
+
+        data = json.dumps(d).encode('utf-8')
+        self.send_response(200)
+        self.send_header('Content-Type', 'application/json; charset=utf-8')
+        self.send_header('Content-length', str(len(data)))
+        self.end_headers()
+        self.wfile.write(data)
+
+    def do_DELETE_stats(self, p):
+        global request_stats
+        global connect_stats
+
+        request_stats.reset()
+        connect_stats.reset()
+
+        self.send_response(200)
+        self.end_headers()
+
     def do_GET(self):
         try:
             if not self.db:
@@ -37,33 +154,16 @@ class HashEquivalenceServer(BaseHTTPRequestHandler):
 
             p = urllib.parse.urlparse(self.path)
 
-            if p.path != self.prefix + '/v1/equivalent':
+            if p.path == self.prefix + '/v1/equivalent':
+                self.do_GET_equivalent(p)
+            elif p.path == self.prefix + '/v1/stats':
+                self.do_GET_stats(p)
+            else:
                 self.send_error(404)
                 return
-
-            query = urllib.parse.parse_qs(p.query, strict_parsing=True)
-            method = query['method'][0]
-            taskhash = query['taskhash'][0]
-
-            d = None
-            with contextlib.closing(self.db.cursor()) as cursor:
-                cursor.execute('SELECT taskhash, method, unihash FROM tasks_v2 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1',
-                        {'method': method, 'taskhash': taskhash})
-
-                row = cursor.fetchone()
-
-                if row is not None:
-                    logger.debug('Found equivalent task %s', row['taskhash'])
-                    d = {k: row[k] for k in ('taskhash', 'method', 'unihash')}
-
-            self.send_response(200)
-            self.send_header('Content-Type', 'application/json; charset=utf-8')
-            self.end_headers()
-            self.wfile.write(json.dumps(d).encode('utf-8'))
         except:
             logger.exception('Error in GET')
             self.send_error(400, explain=traceback.format_exc())
-            return
 
     def do_POST(self):
         try:
@@ -139,7 +239,32 @@ class HashEquivalenceServer(BaseHTTPRequestHandler):
         except:
             logger.exception('Error in POST')
             self.send_error(400, explain=traceback.format_exc())
-            return
+
+    def do_DELETE(self):
+        try:
+            p = urllib.parse.urlparse(self.path)
+
+            if p.path == self.prefix + '/v1/stats':
+                self.do_DELETE_stats(p)
+            else:
+                self.send_error(404)
+                return
+        except:
+            logger.exception('Error in DELETE')
+            self.send_error(400, explain=traceback.format_exc())
+
+class ClientSocket(socket.socket):
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.connect_time = time.perf_counter()
+
+    @classmethod
+    def create(cls, sock):
+        import _socket
+        fd = _socket.dup(sock.fileno())
+        s = cls(sock.family, sock.type, sock.proto, fileno=fd)
+        s.settimeout(sock.gettimeout())
+        return s
 
 class ThreadedHTTPServer(HTTPServer):
     quit = False
@@ -188,6 +313,10 @@ class ThreadedHTTPServer(HTTPServer):
         self.requestqueue.put((None, None))
         self.handlerthread.join()
 
+    def get_request(self):
+        sock, client_address = super().get_request()
+        return ClientSocket.create(sock), client_address
+
 def create_server(addr, dbname, prefix=''):
     class Handler(HashEquivalenceServer):
         pass

-- 
To stop receiving notification emails like this one, please contact
the administrator of this repository.


More information about the Openembedded-commits mailing list