blob: 79c3599038f583fa383170f724d63b20d5ace355 [file] [log] [blame]
"""
A local HTTP server for Android OTA package generation.
Based on OTA_from_target_files.py
Usage::
python ./web_server.py [<port>]
API::
GET /check : check the status of all jobs
GET /check/<id> : check the status of the job with <id>
GET /file : fetch the target file list
GET /file/<path> : Add build file(s) in <path>, and return the target file list
GET /download/<id> : download the ota package with <id>
POST /run/<id> : submit a job with <id>,
arguments set in a json uploaded together
POST /file/<filename> : upload a target file
[TODO] POST /cancel/<id> : cancel a job with <id>
TODO:
- Avoid unintentionally path leakage
- Avoid overwriting build when uploading build with same file name
Other GET request will be redirected to the static request under 'dist' directory
"""
from http.server import BaseHTTPRequestHandler, SimpleHTTPRequestHandler, HTTPServer
from socketserver import ThreadingMixIn
from threading import Lock
from ota_interface import ProcessesManagement
from target_lib import TargetLib
import logging
import json
import pipes
import cgi
import subprocess
import os
import sys
LOCAL_ADDRESS = '0.0.0.0'
class CORSSimpleHTTPHandler(SimpleHTTPRequestHandler):
def end_headers(self):
try:
origin_address, _ = cgi.parse_header(self.headers['Origin'])
self.send_header('Access-Control-Allow-Credentials', 'true')
self.send_header('Access-Control-Allow-Origin', origin_address)
except TypeError:
pass
super().end_headers()
class RequestHandler(CORSSimpleHTTPHandler):
def _set_response(self, code=200, type='text/html'):
self.send_response(code)
self.send_header('Content-type', type)
self.end_headers()
def do_OPTIONS(self):
self.send_response(200)
self.send_header('Access-Control-Allow-Methods', 'GET, OPTIONS')
self.send_header("Access-Control-Allow-Headers", "X-Requested-With")
self.send_header("Access-Control-Allow-Headers", "Content-Type")
self.end_headers()
def do_GET(self):
if self.path.startswith('/check'):
if self.path == '/check' or self.path == '/check/':
status = jobs.get_status()
self._set_response(type='application/json')
self.wfile.write(
json.dumps(status).encode()
)
else:
id = self.path[7:]
status = jobs.get_status_by_ID(id=id, details=True)
self._set_response(type='application/json')
self.wfile.write(
json.dumps(status).encode()
)
logging.info(
"GET request:\nPath:%s\nHeaders:\n%s\nBody:\n%s\n",
str(self.path), str(self.headers), status
)
return
elif self.path.startswith('/file'):
if self.path == '/file' or self.path == '/file/':
file_list = target_lib.get_builds()
else:
file_list = target_lib.new_build_from_dir(self.path[6:])
builds_info = [build.to_dict() for build in file_list]
self._set_response(type='application/json')
self.wfile.write(
json.dumps(builds_info).encode()
)
logging.info(
"GET request:\nPath:%s\nHeaders:\n%s\nBody:\n%s\n",
str(self.path), str(self.headers), file_list
)
return
elif self.path.startswith('/download'):
self.path = self.path[10:]
return CORSSimpleHTTPHandler.do_GET(self)
else:
self.path = '/dist' + self.path
return CORSSimpleHTTPHandler.do_GET(self)
def do_POST(self):
if self.path.startswith('/run'):
content_type, _ = cgi.parse_header(self.headers['content-type'])
if content_type != 'application/json':
self.send_response(400)
self.end_headers()
return
content_length = int(self.headers['Content-Length'])
post_data = json.loads(self.rfile.read(content_length))
try:
jobs.ota_generate(post_data, id=str(self.path[5:]))
self._set_response(code=201)
self.wfile.write(
"ota generator start running".encode('utf-8'))
except SyntaxError:
self.send_error(400)
logging.info(
"POST request:\nPath:%s\nHeaders:\n%s\nBody:\n%s\n",
str(self.path), str(self.headers),
json.dumps(post_data)
)
elif self.path.startswith('/file'):
file_name = os.path.join('target', self.path[6:])
file_length = int(self.headers['Content-Length'])
with open(file_name, 'wb') as output_file:
# Unwrap the uploaded file first (due to the usage of FormData)
# The wrapper has a boundary line at the top and bottom
# and some file information in the beginning
# There are a file content line, a file name line, and an empty line
# The boundary line in the bottom is 4 bytes longer than the top one
# Please refer to the following links for more details:
# https://stackoverflow.com/questions/8659808/how-does-http-file-upload-work
# https://datatracker.ietf.org/doc/html/rfc1867
upper_boundary = self.rfile.readline()
file_length -= len(upper_boundary) * 2 + 4
file_length -= len(self.rfile.readline())
file_length -= len(self.rfile.readline())
file_length -= len(self.rfile.readline())
output_file.write(self.rfile.read(file_length))
target_lib.new_build(self.path[6:], file_name)
self._set_response(code=201)
self.wfile.write(
"File received, saved into {}".format(
file_name).encode('utf-8')
)
else:
self.send_error(400)
class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
pass
def run_server(SeverClass=ThreadedHTTPServer, HandlerClass=RequestHandler, port=8000):
logging.basicConfig(level=logging.DEBUG)
server_address = (LOCAL_ADDRESS, port)
server_instance = SeverClass(server_address, HandlerClass)
try:
logging.info(
'Server is on, address:\n %s',
'http://' + str(server_address[0]) + ':' + str(port))
server_instance.serve_forever()
except KeyboardInterrupt:
pass
server_instance.server_close()
logging.info('Server has been turned off.')
if __name__ == '__main__':
from sys import argv
print(argv)
if not os.path.isdir('target'):
os.mkdir('target', 755)
if not os.path.isdir('output'):
os.mkdir('output', 755)
target_lib = TargetLib()
jobs = ProcessesManagement()
try:
if len(argv) == 2:
run_server(port=int(argv[1]))
else:
run_server()
except KeyboardInterrupt:
sys.exit(0)