def _find_comment_items(data):
    """Locate the comment items and header inside a continuation response.

    Two response layouts are recognised: ``onResponseReceivedEndpoints``
    (reload/append continuation commands) and ``continuationContents``.

    Returns a ``(contents, header, payload)`` tuple. ``contents`` and
    ``header`` are ``None`` when they could not be found. ``payload`` is the
    dict later lookups must use: ``data`` itself for the first layout, or
    ``data["continuationContents"]`` for the second.
    """
    contents = None
    header = None
    if "onResponseReceivedEndpoints" in data:
        for endpoint in data["onResponseReceivedEndpoints"]:
            if "reloadContinuationItemsCommand" in endpoint:
                slot = endpoint["reloadContinuationItemsCommand"]["slot"]
                if slot == "RELOAD_CONTINUATION_SLOT_HEADER":
                    header = endpoint["reloadContinuationItemsCommand"]["continuationItems"][0]
                elif slot == "RELOAD_CONTINUATION_SLOT_BODY":
                    contents = try_dig(endpoint, "reloadContinuationItemsCommand", "continuationItems")
            elif "appendContinuationItemsAction" in endpoint:
                contents = endpoint["appendContinuationItemsAction"]["continuationItems"]
    elif "continuationContents" in data:
        data = data["continuationContents"]
        if "commentRepliesContinuation" in data:
            body = data["commentRepliesContinuation"]
        else:
            body = data["itemSectionContinuation"]
        contents = try_dig(body, "contents")
        header = try_dig(body, "header")
    return contents, header, data


def _first_mutation(mutations, wanted, *path):
    """Return the first mutation whose value at ``path`` equals ``wanted``, else None."""
    # A plain loop is used on purpose: the builtin ``next`` is shadowed in this
    # module by ``tools.invidious_ported.next``.
    for mutation in mutations:
        if try_dig(mutation, *path) == wanted:
            return mutation
    return None


def _fill_from_view_model(comment_data, cvm, mutations):
    """Populate ``comment_data`` from a ``commentViewModel`` node.

    The comment fields live in the entity mutations referenced by the view
    model's ``commentKey`` and ``toolbarStateKey``.

    Returns ``(comment_content, reply_count)``. When either mutation is
    missing, the neutral values ``({"content": ""}, 0)`` are returned so the
    caller never sees data left over from a previous comment.
    """
    # The view model may be wrapped in a second "commentViewModel" level.
    if "commentViewModel" in cvm:
        cvm = cvm["commentViewModel"]
    comment_key = cvm["commentKey"]
    toolbar_key = cvm["toolbarStateKey"]
    comment_mutation = _first_mutation(mutations, comment_key, "payload", "commentEntityPayload", "key")
    toolbar_mutation = _first_mutation(mutations, toolbar_key, "entityKey")

    # Same empty fallback the renderer-based path uses for a missing body.
    comment_content = {"content": ""}
    reply_count = 0

    if comment_mutation is not None and toolbar_mutation is not None:
        comment_content = try_dig(comment_mutation, "payload", "commentEntityPayload", "properties", "content")
        comment_data["content"] = try_dig(comment_content, "content", combine=True)

        comment_author = comment_mutation["payload"]["commentEntityPayload"]["author"]
        comment_data["author"] = comment_author["displayName"]
        comment_data["authorId"] = comment_author["channelId"]
        comment_data["authorUrl"] = "/channel/{}".format(comment_author["channelId"])
        comment_data["verified"] = comment_author["isVerified"]
        comment_data["authorThumbnails"] = try_dig(comment_mutation, "payload", "commentEntityPayload", "avatar", "image", "sources")
        comment_data["authorIsChannelOwner"] = comment_author["isCreator"]
        comment_data["isSponsor"] = "sponsorBadgeUrl" in comment_author
        if comment_data["isSponsor"]:
            comment_data["sponsorIconUrl"] = comment_author["sponsorBadgeUrl"]

        comment_toolbar = try_dig(comment_mutation, "payload", "commentEntityPayload", "toolbar")
        like_label = comment_toolbar["likeCountA11y"] if "likeCountA11y" in comment_toolbar else None
        comment_data["second__likeCount"] = like_label
        # The label was already treated as optional one line above in the
        # original; without it the count falls back to 0 instead of raising.
        if like_label is not None:
            comment_data["likeCount"] = uncompress_counter(like_label.split(" ")[0])
        else:
            comment_data["likeCount"] = 0
        comment_data["second__replyText"] = comment_toolbar["replyCountA11y"]
        reply_count = uncompress_counter(try_dig(comment_toolbar, "replyCount") or "0")

        heart_state = try_dig(toolbar_mutation, "payload", "engagementToolbarStateEntityPayload", "heartState")
        if heart_state == "TOOLBAR_HEART_STATE_HEARTED":
            comment_data["creatorHeart"] = {
                "creatorThumbnail": comment_toolbar["creatorThumbnailUrl"],
                "creatorName": comment_toolbar["heartActiveTooltip"].replace("❤ by ", "")
            }

        comment_data["publishedText"] = try_dig(comment_mutation, "payload", "commentEntityPayload", "properties", "publishedTime", combine=True)

    # NOTE(review): nesting reconstructed from whitespace-collapsed source;
    # these two fields only need the view model, so they are set even when the
    # mutations are missing - confirm against the original layout.
    comment_data["isPinned"] = "pinnedText" in cvm
    comment_data["commentId"] = cvm["commentId"]
    return comment_content, reply_count


def _fill_from_renderer(comment_data, node):
    """Populate ``comment_data`` from a ``commentRenderer`` node.

    ``node`` either holds the renderer directly or nests it under ``comment``.
    Returns ``(comment_content, reply_count)``.
    """
    if "comment" in node:
        node_comment = node["comment"]["commentRenderer"]
    else:
        node_comment = node["commentRenderer"]

    comment_data["commentId"] = node_comment["commentId"]
    if "contentText" in node_comment:
        comment_content = {"content": try_dig(node_comment, "contentText")}
    else:
        comment_content = {"content": ""}
    comment_data["content"] = comment_content["content"]
    comment_data["verified"] = "authorCommentBadge" in node_comment
    comment_data["author"] = try_dig(node_comment, "authorText", combine=True)
    comment_data["authorThumbnails"] = try_dig(node_comment, "authorThumbnails", "thumbnails")

    action_buttons = try_dig(node_comment, "actionButtons", "commentActionButtonsRenderer")
    if action_buttons is not None:
        # The like count is the first word of the button's accessibility label.
        like_label = try_dig(action_buttons, "likeButton", "toggleButtonRenderer", "accessibilityData", "accessibilityData", "label")
        comment_data["likeCount"] = int(like_label.split(" ")[0])
        comment_data["second__likeCount"] = "{} like{}".format(comment_data["likeCount"], "s" if comment_data["likeCount"] != 1 else "")
        if "creatorHeart" in action_buttons:
            heart_data = try_dig(action_buttons, "creatorHeart", "creatorHeartRenderer", "creatorThumbnail")
            comment_data["creatorHeart"] = {
                "creatorThumbnail": try_dig(heart_data, "thumbnails", -1, "url"),
                "creatorName": try_dig(heart_data, "accessibility", "accessibilityData", "label")
            }

    comment_data["authorId"] = try_dig(node_comment, "authorEndpoint", "browseEndpoint", "browseId")
    comment_data["authorUrl"] = try_dig(node_comment, "authorEndpoint", "browseEndpoint", "canonicalBaseUrl")
    comment_data["authorIsChannelOwner"] = "authorIsChannelOwner" in node_comment
    comment_data["isPinned"] = "pinnedCommentBadge" in node_comment
    comment_data["publishedText"] = try_dig(node_comment, "publishedTimeText", combine=True)
    comment_data["isSponsor"] = "sponsorCommentBadge" in node_comment
    if comment_data["isSponsor"]:
        comment_data["sponsorIconUrl"] = try_dig(node_comment, "sponsorCommentBadge", "sponsorCommentBadgeRenderer", "customBadge", "thumbnails", 0, "url")

    return comment_content, node_comment["replyCount"]


# TODO: support extracting comments from continuation
def extract_comments(id, **kwargs):
    """Fetch the first page of comments for a YouTube video.

    The watch page is downloaded, the comment-section continuation token is
    read from the third entry of its watch-next results, and that token is
    resolved through ``tools.invidious_ported.next``. Each returned item is
    then flattened into a plain dict.

    Parameters:
        id: the video id interpolated into the watch URL. (The name shadows
            the builtin but is part of the public signature.)
        **kwargs: accepted for call compatibility; not used here.

    Returns:
        A dict with ``commentCount``, ``videoId``, ``comments`` (a list of
        per-comment dicts) and, when another page exists, ``continuation``.
        When no comment items are found the result is
        ``{"commentCount": 0, "comments": []}``.

    Raises:
        requests.HTTPError: the watch page answered with an error status.
        KeyError / IndexError: the page or continuation payload does not have
            the layout the direct lookups below expect.
    """
    # Using the session as a context manager releases its pooled connections;
    # the original never closed it.
    with requests.session() as s:
        s.headers.update({"accept-language": "en-US,en;q=0.9"})
        s.cookies.set("CONSENT", eu_consent_cookie().get("CONSENT"))
        with s.get("https://www.youtube.com/watch?v={}".format(id)) as r:
            r.raise_for_status()
            yt_initial_data = extract_yt_initial_data(r.content.decode("utf8"))

    item = yt_initial_data["contents"]["twoColumnWatchNextResults"]["results"]["results"]["contents"][2]["itemSectionRenderer"]
    continuation = item["contents"][0]["continuationItemRenderer"]["continuationEndpoint"]["continuationCommand"]["token"]

    # `next` is tools.invidious_ported.next (it shadows the builtin here).
    contents, header, yt_initial_data = _find_comment_items(next(continuation=continuation))

    if contents is None:
        return {
            "commentCount": 0,
            "comments": []
        }

    # The trailing "load more" entry is not a comment: detach it from the list.
    continuation_item_renderer = None
    for index, content in enumerate(contents):
        if "continuationItemRenderer" in content:
            continuation_item_renderer = content["continuationItemRenderer"]
            del contents[index]
            break

    mutations = try_dig(yt_initial_data, "frameworkUpdates", "entityBatchUpdate", "mutations") or []

    response = {}
    if header is not None:
        count_text = combine_runs(header["commentsHeaderRenderer"]["countText"])
        response["commentCount"] = view_count_text_to_number(count_text)

    # TODO (carried over from the original without further detail)
    response["videoId"] = id

    response["comments"] = []

    # The original tested this key against the output dict, which never
    # contains it, so the check was always true. The key only ever appears in
    # the YouTube payload (see _find_comment_items), so test it there.
    is_replies_page = "commentRepliesContinuation" in yt_initial_data

    for node in contents:
        comment_data = {}
        response["comments"].append(comment_data)

        if "commentThreadRenderer" in node:
            node = node["commentThreadRenderer"]
        node_replies = None
        if "replies" in node:
            node_replies = node["replies"]["commentRepliesRenderer"]

        cvm = try_dig(node, "commentViewModel")
        if cvm is not None:
            comment_content, reply_count = _fill_from_view_model(comment_data, cvm, mutations)
        else:
            comment_content, reply_count = _fill_from_renderer(comment_data, node)

        comment_data["contentHtml"] = parse_comment_content(comment_content)

        published_text = comment_data.get("publishedText")
        if published_text is not None:
            comment_data["published"] = past_text_to_time(published_text.split(" (edited)")[0])
            comment_data["isEdited"] = " (edited)" in published_text

        if node_replies is not None and not is_replies_page:
            reply_continuation = None
            if "continuations" in node_replies:
                reply_continuation = try_dig(node_replies, "continuations", 0, "nextContinuationData", "continuation")
            if reply_continuation is None:
                reply_continuation = try_dig(node_replies, "contents", 0, "continuationItemRenderer", "continuationEndpoint", "continuationCommand", "token")
            comment_data["replies"] = {
                "replyCount": reply_count,
                "second__replyText": comment_data.get("second__replyText"),
                "continuation": reply_continuation
            }

        # NOTE(review): nesting reconstructed from whitespace-collapsed source;
        # assumed to run for every comment so the top-level field is always
        # cleared once it has been copied into "replies" - confirm.
        comment_data["second__replyText"] = None

    if continuation_item_renderer is not None:
        continuation_endpoint = try_dig(continuation_item_renderer, "continuationEndpoint") or try_dig(continuation_item_renderer, "button", "buttonRenderer", "command")
        if continuation_endpoint is not None:
            response["continuation"] = continuation_endpoint["continuationCommand"]["token"]

    if "commentCount" not in response:
        response["commentCount"] = 0

    return response