somewhat functional

2023-07-01 15:22:47 -05:00
commit 353227936a
3 changed files with 632 additions and 0 deletions

.gitignore vendored (new file, 181 additions)

@@ -0,0 +1,181 @@
secrets/
temp/
# Created by https://www.toptal.com/developers/gitignore/api/python
# Edit at https://www.toptal.com/developers/gitignore?templates=python
### Python ###
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
### Python Patch ###
# Poetry local configuration file - https://python-poetry.org/docs/configuration/#local-configuration
poetry.toml
# ruff
.ruff_cache/
# LSP config files
pyrightconfig.json
# End of https://www.toptal.com/developers/gitignore/api/python

requirements.txt (new file, 8 additions)

@@ -0,0 +1,8 @@
beautifulsoup4==4.12.2
bs4==0.0.1
certifi==2023.5.7
charset-normalizer==3.1.0
idna==3.4
requests==2.31.0
soupsieve==2.4.1
urllib3==2.0.3

src/blandcamp.py (new file, 443 additions)

@@ -0,0 +1,443 @@
from __future__ import annotations

import argparse
import json
import multiprocessing.pool
import os
import pathlib
import queue
import re
import shutil
import subprocess
import sys
import tempfile
import threading
import time
import traceback
import zipfile
from enum import Enum, auto, unique
from typing import Iterable, Optional

import requests
from bs4 import BeautifulSoup as BS
CHUNK_SIZE = 128 * 1024
REQ_TIME_SEC = 2.5
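# Global rate limiter: a worker thread releases one permit every REQ_TIME_SEC
# seconds (aligned to wall-clock multiples of REQ_TIME_SEC), and every request
# blocks in wait_to_request() until a permit is available.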
_sem = threading.Semaphore(0)
def _release_requests():
while True:
_sem.release()
time.sleep(REQ_TIME_SEC - (time.time() % REQ_TIME_SEC))
def wait_to_request():
_sem.acquire()
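# Browser-like default headers: pg_headers is used for page fetches and
# dl_headers for file downloads. Either can be replaced at runtime via
# --header-file / --header-dl-file.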
pg_headers = {
'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/114.0',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5',
'Referer': 'https://bandcamp.com/',
'Sec-Fetch-Dest': 'document',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-Site': 'cross-site',
'Upgrade-Insecure-Requests': '1',
'Pragma': 'no-cache',
'Cache-Control': 'no-cache',
}
dl_headers = {
'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/114.0',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5',
'Referer': 'https://bandcamp.com/',
'Sec-Fetch-Dest': 'document',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-Site': 'cross-site',
'Sec-Fetch-User': '?1',
'Upgrade-Insecure-Requests': '1',
'Pragma': 'no-cache',
'Cache-Control': 'no-cache',
}
cookies = {}
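# get()/get_file(): thin requests wrappers that merge in the default
# headers/cookies, honor an optional 'session' kwarg, and block on the
# rate limiter before each request.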
def get(*args, **kwargs):
    h = kwargs.get('headers')
    if h:
        for k, v in pg_headers.items():
            h[k] = v
    else:
        kwargs['headers'] = pg_headers
    c = kwargs.get('cookies')
    if c:
        for k, v in cookies.items():
            c[k] = v
    else:
        kwargs['cookies'] = cookies
    s = kwargs.pop('session', None) or requests.Session()
    wait_to_request()
    return s.get(*args, **kwargs)
def get_file(*args, **kwargs):
    h = kwargs.get('headers')
    if h:
        for k, v in dl_headers.items():
            h[k] = v
    else:
        kwargs['headers'] = dl_headers
    c = kwargs.get('cookies')
    if c:
        for k, v in cookies.items():
            c[k] = v
    else:
        kwargs['cookies'] = cookies
    s = kwargs.pop('session', None) or requests.Session()
    wait_to_request()
    return s.get(*args, **kwargs)
def remove_invalid(s: str) -> str:
#\ / * ? : " < > |
return re.sub(r'[\\/\*\?:"<>\|]', '', s)
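# The download formats offered on the download page. dl_info() maps each to a
# file extension and the key used in the page's download blob; opus has no
# direct download (name=None) and is produced by converting the flac.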
@unique
class Format(Enum):
mp3 = auto()
mp3_320 = auto()
ogg = auto()
flac = auto()
wav = auto()
opus = auto()
def dl_info(self) -> DownloadInfo:
if self == Format.mp3:
return DownloadInfo(self, 'mp3', 'mp3-v0')
if self == Format.mp3_320:
return DownloadInfo(self, 'mp3', 'mp3-320')
if self == Format.ogg:
return DownloadInfo(self, 'ogg', 'vorbis')
if self == Format.flac:
return DownloadInfo(self, 'flac', 'flac')
if self == Format.wav:
return DownloadInfo(self, 'wav', 'wav')
if self == Format.opus:
return DownloadInfo(self, 'opus', None)
raise ValueError()
#'mp3-v0', 'mp3-320', 'flac', 'aac-hi', 'vorbis', 'alac', 'wav', 'aiff-lossless'
    @staticmethod
    def from_str(s: str) -> Format:
        # Member names match the CLI choices, so a name lookup suffices.
        try:
            return Format[s]
        except KeyError:
            raise ValueError(f'unknown format: {s}') from None
class DownloadInfo:
    def __init__(self, format: Format, ext: str, name: Optional[str]):
        self.format = format
        self.ext = ext
        # name is the key into the download blob; None (opus) means there is
        # no direct download and the format must be converted from flac.
        self.name = name
class Item:
    def __init__(self, title: str, artist: str, item_id: int, dl_link: Optional[str] = None):
self.title: str = title
self.artist: str = artist
self.dl_link: Optional[str] = dl_link
self.id: int = item_id
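# Logger persists completed item ids to a file, one per line, so interrupted
# runs can be resumed. Lookups are served from an in-memory set, while a
# single worker thread owns the file handle and serializes all writes.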
class Logger:
    def __init__(self, filename: str):
        self.filename = filename
        self._log_queue = queue.Queue()
        self._items = set()
        self._killer = threading.Semaphore(0)
        self._loaded = threading.Event()
        self._thread = threading.Thread(target=self._logger_worker)
        self._thread.start()
        # Don't return until the worker has read any existing log entries.
        self._loaded.wait()
    def log_item(self, item_id: int):
        self._items.add(item_id)
        return self._log_queue.put(item_id)
    def lookup_item(self, item_id: int) -> bool:
        return item_id in self._items
    #todo make a context manager
    def close(self):
        self._killer.release()
        self._thread.join()
    def _logger_worker(self):
        with open(self.filename, 'a+') as f:
            f.seek(0)
            for s in f.readlines():
                st = s.strip()
                if st:
                    self._items.add(int(st))
            self._loaded.set()
            while not self._killer.acquire(timeout=.05):
                try:
                    item = self._log_queue.get(timeout=.05)
                    f.write(f'{item}\n')
                    f.flush()  # persist immediately; a crash shouldn't lose entries
                except queue.Empty:
                    pass
            # Drain anything still queued before the file closes.
            while not self._log_queue.empty():
                f.write(f'{self._log_queue.get_nowait()}\n')
_logger: Optional[Logger] = None
def check_log(item_id: int) -> bool:
return _logger.lookup_item(item_id)
def add_to_log(item_id: int):
return _logger.log_item(item_id)
def start_logger(filename: str):
global _logger
_logger = Logger(filename)
def stop_logger():
global _logger
if _logger is None:
return
_logger.close()
_logger = None
def catch_print(f):
def inner(*args, **kwargs):
try:
return f(*args, **kwargs)
except requests.TooManyRedirects as e:
print(e.request.url, flush=True)
traceback.print_exception(e)
raise
except Exception as e:
#print(repr(e), flush=True)
traceback.print_exception(e)
raise
return inner
def is_picture(s: str) -> bool:
return bool(is_picture.re_pic.search(s))
is_picture.re_pic = re.compile(r'\.(?:jpg|jpeg|png|gif|tiff|bmp)$', flags=re.RegexFlag.IGNORECASE)
def convert(binary: str, input_file: str, output_dir: str, output_ext: str, bitrate: int = None):
    # Run ffmpeg on input_file, writing <basename>.<output_ext> into output_dir.
    output = os.path.join(output_dir, os.path.splitext(os.path.split(input_file)[1])[0] + '.' + output_ext)
    cmd = [binary, '-y', '-i', input_file]
    if bitrate:
        cmd += ['-b:a', f'{bitrate}K']
    return subprocess.run(cmd + [output])
def load_cookies(s: str) -> dict:
with open(s) as f:
return json.load(f)['Request Cookies']
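# The collection blob has an 'items' list (item_title / band_name / item_id)
# plus a 'redownload_urls' dict keyed by 'p<sale_item_id>'; items the account
# cannot re-download simply have no entry there, so dl_link stays None.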
def parse_items(j: dict) -> Iterable[Item]:
for item in j['items']:
i = Item(item['item_title'], item['band_name'], item['item_id'])
sid = f'p{item["sale_item_id"]}'
try:
i.dl_link = j['redownload_urls'][sid]
except KeyError:
pass
yield i
def load_header(filename: str) -> dict:
with open(filename) as f:
j = json.load(f)
items = j['headers']
d = {}
for i in items:
d[i['name']] = i['value']
return d
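# download_track: stream the response to disk in CHUNK_SIZE chunks; when a
# conversion is requested, stream into a temp directory first and let ffmpeg
# write the final file into the track's target directory.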
#@catch_print
def download_track(path: str, url: str, conv_binary: str, item_id: int, convert_ext: str = None, bitrate: int = None):
    pathlib.Path(path).parents[0].mkdir(parents=True, exist_ok=True)
    with get(url, stream=True) as r:
        r.raise_for_status()
        if convert_ext:
            with tempfile.TemporaryDirectory() as tmp_dir:
                temp_name = os.path.join(tmp_dir, os.path.split(path)[1])
                with open(temp_name, 'w+b') as tmpf:
                    for chunk in r.iter_content(chunk_size=CHUNK_SIZE):
                        tmpf.write(chunk)
                convert(conv_binary, temp_name, os.path.dirname(path), convert_ext, bitrate)
        else:
            with open(path, 'w+b') as f:
                for chunk in r.iter_content(chunk_size=CHUNK_SIZE):
                    f.write(chunk)
    add_to_log(item_id)
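# download_album: albums come down as a zip; stream it to a temp file, then
# either extract straight into the album directory or, when converting,
# extract to a scratch dir, move cover art over as-is, and convert each track.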
#@catch_print
def download_album(path: str, url: str, conv_binary: str, item_id: int, convert_ext: str = None, bitrate: int = None):
    pathlib.Path(path).mkdir(parents=True, exist_ok=True)
    with get(url, stream=True) as r:
        r.raise_for_status()
        ctype = r.headers['content-type']
        if ctype != 'application/zip':
            print(url)
            raise RuntimeError(f'expected application/zip, got {ctype}')
with tempfile.TemporaryDirectory() as dir1:
temp_name = os.path.join(dir1, 'temp.zip')
with open(temp_name, 'w+b') as f:
for chunk in r.iter_content(chunk_size=CHUNK_SIZE):
if chunk:
f.write(chunk)
try:
with zipfile.ZipFile(temp_name) as zip:
if convert_ext:
with tempfile.TemporaryDirectory() as dir2:
zip.extractall(dir2)
for fl in os.listdir(dir2):
if is_picture(fl):
shutil.move(os.path.join(dir2, fl), path)
for fl in os.listdir(dir2):
convert(conv_binary, os.path.join(dir2, fl), path, convert_ext, bitrate)
else:
zip.extractall(path)
except zipfile.BadZipFile:
print(url)
raise
add_to_log(item_id)
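# process_item: fetch the item's download page, pull the JSON out of the
# #pagedata div's data-blob attribute, pick the url for the requested format,
# and dispatch to download_track or download_album.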
#@catch_print
def process_item(base_path: str, i: Item, f: Format, singles: bool, conv_binary: str, bitrate: int):
    if check_log(i.id):
        return  # already downloaded on a previous run
    info = f.dl_info()
    resp = get(i.dl_link)
    if not resp.ok:
        raise RuntimeError(f'download page returned {resp.status_code}: {i.dl_link}')
    soup = BS(resp.text, features="html.parser")
    dv = soup.find('div', id='pagedata', attrs={'data-blob': True})
    blob = dv['data-blob']
    j = json.loads(blob)
    #todo are there ever multiple? what do then?
    dls = j['download_items'][0]['downloads']
    typ = j['download_items'][0]['type']
    artist = remove_invalid(i.artist)
    title = remove_invalid(i.title)
    if info.name:
        # The requested format can be downloaded directly.
        url = dls[info.name]['url']
        ext = info.ext
        convert_ext = None
    else:
        # No direct download for this format (opus): fetch the flac and convert.
        flac_info = Format.flac.dl_info()
        url = dls[flac_info.name]['url']
        ext = flac_info.ext
        convert_ext = info.ext
    if typ == 'track':
        if singles:
            track_path = os.path.join(base_path, artist, 'Singles', title + '.' + ext)
        else:
            track_path = os.path.join(base_path, artist, title, title + '.' + ext)
        #todo get cover
        return download_track(track_path, url, conv_binary, i.id, convert_ext=convert_ext, bitrate=bitrate)
    elif typ == 'album' or typ == 'package':
        return download_album(os.path.join(base_path, artist, title), url, conv_binary, i.id, convert_ext=convert_ext, bitrate=bitrate)
    else:
        raise ValueError(f'unknown item type: {typ}')
def main(arguments):
parser = argparse.ArgumentParser()
    parser.add_argument('-i', '--json-data', type=str, help='file containing the '
        + 'collection json response', required=True, dest='json')
    parser.add_argument('-f', '--format', type=str, required=True, choices=['mp3', 'mp3_320',
        'ogg', 'flac', 'wav', 'opus'])
    parser.add_argument('-c', '--cookies', type=str, help='cookie file (in json, top item named '
        + '"Request Cookies")')
    parser.add_argument('--header-file', type=str, dest='header', help='file to read page headers from')
    parser.add_argument('--header-dl-file', type=str, dest='header_dl', help='file to read download headers from')
    parser.add_argument('--ffmpeg', type=str, help='location of ffmpeg '
        + 'binary for conversion')
    parser.add_argument('--singles-dir', action='store_true', help='create a single '
        + 'directory for singles rather than storing each in its own directory', dest='singles')
    parser.add_argument('--bitrate', type=int, help='bitrate in kbps, '
        + 'ignored if not converting', default=None)
    parser.add_argument('--logger', type=str, help='file that stores the ids of '
        + 'already-downloaded items, so interrupted runs can resume')
    parser.add_argument('-t', '--threads', type=int, default=0)
    parser.add_argument('dir', type=str, metavar='output-directory')
args = parser.parse_args(arguments)
format = Format.from_str(args.format)
with open(args.json) as f:
js = json.load(f)
binary = args.ffmpeg or 'ffmpeg'
    c = load_cookies(args.cookies) if args.cookies else None
    if c:
        global cookies
        cookies = c
if args.header:
global pg_headers
pg_headers = load_header(args.header)
if args.header_dl:
global dl_headers
dl_headers = load_header(args.header_dl)
    # daemon=True so the rate-limiter thread can't keep the process alive at exit
    threading.Thread(target=_release_requests, daemon=True).start()
def pack_item(i):
        return (args.dir, i, format, args.singles, binary, args.bitrate)
    # threads == 0 (the default) lets ThreadPool size itself from os.cpu_count()
    with multiprocessing.pool.ThreadPool(args.threads or None) as pool:
try:
if args.logger:
start_logger(args.logger)
gen = (pack_item(item) for item in parse_items(js) if item.dl_link)
asy = pool.starmap_async(process_item, gen)
asy.wait()
#asy.get()
return 0 if asy.successful() else 1
finally:
stop_logger()
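# Example invocation (collection.json and cookies.json are hypothetical file
# names: the former is the saved collection response, the latter cookies
# exported from a logged-in browser session):
#   python src/blandcamp.py -i collection.json -c cookies.json -f flac \
#       --logger done.log -t 4 ~/Music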
if __name__ == '__main__':
    sys.exit(main(sys.argv[1:]))