TradingAgents/venv/lib/python3.10/site-packages/curl_cffi/curl.py

613 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import re
import struct
import sys
import warnings
from http.cookies import SimpleCookie
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal, Optional, Union, cast
import certifi
from ._wrapper import ffi, lib
from .const import CurlECode, CurlHttpVersion, CurlInfo, CurlOpt, CurlWsFlag
from .utils import CurlCffiWarning
# CA bundle location reported by certifi; Curl uses it when no ``cacert`` is given.
DEFAULT_CACERT = certifi.where()
# Matches an "HTTP/x.y NNN reason" status line (bytes), capturing only the reason.
REASON_PHRASE_RE = re.compile(rb"HTTP/\d\.\d [0-9]{3} (.*)")
# Same status-line shape, capturing version ("x.y"), 3-digit code and reason.
STATUS_LINE_RE = re.compile(rb"HTTP/(\d\.\d) ([0-9]{3}) (.*)")
if TYPE_CHECKING:

    # Typing-only stand-in for the websocket frame metadata object; it is never
    # defined at runtime. The field names presumably mirror libcurl's
    # ``struct curl_ws_frame`` (https://curl.se/libcurl/c/curl_ws_meta.html).
    class CurlWsFrame:
        age: int
        flags: int
        offset: int
        bytesleft: int
        len: int
class CurlError(Exception):
    """Base exception for curl_cffi package.

    Attributes:
        code: the error code associated with the failure, ``0`` when the
            caller did not supply one.
    """

    def __init__(self, msg, code: Union[CurlECode, Literal[0]] = 0, *args, **kwargs):
        # ``code`` is deliberately kept out of ``Exception.args``: only the
        # message and any extra arguments are forwarded to the base class.
        super().__init__(msg, *args, **kwargs)
        self.code: Union[CurlECode, Literal[0]] = code
# Kinds of debug data, i.e. the ``type_`` value handed to a debug callback.
CURLINFO_TEXT = 0
CURLINFO_HEADER_IN = 1
CURLINFO_HEADER_OUT = 2
CURLINFO_DATA_IN = 3
CURLINFO_DATA_OUT = 4
CURLINFO_SSL_DATA_IN = 5
CURLINFO_SSL_DATA_OUT = 6
# Special values a user write callback may return; ``write_callback`` hands
# them back to libcurl unchanged instead of the consumed byte count.
CURL_WRITEFUNC_PAUSE = 0x10000001
CURL_WRITEFUNC_ERROR = 0xFFFFFFFF
@ffi.def_extern()
def debug_function(curl, type_: int, data, size: int, clientp) -> int:
    """ffi callback for curl debug info.

    Recovers the Python callable stored behind ``clientp``, copies ``size``
    bytes out of ``data`` and calls the callable with ``(type_, payload)``.
    Always returns 0.
    """
    handler = ffi.from_handle(clientp)
    # Slicing the cffi buffer copies the data into a Python ``bytes`` object.
    payload = ffi.buffer(data, size)[:]
    handler(type_, payload)
    return 0
def bytes_to_hex(b: bytes, uppercase: bool = False) -> str:
    """
    Convert a bytes object to a space-separated hex string, e.g. "0a ff 3c".

    If uppercase=True, letters will be A-F instead of a-f.
    """
    rendered = bytes(b).hex(" ")
    return rendered.upper() if uppercase else rendered
def debug_function_default(type_: int, data: bytes) -> None:
    """Default debug callback: write one debug record to ``sys.stderr``.

    SSL records, and any record that is not valid UTF-8, are shown as a hex
    dump of at most the first 40 bytes. Everything else is written as text
    after a prefix that identifies the record type.

    Parameters:
        type_: one of the ``CURLINFO_*`` debug type constants.
        data: raw bytes of the debug record.
    """
    max_shown = 40
    markers = {
        CURLINFO_TEXT: "*",
        CURLINFO_HEADER_IN: "<",
        CURLINFO_HEADER_OUT: ">",
        CURLINFO_DATA_IN: "< DATA",
        CURLINFO_DATA_OUT: "> DATA",
        CURLINFO_SSL_DATA_IN: "< SSL",
        CURLINFO_SSL_DATA_OUT: "> SSL",
    }
    marker = markers.get(type_, "*")

    def dump_hex() -> None:
        # Truncated hex view, with "..." when bytes were left out.
        shown = bytes_to_hex(data[:max_shown])
        ellipsis = "..." if len(data) > max_shown else ""
        sys.stderr.write(f"{marker} [{len(data)} bytes]: {shown}{ellipsis}\n")

    # always show ssl data in binary format
    if type_ in (CURLINFO_SSL_DATA_IN, CURLINFO_SSL_DATA_OUT):
        dump_hex()
        return

    try:
        decoded = data.decode("utf-8")
    except UnicodeDecodeError:
        # Fallback to hex representation of the first bytes
        dump_hex()
        return

    sys.stderr.write(f"{marker} {decoded}")
    # Text and header records get no extra newline; all other types do.
    if type_ not in (CURLINFO_TEXT, CURLINFO_HEADER_IN, CURLINFO_HEADER_OUT):
        sys.stderr.write("\n")
@ffi.def_extern()
def buffer_callback(ptr, size, nmemb, userdata):
    """ffi callback for curl write function, directly writes to a buffer.

    ``userdata`` is a cffi handle to an object with a ``write`` method. The
    return value tells libcurl that all ``nmemb * size`` bytes were consumed.
    """
    # Only ``nmemb`` bytes are copied, i.e. ``size`` is taken to be 1.
    chunk = ffi.buffer(ptr, nmemb)[:]
    sink = ffi.from_handle(userdata)
    sink.write(chunk)
    return nmemb * size
def ensure_int(s):
    """Return ``int(s)``, mapping any falsy value (``None``, ``""``, ``0``) to 0."""
    return int(s) if s else 0
@ffi.def_extern()
def write_callback(ptr, size, nmemb, userdata):
    """ffi callback for curl write function, calls the callback python function.

    The Python callable behind ``userdata`` receives the chunk as ``bytes``.
    If it returns ``CURL_WRITEFUNC_PAUSE`` or ``CURL_WRITEFUNC_ERROR`` that
    value is handed back to libcurl. Any other result is treated as a byte
    count: a mismatch only produces a warning, and the full chunk is still
    reported as consumed.
    """
    # although similar enough to the function above, kept here for performance reasons
    expected = nmemb * size
    handler = ffi.from_handle(userdata)
    wrote = ensure_int(handler(ffi.buffer(ptr, nmemb)[:]))
    if wrote in (CURL_WRITEFUNC_PAUSE, CURL_WRITEFUNC_ERROR):
        return wrote
    # should make this an exception in future versions
    if wrote != expected:
        warnings.warn("Wrote bytes != received bytes.", CurlCffiWarning, stacklevel=2)
    return expected
# Credits: @alexio777 on https://github.com/lexiforest/curl_cffi/issues/4
def slist_to_list(head) -> list[bytes]:
    """Converts curl slist to a python list.

    Every node's ``data`` is copied into a ``bytes`` object, then the whole
    list is freed with ``curl_slist_free_all``; ``head`` must not be used
    after this call.
    """

    def _nodes(first):
        # Walk the linked list until the NULL terminator (falsy in cffi).
        current = first
        while current:
            yield current
            current = current.next

    items = [ffi.string(node.data) for node in _nodes(head)]
    lib.curl_slist_free_all(head)
    return items
class Curl:
    """
    Wrapper for ``curl_easy_*`` functions of libcurl.
    """

    def __init__(self, cacert: str = "", debug: bool = False, handle=None) -> None:
        """
        Parameters:
            cacert: CA cert path to use, by default, certs from ``certifi`` are used.
            debug: whether to show curl debug messages.
            handle: a curl handle instance from ``curl_easy_init``.
        """
        self._curl = handle if handle else lib.curl_easy_init()
        # Heads of the curl_slist lists built by ``setopt``; NULL means empty.
        self._headers = ffi.NULL
        self._proxy_headers = ffi.NULL
        self._resolve = ffi.NULL

        self._cacert = cacert or DEFAULT_CACERT
        self._is_cert_set = False

        # References to objects handed to libcurl, kept here so they are not
        # garbage-collected while a transfer may still use them.
        self._write_handle: Any = None
        self._header_handle: Any = None
        self._debug_handle: Any = None
        self._body_handle: Any = None

        # TODO: use CURL_ERROR_SIZE
        self._error_buffer = ffi.new("char[]", 256)
        self._debug = debug

        self._set_error_buffer()

    def _set_error_buffer(self) -> None:
        """Install the error buffer on the handle and re-enable debug if requested.

        A failure to install the buffer only emits a ``CurlCffiWarning``.
        """
        ret = lib._curl_easy_setopt(self._curl, CurlOpt.ERRORBUFFER, self._error_buffer)
        if ret != 0:
            warnings.warn("Failed to set error buffer", CurlCffiWarning, stacklevel=2)
        if self._debug:
            self.debug()

    def debug(self) -> None:
        """Set debug to True"""
        self.setopt(CurlOpt.VERBOSE, 1)
        # ``True`` selects ``debug_function_default`` inside ``setopt``.
        self.setopt(CurlOpt.DEBUGFUNCTION, True)

    def __del__(self) -> None:
        self.close()

    def _check_error(self, errcode: int, *args: Any) -> None:
        """Raise ``CurlError`` if ``errcode`` is non-zero.

        ``args`` describe the attempted action and end up in the message.
        """
        error = self._get_error(errcode, *args)
        if error is not None:
            raise error

    def _get_error(self, errcode: int, *args: Any) -> Optional[CurlError]:
        """Build (but do not raise) a ``CurlError`` for a non-zero ``errcode``.

        Returns:
            ``None`` when ``errcode`` is 0, otherwise a ``CurlError`` whose
            message contains the action, the code and the error buffer text.
        """
        if errcode == 0:
            return None
        errmsg = ffi.string(self._error_buffer).decode(errors="backslashreplace")
        action = " ".join([str(a) for a in args])
        return CurlError(
            f"Failed to {action}, curl: ({errcode}) {errmsg}. "
            "See https://curl.se/libcurl/c/libcurl-errors.html first for more "
            "details.",
            code=cast(CurlECode, errcode),
        )

    def setopt(self, option: CurlOpt, value: Any) -> int:
        """Wrapper for ``curl_easy_setopt``.

        Args:
            option: option to set, using constants from CurlOpt enum
            value: value to set, strings will be handled automatically

        Returns:
            0 if no error, see ``CurlECode``.

        Raises:
            NotImplementedError: if the option's type is not supported.
            CurlError: if libcurl rejects the option.
        """
        input_option = {
            # this should be int in curl, but cffi requires pointer for void*
            # it will be convert back in the glue c code.
            0: "long*",
            10000: "char*",
            20000: "void*",
            30000: "int64_t*",  # offset type
            40000: "void*",  # blob type
        }
        # The option number's 10000-range determines the expected C type.
        value_type = input_option.get((option // 10000) * 10000)

        # Convert value
        if value_type in ("long*", "int64_t*"):
            c_value = ffi.new(value_type, value)
        elif option == CurlOpt.WRITEDATA:
            # A file-like object: route the body through ``buffer_callback``.
            c_value = ffi.new_handle(value)
            self._write_handle = c_value
            lib._curl_easy_setopt(
                self._curl, CurlOpt.WRITEFUNCTION, lib.buffer_callback
            )
        elif option == CurlOpt.HEADERDATA:
            c_value = ffi.new_handle(value)
            self._header_handle = c_value
            lib._curl_easy_setopt(
                self._curl, CurlOpt.HEADERFUNCTION, lib.buffer_callback
            )
        elif option == CurlOpt.WRITEFUNCTION:
            # A Python callable: install the C trampoline as the function and
            # pass the callable's handle as the matching *DATA option below.
            c_value = ffi.new_handle(value)
            self._write_handle = c_value
            lib._curl_easy_setopt(self._curl, CurlOpt.WRITEFUNCTION, lib.write_callback)
            option = CurlOpt.WRITEDATA
        elif option == CurlOpt.HEADERFUNCTION:
            c_value = ffi.new_handle(value)
            self._header_handle = c_value
            lib._curl_easy_setopt(
                self._curl, CurlOpt.HEADERFUNCTION, lib.write_callback
            )
            option = CurlOpt.HEADERDATA
        elif option == CurlOpt.DEBUGFUNCTION:
            if value is True:
                value = debug_function_default
            c_value = ffi.new_handle(value)
            self._debug_handle = c_value
            lib._curl_easy_setopt(self._curl, CurlOpt.DEBUGFUNCTION, lib.debug_function)
            option = CurlOpt.DEBUGDATA
        elif value_type == "char*":
            c_value = value.encode() if isinstance(value, str) else value
            # Must keep a reference, otherwise may be GCed.
            if option == CurlOpt.POSTFIELDS:
                self._body_handle = c_value
        else:
            raise NotImplementedError(f"Option unsupported: {option}")

        # List-valued options are appended to the slist owned by this object.
        if option == CurlOpt.HTTPHEADER:
            for header in value:
                self._headers = lib.curl_slist_append(self._headers, header)
            ret = lib._curl_easy_setopt(self._curl, option, self._headers)
        elif option == CurlOpt.PROXYHEADER:
            for proxy_header in value:
                self._proxy_headers = lib.curl_slist_append(
                    self._proxy_headers, proxy_header
                )
            ret = lib._curl_easy_setopt(self._curl, option, self._proxy_headers)
        elif option == CurlOpt.RESOLVE:
            for resolve in value:
                if isinstance(resolve, str):
                    resolve = resolve.encode()
                self._resolve = lib.curl_slist_append(self._resolve, resolve)
            ret = lib._curl_easy_setopt(self._curl, option, self._resolve)
        else:
            ret = lib._curl_easy_setopt(self._curl, option, c_value)
        self._check_error(ret, "setopt", option, value)

        # Remember an explicit CA store so ``perform`` does not override it.
        if option == CurlOpt.CAINFO:
            self._is_cert_set = True

        return ret

    def getinfo(self, option: CurlInfo) -> Union[bytes, int, float, list]:
        """Wrapper for ``curl_easy_getinfo``. Gets information in response after
        curl.perform.

        Parameters:
            option: option to get info of, using constants from ``CurlInfo`` enum

        Returns:
            value retrieved from last perform; ``b""`` when libcurl reports a
            NULL pointer for the value.

        Raises:
            CurlError: if ``curl_easy_getinfo`` fails.
        """
        # The bits selected by 0xF00000 encode the C type of the result.
        ret_option = {
            0x100000: "char**",
            0x200000: "long*",
            0x300000: "double*",
            0x400000: "struct curl_slist **",
            0x500000: "long*",
            0x600000: "int64_t*",
        }
        ret_cast_option = {
            0x100000: ffi.string,
            0x200000: int,
            0x300000: float,
            0x500000: int,
            0x600000: int,
        }
        info_type = option & 0xF00000
        c_value = ffi.new(ret_option[info_type])
        ret = lib.curl_easy_getinfo(self._curl, option, c_value)
        self._check_error(ret, "getinfo", option)
        # cookielist and ssl_engines starts with 0x400000, see also: const.py
        if info_type == 0x400000:
            return slist_to_list(c_value[0])
        if c_value[0] == ffi.NULL:
            return b""
        return ret_cast_option[info_type](c_value[0])

    def version(self) -> bytes:
        """Get the underlying libcurl version."""
        return ffi.string(lib.curl_version())

    def impersonate(self, target: str, default_headers: bool = True) -> int:
        """Set the browser type to impersonate.

        Parameters:
            target: browser to impersonate.
            default_headers: whether to add default headers, like User-Agent.

        Returns:
            0 if no error.
        """
        return lib.curl_easy_impersonate(
            self._curl, target.encode(), int(default_headers)
        )

    def _ensure_cacert(self) -> None:
        """Apply the default CA store unless ``CAINFO`` was set explicitly."""
        if not self._is_cert_set:
            ret = self.setopt(CurlOpt.CAINFO, self._cacert)
            self._check_error(ret, "set cacert")
            ret = self.setopt(CurlOpt.PROXY_CAINFO, self._cacert)
            self._check_error(ret, "set proxy cacert")

    def perform(self, clear_headers: bool = True) -> None:
        """Wrapper for ``curl_easy_perform``, performs a curl request.

        Parameters:
            clear_headers: clear header slist used in this perform

        Raises:
            CurlError: if the perform was not successful.
        """
        # make sure we set a cacert store
        self._ensure_cacert()

        # here we go
        ret = lib.curl_easy_perform(self._curl)

        try:
            self._check_error(ret, "perform")
        finally:
            # cleaning, also when the transfer failed
            self.clean_after_perform(clear_headers)

    def upkeep(self) -> int:
        """Wrapper for ``curl_easy_upkeep``; returns its result code."""
        return lib.curl_easy_upkeep(self._curl)

    def clean_after_perform(self, clear_headers: bool = True) -> None:
        """Clean up handles and buffers after ``perform``, called at the end of
        ``perform``.

        Parameters:
            clear_headers: also free the header and proxy-header slists.
        """
        self._write_handle = None
        self._header_handle = None
        self._debug_handle = None
        self._body_handle = None
        if clear_headers:
            if self._headers != ffi.NULL:
                lib.curl_slist_free_all(self._headers)
            self._headers = ffi.NULL

            if self._proxy_headers != ffi.NULL:
                lib.curl_slist_free_all(self._proxy_headers)
            self._proxy_headers = ffi.NULL

    def duphandle(self) -> Curl:
        """Wrapper for ``curl_easy_duphandle``.

        This is not a full copy of entire curl object in python. For example, headers
        handle is not copied, you have to set them again."""
        new_handle = lib.curl_easy_duphandle(self._curl)
        c = Curl(cacert=self._cacert, debug=self._debug, handle=new_handle)
        return c

    def reset(self) -> None:
        """Reset all curl options, wrapper for ``curl_easy_reset``."""
        self._is_cert_set = False
        if self._curl is not None:
            lib.curl_easy_reset(self._curl)
            # The reset drops the error buffer (and debug) options as well.
            self._set_error_buffer()
        self._resolve = ffi.NULL

    def parse_cookie_headers(self, headers: list[bytes]) -> SimpleCookie:
        """Extract ``cookies.SimpleCookie`` from header lines.

        Parameters:
            headers: list of headers in bytes.

        Returns:
            A parsed cookies.SimpleCookie instance.
        """
        cookie: SimpleCookie = SimpleCookie()
        for header in headers:
            if header.lower().startswith(b"set-cookie: "):
                cookie.load(header[12:].decode())  # len("set-cookie: ") == 12
        return cookie

    @staticmethod
    def get_reason_phrase(status_line: bytes) -> bytes:
        """Extract reason phrase, like ``OK``, ``Not Found`` from response status
        line. Returns ``b""`` when the line does not look like a status line."""
        m = REASON_PHRASE_RE.match(status_line)
        return m.group(1) if m else b""

    @staticmethod
    def parse_status_line(status_line: bytes) -> tuple[CurlHttpVersion, int, bytes]:
        """Parse status line.

        Returns:
            http_version, status_code, and reason phrase. A line that does not
            match yields ``(CurlHttpVersion.V1_0, 0, b"")``; a matched but
            unrecognised version yields ``CurlHttpVersion.NONE``.
        """
        m = STATUS_LINE_RE.match(status_line)
        if not m:
            return CurlHttpVersion.V1_0, 0, b""
        # The pattern is a bytes pattern, so the captured version is ``bytes``
        # and has to be looked up with bytes keys (a ``str`` never equals it).
        known_versions = {
            b"2.0": CurlHttpVersion.V2_0,
            b"1.1": CurlHttpVersion.V1_1,
            b"1.0": CurlHttpVersion.V1_0,
        }
        http_version = known_versions.get(m.group(1), CurlHttpVersion.NONE)
        status_code = int(m.group(2))
        reason = m.group(3)

        return http_version, status_code, reason

    def close(self) -> None:
        """Close and cleanup curl handle, wrapper for ``curl_easy_cleanup``."""
        if self._curl:
            lib.curl_easy_cleanup(self._curl)
            self._curl = None
        ffi.release(self._error_buffer)
        self._resolve = ffi.NULL

    def ws_recv(self, n: int = 1024) -> tuple[bytes, CurlWsFrame]:
        """Receive a frame from a websocket connection.

        Args:
            n: maximum data to receive.

        Returns:
            a tuple of frame content and curl frame meta struct.

        Raises:
            CurlError: if failed.
        """
        buffer = ffi.new("char[]", n)
        n_recv = ffi.new("size_t *")
        p_frame = ffi.new("struct curl_ws_frame **")

        ret = lib.curl_ws_recv(self._curl, buffer, n, n_recv, p_frame)
        self._check_error(ret, "WS_RECV")

        # Frame meta explained: https://curl.se/libcurl/c/curl_ws_meta.html
        frame = p_frame[0]

        # Only the first ``n_recv`` bytes of the buffer hold received data.
        return ffi.buffer(buffer)[: n_recv[0]], frame

    def ws_send(self, payload: bytes, flags: CurlWsFlag = CurlWsFlag.BINARY) -> int:
        """Send data to a websocket connection.

        Args:
            payload: content to send.
            flags: websocket flag to set for the frame, default: binary.

        Returns:
            the number of bytes sent, as reported by ``curl_ws_send``.

        Raises:
            CurlError: if failed.
        """
        n_sent = ffi.new("size_t *")
        buffer = ffi.from_buffer(payload)
        ret = lib.curl_ws_send(self._curl, buffer, len(payload), n_sent, 0, flags)
        self._check_error(ret, "WS_SEND")
        return n_sent[0]

    def ws_close(self, code: int = 1000, message: bytes = b"") -> int:
        """Close a websocket connection. Shorthand for :meth:`ws_send`
        with close code and message. Note that to completely close the connection,
        you must close the curl handle after this call with :meth:`close`.

        Args:
            code: close code.
            message: close message.

        Returns:
            the number of bytes sent, see :meth:`ws_send`.

        Raises:
            CurlError: if failed.
        """
        # Close payload: 2-byte big-endian status code followed by the reason.
        payload = struct.pack("!H", code) + message
        # It must go out as a CLOSE frame, not as the default binary data frame.
        return self.ws_send(payload, flags=CurlWsFlag.CLOSE)
class CurlMime:
    """Wrapper for the ``curl_mime_`` API."""

    def __init__(self, curl: Optional[Curl] = None):
        """
        Args:
            curl: Curl instance to use.
        """
        self._curl = curl if curl else Curl()
        self._form = lib.curl_mime_init(self._curl._curl)

    def addpart(
        self,
        name: str,
        *,
        content_type: Optional[str] = None,
        filename: Optional[str] = None,
        local_path: Optional[Union[str, bytes, Path]] = None,
        data: Optional[bytes] = None,
    ) -> None:
        """Add a mime part for a multipart html form.

        Note: You can only use either local_path or data, not both.

        Args:
            name: name of the field.
            content_type: content_type for the field. for example: ``image/png``.
            filename: filename for the server.
            local_path: file to upload on local disk.
            data: file content to upload.

        Raises:
            CurlError: if both ``local_path`` and ``data`` are given, or if a
                ``curl_mime_*`` call fails.
            FileNotFoundError: if ``local_path`` does not exist.
        """
        # Validate and normalise the arguments first, so that a rejected call
        # does not leave a half-configured part behind in the form.
        if local_path and data:
            raise CurlError("Can not use local_path and data at the same time.")

        local_path_str: Optional[str] = None
        if local_path is not None:
            if isinstance(local_path, Path):
                local_path_str = str(local_path)
            elif isinstance(local_path, bytes):
                local_path_str = local_path.decode()
            else:
                local_path_str = local_path
            if not Path(local_path_str).exists():
                raise FileNotFoundError(f"File not found at {local_path_str}")

        if data is not None and not isinstance(data, bytes):
            data = str(data).encode()

        part = lib.curl_mime_addpart(self._form)
        if part == ffi.NULL:
            raise CurlError("Add field failed.")

        ret = lib.curl_mime_name(part, name.encode())
        if ret != 0:
            raise CurlError("Add field failed.")

        # mime type
        if content_type is not None:
            ret = lib.curl_mime_type(part, content_type.encode())
            if ret != 0:
                raise CurlError("Add field failed.")

        # remote file name
        if filename is not None:
            ret = lib.curl_mime_filename(part, filename.encode())
            if ret != 0:
                raise CurlError("Add field failed.")

        # this is a filename
        if local_path_str is not None:
            ret = lib.curl_mime_filedata(part, local_path_str.encode())
            if ret != 0:
                raise CurlError("Add field failed.")

        if data is not None:
            ret = lib.curl_mime_data(part, data, len(data))
            # Checked like every other curl_mime_* call above.
            if ret != 0:
                raise CurlError("Add field failed.")

    @classmethod
    def from_list(cls, files: list[dict]):
        """Create a multipart instance from a list of dict, for keys, see ``addpart``"""
        form = cls()
        for file in files:
            form.addpart(**file)
        return form

    def attach(self, curl: Optional[Curl] = None) -> None:
        """Attach the mime instance to a curl instance.

        Args:
            curl: Curl instance to attach to, defaults to the one given at
                construction time.
        """
        c = curl if curl else self._curl
        c.setopt(CurlOpt.MIMEPOST, self._form)

    def close(self) -> None:
        """Close the mime instance and underlying files. This method must be called
        after ``perform`` or ``request``."""
        # ``_form`` is missing when ``__init__`` failed early, and NULL after a
        # previous ``close``; in both cases there is nothing left to free.
        form = getattr(self, "_form", ffi.NULL)
        if form != ffi.NULL:
            lib.curl_mime_free(form)
        self._form = ffi.NULL

    def __del__(self) -> None:
        self.close()