Files
guba2vec/sentiment_time_series.py
zzy5111398 0098977172 完成股吧数据分析项目:
1. 修复词云断句问题 - 添加英文单词过滤
2. 创建 Word2Vec + CNN 情绪感知模型
3. 创建情绪时间序列分析脚本(基于大连理工大学情感词典)
4. 添加停用词文件(1427个中英文停用词)
5. 更新 analyze.py 保存时间字段 post_publish_time
6. 更新 requirements.txt 添加必要依赖
2026-05-28 15:30:16 +08:00

297 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import json
import re
import numpy as np
import pandas as pd
from datetime import datetime
import matplotlib.pyplot as plt
import matplotlib
# Select the non-interactive Agg backend: this script only saves figures to
# files and never opens a window.
matplotlib.use('Agg')
import jieba
# Fonts able to render the Chinese titles and labels used in the charts;
# matplotlib tries the families in the order listed.
plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'SimSun', 'Arial Unicode MS']
# Draw negative tick labels with the ASCII hyphen-minus instead of U+2212
# (presumably because the fonts above may lack that glyph - TODO confirm).
plt.rcParams['axes.unicode_minus'] = False
# Stop-word loading
def load_stopwords(filepath='stopwords.txt'):
    """Read a stop-word list from a UTF-8 text file with one word per line.

    Args:
        filepath: Path of the stop-word file.

    Returns:
        set: The stripped, non-empty lines of the file. An empty set is
        returned when the file does not exist.
    """
    if not os.path.exists(filepath):
        return set()
    with open(filepath, 'r', encoding='utf-8') as handle:
        stripped_lines = (raw.strip() for raw in handle)
        return {token for token in stripped_lines if token}
# Module-level stop-word set, loaded once at import time from the default
# 'stopwords.txt' path (an empty set when that file is absent).
STOPWORDS = load_stopwords()
# ============================================================
# Build the sentiment lexicon (modelled on sentiment_analysis.py)
# ============================================================
def build_sentiment_dictionary():
    """Build the sentiment lexicon from the DUT Chinese affective lexicon workbook.

    Reads '大连理工大学中文情感词汇本体.xlsx' from the current directory, sorts
    every word into one of seven emotion buckets according to its
    '情感分类' code, appends a few stock-market terms to the 'Good' and
    'Disgust' buckets, and derives the combined 'Positive' / 'Negative'
    lists.

    Returns:
        dict: Lists of words under the keys 'Happy', 'Good', 'Surprise',
        'Anger', 'Sad', 'Fear', 'Disgust', 'Positive' and 'Negative'.
        If anything goes wrong while loading the workbook (file missing,
        Excel engine unavailable, columns missing, ...) the error is
        printed and the result of ``build_simplified_dictionary()`` is
        returned instead.
    """
    dict_path = '大连理工大学中文情感词汇本体.xlsx'
    # Emotion bucket -> category codes found in the '情感分类' column.
    category_codes = {
        'Happy': ('PA', 'PE'),
        'Good': ('PD', 'PH', 'PG', 'PB', 'PK'),
        'Surprise': ('PC',),
        'Anger': ('NA',),
        'Sad': ('NB', 'NJ', 'NH', 'PF'),
        'Fear': ('NI', 'NC', 'NG'),
        'Disgust': ('NE', 'ND', 'NN', 'NK', 'NL'),
    }
    try:
        # keep_default_na=False is essential: by default pandas parses the
        # text 'NA' as a missing value, which would turn the anger category
        # code into NaN and leave the 'Anger' bucket empty.
        df = pd.read_excel(dict_path, keep_default_na=False)
        # Selecting the expected columns doubles as a schema check; a missing
        # column raises KeyError and triggers the fallback below.
        df = df[['词语', '词性种类', '词义数', '词义序号', '情感分类', '强度', '极性']]

        # Compare codes as stripped text so stray whitespace does not hide a match.
        codes = df['情感分类'].astype(str).str.strip()
        lexicon = {
            bucket: df.loc[codes.isin(members), '词语'].tolist()
            for bucket, members in category_codes.items()
        }

        # Stock-market specific vocabulary.
        # NOTE(review): the empty-string entries are kept exactly as found;
        # presumably they were single-character words - TODO confirm.
        stock_positive = ['', '上涨', '暴涨', '拉升', '涨停', '盈利', '收益', '赚钱', '',
                          '利好', '增长', '上升', '增加', '发展', '进步', '提升', '改善', '突破',
                          '创新', '优势', '超预期', '亮眼', '惊艳', '奇迹']
        stock_negative = ['', '下跌', '暴跌', '跳水', '跌停', '亏损', '亏钱', '', '损失',
                          '套牢', '垃圾', '恶心', '坑爹', '骗局', '', '爆雷', '崩盘', '退市']
        lexicon['Good'].extend(stock_positive)
        lexicon['Disgust'].extend(stock_negative)

        positive = lexicon['Happy'] + lexicon['Good'] + lexicon['Surprise']
        negative = (lexicon['Anger'] + lexicon['Sad']
                    + lexicon['Fear'] + lexicon['Disgust'])
        lexicon['Positive'] = positive
        lexicon['Negative'] = negative

        print('大连理工大学情感词典加载完成')
        print(f' 正面情感词: {len(positive)}')
        print(f' 负面情感词: {len(negative)}')
        return lexicon
    except Exception as e:
        # Deliberate best-effort: any failure falls back to the built-in lexicon.
        print(f'加载大连理工大学情感词典失败: {e}')
        print('使用简化版情感词典')
        return build_simplified_dictionary()
def build_simplified_dictionary():
    """Return a small hard-coded Chinese sentiment lexicon (fallback).

    Prints the sizes of the combined positive and negative word lists.

    Returns:
        dict: Lists of words under the keys 'Happy', 'Good', 'Surprise',
        'Anger', 'Sad', 'Fear', 'Disgust', plus 'Positive'
        (Happy + Good + Surprise) and 'Negative'
        (Anger + Sad + Fear + Disgust).
    """
    # NOTE(review): the empty-string entries are reproduced exactly as found;
    # presumably they were single-character words - TODO confirm.
    lexicon = {
        'Happy': [
            '开心', '快乐', '高兴', '喜悦', '愉快', '欣喜', '欢乐', '欢喜', '幸福',
            '满意', '满足', '欣慰', '愉悦', '畅快', '乐观', '积极', '美好', '成功',
        ],
        'Good': [
            '', '优秀', '出色', '精彩', '卓越', '杰出', '优良', '良好', '完美', '不错',
            '', '上涨', '暴涨', '拉升', '涨停', '盈利', '收益', '赚钱', '', '利好',
            '增长', '上升', '增加', '发展', '进步', '提升', '改善', '突破', '创新', '优势',
        ],
        'Surprise': [
            '惊喜', '意外', '震惊', '惊讶', '震撼', '神奇', '奇迹', '惊艳', '亮眼', '超预期',
        ],
        'Anger': [
            '愤怒', '生气', '恼火', '气愤', '暴怒', '愤慨', '愤恨', '震怒', '发怒',
            '', '垃圾', '恶心', '坑爹', '骗局', '欺骗', '欺诈', '造假', '腐败', '黑暗',
        ],
        'Sad': [
            '伤心', '难过', '悲伤', '痛苦', '悲哀', '沮丧', '失望', '绝望', '低落', '悲观',
            '', '下跌', '暴跌', '跳水', '跌停', '亏损', '亏钱', '', '损失', '套牢',
        ],
        'Fear': [
            '害怕', '恐惧', '担心', '担忧', '恐慌', '不安', '焦虑', '忧虑', '紧张', '恐怖',
            '风险', '危机', '危险', '下跌', '暴跌', '崩盘', '退市', '爆雷', '', '',
        ],
        'Disgust': [
            '厌恶', '恶心', '反感', '讨厌', '鄙视', '唾弃', '不屑', '蔑视', '嫌弃',
            '垃圾', '废物', '不行', '差劲', '', '', '', '骗局',
        ],
    }

    # Combined polarity lists are fresh lists built by concatenating the buckets.
    positive_buckets = ('Happy', 'Good', 'Surprise')
    negative_buckets = ('Anger', 'Sad', 'Fear', 'Disgust')
    lexicon['Positive'] = [word for name in positive_buckets for word in lexicon[name]]
    lexicon['Negative'] = [word for name in negative_buckets for word in lexicon[name]]

    print('简化版情感词典构建完成')
    print(' 正面情感词: {}'.format(len(lexicon['Positive'])))
    print(' 负面情感词: {}'.format(len(lexicon['Negative'])))
    return lexicon
# ============================================================
# Sentiment scoring (modelled on sentiment_analysis.py)
# ============================================================
def emotion_caculate(text, sentiment_dict, *, tokenizer=None, stopwords=None):
    """Score one text as (number of positive tokens) - (number of negative tokens).

    Args:
        text: The text to score. Anything that is not a non-empty ``str``
            (``None``, ``NaN``, ``''``, numbers, ...) is scored as neutral.
        sentiment_dict: Mapping with 'Positive' and 'Negative' word
            containers. Any container supporting ``in`` works; sets make
            every lookup O(1).
        tokenizer: Optional callable turning the text into an iterable of
            tokens. Defaults to ``jieba.lcut``.
        stopwords: Optional container of tokens to ignore. Defaults to the
            module-level ``STOPWORDS``.

    Returns:
        int: Positive hits minus negative hits; 0 for missing text.
    """
    # Missing values from pandas arrive as None/NaN; treat every non-text
    # value as neutral instead of letting the tokenizer fail on it.
    if not isinstance(text, str) or not text:
        return 0

    tokenize = jieba.lcut if tokenizer is None else tokenizer
    ignored = STOPWORDS if stopwords is None else stopwords
    positive_words = sentiment_dict['Positive']
    negative_words = sentiment_dict['Negative']

    positive = 0
    negative = 0
    for word in tokenize(text):
        # Skip stop words and single-character tokens.
        if len(word) <= 1 or word in ignored:
            continue
        # Every occurrence counts exactly once. Adding the token's total
        # frequency on each occurrence would weight a word repeated k times
        # by k * k and make scoring quadratic in the text length.
        if word in positive_words:
            positive += 1
        if word in negative_words:
            negative += 1
    return positive - negative
# ============================================================
# Time-series analysis
# ============================================================
def _safe_filename(name):
    """Return *name* as text with characters that are illegal in file names replaced by '_'."""
    return re.sub(r'[\\/:*?"<>|]', '_', str(name))


def _score_breakdown(scores):
    """Return (mean, positive count, negative count, neutral count) of a score Series."""
    return (
        scores.mean(),
        (scores > 0).sum(),
        (scores < 0).sum(),
        (scores == 0).sum(),
    )


def _plot_daily_sentiment(daily_sentiment, stock_name, stock_code, plot_path):
    """Save a two-panel chart: daily mean sentiment with a 3-day moving average, and daily post count."""
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8), sharex=True)
    try:
        # Upper panel: sentiment trend.
        ax1.plot(daily_sentiment['date'], daily_sentiment['avg_sentiment'],
                 marker='o', linestyle='-', color='b', label='日均情绪')
        moving_average = daily_sentiment['avg_sentiment'].rolling(window=3).mean()
        ax1.plot(daily_sentiment['date'], moving_average,
                 marker='', linestyle='--', color='r', label='3日移动平均')
        ax1.set_title(f'{stock_name} ({stock_code}) 情绪时间序列趋势', fontsize=14)
        ax1.set_ylabel('情绪分数', fontsize=12)
        ax1.axhline(y=0, color='gray', linestyle='-', linewidth=0.5)
        ax1.grid(True)
        ax1.legend()

        # Lower panel: posting volume.
        ax2.bar(daily_sentiment['date'], daily_sentiment['post_count'], color='g', alpha=0.7)
        ax2.set_xlabel('日期', fontsize=12)
        ax2.set_ylabel('发帖数量', fontsize=12)
        ax2.grid(True)

        plt.xticks(rotation=45)
        fig.tight_layout()
        fig.savefig(plot_path, dpi=100)
    finally:
        # Always release the figure, even if drawing or saving failed, so
        # figures do not accumulate across stocks.
        plt.close(fig)


def analyze_sentiment_trend():
    """Run the lexicon-based sentiment time-series analysis end to end.

    Steps:
        1. Build the sentiment lexicon.
        2. Load 'output/all_posts.csv'.
        3. Parse 'post_publish_time' into timestamps and dates, dropping
           rows whose time cannot be parsed.
        4. Score every 'clean_text' and write all rows to
           'output/sentiment_analysis_result.csv'.
        5. For each stock with at least two distinct dates, save a trend
           chart under 'output/plots/' and print its statistics.

    Finally a per-stock summary is written to 'output/sentiment_summary.csv'.

    Returns:
        None. If the input has no 'post_publish_time' column a warning is
        printed and the function returns early without writing anything.
    """
    print("="*60)
    print("情绪时间序列分析(基于情感词典)")
    print("="*60)

    # Build the lexicon.
    print("\n[1/5] 构建情感词典...")
    sentiment_dict = build_sentiment_dictionary()
    # The scorer only does membership tests; hashing the (potentially very
    # large) word lists once avoids a linear scan per token per post.
    lookup = {
        'Positive': frozenset(sentiment_dict['Positive']),
        'Negative': frozenset(sentiment_dict['Negative']),
    }

    # Load the posts. Stock codes are read as text so that leading zeros
    # (e.g. '000001') are not lost to integer parsing.
    print("\n[2/5] 加载数据...")
    df = pd.read_csv('output/all_posts.csv', encoding='utf-8-sig',
                     dtype={'stock_code': str})

    # The time column is required for everything that follows.
    if 'post_publish_time' not in df.columns:
        print("警告:数据中没有 post_publish_time 字段,请先运行 analyze.py")
        return

    # Parse timestamps; unparseable values become NaT and are dropped.
    print("\n[3/5] 转换时间戳...")
    df['timestamp'] = pd.to_datetime(df['post_publish_time'], errors='coerce')
    df = df.dropna(subset=['timestamp'])
    df['date'] = df['timestamp'].dt.date

    # Score every post.
    print("\n[4/5] 计算情绪得分...")
    df['sentiment_score'] = df['clean_text'].apply(
        lambda x: emotion_caculate(x, lookup)
    )

    df.to_csv('output/sentiment_analysis_result.csv', index=False, encoding='utf-8-sig')
    print(" 情绪分析结果已保存到: output/sentiment_analysis_result.csv")

    os.makedirs('output/plots', exist_ok=True)

    print("\n[5/5] 生成时间序列图表...")
    summary_data = []
    for stock_code, group in df.groupby('stock_code'):
        stock_name = group['stock_name'].iloc[0]
        print(f"\n 分析 {stock_name} ({stock_code})...")

        # Every stock goes into the summary, including those skipped below.
        avg_sentiment, pos_count, neg_count, neu_count = _score_breakdown(
            group['sentiment_score']
        )
        summary_data.append({
            '股票代码': stock_code,
            '股票名称': stock_name,
            '帖子数量': len(group),
            '平均情绪': round(avg_sentiment, 4),
            '正面帖子': pos_count,
            '负面帖子': neg_count,
            '中性帖子': neu_count
        })

        # Daily mean sentiment and post volume.
        daily_sentiment = group.groupby('date')['sentiment_score'].agg(['mean', 'count']).reset_index()
        daily_sentiment.columns = ['date', 'avg_sentiment', 'post_count']

        # A trend needs at least two dates.
        if len(daily_sentiment) < 2:
            print(" 数据不足,跳过")
            continue

        # Stock names such as '*ST...' contain characters that are not
        # allowed in file names on every platform, so sanitise the name.
        plot_path = f'output/plots/sentiment_trend_{_safe_filename(stock_name)}.png'
        _plot_daily_sentiment(daily_sentiment, stock_name, stock_code, plot_path)
        print(f" 图表已保存到: {plot_path}")

        print(f" 平均情绪: {avg_sentiment:.4f}")
        print(f" 正面帖子: {pos_count}, 负面帖子: {neg_count}, 中性帖子: {neu_count}")

    # Per-stock summary report.
    print("\n生成汇总报告...")
    summary_df = pd.DataFrame(summary_data)
    summary_df.to_csv('output/sentiment_summary.csv', index=False, encoding='utf-8-sig')
    print("汇总报告已保存到: output/sentiment_summary.csv")

    print("\n" + "="*60)
    print("情绪时间序列分析完成!")
    print("="*60)
# Run the full analysis when this file is executed as a script.
if __name__ == '__main__':
    analyze_sentiment_trend()