gluon.shell 源代码

# -*- coding: utf-8 -*-

"""
| This file is part of the web2py Web Framework
| Developed by Massimo Di Pierro <mdipierro@cs.depaul.edu>,
| limodou <limodou@gmail.com> and srackham <srackham@gmail.com>.
| License: LGPLv3 (http://www.gnu.org/licenses/lgpl.html)

Web2py environment in the shell
--------------------------------
"""

from __future__ import print_function

import os
import sys
import code
import copy
import logging
import types
import re
import glob
import site
import traceback
import gluon.fileutils as fileutils
from gluon.settings import global_settings
from gluon.compileapp import build_environment, read_pyc, run_models_in
from gluon.restricted import RestrictedError
from gluon.globals import Request, Response, Session
from gluon.storage import Storage, List
from gluon.admin import w2p_unpack
from pydal.base import BaseAdapter
from gluon._compat import iteritems, ClassType, PY2

logger = logging.getLogger("web2py")

if not PY2:
[文档] def execfile(filename, global_vars=None, local_vars=None): with open(filename, "rb") as f: code = compile(f.read(), filename, 'exec') exec(code, global_vars, local_vars)
raw_input = input
[文档]def enable_autocomplete_and_history(adir, env): try: import rlcompleter import atexit import readline except ImportError: pass else: readline.parse_and_bind("tab: complete") history_file = os.path.join(adir, '.pythonhistory') try: readline.read_history_file(history_file) except IOError: open(history_file, 'a').close() atexit.register(readline.write_history_file, history_file) readline.set_completer(rlcompleter.Completer(env).complete)
REGEX_APP_PATH = '(?:.*/)?applications/(?P<a>[^/]+)'
[文档]def exec_environment( pyfile='', request=None, response=None, session=None, ): """Environment builder and module loader. Builds a web2py environment and optionally executes a Python file into the environment. A Storage dictionary containing the resulting environment is returned. The working directory must be web2py root -- this is the web2py default. """ if request is None: request = Request({}) if response is None: response = Response() if session is None: session = Session() if request.folder is None: mo = re.match(REGEX_APP_PATH, pyfile) if mo: a = mo.group('a') request.folder = os.path.abspath(os.path.join('applications', a)) else: request.folder = '' env = build_environment(request, response, session, store_current=False) if pyfile: pycfile = pyfile + 'c' if os.path.isfile(pycfile): exec(read_pyc(pycfile), env) else: execfile(pyfile, env) return Storage(env)
[文档]def env( a, import_models=False, c=None, f=None, dir='', extra_request={}, ): """ Returns web2py execution environment for application (a), controller (c), function (f). If import_models is True the exec all application models into the environment. extra_request allows you to pass along any extra variables to the request object before your models get executed. This was mainly done to support web2py_utils.test_runner, however you can use it with any wrapper scripts that need access to the web2py environment. """ request = Request({}) response = Response() session = Session() request.application = a # Populate the dummy environment with sensible defaults. if not dir: request.folder = os.path.join('applications', a) else: request.folder = dir request.controller = c or 'default' request.function = f or 'index' response.view = '%s/%s.html' % (request.controller, request.function) cmd_opts = global_settings.cmd_options if cmd_opts: if not cmd_opts.interfaces: ip = cmd_opts.ip port = cmd_opts.port else: first_if = cmd_opts.interfaces[0] ip = first_if[0] port = first_if[1] request.is_shell = cmd_opts.shell is not None else: ip = '127.0.0.1'; port = 8000 request.is_shell = False request.is_scheduler = False request.env.http_host = '%s:%s' % (ip, port) request.env.remote_addr = '127.0.0.1' request.env.web2py_runtime_gae = global_settings.web2py_runtime_gae for k, v in extra_request.items(): setattr(request, k, v) path_info = '/%s/%s/%s' % (a, c, f) if request.args: path_info = '%s/%s' % (path_info, '/'.join(request.args)) if request.vars: vars = ['%s=%s' % (k, v) if v else '%s' % k for (k, v) in iteritems(request.vars)] path_info = '%s?%s' % (path_info, '&'.join(vars)) request.env.path_info = path_info # Monkey patch so credentials checks pass. def check_credentials(request, other_application='admin'): return True fileutils.check_credentials = check_credentials environment = build_environment(request, response, session) if import_models: try: run_models_in(environment) except RestrictedError as e: sys.stderr.write(e.traceback + '\n') sys.exit(1) response._view_environment = copy.copy(environment) environment['__name__'] = '__main__' return environment
[文档]def exec_pythonrc(): pythonrc = os.environ.get('PYTHONSTARTUP') if pythonrc and os.path.isfile(pythonrc): def execfile_getlocals(file): execfile(file) return locals() try: return execfile_getlocals(pythonrc) except NameError: pass return dict()
[文档]def die(msg, exit_status=1, error_preamble=True): if error_preamble: msg = "%s: error: %s" % (sys.argv[0], msg) print(msg, file=sys.stderr) sys.exit(exit_status)
[文档]def run( appname, plain=False, import_models=False, startfile=None, bpython=False, python_code=None, cron_job=False, scheduler_job=False, force_migrate=False, fake_migrate=False): """ Start interactive shell or run Python script (startfile) in web2py controller environment. appname is formatted like: - a : web2py application name - a/c : exec the controller c into the application environment - a/c/f : exec the controller c, then the action f into the application environment - a/c/f?x=y : as above """ (a, c, f, args, vars) = parse_path_info(appname, av=True) errmsg = 'invalid application name: %s' % appname if not a: die(errmsg, error_preamble=False) adir = os.path.join('applications', a) if not os.path.exists(adir): if not cron_job and not scheduler_job and \ sys.stdin and not sys.stdin.name == '/dev/null': confirm = raw_input( 'application %s does not exist, create (y/N)?' % a) else: logging.warn('application does not exist and will not be created') return if confirm.lower() in ('y', 'yes'): os.mkdir(adir) fileutils.create_app(adir) else: logging.warn('application folder does not exist and has not been created as requested') return if force_migrate: c = 'appadmin' # Load all models (hack already used for appadmin controller) import_models = True from gluon.dal import DAL orig_init = DAL.__init__ def custom_init(*args, **kwargs): kwargs['migrate_enabled'] = True kwargs['migrate'] = True kwargs['fake_migrate'] = fake_migrate logger.info('Forcing migrate_enabled=True') orig_init(*args, **kwargs) DAL.__init__ = custom_init if c: import_models = True extra_request = {} if args: extra_request['args'] = args if scheduler_job: extra_request['is_scheduler'] = True if vars: # underscore necessary because request.vars is a property extra_request['_vars'] = vars _env = env(a, c=c, f=f, import_models=import_models, extra_request=extra_request) if c: pyfile = os.path.join('applications', a, 'controllers', c + '.py') pycfile = os.path.join('applications', a, 'compiled', "controllers.%s.%s.pyc" % (c, f)) if ((cron_job and os.path.isfile(pycfile)) or not os.path.isfile(pyfile)): exec(read_pyc(pycfile), _env) elif os.path.isfile(pyfile): execfile(pyfile, _env) else: die(errmsg, error_preamble=False) if f: exec('print( %s())' % f, _env) return _env.update(exec_pythonrc()) if startfile: try: ccode = None if startfile.endswith('.pyc'): ccode = read_pyc(startfile) exec(ccode, _env) else: execfile(startfile, _env) if import_models: BaseAdapter.close_all_instances('commit') except SystemExit: print(traceback.format_exc()) if import_models: BaseAdapter.close_all_instances('rollback') raise except: print(traceback.format_exc()) if import_models: BaseAdapter.close_all_instances('rollback') elif python_code: try: exec(python_code, _env) if import_models: BaseAdapter.close_all_instances('commit') except SystemExit: print(traceback.format_exc()) if import_models: BaseAdapter.close_all_instances('rollback') raise except: print(traceback.format_exc()) if import_models: BaseAdapter.close_all_instances('rollback') elif force_migrate: try: execfile("scripts/migrator.py", _env) if import_models: BaseAdapter.close_all_instances('commit') except SystemExit: print(traceback.format_exc()) if import_models: BaseAdapter.close_all_instances('rollback') raise except: print(traceback.format_exc()) if import_models: BaseAdapter.close_all_instances('rollback') else: if not plain: if bpython: try: import bpython bpython.embed(locals_=_env) return except: logger.warning( 'import bpython error; trying ipython...') else: try: import IPython if IPython.__version__ > '1.0.0': IPython.start_ipython(user_ns=_env) return elif IPython.__version__ == '1.0.0': from IPython.terminal.embed import InteractiveShellEmbed shell = InteractiveShellEmbed(user_ns=_env) shell() return elif IPython.__version__ >= '0.11': from IPython.frontend.terminal.embed import InteractiveShellEmbed shell = InteractiveShellEmbed(user_ns=_env) shell() return else: # following 2 lines fix a problem with # IPython; thanks Michael Toomim if '__builtins__' in _env: del _env['__builtins__'] shell = IPython.Shell.IPShell(argv=[], user_ns=_env) shell.mainloop() return except: logger.warning( 'import IPython error; use default python shell') enable_autocomplete_and_history(adir, _env) code.interact(local=_env)
[文档]def parse_path_info(path_info, av=False): """ Parses path info formatted like a/c/f where c and f are optional and a leading `/` is accepted. Return tuple (a, c, f). If invalid path_info a is set to None. If c or f are omitted they are set to None. If av=True, parse args and vars """ if av: vars = None if '?' in path_info: path_info, query = path_info.split('?', 2) vars = Storage() for var in query.split('&'): (var, val) = var.split('=', 2) if '=' in var else (var, None) vars[var] = val items = List(path_info.split('/')) args = List(items[3:]) if len(items) > 3 else None return (items(0), items(1), items(2), args, vars) mo = re.match(r'^/?(?P<a>\w+)(/(?P<c>\w+)(/(?P<f>\w+))?)?$', path_info) if mo: return (mo.group('a'), mo.group('c'), mo.group('f')) else: return (None, None, None)
[文档]def test(testpath, import_models=True, verbose=False): """ Run doctests in web2py environment. testpath is formatted like: - a: tests all controllers in application a - a/c: tests controller c in application a - a/c/f test function f in controller c, application a Where a, c and f are application, controller and function names respectively. If the testpath is a file name the file is tested. If a controller is specified models are executed by default. """ import doctest if os.path.isfile(testpath): mo = re.match(REGEX_APP_PATH, testpath) if not mo: die('test file is not in application directory: %s' % testpath) a = mo.group('a') c = f = None files = [testpath] else: (a, c, f) = parse_path_info(testpath) errmsg = 'invalid test path: %s' % testpath if not a: die(errmsg) cdir = os.path.join('applications', a, 'controllers') if not os.path.isdir(cdir): die(errmsg) if c: cfile = os.path.join(cdir, c + '.py') if not os.path.isfile(cfile): die(errmsg) files = [cfile] else: files = glob.glob(os.path.join(cdir, '*.py')) for testfile in files: globs = env(a, import_models) ignores = globs.copy().keys() execfile(testfile, globs) def doctest_object(name, obj): """doctest obj and enclosed methods and classes.""" if type(obj) in (types.FunctionType, type, ClassType, types.MethodType, types.ModuleType): # Reload environment before each test. globs = env(a, c=c, f=f, import_models=import_models) execfile(testfile, globs) doctest.run_docstring_examples( obj, globs=globs, name='%s: %s' % (os.path.basename(testfile), name), verbose=verbose) if type(obj) in (type, ClassType): for attr_name in dir(obj): # Execute . operator so decorators are executed. o = eval('%s.%s' % (name, attr_name), globs) doctest_object(attr_name, o) for (name, obj) in globs.items(): if name not in ignores and (f is None or f == name): doctest_object(name, obj)