Главная страница Взаимодействие нетривиальных процессов ПРИМЕЧАНИЕ Наша схема может оказаться медленной в случае наличия в очереди большого количества сообщений, поскольку каждый раз при добавлении нового придется просматривать их значительную часть. Можно хранить отдельно индексы последних сообщений со всеми имеющимися значениями приоритета. 65 break: 66 } 67 index = nisghdr->nisg next; 68 pmsghdr = msghdr; 69 } 70 if (index == 0) { 71 /* очередь была пуста или новое письмо добавлено к концу списка */ 72 pnisghdr->nisg next = freeindex; 73 nnisghdr->nisg next = 0; 74 } 75 /* запускаем любой из процессов, заблокированных в mq receive */ 76 if (attr->mq curnisgs == 0) 77 pthread cond signal(&mqhdr->mqh wait); 78 attr->niq cunnsgs++: 79 pthread niutex unlock(&niqhdr->mqh lock): 80 return(O); 81 err; 82 pthread niutex unlock(&mqhdr->mqh lock); 83 return(-l): 84 } Получение индекса свободного блока 50-52 Поскольку количество свободных сообщений при создании очереди равно niq maxmsg, ситуация, в которой niq cunnsgs будет меньше niq maxmsg для пустого списка свободных сообщений, возникнуть не может. Копирование сообщения 53-56 Указатель nmsghdr хранит адрес области памяти, в которую помещается сообщение. Приоритет и длина сообщения сохраняются в структуре msg hdr, а затем в память копируется содержимое сообщения, переданного вызвавшим процессом. Помещение нового сообщения в соответствующее место связного списка 57-74 Порядок сообщений в нашем списке зависит от их приоритета: они расположены в порядке его убывания. При добавлении нового сообщения мы проверяем, существуют ли сообщения с тем же приоритетом; в этом случае сообщение добавляется после последнего из них. Используя такой метод упорядочения, мы гарантируем, что niq recei ve всегда будет возвращать старейшее сообщение с наивысшим приоритетом. По мере продвижения по списку мы сохраняем в pmsghdr адрес предыдущего сообщения, поскольку именно это сообщение будет хранить индекс нового сообщения в поле msg next. Пробуждение любого процесса, заблокированного в вызове mq receive 75-77 Если очередь была пуста в момент помещения в нее нового сообщения, мы вызываем ptliread cond signal, чтобы разблокировать любой из процессов, ожидающих сообщения. 78 Увеличиваем на единицу количество сообщений в очереди mq curmsgs. Функция mq receive в листинге 5.27 приведен текст первой половины функции mq recei ve, которая получает необходимые указатели, блокирует взаимное исключение и проверяет объем буфера вызвавшего процесса, который должен быть достаточным для помещения туда сообщения максимально возможной длины. Проверка полноты очереди 30-40 Если очередь пуста и установлен флаг 0 NONBLOCK, возвращается ошибка с кодом EAGAIN. в противном слзпае увеличивается значение счетчика mqli nwait, который проверяется функцией mq sencl (листинг 5.25) в случае, если очередь пуста и есть процессы, ожидающие уведомления. Затем мы ожидаем сигнала по условной переменной, который будет передан функцией mq send (листинг 5.26). ПРИМЕЧАНИЕ- Наша реализация mq receive, как и реализация mq send, упрощает ситуацию с ошибкой EINTR, возвращаемой при прерывании ожидания сигналом, перехватываемым вызвавшим процессом. в листинге 5.28 приведен текст второй половины функции mq recei ve. Мы уже знаем, что в очереди есть сообщение, которое можно будет возвратить вызвавшему процессу. Листинг 5.27. Функция mq receive: первая половина iny pxmsg mmap/mq recei ve. с 1 #include unpipc.li 2 #inclucle mqueue.h 3 ssize t 4 mymq receive(mymqcl t mqd, char *ptr. size t maxlen, unsigned int *priop) 6 int n; 7 long index; 8 int8 t *mptr; 9 ssize t len; 10 struct ti(ymq hdr *mqhdr; 11 struct inymq attr *attr; 12 struct mymsg hdr *msghdr; 13 struct mymq info *mqinfo; 14 mqinfo = mqd; 15 if (mqinfo->mqi magic !- MQI MAGIC) { 16 errno = EBADF; 17 return(-l); 18 } 19 mqhdr = mqinfo->mqi hdr; /* указатель struct */ 20 mptr = (int8 t *) mqhdr; /* указатель на байт */ 21 attr = &mqhdr->mqh attr; 22 if ( (n = pthread mutexJock(&mqhdr->mqhJock)) != 0) { 23 errno = n; 24 return(-l); 25 } 26 if (maxlen < attr->fflq msgsize) { 27 errno - EMSGSIZE; 28 goto err: 29 } 30 if (attr->mq curmsgs - 0) { /* очередь пуста */ 31 , if (mqinfo->fflqi flags & OJONBLOCK) { 32 errno = EAGAIN; 33 goto err: 34 } 35 /* ожидаем помещения сообщения в очередь */ 36 mqhdr->fflqh nwait++: 37 while (attr->mq curmsgs == 0) 38 pthread cond wait(&mqhdr->mqh wait. &mqhdr->mqh lock): 39 mqhdr->mqh nwait--: 40 } Листинг 5.28. Функция mq receive: вторая половина 41 my pxmsg mmap/mq receive.c if ( (index = mqhdr->mqhjead) -= 0) 42 err dump( mymq receive: curmsgs = Id: head = 0 . attr->mq curmsgs): 43 msghdr = (struct mymsgjdr *) &mptr[index]: 44 mqhdr->mqhjead = msghdr->msg next; /* новое начало списка */ 45 len = msghdr->msg len; 46 memcpy(ptr. msghdr + 1. len): /* копирование самого сообщения */ 47 if (priop != NULL) 48 *priop = msghdr->msgj)rio: 49 /* только что считанное сообщение становится первым в списке пустых */ 50 msghdr->msg next = mqhdr->mqh free: 51 mqhdr->mqh free = index: 52 /* запуск любого процесса, заблокированного в вызове mq send */ 53 if (attr->mq cunnsgs =- attr->mq maxmsg) 54 pthread cond signal(&mqhdr->fflqh wait): 55 attr->mq curmsgs--; 56 pthread mutex unlock(&mqhdr->mqh lock); 57 return(Ten); 58 err: 59 pthread mutex unlock(&mqhdr->mqh lock); 60 return(-l); 61 } Возвращение сообщения вызвавшему процессу 43-51 msghdr указывает на msg hdr первого сообщения в очереди, которое мы и возвратим. Освободившееся сообщение становится первым в списке свободных.
|
© 2000 - 2024 ULTRASONEX-AMFODENT.RU.
Копирование материалов разрешено исключительно при условии цититирования. |