From 63ba2d5d012a76ee5d973a9db37d93f28ffd6090 Mon Sep 17 00:00:00 2001 From: Omar Date: Sat, 30 May 2026 08:21:23 +0300 Subject: [PATCH] Update collections aggregation strategy to have concurrency at the collections level; replace curl_cffi with wreq. --- main.py | 165 ++++++++++++++++++++++++++++++-------------------------- 1 file changed, 88 insertions(+), 77 deletions(-) diff --git a/main.py b/main.py index a1745f0..58867d8 100644 --- a/main.py +++ b/main.py @@ -1,6 +1,5 @@ -from curl_cffi.requests.exceptions import HTTPError, Timeout, InvalidURL -from curl_cffi import AsyncSession -from json.decoder import JSONDecodeError +from wreq.exceptions import DecodingError, TimeoutError, StatusError, BuilderError +from wreq import Client, Emulation, Response from collections.abc import AsyncGenerator from asyncio import Semaphore from functools import wraps @@ -307,21 +306,21 @@ def parse_product(product: dict) -> dict: return parsed_product -async def get_total_products_count(scrape_url: str, session: AsyncSession) -> int: +async def get_total_products_count(scrape_url: str, client: Client) -> int: """Gets the total number of products in the Shopify store. Returns 25001 for stores with more than 25k products. Args: scrape_url: The URL of the working /products.json endpoint of the Shopify store. - session: A reference of the main scraping session.""" + client: A reference of the main scraping client.""" delay_time = 1 max_attempts = 10 for attempt in range(1, max_attempts + 1): try: - res = await session.get(scrape_url.replace("/products.json", "/meta.json")) + res: Response = await client.get(scrape_url.replace("/products.json", "/meta.json")) res.raise_for_status() - data = res.json() - except (HTTPError, JSONDecodeError, Timeout): + data = await res.json() + except (StatusError, DecodingError, TimeoutError): if attempt == 10: raise @@ -335,12 +334,12 @@ async def get_total_products_count(scrape_url: str, session: AsyncSession) -> in return total_products @limit_concurrency(limit=30) -async def get_page_products(scrape_url: str, page: int, session: AsyncSession) -> list: +async def get_page_products(scrape_url: str, page: int, client: Client) -> list: """Returns raw product data from any given API page. Args: scrape_url: The specific API url (e.g. https://some-store.myshopify.com/products.json). page: The pagination API query paramater. - session: A reference of the main scraping session.""" + client: A reference of the main scraping client.""" delay_time = 1 max_attempts = 10 @@ -350,10 +349,10 @@ async def get_page_products(scrape_url: str, page: int, session: AsyncSession) - for attempt in range(1, max_attempts + 1): try: - res = await session.get(scrape_url, params=parameters) + res: Response = await client.get(scrape_url, query=parameters) res.raise_for_status() - data = res.json() - except (HTTPError, JSONDecodeError, Timeout): + data = await res.json() + except (StatusError, DecodingError, TimeoutError): if attempt == 10: raise @@ -365,29 +364,36 @@ async def get_page_products(scrape_url: str, page: int, session: AsyncSession) - return data["products"] -async def get_endpoint_products(scrape_info: dict, session: AsyncSession) -> AsyncGenerator[dict, None, None]: +async def get_endpoint_products(scrape_info: dict, client: Client) -> AsyncGenerator[dict, None, None]: """Scrapes all available products from a given endpoint. Args: - scrape_info: A dictionary containing necessary info such as the url of the endpoint, total products count of the store, and collection info (if necessary). + scrape_info: A dictionary containing necessary info such as the url of the endpoint, total products count of the store. session: A reference of the main scraping session.""" scrape_url = scrape_info["url"] total_products = scrape_info["total_products"] - collection = scrape_info["collection"] - if collection: - num_pages = collection["products_count"] // 250 + (1 if collection["products_count"] % 250 > 0 else 0) - else: - num_pages = total_products // 250 + (1 if total_products % 250 > 0 else 0) - - tasks = [get_page_products(scrape_url if not collection else collection["url"], page_num, session) for page_num in range(1, num_pages + 1 if num_pages <= 100 else 101)] + num_pages = total_products // 250 + (1 if total_products % 250 > 0 else 0) + + + tasks = [get_page_products(scrape_url, page_num, client) for page_num in range(1, num_pages + 1 if num_pages <= 100 else 101)] for future in asyncio.as_completed(tasks): for product in await future: yield parse_product(product) +def get_collection_page_tasks(collection: dict, client: Client) -> list: + """Returns a list of get_page_products() coroutines for a given collection. + Args: + collection: A dictionary containing collection information. + client: A reference of the main scraping client.""" + + num_pages = collection["products_count"] // 250 + (1 if collection["products_count"] % 250 > 0 else 0) + tasks = [get_page_products(collection["url"], page_num, client) for page_num in range(1, num_pages + 1 if num_pages <= 100 else 101)] + + return tasks -async def get_collections(scrape_url: str, session: AsyncSession) -> list: +async def get_collections(scrape_url: str, client: Client) -> list: """Returns a list of all collections in the store with at least one listed product. Args: scrape_url: The URL of the valid /products.json endpoint of the store. @@ -406,10 +412,10 @@ async def get_collections(scrape_url: str, session: AsyncSession) -> list: while parameters["page"] <= 100: for attempt in range(1, max_attempts + 1): try: - res = await session.get(collections_url, params=parameters) + res: Response = await client.get(collections_url, query=parameters) res.raise_for_status() - data = res.json() - except (HTTPError, JSONDecodeError, Timeout): + data = await res.json() + except (StatusError, DecodingError, TimeoutError): if attempt == 10: raise @@ -440,7 +446,7 @@ async def get_collections(scrape_url: str, session: AsyncSession) -> list: -async def get_scrape_url(store_url: str, session: AsyncSession) -> str: +async def get_scrape_url(store_url: str, client: Client) -> str: """Returns the valid /products.json URL of a Shopify store. Args: store_url: The normal user-facing URL of the Shopify store. @@ -450,25 +456,25 @@ async def get_scrape_url(store_url: str, session: AsyncSession) -> str: products_endpoint = base_url + "/products.json" try: - res = await session.get(products_endpoint) + res = await client.get(products_endpoint) res.raise_for_status() - res.json() - except HTTPError: + data = await res.json() + except StatusError: products_endpoint = None except Exception: products_endpoint = None else: - if "products" in res.json(): + if "products" in data: return products_endpoint else: products_endpoint = None if not products_endpoint: try: - res = await session.get(base_url + "/" if base_url[-1] != "/" else "") + res = await client.get(base_url) # Use regex to find the .myshopify.com/products.json URL of the Shopify store in case the normal /products.json is blocked. - public_store_name = list(set(re.findall(pattern=r'\b([a-zA-Z0-9-]+)\.myshopify\.com\b', string=res.text)))[0] + public_store_name = list(set(re.findall(pattern=r'\b([a-zA-Z0-9-]+)\.myshopify\.com\b', string=await res.text())))[0] except IndexError: return "" except Exception: @@ -484,35 +490,59 @@ async def initiate_scraping_operation(store_url: str, output_csv_name: str="shop output_csv_name: The user's desired name for the output CSV file.""" scrape_count = 0 - scraped_handles = [] + scraped_handles = set() if not output_csv_name: output_csv_name = "shopify" - async with AsyncSession(impersonate="firefox", timeout=10) as scraping_session: - print(f"Initializing scraping operation...\n") - scrape_url = await get_scrape_url(store_url=store_url, session=scraping_session) + scraping_client = Client(emulation=Emulation.Chrome147, cookie_store=True) + + print(f"Initializing scraping operation...\n") + scrape_url = await get_scrape_url(store_url=store_url, client=scraping_client) - try: - total_products = await get_total_products_count(scrape_url=scrape_url, session=scraping_session) - except InvalidURL: - input(f"Failed to find any 'myshopify.com' public domain for {store_url}.\n\nPress ENTER to go to the main menu.") - return - - - # Implement the /products.json strategy for shops with less than or equal to 25,000 products. - if total_products <= 25_000: - scraping_info = { - "url": scrape_url, - "total_products": total_products, - "collection": {} - } + try: + total_products = await get_total_products_count(scrape_url=scrape_url, client=scraping_client) + except BuilderError: + input(f"Failed to find any 'myshopify.com' public domain for {store_url}.\n\nPress ENTER to go to the main menu.") + return + + + # Implement the /products.json strategy for shops with less than or equal to 25,000 products. + if total_products <= 25_000: + scraping_info = { + "url": scrape_url, + "total_products": total_products, + } - with open(f"{output_csv_name}.jsonl", mode="w", newline="", encoding="utf-8") as jsonl_file: - start_time = time.perf_counter() - async for product in get_endpoint_products(scraping_info, scraping_session): + with open(f"{output_csv_name}.jsonl", mode="w", newline="", encoding="utf-8") as jsonl_file: + start_time = time.perf_counter() + async for product in get_endpoint_products(scraping_info, scraping_client): + if product["Handle"] not in scraped_handles: + scraped_handles.add(product["Handle"]) + jsonl_file.write(json.dumps(product) + "\n") + scrape_count += 1 + + elapsed_secs = elapsed_time(since=start_time) + elapsed_secs_display = elapsed_secs % 60 + elapsed_mins = (elapsed_secs % 3600) // 60 + elapsed_hrs = elapsed_secs // 3600 + print(f"\rScrape Count: {scrape_count:,}/{total_products:,} | Elapsed Time: {elapsed_hrs:02}:{elapsed_mins:02}:{elapsed_secs_display:02}\033[K", end="", flush=True) + + else: # Implement the collections strategy for stores with more than 25,000 products. + collections = await get_collections(scrape_url=scrape_url, client=scraping_client) + start_time = time.perf_counter() + + with open(f"{output_csv_name}.jsonl", mode="w", newline="", encoding="utf-8") as jsonl_file: + task_groups = [get_collection_page_tasks(collection={"url": collection["url"], "products_count": collection["products_count"]}, client=scraping_client) for collection in collections] + tasks = [task for group in task_groups for task in group] + + + for future in asyncio.as_completed(tasks): + for raw_product in await future: + product = parse_product(raw_product) + if product["Handle"] not in scraped_handles: - scraped_handles.append(product["Handle"]) + scraped_handles.add(product["Handle"]) jsonl_file.write(json.dumps(product) + "\n") scrape_count += 1 @@ -520,28 +550,9 @@ async def initiate_scraping_operation(store_url: str, output_csv_name: str="shop elapsed_secs_display = elapsed_secs % 60 elapsed_mins = (elapsed_secs % 3600) // 60 elapsed_hrs = elapsed_secs // 3600 - print(f"\rScrape Count: {scrape_count}/{total_products} | Elapsed Time: {elapsed_hrs:02}:{elapsed_mins:02}:{elapsed_secs_display:02}\033[K", end="", flush=True) - else: # Implement the collections strategy for stores with more than 25,000 products. - collections = await get_collections(scrape_url=scrape_url, session=scraping_session) - start_time = time.perf_counter() - with open(f"{output_csv_name}.jsonl", mode="w", newline="", encoding="utf-8") as jsonl_file: - for collection_num, collection in enumerate(collections, 1): - scraping_info = { - "url": scrape_url, - "total_products": total_products, - "collection": {"url": collection["url"], "products_count": collection["products_count"]} - } - async for product in get_endpoint_products(scraping_info, scraping_session): - if product["Handle"] not in scraped_handles: - scraped_handles.append(product["Handle"]) - jsonl_file.write(json.dumps(product) + "\n") - scrape_count += 1 - - elapsed_secs = elapsed_time(since=start_time) - elapsed_secs_display = elapsed_secs % 60 - elapsed_mins = (elapsed_secs % 3600) // 60 - elapsed_hrs = elapsed_secs // 3600 - print(f"\rCollection: {collection_num}/{len(collections)} | Scrape Count: {scrape_count} | Elapsed Time: {elapsed_hrs:02}:{elapsed_mins:02}:{elapsed_secs_display:02}\033[K", end="", flush=True) + print(f"\rScrape Count: {scrape_count:,} | Elapsed Time: {elapsed_hrs:02}:{elapsed_mins:02}:{elapsed_secs_display:02}\033[K", end="", flush=True) + + print(f"\n\nScraping Complete!\n")