source: subversion/applications/utils/export/osm2pgsql/middle-pgsql.c @ 27182

Revision 27182, 48.0 KB checked in by apmon, 2 years ago (diff)

[osm2pgsql] Attempt to improve the transaction management during "going over pending ways/relations"

  • Using an extended transaction on the planet_osm_ways table during the pending ways phase should be fine. It is the planet_osm_rels table that needs to stay out of transaction in this phase, as the different pending ways processes can simultaniously update the pending status of the same relations if multiple ways occur in a relation
  • Remove the extended transactions in the going over pending relations phase on the output tables. The processing of relations during diff-import causes the ways to be removed from the output tables. If a way is contained in two relations in the same diff, that also happen to be processed in different parallel processes, this could dead lock.
Line 
1/* Implements the mid-layer processing for osm2pgsql
2 * using several PostgreSQL tables
3 *
4 * This layer stores data read in from the planet.osm file
5 * and is then read by the backend processing code to
6 * emit the final geometry-enabled output formats
7*/
8 
9#include "config.h"
10
11#include <stdio.h>
12#include <unistd.h>
13#include <stdlib.h>
14#include <string.h>
15#include <assert.h>
16#include <math.h>
17#include <time.h>
18
19#ifdef HAVE_PTHREAD
20#include <pthread.h>
21#endif
22#ifdef HAVE_SYS_WAIT_H
23#include <sys/wait.h>
24#endif
25
26#include <libpq-fe.h>
27
28#include "osmtypes.h"
29#include "middle.h"
30#include "middle-pgsql.h"
31#include "output-pgsql.h"
32#include "node-ram-cache.h"
33#include "pgsql.h"
34
35/* Store +-20,000km Mercator co-ordinates as fixed point 32bit number with maximum precision */
36/* Scale is chosen such that 40,000 * SCALE < 2^32          */
37#define FIXED_POINT
38
39static int scale = 100;
40#define DOUBLE_TO_FIX(x) ((int)((x) * scale))
41#define FIX_TO_DOUBLE(x) (((double)x) / scale)
42
43
44
45enum table_id {
46    t_node, t_way, t_rel
47} ;
48
49struct table_desc {
50    //enum table_id table;
51    const char *name;
52    const char *start;
53    const char *create;
54    const char *create_index;
55    const char *prepare;
56    const char *prepare_intarray;
57    const char *copy;
58    const char *analyze;
59    const char *stop;
60    const char *array_indexes;
61
62    int copyMode;    /* True if we are in copy mode */
63    int transactionMode;    /* True if we are in an extended transaction */
64    PGconn *sql_conn;
65};
66
67static struct table_desc tables [] = {
68    { 
69        //table = t_node,
70         .name = "%p_nodes",
71        .start = "BEGIN;\n",
72#ifdef FIXED_POINT
73       .create = "CREATE TABLE %p_nodes (id " POSTGRES_OSMID_TYPE " PRIMARY KEY {USING INDEX TABLESPACE %i}, lat int4 not null, lon int4 not null, tags text[]) {TABLESPACE %t};\n",
74      .prepare = "PREPARE insert_node (" POSTGRES_OSMID_TYPE ", int4, int4, text[]) AS INSERT INTO %p_nodes VALUES ($1,$2,$3,$4);\n"
75#else
76       .create = "CREATE TABLE %p_nodes (id " POSTGRES_OSMID_TYPE " PRIMARY KEY {USING INDEX TABLESPACE %i}, lat double precision not null, lon double precision not null, tags text[]) {TABLESPACE %t};\n",
77      .prepare = "PREPARE insert_node (" POSTGRES_OSMID_TYPE ", double precision, double precision, text[]) AS INSERT INTO %p_nodes VALUES ($1,$2,$3,$4);\n"
78#endif
79               "PREPARE get_node (" POSTGRES_OSMID_TYPE ") AS SELECT lat,lon,tags FROM %p_nodes WHERE id = $1 LIMIT 1;\n"
80               "PREPARE delete_node (" POSTGRES_OSMID_TYPE ") AS DELETE FROM %p_nodes WHERE id = $1;\n",
81.prepare_intarray = // This is to fetch lots of nodes simultaneously, in order including duplicates. The commented out version doesn't do duplicates
82                  // It's not optimal as it does a Nested Loop / Index Scan which is suboptimal for large arrays
83                  //"PREPARE get_node_list(int[]) AS SELECT id, lat, lon FROM %p_nodes WHERE id = ANY($1::int4[]) ORDER BY $1::int4[] # id\n",
84                 //"PREPARE get_node_list(" POSTGRES_OSMID_TYPE "[]) AS select y.id, y.lat, y.lon from (select i, ($1)[i] as l_id from (select generate_series(1,array_length($1, 1)) as i) x) z, "
85                 //                                    "(select * from %p_nodes where id = ANY($1)) y where l_id=id order by i;\n",
86                 "PREPARE get_node_list(" POSTGRES_OSMID_TYPE "[]) AS SELECT id, lat, lon FROM %p_nodes WHERE id = ANY($1::" POSTGRES_OSMID_TYPE "[])",
87         .copy = "COPY %p_nodes FROM STDIN;\n",
88      .analyze = "ANALYZE %p_nodes;\n",
89         .stop = "COMMIT;\n"
90    },
91    { 
92        //table = t_way,
93         .name = "%p_ways",
94        .start = "BEGIN;\n",
95       .create = "CREATE TABLE %p_ways (id " POSTGRES_OSMID_TYPE " PRIMARY KEY {USING INDEX TABLESPACE %i}, nodes " POSTGRES_OSMID_TYPE "[] not null, tags text[], pending boolean not null) {TABLESPACE %t};\n",
96 .create_index = "CREATE INDEX %p_ways_idx ON %p_ways (id) {TABLESPACE %i} WHERE pending;\n",
97.array_indexes = "CREATE INDEX %p_ways_nodes ON %p_ways USING gin (nodes) {TABLESPACE %i};\n",
98      .prepare = "PREPARE insert_way (" POSTGRES_OSMID_TYPE ", " POSTGRES_OSMID_TYPE "[], text[], boolean) AS INSERT INTO %p_ways VALUES ($1,$2,$3,$4);\n"
99               "PREPARE get_way (" POSTGRES_OSMID_TYPE ") AS SELECT nodes, tags, array_upper(nodes,1) FROM %p_ways WHERE id = $1;\n"
100               "PREPARE way_done(" POSTGRES_OSMID_TYPE ") AS UPDATE %p_ways SET pending = false WHERE id = $1;\n"
101               "PREPARE pending_ways AS SELECT id FROM %p_ways WHERE pending;\n"
102               "PREPARE delete_way(" POSTGRES_OSMID_TYPE ") AS DELETE FROM %p_ways WHERE id = $1;\n",
103.prepare_intarray = "PREPARE node_changed_mark(" POSTGRES_OSMID_TYPE ") AS UPDATE %p_ways SET pending = true WHERE nodes && ARRAY[$1] AND NOT pending;\n",
104         .copy = "COPY %p_ways FROM STDIN;\n",
105      .analyze = "ANALYZE %p_ways;\n",
106         .stop =  "COMMIT;\n"
107    },
108    { 
109        //table = t_rel,
110         .name = "%p_rels",
111        .start = "BEGIN;\n",
112       .create = "CREATE TABLE %p_rels(id " POSTGRES_OSMID_TYPE " PRIMARY KEY {USING INDEX TABLESPACE %i}, way_off int2, rel_off int2, parts " POSTGRES_OSMID_TYPE "[], members text[], tags text[], pending boolean not null) {TABLESPACE %t};\n",
113 .create_index = "CREATE INDEX %p_rels_idx ON %p_rels (id) {TABLESPACE %i} WHERE pending;\n",
114.array_indexes = "CREATE INDEX %p_rels_parts ON %p_rels USING gin (parts) {TABLESPACE %i};\n",
115      .prepare = "PREPARE insert_rel (" POSTGRES_OSMID_TYPE ", int2, int2, " POSTGRES_OSMID_TYPE "[], text[], text[]) AS INSERT INTO %p_rels VALUES ($1,$2,$3,$4,$5,$6,false);\n"
116               "PREPARE get_rel (" POSTGRES_OSMID_TYPE ") AS SELECT members, tags, array_upper(members,1)/2 FROM %p_rels WHERE id = $1;\n"
117               "PREPARE rel_done(" POSTGRES_OSMID_TYPE ") AS UPDATE %p_rels SET pending = false WHERE id = $1;\n"
118               "PREPARE pending_rels AS SELECT id FROM %p_rels WHERE pending;\n"
119               "PREPARE delete_rel(" POSTGRES_OSMID_TYPE ") AS DELETE FROM %p_rels WHERE id = $1;\n",
120.prepare_intarray =
121                "PREPARE node_changed_mark(" POSTGRES_OSMID_TYPE ") AS UPDATE %p_rels SET pending = true WHERE parts && ARRAY[$1] AND parts[1:way_off] && ARRAY[$1] AND NOT pending;\n"
122                "PREPARE way_changed_mark(" POSTGRES_OSMID_TYPE ") AS UPDATE %p_rels SET pending = true WHERE parts && ARRAY[$1] AND parts[way_off+1:rel_off] && ARRAY[$1] AND NOT pending;\n"
123                "PREPARE rel_changed_mark(" POSTGRES_OSMID_TYPE ") AS UPDATE %p_rels SET pending = true WHERE parts && ARRAY[$1] AND parts[rel_off+1:array_length(parts,1)] && ARRAY[$1] AND NOT pending;\n",
124         .copy = "COPY %p_rels FROM STDIN;\n",
125      .analyze = "ANALYZE %p_rels;\n",
126         .stop =  "COMMIT;\n"
127    }
128};
129
130static const int num_tables = sizeof(tables)/sizeof(tables[0]);
131static int warn_node_order;
132static struct table_desc *node_table = &tables[t_node];
133static struct table_desc *way_table  = &tables[t_way];
134static struct table_desc *rel_table  = &tables[t_rel];
135
136static int Append;
137
138const struct output_options *out_options;
139
140static int pgsql_connect(const struct output_options *options) {
141    int i;
142    /* We use a connection per table to enable the use of COPY */
143    for (i=0; i<num_tables; i++) {
144        PGconn *sql_conn;
145        sql_conn = PQconnectdb(options->conninfo);
146
147        /* Check to see that the backend connection was successfully made */
148        if (PQstatus(sql_conn) != CONNECTION_OK) {
149            fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(sql_conn));
150            exit_nicely();
151        }
152        tables[i].sql_conn = sql_conn;
153
154        pgsql_exec(sql_conn, PGRES_COMMAND_OK, "SET synchronous_commit TO off;");
155
156        if (tables[i].prepare) {
157            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", tables[i].prepare);
158        }
159
160        if (tables[i].prepare_intarray) {
161            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", tables[i].prepare_intarray);
162        }
163
164    }
165    return 0;
166}
167
168static void pgsql_cleanup(void)
169{
170    int i;
171
172    for (i=0; i<num_tables; i++) {
173        if (tables[i].sql_conn) {
174            PQfinish(tables[i].sql_conn);
175            tables[i].sql_conn = NULL;
176        }
177    }
178}
179
180char *pgsql_store_nodes(osmid_t *nds, int nd_count)
181{
182  static char *buffer;
183  static int buflen;
184
185  char *ptr;
186  int i, first;
187   
188  if( buflen <= nd_count * 10 )
189  {
190    buflen = ((nd_count * 10) | 4095) + 1;  /* Round up to next page */
191    buffer = realloc( buffer, buflen );
192  }
193_restart:
194
195  ptr = buffer;
196  first = 1;
197  *ptr++ = '{';
198  for( i=0; i<nd_count; i++ )
199  {
200    if( !first ) *ptr++ = ',';
201    ptr += sprintf(ptr, "%" PRIdOSMID, nds[i] );
202   
203    if( (ptr-buffer) > (buflen-20) ) /* Almost overflowed? */
204    {
205      buflen <<= 1;
206      buffer = realloc( buffer, buflen );
207     
208      goto _restart;
209    }
210    first = 0;
211  }
212 
213  *ptr++ = '}';
214  *ptr++ = 0;
215 
216  return buffer;
217}
218
219/* Special escape routine for escaping strings in array constants: double quote, backslash,newline, tab*/
220static inline char *escape_tag( char *ptr, const char *in, int escape )
221{
222  while( *in )
223  {
224    switch(*in)
225    {
226      case '"':
227        if( escape ) *ptr++ = '\\';
228        *ptr++ = '\\';
229        *ptr++ = '"';
230        break;
231      case '\\':
232        if( escape ) *ptr++ = '\\';
233        if( escape ) *ptr++ = '\\';
234        *ptr++ = '\\';
235        *ptr++ = '\\';
236        break;
237      case '\n':
238        if( escape ) *ptr++ = '\\';
239        *ptr++ = '\\';
240        *ptr++ = 'n';
241        break;
242      case '\r':
243        if( escape ) *ptr++ = '\\';
244        *ptr++ = '\\';
245        *ptr++ = 'r';
246        break;
247      case '\t':
248        if( escape ) *ptr++ = '\\';
249        *ptr++ = '\\';
250        *ptr++ = 't';
251        break;
252      default:
253        *ptr++ = *in;
254        break;
255    }
256    in++;
257  }
258  return ptr;
259}
260
261/* escape means we return '\N' for copy mode, otherwise we return just NULL */
262char *pgsql_store_tags(struct keyval *tags, int escape)
263{
264  static char *buffer;
265  static int buflen;
266
267  char *ptr;
268  struct keyval *i;
269  int first;
270   
271  int countlist = countList(tags);
272  if( countlist == 0 )
273  {
274    if( escape )
275      return "\\N";
276    else
277      return NULL;
278  }
279   
280  if( buflen <= countlist * 24 ) /* LE so 0 always matches */
281  {
282    buflen = ((countlist * 24) | 4095) + 1;  /* Round up to next page */
283    buffer = realloc( buffer, buflen );
284  }
285_restart:
286
287  ptr = buffer;
288  first = 1;
289  *ptr++ = '{';
290  /* The lists are circular, exit when we reach the head again */
291  for( i=tags->next; i->key; i = i->next )
292  {
293    int maxlen = (strlen(i->key) + strlen(i->value)) * 4;
294    if( (ptr+maxlen-buffer) > (buflen-20) ) /* Almost overflowed? */
295    {
296      buflen <<= 1;
297      buffer = realloc( buffer, buflen );
298     
299      goto _restart;
300    }
301    if( !first ) *ptr++ = ',';
302    *ptr++ = '"';
303    ptr = escape_tag( ptr, i->key, escape );
304    *ptr++ = '"';
305    *ptr++ = ',';
306    *ptr++ = '"';
307    ptr = escape_tag( ptr, i->value, escape );
308    *ptr++ = '"';
309   
310    first=0;
311  }
312 
313  *ptr++ = '}';
314  *ptr++ = 0;
315 
316  return buffer;
317}
318
319/* Decodes a portion of an array literal from postgres */
320/* Argument should point to beginning of literal, on return points to delimiter */
321static const char *decode_upto( const char *src, char *dst )
322{
323  int quoted = (*src == '"');
324  if( quoted ) src++;
325 
326  while( quoted ? (*src != '"') : (*src != ',' && *src != '}') )
327  {
328    if( *src == '\\' )
329    {
330      switch( src[1] )
331      {
332        case 'n': *dst++ = '\n'; break;
333        case 't': *dst++ = '\t'; break;
334        default: *dst++ = src[1]; break;
335      }
336      src+=2;
337    }
338    else
339      *dst++ = *src++;
340  }
341  if( quoted ) src++;
342  *dst = 0;
343  return src;
344}
345
346static void pgsql_parse_tags( const char *string, struct keyval *tags )
347{
348  char key[1024];
349  char val[1024];
350 
351  if( *string == '\0' )
352    return;
353   
354//  fprintf( stderr, "Parsing: %s\n", string );
355  if( *string++ != '{' )
356    return;
357  while( *string != '}' )
358  {
359    string = decode_upto( string, key );
360    /* String points to the comma */
361    string++;
362    string = decode_upto( string, val );
363    /* String points to the comma or closing '}' */
364    addItem( tags, key, val, 0 );
365//    fprintf( stderr, "Extracted item: %s=%s\n", key, val );
366    if( *string == ',' )
367      string++;
368  }
369}
370
371/* Parses an array of integers */
372static void pgsql_parse_nodes(const char *src, osmid_t *nds, int nd_count )
373{
374  int count = 0;
375  const char *string = src;
376 
377  if( *string++ != '{' )
378    return;
379  while( *string != '}' )
380  {
381    char *ptr;
382    nds[count] = strtoosmid( string, &ptr, 10 );
383    string = ptr;
384    if( *string == ',' )
385      string++;
386    count++;
387  }
388  if( count != nd_count )
389  {
390    fprintf( stderr, "parse_nodes problem: '%s' expected %d got %d\n", src, nd_count, count );
391    exit_nicely();
392  }
393}
394
395static int pgsql_endCopy( struct table_desc *table)
396{
397    /* Terminate any pending COPY */
398     if (table->copyMode) {
399        PGconn *sql_conn = table->sql_conn;
400        int stop = PQputCopyEnd(sql_conn, NULL);
401        if (stop != 1) {
402            fprintf(stderr, "COPY_END for %s failed: %s\n", table->copy, PQerrorMessage(sql_conn));
403            exit_nicely();
404        }
405
406        PGresult *res = PQgetResult(sql_conn);
407        if (PQresultStatus(res) != PGRES_COMMAND_OK) {
408            fprintf(stderr, "COPY_END for %s failed: %s\n", table->copy, PQerrorMessage(sql_conn));
409            PQclear(res);
410            exit_nicely();
411        }
412        PQclear(res);
413        table->copyMode = 0;
414    }
415    return 0;
416}
417
418static int pgsql_nodes_set(osmid_t id, double lat, double lon, struct keyval *tags)
419{
420    /* Four params: id, lat, lon, tags */
421    char *paramValues[4];
422    char *buffer;
423
424    ram_cache_nodes_set( id, lat, lon, tags );
425    if( node_table->copyMode )
426    {
427      char *tag_buf = pgsql_store_tags(tags,1);
428      int length = strlen(tag_buf) + 64;
429      buffer = alloca( length );
430#ifdef FIXED_POINT
431      if( snprintf( buffer, length, "%" PRIdOSMID "\t%d\t%d\t%s\n", id, DOUBLE_TO_FIX(lat), DOUBLE_TO_FIX(lon), tag_buf ) > (length-10) )
432      { fprintf( stderr, "buffer overflow node id %" PRIdOSMID "\n", id); return 1; }
433#else
434      if( snprintf( buffer, length, "%" PRIdOSMID "\t%.10f\t%.10f\t%s\n", id, lat, lon, tag_buf ) > (length-10) )
435      { fprintf( stderr, "buffer overflow node id %" PRIdOSMID "\n", id); return 1; }
436#endif
437      return pgsql_CopyData(__FUNCTION__, node_table->sql_conn, buffer);
438    }
439    buffer = alloca(64);
440    paramValues[0] = buffer;
441    paramValues[1] = paramValues[0] + sprintf( paramValues[0], "%" PRIdOSMID, id ) + 1;
442#ifdef FIXED_POINT
443    paramValues[2] = paramValues[1] + sprintf( paramValues[1], "%d", DOUBLE_TO_FIX(lat) ) + 1;
444    sprintf( paramValues[2], "%d", DOUBLE_TO_FIX(lon) );
445#else
446    paramValues[2] = paramValues[1] + sprintf( paramValues[1], "%.10f", lat ) + 1;
447    sprintf( paramValues[2], "%.10f", lon );
448#endif
449    paramValues[3] = pgsql_store_tags(tags,0);
450    pgsql_execPrepared(node_table->sql_conn, "insert_node", 4, (const char * const *)paramValues, PGRES_COMMAND_OK);
451    return 0;
452}
453
454
455static int pgsql_nodes_get(struct osmNode *out, osmid_t id)
456{
457    /* Check cache first */
458    if( ram_cache_nodes_get( out, id ) == 0 )
459        return 0;
460     
461    PGresult   *res;
462    char tmp[16];
463    char const *paramValues[1];
464    PGconn *sql_conn = node_table->sql_conn;
465
466    /* Make sure we're out of copy mode */
467    pgsql_endCopy( node_table );
468
469    snprintf(tmp, sizeof(tmp), "%" PRIdOSMID, id);
470    paramValues[0] = tmp;
471 
472    res = pgsql_execPrepared(sql_conn, "get_node", 1, paramValues, PGRES_TUPLES_OK);
473
474    if (PQntuples(res) != 1) {
475        PQclear(res);
476        return 1;
477    } 
478
479#ifdef FIXED_POINT
480    out->lat = FIX_TO_DOUBLE(strtol(PQgetvalue(res, 0, 0), NULL, 10));
481    out->lon = FIX_TO_DOUBLE(strtol(PQgetvalue(res, 0, 1), NULL, 10));
482#else
483    out->lat = strtod(PQgetvalue(res, 0, 0), NULL);
484    out->lon = strtod(PQgetvalue(res, 0, 1), NULL);
485#endif
486    PQclear(res);
487    return 0;
488}
489
490/* This should be made more efficient by using an IN(ARRAY[]) construct */
491static int pgsql_nodes_get_list(struct osmNode *nodes, osmid_t *ndids, int nd_count)
492{
493    char tmp[16];
494    char *tmp2; 
495    int count, count2, countDB, countPG, i,j;
496    osmid_t id;
497    osmid_t *ndidspg;
498    struct osmNode *nodespg;
499    struct osmNode *nodes2;
500    char const *paramValues[1]; 
501
502    PGresult *res;
503    PGconn *sql_conn = node_table->sql_conn;
504   
505    count = 0; countDB = 0;
506
507    tmp2 = malloc(sizeof(char)*nd_count*16);
508    if (tmp2 == NULL) return 0; /*failed to allocate memory, return */
509
510    /* create a list of ids in tmp2 to query the database  */
511    snprintf(tmp2, sizeof(tmp2), "{");
512    for( i=0; i<nd_count; i++ ) {
513        /* Check cache first */ 
514        if( ram_cache_nodes_get( &nodes[i], ndids[i]) == 0 ) {
515            count++;
516            continue;
517        }
518        countDB++;
519        /* Mark nodes as needing to be fetched from the DB */
520        nodes[i].lat = NAN;
521        nodes[i].lon = NAN;
522        snprintf(tmp, sizeof(tmp), "%" PRIdOSMID ",", ndids[i]);
523        strcat(tmp2,tmp);
524    }
525    tmp2[strlen(tmp2) - 1] = '}'; /* replace last , with } to complete list of ids*/
526 
527    if (countDB == 0) {
528        free(tmp2);
529        return count; /* All ids where in cache, so nothing more to do */
530    }
531 
532    pgsql_endCopy(node_table); 
533
534    paramValues[0] = tmp2; 
535    res = pgsql_execPrepared(sql_conn, "get_node_list", 1, paramValues, PGRES_TUPLES_OK);
536    countPG = PQntuples(res);
537
538    ndidspg = malloc(sizeof(osmid_t)*countPG);
539    nodespg = malloc(sizeof(struct osmNode)*countPG);
540
541    if ((ndidspg == NULL) || (nodespg == NULL)) {
542        free(tmp2);
543        free(ndidspg);
544        free(nodespg);
545        PQclear(res);
546        return 0;
547    }
548
549    for (i = 0; i < countPG; i++) {
550        ndidspg[i] = strtoosmid(PQgetvalue(res, i, 0), NULL, 10); 
551#ifdef FIXED_POINT
552        nodespg[i].lat = FIX_TO_DOUBLE(strtol(PQgetvalue(res, i, 1), NULL, 10)); 
553        nodespg[i].lon = FIX_TO_DOUBLE(strtol(PQgetvalue(res, i, 2), NULL, 10)); 
554#else
555        nodespg[i].lat = strtod(PQgetvalue(res, i, 1), NULL); 
556        nodespg[i].lon = strtod(PQgetvalue(res, i, 2), NULL); 
557#endif
558    }
559 
560 
561    /* The list of results coming back from the db is in a different order to the list of nodes in the way.
562       Match the results back to the way node list */
563   
564    for (i=0; i<nd_count; i++ ) {
565        if ((isnan(nodes[i].lat)) || (isnan(nodes[i].lon))) {
566            /* TODO: implement an O(log(n)) algorithm to match node ids */
567            for (j = 0; j < countPG; j++) {
568                if (ndidspg[j] == ndids[i]) {
569                    nodes[i].lat = nodespg[j].lat;
570                    nodes[i].lon = nodespg[j].lon;
571                    count++;
572                    break;
573                }
574            }
575        }
576    }
577
578    /* If some of the nodes in the way don't exist, the returning list has holes.
579       As the rest of the code expects a continous list, it needs to be re-compacted */
580    if (count != nd_count) {
581        j = 0;
582        for (i = 0; i < nd_count; i++) {
583            if ( !isnan(nodes[i].lat)) {
584                nodes[j].lat = nodes[i].lat;
585                nodes[j].lon = nodes[i].lon;
586                j++;
587            }
588         }
589    }
590
591    PQclear(res);
592    free(tmp2);
593    free(ndidspg);
594    free(nodespg);
595
596    return count;
597}
598
599static int pgsql_nodes_delete(osmid_t osm_id)
600{
601    char const *paramValues[1];
602    char buffer[64];
603    /* Make sure we're out of copy mode */
604    pgsql_endCopy( node_table );
605   
606    sprintf( buffer, "%" PRIdOSMID, osm_id );
607    paramValues[0] = buffer;
608    pgsql_execPrepared(node_table->sql_conn, "delete_node", 1, paramValues, PGRES_COMMAND_OK );
609    return 0;
610}
611
612static int pgsql_node_changed(osmid_t osm_id)
613{
614    char const *paramValues[1];
615    char buffer[64];
616    /* Make sure we're out of copy mode */
617    pgsql_endCopy( way_table );
618    pgsql_endCopy( rel_table );
619   
620    sprintf( buffer, "%" PRIdOSMID, osm_id );
621    paramValues[0] = buffer;
622    pgsql_execPrepared(way_table->sql_conn, "node_changed_mark", 1, paramValues, PGRES_COMMAND_OK );
623    pgsql_execPrepared(rel_table->sql_conn, "node_changed_mark", 1, paramValues, PGRES_COMMAND_OK );
624    return 0;
625}
626
627static int pgsql_ways_set(osmid_t way_id, osmid_t *nds, int nd_count, struct keyval *tags, int pending)
628{
629    /* Three params: id, nodes, tags, pending */
630    char *paramValues[4];
631    char *buffer;
632
633    if( way_table->copyMode )
634    {
635      char *tag_buf = pgsql_store_tags(tags,1);
636      char *node_buf = pgsql_store_nodes(nds, nd_count);
637      int length = strlen(tag_buf) + strlen(node_buf) + 64;
638      buffer = alloca(length);
639      if( snprintf( buffer, length, "%" PRIdOSMID "\t%s\t%s\t%c\n", 
640              way_id, node_buf, tag_buf, pending?'t':'f' ) > (length-10) )
641      { fprintf( stderr, "buffer overflow way id %" PRIdOSMID "\n", way_id); return 1; }
642      return pgsql_CopyData(__FUNCTION__, way_table->sql_conn, buffer);
643    }
644    buffer = alloca(64);
645    paramValues[0] = buffer;
646    paramValues[3] = paramValues[0] + sprintf( paramValues[0], "%" PRIdOSMID, way_id ) + 1;
647    sprintf( paramValues[3], "%c", pending?'t':'f' );
648    paramValues[1] = pgsql_store_nodes(nds, nd_count);
649    paramValues[2] = pgsql_store_tags(tags,0);
650    pgsql_execPrepared(way_table->sql_conn, "insert_way", 4, (const char * const *)paramValues, PGRES_COMMAND_OK);
651    return 0;
652}
653
654/* Caller is responsible for freeing nodesptr & resetList(tags) */
655static int pgsql_ways_get(osmid_t id, struct keyval *tags, struct osmNode **nodes_ptr, int *count_ptr)
656{
657    PGresult   *res;
658    char tmp[16];
659    char const *paramValues[1];
660    PGconn *sql_conn = way_table->sql_conn;
661
662    /* Make sure we're out of copy mode */
663    pgsql_endCopy( way_table );
664
665    snprintf(tmp, sizeof(tmp), "%" PRIdOSMID, id);
666    paramValues[0] = tmp;
667 
668    res = pgsql_execPrepared(sql_conn, "get_way", 1, paramValues, PGRES_TUPLES_OK);
669
670    if (PQntuples(res) != 1) {
671        PQclear(res);
672        return 1;
673    } 
674
675    pgsql_parse_tags( PQgetvalue(res, 0, 1), tags );
676
677    int num_nodes = strtol(PQgetvalue(res, 0, 2), NULL, 10);
678    osmid_t *list = alloca(sizeof(osmid_t)*num_nodes );
679    *nodes_ptr = malloc(sizeof(struct osmNode) * num_nodes);
680    pgsql_parse_nodes( PQgetvalue(res, 0, 0), list, num_nodes);
681   
682    *count_ptr = pgsql_nodes_get_list( *nodes_ptr, list, num_nodes);
683    PQclear(res);
684    return 0;
685}
686
687static int pgsql_ways_done(osmid_t id)
688{
689    char tmp[16];
690    char const *paramValues[1];
691    PGconn *sql_conn = way_table->sql_conn;
692
693    /* Make sure we're out of copy mode */
694    pgsql_endCopy( way_table );
695
696    snprintf(tmp, sizeof(tmp), "%" PRIdOSMID, id);
697    paramValues[0] = tmp;
698 
699    pgsql_execPrepared(sql_conn, "way_done", 1, paramValues, PGRES_COMMAND_OK);
700
701    return 0;
702}
703
704static int pgsql_ways_delete(osmid_t osm_id)
705{
706    char const *paramValues[1];
707    char buffer[64];
708    /* Make sure we're out of copy mode */
709    pgsql_endCopy( way_table );
710   
711    sprintf( buffer, "%" PRIdOSMID, osm_id );
712    paramValues[0] = buffer;
713    pgsql_execPrepared(way_table->sql_conn, "delete_way", 1, paramValues, PGRES_COMMAND_OK );
714    return 0;
715}
716
717static void pgsql_iterate_ways(int (*callback)(osmid_t id, struct keyval *tags, struct osmNode *nodes, int count, int exists))
718{
719    int noProcs = out_options->num_procs;
720    int pid = 0;
721    PGresult   *res_ways;
722    int i, p, count = 0;
723    /* The flag we pass to indicate that the way in question might exist already in the database */
724    int exists = Append;
725
726    time_t start, end;
727    time(&start);
728    fprintf(stderr, "\nGoing over pending ways\n");
729
730    /* Make sure we're out of copy mode */
731    pgsql_endCopy( way_table );
732   
733    res_ways = pgsql_execPrepared(way_table->sql_conn, "pending_ways", 0, NULL, PGRES_TUPLES_OK);
734
735   
736    /**
737     * To speed up processing of pending ways, fork noProcs worker processes
738     * each of which independently goes through an equal subset of the pending ways array
739     */
740    fprintf(stderr, "\nUsing %i helper-processes\n", noProcs);
741    for (p = 1; p < noProcs; p++) {
742        pid=fork();
743        if (pid==0) break;
744        if (pid==-1) {
745            fprintf(stderr,"WARNING: Failed to fork helper processes. Falling back to only using %i \n", p);
746            noProcs = p;
747            break;
748        }
749    }
750    if ((pid == 0) && (noProcs > 1)) {
751        /* After forking, need to reconnect to the postgresql db */
752        out_options->out->connect(out_options, 1);
753        pgsql_connect(out_options);
754    } else {
755        p = 0;
756    }
757
758    /* Only start an extended transaction on the ways table,
759     * which should cover the bulk of the update statements.
760     * The nodes table should not be written to in this phase.
761     * The relations table can't be wrapped in an extended
762     * transaction, as with prallel processing it may deadlock.
763     * Updating a way will trigger an update of the pending status
764     * on connected relations. This should not be as many updates,
765     * so in combination with the synchronous_comit = off it should be fine.
766     *
767     */
768    if (tables[t_way].start) {
769        pgsql_endCopy(&tables[t_way]);
770        pgsql_exec(tables[t_way].sql_conn, PGRES_COMMAND_OK, "%s", tables[t_way].start);
771        tables[t_way].transactionMode = 1;
772    }
773
774    //fprintf(stderr, "\nIterating ways\n");
775    /* Use a stride length of the number of worker processes,
776       starting with an offset for each worker process p */
777    for (i = p; i < PQntuples(res_ways); i+= noProcs) {
778        osmid_t id = strtoosmid(PQgetvalue(res_ways, i, 0), NULL, 10);
779        struct keyval tags;
780        struct osmNode *nodes;
781        int nd_count;
782
783        if (count++ %1000 == 0) {
784            time(&end);
785            fprintf(stderr, "\rprocessing way (%dk) at %.2fk/s", count/1000,
786            (int)(end - start) > 0 ? ((double)count / 1000.0 / (double)(end - start)) : 0);
787        }
788
789        initList(&tags);
790        if( pgsql_ways_get(id, &tags, &nodes, &nd_count) )
791          continue;
792         
793        callback(id, &tags, nodes, nd_count, exists);
794        pgsql_ways_done( id );
795
796        free(nodes);
797        resetList(&tags);
798    }
799
800    if (tables[t_way].stop && tables[t_way].transactionMode) {
801        pgsql_exec(tables[t_way].sql_conn, PGRES_COMMAND_OK, "%s", tables[t_way].stop);
802        tables[t_way].transactionMode = 0;
803    }
804
805    if ((pid == 0) && (noProcs > 1)) {
806        pgsql_cleanup();
807        out_options->out->close(1);
808        exit(0);
809    } else {
810        for (p = 0; p < noProcs; p++) wait(NULL);
811        fprintf(stderr, "\nAll child processes exited\n");
812    }
813
814   
815    fprintf(stderr, "\n");
816    time(&end);
817    if (end - start > 0)
818        fprintf(stderr, "Pending ways took %ds at a rate of %.2f/s\n",(int)(end - start), 
819                ((double)PQntuples(res_ways) / (double)(end - start)));
820    PQclear(res_ways);
821}
822
823static int pgsql_way_changed(osmid_t osm_id)
824{
825    char const *paramValues[1];
826    char buffer[64];
827    /* Make sure we're out of copy mode */
828    pgsql_endCopy( rel_table );
829   
830    sprintf( buffer, "%" PRIdOSMID, osm_id );
831    paramValues[0] = buffer;
832    pgsql_execPrepared(rel_table->sql_conn, "way_changed_mark", 1, paramValues, PGRES_COMMAND_OK );
833    return 0;
834}
835
836static int pgsql_rels_set(osmid_t id, struct member *members, int member_count, struct keyval *tags)
837{
838    /* Params: id, way_off, rel_off, parts, members, tags */
839    char *paramValues[6];
840    char *buffer;
841    int i;
842    struct keyval member_list;
843   
844    osmid_t node_parts[member_count],
845            way_parts[member_count],
846            rel_parts[member_count];
847    int node_count = 0, way_count = 0, rel_count = 0;
848   
849    osmid_t all_parts[member_count];
850    int all_count = 0;
851    initList( &member_list );   
852    for( i=0; i<member_count; i++ )
853    {
854      char tag = 0;
855      switch( members[i].type )
856      {
857        case OSMTYPE_NODE:     node_parts[node_count++] = members[i].id; tag = 'n'; break;
858        case OSMTYPE_WAY:      way_parts[way_count++] = members[i].id; tag = 'w'; break;
859        case OSMTYPE_RELATION: rel_parts[rel_count++] = members[i].id; tag = 'r'; break;
860        default: fprintf( stderr, "Internal error: Unknown member type %d\n", members[i].type ); exit_nicely();
861      }
862      char buf[64];
863      sprintf( buf, "%c%" PRIdOSMID, tag, members[i].id );
864      addItem( &member_list, buf, members[i].role, 0 );
865    }
866    memcpy( all_parts+all_count, node_parts, node_count*sizeof(int) ); all_count+=node_count;
867    memcpy( all_parts+all_count, way_parts, way_count*sizeof(int) ); all_count+=way_count;
868    memcpy( all_parts+all_count, rel_parts, rel_count*sizeof(int) ); all_count+=rel_count;
869 
870    if( rel_table->copyMode )
871    {
872      char *tag_buf = strdup(pgsql_store_tags(tags,1));
873      char *member_buf = pgsql_store_tags(&member_list,1);
874      char *parts_buf = pgsql_store_nodes(all_parts, all_count);
875      int length = strlen(member_buf) + strlen(tag_buf) + strlen(parts_buf) + 64;
876      buffer = alloca(length);
877      if( snprintf( buffer, length, "%" PRIdOSMID "\t%d\t%d\t%s\t%s\t%s\tf\n", 
878              id, node_count, node_count+way_count, parts_buf, member_buf, tag_buf ) > (length-10) )
879      { fprintf( stderr, "buffer overflow relation id %" PRIdOSMID "\n", id); return 1; }
880      free(tag_buf);
881      resetList(&member_list);
882      return pgsql_CopyData(__FUNCTION__, rel_table->sql_conn, buffer);
883    }
884    buffer = alloca(64);
885    paramValues[0] = buffer;
886    paramValues[1] = paramValues[0] + sprintf( paramValues[0], "%" PRIdOSMID, id ) + 1;
887    paramValues[2] = paramValues[1] + sprintf( paramValues[1], "%d", node_count ) + 1;
888    sprintf( paramValues[2], "%d", node_count+way_count );
889    paramValues[3] = pgsql_store_nodes(all_parts, all_count);
890    paramValues[4] = pgsql_store_tags(&member_list,0);
891    if( paramValues[4] )
892        paramValues[4] = strdup(paramValues[4]);
893    paramValues[5] = pgsql_store_tags(tags,0);
894    pgsql_execPrepared(rel_table->sql_conn, "insert_rel", 6, (const char * const *)paramValues, PGRES_COMMAND_OK);
895    if( paramValues[4] )
896        free(paramValues[4]);
897    resetList(&member_list);
898    return 0;
899}
900
901/* Caller is responsible for freeing members & resetList(tags) */
902static int pgsql_rels_get(osmid_t id, struct member **members, int *member_count, struct keyval *tags)
903{
904    PGresult   *res;
905    char tmp[16];
906    char const *paramValues[1];
907    PGconn *sql_conn = rel_table->sql_conn;
908    struct keyval member_temp;
909
910    /* Make sure we're out of copy mode */
911    pgsql_endCopy( rel_table );
912
913    snprintf(tmp, sizeof(tmp), "%" PRIdOSMID, id);
914    paramValues[0] = tmp;
915 
916    res = pgsql_execPrepared(sql_conn, "get_rel", 1, paramValues, PGRES_TUPLES_OK);
917    /* Fields are: members, tags, member_count */
918
919    if (PQntuples(res) != 1) {
920        PQclear(res);
921        return 1;
922    } 
923
924    pgsql_parse_tags( PQgetvalue(res, 0, 1), tags );
925    initList(&member_temp);
926    pgsql_parse_tags( PQgetvalue(res, 0, 0), &member_temp );
927
928    int num_members = strtol(PQgetvalue(res, 0, 2), NULL, 10);
929    struct member *list = malloc( sizeof(struct member)*num_members );
930   
931    int i=0;
932    struct keyval *item;
933    while( (item = popItem(&member_temp)) )
934    {
935        if( i >= num_members )
936        {
937            fprintf(stderr, "Unexpected member_count reading relation %" PRIdOSMID "\n", id);
938            exit_nicely();
939        }
940        char tag = item->key[0];
941        list[i].type = (tag == 'n')?OSMTYPE_NODE:(tag == 'w')?OSMTYPE_WAY:(tag == 'r')?OSMTYPE_RELATION:-1;
942        list[i].id = strtoosmid(item->key+1, NULL, 10 );
943        list[i].role = strdup( item->value );
944        freeItem(item);
945        i++;
946    }
947    *members = list;
948    *member_count = num_members;
949    PQclear(res);
950    return 0;
951}
952
953static int pgsql_rels_done(osmid_t id)
954{
955    char tmp[16];
956    char const *paramValues[1];
957    PGconn *sql_conn = rel_table->sql_conn;
958
959    /* Make sure we're out of copy mode */
960    pgsql_endCopy( rel_table );
961
962    snprintf(tmp, sizeof(tmp), "%" PRIdOSMID, id);
963    paramValues[0] = tmp;
964 
965    pgsql_execPrepared(sql_conn, "rel_done", 1, paramValues, PGRES_COMMAND_OK);
966
967    return 0;
968}
969
970static int pgsql_rels_delete(osmid_t osm_id)
971{
972    char const *paramValues[1];
973    char buffer[64];
974    /* Make sure we're out of copy mode */
975    pgsql_endCopy( rel_table );
976   
977    sprintf( buffer, "%" PRIdOSMID, osm_id );
978    paramValues[0] = buffer;
979    pgsql_execPrepared(rel_table->sql_conn, "delete_rel", 1, paramValues, PGRES_COMMAND_OK );
980    return 0;
981}
982
983static void pgsql_iterate_relations(int (*callback)(osmid_t id, struct member *members, int member_count, struct keyval *tags, int exists))
984{
985    PGresult   *res_rels;
986    int noProcs = out_options->num_procs;
987    int pid;
988    int i, p, count = 0;
989    /* The flag we pass to indicate that the way in question might exist already in the database */
990    int exists = Append;
991
992    time_t start, end;
993    time(&start);
994    fprintf(stderr, "\nGoing over pending relations\n");
995
996    /* Make sure we're out of copy mode */
997    pgsql_endCopy( rel_table );
998   
999    res_rels = pgsql_execPrepared(rel_table->sql_conn, "pending_rels", 0, NULL, PGRES_TUPLES_OK);
1000
1001    fprintf(stderr, "\nUsing %i helper-processes\n", noProcs);
1002    for (p = 1; p < noProcs; p++) {
1003        pid=fork();
1004        if (pid==0) break;
1005        if (pid==-1) {
1006            fprintf(stderr,"WARNING: Failed to fork helper processes. Falling back to only using %i \n", p);
1007            noProcs = p;
1008            break;
1009        }
1010    }
1011    if ((pid == 0) && (noProcs > 1)) {
1012        out_options->out->connect(out_options, 0);
1013        pgsql_connect(out_options);
1014    } else {
1015        p = 0;
1016    }
1017
1018    //fprintf(stderr, "\nIterating ways\n");
1019    for (i = p; i < PQntuples(res_rels); i+= noProcs) {
1020        osmid_t id = strtoosmid(PQgetvalue(res_rels, i, 0), NULL, 10);
1021        struct keyval tags;
1022        struct member *members;
1023        int member_count;
1024
1025        if (count++ %10 == 0) {
1026            time(&end);
1027            fprintf(stderr, "\rprocessing relation (%d) at %.2f/s", count,
1028                    (int)(end - start) > 0 ? ((double)count / (double)(end - start)) : 0);
1029        }
1030
1031        initList(&tags);
1032        if( pgsql_rels_get(id, &members, &member_count, &tags) )
1033          continue;
1034         
1035        callback(id, members, member_count, &tags, exists);
1036        pgsql_rels_done( id );
1037
1038        free(members);
1039        resetList(&tags);
1040    }
1041
1042    if ((pid == 0) && (noProcs > 1)) {
1043        pgsql_cleanup();
1044        out_options->out->close(0);
1045        exit(0);
1046    } else {
1047        for (p = 0; p < noProcs; p++) wait(NULL);
1048        fprintf(stderr, "\nAll child processes exited\n");
1049    }
1050
1051    PQclear(res_rels);
1052    fprintf(stderr, "\n");
1053    time(&end);
1054    if (end - start > 0)
1055        fprintf(stderr, "Pending relations took %ds at a rate of %.2f/s\n",(int)(end - start), ((double)count / (double)(end - start)));
1056}
1057
1058static int pgsql_rel_changed(osmid_t osm_id)
1059{
1060    char const *paramValues[1];
1061    char buffer[64];
1062    /* Make sure we're out of copy mode */
1063    pgsql_endCopy( rel_table );
1064   
1065    sprintf( buffer, "%" PRIdOSMID, osm_id );
1066    paramValues[0] = buffer;
1067    pgsql_execPrepared(rel_table->sql_conn, "rel_changed_mark", 1, paramValues, PGRES_COMMAND_OK );
1068    return 0;
1069}
1070
1071static void pgsql_analyze(void)
1072{
1073    int i;
1074
1075    for (i=0; i<num_tables; i++) {
1076        PGconn *sql_conn = tables[i].sql_conn;
1077 
1078        if (tables[i].analyze) {
1079            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", tables[i].analyze);
1080        }
1081    }
1082}
1083
1084static void pgsql_end(void)
1085{
1086    int i;
1087
1088    for (i=0; i<num_tables; i++) {
1089        PGconn *sql_conn = tables[i].sql_conn;
1090 
1091        // Commit transaction
1092        if (tables[i].stop && tables[i].transactionMode) {
1093            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", tables[i].stop);
1094            tables[i].transactionMode = 0;
1095        }
1096
1097    }
1098}
1099
1100/**
1101 * Helper to create SQL queries.
1102 *
1103 * Takes four arguments:
1104 * prefix, t (tablespace) and i (index tablespace) - these may be NULL
1105 * a pointer to an input string which, on success, is changed to point to the
1106 * result (caller takes ownership).
1107 *
1108 * The input string is mangled as follows:
1109 * %p replaced by the given prefix,
1110 * %i replaced by the given index tablespace
1111 * %t replaced by the given tablespace
1112 * other occurrences of the "%" char are treated normally.
1113 * any occurrence of { or } will be ignored (not copied to output string);
1114 * anything inside {} is only copied if it contained at least one of
1115 * %p, %i, %t that was not NULL.
1116 *
1117 * So, the input string
1118 *    Hello{ dear %i}!
1119 * will, if i is set to "John", translate to
1120 *    Hello dear John!
1121 * but if i is unset, translate to
1122 *    Hello!
1123 *
1124 * This is used for constructing SQL queries with proper tablespace settings.
1125 */
1126static inline void set_prefix_and_tbls(const char *prefix, const char *t, const char *i, const char **string)
1127{
1128    char buffer[1024];
1129    if (*string == NULL) return;
1130    const char *source = *string;
1131    char *dest = buffer;
1132    char *openbrace = NULL;
1133    int copied = 0;
1134
1135    while (*source) {
1136        if (*source == '{') {
1137            openbrace = dest;
1138            copied = 0;
1139            source++;
1140            continue;
1141        } else if (*source == '}') {
1142            if (!copied && openbrace) dest = openbrace;
1143            source++;
1144            continue;
1145        } else if (*source == '%') {
1146            if (*(source+1) == 'p') {
1147                if (prefix) {
1148                    strcpy(dest, prefix);
1149                    dest += strlen(prefix);
1150                    copied = 1;
1151                }
1152                source+=2;
1153                continue;
1154            } else if (*(source+1) == 't') {
1155                if (t) {
1156                    strcpy(dest, t);
1157                    dest += strlen(t);
1158                    copied = 1;
1159                }
1160                source+=2;
1161                continue;
1162            } else if (*(source+1) == 'i') {
1163                if (i) {
1164                    strcpy(dest, i);
1165                    dest += strlen(i);
1166                    copied = 1;
1167                }
1168                source+=2;
1169                continue;
1170            }
1171        }
1172        *(dest++) = *(source++);
1173    }
1174    *dest = 0;
1175    *string = strdup(buffer);
1176}
1177
1178static int build_indexes;
1179
1180
1181static int pgsql_start(const struct output_options *options)
1182{
1183    PGresult   *res;
1184    int i;
1185    int dropcreate = !options->append;
1186
1187    scale = options->scale;
1188    Append = options->append;
1189
1190    out_options = options;
1191   
1192    init_node_ram_cache( options->alloc_chunkwise | ALLOC_LOSSY, options->cache, scale);
1193
1194    fprintf( stderr, "Mid: pgsql, scale=%d\n", scale, options->cache);
1195   
1196    /* We use a connection per table to enable the use of COPY */
1197    for (i=0; i<num_tables; i++) {
1198        PGconn *sql_conn;
1199                       
1200        set_prefix_and_tbls(options->prefix, options->tblsslim_data, options->tblsslim_index, &(tables[i].name));
1201        set_prefix_and_tbls(options->prefix, options->tblsslim_data, options->tblsslim_index, &(tables[i].start));
1202        set_prefix_and_tbls(options->prefix, options->tblsslim_data, options->tblsslim_index, &(tables[i].create));
1203        set_prefix_and_tbls(options->prefix, options->tblsslim_data, options->tblsslim_index, &(tables[i].create_index));
1204        set_prefix_and_tbls(options->prefix, options->tblsslim_data, options->tblsslim_index, &(tables[i].prepare));
1205        set_prefix_and_tbls(options->prefix, options->tblsslim_data, options->tblsslim_index, &(tables[i].prepare_intarray));
1206        set_prefix_and_tbls(options->prefix, options->tblsslim_data, options->tblsslim_index, &(tables[i].copy));
1207        set_prefix_and_tbls(options->prefix, options->tblsslim_data, options->tblsslim_index, &(tables[i].analyze));
1208        set_prefix_and_tbls(options->prefix, options->tblsslim_data, options->tblsslim_index, &(tables[i].stop));
1209        set_prefix_and_tbls(options->prefix, options->tblsslim_data, options->tblsslim_index, &(tables[i].array_indexes));
1210
1211        fprintf(stderr, "Setting up table: %s\n", tables[i].name);
1212        sql_conn = PQconnectdb(options->conninfo);
1213
1214        /* Check to see that the backend connection was successfully made */
1215        if (PQstatus(sql_conn) != CONNECTION_OK) {
1216            fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(sql_conn));
1217            exit_nicely();
1218        }
1219        tables[i].sql_conn = sql_conn;
1220
1221        /*
1222         * To allow for parallelisation, the second phase (iterate_ways), cannot be run
1223         * in an extended transaction and each update statement is its own transaction.
1224         * Therefore commit rate of postgresql is very important to ensure high speed.
1225         * If fsync is enabled to ensure safe transactions, the commit rate can be very low.
1226         * To compensate for this, one can set the postgresql parameter synchronous_commit
1227         * to off. This means an update statement returns to the client as success before the
1228         * transaction is saved to disk via fsync, which in return allows to bunch up multiple
1229         * transactions into a single fsync. This may result in some data loss in the case of a
1230         * database crash. However, as we don't currently have the ability to restart a full osm2pgsql
1231         * import session anyway, this is fine. Diff imports are also not effected, as the next
1232         * diff import would simply deal with all pending ways that were not previously finished.
1233         * This parameter does not effect safety from data corruption on the back-end.
1234         */
1235        pgsql_exec(sql_conn, PGRES_COMMAND_OK, "SET synchronous_commit TO off;");
1236
1237        /* Not really the right place for this test, but we need a live
1238         * connection that not used for anything else yet, and we'd like to
1239         * warn users *before* we start doing mountains of work */
1240        if (i == t_node)
1241        {
1242            res = PQexec(sql_conn, "select 1 from pg_opclass where opcname='gist__intbig_ops'" );
1243            if(PQresultStatus(res) == PGRES_TUPLES_OK && PQntuples(res) == 1)
1244            {
1245                /* intarray is problematic now; causes at least postgres 8.4
1246                 * to not use the index on nodes[]/parts[] which slows diff
1247                 * updates to a crawl!
1248                 * If someone find a way to fix this rather than bow out here,
1249                 * please do.*/
1250
1251                fprintf(stderr, 
1252                    "\n"
1253                    "The target database has the intarray contrib module loaded.\n"
1254                    "While required for earlier versions of osm2pgsql, intarray \n"
1255                    "is now unnecessary and will interfere with osm2pgsql's array\n"
1256                    "handling. Please use a database without intarray.\n\n");
1257                exit_nicely();
1258            }
1259            PQclear(res);
1260
1261            if (options->append)
1262            {
1263                res = PQexec(sql_conn, "select id from planet_osm_nodes limit 1" );
1264                if(PQresultStatus(res) == PGRES_TUPLES_OK && PQntuples(res) == 1)
1265                {
1266                    int size = PQfsize(res, 0);
1267                    if (size != sizeof(osmid_t))
1268                    {
1269                        fprintf(stderr, 
1270                            "\n"
1271                            "The target database has been created with %dbit ID fields,\n"
1272                            "but this version of osm2pgsql has been compiled to use %ldbit IDs.\n"
1273                            "You cannot append data to this database with this program.\n"
1274                            "Either re-create the database or use a matching osm2pgsql.\n\n",
1275                            size * 8, sizeof(osmid_t) * 8);
1276                        exit_nicely();
1277                    }
1278                }
1279                PQclear(res);
1280            }
1281
1282            if(!options->append)
1283                build_indexes = 1;
1284        }
1285        if (dropcreate) {
1286            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "DROP TABLE IF EXISTS %s", tables[i].name);
1287        }
1288
1289        if (tables[i].start) {
1290            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", tables[i].start);
1291            tables[i].transactionMode = 1;
1292        }
1293
1294        if (dropcreate && tables[i].create) {
1295            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", tables[i].create);
1296            if (tables[i].create_index) {
1297              pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", tables[i].create_index);
1298            }
1299        }
1300
1301
1302        if (tables[i].prepare) {
1303            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", tables[i].prepare);
1304        }
1305
1306        if (tables[i].prepare_intarray) {
1307            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", tables[i].prepare_intarray);
1308        }
1309
1310        if (tables[i].copy) {
1311            pgsql_exec(sql_conn, PGRES_COPY_IN, "%s", tables[i].copy);
1312            tables[i].copyMode = 1;
1313        }
1314    }
1315
1316    return 0;
1317}
1318
1319static void pgsql_commit(void) {
1320    int i;
1321    for (i=0; i<num_tables; i++) {
1322        PGconn *sql_conn = tables[i].sql_conn;
1323        pgsql_endCopy(&tables[i]);
1324        if (tables[i].stop && tables[i].transactionMode) {
1325            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", tables[i].stop);
1326            tables[i].transactionMode = 0;
1327        }
1328    }
1329}
1330
1331static void *pgsql_stop_one(void *arg)
1332{
1333    time_t start, end;
1334   
1335    struct table_desc *table = arg;
1336    PGconn *sql_conn = table->sql_conn;
1337
1338    fprintf(stderr, "Stopping table: %s\n", table->name);
1339    pgsql_endCopy(table);
1340    //if (table->stop)
1341    //    pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", table->stop);
1342    time(&start);
1343    if (!out_options->droptemp)
1344    {
1345        if (build_indexes && table->array_indexes) {
1346            char *buffer = (char *) malloc(strlen(table->array_indexes) + 99);
1347            // we need to insert before the TABLESPACE setting, if any
1348            char *insertpos = strstr(table->array_indexes, "TABLESPACE");
1349            if (!insertpos) insertpos = strchr(table->array_indexes, ';');
1350
1351            // automatically insert FASTUPDATE=OFF when creating,
1352            // indexes for PostgreSQL 8.4 and higher
1353            // see http://lists.openstreetmap.org/pipermail/dev/2011-January/021704.html
1354            if (insertpos && PQserverVersion(sql_conn) >= 80400) {
1355                char old = *insertpos;
1356                fprintf(stderr, "Building index on table: %s (fastupdate=off)\n", table->name);
1357                *insertpos = 0; // temporary null byte for following strcpy operation
1358                strcpy(buffer, table->array_indexes);
1359                *insertpos = old; // restore old content
1360                strcat(buffer, " WITH (FASTUPDATE=OFF)");
1361                strcat(buffer, insertpos);
1362            } else {
1363                fprintf(stderr, "Building index on table: %s\n", table->name);
1364                strcpy(buffer, table->array_indexes);
1365            }
1366            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", buffer);
1367            free(buffer);
1368        }
1369    }
1370    else
1371    {
1372        pgsql_exec(sql_conn, PGRES_COMMAND_OK, "drop table %s", table->name);
1373    }
1374    PQfinish(sql_conn);
1375    table->sql_conn = NULL;
1376    time(&end);
1377    fprintf(stderr, "Stopped table: %s in %is\n", table->name, (int)(end - start));
1378    return NULL;
1379}
1380
1381static void pgsql_stop(void)
1382{
1383    int i;
1384#ifdef HAVE_PTHREAD
1385    pthread_t threads[num_tables];
1386#endif
1387
1388    free_node_ram_cache();
1389
1390#ifdef HAVE_PTHREAD
1391    for (i=0; i<num_tables; i++) {
1392        int ret = pthread_create(&threads[i], NULL, pgsql_stop_one, &tables[i]);
1393        if (ret) {
1394            fprintf(stderr, "pthread_create() returned an error (%d)", ret);
1395            exit_nicely();
1396        }
1397    }
1398    for (i=0; i<num_tables; i++) {
1399        int ret = pthread_join(threads[i], NULL);
1400        if (ret) {
1401            fprintf(stderr, "pthread_join() returned an error (%d)", ret);
1402            exit_nicely();
1403        }
1404    }
1405#else
1406    for (i=0; i<num_tables; i++)
1407        pgsql_stop_one(&tables[i]);
1408#endif
1409}
1410 
1411struct middle_t mid_pgsql = {
1412        .start             = pgsql_start,
1413        .stop              = pgsql_stop,
1414        .cleanup           = pgsql_cleanup,
1415        .analyze           = pgsql_analyze,
1416        .end               = pgsql_end,
1417        .commit            = pgsql_commit,
1418
1419        .nodes_set         = pgsql_nodes_set,
1420#if 0
1421        .nodes_get         = pgsql_nodes_get,
1422#endif
1423        .nodes_get_list    = pgsql_nodes_get_list,
1424        .nodes_delete      = pgsql_nodes_delete,
1425        .node_changed      = pgsql_node_changed,
1426
1427        .ways_set          = pgsql_ways_set,
1428        .ways_get          = pgsql_ways_get,
1429        .ways_done         = pgsql_ways_done,
1430        .ways_delete       = pgsql_ways_delete,
1431        .way_changed       = pgsql_way_changed,
1432
1433        .relations_set     = pgsql_rels_set,
1434#if 0
1435        .relations_get     = pgsql_rels_get,
1436#endif
1437        .relations_done    = pgsql_rels_done,
1438        .relations_delete  = pgsql_rels_delete,
1439        .relation_changed  = pgsql_rel_changed,
1440#if 0
1441        .iterate_nodes     = pgsql_iterate_nodes,
1442#endif
1443        .iterate_ways      = pgsql_iterate_ways,
1444        .iterate_relations = pgsql_iterate_relations
1445};
Note: See TracBrowser for help on using the repository browser.