Lambda(node.js)のトリガーでS3にPUTされたCSVをRDSにインサートする – AWS SDK for JavaScript v3

Lambda(node.js)のトリガーでS3にPUTされたCSVをRDSにインサートする – AWS SDK for JavaScript v3

前提

  • RDSはパブリックアクセス可能,Aurora(MySQL)
  • Serverless Lambda
項目
文字コードutf-8
改行コードCRLF
csv

2行3列のcsv(headerあり)

Lambdaのトリガー設定

S3にPUTされたcsvをトリガーとして、そのタイミングでLambdaを実行することができます。

Lambdaのトリガー設定します。

Lambda(node.js)のトリガーでS3にPUTされたCSVをRDSにインサートする – AWS SDK for JavaScript v3

プレフィックスでオブジェクトキーを指定したり、サフィックスで拡張子を指定したりと細かい設定ができるようです。

使用するモジュールは以下です。

npm i @aws-sdk/client-s3 mysql2 csv-parse

Lambda(index.js)

Lambdaソースです。

import {S3Client, SelectObjectContentCommand} from '@aws-sdk/client-s3'
import * as mysql2  from 'mysql2/promise'
import {parse} from 'csv-parse/sync'
export async function handler(event, context) {
  // PUTされたバケット、オブジェクトキーが取得可能
  const bucket = event['Records'][0]['s3']['bucket']['name']
  const key = event['Records'][0]['s3']['object']['key']
  const input = {
    Bucket: bucket,
    Key: key,
    ExpressionType: 'SQL',
    Expression: 'SELECT s._1,s._2 FROM S3Object s',
    InputSerialization: {
      CSV: {
        FileHeaderInfo: 'IGNORE', // ヘッダ行無視
        RecordDelimiter: '\r\n',
        FieldDelimiter: ','
      }
    },
    OutputSerialization: {
      CSV: {}
    }
  };
  const client = new S3Client({
    region: 'ap-northeast-1'
  })
  try {
    // S3
    const command = new SelectObjectContentCommand(input)
    const data = await client.send(command)
    const csv = parse(await streamToCsv(data.Payload))

    // RDS
    const conn = await mysql2.createConnection({
      host:'データベースエンドポイント',
      user: 'ユーザ名',
      password: 'パスワード',
      database: 'データベース名'
  })
    for await (const records of csv) {
      await conn.query("insert into user(id,name) values(?,?)", [records[0],records[1]])
    }
    await conn.end()
  }catch (e) {
    console.log(e)
    return
  }
  const response = {
    statusCode: 200,
    body: JSON.stringify('Hello from Lambda!'),
  }
  return response
}
  
async function streamToCsv(generator) {
  const chunks = []
  for await (const value of generator) {
    if (value.Records) {
      chunks.push(value.Records.Payload);
    }
  }
  let payload = Buffer.concat(chunks).toString('utf8')
  return payload
}

トリガーとなるPUTを行います。

aws s3 cp testdata.csv s3://バケット名/tmp/testdata.json

正常にuploadされると、Lambdaが実行され、RDSのテーブルに2レコード追加されています。

Lambda(node.js)のトリガーでS3にPUTされたCSVをRDSにインサートする – AWS SDK for JavaScript v3

コメント

タイトルとURLをコピーしました