From f6494f0a071644c000720e089f640bf20b3946a4 Mon Sep 17 00:00:00 2001 From: Anthony Shaw Date: Mon, 2 Jun 2025 12:23:34 +1000 Subject: [PATCH 1/5] Make the file strategy parallel with a default of 10 files --- app/backend/prepdocs.py | 7 +++++++ app/backend/prepdocslib/filestrategy.py | 15 ++++++++++++--- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/app/backend/prepdocs.py b/app/backend/prepdocs.py index f03baac0dc..0c9fb47de6 100644 --- a/app/backend/prepdocs.py +++ b/app/backend/prepdocs.py @@ -311,6 +311,12 @@ async def main(strategy: Strategy, setup_index: bool = True): required=False, help="Search service system assigned Identity (Managed identity) (used for integrated vectorization)", ) + parser.add_argument( + "--concurrency", + type=int, + default=10, + help="Max. number of concurrent tasks to run for processing files (file strategy only) (default: 10)", + ) parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output") args = parser.parse_args() @@ -467,6 +473,7 @@ async def main(strategy: Strategy, setup_index: bool = True): category=args.category, use_content_understanding=use_content_understanding, content_understanding_endpoint=os.getenv("AZURE_CONTENTUNDERSTANDING_ENDPOINT"), + concurrency=args.concurrency, ) loop.run_until_complete(main(ingestion_strategy, setup_index=not args.remove and not args.removeall)) diff --git a/app/backend/prepdocslib/filestrategy.py b/app/backend/prepdocslib/filestrategy.py index 37f399cf4b..e44d3ab6ff 100644 --- a/app/backend/prepdocslib/filestrategy.py +++ b/app/backend/prepdocslib/filestrategy.py @@ -1,3 +1,4 @@ +import asyncio import logging from typing import Optional @@ -56,6 +57,7 @@ def __init__( category: Optional[str] = None, use_content_understanding: bool = False, content_understanding_endpoint: Optional[str] = None, + concurrency: int = 10, ): self.list_file_strategy = list_file_strategy self.blob_manager = blob_manager @@ -70,6 +72,7 @@ def __init__( self.category = category self.use_content_understanding = use_content_understanding self.content_understanding_endpoint = content_understanding_endpoint + self.concurrency = concurrency def setup_search_manager(self): self.search_manager = SearchManager( @@ -98,9 +101,9 @@ async def setup(self): async def run(self): self.setup_search_manager() - if self.document_action == DocumentAction.Add: - files = self.list_file_strategy.list() - async for file in files: + + async def process_file_worker(semaphore: asyncio.Semaphore, file: File): + async with semaphore: try: sections = await parse_file(file, self.file_processors, self.category, self.image_embeddings) if sections: @@ -112,6 +115,12 @@ async def run(self): finally: if file: file.close() + + if self.document_action == DocumentAction.Add: + files = self.list_file_strategy.list() + semaphore = asyncio.Semaphore(self.concurrency) + tasks = [process_file_worker(semaphore, file) async for file in files] + await asyncio.gather(*tasks) elif self.document_action == DocumentAction.Remove: paths = self.list_file_strategy.list_paths() async for path in paths: From 44c912b68004f0d2450d18d8ede66179ab9ea6d5 Mon Sep 17 00:00:00 2001 From: Anthony Shaw Date: Mon, 2 Jun 2025 13:31:45 +1000 Subject: [PATCH 2/5] Set default c to 4, make a single constant --- app/backend/prepdocs.py | 2 +- app/backend/prepdocslib/filestrategy.py | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/app/backend/prepdocs.py b/app/backend/prepdocs.py index 0c9fb47de6..9017391571 100644 --- a/app/backend/prepdocs.py +++ b/app/backend/prepdocs.py @@ -314,7 +314,7 @@ async def main(strategy: Strategy, setup_index: bool = True): parser.add_argument( "--concurrency", type=int, - default=10, + default=FileStrategy.DEFAULT_CONCURRENCY, help="Max. number of concurrent tasks to run for processing files (file strategy only) (default: 10)", ) diff --git a/app/backend/prepdocslib/filestrategy.py b/app/backend/prepdocslib/filestrategy.py index e44d3ab6ff..9aa94b7acc 100644 --- a/app/backend/prepdocslib/filestrategy.py +++ b/app/backend/prepdocslib/filestrategy.py @@ -42,6 +42,8 @@ class FileStrategy(Strategy): Strategy for ingesting documents into a search service from files stored either locally or in a data lake storage account """ + DEFAULT_CONCURRENCY = 4 + def __init__( self, list_file_strategy: ListFileStrategy, @@ -57,7 +59,7 @@ def __init__( category: Optional[str] = None, use_content_understanding: bool = False, content_understanding_endpoint: Optional[str] = None, - concurrency: int = 10, + concurrency: int = DEFAULT_CONCURRENCY, ): self.list_file_strategy = list_file_strategy self.blob_manager = blob_manager @@ -118,6 +120,7 @@ async def process_file_worker(semaphore: asyncio.Semaphore, file: File): if self.document_action == DocumentAction.Add: files = self.list_file_strategy.list() + logger.info("Running with concurrency: %d", self.concurrency) semaphore = asyncio.Semaphore(self.concurrency) tasks = [process_file_worker(semaphore, file) async for file in files] await asyncio.gather(*tasks) From ad31bb04f8a0411b897163b4523f3298452335b2 Mon Sep 17 00:00:00 2001 From: Pamela Fox Date: Tue, 8 Jul 2025 11:01:28 -0700 Subject: [PATCH 3/5] Improve logging output --- app/backend/prepdocs.py | 2 +- app/backend/prepdocslib/blobmanager.py | 4 ++-- app/backend/prepdocslib/embeddings.py | 6 ------ app/backend/prepdocslib/filestrategy.py | 8 +++++--- app/backend/prepdocslib/htmlparser.py | 2 +- app/backend/prepdocslib/integratedvectorizerstrategy.py | 2 +- app/backend/prepdocslib/listfilestrategy.py | 2 +- app/backend/prepdocslib/mediadescriber.py | 4 ++-- app/backend/prepdocslib/pdfparser.py | 4 ++-- app/backend/prepdocslib/searchmanager.py | 6 +++--- 10 files changed, 18 insertions(+), 22 deletions(-) diff --git a/app/backend/prepdocs.py b/app/backend/prepdocs.py index 9017391571..e3010995b3 100644 --- a/app/backend/prepdocs.py +++ b/app/backend/prepdocs.py @@ -315,7 +315,7 @@ async def main(strategy: Strategy, setup_index: bool = True): "--concurrency", type=int, default=FileStrategy.DEFAULT_CONCURRENCY, - help="Max. number of concurrent tasks to run for processing files (file strategy only) (default: 10)", + help="Max. number of concurrent tasks to run for processing files (file strategy only) (default: 4)", ) parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output") diff --git a/app/backend/prepdocslib/blobmanager.py b/app/backend/prepdocslib/blobmanager.py index d5c21e0d41..42ef37b0b7 100644 --- a/app/backend/prepdocslib/blobmanager.py +++ b/app/backend/prepdocslib/blobmanager.py @@ -56,7 +56,7 @@ async def upload_blob(self, file: File) -> Optional[list[str]]: if file.url is None: with open(file.content.name, "rb") as reopened_file: blob_name = BlobManager.blob_name_from_file_name(file.content.name) - logger.info("Uploading blob for whole file -> %s", blob_name) + logger.info("'%s': Uploading blob for whole file", file.content.name) blob_client = await container_client.upload_blob(blob_name, reopened_file, overwrite=True) file.url = blob_client.url @@ -64,7 +64,7 @@ async def upload_blob(self, file: File) -> Optional[list[str]]: if os.path.splitext(file.content.name)[1].lower() == ".pdf": return await self.upload_pdf_blob_images(service_client, container_client, file) else: - logger.info("File %s is not a PDF, skipping image upload", file.content.name) + logger.info("'%s': File is not a PDF, skipping image upload", file.content.name) return None diff --git a/app/backend/prepdocslib/embeddings.py b/app/backend/prepdocslib/embeddings.py index df56f39c08..f0b927deac 100644 --- a/app/backend/prepdocslib/embeddings.py +++ b/app/backend/prepdocslib/embeddings.py @@ -114,11 +114,6 @@ async def create_embedding_batch(self, texts: list[str], dimensions_args: ExtraA model=self.open_ai_model_name, input=batch.texts, **dimensions_args ) embeddings.extend([data.embedding for data in emb_response.data]) - logger.info( - "Computed embeddings in batch. Batch size: %d, Token count: %d", - len(batch.texts), - batch.token_length, - ) return embeddings @@ -134,7 +129,6 @@ async def create_embedding_single(self, text: str, dimensions_args: ExtraArgs) - emb_response = await client.embeddings.create( model=self.open_ai_model_name, input=text, **dimensions_args ) - logger.info("Computed embedding for text section. Character count: %d", len(text)) return emb_response.data[0].embedding diff --git a/app/backend/prepdocslib/filestrategy.py b/app/backend/prepdocslib/filestrategy.py index 9aa94b7acc..cd4b571276 100644 --- a/app/backend/prepdocslib/filestrategy.py +++ b/app/backend/prepdocslib/filestrategy.py @@ -24,11 +24,11 @@ async def parse_file( key = file.file_extension().lower() processor = file_processors.get(key) if processor is None: - logger.info("Skipping '%s', no parser found.", file.filename()) + logger.info("'%s': Skipping, no parser found.", file.content.name) return [] - logger.info("Ingesting '%s'", file.filename()) + logger.info("'%s': Starting ingestion process", file.content.name) pages = [page async for page in processor.parser.parse(content=file.content)] - logger.info("Splitting '%s' into sections", file.filename()) + logger.info("'%s': Splitting into sections", file.content.name) if image_embeddings: logger.warning("Each page will be split into smaller chunks of text, but images will be of the entire page.") sections = [ @@ -113,9 +113,11 @@ async def process_file_worker(semaphore: asyncio.Semaphore, file: File): blob_image_embeddings: Optional[list[list[float]]] = None if self.image_embeddings and blob_sas_uris: blob_image_embeddings = await self.image_embeddings.create_embeddings(blob_sas_uris) + logger.info("'%s': Computing embeddings and updating search index", file.content.name) await self.search_manager.update_content(sections, blob_image_embeddings, url=file.url) finally: if file: + logger.info("'%s': Finished processing file", file.content.name) file.close() if self.document_action == DocumentAction.Add: diff --git a/app/backend/prepdocslib/htmlparser.py b/app/backend/prepdocslib/htmlparser.py index 719045b393..cfb5aafaba 100644 --- a/app/backend/prepdocslib/htmlparser.py +++ b/app/backend/prepdocslib/htmlparser.py @@ -39,7 +39,7 @@ async def parse(self, content: IO) -> AsyncGenerator[Page, None]: Returns: Page: The parsed html Page. """ - logger.info("Extracting text from '%s' using local HTML parser (BeautifulSoup)", content.name) + logger.info("'%s': Extracting text using local HTML parser (BeautifulSoup)", content.name) data = content.read() soup = BeautifulSoup(data, "html.parser") diff --git a/app/backend/prepdocslib/integratedvectorizerstrategy.py b/app/backend/prepdocslib/integratedvectorizerstrategy.py index 9e89facc4c..b444a83f15 100644 --- a/app/backend/prepdocslib/integratedvectorizerstrategy.py +++ b/app/backend/prepdocslib/integratedvectorizerstrategy.py @@ -129,7 +129,7 @@ async def create_embedding_skill(self, index_name: str) -> SearchIndexerSkillset return skillset async def setup(self): - logger.info("Setting up search index using integrated vectorization...") + logger.info("Setting up search index using integrated vectorization") search_manager = SearchManager( search_info=self.search_info, search_analyzer_name=self.search_analyzer_name, diff --git a/app/backend/prepdocslib/listfilestrategy.py b/app/backend/prepdocslib/listfilestrategy.py index bdceef0754..2a4c5b7f03 100644 --- a/app/backend/prepdocslib/listfilestrategy.py +++ b/app/backend/prepdocslib/listfilestrategy.py @@ -102,7 +102,7 @@ def check_md5(self, path: str) -> bool: stored_hash = md5_f.read() if stored_hash and stored_hash.strip() == existing_hash.strip(): - logger.info("Skipping %s, no changes detected.", path) + logger.info("'%s': Skipping, no changes detected.", path) return True # Write the hash diff --git a/app/backend/prepdocslib/mediadescriber.py b/app/backend/prepdocslib/mediadescriber.py index 5aae79232e..42235ce1f7 100644 --- a/app/backend/prepdocslib/mediadescriber.py +++ b/app/backend/prepdocslib/mediadescriber.py @@ -58,7 +58,7 @@ async def poll(): return await poll() async def create_analyzer(self): - logger.info("Creating analyzer '%s'...", self.analyzer_schema["analyzerId"]) + logger.info("Creating analyzer '%s'", self.analyzer_schema["analyzerId"]) token_provider = get_bearer_token_provider(self.credential, "https://cognitiveservices.azure.com/.default") token = await token_provider() @@ -84,7 +84,7 @@ async def create_analyzer(self): await self.poll_api(session, poll_url, headers) async def describe_image(self, image_bytes: bytes) -> str: - logger.info("Sending image to Azure Content Understanding service...") + logger.info("Sending image to Azure Content Understanding service") async with aiohttp.ClientSession() as session: token = await self.credential.get_token("https://cognitiveservices.azure.com/.default") headers = {"Authorization": "Bearer " + token.token} diff --git a/app/backend/prepdocslib/pdfparser.py b/app/backend/prepdocslib/pdfparser.py index c96980d21c..3956e546ed 100644 --- a/app/backend/prepdocslib/pdfparser.py +++ b/app/backend/prepdocslib/pdfparser.py @@ -33,7 +33,7 @@ class LocalPdfParser(Parser): """ async def parse(self, content: IO) -> AsyncGenerator[Page, None]: - logger.info("Extracting text from '%s' using local PDF parser (pypdf)", content.name) + logger.info("'%s': Extracting text using local PDF parser (pypdf)", content.name) reader = PdfReader(content) pages = reader.pages @@ -65,7 +65,7 @@ def __init__( self.content_understanding_endpoint = content_understanding_endpoint async def parse(self, content: IO) -> AsyncGenerator[Page, None]: - logger.info("Extracting text from '%s' using Azure Document Intelligence", content.name) + logger.info("'%s': Extracting text using Azure Document Intelligence", content.name) async with DocumentIntelligenceClient( endpoint=self.endpoint, credential=self.credential diff --git a/app/backend/prepdocslib/searchmanager.py b/app/backend/prepdocslib/searchmanager.py index e6ca925e24..0d3faf3de3 100644 --- a/app/backend/prepdocslib/searchmanager.py +++ b/app/backend/prepdocslib/searchmanager.py @@ -77,7 +77,7 @@ def __init__( self.search_images = search_images async def create_index(self): - logger.info("Checking whether search index %s exists...", self.search_info.index_name) + logger.info("Checking whether search index '%s' exists", self.search_info.index_name) async with self.search_info.create_search_index_client() as search_index_client: @@ -280,10 +280,10 @@ async def create_index(self): await search_index_client.create_index(index) else: - logger.info("Search index %s already exists", self.search_info.index_name) + logger.info("Search index '%s' already exists", self.search_info.index_name) existing_index = await search_index_client.get_index(self.search_info.index_name) if not any(field.name == "storageUrl" for field in existing_index.fields): - logger.info("Adding storageUrl field to index %s", self.search_info.index_name) + logger.info("Adding storageUrl field to index '%s'", self.search_info.index_name) existing_index.fields.append( SimpleField( name="storageUrl", From a1444c2f9da2298d3fd05bf0bd3e64af607e9330 Mon Sep 17 00:00:00 2001 From: Pamela Fox Date: Tue, 8 Jul 2025 11:13:50 -0700 Subject: [PATCH 4/5] Bring back embedding logs, at DEBUG level --- app/backend/prepdocs.py | 2 +- app/backend/prepdocslib/embeddings.py | 8 ++++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/app/backend/prepdocs.py b/app/backend/prepdocs.py index e3010995b3..e6899c856e 100644 --- a/app/backend/prepdocs.py +++ b/app/backend/prepdocs.py @@ -325,7 +325,7 @@ async def main(strategy: Strategy, setup_index: bool = True): logging.basicConfig(format="%(message)s", datefmt="[%X]", handlers=[RichHandler(rich_tracebacks=True)]) # We only set the level to INFO for our logger, # to avoid seeing the noisy INFO level logs from the Azure SDKs - logger.setLevel(logging.DEBUG) + logger.setLevel(logging.INFO) load_azd_env() diff --git a/app/backend/prepdocslib/embeddings.py b/app/backend/prepdocslib/embeddings.py index f0b927deac..da0ae6d024 100644 --- a/app/backend/prepdocslib/embeddings.py +++ b/app/backend/prepdocslib/embeddings.py @@ -114,7 +114,11 @@ async def create_embedding_batch(self, texts: list[str], dimensions_args: ExtraA model=self.open_ai_model_name, input=batch.texts, **dimensions_args ) embeddings.extend([data.embedding for data in emb_response.data]) - + logger.debug( + "Computed embeddings in batch. Batch size: %d, Token count: %d", + len(batch.texts), + batch.token_length, + ) return embeddings async def create_embedding_single(self, text: str, dimensions_args: ExtraArgs) -> list[float]: @@ -129,7 +133,7 @@ async def create_embedding_single(self, text: str, dimensions_args: ExtraArgs) - emb_response = await client.embeddings.create( model=self.open_ai_model_name, input=text, **dimensions_args ) - + logger.debug("Computed embedding for text section. Character count: %d", len(text)) return emb_response.data[0].embedding async def create_embeddings(self, texts: list[str]) -> list[list[float]]: From e5f71d9aaf89bd477d4bb5c78f088359b7757c42 Mon Sep 17 00:00:00 2001 From: Pamela Fox Date: Tue, 8 Jul 2025 12:36:05 -0700 Subject: [PATCH 5/5] Address feedback from Copilot --- app/backend/prepdocs.py | 4 ++-- app/backend/prepdocslib/blobmanager.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/app/backend/prepdocs.py b/app/backend/prepdocs.py index e6899c856e..9e2cecf0e4 100644 --- a/app/backend/prepdocs.py +++ b/app/backend/prepdocs.py @@ -323,9 +323,9 @@ async def main(strategy: Strategy, setup_index: bool = True): if args.verbose: logging.basicConfig(format="%(message)s", datefmt="[%X]", handlers=[RichHandler(rich_tracebacks=True)]) - # We only set the level to INFO for our logger, + # We only set the level to DEBUG for our logger, # to avoid seeing the noisy INFO level logs from the Azure SDKs - logger.setLevel(logging.INFO) + logger.setLevel(logging.DEBUG) load_azd_env() diff --git a/app/backend/prepdocslib/blobmanager.py b/app/backend/prepdocslib/blobmanager.py index 42ef37b0b7..eb7097b0dc 100644 --- a/app/backend/prepdocslib/blobmanager.py +++ b/app/backend/prepdocslib/blobmanager.py @@ -56,7 +56,7 @@ async def upload_blob(self, file: File) -> Optional[list[str]]: if file.url is None: with open(file.content.name, "rb") as reopened_file: blob_name = BlobManager.blob_name_from_file_name(file.content.name) - logger.info("'%s': Uploading blob for whole file", file.content.name) + logger.info("'%s': Uploading blob for file to '%s'", file.content.name, blob_name) blob_client = await container_client.upload_blob(blob_name, reopened_file, overwrite=True) file.url = blob_client.url