用nameko构建微服务
复用代码,有两种方式。一种是做成package,另一种,就是做成微服务
做成微服务,少了引包、装包的依赖、配置各个参数的各种麻烦。而且包的实现,对调用者是透明的。
但是我觉的,微服务也不能滥用。适可而止。把握好做package与做微服务的界限。但还是:
能做微服务,就做微服务。做微服务反带来麻烦的,就果断继续用package来封装,方便调用。
微服务我认为,定位还是企业内部用的。不宜开放给第三方。要开放给第三方,还是单做一个服务网关吧。
比如kong,做为服务网关就很好。
微服务,还是服务于内部代码、业务逻辑复用比较好些。
这样一个定位,我认为,nameko就是python的绝佳微服务实现利器。
服务注册、发现、与代码升级
欣赏nameko的地方,在于它做的极简。只实现必要的功能,其它放手给开发者做。
微服务要想好用,离不开服务的注册、发现、代码升级。用nameko实现起来也很简单。放上我的代码。
下面这段代码,只要每个具体实现的微服务,继承这个TjBaseService对象,就会自动有help()方法。可以直接看这个微服务都具体提供了哪些rpc调用。调用dir.list(),可以直接看到所有的微服务列表(这块硬编码,因为服务太少,还没必要做成持久化的)。
用法:
$ nameko shell
n.rpc.dir.help() # 看 dir 服务有哪些方法
n.rpc.dir.list() # 看有哪些微服务
n.dispatch_event('dir', 'event_live_update', None) # 微服务代码自动升级实现代码:
# coding: utf-8
"""
服务治理模块
=============
* 目录管理
"""
from nameko.events import event_handler, BROADCAST
from nameko.rpc import rpc
from services.base import TjBaseService
import logging
log = logging.getLogger(__name__)
from nameko.extensions import ENTRYPOINT_EXTENSIONS_ATTR
def is_decorated(obj):
return inspect.isfunction(obj) and getattr(obj, ENTRYPOINT_EXTENSIONS_ATTR, None)
class TjBaseService(object):
def help(self, method_name=None):
"""获取帮助信息。
>> help() # 获取所有支持的方法列表
>> help('help') # 获取 `help` 方法的帮助
"""
if not method_name:
entrypoint_methods = inspect.getmembers(type(self), is_decorated)
ret = [name for name, method in entrypoint_methods]
print(ret)
return ret
else:
method = getattr(self, method_name, None)
if not method:
return 'No this method'
else:
return getattr(method, '__doc__')
class DirService(TjBaseService):
"""目录管理。
* 服务列表
"""
name = 'dir'
@rpc
def version(self):
return '0.1'
@rpc
def help(self, method_name=None):
"""帮助
"""
return super().help(method_name)
@rpc
def list(self):
return [
'dir', 'tjmd5'
]
@event_handler('dir', 'event_live_update', handler_type=BROADCAST, reliable_delivery=False)
def handle_live_update(self, payload):
"""event: dir.event_live_update。在线更新代码"""
log.info('live update...')
import subprocess, os
cwd = os.path.join(os.path.dirname(__file__), '../../')
subprocess.Popen('git pull && make run&', shell=True, cwd=cwd)
log.info('live update complete.')
@rpc
def server(self):
"""获取服务器信息"""
import platform
return platform.version()提到微服务,测试是重中之重
而可测试性,正是nameko设计的一大重点。
测试框架选用的pytest。并重点使用了Mock的特性。
注意服务实现中,相关依赖项(持久存储、第三方服务、其它微服务),要用dependancy的方式实现。这样才被内置的测试特性支持。
上代码。
注意先把
nameko官文读一读:https://nameko.readthedocs.io/en/stable/testing.html#philosophy
def test_query(caplog):
caplog.set_level(logging.DEBUG)
service = worker_factory(Tj1Service)
service.cassandra.session.execute.return_value.one.return_value.value = 'testdata1'
ret = service.query('kkk')
assert ret == (0, 'testdata1')用pytest的fixture特性也好用的很。
class MockCassandra(object):
@property
def session(self):
return self
def execute(self, *args, **kwargs):
log.info('execute args: %s, kwargs: %s', args, kwargs)
return self
def one(self, *args, **kwargs):
log.info('one args: %s, kwargs: %s', args, kwargs)
return namedtuple('cassandra', ['value'])('testdata1')
@pytest.fixture()
def mock_cassandra():
return MockCassandra()
def test_query(caplog, mock_cassandra):
caplog.set_level(logging.DEBUG)
service = worker_factory(Tjmd5Service, cassandra=mock_cassandra)
service.cassandra.session.execute.return_value.one.return_value.value = 'testdata1'
ret = service.query('kkk')
assert ret == (0, 'testdata1')