Niejednokrotnie podczas pisania aplikacji napotyka się na sytuację gdy jedna metoda produkuje pewne dane, inna natomiast w pewien sposób je konsumuje. Czasem dobrym pomysłem jest, w przypadku gdy produkowane dane są w pewien sposób podzielne na części, wykonywać produkcję i konsumpcję w równoległych wątkach. Tutaj pojawia się istotny problem z zagadnienia wielowątkowości – synchronizacja. Oba (wszystkie) wątki współdzielące dany zasób muszą z niego korzystać w pewien ustalony sposób, tak aby w danej chwili korzystał z niego tylko jeden z wątków. W uproszczeniu, zapewnienie tego stanu nazywamy synchronizacją. Problem ten jest na tyle niebanalny i powszechny, że platforma .NET wspomaga synchronizację za pomocą wbudowanych mechanizmów. Jednym z nich jest słowo kluczowe lock. Użycie słowa kluczowego lock wygląda tak:
lock(somethingSharedToLock)
{
// operations on the locked object
}
Jak widać składnia jest bardzo prosta, ale co tak właściwie daje nam lock? Użycie słowa kluczowego lock gwarantuje nam, iż żaden inny wątek nie będzie ingerował w zablokowany obiekt podczas wykonywania instrukcji z nawiasu klamrowego. Natomiast jeśli obiekt jest zablokowany przez inny wątek, to kolejny lock będzie cierpliwie czekał na zwolnienie obiektu i dopiero wtedy go zablokuje. Pięknie, prawda? :) Oczywiście lock, jak wszystko – ma swoje wady. Częste i nierozważne lockowanie może doprowadzić do tzw. zakleszczenia (ang. deadlock). Starczy tej teorii, przejdźmy do przykładu (przykład z całą pewnością nie pokazuje best practices tworzenia kodu, ale chodziło mi o maksymalne uproszczenie przykładu).
Stwórzmy sobie prostą klasę producenta:
class Producer
{
public bool IsFinished
{
get;
set;
}
public List List
{
get;
set;
}
public Producer()
{
List = new List();
IsFinished = false;
}
public void Produce()
{
Random random = new Random();
for (int i = 0; i < 20; i++)
{
List.Add(random.Next(0, 20));
Console.WriteLine("Producer[{0}]={1}", i, List[i]);
Thread.Sleep(500);
}
IsFinished = true;
Console.WriteLine("Producer has finished his work");
}
}
Klasa ta, jak widać, w konstruktorze przyjmuję listę, a następnie po wywołaniu Produce() wpisuje do tej listy losowe liczby całkowite z przedziału <0,20) wypisując je przy okazji. Klasa posiada pole które poinformuje nas, że praca została zakończona. Usypianie w metodzie produkującej umożliwi nam zaobserwowanie pewnej ciekawej rzeczy, ale o tym dalej.
Klasa konsumenta:
class Consument
{
private Producer producer;
public Consument(Producer producer)
{
this.producer = producer;
}
public void Consume()
{
int currentElement = 0;
int elementCount = 0;
while (true)
{
elementCount = producer.List.Count;
if (producer.List.Count > currentElement)
{
producer.List[currentElement] += 2;
Console.WriteLine("Consument[{0}]={1}", currentElement, producer.List[currentElement]);
currentElement++;
}
if (producer.List.Count == currentElement && producer.IsFinished)
break;
}
Console.WriteLine("Consument has finished his work");
}
}
Konsument, w konstruktorze przyjmuje producenta, a po wywołaniu Consume(), o ile pojawiło się coś nowego zwiększa wartość każdego elementu o 2, a następnie go wypisuje. Metoda kończy swoje działanie w momencie, gdy Producent da o tym znać i wszystkie elementy zostaną zmienione.
Pozostaje nam odpalić produkcje i konsumpcje w oddzielnych wątkach:
private static void Main(string[] args)
{
Producer producer = new Producer();
Consument consument = new Consument(producer);
Thread producerThread = new Thread(new ThreadStart(producer.Produce));
Thread consumentThread = new Thread(new ThreadStart(consument.Consume));
producerThread.Start();
consumentThread.Start();
producerThread.Join();
consumentThread.Join();
}
Wynik działania programu (przykładowy!!):
Producer[0]=0
Consument[0]=2
Producer[1]=4
Consument[1]=4
Producer[2]=17
Consument[2]=17
Producer[3]=8
Consument[3]=8
Producer[4]=0
Consument[4]=2
Producer[5]=16
Consument[5]=16
Producer[6]=6
Consument[6]=6
Producer[7]=13
Consument[7]=13
Producer[8]=17
Consument[8]=19
Producer[9]=9
Consument[9]=11
Producer[10]=15
Consument[10]=17
Producer[11]=14
Consument[11]=14
Producer[12]=11
Consument[12]=13
Producer[13]=3
Consument[13]=5
Producer[14]=15
Consument[14]=15
Producer[15]=13
Consument[15]=13
Producer[16]=16
Consument[16]=18
Producer[17]=5
Consument[17]=7
Producer[18]=2
Consument[18]=2
Producer[19]=14
Consument[19]=16
Producer has finished his work
Consument has finished his work
Ups! Widzimy, że w wielu przypadkach (1, 2, 3, 5, 6, 7, 11, 14, 15, 18 - czyli w połowie wszystkich) wartość producenta nie została zmieniona, tak jakbyśmy chcieli. Czemu się tak stało? Otóż produkcja kolejnych elementów trwa dłużej niż konsumpcja (poprzez wspomniane wcześniej Thread.Sleep(500)) co powoduje iż w pewnych momentach dana jest "jeszcze" tworzona, a próbuje już być skonsumowana, co powoduje iż proces konsumpcji (dodanie do wartości 2) zostaje pominięte. Uniknięcie tej katastrofy jest możliwe dzięki wspomnianemu lockowi. Wystarczy napisać:
lock (((ICollection)List).SyncRoot)
{
List.Add(random.Next(0, 20));
Console.WriteLine("Producer[{0}]={1}", i, List[i]);
Thread.Sleep(500);
}
oraz:
lock (((ICollection)producer.List).SyncRoot)
{
elementCount = producer.List.Count;
if (producer.List.Count > currentElement)
{
producer.List[currentElement] += 2;
Console.WriteLine("Consument[{0}]={1}", currentElement, producer.List[currentElement]);
currentElement++;
}
if (producer.List.Count == currentElement && producer.IsFinished)
break;
}
To zapewni nam prawidłowe działanie kodu (jak nie wierzycie - sprawdźcie sami :)). A co tak właściwie się stało? Kolekcja w momencie produkowania jest na chwilę blokowana, a więc konsumpcja jest wtedy wstrzymana, gdy produkcja się skończy, konsumpcja zabiera kolekcję dla siebie. Dzięki temu działanie jest zgrabnie zsynchronizowane. Dla kolekcji, w celu synchronizacji należy używać właściwości SyncRoot która jest wymagana przez interfejs ICollection. Wszystkie inne obiekty możemy lockować w sposób bezpośredni (lock(someObject)) chyba, że coś (dokumentacja) podpowiada nam inaczej ;)
To tyle w tej kwestii, dla zainteresowanych polecam poczytać Thread Synchronization oraz How to: Synchronize a Producer and a Consumer Thread.
Pełny listing:
using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;
namespace ProducentConsumerExample
{
class Example
{
private static void Main(string[] args)
{
Producer producer = new Producer();
Consument consument = new Consument(producer);
Thread producerThread = new Thread(new ThreadStart(producer.Produce));
Thread consumentThread = new Thread(new ThreadStart(consument.Consume));
producerThread.Start();
consumentThread.Start();
producerThread.Join();
consumentThread.Join();
}
}
class Producer
{
public bool IsFinished
{
get;
set;
}
public List List
{
get;
set;
}
public Producer()
{
List = new List();
IsFinished = false;
}
public void Produce()
{
Random random = new Random();
for (int i = 0; i < 20; i++)
{
lock (((ICollection)List).SyncRoot)
{
List.Add(random.Next(0, 20));
Console.WriteLine("Producer[{0}]={1}", i, List[i]);
Thread.Sleep(500);
}
}
IsFinished = true;
Console.WriteLine("Producer has finished his work");
}
}
class Consument
{
private Producer producer;
public Consument(Producer producer)
{
this.producer = producer;
}
public void Consume()
{
int currentElement = 0;
int elementCount = 0;
while (true)
{
lock (((ICollection)producer.List).SyncRoot)
{
elementCount = producer.List.Count;
if (producer.List.Count > currentElement)
{
producer.List[currentElement] += 2;
Console.WriteLine("Consument[{0}]={1}", currentElement, producer.List[currentElement]);
currentElement++;
}
if (producer.List.Count == currentElement && producer.IsFinished)
break;
}
}
Console.WriteLine("Consument has finished his work");
}
}
}