source: subversion/applications/utils/export/osm2pgsql/middle-pgsql.c @ 26711

Last change on this file since 26711 was 26665, checked in by apmon, 8 years ago

[osm2pgsql] allocate node cache in one big chunk instead of small pieces

This is a patch by Hartmut Holzgraefe ( https://github.com/hholzgra/osm2pgsql/commit/9f05f1b925751eca657bbd7fa6cbac937f7756d2 ),
adapted to add a command-line switch to revert to the old behavior

Allocating the node cache in small chunks causes the memory not to be returned to the operating system after it is free()ed,
as it is using a different internal allocation strategy (not mmap based).

With the new (default) behavior the entire node cache is allocated in one single malloc call.

At least on Linux, physical memory is still only allocated in chunks when it is needed and only virtual memory is reserved in one chunk.
So apart from the correct freeing of memory during the indexing stage, behavior doesn't change.
Other operating systems might behave differently though, which is why the old strategy can still be selected via a command line switch.

File size: 50.9 KB
Line 
1/* Implements the mid-layer processing for osm2pgsql
2 * using several PostgreSQL tables
3 *
4 * This layer stores data read in from the planet.osm file
5 * and is then read by the backend processing code to
6 * emit the final geometry-enabled output formats
7*/
8 
9#include "config.h"
10
11#include <stdio.h>
12#include <unistd.h>
13#include <stdlib.h>
14#include <string.h>
15#include <assert.h>
16#include <math.h>
17#include <time.h>
18
19#ifdef HAVE_PTHREAD
20#include <pthread.h>
21#endif
22
23#include <libpq-fe.h>
24
25#include "osmtypes.h"
26#include "middle.h"
27#include "middle-pgsql.h"
28#include "output-pgsql.h"
29#include "pgsql.h"
30
31/* Store +-20,000km Mercator co-ordinates as fixed point 32bit number with maximum precision */
32/* Scale is chosen such that 40,000 * SCALE < 2^32          */
33#define FIXED_POINT
34
35static int scale = 100;
36#define DOUBLE_TO_FIX(x) ((int)((x) * scale))
37#define FIX_TO_DOUBLE(x) (((double)x) / scale)
38
39
40
41enum table_id {
42    t_node, t_way, t_rel
43} ;
44
45struct table_desc {
46    //enum table_id table;
47    const char *name;
48    const char *start;
49    const char *create;
50    const char *create_index;
51    const char *prepare;
52    const char *prepare_intarray;
53    const char *copy;
54    const char *analyze;
55    const char *stop;
56    const char *array_indexes;
57
58    int copyMode;    /* True if we are in copy mode */
59    PGconn *sql_conn;
60};
61
62static struct table_desc tables [] = {
63    { 
64        //table = t_node,
65         .name = "%p_nodes",
66        .start = "BEGIN;\n",
67#ifdef FIXED_POINT
68       .create = "CREATE TABLE %p_nodes (id " POSTGRES_OSMID_TYPE " PRIMARY KEY {USING INDEX TABLESPACE %i}, lat int4 not null, lon int4 not null, tags text[]) {TABLESPACE %t};\n",
69      .prepare = "PREPARE insert_node (" POSTGRES_OSMID_TYPE ", int4, int4, text[]) AS INSERT INTO %p_nodes VALUES ($1,$2,$3,$4);\n"
70#else
71       .create = "CREATE TABLE %p_nodes (id " POSTGRES_OSMID_TYPE " PRIMARY KEY {USING INDEX TABLESPACE %i}, lat double precision not null, lon double precision not null, tags text[]) {TABLESPACE %t};\n",
72      .prepare = "PREPARE insert_node (" POSTGRES_OSMID_TYPE ", double precision, double precision, text[]) AS INSERT INTO %p_nodes VALUES ($1,$2,$3,$4);\n"
73#endif
74               "PREPARE get_node (" POSTGRES_OSMID_TYPE ") AS SELECT lat,lon,tags FROM %p_nodes WHERE id = $1 LIMIT 1;\n"
75               "PREPARE delete_node (" POSTGRES_OSMID_TYPE ") AS DELETE FROM %p_nodes WHERE id = $1;\n",
76.prepare_intarray = // This is to fetch lots of nodes simultaneously, in order including duplicates. The commented out version doesn't do duplicates
77                  // It's not optimal as it does a Nested Loop / Index Scan which is suboptimal for large arrays
78                  //"PREPARE get_node_list(int[]) AS SELECT id, lat, lon FROM %p_nodes WHERE id = ANY($1::int4[]) ORDER BY $1::int4[] # id\n",
79                 //"PREPARE get_node_list(" POSTGRES_OSMID_TYPE "[]) AS select y.id, y.lat, y.lon from (select i, ($1)[i] as l_id from (select generate_series(1,array_length($1, 1)) as i) x) z, "
80                 //                                    "(select * from %p_nodes where id = ANY($1)) y where l_id=id order by i;\n",
81                 "PREPARE get_node_list(" POSTGRES_OSMID_TYPE "[]) AS SELECT id, lat, lon FROM %p_nodes WHERE id = ANY($1::" POSTGRES_OSMID_TYPE "[])",
82         .copy = "COPY %p_nodes FROM STDIN;\n",
83      .analyze = "ANALYZE %p_nodes;\n",
84         .stop = "COMMIT;\n"
85    },
86    { 
87        //table = t_way,
88         .name = "%p_ways",
89        .start = "BEGIN;\n",
90       .create = "CREATE TABLE %p_ways (id " POSTGRES_OSMID_TYPE " PRIMARY KEY {USING INDEX TABLESPACE %i}, nodes " POSTGRES_OSMID_TYPE "[] not null, tags text[], pending boolean not null) {TABLESPACE %t};\n",
91 .create_index = "CREATE INDEX %p_ways_idx ON %p_ways (id) {TABLESPACE %i} WHERE pending;\n",
92.array_indexes = "CREATE INDEX %p_ways_nodes ON %p_ways USING gin (nodes) {TABLESPACE %i};\n",
93      .prepare = "PREPARE insert_way (" POSTGRES_OSMID_TYPE ", " POSTGRES_OSMID_TYPE "[], text[], boolean) AS INSERT INTO %p_ways VALUES ($1,$2,$3,$4);\n"
94               "PREPARE get_way (" POSTGRES_OSMID_TYPE ") AS SELECT nodes, tags, array_upper(nodes,1) FROM %p_ways WHERE id = $1;\n"
95               "PREPARE way_done(" POSTGRES_OSMID_TYPE ") AS UPDATE %p_ways SET pending = false WHERE id = $1;\n"
96               "PREPARE pending_ways AS SELECT id FROM %p_ways WHERE pending;\n"
97               "PREPARE delete_way(" POSTGRES_OSMID_TYPE ") AS DELETE FROM %p_ways WHERE id = $1;\n",
98.prepare_intarray = "PREPARE node_changed_mark(" POSTGRES_OSMID_TYPE ") AS UPDATE %p_ways SET pending = true WHERE nodes && ARRAY[$1] AND NOT pending;\n",
99         .copy = "COPY %p_ways FROM STDIN;\n",
100      .analyze = "ANALYZE %p_ways;\n",
101         .stop =  "COMMIT;\n"
102    },
103    { 
104        //table = t_rel,
105         .name = "%p_rels",
106        .start = "BEGIN;\n",
107       .create = "CREATE TABLE %p_rels(id " POSTGRES_OSMID_TYPE " PRIMARY KEY {USING INDEX TABLESPACE %i}, way_off int2, rel_off int2, parts " POSTGRES_OSMID_TYPE "[], members text[], tags text[], pending boolean not null) {TABLESPACE %t};\n",
108 .create_index = "CREATE INDEX %p_rels_idx ON %p_rels (id) {TABLESPACE %i} WHERE pending;\n",
109.array_indexes = "CREATE INDEX %p_rels_parts ON %p_rels USING gin (parts) {TABLESPACE %i};\n",
110      .prepare = "PREPARE insert_rel (" POSTGRES_OSMID_TYPE ", int2, int2, " POSTGRES_OSMID_TYPE "[], text[], text[]) AS INSERT INTO %p_rels VALUES ($1,$2,$3,$4,$5,$6,false);\n"
111               "PREPARE get_rel (" POSTGRES_OSMID_TYPE ") AS SELECT members, tags, array_upper(members,1)/2 FROM %p_rels WHERE id = $1;\n"
112               "PREPARE rel_done(" POSTGRES_OSMID_TYPE ") AS UPDATE %p_rels SET pending = false WHERE id = $1;\n"
113               "PREPARE pending_rels AS SELECT id FROM %p_rels WHERE pending;\n"
114               "PREPARE delete_rel(" POSTGRES_OSMID_TYPE ") AS DELETE FROM %p_rels WHERE id = $1;\n",
115.prepare_intarray =
116                "PREPARE node_changed_mark(" POSTGRES_OSMID_TYPE ") AS UPDATE %p_rels SET pending = true WHERE parts && ARRAY[$1] AND parts[1:way_off] && ARRAY[$1] AND NOT pending;\n"
117                "PREPARE way_changed_mark(" POSTGRES_OSMID_TYPE ") AS UPDATE %p_rels SET pending = true WHERE parts && ARRAY[$1] AND parts[way_off+1:rel_off] && ARRAY[$1] AND NOT pending;\n"
118                "PREPARE rel_changed_mark(" POSTGRES_OSMID_TYPE ") AS UPDATE %p_rels SET pending = true WHERE parts && ARRAY[$1] AND parts[rel_off+1:array_length(parts,1)] && ARRAY[$1] AND NOT pending;\n",
119         .copy = "COPY %p_rels FROM STDIN;\n",
120      .analyze = "ANALYZE %p_rels;\n",
121         .stop =  "COMMIT;\n"
122    }
123};
124
125static const int num_tables = sizeof(tables)/sizeof(tables[0]);
126static int warn_node_order;
127static struct table_desc *node_table = &tables[t_node];
128static struct table_desc *way_table  = &tables[t_way];
129static struct table_desc *rel_table  = &tables[t_rel];
130
131/* Here we use a similar storage structure as middle-ram, except we allow
132 * the array to be lossy so we can cap the total memory usage. Hence it is a
133 * combination of a sparse array with a priority queue
134 *
135 * Like middle-ram we have a number of blocks all storing PER_BLOCK
136 * ramNodes. However, here we also track the number of nodes in each block.
137 * Seperately we have a priority queue like structure when maintains a list
138 * of all the used block so we can easily find the block with the least
139 * nodes. The cache has two phases:
140 *
141 * Phase 1: Loading initially, usedBlocks < maxBlocks. In this case when a
142 * new block is needed we simply allocate it and put it in
143 * queue[usedBlocks-1] which is the bottom of the tree. Every node added
144 * increases it's usage. When we move onto the next block we percolate this
145 * block up the queue until it reaches its correct position. The invariant
146 * is that the priority tree is complete except for this last node. We do
147 * not permit adding nodes to any other block to preserve this invariant.
148 *
149 * Phase 2: Once we've reached the maximum number of blocks permitted, we
150 * change so that the block currently be inserted into is at the top of the
151 * tree. When a new block is needed we take the one at the end of the queue,
152 * as it is the one with the least number of nodes in it. When we move onto
153 * the next block we first push the just completed block down to it's
154 * correct position in the queue and then reuse the block that now at the
155 * head.
156 *
157 * The result being that at any moment we have in memory the top maxBlock
158 * blocks in terms of number of nodes in memory. This should maximize the
159 * number of hits in lookups.
160 *
161 * Complexity:
162 *  Insert node: O(1)
163 *  Lookup node: O(1)
164 *  Add new block: O(log usedBlocks)
165 *  Reuse old block: O(log maxBlocks)
166 */
167
168struct ramNode {
169#ifdef FIXED_POINT
170    int lon;
171    int lat;
172#else
173    double lon;
174    double lat;
175#endif
176};
177
178struct ramNodeBlock {
179  struct ramNode    *nodes;
180  int used;
181};
182
183#define BLOCK_SHIFT 10
184#define PER_BLOCK  (((osmid_t)1) << BLOCK_SHIFT)
185#define NUM_BLOCKS (((osmid_t)1) << (36 - BLOCK_SHIFT))
186
187static struct ramNodeBlock blocks[NUM_BLOCKS];
188static int usedBlocks;
189/* Note: maxBlocks *must* be odd, to make sure the priority queue has no nodes with only one child */
190static int maxBlocks = 0;
191static void *blockCache = NULL;
192static int allocChunkwise = 1;
193static struct ramNodeBlock **queue;
194static osmid_t storedNodes, totalNodes;
195int nodesCacheHits, nodesCacheLookups;
196
197static int Append;
198
199static inline int id2block(osmid_t id)
200{
201    // + NUM_BLOCKS/2 allows for negative IDs
202    return (id >> BLOCK_SHIFT) + NUM_BLOCKS/2;
203}
204
205static inline int id2offset(osmid_t id)
206{
207    return id & (PER_BLOCK-1);
208}
209
210static inline osmid_t block2id(int block, int offset)
211{
212    return (((osmid_t) block - NUM_BLOCKS/2) << BLOCK_SHIFT) + (osmid_t) offset;
213}
214
215#define Swap(a,b) { typeof(a) __tmp = a; a = b; b = __tmp; }
216static void percolate_up( int pos )
217{
218    int i = pos;
219    while( i > 0 )
220    {
221      int parent = (i-1)>>1;
222      if( queue[i]->used < queue[parent]->used )
223      {
224        Swap( queue[i], queue[parent] );
225        i = parent;
226      }
227      else
228        break;
229    }
230}
231
232static void init_blockCache( int chunkwise) {
233  allocChunkwise = chunkwise;
234  if( !allocChunkwise ) {
235    fprintf(stderr, "Allocating node cache in one big chunk\n");
236    queue = malloc( maxBlocks * sizeof(struct ramNodeBlock) );   
237    blockCache = malloc(maxBlocks * PER_BLOCK * sizeof(struct ramNode));
238
239    if (!queue || !blockCache) {
240      fprintf(stderr, "Out of memory, reduce --cache size\n");
241      exit_nicely();
242    }
243  } else {
244    fprintf(stderr, "Allocating node cache in block sized chunks\n");
245    queue = malloc( maxBlocks * sizeof(struct ramNodeBlock) );
246    if (!queue) {
247      fprintf(stderr, "Out of memory, reduce --cache size\n");
248      exit_nicely();
249    }
250  }
251}
252
253static void free_blockCache() {
254  int i;
255  if ( !allocChunkwise ) {
256    free(blockCache);
257  } else {
258    for( i=0; i<usedBlocks; i++) {
259      free(queue[i]->nodes);
260      queue[i]->nodes = NULL;
261    }
262  }
263}
264
265static void *next_chunk(size_t count, size_t size)
266{
267  if ( !allocChunkwise ) {
268    static size_t pos = 0;
269    void *result;
270   
271    result = blockCache + pos;
272 
273    pos += count * size;
274   
275    return result;
276  } else {
277    return calloc(PER_BLOCK, sizeof(struct ramNode));
278  }
279}
280
281#define UNUSED  __attribute__ ((unused))
282static int pgsql_ram_nodes_set(osmid_t id, double lat, double lon, struct keyval *tags UNUSED)
283{
284    int block  = id2block(id);
285    int offset = id2offset(id);
286   
287    totalNodes++;
288
289    if (!blocks[block].nodes) {
290        if( usedBlocks < maxBlocks )
291        {
292          /* We've just finished with the previous block, so we need to percolate it up the queue to its correct position */
293          if( usedBlocks > 0 )
294            /* Upto log(usedBlocks) iterations */
295            percolate_up( usedBlocks-1 );
296
297          blocks[block].nodes = next_chunk(PER_BLOCK, sizeof(struct ramNode));
298
299          blocks[block].used = 0;
300          if (!blocks[block].nodes) {
301              fprintf(stderr, "Error allocating nodes\n");
302              exit_nicely();
303          }
304          queue[usedBlocks] = &blocks[block];
305          usedBlocks++;
306
307          /* If we've just used up the last possible block we enter the
308           * transition and we change the invariant. To do this we percolate
309           * the newly allocated block straight to the head */
310          if( usedBlocks == maxBlocks )
311            percolate_up( usedBlocks-1 );
312        }
313        else
314        {
315          /* We've reached the maximum number of blocks, so now we push the
316           * current head of the tree down to the right level to restore the
317           * priority queue invariant. Upto log(maxBlocks) iterations */
318         
319          int i=0;
320          while( 2*i+1 < maxBlocks )
321          {
322            if( queue[2*i+1]->used <= queue[2*i+2]->used )
323            {
324              if( queue[i]->used > queue[2*i+1]->used )
325              {
326                Swap( queue[i], queue[2*i+1] );
327                i = 2*i+1;
328              }
329              else
330                break;
331            }
332            else
333            {
334              if( queue[i]->used > queue[2*i+2]->used )
335              {
336                Swap( queue[i], queue[2*i+2] );
337                i = 2*i+2;
338              }
339              else
340                break;
341            }
342          }
343          /* Now the head of the queue is the smallest, so it becomes our replacement candidate */
344          blocks[block].nodes = queue[0]->nodes;
345          blocks[block].used = 0;
346          memset( blocks[block].nodes, 0, PER_BLOCK * sizeof(struct ramNode) );
347         
348          /* Clear old head block and point to new block */
349          storedNodes -= queue[0]->used;
350          queue[0]->nodes = NULL;
351          queue[0]->used = 0;
352          queue[0] = &blocks[block];
353        }
354    }
355    else
356    {
357      /* Insert into an existing block. We can't allow this in general or it
358       * will break the invariant. However, it will work fine if all the
359       * nodes come in numerical order, which is the common case */
360     
361      int expectedpos;
362      if( usedBlocks < maxBlocks )
363        expectedpos = usedBlocks-1;
364      else
365        expectedpos = 0;
366       
367      if( queue[expectedpos] != &blocks[block] )
368      {
369        if (!warn_node_order) {
370            fprintf( stderr, "WARNING: Found Out of order node %" PRIdOSMID " (%d,%d) - this will impact the cache efficiency\n", id, block, offset );
371            warn_node_order++;
372        }
373        return 1;
374      }
375    }
376       
377#ifdef FIXED_POINT
378    blocks[block].nodes[offset].lat = DOUBLE_TO_FIX(lat);
379    blocks[block].nodes[offset].lon = DOUBLE_TO_FIX(lon);
380#else
381    blocks[block].nodes[offset].lat = lat;
382    blocks[block].nodes[offset].lon = lon;
383#endif
384    blocks[block].used++;
385    storedNodes++;
386    return 0;
387}
388
389
390int pgsql_ram_nodes_get(struct osmNode *out, osmid_t id)
391{
392    int block  = id2block(id);
393    int offset = id2offset(id);
394    nodesCacheLookups++;
395
396    if (!blocks[block].nodes)
397        return 1;
398
399    if (!blocks[block].nodes[offset].lat && !blocks[block].nodes[offset].lon)
400        return 1;
401
402#ifdef FIXED_POINT
403    out->lat = FIX_TO_DOUBLE(blocks[block].nodes[offset].lat);
404    out->lon = FIX_TO_DOUBLE(blocks[block].nodes[offset].lon);
405#else
406    out->lat = blocks[block].nodes[offset].lat;
407    out->lon = blocks[block].nodes[offset].lon;
408#endif
409    nodesCacheHits++;
410    return 0;
411}
412
413static void pgsql_cleanup(void)
414{
415    int i;
416
417    for (i=0; i<num_tables; i++) {
418        if (tables[i].sql_conn) {
419            PQfinish(tables[i].sql_conn);
420            tables[i].sql_conn = NULL;
421        }
422    }
423}
424
425char *pgsql_store_nodes(osmid_t *nds, int nd_count)
426{
427  static char *buffer;
428  static int buflen;
429
430  char *ptr;
431  int i, first;
432   
433  if( buflen <= nd_count * 10 )
434  {
435    buflen = ((nd_count * 10) | 4095) + 1;  /* Round up to next page */
436    buffer = realloc( buffer, buflen );
437  }
438_restart:
439
440  ptr = buffer;
441  first = 1;
442  *ptr++ = '{';
443  for( i=0; i<nd_count; i++ )
444  {
445    if( !first ) *ptr++ = ',';
446    ptr += sprintf(ptr, "%" PRIdOSMID, nds[i] );
447   
448    if( (ptr-buffer) > (buflen-20) ) /* Almost overflowed? */
449    {
450      buflen <<= 1;
451      buffer = realloc( buffer, buflen );
452     
453      goto _restart;
454    }
455    first = 0;
456  }
457 
458  *ptr++ = '}';
459  *ptr++ = 0;
460 
461  return buffer;
462}
463
464/* Special escape routine for escaping strings in array constants: double quote, backslash,newline, tab*/
465static inline char *escape_tag( char *ptr, const char *in, int escape )
466{
467  while( *in )
468  {
469    switch(*in)
470    {
471      case '"':
472        if( escape ) *ptr++ = '\\';
473        *ptr++ = '\\';
474        *ptr++ = '"';
475        break;
476      case '\\':
477        if( escape ) *ptr++ = '\\';
478        if( escape ) *ptr++ = '\\';
479        *ptr++ = '\\';
480        *ptr++ = '\\';
481        break;
482      case '\n':
483        if( escape ) *ptr++ = '\\';
484        *ptr++ = '\\';
485        *ptr++ = 'n';
486        break;
487      case '\r':
488        if( escape ) *ptr++ = '\\';
489        *ptr++ = '\\';
490        *ptr++ = 'r';
491        break;
492      case '\t':
493        if( escape ) *ptr++ = '\\';
494        *ptr++ = '\\';
495        *ptr++ = 't';
496        break;
497      default:
498        *ptr++ = *in;
499        break;
500    }
501    in++;
502  }
503  return ptr;
504}
505
506/* escape means we return '\N' for copy mode, otherwise we return just NULL */
507char *pgsql_store_tags(struct keyval *tags, int escape)
508{
509  static char *buffer;
510  static int buflen;
511
512  char *ptr;
513  struct keyval *i;
514  int first;
515   
516  int countlist = countList(tags);
517  if( countlist == 0 )
518  {
519    if( escape )
520      return "\\N";
521    else
522      return NULL;
523  }
524   
525  if( buflen <= countlist * 24 ) /* LE so 0 always matches */
526  {
527    buflen = ((countlist * 24) | 4095) + 1;  /* Round up to next page */
528    buffer = realloc( buffer, buflen );
529  }
530_restart:
531
532  ptr = buffer;
533  first = 1;
534  *ptr++ = '{';
535  /* The lists are circular, exit when we reach the head again */
536  for( i=tags->next; i->key; i = i->next )
537  {
538    int maxlen = (strlen(i->key) + strlen(i->value)) * 4;
539    if( (ptr+maxlen-buffer) > (buflen-20) ) /* Almost overflowed? */
540    {
541      buflen <<= 1;
542      buffer = realloc( buffer, buflen );
543     
544      goto _restart;
545    }
546    if( !first ) *ptr++ = ',';
547    *ptr++ = '"';
548    ptr = escape_tag( ptr, i->key, escape );
549    *ptr++ = '"';
550    *ptr++ = ',';
551    *ptr++ = '"';
552    ptr = escape_tag( ptr, i->value, escape );
553    *ptr++ = '"';
554   
555    first=0;
556  }
557 
558  *ptr++ = '}';
559  *ptr++ = 0;
560 
561  return buffer;
562}
563
564/* Decodes a portion of an array literal from postgres */
565/* Argument should point to beginning of literal, on return points to delimiter */
566static const char *decode_upto( const char *src, char *dst )
567{
568  int quoted = (*src == '"');
569  if( quoted ) src++;
570 
571  while( quoted ? (*src != '"') : (*src != ',' && *src != '}') )
572  {
573    if( *src == '\\' )
574    {
575      switch( src[1] )
576      {
577        case 'n': *dst++ = '\n'; break;
578        case 't': *dst++ = '\t'; break;
579        default: *dst++ = src[1]; break;
580      }
581      src+=2;
582    }
583    else
584      *dst++ = *src++;
585  }
586  if( quoted ) src++;
587  *dst = 0;
588  return src;
589}
590
591static void pgsql_parse_tags( const char *string, struct keyval *tags )
592{
593  char key[1024];
594  char val[1024];
595 
596  if( *string == '\0' )
597    return;
598   
599//  fprintf( stderr, "Parsing: %s\n", string );
600  if( *string++ != '{' )
601    return;
602  while( *string != '}' )
603  {
604    string = decode_upto( string, key );
605    /* String points to the comma */
606    string++;
607    string = decode_upto( string, val );
608    /* String points to the comma or closing '}' */
609    addItem( tags, key, val, 0 );
610//    fprintf( stderr, "Extracted item: %s=%s\n", key, val );
611    if( *string == ',' )
612      string++;
613  }
614}
615
616/* Parses an array of integers */
617static void pgsql_parse_nodes(const char *src, osmid_t *nds, int nd_count )
618{
619  int count = 0;
620  const char *string = src;
621 
622  if( *string++ != '{' )
623    return;
624  while( *string != '}' )
625  {
626    char *ptr;
627    nds[count] = strtoosmid( string, &ptr, 10 );
628    string = ptr;
629    if( *string == ',' )
630      string++;
631    count++;
632  }
633  if( count != nd_count )
634  {
635    fprintf( stderr, "parse_nodes problem: '%s' expected %d got %d\n", src, nd_count, count );
636    exit_nicely();
637  }
638}
639
640static int pgsql_endCopy( struct table_desc *table)
641{
642    /* Terminate any pending COPY */
643     if (table->copyMode) {
644        PGconn *sql_conn = table->sql_conn;
645        int stop = PQputCopyEnd(sql_conn, NULL);
646        if (stop != 1) {
647            fprintf(stderr, "COPY_END for %s failed: %s\n", table->copy, PQerrorMessage(sql_conn));
648            exit_nicely();
649        }
650
651        PGresult *res = PQgetResult(sql_conn);
652        if (PQresultStatus(res) != PGRES_COMMAND_OK) {
653            fprintf(stderr, "COPY_END for %s failed: %s\n", table->copy, PQerrorMessage(sql_conn));
654            PQclear(res);
655            exit_nicely();
656        }
657        PQclear(res);
658        table->copyMode = 0;
659    }
660    return 0;
661}
662
663static int pgsql_nodes_set(osmid_t id, double lat, double lon, struct keyval *tags)
664{
665    /* Four params: id, lat, lon, tags */
666    char *paramValues[4];
667    char *buffer;
668
669    pgsql_ram_nodes_set( id, lat, lon, tags );
670    if( node_table->copyMode )
671    {
672      char *tag_buf = pgsql_store_tags(tags,1);
673      int length = strlen(tag_buf) + 64;
674      buffer = alloca( length );
675#ifdef FIXED_POINT
676      if( snprintf( buffer, length, "%" PRIdOSMID "\t%d\t%d\t%s\n", id, DOUBLE_TO_FIX(lat), DOUBLE_TO_FIX(lon), tag_buf ) > (length-10) )
677      { fprintf( stderr, "buffer overflow node id %" PRIdOSMID "\n", id); return 1; }
678#else
679      if( snprintf( buffer, length, "%" PRIdOSMID "\t%.10f\t%.10f\t%s\n", id, lat, lon, tag_buf ) > (length-10) )
680      { fprintf( stderr, "buffer overflow node id %" PRIdOSMID "\n", id); return 1; }
681#endif
682      return pgsql_CopyData(__FUNCTION__, node_table->sql_conn, buffer);
683    }
684    buffer = alloca(64);
685    paramValues[0] = buffer;
686    paramValues[1] = paramValues[0] + sprintf( paramValues[0], "%" PRIdOSMID, id ) + 1;
687#ifdef FIXED_POINT
688    paramValues[2] = paramValues[1] + sprintf( paramValues[1], "%d", DOUBLE_TO_FIX(lat) ) + 1;
689    sprintf( paramValues[2], "%d", DOUBLE_TO_FIX(lon) );
690#else
691    paramValues[2] = paramValues[1] + sprintf( paramValues[1], "%.10f", lat ) + 1;
692    sprintf( paramValues[2], "%.10f", lon );
693#endif
694    paramValues[3] = pgsql_store_tags(tags,0);
695    pgsql_execPrepared(node_table->sql_conn, "insert_node", 4, (const char * const *)paramValues, PGRES_COMMAND_OK);
696    return 0;
697}
698
699
700static int pgsql_nodes_get(struct osmNode *out, osmid_t id)
701{
702    /* Check cache first */
703    if( pgsql_ram_nodes_get( out, id ) == 0 )
704      return 0;
705     
706    PGresult   *res;
707    char tmp[16];
708    char const *paramValues[1];
709    PGconn *sql_conn = node_table->sql_conn;
710
711    /* Make sure we're out of copy mode */
712    pgsql_endCopy( node_table );
713
714    snprintf(tmp, sizeof(tmp), "%" PRIdOSMID, id);
715    paramValues[0] = tmp;
716 
717    res = pgsql_execPrepared(sql_conn, "get_node", 1, paramValues, PGRES_TUPLES_OK);
718
719    if (PQntuples(res) != 1) {
720        PQclear(res);
721        return 1;
722    } 
723
724#ifdef FIXED_POINT
725    out->lat = FIX_TO_DOUBLE(strtol(PQgetvalue(res, 0, 0), NULL, 10));
726    out->lon = FIX_TO_DOUBLE(strtol(PQgetvalue(res, 0, 1), NULL, 10));
727#else
728    out->lat = strtod(PQgetvalue(res, 0, 0), NULL);
729    out->lon = strtod(PQgetvalue(res, 0, 1), NULL);
730#endif
731    PQclear(res);
732    return 0;
733}
734
735/* This should be made more efficient by using an IN(ARRAY[]) construct */
736static int pgsql_nodes_get_list(struct osmNode *nodes, osmid_t *ndids, int nd_count)
737{
738    char tmp[16];
739    char *tmp2; 
740    int count, count2, countDB, countPG, i,j;
741    osmid_t id;
742    osmid_t *ndidspg;
743    struct osmNode *nodespg;
744    struct osmNode *nodes2;
745    char const *paramValues[1]; 
746
747    PGresult *res;
748    PGconn *sql_conn = node_table->sql_conn;
749   
750    count = 0; countDB = 0;
751
752    tmp2 = malloc(sizeof(char)*nd_count*16);
753    if (tmp2 == NULL) return 0; /*failed to allocate memory, return */
754
755    /* create a list of ids in tmp2 to query the database  */
756    snprintf(tmp2, sizeof(tmp2), "{");
757    for( i=0; i<nd_count; i++ ) {
758        /* Check cache first */ 
759        if( pgsql_ram_nodes_get( &nodes[i], ndids[i]) == 0 ) {
760            count++;
761            continue;
762        }
763        countDB++;
764        /* Mark nodes as needing to be fetched from the DB */
765        nodes[i].lat = NAN;
766        nodes[i].lon = NAN;
767        snprintf(tmp, sizeof(tmp), "%" PRIdOSMID ",", ndids[i]);
768        strcat(tmp2,tmp);
769    }
770    tmp2[strlen(tmp2) - 1] = '}'; /* replace last , with } to complete list of ids*/
771 
772    if (countDB == 0) {
773        free(tmp2);
774        return count; /* All ids where in cache, so nothing more to do */
775    }
776 
777    pgsql_endCopy(node_table); 
778
779    paramValues[0] = tmp2; 
780    res = pgsql_execPrepared(sql_conn, "get_node_list", 1, paramValues, PGRES_TUPLES_OK);
781    countPG = PQntuples(res);
782
783    ndidspg = malloc(sizeof(osmid_t)*countPG);
784    nodespg = malloc(sizeof(struct osmNode)*countPG);
785
786    if ((ndidspg == NULL) || (nodespg == NULL)) {
787        free(tmp2);
788        free(ndidspg);
789        free(nodespg);
790        PQclear(res);
791        return 0;
792    }
793
794    for (i = 0; i < countPG; i++) {
795        ndidspg[i] = strtoosmid(PQgetvalue(res, i, 0), NULL, 10); 
796#ifdef FIXED_POINT
797        nodespg[i].lat = FIX_TO_DOUBLE(strtol(PQgetvalue(res, i, 1), NULL, 10)); 
798        nodespg[i].lon = FIX_TO_DOUBLE(strtol(PQgetvalue(res, i, 2), NULL, 10)); 
799#else
800        nodespg[i].lat = strtod(PQgetvalue(res, i, 1), NULL); 
801        nodespg[i].lon = strtod(PQgetvalue(res, i, 2), NULL); 
802#endif
803    }
804 
805 
806    /* The list of results coming back from the db is in a different order to the list of nodes in the way.
807       Match the results back to the way node list */
808   
809    for (i=0; i<nd_count; i++ ) {
810        if ((isnan(nodes[i].lat)) || (isnan(nodes[i].lon))) {
811            /* TODO: implement an O(log(n)) algorithm to match node ids */
812            for (j = 0; j < countPG; j++) {
813                if (ndidspg[j] == ndids[i]) {
814                    nodes[i].lat = nodespg[j].lat;
815                    nodes[i].lon = nodespg[j].lon;
816                    count++;
817                    break;
818                }
819            }
820        }
821    }
822
823    /* If some of the nodes in the way don't exist, the returning list has holes.
824       As the rest of the code expects a continous list, it needs to be re-compacted */
825    if (count != nd_count) {
826        j = 0;
827        for (i = 0; i < nd_count; i++) {
828            if ( !isnan(nodes[i].lat)) {
829                nodes[j].lat = nodes[i].lat;
830                nodes[j].lon = nodes[i].lon;
831                j++;
832            }
833         }
834    }
835
836    PQclear(res);
837    free(tmp2);
838    free(ndidspg);
839    free(nodespg);
840
841    return count;
842}
843
844static int pgsql_nodes_delete(osmid_t osm_id)
845{
846    char const *paramValues[1];
847    char buffer[64];
848    /* Make sure we're out of copy mode */
849    pgsql_endCopy( node_table );
850   
851    sprintf( buffer, "%" PRIdOSMID, osm_id );
852    paramValues[0] = buffer;
853    pgsql_execPrepared(node_table->sql_conn, "delete_node", 1, paramValues, PGRES_COMMAND_OK );
854    return 0;
855}
856
857static int pgsql_node_changed(osmid_t osm_id)
858{
859    char const *paramValues[1];
860    char buffer[64];
861    /* Make sure we're out of copy mode */
862    pgsql_endCopy( way_table );
863    pgsql_endCopy( rel_table );
864   
865    sprintf( buffer, "%" PRIdOSMID, osm_id );
866    paramValues[0] = buffer;
867    pgsql_execPrepared(way_table->sql_conn, "node_changed_mark", 1, paramValues, PGRES_COMMAND_OK );
868    pgsql_execPrepared(rel_table->sql_conn, "node_changed_mark", 1, paramValues, PGRES_COMMAND_OK );
869    return 0;
870}
871
872static int pgsql_ways_set(osmid_t way_id, osmid_t *nds, int nd_count, struct keyval *tags, int pending)
873{
874    /* Three params: id, nodes, tags, pending */
875    char *paramValues[4];
876    char *buffer;
877
878    if( way_table->copyMode )
879    {
880      char *tag_buf = pgsql_store_tags(tags,1);
881      char *node_buf = pgsql_store_nodes(nds, nd_count);
882      int length = strlen(tag_buf) + strlen(node_buf) + 64;
883      buffer = alloca(length);
884      if( snprintf( buffer, length, "%" PRIdOSMID "\t%s\t%s\t%c\n", 
885              way_id, node_buf, tag_buf, pending?'t':'f' ) > (length-10) )
886      { fprintf( stderr, "buffer overflow way id %" PRIdOSMID "\n", way_id); return 1; }
887      return pgsql_CopyData(__FUNCTION__, way_table->sql_conn, buffer);
888    }
889    buffer = alloca(64);
890    paramValues[0] = buffer;
891    paramValues[3] = paramValues[0] + sprintf( paramValues[0], "%" PRIdOSMID, way_id ) + 1;
892    sprintf( paramValues[3], "%c", pending?'t':'f' );
893    paramValues[1] = pgsql_store_nodes(nds, nd_count);
894    paramValues[2] = pgsql_store_tags(tags,0);
895    pgsql_execPrepared(way_table->sql_conn, "insert_way", 4, (const char * const *)paramValues, PGRES_COMMAND_OK);
896    return 0;
897}
898
899/* Caller is responsible for freeing nodesptr & resetList(tags) */
900static int pgsql_ways_get(osmid_t id, struct keyval *tags, struct osmNode **nodes_ptr, int *count_ptr)
901{
902    PGresult   *res;
903    char tmp[16];
904    char const *paramValues[1];
905    PGconn *sql_conn = way_table->sql_conn;
906
907    /* Make sure we're out of copy mode */
908    pgsql_endCopy( way_table );
909
910    snprintf(tmp, sizeof(tmp), "%" PRIdOSMID, id);
911    paramValues[0] = tmp;
912 
913    res = pgsql_execPrepared(sql_conn, "get_way", 1, paramValues, PGRES_TUPLES_OK);
914
915    if (PQntuples(res) != 1) {
916        PQclear(res);
917        return 1;
918    } 
919
920    pgsql_parse_tags( PQgetvalue(res, 0, 1), tags );
921
922    int num_nodes = strtol(PQgetvalue(res, 0, 2), NULL, 10);
923    osmid_t *list = alloca(sizeof(osmid_t)*num_nodes );
924    *nodes_ptr = malloc(sizeof(struct osmNode) * num_nodes);
925    pgsql_parse_nodes( PQgetvalue(res, 0, 0), list, num_nodes);
926   
927    *count_ptr = pgsql_nodes_get_list( *nodes_ptr, list, num_nodes);
928    PQclear(res);
929    return 0;
930}
931
932static int pgsql_ways_done(osmid_t id)
933{
934    char tmp[16];
935    char const *paramValues[1];
936    PGconn *sql_conn = way_table->sql_conn;
937
938    /* Make sure we're out of copy mode */
939    pgsql_endCopy( way_table );
940
941    snprintf(tmp, sizeof(tmp), "%" PRIdOSMID, id);
942    paramValues[0] = tmp;
943 
944    pgsql_execPrepared(sql_conn, "way_done", 1, paramValues, PGRES_COMMAND_OK);
945
946    return 0;
947}
948
949static int pgsql_ways_delete(osmid_t osm_id)
950{
951    char const *paramValues[1];
952    char buffer[64];
953    /* Make sure we're out of copy mode */
954    pgsql_endCopy( way_table );
955   
956    sprintf( buffer, "%" PRIdOSMID, osm_id );
957    paramValues[0] = buffer;
958    pgsql_execPrepared(way_table->sql_conn, "delete_way", 1, paramValues, PGRES_COMMAND_OK );
959    return 0;
960}
961
962static void pgsql_iterate_ways(int (*callback)(osmid_t id, struct keyval *tags, struct osmNode *nodes, int count, int exists))
963{
964    PGresult   *res_ways;
965    int i, count = 0;
966    /* The flag we pass to indicate that the way in question might exist already in the database */
967    int exists = Append;
968
969    time_t start, end;
970    time(&start);
971    fprintf(stderr, "\nGoing over pending ways\n");
972
973    /* Make sure we're out of copy mode */
974    pgsql_endCopy( way_table );
975   
976    res_ways = pgsql_execPrepared(way_table->sql_conn, "pending_ways", 0, NULL, PGRES_TUPLES_OK);
977
978    //fprintf(stderr, "\nIterating ways\n");
979    for (i = 0; i < PQntuples(res_ways); i++) {
980        osmid_t id = strtoosmid(PQgetvalue(res_ways, i, 0), NULL, 10);
981        struct keyval tags;
982        struct osmNode *nodes;
983        int nd_count;
984
985        if (count++ %1000 == 0) {
986            time(&end);
987            fprintf(stderr, "\rprocessing way (%dk) at %.2fk/s", count/1000,
988            (int)(end - start) > 0 ? ((double)count / 1000.0 / (double)(end - start)) : 0);
989        }
990
991        initList(&tags);
992        if( pgsql_ways_get(id, &tags, &nodes, &nd_count) )
993          continue;
994         
995        callback(id, &tags, nodes, nd_count, exists);
996        pgsql_ways_done( id );
997
998        free(nodes);
999        resetList(&tags);
1000    }
1001
1002    PQclear(res_ways);
1003    fprintf(stderr, "\n");
1004    time(&end);
1005    if (end - start > 0)
1006        fprintf(stderr, "Pending ways took %ds at a rate of %.2f/s\n",(int)(end - start), ((double)count / (double)(end - start)));
1007}
1008
1009static int pgsql_way_changed(osmid_t osm_id)
1010{
1011    char const *paramValues[1];
1012    char buffer[64];
1013    /* Make sure we're out of copy mode */
1014    pgsql_endCopy( rel_table );
1015   
1016    sprintf( buffer, "%" PRIdOSMID, osm_id );
1017    paramValues[0] = buffer;
1018    pgsql_execPrepared(rel_table->sql_conn, "way_changed_mark", 1, paramValues, PGRES_COMMAND_OK );
1019    return 0;
1020}
1021
1022static int pgsql_rels_set(osmid_t id, struct member *members, int member_count, struct keyval *tags)
1023{
1024    /* Params: id, way_off, rel_off, parts, members, tags */
1025    char *paramValues[6];
1026    char *buffer;
1027    int i;
1028    struct keyval member_list;
1029   
1030    osmid_t node_parts[member_count],
1031            way_parts[member_count],
1032            rel_parts[member_count];
1033    int node_count = 0, way_count = 0, rel_count = 0;
1034   
1035    osmid_t all_parts[member_count];
1036    int all_count = 0;
1037    initList( &member_list );   
1038    for( i=0; i<member_count; i++ )
1039    {
1040      char tag = 0;
1041      switch( members[i].type )
1042      {
1043        case OSMTYPE_NODE:     node_parts[node_count++] = members[i].id; tag = 'n'; break;
1044        case OSMTYPE_WAY:      way_parts[way_count++] = members[i].id; tag = 'w'; break;
1045        case OSMTYPE_RELATION: rel_parts[rel_count++] = members[i].id; tag = 'r'; break;
1046        default: fprintf( stderr, "Internal error: Unknown member type %d\n", members[i].type ); exit_nicely();
1047      }
1048      char buf[64];
1049      sprintf( buf, "%c%" PRIdOSMID, tag, members[i].id );
1050      addItem( &member_list, buf, members[i].role, 0 );
1051    }
1052    memcpy( all_parts+all_count, node_parts, node_count*sizeof(int) ); all_count+=node_count;
1053    memcpy( all_parts+all_count, way_parts, way_count*sizeof(int) ); all_count+=way_count;
1054    memcpy( all_parts+all_count, rel_parts, rel_count*sizeof(int) ); all_count+=rel_count;
1055 
1056    if( rel_table->copyMode )
1057    {
1058      char *tag_buf = strdup(pgsql_store_tags(tags,1));
1059      char *member_buf = pgsql_store_tags(&member_list,1);
1060      char *parts_buf = pgsql_store_nodes(all_parts, all_count);
1061      int length = strlen(member_buf) + strlen(tag_buf) + strlen(parts_buf) + 64;
1062      buffer = alloca(length);
1063      if( snprintf( buffer, length, "%" PRIdOSMID "\t%d\t%d\t%s\t%s\t%s\tf\n", 
1064              id, node_count, node_count+way_count, parts_buf, member_buf, tag_buf ) > (length-10) )
1065      { fprintf( stderr, "buffer overflow relation id %" PRIdOSMID "\n", id); return 1; }
1066      free(tag_buf);
1067      resetList(&member_list);
1068      return pgsql_CopyData(__FUNCTION__, rel_table->sql_conn, buffer);
1069    }
1070    buffer = alloca(64);
1071    paramValues[0] = buffer;
1072    paramValues[1] = paramValues[0] + sprintf( paramValues[0], "%" PRIdOSMID, id ) + 1;
1073    paramValues[2] = paramValues[1] + sprintf( paramValues[1], "%d", node_count ) + 1;
1074    sprintf( paramValues[2], "%d", node_count+way_count );
1075    paramValues[3] = pgsql_store_nodes(all_parts, all_count);
1076    paramValues[4] = pgsql_store_tags(&member_list,0);
1077    if( paramValues[4] )
1078        paramValues[4] = strdup(paramValues[4]);
1079    paramValues[5] = pgsql_store_tags(tags,0);
1080    pgsql_execPrepared(rel_table->sql_conn, "insert_rel", 6, (const char * const *)paramValues, PGRES_COMMAND_OK);
1081    if( paramValues[4] )
1082        free(paramValues[4]);
1083    resetList(&member_list);
1084    return 0;
1085}
1086
1087/* Caller is responsible for freeing members & resetList(tags) */
1088static int pgsql_rels_get(osmid_t id, struct member **members, int *member_count, struct keyval *tags)
1089{
1090    PGresult   *res;
1091    char tmp[16];
1092    char const *paramValues[1];
1093    PGconn *sql_conn = rel_table->sql_conn;
1094    struct keyval member_temp;
1095
1096    /* Make sure we're out of copy mode */
1097    pgsql_endCopy( rel_table );
1098
1099    snprintf(tmp, sizeof(tmp), "%" PRIdOSMID, id);
1100    paramValues[0] = tmp;
1101 
1102    res = pgsql_execPrepared(sql_conn, "get_rel", 1, paramValues, PGRES_TUPLES_OK);
1103    /* Fields are: members, tags, member_count */
1104
1105    if (PQntuples(res) != 1) {
1106        PQclear(res);
1107        return 1;
1108    } 
1109
1110    pgsql_parse_tags( PQgetvalue(res, 0, 1), tags );
1111    initList(&member_temp);
1112    pgsql_parse_tags( PQgetvalue(res, 0, 0), &member_temp );
1113
1114    int num_members = strtol(PQgetvalue(res, 0, 2), NULL, 10);
1115    struct member *list = malloc( sizeof(struct member)*num_members );
1116   
1117    int i=0;
1118    struct keyval *item;
1119    while( (item = popItem(&member_temp)) )
1120    {
1121        if( i >= num_members )
1122        {
1123            fprintf(stderr, "Unexpected member_count reading relation %" PRIdOSMID "\n", id);
1124            exit_nicely();
1125        }
1126        char tag = item->key[0];
1127        list[i].type = (tag == 'n')?OSMTYPE_NODE:(tag == 'w')?OSMTYPE_WAY:(tag == 'r')?OSMTYPE_RELATION:-1;
1128        list[i].id = strtoosmid(item->key+1, NULL, 10 );
1129        list[i].role = strdup( item->value );
1130        freeItem(item);
1131        i++;
1132    }
1133    *members = list;
1134    *member_count = num_members;
1135    PQclear(res);
1136    return 0;
1137}
1138
1139static int pgsql_rels_done(osmid_t id)
1140{
1141    char tmp[16];
1142    char const *paramValues[1];
1143    PGconn *sql_conn = rel_table->sql_conn;
1144
1145    /* Make sure we're out of copy mode */
1146    pgsql_endCopy( rel_table );
1147
1148    snprintf(tmp, sizeof(tmp), "%" PRIdOSMID, id);
1149    paramValues[0] = tmp;
1150 
1151    pgsql_execPrepared(sql_conn, "rel_done", 1, paramValues, PGRES_COMMAND_OK);
1152
1153    return 0;
1154}
1155
1156static int pgsql_rels_delete(osmid_t osm_id)
1157{
1158    char const *paramValues[1];
1159    char buffer[64];
1160    /* Make sure we're out of copy mode */
1161    pgsql_endCopy( rel_table );
1162   
1163    sprintf( buffer, "%" PRIdOSMID, osm_id );
1164    paramValues[0] = buffer;
1165    pgsql_execPrepared(rel_table->sql_conn, "delete_rel", 1, paramValues, PGRES_COMMAND_OK );
1166    return 0;
1167}
1168
1169static void pgsql_iterate_relations(int (*callback)(osmid_t id, struct member *members, int member_count, struct keyval *tags, int exists))
1170{
1171    PGresult   *res_rels;
1172    int i, count = 0;
1173    /* The flag we pass to indicate that the way in question might exist already in the database */
1174    int exists = Append;
1175
1176    time_t start, end;
1177    time(&start);
1178    fprintf(stderr, "\nGoing over pending relations\n");
1179
1180    /* Make sure we're out of copy mode */
1181    pgsql_endCopy( rel_table );
1182   
1183    res_rels = pgsql_execPrepared(rel_table->sql_conn, "pending_rels", 0, NULL, PGRES_TUPLES_OK);
1184
1185    //fprintf(stderr, "\nIterating ways\n");
1186    for (i = 0; i < PQntuples(res_rels); i++) {
1187        osmid_t id = strtoosmid(PQgetvalue(res_rels, i, 0), NULL, 10);
1188        struct keyval tags;
1189        struct member *members;
1190        int member_count;
1191
1192        if (count++ %10 == 0) {
1193            time(&end);
1194            fprintf(stderr, "\rprocessing relation (%d) at %.2f/s", count,
1195                    (int)(end - start) > 0 ? ((double)count / (double)(end - start)) : 0);
1196        }
1197
1198        initList(&tags);
1199        if( pgsql_rels_get(id, &members, &member_count, &tags) )
1200          continue;
1201         
1202        callback(id, members, member_count, &tags, exists);
1203        pgsql_rels_done( id );
1204
1205        free(members);
1206        resetList(&tags);
1207    }
1208
1209    PQclear(res_rels);
1210    fprintf(stderr, "\n");
1211    time(&end);
1212    if (end - start > 0)
1213        fprintf(stderr, "Pending relations took %ds at a rate of %.2f/s\n",(int)(end - start), ((double)count / (double)(end - start)));
1214}
1215
1216static int pgsql_rel_changed(osmid_t osm_id)
1217{
1218    char const *paramValues[1];
1219    char buffer[64];
1220    /* Make sure we're out of copy mode */
1221    pgsql_endCopy( rel_table );
1222   
1223    sprintf( buffer, "%" PRIdOSMID, osm_id );
1224    paramValues[0] = buffer;
1225    pgsql_execPrepared(rel_table->sql_conn, "rel_changed_mark", 1, paramValues, PGRES_COMMAND_OK );
1226    return 0;
1227}
1228
1229static void pgsql_analyze(void)
1230{
1231    int i;
1232
1233    for (i=0; i<num_tables; i++) {
1234        PGconn *sql_conn = tables[i].sql_conn;
1235 
1236        if (tables[i].analyze) {
1237            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", tables[i].analyze);
1238        }
1239    }
1240}
1241
1242static void pgsql_end(void)
1243{
1244    int i;
1245
1246    for (i=0; i<num_tables; i++) {
1247        PGconn *sql_conn = tables[i].sql_conn;
1248 
1249        // Commit transaction
1250        if (tables[i].stop) {
1251            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", tables[i].stop);
1252        }
1253
1254    }
1255}
1256
1257/**
1258 * Helper to create SQL queries.
1259 *
1260 * Takes four arguments:
1261 * prefix, t (tablespace) and i (index tablespace) - these may be NULL
1262 * a pointer to an input string which, on success, is changed to point to the
1263 * result (caller takes ownership).
1264 *
1265 * The input string is mangled as follows:
1266 * %p replaced by the given prefix,
1267 * %i replaced by the given index tablespace
1268 * %t replaced by the given tablespace
1269 * other occurrences of the "%" char are treated normally.
1270 * any occurrence of { or } will be ignored (not copied to output string);
1271 * anything inside {} is only copied if it contained at least one of
1272 * %p, %i, %t that was not NULL.
1273 *
1274 * So, the input string
1275 *    Hello{ dear %i}!
1276 * will, if i is set to "John", translate to
1277 *    Hello dear John!
1278 * but if i is unset, translate to
1279 *    Hello!
1280 *
1281 * This is used for constructing SQL queries with proper tablespace settings.
1282 */
1283static inline void set_prefix_and_tbls(const char *prefix, const char *t, const char *i, const char **string)
1284{
1285    char buffer[1024];
1286    if (*string == NULL) return;
1287    const char *source = *string;
1288    char *dest = buffer;
1289    char *openbrace = NULL;
1290    int copied = 0;
1291
1292    while (*source) {
1293        if (*source == '{') {
1294            openbrace = dest;
1295            copied = 0;
1296            source++;
1297            continue;
1298        } else if (*source == '}') {
1299            if (!copied && openbrace) dest = openbrace;
1300            source++;
1301            continue;
1302        } else if (*source == '%') {
1303            if (*(source+1) == 'p') {
1304                if (prefix) {
1305                    strcpy(dest, prefix);
1306                    dest += strlen(prefix);
1307                    copied = 1;
1308                }
1309                source+=2;
1310                continue;
1311            } else if (*(source+1) == 't') {
1312                if (t) {
1313                    strcpy(dest, t);
1314                    dest += strlen(t);
1315                    copied = 1;
1316                }
1317                source+=2;
1318                continue;
1319            } else if (*(source+1) == 'i') {
1320                if (i) {
1321                    strcpy(dest, i);
1322                    dest += strlen(i);
1323                    copied = 1;
1324                }
1325                source+=2;
1326                continue;
1327            }
1328        }
1329        *(dest++) = *(source++);
1330    }
1331    *dest = 0;
1332    *string = strdup(buffer);
1333}
1334
1335static int build_indexes;
1336
1337static int pgsql_start(const struct output_options *options)
1338{
1339    PGresult   *res;
1340    int i;
1341    int dropcreate = !options->append;
1342
1343    scale = options->scale;
1344    Append = options->append;
1345   
1346    /* How much we can fit, and make sure it's odd */
1347    maxBlocks = (options->cache*((1024*1024)/(PER_BLOCK*sizeof(struct ramNode)))) | 1;
1348   
1349    init_blockCache(options->alloc_chunkwise);
1350
1351#ifdef __MINGW_H
1352    fprintf( stderr, "Mid: pgsql, scale=%d, cache=%dMB, maxblocks=%d*%d\n", scale, options->cache, maxBlocks, PER_BLOCK*sizeof(struct ramNode) ); 
1353#else
1354    fprintf( stderr, "Mid: pgsql, scale=%d, cache=%dMB, maxblocks=%d*%zd\n", scale, options->cache, maxBlocks, PER_BLOCK*sizeof(struct ramNode) );
1355#endif
1356   
1357    /* We use a connection per table to enable the use of COPY */
1358    for (i=0; i<num_tables; i++) {
1359        PGconn *sql_conn;
1360                       
1361        set_prefix_and_tbls(options->prefix, options->tblsslim_data, options->tblsslim_index, &(tables[i].name));
1362        set_prefix_and_tbls(options->prefix, options->tblsslim_data, options->tblsslim_index, &(tables[i].start));
1363        set_prefix_and_tbls(options->prefix, options->tblsslim_data, options->tblsslim_index, &(tables[i].create));
1364        set_prefix_and_tbls(options->prefix, options->tblsslim_data, options->tblsslim_index, &(tables[i].create_index));
1365        set_prefix_and_tbls(options->prefix, options->tblsslim_data, options->tblsslim_index, &(tables[i].prepare));
1366        set_prefix_and_tbls(options->prefix, options->tblsslim_data, options->tblsslim_index, &(tables[i].prepare_intarray));
1367        set_prefix_and_tbls(options->prefix, options->tblsslim_data, options->tblsslim_index, &(tables[i].copy));
1368        set_prefix_and_tbls(options->prefix, options->tblsslim_data, options->tblsslim_index, &(tables[i].analyze));
1369        set_prefix_and_tbls(options->prefix, options->tblsslim_data, options->tblsslim_index, &(tables[i].stop));
1370        set_prefix_and_tbls(options->prefix, options->tblsslim_data, options->tblsslim_index, &(tables[i].array_indexes));
1371
1372        fprintf(stderr, "Setting up table: %s\n", tables[i].name);
1373        sql_conn = PQconnectdb(options->conninfo);
1374
1375        /* Check to see that the backend connection was successfully made */
1376        if (PQstatus(sql_conn) != CONNECTION_OK) {
1377            fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(sql_conn));
1378            exit_nicely();
1379        }
1380        tables[i].sql_conn = sql_conn;
1381
1382        /* Not really the right place for this test, but we need a live
1383         * connection that not used for anything else yet, and we'd like to
1384         * warn users *before* we start doing mountains of work */
1385        if (i == t_node)
1386        {
1387            res = PQexec(sql_conn, "select 1 from pg_opclass where opcname='gist__intbig_ops'" );
1388            if(PQresultStatus(res) == PGRES_TUPLES_OK && PQntuples(res) == 1)
1389            {
1390                /* intarray is problematic now; causes at least postgres 8.4
1391                 * to not use the index on nodes[]/parts[] which slows diff
1392                 * updates to a crawl!
1393                 * If someone find a way to fix this rather than bow out here,
1394                 * please do.*/
1395
1396                fprintf(stderr, 
1397                    "\n"
1398                    "The target database has the intarray contrib module loaded.\n"
1399                    "While required for earlier versions of osm2pgsql, intarray \n"
1400                    "is now unnecessary and will interfere with osm2pgsql's array\n"
1401                    "handling. Please use a database without intarray.\n\n");
1402                exit_nicely();
1403            }
1404            PQclear(res);
1405
1406            if (options->append)
1407            {
1408                res = PQexec(sql_conn, "select id from planet_osm_nodes limit 1" );
1409                if(PQresultStatus(res) == PGRES_TUPLES_OK && PQntuples(res) == 1)
1410                {
1411                    int size = PQfsize(res, 0);
1412                    if (size != sizeof(osmid_t))
1413                    {
1414                        fprintf(stderr, 
1415                            "\n"
1416                            "The target database has been created with %dbit ID fields,\n"
1417                            "but this version of osm2pgsql has been compiled to use %ldbit IDs.\n"
1418                            "You cannot append data to this database with this program.\n"
1419                            "Either re-create the database or use a matching osm2pgsql.\n\n",
1420                            size * 8, sizeof(osmid_t) * 8);
1421                        exit_nicely();
1422                    }
1423                }
1424                PQclear(res);
1425            }
1426
1427            if(!options->append)
1428                build_indexes = 1;
1429        }
1430        if (dropcreate) {
1431            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "DROP TABLE IF EXISTS %s", tables[i].name);
1432        }
1433
1434        if (tables[i].start) {
1435            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", tables[i].start);
1436        }
1437
1438        if (dropcreate && tables[i].create) {
1439            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", tables[i].create);
1440            if (tables[i].create_index) {
1441              pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", tables[i].create_index);
1442            }
1443        }
1444
1445
1446        if (tables[i].prepare) {
1447            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", tables[i].prepare);
1448        }
1449
1450        if (tables[i].prepare_intarray) {
1451            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", tables[i].prepare_intarray);
1452        }
1453
1454        if (tables[i].copy) {
1455            pgsql_exec(sql_conn, PGRES_COPY_IN, "%s", tables[i].copy);
1456            tables[i].copyMode = 1;
1457        }
1458    }
1459
1460    return 0;
1461}
1462
1463static void *pgsql_stop_one(void *arg)
1464{
1465    struct table_desc *table = arg;
1466    PGconn *sql_conn = table->sql_conn;
1467
1468    fprintf(stderr, "Stopping table: %s\n", table->name);
1469    pgsql_endCopy(table);
1470    if (table->stop) 
1471        pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", table->stop);
1472
1473    if (build_indexes && table->array_indexes) {
1474        char *buffer = (char *) malloc(strlen(table->array_indexes) + 99);
1475        // we need to insert before the TABLESPACE setting, if any
1476        char *insertpos = strstr(table->array_indexes, "TABLESPACE");
1477        if (!insertpos) insertpos = strchr(table->array_indexes, ';');
1478
1479        // automatically insert FASTUPDATE=OFF when creating,
1480        // indexes for PostgreSQL 8.4 and higher
1481        // see http://lists.openstreetmap.org/pipermail/dev/2011-January/021704.html
1482        if (insertpos && PQserverVersion(sql_conn) >= 80400) {
1483            char old = *insertpos;
1484            fprintf(stderr, "Building index on table: %s (fastupdate=off)\n", table->name);
1485            *insertpos = 0; // temporary null byte for following strcpy operation
1486            strcpy(buffer, table->array_indexes);
1487            *insertpos = old; // restore old content
1488            strcat(buffer, " WITH (FASTUPDATE=OFF)");
1489            strcat(buffer, insertpos);
1490        } else {
1491            fprintf(stderr, "Building index on table: %s\n", table->name);
1492            strcpy(buffer, table->array_indexes);
1493        }
1494        pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", buffer);
1495        free(buffer);
1496    }
1497    PQfinish(sql_conn);
1498    table->sql_conn = NULL;
1499    fprintf(stderr, "Stopped table: %s\n", table->name);
1500    return NULL;
1501}
1502
1503static void pgsql_stop(void)
1504{
1505    int i;
1506#ifdef HAVE_PTHREAD
1507    pthread_t threads[num_tables];
1508#endif
1509
1510    fprintf( stderr, "node cache: stored: %" PRIdOSMID "(%.2f%%), storage efficiency: %.2f%%, hit rate: %.2f%%\n", 
1511             storedNodes, 100.0f*storedNodes/totalNodes, 100.0f*storedNodes/(usedBlocks*PER_BLOCK),
1512             100.0f*nodesCacheHits/nodesCacheLookups );
1513         
1514    free_blockCache();
1515    free(queue);
1516
1517#ifdef HAVE_PTHREAD
1518    for (i=0; i<num_tables; i++) {
1519        int ret = pthread_create(&threads[i], NULL, pgsql_stop_one, &tables[i]);
1520        if (ret) {
1521            fprintf(stderr, "pthread_create() returned an error (%d)", ret);
1522            exit_nicely();
1523        }
1524    }
1525    for (i=0; i<num_tables; i++) {
1526        int ret = pthread_join(threads[i], NULL);
1527        if (ret) {
1528            fprintf(stderr, "pthread_join() returned an error (%d)", ret);
1529            exit_nicely();
1530        }
1531    }
1532#else
1533    for (i=0; i<num_tables; i++)
1534        pgsql_stop_one(&tables[i]);
1535#endif
1536}
1537 
1538struct middle_t mid_pgsql = {
1539        .start             = pgsql_start,
1540        .stop              = pgsql_stop,
1541        .cleanup           = pgsql_cleanup,
1542        .analyze           = pgsql_analyze,
1543        .end               = pgsql_end,
1544
1545        .nodes_set         = pgsql_nodes_set,
1546#if 0
1547        .nodes_get         = pgsql_nodes_get,
1548#endif
1549        .nodes_get_list    = pgsql_nodes_get_list,
1550        .nodes_delete      = pgsql_nodes_delete,
1551        .node_changed      = pgsql_node_changed,
1552
1553        .ways_set          = pgsql_ways_set,
1554        .ways_get          = pgsql_ways_get,
1555        .ways_done         = pgsql_ways_done,
1556        .ways_delete       = pgsql_ways_delete,
1557        .way_changed       = pgsql_way_changed,
1558
1559        .relations_set     = pgsql_rels_set,
1560#if 0
1561        .relations_get     = pgsql_rels_get,
1562#endif
1563        .relations_done    = pgsql_rels_done,
1564        .relations_delete  = pgsql_rels_delete,
1565        .relation_changed  = pgsql_rel_changed,
1566#if 0
1567        .iterate_nodes     = pgsql_iterate_nodes,
1568#endif
1569        .iterate_ways      = pgsql_iterate_ways,
1570        .iterate_relations = pgsql_iterate_relations
1571};
Note: See TracBrowser for help on using the repository browser.