michimani.net

AWS Step Functions を使った翻訳ワークフローを AWS CDK v2 (TypeScript) で構築してみた話

2021-12-09

はじめに

この記事は 弁護士ドットコム Advent Calendar 2021 の 9 日目の記事です。
昨日は @takky さんの 「 チームで内部API開発をするときのツール 」 の話でした。ぜひこちらの記事もご覧ください。

ちなみに去年は AWS CLI だけで Hugo のホスティング環境を構築するハンズオン的な記事 を書いてました。

目次

今回書くこと

今回は、この 1 年間で新たに触れた技術要素と、今までほとんど触っていなかった AWS のサービスを無理やり組み合わせて何かを作ってみた話です。

この 1 年間で新たに触れた技術要素は、 Go 言語です。
Go 言語を本格的に実務で使うようになったのは弁護士ドットコムに転職してからなので、歴としてはちょうど 1 年くらいです。今回は、Go をランタイムとした Lambda 関数を実装します。Lambda 関数では、 宣伝も兼ねて 自作した Twitter API v2 の Go のライブラリを使って特定の文字列をツイートします。

今までほとんど触っていなかった AWS のサービスとしては、 AWS Step Functions (以下、Step Functions (半角スペース警察に注意です)) を使ってみます。
Step Functions に関しては、最近リリースされた AWS SDK Integration も試してみます。

Go で Lambda 関数を書きたい方、 AWS CDK を使って Step Functions のステートマシンを構築してみたい方の参考になれば幸いです。

作ったもの

作ったものは、 Step Functions のステートマシンとして AWS SDK Integration と Lambda 関数を繋げただけの簡素な翻訳ワークフローです。

AWS Step Functions Workflow with Lambda Functions

そして、これらの構成は AWS CDK (以下、CDK) で管理します。
CDK については、最近 “RC” が取れてリリースされた v2.0.0 を TypeScript で使います。
v2 使うなら CDK も Go 使えばいいのでは? となりますが、 Go についてはまだ developer release なのでここでは TypeScript を使うことにします。

作ったものは GitHub に置いています。以降はほぼこのリポジトリ内の説明になるので、中身だけ見たいという方はリポジトリの方をご覧ください。

ざっくり解説

ここからは作ったもの ( michimani/honyakutter-ts ) の解説をしていきます。

解説の前提

解説するにあたって、下記の内容を前提とします。

CDK のディレクトリ構成

まずは CDK で管理するアプリケーションのディレクトリ構成です。ドキュメント類、 Makefile、 Git 管理されないものなど、直接アプリケーションに関係ないものは省略しています。

honyakutter-ts
├── README.md
├── bin
│   └── honyakutter-ts.ts
├── cdk.json
├── jest.config.js
├── lib
│   └── honyakutter-ts-stack.ts
├── package-lock.json
├── package.json
├── resources
│   ├── lambda.ts
│   ├── lambdaFunctions
│   │   └── tweet
│   │       ├── go.mod
│   │       ├── go.sum
│   │       └── main.go
│   ├── logs.ts
│   └── stepFunctions.ts
├── test
│   └── honyakutter-ts.test.ts
├── testdata
│   ├── statemachine_input.json
│   └── tweet_lambda_payload.json
└── tsconfig.json

cdk init --language=typescript で初期化したときに生成されるファイル群以外に追加しているのは resources ディレクトリと testdata ディレクトリです。

resources

reosurces ディレクトリでは、デプロイする AWS の各種リソースの定義をサービスごとに分けて配置しています。

また、 Lambda 関数の実体 (コード) については lambdaFunctions ディレクトリに配置しています。
今回は Go で実装したので、 lambdaFunctions/tweet の下に main.gogo.mod および go.sum があります。

testdata

testdata ディレクトリには、デプロイした Lambda 関数を単体で起動、およびステートマシンを起動するときに input として渡す JSON を置いています。Lambda 関数およびステートマシンを手動で実行する際には AWS CLI を使います。(後述します)

Go で Lambda 関数を実装する

今回は event (= input) として受け取った文字列に、実行時の時刻を付与してツイートする Lambda 関数を Go で実装しました。 main.go はこんな感じです。

package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/aws/aws-lambda-go/lambda"
	"github.com/michimani/gotwi"
	"github.com/michimani/gotwi/tweets"
	"github.com/michimani/gotwi/tweets/types"
)

const (
	OAuthTokenEnvKeyName       = "GOTWI_ACCESS_TOKEN"
	OAuthTokenSecretEnvKeyName = "GOTWI_ACCESS_TOKEN_SECRET"
)

type TweetEvent struct {
	Text TweetText `json:"inputText"`
}

type TweetText string

func (t TweetText) withCurrentTime() string {
	now := time.Now()
	return fmt.Sprintf("[%s] %s", now, t)
}

func handleRequest(ctx context.Context, event TweetEvent) (string, error) {
	fmt.Printf("%#+v\n", event)

	c, err := newTiwtterClient()
	if err != nil {
		return "", err
	}

	tweetText := event.Text.withCurrentTime()

	tweetID, err := tweet(c, tweetText)
	if err != nil {
		return "", err
	}

	return tweetID, nil
}

func newTiwtterClient() (*gotwi.GotwiClient, error) {
	in := &gotwi.NewGotwiClientInput{
		AuthenticationMethod: gotwi.AuthenMethodOAuth1UserContext,
		OAuthToken:           os.Getenv(OAuthTokenEnvKeyName),
		OAuthTokenSecret:     os.Getenv(OAuthTokenSecretEnvKeyName),
	}

	return gotwi.NewGotwiClient(in)
}

func tweet(c *gotwi.GotwiClient, text string) (string, error) {
	p := &types.ManageTweetsPostParams{
		Text: gotwi.String(text),
	}

	res, err := tweets.ManageTweetsPost(context.Background(), c, p)
	if err != nil {
		return "", err
	}

	return gotwi.StringValue(res.Data.ID), nil
}

func main() {
	lambda.Start(handleRequest)
}

ツイートは Twitter API v2 の POST /v2/tweet にリクエストします。その際、 gotwi というライブラリを使用しています。Go で Twitter API v2 を使える良い感じのライブラリがなかったので、 Go の勉強も兼ねて自作しました。

Twitter Developer で取得した API Key/API Key Secret および Access Token/Access Token Secret を環境変数およびクライアント生成時のパラメータとして使用することで、 Twitter の各種 API を実行できるようになっています。
Stream 系 API についてはまだサポートできていないのですが、それ以外の v2 で利用可能な API については対応しています。star ください

少し話が逸れました。

Go で Lambda 関数を実装する際には、 Context と Event を受け取る関数 (今回だと handleRequest()) を定義して、 main 関数内でその関数を lambda.Start に渡すようにします。
Event に関しては JSON で渡されるので、パースできるように構造体を定義しておきます。(今回であれば TweetEvent)

Lambda 関数のレスポンスとして、今回は単純に String を返していますが、 JSON を返したい場合は別途 JSON タグをつけたフィールドを持つ構造体をレスポンス用に定義して、ハンドラー関数でそれを返すようにします。レスポンス用の構造体を定義しているサンプルも置いておきます。

Go で実装した Lambda 関数は、ビルドしてから zip として、または S3 にアップロードしてからデプロイする必要があります。ビルドする際、 OS は linux を、アーキテクチャは amd64 を、それぞれ指定します。

GOOS=linux GOARCH=amd64 go build -o bin/main

ハンドラー関数とメイン関数の定義、およびビルド時の OS とアーキテクチャに気をつければ、あとは普通に Go のアプリケーションとして実装するだけです。

CDK で Go で実装した Lambda 関数を定義する

Go で実装した Lambda を CDK で定義するには、下記のようにします。

 1import { Duration } from "aws-cdk-lib";
 2import { Construct } from "constructs";
 3import * as lambda from "aws-cdk-lib/aws-lambda";
 4
 5const memorySize = 128;
 6const timeout = 60;
 7
 8/**
 9 * Lambda function that tweet a text.
10 * @param scope
11 * @returns lambda.Function
12 */
13function tweetLambdaFunction(scope: Construct): lambda.Function {
14  return new lambda.Function(scope, "TweetLambdaFunction", {
15    functionName: "honyakutter-ts-tweet-function",
16    description: "Tweet text.",
17    code: new lambda.AssetCode("./resources/lambdaFunctions/tweet/bin/"),
18    handler: "main",
19    runtime: lambda.Runtime.GO_1_X,
20    memorySize: memorySize,
21    timeout: Duration.seconds(timeout),
22    environment: {
23      GOTWI_API_KEY: process.env.TWITTER_API_KEY!,
24      GOTWI_API_KEY_SECRET: process.env.TWITTER_API_KEY_SECRET!,
25      GOTWI_ACCESS_TOKEN: process.env.TWITTER_ACCESS_TOKEN!,
26      GOTWI_ACCESS_TOKEN_SECRET: process.env.TWITTER_ACCESS_TOKEN_SECRET!
27    }
28  });
29}
30
31export { tweetLambdaFunction };

ポイントになるのは Lambda 関数の実体 (= コード) をどうやって渡すか、です。
Python や Node.js であれば new lambda.InlineCode() でインラインでコードを指定できますが、 Go の場合は new lambda.AssetCode() でビルド後のバイナリが配置されたディレクトリを指定します。(17 行目)

ちなみに CDK v1 には @aws-cdk/aws-lambda-go というモジュールがあり、より簡単に Go で実装した Lambda 関数を定義してデプロイすることができます。(ローカルマシンに Dokcer がインストールされている必要があります)

CDK v2 にも同様のモジュールはありますが、この記事を書いている時点 (2021-12-09 時点) では α 版でした。

CDK で Step Functions のステートマシンを定義する

今回は resources/stepFunctions.ts で下記のように定義しています。

 1import { Duration } from "aws-cdk-lib";
 2import { Construct } from "constructs";
 3import * as lambda from "aws-cdk-lib/aws-lambda";
 4import * as stepfunctions from "aws-cdk-lib/aws-stepfunctions";
 5import * as tasks from "aws-cdk-lib/aws-stepfunctions-tasks";
 6import { StateMachineLogGroup } from "./logs";
 7
 8const stateMachineTimeout = 300;
 9const taskTimeout = 60;
10
11function translateTweetStateMaschine(
12  scope: Construct,
13  tweetFunc: lambda.Function
14) {
15  const initState = new stepfunctions.Pass(scope, "init", {
16    comment: "init state"
17  });
18
19  // Translate state
20  const translateResultSelector: { [key: string]: string } = {
21    "inputText.$": "$.TranslatedText"
22  };
23  const callAWSServiceProps: tasks.CallAwsServiceProps = {
24    service: "Translate",
25    action: "translateText",
26    iamResources: ["*"],
27    iamAction: "translate:TranslateText",
28    parameters: {
29      SourceLanguageCode: stepfunctions.JsonPath.stringAt("$.sourceLang"),
30      TargetLanguageCode: stepfunctions.JsonPath.stringAt("$.targetLang"),
31      Text: stepfunctions.JsonPath.stringAt("$.inputText")
32    },
33    resultSelector: translateResultSelector
34  };
35  const translateState = new tasks.CallAwsService(
36    scope,
37    "TranslateByAmazonTranslate",
38    callAWSServiceProps
39  );
40
41  // Tweet state
42  const tweetState = lambdaFunctionToTask(scope, tweetFunc, {});
43
44  const definition = initState.next(translateState).next(tweetState);
45
46  const logGroup = StateMachineLogGroup(
47    scope,
48    "TranslateTweetStateMachineLogGroup"
49  );
50  new stepfunctions.StateMachine(scope, "TranslateTweetStateMaschine", {
51    stateMachineName: "honyakutter-ts-translate-tweet-state-maschine",
52    stateMachineType: stepfunctions.StateMachineType.EXPRESS,
53    timeout: Duration.seconds(stateMachineTimeout),
54    definition: definition,
55    logs: {
56      destination: logGroup,
57      level: stepfunctions.LogLevel.ALL
58    }
59  });
60}
61
62function lambdaFunctionToTask(
63  scope: Construct,
64  func: lambda.Function,
65  resultSelector: { [key: string]: string }
66): tasks.LambdaInvoke {
67  const props: tasks.LambdaInvokeProps = {
68    lambdaFunction: func,
69    invocationType: tasks.LambdaInvocationType.REQUEST_RESPONSE,
70    timeout: Duration.seconds(taskTimeout),
71    resultSelector: resultSelector
72  };
73
74  return new tasks.LambdaInvoke(scope, func.functionName, props);
75}
76
77export { translateTweetStateMaschine };

ハイライトしている箇所がステートマシンの各ステートになっています。ひとつずつ見ていきます。

何もしない最初のステート

15const initState = new stepfunctions.Pass(scope, "init", {
16  comment: "init state"
17});

これは一番最初に置くステート (必ず置くわけではない) で、Type は Pass です。Pass は input をそのまま output として次のステートに渡します。

AWS SDK Integration で翻訳を実行するステート

20  const translateResultSelector: { [key: string]: string } = {
21    "inputText.$": "$.TranslatedText"
22  };
23  const callAWSServiceProps: tasks.CallAwsServiceProps = {
24    service: "Translate",
25    action: "translateText",
26    iamResources: ["*"],
27    iamAction: "translate:TranslateText",
28    parameters: {
29      SourceLanguageCode: stepfunctions.JsonPath.stringAt("$.sourceLang"),
30      TargetLanguageCode: stepfunctions.JsonPath.stringAt("$.targetLang"),
31      Text: stepfunctions.JsonPath.stringAt("$.inputText")
32    },
33    resultSelector: translateResultSelector
34  };
35  const translateState = new tasks.CallAwsService(
36    scope,
37    "TranslateByAmazonTranslate",
38    callAWSServiceProps
39  );

続いては、input 内の文字列を翻訳するステートで、 Type は Task です。Task では、 Lambda 関数を実行したり、 ECS タスクを起動したり、 AWS SDK Integration で AWS の各サービスのアクションを実行することができます。

ここでは AWS SDK Integration で Amazon Translate の TranslateText を実行しています。

AWS SDK Integration で各サービスのアクションを実行する場合は、 tasks.CallAwsService() でタスクを生成します。その際の props で、 “どのサービスの”“どのアクションを”“どんなパラメータで” 実行するかを指定します。実行時のパラメータとしてステートへの input 内の値を使用する場合は JsonPath.stringAt() を使用します。

今回、このステートには下記のような JSON が input として渡されるので、それぞれ "$.sourceLang" のようにして取得し、 TranslteText の各パラメータにセットしています。

{
  "sourceLang": "ja",
  "targetLang": "en",
  "inputText": "こんにちは"
}

また、このステートでは ResultSelector として "inputText.$": "$.TranslatedText" を指定しています。これは、この Task の実行結果をステートの output として次のステートに渡すために良い感じに加工するための設定です。今回であれば、ResultSelector を指定しない場合は Amazon Translate の TranslateText の実行結果である下記のような JSON がそのままこのステートの output となり、次のステートに渡されます。

{
  "SourceLanguageCode": "ja",
  "TargetLanguageCode": "en",
  "TranslatedText": "Hello"
}

次のステートである Lambda 関数には {"inputText": "hogehoge"} という JSON を event (= input) として渡す必要があります。なので、この翻訳ステートの output を ResultSelector で加工しています。
ResultSelector を "inputText.$": "$.TranslatedText" のように定義することで、このステートの output は下記のような JSON になります。

{
  "inputText": "Hello"
}

ツイートする Lamdba 関数を実行するステート

42const tweetState = lambdaFunctionToTask(scope, tweetFunc, {});

続いては、 input 内の文字列をツイートする Lambda 関数を実行するステートで、 Type はこちらも Task です。このステートでは ResultSelector は指定していないので、 Lambda 関数からのレスポンスがそのままステートの output になります。

ステートマシンの定義を生成

最後に、これまで定義したステートを繋げてステートマシンの定義を生成します。

44const definition = initState.next(translateState).next(tweetState);

「CDK で Step Fucntions のステートマシンを定義するの、めっちゃ楽!」 というのは以前から聞いていたのですが、その楽さを身を以て感じました。
やっているのは、各ステートを next() で繋いでいるだけです。直感的でわかりやすくてめっちゃ楽ですねこれは。

例えば、 terraform でこれと同じ定義をしようと思うと、下記のような記述をする必要があります。

resource "aws_sfn_state_machine" "sfn_state_machine" {
  name     = "honyakutter-ts-translate-tweet-state-maschine"
  type     = "EXPRESS"

  definition = <<EOF
{
  "StartAt": "init",
  "States": {
    "init": {
      "Type": "Pass",
      "Comment": "init state",
      "Next": "TranslateByAmazonTranslate"
    },
    "TranslateByAmazonTranslate": {
      "Next": "honyakutter-ts-tweet-function",
      "Type": "Task",
      "ResultSelector": {
        "inputText.$": "$.TranslatedText"
      },
      "Resource": "arn:aws:states:::aws-sdk:translate:translateText",
      "Parameters": {
        "SourceLanguageCode.$": "$.sourceLang",
        "TargetLanguageCode.$": "$.targetLang",
        "Text.$": "$.inputText"
      }
    },
    "honyakutter-ts-tweet-function": {
      "End": true,
      "Retry": [
        {
          "ErrorEquals": [
            "Lambda.ServiceException",
            "Lambda.AWSLambdaException",
            "Lambda.SdkClientException"
          ],
          "IntervalSeconds": 2,
          "MaxAttempts": 6,
          "BackoffRate": 2
        }
      ],
      "Type": "Task",
      "TimeoutSeconds": 60,
      "ResultSelector": {},
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:honyakutter-ts-tweet-function",
        "Payload.$": "$",
        "InvocationType": "RequestResponse"
      }
    }
  },
  "TimeoutSeconds": 300
}
EOF
}

この definition の部分は手書きです。辛いですねこれは。

CDK のテスト

CDK のテストとは、定義したリソースが正しく構築されるかをテストします。今回であれば、 Lambda 関数、 Step Functions のステートマシンが定義した通りに構築されるかをテストします。

と言っても実際にデプロイして構築されるかをチェックするわけではなく、 TypeScript で記述した定義から CloudFormation のテンプレートを生成し、そのテンプレートに対して期待するリソースが存在するか、というテストになります。具体的には、 Lambda 関数が構築されることのテストコードは下記のようになります。

 1import * as cdk from "aws-cdk-lib";
 2import { Template } from "aws-cdk-lib/assertions";
 3import * as HonyakutterTs from "../lib/honyakutter-ts-stack";
 4
 5test("Tweet Lambda function Created", () => {
 6  // set environment values for test
 7  process.env.TWITTER_API_KEY = "twitter_api_key_for_test";
 8  process.env.TWITTER_API_KEY_SECRET = "twitter_api_key_secret_for_test";
 9  process.env.TWITTER_ACCESS_TOKEN = "twitter_access_token_for_test";
10  process.env.TWITTER_ACCESS_TOKEN_SECRET = "twitter_access_token_secret_for_test";
11
12  const app = new cdk.App();
13  const stack = new HonyakutterTs.HonyakutterTsStack(app, "StackForTest");
14  const template = Template.fromStack(stack);
15
16  template.hasResourceProperties("AWS::Lambda::Function", {
17    FunctionName: "honyakutter-ts-tweet-function",
18    Description: "Tweet text.",
19    Runtime: "go1.x",
20    Handler: "main",
21    MemorySize: 128,
22    Timeout: 60,
23    Environment: {
24      Variables: {
25        GOTWI_API_KEY: "twitter_api_key_for_test",
26        GOTWI_API_KEY_SECRET: "twitter_api_key_secret_for_test",
27        GOTWI_ACCESS_TOKEN: "twitter_access_token_for_test",
28        GOTWI_ACCESS_TOKEN_SECRET: "twitter_access_token_secret_for_test"
29      }
30    }
31  });
32});

12, 13 行目でスタックを生成し、そのスタックをもとに 14 行目で CloudFormation のテンプレートを生成しています。
そして 16 行目で、生成したテンプレートが期待するプロパティを持つリソースを含んでいるかをテストしています。

デプロイした Lambda 関数、ステートマシンを手動で実行する

今回デプロイされるメインのリソースとしては Lambda 関数と Step Functions のステートマシンです。(その他 IAM Role/IAM Policy/CloudWatch Logs の LogGroup 等ももちろんあります)

デプロイ後の動作確認方法として、 Lambda 関数単体の実行、ステートマシンとしての実行をそれぞれ手動でできるような準備をしました。冒頭で testdata ディレクトリについて触れましたが、そこに配置した JSON を使ってそれぞれ手動実行します。

実行するにあたっては AWS CLI を使用します。使用するバージョンは v2.4.5 および 1.22.21 です。 v2v1 では若干コマンドに違いがあるので、それについては後述します。

Lambda 関数単体の実行

Lambda 関数の実行には下記コマンドを実行します。(v2 で実行する場合のコマンド)

aws lambda invoke \
--function-name honyakutter-ts-tweet-function \
--invocation-type Event \
--region ap-northeast-1 \
--payload fileb://testdata/tweet_lambda_payload.json \
out

このコマンドに関して、 v1 で実行する場合は下記のように変更します。

  aws lambda invoke \
  --function-name honyakutter-ts-tweet-function \
  --invocation-type Event \
  --region ap-northeast-1 \
- --payload fileb://testdata/tweet_lambda_payload.json \
+ --payload file://testdata/tweet_lambda_payload.json \
  out

これは AWS CLI の v2 と v1 でバイナリパラメータの扱いが違うためです。詳細については今回の記事の内容からはズレるので、下記ドキュメントを参照してください。

ステートマシンの開始

Step Functions のステートマシンを開始するには、まずは下記コマンドを実行します。

STATEMACHINE_ARN=$(
    aws stepfunctions list-state-machines \
    --query "stateMachines[?name=='honyakutter-ts-translate-tweet-state-maschine'].stateMachineArn" \
    --output text
) && echo "${STATEMACHINE_ARN}"

続いて、取得した ARN をもとにステートマシンを開始します。

aws stepfunctions start-execution \
--state-machine-arn "${STATEMACHINE_ARN}" \
--input file://testdata/statemachine_input.json

うまくいくと英語に翻訳された文章がツイートされます。

[2021-12-07 17:40:17.516840688 +0000 UTC m=+0.036532198] Hello. This is translated from Japanese to English in Amazon Translate (AWS SDK Task State) using Step Functions and tweeted with Lambda functions It is a sentence that was made.

— michimani Lv.861 (@michimani210) December 7, 2021

まとめ

弁護士ドットコム Advent Calendar 2021 の 9 日目の記事として、Go と AWS を無理やり混ぜ込んだ話でした。

去年のハンズオン形式の記事 とは違って単純なやってみた記事担ってしまいましたが、冒頭に書いた通り、Go で Lambda 関数を書きたい方、 AWS CDK を使って Step Functions のステートマシンを構築してみたい方の参考になれば幸いです。(早く AWS CDK (golang) で触ってみた話も書きたい)

明日の担当は @takapiya さんです。お楽しみに!


comments powered by Disqus