技术咨询、项目合作、广告投放、简历咨询、技术文档下载 点击这里 联系博主

# LangGraph 使用案例

# 普通的langgraph案例

  graphBuilder.addNode("node1", node1);
  graphBuilder.addNode("node2", node2);
  graphBuilder.addNode("node3", node3Action);
  graphBuilder.addEdge(START, "node1");
  graphBuilder.addEdge(START, "node2");
  graphBuilder.addEdge(START, "node3");
  graphBuilder.addEdge("node1", END);
  graphBuilder.addEdge("node2", END);
  graphBuilder.addEdge("node3", END);

# 条件分支

addConditionalEdges(source, routingFunction, pathMap?): this
  • source: 源节点
  • routingFunction: 路由函数,根据返回值决定流向
  • pathMap: 可选,用于明确 routingFunction 每个返回值的流向,不指定则默认增加一个 end 的边

    pathMap 可以是一个 string map 或者 string[],如果是 string map,则表示node 的 map 结构,否则如果是一个数组类型,代表当前条件路由允许到达的节点。

# 条件edge 的langgraph案例

如果没有通过 pathMap 明确 routingFunction 每个返回值的流向,LG 会默认为条件分支增加一个 end 的边。

  // 如果没有通过 pathMap 明确 routingFunction 每个返回值的流向,LG 会默认为条件分支增加一个 __end__ 的边。
  const condition = (state: typeof TestStateAnnotation.State) => {
    // 一个模拟条件,假设存在a了就走node2,否则走node3
    if (state.list.includes("a")) {
      return "node2";
    } else {
      return "node3";
    }
  };

  graphBuilder.addNode("node1", node1);
  graphBuilder.addNode("node2", node2);
  graphBuilder.addNode("node3", node3Action);

  graphBuilder.addEdge(START, "node1");
  graphBuilder.addConditionalEdges("node1", condition, {
    node2: "node2",
    node3: "node3"
  });
  graphBuilder.addEdge("node2", END);
  graphBuilder.addEdge("node3", END);

# 条件edge + 并行节点 的langgraph案例

如果没有通过 pathMap 明确 routingFunction 每个返回值的流向,LG 会默认为条件分支增加一个 end 的边。

  const condition = (state: typeof TestStateAnnotation.State) => {
    if (state.list.includes("a")) {
      return END;
    } else {
      return ["node2", "node3"];
    }
  };

  graphBuilder.addNode("node1", node1);
  graphBuilder.addNode("node2", node2);
  graphBuilder.addNode("node3", node3Action);

  graphBuilder.addEdge(START, "node1");
  graphBuilder.addConditionalEdges("node1", condition);
  graphBuilder.addEdge("node2", END);
  graphBuilder.addEdge("node3", END);

# 并行、循环节点

在 前面的代码 中,我们可以基于 addConditionalEdges() 方法,辅助我们实现并行任务的能力,我们已经实现了简单的并行任务

  const condition = (state: typeof TestStateAnnotation.State) => {
    if (state.list.includes("a")) {
      return END;
    } else {
      return ["node2", "node3"];
    }
  };

但是此时我们只能固定 node2 和 node3 一起执行,无法动态的复用多个节点,而且这个过程也不是可循环的(这里循环是指多次执行同一个节点)。

# 利用 Send 实现并行

Send 在 LG 中是一个 Class,我们可以实例化,并且构造函数很简单:

  • node:要路由到哪个节点
  • args:路由到节点时,传递给节点的参数是什么(可以传入整个 state 也可以传入独立的参数)

我们想要实现的效果:

  • 输入:["langgraph"]
  • 输出:{"langgraph": ['l', 'a', 'n', 'g', 'g', 'r', 'a', 'p', 'h']}

整体思路:动态创建 N 个 edge,路由到【字符串处理】(如下面的Node1)这个节点上,并行执行,然后收集结果;(并行)

我们观察流程图,虽然只有一条线 start -> node1 -> end , 但是我们知道这个过程中 node1 会并行的执行 N 次。

  const nodeAction = (str: string) => {
        // 因为 node1 每次返回的 State 类型是是 { output : Record<string, string[]> },可以通过我们的 reducer 直接合并成最终的结果
    return {
      output: { [str]: Array.from(str) }
    };
  };
  const condition = (state: typeof TestStateAnnotation.State) => {
    const { inputList } = state;
    // 创建多个节点
    return inputList.map(item => new Send("node1", item));
  };

  graphBuilder.addNode("node1", nodeAction);
  graphBuilder.addConditionalEdges(START, condition, ["node1"]);
  graphBuilder.addEdge("node1", END);

# 利用 Send 实现循环

我们想要实现的效果:

  • 输入:["langgraph"]
  • 输出:{"langgraph": ['l', 'a', 'n', 'g', 'g', 'r', 'a', 'p', 'h']}

整体思路:非并行循环:我们再输入 state 之后,将 state 传递到节点,节点自己循环自己,最终完成任务,输出结果;(非并行,无法利用现有的 node,因为现有的 node 只接受 string 入参)

流程中循环的实现其实就是一个节点执行完成后,重新回到某个起点,然后通过某个条件判断是否要结束,转到另外一个节点

注意

  • Send 不允许直接指向 end 节点
  • Send 不允许直接指向 start 节点
  const nodeAction = (str: string) => {
    // 并且因为 node1 每次返回的 State 类型是是 { output : Record<string, string[]> },可以通过我们的 reducer 直接合并成最终的结果。
    return {
      output: { [str]: Array.from(str) }
    };
  };
  const loopNode = (state: typeof TestStateAnnotation.State) => {
    return state;
  };
  const condition = (state: typeof TestStateAnnotation.State) => {
    const { inputList } = state;
    const first = inputList.shift();
    if (!first) {
      return END;
    }
  
    return new Send('node1', first);
  };

  graphBuilder
  .addNode('node1', nodeAction)
  .addNode('loop', loopNode)
  .addEdge(START, 'loop')
  .addConditionalEdges('loop', condition, ['node1', END])
  .addEdge('node1', 'loop');

# State 的修改

我们声明的 State 会在流程的各个节点流转,每个节点都可以进行 state 更改,我们通过 reducer 决定数据更改的方式,默认的 state 会通过覆盖的方式不断被修改。

# State 的合并

当业务应用复杂的时候,我们可能会有多个 Annotation.Root 声明的 State,或者我们希望复用某些 State Annotation,LG 支持我们通过state 的 spec 属性进行合并。 下面的示例中,我们新增了一个 AnotherStateAnnotation 然后我们将前面的 StateAnnotation 和 AnotherStateAnnotation 合并成一个新的 mergedStateAnnotation。


const AnotherStateAnnotation = Annotation.Root({
  title: Annotation<string>,
});

const mergedStateAnnotation = Annotation.Root({
  ...StateAnnotation.spec,
  ...AnotherStateAnnotation.spec,
});

# State 自动忽略

通过 Annotation.Root 声明了 State 之后,得到三个属性(channel):name、list、age 意味着我们可以在流程中修改这3个属性的值,因为每个属性是一个 channel,因此当 state 输入到节点中时:

  1. 如果节点只修改了其中某个属性,其他的属性会保持不变,不会丢失
  2. 如果节点修改了没有在 Annotation.Root 中声明的属性,则不会生效

参考下面案例:

const StateAnnotation = Annotation.Root({
  name: Annotation<string>,
  list: Annotation<Record<string, string[]>>({
    reducer: (current, updated) => ({ ...current, ...updated }),
  }),
  age: Annotation<number>({
    default: () => 1,
    reducer: current => current + 1,
  }),
});

const graphBuilder = new StateGraph(StateAnnotation);

const nodeAction = () => {
  return {
    name: 'node1',
    title: 'not existed channel',
  };
};

graphBuilder.addNode('node1', nodeAction).addEdge(START, 'node1').addEdge('node1', END);

const graph = await graphBuilder.compile();

const res = await graph.invoke({ list: ['xxx'] });

  1. list 和 age我们没有在 node1 节点中返回,么有处理,因此保持不变;
  2. name 是 node1 赋值的
  3. title 虽然 node1 返回了,但是我们没有声明这个 channel,因此结果中也不会存在这个属性

# 输出输出使用不同State

const InAnnotation = Annotation.Root({
  question: Annotation<string>,
});

const OutAnnotation = Annotation.Root({
  answer: Annotation<string>,
});
// 使用不同的 State
const graphBuilder = new StateGraph({
  input: InAnnotation,
  output: OutAnnotation,
});

const nodeAction = (state: typeof InAnnotation.State) => {
  this.logger.info('state', state);
  return {
    answer: 'my name is zhanwen li',
    duration: 3000,
    question: state.question,
  };
};

graphBuilder.addNode('node1', nodeAction).addEdge(START, 'node1').addEdge('node1', END);

const graph = await graphBuilder.compile();

const res = await graph.invoke({ question: "what's your name?", user: 'zhanwen' });

上面的代码输出为 {"answer": 'my name is zhanwen li'}

# 总结

LangGraph 流程的 state 设计中,所有的 state 属性都会变成一个 channel,任何节点都能通过 channel 修改 state 的值。 一个节点只能读取通过 Annotation.Root 声明过的 state channel,如果某个 channel 没有声明,则 node 节点无法直接读取值,但仍然可以修改它。

# 使用 ToolNode + Agent 实现简单的天气查询

ToolNode 是 LangGraph 中一种特殊的节点,用来帮助我们集合 tool calling 实现第三方服务集成,构建 Agent 应用。 在基于 LangGraph 实现的应用中,ToolNode 帮助我们衔接 模型决策与动作执行

  1. 意图识别:ToolNode 会读取 LLM tool calling 后返回的消息(AIMessage),消息中会包含 tool calling 的结论和参数提取; 根据 tool 的描述,决定调用哪个 tool(可能有 N 个tool)
  2. 参数提取:ToolNode 基于提供的工具列表,实际上触发工具的 invoke 方法,并且传入模型提取的参数

分析一下下面的流程:

  1. 我们定义了一个 getWeather 的工具,这个工具会根据输入的 location 返回天气信息;
  2. 使用 OpenAI 的 gpt-3.5-turbo LLM模型,作为agent Node;
  3. 用户 输入 what is the weather in SF
  4. 先经过 agent node ; 得到AI Message; 即获取到定位信息为 San Francisco
  5. 然后经过 tools node; 触发 getWeather 工具,传入参数为 San Francisco;最终得到天气信息为 It's 60 degrees and foggy.
  6. 由于 edge 定义,tool 执行后,下一个为 agent , 将工具的结果返回给 LLM 模型,触发 LLM 模型的第二次调用
  7. 此时 LLM 模型的输入为 It's 60 degrees and foggy. LLM 结合上下文信息,得到最终的结果为 The weather in San Francisco is currently 60 degrees and foggy.
export async function main() {
    // 手动顶一个tool
  const getWeather = tool(
    input => {
      if (["sf", "san francisco"].includes(input.location.toLowerCase())) {
        return "It's 60 degrees and foggy.";
      } else {
        return "It's 90 degrees and sunny.";
      }
    },
    {
      name: "get_weather",
      description: "Call to get the current weather.",
      schema: z.object({
        location: z.string().describe("Location to get the weather for.")
      })
    }
  );

  const tools = [getWeather];
  const modelWithTools = new ChatOpenAI({
    model: "gpt-3.5-turbo",
    temperature: 0,
    maxTokens: 600
  }).bindTools(tools);

  // 工具的集合
  const toolNodeForGraph = new ToolNode(tools);

  const shouldContinue = (state: typeof MessagesAnnotation.State) => {
    const { messages } = state;
    const lastMessage = messages[messages.length - 1];
    // 如果AI Message 里有 tool_calls,就进入 tools 节点 去处理
    if ("tool_calls" in lastMessage && Array.isArray(lastMessage.tool_calls) && lastMessage.tool_calls?.length) {
      return "tools";
    }
    return "__end__";
  };

  const callModel = async (state: typeof MessagesAnnotation.State) => {
    const { messages } = state;
    const response = await modelWithTools.invoke(messages);
    return { messages: response };
  };

  const graph = new StateGraph(MessagesAnnotation)
    .addNode("agent", callModel)
    .addNode("tools", toolNodeForGraph)
    .addEdge("__start__", "agent")
    .addConditionalEdges("agent", shouldContinue)
    .addEdge("tools", "agent")
    .compile();

  const inputs = {
    messages: [{ role: "user", content: "what is the weather in SF?" }]
  };

  const graphImg =( await graph.getGraphAsync())?.drawMermaid()
  console.log('=======merchantid=====start==');
  
  console.log(graphImg);
  
  console.log('=======merchantid=====end==');

  const stream = await graph.stream(inputs, {
    streamMode: "values"
  });

  for await (const { messages } of stream) {
    console.log(messages);
  }
  // Returns the messages in the state at each step of execution
}

下面是输出


  [
    // 第一步:用户输入
    HumanMessage {
      "id": "94b78bd3-42cb-4489-84c9-50bf5766eaa8",
      "content": "what is the weather in SF?",
      "additional_kwargs": {},
      "response_metadata": {}
    },
      // 第二步:基于 LLM 得到返回的 AI Message; 即获取到定位信息为 San Francisco

    AIMessage {
      "id": "chatcmpl-BCmEAklWcybBrHURQsDspoY8uGBqv",
      "content": "",
      "additional_kwargs": {
        "tool_calls": [
          {
            "id": "call_gqkKyGeA2md2fWWKEh6uekpf",
            "type": "function",
            "function": "[Object]"
          }
        ]
      },
      "response_metadata": {
        "tokenUsage": {
          "promptTokens": 61,
          "completionTokens": 16,
          "totalTokens": 77
        },
        "finish_reason": "tool_calls",
        "model_name": "gpt-3.5-turbo-0125"
      },
      "tool_calls": [
        {
          "name": "get_weather",
          "args": {
            "location": "San Francisco"
          },
          "type": "tool_call",
          "id": "call_gqkKyGeA2md2fWWKEh6uekpf"
        }
      ],
      "invalid_tool_calls": [],
      "usage_metadata": {
        "output_tokens": 16,
        "input_tokens": 61,
        "total_tokens": 77,
        "input_token_details": {
          "audio": 0,
          "cache_read": 0
        },
        "output_token_details": {
          "audio": 0,
          "reasoning": 0
        }
      }
    },
        //  第三步:调用 getWeather 工具,获取天气信息
    ToolMessage {
      "id": "78e84fdc-6697-4071-b439-2679bdb4343d",
      "content": "It's 60 degrees and foggy.",
      "name": "get_weather",
      "additional_kwargs": {},
      "response_metadata": {},
      "tool_call_id": "call_gqkKyGeA2md2fWWKEh6uekpf"
    },
        // 第四步:由于edge 定义,tool 执行后,下一个为agenet , 将工具的结果返回给 LLM 模型,触发 LLM 模型的第二次调用
    AIMessage {
      "id": "chatcmpl-BCmEBCY6CzVIME25oFfXR9MTS5Vbl",
      "content": "The weather in San Francisco is currently 60 degrees and foggy.",
      "additional_kwargs": {},
      "response_metadata": {
        "tokenUsage": {
          "promptTokens": 93,
          "completionTokens": 16,
          "totalTokens": 109
        },
        "finish_reason": "stop",
        "model_name": "gpt-3.5-turbo-0125"
      },
      "tool_calls": [],
      "invalid_tool_calls": [],
      "usage_metadata": {
        "output_tokens": 16,
        "input_tokens": 93,
        "total_tokens": 109,
        "input_token_details": {
          "audio": 0,
          "cache_read": 0
        },
        "output_token_details": {
          "audio": 0,
          "reasoning": 0
        }
      }
    }
  ]

# 使用 createReactAgent 实现简单的天气查询

createReactAgent 已经将 ToolNode + LLM 的实现封装成了一个方法,我们只需要传入工具列表,就可以创建一个 Agent 应用。 简单而言:createReactAgent 内置了 ToolNode、StateGraph(Node/Edge/Annotation)、compile

# createReactAgent 源码

https://github.com/langchain-ai/langgraphjs/blob/main/libs/langgraph/src/prebuilt/react_agent_executor.ts#L378

# 实现天气查询

和上面手动编写 ToolNode+Agent 一样的效果和输出;但代码量减少了很多

import { StateGraph, MessagesAnnotation } from "@langchain/langgraph";
import { createReactAgent, ToolNode } from "@langchain/langgraph/prebuilt";
import { tool } from "@langchain/core/tools";
import { z } from "zod";
import { ChatOpenAI } from "@langchain/openai";

export async function main() {
  // 手动顶一个tool
  const getWeather = tool(
    input => {
      if (["sf", "san francisco"].includes(input.location.toLowerCase())) {
        return "It's 60 degrees and foggy.";
      } else {
        return "It's 90 degrees and sunny.";
      }
    },
    {
      name: "get_weather",
      description: "Call to get the current weather.",
      schema: z.object({
        location: z.string().describe("Location to get the weather for.")
      })
    }
  );

  const tools = [getWeather];
  const agent = createReactAgent({
    llm: new ChatOpenAI({
      temperature: 0,
      model: "gpt-3.5-turbo",
      maxTokens: 600
    }),
    tools: tools
  });

  const inputs = {
    messages: [{ role: "user", content: "what is the weather in SF?" }]
  };

  const graph = await agent.getGraphAsync();
  const graphImg = graph?.drawMermaid();
  console.log(graphImg);
  const stream = await agent.stream(inputs, {
    streamMode: "values"
  });

  for await (const { messages } of stream) {
    console.log(messages);
  }
  // Returns the messages in the state at each step of execution
}


【未经作者允许禁止转载】 Last Updated: 4/3/2025, 7:47:20 AM