[oe-commits] [bitbake] 02/04: bitbake: hashserve: Log I/O waits and SQL time
git at git.openembedded.org
git at git.openembedded.org
Thu Aug 15 21:55:25 UTC 2019
This is an automated email from the git hooks/post-receive script.
rpurdie pushed a commit to branch master-next
in repository bitbake.
commit 6de3d5a62fa49d3e26461958f306b56ef9b465e2
Author: Joshua Watt <JPEWhacker at gmail.com>
AuthorDate: Thu Aug 15 11:41:56 2019 -0500
bitbake: hashserve: Log I/O waits and SQL time
Keeps additional statistics about the amount of time the server is
spending waiting on I/O and the amount of time waiting for SQLite.
Also ensures that stat updates are thread-safe.
Signed-off-by: Joshua Watt <JPEWhacker at gmail.com>
Signed-off-by: Richard Purdie <richard.purdie at linuxfoundation.org>
---
lib/hashserv/__init__.py | 218 ++++++++++++++++++++++++++++++-----------------
1 file changed, 140 insertions(+), 78 deletions(-)
diff --git a/lib/hashserv/__init__.py b/lib/hashserv/__init__.py
index 038d11c..6be01f5 100644
--- a/lib/hashserv/__init__.py
+++ b/lib/hashserv/__init__.py
@@ -17,54 +17,84 @@ import signal
import time
import math
import socket
+import threading
+from contextlib import contextmanager
from datetime import datetime
logger = logging.getLogger('hashserv')
+class Sample(object):
+ def __init__(self, stats):
+ self.elapsed = 0
+ self.stats = stats
+
+ def add(self, elapsed):
+ self.elapsed += elapsed
+
+ def end(self):
+ self.stats.add(self.elapsed)
+
+ def __enter__(self):
+ self.start_time = time.perf_counter()
+ return self
+
+ def __exit__(self, *args, **kwargs):
+ self.add(time.perf_counter() - self.start_time)
+
class Stats(object):
def __init__(self):
+ self.lock = threading.RLock()
self.reset()
def reset(self):
- self.num = 0
- self.total_time = 0
- self.max_time = 0
- self.m = 0
- self.s = 0
-
- def update(self, start_time):
- elapsed = time.perf_counter() - start_time
-
- self.num += 1
- if self.num == 1:
- self.m = elapsed
+ with self.lock:
+ self.num = 0
+ self.total_time = 0
+ self.max_time = 0
+ self.m = 0
self.s = 0
- else:
- last_m = self.m
- self.m = last_m + (elapsed - last_m) / self.num
- self.s = self.s + (elapsed - last_m) * (elapsed - self.m)
+ self.current_elapsed = None
+
+ def add(self, elapsed):
+ with self.lock:
+ self.num += 1
+ if self.num == 1:
+ self.m = elapsed
+ self.s = 0
+ else:
+ last_m = self.m
+ self.m = last_m + (elapsed - last_m) / self.num
+ self.s = self.s + (elapsed - last_m) * (elapsed - self.m)
- self.total_time += elapsed
+ self.total_time += elapsed
- if self.max_time < elapsed:
- self.max_time = elapsed
+ if self.max_time < elapsed:
+ self.max_time = elapsed
+
+ def start_sample(self):
+ return Sample(self)
@property
def average(self):
- if self.num == 0:
- return 0
- return self.total_time / self.num
+ with self.lock:
+ if self.num == 0:
+ return 0
+ return self.total_time / self.num
@property
def stdev(self):
- if self.num <= 1:
- return 0
- return math.sqrt(self.s / (self.num - 1))
+ with self.lock:
+ if self.num <= 1:
+ return 0
+ return math.sqrt(self.s / (self.num - 1))
def todict(self):
- return {k: getattr(self, k) for k in ('num', 'total_time', 'max_time', 'average', 'stdev')}
+ with self.lock:
+ return {k: getattr(self, k) for k in ('num', 'total_time', 'max_time', 'average', 'stdev')}
request_stats = Stats()
+io_stats = Stats()
+sql_stats = Stats()
connect_stats = Stats()
class HashEquivalenceServer(BaseHTTPRequestHandler):
@@ -75,19 +105,33 @@ class HashEquivalenceServer(BaseHTTPRequestHandler):
def setup(self):
global connect_stats
+ global request_stats
+ global io_stats
+ global sql_stats
if self.request.connect_time is not None:
- connect_stats.update(self.request.connect_time)
+ connect_stats.add(time.perf_counter() - self.request.connect_time)
self.request.connect_time = None
+ self.io_sample = io_stats.start_sample()
+ self.sql_sample = sql_stats.start_sample()
+
self.start_time = time.perf_counter()
+
super().setup()
def finish(self):
global request_stats
super().finish()
- request_stats.update(self.start_time)
+ request_stats.add(time.perf_counter() - self.start_time)
+
+ self.io_sample.end()
+ self.sql_sample.end()
+
+ def handle_one_request(self):
+ self.start_read_time = time.perf_counter()
+ super().handle_one_request()
def log_message(self, f, *args):
logger.debug(f, *args)
@@ -105,37 +149,30 @@ class HashEquivalenceServer(BaseHTTPRequestHandler):
d = None
with contextlib.closing(self.db.cursor()) as cursor:
- cursor.execute('SELECT taskhash, method, unihash FROM tasks_v2 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1',
- {'method': method, 'taskhash': taskhash})
+ with self.sql_sample:
+ cursor.execute('SELECT taskhash, method, unihash FROM tasks_v2 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1',
+ {'method': method, 'taskhash': taskhash})
- row = cursor.fetchone()
+ row = cursor.fetchone()
if row is not None:
logger.debug('Found equivalent task %s', row['taskhash'])
d = {k: row[k] for k in ('taskhash', 'method', 'unihash')}
- data = json.dumps(d).encode('utf-8')
- self.send_response(200)
- self.send_header('Content-Type', 'application/json; charset=utf-8')
- self.send_header('Content-length', str(len(data)))
- self.end_headers()
- self.wfile.write(data)
+ return d
def do_GET_stats(self, p):
global request_stats
global connect_stats
- d = {
+ global io_stats
+
+ return {
'requests': request_stats.todict(),
- 'connections': connect_stats.todict()
+ 'connections': connect_stats.todict(),
+ 'io': io_stats.todict(),
+ 'sql': sql_stats.todict(),
}
- data = json.dumps(d).encode('utf-8')
- self.send_response(200)
- self.send_header('Content-Type', 'application/json; charset=utf-8')
- self.send_header('Content-length', str(len(data)))
- self.end_headers()
- self.wfile.write(data)
-
def do_DELETE_stats(self, p):
global request_stats
global connect_stats
@@ -147,6 +184,8 @@ class HashEquivalenceServer(BaseHTTPRequestHandler):
self.end_headers()
def do_GET(self):
+ self.io_sample.add(time.perf_counter() - self.start_read_time)
+
try:
if not self.db:
self.opendb()
@@ -154,17 +193,31 @@ class HashEquivalenceServer(BaseHTTPRequestHandler):
p = urllib.parse.urlparse(self.path)
if p.path == self.prefix + '/v1/equivalent':
- self.do_GET_equivalent(p)
+ d = self.do_GET_equivalent(p)
elif p.path == self.prefix + '/v1/stats':
- self.do_GET_stats(p)
+ d = self.do_GET_stats(p)
else:
- self.send_error(404)
+ with self.io_sample:
+ self.send_error(404)
return
+
+ data = json.dumps(d).encode('utf-8')
+
+ with self.io_sample:
+ self.send_response(200)
+ self.send_header('Content-Type', 'application/json; charset=utf-8')
+ self.send_header('Content-length', str(len(data)))
+ self.end_headers()
+ self.wfile.write(data)
+ self.wfile.flush()
except:
logger.exception('Error in GET')
- self.send_error(400, explain=traceback.format_exc())
+ with self.io_sample:
+ self.send_error(400, explain=traceback.format_exc())
def do_POST(self):
+ self.io_sample.add(time.perf_counter() - self.start_read_time)
+
try:
if not self.db:
self.opendb()
@@ -172,29 +225,32 @@ class HashEquivalenceServer(BaseHTTPRequestHandler):
p = urllib.parse.urlparse(self.path)
if p.path != self.prefix + '/v1/equivalent':
- self.send_error(404)
+ with self.io_sample:
+ self.send_error(404)
return
length = int(self.headers['content-length'])
- data = json.loads(self.rfile.read(length).decode('utf-8'))
+ with self.io_sample:
+ data = json.loads(self.rfile.read(length).decode('utf-8'))
with contextlib.closing(self.db.cursor()) as cursor:
- cursor.execute('''
- -- Find tasks with a matching outhash (that is, tasks that
- -- are equivalent)
- SELECT taskhash, method, unihash FROM tasks_v2 WHERE method=:method AND outhash=:outhash
+ with self.sql_sample:
+ cursor.execute('''
+ -- Find tasks with a matching outhash (that is, tasks that
+ -- are equivalent)
+ SELECT taskhash, method, unihash FROM tasks_v2 WHERE method=:method AND outhash=:outhash
- -- If there is an exact match on the taskhash, return it.
- -- Otherwise return the oldest matching outhash of any
- -- taskhash
- ORDER BY CASE WHEN taskhash=:taskhash THEN 1 ELSE 2 END,
- created ASC
+ -- If there is an exact match on the taskhash, return it.
+ -- Otherwise return the oldest matching outhash of any
+ -- taskhash
+ ORDER BY CASE WHEN taskhash=:taskhash THEN 1 ELSE 2 END,
+ created ASC
- -- Only return one row
- LIMIT 1
- ''', {k: data[k] for k in ('method', 'outhash', 'taskhash')})
+ -- Only return one row
+ LIMIT 1
+ ''', {k: data[k] for k in ('method', 'outhash', 'taskhash')})
- row = cursor.fetchone()
+ row = cursor.fetchone()
# If no matching outhash was found, or one *was* found but it
# wasn't an exact match on the taskhash, a new entry for this
@@ -219,25 +275,29 @@ class HashEquivalenceServer(BaseHTTPRequestHandler):
if k in data:
insert_data[k] = data[k]
- cursor.execute('''INSERT INTO tasks_v2 (%s) VALUES (%s)''' % (
- ', '.join(sorted(insert_data.keys())),
- ', '.join(':' + k for k in sorted(insert_data.keys()))),
- insert_data)
+ with self.sql_sample:
+ cursor.execute('''INSERT INTO tasks_v2 (%s) VALUES (%s)''' % (
+ ', '.join(sorted(insert_data.keys())),
+ ', '.join(':' + k for k in sorted(insert_data.keys()))),
+ insert_data)
- logger.info('Adding taskhash %s with unihash %s', data['taskhash'], unihash)
+ logger.info('Adding taskhash %s with unihash %s', data['taskhash'], unihash)
- self.db.commit()
+ self.db.commit()
d = {'taskhash': data['taskhash'], 'method': data['method'], 'unihash': unihash}
else:
d = {k: row[k] for k in ('taskhash', 'method', 'unihash')}
- self.send_response(200)
- self.send_header('Content-Type', 'application/json; charset=utf-8')
- self.end_headers()
- self.wfile.write(json.dumps(d).encode('utf-8'))
+ with self.io_sample:
+ self.send_response(200)
+ self.send_header('Content-Type', 'application/json; charset=utf-8')
+ self.end_headers()
+ self.wfile.write(json.dumps(d).encode('utf-8'))
+ self.wfile.flush()
except:
logger.exception('Error in POST')
- self.send_error(400, explain=traceback.format_exc())
+ with self.io_sample:
+ self.send_error(400, explain=traceback.format_exc())
def do_DELETE(self):
try:
@@ -246,11 +306,13 @@ class HashEquivalenceServer(BaseHTTPRequestHandler):
if p.path == self.prefix + '/v1/stats':
self.do_DELETE_stats(p)
else:
- self.send_error(404)
+ with self.io_sample:
+ self.send_error(404)
return
except:
logger.exception('Error in DELETE')
- self.send_error(400, explain=traceback.format_exc())
+ with self.io_sample:
+ self.send_error(400, explain=traceback.format_exc())
class ClientSocket(socket.socket):
def __init__(self, *args, **kwargs):
--
To stop receiving notification emails like this one, please contact
the administrator of this repository.
More information about the Openembedded-commits
mailing list