1. 程式人生 > 其它 >python 手寫m3u8多執行緒下載器

python 手寫m3u8多執行緒下載器

技術標籤:指令碼python多執行緒m3u8下載

現在幾乎所有的視訊都是m3u8視訊流了。自己嘗試手寫了一個簡單的m3u8下載器,除錯了之後,發現挺好用的,只需輸入m3u8連結,檔名及執行緒數n,就可以下載了。理論上執行緒數越大,下載速度越快,無上限,當然要看自己電腦配置和網速了。

直接放程式碼,windows電腦中,建議指令碼在cmd下執行,這樣進度條可以正常顯示

import os
import threading
from queue import Queue
import requests

def downm3u8(m3u8_url, name, directory, t_num)
: def get_content(url):#不停嘗試,直至成功獲取內容 while True: try: content = requests.get(url).content return content except: continue def download(pre, n, out_folder): while True: ts = task.get()#獲取單個視訊片段字尾
lock.acquire() count[0] += 1 print('[' + '#' * int((count[0] + 1) / (n / float(100))) + ' ' * ( 100 - int((count[0] + 1) / (n / float(100)))) + ']' + '--[' + str(count[0]) + '/' + str( n) + ']', end='\r', flush=True)#輸出進度條 # print('已下載:[ %s/%s ]'%(count[0],n))#如果不用上面的進度條,可以把上面的print前面加#,然後把這行前面的#去掉,顯示進度
lock.release() ts_url = pre + ts#拼接單個視訊片段下載連結 number = count[0] - 1 content = get_content(ts_url) with open('%s/%s.mp4' % (out_folder, number), 'ab') as f: f.write(content)#寫入檔案 task.task_done() try:#在當前目錄下建資料夾,儲存下載結果 os.mkdir('%s/%s' % (directory, name)) except: pass pre = m3u8_url.rstrip(m3u8_url.split('/')[-1])#m3u8連結字首 lines = requests.get(m3u8_url).text.strip().split('\n')#獲取m3u8檔案內容 ts_list = [line.split('/')[-1] for line in lines if line.startswith('#') == False]#獲取m3u8檔案下視訊流字尾 n = len(ts_list)#視訊流片段數 count = [0]#用來對下載片段命名及計算進度 dict = {} lock = threading.Lock()#執行緒鎖防止幾個執行緒同時輸出錯亂 task = Queue()#設定佇列 for i in range(int(t_num)): t = threading.Thread(target=download, args=(pre, n, '%s/%s' % (directory, name))) t.daemon = True t.start() for ts in ts_list: task.put(ts) task.join() print('\n' + '%s has downloaded successfully' % name) print('***Merging film***') fo = open('%s/%s/%s.mp4' % (directory, name, name), 'ab') for i in range(n): fl = open('%s/%s/%s.mp4' % (directory, name, i), 'rb') fo.write(fl.read()) fl.close() os.remove('%s/%s/%s.mp4' % (directory, name, i)) fo.close() print('Film is in %s' % directory) if __name__ == '__main__': input1 = input('m3u8 link:')#輸入m3u8連結(例如:https://wuji.zhulong-zuida.com/20190706/762_c260ca6c/800k/hls/index.m3u8) input2 = input('film name:')#輸入檔名稱(支援中文名) thread_number = input('threading number:')#輸入要設定的執行緒數字,一般可以設定10-30,電腦都跑得動。 directory = os.getcwd().replace('\\', '/') downm3u8(input1, input2, directory, thread_number)

上面是簡單版的下載,平時使用上面的指令碼即可。m3u8連結可以通過建議通過貓抓外掛獲取,比較方便。
如果m3u8有加密,可以使用下面的指令碼。其實下面的指令碼一樣可以下載未加密的m3u8連結,但是Crypto.Cipher模組安裝的時候容易報錯,所以,如果並非必須要用,就不推薦了。

import os
import threading
from queue import Queue
from Crypto.Cipher import AES
import requests


def downm3u8(m3u8_url, name, directory, t_num):
    def get_content(url):
        while True:
            try:
                content = requests.get(url).content
                return content
            except:
                continue

    def download(pre, n, key, out_folder):
        while True:
            ts = task.get()
            lock.acquire()
            count[0] += 1
            print('[' + '#' * int((count[0] + 1) / (n / float(100))) + ' ' * (
                    100 - int((count[0] + 1) / (n / float(100)))) + ']' + '--[' + str(count[0]) + '/' + str(
                n) + ']', end='\r', flush=True)
#            print('已下載:[ %s/%s ]'%(count[0],n))
            lock.release()
            ts_url = pre + ts
            number = count[0] - 1
            content = get_content(ts_url)
            if len(key) != 0:
                cryptor = AES.new(key, AES.MODE_CBC, key)
                content = cryptor.decrypt(content)
            with open('%s/%s.mp4' % (out_folder, number), 'ab') as f:
                f.write(content)
            task.task_done()

    try:
        os.mkdir('%s/%s' % (directory, name))
    except:
        pass
    pre = m3u8_url.rstrip(m3u8_url.split('/')[-1])
    lines = requests.get(m3u8_url).text.strip().split('\n')
    ts_list = [line.split('/')[-1] for line in lines if line.startswith('#') == False]
    key = b''
    for line in lines:
        if 'AES-128' in line:
            key_url = pre + line.split('"')[1].split('/')[-1]
            key = requests.get(key_url).content
    n = len(ts_list)
    count = [0]
    dict = {}
    lock = threading.Lock()
    task = Queue()
    for i in range(int(t_num)):
        t = threading.Thread(target=download, args=(pre, n, key, '%s/%s' % (directory, name)))
        t.daemon = True
        t.start()
    for ts in ts_list:
        task.put(ts)
    task.join()

    print('\n' + '%s has downloaded successfully' % name)
    print('***Merging film***')
    fo = open('%s/%s/%s.mp4' % (directory, name, name), 'ab')
    for i in range(n):
        fl = open('%s/%s/%s.mp4' % (directory, name, i), 'rb')
        fo.write(fl.read())
        fl.close()
        os.remove('%s/%s/%s.mp4' % (directory, name, i))
    fo.close()
    print('Film is in %s' % directory)


if __name__ == '__main__':
    input1 = input('m3u8 link:')
    input2 = input('film name:')
    thread_number = input('threading number:')
    directory = os.getcwd().replace('\\', '/')
    downm3u8(input1, input2, directory, thread_number)

github專案傳送門:github連結
如果覺得好用的話,歡迎打賞啊
在這裡插入圖片描述