from tools.converters import *
# "next" is assumed to be the innertube /next endpoint wrapper exported by
# tools.invidious_ported (it intentionally shadows the builtin next() in this module).
from tools.invidious_ported import browse, next, protodec
import json
import re
import datetime
import time
from threading import Lock
from cachetools import TTLCache

# Cache fetched playlist metadata for 5 minutes to avoid repeated innertube requests.
playlist_cache = TTLCache(maxsize=50, ttl=300)
playlist_cache_lock = Lock()


def produce_playlist_continuation(id, index):
    # Channel IDs (UC...) are mapped to their "uploads" playlist (UU...).
    if id.startswith("UC"):
        id = "UU" + id.split("UC", 1)[1]
    plid = "VL" + id
    request_count = index // 100

    data = {"1:varint": index}
    data = protodec(json.dumps(data, separators=(',', ':')), "-e")
    data = data.split("=")[0]

    object = {
        "80226972:embedded": {
            "2:string": plid,
            "3:base64": {
                "1:varint": request_count,
                "15:string": "PT:{}".format(data),
                "104:embedded": {"1:0:varint": 0},
            },
            "35:string": id,
        }
    }
    object = json.dumps(object, separators=(',', ':'))

    continuation = protodec(object, "-e")
    continuation = continuation.split("=")[0]
    return continuation


def parse_playlist_continuation(continuation):
    object = protodec(continuation, "-db")
    object = json.loads(object)

    # The offset is stored in the nested "PT:"-prefixed protobuf message.
    data = object["80226972:0:embedded"]["3:1:base64"]["15:1:string"]
    data = data.split("PT:")[1]
    data = protodec(data, "-db")
    data = json.loads(data)
    data = data["1:0:varint"]
    return data


def fetch_playlist(plid):
    if plid.startswith("UC"):
        plid = "UU" + plid.split("UC", 1)[1]

    with playlist_cache_lock:
        if plid in playlist_cache:
            return playlist_cache[plid]

    yt_initial_data = browse(browseId="VL" + plid)

    playlist_sidebar_renderer = try_dig(yt_initial_data, "sidebar", "playlistSidebarRenderer", "items")
    if playlist_sidebar_renderer is None:
        raise Exception("Could not extract playlistSidebarRenderer.")

    playlist_info = try_dig(playlist_sidebar_renderer, 0, "playlistSidebarPrimaryInfoRenderer")
    if playlist_info is None:
        raise Exception("Could not extract playlist info")

    title = try_dig(playlist_info, "title", "runs", 0, "text")

    desc_item = playlist_info.get("description")
    if desc_item is not None:
        description_txt = combine_runs(desc_item)
        description_html = add_html_links(escape_html_textcontent(description_txt))
    else:
        description_txt = None
        description_html = None

    thumbnail = try_dig(playlist_info, "thumbnailRenderer", "playlistVideoThumbnailRenderer", "thumbnail", "thumbnails", 0, "url")

    views = 0
    updated = 0
    video_count = 0
    subtitle = try_dig(yt_initial_data, "header", "playlist", "subtitle", combine=True)

    stats = playlist_info.get("stats")
    if stats is not None:
        for stat in stats:
            text = combine_runs(stat)
            if text is None:
                continue
            if "video" in text or "episode" in text:
                video_count = int(re.sub(r"\D", "", text))
            elif "view" in text:
                views = re.sub(r"\D", "", text)
            elif "updated" in text.lower():
                if "Last updated on" in text:
                    updated = time.mktime(datetime.datetime.strptime(text, "Last updated on %b %d, %Y").timetuple())
                else:
                    updated = past_text_to_time(text.split("Updated ")[1])
                # if "seconds ago" in text:
                #     updated = datetime.datetime.utcnow() - datetime.timedelta(seconds=int(re.sub("\D", "", text)))
                # elif "minutes ago" in text:
                #     updated = datetime.datetime.utcnow() - datetime.timedelta(minutes=int(re.sub("\D", "", text)))
                # elif "hours ago" in text:
                #     updated = datetime.datetime.utcnow() - datetime.timedelta(hours=int(re.sub("\D", "", text)))
                # elif "days ago" in text:
                #     updated = datetime.datetime.utcnow() - datetime.timedelta(days=int(re.sub("\D", "", text)))
                # elif "weeks ago" in text:
                #     updated = datetime.datetime.utcnow() - datetime.timedelta(days=int(re.sub("\D", "", text)) * 7)
                # elif "months ago" in text:
                #     updated = datetime.datetime.utcnow() - datetime.timedelta(days=int(re.sub("\D", "", text)) * 30)

    # A second sidebar item, when present, describes the playlist owner.
    if len(playlist_sidebar_renderer) < 2:
        author = None
        author_thumbnail = None
        ucid = None
    else:
        author_info = try_dig(playlist_sidebar_renderer, 1, "playlistSidebarSecondaryInfoRenderer", "videoOwner", "videoOwnerRenderer")
        if author_info is None:
            raise Exception("Could not extract author info")
        author = try_dig(author_info, "title", "runs", 0, "text")
        author_thumbnail = try_dig(author_info, "thumbnail", "thumbnails", 0, "url")
        ucid = try_dig(author_info, "title", "runs", 0, "navigationEndpoint", "browseEndpoint", "browseId")

    playlist = {
        "title": title,
        "id": plid,
        "author": author,
        "author_thumbnail": author_thumbnail,
        "ucid": ucid,
        "description": description_txt,
        "description_html": description_html,
        "video_count": video_count,
        "views": views,
        "updated": updated,
        "thumbnail": thumbnail,
        "subtitle": subtitle
    }

    with playlist_cache_lock:
        playlist_cache[plid] = playlist
    return playlist


def get_playlist_videos(plid, offset, videoId=None):
    pl = fetch_playlist(plid)

    if offset >= pl["video_count"] or offset < 0:
        return []

    # When a video ID is given, resolve its position in the playlist via the /next endpoint.
    if videoId is not None:
        yt_initial_data = next(data={"videoId": videoId, "playlistId": pl["id"]})
        new_offset = try_dig(yt_initial_data, "contents", "twoColumnWatchNextResults", "playlist", "playlist", "currentIndex")
        if new_offset is not None:
            offset = new_offset

    videos = []
    while True:
        # Each continuation request returns up to 100 videos starting at `offset`.
        ctoken = produce_playlist_continuation(pl["id"], offset)
        yt_initial_data = browse(continuation=ctoken)
        for video in extract_playlist_videos(yt_initial_data):
            videos.append(video)
        offset = offset + 100
        if len(videos) >= 200 or len(videos) == pl["video_count"] or offset >= pl["video_count"]:
            break
        break
    return videos


def extract_playlist_videos(yt_initial_data):
    videos = []

    if "contents" in yt_initial_data:
        tabs = yt_initial_data["contents"]["twoColumnBrowseResultsRenderer"]["tabs"]
        tabs_renderer = []
        for tab in tabs:
            if tab["tabRenderer"]["selected"] and ("contents" in tab["tabRenderer"] or "content" in tab["tabRenderer"]):
                tabs_renderer = tab["tabRenderer"]
                break
        if "contents" in tabs_renderer or "content" in tabs_renderer:
            tab_contents = tabs_renderer["contents"] if "contents" in tabs_renderer else tabs_renderer["content"]
            list_renderer = tab_contents["sectionListRenderer"]["contents"][0]
            item_renderer = list_renderer["itemSectionRenderer"]["contents"][0]
            contents = item_renderer["playlistVideoListRenderer"]["contents"]
        else:
            contents = try_dig(yt_initial_data, "onResponseReceivedActions", 0, "appendContinuationItemsAction", "continuationItems")
    else:
        contents = try_dig(yt_initial_data, "response", "continuationContents", "playlistVideoListContinuation", "contents")

    if contents is not None:
        for v in contents:
            if "playlistVideoRenderer" not in v:
                continue
            v = v["playlistVideoRenderer"]
            video_id = v["navigationEndpoint"]["watchEndpoint"]["videoId"]
            plid = v["navigationEndpoint"]["watchEndpoint"]["playlistId"]
            index = v["navigationEndpoint"]["watchEndpoint"]["index"]
            title = try_dig(v, "title", combine=True)
            author = try_dig(v, "shortBylineText", "runs", 0, "text")
            ucid = try_dig(v, "shortBylineText", "runs", 0, "navigationEndpoint", "browseEndpoint", "browseId")
            length_seconds = v["lengthSeconds"] if "lengthSeconds" in v else None
            live = False
            is_upcoming = False
            length_text = "UNKNOWN"

            if length_seconds is None:
                live = True
                length_seconds = 0

            for o in v["thumbnailOverlays"]:
                if "thumbnailOverlayTimeStatusRenderer" in o:
                    length_text = combine_runs(o["thumbnailOverlayTimeStatusRenderer"]["text"])
                    length_text_style = o["thumbnailOverlayTimeStatusRenderer"]["style"]
                    if length_text_style == "DEFAULT":
                        length_seconds = length_text_to_seconds(length_text)
                    elif length_text_style == "LIVE":
                        live = True
                    elif length_text_style == "UPCOMING":
                        is_upcoming = True

            published = 0
            published_text = "Live now"
            premiere_timestamp = None
            view_count_text = "0 views"

            for run in v["videoInfo"]["runs"]:
                if run["text"].endswith("views"):
                    view_count_text = run["text"]
                elif len(run["text"].split(" ")) == 3 or run["text"].startswith("Streamed"):
                    published_text = run["text"]

            if published_text != "Live now":
                published = past_text_to_time(published_text)

            # TODO: I don't know what this looks like...
            if "upcomingEventData" in v:
                premiere_timestamp = v["upcomingEventData"]["startTime"]
                published_text = time_to_past_text(int(premiere_timestamp))

            # Keep the abbreviated counter (e.g. "1.2K views") before converting it to a number,
            # and initialise it up front so it is always defined when the video is appended.
            view_count_text_short = view_count_text
            if view_count_text != "0 views":
                view_count_text = uncompress_counter(view_count_text.split(" ")[0])

            videos.append({
                "type": "video",
                "title": title,
                "videoId": video_id,
                "id": video_id,
                "author": author,
                "ucid": ucid,
                "length_seconds": length_seconds,
                "lengthSeconds": length_seconds,
                "second__lengthText": length_text,
                "viewCount": view_count_text,
                "second__viewCountText": view_count_text_short,
                "second__viewCountTextShort": view_count_text_short,
                "published": published,
                "publishedText": published_text,
                "plid": plid,
                "live_now": live,
                "isUpcoming": is_upcoming,
                "premiereTimestamp": premiere_timestamp,
                "index": index
            })

    return videos
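

# Illustrative usage sketch (not part of the extractor itself): shows how fetch_playlist()
# and get_playlist_videos() might be called together. The playlist ID below is a
# placeholder, and running this requires network access plus the tools.* helpers
# imported at the top of this module.
if __name__ == "__main__":
    example_plid = "PLxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"  # placeholder playlist ID
    info = fetch_playlist(example_plid)
    print(json.dumps({key: info[key] for key in ("title", "author", "video_count")}, indent=2))

    # First page of videos (offset 0); each entry mirrors the dict built in extract_playlist_videos().
    for video in get_playlist_videos(example_plid, 0):
        print(video["index"], video["videoId"], video["title"])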