
本帖最后由 王二小 于 20191229 18:23 编辑
业余Python爱好者,之前看到福娃分享的网站umei.fun,就写了个脚本,自动下载图片和视频,同时自动合并分段视频这个网站保存图片视频用的ipfs,每次爬的时候只需要对网站请求一次获取图片地址,应该不会对网站有太大影响
其实写出来很久了,但是账号密码登录一直有问题,弄了好久直到现在也没搞定,就先把脚本发出来,有大佬研究研究可以帮助完善一下
python3
下载付费资源需要使用cookies登录,如下图 登录后 按F12 复制红框中 cookies: 后面的字在脚本同一文件夹里新建txt文件,文件名umei.fun.cook.txt把刚才复制的字符串粘贴到里面 保存
需要安装以下模块
pip install requests
pip install natsort
pip install bs4
pip install hyper复制代码
使用时命令格式如下 最后一个参数是下载线程数,一个线程负责下载一个文件,可以不写,不写的同时下载10个文件,最大限制为50
python 脚本文件名 帖子地址 [同时下载图片/视频线程数]复制代码文件会保存在脚本同一个文件夹里的 umei.fun/xxx 文件夹里
下面是脚本代码,复制后保存为 .py 文件
注释写的很详细 有需要自己定制的可以修改
代码比较渣,见谅
# * coding:utf8 *
import os
import re
import sys
import requests
import threading
from glob import iglob
from natsort import natsorted
from bs4 import BeautifulSoup
from urllib.parse import quote
from hyper.contrib import HTTP20Adapter
# 当前脚本真实路径
_current_script_realpath = os.path.realpath(__file__)
# 当前脚本所在文件夹
_current_script_dir = os.path.dirname(_current_script_realpath)
# 当前脚本文件名
_current_script_name = os.path.basename(_current_script_realpath)
# 当前系统路径分隔符
_current_path_sep = os.path.sep
# 用户名 暂不可用 报错 The change you wanted was rejected (422)
_user = “”
# 密码 暂不可用 报错 The change you wanted was rejected (422)
_passwd = “”
# cookies文件路径
_cookies_path = “%s%sumei.fun.cook.txt”%(_current_script_dir,_current_path_sep)
# 默认下载线程数
_down_thread_num = 10
# 登录表单
_form_login_str = “utf8=|urf8_|&user[email]=%s&user[password]=%s&user[remember_me]=0&user[remember_me]=1&commit=登录&authenticity_token=”%(_user,_passwd)
# 网站host
_host_umei = “umei.fun”
# 网站域名
_url_umei = “https://%s”%_host_umei
# 登录页面
_url_login = “%s/users/sign_in”%_url_umei
# 下载文件夹
_down_dir = “%s/%s”%(_current_script_dir,_host_umei)
# UserAgent
_user_agent = “Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3729.169 Safari/537.36”
# 请求头 访问帖子用 :path 需要改
_headers_post = {“:authority”:_host_umei,”:method”:”GET”,”:scheme”:”https”,
“accept”:”text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signedexchange;v=b3″,
“acceptencoding”:”gzip, deflate, br”,”acceptlanguage”:”zhCN,zh;q=0.9″,”cachecontrol”:”nocache”,
“dnt”:”1″,”pragma”:”nocache”,”upgradeinsecurerequests”:”1″,”useragent”:_user_agent
}
# 请求头 请求图片用 Host需要改
_headers_down = {
“AcceptEncoding”:”gzip, deflate, br”,”AcceptLanguage”:”zhCN,zh;q=0.9″,”CacheControl”:”nocache”,
“Accept”:”text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signedexchange;v=b3″,
“Connection”:”keepalive”,”DNT”:”1″,”Host”:””,”Pragma”:”nocache”,”UpgradeInsecureRequests”:”1″,”UserAgent”:_user_agent
}
# 登录headers
_headers_login = {“:authority”:_host_umei,”:path”:”/users/sign_in”,”:scheme”:”https”,
“accept”:”text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signedexchange;v=b3″,
“acceptencoding”:”gzip, deflate, br”,”acceptlanguage”:”zhCN,zh;q=0.9″,”cachecontrol”:”nocache”,
“contenttype”:”application/xwwwformurlencoded”,”dnt”:”1″,”origin”:_url_umei,”pragma”:”nocache”,
“referer”:_url_login,”secfetchmode”:”navigate”,”secfetchsite”:”sameorigin”,”secfetchuser”:”?1″,
“upgradeinsecurerequests”:”1″,”useragent”:_user_agent
}
def main():
# 脚本入参
argvs = sys.argv
# 使用说明
tip1 = “\n* 如需下载付费资源,确保 %s 文件存在且保存了cookies信息”%(_cookies_path)
tip2 = “\n命令格式: python %s 页面地址 [下载线程数(可选,默认10,最大50)]”%(_current_script_name)
tip3 = “\n 示例: python %s %s/posts/4255 10″%(_current_script_name,_url_umei)
tips = “%s\n%s%s”%(tip1,tip2,tip3)
if len(argvs) < 2 or len(argvs) > 3:
print(tips)
else:
# 帖子地址
url_post = argvs[1]
if url_post:
if not url_post.startswith(‘%s/posts/’%_url_umei):
print(“\n页面地址格式有误\n%s”%tips)
else:
if len(argvs) == 3:
# 线程数
down_thread_num_this = argvs[2]
if down_thread_num_this.isdigit():
down_thread_num_this = int(down_thread_num_this)
if down_thread_num_this < 1:
print(“\n线程数最必须大于0\n%s”%tips)
down_thread_num_this = None
elif down_thread_num_this > 50:
print(“\n线程数最大为50\n%s”%tips)
down_thread_num_this = None
else:
print(“\n线程数必须为数字\n%s”%tips)
down_thread_num_this = None
else:
# 默认线程数
down_thread_num_this = _down_thread_num
if down_thread_num_this:
down(url_post,down_thread_num_this)
else:
print(“\n必须输入页面地址\n%s”%tips)
def down(post_url,down_thread_num):
“””
执行下载
post_url: 帖子地址
down_thread_num: 下载线程”””
httpSession = requests.session()
# 启用http2请求头支持
httpSession.mount(_url_umei,HTTP20Adapter())
# 登录
is_cookies_updated = login_cook(httpSession)
# 获取资源路径 请求头 :path 字段用
header_path = post_url.replace(_url_umei,””)
# 更新headers
headers_post = {**_headers_post,**{“:path”:header_path}}
# 更新headers
#httpSession.headers.update(headers_post)
# 访问帖子地址,获取网页title作为下载地址
res = httpSession.get(post_url,headers=headers_post)
bsoup = BeautifulSoup(res.text,’html.parser’)
res.close()
if bsoup.select_one(‘body’).text == ‘You are being redirected.’:
print(“\n* 此资源可能为付费资源,请先将cookies信息写入 %s”%_cookies_path)
return
# 将新cookies写入文件
if is_cookies_updated:
write_cookies(_cookies_path,httpSession.cookies)
# 网页title 去除路径不支持的字符
web_title = bsoup.title.text.replace(” | “,”_”).replace(“\”,””).replace(“/”,””).replace(“””,””).replace(“‘”,””).replace(“?”,””).replace(” “,”_”)
# 创建下载文件夹
this_down_path = create_down_dir(_down_dir,web_title)
# 获取图片下载地址
img_file_url_list = get_file_url(bsoup)
print(“========== 共有 %d 张图片 ==========”%(len(img_file_url_list)))
# 下载
downThreads = downRecursion(this_down_path, img_file_url_list, down_thread_num, ‘img’)
# 获取m3u8_url_list
m3u8_url_list = get_m3u8_url(bsoup)
# 获取ts地址
ts_url_list = get_ts_url(m3u8_url_list)
print(“========== 共有 %d 个ts文件 ==========”%(len(ts_url_list)))
# 下载ts文件
downThreads = downRecursion(this_down_path, ts_url_list, down_thread_num, ‘video’) + downThreads
# 防止提前退出
for th in downThreads:
th.join()
print(“全部下载完毕”)
# 合并ts文件
merge_ts(this_down_path)
print(“ts文件合并完成”)
def get_ts_url(m3u8_url_list):
“””
从m3u8地址列表中获取视频ts地址”””
ts_url_list = []
for m3u8_url in m3u8_url_list:
# m3u8 地址
m3u8_url_prefix = m3u8_url[:m3u8_url.rindex(‘/’)]
# m3u8 名
m3u8_name = m3u8_url[m3u8_url.rindex(‘/’)+1:m3u8_url.rindex(‘.’)]
ts_url_list.append(m3u8_name)
# 访问m3u8地址 获取ts文件
res = requests.get(m3u8_url)
m3u8_text = res.text
res.close()
# 查找ts文件名
ts_result = re.findall(r'(.*\.ts)’,m3u8_text,re.I|re.M)
for ts_name in ts_result:
ts_url = “%s/%s”%(m3u8_url_prefix,ts_name)
ts_url_list.append(ts_url)
return ts_url_list
def get_m3u8_url(bsoup):
“””
从BeautifulSoup对象中获取要下载的video文件url”””
m3u8_url_list = []
# 视频区域
div_video_script_eles = bsoup.select(‘body div.container div.row div.container div.row script’)
for div_video_script_ele in div_video_script_eles:
script_text = div_video_script_ele.text
# 查找视频m3u8地址
reresult = re.search(r’source: *\'(http.*\.m3u8)\”,script_text,re.I|re.M)
if reresult:
m3u8_url_list.append(reresult.group(1))
return m3u8_url_list
def get_file_url(bsoup):
“””
从BeautifulSoup对象中获取要下载的文件url”””
file_url_list = []
# 图片区域 付费套图 层次不同,不在body标签内
div_img_ele = bsoup.select_one(‘div.container div.row div.card’)
#div_img_ele = bsoup.select_one(‘body div.container div.row div.card’)
# 所有img标签
img_ele_all = div_img_ele.select(‘img’)
for img_ele in img_ele_all:
file_url_list.append(img_ele[‘src’])
return file_url_list
def create_down_dir(down_dir_father,down_dir_name):
“””
文件夹不存在的话创建文件夹
down_dir_father:父级文件夹
down_dir_name: 下载文件夹名”””
# 拼接路径
this_down_path = “%s/%s”%(down_dir_father,down_dir_name)
this_down_path = this_down_path.replace(‘//’,’/’)
if not os.path.exists(this_down_path):
print(“文件夹 %s 不存在,创建”%this_down_path)
os.makedirs(this_down_path)
return this_down_path
def downRecursion(this_down_path, file_url_all, down_thread_num_this, file_type):
“””
遍历下载
this_down_path: 下载文件夹
file_url_all: 所有下载信息
down_thread_num_this: 下载最大线程数
file_type: 文件类型 video/img “””
# 下载线程
downThreads = []
# 遍历获取数据
file_url_len = len(file_url_all)
for i in range(0,file_url_len):
# 文件url
file_url = file_url_all[i]
# 文件名
if file_type == “img”:
file_name = file_url[file_url.rindex(‘/’)+1:]
file_name = “%s_%s”%(str(1000+i)[1:],file_name)
elif file_type == “video”:
if “.ts” not in file_url:
m3u8_name = file_url
continue
file_name = file_url[file_url.rindex(‘/’)+1:]
file_name = “%s_%s”%(m3u8_name,file_name)
# 创建下载线程
th = myThread(file_name,this_down_path,file_name,file_url,file_type)
downThreads.append(th)
# 启动线程
th.start()
while len(downThreads) >= down_thread_num_this:
for th in downThreads:
# 判断线程的 run() 函数是否已经执行完成
if not th.is_alive():
# 线程执行结束后就从列表中将线程移除
downThreads.remove(th)
continue
return downThreads
def down_file(down_dir, down_name, down_url, file_type):
“””
下载文件
down_dir: 下载文件夹
down_name: 下载文件名
down_url: 下载地址
file_type: 文件类型 video/img “””
# 保存路径
down_file_path = “%s/%s”%(down_dir,down_name)
# 文件不存在才下载
if not file_exists(down_dir,down_file_path,file_type):
print(“%s 开始下载”%down_name)
# 图片网址的 域名
domain_down = down_url[:down_url.index(‘/’,8)]
# 图片网址的 Host
host_down = domain_down[domain_down.index(‘//’)+2:]
# 更新Headers
headers_down = {**_headers_down,**{‘Host’:host_down}}
# 获取响应
res = requests.get(down_url,headers=headers_down)
if file_type == “img”:
# 获取后缀名
content_type = res.headers[‘ContentType’]
#content_type = res.headers[b’ContentType’].decode(‘utf8’)
content_type = content_type[content_type.index(‘/’)+1:]
# 拼接文件名
down_file_path = “%s.%s”%(down_file_path,content_type)
# 保存
with open(down_file_path, ‘wb’) as f:
f.write(res.content)
res.close()
print(“%s 下载完成”%down_name)
def file_exists(down_dir,file_path,file_type):
“””
判断文件是否存在
down_dir: 下载文件夹
file_path: 文件路径
file_type: 文件类型 video/img “””
if file_type == “img”:
# 文件名
down_name = file_path[file_path.rindex(‘/’)+1:]
# 当前文件夹下所有文件
file_list_str = str(os.listdir(down_dir))
return down_name in file_list_str
elif file_type == “video”:
return os.path.exists(file_path)
else:
return False
def merge_ts(dir_path):
“””
合并ts文件
dir_path: 合并路径 “””
# 列出当前路径下所有文件
file_list = os.listdir(dir_path)
# ts文件前缀 获取一个m3u8文件下的ts文件
file_name_pre_list = []
# 过滤ts文件
for file_name in file_list:
if file_name.lower().endswith(“.ts”) and ‘_’ in file_name.lower():
file_name_pre_list.append(file_name[:file_name.index(‘_’)])
# ts文件前缀
file_name_pre_list = list(set(file_name_pre_list))
for file_name_pre in file_name_pre_list:
# 同一个m3u8文件下的ts文件路径
file_name_ts = “%s/%s_*.ts”%(dir_path,file_name_pre)
# 新文件名
file_name_new = “%s.ts”%(file_name_pre)
# 新文件名
file_path_new = “%s/%s”%(dir_path,file_name_new)
print(“开始合并文件 %s”%file_name_new)
# 打开新文件
with open(file_path_new,’wb’) as fnew:
# 遍历旧文件
file_old_list = natsorted(iglob(file_name_ts))
# 遍历写入
for ts in file_old_list:
with open(ts,’rb’) as fold:
# 将旧文件写入新文件
fnew.write(fold.read())
print(“合并文件 %s 完成”%file_name_new)
# 删除旧文件
for ts in iglob(file_name_ts):
os.remove(ts)
print(“旧有 %s ts文件已删除”%file_name_pre)
def login_pwd(httpSession):
“””
账号密码登录,暂不可使用
报错 The change you wanted was rejected (422) “””
# 登录页面访问headers
headers_login_get = {**_headers_login,**{“:method”:”GET”}}
# 发送登录请求headers
headers_login_post = {**_headers_login,**{“:method”:”POST”}}
# 访问登录页
res = httpSession.get(_url_login,headers=headers_login_get)
bsoup = BeautifulSoup(res.text,’html.parser’)
# 获取authenticity_token
authenticity_token = bsoup.select_one(‘input[name=”authenticity_token”]’)[‘value’]
authenticity_token = quote(authenticity_token,safe=”)
# 获取表单中的utf8字段 是个鉁
urf8_ = bsoup.select_one(‘input[name=”utf8″]’)[‘value’]
# 添加 urf8 字段
form_login_str = _form_login_str.replace(‘|urf8_|’,urf8_)
# URLEncode
form_login_str = quote(form_login_str,safe=’|=&’)
# 最终表单数据
form_login_str = “%s%s”%(form_login_str,authenticity_token)
content_length = str(len(form_login_str))
# 更新登录请求headers
headers_login_post = {**headers_login_post,**{“contentlength”:content_length}}
# 发送登录请求
res = httpSession.post(_url_login,headers=headers_login_post,data=form_login_str)
print(res.text)
res.close()
def login_cook(httpSession):
“””
使用cookies登录”””
cooks_dict = read_cookies(_cookies_path)
if cooks_dict:
httpSession.cookies.update(cooks_dict)
return True
else:
print(“\n* 读取cookies登录失败,将无法下载付费资源”)
return False
def read_cookies(cook_path):
“””
从文件读取cookies”””
# 判断cookies文件是否存在
if os.path.exists(cook_path):
# 从文件中读取
with open(cook_path,’r’) as f:
cooks = f.read().strip()
# 读取到数据
if cooks:
cooks = cooks.split(“;”)
# 转为字典
cooks_dict = {}
for cook in cooks:
if ‘=’ in cook:
cook_key,cook_val = cook.strip().split(“=”,2)
else:
cook_key = cook.strip()
cook_val = ”
cooks_dict[cook_key] = cook_val
return cooks_dict
else:
print(“* 请确保cookies文件 %s 中有最新的cookies信息”%_cookies_path)
else:
with open(cook_path,’w’) as f:
f.write(”)
print(“* 请确保cookies文件 %s 存在”%_cookies_path)
# 没读取到数据则返回None
return None
def write_cookies(cook_path,cookie_data):
“””
将cookies写入文件
cook_path:cookies文件路径
cookie_data:requests.session().cookies”””
# 转为元祖
cookie_items = cookie_data.items()
# 拼接为新cookies
cookies_new = “”
for cookie_item in cookie_items:
cookies_new = “%s%s=%s;”%(cookies_new,cookie_item[0],cookie_item[1])
# 去除两端的分号
cookies_new = cookies_new.strip(‘;’)
# 打开文件
with open(cook_path,mode=”w+”) as f:
# w+ 模式指针是在文件最后的 需要使用seek将指针移到最前面才能读取
f.seek(0,0)
# 获取老cookies作为备份
cookies_bak = f.read()
try:
# 新cookies写入文件
f.write(cookies_new)
print(“新cookies已写入文件”)
except Exception as e:
# 老cookies写入文件
f.write(cookies_bak)
print(“老cookies已写入文件”,e)
class myThread (threading.Thread):
“””
多线程下载
thread_name: 线程名,用以出错时区分
httpSession: requests.session()
down_dir:下载文件夹
down_name: 下载文件名
down_url: 下载地址
file_type: 文件类型 video/img “””
def __init__(self, thread_name, down_dir, down_name, down_url,file_type):
threading.Thread.__init__(self, name=thread_name)
self.down_dir = down_dir
self.down_name = down_name
self.down_url = down_url
self.file_type = file_type
def run(self):
# 下载图片
down_file(self.down_dir, self.down_name, self.down_url, self.file_type)
if __name__ == ‘__main__’:
main()
复制代码
2楼:写的很棒,大写的、赞
我觉得有两点可以优(jing)化(shang)一(tian)下(hua)
cookies 可不可以用selenium+chromedriver 模拟登陆来获取?这样可以免去手动写入cookies
把多个网页链接放到一个txt文本中,把文本文件路径作为参数传递给Python脚本,这样可以一次性下载多个。
3楼:有点复杂不是我等的水平可以弄
4楼:上上楼括号里是锦上添花,但是锦是jin不是jing
5楼:SAMSUNG 发表于 20191229 22:06
上上楼括号里是锦上添花,但是锦是jin不是jing
get
叒涨姿势了
6楼:大佬牛批!!!膜拜中
7楼:我w7只能安装python4.4,要安装吗
8楼:那个加密的m3u8视频也可以下载合并吗。。。
9楼:前排膜拜大佬!很厉害的!
10楼:向伟大的程序员致敬,紫薯布丁~
11楼:之前用requests 下载m3u8视频
可以把所有分段视频按照顺序写入同一个文件,不需要分成多个文件下载再合并以下是下载亚瑟的m3u8脚本
def down_sub_video(m3u8_url, title):
# 从m3u8文件中获取子文件链接,并下载
headers = {
“UserAgent”: “Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.100 Safari/537.36”,
“XRequestedWith”: “XMLHttpRequest”,
“Referer”: m3u8_url
}
m3u8_repon = requests.get(m3u8_url, headers=headers)
# print(‘1’, m3u8_repon.status_code)
if m3u8_repon.status_code != 200:
return False
m3u8_file = m3u8_repon.text
# print(m3u8_file)
count = 0
for line in m3u8_file.split(‘\n’):
print(line)
if len(line) == 0 or ‘#’ == line[0]:
continue
else:
# print(f'{count}: {line}’) if count > 28 else None
url_head = m3u8_url.split(‘/’)[2]
sub_url = ‘https://’ + url_head + line
try:
# print(‘sub_url:’, sub_url)
# print(f'{count}: get_video_stream’) if count > 28 else None
sub_repon = requests.get(sub_url, headers=headers)
except Exception as e:
# print(sub_url)
print(str(e))
# print(f'{count}: {sub_repon.status_code}’) if count > 28 else None
dir_path = title
if not os.path.exists(dir_path):
os.mkdir(dir_path)
# sub_file = dir_path + os.sep + line.split(‘/’)[1]
sub_file = dir_path + os.sep + ‘target.ts’
if sub_repon.status_code == 200:
# print(f'{count}: begin download’) if count > 28 else None
with open(sub_file, ‘ab+’) as w:
w.write(sub_repon.content)
# print(f'{count}: download over’) if count > 28 else None
else:
print(sub_url)
print(sub_repon.status_code)
return False
# if line.split(‘/’)[1] == ‘hls720p20.jpg’:
# return
count += 1
return dir_path复制代码
12楼:这个可以弄嘛啊哈哈
13楼:mark
14楼:先赞一个,你是第一个福娃中能写这么长的高手了,至于代码质量,我明天再看看
15楼:shenghaomail 发表于 20191229 20:32
写的很棒,大写的、赞
我觉得有两点可以优(jing)化(shang)一(tian)下(hua)
cookies 可不可以用selenium+c …
加上selenium的话脚本写起来很方便,但是使用起来就麻烦了
多个网页批量下载挺好
16楼:conversely 发表于 20191230 08:18
我w7只能安装python4.4,要安装吗
Python4.4是啥?
现在只有 3.x 和 2.x吧
17楼:zbsb棒子 发表于 20191230 09:47
那个加密的m3u8视频也可以下载合并吗。。。
可以下
m3u8并不是加密的,它相当于是一个播放列表,就是个文本文件,里面记录了视频的多个片段的地址
18楼:shenghaomail 发表于 20191230 10:41
之前用requests 下载m3u8视频
可以把所有分段视频按照顺序写入同一个文件,不需要分成多个文件下载再合并 …
这是一个m3u8对应的多个分段按顺序一个一个的下载,如果文件服务器链接速度慢不能跑满带宽的话下载速度会比较慢
我的方式是多个片段一起下载,然后合并,可以通过控制线程数跑满带宽
如果链接速度快的话就差不多了
19楼:悦~ 发表于 20191230 21:42
先赞一个,你是第一个福娃中能写这么长的高手了,至于代码质量,我明天再看看
写的时候是想到什么写什么,没有啥合理的规划,很混乱,尤其是变量命名,我自己都没眼看
只懂个皮毛,代码写的也不怎么滴,只是能用,离着高质量差的很远
20楼:王二小 发表于 20191230 22:59
Python4.4是啥?
现在只有 3.x 和 2.x吧
是3.4,不带勾选变环境那个
21楼:有空下载看看!
22楼:本帖最后由 悦~ 于 20191231 19:02 编辑
# * coding:utf8 *
import os
import re
import sys
from functools import partial
from pathlib import Path
from typing import List
import requests as _requests
from bs4 import BeautifulSoup as _BeautifulSoup
from requests import Response
BeautifulSoup = partial(_BeautifulSoup, features=’html.parser’)
requests: _requests.Session = None
WEB_DOWN_DIR = Path(‘webdown’)
post_url = None
def init():
global post_url
global requests
try:
# 接受要处理的参数
# url_post = sys.argv[1]
post_url = ‘https://umei.fun/posts/4255’
# post_url = ‘https://umei.fun/posts/8027’
except:
print(f’请添加要处理的url的参数’)
sys.exit(0)
requests = _requests.Session()
requests.headers.update({
“UserAgent”: “Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3729.169 Safari/537.36”
})
def api_get(url: str, **kwargs) > [Response, None]:
try:
return requests.get(url, **kwargs)
except:
return None
def parse_title(title: str) > str:
“””
处理 文章标题,剔除不符合 windows 文件夹命名字符
:param title:
:return:
“””
return re.sub(r'[\/:*?”<>|]’, ”, title)
def parse_img(soup: _BeautifulSoup) > List:
“””
解析所有图片地址
:param soup:
:return:
“””
img_tags = soup.select(‘img.imgfluid’)
return [it[‘src’] for it in img_tags]
def down_file(url: str, down_dir: Path, file_name: str = None, suffix: str = None) > bool:
“””
下载文件
:param url:
:param down_dir:
:param file_name:
:return:
“””
resp = api_get(url)
if resp is None or resp.status_code != 200: return False
# 第一种 header 中自带 文件名的
if not file_name:
file_name = resp.headers.get(‘ContentDisposition’)
file_name = file_name and re.findall(r’filename=”(.+?)”‘, file_name)[0]
# 第二种 如果被重定向 就使用重定向的url
if not file_name:
file_name = os.path.basename(url)
# 追加 扩展名
if suffix and ‘.’ not in file_name: file_name += suffix
down_dir.exists() or down_dir.mkdir(parents=True, exist_ok=True)
_down_path = down_dir / file_name
if _down_path.exists():
return _down_path
# 下载文件
try:
with _down_path.open(‘wb’) as f:
f.write(resp.content)
return _down_path
except:
_down_path.exists() and _down_path.unlink()
return None
def parse_m3u8_video(soup: _BeautifulSoup) > [str, None]:
“””
解析 m3u8 地址
:param soup:
:return:
“””
return (re.findall(r”‘(https?:.+?m3u8)'”, soup.text) or [None])[0]
def main():
init()
resp = api_get(post_url)
if resp and resp.status_code != 200:
print(f’请求{post_url}失败’)
sys.exit(1)
soup = BeautifulSoup(resp.text)
if soup.select_one(‘form#new_user’):
print(f'{post_url} 该资源需要登录’)
sys.exit(1)
web_title = parse_title(soup.title.text)
img_urls = parse_img(soup)
img_len = len(img_urls)
print(f’Img:{img_len} {web_title}’)
for i, imgurl in enumerate(img_urls, start=1):
print(f'{i}/{img_len} > ‘, end=”)
imgpath = down_file(imgurl, down_dir=WEB_DOWN_DIR / web_title, suffix=’.jpeg’)
print(f’Ok {imgpath}’) if imgpath else print(f’Error {imgurl}’)
video_url = parse_m3u8_video(soup)
print(f’视频地址: {video_url}’)
if __name__ == ‘__main__’:
main()
复制代码
没有上多线程了,上线程池也挺简单的 。有现成的线程池,不需要自己定义。另外m3u8还是 用专门的下载工具比较好。
23楼:先马克,,记得再看
24楼:王二小 发表于 20191230 23:01
可以下
m3u8并不是加密的,它相当于是一个播放列表,就是个文本文件,里面记录了视频的多个片段的地址
这个不可以下载加密的视频的吧,有一些网站的视频是加密的,下载好的分段需要用aes128解密后才能看的
25楼:悦~ 发表于 20191231 19:00
没有上多线程了,上线程池也挺简单的 。有现成的线程池,不需要自己定义。另外m3u8还是 用专门的下载工 …
你好,m3u8有什么好的工具吗?可以合并视频的
26楼:念天悠 发表于 20191231 21:30
你好,m3u8有什么好的工具吗?可以合并视频的
图形化下载工具 https://github.com/magicdmer/M3U8Downloader/releases
我用的是命令行工具 ffmpeg 顺便说下 同样地 posts/4255 你用ts直接下载是151M, ffmpeg 下载是 50.9M ,合成之后的。
27楼:悦~ 发表于 202011 10:04
图形化下载工具 https://github.com/magicdmer/M3U8Downloader/releases
我用的是命令行工具 ffmpeg 顺 …
好的,谢谢
