add url params for SERPAPI/SERPER search engine

This commit is contained in:
shenchucheng 2024-08-12 16:40:27 +08:00
parent f84fe76d28
commit 9deb9f3512
2 changed files with 14 additions and 20 deletions

View file

@@ -6,7 +6,7 @@
@File : search_engine_serpapi.py
"""
import warnings
from typing import Any, Dict, Optional, Tuple
from typing import Any, Dict, Optional
import aiohttp
from pydantic import BaseModel, ConfigDict, Field, model_validator
@@ -24,6 +24,7 @@ class SerpAPIWrapper(BaseModel):
"hl": "en",
}
)
url: str = "https://serpapi.com/search"
aiosession: Optional[aiohttp.ClientSession] = None
proxy: Optional[str] = None
@@ -49,22 +50,18 @@
async def results(self, query: str, max_results: int) -> dict:
"""Use aiohttp to run query through SerpAPI and return the results async."""
def construct_url_and_params() -> Tuple[str, Dict[str, str]]:
params = self.get_params(query)
params["source"] = "python"
params["num"] = max_results
params["output"] = "json"
url = "https://serpapi.com/search"
return url, params
params = self.get_params(query)
params["source"] = "python"
params["num"] = max_results
params["output"] = "json"
url, params = construct_url_and_params()
if not self.aiosession:
async with aiohttp.ClientSession() as session:
async with session.get(url, params=params, proxy=self.proxy) as response:
async with session.get(self.url, params=params, proxy=self.proxy) as response:
response.raise_for_status()
res = await response.json()
else:
async with self.aiosession.get(url, params=params, proxy=self.proxy) as response:
async with self.aiosession.get(self.url, params=params, proxy=self.proxy) as response:
response.raise_for_status()
res = await response.json()

View file

@@ -7,7 +7,7 @@
"""
import json
import warnings
from typing import Any, Dict, Optional, Tuple
from typing import Any, Dict, Optional
import aiohttp
from pydantic import BaseModel, ConfigDict, Field, model_validator
@@ -17,6 +17,7 @@ class SerperWrapper(BaseModel):
model_config = ConfigDict(arbitrary_types_allowed=True)
api_key: str
url: str = "https://google.serper.dev/search"
payload: dict = Field(default_factory=lambda: {"page": 1, "num": 10})
aiosession: Optional[aiohttp.ClientSession] = None
proxy: Optional[str] = None
@@ -46,20 +47,16 @@
async def results(self, queries: list[str], max_results: int = 8) -> dict:
"""Use aiohttp to run query through Serper and return the results async."""
def construct_url_and_payload_and_headers() -> Tuple[str, Dict[str, str]]:
payloads = self.get_payloads(queries, max_results)
url = "https://google.serper.dev/search"
headers = self.get_headers()
return url, payloads, headers
payloads = self.get_payloads(queries, max_results)
headers = self.get_headers()
url, payloads, headers = construct_url_and_payload_and_headers()
if not self.aiosession:
async with aiohttp.ClientSession() as session:
async with session.post(url, data=payloads, headers=headers, proxy=self.proxy) as response:
async with session.post(self.url, data=payloads, headers=headers, proxy=self.proxy) as response:
response.raise_for_status()
res = await response.json()
else:
async with self.aiosession.get.post(url, data=payloads, headers=headers, proxy=self.proxy) as response:
async with self.aiosession.get.post(self.url, data=payloads, headers=headers, proxy=self.proxy) as response:
response.raise_for_status()
res = await response.json()