[oe-commits] [bitbake] 01/04: bitbake: hashserv: Add request statistics
git at git.openembedded.org
git at git.openembedded.org
Sat Sep 7 12:10:05 UTC 2019
This is an automated email from the git hooks/post-receive script.
rpurdie pushed a commit to branch master-next
in repository bitbake.
commit 37e957d3de57d5f477f24f7c31d2b0a510669f22
Author: Joshua Watt <JPEWhacker at gmail.com>
AuthorDate: Wed Aug 14 15:56:06 2019 -0500
bitbake: hashserv: Add request statistics
Adds support to the bitbake hash server for recording statistics. Two
sets of statistics are reported: The number of connections and how long
until the requests started to be serviced on them, and the number of
requests and how long each request took to be serviced.
Statistics are available via an HTTP end point on the server:
curl http://server:8686/v1/stats | jq
The statistics can be reset by issuing an HTTP DELETE request to that end
point:
curl -X DELETE http://server:8686/v1/stats
Signed-off-by: Joshua Watt <JPEWhacker at gmail.com>
Signed-off-by: Richard Purdie <richard.purdie at linuxfoundation.org>
---
lib/hashserv/__init__.py | 175 ++++++++++++++++++++++++++++++++++++++++-------
1 file changed, 152 insertions(+), 23 deletions(-)
diff --git a/lib/hashserv/__init__.py b/lib/hashserv/__init__.py
index eb03c32..23528eb 100644
--- a/lib/hashserv/__init__.py
+++ b/lib/hashserv/__init__.py
@@ -16,11 +16,80 @@ import threading
import signal
import socket
import struct
+import time
+import math
from datetime import datetime
logger = logging.getLogger('hashserv')
+class Stats(object):
+ def __init__(self):
+ self.reset()
+
+ def reset(self):
+ self.num = 0
+ self.total_time = 0
+ self.max_time = 0
+ self.m = 0
+ self.s = 0
+
+ def update(self, start_time):
+ elapsed = time.perf_counter() - start_time
+
+ self.num += 1
+ if self.num == 1:
+ self.m = elapsed
+ self.s = 0
+ else:
+ last_m = self.m
+ self.m = last_m + (elapsed - last_m) / self.num
+ self.s = self.s + (elapsed - last_m) * (elapsed - self.m)
+
+ self.total_time += elapsed
+
+ if self.max_time < elapsed:
+ self.max_time = elapsed
+
+ @property
+ def average(self):
+ if self.num == 0:
+ return 0
+ return self.total_time / self.num
+
+ @property
+ def stdev(self):
+ if self.num <= 1:
+ return 0
+ return math.sqrt(self.s / (self.num - 1))
+
+ def todict(self):
+ return {k: getattr(self, k) for k in ('num', 'total_time', 'max_time', 'average', 'stdev')}
+
+request_stats = Stats()
+connect_stats = Stats()
+
class HashEquivalenceServer(BaseHTTPRequestHandler):
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.equivalent_path = self.prefix + '/v1/equivalent'
+ self.stats_path = self.prefix + '/v1/stats'
+
+ def setup(self):
+ global connect_stats
+
+ if self.request.connect_time is not None:
+ connect_stats.update(self.request.connect_time)
+ self.request.connect_time = None
+
+ self.start_time = time.perf_counter()
+ super().setup()
+
+ def finish(self):
+ global request_stats
+
+ super().finish()
+ request_stats.update(self.start_time)
+
def log_message(self, f, *args):
logger.debug(f, *args)
@@ -30,6 +99,54 @@ class HashEquivalenceServer(BaseHTTPRequestHandler):
self.db.execute("PRAGMA synchronous = OFF;")
self.db.execute("PRAGMA journal_mode = MEMORY;")
+ def do_GET_equivalent(self, p):
+ query = urllib.parse.parse_qs(p.query, strict_parsing=True)
+ method = query['method'][0]
+ taskhash = query['taskhash'][0]
+
+ d = None
+ with contextlib.closing(self.db.cursor()) as cursor:
+ cursor.execute('SELECT taskhash, method, unihash FROM tasks_v2 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1',
+ {'method': method, 'taskhash': taskhash})
+
+ row = cursor.fetchone()
+
+ if row is not None:
+ logger.debug('Found equivalent task %s', row['taskhash'])
+ d = {k: row[k] for k in ('taskhash', 'method', 'unihash')}
+
+ data = json.dumps(d).encode('utf-8')
+ self.send_response(200)
+ self.send_header('Content-Type', 'application/json; charset=utf-8')
+ self.send_header('Content-length', str(len(data)))
+ self.end_headers()
+ self.wfile.write(data)
+
+ def do_GET_stats(self, p):
+ global request_stats
+ global connect_stats
+ d = {
+ 'requests': request_stats.todict(),
+ 'connections': connect_stats.todict()
+ }
+
+ data = json.dumps(d).encode('utf-8')
+ self.send_response(200)
+ self.send_header('Content-Type', 'application/json; charset=utf-8')
+ self.send_header('Content-length', str(len(data)))
+ self.end_headers()
+ self.wfile.write(data)
+
+ def do_DELETE_stats(self, p):
+ global request_stats
+ global connect_stats
+
+ request_stats.reset()
+ connect_stats.reset()
+
+ self.send_response(200)
+ self.end_headers()
+
def do_GET(self):
try:
if not self.db:
@@ -37,33 +154,16 @@ class HashEquivalenceServer(BaseHTTPRequestHandler):
p = urllib.parse.urlparse(self.path)
- if p.path != self.prefix + '/v1/equivalent':
+ if p.path == self.prefix + '/v1/equivalent':
+ self.do_GET_equivalent(p)
+ elif p.path == self.prefix + '/v1/stats':
+ self.do_GET_stats(p)
+ else:
self.send_error(404)
return
-
- query = urllib.parse.parse_qs(p.query, strict_parsing=True)
- method = query['method'][0]
- taskhash = query['taskhash'][0]
-
- d = None
- with contextlib.closing(self.db.cursor()) as cursor:
- cursor.execute('SELECT taskhash, method, unihash FROM tasks_v2 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1',
- {'method': method, 'taskhash': taskhash})
-
- row = cursor.fetchone()
-
- if row is not None:
- logger.debug('Found equivalent task %s', row['taskhash'])
- d = {k: row[k] for k in ('taskhash', 'method', 'unihash')}
-
- self.send_response(200)
- self.send_header('Content-Type', 'application/json; charset=utf-8')
- self.end_headers()
- self.wfile.write(json.dumps(d).encode('utf-8'))
except:
logger.exception('Error in GET')
self.send_error(400, explain=traceback.format_exc())
- return
def do_POST(self):
try:
@@ -139,7 +239,32 @@ class HashEquivalenceServer(BaseHTTPRequestHandler):
except:
logger.exception('Error in POST')
self.send_error(400, explain=traceback.format_exc())
- return
+
+ def do_DELETE(self):
+ try:
+ p = urllib.parse.urlparse(self.path)
+
+ if p.path == self.prefix + '/v1/stats':
+ self.do_DELETE_stats(p)
+ else:
+ self.send_error(404)
+ return
+ except:
+ logger.exception('Error in DELETE')
+ self.send_error(400, explain=traceback.format_exc())
+
+class ClientSocket(socket.socket):
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.connect_time = time.perf_counter()
+
+ @classmethod
+ def create(cls, sock):
+ import _socket
+ fd = _socket.dup(sock.fileno())
+ s = cls(sock.family, sock.type, sock.proto, fileno=fd)
+ s.settimeout(sock.gettimeout())
+ return s
class ThreadedHTTPServer(HTTPServer):
quit = False
@@ -188,6 +313,10 @@ class ThreadedHTTPServer(HTTPServer):
self.requestqueue.put((None, None))
self.handlerthread.join()
+ def get_request(self):
+ sock, client_address = super().get_request()
+ return ClientSocket.create(sock), client_address
+
def create_server(addr, dbname, prefix=''):
class Handler(HashEquivalenceServer):
pass
--
To stop receiving notification emails like this one, please contact
the administrator of this repository.
More information about the Openembedded-commits
mailing list