Screaming Loud

研究・プログラミングなど気づいたことをメモをしています。読書記録はこちらに記載しています。https://bookmeter.com/users/75944

2017年に読んだ本のまとめ

今年もやっていきます。

昨年のリンクはこちら→ yuutookun.hatenablog.com

今年の読んだ冊数は42冊。 去年と同じくらいですね。

年の後半になって面白いと思う本が減ってしまって本自体の読む量が減ってしまったというのが今年の印象です。

前半のほうは2016年に引き続き、組織をどうするかという話が多かった気がします。 その中でも印象的だったのは、 bookmeter.com

bookmeter.com

です。

働くモチベーションってどこから来るんだろう?みたいなことを考えさせられたものでした。 面白くて社内の勉強会で発表するほどでしたね。 以下そのときのスライドです。

How to Build a Team

あとこれは自分の中で久しぶりにヒットした本でした。 bookmeter.com これもかなり面白くて社内の勉強会で発表しました。

スライドはこちら エンジニアのためのマーケティング

このスライドはマーケティングのメディアに何故か紹介されてPVがすごいあがってましたw

あとこれから自分はどうしていくべきなのだろうか?みたいな大きい視野で考える材料になったのは、以下2冊ですね。

bookmeter.com

bookmeter.com

今年は色々モヤモヤ悩んでいたので、上記2冊はいい思考のたたき台になったと思っています。

さて、2018年はどんな本を読んでいこうかな?

AkkaStreamで簡単にCSVファイル変換を行うことはできるか?

Scala Advent Calendar 2017の24日目、クリスマス前夜です。
前日はiTakeshi@githubさんのScalaメタプログラミング今昔物語 - 本編でした。

私はドラクエ11をプレイしており、今クリア後の世界を楽しむというクリスマス感が全くない日を過ごしています。

さて今回は、簡単に大きなデータのCSVファイルを簡単に変換する処理を実装してみたいと思います。

実装としては、
「S3からファイルを受け取ってそのファイルを変換しつつローカルに保存する」
というのをakka streamで実装してみたいと思います。

Akka streamとはなんぞや?という方は以下を読んでみてください

Akka Stream についての基礎概念 - Qiita

やりたいこと

ローカルのファイルを変換して保存し直す

  • 入力:CSVファイル
  • 出力:名寄せなど色々変換した後のCSVファイル

動かすマシンは低めでメモリで動かすため、なるべくメモリに載せないでストリーム処理で変換したい。

実装

ローカルのファイルのInputStreamを入力に受け付けて、変換処理を行いローカルに保存するというメソッドです。

  • 入力:java.io.InputStream, 変換に必要な情報
  • 出力:実行可能なマテリアル

下記のようにそれぞれの処理が分けられるというのがAkkaStreamの強みです。

  • stream:入力
  • processCSV:変換処理
  • sink:保存処理

実際にコードを見てみましょう

import java.nio.file.{ Paths, StandardOpenOption }
import akka.actor.ActorSystem
import akka.stream.{ ActorMaterializer, IOResult }
import akka.stream.scaladsl.{ FileIO, Flow, Keep, RunnableGraph, Sink, Source }
import akka.util.ByteString

implicit val system: ActorSystem = ActorSystem()
implicit val ec: ExecutionContext = system.dispatcher
implicit val mat = AkkaUtils.materializer(system)

case class CsvTransformFormat(
  rawFilePath: String, // S3のファイルのパス
  tempEditedFileName: String, // 変換後のファイルパス
  tempDownloadPath: String, // 変換前のファイルパス
  transformLine: String => Future[String], // 変換するメソッド
  headerLine: Option[String] = None, // CSVのヘッダー
  charset: String = "UTF-8"
)

def stream2string(csvStream: InputStream, format: CsvTransformFormat): RunnableGraph[Future[IOResult]] = {
    val stream = scala.io.Source.fromInputStream(csvStream, "UTF-8")

    val processCsv = Source
        .fromIterator(() => stream.getLines()) // InputStreamの入力
        .mapAsync(threadNum)(errorLogHandler(format.transformLine)) // threadNum数の並行処理で変換処理を実行
        .filter(_.nonEmpty) // 空のlineは除外
        .map(s => ByteString(s + "\n", format.charset)) // streamを登録できるようにByteStringに変換

    import StandardOpenOption._
    val sink = FileIO.toPath(Paths.get(filename), options = Set(APPEND)) // ファイルに追記

    processCsv.toMat(sink)(Keep.right) 
}

def transformLine(line: String): Future[String] = {
    val seq = CsvUtils.parse(line)
    val resultSeq = seq.map(s => s"($s)")
    Future(CsvUtils.write(resultSeq))
}

val format = CsvTransformFormat("log", "/tmp/edited/hoge.csv", "/tmp/downloaded/hoge.csv", transformLine, Some(header), "SJIS")

stream2string(new FileInputStream(format.tempEditedFileName), format) match {
    case Right(r) =>  r.run()
}

mapAsync

processCSVのmapAsyncというのは、そのメソッドの第二引数部分の処理を第一引数のthread数で実行できます。 AkkaStreamではこのように複数スレッドで実行できるので、重い処理でも素早く実行できます。

またmapAsyncは並行に実施しますが、順序は保証しているので、CSVの変換処理などにはもってこいです。

このmapAsyncは何個でも並べられるので、変換処理をプラグインのような形で、簡単に追加出来ます。

この自由に追加できて、処理を簡単に並列化できるのがAkkaStreamの最大の強みと言っていいでしょう。

mapAsyncのパフォーマンス周りの説明としては、以下のブログがとても丁寧に解説していますので、ぜひ参照してみてください。

Maximizing Throughput for Akka Streams

sink

akkaStreamではファイルへの吐き出しはFileIOというメソッドが用意されています。 akkaStreamで流れてきたストリームをファイルへ吐き出すというのをsinkという変数で定義しています。

toMat

最後に toMat(sink)(Keep.right) というのは、to materializeの略で実際に実行するのではなく、コンテナにしてrun()を実行するためのパッケージとする処理です。

以下にMatの詳しい解説があります。

[Akka-Stream]Matって何? - Qiita

まとめ

今回は実際にAkkaStreamを使ってCSVの変換処理を作って見ました。 実際の実装を見て使いやすさが分かっていただけたでしょうか?

今回はCSVの変換でしたが、簡単にプラグインをつなぎ合わせることができるので、もっと複雑なことも簡単に作れると思います。

ぜひみなさんもAkkaStreamを使ってみてください。

実際のコードは気が向いたら年末にGitHubにあげようかなw

それでは!

AeroSpikeのベンチマークツールの導入

AeroSpikeには簡単な負荷試験ツールがあり、AeroSpike社自身で提供しています。

github.com

そもそもAeroSpikeって?はこちら

Aerospike基本のき

toolのダウンロード

今回はCentOSでのインストールを行います。 ほかのものであれば、mavenのインストールを適宜変えてください。

mavenのセットアップ

$ sudo wget http://repos.fedorapeople.org/repos/dchen/apache-maven/epel-apache-maven.repo -O /etc/yum.repos.d/epel-apache-maven.repo
$ sudo sed -i s/\$releasever/6/g /etc/yum.repos.d/epel-apache-maven.repo
$ sudo yum install -y apache-maven
$ mvn --version

aero clientのセットアップ

$ git clone git@github.com:aerospike/aerospike-client-java.git
$ cd aerospike-client-java
$ ./build_all
$ cd benchmarks
$ mvn package 

benchmarkを打ち込む

insert only

./run_benchmarks -h 127.0.0.1 -p 3000  -n test -s testset -k 100000 -latency "7,1" -S 1  -o S:100  -w I   -z 8

上記のパラメータは

  • namespace: test
  • set: testset
  • keys: 100000 records
  • latency: latencyの表示のされ方 7ブロック, 21ms ずつ
  • Starting key: 1
  • Object: 100bytesのString
  • workload: Insert only
  • threads: 8

read, update

./run_benchmarks -h 127.0.0.1 -p 3000  -n test -s testset -k 100000 -latency "7,1" -S 1  -o S:100  -w RU,60   -z 8

上記のパラメータは

  • namespace: test
  • set: testset
  • keys: 100000 records
  • latency: latencyの表示のされ方 7ブロック, 21ms ずつ
  • Starting key: 1
  • Object: 100bytesのString
  • workload: Read 60%, Update 40%
  • threads: 8

実際に流してAMCで確認

以下は3台に対して、上記のread, updateの負荷をかけたときのAMCです。 f:id:yuutookun:20171124200911p:plain

まとめ

負荷試験が手軽にできるのはいいですね。

ISUCON2017に初参加した

初参加は結果惨敗でした。チーム名は「Ta-da!」 Slackのロード画面で出てきたので、それを使いました。

以下が最後の結果 f:id:yuutookun:20171024145958p:plain

両日では、69位のようですね。 ISUCON7 オンライン予選 全ての順位とスコア(参考値) : ISUCON公式Blog

事前準備

会社のメンバーで参加したので、社内Slackを作成。 計測が大事なので、計測ツールだけ選定しておきました。

  • logの調査:alp
  • slow-query: pt-query-digest
  • Gitリポジトリの作成
  • ISUCONの過去の講評を読む

本番にやったこと

担当割り振りは、インフラ、アプリ、アプリという構成で行いました。 自分がPythonを書けることから、言語はPythonで選定されていました。

序盤

インフラ担当が、ssh周りや地盤を固めてくれてたので、 自分はデプロイスクリプトなどアプリ周りの足回りを固めていました。

【自分の中での失敗】

  1. アプリケーションの挙動の把握にものすごい時間がかかった。 序盤で、2~3時間くらい開発環境を整えるのに時間を食うという失態。 マニュアルを読むべきでした。。。 そもそもsystemctlで起動していたのに、gunicornを直接触ろうとしていた。

  2. ローカルで動かせるようにしていなかった。 これもちょっと修正して、デプロイして動くか確認。という果てしない行動を行っていて、ローカルで動かしていればそんなdebug一瞬だったのにという。。

やっとローカル環境を構築して、開発がちゃんと動き始めたのが、4:30くらいから。。

中盤

やっとおれたちのISUCONが4:00くらいから始まる。

/messagesと/fetchと/iconsが遅かったので、 分担を

  • 自分が/iconsの対応
  • もう一人が/messagesと/fetchの対応

と振りました。 この時点で、よくわからんsleepは消しとこ。ってなってsleepは消してしまった。。。

iconsの対応は

  1. profileで更新したiconをDBではなく、ファイルに吐く
  2. 既存のアイコンを全部引き抜く
  3. read先をファイルのアイコン、なければDB参照

の3ステップで対応しました。

それまでずっと4桁だったのが、この上記3つの対応を終えた瞬間に5万まであがり、さらにnginxの設定でexpiresを変更してから、7万くらいまであがりました。 Maxの値ですね。

あとは、N+1問題を解決するために、メモリに載せ2クエリにしました。

ここで、3台の構成を キャッシュ(nginx)→Web(Python)→MySQL という構成にしました。

終盤

その後ベンチを掛けるたびにスコアがおちていってしまいした。

ただベンチを掛けるとCPUがPythonで詰まってしまい、そもそも毎回userに問い合わせる必要なくね。 となって、20:00くらいからuserをglobalのdictに載っけようということで、作り直したんですが、時間的に間に合わなくデプロイできず。 途中まで作ってもベンチを掛けたらバリデーションでスコアがガタ落ちしたので、戻しました。

まとめ

総じて、面白かったです。 本戦に行けなかったですが、また参加できればしたいと思います。

次回への教訓としては、

「マニュアルをしっかり読む」

ですね。 ありがとうございました。