|
| 1 | +import os |
| 2 | +import base64 |
| 3 | +import re |
| 4 | +import requests |
| 5 | +from urllib import parse |
| 6 | +from pathlib import Path |
| 7 | +from requests_oauthlib import OAuth1 |
| 8 | +from bs4 import BeautifulSoup |
| 9 | +from rich import print |
| 10 | +from rich.progress import ( |
| 11 | + Progress, |
| 12 | + SpinnerColumn, |
| 13 | + TextColumn, |
| 14 | + BarColumn, |
| 15 | + TaskProgressColumn, |
| 16 | + TimeRemainingColumn, |
| 17 | +) |
| 18 | +import typer |
| 19 | + |
| 20 | +from .config import get_config |
| 21 | + |
| 22 | + |
| 23 | +_HATENA_BLOG_URL = f"https://blog.hatena.ne.jp/" |
| 24 | +_HATENA_FOTOLIFE_POST_URL = "https://f.hatena.ne.jp/atom/post" |
| 25 | +_HATENA_FOTOLIFE_EDIT_URL = "https://f.hatena.ne.jp/atom/edit" |
| 26 | + |
| 27 | + |
| 28 | +class Spinner(Progress): |
| 29 | + def __init__(self): |
| 30 | + super().__init__( |
| 31 | + SpinnerColumn(), |
| 32 | + TextColumn("[progress.description]{task.description}"), |
| 33 | + ) |
| 34 | + |
| 35 | + |
| 36 | +class ProgressBar(Progress): |
| 37 | + def __init__(self): |
| 38 | + super().__init__( |
| 39 | + TextColumn("[progress.description]{task.description}"), |
| 40 | + BarColumn(), |
| 41 | + TaskProgressColumn(), |
| 42 | + TimeRemainingColumn(elapsed_when_finished=True), |
| 43 | + ) |
| 44 | + |
| 45 | + |
| 46 | +def _convert(blog_path: Path = None): |
| 47 | + with Spinner() as progress: |
| 48 | + progress.add_task("Converting Markdown to HTML...") |
| 49 | + |
| 50 | + pandoc_path = get_config("path:pandoc") |
| 51 | + template_path = get_config("path:tempalte") |
| 52 | + os.system( |
| 53 | + f"{pandoc_path} --quiet --standalone --template '{template_path}' \ |
| 54 | + --output '{blog_path.with_suffix('.html')}' '{blog_path}'" |
| 55 | + ) |
| 56 | + |
| 57 | + |
| 58 | +def upload_entry(blog_title: str, blog_path: Path, with_images: bool, publish: bool): |
| 59 | + _convert(blog_path) |
| 60 | + auth = auth.get() |
| 61 | + if with_images: |
| 62 | + _upload_images(blog_title, blog_path, auth) |
| 63 | + _upload_html(blog_title, blog_path, publish, auth) |
| 64 | + |
| 65 | + |
| 66 | +def _upload_images(blog_title: str, blog_path: Path, auth: OAuth1): |
| 67 | + with ProgressBar() as progress: |
| 68 | + image_pattern = get_config("image:pattern") |
| 69 | + image_replace = get_config("image:replace") |
| 70 | + with open(blog_path.with_suffix(".html"), mode="r") as file: |
| 71 | + content = file.read() |
| 72 | + |
| 73 | + total = len(re.findall(image_pattern, content)) |
| 74 | + task = progress.add_task("Uploading images...", total=total) |
| 75 | + |
| 76 | + def callback(match: re.Match): |
| 77 | + progress.advance(task, 1) |
| 78 | + image_path = parse.unquote(match.group(1)) |
| 79 | + image_url = _upload_image(blog_title, image_path, auth) |
| 80 | + return image_replace.replace(r"\1", image_url) |
| 81 | + |
| 82 | + re.sub(image_pattern, callback, content) |
| 83 | + |
| 84 | + |
| 85 | +def _upload_image(blog_title: str, image_path: Path, auth: OAuth1) -> Path: |
| 86 | + with open(image_path, mode="rb") as file: |
| 87 | + content = file.read() |
| 88 | + content = base64.b64encode(content).decode() |
| 89 | + |
| 90 | + # Maximum length for <dc:subject /> is 24 |
| 91 | + res = requests.post( |
| 92 | + _HATENA_FOTOLIFE_POST_URL, |
| 93 | + auth=auth, |
| 94 | + timeout=None, |
| 95 | + headers={"Content-Type": "application/xml"}, |
| 96 | + data=f"""\ |
| 97 | +<?xml version="1.0"?> |
| 98 | +<entry xmlns="http://purl.org/atom/ns#"> |
| 99 | + <dc:subject>{blog_title[:24]}</dc:subject> |
| 100 | + <title>{blog_title}</title> |
| 101 | + <content mode="base64" type="image/png">{content}</content> |
| 102 | +</entry>""", |
| 103 | + ) |
| 104 | + |
| 105 | + if res.status_code != 201: |
| 106 | + print("[red]Error while uploading image.") |
| 107 | + raise typer.Abort() |
| 108 | + |
| 109 | + xml = BeautifulSoup(res.text, features="xml") |
| 110 | + url = xml.find("entry").find("hatena:imageurl").text |
| 111 | + return Path(url) |
| 112 | + |
| 113 | + |
| 114 | +def _upload_html(blog_title: str, blog_path: Path, publish: bool, auth: OAuth1): |
| 115 | + with Spinner() as progress: |
| 116 | + progress.add_task("Uploading HTML...") |
| 117 | + |
| 118 | + with open(blog_path.with_suffix("html"), mode="r") as file: |
| 119 | + content = file.read() |
| 120 | + |
| 121 | + username = get_config("blog:username") |
| 122 | + domain = get_config("blog:domain") |
| 123 | + res = requests.post( |
| 124 | + f"{_HATENA_BLOG_URL}/{username}/{domain}/atom/entry", |
| 125 | + auth=auth, |
| 126 | + timeout=None, |
| 127 | + headers={"Content-Type": "application/xml; charset=utf-8"}, |
| 128 | + data=f"""\ |
| 129 | + <?xml version="1.0" encoding="utf-8"?> |
| 130 | + <entry xmlns="http://www.w3.org/2005/Atom" xmlns:app="http://www.w3.org/2007/app"> |
| 131 | + <title>{blog_title}</title> |
| 132 | + <author><name>name</name></author> |
| 133 | + <content type="text/html"><![CDATA[{content}]]></content> |
| 134 | + <app:control><app:draft>{'no' if publish else 'yes'}</app:draft></app:control> |
| 135 | + </entry>""".encode( |
| 136 | + "utf-8" |
| 137 | + ), |
| 138 | + ) |
| 139 | + |
| 140 | + if res.status_code != 201: |
| 141 | + print("[red]Error while uploading HTML.") |
| 142 | + raise typer.Abort() |
| 143 | + |
| 144 | + |
| 145 | +def update_entry( |
| 146 | + blog_id: int, blog_title: str, blog_path: Path, with_images: bool, publish: bool |
| 147 | +): |
| 148 | + _convert(blog_path) |
| 149 | + auth = auth.get() |
| 150 | + if with_images: |
| 151 | + _update_images(blog_id, blog_title, blog_path, auth) |
| 152 | + _update_html(blog_id, blog_title, blog_path, publish, auth) |
| 153 | + |
| 154 | + |
| 155 | +def _update_images(blog_id: int, blog_title: str, blog_path: Path, auth: OAuth1): |
| 156 | + with Spinner() as progress: |
| 157 | + progress.add_task("Updating images...") |
| 158 | + _delete_images(blog_id, auth) |
| 159 | + _upload_images(blog_title, blog_path, auth) |
| 160 | + |
| 161 | + |
| 162 | +def _update_html( |
| 163 | + blog_id: int, blog_title: str, blog_path: Path, publish: bool, auth: OAuth1 |
| 164 | +): |
| 165 | + with Spinner() as progress: |
| 166 | + progress.add_task("Updating HTML...") |
| 167 | + |
| 168 | + with open(blog_path.with_suffix("html"), mode="r") as file: |
| 169 | + content = file.read() |
| 170 | + |
| 171 | + username = get_config("blog:username") |
| 172 | + domain = get_config("blog:domain") |
| 173 | + res = requests.put( |
| 174 | + f"{_HATENA_BLOG_URL}/{username}/{domain}/atom/entry/{blog_id}", |
| 175 | + auth=auth, |
| 176 | + timeout=None, |
| 177 | + headers={"Content-Type": "application/xml; charset=utf-8"}, |
| 178 | + data=f"""\ |
| 179 | + <?xml version="1.0" encoding="utf-8"?> |
| 180 | + <entry xmlns="http://www.w3.org/2005/Atom" xmlns:app="http://www.w3.org/2007/app"> |
| 181 | + <title>{blog_title}</title> |
| 182 | + <author><name>name</name></author> |
| 183 | + <content type="text/html"><![CDATA[{content}]]></content> |
| 184 | + <app:control><app:draft>{'no' if publish else 'yes'}</app:draft></app:control> |
| 185 | + </entry>""".encode( |
| 186 | + "utf-8" |
| 187 | + ), |
| 188 | + ) |
| 189 | + if res.status_code != 200: |
| 190 | + print("[red]Error while updating HTML.") |
| 191 | + raise typer.Abort() |
| 192 | + |
| 193 | + |
| 194 | +def delete_entry(blog_id: int, with_images: bool): |
| 195 | + auth = auth.get() |
| 196 | + if with_images: |
| 197 | + _delete_images(blog_id, auth) |
| 198 | + _delete_html(blog_id, auth) |
| 199 | + |
| 200 | + |
| 201 | +def _delete_images(blog_id: int, auth: OAuth1): |
| 202 | + with ProgressBar() as progress: |
| 203 | + username = get_config("blog:username") |
| 204 | + domain = get_config("blog:domain") |
| 205 | + res = requests.get( |
| 206 | + f"{_HATENA_BLOG_URL}/{username}/{domain}/{blog_id}", |
| 207 | + auth=auth, |
| 208 | + timeout=None, |
| 209 | + ) |
| 210 | + |
| 211 | + if res.status_code != 200: |
| 212 | + print("[red]Error while downloading HTML.") |
| 213 | + raise typer.Abort() |
| 214 | + |
| 215 | + xml = BeautifulSoup(res.text, features="xml") |
| 216 | + content = xml.find("entry").find("hatena:formatted-content").text |
| 217 | + content = parse.unquote(content) |
| 218 | + |
| 219 | + image_pattern = get_config("image:pattern") |
| 220 | + total = len(re.findall(image_pattern, content)) |
| 221 | + task = progress.add_task("Deleting images...", total=total) |
| 222 | + |
| 223 | + def callback(match: re.Match): |
| 224 | + progress.advance(task, 1) |
| 225 | + image_path = parse.unquote(match.group(1)) |
| 226 | + image_id = os.path.splitext(os.path.basename(image_path))[0] |
| 227 | + res = requests.delete( |
| 228 | + f"{_HATENA_FOTOLIFE_EDIT_URL}/{image_id}", auth=auth, timeout=None |
| 229 | + ) |
| 230 | + if res.status_code != 200: |
| 231 | + print("[red]Error while deleting image.") |
| 232 | + raise typer.Abort() |
| 233 | + |
| 234 | + re.sub(image_pattern, callback, content) |
| 235 | + |
| 236 | + |
| 237 | +def _delete_html(blog_id: int, auth: OAuth1): |
| 238 | + with Spinner() as progress: |
| 239 | + progress.add_task("Deleting HTML...") |
| 240 | + |
| 241 | + username = get_config("blog:username") |
| 242 | + domain = get_config("blog:domain") |
| 243 | + res = requests.delete( |
| 244 | + f"{_HATENA_BLOG_URL}/{username}/{domain}/{blog_id}", |
| 245 | + auth=auth, |
| 246 | + timeout=None, |
| 247 | + ) |
| 248 | + |
| 249 | + if res.status_code != 200: |
| 250 | + print("[red]Error while deleting HTML.") |
| 251 | + raise typer.Abort() |
0 commit comments