@@ -7,42 +7,61 @@ package snapshot
77
88import (
99 "context"
10+ "fmt"
1011 "path"
1112
13+ "github.com/docker/docker/api/types"
14+ "github.com/docker/docker/api/types/container"
15+ "github.com/docker/docker/api/types/network"
16+ "github.com/docker/docker/client"
1217 "github.com/pkg/errors"
1318
1419 dblabCfg "gitlab.com/postgres-ai/database-lab/pkg/config"
20+ "gitlab.com/postgres-ai/database-lab/pkg/log"
1521 "gitlab.com/postgres-ai/database-lab/pkg/retrieval/config"
1622 "gitlab.com/postgres-ai/database-lab/pkg/retrieval/dbmarker"
1723 "gitlab.com/postgres-ai/database-lab/pkg/retrieval/engine/postgres/tools"
24+ "gitlab.com/postgres-ai/database-lab/pkg/retrieval/engine/postgres/tools/cont"
25+ "gitlab.com/postgres-ai/database-lab/pkg/retrieval/engine/postgres/tools/health"
1826 "gitlab.com/postgres-ai/database-lab/pkg/retrieval/options"
1927 "gitlab.com/postgres-ai/database-lab/pkg/services/provision/databases/postgres/configuration"
2028 "gitlab.com/postgres-ai/database-lab/pkg/services/provision/thinclones"
2129)
2230
2331// LogicalInitial describes a job for preparing a logical initial snapshot.
2432type LogicalInitial struct {
25- name string
26- cloneManager thinclones.Manager
27- options LogicalOptions
28- globalCfg * dblabCfg.Global
29- dbMarker * dbmarker.Marker
33+ name string
34+ cloneManager thinclones.Manager
35+ dockerClient * client.Client
36+ options LogicalOptions
37+ globalCfg * dblabCfg.Global
38+ dbMarker * dbmarker.Marker
39+ queryProcessor * queryProcessor
3040}
3141
3242// LogicalOptions describes options for a logical initialization job.
3343type LogicalOptions struct {
44+ DataPatching DataPatching `yaml:"dataPatching"`
3445 PreprocessingScript string `yaml:"preprocessingScript"`
3546 Configs map [string ]string `yaml:"configs"`
3647 Schedule Scheduler `yaml:"schedule"`
3748}
3849
50+ // DataPatching allows executing queries to transform data before snapshot taking.
51+ type DataPatching struct {
52+ DockerImage string `yaml:"dockerImage"`
53+ QueryPreprocessing QueryPreprocessing `yaml:"queryPreprocessing"`
54+ }
55+
const (
	// LogicalInitialType declares a job type for preparing a logical initial snapshot.
	LogicalInitialType = "logicalSnapshot"

	// patchContainerPrefix is prepended to the instance ID to form the patch container name.
	patchContainerPrefix = "dblab_patch_"
)
4362
4463// NewLogicalInitialJob creates a new logical initial job.
45- func NewLogicalInitialJob (cfg config.JobConfig , cloneManager thinclones.Manager ,
64+ func NewLogicalInitialJob (cfg config.JobConfig , dockerClient * client. Client , cloneManager thinclones.Manager ,
4665 global * dblabCfg.Global , marker * dbmarker.Marker ) (* LogicalInitial , error ) {
4766 li := & LogicalInitial {
4867 name : cfg .Name ,
@@ -55,6 +74,12 @@ func NewLogicalInitialJob(cfg config.JobConfig, cloneManager thinclones.Manager,
5574 return nil , errors .Wrap (err , "failed to unmarshal configuration options" )
5675 }
5776
77+ if li .options .DataPatching .QueryPreprocessing .QueryPath != "" {
78+ li .queryProcessor = newQueryProcessor (dockerClient , global .Database .Name (), global .Database .User (),
79+ li .options .DataPatching .QueryPreprocessing .QueryPath ,
80+ li .options .DataPatching .QueryPreprocessing .MaxParallelWorkers )
81+ }
82+
5883 return li , nil
5984}
6085
@@ -63,13 +88,18 @@ func (s *LogicalInitial) Name() string {
6388 return s .name
6489}
6590
91+ // patchContainerName returns container name.
92+ func (s * LogicalInitial ) patchContainerName () string {
93+ return patchContainerPrefix + s .globalCfg .InstanceID
94+ }
95+
6696// Reload reloads job configuration.
6797func (s * LogicalInitial ) Reload (cfg map [string ]interface {}) (err error ) {
6898 return options .Unmarshal (cfg , & s .options )
6999}
70100
71101// Run starts the job.
72- func (s * LogicalInitial ) Run (_ context.Context ) error {
102+ func (s * LogicalInitial ) Run (ctx context.Context ) error {
73103 if s .options .PreprocessingScript != "" {
74104 if err := runPreprocessingScript (s .options .PreprocessingScript ); err != nil {
75105 return err
@@ -80,16 +110,24 @@ func (s *LogicalInitial) Run(_ context.Context) error {
80110 return errors .Wrap (err , "failed to create PostgreSQL configuration files" )
81111 }
82112
113+ dataDir := s .globalCfg .DataDir ()
114+
83115 // Run basic PostgreSQL configuration.
84- if err := configuration .NewCorrector ().Run (s . globalCfg . DataDir () ); err != nil {
116+ if err := configuration .NewCorrector ().Run (dataDir ); err != nil {
85117 return errors .Wrap (err , "failed to adjust PostgreSQL configs" )
86118 }
87119
88120 // Apply user defined configs.
89- if err := applyUsersConfigs (s .options .Configs , path .Join (s . globalCfg . DataDir () , "postgresql.conf" )); err != nil {
121+ if err := applyUsersConfigs (s .options .Configs , path .Join (dataDir , "postgresql.conf" )); err != nil {
90122 return errors .Wrap (err , "failed to apply user-defined configs" )
91123 }
92124
125+ if s .queryProcessor != nil {
126+ if err := s .runPreprocessingQueries (ctx , dataDir ); err != nil {
127+ return errors .Wrap (err , "failed to run preprocessing queries" )
128+ }
129+ }
130+
93131 dataStateAt := extractDataStateAt (s .dbMarker )
94132
95133 if _ , err := s .cloneManager .CreateSnapshot ("" , dataStateAt ); err != nil {
@@ -100,9 +138,115 @@ func (s *LogicalInitial) Run(_ context.Context) error {
100138}
101139
102140func (s * LogicalInitial ) touchConfigFiles () error {
103- if err := tools .TouchFile (path .Join (s .globalCfg .DataDir (), "postgresql.conf" )); err != nil {
141+ dataDir := s .globalCfg .DataDir ()
142+
143+ if err := tools .TouchFile (path .Join (dataDir , "postgresql.conf" )); err != nil {
104144 return err
105145 }
106146
107- return tools .TouchFile (path .Join (s .globalCfg .DataDir (), "pg_hba.conf" ))
147+ return tools .TouchFile (path .Join (dataDir , "pg_hba.conf" ))
148+ }
149+
150+ func (s * LogicalInitial ) runPreprocessingQueries (ctx context.Context , dataDir string ) error {
151+ pgVersion , err := tools .DetectPGVersion (dataDir )
152+ if err != nil {
153+ return errors .Wrap (err , "failed to detect the Postgres version" )
154+ }
155+
156+ patchImage := s .options .DataPatching .DockerImage
157+ if patchImage == "" {
158+ patchImage = fmt .Sprintf ("postgresai/sync-instance:%s" , pgVersion )
159+ }
160+
161+ if err := tools .PullImage (ctx , s .dockerClient , patchImage ); err != nil {
162+ return errors .Wrap (err , "failed to scan image pulling response" )
163+ }
164+
165+ pwd , err := tools .GeneratePassword ()
166+ if err != nil {
167+ return errors .Wrap (err , "failed to generate PostgreSQL password" )
168+ }
169+
170+ hostConfig , err := s .buildHostConfig (ctx )
171+ if err != nil {
172+ return errors .Wrap (err , "failed to build container host config" )
173+ }
174+
175+ // Run patch container.
176+ patchCont , err := s .dockerClient .ContainerCreate (ctx ,
177+ s .buildContainerConfig (dataDir , patchImage , pwd ),
178+ hostConfig ,
179+ & network.NetworkingConfig {},
180+ s .patchContainerName (),
181+ )
182+ if err != nil {
183+ return errors .Wrap (err , "failed to create container" )
184+ }
185+
186+ defer tools .RemoveContainer (ctx , s .dockerClient , patchCont .ID , cont .StopPhysicalTimeout )
187+
188+ defer func () {
189+ if err != nil {
190+ tools .PrintContainerLogs (ctx , s .dockerClient , s .patchContainerName ())
191+ }
192+ }()
193+
194+ log .Msg (fmt .Sprintf ("Running container: %s. ID: %v" , s .patchContainerName (), patchCont .ID ))
195+
196+ if err := s .dockerClient .ContainerStart (ctx , patchCont .ID , types.ContainerStartOptions {}); err != nil {
197+ return errors .Wrap (err , "failed to start container" )
198+ }
199+
200+ log .Msg ("Starting PostgreSQL" )
201+ log .Msg (fmt .Sprintf ("View logs using the command: %s %s" , tools .ViewLogsCmd , s .patchContainerName ()))
202+
203+ // Start PostgreSQL instance.
204+ if err := tools .RunPostgres (ctx , s .dockerClient , patchCont .ID , dataDir ); err != nil {
205+ return errors .Wrap (err , "failed to start PostgreSQL instance" )
206+ }
207+
208+ log .Msg ("Waiting for PostgreSQL is ready" )
209+
210+ if err := tools .CheckContainerReadiness (ctx , s .dockerClient , patchCont .ID ); err != nil {
211+ return errors .Wrap (err , "failed to readiness check" )
212+ }
213+
214+ if err := s .queryProcessor .applyPreprocessingQueries (ctx , patchCont .ID ); err != nil {
215+ return errors .Wrap (err , "failed to run preprocessing queries" )
216+ }
217+
218+ return nil
219+ }
220+
221+ func (s * LogicalInitial ) buildHostConfig (ctx context.Context ) (* container.HostConfig , error ) {
222+ hostConfig := & container.HostConfig {}
223+
224+ if err := tools .AddVolumesToHostConfig (ctx , s .dockerClient , hostConfig , s .globalCfg .DataDir ()); err != nil {
225+ return nil , err
226+ }
227+
228+ return hostConfig , nil
229+ }
230+
231+ func (s * LogicalInitial ) buildContainerConfig (clonePath , patchImage , password string ) * container.Config {
232+ hcInterval := health .DefaultRestoreInterval
233+ hcRetries := health .DefaultRestoreRetries
234+
235+ return & container.Config {
236+ Labels : map [string ]string {
237+ cont .DBLabControlLabel : cont .DBLabPatchLabel ,
238+ cont .DBLabInstanceIDLabel : s .globalCfg .InstanceID ,
239+ },
240+ Env : []string {
241+ "PGDATA=" + clonePath ,
242+ "POSTGRES_PASSWORD=" + password ,
243+ },
244+ Image : patchImage ,
245+ Healthcheck : health .GetConfig (
246+ s .globalCfg .Database .User (),
247+ s .globalCfg .Database .Name (),
248+ health .OptionInterval (hcInterval ),
249+ health .OptionRetries (hcRetries ),
250+ ),
251+ }
108252}
0 commit comments