Implementing an Intelligent Network Request Batch Processing System in Python
Features

This intelligent batch request system provides the following core capabilities:

Concurrent execution of batched requests
Intelligent retry of failed requests
Request rate limiting
Automatic response parsing
Automatic exception handling
Real-time progress monitoring
Result caching
Custom request middlewares

Implementation
import requests
import asyncio
import aiohttp
from typing import List, Dict, Optional, Callable, Any
from dataclasses import dataclass
import time
import json
import logging
from functools import wraps
import hashlib
import os
from concurrent.futures import ThreadPoolExecutor
import backoff

logger = logging.getLogger(__name__)
@dataclass
class RequestTask:
    url: str
    method: str = "GET"
    headers: Optional[Dict] = None
    params: Optional[Dict] = None
    data: Optional[Dict] = None
    json: Optional[Dict] = None
    metadata: Optional[Dict] = None
@dataclass
class ResponseResult:
    task: RequestTask
    status: int
    data: Any
    error: Optional[str] = None
    attempt: int = 1
    latency: float = 0
class BatchRequestProcessor:
    """Intelligent batch request processor."""

    def __init__(self,
                 max_concurrency: int = 10,
                 rate_limit: int = 30,
                 cache_dir: Optional[str] = None):
        self.max_concurrency = max_concurrency
        self.rate_limit = rate_limit  # requests per minute
        self.cache_dir = cache_dir
        self.middlewares: List[Callable] = []
        self._last_request_time = 0
        self._request_count = 0
        self._reset_time = time.time() + 60
        if cache_dir and not os.path.exists(cache_dir):
            os.makedirs(cache_dir)
    def add_middleware(self, middleware: Callable):
        """Register a request middleware."""
        self.middlewares.append(middleware)

    def _apply_middlewares(self, task: RequestTask) -> RequestTask:
        """Run the task through all registered middlewares, in order."""
        for middleware in self.middlewares:
            task = middleware(task)
        return task
    def _get_cache_key(self, task: RequestTask) -> str:
        """Derive a cache key from the request's identifying fields."""
        key_data = {
            'url': task.url,
            'method': task.method,
            'params': task.params,
            'data': task.data,
            'json': task.json
        }
        return hashlib.md5(json.dumps(key_data, sort_keys=True).encode()).hexdigest()
    def _check_cache(self, task: RequestTask) -> Optional[ResponseResult]:
        """Return a cached result for this task, if one exists."""
        if not self.cache_dir:
            return None
        cache_key = self._get_cache_key(task)
        cache_file = os.path.join(self.cache_dir, f"{cache_key}.json")
        if os.path.exists(cache_file):
            try:
                with open(cache_file, 'r') as f:
                    data = json.load(f)
                return ResponseResult(
                    task=task,
                    status=200,
                    data=data,
                    latency=0
                )
            except Exception as e:
                logger.warning(f"Failed to load cache: {str(e)}")
        return None
    def _save_cache(self, result: ResponseResult):
        """Persist a successful result to the cache directory."""
        if not self.cache_dir or result.error or result.status != 200:
            return
        cache_key = self._get_cache_key(result.task)
        cache_file = os.path.join(self.cache_dir, f"{cache_key}.json")
        try:
            with open(cache_file, 'w') as f:
                json.dump(result.data, f)
        except Exception as e:
            logger.warning(f"Failed to save cache: {str(e)}")
    def _rate_limit_control(self):
        """Throttle to at most `rate_limit` requests per one-minute window.

        Note: this sleeps the calling thread, which is fine for the
        thread-pool path but blocks the event loop when called from the
        async path.
        """
        now = time.time()
        if now > self._reset_time:
            # A new one-minute window has started; reset the counter.
            self._request_count = 0
            self._reset_time = now + 60
        if self._request_count >= self.rate_limit:
            # Window exhausted: sleep until it rolls over.
            sleep_time = self._reset_time - now
            if sleep_time > 0:
                time.sleep(sleep_time)
            self._request_count = 0
            self._reset_time = time.time() + 60
    @backoff.on_exception(backoff.expo,
                          (requests.exceptions.RequestException,),
                          max_tries=3)
    def _execute_single_request(self, task: RequestTask) -> ResponseResult:
        """Execute a single request synchronously, with retries and caching."""
        self._rate_limit_control()
        start_time = time.time()
        task = self._apply_middlewares(task)
        cached = self._check_cache(task)
        if cached:
            return cached
        try:
            response = requests.request(
                method=task.method,
                url=task.url,
                headers=task.headers,
                params=task.params,
                data=task.data,
                json=task.json,
                timeout=10
            )
            try:
                response_data = response.json()
            except ValueError:
                # Not JSON; fall back to the raw text body.
                response_data = response.text
            result = ResponseResult(
                task=task,
                status=response.status_code,
                data=response_data,
                latency=time.time() - start_time
            )
            self._save_cache(result)
            self._request_count += 1
            return result
        except requests.exceptions.RequestException:
            # Re-raise so the @backoff decorator can retry; catching it
            # here would make the retry mechanism dead code.
            self._request_count += 1
            raise
        except Exception as e:
            self._request_count += 1
            return ResponseResult(
                task=task,
                status=500,
                data=None,
                error=str(e),
                latency=time.time() - start_time
            )
    async def _execute_async_request(self, session: aiohttp.ClientSession, task: RequestTask) -> ResponseResult:
        """Execute a single request asynchronously, with caching."""
        self._rate_limit_control()
        start_time = time.time()
        task = self._apply_middlewares(task)
        cached = self._check_cache(task)
        if cached:
            return cached
        try:
            async with session.request(
                method=task.method,
                url=task.url,
                headers=task.headers,
                params=task.params,
                data=task.data,
                json=task.json,
                timeout=aiohttp.ClientTimeout(total=10)
            ) as response:
                try:
                    response_data = await response.json()
                except (ValueError, aiohttp.ContentTypeError):
                    # Not JSON; fall back to the raw text body.
                    response_data = await response.text()
                result = ResponseResult(
                    task=task,
                    status=response.status,
                    data=response_data,
                    latency=time.time() - start_time
                )
                self._save_cache(result)
                self._request_count += 1
                return result
        except Exception as e:
            self._request_count += 1
            return ResponseResult(
                task=task,
                status=500,
                data=None,
                error=str(e),
                latency=time.time() - start_time
            )
    def process_batch_sync(self, tasks: List[RequestTask], progress_callback: Optional[Callable] = None) -> List[ResponseResult]:
        """Process a batch of requests using a thread pool."""
        results = []
        with ThreadPoolExecutor(max_workers=self.max_concurrency) as executor:
            futures = [executor.submit(self._execute_single_request, task) for task in tasks]
            # Futures are iterated in submission order, so tasks[i] matches.
            for i, future in enumerate(futures):
                try:
                    results.append(future.result())
                except Exception as e:
                    logger.error(f"Request failed: {str(e)}")
                    results.append(ResponseResult(
                        task=tasks[i],
                        status=500,
                        data=None,
                        error=str(e)
                    ))
                # Report progress for failed requests too, not only successes.
                if progress_callback:
                    progress_callback(len(tasks), i + 1)
        return results
    async def process_batch_async(self, tasks: List[RequestTask], progress_callback: Optional[Callable] = None) -> List[ResponseResult]:
        """Process a batch of requests concurrently on the event loop."""
        results = []
        connector = aiohttp.TCPConnector(limit=self.max_concurrency)
        async with aiohttp.ClientSession(connector=connector) as session:
            async def run_one(task: RequestTask) -> ResponseResult:
                # Pair each coroutine with its task: as_completed yields in
                # completion order, which generally differs from submission
                # order, so a failed task cannot be looked up by index.
                try:
                    return await self._execute_async_request(session, task)
                except Exception as e:
                    logger.error(f"Request failed: {str(e)}")
                    return ResponseResult(task=task, status=500, data=None, error=str(e))

            pending = [run_one(task) for task in tasks]
            for i, future in enumerate(asyncio.as_completed(pending)):
                results.append(await future)
                if progress_callback:
                    progress_callback(len(tasks), i + 1)
        return results
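
One caveat: _rate_limit_control throttles with time.sleep, which is fine for the thread-pool path but blocks the event loop when called from _execute_async_request. A minimal non-blocking alternative for the async path might look like this (a sketch, not part of the class above):

class AsyncRateLimiter:
    """Sketch: per-minute rate limiter that yields instead of blocking."""

    def __init__(self, rate_limit: int):
        self.rate_limit = rate_limit        # allowed requests per minute
        self._count = 0
        self._reset_time = time.time() + 60

    async def acquire(self):
        now = time.time()
        if now > self._reset_time:          # a new one-minute window began
            self._count = 0
            self._reset_time = now + 60
        if self._count >= self.rate_limit:  # window exhausted: wait it out
            await asyncio.sleep(self._reset_time - now)  # yields to the loop
            self._count = 0
            self._reset_time = time.time() + 60
        self._count += 1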
Usage

Initialize the processor:

processor = BatchRequestProcessor(
    max_concurrency=20,         # maximum number of concurrent requests
    rate_limit=100,             # at most 100 requests per minute
    cache_dir=".request_cache"  # enable response caching
)
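
As a side note, the cache key is simply the MD5 hash of the request's identifying fields, so identical tasks resolve to the same file under cache_dir (an illustrative peek at the internal helper):

task = RequestTask(url="https://api.example.com/users/1", method="GET")
key = processor._get_cache_key(task)                  # 32-character hex digest
print(os.path.join(".request_cache", f"{key}.json"))  # the cache file path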
Add middleware:

def auth_middleware(task: RequestTask) -> RequestTask:
    """Middleware that attaches an authentication header."""
    task.headers = task.headers or {}
    task.headers["Authorization"] = "Bearer your_token"
    return task

processor.add_middleware(auth_middleware)
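
Middlewares run in registration order and compose, so cross-cutting concerns can be layered; for example, a second middleware that tags every outgoing request (the header name here is illustrative):

def tracing_middleware(task: RequestTask) -> RequestTask:
    """Illustrative middleware: attach a trace header to every request."""
    task.headers = task.headers or {}
    task.headers["X-Request-Source"] = "batch-processor"  # hypothetical header
    return task

processor.add_middleware(tracing_middleware)  # runs after auth_middleware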
Prepare the batch of requests:

tasks = [
    RequestTask(
        url="https://api.example.com/users/1",
        method="GET"
    ),
    RequestTask(
        url="https://api.example.com/users/2",
        method="GET"
    ),
    RequestTask(
        url="https://api.example.com/posts",
        method="POST",
        json={"title": "Hello", "content": "World"}
    )
]
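
For larger batches, the task list can just as easily be built programmatically; for example (the endpoint is illustrative):

# Build 100 GET tasks for a range of user IDs (hypothetical endpoint).
tasks = [
    RequestTask(url=f"https://api.example.com/users/{uid}")
    for uid in range(1, 101)
]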
Execute the batch:

Synchronously:

def progress(total, current):
    print(f"Completed {current}/{total} requests")

results = processor.process_batch_sync(tasks, progress_callback=progress)

Asynchronously (await must appear inside a coroutine):

results = await processor.process_batch_async(tasks, progress_callback=progress)
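
For a standalone script, wrap the call in a coroutine and hand it to asyncio.run (a minimal runnable sketch using the objects defined above):

async def main():
    results = await processor.process_batch_async(tasks, progress_callback=progress)
    print(f"Collected {len(results)} results")

asyncio.run(main())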
Process the results:

for result in results:
    if result.error:
        print(f"Request failed: {result.task.url} - {result.error}")
    else:
        print(f"Request succeeded: {result.task.url} - status: {result.status}")
        print(f"Response data: {result.data}")
Advanced options:

Control the degree of concurrency with max_concurrency
Throttle throughput with rate_limit
Cache successful responses with cache_dir
Add middlewares for uniform request preprocessing
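
Note that the retry policy lives entirely in the @backoff.on_exception decorator on _execute_single_request (exponential backoff, up to 3 tries, and only on the synchronous path). If you need a different policy, adjusting the decorator parameters is the natural place; a sketch using the backoff library's standard options:

@backoff.on_exception(backoff.expo,
                      (requests.exceptions.RequestException,),
                      max_tries=5,   # retry up to 5 times instead of 3
                      max_time=60)   # give up after 60 seconds in total
def _execute_single_request(self, task: RequestTask) -> ResponseResult:
    ...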
This system is well suited to scenarios that need to issue large numbers of API requests efficiently, such as data collection and microservice calls, and it can significantly improve the throughput and reliability of network requests.
