Swagger (OpenAPI) による API 定義が鬱陶しく感じるようになってきたので、 gRPC を使った API でシンプルに記述したいと思うようになりました。
要求として Python で不自由なく使えることという事項があったため、現状の Python による単体テストについて調査しまとめました。
公式には広くドキュメント化されていない方法ですので、これから変更が加えられる可能性はあります。
Python 3.6.5 と gRPC 1.13.0 の組み合わせで確認しました。
サンプルプロジェクトはこちら
5 月 31 日更新
Pipenv の場合の説明を追加しました。
7 月 22 日更新
“grpcio-testing” パッケージが PyPI から取得できるようになったため、それと Pipenv の利用を前提とした説明に変更しました。
古いページはこちら から参照できると思います。
目次
TL;DR
準備
$ pipenv install grpcio $ pipenv install --dev grpcio-tools $ pipenv install --dev grpcio-testing
|
サーバー側
https://github.com/grpc/grpc/tree/master/src/python/grpcio_tests/tests/testing/_server_test.py
invoke_?_?()
rpc.termination()
rpc.take_response()
rpc.send_request()
rpc.requests_closed()
クライアント用モック側
https://github.com/grpc/grpc/blob/master/src/python/grpcio_tests/tests/testing/_client_test.py
submit()
take_?_?()
rpc.terminate()
rpc.send_response()
rpc.take_request()
rpc.requests_closed()
Python での gRPC を使った開発の準備
単体テストを作成する前に、説明のため基本的な開発状況まで準備を進めます。
まずは gRPC で必要なパッケージをインストールします。
$ pipenv install grpcio $ pipenv install --dev grpcio-tools
|
続いて、 Protocol Buffer ファイルを記述します。
サンプルプロジェクトでは ‘calculation.proto’ としました。
この中には、 “simple RPC”, “response-streaming RPC”, “request-streaming RPC”, “bidirectionally-streaming RPC” の4種類が含まれています。
そして、クライアントとサーバーを生成します。
$ python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. ./calculator.proto
|
ここから、 ‘server’ フォルダ内では gRPC のサーバーを実装し、 ‘client’ フォルダ内では gRPC のクライアントを利用したライブラリを実装します。
main.py とあるものをそれぞれ実行させると、お互いに通信できます。
単体テストを書く
本体の実装が終わったら、早速それぞれ単体テストを記述していきます。
GitHub の gRPC のリポジトリを探してみると、 src/python/grpcio_tests/tests/testing 以下にテストが存在してます。
これらを参考に、記述していきたいと思います。
まずは単体テストに必要なパッケージを取得します。
$ pipenv install --dev grpcio-testing
|
これにより、 import grpc_testing
と使えるようになります。
テスト対象がどのような RPC の通信をするかを注意して呼び出すものを決める必要があります。
また、ディクショナリを用いて対象を指定したりする場合がありますので、注意が必要です。
以下の説明では、リクエスト、レスポンス共に int32 value
フィールドのみを持ちます。
サーバー実装の単体テスト
src/python/grpcio_tests/tests/testing/_server_test.py を参考にしつつ、実装してみます。
setup
あまりひねったことはありませんが、テスト用にサーバーをセットアップしています。
Simple RPC
calculator_service.pylink6 7
| def Square(self, request, context): return CalculationResponse(value=request.value * request.value)
|
このような Simple RPC 実装に対し、次のようにテストします。
calculator_service_test.pylink27 28 29 30 31 32 33 34 35 36 37 38
| def test_successful_Square(self): request = CalculationRequest(value=2)
rpc = self._real_time_server.invoke_unary_unary( target_service.methods_by_name['Square'], (), request, None)
actual, trailing_metadata, code, details = rpc.termination() expected = CalculationResponse(value=4)
self.assertEqual(expected, actual) self.assertIs(code, grpc.StatusCode.OK)
|
_real_time_server.invoke_unary_unary()
に “Square” の情報を与え、疑似リクエストを送信して RPC オブジェクトを取得します。
rpc.termination()
で擬似的に通信を終了し、疑似クライアントが疑似レスポンスを受け取ります。
- 疑似クライアントが受け取った想定値と結果を比較します。
Response-streaming RPC
calculator_service.pylink9 10 11
| def NaturalNumberGenerator(self, request, context): for i in range(request.value): yield CalculationResponse(value=i + 1)
|
このような Response-streaming RPC 実装に対し、次のようにテストします。
calculator_service_test.pylink40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| def test_successful_NaturalNumberGenerator(self): request = CalculationRequest(value=2)
rpc = self._real_time_server.invoke_unary_stream( target_service.methods_by_name['NaturalNumberGenerator'], (), request, None)
actual = [ rpc.take_response(), rpc.take_response(), ] trailing_metadata, code, details = rpc.termination() expected = [ CalculationResponse(value=1), CalculationResponse(value=2) ]
self.assertEqual(expected[0], actual[0]) self.assertEqual(expected[1], actual[1]) self.assertIs(code, grpc.StatusCode.OK)
|
_real_time_server.invoke_unary_stream()
に “NaturalNumberGenerator” の情報を与え、疑似リクエストを送信して RPC オブジェクトを取得します。
- 想定回数だけ
rpc.take_response()
を行い、疑似クライアントが疑似レスポンスをいくつか受け取ります。過不足なく送られてくることをチェックすると良いと思います。
rpc.termination()
で擬似的に通信を終了し、疑似クライアントが疑似通信結果情報を受け取ります。
- 疑似クライアントが受け取った想定値と結果をそれぞれ比較します。
Request-streaming RPC
calculator_service.pylink13 14 15 16 17
| def Summation(self, request_iterator, context): s = 0 for req in request_iterator: s += req.value return CalculationResponse(value=s)
|
このような Request-streaming RPC 実装に対し、次のようにテストします。
calculator_service_test.pylink61 62 63 64 65 66 67 68 69 70 71 72
| def test_successful_Summation(self): rpc = self._real_time_server.invoke_stream_unary( target_service.methods_by_name['Summation'], (), None) rpc.send_request(CalculationRequest(value=2)) rpc.send_request(CalculationRequest(value=3)) rpc.requests_closed()
actual, trailing_metadata, code, details = rpc.termination() expected = CalculationResponse(value=5)
self.assertEqual(expected, actual) self.assertIs(code, grpc.StatusCode.OK)
|
_real_time_server.invoke_stream_unary()
に “Summation” の情報を与え、疑似的に通信を確立して RPC オブジェクトを取得します。
- テストケースで必要な回数だけ
rpc.send_request()
で擬似リクエストを送信します。
rpc.requests_closed()
で擬似リクエストの終了を宣言します。
rpc.termination()
で擬似的に通信を終了し、疑似クライアントが疑似レスポンスを受け取ります。
- 疑似クライアントが受け取った想定値と結果を比較します。
Bidirectionally-streaming RPC
calculator_service.pylink19 20 21 22 23 24 25 26 27
| def Buffer3Sum(self, request_iterator, context): buffer = [] for req in request_iterator: buffer.append(req.value) if len(buffer) == 3: yield CalculationResponse(value=sum(buffer)) buffer = [] if len(buffer) > 0: yield CalculationResponse(value=sum(buffer))
|
このような Bidirectionally-streaming RPC 実装に対し、次のようにテストします。
calculator_service_test.pylink74 75 76 77 78 79 80 81 82 83 84
| def test_successful_Buffer3Sum(self): rpc = self._real_time_server.invoke_stream_stream( target_service.methods_by_name['Buffer3Sum'], (), None) rpc.send_request(CalculationRequest(value=1)) rpc.send_request(CalculationRequest(value=2)) rpc.send_request(CalculationRequest(value=3))
actual = rpc.take_response() expected = CalculationResponse(value=6)
self.assertEqual(expected, actual)
|
95 96 97 98 99 100 101 102 103 104 105
| rpc.send_request(CalculationRequest(value=7)) rpc.send_request(CalculationRequest(value=8)) rpc.requests_closed()
trailing_metadata, code, details = rpc.termination() actual = rpc.take_response() expected = CalculationResponse(value=15)
self.assertEqual(expected, actual) self.assertIs(code, grpc.StatusCode.OK)
|
_real_time_server.invoke_stream_stream()
に “Buffer3Sum” の情報を与え、疑似的に通信を確立して RPC オブジェクトを取得します。
- テストケースで必要な回数だけ
rpc.send_request()
で擬似リクエストを送信します。
- 同様に、想定回数だけ
rpc.take_response()
で擬似クライアントが疑似レスポンスをいくつか受け取ります。この組み合わせが任意の回数繰り返されます。
rpc.requests_closed()
で擬似リクエストの終了を宣言します。
rpc.termination()
で擬似的に通信を終了し、疑似クライアントが疑似通信結果情報を受け取ります。
- (存在するものの場合、)通信終了時に受け取る疑似レスポンスをいくつか受け取ります。
- 疑似クライアントが受け取った想定値と結果をそれぞれ比較します。
クライアントライブラリの単体テスト
src/python/grpcio_tests/tests/testing/_client_test.py を参考にしつつ、ライブラリの単体テストに合うよう変形して実装してみます。
setup, tearDown
こちらもあまりひねったことはありません。テスト用チャンネルとスレッドプールを用意しています。
Simple RPC
client_lib.pylink9 10
| def square(self, number): return self.stub.Square(CalculationRequest(value=number)).value
|
このような Simple RPC を用いたメソッドに対し、次のようにテストします。
client_lib_test.pylink29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| def test_successful_square(self): arguments = 2
def run(scenario, channel): connector = Connector(channel) return connector.square(scenario)
application_future = self._client_execution_thread_pool.submit( run, arguments, self._real_time_channel) invocation_metadata, actual_request, rpc = ( self._real_time_channel.take_unary_unary( target_service.methods_by_name['Square']))
expected_request = CalculationRequest(value=2)
self.assertEqual(expected_request, actual_request)
rpc.terminate(CalculationResponse(value=4), (), grpc.StatusCode.OK, '')
actual_result = application_future.result() expected_result = 4
self.assertEqual(expected_result, actual_result)
|
- テスト対象のメソッドを利用する関数を定義します。
_client_execution_thread_pool.submit()
により、先ほど定義した処理の結果の Future オブジェクトが得られます。
_real_time_channel.take_unary_unary()
に “Square” の情報を与え、テスト対象から受け取った疑似リクエストと RPC オブジェクトを取得します。
- 疑似サーバーが受け取った疑似リクエストの想定値と結果を比較します。
rpc.terminate()
で擬似的に通信を終了し、テスト対象へ疑似レスポンスを送ります。
- テスト対象の出力の想定値と、 Future オブジェクトから得られる結果を比較します。
Response-streaming RPC
client_lib.pylink12 13 14 15 16 17
| def natural_numbers_lq(self, number): response_iterator = self.stub.NaturalNumberGenerator(CalculationRequest(value=number)) results = [] for response in response_iterator: results.append(response.value) return results
|
このような Response-streaming RPC を用いたメソッドに対し、次のようにテストします。
client_lib_test.pylink54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
| def test_successful_natural_numbers_lq(self): arguments = 2
def run(scenario, channel): connector = Connector(channel) return connector.natural_numbers_lq(scenario)
application_future = self._client_execution_thread_pool.submit( run, arguments, self._fake_time_channel) invocation_metadata, actual_request, rpc = ( self._fake_time_channel.take_unary_stream( target_service.methods_by_name['NaturalNumberGenerator']))
expected_request = CalculationRequest(value=2)
self.assertEqual(expected_request, actual_request)
rpc.send_response(CalculationResponse(value=1)) rpc.send_response(CalculationResponse(value=2))
rpc.terminate((), grpc.StatusCode.OK, '')
actual_result = application_future.result() expected_result = [1, 2]
self.assertListEqual(expected_result, actual_result)
|
- テスト対象のメソッドを利用する関数を定義します。
_client_execution_thread_pool.submit()
により、先ほど定義した処理の結果の Future オブジェクトが得られます。
_real_time_channel.take_unary_stream()
に “NaturalNumberGenerator” の情報を与え、テスト対象から受け取った疑似リクエストと RPC オブジェクトを取得します。
- 疑似サーバーが受け取った疑似リクエストの想定値と結果を比較します。
rpc.send_response()
でテスト対象へ想定される疑似レスポンスをいくつか送ります。
rpc.terminate()
で擬似的に通信を終了し、テスト対象へ疑似通信結果情報を送ります。
- テスト対象の出力の想定値と、 Future オブジェクトから得られる結果を比較します。
Request-streaming RPC
client_lib.pylink19 20 21 22 23 24
| def summation(self, array): def itr(): for number in array: yield CalculationRequest(value=number)
return self.stub.Summation(itr()).value
|
このような Request-streaming RPC を用いたメソッドに対し、次のようにテストします。
client_lib_test.pylink81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
| def test_successful_summation(self): arguments = [2, 3]
def run(scenario, channel): connector = Connector(channel) return connector.summation(scenario)
application_future = self._client_execution_thread_pool.submit( run, arguments, self._real_time_channel) invocation_metadata, rpc = self._real_time_channel.take_stream_unary( target_service.methods_by_name['Summation'])
actual_request1 = rpc.take_request() actual_request2 = rpc.take_request() actual_requests = [actual_request1, actual_request2] expected_requests = [CalculationRequest(value=2), CalculationRequest(value=3)]
self.assertListEqual(expected_requests, actual_requests)
rpc.requests_closed() rpc.terminate(CalculationResponse(value=5), (), grpc.StatusCode.OK, '')
actual_result = application_future.result() expected_result = 5
self.assertEqual(expected_result, actual_result)
|
- テスト対象のメソッドを利用する関数を定義します。
_client_execution_thread_pool.submit()
により、先ほど定義した処理の結果の Future オブジェクトが得られます。
_real_time_channel.take_stream_unary()
に “Summation” の情報を与え、 RPC オブジェクトを取得します。
rpc.take_request()
で、テスト対象からの疑似リクエストを想定回数分だけ受け取ります。過不足なく送られてくることをチェックすると良いと思います。
- 疑似サーバーが受け取った疑似リクエストの想定値と結果をそれぞれ比較します。
rpc.requests_closed()
で擬似リクエストの終了を宣言します。
rpc.terminate()
で擬似的に通信を終了し、テスト対象へ疑似レスポンスを送ります。
- テスト対象の出力の想定値と、 Future オブジェクトから得られる結果を比較します。
Bidirectionally-streaming RPC
calculator_service.pylink26 27 28 29 30 31 32 33 34 35
| def buffer3_sum(self, array): def itr(): for number in array: yield CalculationRequest(value=number)
response_iterator = self.stub.Buffer3Sum(itr()) results = [] for response in response_iterator: results.append(response.value) return results
|
このような Bidirectionally-streaming RPC 実装に対し、次のようにテストします。
client_lib_test.pylink108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
| def test_successful_buffer3_sum(self): arguments = [1, 2, 3, 4, 5, 6, 7, 8]
def run(scenario, channel): connector = Connector(channel) return connector.buffer3_sum(scenario)
application_future = self._client_execution_thread_pool.submit( run, arguments, self._fake_time_channel) invocation_metadata, rpc = self._fake_time_channel.take_stream_stream( target_service.methods_by_name['Buffer3Sum'])
actual_request1 = rpc.take_request() actual_request2 = rpc.take_request() actual_request3 = rpc.take_request() actual_requests = [actual_request1, actual_request2, actual_request3] expected_requests = [CalculationRequest(value=1), CalculationRequest(value=2), CalculationRequest(value=3)]
self.assertListEqual(expected_requests, actual_requests)
rpc.send_response(CalculationResponse(value=6))
|
144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
| actual_request1 = rpc.take_request() actual_request2 = rpc.take_request() actual_requests = [actual_request1, actual_request2] expected_requests = [CalculationRequest(value=7), CalculationRequest(value=8)]
self.assertListEqual(expected_requests, actual_requests)
rpc.requests_closed() rpc.send_response(CalculationResponse(value=15)) rpc.terminate((), grpc.StatusCode.OK, '')
actual_result = application_future.result() expected_result = [6, 15, 15]
self.assertListEqual(expected_result, actual_result)
|
- テスト対象のメソッドを利用する関数を定義します。
_client_execution_thread_pool.submit()
により、先ほど定義した処理の結果の Future オブジェクトが得られます。
_fake_time_channel.take_stream_stream()
に “Buffer3Sum” の情報を与え、 RPC オブジェクトを取得します。
rpc.take_request()
で、テスト対象からの疑似リクエストを想定回数分だけ受け取ります。
- 疑似サーバーが受け取った疑似リクエストの想定値と結果をそれぞれ比較します。この組み合わせが任意の回数繰り返されます。
rpc.requests_closed()
で擬似リクエストの終了を宣言します。
rpc.terminate()
で擬似的に通信を終了し、テスト対象へ疑似通信結果情報(存在すれば疑似レスポンスもいくつか)を送ります。
- テスト対象の出力の想定値と、 Future オブジェクトから得られる結果を比較します。
まとめ
Python から gRPC を使う際の、単体テスト方法を取り上げました。
必要となるパッケージの取得後、サーバー向けでは invoke_?_?()
、 rpc.termination()
、 rpc.take_response()
、 rpc.send_request()
、 rpc.requests_closed()
を、クライアントのライブラリ向けモックとしては submit()
、 take_?_?()
、 rpc.terminate()
、 rpc.send_response()
、 rpc.take_request()
、 rpc.requests_closed()
などを使って記述できます。
ここからは大きくは変わらないと思いますが、テスト方法のドキュメントがしっかりするまでは注視が必要です。