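"""Playlist extraction for the ported Invidious backend.

Builds and parses protobuf continuation tokens ("ctokens") and fetches
playlist metadata and videos through the innertube browse endpoint.
"""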
from tools.converters import *
# `next` is the innertube watch-page endpoint used by get_playlist_videos();
# it is assumed to be exported by tools.invidious_ported alongside
# browse/protodec (and intentionally shadows the builtin here).
from tools.invidious_ported import browse, next, protodec
import json, re, datetime, time
from threading import Lock
from cachetools import TTLCache

# Playlist metadata is cached for five minutes so repeated paging requests
# don't refetch it.
playlist_cache = TTLCache(maxsize=50, ttl=300)
playlist_cache_lock = Lock()
|
|
def produce_playlist_continuation(id, index):
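    """Build a continuation token ("ctoken") that resumes playlist `id` at
    video number `index`.

    The token is the protobuf structure YouTube's browse endpoint expects,
    assembled as protodec-style JSON and base64-encoded via protodec.
    """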
    # Channel IDs (UC...) map to the channel's uploads playlist (UU...).
    if id.startswith("UC"):
        id = "UU" + id.split("UC", 1)[1]
    plid = "VL" + id

    # Playlists are served in pages of 100 videos.
    request_count = index // 100

    data = {"1:varint": index}
    data = protodec(json.dumps(data, separators=(',', ':')), "-e")
    data = data.split("=")[0]  # drop base64 padding

    obj = {
        "80226972:embedded": {
            "2:string": plid,
            "3:base64": {
                "1:varint": request_count,
                "15:string": "PT:{}".format(data),
                "104:embedded": {"1:0:varint": 0},
            },
            "35:string": id,
        }
    }
    obj = json.dumps(obj, separators=(',', ':'))

    continuation = protodec(obj, "-e")
    continuation = continuation.split("=")[0]  # drop base64 padding
    return continuation
|
|
def parse_playlist_continuation(continuation):
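    """Recover the video index embedded in a continuation token; the inverse
    of produce_playlist_continuation."""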
    obj = protodec(continuation, "-db")
    obj = json.loads(obj)

    data = obj["80226972:0:embedded"]["3:1:base64"]["15:1:string"]
    data = data.split("PT:")[1]
    data = protodec(data, "-db")
    data = json.loads(data)
    return data["1:0:varint"]
|
|
def fetch_playlist(plid):
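    """Fetch metadata for `plid` (title, author, description, counts),
    consulting the five-minute cache first. Channel IDs are mapped to their
    uploads playlist."""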
    if plid.startswith("UC"):
        plid = "UU" + plid.split("UC", 1)[1]

    with playlist_cache_lock:
        if plid in playlist_cache:
            return playlist_cache[plid]

    yt_initial_data = browse(browseId="VL" + plid)

    playlist_sidebar_renderer = try_dig(yt_initial_data, "sidebar", "playlistSidebarRenderer", "items")
    if playlist_sidebar_renderer is None:
        raise Exception("Could not extract playlistSidebarRenderer.")

    playlist_info = try_dig(playlist_sidebar_renderer, 0, "playlistSidebarPrimaryInfoRenderer")
    if playlist_info is None:
        raise Exception("Could not extract playlist info.")

    title = try_dig(playlist_info, "title", "runs", 0, "text")
    desc_item = playlist_info.get("description")
    if desc_item is not None:
        description_txt = combine_runs(desc_item)
        description_html = add_html_links(escape_html_textcontent(description_txt))
    else:
        description_txt = None
        description_html = None

    thumbnail = try_dig(playlist_info, "thumbnailRenderer", "playlistVideoThumbnailRenderer", "thumbnail", "thumbnails", 0, "url")

    views = 0
    updated = 0
    video_count = 0

    subtitle = try_dig(yt_initial_data, "header", "playlist", "subtitle", combine=True)

    stats = playlist_info.get("stats")
    if stats is not None:
        for stat in stats:
            text = combine_runs(stat)
            if text is None:
                continue

            if "video" in text or "episode" in text:
                video_count = int(re.sub(r"\D", "", text))
            elif "view" in text:
                views = int(re.sub(r"\D", "", text))
            elif "updated" in text.lower():
                if "Last updated on" in text:
                    updated = time.mktime(datetime.datetime.strptime(text, "Last updated on %b %d, %Y").timetuple())
                else:
                    # Relative forms like "Updated 3 days ago".
                    updated = past_text_to_time(text.split("Updated ")[1])

    if len(playlist_sidebar_renderer) < 2:
        author = None
        author_thumbnail = None
        ucid = None
    else:
        author_info = try_dig(playlist_sidebar_renderer, 1, "playlistSidebarSecondaryInfoRenderer", "videoOwner", "videoOwnerRenderer")
        if author_info is None:
            raise Exception("Could not extract author info.")

        author = try_dig(author_info, "title", "runs", 0, "text")
        author_thumbnail = try_dig(author_info, "thumbnail", "thumbnails", 0, "url")
        ucid = try_dig(author_info, "title", "runs", 0, "navigationEndpoint", "browseEndpoint", "browseId")

    playlist = {
        "title": title,
        "id": plid,
        "author": author,
        "author_thumbnail": author_thumbnail,
        "ucid": ucid,
        "description": description_txt,
        "description_html": description_html,
        "video_count": video_count,
        "views": views,
        "updated": updated,
        "thumbnail": thumbnail,
        "subtitle": subtitle
    }

    with playlist_cache_lock:
        playlist_cache[plid] = playlist

    return playlist
|
|
def get_playlist_videos(plid, offset, videoId=None):
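    """Return up to 200 videos of `plid` starting at `offset`. If `videoId`
    is given, the offset is corrected to that video's actual position as
    reported by the watch page."""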
    pl = fetch_playlist(plid)

    if offset >= pl["video_count"] or offset < 0:
        return []

    if videoId is not None:
        # The watch page ("next" endpoint) reveals the video's true index,
        # which may differ from the caller's guess.
        yt_initial_data = next(data={"videoId": videoId, "playlistId": pl["id"]})
        new_offset = try_dig(yt_initial_data, "contents", "twoColumnWatchNextResults", "playlist", "playlist", "currentIndex")
        if new_offset is not None:
            offset = new_offset

    videos = []

    while True:
        ctoken = produce_playlist_continuation(pl["id"], offset)
        yt_initial_data = browse(continuation=ctoken)
        for video in extract_playlist_videos(yt_initial_data):
            videos.append(video)

        offset = offset + 100

        # Each continuation yields up to 100 videos; stop after 200 or once
        # the playlist is exhausted.
        if len(videos) >= 200 or len(videos) == pl["video_count"] or offset >= pl["video_count"]:
            break

    return videos
|
|
def extract_playlist_videos(yt_initial_data):
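    """Flatten a browse response (initial playlist page or a continuation)
    into a list of video dicts in the Invidious format."""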
    videos = []

    if "contents" in yt_initial_data:
        tabs = yt_initial_data["contents"]["twoColumnBrowseResultsRenderer"]["tabs"]
        tabs_renderer = {}
        for tab in tabs:
            if tab["tabRenderer"]["selected"] and ("contents" in tab["tabRenderer"] or "content" in tab["tabRenderer"]):
                tabs_renderer = tab["tabRenderer"]
                break

        if "contents" in tabs_renderer or "content" in tabs_renderer:
            tab_contents = tabs_renderer["contents"] if "contents" in tabs_renderer else tabs_renderer["content"]

            list_renderer = tab_contents["sectionListRenderer"]["contents"][0]
            item_renderer = list_renderer["itemSectionRenderer"]["contents"][0]
            contents = item_renderer["playlistVideoListRenderer"]["contents"]
        else:
            contents = try_dig(yt_initial_data, "onResponseReceivedActions", 0, "appendContinuationItemsAction", "continuationItems")
    else:
        contents = try_dig(yt_initial_data, "response", "continuationContents", "playlistVideoListContinuation", "contents")

    if contents is not None:
        for v in contents:
            if "playlistVideoRenderer" not in v:
                continue
            v = v["playlistVideoRenderer"]
            video_id = v["navigationEndpoint"]["watchEndpoint"]["videoId"]
            plid = v["navigationEndpoint"]["watchEndpoint"]["playlistId"]
            index = v["navigationEndpoint"]["watchEndpoint"]["index"]

            title = try_dig(v, "title", combine=True)
            author = try_dig(v, "shortBylineText", "runs", 0, "text")
            ucid = try_dig(v, "shortBylineText", "runs", 0, "navigationEndpoint", "browseEndpoint", "browseId")
            length_seconds = v.get("lengthSeconds")
            live = False

            is_upcoming = False
            length_text = "UNKNOWN"
            if length_seconds is None:
                # No length: assume live until the thumbnail overlay says otherwise.
                live = True
                length_seconds = 0
                for o in v["thumbnailOverlays"]:
                    if "thumbnailOverlayTimeStatusRenderer" in o:
                        length_text = combine_runs(o["thumbnailOverlayTimeStatusRenderer"]["text"])
                        length_text_style = o["thumbnailOverlayTimeStatusRenderer"]["style"]
                        if length_text_style == "DEFAULT":
                            length_seconds = length_text_to_seconds(length_text)
                        elif length_text_style == "LIVE":
                            live = True
                        elif length_text_style == "UPCOMING":
                            is_upcoming = True

            published = 0
            published_text = "Live now"
            premiere_timestamp = None
            view_count_text = "0 views"
            view_count_text_short = "0 views"  # fallback so the dict below never sees an unbound name
            for run in v["videoInfo"]["runs"]:
                if run["text"].endswith("views"):
                    view_count_text = run["text"]
                elif len(run["text"].split(" ")) == 3 or run["text"].startswith("Streamed"):
                    published_text = run["text"]
            if published_text != "Live now":
                published = past_text_to_time(published_text)
            # TODO i dont know what this looks like...
            if "upcomingEventData" in v:
                premiere_timestamp = v["upcomingEventData"]["startTime"]
                published_text = time_to_past_text(int(premiere_timestamp))
            if view_count_text != "0 views":
                view_count_text_short = view_count_text
                view_count_text = uncompress_counter(view_count_text.split(" ")[0])

            videos.append({
                "type": "video",
                "title": title,
                "videoId": video_id,
                "id": video_id,
                "author": author,
                "ucid": ucid,
                "length_seconds": length_seconds,
                "lengthSeconds": length_seconds,
                "second__lengthText": length_text,
                "viewCount": view_count_text,
                "second__viewCountText": view_count_text_short,
                "second__viewCountTextShort": view_count_text_short,
                "published": published,
                "publishedText": published_text,
                "plid": plid,
                "live_now": live,
                "isUpcoming": is_upcoming,
                "premiereTimestamp": premiere_timestamp,
                "index": index
            })

    return videos
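
if __name__ == "__main__":
    # Minimal sanity sketch: a token produced for a given index should parse
    # back to that index. Only protodec is exercised, no network. The playlist
    # ID below is a placeholder, not a real playlist, and this assumes
    # protodec accepts tokens whose base64 padding has been stripped.
    _plid = "PLxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"  # placeholder ID
    _token = produce_playlist_continuation(_plid, 200)
    assert parse_playlist_continuation(_token) == 200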