# newleaf/extractors/comments.py

import json
import requests
import urllib.parse
from tools.converters import *
from tools.extractors import extract_yt_initial_data, extract_yt_cfg, eu_consent_cookie
from tools.invidious_ported import next
# TODO: support extracting comments from continuation
def extract_comments(id, **kwargs):
    """Fetch the first page of comments for a YouTube video.

    Loads the watch page to read the comment section's continuation token,
    requests that continuation through ``next`` and converts the result into
    a response dict.

    Args:
        id: The video ID; it is interpolated into the watch page URL and
            echoed back as ``videoId``.
        **kwargs: Accepted for interface compatibility; not read here.

    Returns:
        A dict with ``commentCount``, ``videoId``, ``comments`` and, when the
        continuation response carries a further token, ``continuation``.
        When no comment items are found the result is only
        ``{"commentCount": 0, "comments": []}``.

    Raises:
        requests.HTTPError: If the watch page request returns an error status.
        KeyError, IndexError: If the watch page data does not have the
            expected layout (the token is read with direct indexing).
    """
    # The session is only needed for the watch page, so it is closed as soon
    # as the page has been parsed.
    with requests.session() as s:
        s.headers.update({"accept-language": "en-US,en;q=0.9"})
        s.cookies.set("CONSENT", eu_consent_cookie().get("CONSENT"))
        with s.get("https://www.youtube.com/watch?v={}".format(id)) as r:
            r.raise_for_status()
            yt_initial_data = extract_yt_initial_data(r.content.decode("utf8"))

    # NOTE(review): the comment section is taken from a fixed position (third
    # entry) of the watch page results; confirm this still matches the page.
    item = yt_initial_data["contents"]["twoColumnWatchNextResults"]["results"]["results"]["contents"][2]["itemSectionRenderer"]
    continuation = item["contents"][0]["continuationItemRenderer"]["continuationEndpoint"]["continuationCommand"]["token"]

    yt_initial_data = next(continuation=continuation)

    # Locate the comment items and the header in whichever of the two
    # response layouts was returned.
    contents = None
    header = None
    if "onResponseReceivedEndpoints" in yt_initial_data:
        for endpoint in yt_initial_data["onResponseReceivedEndpoints"]:
            if "reloadContinuationItemsCommand" in endpoint:
                slot = endpoint["reloadContinuationItemsCommand"]["slot"]
                if slot == "RELOAD_CONTINUATION_SLOT_HEADER":
                    header = endpoint["reloadContinuationItemsCommand"]["continuationItems"][0]
                elif slot == "RELOAD_CONTINUATION_SLOT_BODY":
                    contents = try_dig(endpoint, "reloadContinuationItemsCommand", "continuationItems")
            elif "appendContinuationItemsAction" in endpoint:
                contents = endpoint["appendContinuationItemsAction"]["continuationItems"]
    elif "continuationContents" in yt_initial_data:
        # From here on yt_initial_data refers to the continuation contents;
        # the mutation lookup below reads from this narrowed dict as well.
        yt_initial_data = yt_initial_data["continuationContents"]
        if "commentRepliesContinuation" in yt_initial_data:
            body = yt_initial_data["commentRepliesContinuation"]
        else:
            body = yt_initial_data["itemSectionContinuation"]
        contents = try_dig(body, "contents")
        header = try_dig(body, "header")

    if contents is None:
        return {
            "commentCount": 0,
            "comments": []
        }

    # The trailing continuation item is not a comment: pull it out of the
    # list and keep it for the "continuation" field of the response.
    continuation_item_renderer = None
    for content in contents:
        if "continuationItemRenderer" in content:
            continuation_item_renderer = content["continuationItemRenderer"]
            contents.remove(content)
            break

    mutations = try_dig(yt_initial_data, "frameworkUpdates", "entityBatchUpdate", "mutations") or []

    response = {}
    if header is not None:
        count_text = combine_runs(header["commentsHeaderRenderer"]["countText"])
        response["commentCount"] = view_count_text_to_number(count_text)
    # TODO (left without details by the original author)
    response["videoId"] = id
    response["comments"] = []

    for node in contents:
        comment_data = {}
        response["comments"].append(comment_data)

        # Reset per comment: if neither branch below fills these in (a view
        # model whose mutations cannot be found), the values of the previous
        # comment must not leak into this one.
        comment_content = {"content": ""}
        reply_count = 0

        if "commentThreadRenderer" in node:
            node = node["commentThreadRenderer"]
        node_replies = None
        if "replies" in node:
            node_replies = node["replies"]["commentRepliesRenderer"]

        cvm = try_dig(node, "commentViewModel")
        if cvm is not None:
            # View-model layout: the node only carries keys; the comment data
            # itself is stored in the framework-update mutations.
            if "commentViewModel" in cvm:
                cvm = cvm["commentViewModel"]
            comment_key = cvm["commentKey"]
            toolbar_key = cvm["toolbarStateKey"]
            comment_mutation = None
            toolbar_mutation = None
            for mutation in mutations:
                key = try_dig(mutation, "payload", "commentEntityPayload", "key")
                if key == comment_key:
                    comment_mutation = mutation
                    break
            for mutation in mutations:
                key = try_dig(mutation, "entityKey")
                if key == toolbar_key:
                    toolbar_mutation = mutation
                    break

            if comment_mutation is not None and toolbar_mutation is not None:
                comment_content = try_dig(comment_mutation, "payload", "commentEntityPayload", "properties", "content")
                comment_data["content"] = try_dig(comment_content, "content", combine=True)
                comment_author = comment_mutation["payload"]["commentEntityPayload"]["author"]
                comment_data["author"] = comment_author["displayName"]
                comment_data["authorId"] = comment_author["channelId"]
                comment_data["authorUrl"] = "/channel/{}".format(comment_author["channelId"])
                comment_data["verified"] = comment_author["isVerified"]
                comment_data["authorThumbnails"] = try_dig(comment_mutation, "payload", "commentEntityPayload", "avatar", "image", "sources")
                comment_data["authorIsChannelOwner"] = comment_author["isCreator"]
                comment_data["isSponsor"] = "sponsorBadgeUrl" in comment_author
                if comment_data["isSponsor"]:
                    comment_data["sponsorIconUrl"] = comment_author["sponsorBadgeUrl"]

                comment_toolbar = try_dig(comment_mutation, "payload", "commentEntityPayload", "toolbar")
                # The like label is optional; derive both like fields from the
                # same lookup so a missing label cannot raise KeyError.
                like_count_a11y = comment_toolbar.get("likeCountA11y")
                comment_data["second__likeCount"] = like_count_a11y
                if like_count_a11y is not None:
                    comment_data["likeCount"] = uncompress_counter(like_count_a11y.split(" ")[0])
                else:
                    comment_data["likeCount"] = 0
                comment_data["second__replyText"] = comment_toolbar["replyCountA11y"]
                reply_count = uncompress_counter(try_dig(comment_toolbar, "replyCount") or "0")

                heart_state = try_dig(toolbar_mutation, "payload", "engagementToolbarStateEntityPayload", "heartState")
                if heart_state == "TOOLBAR_HEART_STATE_HEARTED":
                    comment_data["creatorHeart"] = {
                        "creatorThumbnail": comment_toolbar["creatorThumbnailUrl"],
                        "creatorName": comment_toolbar["heartActiveTooltip"].replace("❤ by ", "")
                    }

                comment_data["publishedText"] = try_dig(comment_mutation, "payload", "commentEntityPayload", "properties", "publishedTime", combine=True)
                comment_data["isPinned"] = "pinnedText" in cvm
                comment_data["commentId"] = cvm["commentId"]
        else:
            # Renderer layout: all comment data is stored inline in the node.
            if "comment" in node:
                node_comment = node["comment"]["commentRenderer"]
            else:
                node_comment = node["commentRenderer"]
            comment_data["commentId"] = node_comment["commentId"]
            # NOTE(review): contentText is read without combine=True, unlike
            # the other text fields below - confirm that the value passed on
            # to parse_comment_content is a plain string here.
            comment_content = {"content": try_dig(node_comment, "contentText")} if "contentText" in node_comment else {"content": ""}
            comment_data["content"] = comment_content["content"]
            comment_data["verified"] = "authorCommentBadge" in node_comment
            comment_data["author"] = try_dig(node_comment, "authorText", combine=True)
            comment_data["authorThumbnails"] = try_dig(node_comment, "authorThumbnails", "thumbnails")

            comment_action_buttons_renderer = try_dig(node_comment, "actionButtons", "commentActionButtonsRenderer")
            if comment_action_buttons_renderer is not None:
                comment_data["likeCount"] = int(try_dig(comment_action_buttons_renderer, "likeButton", "toggleButtonRenderer", "accessibilityData", "accessibilityData", "label").split(" ")[0])
                comment_data["second__likeCount"] = "{} like{}".format(comment_data["likeCount"], "s" if comment_data["likeCount"] != 1 else "")
                if "creatorHeart" in comment_action_buttons_renderer:
                    heart_data = try_dig(comment_action_buttons_renderer, "creatorHeart", "creatorHeartRenderer", "creatorThumbnail")
                    comment_data["creatorHeart"] = {
                        "creatorThumbnail": try_dig(heart_data, "thumbnails", -1, "url"),
                        "creatorName": try_dig(heart_data, "accessibility", "accessibilityData", "label")
                    }

            comment_data["authorId"] = try_dig(node_comment, "authorEndpoint", "browseEndpoint", "browseId")
            comment_data["authorUrl"] = try_dig(node_comment, "authorEndpoint", "browseEndpoint", "canonicalBaseUrl")
            comment_data["authorIsChannelOwner"] = "authorIsChannelOwner" in node_comment
            comment_data["isPinned"] = "pinnedCommentBadge" in node_comment
            comment_data["publishedText"] = try_dig(node_comment, "publishedTimeText", combine=True)
            comment_data["isSponsor"] = "sponsorCommentBadge" in node_comment
            if comment_data["isSponsor"]:
                comment_data["sponsorIconUrl"] = try_dig(node_comment, "sponsorCommentBadge", "sponsorCommentBadgeRenderer", "customBadge", "thumbnails", 0, "url")
            reply_count = node_comment["replyCount"]

        # Fields derived from what either branch collected above.
        comment_data["contentHtml"] = parse_comment_content(comment_content)
        published_text = comment_data.get("publishedText")
        if published_text is not None:
            comment_data["published"] = past_text_to_time(published_text.split(" (edited)")[0])
            comment_data["isEdited"] = " (edited)" in published_text

        # Nothing in this function stores "commentRepliesContinuation" in the
        # response, so in practice only node_replies decides this branch.
        if node_replies is not None and "commentRepliesContinuation" not in response:
            reply_continuation = None
            if "continuations" in node_replies:
                reply_continuation = try_dig(node_replies, "continuations", 0, "nextContinuationData", "continuation")
            if reply_continuation is None:
                reply_continuation = try_dig(node_replies, "contents", 0, "continuationItemRenderer", "continuationEndpoint", "continuationCommand", "token")
            comment_data["replies"] = {
                "replyCount": reply_count,
                "second__replyText": comment_data.get("second__replyText"),
                "continuation": reply_continuation
            }
            # The reply text now lives under "replies"; clear the top-level copy.
            comment_data["second__replyText"] = None

    if continuation_item_renderer is not None:
        continuation_endpoint = try_dig(continuation_item_renderer, "continuationEndpoint") or try_dig(continuation_item_renderer, "button", "buttonRenderer", "command")
        if continuation_endpoint is not None:
            response["continuation"] = continuation_endpoint["continuationCommand"]["token"]

    if "commentCount" not in response:
        response["commentCount"] = 0
    return response
def parse_comment_content(content):
    """Render a comment's text as HTML, turning attachment runs into images.

    Args:
        content: Dict holding the comment text under ``"content"`` and,
            optionally, a list under ``"attachmentRuns"``. Each run gives a
            ``startIndex`` and ``length`` into the text plus an ``element``
            describing the image to insert. The dict is left unmodified.

    Returns:
        The text passed through ``escape_html_textcontent``, with the span of
        every attachment run replaced by an ``<img>`` tag.
    """
    text = content["content"]
    segments = []
    # End of the previous attachment span, as an offset into the original
    # text. Slicing the untouched text makes offset corrections unnecessary.
    last_end = 0
    # NOTE(review): startIndex/length are applied as Python string offsets;
    # confirm the upstream indices are not UTF-16 code units, otherwise text
    # containing astral characters would be cut at the wrong position.
    if "attachmentRuns" in content:
        for attachment in content["attachmentRuns"]:
            start = attachment["startIndex"]
            stop = start + attachment["length"]
            segments.append(escape_html_textcontent(text[last_end:start]))

            element = attachment["element"]
            layout = element["properties"]["layoutProperties"]
            # NOTE(review): dimensions and URL are interpolated unescaped;
            # this assumes they come from a trusted response.
            tag_parts = ["<img "]
            if "height" in layout:
                tag_parts.append("height={} ".format(layout["height"]["value"]))
            if "width" in layout:
                tag_parts.append("width={} ".format(layout["width"]["value"]))
            tag_parts.append("src='{}'".format(element["type"]["imageType"]["image"]["sources"][0]["url"]))
            tag_parts.append(">")
            segments.append("".join(tag_parts))

            last_end = stop
    segments.append(escape_html_textcontent(text[last_end:]))
    return "".join(segments)