1.Selenium 简介
Selenium 是一个用于自动化网页交互的开源工具。它可以通过模拟用户的行为(如点击、输入文本、滚动页面等)来自动化网页浏览器的操作。本次购票脚本使用selenimu4.21。
- 是一个Web自动化测试工具,可以直接运行在浏览器上
- 支持所有主流的浏览器
- 可以根据我们的指令,让浏览器自动加载页面,获取需要的数据,基础页面截图等
- 非Python标准模块,安装才能使用
2.Selenium基本使用
创建 WebDriver 实例以控制浏览器:
from selenium import webdriver
#指定 WebDriver 的路径
driver_path = '/path/to/chromedriver'
#创建 WebDriver 实例
driver = webdriver.Chrome(executable_path=driver_path)
使用 get 方法打开一个网页:
driver.get('http://example.com')
在浏览器中进行前进和后退:
driver.back() # 后退
driver.forward() # 前进
元素定位:
element = driver.find_element()
元素等待:
wait = WebDriverWait(driver, 10)
element = wait.until(EC.presence_of_element_located((By.ID, 'some-id')))
对页面元素执行操作,如点击、输入文本、获取元素属性等:
element.click() # 点击
element.send_keys('some text') # 输入文本
element_text = element.text # 获取元素文本
捕获页面截图或录制视频:
driver.get_screenshot_as_file('screenshot.png') # 截图
driver.save_screenshot('screenshot.png') # 另一种截图方法
#录制视频需要使用第三方库,如 pyvirtualdisplay
管理浏览器的 Cookies:
driver.get_cookies() # 获取所有 Cookies
driver.add_cookie({'name': 'cookie_name', 'value': 'cookie_value'}) # 添加 Cookie
driver.delete_cookie('cookie_name') # 删除 Cookie
结束 WebDriver 会话并清理资源:
driver.quit() # 关闭浏览器并结束 WebDriver 会话
3.购票实战
先爬取车站代号,用于出发站和目的站
#用于爬取车站代号
import requests
import re
import openpyxl
def get_station():
url='https://kyfw.12306.cn/otn/resources/js/framework/station_name.js?station_version=1.9151'
headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.100 Safari/537.36'}
resp=requests.get(url,headers=headers)
resp.encoding='utf-8'
#print(resp.text)
stations=re.findall('([\u4e00-\u9fa5]+)\|([A-Z]+)',resp.text)
#print(stations)
return stations #stations是一个列表
def save(lst):
wb=openpyxl.Workbook() #创建工作薄对象
ws=wb.active #使用活动表
for item in lst:
ws.append(item) #每执行一次append将向sheet中添加一行
wb.save('车站代码.xlsx')
if __name__ == '__main__':
lst=get_station()
save(lst)
然后运行抢票脚本,业务流程为:
1.自动打开12306登录界面,手动扫码登录
2.循环余票查询并预定
3.选择乘车人、席位并提交订单
抢票脚本如下:
import openpyxl
from selenium import webdriver
from selenium.webdriver.support.ui import WebDriverWait #显示等待
from selenium.webdriver.support import expected_conditions as ec #等待的条件
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import Select
from selenium.common.exceptions import NoSuchElementException, ElementNotVisibleException, \
ElementNotInteractableException
driver=webdriver.Chrome() #创建浏览器对象
class TrainSpider(object):
#定义类属性
login_url='https://kyfw.12306.cn/otn/resources/login.html' #登录的页面
profile_url='https://kyfw.12306.cn/otn/view/index.html' #个人中心的网址
left_ticket='https://kyfw.12306.cn/otn/leftTicket/init?linktypeid=dc' #余票查询页面
confirm_url='https://kyfw.12306.cn/otn/confirmPassenger/initDc' #确认乘车人和座席
#定义init初始化方法
def __init__(self,from_station,to_station,train_date,trains,passenger_name_lst):
self.from_station=from_station
self.to_station=to_station
self.train_date=train_date
self.station_code=self.init_station_code() #selft.station_code结果为dict
self.trains=trains
self.passenger_name_lst=passenger_name_lst
self.selected_no=None
self.selected_seat=None
#登录的方法
def login(self):
# 打开登录的页面
driver.get(self.login_url)
WebDriverWait(driver,1000).until(
ec.url_to_be(self.profile_url) #等待直到URL成为个人中心页面
)
print('登录成功')
#查询余票
def search_ticket(self):
# 打开查询余票的网站
driver.get(self.left_ticket)
#找到出发站到达站的 隐藏的HTML标签
from_station_input=driver.find_element(By.ID, 'fromStation')
to_station_input=driver.find_element(By.ID, 'toStation')
#找到出发时间的的input标签
train_date_input=driver.find_element(By.ID, 'train_date')
#根据key获取value
from_station_code=self.station_code[self.from_station] #根据出发地找到出发地的代号
to_station_code=self.station_code[self.to_station] #根据目的地找到目的地的代码
#执行js代码
driver.execute_script('arguments[0].value="%s"' % from_station_code,from_station_input)
driver.execute_script('arguments[0].value="%s"' % to_station_code,to_station_input)
driver.execute_script('arguments[0].value="%s"' % self.train_date, train_date_input)
#执行点击查询按钮,执行查询操作
query_ticket_tag=driver.find_element(By.ID, 'query_ticket')
query_ticket_tag.click()
#解析车次,显示等待,等待tbody的出现
WebDriverWait(driver,1000).until(
ec.presence_of_element_located((By.XPATH, '//tbody[@id="queryLeftTable"]/tr'))
)
#筛选出有数据的tr,去掉属性为datatran的tr
trains=driver.find_elements(By.XPATH, '//tbody[@id="queryLeftTable"]/tr[not(@datatran)]')
is_flag=False #标记是否有余票 ,没有余票为False,有余票为True
while True:
# 分别遍历每个车次
for train in trains:
# print(train.text)
infos = train.text.replace('\n', ' ').split(' ')
# print(infos)
train_no = infos[0] # 列表中索引为0的为车次
if train_no in self.trains:
# 根据key获取值 席别是一个列表
seat_types = self.trains[train_no]
for seat_type in seat_types: # 遍历席位的列表
if seat_type == 'O': # 说明是二等座
count = infos[9] # 索引为9的是二等座
if count.isdigit() or count == '有':
is_flag = True
break # 退出了遍历席位的循环
elif seat_type == 'M': # 说明是一等座
count = infos[8] # 索引为8的是一等座
if count.isdigit() or count == '有':
is_flag = True
break
# 判断是否有余票
if is_flag:
self.selected_no = train_no
# 有票就可以执行单击预订
order_btn = train.find_element(By.XPATH, './/a[@class="btn72"]')
order_btn.click()
return # 退出的是遍历车次的循环
def confirm(self):
#等待来到确认乘车人界面
WebDriverWait(driver,1000).until(
ec.url_to_be(self.confirm_url)
)
#等待乘车人标签显示出来
WebDriverWait(driver,1000).until(
ec.presence_of_element_located((By.XPATH,'//ul[@id="normal_passenger_id"]/li/label'))
)
#获取所有的乘车人
passengers=driver.find_elements(By.XPATH, '//ul[@id="normal_passenger_id"]/li/label')
for passenger in passengers: #分别遍历每一位乘车人 (label标签)
name=passenger.text #获取乘车人的姓名
if name in self.passenger_name_lst:
passenger.click() #label标签的单击
#确认席位
seat_select=Select(driver.find_element(By.ID, 'seatType_1'))
seat_types=self.trains[self.selected_no] #根据key获取值 self.trains是要抢票的车次的字典,self.selected_no 要抢票的车次的key
for seat_type in seat_types:
try:
seat_select.select_by_value(seat_type)
self.selected_seat=seat_type #记录有票的座席
except NoSuchElementException:
continue
else:
break
WebDriverWait(driver,1000).until(
ec.element_to_be_clickable((By.ID,'submitOrder_id')) #等待到a标签可以单击的时候
)
#提交订单
submit_btn=driver.find_element(By.ID, 'submitOrder_id')
submit_btn.click()
#显示等待,等待到模式对话框窗口出现
WebDriverWait(driver,1000).until(
ec.presence_of_element_located((By.CLASS_NAME,'dhtmlx_window_active'))
)
#等待按钮可以点
WebDriverWait(driver,1000).until(
ec.element_to_be_clickable((By.ID, 'qr_submit_id'))
)
submit_btn=driver.find_element(By.ID, 'qr_submit_id')
while submit_btn:
try:
submit_btn = driver.find_element(By.ID, 'qr_submit_id')
submit_btn.click()
except (ElementNotVisibleException, ElementNotInteractableException) as e:
print(f"Caught exception: {e}")
break
# finally:
# driver.quit()
print(f'恭喜{self.selected_no}的{self.selected_seat}抢票成功!!!')
#负责调用其它方法(组织其它的代码)
def run(self):
# 1.登录
self.login()
# 2.余票查询
self.search_ticket()
# 3.确认乘车人和订单
self.confirm()
def init_station_code(self):
wb=openpyxl.load_workbook('车站代码.xlsx')
ws=wb.active #使用活动表
lst=[] #存储所有车站名称及代号
for row in ws.rows: #遍历所有行
sub_lst=[] #用于存储每行中的车站名称,车站代号
for cell in row: #遍历一行中的单元格
sub_lst.append(cell.value)
lst.append(sub_lst)
#print(dict(lst)) #将列表转成字典类型
return dict(lst)
#启动爬虫程序
def start(): # O 表示的是二等座,M 代表是一等座
spider=TrainSpider('出发站', '目的站', '出发日期', {'车次': ['O', 'M']}, ['乘车人姓名'])
spider.run()
#spider.init_station_code()
if __name__ == '__main__':
start()