C#でProducer-Consumerパターンのコレクションを長時間維持し、たまに生成/解除する
ただの個人的な調査結果です。
C#で、次のような要求があると仮定します。
- 1つ以上のソース(UIなりネットワークなり)からデータが入ってくる
- データが入るスレッドは待たせない
- 入ったデータは1系列に整理(シリアライズ)され、すぐに処理(UIへの反映など)
- 上記の入力は、アプリケーション全体の中で、0回以上(せいぜい数回)、ON/OFFされる
全体像
全体としては、「UIから、ある程度時間のかかる作業を起こす」のではなく「UIと並列で処理する。時々開始や終了がある(のでアプリケーションよりは短いライフサイクルとなる)」ものなので、async/awaitではなくTaskを直接扱うことにします*1。
シリアライズには、C# 4.0で加わったSystem.Collection.Concurrent.BlockingCollection<T>
クラスがあっています。
今回は、入力ON/OFFをボタンのクリック、入力データの処理をテキストボックスへの追加という形にしてみます。あと、手抜きなので入力は別のボタンのクリックを当てはめてみます。
BlockingCollection(キャンセルつき)
BlockingCollection
は、Take
メソッドを呼んだ時、コレクションにデータがないとブロック(無限ループ)します。.NETライブラリでの処理なので負荷も小さいでしょう。
さらに、入力処理のOFFを行うためには、Take(CancellationToken)
メソッドを使います。Tokenの元(CancellationTokenSource
インスタンス)でCancel()
メソッドを実行すれば、自動的にTake
メソッドは中断され、OperationCanceledException
例外を投げます。
TaskとBlockingCollection、CancellationTokenを組み合わせる
上記の内容を、入力をstring型として、仮コードにしてみます。
// 非同期処理用インスタンス BlockingCollection<string> bc; CancellationTokenSource cts; // 入力開始(どこかのメソッド内の処理) cts = new CancellationTokenSource(); bc = new BlockingCollection<string>(); Task.Run(() => { while (cts.IsCancellationRequested == false) { try { var s = bc.Take(cts.Token); // 適宜ブロック // sを使った処理 } catch (OperationCanceledException) { } // キャンセル時 } }); // 入力終了(どこかのメソッド内の処理) cts.Cancel(); // 入力そのもの(どこかのメソッド内の処理) bc.Add("Hello");
無限ループの判定とBlockingCollection.Take
で、まとめて同一のCancellationTokenSource
インスタンス(cts)を使っています。このため、cts.Cancel()
の呼び出しで入力処理を一気に終わらせられます。
GUIサンプル
この処理のテスト用に、WinFormsでボタン3つと複数行テキストボックスがあるフォームを作りました。
フォームには次のコントロールがおいてあります。
- button1 : 入力処理をONにするボタン
- button2 : 入力処理をOFFにするボタン。初期状態で
Enabled=false
- button3 : 入力を行うボタン。初期状態で
Enabled=false
- textBox1 : 入力データの処理先。
MultiLine=true
かつScrollBars = ScrollBars.Vertical
。button2で入力処理をOFFした時も表示する。
using System; using System.Threading; using System.Threading.Tasks; using System.Windows.Forms; using System.Diagnostics; using System.Collections.Concurrent; namespace WinFormApplication01 { public partial class Form1 : Form { public Form1() { InitializeComponent(); } BlockingCollection<string> bc; CancellationTokenSource cts; // Utility:呼び出し元スレッドを無視してTextBoxに文字列を追加 void AddMessage(string msg) { textBox1.Invoke(new Action(() => { textBox1.AppendText(msg + Environment.NewLine); })); } private void button1_Click(object sender, EventArgs e) { cts = new CancellationTokenSource(); bc = new BlockingCollection<string>(); Task.Run(() => { while (cts.IsCancellationRequested == false) { string s; try { s = bc.Take(cts.Token); } catch (OperationCanceledException) { s = "[canceled]"; } AddMessage(DateTime.Now.ToString() + " " + s); } }).ContinueWith(t => { cts.Dispose(); bc.Dispose(); button1.Invoke(new Action(() => { button3.Enabled = false; button2.Enabled = false; button1.Enabled = true; })); }); button1.Enabled = false; button2.Enabled = true; button3.Enabled = true; } private void button2_Click(object sender, EventArgs e) { cts.Cancel(); } private void button3_Click(object sender, EventArgs e) { bc.Add("[Add]"); } } }
ここでは、入力処理のOFF後の処理(disposeやGUIのEnabled切替)のためにContinueWithを使っています。
*1:こういう場合でもasync/awaitの方がいいようならご教示いただければ幸いです。