6262#define DMQ_CONNSTR_MAX_LEN 1024
6363
6464#define DMQ_MAX_SUBS_PER_BACKEND 100
65- #define DMQ_MAX_DESTINATIONS 100
65+ #define DMQ_MAX_DESTINATIONS 127
6666#define DMQ_MAX_RECEIVERS 100
6767
6868typedef enum
@@ -118,7 +118,7 @@ struct DmqSharedState
118118
119119
120120/* Backend-local i/o queues. */
121- struct
121+ static struct
122122{
123123 shm_mq_handle * mq_outh ;
124124 int n_inhandles ;
@@ -294,14 +294,6 @@ dmq_toc_size()
294294 *
295295 *****************************************************************************/
296296
297- // static void
298- // fe_close(PGconn *conn)
299- // {
300- // PQputCopyEnd(conn, NULL);
301- // PQflush(conn);
302- // PQfinish(conn);
303- // }
304-
305297static int
306298fe_send (PGconn * conn , char * msg , size_t len )
307299{
@@ -435,12 +427,12 @@ dmq_sender_main(Datum main_arg)
435427 res = shm_mq_receive (mq_handles [i ], & len , & data , true);
436428 if (res == SHM_MQ_SUCCESS )
437429 {
438- int conn_id ;
430+ DmqDestinationId conn_id ;
439431
440432 /* first byte is connection_id */
441- conn_id = * (char * ) data ;
442- data = (char * ) data + 1 ;
443- len -= 1 ;
433+ conn_id = * (DmqDestinationId * ) data ;
434+ data = (char * ) data + sizeof ( DmqDestinationId ) ;
435+ len -= sizeof ( DmqDestinationId ) ;
444436 Assert (0 <= conn_id && conn_id < DMQ_MAX_DESTINATIONS );
445437
446438 if (conns [conn_id ].state == Active )
@@ -724,7 +716,9 @@ dmq_handle_message(StringInfo msg, shm_mq_handle **mq_handles, dsm_segment *seg)
724716{
725717 const char * stream_name ;
726718 const char * body ;
719+ const char * msgptr ;
727720 int body_len ;
721+ int msg_len ;
728722 bool found ;
729723 DmqStreamSubscription * sub ;
730724 shm_mq_result res ;
@@ -734,9 +728,11 @@ dmq_handle_message(StringInfo msg, shm_mq_handle **mq_handles, dsm_segment *seg)
734728 * as message body with unknown format that we are going to send down to
735729 * the subscribed backend.
736730 */
737- stream_name = pq_getmsgrawstring (msg );
738- body_len = msg -> len - msg -> cursor ;
739- body = pq_getmsgbytes (msg , body_len );
731+ msg_len = msg -> len - msg -> cursor ;
732+ msgptr = pq_getmsgbytes (msg , msg_len );
733+ stream_name = msgptr ;
734+ body = msgptr + strlen (stream_name ) + 1 ;
735+ body_len = msg_len - (body - msgptr );
740736 pq_getmsgend (msg );
741737
742738 /*
@@ -773,7 +769,7 @@ dmq_handle_message(StringInfo msg, shm_mq_handle **mq_handles, dsm_segment *seg)
773769 sub -> procno );
774770
775771 /* and send it */
776- res = shm_mq_send (mq_handles [sub -> procno ], body_len , body , false);
772+ res = shm_mq_send (mq_handles [sub -> procno ], msg_len , msgptr , false);
777773 if (res != SHM_MQ_SUCCESS )
778774 {
779775 mtm_log (WARNING , "[DMQ] can't send to queue %d" , sub -> procno );
@@ -1345,11 +1341,18 @@ dmq_reattach_shm_mq(int handle_id)
13451341}
13461342
13471343DmqSenderId
1348- dmq_attach_receiver (char * sender_name , int mask_pos )
1344+ dmq_attach_receiver (const char * sender_name , int mask_pos )
13491345{
13501346 int i ;
13511347 int handle_id ;
13521348
1349+ /* Search for existed receiver. */
1350+ for (i = 0 ; i < dmq_local .n_inhandles ; i ++ )
1351+ {
1352+ if (strcmp (sender_name , dmq_local .inhandles [i ].name ) == 0 )
1353+ return i ;
1354+ }
1355+
13531356 for (i = 0 ; i < DMQ_MAX_RECEIVERS ; i ++ )
13541357 {
13551358 if (dmq_local .inhandles [i ].name [0 ] == '\0' )
@@ -1375,7 +1378,7 @@ dmq_attach_receiver(char *sender_name, int mask_pos)
13751378}
13761379
13771380void
1378- dmq_detach_receiver (char * sender_name )
1381+ dmq_detach_receiver (const char * sender_name )
13791382{
13801383 int i ;
13811384 int handle_id = -1 ;
@@ -1440,6 +1443,36 @@ dmq_stream_unsubscribe(const char *stream_name)
14401443 Assert (found );
14411444}
14421445
1446+ const char *
1447+ dmq_sender_name (DmqSenderId id )
1448+ {
1449+ Assert ((id >= 0 ) && (id < dmq_local .n_inhandles ));
1450+
1451+ if (dmq_local .inhandles [id ].name [0 ] == '\0' )
1452+ return NULL ;
1453+ return dmq_local .inhandles [id ].name ;
1454+ }
1455+
1456+ DmqDestinationId
1457+ dmq_remote_id (const char * name )
1458+ {
1459+ DmqDestinationId i ;
1460+
1461+ LWLockAcquire (dmq_state -> lock , LW_SHARED );
1462+ for (i = 0 ; i < DMQ_MAX_DESTINATIONS ; i ++ )
1463+ {
1464+ DmqDestination * dest = & (dmq_state -> destinations [i ]);
1465+ if (strcmp (name , dest -> receiver_name ) == 0 )
1466+ break ;
1467+ }
1468+ LWLockRelease (dmq_state -> lock );
1469+
1470+ if (i == DMQ_MAX_DESTINATIONS )
1471+ return -1 ;
1472+
1473+ return i ;
1474+ }
1475+
14431476/*
14441477 * Get a message from input queue. Execution blocking until message will not
14451478 * received. Returns false, if an error is occured.
@@ -1448,10 +1481,12 @@ dmq_stream_unsubscribe(const char *stream_name)
14481481 * msg - buffer that contains received message.
14491482 * len - size of received message.
14501483 */
1451- bool
1452- dmq_pop (DmqSenderId * sender_id , void * * msg , Size * len , uint64 mask )
1484+ const char *
1485+ dmq_pop (DmqSenderId * sender_id , void * * msg , Size * len , uint64 mask ,
1486+ bool waitMsg )
14531487{
14541488 shm_mq_result res ;
1489+ const char * stream ;
14551490
14561491 Assert (msg && len );
14571492
@@ -1477,13 +1512,19 @@ dmq_pop(DmqSenderId *sender_id, void **msg, Size *len, uint64 mask)
14771512
14781513 if (res == SHM_MQ_SUCCESS )
14791514 {
1480- * msg = data ;
1515+ /*
1516+ * Stream name is first null-terminated string in
1517+ * the message buffer.
1518+ */
1519+ stream = data ;
1520+ * msg = (void * ) ((char * )data + strlen (stream ) + 1 );
1521+ * len -= (char * )(* msg ) - (char * )data ;
14811522 * sender_id = i ;
14821523
14831524 mtm_log (DmqTraceIncoming ,
14841525 "[DMQ] dmq_pop: got message %s from %s" ,
14851526 (char * ) data , dmq_local .inhandles [i ].name );
1486- return true ;
1527+ return stream ;
14871528 }
14881529 else if (res == SHM_MQ_DETACHED )
14891530 {
@@ -1498,13 +1539,15 @@ dmq_pop(DmqSenderId *sender_id, void **msg, Size *len, uint64 mask)
14981539 else
14991540 {
15001541 * sender_id = i ;
1501- return false ;
1542+ return NULL ;
15021543 }
15031544 }
15041545 }
15051546
1506- if (nowait )
1547+ if (nowait && waitMsg )
15071548 continue ;
1549+ if (!waitMsg )
1550+ return NULL ;
15081551
15091552 // XXX cache that
15101553 rc = WaitLatch (MyLatch , WL_LATCH_SET | WL_TIMEOUT , 10.0 ,
@@ -1516,6 +1559,7 @@ dmq_pop(DmqSenderId *sender_id, void **msg, Size *len, uint64 mask)
15161559 if (rc & WL_LATCH_SET )
15171560 ResetLatch (MyLatch );
15181561 }
1562+ return NULL ;
15191563}
15201564
15211565bool
@@ -1566,8 +1610,7 @@ dmq_pop_nb(DmqSenderId *sender_id, StringInfo msg, uint64 mask)
15661610 * sender_name - a symbolic name of the sender. Remote backend will attach
15671611 * to this channel by sender name.
15681612 * See dmq_attach_receiver() routine for details.
1569- * Call this function after shared memory initialization. For example,
1570- * extensions may create channels during 'CREATE EXTENSION' command execution.
1613+ * Call this function after shared memory initialization.
15711614 */
15721615DmqDestinationId
15731616dmq_destination_add (char * connstr , char * sender_name , char * receiver_name ,
0 commit comments