【Flutter】非同期ジェネレータ関数(async* yield)でリアルタイムデータ処理

対象者

  • Flutterでの非同期処理に課題を感じているエンジニア
  • リアルタイムデータや大量データを効率的に扱いたい開発者
  • 非同期ジェネレータ関数の実践的な使い方を学びたい方

はじめに

リアルタイムデータや大量のデータを効率的に扱う必要があり、Flutterでの非同期処理に悩んでいませんか?APIからのデータ取得や複雑なバックグラウンド処理を行う際、パフォーマンスを落とさずスムーズに動作させたいと思うエンジニアにとって、非同期ジェネレータ関数は強力な武器になります。しかし、その概念や使い方に不安を感じ、正しく実装できないという悩みを抱えている方も多いのではないでしょうか。

この記事では、非同期ジェネレータ関数(asynchronous generator function)の基本的な仕組みから実際の使い方までを分かりやすく解説します。具体的なコード例を交えながら、どのようにFlutterで効率的な非同期処理を行い、リアルタイムデータを取り扱うかを学んでいきます。

最近、LLMサーバ(ChatGPTのAPIみたいな)からのデータをリアルタイムで受け取って表示するケースが増え、 非同期ジェネレータ関数を使う機会が増えてきました。今まであまり使ってませんでしたが、リアルタイムでのデータ表示に非常に有効であることに気付きました。また、正しく動作するかを確認するためにテストも行い、活用方法を深く掘り下げることができました。この記事では、非同期ジェネレータ関数の使用例だけでなく、ストリームと通常の非同期処理を組み合わせた場合の違いについても比較できました。

非同期ジェネレータ関数とは

非同期ジェネレータ関数は、非同期処理とジェネレータの両方の特性を併せ持つ関数で、非同期のデータストリームを生成し、逐次的にデータを提供することができる機能です。これにより、大量のデータを一度に処理するのではなく、必要なタイミングで順次データを取得・処理できるため、パフォーマンスとメモリ効率を向上させます。

例えば、非同期処理を使ってAPIからデータを取得し、そのデータを順次処理する場合、通常は全てのデータが取得されるまで待機する必要があります。しかし、非同期ジェネレータ関数を使用すると、取得したデータを逐次的に扱いながら、残りのデータを非同期で待機することが可能です。この点で、非同期ジェネレータは、非同期処理の柔軟性と効率的なデータの取り扱いを可能にする重要な技術です。

非同期処理とジェネレータ関数の基礎

非同期処理は、時間のかかる操作をブロックせずに並行処理するための手法で、特にUIのレンダリングが止まるのを防ぐため、アプリケーション開発において重要です。非同期処理では、Futureasync/awaitを用いて、データの処理を待機しつつ他の作業を並行して行います。

一方、ジェネレータ関数は、関数の実行を途中で停止し、再開することで逐次的にデータを生成する関数です。Dartではyieldキーワードを使用して、データを一度に全て返すのではなく、必要なタイミングで部分的にデータを返すことが可能です。

非同期ジェネレータ関数は、これらの2つの基礎的な概念を組み合わせたものであり、非同期処理を行いながらデータを逐次生成します。たとえば、データベースやリモートAPIから大量のデータを取得する際、全てのデータが揃うまで待たずに、部分的に処理を進めることができるため、アプリの応答性を保ちながら効率的なデータ処理が可能となります。

非同期ジェネレータ関数の役割

非同期ジェネレータ関数の最大の役割は、非同期的にデータを処理しつつ、逐次的にそのデータを生成・提供することにあります。特に、大量のデータを扱うアプリケーションでは、メモリの節約やパフォーマンスの向上が必要であり、一度に全てのデータをメモリに読み込むことは非効率です。こういった場合、非同期ジェネレータ関数を使うと、部分的にデータを取得しながら処理を進めることができ、全体のパフォーマンスが向上します。

具体的な例として、Web APIからデータを逐次的に取得するシナリオを考えます。全てのデータを取得してから処理を始めるのではなく、非同期ジェネレータを使えば、取得したデータを順次処理しつつ、バックグラウンドで他のデータの取得を待機することが可能です。

非同期ジェネレータ関数の基本的な使い方

非同期ジェネレータ関数を活用するためには、まずStreamyieldawait forの概念を理解することが重要です。これらの基本的な構成要素を組み合わせることで、非同期ジェネレータ関数を効果的に使いこなせます。

Streamの概念と使い方

結論として、Streamは非同期で連続的にデータを提供する手段です。これは、複数の値を逐次的に生成し、その値をリスナーが受け取るための仕組みです。Streamは一度に全てのデータを返すのではなく、必要に応じて非同期でデータを返す点が特徴です。

Streamの利点としては、UIの応答性を保ちながら、非同期でリアルタイムデータや大量データを処理できることです。例えば、ネットワークからデータを逐次取得してUIに反映させたり、ユーザーのアクションに応じてイベントを処理する際に便利です。

実例として、以下のコードでは1秒ごとに整数を非同期で生成し、Streamを通じてリスナーに送信します。

Stream<int> numberStream() async* {
  for (int i = 1; i <= 5; i++) {
    await Future.delayed(Duration(seconds: 1));
    yield i; // 値を順次返す
  }
}

このように、Streamを使うことで、UIスレッドをブロックすることなく非同期でデータを取得できます。

yieldの使い方

yield非同期ジェネレータ関数内でデータを逐次返すためのキーワードです。yieldを使うことで、関数の実行途中で一時的に制御を呼び出し元に返しつつ、値を送り返すことができます。これは、データを一度に処理するのではなく、部分的にデータを処理する際に特に有効です。

await forループでのデータ取得

await forループを使うことで、非同期ジェネレータ関数から生成されるデータを一つずつ順に処理することが可能です。await forは、非同期ジェネレータからのデータを逐次取得するためのループ処理で、同期的なforループの非同期バージョンと言えます。

このループを使うことで、Streamから順次値を取り出し、その値に対して処理を行うことができます。たとえば、以下のように記述します。

Future<void> processStream() async {
  await for (int value in numberStream()) {
    print('Received: $value');
  }
}

このコードでは、numberStream()から返された値をawait forを使って順に受け取り、各値をコンソールに出力しています。await forは、非同期処理を行う際にとても便利で、ストリームのすべての値が返されるまで処理を続けるため、リアルタイムデータや長時間にわたるデータ処理にも適しています。

実践例:非同期ジェネレータ関数の実装

非同期ジェネレータ関数の実装では、特にAPIデータの逐次取得が非常に役立ちます。非同期ジェネレータ関数を用いることで、大量のデータを一度に取得するのではなく、必要に応じてデータを非同期に逐次処理でき、アプリケーションのパフォーマンスやユーザー体験を向上させることが可能です。
例えば、API呼び出しで大量のリクエストを一度に処理するのではなく、データを少しずつ受け取りながら表示することで、ユーザーはアプリの応答性を保ったまま処理結果を確認できます。

WebAPIのモックを非同期処理と非同期ジェネレータ関数で作成

0.2秒ごとに文字列が出力されるチャットのようなシステムを想定したWeb APIのモックを作成しました。データはJSON形式で、resultというキーに文字列が入り、サーバー側で生成されてFlutterのアプリに送られるという形式です。このシステムを非同期処理と非同期ジェネレータ関数を使って実装し、違いを比較してみます。

非同期処理

まず、非同期処理でWeb APIのモックを作成しました。以下のコードでは、StreamControllerを使用し、0.2秒ごとに1文字ずつ出力する仕組みを構築しています。

Stream<String> fetchMockStream() {
  final controller = StreamController<String>();

  for (int index = 0; index < (messageData.length + 1); index++) {
    Future.delayed(Duration(milliseconds: 200 * index)).then((_) {
      if (index < messageData.length) {
        final map = <String, dynamic>{'result': messageData[index]};
        controller.sink.add(jsonEncode(map));
      } else {
        controller.close();
      }
    });
  }

  return controller.stream;
}

このコードでは、まずストリームを返す必要があります。そのためにStreamControllerを作成し、0.2秒ごとに文字列を追加しています。具体的には、Future.delayedを使用して、時間が経過した後に新しい文字をストリームに追加し、最後にストリームを閉じます。データはJSON形式で送信され、サーバー側で新しいメッセージが生成されるイメージです。

非同期ジェネレータ関数

一方、非同期ジェネレータ関数を使用した場合、コードはよりシンプルで読みやすくなります。以下はその実装です。

Stream<String> fetchMockStreamWithAsynchronousGenerator() async* {
  for (int index = 0; index < messageData.length; index++) {
    await Future.delayed(const Duration(milliseconds: 200));
    final map = <String, dynamic>{'result': messageData[index]};
    yield jsonEncode(map);
  }
}

非同期ジェネレータ関数では、async*yieldを使用して、処理をシンプルに書くことができます。ここでは、0.2秒待機してから目的の文字を出力し、次の文字の処理に進みます。この処理の流れは、非同期処理よりも直感的に理解でき、コード全体がスッキリした形でまとまっています。

非同期ジェネレータ関数を使うことで、非同期処理のコードよりも簡潔で直感的な実装が可能となりました。特に、0.2秒ごとに文字列を出力するというシンプルなタスクにおいては、非同期ジェネレータ関数が圧倒的に使いやすいと感じました。

Flutterアプリでの処理

Flutterアプリでサーバーからのデータを受け取り、アプリ内で処理する流れを紹介します。特に、Web APIから返されるJSONデータを効率的に扱い、表示したい情報を抽出する方法に焦点を当てています。実際のサーバーでは、データだけでなく、キーや統計情報などのメタデータが含まれていることが多いため、必要なデータを正確に取り出す処理が必要になります。

サーバからのデータの受け取り

まず、サーバーから送られてくるストリームデータをFlutterアプリ内で受け取る処理を行います。サーバーからはJSON形式でデータが返ってくることが一般的で、その中から必要なデータを取り出し、簡単に操作できるように整形します。

Stream<String> receiveData(Stream<String> stream) async* {
  await for (final String source in stream) {
    final map = jsonDecode(source);
    yield map['result'] ?? '';
  }
}

上記のコードでは、receiveData()メソッドを使用してストリームで受け取ったJSONデータを処理しています。jsonDecode()を使ってJSONをマップ形式に変換し、必要なデータである'result'キーの値だけを取り出しています。サーバーからのレスポンスには不要なメタデータが含まれることもあるため、ここで必要な部分だけを取り出し、整形して処理しやすい形式にしています。

受け取ったデータの処理

サーバーから受け取ったデータを受けた後、そのデータを実際に画面上に反映させる処理を行います。具体的には、ストリームをリッスンし、受け取ったデータをアプリのUIに表示する変数に追加しています。

void onTap(Stream<String> stream) {
  setState(() {
    _messages = '';
    _disableTap = true;
  });
  receiveData(stream).listen((data) {
    setState(() {
      _messages = _messages + data;
    });
  }, onDone: () {
    setState(() {
      _disableTap = false;
    });
  });
}

ここでの処理は、以下の流れになります。

  1. ボタンを押した際の処理の初期化

    • onTap()メソッドを呼び出すと、まずメッセージ表示用の変数_messagesを初期化し、ボタンが再度押されないように_disableTaptrueに設定します。
  2. データの受け取りとリッスン

    • receiveData()関数で受け取ったデータをlisten()を使ってリッスンし、ストリームからのデータを順次_messages変数に追加します。この際、既存のデータを消さずに新しいデータを追加するため、_messages = _messages + dataという形で文字列が追記されていきます。
  3. 処理完了後のボタンの再有効化

    • 全てのデータの処理が完了したら、onDone()コールバックが呼ばれ、ボタンが再び押せるように_disableTapfalseに設定します。これにより、ボタンを押すたびに新たなデータ処理が行えるようになります。

非同期ジェネレータ関数のテスト

非同期ジェネレータ関数のテストについて、実際にどのように行うのかを確認してみました。通常、ストリーム自体のテストについては別記事で解説していますので、そちらをご参照ください。今回のテストでは、特に非同期ジェネレータ関数でストリームからデータを受け取る処理に焦点を当てています。

【Flutter】Streamのテストの基礎

サーバーから送られてくるデータを模したテスト

今回のテストでは、WebAPIなどのサーバーから送られてくるデータを模して、StreamControllerを使ってストリームデータを生成しています。例えば、実際にAPIが返すJSON形式のデータをStreamControllerで手動で追加することができ、そのデータを元にジェネレータ関数の挙動を確認できます。

test('StreamController', () {
  final stream = StreamController<String>();
  stream.add(jsonEncode({'result': 'Ready.'}));
  stream.add(jsonEncode({'result': 'Set'}));
  stream.add(jsonEncode({'result': 'Go!'}));
  stream.close();

  expectLater(
    receiveData(stream.stream),
    emitsInOrder(['Ready.', 'Set', 'Go!', emitsDone]),
  );
});

上記のコードでは、StreamControllerを使ってテスト用のストリームを作成し、サーバーから返ってくるであろうデータを模倣しています。サーバーから受信するデータを手動で追加し、それをreceiveData関数を使って処理します。

実際のサーバーデータを用いたテスト

私の場合、LMLサーバーからのデータを非同期ジェネレータ関数で処理しています。そのため、実際のサーバーから送られてくるデータをこのテストに組み込むことで、より現実的なシナリオを再現できます。これにより、実際のサーバーデータを模したテストが可能となり、プロダクション環境でのデータ処理をシミュレーションできます。

サーバーのレスポンスデータをStreamControllerを使って手動で生成し、それをjsonEncodeでエンコードしてストリームに追加することで、サーバーが送信してくるデータと同じように扱うことができます。

Q&A

Q1: 非同期ジェネレータ関数とは何ですか?

A1: 非同期ジェネレータ関数は、非同期処理を行いながら複数の値を順次返す関数です。これにより、データがすべて揃うのを待たず、途中の結果を即座に処理できます。Flutterでは、Streamyieldを用いて実装し、リアルタイムデータや大量データの効率的な処理に役立ちます。

Q2: yieldyield*の違いは何ですか?

A2: yieldは、非同期ジェネレータ関数内で単一の値を返すために使用します。一方、yield*は、他のジェネレータ関数からの複数の値をまとめて返すために使います。これにより、ジェネレータを効率的に連結して利用することが可能です。

Q3: 非同期ジェネレータ関数の実用例は?

A3: 代表的な例はAPIデータの逐次取得です。例えば、1秒ごとにデータを順次取得し、リアルタイムでUIを更新できます。この処理をStreamasync*で実装することで、アプリケーションの応答性を保ちながら、効率的なデータ処理が可能になります。

まとめ

非同期ジェネレータ関数について、非同期処理の基本から具体的な実装方法まで勉強しました。リアルタイムデータの処理や大量データの効率的な扱い方、エラー処理やリソース管理の重要性を理解しました。これにより、Flutterアプリのパフォーマンスを向上させ、ユーザー体験を高めるための知識を得ることができました。

参考

ソース(main.dartにコピペして動作確認用)

import 'dart:async';
import 'dart:convert';
import 'package:flutter/material.dart';

void main() {
  runApp(MyApp());
}

class MyApp extends StatelessWidget {
  @override
  Widget build(BuildContext context) {
    return MaterialApp(
      title: '非同期ジェネレータ サンプル',
      theme: ThemeData(
        primarySwatch: Colors.blue,
      ),
      home: StreamTextPage(),
    );
  }
}

class StreamTextPage extends StatefulWidget {
  @override
  _StreamTextPageState createState() => _StreamTextPageState();
}

class _StreamTextPageState extends State<StreamTextPage> {
  var _messages = ''; // 画面に表示するメッセージ
  var _disableTap = false;

  final messageData =
      'これはAPIからのデータ通信を模したメソッドです。チャットなどで随時データを取得しているつもりです'.split('');

  // Web APIを模したメソッド
  Stream<String> fetchMockStream() {
    final controller = StreamController<String>();

    for (int index = 0; index < (messageData.length + 1); index++) {
      Future.delayed(Duration(milliseconds: 200 * index)).then((_) {
        if (index < messageData.length) {
          final map = <String, dynamic>{'result': messageData[index]};
          controller.sink.add(jsonEncode(map));
        } else {
          controller.close();
        }
      });
    }

    return controller.stream;
  }

  Stream<String> fetchMockStreamWithAsynchronousGenerator() async* {
    for (int index = 0; index < messageData.length; index++) {
      await Future.delayed(const Duration(milliseconds: 200));
      final map = <String, dynamic>{'result': messageData[index]};
      yield jsonEncode(map);
    }
  }

  Stream<String> receiveData(Stream<String> stream) async* {
    await for (final String source in stream) {
      final map = jsonDecode(source);
      yield map['result'] ?? '';
    }
  }

  void onTap(Stream<String> stream) {
    setState(() {
      _messages = '';
      _disableTap = true;
    });
    receiveData(stream).listen((data) {
      setState(() {
        _messages = _messages + data;
      });
    }, onDone: () {
      setState(() {
        _disableTap = false;
      });
    });
  }

  @override
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(
        title: Text('非同期ジェネレータのサンプル'),
      ),
      body: Padding(
        padding: const EdgeInsets.all(16.0),
        child: Column(
          crossAxisAlignment: CrossAxisAlignment.start,
          children: [
            Expanded(
              child: SingleChildScrollView(
                child: Text(
                  _messages,
                  style: TextStyle(fontSize: 16.0),
                ),
              ),
            ),
            const SizedBox(height: 16),
            FilledButton(
              onPressed: _disableTap ? null : () => onTap(fetchMockStream()),
              child: Text('非同期処理'),
            ),
            FilledButton(
              onPressed: _disableTap
                  ? null
                  : () => onTap(fetchMockStreamWithAsynchronousGenerator()),
              child: Text('非同期ジェネレータ関数'),
            ),
          ],
        ),
      ),
    );
  }
}