# Source code for larigira.audiogen_podcast

import datetime
import logging
import os
import random
import sys
from subprocess import CalledProcessError, check_output

import dateutil.parser
import requests
from lxml import html
from pytimeparse.timeparse import timeparse

from larigira.fsutils import download_http


def delta_humanreadable(tdelta):
    """Render a timedelta compactly as ``<days>d<hours>h`` or ``<hours>h``.

    :param tdelta: a :class:`datetime.timedelta`, or ``None``
    :returns: an empty string for ``None``; otherwise the day count (omitted
        when it is zero) followed by the whole hours left over. Minutes and
        seconds are truncated.
    """
    if tdelta is None:
        return ""
    whole_days = tdelta.days
    # What is left once the whole days are taken out; always below 24 hours.
    leftover = tdelta - datetime.timedelta(days=whole_days)
    whole_hours = leftover.seconds // 3600
    if not whole_days:
        return "{}h".format(whole_hours)
    return "{}d{}h".format(whole_days, whole_hours)
def get_duration(url):
    """Return the duration of an audio resource in whole seconds.

    Runs ``ffprobe`` on ``url`` and reads the ``duration=`` line it prints.

    :param url: location handed to ``ffprobe -i``
    :returns: the duration in seconds, truncated to ``int``
    :raises ValueError: if ``ffprobe`` cannot be started, exits with an
        error, or prints no parseable duration
    """
    command = [
        "ffprobe",
        "-v",
        "error",
        "-show_entries",
        "format=duration",
        "-i",
        url,
    ]
    try:
        output = check_output(command)
    except (CalledProcessError, OSError) as exc:
        # OSError covers a missing or non-executable ffprobe binary, so every
        # probing failure reaches the caller as the same exception type.
        raise ValueError("error probing `%s`" % url) from exc

    for line in output.split(b"\n"):
        if line.startswith(b"duration="):
            value = line.split(b"=", 1)[1]
            break
    else:
        # Previously a bare StopIteration escaped from next() here.
        raise ValueError("no duration reported for `%s`" % url)
    # float() raises ValueError by itself on values such as b"N/A".
    return int(float(value))
class Audio(object):
    """A single audio resource identified by its URL.

    :param url: location of the audio
    :param duration: length in seconds if already known; when omitted it is
        computed on first access to :attr:`duration`
    :param date: publication datetime, or ``None``. It must be
        timezone-aware, because :attr:`age` subtracts it from an aware "now".
    """

    def __init__(self, url, duration=None, date=None):
        self.url = url
        self._duration = duration
        self.date = date
        # Far-future default: the audio stays valid until someone sets an
        # earlier end_date.
        self.end_date = datetime.datetime(
            9999, 12, 31, tzinfo=datetime.timezone.utc
        )

    def __str__(self):
        return self.url

    def __repr__(self):
        return "<Audio {} ({} {})>".format(
            self.url, self._duration, delta_humanreadable(self.age)
        )

    @property
    def duration(self):
        """Duration in seconds, computed lazily and then cached.

        If the computation fails, the error is logged and the duration is
        set to 0.
        """
        if self._duration is None:
            try:
                self._duration = get_duration(self.url.encode("utf-8"))
            except Exception:
                # Not a bare except: KeyboardInterrupt and SystemExit must
                # still propagate.
                logging.exception(
                    "Error while computing duration of %s; set it to 0",
                    self.url,
                )
                self._duration = 0
        return self._duration

    @property
    def urls(self):
        """The URL wrapped in a one-element list."""
        return [self.url]

    @property
    def age(self):
        """Time elapsed since :attr:`date`, or ``None`` when there is no date."""
        if self.date is None:
            return None
        # Aware equivalent of the deprecated utcnow().replace(tzinfo=utc).
        now = datetime.datetime.now(datetime.timezone.utc)
        return now - self.date

    @property
    def valid(self):
        """``True`` while :attr:`end_date` has not passed yet."""
        return self.end_date >= datetime.datetime.now(datetime.timezone.utc)
def get_tree(feed_url):
    """Fetch a feed and parse it with ``lxml.html``.

    :param feed_url: an ``http:``/``https:`` URL, or the path of a local file
    :returns: the parsed document; ``html.fromstring`` is used for remote
        feeds and ``html.parse`` for local files
    :raises ValueError: if ``feed_url`` is a local path that does not exist
    """
    if feed_url.startswith(("http:", "https:")):
        return html.fromstring(requests.get(feed_url).content)
    if not os.path.exists(feed_url):
        raise ValueError("file not found: {}".format(feed_url))
    # The context manager closes the handle once parsing is done; it used to
    # be left open.
    with open(feed_url) as feed_file:
        return html.parse(feed_file)
def get_item_date(el):
    """Extract the publication date of a feed item.

    The text of the ``pubdate`` child is tried against two fixed formats
    (ISO-like and RFC 822-like, both with a UTC offset) before falling back
    to ``dateutil``'s parser.

    :param el: the item element
    :returns: a :class:`datetime.datetime`, or ``None`` when the item has no
        ``pubdate`` child or that child is empty
    """
    el_date = el.find("pubdate")
    if el_date is None:
        return None
    text = (el_date.text or "").strip()
    if not text:
        # An empty tag used to end in a TypeError from dateutil.
        return None
    for time_format in ("%Y-%m-%dT%H:%M:%S%z", "%a, %d %b %Y %H:%M:%S %z"):
        try:
            return datetime.datetime.strptime(text, time_format)
        except ValueError:
            # strptime signals a format mismatch with ValueError; try the
            # next format.
            continue
    return dateutil.parser.parse(text)
def get_audio_from_item(item):
    """Build an :class:`Audio` from a feed item element.

    The URL comes from the item's ``enclosure`` child. The duration is taken
    from a ``duration`` child, read as colon-separated fields with seconds
    last. When there is no such child, it is taken from a ``content`` element
    (looked up under ``group`` first) whose ``url`` matches the enclosure.
    If no usable duration is found, none is passed to :class:`Audio`.

    :param item: the item element
    :returns: an :class:`Audio`, or ``None`` when the item has no enclosure
        or the enclosure has no URL
    """
    encl = item.find("enclosure")
    if encl is None:
        return None
    url = encl.get("url")
    if not url:
        # An enclosure without a URL gives nothing to download.
        return None

    audio_args = {}
    if item.find("duration") is not None:
        duration_text = item.findtext("duration") or ""
        try:
            total_seconds = sum(
                int(float(field)) * (60 ** power)
                for power, field in enumerate(
                    reversed(duration_text.split(":"))
                )
            )
        except (ValueError, OverflowError):
            # One malformed tag must not abort parsing of the whole feed.
            logging.warning(
                "Ignoring unparseable duration %r for %s", duration_text, url
            )
            total_seconds = 0
        if total_seconds:
            audio_args["duration"] = total_seconds
    else:
        contents = item.xpath("group/content")
        if not contents:
            contents = item.xpath("content")
        for child in contents:
            if child.get("url") != url or child.get("duration") is None:
                continue
            try:
                audio_args["duration"] = int(float(child.get("duration")))
            except (ValueError, OverflowError):
                logging.warning(
                    "Ignoring unparseable duration %r for %s",
                    child.get("duration"),
                    url,
                )
                continue
            break
    return Audio(url, **audio_args)
def get_urls(tree):
    """Yield one audio object per usable ``item`` found in ``tree``.

    Items for which :func:`get_audio_from_item` returns ``None`` are skipped.
    An audio that comes back without a date gets it from
    :func:`get_item_date`.

    :param tree: parsed feed exposing an ``xpath`` method
    """
    for entry in tree.xpath("//item"):
        found = get_audio_from_item(entry)
        if found is not None:
            if found.date is None:
                found.date = get_item_date(entry)
            yield found
def parse_duration(arg):
    """Convert a duration specification to seconds.

    :param arg: a number of seconds (``int``/``float``, or a string made of
        decimal digits), or any expression understood by ``timeparse``
    :returns: the duration in seconds
    :raises ValueError: if ``timeparse`` cannot interpret the string
    """
    if isinstance(arg, (int, float)):
        # Numbers have no .isdecimal(); they used to crash with
        # AttributeError.
        return int(arg)
    if arg.isdecimal():
        return int(arg)
    secs = timeparse(arg)
    if secs is None:
        raise ValueError("%r is not a valid duration" % arg)
    return secs
def _age_sort_key(audio):
    """Sort key that orders by age and places undated audios last."""
    age = audio.age
    if age is None:
        # None cannot be compared with a timedelta; the leading flag keeps
        # undated entries after every dated one.
        return (True, datetime.timedelta(0))
    return (False, age)


def generate(spec):
    """Select audios from a podcast feed and fetch them with ``download_http``.

    Keys read from ``spec``:

    * ``url`` (required): feed URL or local path
    * ``min_len`` / ``max_len``: duration bounds, parsed by
      :func:`parse_duration`; a malformed bound raises even if the feed is
      empty
    * ``sort_by``: ``"random"``, ``"date"`` or ``"duration"``; anything else
      keeps the feed order
    * ``reverse``: reverse the order after sorting
    * ``start`` (default 0) and ``howmany`` (default 1): slice to keep
    * ``copy`` (default ``True``): forwarded to ``download_http``

    :returns: the list of ``download_http`` results, one per selected audio
    :raises ValueError: if ``spec`` has no ``url``
    """
    if "url" not in spec:
        raise ValueError("Malformed audiospec: missing 'url'")
    audios = list(get_urls(get_tree(spec["url"])))

    # Bounds are parsed once instead of once per audio.
    if spec.get("min_len", False):
        min_len = parse_duration(spec["min_len"])
        audios = [a for a in audios if a.duration >= min_len]
    if spec.get("max_len", False):
        max_len = parse_duration(spec["max_len"])
        audios = [a for a in audios if a.duration <= max_len]

    # sort
    sort_by = spec.get("sort_by", "none")
    if sort_by == "random":
        random.shuffle(audios)
    elif sort_by == "date":
        audios.sort(key=_age_sort_key)
    elif sort_by == "duration":
        audios.sort(key=lambda x: x.duration)

    if spec.get("reverse", False):
        audios.reverse()

    # slice
    audios = audios[int(spec.get("start", 0)) :]
    audios = audios[: int(spec.get("howmany", 1))]

    # copy local
    return [
        download_http(a.url, copy=spec.get("copy", True), prefix="podcast")
        for a in audios
    ]
# TODO: testing
# TODO: lxml should maybe be optional?
# TODO: ui
if __name__ == "__main__":
    # Manual smoke test, not a real test suite: list every audio found in
    # the feed given as the first command-line argument.
    logging.basicConfig(level=logging.DEBUG)
    feed = get_tree(sys.argv[1])
    for audio in get_urls(feed):
        print(" -", repr(audio))