#!/usr/bin/env python # Todo: # # - file uploads should be saved to temp files and passed to the callback. # - Ignore non-existing temp files in upload cleanup. # - How does script_raw check the exitcode? Document this. # - Validate field values properly. # * Integer/float min, max # * Uploaded files mime-types/extensions import sys import optparse import os import stat import json import BaseHTTPServer from BaseHTTPServer import BaseHTTPRequestHandler from SocketServer import ThreadingMixIn import cgi import re import datetime import subprocess html_header = '''

{title}

''' html_footer = '''
''' def cmd(cmd, input=None, env=None): """ Run command `cmd` in a shell. `input` (string) is passed in the process' STDIN. Returns a dictionary: `{'stdout': , 'stderr': , 'exitcode': }`. """ p = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env) stdout, stderr = p.communicate(input) return { 'stdout': stdout, 'stderr': stderr, 'exitcode': p.returncode } class FormDefinition: """ FormDefinition holds information about a single form and provides methods for validation of the form values. """ def __init__(self, name, title, description, fields, script=None, submit_title="Submit"): self.name = name self.title = title self.description = description self.fields = fields self.script = script self.submit_title = submit_title def get_field(self, field_name): for field in self.fields: if field['name'] == field_name: return field def validate(self, form_values): """ Validate all relevant fields for this form against form_values. """ values = {} for field_name in form_values: if field_name == 'form_name': continue v = self.validate_field(field_name, form_values.getfirst(field_name)) if v is not None: values[field_name] = v # Make sure all required fields are there for field in self.fields: if 'required' in field and \ field['required'] is True and \ field['name'] not in values: raise ValueError( "Required field {} not present".format(field['name'])) return values def validate_field(self, field_name, value): """ Validate a field in this form. """ # Find field definition by iterating through all the fields. field_def = self.get_field(field_name) if not field_def: raise KeyError("Unknown field: {}".format(field_name)) field_type = field_def['type'] validate_cb = getattr(self, 'validate_{}'.format(field_type), None) if not validate_cb: return value else: return validate_cb(field_def, value) def validate_integer(self, field_def, value): try: int(value) return value except ValueError: if field_def.get('required', False): raise return None def validate_float(self, field_def, value): try: return float(value) except ValueError: if field_def.get('required', False): raise return None def validate_date(self, field_def, value): m = re.match('([0-9]{4})-([0-9]{2})-([0-9]{2})', value) if m: return value elif field_def.get('required', False): raise ValueError( "Invalid value for date field: {}".format(value)) return None def validate_radio(self, field_def, value): if not value in [o[0] for o in field_def['options']]: raise ValueError( "Invalid value for radio button: {}".format(value)) return value def validate_select(self, field_def, value): if not value in [o[0] for o in field_def['options']]: raise ValueError( "Invalid value for dropdown: {}".format(value)) return value class ThreadedHTTPServer(ThreadingMixIn, BaseHTTPServer.HTTPServer): pass class WebSrv: """ Very basic web server. """ def __init__(self, request_handler, listen_addr='', listen_port=80): httpd = ThreadedHTTPServer((listen_addr, listen_port), request_handler) httpd.serve_forever() class WebAppHandler(BaseHTTPRequestHandler): """ Basic web server request handler. Handles GET and POST requests. This class should be extended with methods (starting with 'h_') to handle the actual requests. If no path is set, it dispatches to the 'index' or 'default' method. """ def do_GET(self): self.call(*self.parse(self.path)) def do_POST(self): form_values = cgi.FieldStorage( fp=self.rfile, headers=self.headers, environ={'REQUEST_METHOD': 'POST'}) self.call(self.path.strip('/'), params={'form_values': form_values}) def parse(self, reqinfo): if '?' in reqinfo: path, params = reqinfo.split('?', 1) params = dict( [p.split('=', 1) for p in params.split('&') if '=' in p] ) return (path.strip('/'), params) else: return (self.path.strip('/'), {}) def call(self, path, params): """ Find a method to call on self.app_class based on `path` and call it. """ response_code = 200 method_name = 'h_{}'.format(path) try: if hasattr(self, method_name) and \ callable(getattr(self, method_name)): out = getattr(self, method_name)(**params) elif path == '' and hasattr(self, 'index'): out = self.index(**params) elif hasattr(self, 'default'): out = self.default(**params) else: response_code = 404 out = 'Not Found' except Exception, e: self.wfile.write(e) raise self.send_response(response_code) self.send_header('Content-type', 'text/html') self.end_headers() self.wfile.write(out) class ScriptFormWebApp(WebAppHandler): """ This class is a request handler for WebSrv. """ def index(self): return self.h_list() def h_list(self): h_form_list = [] for form_name, form_def in self.vitaform.forms.items(): h_form_list.append('''
  • {title}

    {description}

    {title}
  • '''.format(title=form_def.title, description=form_def.description, name=form_name) ) return ''' {header}
    {form_list}
    {footer} '''.format(header=html_header.format(title=self.vitaform.title), footer=html_footer, form_list=''.join(h_form_list)) def h_form(self, form_name): field_tpl = { "string": '', "number": '', "integer": '', "float": '', "date": '', "file": '', "password": '', "text": '', "select": '', "radio": '{}
    ', } def render_field(field): tpl = field_tpl[field['type']] required = '' if field.get('required', None): required='required' if field['type'] == 'string': input = tpl.format(required, field['name']) elif field['type'] == 'number' or \ field['type'] == 'integer' or \ field['type'] == 'float': input = tpl.format(required, field.get('min', ''), field.get('max', ''), field['name']) elif field['type'] == 'date': input = tpl.format(required, field['name']) elif field['type'] == 'file': input = tpl.format(required, field['name']) elif field['type'] == 'password': input = tpl.format(required, field['name']) elif field['type'] == 'radio': input = ''.join( [ tpl.format(field['name'], o[0], o[1]) for o in field['options'] ] ) elif field['type'] == 'text': input = tpl.format(required, field['name']) elif field['type'] == 'select': options = ''.join([ tpl.format(o[0], o[1]) for o in field['options'] ] ) input = ''.format(required, field['name'], options) else: raise ValueError("Unsupported field type: {}".format( field['type']) ) return ('''
  • {title}

    {input}

  • '''.format(title=field['title'], input=input)) form = self.vitaform.get_form(form_name) return ''' {header}

    {title}

    {description}

      {fields}
    {footer} '''.format( header=html_header.format(title=self.vitaform.title), footer=html_footer, title=form.title, description=form.description, name=form.name, fields=''.join([render_field(f) for f in form.fields]), submit_title=form.submit_title ) def h_submit(self, form_values): form_name = form_values.getfirst('form_name', None) # Validate the form values form = self.vitaform.get_form(form_name) values = form.validate(form_values) # Call user's callback if form.script: # Run an external script env = os.environ.copy() env.update(values) res = cmd(form.script, env=env) if res['exitcode'] != 0: result = '{}'.format(res['stderr']) else: result = '
    {}
    '.format(res['stdout']) else: # Run a python callback callback = self.callbacks[form_name] result = callback(values) return ''' {header}

    {title}

    Result

    {result}
    {footer} '''.format( header=html_header.format(title=self.vitaform.title), footer=html_footer, title=form.title, form_name=form.name, result=result, ) class ScriptForm: """ 'Main' class that orchestrates parsing the Form definition file `config_file`, hooking up callbacks and running the webserver. """ def __init__(self, config_file, callbacks={}): self.forms = {} self.callbacks = {} self.title = 'ScriptForm Actions' self.basepath = os.path.realpath(os.path.dirname(config_file)) self._load_config(config_file) for form_name, cb in callbacks.items(): self.callbacks[form_name] = cb # Validate scripts for form_name, form_def in self.forms.items(): if form_def.script: if not stat.S_IXUSR & os.stat(form_def.script)[stat.ST_MODE]: raise Exception("{} is not executable".format(form_def.script)) else: if not form_name in self.callbacks: raise Exception("No script or callback registered for '{}'".format(form_name)) def _load_config(self, path): config = json.load(file(path, 'r')) if 'title' in config: self.title = config['title'] for form_name, form in config['forms'].items(): if 'script' in form: script = os.path.join(self.basepath, form['script']) else: script = None self.forms[form_name] = \ FormDefinition(form_name, form['title'], form['description'], form['fields'], script, submit_title=form.get('submit_title', None)) def get_form(self, form_name): return self.forms[form_name] def run(self, listen_addr='0.0.0.0', listen_port=80): ScriptFormWebApp.vitaform = self ScriptFormWebApp.callbacks = self.callbacks WebSrv(ScriptFormWebApp, listen_addr=listen_addr, listen_port=listen_port) if __name__ == "__main__": parser = optparse.OptionParser() parser.set_usage(sys.argv[0] + " [option] ") parser.add_option("-p", "--port", dest="port", action="store", type="int", default=80, help="Port to listen on.") (options, args) = parser.parse_args() if len(args) < 1: parser.error("Insufficient number of arguments") sf = ScriptForm(args[0]) sf.run(listen_port=options.port)