1. 程式人生 > 其它 > Python爬取m3u8視訊檔案

python爬取m3u8視訊檔案

import asyncio
import os
from urllib.parse import urljoin

import aiohttp
import requests


first_m3u8_url = "https://cdn.zoubuting.com/20221129/waHIjBSS/index.m3u8"
headers = {
    'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 Edg/107.0.1418.56'
}
# Upper bound (seconds) for each blocking playlist request; without it
# requests.get() can wait forever on a stalled server.
request_timeout = 30


def _playlist_uris(playlist_text, base_url):
    """Return the absolute URL of every URI line in an m3u8 playlist.

    Blank lines and lines starting with "#" (tags and comments) are skipped.
    Every remaining line is resolved against *base_url*, so absolute URLs,
    host-relative paths ("/a/index.m3u8") and plain relative paths
    ("seg1.ts") are all accepted. splitlines() plus strip() also removes the
    trailing "\r" of CRLF-terminated playlists.
    """
    uris = []
    for raw_line in playlist_text.splitlines():
        entry = raw_line.strip()
        if entry and not entry.startswith("#"):
            uris.append(urljoin(base_url, entry))
    return uris


# Fetch the first-level playlist; it only points at the second-level
# playlist (second_m3u8_url) that actually lists the segments.
first_response = requests.get(url=first_m3u8_url, headers=headers, timeout=request_timeout)
first_response.raise_for_status()
first_m3u8_url_text = first_response.text

nested_playlists = _playlist_uris(first_m3u8_url_text, first_m3u8_url)
if not nested_playlists:
    # Fail with a clear message instead of a NameError further down.
    raise RuntimeError("no playlist URI found in " + first_m3u8_url)
# When several entries are listed the last one is used.
second_m3u8_url = nested_playlists[-1]

# Read the content of the second-level playlist.
second_response = requests.get(url=second_m3u8_url, headers=headers, timeout=request_timeout)
second_response.raise_for_status()
second_url_text = second_response.text

# Collect every .ts segment link into a list.
ts_url_list = _playlist_uris(second_url_text, second_m3u8_url)

# Create the download directory (no error if it already exists).
dirName = "movieDown"
os.makedirs(dirName, exist_ok=True)

# 下載所有的ts檔案
# for url in ts_url_list:
#     ts_name = url.split('/')[-1]
#     download_path = dirName + '/' + ts_name
#     data = requests.get(url=url,headers=headers).content
#     with open(download_path,'wb') as fp:
#         fp.write(data)
#         print(ts_name,"下載完成!")

# 協程改寫
# 發起請求
async def get_url_data(url):
    """Download *url* with aiohttp and return ``(body, url)``.

    A fresh ``aiohttp.ClientSession`` is opened for the single GET request,
    which is sent with the module-level ``headers``. The URL is returned
    together with the body so that the done-callback can derive the output
    file name from it.
    """
    session = aiohttp.ClientSession()
    async with session:
        reply = await session.get(url=url, headers=headers)
        async with reply:
            # Read the whole body before the response is released.
            payload = await reply.read()
    return payload, url

# 建立回撥函式
def download(t):
    """Done-callback: write the bytes fetched by a finished task to disk.

    *t* is the completed task/future whose result is the ``(data, url)``
    tuple produced by ``get_url_data``. The file is named after the last
    path component of the URL and stored inside the ``dirName`` directory.

    The payload carried by the task is written as-is. Fetching the URL a
    second time with blocking ``requests.get`` here would throw away the
    asynchronous download and stall the event loop for every segment.
    """
    data, url = t.result()
    ts_name = url.split('/')[-1]
    download_path = os.path.join(dirName, ts_name)
    with open(download_path, 'wb') as fp:
        fp.write(data)
    print(ts_name,"下載完成!")
# Run: fetch every segment concurrently and save each one as it finishes.
async def main():
    """Schedule one fetch task per segment URL and wait for all of them.

    The tasks are created inside the running event loop. Creating them at
    module level relies on the implicit "current loop" lookup, which is
    deprecated and fails on recent Python versions.
    """
    tasks = []
    for url in ts_url_list:
        # Create the task.
        task = asyncio.ensure_future(get_url_data(url))
        # download() stores the fetched bytes once the task is done.
        task.add_done_callback(download)
        tasks.append(task)
    # asyncio.wait() raises ValueError when given an empty collection.
    if tasks:
        await asyncio.wait(tasks)


asyncio.run(main())