新闻资讯软件下载(新闻个性化推荐系统源码之构建离线文章画像)

wufei123 发布于 2023-12-25 阅读(266)

文章画像由关键词和主题词组成,我们将每个词的 TF-IDF 权重和 TextRank 权重的乘积作为关键词权重,筛选出权重最高的 K 个词作为关键词,将 TextRank 权重最高的 K 个词与 TF-IDF 权重最高的 K 个词的共现词作为主题词。

首先,在 Hive 中创建文章数据库 article 及相关表,其中表 article_data 用于存储完整的文章信息,表 idf_keywords_values 用于存储关键词和索引信息,表 tfidf_keywords_values 用于存储关键词和 TF-IDF 权重信息,表 textrank_keywords_values 用于存储关键词和 TextRank 权重信息,表 article_profile 用于存储文章画像信息。

-- 创建文章数据库createdatabaseifnotexists article comment"artcile information" location /user/hive/warehouse/article.db/

; -- 创建文章信息表CREATETABLE article_data ( article_id BIGINTcomment"article_id", channel_id INT

comment"channel_id", channel_name STRINGcomment"channel_name", title STRINGcomment"title"

, contentSTRINGcomment"content", sentence STRINGcomment"sentence" ) COMMENT"toutiao news_channel"

LOCATION /user/hive/warehouse/article.db/article_data; -- 创建关键词索引信息表CREATETABLE idf_keywords_values ( keyword

STRINGcomment"article_id", idf DOUBLEcomment"idf", indexINTcomment"index" ); -- 创建关键词TF-IDF权重信息表

CREATETABLE tfidf_keywords_values ( article_id INTcomment"article_id", channel_id INTcomment"channel_id"

, keyword STRINGcomment"keyword", tfidf DOUBLEcomment"tfidf" ); -- 创建关键词TextRank权重信息表

CREATETABLE textrank_keywords_values ( article_id INTcomment"article_id", channel_id INTcomment

"channel_id", keyword STRINGcomment"keyword", textrank DOUBLEcomment"textrank" ); -- 创建文章画像信息表

CREATETABLE article_profile ( article_id INTcomment"article_id", channel_id INTcomment"channel_id"

, keyword mapcomment"keyword", topics arraycomment"topics" ); 计算文章完整信息为了计算文章画像,需要将文章信息表(news_article_basic)、文章内容表(news_article_content)及频道表(news_channel)进行合并,从而得到完整的文章信息,通常使用 Spark SQL 进行处理。

通过关联表 news_article_basic, news_article_content 和 news_channel 获得文章完整信息,包括 article_id, channel_id, channel_name, title, content,这里获取一个小时内的文章信息。

spark.sql("use toutiao") _now = datetime.today().replace(minute=0, second=0, microsecond=0) start = datetime.strftime(_now + timedelta(days=0, hours=-1, minutes=0), "

%Y-%m-%d %H:%M:%S") end = datetime.strftime(_now, "%Y-%m-%d %H:%M:%S") basic_content = spark.sql( "

select a.article_id, a.channel_id, a.title, b.content from news_article_basic a " "innerjoin

news_article_content b on a.article_id=b.article_id where a.review_time >= {}" "and a.review_time <

{}and a.status = 2".format(start, end)) basic_content.registerTempTable("temp_article") channel_basic_content = spark.sql( "

select t.*, n.channel_name from temp_article t leftjoin news_channel n on t.channel_id=n.channel_id")

channel_basic_content 结果如下所示

利用 concat_ws() 方法,将 channel_name, title, content 这 3 列数据合并为一列 sentence,并将结果写入文章完整信息表 article_data 中import

pyspark.sql.functions as F spark.sql("use article") sentence_df = channel_basic_content.select("article_id"

, "channel_id", "channel_name", "title", "content", \ F.concat_ws(

",", channel_basic_content.channel_name, channel_basic_content.title, channel_basic_content.content ).alias(

"sentence") ) del basic_content del channel_basic_content gc.collect()

# 垃圾回收 sentence_df.write.insertInto("article_data") sentence_df 结果如下所示,文章完整信息包括 article_id, channel_id, channel_name, title, content, sentence,其中 sentence 为 channel_name, title, content 合并而成的长文本内容

计算 TF-IDF前面我们得到了文章的完整内容信息,接下来,我们要先对文章进行分词,然后计算每个词的 TF-IDF 权重,将 TF-IDF 权重最高的 K 个词作为文章的关键词首先,读取文章信息spark.sql("。

use article") article_dataframe = spark.sql("select * from article_data") 利用 mapPartitions() 方法,对每篇文章进行分词,这里使用的是

jieba 分词器words_df = article_dataframe.rdd.mapPartitions(segmentation).toDF(["article_id", "channel_id"

, "words"]) defsegmentation(partition):import os import re import jieba import jieba.analyse

import jieba.posseg as pseg import codecs abspath = "/root/words"# 结巴加载用户词典 userdict_path = os.path.join(abspath,

"ITKeywords.txt") jieba.load_userdict(userdict_path) # 停用词文本 stopwords_path = os.path.join(abspath,

"stopwords.txt") defget_stopwords_list():"""返回stopwords列表""" stopwords_list = [i.strip()

for i in codecs.open(stopwords_path).readlines()] return stopwords_list # 所有的停用词列表 stopwords_list = get_stopwords_list()

# 分词defcut_sentence(sentence):"""对切割之后的词语进行过滤,去除停用词,保留名词,英文和自定义词库中的词,长度大于2的词""" seg_list = pseg.lcut(sentence) seg_list = [i

for i in seg_list if i.flag notin stopwords_list] filtered_words_list = [] for seg in

seg_list: if len(seg.word) <= 1: continueelif seg.flag == "eng":

if len(seg.word) <= 2: continueelse: filtered_words_list.append(seg.word)

elif seg.flag.startswith("n"): filtered_words_list.append(seg.word) elif seg.flag

in ["x", "eng"]: # 是自定一个词语或者是英文单词 filtered_words_list.append(seg.word) return

filtered_words_list for row in partition: sentence = re.sub("", "", row.sentence)

# 替换掉标签数据 words = cut_sentence(sentence) yield row.article_id, row.channel_id, words

words_df 结果如下所示,words 为将 sentence 分词后的单词列表

使用分词结果对词频统计模型(CountVectorizer)进行词频统计训练,并将 CountVectorizer 模型保存到 HDFS 中from pyspark.ml.feature import CountVectorizer

# vocabSize是总词汇的大小,minDF是文本中出现的最少次数 cv = CountVectorizer(inputCol="words", outputCol="countFeatures", vocabSize=

200*10000, minDF=1.0) # 训练词频统计模型 cv_model = cv.fit(words_df) cv_model.write().overwrite().save("hdfs://hadoop-master:9000/headlines/models/CV.model"

) 加载 CountVectorizer 模型,计算词频向量from pyspark.ml.feature import CountVectorizerModel cv_model = CountVectorizerModel.load(

"hdfs://hadoop-master:9000/headlines/models/CV.model") # 得出词频向量结果 cv_result = cv_model.transform(words_df)

cv_result 结果如下所示,countFeatures 为词频向量,如 (986, [2, 4, ...], [3.0, 5.0, ...]) 表示总词汇的大小为 986 个,索引为 2 和 4 的词在某篇文章中分别出现 3 次和 5 次,

得到词频向量后,再利用逆文本频率模型( IDF ),根据词频向量进行 IDF 统计训练,并将 IDF 模型保存到 HDFSfrom pyspark.ml.feature import IDF idf = IDF(inputCol=

"countFeatures", outputCol="idfFeatures") idf_model = idf.fit(cv_result) idf_model.write().overwrite().save(

"hdfs://hadoop-master:9000/headlines/models/IDF.model") 我们已经分别计算了文章信息中每个词的 TF 和 IDF,这时就可以加载 CountVectorizer 模型和 IDF 模型,计算每个词的 TF-IDF

from pyspark.ml.feature import CountVectorizerModel cv_model = CountVectorizerModel.load("hdfs://hadoop-master:9000/headlines/models/countVectorizerOfArticleWords.model"

) from pyspark.ml.feature import IDFModel idf_model = IDFModel.load("hdfs://hadoop-master:9000/headlines/models/IDFOfArticleWords.model"

) cv_result = cv_model.transform(words_df) tfidf_result = idf_model.transform(cv_result) tfidf_result

结果如下所示,idfFeatures 为 TF-IDF 权重向量,如 (986, [2, 4, ...], [0.3, 0.5, ...]) 表示总词汇的大小为 986 个,索引为 2 和 4 的词在某篇文章中的 TF-IDF 值分别为 0.3 和 0.5

对文章的每个词都根据 TF-IDF 权重排序,保留 TF-IDF 权重最高的前 K 个词作为关键词def sort_by_tfidf(partition): TOPK = 20 for row in partition:

# 找到索引与IDF值并进行排序 _dict = list(zip(row.idfFeatures.indices, row.idfFeatures.values)) _dict = sorted(_dict,

key=lambda x: x[1], reverse=True) result = _dict[:TOPK] for word_index, tfidf inresult

: yield row.article_id, row.channel_id, int(word_index), round(float(tfidf), 4) keywords_by_tfidf = tfidf_result.rdd.mapPartitions(sort_by_tfidf).toDF([

"article_id", "channel_id", "index", "weights"]) keywords_by_tfidf 结果如下所示,每篇文章保留了权重最高的 K 个单词,index 为单词索引,weights 为对应单词的 TF-IDF 权重

接下来,我们需要知道每个词的对应的 TF-IDF 值,可以利用 zip() 方法,将所有文章中的每个词及其 TF-IDF 权重组成字典,再加入索引列,由此得到每个词对应的 TF-IDF 值,将该结果保存到 idf_keywords_values 表

keywords_list_with_idf = list(zip(cv_model.vocabulary, idf_model.idf.toArray())) def append_index(data):

forindex in range(len(data)): data[index] = list(data[index]) # 将元组转为list data[index].append(

index) # 加入索引 data[index][1] = float(data[index][1]) append_index(keywords_list_with_idf) sc = spark.sparkContext rdd = sc.parallelize(keywords_list_with_idf)

# 创建rdd idf_keywords = rdd.toDF(["keywords", "idf", "index"]) idf_keywords.write.insertInto(idf_keywords_values

) idf_keywords 结果如下所示,包含了所有单词的名称、TF-IDF 权重及索引

通过 index 列,将 keywords_by_tfidf 与表 idf_keywords_values 进行连接,选取文章 ID、频道 ID、关键词、TF-IDF 权重作为结果,并保存到 TF-IDF 关键词表 tfidf_keywords_values

keywords_index = spark.sql("select keyword, index idx from idf_keywords_values") keywords_result = keywords_by_tfidf.

join(keywords_index, keywords_index.idx == keywords_by_tfidf.index).select(["article_id", "channel_id"

, "keyword", "weights"]) keywords_result.write.insertInto("tfidf_keywords_values") keywords_result 结果如下所示,keyword 和 weights 即为所有词在每个文章中的 TF-IDF 权重

计算 TextRank前面我们已经计算好了每个词的 TF-IDF 权重,为了计算关键词,还需要得到每个词的 TextRank 权重,接下来,还是先读取文章完整信息spark.sql("use article

") article_dataframe = spark.sql("select * from article_data") 对文章 sentence 列的内容进行分词,计算每个词的 TextRank 权重,并将每篇文章 TextRank 权重最高的 K 个词保存到 TextRank 结果表 textrank_keywords_values

textrank_keywords_df = article_dataframe.rdd.mapPartitions(textrank).toDF(["article_id", "channel_id"

, "keyword", "textrank"]) textrank_keywords_df.write.insertInto("textrank_keywords_values") TextRank 计算细节:分词后只保留指定词性的词,滑动截取长度为 K 的窗口,计算窗口内的各个词的投票数

deftextrank(partition):import os import jieba import jieba.analyse import jieba.posseg as

pseg import codecs abspath = "/root/words"# 结巴加载用户词典 userDict_path = os.path.join(abspath,

"ITKeywords.txt") jieba.load_userdict(userDict_path) # 停用词文本 stopwords_path = os.path.join(abspath,

"stopwords.txt") defget_stopwords_list():"""返回stopwords列表""" stopwords_list = [i.strip()

for i in codecs.open(stopwords_path).readlines()] return stopwords_list # 所有的停用词列表 stopwords_list = get_stopwords_list()

classTextRank(jieba.analyse.TextRank):def__init__(self, window=20, word_min_len=2): super(TextRank, self).__init__() self.span = window

# 窗口大小 self.word_min_len = word_min_len # 单词的最小长度# 要保留的词性,根据jieba github ,具体参见https://github.com/baidu/lac

self.pos_filt = frozenset( (n, x, eng, f, s, t, nr, ns, nt, "nw", "nz",

"PER", "LOC", "ORG")) defpairfilter(self, wp):"""过滤条件,返回True或者False"""if wp.flag == "eng":

if len(wp.word) = self.word_min_len \

and wp.word.lower() notin stopwords_list: returnTrue# TextRank过滤窗口大小为5,单词最小为2 textrank_model = TextRank(window=

5, word_min_len=2) allowPOS = (n, "x", eng, nr, ns, nt, "nw", "nz", "c") for row in partition: tags = textrank_model.textrank(row.sentence, topK=

20, withWeight=True, allowPOS=allowPOS, withFlag=False) for tag in tags: yield row.article_id, row.channel_id, tag[

0], tag[1] textrank_keywords_df 结果如下所示,keyword 和 textrank 即为每个单词在文章中的 TextRank 权重

画像计算我们计算出 TF-IDF 和 TextRank 后,就可以计算关键词和主题词了,读取 TF-IDF 权重idf_keywords_values = oa.spark.sql("select * from idf_keywords_values"

) 读取 TextRank 权重textrank_keywords_values = oa.spark.sql("select * from textrank_keywords_values") 通过

keyword 关联 TF-IDF 权重和 TextRank 权重keywords_res = textrank_keywords_values.join(idf_keywords_values, on

=[keyword], how=left) 计算 TF-IDF 权重和 TextRank 权重的乘积作为关键词权重keywords_weights = keywords_res.withColumn(weights

, keywords_res.textrank * keywords_res.idf).select(["article_id", "channel_id", "keyword", "weights"])

keywords_weights 结果如下所示

这里,我们需要将相同文章的词都合并到一条记录中,将 keywords_weights 按照 article_id 分组,并利用 collect_list() 方法,分别将关键词和权重合并为列表keywords_weights.registerTempTable(temp) keywords_weights = spark.sql("

select article_id, min(channel_id) channel_id, collect_list(keyword) keywords, collect_list(weights) weights

from temp groupby article_id")` keywords_weights 结果如下所示,keywords 为每篇文章的关键词列表,weights 为关键词对应的权重列表

为了方便查询,我们需要将关键词和权重合并为一列,并存储为 map 类型,这里利用 dict() 和 zip() 方法,将每个关键词及其权重组合成字典defto_map(row):return row.article_id, row.channel_id, dict(zip(row.keywords, row.weights)) article_keywords = keywords_weights.rdd.map(to_map).toDF([

article_id, channel_id, keywords]) article_keywords 结果如下所示,keywords 即为每篇文章的关键词和对应权重

前面我们计算完了关键词,接下来我们将 TF-IDF 和 TextRank 的共现词作为主题词,将 TF-IDF 权重表 tfidf_keywords_values 和 TextRank 权重表 textrank_keywords_values 进行关联,并利用

collect_set() 对结果进行去重,即可得到 TF-IDF 和 TextRank 的共现词,即主题词topic_sql = """ select t.article_id article_id2, collect_set(t.keyword) topics from tfidf_keywords_values t inner join textrank_keywords_values r where t.keyword=r.keyword group by article_id2 """

article_topics = spark.sql(topic_sql) article_topics 结果如下所示,topics 即为每篇文章的主题词列表

最后,将主题词结果和关键词结果合并,即为文章画像,保存到表 article_profilearticle_profile = article_keywords.join(article_topics, article_keywords.article_id==article_topics.article_id2).

select(["article_id", "channel_id", "keywords", "topics"]) article_profile.write.insertInto("article_profile"

) 文章画像数据查询测试hive> select * from article_profile limit 1; OK 2617 {"策略":0.3973770571351729,"jpg":

0.9806348975390871,"用户":1.2794959063944176,"strong":1.6488457985625076,"文件":0.28144603583387057,"逻辑":

0.45256526469610714,"形式":0.4123994242601279,"全自":0.9594604850547191,"h2":0.6244481634710125,"版本":0.44280276959510817

,"Adobe":0.8553618185108718,"安装":0.8305037437573172,"检查更新":1.8088946300014435,"产品":0.774842382276899,

"下载页":1.4256311032544344,"过程":0.19827163395829256,"json":0.6423301791599972,"方式":0.582762869780791,"退出应用"

:1.2338671268242603,"Setup":1.004399549339134} ["Electron","全自动","产品","版本号","安装包","检查更新","方案","版本",

"退出应用","逻辑","安装过程","方式","定性","新版本","Setup","静默","用户"] Time taken: 0.322 seconds, Fetched: 1 row(s) Apscheduler 定时更新

定义离线更新文章画像的方法,首先合并最近一个小时的文章信息,接着计算每个词的 TF-IDF 和 TextRank 权重,并根据 TF-IDF 和 TextRank 权重计算得出文章关键词和主题词,最后将文章画像信息保存到 Hive

defupdate_article_profile():""" 定时更新文章画像 :return: """ ua = UpdateArticle() # 合并文章信息

sentence_df = ua.merge_article_data() if sentence_df.rdd.collect(): textrank_keywords_df, keywordsIndex = ua.generate_article_label() ua.get_article_profile(textrank_keywords_df, keywordsIndex)

利用 Apscheduler 添加定时更新文章画像任务,设定每隔 1 个小时更新一次from apscheduler.schedulers.blocking import BlockingScheduler

from apscheduler.executors.pool import ProcessPoolExecutor # 创建scheduler,多进程执行 executors = { default

: ProcessPoolExecutor(3) } scheduler = BlockingScheduler(executors=executors) # 添加一个定时更新文章画像的任务,每隔1个小时运行一次

scheduler.add_job(update_article_profile, trigger=interval, hours=1) scheduler.start() 利用 Supervisor 进行进程管理,配置文件如下

[program:offline]environment=JAVA_HOME=/root/bigdata/jdk,SPARK_HOME=/root/bigdata/spark,HADOOP_HOME=/root/bigdata/hadoop,PYSPARK_PYTHON=/miniconda2/envs/reco_sys/bin/python,PYSPARK_DRIVER_PYTHON=/miniconda2/envs/reco_sys/bin/python

command=/miniconda2/envs/reco_sys/bin/python /root/toutiao_project/scheduler/main.py directory=/root/toutiao_project/scheduler

user=root autorestart=trueredirect_stderr=truestdout_logfile=/root/logs/offlinesuper.log loglevel=info

stopsignal=KILL stopasgroup=truekillasgroup=true

亲爱的读者们,感谢您花时间阅读本文。如果您对本文有任何疑问或建议,请随时联系我。我非常乐意与您交流。

发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

大众 新闻22005