前面八章把 RAG 的所有核心组件都拼齐了:多格式摄入、模块化检索管道、Qdrant 持久化。但目前只能在 IDE 里跑测试,文档要重启应用才能换。
本章把这些能力整合成一套生产可用的 REST API:上传文件即时入库、按知识库隔离查询、答案带上来源引用。
1. 设计目标
只需要一个 Controller 就能讲清楚要解决的几件事:
- 多知识库隔离:在摄入文档分块时,可以通过添加
namespace字段,实现不同知识库之间的逻辑隔离; - 运行时上传:通过 multipart 端点直接上传文件,不再依赖 classpath 资源
- 幂等更新:同名文件重复上传自动覆盖旧分块,避免污染
- 来源引用:答案返回时带上引用的文档 source,方便用户判断可信度
- 统一异常:参数错误、文档处理失败用 ProblemDetail 标准格式返回
前面那个 demo 性质的 RagController 不再需要,本章用 KnowledgeBaseController 替换。
2. 项目结构
rag-demo/
└── src/main/java/com/albertstack/rag/
├── config/
│ └── AiConfig.java
├── controller/
│ └── KnowledgeBaseController.java # 替换原来的 RagController
├── service/
│ ├── IngestionService.java # 增加 namespace 摄入 + 去重
│ └── RagService.java # 增加 namespace 问答 + 来源提取
├── dto/
│ ├── ChatRequest.java
│ ├── ChatResponse.java
│ └── IngestionResult.java
└── exception/
├── DocumentProcessingException.java
└── GlobalExceptionHandler.java3. DTO 设计
用 Java Record 定义请求和响应,让 Controller 签名一目了然。
src/main/java/com/albertstack/rag/dto/ChatRequest.java
package com.albertstack.rag.dto;
public record ChatRequest(
String question // 用户提问的原始文本,禁止为空或纯空白
) {
// record 的紧凑构造器,反序列化阶段就会被调用,把不合法的请求挡在 Controller 之外
public ChatRequest {
if (question == null || question.isBlank()) {
throw new IllegalArgumentException("问题不能为空");
}
}
}src/main/java/com/albertstack/rag/dto/ChatResponse.java
package com.albertstack.rag.dto;
import java.util.List;
public record ChatResponse(
String answer, // LLM 生成的回答正文
List<String> sources // 本次回答引用的文档 source 列表,已去重
) {}src/main/java/com/albertstack/rag/dto/IngestionResult.java
package com.albertstack.rag.dto;
public record IngestionResult(
String namespace, // 文件被写入的知识库标识
String source, // 上传文件的原始文件名
int chunkCount // 本次摄入生成的分块数量
) {}4. 全局异常处理
src/main/java/com/albertstack/rag/exception/DocumentProcessingException.java
package com.albertstack.rag.exception;
public class DocumentProcessingException extends RuntimeException {
public DocumentProcessingException(String message, Throwable cause) {
super(message, cause);
}
}src/main/java/com/albertstack/rag/exception/GlobalExceptionHandler.java
package com.albertstack.rag.exception;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.http.ProblemDetail;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.RestControllerAdvice;
import org.springframework.web.server.ServerWebInputException;
@Slf4j
@RestControllerAdvice
public class GlobalExceptionHandler {
// 业务异常:文档处理失败、参数无效等可预期错误
@ExceptionHandler({DocumentProcessingException.class, IllegalArgumentException.class})
public ProblemDetail handleBusiness(RuntimeException ex) {
log.warn("业务异常: {}", ex.getMessage());
var problem = ProblemDetail.forStatusAndDetail(HttpStatus.BAD_REQUEST, ex.getMessage());
problem.setTitle("请求处理失败");
return problem;
}
// 请求体反序列化失败:record 紧凑构造器抛出的校验异常会被包成这种类型
@ExceptionHandler(ServerWebInputException.class)
public ProblemDetail handleInputError(ServerWebInputException ex) {
// 优先取最底层的原因信息,方便前端看到 "问题不能为空" 这类业务提示
Throwable root = ex;
while (root.getCause() != null) {
root = root.getCause();
}
String detail = root.getMessage() != null ? root.getMessage() : "请求格式错误";
log.warn("请求输入错误: {}", detail);
var problem = ProblemDetail.forStatusAndDetail(HttpStatus.BAD_REQUEST, detail);
problem.setTitle("请求处理失败");
return problem;
}
// 兜底:所有未处理的异常统一返回 500
@ExceptionHandler(Exception.class)
public ProblemDetail handleGeneric(Exception ex) {
log.error("服务内部错误", ex);
var problem = ProblemDetail.forStatusAndDetail(
HttpStatus.INTERNAL_SERVER_ERROR, "服务处理请求时发生错误");
problem.setTitle("服务器错误");
return problem;
}
}这里有一个不容易发现的坑:record 紧凑构造器抛的 IllegalArgumentException 会被 WebFlux 包成 ServerWebInputException,必须单独捕获,否则会被兜底 handler 当成 500 吞掉。沿因果链取最底层的 message 是为了让客户端拿到 "问题不能为空" 而不是 Jackson 的堆栈。
5. IngestionService 升级
src/main/java/com/albertstack/rag/service/IngestionService.java
package com.albertstack.rag.service;
import com.albertstack.rag.exception.DocumentProcessingException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.document.Document;
import org.springframework.ai.reader.TextReader;
import org.springframework.ai.reader.markdown.MarkdownDocumentReader;
import org.springframework.ai.reader.markdown.config.MarkdownDocumentReaderConfig;
import org.springframework.ai.reader.pdf.PagePdfDocumentReader;
import org.springframework.ai.reader.tika.TikaDocumentReader;
import org.springframework.ai.transformer.splitter.TokenTextSplitter;
import org.springframework.ai.vectorstore.VectorStore;
import org.springframework.ai.vectorstore.filter.FilterExpressionBuilder;
import org.springframework.core.io.Resource;
import org.springframework.stereotype.Service;
import java.util.List;
@Slf4j
@Service
public class IngestionService {
private final VectorStore vectorStore;
private final TokenTextSplitter splitter;
public IngestionService(VectorStore vectorStore) {
this.vectorStore = vectorStore;
this.splitter = TokenTextSplitter.builder()
.withChunkSize(800)
.withMinChunkSizeChars(350)
.build();
}
/**
* 摄入文档到指定 namespace,同名文件会先被删除再写入,天然幂等
*/
public int ingest(Resource resource, String filename, String namespace) {
// 写入前先按 (namespace, source) 删旧分块,避免重复上传越堆越多
deleteBySource(namespace, filename);
List<Document> documents;
try {
documents = readDocuments(resource, filename);
} catch (Exception e) {
// 底层 Reader 抛错统一包成业务异常,让上层只需处理一种类型
throw new DocumentProcessingException("文档读取失败: " + filename, e);
}
// 给每个分块打上 namespace 和 source 标签:前者用于检索隔离,后者用于来源引用和去重
for (Document doc : documents) {
doc.getMetadata().put("namespace", namespace);
doc.getMetadata().put("source", filename);
}
List<Document> chunks = splitter.split(documents);
vectorStore.add(chunks);
log.info("[{}] 摄入完成: {} -> {} 个分块", namespace, filename, chunks.size());
return chunks.size();
}
/**
* 删除指定 namespace 下某个 source 的全部分块
*/
public void deleteBySource(String namespace, String source) {
// FilterExpressionBuilder.and(eq, eq) 组合两个等值条件,按 namespace + source 精确匹配,不会误伤其他文件
var b = new FilterExpressionBuilder();
var filter = b.and(b.eq("namespace", namespace), b.eq("source", source)).build();
vectorStore.delete(filter);
}
private List<Document> readDocuments(Resource resource, String filename) {
String ext = getFileExtension(filename);
return switch (ext) {
case "pdf" -> new PagePdfDocumentReader(resource).read();
case "md" -> {
var config = MarkdownDocumentReaderConfig.builder()
.withIncludeCodeBlock(true)
.withIncludeBlockquote(true)
.build();
yield new MarkdownDocumentReader(resource, config).read();
}
case "txt", "csv" -> new TextReader(resource).read();
default -> new TikaDocumentReader(resource).read();
};
}
private String getFileExtension(String filename) {
if (filename == null || !filename.contains(".")) {
return "";
}
return filename.substring(filename.lastIndexOf('.') + 1).toLowerCase();
}
}6. 顺手把 AiConfig 也对齐
第七章在 AiConfig 里写了一个 CommandLineRunner,启动时自动从 classpath 摄入几份示例文档。它当时调的是旧的 ingest(resource),写出的分块没有 namespace 字段。如果不一起改,新的 namespace 系统里就会同时存在两种数据:
- CommandLineRunner 写的老格式:payload 里只有
source、chunk_index等,没有namespace - 新 API 上传的新格式:payload 里多了
namespace字段
混在同一个 collection 里,打开 Qdrant Dashboard 随便点一个分块,很可能看到的是没有 namespace 的那条,会让人怀疑专栏在骗人。
src/main/java/com/albertstack/rag/config/AiConfig.java
@Bean
public CommandLineRunner ingestDocuments(IngestionService ingestionService) {
return args -> {
// 自动摄入的 demo 数据统一打上 namespace=demo 标签,让所有点都有 namespace 字段
// 旧的 chat() 方法用的是不带 filter 的 ragAdvisor,依然能搜到这些数据
String namespace = "demo";
List<String> files = List.of(
"docs/spring-ai-intro.txt",
"docs/rag-concepts.txt",
"docs/ollama-guide.txt"
);
for (String file : files) {
var resource = new ClassPathResource(file);
String filename = resource.getFilename();
int chunks = ingestionService.ingest(resource, filename, namespace);
log.info("[{}] 已摄入: {} -> {} 个分块", namespace, file, chunks);
}
};
}只改了 ingestDocuments 这一个 Bean,其它不动:
- 调用方从
ingest(resource)换成ingest(resource, filename, "demo"),自动加载的文档统一进demo知识库 - 第七章的
RagService.chat()走的是不带 filter 的ragAdvisor,对它来说有没有 namespace 标签都一样,老测试不破 - 用新 API 问
demo知识库(POST /api/knowledge-bases/demo/chat)也能直接复用这批种子数据
7. RagService 升级
src/main/java/com/albertstack/rag/service/RagService.java
package com.albertstack.rag.service;
import com.albertstack.rag.dto.ChatResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.ai.document.Document;
import org.springframework.ai.rag.advisor.RetrievalAugmentationAdvisor;
import org.springframework.ai.rag.generation.augmentation.ContextualQueryAugmenter;
import org.springframework.ai.rag.preretrieval.query.transformation.RewriteQueryTransformer;
import org.springframework.ai.rag.retrieval.search.VectorStoreDocumentRetriever;
import org.springframework.ai.vectorstore.VectorStore;
import org.springframework.ai.vectorstore.filter.FilterExpressionBuilder;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import java.util.List;
@Slf4j
@Service
public class RagService {
private final ChatClient chatClient;
// RewriteQueryTransformer 内部要再起一个 ChatClient,所以这里持有 Builder 而不是已建好的 ChatClient
private final ChatClient.Builder chatClientBuilder;
private final VectorStore vectorStore;
public RagService(ChatClient chatClient,
ChatClient.Builder chatClientBuilder,
VectorStore vectorStore) {
this.chatClient = chatClient;
this.chatClientBuilder = chatClientBuilder;
this.vectorStore = vectorStore;
}
/**
* 多知识库问答:限定 namespace 检索,返回答案 + 来源引用
*/
public ChatResponse ask(String namespace, String question) {
log.info("[{}] 收到问题: {}", namespace, question);
var response = chatClient.prompt()
.user(question)
// 每次请求现场构建带 namespace filter 的 Advisor,原因见 buildNamespaceAdvisor 上的注释
.advisors(buildNamespaceAdvisor(namespace))
.call()
.chatResponse();
String answer = response.getResult().getOutput().getText();
List<String> sources = extractSources(response);
log.info("[{}] 回答完成,引用 {} 个来源", namespace, sources.size());
return new ChatResponse(answer, sources);
}
/**
* 多知识库流式问答:SSE 逐 token 推送
*/
public Flux<String> askStream(String namespace, String question) {
log.info("[{}] 收到流式问题: {}", namespace, question);
return chatClient.prompt()
.user(question)
.advisors(buildNamespaceAdvisor(namespace))
.stream()
.content();
}
/**
* 按 namespace 现场构建 RAG Advisor。
* 之所以不像之前那样注册成单例 Bean,是因为 VectorStoreDocumentRetriever.filterExpression(...)
* 接收的是一个静态 filter,构建后就固定了。要让 filter 跟着 namespace 变化,最简单的做法就是每次现场构建一个。
* 重新构建只是几个对象分配,相比后面的 LLM 调用可以忽略。
*/
private RetrievalAugmentationAdvisor buildNamespaceAdvisor(String namespace) {
// 这一行是和之前管道相比唯一的新增:用 namespace 等值条件限定检索范围
var filter = new FilterExpressionBuilder().eq("namespace", namespace).build();
// 下面的管道结构和之前的 ragAdvisor 一致:查询重写 -> 带 filter 的检索 -> 上下文增强
return RetrievalAugmentationAdvisor.builder()
.queryTransformers(RewriteQueryTransformer.builder()
.chatClientBuilder(chatClientBuilder).build())
.documentRetriever(VectorStoreDocumentRetriever.builder()
.vectorStore(vectorStore)
.similarityThreshold(0.5)
.topK(5)
.filterExpression(filter)
.build())
.queryAugmenter(ContextualQueryAugmenter.builder()
.allowEmptyContext(true)
.build())
.build();
}
/**
* 从 Advisor 上下文提取检索文档的 source,用于回答时标注引用。
* 参数用全限定名,是因为本项目 DTO 包里也定义了 ChatResponse,避免和 Spring AI 的同名类冲突。
*/
private List<String> extractSources(org.springframework.ai.chat.model.ChatResponse response) {
try {
// RetrievalAugmentationAdvisor 执行完检索后,会把命中的 List<Document> 放进 response metadata 的 DOCUMENT_CONTEXT key 下
Object context = response.getMetadata().get(RetrievalAugmentationAdvisor.DOCUMENT_CONTEXT);
if (context instanceof List<?> docs) {
return docs.stream()
.filter(Document.class::isInstance)
.map(Document.class::cast)
// 取每个文档的 source 元数据,去重后作为答案的引用列表
.map(doc -> doc.getMetadata().getOrDefault("source", "unknown").toString())
.distinct()
.toList();
}
} catch (Exception e) {
// 来源提取只是辅助信息,挂了也不能阻断主问答流程,兜底返回空列表
log.warn("提取来源失败: {}", e.getMessage());
}
return List.of();
}
}8. KnowledgeBaseController
src/main/java/com/albertstack/rag/controller/KnowledgeBaseController.java
package com.albertstack.rag.controller;
import com.albertstack.rag.dto.ChatRequest;
import com.albertstack.rag.dto.ChatResponse;
import com.albertstack.rag.dto.IngestionResult;
import com.albertstack.rag.service.IngestionService;
import com.albertstack.rag.service.RagService;
import lombok.RequiredArgsConstructor;
import org.springframework.core.io.FileSystemResource;
import org.springframework.http.MediaType;
import org.springframework.http.codec.multipart.FilePart;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestPart;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.nio.file.Files;
import java.nio.file.Path;
@RestController
// 路径以 namespace 为顶层资源段,所有端点天然按知识库分组
@RequestMapping("/api/knowledge-bases/{namespace}")
@RequiredArgsConstructor
public class KnowledgeBaseController {
private final IngestionService ingestionService;
private final RagService ragService;
/**
* 上传文件到指定知识库。同名文件会自动覆盖(先删后写)
*/
@PostMapping(value = "/documents", consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
public Mono<IngestionResult> upload(@PathVariable String namespace,
@RequestPart("file") FilePart filePart) {
// 摄入是阻塞 IO,包到 fromCallable + boundedElastic 里,避免占住 Netty 事件循环线程
return Mono.fromCallable(() -> {
// FilePart 是 WebFlux 的响应式抽象,但 DocumentReader 都要 Resource,所以先落盘成临时文件
Path tempFile = Files.createTempFile("rag-upload-", "-" + filePart.filename());
filePart.transferTo(tempFile).block();
try {
int chunks = ingestionService.ingest(
new FileSystemResource(tempFile),
filePart.filename(),
namespace);
return new IngestionResult(namespace, filePart.filename(), chunks);
} finally {
// try-finally 保证临时文件一定被清理,避免占满 /tmp
Files.deleteIfExists(tempFile);
}
}).subscribeOn(Schedulers.boundedElastic());
}
/**
* 同步问答:返回完整答案 + 来源引用,适合后台对接和需要完整答案的场景
*/
@PostMapping("/chat")
public Mono<ChatResponse> chat(@PathVariable String namespace,
@RequestBody ChatRequest request) {
// LLM 调用同样是阻塞的,按摄入端点的同样套路丢给弹性线程池
return Mono.fromCallable(() -> ragService.ask(namespace, request.question()))
.subscribeOn(Schedulers.boundedElastic());
}
/**
* 流式问答:适合面向终端用户的交互,缓解首 token 等待焦虑
*/
@PostMapping(value = "/chat/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> chatStream(@PathVariable String namespace,
@RequestBody ChatRequest request) {
// 直接返回 Flux<String>,Spring WebFlux 会自动按 SSE 格式编码,每个 token 一个 data: 帧
return ragService.askStream(namespace, request.question());
}
}9. 集成测试
用 WebTestClient 串一遍三个端点:上传文件、问答、空问题拦截。每个测试只保留 expectStatus() 这种最基础的状态码断言,业务字段直接 log.info 打出来,顺着日志就能看清楚每一步的入参出参。
src/test/java/com/albertstack/rag/controller/KnowledgeBaseControllerTest.java
package com.albertstack.rag.controller;
import com.albertstack.rag.service.IngestionService;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.webtestclient.autoconfigure.AutoConfigureWebTestClient;
import org.springframework.core.io.ClassPathResource;
import org.springframework.http.MediaType;
import org.springframework.http.client.MultipartBodyBuilder;
import org.springframework.test.web.reactive.server.WebTestClient;
@Slf4j
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
// timeout 设到 180 秒:RAG 包含查询重写、向量检索、LLM 生成三段,本地 Ollama 跑大模型耗时不少
@AutoConfigureWebTestClient(timeout = "180s")
// PER_CLASS 让 @BeforeAll 可以是非 static 方法,方便注入 Spring 管理的 IngestionService 做清理
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class KnowledgeBaseControllerTest {
private static final String NAMESPACE = "test-kb";
@Autowired
WebTestClient webTestClient;
@Autowired
IngestionService ingestionService;
@BeforeAll
void setup() {
// Qdrant 数据落盘,重复跑测试会累积。先按 (namespace, source) 清掉残留,保证测试可重复执行
ingestionService.deleteBySource(NAMESPACE, "spring-ai-intro.txt");
}
@Test
void shouldUploadFileToNamespace() {
var resource = new ClassPathResource("docs/spring-ai-intro.txt");
var bodyBuilder = new MultipartBodyBuilder();
bodyBuilder.part("file", resource).filename("spring-ai-intro.txt");
log.info("上传文件: namespace={}, file=spring-ai-intro.txt", NAMESPACE);
byte[] body = webTestClient.post()
.uri("/api/knowledge-bases/{ns}/documents", NAMESPACE)
.contentType(MediaType.MULTIPART_FORM_DATA)
.bodyValue(bodyBuilder.build())
.exchange()
.expectStatus().isOk()
.expectBody()
.returnResult()
.getResponseBody();
log.info("上传响应: {}", new String(body));
}
@Test
void shouldAnswerWithSources() {
// 测试用例不依赖执行顺序,每个 case 自己保证前置数据。
// 这里再上传一次同样的文件是幂等的:deleteBySource 会先把旧分块清掉再写入新的
var resource = new ClassPathResource("docs/spring-ai-intro.txt");
var bodyBuilder = new MultipartBodyBuilder();
bodyBuilder.part("file", resource).filename("spring-ai-intro.txt");
webTestClient.post()
.uri("/api/knowledge-bases/{ns}/documents", NAMESPACE)
.contentType(MediaType.MULTIPART_FORM_DATA)
.bodyValue(bodyBuilder.build())
.exchange()
.expectStatus().isOk();
String question = "Spring AI 是什么?";
log.info("发起问答: namespace={}, question={}", NAMESPACE, question);
byte[] body = webTestClient.post()
.uri("/api/knowledge-bases/{ns}/chat", NAMESPACE)
.contentType(MediaType.APPLICATION_JSON)
.bodyValue("{\"question\": \"" + question + "\"}")
.exchange()
.expectStatus().isOk()
.expectBody()
.returnResult()
.getResponseBody();
log.info("问答响应: {}", new String(body));
}
@Test
void shouldRejectEmptyQuestion() {
// 空问题会触发 ChatRequest 紧凑构造器抛 IllegalArgumentException,
// 被 WebFlux 包成 ServerWebInputException,最后由 GlobalExceptionHandler 转成 400 + ProblemDetail
log.info("发起空问题,预期被拦截");
byte[] body = webTestClient.post()
.uri("/api/knowledge-bases/{ns}/chat", NAMESPACE)
.contentType(MediaType.APPLICATION_JSON)
.bodyValue("{\"question\": \"\"}")
.exchange()
.expectStatus().isBadRequest()
.expectBody()
.returnResult()
.getResponseBody();
log.info("拦截响应: {}", new String(body));
}
}10. API 汇总
| 方法 | 路径 | 说明 | 请求 | 响应 |
|---|---|---|---|---|
| POST | /api/knowledge-bases/{ns}/documents |
上传文件到指定知识库 | multipart/form-data | IngestionResult |
| POST | /api/knowledge-bases/{ns}/chat |
同步问答(含来源引用) | ChatRequest |
ChatResponse |
| POST | /api/knowledge-bases/{ns}/chat/stream |
流式问答(SSE) | ChatRequest |
text/event-stream |
11. 本章小结
把前面所有组件整合成一套生产可用的 REST API,核心是用 namespace 做多知识库隔离,用 deleteBySource + 写入实现幂等更新,用 extractSources 让答案带上引用。
| 知识点 | 说明 |
|---|---|
| Java Record DTO | ChatRequest 紧凑构造器在反序列化阶段做参数校验 |
@RestControllerAdvice + ProblemDetail |
RFC 9457 标准错误格式,业务/输入/兜底三类异常分开处理 |
| Namespace 隔离 | 摄入时打 namespace 元数据,检索时用 FilterExpressionBuilder.eq 限定 |
| 幂等更新 | 写入前 deleteBySource(ns, source),重复上传自动覆盖 |
| 来源追踪 | 从 RetrievalAugmentationAdvisor.DOCUMENT_CONTEXT 提取文档列表 |
| WebTestClient | 一个测试类覆盖上传、问答、空问题三个核心用例 |
接下来给系统装上"测量仪表盘":用命中率和 MRR 评估检索质量,通过对比实验找到最优的分块大小和 topK。