【Dart】synchronizedでデータ競合を防ぐ方法

対象者

  • マルチスレッド環境でのデータ競合に悩んでいるエンジニア
  • synchronizedの実装やデッドロック回避方法について知りたい方
  • パフォーマンスを最適化しつつ、安定したシステムを構築したい方

はじめに

複数の非同期処理が実行される環境での開発を進める中で、データ競合や不整合に悩んでいませんか?特に、複数の処理が同時に同じデータにアクセスすることで、思わぬバグやエラーが発生し、プロジェクトの進行に影響を与えることは少なくありません。この記事では、そんな悩みを解決するために不可欠な技術である「synchronized」の役割や必要性、そしてFlutterでの具体的な実装方法について解説します。
デッドロックのリスクを回避し、パフォーマンスを最適化するためのベストプラクティスも紹介しているため、これを読めばsynchronizedを効果的に活用し、より安定したシステムを構築する自信がつくでしょう。

概論

synchronizedとは何か?

synchronizedは、複数のスレッドが同時に同じデータにアクセスしたり、操作したりすることによって起こるデータ競合や不整合を防ぐために使用される技術です。特に、マルチスレッド環境での安全なデータ処理を保証するために重要です。

マルチスレッド環境では、複数のスレッドが同時に実行されるため、同じデータに対して同時にアクセスが発生すると、意図しないデータの上書きや不整合が発生する可能性があります。これにより、プログラムが不正な状態に陥ったり、予期せぬエラーが発生したりするリスクが高まります。synchronizedは、このようなデータ競合を防ぐため、特定のコードブロックを排他制御し、1つのスレッドが処理を終えるまで他のスレッドがそのブロックに入れないようにします。

たとえば、銀行の口座を表すクラスがあり、2つのスレッドが同時に異なる振り替え操作を行う場合を考えます。この状況では、2つの振り替えが競合し、最終的な残高が不正確になる可能性があります。synchronizedを使用することで、1つのスレッドが振り替え操作を完了するまで、他のスレッドが同じ口座に対して操作できないようにします。

Flutterでのsynchronizedの実装方法

Flutterでsynchronizedを実装する際には、synchronizedパッケージを利用するのが一般的です。このパッケージは、Dartの非同期プログラミング環境において、排他制御を簡単に実現できるツールを提供します。

Dartはシングルスレッドで動作するため、Javaなどのマルチスレッド環境とは異なり、通常の操作ではデータ競合のリスクは低いです。しかし、非同期処理が絡む場合や、複数の非同期タスクが同時に実行される場合には、データの整合性を保つためにsynchronizedを使用することが重要になります。

Flutterでのsynchronizedの実装は、Lockクラスを使用することで容易に行えます。このクラスは、非同期関数の実行を排他制御するためのロックを提供します。

final _lock = Lock();

Future<bool> transfer(Account account, int value) async {
  return _lock.synchronized(() async {
    return true;
  });
}

このコードでは、_lock.synchronizedブロック内の処理が1つのスレッドによって排他的に実行されることを保証しています。これにより、他のスレッドが同時にこのブロックに入ることができなくなり、データの競合を防ぎます。

synchronizedを使わない場合のリスク

マルチスレッド環境でsynchronizedを使用しない場合、データの整合性が損なわれ、プログラムの動作が予測できないものになるリスクがあります。特に、複数のスレッドが同時にデータにアクセスする際には、データ競合が発生しやすく、その結果、プログラム全体の信頼性が低下します。

マルチスレッド環境では、複数のスレッドが同時に実行されるため、それぞれのスレッドが独立して動作することが期待されます。しかし、同じデータに対して同時にアクセスする場合、競合が発生し、そのデータの一貫性が失われる可能性があります。たとえば、1つのスレッドがデータを読み取っている間に、別のスレッドがそのデータを更新すると、最初のスレッドが期待していた値と異なる結果が返されることがあります。このような状況は、特に金融アプリケーションやリアルタイムシステムなど、正確なデータ処理が求められるシステムにおいて致命的です。

Flutterでのsynchronizedの実装例

Flutterでsynchronizedを実装する際、synchronizedパッケージを使用することで、非同期処理におけるデータ競合を防ぐことが可能です。このパッケージは、Dartの非同期プログラミングにおける排他制御を簡単に実現するためのツールを提供しています。特に、クラスレベルとインスタンスレベルのLockを適切に使い分けることで、異なるスコープでの競合を防ぐことができます。

基本的な使い方とコード例

synchronizedの基本的な使い方は、Lockクラスを用いて特定のコードブロックを排他的に実行することです。これにより、複数の非同期タスクが同時に実行される環境でも、データの整合性を保つことができます。

たとえば、以下のコードは口座間の振り替え処理をsynchronizedを使ってスレッドセーフにしたものです。

final _lock = Lock();

Future<bool> transfer(Account account, int value) async {
  return _lock.synchronized(() async {
    if (_amount < value) {
      return false;
    }
    await Future.delayed(const Duration(milliseconds: 10));
    _amount -= value;
    account._amount += value;
    return true;
  });
}

この例では、_lock.synchronizedを使用して振り替え処理を保護しています。これにより、複数のスレッドが同時にこのコードブロックを実行しようとしても、1つのスレッドが処理を完了するまで他のスレッドは待機することになります。

クラスレベルのLockとインスタンスレベルのLock

クラスレベルとインスタンスレベルのLockは、それぞれ異なるシナリオで使用されます。クラスレベルのLockは、同じクラス内のすべてのインスタンス間で排他制御を行う場合に使用され、インスタンスレベルのLockは、各インスタンスごとに個別に排他制御を行います。

クラスレベルのLockを使う場合、すべてのインスタンスで共通の操作に対して排他制御を行います。以下の例では、クラスレベルのLockを使用して、すべてのAccountインスタンスで同時に振り替えが行われないようにしています。

class Account {
  static final _classLock = Lock();
  int _amount;

  Account(this._amount);

  Future<bool> transferWithClassLock(Account account, int value) async {
    return _classLock.synchronized(() async {
      if (_amount < value) {
        return false;
      }
      await Future.delayed(const Duration(milliseconds: 10));
      _amount -= value;
      account._amount += value;
      return true;
    });
  }
}

一方、インスタンスレベルのLockは、特定のインスタンスでの操作にのみ排他制御を適用します。インスタンス間で独立した処理が行われる場合には、この方法が有効です。

class Account {
  final _lock = Lock();
  int _amount;

  Account(this._amount);

  Future<bool> transferWithInstanceLock(Account account, int value) async {
    return _lock.synchronized(() async {
      if (_amount < value) {
        return false;
      }
      await Future.delayed(const Duration(milliseconds: 10));
      _amount -= value;
      account._amount += value;
      return true;
    });
  }
}

まとめると、クラスレベルのLockとインスタンスレベルのLockを適切に使い分けることで、Flutterアプリケーションにおけるデータ競合を効果的に防ぐことができます。クラス全体で共通のデータを扱う場合はクラスレベルのLockを、個別のインスタンスで独立した操作を行う場合はインスタンスレベルのLockを使用するのが理想的です。これにより、アプリケーションの信頼性とデータの整合性を確保することが可能です。

Accountクラスを用いた具体的な例

ロックなしでの振り替え処理

ロックなしで振り替え処理を行うと、複数のスレッドが同時に実行されるときにデータの不整合が発生するリスクが非常に高くなります。特に、同じアカウント間で異なるスレッドが同時に振り替えを行うと、残高の計算に誤りが生じ、最終的な残高が正しく反映されない可能性があります。

ロックなしでの振り替え処理は、以下のように実装されます。

Future<bool> transfer(Account account, int value) async {
  if (_amount < value) {
    return false;
  }
  await Future.delayed(const Duration(milliseconds: 10));
  _amount -= value;
  account._amount += value;
  return true;
}

このコードでは、複数のスレッドが同時に振り替え処理を行おうとすると、_amountの計算が正しく行われず、データの整合性が保たれない可能性があります。例えば、アカウントの残高が減ったり増えたりする際に、他のスレッドが同時にアクセスすると、最終的な残高が期待される値と異なる結果になります。

クラスロックを用いた振り替え処理

クラスロックを使用することで、すべてのアカウントインスタンス間で排他制御を行い、同時に複数のスレッドが振り替え操作を行わないようにすることができます。これにより、データの一貫性が保たれ、振り替え処理が確実に安全に行われます。

クラスロックを使用した振り替え処理は、以下のように実装されます。

class Account {
  static final _classLock = Lock();
  int _amount;

  Account(this._amount);

  Future<bool> transferWithClassLock(Account account, int value) async {
    return _classLock.synchronized(() async {
      if (_amount < value) {
        return false;
      }
      await Future.delayed(const Duration(milliseconds: 10));
      _amount -= value;
      account._amount += value;
      return true;
    });
  }
}

この方法では、_classLockがクラス全体で共有されるため、同じクラスの異なるインスタンスでも排他制御が行われ、データ競合を防ぐことができます。すべての振り替え操作が1つのロックによって制御されるため、スレッドが同時に異なるアカウント間で操作を行っても、安全に処理が行われます。
ただ、クラスで同期されてしまうため、一つのインスタンスがロックすると、他のインスタンスが進めず、パフォーマンスが低下します。

インスタンスごとのロックを用いた振り替え処理

インスタンスレベルでロックを行う場合、各アカウントインスタンスごとに排他制御を設定します。これにより、異なるアカウント間での振り替え処理は独立して安全に実行され、同じアカウント間での振り替え操作が同時に行われることを防ぎます。

インスタンスレベルのロックを使用した振り替え処理は、以下のように実装されます。

class Account {
  final _lock = Lock();
  int _amount;

  Account(this._amount);

  Future<bool> transferWithInstanceLock(Account account, int value) async {
    return _lock.synchronized(() async {
      if (_amount < value) {
        return false;
      }
      await Future.delayed(const Duration(milliseconds: 10));
      _amount -= value;
      account._amount += value;
      return true;
    });
  }
}

この例では、各Accountインスタンスに対して個別のロックが設定されており、同じアカウントインスタンス間での競合が発生しないようになっています。これにより、複数の異なるアカウント間で同時に振り替え処理を行っても、各アカウントのデータ整合性が保たれます。

メソッドロックを用いた振り替え処理

メソッドロックを使用することで、特定のメソッド内でのみ排他制御を行うことができます。これは、特定の処理単位でのみロックが必要な場合に有効です。

メソッドロックを使用した振り替え処理は、以下のように実装されます。

class Account {
  int _amount;

  Account(this._amount);

  Future<bool> transferWithMethodLock(Account account, int value) async {
    return synchronized(() async {
      if (_amount < value) {
        return false;
      }
      await Future.delayed(const Duration(milliseconds: 10));
      _amount -= value;
      account._amount += value;
      return true;
    });
  }
}

この方法では、synchronizedを使用してメソッド単位での排他制御を行っています。これにより、特定のメソッドでのみスレッドセーフな処理が保証され、他の部分での競合を防ぐことができます。

ロックの種類まとめ

ロックなしでの振り替え処理は、複数のスレッドが同時にアクセスする際にデータ競合を引き起こすリスクが高く、クラスロックやインスタンスロック、メソッドロックを使用することで、このリスクを効果的に軽減できます。それぞれのロック方法には適したシナリオがあり、クラス全体での競合を防ぎたい場合にはクラスロックを、インスタンスごとの操作を安全に行いたい場合にはインスタンスロックを、特定のメソッド内での排他制御が必要な場合にはメソッドロックを使用するのが適切です。これにより、Flutterアプリケーションにおけるデータの整合性とプログラムの信頼性を大幅に向上させることができます。

synchronizedを使う際の注意点

デッドロックのリスクと回避方法

synchronizedを使用する際に最大のリスクの一つがデッドロックです。デッドロックとは、複数のスレッドが互いに待機状態に入り、永遠に処理が進まなくなる状態を指します。デッドロックは、複数のリソースをロックする際に発生しやすく、特に複数のスレッドが異なる順序でロックを取得しようとする場合に起こります。

デッドロックを回避するためには、以下の方法が有効です:

  • ロックの順序を統一する: すべてのスレッドが同じ順序でロックを取得するようにすることで、デッドロックのリスクを減らします。
  • タイムアウトを設定する: ロックを取得する際にタイムアウトを設定し、一定時間で取得できない場合は処理を中断することでデッドロックを防ぎます。

実例として、ロックの順序を統一する方法を用いたコードを以下に示します。

final lock1 = Lock();
final lock2 = Lock();

Future<void> performTask() async {
  await lock1.synchronized(() async {
    await lock2.synchronized(() async {
      // 処理内容
    });
  });
}

このコードでは、lock1を先に取得し、その後でlock2を取得することで、すべてのスレッドが同じ順序でロックを取得するため、デッドロックが発生するリスクを低減しています。

まとめると、デッドロックのリスクを回避するためには、ロックの順序を統一することやタイムアウトを設定することが効果的です。これにより、synchronizedを使用してもプログラムが予期せぬ停止状態に陥ることを防げます。

パフォーマンスへの影響と最適化

synchronizedを使用することで、排他制御が行われるためにパフォーマンスが低下する可能性があります。特に、複数のスレッドが頻繁にロックを取得しようとする場合、スレッドが待機状態になる時間が長くなり、全体的なパフォーマンスに悪影響を与えることがあります。

パフォーマンスを最適化するための方法としては、以下が考えられます:

  • ロックの範囲を最小限に抑える: ロックするコードブロックを必要最小限にすることで、ロックの競合を減らし、処理の待機時間を短縮します。
  • 不要なロックを避ける: 同じデータにアクセスする必要がない場合は、ロックを使用せずに処理を分散させることで、パフォーマンスを向上させます。

以下に、ロックの範囲を最小限に抑えた例を示します。

final lock = Lock();

Future<void> performTask() async {
  // 他の処理
  await lock.synchronized(() async {
    // ロックが必要な最小限の処理
  });
  // 他の処理
}

このコードでは、ロックが必要な処理部分を最小限に抑えることで、ロックの競合を減らし、スレッドが待機する時間を短縮しています。

結論として、synchronizedの使用はパフォーマンスに影響を与える可能性がありますが、ロックの範囲を最小限に抑え、不要なロックを避けることで、パフォーマンスの最適化が可能です。

synchronizedと他のスレッドセーフ技術の比較

synchronized以外にも、スレッドセーフを実現するための技術があります。それぞれの技術には利点と欠点があり、状況に応じて適切なものを選択することが重要です。

  • Mutex (ミューテックス): 複数のスレッドが同時にリソースにアクセスするのを防ぐための排他制御を行います。synchronizedに比べ、より柔軟な制御が可能ですが、コードが複雑になることがあります。(mutex パッケージ)

  • Future-based concurrency (Futureとasync/await): 非同期処理の中でスレッドセーフな操作を行う際に使用されます。ロックを使用せずに自然な流れで非同期処理を行えますが、スレッド間での明示的な排他制御は行いません。

  • Isolates: Dart特有の機能で、メモリを共有せずにスレッドのように動作する独立した並列処理の単位です。完全なスレッドセーフを保証しますが、処理間でのデータのやり取りが複雑になります。

Q&A

Q1: synchronizedを使わないとどうなりますか?

A1: synchronizedを使用しない場合、マルチスレッド環境でデータ競合が発生しやすくなります。これにより、データの不整合やプログラムの予期しない動作が起こり、結果的にシステムの信頼性が低下するリスクがあります。

Q2: synchronizedのデッドロックはどう回避しますか?

A2: デッドロックを回避するには、すべてのスレッドが同じ順序でロックを取得するように設計し、タイムアウトを設定して取得できなかった場合に処理を中断する方法が効果的です。

Q3: 「ロックの順序を統一する」はどのように実現しますか

A3: 各インスタンスで変化しない値をベースに順番を決めます。例えば「銀行口座」であれば、「口座番号」は固定です。そのため、口座番号の低い口座を先にロックし、その後高い口座をロックします。このように順番を固定することで、デッドロックのリスクを抑えることができます。「口座残高」は変化するので、使用しない方が良いです。(実装はソースのデッドロックの回避を参照)

Q4: synchronizedと他のスレッドセーフ技術はどう違いますか?

A4: synchronizedは簡単に排他制御を実現できますが、MutexやIsolatesなど、他にも状況に応じたスレッドセーフ技術があります。それぞれ利点が異なるため、適切な技術を選ぶことが重要です。

まとめ

この記事を通じて、synchronizedの役割や重要性、Flutterでの実装方法、そして注意点について学びました。synchronizedを使うことで、マルチスレッド環境におけるデータ競合のリスクを効果的に防ぐ方法を理解しました。また、デッドロックのリスクやパフォーマンスへの影響を考慮し、適切な技術選択が重要であることを勉強しました。最適なロック戦略を選ぶことで、アプリケーションの信頼性と効率を高める方法を理解できたと思います。

参考

ソース(main.dartにコピペして動作確認用)

import 'package:flutter_test/flutter_test.dart';
import 'package:synchronized/extension.dart';
import 'package:synchronized/synchronized.dart';

void main() {
  test('without lock', () async {
    final account1 = Account(1, 100);
    final account2 = Account(2, 50);

    final transfer1 = account1.transfer(account2, 100);
    await Future.delayed(const Duration(milliseconds: 5));
    final transfer2 = account2.transfer(account1, 90);

    expect(await Future.wait([transfer1, transfer2]), [true, false]);
  });

  test('with lock', () async {
    final account1 = Account(1, 100);
    final account2 = Account(2, 50);

    final transfer1 =
        Account.classLock.synchronized(() => account1.transfer(account2, 100));
    await Future.delayed(const Duration(milliseconds: 5));

    final transfer2 =
        Account.classLock.synchronized(() => account2.transfer(account1, 90));

    expect(await Future.wait([transfer1, transfer2]), [true, true]);
  });

  group('インスタンス毎にLockを持たせる(正しい方法)', () {
    test('1回目の振り替えが成功しているので、2回目の振り替えも成功する', () async {
      final account1 = Account(1, 100);
      final account2 = Account(2, 50);

      final transfer1 = account1.transferWithLock(account2, 100);
      await Future.delayed(const Duration(milliseconds: 5));
      final transfer2 = account2.transferWithLock(account1, 90);

      expect(await Future.wait([transfer1, transfer2]), [true, true]);
    });

    test('残金が足らず、両方の振り替えが失敗する', () async {
      final account1 = Account(1, 100);
      final account2 = Account(2, 50);

      final transfer1 = account1.transferWithLock(account2, 150);
      await Future.delayed(const Duration(milliseconds: 5));
      final transfer2 = account2.transferWithLock(account1, 90);

      expect(await Future.wait([transfer1, transfer2]), [false, false]);
    });

    test('1,3回目は残金不足で振り替えに失敗', () async {
      final account1 = Account(1, 100);
      final account2 = Account(2, 50);

      final transfer1 = account1.transferWithLock(account2, 150);
      await Future.delayed(const Duration(milliseconds: 5));
      final transfer2 = account1.transferWithLock(account2, 80);
      await Future.delayed(const Duration(milliseconds: 5));
      final transfer3 = account1.transferWithLock(account2, 30);

      expect(
        await Future.wait([transfer1, transfer2, transfer3]),
        [false, true, false],
      );
    });
  });

  test('with method lock', () async {
    final account1 = Account(1, 100);
    final account2 = Account(2, 50);

    final transfer1 = account1.transferWithMethodLock(account2, 100);
    final transfer2 = account1.transferWithMethodLock(account2, 30);
    final transfer3 = account1.transfer(account2, 30);

    expect(await Future.wait([transfer1, transfer2, transfer3]),
        [true, false, true]);
  });
}

class Account {
  Account(this.id, this._amount);

  static final classLock = Lock();

  final _lock = Lock();

  final int id;
  int _amount = 0;

  Future<bool> transfer(Account account, int value) async {
    if (_amount < value) {
      return false;
    }

    await Future.delayed(const Duration(milliseconds: 10));

    _amount -= value;
    account._amount += value;
    return true;
  }

  int getAmount() => _amount;

  Future<bool> transferWithClassLock(Account account, int value) async {
    classLock.synchronized(() async {
      if (_amount < value) {
        return false;
      }

      await Future.delayed(const Duration(milliseconds: 10));

      _amount -= value;
      account._amount += value;
    });
    return true;
  }

  Future<bool> transferWithLock(Account account, int value) async {
    Future<bool> transfer() async {
      if (_amount < value) {
        return false;
      }

      await Future.delayed(const Duration(milliseconds: 10));
      _amount -= value;
      account._amount += value;
      return true;
    }

    if (id == account.id) {
      return false;
    } else if (id < account.id) {
      return _lock
          .synchronized(() => account._lock.synchronized(() => transfer()));
    } else {
      return account._lock
          .synchronized(() => _lock.synchronized(() => transfer()));
    }
  }

  Future<bool> transferWithMethodLock(Account account, int value) async {
    return synchronized(() async {
      if (_amount < value) {
        return false;
      }

      await Future.delayed(const Duration(milliseconds: 10));
      _amount -= value;
      account._amount += value;
      return true;
    });
  }
}

Synchronizedの解説

今回のサンプルコードでは、Flutterにおけるsynchronizedを使用して、複数の非同期処理が同時に実行される際のデータ競合を防ぐ方法を紹介しています。特に、口座間での振り替え操作を安全に行うために、さまざまなロックの使い方を実装しています。

1. Lockを使った同期処理

非同期処理をスレッドセーフにするため、Lockを使って処理を一つのスレッドが排他的に実行できるようにしています。_lock.synchronized()ブロック内で処理が実行されるため、他のスレッドが同時にデータにアクセスすることが防がれます。

final _lock = Lock();

2. クラスレベルのロック

Account.classLockというクラスレベルのロックを使うことで、すべてのアカウントインスタンスに対して排他制御を実施しています。これにより、どのインスタンスからアクセスされても、競合を避けることができます。

static final classLock = Lock();

3. インスタンスレベルのロック

Accountインスタンスごとにロックを持たせ、特定のインスタンスに対する振り替え操作を安全に行います。これにより、異なるアカウント間での振り替えが安全に実行され、競合が防止されます。

final _lock = Lock();

4.デッドロックの回避

コード内では、アカウントIDを基にロックの順序を制御しています。これにより、デッドロック(複数のスレッドが互いのリソースを待ち続ける状態)が発生するのを防いでいます。例えば、IDの小さい方から先にロックを取得するようにして、リソース取得の順序を統一しています。

if (id < account.id) {
  return _lock.synchronized(() => account._lock.synchronized(() => transfer()));
} else {
  return account._lock.synchronized(() => _lock.synchronized(() => transfer()));
}

5. メソッドロック

メソッド全体をロックすることで、メソッドスコープでの排他制御を行うことも可能です。

return synchronized(() async {
  // ロックされた振り替え処理
});