Google GenAI Processors streamlines AI development, making it faster and more reliable to build complex multimodal applications. Key points:
1. The ProcessorPart data structure standardizes multimodal data handling
2. Asynchronous stream processing and bidirectional stream control enable efficient computation
3. A unified Processor interface provides powerful composition capabilities
By Zhu Rong
Edited by Guo Jia
from dataclasses import dataclass
from typing import Any, AsyncIterator, Dict, List

@dataclass
class ProcessorPart:
    content: ProcessorContent                      # actual data payload
    metadata: Dict[str, Any]                       # metadata dictionary
    mime_type: str = "application/octet-stream"    # MIME type identifier
    timestamp: float = 0.0                         # timestamp
    sequence_id: str = ""                          # sequence identifier
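As a quick, hedged illustration of how a part might be constructed from these fields (TextContent is the content wrapper used in the later examples; its exact constructor signature is an assumption):

import time

part = ProcessorPart(
    content=TextContent("Hello, world"),   # assumed content wrapper from later examples
    metadata={"user_id": "123"},
    mime_type="text/plain",
    timestamp=time.time(),
    sequence_id="seq-0001",
)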
async def process_stream(input_stream: AsyncIterator[ProcessorPart]) -> AsyncIterator[ProcessorPart]:
    async for part in input_stream:
        # Process each part as it arrives
        processed_part = await transform_part(part)
        yield processed_part
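transform_part is referenced above but not defined; a minimal placeholder sketch, assuming it simply passes the content through and tags the metadata, might look like this:

async def transform_part(part: ProcessorPart) -> ProcessorPart:
    # Hypothetical pass-through transform that only annotates the metadata
    return ProcessorPart(
        content=part.content,
        metadata={**part.metadata, "processed": True},
        mime_type=part.mime_type,
        timestamp=part.timestamp,
        sequence_id=part.sequence_id,
    )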
class BidirectionalProcessor:
    async def process(self, input_stream, feedback_stream):
        # Consume input and feedback in lockstep; zip() does not work on
        # async iterators, so advance the feedback stream manually.
        feedback_iter = feedback_stream.__aiter__()
        async for input_part in input_stream:
            feedback_part = await feedback_iter.__anext__()
            result = await self.handle_with_feedback(input_part, feedback_part)
            yield result
from abc import ABC, abstractmethod

class Processor(ABC):
    @abstractmethod
    async def process(self, input_stream: AsyncIterator[ProcessorPart]) -> AsyncIterator[ProcessorPart]:
        pass

    def __call__(self, input_stream):
        return self.process(input_stream)
# Compose a processing chain
audio_processor = AudioTranscriber()
text_processor = TextAnalyzer()
response_generator = ResponseGenerator()

# Chain the processors
async def process_audio_input(audio_stream):
    transcribed = audio_processor(audio_stream)
    analyzed = text_processor(transcribed)
    responses = response_generator(analyzed)
    return responses
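A hedged sketch of how such a chain might be driven, assuming audio_stream is an async iterator of ProcessorPart and run_pipeline is a hypothetical caller:

async def run_pipeline(audio_stream):
    responses = await process_audio_input(audio_stream)
    async for response_part in responses:
        # Each part carries the generated response plus accumulated metadata
        print(response_part.content, response_part.metadata)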
1. Synchronous text processing
from genai_processors.models import GeminiTextProcessor

text_processor = GeminiTextProcessor(
    model_name="gemini-pro",
    api_key="your-api-key",
    temperature=0.7,
    max_tokens=1000
)

async def process_text_query(query: str):
    input_part = ProcessorPart(
        content=TextContent(query),
        metadata={"user_id": "123", "session_id": "abc"}
    )
    async for response_part in text_processor(async_iter([input_part])):
        return response_part.content.text
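The async_iter helper used above is not defined in the snippet; a minimal sketch, wrapping a plain iterable so it can be consumed as an async stream, plus a hypothetical driver:

import asyncio
from typing import AsyncIterator, Iterable, TypeVar

T = TypeVar("T")

async def async_iter(items: Iterable[T]) -> AsyncIterator[T]:
    # Expose a plain iterable as an async iterator
    for item in items:
        yield item

# Hypothetical driver
# print(asyncio.run(process_text_query("Summarize this document")))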
2. Streaming with the Live API
from genai_processors.models import GeminiLiveProcessor

live_processor = GeminiLiveProcessor(
    model_name="gemini-live",
    api_key="your-api-key",
    streaming=True,
    real_time_factor=1.0
)

async def handle_live_audio(audio_stream):
    async for audio_chunk in audio_stream:
        input_part = ProcessorPart(
            content=AudioContent(audio_chunk),
            metadata={"format": "wav", "sample_rate": 16000}
        )
        async for response in live_processor(async_iter([input_part])):
            if response.content.type == "audio":
                yield response.content.audio_data
            elif response.content.type == "text":
                print(f"Transcription: {response.content.text}")
import asyncio
import aiohttp

class AsyncModelProcessor:
    async def process_batch(self, inputs: List[ProcessorPart]):
        # Process multiple inputs concurrently
        tasks = [self.process_single(input_part) for input_part in inputs]
        results = await asyncio.gather(*tasks)
        return results

    async def process_single(self, input_part: ProcessorPart):
        # Asynchronous API call
        async with aiohttp.ClientSession() as session:
            response = await session.post(self.api_endpoint, json=input_part.to_dict())
            return ProcessorPart.from_response(await response.json())
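to_dict and from_response are assumed helpers on ProcessorPart that this example relies on; one hedged sketch of what they might do, continuing the ProcessorPart sketch from earlier:

class ProcessorPart:
    # ...fields as defined earlier...

    def to_dict(self) -> Dict[str, Any]:
        # Hypothetical: serialize the part into a JSON-friendly request body
        return {
            "content": str(self.content),
            "metadata": self.metadata,
            "mime_type": self.mime_type,
        }

    @classmethod
    def from_response(cls, payload: Dict[str, Any]) -> "ProcessorPart":
        # Hypothetical: rebuild a part from a JSON response
        return cls(
            content=payload.get("content"),
            metadata=payload.get("metadata", {}),
            mime_type=payload.get("mime_type", "text/plain"),
            timestamp=payload.get("timestamp", 0.0),
            sequence_id=payload.get("sequence_id", ""),
        )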
The + operator composes input sources and processing steps, creating a clear data flow:

from genai_processors.core import audio_io, live_model, video
# Input processor: combines camera streams and audio streams
input_processor = video.VideoIn() + audio_io.PyAudioIn(...)
# Output processor: plays the audio parts. Handles interruptions and pauses
# audio output when the user is speaking.
play_output = audio_io.PyAudioOut(...)
# Gemini Live API processor
live_processor = live_model.LiveProcessor(...)
# Compose the agent: mic+camera -> Gemini Live API -> play audio
live_agent = input_processor + live_processor + play_output
async for part in live_agent(streams.endless_stream()):
    # Process the output parts (e.g., print transcription, model output, metadata)
    print(part)
class BatchProcessor:
    def __init__(self, batch_size: int = 32, max_concurrency: int = 10):
        self.batch_size = batch_size
        self.semaphore = asyncio.Semaphore(max_concurrency)

    async def process_batch(self, input_stream):
        batch = []
        async for item in input_stream:
            batch.append(item)
            if len(batch) >= self.batch_size:
                async with self.semaphore:
                    results = await self.process_batch_items(batch)
                    for result in results:
                        yield result
                batch = []

        if batch:
            async with self.semaphore:
                results = await self.process_batch_items(batch)
                for result in results:
                    yield result
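process_batch_items is not shown above; a hedged placeholder that could be added to the class, fanning out the items of one batch concurrently (process_item is a hypothetical per-item hook):

    async def process_batch_items(self, batch):
        # Hypothetical: process the items of a single batch concurrently
        return await asyncio.gather(*(self.process_item(item) for item in batch))

    async def process_item(self, item):
        # Placeholder per-item work; replace with a real model or API call
        return item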
import numpy as np

class CustomAudioProcessor(Processor):
    def __init__(self, model_path: str, config: Dict[str, Any]):
        self.model = load_model(model_path)
        self.config = config

    async def process(self, input_stream: AsyncIterator[ProcessorPart]) -> AsyncIterator[ProcessorPart]:
        async for part in input_stream:
            # Validate the input type
            if not isinstance(part.content, AudioContent):
                raise ValueError(f"Expected AudioContent, got {type(part.content)}")

            audio_data = await self.preprocess_audio(part.content.audio_data)
            result = await self.model.predict(audio_data)

            # Build the output ProcessorPart
            output_part = ProcessorPart(
                content=TextContent(result.transcription),
                metadata={
                    **part.metadata,
                    'confidence': result.confidence,
                    'processing_time': result.processing_time
                }
            )
            yield output_part

    async def preprocess_audio(self, audio_data: bytes) -> np.ndarray:
        # Audio preprocessing
        audio_array = np.frombuffer(audio_data, dtype=np.int16)
        # Normalize to [-1, 1]
        audio_array = audio_array.astype(np.float32) / 32768.0
        # Resample to the target rate if one is configured
        if self.config.get('target_sample_rate'):
            audio_array = resample(audio_array, self.config['target_sample_rate'])
        return audio_array
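A hedged usage sketch for the custom processor; the model path, config values, and the load_model/resample helpers are assumptions carried over from the snippet above:

async def transcribe(audio_parts):
    processor = CustomAudioProcessor(
        model_path="/models/asr.onnx",            # hypothetical model path
        config={"target_sample_rate": 16000},
    )
    async for text_part in processor(audio_parts):
        print(text_part.content.text, text_part.metadata.get("confidence"))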
Advanced usage of PartProcessor
For scenarios that need finer-grained control, you can use PartProcessor:
class AdvancedPartProcessor(PartProcessor):
    async def process_part(self, part: ProcessorPart) -> AsyncIterator[ProcessorPart]:
        # Check whether the payload needs to be split
        if part.content.size > self.max_chunk_size:
            # Split into smaller chunks
            chunks = await self.split_content(part.content)
            for i, chunk in enumerate(chunks):
                chunk_part = ProcessorPart(
                    content=chunk,
                    metadata={
                        **part.metadata,
                        'chunk_index': i,
                        'total_chunks': len(chunks)
                    }
                )
                processed_chunk = await self.process_chunk(chunk_part)
                yield processed_chunk
        else:
            # Small payloads are processed directly
            result = await self.process_single_part(part)
            yield result
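split_content, process_chunk, and process_single_part are left to the implementer above; a hedged sketch of split_content alone, assuming the content object exposes its raw bytes as .data and can be rebuilt from a byte slice, could be added to the class:

    async def split_content(self, content):
        # Hypothetical: slice the raw payload into max_chunk_size pieces
        data = content.data
        return [
            type(content)(data[i:i + self.max_chunk_size])
            for i in range(0, len(data), self.max_chunk_size)
        ]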