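# File-viewer header captured together with this listing (not program code):
#   viewer chrome: "Files", "T"
#   path:      guba2vec/analyze.py
#   timestamp: 2026-05-28 04:54:42 +08:00
#   size:      347 lines, 12 KiB
#   language:  Python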

import json
import os
import re
import jieba
import numpy as np
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from wordcloud import WordCloud
import matplotlib.pyplot as plt
import matplotlib
matplotlib.use('Agg') # 使用非交互式后端
# Chinese stop-word set; tokenize() drops every token found in here.
# Each entry is listed once -- repeating a member inside a set literal has no
# effect on the resulting set.
# NOTE(review): the '' member looks like what is left of single-character stop
# words that were lost somewhere upstream -- confirm against the intended list.
STOPWORDS = {
    '',
    # General function words, question words and time words.
    '一个', '没有', '自己', '什么', '怎么', '为什么', '哪里', '多少',
    '今天', '明天', '昨天',
    # Adverbs and conjunctions.
    '已经', '还是', '但是', '可是', '不过', '只是', '只有', '就是', '或者',
    '这个', '那个', '这些', '那些', '那么', '这么', '如何',
    '因为', '所以', '虽然', '如果',
    # Forum / site boilerplate.
    '股吧', '东方财富', '帖子', '发表', '回复', '点击', '查看', '更多',
    '原文', '转发', '分享', '收藏', '评论', '点赞',
    # URL fragments.
    'http', 'https', 'com', 'cn', 'www', 'net', 'org',
}
# Patterns used by clean_text(), compiled once at import time.
# URLs are limited to printable ASCII so that Chinese text written directly
# after a link (no separating space) is not swallowed together with the link.
_URL_RE = re.compile(r'(?:https?://|www\.)[!-~]+')
_HTML_TAG_RE = re.compile(r'<.*?>')
_BRACKET_RE = re.compile(r'\[.*?\]')
_PUNCT_RE = re.compile(r'[^\w\s]')
_DIGITS_RE = re.compile(r'\d+')
_SPACES_RE = re.compile(r'\s+')


def clean_text(text):
    """Normalize a raw post into plain text ready for tokenization.

    Removes, in order: URLs, HTML tags, ``[...]`` bracketed emoticon codes,
    every character that is neither a word character nor whitespace, and
    digit runs. Whitespace is then collapsed and trimmed.

    Each removed span is replaced by a single space rather than by nothing,
    so the words on either side of a link, tag, punctuation mark or number
    are not fused into one token (``"hello,world"`` -> ``"hello world"``).

    Args:
        text: Raw text; ``None`` and ``""`` are accepted.

    Returns:
        The cleaned text, or ``""`` when *text* is falsy.
    """
    if not text:
        return ""

    # Order matters: URLs and tags must go before punctuation stripping,
    # otherwise their delimiters are gone and their bodies survive as words.
    for pattern in (_URL_RE, _HTML_TAG_RE, _BRACKET_RE, _PUNCT_RE, _DIGITS_RE):
        text = pattern.sub(' ', text)

    return _SPACES_RE.sub(' ', text).strip()
def tokenize(text):
    """Segment *text* with jieba and keep only the informative tokens.

    A token is kept when it is longer than one character and is not a member
    of STOPWORDS.

    Args:
        text: Text to segment.

    Returns:
        List of kept tokens, in the order jieba produced them.
    """
    kept = []
    for token in jieba.lcut(text):
        # Single characters (including the spaces jieba emits) and stop words
        # are both discarded.
        if len(token) <= 1 or token in STOPWORDS:
            continue
        kept.append(token)
    return kept
def load_data(data_dir='data'):
    """Load every ``*.json`` stock file found directly inside *data_dir*.

    Each file is read as a JSON object with the keys ``stock_name``,
    ``stock_code`` and ``posts``; every post is read through the keys
    ``post_id``, ``post_title`` and ``post_content``. Title and content are
    joined with a space, and posts whose combined text is empty are skipped.

    Files are processed in sorted name order so the result is deterministic.
    A file that cannot be read or does not have the expected structure is
    reported and skipped as a whole; it contributes no rows.

    Args:
        data_dir: Directory containing the JSON files.

    Returns:
        A ``(all_data, stock_info)`` tuple. ``all_data`` is a list of dicts
        with the keys ``stock_code``, ``stock_name``, ``post_id``, ``text``
        and ``clean_text``. ``stock_info`` maps a stock code to
        ``{'name': ..., 'post_count': ...}``, where ``post_count`` is summed
        over all files that share that code. Both are empty when *data_dir*
        is not an existing directory.
    """
    all_data = []
    stock_info = {}

    if not os.path.isdir(data_dir):
        print(f'数据目录 {data_dir} 不存在')
        return all_data, stock_info

    for filename in sorted(os.listdir(data_dir)):
        if not filename.endswith('.json'):
            continue

        filepath = os.path.join(data_dir, filename)
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                data = json.load(f)

            stock_name = data.get('stock_name', '未知')
            stock_code = data.get('stock_code', '未知')
            posts = data.get('posts') or []

            # Collect this file's rows separately so that a failure halfway
            # through does not leave partial data in the result.
            rows = []
            for post in posts:
                # `or ''` maps JSON null to an empty string; otherwise the
                # f-string below would produce the literal text "None".
                title = post.get('post_title') or ''
                content = post.get('post_content') or ''
                full_text = f"{title} {content}".strip()
                if full_text:
                    rows.append({
                        'stock_code': stock_code,
                        'stock_name': stock_name,
                        'post_id': post.get('post_id'),
                        'text': full_text,
                        'clean_text': clean_text(full_text),
                    })
        except (OSError, ValueError, AttributeError, TypeError) as e:
            # ValueError covers both JSON and UTF-8 decoding errors;
            # AttributeError/TypeError cover unexpected JSON structure.
            print(f'加载文件 {filename} 失败: {e}')
            continue

        all_data.extend(rows)
        # Several files may belong to the same stock: accumulate the count
        # instead of overwriting it with the last file's count.
        info = stock_info.setdefault(stock_code, {'name': stock_name, 'post_count': 0})
        info['name'] = stock_name
        info['post_count'] += len(posts)

    return all_data, stock_info
def calculate_tfidf(texts, max_features=1000, ngram_range=(1, 2)):
    """Fit a TF-IDF model on *texts* using the jieba-based tokenize().

    Args:
        texts: Iterable of already-cleaned document strings.
        max_features: Upper bound on the vocabulary size, passed to
            ``TfidfVectorizer``.
        ngram_range: ``(min_n, max_n)`` n-gram sizes, passed to
            ``TfidfVectorizer``. With the default, the vocabulary contains
            single tokens and two-token phrases.

    Returns:
        A ``(tfidf_matrix, feature_names, vectorizer)`` tuple: the result of
        ``fit_transform``, the result of ``get_feature_names_out()``, and the
        fitted vectorizer itself.
    """
    vectorizer = TfidfVectorizer(
        tokenizer=tokenize,
        # The default token_pattern is unused once a tokenizer is supplied.
        token_pattern=None,
        max_features=max_features,
        ngram_range=ngram_range,
    )
    tfidf_matrix = vectorizer.fit_transform(texts)
    feature_names = vectorizer.get_feature_names_out()
    return tfidf_matrix, feature_names, vectorizer
def _select_keywords(candidates, top_n, score_key):
    """Pick up to *top_n* candidates that do not share any token.

    Candidates are visited multi-token phrases first, then by descending
    ``score_key``; a candidate is skipped when any of its whitespace-separated
    tokens was already used by an earlier pick. The picks are returned sorted
    by descending ``score_key``.
    """
    ordered = sorted(
        candidates,
        key=lambda c: (-len(c['word'].split()), -c[score_key]),
    )

    selected = []
    used_tokens = set()
    for candidate in ordered:
        tokens = candidate['word'].split()
        if used_tokens.intersection(tokens):
            continue
        used_tokens.update(tokens)
        selected.append(candidate)
        if len(selected) >= top_n:
            break

    selected.sort(key=lambda c: -c[score_key])
    return selected


def get_top_keywords(tfidf_matrix, feature_names, top_n=20):
    """Return the corpus-wide top keywords by mean TF-IDF weight.

    The ``4 * top_n`` highest-scoring terms are taken as candidates and then
    de-duplicated so that no two returned keywords share a token, preferring
    multi-token phrases over the single tokens they contain.

    Args:
        tfidf_matrix: 2-D document-by-term weight matrix (anything whose
            ``mean(axis=0)`` can be converted to a NumPy array).
        feature_names: Term for each column of *tfidf_matrix*.
        top_n: Maximum number of keywords to return.

    Returns:
        List of ``{'word': str, 'tfidf': float}`` dicts sorted by descending
        ``tfidf``; empty when *top_n* is not positive.
    """
    # Guard: with top_n == 0 the slice below would be [-0:], i.e. everything.
    if top_n <= 0:
        return []

    avg_tfidf = np.asarray(tfidf_matrix.mean(axis=0)).flatten()
    # Take more candidates than needed so enough survive de-duplication.
    top_indices = avg_tfidf.argsort()[-top_n * 4:][::-1]

    candidates = [
        {'word': str(feature_names[idx]), 'tfidf': float(avg_tfidf[idx])}
        for idx in top_indices
        if str(feature_names[idx]).strip()
    ]
    return _select_keywords(candidates, top_n, 'tfidf')


def get_stock_specific_keywords(all_data, stock_code, top_n=20, min_posts=5):
    """Return the keywords that distinguish one stock from all the others.

    TF-IDF is fitted (via calculate_tfidf) on the whole corpus; each term is
    then scored by ``diff`` = mean weight in this stock's posts minus mean
    weight in every other post. When there are no other posts, ``diff`` is
    simply the stock's own mean weight.

    Args:
        all_data: Rows with at least the keys ``stock_code`` and
            ``clean_text``.
        stock_code: Code of the stock to analyse.
        top_n: Maximum number of keywords to return.
        min_posts: Minimum number of posts the stock must have; below this
            an empty list is returned.

    Returns:
        List of ``{'word': str, 'tfidf': float, 'diff': float}`` dicts sorted
        by descending ``diff``. ``tfidf`` is the stock's own mean weight.
    """
    # Guard: with top_n == 0 the slice below would be [-0:], i.e. everything.
    if top_n <= 0:
        return []

    stock_texts = []
    other_texts = []
    for row in all_data:
        bucket = stock_texts if row['stock_code'] == stock_code else other_texts
        bucket.append(row['clean_text'])

    if len(stock_texts) < min_posts:
        return []

    # The stock's own posts come first so they can be sliced off by position.
    tfidf_matrix, feature_names, _ = calculate_tfidf(stock_texts + other_texts)
    n_stock = len(stock_texts)

    avg_tfidf = np.asarray(tfidf_matrix[:n_stock].mean(axis=0)).flatten()
    if other_texts:
        other_avg = np.asarray(tfidf_matrix[n_stock:].mean(axis=0)).flatten()
        diff = avg_tfidf - other_avg
    else:
        diff = avg_tfidf

    # Take more candidates than needed so enough survive de-duplication.
    top_indices = diff.argsort()[-top_n * 4:][::-1]

    candidates = [
        {
            'word': str(feature_names[idx]),
            'tfidf': float(avg_tfidf[idx]),
            'diff': float(diff[idx]),
        }
        for idx in top_indices
        # A term that is not more prominent here than elsewhere is not
        # specific to this stock, so it is not offered as a keyword.
        if str(feature_names[idx]).strip() and diff[idx] > 0
    ]
    return _select_keywords(candidates, top_n, 'diff')
# Font files tried, in order, when the caller does not supply font_path.
# NOTE(review): these are commonly used install locations for CJK-capable
# fonts on Windows, macOS and Linux -- adjust for the deployment machine.
_CJK_FONT_CANDIDATES = (
    'C:/Windows/Fonts/simhei.ttf',
    'C:/Windows/Fonts/msyh.ttc',
    '/System/Library/Fonts/PingFang.ttc',
    '/System/Library/Fonts/STHeiti Light.ttc',
    '/usr/share/fonts/truetype/wqy/wqy-zenhei.ttc',
    '/usr/share/fonts/truetype/wqy/wqy-microhei.ttc',
    '/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc',
    '/usr/share/fonts/truetype/noto/NotoSansCJK-Regular.ttc',
)


def generate_wordcloud(keywords, stock_name, output_dir='output', font_path=None):
    """Render *keywords* as a word-cloud PNG named after *stock_name*.

    Args:
        keywords: Iterable of dicts with the keys ``word`` and ``tfidf``;
            ``tfidf`` is used as the word's weight. Entries whose weight is
            not positive are ignored.
        stock_name: Used to build the file name ``wordcloud_<name>.png``.
            Characters that are illegal in Windows file names (for example
            the ``*`` in ``*ST`` stock names) are replaced with ``_``.
        output_dir: Directory for the image; created when missing.
        font_path: Font file to render with. When ``None``, the first
            existing entry of ``_CJK_FONT_CANDIDATES`` is used.

    Returns:
        Path of the written image, or ``None`` when there is no keyword with
        a positive weight to draw.
    """
    os.makedirs(output_dir, exist_ok=True)

    word_freq = {k['word']: k['tfidf'] for k in keywords if k['tfidf'] > 0}
    if not word_freq:
        # WordCloud cannot draw from an empty frequency table.
        print(f'没有可用于生成词云的关键词: {stock_name}')
        return None

    if font_path is None:
        font_path = next(
            (path for path in _CJK_FONT_CANDIDATES if os.path.exists(path)),
            None,
        )
        if font_path is None:
            # font_path=None makes WordCloud fall back to its bundled font.
            # NOTE(review): that font presumably lacks Chinese glyphs -- confirm.
            print('未找到中文字体,词云中的中文可能无法正常显示')

    wc = WordCloud(
        font_path=font_path,
        width=800,
        height=600,
        background_color='white',
        max_words=100,
    )
    wc.generate_from_frequencies(word_freq)

    safe_name = re.sub(r'[\\/:*?"<>|]', '_', str(stock_name))
    output_path = os.path.join(output_dir, f'wordcloud_{safe_name}.png')
    wc.to_file(output_path)
    print(f'词云已保存到: {output_path}')
    return output_path
def analyze_all(data_dir='data', output_dir='output'):
    """Run the complete analysis pipeline and write all result files.

    Steps: load the posts, compute and save corpus-wide keywords plus a word
    cloud, compute and save per-stock keywords plus word clouds, write a
    summary report, and dump the pre-processed posts.

    Files written into *output_dir*: ``overall_keywords.csv``,
    ``keywords_<stock name>.csv`` for each stock that yields keywords,
    ``summary_report.csv``, ``all_posts.csv``, and the word-cloud images
    produced by generate_wordcloud().

    Args:
        data_dir: Directory passed to load_data().
        output_dir: Directory that receives every output file; created when
            missing.

    Returns:
        None. Returns early, after printing a notice, when no posts load.
    """
    banner = '=' * 60
    print(banner)
    print('股吧数据 TF-IDF 分析')
    print(banner)

    os.makedirs(output_dir, exist_ok=True)

    # [1/5] Load the data.
    print('\n[1/5] 加载数据...')
    all_data, stock_info = load_data(data_dir)

    if not all_data:
        print('没有找到数据,请先运行爬虫')
        return

    print(f' 共加载 {len(all_data)} 条帖子')
    print(f' 涉及 {len(stock_info)} 只股票:')
    for code, info in stock_info.items():
        print(f' - {info["name"]} ({code}): {info["post_count"]}')

    # [2/5] Corpus-wide keywords.
    print('\n[2/5] 整体关键词分析...')
    all_texts = [d['clean_text'] for d in all_data]
    tfidf_matrix, feature_names, _ = calculate_tfidf(all_texts)
    overall_keywords = get_top_keywords(tfidf_matrix, feature_names, top_n=30)

    print('\n 整体Top 20关键词:')
    for i, kw in enumerate(overall_keywords[:20], 1):
        print(f' {i:2d}. {kw["word"]:10s} (TF-IDF: {kw["tfidf"]:.4f})')

    pd.DataFrame(overall_keywords).to_csv(
        os.path.join(output_dir, 'overall_keywords.csv'),
        index=False,
        encoding='utf-8-sig',  # BOM-prefixed UTF-8
    )
    generate_wordcloud(overall_keywords, 'overall', output_dir=output_dir)

    # [3/5] Per-stock keywords.
    print('\n[3/5] 各股票关键词分析...')
    stock_keywords = {}

    for stock_code, info in stock_info.items():
        stock_name = info['name']
        print(f'\n 分析 {stock_name} ({stock_code})...')

        keywords = get_stock_specific_keywords(all_data, stock_code, top_n=20)
        stock_keywords[stock_code] = keywords
        if not keywords:
            continue

        print(' Top 10关键词:')
        for i, kw in enumerate(keywords[:10], 1):
            print(f' {i:2d}. {kw["word"]:10s} (TF-IDF: {kw["tfidf"]:.4f}, 差值: {kw["diff"]:.4f})')

        # Stock names can contain characters that are illegal in Windows
        # file names (e.g. the '*' of '*ST' stocks); replace them up front.
        safe_name = re.sub(r'[\\/:*?"<>|]', '_', str(stock_name))

        generate_wordcloud(keywords, safe_name, output_dir=output_dir)
        pd.DataFrame(keywords).to_csv(
            os.path.join(output_dir, f'keywords_{safe_name}.csv'),
            index=False,
            encoding='utf-8-sig',
        )

    # [4/5] Summary report: one row per stock with its five top keywords.
    print('\n[4/5] 生成汇总报告...')
    report_data = [
        {
            '股票代码': stock_code,
            '股票名称': stock_info[stock_code]['name'],
            '帖子数量': stock_info[stock_code]['post_count'],
            'Top5关键词': ', '.join(k['word'] for k in keywords[:5]),
        }
        for stock_code, keywords in stock_keywords.items()
    ]
    pd.DataFrame(report_data).to_csv(
        os.path.join(output_dir, 'summary_report.csv'),
        index=False,
        encoding='utf-8-sig',
    )
    print(f' 汇总报告已保存到: {output_dir}/summary_report.csv')

    # [5/5] Dump the pre-processed posts.
    print('\n[5/5] 保存预处理数据...')
    pd.DataFrame(all_data).to_csv(
        os.path.join(output_dir, 'all_posts.csv'),
        index=False,
        encoding='utf-8-sig',
    )
    print(f' 所有帖子已保存到: {output_dir}/all_posts.csv')

    print('\n' + banner)
    print(f'分析完成!结果保存在 {output_dir}/ 目录中')
    print(banner)
# Run the full pipeline only when executed as a script, not when imported.
if __name__ == '__main__':
    analyze_all()