source: subversion/applications/utils/export/osm2pgsql/gazetteer/util.update.php @ 23099

Revision 23099, 20.0 KB checked in by twain, 4 years ago (diff)

check the indexing process didn't generate any errors (second attempt)

Line 
1#!/usr/bin/php -Cq
2<?php
3
4        // Database connection string
5        $sDatabaseName = 'gazetteerworld';
6        $sDatabaseDSN = 'pgsql://@/'.$sDatabaseName;
7
8        // make this point to a place where you want --import-hourly and --import-daily
9        // save mirrored files
10        $sMirrorDir = "/home/twain/";
11
12        // Where to find and store osmosis files
13        $sOsmosisCMD = '/home/twain/osmosis-0.35.1/bin/osmosis';
14        $sOsmosisConfigDirectory = '/home/twain/.osmosis';
15
16        ini_set('memory_limit', '800M');
17        require_once('website/.htlib/lib.php');
18
19        $aCMDOptions = array(
20                "Import / update / index osm data",
21                array('help', 'h', 0, 1, 0, 0, false, 'Show Help'),
22                array('quiet', 'q', 0, 1, 0, 0, 'bool', 'Quiet output'),
23                array('verbose', 'v', 0, 1, 0, 0, 'bool', 'Verbose output'),
24
25                array('max-load', '', 0, 1, 1, 1, 'float', 'Maximum load average - indexing is paused if this is exceeded'),
26                array('max-blocking', '', 0, 1, 1, 1, 'int', 'Maximum blocking processes - indexing is aborted / paused if this is exceeded'),
27
28                array('import-osmosis', '', 0, 1, 0, 0, 'bool', 'Import using osmosis'),
29                array('import-osmosis-all', '', 0, 1, 0, 0, 'bool', 'Import using osmosis forever'),
30
31                array('import-hourly', '', 0, 1, 0, 0, 'bool', 'Import hourly diffs'),
32                array('import-daily', '', 0, 1, 0, 0, 'bool', 'Import daily diffs'),
33                array('import-all', '', 0, 1, 0, 0, 'bool', 'Import all available files'),
34
35                array('import-file', '', 0, 1, 1, 1, 'realpath', 'Re-import data from an OSM file'),
36                array('import-diff', '', 0, 1, 1, 1, 'realpath', 'Import a diff (osc) file from local file system'),
37
38                array('import-node', '', 0, 1, 1, 1, 'int', 'Re-import node'),
39                array('import-way', '', 0, 1, 1, 1, 'int', 'Re-import way'),
40                array('import-relation', '', 0, 1, 1, 1, 'int', 'Re-import relation'),
41
42                array('index', '', 0, 1, 0, 0, 'bool', 'Index'),
43                array('index-rank', '', 0, 1, 1, 1, 'int', 'Rank to start indexing from'),
44                array('index-instances', '', 0, 1, 1, 1, 'int', 'Number of indexing instances (threads)'),
45                array('index-estrate', '', 0, 1, 1, 1, 'int', 'Estimated indexed items per second (def:30)'),
46
47                array('deduplicate', '', 0, 1, 0, 0, 'bool', 'Deduplicate tokens'),
48        );
49        getCmdOpt($_SERVER['argv'], $aCMDOptions, $aResult, true, true);
50
51        if ($aResult['import-hourly'] + $aResult['import-daily'] + isset($aResult['import-diff']) > 1)
52        {
53                showUsage($aCMDOptions, true, 'Select either import of hourly or daily');
54        }
55
56        if (!isset($aResult['index-instances'])) $aResult['index-instances'] = 1;
57
58        // Lock to prevent multiple copies running
59        if (exec('/bin/ps uww | grep '.basename(__FILE__).' | grep -v /dev/null | grep -v grep -c', $aOutput2, $iResult) > 1)
60        {
61                echo "Copy already running\n";
62                exit;
63        }
64
65        if (!isset($aResult['max-load'])) $aResult['max-load'] = 1.9;
66        if (!isset($aResult['max-blocking'])) $aResult['max-blocking'] = 3;
67
68        if (getBlockingProcesses() > $aResult['max-blocking'])
69        {
70                echo "Too many blocking processes for import\n";
71                exit;
72        }
73
74        // Assume osm2pgsql is in the folder above
75        $sBasePath = dirname(dirname(__FILE__));
76
77        require_once('DB.php');
78        $oDB =& DB::connect($sDatabaseDSN.'?new_link=true', array('persistent' => false));
79        if (PEAR::IsError($oDB))
80        {
81                echo $oDB->getMessage()."\n";
82                exit;
83        } 
84        $oDB->setFetchMode(DB_FETCHMODE_ASSOC);
85        $oDB->query("SET DateStyle TO 'sql,european'");
86        $oDB->query("SET client_encoding TO 'utf-8'");
87
88        $bFirst = true;
89        $bContinue = $aResult['import-all'];
90        while ($bContinue || $bFirst)
91        {
92                $bFirst = false;
93
94                if ($aResult['import-hourly'])
95                {
96                        // Mirror the hourly diffs
97                        exec('wget --quiet --mirror -l 1 -P '.$sMirrorDir.' http://planet.openstreetmap.org/hourly');
98                        $sNextFile = $oDB->getOne('select TO_CHAR(lastimportdate,\'YYYYMMDDHH24\')||\'-\'||TO_CHAR(lastimportdate+\'1 hour\'::interval,\'YYYYMMDDHH24\')||\'.osc.gz\' from import_status');
99                        $sNextFile = $sMirrorDir.'planet.openstreetmap.org/hourly/'.$sNextFile;
100                        $sUpdateSQL = 'update import_status set lastimportdate = lastimportdate+\'1 hour\'::interval';
101                }
102
103                if ($aResult['import-daily'])
104                {
105                        // Mirror the daily diffs
106                        exec('wget --quiet --mirror -l 1 -P '.$sMirrorDir.' http://planet.openstreetmap.org/daily');
107                        $sNextFile = $oDB->getOne('select TO_CHAR(lastimportdate,\'YYYYMMDD\')||\'-\'||TO_CHAR(lastimportdate+\'1 day\'::interval,\'YYYYMMDD\')||\'.osc.gz\' from import_status');
108                        $sNextFile = $sMirrorDir.'planet.openstreetmap.org/daily/'.$sNextFile;
109                        $sUpdateSQL = 'update import_status set lastimportdate = lastimportdate::date + 1';
110                }
111               
112                if (isset($aResult['import-diff']))
113                {
114                        // import diff directly (e.g. from osmosis --rri)
115                        $sNextFile = $aResult['import-diff'];
116                        if (!file_exists($nextFile))
117                        {
118                                echo "Cannot open $nextFile\n";
119                                exit;
120                        }
121                        // Don't update the import status - we don't know what this file contains
122                        $sUpdateSQL = 'update import_status set lastimportdate = now() where false';
123                }
124
125                // Missing file is not an error - it might not be created yet
126                if (($aResult['import-hourly'] || $aResult['import-daily']) && file_exists($sNextFile))
127                {
128                        // Import the file
129                        $sCMD = $sBasePath.'/osm2pgsql -las -C 2000 -O gazetteer -d '.$sDatabaseName.' '.$sNextFile;
130                        echo $sCMD."\n";
131                        exec($sCMD, $sJunk, $iErrorLevel);
132
133                        if ($iErrorLevel)
134                        {
135                                echo "Error from $sBasePath/osm2pgsql -las -C 2000 -O gazetteer -d '.$sDatabaseName.' $sNextFile, $iErrorLevel\n";
136                                exit;
137                        }
138       
139                        // Move the date onwards
140                        $oDB->query($sUpdateSQL);
141                }
142                else
143                {
144                        $bContinue = false;
145                }
146        }
147
148        $sModifyXML = false;
149        if (isset($aResult['import-file']) && $aResult['import-file'])
150        {
151                $sModifyXML = file_get_contents($aResult['import-file']);
152        }
153        if (isset($aResult['import-node']) && $aResult['import-node'])
154        {
155                $sModifyXML = file_get_contents('http://www.openstreetmap.org/api/0.6/node/'.$aResult['import-node']);
156        }
157        if (isset($aResult['import-way']) && $aResult['import-way'])
158        {
159                $sModifyXML = file_get_contents('http://www.openstreetmap.org/api/0.6/way/'.$aResult['import-way'].'/full');
160        }
161        if (isset($aResult['import-relation']) && $aResult['import-relation'])
162        {
163                $sModifyXML = file_get_contents('http://www.openstreetmap.org/api/0.6/relation/'.$aResult['import-relation'].'/full');
164        }
165        if ($sModifyXML)
166        {
167                // Hack into a modify request
168                $sModifyXML = str_replace('<osm version="0.6" generator="OpenStreetMap server">',
169                        '<osmChange version="0.6" generator="OpenStreetMap server"><modify>', $sModifyXML);
170                $sModifyXML = str_replace('</osm>', '</modify></osmChange>', $sModifyXML);
171
172                // Outputing this is too verbose
173                if ($aResult['verbose'] && false) var_dump($sModifyXML);
174
175                $aSpec = array(
176                        0 => array("pipe", "r"),  // stdin
177                        1 => array("pipe", "w"),  // stdout
178                        2 => array("pipe", "w") // stderr
179                );
180                $aPipes = array();
181                $hProc = proc_open($sBasePath.'/osm2pgsql -las -C 2000 -O gazetteer -d '.$sDatabaseName.' -', $aSpec, $aPipes);
182                if (!is_resource($hProc))
183                {
184                        echo "$sBasePath/osm2pgsql failed\n";
185                        exit;   
186                }
187                fwrite($aPipes[0], $sModifyXML);
188                fclose($aPipes[0]);
189                $sOut = stream_get_contents($aPipes[1]);
190                if ($aResult['verbose']) echo $sOut;
191                fclose($aPipes[1]);
192                $sErrors = stream_get_contents($aPipes[2]);
193                if ($aResult['verbose']) echo $sErrors;
194                fclose($aPipes[2]);
195                if ($iError = proc_close($hProc))
196                {
197                        echo "osm2pgsql existed with error level $iError\n";
198                        echo $sOut;
199                        echo $sError;
200                        exit;
201                }
202        }
203
204        if ($aResult['deduplicate'])
205        {
206                $sSQL = "select word_token,count(*) from word where substr(word_token, 1, 1) = ' ' and class is null and type is null and country_code is null group by word_token having count(*) > 1 order by word_token";
207                $aDuplicateTokens = $oDB->getAll($sSQL);
208                foreach($aDuplicateTokens as $aToken)
209                {
210                        if (trim($aToken['word_token']) == '' || trim($aToken['word_token']) == '-') continue;
211                        echo "Deduping ".$aToken['word_token']."\n";
212                        $sSQL = "select word_id,(select count(*) from search_name where nameaddress_vector @> ARRAY[word_id]) as num from word where word_token = '".$aToken['word_token']."' and class is null and type is null and country_code is null order by num desc";
213                        $aTokenSet = $oDB->getAll($sSQL);
214                        if (PEAR::isError($aTokenSet))
215                        {
216                                var_dump($aTokenSet, $sSQL);
217                                exit;
218                        }
219                       
220                        $aKeep = array_shift($aTokenSet);
221                        $iKeepID = $aKeep['word_id'];
222
223                        foreach($aTokenSet as $aRemove)
224                        {
225                                $sSQL = "update search_name set";
226                                $sSQL .= " name_vector = (name_vector - ".$aRemove['word_id'].")+".$iKeepID.",";
227                                $sSQL .= " nameaddress_vector = (nameaddress_vector - ".$aRemove['word_id'].")+".$iKeepID;
228                                $sSQL .= " where name_vector @> ARRAY[".$aRemove['word_id']."]";
229                                $x = $oDB->query($sSQL);
230                                if (PEAR::isError($x))
231                                {
232                                        var_dump($x);
233                                        exit;
234                                }
235
236                                $sSQL = "update search_name set";
237                                $sSQL .= " nameaddress_vector = (nameaddress_vector - ".$aRemove['word_id'].")+".$iKeepID;
238                                $sSQL .= " where nameaddress_vector @> ARRAY[".$aRemove['word_id']."]";
239                                $x = $oDB->query($sSQL);
240                                if (PEAR::isError($x))
241                                {
242                                        var_dump($x);
243                                        exit;
244                                }
245
246                                $sSQL = "delete from word where word_id = ".$aRemove['word_id'];
247                                $x = $oDB->query($sSQL);
248                                if (PEAR::isError($x))
249                                {
250                                        var_dump($x);
251                                        exit;
252                                }
253                        }
254
255                }
256        }
257
258        if ($aResult['index'])
259        {
260                index($aResult, $sDatabaseDSN);
261        }
262
263        if ($aResult['import-osmosis'] || $aResult['import-osmosis-all'])
264        {
265                $sImportFile = $sBasePath.'/osmosischange.osc';
266                $sCMDDownload = $sOsmosisCMD.' --read-replication-interval workingDirectory='.$sOsmosisConfigDirectory.' --simplify-change --write-xml-change '.$sImportFile;
267                $sCMDImport = $sBasePath.'/osm2pgsql -las -C 2000 -O gazetteer -d '.$sDatabaseName.' '.$sImportFile;
268                $sCMDIndex = $sBasePath.'/gazetteer/util.update.php --index --index-instances 2 --max-load 20 --max-blocking 10';
269                while(true)
270                {
271                        $fStartTime = time();
272                        $iFileSize = 1001;
273
274                        // Logic behind this is that osm2pgsql locks the database quite a bit
275                        // So it is better to import lots of small files
276                        // But indexing works most efficiently on large amounts of data
277                        // So do lots of small imports and a BIG index
278
279                        while($aResult['import-osmosis-all'] && $iFileSize > 1000)
280                        {                       
281                                if (!file_exists($sImportFile))
282                                {
283                                        // Use osmosis to download the file
284                                        $fCMDStartTime = time();
285                                        echo $sCMDDownload."\n";
286                                        exec($sCMDDownload, $sJunk, $iErrorLevel);
287                                        if ($iErrorLevel)
288                                        {
289                                                echo "Error: $iErrorLevel\n";
290                                                exit;
291                                        }
292                                        $iFileSize = filesize($sImportFile);
293                                        $sBatchEnd = getosmosistimestamp($sOsmosisConfigDirectory);
294                                        echo "Completed for $sBatchEnd in ".round((time()-$fCMDStartTime)/60,2)." minutes\n";
295                                        $sSQL = "INSERT INTO import_osmosis_log values ('$sBatchEnd',$iFileSize,'".date('Y-m-d H:i:s',$fCMDStartTime)."','".date('Y-m-d H:i:s')."','osmosis')";
296                                        $oDB->query($sSQL);
297                                }
298
299                                $iFileSize = filesize($sImportFile);
300                                $sBatchEnd = getosmosistimestamp($sOsmosisConfigDirectory);
301               
302                                // Import the file
303                                $fCMDStartTime = time();
304                                echo $sCMDImport."\n";
305                                exec($sCMDImport, $sJunk, $iErrorLevel);
306                                if ($iErrorLevel)
307                                {
308                                        echo "Error: $iErrorLevel\n";
309                                        exit;
310                                }
311                                echo "Completed for $sBatchEnd in ".round((time()-$fCMDStartTime)/60,2)." minutes\n";
312                                $sSQL = "INSERT INTO import_osmosis_log values ('$sBatchEnd',$iFileSize,'".date('Y-m-d H:i:s',$fCMDStartTime)."','".date('Y-m-d H:i:s')."','osm2pgsql')";
313                                var_Dump($sSQL);
314                                $oDB->query($sSQL);
315
316                                // Archive for debug?
317                                unlink($sImportFile);
318                        }
319
320                        // Index file
321                        $fCMDStartTime = time();
322                        index($aResult, $sDatabaseDSN);
323
324                        // Force a new database connection - problem where one of the other treads closes our connection
325                        // which shouldn't be possible be still seems to happen!
326                        $oDB =& DB::connect($sDatabaseDSN.'?new_link=true', array('persistent' => false));
327                        if (PEAR::IsError($oDB))
328                        {
329                                echo $oDB->getMessage()."\n";
330                                exit;
331                        } 
332                        $oDB->setFetchMode(DB_FETCHMODE_ASSOC);
333                        $oDB->query("SET DateStyle TO 'sql,european'");
334                        $oDB->query("SET client_encoding TO 'utf-8'");
335
336                        echo "Completed for $sBatchEnd in ".round((time()-$fCMDStartTime)/60,2)." minutes\n";
337                        $sSQL = "INSERT INTO import_osmosis_log values ('$sBatchEnd',$iFileSize,'".date('Y-m-d H:i:s',$fCMDStartTime)."','".date('Y-m-d H:i:s')."','index')";
338                        $oDB->query($sSQL);
339
340                        $sSQL = "update import_status set lastimportdate = '$sBatchEnd'";
341                        $oDB->query($sSQL);
342
343                        $fDuration = time() - $fStartTime;
344                        echo "Completed for $sBatchEnd in ".round($fDuration/60,2)."\n";
345                        if (!$aResult['import-osmosis-all']) exit;
346
347                        echo "Sleeping ".max(0,600-$fDuration)." seconds\n";
348                        sleep(max(0,600-$fDuration));
349                }
350               
351        }
352
353function getosmosistimestamp($sOsmosisConfigDirectory)
354{
355        $sStateFile = file_get_contents($sOsmosisConfigDirectory.'/state.txt');
356        preg_match('#timestamp=(.+)#', $sStateFile, $aResult);
357        return str_replace('\:',':',$aResult[1]);
358}
359
360function index($aResult, $sDatabaseDSN)
361{
362        $oDB =& DB::connect($sDatabaseDSN.'?new_link=true', array('persistent' => false));
363        if (PEAR::IsError($oDB))
364        {
365                echo $oDB->getMessage()."\n";
366                exit;
367        } 
368        $oDB->setFetchMode(DB_FETCHMODE_ASSOC);
369        $oDB->query("SET DateStyle TO 'sql,european'");
370        $oDB->query("SET client_encoding TO 'utf-8'");
371
372        if (!isset($aResult['index-estrate']) || $aResult['index-estrate'] < 1) $aResult['index-estrate'] = 30;
373
374        if (getBlockingProcesses() > $aResult['max-blocking'])
375        {
376                echo "Too many blocking processes for index\n";
377                exit;
378        }
379
380        if ($aResult['index-instances'] > 1)
381        {
382                $aInstances = array();
383                for ($iInstance = 0; $iInstance < $aResult['index-instances']; $iInstance++)
384                {
385                        $aInstances[$iInstance] = array('iPID' => null, 'hSocket' => null, 'bBusy' => false);
386                        socket_create_pair(AF_UNIX, SOCK_STREAM, 0, $aSockets);
387                        $iPID = pcntl_fork();
388                        if ($iPID == -1) die('Could not fork Process');
389                        if (!$iPID)
390                        {
391                                // THIS IS THE CHILD PROCESS
392
393                                // Reconnect to database - reusing the same connection is not a good idea!
394                                $oDBChild =& DB::connect($sDatabaseDSN.'?new_link=true', array('persistent' => false));
395                                if (PEAR::IsError($oDBChild))
396                                {
397                                        echo $oDBChild->getMessage()."\n";
398                                        exit;
399                                } 
400                                $oDBChild->setFetchMode(DB_FETCHMODE_ASSOC);
401                                $oDBChild->query("SET DateStyle TO 'sql,european'");
402                                $oDBChild->query("SET client_encoding TO 'utf-8'");
403
404                                socket_close($aSockets[1]);
405                                $hSocket = $aSockets[0];
406                                while($sRead = socket_read($hSocket, 1000, PHP_BINARY_READ))
407                                {
408                                        if ($sRead == 'EXIT') break;
409                                        $aSector = unserialize($sRead);
410                                        echo " - $iInstance processing ".$aSector['geometry_index']."\n";
411                                        indexSector($oDBChild, $aSector, $aResult['max-blocking'], $aResult['max-load']);
412                                        socket_write($hSocket, 'DONE');
413                                        echo " - $iInstance finished ".$aSector['geometry_index']."\n";
414                                }
415                                socket_close($hSocket);
416                                exit;
417
418                                // THIS IS THE END OF THE CHILD PROCESS
419                        }
420                        $aInstances[$iInstance]['iPID'] = $iPID;
421                        socket_close($aSockets[0]);
422                        socket_set_nonblock($aSockets[1]);
423                        $aInstances[$iInstance]['hSocket'] = $aSockets[1];
424                }
425        }
426
427        // Re-index the new items
428        if (!isset($aResult['index-rank']) || !$aResult['index-rank']) $aResult['index-rank'] = 0;
429        for ($i = $aResult['index-rank']; $i <= 30; $i++)
430        { 
431                echo "Rank: $i";
432
433                $iStartTime = date('U');
434                flush();
435                       
436                $sSQL = 'select geometry_index(geometry,indexed,name),count(*) from placex where rank_search = '.$i.' and indexed = false and name is not null group by geometry_index(geometry,indexed,name)';
437                $sSQL .= ' order by count desc';
438                $aAllSectors = $oDB->getAll($sSQL);
439                if (PEAR::isError($aAllSectors))
440                {
441                        var_dump($aAllSectors);
442                        exit;
443                }
444                $iTotalNum = 0;
445                foreach($aAllSectors as $aSector)
446                {
447                        $iTotalNum += $aSector['count'];
448                }
449                $iTotalLeft = $iTotalNum;
450
451                echo ", total to do: $iTotalNum \n";
452                flush();
453
454                $fRankStartTime = time();
455                $iTotalDone = 0;
456                $fRate = $aResult['index-estrate'];
457
458                foreach($aAllSectors as $aSector)
459                {
460                        $aSector['rank'] = $i;
461
462                        if ($aSector['rank'] == 21 && $aSector['geometry_index'] == 617467)
463                        {
464                                echo "Skipping sector 617467 @ 21 due to issue\n";
465                                continue;
466                        }
467
468                        while (getBlockingProcesses() > $aResult['max-blocking'] || getLoadAverage() > $aResult['max-load'])
469                        {
470                                echo "System busy, pausing indexing...\n";
471                                sleep(60);
472                        }
473
474                        $iEstSeconds = (int)($iTotalLeft / $fRate);
475                        $iEstDays = floor($iEstSeconds / (60*60*24));
476                        $iEstSeconds -= $iEstDays*(60*60*24);
477                        $iEstHours = floor($iEstSeconds / (60*60));
478                        $iEstSeconds -= $iEstHours*(60*60);
479                        $iEstMinutes = floor($iEstSeconds / (60));
480                        $iEstSeconds -= $iEstMinutes*(60);
481                        $iNum = $aSector['count'];
482                        $sRate = round($fRate, 1);
483                        echo $aSector['geometry_index'].": $iNum, $iTotalLeft left.  Est. time remaining (rank $i) d:$iEstDays h:$iEstHours m:$iEstMinutes s:$iEstSeconds @ $sRate per second\n";
484                        flush();
485
486                        if ($aResult['index-instances'] > 1)
487                        {
488                                // Wait for a non-busy socket
489                                while(true)
490                                {
491                                        for ($iInstance = 0; $iInstance < $aResult['index-instances']; $iInstance++)
492                                        {
493                                                if (!$aInstances[$iInstance]['bBusy']) break 2;
494                                                $sRead = socket_read($aInstances[$iInstance]['hSocket'], 10, PHP_BINARY_READ);
495                                                if ($sRead == 'DONE')
496                                                {
497                                                        $aInstances[$iInstance]['bBusy'] = false;
498                                                        $iTotalDone += $iNum;
499                                                        $iTotalLeft -= $iNum;
500                                                        break 2;
501                                                }
502                                        }
503                                        usleep(1000);
504                                }
505                                echo "Dispatch to $iInstance (".$aInstances[$iInstance]['iPID'].")\n";
506                                socket_write($aInstances[$iInstance]['hSocket'], serialize($aSector));
507                                $aInstances[$iInstance]['bBusy'] = true;
508                        }
509                        else
510                        {
511                                indexSector($oDB, $aSector, $aResult['max-blocking'], $aResult['max-load']);
512                                $iTotalDone += $iNum;
513                                $iTotalLeft -= $iNum;
514                        }
515
516                        $fDuration = time() - $fRankStartTime;
517                        if ($fDuration)
518                                $fRate = $iTotalDone / $fDuration;
519                        else
520                                $fRate = $aResult['index-estrate'];
521                }
522
523                if ($aResult['index-instances'] > 1)
524                {
525                        // Wait for a non-busy socket
526                        $bPending = true;
527                        while($bPending)
528                        {
529                                $bPending = false;
530                                for ($iInstance = 0; $iInstance < $aResult['index-instances']; $iInstance++)
531                                {
532                                        if ($aInstances[$iInstance]['bBusy'])
533                                        {
534                                                if (socket_read($aInstances[$iInstance]['hSocket'], 10, PHP_BINARY_READ) == 'DONE')
535                                                {
536                                                        $aInstances[$iInstance]['bBusy'] = false;
537                                                }
538                                                else
539                                                {
540                                                        $bPending = true;
541                                                }
542                                        }
543                                }
544                                if ($bPending) usleep(1000000);
545                        }
546                }
547                       
548                $iDuration = date('U') - $iStartTime;
549                if ($iDuration && $iTotalNum)
550                {
551                        echo "Finished Rank: $i in $iDuration @ ".($iTotalNum / $iDuration)." per second\n";
552                }
553        }
554
555        if ($aResult['index-instances'] > 1)
556        {
557                for ($iInstance = 0; $iInstance < $aResult['index-instances']; $iInstance++)
558                {
559                        socket_write($aInstances[$iInstance]['hSocket'], 'EXIT');
560                        socket_close($aInstances[$iInstance]['hSocket']);
561                }
562        }
563}
564
565function indexSector($oDB, $aSector, $fMaxBlocking, $fMaxLoad)
566{
567        $iNum = $aSector['count'];
568        $iRank = $aSector['rank'];
569
570        $fNumSteps = ceil(sqrt($iNum) / 10);
571        $iNumSteps = $fNumSteps*$fNumSteps;
572
573        if ($fNumSteps > 1 )
574        {
575                $iStepNum = 1;
576                echo "Spliting into ".($fNumSteps*$fNumSteps)." steps\n";
577
578                // Convert sector number back to lat lon
579                $fLon = (500 - floor($aSector['geometry_index']/1000)) - 0.5;
580                $fLat = (500 -  $aSector['geometry_index']%1000) - 0.5;
581
582                $fStepSize = 1 / $fNumSteps;
583                for ($fStepLat = $fLat; $fStepLat < ($fLat + 1); $fStepLat += $fStepSize)
584                {
585                        for ($fStepLon = $fLon; $fStepLon < ($fLon + 1); $fStepLon += $fStepSize)
586                        {
587                                while (getBlockingProcesses() > $fMaxBlocking || getLoadAverage() > $fMaxLoad)
588                                {
589                                        echo "System busy, pausing indexing...\n";
590                                        sleep(60);
591                                }
592
593                                $fStepLonTop = $fStepLon + $fStepSize;
594                                $fStepLatTop = $fStepLat + $fStepSize;
595                                echo "  Step $iStepNum of $iNumSteps: ($fStepLon,$fStepLat,$fStepLonTop,$fStepLatTop)\n";
596                                $sSQL = 'update placex set indexed = true where geometry_index(geometry,indexed,name) = '.$aSector['geometry_index'].' and rank_search = '.$iRank;
597                                $sSQL .= " and ST_Contains(ST_SetSRID(ST_MakeBox2D(ST_SetSRID(ST_POINT($fStepLon,$fStepLat),4326),ST_SetSRID(ST_POINT($fStepLonTop,$fStepLatTop),4326)),4326),geometry)";
598                                if (PEAR::IsError($xError = $oDB->query($sSQL)))
599                                {
600                                        var_dump($xError);
601                                        exit;
602                                }
603                                $iStepNum++;
604                        }
605                }
606        }
607        $sSQL = 'update placex set indexed = true where geometry_index(geometry,indexed,name) = '.$aSector['geometry_index'].' and rank_search = '.$iRank;
608        if (PEAR::IsError($xError = $oDB->query($sSQL)))
609        {
610                var_dump($xError);
611                exit;
612        }
613        $oDB->query($sSQL);
614}
Note: See TracBrowser for help on using the repository browser.