Compare commits

...

7 commits

3 changed files with 105 additions and 93 deletions

View file

@ -2,16 +2,23 @@
![Repo Logo](shopify_csv_extractor_logo.svg)
## Changelog
**May 30, 2026**
1. Migrated from curl_cffi to wreq.
2. Upgraded the collections aggregation strategy to have concurrency at the collections level, resulting in a tremendously faster collections aggregation strategy for stores with more than 25k products.
## Features
1. Interactive menu-based text-user-interface (TUI) with live on-screen scraping progress.
2. Very fast scraping (~ up to 3,000 products/sec)
3. Bypasses Cloudflare's anti-bot protections
2. Very fast scraping (~ up to 3,000 products/sec).
3. Bypasses Cloudflare's anti-bot protections.
4. Handles timeouts via auto-retries and exponential back-off.
5. Bypasses /products.json endpoint blocks by auto-detecting a store's myshopify.com domain.
6. Produces ready-to-import CSVs (with proper column and row-formatting) to allow the user to immediately use the CSVs in Shopify.
7. Does not pass the 15-MB-size and 50,000-row limits per CSV. For large catalogs, it auto-splits the data into multiple CSVs.
7. Respects the 15-MB-size and 50,000-row Shopify limits per CSV. For large catalogs, it auto-splits the data into multiple CSVs.
## Outputs
@ -20,7 +27,7 @@ For any Shopify store, the scraper produces a JSON Lines (.jsonl) file that cont
## Limits
For stores with product catalogs of more than 25,000 products, the scraper falls back to the collections aggregation strategy, which makes it slower.
For stores with product catalogs of more than 25,000 products, the scraper falls back to the collections aggregation strategy, which makes it slower (mitigated significantly in the May 30, 2026 update).
## Setup

174
main.py
View file

@ -1,7 +1,7 @@
from curl_cffi.requests.exceptions import HTTPError, Timeout, InvalidURL
from curl_cffi import AsyncSession
from json.decoder import JSONDecodeError
from wreq.exceptions import DecodingError, TimeoutError, StatusError, BuilderError
from wreq import Client, Emulation, Response
from collections.abc import AsyncGenerator
from datetime import timedelta
from asyncio import Semaphore
from functools import wraps
import pandas as pd
@ -307,21 +307,21 @@ def parse_product(product: dict) -> dict:
return parsed_product
async def get_total_products_count(scrape_url: str, session: AsyncSession) -> int:
async def get_total_products_count(scrape_url: str, client: Client) -> int:
"""Gets the total number of products in the Shopify store. Returns 25001 for stores with more than 25k products.
Args:
scrape_url: The URL of the working /products.json endpoint of the Shopify store.
session: A reference of the main scraping session."""
client: A reference of the main scraping client."""
delay_time = 1
max_attempts = 10
for attempt in range(1, max_attempts + 1):
try:
res = await session.get(scrape_url.replace("/products.json", "/meta.json"))
res: Response = await client.get(scrape_url.replace("/products.json", "/meta.json"))
res.raise_for_status()
data = res.json()
except (HTTPError, JSONDecodeError, Timeout):
data = await res.json()
except (StatusError, DecodingError, TimeoutError):
if attempt == 10:
raise
@ -335,12 +335,12 @@ async def get_total_products_count(scrape_url: str, session: AsyncSession) -> in
return total_products
@limit_concurrency(limit=30)
async def get_page_products(scrape_url: str, page: int, session: AsyncSession) -> list:
async def get_page_products(scrape_url: str, page: int, client: Client) -> list:
"""Returns raw product data from any given API page.
Args:
scrape_url: The specific API url (e.g. https://some-store.myshopify.com/products.json).
page: The pagination API query paramater.
session: A reference of the main scraping session."""
client: A reference of the main scraping client."""
delay_time = 1
max_attempts = 10
@ -350,10 +350,10 @@ async def get_page_products(scrape_url: str, page: int, session: AsyncSession) -
for attempt in range(1, max_attempts + 1):
try:
res = await session.get(scrape_url, params=parameters)
res: Response = await client.get(scrape_url, query=parameters)
res.raise_for_status()
data = res.json()
except (HTTPError, JSONDecodeError, Timeout):
data = await res.json()
except (StatusError, DecodingError, TimeoutError):
if attempt == 10:
raise
@ -365,33 +365,40 @@ async def get_page_products(scrape_url: str, page: int, session: AsyncSession) -
return data["products"]
async def get_endpoint_products(scrape_info: dict, session: AsyncSession) -> AsyncGenerator[dict, None, None]:
async def get_endpoint_products(scrape_info: dict, client: Client) -> AsyncGenerator[dict, None, None]:
"""Scrapes all available products from a given endpoint.
Args:
scrape_info: A dictionary containing necessary info such as the url of the endpoint, total products count of the store, and collection info (if necessary).
session: A reference of the main scraping session."""
scrape_info: A dictionary containing necessary info such as the url of the endpoint, total products count of the store.
client: A reference of the main scraping client."""
scrape_url = scrape_info["url"]
total_products = scrape_info["total_products"]
collection = scrape_info["collection"]
if collection:
num_pages = collection["products_count"] // 250 + (1 if collection["products_count"] % 250 > 0 else 0)
else:
num_pages = total_products // 250 + (1 if total_products % 250 > 0 else 0)
tasks = [get_page_products(scrape_url if not collection else collection["url"], page_num, session) for page_num in range(1, num_pages + 1 if num_pages <= 100 else 101)]
num_pages = total_products // 250 + (1 if total_products % 250 > 0 else 0)
tasks = [get_page_products(scrape_url, page_num, client) for page_num in range(1, num_pages + 1 if num_pages <= 100 else 101)]
for future in asyncio.as_completed(tasks):
for product in await future:
yield parse_product(product)
def get_collection_page_tasks(collection: dict, client: Client) -> list:
"""Returns a list of get_page_products() coroutines for a given collection.
Args:
collection: A dictionary containing collection information.
client: A reference of the main scraping client."""
num_pages = collection["products_count"] // 250 + (1 if collection["products_count"] % 250 > 0 else 0)
tasks = [get_page_products(collection["url"], page_num, client) for page_num in range(1, num_pages + 1 if num_pages <= 100 else 101)]
return tasks
async def get_collections(scrape_url: str, session: AsyncSession) -> list:
async def get_collections(scrape_url: str, client: Client) -> list:
"""Returns a list of all collections in the store with at least one listed product.
Args:
scrape_url: The URL of the valid /products.json endpoint of the store.
session: A reference of the main scraping session."""
client: A reference of the main scraping client."""
parameters = {
"page": 1,
@ -406,10 +413,10 @@ async def get_collections(scrape_url: str, session: AsyncSession) -> list:
while parameters["page"] <= 100:
for attempt in range(1, max_attempts + 1):
try:
res = await session.get(collections_url, params=parameters)
res: Response = await client.get(collections_url, query=parameters)
res.raise_for_status()
data = res.json()
except (HTTPError, JSONDecodeError, Timeout):
data = await res.json()
except (StatusError, DecodingError, TimeoutError):
if attempt == 10:
raise
@ -440,35 +447,35 @@ async def get_collections(scrape_url: str, session: AsyncSession) -> list:
async def get_scrape_url(store_url: str, session: AsyncSession) -> str:
async def get_scrape_url(store_url: str, client: Client) -> str:
"""Returns the valid /products.json URL of a Shopify store.
Args:
store_url: The normal user-facing URL of the Shopify store.
session: A reference of the main scraping session"""
client: A reference of the main scraping client."""
base_url = "https://" + store_url.split("//")[-1].split("/")[0].split("?")[0]
products_endpoint = base_url + "/products.json"
try:
res = await session.get(products_endpoint)
res = await client.get(products_endpoint)
res.raise_for_status()
res.json()
except HTTPError:
data = await res.json()
except StatusError:
products_endpoint = None
except Exception:
products_endpoint = None
else:
if "products" in res.json():
if "products" in data:
return products_endpoint
else:
products_endpoint = None
if not products_endpoint:
try:
res = await session.get(base_url + "/" if base_url[-1] != "/" else "")
res = await client.get(base_url)
# Use regex to find the <STORE>.myshopify.com/products.json URL of the Shopify store in case the normal /products.json is blocked.
public_store_name = list(set(re.findall(pattern=r'\b([a-zA-Z0-9-]+)\.myshopify\.com\b', string=res.text)))[0]
public_store_name = list(set(re.findall(pattern=r'\b([a-zA-Z0-9-]+)\.myshopify\.com\b', string=await res.text())))[0]
except IndexError:
return ""
except Exception:
@ -484,35 +491,59 @@ async def initiate_scraping_operation(store_url: str, output_csv_name: str="shop
output_csv_name: The user's desired name for the output CSV file."""
scrape_count = 0
scraped_handles = []
scraped_handles = set()
if not output_csv_name:
output_csv_name = "shopify"
async with AsyncSession(impersonate="firefox", timeout=10) as scraping_session:
print(f"Initializing scraping operation...\n")
scrape_url = await get_scrape_url(store_url=store_url, session=scraping_session)
scraping_client = Client(emulation=Emulation.Chrome147, cookie_store=True, timeout=timedelta(seconds=10))
print(f"Initializing scraping operation...\n")
scrape_url = await get_scrape_url(store_url=store_url, client=scraping_client)
try:
total_products = await get_total_products_count(scrape_url=scrape_url, session=scraping_session)
except InvalidURL:
input(f"Failed to find any 'myshopify.com' public domain for {store_url}.\n\nPress ENTER to go to the main menu.")
return
# Implement the /products.json strategy for shops with less than or equal to 25,000 products.
if total_products <= 25_000:
scraping_info = {
"url": scrape_url,
"total_products": total_products,
"collection": {}
}
try:
total_products = await get_total_products_count(scrape_url=scrape_url, client=scraping_client)
except BuilderError:
input(f"Failed to find any 'myshopify.com' public domain for {store_url}.\n\nPress ENTER to go to the main menu.")
return
# Implement the /products.json strategy for shops with less than or equal to 25,000 products.
if total_products <= 25_000:
scraping_info = {
"url": scrape_url,
"total_products": total_products,
}
with open(f"{output_csv_name}.jsonl", mode="w", newline="", encoding="utf-8") as jsonl_file:
start_time = time.perf_counter()
async for product in get_endpoint_products(scraping_info, scraping_session):
with open(f"{output_csv_name}.jsonl", mode="w", newline="", encoding="utf-8") as jsonl_file:
start_time = time.perf_counter()
async for product in get_endpoint_products(scraping_info, scraping_client):
if product["Handle"] not in scraped_handles:
scraped_handles.add(product["Handle"])
jsonl_file.write(json.dumps(product) + "\n")
scrape_count += 1
elapsed_secs = elapsed_time(since=start_time)
elapsed_secs_display = elapsed_secs % 60
elapsed_mins = (elapsed_secs % 3600) // 60
elapsed_hrs = elapsed_secs // 3600
print(f"\rScrape Count: {scrape_count:,}/{total_products:,} | Elapsed Time: {elapsed_hrs:02}:{elapsed_mins:02}:{elapsed_secs_display:02}\033[K", end="", flush=True)
else: # Implement the collections strategy for stores with more than 25,000 products.
collections = await get_collections(scrape_url=scrape_url, client=scraping_client)
start_time = time.perf_counter()
with open(f"{output_csv_name}.jsonl", mode="w", newline="", encoding="utf-8") as jsonl_file:
task_groups = [get_collection_page_tasks(collection={"url": collection["url"], "products_count": collection["products_count"]}, client=scraping_client) for collection in collections]
tasks = [task for group in task_groups for task in group]
for future in asyncio.as_completed(tasks):
for raw_product in await future:
product = parse_product(raw_product)
if product["Handle"] not in scraped_handles:
scraped_handles.append(product["Handle"])
scraped_handles.add(product["Handle"])
jsonl_file.write(json.dumps(product) + "\n")
scrape_count += 1
@ -520,28 +551,9 @@ async def initiate_scraping_operation(store_url: str, output_csv_name: str="shop
elapsed_secs_display = elapsed_secs % 60
elapsed_mins = (elapsed_secs % 3600) // 60
elapsed_hrs = elapsed_secs // 3600
print(f"\rScrape Count: {scrape_count}/{total_products} | Elapsed Time: {elapsed_hrs:02}:{elapsed_mins:02}:{elapsed_secs_display:02}\033[K", end="", flush=True)
else: # Implement the collections strategy for stores with more than 25,000 products.
collections = await get_collections(scrape_url=scrape_url, session=scraping_session)
start_time = time.perf_counter()
with open(f"{output_csv_name}.jsonl", mode="w", newline="", encoding="utf-8") as jsonl_file:
for collection_num, collection in enumerate(collections, 1):
scraping_info = {
"url": scrape_url,
"total_products": total_products,
"collection": {"url": collection["url"], "products_count": collection["products_count"]}
}
async for product in get_endpoint_products(scraping_info, scraping_session):
if product["Handle"] not in scraped_handles:
scraped_handles.append(product["Handle"])
jsonl_file.write(json.dumps(product) + "\n")
scrape_count += 1
elapsed_secs = elapsed_time(since=start_time)
elapsed_secs_display = elapsed_secs % 60
elapsed_mins = (elapsed_secs % 3600) // 60
elapsed_hrs = elapsed_secs // 3600
print(f"\rCollection: {collection_num}/{len(collections)} | Scrape Count: {scrape_count} | Elapsed Time: {elapsed_hrs:02}:{elapsed_mins:02}:{elapsed_secs_display:02}\033[K", end="", flush=True)
print(f"\rScrape Count: {scrape_count:,} | Elapsed Time: {elapsed_hrs:02}:{elapsed_mins:02}:{elapsed_secs_display:02}\033[K", end="", flush=True)
print(f"\n\nScraping Complete!\n")
@ -581,7 +593,7 @@ async def main() -> None:
elif user_choice == 2:
clear_screen()
print(f"{LOGO}\n")
print("Shopify Scraper is your go-to tool for scraping ANY shopify store on the internet.")
print("ShopExtract is your go-to tool for scraping ANY shopify store on the internet.")
print("It reliably and quickly extracts the entire product catalog of any shopify store and generates Shopify-compatible, import-ready CSVs.")
print("All you have to do is provide the Shopify store URL.")
print("-------------------------------------------------------\n")

View file

@ -1,13 +1,6 @@
certifi==2026.5.20
cffi==2.0.0
curl_cffi==0.15.0
markdown-it-py==4.2.0
mdurl==0.1.2
numpy==2.4.6
pandas==3.0.3
pycparser==3.0
Pygments==2.20.0
python-dateutil==2.9.0.post0
rich==15.0.0
six==1.17.0
tzdata==2026.2
wreq==0.11.3