借助zookeeper可以实现服务器的注册与发现,有需求的时候调用zookeeper来发现可用的服务器,将任务均匀分配到各个服务器上去.
这样可以方便的随任务的繁重程度对服务器进行弹性扩容,客户端和服务端是非耦合的,也可以随时增加客户端.
zk_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
|
import threading import json import socket import sys from kazoo.client import KazooClient # TCP服务端绑定端口开启监听,同时将自己注册到zk class ZKServer( object ): def __init__( self , host, port): self .sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self .sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1 ) self .host = host self .port = port self .sock.bind((host, port)) self .zk = None def serve( self ): """ 开始服务,每次获取得到一个信息,都新建一个线程处理 """ self .sock.listen( 128 ) self .register_zk() print ( "开始监听" ) while True : conn, addr = self .sock.accept() print ( "建立链接%s" % str (addr)) t = threading.Thread(target = self .handle, args = (conn, addr)) t.start() # 具体的处理逻辑,只要接收到数据就立即投入工作,下次没有数据本次链接结束 def handle( self , conn, addr): while True : data = conn.recv( 1024 ) if not data or data.decode( 'utf-8' ) = = 'exit' : break print (data.decode( 'utf-8' )) conn.close() print ( 'My work is done!!!' ) # 将自己注册到zk,临时节点,所以连接不能中断 def register_zk( self ): """ 注册到zookeeper """ self .zk = KazooClient(hosts = '127.0.0.1:2181' ) self .zk.start() self .zk.ensure_path( '/rpc' ) # 创建根节点 value = json.dumps({ 'host' : self .host, 'port' : self .port}) # 创建服务子节点 self .zk.create( '/rpc/server' , value.encode(), ephemeral = True , sequence = True ) if __name__ = = '__main__' : if len (sys.argv) < 3 : print ( "usage:python server.py [host] [port]" ) exit( 1 ) host = sys.argv[ 1 ] port = sys.argv[ 2 ] server = ZKServer(host, int (port)) server.serve() |
zk_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
|
import random import sys import time import json import socket from kazoo.client import KazooClient # 客户端连接zk,并从zk获取可用的服务器列表 class ZKClient( object ): def __init__( self ): self ._zk = KazooClient(hosts = '127.0.0.1:2181' ) self ._zk.start() self ._get_servers() def _get_servers( self , event = None ): """ 从zookeeper获取服务器地址信息列表 """ servers = self ._zk.get_children( '/rpc' , watch = self ._get_servers) # print(servers) self ._servers = [] for server in servers: data = self ._zk.get( '/rpc/' + server)[ 0 ] if data: addr = json.loads(data.decode()) self ._servers.append(addr) def _get_server( self ): """ 随机选出一个可用的服务器 """ return random.choice( self ._servers) def get_connection( self ): """ 提供一个可用的tcp连接 """ sock = None while True : server = self ._get_server() print ( 'server:%s' % server) try : sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect((server[ 'host' ], server[ 'port' ])) except ConnectionRefusedError: time.sleep( 1 ) continue else : break return sock if __name__ = = '__main__' : # 模拟多个客户端批量生成任务,推送给服务器执行 client = ZKClient() for i in range ( 40 ): sock = client.get_connection() sock.send(bytes( str (i), encoding = 'utf8' )) sock.close() time.sleep( 1 ) |
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:https://www.cnblogs.com/wangbin2188/p/13346079.html