Screaming Loud

研究・プログラミングなど気づいたことをメモをしています。読書記録はこちらに記載しています。https://bookmeter.com/users/75944

AkkaStreamで簡単にCSVファイル変換を行うことはできるか?

Scala Advent Calendar 2017の24日目、クリスマス前夜です。
前日はiTakeshi@githubさんのScalaメタプログラミング今昔物語 - 本編でした。

私はドラクエ11をプレイしており、今クリア後の世界を楽しむというクリスマス感が全くない日を過ごしています。

さて今回は、簡単に大きなデータのCSVファイルを簡単に変換する処理を実装してみたいと思います。

実装としては、
「S3からファイルを受け取ってそのファイルを変換しつつローカルに保存する」
というのをakka streamで実装してみたいと思います。

Akka streamとはなんぞや?という方は以下を読んでみてください

Akka Stream についての基礎概念 - Qiita

やりたいこと

ローカルのファイルを変換して保存し直す

  • 入力:CSVファイル
  • 出力:名寄せなど色々変換した後のCSVファイル

動かすマシンは低めでメモリで動かすため、なるべくメモリに載せないでストリーム処理で変換したい。

実装

ローカルのファイルのInputStreamを入力に受け付けて、変換処理を行いローカルに保存するというメソッドです。

  • 入力:java.io.InputStream, 変換に必要な情報
  • 出力:実行可能なマテリアル

下記のようにそれぞれの処理が分けられるというのがAkkaStreamの強みです。

  • stream:入力
  • processCSV:変換処理
  • sink:保存処理

実際にコードを見てみましょう

import java.nio.file.{ Paths, StandardOpenOption }
import akka.actor.ActorSystem
import akka.stream.{ ActorMaterializer, IOResult }
import akka.stream.scaladsl.{ FileIO, Flow, Keep, RunnableGraph, Sink, Source }
import akka.util.ByteString

implicit val system: ActorSystem = ActorSystem()
implicit val ec: ExecutionContext = system.dispatcher
implicit val mat = AkkaUtils.materializer(system)

case class CsvTransformFormat(
  rawFilePath: String, // S3のファイルのパス
  tempEditedFileName: String, // 変換後のファイルパス
  tempDownloadPath: String, // 変換前のファイルパス
  transformLine: String => Future[String], // 変換するメソッド
  headerLine: Option[String] = None, // CSVのヘッダー
  charset: String = "UTF-8"
)

def stream2string(csvStream: InputStream, format: CsvTransformFormat): RunnableGraph[Future[IOResult]] = {
    val stream = scala.io.Source.fromInputStream(csvStream, "UTF-8")

    val processCsv = Source
        .fromIterator(() => stream.getLines()) // InputStreamの入力
        .mapAsync(threadNum)(errorLogHandler(format.transformLine)) // threadNum数の並行処理で変換処理を実行
        .filter(_.nonEmpty) // 空のlineは除外
        .map(s => ByteString(s + "\n", format.charset)) // streamを登録できるようにByteStringに変換

    import StandardOpenOption._
    val sink = FileIO.toPath(Paths.get(filename), options = Set(APPEND)) // ファイルに追記

    processCsv.toMat(sink)(Keep.right) 
}

def transformLine(line: String): Future[String] = {
    val seq = CsvUtils.parse(line)
    val resultSeq = seq.map(s => s"($s)")
    Future(CsvUtils.write(resultSeq))
}

val format = CsvTransformFormat("log", "/tmp/edited/hoge.csv", "/tmp/downloaded/hoge.csv", transformLine, Some(header), "SJIS")

stream2string(new FileInputStream(format.tempEditedFileName), format) match {
    case Right(r) =>  r.run()
}

mapAsync

processCSVのmapAsyncというのは、そのメソッドの第二引数部分の処理を第一引数のthread数で実行できます。 AkkaStreamではこのように複数スレッドで実行できるので、重い処理でも素早く実行できます。

またmapAsyncは並行に実施しますが、順序は保証しているので、CSVの変換処理などにはもってこいです。

このmapAsyncは何個でも並べられるので、変換処理をプラグインのような形で、簡単に追加出来ます。

この自由に追加できて、処理を簡単に並列化できるのがAkkaStreamの最大の強みと言っていいでしょう。

mapAsyncのパフォーマンス周りの説明としては、以下のブログがとても丁寧に解説していますので、ぜひ参照してみてください。

Maximizing Throughput for Akka Streams

sink

akkaStreamではファイルへの吐き出しはFileIOというメソッドが用意されています。 akkaStreamで流れてきたストリームをファイルへ吐き出すというのをsinkという変数で定義しています。

toMat

最後に toMat(sink)(Keep.right) というのは、to materializeの略で実際に実行するのではなく、コンテナにしてrun()を実行するためのパッケージとする処理です。

以下にMatの詳しい解説があります。

[Akka-Stream]Matって何? - Qiita

まとめ

今回は実際にAkkaStreamを使ってCSVの変換処理を作って見ました。 実際の実装を見て使いやすさが分かっていただけたでしょうか?

今回はCSVの変換でしたが、簡単にプラグインをつなぎ合わせることができるので、もっと複雑なことも簡単に作れると思います。

ぜひみなさんもAkkaStreamを使ってみてください。

実際のコードは気が向いたら年末にGitHubにあげようかなw

それでは!