Главная страница Взаимодействие нетривиальных процессов Запуск всех потоков 21-41 Инициализируем семафоры и запускаем потоки-производители и поток-потребитель. Затем ожидается завершение работы потоков. Эта часть кода практически идентична листингу 7.1. В листинге 10.13 приведен текст функции produce, которая выполняется каждым потоком-производителем. Листинг 10.13. Функция, выполняемая всеми потоками-производителями pxsem/prodcons3.c 43 void * 44 produce(void *arg) 45 { 46 for ( : ; ) { 47 Sem wait(&shared.nempty); /* ожидание освобождения поля */ 48 Sem wait(&shared.mutex); 49 1f (shared.nput nitems) { 50 Sem post(&sha red.nempty); 51 Sem post(&shared.mutex): 52 return(NLILL); /* готово */ 53 } 54 shared.buff[shared.nput % NBUFF] - shared.nputval; 55 shared.nput++; 56 shared.nputval++; 57 Sem post(&shared.mutex); 58 Sem post(&shared.nstored); /* еще один элемент */ 59 *((int *) arg) +- 1; 60 } 61 } Взаимное исключение между потоками-производителями 49-53 Отличие от листинга 10.8 в том, что цикл завершается, когда nitems объектов будет помещено в буфер всеми потоками. Обратите внимание, что потоки-производители могут получить семафор nempty в любой момент, но только один производитель может иметь семафор mutex. Это защищает переменные nput и nval от одновременного изменения несколькими производителями. Завершение производителей 50-51 Нам нужно аккуратно обработать завершение потоков-производителей. После того как последний объект помещен в буфер, каждый поток выполняет Sem wait(&shared.nempty): /* ожидание пустого поля */ в начале цикла, что уменьшает значение семафора nempty. Но прежде, чем поток будет завершен, он должен увеличить значение этого семафора, потому что он не помещает объект в буфер в последнем проходе цикла. Завершающий работу поток должен также освободить семафор mutex, чтобы другие производители смогли продолжить функционирование. Если мы не увеличим семафор nempty по завершении процесса и если производителей будет больше, чем мест в буфере. 68 Sem wait(&shared.mutex): одного элемента в буфер */ 69 if (Shared.buffEi % NBUFF] !- i) 70 printfCerror: buff[ад = %й\п\ i, shared.buff[i % NBUFF]): 71 Sem post(&shared.mutex): 72 Sem post(&shared.nempty): /* еще одно пустое поле */ 73 } 74 return(NULL): 75 } Условие завершения единственного потока-потребителя звучит просто: он считает все потребленные объекты и останавливается по достижении nitems. 10.10. Несколько производителей, несколько потребителей Следующее изменение, которое мы внесем в нашу программу, будет заключаться в добавлении возможности одновременной работы нескольких потребителей вместе с несколькими производителями. Есть ли смысл в наличии нескольких потребителей - зависит от приложения. Автор видел два примера, в которых использовался этот метод. 1. Программа преобразования IP-адресов в имена узлов. Каждый потребитель берет IP-адрес, вызывает gethostbyaddг (раздел 9.6 [24]), затем дописывает имя узла к файлу. Поскольку каждый вызов gethostbyaddr обрабатывается неопределенное время, порядок IP-адресов в буфере будет, скорее всего, отличаться от порядка имен узлов в файле, созданном потоками-потребителями. Преимущество этой схемы в параллельности выполнения вызовов gethostbyaddr (каждый из которых может работать несколько секунд) - по одному на каждый поток-потребитель. ПРИМЕЧАНИЕ Предполагается наличие версии gethostbyaddr, допускающей многократное вхождение, что не всегда верно. Если эта версия недоступна, можно хранить буфер в разделяемой памяти и использовать процессы вместо потоков. лишние потоки будут заблокированы навсегда, ожидая освобождения семафора пепф1у, и никогда не завершат свою работу. Функция consume в листинге 10.14 проверяет правильность всех записей в буфере, выводя сообщение при обнаружении ошибки. Листинг 10.14. Функция, выполняемая потоком-потребителем pxsem/prodcons3.c 62 void * 63 consume(void *arg) 64 { 65 int i: 66 for (i = 0: i < nitems: i++) { 67 Sem walt(&shared.nstored); /* ожидание помещения по крайней мере 2. Программа, принимающая дейтаграммы UDP, обрабатывающая их и записывающая результат в базу данных. Каждая дейтаграмма обрабатывается одним потоком-потребителем, которые выполняются параллельно для ускорения процесса. Хотя дейтаграммы записываются в базу данных в порядке, вообще говоря, отличном от порядка их приема, встроенная схема упорядочения записей в базе данных справляется с этой проблемой. В листинге 10.15 приведены глобальные переменные программы. Листинг 10.15. Глобальные переменные pxsem/prodcons4.c 1 finclude unpipc.h 2 fdefine NBUFF 10 3 fdefine MAXNTHREADS 100 4 int nitems. nproducers. nconsumers; /* только для чтения */ 5 struct { /* общие данные производителей и потребителей */ 6 int buff[NBUFF]; 7 int nput; /* номер объекта: 0. 1. 2. ... */ 8 int nputval: /* сохраняемое в buff[] значение */ 9 int nget: /* номер объекта: 0. 1. 2. ... */ 10 int ngetval: /* получаемое из buff[] значение */ 11 sem t mutex, nempty, nstored: /* семафоры, a не указатели */ 12 } shared: 13 void *produce(void *), *consume(void *): Глобальные переменные и общая структура 4-12 Количество потоков-потребителей является глобальной переменной, устанавливаемой из командной строки. В структуру shared добавилось два новых поля: nget - номер следующего объекта, получаемого одним из потоков-потребителей, и ngetval - соответствующее значение. Функция main, текст которой приведен в листинге 10.16, запускает несколько потоков-потребителей и потоков-производителей одновременно. 19-23 Новый аргумент командной строки указывает количество потоков-потребителей. Для хранения идентификаторов потоков-потребителей выделяется место под специальный массив (tid consume), а для подсчета обработанных каждым потоком объектов выделяется массив conscount. 24-50 Создаются несколько потоков-производителей и потребителей, после чего основной поток ждет их завершения. Листинг 10.16. Функция main для версии с несколькими производителями и потребителями pxsem/prodcons4.c 14 int 15 maindnt argc, char **argv) 16 { 17 int i, prodcount[MAXNTHREADS], conscount[MAXNTHREADS]; 18 pthread t tid produce[MAXNTHREADS], tid consume[MAXNTHREADS]:
|
© 2000 - 2024 ULTRASONEX-AMFODENT.RU.
Копирование материалов разрешено исключительно при условии цититирования. |