import cherrypy import dateutil.parser import requests import re, datetime, time import xml.etree.ElementTree as ET from tools.converters import * from tools.extractors import extract_yt_initial_data, eu_consent_cookie from tools.invidious_ported import browse, protodec from extractors.playlist import * from threading import Lock from cachetools import TTLCache channel_cache = TTLCache(maxsize=50, ttl=300) channel_cache_lock = Lock() channel_latest_cache = TTLCache(maxsize=500, ttl=300) channel_latest_cache_lock = Lock() def extract_channel_new(ucid, second__path="user"): cache_key = (ucid, second__path) with channel_cache_lock: if cache_key in channel_cache: return channel_cache[cache_key] yt_initial_data = browse(browseId=ucid) if yt_initial_data is None: return { "error": alert_text, "identifier": "NOT_FOUND" } for alert in yt_initial_data.get("alerts", []): alert_text = combine_runs(alert["alertRenderer"]["text"]) if alert_text == "This channel does not exist.": return { "error": alert_text, "identifier": "NOT_FOUND" } elif alert_text.startswith("This account has been terminated"): return { "error": alert_text, "identifier": "ACCOUNT_TERMINATED" } else: return { "error": alert_text, "identifier": "UNKNOWN" } # Redirect browse_redirect = try_dig(yt_initial_data, "onResponseReceivedActions", 0, "navigateAction", "endpoint", "browseEndpoint") # I dont know what to do with this... auto_generated = False if not "metadata" in yt_initial_data: auto_generated = True elif try_dig(yt_initial_data, "metadata", "channelMetadataRenderer", "musicArtistName") is not None: auto_generated = True tags = [] tab_names = [] total_views = 0 banner = None joined = None video_count = None video_count_text = None age_gate_renderer = try_dig(yt_initial_data, "contents", "twoColumnBrowseResultsRenderer", "tabs", 0, "tabRenderer", "content", "sectionListRenderer", "contents", 0, "channelAgeGateRenderer") if age_gate_renderer is not None: author = combine_runs(age_gate_renderer["channelTitle"]) newUcid = try_dig(yt_initial_data, "responseContext", "serviceTrackingParams", 0, "params", 0, "value") if newUcid is not None: ucid = newUcid author_url = "https://www.youtube.com/channel/#{}".format(ucid) author_thumbnail = try_dig(age_gate_renderer, "avatar", "thumbnails", [0], "url") banners = [] banner = None descriptionNode = None is_family_friendly = None is_age_gated = True tab_names = ["videos", "shorts", "streams"] auto_generated = False else: banners = try_dig(yt_initial_data, "header", "pageHeaderRenderer", "content", "pageHeaderViewModel", "banner", "imageBannerViewModel", "image", "sources") if banners is not None: banner = try_dig(banners, len(banners) - 1, "url") author = try_dig(yt_initial_data, "metadata", "channelMetadataRenderer", "title") author_url = try_dig(yt_initial_data, "metadata", "channelMetadataRenderer", "channelUrl") ucid = try_dig(yt_initial_data, "metadata", "channelMetadataRenderer", "externalId") descriptionNode = try_dig(yt_initial_data, "metadata", "channelMetadataRenderer", "description") tags = try_dig(yt_initial_data, "microformat", "microformatDataRenderer", "tags") is_family_friendly = try_dig(yt_initial_data, "microformat", "microformatDataRenderer", "familySafe") tabs_json = try_dig(yt_initial_data, "contents", "twoColumnBrowseResultsRenderer", "tabs") if tabs_json is not None: tab_names = [] for tab in tabs_json: name = try_dig(tab, "tabRenderer", "title") if name is not None: name = name.lower() if name == "live": name = "streams" elif name == "posts": name = "community" tab_names.append(name) # Get selected tab selected_tab = None for tab in tabs_json: is_selected = try_dig(tab, "tabRenderer", "selected") == True if is_selected: selected_tab = try_dig(tab, "tabRenderer") break about_tab = selected_tab author_thumbnail = try_dig(yt_initial_data, "metadata", "channelMetadataRenderer", "avatar") if author_thumbnail is None: author_thumbnail = try_dig(yt_initial_data, "header", "c4TabbedHeaderRenderer", "avatar") if author_thumbnail is not None: author_thumbnails = generate_full_author_thumbnails(author_thumbnail["thumbnails"]) author_thumbnail = try_dig(author_thumbnail, "thumbnails", 0, "url") allowed_regions = yt_initial_data["microformat"]["microformatDataRenderer"]["availableCountries"] description = descriptionNode #todo? sub_count = 0 sub_count_text = "0" if auto_generated: sub_count_text = None else: metadata_rows = try_dig(yt_initial_data, "header", "pageHeaderRenderer", "content", "pageHeaderViewModel", "metadata", "contentMetadataViewModel", "metadataRows") if metadata_rows is not None: for row in metadata_rows: metadata_parts = try_dig(row, "metadataParts") for part in metadata_parts: if "subscribers" in part["text"]["content"]: count = part["text"]["content"].split(" ")[0] sub_count = uncompress_counter(count) sub_count_text = count + " subscribers" break # Get some extra data using the continuation token continuation = try_dig(yt_initial_data, "header", "pageHeaderRenderer", "content", "pageHeaderViewModel", "description", "descriptionPreviewViewModel", "rendererContext", "commandContext", "onTap", "innertubeCommand", "showEngagementPanelEndpoint", "engagementPanel", "engagementPanelSectionListRenderer", "content", "sectionListRenderer", "contents", 0, "itemSectionRenderer", "contents", 0, "continuationItemRenderer", "continuationEndpoint", "continuationCommand", "token") if continuation is not None: yt_extra_data = browse(continuation=continuation) extra_data = try_dig(yt_extra_data, "onResponseReceivedEndpoints", 0, "appendContinuationItemsAction", "continuationItems", 0, "aboutChannelRenderer", "metadata", "aboutChannelViewModel") if extra_data is not None: if not auto_generated: sub_count_text = sub_count_text or extra_data["subscriberCountText"] sub_count = sub_count or uncompress_counter(sub_count_text.split(" ")[0]) total_views = total_views or int(extra_data["viewCountText"].replace(",", "").split(" ")[0]) joined = joined or time.mktime(datetime.datetime.strptime(extra_data["joinedDateText"]["content"], "Joined %b %d, %Y").timetuple()) video_count_text = extra_data["videoCountText"] video_count = video_count or uncompress_counter(video_count_text.split("videos")[0]) author_banners = [] if banner is not None: for q in [{"width": 2560, "height": 424}, {"width": 2120, "height": 351}, {"width": 1060, "height": 175}]: author_banners.append({ "url": banner.replace("=w1060-", "=w{}-".format(q["width"]), 1), "width": q["width"], "height": q["height"] }) author_banners.append({ "url": banner.split("=w1060-")[0], "width": 512, "height": 288 }) channel = { "author": author, "authorId": ucid, "authorUrl": author_url, "authorBanners": author_banners, "banner": banner, "authorThumbnails": author_thumbnails, "thumbnail": author_thumbnail, "subCount": sub_count, "second__subCountText": sub_count_text, "totalViews": total_views, "joined": joined, "paid": None, "autoGenerated": auto_generated, "ageGated": age_gate_renderer is not None, "isFamilyFriendly": is_family_friendly, "description": description, "descriptionHtml": add_html_links(escape_html_textcontent(description)) if description is not None else None, "allowedRegions": allowed_regions, "tabs": tab_names, "tags": tags, #"latestVideos": videos, "videoCount": video_count, "videoCountText": video_count_text, "relatedChannels": [] } channel["latestVideos"] = extract_channel_latest(ucid, second__path, channel=channel)#["videos"] with channel_cache_lock: channel_cache[cache_key] = channel return channel def produce_channel_content_continuation(ucid, content_type, page=1, sort_by="newest", targetId=None): # object_inner_2 = { # "2:0:embedded": { # "1:0:varint": 0 # }, # "5:varint": 50, # "6:varint": 1, # "7:varint": page * 30, # "9:varint": 1, # "10:varint": 0 # } #object_inner_2_encoded = protodec(json.dumps(object_inner_2, separators=(',', ':')), "-e") #object_inner_2_encoded = object_inner_2_encoded.split("=")[0] + "%3D" content_type_numerical = 15 if content_type == "streams": content_type_numerical = 14 elif content_type == "shorts": content_type_numerical = 10 sort_by_numerical = 1 if sort_by == "popular": sort_by_numerical = 2 elif sort_by == "oldest": sort_by_numerical = 4 object = { "80226972:embedded": { "2:string": ucid, "3:base64": { "110:embedded": { "3:embedded": { "{}:embedded".format(content_type_numerical): { "2:string": "\n${}".format(targetId), "4:varint": sort_by_numerical == 2 and 2 or 5 } } } } } } continuation = protodec(json.dumps(object, separators=(',', ':')), "-e") #continuation = continuation.split("=")[0] return continuation def extract_videos_from_initial_data(yt_initial_data, channel, content_type): content = try_dig(yt_initial_data, "contents") videoItems = None # "content" if content is not None: tabs = try_dig(content, "twoColumnBrowseResultsRenderer", "tabs") active_tab = tabs[0] for tab in tabs: if "selected" in tab["tabRenderer"] and tab["tabRenderer"]["selected"]: active_tab = tab break if content_type == "playlists": videoItems = try_dig(active_tab, "tabRenderer", "content", "sectionListRenderer", "contents", 0, "itemSectionRenderer", "contents", 0, "gridRenderer", "items") else: videoItems = try_dig(active_tab, "tabRenderer", "content", "richGridRenderer", "contents") # "response" if content is None: content = try_dig(yt_initial_data, "response") # I DONT KNOW HOW TO GET THE CONTINUATION TOKEN HERE WAHHHH if content is not None: with open("PLEASE LOOK.txt", "w") as f: f.write(content) # "onResponseReceivedActions" if content is None: content = try_dig(yt_initial_data, "onResponseReceivedActions") if content is not None: content = content[-1] videoItems = try_dig(content, "reloadContinuationItemsCommand", "continuationItems") if videoItems is None: videoItems = try_dig(content, "appendContinuationItemsAction", "continuationItems") with open("meow.txt", "w") as f: f.write(json.dumps(videoItems)) continuation = try_dig(videoItems[-1], "continuationItemRenderer", "continuationEndpoint", "continuationCommand", "token") # Parse videos videosToParse = videoItems videos = [] for v in videosToParse: if "continuationItemRenderer" in v: continue base = try_dig(v, "videoRenderer") if content_type == "playlists": base = try_dig(v, "lockupViewModel") if base is None: base = try_dig(v, "gridPlaylistRenderer") elif base is None: base = try_dig(v, "richItemRenderer", "content", content_type == "shorts" and "shortsLockupViewModel" or "videoRenderer") if content_type != "playlists" or not "lockupViewModel" in v: is_upcoming = False live = False length_text = None length_seconds = None if "thumbnailOverlays" in base: for o in base["thumbnailOverlays"]: if "thumbnailOverlayTimeStatusRenderer" in o: length_text = combine_runs(o["thumbnailOverlayTimeStatusRenderer"]["text"]) length_text_style = o["thumbnailOverlayTimeStatusRenderer"]["style"] if length_text_style == "DEFAULT": length_seconds = length_text_to_seconds(length_text) elif length_text_style == "LIVE": live = True elif length_text_style == "UPCOMING": is_upcoming = True if length_text is None and "lengthText" in base: length_text = combine_runs(base["lengthText"]) length_seconds = length_text_to_seconds(length_text) # Shorts if content_type == "shorts": title = try_dig(base, "overlayMetadata", "primaryText", "content") video_id = try_dig(base, "onTap", "innertubeCommand", "reelWatchEndpoint", "videoId") description = None description_html = None view_text = try_dig(base, "overlayMetadata", "secondaryText", "content") view_count = uncompress_counter(view_text.split(" views")[0]) view_text_short = view_text published = None published_text = None live = False is_upcoming = False # Playlists elif content_type == "playlists": if "lockupViewModel" in v: metadata = try_dig(base, "metadata", "lockupMetadataViewModel") title = try_dig(metadata, "title", "content") playlist_id = try_dig(base, "contentId") playlist_thumbnail = try_dig(base, "contentImage", "collectionThumbnailViewModel", "primaryThumbnail", "thumbnailViewModel", "image", "sources", 0, "url") video_count_text = try_dig(base, "contentImage", "collectionThumbnailViewModel", "primaryThumbnail", "thumbnailViewModel", "overlays", 0, "thumbnailOverlayBadgeViewModel", "thumbnailBadges", 0, "thumbnailBadgeViewModel", "text") if video_count_text is not None: video_count = int(video_count_text.split(" ")[0]) else: video_count = None updated_text = try_dig(metadata, "metadata", "contentMetadataViewModel", "metadataRows", 0, "metadataParts", 0, "text", "content") if updated_text is not None and updated_text.find("dated ") > -1: updated = past_text_to_time(updated_text.split("dated ")[1]) else: updated = None updated_text = None else: title = try_dig(base, "title", combine=True) playlist_id = try_dig(base, "playlistId") playlist_thumbnail = try_dig(base, "thumbnail", "thumbnails", -1, "url") video_count_text = try_dig(base, "videoCountText", combine=True) if video_count_text is not None: video_count = int(video_count_text.split(" ")[0]) else: video_count = None updated_text = None updated = None # Normal else: title = combine_runs(base["title"]) video_id = base["videoId"] description = combine_runs(base["descriptionSnippet"]) description_html = add_html_links(escape_html_textcontent(combine_runs(base["descriptionSnippet"]))) view_text = combine_runs(base["viewCountText"]) view_count = uncompress_counter(view_text.split(" ")[0]) view_text_short = combine_runs(base["shortViewCountText"]) if "shortViewCountText" in base else view_text published_text = combine_runs(base["publishedTimeText"]) published = past_text_to_time(published_text) if content_type != "playlists": videos.append({ "type": "video", "title": title, "videoId": video_id, "author": channel["author"], "authorId": channel["authorId"], "authorUrl": channel["authorUrl"], "videoThumbnails": generate_video_thumbnails(video_id), "description": description, "descriptionHtml": description_html, "viewCount": view_count, "second__viewCountText": view_text, "second__viewCountTextShort": view_text_short, "published": published, "publishedText": published_text, "lengthSeconds": length_seconds, "second__lengthText": length_text, "liveNow": live, "paid": None, "premium": None, "isUpcoming": is_upcoming }) else: videos.append({ "type": "playlist", "title": title, "playlistId": playlist_id, "playlistThumbnail": playlist_thumbnail, "author": channel["author"], "authorId": channel["authorId"], "authorUrl": channel["authorUrl"], "videoCount": video_count, "videoCountText": video_count_text, "second__videoCountText": video_count_text, "videos": [], "updatedText": updated_text, "second__updatedText": updated_text, "updated": updated }) return { (content_type == "playlists" and "playlists" or "videos"): videos, "continuation": continuation } # UULF - videos # UUSH - shorts # UULV - streams def extract_channel_videos(ucid, content_type, second__path="channel", **kwargs): channel = None if "channel" in kwargs: channel = kwargs["channel"] else: channel = extract_channel_new(ucid, second__path) if "error" in channel: return channel else: # Reads the channel like a playlist if channel["ageGated"]: return extract_channel_videos_as_playlist(ucid, content_type, second__path, kwargs) # Uses youtube's strange content sorting stuff based on channel content sorting ??? stuff i dunno else: continuation = None params = None # Videos if content_type == "videos": params = "EgZ2aWRlb3PyBgQKAjoA" # Shorts elif content_type == "shorts": params = "8gYFCgOaAQA%3D" # Streams elif content_type == "streams": params = "EgdzdHJlYW1z8gYECgJ6AA%3D%3D" if "sort_by" in kwargs and kwargs["sort_by"] != "newest": yt_initial_data = browse(browseId=ucid, params=params) tabs = try_dig(yt_initial_data, "contents", "twoColumnBrowseResultsRenderer", "tabs") active_tab = tabs[0] for tab in tabs: if "selected" in tab["tabRenderer"]: active_tab = tab break target_id = try_dig(active_tab, "tabRenderer", "content", "richGridRenderer", "targetId") continuation = produce_channel_content_continuation(channel["authorId"], content_type, 1, kwargs["sort_by"], target_id) params = None if params is not None: yt_initial_data = browse(browseId=ucid, params=params) else: yt_initial_data = browse(continuation=continuation) return extract_videos_from_initial_data(yt_initial_data, channel, content_type) def extract_channel_videos_as_playlist(ucid, content_type, second__path="channel", **kwargs): channel = extract_channel_new(ucid, second__path) if "error" in channel: return channel else: plid = channel["authorId"].replace("UC", {"videos": "UULF", "shorts": "UUSH", "streams": "UULV"}[content_type], 1) offset = 0 if "continuation" in kwargs: offset = parse_playlist_continuation(kwargs["continuation"]) videos = get_playlist_videos(plid, offset) return { "videos": videos, "continuation": len(videos) > 0 and produce_playlist_continuation(plid, len(videos) + offset) or None } #def extract_channel_latest(ucid, second__path="channel", channel=None): #return extract_channel_videos(ucid, "videos", second__path, channel=channel) # TODO: replace this with whatever youtube uses. information like video length is missing def extract_channel_latest(ucid, second__path, **kwargs): with channel_latest_cache_lock: if ucid in channel_latest_cache: return channel_latest_cache[ucid] r = requests.get("https://www.youtube.com/feeds/videos.xml?channel_id={}".format(ucid)) if r.status_code == 404: cherrypy.response.status = 404 return { "error": "This channel does not exist.", "identifier": "NOT_FOUND" } feed = ET.fromstring(r.content) author_container = feed.find("{http://www.w3.org/2005/Atom}author") author = author_container.find("{http://www.w3.org/2005/Atom}name").text author_url = author_container.find("{http://www.w3.org/2005/Atom}uri").text channel_id = feed.find("{http://www.youtube.com/xml/schemas/2015}channelId").text results = [] missing_published = False for entry in feed.findall("{http://www.w3.org/2005/Atom}entry"): id = entry.find("{http://www.youtube.com/xml/schemas/2015}videoId").text video_channel_id = entry.find("{http://www.youtube.com/xml/schemas/2015}channelId").text or channel_id if len(video_channel_id) == 22 and not video.startswith("UC"): video_channel_id = "UC" + video_channel_id media_group = entry.find("{http://search.yahoo.com/mrss/}group") description = media_group.find("{http://search.yahoo.com/mrss/}description").text or "" media_community = media_group.find("{http://search.yahoo.com/mrss/}community") published_entry = entry.find("{http://www.w3.org/2005/Atom}published") if published_entry is not None: # sometimes youtube does not provide published dates, no idea why. published = int(dateutil.parser.isoparse(published_entry.text).timestamp()) results.append({ "type": "video", "title": entry.find("{http://www.w3.org/2005/Atom}title").text, "videoId": id, "author": author, "authorId": video_channel_id, "authorUrl": author_url, "videoThumbnails": generate_video_thumbnails(id), "description": description, "descriptionHtml": add_html_links(escape_html_textcontent(description)), "viewCount": int(media_community.find("{http://search.yahoo.com/mrss/}statistics").attrib["views"]), "published": published, "publishedText": time_to_past_text(published), "lengthSeconds": None, "liveNow": None, "paid": None, "premium": None, "isUpcoming": None }) else: missing_published = True if len(results) == 0 and missing_published: # no results due to all missing published cherrypy.response.status = 503 return { "error": "YouTube did not provide published dates for any feed items. This is usually temporary - refresh in a few minutes.", "identifier": "PUBLISHED_DATES_NOT_PROVIDED" } with channel_latest_cache_lock: channel_latest_cache[ucid] = results return results def extract_channel_playlists(ucid, second__path, **kwargs): channel = extract_channel_new(ucid, second__path) if "error" in channel: return channel else: sort_by = "newest" if "sort" in kwargs: sort_by = kwargs["sort"] elif "sort_by" in kwargs: sort_by = kwargs["sort_by"] sort_by = sort_by.lower() yt_initial_data = None if "continuation" in kwargs: yt_initial_data = browse(continuation=kwargs["continuation"]) else: params = "EglwbGF5bGlzdHMYBCABMAE%3D" if sort_by == "newest" or sort_by == "newest_created": params = "EglwbGF5bGlzdHMYAyABMAE%3D" yt_initial_data = browse(browseId=ucid, params=params) with open("meow.json", "w") as f: f.write(json.dumps(yt_initial_data)) return extract_videos_from_initial_data(yt_initial_data, channel, "playlists")