Source code for dice.client

from __future__ import print_function
import argparse
import collections
import io
import json
import logging
import os
# pylint: disable=import-error
import queue
import random
import re
import requests
import sys
import traceback
import threading
import time

from ..core import provider
from ..utils import rnd

from . import window

logger = logging.getLogger('dice')


class _TestThread(threading.Thread):
    """
    Thread class for running the main tests.
    """

    def __init__(self, exc_queue, app, **kwargs):
        threading.Thread.__init__(self, **kwargs)
        self.exc_queue = exc_queue
        self.app = app

    def run(self):
        try:
            self.app.run_tests()
        # pylint: disable=broad-except
        except Exception:
            self.exc_queue.put(sys.exc_info())


class _TestStat(object):
    """
    Class to store the tests and statistics information.
    """

    def __init__(self, key, queue_max=100, method='exact'):
        self.key = key
        self.counter = 0
        self.queue_max = queue_max
        self.method = method
        self.queue = collections.deque([], queue_max)

    def match(self, text):
        if self.method == 'exact':
            return text == self.key
        elif self.method == 'regex':
            return re.match(self.key + '$', text)

    def append(self, result):
        self.counter += 1
        self.queue.append(result)

    def extend(self, stat):
        for result in stat.queue:
            self.append(result)


[docs]class DiceApp(object): """ Curses-based DICE client application. """ def __init__(self): self.parser = argparse.ArgumentParser() self.parser.add_argument( 'providers', nargs='?', action='store', help="list of test providers separated by ','. Default to current " "working directory", default=os.getcwd(), ) self.parser.add_argument( '--server', action='store', help='server address', dest='server', default=None, ) self.parser.add_argument( '--port', action='store', help='server port', dest='port', default='8067', ) self.parser.add_argument( '--username', action='store', help='server authentication user name', dest='username', ) self.parser.add_argument( '--password', action='store', help='server authentication password', dest='password', ) self.parser.add_argument( '--no-ui', action='store_false', help="don't show terminal interactive user interface.", dest='ui', default=True, ) self.args, _ = self.parser.parse_known_args() try: self.providers = self._process_providers() except provider.ProviderError as detail: exit(detail) self.stats = { "skip": {}, "failure": {}, "success": {}, "timeout": {}, "expected_neg": {}, "unexpected_neg": {}, "unexpected_pass": {}, } self.QUEUE_MAX = 100 self.exiting = False self.pause = False self.setting_watch = False self.show_log = False self.watching = '' self.scroll_x = 0 self.scroll_y = 0 self.test_excs = queue.Queue() self.test_thread = _TestThread(self.test_excs, self) self.send_queue = [] self.last_send_thread = None self.last_item = None self.cur_counter = 'failure' if self.args.ui: self.window = window.Window(self) self.window.stat_panel.set_select_callback(self._update_items) self.window.stat_panel.add_keypress_listener( 'merge_stat', 'm', self._merge_stat) self.window.items_panel.set_select_callback(self._update_content) self.stream = io.StringIO() self.cur_class = (None, None) self.cur_item = (None, None) def _update_items(self, cat_name, item_idx): self.cur_class = (cat_name, item_idx) def _update_content(self, cat_name, item_idx): self.cur_item = (cat_name, item_idx) def _merge_stat(self, panel): self.pause = True cat_name, _ = panel.cur_key text = self.window.get_input() match_keys = [] for key in self.stats[cat_name]: res = re.match(text, key) if res is not None: match_keys.append(key) stat = self.stats[cat_name][text] = _TestStat(text, method='regex') for key in match_keys: stat.extend(self.stats[cat_name][key]) del self.stats[cat_name][key] self.pause = False def _stat_result(self, item): """ Categorizes and keep the count of a result of a test item depends on the expected failure patterns. """ res = item.res fail_patts = item.fail_patts key = res.stderr catalog = None if res: if res.exit_status == 'timeout': catalog = 'timeout' if self.watching and self.watching in res.stderr: self.pause = True if fail_patts: if res.exit_status == 'success': catalog = 'unexpected_pass' elif res.exit_status == 'failure': found = False for patt in fail_patts: if re.search(patt, res.stderr): catalog = 'expected_neg' key = patt found = True break if not found: catalog = 'unexpected_neg' else: if res.exit_status == 'success': catalog = 'success' elif res.exit_status == 'failure': catalog = 'failure' else: catalog = 'skip' found = False for stat in self.stats[catalog].values(): if stat.match(key): found = True key = stat.key break if not found: self.stats[catalog][key] = _TestStat(key) stat = self.stats[catalog][key] stat.append(res) def _process_providers(self): """ Print a list of available providers if --list-providers is set or return a dict of specified providers. """ providers = {} if self.args.providers: for path in self.args.providers.split(','): prvdr = provider.Provider(path) providers[prvdr.name] = prvdr else: sys.exit('Error: --providers option not specified') return providers def _send(self, item_queue): """ Serialize a list of test results and send them to remote server. """ content = [] for item in item_queue: content.append(item.serialize()) data = json.dumps(content) headers = {} headers['content-type'] = 'application/json' url = 'http://%s:%s/api/tests/' % (self.args.server, self.args.port) try: response = requests.post( url, data=data, headers=headers, auth=(self.args.username, self.args.password), ) if response.status_code != 201: logger.debug('Failed to send result (HTTP%s):', response.status_code) if 'DOCTYPE' in response.text: html_path = 'debug_%s.html' % rnd.regex('[a-z]{4}') with open(html_path, 'w') as fp: fp.write(response.text) logger.debug('Html response saved to %s', os.path.abspath(html_path)) else: logger.debug(response.text) except requests.ConnectionError as detail: logger.debug('Failed to send result to server: %s', detail)
[docs] def run_tests(self): """ Iteratively run tests. """ while not self.exiting: item = random.choice(self.providers.values()).generate() item.run() self.last_item = item if self.args.server is not None: self.send_queue.append(item) if len(self.send_queue) > 200: if self.last_send_thread: self.last_send_thread.join() send_thread = threading.Thread( target=self._send, args=(self.send_queue,) ) send_thread.start() self.last_send_thread = send_thread self.send_queue = [] self._stat_result(item) if self.pause: while self.pause and not self.exiting: time.sleep(0.5)
[docs] def update_window(self): """ Update the content of curses window and refresh it. """ # Set statistics panel content panel = self.window.stat_panel panel.clear() for cat_name in self.stats: for key, stat in self.stats[cat_name].items(): bundle = {'key': key, 'count': stat.counter} panel.add_item(bundle, catalog=cat_name) # Set items panel content panel = self.window.items_panel panel.clear() cat_name, item_idx = self.cur_class if cat_name is not None and item_idx is not None: item_name, stat = self.stats[cat_name].items()[item_idx] try: for item in self.stats[cat_name][item_name].queue: bundle = {'item': item.cmdline} panel.add_item(bundle) except RuntimeError: pass # Set detail panel content panel = self.window.detail_panel panel.clear() cat_name, item_idx = self.cur_class if cat_name is not None and item_idx is not None: item_name, stat = self.stats[cat_name].items()[item_idx] items = self.stats[cat_name][item_name].queue item_name, item_idx = self.cur_item if item_name is not None and item_idx is not None: bundle = items[self.cur_item[1]] panel.set_content(bundle) self.window.update()
[docs] def run(self): """ Main loop to run tests, update screen and send tests results. """ shandler = logging.StreamHandler(self.stream) logger.setLevel(logging.WARNING) for handler in logger.handlers: logger.removeHandler(handler) logger.addHandler(shandler) os.environ["EDITOR"] = "echo" self.last_item = None if self.args.ui: try: self.test_thread.start() while True: if self.args.ui: self.update_window() if self.exiting: break if not self.test_thread.isAlive(): break except KeyboardInterrupt: pass finally: if self.args.ui: self.window.destroy() self.exiting = True self.test_thread.join() try: exc = self.test_excs.get(block=False) for line in traceback.format_exception(*exc): print(line, end='') except queue.Empty: pass else: self.run_tests()