mirror of
https://github.com/Coding-Doctor-Omar/ShopExtract.git
synced 2026-06-07 07:55:18 +02:00
592 lines
22 KiB
Python
592 lines
22 KiB
Python
from curl_cffi.requests.exceptions import HTTPError, Timeout
|
|
from curl_cffi import AsyncSession
|
|
from json.decoder import JSONDecodeError
|
|
from collections.abc import AsyncGenerator
|
|
from asyncio import Semaphore
|
|
from functools import wraps
|
|
import pandas as pd
|
|
import asyncio
|
|
import random
|
|
import json
|
|
import time
|
|
import csv
|
|
import sys
|
|
import re
|
|
import os
|
|
|
|
LOGO = r"""
|
|
_____ _ ______ _ _
|
|
/ ____| | | ____| | | | |
|
|
| (___ | |__ ___ _ __ | |__ __ _| |_ _ __ __ _ ___| |_
|
|
\___ \| '_ \ / _ \| '_ \| __| \ \/ / __| '__/ _` |/ __| __|
|
|
____) | | | | (_) | |_) | |____ > <| |_| | | (_| | (__| |_
|
|
|_____/|_| |_|\___/| .__/|______/_/\_\\__|_| \__,_|\___|\__|
|
|
| |
|
|
|_|
|
|
"""
|
|
|
|
MENU_OPTIONS = """
|
|
***************************
|
|
* MAIN MENU *
|
|
***************************
|
|
* 1. Generate Shopify CSV *
|
|
* 2. About *
|
|
* 3. Exit *
|
|
***************************
|
|
"""
|
|
|
|
def limit_concurrency(limit: int):
|
|
"""Limits the number of concurrent coroutines."""
|
|
|
|
SCRAPING_LIMIT = Semaphore(limit)
|
|
|
|
def decorator(scrape_func):
|
|
@wraps(scrape_func)
|
|
async def wrapper(*args, **kwargs):
|
|
async with SCRAPING_LIMIT:
|
|
return await scrape_func(*args, **kwargs)
|
|
|
|
return wrapper
|
|
|
|
return decorator
|
|
|
|
|
|
def clear_screen() -> None:
    """Wipes the console so the next screen of the interface starts clean."""

    # os.name is "nt" on Windows, which uses `cls`; every other platform gets `clear`.
    command = "cls" if os.name == "nt" else "clear"
    os.system(command)
|
|
|
|
def elapsed_time(since: float) -> int:
    """Gives the number of whole seconds that have passed since a reference timestamp.

    Args:
        since: The starting timestamp in seconds, taken from time.perf_counter().

    Returns:
        The difference between the current time.perf_counter() reading and `since`, rounded to the nearest integer."""

    return round(time.perf_counter() - since)
|
|
|
|
def create_empty_csv(name: str) -> None:
    """Creates (or overwrites) `<name>.csv` containing nothing but the Shopify header row.

    Args:
        name: The output file name without the `.csv` extension."""

    columns = (
        "Handle",
        "Title",
        "Body (HTML)",
        "Vendor",
        "Product Category",
        "Type",
        "Tags",
        "Published",
        "Option1 Name",
        "Option1 Value",
        "Option2 Name",
        "Option2 Value",
        "Option3 Name",
        "Option3 Value",
        "Variant SKU",
        "Variant Price",
        "Variant Compare At Price",
        "Image Src",
        "Image Alt Text",
        "Variant Image",
        "Variant Weight",
        "Variant Inventory Qty",
        "Variant Barcode",
    )

    # newline="" lets the csv module control the line endings itself.
    with open(f"{name}.csv", mode="w", newline="", encoding="utf-8") as csv_file:
        csv.writer(csv_file).writerow(columns)
|
|
|
|
def generate_csvs(name: str) -> None:
    """Converts the scraped `<name>.jsonl` file into one or more Shopify import CSVs.

    The output files are named `<name>_1.csv`, `<name>_2.csv`, and so on. A new file is started
    before a product would push the current file past the row ceiling, so all rows of one
    product always end up in the same file.

    Args:
        name: The base name shared by the input `.jsonl` file and the output CSV files.

    Raises:
        FileNotFoundError: If `<name>.jsonl` does not exist.
        json.JSONDecodeError: If a non-blank line of the `.jsonl` file is not valid JSON."""

    # Row ceiling per file. It is the original author's safety margin for the Shopify import
    # limits (noted by the author as 50,000 rows / 15 MB). File size itself is not measured here.
    max_rows_per_csv = 40_000
    nested_keys = ("other_variants", "other_product_images")

    current_csv_num = 1
    current_csv_rows = 1  # The header row counts towards the total.
    create_empty_csv(f"{name}_{current_csv_num}")  # Single source of truth for the header row.

    # Stream read from the jsonl file that contains the scraped data.
    with open(f"{name}.jsonl", mode="r", encoding="utf-8") as jsonl_file:
        for line in jsonl_file:
            line = line.strip()
            if not line:
                continue  # Tolerate blank lines instead of failing in json.loads.

            product: dict = json.loads(line)
            variant_rows = product["other_variants"]
            image_rows = product["other_product_images"]
            extra_rows = len(variant_rows) + len(image_rows)

            # Roll over to a new CSV when this product would not fit. The `> 1` check avoids
            # abandoning a header-only file when a single product alone exceeds the ceiling.
            if current_csv_rows > 1 and current_csv_rows + extra_rows > max_rows_per_csv:
                current_csv_num += 1
                current_csv_rows = 1
                create_empty_csv(f"{name}_{current_csv_num}")

            main_row = {key: val for key, val in product.items() if key not in nested_keys}
            product_rows = [main_row, *variant_rows, *image_rows]
            pd.DataFrame(product_rows).to_csv(
                f"{name}_{current_csv_num}.csv",
                mode="a",
                encoding="utf-8",
                index=False,
                header=False,
            )
            current_csv_rows += extra_rows + 1
|
|
|
|
def _blank_shopify_row(handle: str = "") -> dict:
    """Returns one CSV row with every Shopify column present, in header order, set to its default."""

    return {
        "Handle": handle,
        "Title": "",
        "Body (HTML)": "",
        "Vendor": "",
        "Product Category": "",
        "Type": "",
        "Tags": "",
        "Published": True,
        "Option1 Name": "",
        "Option1 Value": "",
        "Option2 Name": "",
        "Option2 Value": "",
        "Option3 Name": "",
        "Option3 Value": "",
        "Variant SKU": "",
        "Variant Price": "",
        "Variant Compare At Price": "",
        "Image Src": "",
        "Image Alt Text": "",
        "Variant Image": "",
        "Variant Weight": 0,
        "Variant Inventory Qty": 0,
        "Variant Barcode": "",
    }


def _variant_fields(variant: dict) -> dict:
    """Maps one raw API variant onto the variant-level CSV columns."""

    return {
        "Option1 Value": variant["option1"],
        "Option2 Value": variant["option2"],
        "Option3 Value": variant["option3"],
        "Variant SKU": variant.get("sku", ""),
        "Variant Price": variant["price"],
        "Variant Compare At Price": variant.get("compare_at_price", ""),
        "Variant Weight": variant.get("grams", 0),
        # The raw data only exposes an availability flag here, so stock is recorded as 1 or 0.
        "Variant Inventory Qty": 1 if variant["available"] else 0,
    }


def parse_product(product: dict) -> dict:
    """Produces Shopify-import-CSV-compatible product data from any raw product data given.

    Args:
        product: A dictionary of raw product data obtained from the public Shopify API.

    Returns:
        The main CSV row of the product (first variant and first image), plus two extra keys:
        "other_variants" (one row per remaining variant) and "other_product_images" (one row
        per image that has no variant IDs and is not the main image).

    Raises:
        KeyError: If a required key (e.g. "handle", "title", "vendor", "tags", "options",
            "images", "variants") is missing from the raw data."""

    handle = product["handle"]
    parsed_product = _blank_shopify_row(handle)
    parsed_product["other_variants"] = []
    parsed_product["other_product_images"] = []

    parsed_product["Title"] = product["title"]
    parsed_product["Body (HTML)"] = product.get("body_html", "")
    parsed_product["Vendor"] = product["vendor"]
    # The free-text product type belongs in "Type"; "Product Category" is left blank.
    parsed_product["Type"] = product.get("product_type", "")

    # No manual quoting: the CSV writer quotes fields that contain commas on its own.
    tags = product["tags"]
    parsed_product["Tags"] = tags if isinstance(tags, str) else ", ".join(tags)

    # The CSV layout only has Option1..Option3 columns.
    for option_num, option in enumerate(product["options"][:3], 1):
        parsed_product[f"Option{option_num} Name"] = option["name"]

    images = product["images"]
    main_image = images[0]["src"] if images else ""
    parsed_product["Image Src"] = main_image
    parsed_product["Variant Image"] = main_image

    variants = product["variants"]
    if variants:  # Guard against a product that lists no variants.
        parsed_product.update(_variant_fields(variants[0]))

    for variant in variants[1:]:
        variant_data = _blank_shopify_row(handle)
        for option_num in (1, 2, 3):
            variant_data[f"Option{option_num} Name"] = parsed_product[f"Option{option_num} Name"]
        variant_data.update(_variant_fields(variant))

        # "featured_image" may be absent or null; only a dict can carry a "src".
        featured_image = variant.get("featured_image")
        if isinstance(featured_image, dict):
            variant_data["Variant Image"] = featured_image.get("src", "")

        parsed_product["other_variants"].append(variant_data)

    for image in images:
        if image["variant_ids"] or image["src"] == main_image:
            continue

        # Image-only rows carry just the handle and the image URL.
        image_row = _blank_shopify_row(handle)
        image_row["Published"] = ""
        image_row["Image Src"] = image["src"]
        image_row["Variant Weight"] = ""
        image_row["Variant Inventory Qty"] = ""
        parsed_product["other_product_images"].append(image_row)

    return parsed_product
|
|
|
|
async def get_total_products_count(scrape_url: str, session: AsyncSession) -> int:
    """Gets the total number of published products in the Shopify store.

    The count is read from the "published_products_count" field of the store's /meta.json endpoint.
    NOTE(review): the original author states that this is 25001 for stores with more than 25k
    products. That behaviour belongs to the endpoint and cannot be confirmed from this file.

    Args:
        scrape_url: The URL of the working /products.json endpoint of the Shopify store.
        session: A reference of the main scraping session.

    Returns:
        The value of "published_products_count" reported by the store.

    Raises:
        HTTPError, JSONDecodeError, Timeout: Re-raised when the final attempt also fails."""

    delay_time = 1
    max_attempts = 10
    meta_url = scrape_url.replace("/products.json", "/meta.json")

    for attempt in range(1, max_attempts + 1):
        try:
            res = await session.get(meta_url)
            res.raise_for_status()
            data = res.json()
        except (HTTPError, JSONDecodeError, Timeout):
            # Compare against max_attempts (not a literal) so the two can never drift apart.
            if attempt == max_attempts:
                raise

            sleep_time = min(delay_time * 2 ** attempt + random.uniform(0.1, 2), 45)  # Exponential back-off with a 45-second cap.
            await asyncio.sleep(sleep_time)
        else:
            break

    return data["published_products_count"]
|
|
|
|
@limit_concurrency(limit=30)
async def get_page_products(scrape_url: str, page: int, session: AsyncSession) -> list:
    """Returns raw product data from any given API page.

    The endpoint is requested with `page=<page>` and `limit=250`. Failed requests are retried
    with exponential back-off.

    Args:
        scrape_url: The specific API url (e.g. https://some-store.myshopify.com/products.json).
        page: The pagination API query parameter.
        session: A reference of the main scraping session.

    Returns:
        The list stored under the "products" key of the JSON response.

    Raises:
        HTTPError, JSONDecodeError, Timeout: Re-raised when the final attempt also fails."""

    delay_time = 1
    max_attempts = 10
    parameters = {"page": page, "limit": 250}

    await asyncio.sleep(random.uniform(0.1, 1.5))  # Random small jitter

    for attempt in range(1, max_attempts + 1):
        try:
            res = await session.get(scrape_url, params=parameters)
            res.raise_for_status()
            data = res.json()
        except (HTTPError, JSONDecodeError, Timeout):
            # Compare against max_attempts (not a literal) so the two can never drift apart.
            if attempt == max_attempts:
                raise

            sleep_time = min(delay_time * 2 ** attempt + random.uniform(0.1, 2), 45)  # Exponential back-off with a 45-second cap.
            await asyncio.sleep(sleep_time)
        else:
            break

    return data["products"]
|
|
|
|
async def get_endpoint_products(scrape_info: dict, session: AsyncSession) -> AsyncGenerator[dict, None]:
    """Scrapes all available products from a given endpoint.

    All pages are requested concurrently and products are yielded in the order their pages
    finish, not in page order.

    Args:
        scrape_info: A dictionary with the keys "url" (the store's /products.json endpoint),
            "total_products" (the store's product count) and "collection" (an empty dict, or a
            dict with the "url" and "products_count" of the collection to scrape instead).
        session: A reference of the main scraping session.

    Yields:
        One parsed product (see parse_product) per raw product found."""

    page_size = 250
    max_pages = 100  # At most 100 pages (25,000 products) are requested per endpoint.

    collection = scrape_info["collection"]
    if collection:
        target_url = collection["url"]
        product_count = collection["products_count"]
    else:
        target_url = scrape_info["url"]
        product_count = scrape_info["total_products"]

    # Ceiling division, then clamp to the page cap.
    num_pages, remainder = divmod(product_count, page_size)
    if remainder:
        num_pages += 1
    num_pages = min(num_pages, max_pages)

    tasks = [get_page_products(target_url, page_num, session) for page_num in range(1, num_pages + 1)]
    for future in asyncio.as_completed(tasks):
        for product in await future:
            yield parse_product(product)
|
|
|
|
|
|
|
|
async def get_collections(scrape_url: str, session: AsyncSession) -> list:
    """Returns a list of all collections in the store with at least one listed product.

    The store's /collections.json endpoint is read page by page (250 per page, at most 100
    pages) until an empty page is returned. Each collection handle is reported only once.

    Args:
        scrape_url: The URL of the valid /products.json endpoint of the store.
        session: A reference of the main scraping session.

    Returns:
        A list of dicts, each with "url" (the collection's /products.json endpoint) and
        "products_count".

    Raises:
        HTTPError, JSONDecodeError, Timeout: Re-raised when the final attempt for a page also fails."""

    delay_time = 1
    max_attempts = 10
    max_pages = 100

    collections_url = scrape_url.replace("/products.json", "/collections.json")
    store_root = collections_url.split("/collections.json")[0]
    parameters = {"page": 1, "limit": 250}

    collections_data = []
    # Track handles directly. Deriving them back from the stored URLs always produced
    # "products", so duplicates were never detected.
    seen_handles = set()

    while parameters["page"] <= max_pages:
        for attempt in range(1, max_attempts + 1):
            try:
                res = await session.get(collections_url, params=parameters)
                res.raise_for_status()
                data = res.json()
            except (HTTPError, JSONDecodeError, Timeout):
                # Compare against max_attempts (not a literal) so the two can never drift apart.
                if attempt == max_attempts:
                    raise

                sleep_time = min(delay_time * 2 ** attempt + random.uniform(0.1, 2), 45)  # Exponential back-off with a 45-second cap.
                await asyncio.sleep(sleep_time)
            else:
                break

        collections = data["collections"]
        if not collections:
            break  # An empty page marks the end of the listing.

        for collection in collections:
            handle = collection["handle"]
            if handle in seen_handles or collection["products_count"] <= 0:
                continue

            seen_handles.add(handle)
            collections_data.append(
                {
                    "url": f"{store_root}/collections/{handle}/products.json",
                    "products_count": collection["products_count"],
                }
            )

        parameters["page"] += 1
        await asyncio.sleep(0.3)  # Short pause between listing pages.

    return collections_data
|
|
|
|
|
|
|
|
async def get_scrape_url(store_url: str, session: AsyncSession) -> str:
    """Returns the valid /products.json URL of a Shopify store.

    The store's own domain is tried first. If that fails, the store's home page is searched for
    a `<STORE>.myshopify.com` host name and the /products.json URL on that host is returned
    instead (it is not requested here).

    Args:
        store_url: The normal user-facing URL of the Shopify store.
        session: A reference of the main scraping session.

    Returns:
        The /products.json URL, or an empty string when none could be determined."""

    # Keep only the host part of whatever the user typed and force https.
    base_url = "https://" + store_url.split("//")[-1].split("/")[0].split("?")[0]
    products_endpoint = base_url + "/products.json"

    try:
        res = await session.get(products_endpoint)
        res.raise_for_status()
        payload = res.json()
    except Exception:
        # Deliberate best effort: any failure here just means the fallback below is tried.
        payload = None

    if isinstance(payload, dict) and "products" in payload:
        return products_endpoint

    try:
        res = await session.get(base_url)
        # Use regex to find the <STORE>.myshopify.com host of the Shopify store in case the normal /products.json is blocked.
        # The first occurrence in the page is used so the result is deterministic.
        match = re.search(r'\b([a-zA-Z0-9-]+)\.myshopify\.com\b', res.text)
    except Exception:
        # Deliberate best effort: the caller treats "" as "store not found".
        return ""

    if match is None:
        return ""

    return f"https://{match.group(1)}.myshopify.com/products.json"
|
|
|
|
|
|
async def initiate_scraping_operation(store_url: str, output_csv_name: str="shopify") -> None:
    """The main scraping function.

    Resolves the store's products endpoint, streams every unique product into
    `<output_csv_name>.jsonl` while printing live progress, then converts that file into CSVs.
    Stores reporting at most 25,000 products are scraped through /products.json directly;
    larger stores are scraped collection by collection.

    Args:
        store_url: The normal user-facing URL of the Shopify store.
        output_csv_name: The user's desired name for the output CSV file. An empty value falls back to "shopify"."""

    scrape_count = 0
    scraped_handles = set()  # Set membership keeps de-duplication O(1) per product.

    if not output_csv_name:
        output_csv_name = "shopify"

    async with AsyncSession(impersonate="firefox", timeout=10) as scraping_session:
        print("Initializing scraping operation...\n")
        scrape_url = await get_scrape_url(store_url=store_url, session=scraping_session)

        # get_scrape_url returns "" when no endpoint could be found; stop before making doomed requests.
        if not scrape_url:
            input("Could not find a Shopify products endpoint for this store.\n\nPress ENTER to return to the main menu.")
            return

        total_products = await get_total_products_count(scrape_url=scrape_url, session=scraping_session)

        if total_products <= 25_000:
            # Implement the /products.json strategy for shops with less than or equal to 25,000 products.
            collections = None
            targets = [{}]
        else:
            # Implement the collections strategy for stores with more than 25,000 products.
            collections = await get_collections(scrape_url=scrape_url, session=scraping_session)
            targets = [{"url": c["url"], "products_count": c["products_count"]} for c in collections]

        start_time = time.perf_counter()
        with open(f"{output_csv_name}.jsonl", mode="w", newline="", encoding="utf-8") as jsonl_file:
            for target_num, target in enumerate(targets, 1):
                scraping_info = {
                    "url": scrape_url,
                    "total_products": total_products,
                    "collection": target
                }

                async for product in get_endpoint_products(scraping_info, scraping_session):
                    if product["Handle"] in scraped_handles:
                        continue  # The same product can appear in several collections/pages.

                    scraped_handles.add(product["Handle"])
                    jsonl_file.write(json.dumps(product) + "\n")
                    scrape_count += 1

                    elapsed_mins_total, elapsed_secs_display = divmod(elapsed_time(since=start_time), 60)
                    elapsed_hrs, elapsed_mins = divmod(elapsed_mins_total, 60)
                    clock = f"{elapsed_hrs:02}:{elapsed_mins:02}:{elapsed_secs_display:02}"

                    if collections is None:
                        progress = f"Scrape Count: {scrape_count}/{total_products}"
                    else:
                        progress = f"Collection: {target_num}/{len(collections)} | Scrape Count: {scrape_count}"

                    # "\r" rewrites the same console line; "\033[K" clears leftovers from a longer previous line.
                    print(f"\r{progress} | Elapsed Time: {clock}\033[K", end="", flush=True)

    print("\n\nScraping Complete!\n")

    print("\nGenerating CSV(s)...\n")
    generate_csvs(name=output_csv_name)
    input("CSV Generated Successfully!\n\nPress ENTER to return to the main menu.")
|
|
|
|
|
|
async def main() -> None:
    """Runs the interactive main menu until the user picks the exit option."""

    about_lines = (
        "Shopify Scraper is your go-to tool for scraping ANY shopify store on the internet.",
        "It reliably and quickly extracts the entire product catalog of any shopify store and generates Shopify-compatible, import-ready CSVs.",
        "All you have to do is provide the Shopify store URL.",
        "-------------------------------------------------------\n",
        "Developed By: Dr. Omar Abdelhamid, a 5th-Year Medical Student at KasrAlainy Medical School as of 2026.",
        "GitHub Profile: https://github.com/Coding-Doctor-Omar",
        "LinkedIn Profile: https://www.linkedin.com/in/dr-omar-abdelhamid-37ab6b366/\n",
    )

    while True:
        clear_screen()
        print(LOGO)
        print(MENU_OPTIONS + "\n")

        try:
            user_choice = int(input("Choose an option: ").strip())
        except ValueError:
            input("Invalid option. Press ENTER to retry.")
            continue

        if user_choice == 3:  # Exit
            sys.exit(0)

        if user_choice == 1:  # Generate Shopify CSV
            shopify_store_url = input("Store URL: ").strip().lower()

            # Reduce the requested file name to a simple base name: keep the part before the first
            # dot, drop path and sign characters, and turn spaces into underscores.
            output_name = input("Type a name for the output CSV: ").lower().strip().split(".")[0]
            for unwanted in ("/", "\\", "+", "-"):
                output_name = output_name.replace(unwanted, "")
            output_name = output_name.replace(" ", "_")

            clear_screen()
            print(LOGO + "\n")
            await initiate_scraping_operation(store_url=shopify_store_url, output_csv_name=output_name)
        elif user_choice == 2:  # About
            clear_screen()
            print(LOGO + "\n")
            for about_line in about_lines:
                print(about_line)
            input("Press ENTER to go back to the main menu.")
        else:
            input("Invalid option. Press ENTER to retry.")
|
|
|
|
|
|
|
|
# Start the interactive menu only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    asyncio.run(main())
|