@NoAnyLove 版本是 4.3.3,目前不是必现,所以不太清楚可能是什么问题。
贴下 send 的代码,看了一下,没看出来哪里会提前 close socket:
/**
 * Create a ZMQ_REQ socket and connect it to `dest`, optionally tearing down
 * a previously used socket first.
 *
 * @param context   ZeroMQ context the new socket is created in. When NULL,
 *                  the `pContext` defined outside this function is used, so
 *                  callers that relied on the old behaviour (the parameter
 *                  was ignored) keep working.
 * @param requester Previously used socket. Only touched when `destory` is
 *                  non-zero; may be NULL.
 * @param dest      Endpoint string passed to zmq_connect()/zmq_disconnect().
 * @param destory   (sic) Non-zero: disconnect and close `requester` before
 *                  the new socket is created.
 * @return The newly connected socket, or NULL if zmq_socket() or
 *         zmq_connect() failed. On failure no new socket is left open, but
 *         `requester` has still been closed if `destory` was set.
 */
void *cts_zmq_socket_connect(void *context, void * requester, const char* dest,short destory)
{
    void *ctx = (context != NULL) ? context : pContext;
    void *sock;
    int linger = 0;

    /* Drop the old socket first; skip the calls when there is nothing to close. */
    if (destory && requester != NULL) {
        zmq_disconnect(requester, dest);
        zmq_close(requester);
    }

    sock = zmq_socket(ctx, ZMQ_REQ);
    if (sock == NULL) {
        return NULL;
    }

    /*
     * A linger period of 0 makes zmq_close() discard unsent messages instead
     * of waiting for them. Deliberately best-effort: a failure here does not
     * prevent the socket from being used.
     */
    (void)zmq_setsockopt(sock, ZMQ_LINGER, &linger, sizeof linger);

    if (zmq_connect(sock, dest) < 0) {
        zmq_close(sock);
        return NULL;
    }

    return sock;
}
/**
 * Send `msg` to `dest` over a fresh REQ socket and wait for a reply carrying
 * the same MessageSequence, re-sending on a new socket when no reply arrives
 * within REQUEST_TIMEOUT.
 *
 * `msg->MessageSequence` is overwritten with a new value from
 * cts_zmq_random_get(10000) before every send attempt.
 *
 * @param msg  Message to send; its MessageSequence field is modified.
 * @param dest Endpoint string to connect to.
 * @return  0  a reply was received and its MessageSequence matches
 *         -1  the initial socket could not be created/connected
 *         -2  zmq_send() failed (initial send or a retry)
 *         -3  zmq_poll() failed
 *         -4  zmq_recv() failed
 *         -5  a reply was received but its MessageSequence differs
 *         -6  no reply after REQUEST_RETRIES re-sends
 *         -7  the socket could not be re-created for a retry
 */
int cts_zmq_send(ZMQ_MSG *msg, const char* dest)
{
    int result;
    int retries_left = REQUEST_RETRIES;
    void *pSocket = cts_zmq_socket_connect(pContext, NULL, dest, 0);

    if (pSocket == NULL) {
        return -1;
    }

    msg->MessageSequence = cts_zmq_random_get(10000);
    if (zmq_send(pSocket, msg, sizeof(*msg), 0) < 0) {
        result = -2;
        goto cleanup;
    }

    for (;;) {
        zmq_pollitem_t items[] = {{pSocket, 0, ZMQ_POLLIN, 0}};

        if (zmq_poll(items, 1, REQUEST_TIMEOUT) == -1) {
            result = -3;
            break;
        }

        if (items[0].revents & ZMQ_POLLIN) {
            /*
             * Zero-initialised so that a reply shorter than ZMQ_MSG leaves
             * the unreceived tail at zero instead of indeterminate stack
             * contents before MessageSequence is compared.
             */
            ZMQ_MSG recvMsg = {0};

            if (zmq_recv(pSocket, &recvMsg, sizeof(recvMsg), 0) < 0) {
                result = -4;
            } else if (recvMsg.MessageSequence == msg->MessageSequence) {
                result = 0;
            } else {
                result = -5;
            }
            break;
        }

        /* Poll timed out without a reply. */
        if (retries_left <= 0) {
            result = -6;
            break;
        }
        retries_left -= 1;

        /*
         * A REQ socket cannot send again until it has received a reply, so
         * the old socket is closed and a new one is connected for the retry.
         * The helper has already closed the old socket when it returns NULL,
         * hence the direct return without cleanup.
         */
        pSocket = cts_zmq_socket_connect(pContext, pSocket, dest, 1);
        if (pSocket == NULL) {
            return -7;
        }

        msg->MessageSequence = cts_zmq_random_get(10000);
        if (zmq_send(pSocket, msg, sizeof(*msg), 0) < 0) {
            /* Report the failed re-send instead of waiting for a reply to a
             * request that was never sent. */
            result = -2;
            break;
        }
    }

cleanup:
    zmq_disconnect(pSocket, dest);
    zmq_close(pSocket);
    return result;
}