Главная страница  Взаимодействие нетривиальных процессов 

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 [ 83 ] 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186

Запуск всех потоков

21-41 Инициализируем семафоры и запускаем потоки-производители и поток-потребитель. Затем ожидается завершение работы потоков. Эта часть кода практически идентична листингу 7.1.

В листинге 10.13 приведен текст функции produce, которая выполняется каждым потоком-производителем.

Листинг 10.13. Функция, выполняемая всеми потоками-производителями

pxsem/prodcons3.c

43 void *

44 produce(void *arg)

45 {

46 for ( : ; ) {

47 Sem wait(&shared.nempty); /* ожидание освобождения поля */

48 Sem wait(&shared.mutex);

49 1f (shared.nput nitems) {

50 Sem post(&sha red.nempty);

51 Sem post(&shared.mutex):

52 return(NLILL); /* готово */

53 }

54 shared.buff[shared.nput % NBUFF] - shared.nputval;

55 shared.nput++;

56 shared.nputval++;

57 Sem post(&shared.mutex);

58 Sem post(&shared.nstored); /* еще один элемент */

59 *((int *) arg) +- 1;

60 }

61 }

Взаимное исключение между потоками-производителями

49-53 Отличие от листинга 10.8 в том, что цикл завершается, когда nitems объектов будет помещено в буфер всеми потоками. Обратите внимание, что потоки-производители могут получить семафор nempty в любой момент, но только один производитель может иметь семафор mutex. Это защищает переменные nput и nval от одновременного изменения несколькими производителями.

Завершение производителей

50-51 Нам нужно аккуратно обработать завершение потоков-производителей. После того как последний объект помещен в буфер, каждый поток выполняет Sem wait(&shared.nempty): /* ожидание пустого поля */

в начале цикла, что уменьшает значение семафора nempty. Но прежде, чем поток будет завершен, он должен увеличить значение этого семафора, потому что он не помещает объект в буфер в последнем проходе цикла. Завершающий работу поток должен также освободить семафор mutex, чтобы другие производители смогли продолжить функционирование. Если мы не увеличим семафор nempty по завершении процесса и если производителей будет больше, чем мест в буфере.



68 Sem wait(&shared.mutex):

одного элемента в буфер */

69 if (Shared.buffEi % NBUFF] !- i)

70 printfCerror: buff[ад = %й\п\ i, shared.buff[i % NBUFF]):

71 Sem post(&shared.mutex):

72 Sem post(&shared.nempty): /* еще одно пустое поле */

73 }

74 return(NULL):

75 }

Условие завершения единственного потока-потребителя звучит просто: он считает все потребленные объекты и останавливается по достижении nitems.

10.10. Несколько производителей, несколько потребителей

Следующее изменение, которое мы внесем в нашу программу, будет заключаться в добавлении возможности одновременной работы нескольких потребителей вместе с несколькими производителями. Есть ли смысл в наличии нескольких потребителей - зависит от приложения. Автор видел два примера, в которых использовался этот метод.

1. Программа преобразования IP-адресов в имена узлов. Каждый потребитель берет IP-адрес, вызывает gethostbyaddг (раздел 9.6 [24]), затем дописывает имя узла к файлу. Поскольку каждый вызов gethostbyaddr обрабатывается неопределенное время, порядок IP-адресов в буфере будет, скорее всего, отличаться от порядка имен узлов в файле, созданном потоками-потребителями. Преимущество этой схемы в параллельности выполнения вызовов gethostbyaddr (каждый из которых может работать несколько секунд) - по одному на каждый поток-потребитель.

ПРИМЕЧАНИЕ

Предполагается наличие версии gethostbyaddr, допускающей многократное вхождение, что не всегда верно. Если эта версия недоступна, можно хранить буфер в разделяемой памяти и использовать процессы вместо потоков.

лишние потоки будут заблокированы навсегда, ожидая освобождения семафора пепф1у, и никогда не завершат свою работу.

Функция consume в листинге 10.14 проверяет правильность всех записей в буфере, выводя сообщение при обнаружении ошибки.

Листинг 10.14. Функция, выполняемая потоком-потребителем

pxsem/prodcons3.c

62 void *

63 consume(void *arg)

64 {

65 int i:

66 for (i = 0: i < nitems: i++) {

67 Sem walt(&shared.nstored); /* ожидание помещения по крайней мере



2. Программа, принимающая дейтаграммы UDP, обрабатывающая их и записывающая результат в базу данных. Каждая дейтаграмма обрабатывается одним потоком-потребителем, которые выполняются параллельно для ускорения процесса. Хотя дейтаграммы записываются в базу данных в порядке, вообще говоря, отличном от порядка их приема, встроенная схема упорядочения записей в базе данных справляется с этой проблемой.

В листинге 10.15 приведены глобальные переменные программы.

Листинг 10.15. Глобальные переменные

pxsem/prodcons4.c

1 finclude unpipc.h

2 fdefine NBUFF 10

3 fdefine MAXNTHREADS 100

4 int nitems. nproducers. nconsumers; /* только для чтения */

5 struct { /* общие данные производителей и потребителей */

6 int buff[NBUFF];

7 int nput; /* номер объекта: 0. 1. 2. ... */

8 int nputval: /* сохраняемое в buff[] значение */

9 int nget: /* номер объекта: 0. 1. 2. ... */

10 int ngetval: /* получаемое из buff[] значение */

11 sem t mutex, nempty, nstored: /* семафоры, a не указатели */

12 } shared:

13 void *produce(void *), *consume(void *):

Глобальные переменные и общая структура

4-12 Количество потоков-потребителей является глобальной переменной, устанавливаемой из командной строки. В структуру shared добавилось два новых поля: nget - номер следующего объекта, получаемого одним из потоков-потребителей, и ngetval - соответствующее значение.

Функция main, текст которой приведен в листинге 10.16, запускает несколько потоков-потребителей и потоков-производителей одновременно.

19-23 Новый аргумент командной строки указывает количество потоков-потребителей. Для хранения идентификаторов потоков-потребителей выделяется место под специальный массив (tid consume), а для подсчета обработанных каждым потоком объектов выделяется массив conscount.

24-50 Создаются несколько потоков-производителей и потребителей, после чего основной поток ждет их завершения.

Листинг 10.16. Функция main для версии с несколькими производителями и потребителями

pxsem/prodcons4.c

14 int

15 maindnt argc, char **argv)

16 {

17 int i, prodcount[MAXNTHREADS], conscount[MAXNTHREADS];

18 pthread t tid produce[MAXNTHREADS], tid consume[MAXNTHREADS]:



1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 [ 83 ] 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186

© 2000 - 2024 ULTRASONEX-AMFODENT.RU.
Копирование материалов разрешено исключительно при условии цититирования.