-
Notifications
You must be signed in to change notification settings - Fork 26
Youtube video finder using geminillm #32
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Youtube video finder using geminillm #32
Conversation
This is created by me as part one of the company assesment as first round in their hiring finding best youtube videos using youtube api and Gemini llm
Developed by me as a company hiring assesment project to findbest youtube videos using yt api and gemini llm at backend
"Mmentioned about Youtube API"
👋 @veerababu1729 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @veerababu1729, Please make the respecified changes for your PR to get approved.
"This is a Open source project initiated by me -you can fork, edit, and make a pull request" | ||
|
||
````markdown |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This part is not required.
"This is a Open source project initiated by me -you can fork, edit, and make a pull request" | |
````markdown |
## 📃 License | ||
|
||
Open source – feel free to use and modify |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please refrain from adding obvious information. This repo is licensed under the Apache, and it's present within the root directory of this repo. So, this part is unnecessary.
## 📃 License | |
Open source – feel free to use and modify |
filtered_videos.append({ | ||
"id": item["id"], | ||
"title": item["snippet"]["title"], | ||
"duration": total_seconds, | ||
"publishedAt": item["snippet"]["publishedAt"] | ||
}) | ||
|
||
# If we've found enough videos, we can stop | ||
if len(filtered_videos) >= max_filtered_results: | ||
break | ||
except Exception as e: | ||
print(f"Could not parse duration for video {item.get('id', 'N/A')}: {e}") | ||
continue | ||
|
||
# Check if there are more pages of results | ||
next_page_token = search_response.get("nextPageToken") | ||
if not next_page_token: | ||
break | ||
|
||
print(f"Found {len(filtered_videos)} qualifying videos so far. Searching next page...") | ||
|
||
print(f"Search completed. Found {len(filtered_videos)} videos meeting criteria.") | ||
return filtered_videos | ||
|
||
|
||
def score_title(title, query): | ||
"""Score a video title's relevance to the query using Gemini AI.""" | ||
prompt = ( | ||
f"Query: {query}\n" | ||
f"Title: {title}\n" | ||
"Rate relevance & quality 1–10 (just give the number)." | ||
) | ||
try: | ||
response = model.generate_content(prompt) | ||
score_text = response.text.strip() | ||
# Try to extract just the number if there's additional text | ||
import re | ||
match = re.search(r'\b([0-9]|10)(\.[0-9]+)?\b', score_text) | ||
if match: | ||
score = float(match.group(0)) | ||
else: | ||
score = float(score_text) | ||
return score | ||
except ValueError: | ||
print(f"Model returned non-numeric score for '{title}': '{score_text}'") | ||
return 5.0 # Default middle score instead of 0 | ||
except Exception as e: | ||
print(f"Error scoring title '{title}': {e}") | ||
if 'response' in locals() and hasattr(response, 'text'): | ||
print(f"API response text: {response.text}") | ||
return 5.0 # Default middle score | ||
|
||
|
||
def pick_best(query, num_results=20): | ||
""" | ||
Find and score the best YouTube videos for a query. | ||
|
||
Args: | ||
query: Search query string | ||
num_results: Number of top videos to return | ||
""" | ||
# Get more videos than we need to ensure we have enough after scoring | ||
vids = search_videos(query, max_filtered_results=max(30, num_results * 1.5)) | ||
|
||
if not vids: | ||
print("No suitable videos found after applying filters.") | ||
return | ||
|
||
# Score each video | ||
print(f"Scoring {len(vids)} videos...") | ||
for i, v in enumerate(vids): | ||
v["score"] = score_title(v["title"], query) | ||
print(f" Scored video {i+1}/{len(vids)}: '{v['title']}' - Score: {v['score']:.2f}") | ||
|
||
# Sort by score in descending order | ||
vids.sort(key=lambda x: x.get("score", 0.0), reverse=True) | ||
|
||
# Print the top num_results | ||
result_count = min(num_results, len(vids)) | ||
print(f"\n--- Top {result_count} Relevant Videos ---") | ||
|
||
for i, video in enumerate(vids[:num_results]): | ||
print(f"\n{i+1}.") | ||
print(f" • Title: {video.get('title', 'N/A')}") | ||
print(f" • URL: https://youtu.be/{video.get('id', 'N/A')}") | ||
print(f" • Score: {video.get('score', 0.0):.2f}") | ||
duration_sec = video.get('duration', 0) | ||
print(f" • Duration: {duration_sec // 60}m{duration_sec % 60:02d}s") | ||
print(f" • Published: {video.get('publishedAt', 'N/A')}") | ||
|
||
|
||
# —— RUN IT! —— | ||
if __name__ == "__main__": | ||
# Check if API keys are set | ||
if "YT_API_KEY" not in os.environ or "GEMINI_API_KEY" not in os.environ: | ||
print("Error: YouTube and/or Gemini API keys not set in environment variables.") | ||
else: | ||
user_query = input("Enter your search (voice-to-text or text): ") | ||
# Call pick_best with the desired number of results | ||
pick_best(user_query, num_results=20) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Your implementation is good, but there are a couple of things you need to work on, like,
- Include a shebang if you were meant to use it as a script.
- Initialising the config as global. (it maybe commonly used as scripting, but it's always better to define its own boundary using classes)
- Using deprecated methods like
datetime.datetime.utcnow()
. - Not defining constants for static values that define the behaviour and limits.
- Implementing direct mathematical logic without abstraction.
- Not using
enumeration
properly. - Not enclosing valuable data and object instance (like
model
) within class to restrict access, thus keeping the integrity of source code. - Logic needs to be divided into each block of logic (i.e., either as functions or methods)
- Too many comments, code should be self-documenting.
- Need to follow best practices and format the code.
Just to name a few, but here's a suggested version of your code. This not might be the best, but it does indeed resolve the above issue for the most part. Hope you learn a thing or two from it.
import os | |
import datetime | |
from googleapiclient.discovery import build | |
import google.generativeai as genai | |
# ——— CONFIG ——— | |
# Initialize clients with environment variables | |
yt = build("youtube", "v3", developerKey=os.environ["YT_API_KEY"]) | |
# Configure the Google Generative AI client | |
genai.configure(api_key=os.environ["GEMINI_API_KEY"]) | |
# Initialize the Gemini model | |
model = genai.GenerativeModel('gemini-1.5-flash-latest') | |
def search_videos(query, max_filtered_results=20): | |
""" | |
Search for YouTube videos matching a query, filtering by recency and duration. | |
This function keeps searching until it finds enough videos that meet the criteria | |
or exhausts the search results. | |
""" | |
# Calculate publishedAfter timestamp (14 days ago) | |
fourteen_days_ago = (datetime.datetime.utcnow() | |
- datetime.timedelta(days=14)).isoformat("T") + "Z" | |
filtered_videos = [] | |
next_page_token = None | |
page_count = 0 | |
max_pages = 5 # Limit the number of pages to search to avoid excessive API calls | |
# Continue searching until we have enough filtered videos or run out of results | |
while len(filtered_videos) < max_filtered_results and page_count < max_pages: | |
# Step 1: Search for videos matching the query | |
search_response = yt.search().list( | |
q=query, | |
part="id,snippet", | |
type="video", | |
order="relevance", | |
publishedAfter=fourteen_days_ago, | |
maxResults=50, # Maximum allowed by the API | |
pageToken=next_page_token | |
).execute() | |
page_count += 1 | |
# Step 2: Collect video IDs from this page | |
video_ids = [item["id"]["videoId"] for item in search_response.get("items", [])] | |
# Break if no more videos found | |
if not video_ids: | |
break | |
# Step 3: Get details for the fetched videos | |
details = yt.videos().list( | |
part="contentDetails,snippet", | |
id=",".join(video_ids) | |
).execute() | |
# Step 4: Filter by duration (4–20 minutes) | |
for item in details.get("items", []): | |
try: | |
# Parse duration (ISO 8601 format, e.g. "PT5M30S") | |
dur = item["contentDetails"]["duration"].replace("PT","") | |
# Skip videos with hours or without minutes | |
if "H" in dur or "M" not in dur: | |
continue | |
# Split minutes and seconds | |
parts = dur.split("M") | |
mins = int(parts[0]) | |
secs = parts[1].replace("S","") if len(parts) > 1 else "0" | |
seconds = int(secs) if secs else 0 | |
total_seconds = mins * 60 + seconds | |
# Filter by duration (4 to 20 minutes inclusive) | |
if 4 * 60 <= total_seconds <= 20 * 60: | |
filtered_videos.append({ | |
"id": item["id"], | |
"title": item["snippet"]["title"], | |
"duration": total_seconds, | |
"publishedAt": item["snippet"]["publishedAt"] | |
}) | |
# If we've found enough videos, we can stop | |
if len(filtered_videos) >= max_filtered_results: | |
break | |
except Exception as e: | |
print(f"Could not parse duration for video {item.get('id', 'N/A')}: {e}") | |
continue | |
# Check if there are more pages of results | |
next_page_token = search_response.get("nextPageToken") | |
if not next_page_token: | |
break | |
print(f"Found {len(filtered_videos)} qualifying videos so far. Searching next page...") | |
print(f"Search completed. Found {len(filtered_videos)} videos meeting criteria.") | |
return filtered_videos | |
def score_title(title, query): | |
"""Score a video title's relevance to the query using Gemini AI.""" | |
prompt = ( | |
f"Query: {query}\n" | |
f"Title: {title}\n" | |
"Rate relevance & quality 1–10 (just give the number)." | |
) | |
try: | |
response = model.generate_content(prompt) | |
score_text = response.text.strip() | |
# Try to extract just the number if there's additional text | |
import re | |
match = re.search(r'\b([0-9]|10)(\.[0-9]+)?\b', score_text) | |
if match: | |
score = float(match.group(0)) | |
else: | |
score = float(score_text) | |
return score | |
except ValueError: | |
print(f"Model returned non-numeric score for '{title}': '{score_text}'") | |
return 5.0 # Default middle score instead of 0 | |
except Exception as e: | |
print(f"Error scoring title '{title}': {e}") | |
if 'response' in locals() and hasattr(response, 'text'): | |
print(f"API response text: {response.text}") | |
return 5.0 # Default middle score | |
def pick_best(query, num_results=20): | |
""" | |
Find and score the best YouTube videos for a query. | |
Args: | |
query: Search query string | |
num_results: Number of top videos to return | |
""" | |
# Get more videos than we need to ensure we have enough after scoring | |
vids = search_videos(query, max_filtered_results=max(30, num_results * 1.5)) | |
if not vids: | |
print("No suitable videos found after applying filters.") | |
return | |
# Score each video | |
print(f"Scoring {len(vids)} videos...") | |
for i, v in enumerate(vids): | |
v["score"] = score_title(v["title"], query) | |
print(f" Scored video {i+1}/{len(vids)}: '{v['title']}' - Score: {v['score']:.2f}") | |
# Sort by score in descending order | |
vids.sort(key=lambda x: x.get("score", 0.0), reverse=True) | |
# Print the top num_results | |
result_count = min(num_results, len(vids)) | |
print(f"\n--- Top {result_count} Relevant Videos ---") | |
for i, video in enumerate(vids[:num_results]): | |
print(f"\n{i+1}.") | |
print(f" • Title: {video.get('title', 'N/A')}") | |
print(f" • URL: https://youtu.be/{video.get('id', 'N/A')}") | |
print(f" • Score: {video.get('score', 0.0):.2f}") | |
duration_sec = video.get('duration', 0) | |
print(f" • Duration: {duration_sec // 60}m{duration_sec % 60:02d}s") | |
print(f" • Published: {video.get('publishedAt', 'N/A')}") | |
# —— RUN IT! —— | |
if __name__ == "__main__": | |
# Check if API keys are set | |
if "YT_API_KEY" not in os.environ or "GEMINI_API_KEY" not in os.environ: | |
print("Error: YouTube and/or Gemini API keys not set in environment variables.") | |
else: | |
user_query = input("Enter your search (voice-to-text or text): ") | |
# Call pick_best with the desired number of results | |
pick_best(user_query, num_results=20) | |
#!/usr/bin/env python3 | |
import os | |
import re | |
from typing import Dict | |
from datetime import datetime, timedelta, timezone | |
from googleapiclient.discovery import build | |
import google.generativeai as genai | |
# ——— ENV variables ——— | |
YT_API_KEY = os.environ.get('YT_API_KEY') | |
GEMINI_API_KEY = os.environ.get('GEMINI_API_KEY') | |
# ——— CONSTANTS ——— | |
SERVICE_TYPE = 'youtube' | |
SERVICE_VERSION = 'v3' | |
MODEL_NAME = 'gemini-1.5-flash-latest' | |
DEFAULT_MAX_API_CALLS = 5 | |
DEFAULT_MAX_RESULTS_PER_PAGE = 50 | |
DEFAULT_MAX_RESULTS = 20 | |
DEFAULT_MIN_VIDEO_DURATION_MINUTES = 10 | |
DEFAULT_MAX_VIDEO_DURATION_MINUTES = 60 | |
DEFAULT_NO_OF_PREV_DAYS = 14 | |
DEFAULT_MAX_RESULTS = 5 | |
REGEX_PATTERN = r'\b(10|[1-9](\.\d+)?|0?\.\d+)\b' | |
DEFAULT_SCORE = 5.0 | |
class TimeUtils: | |
@staticmethod | |
def get_timestamp_n_days_from_now(days: int) -> str: | |
""" | |
Get the timestamp for a date n days ago in ISO 8601 format. | |
""" | |
date_before_n_days = datetime.now(timezone.utc) - timedelta(days=days) | |
formatted_date = date_before_n_days \ | |
.isoformat('T') \ | |
.replace('+00:00', 'Z') | |
return formatted_date | |
@staticmethod | |
def is_duration_in_mins(duration: str) -> bool: | |
""" | |
Check if the duration is in minutes. | |
""" | |
return 'H' in duration or 'M' not in duration | |
@staticmethod | |
def derive_total_seconds_from_duration(duration: str) -> int: | |
""" | |
Derive total seconds from duration (ISO 8601 format, e.g. "PT5M30S"). | |
""" | |
parts = duration.split('M') | |
mins = int(parts[0]) | |
secs = parts[1].replace('S', '') if len(parts) > 1 else '0' | |
seconds = int(secs) if secs else 0 | |
total_seconds = mins * 60 + seconds | |
return total_seconds | |
@staticmethod | |
def is_video_duration_in_range( | |
total_seconds: int, | |
*, | |
min_duration: int = DEFAULT_MIN_VIDEO_DURATION_MINUTES, | |
max_duration: int = DEFAULT_MAX_VIDEO_DURATION_MINUTES) -> bool: | |
""" | |
Check if the video duration is within the specified range in minutes. | |
""" | |
return min_duration * 60 <= total_seconds <= max_duration * 60 | |
class VideoDetailsExtractor: | |
""" | |
A class to encapsulate YouTube video extraction logic. | |
This class can be extended or modified for more complex behaviors. | |
""" | |
__platform_conn = build( | |
serviceName=SERVICE_TYPE, | |
version=SERVICE_VERSION, | |
developerKey=YT_API_KEY | |
) | |
def __init__( | |
self, | |
query: str, | |
*, | |
no_prev_days: int = DEFAULT_NO_OF_PREV_DAYS, | |
max_pages: int = DEFAULT_MAX_API_CALLS, | |
max_results: int = DEFAULT_MAX_RESULTS) -> None: | |
""" | |
Initialize the VideoDetailsExtractor. | |
""" | |
self.__filtered_videos = [] | |
self.__next_page_token = None | |
self.__page_count = 0 | |
self.__max_pages = max_pages | |
self.query = query | |
self.__targeted_date = TimeUtils \ | |
.get_timestamp_n_days_from_now(no_prev_days) | |
self.__search_response = self.get_new_search_response() | |
self.__max_results = max_results | |
self.scan_videos() | |
def get_new_search_response(self) -> dict: | |
""" | |
Fetch a new search response for the given query. | |
""" | |
search_config = { | |
"q": self.query, | |
"part": "id,snippet", | |
"type": "video", | |
"order": "relevance", | |
"publishedAfter": self.__targeted_date, | |
"maxResults": DEFAULT_MAX_RESULTS_PER_PAGE, | |
"pageToken": self.__next_page_token | |
} | |
new_search_response = VideoDetailsExtractor.__platform_conn \ | |
.search() \ | |
.list(**search_config) \ | |
.execute() | |
self.__page_count += 1 | |
return new_search_response | |
def get_video_ids_from_search_response(self) -> list: | |
""" | |
Extract video IDs from the search response. | |
""" | |
items_list = self.__search_response.get('items', []) | |
return [item['id']['videoId'] for item in items_list] | |
def filter_videos(self) -> None: | |
""" | |
Filter videos based on duration and recency. | |
This method processes the search response to filter videos that meet the criteria. | |
""" | |
video_ids = self.get_video_ids_from_search_response() | |
if not video_ids: | |
print("No video IDs found in the search response.") | |
return | |
details_config = { | |
"part": "contentDetails,snippet", | |
"id": ",".join(video_ids) | |
} | |
details = VideoDetailsExtractor.__platform_conn \ | |
.videos() \ | |
.list(**details_config) \ | |
.execute() | |
for item in details.get('items', []): | |
try: | |
duration = item['contentDetails']['duration'].replace('PT', '') | |
if TimeUtils.is_duration_in_mins(duration): | |
continue | |
total_seconds = TimeUtils \ | |
.derive_total_seconds_from_duration(duration) | |
if TimeUtils.is_video_duration_in_range(total_seconds): | |
video_details = { | |
'id': item['id'], | |
'title': item['snippet']['title'], | |
'duration': total_seconds, | |
'publishedAt': item['snippet']['publishedAt'] | |
} | |
self.__filtered_videos.append(video_details) | |
if len(self.__filtered_videos) >= DEFAULT_MAX_RESULTS: | |
break | |
except Exception as e: | |
print(f"Error processing video {item.get('id', 'N/A')}: {e}") | |
continue | |
print( | |
f"Filtered {len(self.__filtered_videos)} videos based on criteria.") | |
def has_filtered_videos_reached_limit(self) -> bool: | |
""" | |
Check if the maximum number of filtered videos has been reached. | |
""" | |
return len(self.__filtered_videos) < self.__max_results | |
def has_page_token_reached_limit(self) -> bool: | |
""" | |
Check if the maximum number of API calls has been reached. | |
""" | |
return self.__page_count >= self.__max_pages | |
def update_next_page_token(self) -> None: | |
""" | |
Update the next page token based on the search response. | |
""" | |
self.__next_page_token = \ | |
self.__search_response.get('nextPageToken', None) | |
def scan_videos(self) -> None: | |
""" | |
Scan for videos that meet the specified criteria. | |
This method keeps searching until it finds enough videos that meet the criteria | |
or exhausts the search results. | |
""" | |
while self.has_filtered_videos_reached_limit() and self.has_page_token_reached_limit(): | |
self.__search_response = self.get_new_search_response() | |
self.filter_videos() | |
self.update_next_page_token() | |
if not self.__next_page_token: | |
break | |
def get_video_details(self) -> list: | |
""" | |
Fetch video details for a list of filtered video based that were previously computed. | |
""" | |
if not self.__filtered_videos: | |
print('No suitable videos found after applying filters.') | |
return self.__filtered_videos | |
class GenModel: | |
""" | |
A class to encapsulate the Gemini model for scoring video titles. | |
This class can be extended or modified for more complex behaviors. | |
""" | |
_model = None | |
@classmethod | |
def _initialize_model(cls): | |
""" | |
Initialize the Gemini model if it hasn't been initialized yet. | |
""" | |
if cls._model is None: | |
genai.configure(api_key=GEMINI_API_KEY) | |
cls._model = genai.GenerativeModel(MODEL_NAME) | |
@staticmethod | |
def get_prompt_for_title(title: str, query: str) -> str: | |
""" | |
Generate a prompt for the Gemini model to score the title based on the query. | |
""" | |
return ( | |
f"Query: {query}\n" | |
f"Title: {title}\n" | |
"Rate relevance & quality 1–10 (just give the number)." | |
) | |
@classmethod | |
def get_score_for_title(cls, title: str, query: str) -> float: | |
""" | |
Get the score for a video title based on the query using the Gemini model. | |
If the model is not initialized, it will initialize it first. | |
If the score cannot be parsed, it returns a default score. | |
""" | |
cls._initialize_model() | |
prompt = cls.get_prompt_for_title(title, query) | |
try: | |
response = cls._model.generate_content(prompt) | |
score_text = response.text.strip() | |
match = re.search(REGEX_PATTERN, score_text) | |
return float(match.group()) if match else DEFAULT_SCORE | |
except (ValueError, AttributeError) as e: | |
print(f"[Error] Failed to parse score for '{title}': {e}") | |
return DEFAULT_SCORE | |
except Exception as e: | |
print(f"[Error] API call failed for '{title}': {e}") | |
return DEFAULT_SCORE | |
class VideoProcessor: | |
""" | |
A class to process video details and rank them based on a scoring model. | |
This class can be extended or modified for more complex behaviors. | |
""" | |
def __init__(self, scorer=GenModel): | |
""" | |
Initialize the VideoProcessor with a scoring model. | |
""" | |
self.scorer = scorer | |
def find_and_rank_videos(self, query: str, num_results: int = DEFAULT_MAX_RESULTS): | |
""" | |
Find and rank videos based on the query. | |
This method uses the VideoDetailsExtractor to find videos and the scoring model to rank them. | |
""" | |
videos = VideoDetailsExtractor(query).get_video_details() | |
if not videos: | |
return [] | |
for video in videos: | |
video['score'] = self.scorer.get_score_for_title( | |
video['title'], query) | |
return sorted(videos, key=lambda v: v['score'], reverse=True)[:num_results] | |
if __name__ == '__main__': | |
required_env_vars = ['YT_API_KEY', 'GEMINI_API_KEY'] | |
if any([env_var not in os.environ for env_var in required_env_vars]): | |
raise KeyError( | |
"Error: YouTube and/or Gemini API keys not set in environment variables.") | |
user_query = input("Enter your search: ") | |
video_processor = VideoProcessor() | |
pick_best = video_processor.find_and_rank_videos(user_query) | |
This is made under one company internship hiring assessment. I hope it would be useful for freshers to know exact real time company requirements from a candidate specifically in Gen AI, and ML Projects.