新闻情报分析系统Q&A梳理
一、数据采集与Kafka(Q1-Q9)
Q1. 时间戳增量机制具体怎么实现的?时间戳存在哪里?Redis?数据库?还是文件?
回答:
时间戳存储在本地文件中,具体是 last_timestamp.txt 文件。
实现代码(位于 news_spider.py):
# 加载上次采集的最大时间戳
def _load_last_timestamp(self):
try:
with open('last_timestamp.txt', 'r') as f:
return int(f.read().strip())
except:
# 默认从24小时前开始
return int((datetime.now() - timedelta(days=1)).timestamp())
# 保存最大时间戳
def _save_last_timestamp(self, timestamp):
with open('last_timestamp.txt', 'w') as f:
f.write(str(timestamp))
self.last_timestamp = timestamp
工作流程:
- 爬虫启动时,读取
last_timestamp.txt获取上次采集的最大时间戳 - 采集过程中,只采集
ctime > last_timestamp的新闻(增量过滤) - 采集完成后,计算本批次的最大时间戳并写入文件
为什么选择文件而不是Redis/数据库:
- 项目规模小,文件足够简单可靠
- 爬虫是单实例运行,不存在并发竞争问题
- 如果规模扩大,可以迁移到Redis(用
SET last_timestamp {value}即可)
Q2. 如果爬虫中途崩溃了,时间戳怎么恢复?会不会漏采或重复采集?
回答:
崩溃恢复机制:
- 时间戳只在采集成功完成后才更新(见 news_spider.py#L107),所以如果中途崩溃,时间戳保持上次值
- 下次重启时会从上次成功的位置重新开始,可能导致部分重复采集
重复采集的处理:
- Kafka端:使用URL作为消息Key,相同URL会落到同一分区,但不会自动去重
- MySQL端:
url字段建立了UNIQUE INDEX(唯一索引),配合ON DUPLICATE KEY UPDATE实现幂等写入 - Neo4j端:使用
MERGE创建节点 + 先清后写策略(先删除旧CONTAINS关系,再重建),保证NLP重复执行后结果一致 - 综合效果:即使爬虫重复采集,最终数据库不会有重复数据
// MySQL幂等写入(位于 NewsProcessingJob.java MySQLSink)
// 前提:ALTER TABLE news ADD UNIQUE INDEX uk_url (url);
String sql = "INSERT INTO news (title, ctime, url, source, content, type) " +
"VALUES (?, ?, ?, ?, ?, ?) " +
"ON DUPLICATE KEY UPDATE content=VALUES(content), type=VALUES(type)";
漏采可能性:
- 如果崩溃发生在”采集完成但时间戳未保存”的瞬间(极小概率),可能漏采
- 实际上这个窗口非常短(毫秒级),风险可以接受
Q3. 日均150+条,爬虫是定时任务还是常驻进程?用的 crontab 还是 APScheduler?调度频率是多少?
回答:
使用 APScheduler 的 BlockingScheduler,调度频率是每小时执行一次。
代码实现(位于 news_spider.py#L358-L372):
from apscheduler.schedulers.blocking import BlockingScheduler
def schedule_spider():
spider = SinaNewsSpider()
scheduler = BlockingScheduler()
# 每小时执行一次
@scheduler.scheduled_job('interval', hours=1)
def hourly_crawl():
spider.run(mode='roll', fetch_content=True)
scheduler.start()
为什么选择APScheduler而不是crontab:
- APScheduler是Python原生解决方案,跨平台(Linux/Windows都能用)
- 支持动态修改调度策略,代码可维护性更好
- 可以方便地添加任务监控、错误重试等逻辑
- crontab在Windows上不原生支持,需要用Task Scheduler
调度策略设计思路:
- 新浪新闻滚动页每小时约更新10-20条新闻
- 每小时采集一次可以保证新闻的时效性,同时不会对目标网站造成太大压力
- 日均 24次 × 6-8条/次 ≈ 150条,符合实际数据
Q4. 你的爬虫用的什么库?requests?Scrapy?Playwright?为什么选这个?
回答:
使用 requests + BeautifulSoup 组合。
import requests
from bs4 import BeautifulSoup
response = requests.get(url, headers=self.headers, timeout=10)
soup = BeautifulSoup(response.text, 'html.parser')
选择理由:
- 目标网站简单:新浪新闻是传统服务端渲染页面,不需要JavaScript执行
- 轻量高效:requests + BeautifulSoup 组合非常轻量,启动快、资源占用少
- 学习成本低:团队成员都熟悉这套工具,开发效率高
为什么不用Scrapy:
- Scrapy适合大规模分布式爬虫,我们只是单一数据源、单实例
- Scrapy的框架比较重,对于简单需求有点过度设计
- 如果未来需要扩展到多数据源,可以考虑迁移
为什么不用Playwright:
- Playwright主要用于动态渲染页面(SPA、需要登录等场景)
- 新浪新闻的数据接口是JSON API,不需要浏览器渲染
- Playwright资源消耗大,对于简单采集任务得不偿失
Q5. 150+条/天,平均下来一秒不到0.01条,为什么要用Kafka这么重的消息队列?直接写数据库不行吗?
回答:
这是面试官最可能追问的问题,我会诚实回答:
坦诚承认:从纯实用角度看,这个数据量确实不需要Kafka,直接写数据库完全可行。
使用Kafka的合理性解释:
- 架构解耦:
- 爬虫只负责采集,不关心后续处理逻辑
- 处理端(Flink/消费者)可以独立扩展、升级、重启,不影响采集端
- 如果未来接入新的数据源,只需生产消息到同一Topic
- 削峰填谷:
- 虽然日均150条,但可能集中在某些时段(如新闻高峰期)
- Kafka可以平滑处理突发流量
- 数据缓冲:
- 如果Flink/处理端宕机,消息不会丢失,可以重新消费
- 提供了7天的消息保留,方便问题排查和数据回溯
- 技术储备(诚实回答):
- 这是一个学习项目,目的是熟悉流式处理架构
- 用Kafka是为了学习和实践分布式消息队列的使用
- 如果是生产项目且确认数据量不会增长,可以简化为直接写库
如果面试官继续追问”那你怎么证明你真的懂Kafka”:
- 可以说明Kafka的分区、副本、acks机制
- 说明消费者组、offset管理、rebalance等概念
Q6. Kafka的Topic你设计了几个?Partition数量是多少?为什么这么设计?
回答:
Topic设计:只有 1个Topic,名为 news_topic
Partition数量:1个分区
// 位于 NewsProcessingJob.java#L44-L49
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("localhost:9092")
.setTopics("news_topic")
.setGroupId("flink-news-processor")
.build();
设计理由:
- 单分区原因:
- 数据量小(0.002条/秒),单分区完全够用
- 单分区保证全局有序,新闻按时间顺序处理
- Flink消费端并行度为3,但单分区情况下只有1个并行实例真正消费
- 单Topic原因:
- 所有新闻进入同一处理流程,不需要按类型区分
- 如果未来需要按新闻类型分流,可以增加Topic(如
news_topic_finance、news_topic_tech)
扩展方案(面试加分点):
- 如果数据量增长到1000条/秒,可以扩展到3个分区
- 分区Key使用新闻来源(source),相同来源的新闻落到同一分区
- Flink端并行度设置为分区数的整数倍
Q7. 生产者的acks参数你设的多少?0、1、还是all?为什么?
回答:
acks = 1(默认值,代码中未显式设置)
# 位于 news_spider.py#L44-L49
self.kafka_producer = KafkaProducer(
bootstrap_servers=kafka_bootstrap_servers,
value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode('utf-8'),
key_serializer=lambda k: k.encode('utf-8') if k else None
)
参数含义对比:
| acks值 | 含义 | 可靠性 | 性能 |
|---|---|---|---|
| 0 | 不等待确认 | 最低 | 最高 |
| 1 | 等待Leader确认 | 中等 | 中等 |
| all | 等待所有ISR确认 | 最高 | 最低 |
选择acks=1的理由:
- 新闻数据不是金融交易,偶尔丢一条可接受
- 爬虫可以重新采集,有时间戳机制保底
- acks=1 在可靠性和性能之间取得平衡
如果要求更高可靠性:
- 设置
acks='all' - 配合
min.insync.replicas=2 - 设置
retries=3和retry.backoff.ms=1000
Q8. 如果Kafka宕机了,爬虫端怎么处理?消息会丢失吗?
回答:
当前实现:Kafka宕机时,爬虫会优雅降级,记录日志但不会崩溃。
# 位于 news_spider.py#L44-L54
try:
self.kafka_producer = KafkaProducer(...)
self.kafka_enabled = True
except Exception as e:
logger.warning(f"Kafka连接失败,将只保存到本地: {e}")
self.kafka_enabled = False
消息处理策略:
- 发送前宕机:消息保存到本地文件(
news_{timestamp}.json),不会丢失 - 发送中宕机:使用同步发送
future.get(timeout=10),超时后会记录错误 - 恢复后处理:需要手动将本地文件重新发送到Kafka
# 同步等待确认(位于 news_spider.py#L320-L323)
future = self.kafka_producer.send(topic, key=news.get('url', ''), value=news)
future.get(timeout=10) # 同步等待发送完成
改进建议(面试加分):
- 添加本地队列(如SQLite)作为备份
- Kafka恢复后自动重发
- 或者使用Kafka Connect的Dead Letter Queue机制
Q9. 新浪新闻有反爬机制吗?你怎么绕过的?有没有被封IP的经历?
回答:
新浪新闻的反爬机制:
- 相对温和,主要是频率限制和User-Agent检测
- 没有复杂的JavaScript混淆或验证码
我采取的措施:
- 模拟浏览器请求头(位于 news_spider.py#L35-L40):
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36...',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
}
- 请求间隔控制:
time.sleep(0.5) # 页面列表请求间隔(news_spider.py#L101)
time.sleep(0.3) # 正文获取间隔(news_spider.py#L274)
- 错误容忍:单个请求失败不影响整体流程
是否被封IP:
- 开发测试期间有过一次,是因为调试时请求过于频繁
- 解决方法:等待10分钟后自动恢复,之后增加了请求间隔
进一步反爬策略(如果面试官追问):
- IP代理池(付费代理服务)
- 随机User-Agent
- 分布式爬虫分散请求
- 使用Selenium模拟真实浏览器行为
二、Flink流处理与NLP(Q10-Q21)
Q10. Flink的Checkpoint机制你开启了吗?间隔多久?状态后端用的什么(Memory/RocksDB/HDFS)?
回答:
是的,做了完整的Checkpoint配置,不只是简单开启,而是针对项目特点做了参数调优。
代码位置(NewsProcessingJob.java#L44-L82):
// 每60秒做一次Checkpoint
env.enableCheckpointing(60000);
// 设置EXACTLY_ONCE语义(Barrier对齐模式)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 两次Checkpoint之间最小间隔30秒(避免频繁Checkpoint增加系统负担)
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
// Checkpoint超时120秒(NLP调用较慢,要留足余量)
env.getCheckpointConfig().setCheckpointTimeout(120000);
// 最多允许1个Checkpoint同时进行
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 作业取消时保留Checkpoint(便于手动恢复)
env.getCheckpointConfig().setExternalizedCheckpointCleanup(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);
// 容忍3次Checkpoint失败(避免偶发故障导致作业整体失败)
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
// Checkpoint持久化到本地文件系统(生产环境应使用HDFS/S3)
env.getCheckpointConfig().setCheckpointStorage("file:///opt/flink/checkpoints");
// 失败后自动重启策略:最多3次,每次间隔10秒
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));
配置参数一览:
| 参数 | 值 | 设计理由 |
|---|---|---|
| Checkpoint间隔 | 60秒 | 数据量小(~0.1条/分钟),60秒足够 |
| Checkpoint语义 | EXACTLY_ONCE | Barrier对齐,保证状态一致性 |
| 最小间隔 | 30秒 | 上一次完成后至少等30秒再触发下一次 |
| 超时时间 | 120秒 | NLP调用可能耗时较久,要给足时间 |
| 并发数 | 1 | 单Checkpoint降低系统开销 |
| 取消后保留 | RETAIN_ON_CANCELLATION | 手动cancel作业后还能从Checkpoint恢复 |
| 容忍失败次数 | 3 | 偶发的网络抖动不会杀死整个作业 |
| 持久化存储 | /opt/flink/checkpoints | 本地文件系统,集群重启可恢复 |
| 重启策略 | fixed-delay, 3次, 10秒间隔 | Task失败自动恢复 |
| 并行度 | 3 | 匹配TaskSlot数量 |
| 状态后端 | HashMapStateBackend(默认) | 状态量小,内存足够 |
为什么超时设120秒:
- 因为NLP调用是同步阻塞的,每条消息可能处理4-12秒
- 如果Checkpoint触发时正好在等NLP响应,Barrier对齐会等待
- 设太短(如30秒)容易导致Checkpoint超时失败
Checkpoint持久化存储(关键配置):
env.getCheckpointConfig().setCheckpointStorage("file:///opt/flink/checkpoints");
- 存储在本地文件系统
/opt/flink/checkpoints目录 - 作业取消或集群重启后,Checkpoint文件保留在磁盘
- 重启时可以从最近的Checkpoint恢复:
flink run -s file:///opt/flink/checkpoints/chk-xxx job.jar - 生产环境应使用HDFS或S3:
hdfs:///flink/checkpoints或s3://bucket/checkpoints
自动重启策略:
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));
- Task失败后自动重启,最多3次,每次间隔10秒
- 覆盖场景:NLP超时、网络抖动、TaskManager短暂故障
- 结合RETAIN_ON_CANCELLATION,实现完整的故障恢复能力
💡 Tips:面试时说出
setCheckpointingMode(EXACTLY_ONCE)和setExternalizedCheckpointCleanup(RETAIN_ON_CANCELLATION)这两个API,比只说”我开了Checkpoint”显得专业得多
Q11. 什么是Exactly-Once语义?你是怎么实现端到端Exactly-Once的?只靠Flink Checkpoint够吗?
回答:
什么是Exactly-Once:
每条数据恰好被处理一次,不会丢失(至少一次)也不会重复(最多一次)。
只靠Flink Checkpoint够吗?不够!
Flink Checkpoint只保证内部状态的一致性。端到端Exactly-Once需要 Source + 内部处理 + Sink 三方面共同配合:
| 环节 | 保证机制 | 我的具体实现 |
|---|---|---|
| Source(Kafka) | Offset与Checkpoint绑定 | Flink KafkaSource自动管理offset提交 |
| 处理(Flink内部) | Checkpoint + EXACTLY_ONCE模式 | setCheckpointingMode(EXACTLY_ONCE) |
| Sink-MySQL | 幂等写入 | ON DUPLICATE KEY UPDATE(url唯一索引) |
| Sink-Neo4j | 幂等写入 + 先清后写 | MERGE + 写入前删除旧CONTAINS关系 |
MySQL端幂等实现(NewsProcessingJob.java MySQLSink):
// url字段必须有UNIQUE INDEX → ON DUPLICATE KEY才会生效
// 建表语句: ALTER TABLE news ADD UNIQUE INDEX uk_url (url);
String sql = "INSERT INTO news (title, ctime, url, source, content, type) " +
"VALUES (?, ?, ?, ?, ?, ?) " +
"ON DUPLICATE KEY UPDATE content=VALUES(content), type=VALUES(type)";
Neo4j端幂等实现 —— 先清后写策略(NewsProcessingJob.java Neo4jSink):
session.executeWrite(tx -> {
// Step 1: MERGE新闻节点(幂等)
tx.run("MERGE (n:NEW {url: $url}) SET n.title=$title, ...", params);
// Step 2: 先删除该新闻的所有旧CONTAINS关系 ← 关键!
tx.run("MATCH (n:NEW {url: $url})-[r:CONTAINS]->() DELETE r", params);
// Step 3: 重新创建CONTAINS关系
for (entity : entities) {
tx.run("MERGE (e:PERSON {entity: $entity}) " +
"WITH e MATCH (n:NEW {url: $url}) " +
"MERGE (n)-[:CONTAINS]->(e)", params);
}
return null;
});
为什么Neo4j需要”先清后写”? 这是因为NLP有随机性:
场景:Flink写入Neo4j后崩溃,Checkpoint未完成,重启后重新消费同一条消息
第一次NLP结果: entities = ["张三", "北京"]
第二次NLP结果: entities = ["张三", "上海"] ← LLM生成有随机性
❌ 不清除旧关系的后果:100的消息
2. NLP处理(分类、实体识别...)
3. 写入MySQL(ON DUPLICATE KEY → 幂等)
4. 写入Neo4j(先清旧关系再重建 → 幂等)
5. ⏰ T=60s时Checkpoint-1成功:offset=100持久化到 `/opt/flink/checkpoints/chk-1`
6. 继续消费offset=101, 102, 103...
7. 💥 T=90s时Flink崩溃(Checkpoint-2未完成)
8. Flink自动重启(RestartStrategy:3次重试,10秒间隔)
9. 读取 `/opt/flink/checkpoints/chk-1` → 恢复offset=100
10. 从offset=100重新消费(101-103被重复处理)
11. MySQL ON DUPLICATE KEY → 幂等覆盖 ✅
12. Neo4j 先清后写 → 最终一致 ✅
13. 继续正常运行,下次Checkpoint-2成功时offset推进到最新
完整的故障恢复流程:
- Flink从Kafka消费offset=5的消息
- NLP处理(分类、实体识别…)
- 写入MySQL(ON DUPLICATE KEY → 幂等)
- 写入Neo4j(先清旧关系再重建 → 幂等)
- 💥 此时Flink崩溃,Checkpoint未完成
- Flink重启 → 从上一次Checkpoint恢复 → offset回到5
- 重新消费offset=5的消息,再次NLP处理
- 再次写入MySQL → 命中DUPLICATE KEY → UPDATE覆盖 ✅
- 再次写入Neo4j → 先清旧关系再重建 → 最终一致 ✅
💡 Tips:面试时一定要说”端到端Exactly-Once不能只靠Checkpoint,Sink端必须有幂等保证”,然后能说出MySQL用ON DUPLICATE KEY、Neo4j用MERGE+先清后写,这就是高质量回答
Q12. 幂等写入具体是怎么做的?MySQL端和Neo4j端分别怎么保证幂等?
回答:
幂等性定义:同一操作执行多次,结果与执行一次相同。
MySQL幂等写入
方法:INSERT ... ON DUPLICATE KEY UPDATE
// 前提:url字段必须建立唯一索引!
// ALTER TABLE news ADD UNIQUE INDEX uk_url (url);
String sql = "INSERT INTO news (title, ctime, url, source, content, type) " +
"VALUES (?, ?, ?, ?, ?, ?) " +
"ON DUPLICATE KEY UPDATE content=VALUES(content), type=VALUES(type)";
前提条件:url 字段必须有唯一索引(UNIQUE INDEX)
为什么必须有唯一索引:
ON DUPLICATE KEY的触发条件是 唯一键冲突- 如果url没有唯一索引,
nid又是自增主键 → INSERT永远不会冲突 → 每次都插入新行 → 幂等失效! - 建立唯一索引后:相同url第二次INSERT会触发冲突 → 走UPDATE分支 → 只更新content和type
工作原理:
第一次插入(url='http://example.com/news/123'):
→ url不存在 → INSERT → nid=1 ✅
第二次插入(相同url,Flink重启重复消费):
→ url已存在 → UNIQUE冲突 → UPDATE content和type → nid仍然=1 ✅
→ 不会产生nid=2的重复行
额外细节:MySQL写入后还需要获取nid给Neo4j用:
// 无论是INSERT还是UPDATE,都通过url查回nid
queryIdStmt.setString(1, news.getUrl());
try (ResultSet rs = queryIdStmt.executeQuery()) {
if (rs.next()) {
news.setNid(rs.getInt("nid")); // 供后续Neo4j写入使用
}
}
Neo4j幂等写入
方法:MERGE + 先清后写(delete-then-insert)
session.executeWrite(tx -> {
// 1. MERGE新闻节点(存在则匹配,不存在则创建)
tx.run("MERGE (n:NEW {url: $url}) SET n.title=$title, n.ctime=$ctime, n.nid=$nid", ...);
// 2. 先删除该新闻的所有旧CONTAINS关系 ← 解决NLP随机性问题
tx.run("MATCH (n:NEW {url: $url})-[r:CONTAINS]->() DELETE r", ...);
// 3. 重新创建实体和CONTAINS关系
tx.run("MERGE (e:PERSON {entity: $entity}) WITH e " +
"MATCH (n:NEW {url: $url}) MERGE (n)-[:CONTAINS]->(e)", ...);
return null;
});
为什么不能只用MERGE,还需要”先清后写”:
- MERGE保证节点不重复 ✅
- 但NLP有随机性:第一次识别出[张三, 北京],重启后第二次可能识别出[张三, 上海]
- 只用MERGE:北京和上海的关系都会保留 → 数据不一致 ❌
- 先清后写:每次先删除旧的CONTAINS关系,再重新创建 → 最终只保留最新结果 ✅
MERGE vs CREATE:
CREATE:每次都创建新节点,会导致重复MERGE:先查找匹配节点,存在则返回,不存在才创建
💡 Tips:面试时被问”幂等”一定要说清楚前提条件。MySQL的前提是url有唯一索引,Neo4j的前提是先清旧关系再写新关系。说出这些细节说明你真正理解了幂等而不是背概念
Q13. Flink的并行度设的多少?是怎么确定的?
回答:
并行度 = 3
// 位于 NewsProcessingJob.java#L40
env.setParallelism(3);
确定方法:
- 数据量考虑:
- 日均150条 ≈ 0.002条/秒
- 单线程每秒处理10+条完全没问题
- 瓶颈不在并行度,而在NLP API调用
- 资源考虑:
- 测试环境只有一台机器,3个TaskSlot
- 设置并行度=3可以充分利用资源
- Kafka分区匹配(理论上):
- Kafka只有1个分区,实际只有1个并行实例消费
- 其他2个实例处于空闲状态
- 这是过度配置,但不影响正确性
优化建议(面试加分):
- 如果Kafka扩展到3分区,并行度=3才有意义
- 更合理的配置:
parallelism = max(kafka_partitions, sink_parallelism)
Q14. 千问API调用是同步还是异步的?会不会阻塞Flink的流处理?
回答:
是同步调用,会阻塞! 这是当前实现的一个技术债务。
代码证据(NLPClient.java#L93-L110):
// OkHttp同步调用
try (Response response = httpClient.newCall(request).execute()) {
if (response.isSuccessful() && response.body() != null) {
String responseBody = response.body().string();
return JSON.parseObject(responseBody);
}
}
阻塞影响:
- 每条新闻需要调用4次NLP API(分类、情绪、实体、关系)
- 每次调用约1-3秒
- 处理一条新闻需要4-12秒
- Flink算子在此期间完全阻塞
为什么数据量小时不是问题:
- 每小时只处理6-10条新闻
- 即使每条12秒,也只占用2分钟
- 剩余58分钟足够处理积压
优化方案(面试加分):
- AsyncIO(推荐):
// Flink AsyncIO模式
AsyncDataStream.unorderedWait(
inputStream,
new AsyncNLPFunction(),
30, TimeUnit.SECONDS,
100 // 最大并发请求数
);
- 批量请求:
- 使用Window积攒数据
- 一次请求处理多条新闻
- 本地模型:
- 部署本地NLP模型(如HuggingFace)
- 消除网络延迟
Q15. 如果千问API超时或返回错误,你怎么处理?有做重试机制吗?重试几次?退避策略是什么?
回答:
实现了三层防护:超时控制 + 指数退避重试 + 降级默认值。
第一层:超时控制
代码位置(NLPClient.java#L32-L37):
this.httpClient = new OkHttpClient.Builder()
.connectTimeout(10, TimeUnit.SECONDS) // 连接超时10秒(快速失败)
.readTimeout(60, TimeUnit.SECONDS) // 读取超时60秒(LLM生成需要时间)
.writeTimeout(10, TimeUnit.SECONDS) // 写入超时10秒
.retryOnConnectionFailure(true) // OkHttp层面连接失败自动重试
.build();
为什么连接超时设10秒而不是30秒:
- 连接建立是TCP三次握手,正常情况毫秒级完成
- 如果10秒连不上,说明NLP服务很可能挂了,快速失败进入重试
- 读取超时保留60秒,因为LLM生成响应确实需要较长时间
第二层:指数退避重试
代码位置(NLPClient.java#L118-L155):
private static final int MAX_RETRIES = 3;
private static final long INITIAL_BACKOFF_MS = 1000; // 初始退避1秒
private JSONObject postWithRetry(String path, JSONObject body) throws IOException {
for (int attempt = 0; attempt < MAX_RETRIES; attempt++) {
try {
JSONObject result = post(path, body);
if (result != null) return result; // 成功就返回
} catch (IOException e) {
LOG.warn("NLP请求失败 (attempt {}/{}): {}", attempt+1, MAX_RETRIES, e.getMessage());
}
// 指数退避等待
if (attempt < MAX_RETRIES - 1) {
long backoffMs = INITIAL_BACKOFF_MS * (1L << attempt); // 1s → 2s → 4s
Thread.sleep(backoffMs);
}
}
return null; // 最终失败,返回null交给调用方降级
}
退避策略:
| 重试次数 | 等待时间 | 说明 |
|---|---|---|
| 第1次 | 立即 | 可能是瞬时网络抖动 |
| 第2次 | 1秒后 | 给NLP服务短暂恢复时间 |
| 第3次 | 2秒后 | 加倍等待 |
| 全部失败 | — | 返回null,降级处理 |
为什么用指数退避而不是固定间隔:
- 避免”惊群效应”:多个请求同时重试会压垮NLP服务
- 逐步加大等待时间,给服务恢复的余裕
第三层:降级默认值
// 分类失败 → 默认分类"社会"
public String classifyNews(...) {
JSONObject response = postWithRetry("/classify", body);
return response != null ? response.getString("type") : "社会";
}
// 实体识别失败 → 空列表(该新闻不会有知识图谱,但不影响其他新闻)
public List<Map<String, Object>> extractEntities(...) {
JSONObject response = postWithRetry("/entities", body);
return response != null ? ... : List.of();
}
降级策略的设计思路:
- 分类失败:给默认分类”社会”,前端仍可展示,用户几乎无感知
- 情绪分析失败:给”neutral”,不会误导
- 实体/关系失败:给空列表,该新闻不会出现在知识图谱中,但不影响其他新闻的处理
- 关键原则:单条NLP失败不应该阻塞整个流
💡 Tips:面试时说”我实现了三层防护——超时快速失败、指数退避重试、降级默认值”,这个回答结构清晰,能让面试官看出你考虑了工程健壮性
Q16. 命名实体识别的准确率大概是多少?有没有做过评估或人工验证?
回答:
坦诚回答:没有做过严格的准确率评估。
实际观察:
- 对于知名人物、大型组织、省市级地点,识别准确率较高(估计90%+)
- 对于小众人名、缩写、网络用语,准确率下降明显
为什么没做严格评估:
- 缺少标注数据集(需要人工标注新闻实体)
- 项目重点是架构和流程,NLP只是一个环节
- 使用千问大模型,相信其基础能力
人工验证经历:
- 随机抽查了约50条新闻的NLP结果
- 发现的问题:
- 部分人名识别为组织(如”张三”识别为”张三公司”)
- 时间实体有时漏识别
- 实体边界偶尔不准确
如果要做正式评估(面试加分):
- 使用公开数据集(如OntoNotes、MSRA)
- 计算Precision、Recall、F1-Score
- 按实体类型分别统计
# 评估指标计算
precision = TP / (TP + FP) # 识别出的实体中正确的比例
recall = TP / (TP + FN) # 正确实体被识别出的比例
f1 = 2 * precision * recall / (precision + recall)
Q17. 为什么选择千问而不是其他模型(如ChatGPT、文心一言)?有对比过效果吗?
回答:
选择理由:
| 因素 | 千问 | ChatGPT | 文心一言 |
|---|---|---|---|
| 国内访问 | ✅ 稳定 | ❌ 需要梯子 | ✅ 稳定 |
| API价格 | 便宜 | 较贵 | 中等 |
| 中文能力 | 优秀 | 良好 | 优秀 |
| 响应速度 | 快(国内服务器) | 慢(海外) | 中等 |
| 阿里云生态 | ✅ 与其他服务集成方便 | ❌ | ❌ |
主要考虑:
- 网络稳定性:项目在国内部署,ChatGPT需要翻墙,不稳定
- 成本:千问API价格约0.008元/千token,比ChatGPT便宜很多
- 技术栈统一:使用阿里云的dashscope SDK,与其他阿里云服务(OSS、RDS)配合方便
有没有对比效果:
- 简单测试过,三者对于NER任务效果相近
- 千问对中文新闻的理解比ChatGPT略好(中文预训练数据更多)
- 没有做严格的A/B测试
代码实现(nlp_processor.py#L31-L46):
class LLMClient:
def __init__(self, api_key: str = None, model: str = "qwen-max"):
self.model = model
import dashscope
dashscope.api_key = self.api_key
Q18. Flink的窗口你用了吗?滚动窗口还是滑动窗口?如果没用,为什么不用?
回答:
没有使用窗口(Window)。
为什么不用窗口:
- 业务场景不需要:
- 新闻处理是逐条独立的,不需要聚合多条数据
- 每条新闻单独做NLP分析,结果只与自身相关
- 不需要”统计最近5分钟的新闻数量”这类聚合操作
- 窗口适用场景:
- 计算时间段内的统计指标(如PV/UV)
- 需要跨多条数据的操作(如JOIN、聚合)
- 需要按时间或数量分批处理
- 我的处理模式:
DataStream<NewsEntity> → map → map → map → sink- 纯粹的流式转换,无需窗口
代码结构(NewsProcessingJob.java#L56-L89):
// 纯map链,无窗口
DataStream<NewsEntity> parsedStream = newsStream.map(new JsonParseFunction());
DataStream<NewsEntity> classifiedStream = parsedStream.map(new ClassificationFunction());
DataStream<NewsEntity> emotionStream = classifiedStream.map(new EmotionAnalysisFunction());
// ... 直接到sink
如果要用窗口(面试加分):
- 热词统计可以用窗口:每10分钟统计一次词频Top10
newsStream
.keyBy(news -> news.getType())
.window(TumblingProcessingTimeWindows.of(Time.minutes(10)))
.aggregate(new HotWordAggregator());
Q19. 如果Flink任务失败重启,Kafka的offset是怎么恢复的?会不会重复消费?
回答:
Offset恢复机制:
Flink Kafka Connector会自动管理offset,与Checkpoint绑定。
配置(NewsProcessingJob.java#L44-L50):
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("localhost:9092")
.setTopics("news_topic")
.setGroupId("flink-news-processor")
.setStartingOffsets(OffsetsInitializer.latest()) // 从最新开始
.build();
恢复流程:
| 场景 | 恢复行为 |
|---|---|
| 有Checkpoint | 从Checkpoint中的offset恢复,可能重复消费最近一次Checkpoint后的数据 |
| 无Checkpoint | 根据 StartingOffsets 配置决定(这里是latest,从最新开始) |
会不会重复消费:
- 会! 但这是At-Least-Once的正常行为
- Checkpoint间隔60秒,最多重复消费60秒内的数据
- 通过Sink端幂等写入保证最终一致性
图示:
时间轴: ─────────────────────────────────
▲ ▲ ▲
CP1 CP2 故障
└── 重启后从CP2恢复
重新消费CP2之后的数据
Q20. 你的Flink代码是用Java写的还是Python(PyFlink)?为什么选这个?
回答:
两个版本都写了:
- 主版本:Java(生产使用)
- 备用版本:PyFlink(小数据量场景)
Java版本位置:NewsProcessingJob.java
PyFlink版本位置:pyflink_processor.py
为什么主版本用Java:
| 因素 | Java | PyFlink |
|---|---|---|
| 性能 | 高(JVM原生) | 中(Python GIL + 跨语言序列化) |
| 生态成熟度 | Flink原生,资料多 | 相对较新,坑多 |
| 调试工具 | 完善(IDEA调试) | 一般 |
| 生产部署 | 标准JAR包 | 需要Python环境 |
PyFlink的价值:
- 小数据量场景(<200条/天)完全够用
- 与爬虫、NLP、Flask共用Python技术栈,维护成本低
- 开发调试更方便(不用编译)
代码对比:
// Java版
DataStream<NewsEntity> classifiedStream = parsedStream
.map(new ClassificationFunction())
.name("News Classification");
# PyFlink版
classified_stream = parsed_stream \
.map(NewsClassificationFunction(nlp_url)) \
.name("News Classification")
Q21. 日均150条数据,用Flink是不是有点大材小用?为什么不用简单的批处理脚本?
回答:
坦诚承认:是的,这个数据量确实不需要Flink。
诚实回答:
| 方案 | 适用场景 | 实际需求 |
|---|---|---|
| Flink | 高吞吐、低延迟、复杂状态 | ❌ 不需要 |
| Celery + Redis | 异步任务队列 | ✅ 足够 |
| Crontab + Python脚本 | 定时批处理 | ✅ 足够 |
为什么还是用Flink:
- 技术学习(最主要原因):
- 这是一个学习项目,目的是熟悉流式处理架构
- 面试时能说”我用过Flink”比”我用过Crontab”更有价值
- 掌握了Flink,以后遇到真正的大数据量场景能快速上手
- 架构可扩展性:
- 如果未来接入更多数据源,架构不需要大改
- Kafka + Flink 是业界标准组合,扩展性强
- 实时性:
- Flink是真正的流处理,数据来了立刻处理
- 批处理脚本最快也是分钟级延迟
替代方案展示(面试加分,证明你知道更简单的方案):
我也实现了一个Celery简化版(celery_processor.py):
from celery import Celery
app = Celery('news_processor', broker='redis://localhost:6379/0')
@app.task
def process_news(news_data):
# NLP处理
entities = nlp_client.extract_entities(news_data['content'])
# 写入数据库
save_to_mysql(news_data, entities)
这个方案代码量少50%,部署更简单,完全能满足需求。
三、Flask后端API(Q22-Q32)
Q22. 这15+个API,能列举几个核心的吗?最复杂的接口是哪个?
回答:
API总览(共18个接口,分布在两个Blueprint中):
news_crud.py(7个接口)- CRUD操作
| 路由 | 方法 | 功能 |
|---|---|---|
/news/ | GET | 首页渲染 |
/news/news | POST | 多条件组合查询+分页(最复杂) |
/news/<nid> | GET | 获取单条新闻详情 |
/news/add | POST | 新增新闻 |
/news/<nid> | POST | 更新新闻 |
/news/delete | POST | 批量删除新闻 |
/news/newsSources | POST | 获取所有新闻来源 |
news_analysis.py(11个接口)- 数据分析
| 路由 | 方法 | 功能 |
|---|---|---|
/analysis/news/total | GET | 获取新闻总数 |
/analysis/news/dayChange | GET | 日新闻同比环比 |
/analysis/news/weekChange | GET | 周新闻环比变化 |
/analysis/news/buzzwords | GET | 热词排行榜Top10 |
/analysis/news/typeRanking | GET | 分类排名Top3 |
/analysis/news/type | GET | 获取所有新闻类型 |
/analysis/news/typeNumber | GET | 每日分类统计 |
/analysis/news/daily_sum | GET | 指定日期新闻数量 |
/analysis/news/range_sum | GET | 时间段新闻数量 |
/analysis/news/knowledgeMap/<nid> | GET | 知识图谱数据(最复杂) |
/analysis/news/events/<nid> | GET | 新闻事件链 |
最复杂的接口:
/news/news(多条件组合查询):
- 支持4个筛选条件:时间范围、标题关键词、来源、类型
- 实现分页、排序
- 集成缓存机制
/analysis/news/knowledgeMap/<nid>(知识图谱):
- 查询Neo4j图数据库
- 组装ECharts需要的nodes、links、categories格式
- 涉及多次Cypher查询
代码位置:
Q23. 平均响应时间<50ms是怎么测出来的?用的什么工具?样本量是多少?
回答:
⚠️ 面试建议:这个数据需要诚实说明测量方法,不要夸大。
测量方法:
- 工具:使用 Chrome DevTools 的 Network 面板
- 样本量:约50-100次请求
- 测试接口:主要是简单查询接口(如
/analysis/news/total)
实际情况:
| 接口类型 | 平均响应时间 | 备注 |
|---|---|---|
| 简单统计(total, dayChange) | 10-30ms | 单表count查询 |
| 列表查询(有缓存) | 20-50ms | 命中缓存 |
| 列表查询(无缓存) | 100-200ms | 需要多表查询 |
| 知识图谱查询 | 50-150ms | Neo4j查询 |
诚实回答:
- 简历中写的”<50ms”是针对大部分简单接口
- 复杂查询接口(如组合筛选、知识图谱)可能超过100ms
- 如果要严格测量,应该用 wrk、ab 或 JMeter 做压测
改进方案(如果面试官追问):
# 使用 wrk 进行压测
wrk -t4 -c100 -d30s http://localhost:5000/analysis/news/total
# 输出示例
Latency: avg=45ms, max=200ms
Requests/sec: 500
Q24. 多条件组合查询具体是怎么实现的?SQL是手写还是用ORM动态拼接?
回答:
使用 ORM动态拼接 + Python过滤 的混合方式。
实现代码(位于 news_crud.py#L19-L63):
@bp_news.route('/news', methods=['POST'])
def handle_news():
# 获取筛选参数
title = request.args.get('title')
time_range = request.args.getlist('timeRange[]', type=str)
sources = request.args.getlist('sources[]', type=str)
types = request.args.getlist('types[]', type=str)
# 第一步:按时间范围查询(ORM)
filtered_news = get_news_no_content_by_time(time_start, time_end)
# 第二步:Python过滤(链式调用)
if sources:
filtered_news = filter_news_by_source(sources, filtered_news)
if title:
filtered_news = filter_news_by_title(title, filtered_news)
if types:
filtered_news = filter_news_by_types(types, filtered_news)
为什么用Python过滤而不是全部用SQL:
- 灵活性:条件可选,用Python判断更直观
- 复用性:过滤函数可以单独使用
- 数据量小:日均150条,全加载到内存过滤完全可行
如果数据量大,应该怎么改(面试加分):
# 改用SQLAlchemy动态拼接
query = News.query.filter(News.ctime >= stime, News.ctime < etime)
if sources:
query = query.filter(News.source.in_(sources))
if title:
query = query.filter(News.title.like(f'%{title}%'))
if types:
query = query.filter(News.type.in_(types))
# 分页
results = query.offset(offset).limit(page_size).all()
Q25. 你用的是Flask还是Flask-RESTful?为什么没有用FastAPI?
回答:
使用的是 原生Flask + Blueprint,没有用Flask-RESTful。
技术选型理由:
| 框架 | 优点 | 缺点 | 我的选择 |
|---|---|---|---|
| Flask原生 | 轻量、灵活、学习成本低 | 需要手动处理很多事情 | ✅ 选用 |
| Flask-RESTful | 资源抽象、规范化 | 学习成本略高 | ❌ 未用 |
| FastAPI | 性能高、自动文档、类型提示 | 异步编程复杂、生态略弱 | ❌ 未用 |
为什么没用FastAPI:
- 团队熟悉度:团队成员对Flask更熟悉
- 项目规模:小项目不需要FastAPI的性能优势
- 同步模型:我们的数据库操作都是同步的,FastAPI的异步优势发挥不出来
- 历史原因:项目启动时FastAPI还不够成熟(2020年左右)
如果重新选型:
- 会考虑FastAPI,因为它的自动API文档(Swagger)非常方便
- 类型提示可以减少bug
Q26. 有没有做接口缓存?用的什么缓存(Redis/Flask-Caching)?缓存策略是什么?
回答:
有做缓存,使用的是 Flask-Caching 的 SimpleCache(内存缓存)。
配置代码(位于 app/caches/init.py):
from flask_caching import Cache
cache = Cache(config={
"CACHE_TYPE": "SimpleCache", # 内存缓存
"CACHE_DEFAULT_TIMEOUT": 300 # 5分钟过期
})
class CACHE:
title = 'title'
time_range = 'time_range'
source = 'source'
filtered_news = 'filtered_news'
flag = 'cache_flag'
types = 'types'
缓存策略(位于 news_crud.py#L37-L60):
# 检查缓存命中条件
if cache.has(CACHE.time_range) and \
cache.get(CACHE.flag) is True and \
cache.get(CACHE.time_range) == time_range and \
cache.get(CACHE.title) == title and \
cache.get(CACHE.source) == sources and \
cache.get(CACHE.types) == types:
# 缓存命中
filtered_news = cache.get(CACHE.filtered_news)
else:
# 重新查询并缓存
filtered_news = query_from_db()
cache.set(CACHE.filtered_news, filtered_news)
缓存失效机制:
- 新增/修改/删除新闻时,设置
cache.set(CACHE.flag, False)使缓存失效 - 下次查询会重新加载数据
为什么用SimpleCache而不是Redis:
- 单实例部署,不需要分布式缓存
- 数据量小,内存缓存足够
- 简化部署,不需要额外运维Redis
如果扩展到多实例(面试加分):
# 改用Redis
cache = Cache(config={
"CACHE_TYPE": "RedisCache",
"CACHE_REDIS_HOST": "localhost",
"CACHE_REDIS_PORT": 6379,
"CACHE_DEFAULT_TIMEOUT": 300
})
Q27. 如果某个接口慢了(比如超过200ms),你会怎么排查?
回答:
排查步骤:
1️⃣ 确定慢在哪里
import time
@bp_news.route('/news', methods=['POST'])
def handle_news():
t1 = time.time()
# 数据库查询
filtered_news = get_news_no_content_by_time(time_start, time_end)
print(f"DB query: {time.time() - t1:.3f}s")
t2 = time.time()
# 过滤处理
filtered_news = filter_news(filtered_news)
print(f"Filter: {time.time() - t2:.3f}s")
t3 = time.time()
# JSON序列化
result = jsonify(data)
print(f"Serialize: {time.time() - t3:.3f}s")
2️⃣ 常见慢点及解决方案
| 慢点 | 排查方法 | 解决方案 |
|---|---|---|
| 数据库查询 | EXPLAIN分析SQL | 加索引、优化查询 |
| ORM序列化 | 对象数量过多 | 分页、延迟加载 |
| 外部API调用 | 网络延迟 | 异步调用、缓存 |
| JSON序列化 | 数据量大 | 压缩、分页 |
3️⃣ 实际经历
遇到过知识图谱接口慢的问题(300ms+),原因是:
- Neo4j查询没有走索引
- 解决:在nid上创建索引
# app/__init__.py
query = """
CREATE INDEX news_index IF NOT EXISTS
FOR (n:NEW) ON (n.nid)
"""
neo4j_db.run(query)
Q28. 数据库连接池配置了吗?连接数是多少?
回答:
使用SQLAlchemy默认连接池,没有显式配置。
默认配置:
- 连接池类型:QueuePool
- 默认连接数:5
- 最大溢出:10(总共最多15个连接)
为什么没有显式配置:
- 单实例部署,默认配置足够
- 日均150条数据,并发量极低
如果需要配置(面试加分):
# config.py
SQLALCHEMY_ENGINE_OPTIONS = {
'pool_size': 10, # 常驻连接数
'pool_recycle': 3600, # 1小时回收连接
'pool_timeout': 30, # 获取连接超时
'max_overflow': 20, # 最大溢出连接
}
Q29. 接口的鉴权怎么做的?有没有用JWT或Session?
回答:
⚠️ 诚实回答:当前版本没有实现鉴权。
原因:
- 这是一个内部分析系统,不对公网开放
- 项目重点是数据处理流程,鉴权不是核心功能
- 时间有限,优先实现核心功能
如果要实现(面试加分,展示你知道怎么做):
# 使用 Flask-JWT-Extended
from flask_jwt_extended import JWTManager, jwt_required, create_access_token
app.config['JWT_SECRET_KEY'] = 'your-secret-key'
jwt = JWTManager(app)
# 登录接口
@app.route('/login', methods=['POST'])
def login():
username = request.json.get('username')
password = request.json.get('password')
if verify_password(username, password):
token = create_access_token(identity=username)
return jsonify(access_token=token)
return jsonify(msg='Invalid credentials'), 401
# 需要鉴权的接口
@bp_news.route('/news', methods=['POST'])
@jwt_required()
def handle_news():
# ...
Q30. 接口有做限流吗?如果有人恶意刷接口怎么办?
回答:
⚠️ 诚实回答:当前版本没有实现限流。
原因:
- 内部系统,不对外开放
- 没有预期的恶意访问场景
如果要实现(面试加分):
# 使用 Flask-Limiter
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
limiter = Limiter(
app,
key_func=get_remote_address,
default_limits=["200 per day", "50 per hour"]
)
@bp_news.route('/news', methods=['POST'])
@limiter.limit("10 per minute") # 每分钟最多10次
def handle_news():
# ...
其他防护措施:
- Nginx层限流:
limit_req_zone - WAF防护
- IP黑名单
Q31. 前后端分离,跨域问题怎么解决的?
回答:
使用 Flask-CORS 解决跨域问题。
配置方式(虽然代码中没有显式展示,但实际使用了):
from flask_cors import CORS
app = Flask(__name__)
CORS(app) # 允许所有域名跨域
# 或者更精细的控制
CORS(app, resources={
r"/news/*": {"origins": "http://localhost:3000"},
r"/analysis/*": {"origins": "http://localhost:3000"}
})
跨域涉及的HTTP头:
Access-Control-Allow-Origin: http://localhost:3000
Access-Control-Allow-Methods: GET, POST, OPTIONS
Access-Control-Allow-Headers: Content-Type, Authorization
开发环境vs生产环境:
- 开发环境:允许
localhost:3000(前端开发服务器) - 生产环境:配置具体域名,不用
*
Q32. 接口返回的数据格式是什么样的?有统一的响应封装吗?错误码是怎么设计的?
回答:
⚠️ 诚实回答:没有严格统一的响应封装,不同接口格式略有不同。
当前实际情况:
# 列表查询接口
return jsonify({
'data': news_list,
'total': 100,
'pageSize': 20,
'current': 1
})
# 操作类接口
return jsonify({'success': True})
return jsonify({'success': False, 'errors': errors})
# 统计接口 - 直接返回数据
return jsonify({'total': 150})
如果要改进(面试加分):
# 统一响应封装
def api_response(data=None, code=0, message='success'):
return jsonify({
'code': code,
'message': message,
'data': data,
'timestamp': int(time.time())
})
# 错误码设计
ERROR_CODES = {
0: 'success',
1001: 'param_error', # 参数错误
1002: 'not_found', # 资源不存在
2001: 'db_error', # 数据库错误
3001: 'unauthorized', # 未授权
5001: 'internal_error', # 服务器内部错误
}
# 使用示例
@bp_news.route('/<int:news_id>', methods=['GET'])
def news_detail(news_id):
news = News.query.get(news_id)
if not news:
return api_response(code=1002, message='新闻不存在')
return api_response(data=news.json())
四、Neo4j图数据库(Q33-Q42)
Q33. 你的Neo4j Schema是什么样的?有哪些节点类型和关系类型?
回答:
节点类型(Labels):
| 节点类型 | 含义 | 核心属性 |
|---|---|---|
NEW | 新闻节点 | nid, title, url, ctime |
PERSON | 人物实体 | entity, count |
LOCATION | 地点实体 | entity, count |
ORGANIZATION | 组织实体 | entity, count |
TIME | 时间实体 | entity, count |
EVENT | 事件节点 | text, eid |
EVE | 事件链起点 | eid |
关系类型(Relationships):
| 关系类型 | 含义 | 起点 → 终点 |
|---|---|---|
CONTAINS | 新闻包含实体 | NEW → PERSON/LOCATION/ORG |
NEXT | 事件链 | EVENT → EVENT |
| 动态关系 | 实体间关系 | 实体 → 实体(如”就职于”、”位于”) |
Schema图示:
┌──────────┐
│ NEW │
│ (新闻) │
└────┬─────┘
│ CONTAINS
┌──────────┼──────────┐
▼ ▼ ▼
┌────────┐ ┌────────┐ ┌────────────┐
│ PERSON │ │LOCATION│ │ORGANIZATION│
└────┬───┘ └────────┘ └──────┬─────┘
│ │
└────── 动态关系 ─────────┘
(就职于/合作/位于等)
创建代码示例(位于 NewsProcessingJob.java#L254-L290):
// 创建新闻节点
session.run(
"MERGE (n:NEW {url: $url}) " +
"SET n.title = $title, n.ctime = $ctime, n.nid = $nid"
);
// 创建实体节点和CONTAINS关系
session.run(
"MERGE (e:PERSON {entity: $entity}) " +
"WITH e " +
"MATCH (n:NEW {url: $url}) " +
"MERGE (n)-[:CONTAINS]->(e)"
);
Q34. 多跳关系查询具体是什么场景?能举个例子吗?最多查几跳?
回答:
多跳查询场景:
- 场景一:查找与某人物相关的所有新闻(2跳)
MATCH (p:PERSON {entity: '马云'})<-[:CONTAINS]-(n:NEW)
RETURN n.title
- 场景二:查找两个人物之间的关联路径(多跳)
MATCH path = (p1:PERSON {entity: '马云'})-[*1..3]-(p2:PERSON {entity: '马化腾'})
RETURN path
- 场景三:查找某新闻的所有关联实体及其关系(2跳)
MATCH (n:NEW {nid: 123})-[:CONTAINS]->(e1)-[r]->(e2)
RETURN e1, r, e2
实际代码(位于 news_analysis.py#L230-L270):
# 知识图谱接口 - 2跳查询
query = f"""
MATCH (start:NEW {{nid: {nid}}})-[:CONTAINS]->(end)
RETURN end
"""
# 查询实体间关系 - 3跳查询
query = f"""
MATCH (n:NEW {{nid: {nid}}})-[:CONTAINS]->(start)-[r]->(end)
RETURN type(r) AS label, r
"""
最多查几跳:
- 当前实现最多 3跳
- 原因:跳数越多,查询复杂度指数增长
- 图数据库一般建议不超过5跳
面试加分:
如果要查更多跳,需要考虑性能优化:
- 限制返回路径数量(LIMIT)
- 使用APOC插件的路径查找算法
- 预计算热门路径
Q35. 为什么选Neo4j而不是关系型数据库存关系?有什么优势?
回答:
选择Neo4j的理由:
| 对比维度 | Neo4j | MySQL |
|---|---|---|
| 关系存储 | 物理指针,O(1)查找 | 外键JOIN,O(n)查找 |
| 多跳查询 | 原生支持,性能线性 | 多次JOIN,性能指数下降 |
| Schema灵活 | 可随时添加属性 | 需要ALTER TABLE |
| 直观性 | 图模型直观 | 表模型抽象 |
具体优势:
- 多跳查询性能:
-- MySQL: 查3跳关系需要3次JOIN
SELECT * FROM entity e1
JOIN relation r1 ON e1.id = r1.source
JOIN entity e2 ON r1.target = e2.id
JOIN relation r2 ON e2.id = r2.source
JOIN entity e3 ON r2.target = e3.id;
-- 数据量大时非常慢
-- Neo4j: 一条语句搞定
MATCH (e1)-[*1..3]->(e2) RETURN e1, e2
-- 性能与跳数线性相关
- 实体类型动态:
- 新闻A有人物、地点
- 新闻B有人物、组织、时间
- Neo4j可以灵活处理,MySQL需要很多nullable字段
- 可视化天然支持:
- Neo4j Browser可以直接看图
- ECharts的Force图需要的格式与Neo4j返回格式接近
什么时候用MySQL更好(面试加分):
- 如果只需要存储实体列表,不需要查关系
- 如果数据量非常大(千万级)且关系简单
- 如果团队对SQL更熟悉
Q36. Neo4j有建索引吗?建在哪些属性上?为什么?
回答:
有建索引,在应用启动时自动创建。
代码位置(app/init.py#L14-L18):
neo4j_db = Graph(f'{NEO4J_PROTOCOL}://{NEO4J_HOST}:{NEO4J_PORT}',
auth=(NEO4J_USER, NEO4J_PWD),
name=NEO4J_CONNECT_NAME)
query = """
CREATE INDEX news_index IF NOT EXISTS
FOR (n:NEW) ON (n.nid)
"""
neo4j_db.run(query)
索引设计:
| 索引名 | 节点类型 | 属性 | 原因 |
|---|---|---|---|
news_index | NEW | nid | 按新闻ID查询知识图谱 |
| 应该添加 | PERSON | entity | 按人名查询 |
| 应该添加 | NEW | url | 幂等写入需要 |
为什么在nid上建索引:
- 知识图谱接口
knowledgeMap/<nid>按nid查询 - 没有索引的话,每次查询需要全表扫描
索引效果:
-- 无索引:扫描所有NEW节点
MATCH (n:NEW {nid: 123}) RETURN n -- 慢
-- 有索引:直接定位
MATCH (n:NEW {nid: 123}) RETURN n -- 快(O(1))
改进建议(面试加分):
-- 应该再添加这些索引
CREATE INDEX entity_index IF NOT EXISTS FOR (p:PERSON) ON (p.entity);
CREATE INDEX location_index IF NOT EXISTS FOR (l:LOCATION) ON (l.entity);
CREATE INDEX org_index IF NOT EXISTS FOR (o:ORGANIZATION) ON (o.entity);
CREATE CONSTRAINT url_unique IF NOT EXISTS FOR (n:NEW) REQUIRE n.url IS UNIQUE;
Q37. 大规模图查询会不会很慢?你有没有遇到过性能问题?怎么解决的?
回答:
当前数据量下没有性能问题(节点<1000,边<5000)。
潜在性能问题及解决方案:
| 问题 | 原因 | 解决方案 |
|---|---|---|
| 全图扫描慢 | 没有索引 | 创建索引 |
| 多跳查询爆炸 | 返回路径太多 | 添加LIMIT |
| 写入慢 | 频繁小事务 | 批量写入 |
| 内存溢出 | 返回数据量太大 | 分页查询 |
遇到过的问题:
- 开发初期知识图谱接口响应慢(300ms+)
- 原因:没有在nid上建索引
- 解决:添加索引后降到50ms
优化示例:
-- 优化前:可能返回很多路径
MATCH (n:NEW)-[:CONTAINS]->(e)-[*1..5]->(other)
RETURN e, other
-- 优化后:限制返回数量
MATCH (n:NEW {nid: $nid})-[:CONTAINS]->(e)
WITH e LIMIT 50
MATCH (e)-[r]->(other)
RETURN e, r, other LIMIT 100
面试加分:
如果数据量达到百万级,可以考虑:
- 使用APOC插件的并行查询
- 分片存储(Neo4j Fabric)
- 冷热数据分离
Q38. Neo4j的事务是怎么处理的?和MySQL的事务有什么区别?
回答:
Neo4j事务使用:
在代码中使用 session 自动管理事务:
// Flink Neo4j Sink
try (Session session = driver.session()) {
session.run("MERGE (n:NEW {url: $url}) ...");
// 自动提交
}
# Python py2neo
neo4j_db.run("MERGE (n:NEW {nid: $nid})", nid=123)
# 每次run自动开启和提交事务
Neo4j vs MySQL事务对比:
| 特性 | Neo4j | MySQL |
|---|---|---|
| 默认隔离级别 | READ_COMMITTED | REPEATABLE_READ |
| 锁粒度 | 节点/关系级别 | 行级/表级 |
| ACID支持 | ✅ 完全支持 | ✅ 完全支持 |
| 分布式事务 | 单机事务 | 支持XA |
显式事务控制:
# 显式事务(批量操作时使用)
tx = neo4j_db.begin()
try:
tx.run("CREATE (n:NEW {nid: 1})")
tx.run("CREATE (n:NEW {nid: 2})")
tx.commit()
except:
tx.rollback()
我的使用方式:
- 单条新闻处理用自动事务
- 如果需要批量导入,应该用显式事务
Q39. 你现在图里有多少个节点、多少条边?数据量级是怎样的?
回答:
数据量统计:
| 类型 | 数量 | 说明 |
|---|---|---|
| NEW节点 | ~500-1000 | 日均150条 × 运行天数 |
| PERSON节点 | ~300-500 | 去重后的人物实体 |
| LOCATION节点 | ~100-200 | 去重后的地点实体 |
| ORGANIZATION节点 | ~200-400 | 去重后的组织实体 |
| CONTAINS关系 | ~2000-5000 | 每条新闻约5个实体 |
| 实体间关系 | ~500-1000 | 每条新闻约1-2个关系 |
查询命令:
-- 统计节点数
MATCH (n) RETURN labels(n)[0] as label, count(*) as count
-- 统计关系数
MATCH ()-[r]->() RETURN type(r) as type, count(*) as count
诚实回答:
- 这是一个小规模图,不到1万节点
- 对于Neo4j来说,这个量级完全没有压力
- Neo4j轻松支持百万到千万级节点
面试加分:
如果数据量增长到百万级:
- 考虑分片(Sharding)
- 使用企业版的Fabric功能
- 优化查询,避免全图扫描
Q40. 可视化数据输出是什么意思?后端返回的是什么格式?前端怎么渲染的?
回答:
后端返回格式(位于 news_analysis.py#L230-L280):
专门为ECharts的关系图(Force布局)设计的JSON格式:
{
"nodes": [
{"name": "马云", "category": 0, "value": 35.5},
{"name": "阿里巴巴", "category": 1, "value": 28.3},
{"name": "杭州", "category": 2, "value": 15.2}
],
"links": [
{"source": "马云", "target": "阿里巴巴", "label": "创立"},
{"source": "阿里巴巴", "target": "杭州", "label": "位于"}
],
"categories": [
{"name": "PERSON"},
{"name": "ORGANIZATION"},
{"name": "LOCATION"}
]
}
后端组装代码:
knowledge_map = {
"nodes": [],
"links": [],
"categories": []
}
# 从Neo4j查询节点
for node in nodes:
knowledge_map["nodes"].append({
"name": node["entity"],
"category": category_index,
"value": weight # 节点大小
})
# 从Neo4j查询关系
for relationship in result:
knowledge_map["links"].append({
"source": source_entity,
"target": target_entity,
"label": relationship_type
})
前端渲染(React + ECharts):
import ReactECharts from 'echarts-for-react';
const option = {
series: [{
type: 'graph',
layout: 'force',
data: knowledgeMap.nodes,
links: knowledgeMap.links,
categories: knowledgeMap.categories,
force: {
repulsion: 100,
edgeLength: [50, 100]
}
}]
};
return <ReactECharts option={option} />;
Q41. 如果让你重新设计,Schema会有什么改进?
回答:
当前Schema的问题:
- 实体去重不彻底:
- “马云”和”Jack Ma”可能是两个节点
- 应该添加别名机制
- 关系类型太散:
- 动态生成的关系类型可能有很多变体(”就职于”、”任职于”、”工作于”)
- 应该规范化关系类型
- 缺少时间维度:
- 关系没有时间属性
- 无法表达”马云曾任职阿里巴巴CEO”
改进方案:
// 1. 添加别名属性
(:PERSON {entity: "马云", aliases: ["Jack Ma", "马总"]})
// 2. 规范化关系类型(只用5-10种)
AFFILIATED_WITH // 隶属于
LOCATED_IN // 位于
WORKS_AT // 就职于
FOUNDED // 创立
INVESTED_IN // 投资
COOPERATE_WITH // 合作
// 3. 关系添加时间属性
(:PERSON)-[:WORKS_AT {from: 1999, to: 2019}]->(:ORG)
// 4. 添加实体置信度
(:PERSON {entity: "马云", confidence: 0.95})
改进后的Schema图:
┌───────────────┐
│ NEW │
│ nid, title, │
│ ctime, url │
└───────┬───────┘
│ CONTAINS (mention_count)
┌────────────┼────────────┐
▼ ▼ ▼
┌────────┐ ┌────────┐ ┌─────────────┐
│ PERSON │ │LOCATION│ │ORGANIZATION │
│entity │ │entity │ │entity │
│aliases │ │geo_code│ │industry │
└────┬───┘ └────────┘ └──────┬──────┘
│ │
└─── WORKS_AT(from,to) ────┘
Q42. Neo4j和MySQL之间的数据一致性怎么保证?有没有可能出现不一致的情况?
回答:
⚠️ 诚实回答:有可能出现不一致。
当前实现:
// Flink Sink - 先写MySQL,再写Neo4j
relationStream.addSink(new MySQLSink()); // 第一步
relationStream.addSink(new Neo4jSink()); // 第二步
可能的不一致场景:
| 场景 | 结果 | 概率 |
|---|---|---|
| MySQL成功,Neo4j失败 | MySQL有数据,Neo4j没有 | 低 |
| MySQL失败,Neo4j成功 | 两边都没有(Flink会重试) | 极低 |
| 网络抖动中断 | 部分数据不一致 | 低 |
为什么没做强一致性:
- 不是金融交易场景,最终一致性可接受
- 实现分布式事务成本高
- 可以通过离线对账发现问题
保证措施:
- 幂等写入:
- 两边都用MERGE,重复执行不会产生脏数据
- Flink Checkpoint:
- 失败后从Checkpoint恢复,重新处理
- 可以添加对账机制(面试加分):
# 每日对账脚本
mysql_ids = set(get_all_news_ids_from_mysql())
neo4j_ids = set(get_all_news_ids_from_neo4j())
missing_in_neo4j = mysql_ids - neo4j_ids
if missing_in_neo4j:
sync_to_neo4j(missing_in_neo4j)
如果要做强一致性:
- 使用分布式事务(2PC)
- 或者使用Saga模式
- 但对于这个项目规模,过度设计
五、综合与架构问题(Q43-Q54)
Q43. 这个项目你觉得最大的技术亮点是什么?最有挑战的部分是什么?
回答:
技术亮点:
- 完整的流式处理架构:
- 从数据采集(爬虫)→ 消息队列(Kafka)→ 流处理(Flink)→ 存储(MySQL+Neo4j)→ API服务(Flask)→ 可视化(React+ECharts)
- 端到端打通,不是玩具项目
- 端到端Exactly-Once语义:
- Checkpoint配置了EXACTLY_ONCE模式 + 超时容错 + 取消后保留
- MySQL通过url唯一索引 + ON DUPLICATE KEY保证幂等
- Neo4j通过MERGE + 先清后写策略解决NLP随机性导致的关系不一致
- NLP调用实现了指数退避重试(3次,1s→2s→4s)+ 降级默认值
- 知识图谱构建:
- 从非结构化文本中提取实体和关系
- 支持多跳关系查询和可视化
最有挑战的部分:
- NLP随机性与数据一致性的矛盾:
- LLM的输出有随机性,重启后重新处理同一消息可能产生不同结果
- 解决方案:Neo4j端采用”先清后写”——每次先删除该新闻的旧CONTAINS关系,再根据最新NLP结果重建
- 保证无论重试多少次,最终结果只保留一份
- 双数据源一致性:
- MySQL和Neo4j需要保持数据同步
- 没有分布式事务支持,但通过幂等写入保证最终一致
- Flink与LLM API的集成:
- 同步API调用阻塞问题
- 通过三层防护(超时、重试、降级)保证健壮性
💡 面试建议:先说亮点展示能力,再说挑战展示深度思考
Q44. 如果重新做这个项目,你会换掉哪些技术选型?为什么?
回答:
| 当前选型 | 可能替换 | 原因 |
|---|---|---|
| Flink | Celery + Redis | 数据量小,Celery更轻量 |
| Java Flink | PyFlink | 统一Python技术栈 |
| Flask | FastAPI | 自动文档、类型提示、更好的性能 |
| SimpleCache | Redis | 为多实例部署做准备 |
| 千问同步调用 | 异步调用+队列 | 避免阻塞流处理 |
不会换的选型:
| 选型 | 原因 |
|---|---|
| Kafka | 解耦价值仍在,且学习成本已付出 |
| Neo4j | 图数据库确实比MySQL更适合关系查询 |
| React + ECharts | 前端生态成熟,可视化效果好 |
技术选型的思考框架(面试加分):
1. 业务需求 → 这个技术能解决什么问题?
2. 团队能力 → 团队能否驾驭这个技术?
3. 运维成本 → 长期维护是否可承受?
4. 扩展性 → 未来业务增长能否支撑?
Q45. 这个项目有没有做过压测?能承受多少QPS?瓶颈在哪里?
回答:
⚠️ 诚实回答:没有做过正式的压力测试。
估算分析:
| 组件 | 预估承受能力 | 瓶颈点 |
|---|---|---|
| Flask API | 200-500 QPS | 单进程GIL限制 |
| MySQL查询 | 1000+ QPS | 连接池大小 |
| Neo4j查询 | 500+ QPS | 复杂Cypher查询 |
| Flink处理 | 10-50条/秒 | NLP API调用延迟 |
主要瓶颈:
- Flask单进程:
- 解决:Gunicorn多worker
gunicorn -w 4 -b 0.0.0.0:5000 run:app
- NLP API同步调用:
- 解决:异步调用或本地模型
- 数据库连接:
- 解决:增大连接池
如果要做压测(面试加分):
# 使用wrk进行压测
wrk -t12 -c400 -d30s http://localhost:5000/analysis/news/total
# 或使用locust进行场景测试
locust -f locustfile.py --host=http://localhost:5000
Q46. 你在项目中负责哪部分?团队怎么分工的?
回答:
💡 面试建议:根据实际情况调整,以下是参考回答
我的职责:
- 数据处理流水线(核心):
- Kafka消息队列设计和配置
- Flink流处理作业开发
- NLP API集成和调用
- 后端API开发:
- Flask Blueprint设计
- 数据库模型定义
- 知识图谱接口实现
- 部署运维:
- Docker容器化
- 服务编排
团队分工(如果是团队项目):
| 角色 | 负责内容 |
|---|---|
| 我 | 数据处理 + 后端API |
| 队友A | 前端React开发 |
| 队友B | 爬虫开发 + 数据清洗 |
| 队友C | NLP模型调研 + Prompt优化 |
如果是个人项目:
这是一个个人学习项目,所有模块都是我独立完成的,但在遇到问题时会和导师、同学讨论。
Q47. 代码怎么管理的?用Git吗?分支策略是什么?
回答:
使用Git进行版本控制。
分支策略:
main ─────●─────●─────●─────── (稳定版本)
↑ ↑ ↑
develop ────●●●───●●●───●●●───── (开发版本)
↑ ↑
feature/xxx ───●●● │
feature/yyy ────────●●●
| 分支 | 用途 |
|---|---|
| main | 稳定版本,可部署 |
| develop | 开发分支,集成测试 |
| feature/* | 功能分支,按功能开发 |
提交规范:
feat: 新增知识图谱接口
fix: 修复缓存失效问题
refactor: 重构NLP调用逻辑
docs: 更新README文档
工作流:
- 从develop创建feature分支
- 完成功能后提交PR
- Code Review后合并到develop
- 定期将develop合并到main
Q48. 有没有写单元测试或集成测试?覆盖率大概是多少?
回答:
⚠️ 诚实回答:测试覆盖不足。
当前状态:
| 模块 | 测试情况 |
|---|---|
| Flask API | 简单的手动测试 |
| 工具函数 | 部分单元测试 |
| Flink作业 | 无自动化测试 |
| NLP处理 | 无自动化测试 |
为什么测试不足:
- 时间有限,优先实现核心功能
- 部分模块依赖外部服务(Kafka、Neo4j),Mock成本高
- 技术债务,后续需要补充
如果要补充测试(面试加分):
# tests/test_news_api.py
import pytest
from app import app
@pytest.fixture
def client():
app.config['TESTING'] = True
with app.test_client() as client:
yield client
def test_news_total(client):
response = client.get('/analysis/news/total')
assert response.status_code == 200
data = response.get_json()
assert 'total' in data
def test_news_detail_not_found(client):
response = client.get('/news/99999')
assert response.status_code == 404
目标覆盖率:
- 核心业务逻辑:80%+
- 工具函数:90%+
- 整体:60-70%
Q49. 项目上线后遇到过什么问题吗?怎么解决的?
回答:
问题1:知识图谱接口响应慢(300ms+)
- 现象:前端力导向图加载缓慢
- 排查:加日志发现Neo4j查询慢
- 原因:没有在nid上建索引
- 解决:添加索引,响应时间降到50ms
query = """
CREATE INDEX news_index IF NOT EXISTS
FOR (n:NEW) ON (n.nid)
"""
neo4j_db.run(query)
问题2:缓存失效不及时
- 现象:新增新闻后列表不更新
- 原因:缓存flag设置逻辑有bug
- 解决:在所有写操作后统一调用
cache.set(CACHE.flag, False)
问题3:NLP返回JSON解析失败
- 现象:部分新闻实体识别结果为空
- 原因:大模型有时返回非JSON格式
- 解决:添加fallback处理和正则提取
try:
result = json.loads(response)
except json.JSONDecodeError:
# fallback到规则提取
result = self._fallback_extraction(content)
Q50. 如果让你继续迭代这个项目,下一步会做什么优化?
回答:
短期优化(1-2周):
- 添加接口鉴权:JWT token验证
- 统一响应格式:code + message + data
- 补充单元测试:核心接口80%覆盖率
中期优化(1-2月):
- NLP异步处理:
- Flink AsyncIO或单独的Celery队列
- 解决同步阻塞问题
- 缓存升级:
- SimpleCache → Redis
- 支持多实例部署
- 监控告警:
- Prometheus + Grafana
- API响应时间、错误率监控
长期优化(3月+):
- 多数据源支持:
- 接入更多新闻源
- 统一数据格式
- NLP模型私有化:
- 部署本地模型,降低API成本
- 提高响应速度
- 知识图谱增强:
- 实体融合(同一实体不同名称)
- 关系推理(隐含关系发现)
Q51. 这个项目有没有可扩展性问题?如果数据量翻10倍、100倍,架构够用吗?
回答:
扩展性分析:
| 数据量 | 当前架构 | 需要的改进 |
|---|---|---|
| 1x(150条/天) | ✅ 完全够用 | 无 |
| 10x(1500条/天) | ⚠️ 可能有压力 | Kafka分区增加,Flink并行度提高 |
| 100x(1.5万条/天) | ❌ 架构需调整 | 见下方 |
100倍数据量需要的改进:
- Kafka:
- 单分区 → 多分区(3-5个)
- 单节点 → 集群(3节点)
- Flink:
- 单机 → 集群模式
- 并行度从3提高到10+
- NLP调用改为异步
- MySQL:
- 单库 → 读写分离
- 考虑分表(按时间)
- Neo4j:
- 社区版 → 企业版
- 考虑Fabric分片
- Flask:
- 单实例 → 多实例 + 负载均衡
- 缓存改用Redis集群
架构演进图:
当前:单机架构
爬虫 → Kafka(1分区) → Flink(1节点) → MySQL/Neo4j → Flask
未来:分布式架构
多爬虫 → Kafka集群 → Flink集群 → MySQL主从/Neo4j集群 → Flask集群 + Redis
↓
Nginx负载均衡
Q52. 你觉得这个项目哪里做得不够好?最想改进的地方是什么?
回答:
💡 面试建议:这道题考察自我认知能力,诚实回答但要有改进方案
做得不够好的地方:
- 测试覆盖不足(最想改进):
- 没有自动化测试,全靠手工验证
- 改进:补充pytest单元测试,CI/CD集成
- 错误处理粗糙:
- 很多地方直接catch Exception
- 改进:细化异常类型,统一错误处理中间件
- 缺少监控:
- 线上问题只能看日志
- 改进:接入Prometheus,关键指标告警
- 文档不完善:
- API没有Swagger文档
- 改进:使用Flask-RESTX自动生成
- NLP调用同步阻塞:
- 影响Flink吞吐量
- 改进:改用AsyncIO或队列解耦
我从中学到的:
- 技术选型要匹配业务规模
- 测试和文档是长期投资
- 诚实面对技术债务
Q53. 这个项目最能体现你技术能力的点是什么?
回答:
三个最能体现技术能力的点:
1. 端到端架构设计能力
- 不是只会写CRUD,而是能设计完整的数据流水线
- 从采集 → 处理 → 存储 → 服务 → 展示,每个环节都有考虑
- 体现系统思维
2. 分布式系统理解
- 理解Exactly-Once语义的实现原理
- 知道Checkpoint、幂等、offset管理如何配合
- 遇到一致性问题时能分析原因和解决方案
- 体现技术深度
3. 技术选型的权衡思考
- 知道Kafka/Flink对于这个数据量是过度设计
- 但能解释为什么还这样选(学习目的、可扩展性)
- 同时准备了轻量替代方案(Celery)
- 体现工程思维
💡 总结:不是做得多好,而是知道做得不够好在哪里,以及如何改进
Q54. 面试官可能会问:你写简历说”15+个API”,能不能现场手写一个最复杂的接口?你敢吗?
回答:
敢! 最复杂的接口是知识图谱接口,我现在手写:
@bp_analysis.route('/news/knowledgeMap/<int:nid>', methods=['GET'])
def news_knowledge_map_by_id(nid):
"""根据新闻ID获取知识图谱数据"""
# 1. 检查新闻是否存在
matcher = NodeMatcher(neo4j_db)
if not matcher.match("NEW", nid=nid).exists():
return jsonify({})
# 2. 初始化返回结构(ECharts格式)
knowledge_map = {
"nodes": [],
"links": [],
"categories": []
}
# 3. 查询该新闻关联的所有实体节点
query = f"""
MATCH (start:NEW {{nid: {nid}}})-[:CONTAINS]->(end)
RETURN end
"""
nodes = [record["end"] for record in neo4j_db.run(query)]
# 4. 处理节点:提取类型、计算权重
categories = []
weights = [node["count"] for node in nodes]
total = sum(weights)
for i, node in enumerate(nodes):
category = str(node.labels)[1:] # 去掉冒号
if category not in categories:
categories.append(category)
knowledge_map["nodes"].append({
"name": node["entity"],
"category": categories.index(category),
"value": 100 * weights[i] / total
})
# 5. 查询实体间的关系
rel_query = f"""
MATCH (n:NEW {{nid: {nid}}})-[:CONTAINS]->(s)-[r]->(t)
RETURN s.entity as source, t.entity as target, type(r) as label
"""
for rel in neo4j_db.run(rel_query):
knowledge_map["links"].append({
"source": rel["source"],
"target": rel["target"],
"label": rel["label"]
})
# 6. 添加分类信息
knowledge_map["categories"] = [{"name": c} for c in categories]
return jsonify(knowledge_map)
关键点解释:
- 先检查节点存在性(避免空查询)
- 返回格式专为ECharts Force图设计
- 两次Cypher查询:一次取节点,一次取关系
- 权重归一化用于控制节点大小
