网络编程
位置:首页>> 网络编程>> Python编程>> Python通过zookeeper实现分布式服务代码解析

Python通过zookeeper实现分布式服务代码解析

作者:Mars.wang  发布时间:2021-09-11 11:09:48 

标签:python,zookeeper,分布,服务

借助zookeeper可以实现服务器的注册与发现,有需求的时候调用zookeeper来发现可用的服务器,将任务均匀分配到各个服务器上去.

这样可以方便的随任务的繁重程度对服务器进行弹性扩容,客户端和服务端是非耦合的,也可以随时增加客户端.

zk_server.py


import threading
import json
import socket
import sys
from kazoo.client import KazooClient

# TCP服务端绑定端口开启监听,同时将自己注册到zk
class ZKServer(object):
 def __init__(self, host, port):
   self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
   self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
   self.host = host
   self.port = port
   self.sock.bind((host, port))
   self.zk = None

def serve(self):
   """
   开始服务,每次获取得到一个信息,都新建一个线程处理
   """
   self.sock.listen(128)
   self.register_zk()
   print("开始监听")
   while True:
     conn, addr = self.sock.accept()
     print("建立链接%s" % str(addr))
     t = threading.Thread(target=self.handle, args=(conn, addr))
     t.start()

# 具体的处理逻辑,只要接收到数据就立即投入工作,下次没有数据本次链接结束
 def handle(self, conn, addr):
   while True:
     data=conn.recv(1024)
     if not data or data.decode('utf-8') == 'exit':
       break
     print(data.decode('utf-8'))
   conn.close()
   print('My work is done!!!')

# 将自己注册到zk,临时节点,所以连接不能中断
 def register_zk(self):
   """
   注册到zookeeper
   """
   self.zk = KazooClient(hosts='127.0.0.1:2181')
   self.zk.start()
   self.zk.ensure_path('/rpc') # 创建根节点
   value = json.dumps({'host': self.host, 'port': self.port})
   # 创建服务子节点
   self.zk.create('/rpc/server', value.encode(), ephemeral=True, sequence=True)

if __name__ == '__main__':
 if len(sys.argv) < 3:
   print("usage:python server.py [host] [port]")
   exit(1)
 host = sys.argv[1]
 port = sys.argv[2]
 server = ZKServer(host, int(port))
 server.serve()

zk_client.py


import random
import sys
import time
import json
import socket

from kazoo.client import KazooClient

# 客户端连接zk,并从zk获取可用的服务器列表
class ZKClient(object):
 def __init__(self):
   self._zk = KazooClient(hosts='127.0.0.1:2181')
   self._zk.start()
   self._get_servers()

def _get_servers(self, event=None):
   """
   从zookeeper获取服务器地址信息列表
   """
   servers = self._zk.get_children('/rpc', watch=self._get_servers)
   # print(servers)
   self._servers = []
   for server in servers:
     data = self._zk.get('/rpc/' + server)[0]
     if data:
       addr = json.loads(data.decode())
       self._servers.append(addr)

def _get_server(self):
   """
   随机选出一个可用的服务器
   """
   return random.choice(self._servers)

def get_connection(self):
   """
   提供一个可用的tcp连接
   """
   sock = None
   while True:
     server = self._get_server()
     print('server:%s' % server)
     try:
       sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
       sock.connect((server['host'], server['port']))
     except ConnectionRefusedError:
       time.sleep(1)
       continue
     else:
       break
   return sock
if __name__ == '__main__':
 # 模拟多个客户端批量生成任务,推送给服务器执行
 client = ZKClient()
 for i in range(40):
   sock = client.get_connection()
   sock.send(bytes(str(i), encoding='utf8'))
   sock.close()
   time.sleep(1)

来源:https://www.cnblogs.com/wangbin2188/p/13346079.html

0
投稿

猜你喜欢

手机版 网络编程 asp之家 www.aspxhome.com