Python で gRPC の単体テスト

Swagger (OpenAPI) による API 定義が鬱陶しく感じるようになってきたので、 gRPC を使った API でシンプルに記述したいと思うようになりました。
要求として Python で不自由なく使えることという事項があったため、現状の Python による単体テストについて調査しまとめました。

公式には広くドキュメント化されていない方法ですので、これから変更が加えられる可能性はあります。

Python 3.6.5 と gRPC 1.13.0 の組み合わせで確認しました。

サンプルプロジェクトはこちら

5 月 31 日更新
Pipenv の場合の説明を追加しました。

7 月 22 日更新
“grpcio-testing” パッケージが PyPI から取得できるようになったため、それと Pipenv の利用を前提とした説明に変更しました。
古いページはこちら から参照できると思います。

目次

TL;DR

準備

$ pipenv install grpcio
$ pipenv install --dev grpcio-tools
$ pipenv install --dev grpcio-testing

サーバー側

https://github.com/grpc/grpc/tree/master/src/python/grpcio_tests/tests/testing/_server_test.py

  • invoke_?_?()
  • rpc.termination()
  • rpc.take_response()
  • rpc.send_request()
  • rpc.requests_closed()

クライアント用モック側

https://github.com/grpc/grpc/blob/master/src/python/grpcio_tests/tests/testing/_client_test.py

  • submit()
  • take_?_?()
  • rpc.terminate()
  • rpc.send_response()
  • rpc.take_request()
  • rpc.requests_closed()

Python での gRPC を使った開発の準備

単体テストを作成する前に、説明のため基本的な開発状況まで準備を進めます。
まずは gRPC で必要なパッケージをインストールします。

$ pipenv install grpcio
$ pipenv install --dev grpcio-tools

続いて、 Protocol Buffer ファイルを記述します。
サンプルプロジェクトでは ‘calculation.proto’ としました。
この中には、 “simple RPC”, “response-streaming RPC”, “request-streaming RPC”, “bidirectionally-streaming RPC” の4種類が含まれています。

そして、クライアントとサーバーを生成します。

$ python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. ./calculator.proto

ここから、 ‘server’ フォルダ内では gRPC のサーバーを実装し、 ‘client’ フォルダ内では gRPC のクライアントを利用したライブラリを実装します。
main.py とあるものをそれぞれ実行させると、お互いに通信できます。

単体テストを書く

本体の実装が終わったら、早速それぞれ単体テストを記述していきます。

GitHub の gRPC のリポジトリを探してみると、 src/python/grpcio_tests/tests/testing 以下にテストが存在してます。
これらを参考に、記述していきたいと思います。

まずは単体テストに必要なパッケージを取得します。

$ pipenv install --dev grpcio-testing

これにより、 import grpc_testing と使えるようになります。

テスト対象がどのような RPC の通信をするかを注意して呼び出すものを決める必要があります。
また、ディクショナリを用いて対象を指定したりする場合がありますので、注意が必要です。

以下の説明では、リクエスト、レスポンス共に int32 value フィールドのみを持ちます。

サーバー実装の単体テスト

src/python/grpcio_tests/tests/testing/_server_test.py を参考にしつつ、実装してみます。

setup

あまりひねったことはありませんが、テスト用にサーバーをセットアップしています。

Simple RPC

calculator_service.pylink
6
7
def Square(self, request, context):
return CalculationResponse(value=request.value * request.value)

このような Simple RPC 実装に対し、次のようにテストします。

calculator_service_test.pylink
27
28
29
30
31
32
33
34
35
36
37
38
def test_successful_Square(self):
request = CalculationRequest(value=2)

rpc = self._real_time_server.invoke_unary_unary(
target_service.methods_by_name['Square'], (),
request, None)

actual, trailing_metadata, code, details = rpc.termination()
expected = CalculationResponse(value=4)

self.assertEqual(expected, actual)
self.assertIs(code, grpc.StatusCode.OK)
  1. _real_time_server.invoke_unary_unary() に “Square” の情報を与え、疑似リクエストを送信して RPC オブジェクトを取得します。
  2. rpc.termination() で擬似的に通信を終了し、疑似クライアントが疑似レスポンスを受け取ります。
  3. 疑似クライアントが受け取った想定値と結果を比較します。

Response-streaming RPC

calculator_service.pylink
9
10
11
def NaturalNumberGenerator(self, request, context):
for i in range(request.value):
yield CalculationResponse(value=i + 1)

このような Response-streaming RPC 実装に対し、次のようにテストします。

calculator_service_test.pylink
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def test_successful_NaturalNumberGenerator(self):
request = CalculationRequest(value=2)

rpc = self._real_time_server.invoke_unary_stream(
target_service.methods_by_name['NaturalNumberGenerator'], (),
request, None)

actual = [
rpc.take_response(),
rpc.take_response(),
]
trailing_metadata, code, details = rpc.termination()
expected = [
CalculationResponse(value=1),
CalculationResponse(value=2)
]

self.assertEqual(expected[0], actual[0])
self.assertEqual(expected[1], actual[1])
self.assertIs(code, grpc.StatusCode.OK)
  1. _real_time_server.invoke_unary_stream() に “NaturalNumberGenerator” の情報を与え、疑似リクエストを送信して RPC オブジェクトを取得します。
  2. 想定回数だけ rpc.take_response() を行い、疑似クライアントが疑似レスポンスをいくつか受け取ります。過不足なく送られてくることをチェックすると良いと思います。
  3. rpc.termination() で擬似的に通信を終了し、疑似クライアントが疑似通信結果情報を受け取ります。
  4. 疑似クライアントが受け取った想定値と結果をそれぞれ比較します。

Request-streaming RPC

calculator_service.pylink
13
14
15
16
17
def Summation(self, request_iterator, context):
s = 0
for req in request_iterator:
s += req.value
return CalculationResponse(value=s)

このような Request-streaming RPC 実装に対し、次のようにテストします。

calculator_service_test.pylink
61
62
63
64
65
66
67
68
69
70
71
72
def test_successful_Summation(self):
rpc = self._real_time_server.invoke_stream_unary(
target_service.methods_by_name['Summation'], (), None)
rpc.send_request(CalculationRequest(value=2))
rpc.send_request(CalculationRequest(value=3))
rpc.requests_closed()

actual, trailing_metadata, code, details = rpc.termination()
expected = CalculationResponse(value=5)

self.assertEqual(expected, actual)
self.assertIs(code, grpc.StatusCode.OK)
  1. _real_time_server.invoke_stream_unary() に “Summation” の情報を与え、疑似的に通信を確立して RPC オブジェクトを取得します。
  2. テストケースで必要な回数だけ rpc.send_request() で擬似リクエストを送信します。
  3. rpc.requests_closed() で擬似リクエストの終了を宣言します。
  4. rpc.termination() で擬似的に通信を終了し、疑似クライアントが疑似レスポンスを受け取ります。
  5. 疑似クライアントが受け取った想定値と結果を比較します。

Bidirectionally-streaming RPC

calculator_service.pylink
19
20
21
22
23
24
25
26
27
def Buffer3Sum(self, request_iterator, context):
buffer = []
for req in request_iterator:
buffer.append(req.value)
if len(buffer) == 3:
yield CalculationResponse(value=sum(buffer))
buffer = []
if len(buffer) > 0:
yield CalculationResponse(value=sum(buffer))

このような Bidirectionally-streaming RPC 実装に対し、次のようにテストします。

calculator_service_test.pylink
74
75
76
77
78
79
80
81
82
83
84
def test_successful_Buffer3Sum(self):
rpc = self._real_time_server.invoke_stream_stream(
target_service.methods_by_name['Buffer3Sum'], (), None)
rpc.send_request(CalculationRequest(value=1))
rpc.send_request(CalculationRequest(value=2))
rpc.send_request(CalculationRequest(value=3))

actual = rpc.take_response()
expected = CalculationResponse(value=6)

self.assertEqual(expected, actual)
0
# omit
95
96
97
98
99
100
101
102
103
104
105
    rpc.send_request(CalculationRequest(value=7))
rpc.send_request(CalculationRequest(value=8))
rpc.requests_closed()

trailing_metadata, code, details = rpc.termination()
actual = rpc.take_response()
expected = CalculationResponse(value=15)

self.assertEqual(expected, actual)
self.assertIs(code, grpc.StatusCode.OK)
#
  1. _real_time_server.invoke_stream_stream() に “Buffer3Sum” の情報を与え、疑似的に通信を確立して RPC オブジェクトを取得します。
  2. テストケースで必要な回数だけ rpc.send_request() で擬似リクエストを送信します。
  3. 同様に、想定回数だけ rpc.take_response() で擬似クライアントが疑似レスポンスをいくつか受け取ります。この組み合わせが任意の回数繰り返されます。
  4. rpc.requests_closed() で擬似リクエストの終了を宣言します。
  5. rpc.termination() で擬似的に通信を終了し、疑似クライアントが疑似通信結果情報を受け取ります。
  6. (存在するものの場合、)通信終了時に受け取る疑似レスポンスをいくつか受け取ります。
  7. 疑似クライアントが受け取った想定値と結果をそれぞれ比較します。

クライアントライブラリの単体テスト

src/python/grpcio_tests/tests/testing/_client_test.py を参考にしつつ、ライブラリの単体テストに合うよう変形して実装してみます。

setup, tearDown

こちらもあまりひねったことはありません。テスト用チャンネルとスレッドプールを用意しています。

Simple RPC

client_lib.pylink
9
10
def square(self, number):
return self.stub.Square(CalculationRequest(value=number)).value

このような Simple RPC を用いたメソッドに対し、次のようにテストします。

client_lib_test.pylink
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def test_successful_square(self):
arguments = 2

def run(scenario, channel):
connector = Connector(channel)
return connector.square(scenario) # test target

application_future = self._client_execution_thread_pool.submit(
run, arguments, self._real_time_channel)
invocation_metadata, actual_request, rpc = (
self._real_time_channel.take_unary_unary(
target_service.methods_by_name['Square']))

expected_request = CalculationRequest(value=2)

self.assertEqual(expected_request, actual_request)

rpc.terminate(CalculationResponse(value=4), (),
grpc.StatusCode.OK, '')

actual_result = application_future.result()
expected_result = 4

self.assertEqual(expected_result, actual_result)
  1. テスト対象のメソッドを利用する関数を定義します。
  2. _client_execution_thread_pool.submit() により、先ほど定義した処理の結果の Future オブジェクトが得られます。
  3. _real_time_channel.take_unary_unary() に “Square” の情報を与え、テスト対象から受け取った疑似リクエストと RPC オブジェクトを取得します。
  4. 疑似サーバーが受け取った疑似リクエストの想定値と結果を比較します。
  5. rpc.terminate() で擬似的に通信を終了し、テスト対象へ疑似レスポンスを送ります。
  6. テスト対象の出力の想定値と、 Future オブジェクトから得られる結果を比較します。

Response-streaming RPC

client_lib.pylink
12
13
14
15
16
17
def natural_numbers_lq(self, number):
response_iterator = self.stub.NaturalNumberGenerator(CalculationRequest(value=number))
results = []
for response in response_iterator:
results.append(response.value)
return results

このような Response-streaming RPC を用いたメソッドに対し、次のようにテストします。

client_lib_test.pylink
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
def test_successful_natural_numbers_lq(self):
arguments = 2

def run(scenario, channel):
connector = Connector(channel)
return connector.natural_numbers_lq(scenario)

application_future = self._client_execution_thread_pool.submit(
run, arguments, self._fake_time_channel)
invocation_metadata, actual_request, rpc = (
self._fake_time_channel.take_unary_stream(
target_service.methods_by_name['NaturalNumberGenerator']))

expected_request = CalculationRequest(value=2)

self.assertEqual(expected_request, actual_request)

rpc.send_response(CalculationResponse(value=1))
rpc.send_response(CalculationResponse(value=2))

rpc.terminate((), grpc.StatusCode.OK, '')

actual_result = application_future.result()
expected_result = [1, 2]

self.assertListEqual(expected_result, actual_result)
  1. テスト対象のメソッドを利用する関数を定義します。
  2. _client_execution_thread_pool.submit() により、先ほど定義した処理の結果の Future オブジェクトが得られます。
  3. _real_time_channel.take_unary_stream() に “NaturalNumberGenerator” の情報を与え、テスト対象から受け取った疑似リクエストと RPC オブジェクトを取得します。
  4. 疑似サーバーが受け取った疑似リクエストの想定値と結果を比較します。
  5. rpc.send_response() でテスト対象へ想定される疑似レスポンスをいくつか送ります。
  6. rpc.terminate() で擬似的に通信を終了し、テスト対象へ疑似通信結果情報を送ります。
  7. テスト対象の出力の想定値と、 Future オブジェクトから得られる結果を比較します。

Request-streaming RPC

client_lib.pylink
19
20
21
22
23
24
def summation(self, array):
def itr():
for number in array:
yield CalculationRequest(value=number)

return self.stub.Summation(itr()).value

このような Request-streaming RPC を用いたメソッドに対し、次のようにテストします。

client_lib_test.pylink
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
def test_successful_summation(self):
arguments = [2, 3]

def run(scenario, channel):
connector = Connector(channel)
return connector.summation(scenario)

application_future = self._client_execution_thread_pool.submit(
run, arguments, self._real_time_channel)
invocation_metadata, rpc = self._real_time_channel.take_stream_unary(
target_service.methods_by_name['Summation'])

actual_request1 = rpc.take_request()
actual_request2 = rpc.take_request()
actual_requests = [actual_request1, actual_request2]
expected_requests = [CalculationRequest(value=2), CalculationRequest(value=3)]

self.assertListEqual(expected_requests, actual_requests)

rpc.requests_closed()
rpc.terminate(CalculationResponse(value=5), (), grpc.StatusCode.OK, '')

actual_result = application_future.result()
expected_result = 5

self.assertEqual(expected_result, actual_result)
  1. テスト対象のメソッドを利用する関数を定義します。
  2. _client_execution_thread_pool.submit() により、先ほど定義した処理の結果の Future オブジェクトが得られます。
  3. _real_time_channel.take_stream_unary() に “Summation” の情報を与え、 RPC オブジェクトを取得します。
  4. rpc.take_request() で、テスト対象からの疑似リクエストを想定回数分だけ受け取ります。過不足なく送られてくることをチェックすると良いと思います。
  5. 疑似サーバーが受け取った疑似リクエストの想定値と結果をそれぞれ比較します。
  6. rpc.requests_closed() で擬似リクエストの終了を宣言します。
  7. rpc.terminate() で擬似的に通信を終了し、テスト対象へ疑似レスポンスを送ります。
  8. テスト対象の出力の想定値と、 Future オブジェクトから得られる結果を比較します。

Bidirectionally-streaming RPC

calculator_service.pylink
26
27
28
29
30
31
32
33
34
35
def buffer3_sum(self, array):
def itr():
for number in array:
yield CalculationRequest(value=number)

response_iterator = self.stub.Buffer3Sum(itr())
results = []
for response in response_iterator:
results.append(response.value)
return results

このような Bidirectionally-streaming RPC 実装に対し、次のようにテストします。

client_lib_test.pylink
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def test_successful_buffer3_sum(self):
arguments = [1, 2, 3, 4, 5, 6, 7, 8]

def run(scenario, channel):
connector = Connector(channel)
return connector.buffer3_sum(scenario)

application_future = self._client_execution_thread_pool.submit(
run, arguments, self._fake_time_channel)
invocation_metadata, rpc = self._fake_time_channel.take_stream_stream(
target_service.methods_by_name['Buffer3Sum'])

actual_request1 = rpc.take_request()
actual_request2 = rpc.take_request()
actual_request3 = rpc.take_request()
actual_requests = [actual_request1, actual_request2, actual_request3]
expected_requests = [CalculationRequest(value=1),
CalculationRequest(value=2),
CalculationRequest(value=3)]

self.assertListEqual(expected_requests, actual_requests)

rpc.send_response(CalculationResponse(value=6))
0
# omit
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
    actual_request1 = rpc.take_request()
actual_request2 = rpc.take_request()
actual_requests = [actual_request1, actual_request2]
expected_requests = [CalculationRequest(value=7),
CalculationRequest(value=8)]

self.assertListEqual(expected_requests, actual_requests)

rpc.requests_closed()
rpc.send_response(CalculationResponse(value=15))
rpc.terminate((), grpc.StatusCode.OK, '')

actual_result = application_future.result()
expected_result = [6, 15, 15]

self.assertListEqual(expected_result, actual_result)
#
  1. テスト対象のメソッドを利用する関数を定義します。
  2. _client_execution_thread_pool.submit() により、先ほど定義した処理の結果の Future オブジェクトが得られます。
  3. _fake_time_channel.take_stream_stream() に “Buffer3Sum” の情報を与え、 RPC オブジェクトを取得します。
  4. rpc.take_request() で、テスト対象からの疑似リクエストを想定回数分だけ受け取ります。
  5. 疑似サーバーが受け取った疑似リクエストの想定値と結果をそれぞれ比較します。この組み合わせが任意の回数繰り返されます。
  6. rpc.requests_closed() で擬似リクエストの終了を宣言します。
  7. rpc.terminate() で擬似的に通信を終了し、テスト対象へ疑似通信結果情報(存在すれば疑似レスポンスもいくつか)を送ります。
  8. テスト対象の出力の想定値と、 Future オブジェクトから得られる結果を比較します。

まとめ

Python から gRPC を使う際の、単体テスト方法を取り上げました。
必要となるパッケージの取得後、サーバー向けでは invoke_?_?()rpc.termination()rpc.take_response()rpc.send_request()rpc.requests_closed() を、クライアントのライブラリ向けモックとしては submit()take_?_?()rpc.terminate()rpc.send_response()rpc.take_request()rpc.requests_closed() などを使って記述できます。

ここからは大きくは変わらないと思いますが、テスト方法のドキュメントがしっかりするまでは注視が必要です。