QLITRE DIALY

AWSサーバレス Chaliceでさくっとニュース配信アプリ

2024年09月28日

先日のエントリで書いた通りAWS SAAに合格をした。

AWS 認定ソリューションアーキテクト - アソシエイトに合格したので振り返る

SAAというと感覚的にはサーバレスのソリューションを問われるような問題が多かったように思える。

サーバーを立てないでSNSとかLambdaでごにょごにょやるという感じ。

座学でサービス構成は触れていたものの、実際に手を動かして作った経験があまりなかった。

というわけで今回は簡易的なサーバレスアプリを作ってみようと思う。

作るもの

定期的にニュースサイトをスクレイピングして、新着ニュースタイトルとリンクを配信するアプリ。

要件

  • 一日に3時間おきに5回実行して、特定のサイトの最新ニュースのリンクとタイトルを配信する。
  • 今回はナタリーを題材とする。
  • 配信済みのニュースが重複して配信されないようにする。

利用技術

AWS

  • Amazon EventBridge

一日に4回Lambdaを呼び出す。

  • Lambda

Amazon EventBridgeから呼び出され、ニュースサイトをスクレイピングして、タイトルとリンクを収集する。

  • DynamoDB

配信済みのニュースを管理する。

  • Amazon SNS

収集したニュースをEメールで配信する。

プログラム

Pythonを使う。

versionは3.10.0

実装

早速実装に入っていく。

SNSトピックの作成

まずは配信用のSNSトピックを作成する。

AWSコンソールにログインし、SNSにアクセス。

ダッシュボード→トピック→トピックの作成と進む。

FIFOトピックとスタンダードトピックがあるが、今回はスタンダードトピックを作成する。

FIFOトピックだと重複が起きないなどのメリットはあるが、SQSキューにしか配信できない。

今回はそこまでシビアな管理は必要なく、メールで直接配信できればよしとする。

名前と表示名は適当につけて作成をする。

遷移先でサブスクリプションの作成をする。

プロトコルにはEメールを選択、エンドポイントには自分のメールアドレスを入れる。

作成するとAWSからメールが届くのでConfirm subscriptionをクリックして、本登録を行う。

これでSNSの準備は完了。

DynamoDB

同様にAWSコンソールからアクセスする。

テーブル名をNewsUrls 、パーティションキーをnews_url、ソートキーをsite_nameとする。

他全てデフォルトの設定でテーブルを作成する。

これでAWS側の準備は終了。

Pythonプログラムの作成

Lambdaへのデプロイはchaliceを使う。

前提としてAWS CLIツールとアクセスキーを使うので準備をする。

まずはchaliceをpip installする。

pip install chalice

適当なディレクトリでchaliceコマンドを叩いてアプリを作成する。

chalice new-project news-notifier

projectフォルダができるので、仮想環境を作成する。

lambdaをpython 3.10系で動かしたいのでpyenvで環境を作るようにする。

※3.8系は2024年10月にサポート終了予定

cd news-notifier
pyenv local 3.10.0
python -m venv myvenv

requirements.txtに必要ライブラリを書く。

chalice
Beautifulsoup4
requests
boto3
botocore

インストールする。

myvenv\scripts\activate
pip install -r requirements.txt

続いてapp.pyにPythonプログラミングを記述する。

import os
import time
from chalice import Chalice, Cron
import boto3
from bs4 import BeautifulSoup
import requests
from botocore.exceptions import ClientError

app = Chalice(app_name='news-notifier')


def scrape_natalie(data: dict):
    """ナタリーからニュースリンクを収集する"""
    site_name = "ナタリー"
    data[site_name] = {}
    url = 'https://natalie.mu/news/'
    r = requests.get(url)
    soup = BeautifulSoup(r.content, 'html.parser')
    soup = soup.find('div', class_='NA_section')
    news_cards = soup.find_all('div', class_='NA_card')
    for card in news_cards:
        title = card.find('p', class_='NA_card_title').get_text()
        # ここでaタグで特定できればいいが、タグリンクのaタグが含まれているためaタグを一個ずつ調べる
        news_url = None
        for a in card.find_all('a'):
            # urlにnewsが含まれるのが記事リンク
            if 'news' in a.get('href'):
                news_url = a.get('href')
                break
        if news_url:
            data[site_name][news_url] = title
    return data


def is_url_processed(table, url, site_name):
    """DynamoDBに登録済みか調べる"""
    try:
        response = table.get_item(
            Key={
                'news_url': url,
                'site_name': site_name
            }
        )
        return 'Item' in response
    except ClientError as e:
        # エラーが発生した場合は、Trueを返す
        return True


def prepare_notification(news):
    """通知メッセージを準備"""
    message = "新着ニュース:\n\n"
    for site_name, articles in news.items():
        message += f"{site_name}:\n\n"
        for title, url in articles.items():
            message += f"- {title}\n  {url}\n\n"
        message += "\n\n"
    return message


def save_to_dynamodb(table, news):
    """DynamoDBにアイテムを保存"""
    current_time = int(time.time())
    one_week_later = current_time + (7 * 24 * 60 * 60)
    for site_name, articles in news.items():
        for url, title in articles.items():
            try:
                table.put_item(
                    Item={
                        'news_url': url,
                        'site_name': site_name,
                        'title': title,
                        'timestamp': current_time,
                        'expiration_time': one_week_later
                    }
                )
            except ClientError as e:
                if e.response['Error']['Code'] == 'ValidationException':
                    continue
                else:
                    # 他のClientErrorの場合は再raise
                    raise


# 毎日 8:30 11:30 14:30 17:30に実行
# UTCなので、9H前で指定
@app.schedule(Cron(minutes='30', hours='23,2,5,8,11', day_of_month='?', month='*', day_of_week='*', year='*'))
def scrape_and_notify(event):
    """メイン処理"""
    # DynamoDBクライアントの設定
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table(os.environ.get('DYNAMODB_TABLE'))

    # SNSクライアントの設定
    sns = boto3.client('sns')
    sns_topic_arn = os.environ.get('SNS_TOPIC_ARN')
    new_news = {}
    # 新しいニュースをスクレイピング
    new_news = scrape_natalie(new_news)
    # 配信するニュース
    news_subscribe = {}
    for site_name, articles in new_news.items():
        found = False
        d = {}
        for url, title in articles.items():
            # 既にDynamoDBに登録済みだったら配信済みなのでスキップ
            if is_url_processed(table, url, site_name):
                continue
            found = True
            d[url] = title
        if found:
            news_subscribe[site_name] = d
    if news_subscribe:
        # SNS通知のテキストを作る
        notification_message = prepare_notification(news_subscribe)
        # SNS通知する
        if notification_message:
            sns.publish(
                TopicArn=sns_topic_arn,
                Message=notification_message
            )

        # DynamoDBへの保存
        save_to_dynamodb(table, news_subscribe)

以下のような流れ

  1. スクレイピング
  • 「ナタリー」サイトからニュースタイトルとURLを収集するために、BeautifulSoupを使用。
  • requestsを使ってサイトにアクセスし、ニュースカードの要素からタイトルとURLを抽出する。
  • これを1日に4回(8:30、11:30、14:30、17:30)実行する。
  1. DynamoDBへの保存
  • 重複したニュースを再通知しないように、取得したURLをDynamoDBに保存。
  • スクレイピングしたURLが既にDynamoDBに存在するかをチェックし、未処理のものだけを通知対象にする。
  • 各ニュースはタイムスタンプ付きで保存され、1週間後に自動的に削除できるように設定値を保存
  1. SNSでの通知
  • 新しいニュースが見つかると、SNSに対して通知を送信する
  • メッセージには、ニュースタイトルとそのURLが含まれ、複数のニュースがあればまとめて通知する。

LambdaからDynamoDBとSNSを操作する必要があるので、.chalice/policy.jsonを作成し、IAMロールを付与する。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "dynamodb:GetItem",
        "dynamodb:PutItem"
      ],
      "Resource": "arn:aws:dynamodb:YOUR_REGION:YOUR_ACCOUNT_ID:table/YOUR_DYNAMODB_TABLE_NAME"
    },
    {
      "Effect": "Allow",
      "Action": [
        "sns:Publish"
      ],
      "Resource": "arn:aws:sns:YOUR_REGION:YOUR_ACCOUNT_ID:YOUR_SNS_TOPIC_NAME"
    },
    {
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": "arn:aws:logs:*:*:*"
    }
  ]
}

Resourceには作成したDynamoDBとSNSのarnで置き換える。

次に.chalice/config.jsonを編集する。

先ほど作成したpolicy.jsonを適用するようにする。

{
  "version": "2.0",
  "app_name": "news-notifier",
  "stages": {
    "dev": {
      "autogen_policy": false,
      "iam_policy_file": "policy.json",
      "environment_variables": {
        "DYNAMODB_TABLE": "arn:aws:dynamodb:YOUR_REGION:YOUR_ACCOUNT_ID:table/YOUR_DYNAMODB_TABLE_NAME",
        "SNS_TOPIC_ARN": "arn:aws:sns:YOUR_REGION:YOUR_ACCOUNT_ID:YOUR_SNS_TOPIC_NAME"
      }
    }
  }
}

デプロイする。

chalice deploy

AWSにログインし、EventBridgeをトリガーに関数がデプロイされていることを確認する。

テスト

これで定期実行されるはずだが、テストを行う。

以下のようなダミーのeventのjsonデータを貼り付けてテスト実施。

{
  "version": "0",
  "id": "12345678-1234-1234-1234-123456789012",
  "detail-type": "Scheduled Event",
  "source": "aws.events",
  "account": "123456789012",
  "time": "2024-09-28T12:00:00Z",
  "region": "us-east-1",
  "resources": [
    "arn:aws:events:us-east-1:123456789012:rule/MyScheduledRule"
  ],
  "detail": {}
}

以下のようにメールが配信される。

TTLの設定

DynamoDBの役割としては配信済みかどうかを管理しているだけなので、古いデータは必要ない。

現在、トップページのみを確認しているので、一週間くらい保持すればいいとする。

(一週間経てばニュースが全て入れ替わると想定)

DynamoDBの機能としてTTL機能があるので、これを活用して古いデータを自動的に削除する。

追加の設定からTTLをオンにする。

データ保存時に設定しているexpiration_timeを指定する。

プレビューで次の7日間として、テスト実行したURLが削除されることを確認する。

これで定期的に削除してテーブルサイズを節約できる。

ソースコード

githubに公開した。

https://github.com/qlitre/news-notifier

おわりに

あまりサクッととはいかなかったが、作りたい物ができたのでよかった。

とはいえSAAで概念は習得済みだったので躓きは少なかったと思う。

例えばLambdaにロールを持たせる必要があるなど、把握してなかったら躓いていた可能性が高い。

今回は単独のニュース配信だが、プログラムを拡張して複数のニュースサイトを巡回したり、SNSを使わずLineにプッシュ通知するように変更しても面白いかもしれない。

あとは、まぁこんなことしなくても、RSSフィードがあるじゃんという指摘はあるかと思う。

ただフィードが用意されていないサイトもあるし、あとはプッシュ型で決まった時間に複数サイトのニュースを一括して受け取れるという点にメリットはありそう。

他にも何か思いついたら共有したい。ではでは。