-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
151 lines (109 loc) · 4.19 KB
/
main.py
File metadata and controls
151 lines (109 loc) · 4.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
from youtube_transcript_api import YouTubeTranscriptApi
from urllib.parse import urlparse, parse_qs
import tiktoken
from robot import Robot
def get_video_id_urllib(url: str) -> str:
    """Extract the YouTube video id from a watch URL.

    e.g. "https://www.youtube.com/watch?v=WMSjDXUV6Kw&t=637s" -> "WMSjDXUV6Kw"

    Raises KeyError if the URL has no "v" query parameter.
    """
    # Parse the query string properly instead of string surgery: the old
    # split/replace approach kept trailing parameters (e.g. "&t=637s") in
    # the id, which then leaked into the output file name in __main__.
    query = parse_qs(urlparse(url).query)
    return query["v"][0]
def get_youtube_transcript(video_url: str):
    """Fetch the Cantonese ("yue") transcript for a YouTube video.

    Returns the transcript as one space-joined string, or None when the
    fetch fails (error is printed, not raised).
    """
    # Derive the video id from the watch URL.
    v_id = get_video_id_urllib(video_url)
    try:
        print(v_id)
        transcript = YouTubeTranscriptApi.get_transcript(video_id=v_id, languages=["yue"])
        # Join every caption entry's text with single spaces.
        joined = " ".join(entry['text'] for entry in transcript)
        return joined.strip()
    except Exception as e:
        print(f"Error: {e}")
        return None
def sent_tokenize_cn(text: str) -> list[str]:
    """Split Chinese text into sentences on the full-width period "。".

    Each returned sentence ends with "。" (re-appended after splitting).
    Empty fragments are dropped, so terminated text no longer produces a
    spurious lone-"。" entry (previously "abc。" -> ["abc。", "。"]),
    and empty input returns [].
    """
    return [fragment + "。" for fragment in text.split("。") if fragment]
def count_tokens(text, encoding_name="cl100k_base"):
    """Return the number of tokens *text* encodes to under the given tiktoken encoding."""
    return len(tiktoken.get_encoding(encoding_name).encode(text))
# Usage
def split_text_by_tokens(text: str, max_tokens=4000, for_chinese=False):
    """Split *text* into chunks of at most *max_tokens* tokens, keeping sentences whole.

    Sentences come from sent_tokenize_cn (split on "。"); token counts from
    count_tokens. A single sentence longer than max_tokens is emitted as its
    own oversized chunk rather than being cut mid-sentence.

    *for_chinese* is accepted for backward compatibility but currently unused
    (the splitter always uses the Chinese sentence tokenizer).

    NOTE(review): chunks are joined with ' ', which inserts ASCII spaces
    between Chinese sentences — presumably harmless for the downstream
    model, but verify if exact text matters.
    """
    # Removed a dead `sentences = []` assignment that was immediately
    # overwritten; behavior is otherwise unchanged.
    sentences = sent_tokenize_cn(text)
    chunks = []
    current_chunk = []
    current_token_count = 0
    for sentence in sentences:
        sentence_tokens = count_tokens(sentence)
        # A sentence that alone exceeds the budget: flush the pending chunk,
        # then emit the long sentence as its own chunk.
        if sentence_tokens > max_tokens:
            if current_chunk:
                chunks.append(' '.join(current_chunk))
                current_chunk = []
                current_token_count = 0
            chunks.append(sentence)
            continue
        # Would this sentence overflow the current chunk?
        if current_token_count + sentence_tokens > max_tokens:
            chunks.append(' '.join(current_chunk))
            current_chunk = [sentence]
            current_token_count = sentence_tokens
        else:
            current_chunk.append(sentence)
            current_token_count += sentence_tokens
    # Flush the final partial chunk.
    if current_chunk:
        chunks.append(' '.join(current_chunk))
    return chunks
def send_to_ai_model(text_chunk: str, level: str):
    """Send one text chunk to the AI model, retrying once on failure.

    Delegates to Robot.transfer_book(text_chunk, level). If the first call
    raises, the error is printed and the call is retried exactly once; a
    second failure propagates to the caller.
    """
    try:
        return Robot.transfer_book(text_chunk, level)
    except Exception as e:
        # Was a bare `except:`, which also swallowed KeyboardInterrupt /
        # SystemExit and hid the actual error; narrow it and log the cause.
        print(f"An exception occurred re try: {e}")
        return Robot.transfer_book(text_chunk, level)
def to_text_file(string: str, file_name: str):
    """Write *string* to *file_name* (overwriting) as UTF-8.

    Explicit encoding matters here: the content is Chinese transcript text,
    and the platform-default encoding (e.g. cp1252 on Windows) would raise
    UnicodeEncodeError.
    """
    with open(file_name, "w", encoding="utf-8") as file:
        file.write(string)
def process_youtube(link: str, level: str, max_tokens=4000, is_chinese=True):
    """Fetch a YouTube transcript, chunk it by token count, and run each chunk through the AI model.

    Returns the list of per-chunk model results, or [] if the transcript
    could not be fetched.
    """
    print(f"Extracting text from {link}...")
    full_text = get_youtube_transcript(link)
    if full_text is None:
        # Previously only printed and fell through, crashing inside
        # split_text_by_tokens(None, ...). Bail out with an empty result
        # so __main__'s "".join(...) still works.
        print("Get text error")
        return []
    # Split text into sentence-preserving chunks by token count.
    print("Splitting text into chunks...")
    chunks = split_text_by_tokens(full_text, max_tokens, is_chinese)
    print(f"Split into {len(chunks)} chunks.")
    # Process each chunk with the AI model.
    results = []
    for i, chunk in enumerate(chunks):
        print(f"Processing chunk {i + 1}/{len(chunks)}...")
        results.append(send_to_ai_model(chunk, level))
    return results
if __name__ == "__main__":
    # Get the transcript from YouTube for one hard-coded video, run it
    # through the model at level "b1", and save the joined result to
    # text/<video_id>.txt.
    # NOTE(review): "b1" presumably names a difficulty/CEFR level consumed
    # by Robot.transfer_book — confirm against that API.
    video_url = "https://www.youtube.com/watch?v=WMSjDXUV6Kw&t=637s"
    v_id = get_video_id_urllib(video_url)
    trans = process_youtube(video_url, "b1", 100000)
    joined_result = "".join(trans)
    to_text_file(joined_result, 'text/' + v_id + '.txt')