用nameko构建微服务
复用代码,有两种方式。一种是做成package
,另一种,就是做成微服务
做成微服务
,少了引包、装包的依赖、配置各个参数的各种麻烦。而且包的实现,对调用者是透明的。
但是我觉的,微服务也不能滥用。适可而止。把握好做package
与做微服务
的界限。但还是:
能做微服务
,就做微服务
。做微服务反带来麻烦的,就果断继续用package
来封装,方便调用。
微服务
我认为,定位还是企业内部用的。不宜开放给第三方。要开放给第三方,还是单做一个服务网关
吧。
比如kong
,做为服务网关
就很好。
微服务
,还是服务于内部代码、业务逻辑复用比较好些。
这样一个定位,我认为,nameko
就是python
的绝佳微服务
实现利器。
服务注册、发现、与代码升级
欣赏nameko
的地方,在于它做的极简。只实现必要的功能,其它放手给开发者做。
微服务
要想好用,离不开服务的注册、发现、代码升级。用nameko
实现起来也很简单。放上我的代码。
下面这段代码,只要每个具体实现的微服务
,继承这个TjBaseService
对象,就会自动有help()
方法。可以直接看这个微服务
都具体提供了哪些rpc
调用。调用dir.list()
,可以直接看到所有的微服务
列表(这块硬编码,因为服务太少,还没必要做成持久化的)。
用法:
$ nameko shell
n.rpc.dir.help() # 看 dir 服务有哪些方法
n.rpc.dir.list() # 看有哪些微服务
n.dispatch_event('dir', 'event_live_update', None) # 微服务代码自动升级
实现代码:
# coding: utf-8
"""
服务治理模块
=============
* 目录管理
"""
from nameko.events import event_handler, BROADCAST
from nameko.rpc import rpc
from services.base import TjBaseService
import logging
log = logging.getLogger(__name__)
from nameko.extensions import ENTRYPOINT_EXTENSIONS_ATTR
def is_decorated(obj):
return inspect.isfunction(obj) and getattr(obj, ENTRYPOINT_EXTENSIONS_ATTR, None)
class TjBaseService(object):
def help(self, method_name=None):
"""获取帮助信息。
>> help() # 获取所有支持的方法列表
>> help('help') # 获取 `help` 方法的帮助
"""
if not method_name:
entrypoint_methods = inspect.getmembers(type(self), is_decorated)
ret = [name for name, method in entrypoint_methods]
print(ret)
return ret
else:
method = getattr(self, method_name, None)
if not method:
return 'No this method'
else:
return getattr(method, '__doc__')
class DirService(TjBaseService):
"""目录管理。
* 服务列表
"""
name = 'dir'
@rpc
def version(self):
return '0.1'
@rpc
def help(self, method_name=None):
"""帮助
"""
return super().help(method_name)
@rpc
def list(self):
return [
'dir', 'tjmd5'
]
@event_handler('dir', 'event_live_update', handler_type=BROADCAST, reliable_delivery=False)
def handle_live_update(self, payload):
"""event: dir.event_live_update。在线更新代码"""
log.info('live update...')
import subprocess, os
cwd = os.path.join(os.path.dirname(__file__), '../../')
subprocess.Popen('git pull && make run&', shell=True, cwd=cwd)
log.info('live update complete.')
@rpc
def server(self):
"""获取服务器信息"""
import platform
return platform.version()
提到微服务
,测试是重中之重
而可测试性,正是nameko
设计的一大重点。
测试框架选用的pytest
。并重点使用了Mock
的特性。
注意服务实现中,相关依赖项(持久存储、第三方服务、其它微服务),要用dependancy
的方式实现。这样才被内置的测试特性支持。
上代码。
注意先把
nameko
官文读一读:https://nameko.readthedocs.io/en/stable/testing.html#philosophy
def test_query(caplog):
caplog.set_level(logging.DEBUG)
service = worker_factory(Tj1Service)
service.cassandra.session.execute.return_value.one.return_value.value = 'testdata1'
ret = service.query('kkk')
assert ret == (0, 'testdata1')
用pytest
的fixture
特性也好用的很。
class MockCassandra(object):
@property
def session(self):
return self
def execute(self, *args, **kwargs):
log.info('execute args: %s, kwargs: %s', args, kwargs)
return self
def one(self, *args, **kwargs):
log.info('one args: %s, kwargs: %s', args, kwargs)
return namedtuple('cassandra', ['value'])('testdata1')
@pytest.fixture()
def mock_cassandra():
return MockCassandra()
def test_query(caplog, mock_cassandra):
caplog.set_level(logging.DEBUG)
service = worker_factory(Tjmd5Service, cassandra=mock_cassandra)
service.cassandra.session.execute.return_value.one.return_value.value = 'testdata1'
ret = service.query('kkk')
assert ret == (0, 'testdata1')