Python3 umei 脚本

本帖最后由 王二小 于 20191229 18:23 编辑
业余Python爱好者

Python3 umei 脚本

本帖最后由 王二小 于 20191229 18:23 编辑

业余Python爱好者,之前看到福娃分享的网站umei.fun,就写了个脚本,自动下载图片和视频,同时自动合并分段视频这个网站保存图片视频用的ipfs,每次爬的时候只需要对网站请求一次获取图片地址,应该不会对网站有太大影响

其实写出来很久了,但是账号密码登录一直有问题,弄了好久直到现在也没搞定,就先把脚本发出来,有大佬研究研究可以帮助完善一下

python3

下载付费资源需要使用cookies登录,如下图 登录后 按F12 复制红框中 cookies: 后面的字在脚本同一文件夹里新建txt文件,文件名umei.fun.cook.txt把刚才复制的字符串粘贴到里面 保存

需要安装以下模块

pip install requests

pip install natsort

pip install bs4

pip install hyper复制代码

使用时命令格式如下 最后一个参数是下载线程数,一个线程负责下载一个文件,可以不写,不写的同时下载10个文件,最大限制为50

python 脚本文件名 帖子地址 [同时下载图片/视频线程数]复制代码文件会保存在脚本同一个文件夹里的 umei.fun/xxx 文件夹里

下面是脚本代码,复制后保存为 .py 文件

注释写的很详细 有需要自己定制的可以修改

代码比较渣,见谅

# * coding:utf8 *

import os

import re

import sys

import requests

import threading

from glob import iglob

from natsort import natsorted

from bs4 import BeautifulSoup

from urllib.parse import quote

from hyper.contrib import HTTP20Adapter

# 当前脚本真实路径

_current_script_realpath = os.path.realpath(__file__)

# 当前脚本所在文件夹

_current_script_dir = os.path.dirname(_current_script_realpath)

# 当前脚本文件名

_current_script_name = os.path.basename(_current_script_realpath)

# 当前系统路径分隔符

_current_path_sep = os.path.sep

# 用户名 暂不可用 报错 The change you wanted was rejected (422)

_user = “”

# 密码 暂不可用 报错 The change you wanted was rejected (422)

_passwd = “”

# cookies文件路径

_cookies_path = “%s%sumei.fun.cook.txt”%(_current_script_dir,_current_path_sep)

# 默认下载线程数

_down_thread_num = 10

# 登录表单

_form_login_str = “utf8=|urf8_|&user[email]=%s&user[password]=%s&user[remember_me]=0&user[remember_me]=1&commit=登录&authenticity_token=”%(_user,_passwd)

# 网站host

_host_umei = “umei.fun”

# 网站域名

_url_umei = “https://%s”%_host_umei

# 登录页面

_url_login = “%s/users/sign_in”%_url_umei

# 下载文件夹

_down_dir = “%s/%s”%(_current_script_dir,_host_umei)

# UserAgent

_user_agent = “Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3729.169 Safari/537.36”

# 请求头 访问帖子用 :path 需要改

_headers_post = {“:authority”:_host_umei,”:method”:”GET”,”:scheme”:”https”,

“accept”:”text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signedexchange;v=b3″,

“acceptencoding”:”gzip, deflate, br”,”acceptlanguage”:”zhCN,zh;q=0.9″,”cachecontrol”:”nocache”,

“dnt”:”1″,”pragma”:”nocache”,”upgradeinsecurerequests”:”1″,”useragent”:_user_agent

}

# 请求头 请求图片用 Host需要改

_headers_down = {

“AcceptEncoding”:”gzip, deflate, br”,”AcceptLanguage”:”zhCN,zh;q=0.9″,”CacheControl”:”nocache”,

“Accept”:”text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signedexchange;v=b3″,

“Connection”:”keepalive”,”DNT”:”1″,”Host”:””,”Pragma”:”nocache”,”UpgradeInsecureRequests”:”1″,”UserAgent”:_user_agent

}

# 登录headers

_headers_login = {“:authority”:_host_umei,”:path”:”/users/sign_in”,”:scheme”:”https”,

“accept”:”text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signedexchange;v=b3″,

“acceptencoding”:”gzip, deflate, br”,”acceptlanguage”:”zhCN,zh;q=0.9″,”cachecontrol”:”nocache”,

“contenttype”:”application/xwwwformurlencoded”,”dnt”:”1″,”origin”:_url_umei,”pragma”:”nocache”,

“referer”:_url_login,”secfetchmode”:”navigate”,”secfetchsite”:”sameorigin”,”secfetchuser”:”?1″,

“upgradeinsecurerequests”:”1″,”useragent”:_user_agent

}

def main():

# 脚本入参

argvs = sys.argv

# 使用说明

tip1 = “\n* 如需下载付费资源,确保 %s 文件存在且保存了cookies信息”%(_cookies_path)

tip2 = “\n命令格式: python %s 页面地址 [下载线程数(可选,默认10,最大50)]”%(_current_script_name)

tip3 = “\n 示例: python %s %s/posts/4255 10″%(_current_script_name,_url_umei)

tips = “%s\n%s%s”%(tip1,tip2,tip3)

if len(argvs) < 2 or len(argvs) > 3:

print(tips)

else:

# 帖子地址

url_post = argvs[1]

if url_post:

if not url_post.startswith(‘%s/posts/’%_url_umei):

print(“\n页面地址格式有误\n%s”%tips)

else:

if len(argvs) == 3:

# 线程数

down_thread_num_this = argvs[2]

if down_thread_num_this.isdigit():

down_thread_num_this = int(down_thread_num_this)

if down_thread_num_this < 1:

print(“\n线程数最必须大于0\n%s”%tips)

down_thread_num_this = None

elif down_thread_num_this > 50:

print(“\n线程数最大为50\n%s”%tips)

down_thread_num_this = None

else:

print(“\n线程数必须为数字\n%s”%tips)

down_thread_num_this = None

else:

# 默认线程数

down_thread_num_this = _down_thread_num

if down_thread_num_this:

down(url_post,down_thread_num_this)

else:

print(“\n必须输入页面地址\n%s”%tips)

def down(post_url,down_thread_num):

“””

执行下载

post_url: 帖子地址

down_thread_num: 下载线程”””

httpSession = requests.session()

# 启用http2请求头支持

httpSession.mount(_url_umei,HTTP20Adapter())

# 登录

is_cookies_updated = login_cook(httpSession)

# 获取资源路径 请求头 :path 字段用

header_path = post_url.replace(_url_umei,””)

# 更新headers

headers_post = {**_headers_post,**{“:path”:header_path}}

# 更新headers

#httpSession.headers.update(headers_post)

# 访问帖子地址,获取网页title作为下载地址

res = httpSession.get(post_url,headers=headers_post)

bsoup = BeautifulSoup(res.text,’html.parser’)

res.close()

if bsoup.select_one(‘body’).text == ‘You are being redirected.’:

print(“\n* 此资源可能为付费资源,请先将cookies信息写入 %s”%_cookies_path)

return

# 将新cookies写入文件

if is_cookies_updated:

write_cookies(_cookies_path,httpSession.cookies)

# 网页title 去除路径不支持的字符

web_title = bsoup.title.text.replace(” | “,”_”).replace(“\”,””).replace(“/”,””).replace(“””,””).replace(“‘”,””).replace(“?”,””).replace(” “,”_”)

# 创建下载文件夹

this_down_path = create_down_dir(_down_dir,web_title)

# 获取图片下载地址

img_file_url_list = get_file_url(bsoup)

print(“========== 共有 %d 张图片 ==========”%(len(img_file_url_list)))

# 下载

downThreads = downRecursion(this_down_path, img_file_url_list, down_thread_num, ‘img’)

# 获取m3u8_url_list

m3u8_url_list = get_m3u8_url(bsoup)

# 获取ts地址

ts_url_list = get_ts_url(m3u8_url_list)

print(“========== 共有 %d 个ts文件 ==========”%(len(ts_url_list)))

# 下载ts文件

downThreads = downRecursion(this_down_path, ts_url_list, down_thread_num, ‘video’) + downThreads

# 防止提前退出

for th in downThreads:

th.join()

print(“全部下载完毕”)

# 合并ts文件

merge_ts(this_down_path)

print(“ts文件合并完成”)

def get_ts_url(m3u8_url_list):

“””

从m3u8地址列表中获取视频ts地址”””

ts_url_list = []

for m3u8_url in m3u8_url_list:

# m3u8 地址

m3u8_url_prefix = m3u8_url[:m3u8_url.rindex(‘/’)]

# m3u8 名

m3u8_name = m3u8_url[m3u8_url.rindex(‘/’)+1:m3u8_url.rindex(‘.’)]

ts_url_list.append(m3u8_name)

# 访问m3u8地址 获取ts文件

res = requests.get(m3u8_url)

m3u8_text = res.text

res.close()

# 查找ts文件名

ts_result = re.findall(r'(.*\.ts)’,m3u8_text,re.I|re.M)

for ts_name in ts_result:

ts_url = “%s/%s”%(m3u8_url_prefix,ts_name)

ts_url_list.append(ts_url)

return ts_url_list

def get_m3u8_url(bsoup):

“””

从BeautifulSoup对象中获取要下载的video文件url”””

m3u8_url_list = []

# 视频区域

div_video_script_eles = bsoup.select(‘body div.container div.row div.container div.row script’)

for div_video_script_ele in div_video_script_eles:

script_text = div_video_script_ele.text

# 查找视频m3u8地址

reresult = re.search(r’source: *\'(http.*\.m3u8)\”,script_text,re.I|re.M)

if reresult:

m3u8_url_list.append(reresult.group(1))

return m3u8_url_list

def get_file_url(bsoup):

“””

从BeautifulSoup对象中获取要下载的文件url”””

file_url_list = []

# 图片区域 付费套图 层次不同,不在body标签内

div_img_ele = bsoup.select_one(‘div.container div.row div.card’)

#div_img_ele = bsoup.select_one(‘body div.container div.row div.card’)

# 所有img标签

img_ele_all = div_img_ele.select(‘img’)

for img_ele in img_ele_all:

file_url_list.append(img_ele[‘src’])

return file_url_list

def create_down_dir(down_dir_father,down_dir_name):

“””

文件夹不存在的话创建文件夹

down_dir_father:父级文件夹

down_dir_name: 下载文件夹名”””

# 拼接路径

this_down_path = “%s/%s”%(down_dir_father,down_dir_name)

this_down_path = this_down_path.replace(‘//’,’/’)

if not os.path.exists(this_down_path):

print(“文件夹 %s 不存在,创建”%this_down_path)

os.makedirs(this_down_path)

return this_down_path

def downRecursion(this_down_path, file_url_all, down_thread_num_this, file_type):

“””

遍历下载

this_down_path: 下载文件夹

file_url_all: 所有下载信息

down_thread_num_this: 下载最大线程数

file_type: 文件类型 video/img “””

# 下载线程

downThreads = []

# 遍历获取数据

file_url_len = len(file_url_all)

for i in range(0,file_url_len):

# 文件url

file_url = file_url_all[i]

# 文件名

if file_type == “img”:

file_name = file_url[file_url.rindex(‘/’)+1:]

file_name = “%s_%s”%(str(1000+i)[1:],file_name)

elif file_type == “video”:

if “.ts” not in file_url:

m3u8_name = file_url

continue

file_name = file_url[file_url.rindex(‘/’)+1:]

file_name = “%s_%s”%(m3u8_name,file_name)

# 创建下载线程

th = myThread(file_name,this_down_path,file_name,file_url,file_type)

downThreads.append(th)

# 启动线程

th.start()

while len(downThreads) >= down_thread_num_this:

for th in downThreads:

# 判断线程的 run() 函数是否已经执行完成

if not th.is_alive():

# 线程执行结束后就从列表中将线程移除

downThreads.remove(th)

continue

return downThreads

def down_file(down_dir, down_name, down_url, file_type):

“””

下载文件

down_dir: 下载文件夹

down_name: 下载文件名

down_url: 下载地址

file_type: 文件类型 video/img “””

# 保存路径

down_file_path = “%s/%s”%(down_dir,down_name)

# 文件不存在才下载

if not file_exists(down_dir,down_file_path,file_type):

print(“%s 开始下载”%down_name)

# 图片网址的 域名

domain_down = down_url[:down_url.index(‘/’,8)]

# 图片网址的 Host

host_down = domain_down[domain_down.index(‘//’)+2:]

# 更新Headers

headers_down = {**_headers_down,**{‘Host’:host_down}}

# 获取响应

res = requests.get(down_url,headers=headers_down)

if file_type == “img”:

# 获取后缀名

content_type = res.headers[‘ContentType’]

#content_type = res.headers[b’ContentType’].decode(‘utf8’)

content_type = content_type[content_type.index(‘/’)+1:]

# 拼接文件名

down_file_path = “%s.%s”%(down_file_path,content_type)

# 保存

with open(down_file_path, ‘wb’) as f:

f.write(res.content)

res.close()

print(“%s 下载完成”%down_name)

def file_exists(down_dir,file_path,file_type):

“””

判断文件是否存在

down_dir: 下载文件夹

file_path: 文件路径

file_type: 文件类型 video/img “””

if file_type == “img”:

# 文件名

down_name = file_path[file_path.rindex(‘/’)+1:]

# 当前文件夹下所有文件

file_list_str = str(os.listdir(down_dir))

return down_name in file_list_str

elif file_type == “video”:

return os.path.exists(file_path)

else:

return False

def merge_ts(dir_path):

“””

合并ts文件

dir_path: 合并路径 “””

# 列出当前路径下所有文件

file_list = os.listdir(dir_path)

# ts文件前缀 获取一个m3u8文件下的ts文件

file_name_pre_list = []

# 过滤ts文件

for file_name in file_list:

if file_name.lower().endswith(“.ts”) and ‘_’ in file_name.lower():

file_name_pre_list.append(file_name[:file_name.index(‘_’)])

# ts文件前缀

file_name_pre_list = list(set(file_name_pre_list))

for file_name_pre in file_name_pre_list:

# 同一个m3u8文件下的ts文件路径

file_name_ts = “%s/%s_*.ts”%(dir_path,file_name_pre)

# 新文件名

file_name_new = “%s.ts”%(file_name_pre)

# 新文件名

file_path_new = “%s/%s”%(dir_path,file_name_new)

print(“开始合并文件 %s”%file_name_new)

# 打开新文件

with open(file_path_new,’wb’) as fnew:

# 遍历旧文件

file_old_list = natsorted(iglob(file_name_ts))

# 遍历写入

for ts in file_old_list:

with open(ts,’rb’) as fold:

# 将旧文件写入新文件

fnew.write(fold.read())

print(“合并文件 %s 完成”%file_name_new)

# 删除旧文件

for ts in iglob(file_name_ts):

os.remove(ts)

print(“旧有 %s ts文件已删除”%file_name_pre)

def login_pwd(httpSession):

“””

账号密码登录,暂不可使用

报错 The change you wanted was rejected (422) “””

# 登录页面访问headers

headers_login_get = {**_headers_login,**{“:method”:”GET”}}

# 发送登录请求headers

headers_login_post = {**_headers_login,**{“:method”:”POST”}}

# 访问登录页

res = httpSession.get(_url_login,headers=headers_login_get)

bsoup = BeautifulSoup(res.text,’html.parser’)

# 获取authenticity_token

authenticity_token = bsoup.select_one(‘input[name=”authenticity_token”]’)[‘value’]

authenticity_token = quote(authenticity_token,safe=”)

# 获取表单中的utf8字段 是个鉁

urf8_ = bsoup.select_one(‘input[name=”utf8″]’)[‘value’]

# 添加 urf8 字段

form_login_str = _form_login_str.replace(‘|urf8_|’,urf8_)

# URLEncode

form_login_str = quote(form_login_str,safe=’|=&’)

# 最终表单数据

form_login_str = “%s%s”%(form_login_str,authenticity_token)

content_length = str(len(form_login_str))

# 更新登录请求headers

headers_login_post = {**headers_login_post,**{“contentlength”:content_length}}

# 发送登录请求

res = httpSession.post(_url_login,headers=headers_login_post,data=form_login_str)

print(res.text)

res.close()

def login_cook(httpSession):

“””

使用cookies登录”””

cooks_dict = read_cookies(_cookies_path)

if cooks_dict:

httpSession.cookies.update(cooks_dict)

return True

else:

print(“\n* 读取cookies登录失败,将无法下载付费资源”)

return False

def read_cookies(cook_path):

“””

从文件读取cookies”””

# 判断cookies文件是否存在

if os.path.exists(cook_path):

# 从文件中读取

with open(cook_path,’r’) as f:

cooks = f.read().strip()

# 读取到数据

if cooks:

cooks = cooks.split(“;”)

# 转为字典

cooks_dict = {}

for cook in cooks:

if ‘=’ in cook:

cook_key,cook_val = cook.strip().split(“=”,2)

else:

cook_key = cook.strip()

cook_val = ”

cooks_dict[cook_key] = cook_val

return cooks_dict

else:

print(“* 请确保cookies文件 %s 中有最新的cookies信息”%_cookies_path)

else:

with open(cook_path,’w’) as f:

f.write(”)

print(“* 请确保cookies文件 %s 存在”%_cookies_path)

# 没读取到数据则返回None

return None

def write_cookies(cook_path,cookie_data):

“””

将cookies写入文件

cook_path:cookies文件路径

cookie_data:requests.session().cookies”””

# 转为元祖

cookie_items = cookie_data.items()

# 拼接为新cookies

cookies_new = “”

for cookie_item in cookie_items:

cookies_new = “%s%s=%s;”%(cookies_new,cookie_item[0],cookie_item[1])

# 去除两端的分号

cookies_new = cookies_new.strip(‘;’)

# 打开文件

with open(cook_path,mode=”w+”) as f:

# w+ 模式指针是在文件最后的 需要使用seek将指针移到最前面才能读取

f.seek(0,0)

# 获取老cookies作为备份

cookies_bak = f.read()

try:

# 新cookies写入文件

f.write(cookies_new)

print(“新cookies已写入文件”)

except Exception as e:

# 老cookies写入文件

f.write(cookies_bak)

print(“老cookies已写入文件”,e)

class myThread (threading.Thread):

“””

多线程下载

thread_name: 线程名,用以出错时区分

httpSession: requests.session()

down_dir:下载文件夹

down_name: 下载文件名

down_url: 下载地址

file_type: 文件类型 video/img “””

def __init__(self, thread_name, down_dir, down_name, down_url,file_type):

threading.Thread.__init__(self, name=thread_name)

self.down_dir = down_dir

self.down_name = down_name

self.down_url = down_url

self.file_type = file_type

def run(self):

# 下载图片

down_file(self.down_dir, self.down_name, self.down_url, self.file_type)

if __name__ == ‘__main__’:

main()

复制代码

2楼:写的很棒,大写的、赞

我觉得有两点可以优(jing)化(shang)一(tian)下(hua)

cookies 可不可以用selenium+chromedriver 模拟登陆来获取?这样可以免去手动写入cookies

把多个网页链接放到一个txt文本中,把文本文件路径作为参数传递给Python脚本,这样可以一次性下载多个。

3楼:有点复杂不是我等的水平可以弄

4楼:上上楼括号里是锦上添花,但是锦是jin不是jing

5楼:SAMSUNG 发表于 20191229 22:06

上上楼括号里是锦上添花,但是锦是jin不是jing

get

叒涨姿势了

6楼:大佬牛批!!!膜拜中

7楼:我w7只能安装python4.4,要安装吗

8楼:那个加密的m3u8视频也可以下载合并吗。。。

9楼:前排膜拜大佬!很厉害的!

10楼:向伟大的程序员致敬,紫薯布丁~

11楼:之前用requests 下载m3u8视频

可以把所有分段视频按照顺序写入同一个文件,不需要分成多个文件下载再合并以下是下载亚瑟的m3u8脚本

def down_sub_video(m3u8_url, title):

# 从m3u8文件中获取子文件链接,并下载

headers = {

“UserAgent”: “Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.100 Safari/537.36”,

“XRequestedWith”: “XMLHttpRequest”,

“Referer”: m3u8_url

}

m3u8_repon = requests.get(m3u8_url, headers=headers)

# print(‘1’, m3u8_repon.status_code)

if m3u8_repon.status_code != 200:

return False

m3u8_file = m3u8_repon.text

# print(m3u8_file)

count = 0

for line in m3u8_file.split(‘\n’):

print(line)

if len(line) == 0 or ‘#’ == line[0]:

continue

else:

# print(f'{count}: {line}’) if count > 28 else None

url_head = m3u8_url.split(‘/’)[2]

sub_url = ‘https://’ + url_head + line

try:

# print(‘sub_url:’, sub_url)

# print(f'{count}: get_video_stream’) if count > 28 else None

sub_repon = requests.get(sub_url, headers=headers)

except Exception as e:

# print(sub_url)

print(str(e))

# print(f'{count}: {sub_repon.status_code}’) if count > 28 else None

dir_path = title

if not os.path.exists(dir_path):

os.mkdir(dir_path)

# sub_file = dir_path + os.sep + line.split(‘/’)[1]

sub_file = dir_path + os.sep + ‘target.ts’

if sub_repon.status_code == 200:

# print(f'{count}: begin download’) if count > 28 else None

with open(sub_file, ‘ab+’) as w:

w.write(sub_repon.content)

# print(f'{count}: download over’) if count > 28 else None

else:

print(sub_url)

print(sub_repon.status_code)

return False

# if line.split(‘/’)[1] == ‘hls720p20.jpg’:

# return

count += 1

return dir_path复制代码

12楼:这个可以弄嘛啊哈哈

13楼:mark

14楼:先赞一个,你是第一个福娃中能写这么长的高手了,至于代码质量,我明天再看看

15楼:shenghaomail 发表于 20191229 20:32

写的很棒,大写的、赞

我觉得有两点可以优(jing)化(shang)一(tian)下(hua)

cookies 可不可以用selenium+c …

加上selenium的话脚本写起来很方便,但是使用起来就麻烦了

多个网页批量下载挺好

16楼:conversely 发表于 20191230 08:18

我w7只能安装python4.4,要安装吗

Python4.4是啥?

现在只有 3.x 和 2.x吧

17楼:zbsb棒子 发表于 20191230 09:47

那个加密的m3u8视频也可以下载合并吗。。。

可以下

m3u8并不是加密的,它相当于是一个播放列表,就是个文本文件,里面记录了视频的多个片段的地址

18楼:shenghaomail 发表于 20191230 10:41

之前用requests 下载m3u8视频

可以把所有分段视频按照顺序写入同一个文件,不需要分成多个文件下载再合并 …

这是一个m3u8对应的多个分段按顺序一个一个的下载,如果文件服务器链接速度慢不能跑满带宽的话下载速度会比较慢

我的方式是多个片段一起下载,然后合并,可以通过控制线程数跑满带宽

如果链接速度快的话就差不多了

19楼:悦~ 发表于 20191230 21:42

先赞一个,你是第一个福娃中能写这么长的高手了,至于代码质量,我明天再看看

写的时候是想到什么写什么,没有啥合理的规划,很混乱,尤其是变量命名,我自己都没眼看

只懂个皮毛,代码写的也不怎么滴,只是能用,离着高质量差的很远

20楼:王二小 发表于 20191230 22:59

Python4.4是啥?

现在只有 3.x 和 2.x吧

是3.4,不带勾选变环境那个

21楼:有空下载看看!

22楼:本帖最后由 悦~ 于 20191231 19:02 编辑

# * coding:utf8 *

import os

import re

import sys

from functools import partial

from pathlib import Path

from typing import List

import requests as _requests

from bs4 import BeautifulSoup as _BeautifulSoup

from requests import Response

BeautifulSoup = partial(_BeautifulSoup, features=’html.parser’)

requests: _requests.Session = None

WEB_DOWN_DIR = Path(‘webdown’)

post_url = None

def init():

global post_url

global requests

try:

# 接受要处理的参数

# url_post = sys.argv[1]

post_url = ‘https://umei.fun/posts/4255’

# post_url = ‘https://umei.fun/posts/8027’

except:

print(f’请添加要处理的url的参数’)

sys.exit(0)

requests = _requests.Session()

requests.headers.update({

“UserAgent”: “Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3729.169 Safari/537.36”

})

def api_get(url: str, **kwargs) > [Response, None]:

try:

return requests.get(url, **kwargs)

except:

return None

def parse_title(title: str) > str:

“””

处理 文章标题,剔除不符合 windows 文件夹命名字符

:param title:

:return:

“””

return re.sub(r'[\/:*?”<>|]’, ”, title)

def parse_img(soup: _BeautifulSoup) > List:

“””

解析所有图片地址

:param soup:

:return:

“””

img_tags = soup.select(‘img.imgfluid’)

return [it[‘src’] for it in img_tags]

def down_file(url: str, down_dir: Path, file_name: str = None, suffix: str = None) > bool:

“””

下载文件

:param url:

:param down_dir:

:param file_name:

:return:

“””

resp = api_get(url)

if resp is None or resp.status_code != 200: return False

# 第一种 header 中自带 文件名的

if not file_name:

file_name = resp.headers.get(‘ContentDisposition’)

file_name = file_name and re.findall(r’filename=”(.+?)”‘, file_name)[0]

# 第二种 如果被重定向 就使用重定向的url

if not file_name:

file_name = os.path.basename(url)

# 追加 扩展名

if suffix and ‘.’ not in file_name: file_name += suffix

down_dir.exists() or down_dir.mkdir(parents=True, exist_ok=True)

_down_path = down_dir / file_name

if _down_path.exists():

return _down_path

# 下载文件

try:

with _down_path.open(‘wb’) as f:

f.write(resp.content)

return _down_path

except:

_down_path.exists() and _down_path.unlink()

return None

def parse_m3u8_video(soup: _BeautifulSoup) > [str, None]:

“””

解析 m3u8 地址

:param soup:

:return:

“””

return (re.findall(r”‘(https?:.+?m3u8)'”, soup.text) or [None])[0]

def main():

init()

resp = api_get(post_url)

if resp and resp.status_code != 200:

print(f’请求{post_url}失败’)

sys.exit(1)

soup = BeautifulSoup(resp.text)

if soup.select_one(‘form#new_user’):

print(f'{post_url} 该资源需要登录’)

sys.exit(1)

web_title = parse_title(soup.title.text)

img_urls = parse_img(soup)

img_len = len(img_urls)

print(f’Img:{img_len} {web_title}’)

for i, imgurl in enumerate(img_urls, start=1):

print(f'{i}/{img_len} > ‘, end=”)

imgpath = down_file(imgurl, down_dir=WEB_DOWN_DIR / web_title, suffix=’.jpeg’)

print(f’Ok {imgpath}’) if imgpath else print(f’Error {imgurl}’)

video_url = parse_m3u8_video(soup)

print(f’视频地址: {video_url}’)

if __name__ == ‘__main__’:

main()

复制代码

没有上多线程了,上线程池也挺简单的 。有现成的线程池,不需要自己定义。另外m3u8还是 用专门的下载工具比较好。

23楼:先马克,,记得再看

24楼:王二小 发表于 20191230 23:01

可以下

m3u8并不是加密的,它相当于是一个播放列表,就是个文本文件,里面记录了视频的多个片段的地址

这个不可以下载加密的视频的吧,有一些网站的视频是加密的,下载好的分段需要用aes128解密后才能看的

25楼:悦~ 发表于 20191231 19:00

没有上多线程了,上线程池也挺简单的 。有现成的线程池,不需要自己定义。另外m3u8还是 用专门的下载工 …

你好,m3u8有什么好的工具吗?可以合并视频的

26楼:念天悠 发表于 20191231 21:30

你好,m3u8有什么好的工具吗?可以合并视频的

图形化下载工具 https://github.com/magicdmer/M3U8Downloader/releases

我用的是命令行工具 ffmpeg 顺便说下 同样地 posts/4255 你用ts直接下载是151M, ffmpeg 下载是 50.9M ,合成之后的。

27楼:悦~ 发表于 202011 10:04

图形化下载工具 https://github.com/magicdmer/M3U8Downloader/releases

我用的是命令行工具 ffmpeg 顺 …

好的,谢谢

(0)

相关推荐