Python で gRPC の Metadata と Interceptor を使う

この記事では、前回( Python で gRPC の単体テスト)に引き続き、 Python で gRPC を使う際の知見をご紹介します。

主に、メタデータとインターセプターの実装法と、そのテスト方法がメインになっています。
まだ experimental API だったり非 public クラスだったりしているので、公式のアナウンスがあるまではプロダクション利用は難しいようにも思いました。

目次

エラーの取扱

gRPC のエラーの方法は、 gRPC Errors が詳しいです。
エラー用のコードは、(言語は違うものの) Status.Code Javadoc を参考にするとわかりやすいかと思います。

サーバーの単体テストの際には、コード例 での rpc.termination() における戻り値 code を使うことで、想定通りのエラーとなっているかを確認できます。
クライアントライブラリの単体テストの際には、コード例 での rpc.terminate()grpc.StatusCode.OK としている引数を使うことで、ライブラリが想定どおりにエラーをハンドルできているかを確認できます。

メタデータについて

gRPC では事前に定義したフォーマットのメッセージをやりとりします。一方で、一時的な情報や横断的な情報を付与したい場合には不便なことがあります。
そんなこともあってか、 gRPC には「メタデータ( Metadata )」という仕組みがあり、文字列ベースで自由な情報をメッセージに付加できます。

一口にメタデータと言っても、現時点の gRPC には 3 つの種類があります。(Streaming の有無・種類によりません)

  • Invocation Metadata は、リクエスト時にクライアント側がヘッダ情報として最初に渡すメタデータです。
  • Initial Metadata は、レスポンス時にサーバー側がヘッダ情報として最初に返すメタデータです。
  • Trailing Metadata は、レスポンス時にサーバー側がフッタ情報として最後に返すメタデータです。

(リクエスト時のフッタ情報というものがあってもいいような感じもしますが…)

依存元も依存先もフルにメタデータを活用しているような gRPC サービスは次の手順でメタデータ処理を行う可能性があります。

Metadata processes Metadata processes

リクエストのチェーンでは以下のようになる可能性があります。

  1. “client” が Invocation Metadata を送信
  2. “server” が Invocation Metadata を受信
  3. “server” が Invocation Metadata を送信
  4. “upstream server” が Invocation Metadata を受信

レスポンスのチェーンでは、Simple RPC と Request-streaming RPC の場所では以下の 2 つが同時に、 Response-streaming PRC と Bidirectional streaming では以下の 2 つが並行(Concurrent)に動く可能性があります。

  1. “upstream server” が Initial Metadata を送信
  2. “server” が Initial Metadata を受信
  3. “server” が Initial Metadata を送信
  4. “client” が Initial Metadata を受信
  1. “upstream server” が Trailing Metadata を送信
  2. “server” が Trailing Metadata を受信
  3. “server” が Trailing Metadata を送信
  4. “client” が Trailing Metadata を受信

(実際はより複雑に絡み合うことになるかもしれません。)

メタデータを利用する例

この記事では、以下のような構成のシステムを用いてサンプルコードを示します。

Example system Example system
  1. “client” が “quote_service” に “(key_auth_client_interceptor)” を通過したリクエストを送信する。
  2. “quote_service” が “key_auth_server_interceptor” を通過したリクエストを受信する。
  3. “quote_service” が “cowsay_client” を介して “cowsay_service” に “tracer_client_interceptor” を通過したリクエストを送信する。
  4. “cowsay_service” がリクエストを受信する。
  5. “cowsay_service” が “quote_service” にレスポンスを送信する。
  6. “quote_service” が “cowsay_client” を介して “tracer_client_interceptor” を通過したレスポンスを受信する。
  7. “quote_service” が “key_auth_server_interceptor” を通過したレスポンスを送信する。
  8. “client” が “(key_auth_client_interceptor)” を通過したレスポンスを受信する。

各サーバー・クライアントでは、メタデータをレスポンスから取得したりリクエストに付加したりできます。ここではその例を示します。(インターセプタは次の章で説明します。)

サーバー

サーバーでの例として、 “quote_service” の部分を取り上げます。

メソッドの中で、 context.invocation_metadata() を呼び出すことで、クライアントから送られてきた Invocation Metadata をすべて取得できます。
また、 context.send_initial_metadata([('a', 'b'), ('c', 'd')]) とすることで Initial Metadata を、 context.set_trailing_metadata([('e', 'f'), ('g', 'h')]) とすることで Trailing Metadata を付加できます。前者はレスポンス前に送るので “send” に、後者はレスポンス後に送るので予約の意味を込めて “set” となっているのでしょうか?

メソッドの実装として、エラーを返す場合の情報としてメタデータを付与して返すというパターンも存在します。 context.set_details() に文字列を詰め込むよりはいい方法だと思いますが、クライアントでの利用には取り決めが必要になるでしょう。

クライアントライブラリ

クライアントライブラリ( Stub を利用するコード)での例として、 “cowsay_client” の部分を取り上げます。

スタブ呼び出しのときに stub.Get(req, metadata=[('z', 'y'), ('x', 'w')])stub.Get.future(req, metadata=[('z', 'y'), ('x', 'w')]) のように metadata という名前付き引数を渡すことで、サーバーに Invocation Metadata を送信できます。
受け取るときは、 stub.Get.future() という形式で送信したときの戻り値( f とする)に対して、 f.initial_metadata() で Initial Metadata を、 f.trailing_metadata() で Trailing Metadata を取得できます。メソッドの種類によっては後者が即時評価できるわけではないのでこうなっているのでしょうか?

メタデータを利用するクラスとテストする例

メタデータを積極利用する場合はそんなに多くないと思いますが、その利用を単体テストする方法は用意されています。
前回( Python で gRPC の単体テスト) も合わせてご覧ください。この記事ではメタデータの部分だけを取り上げます。

サーバー

サーバーのテストの例として、 “test_quote_service” の部分を取り上げます。

invoke_unary_unary(target_service.methods_by_name['Get'], [('a', 'b'), ('c', 'd')], request, None) により、テスト対象への Invocation Metadata の送信を再現できます。
rpc.initial_metadata() でテスト対象から送信された Initial Metadata を、 rpc.termination() の第 2 戻り値では Trailing Metadata をチェックできます。

クライアントライブラリ

クライアントライブラリのテストの例として、 “test_cowsay_client” の部分を取り上げます。

take_unary_unary(target_service.methods_by_name['Get']) で、テスト対象から送信された Invocation Metadata を確認できます。
rpc.send_initial_metadata([('z', 'y'), ('x', 'w')]) で Initial Metadata の、 rpc.terminate(res, [('z', 'y'), ('x', 'w')], StatusCode.OK, '') で Trailing Metadata のテスト対象への送信を再現できます。

インターセプタについて

gRPC の「インターセプタ( Interceptor )」は、レスポンスやリクエストが実際に受送信される前に通過する処理のことです。 Middleware 的とか AOP 的といえばよいのでしょうか。
こちらはまだ gRPC の Python 実装では実験段階なのか、少し込み入ったことをすると private クラスが必要な場合も出てきます。 API は変化する可能性があるので注意が必要です。(メタデータを見てフィルタリングする程度ならいいと思いますが。)

インターセプタには、サーバー側の受信部分とクライアント側の送信部分の 2 種類があります。前者は Streaming の有無・種類により 4 通りの実装の可能性がありますが、後者は 1 通りしかありません。

ちなみに、未実装のメソッドへのリクエストでも、 interceptor が適当にでも値を返せば UNIMPLEMENTED のエラーにはなりません。この挙動はなにかに使えるかもしれません。

インターセプタを利用する例

ここでも上にある図のようなシステムを用いてサンプルコードを示します。

インターセプタの実装では、サーバー用もクライアント用も、本体の処理となる continuation の前後を実装していく形が多くなりそうです。

サーバー用

サーバー用での例として、 “key_auth_server_interceptor” の部分を取り上げます。

サーバー用のインターセプタは、 grpc.ServerInterceptor を継承したクラスとして定義し、 intercept_service() を実装することで実現できます。

key_auth_server_interceptor.py
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def unauthenticated_process(ignored_request, context):
context.send_initial_metadata([('initial-fail', "a")])
context.set_trailing_metadata([('trailing-fail', "b")])
context.abort(StatusCode.UNAUTHENTICATED, 'Secret key is wrong!')

class KeyAuthServerInterceptor(ServerInterceptor):
def __init__(self, secret_key):
self._secret_key = secret_key

def intercept_service(self, continuation, handler_call_details):
if handler_call_details.method.endswith('GetSecretKey') or (
'secret-key', self._secret_key) in handler_call_details.invocation_metadata:
return continuation(handler_call_details)
else:
return unary_unary_rpc_method_handler(unauthenticated_process)

処理を続ける場合は continuation で元の処理を、そうでない場合は ??_???_rpc_method_handler と関数を使って別の処理を利用するようにできます。
サンプルの実装では、 GetSecretKey へのアクセスか secret-key メタデータに正しい鍵を持っていれば続行し、そうでなければメタデータを付与して UNAUTHENTICATED エラーを返します。

インターセプタはただ作っただけでは利用できないので、サーバーを起動するときに登録する処理が必要になります。
サーバーの作成を行う際に、 server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), interceptors=[KeyAuthServerInterceptor(secret_key)]) のように interceptors 引数として渡すことで、フックすることができます。

クライアント用

クライアント用での例として、 “tracer_client_interceptor” の部分を取り上げます。

クライアント用インターセプタは、 Simple RPC の場合は UnaryUnaryClientInterceptor を継承したクラスを定義し、 intercept_unary_unary を実装することで実現できます。他の RPC の場合でも、同様のクラスやメソッドが用意されているので、それを実装します。なお、 4 種類すべてを継承・実装した処理を書くことも可能です。

tracer_client_interceptor.py
6
7
8
9
10
11
12
13
14
15
16
17
class TracerClientInterceptor(UnaryUnaryClientInterceptor):
def intercept_unary_unary(self, continuation, client_call_details, request):
metadata = []
if client_call_details.metadata is not None:
metadata = list(client_call_details.metadata)
metadata.append(('tracer', 'This is from interceptor'))
new_details = _ClientCallDetails(client_call_details.method, client_call_details.timeout, metadata,
client_call_details.credentials)
response = continuation(new_details, request)
original_initial_metadata = response.initial_metadata
response.initial_metadata = lambda: original_initial_metadata() + [_Metadatum('tracer', 'aaaa!')]
return response

処理を続ける場合は continuation で元の処理を、そうでない場合は偽のレスポンスやエラーを含むレスポンス( grpc.Call を利用?)を返すようにします。

こちらも作っただけでは利用できないので、 Stub を作るときの channel に登録する処理が必要になります。
intercepted_channel = grpc.intercept_channel(channel, tracer_client_interceptor) のように grpc.intercept_channel を使うことで、フックすることができます。(配列を渡せば順番を保持したまま一斉にフックできます。)

インターセプタを利用するクラスをテストする例

インターセプタをいくつも開発していくことはほぼないと思いますし、そもそもまだ実験的な機能のような気がしますが、一応インターセプタのテストもできそうです。
普通の gRPC の機能のテストのようにはきれいに書けず、(標準ライブラリの) mock だらけになってしまいました。
インターセプタのテストには特に gRPC 固有なものはないので、深い説明もいらないと思います。

サーバー用

サーバー用のテスト例として、 “test_key_auth_server_interceptor” の部分を取り上げます。

continuation の実態を作るのは厳しいので、 mock を使いました。 mock の機能により continuation が呼ばれたかどうかの確認ができます。??_???_rpc_method_handler で別の処理に入ったときのチェックも行っています。
かなり実装依存のテストになってしまうなという印象があります。

クライアント用

クライアント用のテスト例として、 “test_tracer_client_interceptor” の部分を取り上げます。

grpc.Call オブジェクトを作るのは continuation 同様厳しいので、両方とも mock を使いました。こちらでも mock の機能で呼び出しをチェックができます。

まとめ

  • ( gRPC エラーの取扱い・)メタデータ・インターセプタの利用法を紹介した。
  • それらに対する単体テスト方法について紹介した。
  • インターセプタの実装やテストについては private クラスなどが出る場合もあり、まだプロダクション用途で使うのは怖い。

(単体テストができないと思っていたので)サンプルではサーバーとクライアントライブラリを別ファイルに分けていますが、サーバー実装の中で Stub を直接呼ぶような場合でも、実際の処理の順番と同じように

  1. Stub send
  2. Service receive
  3. Service send
  4. Stub receive

の順番で擬似リクエスト・レスポンスを渡してやれば動きます。ただし、 Future オブジェクトを複数取り扱うので、タイミングをミスするとデッドロックを起こして進まなくなります。

参照記事