菜单
个人主页
(当前)
写文章
浏览博文
    
搜索
登录
微信公众号
站点链接
半根蓝白个人主页
CSDN
Github
友情链接
摘繁华个人博客
博文目录
#custom-toc-container
某投诉网站爬虫-获取对应的投诉信息
BGLB0324
2021年1月31日 18:59
最后发布:2021年1月31日 18:59
首发:2021年1月31日 18:42
780
2
博文分类:
Python
博文标签:
爬虫
版权声明:本文为博主[BGLB0324]原创文章,遵循
CC 4.0 BY
版权协议,转载请附上原文出处链接和本声明。
本文链接:
http://blog.bglb.work/blog/blog-detail/55
版权
# 某投诉网站爬虫-获取对应的投诉信息 > 此爬虫原本是一道面试题,说难不难,说简单也不简单;因为站点的反爬机制,我现在也没摸清楚,不过目前看来应该是你在一段时间内访问数据超过对应的数量就会封ip 5-10分钟,然后就可以正常访问了 ## 爬虫源代码 ```python # -*- coding:utf-8 -*- # @Time : 2020-12-03 13:20 # @Author : BGLB # @Email : bglb@qq.com # @Software : PyCharm import csv import hashlib import json import os import random import threading import time from requests import get """ 题目要求: 1. 用任意一语言,或者Python或者其他熟悉的,写一个爬虫程序,爬取tousu.sina.com.cn网站中 最新投诉的内容,并且输出到一个文本文件或者excel文件。 2. 输出内容: 1.投诉内容 2.投诉对象 3.投诉要求 4.投诉时间 3. 测试时间:测试时间为48小时 4. 提交内容 a.源程序并附上注释 b.输出的文件实例 """ def time_logging(func): """ 记录函数运行时间的装饰器 :param func: 需要记录的函数名 :return: """ def wrapper(*args, **kw): start_time = time.time() func_result = func(*args, **kw) runtime = time.time()-start_time if runtime < 60: runtime = "{:.2f}s".format(runtime) elif runtime < 3600: runtime = "{:.2f}m".format(runtime/60) else: runtime = "{:.2f}h".format(runtime/3600) content = '[{0:^15}] - 运行时间 - [{1:^6}]'.format(func.__name__, runtime) print("{}".format(content)) return func_result return wrapper class SinaTousu(object): def __init__(self, host_str="全国投诉", type_str="最热投诉", count=100): self.host_str = host_str self.type_str = type_str self.__page_size = 30 self.__pages = 1 self.__max_thread_count = 10 self.__current_thread_count = 0 self.__url = self.__create_url() self.count = count if self.count > self.__page_size: self.__pages += int(count/self.__page_size) else: self.__page_size = self.count __data_type_dict = { "最热投诉": 1, "最新投诉": 2, "已回复": 3, "已完成": 4, } __hosts_type_dict = { "湖北投诉": "https://hb.tousu.sina.com.cn", "全国投诉": "https://tousu.sina.com.cn" } # 站点的相关接口 由于只爬取投诉接口 所以没有过多优化 def __create_url(self): """ 构造url """ __api_dict = { "LAWS_FEED": self.__hosts_type_dict[self.host_str]+"/api/laws/feed", "INDEX_FEED": self.__hosts_type_dict[self.host_str]+"/api/index/feed", # 投诉相关的接口 通过js 解析获得 "index_article": self.__hosts_type_dict[self.host_str]+"/api/articles/notice", } return __api_dict["INDEX_FEED"] def forged_param(self, page): """ 构造参数: :param page: 当前抓取页数 :return: """ ts = int(time.time()*1000) key = "$d6eb7ff91ee257475%" rs = "" a = [str(x) for x in range(0, 9)] + \ [chr(x).lower() for x in range(65, 91)] + \ [chr(x) for x in range(65, 91)] for i in range(16): rs += a[random.randint(1, len(a)-1)] a = "".join( sorted([str(ts), rs, key, str(self.__data_type_dict[self.type_str]), str(self.__page_size), str(page)])) signature = hashlib.sha256(a.encode("utf8")).hexdigest() return { "ts": ts, "type": self.__data_type_dict[self.type_str], "page_size": self.__page_size, "page": page, "rs": rs, "signature": signature } @staticmethod def parse_json(jsondata): """ 处理json数据 :param jsondata: :return: """ if jsondata is None: return res_lists = [] host = "https:" for item in jsondata["lists"]: res_item = { "投诉编号": item["main"]["sn"], "投诉对象": item["main"]["cotitle"], "投诉标题": item["main"]["title"], "投诉时间": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(item["main"]["timestamp"]))), "投诉详情": item["main"]["summary"], "投诉要求": item["main"]["appeal"], "详情页面": host+item["main"]["url"], "投诉发起人昵称": item["author"]["title"], "投诉发起人微博": host+item["author"]["wb_profile"], # "投诉发起人性别": item["author"]["gender"] } res_lists.append(res_item) return res_lists def get_json(self, param_dict): res = get(self.__url, param_dict) # print(self.__url) res.encoding = res.apparent_encoding data = res.content.decode() print(res.status_code) if res.status_code == 456: print("您被封啦,请等待5~60分钟自动解封") if res.status_code == 200: result = json.loads(data)["result"] if result["status"]["code"] == 0: return result["data"] print("{}参数错误!检查参数".format(result["status"])) return None @staticmethod def write_file(filename, datas): """ 写入文件 :param filename: 文件名称 :param data: 数据 :return: 写入数据行数 """ ext = filename.split('.')[-1] path = "./"+filename # is_write = os.path.exists(path) with open(path, "w", encoding="utf8", newline='') as f: if ext == "csv": header = [x for x in datas[0].keys()] w = csv.DictWriter(f, fieldnames=header) w.writerows(datas) # 写入数据 if ext == "json": json.dump(datas, f, ensure_ascii=False) print("写入文件-[{}]-[{}]条数据".format(path, len(datas))) return len(datas) @staticmethod def file_walker(path): file_dict = {"json": [], "result": []} for root, dirs, files in os.walk(path): for fn in files: if fn.startswith("result"): file_dict["result"].append(fn) if fn.startswith("jsondata"): file_dict["json"].append(fn) for v in file_dict.values(): if len(v) > 0: v.sort(key=lambda x: int(x.split('.')[0].split('_')[-1])) return file_dict @staticmethod def combine(res_filename, filelist): ext = res_filename.split('.')[-1] resfilepath = "./"+res_filename if os.path.exists(resfilepath): os.remove(resfilepath) if len(filelist) == 0: return with open(res_filename, 'wb+') as fw: if ext == "csv": fw.write("投诉编号,投诉对象,投诉标题,投诉时间,投诉详情,投诉要求,详情页面,投诉发起人昵称,投诉发起人微博\n".encode('utf8')) for file in filelist: file_path = "./"+file fw.write(open(file_path, 'rb').read()) os.remove(file_path) if ext == "json": fw.write("{".encode("utf8")) for file in filelist: file_path = "./"+file fw.write('"result_{}":'.format(file.split('.')[0].split('_')[-1]).encode('utf8')) fw.write(open(file_path, 'rb').read()) if filelist.index(file) is not len(filelist)-1: fw.write(','.encode('utf8')) os.remove(file_path) fw.write("}".encode("utf8")) # raise ValueError("参数错误{}".format(res_filename)) def set_pages_pagesize(self, pages, page_size): self.count = pages*page_size if page_size > 30: self.__page_size = 30 self.__pages = int(self.count/self.__page_size)+1 print("警告:每页最大数据条数为30条,已为您选取最优选择:\n{}".format({"pages": self.__pages, "page_size": self.__page_size})) else: self.__pages = pages self.__page_size = page_size def get_pages_pagesize(self): return {"pages": self.__pages, "page_size": self.__page_size} def __start(self, page, isMultithreading): lock = None if isMultithreading: lock = threading.Lock() lock.acquire() params = self.forged_param(page) data_json = self.get_json(params) if data_json is None: return pages = data_json['pager']['page_amount'] item_count = data_json['pager']['page_amount'] if pages < self.__pages: self.__pages = pages print("----数据总页数-[{}]-数据总条数-[{}]--------".format(pages, item_count)) result_data = self.parse_json(data_json) self.write_file("jsondata_{}.json".format(page), data_json) self.write_file("result_{}.csv".format(page), result_data) if isMultithreading: lock.release() def thread_manage(self, f, kwargs): """ 线程管理函数 :param f: 函数 :return: """ t = threading.Thread(target=f, kwargs=kwargs) self.__current_thread_count += 1 time.sleep(0.1) t.start() if threading.active_count()-2 >= self.__max_thread_count+3: t.join() # if self.__current_thread_count >= self.__max_thread_count: if self.__current_thread_count > 40: self.__current_thread_count = 0 time.sleep(10) def rm_file(self): for file in ["./result.csv", "./jsondata.json"]: if os.path.exists(file): os.remove(file) def run(self, page_list=None, isMultithreading=True): """ 传入需要爬取的页码list :param page_list: 默认值空 :param isMultithreading: 默认多线程 :return: """ self.rm_file() print("------------一共爬取{}条数据,选取最优的爬取速度为 [每次抓取量:{}, 抓取次数:{}]-------------".format( self.count, self.__page_size, self.__pages)) page_count_list = [x for x in range(1, self.__pages+1)] if page_list is not None: page_count_list = page_list random.shuffle(page_count_list) for page in page_count_list: # time.sleep(0.5) # self.__start(page) self.thread_manage(self.__start, kwargs={"page": page,'isMultithreading':isMultithreading}) print( "--------------[{}]-[{}]-当前抓取次数-[{}]------------------".format( self.host_str, self.type_str, page)) while True: if threading.active_count() == 1: fs_dict = self.file_walker("./") self.combine("result.csv", fs_dict["result"]) self.combine("jsondata.json", fs_dict["json"]) print("抓取完毕") break if __name__ == '__main__': q = SinaTousu("全国投诉", "最新投诉") # print(q.get_pages_pagesize()) q.set_pages_pagesize(100, 30) @time_logging def main(): q.run() # q.run(isMultithreading=False) # 关闭多线程 main() """ 时间 - 访问次数 25s - 180 4m - 325 41s - 90 41s - 95 6m - 406 11s - 40 封 """ ``` ## 爬虫解析 1. 网站接口主要的加密方式为 `sha256 ` 主要变量有 时间戳`ts`, `key="$d6eb7ff91ee257475%"` 字符串数组a `[0-9,a-z,A-Z]` 当前访问页码`page`,当前访问每页数据量 `page_size` 访问类型对应的数字 - `_type = ["最热投诉","最新投诉",'已完成', '已回复']` 数组中随机一个字符串`rs` 先来一个空数组 `base_sign = []` 数组中有六个变量 字符串`ts` `rs` `key` `_type_index + 1` `page_size` `page` 然后 把 `base_sign` 升序排列 转为 字符串 最后通过 `sha256` 加密这个字符串 就可以得到 `signature` 的值 2. 网站接口分为 `全国站点` 和 `湖北站点` 目前也只发现这两个站点 3. 每次访问最多可以获取30条数据,网站上每次固定十条数据 ## 代码解析 1. 代码每次运行 会删除上次爬的数据 2. 代码加入了多线程,可以设置关闭, 多线程还有点小问题 会导致反爬 ## 测试图片    大概是访问了 80多次 就封了 有大佬可以请教下这个反爬该怎么避免吗?
点赞
2
打赏
暂时没有评论
请
登录
后评论
暂时没有评论