1、安装zeroMQ
# Download the ZeroMQ 4.1.2 release tarball and unpack it.
wget http://download.zeromq.org/zeromq-4.1.2.tar.gz
tar -xvzf zeromq-4.1.2.tar.gz
# Enter the extracted source directory (not the .tar.gz archive itself).
cd zeromq-4.1.2
# Configure, build and install.
./configure --without-security
make && make install
或者:
git clone https://github.com/zeromq/libzmq.git
如果使用c++语言编程还需要获取到
zeroMQ的c++版头文件
git clone https://github.com/zeromq/cppzmq.git
将zmq.hpp拷贝到/usr/local/include下
2、三种模式实例
1、请求回应简单模式:
server.cpp
// // Hello World server in C++ // Binds REP socket to tcp://*:5555 // Expects "Hello" from client, replies with "World" // #include <zmq.hpp> #include <string> #include <iostream> #ifndef _WIN32 #include <unistd.h> #else #include <windows.h> #define sleep(n) Sleep(n) #endif #include <sys/time.h> int main () { // Prepare our context and socket zmq::context_t context (1); zmq::socket_t socket (context, ZMQ_REP); socket.bind ("tcp://*:5555"); struct timespec tv = {1,0}; while (true) { zmq::message_t request; // Wait for next request from client socket.recv (&request); std::cout << "Received Hello" << std::endl; // Do some 'work' // sleep(1); if(nanosleep(&tv, NULL) == -1) { exit(-1); } // Send reply back to client zmq::message_t reply (5); memcpy ((void *) reply.data (), "World", 5); socket.send (reply); } return 0; }
client.cpp
//
//  Hello World client in C++
//  Connects REQ socket to tcp://localhost:5555
//  Sends "Hello" ten times, waiting for a reply after each request
//
#include <zmq.hpp>
#include <string.h>     //  memcpy
#include <string>
#include <iostream>

int main ()
{
    //  Prepare our context and socket
    zmq::context_t context (1);
    zmq::socket_t socket (context, ZMQ_REQ);

    std::cout << "Connecting to hello world server…" << std::endl;
    socket.connect ("tcp://localhost:5555");

    //  Do 10 requests, waiting each time for a response
    for (int request_nbr = 0; request_nbr != 10; request_nbr++) {
        //  The message is sized to the 5 bytes actually copied, so no
        //  uninitialized byte is sent on the wire
        zmq::message_t request (5);
        memcpy ((void *) request.data (), "Hello", 5);
        std::cout << "Sending Hello " << request_nbr << "…" << std::endl;
        socket.send (request);

        //  Get the reply; its content is not inspected
        zmq::message_t reply;
        socket.recv (&reply);
        std::cout << "Received World " << request_nbr << std::endl;
    }
    return 0;
}
g++ -o server server.cpp -lzmq
g++ -o client client.cpp -lzmq
2、发布订阅模式:
server.cpp
//
//  Weather update server in C++
//  Binds PUB socket to tcp://*:5556
//  Publishes random weather updates as "zipcode temperature relhumidity"
//
#include <zmq.hpp>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>     //  memset
#include <time.h>

//  Scales random () into an int in [0, num). The product is computed in
//  double: in float it can round up far enough for the result to equal num.
#define within(num) (int) ((double) (num) * random () / (RAND_MAX + 1.0))

int main ()
{
    //  Prepare our context and publisher
    zmq::context_t context (1);
    zmq::socket_t publisher (context, ZMQ_PUB);
    publisher.bind("tcp://*:5556");

    //  Initialize random number generator
    srandom ((unsigned) time (NULL));

    while (1) {
        int zipcode, temperature, relhumidity;

        //  Get values that will fool the boss
        zipcode     = within (100000);
        temperature = within (215) - 80;
        relhumidity = within (50) + 10;

        //  Send message to all subscribers. The buffer is zeroed first so
        //  the bytes after the formatted text are not uninitialized memory.
        zmq::message_t message(20);
        memset (message.data(), 0, message.size());
        snprintf ((char *) message.data(), 20 ,
            "%05d %d %d", zipcode, temperature, relhumidity);
        publisher.send(message);
    }
    return 0;
}
client.cpp
//
//  Weather update client in C++
//  Connects SUB socket to tcp://localhost:5556
//  Collects weather updates in batches of 100 and prints the average
//  temperature of each batch
//
#include <zmq.hpp>
#include <string.h>     //  strlen
#include <unistd.h>     //  sleep
#include <iostream>
#include <sstream>
#include <string>

int main (int argc, char *argv[])
{
    zmq::context_t context (1);

    //  Socket to talk to server
    std::cout << "Collecting updates from weather server…\n" << std::endl;
    zmq::socket_t subscriber (context, ZMQ_SUB);
    subscriber.connect("tcp://localhost:5556");

    //  Subscribe to zipcode, default is NYC, 10001
    const char *filter = (argc > 1)? argv [1]: "10001 ";
    subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen (filter));

    do {
        //  Process 100 updates. The running total is reset for every batch
        //  so each printed average covers only that batch.
        int update_nbr;
        long total_temp = 0;
        for (update_nbr = 0; update_nbr < 100; update_nbr++) {
            zmq::message_t update;
            int zipcode = 0, temperature = 0, relhumidity = 0;

            subscriber.recv(&update);

            //  Bound the text by the message size instead of relying on a
            //  terminating NUL being present in the payload
            const std::string payload(static_cast<char*>(update.data()),
                                      update.size());
            std::istringstream iss(payload);
            iss >> zipcode >> temperature >> relhumidity ;

            total_temp += temperature;
        }
        std::cout << "Average temperature for zipcode '"<< filter
                  <<"' was "<<(int) (total_temp / update_nbr) <<"F"
                  << std::endl;
        sleep(1);
    } while (1);
    return 0;
}
3、管道模式:
taskvent: Parallel task ventilator in c++
// // Task ventilator in C++ // Binds PUSH socket to tcp://localhost:5557 // Sends batch of tasks to workers via that socket // // Olivier Chamoux <olivier.chamoux@fr.thalesgroup.com> // #include <zmq.hpp> #include <stdlib.h> #include <stdio.h> #include <unistd.h> #include <iostream> #define within(num) (int) ((float) num * random () / (RAND_MAX + 1.0)) int main (int argc, char *argv[]) { zmq::context_t context (1); // Socket to send messages on zmq::socket_t sender(context, ZMQ_PUSH); sender.bind("tcp://*:5557"); std::cout << "Press Enter when the workers are ready: " << std::endl; getchar (); std::cout << "Sending tasks to workers…\n" << std::endl; // The first message is "0" and signals start of batch zmq::socket_t sink(context, ZMQ_PUSH); sink.connect("tcp://localhost:5558"); zmq::message_t message(2); memcpy(message.data(), "0", 1); sink.send(message); // Initialize random number generator srandom ((unsigned) time (NULL)); // Send 100 tasks int task_nbr; int total_msec = 0; // Total expected cost in msecs for (task_nbr = 0; task_nbr < 100; task_nbr++) { int workload; // Random workload from 1 to 100msecs workload = within (100) + 1; total_msec += workload; message.rebuild(10); sprintf ((char *) message.data(), "%d", workload); sender.send(message); } std::cout << "Total expected cost: " << total_msec << " msec" << std::endl; sleep (1); // Give 0MQ time to deliver return 0; }
taskwork: Parallel task worker in C++
// // Task worker in C++ // Connects PULL socket to tcp://localhost:5557 // Collects workloads from ventilator via that socket // Connects PUSH socket to tcp://localhost:5558 // Sends results to sink via that socket // // Olivier Chamoux <olivier.chamoux@fr.thalesgroup.com> // #include "zhelpers.hpp" int main (int argc, char *argv[]) { zmq::context_t context(1); // Socket to receive messages on zmq::socket_t receiver(context, ZMQ_PULL); receiver.connect("tcp://localhost:5557"); // Socket to send messages to zmq::socket_t sender(context, ZMQ_PUSH); sender.connect("tcp://localhost:5558"); // Process tasks forever while (1) { zmq::message_t message; int workload; // Workload in msecs receiver.recv(&message); std::istringstream iss(static_cast<char*>(message.data())); iss >> workload; // Do the work s_sleep(workload); // Send results to sink message.rebuild(); sender.send(message); // Simple progress indicator for the viewer std::cout << "." << std::flush; } return 0; }
tasksink: Parallel task sink in C++
// // Task sink in C++ // Binds PULL socket to tcp://localhost:5558 // Collects results from workers via that socket // // Olivier Chamoux <olivier.chamoux@fr.thalesgroup.com> // #include <zmq.hpp> #include <time.h> #include <sys/time.h> #include <iostream> int main (int argc, char *argv[]) { // Prepare our context and socket zmq::context_t context(1); zmq::socket_t receiver(context,ZMQ_PULL); receiver.bind("tcp://*:5558"); // Wait for start of batch zmq::message_t message; receiver.recv(&message); // Start our clock now struct timeval tstart; gettimeofday (&tstart, NULL); // Process 100 confirmations int task_nbr; int total_msec = 0; // Total calculated cost in msecs for (task_nbr = 0; task_nbr < 100; task_nbr++) { receiver.recv(&message); if ((task_nbr / 10) * 10 == task_nbr) std::cout << ":" << std::flush; else std::cout << "." << std::flush; } // Calculate and report duration of batch struct timeval tend, tdiff; gettimeofday (&tend, NULL); if (tend.tv_usec < tstart.tv_usec) { tdiff.tv_sec = tend.tv_sec - tstart.tv_sec - 1; tdiff.tv_usec = 1000000 + tend.tv_usec - tstart.tv_usec; } else { tdiff.tv_sec = tend.tv_sec - tstart.tv_sec; tdiff.tv_usec = tend.tv_usec - tstart.tv_usec; } total_msec = tdiff.tv_sec * 1000 + tdiff.tv_usec / 1000; std::cout << "\nTotal elapsed time: " << total_msec << " msec\n" << std::endl; return 0; }
zeroMQ在设计上主要采用了以下几个高性能的特征:
1、无锁的队列模型
对于跨线程间的交互(用户端和session之间)的数据交换通道pipe,采用基于CAS(compare-and-swap)的无锁队列算法;在pipe的两端注册有异步事件,在从pipe读消息或者向pipe写消息时,会自动触发读写事件。
2、批量处理的算法
对于传统的消息处理,每个消息在发送和接收的时候,都需要进行系统调用,这样对于大量的消息,系统的开销比较大,zeroMQ对于批量的消息,进行了适应性的优化,可以批量地接收和发送消息。
3、多核下的线程绑定,无须CPU切换
区别于传统的多线程并发模式,信号量或者临界区, zeroMQ充分利用多核的优势,每个核绑定运行一个工作者线程,避免多线程之间的CPU切换开销。
文章参考:
http://github.tiankonguse.com/blog/2014/12/20/zoermq-study/
http://zguide.zeromq.org/page:all
http://blog.csdn.net/sunrise918/article/details/8701274