在应用开发中,生产者,消费者的模型非常常见,一方产生数据并把数据放入队列中,而另一方从队列中取数据,先进先出。
应用:线程间通信/进程间通信。
Linux系统中提供了两种不同接口的消息队列:
- POSIX消息队列。POSIX为可移植的操作系统接口。
- System V消息队列。System V 是 AT&T 的第一个商业UNIX版本(UNIX System III)的加强。
其中,POSIX消息队列可移植性较强,使用较广。
Linux系统中提供的消息队列一般应用于进行间通信,但也可以用于线程间通信。
本文介绍POSIX消息队列应用于线程间通信。
头文件:
#include <fcntl.h> /* For O_* constants */
#include <sys/stat.h> /* For mode constants */
#include <mqueue.h>
编译链接需要加上
-lrt
链接。
Linux内核提供了一系列函数来使用消息队列:
/**
* @brief 创建消息队列实例
*
* Detailed function description
*
* @param[in] name: 消息队列名称
* @param[in] oflag:根据传入标识来创建或者打开一个已创建的消息队列
- O_CREAT: 创建一个消息队列
- O_EXCL: 检查消息队列是否存在,一般与O_CREAT一起使用
- O_CREAT|O_EXCL: 消息队列不存在则创建,已存在返回NULL
- O_NONBLOCK: 非阻塞模式打开,消息队列不存在返回NULL
- O_RDONLY: 只读模式打开
- O_WRONLY: 只写模式打开
- O_RDWR: 读写模式打开
* @param[in] mode:访问权限
* @param[in] attr:消息队列属性地址
*
* @return 成功返回消息队列描述符,失败返回-1,错误码存于error中
*/
mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);
/**
* @brief 无限阻塞方式接收消息
*
* Detailed function description
*
* @param[in] mqdes: 消息队列描述符
* @param[in] msg_ptr:消息体缓冲区地址
* @param[in] msg_len:消息体长度,长度必须大于等于消息属性设定的最大值
* @param[in] msg_prio:消息优先级
*
* @return 成功返回消息长度,失败返回-1,错误码存于error中
*/
mqd_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio);
/**
* @brief 指定超时时间阻塞方式接收消息
*
* Detailed function description
*
* @param[in] mqdes: 消息队列描述符
* @param[in] msg_ptr:消息体缓冲区地址
* @param[in] msg_len:消息体长度,长度必须大于等于消息属性设定的最大值
* @param[in] msg_prio:消息优先级
* @param[in] abs_timeout:超时时间
*
* @return 成功返回消息长度,失败返回-1,错误码存于error中
*/
mqd_t mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio, const struct timespec *abs_timeout);
/**
* @brief 无限阻塞方式发送消息
*
* Detailed function description
*
* @param[in] mqdes: 消息队列描述符
* @param[in] msg_ptr:待发送消息体缓冲区地址
* @param[in] msg_len:消息体长度
* @param[in] msg_prio:消息优先级
*
* @return 成功返回0,失败返回-1
*/
mqd_t mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio);
/**
* @brief 指定超时时间阻塞方式发送消息
*
* Detailed function description
*
* @param[in] mqdes: 消息队列描述符
* @param[in] msg_ptr:待发送消息体缓冲区地址
* @param[in] msg_len:消息体长度
* @param[in] msg_prio:消息优先级
* @param[in] abs_timeout:超时时间
*
* @return 成功返回0,失败返回-1
*/
mqd_t mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio, const struct timespec *abs_timeout);
/**
* @brief 关闭消息队列
*
* Detailed function description
*
* @param[in] mqdes: 消息队列描述符
*
* @return 成功返回0,失败返回-1
*/
mqd_t mq_close(mqd_t mqdes);
/**
* @brief 分离消息队列
*
* Detailed function description
*
* @param[in] name: 消息队列名称
*
* @return 成功返回0,失败返回-1
*/
mqd_t mq_unlink(const char *name);
例子:线程1不断给线程2发送字符串数据。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <fcntl.h> /* For O_* constants */
#include <sys/stat.h> /* For mode constants */
#include <mqueue.h>
#define MQ_MSG_MAX_SIZE 512 ///< 最大消息长度
#define MQ_MSG_MAX_ITEM 5 ///< 最大消息数目
static pthread_t s_thread1_id;
static pthread_t s_thread2_id;
static unsigned char s_thread1_running = 0;
static unsigned char s_thread2_running = 0;
static mqd_t s_mq;
static char send_msg[10] = "hello";
void *thread1_fun(void *arg)
{
int ret = 0;
s_thread1_running = 1;
while (s_thread1_running)
{
ret = mq_send(s_mq, send_msg, sizeof(send_msg), 0);
if (ret < 0)
{
perror("mq_send error");
}
printf("send msg = %s\n", send_msg);
usleep(100 * 1000);
}
pthread_exit(NULL);
}
void *thread2_fun(void *arg)
{
char buf[MQ_MSG_MAX_SIZE];
int recv_size = 0;
s_thread2_running = 1;
while (s_thread2_running)
{
recv_size = mq_receive(s_mq, &buf[0], sizeof(buf), NULL);
if (-1 != recv_size)
{
printf("receive msg = %s\n", buf);
}
else
{
perror("mq_receive error");
break;
}
usleep(100 * 1000);
}
pthread_exit(NULL);
}
int main(void)
{
int ret = 0;
struct mq_attr attr;
///< 创建消息队列
memset(&attr, 0, sizeof(attr));
attr.mq_maxmsg = MQ_MSG_MAX_ITEM;
attr.mq_msgsize = MQ_MSG_MAX_SIZE;
attr.mq_flags = 0;
s_mq = mq_open("/mq", O_CREAT|O_RDWR, 0777, &attr);
if(-1 == s_mq)
{
perror("mq_open error");
return -1;
}
///< 创建线程1
ret = pthread_create(&s_thread1_id, NULL, thread1_fun, NULL);
if (ret != 0)
{
printf("thread1_create error!\n");
exit(EXIT_FAILURE);
}
ret = pthread_detach(s_thread1_id);
if (ret != 0)
{
printf("s_thread1_id error!\n");
exit(EXIT_FAILURE);
}
///< 创建线程2
ret = pthread_create(&s_thread2_id, NULL, thread2_fun, NULL);
if (ret != 0)
{
printf("thread2_create error!\n");
exit(EXIT_FAILURE);
}
ret = pthread_detach(s_thread2_id);
if (ret != 0)
{
printf("s_thread2_id error!\n");
exit(EXIT_FAILURE);
}
while (1)
{
sleep(1);
}
return 0;
}
编译、运行: