# -*- coding: utf-8 -*-
# vim: set ts=4 sw=4 et ai:

"""
| This file is part of the web2py Web Framework
| Copyrighted by Massimo Di Pierro
| License: LGPLv3 (http://www.gnu.org/licenses/lgpl.html)

GUI widget and services start function
"""

from __future__ import print_function

import time
import sys
import os
from collections import OrderedDict
import socket
import threading
import math
import logging
import signal
import getpass

from gluon.fileutils import read_file, create_welcome_w2p
from import die, run, test
from gluon._compat import PY2, xrange
from gluon.utils import (getipaddrinfo, is_loopback_ip_address,
from gluon.console import is_appdir, console
from gluon import newcron
from gluon import main
from gluon.settings import global_settings

# Program identification strings, used for the dialog title, the About
# box and the console banner.
ProgramName = 'web2py Web Framework'
# NOTE(review): the argument of str() is missing from this copy of the
# file (the call was left unclosed); the current year is assumed here --
# confirm against upstream.
ProgramAuthor = 'Created by Massimo Di Pierro, Copyright 2007-' + str(
    time.localtime().tm_year)
# the version string is the content of the VERSION file, without the
# trailing whitespace
ProgramVersion = read_file('VERSION').rstrip()

# Warn (without aborting) when running on an interpreter older than 2.7,
# or on a 3.x release older than 3.5.
if sys.version_info < (2, 7) or (3, 0) < sys.version_info < (3, 5):
    from platform import python_version
    sys.stderr.write("Warning: web2py requires at least Python 2.7/3.5"
        " but you are running %s\n" % python_version())

def run_system_tests(options):
    """
    Runs unittests for gluon.tests

    Builds the ``-m unittest -c gluon.tests`` argument list (with ``-v``
    inserted when ``options.verbose`` is set) and replaces the current
    process with either the Python interpreter or, when
    ``options.with_coverage`` is set, the ``coverage`` executable.

    Args:
        options: parsed command line options; the attributes read here
            are ``verbose`` and ``with_coverage``
    """
    # see "python -m unittest -h" for unittest options help
    # NOTE: someone might be interested either in using the
    #       -f (--failfast) option to stop testing on first failure, or
    #       in customizing the test selection, for example to run only
    #       'gluon.tests.<module>', 'gluon.tests.<module>.<class>' (this
    #       could be shortened as 'gluon.tests.<class>'), or even
    #       'gluon.tests.<module>.<class>.<method>' (or
    #       the shorter 'gluon.tests.<class>.<method>')
    call_args = ['-m', 'unittest', '-c', 'gluon.tests']
    if options.verbose:
        # keep 'gluon.tests' as the last argument
        call_args.insert(-1, '-v')
    if options.with_coverage:
        # only an availability check, the module itself is not used here
        try:
            import coverage  # noqa: F401
        except ImportError:
            die('Coverage not installed')
    if not PY2:
        sys.stderr.write('Experimental ')
    sys.stderr.write("Python %s\n" % sys.version)
    if options.with_coverage:
        coverage_exec = 'coverage2' if PY2 else 'coverage3'
        coverage_config_file = os.path.join('gluon', 'tests', 'coverage.ini')
        # an already exported COVERAGE_PROCESS_START wins over the default
        coverage_config = os.environ.setdefault("COVERAGE_PROCESS_START",
                                                coverage_config_file)
        run_args = [coverage_exec, 'run', '--rcfile=%s' % coverage_config]
        # replace the current process
        os.execvpe(run_args[0], run_args + call_args, os.environ)
    else:
        run_args = [sys.executable]
        # replace the current process
        os.execv(run_args[0], run_args + call_args)
def get_url(host, path='/', proto='http', port=80):
    """
    Builds the URL ``proto://host[:port]/path``

    Args:
        host: host name or IP address; a value containing ``:`` is taken
            as an IPv6 literal and wrapped in square brackets, unless it
            is already bracketed
        path: URL path, a leading ``/`` is optional
        proto: URL scheme, a trailing ``:`` is dropped
        port: port number; a false value or ``80`` is omitted from the
            result

    Returns:
        the URL as a string
    """
    if ':' in host:
        # do not bracket twice a literal that is already in URL form
        if not host.startswith('['):
            host = '[%s]' % host
    elif host == '':
        # NOTE(review): this branch is a no-op as written; the compared
        # and assigned literals look like they were lost (presumably a
        # wildcard address mapped to a loopback one) -- confirm against
        # upstream.
        host = ''
    if path.startswith('/'):
        path = path[1:]
    if proto.endswith(':'):
        proto = proto[:-1]
    if not port or port == 80:
        port = ''
    else:
        port = ':%s' % port
    return '%s://%s%s/%s' % (proto, host, port, path)
def start_browser(url, startup=False):
    """
    Opens ``url`` in the default web browser

    Args:
        url: address (or file path) to open
        startup: when true, the address and a "starting browser..."
            notice are printed first
    """
    if startup:
        print('please visit:')
        print('\t' + url)
        print('starting browser...')
    try:
        import webbrowser
        # the block previously imported the module without ever using it,
        # so no browser was started
        webbrowser.open(url)
    except Exception:
        # best effort: a missing or misbehaving browser must not stop
        # the caller
        print('warning: unable to detect your browser')
class web2pyDialog(object):
    """ Main window dialog """

    def __init__(self, root, options):
        """
        web2pyDialog constructor

        Builds the whole dialog as a ``Toplevel`` of ``root`` (which is
        hidden): menu bar, logo and banner areas, the server IP radio
        buttons, port and password entries, the activity canvas and the
        start/stop buttons.

        Args:
            root: the Tk root window
            options: parsed command line options

        NOTE(review): several tokens of this method were missing from
        this copy of the file and have been restored from context:
        ``self.root.tk.call(``, every ``self.menu`` reference and
        ``add_cascade`` call, the ``menu=`` argument of
        ``self.root.config``, the home page URL and the two literal
        bind addresses.  Confirm each of them against upstream.
        """
        if PY2:
            import Tkinter as tkinter
            import tkMessageBox as messagebox
        else:
            import tkinter
            from tkinter import messagebox

        root.withdraw()

        bg_color = 'white'
        self.root = tkinter.Toplevel(root, bg=bg_color)
        self.root.resizable(0, 0)
        self.root.title(ProgramName)

        self.options = options
        # maps application name -> scheduler process, guarded by the lock
        self.scheduler_processes_lock = threading.RLock()
        self.scheduler_processes = OrderedDict()

        iconphoto = os.path.join('extras', 'icons', 'web2py.gif')
        if os.path.exists(iconphoto):
            img = tkinter.PhotoImage(file=iconphoto)
            self.root.tk.call('wm', 'iconphoto', self.root._w, img)

        # Building the Menu
        self.menu = tkinter.Menu(self.root)
        servermenu = tkinter.Menu(self.menu, tearoff=0)

        httplog = os.path.join(options.folder, options.log_filename)
        item = lambda: start_browser(httplog)
        servermenu.add_command(label='View httpserver.log',
                               command=item)

        servermenu.add_command(label='Quit (pid:%i)' % os.getpid(),
                               command=self.quit)

        self.menu.add_cascade(label='Server', menu=servermenu)

        self.pagesmenu = tkinter.Menu(self.menu, tearoff=0)
        self.menu.add_cascade(label='Pages', menu=self.pagesmenu)

        self.schedmenu = tkinter.Menu(self.menu, tearoff=0)
        self.menu.add_cascade(label='Scheduler', menu=self.schedmenu)
        # register and start schedulers
        self.update_schedulers(start=True)

        helpmenu = tkinter.Menu(self.menu, tearoff=0)

        # Home Page
        # NOTE(review): URL restored, it was empty in this copy
        item = lambda: start_browser('http://www.web2py.com/')
        helpmenu.add_command(label='Home Page',
                             command=item)

        # About
        ProgramInfo = """%s %s %s""" % (ProgramName, ProgramAuthor,
                                        ProgramVersion)
        item = lambda: messagebox.showinfo('About web2py', ProgramInfo)
        helpmenu.add_command(label='About',
                             command=item)

        self.menu.add_cascade(label='Info', menu=helpmenu)

        self.root.config(menu=self.menu)

        # with a taskbar icon, closing the window only hides it
        if options.taskbar:
            self.root.protocol('WM_DELETE_WINDOW',
                               lambda: self.quit(True))
        else:
            self.root.protocol('WM_DELETE_WINDOW', self.quit)

        sticky = tkinter.NW

        # Prepare the logo area
        self.logoarea = tkinter.Canvas(self.root,
                                       background=bg_color,
                                       width=300,
                                       height=300)
        self.logoarea.grid(row=0, column=0, columnspan=4, sticky=sticky)
        self.logoarea.after(1000, self.update_canvas)

        logo = os.path.join('extras', 'icons', 'splashlogo.gif')
        if os.path.exists(logo):
            img = tkinter.PhotoImage(file=logo)
            pnl = tkinter.Label(self.logoarea, image=img,
                                background=bg_color, bd=0)
            pnl.pack(side='top', fill='both', expand='yes')
            # Prevent garbage collection of img
            pnl.image = img

        # Prepare the banner area
        self.bannerarea = tkinter.Canvas(self.root,
                                         bg=bg_color,
                                         width=300,
                                         height=300)
        self.bannerarea.grid(row=1, column=1, columnspan=2, sticky=sticky)

        tkinter.Label(self.bannerarea, anchor=tkinter.N,
                      text=str(ProgramVersion + "\n" + ProgramAuthor),
                      font=('Helvetica', 11), justify=tkinter.CENTER,
                      foreground='#195866', background=bg_color,
                      height=3).pack(side='top',
                                     fill='both',
                                     expand='yes')

        self.bannerarea.after(1000, self.update_canvas)

        # IP
        # retrieves the list of server IP addresses
        try:
            if_ips = list(set(  # no duplicates
                [addrinfo[4][0]
                 for addrinfo in getipaddrinfo(socket.getfqdn())
                 if not is_loopback_ip_address(addrinfo=addrinfo)]))
        except socket.gaierror:
            if_ips = []

        tkinter.Label(self.root,
                      text='Server IP:', bg=bg_color,
                      justify=tkinter.RIGHT).grid(row=4,
                                                  column=1,
                                                  sticky=sticky)
        self.ips = {}
        self.selected_ip = tkinter.StringVar()
        row = 4
        # NOTE(review): both literal addresses were empty strings in this
        # copy, which made the two entries share one key in self.ips;
        # loopback and wildcard addresses are assumed.
        ips = [('127.0.0.1', 'Local (IPv4)')] + \
            ([('::1', 'Local (IPv6)')] if socket.has_ipv6 else []) + \
            [(ip, 'Public') for ip in if_ips] + \
            [('0.0.0.0', 'Public')]
        for ip, legend in ips:
            self.ips[ip] = tkinter.Radiobutton(
                self.root, bg=bg_color, highlightthickness=0,
                selectcolor='light grey', width=30,
                anchor=tkinter.W, text='%s (%s)' % (legend, ip),
                justify=tkinter.LEFT,
                variable=self.selected_ip, value=ip)
            self.ips[ip].grid(row=row, column=2, sticky=sticky)
            # the first entry is the default choice
            if row == 4:
                self.ips[ip].select()
            row += 1
        # the remaining widgets go below the radio buttons
        shift = row

        # Port
        tkinter.Label(self.root,
                      text='Server Port:', bg=bg_color,
                      justify=tkinter.RIGHT).grid(row=shift,
                                                  column=1, pady=10,
                                                  sticky=sticky)

        self.port_number = tkinter.Entry(self.root)
        self.port_number.insert(tkinter.END, options.port)
        self.port_number.grid(row=shift, column=2, sticky=sticky, pady=10)

        # Password
        tkinter.Label(self.root,
                      text='Choose Password:', bg=bg_color,
                      justify=tkinter.RIGHT).grid(row=shift + 1,
                                                  column=1,
                                                  sticky=sticky)

        self.password = tkinter.Entry(self.root, show='*')
        self.password.bind('<Return>', lambda e: self.start())
        self.password.focus_force()
        self.password.grid(row=shift + 1, column=2, sticky=sticky)

        # Prepare the canvas
        self.canvas = tkinter.Canvas(self.root,
                                     width=400,
                                     height=100,
                                     bg='black')
        self.canvas.grid(row=shift + 2, column=1, columnspan=2, pady=5,
                         sticky=sticky)
        self.canvas.after(1000, self.update_canvas)

        # Prepare the frame
        frame = tkinter.Frame(self.root)
        frame.grid(row=shift + 3, column=1, columnspan=2, pady=5,
                   sticky=sticky)

        # Start button
        self.button_start = tkinter.Button(frame,
                                           text='start server',
                                           command=self.start)

        self.button_start.grid(row=0, column=0, sticky=sticky)

        # Stop button
        self.button_stop = tkinter.Button(frame,
                                          text='stop server',
                                          command=self.stop)

        self.button_stop.grid(row=0, column=1, sticky=sticky)
        self.button_stop.configure(state='disabled')

        if options.taskbar:
            import gluon.contrib.taskbar_widget
            self.tb = gluon.contrib.taskbar_widget.TaskBarIcon()
            self.checkTaskBar()

            # a password given on the command line lets the server start
            # right away, with the window hidden
            if options.password != '<ask>':
                self.password.insert(0, options.password)
                self.start()
                self.root.withdraw()
        else:
            self.tb = None
[文档] def update_schedulers(self, start=False): applications_folder = os.path.join(self.options.folder, 'applications') available_apps = [ arq for arq in os.listdir(applications_folder) if os.path.isdir(os.path.join(applications_folder, arq)) ] with self.scheduler_processes_lock: # reset the menu # since applications can disappear (be disinstalled) must # clear the menu (should use tkinter.END or tkinter.LAST) self.schedmenu.delete(0, 'end') for arq in available_apps: if arq not in self.scheduler_processes: item = lambda a = arq: self.try_start_scheduler(a) self.schedmenu.add_command(label="start %s" % arq, command=item) if arq in self.scheduler_processes: item = lambda a = arq: self.try_stop_scheduler(a) self.schedmenu.add_command(label="stop %s" % arq, command=item) if start and self.options.with_scheduler and self.options.schedulers: # the widget takes care of starting the schedulers apps = [ag.split(':', 1)[0] for ag in self.options.schedulers] else: apps = [] for app in apps: self.try_start_scheduler(app)
[文档] def start_schedulers(self, app): from multiprocessing import Process code = "from gluon.globals import current;current._scheduler.loop()" print('starting scheduler from widget for "%s"...' % app) args = (app, True, True, None, False, code, False, True) p = Process(target=run, args=args) with self.scheduler_processes_lock: self.scheduler_processes[app] = p self.update_schedulers() print("Currently running %s scheduler processes" % ( len(self.scheduler_processes))) p.start() print("Processes started")
[文档] def try_stop_scheduler(self, app, skip_update=False): p = None with self.scheduler_processes_lock: if app in self.scheduler_processes: p = self.scheduler_processes[app] del self.scheduler_processes[app] if p is not None: p.terminate() p.join() if not skip_update: self.update_schedulers()
[文档] def try_start_scheduler(self, app): t = None with self.scheduler_processes_lock: if not is_appdir(self.options.folder, app): self.schedmenu.delete("start %s" % app) return if app not in self.scheduler_processes: t = threading.Thread(target=self.start_schedulers, args=(app,)) if t is not None: t.start()
[文档] def checkTaskBar(self): """ Checks taskbar status """ tb = self.tb if tb.status: st0 = tb.status[0] EnumStatus = tb.EnumStatus if st0 == EnumStatus.QUIT: self.quit() elif st0 == EnumStatus.TOGGLE: if self.root.state() == 'withdrawn': self.root.deiconify() else: self.root.withdraw() elif st0 == EnumStatus.STOP: self.stop() elif st0 == EnumStatus.START: self.start() elif st0 == EnumStatus.RESTART: self.stop() self.start() del tb.status[0] self.root.after(1000, self.checkTaskBar)
[文档] def connect_pages(self): """ Connects pages """ # reset the menu, # since applications can disappear (be disinstalled) must # clear the menu (should use tkinter.END or tkinter.LAST) self.pagesmenu.delete(0, 'end') applications_folder = os.path.join(self.options.folder, 'applications') available_apps = [ arq for arq in os.listdir(applications_folder) if os.path.exists(os.path.join(applications_folder, arq, '')) ] for arq in available_apps: url = self.url + arq item = lambda a = arq: self.try_start_browser(a) self.pagesmenu.add_command( label=url, command=item)
[文档] def try_start_browser(self, app): url = self.url + app if not is_appdir(self.options.folder, app): self.pagesmenu.delete(url) return start_browser(url)
[文档] def quit(self, justHide=False): """ Finishes the program execution """ if justHide: self.root.withdraw() else: try: with self.scheduler_processes_lock: scheds = list(self.scheduler_processes.keys()) for t in scheds: self.try_stop_scheduler(t, skip_update=True) except: pass if self.options.with_cron and not self.options.soft_cron: # shutting down hardcron try: newcron.stopcron() except: pass try: # HttpServer.stop takes care of stopping softcron self.server.stop() except: pass try: self.tb.Destroy() except: pass self.root.destroy() sys.exit(0)
[文档] def error(self, message): """ Shows error message """ if PY2: import tkMessageBox as messagebox else: from tkinter import messagebox messagebox.showerror('web2py start server', message)
[文档] def start(self): """ Starts web2py server """ password = self.password.get() if not password: self.error('no password, no web admin interface') ip = self.selected_ip.get() if not is_valid_ip_address(ip): return self.error('invalid host ip address') try: port = int(self.port_number.get()) except ValueError: return self.error('invalid port number') if self.options.server_key and self.options.server_cert: proto = 'https' else: proto = 'http' self.url = get_url(ip, proto=proto, port=port) self.connect_pages() self.update_schedulers() # softcron is stopped with HttpServer, thus if starting again # need to reset newcron._stopping to re-enable cron if self.options.soft_cron: newcron.reset() # FIXME: if the HttpServer is stopped, then started again, # does not start because of following error: # WARNING:Rocket.Errors.Port8000:Listener started when not ready. self.button_start.configure(state='disabled') try: options = self.options req_queue_size = options.request_queue_size self.server = main.HttpServer( ip, port, password, pid_filename=options.pid_filename, log_filename=options.log_filename, profiler_dir=options.profiler_dir, ssl_certificate=options.server_cert, ssl_private_key=options.server_key, ssl_ca_certificate=options.ca_cert, min_threads=options.min_threads, max_threads=options.max_threads, server_name=options.server_name, request_queue_size=req_queue_size, timeout=options.timeout, shutdown_timeout=options.shutdown_timeout, path=options.folder, interfaces=options.interfaces) threading.Thread(target=self.server.start).start() except Exception as e: self.button_start.configure(state='normal') return self.error(str(e)) if not self.server_ready(): self.button_start.configure(state='normal') return self.button_stop.configure(state='normal') if not options.taskbar: cpt = threading.Thread(target=start_browser, args=(get_url(ip, proto=proto, port=port), True)) cpt.setDaemon(True) cpt.start() self.password.configure(state='readonly') for ip in self.ips.values(): 
ip.configure(state='disabled') self.port_number.configure(state='readonly') if self.tb: self.tb.SetServerRunning()
[文档] def server_ready(self): for listener in self.server.server.listeners: if listener.ready: return True return False
[文档] def stop(self): """ Stops web2py server """ self.button_start.configure(state='normal') self.button_stop.configure(state='disabled') self.password.configure(state='normal') for ip in self.ips.values(): ip.configure(state='normal') self.port_number.configure(state='normal') self.server.stop() if self.tb: self.tb.SetServerStopped()
[文档] def update_canvas(self): """ Updates canvas """ httplog = os.path.join(self.options.folder, self.options.log_filename) canvas = self.canvas try: t1 = os.path.getsize(httplog) except OSError: canvas.after(1000, self.update_canvas) return points = 400 try: pvalues = self.p0[1:] with open(httplog, 'r') as fp: data = - self.t0) self.p0 = pvalues + [10 + 90.0 / math.sqrt(1 + data.count('\n'))] for i in xrange(points - 1): c = canvas.coords(self.q0[i]) canvas.coords(self.q0[i], (c[0], self.p0[i], c[2], self.p0[i + 1])) self.t0 = t1 except AttributeError: self.t0 = time.time() self.t0 = t1 self.p0 = [100] * points self.q0 = [canvas.create_line(i, 100, i + 1, 100, fill='green') for i in xrange(points - 1)] canvas.after(1000, self.update_canvas)
def get_code_for_scheduler(applications_parent, app_groups):
    """
    Builds the code that runs the scheduler loop of an application

    Args:
        applications_parent: folder checked (through ``is_appdir``) for
            the application
        app_groups: sequence whose first item is the application name,
            the following ones (if any) are scheduler group names

    Returns:
        a tuple ``(app, code)``, or ``(None, None)`` when the application
        does not exist
    """
    app = app_groups[0]
    if not is_appdir(applications_parent, app):
        print("Application '%s' doesn't exist, skipping" % app)
        return None, None
    code = 'from gluon.globals import current;'
    group_names = list(app_groups[1:])
    if group_names:
        # repr() yields a valid list literal even when a group name
        # contains quotes or backslashes, which the previous manual
        # "','".join() quoting did not handle
        code += "current._scheduler.group_names=%r;" % (group_names,)
    code += "current._scheduler.loop()"
    return app, code
def start_schedulers(options):
    """
    Starts the schedulers listed in ``options.schedulers``

    Each item has the form ``app[:group[:group...]]``.  With a single
    item, and ``options.with_scheduler`` not set, the scheduler runs in
    the current process; otherwise one process per application is
    started and this function waits for all of them.

    Args:
        options: parsed command line options; the attributes read here
            are ``schedulers``, ``with_scheduler`` and ``folder``
    """
    from multiprocessing import Process
    apps = [ag.split(':') for ag in options.schedulers]
    if not options.with_scheduler and len(apps) == 1:
        app, code = get_code_for_scheduler(options.folder, apps[0])
        if not app:
            return
        print('starting single-scheduler for "%s"...' % app)
        run(app, True, True, None, False, code, False, True)
        return

    # Work around OS X problem:
    if PY2:
        import urllib
    else:
        import urllib.request as urllib
    urllib.getproxies()

    processes = []
    for app_groups in apps:
        app, code = get_code_for_scheduler(options.folder, app_groups)
        if not app:
            continue
        print('starting scheduler for "%s"...' % app)
        args = (app, True, True, None, False, code, False, True)
        p = Process(target=run, args=args)
        processes.append(p)
        print("Currently running %s scheduler processes" % (len(processes)))
        p.start()
        # to avoid bashing the db at the same time
        time.sleep(0.7)
        # NOTE(review): the original nesting of this print cannot be told
        # from this copy of the file; it is kept inside the loop --
        # confirm against upstream.
        print("Processes started")
    for p in processes:
        try:
            p.join()
        except (KeyboardInterrupt, SystemExit):
            print("Processes stopped")
        except Exception:
            # any other failure while waiting: stop the process
            p.terminate()
            p.join()
def start():
    """
    Starts server and other services

    Depending on the command line options this either performs a single
    task and returns (GAE files creation, doctests, interactive shell,
    external cron, stand-alone schedulers), runs the GUI, or starts the
    HTTP server in the console.

    NOTE(review): some tokens of this function were missing from this
    copy of the file and have been restored from context: the
    ``gaehandler.py`` file name, ``h.stream`` in the logging clean-up,
    and ``options.shell`` / ``options.run`` / ``startfile=`` in the
    interactive shell branch.  Confirm each of them against upstream.
    """
    # get command line arguments
    options = console(version=ProgramVersion)

    if options.with_scheduler or len(options.schedulers) > 1:
        # only an availability check, the name itself is not used here
        try:
            from multiprocessing import Process  # noqa: F401
        except ImportError:
            die('Sorry, -K/--scheduler only supported for Python 2.6+')

    if options.gae:
        # write app.yaml, gaehandler.py, and exit
        if not os.path.exists('app.yaml'):
            name = options.gae
            # for backward compatibility
            if name == 'configure':
                # a distinct local name is used on purpose: assigning to
                # "input" made it local to the whole function, so on
                # Python 3 the call raised UnboundLocalError
                read_input = raw_input if PY2 else input
                name = read_input("Your GAE app name: ")
            # the template is handled as bytes, so the name must be
            # bytes too (bytes.replace rejects str on Python 3)
            if not isinstance(name, bytes):
                name = name.encode('utf-8')
            template = os.path.join('examples', 'app.example.yaml')
            with open(template, 'rb') as src:
                content = src.read()
            with open('app.yaml', 'wb') as dst:
                dst.write(content.replace(b"yourappname", name))
        else:
            print("app.yaml alreday exists in the web2py folder")
        if not os.path.exists('gaehandler.py'):
            with open(os.path.join('handlers', 'gaehandler.py'), 'rb') as src:
                content = src.read()
            with open('gaehandler.py', 'wb') as dst:
                dst.write(content)
        else:
            print("gaehandler.py alreday exists in the web2py folder")
        return

    logger = logging.getLogger("web2py")
    logger.setLevel(options.log_level)
    logging.getLogger().setLevel(options.log_level)  # root logger

    # on new installation build the scaffolding app
    create_welcome_w2p()

    if options.run_system_tests:
        # run system test and exit
        run_system_tests(options)

    if options.quiet:
        # mute existing loggers, to do that iterate
        # over all loggers (root logger included) and remove
        # attached logging.StreamHandler instances currently
        # streaming on sys.stdout or sys.stderr
        loggers = [logging.getLogger()]
        loggers.extend(logging.Logger.manager.loggerDict.values())
        for lg in loggers:
            if isinstance(lg, logging.PlaceHolder):
                continue
            for h in lg.handlers[:]:
                if isinstance(h, logging.StreamHandler) and \
                        h.stream in (sys.stdout, sys.stderr):
                    lg.removeHandler(h)
        # this is to avoid the warning
        # ``No handlers could be found for logger "..."``
        # emitted by logging module when no handler is found
        logging.Logger.manager.emittedNoHandlerWarning = 1

        # to prevent writes on stdout set a null stream
        class NullFile(object):
            def write(self, x):
                pass

            def flush(self):
                # callers may flush stdout explicitly
                pass

        sys.stdout = NullFile()
        # NOTE: stderr.write() is still working

    elif not options.no_banner:
        # banner
        print(ProgramName)
        print(ProgramAuthor)
        print(ProgramVersion)
        from pydal.drivers import DRIVERS
        print('Database drivers available: %s' % ', '.join(DRIVERS))

    if options.run_doctests:
        # run doctests and exit
        test(options.run_doctests, verbose=options.verbose)
        return

    if options.shell:
        # run interactive shell and exit
        sys.argv = [options.run or ''] + options.args
        run(options.shell, plain=options.plain, bpython=options.bpython,
            import_models=options.import_models, startfile=options.run,
            cron_job=options.cron_job, force_migrate=options.force_migrate,
            fake_migrate=options.fake_migrate)
        return

    # set size of cron thread pools
    newcron.dancer_size(options.min_threads)
    newcron.launcher_size(options.cron_threads)

    if options.cron_run:
        # run cron (extcron) and exit
        logger.debug('Running extcron...')
        global_settings.web2py_crontype = 'external'
        newcron.extcron(options.folder, apps=options.crontabs)
        return

    if not options.with_scheduler and options.schedulers:
        # run schedulers and exit
        try:
            start_schedulers(options)
        except KeyboardInterrupt:
            pass
        return

    if options.with_cron:
        if options.soft_cron:
            print('Using cron software emulation (but this is not very efficient)')
            global_settings.web2py_crontype = 'soft'
        else:
            # start hardcron thread
            logger.debug('Starting hardcron...')
            global_settings.web2py_crontype = 'hard'
            newcron.hardcron(options.folder, apps=options.crontabs).start()

    # if no password provided and have Tk library start GUI (when not
    # explicitly disabled), we also need a GUI to put in taskbar (system tray)
    # when requested
    root = None

    if (not options.no_gui and options.password == '<ask>') or options.taskbar:
        try:
            if PY2:
                import Tkinter as tkinter
            else:
                import tkinter
            root = tkinter.Tk()
        except (ImportError, OSError):
            # Logger.warn() is deprecated in favour of warning()
            logger.warning(
                'GUI not available because Tk library is not installed')
            options.no_gui = True
        except Exception:
            logger.exception('cannot get Tk root window, GUI disabled')
            options.no_gui = True

    if root:
        # run GUI and exit
        root.focus_force()

        # Mac OS X - make the GUI window rise to the top
        if os.path.exists("/usr/bin/osascript"):
            # one AppleScript statement per line (the line breaks of the
            # script were not preserved in this copy of the file)
            applescript = """
tell application "System Events"
    set proc to first process whose unix id is %d
    set frontmost of proc to true
end tell
""" % (os.getpid())
            os.system("/usr/bin/osascript -e '%s'" % applescript)

        # web2pyDialog takes care of schedulers
        master = web2pyDialog(root, options)
        signal.signal(signal.SIGTERM, lambda a, b: master.quit())

        try:
            root.mainloop()
        except BaseException:
            # KeyboardInterrupt included: always shut down cleanly
            master.quit()

        sys.exit()

    spt = None

    if options.with_scheduler and options.schedulers:
        # start schedulers in a separate thread
        spt = threading.Thread(target=start_schedulers, args=(options,))
        spt.start()

    # start server

    if options.password == '<ask>':
        options.password = getpass.getpass('choose a password:')

    if not options.password and not options.no_banner:
        print('no password, no web admin interface')

    # Use first interface IP and port if interfaces specified, since the
    # interfaces option overrides the IP (and related) options.
    if not options.interfaces:
        ip = options.ip
        port = options.port
    else:
        first_if = options.interfaces[0]
        ip = first_if[0]
        port = first_if[1]

    if options.server_key and options.server_cert:
        proto = 'https'
    else:
        proto = 'http'

    url = get_url(ip, proto=proto, port=port)

    if not options.no_banner:
        message = '\nplease visit:\n\t%s\n'
        if sys.platform.startswith('win'):
            message += 'use "taskkill /f /pid %i" to shutdown the web2py server\n\n'
        else:
            message += 'use "kill -SIGTERM %i" to shutdown the web2py server\n\n'
        print(message % (url, os.getpid()))

    # enhance linecache.getline (used by debugger) to look at the source file
    # if the line was not found (under py2exe & when file was modified)
    import linecache
    py2exe_getline = linecache.getline

    def getline(filename, lineno, *args, **kwargs):
        line = py2exe_getline(filename, lineno, *args, **kwargs)
        if not line:
            line = ''
            try:
                with open(filename, "rb") as f:
                    for i, raw in enumerate(f):
                        if lineno == i + 1:
                            # only the requested line needs decoding
                            line = raw.decode('utf-8')
                            break
            except (IOError, OSError):
                line = ''
        return line
    linecache.getline = getline

    server = main.HttpServer(ip=ip,
                             port=port,
                             password=options.password,
                             pid_filename=options.pid_filename,
                             log_filename=options.log_filename,
                             profiler_dir=options.profiler_dir,
                             ssl_certificate=options.server_cert,
                             ssl_private_key=options.server_key,
                             ssl_ca_certificate=options.ca_cert,
                             min_threads=options.min_threads,
                             max_threads=options.max_threads,
                             server_name=options.server_name,
                             request_queue_size=options.request_queue_size,
                             timeout=options.timeout,
                             socket_timeout=options.socket_timeout,
                             shutdown_timeout=options.shutdown_timeout,
                             path=options.folder,
                             interfaces=options.interfaces)

    try:
        server.start()
    except KeyboardInterrupt:
        server.stop()
        if spt is not None:
            try:
                spt.join()
            except Exception:
                logger.exception('error terminating schedulers')
    logging.shutdown()