michimani.net

Google Analytics Reporting API を定期実行する Lambda を CDK で構築してみた

2020-07-11

Google Analytics の Reporting API v4 を使って定期的に PV ランキングを生成して、その結果を Amazon S3 にエクスポートする処理を AWS Lambda で実装し、諸々の構成を AWS CDK を使って管理するようなものを作ってみました。

PV 数の取得までは以前にもやっていたので、今回はそれを Lambda で実装、結果を S3 にエクスポートするようにしてみました。

目次

概要

以上の処理をする構成を、 AWS CDK を使って実装しました。

構成図としては次のようになります。

architecture

PUT 対象となる S3 バケットは既に存在しているものとして、 CDK での管理から除外しています。また、 Google のサービスアカウントキーについては、ローカル環境から AWS CLI で SSM のパラメータストアに PUT するものとして、こちらも CDK での管理から除外しています。

なので CDK で管理するリソースは、 EventBridge のルールと Lambda 関数 (とそれに付随する IAM ポリシー) ということになります。

前提

上記の準備については下記の公式レイファレンスを参照してください。

実装

詳細については GitHub のリポジトリを参照してもらえればと思うので、ここでは Lambda 関数の中身と CDK の記述について簡単に説明します。

Lambda 関数

Lambda 関数は Python 3.7 で実装しています。関数の中身の前に、 CDK プロジェクト内でのディレクトリ構成について書いておきます。

CDK プロジェクト内でのディレクトリ構成

今回は Google Analytics の Reporting API を使うということで、下記のモジュールをインストールしています。

google-api-python-client
google-auth-httplib2
google-auth-oauthlib

また、 Reporting API では一回で各ページのタイトルまで取得できないので、上位 5 件のページタイトルを取得するために requests モジュールもインストールしています。

リポジトリ内のディレクトリ構成としては、直下に lambda ディレクトリを作成し、その中は次のような構成としています。

lambda
├── dist
├── requirements.txt
└── src
    └── fetch_rank.py

あとで書きますが、デプロイ時には dist ディレクトリを Lambda のデプロイパッケージ用のディレクトリとして使ってデプロイをします。

関数の実装内容

Lambda 関数の実装は次のようになっています。

from google.oauth2 import service_account
from googleapiclient.discovery import build
import boto3
import json
import logging
import os
import re
import traceback
import requests

SCOPES = ['https://www.googleapis.com/auth/analytics.readonly']
CLIENT_SECRET_SSM_KEY = os.environ.get('CLIENT_SECRET_SSM_KEY')
VIEW_ID = os.environ.get('VIEW_ID')
OUT_S3_BUCKET = os.environ.get('OUT_S3_BUCKET')
OUT_JSON_KEY = os.environ.get('OUT_JSON_KEY')
SITE_BASE_URL = os.environ.get('SITE_BASE_URL')

s3 = boto3.resource('s3')
ssm = boto3.client('ssm')
logger = logging.getLogger()
logger.setLevel(logging.INFO)


def get_ssm_param(key):
    # type: (str) -> str
    """ Get parameter fron SSM Parameter Store.
    Args:
        key: string for SSM parameter store key
    Returns:
        string
    """
    response = ssm.get_parameters(
        Names=[
            key,
        ],
        WithDecryption=True
    )
    return response['Parameters'][0]['Value']


def initialize_analyticsreporting():
    # type: () -> build
    """Initializes an Analytics Reporting API V4 service object.
    Returns:
      An authorized Analytics Reporting API V4 service object.
    """
    client_secret_string = get_ssm_param(CLIENT_SECRET_SSM_KEY)

    client_secret = json.loads(client_secret_string)
    credentials = service_account.Credentials.from_service_account_info(
        client_secret, scopes=SCOPES)

    # Build the service object.
    analytics = build('analyticsreporting', 'v4',
                      credentials=credentials,
                      cache_discovery=False)
    return analytics


def get_report(analytics):
    # type: (build) -> dict
    """Queries the Analytics Reporting API V4.
    Args:
      analytics: An authorized Analytics Reporting API V4 service object.
    Returns:
      The Analytics Reporting API V4 response.
    """
    return analytics.reports().batchGet(
        body={
            'reportRequests': [
                {
                    'viewId': VIEW_ID,
                    'dateRanges': [{'startDate': '7daysAgo', 'endDate': 'yesterday'}],
                    'metrics': [{'expression': 'ga:pageviews'}],
                    'dimensions': [{'name': 'ga:pagePath'}]
                }]
        }
    ).execute()


def calc(response):
    # type: (dict) -> ()
    """Calculate page views of each page path.
    Args:
        response: The Analytics Reporting API V4 response.
    """
    calc_res = dict()
    pv_summary = []
    report = response.get('reports', [])[0]
    for report_data in report.get('data', {}).get('rows', []):
        # get page path
        page_path = report_data.get('dimensions', [])[0]
        # ignore query parameters
        page_path = re.sub(r'\?.+$', '', page_path)

        # get page view
        page_view = int(report_data.get('metrics', [])[0].get('values')[0])

        if page_path in calc_res:
            calc_res[page_path] += page_view
        else:
            calc_res[page_path] = page_view

    for path in calc_res:
        pv_summary.append({
            'page_path': path,
            'page_views': calc_res[path]
        })

    # sort by page views
    pv_summary.sort(
        key=lambda path_data: path_data['page_views'], reverse=True)

    return pv_summary


def report_to_rank(report, count=5):
    # type: (list) -> list
    """ Convert report data to ranking data
    Args:
        report: list object to convert
        count: number of ranking post. default is 5
    Returns:
        list
    """

    if count == 0 or len(report) < count:
        count = 5

    rank_tmp = report[:count]

    rank = list()
    try:
        for rt in rank_tmp:
            post_url = SITE_BASE_URL + rt['page_path']
            rank.append({
                'post_url': post_url,
                'post_title': get_post_title(post_url)})
    except Exception:
        print('An error occured in getting post title process.')
        print(traceback.format_exc())

    return rank


def get_post_title(post_url):
    # type: (str) -> str
    """ Get post title from post url
    Args:
        post_url: URL of the post
    Returns:
        string
    """
    post_title = ''

    try:
        res = requests.get(post_url)
        body = res.text
        post_title = re.sub(r'[\s\S]+<title>(.*)<\/title>[\s\S]+', r'\1', body)
    except Exception:
        print(f'Failed to get post title of "{post_url}"')
        print(traceback.format_exc())

    return post_title


def put_to_s3(data, key):
    # type: (dict, str) -> ()
    """ Put object to S3 bucket
    Args:
        data: dict or list object to put as JSON.
        key: object key
    """

    try:
        s3obj = s3.Object(OUT_S3_BUCKET, key)
        body = json.dumps(data, ensure_ascii=False)
        s3obj.put(
            Body=body,
            ContentType='application/json;charset=UTF-8',
            CacheControl='public, max-age=1209600')
    except Exception:
        logger.error('Put count data failed: %s', traceback.format_exc())


def main(event, context):
    analytics = initialize_analyticsreporting()
    response = get_report(analytics)
    summary = calc(response)
    rank = report_to_rank(summary)
    put_to_s3(rank, OUT_JSON_KEY)


if __name__ == '__main__':
    print('Running at local...\n\n')

    analytics = initialize_analyticsreporting()
    response = get_report(analytics)
    summary = calc(response)
    rank = report_to_rank(summary)
    print(json.dumps(rank, indent=2, ensure_ascii=False))

ほぼ 前回 と同じですが、上位 5 件のページタイトルについては requests を使って取得しています。

また、 事前に client_secrets.json の情報を SSM パラメータストアに登録しておき、 Lambda 関数内で取得して利用しています。

そして、最後には S3 バケットに PUT しています。
ローカルで実行する際には、 S3 バケットに PUT するのではなく標準出力に生成された JSON を出力します。

CDK

続いて CDK のメインとなるスタックの記述についてです。 CDK については TypeScript で記述しています。

環境依存の値については、次のような stac-config.json にまとめて、そこから値を取得して利用しています。

{
  "lambda": {
    "env": {
      "client_secret_ssm_key": "google-client-secret",
      "view_id": "00000000",
      "out_s3_bucket": "<aleady-exists-your-bucket>",
      "out_json_key": "data/rank.json",
      "site_base_url": "<your-site-base-url>"
    }
  },
  "event_bridge": {
    "cron_expression": "0 15 * * ? *"
  }
}

今回はシンプルな構成なので、スタックは分割せず一つのファイル lib/export-pv-rank-stack.ts で書いています。

import * as cdk from '@aws-cdk/core';
import lambda = require('@aws-cdk/aws-lambda');
import iam = require('@aws-cdk/aws-iam');
import events = require('@aws-cdk/aws-events');
import targets = require('@aws-cdk/aws-events-targets');
import fs = require('fs');


export class ExportPvRankStack extends cdk.Stack {
  constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    const stackConfig = JSON.parse(fs.readFileSync('stack-config.json', {encoding: 'utf-8'}));

    // Lambda function
    const lambdaFn = new lambda.Function(this, 'fetchRank', {
      code: new lambda.AssetCode('lambda/dist'),
      runtime: lambda.Runtime.PYTHON_3_7,
      handler: 'fetch_rank.main',
      timeout: cdk.Duration.seconds(300),
      environment: {
        'CLIENT_SECRET_SSM_KEY': stackConfig['lambda']['env']['client_secret_ssm_key'],
        'VIEW_ID': stackConfig['lambda']['env']['view_id'],
        'OUT_S3_BUCKET': stackConfig['lambda']['env']['out_s3_bucket'],
        'OUT_JSON_KEY': stackConfig['lambda']['env']['out_json_key'],
        'SITE_BASE_URL': stackConfig['lambda']['env']['site_base_url'],
      }
    });

    lambdaFn.addToRolePolicy(new iam.PolicyStatement({
      actions: [
        's3:PutObject',
        'ssm:DescribeParameters',
      ],
      resources: ['*']
    }));

    lambdaFn.addToRolePolicy(new iam.PolicyStatement({
      actions: [
        'ssm:GetParameter',
        'ssm:GetParameters',
        'ssm:GetParameterHistory',
        'ssm:GetParametersByPath',
      ],
      resources: ['arn:aws:ssm:*']
    }));

    // EventBridge rule
    const fetchPvRanking = new events.Rule(this, 'FetchPvRanking', {
      schedule: events.Schedule.expression(`cron(${stackConfig.event_bridge.cron_expression})`)
    });

    fetchPvRanking.addTarget(new targets.LambdaFunction(lambdaFn));
  }
}

stac-config.json では Lambda 関数用の環境変数も保持しているので、 Lambda 関数オブジェクトを生成する際に environment で指定しています。

前述しましたが、今回は外部モジュールを使用しているため、それらを Lambda のデプロイパッケージに含める必要があります。 CDK では 次のようにパッケージとするディレクトリを指定することで実現できます。

new lambda.AssetCode('lambda/dist')

これを new lambda.Function()code に渡せばオッケーです。

デプロイ

CDK のデプロイ自体は cdk synth からの cdk deploy コマンドで完了しますが、その前に少し準備をします。

Python の外部モジュールのインストール

今回は Python の外部モジュールを使用するので、それらをデプロイパッケージ用のディレクトリ lambda/dist にインストールします。

$ pip3 install --upgrade -r ./lambda/requirements.txt -t ./lambda/dist/

本来であれば Lambda の実行環境である Amazon Linux 上でインストールしたモジュールを使うべきですが、今回は OS 依存となるようなモジュールを含まないためローカル環境でインストールしています。

ちなみに、画像処理用のモジュールである Pillow については OS 依存となるため、しっかりと環境を用意してインストールする必要があります。

あとは、 lambda/src 内にある関数本体となるスクリプトも lambda/dist にコピーしておきます。

$ cp -f ./lambda/src/fetch_rank.py ./lambda/dist/

これで準備は完了です。

CDK のデプロイ

$ cdk synth

で CLoudFormation テンプレートを生成して

$ cdk deploy

で デプロイします。

今回は IAM に関する変更・追加が発生するため、デプロイ時に確認されます。

このプロジェクトの使い所

最後に、このプロジェクトの使い所について説明します。

ブログでよくある「最近読まれている記事」みたいなものを静的サイトでも実現したいと思い、Google Analytics Reporting API v4 を使って直近一週間のページごとの PV 数を取得してみました。取得したデータを JSON とかで保持しておけば、前日までのデータで PV ランキングが作成できそうです。

これまた 前回 のブログで冒頭で書いていた内容なんですが、まさにこれを実現するためなんです。

静的サイトではあらかじめ各ページが HTML で生成されているため、動的な情報を表示するためには工夫が必要です。 SNS でのシェア数なども同じで、実際にこのブログでは各記事の はてブ数 を定期的に取得して JSON ファイルとして生成し、ページ表示時に JavaScript でその値を取得・表示させています。

今回はこれと同様の考えで、閲覧数ランキングを作りたいという思いで作りました。なので、同じように静的サイトでページの閲覧数ランキングを作りたいという方には参考になるかもしれません。

まとめ

Google Analytics の Reporting API v4 を使って PV ランキングを生成して、その結果を Amazon S3 にエクスポートする処理を AWS Lambda で実装し、諸々の構成を AWS CDK を使って管理するようなものを作ってみた話でした。

静的サイトで動的な情報を表示させるときの参考になれば幸いです。


comments powered by Disqus