「Python笔记」python爬虫简单实战

使用requests+BeautifulSoup+sqlalchemy+pymysql爬取猫眼TOP100并写入数据库和txt文档

做题用到爬虫正好复习一下一些东西,爬取猫眼TOP100电影,并用sqlalchemy写入数据库,并写入txt文档

先做好数据库连接的配置

from sqlalchemy import create_engine,Column,Integer,String,Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

# --- MySQL connection settings ---
HOSTNAME = '127.0.0.1'
DATABASE = 'movies'
PORT = '3306'
USERNAME = 'root'
PASSWORD = 'root'
# utf8mb4 in the URL so Chinese titles and actor names round-trip correctly.
DB_URL = "mysql+pymysql://{username}:{password}@{host}:{port}/{database}?charset=utf8mb4".format(username=USERNAME,password=PASSWORD,host=HOSTNAME, port=PORT,database=DATABASE)
engine = create_engine(DB_URL)
conn = engine.connect()  # raw connection, used later for the ALTER TABLE statement
Base = declarative_base()
Session = sessionmaker(engine)()  # one shared session instance for all inserts

创建数据表

class Movies(Base):
    """ORM model for one Maoyan TOP100 entry (table ``movies``)."""
    __tablename__ = 'movies'
    # Surrogate auto-increment primary key; the scraped ranking index is
    # computed in get_dict but not passed to this column by write_to_mysql.
    index = Column(Integer,primary_key=True,autoincrement=True)
    # Poster image URL (can be long, hence Text).
    src = Column(Text,nullable=False)
    # Movie title.
    name = Column(String(50),nullable=False)
    # Starring actors, as the scraped text after the colon.
    actor = Column(String(50),nullable=False)
    # Release date text as scraped.
    time = Column(String(50),nullable=False)
    # Rating string, e.g. "9.5".
    score = Column(String(50),nullable=False)

# Create the movies table if it does not exist yet.
Base.metadata.create_all(engine)

# Convert the table itself to utf8mb4; without this, inserting Chinese text
# fails when the server's default charset is narrower.
alter = 'alter table movies convert to character set utf8mb4;'
# NOTE(review): passing a raw SQL string works on SQLAlchemy 1.x;
# SQLAlchemy 2.x would require wrapping it in sqlalchemy.text().
conn.execute(alter)

要注意执行修改字符集语句,否则无法写入

分析结构

from bs4 import BeautifulSoup
import requests
import re

def main(index):
    """Fetch one ranking page (``index`` is the page offset) and walk its <dd> entries.

    NOTE: relies on a module-level ``url`` template defined elsewhere in the file.
    """
    req = requests.get(url.format(str(index)))
    soup = BeautifulSoup(req.text, "html5lib")
    # Each movie entry on the board lives in its own <dd> tag.
    for item in soup.select('dd'):
        pass  # placeholder: per-movie parsing is filled in later

分析结构可以看出,每一部电影都写在一个<dd>标签中,只要获取到这个标签,再向下搜索就能得到想要的数据

爬取数据

def get_index(item):
    """Return the ranking number text from the <i> tag inside *item*."""
    return item.select_one("i").text

def get_src(item):
    """Extract the poster URL from the lazy-load data-src attribute of the second <img>."""
    tag_html = str(item.select("img")[1])
    matches = re.findall('data-src="(.*?)"', tag_html)
    return matches[0]

def get_name(item):
    """Return the movie title: text of the first node with class "name"."""
    name_node = item.select(".name")[0]
    return name_node.text

def get_actor(item):
    """Return the actor list: the ".star" text after the first ':' separator."""
    star_text = item.select(".star")[0].text
    return star_text.split(':')[1]

def get_time(item):
    """Return the release date: the ".releasetime" text after the first ':'."""
    raw = item.select(".releasetime")[0].text
    return raw.split(':')[1]

def get_score(item):
    """Concatenate the integer and fraction parts of the score (e.g. "9" + ".5")."""
    integer_part = item.select('.integer')[0].text
    fraction_part = item.select('.fraction')[0].text
    return integer_part + fraction_part

获取需要的信息,因为src在data-src属性中,所以这里我用正则去获取。

构造dict

def get_dict(item):
    """Collect all scraped fields for one <dd> entry into a plain dict."""
    return {
        'index': int(get_index(item)),
        'src': get_src(item),
        'name': get_name(item),
        'actor': get_actor(item),
        'time': get_time(item),
        'score': get_score(item),
    }

将爬取的数据整理成dict(写完后觉得这步没有必要)

写入txt

def write_file(content):
    """Append *content* (a dict) as one JSON line to result.txt.

    ensure_ascii=False keeps Chinese text readable in the file; the file must
    therefore be opened with an explicit utf-8 encoding — relying on the
    platform default (e.g. GBK on Windows) can raise UnicodeEncodeError or
    corrupt the output.
    """
    line = json.dumps(content, ensure_ascii=False)
    with open('result.txt', 'a', encoding='utf-8') as f:
        f.write(line + '\n')

这里需要将dict用json.dumps方法编码成json字符串,否则无法写入

写入数据库

def write_to_mysql(content):
    """Persist one scraped movie dict as a Movies row and commit immediately."""
    record = Movies(
        src=content['src'],
        name=content['name'],
        actor=content['actor'].split('\n')[0],  # keep only the first line of the star text
        time=content['time'],
        score=content['score'],
    )
    Session.add(record)
    Session.commit()

在主函数中调用

def main(index):
    """Scrape one ranking page at the given offset and persist every movie.

    Each <dd> entry is parsed into a dict, then written both to MySQL and
    to result.txt. Relies on the module-level ``url`` template.
    """
    req = requests.get(url.format(str(index)))
    soup = BeautifulSoup(req.text, "html5lib")
    for item in soup.select('dd'):
        movies_dict = get_dict(item)
        write_to_mysql(movies_dict)
        write_file(movies_dict)

爬取所有页面

# TOP100 is paginated 10 per page: offsets 0, 10, ..., 90.
for i in range(10):
    main(i*10)

完整代码

from bs4 import BeautifulSoup
from sqlalchemy import create_engine,Column,Integer,String,Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import requests
import re
import json

# --- Database connection configuration ---
HOSTNAME = '127.0.0.1'
DATABASE = 'movies'
PORT = '3306'
USERNAME = 'root'
PASSWORD = 'root'
# utf8mb4 in the connection URL so Chinese text is transferred intact.
DB_URL = "mysql+pymysql://{username}:{password}@{host}:{port}/{database}?charset=utf8mb4".format(username=USERNAME,password=PASSWORD,host=HOSTNAME, port=PORT,database=DATABASE)
engine = create_engine(DB_URL)
conn = engine.connect()  # used below for the raw ALTER TABLE statement

Base = declarative_base()
Session = sessionmaker(engine)()  # single shared session for all inserts


class Movies(Base):
    """Declarative model for one Maoyan TOP100 movie (table ``movies``)."""
    __tablename__ = 'movies'
    # Auto-increment surrogate key; the crawler's ranking index is not
    # written to this column (see write_to_mysql).
    index = Column(Integer,primary_key=True,autoincrement=True)
    # Poster image URL.
    src = Column(Text,nullable=False)
    # Title.
    name = Column(String(50),nullable=False)
    # Starring actors text.
    actor = Column(String(50),nullable=False)
    # Release date text.
    time = Column(String(50),nullable=False)
    # Rating string such as "9.5".
    score = Column(String(50),nullable=False)

# Create the movies table if it does not already exist.
Base.metadata.create_all(engine)

# Force utf8mb4 on the table; with a narrower server default charset the
# INSERTs of Chinese text below would fail.
alter = 'alter table movies convert to character set utf8mb4;'
# NOTE(review): raw-string execute assumes SQLAlchemy 1.x; 2.x needs text().
conn.execute(alter)

def get_index(item):
    """Ranking number shown in the <i> tag of this board entry."""
    i_tag = item.select_one("i")
    return i_tag.text

def get_src(item):
    """Pull the poster URL out of the second <img>'s data-src attribute."""
    second_img = item.select("img")[1]
    pattern = re.compile('data-src="(.*?)"')
    return pattern.findall(str(second_img))[0]

def get_name(item):
    """Movie title: text of the first ".name" node."""
    return item.select(".name")[0].text

def get_actor(item):
    """Actor list: the part of the ".star" text after the first ':'."""
    pieces = item.select(".star")[0].text.split(':')
    return pieces[1]

def get_time(item):
    """Release date: the ".releasetime" text after the first ':'."""
    pieces = item.select(".releasetime")[0].text.split(':')
    return pieces[1]

def get_score(item):
    """Full score string: integer part followed by fraction part (e.g. "9" + ".5")."""
    parts = [item.select('.integer')[0].text, item.select('.fraction')[0].text]
    return ''.join(parts)

def get_dict(item):
    """Assemble one movie's scraped fields into a dict keyed by column name."""
    movies_dict = {}
    movies_dict['index'] = int(get_index(item))
    movies_dict['src'] = get_src(item)
    movies_dict['name'] = get_name(item)
    movies_dict['actor'] = get_actor(item)
    movies_dict['time'] = get_time(item)
    movies_dict['score'] = get_score(item)
    return movies_dict


def write_file(content):
    """Serialize *content* (a dict) to JSON and append it as one line to result.txt.

    The dict is encoded with ensure_ascii=False so Chinese characters stay
    readable; opening the file with an explicit utf-8 encoding is the fix —
    without it the platform default encoding (GBK on Chinese Windows) can
    raise UnicodeEncodeError or produce mojibake.
    """
    encoded = json.dumps(content, ensure_ascii=False)
    with open('result.txt', 'a', encoding='utf-8') as f:
        f.write(encoded + '\n')

def write_to_mysql(content):
    """Insert one movie dict into the movies table via the shared Session."""
    # The scraped star text carries a trailing newline block; keep line one only.
    first_actor_line = content['actor'].split('\n')[0]
    row = Movies(src=content['src'],
                 name=content['name'],
                 actor=first_actor_line,
                 time=content['time'],
                 score=content['score'])
    Session.add(row)
    Session.commit()


def main(index):
    """Scrape one TOP100 page at offset *index* and persist each movie.

    Every <dd> entry is parsed into a dict, then written to MySQL and
    appended to result.txt. Uses the module-level ``url`` template.
    """
    req = requests.get(url.format(str(index)))
    soup = BeautifulSoup(req.text, "html5lib")
    for item in soup.select('dd'):
        movies_dict = get_dict(item)
        write_to_mysql(movies_dict)
        write_file(movies_dict)



# Maoyan TOP100 board URL; pagination is driven by the offset query parameter.
url = 'https://maoyan.com/board/4?offset={}'

# 10 pages x 10 entries per page = TOP100.
for i in range(10):
    main(i*10)

使用selenium爬取空间说说

配置驱动,模拟登陆

from selenium import webdriver
import time

qq = input("请输入qq号")

# Qzone "shuoshuo" (status feed) page for the given QQ number.
ss_url ='https://user.qzone.qq.com/{}/311'.format(qq)

driver = webdriver.Chrome("chromedriver.exe")
driver.maximize_window()

driver.get(ss_url)
# The login form lives in its own iframe; switching into it and clicking
# the avatar triggers quick login for an already-signed-in QQ client.
driver.switch_to.frame('login_frame')
driver.find_element_by_class_name('face').click()

# Loop control: any truthy value starts the while loop below; it is later
# replaced by the "next page" link element on each iteration.
next_page='page'
page=1

抓取说说

while next_page:
    time.sleep(2)  # crude fixed wait for the iframe content to load
    # driver.implicitly_wait(100)
    # The posts are rendered inside the app_canvas_frame iframe.
    driver.switch_to.frame('app_canvas_frame')
    content = driver.find_elements_by_css_selector('.content')
    stime = driver.find_elements_by_css_selector('.c_tx.c_tx3.goDetail')
    print('正在抓取第%s页'%page)
    # Pair each post body with its timestamp positionally via zip.
    for con, sti in zip(content, stime):
        data = {
            'time': sti.text,
            'shuos': con.text
        }
        print(data)
    time.sleep(1)

使用zip构建元组来遍历
使用time.sleep()来等待页面加载(因为隐式等待和显式等待没搞明白,所以用强制等待。。。。)

翻页

    # Find and click the "next page" link, then switch back to the parent
    # frame so the next iteration can locate app_canvas_frame again.
    next_page = driver.find_element_by_link_text('下一页')
    page = page+1
    next_page.click()
    driver.switch_to.parent_frame()

翻页后要使用driver.switch_to.parent_frame()找到上层frame,否则无法定位标签

完整代码

from selenium import webdriver
import time

qq = input("请输入qq号")

# Qzone "shuoshuo" (status feed) page for the given QQ number.
ss_url ='https://user.qzone.qq.com/{}/311'.format(qq)

driver = webdriver.Chrome("chromedriver.exe")
driver.maximize_window()

driver.get(ss_url)
# The login widget lives in its own iframe; clicking the avatar performs
# quick login when a QQ client is already signed in on this machine.
driver.switch_to.frame('login_frame')
driver.find_element_by_class_name('face').click()

page = 1
while True:
    time.sleep(2)  # crude fixed wait for the iframe content to load
    # driver.implicitly_wait(100)
    # Posts render inside the app_canvas_frame iframe.
    driver.switch_to.frame('app_canvas_frame')
    content = driver.find_elements_by_css_selector('.content')
    stime = driver.find_elements_by_css_selector('.c_tx.c_tx3.goDetail')
    print('正在抓取第%s页'%page)
    # Pair each post body with its timestamp positionally.
    for con, sti in zip(content, stime):
        data = {
            'time': sti.text,
            'shuos': con.text
        }
        print(data)
    time.sleep(1)
    # FIX: the original find_element_by_link_text('下一页') raises
    # NoSuchElementException on the last page, so the loop could never end
    # cleanly. find_elements (plural) returns [] instead, letting us break.
    links = driver.find_elements_by_link_text('下一页')
    if not links:
        break
    page = page + 1
    links[0].click()
    # Return to the parent frame so the next iteration can re-enter
    # app_canvas_frame.
    driver.switch_to.parent_frame()

参与评论