Update collections aggregation strategy to have concurrency at the collections level; replace curl_cffi with wreq.

This commit is contained in:
Omar 2026-05-30 08:21:23 +03:00
parent fe6b35a93e
commit 63ba2d5d01

165
main.py
View file

@ -1,6 +1,5 @@
from curl_cffi.requests.exceptions import HTTPError, Timeout, InvalidURL
from curl_cffi import AsyncSession
from json.decoder import JSONDecodeError
from wreq.exceptions import DecodingError, TimeoutError, StatusError, BuilderError
from wreq import Client, Emulation, Response
from collections.abc import AsyncGenerator
from asyncio import Semaphore
from functools import wraps
@ -307,21 +306,21 @@ def parse_product(product: dict) -> dict:
return parsed_product
async def get_total_products_count(scrape_url: str, session: AsyncSession) -> int:
async def get_total_products_count(scrape_url: str, client: Client) -> int:
"""Gets the total number of products in the Shopify store. Returns 25001 for stores with more than 25k products.
Args:
scrape_url: The URL of the working /products.json endpoint of the Shopify store.
session: A reference of the main scraping session."""
client: A reference of the main scraping client."""
delay_time = 1
max_attempts = 10
for attempt in range(1, max_attempts + 1):
try:
res = await session.get(scrape_url.replace("/products.json", "/meta.json"))
res: Response = await client.get(scrape_url.replace("/products.json", "/meta.json"))
res.raise_for_status()
data = res.json()
except (HTTPError, JSONDecodeError, Timeout):
data = await res.json()
except (StatusError, DecodingError, TimeoutError):
if attempt == 10:
raise
@ -335,12 +334,12 @@ async def get_total_products_count(scrape_url: str, session: AsyncSession) -> in
return total_products
@limit_concurrency(limit=30)
async def get_page_products(scrape_url: str, page: int, session: AsyncSession) -> list:
async def get_page_products(scrape_url: str, page: int, client: Client) -> list:
"""Returns raw product data from any given API page.
Args:
scrape_url: The specific API url (e.g. https://some-store.myshopify.com/products.json).
page: The pagination API query paramater.
session: A reference of the main scraping session."""
client: A reference of the main scraping client."""
delay_time = 1
max_attempts = 10
@ -350,10 +349,10 @@ async def get_page_products(scrape_url: str, page: int, session: AsyncSession) -
for attempt in range(1, max_attempts + 1):
try:
res = await session.get(scrape_url, params=parameters)
res: Response = await client.get(scrape_url, query=parameters)
res.raise_for_status()
data = res.json()
except (HTTPError, JSONDecodeError, Timeout):
data = await res.json()
except (StatusError, DecodingError, TimeoutError):
if attempt == 10:
raise
@ -365,29 +364,36 @@ async def get_page_products(scrape_url: str, page: int, session: AsyncSession) -
return data["products"]
async def get_endpoint_products(scrape_info: dict, session: AsyncSession) -> AsyncGenerator[dict, None, None]:
async def get_endpoint_products(scrape_info: dict, client: Client) -> AsyncGenerator[dict, None, None]:
"""Scrapes all available products from a given endpoint.
Args:
scrape_info: A dictionary containing necessary info such as the url of the endpoint, total products count of the store, and collection info (if necessary).
scrape_info: A dictionary containing necessary info such as the url of the endpoint, total products count of the store.
session: A reference of the main scraping session."""
scrape_url = scrape_info["url"]
total_products = scrape_info["total_products"]
collection = scrape_info["collection"]
if collection:
num_pages = collection["products_count"] // 250 + (1 if collection["products_count"] % 250 > 0 else 0)
else:
num_pages = total_products // 250 + (1 if total_products % 250 > 0 else 0)
tasks = [get_page_products(scrape_url if not collection else collection["url"], page_num, session) for page_num in range(1, num_pages + 1 if num_pages <= 100 else 101)]
num_pages = total_products // 250 + (1 if total_products % 250 > 0 else 0)
tasks = [get_page_products(scrape_url, page_num, client) for page_num in range(1, num_pages + 1 if num_pages <= 100 else 101)]
for future in asyncio.as_completed(tasks):
for product in await future:
yield parse_product(product)
def get_collection_page_tasks(collection: dict, client: Client) -> list:
"""Returns a list of get_page_products() coroutines for a given collection.
Args:
collection: A dictionary containing collection information.
client: A reference of the main scraping client."""
num_pages = collection["products_count"] // 250 + (1 if collection["products_count"] % 250 > 0 else 0)
tasks = [get_page_products(collection["url"], page_num, client) for page_num in range(1, num_pages + 1 if num_pages <= 100 else 101)]
return tasks
async def get_collections(scrape_url: str, session: AsyncSession) -> list:
async def get_collections(scrape_url: str, client: Client) -> list:
"""Returns a list of all collections in the store with at least one listed product.
Args:
scrape_url: The URL of the valid /products.json endpoint of the store.
@ -406,10 +412,10 @@ async def get_collections(scrape_url: str, session: AsyncSession) -> list:
while parameters["page"] <= 100:
for attempt in range(1, max_attempts + 1):
try:
res = await session.get(collections_url, params=parameters)
res: Response = await client.get(collections_url, query=parameters)
res.raise_for_status()
data = res.json()
except (HTTPError, JSONDecodeError, Timeout):
data = await res.json()
except (StatusError, DecodingError, TimeoutError):
if attempt == 10:
raise
@ -440,7 +446,7 @@ async def get_collections(scrape_url: str, session: AsyncSession) -> list:
async def get_scrape_url(store_url: str, session: AsyncSession) -> str:
async def get_scrape_url(store_url: str, client: Client) -> str:
"""Returns the valid /products.json URL of a Shopify store.
Args:
store_url: The normal user-facing URL of the Shopify store.
@ -450,25 +456,25 @@ async def get_scrape_url(store_url: str, session: AsyncSession) -> str:
products_endpoint = base_url + "/products.json"
try:
res = await session.get(products_endpoint)
res = await client.get(products_endpoint)
res.raise_for_status()
res.json()
except HTTPError:
data = await res.json()
except StatusError:
products_endpoint = None
except Exception:
products_endpoint = None
else:
if "products" in res.json():
if "products" in data:
return products_endpoint
else:
products_endpoint = None
if not products_endpoint:
try:
res = await session.get(base_url + "/" if base_url[-1] != "/" else "")
res = await client.get(base_url)
# Use regex to find the <STORE>.myshopify.com/products.json URL of the Shopify store in case the normal /products.json is blocked.
public_store_name = list(set(re.findall(pattern=r'\b([a-zA-Z0-9-]+)\.myshopify\.com\b', string=res.text)))[0]
public_store_name = list(set(re.findall(pattern=r'\b([a-zA-Z0-9-]+)\.myshopify\.com\b', string=await res.text())))[0]
except IndexError:
return ""
except Exception:
@ -484,35 +490,59 @@ async def initiate_scraping_operation(store_url: str, output_csv_name: str="shop
output_csv_name: The user's desired name for the output CSV file."""
scrape_count = 0
scraped_handles = []
scraped_handles = set()
if not output_csv_name:
output_csv_name = "shopify"
async with AsyncSession(impersonate="firefox", timeout=10) as scraping_session:
print(f"Initializing scraping operation...\n")
scrape_url = await get_scrape_url(store_url=store_url, session=scraping_session)
scraping_client = Client(emulation=Emulation.Chrome147, cookie_store=True)
print(f"Initializing scraping operation...\n")
scrape_url = await get_scrape_url(store_url=store_url, client=scraping_client)
try:
total_products = await get_total_products_count(scrape_url=scrape_url, session=scraping_session)
except InvalidURL:
input(f"Failed to find any 'myshopify.com' public domain for {store_url}.\n\nPress ENTER to go to the main menu.")
return
# Implement the /products.json strategy for shops with less than or equal to 25,000 products.
if total_products <= 25_000:
scraping_info = {
"url": scrape_url,
"total_products": total_products,
"collection": {}
}
try:
total_products = await get_total_products_count(scrape_url=scrape_url, client=scraping_client)
except BuilderError:
input(f"Failed to find any 'myshopify.com' public domain for {store_url}.\n\nPress ENTER to go to the main menu.")
return
# Implement the /products.json strategy for shops with less than or equal to 25,000 products.
if total_products <= 25_000:
scraping_info = {
"url": scrape_url,
"total_products": total_products,
}
with open(f"{output_csv_name}.jsonl", mode="w", newline="", encoding="utf-8") as jsonl_file:
start_time = time.perf_counter()
async for product in get_endpoint_products(scraping_info, scraping_session):
with open(f"{output_csv_name}.jsonl", mode="w", newline="", encoding="utf-8") as jsonl_file:
start_time = time.perf_counter()
async for product in get_endpoint_products(scraping_info, scraping_client):
if product["Handle"] not in scraped_handles:
scraped_handles.add(product["Handle"])
jsonl_file.write(json.dumps(product) + "\n")
scrape_count += 1
elapsed_secs = elapsed_time(since=start_time)
elapsed_secs_display = elapsed_secs % 60
elapsed_mins = (elapsed_secs % 3600) // 60
elapsed_hrs = elapsed_secs // 3600
print(f"\rScrape Count: {scrape_count:,}/{total_products:,} | Elapsed Time: {elapsed_hrs:02}:{elapsed_mins:02}:{elapsed_secs_display:02}\033[K", end="", flush=True)
else: # Implement the collections strategy for stores with more than 25,000 products.
collections = await get_collections(scrape_url=scrape_url, client=scraping_client)
start_time = time.perf_counter()
with open(f"{output_csv_name}.jsonl", mode="w", newline="", encoding="utf-8") as jsonl_file:
task_groups = [get_collection_page_tasks(collection={"url": collection["url"], "products_count": collection["products_count"]}, client=scraping_client) for collection in collections]
tasks = [task for group in task_groups for task in group]
for future in asyncio.as_completed(tasks):
for raw_product in await future:
product = parse_product(raw_product)
if product["Handle"] not in scraped_handles:
scraped_handles.append(product["Handle"])
scraped_handles.add(product["Handle"])
jsonl_file.write(json.dumps(product) + "\n")
scrape_count += 1
@ -520,28 +550,9 @@ async def initiate_scraping_operation(store_url: str, output_csv_name: str="shop
elapsed_secs_display = elapsed_secs % 60
elapsed_mins = (elapsed_secs % 3600) // 60
elapsed_hrs = elapsed_secs // 3600
print(f"\rScrape Count: {scrape_count}/{total_products} | Elapsed Time: {elapsed_hrs:02}:{elapsed_mins:02}:{elapsed_secs_display:02}\033[K", end="", flush=True)
else: # Implement the collections strategy for stores with more than 25,000 products.
collections = await get_collections(scrape_url=scrape_url, session=scraping_session)
start_time = time.perf_counter()
with open(f"{output_csv_name}.jsonl", mode="w", newline="", encoding="utf-8") as jsonl_file:
for collection_num, collection in enumerate(collections, 1):
scraping_info = {
"url": scrape_url,
"total_products": total_products,
"collection": {"url": collection["url"], "products_count": collection["products_count"]}
}
async for product in get_endpoint_products(scraping_info, scraping_session):
if product["Handle"] not in scraped_handles:
scraped_handles.append(product["Handle"])
jsonl_file.write(json.dumps(product) + "\n")
scrape_count += 1
elapsed_secs = elapsed_time(since=start_time)
elapsed_secs_display = elapsed_secs % 60
elapsed_mins = (elapsed_secs % 3600) // 60
elapsed_hrs = elapsed_secs // 3600
print(f"\rCollection: {collection_num}/{len(collections)} | Scrape Count: {scrape_count} | Elapsed Time: {elapsed_hrs:02}:{elapsed_mins:02}:{elapsed_secs_display:02}\033[K", end="", flush=True)
print(f"\rScrape Count: {scrape_count:,} | Elapsed Time: {elapsed_hrs:02}:{elapsed_mins:02}:{elapsed_secs_display:02}\033[K", end="", flush=True)
print(f"\n\nScraping Complete!\n")