master
newrain001 3 years ago
parent 95337cc278
commit 9dd9260a21
  1. README.en.md (36)
  2. api/.keep (0)
  3. api/dingding.py (0)
  4. api/sendmail.py (0)
  5. api/wechat.py (0)
  6. api/wechat3.py (128)
  7. database/mysql.py (53)
  8. library/data.json (20)
  9. library/library.py (125)
  10. op/Transp.py (40)
  11. op/checkfile.py (0)
  12. op/find.py (50)
  13. spider/image.py (56)
  14. spider/img.py (54)
  15. spider/tktest.py (26)
  16. spider/小说.py (16)
  17. spider/小说片段.py (30)
  18. spider/文件下载.py (60)
  19. spider/文件下载2.py (102)
  20. spider/校花网.py (19)

@ -1,36 +0,0 @@
# python-project
#### Description
{**When you're done, you can delete the content in this README and update the file with details for others getting started with your repository**}
#### Software Architecture
Software architecture description
#### Installation
1. xxxx
2. xxxx
3. xxxx
#### Instructions
1. xxxx
2. xxxx
3. xxxx
#### Contribution
1. Fork the repository
2. Create Feat_xxx branch
3. Commit your code
4. Create Pull Request
#### Gitee Feature
1. You can use Readme\_XXX.md to support different languages, such as Readme\_en.md, Readme\_zh.md
2. Gitee blog [blog.gitee.com](https://blog.gitee.com)
3. Explore open source project [https://gitee.com/explore](https://gitee.com/explore)
4. The most valuable open source project [GVP](https://gitee.com/gvp)
5. The manual of Gitee [https://gitee.com/help](https://gitee.com/help)
6. The most popular members [https://gitee.com/gitee-stars/](https://gitee.com/gitee-stars/)

@ -1,65 +1,65 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# by QianFeng.newrain
#
'''
Welcome QianFeng cloud computing
'''
import json
import subprocess
import time

import requests

# Enterprise (corp) ID
CorpID = ''
# Application (agent) ID
Agentid = 1000004
# Secret: the corp ID plus this secret are exchanged for an access token,
# and messages are then posted against that token
Secret = ''
localtime = time.strftime("[%H:%M:%S]", time.localtime())

class Tencent(object):
    def __init__(self, user, title):
        # Collect free memory, disk usage and load average for the message body
        a = subprocess.getoutput("free -h |awk 'NR==2{print $4}'")
        b = subprocess.getoutput("df -Th |awk 'NR==2{print $5}'")
        c = subprocess.getoutput("uptime |awk -F ':' '{print $NF}'")
        msg = a + b + c
        # Format the message: title + content + timestamp
        self.MSG = f'{title}\n{msg}\n{localtime}'
        self.User = user
        self.url = 'https://qyapi.weixin.qq.com'
        self.send_msg = json.dumps({
            "touser": self.User,
            "msgtype": 'text',
            "agentid": Agentid,
            "text": {'content': self.MSG},
            "safe": 0
        })

    # Fetch the access token
    def get_token(self):
        token_url = '%s/cgi-bin/gettoken?corpid=%s&corpsecret=%s' % (self.url, CorpID, Secret)
        r = requests.get(token_url).json()
        return r['access_token']

    # Send the message
    def send_message(self):
        send_url = '%s/cgi-bin/message/send?access_token=%s' % (self.url, self.get_token())
        response = requests.post(url=send_url, data=self.send_msg).json()
        if response['errcode'] == 0:
            print('Successfully')
        else:
            print('Failed')

if __name__ == '__main__':
    # Create the sender object
    send_obj = Tencent('xxxxx', '我就是我,不一样的烟火')
    # Call the send method
    send_obj.send_message()

@ -0,0 +1,53 @@
import pymysql

class MySQLDB():
    def __init__(self, *args,
                 host='localhost',
                 user='root',
                 password=None,
                 port=3306,
                 db='mysql',
                 charset='utf8',
                 ):
        self.conn = pymysql.connect(user=user, host=host, port=port, password=password,
                                    db=db, charset=charset,
                                    cursorclass=pymysql.cursors.DictCursor)
        self.cursor = self.conn.cursor()

    # DML: insert / update / delete; commits on success, rolls back on error
    def DML(self, sql, condition, mode):
        try:
            if mode == 'insert':
                if len(condition) > 1:
                    result = self.cursor.executemany(sql, condition)
                else:
                    result = self.cursor.execute(sql, condition)
                print(f'Insert done, {result} rows affected')
            if mode == 'update':
                result = self.cursor.execute(sql)
                print(f'Update done, {result} rows affected')
            if mode == 'delete':
                result = self.cursor.execute(sql)
                print(f'Delete done, {result} rows affected')
            self.conn.commit()
        except pymysql.MySQLError as e:
            self.conn.rollback()
            print(e)
        finally:
            # note: the connection is closed after every call, so each
            # MySQLDB instance supports exactly one DML/DQL operation
            self.conn.close()

    # DQL: run a query and print the rows (DictCursor yields dicts)
    def DQL(self, sql, size=None):
        try:
            result = self.cursor.execute(sql)
            if size:
                result2 = self.cursor.fetchmany(size)
            else:
                result2 = self.cursor.fetchall()
            for i in result2:
                for v in i.values():
                    print(v, end='\t')
                print()
            print(f'{result} records found in total')
        except pymysql.MySQLError as e:
            print(e)
        finally:
            self.conn.close()

    def __str__(self):
        return 'See https://www.baidu.com for the full manual'

a = MySQLDB(host='39.103.141.138', user='eval', password='123456', db='db1')
a.DQL('select id,name from user')
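The trailing example only exercises DQL. A minimal sketch of the DML path under the same credentials, assuming a hypothetical `user(id, name)` table exists in `db1` (a list of several tuples routes through executemany()):

# Hypothetical usage: insert two rows; a fresh instance is needed per call
# because each method closes the connection in its finally block.
b = MySQLDB(host='39.103.141.138', user='eval', password='123456', db='db1')
b.DML('insert into user (id, name) values (%s, %s)', [(3, 'tom'), (4, 'jerry')], 'insert')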

@ -0,0 +1,20 @@
{
	"user": {
		"haha": "6ca13d52ca70c883e0f0bb101e425a89e8624de51db2d2392593af6a84118090"
	},
	"book": {
		"毛泽东诗集": "2021-03-19 18:56:05.986340",
		"小二郎放牛记": "2021-03-19 18:56:05.986340",
		"变形金刚3": "2021-03-22 16:53:38",
		"变形金刚2": "2021-03-22 16:53:38",
		"变形金刚1": "2021-03-22 17:30:17",
		"白雪公主": "2021-03-22 17:34:36",
		"格林童话": "2021-03-22 17:34:36",
		"人民日报": "2021-03-22 17:34:36",
		"走近科学": "2021-03-22 17:34:36",
		"小猫钓鱼": "2021-03-22 17:50:53"
	},
	"admin": {
		"admin001": "654321"
	}
}
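For reference: `user` maps each account name to the sha256 hex digest of its password, `book` maps in-stock titles to the time they were shelved or returned, and `admin` holds the admin account in plain text. A minimal sketch of how library/library.py derives the stored digest (the password here is a placeholder, not the real one):

import hashlib
# matches Library.hashPass: this hex digest is what data.json stores under "user"
digest = hashlib.sha256('<password>'.encode('utf-8')).hexdigest()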

@ -0,0 +1,125 @@
import json
import time
import hashlib

class Library():
    def __init__(self):
        self.loginStats = 0
        self.loginUser = None
        self.password = None
        self.now = time.strftime('%F %X')

    # sha256-hash a password; the hex digest is what data.json stores
    def hashPass(self, password):
        t = hashlib.sha256(password.encode('utf-8'))
        return t.hexdigest()

    def write_json(self, name, data):
        with open(name, 'w') as f:
            json.dump(data, f, indent='\t', ensure_ascii=False)

    def read_json(self, name):
        with open(name, 'r') as f:
            return json.load(f)

    # Register: persist the user table and create the user's own shelf file
    def regUser(self, u):
        self.write_json('./data.json', u)
        self.write_json(f'./{self.loginUser}', {'regTime': self.now, 'book': {}})

    def login(self):
        data = self.read_json('data.json')
        if self.loginUser in data['user']:
            if data['user'][self.loginUser] == self.hashPass(self.password):
                return True

    # Borrow: move the title from the shared catalogue to the user's shelf
    def borrowBook(self, bookName):
        data = self.read_json('data.json')
        my_data = self.read_json(self.loginUser)
        if bookName in data['book']:
            del data['book'][bookName]
            my_data['book'][bookName] = self.now
            self.write_json('data.json', data)
            self.write_json(self.loginUser, my_data)
            return True

    # Return: move the title from the user's shelf back to the catalogue
    def backBook(self, bookName):
        my_data = self.read_json(self.loginUser)
        data = self.read_json('data.json')
        if bookName in my_data['book']:
            del my_data['book'][bookName]
            data['book'][bookName] = self.now
            self.write_json('data.json', data)
            self.write_json(self.loginUser, my_data)
            return True

    # Shelve a batch of titles, all stamped with the current time
    def pushBook(self, books):
        if books:
            s = {}.fromkeys(books, self.now)
            data = self.read_json('data.json')
            data['book'].update(s)
            self.write_json('data.json', data)

    def main(self):
        try:
            while True:
                num = input('(1) Register\n(2) Log in\n(3) Borrow\n(4) Return\n'
                            '(5) Shelve\n(6) Log out\n(0) Exit\nEnter a number: ')
                if num == '2':
                    if self.loginStats == 1:
                        print('Already logged in')
                        continue
                    self.loginUser, self.password = input('Username: '), input('Password: ')
                    if not self.login():
                        print('Wrong username or password')
                    else:
                        print('Logged in')
                        self.loginStats = 1
                elif num == '1':
                    if self.loginStats == 1:
                        print('Already logged in')
                        continue
                    data = self.read_json('data.json')
                    self.loginUser = input('Username: ')
                    self.password, password = input('Password: '), input('Confirm password: ')
                    if self.loginUser in data['user']:
                        print('User already exists')
                    elif self.password != password:
                        # the original collected the confirmation but never compared it
                        print('Passwords do not match')
                    else:
                        data['user'][self.loginUser] = self.hashPass(self.password)
                        self.regUser(data)
                        print('Account created')
                        self.loginStats = 1
                elif num == '3':
                    if self.loginStats == 0:
                        print('Not logged in')
                        continue
                    print('Catalogue:')
                    for k, v in self.read_json('data.json')['book'].items():
                        print(f'Title: {k}', f'Time: {v}', sep='\t\t')
                    bn = input('Book title: ')
                    if self.borrowBook(bn):
                        print('Borrowed')
                    else:
                        print('Invalid input, please check')
                elif num == '4':
                    if self.loginStats == 0:
                        print('Not logged in')
                        continue
                    if not self.read_json(self.loginUser)['book']:
                        print('No books borrowed')
                        continue
                    print('My shelf:')
                    for k, v in self.read_json(self.loginUser)['book'].items():
                        print(f'Title: {k}', f'Time: {v}', sep='\t\t')
                    bn = input('Book title: ')
                    if self.backBook(bn):
                        print('Returned')
                    else:
                        print('Invalid input, please check')
                elif num == '5':
                    if self.loginUser != 'admin001':
                        print('Admin only; initialize the admin account manually')
                        continue
                    data = self.read_json('data.json')
                    blist = input('Book titles (separate multiple titles with ","): ').split(',')
                    data['book'].update({}.fromkeys(blist, self.now))
                    self.write_json('data.json', data)
                    print('Shelved')
                elif num == '6':
                    if self.loginStats == 0:
                        print('Not logged in')
                        continue
                    print(f'{self.loginUser} logged out')
                    self.loginUser = None
                    self.loginStats = 0  # reset login state
                elif num == '0':
                    break
                else:
                    print('Invalid input, try again')
        except StopIteration as e:
            print(e)
        finally:
            print('Goodbye')

if __name__ == '__main__':
    t = Library()
    t.main()
    # Shelving books as the admin user:
    # t = Library()
    # t.loginUser = 'admin001'
    # t.main()

@ -0,0 +1,40 @@
import paramiko

class sshd:
    def __init__(self, hostname,
                 passwd,
                 username='root',
                 port=22):
        self.hostname = hostname
        self.passwd = passwd
        self.username = username
        self.port = port
        # One Transport shared by the SSH client and the SFTP client
        self.obj = paramiko.Transport((self.hostname, self.port))
        self.obj.connect(username=self.username, password=self.passwd)
        self.ssh = paramiko.SSHClient()
        self.ssh._transport = self.obj
        self.sftp = paramiko.SFTPClient.from_transport(self.obj)

    # Run a remote command; return stdout if any, otherwise stderr
    def op_ssh(self, cmd):
        stdin, stdout, stderr = self.ssh.exec_command(cmd)
        stdout = str(stdout.read().decode())
        stderr = str(stderr.read().decode())
        if stdout:
            return stdout
        else:
            return stderr

    # Upload a local file to the remote host
    def op_ftp_push(self, froms, tos):
        self.sftp.put(froms, tos)
        return True

    # Download a remote file to a local path
    def op_ftp_pull(self, froms, tos):
        self.sftp.get(froms, tos)
        return True

    def close(self):
        self.sftp.close()
        self.obj.close()

    def __str__(self):
        return 'QianFeng cloud computing testing'

if __name__ == '__main__':
    abc = sshd(hostname='127.0.0.1', passwd='123')
    s = abc.op_ssh('df -Th')
    b = abc.op_ftp_pull('/etc/passwd', '/mnt/abc.txt')
    print(s, b)
    abc.close()

@ -0,0 +1,50 @@
# Help the user find files.
# Windows' built-in file search is slow and clumsy, so this is a small home-grown finder:
# 1. The user enters a keyword contained in the file name
# 2. The user may enter a rough location; if omitted, it defaults to / (or C:\ on Windows)
# 3. Report how many files were scanned and how many matched
import os

allfile = []
kwfile = []

def check_exists(dir):
    if os.path.exists(dir):
        return True
    else:
        print('Directory does not exist, using the default')
        global p
        if os.name == 'nt':
            p = 'c:\\'
        elif os.name == 'posix':
            p = '/'

# Helpers
def check_abs(path):
    # resolve to an absolute path, e.g. user input ./test -> cd ./test && pwd
    os.chdir(path)
    return os.getcwd()

kw = input('Keyword to search for [default "network"]: ')
if kw == '':
    kw = 'network'
p = input('Approximate location [default C:\\ or /]: ')
check_exists(p)

def main(path):
    path = check_abs(path)      # switch into the directory and take its absolute path
    dirlist = os.listdir(path)  # list = ls -A ./
    for i in dirlist:           # walk every entry under the directory
        allfile.append(os.path.join(path, i))       # record every entry seen
        if os.path.isdir(os.path.join(path, i)):    # recurse into subdirectories
            main(os.path.join(path, i))
        if kw in i:                                 # keyword match on the entry name
            kwfile.append(os.path.join(path, i))

main(p)
for i in kwfile:
    print(i)
print(f'Searched {len(allfile)} files')
print(f'Found {len(kwfile)} matching files')
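The same scan can be written without manual recursion or the os.chdir side effect using the standard library's os.walk; a minimal equivalent sketch:

import os

def find(root, kw):
    # walk the tree once, recording every entry and every keyword match
    scanned, matched = [], []
    for dirpath, dirnames, filenames in os.walk(root):
        for name in dirnames + filenames:
            full = os.path.join(dirpath, name)
            scanned.append(full)
            if kw in name:
                matched.append(full)
    return scanned, matched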

@ -0,0 +1,56 @@
from urllib import request, parse   # urllib networking modules (python3 merged urllib/urllib2)
import chardet, re, requests        # chardet guesses the page charset (not always accurate)
import logging                      # logging module
import os, sys

class spider():                     # simple image crawler
    def __init__(self, word):
        self.word = word            # keyword for the images to crawl
        self.url = 'https://image.baidu.com/search/index?tn=baiduimage&'  # base url
        logging.basicConfig(filename='message.log', level=logging.INFO,
                            format='%(asctime)s %(message)s',
                            datefmt='%Y-%m-%d %H:%M:%S')  # log file and message format

    def urld(self):
        word = {
            "word": self.word
        }
        self.word = parse.urlencode(word)   # url-encode the keyword
        self.url = self.url + self.word     # append it to the base url
        return self.url

    def data(self, path):                   # the crawl routine
        if not os.path.exists(path):        # bail out if the target path does not exist
            self.logd('Path not found, please check')
            sys.exit(3)
        rsps = request.urlopen(self.url)    # open the url
        if rsps.getcode() == 200:           # check the status code
            html = rsps.read()              # read the html
            code = chardet.detect(html)     # guess the charset
            html = html.decode(code.get('encoding', 'utf-8'))   # decode
            data = re.findall(r'https?://.*?\.jpg', html)       # regex out the image urls
            data = list(set(data))          # deduplicate
            n = 1
            path = path + os.sep
            print(path)
            for i in data:
                d = requests.get(i).content         # fetch the image bytes
                with open(f'{path}{n}.jpg', 'wb') as f:  # write them in binary mode
                    f.write(d)
                self.logd(f'url{n}:ok--{i}')
                print('Crawling...')
                print(f'url{n}:ok--{i}')
                n += 1
        else:
            self.logd('Request failed, check the network connection')
            # sys.exit(4)

    def logd(self, log, level='error'):
        if level == 'error':
            logging.error(log)
        else:
            logging.critical(log)

if __name__ == '__main__':
    path = '/Users/mac/Desktop/a/'
    a = spider('美女')
    a.urld()
    a.data(path=path)

@ -0,0 +1,54 @@
from urllib import request, parse   # urllib networking modules (python3 merged urllib/urllib2)
from config import *                # config file supplies `keyword` and `path`
import chardet, re, requests        # chardet guesses the page charset (not always accurate)
import logging                      # logging module
import os, sys

class spider():                     # simple image crawler
    def __init__(self, word):
        self.word = word            # keyword for the images to crawl
        self.url = 'https://image.baidu.com/search/index?tn=baiduimage&'  # base url
        logging.basicConfig(filename=f'{keyword}.log', level=logging.INFO,
                            format='%(asctime)s %(message)s',
                            datefmt='%Y-%m-%d %H:%M:%S')  # log file and message format

    def urld(self):
        word = {
            "word": self.word
        }
        self.word = parse.urlencode(word)   # url-encode the keyword
        self.url = self.url + self.word     # append it to the base url
        return self.url

    def data(self, path):                   # the crawl routine
        if not os.path.exists(path):        # bail out if the target path does not exist
            self.logd('Path not found, please check')
            sys.exit(3)
        rsps = request.urlopen(self.url)    # open the url
        if rsps.getcode() == 200:           # check the status code
            html = rsps.read()              # read the html
            code = chardet.detect(html)     # guess the charset
            html = html.decode(code.get('encoding', 'utf-8'))   # decode
            data = re.findall(r'https?://.*?\.jpg', html)       # regex out the image urls
            data = list(set(data))          # deduplicate
            n = 1
            for i in data:
                d = requests.get(i).content # fetch the image bytes
                with open(f'{path}{keyword}{n}.jpg', 'wb') as f:  # write them in binary mode
                    f.write(d)
                self.logd(f'url{n}:ok--{i}')
                print('Crawling...')
                print(f'url{n}:ok--{i}')
                n += 1
        else:
            self.logd('Request failed, check the network connection')
            sys.exit(4)

    def logd(self, log, level='error'):
        if level == 'error':
            logging.error(log)
        else:
            logging.critical(log)

if __name__ == '__main__':
    a = spider(keyword)
    a.urld()
    a.data(path=path)

@ -0,0 +1,26 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from tkinter import *
from a import *     # provides the spider class

def test():
    var = entry1.get()
    path = entry2.get()
    b = spider(var)
    b.urld()
    b.data(path=path)

# Build the window
root = Tk()
root.title('python spider')
Label(root, text='Keyword | Path\n').pack()
entry1 = Entry(root, width=20)
entry2 = Entry(root, width=20)
but = Button(root, text='OK', command=test)
# Add the widgets
entry1.pack()
entry2.pack()
but.pack()
# Run the event loop
root.mainloop()

@ -0,0 +1,16 @@
import requests
from lxml import etree
import time

# Crawl 99 chapters, following each page's "下一章" (next chapter) link
next_url = "http://book.zongheng.com/chapter/1128608/66171932.html"
for i in range(1, 100):
    html = requests.get(url=next_url)
    e = etree.HTML(html.content)
    title = e.xpath('//div[@class="title_txtbox"]/text()')[0]
    text = '\n'.join(e.xpath('//p/text()'))
    next_url = e.xpath('//div/a[text()="下一章"]/@href')[0]
    with open('国公凶猛.txt', 'a') as f:
        f.write(title + '\n')
        f.writelines(text)
    print(title, 'downloaded')
    time.sleep(0.5)

@ -0,0 +1,30 @@
import requests, re
import time

# Grab one chapter's paragraphs
def func(url):
    data = requests.get(url)
    data.encoding = 'UTF-8'
    data = data.text
    strd = re.search(r'<p>(.*)</p>', data)
    strd = strd.group().replace('<p>', '----').replace('</p>', '\n')
    return strd

# Extract the "下一章" (next chapter) link
def func2(url):
    data = requests.get(url)
    data.encoding = 'UTF-8'
    data = data.text
    s = re.search(r'(http://book.zongheng.com/chapter.*?\d{8}.html).*?下一章', data)
    return s.group(1)

url = ''
while True:
    time.sleep(5)
    if url == '':
        url = 'http://book.zongheng.com/chapter/557195/27125898.html'
    data = func(url)
    with open('a.txt', 'a+') as f:
        f.write(data)
    url = func2(url)
    print(url)

@ -1,31 +1,31 @@
import requests
import tqdm
import os

def download(url):
    filename = url.split('/')[-1]
    total_size = int(requests.head(url).headers['Content-Length'])
    if os.path.exists(filename):
        file_size = os.path.getsize(filename)
        if file_size < total_size:
            print('Resuming download...')
        elif file_size == total_size:
            print('File already exists')
            exit(0)
    else:
        file_size = 0
    # Ask the server for the remaining byte range only
    header = {'Range': 'bytes=%s-%s' % (file_size, total_size)}
    t = tqdm.tqdm(total=total_size, desc=filename, initial=file_size, unit='B', unit_scale=True)
    result = requests.get(url, headers=header, stream=True)
    with open(filename, 'ab') as f:
        for i in result.iter_content(chunk_size=1024):
            f.write(i)
            t.update(1024)
    t.close()

if __name__ == '__main__':
    url = 'http://mirrors.163.com/centos/8.3.2011/isos/x86_64/CentOS-8.3.2011-x86_64-boot.iso'
    download(url)

@ -1,51 +1,51 @@
from alive_progress import alive_bar
import math
import requests
import os

class Download():
    def __init__(self, urlPath=None):
        self.urlPath = urlPath
        self.filename = urlPath.split('/')[-1]
        self.header = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.90 Safari/537.36'
        }

    def download(self):
        # Request only the bytes that are still missing
        self.header['Range'] = 'bytes=%s-%s' % (self.fileSize, self.totalSize)
        self.result = requests.get(url=self.urlPath, headers=self.header, stream=True)

    def progress(self):
        with alive_bar(total=math.ceil((self.totalSize - self.fileSize) / 1024),
                       title=self.filename, title_length=10, force_tty=True) as bar:
            # append mode so an existing partial file is continued, not truncated
            with open(self.filename, 'ab') as f:
                for i in self.result.iter_content(chunk_size=1024):
                    f.write(i)
                    bar()

    def checkPath(self):
        self.totalSize = int(requests.head(url=self.urlPath, headers=self.header).headers['Content-Length'])
        if os.path.exists(self.filename):
            self.fileSize = os.path.getsize(self.filename)
            if self.fileSize < self.totalSize:
                print(f'Resuming {self.filename}...')
            else:
                print('File already exists')
                return ''
        else:
            self.fileSize = 0

    def run(self):
        # skip the download entirely when the file is already complete
        if self.checkPath() == '':
            return
        self.download()
        self.progress()

if __name__ == '__main__':
    with open('./url.txt', 'r') as f:
        urls = f.read().splitlines()
    for url in urls:
        if not url:
            continue
        s = Download(urlPath=url)
        s.run()

@ -0,0 +1,19 @@
import requests, re
'''
Approach: fetch the page's HTML, regex out the static image
paths, then download each image.
'''
first_url = "http://www.xiaohuar.com/2014.html"             # page to scrape
response = requests.get(first_url)                          # fetch the page
response.encoding = 'GBK'                                   # the site is GBK-encoded
html = response.text                                        # the html source
img_urls = re.findall(r'src="(/d/file/\w+\.jpg)"', html)    # regex out the image paths
img_num = len(img_urls)
for i in range(img_num):                                    # make the paths absolute
    img_urls[i] = "http://www.xiaohuar.com%s" % img_urls[i]
for img_url in img_urls:                                    # download and save each image
    img_file_name = img_url.split('/')[-1]
    img_data = requests.get(img_url).content
    with open(img_file_name, "wb") as f:
        f.write(img_data)
    print(img_url)