source: subversion/applications/utils/export/osm2pgsql/middle-pgsql.c @ 21054

Last change on this file since 21054 was 18353, checked in by jonb, 10 years ago

osm2pgsql: Update code to use DROP TABLE IF EXISTS. This avoids errors in the postgresql logs and requires postgresql-8.2+. Fixes ticket #2379. Update version to 0.68

File size: 39.5 KB
Line 
1/* Implements the mid-layer processing for osm2pgsql
2 * using several PostgreSQL tables
3 *
4 * This layer stores data read in from the planet.osm file
5 * and is then read by the backend processing code to
6 * emit the final geometry-enabled output formats
7*/
8 
9#include <stdio.h>
10#include <unistd.h>
11#include <stdlib.h>
12#include <string.h>
13#include <assert.h>
14
15#ifdef HAVE_PTHREAD
16#include <pthread.h>
17#endif
18
19#include <libpq-fe.h>
20
21#include "osmtypes.h"
22#include "middle.h"
23#include "middle-pgsql.h"
24#include "output-pgsql.h"
25#include "pgsql.h"
26
27/* Store +-20,000km Mercator co-ordinates as fixed point 32bit number with maximum precision */
28/* Scale is chosen such that 40,000 * SCALE < 2^32          */
29#define FIXED_POINT
30
31static int scale = 100;
32#define DOUBLE_TO_FIX(x) ((int)((x) * scale))
33#define FIX_TO_DOUBLE(x) (((double)x) / scale)
34
35
36
37enum table_id {
38    t_node, t_way, t_rel
39} ;
40
41struct table_desc {
42    //enum table_id table;
43    const char *name;
44    const char *start;
45    const char *create;
46    const char *prepare;
47    const char *prepare_intarray;
48    const char *copy;
49    const char *analyze;
50    const char *stop;
51    const char *array_indexes;
52
53    int copyMode;    /* True if we are in copy mode */
54    PGconn *sql_conn;
55};
56
57static struct table_desc tables [] = {
58    { 
59        //table: t_node,
60         name: "%s_nodes",
61        start: "BEGIN;\n",
62#ifdef FIXED_POINT
63       create: "CREATE TABLE %s_nodes (id int4 PRIMARY KEY, lat int4 not null, lon int4 not null, tags text[]);\n",
64      prepare: "PREPARE insert_node (int4, int4, int4, text[]) AS INSERT INTO %s_nodes VALUES ($1,$2,$3,$4);\n"
65#else
66       create: "CREATE TABLE %s_nodes (id int4 PRIMARY KEY, lat double precision not null, lon double precision not null, tags text[]);\n",
67      prepare: "PREPARE insert_node (int4, double precision, double precision, text[]) AS INSERT INTO %s_nodes VALUES ($1,$2,$3,$4);\n"
68#endif
69               "PREPARE get_node (int4) AS SELECT lat,lon,tags FROM %s_nodes WHERE id = $1 LIMIT 1;\n"
70               "PREPARE delete_node (int4) AS DELETE FROM %s_nodes WHERE id = $1;\n",
71prepare_intarray: // This is to fetch lots of nodes simultaneously, in order including duplicates. The commented out version doesn't do duplicates
72                  // It's not optimal as it does a Nested Loop / Index Scan which is suboptimal for large arrays
73                  //"PREPARE get_node_list(int[]) AS SELECT id, lat, lon FROM %s_nodes WHERE id = ANY($1::int4[]) ORDER BY $1::int4[] # id\n",
74               "PREPARE get_node_list(int[]) AS select y.id, y.lat, y.lon from (select i, ($1)[i] as l_id from (select generate_series(1,icount($1)) as i) x) z, "
75                                               "(select * from %s_nodes where id = ANY($1)) y where l_id=id order by i;\n",
76         copy: "COPY %s_nodes FROM STDIN;\n",
77      analyze: "ANALYZE %s_nodes;\n",
78         stop: "COMMIT;\n"
79    },
80    { 
81        //table: t_way,
82         name: "%s_ways",
83        start: "BEGIN;\n",
84       create: "CREATE TABLE %s_ways (id int4 PRIMARY KEY, nodes int4[] not null, tags text[], pending boolean not null);\n"
85               "CREATE INDEX %s_ways_idx ON %s_ways (id) WHERE pending;\n",
86array_indexes: "CREATE INDEX %s_ways_nodes ON %s_ways USING gin (nodes gin__int_ops);\n",
87      prepare: "PREPARE insert_way (int4, int4[], text[], boolean) AS INSERT INTO %s_ways VALUES ($1,$2,$3,$4);\n"
88               "PREPARE get_way (int4) AS SELECT nodes, tags, array_upper(nodes,1) FROM %s_ways WHERE id = $1;\n"
89               "PREPARE way_done(int4) AS UPDATE %s_ways SET pending = false WHERE id = $1;\n"
90               "PREPARE pending_ways AS SELECT id FROM %s_ways WHERE pending;\n"
91               "PREPARE delete_way(int4) AS DELETE FROM %s_ways WHERE id = $1;\n",
92prepare_intarray: "PREPARE node_changed_mark(int4) AS UPDATE %s_ways SET pending = true WHERE nodes && ARRAY[$1] AND NOT pending;\n",
93         copy: "COPY %s_ways FROM STDIN;\n",
94      analyze: "ANALYZE %s_ways;\n",
95         stop:  "COMMIT;\n"
96    },
97    { 
98        //table: t_rel,
99         name: "%s_rels",
100        start: "BEGIN;\n",
101       create: "CREATE TABLE %s_rels(id int4 PRIMARY KEY, way_off int2, rel_off int2, parts int4[], members text[], tags text[], pending boolean not null);\n"
102               "CREATE INDEX %s_rels_idx ON %s_rels (id) WHERE pending;\n",
103array_indexes: "CREATE INDEX %s_rels_parts ON %s_rels USING gin (parts gin__int_ops);\n",
104      prepare: "PREPARE insert_rel (int4, int2, int2, int[], text[], text[]) AS INSERT INTO %s_rels VALUES ($1,$2,$3,$4,$5,$6,false);\n"
105               "PREPARE get_rel (int4) AS SELECT members, tags, array_upper(members,1)/2 FROM %s_rels WHERE id = $1;\n"
106               "PREPARE rel_done(int4) AS UPDATE %s_rels SET pending = false WHERE id = $1;\n"
107               "PREPARE pending_rels AS SELECT id FROM %s_rels WHERE pending;\n"
108               "PREPARE delete_rel(int4) AS DELETE FROM %s_rels WHERE id = $1;\n",
109prepare_intarray: /* Note: don't use subarray here since (at least in 8.1) has odd effects if you request stuff out of range */
110                "PREPARE node_changed_mark(int4) AS UPDATE %s_rels SET pending = true WHERE parts && ARRAY[$1] AND parts[1:way_off] && ARRAY[$1] AND NOT pending;\n"
111                "PREPARE way_changed_mark(int4) AS UPDATE %s_rels SET pending = true WHERE parts && ARRAY[$1] AND parts[way_off+1:rel_off] && ARRAY[$1] AND NOT pending;\n"
112                  /* For this it works fine */
113                "PREPARE rel_changed_mark(int4) AS UPDATE %s_rels SET pending = true WHERE parts && ARRAY[$1] AND subarray(parts,rel_off+1) && ARRAY[$1] AND NOT pending;\n",
114         copy: "COPY %s_rels FROM STDIN;\n",
115      analyze: "ANALYZE %s_rels;\n",
116         stop:  "COMMIT;\n"
117    }
118};
119
120static const int num_tables = sizeof(tables)/sizeof(tables[0]);
121static int warn_node_order;
122static struct table_desc *node_table = &tables[t_node];
123static struct table_desc *way_table  = &tables[t_way];
124static struct table_desc *rel_table  = &tables[t_rel];
125
126/* Here we use a similar storage structure as middle-ram, except we allow
127 * the array to be lossy so we can cap the total memory usage. Hence it is a
128 * combination of a sparse array with a priority queue
129 *
130 * Like middle-ram we have a number of blocks all storing PER_BLOCK
131 * ramNodes. However, here we also track the number of nodes in each block.
132 * Seperately we have a priority queue like structure when maintains a list
133 * of all the used block so we can easily find the block with the least
134 * nodes. The cache has two phases:
135 *
136 * Phase 1: Loading initially, usedBlocks < maxBlocks. In this case when a
137 * new block is needed we simply allocate it and put it in
138 * queue[usedBlocks-1] which is the bottom of the tree. Every node added
139 * increases it's usage. When we move onto the next block we percolate this
140 * block up the queue until it reaches its correct position. The invariant
141 * is that the priority tree is complete except for this last node. We do
142 * not permit adding nodes to any other block to preserve this invariant.
143 *
144 * Phase 2: Once we've reached the maximum number of blocks permitted, we
145 * change so that the block currently be inserted into is at the top of the
146 * tree. When a new block is needed we take the one at the end of the queue,
147 * as it is the one with the least number of nodes in it. When we move onto
148 * the next block we first push the just completed block down to it's
149 * correct position in the queue and then reuse the block that now at the
150 * head.
151 *
152 * The result being that at any moment we have in memory the top maxBlock
153 * blocks in terms of number of nodes in memory. This should maximize the
154 * number of hits in lookups.
155 *
156 * Complexity:
157 *  Insert node: O(1)
158 *  Lookup node: O(1)
159 *  Add new block: O(log usedBlocks)
160 *  Reuse old block: O(log maxBlocks)
161 */
162
163struct ramNode {
164#ifdef FIXED_POINT
165    int lon;
166    int lat;
167#else
168    double lon;
169    double lat;
170#endif
171};
172
173struct ramNodeBlock {
174  struct ramNode    *nodes;
175  int used;
176};
177
178#define BLOCK_SHIFT 10
179#define PER_BLOCK  (1 << BLOCK_SHIFT)
180#define NUM_BLOCKS (1 << (32 - BLOCK_SHIFT))
181
182static struct ramNodeBlock blocks[NUM_BLOCKS];
183static int usedBlocks;
184/* Note: maxBlocks *must* be odd, to make sure the priority queue has no nodes with only one child */
185static int maxBlocks = 0;
186static struct ramNodeBlock **queue;
187static int storedNodes, totalNodes;
188int nodesCacheHits, nodesCacheLookups;
189
190static int Append;
191
192static inline int id2block(int id)
193{
194    // + NUM_BLOCKS/2 allows for negative IDs
195    return (id >> BLOCK_SHIFT) + NUM_BLOCKS/2;
196}
197
198static inline int id2offset(int id)
199{
200    return id & (PER_BLOCK-1);
201}
202
203static inline int block2id(int block, int offset)
204{
205    return ((block - NUM_BLOCKS/2) << BLOCK_SHIFT) + offset;
206}
207
208#define Swap(a,b) { typeof(a) __tmp = a; a = b; b = __tmp; }
209static void percolate_up( int pos )
210{
211    int i = pos;
212    while( i > 0 )
213    {
214      int parent = (i-1)>>1;
215      if( queue[i]->used < queue[parent]->used )
216      {
217        Swap( queue[i], queue[parent] );
218        i = parent;
219      }
220      else
221        break;
222    }
223}
224
225#define UNUSED  __attribute__ ((unused))
226static int pgsql_ram_nodes_set(int id, double lat, double lon, struct keyval *tags UNUSED)
227{
228    int block  = id2block(id);
229    int offset = id2offset(id);
230   
231    totalNodes++;
232
233    if (!blocks[block].nodes) {
234        if( usedBlocks < maxBlocks )
235        {
236          /* We've just finished with the previous block, so we need to percolate it up the queue to its correct position */
237          if( usedBlocks > 0 )
238            /* Upto log(usedBlocks) iterations */
239            percolate_up( usedBlocks-1 );
240
241          blocks[block].nodes = calloc(PER_BLOCK, sizeof(struct ramNode));
242          blocks[block].used = 0;
243          if (!blocks[block].nodes) {
244              fprintf(stderr, "Error allocating nodes\n");
245              exit_nicely();
246          }
247          queue[usedBlocks] = &blocks[block];
248          usedBlocks++;
249
250          /* If we've just used up the last possible block we enter the
251           * transition and we change the invariant. To do this we percolate
252           * the newly allocated block straight to the head */
253          if( usedBlocks == maxBlocks )
254            percolate_up( usedBlocks-1 );
255        }
256        else
257        {
258          /* We've reached the maximum number of blocks, so now we push the
259           * current head of the tree down to the right level to restore the
260           * priority queue invariant. Upto log(maxBlocks) iterations */
261         
262          int i=0;
263          while( 2*i+1 < maxBlocks )
264          {
265            if( queue[2*i+1]->used <= queue[2*i+2]->used )
266            {
267              if( queue[i]->used > queue[2*i+1]->used )
268              {
269                Swap( queue[i], queue[2*i+1] );
270                i = 2*i+1;
271              }
272              else
273                break;
274            }
275            else
276            {
277              if( queue[i]->used > queue[2*i+2]->used )
278              {
279                Swap( queue[i], queue[2*i+2] );
280                i = 2*i+2;
281              }
282              else
283                break;
284            }
285          }
286          /* Now the head of the queue is the smallest, so it becomes our replacement candidate */
287          blocks[block].nodes = queue[0]->nodes;
288          blocks[block].used = 0;
289          memset( blocks[block].nodes, 0, PER_BLOCK * sizeof(struct ramNode) );
290         
291          /* Clear old head block and point to new block */
292          storedNodes -= queue[0]->used;
293          queue[0]->nodes = NULL;
294          queue[0]->used = 0;
295          queue[0] = &blocks[block];
296        }
297    }
298    else
299    {
300      /* Insert into an existing block. We can't allow this in general or it
301       * will break the invariant. However, it will work fine if all the
302       * nodes come in numerical order, which is the common case */
303     
304      int expectedpos;
305      if( usedBlocks < maxBlocks )
306        expectedpos = usedBlocks-1;
307      else
308        expectedpos = 0;
309       
310      if( queue[expectedpos] != &blocks[block] )
311      {
312        if (!warn_node_order) {
313            fprintf( stderr, "WARNING: Found Out of order node %d (%d,%d) - this will impact the cache efficiency\n", id, block, offset );
314            warn_node_order++;
315        }
316        return 1;
317      }
318    }
319       
320#ifdef FIXED_POINT
321    blocks[block].nodes[offset].lat = DOUBLE_TO_FIX(lat);
322    blocks[block].nodes[offset].lon = DOUBLE_TO_FIX(lon);
323#else
324    blocks[block].nodes[offset].lat = lat;
325    blocks[block].nodes[offset].lon = lon;
326#endif
327    blocks[block].used++;
328    storedNodes++;
329    return 0;
330}
331
332
333int pgsql_ram_nodes_get(struct osmNode *out, int id)
334{
335    int block  = id2block(id);
336    int offset = id2offset(id);
337    nodesCacheLookups++;
338
339    if (!blocks[block].nodes)
340        return 1;
341
342    if (!blocks[block].nodes[offset].lat && !blocks[block].nodes[offset].lon)
343        return 1;
344
345#ifdef FIXED_POINT
346    out->lat = FIX_TO_DOUBLE(blocks[block].nodes[offset].lat);
347    out->lon = FIX_TO_DOUBLE(blocks[block].nodes[offset].lon);
348#else
349    out->lat = blocks[block].nodes[offset].lat;
350    out->lon = blocks[block].nodes[offset].lon;
351#endif
352    nodesCacheHits++;
353    return 0;
354}
355
356static void pgsql_cleanup(void)
357{
358    int i;
359
360    for (i=0; i<num_tables; i++) {
361        if (tables[i].sql_conn) {
362            PQfinish(tables[i].sql_conn);
363            tables[i].sql_conn = NULL;
364        }
365    }
366}
367
368char *pgsql_store_nodes(int *nds, int nd_count)
369{
370  static char *buffer;
371  static int buflen;
372
373  char *ptr;
374  int i, first;
375   
376  if( buflen <= nd_count * 10 )
377  {
378    buflen = ((nd_count * 10) | 4095) + 1;  /* Round up to next page */
379    buffer = realloc( buffer, buflen );
380  }
381_restart:
382
383  ptr = buffer;
384  first = 1;
385  *ptr++ = '{';
386  for( i=0; i<nd_count; i++ )
387  {
388    if( !first ) *ptr++ = ',';
389    ptr += sprintf( ptr, "%d", nds[i] );
390   
391    if( (ptr-buffer) > (buflen-20) ) /* Almost overflowed? */
392    {
393      buflen <<= 1;
394      buffer = realloc( buffer, buflen );
395     
396      goto _restart;
397    }
398    first = 0;
399  }
400 
401  *ptr++ = '}';
402  *ptr++ = 0;
403 
404  return buffer;
405}
406
407/* Special escape routine for escaping strings in array constants: double quote, backslash,newline, tab*/
408static inline char *escape_tag( char *ptr, const char *in, int escape )
409{
410  while( *in )
411  {
412    switch(*in)
413    {
414      case '"':
415        if( escape ) *ptr++ = '\\';
416        *ptr++ = '\\';
417        *ptr++ = '"';
418        break;
419      case '\\':
420        if( escape ) *ptr++ = '\\';
421        if( escape ) *ptr++ = '\\';
422        *ptr++ = '\\';
423        *ptr++ = '\\';
424        break;
425      case '\n':
426        if( escape ) *ptr++ = '\\';
427        *ptr++ = '\\';
428        *ptr++ = 'n';
429        break;
430      case '\r':
431        if( escape ) *ptr++ = '\\';
432        *ptr++ = '\\';
433        *ptr++ = 'r';
434        break;
435      case '\t':
436        if( escape ) *ptr++ = '\\';
437        *ptr++ = '\\';
438        *ptr++ = 't';
439        break;
440      default:
441        *ptr++ = *in;
442        break;
443    }
444    in++;
445  }
446  return ptr;
447}
448
449/* escape means we return '\N' for copy mode, otherwise we return just NULL */
450char *pgsql_store_tags(struct keyval *tags, int escape)
451{
452  static char *buffer;
453  static int buflen;
454
455  char *ptr;
456  struct keyval *i;
457  int first;
458   
459  int countlist = countList(tags);
460  if( countlist == 0 )
461  {
462    if( escape )
463      return "\\N";
464    else
465      return NULL;
466  }
467   
468  if( buflen <= countlist * 24 ) /* LE so 0 always matches */
469  {
470    buflen = ((countlist * 24) | 4095) + 1;  /* Round up to next page */
471    buffer = realloc( buffer, buflen );
472  }
473_restart:
474
475  ptr = buffer;
476  first = 1;
477  *ptr++ = '{';
478  /* The lists are circular, exit when we reach the head again */
479  for( i=tags->next; i->key; i = i->next )
480  {
481    int maxlen = (strlen(i->key) + strlen(i->value)) * 4;
482    if( (ptr+maxlen-buffer) > (buflen-20) ) /* Almost overflowed? */
483    {
484      buflen <<= 1;
485      buffer = realloc( buffer, buflen );
486     
487      goto _restart;
488    }
489    if( !first ) *ptr++ = ',';
490    *ptr++ = '"';
491    ptr = escape_tag( ptr, i->key, escape );
492    *ptr++ = '"';
493    *ptr++ = ',';
494    *ptr++ = '"';
495    ptr = escape_tag( ptr, i->value, escape );
496    *ptr++ = '"';
497   
498    first=0;
499  }
500 
501  *ptr++ = '}';
502  *ptr++ = 0;
503 
504  return buffer;
505}
506
507/* Decodes a portion of an array literal from postgres */
508/* Argument should point to beginning of literal, on return points to delimiter */
509static const char *decode_upto( const char *src, char *dst )
510{
511  int quoted = (*src == '"');
512  if( quoted ) src++;
513 
514  while( quoted ? (*src != '"') : (*src != ',' && *src != '}') )
515  {
516    if( *src == '\\' )
517    {
518      switch( src[1] )
519      {
520        case 'n': *dst++ = '\n'; break;
521        case 't': *dst++ = '\t'; break;
522        default: *dst++ = src[1]; break;
523      }
524      src+=2;
525    }
526    else
527      *dst++ = *src++;
528  }
529  if( quoted ) src++;
530  *dst = 0;
531  return src;
532}
533
534static void pgsql_parse_tags( const char *string, struct keyval *tags )
535{
536  char key[1024];
537  char val[1024];
538 
539  if( *string == '\0' )
540    return;
541   
542//  fprintf( stderr, "Parsing: %s\n", string );
543  if( *string++ != '{' )
544    return;
545  while( *string != '}' )
546  {
547    string = decode_upto( string, key );
548    /* String points to the comma */
549    string++;
550    string = decode_upto( string, val );
551    /* String points to the comma or closing '}' */
552    addItem( tags, key, val, 0 );
553//    fprintf( stderr, "Extracted item: %s=%s\n", key, val );
554    if( *string == ',' )
555      string++;
556  }
557}
558
559/* Parses an array of integers */
560static void pgsql_parse_nodes( const char *src, int *nds, int nd_count )
561{
562  int count = 0;
563  const char *string = src;
564 
565  if( *string++ != '{' )
566    return;
567  while( *string != '}' )
568  {
569    char *ptr;
570    nds[count] = strtol( string, &ptr, 10 );
571    string = ptr;
572    if( *string == ',' )
573      string++;
574    count++;
575  }
576  if( count != nd_count )
577  {
578    fprintf( stderr, "parse_nodes problem: '%s' expected %d got %d\n", src, nd_count, count );
579    exit_nicely();
580  }
581}
582
583static int pgsql_endCopy( struct table_desc *table)
584{
585    /* Terminate any pending COPY */
586     if (table->copyMode) {
587        PGconn *sql_conn = table->sql_conn;
588        int stop = PQputCopyEnd(sql_conn, NULL);
589        if (stop != 1) {
590            fprintf(stderr, "COPY_END for %s failed: %s\n", table->copy, PQerrorMessage(sql_conn));
591            exit_nicely();
592        }
593
594        PGresult *res = PQgetResult(sql_conn);
595        if (PQresultStatus(res) != PGRES_COMMAND_OK) {
596            fprintf(stderr, "COPY_END for %s failed: %s\n", table->copy, PQerrorMessage(sql_conn));
597            PQclear(res);
598            exit_nicely();
599        }
600        PQclear(res);
601        table->copyMode = 0;
602    }
603    return 0;
604}
605
606static int pgsql_nodes_set(int id, double lat, double lon, struct keyval *tags)
607{
608    /* Four params: id, lat, lon, tags */
609    char *paramValues[4];
610    char *buffer;
611
612    pgsql_ram_nodes_set( id, lat, lon, tags );
613    if( node_table->copyMode )
614    {
615      char *tag_buf = pgsql_store_tags(tags,1);
616      int length = strlen(tag_buf) + 64;
617      buffer = alloca( length );
618#ifdef FIXED_POINT
619      if( snprintf( buffer, length, "%d\t%d\t%d\t%s\n", id, DOUBLE_TO_FIX(lat), DOUBLE_TO_FIX(lon), tag_buf ) > (length-10) )
620      { fprintf( stderr, "buffer overflow node id %d\n", id); return 1; }
621#else
622      if( snprintf( buffer, length, "%d\t%.10f\t%.10f\t%s\n", id, lat, lon, tag_buf ) > (length-10) )
623      { fprintf( stderr, "buffer overflow node id %d\n", id); return 1; }
624#endif
625      return pgsql_CopyData(__FUNCTION__, node_table->sql_conn, buffer);
626    }
627    buffer = alloca(64);
628    paramValues[0] = buffer;
629    paramValues[1] = paramValues[0] + sprintf( paramValues[0], "%d", id ) + 1;
630#ifdef FIXED_POINT
631    paramValues[2] = paramValues[1] + sprintf( paramValues[1], "%d", DOUBLE_TO_FIX(lat) ) + 1;
632    sprintf( paramValues[2], "%d", DOUBLE_TO_FIX(lon) );
633#else
634    paramValues[2] = paramValues[1] + sprintf( paramValues[1], "%.10f", lat ) + 1;
635    sprintf( paramValues[2], "%.10f", lon );
636#endif
637    paramValues[3] = pgsql_store_tags(tags,0);
638    pgsql_execPrepared(node_table->sql_conn, "insert_node", 4, (const char * const *)paramValues, PGRES_COMMAND_OK);
639    return 0;
640}
641
642
643static int pgsql_nodes_get(struct osmNode *out, int id)
644{
645    /* Check cache first */
646    if( pgsql_ram_nodes_get( out, id ) == 0 )
647      return 0;
648     
649    PGresult   *res;
650    char tmp[16];
651    char const *paramValues[1];
652    PGconn *sql_conn = node_table->sql_conn;
653
654    /* Make sure we're out of copy mode */
655    pgsql_endCopy( node_table );
656
657    snprintf(tmp, sizeof(tmp), "%d", id);
658    paramValues[0] = tmp;
659 
660    res = pgsql_execPrepared(sql_conn, "get_node", 1, paramValues, PGRES_TUPLES_OK);
661
662    if (PQntuples(res) != 1) {
663        PQclear(res);
664        return 1;
665    } 
666
667#ifdef FIXED_POINT
668    out->lat = FIX_TO_DOUBLE(strtol(PQgetvalue(res, 0, 0), NULL, 10));
669    out->lon = FIX_TO_DOUBLE(strtol(PQgetvalue(res, 0, 1), NULL, 10));
670#else
671    out->lat = strtod(PQgetvalue(res, 0, 0), NULL);
672    out->lon = strtod(PQgetvalue(res, 0, 1), NULL);
673#endif
674    PQclear(res);
675    return 0;
676}
677
678/* This should be made more efficient by using an IN(ARRAY[]) construct */
679static int pgsql_nodes_get_list(struct osmNode *nodes, int *ndids, int nd_count)
680{
681    int count = 0, i;
682    for( i=0; i<nd_count; i++ )
683    {
684      if( pgsql_nodes_get( &nodes[count], ndids[i] ) == 0 )
685        count++;
686    }
687    return count;
688}
689
690static int pgsql_nodes_delete(int osm_id)
691{
692    char const *paramValues[1];
693    char buffer[64];
694    /* Make sure we're out of copy mode */
695    pgsql_endCopy( node_table );
696   
697    sprintf( buffer, "%d", osm_id );
698    paramValues[0] = buffer;
699    pgsql_execPrepared(node_table->sql_conn, "delete_node", 1, paramValues, PGRES_COMMAND_OK );
700    return 0;
701}
702
703static int pgsql_node_changed(int osm_id)
704{
705    char const *paramValues[1];
706    char buffer[64];
707    /* Make sure we're out of copy mode */
708    pgsql_endCopy( way_table );
709    pgsql_endCopy( rel_table );
710   
711    sprintf( buffer, "%d", osm_id );
712    paramValues[0] = buffer;
713    pgsql_execPrepared(way_table->sql_conn, "node_changed_mark", 1, paramValues, PGRES_COMMAND_OK );
714    pgsql_execPrepared(rel_table->sql_conn, "node_changed_mark", 1, paramValues, PGRES_COMMAND_OK );
715    return 0;
716}
717
718static int pgsql_ways_set(int way_id, int *nds, int nd_count, struct keyval *tags, int pending)
719{
720    /* Three params: id, nodes, tags, pending */
721    char *paramValues[4];
722    char *buffer;
723
724    if( way_table->copyMode )
725    {
726      char *tag_buf = pgsql_store_tags(tags,1);
727      char *node_buf = pgsql_store_nodes(nds, nd_count);
728      int length = strlen(tag_buf) + strlen(node_buf) + 64;
729      buffer = alloca(length);
730      if( snprintf( buffer, length, "%d\t%s\t%s\t%c\n", 
731              way_id, node_buf, tag_buf, pending?'t':'f' ) > (length-10) )
732      { fprintf( stderr, "buffer overflow way id %d\n", way_id); return 1; }
733      return pgsql_CopyData(__FUNCTION__, way_table->sql_conn, buffer);
734    }
735    buffer = alloca(64);
736    paramValues[0] = buffer;
737    paramValues[3] = paramValues[0] + sprintf( paramValues[0], "%d", way_id ) + 1;
738    sprintf( paramValues[3], "%c", pending?'t':'f' );
739    paramValues[1] = pgsql_store_nodes(nds, nd_count);
740    paramValues[2] = pgsql_store_tags(tags,0);
741    pgsql_execPrepared(way_table->sql_conn, "insert_way", 4, (const char * const *)paramValues, PGRES_COMMAND_OK);
742    return 0;
743}
744
745/* Caller is responsible for freeing nodesptr & resetList(tags) */
746static int pgsql_ways_get(int id, struct keyval *tags, struct osmNode **nodes_ptr, int *count_ptr)
747{
748    PGresult   *res;
749    char tmp[16];
750    char const *paramValues[1];
751    PGconn *sql_conn = way_table->sql_conn;
752
753    /* Make sure we're out of copy mode */
754    pgsql_endCopy( way_table );
755
756    snprintf(tmp, sizeof(tmp), "%d", id);
757    paramValues[0] = tmp;
758 
759    res = pgsql_execPrepared(sql_conn, "get_way", 1, paramValues, PGRES_TUPLES_OK);
760
761    if (PQntuples(res) != 1) {
762        PQclear(res);
763        return 1;
764    } 
765
766    pgsql_parse_tags( PQgetvalue(res, 0, 1), tags );
767
768    int num_nodes = strtol(PQgetvalue(res, 0, 2), NULL, 10);
769    int *list = alloca( sizeof(int)*num_nodes );
770    *nodes_ptr = malloc( sizeof(struct osmNode) * num_nodes );
771    pgsql_parse_nodes( PQgetvalue(res, 0, 0), list, num_nodes);
772   
773    *count_ptr = pgsql_nodes_get_list( *nodes_ptr, list, num_nodes);
774    PQclear(res);
775    return 0;
776}
777
778static int pgsql_ways_done(int id)
779{
780    char tmp[16];
781    char const *paramValues[1];
782    PGconn *sql_conn = way_table->sql_conn;
783
784    /* Make sure we're out of copy mode */
785    pgsql_endCopy( way_table );
786
787    snprintf(tmp, sizeof(tmp), "%d", id);
788    paramValues[0] = tmp;
789 
790    pgsql_execPrepared(sql_conn, "way_done", 1, paramValues, PGRES_COMMAND_OK);
791
792    return 0;
793}
794
795static int pgsql_ways_delete(int osm_id)
796{
797    char const *paramValues[1];
798    char buffer[64];
799    /* Make sure we're out of copy mode */
800    pgsql_endCopy( way_table );
801   
802    sprintf( buffer, "%d", osm_id );
803    paramValues[0] = buffer;
804    pgsql_execPrepared(way_table->sql_conn, "delete_way", 1, paramValues, PGRES_COMMAND_OK );
805    return 0;
806}
807
808static void pgsql_iterate_ways(int (*callback)(int id, struct keyval *tags, struct osmNode *nodes, int count, int exists))
809{
810    PGresult   *res_ways;
811    int i, count = 0;
812    /* The flag we pass to indicate that the way in question might exist already in the database */
813    int exists = Append;
814
815    fprintf(stderr, "\nGoing over pending ways\n");
816
817    /* Make sure we're out of copy mode */
818    pgsql_endCopy( way_table );
819   
820    res_ways = pgsql_execPrepared(way_table->sql_conn, "pending_ways", 0, NULL, PGRES_TUPLES_OK);
821
822    //fprintf(stderr, "\nIterating ways\n");
823    for (i = 0; i < PQntuples(res_ways); i++) {
824        int id = strtol(PQgetvalue(res_ways, i, 0), NULL, 10);
825        struct keyval tags;
826        struct osmNode *nodes;
827        int nd_count;
828
829        if (count++ %1000 == 0)
830                fprintf(stderr, "\rprocessing way (%dk)", count/1000);
831
832        initList(&tags);
833        if( pgsql_ways_get(id, &tags, &nodes, &nd_count) )
834          continue;
835         
836        callback(id, &tags, nodes, nd_count, exists);
837        pgsql_ways_done( id );
838
839        free(nodes);
840        resetList(&tags);
841    }
842
843    PQclear(res_ways);
844    fprintf(stderr, "\n");
845}
846
847static int pgsql_way_changed(int osm_id)
848{
849    char const *paramValues[1];
850    char buffer[64];
851    /* Make sure we're out of copy mode */
852    pgsql_endCopy( rel_table );
853   
854    sprintf( buffer, "%d", osm_id );
855    paramValues[0] = buffer;
856    pgsql_execPrepared(rel_table->sql_conn, "way_changed_mark", 1, paramValues, PGRES_COMMAND_OK );
857    return 0;
858}
859
860static int pgsql_rels_set(int id, struct member *members, int member_count, struct keyval *tags)
861{
862    /* Params: id, way_off, rel_off, parts, members, tags */
863    char *paramValues[6];
864    char *buffer;
865    int i;
866    struct keyval member_list;
867   
868    int node_parts[member_count], node_count = 0,
869        ways_parts[member_count], ways_count = 0,
870        rels_parts[member_count], rels_count = 0;
871   
872    int all_parts[member_count], all_count = 0;
873    initList( &member_list );   
874    for( i=0; i<member_count; i++ )
875    {
876      char tag = 0;
877      switch( members[i].type )
878      {
879        case OSMTYPE_NODE:     node_parts[node_count++] = members[i].id; tag = 'n'; break;
880        case OSMTYPE_WAY:      ways_parts[ways_count++] = members[i].id; tag = 'w'; break;
881        case OSMTYPE_RELATION: rels_parts[rels_count++] = members[i].id; tag = 'r'; break;
882        default: fprintf( stderr, "Internal error: Unknown member type %d\n", members[i].type ); exit_nicely();
883      }
884      char buf[64];
885      sprintf( buf, "%c%d", tag, members[i].id );
886      addItem( &member_list, buf, members[i].role, 0 );
887    }
888    memcpy( all_parts+all_count, node_parts, node_count*sizeof(int) ); all_count+=node_count;
889    memcpy( all_parts+all_count, ways_parts, ways_count*sizeof(int) ); all_count+=ways_count;
890    memcpy( all_parts+all_count, rels_parts, rels_count*sizeof(int) ); all_count+=rels_count;
891 
892    if( rel_table->copyMode )
893    {
894      char *tag_buf = strdup(pgsql_store_tags(tags,1));
895      char *member_buf = pgsql_store_tags(&member_list,1);
896      char *parts_buf = pgsql_store_nodes(all_parts, all_count);
897      int length = strlen(member_buf) + strlen(tag_buf) + strlen(parts_buf) + 64;
898      buffer = alloca(length);
899      if( snprintf( buffer, length, "%d\t%d\t%d\t%s\t%s\t%s\tf\n", 
900              id, node_count, node_count+ways_count, parts_buf, member_buf, tag_buf ) > (length-10) )
901      { fprintf( stderr, "buffer overflow relation id %d\n", id); return 1; }
902      free(tag_buf);
903      resetList(&member_list);
904      return pgsql_CopyData(__FUNCTION__, rel_table->sql_conn, buffer);
905    }
906    buffer = alloca(64);
907    paramValues[0] = buffer;
908    paramValues[1] = paramValues[0] + sprintf( paramValues[0], "%d", id ) + 1;
909    paramValues[2] = paramValues[1] + sprintf( paramValues[1], "%d", node_count ) + 1;
910    sprintf( paramValues[2], "%d", node_count+ways_count );
911    paramValues[3] = pgsql_store_nodes(all_parts, all_count);
912    paramValues[4] = pgsql_store_tags(&member_list,0);
913    if( paramValues[4] )
914        paramValues[4] = strdup(paramValues[4]);
915    paramValues[5] = pgsql_store_tags(tags,0);
916    pgsql_execPrepared(rel_table->sql_conn, "insert_rel", 6, (const char * const *)paramValues, PGRES_COMMAND_OK);
917    if( paramValues[4] )
918        free(paramValues[4]);
919    resetList(&member_list);
920    return 0;
921}
922
923/* Caller is responsible for freeing members & resetList(tags) */
924static int pgsql_rels_get(int id, struct member **members, int *member_count, struct keyval *tags)
925{
926    PGresult   *res;
927    char tmp[16];
928    char const *paramValues[1];
929    PGconn *sql_conn = rel_table->sql_conn;
930    struct keyval member_temp;
931
932    /* Make sure we're out of copy mode */
933    pgsql_endCopy( rel_table );
934
935    snprintf(tmp, sizeof(tmp), "%d", id);
936    paramValues[0] = tmp;
937 
938    res = pgsql_execPrepared(sql_conn, "get_rel", 1, paramValues, PGRES_TUPLES_OK);
939    /* Fields are: members, tags, member_count */
940
941    if (PQntuples(res) != 1) {
942        PQclear(res);
943        return 1;
944    } 
945
946    pgsql_parse_tags( PQgetvalue(res, 0, 1), tags );
947    initList(&member_temp);
948    pgsql_parse_tags( PQgetvalue(res, 0, 0), &member_temp );
949
950    int num_members = strtol(PQgetvalue(res, 0, 2), NULL, 10);
951    struct member *list = malloc( sizeof(struct member)*num_members );
952   
953    int i=0;
954    struct keyval *item;
955    while( (item = popItem(&member_temp)) )
956    {
957        if( i >= num_members )
958        {
959            fprintf( stderr, "Unexpected member_count reading relation %d\n", id );
960            exit_nicely();
961        }
962        char tag = item->key[0];
963        list[i].type = (tag == 'n')?OSMTYPE_NODE:(tag == 'w')?OSMTYPE_WAY:(tag == 'r')?OSMTYPE_RELATION:-1;
964        list[i].id = strtol(item->key+1, NULL, 10 );
965        list[i].role = strdup( item->value );
966        freeItem(item);
967        i++;
968    }
969    *members = list;
970    *member_count = num_members;
971    PQclear(res);
972    return 0;
973}
974
975static int pgsql_rels_done(int id)
976{
977    char tmp[16];
978    char const *paramValues[1];
979    PGconn *sql_conn = rel_table->sql_conn;
980
981    /* Make sure we're out of copy mode */
982    pgsql_endCopy( rel_table );
983
984    snprintf(tmp, sizeof(tmp), "%d", id);
985    paramValues[0] = tmp;
986 
987    pgsql_execPrepared(sql_conn, "rel_done", 1, paramValues, PGRES_COMMAND_OK);
988
989    return 0;
990}
991
992static int pgsql_rels_delete(int osm_id)
993{
994    char const *paramValues[1];
995    char buffer[64];
996    /* Make sure we're out of copy mode */
997    pgsql_endCopy( rel_table );
998   
999    sprintf( buffer, "%d", osm_id );
1000    paramValues[0] = buffer;
1001    pgsql_execPrepared(rel_table->sql_conn, "delete_rel", 1, paramValues, PGRES_COMMAND_OK );
1002    return 0;
1003}
1004
1005static void pgsql_iterate_relations(int (*callback)(int id, struct member *members, int member_count, struct keyval *tags, int exists))
1006{
1007    PGresult   *res_rels;
1008    int i, count = 0;
1009    /* The flag we pass to indicate that the way in question might exist already in the database */
1010    int exists = Append;
1011
1012    fprintf(stderr, "\nGoing over pending relations\n");
1013
1014    /* Make sure we're out of copy mode */
1015    pgsql_endCopy( rel_table );
1016   
1017    res_rels = pgsql_execPrepared(rel_table->sql_conn, "pending_rels", 0, NULL, PGRES_TUPLES_OK);
1018
1019    //fprintf(stderr, "\nIterating ways\n");
1020    for (i = 0; i < PQntuples(res_rels); i++) {
1021        int id = strtol(PQgetvalue(res_rels, i, 0), NULL, 10);
1022        struct keyval tags;
1023        struct member *members;
1024        int member_count;
1025
1026        if (count++ %1000 == 0)
1027                fprintf(stderr, "\rprocessing relation (%dk)", count/1000);
1028
1029        initList(&tags);
1030        if( pgsql_rels_get(id, &members, &member_count, &tags) )
1031          continue;
1032         
1033        callback(id, members, member_count, &tags, exists);
1034        pgsql_rels_done( id );
1035
1036        free(members);
1037        resetList(&tags);
1038    }
1039
1040    PQclear(res_rels);
1041    fprintf(stderr, "\n");
1042}
1043
1044static int pgsql_rel_changed(int osm_id)
1045{
1046    char const *paramValues[1];
1047    char buffer[64];
1048    /* Make sure we're out of copy mode */
1049    pgsql_endCopy( rel_table );
1050   
1051    sprintf( buffer, "%d", osm_id );
1052    paramValues[0] = buffer;
1053    pgsql_execPrepared(rel_table->sql_conn, "rel_changed_mark", 1, paramValues, PGRES_COMMAND_OK );
1054    return 0;
1055}
1056
1057static void pgsql_analyze(void)
1058{
1059    int i;
1060
1061    for (i=0; i<num_tables; i++) {
1062        PGconn *sql_conn = tables[i].sql_conn;
1063 
1064        if (tables[i].analyze) {
1065            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", tables[i].analyze);
1066        }
1067    }
1068}
1069
1070static void pgsql_end(void)
1071{
1072    int i;
1073
1074    for (i=0; i<num_tables; i++) {
1075        PGconn *sql_conn = tables[i].sql_conn;
1076 
1077        // Commit transaction
1078        if (tables[i].stop) {
1079            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", tables[i].stop);
1080        }
1081
1082    }
1083}
1084
1085/* Replace %s with prefix */
1086static inline void set_prefix( const char *prefix, const char **string )
1087{
1088  char buffer[1024];
1089  if( *string == NULL )
1090      return;
1091  sprintf( buffer, *string, prefix, prefix, prefix, prefix, prefix, prefix );
1092  *string = strdup( buffer );
1093}
1094
1095static int build_indexes;
1096
1097static int pgsql_start(const struct output_options *options)
1098{
1099    PGresult   *res;
1100    int i;
1101    int have_intarray = 0;
1102    int dropcreate = !options->append;
1103
1104    scale = options->scale;
1105    Append = options->append;
1106   
1107    /* How much we can fit, and make sure it's odd */
1108    maxBlocks = (options->cache*((1024*1024)/(PER_BLOCK*sizeof(struct ramNode)))) | 1;
1109    queue = malloc( maxBlocks * sizeof(struct ramNodeBlock) );   
1110   
1111    fprintf( stderr, "Mid: pgsql, scale=%d, cache=%dMB, maxblocks=%d*%zd\n", scale, options->cache, maxBlocks, PER_BLOCK*sizeof(struct ramNode) ); 
1112   
1113    /* We use a connection per table to enable the use of COPY */
1114    for (i=0; i<num_tables; i++) {
1115        PGconn *sql_conn;
1116                       
1117        set_prefix( options->prefix, &(tables[i].name) );
1118        set_prefix( options->prefix, &(tables[i].start) );
1119        set_prefix( options->prefix, &(tables[i].create) );
1120        set_prefix( options->prefix, &(tables[i].prepare) );
1121        set_prefix( options->prefix, &(tables[i].prepare_intarray) );
1122        set_prefix( options->prefix, &(tables[i].copy) );
1123        set_prefix( options->prefix, &(tables[i].analyze) );
1124        set_prefix( options->prefix, &(tables[i].stop) );
1125        set_prefix( options->prefix, &(tables[i].array_indexes) );
1126
1127        fprintf(stderr, "Setting up table: %s\n", tables[i].name);
1128        sql_conn = PQconnectdb(options->conninfo);
1129
1130        /* Check to see that the backend connection was successfully made */
1131        if (PQstatus(sql_conn) != CONNECTION_OK) {
1132            fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(sql_conn));
1133            exit_nicely();
1134        }
1135        tables[i].sql_conn = sql_conn;
1136
1137        /* Not really the right place for this test, but we need a live
1138         * connection that not used for anything else yet, and we'd like to
1139         * warn users *before* we start doing mountains of work */
1140        if (i == t_node)
1141        {
1142            /* Note: this only checks for the GIST version, but recently there is also a GIN version, which may be faster... */
1143            res = PQexec(sql_conn, "select 1 from pg_opclass where opcname='gist__intbig_ops'" );
1144            if( PQresultStatus(res) == PGRES_TUPLES_OK && PQntuples(res) == 1 )
1145                have_intarray = 1;
1146            else
1147                fprintf( stderr, "*** WARNING: intarray contrib module not installed\n*** The resulting database will not be usable for applying diffs.\n" );
1148            PQclear(res);
1149           
1150            if( have_intarray && !options->append )
1151                build_indexes = 1;
1152        }
1153        if (dropcreate) {
1154            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "DROP TABLE IF EXISTS %s", tables[i].name);
1155        }
1156
1157        if (tables[i].start) {
1158            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", tables[i].start);
1159        }
1160
1161        if (dropcreate && tables[i].create) {
1162            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", tables[i].create);
1163        }
1164
1165        if (tables[i].prepare) {
1166            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", tables[i].prepare);
1167        }
1168
1169        if (have_intarray && tables[i].prepare_intarray) {
1170            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", tables[i].prepare_intarray);
1171        }
1172
1173        if (tables[i].copy) {
1174            pgsql_exec(sql_conn, PGRES_COPY_IN, "%s", tables[i].copy);
1175            tables[i].copyMode = 1;
1176        }
1177    }
1178
1179    return 0;
1180}
1181
1182static void *pgsql_stop_one(void *arg)
1183{
1184    struct table_desc *table = arg;
1185    PGconn *sql_conn = table->sql_conn;
1186
1187    fprintf(stderr, "Stopping table: %s\n", table->name);
1188    pgsql_endCopy(table);
1189    if (table->stop) 
1190        pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", table->stop);
1191
1192    if( build_indexes && table->array_indexes ) {
1193        fprintf(stderr, "Building index on table: %s\n", table->name);
1194        pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", table->array_indexes);
1195    }
1196    PQfinish(sql_conn);
1197    table->sql_conn = NULL;
1198    fprintf(stderr, "Stopped table: %s\n", table->name);
1199    return NULL;
1200}
1201
1202static void pgsql_stop(void)
1203{
1204    int i;
1205#ifdef HAVE_PTHREAD
1206    pthread_t threads[num_tables];
1207#endif
1208
1209    fprintf( stderr, "node cache: stored: %d(%.2f%%), storage efficiency: %.2f%%, hit rate: %.2f%%\n", 
1210             storedNodes, 100.0f*storedNodes/totalNodes, 100.0f*storedNodes/(usedBlocks*PER_BLOCK),
1211             100.0f*nodesCacheHits/nodesCacheLookups );
1212         
1213    for( i=0; i<usedBlocks; i++ )
1214    {
1215      free(queue[i]->nodes);
1216      queue[i]->nodes = NULL;
1217    }
1218    free(queue);
1219
1220#ifdef HAVE_PTHREAD
1221    for (i=0; i<num_tables; i++) {
1222        int ret = pthread_create(&threads[i], NULL, pgsql_stop_one, &tables[i]);
1223        if (ret) {
1224            fprintf(stderr, "pthread_create() returned an error (%d)", ret);
1225            exit_nicely();
1226        }
1227    }
1228    for (i=0; i<num_tables; i++) {
1229        int ret = pthread_join(threads[i], NULL);
1230        if (ret) {
1231            fprintf(stderr, "pthread_join() returned an error (%d)", ret);
1232            exit_nicely();
1233        }
1234    }
1235#else
1236    for (i=0; i<num_tables; i++)
1237        pgsql_stop_one(&tables[i]);
1238#endif
1239}
1240 
1241struct middle_t mid_pgsql = {
1242        start:          pgsql_start,
1243        stop:           pgsql_stop,
1244        cleanup:        pgsql_cleanup,
1245        analyze:        pgsql_analyze,
1246        end:            pgsql_end,
1247
1248        nodes_set:      pgsql_nodes_set,
1249//        nodes_get:      pgsql_nodes_get,
1250        nodes_get_list:      pgsql_nodes_get_list,
1251        nodes_delete:   pgsql_nodes_delete,
1252        node_changed:   pgsql_node_changed,
1253
1254        ways_set:       pgsql_ways_set,
1255        ways_get:       pgsql_ways_get,
1256        ways_done:      pgsql_ways_done,
1257        ways_delete:    pgsql_ways_delete,
1258        way_changed:    pgsql_way_changed,
1259
1260        relations_set:  pgsql_rels_set,
1261//        relations_get:  pgsql_rels_get,
1262        relations_done:  pgsql_rels_done,
1263        relations_delete:  pgsql_rels_delete,
1264        relation_changed:  pgsql_rel_changed,
1265
1266//        iterate_nodes:  pgsql_iterate_nodes,
1267        iterate_ways:   pgsql_iterate_ways,
1268        iterate_relations: pgsql_iterate_relations
1269};
Note: See TracBrowser for help on using the repository browser.