source: subversion/applications/utils/export/osm2pgsql/middle-pgsql.c @ 26588

Last change on this file since 26588 was 26588, checked in by apmon, 8 years ago

[osm2pgsql] Report some more timing information to see where osm2pgsql spends its time

File size: 49.8 KB
Line 
1/* Implements the mid-layer processing for osm2pgsql
2 * using several PostgreSQL tables
3 *
4 * This layer stores data read in from the planet.osm file
5 * and is then read by the backend processing code to
6 * emit the final geometry-enabled output formats
7*/
8 
9#include "config.h"
10
11#include <stdio.h>
12#include <unistd.h>
13#include <stdlib.h>
14#include <string.h>
15#include <assert.h>
16#include <math.h>
17#include <time.h>
18
19#ifdef HAVE_PTHREAD
20#include <pthread.h>
21#endif
22
23#include <libpq-fe.h>
24
25#include "osmtypes.h"
26#include "middle.h"
27#include "middle-pgsql.h"
28#include "output-pgsql.h"
29#include "pgsql.h"
30
31/* Store +-20,000km Mercator co-ordinates as fixed point 32bit number with maximum precision */
32/* Scale is chosen such that 40,000 * SCALE < 2^32          */
33#define FIXED_POINT
34
35static int scale = 100;
36#define DOUBLE_TO_FIX(x) ((int)((x) * scale))
37#define FIX_TO_DOUBLE(x) (((double)x) / scale)
38
39
40
41enum table_id {
42    t_node, t_way, t_rel
43} ;
44
45struct table_desc {
46    //enum table_id table;
47    const char *name;
48    const char *start;
49    const char *create;
50    const char *create_index;
51    const char *prepare;
52    const char *prepare_intarray;
53    const char *copy;
54    const char *analyze;
55    const char *stop;
56    const char *array_indexes;
57
58    int copyMode;    /* True if we are in copy mode */
59    PGconn *sql_conn;
60};
61
62static struct table_desc tables [] = {
63    { 
64        //table = t_node,
65         .name = "%p_nodes",
66        .start = "BEGIN;\n",
67#ifdef FIXED_POINT
68       .create = "CREATE TABLE %p_nodes (id " POSTGRES_OSMID_TYPE " PRIMARY KEY {USING INDEX TABLESPACE %i}, lat int4 not null, lon int4 not null, tags text[]) {TABLESPACE %t};\n",
69      .prepare = "PREPARE insert_node (" POSTGRES_OSMID_TYPE ", int4, int4, text[]) AS INSERT INTO %p_nodes VALUES ($1,$2,$3,$4);\n"
70#else
71       .create = "CREATE TABLE %p_nodes (id " POSTGRES_OSMID_TYPE " PRIMARY KEY {USING INDEX TABLESPACE %i}, lat double precision not null, lon double precision not null, tags text[]) {TABLESPACE %t};\n",
72      .prepare = "PREPARE insert_node (" POSTGRES_OSMID_TYPE ", double precision, double precision, text[]) AS INSERT INTO %p_nodes VALUES ($1,$2,$3,$4);\n"
73#endif
74               "PREPARE get_node (" POSTGRES_OSMID_TYPE ") AS SELECT lat,lon,tags FROM %p_nodes WHERE id = $1 LIMIT 1;\n"
75               "PREPARE delete_node (" POSTGRES_OSMID_TYPE ") AS DELETE FROM %p_nodes WHERE id = $1;\n",
76.prepare_intarray = // This is to fetch lots of nodes simultaneously, in order including duplicates. The commented out version doesn't do duplicates
77                  // It's not optimal as it does a Nested Loop / Index Scan which is suboptimal for large arrays
78                  //"PREPARE get_node_list(int[]) AS SELECT id, lat, lon FROM %p_nodes WHERE id = ANY($1::int4[]) ORDER BY $1::int4[] # id\n",
79                 //"PREPARE get_node_list(" POSTGRES_OSMID_TYPE "[]) AS select y.id, y.lat, y.lon from (select i, ($1)[i] as l_id from (select generate_series(1,array_length($1, 1)) as i) x) z, "
80                 //                                    "(select * from %p_nodes where id = ANY($1)) y where l_id=id order by i;\n",
81                 "PREPARE get_node_list(" POSTGRES_OSMID_TYPE "[]) AS SELECT id, lat, lon FROM %p_nodes WHERE id = ANY($1::" POSTGRES_OSMID_TYPE "[])",
82         .copy = "COPY %p_nodes FROM STDIN;\n",
83      .analyze = "ANALYZE %p_nodes;\n",
84         .stop = "COMMIT;\n"
85    },
86    { 
87        //table = t_way,
88         .name = "%p_ways",
89        .start = "BEGIN;\n",
90       .create = "CREATE TABLE %p_ways (id " POSTGRES_OSMID_TYPE " PRIMARY KEY {USING INDEX TABLESPACE %i}, nodes " POSTGRES_OSMID_TYPE "[] not null, tags text[], pending boolean not null) {TABLESPACE %t};\n",
91 .create_index = "CREATE INDEX %p_ways_idx ON %p_ways (id) {TABLESPACE %i} WHERE pending;\n",
92.array_indexes = "CREATE INDEX %p_ways_nodes ON %p_ways USING gin (nodes) {TABLESPACE %i};\n",
93      .prepare = "PREPARE insert_way (" POSTGRES_OSMID_TYPE ", " POSTGRES_OSMID_TYPE "[], text[], boolean) AS INSERT INTO %p_ways VALUES ($1,$2,$3,$4);\n"
94               "PREPARE get_way (" POSTGRES_OSMID_TYPE ") AS SELECT nodes, tags, array_upper(nodes,1) FROM %p_ways WHERE id = $1;\n"
95               "PREPARE way_done(" POSTGRES_OSMID_TYPE ") AS UPDATE %p_ways SET pending = false WHERE id = $1;\n"
96               "PREPARE pending_ways AS SELECT id FROM %p_ways WHERE pending;\n"
97               "PREPARE delete_way(" POSTGRES_OSMID_TYPE ") AS DELETE FROM %p_ways WHERE id = $1;\n",
98.prepare_intarray = "PREPARE node_changed_mark(" POSTGRES_OSMID_TYPE ") AS UPDATE %p_ways SET pending = true WHERE nodes && ARRAY[$1] AND NOT pending;\n",
99         .copy = "COPY %p_ways FROM STDIN;\n",
100      .analyze = "ANALYZE %p_ways;\n",
101         .stop =  "COMMIT;\n"
102    },
103    { 
104        //table = t_rel,
105         .name = "%p_rels",
106        .start = "BEGIN;\n",
107       .create = "CREATE TABLE %p_rels(id " POSTGRES_OSMID_TYPE " PRIMARY KEY {USING INDEX TABLESPACE %i}, way_off int2, rel_off int2, parts " POSTGRES_OSMID_TYPE "[], members text[], tags text[], pending boolean not null) {TABLESPACE %t};\n",
108 .create_index = "CREATE INDEX %p_rels_idx ON %p_rels (id) {TABLESPACE %i} WHERE pending;\n",
109.array_indexes = "CREATE INDEX %p_rels_parts ON %p_rels USING gin (parts) {TABLESPACE %i};\n",
110      .prepare = "PREPARE insert_rel (" POSTGRES_OSMID_TYPE ", int2, int2, " POSTGRES_OSMID_TYPE "[], text[], text[]) AS INSERT INTO %p_rels VALUES ($1,$2,$3,$4,$5,$6,false);\n"
111               "PREPARE get_rel (" POSTGRES_OSMID_TYPE ") AS SELECT members, tags, array_upper(members,1)/2 FROM %p_rels WHERE id = $1;\n"
112               "PREPARE rel_done(" POSTGRES_OSMID_TYPE ") AS UPDATE %p_rels SET pending = false WHERE id = $1;\n"
113               "PREPARE pending_rels AS SELECT id FROM %p_rels WHERE pending;\n"
114               "PREPARE delete_rel(" POSTGRES_OSMID_TYPE ") AS DELETE FROM %p_rels WHERE id = $1;\n",
115.prepare_intarray =
116                "PREPARE node_changed_mark(" POSTGRES_OSMID_TYPE ") AS UPDATE %p_rels SET pending = true WHERE parts && ARRAY[$1] AND parts[1:way_off] && ARRAY[$1] AND NOT pending;\n"
117                "PREPARE way_changed_mark(" POSTGRES_OSMID_TYPE ") AS UPDATE %p_rels SET pending = true WHERE parts && ARRAY[$1] AND parts[way_off+1:rel_off] && ARRAY[$1] AND NOT pending;\n"
118                "PREPARE rel_changed_mark(" POSTGRES_OSMID_TYPE ") AS UPDATE %p_rels SET pending = true WHERE parts && ARRAY[$1] AND parts[rel_off+1:array_length(parts,1)] && ARRAY[$1] AND NOT pending;\n",
119         .copy = "COPY %p_rels FROM STDIN;\n",
120      .analyze = "ANALYZE %p_rels;\n",
121         .stop =  "COMMIT;\n"
122    }
123};
124
125static const int num_tables = sizeof(tables)/sizeof(tables[0]);
126static int warn_node_order;
127static struct table_desc *node_table = &tables[t_node];
128static struct table_desc *way_table  = &tables[t_way];
129static struct table_desc *rel_table  = &tables[t_rel];
130
131/* Here we use a similar storage structure as middle-ram, except we allow
132 * the array to be lossy so we can cap the total memory usage. Hence it is a
133 * combination of a sparse array with a priority queue
134 *
135 * Like middle-ram we have a number of blocks all storing PER_BLOCK
136 * ramNodes. However, here we also track the number of nodes in each block.
137 * Seperately we have a priority queue like structure when maintains a list
138 * of all the used block so we can easily find the block with the least
139 * nodes. The cache has two phases:
140 *
141 * Phase 1: Loading initially, usedBlocks < maxBlocks. In this case when a
142 * new block is needed we simply allocate it and put it in
143 * queue[usedBlocks-1] which is the bottom of the tree. Every node added
144 * increases it's usage. When we move onto the next block we percolate this
145 * block up the queue until it reaches its correct position. The invariant
146 * is that the priority tree is complete except for this last node. We do
147 * not permit adding nodes to any other block to preserve this invariant.
148 *
149 * Phase 2: Once we've reached the maximum number of blocks permitted, we
150 * change so that the block currently be inserted into is at the top of the
151 * tree. When a new block is needed we take the one at the end of the queue,
152 * as it is the one with the least number of nodes in it. When we move onto
153 * the next block we first push the just completed block down to it's
154 * correct position in the queue and then reuse the block that now at the
155 * head.
156 *
157 * The result being that at any moment we have in memory the top maxBlock
158 * blocks in terms of number of nodes in memory. This should maximize the
159 * number of hits in lookups.
160 *
161 * Complexity:
162 *  Insert node: O(1)
163 *  Lookup node: O(1)
164 *  Add new block: O(log usedBlocks)
165 *  Reuse old block: O(log maxBlocks)
166 */
167
168struct ramNode {
169#ifdef FIXED_POINT
170    int lon;
171    int lat;
172#else
173    double lon;
174    double lat;
175#endif
176};
177
178struct ramNodeBlock {
179  struct ramNode    *nodes;
180  int used;
181};
182
183#define BLOCK_SHIFT 10
184#define PER_BLOCK  (((osmid_t)1) << BLOCK_SHIFT)
185#define NUM_BLOCKS (((osmid_t)1) << (36 - BLOCK_SHIFT))
186
187static struct ramNodeBlock blocks[NUM_BLOCKS];
188static int usedBlocks;
189/* Note: maxBlocks *must* be odd, to make sure the priority queue has no nodes with only one child */
190static int maxBlocks = 0;
191static struct ramNodeBlock **queue;
192static osmid_t storedNodes, totalNodes;
193int nodesCacheHits, nodesCacheLookups;
194
195static int Append;
196
197static inline int id2block(osmid_t id)
198{
199    // + NUM_BLOCKS/2 allows for negative IDs
200    return (id >> BLOCK_SHIFT) + NUM_BLOCKS/2;
201}
202
203static inline int id2offset(osmid_t id)
204{
205    return id & (PER_BLOCK-1);
206}
207
208static inline osmid_t block2id(int block, int offset)
209{
210    return (((osmid_t) block - NUM_BLOCKS/2) << BLOCK_SHIFT) + (osmid_t) offset;
211}
212
213#define Swap(a,b) { typeof(a) __tmp = a; a = b; b = __tmp; }
214static void percolate_up( int pos )
215{
216    int i = pos;
217    while( i > 0 )
218    {
219      int parent = (i-1)>>1;
220      if( queue[i]->used < queue[parent]->used )
221      {
222        Swap( queue[i], queue[parent] );
223        i = parent;
224      }
225      else
226        break;
227    }
228}
229
230#define UNUSED  __attribute__ ((unused))
231static int pgsql_ram_nodes_set(osmid_t id, double lat, double lon, struct keyval *tags UNUSED)
232{
233    int block  = id2block(id);
234    int offset = id2offset(id);
235   
236    totalNodes++;
237
238    if (!blocks[block].nodes) {
239        if( usedBlocks < maxBlocks )
240        {
241          /* We've just finished with the previous block, so we need to percolate it up the queue to its correct position */
242          if( usedBlocks > 0 )
243            /* Upto log(usedBlocks) iterations */
244            percolate_up( usedBlocks-1 );
245
246          blocks[block].nodes = calloc(PER_BLOCK, sizeof(struct ramNode));
247          blocks[block].used = 0;
248          if (!blocks[block].nodes) {
249              fprintf(stderr, "Error allocating nodes\n");
250              exit_nicely();
251          }
252          queue[usedBlocks] = &blocks[block];
253          usedBlocks++;
254
255          /* If we've just used up the last possible block we enter the
256           * transition and we change the invariant. To do this we percolate
257           * the newly allocated block straight to the head */
258          if( usedBlocks == maxBlocks )
259            percolate_up( usedBlocks-1 );
260        }
261        else
262        {
263          /* We've reached the maximum number of blocks, so now we push the
264           * current head of the tree down to the right level to restore the
265           * priority queue invariant. Upto log(maxBlocks) iterations */
266         
267          int i=0;
268          while( 2*i+1 < maxBlocks )
269          {
270            if( queue[2*i+1]->used <= queue[2*i+2]->used )
271            {
272              if( queue[i]->used > queue[2*i+1]->used )
273              {
274                Swap( queue[i], queue[2*i+1] );
275                i = 2*i+1;
276              }
277              else
278                break;
279            }
280            else
281            {
282              if( queue[i]->used > queue[2*i+2]->used )
283              {
284                Swap( queue[i], queue[2*i+2] );
285                i = 2*i+2;
286              }
287              else
288                break;
289            }
290          }
291          /* Now the head of the queue is the smallest, so it becomes our replacement candidate */
292          blocks[block].nodes = queue[0]->nodes;
293          blocks[block].used = 0;
294          memset( blocks[block].nodes, 0, PER_BLOCK * sizeof(struct ramNode) );
295         
296          /* Clear old head block and point to new block */
297          storedNodes -= queue[0]->used;
298          queue[0]->nodes = NULL;
299          queue[0]->used = 0;
300          queue[0] = &blocks[block];
301        }
302    }
303    else
304    {
305      /* Insert into an existing block. We can't allow this in general or it
306       * will break the invariant. However, it will work fine if all the
307       * nodes come in numerical order, which is the common case */
308     
309      int expectedpos;
310      if( usedBlocks < maxBlocks )
311        expectedpos = usedBlocks-1;
312      else
313        expectedpos = 0;
314       
315      if( queue[expectedpos] != &blocks[block] )
316      {
317        if (!warn_node_order) {
318            fprintf( stderr, "WARNING: Found Out of order node %" PRIdOSMID " (%d,%d) - this will impact the cache efficiency\n", id, block, offset );
319            warn_node_order++;
320        }
321        return 1;
322      }
323    }
324       
325#ifdef FIXED_POINT
326    blocks[block].nodes[offset].lat = DOUBLE_TO_FIX(lat);
327    blocks[block].nodes[offset].lon = DOUBLE_TO_FIX(lon);
328#else
329    blocks[block].nodes[offset].lat = lat;
330    blocks[block].nodes[offset].lon = lon;
331#endif
332    blocks[block].used++;
333    storedNodes++;
334    return 0;
335}
336
337
338int pgsql_ram_nodes_get(struct osmNode *out, osmid_t id)
339{
340    int block  = id2block(id);
341    int offset = id2offset(id);
342    nodesCacheLookups++;
343
344    if (!blocks[block].nodes)
345        return 1;
346
347    if (!blocks[block].nodes[offset].lat && !blocks[block].nodes[offset].lon)
348        return 1;
349
350#ifdef FIXED_POINT
351    out->lat = FIX_TO_DOUBLE(blocks[block].nodes[offset].lat);
352    out->lon = FIX_TO_DOUBLE(blocks[block].nodes[offset].lon);
353#else
354    out->lat = blocks[block].nodes[offset].lat;
355    out->lon = blocks[block].nodes[offset].lon;
356#endif
357    nodesCacheHits++;
358    return 0;
359}
360
361static void pgsql_cleanup(void)
362{
363    int i;
364
365    for (i=0; i<num_tables; i++) {
366        if (tables[i].sql_conn) {
367            PQfinish(tables[i].sql_conn);
368            tables[i].sql_conn = NULL;
369        }
370    }
371}
372
373char *pgsql_store_nodes(osmid_t *nds, int nd_count)
374{
375  static char *buffer;
376  static int buflen;
377
378  char *ptr;
379  int i, first;
380   
381  if( buflen <= nd_count * 10 )
382  {
383    buflen = ((nd_count * 10) | 4095) + 1;  /* Round up to next page */
384    buffer = realloc( buffer, buflen );
385  }
386_restart:
387
388  ptr = buffer;
389  first = 1;
390  *ptr++ = '{';
391  for( i=0; i<nd_count; i++ )
392  {
393    if( !first ) *ptr++ = ',';
394    ptr += sprintf(ptr, "%" PRIdOSMID, nds[i] );
395   
396    if( (ptr-buffer) > (buflen-20) ) /* Almost overflowed? */
397    {
398      buflen <<= 1;
399      buffer = realloc( buffer, buflen );
400     
401      goto _restart;
402    }
403    first = 0;
404  }
405 
406  *ptr++ = '}';
407  *ptr++ = 0;
408 
409  return buffer;
410}
411
412/* Special escape routine for escaping strings in array constants: double quote, backslash,newline, tab*/
413static inline char *escape_tag( char *ptr, const char *in, int escape )
414{
415  while( *in )
416  {
417    switch(*in)
418    {
419      case '"':
420        if( escape ) *ptr++ = '\\';
421        *ptr++ = '\\';
422        *ptr++ = '"';
423        break;
424      case '\\':
425        if( escape ) *ptr++ = '\\';
426        if( escape ) *ptr++ = '\\';
427        *ptr++ = '\\';
428        *ptr++ = '\\';
429        break;
430      case '\n':
431        if( escape ) *ptr++ = '\\';
432        *ptr++ = '\\';
433        *ptr++ = 'n';
434        break;
435      case '\r':
436        if( escape ) *ptr++ = '\\';
437        *ptr++ = '\\';
438        *ptr++ = 'r';
439        break;
440      case '\t':
441        if( escape ) *ptr++ = '\\';
442        *ptr++ = '\\';
443        *ptr++ = 't';
444        break;
445      default:
446        *ptr++ = *in;
447        break;
448    }
449    in++;
450  }
451  return ptr;
452}
453
454/* escape means we return '\N' for copy mode, otherwise we return just NULL */
455char *pgsql_store_tags(struct keyval *tags, int escape)
456{
457  static char *buffer;
458  static int buflen;
459
460  char *ptr;
461  struct keyval *i;
462  int first;
463   
464  int countlist = countList(tags);
465  if( countlist == 0 )
466  {
467    if( escape )
468      return "\\N";
469    else
470      return NULL;
471  }
472   
473  if( buflen <= countlist * 24 ) /* LE so 0 always matches */
474  {
475    buflen = ((countlist * 24) | 4095) + 1;  /* Round up to next page */
476    buffer = realloc( buffer, buflen );
477  }
478_restart:
479
480  ptr = buffer;
481  first = 1;
482  *ptr++ = '{';
483  /* The lists are circular, exit when we reach the head again */
484  for( i=tags->next; i->key; i = i->next )
485  {
486    int maxlen = (strlen(i->key) + strlen(i->value)) * 4;
487    if( (ptr+maxlen-buffer) > (buflen-20) ) /* Almost overflowed? */
488    {
489      buflen <<= 1;
490      buffer = realloc( buffer, buflen );
491     
492      goto _restart;
493    }
494    if( !first ) *ptr++ = ',';
495    *ptr++ = '"';
496    ptr = escape_tag( ptr, i->key, escape );
497    *ptr++ = '"';
498    *ptr++ = ',';
499    *ptr++ = '"';
500    ptr = escape_tag( ptr, i->value, escape );
501    *ptr++ = '"';
502   
503    first=0;
504  }
505 
506  *ptr++ = '}';
507  *ptr++ = 0;
508 
509  return buffer;
510}
511
512/* Decodes a portion of an array literal from postgres */
513/* Argument should point to beginning of literal, on return points to delimiter */
514static const char *decode_upto( const char *src, char *dst )
515{
516  int quoted = (*src == '"');
517  if( quoted ) src++;
518 
519  while( quoted ? (*src != '"') : (*src != ',' && *src != '}') )
520  {
521    if( *src == '\\' )
522    {
523      switch( src[1] )
524      {
525        case 'n': *dst++ = '\n'; break;
526        case 't': *dst++ = '\t'; break;
527        default: *dst++ = src[1]; break;
528      }
529      src+=2;
530    }
531    else
532      *dst++ = *src++;
533  }
534  if( quoted ) src++;
535  *dst = 0;
536  return src;
537}
538
539static void pgsql_parse_tags( const char *string, struct keyval *tags )
540{
541  char key[1024];
542  char val[1024];
543 
544  if( *string == '\0' )
545    return;
546   
547//  fprintf( stderr, "Parsing: %s\n", string );
548  if( *string++ != '{' )
549    return;
550  while( *string != '}' )
551  {
552    string = decode_upto( string, key );
553    /* String points to the comma */
554    string++;
555    string = decode_upto( string, val );
556    /* String points to the comma or closing '}' */
557    addItem( tags, key, val, 0 );
558//    fprintf( stderr, "Extracted item: %s=%s\n", key, val );
559    if( *string == ',' )
560      string++;
561  }
562}
563
564/* Parses an array of integers */
565static void pgsql_parse_nodes(const char *src, osmid_t *nds, int nd_count )
566{
567  int count = 0;
568  const char *string = src;
569 
570  if( *string++ != '{' )
571    return;
572  while( *string != '}' )
573  {
574    char *ptr;
575    nds[count] = strtoosmid( string, &ptr, 10 );
576    string = ptr;
577    if( *string == ',' )
578      string++;
579    count++;
580  }
581  if( count != nd_count )
582  {
583    fprintf( stderr, "parse_nodes problem: '%s' expected %d got %d\n", src, nd_count, count );
584    exit_nicely();
585  }
586}
587
588static int pgsql_endCopy( struct table_desc *table)
589{
590    /* Terminate any pending COPY */
591     if (table->copyMode) {
592        PGconn *sql_conn = table->sql_conn;
593        int stop = PQputCopyEnd(sql_conn, NULL);
594        if (stop != 1) {
595            fprintf(stderr, "COPY_END for %s failed: %s\n", table->copy, PQerrorMessage(sql_conn));
596            exit_nicely();
597        }
598
599        PGresult *res = PQgetResult(sql_conn);
600        if (PQresultStatus(res) != PGRES_COMMAND_OK) {
601            fprintf(stderr, "COPY_END for %s failed: %s\n", table->copy, PQerrorMessage(sql_conn));
602            PQclear(res);
603            exit_nicely();
604        }
605        PQclear(res);
606        table->copyMode = 0;
607    }
608    return 0;
609}
610
611static int pgsql_nodes_set(osmid_t id, double lat, double lon, struct keyval *tags)
612{
613    /* Four params: id, lat, lon, tags */
614    char *paramValues[4];
615    char *buffer;
616
617    pgsql_ram_nodes_set( id, lat, lon, tags );
618    if( node_table->copyMode )
619    {
620      char *tag_buf = pgsql_store_tags(tags,1);
621      int length = strlen(tag_buf) + 64;
622      buffer = alloca( length );
623#ifdef FIXED_POINT
624      if( snprintf( buffer, length, "%" PRIdOSMID "\t%d\t%d\t%s\n", id, DOUBLE_TO_FIX(lat), DOUBLE_TO_FIX(lon), tag_buf ) > (length-10) )
625      { fprintf( stderr, "buffer overflow node id %" PRIdOSMID "\n", id); return 1; }
626#else
627      if( snprintf( buffer, length, "%" PRIdOSMID "\t%.10f\t%.10f\t%s\n", id, lat, lon, tag_buf ) > (length-10) )
628      { fprintf( stderr, "buffer overflow node id %" PRIdOSMID "\n", id); return 1; }
629#endif
630      return pgsql_CopyData(__FUNCTION__, node_table->sql_conn, buffer);
631    }
632    buffer = alloca(64);
633    paramValues[0] = buffer;
634    paramValues[1] = paramValues[0] + sprintf( paramValues[0], "%" PRIdOSMID, id ) + 1;
635#ifdef FIXED_POINT
636    paramValues[2] = paramValues[1] + sprintf( paramValues[1], "%d", DOUBLE_TO_FIX(lat) ) + 1;
637    sprintf( paramValues[2], "%d", DOUBLE_TO_FIX(lon) );
638#else
639    paramValues[2] = paramValues[1] + sprintf( paramValues[1], "%.10f", lat ) + 1;
640    sprintf( paramValues[2], "%.10f", lon );
641#endif
642    paramValues[3] = pgsql_store_tags(tags,0);
643    pgsql_execPrepared(node_table->sql_conn, "insert_node", 4, (const char * const *)paramValues, PGRES_COMMAND_OK);
644    return 0;
645}
646
647
648static int pgsql_nodes_get(struct osmNode *out, osmid_t id)
649{
650    /* Check cache first */
651    if( pgsql_ram_nodes_get( out, id ) == 0 )
652      return 0;
653     
654    PGresult   *res;
655    char tmp[16];
656    char const *paramValues[1];
657    PGconn *sql_conn = node_table->sql_conn;
658
659    /* Make sure we're out of copy mode */
660    pgsql_endCopy( node_table );
661
662    snprintf(tmp, sizeof(tmp), "%" PRIdOSMID, id);
663    paramValues[0] = tmp;
664 
665    res = pgsql_execPrepared(sql_conn, "get_node", 1, paramValues, PGRES_TUPLES_OK);
666
667    if (PQntuples(res) != 1) {
668        PQclear(res);
669        return 1;
670    } 
671
672#ifdef FIXED_POINT
673    out->lat = FIX_TO_DOUBLE(strtol(PQgetvalue(res, 0, 0), NULL, 10));
674    out->lon = FIX_TO_DOUBLE(strtol(PQgetvalue(res, 0, 1), NULL, 10));
675#else
676    out->lat = strtod(PQgetvalue(res, 0, 0), NULL);
677    out->lon = strtod(PQgetvalue(res, 0, 1), NULL);
678#endif
679    PQclear(res);
680    return 0;
681}
682
683/* This should be made more efficient by using an IN(ARRAY[]) construct */
684static int pgsql_nodes_get_list(struct osmNode *nodes, osmid_t *ndids, int nd_count)
685{
686    char tmp[16];
687    char *tmp2; 
688    int count, count2, countDB, countPG, i,j;
689    osmid_t id;
690    osmid_t *ndidspg;
691    struct osmNode *nodespg;
692    struct osmNode *nodes2;
693    char const *paramValues[1]; 
694
695    PGresult *res;
696    PGconn *sql_conn = node_table->sql_conn;
697   
698    count = 0; countDB = 0;
699
700    tmp2 = malloc(sizeof(char)*nd_count*16);
701    if (tmp2 == NULL) return 0; /*failed to allocate memory, return */
702
703    /* create a list of ids in tmp2 to query the database  */
704    snprintf(tmp2, sizeof(tmp2), "{");
705    for( i=0; i<nd_count; i++ ) {
706        /* Check cache first */ 
707        if( pgsql_ram_nodes_get( &nodes[i], ndids[i]) == 0 ) {
708            count++;
709            continue;
710        }
711        countDB++;
712        /* Mark nodes as needing to be fetched from the DB */
713        nodes[i].lat = NAN;
714        nodes[i].lon = NAN;
715        snprintf(tmp, sizeof(tmp), "%" PRIdOSMID ",", ndids[i]);
716        strcat(tmp2,tmp);
717    }
718    tmp2[strlen(tmp2) - 1] = '}'; /* replace last , with } to complete list of ids*/
719 
720    if (countDB == 0) {
721        free(tmp2);
722        return count; /* All ids where in cache, so nothing more to do */
723    }
724 
725    pgsql_endCopy(node_table); 
726
727    paramValues[0] = tmp2; 
728    res = pgsql_execPrepared(sql_conn, "get_node_list", 1, paramValues, PGRES_TUPLES_OK);
729    countPG = PQntuples(res);
730
731    ndidspg = malloc(sizeof(osmid_t)*countPG);
732    nodespg = malloc(sizeof(struct osmNode)*countPG);
733
734    if ((ndidspg == NULL) || (nodespg == NULL)) {
735        free(tmp2);
736        free(ndidspg);
737        free(nodespg);
738        PQclear(res);
739        return 0;
740    }
741
742    for (i = 0; i < countPG; i++) {
743        ndidspg[i] = strtoosmid(PQgetvalue(res, i, 0), NULL, 10); 
744#ifdef FIXED_POINT
745        nodespg[i].lat = FIX_TO_DOUBLE(strtol(PQgetvalue(res, i, 1), NULL, 10)); 
746        nodespg[i].lon = FIX_TO_DOUBLE(strtol(PQgetvalue(res, i, 2), NULL, 10)); 
747#else
748        nodespg[i].lat = strtod(PQgetvalue(res, i, 1), NULL); 
749        nodespg[i].lon = strtod(PQgetvalue(res, i, 2), NULL); 
750#endif
751    }
752 
753 
754    /* The list of results coming back from the db is in a different order to the list of nodes in the way.
755       Match the results back to the way node list */
756   
757    for (i=0; i<nd_count; i++ ) {
758        if ((isnan(nodes[i].lat)) || (isnan(nodes[i].lon))) {
759            /* TODO: implement an O(log(n)) algorithm to match node ids */
760            for (j = 0; j < countPG; j++) {
761                if (ndidspg[j] == ndids[i]) {
762                    nodes[i].lat = nodespg[j].lat;
763                    nodes[i].lon = nodespg[j].lon;
764                    count++;
765                    break;
766                }
767            }
768        }
769    }
770
771    /* If some of the nodes in the way don't exist, the returning list has holes.
772       As the rest of the code expects a continous list, it needs to be re-compacted */
773    if (count != nd_count) {
774        j = 0;
775        for (i = 0; i < nd_count; i++) {
776            if ( !isnan(nodes[i].lat)) {
777                nodes[j].lat = nodes[i].lat;
778                nodes[j].lon = nodes[i].lon;
779                j++;
780            }
781         }
782    }
783
784    PQclear(res);
785    free(tmp2);
786    free(ndidspg);
787    free(nodespg);
788
789    return count;
790}
791
792static int pgsql_nodes_delete(osmid_t osm_id)
793{
794    char const *paramValues[1];
795    char buffer[64];
796    /* Make sure we're out of copy mode */
797    pgsql_endCopy( node_table );
798   
799    sprintf( buffer, "%" PRIdOSMID, osm_id );
800    paramValues[0] = buffer;
801    pgsql_execPrepared(node_table->sql_conn, "delete_node", 1, paramValues, PGRES_COMMAND_OK );
802    return 0;
803}
804
805static int pgsql_node_changed(osmid_t osm_id)
806{
807    char const *paramValues[1];
808    char buffer[64];
809    /* Make sure we're out of copy mode */
810    pgsql_endCopy( way_table );
811    pgsql_endCopy( rel_table );
812   
813    sprintf( buffer, "%" PRIdOSMID, osm_id );
814    paramValues[0] = buffer;
815    pgsql_execPrepared(way_table->sql_conn, "node_changed_mark", 1, paramValues, PGRES_COMMAND_OK );
816    pgsql_execPrepared(rel_table->sql_conn, "node_changed_mark", 1, paramValues, PGRES_COMMAND_OK );
817    return 0;
818}
819
820static int pgsql_ways_set(osmid_t way_id, osmid_t *nds, int nd_count, struct keyval *tags, int pending)
821{
822    /* Three params: id, nodes, tags, pending */
823    char *paramValues[4];
824    char *buffer;
825
826    if( way_table->copyMode )
827    {
828      char *tag_buf = pgsql_store_tags(tags,1);
829      char *node_buf = pgsql_store_nodes(nds, nd_count);
830      int length = strlen(tag_buf) + strlen(node_buf) + 64;
831      buffer = alloca(length);
832      if( snprintf( buffer, length, "%" PRIdOSMID "\t%s\t%s\t%c\n", 
833              way_id, node_buf, tag_buf, pending?'t':'f' ) > (length-10) )
834      { fprintf( stderr, "buffer overflow way id %" PRIdOSMID "\n", way_id); return 1; }
835      return pgsql_CopyData(__FUNCTION__, way_table->sql_conn, buffer);
836    }
837    buffer = alloca(64);
838    paramValues[0] = buffer;
839    paramValues[3] = paramValues[0] + sprintf( paramValues[0], "%" PRIdOSMID, way_id ) + 1;
840    sprintf( paramValues[3], "%c", pending?'t':'f' );
841    paramValues[1] = pgsql_store_nodes(nds, nd_count);
842    paramValues[2] = pgsql_store_tags(tags,0);
843    pgsql_execPrepared(way_table->sql_conn, "insert_way", 4, (const char * const *)paramValues, PGRES_COMMAND_OK);
844    return 0;
845}
846
847/* Caller is responsible for freeing nodesptr & resetList(tags) */
848static int pgsql_ways_get(osmid_t id, struct keyval *tags, struct osmNode **nodes_ptr, int *count_ptr)
849{
850    PGresult   *res;
851    char tmp[16];
852    char const *paramValues[1];
853    PGconn *sql_conn = way_table->sql_conn;
854
855    /* Make sure we're out of copy mode */
856    pgsql_endCopy( way_table );
857
858    snprintf(tmp, sizeof(tmp), "%" PRIdOSMID, id);
859    paramValues[0] = tmp;
860 
861    res = pgsql_execPrepared(sql_conn, "get_way", 1, paramValues, PGRES_TUPLES_OK);
862
863    if (PQntuples(res) != 1) {
864        PQclear(res);
865        return 1;
866    } 
867
868    pgsql_parse_tags( PQgetvalue(res, 0, 1), tags );
869
870    int num_nodes = strtol(PQgetvalue(res, 0, 2), NULL, 10);
871    osmid_t *list = alloca(sizeof(osmid_t)*num_nodes );
872    *nodes_ptr = malloc(sizeof(struct osmNode) * num_nodes);
873    pgsql_parse_nodes( PQgetvalue(res, 0, 0), list, num_nodes);
874   
875    *count_ptr = pgsql_nodes_get_list( *nodes_ptr, list, num_nodes);
876    PQclear(res);
877    return 0;
878}
879
880static int pgsql_ways_done(osmid_t id)
881{
882    char tmp[16];
883    char const *paramValues[1];
884    PGconn *sql_conn = way_table->sql_conn;
885
886    /* Make sure we're out of copy mode */
887    pgsql_endCopy( way_table );
888
889    snprintf(tmp, sizeof(tmp), "%" PRIdOSMID, id);
890    paramValues[0] = tmp;
891 
892    pgsql_execPrepared(sql_conn, "way_done", 1, paramValues, PGRES_COMMAND_OK);
893
894    return 0;
895}
896
897static int pgsql_ways_delete(osmid_t osm_id)
898{
899    char const *paramValues[1];
900    char buffer[64];
901    /* Make sure we're out of copy mode */
902    pgsql_endCopy( way_table );
903   
904    sprintf( buffer, "%" PRIdOSMID, osm_id );
905    paramValues[0] = buffer;
906    pgsql_execPrepared(way_table->sql_conn, "delete_way", 1, paramValues, PGRES_COMMAND_OK );
907    return 0;
908}
909
910static void pgsql_iterate_ways(int (*callback)(osmid_t id, struct keyval *tags, struct osmNode *nodes, int count, int exists))
911{
912    PGresult   *res_ways;
913    int i, count = 0;
914    /* The flag we pass to indicate that the way in question might exist already in the database */
915    int exists = Append;
916
917    time_t start, end;
918    time(&start);
919    fprintf(stderr, "\nGoing over pending ways\n");
920
921    /* Make sure we're out of copy mode */
922    pgsql_endCopy( way_table );
923   
924    res_ways = pgsql_execPrepared(way_table->sql_conn, "pending_ways", 0, NULL, PGRES_TUPLES_OK);
925
926    //fprintf(stderr, "\nIterating ways\n");
927    for (i = 0; i < PQntuples(res_ways); i++) {
928        osmid_t id = strtoosmid(PQgetvalue(res_ways, i, 0), NULL, 10);
929        struct keyval tags;
930        struct osmNode *nodes;
931        int nd_count;
932
933        if (count++ %1000 == 0) {
934            time(&end);
935            fprintf(stderr, "\rprocessing way (%dk) at %.2fk/s", count/1000,
936            (int)(end - start) > 0 ? ((double)count / 1000.0 / (double)(end - start)) : 0);
937        }
938
939        initList(&tags);
940        if( pgsql_ways_get(id, &tags, &nodes, &nd_count) )
941          continue;
942         
943        callback(id, &tags, nodes, nd_count, exists);
944        pgsql_ways_done( id );
945
946        free(nodes);
947        resetList(&tags);
948    }
949
950    PQclear(res_ways);
951    fprintf(stderr, "\n");
952    time(&end);
953    if (end - start > 0)
954        fprintf(stderr, "Pending ways took %ds at a rate of %.2f/s\n",(int)(end - start), ((double)count / (double)(end - start)));
955}
956
957static int pgsql_way_changed(osmid_t osm_id)
958{
959    char const *paramValues[1];
960    char buffer[64];
961    /* Make sure we're out of copy mode */
962    pgsql_endCopy( rel_table );
963   
964    sprintf( buffer, "%" PRIdOSMID, osm_id );
965    paramValues[0] = buffer;
966    pgsql_execPrepared(rel_table->sql_conn, "way_changed_mark", 1, paramValues, PGRES_COMMAND_OK );
967    return 0;
968}
969
970static int pgsql_rels_set(osmid_t id, struct member *members, int member_count, struct keyval *tags)
971{
972    /* Params: id, way_off, rel_off, parts, members, tags */
973    char *paramValues[6];
974    char *buffer;
975    int i;
976    struct keyval member_list;
977   
978    osmid_t node_parts[member_count],
979            way_parts[member_count],
980            rel_parts[member_count];
981    int node_count = 0, way_count = 0, rel_count = 0;
982   
983    osmid_t all_parts[member_count];
984    int all_count = 0;
985    initList( &member_list );   
986    for( i=0; i<member_count; i++ )
987    {
988      char tag = 0;
989      switch( members[i].type )
990      {
991        case OSMTYPE_NODE:     node_parts[node_count++] = members[i].id; tag = 'n'; break;
992        case OSMTYPE_WAY:      way_parts[way_count++] = members[i].id; tag = 'w'; break;
993        case OSMTYPE_RELATION: rel_parts[rel_count++] = members[i].id; tag = 'r'; break;
994        default: fprintf( stderr, "Internal error: Unknown member type %d\n", members[i].type ); exit_nicely();
995      }
996      char buf[64];
997      sprintf( buf, "%c%" PRIdOSMID, tag, members[i].id );
998      addItem( &member_list, buf, members[i].role, 0 );
999    }
1000    memcpy( all_parts+all_count, node_parts, node_count*sizeof(int) ); all_count+=node_count;
1001    memcpy( all_parts+all_count, way_parts, way_count*sizeof(int) ); all_count+=way_count;
1002    memcpy( all_parts+all_count, rel_parts, rel_count*sizeof(int) ); all_count+=rel_count;
1003 
1004    if( rel_table->copyMode )
1005    {
1006      char *tag_buf = strdup(pgsql_store_tags(tags,1));
1007      char *member_buf = pgsql_store_tags(&member_list,1);
1008      char *parts_buf = pgsql_store_nodes(all_parts, all_count);
1009      int length = strlen(member_buf) + strlen(tag_buf) + strlen(parts_buf) + 64;
1010      buffer = alloca(length);
1011      if( snprintf( buffer, length, "%" PRIdOSMID "\t%d\t%d\t%s\t%s\t%s\tf\n", 
1012              id, node_count, node_count+way_count, parts_buf, member_buf, tag_buf ) > (length-10) )
1013      { fprintf( stderr, "buffer overflow relation id %" PRIdOSMID "\n", id); return 1; }
1014      free(tag_buf);
1015      resetList(&member_list);
1016      return pgsql_CopyData(__FUNCTION__, rel_table->sql_conn, buffer);
1017    }
1018    buffer = alloca(64);
1019    paramValues[0] = buffer;
1020    paramValues[1] = paramValues[0] + sprintf( paramValues[0], "%" PRIdOSMID, id ) + 1;
1021    paramValues[2] = paramValues[1] + sprintf( paramValues[1], "%d", node_count ) + 1;
1022    sprintf( paramValues[2], "%d", node_count+way_count );
1023    paramValues[3] = pgsql_store_nodes(all_parts, all_count);
1024    paramValues[4] = pgsql_store_tags(&member_list,0);
1025    if( paramValues[4] )
1026        paramValues[4] = strdup(paramValues[4]);
1027    paramValues[5] = pgsql_store_tags(tags,0);
1028    pgsql_execPrepared(rel_table->sql_conn, "insert_rel", 6, (const char * const *)paramValues, PGRES_COMMAND_OK);
1029    if( paramValues[4] )
1030        free(paramValues[4]);
1031    resetList(&member_list);
1032    return 0;
1033}
1034
1035/* Caller is responsible for freeing members & resetList(tags) */
1036static int pgsql_rels_get(osmid_t id, struct member **members, int *member_count, struct keyval *tags)
1037{
1038    PGresult   *res;
1039    char tmp[16];
1040    char const *paramValues[1];
1041    PGconn *sql_conn = rel_table->sql_conn;
1042    struct keyval member_temp;
1043
1044    /* Make sure we're out of copy mode */
1045    pgsql_endCopy( rel_table );
1046
1047    snprintf(tmp, sizeof(tmp), "%" PRIdOSMID, id);
1048    paramValues[0] = tmp;
1049 
1050    res = pgsql_execPrepared(sql_conn, "get_rel", 1, paramValues, PGRES_TUPLES_OK);
1051    /* Fields are: members, tags, member_count */
1052
1053    if (PQntuples(res) != 1) {
1054        PQclear(res);
1055        return 1;
1056    } 
1057
1058    pgsql_parse_tags( PQgetvalue(res, 0, 1), tags );
1059    initList(&member_temp);
1060    pgsql_parse_tags( PQgetvalue(res, 0, 0), &member_temp );
1061
1062    int num_members = strtol(PQgetvalue(res, 0, 2), NULL, 10);
1063    struct member *list = malloc( sizeof(struct member)*num_members );
1064   
1065    int i=0;
1066    struct keyval *item;
1067    while( (item = popItem(&member_temp)) )
1068    {
1069        if( i >= num_members )
1070        {
1071            fprintf(stderr, "Unexpected member_count reading relation %" PRIdOSMID "\n", id);
1072            exit_nicely();
1073        }
1074        char tag = item->key[0];
1075        list[i].type = (tag == 'n')?OSMTYPE_NODE:(tag == 'w')?OSMTYPE_WAY:(tag == 'r')?OSMTYPE_RELATION:-1;
1076        list[i].id = strtoosmid(item->key+1, NULL, 10 );
1077        list[i].role = strdup( item->value );
1078        freeItem(item);
1079        i++;
1080    }
1081    *members = list;
1082    *member_count = num_members;
1083    PQclear(res);
1084    return 0;
1085}
1086
1087static int pgsql_rels_done(osmid_t id)
1088{
1089    char tmp[16];
1090    char const *paramValues[1];
1091    PGconn *sql_conn = rel_table->sql_conn;
1092
1093    /* Make sure we're out of copy mode */
1094    pgsql_endCopy( rel_table );
1095
1096    snprintf(tmp, sizeof(tmp), "%" PRIdOSMID, id);
1097    paramValues[0] = tmp;
1098 
1099    pgsql_execPrepared(sql_conn, "rel_done", 1, paramValues, PGRES_COMMAND_OK);
1100
1101    return 0;
1102}
1103
1104static int pgsql_rels_delete(osmid_t osm_id)
1105{
1106    char const *paramValues[1];
1107    char buffer[64];
1108    /* Make sure we're out of copy mode */
1109    pgsql_endCopy( rel_table );
1110   
1111    sprintf( buffer, "%" PRIdOSMID, osm_id );
1112    paramValues[0] = buffer;
1113    pgsql_execPrepared(rel_table->sql_conn, "delete_rel", 1, paramValues, PGRES_COMMAND_OK );
1114    return 0;
1115}
1116
1117static void pgsql_iterate_relations(int (*callback)(osmid_t id, struct member *members, int member_count, struct keyval *tags, int exists))
1118{
1119    PGresult   *res_rels;
1120    int i, count = 0;
1121    /* The flag we pass to indicate that the way in question might exist already in the database */
1122    int exists = Append;
1123
1124    time_t start, end;
1125    time(&start);
1126    fprintf(stderr, "\nGoing over pending relations\n");
1127
1128    /* Make sure we're out of copy mode */
1129    pgsql_endCopy( rel_table );
1130   
1131    res_rels = pgsql_execPrepared(rel_table->sql_conn, "pending_rels", 0, NULL, PGRES_TUPLES_OK);
1132
1133    //fprintf(stderr, "\nIterating ways\n");
1134    for (i = 0; i < PQntuples(res_rels); i++) {
1135        osmid_t id = strtoosmid(PQgetvalue(res_rels, i, 0), NULL, 10);
1136        struct keyval tags;
1137        struct member *members;
1138        int member_count;
1139
1140        if (count++ %10 == 0) {
1141            time(&end);
1142            fprintf(stderr, "\rprocessing relation (%d) at %.2f/s", count,
1143                    (int)(end - start) > 0 ? ((double)count / (double)(end - start)) : 0);
1144        }
1145
1146        initList(&tags);
1147        if( pgsql_rels_get(id, &members, &member_count, &tags) )
1148          continue;
1149         
1150        callback(id, members, member_count, &tags, exists);
1151        pgsql_rels_done( id );
1152
1153        free(members);
1154        resetList(&tags);
1155    }
1156
1157    PQclear(res_rels);
1158    fprintf(stderr, "\n");
1159    time(&end);
1160    if (end - start > 0)
1161        fprintf(stderr, "Pending relations took %ds at a rate of %.2f/s\n",(int)(end - start), ((double)count / (double)(end - start)));
1162}
1163
1164static int pgsql_rel_changed(osmid_t osm_id)
1165{
1166    char const *paramValues[1];
1167    char buffer[64];
1168    /* Make sure we're out of copy mode */
1169    pgsql_endCopy( rel_table );
1170   
1171    sprintf( buffer, "%" PRIdOSMID, osm_id );
1172    paramValues[0] = buffer;
1173    pgsql_execPrepared(rel_table->sql_conn, "rel_changed_mark", 1, paramValues, PGRES_COMMAND_OK );
1174    return 0;
1175}
1176
1177static void pgsql_analyze(void)
1178{
1179    int i;
1180
1181    for (i=0; i<num_tables; i++) {
1182        PGconn *sql_conn = tables[i].sql_conn;
1183 
1184        if (tables[i].analyze) {
1185            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", tables[i].analyze);
1186        }
1187    }
1188}
1189
1190static void pgsql_end(void)
1191{
1192    int i;
1193
1194    for (i=0; i<num_tables; i++) {
1195        PGconn *sql_conn = tables[i].sql_conn;
1196 
1197        // Commit transaction
1198        if (tables[i].stop) {
1199            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", tables[i].stop);
1200        }
1201
1202    }
1203}
1204
1205/**
1206 * Helper to create SQL queries.
1207 *
1208 * Takes four arguments:
1209 * prefix, t (tablespace) and i (index tablespace) - these may be NULL
1210 * a pointer to an input string which, on success, is changed to point to the
1211 * result (caller takes ownership).
1212 *
1213 * The input string is mangled as follows:
1214 * %p replaced by the given prefix,
1215 * %i replaced by the given index tablespace
1216 * %t replaced by the given tablespace
1217 * other occurrences of the "%" char are treated normally.
1218 * any occurrence of { or } will be ignored (not copied to output string);
1219 * anything inside {} is only copied if it contained at least one of
1220 * %p, %i, %t that was not NULL.
1221 *
1222 * So, the input string
1223 *    Hello{ dear %i}!
1224 * will, if i is set to "John", translate to
1225 *    Hello dear John!
1226 * but if i is unset, translate to
1227 *    Hello!
1228 *
1229 * This is used for constructing SQL queries with proper tablespace settings.
1230 */
1231static inline void set_prefix_and_tbls(const char *prefix, const char *t, const char *i, const char **string)
1232{
1233    char buffer[1024];
1234    if (*string == NULL) return;
1235    const char *source = *string;
1236    char *dest = buffer;
1237    char *openbrace = NULL;
1238    int copied = 0;
1239
1240    while (*source) {
1241        if (*source == '{') {
1242            openbrace = dest;
1243            copied = 0;
1244            source++;
1245            continue;
1246        } else if (*source == '}') {
1247            if (!copied && openbrace) dest = openbrace;
1248            source++;
1249            continue;
1250        } else if (*source == '%') {
1251            if (*(source+1) == 'p') {
1252                if (prefix) {
1253                    strcpy(dest, prefix);
1254                    dest += strlen(prefix);
1255                    copied = 1;
1256                }
1257                source+=2;
1258                continue;
1259            } else if (*(source+1) == 't') {
1260                if (t) {
1261                    strcpy(dest, t);
1262                    dest += strlen(t);
1263                    copied = 1;
1264                }
1265                source+=2;
1266                continue;
1267            } else if (*(source+1) == 'i') {
1268                if (i) {
1269                    strcpy(dest, i);
1270                    dest += strlen(i);
1271                    copied = 1;
1272                }
1273                source+=2;
1274                continue;
1275            }
1276        }
1277        *(dest++) = *(source++);
1278    }
1279    *dest = 0;
1280    *string = strdup(buffer);
1281}
1282
1283static int build_indexes;
1284
1285static int pgsql_start(const struct output_options *options)
1286{
1287    PGresult   *res;
1288    int i;
1289    int dropcreate = !options->append;
1290
1291    scale = options->scale;
1292    Append = options->append;
1293   
1294    /* How much we can fit, and make sure it's odd */
1295    maxBlocks = (options->cache*((1024*1024)/(PER_BLOCK*sizeof(struct ramNode)))) | 1;
1296    queue = malloc( maxBlocks * sizeof(struct ramNodeBlock) );   
1297   
1298#ifdef __MINGW_H
1299    fprintf( stderr, "Mid: pgsql, scale=%d, cache=%dMB, maxblocks=%d*%d\n", scale, options->cache, maxBlocks, PER_BLOCK*sizeof(struct ramNode) ); 
1300#else
1301    fprintf( stderr, "Mid: pgsql, scale=%d, cache=%dMB, maxblocks=%d*%zd\n", scale, options->cache, maxBlocks, PER_BLOCK*sizeof(struct ramNode) );
1302#endif
1303   
1304    /* We use a connection per table to enable the use of COPY */
1305    for (i=0; i<num_tables; i++) {
1306        PGconn *sql_conn;
1307                       
1308        set_prefix_and_tbls(options->prefix, options->tblsslim_data, options->tblsslim_index, &(tables[i].name));
1309        set_prefix_and_tbls(options->prefix, options->tblsslim_data, options->tblsslim_index, &(tables[i].start));
1310        set_prefix_and_tbls(options->prefix, options->tblsslim_data, options->tblsslim_index, &(tables[i].create));
1311        set_prefix_and_tbls(options->prefix, options->tblsslim_data, options->tblsslim_index, &(tables[i].create_index));
1312        set_prefix_and_tbls(options->prefix, options->tblsslim_data, options->tblsslim_index, &(tables[i].prepare));
1313        set_prefix_and_tbls(options->prefix, options->tblsslim_data, options->tblsslim_index, &(tables[i].prepare_intarray));
1314        set_prefix_and_tbls(options->prefix, options->tblsslim_data, options->tblsslim_index, &(tables[i].copy));
1315        set_prefix_and_tbls(options->prefix, options->tblsslim_data, options->tblsslim_index, &(tables[i].analyze));
1316        set_prefix_and_tbls(options->prefix, options->tblsslim_data, options->tblsslim_index, &(tables[i].stop));
1317        set_prefix_and_tbls(options->prefix, options->tblsslim_data, options->tblsslim_index, &(tables[i].array_indexes));
1318
1319        fprintf(stderr, "Setting up table: %s\n", tables[i].name);
1320        sql_conn = PQconnectdb(options->conninfo);
1321
1322        /* Check to see that the backend connection was successfully made */
1323        if (PQstatus(sql_conn) != CONNECTION_OK) {
1324            fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(sql_conn));
1325            exit_nicely();
1326        }
1327        tables[i].sql_conn = sql_conn;
1328
1329        /* Not really the right place for this test, but we need a live
1330         * connection that not used for anything else yet, and we'd like to
1331         * warn users *before* we start doing mountains of work */
1332        if (i == t_node)
1333        {
1334            res = PQexec(sql_conn, "select 1 from pg_opclass where opcname='gist__intbig_ops'" );
1335            if(PQresultStatus(res) == PGRES_TUPLES_OK && PQntuples(res) == 1)
1336            {
1337                /* intarray is problematic now; causes at least postgres 8.4
1338                 * to not use the index on nodes[]/parts[] which slows diff
1339                 * updates to a crawl!
1340                 * If someone find a way to fix this rather than bow out here,
1341                 * please do.*/
1342
1343                fprintf(stderr, 
1344                    "\n"
1345                    "The target database has the intarray contrib module loaded.\n"
1346                    "While required for earlier versions of osm2pgsql, intarray \n"
1347                    "is now unnecessary and will interfere with osm2pgsql's array\n"
1348                    "handling. Please use a database without intarray.\n\n");
1349                exit_nicely();
1350            }
1351            PQclear(res);
1352
1353            if (options->append)
1354            {
1355                res = PQexec(sql_conn, "select id from planet_osm_nodes limit 1" );
1356                if(PQresultStatus(res) == PGRES_TUPLES_OK && PQntuples(res) == 1)
1357                {
1358                    int size = PQfsize(res, 0);
1359                    if (size != sizeof(osmid_t))
1360                    {
1361                        fprintf(stderr, 
1362                            "\n"
1363                            "The target database has been created with %dbit ID fields,\n"
1364                            "but this version of osm2pgsql has been compiled to use %ldbit IDs.\n"
1365                            "You cannot append data to this database with this program.\n"
1366                            "Either re-create the database or use a matching osm2pgsql.\n\n",
1367                            size * 8, sizeof(osmid_t) * 8);
1368                        exit_nicely();
1369                    }
1370                }
1371                PQclear(res);
1372            }
1373
1374            if(!options->append)
1375                build_indexes = 1;
1376        }
1377        if (dropcreate) {
1378            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "DROP TABLE IF EXISTS %s", tables[i].name);
1379        }
1380
1381        if (tables[i].start) {
1382            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", tables[i].start);
1383        }
1384
1385        if (dropcreate && tables[i].create) {
1386            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", tables[i].create);
1387            if (tables[i].create_index) {
1388              pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", tables[i].create_index);
1389            }
1390        }
1391
1392
1393        if (tables[i].prepare) {
1394            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", tables[i].prepare);
1395        }
1396
1397        if (tables[i].prepare_intarray) {
1398            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", tables[i].prepare_intarray);
1399        }
1400
1401        if (tables[i].copy) {
1402            pgsql_exec(sql_conn, PGRES_COPY_IN, "%s", tables[i].copy);
1403            tables[i].copyMode = 1;
1404        }
1405    }
1406
1407    return 0;
1408}
1409
1410static void *pgsql_stop_one(void *arg)
1411{
1412    struct table_desc *table = arg;
1413    PGconn *sql_conn = table->sql_conn;
1414
1415    fprintf(stderr, "Stopping table: %s\n", table->name);
1416    pgsql_endCopy(table);
1417    if (table->stop) 
1418        pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", table->stop);
1419
1420    if (build_indexes && table->array_indexes) {
1421        char *buffer = (char *) malloc(strlen(table->array_indexes) + 99);
1422        // we need to insert before the TABLESPACE setting, if any
1423        char *insertpos = strstr(table->array_indexes, "TABLESPACE");
1424        if (!insertpos) insertpos = strchr(table->array_indexes, ';');
1425
1426        // automatically insert FASTUPDATE=OFF when creating,
1427        // indexes for PostgreSQL 8.4 and higher
1428        // see http://lists.openstreetmap.org/pipermail/dev/2011-January/021704.html
1429        if (insertpos && PQserverVersion(sql_conn) >= 80400) {
1430            char old = *insertpos;
1431            fprintf(stderr, "Building index on table: %s (fastupdate=off)\n", table->name);
1432            *insertpos = 0; // temporary null byte for following strcpy operation
1433            strcpy(buffer, table->array_indexes);
1434            *insertpos = old; // restore old content
1435            strcat(buffer, " WITH (FASTUPDATE=OFF)");
1436            strcat(buffer, insertpos);
1437        } else {
1438            fprintf(stderr, "Building index on table: %s\n", table->name);
1439            strcpy(buffer, table->array_indexes);
1440        }
1441        pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", buffer);
1442        free(buffer);
1443    }
1444    PQfinish(sql_conn);
1445    table->sql_conn = NULL;
1446    fprintf(stderr, "Stopped table: %s\n", table->name);
1447    return NULL;
1448}
1449
1450static void pgsql_stop(void)
1451{
1452    int i;
1453#ifdef HAVE_PTHREAD
1454    pthread_t threads[num_tables];
1455#endif
1456
1457    fprintf( stderr, "node cache: stored: %" PRIdOSMID "(%.2f%%), storage efficiency: %.2f%%, hit rate: %.2f%%\n", 
1458             storedNodes, 100.0f*storedNodes/totalNodes, 100.0f*storedNodes/(usedBlocks*PER_BLOCK),
1459             100.0f*nodesCacheHits/nodesCacheLookups );
1460         
1461    for( i=0; i<usedBlocks; i++ )
1462    {
1463      free(queue[i]->nodes);
1464      queue[i]->nodes = NULL;
1465    }
1466    free(queue);
1467
1468#ifdef HAVE_PTHREAD
1469    for (i=0; i<num_tables; i++) {
1470        int ret = pthread_create(&threads[i], NULL, pgsql_stop_one, &tables[i]);
1471        if (ret) {
1472            fprintf(stderr, "pthread_create() returned an error (%d)", ret);
1473            exit_nicely();
1474        }
1475    }
1476    for (i=0; i<num_tables; i++) {
1477        int ret = pthread_join(threads[i], NULL);
1478        if (ret) {
1479            fprintf(stderr, "pthread_join() returned an error (%d)", ret);
1480            exit_nicely();
1481        }
1482    }
1483#else
1484    for (i=0; i<num_tables; i++)
1485        pgsql_stop_one(&tables[i]);
1486#endif
1487}
1488 
1489struct middle_t mid_pgsql = {
1490        .start             = pgsql_start,
1491        .stop              = pgsql_stop,
1492        .cleanup           = pgsql_cleanup,
1493        .analyze           = pgsql_analyze,
1494        .end               = pgsql_end,
1495
1496        .nodes_set         = pgsql_nodes_set,
1497#if 0
1498        .nodes_get         = pgsql_nodes_get,
1499#endif
1500        .nodes_get_list    = pgsql_nodes_get_list,
1501        .nodes_delete      = pgsql_nodes_delete,
1502        .node_changed      = pgsql_node_changed,
1503
1504        .ways_set          = pgsql_ways_set,
1505        .ways_get          = pgsql_ways_get,
1506        .ways_done         = pgsql_ways_done,
1507        .ways_delete       = pgsql_ways_delete,
1508        .way_changed       = pgsql_way_changed,
1509
1510        .relations_set     = pgsql_rels_set,
1511#if 0
1512        .relations_get     = pgsql_rels_get,
1513#endif
1514        .relations_done    = pgsql_rels_done,
1515        .relations_delete  = pgsql_rels_delete,
1516        .relation_changed  = pgsql_rel_changed,
1517#if 0
1518        .iterate_nodes     = pgsql_iterate_nodes,
1519#endif
1520        .iterate_ways      = pgsql_iterate_ways,
1521        .iterate_relations = pgsql_iterate_relations
1522};
Note: See TracBrowser for help on using the repository browser.