# -*- coding:utf-8 -*- """ @Author : g1879 @Contact : g1879@qq.com """ from json import loads from time import perf_counter, sleep from requests import Session from .base import BasePage from .chromium_driver import ChromiumDriver from .chromium_element import ChromiumElementWaiter, ChromiumScroll, ChromiumElement, run_js, make_chromium_ele from .functions.locator import get_loc from .functions.web import offset_scroll, cookies_to_tuple from .session_element import make_session_ele class ChromiumBase(BasePage): """标签页、frame、页面基类""" def __init__(self, address, tab_id=None, timeout=None): """ :param address: 浏览器 ip:port :param tab_id: 要控制的标签页id,不指定默认为激活的 :param timeout: 超时时间 """ self._is_loading = None self._root_id = None self._debug = False self._debug_recorder = None self._timeouts = None self._page_load_strategy = None self._connect_browser(address, tab_id) timeout = timeout if timeout is not None else self.timeouts.implicit super().__init__(timeout) def _connect_browser(self, addr_driver_opts=None, tab_id=None): """连接浏览器,在第一次时运行 :param addr_driver_opts: 浏览器地址、ChromiumDriver对象或DriverOptions对象 :param tab_id: 要控制的标签页id,不指定默认为激活的 :return: None """ self._chromium_init() self.address = addr_driver_opts if not tab_id: json = self._control_session.get(f'http://{self.address}/json').json() tab_id = [i['id'] for i in json if i['type'] == 'page'][0] self._init_page(tab_id) self._set_options() self._get_document() self._first_run = False def _chromium_init(self): self._control_session = Session() self._control_session.keep_alive = False self._first_run = True self._is_reading = False def _set_options(self): """用于设置浏览器运行参数""" self._timeouts = Timeout(self) self._page_load_strategy = 'normal' def _init_page(self, tab_id=None): """新建页面、页面刷新、切换标签页后要进行的cdp参数初始化 :param tab_id: 要跳转到的标签页id :return: None """ self._is_loading = True if tab_id: self._tab_obj = ChromiumDriver(tab_id=tab_id, tab_type='page', address=self.address) self._tab_obj.start() self._tab_obj.DOM.enable() self._tab_obj.Page.enable() self._tab_obj.Page.frameStoppedLoading = self._onFrameStoppedLoading self._tab_obj.Page.frameStartedLoading = self._onFrameStartedLoading self._tab_obj.DOM.documentUpdated = self._onDocumentUpdated self._tab_obj.Page.loadEventFired = self._onLoadEventFired self._tab_obj.Page.frameNavigated = self._onFrameNavigated def _get_document(self): """刷新cdp使用的document数据""" if not self._is_reading: self._is_reading = True if self._debug: print('获取document') if self._debug_recorder: self._debug_recorder.add_data((perf_counter(), '获取document', '开始')) self._wait_loaded() while True: try: root_id = self._tab_obj.DOM.getDocument()['root']['nodeId'] if self._debug_recorder: self._debug_recorder.add_data((perf_counter(), '信息', f'root_id:{root_id}')) self._root_id = self._tab_obj.DOM.resolveNode(nodeId=root_id)['object']['objectId'] break except Exception: if self._debug_recorder: self._debug_recorder.add_data((perf_counter(), 'err', '读取root_id出错')) if self._debug: print('获取document结束') if self._debug_recorder: self._debug_recorder.add_data((perf_counter(), '获取document', '结束')) self._is_loading = False self._is_reading = False def _wait_loaded(self, timeout=None): """等待页面加载完成 :param timeout: 超时时间 :return: 是否成功,超时返回False """ timeout = timeout if timeout is not None else self.timeouts.page_load end_time = perf_counter() + timeout while perf_counter() < end_time: state = self.ready_state if self._debug_recorder: self._debug_recorder.add_data((perf_counter(), 'waiting', state)) if state == 'complete': return True elif self.page_load_strategy == 'eager' and state in ('interactive', 'complete'): self.stop_loading() return True elif self.page_load_strategy == 'none': self.stop_loading() return True sleep(.1) self.stop_loading() return False def _onFrameStartedLoading(self, **kwargs): """页面开始加载时触发""" if kwargs['frameId'] == self.tab_id: self._is_loading = True if self._debug: print('页面开始加载 FrameStartedLoading') if self._debug_recorder: self._debug_recorder.add_data((perf_counter(), '加载流程', 'FrameStartedLoading')) def _onFrameStoppedLoading(self, **kwargs): """页面加载完成后触发""" if kwargs['frameId'] == self.tab_id and self._first_run is False and self._is_loading: if self._debug: print('页面停止加载 FrameStoppedLoading') if self._debug_recorder: self._debug_recorder.add_data((perf_counter(), '加载流程', 'FrameStoppedLoading')) self._get_document() def _onLoadEventFired(self, **kwargs): """在页面刷新、变化后重新读取页面内容""" if self._debug: print('loadEventFired') if self._debug_recorder: self._debug_recorder.add_data((perf_counter(), '加载流程', 'loadEventFired')) self._get_document() def _onDocumentUpdated(self, **kwargs): """页面跳转时触发""" if self._debug: print('documentUpdated') if self._debug_recorder: self._debug_recorder.add_data((perf_counter(), '加载流程', 'documentUpdated')) def _onFrameNavigated(self, **kwargs): """页面跳转时触发""" if kwargs['frame'].get('parentId', None) == self.tab_id and self._first_run is False and self._is_loading: self._is_loading = True if self._debug: print('navigated') if self._debug_recorder: self._debug_recorder.add_data((perf_counter(), '加载流程', 'navigated')) def __call__(self, loc_or_str, timeout=None): """在内部查找元素 例:ele = page('@id=ele_id') :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 :param timeout: 超时时间 :return: ChromiumElement对象 """ return self.ele(loc_or_str, timeout) @property def title(self): """返回当前页面title""" self._wait_loaded() return self._tab_obj.Target.getTargetInfo(targetId=self.tab_id)['targetInfo']['title'] @property def driver(self): """返回用于控制浏览器的ChromiumDriver对象""" return self._tab_obj @property def _driver(self): """返回用于控制浏览器的ChromiumDriver对象""" return self._tab_obj @property def _wait_driver(self): """返回用于控制浏览器的ChromiumDriver对象,会先等待页面加载完毕""" while self._is_loading: sleep(.1) return self._tab_obj @property def is_loading(self): """返回页面是否正在加载状态""" return self._is_loading @property def url(self): """返回当前页面url""" self._wait_loaded() return self._tab_obj.Target.getTargetInfo(targetId=self.tab_id)['targetInfo']['url'] @property def html(self): """返回当前页面html文本""" self._wait_loaded() return self._wait_driver.DOM.getOuterHTML(objectId=self._root_id)['outerHTML'] @property def json(self): """当返回内容是json格式时,返回对应的字典,非json格式时返回None""" try: return loads(self('t:pre', timeout=.5).text) except Exception: return None @property def tab_id(self): """返回当前标签页id""" return self.driver.id if self.driver.status == 'started' else '' @property def ready_state(self): """返回当前页面加载状态,'loading' 'interactive' 'complete'""" return self._tab_obj.Runtime.evaluate(expression='document.readyState;')['result']['value'] @property def size(self): """返回页面总长高,格式:(长, 高)""" w = self.run_js('document.body.scrollWidth;', as_expr=True) h = self.run_js('document.body.scrollHeight;', as_expr=True) return w, h @property def active_ele(self): """返回当前焦点所在元素""" return self.run_js('return document.activeElement;') @property def page_load_strategy(self): """返回页面加载策略,有3种:'none'、'normal'、'eager'""" return self._page_load_strategy @property def scroll(self): """返回用于滚动滚动条的对象""" if not hasattr(self, '_scroll'): self._scroll = ChromiumScroll(self) return self._scroll @property def timeouts(self): """返回timeouts设置""" return self._timeouts @property def set_page_load_strategy(self): """返回用于设置页面加载策略的对象""" return PageLoadStrategy(self) def set_timeouts(self, implicit=None, page_load=None, script=None): """设置超时时间,单位为秒 :param implicit: 查找元素超时时间 :param page_load: 页面加载超时时间 :param script: 脚本运行超时时间 :return: None """ if implicit is not None: self._timeouts.implicit = implicit if page_load is not None: self._timeouts.page_load = page_load if script is not None: self._timeouts.script = script def run_js(self, script, as_expr=False, *args): """运行javascript代码 :param script: js文本 :param as_expr: 是否作为表达式运行,为True时args无效 :param args: 参数,按顺序在js文本中对应argument[0]、argument[1]... :return: 运行的结果 """ self._to_d_mode() return run_js(self, script, as_expr, self.timeouts.script, args) def run_async_js(self, script, as_expr=False, *args): """以异步方式执行js代码 :param script: js文本 :param as_expr: 是否作为表达式运行,为True时args无效 :param args: 参数,按顺序在js文本中对应argument[0]、argument[1]... :return: None """ self._to_d_mode() from threading import Thread Thread(target=run_js, args=(self, script, as_expr, self.timeouts.script, args)).start() def get(self, url, show_errmsg=False, retry=None, interval=None, timeout=None): """访问url :param url: 目标url :param show_errmsg: 是否显示和抛出异常 :param retry: 重试次数 :param interval: 重试间隔(秒) :param timeout: 连接超时时间 :return: 目标url是否可用 """ retry, interval = self._before_connect(url, retry, interval) self._url_available = self._d_connect(self._url, times=retry, interval=interval, show_errmsg=show_errmsg, timeout=timeout) return self._url_available def wait_loading(self, timeout=None): """阻塞程序,等待页面进入加载状态 :param timeout: 超时时间 :return: 等待结束时是否进入加载状态 """ if timeout != 0: timeout = self.timeout if timeout in (None, True) else timeout end_time = perf_counter() + timeout while perf_counter() < end_time: if self.is_loading: return True sleep(.005) return False def get_cookies(self, as_dict=False): """获取cookies信息 :param as_dict: 为True时返回由{name: value}键值对组成的dict :return: cookies信息 """ cookies = self._wait_driver.Network.getCookies()['cookies'] if as_dict: return {cookie['name']: cookie['value'] for cookie in cookies} else: return cookies def set_cookies(self, cookies): """设置cookies值 :param cookies: cookies信息 :return: None """ cookies = cookies_to_tuple(cookies) result_cookies = [] for cookie in cookies: if not cookie.get('domain', None): continue c = {'value': '' if cookie['value'] is None else cookie['value'], 'name': cookie['name'], 'domain': cookie['domain']} result_cookies.append(c) self._wait_driver.Network.setCookies(cookies=result_cookies) def set_headers(self, headers: dict) -> None: """设置固定发送的headers :param headers: dict格式的headers数据 :return: None """ self.run_cdp('Network.setExtraHTTPHeaders', headers=headers, not_change=True) def ele(self, loc_or_ele, timeout=None): """获取第一个符合条件的元素对象 :param loc_or_ele: 定位符或元素对象 :param timeout: 查找超时时间 :return: ChromiumElement对象 """ return self._ele(loc_or_ele, timeout=timeout) def eles(self, loc_or_str, timeout=None): """获取所有符合条件的元素对象 :param loc_or_str: 定位符或元素对象 :param timeout: 查找超时时间 :return: ChromiumElement对象组成的列表 """ return self._ele(loc_or_str, timeout=timeout, single=False) def s_ele(self, loc_or_ele=None): """查找第一个符合条件的元素以SessionElement形式返回,处理复杂页面时效率很高 :param loc_or_ele: 元素的定位信息,可以是loc元组,或查询字符串 :return: SessionElement对象或属性、文本 """ return make_session_ele(self, loc_or_ele) def s_eles(self, loc_or_str): """查找所有符合条件的元素以SessionElement列表形式返回 :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 :return: SessionElement对象组成的列表 """ return make_session_ele(self, loc_or_str, single=False) def _ele(self, loc_or_ele, timeout=None, single=True, relative=False): """执行元素查找 :param loc_or_ele: 定位符或元素对象 :param timeout: 查找超时时间 :param single: 是否只返回第一个 :return: ChromiumElement对象或元素对象组成的列表 """ if isinstance(loc_or_ele, (str, tuple)): loc = get_loc(loc_or_ele)[1] elif isinstance(loc_or_ele, ChromiumElement) or str(type(loc_or_ele)).endswith(".ChromiumFrame'>"): return loc_or_ele else: raise ValueError('loc_or_str参数只能是tuple、str、ChromiumElement类型。') timeout = timeout if timeout is not None else self.timeout search_result = self._wait_driver.DOM.performSearch(query=loc, includeUserAgentShadowDOM=True) count = search_result['resultCount'] nodeIds = None end_time = perf_counter() + timeout ok = False while True: if count > 0: count = 1 if single else count try: nodeIds = self._wait_driver.DOM.getSearchResults(searchId=search_result['searchId'], fromIndex=0, toIndex=count) if nodeIds['nodeIds'][0] != 0: ok = True except Exception: sleep(.01) if ok or perf_counter() >= end_time: break search_result = self._wait_driver.DOM.performSearch(query=loc, includeUserAgentShadowDOM=True) count = search_result['resultCount'] if not nodeIds: return None if single else [] if single: return make_chromium_ele(self, node_id=nodeIds['nodeIds'][0]) else: return [make_chromium_ele(self, node_id=i) for i in nodeIds['nodeIds']] def wait_ele(self, loc_or_ele, timeout=None): """返回用于等待元素到达某个状态的等待器对象 :param loc_or_ele: 可以是元素、查询字符串、loc元组 :param timeout: 等待超时时间 :return: 用于等待的ElementWaiter对象 """ return ChromiumElementWaiter(self, loc_or_ele, timeout) def scroll_to_see(self, loc_or_ele): """滚动页面直到元素可见 :param loc_or_ele: 元素的定位信息,可以是loc元组,或查询字符串(详见ele函数注释) :return: None """ ele = self.ele(loc_or_ele) node_id = ele.node_id try: self._wait_driver.DOM.scrollIntoViewIfNeeded(nodeId=node_id) except Exception: self.ele(loc_or_ele).run_js("this.scrollIntoView();") if not ele.is_in_viewport: offset_scroll(ele, 0, 0) def refresh(self, ignore_cache=False): """刷新当前页面 :param ignore_cache: 是否忽略缓存 :return: None """ self._is_loading = True self._driver.Page.reload(ignoreCache=ignore_cache) def forward(self, steps=1): """在浏览历史中前进若干步 :param steps: 前进步数 :return: None """ self._forward_or_back(steps) def back(self, steps=1): """在浏览历史中后退若干步 :param steps: 后退步数 :return: None """ self._forward_or_back(-steps) def _forward_or_back(self, steps): """执行浏览器前进或后退,会跳过url相同的历史记录 :param steps: 步数 :return: None """ if steps == 0: return history = self.run_cdp('Page.getNavigationHistory') index = history['currentIndex'] history = history['entries'] direction = 1 if steps > 0 else -1 curr_url = history[index]['userTypedURL'] nid = None for num in range(abs(steps)): for i in history[index::direction]: index += direction if i['userTypedURL'] != curr_url: nid = i['id'] curr_url = i['userTypedURL'] break if nid: self._is_loading = True self.run_cdp('Page.navigateToHistoryEntry', entryId=nid) def stop_loading(self): """页面停止加载""" if self._debug: print('停止页面加载') if self._debug_recorder: self._debug_recorder.add_data((perf_counter(), '操作', '停止页面加载')) self._tab_obj.Page.stopLoading() while self.ready_state != 'complete': sleep(.1) def run_cdp(self, cmd, **cmd_args): """执行Chrome DevTools Protocol语句 :param cmd: 协议项目 :param cmd_args: 参数 :return: 执行的结果 """ if cmd_args.get('not_change', None): driver = self._tab_obj cmd_args.pop('not_change') else: driver = self._driver try: return driver.call_method(cmd, **cmd_args) except Exception as e: if 'Could not find node with given id' in str(e): raise RuntimeError('该元素已不在当前页面中。') raise def set_user_agent(self, ua): """为当前tab设置user agent,只在当前tab有效 :param ua: user agent字符串 :return: None """ self._wait_driver.Network.setUserAgentOverride(userAgent=ua) def get_session_storage(self, item=None): """获取sessionStorage信息,不设置item则获取全部 :param item: 要获取的项,不设置则返回全部 :return: sessionStorage一个或所有项内容 """ js = f'sessionStorage.getItem("{item}");' if item else 'sessionStorage;' return self.run_js(js, as_expr=True) def get_local_storage(self, item=None): """获取localStorage信息,不设置item则获取全部 :param item: 要获取的项目,不设置则返回全部 :return: localStorage一个或所有项内容 """ js = f'localStorage.getItem("{item}");' if item else 'localStorage;' return self.run_js(js, as_expr=True) def set_session_storage(self, item, value): """设置或删除某项sessionStorage信息 :param item: 要设置的项 :param value: 项的值,设置为False时,删除该项 :return: None """ js = f'sessionStorage.removeItem("{item}");' if item is False \ else f'sessionStorage.setItem("{item}","{value}");' return self.run_js(js, as_expr=True) def set_local_storage(self, item, value): """设置或删除某项localStorage信息 :param item: 要设置的项 :param value: 项的值,设置为False时,删除该项 :return: None """ js = f'localStorage.removeItem("{item}");' if item is False else f'localStorage.setItem("{item}","{value}");' return self.run_js(js, as_expr=True) def clear_cache(self, session_storage=True, local_storage=True, cache=True, cookies=True): """清除缓存,可选要清除的项 :param session_storage: 是否清除sessionStorage :param local_storage: 是否清除localStorage :param cache: 是否清除cache :param cookies: 是否清除cookies :return: None """ if session_storage: self.run_js('sessionStorage.clear();', as_expr=True) if local_storage: self.run_js('localStorage.clear();', as_expr=True) if cache: self._wait_driver.Network.clearBrowserCache() if cookies: self._wait_driver.Network.clearBrowserCookies() def _d_connect(self, to_url, times=0, interval=1, show_errmsg=False, timeout=None): """尝试连接,重试若干次 :param to_url: 要访问的url :param times: 重试次数 :param interval: 重试间隔(秒) :param show_errmsg: 是否抛出异常 :param timeout: 连接超时时间 :return: 是否成功,返回None表示不确定 """ err = None timeout = timeout if timeout is not None else self.timeouts.page_load for t in range(times + 1): err = None result = self._driver.Page.navigate(url=to_url) is_timeout = not self._wait_loaded(timeout) while self.is_loading: sleep(.1) if is_timeout: err = TimeoutError('页面连接超时。') if 'errorText' in result: err = ConnectionError(result['errorText']) if not err: break if t < times: sleep(interval) while self.ready_state != 'complete': sleep(.1) if self._debug: print('重试') if show_errmsg: print(f'重试 {to_url}') if err: if show_errmsg: raise err if err is not None else ConnectionError('连接异常。') return False return True def _to_d_mode(self): """用于使WebPage切换到d模式""" return self._driver class Timeout(object): """用于保存d模式timeout信息的类""" def __init__(self, page, implicit=None, page_load=None, script=None): """ :param page: ChromiumBase页面 :param implicit: 默认超时时间 :param page_load: 页面加载超时时间 :param script: js超时时间 """ self._page = page self.implicit = 10 if implicit is None else implicit self.page_load = 30 if page_load is None else page_load self.script = 30 if script is None else script def __repr__(self): return str({'implicit': self.implicit, 'page_load': self.page_load, 'script': self.script}) class PageLoadStrategy(object): """用于设置页面加载策略的类""" def __init__(self, page): """ :param page: ChromiumBase对象 """ self._page = page def __call__(self, value): """设置加载策略 :param value: 可选 'normal', 'eager', 'none' :return: None """ if value.lower() not in ('normal', 'eager', 'none'): raise ValueError("只能选择 'normal', 'eager', 'none'。") self._page._page_load_strategy = value def normal(self): """设置页面加载策略为normal""" self._page._page_load_strategy = 'normal' def eager(self): """设置页面加载策略为eager""" self._page._page_load_strategy = 'eager' def none(self): """设置页面加载策略为none""" self._page._page_load_strategy = 'none'