python 手寫m3u8多執行緒下載器
阿新 • • 發佈:2020-12-16
現在幾乎所有的視訊都是m3u8視訊流了。自己嘗試手寫了一個簡單的m3u8下載器,除錯了之後,發現挺好用的,只需輸入m3u8連結,檔名及執行緒數n,就可以下載了。理論上執行緒數越大,下載速度越快,無上限,當然要看自己電腦配置和網速了。
直接放程式碼,windows電腦中,建議指令碼在cmd下執行,這樣進度條可以正常顯示
import os
import threading
from queue import Queue
import requests
def downm3u8(m3u8_url, name, directory, t_num) :
def get_content(url):#不停嘗試,直至成功獲取內容
while True:
try:
content = requests.get(url).content
return content
except:
continue
def download(pre, n, out_folder):
while True:
ts = task.get()#獲取單個視訊片段字尾
lock.acquire()
count[0] += 1
print('[' + '#' * int((count[0] + 1) / (n / float(100))) + ' ' * (
100 - int((count[0] + 1) / (n / float(100)))) + ']' + '--[' + str(count[0]) + '/' + str(
n) + ']', end='\r', flush=True)#輸出進度條
# print('已下載:[ %s/%s ]'%(count[0],n))#如果不用上面的進度條,可以把上面的print前面加#,然後把這行前面的#去掉,顯示進度
lock.release()
ts_url = pre + ts#拼接單個視訊片段下載連結
number = count[0] - 1
content = get_content(ts_url)
with open('%s/%s.mp4' % (out_folder, number), 'ab') as f:
f.write(content)#寫入檔案
task.task_done()
try:#在當前目錄下建資料夾,儲存下載結果
os.mkdir('%s/%s' % (directory, name))
except:
pass
pre = m3u8_url.rstrip(m3u8_url.split('/')[-1])#m3u8連結字首
lines = requests.get(m3u8_url).text.strip().split('\n')#獲取m3u8檔案內容
ts_list = [line.split('/')[-1] for line in lines if line.startswith('#') == False]#獲取m3u8檔案下視訊流字尾
n = len(ts_list)#視訊流片段數
count = [0]#用來對下載片段命名及計算進度
dict = {}
lock = threading.Lock()#執行緒鎖防止幾個執行緒同時輸出錯亂
task = Queue()#設定佇列
for i in range(int(t_num)):
t = threading.Thread(target=download, args=(pre, n, '%s/%s' % (directory, name)))
t.daemon = True
t.start()
for ts in ts_list:
task.put(ts)
task.join()
print('\n' + '%s has downloaded successfully' % name)
print('***Merging film***')
fo = open('%s/%s/%s.mp4' % (directory, name, name), 'ab')
for i in range(n):
fl = open('%s/%s/%s.mp4' % (directory, name, i), 'rb')
fo.write(fl.read())
fl.close()
os.remove('%s/%s/%s.mp4' % (directory, name, i))
fo.close()
print('Film is in %s' % directory)
if __name__ == '__main__':
input1 = input('m3u8 link:')#輸入m3u8連結(例如:https://wuji.zhulong-zuida.com/20190706/762_c260ca6c/800k/hls/index.m3u8)
input2 = input('film name:')#輸入檔名稱(支援中文名)
thread_number = input('threading number:')#輸入要設定的執行緒數字,一般可以設定10-30,電腦都跑得動。
directory = os.getcwd().replace('\\', '/')
downm3u8(input1, input2, directory, thread_number)
上面是簡單版的下載,平時使用上面的指令碼即可。m3u8連結可以通過建議通過貓抓外掛獲取,比較方便。
如果m3u8有加密,可以使用下面的指令碼。其實下面的指令碼一樣可以下載未加密的m3u8連結,但是Crypto.Cipher模組安裝的時候容易報錯,所以,如果並非必須要用,就不推薦了。
import os
import threading
from queue import Queue
from Crypto.Cipher import AES
import requests
def downm3u8(m3u8_url, name, directory, t_num):
def get_content(url):
while True:
try:
content = requests.get(url).content
return content
except:
continue
def download(pre, n, key, out_folder):
while True:
ts = task.get()
lock.acquire()
count[0] += 1
print('[' + '#' * int((count[0] + 1) / (n / float(100))) + ' ' * (
100 - int((count[0] + 1) / (n / float(100)))) + ']' + '--[' + str(count[0]) + '/' + str(
n) + ']', end='\r', flush=True)
# print('已下載:[ %s/%s ]'%(count[0],n))
lock.release()
ts_url = pre + ts
number = count[0] - 1
content = get_content(ts_url)
if len(key) != 0:
cryptor = AES.new(key, AES.MODE_CBC, key)
content = cryptor.decrypt(content)
with open('%s/%s.mp4' % (out_folder, number), 'ab') as f:
f.write(content)
task.task_done()
try:
os.mkdir('%s/%s' % (directory, name))
except:
pass
pre = m3u8_url.rstrip(m3u8_url.split('/')[-1])
lines = requests.get(m3u8_url).text.strip().split('\n')
ts_list = [line.split('/')[-1] for line in lines if line.startswith('#') == False]
key = b''
for line in lines:
if 'AES-128' in line:
key_url = pre + line.split('"')[1].split('/')[-1]
key = requests.get(key_url).content
n = len(ts_list)
count = [0]
dict = {}
lock = threading.Lock()
task = Queue()
for i in range(int(t_num)):
t = threading.Thread(target=download, args=(pre, n, key, '%s/%s' % (directory, name)))
t.daemon = True
t.start()
for ts in ts_list:
task.put(ts)
task.join()
print('\n' + '%s has downloaded successfully' % name)
print('***Merging film***')
fo = open('%s/%s/%s.mp4' % (directory, name, name), 'ab')
for i in range(n):
fl = open('%s/%s/%s.mp4' % (directory, name, i), 'rb')
fo.write(fl.read())
fl.close()
os.remove('%s/%s/%s.mp4' % (directory, name, i))
fo.close()
print('Film is in %s' % directory)
if __name__ == '__main__':
input1 = input('m3u8 link:')
input2 = input('film name:')
thread_number = input('threading number:')
directory = os.getcwd().replace('\\', '/')
downm3u8(input1, input2, directory, thread_number)
github專案傳送門:github連結
如果覺得好用的話,歡迎打賞啊