# 224 lines · 10 KiB · Python
import json
|
|
import requests
|
|
import urllib.parse
|
|
from tools.converters import *
|
|
from tools.extractors import extract_yt_initial_data, extract_yt_cfg, eu_consent_cookie
|
|
from tools.invidious_ported import next
|
|
|
|
# TODO: support extracting comments from continuation
|
|
def extract_comments(id, **kwargs):
    """Fetch the top-level comment thread for a YouTube video.

    Scrapes the watch page to obtain the comment-section continuation token,
    then resolves it through the innertube ``next`` endpoint and normalizes
    the result.

    Parameters:
        id: YouTube video id.
        **kwargs: accepted for interface compatibility; currently unused.

    Returns:
        dict with at least "commentCount" and "comments" (list of per-comment
        dicts); when more pages exist, also a "continuation" token.

    Raises:
        requests.HTTPError: if the watch page request fails.
    """
    s = requests.session()
    try:
        s.headers.update({"accept-language": "en-US,en;q=0.9"})
        s.cookies.set("CONSENT", eu_consent_cookie().get("CONSENT"))
        with s.get("https://www.youtube.com/watch?v={}".format(id)) as r:
            r.raise_for_status()
            yt_initial_data = extract_yt_initial_data(r.content.decode("utf8"))
            item = yt_initial_data["contents"]["twoColumnWatchNextResults"]["results"]["results"]["contents"][2]["itemSectionRenderer"]
            continuation = item["contents"][0]["continuationItemRenderer"]["continuationEndpoint"]["continuationCommand"]["token"]
    finally:
        # fix: the session was previously never closed (connection leak)
        s.close()

    # NOTE: "next" is tools.invidious_ported.next (innertube /next call), not
    # the builtin.
    yt_initial_data = next(continuation=continuation)

    # Locate the comment list ("contents") and the header, whose position
    # differs between the old and new innertube response layouts.
    contents = None
    header = None
    body = None
    if "onResponseReceivedEndpoints" in yt_initial_data:
        for endpoint in yt_initial_data["onResponseReceivedEndpoints"]:
            if "reloadContinuationItemsCommand" in endpoint:
                slot = endpoint["reloadContinuationItemsCommand"]["slot"]
                if slot == "RELOAD_CONTINUATION_SLOT_HEADER":
                    header = endpoint["reloadContinuationItemsCommand"]["continuationItems"][0]
                elif slot == "RELOAD_CONTINUATION_SLOT_BODY":
                    contents = try_dig(endpoint, "reloadContinuationItemsCommand", "continuationItems")
            elif "appendContinuationItemsAction" in endpoint:
                contents = endpoint["appendContinuationItemsAction"]["continuationItems"]
    elif "continuationContents" in yt_initial_data:
        yt_initial_data = yt_initial_data["continuationContents"]
        if "commentRepliesContinuation" in yt_initial_data:
            body = yt_initial_data["commentRepliesContinuation"]
        else:
            body = yt_initial_data["itemSectionContinuation"]
        contents = try_dig(body, "contents")
        header = try_dig(body, "header")

    if contents is None:
        # No comment section at all (comments disabled, or layout changed).
        return {
            "commentCount": 0,
            "comments": []
        }

    # Pull the trailing "load more" renderer out so the main loop only sees
    # actual comments. (remove-while-iterating is safe: we break immediately.)
    continuation_item_renderer = None
    for content in contents:
        if "continuationItemRenderer" in content:
            continuation_item_renderer = content["continuationItemRenderer"]
            contents.remove(content)
            break

    mutations = try_dig(yt_initial_data, "frameworkUpdates", "entityBatchUpdate", "mutations") or []

    response = {}
    if header is not None:
        count_text = combine_runs(header["commentsHeaderRenderer"]["countText"])
        response["commentCount"] = view_count_text_to_number(count_text)

    # TODO
    response["videoId"] = id

    response["comments"] = []
    reply_count = 0
    for node in contents:
        comment_data = {}
        response["comments"].append(comment_data)

        if "commentThreadRenderer" in node:
            node = node["commentThreadRenderer"]
        node_replies = None
        if "replies" in node:
            node_replies = node["replies"]["commentRepliesRenderer"]

        # Newer responses describe comments via a view model plus entity
        # mutations; older ones embed everything in a commentRenderer.
        # fix: comment_content was left unbound when the mutations were not
        # found, causing a NameError at the contentHtml assignment below.
        comment_content = None
        cvm = try_dig(node, "commentViewModel")
        if cvm is not None:
            if "commentViewModel" in cvm:
                cvm = cvm["commentViewModel"]

            comment_key = cvm["commentKey"]
            toolbar_key = cvm["toolbarStateKey"]
            comment_mutation = None
            toolbar_mutation = None
            for mutation in mutations:
                key = try_dig(mutation, "payload", "commentEntityPayload", "key")
                if key == comment_key:
                    comment_mutation = mutation
                    break
            for mutation in mutations:
                key = try_dig(mutation, "entityKey")
                if key == toolbar_key:
                    toolbar_mutation = mutation
                    break

            if comment_mutation is not None and toolbar_mutation is not None:
                comment_content = try_dig(comment_mutation, "payload", "commentEntityPayload", "properties", "content")
                comment_data["content"] = try_dig(comment_content, "content", combine=True)

                comment_author = comment_mutation["payload"]["commentEntityPayload"]["author"]
                comment_data["author"] = comment_author["displayName"]
                comment_data["authorId"] = comment_author["channelId"]
                comment_data["authorUrl"] = "/channel/{}".format(comment_author["channelId"])
                comment_data["verified"] = comment_author["isVerified"]
                comment_data["authorThumbnails"] = try_dig(comment_mutation, "payload", "commentEntityPayload", "avatar", "image", "sources")
                comment_data["authorIsChannelOwner"] = comment_author["isCreator"]
                comment_data["isSponsor"] = "sponsorBadgeUrl" in comment_author
                if comment_data["isSponsor"]:
                    comment_data["sponsorIconUrl"] = comment_author["sponsorBadgeUrl"]

                comment_toolbar = try_dig(comment_mutation, "payload", "commentEntityPayload", "toolbar")
                # fix: likeCountA11y was guarded for second__likeCount but then
                # dereferenced unconditionally for likeCount, raising KeyError
                # on comments without an accessible like count; replyCountA11y
                # was likewise unguarded.
                like_count_a11y = comment_toolbar.get("likeCountA11y") if comment_toolbar is not None else None
                comment_data["second__likeCount"] = like_count_a11y
                comment_data["likeCount"] = uncompress_counter(like_count_a11y.split(" ")[0]) if like_count_a11y is not None else 0
                comment_data["second__replyText"] = try_dig(comment_toolbar, "replyCountA11y")
                reply_count = uncompress_counter(try_dig(comment_toolbar, "replyCount") or "0")

                heart_state = try_dig(toolbar_mutation, "payload", "engagementToolbarStateEntityPayload", "heartState")
                if heart_state is not None and heart_state == "TOOLBAR_HEART_STATE_HEARTED":
                    comment_data["creatorHeart"] = {
                        "creatorThumbnail": comment_toolbar["creatorThumbnailUrl"],
                        "creatorName": comment_toolbar["heartActiveTooltip"].replace("❤ by ", "")
                    }
                comment_data["publishedText"] = try_dig(comment_mutation, "payload", "commentEntityPayload", "properties", "publishedTime", combine=True)
                comment_data["isPinned"] = "pinnedText" in cvm
                comment_data["commentId"] = cvm["commentId"]
        else:
            # Legacy commentRenderer layout.
            if "comment" in node:
                node_comment = node["comment"]["commentRenderer"]
            else:
                node_comment = node["commentRenderer"]

            comment_data["commentId"] = node_comment["commentId"]

            comment_content = {"content": try_dig(node_comment, "contentText")} if "contentText" in node_comment else {"content": ""}
            comment_data["content"] = comment_content["content"]
            comment_data["verified"] = "authorCommentBadge" in node_comment
            comment_data["author"] = try_dig(node_comment, "authorText", combine=True)
            comment_data["authorThumbnails"] = try_dig(node_comment, "authorThumbnails", "thumbnails")

            comment_action_buttons_renderer = try_dig(node_comment, "actionButtons", "commentActionButtonsRenderer")
            if comment_action_buttons_renderer is not None:
                comment_data["likeCount"] = int(try_dig(comment_action_buttons_renderer, "likeButton", "toggleButtonRenderer", "accessibilityData", "accessibilityData", "label").split(" ")[0])
                comment_data["second__likeCount"] = "{} like{}".format(comment_data["likeCount"], "s" if comment_data["likeCount"] != 1 else "")
                if "creatorHeart" in comment_action_buttons_renderer:
                    heart_data = try_dig(comment_action_buttons_renderer, "creatorHeart", "creatorHeartRenderer", "creatorThumbnail")
                    comment_data["creatorHeart"] = {
                        "creatorThumbnail": try_dig(heart_data, "thumbnails", -1, "url"),
                        "creatorName": try_dig(heart_data, "accessibility", "accessibilityData", "label")
                    }

            comment_data["authorId"] = try_dig(node_comment, "authorEndpoint", "browseEndpoint", "browseId")
            comment_data["authorUrl"] = try_dig(node_comment, "authorEndpoint", "browseEndpoint", "canonicalBaseUrl")
            comment_data["authorIsChannelOwner"] = "authorIsChannelOwner" in node_comment
            comment_data["isPinned"] = "pinnedCommentBadge" in node_comment
            comment_data["publishedText"] = try_dig(node_comment, "publishedTimeText", combine=True)
            comment_data["isSponsor"] = "sponsorCommentBadge" in node_comment
            if comment_data["isSponsor"]:
                comment_data["sponsorIconUrl"] = try_dig(node_comment, "sponsorCommentBadge", "sponsorCommentBadgeRenderer", "customBadge", "thumbnails", 0, "url")
            # fix: replyCount is absent on comments with no replies; direct
            # indexing raised KeyError.
            reply_count = node_comment.get("replyCount", 0)

        comment_data["contentHtml"] = parse_comment_content(comment_content) if comment_content is not None else ""

        if "publishedText" in comment_data and comment_data["publishedText"] is not None:
            comment_data["published"] = past_text_to_time(comment_data["publishedText"].split(" (edited)")[0])
            comment_data["isEdited"] = comment_data["publishedText"].find(" (edited)") > -1

        continuation = None
        if node_replies is not None and not "commentRepliesContinuation" in response:
            if "continuations" in node_replies:
                continuation = try_dig(node_replies, "continuations", 0, "nextContinuationData", "continuation")
            if continuation is None:
                continuation = try_dig(node_replies, "contents", 0, "continuationItemRenderer", "continuationEndpoint", "continuationCommand", "token")

            comment_data["replies"] = {
                "replyCount": reply_count,
                "second__replyText": comment_data["second__replyText"] if "second__replyText" in comment_data else None,
                "continuation": continuation
            }
            comment_data["second__replyText"] = None

    if continuation_item_renderer is not None:
        continuation_endpoint = try_dig(continuation_item_renderer, "continuationEndpoint") or try_dig(continuation_item_renderer, "button", "buttonRenderer", "command")
        if continuation_endpoint is not None:
            response["continuation"] = continuation_endpoint["continuationCommand"]["token"]
    if not "commentCount" in response:
        response["commentCount"] = 0
        #response["commentCount"] = len(response["comments"])

    return response

def parse_comment_content(content):
    """Render a structured comment body as an HTML string.

    Text between attachment runs is HTML-escaped; each attachment run is
    replaced by a raw ``<img>`` tag built from its element metadata. As a
    side effect, ``content["content"]`` is rewritten in place so that the
    image tags are spliced into the plain-text body (offsets are adjusted
    after every substitution).

    Parameters:
        content: dict with a "content" string and, optionally, a list of
            "attachmentRuns" (assumed innertube comment payload shape —
            TODO confirm against extract_comments callers).

    Returns:
        The assembled HTML string.
    """
    offset = 0   # cumulative index shift caused by earlier substitutions
    cursor = 0   # position in the (mutated) text already emitted
    pieces = []

    if "attachmentRuns" in content:
        for run in content["attachmentRuns"]:
            begin = run["startIndex"] + offset
            end = begin + run["length"]

            # Emit the escaped text between the previous run and this one.
            pieces.append(escape_html_textcontent(content["content"][cursor:begin]))

            element = run["element"]
            layout = element["properties"]["layoutProperties"]
            img = "<img "
            if "height" in layout:
                img += "height={} ".format(layout["height"]["value"])
            if "width" in layout:
                img += "width={} ".format(layout["width"]["value"])
            img += "src='{}'".format(element["type"]["imageType"]["image"]["sources"][0]["url"])
            img += ">"

            # Keep subsequent run indices consistent with the mutated text.
            offset += len(img) - (end - begin)
            cursor = begin + len(img)

            pieces.append(img)

            content["content"] = content["content"][:begin] + img + content["content"][end:]

    # Trailing text after the last run (or the whole body if no runs).
    pieces.append(escape_html_textcontent(content["content"][cursor:]))

    return "".join(pieces)