Spring BootでS3 Selectの使い方

Spring BootでS3 Selectの使い方

S3のとあるバケットに以下のようなCSVがあるとします。バケット名は「saba6seimenjo」とします。

name age height weight
Bob 20 170 60
Tom 30 175 65
Gaga 25 155 44

Spring Bootを起動するクラスをちょっと編集します。今回はS3 Selectが目的なので。

package jp.co.confrage;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;

import lombok.RequiredArgsConstructor;

@SpringBootApplication
@RequiredArgsConstructor
public class AwsS3selectApplication{
  private final Sample sample;

  public static void main(String[] args) throws Exception {
    try (ConfigurableApplicationContext ctx = SpringApplication.run(AwsS3selectApplication.class, args)) {
    AwsS3selectApplication awsS3 = ctx.getBean(AwsS3selectApplication.class);
    awsS3.run();
  }
}

  public void run() throws Exception {
    sample.run();
  }
}

SampleクラスをコンストラクタインジェクションでDIしてます。

SampleクラスでS3オブジェクトを作成してS3 Selectを実装してみます。

※DIは、依存性の注入=オブジェクトの注入という意味です

AmazonS3のselectObjectContentメソッドは以下のように書いています。

This operation filters the contents of an Amazon S3 object based on a simple Structured Query Language (SQL) statement.
In the request, along with the SQL expression, you must also specify a data serialization format (JSON or CSV) of the
object. Amazon S3 uses this to parse object data into records, and returns only records that match the specified SQL
expression. You must also specify the data serialization format for the response.

S3のオブジェクト(今回ではCSV)をSQLでフィルターする。要するにCSVに対してSQLを発行できますよ、みたいな感じでしょうか。

この前紹介したVSCodeのRainbow CSVと同じような感覚です。

selectObjectContentメソッドに渡す引数は、SelectObjectContentRequesオブジェクトです。

インスタンス生成してバケット、キー、クエリーを設定します。以下、Sampleクラスです。

package jp.co.confrage;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.util.Arrays;

import org.springframework.stereotype.Component;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.CSVInput;
import com.amazonaws.services.s3.model.CSVOutput;
import com.amazonaws.services.s3.model.CompressionType;
import com.amazonaws.services.s3.model.ExpressionType;
import com.amazonaws.services.s3.model.InputSerialization;
import com.amazonaws.services.s3.model.OutputSerialization;
import com.amazonaws.services.s3.model.SelectObjectContentEventVisitor;
import com.amazonaws.services.s3.model.SelectObjectContentRequest;
import com.amazonaws.services.s3.model.SelectObjectContentResult;

@Component
public class Sample {

public void run() throws Exception {
  AWSCredentials credentials = new ProfileCredentialsProvider("default").getCredentials();
  // s3クライアントを生成 start
  AmazonS3 s3 = AmazonS3ClientBuilder
    .standard()
    .withCredentials(new AWSStaticCredentialsProvider(credentials))
    .withRegion("ap-northeast-1")
    .build();
  // s3クライアントを生成 end

  // bucket,key,queryを指定 start
  String bucketName = "saba6seimenjo";
  String key = "test.csv";
  String query = "SELECT s._1 FROM S3Object s ";
  // bucket,key,queryを指定 end

  // SelectObjectContentRequesを生成 start
  SelectObjectContentRequest selectRequest = new SelectObjectContentRequest();
  selectRequest.setBucketName(bucketName);
  selectRequest.setKey(key);
  selectRequest.setExpression(query);
  selectRequest.setExpressionType(ExpressionType.SQL);

  InputSerialization inputSerialization = new InputSerialization();
  inputSerialization.setCsv(new CSVInput());
  inputSerialization.setCompressionType(CompressionType.NONE);
  selectRequest.setInputSerialization(inputSerialization);

  OutputSerialization outputSerialization = new OutputSerialization();
  outputSerialization.setCsv(new CSVOutput());
  selectRequest.setOutputSerialization(outputSerialization);
  // SelectObjectContentRequesを生成 end

  SelectObjectContentResult content = s3.selectObjectContent(selectRequest);
  InputStream resultInputStream = content.getPayload().getRecordsInputStream(new SelectObjectContentEventVisitor() {});

  fileOutput(resultInputStream); // ローカルに出力
}

  private void fileOutput(InputStream resultInputStream) throws Exception {
    String filePath = "C:\\Users\\takahashi\\Documents\\output.csv"; // このファイルにS3 Select結果を出力します
    File file = new File(filePath);

    int offset;
    int bytesRead;
    byte[] data = new byte[5242880];// バッファ
    boolean next;
    int outerLoop = 0;
    int innerLoop = 0;
    String renban = "";

    do {
      offset = 0; // offsetのリセット
      Arrays.fill(data, (byte)0);
      next = false;
      while ((bytesRead = resultInputStream.read(data, offset, data.length - offset)) != -1) {
        offset += bytesRead;
        innerLoop++;
        if (offset >= data.length) {
          next = true;
          break;
        }
      }
      if (offset >= 1) {
        ByteArrayInputStream bais = new ByteArrayInputStream(data, 0, offset);
        renban = String.format("%03d", outerLoop);
        FileOutputStream fos = new FileOutputStream(filePath);
        fos.write(data);
        bais.close();
        fos.close();
      }
      outerLoop++;
    } while(next);
  }
}

これを実行すると、

Bob
Tom
Gaga

ちなみにSQLは上記例のようにエイリアスは「s」である必要はありません。

String query = "SELECT a._1,a._2 FROM S3Object a ";

こうすると結果は以下のようになります。

Bob,20
Tom,30
Gaga,25

build.gradle

以下が必要です。

implementation 'org.springframework.cloud:spring-cloud-aws-context'
implementation 'org.springframework.integration:spring-integration-aws:2.0.0.RELEASE'
compile 'com.amazonaws:aws-java-sdk-s3'

以下のサイトが非常に役立ちました。

参考サイト:There is no EC2 meta data available

参考サイト:https://docs.aws.amazon.com/ja_jp/AmazonS3/latest/dev/SelectObjectContentUsingJava.html

コメント

タイトルとURLをコピーしました