Source code for pacifica.cli.query

#!/usr/bin/python
# -*- coding: utf-8 -*-
"""These are the query methods used for interactive query."""
from __future__ import print_function
from sys import stdin, stdout, stderr
from subprocess import Popen, PIPE
from os import getenv, pathsep, path
from functools import cmp_to_key
import re


def set_query_obj(dep_meta_ids, md_update, obj):
    """Return ``obj`` when it is ready to be queried, otherwise ``None``.

    An object with no dependencies (``dep_meta_ids`` is empty or falsy) is
    always ready.  An object with dependencies is ready only when every
    dependency looked up in ``md_update`` has a truthy ``value``.
    """
    if not dep_meta_ids:
        return obj
    # Collapse the truthiness of every dependency value into a set; only
    # the exact set {True} means all of them are resolved.
    dep_states = {bool(md_update[dep_id].value) for dep_id in dep_meta_ids}
    if dep_states == {True}:
        return obj
    return None
def find_leaf_node(md_update):
    """Find a node without a value whose dependencies are all resolved.

    Walks ``md_update`` in order, skipping entries that already have a
    truthy ``value``.  The first unset entry accepted by ``set_query_obj``
    is chosen; its parents are refreshed through
    ``md_update.update_parents`` and the entry is re-read from
    ``md_update`` before being returned.
    """
    leaf = md_update[0]
    for candidate in md_update:
        if bool(candidate.value):
            # Already has a value, nothing to query for this entry.
            continue
        dep_meta_ids = md_update.dependent_meta_id(candidate.metaID)
        leaf = set_query_obj(dep_meta_ids, md_update, candidate)
        if leaf:
            break
    md_update.update_parents(leaf.metaID)
    return md_update[leaf.metaID]
def paged_content(title, display_data, valid_ids):
    """Yield a header followed by the display text of each valid ID.

    The first item yielded is a banner built from ``title``.  The remaining
    items are ``display_data[_id]`` for every ``_id`` in ``valid_ids``,
    ordered by the integer value of the ID's leading digits so that ``'9'``
    sorts before ``'10'``.  IDs that do not start with a digit are placed
    after the numeric ones, in plain string order.
    """
    # Grab as many leading digits as are present (one or more).
    num_re = re.compile(r'[0-9]+')

    def id_key(item_id):
        """Build a sort key that orders IDs by their leading number."""
        match = num_re.match(item_id)
        if match is None:
            # No numeric prefix: sort after all numeric IDs.
            return (1, 0, item_id)
        # The raw ID is the tie-breaker so equal numbers sort consistently.
        return (0, int(match.group(0)), item_id)

    yield """
{} - Select an ID
=====================================
""".format(title)
    for _id in sorted(valid_ids, key=id_key):
        yield display_data[_id]
def format_query_results(md_update, query_obj):
    """Build the list of valid IDs and their display strings.

    Iterates the ``query_results`` stored in ``md_update`` under
    ``query_obj.metaID``.  Each result's ID is the string form of its
    ``query_obj.valueField`` entry, and its display text is the stored
    ``displayFormat`` formatted with the result's keys.

    Returns a ``(valid_ids, display_data)`` tuple: a list of ID strings in
    result order and a dict mapping each ID string to its display text.
    """
    entry = md_update[query_obj.metaID]
    id_field = query_obj.valueField
    valid_ids = []
    display_data = {}
    for result in entry.query_results:
        result_id = str(result[id_field])
        valid_ids.append(result_id)
        display_data[result_id] = entry.displayFormat.format(**result)
    return (valid_ids, display_data)
def set_selected_id(selected_id, default_id, valid_ids):
    """Validate the user's selection, falling back to the default.

    A falsy ``selected_id`` is replaced by ``default_id``.  The resulting
    choice is returned unchanged when its string form is in ``valid_ids``;
    otherwise ``False`` is returned.
    """
    choice = selected_id or default_id
    if str(choice) in valid_ids:
        return choice
    return False
def parse_command(exe):
    """Walk the system path and return the full path of an executable.

    ``exe`` is stripped of surrounding whitespace and joined onto every
    directory in the ``PATH`` environment variable, with each extension
    listed in ``PATHEXT`` appended (a single empty extension when
    ``PATHEXT`` is unset).  The first candidate that is an existing file is
    returned.

    Returns ``None`` when nothing matches or when ``PATH`` is not set.
    """
    search_path = getenv('PATH')
    if search_path is None:
        # No PATH in the environment: nothing to search, so report
        # "not found" instead of failing on None.split().
        return None
    name = exe.strip()
    bin_dirs = search_path.split(pathsep)
    for ext in getenv('PATHEXT', '').split(pathsep):  # pragma: no branch
        for binpath in bin_dirs:  # pragma: no branch
            check_cmd = path.join(binpath, name) + ext
            if path.isfile(check_cmd):  # pragma: no branch
                return check_cmd
    return None
def execute_pager(content):
    """Pipe ``content`` through the first pager found on the system.

    Candidates are tried in order: the ``PAGER`` environment variable
    (defaulting to ``more``), then ``more``, ``less`` and ``most``.  The
    first one that ``parse_command`` resolves is run with its executable
    replaced by the resolved path.  If none resolve, the command
    ``python -m pager -`` is used.

    ``content`` is an iterable of strings; they are joined with newlines,
    encoded as UTF-8 and written to the pager's stdin.  Returns the pager
    process exit status.
    """
    command = ['python', '-m', 'pager', '-']
    candidates = [
        getenv('PAGER', 'more').split(),
        ['more'],
        ['less'],
        ['most']
    ]
    for candidate in candidates:
        if not (candidate and candidate[0]):  # pragma: no cover simple check to see if pager is something
            continue
        resolved = parse_command(candidate[0])
        if not resolved:
            continue
        # Keep any extra arguments from the candidate, swap in the full path.
        candidate[0] = resolved
        command = candidate
        break
    payload = '\n'.join(content).encode('utf-8')
    pager_proc = Popen(command, stdin=PIPE, stdout=stdout, stderr=stderr)
    pager_proc.communicate(payload)
    return pager_proc.wait()
def interactive_select_loop(md_update, query_obj, default_id):
    """Prompt the user until a valid ID is selected.

    When there is exactly one valid ID it is returned immediately without
    prompting.  Otherwise the choices are shown through the pager, a line
    is read from stdin, and the answer is validated with
    ``set_selected_id``.  This repeats until validation yields a truthy
    value, which is returned.
    """
    valid_ids, display_data = format_query_results(md_update, query_obj)
    if len(valid_ids) == 1:
        return valid_ids[0]
    prompt = 'Select ID ({}): '.format(default_id)
    while True:
        execute_pager(paged_content(
            query_obj.displayTitle, display_data, valid_ids))
        stdout.write(prompt)
        answer = stdin.readline().strip()
        chosen = set_selected_id(answer, default_id, valid_ids)
        if chosen:
            return chosen
def set_results(md_update, query_obj, default_id, interactive=False):
    """Store the chosen value for ``query_obj`` back into ``md_update``.

    In interactive mode the value comes from ``interactive_select_loop``.
    Otherwise ``default_id`` is used and a ``Setting <metaID> to <id>.``
    message is printed.  ``md_update`` is only written when the chosen
    value differs from the value currently stored under the metaID.
    """
    meta_id = query_obj.metaID
    if not interactive:
        print('Setting {} to {}.'.format(meta_id, default_id))
        selected_id = default_id
    else:
        selected_id = interactive_select_loop(md_update, query_obj, default_id)
    if selected_id != md_update[meta_id].value:
        md_update[meta_id] = query_obj._replace(value=selected_id)
def filter_results(md_update, query_obj, regex):
    """Filter the results of query_obj by regex and save them in md_update.

    A result is kept when ``regex`` (compiled with ``re.UNICODE``) is found
    anywhere in its display text, as produced by ``format_query_results``.
    The filtered list replaces ``query_results`` on the entry stored under
    ``query_obj.metaID``.

    If any results remain and the current ``value`` is not one of their
    IDs, ``value`` is reset to the ID field of the first remaining result.
    """
    reg_engine = re.compile(regex, re.UNICODE)
    _valid_ids, display_data = format_query_results(md_update, query_obj)
    value_field = query_obj.valueField
    filtered_results = [
        res for res in query_obj.query_results
        if reg_engine.search(display_data[str(res[value_field])])
    ]
    filtered_obj = query_obj._replace(query_results=filtered_results)
    valid_ids = [str(res[value_field]) for res in filtered_results]
    # IDs are compared as strings, the same way set_selected_id validates
    # them, so a non-string value still matches its own result.
    if valid_ids and str(query_obj.value) not in valid_ids:
        # Derive from filtered_obj, not query_obj, so the filtered result
        # list is kept when the value is reset.
        filtered_obj = filtered_obj._replace(
            value=filtered_results[0][value_field])
    md_update[query_obj.metaID] = filtered_obj
def remove_results(md_update):
    """Trim each entry's query results down to the selected one.

    For every entry in ``md_update``, each result whose ``valueField``
    item equals the entry's ``value`` is written back as the entry's only
    query result.  Entries with no matching result are left untouched.
    """
    for entry in md_update:
        id_field = entry.valueField
        selected = entry.value
        for candidate in entry.query_results:
            if candidate[id_field] == selected:
                md_update[entry.metaID] = entry._replace(
                    query_results=[candidate])
def query_main(md_update, args):
    """Resolve every metadata value by querying, then return ``md_update``.

    While ``md_update.is_valid()`` is false, the next leaf node is picked,
    its results are filtered by the ``<metaID>_regex`` attribute of
    ``args`` (``'.*'`` when that is falsy), and a value is set from the
    ``<metaID>`` attribute of ``args``.  The value already stored in
    ``md_update`` is used instead when that attribute is falsy or the
    metaID is ``'logon'``.  Selection is interactive when
    ``args.interactive`` is true.

    Raises ``RuntimeError`` when a node still has no value after
    selection.  Once everything is valid the query results are trimmed
    with ``remove_results``.
    """
    while not md_update.is_valid():
        query_obj = find_leaf_node(md_update)
        meta_id = query_obj.metaID
        regex = getattr(args, '{}_regex'.format(meta_id)) or '.*'
        filter_results(md_update, query_obj, regex)
        default_id = getattr(args, meta_id, None)
        if meta_id == 'logon' or not default_id:
            default_id = md_update[meta_id].value
        set_results(md_update, query_obj, default_id, args.interactive)
        if not md_update[meta_id].value:
            raise RuntimeError('Could not find value for {}'.format(meta_id))
    remove_results(md_update)
    return md_update