mirror of
https://github.com/Coding-Doctor-Omar/ShopExtract.git
synced 2026-06-25 08:48:17 +02:00
Compare commits
No commits in common. "cda660d240b76068069e3123c78fb4c86d41d438" and "77bb7aa1615ad9a8ba192fefe585e51580f9166b" have entirely different histories.
cda660d240
...
77bb7aa161
3 changed files with 93 additions and 105 deletions
15
README.md
15
README.md
|
|
@ -2,23 +2,16 @@
|
|||
|
||||

|
||||
|
||||
## Changelog
|
||||
|
||||
**May 30, 2026**
|
||||
|
||||
1. Migrated from curl_cffi to wreq.
|
||||
2. Upgraded the collections aggregation strategy to have concurrency at the collections level, resulting in a tremendously faster collections aggregation strategy for stores with more than 25k products.
|
||||
|
||||
|
||||
## Features
|
||||
|
||||
1. Interactive menu-based text-user-interface (TUI) with live on-screen scraping progress.
|
||||
2. Very fast scraping (~ up to 3,000 products/sec).
|
||||
3. Bypasses Cloudflare's anti-bot protections.
|
||||
2. Very fast scraping (~ up to 3,000 products/sec)
|
||||
3. Bypasses Cloudflare's anti-bot protections
|
||||
4. Handles timeouts via auto-retries and exponential back-off.
|
||||
5. Bypasses /products.json endpoint blocks by auto-detecting a store's myshopify.com domain.
|
||||
6. Produces ready-to-import CSVs (with proper column and row-formatting) to allow the user to immediately use the CSVs in Shopify.
|
||||
7. Respects the 15-MB-size and 50,000-row Shopify limits per CSV. For large catalogs, it auto-splits the data into multiple CSVs.
|
||||
7. Does not pass the 15-MB-size and 50,000-row limits per CSV. For large catalogs, it auto-splits the data into multiple CSVs.
|
||||
|
||||
## Outputs
|
||||
|
||||
|
|
@ -27,7 +20,7 @@ For any Shopify store, the scraper produces a JSON Lines (.jsonl) file that cont
|
|||
|
||||
## Limits
|
||||
|
||||
For stores with product catalogs of more than 25,000 products, the scraper falls back to the collections aggregation strategy, which makes it slower (mitigated significantly in the May 30, 2026 update).
|
||||
For stores with product catalogs of more than 25,000 products, the scraper falls back to the collections aggregation strategy, which makes it slower.
|
||||
|
||||
## Setup
|
||||
|
||||
|
|
|
|||
174
main.py
174
main.py
|
|
@ -1,7 +1,7 @@
|
|||
from wreq.exceptions import DecodingError, TimeoutError, StatusError, BuilderError
|
||||
from wreq import Client, Emulation, Response
|
||||
from curl_cffi.requests.exceptions import HTTPError, Timeout, InvalidURL
|
||||
from curl_cffi import AsyncSession
|
||||
from json.decoder import JSONDecodeError
|
||||
from collections.abc import AsyncGenerator
|
||||
from datetime import timedelta
|
||||
from asyncio import Semaphore
|
||||
from functools import wraps
|
||||
import pandas as pd
|
||||
|
|
@ -307,21 +307,21 @@ def parse_product(product: dict) -> dict:
|
|||
|
||||
return parsed_product
|
||||
|
||||
async def get_total_products_count(scrape_url: str, client: Client) -> int:
|
||||
async def get_total_products_count(scrape_url: str, session: AsyncSession) -> int:
|
||||
"""Gets the total number of products in the Shopify store. Returns 25001 for stores with more than 25k products.
|
||||
Args:
|
||||
scrape_url: The URL of the working /products.json endpoint of the Shopify store.
|
||||
client: A reference of the main scraping client."""
|
||||
session: A reference of the main scraping session."""
|
||||
|
||||
delay_time = 1
|
||||
max_attempts = 10
|
||||
|
||||
for attempt in range(1, max_attempts + 1):
|
||||
try:
|
||||
res: Response = await client.get(scrape_url.replace("/products.json", "/meta.json"))
|
||||
res = await session.get(scrape_url.replace("/products.json", "/meta.json"))
|
||||
res.raise_for_status()
|
||||
data = await res.json()
|
||||
except (StatusError, DecodingError, TimeoutError):
|
||||
data = res.json()
|
||||
except (HTTPError, JSONDecodeError, Timeout):
|
||||
if attempt == 10:
|
||||
raise
|
||||
|
||||
|
|
@ -335,12 +335,12 @@ async def get_total_products_count(scrape_url: str, client: Client) -> int:
|
|||
return total_products
|
||||
|
||||
@limit_concurrency(limit=30)
|
||||
async def get_page_products(scrape_url: str, page: int, client: Client) -> list:
|
||||
async def get_page_products(scrape_url: str, page: int, session: AsyncSession) -> list:
|
||||
"""Returns raw product data from any given API page.
|
||||
Args:
|
||||
scrape_url: The specific API url (e.g. https://some-store.myshopify.com/products.json).
|
||||
page: The pagination API query paramater.
|
||||
client: A reference of the main scraping client."""
|
||||
session: A reference of the main scraping session."""
|
||||
|
||||
delay_time = 1
|
||||
max_attempts = 10
|
||||
|
|
@ -350,10 +350,10 @@ async def get_page_products(scrape_url: str, page: int, client: Client) -> list:
|
|||
|
||||
for attempt in range(1, max_attempts + 1):
|
||||
try:
|
||||
res: Response = await client.get(scrape_url, query=parameters)
|
||||
res = await session.get(scrape_url, params=parameters)
|
||||
res.raise_for_status()
|
||||
data = await res.json()
|
||||
except (StatusError, DecodingError, TimeoutError):
|
||||
data = res.json()
|
||||
except (HTTPError, JSONDecodeError, Timeout):
|
||||
if attempt == 10:
|
||||
raise
|
||||
|
||||
|
|
@ -365,40 +365,33 @@ async def get_page_products(scrape_url: str, page: int, client: Client) -> list:
|
|||
|
||||
return data["products"]
|
||||
|
||||
async def get_endpoint_products(scrape_info: dict, client: Client) -> AsyncGenerator[dict, None, None]:
|
||||
async def get_endpoint_products(scrape_info: dict, session: AsyncSession) -> AsyncGenerator[dict, None, None]:
|
||||
"""Scrapes all available products from a given endpoint.
|
||||
Args:
|
||||
scrape_info: A dictionary containing necessary info such as the url of the endpoint, total products count of the store.
|
||||
client: A reference of the main scraping client."""
|
||||
scrape_info: A dictionary containing necessary info such as the url of the endpoint, total products count of the store, and collection info (if necessary).
|
||||
session: A reference of the main scraping session."""
|
||||
|
||||
scrape_url = scrape_info["url"]
|
||||
total_products = scrape_info["total_products"]
|
||||
collection = scrape_info["collection"]
|
||||
|
||||
num_pages = total_products // 250 + (1 if total_products % 250 > 0 else 0)
|
||||
|
||||
|
||||
tasks = [get_page_products(scrape_url, page_num, client) for page_num in range(1, num_pages + 1 if num_pages <= 100 else 101)]
|
||||
if collection:
|
||||
num_pages = collection["products_count"] // 250 + (1 if collection["products_count"] % 250 > 0 else 0)
|
||||
else:
|
||||
num_pages = total_products // 250 + (1 if total_products % 250 > 0 else 0)
|
||||
|
||||
tasks = [get_page_products(scrape_url if not collection else collection["url"], page_num, session) for page_num in range(1, num_pages + 1 if num_pages <= 100 else 101)]
|
||||
for future in asyncio.as_completed(tasks):
|
||||
for product in await future:
|
||||
yield parse_product(product)
|
||||
|
||||
def get_collection_page_tasks(collection: dict, client: Client) -> list:
|
||||
"""Returns a list of get_page_products() coroutines for a given collection.
|
||||
Args:
|
||||
collection: A dictionary containing collection information.
|
||||
client: A reference of the main scraping client."""
|
||||
|
||||
num_pages = collection["products_count"] // 250 + (1 if collection["products_count"] % 250 > 0 else 0)
|
||||
tasks = [get_page_products(collection["url"], page_num, client) for page_num in range(1, num_pages + 1 if num_pages <= 100 else 101)]
|
||||
|
||||
return tasks
|
||||
|
||||
|
||||
async def get_collections(scrape_url: str, client: Client) -> list:
|
||||
async def get_collections(scrape_url: str, session: AsyncSession) -> list:
|
||||
"""Returns a list of all collections in the store with at least one listed product.
|
||||
Args:
|
||||
scrape_url: The URL of the valid /products.json endpoint of the store.
|
||||
client: A reference of the main scraping client."""
|
||||
session: A reference of the main scraping session."""
|
||||
|
||||
parameters = {
|
||||
"page": 1,
|
||||
|
|
@ -413,10 +406,10 @@ async def get_collections(scrape_url: str, client: Client) -> list:
|
|||
while parameters["page"] <= 100:
|
||||
for attempt in range(1, max_attempts + 1):
|
||||
try:
|
||||
res: Response = await client.get(collections_url, query=parameters)
|
||||
res = await session.get(collections_url, params=parameters)
|
||||
res.raise_for_status()
|
||||
data = await res.json()
|
||||
except (StatusError, DecodingError, TimeoutError):
|
||||
data = res.json()
|
||||
except (HTTPError, JSONDecodeError, Timeout):
|
||||
if attempt == 10:
|
||||
raise
|
||||
|
||||
|
|
@ -447,35 +440,35 @@ async def get_collections(scrape_url: str, client: Client) -> list:
|
|||
|
||||
|
||||
|
||||
async def get_scrape_url(store_url: str, client: Client) -> str:
|
||||
async def get_scrape_url(store_url: str, session: AsyncSession) -> str:
|
||||
"""Returns the valid /products.json URL of a Shopify store.
|
||||
Args:
|
||||
store_url: The normal user-facing URL of the Shopify store.
|
||||
client: A reference of the main scraping client."""
|
||||
session: A reference of the main scraping session"""
|
||||
|
||||
base_url = "https://" + store_url.split("//")[-1].split("/")[0].split("?")[0]
|
||||
products_endpoint = base_url + "/products.json"
|
||||
|
||||
try:
|
||||
res = await client.get(products_endpoint)
|
||||
res = await session.get(products_endpoint)
|
||||
res.raise_for_status()
|
||||
data = await res.json()
|
||||
except StatusError:
|
||||
res.json()
|
||||
except HTTPError:
|
||||
products_endpoint = None
|
||||
except Exception:
|
||||
products_endpoint = None
|
||||
else:
|
||||
if "products" in data:
|
||||
if "products" in res.json():
|
||||
return products_endpoint
|
||||
else:
|
||||
products_endpoint = None
|
||||
|
||||
if not products_endpoint:
|
||||
try:
|
||||
res = await client.get(base_url)
|
||||
res = await session.get(base_url + "/" if base_url[-1] != "/" else "")
|
||||
|
||||
# Use regex to find the <STORE>.myshopify.com/products.json URL of the Shopify store in case the normal /products.json is blocked.
|
||||
public_store_name = list(set(re.findall(pattern=r'\b([a-zA-Z0-9-]+)\.myshopify\.com\b', string=await res.text())))[0]
|
||||
public_store_name = list(set(re.findall(pattern=r'\b([a-zA-Z0-9-]+)\.myshopify\.com\b', string=res.text)))[0]
|
||||
except IndexError:
|
||||
return ""
|
||||
except Exception:
|
||||
|
|
@ -491,59 +484,35 @@ async def initiate_scraping_operation(store_url: str, output_csv_name: str="shop
|
|||
output_csv_name: The user's desired name for the output CSV file."""
|
||||
|
||||
scrape_count = 0
|
||||
scraped_handles = set()
|
||||
scraped_handles = []
|
||||
|
||||
if not output_csv_name:
|
||||
output_csv_name = "shopify"
|
||||
|
||||
scraping_client = Client(emulation=Emulation.Chrome147, cookie_store=True, timeout=timedelta(seconds=10))
|
||||
|
||||
print(f"Initializing scraping operation...\n")
|
||||
scrape_url = await get_scrape_url(store_url=store_url, client=scraping_client)
|
||||
async with AsyncSession(impersonate="firefox", timeout=10) as scraping_session:
|
||||
print(f"Initializing scraping operation...\n")
|
||||
scrape_url = await get_scrape_url(store_url=store_url, session=scraping_session)
|
||||
|
||||
try:
|
||||
total_products = await get_total_products_count(scrape_url=scrape_url, client=scraping_client)
|
||||
except BuilderError:
|
||||
input(f"Failed to find any 'myshopify.com' public domain for {store_url}.\n\nPress ENTER to go to the main menu.")
|
||||
return
|
||||
|
||||
|
||||
# Implement the /products.json strategy for shops with less than or equal to 25,000 products.
|
||||
if total_products <= 25_000:
|
||||
scraping_info = {
|
||||
"url": scrape_url,
|
||||
"total_products": total_products,
|
||||
}
|
||||
try:
|
||||
total_products = await get_total_products_count(scrape_url=scrape_url, session=scraping_session)
|
||||
except InvalidURL:
|
||||
input(f"Failed to find any 'myshopify.com' public domain for {store_url}.\n\nPress ENTER to go to the main menu.")
|
||||
return
|
||||
|
||||
|
||||
# Implement the /products.json strategy for shops with less than or equal to 25,000 products.
|
||||
if total_products <= 25_000:
|
||||
scraping_info = {
|
||||
"url": scrape_url,
|
||||
"total_products": total_products,
|
||||
"collection": {}
|
||||
}
|
||||
|
||||
with open(f"{output_csv_name}.jsonl", mode="w", newline="", encoding="utf-8") as jsonl_file:
|
||||
start_time = time.perf_counter()
|
||||
async for product in get_endpoint_products(scraping_info, scraping_client):
|
||||
if product["Handle"] not in scraped_handles:
|
||||
scraped_handles.add(product["Handle"])
|
||||
jsonl_file.write(json.dumps(product) + "\n")
|
||||
scrape_count += 1
|
||||
|
||||
elapsed_secs = elapsed_time(since=start_time)
|
||||
elapsed_secs_display = elapsed_secs % 60
|
||||
elapsed_mins = (elapsed_secs % 3600) // 60
|
||||
elapsed_hrs = elapsed_secs // 3600
|
||||
print(f"\rScrape Count: {scrape_count:,}/{total_products:,} | Elapsed Time: {elapsed_hrs:02}:{elapsed_mins:02}:{elapsed_secs_display:02}\033[K", end="", flush=True)
|
||||
|
||||
else: # Implement the collections strategy for stores with more than 25,000 products.
|
||||
collections = await get_collections(scrape_url=scrape_url, client=scraping_client)
|
||||
start_time = time.perf_counter()
|
||||
|
||||
with open(f"{output_csv_name}.jsonl", mode="w", newline="", encoding="utf-8") as jsonl_file:
|
||||
task_groups = [get_collection_page_tasks(collection={"url": collection["url"], "products_count": collection["products_count"]}, client=scraping_client) for collection in collections]
|
||||
tasks = [task for group in task_groups for task in group]
|
||||
|
||||
|
||||
for future in asyncio.as_completed(tasks):
|
||||
for raw_product in await future:
|
||||
product = parse_product(raw_product)
|
||||
|
||||
with open(f"{output_csv_name}.jsonl", mode="w", newline="", encoding="utf-8") as jsonl_file:
|
||||
start_time = time.perf_counter()
|
||||
async for product in get_endpoint_products(scraping_info, scraping_session):
|
||||
if product["Handle"] not in scraped_handles:
|
||||
scraped_handles.add(product["Handle"])
|
||||
scraped_handles.append(product["Handle"])
|
||||
jsonl_file.write(json.dumps(product) + "\n")
|
||||
scrape_count += 1
|
||||
|
||||
|
|
@ -551,9 +520,28 @@ async def initiate_scraping_operation(store_url: str, output_csv_name: str="shop
|
|||
elapsed_secs_display = elapsed_secs % 60
|
||||
elapsed_mins = (elapsed_secs % 3600) // 60
|
||||
elapsed_hrs = elapsed_secs // 3600
|
||||
print(f"\rScrape Count: {scrape_count:,} | Elapsed Time: {elapsed_hrs:02}:{elapsed_mins:02}:{elapsed_secs_display:02}\033[K", end="", flush=True)
|
||||
|
||||
|
||||
print(f"\rScrape Count: {scrape_count}/{total_products} | Elapsed Time: {elapsed_hrs:02}:{elapsed_mins:02}:{elapsed_secs_display:02}\033[K", end="", flush=True)
|
||||
else: # Implement the collections strategy for stores with more than 25,000 products.
|
||||
collections = await get_collections(scrape_url=scrape_url, session=scraping_session)
|
||||
start_time = time.perf_counter()
|
||||
with open(f"{output_csv_name}.jsonl", mode="w", newline="", encoding="utf-8") as jsonl_file:
|
||||
for collection_num, collection in enumerate(collections, 1):
|
||||
scraping_info = {
|
||||
"url": scrape_url,
|
||||
"total_products": total_products,
|
||||
"collection": {"url": collection["url"], "products_count": collection["products_count"]}
|
||||
}
|
||||
async for product in get_endpoint_products(scraping_info, scraping_session):
|
||||
if product["Handle"] not in scraped_handles:
|
||||
scraped_handles.append(product["Handle"])
|
||||
jsonl_file.write(json.dumps(product) + "\n")
|
||||
scrape_count += 1
|
||||
|
||||
elapsed_secs = elapsed_time(since=start_time)
|
||||
elapsed_secs_display = elapsed_secs % 60
|
||||
elapsed_mins = (elapsed_secs % 3600) // 60
|
||||
elapsed_hrs = elapsed_secs // 3600
|
||||
print(f"\rCollection: {collection_num}/{len(collections)} | Scrape Count: {scrape_count} | Elapsed Time: {elapsed_hrs:02}:{elapsed_mins:02}:{elapsed_secs_display:02}\033[K", end="", flush=True)
|
||||
|
||||
|
||||
print(f"\n\nScraping Complete!\n")
|
||||
|
|
@ -593,7 +581,7 @@ async def main() -> None:
|
|||
elif user_choice == 2:
|
||||
clear_screen()
|
||||
print(f"{LOGO}\n")
|
||||
print("ShopExtract is your go-to tool for scraping ANY shopify store on the internet.")
|
||||
print("Shopify Scraper is your go-to tool for scraping ANY shopify store on the internet.")
|
||||
print("It reliably and quickly extracts the entire product catalog of any shopify store and generates Shopify-compatible, import-ready CSVs.")
|
||||
print("All you have to do is provide the Shopify store URL.")
|
||||
print("-------------------------------------------------------\n")
|
||||
|
|
|
|||
|
|
@ -1,6 +1,13 @@
|
|||
certifi==2026.5.20
|
||||
cffi==2.0.0
|
||||
curl_cffi==0.15.0
|
||||
markdown-it-py==4.2.0
|
||||
mdurl==0.1.2
|
||||
numpy==2.4.6
|
||||
pandas==3.0.3
|
||||
pycparser==3.0
|
||||
Pygments==2.20.0
|
||||
python-dateutil==2.9.0.post0
|
||||
rich==15.0.0
|
||||
six==1.17.0
|
||||
tzdata==2026.2
|
||||
wreq==0.11.3
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue