source: subversion/applications/utils/export/osm2pgsql/gazetteer/util.update.php @ 22220

Revision 22220, 11.0 KB checked in by twain, 4 years ago (diff)

improved multi-processor indexing

Line 
1#!/usr/bin/php -Cq
2<?php
3        ini_set('memory_limit', '800M');
4        require_once('website/.htlib/lib.php');
5
6        $aCMDOptions = array(
7                "Import / update / index osm data",
8                array('help', 'h', 0, 1, 0, 0, false, 'Show Help'),
9                array('quiet', 'q', 0, 1, 0, 0, 'bool', 'Quiet output'),
10                array('verbose', 'v', 0, 1, 0, 0, 'bool', 'Verbose output'),
11
12                array('max-load', '', 0, 1, 1, 1, 'float', 'Maximum load average - indexing is paused if this is exceeded'),
13                array('max-blocking', '', 0, 1, 1, 1, 'int', 'Maximum blocking processes - indexing is aborted / paused if this is exceeded'),
14
15                array('import-hourly', '', 0, 1, 0, 0, 'bool', 'Import hourly diffs'),
16                array('import-daily', '', 0, 1, 0, 0, 'bool', 'Import daily diffs'),
17                array('import-all', '', 0, 1, 0, 0, 'bool', 'Import all available files'),
18
19                array('import-file', '', 0, 1, 1, 1, 'realpath', 'Re-import data from an OSM file'),
20                array('import-node', '', 0, 1, 1, 1, 'int', 'Re-import node'),
21                array('import-way', '', 0, 1, 1, 1, 'int', 'Re-import way'),
22                array('import-relation', '', 0, 1, 1, 1, 'int', 'Re-import relation'),
23
24                array('index', '', 0, 1, 0, 0, 'bool', 'Index'),
25                array('index-rank', '', 0, 1, 1, 1, 'int', 'Rank to index'),
26                array('index-instances', '', 0, 1, 1, 1, 'int', 'Number of indexing instances'),
27                array('index-instance', '', 0, 1, 1, 1, 'int', 'Which instance are we (0 to index-instances-1)'),
28                array('index-estrate', '', 0, 1, 1, 1, 'int', 'Estimated indexed items per second (def:30)'),
29        );
30        getCmdOpt($_SERVER['argv'], $aCMDOptions, $aResult, true, true);
31
32        if ($aResult['import-hourly'] && $aResult['import-daily']) showUsage($aCMDOptions, true, 'Select either import of hourly or daily');
33
34        if (!isset($aResult['index-instances']) || $aResult['index-instances'] == 1)
35        {
36                // Lock to prevent multiple copies running
37                if (exec('/bin/ps uww | grep '.basename(__FILE__).' | grep -v /dev/null | grep -v grep -c', $aOutput2, $iResult) > 1)
38                {
39                        echo "Copy already running\n";
40                        exit;
41                }
42        }
43
44        if (!isset($aResult['max-load'])) $aResult['max-load'] = 1.9;
45        if (!isset($aResult['max-blocking'])) $aResult['max-blocking'] = 3;
46
47        if (getBlockingProcesses() > $aResult['max-blocking'])
48        {
49                echo "Too many blocking processes for import\n";
50                exit;
51        }
52
53        // Assume osm2pgsql is in the folder above
54        $sBasePath = dirname(dirname(__FILE__));
55
56        require_once('DB.php');
57        $oDB =& DB::connect('pgsql://@/gazetteerworld', false);
58        if (PEAR::IsError($oDB))
59        {
60                echo $oDB->getMessage()."\n";
61                exit;
62        } 
63        $oDB->setFetchMode(DB_FETCHMODE_ASSOC);
64        $oDB->query("SET DateStyle TO 'sql,european'");
65        $oDB->query("SET client_encoding TO 'utf-8'");
66
67        $bFirst = true;
68        $bContinue = $aResult['import-all'];
69        while ($bContinue || $bFirst)
70        {
71                $bFirst = false;
72
73                if ($aResult['import-hourly'])
74                {
75                        // Mirror the hourly diffs
76                        exec('wget --quiet --mirror -l 1 -P /home/twain/ http://planet.openstreetmap.org/hourly');
77                        $sNextFile = $oDB->getOne('select TO_CHAR(lastimportdate,\'YYYYMMDDHH24\')||\'-\'||TO_CHAR(lastimportdate+\'1 hour\'::interval,\'YYYYMMDDHH24\')||\'.osc.gz\' from import_status');
78                        $sNextFile = '/home/twain/planet.openstreetmap.org/hourly/'.$sNextFile;
79                        $sUpdateSQL = 'update import_status set lastimportdate = lastimportdate+\'1 hour\'::interval';
80                }
81
82                if ($aResult['import-daily'])
83                {
84                        // Mirror the daily diffs
85                        exec('wget --quiet --mirror -l 1 -P /home/twain/ http://planet.openstreetmap.org/daily');
86                        $sNextFile = $oDB->getOne('select TO_CHAR(lastimportdate,\'YYYYMMDD\')||\'-\'||TO_CHAR(lastimportdate+\'1 day\'::interval,\'YYYYMMDD\')||\'.osc.gz\' from import_status');
87                        $sNextFile = '/home/twain/planet.openstreetmap.org/daily/'.$sNextFile;
88                        $sUpdateSQL = 'update import_status set lastimportdate = lastimportdate::date + 1';
89                }
90
91                // Missing file is not an error - it might not be created yet
92                if (($aResult['import-hourly'] || $aResult['import-daily']) && file_exists($sNextFile))
93                {
94                        // Import the file
95                        $sCMD = $sBasePath.'/osm2pgsql -las -C 2000 -O gazetteer -d gazetteerworld '.$sNextFile;
96                        echo $sCMD."\n";
97                        exec($sCMD, $sJunk, $iErrorLevel);
98
99                        if ($iErrorLevel)
100                        {
101                                echo "Error from $sBasePath/osm2pgsql -las -C 2000 -O gazetteer -d gazetteerworld $sNextFile, $iErrorLevel\n";
102                                exit;
103                        }
104       
105                        // Move the date onwards
106                        $oDB->query($sUpdateSQL);
107                }
108                else
109                {
110                        $bContinue = false;
111                }
112        }
113
114        $sModifyXML = false;
115        if (isset($aResult['import-file']) && $aResult['import-file'])
116        {
117                $sModifyXML = file_get_contents($aResult['import-file']);
118        }
119        if (isset($aResult['import-node']) && $aResult['import-node'])
120        {
121                $sModifyXML = file_get_contents('http://www.openstreetmap.org/api/0.6/node/'.$aResult['import-node']);
122        }
123        if (isset($aResult['import-way']) && $aResult['import-way'])
124        {
125                $sModifyXML = file_get_contents('http://www.openstreetmap.org/api/0.6/way/'.$aResult['import-way'].'/full');
126        }
127        if (isset($aResult['import-relation']) && $aResult['import-relation'])
128        {
129                $sModifyXML = file_get_contents('http://www.openstreetmap.org/api/0.6/relation/'.$aResult['import-relation'].'/full');
130        }
131        if ($sModifyXML)
132        {
133                // Hack into a modify request
134                $sModifyXML = str_replace('<osm version="0.6" generator="OpenStreetMap server">',
135                        '<osmChange version="0.6" generator="OpenStreetMap server"><modify>', $sModifyXML);
136                $sModifyXML = str_replace('</osm>', '</modify></osmChange>', $sModifyXML);
137                if ($aResult['verbose']) var_dump($sModifyXML);
138
139                $aSpec = array(
140                        0 => array("pipe", "r"),  // stdin
141                        1 => array("pipe", "w"),  // stdout
142                        2 => array("pipe", "w") // stderr
143                );
144                $aPipes = array();
145                $hProc = proc_open($sBasePath.'/osm2pgsql -las -C 2000 -O gazetteer -d gazetteerworld -', $aSpec, $aPipes);
146                if (!is_resource($hProc))
147                {
148                        echo "$sBasePath/osm2pgsql failed\n";
149                        exit;   
150                }
151                fwrite($aPipes[0], $sModifyXML);
152                fclose($aPipes[0]);
153                $sOut = stream_get_contents($aPipes[1]);
154                if ($aResult['verbose']) echo $sOut;
155                fclose($aPipes[1]);
156                $sErrors = stream_get_contents($aPipes[2]);
157                if ($aResult['verbose']) echo $sErrors;
158                fclose($aPipes[2]);
159                if ($iError = proc_close($hProc))
160                {
161                        echo "osm2pgsql existed with error level $iError\n";
162                        echo $sOut;
163                        echo $sError;
164                        exit;
165                }
166        }
167
168        if ($aResult['index'])
169        {
170                if (!isset($aResult['index-estrate'])) $aResult['index-estrate'] = 30;
171
172                if (getBlockingProcesses() > $aResult['max-blocking'])
173                {
174                        echo "Too many blocking processes for index\n";
175                        exit;
176                }
177
178                $sModSQL = '';
179/*
180                if (isset($aResult['index-instances']) && $aResult['index-instances'] > 1)
181                {
182                        $sModSQL = ' and geometry_index(geometry,indexed,name) % '.$aResult['index-instances'].' = '.(int)$aResult['index-instance'].' ';
183                }
184*/
185                // Re-index the new items
186                if (!$aResult['index-rank']) $aResult['index-rank'] = 0;
187                for ($i = $aResult['index-rank']; $i <= 30; $i++)
188                { 
189                        echo "Rank: $i";
190                        $iStartTime = date('U');
191                        flush();
192                       
193                        $sSQL = 'select geometry_index(geometry,indexed,name),count(*) from placex where rank_search = '.$i.' and indexed = false and name is not null '.$sModSQL.'group by geometry_index(geometry,indexed,name) order by count desc, random()';
194                        $aAllSectors = $oDB->getAll($sSQL);
195                        $iTotalNum = 0;
196                        foreach($aAllSectors as $aSector)
197                        {
198                                $iTotalNum += $aSector['count'];
199                        }
200                        $iTotalLeft = $iTotalNum;
201
202                        echo ", total to do: $iTotalNum \n";
203                        flush();
204
205                        $fRankStartTime = time();
206                        $iTotalDone = 0;
207                        $fRate = 10;
208
209                        foreach($aAllSectors as $aSector)
210                        {
211                                $bAlreadyRunning = $oDB->getOne('select count(*) from pg_stat_activity where current_query like \'update placex set indexed = true where geometry_index(geometry,indexed,name) = '.$aSector['geometry_index'].' and rank_search = '.$i.'%\'');
212                                if ($bAlreadyRunning) continue;
213
214                                while (getBlockingProcesses() > $aResult['max-blocking'] || getLoadAverage() > $aResult['max-load'])
215                                {
216                                        echo "System busy, pausing indexing...\n";
217                                        sleep(60);
218                                }
219
220                                $iEstSeconds = (int)($iTotalLeft / $fRate);
221                                $iEstDays = floor($iEstSeconds / (60*60*24));
222                                $iEstSeconds -= $iEstDays*(60*60*24);
223                                $iEstHours = floor($iEstSeconds / (60*60));
224                                $iEstSeconds -= $iEstHours*(60*60);
225                                $iEstMinutes = floor($iEstSeconds / (60));
226                                $iEstSeconds -= $iEstMinutes*(60);
227                                $iNum = $aSector['count'];
228                                $sRate = round($fRate, 1);
229                                echo $aSector['geometry_index'].": $iNum, $iTotalLeft left.  Est. time remaining (this rank) d:$iEstDays h:$iEstHours m:$iEstMinutes s:$iEstSeconds @ $sRate per second\n";
230                                $iTotalLeft -= $iNum;
231                                flush();
232
233                                $fNumSteps = ceil(sqrt($iNum) / 10);
234                                $iNumSteps = $fNumSteps*$fNumSteps;
235
236                                if ($fNumSteps > 1 )
237                                {
238                                        $iStepNum = 1;
239                                        echo "Spliting into ".($fNumSteps*$fNumSteps)." steps\n";
240                                        // Convert sector number back to lat lon
241                                        $fLon = (500 - floor($aSector['geometry_index']/1000)) - 0.5;
242                                        $fLat = (500 -  $aSector['geometry_index']%1000) - 0.5;
243
244                                        $fStepSize = 1 / $fNumSteps;
245                                        for ($fStepLat = $fLat; $fStepLat < ($fLat + 1); $fStepLat += $fStepSize)
246                                        {
247                                                for ($fStepLon = $fLon; $fStepLon < ($fLon + 1); $fStepLon += $fStepSize)
248                                                {
249                                                        while (getBlockingProcesses() > $aResult['max-blocking'] || getLoadAverage() > $aResult['max-load'])
250                                                        {
251                                                                echo "System busy, pausing indexing...\n";
252                                                                sleep(60);
253                                                        }
254                                                        $fStepLonTop = $fStepLon + $fStepSize;
255                                                        $fStepLatTop = $fStepLat + $fStepSize;
256                                                        echo "  Step $iStepNum of $iNumSteps: ($fStepLon,$fStepLat,$fStepLonTop,$fStepLatTop)\n";
257                                                        $sSQL = 'update placex set indexed = true where geometry_index(geometry,indexed,name) = '.$aSector['geometry_index'].' and rank_search = '.$i;
258                                                        $sSQL .= " and ST_Contains(ST_SetSRID(ST_MakeBox2D(ST_SetSRID(ST_POINT($fStepLon,$fStepLat),4326),ST_SetSRID(ST_POINT($fStepLonTop,$fStepLatTop),4326)),4326),geometry)";
259//                                                      var_Dump($sSQL);
260                                                        $oDB->query($sSQL);
261                                                        $iStepNum++;
262                                                }
263                                        }
264                                }
265                                $sSQL = 'update placex set indexed = true where geometry_index(geometry,indexed,name) = '.$aSector['geometry_index'].' and rank_search = '.$i;
266                                $oDB->query($sSQL);
267
268                                $iTotalDone += $iNum;
269                                $fRate = $iTotalDone / (time() - $fRankStartTime);
270                        }
271                        $iDuration = date('U') - $iStartTime;
272                        if ($iDuration && $iTotalNum)
273                        {
274                                echo "Finished Rank: $i in $iDuration @ ".($iTotalNum / $iDuration)." per second\n";
275                        }
276
277                        // Keep in sync with other instances
278//                      if (isset($aResult['index-instances']) && $aResult['index-instances'] > 1)
279                        {
280                                $sSQL = 'select count(*) from placex where rank_search = '.$i.' and indexed = false and name is not null';
281                                while($iWaitNum = $oDB->getOne($sSQL))
282                                {
283                                        $iEstSleepSeconds = round(max(1,($iWaitNum / $aResult['index-estrate'])/10));
284                                        echo "Waiting for $iWaitNum other places to be indexed at this level, sleeping $iEstSleepSeconds seconds\n";
285                                        sleep($iEstSleepSeconds);
286                                }
287                        }
288                }
289        }
290
Note: See TracBrowser for help on using the repository browser.