Skip to content

Youtube video finder using geminillm #32

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from

Conversation

veerababu1729
Copy link

This is made under one company internship hiring assessment. I hope it would be useful for freshers to know exact real time company requirements from a candidate specifically in Gen AI, and ML Projects.

This is created by me as part one of the company assesment as first round in their hiring finding best youtube videos using youtube api and Gemini llm
Developed by me as a company hiring assesment project to findbest youtube videos using yt api and gemini llm at backend
Copy link

👋 @veerababu1729
Thank you for raising your pull request.
Please make sure you have followed our contributing guidelines. We will review it as soon as possible.

Copy link
Contributor

@iamwatchdogs iamwatchdogs left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @veerababu1729, Please make the respecified changes for your PR to get approved.

Comment on lines +1 to +3
"This is a Open source project initiated by me -you can fork, edit, and make a pull request"

````markdown
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part is not required.

Suggested change
"This is a Open source project initiated by me -you can fork, edit, and make a pull request"
````markdown

Comment on lines +63 to +65
## 📃 License

Open source – feel free to use and modify
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please refrain from adding obvious information. This repo is licensed under the Apache, and it's present within the root directory of this repo. So, this part is unnecessary.

Suggested change
## 📃 License
Open source – feel free to use and modify

Comment on lines +1 to +180
filtered_videos.append({
"id": item["id"],
"title": item["snippet"]["title"],
"duration": total_seconds,
"publishedAt": item["snippet"]["publishedAt"]
})

# If we've found enough videos, we can stop
if len(filtered_videos) >= max_filtered_results:
break
except Exception as e:
print(f"Could not parse duration for video {item.get('id', 'N/A')}: {e}")
continue

# Check if there are more pages of results
next_page_token = search_response.get("nextPageToken")
if not next_page_token:
break

print(f"Found {len(filtered_videos)} qualifying videos so far. Searching next page...")

print(f"Search completed. Found {len(filtered_videos)} videos meeting criteria.")
return filtered_videos


def score_title(title, query):
"""Score a video title's relevance to the query using Gemini AI."""
prompt = (
f"Query: {query}\n"
f"Title: {title}\n"
"Rate relevance & quality 1–10 (just give the number)."
)
try:
response = model.generate_content(prompt)
score_text = response.text.strip()
# Try to extract just the number if there's additional text
import re
match = re.search(r'\b([0-9]|10)(\.[0-9]+)?\b', score_text)
if match:
score = float(match.group(0))
else:
score = float(score_text)
return score
except ValueError:
print(f"Model returned non-numeric score for '{title}': '{score_text}'")
return 5.0 # Default middle score instead of 0
except Exception as e:
print(f"Error scoring title '{title}': {e}")
if 'response' in locals() and hasattr(response, 'text'):
print(f"API response text: {response.text}")
return 5.0 # Default middle score


def pick_best(query, num_results=20):
"""
Find and score the best YouTube videos for a query.

Args:
query: Search query string
num_results: Number of top videos to return
"""
# Get more videos than we need to ensure we have enough after scoring
vids = search_videos(query, max_filtered_results=max(30, num_results * 1.5))

if not vids:
print("No suitable videos found after applying filters.")
return

# Score each video
print(f"Scoring {len(vids)} videos...")
for i, v in enumerate(vids):
v["score"] = score_title(v["title"], query)
print(f" Scored video {i+1}/{len(vids)}: '{v['title']}' - Score: {v['score']:.2f}")

# Sort by score in descending order
vids.sort(key=lambda x: x.get("score", 0.0), reverse=True)

# Print the top num_results
result_count = min(num_results, len(vids))
print(f"\n--- Top {result_count} Relevant Videos ---")

for i, video in enumerate(vids[:num_results]):
print(f"\n{i+1}.")
print(f" • Title: {video.get('title', 'N/A')}")
print(f" • URL: https://youtu.be/{video.get('id', 'N/A')}")
print(f" • Score: {video.get('score', 0.0):.2f}")
duration_sec = video.get('duration', 0)
print(f" • Duration: {duration_sec // 60}m{duration_sec % 60:02d}s")
print(f" • Published: {video.get('publishedAt', 'N/A')}")


# —— RUN IT! ——
if __name__ == "__main__":
# Check if API keys are set
if "YT_API_KEY" not in os.environ or "GEMINI_API_KEY" not in os.environ:
print("Error: YouTube and/or Gemini API keys not set in environment variables.")
else:
user_query = input("Enter your search (voice-to-text or text): ")
# Call pick_best with the desired number of results
pick_best(user_query, num_results=20)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your implementation is good, but there are a couple of things you need to work on, like,

  • Include a shebang if you were meant to use it as a script.
  • Initialising the config as global. (it maybe commonly used as scripting, but it's always better to define its own boundary using classes)
  • Using deprecated methods like datetime.datetime.utcnow().
  • Not defining constants for static values that define the behaviour and limits.
  • Implementing direct mathematical logic without abstraction.
  • Not using enumeration properly.
  • Not enclosing valuable data and object instance (like model) within class to restrict access, thus keeping the integrity of source code.
  • Logic needs to be divided into each block of logic (i.e., either as functions or methods)
  • Too many comments, code should be self-documenting.
  • Need to follow best practices and format the code.

Just to name a few, but here's a suggested version of your code. This not might be the best, but it does indeed resolve the above issue for the most part. Hope you learn a thing or two from it.

Suggested change
import os
import datetime
from googleapiclient.discovery import build
import google.generativeai as genai
# ——— CONFIG ———
# Initialize clients with environment variables
yt = build("youtube", "v3", developerKey=os.environ["YT_API_KEY"])
# Configure the Google Generative AI client
genai.configure(api_key=os.environ["GEMINI_API_KEY"])
# Initialize the Gemini model
model = genai.GenerativeModel('gemini-1.5-flash-latest')
def search_videos(query, max_filtered_results=20):
"""
Search for YouTube videos matching a query, filtering by recency and duration.
This function keeps searching until it finds enough videos that meet the criteria
or exhausts the search results.
"""
# Calculate publishedAfter timestamp (14 days ago)
fourteen_days_ago = (datetime.datetime.utcnow()
- datetime.timedelta(days=14)).isoformat("T") + "Z"
filtered_videos = []
next_page_token = None
page_count = 0
max_pages = 5 # Limit the number of pages to search to avoid excessive API calls
# Continue searching until we have enough filtered videos or run out of results
while len(filtered_videos) < max_filtered_results and page_count < max_pages:
# Step 1: Search for videos matching the query
search_response = yt.search().list(
q=query,
part="id,snippet",
type="video",
order="relevance",
publishedAfter=fourteen_days_ago,
maxResults=50, # Maximum allowed by the API
pageToken=next_page_token
).execute()
page_count += 1
# Step 2: Collect video IDs from this page
video_ids = [item["id"]["videoId"] for item in search_response.get("items", [])]
# Break if no more videos found
if not video_ids:
break
# Step 3: Get details for the fetched videos
details = yt.videos().list(
part="contentDetails,snippet",
id=",".join(video_ids)
).execute()
# Step 4: Filter by duration (4–20 minutes)
for item in details.get("items", []):
try:
# Parse duration (ISO 8601 format, e.g. "PT5M30S")
dur = item["contentDetails"]["duration"].replace("PT","")
# Skip videos with hours or without minutes
if "H" in dur or "M" not in dur:
continue
# Split minutes and seconds
parts = dur.split("M")
mins = int(parts[0])
secs = parts[1].replace("S","") if len(parts) > 1 else "0"
seconds = int(secs) if secs else 0
total_seconds = mins * 60 + seconds
# Filter by duration (4 to 20 minutes inclusive)
if 4 * 60 <= total_seconds <= 20 * 60:
filtered_videos.append({
"id": item["id"],
"title": item["snippet"]["title"],
"duration": total_seconds,
"publishedAt": item["snippet"]["publishedAt"]
})
# If we've found enough videos, we can stop
if len(filtered_videos) >= max_filtered_results:
break
except Exception as e:
print(f"Could not parse duration for video {item.get('id', 'N/A')}: {e}")
continue
# Check if there are more pages of results
next_page_token = search_response.get("nextPageToken")
if not next_page_token:
break
print(f"Found {len(filtered_videos)} qualifying videos so far. Searching next page...")
print(f"Search completed. Found {len(filtered_videos)} videos meeting criteria.")
return filtered_videos
def score_title(title, query):
"""Score a video title's relevance to the query using Gemini AI."""
prompt = (
f"Query: {query}\n"
f"Title: {title}\n"
"Rate relevance & quality 1–10 (just give the number)."
)
try:
response = model.generate_content(prompt)
score_text = response.text.strip()
# Try to extract just the number if there's additional text
import re
match = re.search(r'\b([0-9]|10)(\.[0-9]+)?\b', score_text)
if match:
score = float(match.group(0))
else:
score = float(score_text)
return score
except ValueError:
print(f"Model returned non-numeric score for '{title}': '{score_text}'")
return 5.0 # Default middle score instead of 0
except Exception as e:
print(f"Error scoring title '{title}': {e}")
if 'response' in locals() and hasattr(response, 'text'):
print(f"API response text: {response.text}")
return 5.0 # Default middle score
def pick_best(query, num_results=20):
"""
Find and score the best YouTube videos for a query.
Args:
query: Search query string
num_results: Number of top videos to return
"""
# Get more videos than we need to ensure we have enough after scoring
vids = search_videos(query, max_filtered_results=max(30, num_results * 1.5))
if not vids:
print("No suitable videos found after applying filters.")
return
# Score each video
print(f"Scoring {len(vids)} videos...")
for i, v in enumerate(vids):
v["score"] = score_title(v["title"], query)
print(f" Scored video {i+1}/{len(vids)}: '{v['title']}' - Score: {v['score']:.2f}")
# Sort by score in descending order
vids.sort(key=lambda x: x.get("score", 0.0), reverse=True)
# Print the top num_results
result_count = min(num_results, len(vids))
print(f"\n--- Top {result_count} Relevant Videos ---")
for i, video in enumerate(vids[:num_results]):
print(f"\n{i+1}.")
print(f" • Title: {video.get('title', 'N/A')}")
print(f" • URL: https://youtu.be/{video.get('id', 'N/A')}")
print(f" • Score: {video.get('score', 0.0):.2f}")
duration_sec = video.get('duration', 0)
print(f" • Duration: {duration_sec // 60}m{duration_sec % 60:02d}s")
print(f" • Published: {video.get('publishedAt', 'N/A')}")
# —— RUN IT! ——
if __name__ == "__main__":
# Check if API keys are set
if "YT_API_KEY" not in os.environ or "GEMINI_API_KEY" not in os.environ:
print("Error: YouTube and/or Gemini API keys not set in environment variables.")
else:
user_query = input("Enter your search (voice-to-text or text): ")
# Call pick_best with the desired number of results
pick_best(user_query, num_results=20)
#!/usr/bin/env python3
import os
import re
from typing import Dict
from datetime import datetime, timedelta, timezone
from googleapiclient.discovery import build
import google.generativeai as genai
# ——— ENV variables ———
YT_API_KEY = os.environ.get('YT_API_KEY')
GEMINI_API_KEY = os.environ.get('GEMINI_API_KEY')
# ——— CONSTANTS ———
SERVICE_TYPE = 'youtube'
SERVICE_VERSION = 'v3'
MODEL_NAME = 'gemini-1.5-flash-latest'
DEFAULT_MAX_API_CALLS = 5
DEFAULT_MAX_RESULTS_PER_PAGE = 50
DEFAULT_MAX_RESULTS = 20
DEFAULT_MIN_VIDEO_DURATION_MINUTES = 10
DEFAULT_MAX_VIDEO_DURATION_MINUTES = 60
DEFAULT_NO_OF_PREV_DAYS = 14
DEFAULT_MAX_RESULTS = 5
REGEX_PATTERN = r'\b(10|[1-9](\.\d+)?|0?\.\d+)\b'
DEFAULT_SCORE = 5.0
class TimeUtils:
@staticmethod
def get_timestamp_n_days_from_now(days: int) -> str:
"""
Get the timestamp for a date n days ago in ISO 8601 format.
"""
date_before_n_days = datetime.now(timezone.utc) - timedelta(days=days)
formatted_date = date_before_n_days \
.isoformat('T') \
.replace('+00:00', 'Z')
return formatted_date
@staticmethod
def is_duration_in_mins(duration: str) -> bool:
"""
Check if the duration is in minutes.
"""
return 'H' in duration or 'M' not in duration
@staticmethod
def derive_total_seconds_from_duration(duration: str) -> int:
"""
Derive total seconds from duration (ISO 8601 format, e.g. "PT5M30S").
"""
parts = duration.split('M')
mins = int(parts[0])
secs = parts[1].replace('S', '') if len(parts) > 1 else '0'
seconds = int(secs) if secs else 0
total_seconds = mins * 60 + seconds
return total_seconds
@staticmethod
def is_video_duration_in_range(
total_seconds: int,
*,
min_duration: int = DEFAULT_MIN_VIDEO_DURATION_MINUTES,
max_duration: int = DEFAULT_MAX_VIDEO_DURATION_MINUTES) -> bool:
"""
Check if the video duration is within the specified range in minutes.
"""
return min_duration * 60 <= total_seconds <= max_duration * 60
class VideoDetailsExtractor:
"""
A class to encapsulate YouTube video extraction logic.
This class can be extended or modified for more complex behaviors.
"""
__platform_conn = build(
serviceName=SERVICE_TYPE,
version=SERVICE_VERSION,
developerKey=YT_API_KEY
)
def __init__(
self,
query: str,
*,
no_prev_days: int = DEFAULT_NO_OF_PREV_DAYS,
max_pages: int = DEFAULT_MAX_API_CALLS,
max_results: int = DEFAULT_MAX_RESULTS) -> None:
"""
Initialize the VideoDetailsExtractor.
"""
self.__filtered_videos = []
self.__next_page_token = None
self.__page_count = 0
self.__max_pages = max_pages
self.query = query
self.__targeted_date = TimeUtils \
.get_timestamp_n_days_from_now(no_prev_days)
self.__search_response = self.get_new_search_response()
self.__max_results = max_results
self.scan_videos()
def get_new_search_response(self) -> dict:
"""
Fetch a new search response for the given query.
"""
search_config = {
"q": self.query,
"part": "id,snippet",
"type": "video",
"order": "relevance",
"publishedAfter": self.__targeted_date,
"maxResults": DEFAULT_MAX_RESULTS_PER_PAGE,
"pageToken": self.__next_page_token
}
new_search_response = VideoDetailsExtractor.__platform_conn \
.search() \
.list(**search_config) \
.execute()
self.__page_count += 1
return new_search_response
def get_video_ids_from_search_response(self) -> list:
"""
Extract video IDs from the search response.
"""
items_list = self.__search_response.get('items', [])
return [item['id']['videoId'] for item in items_list]
def filter_videos(self) -> None:
"""
Filter videos based on duration and recency.
This method processes the search response to filter videos that meet the criteria.
"""
video_ids = self.get_video_ids_from_search_response()
if not video_ids:
print("No video IDs found in the search response.")
return
details_config = {
"part": "contentDetails,snippet",
"id": ",".join(video_ids)
}
details = VideoDetailsExtractor.__platform_conn \
.videos() \
.list(**details_config) \
.execute()
for item in details.get('items', []):
try:
duration = item['contentDetails']['duration'].replace('PT', '')
if TimeUtils.is_duration_in_mins(duration):
continue
total_seconds = TimeUtils \
.derive_total_seconds_from_duration(duration)
if TimeUtils.is_video_duration_in_range(total_seconds):
video_details = {
'id': item['id'],
'title': item['snippet']['title'],
'duration': total_seconds,
'publishedAt': item['snippet']['publishedAt']
}
self.__filtered_videos.append(video_details)
if len(self.__filtered_videos) >= DEFAULT_MAX_RESULTS:
break
except Exception as e:
print(f"Error processing video {item.get('id', 'N/A')}: {e}")
continue
print(
f"Filtered {len(self.__filtered_videos)} videos based on criteria.")
def has_filtered_videos_reached_limit(self) -> bool:
"""
Check if the maximum number of filtered videos has been reached.
"""
return len(self.__filtered_videos) < self.__max_results
def has_page_token_reached_limit(self) -> bool:
"""
Check if the maximum number of API calls has been reached.
"""
return self.__page_count >= self.__max_pages
def update_next_page_token(self) -> None:
"""
Update the next page token based on the search response.
"""
self.__next_page_token = \
self.__search_response.get('nextPageToken', None)
def scan_videos(self) -> None:
"""
Scan for videos that meet the specified criteria.
This method keeps searching until it finds enough videos that meet the criteria
or exhausts the search results.
"""
while self.has_filtered_videos_reached_limit() and self.has_page_token_reached_limit():
self.__search_response = self.get_new_search_response()
self.filter_videos()
self.update_next_page_token()
if not self.__next_page_token:
break
def get_video_details(self) -> list:
"""
Fetch video details for a list of filtered video based that were previously computed.
"""
if not self.__filtered_videos:
print('No suitable videos found after applying filters.')
return self.__filtered_videos
class GenModel:
"""
A class to encapsulate the Gemini model for scoring video titles.
This class can be extended or modified for more complex behaviors.
"""
_model = None
@classmethod
def _initialize_model(cls):
"""
Initialize the Gemini model if it hasn't been initialized yet.
"""
if cls._model is None:
genai.configure(api_key=GEMINI_API_KEY)
cls._model = genai.GenerativeModel(MODEL_NAME)
@staticmethod
def get_prompt_for_title(title: str, query: str) -> str:
"""
Generate a prompt for the Gemini model to score the title based on the query.
"""
return (
f"Query: {query}\n"
f"Title: {title}\n"
"Rate relevance & quality 1–10 (just give the number)."
)
@classmethod
def get_score_for_title(cls, title: str, query: str) -> float:
"""
Get the score for a video title based on the query using the Gemini model.
If the model is not initialized, it will initialize it first.
If the score cannot be parsed, it returns a default score.
"""
cls._initialize_model()
prompt = cls.get_prompt_for_title(title, query)
try:
response = cls._model.generate_content(prompt)
score_text = response.text.strip()
match = re.search(REGEX_PATTERN, score_text)
return float(match.group()) if match else DEFAULT_SCORE
except (ValueError, AttributeError) as e:
print(f"[Error] Failed to parse score for '{title}': {e}")
return DEFAULT_SCORE
except Exception as e:
print(f"[Error] API call failed for '{title}': {e}")
return DEFAULT_SCORE
class VideoProcessor:
"""
A class to process video details and rank them based on a scoring model.
This class can be extended or modified for more complex behaviors.
"""
def __init__(self, scorer=GenModel):
"""
Initialize the VideoProcessor with a scoring model.
"""
self.scorer = scorer
def find_and_rank_videos(self, query: str, num_results: int = DEFAULT_MAX_RESULTS):
"""
Find and rank videos based on the query.
This method uses the VideoDetailsExtractor to find videos and the scoring model to rank them.
"""
videos = VideoDetailsExtractor(query).get_video_details()
if not videos:
return []
for video in videos:
video['score'] = self.scorer.get_score_for_title(
video['title'], query)
return sorted(videos, key=lambda v: v['score'], reverse=True)[:num_results]
if __name__ == '__main__':
required_env_vars = ['YT_API_KEY', 'GEMINI_API_KEY']
if any([env_var not in os.environ for env_var in required_env_vars]):
raise KeyError(
"Error: YouTube and/or Gemini API keys not set in environment variables.")
user_query = input("Enter your search: ")
video_processor = VideoProcessor()
pick_best = video_processor.find_and_rank_videos(user_query)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants