michimani.net

AWS Lambda の Extension API を使いたいがために Go で Lambda Extension を自作してみた

2022-12-07

この記事は AWS LambdaとServerless Advent Calendar 2022 7 日目の記事です。

最近 Lambda の Runtime API について調べていたところで Extension API というものもある事に気づきました。どうやら Lambda Extension を自作するときに使う API のようだったので、その API を使いたいがために実際に Lambda Extension を自作してみました。

Lambda Extension とは

Lambda Extension (以下、 Extension) は、 Lambda 関数の Runtime の一部、もしくは実行環境内で独立したプロセスとして様々な機能を実行できる機能です。それぞれ、前者は内部拡張機能、後者は外部拡張機能と呼ばれ、この記事内で触れる Extension は後者の外部拡張機能を指すものとします。

Extension は、 AWS が提供しているもの、 AWS Lambda パートナー 1 が提供しているもの、および今回紹介するように独自の Extension を作成して使用することができます。 最近だと AWS からリリースされた AWS Parameters and Secrets Lambda Extension 2 3 が注目されていました。

Lambda は実行環境上で関数を呼び出しますが、 Extension はその実行環境が作成される際に開始されます。
そして、関数と同じ実行環境上で起動するため Extension 自体のパフォーマンス (初期化にかかる時間、処理時間、メモリ消費量など) は関数のパフォーマンスにも影響します。そのため、 Extension を自作する際には、そのあたりにも注意が必要です。

参考: Lambda extensions - AWS Lambda

実行環境のライフサイクルと Extension

Lambda の実行環境のライフサイクルには、 InitInvokeShutdown という 3 つのフェーズがあります。各フェーズでは実行環境の作成や関数の Invoke、実行環境の破棄などを行い、 Extension は各フェーズで Extension API を使用して Runtime への登録や実行環境で発生するイベントの検知を行い、 Extension としての処理を実行します。

sequenceDiagram participant c as Client participant ls as Lambda Service participant r as Runtime participant e as Extension alt Init ls ->> e: init extensions in /ops/extensions e ->> ls: POST /extension/register e ->> ls: GET /extension/event/next end loop alt Invoke c ->> ls: invoke ls ->> r: invoke ls ->> e: response of GET /extension/event/next (INVOKE event) r ->> ls: /response (Runtime API) ls ->> c: response e ->> ls: GET /extension/event/next end end alt Shutdown ls ->> r: SIGTERM ls ->> e: response of GET /extension/event/next (SHUTDOWN event) end

Init

Invoke

Shutdown

参考: Lambda Extensions API - AWS Lambda

Go で Lambda Extension を作ってみる

これまでの内容を踏まえて、実際に Extension を作ってみます。
Extension は関数とは独立したプロセスとして実行されるため、実装する際に使用する言語は関数と合わせる必要はありません。今回は下記リポジトリの実装例を参考にして Go で実装してみます。

aws-samples/aws-lambda-extensions: A collection of sample extensions to help you get started with AWS Lambda Extensions

自作する Extension の概要

今回実装する Extension でやりたいことは、下記のとおりです。

  1. 同一実行環境で Invoke された関数に対応する Request ID をメモリ上に保持する
  2. 関数から任意のタイミングで Extension に対して GET リクエストを送ることで、保持している Request ID の一覧を取得できる

1 は、 Lambda の Invoke フェーズにて Invoke イベントを検知して、そのイベント内に含まれる Request ID をメモリ上に保持します。

2 は、 Extension のサブプロセス (goroutine) として HTTP API サーバを起動し、 特定のエンドポイントに対するリクエストのレスポンスとしてメモリ上に保持している Request ID のリストを JSON 形式で返します。

シーケンス図にするとこんな感じです。

sequenceDiagram participant c as Client participant ls as Lambda Service participant r as Runtime participant f as Function participant e as Extension participant m as On Memory participant s as Extension's HTTP API Server alt Init ls ->> e: init e ->> ls: POST /extension/register e ->> s: start e ->> m: init invocation history e ->> ls: GET /extension/event/next end loop alt Invoke c ->> ls: invoke ls ->> r: invoke ls ->> e: response of GET /extension/event/next (INVOKE event) e ->> m: save Request ID to history e ->> ls: GET /extension/event/next r ->> f: execute main handler f ->> s: GET localhost:1203/invocations s ->> m: read history m ->> s: s ->> f: {"invocations": [{"awsRequestId":"id1", "invocatedAt":"2022-12-04T00:15:06.992783032Z"}]} f ->> r: r ->> ls: /response (Runtime API) ls ->> c: response end end alt Shutdown ls ->> r: SIGTERM ls ->> e: response of GET /extension/event/next (SHUTDOWN event) end

実装

実装したものは下記リポジトリに置いています。

michimani/invocation-history-extension: This is a Extension for AWS Lambda Function that records history of invocation at the same runtime environment.

この記事内では、主に Extension API を利用するところについて触れます。

Extension のメイン処理

main.go#main では下記の処理を順に実行しています。

  1. Extension API をコールするための Context を生成
  2. シグナルハンドリング用の goroutine を起動
  3. Extension API POST /extension/register をコールして実行環境に Extension を登録
  4. Invocation の履歴を返す HTTP API サーバを起動
  5. Extension API GET /extension/event/next をコールして各イベントをハンドリングする処理を開始

1,2,4 は特に目新しいポイントはないので、 3 と 4 について 各 Extension API の仕様とともに実装を見ていきます。

POST /extension/register で実行環境に Extension を登録

実行環境のライフサイクルと Extension の関係のところで書いたとおり、 Extension のコードが実行されたときに Extension API の /extension/register エンドポイントに POST リクエストを送ることで Extension を登録する必要があります。

この API ではリクエストヘッダおよびリクエストボディに必須の項目があります。

まず、リクエストヘッダの Lambda-Extension-Name には Extension の名前をセットする必要があります。セットする値は、 Extension の実行可能ファイルのファイル名です。今回の Extension の場合、 invocation-history-extension というファイル名で /opt/extensions 配下に実行ファイルを配置するようにしているので invocation-history-extension を値としてセットします。

次にリクエストボディですが、この Extension でハンドリングしたいイベントを配列で指定します。 イベントの種類は INVOKESHUTDOWN のみで、今回は両方のイベントをハンドリングしたいので両方指定します。

API のレスポンスヘッダ Lambda-Extension-Identifier には登録が完了した結果として一意な値が設定されています。この値は後にコールする API のリクエストヘッダにセットする必要があるので保持しておきます。

Extension API を実行するには単純に HTTP リクエストを送信すればよいのですが、今回はその部分を aws-lambda-api-go というクライアントライブラリを 自作 使用して実装します。

michimani/aws-lambda-api-go: This is a client library for Go language to use AWS Lambda’s Runtime API, Extension API, Telemetry API, and Logs API.

import (
	"context"
	"fmt"
	"net/http"
	"time"

	"github.com/michimani/aws-lambda-api-go/alago"
	"github.com/michimani/aws-lambda-api-go/extension"
)

type Client struct {
	alagoClient         *alago.Client
	logger              *Logger
	extensionIdentifier string
}

func (c *Client) Register(ctx context.Context, extensionName string) error {
	out, err := extension.Register(ctx, c.alagoClient, &extension.RegisterInput{
		LambdaExtensionName: extensionName,
		Events: []extension.EventType{
			extension.EventTypeInvoke,
			extension.EventTypeShutdown,
		},
	})

	if err != nil {
		return fmt.Errorf("An expected error occurred at extension registration. err:%v", err)
	}

	if out.StatusCode != http.StatusOK {
		return fmt.Errorf("An error occurred at extension registration. statusCode:%d errType:%s errMessage:%s",
			out.StatusCode, out.Error.ErrorType, out.Error.ErrorMessage)
	}

	c.logger.Info("Succeeded to register extension.")
	c.extensionIdentifier = out.LambdaExtensionIdentifier

	return nil
}

クライアントライブラリの extension.Register 関数に対して extension.RegisterInput に Extension の名前とハンドリングしたいイベントをセットして渡すだけで POST /extension/register API をコールすることができます。簡単ですね。

API のレスポンスは extension.RegisterOutput として取得できるので、 RegisterOutput.LambdaExtensionIdentifier の値を保持しておきます。

参考: Lambda Extensions API | Register - AWS Lambda

GET /extension/event/next で次のイベントを取得・ハンドリング

実行環境のライフサイクルと Extension の関係のところで書いたとおり、 Lambda の実行環境は関数の invoke および実行環境の破棄時にそれぞれ INVOKESHUTDOWN のイベントを発行します。発行します、と書きましたが、実際には GET /extension/event/next API がリクエストをブロックして、実行環境からイベントが発行されるタイミングでレスポンスが返るという動きになります。なので、この API をコールするための http.Client のタイムアウトは長め (もしくは 0) にしておく必要があります。

この API は、 Register API のレスポンスヘッダ Lambda-Extension-Identifier に含まれていた値をリクエストヘッダ Lambda-Extension-Identifier にセットしてコールします。リクエストボディは不要です。

こちらも Register と同じクライアントライブラリを使って、イベントのハンドリングと合わせて下記のように実装しています。

func (c *Client) PollingEvent(ctx context.Context) (bool, error) {
	c.logger.Info("Waiting for next event...")
	out, err := extension.EventNext(ctx, c.alagoClient, &extension.EventNextInput{
		LambdaExtensionIdentifier: c.extensionIdentifier,
	})

	if err != nil {
		return false, err
	}

	if out.StatusCode != http.StatusOK {
		return false, fmt.Errorf("An error occurred at calling /extension/event/next API. statusCode:%d errType:%s errMessage:%s",
			out.StatusCode, out.Error.ErrorType, out.Error.ErrorMessage)
	}

	switch out.EventType {
	case eventTypeInvoke:
		now := time.Now().UTC()
		saveInvocationHistory(out.RequestID, &now)
		c.logger.Info("Succeeded to save new history. awsRequestId:%s invokedAt:%v", out.RequestID, now)
	case eventTypeShutdown:
		c.logger.Info("Received shutdown event. reason:%s", out.ShutdownReason)
		c.logger.Info("Truncate invocation history.")
		for _, h := range History.Invocations {
			c.logger.Info("%+v", *h)
		}
		return false, nil
	default:
		return false, fmt.Errorf("Cannot handle event. eventType:%s", out.EventType)
	}

	return true, nil
}

イベントの種類が INVOKE のときは、レスポンスに含まれる Request ID をメモリ上に保存しています。 一方、イベントの種類が SHUTDOWN のときは、それまでに保存していた Request ID をすべてログに出力するようにしています。

メソッド名にもあるように、実質イベントをポーリングしているような処理になるので、このメソッドの返り値としては boolerror を返すようにしていて、 booltrue であれば呼び出し元 (今回であれば main.go#processEvents) で再度このメソッドを実行してポーリングを始めるような形にしています。

参考: Lambda Extensions API | Next - AWS Lambda

自作した Extension を Lambda 関数で使う

では、自作した Extension を実際に使ってみます。

Extension の利用方法としては下記の 3 通りがあります。

今回はコンテナイメージに同梱してローカル環境で自作 Extension を使ってみます。コンテナイメージにすることで RIE を使ってローカル環境でも実際の Lambda の実行環境に近い形で動作確認ができます。4

Extension の実行可能ファイルを作成

まずは Extension を実行可能ファイルとして用意します。
今回は Go で実装したので、下記コマンドでビルドして実行可能ファイルを生成し、 zip 化します。

GOOS=linux GOARCH=amd64 go build -o bin/extensions/invocation-history-extension main.go \
&& chmod +x bin/extensions/invocation-history-extension \
&& cd bin \
&& zip -r extension.zip extensions/

もしくは、今回実装した Extension については GitHub の Releases から zip でダウンロードできるので、そちらから取得してきます。

Releases · michimani/invocation-history-extension

Lambda 関数のコード

今回実装した Extension は localhost:1203 で HTTP API サーバを起動しているので、そこにリクエストを送ってそのレスポンスを関数のレスポンスとするような Lambda 関数を用意します。

package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"time"

	runtime "github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-lambda-go/lambdacontext"
)

type Response struct {
	Message     string       `json:"message"`
	Invocations []invocation `json:"invocations"`
}

func handleRequest(ctx context.Context) (*Response, error) {
	log.Println("start handler")
	defer log.Println("end handler")

	lc, ok := lambdacontext.FromContext(ctx)
	if !ok {
		return nil, fmt.Errorf("Failed to get Lambda context from context.")
	}

	history, err := getInvocationHistory()
	if err != nil {
		return nil, err
	}

	return &Response{
		Message:     fmt.Sprintf("Current request ID is %s", lc.AwsRequestID),
		Invocations: history,
	}, nil
}

func init() {
	log.Println("cold start")
}

func main() {
	runtime.Start(handleRequest)
}

// Struct of response from Invocation History Extension - GET /invocations API
type resultFromExtension struct {
	Invocations []invocation `json:"invocations"`
}

type invocation struct {
	AWSRequestID string     `json:"awsRequestId"`
	InvocatedAt  *time.Time `json:"invocatedAt"`
}

const invocationsEndpoint = "http://localhost:1203/invocations"

// Get invocation history from Invocation History Extension IPC.
func getInvocationHistory() ([]invocation, error) {
	req, err := http.NewRequestWithContext(context.Background(), "GET", invocationsEndpoint, nil)
	if err != nil {
		return nil, err
	}

	// call Extension API
	client := http.Client{}
	res, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	buf := new(bytes.Buffer)
	if _, err := buf.ReadFrom(res.Body); err != nil {
		return nil, err
	}
	bodyString := buf.String()

	if res.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("Failed to get invocation history by using extension. statusCode:%d body:%s", res.StatusCode, bodyString)
	}

	exRes := resultFromExtension{}
	if err := json.Unmarshal([]byte(bodyString), &exRes); err != nil {
		return nil, err
	}

	return exRes.Invocations, nil
}

別プロセスで起動している Extension の HTTP API サーバに GET リクエストを送信し、そのレスポンスに現在の Request ID を付与したものを関数のレスポンスとして返すような実装になっています。具体的なレスポンスの内容はこのあと確認します。

Dockerfile

Extension をコンテナイメージに同梱する場合は Extension の実行可能なファイルをイメージ内の /opt/extensions 以下に配置すればよいので、下記のような Dockerfile を作成することで Extension を同梱したイメージを作成することができます。

FROM public.ecr.aws/lambda/provided:al2 as build
RUN yum install -y golang unzip
RUN go env -w GOPROXY=direct
ADD go.mod go.sum ./
RUN go mod download
ADD . .
RUN go build -o /main
RUN mkdir -p /opt
## ここは GitHub から直接取得するようにしてもよい
# ADD https://github.com/michimani/invocation-history-extension/releases/download/v0.2.0/extension.zip ./
ADD ./bin/extension.zip ./
RUN unzip extension.zip -d /opt
RUN rm extension.zip

FROM public.ecr.aws/lambda/provided:al2
COPY --from=build /main /main
COPY entry.sh /
RUN chmod 755 /entry.sh
RUN mkdir -p /opt/extensions
WORKDIR /opt/extensions
COPY --from=build /opt/extensions .
ENTRYPOINT [ "/entry.sh" ]
CMD ["/main"]

AWS やパートナーから提供されている Extension で実行可能ファイルそのものが公開されていない場合、コンテナイメージとして公開されているものがあればそれを使えばよくて、 Lambda Layer として公開されているものについては下記の AWS CLI コマンドを実行することで zip 形式で Extension をダウンロードすることができます。 (AWS Parameters And Secrets Lambda Extension の場合)

curl $(
  aws lambda get-layer-version-by-arn \
  --arn 'arn:aws:lambda:ap-northeast-1:133490724326:layer:AWS-Parameters-and-Secrets-Lambda-Extension:2' \
  --query 'Content.Location' \
  --output text
) --output extension.zip

その後 unzip して実行可能なファイルのみをコンテナイメージに同梱すれば、自作した Extension と同様にコンテナイメージで使用することができます。

ローカルで起動して invoke

Dockerfile の準備もできたので、あとは docker build して docker run すればローカルで Lambda の実行環境が起動します。

docker build -t invocation-history-ex-func:local . \
&& docker run \
--rm \
-p 9000:8080 \
invocation-history-ex-func:local

これで localhost:9000 で Lambda の実行環境が起動しているので、そこに対して curl でリクエストを送って invoke します。

curl \
-H 'Content-Type: application/json' \
http://localhost:9000/2015-03-31/functions/function/invocations

すると下記のようなレスポンスが得られます。

{
  "message": "Current request ID is e65d6f55-2335-47e9-88f9-a851e7d539d5",
  "invocations": [
    {
      "awsRequestId": "e65d6f55-2335-47e9-88f9-a851e7d539d5",
      "invocatedAt": "2022-12-06T13:25:52.447847893Z"
    }
  ]
}

続けて invoke すると

{
  "message": "Current request ID is 4a61a5dc-7bbf-43cb-abd0-67ed0388cf5f",
  "invocations": [
    {
      "awsRequestId": "e65d6f55-2335-47e9-88f9-a851e7d539d5",
      "invocatedAt": "2022-12-06T13:25:52.447847893Z"
    },
    {
      "awsRequestId": "4a61a5dc-7bbf-43cb-abd0-67ed0388cf5f",
      "invocatedAt": "2022-12-06T13:27:17.244096344Z"
    }
  ]
}

それまでに同じ実行環境で invoke された際の Request ID が履歴となって保持されていることがわかります。

Lambda の実行環境が出力しているログは下記のようになっています。

 1START RequestId: e65d6f55-2335-47e9-88f9-a851e7d539d5 Version: $LATEST
 206 Dec 2022 13:25:52,433 [INFO] (rapid) External agent invocation-history-extension (c96c9db2-076e-4ac7-9572-f98d4091d0e8) registered, subscribed to [INVOKE SHUTDOWN]
 3[Invocation History Extension] INFO: Succeeded to register extension.
 4[Invocation History Extension] INFO: Waiting for next event...
 52022/12/06 13:25:52 cold start
 62022/12/06 13:25:52 start handler
 7[Invocation History Extension] INFO: Succeeded to save new history. awsRequestId:e65d6f55-2335-47e9-88f9-a851e7d539d5 invokedAt:2022-12-06 13:25:52.447847893 +0000 UTC
 8[Invocation History Extension] INFO: Waiting for next event...
 92022/12/06 13:25:52 end handler
10END RequestId: e65d6f55-2335-47e9-88f9-a851e7d539d5
11REPORT RequestId: e65d6f55-2335-47e9-88f9-a851e7d539d5  Init Duration: 0.22 ms  Duration: 28.23 ms      Billed Duration: 29 ms     Memory Size: 3008 MB    Max Memory Used: 3008 MB
12START RequestId: 4a61a5dc-7bbf-43cb-abd0-67ed0388cf5f Version: $LATEST
132022/12/06 13:27:17 start handler
14[Invocation History Extension] INFO: Succeeded to save new history. awsRequestId:4a61a5dc-7bbf-43cb-abd0-67ed0388cf5f invokedAt:2022-12-06 13:27:17.244096344 +0000 UTC
15[Invocation History Extension] INFO: Waiting for next event...
162022/12/06 13:27:17 end handler
17END RequestId: 4a61a5dc-7bbf-43cb-abd0-67ed0388cf5f
18REPORT RequestId: 4a61a5dc-7bbf-43cb-abd0-67ed0388cf5f  Duration: 1.53 ms       Billed Duration: 2 ms   Memory Size: 3008 MB       Max Memory Used: 3008 MB

1 〜 11 行目が 1 回目の invoke によるログで、このときのみ Register API を実行したログが出力されていることがわかります。

この状態から ctr + C で実行環境のプロセスを終了させると下記のようなログが出力されます。

^C06 Dec 2022 13:31:54,898 [INFO] (rapid) Received signal signal=interrupt
06 Dec 2022 13:31:54,898 [INFO] (rapid) Shutting down...
06 Dec 2022 13:31:54,898 [WARNING] (rapid) Reset initiated: SandboxTerminated
[Invocation History Extension] INFO: Received shutdown event. reason:SandboxTerminated
[Invocation History Extension] INFO: Truncate invocation history.
[Invocation History Extension] INFO: {AWSRequestID:e65d6f55-2335-47e9-88f9-a851e7d539d5 InvocatedAt:2022-12-06 13:25:52.447847893 +0000 UTC}
[Invocation History Extension] INFO: {AWSRequestID:4a61a5dc-7bbf-43cb-abd0-67ed0388cf5f InvocatedAt:2022-12-06 13:27:17.244096344 +0000 UTC}
06 Dec 2022 13:31:54,899 [INFO] (rapid) runtime exited

ローカル環境ではプロセス終了 = 実行環境の破棄となります。出力されているログからも、 SHUTDOWN イベントを受け取ってハンドリングされていることがわかります。

まとめ

AWS Lambda の Extension API を使ってみたいがために Lambda Extension を自作してみた話でした。

Lambda Extension を自作してみて Extension API がやっていること、 Lambda の実行環境上での Extension のライフサイクル、 Lambda そのもののライフサイクルについて完全に理解できた気がします。

と言っても Extension API にはまだ Init Error API と Exit Error API があり、今回はそれらを使っていません。また、 Lambda には Extension API の他にも Runtime API、 Telemetry API (Logs API) もあり、まだまだ何もわからない部分がたくさんありそうです。

個人的に一時期 Lambda 熱が冷めていたんですが、最近また温まってきたので今後はこのあたりの API 群を追っていこうかなと思います。


  1. AWS Lambda 拡張機能パートナー - AWS Lambda  ↩︎

  2. AWS Parameters and Secrets Lambda Extension を発表  ↩︎

  3. SSM Parameter Store および Secrets Manager への問い合わせをキャッシュすることで再取得時のレイテンシーおよびコストを削減できるという機能 ↩︎

  4. コンテナイメージを使った Lambda 関数のあれこれ - michimani.net  ↩︎


comments powered by Disqus