Scraping the Maoyan TOP100 with requests + BeautifulSoup + SQLAlchemy + pymysql, writing the results to MySQL and a txt file
A problem I was working on happened to need a crawler, so this was a good excuse to review a few things: scrape the Maoyan TOP100 movie board, write the records to a database with SQLAlchemy, and also dump them to a txt file.
First, set up the database connection
from sqlalchemy import create_engine,Column,Integer,String,Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
HOSTNAME = '127.0.0.1'
DATABASE = 'movies'
PORT = '3306'
USERNAME = 'root'
PASSWORD = 'root'
DB_URL = "mysql+pymysql://{username}:{password}@{host}:{port}/{database}?charset=utf8mb4".format(username=USERNAME,password=PASSWORD,host=HOSTNAME, port=PORT,database=DATABASE)
engine = create_engine(DB_URL)
conn = engine.connect()
Base = declarative_base()
Session = sessionmaker(engine)()
Create the table
class Movies(Base):
    __tablename__ = 'movies'
    index = Column(Integer, primary_key=True, autoincrement=True)
    src = Column(Text, nullable=False)
    name = Column(String(50), nullable=False)
    actor = Column(String(50), nullable=False)
    time = Column(String(50), nullable=False)
    score = Column(String(50), nullable=False)

Base.metadata.create_all(engine)
alter = 'alter table movies convert to character set utf8mb4;'
conn.execute(alter)
Note that the character-set conversion statement really has to be executed, otherwise the Chinese data cannot be written.
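For what it's worth, the charset can also be declared at table-creation time, which avoids the separate ALTER statement; a minimal sketch using SQLAlchemy's MySQL table options (same model as above):

class Movies(Base):
    __tablename__ = 'movies'
    # mysql_charset is a MySQL-dialect table option, so the table is created as utf8mb4 up front
    __table_args__ = {'mysql_charset': 'utf8mb4'}
    index = Column(Integer, primary_key=True, autoincrement=True)
    src = Column(Text, nullable=False)
    name = Column(String(50), nullable=False)
    actor = Column(String(50), nullable=False)
    time = Column(String(50), nullable=False)
    score = Column(String(50), nullable=False)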
Analyze the page structure
from bs4 import BeautifulSoup
import requests
import re

url = 'https://maoyan.com/board/4?offset={}'

def main(index):
    req = requests.get(url.format(str(index)))
    soup = BeautifulSoup(req.text, "html5lib")
    for item in soup.select('dd'):
        pass
Looking at the page structure, each movie sits inside its own <dd> tag; once you have that tag, you can search downward inside it to get all the fields you want.
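A quick way to check that is to print the pieces of the first <dd> by hand; a throwaway sketch, assuming the url template from the snippet above:

req = requests.get(url.format(0))
soup = BeautifulSoup(req.text, "html5lib")
first = soup.select_one('dd')
if first is not None:
    print(first.select_one('i').text)             # ranking number
    print(first.select_one('.name').text)         # movie title
    print(first.select_one('.releasetime').text)  # release time line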
Extract the data
def get_index(item):
    # Ranking number: the text of the <i> tag at the top of the entry
    index = item.select_one("i").text
    return index

def get_src(item):
    # The poster URL is lazy-loaded, so it lives in the data-src attribute
    img_src = item.select("img")[1]
    template = re.compile('data-src="(.*?)"')
    img_src = template.findall(str(img_src))[0]
    return img_src

def get_name(item):
    name = item.select(".name")[0].text
    return name

def get_actor(item):
    # Keep only the part of the .star text after the colon
    actor = item.select(".star")[0].text.split(':')[1]
    return actor

def get_time(item):
    # Keep only the part of the .releasetime text after the colon
    time = item.select(".releasetime")[0].text.split(':')[1]
    return time

def get_score(item):
    # The score is split into an integer part and a fraction part on the page
    score = item.select('.integer')[0].text + item.select('.fraction')[0].text
    return score
These helpers pull out the fields we need. Because the poster URL is stored in the data-src attribute rather than src, I grab it with a regular expression here.
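The regex works, but BeautifulSoup can also read the attribute directly, which is a little tidier; a small alternative sketch (get_src_alt is just an illustrative name):

def get_src_alt(item):
    # The second <img> in the entry carries the poster; its real URL sits in data-src
    img = item.select("img")[1]
    return img.get("data-src")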
Build a dict
def get_dict(item):
    index = int(get_index(item))
    src = get_src(item)
    name = get_name(item)
    actor = get_actor(item)
    time = get_time(item)
    score = get_score(item)
    movies_dict = {'index': index, 'src': src, 'name': name, 'actor': actor, 'time': time, 'score': score}
    return movies_dict
Collect the scraped fields into a dict (in hindsight this intermediate step wasn't really necessary).
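If the dict step were dropped, the extractor functions could feed the Movies model directly; a sketch of that variant (write_item_to_mysql is a hypothetical helper, not part of the code above):

def write_item_to_mysql(item):
    # Build the row straight from the parsed <dd>, skipping the intermediate dict
    data = Movies(src=get_src(item),
                  name=get_name(item),
                  actor=get_actor(item).split('\n')[0],
                  time=get_time(item),
                  score=get_score(item))
    Session.add(data)
    Session.commit()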
Write to a txt file
import json

def write_file(content):
    content = json.dumps(content, ensure_ascii=False)
    # Open with utf-8 so the Chinese text is written consistently on any platform
    with open('result.txt', 'a', encoding='utf-8') as f:
        f.write(content + '\n')
The dict has to be encoded into a JSON string with json.dumps before it can be written to the file; ensure_ascii=False keeps the Chinese text readable instead of escaped.
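A quick illustration of what ensure_ascii=False changes (the values are made up for the example):

import json

record = {'name': '霸王别姬', 'score': '9.5'}   # illustrative values
print(json.dumps(record))                        # {"name": "\u9738\u738b\u522b\u59ec", "score": "9.5"}
print(json.dumps(record, ensure_ascii=False))    # {"name": "霸王别姬", "score": "9.5"}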
Write to the database
def write_to_mysql(content):
    src = content['src']
    name = content['name']
    # The actor text from the page has a trailing newline; keep only the part before it
    actor = content['actor'].split('\n')[0]
    time = content['time']
    score = content['score']
    data = Movies(src=src, name=name, actor=actor, time=time, score=score)
    Session.add(data)
    Session.commit()
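Committing once per movie works, but it opens a transaction for every row; if that ever became a bottleneck, the inserts could be batched and committed once, roughly like this sketch (write_all_to_mysql is a hypothetical helper taking a list of the dicts built above):

def write_all_to_mysql(records):
    rows = [Movies(src=r['src'], name=r['name'],
                   actor=r['actor'].split('\n')[0],
                   time=r['time'], score=r['score'])
            for r in records]
    Session.add_all(rows)   # stage every row, then commit a single transaction
    Session.commit()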
Call everything from the main function
def main(index):
    req = requests.get(url.format(str(index)))
    soup = BeautifulSoup(req.text, "html5lib")
    for item in soup.select('dd'):
        movies_dict = get_dict(item)
        write_to_mysql(movies_dict)
        write_file(movies_dict)
Scrape all the pages (the board is paginated by an offset parameter, ten movies per page, so the offset runs from 0 to 90)
for i in range(10):
    main(i * 10)
Full code
from bs4 import BeautifulSoup
from sqlalchemy import create_engine, Column, Integer, String, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import requests
import re
import json

HOSTNAME = '127.0.0.1'
DATABASE = 'movies'
PORT = '3306'
USERNAME = 'root'
PASSWORD = 'root'
DB_URL = "mysql+pymysql://{username}:{password}@{host}:{port}/{database}?charset=utf8mb4".format(
    username=USERNAME, password=PASSWORD, host=HOSTNAME, port=PORT, database=DATABASE)

engine = create_engine(DB_URL)
conn = engine.connect()
Base = declarative_base()
Session = sessionmaker(engine)()

class Movies(Base):
    __tablename__ = 'movies'
    index = Column(Integer, primary_key=True, autoincrement=True)
    src = Column(Text, nullable=False)
    name = Column(String(50), nullable=False)
    actor = Column(String(50), nullable=False)
    time = Column(String(50), nullable=False)
    score = Column(String(50), nullable=False)

Base.metadata.create_all(engine)
alter = 'alter table movies convert to character set utf8mb4;'
conn.execute(alter)

def get_index(item):
    index = item.select_one("i").text
    return index

def get_src(item):
    img_src = item.select("img")[1]
    template = re.compile('data-src="(.*?)"')
    img_src = template.findall(str(img_src))[0]
    return img_src

def get_name(item):
    name = item.select(".name")[0].text
    return name

def get_actor(item):
    actor = item.select(".star")[0].text.split(':')[1]
    return actor

def get_time(item):
    time = item.select(".releasetime")[0].text.split(':')[1]
    return time

def get_score(item):
    score = item.select('.integer')[0].text + item.select('.fraction')[0].text
    return score

def get_dict(item):
    index = int(get_index(item))
    src = get_src(item)
    name = get_name(item)
    actor = get_actor(item)
    time = get_time(item)
    score = get_score(item)
    movies_dict = {'index': index, 'src': src, 'name': name, 'actor': actor, 'time': time, 'score': score}
    return movies_dict

def write_file(content):
    content = json.dumps(content, ensure_ascii=False)
    with open('result.txt', 'a', encoding='utf-8') as f:
        f.write(content + '\n')

def write_to_mysql(content):
    src = content['src']
    name = content['name']
    actor = content['actor'].split('\n')[0]
    time = content['time']
    score = content['score']
    data = Movies(src=src, name=name, actor=actor, time=time, score=score)
    Session.add(data)
    Session.commit()

def main(index):
    req = requests.get(url.format(str(index)))
    soup = BeautifulSoup(req.text, "html5lib")
    for item in soup.select('dd'):
        movies_dict = get_dict(item)
        write_to_mysql(movies_dict)
        write_file(movies_dict)

url = 'https://maoyan.com/board/4?offset={}'
for i in range(10):
    main(i * 10)
Scraping QQ Zone posts with Selenium
Configure the driver and simulate the login
from selenium import webdriver
import time

qq = input("请输入qq号")
ss_url = 'https://user.qzone.qq.com/{}/311'.format(qq)
driver = webdriver.Chrome("chromedriver.exe")
driver.maximize_window()
driver.get(ss_url)
# The login form lives in an iframe; switch into it and click the avatar for quick login
driver.switch_to.frame('login_frame')
driver.find_element_by_class_name('face').click()
# Seed the loop variable with any truthy value so the while loop below runs at least once
next_page = 'page'
page = 1
Scrape the posts
while next_page:
    time.sleep(2)
    # driver.implicitly_wait(100)
    driver.switch_to.frame('app_canvas_frame')
    content = driver.find_elements_by_css_selector('.content')
    stime = driver.find_elements_by_css_selector('.c_tx.c_tx3.goDetail')
    print('正在抓取第%s页' % page)
    for con, sti in zip(content, stime):
        data = {
            'time': sti.text,
            'shuos': con.text
        }
        print(data)
        time.sleep(1)
zip is used to pair up the two element lists (post body and timestamp) so they can be iterated together. time.sleep() is used to wait for the page to load; I hadn't really sorted out implicit vs. explicit waits, so I fell back on hard-coded sleeps.
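For reference, an explicit wait lets Selenium poll for the element instead of sleeping for a fixed time; a minimal sketch with WebDriverWait, using the same .content selector as above:

from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

# Wait up to 10 seconds for at least one post body to show up in the current frame
WebDriverWait(driver, 10).until(
    EC.presence_of_element_located((By.CSS_SELECTOR, '.content'))
)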
Turn the page
    try:
        next_page = driver.find_element_by_link_text('下一页')
    except Exception:
        # No "下一页" link on the last page, so stop the loop
        break
    page = page + 1
    next_page.click()
    driver.switch_to.parent_frame()
After turning the page you have to call driver.switch_to.parent_frame() to climb back up to the parent frame, otherwise the elements cannot be located on the next pass.
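So the frame handling in each pass of the loop boils down to this skeleton (same frame name and driver as above):

driver.switch_to.frame('app_canvas_frame')   # step into the frame that holds the posts
# ... read the .content / .goDetail elements and click 下一页 here ...
driver.switch_to.parent_frame()              # climb back out before the next iteration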
Full code
from selenium import webdriver
import time

qq = input("请输入qq号")
ss_url = 'https://user.qzone.qq.com/{}/311'.format(qq)
driver = webdriver.Chrome("chromedriver.exe")
driver.maximize_window()
driver.get(ss_url)
# The login form lives in an iframe; switch into it and click the avatar for quick login
driver.switch_to.frame('login_frame')
driver.find_element_by_class_name('face').click()
next_page = 'page'  # any truthy value so the loop runs at least once
page = 1

while next_page:
    time.sleep(2)
    # driver.implicitly_wait(100)
    driver.switch_to.frame('app_canvas_frame')
    content = driver.find_elements_by_css_selector('.content')
    stime = driver.find_elements_by_css_selector('.c_tx.c_tx3.goDetail')
    print('正在抓取第%s页' % page)
    for con, sti in zip(content, stime):
        data = {
            'time': sti.text,
            'shuos': con.text
        }
        print(data)
        time.sleep(1)
    try:
        next_page = driver.find_element_by_link_text('下一页')
    except Exception:
        # No "下一页" link on the last page, so stop the loop
        break
    page = page + 1
    next_page.click()
    driver.switch_to.parent_frame()