1. 程式人生 > 其它 >m3u8轉mp4快取合併工廠_使用Python下載M3U8格式視訊

m3u8轉mp4快取合併工廠_使用Python下載M3U8格式視訊

技術標籤:m3u8轉mp4快取合併工廠python hls m3u8ts檔案按m3u8順序合併軟體快取合併工具m3u8

543ed0bc37b628392e130b51134f8e63.png

/ 作者簡介 /

本篇文章來自 MRArchive 的投稿,分享瞭如何使用Python下載M3U8格式的視訊,希望對大家有所幫助!同時也感謝作者貢獻的精彩文章。

作者資訊

網站:https://toodo.fun

B站:https://space.bilibili.com/314191627

/ 背景簡介 /

M3U是一種播放多媒體列表的檔案格式,它的設計初衷是為了播放音訊檔案,比如MP3,但是越來越多的軟體現在用來播放視訊檔案列表,M3U也可以指定線上流媒體音訊源。很多播放器和軟體都支援M3U檔案格式。

m3u8檔案是 HTTP Live Streaming(縮寫為 HLS)協議的部分內容,而 HLS 是一個由蘋果公司提出的基於 HTTP 的流媒體網路傳輸協議。

HLS 是新一代流媒體傳輸協議,其基本實現原理為將一個大的媒體檔案進行分片,將該分片檔案資源路徑記錄於 m3u8 檔案(即 playlist)內,其中附帶一些額外描述(比如該資源的多頻寬資訊···)用於提供給客戶端。客戶端依據該 m3u8 檔案即可獲取對應的媒體資源,進行播放。

擴充套件M3U指令

#EXTM3U //必需,表示一個擴充套件的m3u檔案

#EXT-X-VERSION:3 //hls的協議版本號,暗示媒體流的相容性

#EXT-X-MEDIA-SEQUENCE:xx //首個分段的sequence number

#EXT-X-ALLOW-CACHE:NO //是否快取

#EXT-X-TARGETDURATION:5 //每個視訊分段最大的時長(單位秒)

#EXT-X-DISCONTINUITY //表示換編碼

#EXTINF: //每個切片的時長

/ 獲取.m3u8檔案中的視訊資訊 /

.m3u8檔案儲存了視訊所在的位置資訊,我們可以通過傳送一個Get請求來獲取連結中的內容:

content = requests.get(m3u8Url).text

/ 拼接視訊下載連結 /

我們通過得到的視訊資訊進行視訊網址拼接,.m3u8檔案中的連結可以為全路徑或者相對路徑,所以我們判斷後進行拼接並存入一個列表當中:

urls = []for index, video in enumerate(content.split('\n')):    if '#EXTINF' in video:        if content[index + 1][0] == '/':            downloadLink = url.split('//')[0] + "//" + url.split('//')[1].split('/')[0] + content[index + 1]        elif content[index + 1][:4] == 'http':            downloadLink = content[index + 1]        else:            downloadLink = url.replace(url.split('/')[-1], content[index + 1])        urls.append(downloadLink)

/ 使用多執行緒下載視訊 /

得到視訊列表後,我們就可以對視訊進行下載了,為了提高下載效率,我們可以使用多執行緒進行下載:

def download(downloadLink, name):    for _ in range(10):        try:            req = requests.get(downloadLink, headers=headers, timeout=15).content            with open(f"{name}", "wb") as f:                f.write(req)                f.flush()            break        except:            if _ == 9:                print(f"{name}下載失敗")            else:                print(f"{name}正在進行第{_}次重試")pool = ThreadPoolExecutor(max_workers=threadNum)futures = []for index, downloadLink in enumerate(urls):    fileList.append(os.path.basename(downloadLink))    futures.append(pool.submit(download, downloadLink, f"{downloadPath}/{os.path.basename(downloadLink)}"))wait(futures)

/ 合併視訊 /

等待全部下載完成後,是一個個的ts視訊檔案,然後我們再將這些檔案合併成一個視訊檔案,此時要注意視訊的順序,我們可以在我們的視訊列表中依次取出進行合併

def merge_file(path, name):    global fileList    cmd = "copy /b "    for i in fileList:        if i != fileList[-1]:            cmd += f"{i} + "        else:            cmd += f"{i} {name}"    os.chdir(path)    with open('combine.cmd', 'w') as f:        f.write(cmd)    os.system("combine.cmd")    os.system('del /Q *.ts')    os.system('del /Q *.cmd')

這裡我們是寫了一個指令碼來完成合並的任務,使用命令'copy /b file1 + file2 +... + fileN newFile'進行合併,並於完成後將ts小檔案進行了刪除。至此,視訊就下載完成了。

/ 完整程式碼如下 /

import requestsimport osfrom concurrent.futures import ThreadPoolExecutor, waitimport sysfinishedNum = 0allNum = 0fileList = []headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:62.0) Gecko/20100101 Firefox/62.0'}def download(downloadLink, name):    global finishedNum    global allNum    for _ in range(10):        try:            req = requests.get(downloadLink, headers=headers, timeout=15).content            with open(f"{name}", "wb") as f:                f.write(req)                f.flush()            finishedNum += 1            print(f"{name}下載成功, 總進度{round(finishedNum / allNum * 100, 2)}% ({finishedNum}/{allNum})")            break        except:            if _ == 9:                print(f"{name}下載失敗")            else:                print(f"{name}正在進行第{_}次重試")def merge_file(path, name):    global fileList    cmd = "copy /b "    for i in fileList:        if i != fileList[-1]:            cmd += f"{i} + "        else:            cmd += f"{i} {name}"    os.chdir(path)    with open('combine.cmd', 'w') as f:        f.write(cmd)    os.system("combine.cmd")    os.system('del /Q *.ts')    os.system('del /Q *.cmd')def downloader(url, name, threadNum):    global allNum    global fileList    print("讀取檔案資訊中...")    downloadPath = 'Download'    if not os.path.exists(downloadPath):        os.mkdir(downloadPath)    # 檢視是否存在    if os.path.exists(f"{downloadPath}/{name}"):        print(f"視訊檔案已經存在,如需重新下載請先刪除之前的視訊檔案")        return    content = requests.get(url, headers=headers).text.split('\n')    if "#EXTM3U" not in content[0]:        raise BaseException(f"非M3U8連結")    # .m3u8 跳轉    for video in content:        if ".m3u8" in video:            if video[0] == '/':                url = url.split('//')[0] + "//" + url.split('//')[1].split('/')[0] + video            elif video[:4] == 'http':                url = video            else:                url = url.replace(url.split('/')[-1], video)            print(url)            content = requests.get(url, headers=headers).text.split('\n')    urls = []    for index, video in enumerate(content):        if '#EXTINF' in video:            if content[index + 1][0] == '/':                downloadLink = url.split('//')[0] + "//" + url.split('//')[1].split('/')[0] + content[index + 1]            elif content[index + 1][:4] == 'http':                downloadLink = content[index + 1]            else:                downloadLink = url.replace(url.split('/')[-1], content[index + 1])            urls.append(downloadLink)    allNum = len(urls)    pool = ThreadPoolExecutor(max_workers=threadNum)    futures = []    for index, downloadLink in enumerate(urls):        fileList.append(os.path.basename(downloadLink))        futures.append(pool.submit(download, downloadLink, f"{downloadPath}/{os.path.basename(downloadLink)}"))    wait(futures)    print(f"執行完成")    merge_file(downloadPath, name)    print(f"合併完成")    print(f"檔案下載成功,盡情享用吧")if __name__ == '__main__':    videoUrl = str(sys.argv[1])    name = str(sys.argv[2])    threadNum = int(sys.argv[3])    downloader(videoUrl, name, threadNum)

/ 打包好的Windows版軟體下載 /

百度雲下載地址:https://pan.baidu.com/s/1ZsPb9WTmYP8VUKuR9tuoyw 提取碼:mxb6

藍奏雲下載地址:https://lanzous.com/icnc1ve