1. WSGI简介
什么是WSGI?
WSGI(Web Server Gateway Interface,Web服务器网关接口)是Python Web应用程序和Web服务器之间的标准接口。它在2003年由PEP 333提出,2010年更新为PEP 3333(添加了Python 3支持)。
WSGI的核心目标
- 标准化:统一Python Web框架和Web服务器之间的接口
- 可移植性:应用程序可以在任何兼容的服务器上运行
- 互操作性:不同框架和服务器可以互相组合
- 简单性:接口设计简单明了
为什么需要WSGI?
在WSGI之前,每个Python Web框架(如Zope、Django)都需要特定的Web服务器或适配器,造成了生态系统碎片化。WSGI通过定义清晰的契约,让框架开发者专注业务逻辑,服务器开发者专注性能与稳定性,中间件开发者专注横切关注点——这种分层解耦正是现代后端架构演进的关键范式。
2. WSGI规范详解
WSGI接口定义
WSGI规范定义了三个核心组件:
- 应用程序(Application):可调用对象(函数、类等)
- 服务器/网关(Server/Gateway):接收HTTP请求,调用应用程序
- 中间件(Middleware):同时扮演服务器和应用程序的角色
应用程序接口
最基础的WSGI应用程序是一个接受两个参数、返回可迭代响应体的函数:
def simple_app(environ, start_response):
    """Smallest possible WSGI application.

    Args:
        environ: Dict carrying the request information.
        start_response: Callback used to begin the HTTP response.

    Returns:
        An iterable of byte strings that make up the response body.
    """
    start_response('200 OK', [('Content-Type', 'text/plain; charset=utf-8')])
    return [b'Hello, WSGI World!\n']
environ字典详解
environ 是一个由服务器注入的字典,包含完整的请求上下文。其键分为三类:CGI标准变量、WSGI必需变量、HTTP头部(自动添加 HTTP_ 前缀):
# Important keys found in the environ dict. Illustrative only: input_stream
# and error_stream are not defined in this snippet, and True/False is a
# placeholder meaning "either True or False".
environ = {
# Required request variables
'REQUEST_METHOD': 'GET', # HTTP method
'SCRIPT_NAME': '', # application root path
'PATH_INFO': '/path/to/resource', # request path
'QUERY_STRING': 'param=value', # query string
'SERVER_NAME': 'localhost', # server name
'SERVER_PORT': '8080', # server port
# Variables required by WSGI
'wsgi.version': (1, 0), # WSGI version
'wsgi.url_scheme': 'http', # URL scheme
'wsgi.input': input_stream, # input stream
'wsgi.errors': error_stream, # error stream
'wsgi.multithread': True/False, # whether multi-threaded
'wsgi.multiprocess': True/False, # whether multi-process
'wsgi.run_once': True/False, # whether run only once
# HTTP headers (with the HTTP_ prefix added)
'HTTP_HOST': 'localhost:8080',
'HTTP_USER_AGENT': 'Mozilla/5.0',
'HTTP_ACCEPT': 'text/html',
# Other standard CGI variables
'SERVER_PROTOCOL': 'HTTP/1.1',
'CONTENT_TYPE': 'application/x-www-form-urlencoded',
'CONTENT_LENGTH': '123',
}
start_response函数
start_response 是服务器提供的回调函数,用于声明响应状态与头部:
def start_response(status, response_headers, exc_info=None):
    """Callback a server passes to the application to begin the HTTP response.

    Args:
        status: Status string such as '200 OK'.
        response_headers: List of header tuples, e.g.
            [('Content-Type', 'text/html')].
        exc_info: Optional exception-info tuple.

    Returns:
        Optionally a write() callable; this stub returns None.
    """
    # A real server must record the status and headers here, and may hand
    # back a write() callable for pushing response-body data.
    return None
3. 实现WSGI服务器
基本WSGI服务器实现
以下是一个基于 socket 的轻量级WSGI服务器实现,具备多线程处理能力,可用于教学与调试:
# simple_wsgi_server.py
import socket
import threading
import io
import sys
from urllib.parse import parse_qs, urlparse
from datetime import datetime
class SimpleWSGIServer:
    """Thread-per-connection WSGI server on a raw socket (teaching/debugging only).

    One request is served per connection and the whole response is buffered
    before it is sent, so keep-alive and streaming are not supported.
    """

    # Upper bound on request line + headers; larger heads are rejected.
    MAX_HEAD_BYTES = 65536

    def __init__(self, host='localhost', port=8080):
        self.host = host
        self.port = port
        self.app = None
        self.server_socket = None
        self.running = False

    def set_app(self, app):
        """Register the WSGI application that will handle requests."""
        self.app = app

    def serve_forever(self):
        """Bind, listen and hand each accepted connection to a daemon thread.

        Runs until ``self.running`` is cleared or Ctrl-C is pressed. The
        listening socket is closed on every exit path, including a failed
        bind.
        """
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.server_socket.bind((self.host, self.port))
            self.server_socket.listen(5)
            self.running = True
            print(f"WSGI服务器启动在 http://{self.host}:{self.port}")
            while self.running:
                client_socket, address = self.server_socket.accept()
                print(f"[{datetime.now()}] 客户端连接: {address}")
                thread = threading.Thread(
                    target=self.handle_request,
                    args=(client_socket,),
                )
                thread.daemon = True
                thread.start()
        except KeyboardInterrupt:
            print("\n服务器关闭中...")
        finally:
            self.running = False
            self.server_socket.close()

    def handle_request(self, client_socket):
        """Read one HTTP request, run the WSGI app and send the buffered response.

        Malformed requests get a 400, a missing app a 404 and any unexpected
        failure a 500. The client socket is always closed afterwards.
        """
        try:
            head, body_start = self._read_head(client_socket)
            if not head:
                return

            # PEP 3333 treats header text as ISO-8859-1; decoding only the
            # head also keeps a binary body from breaking the parse.
            request_lines = head.decode('iso-8859-1').split('\r\n')
            parts = request_lines[0].split()
            if len(parts) != 3:
                self._send_plain(client_socket, '400 Bad Request', 'Malformed request line')
                return
            method, path, _version = parts
            parsed_path = urlparse(path)

            headers = {}
            for line in request_lines[1:]:
                name, sep, value = line.partition(':')
                if sep:
                    headers[name.strip().lower()] = value.strip()

            try:
                content_length = int(headers.get('content-length') or 0)
            except ValueError:
                content_length = -1
            if content_length < 0:
                self._send_plain(client_socket, '400 Bad Request', 'Invalid Content-Length')
                return
            body = self._read_body(client_socket, body_start, content_length)

            if not self.app:
                client_socket.sendall(b'HTTP/1.1 404 Not Found\r\n\r\nNo WSGI app configured')
                return

            environ = self.make_environ(
                method, parsed_path.path, parsed_path.query,
                headers, body, client_socket
            )

            response_status = [None]
            response_headers = []
            body_chunks = []

            def start_response(status, headers, exc_info=None):
                """WSGI start_response callback for this buffered server."""
                if exc_info:
                    # Nothing has been sent yet (the response is buffered), so
                    # per PEP 3333 an error response simply replaces the
                    # earlier status and headers.
                    exc_info = None  # drop the traceback reference
                elif response_status[0] is not None:
                    raise AssertionError('start_response() called twice without exc_info')
                response_status[0] = status
                response_headers[:] = headers
                # write() data is buffered in arrival order with the body.
                return body_chunks.append

            app_iter = self.app(environ, start_response)
            try:
                # Drain the iterable before building the head: generator
                # apps only call start_response() once iteration begins.
                for chunk in app_iter:
                    body_chunks.append(chunk)
            finally:
                close = getattr(app_iter, 'close', None)
                if close is not None:
                    close()

            if response_status[0] is None:
                raise RuntimeError('application never called start_response()')

            client_socket.sendall(
                self.build_response(response_status[0], response_headers, body_chunks)
            )
        except Exception as e:
            print(f"处理请求错误: {e}")
            error_response = (
                b'HTTP/1.1 500 Internal Server Error\r\n'
                b'Content-Type: text/plain\r\n'
                b'\r\n'
            ) + f'Server Error: {e}'.encode()
            try:
                client_socket.sendall(error_response)
            except OSError:
                # Best effort only: the client has already gone away.
                pass
        finally:
            client_socket.close()

    def _read_head(self, client_socket):
        """Receive until the blank line that ends the headers.

        Returns ``(head, rest)``: the raw request line plus headers, and any
        body bytes that arrived in the same reads. ``head`` is empty when the
        peer closed without sending data.
        """
        buffer = b''
        while b'\r\n\r\n' not in buffer:
            if len(buffer) > self.MAX_HEAD_BYTES:
                raise ValueError('request head too large')
            data = client_socket.recv(4096)
            if not data:
                break
            buffer += data
        head, _, rest = buffer.partition(b'\r\n\r\n')
        return head, rest

    def _read_body(self, client_socket, initial, content_length):
        """Return exactly ``content_length`` body bytes (fewer if the peer closes)."""
        body = bytearray(initial[:content_length])
        while len(body) < content_length:
            data = client_socket.recv(min(65536, content_length - len(body)))
            if not data:
                break
            body += data
        return bytes(body)

    def _send_plain(self, client_socket, status, message):
        """Send a minimal text/plain response with the given status line."""
        client_socket.sendall(
            f'HTTP/1.1 {status}\r\nContent-Type: text/plain\r\n\r\n{message}'.encode()
        )

    def make_environ(self, method, path_info, query_string, headers, body, client_socket):
        """Build the WSGI environ dict for one request.

        ``headers`` is expected to map lower-cased header names to values.
        """
        client_address = client_socket.getpeername()
        environ = {
            # CGI-style variables
            'REQUEST_METHOD': method,
            'SCRIPT_NAME': '',  # application mount point
            'PATH_INFO': path_info,
            'QUERY_STRING': query_string,
            'CONTENT_TYPE': headers.get('content-type', ''),
            'CONTENT_LENGTH': str(len(body)),
            'SERVER_NAME': self.host,
            'SERVER_PORT': str(self.port),
            'SERVER_PROTOCOL': 'HTTP/1.1',
            # Variables required by WSGI
            'wsgi.version': (1, 0),
            'wsgi.url_scheme': 'http',
            'wsgi.input': io.BytesIO(body),
            'wsgi.errors': sys.stderr,
            'wsgi.multithread': True,
            'wsgi.multiprocess': False,
            'wsgi.run_once': False,
            # Client information
            'REMOTE_ADDR': client_address[0],
            'REMOTE_PORT': str(client_address[1]),
        }
        for key, value in headers.items():
            # Only these two have dedicated CGI keys; other Content-*
            # headers (e.g. Content-Encoding) must still be exposed.
            if key in ('content-type', 'content-length'):
                continue
            environ['HTTP_' + key.upper().replace('-', '_')] = value
        return environ

    def build_response(self, status, headers, app_iter):
        """Serialize status line, headers and body chunks into one bytes object.

        ``str`` chunks are UTF-8 encoded; ``app_iter.close()`` is called when
        present, even if iteration fails.
        """
        response = [f'HTTP/1.1 {status}\r\n'.encode('iso-8859-1')]
        response.extend(
            f'{key}: {value}\r\n'.encode('iso-8859-1') for key, value in headers
        )
        response.append(b'\r\n')
        try:
            for chunk in app_iter:
                if isinstance(chunk, str):
                    chunk = chunk.encode('utf-8')
                response.append(chunk)
        finally:
            close = getattr(app_iter, 'close', None)
            if close is not None:
                close()
        return b''.join(response)
# 测试应用程序
def test_app(environ, start_response):
"""测试WSGI应用"""
# 解析查询参数
query_string = environ.get('QUERY_STRING', '')
params = parse_qs(query_string)
# 获取路径
path = environ.get('PATH_INFO', '/')
# 构建响应
status = '200 OK'
headers = [
('Content-Type', 'text/html; charset=utf-8'),
('Server', 'SimpleWSGIServer/1.0')
]
start_response(status, headers)
# 响应体
body = f"""
<!DOCTYPE html>
<html>
<head><title>WSGI Test</title></head>
<body>
<h1>WSGI Test Application</h1>
<p>Path: {path}</p>
<p>Method: {environ['REQUEST_METHOD']}</p>
<p>Query Parameters: {params}</p>
<p>Time: {datetime.now()}</p>
<p>WSGI Version: {environ['wsgi.version']}</p>
</body>
</html>
"""
return [body.encode('utf-8')]
def run_simple_server():
    """Serve ``test_app`` with SimpleWSGIServer on port 8080 until interrupted."""
    wsgi_server = SimpleWSGIServer(port=8080)
    wsgi_server.set_app(test_app)
    wsgi_server.serve_forever()


if __name__ == '__main__':
    run_simple_server()
4. 实现WSGI应用程序
类实现的WSGI应用
相比函数式,类封装更利于路由管理与状态维护:
# wsgi_applications.py
class WSGIApplication:
    """Class-based WSGI application with exact-match (path, methods) routing.

    Handlers receive ``environ`` and return a dict with optional 'status',
    'headers' and 'body' (a ``str``) entries.
    """

    def __init__(self):
        self.routes = {}
        self.middleware = []

    def route(self, path, methods=None):
        """Return a decorator registering a handler for ``path`` and ``methods``.

        ``methods`` defaults to ['GET'].
        """
        if methods is None:
            methods = ['GET']

        def decorator(handler):
            self.routes[(path, tuple(methods))] = handler
            return handler

        return decorator

    def __call__(self, environ, start_response):
        """WSGI entry point: dispatch to the matching handler or answer 404.

        The response is fully prepared before ``start_response`` is called,
        so it is invoked exactly once even when the handler result is bad.
        """
        import html  # local import: used only to escape the reflected path

        method = environ['REQUEST_METHOD']
        path = environ['PATH_INFO']

        handler = next(
            (
                route_handler
                for (route_path, route_methods), route_handler in self.routes.items()
                if path == route_path and method in route_methods
            ),
            None,
        )

        if handler is None:
            status = '404 Not Found'
            headers = [('Content-Type', 'text/html')]
            # The path is client-controlled; escape it before echoing.
            payload = f'<h1>404 Not Found</h1><p>Path: {html.escape(path)}</p>'.encode('utf-8')
        else:
            try:
                response = handler(environ)
                status = response.get('status', '200 OK')
                headers = response.get('headers', [])
                payload = response.get('body', '').encode('utf-8')
            except Exception as e:
                status = '500 Internal Server Error'
                headers = [('Content-Type', 'text/plain')]
                payload = f'Server Error: {e}'.encode('utf-8')

        start_response(status, headers)
        return [payload]
# Usage example
def create_sample_app():
    """Build a WSGIApplication serving '/', '/about' and '/api/hello'.

    Returns:
        The configured WSGIApplication instance.
    """
    # Local import so the JSON handler works even where the module does not
    # import datetime at the top.
    from datetime import datetime

    app = WSGIApplication()

    @app.route('/', methods=['GET'])
    def home(environ):
        return {
            'status': '200 OK',
            'headers': [('Content-Type', 'text/html')],
            'body': '''
<html>
<head><title>Home</title></head>
<body>
<h1>Welcome!</h1>
<p><a href="/about">About</a></p>
<p><a href="/api/hello">API</a></p>
</body>
</html>
'''
        }

    @app.route('/about', methods=['GET'])
    def about(environ):
        return {
            'status': '200 OK',
            'headers': [('Content-Type', 'text/html')],
            'body': '''
<html>
<head><title>About</title></head>
<body>
<h1>About This App</h1>
<p>This is a simple WSGI application.</p>
<p><a href="/">Home</a></p>
</body>
</html>
'''
        }

    @app.route('/api/hello', methods=['GET'])
    def api_hello(environ):
        import json
        data = {
            'message': 'Hello, World!',
            'timestamp': datetime.now().isoformat(),
            'path': environ['PATH_INFO']
        }
        return {
            'status': '200 OK',
            'headers': [('Content-Type', 'application/json')],
            'body': json.dumps(data, indent=2)
        }

    return app
# Run the application
if __name__ == '__main__':
    from simple_wsgi_server import SimpleWSGIServer

    server = SimpleWSGIServer(port=8080)
    app = create_sample_app()
    server.set_app(app)
    server.serve_forever()
更完整的框架式实现
下面是一个支持正则路由、中间件、错误处理器的迷你框架,已具备生产可用雏形:
# mini_framework.py
import json
import re
from urllib.parse import parse_qs, urlparse
class Request:
    """Convenience wrapper around a WSGI environ dict.

    Attributes set on construction: ``environ``, ``method``, ``path``,
    ``query`` (parsed query string), ``headers`` and ``body`` (raw bytes).
    """

    def __init__(self, environ):
        self.environ = environ
        self.method = environ['REQUEST_METHOD']
        self.path = environ['PATH_INFO']
        self.query = parse_qs(environ.get('QUERY_STRING', ''))
        self.headers = self._extract_headers()
        self.body = self._read_body()

    def _extract_headers(self):
        """Return request headers keyed by Title-Case name."""
        headers = {
            key[5:].replace('_', '-').title(): value
            for key, value in self.environ.items()
            if key.startswith('HTTP_')
        }
        # WSGI stores these two without the HTTP_ prefix.
        for key in ('CONTENT_TYPE', 'CONTENT_LENGTH'):
            value = self.environ.get(key)
            if value:
                headers[key.replace('_', '-').title()] = value
        return headers

    def _read_body(self):
        """Read CONTENT_LENGTH bytes from wsgi.input; b'' when absent or invalid."""
        try:
            # CONTENT_LENGTH may be missing or an empty string.
            content_length = int(self.environ.get('CONTENT_LENGTH') or 0)
        except (TypeError, ValueError):
            content_length = 0
        if content_length > 0:
            return self.environ['wsgi.input'].read(content_length)
        return b''

    @property
    def json(self):
        """Body parsed as JSON, or None when empty, undecodable or invalid."""
        if not self.body:
            return None
        try:
            return json.loads(self.body.decode('utf-8'))
        except ValueError:
            # Covers both UnicodeDecodeError and json.JSONDecodeError.
            return None

    @property
    def form(self):
        """Parsed form fields for urlencoded bodies, otherwise an empty dict."""
        content_type = self.environ.get('CONTENT_TYPE', '')
        if 'application/x-www-form-urlencoded' in content_type:
            return parse_qs(self.body.decode('utf-8'))
        return {}
class Response:
    """Simple HTTP response container convertible to WSGI primitives.

    Args:
        body: Response body as ``str`` (UTF-8 encoded on output) or bytes.
        status: Integer status code, or a ready-made status string.
        content_type: Media type used for the default Content-Type header.
    """

    def __init__(self, body='', status=200, content_type='text/plain'):
        self.body = body
        self.status = status
        self.content_type = content_type
        self.headers = []

    def set_header(self, key, value):
        """Append a response header."""
        self.headers.append((key, value))

    def as_wsgi(self):
        """Return ``(status_line, headers, body_iterable)`` for a WSGI server.

        Any standard status code is mapped to its reason phrase; unknown
        codes keep their number instead of silently becoming '200 OK'.
        Headers set via ``set_header`` replace the default header of the
        same name, so Content-Type is never emitted twice.
        """
        from http import HTTPStatus

        if isinstance(self.status, str):
            status_text = self.status
        else:
            try:
                status_text = f'{int(self.status)} {HTTPStatus(self.status).phrase}'
            except (TypeError, ValueError):
                status_text = f'{self.status} Unknown'

        if isinstance(self.body, str):
            payload = self.body.encode('utf-8')
        else:
            payload = bytes(self.body or b'')

        default_headers = [
            ('Content-Type', f'{self.content_type}; charset=utf-8'),
            ('Content-Length', str(len(payload))),
        ]
        overridden = {key.lower() for key, _ in self.headers}
        all_headers = [
            header for header in default_headers
            if header[0].lower() not in overridden
        ] + self.headers

        return status_text, all_headers, [payload] if payload else []
class MiniFramework:
    """Mini web framework: regex routes, request middleware and error handlers."""

    def __init__(self):
        self.routes = []
        self.error_handlers = {}
        self.middleware = []

    def route(self, pattern, methods=None):
        """Return a decorator registering a handler for a regex ``pattern``.

        ``methods`` defaults to ['GET']; named groups of the pattern are
        passed to the handler as keyword arguments.
        """
        if methods is None:
            methods = ['GET']

        def decorator(handler):
            self.routes.append((re.compile(pattern), methods, handler))
            return handler

        return decorator

    def errorhandler(self, code):
        """Return a decorator registering a handler for HTTP status ``code``."""
        def decorator(handler):
            self.error_handlers[code] = handler
            return handler

        return decorator

    def add_middleware(self, middleware):
        """Append a callable taking the request; a truthy result short-circuits."""
        self.middleware.append(middleware)

    def __call__(self, environ, start_response):
        """WSGI entry point.

        The response is fully prepared first, so ``start_response`` is called
        exactly once, including on the error path.
        """
        request = None
        try:
            request = Request(environ)
            status_text, headers, body = self._dispatch(request).as_wsgi()
        except Exception as e:
            print(f"Unhandled exception: {e}")
            status_text, headers, body = self._error_response(request, e).as_wsgi()
        start_response(status_text, headers)
        return body

    def _dispatch(self, request):
        """Run middleware, then the first matching route, else the 404 path."""
        for middleware in self.middleware:
            result = middleware(request)
            if result:
                return result

        for pattern, methods, handler in self.routes:
            if request.method not in methods:
                continue
            match = pattern.match(request.path)
            if match:
                return self._coerce(handler(request, **match.groupdict()))

        if 404 in self.error_handlers:
            return self.error_handlers[404](request)
        return Response('404 Not Found', 404)

    def _coerce(self, response):
        """Turn a handler result (Response, tuple or body) into a Response."""
        if isinstance(response, Response):
            return response
        if not isinstance(response, tuple):
            return Response(response)
        if len(response) == 2:
            body, status = response
            headers = []
        elif len(response) == 3:
            body, status, headers = response
        else:
            raise ValueError("响应元组格式错误")
        response_obj = Response(body, status)
        for key, value in headers:
            response_obj.set_header(key, value)
        return response_obj

    def _error_response(self, request, error):
        """Build the 500 response, falling back to the default if the handler fails.

        ``request`` is None when building the Request itself failed; it is
        not rebuilt here because that could raise the same error again.
        """
        handler = self.error_handlers.get(500)
        if handler is not None:
            try:
                return handler(request)
            except Exception as handler_error:
                print(f"Unhandled exception: {handler_error}")
        return Response(f'500 Internal Server Error: {error}', 500)
# Usage example
def create_mini_app():
    """Assemble the demo MiniFramework app: HTML pages, a JSON API, error pages.

    HTML handlers return ``Response(..., 'text/html')`` so pages are not
    served with the framework's default text/plain type, and request-derived
    values are HTML-escaped before being embedded.

    Returns:
        The configured MiniFramework instance.
    """
    import html
    from datetime import datetime

    app = MiniFramework()

    def html_page(markup, status=200):
        """Wrap markup in a text/html Response."""
        return Response(markup, status, 'text/html')

    @app.route(r'^/$')
    def home(request):
        return html_page('''
<html>
<head><title>Home</title></head>
<body>
<h1>Mini Framework</h1>
<ul>
<li><a href="/hello">Hello</a></li>
<li><a href="/user/123">User 123</a></li>
<li><a href="/api/data">API Data</a></li>
<li><a href="/about">About</a></li>
</ul>
</body>
</html>
''')

    @app.route(r'^/hello$')
    def hello(request):
        # The name comes straight from the query string; escape it.
        name = html.escape(request.query.get('name', ['World'])[0])
        return html_page(f'''
<html>
<head><title>Hello</title></head>
<body>
<h1>Hello, {name}!</h1>
<form method="GET">
<input type="text" name="name" placeholder="Your name">
<input type="submit" value="Greet">
</form>
<p><a href="/">Home</a></p>
</body>
</html>
''')

    @app.route(r'^/user/(?P<user_id>\d+)$')
    def user_profile(request, user_id):
        # user_id is digits only (enforced by the route pattern).
        return html_page(f'''
<html>
<head><title>User {user_id}</title></head>
<body>
<h1>User Profile</h1>
<p>User ID: {user_id}</p>
<p><a href="/">Home</a></p>
</body>
</html>
''')

    @app.route(r'^/api/data$', methods=['GET', 'POST'])
    def api_data(request):
        if request.method == 'POST':
            payload = {
                'status': 'success',
                'message': 'Data received',
                'data': request.json or request.form
            }
        else:
            payload = {
                'status': 'success',
                'data': [1, 2, 3, 4, 5],
                'timestamp': datetime.now().isoformat()
            }
        # Setting the type on the Response avoids a second Content-Type header.
        return Response(json.dumps(payload, indent=2), 200, 'application/json')

    @app.route(r'^/about$')
    def about(request):
        return html_page('''
<html>
<head><title>About</title></head>
<body>
<h1>About This Framework</h1>
<p>This is a mini WSGI-based web framework.</p>
<p><a href="/">Home</a></p>
</body>
</html>
''')

    # Error handling
    @app.errorhandler(404)
    def not_found(request):
        safe_path = html.escape(request.path)
        return html_page(f'''
<html>
<head><title>404 Not Found</title></head>
<body>
<h1>404 Not Found</h1>
<p>The requested path "{safe_path}" was not found.</p>
<p><a href="/">Home</a></p>
</body>
</html>
''', 404)

    @app.errorhandler(500)
    def server_error(request):
        return html_page('''
<html>
<head><title>500 Server Error</title></head>
<body>
<h1>500 Internal Server Error</h1>
<p>Something went wrong on our end.</p>
<p><a href="/">Home</a></p>
</body>
</html>
''', 500)

    return app
# Run the framework
if __name__ == '__main__':
    from simple_wsgi_server import SimpleWSGIServer

    server = SimpleWSGIServer(port=8080)
    app = create_mini_app()
    server.set_app(app)
    server.serve_forever()
5. WSGI中间件实现
中间件是WSGI生态的灵魂,它遵循“洋葱模型”,层层包裹请求与响应。
中间件基类
所有中间件都继承自统一基类,确保行为一致性:
# wsgi_middleware.py
class WSGIMiddleware:
    """Base class for WSGI middleware that wraps an inner application."""

    def __init__(self, app):
        # The wrapped WSGI callable that subclasses delegate to.
        self.app = app

    def __call__(self, environ, start_response):
        """WSGI entry point; subclasses must override it."""
        raise NotImplementedError()
class LoggingMiddleware(WSGIMiddleware):
    """Middleware that prints a line when a request starts and when its response begins."""

    def __call__(self, environ, start_response):
        """Log the request, then delegate to the wrapped app.

        The second log line is printed when the app calls ``start_response``,
        so the reported duration covers time up to that call.
        """
        # Local import so the middleware works even where the module does
        # not import datetime at the top.
        from datetime import datetime

        start_time = datetime.now()
        method = environ['REQUEST_METHOD']
        path = environ['PATH_INFO']
        print(f"[{start_time}] {method} {path} - Request started")

        def custom_start_response(status, headers, exc_info=None):
            # Forward first so a failing start_response is not logged as sent.
            result = start_response(status, headers, exc_info)
            end_time = datetime.now()
            duration = (end_time - start_time).total_seconds()
            print(f"[{end_time}] {method} {path} - {status} ({duration:.3f}s)")
            return result

        return self.app(environ, custom_start_response)
class AuthenticationMiddleware(WSGIMiddleware):
    """Middleware requiring a known X-API-Key header for paths under /api/."""

    def __init__(self, app, api_keys=None):
        super().__init__(app)
        self.api_keys = api_keys or {}

    def __call__(self, environ, start_response):
        """Pass the request through, or answer 401 for unauthenticated API calls."""
        # Only paths under /api/ are protected.
        if not environ['PATH_INFO'].startswith('/api/'):
            return self.app(environ, start_response)

        supplied_key = environ.get('HTTP_X_API_KEY', '')
        if supplied_key and supplied_key in self.api_keys:
            return self.app(environ, start_response)

        # Missing or unknown key: reject.
        start_response('401 Unauthorized', [('Content-Type', 'text/plain')])
        return [b'Authentication required']
class CORSMiddleware(WSGIMiddleware):
    """Middleware adding CORS headers and answering preflight requests.

    Args:
        app: Wrapped WSGI application.
        allow_origins: Allowed origins; ['*'] (the default) allows any.
        allow_methods: Methods advertised on preflight responses.
        allow_headers: Request headers advertised on preflight responses.
    """

    def __init__(self, app, allow_origins=None, allow_methods=None, allow_headers=None):
        super().__init__(app)
        self.allow_origins = allow_origins or ['*']
        self.allow_methods = allow_methods or ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS']
        self.allow_headers = allow_headers or ['Content-Type', 'Authorization']

    def _origin_headers(self, environ):
        """Return the origin-related CORS headers for this request.

        Access-Control-Allow-Origin carries a single value: '*' or the
        request's own Origin when it is on the allow list. Credentials are
        only advertised for an explicit origin, never together with '*'.
        """
        if '*' in self.allow_origins:
            return [('Access-Control-Allow-Origin', '*')]
        # The answer depends on the Origin header, so caches must key on it.
        headers = [('Vary', 'Origin')]
        origin = environ.get('HTTP_ORIGIN')
        if origin and origin in self.allow_origins:
            headers.append(('Access-Control-Allow-Origin', origin))
            headers.append(('Access-Control-Allow-Credentials', 'true'))
        return headers

    def __call__(self, environ, start_response):
        """Answer OPTIONS preflights directly; decorate all other responses."""
        origin_headers = self._origin_headers(environ)

        if environ['REQUEST_METHOD'] == 'OPTIONS':
            headers = origin_headers + [
                ('Access-Control-Allow-Methods', ', '.join(self.allow_methods)),
                ('Access-Control-Allow-Headers', ', '.join(self.allow_headers)),
                ('Access-Control-Max-Age', '86400'),  # 24 hours
            ]
            start_response('200 OK', headers)
            return []

        def custom_start_response(status, response_headers, exc_info=None):
            # Copy instead of mutating the application's own header list.
            return start_response(status, list(response_headers) + origin_headers, exc_info)

        return self.app(environ, custom_start_response)
class SessionMiddleware(WSGIMiddleware):
    """Cookie-based session middleware backed by an in-process dict."""

    def __init__(self, app, session_cookie='session_id'):
        super().__init__(app)
        self.session_cookie = session_cookie
        self.sessions = {}  # simple in-memory store

    def __call__(self, environ, start_response):
        """Attach a session to ``environ['wsgi.session']`` and set its cookie."""
        jar = self.parse_cookies(environ.get('HTTP_COOKIE', ''))
        session_id = jar.get(self.session_cookie)

        if not session_id or session_id not in self.sessions:
            # Unknown or missing id: start a fresh session.
            import uuid
            session_id = str(uuid.uuid4())
            session = {'id': session_id, 'data': {}}
            self.sessions[session_id] = session
        else:
            session = self.sessions[session_id]

        environ['wsgi.session'] = session

        def custom_start_response(status, response_headers, exc_info=None):
            # Every response carries the session cookie.
            response_headers.append(
                ('Set-Cookie', f"{self.session_cookie}={session_id}; Path=/; HttpOnly")
            )
            return start_response(status, response_headers, exc_info)

        try:
            return self.app(environ, custom_start_response)
        finally:
            # Expired-session cleanup hook (simplified implementation).
            self.cleanup_sessions()

    def parse_cookies(self, cookie_header):
        """Parse a Cookie header value into a name -> value dict."""
        cookies = {}
        if not cookie_header:
            return cookies
        for part in cookie_header.split(';'):
            name, sep, value = part.strip().partition('=')
            if sep:
                cookies[name] = value
        return cookies

    def cleanup_sessions(self):
        """Remove expired sessions (simplified: currently does nothing)."""
        return None
class StaticFileMiddleware(WSGIMiddleware):
    """Middleware serving files from ``static_dir`` for URLs under ``url_prefix``."""

    def __init__(self, app, static_dir='static', url_prefix='/static'):
        super().__init__(app)
        self.static_dir = static_dir
        self.url_prefix = url_prefix

    def __call__(self, environ, start_response):
        """Serve a static file, or delegate when the path is outside the prefix.

        Responds 403 for paths that escape ``static_dir`` and 404 for
        missing files.
        """
        import os

        path = environ['PATH_INFO']
        prefix = self.url_prefix.rstrip('/')

        # Match on a path-segment boundary so '/staticfoo' is not treated
        # as a request under '/static'.
        if path != prefix and not path.startswith(prefix + '/'):
            return self.app(environ, start_response)

        file_path = path[len(prefix):].lstrip('/')

        # Resolve symlinks and '..' and require the result to stay inside
        # the static root; a substring check alone is not reliable.
        root = os.path.realpath(self.static_dir)
        full_path = os.path.realpath(os.path.join(root, file_path))
        try:
            inside_root = os.path.commonpath([root, full_path]) == root
        except ValueError:
            # e.g. paths on different drives on Windows
            inside_root = False

        if '..' in file_path.split('/') or not inside_root:
            start_response('403 Forbidden', [('Content-Type', 'text/plain')])
            return [b'Access denied']

        if not os.path.isfile(full_path):
            start_response('404 Not Found', [('Content-Type', 'text/plain')])
            return [b'File not found']

        with open(full_path, 'rb') as f:
            content = f.read()

        headers = [
            ('Content-Type', self.get_content_type(full_path)),
            ('Content-Length', str(len(content)))
        ]
        start_response('200 OK', headers)
        return [content]

    def get_content_type(self, file_path):
        """Map the file extension to a Content-Type (octet-stream if unknown)."""
        import os

        ext = os.path.splitext(file_path)[1].lower()
        content_types = {
            '.html': 'text/html',
            '.css': 'text/css',
            '.js': 'application/javascript',
            '.json': 'application/json',
            '.txt': 'text/plain',
            '.jpg': 'image/jpeg',
            '.jpeg': 'image/jpeg',
            '.png': 'image/png',
            '.gif': 'image/gif',
            '.pdf': 'application/pdf',
            '.svg': 'image/svg+xml',
            '.ico': 'image/x-icon',
        }
        return content_types.get(ext, 'application/octet-stream')
# Middleware chain builder
def build_middleware_stack(app, middlewares):
    """Wrap ``app`` in the given middleware so the first entry ends up outermost.

    Args:
        app: Innermost WSGI application.
        middlewares: Sequence of ``(middleware_class, kwargs)`` pairs.

    Returns:
        The fully wrapped application.
    """
    stack = app
    # Apply in reverse so the first listed middleware sees the request first.
    for factory, options in reversed(middlewares):
        stack = factory(stack, **options)
    return stack
# Usage example
def create_app_with_middleware():
    """Build the demo app wrapped in logging, CORS, session, auth and static middleware.

    Returns:
        The outermost WSGI callable of the middleware stack.
    """
    import html
    from datetime import datetime

    # Base application
    def simple_app(environ, start_response):
        status = '200 OK'
        headers = [('Content-Type', 'text/html; charset=utf-8')]

        # Session is present only when SessionMiddleware is in the stack.
        session = environ.get('wsgi.session', {})
        session_data = session.get('data', {})

        visit_count = session_data.get('visits', 0) + 1
        session_data['visits'] = visit_count

        # Path and method are client-controlled; escape before echoing.
        safe_path = html.escape(environ['PATH_INFO'])
        safe_method = html.escape(environ['REQUEST_METHOD'])
        body = f"""
<html>
<head><title>MiddleWare Demo</title></head>
<body>
<h1>MiddleWare Demo</h1>
<p>You have visited this page {visit_count} times.</p>
<p>Path: {safe_path}</p>
<p>Method: {safe_method}</p>
<p><a href="/static/test.txt">Static File Test</a></p>
<p><a href="/api/test">API Test (requires auth)</a></p>
</body>
</html>
"""
        start_response(status, headers)
        return [body.encode('utf-8')]

    # API endpoint (requires authentication)
    def api_app(environ, start_response):
        import json

        status = '200 OK'
        headers = [
            ('Content-Type', 'application/json'),
            ('X-Custom-Header', 'API Response')
        ]
        data = {
            'status': 'success',
            'message': 'API is working',
            'authenticated': True,
            'timestamp': datetime.now().isoformat()
        }
        start_response(status, headers)
        return [json.dumps(data).encode('utf-8')]

    # Combined application: /api/* goes to api_app, the rest to simple_app
    def combined_app(environ, start_response):
        if environ['PATH_INFO'].startswith('/api/'):
            return api_app(environ, start_response)
        return simple_app(environ, start_response)

    # Middleware stack, outermost first
    middlewares = [
        (LoggingMiddleware, {}),
        (CORSMiddleware, {
            'allow_origins': ['http://localhost:3000'],
            'allow_methods': ['GET', 'POST', 'OPTIONS'],
            'allow_headers': ['Content-Type', 'X-API-Key']
        }),
        (SessionMiddleware, {'session_cookie': 'myapp_session'}),
        (AuthenticationMiddleware, {
            'api_keys': {'test-key-123': 'admin'}
        }),
        (StaticFileMiddleware, {
            'static_dir': 'static',
            'url_prefix': '/static'
        })
    ]
    return build_middleware_stack(combined_app, middlewares)
# Run the application with middleware
if __name__ == '__main__':
    # Create the static directory and a test file
    import os

    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs('static', exist_ok=True)
    with open('static/test.txt', 'w', encoding='utf-8') as f:
        f.write('This is a static file served by WSGI middleware.\n')

    from simple_wsgi_server import SimpleWSGIServer

    app = create_app_with_middleware()
    server = SimpleWSGIServer(port=8080)
    server.set_app(app)
    server.serve_forever()
6. 生产级WSGI服务器
使用Python标准库的wsgiref
wsgiref 是 Python 内置的参考实现,适合学习与本地调试,但不可用于生产环境:
# wsgiref_demo.py
from wsgiref.simple_server import make_server
from wsgiref.util import setup_testing_defaults
from wsgiref.headers import Headers
import json
class WsgirefApp:
    """Exact-path routed WSGI application for the wsgiref demo.

    Handlers receive ``environ`` and return either an HTML ``str`` body or a
    ``(body, status, headers)`` tuple.
    """

    def __init__(self):
        self.routes = {}

    def route(self, path):
        """Return a decorator registering a handler for ``path``."""
        def decorator(handler):
            self.routes[path] = handler
            return handler

        return decorator

    def __call__(self, environ, start_response):
        """WSGI entry point: dispatch by PATH_INFO, answering 404 or 500 as needed.

        The body is encoded before ``start_response`` is called, so a bad
        handler result produces one clean 500 instead of a second
        ``start_response`` call.
        """
        # Fills in any environ keys the caller left out.
        setup_testing_defaults(environ)
        handler = self.routes.get(environ['PATH_INFO'])

        if handler is None:
            start_response('404 Not Found', [('Content-Type', 'text/html')])
            return [b'<h1>404 Not Found</h1>']

        try:
            response = handler(environ)
            if isinstance(response, tuple):
                body, status, headers = response
            else:
                body = response
                status = '200 OK'
                headers = [('Content-Type', 'text/html')]
            payload = body.encode('utf-8')
        except Exception as e:
            status = '500 Internal Server Error'
            headers = [('Content-Type', 'text/plain')]
            payload = f'Error: {e}'.encode('utf-8')

        start_response(status, headers)
        return [payload]
# Create the application
app = WsgirefApp()


@app.route('/')
def index(environ):
    """Landing page linking to the API and environment demo routes."""
    landing_page = '''
<html>
<head><title>WSGIREF Demo</title></head>
<body>
<h1>WSGIREF Demo Application</h1>
<p>Using Python standard library wsgiref</p>
<p><a href="/api">API Example</a></p>
<p><a href="/env">Environment</a></p>
</body>
</html>
'''
    return landing_page
@app.route('/api')
def api(environ):
    """Return a small JSON document as a (body, status, headers) tuple."""
    # Local import so the handler works even where the module does not
    # import datetime at the top.
    from datetime import datetime

    data = {
        'status': 'success',
        'message': 'This is a JSON API response',
        'server': 'wsgiref',
        'timestamp': datetime.now().isoformat()
    }
    headers = [
        ('Content-Type', 'application/json'),
        ('X-Server', 'WSGIREF')
    ]
    return json.dumps(data, indent=2), '200 OK', headers
@app.route('/env')
def show_env(environ):
    """Render every environ entry as an HTML list.

    Keys and values are HTML-escaped because environ includes
    client-supplied request headers.
    """
    import html

    items = ''.join(
        f'<li><strong>{html.escape(str(key))}:</strong> {html.escape(str(value))}</li>'
        for key, value in sorted(environ.items())
    )
    body = f'<h1>WSGI Environment</h1><ul>{items}</ul>'
    return body, '200 OK', [('Content-Type', 'text/html')]
# Run the server
def run_wsgiref_server():
    """Serve ``app`` with wsgiref's reference server on port 8000."""
    listen_port = 8000
    with make_server('', listen_port, app) as httpd:
        print(f"Serving on port {listen_port}...")
        print(f"Visit http://localhost:{listen_port}")
        httpd.serve_forever()


if __name__ == '__main__':
    run_wsgiref_server()
多线程WSGI服务器
为提升并发能力并限制线程数量,可将“每个连接新建一个线程”的服务器升级为线程池模型:
# threaded_wsgi_server.py
import socket
import threading
import queue
import io
import sys
class ThreadPoolWSGIServer:
    """WSGI server that feeds accepted connections to a fixed pool of worker threads.

    Simplified on purpose: one request per connection, request bodies are
    not read, and the response is buffered before sending.
    """

    def __init__(self, host='localhost', port=8080, thread_count=10):
        self.host = host
        self.port = port
        self.thread_count = thread_count
        self.app = None
        self.request_queue = queue.Queue()
        self.threads = []
        self.running = False

    def set_app(self, app):
        """Register the WSGI application that will handle requests."""
        self.app = app

    def worker(self):
        """Worker loop: take connections off the queue until the server stops."""
        while self.running:
            try:
                # The timeout lets the loop notice self.running going False.
                client_socket = self.request_queue.get(timeout=1)
            except queue.Empty:
                continue
            if client_socket:
                self.handle_request(client_socket)

    def handle_request(self, client_socket):
        """Parse the request line, run the app and send status, headers and body."""
        try:
            request_data = client_socket.recv(4096)
            if not request_data:
                return

            # Only the request line is parsed in this simplified server.
            request_line = request_data.split(b'\r\n', 1)[0].decode('iso-8859-1')
            parts = request_line.split()
            if len(parts) != 3:
                client_socket.sendall(
                    b'HTTP/1.1 400 Bad Request\r\n'
                    b'Content-Type: text/plain\r\n'
                    b'\r\n'
                    b'Malformed request line'
                )
                return
            method, target, _version = parts
            # PATH_INFO must not contain the query string.
            path_info, _, query_string = target.partition('?')

            environ = {
                'REQUEST_METHOD': method,
                'SCRIPT_NAME': '',
                'PATH_INFO': path_info,
                'QUERY_STRING': query_string,
                'SERVER_NAME': self.host,
                'SERVER_PORT': str(self.port),
                'SERVER_PROTOCOL': 'HTTP/1.1',
                'wsgi.version': (1, 0),
                'wsgi.url_scheme': 'http',
                'wsgi.input': io.BytesIO(b''),
                'wsgi.errors': sys.stderr,
                'wsgi.multithread': True,
                'wsgi.multiprocess': False,
                'wsgi.run_once': False,
            }

            if not self.app:
                client_socket.sendall(b'HTTP/1.1 404 Not Found\r\n\r\nNo WSGI app configured')
                return

            response_state = {}
            body_chunks = []

            def start_response(status, headers, exc_info=None):
                # Record status and headers; they are sent with the body
                # below. Returning them, as before, discarded them.
                response_state['status'] = status
                response_state['headers'] = list(headers)
                return body_chunks.append

            app_iter = self.app(environ, start_response)
            try:
                for chunk in app_iter:
                    body_chunks.append(chunk)
            finally:
                close = getattr(app_iter, 'close', None)
                if close is not None:
                    close()

            if 'status' not in response_state:
                raise RuntimeError('application never called start_response()')

            head = [f"HTTP/1.1 {response_state['status']}\r\n"]
            head.extend(f'{key}: {value}\r\n' for key, value in response_state['headers'])
            head.append('\r\n')
            payload = b''.join(
                chunk.encode('utf-8') if isinstance(chunk, str) else chunk
                for chunk in body_chunks
            )
            client_socket.sendall(''.join(head).encode('iso-8859-1') + payload)
        except Exception as e:
            print(f"请求处理错误: {e}")
            error_response = (
                b'HTTP/1.1 500 Internal Server Error\r\n'
                b'Content-Type: text/plain\r\n'
                b'\r\n'
                b'Server Error'
            )
            try:
                client_socket.sendall(error_response)
            except OSError:
                # Best effort only: the client has already gone away.
                pass
        finally:
            client_socket.close()

    def serve_forever(self):
        """Start the worker threads and accept connections until stopped."""
        server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            server_socket.bind((self.host, self.port))
            server_socket.listen(100)
            # Without a timeout accept() blocks forever and the
            # socket.timeout branch below could never run.
            server_socket.settimeout(1.0)
            self.running = True

            for _ in range(self.thread_count):
                thread = threading.Thread(target=self.worker, daemon=True)
                thread.start()
                self.threads.append(thread)

            print(f"线程池WSGI服务器启动 (线程数: {self.thread_count})")
            print(f"监听地址: http://{self.host}:{self.port}")

            while self.running:
                try:
                    client_socket, address = server_socket.accept()
                    self.request_queue.put(client_socket)
                except socket.timeout:
                    continue
                except KeyboardInterrupt:
                    break
        finally:
            self.running = False
            server_socket.close()
            print("服务器已停止")
# 测试应用
def test_app(environ, start_response):
status = '200 OK'
headers = [('Content-Type', 'text/html')]
start_response(status, headers)
thread_name = threading.current_thread().name
body = f"""
<html>
<head><title>Thread Pool Test</title></head>
<body>
<h1>Thread Pool WSGI Server</h1>
<p>Thread: {thread_name}</p>
<p>Path: {environ['PATH_INFO']}</p>
<p>Time: {datetime.now()}</p>
</body>
</html>
"""
return [body.encode('utf-8')]
if __name__ == '__main__':
    # Serve the demo app with a pool of four worker threads.
    pool_server = ThreadPoolWSGIServer(thread_count=4)
    pool_server.set_app(test_app)
    pool_server.serve_forever()
7. WSGI与主流框架
Flask的WSGI实现
Flask 应用本身就是一个符合 WSGI 规范的可调用对象,可直接被任何 WSGI 服务器托管:
# flask_wsgi_demo.py
from flask import Flask, request, jsonify
# A Flask application object is itself a WSGI application
app = Flask(__name__)


@app.route('/')
def home():
    """Landing page linking to the other demo routes.

    The API link targets /api/data, the route this module actually
    registers; the previous /api link had no matching route.
    """
    return '''
<html>
<head><title>Flask WSGI Demo</title></head>
<body>
<h1>Flask as WSGI Application</h1>
<p>Flask applications are WSGI compliant.</p>
<p><a href="/wsgi">WSGI Info</a></p>
<p><a href="/api/data">API</a></p>
</body>
</html>
'''
@app.route('/wsgi')
def wsgi_info():
    """Return selected WSGI environ values of the current request as JSON."""
    env = request.environ
    wsgi_env = {
        key: env.get(key)
        for key in (
            'wsgi.version',
            'wsgi.url_scheme',
            'wsgi.multithread',
            'wsgi.multiprocess',
            'wsgi.run_once',
        )
    }
    wsgi_env['server_software'] = env.get('SERVER_SOFTWARE')
    wsgi_env['request_method'] = env.get('REQUEST_METHOD')
    return jsonify(wsgi_env)
@app.route('/api/data', methods=['GET', 'POST'])
def api_data():
    """Echo posted JSON or form data on POST; return a fixed message on GET."""
    if request.method == 'POST':
        # silent=True returns None instead of raising for a non-JSON body,
        # which is what lets the form fallback run.
        data = request.get_json(silent=True) or request.form
        return jsonify({
            'status': 'success',
            'method': 'POST',
            'data': data,
            'wsgi_input': 'Flask abstracts wsgi.input'
        })
    return jsonify({
        'status': 'success',
        'method': 'GET',
        'message': 'Flask handles WSGI for you'
    })
# Use the Flask app directly as a WSGI application
if __name__ == '__main__':
    # Option 1: Flask's built-in development server
    # app.run(debug=True, port=5000)
    # Option 2: a standard WSGI server
    from wsgiref.simple_server import make_server

    listen_port = 5000
    with make_server('', listen_port, app) as httpd:
        print(f"Flask WSGI应用运行在 http://localhost:{listen_port}")
        httpd.serve_forever()
Django的WSGI配置
Django 的 WSGI 入口点位于项目目录下的 wsgi.py,其本质是加载 Django 设置并返回一个 WSGI 应用实例:
# django_wsgi_example.py (概念示例)
"""
Django的WSGI入口点通常位于项目目录下的wsgi.py文件
以下是简化的示例:
"""
import os
import sys
from django.core.wsgi import get_wsgi_application
# Name the settings module Django should load; setdefault() leaves any value
# already present in the process environment untouched.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
# Build the WSGI callable. It must be created after the settings module is
# named above, and is the object a WSGI server is pointed at.
application = get_wsgi_application()
"""
Django的WSGI应用可以在任何WSGI服务器上运行
例如使用gunicorn: gunicorn myproject.wsgi:application
"""
"""
Django的WSGI处理器实际上是一个中间件栈,包括:
1. SecurityMiddleware - 安全中间件
2. SessionMiddleware - 会话中间件
3. CommonMiddleware - 通用中间件
4. CsrfViewMiddleware - CSRF保护
5. AuthenticationMiddleware - 认证中间件
6. MessageMiddleware - 消息中间件
7. XFrameOptionsMiddleware - X-Frame-Options
最终调用视图函数处理请求
"""
8. WSGI部署实践
使用Gunicorn部署
Gunicorn 是 Python 生态中最主流的生产级 WSGI 服务器,支持同步、异步、多进程等多种工作模式:
# gunicorn_config.py
# Example Gunicorn configuration file.
# Usage: gunicorn -c gunicorn_config.py myapp:app

# --- Network -----------------------------------------------------------
# Address and port to listen on.
bind = "0.0.0.0:8000"

# --- Worker model ------------------------------------------------------
# Number of worker processes (CPU cores * 2 + 1 is a reasonable start).
workers = 3
# Worker type: sync, eventlet, gevent, tornado or gthread.
worker_class = "sync"
# Threads per worker process (only meaningful for the gthread worker).
threads = 1
# Maximum number of simultaneous clients.
# NOTE(review): Gunicorn documents this for the async/threaded worker
# types; confirm whether it has any effect with worker_class = "sync".
worker_connections = 1000

# --- Process identity --------------------------------------------------
# Process name shown in process listings.
proc_name = "my_wsgi_app"
# User and group the worker processes run as.
user = "www-data"
group = "www-data"

# --- Logging -----------------------------------------------------------
# "-" sends the access log to stdout and the error log to stderr.
accesslog = "-"
errorlog = "-"
loglevel = "info"

# --- Timeouts ----------------------------------------------------------
timeout = 30
keepalive = 2
# Time allowed for workers to finish in-flight requests on restart.
graceful_timeout = 30

# --- Worker recycling (guards against memory leaks) --------------------
max_requests = 1000
max_requests_jitter = 50
使用uWSGI部署
uWSGI 功能更强大,配置更灵活,尤其适合与 Nginx 深度集成:
# uwsgi.ini
# uWSGI配置文件示例
[uwsgi]
# 项目目录
chdir = /path/to/your/project
# WSGI模块
module = myapp:app
# 主进程
master = true
# 进程数
processes = 4
# 线程数
threads = 2
# 绑定地址
http = 0.0.0.0:8000
# Socket文件 (用于与Nginx通信)
# socket = /tmp/myapp.sock
# chmod-socket = 664
# vacuum = true
# 进程用户/组
uid = www-data
gid = www-data
# 日志文件
logto = /var/log/uwsgi/myapp.log
# 最大请求数
max-requests = 1000
# 内存超限重载:worker 的 RSS 内存超过 256MB 时自动重启该 worker
reload-on-rss = 256
# 内存报告
memory-report = true
# 启用线程
enable-threads = true
# 使用命令: uwsgi --ini uwsgi.ini
Nginx + uWSGI配置
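典型反向代理架构,Nginx 处理静态资源与负载均衡,uWSGI 处理动态请求(注意:此时需要在 uwsgi.ini 中启用 socket = /tmp/myapp.sock 等 Socket 相关配置,并去掉 http 选项,Nginx 才能通过 uwsgi_pass 与 uWSGI 通信):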
# nginx配置示例
server {
listen 80;
server_name example.com;
location / {
include uwsgi_params;
uwsgi_pass unix:/tmp/myapp.sock;
# 超时设置
uwsgi_read_timeout 300;
uwsgi_connect_timeout 300;
uwsgi_send_timeout 300;
}
location /static {
alias /path/to/your/project/static;
expires 30d;
}
location /media {
alias /path/to/your/project/media;
expires 30d;
}
# 禁止访问隐藏文件
location ~ /\. {
deny all;
}
}
9. WSGI性能优化
异步WSGI应用
虽然 WSGI 本身是同步协议,但可以借助运行在后台线程中的事件循环来桥接异步 I/O,实现“伪异步”处理(注意:每个请求仍会阻塞一个工作线程,直到协程执行完成或超时):
# async_wsgi.py
import asyncio
from concurrent.futures import ThreadPoolExecutor
from wsgiref.simple_server import make_server
class AsyncWSGIApp:
    """WSGI application that hands each request to an asyncio coroutine.

    The coroutine is scheduled on ``self.loop``, which must already be running
    in another thread. ``__call__`` blocks the calling thread until the
    coroutine finishes or the 5 second timeout expires.
    """

    def __init__(self, max_workers=10):
        """Create the event loop and a thread pool of ``max_workers`` threads."""
        # Nothing in this class submits work to the executor; it is kept so
        # the constructor signature and attribute stay available to callers.
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.loop = asyncio.new_event_loop()

    async def async_handler(self, environ):
        """Build the HTML page for one request.

        Args:
            environ: The WSGI environ dict of the request.

        Returns:
            The response body as ``str``.
        """
        # Imported here because the surrounding module does not import them.
        from datetime import datetime
        from html import escape

        # Simulated asynchronous I/O.
        await asyncio.sleep(0.1)
        # PATH_INFO is client-controlled, so escape it before embedding it in
        # HTML; servers may omit the key entirely when the path is empty.
        path = escape(environ.get('PATH_INFO', ''))
        return f"""
<html>
<head><title>Async WSGI</title></head>
<body>
<h1>Async WSGI Response</h1>
<p>Path: {path}</p>
<p>Async processed at: {datetime.now()}</p>
</body>
</html>
"""

    def __call__(self, environ, start_response):
        """WSGI entry point: run ``async_handler`` and wait for its result.

        Returns:
            A one-element list with the encoded body. The status is 200 on
            success, 504 when the coroutine exceeds the timeout and 500 when
            it raises any other exception.
        """
        # Future.result() on the object returned by run_coroutine_threadsafe
        # raises concurrent.futures.TimeoutError, which is a different class
        # from asyncio.TimeoutError on Python versions before 3.11.
        from concurrent.futures import TimeoutError as FutureTimeoutError

        future = asyncio.run_coroutine_threadsafe(
            self.async_handler(environ),
            self.loop
        )
        try:
            body = future.result(timeout=5)
        except (FutureTimeoutError, asyncio.TimeoutError):
            # Stop the abandoned coroutine instead of letting it keep running.
            future.cancel()
            start_response('504 Gateway Timeout', [('Content-Type', 'text/plain')])
            return [b'Request timeout']
        except Exception as e:
            start_response('500 Internal Server Error', [('Content-Type', 'text/plain')])
            return [f'Error: {e}'.encode('utf-8')]

        # The body is encoded as UTF-8 below, so declare that charset.
        start_response('200 OK', [('Content-Type', 'text/html; charset=utf-8')])
        return [body.encode('utf-8')]
def run_async_app():
    """Serve an AsyncWSGIApp on port 8000, driving its event loop in a daemon thread."""
    import threading

    wsgi_app = AsyncWSGIApp()

    def drive_event_loop():
        # Bind the app's loop to this thread and run it until the process exits.
        asyncio.set_event_loop(wsgi_app.loop)
        wsgi_app.loop.run_forever()

    # The loop must be running before requests arrive, because the app
    # schedules its coroutines onto it from the server thread.
    threading.Thread(target=drive_event_loop, daemon=True).start()

    with make_server('', 8000, wsgi_app) as wsgi_server:
        print("Async WSGI app serving on port 8000")
        wsgi_server.serve_forever()


if __name__ == '__main__':
    run_async_app()
WSGI应用性能监控
实时采集关键指标,是保障线上服务稳定性的基石:
# wsgi_monitoring.py
import time
import psutil
import threading
from collections import deque
class WSGIMonitor:
    """WSGI middleware that records latency, error and throughput metrics.

    Attributes:
        app: The wrapped WSGI application.
        request_times: Durations (seconds) of the most recent successful calls.
        request_timestamps: Completion times of the most recent calls, taken
            from ``time.perf_counter()``.
        error_count: Requests that raised or reported a 5xx status.
        total_requests: All requests handled, failed ones included.
    """

    # Length in seconds of the trailing window used for requests_per_second.
    RATE_WINDOW_SECONDS = 60

    def __init__(self, app, history_size=100):
        """Wrap ``app`` and keep at most ``history_size`` recent samples."""
        self.app = app
        self.request_times = deque(maxlen=history_size)
        self.request_timestamps = deque(maxlen=history_size)
        self.error_count = 0
        self.total_requests = 0
        self.lock = threading.Lock()

    def __call__(self, environ, start_response):
        """Call the wrapped application and record metrics for the request.

        The duration covers only the call to the application; time spent
        iterating the returned body is not included.

        Raises:
            Exception: Whatever the wrapped application raises is re-raised
                after the failure has been counted.
        """
        start_time = time.perf_counter()
        error_counted = False

        def monitored_start_response(status, headers, exc_info=None):
            nonlocal error_counted
            # Count each request as an error at most once, even if the
            # application reports a 5xx status and then raises.
            if status and status.startswith('5') and not error_counted:
                error_counted = True
                with self.lock:
                    self.error_count += 1
            # Work on a copy so the application's own header list is never
            # mutated (it may be reused across requests).
            monitored_headers = list(headers)
            monitored_headers.append(('X-Request-ID', threading.current_thread().name))
            monitored_headers.append(('X-Server-Load', str(psutil.cpu_percent())))
            return start_response(status, monitored_headers, exc_info)

        try:
            response = self.app(environ, monitored_start_response)
        except Exception:
            with self.lock:
                # Failed requests still count toward the total; otherwise
                # error_rate could exceed 1.
                self.total_requests += 1
                self.request_timestamps.append(time.perf_counter())
                if not error_counted:
                    self.error_count += 1
            raise

        finished = time.perf_counter()
        with self.lock:
            self.total_requests += 1
            self.request_times.append(finished - start_time)
            self.request_timestamps.append(finished)
        return response

    def _requests_per_second(self, timestamps, now):
        """Return the completion rate over the trailing rate window.

        Args:
            timestamps: Completion times in ascending order.
            now: Current time on the same clock as ``timestamps``.
        """
        window = self.RATE_WINDOW_SECONDS
        recent = [stamp for stamp in timestamps if now - stamp <= window]
        if not recent:
            return 0.0
        if len(recent) == self.request_timestamps.maxlen:
            # The bounded history is full of in-window samples, so older ones
            # were evicted; measure over the span that is actually covered.
            span = now - recent[0]
            if span > 0:
                return len(recent) / span
        return len(recent) / window

    def get_stats(self):
        """Return a dict of request, latency, memory and CPU statistics."""
        import statistics

        # Snapshot shared state under the lock, then compute without holding
        # it so request threads are not blocked by the calculations below.
        with self.lock:
            durations = list(self.request_times)
            timestamps = list(self.request_timestamps)
            total_requests = self.total_requests
            error_count = self.error_count

        if durations:
            avg_time = statistics.mean(durations)
            max_time = max(durations)
            min_time = min(durations)
            # With n=20 the cut point at index 18 is the 95th percentile; it
            # is reported as 0 until at least 20 samples exist.
            p95 = statistics.quantiles(durations, n=20)[18] if len(durations) >= 20 else 0
        else:
            avg_time = max_time = min_time = p95 = 0

        return {
            'total_requests': total_requests,
            'error_count': error_count,
            'error_rate': error_count / max(total_requests, 1),
            'avg_response_time': avg_time,
            'max_response_time': max_time,
            'min_response_time': min_time,
            'p95_response_time': p95,
            'requests_per_second': self._requests_per_second(timestamps, time.perf_counter()),
            'memory_usage': psutil.Process().memory_info().rss / 1024 / 1024,  # MB
            'cpu_percent': psutil.cpu_percent(),
        }

    def stats_endpoint(self, environ, start_response):
        """WSGI handler that returns ``get_stats()`` as an uncached JSON document."""
        import json

        body = json.dumps(self.get_stats(), indent=2)
        headers = [
            ('Content-Type', 'application/json'),
            ('Cache-Control', 'no-cache')
        ]
        start_response('200 OK', headers)
        return [body.encode('utf-8')]
# Usage example
def create_monitored_app():
    """Build a demo WSGI app wrapped by WSGIMonitor, with metrics served at /stats."""

    def demo_app(environ, start_response):
        start_response('200 OK', [('Content-Type', 'text/html')])
        # Simulate a little processing time.
        time.sleep(0.01)
        return [b'<h1>Monitored App</h1><p>This app is being monitored.</p>']

    monitor = WSGIMonitor(demo_app)

    def dispatch(environ, start_response):
        # /stats goes straight to the stats handler; every other path goes
        # through the monitor to the demo app.
        if environ['PATH_INFO'] == '/stats':
            handler = monitor.stats_endpoint
        else:
            handler = monitor
        return handler(environ, start_response)

    return dispatch
if __name__ == '__main__':
    # Script entry point: serve the monitored demo app with wsgiref.
    from wsgiref.simple_server import make_server

    monitored = create_monitored_app()
    with make_server('', 8000, monitored) as wsgi_server:
        print("Monitored app serving on port 8000")
        print("Visit http://localhost:8000/stats for monitoring data")
        wsgi_server.serve_forever()
总结
今天深入学习了WSGI协议,掌握了以下核心内容:
关键收获
- WSGI协议基础:理解了WSGI的设计哲学和核心接口
- WSGI服务器实现:从零实现了完整的WSGI服务器
- WSGI应用程序:创建了多种形式的WSGI应用(函数、类、框架)
- WSGI中间件:实现了日志、认证、CORS、会话等中间件
- 生产部署:了解了Gunicorn、uWSGI等生产服务器的配置
- 性能优化:学习了异步WSGI和性能监控技术
- ASGI简介:了解了WSGI的继任者ASGI协议
核心技术点
- environ字典的构建和解析
- start_response回调机制
- 中间件栈的设计模式
- 请求/响应生命周期管理
- 多线程/异步处理
- 与Flask、Django等框架的集成
WSGI的优缺点
优点:
- 标准化,提高互操作性
- 简单清晰的设计
- 成熟的生态系统
- 良好的调试支持
缺点:
- 同步模型,不适合高并发
- 不支持WebSocket等新协议
- 每个请求都需要完整的环境重建
实践建议
- 深入理解中间件:尝试实现更多类型的中间件
- 性能测试:对不同WSGI服务器进行性能对比测试
- 安全性:实现安全相关的中间件(如CSRF防护、XSS过滤)
- 协议扩展:尝试实现HTTP/2或WebSocket的WSGI扩展
- 容器化部署:将WSGI应用Docker化并部署到Kubernetes
扩展学习
- ASGI规范:深入学习ASGI协议和异步Web开发
- Web服务器实现:研究Apache的mod_wsgi模块,以及Nginx的uwsgi协议模块(ngx_http_uwsgi_module)的实现
- 协议分析工具:使用Wireshark分析服务器与客户端之间的HTTP通信或Nginx与uWSGI之间的uwsgi协议通信(WSGI本身是进程内的调用约定,不经过网络)
- 性能调优:学习WSGI应用的内存分析和性能优化
- 云原生部署:学习在云平台上部署WSGI应用的最佳实践
明天我们将进入Web开发概述的学习,这是第四阶段Python生态系统的开始,将介绍Python Web开发的整体生态和主要框架。