使用python3实现ftp服务功能实例(服务端 for linux)

这篇文章主要介绍了python3实现ftp服务功能,服务端 for linux,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

本文实例为大家分享了python3实现ftp服务功能的具体代码,供大家参考,具体内容如下

功能介绍:

可执行的命令:

ls pwd cd put rm get mkdir

1、用户加密认证

2、允许多用户同时登陆

3、每个用户有自己的家目录,且只可以访问自己的家目录

4、允许在自己家目录下随意切换目录

5、允许上传下载文件,且文件一致

6、传输过程中显示进度条

server main 代码:

# author by andy
# _*_ coding:utf-8 _*_
import os, sys, json, hashlib, socketserver, time
# Directory two levels above this file, appended to sys.path
# (presumably so the ``conf`` package imported right below resolves -- verify
# against the project layout).
base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(base_dir)
from conf import userdb_set
class ftp_server(socketserver.BaseRequestHandler):
    """Per-connection handler for a small JSON-command file server.

    Every request is a JSON object whose ``"action"`` key names one of the
    commands listed in ``_commands``; the whole object is passed to the
    method of the same name as its single positional argument.

    NOTE(review): the commands rely on ``os.chdir``, which changes the working
    directory of the whole process.  Under a threading server all connections
    therefore share one current directory; tracking a per-connection directory
    would be needed for truly independent sessions.
    """

    # Home directory of the logged-in user; stays empty until auth() succeeds.
    user_home_dir = ''
    # File that loging() appends to; a class attribute so it can be overridden.
    log_file = '/root/ftp/ftpserver/log/server.log'
    # The only actions handle() will dispatch.  Anything else is ignored so a
    # client cannot invoke arbitrary attributes of the handler.
    _commands = ('auth', 'get', 'cd', 'rm', 'pwd', 'ls', 'put', 'mkdir')

    def _resolve_in_home(self, path):
        """Return the real absolute form of *path* if it is inside the home dir.

        Relative paths are resolved against the current working directory and
        symlinks are followed, so ``..`` tricks and look-alike prefixes
        (``/home/andy2`` vs ``/home/andy``) are rejected.  Returns ``None``
        when no user is logged in or the path escapes the home directory.
        """
        if not self.user_home_dir:
            return None
        home = os.path.realpath(self.user_home_dir)
        target = os.path.realpath(path)
        if os.path.commonpath([home, target]) != home:
            return None
        return target

    def auth(self, *args):
        """Verify the username and password sent by the client.

        Sends ``'0'`` and switches into ``/home/<username>`` on success,
        sends ``'1'`` otherwise.  The password is compared as-is with the
        value stored in the user database.
        """
        cmd_dic = args[0]
        username = cmd_dic["username"]
        password = cmd_dic["password"]
        with open(userdb_set.userdb_set(), 'r') as f:
            user_info = json.load(f)
        home = '/home/%s' % username
        # Membership is tested explicitly so a missing user can never match a
        # null password.
        known_user = username in user_info and password == user_info[username]
        if known_user and os.path.isdir(home):
            os.chdir(home)
            self.user_home_dir = os.getcwd()
            self.request.sendall('0'.encode())
            self.loging("%s login successed" % username)
        else:
            self.request.sendall('1'.encode())
            self.loging("%s login failed" % username)

    def get(self, *args):
        """Send a file to the client.

        Reply codes: ``'0'`` file is ready to get, ``'1'`` file not found
        (also used when the path is outside the home directory).  After
        ``'0'`` the server sends the size, the raw bytes and finally the MD5
        hex digest, waiting for a client reply between the first steps.
        """
        cmd_dic = args[0]
        self.loging(json.dumps(cmd_dic))
        target = self._resolve_in_home(cmd_dic["filename"])
        if target is None or not os.path.isfile(target):
            self.request.sendall('1'.encode('utf-8'))
            return
        self.request.sendall('0'.encode('utf-8'))  # confirm the file exists
        self.request.recv(1024)  # wait for the client's reply (content unused)
        self.request.sendall(str(os.path.getsize(target)).encode('utf-8'))
        self.request.recv(1024)
        m = hashlib.md5()
        with open(target, 'rb') as f:
            # Fixed-size chunks: a binary file may contain no newline at all.
            for chunk in iter(lambda: f.read(8192), b''):
                m.update(chunk)
                self.request.sendall(chunk)
        self.request.sendall(m.hexdigest().encode('utf-8'))
        print('from server:md5 value has been sended!')

    def cd(self, *args):
        """Change the working directory, staying inside the home directory.

        Replies with one of ``'change dir successfully!'``,
        ``'permission denied!'`` or ``'directory not found!'``.
        """
        cmd_dic = args[0]
        self.loging(json.dumps(cmd_dic))
        path = cmd_dic['path']
        if not os.path.isdir(path):
            data = 'directory not found!'
        else:
            target = self._resolve_in_home(path)
            if target is None:
                data = 'permission denied!'
            else:
                os.chdir(target)
                data = 'change dir successfully!'
        self.request.sendall(data.encode("utf-8"))
        self.loging(data)

    def rm(self, *args):
        """Delete a file or directory tree after the client confirms.

        Reply codes: ``'0'`` file exists, please confirm whether to rm;
        ``'1'`` file not found (also used for paths outside the home
        directory and for the home directory itself).  The client confirms
        with ``'0'``; any other reply cancels the deletion.
        """
        import shutil

        cmd_dic = args[0]
        self.loging(json.dumps(cmd_dic))
        filename = cmd_dic['filename']
        target = self._resolve_in_home(filename)
        is_home = target is not None and target == os.path.realpath(self.user_home_dir)
        if target is None or is_home or not os.path.exists(filename):
            self.request.sendall('1'.encode("utf-8"))
            return
        self.request.sendall('0'.encode("utf-8"))  # confirm the file exists
        client_response = self.request.recv(1024).decode()
        if client_response != '0':
            data = 'file %s not deleted!' % filename
        else:
            try:
                # Remove without a shell so the name cannot inject commands.
                if os.path.isdir(filename) and not os.path.islink(filename):
                    shutil.rmtree(filename)
                else:
                    os.remove(filename)
            except OSError as e:
                self.loging(str(e))
                data = 'file %s not deleted!' % filename
            else:
                data = 'file %s has been deleted!' % filename
        self.request.sendall(data.encode("utf-8"))
        self.loging(data)

    def pwd(self, *args):
        """Send the current working directory to the client."""
        cmd_dic = args[0]
        self.loging(json.dumps(cmd_dic))
        self.request.sendall(os.getcwd().encode("utf-8"))

    def ls(self, *args):
        """Send the output of ``ls -l`` for the requested path.

        An empty path lists the current directory.  Paths outside the home
        directory are answered with ``'permission denied!'``.
        """
        import subprocess

        cmd_dic = args[0]
        self.loging(json.dumps(cmd_dic))
        path = cmd_dic['path'] or '.'
        if self._resolve_in_home(path) is None:
            self.request.sendall('permission denied!'.encode("utf-8"))
            return
        # Argument list (no shell) and "--" keep the path from being read as
        # shell syntax or as an option; stderr is merged so that errors reach
        # the client instead of leaving it waiting for a reply.
        result = subprocess.run(
            ['ls', '-l', '--', path],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )
        self.request.sendall(result.stdout)

    def put(self, *args):
        """Receive a file from the client.

        Reply codes: ``'200'`` ready to receive data, ``'210'`` not ready
        (path outside the home directory or the file cannot be opened).  If a
        file of that name already exists the upload is stored with a ``.new``
        suffix.  Exactly ``size`` bytes are read from the connection.
        """
        cmd_dic = args[0]
        self.loging(json.dumps(cmd_dic))
        filename = cmd_dic["filename"]
        filesize = int(cmd_dic["size"])
        target = self._resolve_in_home(filename)
        if target is None:
            self.request.sendall('210'.encode())
            return
        if os.path.isfile(target):
            target += '.new'
        try:
            f = open(target, 'wb')
        except OSError as e:
            self.loging(str(e))
            self.request.sendall('210'.encode())
            return
        with f:
            self.request.sendall('200'.encode())
            receive_size = 0
            while receive_size < filesize:
                # Never read past the file so the next command stays intact.
                data = self.request.recv(min(1024, filesize - receive_size))
                if not data:
                    break  # peer closed the connection before finishing
                f.write(data)
                receive_size += len(data)
        if receive_size < filesize:
            self.loging('file %s upload incomplete: %d of %d bytes'
                        % (filename, receive_size, filesize))
            return
        data = "file %s has been uploaded successfully!" % filename
        self.loging(data)
        print(data)

    def mkdir(self, *args):
        """Create a directory.

        Reply codes: ``'0'`` directory has been made, ``'1'`` directory
        already exists.  ``'1'`` is also sent when the path is outside the
        home directory or creation fails, as the protocol has no other code.
        """
        cmd_dic = args[0]
        self.loging(json.dumps(cmd_dic))
        target = self._resolve_in_home(cmd_dic['dir_name'])
        if target is None or os.path.exists(target):
            self.request.sendall('1'.encode("utf-8"))
            return
        try:
            os.mkdir(target)
        except OSError as e:
            self.loging(str(e))
            self.request.sendall('1'.encode("utf-8"))
        else:
            self.request.sendall('0'.encode("utf-8"))

    def loging(self, data):
        """Append one timestamped line to ``log_file``."""
        localtime = time.asctime(time.localtime(time.time()))
        log_dir = os.path.dirname(self.log_file)
        if log_dir:
            os.makedirs(log_dir, exist_ok=True)
        with open(self.log_file, 'a', encoding='utf-8') as f:
            f.write('%s-->%s\n' % (localtime, data))

    def handle(self):
        """Read JSON commands from the connection and dispatch them.

        The loop ends when the client disconnects, when a command raises, or
        when a command other than ``auth`` arrives before a successful login.
        Unknown actions are logged and skipped without a reply.
        """
        while True:
            try:
                raw = self.request.recv(1024)
                if not raw:
                    break  # client closed the connection
                self.data = raw.decode()
                cmd_dic = json.loads(self.data)
                action = cmd_dic["action"]
                if action not in self._commands:
                    self.loging('unsupported action: %r' % (action,))
                    continue
                if action != 'auth' and not self.user_home_dir:
                    self.loging('%s rejected: not logged in' % action)
                    break
                getattr(self, action)(cmd_dic)
            except Exception as e:
                # Top-level boundary of the connection: record and hang up.
                self.loging(str(e))
                break
def run():
    """Start the threaded server and serve requests until interrupted.

    Binds to all interfaces (0.0.0.0) on port 6969 and handles every
    connection with ``ftp_server``.
    """
    host, port = '0.0.0.0', 6969
    print("the server is started,and listenning at port 6969")
    # The context manager closes the listening socket when serving stops.
    with socketserver.ThreadingTCPServer((host, port), ftp_server) as server:
        server.serve_forever()
# Start the server only when this file is executed as a script.
if __name__ == '__main__':
    run()

设置用户口令代码:

#author by andy
#_*_ coding:utf-8 _*_
import os,json,hashlib,sys
# Directory two levels above this file.
base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# JSON file mapping username -> MD5 hex digest of the password.  Built with
# os.path.join: hard-coded backslashes are not path separators on Linux.
userdb_file = os.path.join(base_dir, 'data', 'userdb')


def userdb_set():
    """Return the path of the user database, creating it first if missing.

    When the file does not exist, usernames and passwords are read
    interactively until ``exit`` is entered at either prompt; the collected
    users are then written to ``userdb_file`` as JSON.

    NOTE(review): passwords are stored as unsalted MD5.  The server compares
    the stored value with what the client sends, so presumably the client
    sends the same digest -- changing the scheme needs a matching client
    change.
    """
    if os.path.isfile(userdb_file):
        return userdb_file
    print('请先为您的服务器创建用户!')
    users = {}
    while True:
        username = input("please input username:")
        if username == 'exit':
            break
        password = input("please input passwod:")
        if password == 'exit':
            break
        # A fresh digest per password: reusing one md5 object across users
        # would hash the concatenation of every password entered so far.
        users[username] = hashlib.md5(password.encode()).hexdigest()
    os.makedirs(os.path.dirname(userdb_file), exist_ok=True)
    with open(userdb_file, 'w') as f:
        json.dump(users, f)
    return userdb_file

目录结构:

以上就是使用python3实现ftp服务功能实例(服务端 for linux)的详细内容,更多请关注 第一php社区 其它相关文章!

Posted in 未分类