fix: refined JSON parsing and cookie refreshing logic

* Fixes the expired-cookies issue.

* Fixes the JSON hijacking protection issue.

* Fixes the JSON hijacking protection issue.

* Fixes the JSON hijacking protection issue.

* Fixes the issue where a local variable might be referenced before assignment.

* Fixes the JSON hijacking protection issue.

* refactor: PR#158
This commit is contained in:
Lưu Quang Vũ
2025-11-20 11:15:53 +07:00
committed by GitHub
parent d96606aa4e
commit b2413b091f
4 changed files with 205 additions and 85 deletions

2
.gitignore vendored
View File

@@ -201,4 +201,4 @@ Temporary Items
.apdisk
# Temporary files
temp/
.temp/

View File

@@ -10,31 +10,33 @@ from httpx import AsyncClient, ReadTimeout, Response
from .components import GemMixin
from .constants import Endpoint, ErrorCode, Headers, Model
from .exceptions import (
AuthError,
APIError,
ImageGenerationError,
TimeoutError,
AuthError,
GeminiError,
UsageLimitExceeded,
ImageGenerationError,
ModelInvalid,
TemporarilyBlocked,
TimeoutError,
UsageLimitExceeded,
)
from .types import (
WebImage,
GeneratedImage,
Candidate,
ModelOutput,
Gem,
GeneratedImage,
ModelOutput,
RPCData,
WebImage,
)
from .utils import (
upload_file,
extract_json_from_response,
get_access_token,
get_nested_value,
logger,
parse_file_name,
rotate_1psidts,
get_access_token,
running,
rotate_tasks,
logger,
running,
upload_file,
)
@@ -42,7 +44,7 @@ class GeminiClient(GemMixin):
"""
Async httpx client interface for gemini.google.com.
`secure_1psid` must be provided unless the optional dependency `browser-cookie3` is installed and
`secure_1psid` must be provided unless the optional dependency `browser-cookie3` is installed, and
you have logged in to google.com in your local browser.
Parameters
@@ -203,6 +205,7 @@ class GeminiClient(GemMixin):
if self.close_task:
self.close_task.cancel()
self.close_task = None
self.close_task = asyncio.create_task(self.close(self.close_delay))
async def start_auto_refresh(self) -> None:
@@ -211,18 +214,25 @@ class GeminiClient(GemMixin):
"""
while True:
new_1psidts: str | None = None
try:
new_1psidts = await rotate_1psidts(self.cookies, self.proxy)
except AuthError:
if task := rotate_tasks.get(self.cookies["__Secure-1PSID"]):
if task := rotate_tasks.get(self.cookies.get("__Secure-1PSID", "")):
task.cancel()
logger.warning(
"Failed to refresh cookies. Background auto refresh task canceled."
"AuthError: Failed to refresh cookies. Auto refresh task canceled."
)
return
except Exception as exc:
logger.warning(f"Unexpected error while refreshing cookies: {exc}")
logger.debug(f"Cookies refreshed. New __Secure-1PSIDTS: {new_1psidts}")
if new_1psidts:
self.cookies["__Secure-1PSIDTS"] = new_1psidts
if self.running:
self.client.cookies.set("__Secure-1PSIDTS", new_1psidts)
logger.debug("Cookies refreshed. New __Secure-1PSIDTS applied.")
await asyncio.sleep(self.refresh_interval)
@running(retry=2)
@@ -335,18 +345,24 @@ class GeminiClient(GemMixin):
f"Failed to generate contents. Request failed with status code {response.status_code}"
)
else:
try:
response_json = json.loads(response.text.split("\n")[2])
response_json: list[Any] = []
body: list[Any] = []
body_index = 0
try:
response_json = extract_json_from_response(response.text)
body = None
body_index = 0
for part_index, part in enumerate(response_json):
try:
main_part = json.loads(part[2])
if main_part[4]:
body_index, body = part_index, main_part
part_body = get_nested_value(part, [2])
if not part_body:
continue
part_json = json.loads(part_body)
if get_nested_value(part_json, [4]):
body_index, body = part_index, part_json
break
except (IndexError, TypeError, ValueError):
except (TypeError, ValueError):
continue
if not body:
@@ -355,7 +371,8 @@ class GeminiClient(GemMixin):
await self.close()
try:
match ErrorCode(response_json[0][5][2][0][1][0]):
error_code = get_nested_value(response_json, [0, 5, 2, 0, 1, 0], -1)
match ErrorCode(error_code):
case ErrorCode.USAGE_LIMIT_EXCEEDED:
raise UsageLimitExceeded(
f"Failed to generate contents. Usage limit of {model.model_name} model has exceeded. Please try switching to another model."
@@ -385,44 +402,51 @@ class GeminiClient(GemMixin):
)
try:
candidates = []
for candidate_index, candidate in enumerate(body[4]):
text = candidate[1][0]
candidate_list: list[Any] = get_nested_value(body, [4], [])
output_candidates: list[Candidate] = []
for candidate_index, candidate in enumerate(candidate_list):
rcid = get_nested_value(candidate, [0])
if not rcid:
continue # Skip candidate if it has no rcid
# Text output and thoughts
text = get_nested_value(candidate, [1, 0], "")
if re.match(
r"^http://googleusercontent\.com/card_content/\d+", text
):
text = candidate[22] and candidate[22][0] or text
text = get_nested_value(candidate, [22, 0]) or text
try:
thoughts = candidate[37][0][0]
except (TypeError, IndexError):
thoughts = None
thoughts = get_nested_value(candidate, [37, 0, 0])
web_images = (
candidate[12]
and candidate[12][1]
and [
# Web images
web_images = []
for web_img_data in get_nested_value(candidate, [12, 1], []):
url = get_nested_value(web_img_data, [0, 0, 0])
if not url:
continue
web_images.append(
WebImage(
url=web_image[0][0][0],
title=web_image[7][0],
alt=web_image[0][4],
url=url,
title=get_nested_value(web_img_data, [7, 0], ""),
alt=get_nested_value(web_img_data, [0, 4], ""),
proxy=self.proxy,
)
for web_image in candidate[12][1]
]
or []
)
)
# Generated images
generated_images = []
if candidate[12] and candidate[12][7] and candidate[12][7][0]:
if get_nested_value(candidate, [12, 7, 0]):
img_body = None
for img_part_index, part in enumerate(response_json):
if img_part_index < body_index:
continue
try:
img_part = json.loads(part[2])
if img_part[4][candidate_index][12][7][0]:
if get_nested_value(
img_part, [4, candidate_index, 12, 7, 0]
):
img_body = img_part
break
except (IndexError, TypeError, ValueError):
@@ -434,57 +458,73 @@ class GeminiClient(GemMixin):
"If the error persists and is caused by the package, please report it on GitHub."
)
img_candidate = img_body[4][candidate_index]
img_candidate = get_nested_value(
img_body, [4, candidate_index], []
)
text = re.sub(
r"http://googleusercontent\.com/image_generation_content/\d+",
"",
img_candidate[1][0],
).rstrip()
if finished_text := get_nested_value(
img_candidate, [1, 0]
): # Only overwrite if new text is returned after image generation
text = re.sub(
r"http://googleusercontent\.com/image_generation_content/\d+",
"",
finished_text,
).rstrip()
generated_images = [
GeneratedImage(
url=generated_image[0][3][3],
title=(
f"[Generated Image {generated_image[3][6]}]"
if generated_image[3][6]
else "[Generated Image]"
),
alt=(
generated_image[3][5][image_index]
if generated_image[3][5]
and len(generated_image[3][5]) > image_index
else (
generated_image[3][5][0]
if generated_image[3][5]
else ""
)
),
proxy=self.proxy,
cookies=self.cookies,
for img_index, gen_img_data in enumerate(
get_nested_value(img_candidate, [12, 7, 0], [])
):
url = get_nested_value(gen_img_data, [0, 3, 3])
if not url:
continue
img_num = get_nested_value(gen_img_data, [3, 6])
title = (
f"[Generated Image {img_num}]"
if img_num
else "[Generated Image]"
)
for image_index, generated_image in enumerate(
img_candidate[12][7][0]
)
]
candidates.append(
alt_list = get_nested_value(gen_img_data, [3, 5], [])
alt = (
get_nested_value(alt_list, [img_index])
or get_nested_value(alt_list, [0])
or ""
)
generated_images.append(
GeneratedImage(
url=url,
title=title,
alt=alt,
proxy=self.proxy,
cookies=self.cookies,
)
)
output_candidates.append(
Candidate(
rcid=candidate[0],
rcid=rcid,
text=text,
thoughts=thoughts,
web_images=web_images,
generated_images=generated_images,
)
)
if not candidates:
if not output_candidates:
raise GeminiError(
"Failed to generate contents. No output data found in response."
)
output = ModelOutput(metadata=body[1], candidates=candidates)
except (TypeError, IndexError):
logger.debug(f"Invalid response: {response.text}")
output = ModelOutput(
metadata=get_nested_value(body, [1], []),
candidates=output_candidates,
)
except (TypeError, IndexError) as e:
logger.debug(
f"{type(e).__name__}: {e}; Invalid response structure: {response.text}"
)
raise APIError(
"Failed to parse response body. Data structure is invalid."
)

View File

@@ -3,11 +3,12 @@
from asyncio import Task
from .decorators import running
from .upload_file import upload_file, parse_file_name
from .rotate_1psidts import rotate_1psidts
from .get_access_token import get_access_token
from .load_browser_cookies import load_browser_cookies
from .logger import logger, set_log_level
from .parsing import extract_json_from_response, get_nested_value
from .rotate_1psidts import rotate_1psidts
from .upload_file import upload_file, parse_file_name
rotate_tasks: dict[str, Task] = {}

View File

@@ -0,0 +1,79 @@
from typing import Any
import orjson as json
from .logger import logger
def get_nested_value(data: list, path: list[int], default: Any = None) -> Any:
    """
    Safely fetch a value from nested indexable data by following a sequence of keys.

    Parameters
    ----------
    data: `list`
        The nested structure to walk.
    path: `list[int]`
        Keys/indices applied in order, one level of nesting per entry.
    default: `Any`, optional
        Returned when the path cannot be resolved, or when the resolved
        value is None and a non-None default was supplied.
    """
    node = data
    for depth, key in enumerate(path):
        try:
            node = node[key]
        except (IndexError, TypeError, KeyError) as exc:
            # Log a truncated repr of the node we failed on, then bail out
            # with the caller-supplied default.
            snippet = repr(node)
            if len(snippet) > 200:
                snippet = snippet[:197] + "..."
            logger.debug(
                f"{type(exc).__name__}: parsing failed at path {path} (index {depth}, key '{key}') "
                f"while attempting to get value from `{snippet}`"
            )
            return default
    # A resolved None is replaced by the default when one was explicitly given.
    if node is None and default is not None:
        return default
    return node
def extract_json_from_response(text: str) -> list:
    """
    Clean and extract the JSON content from a Google API response.

    The response may carry an anti-JSON-hijacking prefix and other non-payload
    lines before the actual JSON body, so the text is scanned line by line.

    Parameters
    ----------
    text: `str`
        The raw response text from a Google API.

    Returns
    -------
    `list`
        The extracted JSON array or object (should be an array).

    Raises
    ------
    `TypeError`
        If the input is not a string.
    `ValueError`
        If no JSON object is found or the response is empty.
    """
    if not isinstance(text, str):
        raise TypeError(
            f"Input text is expected to be a string, got {type(text).__name__} instead."
        )

    # Find the first line that parses to a JSON container. Lines that parse
    # to a bare scalar (e.g. a numeric line preceding the payload, or `null`)
    # are skipped — returning one would violate the declared `list` contract
    # and break index-based access downstream.
    for line in text.splitlines():
        stripped = line.strip()
        if not stripped:
            continue
        try:
            parsed = json.loads(stripped)
        except json.JSONDecodeError:
            continue
        if isinstance(parsed, (list, dict)):
            return parsed

    # If no JSON container is found, raise ValueError
    raise ValueError("Could not find a valid JSON object or array in the response.")