本文实例讲述了python实现的文件同步服务器。分享给大家供大家参考。具体实现方法如下:
服务端使用asyncore,收到文件后保存到本地。
客户端使用pyinotify监视目录的变化,把变动的文件发送到服务端。
重点:
1. 使用struct模块打包发送文件的信息,服务端收到后,根据文件信息来接收客户端传送过来的文件。
2. 客户端使用多线程,pyinotify监视到文件变化,放到队列中,由另外一个线程发送。
上代码:
服务端:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
|
#!/usr/bin/python
# coding: utf-8
# Receive files from sync clients and store them on local disk, using asyncore.
#
# Wire protocol, one message per file: a fixed-size header packed with
# HEAD_PACKET_FORMAT (directory-name length, file-name length, directory name
# and file name as NUL-padded 128-byte fields, file size), followed by exactly
# "file size" bytes of file content.
import asyncore
import errno
import fcntl
import logging
import os
import socket
import struct
import sys
import threading
import time

from rrd_graph import MakeGraph

try:
    import rrdtool
except ImportError:
    print("Hope this information can help you:")
    print("Can not find rrdtool module in sys path, "
          "just run [apt-get install python-rrdtool] in ubuntu.")
    sys.exit(1)

# Header layout shared with the client; both sides must use the same format.
HEAD_PACKET_FORMAT = "!LL128s128sL"
HEAD_PACKET_SIZE = struct.calcsize(HEAD_PACKET_FORMAT)

# Directory under which received files are stored (relative to the cwd).
NAGIOS_PERFDATA = '../perfdata'

# errno values meaning "no data right now, try again on the next loop pass".
_RETRY_ERRNOS = (errno.EWOULDBLOCK, errno.EAGAIN)


def _decode_field(raw):
    """Return a header name field as a native ``str``.

    On Python 2 the field already is a ``str``; a ``bytes`` value is decoded
    as UTF-8 (raises ``UnicodeError`` when it is not valid UTF-8).
    """
    if isinstance(raw, str):
        return raw
    return raw.decode('utf-8')


def _is_safe_component(name, allow_empty=False):
    """Tell whether *name* is a single, harmless path component.

    The names come from the network, so anything that could escape
    NAGIOS_PERFDATA ('..', a path separator) or break ``open`` (a NUL byte)
    is refused.  An empty name is accepted only when *allow_empty* is true.
    """
    if not name:
        return allow_empty
    if name in ('.', '..') or '\0' in name:
        return False
    return os.path.basename(name) == name


class RequestHandler(asyncore.dispatcher):
    """Receive files from one client connection and write them to disk.

    The handler is a small state machine driven by ``handle_read``: while no
    output file is open it collects header bytes; once a full header has been
    parsed it streams the announced number of body bytes into the file.  It
    never sleeps or loops waiting for data, so a slow client cannot stall the
    other connections served by the same asyncore loop.
    """

    def __init__(self, sock, map=None, chunk_size=1024):
        """Wrap the accepted socket *sock*.

        *map* is passed through to ``asyncore.dispatcher``; *chunk_size* is
        the largest number of bytes read or sent per call.
        """
        self.logger = logging.getLogger(
            '%s-%s' % (self.__class__.__name__, str(sock.getsockname())))
        self.chunk_size = chunk_size
        self.data_to_write = list()
        self.fd = None          # output file of the transfer in progress
        self.writen_size = 0    # body bytes written for the current file
        self._header = b""      # header bytes received so far
        self._remaining = 0     # body bytes still expected
        asyncore.dispatcher.__init__(self, sock, map)

    def readable(self):
        """Always ask the loop for read events."""
        return True

    def writable(self):
        """Ask for write events while connecting or while output is queued."""
        return (not self.connected) or bool(self.data_to_write)

    def handle_write(self):
        """Send at most ``chunk_size`` bytes of the most recently queued data."""
        if not self.data_to_write:
            return
        data = self.data_to_write.pop()
        sent = self.send(data[:self.chunk_size])
        if sent < len(data):
            # Re-queue whatever the socket did not take.
            self.data_to_write.append(data[sent:])

    def handle_read(self):
        """Consume whatever is readable now: header bytes or body bytes."""
        if self.fd is None:
            self._read_header()
        else:
            self._read_body()

    def handle_close(self):
        """Close the output file (if any) and the connection."""
        if self.fd is not None:
            if self._remaining:
                self.logger.warning(
                    "connection closed with %s bytes missing", self._remaining)
            self.fd.close()
            self.fd = None
        self.close()

    def _recv_available(self, size):
        """Return up to *size* bytes, or an empty string if none are ready.

        ``asyncore.dispatcher.recv`` itself calls ``handle_close`` when the
        peer has closed the connection and then returns an empty string.
        """
        try:
            return self.recv(size)
        except socket.error as exc:
            if exc.args[0] in _RETRY_ERRNOS:
                return b""
            raise

    def _reject(self, reason):
        """Log *reason* and drop the connection."""
        self.logger.error("dropping connection: %s", reason)
        self.handle_close()

    def _read_header(self):
        """Accumulate header bytes; open the target file once it is complete."""
        chunk = self._recv_available(HEAD_PACKET_SIZE - len(self._header))
        if not chunk:
            return
        self._header += chunk
        if len(self._header) < HEAD_PACKET_SIZE:
            return  # header split across reads; wait for the rest

        packet, self._header = self._header, b""
        (filepath_len, filename_len,
         filepath, filename, filesize) = struct.unpack(HEAD_PACKET_FORMAT, packet)
        try:
            dirname = _decode_field(filepath[:filepath_len])
            basename = _decode_field(filename[:filename_len])
        except UnicodeError:
            self._reject("header names are not valid UTF-8")
            return
        if not (_is_safe_component(dirname, allow_empty=True)
                and _is_safe_component(basename)):
            self._reject("unsafe path in header: %r/%r" % (dirname, basename))
            return

        target_dir = os.path.join(NAGIOS_PERFDATA, dirname)
        self.logger.debug("update file: %s/%s", target_dir, basename)
        try:
            if not os.path.isdir(target_dir):
                os.makedirs(target_dir)
            # Binary mode: the payload is written exactly as received.
            self.fd = open(os.path.join(target_dir, basename), 'wb')
        except (IOError, OSError) as exc:
            self._reject("cannot open target file: %s" % exc)
            return

        self.writen_size = 0
        self._remaining = filesize
        if self._remaining == 0:
            self._finish_file()  # empty file: there is no body to wait for

    def _read_body(self):
        """Write the next piece of the file body; finish when it is complete."""
        chunk = self._recv_available(min(self.chunk_size, self._remaining))
        if not chunk:
            return
        self.fd.write(chunk)
        self.writen_size += len(chunk)
        self._remaining -= len(chunk)
        if self._remaining == 0:
            self._finish_file()

    def _finish_file(self):
        """Close the completed file and go back to waiting for a header."""
        self.fd.close()
        self.fd = None
        self.logger.debug("File size: %s" % self.writen_size)


class SyncServer(asyncore.dispatcher):
    """Listening socket that creates a RequestHandler per incoming connection."""

    def __init__(self, host, port):
        """Bind to (*host*, *port*) and start listening."""
        asyncore.dispatcher.__init__(self)
        self.debug = True
        self.logger = logging.getLogger(self.__class__.__name__)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        self.bind((host, port))
        self.listen(2000)

    def handle_accept(self):
        """Accept a pending connection and hand it to a RequestHandler."""
        client_socket = self.accept()
        if client_socket is None:
            return
        sock, _addr = client_socket
        # The handler registers itself in the asyncore socket map.
        RequestHandler(sock=sock)


class RunServer(threading.Thread):
    """Non-daemon thread that runs the asyncore loop for the sync server."""

    def __init__(self):
        super(RunServer, self).__init__()
        self.daemon = False

    def run(self):
        """Listen on all interfaces, port 9999, and serve until the loop ends."""
        server = SyncServer('', 9999)
        asyncore.loop(use_poll=True)


def StartServer():
    """Configure debug logging and start the server thread."""
    logging.basicConfig(level=logging.DEBUG,
                        format='%(name)s: %(message)s',
                        )
    RunServer().start()
    # MakeGraph().start()  # left disabled, as in the original listing


if __name__ == '__main__':
    StartServer()
客户端:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
|
# Monitor a path with inotify (pyinotify) and send changed files to the
# remote sync server.
# sendfile(2) is used instead of socket send when python-sendfile is installed.
import socket
import time
import os
import sys
import struct
import threading

try:
    import Queue
except ImportError:  # the module is named "queue" on Python 3
    import queue as Queue

try:
    import pyinotify
except ImportError:
    print("Hope this information can help you:")
    print("Can not find pyinotify module in sys path, "
          "just run [apt-get install python-pyinotify] in ubuntu.")
    sys.exit(1)

try:
    from sendfile import sendfile
except ImportError:
    # Optional dependency: fall back to read()/sendall() when it is missing.
    sendfile = None

# Header layout shared with the server; both sides must use the same format.
HEAD_PACKET_FORMAT = "!LL128s128sL"
# Size of each name field in the header ("128s").
NAME_FIELD_SIZE = 128

# Only files whose name ends with one of these suffixes are synced.
filetype_filter = [".rrd", ".xml"]


def check_filetype(pathname):
    """Return True when *pathname* ends with a suffix in ``filetype_filter``."""
    return pathname.endswith(tuple(filetype_filter))


def _to_bytes(text):
    """Return *text* as a byte string (UTF-8 encoded if it is not one yet)."""
    if isinstance(text, bytes):
        return text
    return text.encode('utf-8')


class sync_file(threading.Thread):
    """Worker thread: take inotify events from a queue and upload the files."""

    def __init__(self, addr, events_queue):
        """*addr* is the server (host, port); *events_queue* yields the events."""
        super(sync_file, self).__init__()
        self.daemon = False
        self.queue = events_queue
        self.addr = addr
        self.chunk_size = 1024

    def run(self):
        """Process events forever; a failed upload is reported, not fatal."""
        while 1:
            event = self.queue.get()
            if not check_filetype(event.pathname):
                continue
            print("%s %s %s" % (time.asctime(), event.maskname, event.pathname))
            try:
                self._send_file(event)
            except (EnvironmentError, ValueError, struct.error) as exc:
                # Keep the worker alive: the file may have vanished, the
                # server may be down, or the name may not fit the header.
                print("Failed to sync %s: %s" % (event.pathname, exc))

    def _send_file(self, event):
        """Send the header and the content of the file named by *event*.

        Raises ValueError when a name does not fit the 128-byte header field,
        and lets I/O and socket errors propagate to the caller.
        """
        # Only the last directory component is sent.
        filepath = _to_bytes(event.path.split('/')[-1])
        filename = _to_bytes(event.name)
        if len(filepath) > NAME_FIELD_SIZE or len(filename) > NAME_FIELD_SIZE:
            # struct would silently truncate the name while the length field
            # still announced the full length.
            raise ValueError("name longer than %d bytes" % NAME_FIELD_SIZE)

        with open(event.pathname, 'rb') as fd:
            # Size taken from the open file, so header and payload agree.
            filesize = os.fstat(fd.fileno()).st_size
            header = struct.pack(HEAD_PACKET_FORMAT, len(filepath),
                                 len(filename), filepath, filename, filesize)
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                sock.connect(self.addr)
                sock.sendall(header)
                if sendfile is not None:
                    self._send_with_sendfile(sock, fd, filesize)
                else:
                    self._send_with_read(sock, fd, filesize)
            finally:
                sock.close()

    def _send_with_sendfile(self, sock, fd, filesize):
        """Copy up to *filesize* bytes from *fd* to *sock* using sendfile."""
        offset = 0
        while offset < filesize:
            count = min(self.chunk_size, filesize - offset)
            sent = sendfile(sock.fileno(), fd.fileno(), offset, count)
            if sent == 0:
                break  # file is shorter than announced
            offset += sent

    def _send_with_read(self, sock, fd, filesize):
        """Copy up to *filesize* bytes from *fd* to *sock* with read/sendall."""
        remaining = filesize
        while remaining > 0:
            data = fd.read(min(self.chunk_size, remaining))
            if not data:
                break  # file is shorter than announced
            # sendall: a plain send() may transmit only part of the chunk.
            sock.sendall(data)
            remaining -= len(data)


class EventHandler(pyinotify.ProcessEvent):
    """Forward IN_CLOSE_WRITE and IN_MOVED_TO events to the worker queue."""

    def __init__(self, events_queue):
        super(EventHandler, self).__init__()
        self.events_queue = events_queue

    def my_init(self):
        pass

    def process_IN_CLOSE_WRITE(self, event):
        self.events_queue.put(event)

    def process_IN_MOVED_TO(self, event):
        self.events_queue.put(event)


def start_notify(path, mask, sync_server, workers=500):
    """Start the upload workers and watch *path* recursively for *mask* events.

    *sync_server* is the (host, port) of the server; *workers* is the number
    of ``sync_file`` threads started (500, as before, by default).  The call
    blocks in the pyinotify notifier loop.
    """
    events_queue = Queue.Queue()
    sync_thread_pool = [sync_file(sync_server, events_queue)
                        for _ in range(workers)]
    for worker in sync_thread_pool:
        worker.start()
    wm = pyinotify.WatchManager()
    notifier = pyinotify.Notifier(wm, EventHandler(events_queue))
    wm.add_watch(path, mask, rec=True)
    notifier.loop()


def do_notify():
    """Watch the pnp4nagios perfdata directory and sync to 127.0.0.1:9999."""
    perfdata_path = '/var/lib/pnp4nagios/perfdata'
    mask = pyinotify.IN_CLOSE_WRITE | pyinotify.IN_MOVED_TO
    sync_server = ('127.0.0.1', 9999)
    start_notify(perfdata_path, mask, sync_server)


if __name__ == '__main__':
    do_notify()
python监视线程池
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
#!/usr/bin/python
# Watchdog thread that restarts worker threads which have died.
import threading
import time


class Monitor(threading.Thread):
    """Keep several thread pools at their original size.

    Each keyword argument names a pool; its value is a tuple whose first
    element is the list of running threads and whose remaining elements are
    the positional arguments used to construct a replacement thread, e.g.
    ``Monitor(workers=(worker_list, queue))``.
    """

    def __init__(self, *args, **kwargs):
        super(Monitor, self).__init__()
        self.daemon = False
        self.args = args
        self.kwargs = kwargs
        # One single-entry dict {pool name: list of threads} per pool.
        self.pool_list = []

    def run(self):
        """Every 0.5 s, replace the dead threads of every pool."""
        print(self.args)
        print(self.kwargs)
        # Remember each pool's thread class up front: the pool may be empty
        # by the time a replacement has to be built.
        thread_classes = {}
        for name, value in self.kwargs.items():
            pool = value[0]
            self.pool_list.append({name: pool})
            thread_classes[name] = pool[0].__class__ if pool else None
        while 1:
            print(self.pool_list)
            for name, value in self.kwargs.items():
                parameters = value[1:]
                died_threads = self.cal_died_thread(self.pool_list, name)
                print("died_threads %s" % died_threads)
                # A healthy pool (0 dead) is skipped; the remaining pools
                # are still checked.
                for _ in range(died_threads):
                    print("start %s thread..." % name)
                    t = thread_classes[name](*parameters)
                    t.start()
                    self.add_to_pool_list(t, name)
            time.sleep(0.5)

    def cal_died_thread(self, pool_list, name):
        """Drop the dead threads of pool *name* and return how many there were.

        *pool_list* is a list of ``{name: [threads]}`` dicts.  The thread
        lists are updated in place, so other references see the removal.
        """
        died = 0
        for item in pool_list:
            threads = item.get(name)
            if threads is None:
                continue
            alive = [t for t in threads if t.is_alive()]
            died += len(threads) - len(alive)
            # Slice assignment instead of remove() during iteration, which
            # would skip the element following each removed one.
            threads[:] = alive
        return died

    def add_to_pool_list(self, obj, name):
        """Append thread *obj* to every pool registered under *name*."""
        for item in self.pool_list:
            for key, threads in item.items():
                if key == name:
                    threads.append(obj)

    def remove_from_pool_list(self, obj):
        """Remove *obj* from the first pool containing it; no-op if absent."""
        for item in self.pool_list:
            for threads in item.values():
                try:
                    threads.remove(obj)
                except ValueError:
                    continue
                return
使用方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
# Queue passed to every worker thread and to the monitor.
rrds_queue = Queue.Queue()

# Five MakeRrds threads, each constructed with the shared queue.
make_rrds_pool = [MakeRrds(rrds_queue) for _ in range(5)]
for worker in make_rrds_pool:
    worker.start()

# Five MakeGraph threads, each constructed with the shared queue.
make_graph_pool = [MakeGraph(rrds_queue) for _ in range(5)]
for worker in make_graph_pool:
    worker.start()

# Each keyword is a pool name; its value is (thread list, constructor args...).
monitor = Monitor(
    make_rrds_pool=(make_rrds_pool, rrds_queue),
    make_graph_pool=(make_graph_pool, rrds_queue),
)
monitor.start()
解析:
1. 接受字典参数,value为一个元组,第一个元素是线程池,后面的都是参数。
2. 每0.5秒监视线程池中的线程数量,如果线程死掉了,记录死掉线程的数目,再启动同样数量的线程。
3. 如果没有线程死去,则什么也不做。
从外部调用Django模块
1
2
3
4
5
6
7
8
|
import os
import sys

# Put the Django project directory first on the import path so that the
# project's modules can be imported from this external script.
sys.path.insert(0, '/data/cloud_manage')

from django.core.management import setup_environ
import settings  # presumably /data/cloud_manage/settings.py -- verify

# Configure Django with the project settings before importing anything that
# needs them.
# NOTE(review): setup_environ was removed in later Django releases; there the
# equivalent is setting DJANGO_SETTINGS_MODULE and calling django.setup() --
# confirm against the Django version in use.
setup_environ(settings)

from common.monitor import Monitor
from django.db import connection
from django.db import transaction
前提就是,要新建一个django的project,这里我们新建了一个cloud_manage.
这样不仅可以调用django自身的模块,还能调用project本身的东西。
希望本文所述对大家的Python程序设计有所帮助。