Screaming Loud

日々是精進

AkkaStreamで簡単にCSVファイル変換を行うことはできるか?

Scala Advent Calendar 2017の24日目、クリスマス前夜です。
前日はiTakeshi@githubさんのScalaメタプログラミング今昔物語 - 本編でした。

私はドラクエ11をプレイしており、今クリア後の世界を楽しむというクリスマス感が全くない日を過ごしています。

さて今回は、簡単に大きなデータのCSVファイルを簡単に変換する処理を実装してみたいと思います。

実装としては、
「S3からファイルを受け取ってそのファイルを変換しつつローカルに保存する」
というのをakka streamで実装してみたいと思います。

Akka streamとはなんぞや?という方は以下を読んでみてください

Akka Stream についての基礎概念 - Qiita

やりたいこと

ローカルのファイルを変換して保存し直す

  • 入力:CSVファイル
  • 出力:名寄せなど色々変換した後のCSVファイル

動かすマシンは低めでメモリで動かすため、なるべくメモリに載せないでストリーム処理で変換したい。

実装

ローカルのファイルのInputStreamを入力に受け付けて、変換処理を行いローカルに保存するというメソッドです。

  • 入力:java.io.InputStream, 変換に必要な情報
  • 出力:実行可能なマテリアル

下記のようにそれぞれの処理が分けられるというのがAkkaStreamの強みです。

  • stream:入力
  • processCSV:変換処理
  • sink:保存処理

実際にコードを見てみましょう

import java.nio.file.{ Paths, StandardOpenOption }
import akka.actor.ActorSystem
import akka.stream.{ ActorMaterializer, IOResult }
import akka.stream.scaladsl.{ FileIO, Flow, Keep, RunnableGraph, Sink, Source }
import akka.util.ByteString

implicit val system: ActorSystem = ActorSystem()
implicit val ec: ExecutionContext = system.dispatcher
implicit val mat = AkkaUtils.materializer(system)

case class CsvTransformFormat(
  rawFilePath: String, // S3のファイルのパス
  tempEditedFileName: String, // 変換後のファイルパス
  tempDownloadPath: String, // 変換前のファイルパス
  transformLine: String => Future[String], // 変換するメソッド
  headerLine: Option[String] = None, // CSVのヘッダー
  charset: String = "UTF-8"
)

def stream2string(csvStream: InputStream, format: CsvTransformFormat): RunnableGraph[Future[IOResult]] = {
    val stream = scala.io.Source.fromInputStream(csvStream, "UTF-8")

    val processCsv = Source
        .fromIterator(() => stream.getLines()) // InputStreamの入力
        .mapAsync(threadNum)(errorLogHandler(format.transformLine)) // threadNum数の並行処理で変換処理を実行
        .filter(_.nonEmpty) // 空のlineは除外
        .map(s => ByteString(s + "\n", format.charset)) // streamを登録できるようにByteStringに変換

    import StandardOpenOption._
    val sink = FileIO.toPath(Paths.get(filename), options = Set(APPEND)) // ファイルに追記

    processCsv.toMat(sink)(Keep.right) 
}

def transformLine(line: String): Future[String] = {
    val seq = CsvUtils.parse(line)
    val resultSeq = seq.map(s => s"($s)")
    Future(CsvUtils.write(resultSeq))
}

val format = CsvTransformFormat("log", "/tmp/edited/hoge.csv", "/tmp/downloaded/hoge.csv", transformLine, Some(header), "SJIS")

stream2string(new FileInputStream(format.tempEditedFileName), format) match {
    case Right(r) =>  r.run()
}

mapAsync

processCSVのmapAsyncというのは、そのメソッドの第二引数部分の処理を第一引数のthread数で実行できます。 AkkaStreamではこのように複数スレッドで実行できるので、重い処理でも素早く実行できます。

またmapAsyncは並行に実施しますが、順序は保証しているので、CSVの変換処理などにはもってこいです。

このmapAsyncは何個でも並べられるので、変換処理をプラグインのような形で、簡単に追加出来ます。

この自由に追加できて、処理を簡単に並列化できるのがAkkaStreamの最大の強みと言っていいでしょう。

mapAsyncのパフォーマンス周りの説明としては、以下のブログがとても丁寧に解説していますので、ぜひ参照してみてください。

Maximizing Throughput for Akka Streams

sink

akkaStreamではファイルへの吐き出しはFileIOというメソッドが用意されています。 akkaStreamで流れてきたストリームをファイルへ吐き出すというのをsinkという変数で定義しています。

toMat

最後に toMat(sink)(Keep.right) というのは、to materializeの略で実際に実行するのではなく、コンテナにしてrun()を実行するためのパッケージとする処理です。

以下にMatの詳しい解説があります。

[Akka-Stream]Matって何? - Qiita

まとめ

今回は実際にAkkaStreamを使ってCSVの変換処理を作って見ました。 実際の実装を見て使いやすさが分かっていただけたでしょうか?

今回はCSVの変換でしたが、簡単にプラグインをつなぎ合わせることができるので、もっと複雑なことも簡単に作れると思います。

ぜひみなさんもAkkaStreamを使ってみてください。

実際のコードは気が向いたら年末にGitHubにあげようかなw

それでは!

AeroSpikeのベンチマークツールの導入

AeroSpikeには簡単な負荷試験ツールがあり、AeroSpike社自身で提供しています。

github.com

そもそもAeroSpikeって?はこちら

Aerospike基本のき

toolのダウンロード

今回はCentOSでのインストールを行います。 ほかのものであれば、mavenのインストールを適宜変えてください。

mavenのセットアップ

$ sudo wget http://repos.fedorapeople.org/repos/dchen/apache-maven/epel-apache-maven.repo -O /etc/yum.repos.d/epel-apache-maven.repo
$ sudo sed -i s/\$releasever/6/g /etc/yum.repos.d/epel-apache-maven.repo
$ sudo yum install -y apache-maven
$ mvn --version

aero clientのセットアップ

$ git clone git@github.com:aerospike/aerospike-client-java.git
$ cd aerospike-client-java
$ ./build_all
$ cd benchmarks
$ mvn package 

benchmarkを打ち込む

insert only

./run_benchmarks -h 127.0.0.1 -p 3000  -n test -s testset -k 100000 -latency "7,1" -S 1  -o S:100  -w I   -z 8

上記のパラメータは

  • namespace: test
  • set: testset
  • keys: 100000 records
  • latency: latencyの表示のされ方 7ブロック, 21ms ずつ
  • Starting key: 1
  • Object: 100bytesのString
  • workload: Insert only
  • threads: 8

read, update

./run_benchmarks -h 127.0.0.1 -p 3000  -n test -s testset -k 100000 -latency "7,1" -S 1  -o S:100  -w RU,60   -z 8

上記のパラメータは

  • namespace: test
  • set: testset
  • keys: 100000 records
  • latency: latencyの表示のされ方 7ブロック, 21ms ずつ
  • Starting key: 1
  • Object: 100bytesのString
  • workload: Read 60%, Update 40%
  • threads: 8

実際に流してAMCで確認

以下は3台に対して、上記のread, updateの負荷をかけたときのAMCです。 f:id:yuutookun:20171124200911p:plain

まとめ

負荷試験が手軽にできるのはいいですね。

ISUCON2017に初参加した

初参加は結果惨敗でした。チーム名は「Ta-da!」 Slackのロード画面で出てきたので、それを使いました。

以下が最後の結果 f:id:yuutookun:20171024145958p:plain

両日では、69位のようですね。 ISUCON7 オンライン予選 全ての順位とスコア(参考値) : ISUCON公式Blog

事前準備

会社のメンバーで参加したので、社内Slackを作成。 計測が大事なので、計測ツールだけ選定しておきました。

  • logの調査:alp
  • slow-query: pt-query-digest
  • Gitリポジトリの作成
  • ISUCONの過去の講評を読む

本番にやったこと

担当割り振りは、インフラ、アプリ、アプリという構成で行いました。 自分がPythonを書けることから、言語はPythonで選定されていました。

序盤

インフラ担当が、ssh周りや地盤を固めてくれてたので、 自分はデプロイスクリプトなどアプリ周りの足回りを固めていました。

【自分の中での失敗】

  1. アプリケーションの挙動の把握にものすごい時間がかかった。 序盤で、2~3時間くらい開発環境を整えるのに時間を食うという失態。 マニュアルを読むべきでした。。。 そもそもsystemctlで起動していたのに、gunicornを直接触ろうとしていた。

  2. ローカルで動かせるようにしていなかった。 これもちょっと修正して、デプロイして動くか確認。という果てしない行動を行っていて、ローカルで動かしていればそんなdebug一瞬だったのにという。。

やっとローカル環境を構築して、開発がちゃんと動き始めたのが、4:30くらいから。。

中盤

やっとおれたちのISUCONが4:00くらいから始まる。

/messagesと/fetchと/iconsが遅かったので、 分担を

  • 自分が/iconsの対応
  • もう一人が/messagesと/fetchの対応

と振りました。 この時点で、よくわからんsleepは消しとこ。ってなってsleepは消してしまった。。。

iconsの対応は

  1. profileで更新したiconをDBではなく、ファイルに吐く
  2. 既存のアイコンを全部引き抜く
  3. read先をファイルのアイコン、なければDB参照

の3ステップで対応しました。

それまでずっと4桁だったのが、この上記3つの対応を終えた瞬間に5万まであがり、さらにnginxの設定でexpiresを変更してから、7万くらいまであがりました。 Maxの値ですね。

あとは、N+1問題を解決するために、メモリに載せ2クエリにしました。

ここで、3台の構成を キャッシュ(nginx)→Web(Python)→MySQL という構成にしました。

終盤

その後ベンチを掛けるたびにスコアがおちていってしまいした。

ただベンチを掛けるとCPUがPythonで詰まってしまい、そもそも毎回userに問い合わせる必要なくね。 となって、20:00くらいからuserをglobalのdictに載っけようということで、作り直したんですが、時間的に間に合わなくデプロイできず。 途中まで作ってもベンチを掛けたらバリデーションでスコアがガタ落ちしたので、戻しました。

まとめ

総じて、面白かったです。 本戦に行けなかったですが、また参加できればしたいと思います。

次回への教訓としては、

「マニュアルをしっかり読む」

ですね。 ありがとうございました。

プロダクトのビジョンを決める

ビジョンとは

企業の経営を行う上で、経営におけるビジョンを設定するということは最近ではごく当たり前のことになっている。

そもそも ビジョンとは何か? 以下コトバンクによると

将来のある時点でどのような発展を遂げていたか、成長していたいかなどの構想や未来像。またそれらを文章などで描いたもの。会社全体の未来像を経営ビジョン、事業の未来像は事業ビジョン、組織は組織ビジョンなどと呼ばれる。また個人の将来像を指してキャリアビジョン、自己成長ビジョンなどということもある。

ビジョン(びじょん)とは - コトバンク

ではなぜビジョンを描くのか? 以下の考察が面白い。

会社のビジョンはなぜ大切なのか – Koichiro Honda – Medium

ただ企業のビジョンの場合、たいていふわっとしていてプロダクトの方向性まで定義できないことが多い。 急成長の理由が分かる?注目スタートアップ23社の洗練されたビジョン・ミッションまとめ | CyberTimes [シバタイムス]

企業のビジョンは決めていてもプロダクトのビジョンまでどうするかまでしっかり考えているところはどれくらいいるのだろうか?

以下の理由から企業のビジョンだけではなく、プロダクトのビジョンも作り浸透させる必要があると自分は考えている。

  • 作っているメンバーの士気を上げる
  • プロダクトの方向性を浸透させる
  • 社内外へのプロダクトのファンを増やす

また、ビジョンの決め方として、自分たちがやりたいことではなく、ユーザ視点でどう価値を提供するかということを言語化するほうがいいと思っている。

メンバーの士気

働く人たちは何をモチベーションにしてプロダクトを作っているのか? もちろんスタートから作っている人たちは、自分のプロダクト大きくしたいとか単純な思考でモチベーションが上がる。 では途中からジョインする人たちにとっての「そのプロダクトをどうして作りたいか?」という視点に立つとどうだろう?

  1. 給料
  2. そのプロダクトが先進的で面白い(成長できるか)
  3. 社会で一番価値を提供できているか(このプロダクトを作る意義)

上記3つはどれも重要だ。ただ1や2は目に見えるものなのでわかりやすく設定できるが、3はなかなか分かりづらい。

しかし3をしっかり規定しなければ、今の時代給料であれば出せばいいし、技術であれば大体導入できてしまう(もちろんそこでの差別化は可能)ので、モチベーションの差別化が難しい。

プロダクトの方向性

またイノベーションという観点でも、ビジョンを策定する必要がある。

プロダクトを作り始める上で、まずターゲットユーザ層を決めるのは当たり前であるし、ペルソナを決めたり、どういうプロダクトにするかを決めるはずだ。

しかし、そのプロダクトが長生きするほど、市場環境やテクノロジーが変わり、当初考えていたプロダクトの方向性から乖離していくということは多くあると思う。

そうなってくると、そのプロダクトの方向性を変えなければいけなくなる。 しかし漫然と作っていると、プロダクトの売上だけを見てしまい、そもそものイノベーションを起こせなくなってしまう。そして、気づけば時代遅れなものになっているということもある。

プロダクトのビジョンを考える上ですごく重要なことは、「使ってもらうユーザに対してなぜこのプロダクトを使ってもらうのか?」ということを決めることだ。

既存の延長線上の思考で単純な機能追加を繰り返していくだけでは、イノベーションは生まれないのである。 そう日本のリモコンのように。

「ガラパゴスリモコン」が示す日本のもの作りの限界~リモコンで飛行機でも飛ばしたいの? – アゴラ

常にそのプロダクトがどうあるべきか?ということを考え続けることこそ、プロダクトが進化していくことに繋がる。

ファンを作る

人間とは、ストーリーに共感するイキモノである。 ビジョンとは企業やプロダクトをどのようにしていくか、言い換えるとどのようなストーリーで作っていくかを定義することである。

日本の場合、外部に向けて共有されていないイメージだが、海外ではビジョンだけでなく、その沿革なども共有し、ファンを増やしているものが多い。

例)Trello

About | What is Trello?

締め

プロダクトのビジョンがまだ決まっていないのであれば、今一度ビジョンをチームで語り合ってみてはどうだろうか。