First init project.
This commit is contained in:
commit
b29c52df47
|
@ -0,0 +1,100 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
import requests
|
||||
|
||||
from encrypt import encrypted_request
|
||||
from constants import headers
|
||||
from constants import song_download_url
|
||||
from constants import get_song_url
|
||||
from constants import get_album_url
|
||||
from constants import get_artist_url
|
||||
from constants import get_playlist_url
|
||||
|
||||
|
||||
class CloudApi(object):
|
||||
|
||||
def __init__(self, timeout=30):
|
||||
super().__init__()
|
||||
self.session = requests.session()
|
||||
self.session.headers.update(headers)
|
||||
self.timeout = timeout
|
||||
|
||||
def get_request(self, url):
|
||||
|
||||
response = self.session.get(url, timeout=self.timeout)
|
||||
result = response.json()
|
||||
if result['code'] != 200:
|
||||
print('Return {} when try to get {}'.format(result, url))
|
||||
else:
|
||||
return result
|
||||
|
||||
def post_request(self, url, params):
|
||||
|
||||
data = encrypted_request(params)
|
||||
response = self.session.post(url, data=data, timeout=self.timeout)
|
||||
result = response.json()
|
||||
if result['code'] != 200:
|
||||
print('Return {} when try to post {} => {}'.format(result, params, url))
|
||||
else:
|
||||
return result
|
||||
|
||||
def get_song(self, song_id):
|
||||
"""
|
||||
Get song info by song id
|
||||
:param song_id:
|
||||
:return:
|
||||
"""
|
||||
url = get_song_url(song_id)
|
||||
result = self.get_request(url)
|
||||
|
||||
return result['songs'][0]
|
||||
|
||||
def get_album_songs(self, album_id):
|
||||
"""
|
||||
Get all album songs info by album id
|
||||
:param album_id:
|
||||
:return:
|
||||
"""
|
||||
url = get_album_url(album_id)
|
||||
result = self.get_request(url)
|
||||
|
||||
return result['album']['songs']
|
||||
|
||||
def get_song_url(self, song_id, bit_rate=320000):
|
||||
"""Get a song's download url.
|
||||
:params song_id: song id<int>.
|
||||
:params bit_rate: {'MD 128k': 128000, 'HD 320k': 320000}
|
||||
:return:
|
||||
"""
|
||||
url = song_download_url
|
||||
csrf = ''
|
||||
params = {'ids': [song_id], 'br': bit_rate, 'csrf_token': csrf}
|
||||
result = self.post_request(url, params)
|
||||
song_url = result['data'][0]['url']
|
||||
if song_url is None:
|
||||
print('Song {} is not available due to copyright issue. => {}'.format(song_id, result))
|
||||
else:
|
||||
return song_url
|
||||
|
||||
def get_hot_songs(self, artist_id):
|
||||
"""
|
||||
Get a artist 50 hot songs
|
||||
:param artist_id:
|
||||
:return:
|
||||
"""
|
||||
url = get_artist_url(artist_id)
|
||||
result = self.get_request(url)
|
||||
return result['hotSongs']
|
||||
|
||||
def get_playlist_songs(self, playlist_id):
|
||||
"""
|
||||
Get a public playlist all songs
|
||||
:param playlist_id:
|
||||
:return:
|
||||
"""
|
||||
url = get_playlist_url(playlist_id)
|
||||
result = self.get_request(url)
|
||||
return result['result']['tracks'], result['result']['name']
|
||||
|
||||
|
||||
|
|
@ -0,0 +1,59 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
import os
|
||||
|
||||
from configparser import ConfigParser
|
||||
|
||||
# Config key
|
||||
_CONFIG_KEY_DOWNLOAD_HOT_MAX = 'download.hot_max'
|
||||
_CONFIG_KEY_DOWNLOAD_DIR = 'download.dir'
|
||||
_CONFIG_KEY_SONG_NAME_TYPE = 'song.name_type'
|
||||
_CONFIG_KEY_SONG_FOLDER_TYPE = 'song.folder_type'
|
||||
|
||||
# Base path
|
||||
_CONFIG_MAIN_PATH = os.path.join(os.getenv('HOME'), '.ncm')
|
||||
_CONFIG_FILE_PATH = os.path.join(_CONFIG_MAIN_PATH, 'ncm.ini')
|
||||
_DEFAULT_DOWNLOAD_PATH = os.path.join(_CONFIG_MAIN_PATH, 'download')
|
||||
|
||||
# Global config value
|
||||
DOWNLOAD_HOT_MAX = 50
|
||||
DOWNLOAD_DIR = ''
|
||||
SONG_NAME_TYPE = 1
|
||||
SONG_FOLDER_TYPE = 1
|
||||
|
||||
|
||||
def load_config():
|
||||
if not os.path.exists(_CONFIG_FILE_PATH):
|
||||
init_config_file()
|
||||
|
||||
cfg = ConfigParser()
|
||||
cfg.read(_CONFIG_FILE_PATH)
|
||||
|
||||
global DOWNLOAD_HOT_MAX
|
||||
global DOWNLOAD_DIR
|
||||
global SONG_NAME_TYPE
|
||||
global SONG_FOLDER_TYPE
|
||||
|
||||
DOWNLOAD_HOT_MAX = cfg.getint('settings', _CONFIG_KEY_DOWNLOAD_HOT_MAX)
|
||||
DOWNLOAD_DIR = cfg.get('settings', _CONFIG_KEY_DOWNLOAD_DIR)
|
||||
SONG_NAME_TYPE = cfg.getint('settings', _CONFIG_KEY_SONG_NAME_TYPE)
|
||||
SONG_FOLDER_TYPE = cfg.getint('settings', _CONFIG_KEY_SONG_FOLDER_TYPE)
|
||||
|
||||
|
||||
def init_config_file():
|
||||
default_config = '''\
|
||||
[settings]
|
||||
{key_max} = 50
|
||||
{key_dir} = {value_dir}
|
||||
{key_name_type} = 1
|
||||
{key_folder_type} = 1
|
||||
'''.format(key_max=_CONFIG_KEY_DOWNLOAD_HOT_MAX,
|
||||
key_dir=_CONFIG_KEY_DOWNLOAD_DIR,
|
||||
value_dir=_DEFAULT_DOWNLOAD_PATH,
|
||||
key_name_type=_CONFIG_KEY_SONG_NAME_TYPE,
|
||||
key_folder_type=_CONFIG_KEY_SONG_FOLDER_TYPE)
|
||||
|
||||
if not os.path.exists(_CONFIG_MAIN_PATH):
|
||||
os.makedirs(_CONFIG_MAIN_PATH)
|
||||
f = open(_CONFIG_FILE_PATH, 'w')
|
||||
f.write(default_config)
|
||||
f.close()
|
|
@ -0,0 +1,32 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Encrypt key
|
||||
modulus = '00e0b509f6259df8642dbc35662901477df22677ec152b5ff68ace615bb7b725152b3ab17a876aea8a5aa76d2e417629ec4ee341f56135fccf695280104e0312ecbda92557c93870114af6c9d05c4f7f0c3685b7a46bee255932575cce10b424d813cfe4875d3e82047b97ddef52741d546b8e289dc6935b3ece0462db0a22b8e7'
|
||||
nonce = '0CoJUm6Qyw8W8jud'
|
||||
pub_key = '010001'
|
||||
|
||||
|
||||
headers = {
|
||||
'Accept': '*/*',
|
||||
'Host': 'music.163.com',
|
||||
'User-Agent': 'curl/7.51.0',
|
||||
'Referer': 'http://music.163.com',
|
||||
'Cookie': 'appver=2.0.2'
|
||||
}
|
||||
song_download_url = 'http://music.163.com/weapi/song/enhance/player/url?csrf_token='
|
||||
|
||||
|
||||
def get_song_url(song_id):
|
||||
return 'http://music.163.com/api/song/detail/?ids=[{}]'.format(song_id)
|
||||
|
||||
|
||||
def get_album_url(album_id):
|
||||
return 'http://music.163.com/api/album/{}/'.format(album_id)
|
||||
|
||||
|
||||
def get_artist_url(artist_id):
|
||||
return 'http://music.163.com/api/artist/{}'.format(artist_id)
|
||||
|
||||
|
||||
def get_playlist_url(playlist_id):
|
||||
return 'http://music.163.com/api/playlist/detail?id={}'.format(playlist_id)
|
|
@ -0,0 +1,88 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
import os
|
||||
import requests
|
||||
|
||||
from api import CloudApi
|
||||
from file_util import add_metadata_to_song
|
||||
|
||||
|
||||
def download_song_by_id(song_id, download_folder):
|
||||
# get song info
|
||||
api = CloudApi()
|
||||
song = api.get_song(song_id)
|
||||
download_song_by_song(song, download_folder)
|
||||
|
||||
|
||||
def download_song_by_song(song, download_folder):
|
||||
# get song info
|
||||
api = CloudApi()
|
||||
song_id = song['id']
|
||||
song_name = song['name']
|
||||
artist_name = song['artists'][0]['name']
|
||||
song_file_name = '{}_{}.mp3'.format(artist_name, song_name)
|
||||
|
||||
# download song
|
||||
song_url = api.get_song_url(song_id)
|
||||
print('song url is:', song_url)
|
||||
download_file(song_url, song_file_name, download_folder)
|
||||
|
||||
# download cover
|
||||
cover_url = song['album']['blurPicUrl']
|
||||
cover_file_name = 'cover_{}.jpg'.format(song_id)
|
||||
print('cover url:', cover_url)
|
||||
download_file(cover_url, cover_file_name, download_folder)
|
||||
|
||||
# add metadata for song
|
||||
song_file_path = os.path.join(download_folder, song_file_name)
|
||||
cover_file_path = os.path.join(download_folder, cover_file_name)
|
||||
add_metadata_to_song(song_file_path, cover_file_path, song)
|
||||
|
||||
# delete cover file
|
||||
os.remove(cover_file_path)
|
||||
|
||||
|
||||
def download_file(file_url, file_name, folder):
|
||||
|
||||
if not os.path.exists(folder):
|
||||
os.makedirs(folder)
|
||||
file_path = os.path.join(folder, file_name)
|
||||
|
||||
# if not os.path.exists(file_path):
|
||||
response = requests.get(file_url, stream=True)
|
||||
length = int(response.headers.get('Content-Length'))
|
||||
progress = ProgressBar(file_name, length)
|
||||
|
||||
with open(file_path, 'wb') as file:
|
||||
for buffer in response.iter_content(chunk_size=1024):
|
||||
if buffer:
|
||||
file.write(buffer)
|
||||
progress.refresh(len(buffer))
|
||||
|
||||
|
||||
class ProgressBar(object):
|
||||
|
||||
def __init__(self, file_name, total):
|
||||
super().__init__()
|
||||
self.file_name = file_name
|
||||
self.count = 0
|
||||
self.prev_count = 0
|
||||
self.total = total
|
||||
self.status = 'Downloading:'
|
||||
self.end_str = '\r'
|
||||
|
||||
def __get_info(self):
|
||||
return '[{}] {:.2f}KB, Progress: {:.2f}%'\
|
||||
.format(self.file_name, self.total/1024, self.count/self.total*100)
|
||||
|
||||
def refresh(self, count):
|
||||
self.count += count
|
||||
# Update progress if down size > 10k
|
||||
if (self.count - self.prev_count) > 10240:
|
||||
self.prev_count = self.count
|
||||
print(self.__get_info(), end=self.end_str)
|
||||
# Finish downloading
|
||||
if self.count >= self.total:
|
||||
self.status = 'Downloaded:'
|
||||
self.end_str = '\n'
|
||||
print(self.__get_info(), end=self.end_str)
|
|
@ -0,0 +1,37 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
import os
|
||||
import base64
|
||||
import json
|
||||
import binascii
|
||||
from Crypto.Cipher import AES
|
||||
|
||||
from constants import modulus, nonce, pub_key
|
||||
|
||||
|
||||
def encrypted_request(text):
|
||||
text = json.dumps(text)
|
||||
sec_key = create_secret_key(16)
|
||||
enc_text = aes_encrypt(aes_encrypt(text, nonce), sec_key)
|
||||
enc_sec_key = rsa_encrypt(sec_key, pub_key, modulus)
|
||||
data = {'params': enc_text, 'encSecKey': enc_sec_key}
|
||||
return data
|
||||
|
||||
|
||||
def aes_encrypt(text, sec_key):
|
||||
pad = 16 - len(text) % 16
|
||||
text = text + chr(pad) * pad
|
||||
encryptor = AES.new(sec_key, 2, '0102030405060708')
|
||||
cipher_text = encryptor.encrypt(text)
|
||||
cipher_text = base64.b64encode(cipher_text).decode('utf-8')
|
||||
return cipher_text
|
||||
|
||||
|
||||
def rsa_encrypt(text, public_key, p_modulus):
|
||||
text = text[::-1]
|
||||
rs = pow(int(binascii.hexlify(text), 16), int(public_key, 16), int(p_modulus, 16))
|
||||
return format(rs, 'x').zfill(256)
|
||||
|
||||
|
||||
def create_secret_key(size):
|
||||
return binascii.hexlify(os.urandom(size))[:16]
|
|
@ -0,0 +1,38 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
from mutagen.id3 import ID3, APIC, TPE1, TIT2, TALB
|
||||
|
||||
|
||||
def add_metadata_to_song(file_path, cover_path, song):
|
||||
id3 = ID3(file_path)
|
||||
# add album cover
|
||||
id3.add(
|
||||
APIC(
|
||||
encoding=0, # 3 is for UTF8, but here we use 0 (LATIN1) for 163, orz~~~
|
||||
mime='image/jpeg', # image/jpeg or image/png
|
||||
type=3, # 3 is for the cover(front) image
|
||||
data=open(cover_path, 'rb').read()
|
||||
)
|
||||
)
|
||||
# add artist name
|
||||
id3.add(
|
||||
TPE1(
|
||||
encoding=3,
|
||||
text=song['artists'][0]['name']
|
||||
)
|
||||
)
|
||||
# add song name
|
||||
id3.add(
|
||||
TIT2(
|
||||
encoding=3,
|
||||
text=song['name']
|
||||
)
|
||||
)
|
||||
# add album name
|
||||
id3.add(
|
||||
TALB(
|
||||
encoding=3,
|
||||
text=song['album']['name']
|
||||
)
|
||||
)
|
||||
id3.save(v2_version=3)
|
|
@ -0,0 +1,98 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
import argparse
|
||||
import os
|
||||
import config
|
||||
|
||||
from api import CloudApi
|
||||
from downloader import download_song_by_id
|
||||
from downloader import download_song_by_song
|
||||
|
||||
# load the config first
|
||||
config.load_config()
|
||||
print('max:', config.DOWNLOAD_HOT_MAX)
|
||||
print('dir:', config.DOWNLOAD_DIR)
|
||||
print('name_type:', config.SONG_NAME_TYPE)
|
||||
print('folder_type:', config.SONG_FOLDER_TYPE)
|
||||
|
||||
|
||||
parser = argparse.ArgumentParser(description='Welcome to netease cloud music downloader!')
|
||||
parser.add_argument('-s', metavar='song_id', dest='song_id',
|
||||
help='Download a song by song_id')
|
||||
parser.add_argument('-ss', metavar='song_ids', dest='song_ids', nargs='+',
|
||||
help='Download a song list, song_id split by space')
|
||||
parser.add_argument('-hot', metavar='artist_id', dest='artist_id',
|
||||
help='Download an artist hot 50 songs by artist_id')
|
||||
parser.add_argument('-a', metavar='album_id', dest='album_id',
|
||||
help='Download an album all songs by album_id')
|
||||
parser.add_argument('-p', metavar='playlist_id', dest='playlist_id',
|
||||
help='Download a playlist all songs by playlist_id')
|
||||
args = parser.parse_args()
|
||||
api = CloudApi()
|
||||
|
||||
|
||||
def download_hot_songs(artist_id):
|
||||
songs = api.get_hot_songs(artist_id)
|
||||
folder_name = songs[0]['artists'][0]['name'] + '_hot50'
|
||||
folder_path = os.path.join(config.DOWNLOAD_DIR, folder_name)
|
||||
for i, song in enumerate(songs):
|
||||
print(str(i + 1) + ' song name:' + song['name'])
|
||||
download_song_by_song(song, folder_path)
|
||||
|
||||
|
||||
def download_album_songs(album_id):
|
||||
songs = api.get_album_songs(album_id)
|
||||
folder_name = songs[0]['artists'][0]['name'] + '_' + songs[0]['album']['name']
|
||||
folder_path = os.path.join(config.DOWNLOAD_DIR, folder_name)
|
||||
for i, song in enumerate(songs):
|
||||
print(str(i + 1) + ' song name:' + song['name'])
|
||||
download_song_by_song(song, folder_path)
|
||||
|
||||
|
||||
def download_playlist_songs(playlist_id):
|
||||
songs, playlist_name = api.get_playlist_songs(playlist_id)
|
||||
folder_name = 'playlist_' + playlist_name
|
||||
folder_path = os.path.join(config.DOWNLOAD_DIR, folder_name)
|
||||
for i, song in enumerate(songs):
|
||||
print(str(i + 1) + ' song name:' + song['name'])
|
||||
download_song_by_song(song, folder_path)
|
||||
|
||||
|
||||
if args.song_id:
|
||||
download_song_by_id(args.song_id, config.DOWNLOAD_DIR)
|
||||
elif args.song_ids:
|
||||
for song_id in args.song_ids:
|
||||
download_song_by_id(song_id, config.DOWNLOAD_DIR)
|
||||
elif args.artist_id:
|
||||
download_hot_songs(args.artist_id)
|
||||
elif args.album_id:
|
||||
download_album_songs(args.album_id)
|
||||
elif args.playlist_id:
|
||||
download_playlist_songs(args.playlist_id)
|
||||
|
||||
|
||||
# song = api.get_song('464035731')
|
||||
# print('song id:{}, song name:{}, album:{}'.format(song['id'], song['name'], song['album']['name']))
|
||||
|
||||
# from mutagen.mp3 import MP3
|
||||
# from mutagen.id3 import ID3, APIC, error
|
||||
#
|
||||
#
|
||||
# file_path = '/Users/codezjx/Downloads/test.mp3'
|
||||
# cover_path = '/Users/codezjx/Downloads/test.jpg'
|
||||
#
|
||||
# audio = MP3(file_path, ID3=ID3)
|
||||
# if audio.tags is None:
|
||||
# print('No ID3 tag, try to add one!')
|
||||
# try:
|
||||
# audio.add_tags()
|
||||
# except error:
|
||||
# pass
|
||||
# audio.tags.add(
|
||||
# APIC(
|
||||
# encoding=3, # 3 is for utf-8
|
||||
# mime='image/jpg', # image/jpeg or image/png
|
||||
# type=3, # 3 is for the cover(front) image
|
||||
# data=open(cover_path, 'rb').read()
|
||||
# )
|
||||
# )
|
||||
# audio.save()
|
Loading…
Reference in New Issue