# newleaf/extractors/channel.py

import cherrypy
import dateutil.parser
import requests
import re, datetime, time, json
import xml.etree.ElementTree as ET
from tools.converters import *
from tools.extractors import extract_yt_initial_data, eu_consent_cookie
from tools.invidious_ported import browse, protodec
from extractors.playlist import *
from threading import Lock
from cachetools import TTLCache
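# Channel metadata and latest-video lookups are cached for five minutes to
# avoid refetching the same channel on every request.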
channel_cache = TTLCache(maxsize=50, ttl=300)
channel_cache_lock = Lock()
channel_latest_cache = TTLCache(maxsize=500, ttl=300)
channel_latest_cache_lock = Lock()
def extract_channel_new(ucid, second__path="user"):
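	"""Fetch a channel's metadata via the InnerTube browse endpoint.

	Returns an Invidious-style channel dict, or an error dict with an
	"identifier" field when the channel is missing, terminated, or otherwise
	unavailable.
	"""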
cache_key = (ucid, second__path)
with channel_cache_lock:
if cache_key in channel_cache:
return channel_cache[cache_key]
yt_initial_data = browse(browseId=ucid)
	if yt_initial_data is None:
		return {
			"error": "This channel does not exist.",
			"identifier": "NOT_FOUND"
		}
for alert in yt_initial_data.get("alerts", []):
alert_text = combine_runs(alert["alertRenderer"]["text"])
if alert_text == "This channel does not exist.":
return {
"error": alert_text,
"identifier": "NOT_FOUND"
}
elif alert_text.startswith("This account has been terminated"):
return {
"error": alert_text,
"identifier": "ACCOUNT_TERMINATED"
}
else:
return {
"error": alert_text,
"identifier": "UNKNOWN"
}
# Redirect
browse_redirect = try_dig(yt_initial_data, "onResponseReceivedActions", 0, "navigateAction", "endpoint", "browseEndpoint")
	# I don't know what to do with this...
auto_generated = False
if not "metadata" in yt_initial_data:
auto_generated = True
elif try_dig(yt_initial_data, "metadata", "channelMetadataRenderer", "musicArtistName") is not None:
auto_generated = True
	tags = []
	tab_names = []
	total_views = 0
	banner = None
	joined = None
	video_count = None
	video_count_text = None
	author_thumbnails = []
age_gate_renderer = try_dig(yt_initial_data, "contents", "twoColumnBrowseResultsRenderer", "tabs", 0, "tabRenderer", "content", "sectionListRenderer", "contents", 0, "channelAgeGateRenderer")
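	# Age-restricted channels return a channelAgeGateRenderer in place of the
	# usual header and metadata, so only a reduced set of fields is available.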
if age_gate_renderer is not None:
author = combine_runs(age_gate_renderer["channelTitle"])
newUcid = try_dig(yt_initial_data, "responseContext", "serviceTrackingParams", 0, "params", 0, "value")
if newUcid is not None:
ucid = newUcid
author_url = "https://www.youtube.com/channel/#{}".format(ucid)
author_thumbnail = try_dig(age_gate_renderer, "avatar", "thumbnails", [0], "url")
banners = []
banner = None
descriptionNode = None
is_family_friendly = None
is_age_gated = True
tab_names = ["videos", "shorts", "streams"]
auto_generated = False
else:
banners = try_dig(yt_initial_data, "header", "pageHeaderRenderer", "content", "pageHeaderViewModel", "banner", "imageBannerViewModel", "image", "sources")
if banners is not None:
banner = try_dig(banners, len(banners) - 1, "url")
author = try_dig(yt_initial_data, "metadata", "channelMetadataRenderer", "title")
author_url = try_dig(yt_initial_data, "metadata", "channelMetadataRenderer", "channelUrl")
ucid = try_dig(yt_initial_data, "metadata", "channelMetadataRenderer", "externalId")
descriptionNode = try_dig(yt_initial_data, "metadata", "channelMetadataRenderer", "description")
tags = try_dig(yt_initial_data, "microformat", "microformatDataRenderer", "tags")
is_family_friendly = try_dig(yt_initial_data, "microformat", "microformatDataRenderer", "familySafe")
tabs_json = try_dig(yt_initial_data, "contents", "twoColumnBrowseResultsRenderer", "tabs")
if tabs_json is not None:
tab_names = []
for tab in tabs_json:
name = try_dig(tab, "tabRenderer", "title")
if name is not None:
name = name.lower()
if name == "live":
name = "streams"
elif name == "posts":
name = "community"
tab_names.append(name)
# Get selected tab
selected_tab = None
for tab in tabs_json:
				is_selected = try_dig(tab, "tabRenderer", "selected") is True
if is_selected:
selected_tab = try_dig(tab, "tabRenderer")
break
about_tab = selected_tab
author_thumbnail = try_dig(yt_initial_data, "metadata", "channelMetadataRenderer", "avatar")
if author_thumbnail is None:
author_thumbnail = try_dig(yt_initial_data, "header", "c4TabbedHeaderRenderer", "avatar")
if author_thumbnail is not None:
author_thumbnails = generate_full_author_thumbnails(author_thumbnail["thumbnails"])
author_thumbnail = try_dig(author_thumbnail, "thumbnails", 0, "url")
	allowed_regions = try_dig(yt_initial_data, "microformat", "microformatDataRenderer", "availableCountries")
description = descriptionNode #todo?
sub_count = 0
sub_count_text = "0"
if auto_generated:
sub_count_text = None
else:
metadata_rows = try_dig(yt_initial_data, "header", "pageHeaderRenderer", "content", "pageHeaderViewModel", "metadata", "contentMetadataViewModel", "metadataRows")
if metadata_rows is not None:
for row in metadata_rows:
metadata_parts = try_dig(row, "metadataParts")
				for part in metadata_parts or []:
if "subscribers" in part["text"]["content"]:
count = part["text"]["content"].split(" ")[0]
sub_count = uncompress_counter(count)
sub_count_text = count + " subscribers"
break
# Get some extra data using the continuation token
continuation = try_dig(yt_initial_data, "header", "pageHeaderRenderer", "content", "pageHeaderViewModel", "description", "descriptionPreviewViewModel", "rendererContext", "commandContext", "onTap", "innertubeCommand", "showEngagementPanelEndpoint", "engagementPanel", "engagementPanelSectionListRenderer", "content", "sectionListRenderer", "contents", 0, "itemSectionRenderer", "contents", 0, "continuationItemRenderer", "continuationEndpoint", "continuationCommand", "token")
if continuation is not None:
yt_extra_data = browse(continuation=continuation)
extra_data = try_dig(yt_extra_data, "onResponseReceivedEndpoints", 0, "appendContinuationItemsAction", "continuationItems", 0, "aboutChannelRenderer", "metadata", "aboutChannelViewModel")
if extra_data is not None:
if not auto_generated:
sub_count_text = sub_count_text or extra_data["subscriberCountText"]
sub_count = sub_count or uncompress_counter(sub_count_text.split(" ")[0])
total_views = total_views or int(extra_data["viewCountText"].replace(",", "").split(" ")[0])
joined = joined or time.mktime(datetime.datetime.strptime(extra_data["joinedDateText"]["content"], "Joined %b %d, %Y").timetuple())
video_count_text = extra_data["videoCountText"]
video_count = video_count or uncompress_counter(video_count_text.split("videos")[0])
author_banners = []
if banner is not None:
for q in [{"width": 2560, "height": 424}, {"width": 2120, "height": 351}, {"width": 1060, "height": 175}]:
author_banners.append({
"url": banner.replace("=w1060-", "=w{}-".format(q["width"]), 1),
"width": q["width"],
"height": q["height"]
})
author_banners.append({
"url": banner.split("=w1060-")[0],
"width": 512,
"height": 288
})
channel = {
"author": author,
"authorId": ucid,
"authorUrl": author_url,
"authorBanners": author_banners,
"banner": banner,
"authorThumbnails": author_thumbnails,
"thumbnail": author_thumbnail,
"subCount": sub_count,
"second__subCountText": sub_count_text,
"totalViews": total_views,
"joined": joined,
"paid": None,
"autoGenerated": auto_generated,
"ageGated": age_gate_renderer is not None,
"isFamilyFriendly": is_family_friendly,
"description": description,
"descriptionHtml": add_html_links(escape_html_textcontent(description)) if description is not None else None,
"allowedRegions": allowed_regions,
"tabs": tab_names,
"tags": tags,
#"latestVideos": videos,
"videoCount": video_count,
"videoCountText": video_count_text,
"relatedChannels": []
}
channel["latestVideos"] = extract_channel_latest(ucid, second__path, channel=channel)#["videos"]
with channel_cache_lock:
channel_cache[cache_key] = channel
return channel
def produce_channel_content_continuation(ucid, content_type, page=1, sort_by="newest", targetId=None):
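	"""Build a protobuf continuation token for a channel content tab.

	The nested dict mirrors the protobuf message YouTube expects; protodec
	encodes it into the base64 token consumed by the browse endpoint.
	"""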
# object_inner_2 = {
# "2:0:embedded": {
# "1:0:varint": 0
# },
# "5:varint": 50,
# "6:varint": 1,
# "7:varint": page * 30,
# "9:varint": 1,
# "10:varint": 0
# }
#object_inner_2_encoded = protodec(json.dumps(object_inner_2, separators=(',', ':')), "-e")
#object_inner_2_encoded = object_inner_2_encoded.split("=")[0] + "%3D"
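	# The field number inside the "3:embedded" message selects the tab:
	# 15 = videos, 14 = streams, 10 = shorts.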
content_type_numerical = 15
if content_type == "streams":
content_type_numerical = 14
elif content_type == "shorts":
content_type_numerical = 10
sort_by_numerical = 1
if sort_by == "popular":
sort_by_numerical = 2
elif sort_by == "oldest":
sort_by_numerical = 4
	request_object = {
		"80226972:embedded": {
			"2:string": ucid,
			"3:base64": {
				"110:embedded": {
					"3:embedded": {
						"{}:embedded".format(content_type_numerical): {
							"2:string": "\n${}".format(targetId),
							"4:varint": 2 if sort_by_numerical == 2 else 5
						}
					}
				}
			}
		}
	}
	continuation = protodec(json.dumps(request_object, separators=(',', ':')), "-e")
#continuation = continuation.split("=")[0]
return continuation
def extract_videos_from_initial_data(yt_initial_data, channel, content_type):
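	"""Pull video or playlist entries out of a browse response.

	Handles both a full page ("contents") and continuation responses
	("onResponseReceivedActions"), and returns the parsed items together with
	the next continuation token, if any.
	"""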
content = try_dig(yt_initial_data, "contents")
videoItems = None
# "content"
if content is not None:
tabs = try_dig(content, "twoColumnBrowseResultsRenderer", "tabs")
active_tab = tabs[0]
for tab in tabs:
if "selected" in tab["tabRenderer"] and tab["tabRenderer"]["selected"]:
active_tab = tab
break
if content_type == "playlists":
videoItems = try_dig(active_tab, "tabRenderer", "content", "sectionListRenderer", "contents", 0, "itemSectionRenderer", "contents", 0, "gridRenderer", "items")
else:
videoItems = try_dig(active_tab, "tabRenderer", "content", "richGridRenderer", "contents")
# "response"
if content is None:
content = try_dig(yt_initial_data, "response")
# I DONT KNOW HOW TO GET THE CONTINUATION TOKEN HERE WAHHHH
if content is not None:
with open("PLEASE LOOK.txt", "w") as f:
f.write(content)
# "onResponseReceivedActions"
if content is None:
content = try_dig(yt_initial_data, "onResponseReceivedActions")
if content is not None:
content = content[-1]
videoItems = try_dig(content, "reloadContinuationItemsCommand", "continuationItems")
if videoItems is None:
videoItems = try_dig(content, "appendContinuationItemsAction", "continuationItems")
with open("meow.txt", "w") as f:
f.write(json.dumps(videoItems))
	continuation = try_dig(videoItems[-1], "continuationItemRenderer", "continuationEndpoint", "continuationCommand", "token") if videoItems else None
	# Parse videos
	videosToParse = videoItems or []
videos = []
for v in videosToParse:
if "continuationItemRenderer" in v:
continue
base = try_dig(v, "videoRenderer")
if content_type == "playlists":
base = try_dig(v, "lockupViewModel")
if base is None:
base = try_dig(v, "gridPlaylistRenderer")
elif base is None:
base = try_dig(v, "richItemRenderer", "content", content_type == "shorts" and "shortsLockupViewModel" or "videoRenderer")
if content_type != "playlists" or not "lockupViewModel" in v:
is_upcoming = False
live = False
length_text = None
length_seconds = None
if "thumbnailOverlays" in base:
for o in base["thumbnailOverlays"]:
if "thumbnailOverlayTimeStatusRenderer" in o:
length_text = combine_runs(o["thumbnailOverlayTimeStatusRenderer"]["text"])
length_text_style = o["thumbnailOverlayTimeStatusRenderer"]["style"]
if length_text_style == "DEFAULT":
length_seconds = length_text_to_seconds(length_text)
elif length_text_style == "LIVE":
live = True
elif length_text_style == "UPCOMING":
is_upcoming = True
if length_text is None and "lengthText" in base:
length_text = combine_runs(base["lengthText"])
length_seconds = length_text_to_seconds(length_text)
# Shorts
if content_type == "shorts":
title = try_dig(base, "overlayMetadata", "primaryText", "content")
video_id = try_dig(base, "onTap", "innertubeCommand", "reelWatchEndpoint", "videoId")
description = None
description_html = None
view_text = try_dig(base, "overlayMetadata", "secondaryText", "content")
view_count = uncompress_counter(view_text.split(" views")[0])
view_text_short = view_text
published = None
published_text = None
live = False
is_upcoming = False
# Playlists
elif content_type == "playlists":
if "lockupViewModel" in v:
metadata = try_dig(base, "metadata", "lockupMetadataViewModel")
title = try_dig(metadata, "title", "content")
playlist_id = try_dig(base, "contentId")
playlist_thumbnail = try_dig(base, "contentImage", "collectionThumbnailViewModel", "primaryThumbnail", "thumbnailViewModel", "image", "sources", 0, "url")
video_count_text = try_dig(base, "contentImage", "collectionThumbnailViewModel", "primaryThumbnail", "thumbnailViewModel", "overlays", 0, "thumbnailOverlayBadgeViewModel", "thumbnailBadges", 0, "thumbnailBadgeViewModel", "text")
if video_count_text is not None:
					video_count = int(video_count_text.split(" ")[0].replace(",", ""))
else:
video_count = None
updated_text = try_dig(metadata, "metadata", "contentMetadataViewModel", "metadataRows", 0, "metadataParts", 0, "text", "content")
if updated_text is not None and updated_text.find("dated ") > -1:
updated = past_text_to_time(updated_text.split("dated ")[1])
else:
updated = None
updated_text = None
else:
title = try_dig(base, "title", combine=True)
playlist_id = try_dig(base, "playlistId")
playlist_thumbnail = try_dig(base, "thumbnail", "thumbnails", -1, "url")
video_count_text = try_dig(base, "videoCountText", combine=True)
if video_count_text is not None:
					video_count = int(video_count_text.split(" ")[0].replace(",", ""))
else:
video_count = None
updated_text = None
updated = None
# Normal
else:
title = combine_runs(base["title"])
video_id = base["videoId"]
description = combine_runs(base["descriptionSnippet"])
description_html = add_html_links(escape_html_textcontent(combine_runs(base["descriptionSnippet"])))
view_text = combine_runs(base["viewCountText"])
view_count = uncompress_counter(view_text.split(" ")[0])
view_text_short = combine_runs(base["shortViewCountText"]) if "shortViewCountText" in base else view_text
published_text = combine_runs(base["publishedTimeText"])
published = past_text_to_time(published_text)
if content_type != "playlists":
videos.append({
"type": "video",
"title": title,
"videoId": video_id,
"author": channel["author"],
"authorId": channel["authorId"],
"authorUrl": channel["authorUrl"],
"videoThumbnails": generate_video_thumbnails(video_id),
"description": description,
"descriptionHtml": description_html,
"viewCount": view_count,
"second__viewCountText": view_text,
"second__viewCountTextShort": view_text_short,
"published": published,
"publishedText": published_text,
"lengthSeconds": length_seconds,
"second__lengthText": length_text,
"liveNow": live,
"paid": None,
"premium": None,
"isUpcoming": is_upcoming
})
else:
videos.append({
"type": "playlist",
"title": title,
"playlistId": playlist_id,
"playlistThumbnail": playlist_thumbnail,
"author": channel["author"],
"authorId": channel["authorId"],
"authorUrl": channel["authorUrl"],
"videoCount": video_count,
"videoCountText": video_count_text,
"second__videoCountText": video_count_text,
"videos": [],
"updatedText": updated_text,
"second__updatedText": updated_text,
"updated": updated
})
return {
(content_type == "playlists" and "playlists" or "videos"): videos,
"continuation": continuation
}
# UULF - videos
# UUSH - shorts
# UULV - streams
def extract_channel_videos(ucid, content_type, second__path="channel", **kwargs):
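	"""List a channel's videos, shorts, or streams.

	Age-gated channels are read through their auto-generated playlists
	(UULF/UUSH/UULV); everything else goes through the browse endpoint,
	optionally with a custom sort continuation.
	"""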
channel = None
if "channel" in kwargs:
channel = kwargs["channel"]
else:
channel = extract_channel_new(ucid, second__path)
if "error" in channel:
return channel
else:
# Reads the channel like a playlist
if channel["ageGated"]:
			return extract_channel_videos_as_playlist(ucid, content_type, second__path, **kwargs)
# Uses youtube's strange content sorting stuff based on channel content sorting ??? stuff i dunno
else:
continuation = None
params = None
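			# Pre-encoded protobuf params strings that select the matching channel tab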
# Videos
if content_type == "videos":
params = "EgZ2aWRlb3PyBgQKAjoA"
# Shorts
elif content_type == "shorts":
params = "8gYFCgOaAQA%3D"
# Streams
elif content_type == "streams":
params = "EgdzdHJlYW1z8gYECgJ6AA%3D%3D"
if "sort_by" in kwargs and kwargs["sort_by"] != "newest":
yt_initial_data = browse(browseId=ucid, params=params)
tabs = try_dig(yt_initial_data, "contents", "twoColumnBrowseResultsRenderer", "tabs")
active_tab = tabs[0]
for tab in tabs:
if "selected" in tab["tabRenderer"]:
active_tab = tab
break
target_id = try_dig(active_tab, "tabRenderer", "content", "richGridRenderer", "targetId")
continuation = produce_channel_content_continuation(channel["authorId"], content_type, 1, kwargs["sort_by"], target_id)
params = None
if params is not None:
yt_initial_data = browse(browseId=ucid, params=params)
else:
yt_initial_data = browse(continuation=continuation)
return extract_videos_from_initial_data(yt_initial_data, channel, content_type)
def extract_channel_videos_as_playlist(ucid, content_type, second__path="channel", **kwargs):
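	"""Read a channel tab through its auto-generated UULF/UUSH/UULV playlist.

	Used for age-gated channels, where the normal browse response is not
	available.
	"""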
channel = extract_channel_new(ucid, second__path)
if "error" in channel:
return channel
else:
plid = channel["authorId"].replace("UC", {"videos": "UULF", "shorts": "UUSH", "streams": "UULV"}[content_type], 1)
offset = 0
if "continuation" in kwargs:
offset = parse_playlist_continuation(kwargs["continuation"])
videos = get_playlist_videos(plid, offset)
return {
"videos": videos,
"continuation": len(videos) > 0 and produce_playlist_continuation(plid, len(videos) + offset) or None
}
#def extract_channel_latest(ucid, second__path="channel", channel=None):
#return extract_channel_videos(ucid, "videos", second__path, channel=channel)
# TODO: replace this with whatever youtube uses. information like video length is missing
def extract_channel_latest(ucid, second__path, **kwargs):
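	"""Fetch a channel's latest uploads from its public RSS feed.

	The feed lacks some fields (such as video length), so those are returned
	as None. Results are cached briefly.
	"""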
with channel_latest_cache_lock:
if ucid in channel_latest_cache:
return channel_latest_cache[ucid]
r = requests.get("https://www.youtube.com/feeds/videos.xml?channel_id={}".format(ucid))
if r.status_code == 404:
cherrypy.response.status = 404
return {
"error": "This channel does not exist.",
"identifier": "NOT_FOUND"
}
feed = ET.fromstring(r.content)
author_container = feed.find("{http://www.w3.org/2005/Atom}author")
author = author_container.find("{http://www.w3.org/2005/Atom}name").text
author_url = author_container.find("{http://www.w3.org/2005/Atom}uri").text
channel_id = feed.find("{http://www.youtube.com/xml/schemas/2015}channelId").text
results = []
missing_published = False
for entry in feed.findall("{http://www.w3.org/2005/Atom}entry"):
id = entry.find("{http://www.youtube.com/xml/schemas/2015}videoId").text
video_channel_id = entry.find("{http://www.youtube.com/xml/schemas/2015}channelId").text or channel_id
		if len(video_channel_id) == 22 and not video_channel_id.startswith("UC"):
video_channel_id = "UC" + video_channel_id
media_group = entry.find("{http://search.yahoo.com/mrss/}group")
description = media_group.find("{http://search.yahoo.com/mrss/}description").text or ""
media_community = media_group.find("{http://search.yahoo.com/mrss/}community")
published_entry = entry.find("{http://www.w3.org/2005/Atom}published")
if published_entry is not None: # sometimes youtube does not provide published dates, no idea why.
published = int(dateutil.parser.isoparse(published_entry.text).timestamp())
results.append({
"type": "video",
"title": entry.find("{http://www.w3.org/2005/Atom}title").text,
"videoId": id,
"author": author,
"authorId": video_channel_id,
"authorUrl": author_url,
"videoThumbnails": generate_video_thumbnails(id),
"description": description,
"descriptionHtml": add_html_links(escape_html_textcontent(description)),
"viewCount": int(media_community.find("{http://search.yahoo.com/mrss/}statistics").attrib["views"]),
"published": published,
"publishedText": time_to_past_text(published),
"lengthSeconds": None,
"liveNow": None,
"paid": None,
"premium": None,
"isUpcoming": None
})
else:
missing_published = True
if len(results) == 0 and missing_published: # no results due to all missing published
cherrypy.response.status = 503
return {
"error": "YouTube did not provide published dates for any feed items. This is usually temporary - refresh in a few minutes.",
"identifier": "PUBLISHED_DATES_NOT_PROVIDED"
}
with channel_latest_cache_lock:
channel_latest_cache[ucid] = results
return results
def extract_channel_playlists(ucid, second__path, **kwargs):
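	"""List a channel's playlists, with optional sorting and continuation."""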
channel = extract_channel_new(ucid, second__path)
if "error" in channel:
return channel
else:
sort_by = "newest"
if "sort" in kwargs:
sort_by = kwargs["sort"]
elif "sort_by" in kwargs:
sort_by = kwargs["sort_by"]
sort_by = sort_by.lower()
yt_initial_data = None
if "continuation" in kwargs:
yt_initial_data = browse(continuation=kwargs["continuation"])
else:
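			# Pre-encoded protobuf params strings selecting the playlists tab;
			# the second is used when sorting by most recently created.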
params = "EglwbGF5bGlzdHMYBCABMAE%3D"
if sort_by == "newest" or sort_by == "newest_created":
params = "EglwbGF5bGlzdHMYAyABMAE%3D"
yt_initial_data = browse(browseId=ucid, params=params)
with open("meow.json", "w") as f:
f.write(json.dumps(yt_initial_data))
return extract_videos_from_initial_data(yt_initial_data, channel, "playlists")