# flomo2notion.py - synchronise flomo memos into a Notion database.
import os
import random
import time
import html2text
from markdownify import markdownify
from flomo.flomo_api import FlomoApi
from notionify import notion_utils
from notionify.md2notion import Md2NotionUploader
from notionify.notion_cover_list import cover
from notionify.notion_helper import NotionHelper
from utils import truncate_string, is_within_n_days
class Flomo2Notion:
    """Synchronise flomo memos into a Notion database.

    Workflow (see ``sync_to_notion``):

    1. Fetch the memos from flomo through its web API.
    2. Read the pages that already exist in the Notion database and index
       them by their ``slug`` property.
    3. For each memo, update the matching page when the slug already exists,
       otherwise create a new page.
    """

    # Layout of the ``updated_at`` timestamps used to build the paging cursor.
    _FLOMO_TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
    # Environment-variable spellings that switch a boolean flag off.  Any
    # other non-empty value keeps enabling the flag, as it did before.
    _FALSE_STRINGS = frozenset({"0", "false", "no", "n", "off"})

    def __init__(self):
        self.flomo_api = FlomoApi()
        self.notion_helper = NotionHelper()
        self.uploader = Md2NotionUploader()

    @staticmethod
    def _env_flag(name: str, default: bool = False) -> bool:
        """Read a boolean flag from the environment.

        ``os.getenv`` returns a string, so a value such as ``"false"`` or
        ``"0"`` would be truthy if used directly; those spellings are mapped
        to ``False`` here.  An unset or blank variable yields ``default``.
        """
        raw = os.getenv(name)
        if raw is None or not raw.strip():
            return default
        return raw.strip().lower() not in Flomo2Notion._FALSE_STRINGS

    @staticmethod
    def _env_int(name: str, default: int) -> int:
        """Read an integer from the environment, falling back to ``default``.

        The fallback is used when the variable is unset, blank, or not a
        valid integer, so callers always receive an ``int``.
        """
        raw = os.getenv(name)
        if raw is None or not raw.strip():
            return default
        try:
            return int(raw.strip())
        except ValueError:
            print(f"Invalid integer for {name}: {raw!r}; using {default}")
            return default

    @staticmethod
    def _shared_properties(memo):
        """Build the Notion properties written on both insert and update."""
        content_text = html2text.html2text(memo['content'])
        return {
            "标题": notion_utils.get_title(
                truncate_string(content_text)
            ),
            "标签": notion_utils.get_multi_select(
                memo['tags']
            ),
            "是否置顶": notion_utils.get_select("否" if memo['pin'] == 0 else "是"),
            "更新时间": notion_utils.get_date(memo['updated_at']),
            "链接数量": notion_utils.get_number(memo['linked_count']),
        }

    def insert_memo(self, memo):
        """Create a new Notion page for ``memo`` and upload its content.

        Args:
            memo: A flomo memo dict; the keys read here are ``content``,
                ``tags``, ``pin``, ``slug``, ``created_at``, ``updated_at``,
                ``source`` and ``linked_count``.
        """
        print("insert_memo:", memo)
        content_md = markdownify(memo['content'])
        parent = {"database_id": self.notion_helper.page_id, "type": "database_id"}
        properties = self._shared_properties(memo)
        properties.update({
            # TODO: how to handle attached files is still undecided.
            # The slug is the unique identifier of a memo.
            "slug": notion_utils.get_rich_text(memo['slug']),
            "创建时间": notion_utils.get_date(memo['created_at']),
            "来源": notion_utils.get_select(memo['source']),
        })

        random_cover = random.choice(cover)
        print(f"Random element: {random_cover}")

        page = self.notion_helper.client.pages.create(
            parent=parent,
            icon=notion_utils.get_icon("https://www.notion.so/icons/target_red.svg"),
            cover=notion_utils.get_icon(random_cover),
            properties=properties,
        )

        # Add the memo content to the newly created page.
        self.uploader.uploadSingleFileContent(self.notion_helper.client, content_md, page['id'])

    def update_memo(self, memo, page_id):
        """Refresh an existing Notion page from ``memo``.

        Only the title, tags, pin state, update time and link count are
        rewritten; the page body is cleared and uploaded again.

        Args:
            memo: A flomo memo dict (see ``insert_memo`` for the keys used).
            page_id: Id of the Notion page that mirrors this memo.
        """
        print("update_memo:", memo)
        content_md = markdownify(memo['content'])
        properties = self._shared_properties(memo)
        page = self.notion_helper.client.pages.update(page_id=page_id, properties=properties)

        # Clear the page content first, then write it again.
        self.notion_helper.clear_page_content(page["id"])
        self.uploader.uploadSingleFileContent(self.notion_helper.client, content_md, page['id'])

    def _fetch_all_memos(self, authorization):
        """Page through the flomo API and return the memos, unique by slug.

        The ``updated_at`` of the last memo of each page (converted to a
        Unix timestamp in local time) is used as the cursor for the next
        request.  Paging stops on an empty page, or on a page that brings no
        unseen slug: in that case the cursor cannot advance usefully and the
        same request would otherwise be repeated forever.
        """
        memo_list = []
        seen_slugs = set()
        latest_updated_at = "0"
        while True:
            new_memo_list = self.flomo_api.get_memo_list(authorization, latest_updated_at)
            if not new_memo_list:
                break
            fresh = [m for m in new_memo_list if m['slug'] not in seen_slugs]
            if not fresh:
                break
            seen_slugs.update(m['slug'] for m in fresh)
            memo_list.extend(fresh)
            last_updated = time.strptime(new_memo_list[-1]['updated_at'], self._FLOMO_TIME_FORMAT)
            latest_updated_at = str(int(time.mktime(last_updated)))
        return memo_list

    def sync_to_notion(self):
        """Run one synchronisation pass from flomo to Notion.

        Environment variables:
            FLOMO_TOKEN: Authorization token passed to the flomo API.
            FULL_UPDATE: When enabled, every existing page is updated
                regardless of its age (default: disabled).
            UPDATE_INTERVAL_DAY: Existing pages are only updated when the
                memo's ``updated_at`` passes ``is_within_n_days`` with this
                value (default: 7).
        """
        # 1. Fetch the memos from flomo through its web API.
        authorization = os.getenv("FLOMO_TOKEN")
        memo_list = self._fetch_all_memos(authorization)

        # 2. Index the existing Notion pages by slug (the unique key): a
        #    known slug is updated, an unknown one is inserted.
        notion_memo_list = self.notion_helper.query_all(self.notion_helper.page_id)
        slug_map = {
            notion_utils.get_rich_text_from_result(notion_memo, "slug"): notion_memo.get("id")
            for notion_memo in notion_memo_list
        }

        # Read the settings once; they do not change while the loop runs.
        full_update = self._env_flag("FULL_UPDATE")
        interval_day = self._env_int("UPDATE_INTERVAL_DAY", 7)

        # 3. Walk through the flomo memos.
        for memo in memo_list:
            # 3.1 Unknown slug: create the page.
            if memo['slug'] not in slug_map:
                self.insert_memo(memo)
                continue

            # 3.2 To avoid mass updates, skip memos that fall outside the
            #     configured interval unless a full update was requested.
            if not full_update and not is_within_n_days(memo['updated_at'], interval_day):
                print("is_within_n_days slug:", memo['slug'])
                continue

            self.update_memo(memo, slug_map[memo['slug']])
if __name__ == "__main__":
    # Script entry point: run one flomo -> Notion synchronisation pass.
    Flomo2Notion().sync_to_notion()
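# notionify key
# [redacted] Never commit the integration secret to source control; supply it
# through the environment or a secret store, and revoke the key that was published here.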