#!/usr/bin/env python
import sys
import optparse
import os
import stat
import json
import BaseHTTPServer
from BaseHTTPServer import BaseHTTPRequestHandler
from SocketServer import ThreadingMixIn
import cgi
import re
import datetime
import subprocess
html_header = '''
{title}
'''
html_footer = '''
'''
def cmd(cmd, input=None, env=None):
"""
Run command `cmd` in a shell. `input` (string) is passed in the
process' STDIN.
Returns a dictionary: `{'stdout': , 'stderr': , 'exitcode':
}`.
"""
p = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
env=env)
stdout, stderr = p.communicate(input)
return {
'stdout': stdout,
'stderr': stderr,
'exitcode': p.returncode
}
class FormDefinition:
"""
FormDefinition holds information about a single form and provides methods
for validation of the form values.
"""
def __init__(self, name, title, description, fields, script=None,
submit_title="Submit"):
self.name = name
self.title = title
self.description = description
self.fields = fields
self.script = script
self.submit_title = submit_title
def get_field(self, field_name):
for field in self.fields:
if field['name'] == field_name:
return field
def validate(self, form_values):
"""
Validate all relevant fields for this form against form_values.
"""
values = {}
for field_name in form_values:
if field_name == 'form_name':
continue
v = self.validate_field(field_name,
form_values.getfirst(field_name))
if v is not None:
values[field_name] = v
# Make sure all required fields are there
for field in self.fields:
if 'required' in field and \
field['required'] is True and \
field['name'] not in values:
raise ValueError(
"Required field {} not present".format(field['name']))
return values
def validate_field(self, field_name, value):
"""
Validate a field in this form.
"""
# Find field definition by iterating through all the fields.
field_def = self.get_field(field_name)
if not field_def:
raise KeyError("Unknown field: {}".format(field_name))
field_type = field_def['type']
validate_cb = getattr(self, 'validate_{}'.format(field_type), None)
if not validate_cb:
return value
else:
return validate_cb(field_def, value)
def validate_integer(self, field_def, value):
try:
int(value)
return value
except ValueError:
if field_def.get('required', False):
raise
return None
def validate_float(self, field_def, value):
try:
return float(value)
except ValueError:
if field_def.get('required', False):
raise
return None
def validate_date(self, field_def, value):
m = re.match('([0-9]{4})-([0-9]{2})-([0-9]{2})', value)
if m:
return value
#g = m.groups()
#return datetime.date(int(g[0]), int(g[1]), int(g[2]))
elif field_def.get('required', False):
raise ValueError(
"Invalid value for date field: {}".format(value))
return None
def validate_radio(self, field_def, value):
if not value in [o[0] for o in field_def['options']]:
raise ValueError(
"Invalid value for radio button: {}".format(value))
return value
def validate_select(self, field_def, value):
if not value in [o[0] for o in field_def['options']]:
raise ValueError(
"Invalid value for dropdown: {}".format(value))
return value
class ThreadedHTTPServer(ThreadingMixIn, BaseHTTPServer.HTTPServer):
pass
class WebSrv:
"""
Very basic web server.
"""
def __init__(self, request_handler, listen_addr='', listen_port=80):
httpd = ThreadedHTTPServer((listen_addr, listen_port), request_handler)
httpd.serve_forever()
class WebAppHandler(BaseHTTPRequestHandler):
"""
Basic web server request handler. Handles GET and POST requests. This class
should be extended with methods (starting with 'h_') to handle the actual
requests. If no path is set, it dispatches to the 'index' or 'default'
method.
"""
def do_GET(self):
self.call(*self.parse(self.path))
def do_POST(self):
form_values = cgi.FieldStorage(
fp=self.rfile,
headers=self.headers,
environ={'REQUEST_METHOD': 'POST'})
self.call(self.path.strip('/'), params={'form_values': form_values})
def parse(self, reqinfo):
if '?' in reqinfo:
path, params = reqinfo.split('?', 1)
params = dict(
[p.split('=', 1) for p in params.split('&') if '=' in p]
)
return (path.strip('/'), params)
else:
return (self.path.strip('/'), {})
def call(self, path, params):
"""
Find a method to call on self.app_class based on `path` and call it.
"""
response_code = 200
method_name = 'h_{}'.format(path)
try:
if hasattr(self, method_name) and \
callable(getattr(self, method_name)):
out = getattr(self, method_name)(**params)
elif path == '' and hasattr(self, 'index'):
out = self.index(**params)
elif hasattr(self, 'default'):
out = self.default(**params)
else:
response_code = 404
out = 'Not Found'
except Exception, e:
self.wfile.write(e)
raise
self.send_response(response_code)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write(out)
class ScriptFormWebApp(WebAppHandler):
"""
This class is a request handler for WebSrv.
"""
def index(self):
return self.h_list()
def h_list(self):
h_form_list = []
for form_name, form_def in self.vitaform.forms.items():
h_form_list.append('''
'''.format(title=field['title'],
input=input))
form = self.vitaform.get_form(form_name)
return '''
{header}
{title}
{description}
{footer}
'''.format(
header=html_header.format(title=self.vitaform.title),
footer=html_footer,
title=form.title,
description=form.description,
name=form.name,
fields=''.join([render_field(f) for f in form.fields]),
submit_title=form.submit_title
)
def h_submit(self, form_values):
form_name = form_values.getfirst('form_name', None)
# Validate the form values
form = self.vitaform.get_form(form_name)
values = form.validate(form_values)
# Call user's callback
if form.script:
# Run an external script
env = os.environ.copy()
env.update(values)
res = cmd(form.script, env=env)
if res['exitcode'] != 0:
result = '{}'.format(res['stderr'])
else:
result = '
{}
'.format(res['stdout'])
else:
# Run a python callback
callback = self.callbacks[form_name]
result = callback(values)
return '''
{header}