"""Dictionary-based sentiment time-series analysis for stock forum posts.

Reads ``output/all_posts.csv``, scores every post with a Chinese sentiment
lexicon, and writes a per-post result CSV, one trend chart per stock and a
per-stock summary CSV under ``output/``.
"""
import json
import os
import re
from collections import Counter
from datetime import datetime

import matplotlib

# Select the non-interactive backend before pyplot is imported so that no
# GUI toolkit is needed to render and save figures.
matplotlib.use('Agg')

import jieba
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

# Fonts tried in order for rendering the Chinese labels used in the charts.
plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'SimSun', 'Arial Unicode MS']
# Keep the minus sign renderable when a CJK font is active.
plt.rcParams['axes.unicode_minus'] = False


def load_stopwords(filepath='stopwords.txt'):
    """Load stop words from a UTF-8 text file containing one word per line.

    Args:
        filepath: Path of the stop-word file.

    Returns:
        A set with every non-empty, whitespace-stripped line of the file,
        or an empty set when the file does not exist.
    """
    if not os.path.exists(filepath):
        return set()
    with open(filepath, 'r', encoding='utf-8') as f:
        return {word for word in (line.strip() for line in f) if word}


STOPWORDS = load_stopwords()


# ============================================================
# Sentiment dictionary construction (modeled on sentiment_analysis.py)
# ============================================================

# Lexicon category codes (column '情感分类') grouped into the seven emotion
# buckets exposed by the sentiment dictionary.
_DUT_CATEGORY_CODES = {
    'Happy': ['PA', 'PE'],
    'Good': ['PD', 'PH', 'PG', 'PB', 'PK'],
    'Surprise': ['PC'],
    'Anger': ['NA'],
    'Sad': ['NB', 'NJ', 'NH', 'PF'],
    'Fear': ['NI', 'NC', 'NG'],
    'Disgust': ['NE', 'ND', 'NN', 'NK', 'NL'],
}


def _assemble_dictionary(title, happy, good, surprise, anger, sad, fear, disgust):
    """Combine the seven emotion word lists into the sentiment dictionary.

    Prints ``title`` followed by the positive/negative word counts and
    returns a dict holding the seven lists plus the aggregated 'Positive'
    (Happy + Good + Surprise) and 'Negative' (Anger + Sad + Fear + Disgust)
    lists.
    """
    positive = happy + good + surprise
    negative = anger + sad + fear + disgust

    print(title)
    print(f' 正面情感词: {len(positive)}个')
    print(f' 负面情感词: {len(negative)}个')

    return {
        'Happy': happy,
        'Good': good,
        'Surprise': surprise,
        'Anger': anger,
        'Sad': sad,
        'Fear': fear,
        'Disgust': disgust,
        'Positive': positive,
        'Negative': negative,
    }


def build_sentiment_dictionary():
    """Build the sentiment dictionary from the DUT Chinese emotion lexicon.

    The lexicon is read from '大连理工大学中文情感词汇本体.xlsx' in the current
    working directory and extended with stock-market vocabulary. If anything
    goes wrong while loading it (missing file, missing column, ...), the
    error is printed and the built-in simplified dictionary is used instead.

    Returns:
        A dict with the keys 'Happy', 'Good', 'Surprise', 'Anger', 'Sad',
        'Fear', 'Disgust', 'Positive' and 'Negative', each mapping to a list
        of words.
    """
    dict_path = '大连理工大学中文情感词汇本体.xlsx'

    try:
        df = pd.read_excel(dict_path)
        # Selecting the full expected column set makes a file with a
        # different layout fail here and fall back to the simplified
        # dictionary.
        df = df[['词语', '词性种类', '词义数', '词义序号', '情感分类', '强度', '极性']]

        buckets = {
            name: df.loc[df['情感分类'].isin(codes), '词语'].tolist()
            for name, codes in _DUT_CATEGORY_CODES.items()
        }

        # Stock-market vocabulary added on top of the lexicon.
        stock_positive = ['涨', '上涨', '暴涨', '拉升', '涨停', '盈利', '收益', '赚钱', '赚', '利好',
                          '增长', '上升', '增加', '发展', '进步', '提升', '改善', '突破', '创新', '优势',
                          '超预期', '亮眼', '惊艳', '奇迹']
        stock_negative = ['跌', '下跌', '暴跌', '跳水', '跌停', '亏损', '亏钱', '赔', '损失', '套牢',
                          '垃圾', '恶心', '坑爹', '骗局', '雷', '爆雷', '崩盘', '退市']
        buckets['Good'].extend(stock_positive)
        buckets['Disgust'].extend(stock_negative)

        return _assemble_dictionary(
            '大连理工大学情感词典加载完成',
            buckets['Happy'], buckets['Good'], buckets['Surprise'],
            buckets['Anger'], buckets['Sad'], buckets['Fear'], buckets['Disgust'],
        )
    except Exception as e:
        # Deliberate best-effort: any loading problem degrades to the
        # built-in dictionary instead of aborting the analysis.
        print(f'加载大连理工大学情感词典失败: {e}')
        print('使用简化版情感词典')
        return build_simplified_dictionary()


def build_simplified_dictionary():
    """Build the small built-in Chinese sentiment dictionary (fallback).

    Returns:
        A dict with the same keys as ``build_sentiment_dictionary``.
    """
    happy = ['开心', '快乐', '高兴', '喜悦', '愉快', '欣喜', '欢乐', '欢喜', '幸福', '满意',
             '满足', '欣慰', '愉悦', '畅快', '乐观', '积极', '美好', '成功']
    good = ['好', '优秀', '出色', '精彩', '卓越', '杰出', '优良', '良好', '完美', '不错',
            '涨', '上涨', '暴涨', '拉升', '涨停', '盈利', '收益', '赚钱', '赚', '利好',
            '增长', '上升', '增加', '发展', '进步', '提升', '改善', '突破', '创新', '优势']
    surprise = ['惊喜', '意外', '震惊', '惊讶', '震撼', '神奇', '奇迹', '惊艳', '亮眼', '超预期']
    anger = ['愤怒', '生气', '恼火', '气愤', '暴怒', '愤慨', '愤恨', '震怒', '发怒', '骂',
             '垃圾', '恶心', '坑爹', '骗局', '欺骗', '欺诈', '造假', '腐败', '黑暗']
    sad = ['伤心', '难过', '悲伤', '痛苦', '悲哀', '沮丧', '失望', '绝望', '低落', '悲观',
           '跌', '下跌', '暴跌', '跳水', '跌停', '亏损', '亏钱', '赔', '损失', '套牢']
    fear = ['害怕', '恐惧', '担心', '担忧', '恐慌', '不安', '焦虑', '忧虑', '紧张', '恐怖',
            '风险', '危机', '危险', '下跌', '暴跌', '崩盘', '退市', '爆雷', '雷', '怕']
    disgust = ['厌恶', '恶心', '反感', '讨厌', '鄙视', '唾弃', '不屑', '蔑视', '嫌弃', '垃圾',
               '废物', '不行', '差劲', '差', '烂', '渣', '骗局']

    return _assemble_dictionary(
        '简化版情感词典构建完成',
        happy, good, surprise, anger, sad, fear, disgust,
    )


# ============================================================
# Sentiment scoring (modeled on sentiment_analysis.py)
# ============================================================
def emotion_caculate(text, sentiment_dict):
    """Compute the sentiment score of a single text.

    The text is tokenized with jieba; every occurrence of a token found in
    ``sentiment_dict['Positive']`` adds one and every occurrence of a token
    found in ``sentiment_dict['Negative']`` subtracts one. Stop words and
    tokens of length <= 1 are ignored.

    Args:
        text: The text to score. Empty, ``None`` and NaN values score 0;
            other non-string values are converted with ``str()``.
        sentiment_dict: Mapping with 'Positive' and 'Negative' entries, each
            any container supporting ``in`` (list or set).

    Returns:
        Number of positive occurrences minus number of negative occurrences.
    """
    if not text or pd.isna(text):
        return 0

    positive_words = sentiment_dict['Positive']
    negative_words = sentiment_dict['Negative']

    positive = 0
    negative = 0
    # Count each distinct token once. Looping over the raw token list while
    # adding list.count(word) would weight a word that occurs k times by k*k.
    for word, freq in Counter(jieba.lcut(str(text))).items():
        # NOTE(review): the length filter also discards single-character
        # dictionary entries such as '涨' or '跌', so they never contribute.
        if word in STOPWORDS or len(word) <= 1:
            continue
        if word in positive_words:
            positive += freq
        if word in negative_words:
            negative += freq

    return positive - negative


# ============================================================
# Time-series analysis
# ============================================================

# Columns of output/all_posts.csv that the analysis reads.
_REQUIRED_COLUMNS = ('post_publish_time', 'clean_text', 'stock_code', 'stock_name')


def _safe_filename(name):
    """Replace characters that are invalid in file names on common platforms."""
    return re.sub(r'[\\/:*?"<>|]', '_', str(name))


def _score_counts(scores):
    """Return (mean, positive count, negative count, neutral count) of scores."""
    return scores.mean(), (scores > 0).sum(), (scores < 0).sum(), (scores == 0).sum()


def _plot_daily_sentiment(daily_sentiment, stock_name, stock_code, plot_path):
    """Save a two-panel chart: daily mean sentiment with MA3, and post volume."""
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8), sharex=True)
    try:
        dates = daily_sentiment['date']

        # Sentiment trend
        ax1.plot(dates, daily_sentiment['avg_sentiment'],
                 marker='o', linestyle='-', color='b', label='日均情绪')

        # 3-day moving average (NaN, hence not drawn, for the first two days)
        moving_average = daily_sentiment['avg_sentiment'].rolling(window=3).mean()
        ax1.plot(dates, moving_average,
                 marker='', linestyle='--', color='r', label='3日移动平均')

        ax1.set_title(f'{stock_name} ({stock_code}) 情绪时间序列趋势', fontsize=14)
        ax1.set_ylabel('情绪分数', fontsize=12)
        ax1.axhline(y=0, color='gray', linestyle='-', linewidth=0.5)
        ax1.grid(True)
        ax1.legend()

        # Post volume
        ax2.bar(dates, daily_sentiment['post_count'], color='g', alpha=0.7)
        ax2.set_xlabel('日期', fontsize=12)
        ax2.set_ylabel('发帖数量', fontsize=12)
        ax2.grid(True)
        ax2.tick_params(axis='x', labelrotation=45)

        fig.tight_layout()
        fig.savefig(plot_path, dpi=100)
    finally:
        # Always release the figure, even when drawing or saving fails.
        plt.close(fig)


def analyze_sentiment_trend():
    """Analyze the sentiment time series of every stock with the lexicon.

    Reads 'output/all_posts.csv', scores the 'clean_text' column and writes:

    * 'output/sentiment_analysis_result.csv' - all posts with their score;
    * 'output/plots/sentiment_trend_<stock name>.png' - one chart per stock
      that has posts on at least two distinct dates;
    * 'output/sentiment_summary.csv' - per-stock post count, mean score and
      positive/negative/neutral post counts.

    Returns ``None``; returns early with a warning when a required column is
    missing from the input.
    """
    print("="*60)
    print("情绪时间序列分析(基于情感词典)")
    print("="*60)

    # Build the sentiment dictionary
    print("\n[1/5] 构建情感词典...")
    sentiment_dict = build_sentiment_dictionary()
    # Sets give O(1) membership tests; built once for all posts.
    lookup = {
        'Positive': set(sentiment_dict['Positive']),
        'Negative': set(sentiment_dict['Negative']),
    }

    # Load the data
    print("\n[2/5] 加载数据...")
    df = pd.read_csv('output/all_posts.csv', encoding='utf-8-sig')

    # Make sure every column used below is present
    missing_columns = [column for column in _REQUIRED_COLUMNS if column not in df.columns]
    if missing_columns:
        for column in missing_columns:
            print(f"警告:数据中没有 {column} 字段,请先运行 analyze.py")
        return

    # Convert timestamps; rows whose timestamp cannot be parsed are dropped
    print("\n[3/5] 转换时间戳...")
    df['timestamp'] = pd.to_datetime(df['post_publish_time'], errors='coerce')
    df = df.dropna(subset=['timestamp'])
    df['date'] = df['timestamp'].dt.date

    # Score every post
    print("\n[4/5] 计算情绪得分...")
    df['sentiment_score'] = df['clean_text'].apply(
        lambda x: emotion_caculate(x, lookup)
    )

    # Save the per-post result
    os.makedirs('output/plots', exist_ok=True)
    df.to_csv('output/sentiment_analysis_result.csv', index=False, encoding='utf-8-sig')
    print(" 情绪分析结果已保存到: output/sentiment_analysis_result.csv")

    print("\n[5/5] 生成时间序列图表...")
    summary_data = []
    for stock_code, group in df.groupby('stock_code'):
        stock_name = group['stock_name'].iloc[0]
        avg_sentiment, pos_count, neg_count, neu_count = _score_counts(group['sentiment_score'])

        # The summary covers every stock, including those too sparse to plot.
        summary_data.append({
            '股票代码': stock_code,
            '股票名称': stock_name,
            '帖子数量': len(group),
            '平均情绪': round(avg_sentiment, 4),
            '正面帖子': pos_count,
            '负面帖子': neg_count,
            '中性帖子': neu_count
        })

        print(f"\n 分析 {stock_name} ({stock_code})...")

        # Mean sentiment and number of posts per calendar date
        daily_sentiment = group.groupby('date')['sentiment_score'].agg(['mean', 'count']).reset_index()
        daily_sentiment.columns = ['date', 'avg_sentiment', 'post_count']

        if len(daily_sentiment) < 2:
            print(f" 数据不足,跳过")
            continue

        plot_path = f'output/plots/sentiment_trend_{_safe_filename(stock_name)}.png'
        _plot_daily_sentiment(daily_sentiment, stock_name, stock_code, plot_path)
        print(f" 图表已保存到: {plot_path}")

        # Per-stock statistics
        print(f" 平均情绪: {avg_sentiment:.4f}")
        print(f" 正面帖子: {pos_count}, 负面帖子: {neg_count}, 中性帖子: {neu_count}")

    # Write the summary report
    print("\n生成汇总报告...")
    summary_df = pd.DataFrame(summary_data)
    summary_df.to_csv('output/sentiment_summary.csv', index=False, encoding='utf-8-sig')
    print("汇总报告已保存到: output/sentiment_summary.csv")

    print("\n" + "="*60)
    print("情绪时间序列分析完成!")
    print("="*60)


if __name__ == '__main__':
    analyze_sentiment_trend()