Preface

I'm not sure when it started, but once my blog picked up a bit of traffic I began checking the visitor analytics all the time: which page did someone come in from? Why did they stay on this page so long, and what did I even write there? My OCD is a little severe, though, so I refuse to open the referring URL myself, since my own visit would count as a visitor and pollute the stats with "fake" data; instead I dig through local folders to find the article. So I wanted to build a crawler that exports every article's title and link to a file.

(Image: Crawler Notes — The Articles We Lost, 01)

After finishing that, I got curious: how do other people keep scraping me?! I often find my articles republished elsewhere, like a public execution.

So this post covers three requirements:

  1. Export the blog's post data, including titles and links
  2. Crawl the blog's articles and export them as md files
  3. Use Baidu Tongji (Baidu Analytics) to collect blog metrics: PV, UV, average stay time and so on for "hot articles, visitor provinces, visitor countries"

This article is also a development prequel to hexo-hot-article. All the data here is exported to local files: I originally wanted to build it as a GitHub workflow, but since many beginners struggle with GitHub, the next post will wrap all three requirements behind an API so that friends running Hexo can simply call it.

Export Blog Data

Requirement analysis: extract every article's title, URL and creation time from the blog. On the post-list pages, both the cover div and the info div of each article contain this information.

Approach:

  1. Find out how many pages the blog has
  2. Find out how many articles are on the current page
  3. Scrape the information and write it to json, md, sql and excel, both for local lookup and in preparation for switching to GitHub Actions later

The page count itself falls into three cases, and the page structure differs depending on how many pages there are.

Each article card has two parts, the cover and the info block; both carry the title and the link, but the info block also has a "Posted on" date, so I just grab everything from the info block in one go:

title_list = html.xpath('//*[@class="recent-post-info"]/a/@title')  # article title
link_list = html.xpath('//*[@class="recent-post-info"]/a/@href')  # article URL
time_list = html.xpath('//*[@class="post-meta-date-created"]/text()')  # "Posted on" date

(Image: Crawler Notes — The Articles We Lost, 02)

The full implementation:

"""
@Author:张时贰
@Date:2022年12月16日
@CSDN:张时贰
@Blog:zhsher.cn
"""
import json
import sqlite3
import time
from lxml import etree
import requests
import asyncio
import aiohttp
import xlsxwriter


# 获取博客页数
def get_page_num(blog_url):
r = requests.get ( blog_url )
r = r.content.decode ( 'utf-8' )
html = etree.HTML ( r )
# 获取博客页数
try:
page_num = html.xpath ( '//*[@class="pagination"]/a[2]//text()' )[ 0 ] # 博客页数较多
except IndexError as e:
try:
e = str ( e )
print ( "error:" + e + ",博客页数较少没有 <span class=\"space\">…</span>" )
page_num = html.xpath ( '//*[@class="pagination"]/a//text()' )[ 0 ] # 博客页数少没有翻页按钮没有1...X而是直接显示1、2、3
except Exception as e:
e = str ( e )
print ( "error:" + e + ",博客只有一页" )
page_num = 1
return page_num


# request无法异步,换用aiohttp库做异步请求
async def requests_get(link):
async with aiohttp.ClientSession () as session:
async with session.get ( link ) as resp:
try:
text = await resp.text ()
return text
except Exception as e:
e = str ( e )
print ( f'请求状态码{resp.status},错误:{e}' )


# 抓取博客当前页文章的 标题 链接
async def get_info(url):
html = await requests_get ( url )
html = etree.HTML ( html )
title_list = html.xpath ( '//*[@class="recent-post-info"]/a/@title' )
link_list = html.xpath ( '//*[@class="recent-post-info"]/a/@href' )
# time_list = html.xpath ( '//*[@class="post-meta-date-created"]/text()' ) # 博客未开启更新于时不适用
time_list = html.xpath ( '//*[@class="post-meta-date-created"]/text()' )
if len ( time_list ) == 0:
time_list = html.xpath ( '//*[@class="post-meta-date"]/time/text()' )
post_num = len ( title_list ) # 文章数
title_link_time_list_page = [ ]
for i in range ( post_num ):
title = title_list[ i ]
link = link_list[ i ]
time = time_list[ i ]
tmp = {"title": title, "link": blog_url + link, "time": time}
title_link_time_list_page.append ( tmp )
return title_link_time_list_page


# 本地文件调试
def run(site_url):
global blog_url
blog_url = site_url
page_num = int ( get_page_num ( blog_url ) ) # 博客页数
# 如果博客只有一页
if page_num.__eq__ ( 1 ):
# 创建协程容器(获取事件循环)
loop = asyncio.get_event_loop ()
# 指定协程添加任务
tasks = [ asyncio.ensure_future ( get_info ( blog_url ) ) ]
# 运行任务(将所有的事件对象传入事件循环)
loop.run_until_complete ( asyncio.wait ( tasks ) ) # Fastapi中线程占用不能用until方法
# 如果博客多于1页
else:
# 创建协程容器(获取事件循环)
loop = asyncio.get_event_loop ()
# 指定协程添加任务
tasks = [ asyncio.ensure_future ( get_info ( blog_url ) ) ] # 第一页
for i in range ( 1, page_num ):
tasks.append ( asyncio.ensure_future ( get_info ( f'{blog_url}/page/{i + 1}/#content-inner' ) ) )
# 运行任务(将所有的事件对象传入事件循环)
loop.run_until_complete ( asyncio.wait ( tasks ) ) # Fastapi中线程占用不能用until方法

# 将异步返回值合并
title_link_time_list = [ ] # 列表字典 文章标题 链接 发布于
for task in tasks:
title_link_time_list = title_link_time_list + task.result ()
title_link_time_list = sorted ( title_link_time_list, key=lambda x: x[ 'time' ], reverse=True ) # 解决因为异步导致的乱序,按时间降序
return title_link_time_list


# 写入md文件
def save_md(row, post_info):
with open ( 'Post_Table.md', 'w', encoding='utf-8' ) as f:
for i in range ( row ):
f.write ( '| Post ' )
f.write ( '| \n' )
for i in range ( row ):
f.write ( '| :----: ' )
f.write ( '| \n' )
tmp = 0
for line in post_info:
tmp = tmp + 1
title = f'| [{line[ "title" ]}]({line[ "link" ]}) '
f.write ( title )
if row.__eq__ ( tmp ):
f.write ( '| \n' )
tmp = 0


# 写入json文件
def save_json(post_info):
with open ( 'Post_Table.json', 'w', encoding='utf-8' ) as f:
f.write ( json.dumps ( post_info, indent=4, ensure_ascii=False ) )


# 写入db
def save_sql(post_info):
# 将字典转集合,方便后续去重批量执行sql语句
link_title_set = set ()
for i in post_info:
tmp = (i[ 'title' ], i[ 'link' ], i[ 'time' ])
link_title_set.add ( tmp )

connect = sqlite3.connect ( 'Blog_Post.db' ) # 连接数据库
sql = connect.cursor () # 创建cursor对象
# sql.execute ( "DROP TABLE IF EXISTS Blog_Post" )
sql.execute ( "CREATE TABLE if NOT EXISTS Blog_Post( title text PRIMARY KEY, link text, time text)" )

# 去重
link_title_table = sql.execute ( "SELECT * FROM Blog_Post" ).fetchall ()
link_title_table = set ( link_title_table )
link_title_set = link_title_set - link_title_table

# 插入文章数据
# sql.execute (
# "INSERT INTO Blog_Post VALUES( '2022 11 13 月亮还是那个月亮','https://zzhsher.cn/posts/53805/','2022-11-13 00:10:24')" )
sql.executemany ( 'INSERT INTO Blog_Post(title,link,time) VALUES( ?, ?, ?)', link_title_set )

connect.commit ()
connect.close ()


# 写入excel
def save_excel(post_info):
workbook = xlsxwriter.Workbook ( 'Post_Table.xlsx' ) # 创建工作簿
worksheet = workbook.add_worksheet ( "博客数据" ) # 创建子表
format = {
# 'bold': True, # 字体加粗
'align': 'center', # 水平位置设置:居中
'valign': 'vcenter', # 垂直位置设置,居中
# 'font_size': 10, # '字体大小设置'
}
format = workbook.add_format ( format )
worksheet.set_column ( 0, 0, 50 ) # 行宽
worksheet.set_column ( 1, 1, 40 ) # 行宽
worksheet.set_column ( 2, 2, 30 ) # 行宽
worksheet.activate () # 激活表
title = [ '标题', '链接', '发布于' ] # 设置表头
worksheet.write_row ( 'A1', title, format ) # 从A1单元格开始写入表头
i = 2 # 从第二行开始写入数据
for j in range ( len ( post_info ) ):
insertData = [ post_info[ j ][ "title" ], post_info[ j ][ "link" ], post_info[ j ][ "time" ] ]
row = 'A' + str ( i )
worksheet.write_row ( row, insertData, format )
i += 1
workbook.close ()


if __name__ == '__main__':
start = time.time ()
blog_url = 'https://zhsher.cn' # 博客地址
# blog_url = 'https://blog.panghai.top' # 博客地址
# blog_url = 'https://luomengguo.top' # 博客地址
# blog_url = 'https://blog.leonus.cn' # 博客地址
# blog_url = 'https://www.chuckle.top' # 博客地址
# blog_url = 'https://anzhiy.cn' # 博客地址

row = 4 # 输出md文件列数
title_link_time_list = run ( blog_url ) # 异步抓取
print ( title_link_time_list )
# save_json ( title_link_time_list ) # 写入json文件
# save_md ( row, title_link_time_list ) # 写入md文件
# save_sql ( title_link_time_list ) # 写入sql文件
# save_excel ( title_link_time_list ) # 写入excel
print ( time.time () - start )

Crawl Blog Articles

Requirement analysis: given an article link, fetch the page HTML and convert it to markdown for saving.
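Before dealing with the theme quirks below, the naive baseline is just "fetch, then convert". A minimal sketch (the URL is one of the test posts used later; it ignores the lazy-load and code-block problems discussed next):

import html2text
import requests

url = 'https://zhsher.cn/posts/7884/'  # one of the test posts used later in this article
html = requests.get(url).content.decode('utf-8')
markdown = html2text.html2text(html)  # convert the whole page's HTML to markdown
with open('post.md', 'w', encoding='utf-8') as f:
    f.write(markdown)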

Problems encountered:

  1. When the Butterfly theme has lazy loading enabled, the src of the img tag is not the real image address (a sketch of the fix follows this list):

    <a href="real address" data-fancybox="gallery" data-caption=""><img src="lazy-load placeholder" data-lazy-src="real address" alt="" style=""></a>
  2. Code blocks are wrapped in a table, which gets mangled when converted to md. Worse, when code wrapping is enabled in the theme, the table is split into two parts, line numbers and code, so the conversion produces two separate code blocks: one for the line numbers and one for the code

    (Image: Crawler Notes — The Articles We Lost, 03)
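For problem 1, the fix used later in the parsel approach is a pair of chained string replacements that promote data-lazy-src to src. A minimal sketch on a tag like the one above (placeholder values):

# Placeholder values stand in for the lazy-load data URI and the real image URL
img = '<img src="lazy-load placeholder" data-lazy-src="real address" alt="" style="">'
# Step 1: every 'src' becomes 'lazy' -> src="..." turns into lazy="...", data-lazy-src into data-lazy-lazy
# Step 2: 'data-lazy-lazy' becomes 'src', so the real address ends up in src
img = img.replace('src', 'lazy').replace('data-lazy-lazy', 'src')
print(img)  # <img lazy="lazy-load placeholder" src="real address" alt="" style="">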

I tried three approaches. I started with the parsel library, but with lazy loading enabled it could not get the image addresses, so I switched to selenium (which turned out to be a detour in the wrong direction), and finally realized I could simply replace the src value of the img tags another way.

In my tests, approach three was by far the fastest. Selenium is primarily a browser automation tool: opening a real browser loads far too many resources, which is hard to control, and the rendered page contains many extra tags that are awkward to handle.
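To reproduce that comparison, a small timing harness like the one below can be used, assuming the three functions defined later (by_selenium_value, by_selenium_scroll, by_parsel_replace) live in the same module; the URL is one of the test posts:

import time

url = 'https://zhsher.cn/posts/7884/'
for crawl in (by_selenium_value, by_selenium_scroll, by_parsel_replace):
    start = time.time()
    crawl(url)
    print(f'{crawl.__name__}: {time.time() - start:.2f}s')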

By Selenium Value

Approach 1: open a browser and run a JS snippet, document.querySelectorAll('.post-content p img')[{i}].src = '{a_href}', to swap each img's src for the address stored in its wrapping a tag, then grab the page HTML.

from selenium import webdriver
from selenium.webdriver.common.by import By


def by_selenium_value(link):
    '''
    Approach 1: with selenium. Each img is wrapped in an a tag that holds the real image address,
    so use a DOM operation to copy the a tag's href into the img's src.
    :param link: article URL
    :return:
    '''
    # 1. Create the browser object - open the browser
    driver = webdriver.Chrome()  # open a browser window for local debugging
    driver.maximize_window()
    # driver = webdriver.Chrome(options=add_options())  # debug without opening a browser window
    # 2. Open the article
    driver.get(link)
    # 3. Find all a / img tags
    a_list = driver.find_elements(By.XPATH, '//*[@id="article-container"]/p/a/img/..')
    # 4. Replace every img src with the wrapping a tag's href
    for i in range(len(a_list)):
        a_href = a_list[i].get_attribute('href')
        js = f"document.querySelectorAll('.post-content p img')[{i}].src = '{a_href}'"
        driver.execute_script(js)
    # 5. Get the HTML of the article content
    post_content = driver.find_element(By.XPATH, '//*[@id="article-container"]').get_attribute('outerHTML')
    post_title = driver.find_element(By.CSS_SELECTOR, '.post-title').get_attribute('innerHTML')

    # 6. Post-processing
    # The Butterfly theme draws code blocks with a table split into line numbers (.gutter) and code (.code).
    # A table converted to md produces '---', so strip the <table> wrapper with replace and remove every <td class="gutter">xxx</td>.
    post_content = post_content.replace('<table><tbody><tr><td class="gutter">',
                                        '<tbody><tr><td class="gutter">')  # matches the theme setting code_word_wrap: false
    post_content = post_content.replace('<table><tbody><tr><td class="code">',
                                        '<tbody><tr><td class="code">')  # matches the theme setting code_word_wrap: true
    # Remove .gutter
    tmp = driver.find_elements(By.CSS_SELECTOR, '.gutter')
    code_span = []  # collect all .gutter elements
    for i in tmp:
        code_span.append(i.get_attribute('outerHTML'))
    for i in code_span:
        post_content = post_content.replace(i, '')

    # 7. Save as markdown
    # markdown = html2text.html2text(post_content)
    #
    # path = os.path.dirname(__file__)
    # path = path + '/by_selenium_value'
    # if not os.path.exists(path):
    #     os.mkdir(path)
    #
    # with open(f'{path}/{post_title}.md', 'w', encoding='utf-8') as file:
    #     file.write(markdown)


if __name__ == '__main__':
    url = 'https://zhsher.cn/posts/7884/'
    # url = 'https://zhsher.cn/posts/53805/'

    # print('by_selenium_value run time:')
    # start = time.time()
    # by_selenium_value(url)
    # print(time.time() - start)

By Selenium Scroll

Approach 2: scroll to every a tag that wraps an img so the images actually load, and then grab the page HTML.

from selenium import webdriver
from selenium.webdriver import ActionChains
from selenium.webdriver.common.by import By


def add_options():
    # Create the Chrome driver options object
    chrome_options = webdriver.ChromeOptions()
    # Do not load images
    prefs = {"profile.managed_default_content_settings.images": 2}
    chrome_options.add_experimental_option("prefs", prefs)
    # Headless mode
    chrome_options.add_argument('--headless')
    # Incognito (private) mode
    chrome_options.add_argument('--incognito')
    # Disable GPU acceleration
    chrome_options.add_argument('--disable-gpu')
    return chrome_options


def by_selenium_scroll(link):
    '''
    Approach 2: with selenium. Each img is wrapped in an a tag; find the parent a of every img and scroll to it
    so the image loads (scrolling straight to the img can push the scroll coordinates out of range).
    :param link: article URL
    :return:
    '''
    # 1. Create the browser object - open the browser
    driver = webdriver.Chrome()  # open a browser window for local debugging
    driver.maximize_window()
    # driver = webdriver.Chrome(options=add_options())  # debug without opening a browser window
    # 2. Open the article
    driver.get(link)
    # 3. Find the a tag wrapping every image
    img_list = driver.find_elements(By.XPATH, '//*[@id="article-container"]/p/a/img/..')
    # img_list = driver.find_elements(By.CSS_SELECTOR, '#article-container p a')
    # 4. Scroll to each image
    for img in img_list:
        ActionChains(driver).scroll_to_element(img).perform()
    # 5. Get the HTML of the article content
    post_content = driver.find_element(By.XPATH, '//*[@id="article-container"]').get_attribute('outerHTML')
    post_title = driver.find_element(By.CSS_SELECTOR, '.post-title').get_attribute('innerHTML')

    # 6. Post-processing
    # The Butterfly theme draws code blocks with a table split into line numbers (.gutter) and code (.code).
    # A table converted to md produces '---', so strip the <table> wrapper with replace and remove every <td class="gutter">xxx</td>.
    post_content = post_content.replace('<table><tbody><tr><td class="gutter">',
                                        '<tbody><tr><td class="gutter">')  # matches the theme setting code_word_wrap: false
    post_content = post_content.replace('<table><tbody><tr><td class="code">',
                                        '<tbody><tr><td class="code">')  # matches the theme setting code_word_wrap: true
    # Remove .gutter
    tmp = driver.find_elements(By.CSS_SELECTOR, '.gutter')
    code_span = []  # collect all .gutter elements
    for i in tmp:
        code_span.append(i.get_attribute('outerHTML'))
    for i in code_span:
        post_content = post_content.replace(i, '')

    # 7. Save as markdown
    # markdown = html2text.html2text(post_content)
    # path = os.path.dirname(__file__)
    # path = path + '/by_selenium_scroll'
    # if not os.path.exists(path):
    #     os.mkdir(path)
    #
    # with open(f'{path}/{post_title}.md', 'w', encoding='utf-8') as file:
    #     file.write(markdown)


if __name__ == '__main__':
    url = 'https://zhsher.cn/posts/7884/'
    # url = 'https://zhsher.cn/posts/53805/'

    # print('by_selenium_scroll run time:')
    # start = time.time()
    # by_selenium_scroll(url)
    # print(time.time() - start)

By Parsel

Approach 3: fast and easy to use, just perfect!!!

import html2text
import parsel
import requests


def by_parsel_replace(link):
    '''
    Approach 3: with parsel. Swap src and data-lazy-src inside the img tag:
    <img src="lazy-load placeholder" data-lazy-src="real address" alt="" style="">
    :param link: article URL
    :return:
    '''
    # 1. Fetch the HTML
    request = requests.get(link)
    html = request.content.decode('utf-8')
    select = parsel.Selector(html)

    # 2. Get the article title and content
    post_title = select.css('.post-title::text').get()
    post_content = select.css('.post-content').get()

    # 3. Handle lazy-loaded images
    post_content = post_content.replace('src', 'lazy').replace('data-lazy-lazy', 'src')  # promote data-lazy-src to src

    # 4. Strip the extra tags
    # The Butterfly theme draws code blocks with a table split into line numbers (.gutter) and code (.code).
    # A table converted to md produces '---', so strip the <table> wrapper with replace and remove every <td class="gutter">xxx</td>.
    # Remove <table>
    post_content = post_content.replace('<table><tr><td class="gutter">',
                                        '<tr><td class="gutter">')  # matches the theme setting code_word_wrap: false
    post_content = post_content.replace('<table><tr><td class="code">',
                                        '<tr><td class="code">')  # matches the theme setting code_word_wrap: true

    # Remove the line numbers: when the theme wraps code, the line numbers live in their own td tag, strip it with replace
    code_span = select.css('.gutter').getall()
    for i in code_span:
        post_content = post_content.replace(i, '')

    # 5. Convert to md
    markdown = html2text.html2text(post_content)
    # with open(f'by_parsel_replace/{post_title}.md', 'w', encoding='utf-8') as file:
    #     file.writelines(markdown)

    # 6. Drop the extra blank lines before code blocks
    markdown = markdown.split('\n')
    file_content = []
    for i in range(len(markdown)):
        if markdown[i] in ('  ', ' '):  # skip whitespace-only lines
            continue
        file_content.append(markdown[i] + '\n')
    # 7. Save the file
    # path = os.path.dirname(__file__)
    # path = path + '/by_parsel_replace'
    # if not os.path.exists(path):
    #     os.mkdir(path)
    # print(file_content)
    # with open(f'{path}/{post_title}.md', 'w', encoding='utf-8') as file:
    #     for i in file_content:
    #         file.write(i)
    return post_content


if __name__ == '__main__':
    url = 'https://zhsher.cn/posts/7884/'
    # url = 'https://zhsher.cn/posts/53805/'

    # print('by_parsel_replace run time:')
    # start = time.time()
    # by_parsel_replace(url)
    # print(time.time() - start)

CSDN

This could have ended here, but surely the people who steal blog posts don't write a separate crawler for every blog theme?! Even the Butterfly theme alone needs different handling depending on its configuration. Then I remembered that before I knew Hexo I blogged on CSDN, which removes a lot of the case-by-case analysis, so I wrote that one straight through in one go.

import html2text
import parsel
import requests


def csdn(link):
    '''
    Crawl a CSDN article
    :param link: article URL
    :return:
    '''
    # (.*?):(.*)
    # "$1":"$2",
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.105 Safari/537.36 Edg/84.0.522.52",
        "Referer": "https://blog.csdn.net/tansty_zh"
    }
    request = requests.get(link, headers=headers)
    html = request.text
    select = parsel.Selector(html)
    post_content = select.css('#article_content').get()
    post_title = select.css('.title-article::text').get()
    markdown = html2text.html2text(post_content)

    # Probably because of CSDN's hotlink protection, html2text cannot parse the image addresses directly;
    # they get split across lines: '![在这里插入图片描述](https://img-\n', 'blog.csdnimg.cn/90d.png#pic_center)\n'
    # with open(f'{post_title}.md', 'w', encoding='utf-8') as file:
    #     file.write(markdown)

    # Workaround
    markdown = markdown.split('\n')
    file_content = []
    flag = 0

    for i in range(len(markdown) - 1):
        # A whitespace-only line is just a line break, skip it
        if markdown[i] == ' ':
            # file_content.append('\n')
            continue
        # If the line contains '](https://img-', the next iteration holds the second half of the image URL; mark it with flag
        # img_pattern = re.compile(r'^!\[.*', re.M)
        # img_pattern = img_pattern.match(content).group()
        elif '](https://img-' in markdown[i]:
            flag = 1
            img_front_url = markdown[i]
        # flag == 1 means this iteration holds the second half of the image URL
        elif flag == 1:
            flag = 0
            file_content.append(img_front_url + markdown[i] + '\n')
        # Otherwise it is plain text content
        else:
            file_content.append(markdown[i] + '\n')

    # path = os.path.dirname(__file__)
    # path = path + '/CSDN'
    # if not os.path.exists(path):
    #     os.mkdir(path)
    #
    # with open(f'{path}/{post_title}.md', 'w', encoding='utf-8') as file:
    #     for i in file_content:
    #         file.write(i)
    return post_content


if __name__ == '__main__':
    url = 'https://blog.csdn.net/qq_49488584/article/details/126884686?spm=1001.2014.3001.5502'

    # print('csdn run time:')
    # start = time.time()
    # csdn(url)
    # print(time.time() - start)

Hot Articles and Visitor Statistics

Baidu has one pitfall: the docs claim you can get the article title, but the actual response comes back empty, so the crawler above can simply be copy-pasted in to fill the titles. 51la does return article titles, so why choose Baidu? The next post compares the two and explains Baidu's advantages.

"""
@Author:张时贰
@Date:2022年11月16日
@CSDN:张时贰
@Blog:zhsher.cn
"""
import datetime
import json
import os
import re
from lxml import etree

import requests

'''
百度统计API文档:https://tongji.baidu.com/api/manual/
ACESS_TOKEN 与 REFRESH_TOKEN 申请,查看API文档或以下说明

申请 token 的方法:
1.在百度统计控制台点击数据管理开通数据并获取 `API Key` 与 `Secret Key`
2.登录百度账号,获取 `code`(一次性且10min有效) :http://openapi.baidu.com/oauth/2.0/authorize?response_type=code&client_id={CLIENT_ID}&redirect_uri=oob&scope=basic&display=popup
其中 `{CLIENT_ID}` 为API key
3.获取 `ACCESS_TOKEN` :http://openapi.baidu.com/oauth/2.0/token?grant_type=authorization_code&code={CODE}&client_id={CLIENT_ID}&client_secret={CLIENT_SECRET}&redirect_uri=oob
其中 `{CLIENT_ID}`填写您的API Key
`{CLIENT_SECRET}`填写您的Secret Key
`{CODE}`填写刚才拿到的CODE
如果你对文档不清楚如何拿到 token 可以借助此项目接口
'''


def baidu_get_token(API_Key, Secret_Key, CODE):
'''
获取百度token
:param API_Key: 百度账号API_Key
:param Secret_Key: 百度账号Secret_Key
:param CODE: 登录并访问 http://openapi.baidu.com/oauth/2.0/authorize?response_type=code&client_id={你的API_Key}&redirect_uri=oob&scope=basic&display=popup
:return: {'access_token': access_token, 'refresh_token': refresh_token}
'''
payload = {
"grant_type": "authorization_code",
"redirect_uri": "oob",
"code": f'{CODE}',
"client_id": f'{API_Key}',
"client_secret": f'{Secret_Key}',
}
r = requests.post ( 'http://openapi.baidu.com/oauth/2.0/token', params=payload )
getData = r.json ()
try:
access_token = getData[ 'access_token' ] # 有效期一个月
refresh_token = getData[ 'refresh_token' ] # 有效期十年
print ( 'Acess_Token:' + '\n' + access_token )
print ( 'Refresh_Token:' + '\n' + refresh_token )
token = {'access_token': access_token, 'refresh_token': refresh_token}
return token
except Exception as e:
e = str ( e )
e = e + '获取失败,请保证code有效(十分钟有效期且仅能使用一次)'
return e


def baidu_refresh_token(API_Key, Secret_Key, refresh_token):
'''
通过 refresh_token 刷新
:param API_Key: 百度账号API_Key
:param Secret_Key: 百度账号Secret_Key
:param refresh_token: 百度账号refresh_token
:return: {'access_token': access_token, 'refresh_token': refresh_token}
'''
payload = {'grant_type': 'refresh_token',
'refresh_token': refresh_token,
'client_id': API_Key,
'client_secret': Secret_Key
}
r = requests.post ( 'http://openapi.baidu.com/oauth/2.0/token', params=payload )
token = r.json ()
try:
access_token = token[ 'access_token' ] # 有效期一个月
refresh_token = token[ 'refresh_token' ] # 有效期十年
print ( "Token更新\nAcess_Token = " + access_token + "\nRefresh_Token = " + refresh_token )
token = {'access_token': access_token, 'refresh_token': refresh_token}
return token
except Exception as e:
e = str ( e )
return '错误信息:刷新后无' + e + '值 , 请检查 refresh_token 是否填写正确'


def getSiteList(access_token, domain):
'''
请求获取百度账号下所有的站点列表并处理得到自己博客的 site_id
:param access_token: 百度分析access_token
:param domain: 站点域名
:return: 构造 site_info 字典作为其它请求的 params
'''
payload = {'access_token': access_token}
r = requests.post ( 'https://openapi.baidu.com/rest/2.0/tongji/config/getSiteList', params=payload )
get_data = r.json ()
# 多个站点会返回多个 域名 和 id
# 成功示例:{'list': [{'site_id': 17960579, 'domain': 'zhsher.cn', 'status': 0, 'create_time': '2022-05-12 15:20:32', 'sub_dir_list': []}]}
# 失败示例:{'error_code': 110, 'error_msg': 'Access token invalid or no longer valid'}
# 利用 dic 对站点提取必要的 payload
getData = get_data[ 'list' ]
now = datetime.datetime.now ().date ()
now = datetime.datetime.strftime ( now, '%Y%m%d' ) # 纯字符串格式
site_info = {} # 定义一个字典,作为 post 请求的 payload
for i in getData:
if i[ 'domain' ].__eq__ ( domain ):
site_info[ 'site_id' ] = i[ 'site_id' ]
site_info[ 'domain' ] = i[ 'domain' ]
site_info[ 'status' ] = i[ 'status' ]
site_info[ 'start_date' ] = i[ 'create_time' ]
site_info[ 'end_date' ] = now
return site_info


def get_hot_article(access_token, domain):
'''
获取热文统计
:param access_token: 百度分析access_token
:param domain: 站点域名
:return: 以pv排序返回文章标题、链接、pv、uv、平均时长
'''
site_info = getSiteList ( access_token, domain ) # 站点基础数据
payload = {
'access_token': access_token,
'method': 'visit/toppage/a',
"metrics": "pv_count,visitor_count,average_stay_time", # 浏览量 访客数 平均访问时长s
}
payload.update ( site_info )
r = requests.post ( 'https://openapi.baidu.com/rest/2.0/tongji/report/getData', params=payload )
get_site_data = r.json ()

# 对 get_site_data 二次处理,去除主页、友链朋友圈、关于等信息,只保留 post 文章页信息
# 并构造一个字典 get_hot_article 包括 概览信息blog_general 每篇文章信息article_info
# 文章概览信息
blog_general = {"timeSpan": get_site_data[ 'result' ][ 'timeSpan' ][ 0 ], # 统计时间区间 eg:2022/05/12 - 2022/11/17
"total": get_site_data[ 'result' ][ 'total' ], # 百度统计控制台-受访页面中URL个数 但只有前20篇具体数据,需要购买商业版统计
"sum_pv_count": get_site_data[ 'result' ][ 'sum' ][ 0 ][ 0 ], # 总浏览量 PV
"sum_visitor_count": get_site_data[ 'result' ][ 'sum' ][ 0 ][ 1 ], # 总访客数 UV
"sum_average_stay_time": get_site_data[ 'result' ][ 'sum' ][ 0 ][ 2 ], # 总平均停留时长 单位 s
"top20_pv_count": get_site_data[ 'result' ][ 'pageSum' ][ 0 ][ 0 ], # 前20篇的总浏览量 PV
"top20_visitor_count": get_site_data[ 'result' ][ 'pageSum' ][ 0 ][ 1 ], # 前20篇的总访客数 UV
"top20_average_stay_time": get_site_data[ 'result' ][ 'pageSum' ][ 0 ][ 2 ], # 前20篇的平均访问时长
# 前20篇的总平均停留时长 单位 s
}

post_num = len ( get_site_data[ 'result' ][ 'items' ][ 0 ] ) # 避免有的人文章少超出索引

# 去除主页、友链朋友圈、关于等信息,只保留 post 文章页信息
index = 0
for i in range ( 0, post_num ):
if not re.match ( r'^https://' + site_info[ 'domain' ] + '/post/*',
get_site_data[ 'result' ][ 'items' ][ 0 ][ i - index ][ 0 ][ 'name' ] ):
del get_site_data[ 'result' ][ 'items' ][ 0 ][ i - index ]
del get_site_data[ 'result' ][ 'items' ][ 1 ][ i - index ]
index = index + 1
post_num = len ( get_site_data[ 'result' ][ 'items' ][ 0 ] ) # 去除处理后更新

# 单篇文章信息 百度统计没title:利用 xpath 爬取博客获取文章标题
article_info = [ ]
for i in range ( 0, post_num ):
tmp = {"title": get_title ( get_site_data[ 'result' ][ 'items' ][ 0 ][ i ][ 0 ][ 'name' ] ),
"url": get_site_data[ 'result' ][ 'items' ][ 0 ][ i ][ 0 ][ 'name' ], # 文章链接
"pv_count": get_site_data[ 'result' ][ 'items' ][ 1 ][ i ][ 0 ], # 浏览量PV
"visitor_count": get_site_data[ 'result' ][ 'items' ][ 1 ][ i ][ 1 ], # 访客数UV
"average_stay_time": get_site_data[ 'result' ][ 'items' ][ 1 ][ i ][ 2 ] # 平均停留时长
}
article_info.append ( tmp )

# 构造新字典并return
get_hot_article = {"blog_general": blog_general, "article_info": article_info}

# pwd = os.getcwd ()
# father_path_method1 = os.path.dirname ( pwd )
# file_path = father_path_method1 + "\\baidu.json"
# with open ( file_path, 'w', encoding='utf-8' ) as f:
# json.dump ( get_post_data, f, indent=4, ensure_ascii=False )
return get_hot_article


def get_title(url):
'''
补充百度分析不显示标题
:param url: 文章链接
:return: 文章标题
'''
r = requests.get ( url )
r = r.content.decode ( 'utf-8' )
html = etree.HTML ( r )
title = html.xpath ( '//*[@id="post-info"]/h1//text()' )[ 0 ]
return title


def get_visitor_province(access_token, domain):
'''
访客省份统计
:param access_token: 百度分析access_token
:param domain: 站点域名
:return: 省份UV
'''
site_info = getSiteList ( access_token, domain ) # 站点基础数据
payload = {
'access_token': access_token,
'method': 'overview/getDistrictRpt',
"metrics": "pv_count", # 获取pv_count或visitor_count
}
payload.update ( site_info )
r = requests.post ( 'https://openapi.baidu.com/rest/2.0/tongji/report/getData', params=payload )
get_data = r.json ()
get_visitor_province = [ ]
num = len ( get_data[ 'result' ][ 'items' ][ 0 ] )
for i in range ( 0, num ):
# get_data[ 'result' ][ 'items' ][ 1 ][ i ][ 0 ] # PV
tmp = {'name': get_data[ 'result' ][ 'items' ][ 0 ][ i ][ 0 ],
'value': get_data[ 'result' ][ 'items' ][ 1 ][ i ][ 0 ]}
get_visitor_province.append ( tmp )
return get_visitor_province


def get_visitor_counrty(access_token, domain):
'''
访客国家统计
:param access_token: 百度分析access_token
:param domain: 站点域名
:return: 国家UV
'''
site_info = getSiteList ( access_token, domain ) # 站点基础数据
payload = {
'access_token': access_token,
'method': 'visit/world/a',
"metrics": "pv_count,visitor_count,average_stay_time", # 浏览量 访客数 平均访问时长s
}
payload.update ( site_info )
r = requests.post ( 'https://openapi.baidu.com/rest/2.0/tongji/report/getData', params=payload )
get_data = r.json ()
get_visitor_country = [ ]
num = len ( get_data[ 'result' ][ 'items' ][ 0 ] )
for i in range ( 0, num ):
# get_data[ 'result' ][ 'items' ][ 0 ] # 国家
# get_data[ 'result' ][ 'items' ][ 1 ][ i ][ 0 ] # PV
# get_data[ 'result' ][ 'items' ][ 1 ][ i ][ 1 ] # UV
tmp = {'name': get_data[ 'result' ][ 'items' ][ 0 ][ i ][ 0 ][ 'name' ],
'value': get_data[ 'result' ][ 'items' ][ 1 ][ i ][ 0 ]}
get_visitor_country.append ( tmp )
return get_visitor_country


if __name__ == '__main__':
API_Key = ''
Secret_Key = ''
CODE = ''
refresh_token = ''
# 测试
# print ( baidu_get_token ( API_Key, Secret_Key, CODE ) )
# print ( baidu_refresh_token ( API_Key, Secret_Key, refresh_token ) )

# access_token = ''
# domain = 'zhsher.cn'
# print ( get_hot_article ( access_token, domain ) )
# print ( get_visitor_province ( access_token, domain ) )
# print ( get_visitor_counrty ( access_token, domain ) )

Summary

Thanks to the following friends running the Hexo framework for providing all kinds of test data 😁

This crawler exercise gave me plenty of hands-on practice and sharpened how I analyze code:

  1. The selenium library is mainly a browser automation tool and not really crawler-friendly (though it can even execute JS, which is handy!)
  2. lxml's etree requires converting the HTML into an etree object first and then parsing it with xpath; the etree object cannot be printed directly as HTML text
  3. parsel is the strongest, the GOAT!!! It supports a wide range of CSS selectors as well as xpath, and can output the HTML text directly; anyone with some CSS / front-end background can pick it up immediately (a tiny demo follows below)
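A minimal sketch of that last point, on a throwaway HTML string (not taken from the blog), showing a CSS selector, an xpath query, and getting the raw HTML of a node back out:

import parsel

html = '<div class="recent-post-info"><a href="/posts/1/" title="Hello">Hello</a></div>'
sel = parsel.Selector(html)

print(sel.css('.recent-post-info a::attr(title)').get())  # CSS selector -> 'Hello'
print(sel.xpath('//a/@href').get())                       # xpath -> '/posts/1/'
print(sel.css('.recent-post-info').get())                 # raw HTML of the matched node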