かずきのBlog@hatena

すきな言語は C# + XAML の組み合わせ。Azure Functions も好き。最近は Go 言語勉強中。日本マイクロソフトで働いていますが、ここに書いていることは個人的なメモなので会社の公式見解ではありません。

Stream Analytics入門してみた #azurejp

最近頑張ってるStream Analyticsですがここらへんでまとめてみようと思います。

Stream Analytics

ドキュメントにも書いてありますが、Stream Analyticsは大量のデータをリアルタイムに処理するための基盤になります。大量のデータのインプットには、これまた大量のデータを受けるためのイベントハブや、ストレージサービスのBlobなんかを指定できます。

Stream Analyticsは、SQLライクな言語でリアルタイムに流れてくるデータを処理できます。個人的な感想ではSQL + 時間処理関数といった感じです。時間という概念を入れることで最近流行のリアクティブプログラミングと同じような感覚でリアルタイムデータを処理することが出来ます。

処理した結果「SQL Database」「ストレージサービスのBlob」「ストレージサービスのTable」「イベントハブ」「Power BI」「Service Busのキュー」「Service Busのトピック」に出力することが出来ます。やろうと思えばイベントハブのデータをStream Analyticsで処理して、イベントハブに出力して、さらに次のStream Analyticsで処理するといったように繋げていくことも可能です。

Stream Analyticsのクエリ

さて、この記事で本題として扱いたいクエリについて書いていきたいと思います。SQLライクなクエリと書きましたが、Stream Analyticsのクエリは基本的に以下のような形になります。

SELECT
    *
INTO
    Output
FROM
    Input

SELECTで入力の内容を射影して、INTOで出力先を指定します(この場合Outputが出力先)。FROMでデータの入力元を指定します。入力や出力はイベントハブのポータルで定義することが出来ます。上記のクエリはInputという名前で定義した入力の内容をOutputという名前で定義した出力に対して全ての列(*で指定してる)を、そのまま流すというクエリになります。

これだけでも、入力にイベントハブ、出力にテーブルストレージを指定することでイベントハブに叩き込まれたデータをテーブルストレージに叩き込むという実用的なクエリになります。クラウドサービスのワーカーロールを立ててイベントハブのデータを延々とテーブルストレージに吐き出す処理を書くよりも遥かにお手軽になります。これだけでも、個人的にStream Analytics使うのはありかなと思っています。

さて、このままデータを右から左に流すだけではつまらないのでもうちょっとクエリに踏み込んでみたいと思います。SQLでお馴染みのWHERE句を使うことでデータを絞り込むことが出来ます。例えば、Valueという列がInputにあるとして、Valueが10以上のものだけに絞り込む場合は以下のようになります。

SELECT
    *
INTO
    Output
FROM
    Input
WHERE
    Value >= 10

SQLと同じで簡単ですね。それでは、次に時間という概念を持ち込んでみようと思います。Stream Analyticsのデータは延々と流れてくるデータを処理するため、グルーピングなどの処理をしようとしたらある一定時間で見切りをつけないと、いつまでたってもグルーピングが終わらないことになります。そのため、データのグルーピングをして処理をするときには、SQLと同じGROUP BY句を使うのですが、そこで時間での絞込が必須になります。

時間での絞込はWindow関数で行います。Window関数には以下の3種類があります。

  • TumbingWindow
  • HoppingWindow
  • SlidingWindow

順番に見ていきます。

TumbingWindow

これは、指定した時間で単純にグルーピングする関数になります。TumbingWindowは、TumbingWindow(second, 10)のように書くと10秒ごとにデータをまとめます。

(-が1つで1秒だとすると)
Data Stream              : ---o---o---o---o---o---o---o---o---o---o---...
↓
TumbingWindow(second, 10): ---o---o--
                                     -o---o---o
                                               ---o---o--
                                                         -o---o---o
                                                                   ---...

シンプルですね。

HoppingWindow

これは、指定した時間で、指定したインターバルでグルーピングします。HoppingWindow(second, 10, 5)とすると10秒のWindowを5秒間隔で作成します。

(-が1つで1秒だとすると)
Data Stream                 : ---o---o---o---o---o---o---o---...
↓
HoppingWindow(second, 10, 5): ---o---o--
                                   --o---o---
                                        -o---o---o
                                             o---o---o-
                                                  ---o---o--
                                                       --o---...

となります。Window同士が重なるのが特徴ですね。また、3番目の引数を2番目の引数より大きくすることでWindow同士の間をあけることができます。こんな感じに、HoppingWindow(second, 10, 20)。

(-が1つで1秒だとすると)
Data Stream                  : ---o---o---o---o---o---o---o---...
↓
HoppingWindow(second, 10, 20): ---o---o--
                                                   ---o---o--
                                                              ...

TumblingWindowと比べて柔軟にWindowを指定できます。

SlidingWindow

最後はSlidingWindowです。このWindow関数は、名前の通り指定した間隔のWindowがず~っと横滑りしていくイメージです。Windowは、値が出たり入ったりしたタイミングで作成されます。例えばSlidingWindow(second, 10)とすると以下のようになります。

(-が1つで1秒だとすると)
Data Stream                 : ---o---o---o---o...
↓
SlidingWindow(second, 10, 5): ---o
                              ---o---o--
                                -o---o---o
                                  ---o---o--
                                   --o---o--o

データが指定した時間内で出入りするたびに最新のデータの状況を知りたいときとかに使える関数になります。(ここちょっと自信ないので間違ってたら誰か教えて!)

Timeunit

Window系関数の第一引数に指定してたsecondですが、ここには時間の単位を指定します。秒を表すsecond以外にも以下のような値が指定できます。

かっこの中は省略形

  • day(dd, d)
  • hour(hh)
  • minute(mi, n)
  • second(ss, s)
  • millisecond(ms)
  • microsecond(mcs)

それぞれ、見た通りの時間になります。日、時、分、秒、ミリ秒、マイクロ秒です。

グルーピングの続き

Window関数の動きがわかったので使ってみます。ここでは一番簡単なTumbingWindowを例にしてクエリを書いてみます。

SELECT
    COUNT(*) AS CNT
INTO
    Output
FROM
    Input
GROUP BY TumbingWindow(second, 10)

Inputで流れた来た値を10秒ごとにグルーピングして数を数えています。例えば、10秒に100個ずつのデータが流れて来たら、このクエリの結果はCNTという列に100という値が入った結果を10秒ごとにOutputに書き出す動きになります。

ここで使ってるCOUNT関数は、SQLでお馴染みの数を数える関数です。集計系の関数は、以下のドキュメントにあるようなものが使えます。SQLでお馴染みの関数は大体使える感じです。

Aggregate Functions

列でのグルーピング

時間でグルーピングしたうえで、SQLと同じように列でのグルーピングもできます。InputにIDとValueという列があるとしてIDでグルーピングしてValueの最大値を求めるという場合は以下のようになります。

SELECT
    ID,
    MAX(Value)
INTO
    Output
FROM
    Input
GROUP BY TumbingWindow(second, 10), ID

SQLを知っていればなんてことはないクエリだと思います。

INTO

INTOって省略するとOutputという名前に対してデフォルトで出力します。なので、これまでのクエリは全部INTO省略できます。ということでこれからあとは省略して書きます。

タイムスタンプ

さて、これまでTumbingWindowなどで時間で区切ったりしてきましたが、このときなんの時間が使われるかというと、入力のソースによって(といってもイベントハブかBlobしかないですが)変わります。イベントハブでは、イベントハブで処理された時間でBlobの場合はBlobの更新時間になります。ただ、一般的にデータの発生時間とは異なってしまうため、データにISO 8601形式(yyyy-MM-ddTHH:mm:ss.fffffffzzzの形式)の日付の列を作って、そいつをタイムスタンプに指定するのがベストプラクティスとされています。例えばInputにTimeという列があって、そこにイベントが発生した時間が入っている場合、以下のようにしてイベントが本当に発生した時間でデータを処理することが出来るようになります。

SELECT
    ID,
    MAX(Value)
INTO
    Output
FROM
    Input TIMESTAMP BY Time
GROUP BY TumbingWindow(second, 10), ID

TIMESTAMP BYがポイントです。

ここで指定したタイムスタンプは、SELECT句の中でSystem.Timestampと書くことでアクセスできます。

SELECT
    ID,
    MAX(Value),
    System.Timestamp AS Time
INTO
    Output
FROM
    Input TIMESTAMP BY Time
GROUP BY TumbingWindow(second, 10), ID

System.Timestampは、現在のシステム時間でないという点だけは注意しておいてください。

結合

次は、結合について書きたいと思います。SQLと同じで内部結合と外部結合があります。グルーピングと同じで、結合も無限の時間軸上の長さを持つデータ同士を結合しようとするといつまでたっても終わらないので、時間で区切る必要があります。そのために使う関数としてDateDiffという関数があります。第一引数にtimeunitを渡して第二引数、第三引数に結合する入力データのエイリアスを指定します。そうすると第三引数から第二引数を引いた結果の時間が返ります。これに対してBETWEEN A AND Bを使って時間の範囲を指定します。文章でごちゃごちゃ書くよりクエリ書いたほうがわかりやすいと思いますので書いていきます。例えばInput1とInput2をIDという列同士で結合する場合を考えてみます。このとき、1~30秒の間の差のデータを対象に結合したいと思います。

SELECT
    i1.ID AS ID,
    i1.Value AS i1Value,
    i2.Value AS i2Value,
    System.Timestamp AS Time
FROM
    Input1 i1
    JOIN Input2 i2 ON DateDiff(second, i1, i2) BETWEEN 1 AND 30 AND i1.ID = i2.ID

DateDiffの意味さえわかってしまえば簡単だと思います。

複数のクエリの定義

WITH句を使うことでクエリ1つの長いクエリを定義するのではなく、複数の小さなクエリに分割して記述することができます。例えば、最初のステップで10以下のValueを切り捨てた上でWindow化して集計するといったことが出来ます。

WITH Step1 AS (
    SELECT * FROM Input WHERE Value > 10
)
SELECT 
    COUNT(*) AS CNT
FROM
    Step1
GROUP BY TumbingWindow(second, 10)

WITHには複数のクエリを書くことも出来ます。

WITH Step1 AS (
    SELECT ....
),
Step2 AS (
    SELECT ...
)
...

SELECT * FROM Step1 s1 JOIN Step2 s2 ON ...

複数の出力を使う

クエリを複数書くことで単純に複数の出力先にデータを出力できます。

SELECT * INTO Output1 FROM Input WHERE Value > 10
SELECT * INTO Output2 FROM Input WHERE Value <= 10

SQLには無い集計関数

SQLには無い集計関数を1つ最後に紹介したいと思います。

CollectTopという関数は、Window関数でグルーピングした中のデータをソートする関数になります。例えば、10秒間隔でグルーピングしたデータに対してValueで並び替えるといった場合以下のようになります。

SELECT
    ID,
    CollectTop(2) OVER (ORDER BY Value) AS Top2
FROM
    Input TIMESTAMP BY Time
GROUP BY TumbingWindow(second, 10), ID

ただ、このままだとTop2列は配列になってしまうのでフラットなデータ出力先であるテーブルストレージやSQL Databaseに出力できません。そんなときは、WITHを使ってCollectTopをするクエリと、その結果を平らにならすクエリを書けばいいです。LINQなどでいうSelectManyに該当するCROSS APPLYという関数があります。こいつを使うと以下のように書けます。

WITH Step1 AS (
    SELECT
        ID,
        CollectTop(2) OVER (ORDER BY Value) AS Top2
    FROM
        Input TIMESTAMP BY Time
    GROUP BY TumbingWindow(second, 10), ID
)
SELECT
    flat.ArrayValue.Value.ID,
    flat.ArrayValue.Value.Value,
    flat.ArrayValue.Value.Time
FROM
    Step1 CROSS APPLY GetElements(Step1.Top2) AS flat

CROSS APPLY GetElements(配列の列)という流れになります。平らにならしたあとは、ArrayValue.Valueで元の値(ここでいうInputの値)にアクセスできます。

クエリパターン

これくらいのことがわかれば、以下のページのクエリパターンを通して代表的なクエリの書き方やここで紹介していない関数についてもなんとなくわかってくると思います。(最後のクエリを除く)

azure.microsoft.com

また、クエリのリファレンスは以下のURLにあるので、そこで関数(たいした数はないので全部見るといいと思います)を調べることが出来ます。

Stream Analytics クエリ言語リファレンス

あと、公式のドキュメントには目を通しておくと捗ります。

azure.microsoft.com