33 *
44 */
55
6+ #include "sbuf.h"
67#include "stream.h"
78#include "miscadmin.h"
89#include "unistd.h"
910#include "utils/memutils.h" /* MemoryContexts */
1011
11- #define IsDeliveryMessage (msg ) (msg->tot_len == MinSizeOfSendBuf )
12+ #define IsDeliveryMessage (msg ) (msg->datalen == 0 )
1213
1314static List * istreams = NIL ;
1415static List * ostreams = NIL ;
@@ -41,6 +42,8 @@ get_stream(List *streams, const char *name)
4142 for (lc = list_head (streams ); lc != NULL ; lc = lnext (lc ))
4243 {
4344 char * streamName = (char * ) lfirst (lc );
45+
46+ /* streamName is a first field of IStream and OStream structures. */
4447 if (strcmp (name , streamName ) == 0 )
4548 return streamName ;
4649 }
@@ -153,12 +156,13 @@ RecvIfAny(void)
153156 DmqDestinationId dest_id ;
154157
155158 /* If message is not delivery message, send delivery. */
156- dbuf .tot_len = MinSizeOfSendBuf ;
159+ dbuf .datalen = 0 ;
157160 dbuf .index = msg -> index ;
158161 dest_id = dmq_dest_id (sender_id );
159162 Assert (dest_id >= 0 );
160163
161- dmq_push_buffer (dest_id , istream -> streamName , & dbuf , dbuf .tot_len , false);
164+ dmq_push_buffer (dest_id , istream -> streamName , & dbuf ,
165+ MinSizeOfSendBuf , false);
162166 }
163167 }
164168}
@@ -193,17 +197,18 @@ checkDelivery(OStream *ostream)
193197 {
194198 RecvBuf * buf = lfirst (lc );
195199
196- istream -> msgs = list_delete_ptr (istream -> msgs , buf );
200+ istream -> deliveries = list_delete_ptr (istream -> deliveries , buf );
197201 pfree (buf );
198202 }
203+ list_free (temp );
199204 return found ;
200205}
201206
202207static void
203208StreamRepeatSend (OStream * ostream )
204209{
205210 while (!dmq_push_buffer (ostream -> dest_id , ostream -> streamName , ostream -> buf ,
206- ostream -> buf -> tot_len , true))
211+ buf_len ( ostream -> buf ) , true))
207212 RecvIfAny ();
208213}
209214
@@ -215,36 +220,31 @@ ISendTuple(DmqDestinationId dest_id, char *stream, TupleTableSlot *slot,
215220 int tupsize ;
216221 SendBuf * buf ;
217222 OStream * ostream ;
223+ int tot_len ;
218224
219225 RecvIfAny ();
220226
221227 ostream = (OStream * ) get_stream (ostreams , stream );
222228 Assert (ostream && !ostream -> buf );
223229
224- if (!TupIsNull (slot ))
225- {
226- int tot_len ;
230+ Assert (!TupIsNull (slot ));
227231
228- if (slot -> tts_tuple == NULL )
229- ExecMaterializeSlot (slot );
232+ if (slot -> tts_tuple == NULL )
233+ ExecMaterializeSlot (slot );
230234
231- tuple = slot -> tts_tuple ;
232- tupsize = offsetof(HeapTupleData , t_data );
233-
234- tot_len = MinSizeOfSendBuf + tupsize + tuple -> t_len ;
235- buf = palloc (tot_len );
236- buf -> tot_len = tot_len ;
237- memcpy (buf -> data , tuple , tupsize );
238- memcpy (buf -> data + tupsize , tuple -> t_data , tuple -> t_len );
239- }
240- else
241- Assert (0 );
235+ tuple = slot -> tts_tuple ;
236+ tupsize = offsetof(HeapTupleData , t_data );
237+ tot_len = MinSizeOfSendBuf + tupsize + tuple -> t_len ;
238+ buf = palloc (tot_len );
239+ buf -> datalen = tot_len - MinSizeOfSendBuf ;
240+ memcpy (buf -> data , tuple , tupsize );
241+ memcpy (buf -> data + tupsize , tuple -> t_data , tuple -> t_len );
242242
243243 buf -> index = ++ (ostream -> index );
244244 buf -> needConfirm = needConfirm ;
245245 ostream -> dest_id = dest_id ;
246246
247- while (!dmq_push_buffer (dest_id , stream , buf , buf -> tot_len , true))
247+ while (!dmq_push_buffer (dest_id , stream , buf , buf_len ( buf ) , true))
248248 RecvIfAny ();
249249
250250 if (buf -> needConfirm )
@@ -284,15 +284,15 @@ SendByteMessage(DmqDestinationId dest_id, char *stream, char tag)
284284 Assert (ostream && !ostream -> buf );
285285
286286 buf = palloc (MinSizeOfSendBuf + 1 );
287- buf -> tot_len = MinSizeOfSendBuf + 1 ;
287+ buf -> datalen = 1 ;
288288 buf -> data [0 ] = tag ;
289289 buf -> index = ++ (ostream -> index );
290290 buf -> needConfirm = true;
291291
292292 ostream -> buf = buf ;
293293 ostream -> dest_id = dest_id ;
294294
295- while (!dmq_push_buffer (dest_id , stream , buf , buf -> tot_len , true))
295+ while (!dmq_push_buffer (dest_id , stream , buf , buf_len ( buf ) , true))
296296 RecvIfAny ();
297297
298298 wait_for_delivery (ostream );
@@ -335,7 +335,7 @@ RecvByteMessage(const char *streamName, const char *sender)
335335 */
336336void
337337SendTuple (DmqDestinationId dest_id , char * stream , TupleTableSlot * slot ,
338- bool needConfirm )
338+ bool needConfirm )
339339{
340340 OStream * ostream ;
341341
0 commit comments