@@ -3,12 +3,35 @@ package Cluster;
33use strict;
44use warnings;
55
6- use Proc::ProcessTable;
76use PostgresNode;
87use TestLib;
98use Test::More;
109use Cwd;
1110
11+ use Socket;
12+
13+ use IPC::Run;
14+
15+ sub check_port
16+ {
17+ my ($host , $port ) = @_ ;
18+ my $iaddr = inet_aton($host );
19+ my $paddr = sockaddr_in($port , $iaddr );
20+ my $proto = getprotobyname (" tcp" );
21+ my $available = 0;
22+
23+ socket (SOCK, PF_INET, SOCK_STREAM, $proto )
24+ or die " socket failed: $! " ;
25+
26+ if (bind (SOCK, $paddr ) && listen (SOCK, SOMAXCONN))
27+ {
28+ $available = 1;
29+ }
30+
31+ close (SOCK);
32+ return $available ;
33+ }
34+
1235my %allocated_ports = ();
1336sub allocate_ports
1437{
@@ -19,8 +42,7 @@ sub allocate_ports
1942 {
2043 my $port = int (rand () * 16384) + 49152;
2144 next if $allocated_ports {$port };
22- diag(" checking for port $port \n " );
23- if (!TestLib::run_log([' pg_isready' , ' -h' , $host , ' -p' , $port ]))
45+ if (check_port($host , $port ))
2446 {
2547 $allocated_ports {$port } = 1;
2648 push (@allocated_now , $port );
@@ -44,6 +66,7 @@ sub new
4466 my $node = new PostgresNode(" node$i " , $host , $pgport );
4567 $node -> {id } = $i ;
4668 $node -> {arbiter_port } = $arbiter_port ;
69+ $node -> {mmconnstr } = " ${ \$ node->connstr('postgres') } arbiter_port=${ \$ node->{arbiter_port} }" ;
4770 push (@$nodes , $node );
4871 }
4972
@@ -67,48 +90,54 @@ sub init
6790 }
6891}
6992
93+ sub all_connstrs
94+ {
95+ my ($self ) = @_ ;
96+ my $nodes = $self -> {nodes };
97+ return join (' , ' , map { " ${ \$ _->connstr('postgres') } arbiter_port=${ \$ _->{arbiter_port} }" } @$nodes );
98+ }
99+
100+
70101sub configure
71102{
72103 my ($self ) = @_ ;
73104 my $nodes = $self -> {nodes };
74- my $nnodes = scalar @{ $nodes };
75105
76- my $connstr = join ( ' , ' , map { " ${ \$ _->connstr('postgres') } arbiter_port=${ \$ _->{arbiter_port} } " } @$nodes );
106+ my $connstr = $self -> all_connstrs( );
77107
78108 foreach my $node (@$nodes )
79109 {
80110 my $id = $node -> {id };
81111 my $host = $node -> host;
82112 my $pgport = $node -> port;
83113 my $arbiter_port = $node -> {arbiter_port };
114+ my $unix_sock_dir = $ENV {PGHOST };
84115
85116 $node -> append_conf(" postgresql.conf" , qq(
86117 log_statement = none
87118 listen_addresses = '$host '
88- unix_socket_directories = ''
119+ unix_socket_directories = '$unix_sock_dir '
89120 port = $pgport
90- max_prepared_transactions = 200
91- max_connections = 200
121+ max_prepared_transactions = 10
122+ max_connections = 10
92123 max_worker_processes = 100
93124 wal_level = logical
94- fsync = off
95- max_wal_senders = 10
125+ max_wal_senders = 6
96126 wal_sender_timeout = 0
97127 default_transaction_isolation = 'repeatable read'
98- max_replication_slots = 10
128+ max_replication_slots = 6
99129 shared_preload_libraries = 'multimaster'
130+ shared_buffers = 16MB
100131
101132 multimaster.arbiter_port = $arbiter_port
102- multimaster.workers = 10
103- multimaster.queue_size = 10485760 # 10mb
133+ multimaster.workers = 1
104134 multimaster.node_id = $id
105135 multimaster.conn_strings = '$connstr '
106- multimaster.heartbeat_recv_timeout = 1000
136+ multimaster.heartbeat_recv_timeout = 1050
107137 multimaster.heartbeat_send_timeout = 250
108- multimaster.max_nodes = $nnodes
109- multimaster.ignore_tables_without_pk = true
110- multimaster.twopc_min_timeout = 50000
111- multimaster.min_2pc_timeout = 50000
138+ multimaster.max_nodes = 6
139+ multimaster.ignore_tables_without_pk = false
140+ multimaster.queue_size = 4194304
112141 log_line_prefix = '%t : '
113142 ) );
114143
@@ -128,6 +157,7 @@ sub start
128157 foreach my $node (@$nodes )
129158 {
130159 $node -> start();
160+ note( " Starting node with connstr 'dbname=postgres port=@{[ $node ->port() ]} host=@{[ $node ->host() ]}'" );
131161 }
132162}
133163
@@ -137,7 +167,7 @@ sub stopnode
137167 return 1 unless defined $node -> {_pid };
138168 $mode = ' fast' unless defined $mode ;
139169 my $name = $node -> name;
140- diag (" stopping $name ${mode} ly" );
170+ note (" stopping $name ${mode} ly" );
141171
142172 if ($mode eq ' kill' ) {
143173 killtree($node -> {_pid });
@@ -147,13 +177,13 @@ sub stopnode
147177 my $pgdata = $node -> data_dir;
148178 my $ret = TestLib::system_log(' pg_ctl' , ' -D' , $pgdata , ' -m' , ' fast' , ' stop' );
149179 my $pidfile = $node -> data_dir . " /postmaster.pid" ;
150- diag (" unlink $pidfile " );
180+ note (" unlink $pidfile " );
151181 unlink $pidfile ;
152182 $node -> {_pid } = undef ;
153183 $node -> _update_pid;
154184
155185 if ($ret != 0) {
156- diag (" $name failed to stop ${mode} ly" );
186+ note (" $name failed to stop ${mode} ly" );
157187 return 0;
158188 }
159189
@@ -166,43 +196,22 @@ sub stopid
166196 return stopnode($self -> {nodes }-> [$idx ]);
167197}
168198
169- sub killtree
199+ sub dumplogs
170200{
171- my $root = shift ;
172- diag(" killtree $root \n " );
173-
174- my $t = new Proc::ProcessTable;
175-
176- my %parent = ();
177- # my %cmd = ();
178- foreach my $p (@{$t -> table}) {
179- $parent {$p -> pid} = $p -> ppid;
180- # $cmd{$p->pid} = $p->cmndline;
181- }
201+ my ($self ) = @_ ;
202+ my $nodes = $self -> {nodes };
182203
183- if (!defined $root ) {
184- return ;
185- }
186- my @queue = ($root );
187- my @killist = ();
188-
189- while (scalar @queue ) {
190- my $victim = shift @queue ;
191- while (my ($pid , $ppid ) = each %parent ) {
192- if ($ppid == $victim ) {
193- push @queue , $pid ;
194- }
195- }
196- diag(" SIGSTOP to $victim " );
197- kill ' STOP' , $victim ;
198- unshift @killist , $victim ;
204+ note(" Dumping logs:" );
205+ foreach my $node (@$nodes ) {
206+ note(" ##################################################################" );
207+ note($node -> {_logfile });
208+ note(" ##################################################################" );
209+ my $filename = $node -> {_logfile };
210+ open my $fh , ' <' , $filename or die " error opening $filename : $! " ;
211+ my $data = do { local $/ ; <$fh > };
212+ note($data );
213+ note(" ##################################################################\n\n " );
199214 }
200-
201- diag(" SIGKILL to " . join (' ' , @killist ));
202- kill ' KILL' , @killist ;
203- # foreach my $victim (@killist) {
204- # print("kill $victim " . $cmd{$victim} . "\n");
205- # }
206215}
207216
208217sub stop
@@ -211,34 +220,32 @@ sub stop
211220 my $nodes = $self -> {nodes };
212221 $mode = ' fast' unless defined $mode ;
213222
214- diag(" Dumping logs:" );
215- foreach my $node (@$nodes ) {
216- diag(" ##################################################################" );
217- diag($node -> {_logfile });
218- diag(" ##################################################################" );
219- my $filename = $node -> {_logfile };
220- open my $fh , ' <' , $filename or die " error opening $filename : $! " ;
221- my $data = do { local $/ ; <$fh > };
222- diag($data );
223- diag(" ##################################################################\n\n " );
224- }
225-
226223 my $ok = 1;
227- diag (" stopping cluster ${mode} ly" );
224+ note (" stopping cluster ${mode} ly" );
228225
229226 foreach my $node (@$nodes ) {
230227 if (!stopnode($node , $mode )) {
231228 $ok = 0;
232- if (!stopnode($node , ' kill' )) {
233- my $name = $node -> name;
234- BAIL_OUT(" failed to kill $name " );
235- }
229+ # if (!stopnode($node, 'kill')) {
230+ # my $name = $node->name;
231+ # BAIL_OUT("failed to kill $name");
232+ # }
236233 }
237234 }
238235 sleep (2);
236+
237+ $self -> dumplogs();
238+
239239 return $ok ;
240240}
241241
242+ sub bail_out_with_logs
243+ {
244+ my ($self , $msg ) = @_ ;
245+ $self -> dumplogs();
246+ BAIL_OUT($msg );
247+ }
248+
242249sub teardown
243250{
244251 my ($self ) = @_ ;
@@ -269,10 +276,127 @@ sub poll
269276 return 1;
270277 }
271278 my $tries_left = $tries - $i - 1;
272- diag (" $poller poll for $pollee failed [$tries_left tries left]" );
279+ note (" $poller poll for $pollee failed [$tries_left tries left]" );
273280 sleep ($delay );
274281 }
275282 return 0;
276283}
277284
285+ sub pgbench ()
286+ {
287+ my ($self , $node , @args ) = @_ ;
288+ my $pgbench_handle = $self -> pgbench_async($node , @args );
289+ $self -> pgbench_await($pgbench_handle );
290+ }
291+
292+ sub pgbench_async ()
293+ {
294+ my ($self , $node , @args ) = @_ ;
295+
296+ my ($in , $out , $err , $rc );
297+ $in = ' ' ;
298+ $out = ' ' ;
299+
300+ my @pgbench_command = (
301+ ' pgbench' ,
302+ @args ,
303+ -h => $self -> {nodes }-> [$node ]-> host(),
304+ -p => $self -> {nodes }-> [$node ]-> port(),
305+ ' postgres' ,
306+ );
307+ note(" running pgbench: " . join (" " , @pgbench_command ));
308+ my $handle = IPC::Run::start(\@pgbench_command , $in , $out );
309+ return $handle ;
310+ }
311+
312+ sub pgbench_await ()
313+ {
314+ my ($self , $pgbench_handle ) = @_ ;
315+ IPC::Run::finish($pgbench_handle ) || BAIL_OUT(" pgbench exited with $? " );
316+ }
317+
318+ sub is_data_identic ()
319+ {
320+ my ($self , @nodenums ) = @_ ;
321+ my $checksum = ' ' ;
322+
323+ my $sql = " select md5('(' || string_agg(aid::text || ', ' || abalance::text , '),(') || ')')
324+ from (select * from pgbench_accounts order by aid) t;" ;
325+
326+ foreach my $i (@nodenums )
327+ {
328+ my $current_hash = ' ' ;
329+ $self -> {nodes }-> [$i ]-> psql(' postgres' , $sql , stdout => \$current_hash );
330+ if ($current_hash eq ' ' )
331+ {
332+ note(" got empty hash from node $i " );
333+ return 0;
334+ }
335+ if ($checksum eq ' ' )
336+ {
337+ $checksum = $current_hash ;
338+ }
339+ elsif ($checksum ne $current_hash )
340+ {
341+ note(" got different hashes: $checksum ang $current_hash " );
342+ return 0;
343+ }
344+ }
345+
346+ note($checksum );
347+ return 1;
348+ }
349+
350+ sub add_node ()
351+ {
352+ my ($self , %params ) = @_ ;
353+
354+ my $pgport ;
355+ my $arbiter_port ;
356+ my $connstrs ;
357+ my $node_id ;
358+
359+ if (defined $params {node_id })
360+ {
361+ $node_id = $params {node_id };
362+ $pgport = $params {port };
363+ $arbiter_port = $params {arbiter_port };
364+ $connstrs = $self -> all_connstrs();
365+ }
366+ else
367+ {
368+ $node_id = scalar (@{$self -> {nodes }}) + 1;
369+ $pgport = (allocate_ports(' 127.0.0.1' , 1))[0];
370+ $arbiter_port = (allocate_ports(' 127.0.0.1' , 1))[0];
371+ $connstrs = $self -> all_connstrs() . " , dbname=postgres host=127.0.0.1 port=$pgport arbiter_port=$arbiter_port " ;
372+ }
373+
374+ my $node = PostgresNode-> get_new_node(" node${node_id} x" );
375+
376+ $self -> {nodes }-> [0]-> backup(" backup_for_$node_id " );
377+ # do init from backup before setting host, since init_from_backup() checks
378+ # it default value
379+ $node -> init_from_backup($self -> {nodes }-> [0], " backup_for_$node_id " );
380+
381+ $node -> {_host } = ' 127.0.0.1' ;
382+ $node -> {_port } = $pgport ;
383+ $node -> {port } = $pgport ;
384+ $node -> {host } = ' 127.0.0.1' ;
385+ $node -> {arbiter_port } = $arbiter_port ;
386+ $node -> {mmconnstr } = " ${ \$ node->connstr('postgres') } arbiter_port=${ \$ node->{arbiter_port} }" ;
387+ $node -> append_conf(" postgresql.conf" , qq(
388+ multimaster.arbiter_port = $arbiter_port
389+ multimaster.conn_strings = '$connstrs '
390+ multimaster.node_id = $node_id
391+ port = $pgport
392+ ) );
393+ $node -> append_conf(" pg_hba.conf" , qq(
394+ local replication all trust
395+ host replication all 127.0.0.1/32 trust
396+ host replication all ::1/128 trust
397+ ) );
398+
399+ push (@{$self -> {nodes }}, $node );
400+ }
401+
2784021;
0 commit comments