diff --git a/DrissionPage/chromium_page.py b/DrissionPage/chromium_page.py index 3f0bee6..8ab134e 100644 --- a/DrissionPage/chromium_page.py +++ b/DrissionPage/chromium_page.py @@ -6,7 +6,7 @@ from time import perf_counter, sleep from typing import Union, Tuple, List, Any from pychrome import Tab -from requests import get as requests_get +from requests import Session from json import loads from requests.cookies import RequestsCookieJar @@ -45,6 +45,10 @@ class ChromiumPage(BasePage): self._is_loading = False self._root_id = None self.timeouts = Timeout(self) + self._ss = Session() + self._ss.keep_alive = False + self._alert = Alert() + self._first_run = True # 接管或启动浏览器 if addr_tab_opts is None or isinstance(addr_tab_opts, DriverOptions): @@ -52,20 +56,25 @@ class ChromiumPage(BasePage): self.address = self.options.debugger_address self.process = connect_chrome(self.options)[1] self._set_options() - json = loads(requests_get(f'http://{self.address}/json').text) + json = loads(self._ss.get(f'http://{self.address}/json').text) tab_id = [i['id'] for i in json if i['type'] == 'page'][0] self._init_page(tab_id) + self._get_document() + self._first_run = False # 接收浏览器地址和端口 elif isinstance(addr_tab_opts, str): self.address = addr_tab_opts self.options = DriverOptions(read_file=False) + self.options.debugger_address = addr_tab_opts self.process = connect_chrome(self.options)[1] self._set_options() if not tab_id: - json = loads(requests_get(f'http://{self.address}/json').text) + json = loads(self._ss.get(f'http://{self.address}/json').text) tab_id = [i['id'] for i in json if i['type'] == 'page'][0] self._init_page(tab_id) + self._get_document() + self._first_run = False # 接收传递过来的Tab,浏览器 elif isinstance(addr_tab_opts, Tab): @@ -75,11 +84,26 @@ class ChromiumPage(BasePage): self.options = DriverOptions(read_file=False) self._set_options() self._init_page(None) + self._get_document() + self._first_run = False else: raise TypeError('只能接收Tab或DriverOptions类型参数。') - self._alert = Alert() + def _init_page(self, tab_id: str = None) -> None: + """新建页面、页面刷新、切换标签页后要进行的cdp参数初始化 + :param tab_id: 要跳转到的标签页id + :return: None + """ + self._is_loading = True + if tab_id: + self._driver = Tab(id=tab_id, type='page', + webSocketDebuggerUrl=f'ws://{self.address}/devtools/page/{tab_id}') + + self._driver.start() + self._driver.DOM.enable() + self._driver.Page.enable() + self._driver.Page.javascriptDialogOpening = self._on_alert_open self._driver.Page.javascriptDialogClosed = self._on_alert_close @@ -87,38 +111,21 @@ class ChromiumPage(BasePage): self._driver.Page.loadEventFired = self._onLoadEventFired self._driver.DOM.documentUpdated = self._onDocumentUpdated - def _init_page(self, tab_id: str = None) -> None: - """新建页面、页面刷新、切换标签页后要进行的cdp参数初始化 - :param tab_id: 要跳转到的标签页id - :return: None - """ - print('init page') - self._is_loading = True - if tab_id: - print('new tab') - self._driver = Tab(id=tab_id, type='page', - webSocketDebuggerUrl=f'ws://{self.address}/devtools/page/{tab_id}') - - self._driver.start() - self._driver.DOM.enable() - self._driver.Page.enable() + def _get_document(self) -> None: + """刷新cdp使用的document数据""" + print('get doc') + self._wait_loading() root_id = self._driver.DOM.getDocument()['root']['nodeId'] self._root_id = self._driver.DOM.resolveNode(nodeId=root_id)['object']['objectId'] - self._wait_loading() self._is_loading = False - def _set_options(self) -> None: - self.set_timeouts(page_load=self.options.timeouts['pageLoad'] / 1000, - script=self.options.timeouts['script'] / 1000, - implicit=self.options.timeouts['implicit'] / 1000 if self.timeout is None else self.timeout) - self._page_load_strategy = self.options.page_load_strategy - def _wait_loading(self, timeout: float = None) -> bool: """等待页面加载完成 :param timeout: 超时时间 :return: 是否成功,超时返回False """ - timeout = timeout if timeout is not None else self.timeouts.page_load + # timeout = timeout if timeout is not None else self.timeouts.page_load + timeout = 100 end_time = perf_counter() + timeout while perf_counter() < end_time: @@ -136,9 +143,9 @@ class ChromiumPage(BasePage): def _onLoadEventFired(self, **kwargs): """在页面刷新、变化后重新读取页面内容""" - # self._is_loading = True print('load complete') - self._init_page(self._driver.id) + if self._first_run is False: + self._get_document() def _onFrameNavigated(self, **kwargs): print('nav') @@ -147,7 +154,14 @@ class ChromiumPage(BasePage): self._is_loading = True def _onDocumentUpdated(self, **kwargs): - print('doc') + # print('doc') + pass + + def _set_options(self) -> None: + self.set_timeouts(page_load=self.options.timeouts['pageLoad'] / 1000, + script=self.options.timeouts['script'] / 1000, + implicit=self.options.timeouts['implicit'] / 1000 if self.timeout is None else self.timeout) + self._page_load_strategy = self.options.page_load_strategy def __call__(self, loc_or_str: Union[Tuple[str, str], str, 'ChromiumElement'], timeout: float = None) -> Union['ChromiumElement', None]: @@ -163,7 +177,7 @@ class ChromiumPage(BasePage): def driver(self) -> Tab: """返回用于控制浏览器的Tab对象""" while self._is_loading: - print('loading') + # print('loading') sleep(.1) return self._driver @@ -171,7 +185,7 @@ class ChromiumPage(BasePage): def url(self) -> str: """返回当前页面url""" tab_id = self.driver.id # 用于WebPage时激活浏览器 - json = loads(requests_get(f'http://{self.address}/json').text) + json = loads(self._ss.get(f'http://{self.address}/json').text) return [i['url'] for i in json if i['id'] == tab_id][0] @property @@ -188,25 +202,20 @@ class ChromiumPage(BasePage): @property def tabs_count(self) -> int: """返回标签页数量""" - return len(self.tab_ids) + return len(self.tabs) @property - def tab_ids(self) -> list: + def tabs(self) -> list: """返回所有标签页id""" d = self.driver - json = loads(requests_get(f'http://{self.address}/json').text) + json = loads(self._ss.get(f'http://{self.address}/json').text) return [i['id'] for i in json if i['type'] == 'page'] @property - def current_tab_id(self) -> str: + def tab_id(self) -> str: """返回当前标签页id""" return self.driver.id - @property - def current_tab_index(self) -> int: - """返回当前标签页序号""" - return self.tab_ids.index(self.current_tab_id) - @property def ready_state(self) -> str: """返回当前页面加载状态,'loading' 'interactive' 'complete'""" @@ -364,8 +373,8 @@ class ChromiumPage(BasePage): """ return self._ele(loc_or_ele, timeout=timeout, single=False) - def s_ele(self, loc_or_ele: Union[Tuple[str, str], str, ChromiumElement] = None) -> Union[ - SessionElement, str, None]: + def s_ele(self, loc_or_ele: Union[Tuple[str, str], str, ChromiumElement] = None) \ + -> Union[SessionElement, str, None]: """查找第一个符合条件的元素以SessionElement形式返回,处理复杂页面时效率很高 \n :param loc_or_ele: 元素的定位信息,可以是loc元组,或查询字符串 :return: SessionElement对象或属性、文本 @@ -569,6 +578,10 @@ class ChromiumPage(BasePage): js = f'localStorage.removeItem("{item}");' if item is False else f'localStorage.setItem("{item}","{value}");' return self.run_script(js, as_expr=True) + def to_front(self) -> None: + """激活当前标签页使其处于最前面""" + self._ss.get(f'http://{self.address}/json/activate/{self.tab_id}') + def new_tab(self, url: str = None) -> None: """新建并定位到一个标签页,该标签页在最后面 \n :param url: 新标签页跳转到的网址 @@ -576,70 +589,62 @@ class ChromiumPage(BasePage): """ d = self.driver url = f'?{url}' if url else '' - requests_get(f'http://{self.address}/json/new{url}') + self._ss.get(f'http://{self.address}/json/new{url}') + self.to_tab() - def to_tab(self, num_or_id: Union[int, str] = 0, activate: bool = True) -> None: - """跳转到标签页 \n - 注意:当程序使用的是接管的浏览器,获取到的 id 顺序和视觉效果不一致 \n - :param num_or_id: 标签页序号或id字符串,序号第一个为0,最后为-1 + def to_tab(self, tab_id: str = None, activate: bool = True) -> None: + """跳转到标签页 \n + :param tab_id: 标签页id字符串,默认跳转到活动状态的 :param activate: 切换后是否变为活动状态 :return: None """ - try: - tab = int(num_or_id) - except (ValueError, TypeError): - tab = num_or_id - - tab = self.tab_ids[tab] if isinstance(tab, int) else tab + tabs = self.tabs + if not tab_id: + tab_id = tabs[0] + if tab_id == self.tab_id or tab_id not in tabs: + return if activate: - requests_get(f'http://{self.address}/json/activate/{tab}') + self._ss.get(f'http://{self.address}/json/activate/{tab_id}') - self.driver.stop() - self._init_page(tab) + self._driver.stop() + self._init_page(tab_id) + state = self._driver.Runtime.evaluate(expression='document.readyState;')['result']['value'] + if state == 'complete': + self._get_document() - def to_front(self) -> None: - """激活当前标签页使其处于最前面""" - d = self.driver - requests_get(f'http://{self.address}/json/activate/{self.current_tab_id}') - - def close_tabs(self, num_or_ids: Union[int, str, list, tuple, set] = None, others: bool = False) -> None: + def close_tabs(self, tab_ids: Union[str, List[str], Tuple[str]] = None, others: bool = False) -> None: """关闭传入的标签页,默认关闭当前页。可传入多个 \n - 注意:当程序使用的是接管的浏览器,获取到的 id 顺序和视觉效果不一致,不能按序号关闭。 \n - :param num_or_ids:要关闭的标签页序号或id,可传入id和序号组成的列表或元组,为None时关闭当前页 + :param tab_ids: 要关闭的标签页id,可传入id组成的列表或元组,为None时关闭当前页 :param others: 是否关闭指定标签页之外的 :return: None """ + all_tabs = set(self.tabs) + tabs = set(tab_ids) if tab_ids else {self.tab_id} if others: - all_tabs = self.tab_ids - reserve_tabs = {self.current_tab_id} if num_or_ids is None else _get_tabs(all_tabs, num_or_ids) - tabs = set(all_tabs) - reserve_tabs - else: - tabs = (self.current_tab_id,) if num_or_ids is None else _get_tabs(self.tab_ids, num_or_ids) + tabs = all_tabs - tabs - tabs_len = len(tabs) - all_len = len(self.tab_ids) - if tabs_len > all_len: - raise ValueError('要关闭的页面数量不能大于总数量。') + end_len = len(all_tabs) - len(tabs) + if end_len <= 0: + self.quit() + return - is_alive = True - if tabs_len == all_len: - self.driver.stop() - is_alive = False + if self.tab_id in tabs: + self._driver.stop() for tab in tabs: - requests_get(f'http://{self.address}/json/close/{tab}') + self._ss.get(f'http://{self.address}/json/close/{tab}') + while len(self.tabs) != end_len: + pass - if is_alive: - self.to_tab(0) + self.to_tab() - def close_other_tabs(self, num_or_ids: Union[int, str, list, tuple] = None) -> None: + def close_other_tabs(self, tab_ids: Union[str, List[str], Tuple[str]] = None) -> None: """关闭传入的标签页以外标签页,默认保留当前页。可传入多个 \n - 注意:当程序使用的是接管的浏览器,获取到的 id 顺序和视觉效果不一致,不能按序号关闭。 \n - :param num_or_ids: 要保留的标签页序号或id,可传入id和序号组成的列表或元组,为None时保存当前页 + :param tab_ids: 要保留的标签页id,可传入id组成的列表或元组,为None时保存当前页 :return: None """ - self.close_tabs(num_or_ids, True) + self.close_tabs(tab_ids, True) def clear_cache(self, session_storage: bool = True, @@ -692,11 +697,10 @@ class ChromiumPage(BasePage): """显示浏览器窗口,只在Windows系统可用""" _show_or_hide_browser(self, hide=False) - def check_page(self) -> Union[bool, None]: - """检查页面是否符合预期 \n - 由子类自行实现各页面的判定规则 - """ - return None + def quit(self) -> None: + """关闭浏览器""" + self._driver.Browser.close() + self._driver.stop() def _d_connect(self, to_url: str, @@ -713,22 +717,20 @@ class ChromiumPage(BasePage): :return: 是否成功,返回None表示不确定 """ err = None - is_ok = False timeout = timeout if timeout is not None else self.timeouts.page_load for _ in range(times + 1): result = self.driver.Page.navigate(url=to_url) - is_timeout = not self._wait_loading(timeout) if is_timeout: - raise TimeoutError('页面连接超时。') + err = TimeoutError('页面连接超时。') + continue if 'errorText' in result: - raise ConnectionError(result['errorText']) + err = ConnectionError(result['errorText']) + continue - is_ok = self.check_page() - - if is_ok is not False: + if not err: break if _ < times: @@ -736,10 +738,10 @@ class ChromiumPage(BasePage): if show_errmsg: print(f'重试 {to_url}') - if is_ok is False and show_errmsg: + if err and show_errmsg: raise err if err is not None else ConnectionError('连接异常。') - return is_ok + return False if err else True def _on_alert_close(self, **kwargs): """alert关闭时触发的方法""" @@ -845,20 +847,6 @@ class WindowSizeSetter(object): self.driver.Browser.setWindowBounds(windowId=self.window_id, bounds=bounds) -def _get_tabs(ids: list, num_or_ids: Union[int, str, list, tuple, set]) -> set: - """返回指定标签页id组成的set - :param ids: 所有页面id组成的列表 - :param num_or_ids: 指定的标签页,可以是多个 - :return: 指定标签页组成的set - """ - if isinstance(num_or_ids, (int, str)): - num_or_ids = (num_or_ids,) - elif not isinstance(num_or_ids, (list, tuple, set)): - raise TypeError('num_or_id参数只能是int、str、list、set 或 tuple类型。') - - return set(i if isinstance(i, str) else ids[i] for i in num_or_ids) - - def _show_or_hide_browser(page: ChromiumPage, hide: bool = True) -> None: """执行显示或隐藏浏览器窗口 :param page: ChromePage对象 diff --git a/DrissionPage/config.py b/DrissionPage/config.py index 3de5abe..4276108 100644 --- a/DrissionPage/config.py +++ b/DrissionPage/config.py @@ -464,6 +464,7 @@ class DriverOptions(Options): self._driver_path = None self._user_data_path = None self.ini_path = None + self.timeouts = {'implicit': 10, 'pageLoad': 30, 'script': 30} if read_file: self.ini_path = ini_path or str(Path(__file__).parent / 'configs.ini') @@ -647,14 +648,14 @@ class DriverOptions(Options): :param script: 脚本运行超时时间 :return: 当前对象 """ - timeouts = self._caps.get('timeouts', {'implicit': 10, 'pageLoad': 3000, 'script': 3000}) + # timeouts = self._caps.get('timeouts', {'implicit': 10, 'pageLoad': 3000, 'script': 3000}) if implicit is not None: - timeouts['implicit'] = implicit + self.timeouts['implicit'] = implicit if pageLoad is not None: - timeouts['pageLoad'] = pageLoad * 1000 + self.timeouts['pageLoad'] = pageLoad * 1000 if script is not None: - timeouts['script'] = script * 1000 - self.timeouts = timeouts + self.timeouts['script'] = script * 1000 + # self.timeouts = timeouts return self