python(24) : 自动抢票
免责声明
本博客中的所有代码示例和相关材料(以下简称“代码”)仅供学习、研究和参考之用。作者或博主对代码的准确性、完整性、适用性或安全性不做任何保证。
-
使用风险自负:读者自行决定是否使用这些代码,并承担由此产生的一切风险。对于因使用或依赖本博客提供的代码而导致的任何直接或间接损失,包括但不限于数据丢失、业务中断或其他任何形式的损害,作者或博主不承担任何责任。
-
无担保声明:本博客中的代码按“原样”提供,没有任何形式的明示或暗示担保,包括但不限于适销性、特定用途适用性和非侵权性的担保。
-
修改与更新:作者或博主保留随时修改或删除博客内容的权利,恕不另行通知。读者应定期检查博客以获取最新信息。
-
第三方链接:本博客可能包含指向第三方网站或资源的链接。这些链接仅为方便读者而提供,作者或博主不对这些网站的内容负责,也不对其可用性做出任何承诺。
-
许可与授权:除非另有说明,本博客中的代码遵循[具体开源许可证](如MIT许可证、Apache许可证等)。请在使用前查阅相应的许可证条款。
-
法律适用:本免责声明受[国家/地区]法律管辖,并根据其解释。如果您不同意上述条款,请勿使用本博客中的代码。
使用建议
- 测试环境:在将代码应用于生产环境之前,请务必在安全的测试环境中进行充分测试。
- 自定义调整:根据您的具体需求和环境,对代码进行必要的调整和优化。
- 寻求专业意见:对于关键任务的应用程序或系统,请考虑咨询专业的开发人员或顾问。
1.概述
自动抢票程序(含python源码), 自动打开浏览器发现有票自动下单。
需要安装谷歌浏览器, 已含安装包, 运行之前需要修改config.json文件添加购票信息。
运行run.exe后会打开浏览器跳转到12306登录页面, 手机12306扫码之后,点击已登录会进行自动抢票, 整点或者整半点自动刷新查看是否有票(其他时间间隔5-30秒查询一次), 有票之后会选择乘车人和座位之后提交订单并发送钉钉群机器人(已配置的话), 只能购买高铁动车。
下载:
2.安装依赖
pip install selenium
pip install pyinstaller
3.配置文件
3.1.config.json
{
"购票信息": [
{
"起点": "杭州",
"终点": "上海",
"日期": [
"2025-01-19"
],
"指定车次": [],
"座位类型": [
"二等座",
"商务座",
"一等座"
],
"是否高铁动车": true,
"乘车人": [
"张三",
"李四",
"王五"
],
"座位偏好": [
"A",
"B",
"C"
]
}
],
"钉钉机器人推送地址": ""
}
3.2.city_code.txt
12306城市编码, 会自动下载
4.代码
import datetime
import json
import logging
import os
import queue
import random
import threading
import time
import tkinter as tk
import traceback
from tkinter import messagebox
import requests
from selenium import webdriver
from selenium.common import TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
log = logging.getLogger('auto_buy_ticket')
browser = None
is_close = False
config = None
city_code = {}
root = None
popup = None
q = queue.Queue()
buy_ticket_count = {}
is_running = False
is_run_task = True
is_login = False
seat_index = {
"商务座 特等座": 1,
"优选 一等座": 2,
"一等座": 3,
"二等座 二等包座": 4,
"高级软卧": 5,
"软卧/动卧 一等卧": 6,
"硬卧 二等卧": 7,
"软座": 8,
"硬座": 9,
"无座": 10
}
is_success_buy = []
is_error_by = []
# ************************************************[公共函数]********************************************************
def is_on_the_hour_or_half_hour():
now = datetime.datetime.now()
minute = now.minute
# 判断是否为整点(分钟数为0)或者整半小时(分钟数为30)
return minute == 0 or minute == 30
def exists_id(id):
try:
get_ele_by_id(id)
return True
except:
return False
def is_ready(id):
while True:
try:
get_ele_by_id(id)
break
except:
pass
sleep(0.1)
def get_random_sleep_second():
return int(random.uniform(5, 30))
def send_ding_ding(msg):
# 构建消息内容
message = {
"msgtype": "text",
"text": {
"content": msg
}, "at": {
"atMobiles": [],
"isAtAll": True
}
}
headers = {
'Content-Type': 'application/json'
}
try:
response = requests.post(config['钉钉机器人推送地址'], headers=headers, data=json.dumps(message))
response.raise_for_status() # 检查请求是否成功
log.info('购票信息发送钉钉完成')
except requests.exceptions.RequestException as e:
log.info(f'购票信息发送钉钉失败,{e}')
def sleep(second):
time.sleep(second)
def random_sleep(second):
log.info(f'随机睡眠{second}秒')
time.sleep(second)
def set_value_by_id(id, value):
ele = browser.find_element(By.ID, id)
ele.clear()
ele.send_keys(value)
def select_station(station):
selects = get_ele_by_id('panel_cities')
divs = selects.find_elements(By.TAG_NAME, 'div')
for d in divs:
s0 = d.text.split('\n')[0]
if s0 == station:
d.click()
break
def get_ele_by_id(id):
return browser.find_element(By.ID, id)
def click_by_id(id):
browser.find_element(By.ID, id).click()
def get_seat_index(name):
for k in seat_index:
if name in k:
if '优选' in k and '优选' not in name:
continue
return seat_index[k]
return None
def write_f_8(path, content):
os.makedirs(os.path.dirname(path), exist_ok=True)
# 打开一个新的文本文件,如果文件不存在将会自动创建
file = open(path, "w", encoding='utf-8')
# 写入内容到文件
file.write(content)
# 关闭文件
file.close()
def read_f_8(path):
with open(path, 'r', encoding='utf-8') as file:
rs = file.read()
return rs
def check_train(text, trains):
for t in trains:
if t in text:
return t
return None
# ************************************************[日志]********************************************************
class TkLoggingHandler(logging.Handler):
def __init__(self, text_widget):
logging.Handler.__init__(self)
self.text_widget = text_widget
def emit(self, record):
msg = self.format(record)
self.text_widget.configure(state='normal')
self.text_widget.insert(tk.END, msg + '\n')
self.text_widget.configure(state='disabled')
# 滚动到底部
self.text_widget.yview(tk.END)
def show_msg(title, message):
# 这里放置你想要执行的代码
messagebox.showinfo(title, message)
def show_windows():
global root, popup
root = tk.Tk()
root.withdraw()
popup = tk.Toplevel(root)
# 关闭窗口
def close():
global is_close
is_close = True
popup.protocol("WM_DELETE_WINDOW", close)
# 操作窗口
popup.title("操作")
# 设置窗口大小:宽度x高度
p_width = 200
p_height = 200
p_margin = 100
popup.geometry(f"{p_width}x{p_height}") # 宽400像素,高300像素
# 获取屏幕的宽度和高度
screen_width = popup.winfo_screenwidth()
screen_height = popup.winfo_screenheight()
# 设置窗口的位置:距离屏幕左侧的距离+距离屏幕上侧的距离
# 这里我们将窗口放置在右上角
popup.geometry("+{}+{}".format(screen_width - p_width - p_margin, p_margin)) # 距离右侧400像素,顶部0像素
# 确保窗口置顶
popup.attributes('-topmost', True)
# 退出, 无法退出, 会卡住
# b_exit = tk.Button(popup, text="退出程序", command=close)
# b_exit.pack(pady=5)
b_login = tk.Button(popup, text="已登录", command=index)
b_login.pack(pady=5)
b_login = tk.Button(popup, text="停止任务", command=stop_task)
b_login.pack(pady=5)
b_login = tk.Button(popup, text="启动任务", command=start_task)
b_login.pack(pady=5)
b_login = tk.Button(popup, text="执行一次买票", command=buy_tickets)
b_login.pack(pady=5)
root.mainloop()
def stop_task():
global is_run_task
log.info('停止买票定时任务')
is_run_task = False
def start_task():
global is_run_task
log.info('启动买票定时任务')
is_run_task = True
def block():
while not is_close:
time.sleep(1)
browser.quit()
browser.close()
os.kill(os.getpid(), 9)
# ************************************************[买票逻辑]********************************************************
def cmd_task():
log.info('已启动买票定时任务')
next_second = get_random_sleep_second()
count = 0
on_half_hour_time_second = None
while True:
try:
if not is_login:
sleep(1)
continue
if on_half_hour_time_second and (time.time() - on_half_hour_time_second > 70):
on_half_hour_time_second = None
if is_run_task:
if is_on_the_hour_or_half_hour() and on_half_hour_time_second is None:
on_half_hour_time_second = time.time()
log.info('整点或者整半点, 执行三次买票逻辑')
for i in range(3):
buy_tickets()
sleep(0.1)
else:
count += 1
if count >= next_second:
log.info('随机睡眠完成, 执行买票逻辑')
buy_tickets()
count = 0
next_second = get_random_sleep_second()
log.info(f'定时买票执行完成, 下次买票时间:{next_second}秒后')
sleep(1)
except Exception:
sleep(1)
def buy_tickets():
try:
global is_running, config
if is_running:
log.info('买票逻辑正在运行中, 稍后重试')
return None
is_running = True
st1 = time.time()
# 加载买票配置
config = json.loads(read_f_8(os.getcwd() + os.sep + 'config.json'))
formatted_json = json.dumps(config, indent=4, sort_keys=True, ensure_ascii=False)
log.info(f'已加载配置文件,内容为:\n\n{formatted_json}\n')
log.info(f'开始执行购票逻辑...')
st = time.time()
train_config = config['购票信息']
log.info(f'一共{len(train_config)}个购票需求, 开始依次执行')
ind = 0
item_ind = 0
for item in train_config:
ind += 1
item_ind += 1
btc_key = json.dumps(item, ensure_ascii=False)
person_count = len(item["乘车人"])
need_buy_ticket_count = person_count
current_buy_ticket_count = buy_ticket_count.get(btc_key, None)
if current_buy_ticket_count is not None:
if current_buy_ticket_count == need_buy_ticket_count:
log.info(f'该购票需求已完成, 跳过, 购票信息:{btc_key}')
continue
log.info(f'执行购票需求[{ind}/{len(train_config)}]...')
trains = item['指定车次']
seats = item['座位类型']
start = item['起点']
end = item['终点']
current_url = None
last_date = None
for day in item['日期']:
try:
if current_buy_ticket_count == need_buy_ticket_count:
log.info(f'第[{item_ind}]个购票需求已完成购票数量{need_buy_ticket_count}, 执行处理其他购票需求')
break
log.info(f'查询到日期[{day}]起点[{start}]终点[{end}]')
if current_url is None:
last_date = day
is_high_dan = item.get('是否高铁动车', None)
if is_high_dan:
high_dan = 'Y'
else:
high_dan = 'N'
current_url = (f'https://kyfw.12306.cn/otn/leftTicket/init?linktypeid=dc&'
f'fs={start},{city_code[start]}&ts={end},{city_code[end]}&date={day}&flag=N,{high_dan},Y')
browser.get(current_url)
else:
current_url = current_url.replace(last_date, day)
browser.get(current_url)
is_ready('queryLeftTable')
table = browser.find_element(By.ID, 'queryLeftTable')
rows = table.find_elements(By.TAG_NAME, "tr")
if len(rows) == 0:
log.info('无车票信息')
sleep(0.1)
break
# 遍历每一行
for i, row in enumerate(rows):
# 获取该行的所有单元格
try:
cells = row.find_elements(By.TAG_NAME, "td")
except:
pass
if len(cells) == 0:
continue
train = cells[0].text
# log.info(f'车次信息:{train}')
if trains:
this_train = check_train(train, trains)
if not this_train:
continue
else:
this_train = train.split('\n')[0]
train_infos = train.split('\n')
if len(train_infos) < 2:
continue
if this_train in is_error_by:
continue
# 报错
start_station = train_infos[1]
end_station = train_infos[2]
start_time = train_infos[3]
end_time = train_infos[4]
total_time = train_infos[5]
log_text = f'[{day}] [{this_train}] [{start_time}]从[{start_station}]出发, [{end_time}]到达[{end_station}], 历时:[{total_time}]'
log.info(f'===>> 查询到 {log_text}')
if f'{this_train}|{day}' in is_success_buy:
log.info(f'车次日期[{day}][{this_train}]日期[{day}已购买到票, 跳过]')
continue
valid_seat_type = None
for seat in seats:
si = get_seat_index(seat)
remain_ticket = None
try:
if '有' in cells[si].text:
remain_ticket = '很多'
else:
remain_ticket = int(cells[si].text)
except:
pass
if remain_ticket is None:
log.info(f'日期[{day}]车次[{this_train}]座位[{seat}]无票')
else:
log.info(f'日期[{day}]车次[{this_train}]座位[{seat}]剩余[{remain_ticket}]张票')
if remain_ticket == '很多' or remain_ticket >= person_count:
valid_seat_type = seat
break
if valid_seat_type is None:
log.info(f'日期[{day}]车次[{this_train}]可用车票小于{person_count}张, 查询其他车次')
continue
log.info(f'日期[{day}]车次[{this_train}]可用座位类型[{valid_seat_type}], 执行购买')
log.info(cells[12].text)
log.info(f'查到有票耗时:{round(time.time() - st1, 2)}秒')
while True:
try:
get_ele_by_id('queryLeftTable')
cells[12].click()
sleep(0.1)
except:
break
log.info(f'跳到提交订单页面耗时:{round(time.time() - st1, 2)}秒')
sleep(0.1)
is_continue = False
# 车辆临近提醒
no_login_count = 0
while True:
try:
get_ele_by_id('normal_passenger_id').find_elements(By.TAG_NAME, "li")
break
except:
no_login_count += 1
if no_login_count > 30:
log.info('未登录或者有未处理的订单, 无法处理, 退出本次购买逻辑')
is_running = False
return None
try:
browser.find_element(By.ID, "qd_closeDefaultWarningWindowDialog_id").click()
except:
pass
if exists_id('content_defaultwarningAlert_hearder'):
error_msg = get_ele_by_id('content_defaultwarningAlert_hearder').text
log.info(f'出错, 无法预定该车次, 错误信息:{error_msg}')
is_continue = True
break
sleep(0.1)
log.info(f'车辆临近提醒检测耗时:{round(time.time() - st1, 2)}秒')
if is_continue:
break
# 选择乘车人
is_ready('normal_passenger_id')
choose_person_count = 0
while choose_person_count != len(item['乘车人']):
while True:
try:
get_ele_by_id('normal_passenger_id').find_elements(By.TAG_NAME, "li")
break
except:
time.sleep(0.2)
lis = get_ele_by_id('normal_passenger_id').find_elements(By.TAG_NAME, "li")
for li in lis:
if li.text in item['乘车人']:
if 'colorA' not in li.get_attribute('outerHTML'):
li.find_element(By.TAG_NAME, 'label').click()
try:
choose_person_count = len(
browser.find_element(By.ID, 'normal_passenger_id').find_elements(By.CLASS_NAME,
"colorA"))
except:
pass
sleep(0.1)
log.info(f'选择乘车人耗时耗时:{round(time.time() - st1, 2)}秒')
# 选择座位类型
for seq in range(person_count):
id = 'seatType_' + str(seq + 1)
click_by_id(id)
seat_type_selects = get_ele_by_id(id).find_elements(By.TAG_NAME, "option")
for sts in seat_type_selects:
if valid_seat_type in sts.text:
sts.click()
# 提交订单
click_by_id('submitOrder_id')
is_ready('id-seat-sel')
# 选择座位位置
seat_likes = item.get('座位偏好', None)
if seat_likes:
choose_seat_count = 0
sitem = None
while sitem is None:
sshd = browser.find_element(By.ID, 'id-seat-sel').find_element(By.CLASS_NAME,
'seat-sel-bd')
sitems = sshd.find_elements(By.CLASS_NAME, 'sel-item')
for st in sitems:
st_html = st.get_attribute('outerHTML')
if 'display' in st_html and 'block' in st_html and st.get_attribute('id').endswith(
'1'):
sitem = st
break
if sitem is None:
sleep(0.1)
while choose_seat_count != person_count:
slis = sitem.find_elements(By.TAG_NAME, 'li')
ind = 0
for sli in slis:
if ind == len(seat_likes):
break
if sli.text in seat_likes:
if 'cur' not in sli.get_attribute('outerHTML'):
sli.click()
sleep(0.1)
if 'cur' in sli.get_attribute('outerHTML'):
ind += 1
if ind == len(seat_likes) or ind == person_count:
break
try:
choose_seat_count = len(
browser.find_element(By.ID, 'id-seat-sel').find_elements(By.CLASS_NAME,
"cur"))
except:
pass
if exists_id('orderResultInfo_id'):
error_msg = get_ele_by_id('orderResultInfo_id').text
if error_msg.strip():
log.info(f'出错, 无法提交订单, 错误信息:{error_msg}')
is_error_by.append(this_train)
continue
if choose_seat_count != person_count:
sleep(0.1)
# 提交订单
is_ready('qr_submit_id')
get_ele_by_id('qr_submit_id').click()
sub_count = 0
while exists_id('qr_submit_id'):
sub_count += 1
try:
get_ele_by_id('qr_submit_id').click()
sleep(1)
except:
pass
if exists_id('cancelButton') or exists_id('payButton'):
break
if sub_count > 5:
sleep(3)
sleep(0.1)
# 测试
# if 1 == 1:
# log.info('测试退出')
# is_running = False
# return None
# 测试
if current_buy_ticket_count is None:
buy_ticket_count[btc_key] = person_count
else:
buy_ticket_count[btc_key] = current_buy_ticket_count + person_count
is_success_buy.append(f'{this_train}|{day}')
# 订单提交成功,发送消息,
send_ding_ding(
f'{log_text} 已下单成功, 请快速付款')
# 随机睡眠1-30秒
log.info(f'{log_text} 已下单成功, 查询其他车次')
sleep(0.1)
# 计算购票次数是否足够
current_buy_ticket_count = buy_ticket_count.get(btc_key, None)
if current_buy_ticket_count < need_buy_ticket_count:
log.info(
f'未完成购票数量, 继续购买当天其他车次, 已购买数量:{current_buy_ticket_count}, 需求购票数量:{need_buy_ticket_count}')
browser.get(current_url)
break
else:
log.info(
f'第[{item_ind}]个购票需求已完成购票数量{need_buy_ticket_count}, 执行处理其他购票需求')
break
except Exception:
log.info(f'执行出错, 随机睡眠秒, {traceback.format_exc()}')
# random_sleep(get_random_sleep_second())
sleep(3)
except Exception:
log.info(f'执行买票逻辑出错, {traceback.format_exc()}')
is_running = False
log.info(f'购票逻辑执行完成')
def login():
log.info('打开12306官网并点击扫码')
# 登录
browser.get("https://kyfw.12306.cn/otn/resources/login.html")
# 设置一个最长等待时间为10秒
wait = WebDriverWait(browser, 10)
try:
# 等待直到元素变得可见
element = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "[id='J-userName']")))
except TimeoutException as e:
log.error("Element not found within the given time.", e)
browser.find_element(By.CLASS_NAME, "login-hd-account").click()
def index():
log.info('已扫码登录, 根据配置执行买票逻辑...')
global is_login
is_login = True
buy_tickets()
def handle():
global browser, city_code
# 加载城市编码
city_code_path = os.getcwd() + os.sep + 'city_code.txt'
if os.path.exists(city_code_path):
city_code_str = read_f_8(city_code_path)
log.info(f'已下载城市编码文件')
else:
res = requests.get('https://kyfw.12306.cn/otn/resources/js/framework/station_name.js')
city_code_str = res.text.split("'")[1]
write_f_8(city_code_path, city_code_str)
log.info(f'无城市编码文件, 下载城市编码文件, 保存目录:{city_code_path}')
for item in city_code_str.split('|||'):
codes = item.split('|')
if len(codes) > 2:
city_code[codes[1]] = codes[2]
log.info(f'已载入城市编码文件, 数量:{len(city_code)}')
log.info('启动谷歌浏览器')
browser = webdriver.Chrome()
browser.set_window_size(1280, 920)
login()
# buy_tickets()
def test():
is_test = 2
if is_test == 1:
ss = get_random_sleep_second()
print(f'1已下单成功, 随机睡眠{ss}秒, 查询其他车次')
sleep(ss)
os.kill(os.getpid(), 9)
if __name__ == '__main__':
test()
# 操作按钮
threading.Thread(target=show_windows, args=(), daemon=True).start()
# 启动定时任务
threading.Thread(target=cmd_task, args=(), daemon=True).start()
# 处理
handle()
# 阻塞
block()
原文地址:https://blog.csdn.net/Lxinccode/article/details/145161844
免责声明:本站文章内容转载自网络资源,如侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!