支持私有化部署
AI知识库

53AI知识库

学习大模型的前沿技术与行业应用场景


让复杂 AI 应用构建就像搭积木:Spring AI Alibaba Graph 使用指南与源码解读

发布日期:2025-07-14 19:43:34 浏览次数: 1525
作者:阿里云云原生

微信搜一搜,关注“阿里云云原生”

推荐语

Spring AI Alibaba Graph让构建复杂AI应用变得像搭积木一样简单,深度集成Java生态并提供丰富预置组件,大幅降低开发门槛。

核心内容:
1. 框架核心架构与设计理念解析
2. 15+预定义节点类型与多种Agent模式详解
3. 从快速入门到生产级特性的完整开发指南

杨芳贤
53AI创始人/腾讯云(TVP)最具价值专家

作者:罗天,怀玉,刘宏宇,刘军


目录




Cloud Native


1. 引言与概述
2. 核心架构与设计理念
3. 核心概念深度解析
4. 预定义组件与工具箱
5. 高级特性与扩展能力
6. 快速开始与实战指南


1. 引言与概述

Cloud Native


1.1 Spring AI Alibaba Graph 概述


Spring AI Alibaba Graph 是社区核心实现之一,也是整个框架在设计理念上区别于 Spring AI 只做底层原子抽象的地方,Spring AI Alibaba 期望帮助开发者更容易的构建智能体应用。基于 Graph 开发者可以构建工作流、多智能体应用。


Spring AI Alibaba Graph 在设计理念上借鉴 LangGraph,社区在此基础上增加了大量预置 Node、简化了 State 定义过程等,让开发者更容易编写对等低代码平台的工作流、多智能体等。


1.2 核心特性与优势


相比传统的 AI 应用开发方式,Spring AI Alibaba Graph 具有以下核心优势:


Java 生态深度集成


  • Spring 原生支持:完整的依赖注入、配置管理、监控观测。
  • 高并发处理:Java 天然的多线程优势,支持高并发场景。


丰富的预置组件


  • 15+ 预定义节点类型:QuestionClassifierNode、LlmNode、ToolNode、KnowledgeRetrievalNode 等。
  • 多种 Agent 模式:内置 React、Reflection、Supervisor 等智能体模式。
  • 简化的 State 管理:统一的状态定义和合并策略。


声明式 API 设计


  • 类似 LangGraph 的 API:Java 开发者更容易上手。
  • 链式调用:简洁的流式 API,代码更加优雅。
  • 条件分支:支持复杂的条件逻辑和并行处理。


生产级特性


  • 观测性支持:完整的指标收集、链路追踪。
  • 容错机制:支持检查点、状态恢复、错误处。
  • 人机协作:Human-in-the-loop 支持,支持修改状态、恢复执行。


快速开始:客户评价分类系统

Cloud Native


让我们通过一个具体示例了解 Spring AI Alibaba Graph 的使用方式。这个示例展示了如何构建一个客户评价分类系统:


系统架构



核心代码实现


@Configurationpublic class CustomerServiceWorkflow {
    @Bean    public StateGraph customerServiceGraph(ChatModel chatModel) {        ChatClient chatClient = ChatClient.builder(chatModel)            .defaultAdvisors(new SimpleLoggerAdvisor())            .build();
        // 评价分类器 - 区分正面/负面评价        QuestionClassifierNode feedbackClassifier = QuestionClassifierNode.builder()            .chatClient(chatClient)            .inputTextKey("input")            .outputKey("classifier_output")            .categories(List.of("positive feedback""negative feedback"))            .build();
        // 问题细分器 - 对负面评价进行细分        QuestionClassifierNode specificQuestionClassifier = QuestionClassifierNode.builder()            .chatClient(chatClient)            .inputTextKey("input")            .outputKey("classifier_output")            .categories(List.of("after-sale service""transportation""product quality""others"))            .build();
        // 状态工厂定义 - 简化的状态管理        KeyStrategyFactory stateFactory = () -> {            Map<String, KeyStrategy> strategies = new HashMap<>();            strategies.put("input"new ReplaceStrategy());            strategies.put("classifier_output"new ReplaceStrategy());            strategies.put("solution"new ReplaceStrategy());            return strategies;        };
        // 构建工作流 - 声明式API        return new StateGraph("客户服务评价处理", stateFactory)            .addNode("feedback_classifier", node_async(feedbackClassifier))            .addNode("specific_question_classifier", node_async(specificQuestionClassifier))            .addNode("recorder", node_async(new RecordingNode()))            .addEdge(START, "feedback_classifier")            .addConditionalEdges("feedback_classifier",                edge_async(new FeedbackQuestionDispatcher()),                Map.of("positive""recorder""negative""specific_question_classifier"))            .addEdge("recorder", END);    }}


以上代码只展示了图结构(StateGraph)的构建,具体的代码实现你可以关注 spring-ai-alibaba-example 仓库:spring-ai-alibaba-example【1】


这个示例展示了 Spring AI Alibaba Graph 的核心特性:


  • 预置组件:使用 QuestionClassifierNode 快速实现分类功能。
  • 简化状态管理:通过 KeyStrategyFactory 统一管理状态。
  • 声明式 API:链式调用构建复杂工作流。
  • Spring Boot 集成:通过 @Configuration 和 @Bean 完成依赖注入。


2. 核心架构与设计理念

Cloud Native


2.1 整体数据流转架构


Spring AI Alibaba Graph 采用工作流模型,整个框架的数据流转遵循"构建→编译→执行"的三阶段模式:


2.1.1 完整数据流转图



2.1.2 核心执行流程详解


数据流转的核心理念:整个框架围绕 OverAllState 这个数据载体进行流转,每个节点都是状态的转换器,通过 AsyncNodeGenerator 这个状态机来驱动整个流程的执行。



2.1.3 关键数据结构流转


StateGraph → CompiledGraph 转换



AsyncNodeGenerator 执行机制



2.2 整体架构设计


基于上述数据流转机制,Spring AI Alibaba Graph 的整体架构设计具有以下特点:


  • 清晰的执行流程:每个节点代表一个处理步骤,边表示数据流向。
  • 灵活的条件分支:支持根据状态动态选择执行路径。
  • 并行处理能力:多个节点可以并行执行,提高处理效率。
  • 状态可追溯:完整的状态变化历史,便于调试和监控。


架构核心理念:Spring AI Alibaba Graph 将复杂的 AI 任务分解为可组合的原子操作,每个节点专注于单一职责,通过状态驱动的方式实现节点间的协调。这种设计让开发者可以像搭积木一样构建复杂的 AI 应用,既保证了系统的可维护性,又提供了足够的灵活性。


2.2.1 系统架构总览



2.2.2 StateGraph 构建流程


StateGraph 是工作流的蓝图设计器,它负责定义整个工作流的结构和执行逻辑,就像建筑师绘制建筑图纸一样。通过声明式的 API,开发者可以轻松定义节点、边和状态管理策略,最终编译成可执行的 CompiledGraph。



关键设计思想:StateGraph 采用了"先定义后执行"的模式,将工作流的结构定义与实际执行分离,这样可以在编译时进行各种验证和优化,确保运行时的稳定性和高效性。


2.2.3 CompiledGraph 执行流程


CompiledGraph 是工作流的运行时引擎,它将 StateGraph 的静态定义转换为可执行的动态流程。就像将建筑图纸变成真正的建筑物一样,CompiledGraph 负责协调各个组件的执行,管理状态流转,确保整个工作流按照预期运行。


AsyncNodeGenerator 是整个图流转执行的唯一状态机,它控制着工作流的每一步执行,包括节点调度、状态更新、条件判断和异常处理。这种单一状态机的设计确保了执行的一致性和可预测性。



核心执行机制:CompiledGraph 采用了基于迭代器模式的异步执行机制,每次调用 next() 方法都会推进工作流的执行,这种设计既支持同步调用,也支持流式处理,为不同的使用场景提供了灵活性。


2.3 核心组件关系图


组件职责说明


  • StateGraph:工作流的架构师,负责定义整个流程的结构和规则。
  • CompiledGraph:工作流的指挥官,负责协调和管理整个执行过程。
  • OverAllState:工作流的记忆中心,负责存储和管理所有状态数据。
  • Node:工作流的执行单元,每个节点专注于特定的业务逻辑。
  • Edge:工作流的连接器,定义节点之间的转换关系和条件。
  • AsyncNodeGenerator:工作流的执行引擎,是推动整个流程运转的核心状态机。


2.4 核心设计理念


2.4.1 声明式编程模型


借鉴 LangGraph 的设计理念,Spring AI Alibaba Graph 采用声明式编程模型,开发者只需要描述"做什么":


// 声明式定义工作流StateGraph graph = new StateGraph("客户服务工作流", stateFactory)    .addNode("feedback_classifier"node_async(feedbackClassifier))    .addNode("specific_question_classifier"node_async(specificQuestionClassifier))    .addNode("recorder"node_async(recorderNode))    .addEdge(START"feedback_classifier")    .addConditionalEdges("feedback_classifier"        edge_async(new FeedbackQuestionDispatcher()),        Map.of("positive""recorder""negative""specific_question_classifier"))    .addEdge("recorder"END);

2.4.2 状态驱动的执行模型



所有的数据流转都通过 OverAllState 进行管理,确保状态的一致性和可追溯性:


// 状态工厂定义KeyStrategyFactory stateFactory = () -> {    Map<StringKeyStrategy> strategies = new HashMap<>();    strategies.put("input"new ReplaceStrategy());    strategies.put("classifier_output"new ReplaceStrategy());    strategies.put("solution"new ReplaceStrategy());    return strategies;};


2.4.3 异步优先的设计


框架优先支持异步处理,提高系统的吞吐量和响应性,同时还原生支持了节点内模型流式透传



// 异步节点定义AsyncNodeAction asyncNode = node_async(new CustomNodeAction());
// 并行节点处理public class ParallelNode extends Node {    record AsyncParallelNodeAction(        List<AsyncNodeActionWithConfig> actions,        Map<StringKeyStrategy> channels    ) implements AsyncNodeActionWithConfig {
        @Override        public CompletableFuture<Map<StringObject>> apply(OverAllState state, RunnableConfig config) {            var futures = actions.stream()                .map(action -> action.apply(state, config))                .toArray(CompletableFuture[]::new);
            return CompletableFuture.allOf(futures)                .thenApply(v -> {                    // 合并所有结果                    Map<StringObject> result = new HashMap<>();                    for (CompletableFuture<Map<StringObject>> future : futures) {                        result.putAll(future.join());                    }                    return result;                });        }    }}

2.5 Spring 生态集成


Spring AI Alibaba Graph 与 Spring 生态深度集成,你可以轻松在你的 Spring 应用中引入 AI 模型工作流以开发智能 Java 应用。


2.5.1 依赖注入架构



2.5.2 依赖注入支持


以下代码演示了 Spring AI Alibaba Graph 是如何被 IOC 容器所管理的。


@Configurationpublic class GraphConfiguration {
    @Bean    public StateGraph workflowGraph(ChatModel chatModel) {        ChatClient chatClient = ChatClient.builder(chatModel)            .defaultAdvisors(new SimpleLoggerAdvisor())            .build();
        // 构建图定义...        return stateGraph;    }
    @Bean    public CompiledGraph compiledGraph(StateGraph stateGraph,                                       ObservationRegistry observationRegistry) {        return stateGraph.compile(CompileConfig.builder()            .withLifecycleListener(new GraphObservationLifecycleListener(observationRegistry))            .build());    }}


2.5.3 观测性集成


Spring AI Alibaba Graph 基于 Micrometer 内置了可观测支持,可以无缝集成 Spring Boot 可观测性。


@RestControllerpublic class GraphController {
    public GraphController(@Qualifier("workflowGraph") StateGraph stateGraph,                          ObjectProvider<ObservationRegistry> observationRegistry) {        this.compiledGraph = stateGraph.compile(CompileConfig.builder()            .withLifecycleListener(new GraphObservationLifecycleListener(                observationRegistry.getIfUnique(() -> ObservationRegistry.NOOP)))            .build());    }}


3. 核心概念深度解析

Cloud Native


3.1 StateGraph (状态图)


StateGraph 是整个框架的设计蓝图,它就像建筑师的设计图纸一样,定义了工作流的完整结构和执行逻辑。StateGraph 采用声明式 API,让开发者可以用简洁的代码描述复杂的业务流程,而不需要关心底层的执行细节。


核心设计理念:StateGraph 将复杂的工作流抽象为节点和边的组合,每个节点代表一个具体的操作,边定义了操作之间的流转关系。这种抽象让开发者可以专注于业务逻辑的设计,而不是执行机制的实现。


3.1.1 StateGraph 生命周期



3.1.2 基本构造


public class StateGraph {    // 核心数据结构    final Nodes nodes = new Nodes();  // 存储所有节点    final Edges edges = new Edges();  // 存储所有边
    // 特殊节点常量    public static final String END = "__END__";    public static final String START = "__START__";    public static final String ERROR = "__ERROR__";
    // 状态管理    private KeyStrategyFactory keyStrategyFactory;    private PlainTextStateSerializer stateSerializer;}

3.1.3 节点管理流程



支持的节点添加方式:


// 添加普通节点public StateGraph addNode(String id, AsyncNodeAction action) {    Node node = new Node(id, (config) -> AsyncNodeActionWithConfig.of(action));    return addNode(id, node);}
// 添加带配置的节点public StateGraph addNode(String id, AsyncNodeActionWithConfig actionWithConfig) {    Node node = new Node(id, (config) -> actionWithConfig);    return addNode(id, node);}
// 添加子图节点public StateGraph addNode(String id, StateGraph subGraph) {    subGraph.validateGraph(); // 先验证子图    var node = new SubStateGraphNode(id, subGraph);    return addNode(id, node);}


3.1.4 边管理流程



3.1.5 图验证机制



3.2 OverAllState (全局状态)


OverAllState 是工作流的数据中枢,它就像工作流的记忆系统一样,负责在各个节点之间传递和管理状态数据。OverAllState 不仅存储数据,还定义了数据的合并策略,确保不同节点产生的数据能够正确地整合在一起。


设计巧思:OverAllState 采用了策略模式来处理状态更新,不同的数据类型可以采用不同的合并策略(如替换、追加、合并等),这种设计让状态管理变得非常灵活,能够适应各种复杂的业务场景。


3.2.1 状态管理架构



3.2.2 状态更新流程



3.2.3 状态策略详解


策略模式架构



内置策略实现


// 替换策略 - 新值覆盖旧值public class ReplaceStrategy implements KeyStrategy {    @Override    public Object apply(Object oldValue, Object newValue) {        return newValue;    }}
// 追加策略 - 新值追加到列表,支持复杂的列表操作public class AppendStrategy implements KeyStrategy {    @Override    public Object apply(Object oldValue, Object newValue) {        if (newValue == null) {            return oldValue;        }
        // 处理Optional类型        if (oldValue instanceof Optional<?> oldValueOptional) {            oldValue = oldValueOptional.orElse(null);        }
        boolean oldValueIsList = oldValue instanceof List<?>;
        // 处理移除操作        if (oldValueIsList && newValue instanceof AppenderChannel.RemoveIdentifier<?>) {            var result = new ArrayList<>((List<Object>) oldValue);            removeFromList(result, (AppenderChannel.RemoveIdentifier) newValue);            return unmodifiableList(result);        }
        // 处理新值为集合的情况        List<Object> list = null;        if (newValue instanceof List) {            list = new ArrayList<>((List<?>) newValue);        } else if (newValue.getClass().isArray()) {            list = Arrays.asList((Object[]) newValue);        } else if (newValue instanceof Collection) {            list = new ArrayList<>((Collection<?>) newValue);        }
        // 合并逻辑        if (oldValueIsList) {            List<Object> oldList = (List<Object>) oldValue;            if (list != null) {                if (list.isEmpty()) {                    return oldValue;                }                // 合并并去重                var result = evaluateRemoval(oldList, list);                return Stream.concat(result.oldValues().stream(), result.newValues().stream())                    .distinct()                    .collect(Collectors.toList());            } else {                oldList.add(newValue);            }            return oldList;        } else {            ArrayList<Object> arrayResult = new ArrayList<>();            if (list != null) {                arrayResult.addAll(list);            } else {                arrayResult.add(newValue);            }            return arrayResult;        }    }}


自定义策略示例


// 自定义Map合并策略public class MapMergeStrategy implements KeyStrategy {    @Override    public Object apply(Object oldValue, Object newValue) {        if (oldValue instanceof Map && newValue instanceof Map) {            Map<StringObject> merged = new HashMap<>((Map) oldValue);            merged.putAll((Map) newValue);            return merged;        }        return newValue; // 默认替换    }}
// 自定义字符串连接策略public class StringConcatStrategy implements KeyStrategy {    private final String separator;
    public StringConcatStrategy(String separator) {        this.separator = separator;    }
    @Override    public Object apply(Object oldValue, Object newValue) {        if (oldValue instanceof String && newValue instanceof String) {            return oldValue + separator + newValue;        }        return newValue;    }}


策略工厂模式


public class StrategyFactory {
    public static KeyStrategyFactory createDefaultFactory() {        return () -> {            Map<StringKeyStrategy> strategies = new HashMap<>();            strategies.put("messages"new AppendStrategy());            strategies.put("input"new ReplaceStrategy());            strategies.put("output"new ReplaceStrategy());            return strategies;        };    }
    public static KeyStrategyFactory createCustomFactory(Map<String, KeyStrategy> customStrategies) {        return () -> {            Map<StringKeyStrategy> strategies = new HashMap<>();            // 添加默认策略            strategies.put("messages"new AppendStrategy());            strategies.put("input"new ReplaceStrategy());            // 覆盖自定义策略            strategies.putAll(customStrategies);            return strategies;        };    }}

3.3 Node (节点)


Node 是工作流的功能模块,每个节点就像一个专门的工作站,负责执行特定的业务逻辑。Node 的设计遵循单一职责原则,每个节点只关注一件事情,这样既保证了代码的可维护性,也提高了节点的可复用性。


执行特性:Node 支持同步和异步两种执行模式,还支持并行执行多个子任务。这种灵活的执行机制让 Node 既能处理简单的数据转换,也能处理复杂的外部服务调用,满足各种性能要求。


3.3.1 节点执行流程



3.3.2 节点类型层次结构



3.3.3 并行节点处理机制



3.4 Edge (边)


Edge 是工作流的路由器,它决定了数据在节点之间的流转路径。Edge 不仅仅是简单的连接线,它还包含了复杂的条件判断逻辑,能够根据当前状态动态决定下一步的执行路径。


智能路由:Edge 支持静态路由和动态路由两种模式。静态边提供固定的转换路径,而条件边则可以根据状态内容进行智能判断,这种设计让工作流具备了强大的条件分支能力,能够处理各种复杂的业务逻辑。


3.4.1 边的类型与结构



3.4.2 条件边路由流程



3.4.3 边验证机制


public class Edge {    public void validate(Nodes nodes) throws GraphStateException {        // 验证源节点存在        if (!nodes.anyMatchById(sourceId)) {            throw Errors.missingNodeInEdgeMapping.exception(sourceId);        }
        // 验证目标节点        for (EdgeValue target : targets()) {            if (target.id() != null) {                // 静态边:直接验证目标节点                if (!nodes.anyMatchById(target.id()) && !END.equals(target.id())) {                    throw Errors.missingNodeInEdgeMapping.exception(target.id());                }            } else if (target.value() != null) {                // 条件边:验证映射中的所有目标节点                for (String targetNodeId : target.value().mappings().values()) {                    if (!nodes.anyMatchById(targetNodeId) && !END.equals(targetNodeId)) {                        throw Errors.missingNodeInEdgeMapping.exception(targetNodeId);                    }                }            }        }    }}


3.5 CompiledGraph (编译图)


CompiledGraph 是工作流的执行引擎,它将 StateGraph 的静态定义转换为高效的运行时代码。就像将高级语言编译成机器码一样,CompiledGraph 对工作流进行了各种优化,包括节点预处理、边路由优化、状态管理策略等。


运行时优化:CompiledGraph 在编译过程中会进行多种优化,如节点依赖分析、并行执行规划、状态访问优化等,这些优化确保了工作流在运行时的高效性和稳定性。


3.5.1 编译过程详解



3.5.2 AsyncNodeGenerator 执行机制


AsyncNodeGenerator 是工作流执行的核心状态机,它负责推动整个工作流的运行。AsyncNodeGenerator 采用了基于迭代器的设计模式,每次调用 next() 方法都会执行一个步骤,这种设计既支持同步执行,也支持异步流式处理。


执行控制:AsyncNodeGenerator 内置了完善的执行控制机制,包括最大迭代次数检查、中断条件处理、错误恢复等,确保工作流在各种情况下都能稳定运行。



3.5.3 状态流转核心逻辑


public class AsyncNodeGenerator<Output extends NodeOutputimplements AsyncGenerator<Output> {
    @Override    public Data<Output> next() {        try {            // 1. 检查最大迭代次数            if (++iteration > maxIterations) {                return Data.error(new IllegalStateException(                    format("Maximum number of iterations (%d) reached!", maxIterations)));            }
            // 2. 检查是否结束            if (nextNodeId == null && currentNodeId == null) {                return releaseThread().map(Data::<Output>done)                    .orElseGet(() -> Data.done(currentState));            }
            // 3. 处理START节点            if (START.equals(currentNodeId)) {                doListeners(START, null);                var nextNodeCommand = getEntryPoint(currentState, config);                nextNodeId = nextNodeCommand.gotoNode();                currentState = nextNodeCommand.update();
                var cp = addCheckpoint(config, START, currentState, nextNodeId);                var output = (cp.isPresent() && config.streamMode() == StreamMode.SNAPSHOTS)                    ? buildStateSnapshot(cp.get()) : buildNodeOutput(currentNodeId);
                currentNodeId = nextNodeId;                return Data.of(output);            }
            // 4. 处理END节点            if (END.equals(nextNodeId)) {                nextNodeId = null;                currentNodeId = null;                doListeners(END, null);                return Data.of(buildNodeOutput(END));            }
            // 5. 检查中断条件            if (shouldInterruptAfter(currentNodeId, nextNodeId)) {                return Data.done(currentNodeId);            }            if (shouldInterruptBefore(nextNodeId, currentNodeId)) {                return Data.done(currentNodeId);            }
            // 6. 执行节点            currentNodeId = nextNodeId;            var action = nodes.get(currentNodeId);            return Data.of(evaluateAction(action, overAllState));
        } catch (Exception e) {            return Data.error(e);        }    }}


4. 预定义组件与工具箱

Cloud Native


4.1 预定义节点类型


Spring AI Alibaba Graph 提供了丰富的预定义节点工具箱,这些节点就像乐高积木一样,开发者可以通过组合这些预定义节点快速构建复杂的 AI 应用。每个预定义节点都经过了精心设计和优化,不仅功能强大,而且易于使用。


设计理念:预定义节点的设计遵循了"开箱即用"的原则,开发者只需要提供必要的配置参数,就能立即使用这些节点的强大功能,大大降低了 AI 应用的开发门槛。


4.1.1 节点分类架构



4.1.2 QuestionClassifierNode - 智能分类节点


QuestionClassifierNode 是工作流的智能分拣员,它能够理解文本内容并将其归类到预定义的类别中。这个节点内置了少样本学习机制,即使没有大量训练数据,也能实现准确的分类效果。


核心优势:QuestionClassifierNode 采用了提示工程的最佳实践,通过精心设计的提示词模板和少样本示例,让大语言模型能够准确理解分类任务的要求,实现高质量的文本分类。



应用场景:QuestionClassifierNode 特别适合客服系统的问题分类、内容审核的类型判断、邮件的自动分拣等场景,能够显著提高业务处理的自动化程度。


QuestionClassifierNode classifier = QuestionClassifierNode.builder()    .chatClient(chatClient)    .inputTextKey("input")    .outputKey("classifier_output")    .categories(List.of("positive feedback""negative feedback"))    .classificationInstructions(List.of(        "Try to understand the user's feeling when giving feedback."    ))    .build();


核心实现原理:


@Overridepublic Map<StringObjectapply(OverAllState state) throws Exception {    // 1. 从状态获取输入文本    if (StringUtils.hasLength(inputTextKey)) {        this.inputText = (String) state.value(inputTextKey).orElse(this.inputText);    }
    // 2. 构建少样本学习消息    List<Message> messages = new ArrayList<>();    messages.add(new UserMessage(QUESTION_CLASSIFIER_USER_PROMPT_1));    messages.add(new AssistantMessage(QUESTION_CLASSIFIER_ASSISTANT_PROMPT_1));    messages.add(new UserMessage(QUESTION_CLASSIFIER_USER_PROMPT_2));    messages.add(new AssistantMessage(QUESTION_CLASSIFIER_ASSISTANT_PROMPT_2));
    // 3. 调用大模型进行分类    ChatResponse response = chatClient.prompt()        .system(systemPromptTemplate.render(Map.of(            "inputText", inputText,             "categories", categories,            "classificationInstructions", classificationInstructions)))        .user(inputText)        .messages(messages)        .call()        .chatResponse();
    // 4. 返回分类结果    Map<StringObject> updatedState = new HashMap<>();    updatedState.put(outputKey, response.getResult().getOutput().getText());    return updatedState;}


4.1.3 LlmNode - 大模型调用节点


LlmNode 是工作流的智能大脑,它封装了与大语言模型的所有交互逻辑,让开发者可以轻松地在工作流中使用 AI 的强大能力。LlmNode 不仅支持简单的文本生成,还支持复杂的对话管理和流式输出。


智能特性:LlmNode 内置了提示词模板引擎,支持动态参数替换,还能管理完整的对话历史,这些特性让它能够处理各种复杂的 AI 交互场景。



流式处理优势:LlmNode 原生支持流式输出,这意味着用户可以实时看到 AI 的生成过程,而不需要等待完整的响应,大大提升了用户体验。


LlmNode llmNode = LlmNode.builder()    .chatClient(chatClient)    .systemPromptTemplate("You are a helpful assistant.")    .userPromptTemplate("Please process: {input}")    .messagesKey("messages")    .outputKey("llm_response")    .stream(true)  // 启用流式输出    .build();


核心特性:


  • 模板支持:支持系统提示词和用户提示词模板。
  • 消息历史:支持消息历史管理。
  • 流式输出:原生支持流式处理。
  • 参数渲染:支持动态参数替换。


4.1.4 ToolNode - 工具调用节点


ToolNode 是工作流的万能工具箱,它让 AI 能够调用外部工具和 API,极大地扩展了 AI 的能力边界。ToolNode 不仅能执行单个工具调用,还能并行执行多个工具,显著提高了处理效率。


核心价值:ToolNode 将 AI 从纯文本生成扩展到了实际的行动能力,让 AI 能够查询数据库、调用 API、执行计算等,真正实现了 AI Agent 的概念。



灵活性设计:ToolNode 支持各种类型的工具调用,从简单的函数调用到复杂的 API 集成,都能轻松处理,这种灵活性让 AI 应用能够适应各种业务场景。


ToolNode toolNode = ToolNode.builder()    .toolCallbacks(toolCallbacks)    .llmResponseKey("llm_response")    .outputKey("tool_response")    .build();


执行机制:


@Overridepublic Map<StringObjectapply(OverAllState state) throws Exception {    // 1. 获取助手消息(包含工具调用)    this.assistantMessage = (AssistantMessage) state.value(this.llmResponseKey)        .orElseGet(() -> {            List<Message> messages = (List<Message>) state.value("messages").orElseThrow();            return messages.get(messages.size() - 1);        });
    // 2. 执行工具调用    ToolResponseMessage toolResponseMessage = executeFunction(assistantMessage, state);
    // 3. 返回工具响应    Map<StringObject> updatedState = new HashMap<>();    updatedState.put("messages", toolResponseMessage);    if (StringUtils.hasLength(this.outputKey)) {        updatedState.put(this.outputKey, toolResponseMessage);    }    return updatedState;}


4.1.5 KnowledgeRetrievalNode - 知识检索节点


KnowledgeRetrievalNode 是工作流的知识专家,它能够从庞大的知识库中快速找到与问题相关的信息,为 AI 提供准确的背景知识。这个节点结合了向量检索和重排序技术,确保检索结果的准确性和相关性。


技术优势:KnowledgeRetrievalNode 采用了先进的 RAG(检索增强生成)技术,通过向量相似度计算找到相关文档,再通过重排序模型进一步优化结果质量,这种两阶段的设计确保了检索的精准性。



应用价值:KnowledgeRetrievalNode 让 AI 能够基于企业的私有知识库回答问题,这对于构建企业级 AI 助手、智能客服等应用具有重要意义。


KnowledgeRetrievalNode retrievalNode = KnowledgeRetrievalNode.builder()    .vectorStore(vectorStore)    .userPromptKey("query")    .topK(5)    .similarityThreshold(0.7)    .enableRanker(true)    .rerankModel(rerankModel)    .outputKey("retrieved_docs")    .build();


4.2 预定义 Agent 类型


4.2.1 ReactAgent - 反应式 Agent


ReactAgent 是工作流的智能决策者,它实现了经典的 ReAct(Reasoning and Acting)模式,能够根据当前情况动态决定是否需要调用工具。ReactAgent 就像一个有经验的助手,知道什么时候需要查找信息,什么时候可以直接回答。


核心思想:ReactAgent 将推理和行动结合在一起,让 AI 不仅能思考,还能行动。这种设计让 AI 具备了解决复杂问题的能力,能够通过多轮推理和工具调用来完成复杂任务。



智能循环:ReactAgent 的执行过程是一个智能循环,每次循环都会评估当前状态,决定下一步行动,这种设计让 AI 能够处理各种复杂和动态的任务场景。


ReactAgent reactAgent = new ReactAgent(    "weatherAgent",    chatClient,    toolCallbacks,    10  // 最大迭代次数);
// 编译并使用CompiledGraph compiledGraph = reactAgent.getAndCompileGraph();


内部图结构构建:


private StateGraph initGraph() throws GraphStateException {    StateGraph graph = new StateGraph(name, this.keyStrategyFactory);
    // 添加核心节点    graph.addNode("llm", node_async(this.llmNode));    graph.addNode("tool", node_async(this.toolNode));
    // 构建执行流程    graph.addEdge(START, "llm")         .addConditionalEdges("llm", edge_async(this::think),             Map.of("continue""tool""end", END))         .addEdge("tool""llm");
    return graph;}
// 决策逻辑private String think(OverAllState state) {    if (iterations > max_iterations) {        return "end";    }
    List<Message> messages = (List<Message>) state.value("messages").orElseThrow();    AssistantMessage message = (AssistantMessage) messages.get(messages.size() - 1);
    // 检查是否有工具调用    return message.hasToolCalls() ? "continue" : "end";}


4.2.2 ReflectAgent - 反思 Agent


ReflectAgent 是工作流的质量监督者,它实现了反思模式,能够对自己的输出进行评估和改进。ReflectAgent 就像一个严格的编辑,会反复检查和修改内容,直到达到满意的质量标准。


自我改进机制:ReflectAgent 采用了双节点协作的设计,一个节点负责生成内容,另一个节点负责评估质量,通过多轮迭代不断提升输出质量。这种设计让 AI 具备了自我完善的能力。



质量保证:ReflectAgent 特别适合对输出质量要求较高的场景,如文档写作、代码生成、创意内容等,通过反思机制确保最终输出的质量。


ReflectAgent reflectAgent = ReflectAgent.builder()    .graph(assistantGraphNode)      // 生成节点    .reflection(judgeGraphNode)     // 评判节点    .maxIterations(3)    .build();


执行流程详解:


public StateGraph createReflectionGraph(NodeAction graph, NodeAction reflection, int maxIterations) {    StateGraph stateGraph = new StateGraph(() -> {        HashMap<StringKeyStrategy> keyStrategyHashMap = new HashMap<>();        keyStrategyHashMap.put(MESSAGESnew ReplaceStrategy());        keyStrategyHashMap.put(ITERATION_NUMnew ReplaceStrategy());        return keyStrategyHashMap;    })    .addNode(GRAPH_NODE_IDnode_async(graph))    .addNode(REFLECTION_NODE_IDnode_async(reflection))    .addEdge(STARTGRAPH_NODE_ID)    .addConditionalEdges(GRAPH_NODE_IDedge_async(this::graphCount),        Map.of(REFLECTION_NODE_IDREFLECTION_NODE_IDENDEND))    .addConditionalEdges(REFLECTION_NODE_IDedge_async(this::apply),        Map.of(GRAPH_NODE_IDGRAPH_NODE_IDENDEND));
    return stateGraph;}
// 迭代次数检查private String graphCount(OverAllState state) {    int iterationNum = state.value(ITERATION_NUMInteger.class).orElse(0);    state.updateState(Map.of(ITERATION_NUM, iterationNum + 1));
    return iterationNum >= maxIterations ? END : REFLECTION_NODE_ID;}
// 消息类型检查private String apply(OverAllState state) {    List<Message> messages = state.value(MESSAGESList.class).orElse(new ArrayList<>());    if (messages.isEmpty()) return END;
    Message lastMessage = messages.get(messages.size() - 1);    return lastMessage instanceof UserMessage ? GRAPH_NODE_ID : END;}


4.2.3 ReactAgentWithHuman - 人机协作 Agent


ReactAgentWithHuman 是工作流的人机协作专家,它在 ReactAgent 的基础上增加了人工干预能力,让 AI 和人类能够协作完成复杂任务。这种设计特别适合需要人工审核、决策确认或专业判断的场景。


协作机制:ReactAgentWithHuman 内置了完善的中断和恢复机制,当遇到需要人工干预的情况时,系统会自动暂停执行,等待人工处理,然后无缝恢复执行。这种设计让人机协作变得自然而流畅。



人机协作实现:


private StateGraph initGraph() throws GraphStateException {    StateGraph graph = new StateGraph(name, keyStrategyFactory)        .addNode("agent", node_async(this.llmNode))        .addNode("human", node_async(this.humanNode))        .addNode("tool", node_async(this.toolNode))        .addEdge(START, "agent")        .addEdge("agent""human")        .addConditionalEdges("human", edge_async(humanNode::think),            Map.of("agent""agent""tool""tool""end", END))        .addEdge("tool""agent");
    return graph;}
// HumanNode的决策逻辑public String think(OverAllState state) {    // 检查是否需要中断    if (shouldInterruptFunc != null && shouldInterruptFunc.apply(state)) {        // 设置中断消息,等待人工处理        state.setInterruptMessage("等待人工审批");        return "human_interrupt";    }
    // 检查是否需要工具调用    List<Message> messages = (List<Message>) state.value("messages").orElse(new ArrayList<>());    if (!messages.isEmpty()) {        Message lastMessage = messages.get(messages.size() - 1);        if (lastMessage instanceof AssistantMessage &&             ((AssistantMessage) lastMessage).hasToolCalls()) {            return "tool";        }    }
    return "agent";}


5. 高级特性与扩展能力

Cloud Native


5.1 可观测性


Spring AI Alibaba Graph 提供了企业级的全链路观测能力,基于 OpenTelemetry 和 Micrometer 标准,实现了从 Graph 执行到模型调用的完整追踪。


5.1.1 核心特性


  • 全链路可观测:实时追踪每个节点的输入、输出和状态变化。
  • 流式数据采集:支持异步、并行、流式节点的观测。
  • 异常溯源:快速定位异常节点和数据。
  • 多平台支持:兼容 Langfuse、Jaeger、Zipkin、Prometheus 等主流平台。


5.1.2 快速接入


使用观测性 Starter


<dependency>    <groupId>com.alibaba.cloud.ai</groupId>    <artifactId>spring-ai-alibaba-starter-graph-observation</artifactId>    <version>${spring-ai-alibaba.version}</version></dependency>
@Beanpublic CompiledGraph compiledGraph(StateGraph observabilityGraph,                                   CompileConfig observationCompileConfig) throws GraphStateException {    return observabilityGraph.compile(observationCompileConfig);}


5.1.3 详细文档


关于 Spring AI Alibaba Graph 观测性的完整架构设计、实现原理、配置方式、最佳实践等详细内容,请参考官方观测性文档:


📚 Graph 观测性完整指南

Spring AI Alibaba Graph 观测性设计与实现【2】


该文档涵盖:


  • 观测性设计理念与架构
  • 并行与流式观测实现
  • 多平台集成配置
  • Langfuse 等可视化平台使用
  • 最佳实践与扩展建议


🔗 完整示例代码

graph-observability-langfuse【3】


5.2 并行节点与流式处理


5.2.1 并行节点的两种创建方式


Spring AI Alibaba Graph 提供了两种创建并行节点的方式,这两种方式在底层实现上有所不同,但都能实现并行处理的效果。


方式一:直接创建 ParallelNode

直接创建一个 ParallelNode 实例,并将其注册到 StateGraph 中:


// 创建并行任务列表List<AsyncNodeActionWithConfig> parallelActions = List.of(    node_async(new DataProcessingNode1()),    node_async(new DataProcessingNode2()),    node_async(new DataProcessingNode3()));
// 定义状态合并策略Map<StringKeyStrategy> channels = Map.of(    "results"new AppendStrategy(),    "metadata"new ReplaceStrategy());
// 创建并行节点ParallelNode parallelNode = new ParallelNode(    "data_processing",           // 节点内部ID      parallelActions,            // 并行任务列表    channels                    // KeyStrategy映射);
// 添加到StateGraphstateGraph.addNode("parallel_tasks", parallelNode);


方式二:通过 StateGraph 描述并行边

这是更常用的方式,通过添加多个指向相同目标的边来定义并行结构:


StateGraph workflow = new StateGraph(keyStrategyFactory)    .addNode("source", node_async(sourceNode))    .addNode("task1", node_async(task1Node))    .addNode("task2", node_async(task2Node))    .addNode("task3", node_async(task3Node))    .addNode("merger", node_async(mergerNode))
    // 创建并行分支 - 从source到多个任务    .addEdge("source""task1")    .addEdge("source""task2")    .addEdge("source""task3")
    // 汇聚到merger节点    .addEdge("task1""merger")    .addEdge("task2""merger")    .addEdge("task3""merger")
    .addEdge(START, "source")    .addEdge("merger", END);


编译时转换机制


当 StateGraph 编译时,框架会自动检测并行边模式,并在内部创建 ParallelNode:


// CompiledGraph编译过程中的处理逻辑if (targets.size() > 1) {    // 检测到并行边,获取所有并行目标节点的Action    var actions = parallelNodeStream.get()        .map(target -> nodes.get(target.id()))        .toList();
    // 自动创建ParallelNode    var parallelNode = new ParallelNode(e.sourceId(), actions, keyStrategyMap);
    // 替换原有节点和边的映射    nodes.put(parallelNode.id(), parallelNode.actionFactory().apply(compileConfig));    edges.put(e.sourceId(), new EdgeValue(parallelNode.id()));}


5.2.2 并行节点的内部执行机制


ParallelNode 的核心实现基于 CompletableFuture.allOf(),实现真正的并行执行:


public class ParallelNode extends Node {
    record AsyncParallelNodeAction(        List<AsyncNodeActionWithConfig> actions,        Map<StringKeyStrategy> channels    ) implements AsyncNodeActionWithConfig {
        @Override        public CompletableFuture<Map<StringObject>> apply(OverAllState state, RunnableConfig config) {            Map<StringObject> partialMergedStates = new HashMap<>();            Map<StringObject> asyncGenerators = new HashMap<>();
            // 并行执行所有Action            var futures = actions.stream()                .map(action -> action.apply(state, config)                    .thenApply(partialState -> {                        // 分离普通结果和AsyncGenerator                        partialState.forEach((key, value) -> {                            if (value instanceof AsyncGenerator<?> || value instanceof GeneratorSubscriber) {                                ((List) asyncGenerators.computeIfAbsent(key, k -> new ArrayList<>())).add(value);                            } else {                                partialMergedStates.put(key, value);                            }                        });                        // 立即更新状态                        state.updateState(partialMergedStates);                        return action;                    }))                .toList()                .toArray(new CompletableFuture[0]);
            // 等待所有任务完成            return CompletableFuture.allOf(futures)                .thenApply((p) -> CollectionUtils.isEmpty(asyncGenerators)                     ? state.data()                     : asyncGenerators);        }    }}


5.2.3 并行流式处理的合并机制


核心挑战:当多个并行分支都产生流式输出时,如何将这些异步流合并成统一的输出流?


Spring AI Alibaba Graph 通过 AsyncGeneratorUtils.createMergedGenerator 框架内核中解决了这个复杂问题:



5.2.4 MergedGenerator 核心实现


AsyncGeneratorUtils.createMergedGenerator 是框架内核的核心算法,实现了多个异步流的智能合并:


public static <T> AsyncGenerator<T> createMergedGenerator(    List<AsyncGenerator<T>> generators,    Map<String, KeyStrategy> keyStrategyMap) {
    return new AsyncGenerator<>() {        // 使用StampedLock优化并发性能        private final StampedLock lock = new StampedLock();        private AtomicInteger pollCounter = new AtomicInteger(0);        private Map<StringObject> mergedResult = new HashMap<>();        private final List<AsyncGenerator<T>> activeGenerators = new CopyOnWriteArrayList<>(generators);
        @Override        public AsyncGenerator.Data<T> next() {            while (true) {                // 乐观读锁快速检查                long stamp = lock.tryOptimisticRead();                boolean empty = activeGenerators.isEmpty();                if (!lock.validate(stamp)) {                    stamp = lock.readLock();                    try {                        empty = activeGenerators.isEmpty();                    } finally {                        lock.unlockRead(stamp);                    }                }                if (empty) {                    return AsyncGenerator.Data.done(mergedResult);                }
                // 轮询策略选择Generator                final AsyncGenerator<T> current;                long writeStamp = lock.writeLock();                try {                    final int size = activeGenerators.size();                    if (size == 0return AsyncGenerator.Data.done(mergedResult);
                    int currentIdx = pollCounter.updateAndGet(i -> (i + 1) % size);                    current = activeGenerators.get(currentIdx);                } finally {                    lock.unlockWrite(writeStamp);                }
                // 在无锁状态下执行Generator                AsyncGenerator.Data<T> data = current.next();
                // 处理结果并更新状态                writeStamp = lock.writeLock();                try {                    if (!activeGenerators.contains(current)) {                        continue;                    }
                    if (data.isDone() || data.isError()) {                        handleCompletedGenerator(current, data);                        if (activeGenerators.isEmpty()) {                            return AsyncGenerator.Data.done(mergedResult);                        }                        continue;                    }
                    handleCompletedGenerator(current, data);                    return data;                } finally {                    lock.unlockWrite(writeStamp);                }            }        }
        private void handleCompletedGenerator(AsyncGenerator<T> generator, AsyncGenerator.Data<T> data) {            // 移除完成的Generator            if (data.isDone() || data.isError()) {                activeGenerators.remove(generator);            }
            // 使用KeyStrategy合并结果            data.resultValue().ifPresent(result -> {                if (result instanceof Map) {                    Map<StringObject> mapResult = (Map<StringObject>) result;                    mergedResult = OverAllState.updateState(mergedResult, mapResult, keyStrategyMap);                }            });        }    };}


核心算法特点


  • 轮询机制:通过 pollCounter 实现公平的轮询调度。
  • StampedLock 优化:使用乐观读锁提高并发性能。
  • 状态合并:通过 KeyStrategy 实现灵活的状态合并策略。
  • 线程安全:CopyOnWriteArrayList 确保并发访问的安全性。


5.2.5 流式输出配置


@RestController@RequestMapping("/stream")public class StreamingController {
    private final CompiledGraph compiledGraph;
    @GetMapping(value = "/process", produces = MediaType.TEXT_EVENT_STREAM_VALUE)    public Flux<ServerSentEvent<String>> processStream(@RequestParam String input) {        return Flux.create(sink -> {            try {                AsyncGenerator<NodeOutput> generator = compiledGraph.stream(                    Map.of("input", input),                    RunnableConfig.builder()                        .threadId(UUID.randomUUID().toString())                        .build()                );
                generator.forEachAsync(output -> {                    if (output instanceof StreamingOutput) {                        StreamingOutput streamingOutput = (StreamingOutput) output;                        String chunk = streamingOutput.chunk().toString();                        sink.next(ServerSentEvent.builder(chunk).build());                    }                }).thenRun(() -> {                    sink.complete();                }).exceptionally(throwable -> {                    sink.error(throwable);                    return null;                });
            } catch (Exception e) {                sink.error(e);            }        });    }}


5.3 子图节点


子图节点是工作流的模块化组件,它允许将复杂的工作流分解为可重用的子模块。子图节点就像函数调用一样,可以在主工作流中调用预定义的子工作流,实现代码复用和模块化设计。


5.3.1 子图节点类型


Spring AI Alibaba Graph 支持两种类型的子图节点:


SubStateGraphNode - 未编译子图节点

public class SubStateGraphNode extends Node {    private final StateGraph subGraph;
    public SubStateGraphNode(String id, StateGraph subGraph) {        super(id, (config) -> {            // 在运行时编译子图            CompiledGraph compiledSubGraph = subGraph.compile(config);            return new SubGraphAction(compiledSubGraph);        });        this.subGraph = subGraph;    }}

SubCompiledGraphNode - 预编译子图节点

public class SubCompiledGraphNode extends Node {    private final CompiledGraph subGraph;
    public SubCompiledGraphNode(String id, CompiledGraph subGraph) {        super(id, (config) -> new SubGraphAction(subGraph));        this.subGraph = subGraph;    }}

5.3.2 子图定义与使用


定义文档处理子图


public class DocumentProcessingSubGraph {
    public static StateGraph createDocumentProcessingGraph(ChatModel chatModel) {        ChatClient chatClient = ChatClient.builder(chatModel).build();
        // 文档提取节点        DocumentExtractorNode extractorNode = new DocumentExtractorNode(            "document_path""extracted_text", List.of("pdf""docx""txt")        );
        // 文档分析节点        LlmNode analysisNode = LlmNode.builder()            .chatClient(chatClient)            .systemPromptTemplate("你是一个文档分析专家,请分析文档内容并提取关键信息。")            .userPromptTemplate("请分析以下文档内容:\n{extracted_text}")            .outputKey("analysis_result")            .build();
        KeyStrategyFactory stateFactory = () -> {            Map<String, KeyStrategy> strategies = new HashMap<>();            strategies.put("document_path"new ReplaceStrategy());            strategies.put("extracted_text"new ReplaceStrategy());            strategies.put("analysis_result"new ReplaceStrategy());            return strategies;        };
        return new StateGraph("文档处理子图", stateFactory)            .addNode("extractor", node_async(extractorNode))            .addNode("analyzer", node_async(analysisNode))            .addEdge(START, "extractor")            .addEdge("extractor""analyzer")            .addEdge("analyzer", END);    }}


在主工作流中使用子图


@Configurationpublic class MainWorkflowConfiguration {
    @Bean    public StateGraph mainWorkflow(ChatModel chatModel) {        // 创建子图        StateGraph documentProcessingSubGraph = DocumentProcessingSubGraph            .createDocumentProcessingGraph(chatModel);
        // 创建其他节点        QuestionClassifierNode classifierNode = QuestionClassifierNode.builder()            .chatClient(ChatClient.builder(chatModel).build())            .inputTextKey("input")            .outputKey("classifier_output")            .categories(List.of("document_processing""general_question"))            .build();
        LlmNode generalAnswerNode = LlmNode.builder()            .chatClient(ChatClient.builder(chatModel).build())            .systemPromptTemplate("你是一个通用助手,请回答用户的问题。")            .userPromptTemplate("用户问题:{input}")            .outputKey("general_answer")            .build();
        KeyStrategyFactory stateFactory = () -> {            Map<String, KeyStrategy> strategies = new HashMap<>();            strategies.put("input"new ReplaceStrategy());            strategies.put("classifier_output"new ReplaceStrategy());            strategies.put("document_path"new ReplaceStrategy());            strategies.put("extracted_text"new ReplaceStrategy());            strategies.put("analysis_result"new ReplaceStrategy());            strategies.put("general_answer"new ReplaceStrategy());            return strategies;        };
        return new StateGraph("主工作流", stateFactory)            .addNode("classifier", node_async(classifierNode))            .addNode("document_processor", documentProcessingSubGraph)  // 添加子图            .addNode("general_answer", node_async(generalAnswerNode))            .addEdge(START, "classifier")            .addConditionalEdges("classifier"                edge_async(new ClassifierDispatcher()),                Map.of("document_processing""document_processor"                       "general_question""general_answer"))            .addEdge("document_processor", END)            .addEdge("general_answer", END);    }}


5.3.3 子图执行流程



5.3.4 子图状态管理


状态隔离与传递


public class SubGraphAction implements AsyncNodeActionWithConfig {    private final CompiledGraph subGraph;
    @Override    public CompletableFuture<Map<StringObject>> apply(OverAllState state, RunnableConfig config) {        return CompletableFuture.supplyAsync(() -> {            try {                // 从主状态中提取子图需要的数据                Map<StringObject> subGraphInput = extractSubGraphInput(state);
                // 执行子图                Optional<OverAllState> subGraphResult = subGraph.invoke(subGraphInput, config);
                // 将子图结果映射回主状态                return mapSubGraphOutput(subGraphResult.orElse(null));
            } catch (Exception e) {                throw new RuntimeException("子图执行失败", e);            }        });    }
    private Map<StringObjectextractSubGraphInput(OverAllState state) {        Map<StringObject> input = new HashMap<>();        // 根据子图的输入需求提取数据        state.value("document_path").ifPresent(value -> input.put("document_path", value));        state.value("input").ifPresent(value -> input.put("input", value));        return input;    }
    private Map<StringObjectmapSubGraphOutput(OverAllState subGraphState) {        Map<StringObject> output = new HashMap<>();        if (subGraphState != null) {            // 将子图的输出映射到主状态            subGraphState.value("analysis_result").ifPresent(value ->                 output.put("analysis_result", value));            subGraphState.value("extracted_text").ifPresent(value ->                 output.put("extracted_text", value));        }        return output;    }}


5.4 中断与恢复机制


中断与恢复机制是工作流的容错保障,它让工作流能够在遇到需要人工干预或外部条件不满足时优雅地暂停执行,并在条件满足后无缝恢复。这种机制对于构建可靠的企业级 AI 应用至关重要。


5.4.1 中断机制原理



5.4.2 中断条件配置


InterruptBefore - 节点执行前中断


@Configurationpublic class InterruptConfiguration {
    @Bean    public CompiledGraph interruptableGraph(StateGraph stateGraph) {        return stateGraph.compile(CompileConfig.builder()            .withInterruptBefore("human_approval")  // 在human_approval节点前中断            .build());    }}


InterruptAfter - 节点执行后中断


@Beanpublic CompiledGraph interruptableGraph(StateGraph stateGraph) {    return stateGraph.compile(CompileConfig.builder()        .withInterruptAfter("data_processing")  // 在data_processing节点后中断        .build());}


动态中断条件


public class DynamicInterruptNode implements AsyncNodeActionWithConfig {
    @Override    public CompletableFuture<Map<StringObject>> apply(OverAllState state, RunnableConfig config) {        return CompletableFuture.supplyAsync(() -> {            // 检查是否需要中断            if (shouldInterrupt(state)) {                // 设置中断消息                state.setInterruptMessage("需要人工审批,请检查数据质量");
                Map<StringObject> result = new HashMap<>();                result.put("interrupt_reason""data_quality_check");                result.put("requires_approval"true);                return result;            }
            // 正常处理逻辑            return processData(state);        });    }
    private boolean shouldInterrupt(OverAllState state) {        // 自定义中断条件逻辑        Double confidence = (Double) state.value("confidence_score").orElse(1.0);        return confidence < 0.8;  // 置信度低于80%时中断    }}


5.4.3 状态快照管理


内存快照存储


@Componentpublic class MemorySnapshotManager {
    private final Map<StringOverAllState> snapshots = new ConcurrentHashMap<>();
    public String saveSnapshot(OverAllState state) {        String snapshotId = UUID.randomUUID().toString();        snapshots.put(snapshotId, state.snapShot().orElse(state));        return snapshotId;    }
    public OverAllState loadSnapshot(String snapshotId) {        OverAllState snapshot = snapshots.get(snapshotId);        if (snapshot == null) {            throw new IllegalArgumentException("快照不存在: " + snapshotId);        }        return snapshot;    }
    public void removeSnapshot(String snapshotId) {        snapshots.remove(snapshotId);    }}


持久化快照存储


@Componentpublic class PersistentSnapshotManager {
    private final RedisTemplate<StringString> redisTemplate;    private final ObjectMapper objectMapper;
    public String saveSnapshot(OverAllState state) {        try {            String snapshotId = UUID.randomUUID().toString();            String serializedState = objectMapper.writeValueAsString(state);
            redisTemplate.opsForValue().set(                "snapshot:" + snapshotId,                 serializedState,                 Duration.ofHours(24)  // 24小时过期            );
            return snapshotId;        } catch (Exception e) {            throw new RuntimeException("保存快照失败", e);        }    }
    public OverAllState loadSnapshot(String snapshotId) {        try {            String serializedState = redisTemplate.opsForValue().get("snapshot:" + snapshotId);            if (serializedState == null) {                throw new IllegalArgumentException("快照不存在: " + snapshotId);            }
            return objectMapper.readValue(serializedState, OverAllState.class);        } catch (Exception e) {            throw new RuntimeException("加载快照失败", e);        }    }}


6. 快速开始与实战指南

Cloud Native


6.1 环境准备


6.1.1 依赖配置


在你的 Spring Boot 项目中添加 Spring AI Alibaba Graph 依赖:


<properties>    <spring-ai-alibaba.version>1.0.0.3-SNAPSHOT</spring-ai-alibaba.version></properties><dependencies>    <dependency>        <groupId>com.alibaba.cloud.ai</groupId>        <artifactId>spring-ai-alibaba-starter-dashscope</artifactId>        <version>${spring-ai-alibaba.version}</version>    </dependency>    <dependency>        <groupId>com.alibaba.cloud.ai</groupId>        <artifactId>spring-ai-alibaba-graph-core</artifactId>        <version>${spring-ai-alibaba.version}</version>    </dependency>    <dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-web</artifactId>    </dependency></dependencies>

6.2 快速开始流程


6.2.1 创建第一个工作流


@Configurationpublic class MyFirstGraphConfiguration {
    @Bean    public StateGraph myFirstGraph(ChatModel chatModel) {        // 1. 创建ChatClient        ChatClient chatClient = ChatClient.builder(chatModel).build();
        // 2. 定义节点        LlmNode welcomeNode = LlmNode.builder()            .chatClient(chatClient)            .systemPromptTemplate("你是一个友好的助手")            .userPromptTemplate("欢迎用户:{input}")            .outputKey("welcome_message")            .build();
        // 3. 定义状态策略        KeyStrategyFactory stateFactory = () -> {            Map<StringKeyStrategy> strategies = new HashMap<>();            strategies.put("input"new ReplaceStrategy());            strategies.put("welcome_message"new ReplaceStrategy());            return strategies;        };
        // 4. 构建工作流        return new StateGraph("我的第一个工作流", stateFactory)            .addNode("welcome"node_async(welcomeNode))            .addEdge(START"welcome")            .addEdge("welcome"END);    }
    @Bean    public CompiledGraph compiledGraph(StateGraph myFirstGraph) {        return myFirstGraph.compile();    }}


6.2.2 使用工作流


@RestControllerpublic class GraphController {
    private final CompiledGraph compiledGraph;
    @PostMapping("/chat")    public ResponseEntity<Map<StringObject>> chat(@RequestBody String input) {        Optional<OverAllState> result = compiledGraph.invoke(Map.of("input", input));        return ResponseEntity.ok(result.map(OverAllState::data).orElse(Map.of()));    }}


6.3 完整示例项目


为了帮助开发者更好地理解和使用 Spring AI Alibaba Graph,我们提供了完整的示例项目:


📚 官方示例仓库

spring-ai-alibaba-graph-example【1】


快速体验步骤


1. 克隆仓库

git clone https://github.com/springaialibaba/spring-ai-alibaba-examples.gitcd spring-ai-alibaba-examples/spring-ai-alibaba-graph-example

2. 配置环境

# 设置DashScope API Keyexport AI_DASHSCOPE_API_KEY=your_api_key_here

3. 运行示例

mvn spring-boot:run

6.4 社区支持


技术支持


  • GitHub Issues
    提交问题和建议【4】
  • 官方文档
    完整文档站点【5】
  • 示例代码
    更多示例【6】


通过以上指南和完整的示例项目,你可以快速掌握 Spring AI Alibaba Graph 的使用方法,并在实际项目中高效地构建智能化应用。


【1】spring-ai-alibaba-example 仓库

https://github.com/springaialibaba/spring-ai-alibaba-examples/tree/main/spring-ai-alibaba-graph-example


【2】Spring AI Alibaba Graph 观测性设计与实现

https://www.yuque.com/disaster-4qc4i/xhs01z/qrh6lv7m3sexgvr4


【3】完整示例代码

https://github.com/springaialibaba/spring-ai-alibaba-examples/tree/main/spring-ai-alibaba-graph-example/graph-observability-langfuse


【4】提交问题和建议

https://github.com/alibaba/spring-ai-alibaba/issues


【5】完整文档站点

https://java2ai.com/


【6】更多示例

https://github.com/springaialibaba/spring-ai-alibaba-examples


53AI,企业落地大模型首选服务商

产品:场景落地咨询+大模型应用平台+行业解决方案

承诺:免费场景POC验证,效果验证后签署服务协议。零风险落地应用大模型,已交付160+中大型企业

联系我们

售前咨询
186 6662 7370
预约演示
185 8882 0121

微信扫码

添加专属顾问

回到顶部

加载中...

扫码咨询